forgi 2.0.0 documentation

Contents

Source code for forgi.threedee.model.coarse_grain

from __future__ import absolute_import, unicode_literals
from __future__ import print_function
from __future__ import division
from builtins import (ascii, bytes, chr, dict, filter, hex, input,
                      map, next, oct, pow, range, round,
                      str, super, zip)

import collections as c
import contextlib
import os
import os.path as op
import shutil
import subprocess as sp
import sys
import time
import math
import warnings
import itertools as it
import json
try:
    from json import JSONDecodeError #py3k
except ImportError: #py2k
    JSONDecodeError = ValueError
try:
    import io
except ImportError:  # Python 2
    import StringIO as io
try:
    import shutil
    which = shutil.which
except (ImportError, AttributeError):
    import distutils.spawn
    which = distutils.spawn.find_executable

from io import open
import logging
from pprint import pprint


import numpy as np

import scipy.spatial
import scipy.stats

import Bio.PDB as bpdb

from logging_exceptions import log_to_exception, log_exception

from ...graph import bulge_graph as fgb
import forgi.graph.residue as fgr
from ...graph.sequence import Sequence, _insert_breakpoints_simple
from ...graph._graph_construction import _BulgeGraphConstruction
from ..utilities import graph_pdb as ftug
from . import stats as ftms
from . import transform_cg as ftmt
from ..._k2n_standalone import knotted2nested as cak
from ..utilities import mcannotate as ftum
from ..utilities import pdb as ftup
from . import descriptors as ftmd
from ..utilities import _dssr as ftud
from ..utilities import vector as ftuv
from ...utilities import debug as fud
from ...utilities import stuff as fus
import forgi.threedee.utilities.virtual_residues as ftuvres
from ...utilities.observedDict import observedDict
from ...utilities.exceptions import CgConstructionError, CgIntegrityError, GraphConstructionError
from .linecloud import CoordinateStorage, LineSegmentStorage
from ...utilities.stuff import is_string_type
from ... import config

log = logging.getLogger(__name__)

[docs]class AnnotationToolNotInstalled(ValueError): pass
try: profile # The @profile decorator from line_profiler (kernprof) except:
[docs] def profile(x): return x
[docs]def add_longrange_interactions(cg, lines): ''' Iterate over the lines in an MC-Annotate file and add information about interactions between non-adjacent elements. :param cg: A CoarseGrainRNA structure :param lines: All the lines in an MC-Annotate file ''' for line in ftum.iterate_over_interactions(lines): (from_chain, from_base, to_chain, to_base) = ftum.get_interacting_base_pairs(line) try: seq_id1 = cg.seq_ids.index( fgr.RESID(from_chain, ftum.parse_resid(from_base))) + 1 except ValueError as e: with log_to_exception(log, e): log.error("seq_ids are {}".format(cg.seq_ids)) raise seq_id2 = cg.seq_ids.index( fgr.RESID(to_chain, ftum.parse_resid(to_base))) + 1 node1 = cg.get_node_from_residue_num(seq_id1) node2 = cg.get_node_from_residue_num(seq_id2) if abs(seq_id2 - seq_id1) > 1 and node1 != node2 and not cg.has_connection(node1, node2): cg.longrange[node1].add(node2) cg.longrange[node2].add(node1)
[docs]def breakpoints_from_seqids(seqids): """ Create the list of cofold cutpoints from the seq_ids. Return 1-based breakpoints """ breakpoints = [] old_chain = None for i, seqid in enumerate(seqids): if old_chain is not None and seqid.chain != old_chain: assert i >= 1 breakpoints.append(i) # Breakpoint BEFORE current seqid! old_chain = seqid.chain return breakpoints
def _are_adjacent_basepairs(seq_ids, edge1, edge2): """ Helper function used by connected_cgs_from_pdb. :param cg: A (potentially) inconsistent cg. Only seq_ids are needed. :param edge1, edge2: A dictionary with the key "basepair" mapping to a two-tuple of RESIDS """ fromA, toA = edge1["basepair"] fromB, toB = edge2["basepair"] if fromA.chain != fromB.chain: if fromA.chain == toB.chain: fromB, toB = toB, fromB assert toA.chain == toB.chain else: assert False if (abs(seq_ids.index(fromA) - seq_ids.index(fromB)) == 1 and abs(seq_ids.index(toA) - seq_ids.index(toB)) == 1): log.debug( "Basepairs %s and %s are adjacent. No length 1 stem", edge1, edge2) return True log.debug("Basepairs %s and %s are NOT adjacent.", edge1, edge2) return False def _annotate_pdb(filename, annotation_tool, filetype, subprocess_kwargs={}): """ Get the secondary structure of the pdb by using an external tool. :param filename: The name of the temporary pdb file :param subprocess_kwargs: Will be passed as keyword arguments to subprocess.call/ subprcess.check_output :param annotation_tool: See docstring of CoarseGrainRNA.from_pdb """ not_installed_msg = ("{} was requested as annotation tool, but {} " "is not installed or not in the PATH. (Hint: run " "forgi_config.py to set the preferred annotation tool)") program = annotation_tool if program is None: c = config.read_config() if "PDB_ANNOTATION_TOOL" in c: program = c["PDB_ANNOTATION_TOOL"] log.info("Requested annotation program is %s", program) if program == "DSSR" or program is None: if which("x3dna-dssr"): return _run_dssr(filename, subprocess_kwargs) else: log.info("x3dna-dssr is not installed or not in the PATH.") if program is not None: raise AnnotationToolNotInstalled( not_installed_msg.format(program, "x3dna-dssr")) if program == "MC-Annotate" or (program is None and filetype == "pdb"): if filetype != "pdb": raise ValueError("MC-Annotate does not support MMCIF") if which("MC-Annotate"): lines = _run_mc_annotate(filename, subprocess_kwargs) try: bpseq, seq_ids = ftum.get_dotplot(lines) return bpseq, seq_ids, {} except Exception as e: log.exception( "Could not convert MC-Annotate output to dotplot") raise CgConstructionError( "Could not convert MC-Annotate output to dotplot") # from e else: log.info("MC-Annotate is not installed or not in the PATH.") if program is not None: raise AnnotationToolNotInstalled(not_installed_msg.format( program, "MC-Annotate")) elif program == "forgi" or program is None: return None else: raise ValueError("Supported programs for annotating the pdb are: " "'MC-Annotate', 'DSSR' and 'forgi', not '{}'".format(program)) def _run_dssr(filename, subprocess_kwargs={}): # See http://forum.x3dna.org/rna-structures/redirect-auxiliary-file-output/ with fus.make_temp_directory() as dssr_output_dir: dssr_out = os.path.join(dssr_output_dir, "out") arguments = ['x3dna-dssr', "-i=" + filename, "--prefix=" + os.path.join(dssr_output_dir, "d"), "-o=" + dssr_out, "--json"] # https://stackoverflow.com/a/14837250/5069869 log.info("Running DSSR: %s", sp.list2cmdline(arguments)) with open(os.path.join(dssr_output_dir, "stderror"), "w+") as errfile: try: ret_code = sp.call( arguments, universal_newlines=True, stderr=errfile, **subprocess_kwargs) try: with open(os.path.join(dssr_output_dir, "d-2ndstrs.bpseq"), encoding='ascii') as f: bpseq = f.read() with open(dssr_out) as f: try: dssr_dict = json.load(f) except JSONDecodeError as e: # DSSR does not escape backslashes (see "http://forum.x3dna.org/bug-reports/json-output-should-escape-backslashes/"), which can be in windows paths. Escape them manually. f.seek(0) json_stri = f.read().replace('\\','\\\\') log.error(json_stri) dssr_dict = json.loads(json_stri) nts = dssr_dict["nts"] seq_ids = list(map(ftud.dssr_to_pdb_resid, [ nt["nt_id"] for nt in nts])) except (OSError, IOError) as e: with log_to_exception(log, e): log.error("Content of the directory %s is %s", dssr_output_dir, os.listdir(dssr_output_dir)) raise except (OSError, IOError) as e: assert op.isfile( filename), "File {} (created by forgi) no longer exists".format(filename) errfile.seek(0) err_msg = errfile.readlines() if err_msg: if len(err_msg) >= 3: e = CgConstructionError( "DSSR could not process the file: " + err_msg[-3]) with log_to_exception(log, e): log.error("Captured Stderr is:\n%s", "".join( ["... " + line for line in err_msg])) raise e else: # On Py2.7, the filename of the executable (x3dna-dssr) is not part of the # error message raised by sp.call, # For this reason, we add the following hint (unneeded on Py3k) e.strerror += ". (Hint: Did you install x3dna-dssr?)" raise e return bpseq, seq_ids, dssr_dict def _run_mc_annotate(filename, subprocess_kwargs={}): """ Returns: A tuple (bpseq, seq_ids`) """ log.info("Running MC-Annotate") try: out = sp.check_output(['MC-Annotate', filename], universal_newlines=True, **subprocess_kwargs) except OSError as e: assert op.isfile( filename), "File {} (created by forgi) no longer exists".format(filename) e.strerror += ". Hint: Did you install MC-Annotate?" raise e lines = out.strip().split('\n') return lines
[docs]class RnaMissing3dError(LookupError): pass
[docs]class CoarseGrainRNA(fgb.BulgeGraph): ''' A coarse grain model of RNA structure based on the bulge graph representation. Each stem is represented by four parameters (two endpoints) and two twist vetors pointing towards the centers of the base pairs at each end of the helix. ''' def __init__(self, graph_construction, sequence, name=None, infos=None, _dont_split=False): ''' Initialize the new structure. ''' super(CoarseGrainRNA, self).__init__( graph_construction, sequence, name, infos, _dont_split) self.dssr = None self._virtual_atom_cache = {} #: Keys are element identifiers (e.g.: "s1" or "i3"), values are 2-tuples of vectors #: The first value of stem coordinates corresponds to the start of the stem #: (the one with the lowest nucleotide number), #: The second value to the end of the stem. #: If the coordinates for an element change, the virtual atom and virtual residue #: coordinates are automatically invalidated. self.coords = None # We can only initialize this, when we know defines.keys() self.twists = None self.sampled = dict() if self.defines: self._init_coords() #: A dictionary storing the stem-bases (not the vres basis) self.bases = {} self.stem_invs = {} #:The following 5 defaultdicts are cleared when coords or twists change. #: Global (carthesian) position of the virtual residue #: (=offset of the residue's coordinate-system) #: generated by self.add_all_virtual_residues() self.vposs = c.defaultdict(dict) #: The coordinate system specific to each virtual residue #: (3x3 matrix, carthesian coordiantes) #: generated by self.add_all_virtual_residues() self.vbases = c.defaultdict(dict) #: generated by self.add_all_virtual_residues() self.vvecs = c.defaultdict(dict) #: generated by self.add_all_virtual_residues() self.v3dposs = c.defaultdict(dict) #: generated by self.add_all_virtual_residues() #: The inverse of the transposed basis. self.vinvs = c.defaultdict(dict) #: A 3D vector. Used as hint, from what direction the Projection2D object #: should be generated in the default case. self.project_from = None self.longrange = c.defaultdict(set) self.chains = {} # the PDB chains if loaded from a PDB file self.interacting_residues = [] # Lazily loaded: self._incomplete_elements = None # Lazily calculated from interacting_residues self._interacting_elements = None ############################################################################ # Factory functions #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] @classmethod def from_bg_string(cls, cg_string): ''' Populate this structure from the string representation of a graph. ''' # pylint: disable=no-member # See https://github.com/PyCQA/pylint/issues/981 # Reading the bulge_graph-part of the file cg = super(CoarseGrainRNA, cls).from_bg_string(cg_string) cg._init_coords() log.debug("BG read. Now reading 3D information") # Reading the part of the file responsible for 3D information lines = cg_string.split('\n') for line in lines: line = line.strip() parts = line.split() if len(parts) == 0: continue if parts[0] == 'coord': name = parts[1] cg.coords[name] = np.array( [list(map(float, parts[2:5])), list(map(float, parts[5:8]))]) if parts[0] == 'twist': name = parts[1] cg.twists[name] = np.array( [list(map(float, parts[2:5])), list(map(float, parts[5:8]))]) if parts[0] == 'longrange': cg.longrange[parts[1]].add(parts[2]) cg.longrange[parts[2]].add(parts[1]) if parts[0] == 'sampled': cg.sampled[parts[1]] = [parts[2]] + list(map(int, parts[3:])) if parts[0] == 'project': cg.project_from = np.array(parts[1:], dtype=float) if parts[0] == "interacting": cg.interacting_residues.append(fgr.resid_from_str(parts[1])) if parts[0] == "vres": elem = parts[1] cg.vposs[elem] = ftuvres.parse_vres(parts[2:]) # Old versions of the file may contain bulge coordinates in the wrong order. cg.add_bulge_coords_from_stems() return cg
[docs] @classmethod def from_pdb(cls, pdb_filename, load_chains=None, remove_pseudoknots=False, dissolve_length_one_stems=True, secondary_structure=None, filetype="pdb", annotation_tool=None, query_PDBeChem=False): """ :param load_chains: A list of chain_ids or None (all chains) :param secondary_structure: Only useful if we load only 1 component :param filetype: One of 'pdb' or 'cif' :param query_PDBeChem: If true, query the PDBeChem database whenever a modified residue with unknown 3-letter code is encountered. :param annotation_tool: One of "DSSR", "MC-Annotate", "forgi" or None. If this is None, we take the value of the configuration file (run forgi_config.py to create a config file). If no config file is given either, we see what tools are installed, preferring the newer DSSR over MC-Annotate and falling back to the fogi implementation if neither is in the PATH variable. If a string is given or the configuration file set, we never fall back to a different option but raise an error, if the requested tool is unavailable. """ #warnings.warn("We currently do not load any long-range interactions") # We need to create files, so we can interface with # the annotation program. if remove_pseudoknots and secondary_structure: warnings.warn( "Option 'remove_pseudoknots ignored for user-supplied secondary structure.") with fus.make_temp_directory() as output_dir: if load_chains == "biggest": chain, missing_res, ir = ftup.get_biggest_chain(pdb_filename) chains = [chain] else: chains, missing_res, ir = ftup.get_all_chains(pdb_filename) new_chains = [] for chain in chains: if load_chains in [None, "biggest"] or chain.id in load_chains: log.debug("Loaded Chain %s", chain.id) chain, modifications = ftup.clean_chain(chain, query_PDBeChem) new_chains.append(chain) log.debug("%s, %s", pdb_filename, os.path.split(pdb_filename)) fn_basename = os.path.split(pdb_filename)[1] if load_chains is None: chain_string = "None" else: chain_string = "-".join(map(str, load_chains)) rna_pdb_fn = op.join(output_dir, fn_basename + "_" + chain_string + '.temp.' + filetype) # Output cleaned version for annotating ftup.output_multiple_chains(new_chains, rna_pdb_fn, filetype) # first we annotate the 3D structure log.info("Starting annotation program for all chains") annotation = _annotate_pdb(rna_pdb_fn, annotation_tool, filetype) if annotation is None: # Fallback-annotation using forgi bpseq, seq_ids = ftup.annotate_fallback(new_chains) dssr_dict = {} else: bpseq, seq_ids, dssr_dict = annotation breakpoints = breakpoints_from_seqids(seq_ids) assert bpseq is not None, "{} {} {}".format(bpseq, seq_ids, dssr_dict) # Here, we remove Pseudoknots on the bp-seq level. not the BulgeGraph # level, because there may be multiple components, so we cannot create # a BulgeGraph. if remove_pseudoknots: #log.info("Removing pseudoknots") #log.info(bpseq) bpseq = cak.k2n_main(io.StringIO(bpseq), input_format='bpseq', #output_format = 'vienna', output_format='bpseq', method=cak.DEFAULT_METHOD, opt_method=cak.DEFAULT_OPT_METHOD, verbose=cak.DEFAULT_VERBOSE, removed=cak.DEFAULT_REMOVED) import networkx as nx chain_connections_multigraph = nx.MultiGraph() bpseq_lines = bpseq.splitlines() for line in bpseq_lines: try: from_, res, to_ = line.split() except: continue from_seqid = seq_ids[int(from_) - 1] from_chain = from_seqid.chain chain_connections_multigraph.add_node(from_chain) if int(to_) != 0 and int(to_) > int(from_): to_seqid = seq_ids[int(to_) - 1] to_chain = to_seqid.chain if from_chain != to_chain: log.debug("Adding {} - {}".format(from_chain, to_chain)) chain_connections_multigraph.add_edge( from_chain, to_chain, basepair=(from_seqid, to_seqid)) chain_connections = nx.Graph(chain_connections_multigraph) if dissolve_length_one_stems: for chain1, chain2 in it.combinations(chain_connections_multigraph.nodes(), 2): if chain2 in chain_connections_multigraph.adj[chain1]: for edge1, edge2 in it.combinations(chain_connections_multigraph.adj[chain1][chain2].values(), 2): if _are_adjacent_basepairs(seq_ids, edge1, edge2): break else: # break NOT encountered. log.debug( "No connection remaining between chains %s and %s", chain1, chain2) chain_connections.remove_edge(chain1, chain2) log.debug("CONNECTIONS: {}, nodes {}".format( chain_connections, chain_connections.nodes())) log.debug("Edges {}".format(chain_connections.edges())) pdb_base = op.splitext(op.basename(pdb_filename))[0] components = list(nx.connected_components(chain_connections)) cgs = [] if len(components) != 1 and secondary_structure: raise GraphConstructionError("User-provided secondary structure is " "ambigouse if more than 1 connected " "component loaded.") for component in nx.connected_components(chain_connections): try: cgs.append(cls._load_pdb_component(bpseq_lines, pdb_base, new_chains, component, missing_res, modifications, seq_ids, secondary_structure, dissolve_length_one_stems, ir)) except GraphConstructionError as e: log_exception(e, logging.ERROR, with_stacktrace=False) log.error( "Could not load chains %s, due to the above mentioned error.", list(component)) raise cgs.sort(key=lambda x: x.name) if dssr_dict: for cg in cgs: cg.dssr = ftud.DSSRAnnotation(dssr_dict, cg) cg.infos["dssr_stacks"] = cg.dssr.stacking_loops() log.debug("Returning %s", cgs) return cgs
@classmethod def _load_pdb_component(cls, original_bpseq_lines, name, chains, chain_ids, missing_res, modifications, seq_ids, secondary_structure="", dissolve_length_one_stems=False, interacting_residues=[]): """ :param original_bpseq_lines: List of strings. Will be filtered for chains. """ #print(component, type(component)) log.info("Loading PDB: Connected component with chains %s", str(list(chain_ids))) log.debug("missing residues %s", missing_res) # Since the external annotation program can take some time, # we do not re-annotate, but filter the bpseq_string instead. new_bpseq = [] new_seqids = [] # Filter the bpseq_lines for line in original_bpseq_lines: from_, res, to_ = line.split() from_seqid = seq_ids[int(from_) - 1] if from_seqid.chain in chain_ids: new_seqids.append(from_seqid) if to_ != '0': to_seqid = seq_ids[int(to_) - 1] if to_seqid.chain not in chain_ids: # Because of length-1-basepair to_ = 0 new_bpseq.append((from_, res, to_)) new_bpseq_str = fus.renumber_bpseq(new_bpseq) tuples, seq_str = fus.bpseq_to_tuples_and_seq(new_bpseq_str) breakpoints = breakpoints_from_seqids(new_seqids) seq_str = _insert_breakpoints_simple(seq_str, breakpoints, 1) if secondary_structure: pt = fus.dotbracket_to_pairtable(secondary_structure) # Overwrite tuples! tuples = fus.pairtable_to_tuples(pt) stru_fragments = secondary_structure.split("&") seq_fragments = seq_str.split("&") seq_lengths = list(map(len, seq_fragments)) if list(map(len, stru_fragments)) != seq_lengths: raise ValueError("The given secondary structure is inconsistent " "with the pdb-chain-lengths of %s", seq_lengths) sequence = Sequence(seq_str, new_seqids, [r for r in missing_res if r["chain"] in chain_ids], {k: v for k, v in modifications.items() if k.chain in chain_ids}) name = name + "_" + "-".join(c for c in sorted(chain_ids)) graph_constr = _BulgeGraphConstruction(tuples) cg = cls(graph_constr, sequence, name) cg = fgb._cleaned_bg(cg, dissolve_length_one_stems) cg.chains = { chain.id: chain for chain in chains if chain.id in chain_ids} ftug.add_stem_information_from_pdb_chains(cg) cg.add_bulge_coords_from_stems() ftug.add_loop_information_from_pdb_chains(cg) ftug._add_loop_vres(cg) assert len(cg.defines) == len( cg.coords), cg.defines.keys() ^ cg.coords.keys() cg.interacting_residues = list(r for r in map(fgr.resid_from_biopython, interacting_residues) if r.chain in chain_ids) return cg ############################################################################ @property def transformed(self): return ftmt.CGTransformer(self) def _get_coord_str(self): ''' Place the start and end coordinates of each stem into a string. The format is: coord s1 x1 y1 z1 x2 y2 z2 Where s1 is the name of the stem, (x1,y1,z1) and (x2,y2,z2) are the two endpoints of the stem. :return: A string containing the coordinates for all of the stems. ''' out_str = '' for key in self.coords.keys(): if key[0] in ["m", "i"]: # Bulge coordinates are redundant. They can be deduced from the stem coordinates. continue [p, n] = self.coords[key] out_str += ("coord {k} {x[0]:.16f} {x[1]:.16f} {x[2]:.16f} " "{y[0]:.16f} {y[1]:.16f} {y[2]:.16f}".format(k=key, x=p, y=n)) out_str += '\n' return out_str
[docs] def add_bulge_coords_from_stems(self): ''' Add the information about the starts and ends of the bulges (i and m elements). The stems have to be created beforehand. This is called during loading of the RNA structure from pdb and from cg files. ''' for d in self.defines.keys(): if d[0] != 's': edges = list(self.edges[d]) if len(edges) == 2: (s1b, _) = self.get_sides(edges[0], d) (s2b, _) = self.get_sides(edges[1], d) mids1 = self.coords[edges[0]] mids2 = self.coords[edges[1]] # Save coordinates in direction of the strand. if self.get_link_direction(edges[0], edges[1], d) == 1: self.coords[d] = (mids1[s1b], mids2[s2b]) else: self.coords[d] = (mids2[s2b], mids1[s1b])
[docs] def add_all_virtual_residues(self): """ Calls ftug.add_virtual_residues() for all stems of this RNA. .. note:: Don't forget to call this again if you changed the structure of the RNA, to avoid leaving it in an inconsistent state. .. warning:: Virtual residues are only added to stems, not to loop regions. The position of residues in loops is much more flexible, which is why virtual residue positions for loops usually do not make sense. If the file was loaded from the PDB, residue positions from the PDB file are stored already. """ for stem in self.stem_iterator(): try: log.debug( "Adding virtual residues for stem %s with coords %s", stem, self.coords[stem]) ftug.add_virtual_residues(self, stem) except (KeyError, ValueError, AssertionError): if np.all(np.isnan(self.coords[stem])): raise RnaMissing3dError( "No 3D coordinates available for stem {}".format(stem)) elif stem[0] == "s" and np.all(np.isnan(self.twists[stem])): raise RnaMissing3dError( "No twists available for stem {}".format(stem)) else: log.info("Reraising an ERROR in add_all_virtual_residues") raise
[docs] def get_virtual_residue(self, pos, allow_single_stranded=False): """ Get the virtual residue position in the global coordinate system for the nucleotide at position pos (1-based) :param pos: A 1-based nucleotide number :param allow_single_stranded: If True and pos is not in a stem, return a rough estimate for the residue position instead of raising an error. Currenly, for non-stem elements, these positions are on the axis of the cg-element. """ mult = 5 if isinstance(pos, fgr.RESID): pos = self.seq.to_integer(pos) elem = self.get_node_from_residue_num(pos) if elem[0] == "s" and elem not in self.v3dposs or not self.v3dposs[elem]: ftug.add_virtual_residues(self, elem) if elem[0] == "s": for i in range(self.stem_length(elem)): if self.defines[elem][0] + i == pos: vres = self.v3dposs[elem][i] return vres[0] + vres[2] * mult elif self.defines[elem][3] - i == pos: vres = self.v3dposs[elem][i] return vres[0] + vres[3] * mult else: assert False else: d = self.defines[elem] if d[0] <= pos <= d[1]: i = pos - d[0] else: i = d[1] - d[0] + 1 + pos - d[2] try: elem_coords = self.vposs[elem][i] origin, basis = ftug.element_coord_system(self, elem) new_coords = ftuv.change_basis( elem_coords, ftuv.standard_basis, basis) + origin log.debug("%s %s: coords %s mapped to %s", elem, i, elem_coords, new_coords) return new_coords except KeyError as e: try: self._has_warned_old_vres except AttributeError: self._has_warned_old_vres = set() if elem not in self._has_warned_old_vres: log.warning( "No virtual residues have been loaded for loops: %s (for pos %s) in elem %s." "Using inaccurate position along the cylinder instead.", e, pos, elem) self._has_warned_old_vres.add(elem) if not allow_single_stranded: raise ValueError( "Position {} is not in a stem! It is in {}.".format(pos, elem)) if elem[0] == "h": # We estimate the vres position along the axis of the hairpin. h_length = self.element_length(elem) / 2 pos_in_h = pos - self.defines[elem][0] +1 if pos_in_h > math.ceil(h_length): l = self.defines[elem][1] - pos+1 elif pos_in_h <=int(h_length): l = pos - self.defines[elem][0]+1 else: l = math.ceil(h_length) perc = (l) / math.ceil(h_length) elif elem[0] == "i": if pos <= self.defines[elem][1]: l = pos - self.defines[elem][0] tl = (self.defines[elem][1] - self.defines[elem][0]) else: l = self.defines[elem][3] - pos tl = (self.defines[elem][3] - self.defines[elem][2] ) perc = (l+1) / (tl+2) else: l = pos - self.defines[elem][0] perc = (l+1) / (self.element_length(elem)+1) log.debug("Calculating vres: %s,%s, %s: length=%s, perc=%s",elem, pos, l,(self.coords[elem][1] - self.coords[elem][0]), perc) return self.coords[elem][0] + (self.coords[elem][1] - self.coords[elem][0]) * perc
[docs] def get_ordered_stem_poss(self): points = [] for s in self.sorted_stem_iterator(): points += list(self.coords[s]) points = np.array(points) if np.any(np.isnan(points)): raise RnaMissing3dError("No 3D coordinates present.") return points
[docs] def get_ordered_virtual_residue_poss(self, return_elements=False): """ Get the coordinates of all stem's virtual residues in a consistent order. This is used for RMSD calculation. If no virtual_residue_positions are known, self.add_all_virtual_residues() is called automatically. :param return_elements: In addition to the positions, return a list with the cg-elements these coordinates belong to :returns: A numpy array. """ if not self.v3dposs: self.add_all_virtual_residues() vress = [] elems = [] for i in range(1, len(self.seq)+1): pos = self.get_virtual_residue(i, allow_single_stranded = True) vress.append(pos) elems.append(self.get_elem(i)) if return_elements: return np.array(vress), elems return np.array(vress)
[docs] def get_poss_for_domain(self, elements, mode="vres"): """ Get an array of coordinates only for the elements specified. ..note:: This code is still experimental in the current version of forgi. :param elements: A list of coarse grain element names. """ points = [] if mode == "fast": for e in sorted(elements): points += list(self.coords[e]) return np.array(points) elif mode == "vres": if not self.v3dposs: self.add_all_virtual_residues() vress = [] for s in sorted(elements): if s[0] != "s": continue for i in range(self.stem_length(s)): vres = self.v3dposs[s][i] vress += [vres[0] + vres[2], vres[0] + vres[3]] return np.array(vress) assert False
[docs] def steric_value(self, elements, method="r**-2"): """ Estimate, how difficult a set of elements was to build, by counting the atom density around the center of these elements """ try: if isinstance(elements, list) or isinstance(elements, tuple): center = ftuv.get_vector_centroid(self.coords[elements]) elif elements.shape == (3,): center = elements else: assert False, repr(elements) except: print(elements, repr(elements)) raise if method == "kde": # print(center) all_vas = [] for pos in range(1, self.seq_length + 1): for va in self.virtual_atoms(pos).values(): all_vas.append(va) all_vas = np.array(all_vas).T log.debug("Shape of all atoms {}".format(all_vas.shape)) # randomly take 50 Angstrom bandwidth kde = scipy.stats.gaussian_kde(all_vas, 50) return kde(center) elif method.startswith("r**"): power = -int(method[3:5]) exclude = method[5:] if exclude and exclude != "e": raise ValueError("Not supported method") value = 0 for pos in range(1, self.seq_length + 1): if exclude and self.get_node_from_residue_num(pos) in elements: continue for va in self.virtual_atoms(pos).values(): value += 1 / (1 + ftuv.vec_distance(va, center))**power return value elif method.startswith("cutoff"): cutoff = float(method.split()[1]) value = 0 for pos in range(1, self.seq_length + 1): for va in self.virtual_atoms(pos).values(): if ftuv.vec_distance(va, center) < cutoff: value += 1 return value
def _get_twist_str(self): ''' Place the twist vectors into a string. The format is: twist s1 x1 y1 z1 x2 y2 z2 Where s1 is the name of the stem and (x1,y1,z1) and (x2,y2,z2) are the unit vectors from the start of each end of the stem to the middle of the base pairs. ''' out_str = '' for key in self.twists.keys(): [p, n] = self.twists[key] out_str += ("twist {k} {x[0]:.16f} {x[1]:.16f} {x[2]:.16f} " "{y[0]:.16f} {y[1]:.16f} {y[2]:.16f}".format(k=key, x=p, y=n)) out_str += '\n' return out_str def _get_long_range_str(self): out_str = '' printed = set() for key1 in self.longrange.keys(): for key2 in self.longrange[key1]: k = [key1, key2] k.sort() k = tuple(k) if k in printed: continue printed.add(k) out_str += "longrange %s %s\n" % (key1, key2) return out_str def _get_interacting_str(self): out = [] for seqid in self.interacting_residues: out.append("interacting\t{}".format(fgr.resid_to_str(seqid))) return "\n".join(out) + "\n" def _get_sampled_stems_str(self): out_str = '' for key in self.sampled.keys(): out_str += 'sampled %s %s\n' % (key, " ".join(map(str, self.sampled[key]))) return out_str def _get_vres_str(self): out_str = "" for elem in self.vposs: if elem[0] != "s": out_str += "vres {} ".format(elem) out_str += ftuvres.serialize_vres(self.vposs[elem]) out_str += "\n" return out_str
[docs] def to_cg_string(self): ''' Output this structure in string form. ''' curr_str = self.to_bg_string() curr_str += self._get_coord_str() curr_str += self._get_twist_str() curr_str += self._get_sampled_stems_str() curr_str += self._get_long_range_str() curr_str += self._get_interacting_str() curr_str += self._get_vres_str() if self.project_from is not None: curr_str += "project {} {} {}\n".format(*self.project_from) return curr_str
[docs] def to_file(self, filename): with open(filename, 'w') as f: cg_str = self.to_cg_string() f.write(cg_str)
[docs] def get_bulge_angle_stats_core(self, elem, forward=True): ''' Return the angle stats for a particular bulge. These stats describe the relative orientation of the two stems that it connects. :param elem: The name of the bulge. :param connections: The two stems that are connected by it. :return: ftms.AngleStat object ''' connections = self.connections(elem) if not forward: connections = connections[::-1] log.debug("elem %s connections %s", elem, connections) (stem1, twist1, stem2, twist2, bulge) = ftug.get_stem_twist_and_bulge_vecs( self, elem, connections) log.debug("stem1 %s, twist1 %s, stem2 %s, twist2 %s, bulge %s", stem1, twist1, stem2, twist2, bulge) if round(np.dot(stem1, twist1), 10) != 0 or round(np.dot(stem2, twist2), 10) != 0: err = CgIntegrityError("The twists are inconsistent. " "They should be orthogonal to the corresponding stem vectors." "Inconsistency found for {},{}".format(elem, connections)) with log_to_exception(log, err): log.error("Angle stem1-twist1 %s dot_product=%s, Angle stem2-twist2 %s degrees dot_product=%s", math.degrees(ftuv.vec_angle(stem1, twist1) ), np.dot(stem1, twist1), math.degrees(ftuv.vec_angle(stem2, twist2)), np.dot(stem2, twist2),) raise err try: u, v, t, r1, u1, v1 = ftug.get_angle_stat_geometry( stem1, twist1, stem2, twist2, bulge) except ZeroDivisionError as e: with log_to_exception(log, e): log.error("Error getting stats for elem %s", elem) raise dims = self.get_bulge_dimensions(elem) ang_type = self.connection_type(elem, connections) seq = "&".join(self.get_define_seq_str(elem, adjacent=True)) log.debug("u %s, v %s", u, v) if elem[0] == "m": mls = self.find_mlonly_multiloops() for ml in mls: if elem in ml: descr = self.describe_multiloop( [x for x in ml if x[0] != "s"]) if "pseudoknot" in descr: stat_type = "pseudo" elif "open" in descr: stat_type = "open" else: stat_type = "angle" # ML break else: assert False else: stat_type = "angle" # IL angle_stat = ftms.AngleStat(stat_type, self.name, dims[0], dims[1], u, v, t, r1, u1, v1, ang_type, self.defines[elem], seq, self.vposs[elem]) return angle_stat
[docs] def get_stats(self, d): ''' Calls get_loop_stat/ get_bulge_angle_stats or get_stem_stats, depending on the element d. :returns: A 1- or 2 tuple of stats (2 in case of bulges. One for each direction) ''' if d[0] == "s": return (self.get_stem_stats(d),) elif d[0] in "mi": return self.get_bulge_angle_stats(d) else: try: stat = self.get_loop_stat(d) except ValueError: if len(self.defines) == 1: # A structure without any stem has no stats. return tuple() else: raise if d[0] == "f": stat.stat_type = "5prime" elif d[0] == "t": stat.stat_type = "3prime" return (stat,)
[docs] def get_loop_stat(self, d): ''' Return the statistics for this loop. These stats describe the relative orientation of the loop to the stem to which it is attached. :param d: The name of the loop ''' loop_stat = ftms.LoopStat() loop_stat.pdb_name = self.name loop_stat.bp_length = self.get_length(d) loop_stat.phys_length = ftuv.magnitude( self.coords[d][1] - self.coords[d][0]) stem1, = self.edges[d] # Make sure there is only one edge (s1b, s1e) = self.get_sides(stem1, d) stem1_vec = self.coords[stem1][s1b] - self.coords[stem1][s1e] twist1_vec = self.twists[stem1][s1b] bulge_vec = self.coords[d][1] - self.coords[d][0] # To avoid loops with 0 physical length. (If disconnects in the structure are modelled as loop) if ftuv.magnitude(bulge_vec) < 10**-3: bulge_vec += (10**-3) * (stem1_vec / ftuv.magnitude(stem1_vec)) (r, u, v) = ftug.get_stem_separation_parameters(stem1_vec, twist1_vec, bulge_vec) (loop_stat.r, loop_stat.u, loop_stat.v) = (r, u, v) # Will this cause problems in other parts of the code base??? loop_stat.r = loop_stat.phys_length loop_stat.define = self.defines[d] loop_stat.seq, = self.get_define_seq_str(d, adjacent=True) loop_stat.vres = self.vposs[d] if d[0] == "f": loop_stat.stat_type = "5prime" elif d[0] == "t": loop_stat.stat_type = "3prime" return loop_stat
[docs] def get_bulge_angle_stats(self, bulge): ''' Return the angle stats for a particular bulge. These stats describe the relative orientation of the two stems that it connects. :param bulge: The name of the bulge. :param connections: The two stems that are connected by it. :return: The angle statistics in one direction and angle statistics in the other direction ''' if bulge == 'start': return (ftms.AngleStat(), ftms.AngleStat()) angle_stat1 = self.get_bulge_angle_stats_core(bulge, True) angle_stat2 = self.get_bulge_angle_stats_core(bulge, False) # If we go into the reverse direction, the first vector of the element basis is inverted. # and thus also the last one (cross product of first and second) # for k,v in angle_stat2.vres.items(): # angle_stat2.vres[k]= v*[-1,1,-1] assert round(angle_stat1.get_angle(), 5) == round(angle_stat2.get_angle( ), 5), ("{}!={}".format(angle_stat1.get_angle(), angle_stat2.get_angle())) return (angle_stat1, angle_stat2)
[docs] def get_stacking_helices(self, method="Tyagi"): """ EXPERIMENTAL Return all helices (longer stacking regions) as sets. Two stems and one bulge are in a stacking relation, if self.is_stacking(bulge) is true and the stems are connected to the bulge. Further more, a stem is in a stacking relation with itself. A helix is the transitive closure this stacking relation. :returns: A list of sets of element names. """ helices = [] for d in self.defines: if d[0] in "mi" and self.is_stacking(d, method): s1, s2 = self.connections(d) helices.append(set([d, s1, s2])) if d[0] == "s": helices.append(set([d])) while True: for i, j in it.combinations(range(len(helices)), 2): stack_bag1 = helices[i] stack_bag2 = helices[j] if stack_bag1 & stack_bag2: stack_bag1 |= stack_bag2 del helices[j] break else: break return helices
[docs] def is_stacking(self, bulge, method="Tyagi", verbose=False): """ EXPERIMENTAL Reports, whether the stems connected by the given bulge are coaxially stacking. :param bulge: STRING. Name of a interior loop or multiloop (e.g. "m3") :param method": STRING. "Tyagi": Use cutoffs from doi:10.1261/rna.305307, PMCID: PMC1894924. :returns: A BOOLEAN. """ assert method in ["Tyagi", "CG"] if method == "Tyagi": return self._is_stacking_tyagi(bulge, verbose) return self._is_stacking_CG(bulge, verbose)
def _is_stacking_CG(self, bulge, verbose=False): """""" stem1, stem2 = self.connections(bulge) angle = ftuv.vec_angle(self.coords[stem1][1] - self.coords[stem1][0], self.coords[stem2][1] - self.coords[stem2][0]) if angle > math.pi / 2: angle = math.pi - angle if angle > math.radians(45): if verbose: print("Angle {}>45".format(math.degrees(angle))) return False shear_angle1 = ftuv.vec_angle(self.coords[stem1][1] - self.coords[stem1][0], self.coords[bulge][1] - self.coords[bulge][0]) if shear_angle1 > math.pi / 2: shear_angle1 = math.pi - shear_angle1 shear_angle2 = ftuv.vec_angle(self.coords[stem2][1] - self.coords[stem2][0], self.coords[bulge][1] - self.coords[bulge][0]) if shear_angle2 > math.pi / 2: shear_angle2 = math.pi - shear_angle2 if shear_angle1 > math.radians(60) or shear_angle2 > math.radians(60): if verbose: print ("Shear angle 1 {}>60 or shear angle 2 {}>60".format( math.degrees(shear_angle1), math.degrees(shear_angle1))) return False return True def _is_stacking_tyagi(self, bulge, verbose=False): """ Implementation of the method described in doi:10.1261/rna.305307 (Tyagi and Matthews) for the detection of coaxial stacking. Called by self.is_stacking(bulge, "Tyagi") ..note:: This does NOT implement the method for coaxial stacking prediction which is the main focus of the paper, only the method for the detection of stacking in pdb files. """ assert bulge[0] in "mi" DISTANCE_CUTOFF = [14, 6] ANGLE_CUTOFF = [math.acos(0.75), math.acos(0.8)] # Relaxed compared to 60 in the paper, because we use SHEAR_ANGLE_CUTOFF = math.radians(60) # virtual atom positions SHEAR_OFFSET_CUTOFF = 10 if bulge[0] == "m" and self.get_length(bulge) == 0: is_flush = True # flush-stack vs. mismatch-mediated stack else: is_flush = False stem1, stem2 = self.connections(bulge) side_nts = self.get_connected_residues(stem1, stem2, bulge)[0] # Distance try: bp_center1 = ftug.get_basepair_center(self, side_nts[0]) bp_center2 = ftug.get_basepair_center(self, side_nts[1]) except KeyError: return False if ftuv.vec_distance(bp_center1, bp_center2) > DISTANCE_CUTOFF[is_flush]: if verbose: print ("Distance {} > {}".format(ftuv.vec_distance( bp_center1, bp_center2), DISTANCE_CUTOFF[is_flush])) return False normalvec1 = ftug.get_basepair_plane(self, side_nts[0]) normalvec2 = ftug.get_basepair_plane(self, side_nts[1]) # Coaxial angle = ftuv.vec_angle(normalvec1, normalvec2) if angle > math.pi / 2: # Triggered frequently #warnings.warn("Angle > 90 degrees: {} ({})".format(angle, math.degrees(angle))) angle = math.pi - angle if angle > ANGLE_CUTOFF[is_flush]: if verbose: print ("Angle {} > {}".format(angle, ANGLE_CUTOFF[is_flush])) return False # Shear Angle shear_angle1 = ftuv.vec_angle(normalvec1, bp_center2 - bp_center1) if shear_angle1 > math.pi / 2: shear_angle1 = math.pi - shear_angle1 if shear_angle1 > SHEAR_ANGLE_CUTOFF: if verbose: print ("Shear angle 1 {} > {}".format( shear_angle1, SHEAR_ANGLE_CUTOFF)) return False shear_angle2 = ftuv.vec_angle(normalvec2, bp_center1 - bp_center2) if shear_angle2 > math.pi / 2: shear_angle2 = math.pi - shear_angle2 if shear_angle2 > SHEAR_ANGLE_CUTOFF: if verbose: print ("Shear angle 2 {} > {}".format( shear_angle2, SHEAR_ANGLE_CUTOFF)) return False # Shear Offset # Formula for distance between a point and a line # from http://onlinemschool.com/math/library/analytic_geometry/p_line/ if (ftuv.magnitude(np.cross((bp_center1 - bp_center2), normalvec2)) / ftuv.magnitude(normalvec2)) > SHEAR_OFFSET_CUTOFF: if verbose: print ("Shear offset 1 wrong:", (ftuv.magnitude(np.cross((bp_center1 - bp_center2), normalvec2)) / ftuv.magnitude(normalvec2)), ">", SHEAR_OFFSET_CUTOFF) return False if (ftuv.magnitude(np.cross((bp_center1 - bp_center2), normalvec1)) / ftuv.magnitude(normalvec1)) > SHEAR_OFFSET_CUTOFF: if verbose: print ("Shear offset 2 wrong") return False return True
[docs] def get_stem_stats(self, stem): ''' Calculate the statistics for a stem and return them. These statistics will describe the length of the stem as well as how much it twists. :param stem: The name of the stem. :return: A StemStat structure containing the above information. ''' ss = ftms.StemStat() ss.pdb_name = self.name #ss.bp_length = abs(self.defines[stem][0] - self.defines[stem][1]) ss.bp_length = self.stem_length(stem) ss.phys_length = ftuv.magnitude( self.coords[stem][0] - self.coords[stem][1]) ss.twist_angle = ftug.get_twist_angle( self.coords[stem], self.twists[stem]) ss.define = self.defines[stem] # Stems may or may not have adjacent nucleotides, so we do not include the adj nucleotides ss.seqs = self.get_define_seq_str(stem, adjacent=False) return ss
# def get_loop_from_residue(self, residue) -> use BulgeGraph.get_node_from_residue_num()! def _init_coords(self): self.coords = LineSegmentStorage( self.defines.keys(), on_change=self.reset_vatom_cache) self.twists = CoordinateStorage( [x for x in self.defines if x[0] == "s"], on_change=self.reset_vatom_cache) @property def incomplete_elements(self): if self._incomplete_elements is None: self._incomplete_elements = ftug.get_incomplete_elements(self) return set(self._incomplete_elements) @property def interacting_elements(self): if self._interacting_elements is None: interacting_nts = [] for r in self.interacting_residues: try: interacting_nts.append(self.seq_id_to_pos(r)) except ValueError as e: # interacting missing residue assert "not in list" in str(e) pass self._interacting_elements = set( self.nucleotides_to_elements(interacting_nts)) return self._interacting_elements
[docs] def radius_of_gyration(self, method="vres"): ''' Calculate the radius of gyration of this structure. :param method: A STRING. one of "fast" (use only coordinates of coarse grained stems) or "vres" (use virtual residue coordinates of stems) :return: A number with the radius of gyration of this structure. ''' if len(list(self.stem_iterator())) == 0: log.warning( "Cannnot calculate ROG (%s) for structure %s without stems. Returning 'nan'", method, self.name) return float("nan") if method == "fast": coords = self.get_ordered_stem_poss() elif method == "vres": coords = self.get_ordered_virtual_residue_poss() else: raise ValueError( "Wrong method {}. Choose one of 'fast' and 'vres'".format(method)) rog = ftmd.radius_of_gyration(coords) return rog
def _assign_loop_roles(self, loop): """ EXPERIMENTAL For 3-way junctions: Assign the roles P1, P2 and P3 and J12, J31, J23 according to Lescoute and Westhof, RNA 2006 (doi: 10.1261/rna.2208106) We try to detect which helices stack and assign the roles in a way that J12 is always the stacking segment. :returns: None if no clear classification was possible, a dictionary otherwise. """ if len(loop) != 3: raise ValueError( "junction_family is currentlty only implemented for 3-way junctions.") # Find out which helices stack. # Version 1: colinearity of helix axes: stems = self.edges[loop[0]] | self.edges[loop[1]] | self.edges[loop[2]] if len(stems) != 3: e = ValueError( "The multiloop segments do not form a 3-way junction.") with log_to_exception(log, e): log.error("stems are %s", stems) raise e collinearities = {} for stem1, stem2 in it.combinations(stems, 2): c = ftuv.line_segments_collinearity( self.coords[stem1], self.coords[stem2]) collinearities[(stem1, stem2)] = c # The two helices that are most likely to stack. stacking_stems = max(collinearities, key=lambda k: collinearities[k]) if collinearities[stacking_stems] < 0.8: return None # The second best pair second_best_pair, second_best_score = list( sorted(collinearities.items(), key=lambda x: x[1]))[1] if collinearities[stacking_stems] - second_best_score < 0.5: # Two are possible. We look now which one has fewer offset to the 3rd stem. # The reference stem could stack with both. reference = set(stacking_stems) & set(second_best_pair) other = set(stems) - reference reference, = reference stem1, stem2 = other o1 = self.stem_offset(reference, stem1) o2 = self.stem_offset(reference, stem2) if abs(o1 - o2) < 0.2: # Angstrom log.warning("Cannot assign loop roles. Stacking is ambiguous.") return None elif o1 < o2: stacking_stems = (reference, stem1) else: stacking_stems = (reference, stem2) # We have clearly identified the stacking helices. # j12 is the ML-segment that connects the stacking helices. j12, = self.edges[stacking_stems[0]] & self.edges[stacking_stems[1]] # stem P1 is at the 5' side of the connecting multiloop... p1 = self.get_node_from_residue_num(self.define_a(j12)[0]) # ...and P2 at the 3' side p2 = self.get_node_from_residue_num(self.define_a(j12)[1]) assert set([p1, p2]) == set(stacking_stems) assert p1 != p2 # p3 is the third stem, p3, = stems - set([p1, p2]) # j31 and j23 are the remaining ml segments j31, = self.edges[p3] & self.edges[p1] j23, = self.edges[p2] & self.edges[p3] return {"P1": p1, "P2": p2, "P3": p3, "J12": j12, "J31": j31, "J23": j23} def _junction_family_westhof1(self, loop): """ EXPERIMENTAL For 3-way junctions: Return the junction family according to Lescoute and Westhof, RNA 2006 (doi: 10.1261/rna.2208106). In this method, the junction family is defined via the relative lengths of fragments, not the orientation of the elements. :param loop: Either a dictionary, as returned by self._assign_loop_roles or a list of 3 loop segments. """ if isinstance(loop, dict): j_roles = loop else: j_roles = self._assign_loop_roles(loop) # Now compare the lengths of J31 and J23 to determine the junction family if self.get_length(j_roles["J31"]) > self.get_length(j_roles["J23"]): return "A" elif self.get_length(j_roles["J31"]) == self.get_length(j_roles["J23"]): return "B" else: return "C" def _junction_family_3d(self, loop): """ EXPERIMENTAL For 3-way junctions: Return the junction family depending on the 3D orientation. :param loop: Either a dictionary, as returned by self._assign_loop_roles or a list of 3 loop segments. """ if isinstance(loop, dict): j_roles = loop else: j_roles = self._assign_loop_roles(loop) a31 = self.stem_angle(j_roles["P3"], j_roles["P1"]) a23 = self.stem_angle(j_roles["P2"], j_roles["P3"]) # If the stack is perfectly straight, a31+a23=180 degrees. if a31 > a23: return 2 else: return 1 def _junction_family_is_perpenticular(self, loop): """ EXPERIMENTAL For 3-way junctions Return whether or not the not-stacking loop is very roughly perpenticular to the stack. :param loop: Either a dictionary, as returned by self._assign_loop_roles or a list of 3 loop segments. """ if isinstance(loop, dict): j_roles = loop else: j_roles = self._assign_loop_roles(loop) a31 = self.stem_angle(j_roles["P3"], j_roles["P1"]) a23 = self.stem_angle(j_roles["P2"], j_roles["P3"]) # If the stack is perfectly straight, a31+a23=180 degrees. if math.radians(60) < a31 < math.radians(120) or math.radians(60) < a23 < math.radians(120): return 1 else: return 0
[docs] def stem_offset(self, ref_stem, stem2): """ How much is the offset between the start of stem 2 and the axis of stem1. Assumes that stem1 and stem 2 are connected by a single bulge. Then the start of stem2 is defined to be the stem side closer to the bulge. """ common_edges = self.edges[ref_stem] & self.edges[stem2] if len(common_edges) != 1: raise ValueError("Stem1 and stem2 must be connected by a single bulge " "to calculate stem_offset.") bulge, = common_edges side = self.get_sides(stem2, bulge)[0] return ftuv.point_line_distance(self.coords[stem2][side], self.coords[ref_stem][0], self.coords.get_direction(ref_stem))
[docs] def stem_angle(self, stem1, stem2): """ Returns the angle between two stems. If they are connected via a single element, use the direction pointing away from this element for both stems. Otherwise, use the direction from start to end. """ vec1 = self.coords.get_direction(stem1) vec2 = self.coords.get_direction(stem2) common_edges = self.edges[stem1] & self.edges[stem2] if len(common_edges) == 1: bulge, = common_edges if self.get_sides(stem1, bulge) == (1, 0): vec1 = -vec1 else: assert self.get_sides(stem1, bulge) == (0, 1) if self.get_sides(stem2, bulge) == (1, 0): vec2 = -vec2 else: assert self.get_sides(stem2, bulge) == (0, 1) return ftuv.vec_angle(vec1, vec2)
[docs] def get_coordinates_array(self): ''' Get all of the coordinates in one large array. The coordinates are sorted in the order of the keys in coordinates dictionary. :return: A 2D numpy array containing all coordinates ''' all_coords = [] assert len(self.coords) == len( self.defines), self.coords.keys() ^ self.defines.keys() for key in sorted(self.coords.keys()): for i in range(len(self.coords[key])): all_coords.append(self.coords[key][i]) return np.array(all_coords)
[docs] def load_coordinates_array(self, coords): ''' Read in an array of coordinates (as may be produced by get_coordinates_array) and replace the coordinates of this structure with it. :param coords: A 2D array of coordinates :return: self ''' for j, key in enumerate(sorted(self.coords.keys())): self.coords[key] = coords[2 * j], coords[2 * j + 1] return self
[docs] def get_twists(self, node): ''' Get the array of twists for this node. If the node is a stem, then the twists will simply those stored in the array. If the node is an interior loop or a junction segment, then the twists will be the ones that are adjacent to it, projected to the plane normal to the element vector. If the node is a hairpin loop or a free end, then the same twist will be duplicated and returned twice. :param node: The name of the node ''' if node[0] == 's': return self.twists[node] connections = list(self.edges[node]) if not connections: assert node[0] == "f" log.warning("Twists for structures without any stems are 'nan'") return np.zeros(3) * float("nan"), np.zeros(3) * float("nan") (s1b, s1e) = self.get_sides(connections[0], node) if len(connections) == 1: vec = ftuv.normalize(ftuv.vector_rejection( self.twists[connections[0]][s1b], self.coords[node][1] - self.coords[node][0])) return (vec, vec) if len(connections) == 2: # interior loop or junction segment (s2b, s2e) = self.get_sides(connections[1], node) bulge_vec = (self.coords[connections[0]][s1b] - self.coords[connections[1]][s2b]) return (ftuv.normalize(ftuv.vector_rejection( self.twists[connections[0]][s1b], bulge_vec)), ftuv.normalize(ftuv.vector_rejection(self.twists[connections[1]][s2b], bulge_vec))) # uh oh, this shouldn't happen since every node # should have either one or two edges assert False
[docs] def element_physical_distance(self, element1, element2): ''' Calculate the physical distance between two coarse grain elements. :param element1: The name of the first element (e.g. 's1') :param element2: The name of the first element (e.g. 's2') :return: The closest distance between the two elements. ''' (i1, i2) = ftuv.line_segment_distance(self.coords[element1][0], self.coords[element1][1], self.coords[element2][0], self.coords[element2][1]) return ftuv.vec_distance(i1, i2)
[docs] def longrange_iterator(self, filter_connected=False): ''' Iterate over all long range interactions in this molecule. :param filter_connected: Filter interactions that are between elements which are connected (mostly meaning multiloops which connect to the same end of the same stem) :return: A generator yielding long-range interaction tuples (i.e. ('s7', 'i2')) ''' seen = set() for partner1 in self.longrange.keys(): for partner2 in self.longrange[partner1]: if filter_connected: if self.connected(partner1, partner2): continue interaction = tuple(sorted([partner1, partner2])) # check if we've already seen this interaction if interaction in seen: continue seen.add(interaction) yield interaction
[docs] def total_length(self): ''' Calculate the combined length of all the elements. ''' total_length = sum( [len(list(self.define_residue_num_iterator(d))) for d in self.defines]) assert total_length == self.seq_length return self.seq_length
[docs] def sorted_edges_for_mst(self): """ Keep track of all linked nodes. Used for the generation of the minimal spanning tree. This overrides the function in bulge graph and adds an additional sorting criterion with lowest priority. Elements that have no entry in self.sampled should be preferedly broken. This should ensure that the minimal spanning tree is the same after saving and loading an RNA to/from a file, if changes of the minimal spanning tree were performed by ernwin. """ priority = {'s': 1, 'i': 2, 'm': 3, 'f': 4, 't': 5} edges = sorted(it.chain(self.mloop_iterator(), self.iloop_iterator()), key=lambda x: (priority[x[0]], min(self.get_node_dimensions(x)), not x in self.sampled, x)) return edges
[docs] def coords_to_directions(self): """ The directions of each coarse grain element. One line per cg-element. The array is sorted by the corresponding element names alphabetically (`sorted(defines.keys()`) The directions usually point away from the elemnt's lowest nucleotide. However h,t and f elements always point away from the connected stem. """ coords = self.get_coordinates_array() directions = coords[1::2] - coords[0::2] return directions
[docs] def coords_from_directions(self, directions): """ Generate coordinates from direction vectors (using also their lengths) Currently ignores the twists! :param directions: An array of vectors from the side of a cg-element with lower nucleotide number to the side with higher number The array is sorted by the corresponding element names alphabetically (`sorted(defines.keys()`) """ sorted_defines = sorted(self.defines.keys()) assert len(sorted_defines) == len(directions), "{} != {}".format( len(sorted_defines), len(directions)) if self.build_order is None: self.traverse_graph() self.coords["s0"] = np.array( [0, 0, 0]), directions[sorted_defines.index("s0")] for stem1, link, stem2 in self.build_order: # Bulges and stems conn = self.connection_ends( self.connection_type(link, [stem1, stem2])) link_dir = self.get_link_direction(stem1, stem2, link) if link_dir == 1: self.coords[link] = self.coords[stem1][conn[0] ], self.coords[stem1][conn[0]] + directions[sorted_defines.index(link)] if conn[1] == 0: self.coords[stem2] = self.coords[link][1], self.coords[link][1] + \ directions[sorted_defines.index(stem2)] else: self.coords[stem2] = self.coords[link][1] - \ directions[sorted_defines.index( stem2)], self.coords[link][1] else: self.coords[link] = self.coords[stem1][conn[0]] - \ directions[sorted_defines.index( link)], self.coords[stem1][conn[0]] if conn[1] == 0: self.coords[stem2] = self.coords[link][0], self.coords[link][0] + \ directions[sorted_defines.index(stem2)] else: self.coords[stem2] = self.coords[link][0] - \ directions[sorted_defines.index( stem2)], self.coords[link][0] for d in self.defines: if d[0] == "m" and d not in self.mst: edges = list(self.edges[d]) (s1b, _) = self.get_sides(edges[0], d) (s2b, _) = self.get_sides(edges[1], d) mids1 = self.coords[edges[0]] mids2 = self.coords[edges[1]] # Save coordinates in direction of the strand. if self.get_link_direction(edges[0], edges[1], d) == 1: self.coords[d] = (mids1[s1b], mids2[s2b]) else: self.coords[d] = (mids2[s2b], mids1[s1b]) if d[0] in "hft": # Loops stem, = self.edges[d] (s1b, _) = self.get_sides(stem, d) self.coords[d] = self.coords[stem][s1b], self.coords[stem][s1b] + \ directions[sorted_defines.index(d)]
[docs] def virtual_atoms(self, key): """ Get virtual atoms for a key. :param key: An INTEGER: The number of the base in the RNA. :returns: A dict {atom:coords}, e.g. {"C8":np.array([x,y,z]), ...} """ if isinstance(key, int): if key not in self._virtual_atom_cache: try: self._virtual_atom_cache[key] = ftug.virtual_atoms(self)[ key] except KeyError as e: log.info( "Key {} not present. Need to recreate all virtual residues".format(e)) self.add_all_virtual_residues() self._virtual_atom_cache[key] = ftug.virtual_atoms(self)[ key] return self._virtual_atom_cache[key] else: raise ValueError("Expected an int, found {}".format(key))
[docs] def reset_vatom_cache(self, key): """ Delete all cached information about virtual residues and virtual atoms. Used as on_call function for the observing of the self.coords dictionary. :param key: A coarse grain element name, e.g. "s1" or "m15" """ try: if not self._virtual_atom_cache: return except AttributeError: # Happens during deepcopy return define = self.defines[key] # Do not delete self.vposs, it's in the element's coordinate system # Delete virtual residues try: del self.vbases[key] except KeyError: pass try: del self.vvecs[key] except KeyError: pass try: del self.v3dposs[key] except KeyError: pass try: del self.vinvs[key] except KeyError: pass # Delete virtual atoms if len(define) > 1: for i in range(define[0], define[1] + 1): if i in self._virtual_atom_cache: del self._virtual_atom_cache[i] if len(define) > 3: for i in range(define[2], define[3] + 1): if i in self._virtual_atom_cache: del self._virtual_atom_cache[i]
# def __deepcopy__(self, memo):
[docs] def rotate(self, angle, axis="x", unit="radians"): if unit == "degrees": angle = math.radians(angle) elif unit != "radians": raise ValueError( "Unit {} not understood. Use 'degrees' or 'radians'".format(unit)) s = math.sin(angle) cosi = math.cos(angle) rotation_matrix = ftuv.rotation_matrix(axis, angle) self.coords.rotate(rotation_matrix) self.twists.rotate(rotation_matrix) self.after_coordinates_changed() for chain in self.chains.values(): chain.transform(rotation_matrix.T, [0, 0, 0])
[docs] def rotate_translate(self, offset, rotation_matrix): """ First translate the RNA by offset, then rotate by rotation matrix """ self.coords._coordinates -= offset self.coords.rotate(rotation_matrix) self.twists.rotate(rotation_matrix) self.after_coordinates_changed() for chain in self.chains.values(): chain.transform([[1, 0, 0], [0, 1, 0], [0, 0, 1]], -offset) chain.transform(rotation_matrix.T, [0, 0, 0])
[docs] def after_coordinates_changed(self): # vposs is in the element coordinate system ==> does not have to be changed. #self.vposs = c.defaultdict( dict ) # Caching for virtual residues self.vbases = c.defaultdict(dict) self.vvecs = c.defaultdict(dict) self.v3dposs= c.defaultdict(dict) self.vinvs = c.defaultdict(dict)

Contents