forgi 2.0.0 documentation

Contents

Source code for forgi.graph.bulge_graph

#!/usr/bin/env python

"""bulge_graph.py: A graph representation of RNA secondary structure based
   on its decomposition into primitive structure types: stems, hairpins,
   interior loops, multiloops, etc..."""

from __future__ import absolute_import, unicode_literals
from __future__ import print_function
from __future__ import division
from builtins import (ascii, bytes, chr, dict, filter, hex, input,   # pylint: disable=W0611
                      map, next, oct, pow, range, round,   # pylint: disable=W0611
                      str, super, zip, object)   # pylint: disable=W0611

import sys
import collections as col
import random
import re
import itertools as it
import warnings
import math
import logging
from pprint import pformat

import numpy as np

from logging_exceptions import log_to_exception, log_at_caller

from ..utilities import stuff as fus
from ..utilities.exceptions import GraphConstructionError, GraphIntegrityError
from .sequence import Sequence, _insert_breakpoints_simple, SequenceLoader, _seq_ids_from_seq_str, VALID_CHAINIDS
from . import transform_graphs as fgt
from .residue import RESID
from ._basegraph import BaseGraph
from ._graph_construction import _BulgeGraphConstruction
from . import _cofold as fgc

log = logging.getLogger(__name__)


try:
    profile  # The @profile decorator from line_profiler (kernprof)
except: # pylint: disable=W0702
[docs] def profile(x): return x
[docs]class BulgeGraph(BaseGraph): def __init__(self, graph_construction, seq_obj, name=None, infos=None, _dont_split=False): # pylint: disable=W0231 """ A bulge graph object. Users of the forgi library should use the provided factory functions instead of invoking BulgeGraph() directly! :param graph_construction: A forgi.graph._graph_construction.Graph-construction instance that contains defines and edges. :param seq_obj: A forgi.graph.sequence.Sequence instance :param name: A string :param infos: A dict of lists """ # :param _dont_split: Internal, used if the graph-construction is already splitted # e.g. for graph transformations. # TODO: It would be better to move split_at_cofold_cutpoints # Somewhere out of BulgeGraph. self.ang_types = None self.mst = None self.build_order = None if name is None: self.name = "untitled" log.info("No name given. name was set to %s", self.name) else: log.debug("Setting name to %s", name) self.name = name #: The coarse grain element definitions: Keys are for example 's1'/ 'm2'/ 'h3'/ 'f1'/ 't1' #: Values are the positions in the sequence (1D-coordinate) of start , end, ... self.defines = {} self.defines.update(graph_construction.defines) self.edges = col.defaultdict(set) self.edges.update(graph_construction.edges) self.longrange = col.defaultdict(set) # Some cached values: self.nx_graph = None self.nuc_bp_dists = None self._elem_bp_dists = {} self._node_to_resnum = {} # Additional infos as key-value pairs are stored here. self.infos = col.defaultdict(list) if infos is not None: self.infos.update(infos) self._seq = seq_obj if not _dont_split: fgc.split_at_cofold_cutpoints(self, self.seq.backbone_breaks_after) ############################################################################ # Factory functions. #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] @classmethod def from_dotbracket(cls, dotbracket_str, seq=None, name=None, dissolve_length_one_stems=False, remove_pseudoknots=False): """ Create a BulgeGraph object from a dotbracket string. :param dotbracket_str: A string :param seq: A string, with the same length as the dotbracket string, a forgi.graph.sequence.Sequence instance or None. If it is None, the sequence will be all 'N's :param name: Optional string to use as molecule name. """ log.debug("From dotbracket %s", dotbracket_str) pt = fus.dotbracket_to_pairtable(dotbracket_str) tuples = fus.pairtable_to_tuples(pt) if not isinstance(seq, Sequence): if seq is None: seq = _seq_of_Ns_from_db(dotbracket_str) seq_ids = _seq_ids_from_seq_str(seq) seq = Sequence(seq, seq_ids) graph_constr = _BulgeGraphConstruction(tuples) log.debug("Defines are %s", graph_constr.defines) bg = cls(graph_constr, seq, name) return _cleaned_bg(bg, dissolve_length_one_stems, remove_pseudoknots)
[docs] @classmethod def from_ct_string(cls, ct_string, dissolve_length_one_stems=False, remove_pseudoknots=False): """ Create the graph from a string holding a connectivity table. See http://x3dna.org/highlights/dssr-derived-secondary-structure-in-ct-format """ lines = ct_string.splitlines() lines = [line for line in lines if line] header = lines[0].strip() num_nts = int(header.split()[0]) name = header[num_nts:].strip() haswarned_circ = False seq = "" tuples = [] breakpoints = [] seq_ids = [] for i, line in enumerate(lines): line = line.strip() if not line: continue if i == 0: continue fields = line.split() seq += fields[1] pos = int(fields[0]) if pos != i: if pos > i and len(seq) == num_nts: log.warning( "Ignoring alternative structure in ct-file. Using only the first structure") break else: raise ValueError( "Missing residue number {}, foun {} instead".format(i, pos)) seqid = int(fields[5]) if seq_ids and pos - 1 not in breakpoints and seq_ids[-1][1][1] == seqid: if seq_ids[-1][1][2] == " ": insertion = "A" else: insertion = chr(ord(seq_ids[-1][1][2]) + 1) else: insertion = " " seq_ids.append( RESID(VALID_CHAINIDS[len(breakpoints)], (" ", int(fields[5]), insertion))) tuples.append((int(fields[0]), int(fields[4]))) next_nt = int(fields[3]) if next_nt < pos: if next_nt != 0: if not haswarned_circ: log.warning( "Circular RNA not supported. Treating it as non-circular.") haswarned_circ = True breakpoints.append(pos) prev_nt = int(fields[2]) if prev_nt != pos - 1: if pos - 1 not in breakpoints: raise ValueError("3rd and 4th column not consistent in ct-file " "at pos {}. Expecting residue at {} to have " "next-residue set to 0 or the beginning " "of the chain.".format(pos, pos - 1)) if prev_nt > pos: if not haswarned_circ: log.warning( "Circular RNA not supported. Treating it as non-circular.") haswarned_circ = True if len(seq) != num_nts: raise ValueError( "Insufficient nts ({}) present in ct file, expecting {}.".format(len(seq), num_nts)) seq = _insert_breakpoints_simple(seq, breakpoints, 1) print(seq_ids) seq = Sequence(seq, seq_ids) graph_constr = _BulgeGraphConstruction(tuples) bg = cls(graph_constr, seq, name) bg = _cleaned_bg(bg, dissolve_length_one_stems, remove_pseudoknots) return bg
[docs] @classmethod def from_bpseq_str(cls, bpseq_str, breakpoints=(), name=None, dissolve_length_one_stems=False, remove_pseudoknots=False): """ Create the graph from a string listing the base pairs. The string should be formatted like so: 1 G 115 2 A 0 3 A 0 4 U 0 5 U 112 6 G 111 :param bpseq_str: The string, containing newline characters. :param breakpoints: A list of positions, after which there is a backbone break. :return: A new BulgeGraph object. """ tuples, seq_str = fus.bpseq_to_tuples_and_seq(bpseq_str) seq_str = _insert_breakpoints_simple(seq_str, breakpoints, 1) seq_ids = _seq_ids_from_seq_str(seq_str) seq = Sequence(seq_str, seq_ids) graph_constr = _BulgeGraphConstruction(tuples) bg = cls(graph_constr, seq, name) bg = _cleaned_bg(bg, dissolve_length_one_stems, remove_pseudoknots) if log.isEnabledFor(logging.INFO): log.info("From bpseq_str: Secondary structure: %s", bg.to_dotbracket_string()) return bg
[docs] @classmethod def from_bg_file(cls, bg_file): """ Load a BulgeGraph from a file containing a text-based representation. :param bg_file: The filename. :return: A bulge Graph. """ with open(bg_file, 'r') as f: bg_string = f.read() return cls.from_bg_string(bg_string)
[docs] @classmethod def from_bg_string(cls, bg_str): """ Create a BulgeGraph from the string created by the method to_bg_string. :param bg_str: The string representation of a BugleGraph. :returns: A BulgeGraphObject """ # Initialize an empty BulgeGraph. (this is cumbersome by design) class DummyGraphConstr: defines = {} edges = {} log.debug("Creating empty BG") bg = cls(DummyGraphConstr(), Sequence("", [])) log.debug("Now loading BG") lines = bg_str.split('\n') length = None seq_loader = SequenceLoader() for line in lines: line = line.strip() parts = line.split() if len(parts) == 0: # blank line continue if seq_loader.consume_fields(parts): continue if parts[0] == 'length': length = int(parts[1]) elif parts[0] == 'define': bg.defines[parts[1]] = list(map(int, parts[2:])) elif parts[0] == 'connect': for p in parts[2:]: bg.edges[parts[1]].add(p) bg.edges[p].add(parts[1]) elif parts[0] == 'name': bg.name = parts[1].strip() elif parts[0] == 'info': bg.infos[parts[1]].append(" ".join(parts[2:])) bg._seq = seq_loader.sequence return bg
[docs] @classmethod def from_fasta_text(cls, fasta_text, dissolve_length_one_stems=False, remove_pseudoknots=False): """ Create one or more Bulge Graphs from some fasta text. :returns: A list of BulgeGraphs """ # compile searches for the fasta id, sequence and # secondary structure respectively id_search = re.compile(r'>(.+)') seq_search = re.compile(r'^([acgutACGUT&]+)$') stru_search = re.compile(r'^([(){}<>.A-Za-z&\[\]]+)$') prev_id = None prev_seq = None prev_struct = None curr_id = None bgs = [] for i, line in enumerate(fasta_text.split('\n')): # newlines suck line = line.strip() # We allow comments if line.startswith("#"): continue # find out what this line contains id_match = id_search.match(line) seq_match = seq_search.match(line) stru_match = stru_search.match(line) if id_match is not None: prev_id = curr_id # we found an id, check if there's a previous # sequence and structure, and create a BG curr_id = id_match.group(1) if prev_seq is None and prev_struct is None: # must be the first sequence/structure continue if prev_struct is None: raise GraphConstructionError( "No structure for id: {}".format(prev_id)) bg = cls.from_dotbracket(prev_struct, prev_seq, name=prev_id, dissolve_length_one_stems=dissolve_length_one_stems, remove_pseudoknots=remove_pseudoknots) bgs.append(bg) prev_seq = None prev_struct = None prev_id = None if seq_match is not None: curr_seq = seq_match.group(1) if "t" in curr_seq or "T" in curr_seq: warnings.warn( "Original sequence contained T. All occurrences of T/t were replaced by U/u respectively!") curr_seq = curr_seq.replace("T", "U") curr_seq = curr_seq.replace("t", "u") if prev_seq: prev_seq += curr_seq else: prev_seq = curr_seq if id_match is None and seq_match is None: if stru_match: if prev_struct: prev_struct += line else: prev_struct = line elif line: raise GraphConstructionError( "Cannot parse line {}: '{}' is neither sequence, nor structure, nor name (starting with '>'), nor comment (starting with '#').".format(i, line)) if prev_struct is None: raise GraphConstructionError( "Error during parsing of fasta file. No structure found for id {} and sequence {}".format(prev_id, prev_seq)) bg = cls.from_dotbracket(prev_struct, prev_seq, name=curr_id, dissolve_length_one_stems=dissolve_length_one_stems, remove_pseudoknots=remove_pseudoknots) bgs.append(bg) return bgs
[docs] @classmethod def from_fasta(cls, filename, dissolve_length_one_stems=False): """ Return a list of BulgeGraphs from a fasta file. """ with open(filename) as f: fasta_text = f.read() return cls.from_fasta_text(fasta_text, dissolve_length_one_stems)
############################################################################ # Convert this object to different file formats. #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def to_bg_string(self): """ Output a string representation that can be stored and reloaded. """ out_str = '' out_str += self._get_name_str() out_str += self._get_length_str() out_str += self.seq.get_bg_str() out_str += self._get_define_str() out_str += self._get_connect_str() out_str += self._get_info_str() return out_str
[docs] def to_file(self, filename): with open(filename, 'w') as f: out_str = self.to_bg_string() f.write(out_str)
[docs] def to_element_string(self, with_numbers=False): """ Create a string similar to dotbracket notation that identifies what type of element is present at each location. For example the following dotbracket: ..((..)).. Should yield the following element string: ffsshhsstt Indicating that it begins with a fiveprime region, continues with a stem, has a hairpin after the stem, the stem continues and it is terminated by a threeprime region. :param with_numbers: show the last digit of the element id in a second line.:: (((.(((...)))))) Could result in:: sssissshhhssssss 0000111000111000 Indicating that the first stem is named 's0', followed by 'i0',' s1', 'h0', the second strand of 's1' and the second strand of 's0' """ log.debug("To element_string from defines %s", self.defines) output_str = [' '] * (self.seq_length + 1) output_nr = [' '] * (self.seq_length + 1) for d in self.defines.keys(): for resi in self.define_residue_num_iterator(d, adjacent=False): output_str[resi] = d[0] output_nr[resi] = d[-1] if with_numbers: return "".join(output_str).strip() + "\n" + "".join(output_nr).strip() else: return "".join(output_str).strip()
[docs] def to_neato_string(self): # The different nodes for different types of bulges node_lines = dict() fontsize = 20 out = [] out.append("graph G {") out.append("\tgraph [overlap=false,splines=true];") out.append("\tnode [shape=box];") for key2 in self.defines.keys(): # Create the nodes with a different color for each type of element if key2[0] == 's': out.append('\t{node [style=filled,fillcolor="#B3E2CD",fontsize=%d,label=\"%s\\n(%d)\"] %s};' % ( fontsize, key2, self.stem_length(key2), key2)) continue elif key2[0] == 'i': out.append( '\t{node [style=filled,shape=circle,fillcolor="#FFF2AE",fontsize=%d' % (fontsize)) elif key2[0] == 'm': out.append( '\t{node [style=filled,shape=circle,fillcolor="#F4CAE4",fontsize=%d' % (fontsize)) elif key2[0] == 'f': out.append( '\t{node [style=filled,shape=circle,fillcolor="#FDCDAC",fontsize=%d' % (fontsize)) elif key2[0] == 't': out.append( '\t{node [style=filled,shape=circle,fillcolor="#E6F5C9",fontsize=%d' % (fontsize)) else: out.append( '\t{node [style=filled,shape=circle,fillcolor="#CBD5E8",fontsize=%d' % (fontsize)) out[-1] += ',label=\"%s \\n' % (key2) # figure out the size of the node and use that as a label node_dims = self.get_node_dimensions(key2) total_bulge = sum(node_dims) if node_dims[0] == -1 or node_dims[0] == 1000: node_dims = "({})".format(node_dims[1]) elif node_dims[1] == -1 or node_dims[1] == 1000: node_dims = "({})".format(node_dims[0]) log.info("Dims of node %s are %r", key2, node_dims) out[-1] += str(node_dims) # make bigger interior loops visually bigger width = math.sqrt(1.5 * total_bulge / 10.0) height = width if key2[0] == 'i': out[-1] += "\",width=%f,heigh=%f] %s};" % (width, height, key2) else: out[-1] += "\"] %s};" % (key2) for key1 in self.edges: if key1[0] == 's': for key2 in self.edges[key1]: out.append("\t%s -- %s;" % (key1, key2)) out.append("}") return "\n".join(out)
[docs] def to_pair_table(self): """ Create a pair table from the list of elements. The first element in the returned list indicates the number of nucleotides in the structure. i.e. [5,5,4,0,2,1] """ pair_tuples = self.to_pair_tuples() return fus.tuples_to_pairtable(pair_tuples)
[docs] def to_pair_tuples(self, remove_basepairs=None): """ Create a list of tuples corresponding to all of the base pairs in the structure. Unpaired bases will be shown as being paired with a nucleotide numbered 0. i.e. [(1,5),(2,4),(3,0),(4,2),(5,1)] :param remove_basepairs: A list of 2-tuples containing basepairs that should be removed """ # iterate over each element table = [] for d in self.defines: # iterate over each nucleotide in each element for b in self.define_residue_num_iterator(d): p = self.pairing_partner(b) if p is None: p = 0 table.append((b, p)) if remove_basepairs: nt = [] for p in table: to_add = p for s in remove_basepairs: if sorted(p) == sorted(s): to_add = (p[0], 0) break nt += [to_add] table = nt return table
[docs] def to_bpseq_string(self): """ Create a bpseq string from this structure. """ out_str = '' for i in range(1, self.seq_length + 1): pp = self.pairing_partner(i) if pp is None: pp = 0 out_str += "{} {} {}\n".format(i, self.seq[i], pp) return out_str
[docs] def to_dotbracket_string(self, include_missing=False): """ Convert the BulgeGraph representation to a dot-bracket string and return it. :return: A dot-bracket representation of this BulgeGraph """ pt = self.to_pair_table() db_string = fus.pairtable_to_dotbracket(pt) if include_missing: db_string = self.seq.with_missing.update_dotbracket(db_string) else: for breakpoint in reversed(sorted(self.backbone_breaks_after)): db_string = db_string[:breakpoint] + \ "&" + db_string[breakpoint:] return db_string
[docs] def to_fasta_string(self, include_missing=False): """ Output the BulgeGraph representation as a fast string of the format:: >id AACCCAA ((...)) :param include_missing: Whether or not residues for which no structure information is present should be included in the output. """ if include_missing: seq = str(self.seq.with_missing) else: seq = self.seq return ">{}\n{}\n{}".format(self.name, seq, self.to_dotbracket_string(include_missing))
[docs] def to_networkx(self): """ Convert this graph to a networkx representation. This representation will contain all of the nucleotides as nodes and all of the base pairs as edges as well as the adjacent nucleotides. """ import networkx as nx G = nx.Graph() residues = [] for d in self.defines: prev = None for r in self.define_residue_num_iterator(d): G.add_node(r) residues += [r] # Add links along the backbone residues.sort() prev = None for r in residues: if prev is not None: G.add_edge(prev, r) prev = r # Add links along basepairs for s in self.stem_iterator(): for (f, t) in self.stem_bp_iterator(s): G.add_edge(f, t) return G
########################################################################### # Helper functions only used for conversion #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ def _get_single_define_str(self, key): """ Get a define string for a single key. """ return "define {} {}".format(key, " ".join([str(d) for d in self.defines[key]])) def _get_define_str(self): """ Convert the defines into a string. Format: define [name] [start_res1] [end_res1] [start_res2] [end_res2] """ defines_str = '' # a method for sorting the defines def define_sorter(k): drni = self.define_residue_num_iterator(k, adjacent=True) return next(drni) # .next() for key in sorted(self.defines.keys(), key=define_sorter): defines_str += self._get_single_define_str(key) # defines_str += "define %s %s" % ( key, " ".join([str(d) for d in self.defines[key]])) defines_str += '\n' return defines_str def _get_length_str(self): return "length " + str(self.seq_length) + '\n' def _get_info_str(self): out = "" for info in self.infos: for value in self.infos[info]: out += "info " + info + " " + value + "\n" return out def _get_connect_str(self): """ Get the connections of the bulges in the graph. Format: connect [from] [to1] [to2] [to3] """ whole_str = '' for key in sorted(self.edges): if len(self.edges[key]) == 0: continue # Our graph will be defined by the stems and the bulges they connect to name = key if name[0] == 's': out_str = "connect {}".format(name) for dest in self.connections(key): out_str += " {}".format(dest) whole_str += out_str whole_str += '\n' return whole_str def _get_name_str(self): """ Return the name of this structure along with its keyword: name 1y26 """ return "name {}\n".format(self.name) ############################################################################ # Descriptors of the BulgeGraph as a whole #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @property def seq_length(self): return len(self.seq) @property def seq(self): return self._seq @property def backbone_breaks_after(self): return self.seq.backbone_breaks_after
[docs] def get_domains(self): """ Get secondary structure domains. Currently domains found are: * multiloops (without any connected stems) * rods: stretches of stems + interior loops (without branching), with trailing hairpins * pseudoknots """ domains = col.defaultdict(list) multiloops = self.find_mlonly_multiloops() for ml in multiloops: ml = sorted(ml) if self.is_loop_pseudoknot(ml): domains["pseudoknots"].append(ml) else: domains["multiloops"].append(ml) doublestr = [] for s in self.stem_iterator(): neighbors = self.edges[s] for region in doublestr: if s in region or any(n in region for n in neighbors): curr_region = region curr_region.add(s) break else: doublestr.append(set([s])) curr_region = doublestr[-1] for n in neighbors: if n[0] in "sih": curr_region.add(n) # print(doublestr) while True: for reg1, reg2 in it.combinations(doublestr, 2): if reg1 & reg2: doublestr.remove(reg1) doublestr.remove(reg2) doublestr.append(reg1 | reg2) break else: break for region in doublestr: domains["rods"].append( sorted(region, key=self.define_a)) domains["pseudoknots"].sort() domains["multiloops"].sort() domains["rods"].sort() # print(domains) return domains
############################################################################ # Domains 2.0 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @property def junctions(self): """ Get all regular multiloops of this structure. :return: A list of tuples of multiloop segments. Each tuple contains the segments of one regular (i.e. not pseudoknotted) multiloop. """ out = [] multiloops = self.find_mlonly_multiloops() for ml in multiloops: if self.is_loop_pseudoknot(ml): continue else: out.append(ml) return out @property def rods(self): domains = self.get_domains() return domains["rods"] ############################################################################ # Descriptors of individual elements #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def element_length(self, key): """ Get the number of residues that are contained within this element. :param key: The name of the element. """ d = self.defines[key] length = 0 for i in range(0, len(d), 2): length += d[i + 1] - d[i] + 1 return length
[docs] def stem_length(self, key): """ Get the length of a particular element. If it's a stem, it's equal to the number of paired bases. If it's an interior loop, it's equal to the number of unpaired bases on the strand with less unpaired bases. If it's a multiloop, then it's the number of unpaired bases. """ d = self.defines[key] if key[0] == 's' or key[0] == 'y': return (d[1] - d[0]) + 1 elif key[0] == 'f': return self.get_bulge_dimensions(key)[0] elif key[0] == 't': return self.get_bulge_dimensions(key)[1] elif key[0] == 'h': return self.get_bulge_dimensions(key)[0] else: return min(self.get_bulge_dimensions(key))
[docs] def define_a(self, elem): # Special case, because interior loops can have # defines of length 2 or 4 if elem[0] == "i": conns = self.connections(elem) s1 = self.defines[conns[0]] s2 = self.defines[conns[1]] return [s1[1], s2[0], s2[3], s1[2]] else: # The following may call this classes # _define_a_nonzero implementation. return super(BulgeGraph, self).define_a(elem)
[docs] def is_single_stranded(self, node): """ Does this node represent a single-stranded region? Single stranded regions are five-prime and three-prime unpaired regions, multiloops, and hairpins .. warning:: Interior loops are never considered single stranded by this function. :param node: The name of the node :return: True if yes, False if no """ if node[0] == 'f' or node[0] == 't' or node[0] == 'm' or node[0] == 'h': return True else: return False
[docs] def get_node_dimensions(self, node, with_missing=False): """ Return the dimensions of a node. If the node is a stem, then the dimensions will be l where l is the length of the stem. Otherwise, see get_bulge_dimensions(node) :param node: The name of the node :return: A pair containing its dimensions """ if node[0] == 's': if with_missing: warnings.warn( "get_node_dimensions: 'with_missing'-flag is currently ignored for stems!") return (self.stem_length(node), self.stem_length(node)) else: bd = self.get_bulge_dimensions(node, with_missing) log.debug("BulgeDimensions of %s are %s", node, bd) return bd
[docs] def get_bulge_dimensions(self, bulge, with_missing=False): """ Return the dimensions of the bulge. If it is single stranded it will be (x, -1) for h,t,f or (x, 1000) for m. Otherwise it will be (x, y). :param bulge: The name of the bulge. :return: A pair containing its dimensions """ if bulge[0] == "s": raise ValueError("Stems are not allowed in get_bulge_dimensions") bd = self.defines[bulge] c = self.connections(bulge) if with_missing: get_define_len = self.seq.with_missing.define_length else: get_define_len = self.seq.define_length if bulge[0] == 'i': # if this interior loop only has one unpaired region # then we have to find out if it's on the 5' strand or # the 3' strand # Example: # s1 1 3 # 23 25 # s2 5 10 # 15 20 s1 = self.defines[c[0]] s2 = self.defines[c[1]] log.debug("Getting bulge dimensions for %s", bulge) return get_define_len([s1[1] + 1, s2[0] - 1]), get_define_len([s2[3] + 1, s1[2] - 1]) else: if bd: dim0 = get_define_len([bd[0], bd[1]]) else: dim0 = 0 if bulge[0] == "m": dim1 = 1000 else: dim1 = -1 return dim0, dim1
[docs] def get_length(self, vertex): """ Get the minimum length of a vertex. If it's a stem, then the result is its length (in base pairs). If it's a bulge, then the length is the smaller of it's dimensions. :param vertex: The name of the vertex. """ if vertex[0] == 's': return abs(self.defines[vertex][1] - self.defines[vertex][0]) + 1 else: if len(self.edges[vertex]) == 1: return self.defines[vertex][1] - self.defines[vertex][0] + 1 else: dims = list(self.get_bulge_dimensions(vertex)) dims.sort() if vertex[0] == 'i': return sum(dims) / float(len(dims)) else: return min(dims)
############################################################################ # Private functions related to descriptors #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ def _define_a_nonzero(self, elem): """ Get a define including the adjacent nucleotides. """ log.debug("define_a nonzero of BulgeGraph called for %s", elem) define = self.defines[elem] new_def = [] for i in range(0, len(define), 2): if define[i] - 1 in self.seq.backbone_breaks_after: log.debug("Left-side adjacent nt is in " "backbone-breaks: %s in %s", define[i] - 1, self.seq.backbone_breaks_after) new_def.append(define[i]) else: new_def.append(max(define[i] - 1, 1)) if define[i + 1] in self.seq.backbone_breaks_after: new_def.append(define[i + 1]) else: new_def.append(min(define[i + 1] + 1, self.seq_length)) return new_def def _all_connections(self, elem): """ Return the connected elements in order along the backbone. The difference to self.connections is that the returned list always contains as many elements, as the define of elem has numbers. If there is no connected element at this side,the returned list contains None. If elem is a stem connected to a hairpin or interior loop, this loop will be contained twice in the resulting output list. """ connections = [] # To correctly account for 0-length elements, we have to treat stems seperately. if elem[0] != "s": for nt in self.define_a(elem): neighbor = self.get_node_from_residue_num(nt) if neighbor == elem: connections.append(None) else: connections.append(neighbor) else: connections = [None, None, None, None] for neighbor in self.edges[elem]: for side in [0, 1, 2, 3]: if any(x == self.defines[elem][side] for x in self.define_a(neighbor)): connections[side] = neighbor return connections ############################################################################
[docs] def get_elem(self, position): """ Get the secondary structure element from a nucleotide position :param position: An integer or a fgr.RESID instance, describing the nucleotide number. """ if isinstance(position, RESID): position = self.seq.to_integer(position) if position not in self._node_to_resnum: self._node_to_resnum[position] = super( BulgeGraph, self).get_node_from_residue_num(position) return self._node_to_resnum[position]
[docs] def get_node_from_residue_num(self, base_num): """ USE get_elem instead. """ return self.get_elem(base_num)
@property def transformed(self): return fgt.BGTransformer(self)
[docs] def add_info(self, key, value): self.infos[key].append(value)
[docs] def stem_bp_iterator(self, stem, seq_ids=False): """ Iterate over all the base pairs in the stem. """ assert stem[0] == "s" d = self.defines[stem] stem_length = self.stem_length(stem) for i in range(stem_length): if seq_ids: yield self.seq.to_resid(d[0] + i), self.seq.to_resid(d[3] - i) else: yield (d[0] + i, d[3] - i)
[docs] def stem_iterator(self): """ Iterator over all of the stems in the structure. """ for d in self.defines.keys(): assert d[0] in "ftsmih", "stem_iterator should only be called after relabelling of nodes during GraphConstruction" if d[0] == 's': yield d
[docs] def pairing_partner(self, nucleotide_number): """ Return the base pairing partner of the nucleotide at position nucleotide_number. If this nucleotide is unpaired, return None. :param nucleotide_number: The position of the query nucleotide in the sequence or a RESID instance. :return: The number of the nucleotide base paired with the one at position nucleotide_number. """ for d in self.stem_iterator(): if isinstance(nucleotide_number, RESID): bp_iter = self.stem_bp_iterator(d, seq_ids=True) else: bp_iter = self.stem_bp_iterator(d) for (r1, r2) in bp_iter: if r1 == nucleotide_number: return r2 elif r2 == nucleotide_number: return r1 return None
[docs] def iterate_over_seqid_range(self, start_id, end_id): """ Iterate over the seq_ids between the start_id and end_id. """ for res in self.seq.iter_resids(start_id, end_id): # TODO: replace with yield from, once we drop python 2 support yield res
[docs] def seq_id_to_pos(self, seq_id): """ Convert a pdb seq_id to a 1-based nucleotide position :param seq_id: An instance of RESID """ assert isinstance(seq_id, RESID), "{} of type {} is not RESID".format( repr(seq_id), type(seq_id).__name__) return self.seq.to_integer(seq_id)
[docs] def shortest_bg_loop(self, vertex): """ Find a shortest loop containing this node. The vertex should be a multiloop. :param vertex: The name of the vertex to find the loop. :return: A list containing the elements in the shortest cycle. """ log.debug("Starting shortest BG loop for %s", vertex) G = self.to_networkx() log.debug("nx graph %r with edges %r ", G, G.adj) # use the nucleotide in the middle of this element as the starting point residues = sorted( list(self.define_residue_num_iterator(vertex, adjacent=True))) mid_res = residues[len(residues) // 2] log.debug("mid_residue %s", mid_res) if len(residues) == 2: # no-residue multiloop # find the neighbor which isn't part of the multiloop neighbors = [n for n in G.neighbors(mid_res) if n != residues[0]] if len(neighbors) == 2: # break the chain so that we don't get cycles within a stem for n in neighbors: if abs(n - mid_res) == 1: G.remove_edge(n, mid_res) break import forgi.utilities.graph as fug path = fug.shortest_cycle(G, mid_res) log.debug("Shortest cycle is %s", path) return path
def _chain_start_end(self, pos): if pos not in self.backbone_breaks_after: if pos == self.seq_length: if self.backbone_breaks_after: return self.backbone_breaks_after[-1] else: return 1 else: raise ValueError( "Pos {} is not at the end of a chain.".format(pos)) else: i = self.backbone_breaks_after.index(pos) if i == 0: return 1 else: return self.backbone_breaks_after[i - 1] + 1
[docs] @profile def get_next_ml_segment(self, ml_segment): """ Get the adjacent multiloop-segment (or 3' loop) next to the 3' side of ml_segment. If there is no other single stranded RNA after the stem, the backbone must end there. In that case return None. """ log.debug("get_next_ml_segment called for %s", ml_segment) if ml_segment.startswith("t"): return None else: if ml_segment[0] in "mf": f = self.define_a(ml_segment)[-1] else: raise ValueError("{} is not a multiloop".format(ml_segment)) # The stem following the ml-segment s = self.get_node_from_residue_num(f) if s == ml_segment: # The Cg consists of only a single f-element. assert len(self.defines) == 1 return None side_stem, _ = self._get_sides_plus(s, ml_segment) # Get the stem-side where we expect to find the next ML-segment if side_stem == 0: side_stem = 3 elif side_stem == 3: side_stem = 0 elif side_stem == 1: side_stem = 2 elif side_stem == 2: side_stem = 1 else: assert False log.debug("flanking_nuc_at_stem_side called for %s, side %s with defines %s.", s, side_stem, self.defines[s]) ml_nuc = self.flanking_nuc_at_stem_side(s, side_stem) log.debug("ml_nucleotide is %s (sequence length is %s).", ml_nuc, self.seq_length) # End of the backbone if ml_nuc > self.seq_length or ml_nuc - 1 in self.backbone_breaks_after: return None elem = self.get_node_from_residue_num(ml_nuc) log.debug("side now %s, ml_nuc %s, ml %s", side_stem, ml_nuc, elem) if elem[0] == "s": # 0-length multiloop elems = self.edges[elem] & self.edges[s] for elem in elems: if self.element_length(elem) == 0: return elem assert False if elem[0] not in "mft": self.log() log.error("%s is not a multiloop node", elem) assert False return elem
[docs] def shortest_mlonly_multiloop(self, ml_segment): loops = self.find_mlonly_multiloops() for loop in loops: if ml_segment in loop: return loop
[docs] def flanking_nuc_at_stem_side(self, s, side): """ Return the nucleotide number that is next to the stem at the given stem side. :param side: 0, 1, 2 or 3, as returned by self._get_sides_plus :returns: The nucleotide position. If the stem has no neighbor at that side, 0 or self.seq_length+1 is returned instead. """ assert s[0] == "s", "{} is not a stem".format(s) stem_nuc = self.defines[s][side] if side == 0 or side == 2: return stem_nuc - 1 else: return stem_nuc + 1
[docs] def nucleotides_to_elements(self, nucleotides): """ Convert a list of nucleotides (nucleotide numbers) to element names. Remove redundant entries and return a set. ..note:: Use `self.get_node_from_residue_num` if you have only a single nucleotide number. """ return set([self.get_node_from_residue_num(n) for n in nucleotides])
[docs] def elements_to_nucleotides(self, elements): """ Convert a list of element names to a list of nucleotide numbers. Remove redundant entries. """ nucs = set() for elem in elements: for def_range in self.define_range_iterator(elem, adjacent=False): for nuc in range(def_range[0], def_range[1] + 1): nucs.add(nuc) return sorted(nucs)
[docs] def find_bulge_loop(self, vertex, max_length=4): """ Find a set of nodes that form a loop containing the given vertex and being no greater than max_length nodes long. :param vertex: The vertex to start the search from. :param max_length: Only fond loops that contain no more then this many elements :returns: A list of the nodes in the loop. """ visited = set() to_visit = [(key, 1) for key in self.edges[vertex]] visited.add(vertex) in_path = [vertex] while len(to_visit) > 0: (current, depth) = to_visit.pop() visited.add(current) in_path = in_path[:depth] in_path.append(current) for key in self.edges[current]: if key == vertex and depth > 1: if len(in_path[:depth + 1]) > max_length: continue else: return in_path[:depth + 1] if key not in visited: to_visit.append((key, depth + 1)) return []
[docs] def length_one_stem_basepairs(self): """ Return a list of basepairs that correspond to length-1 stems. """ # dissolve all stems which have a length of one stems_to_dissolve = [ s for s in self.stem_iterator() if self.stem_length(s) == 1] if log.isEnabledFor(logging.WARNING): if len(stems_to_dissolve) == len(list(self.stem_iterator())) != 0: log.warning("All stems of the structure have length 1!") log.info("Stems with length 1: %s", stems_to_dissolve) bps_to_dissolve = [] for s in stems_to_dissolve: bps_to_dissolve.extend(self.stem_bp_iterator(s)) return bps_to_dissolve
[docs] def has_connection(self, v1, v2): """ Is there an edge between these two nodes """ if v2 in self.edges[v1]: return True else: # two multiloops can be connected at the end of a stem for e in self.edges[v1]: if e[0] != 's': continue if v2 in self.edges[e]: (s1b, s1e) = self.get_sides(e, v1) (s2b, s2e) = self.get_sides(e, v2) if s1b == s2b: return True return False
[docs] def connection_type(self, define, connections): """ Classify the way that two stems are connected according to the type of bulge that separates them. Potential angle types for single stranded segments, and the ends of the stems they connect: = = ====== =========== 1 2 (1, 1) #pseudoknot 1 0 (1, 0) 3 2 (0, 1) 3 0 (0, 0) = = ====== =========== :param define: The name of the bulge separating the two stems :param connections: The two stems and their separation :returns: INT connection type = ====================================================================== + positive values mean forward (from the connected stem starting at the lower nucleotide number to the one starting at the higher nuc. number) - negative values mean backwards. 1 interior loop 2 first multi-loop segment of normal multiloops and most pseudoknots 3 middle segment of a normal multiloop 4 last segment of normal multiloops and most pseudoknots 5 middle segments of pseudoknots = ====================================================================== """ if define[0] == 'i': # interior loop, we just have to check if # connections[0] < connections[1] if self.defines[connections[0]][0] < self.defines[connections[1]][0]: return 1 else: return -1 elif define[0] == 'm': (s1c, b1c) = self._get_sides_plus(connections[0], define) (s2c, b2c) = self._get_sides_plus(connections[1], define) if (s1c, s2c) == (1, 0): return 2 elif (s1c, s2c) == (0, 1): return -2 elif (s1c, s2c) == (3, 0): return 3 elif (s1c, s2c) == (0, 3): return -3 elif (s1c, s2c) == (2, 3): return 4 elif (s1c, s2c) == (3, 2): return -4 # the next two refer to pseudoknots elif (s1c, s2c) == (2, 1): return 5 elif (s1c, s2c) == (1, 2): return -5 else: raise GraphIntegrityError("Weird angle type: (s1c, s2c) = (%d, %d)" % (s1c, s2c)) else: raise ValueError( "connection_type called on non-interior loop/multiloop")
[docs] def connection_ends(self, connection_type): """ Find out which ends of the stems are connected by a particular angle type. :param connection_type: The angle type, as determined by which corners of a stem are connected :return: (s1e, s2b) 0 means the side of the stem with the lowest nucleotide, 1 the other side """ ends = () if abs(connection_type) == 1: ends = (1, 0) elif abs(connection_type) == 2: ends = (1, 0) elif abs(connection_type) == 3: ends = (0, 0) elif abs(connection_type) == 4: ends = (1, 0) elif abs(connection_type) == 5: ends = (1, 1) else: raise GraphIntegrityError( 'Unknown connection type: %d' % (connection_type)) if connection_type < 0: return ends[::-1] else: return ends
# This function seems to be unused. Consider deprecation...
[docs] def get_multiloop_nucleotides(self, multiloop_loop): """ Return a list of nucleotides which make up a particular multiloop. :param multiloop_loop: The elements which make up this multiloop :return: A list of nucleotides """ stems = [d for d in multiloop_loop if d[0] == 's'] multis = [d for d in multiloop_loop if d[0] == 'm'] residues = [] for s in stems: relevant_edges = [c for c in self.edges[s] if c in multiloop_loop] sides = [self._get_sides_plus(s, c)[0] for c in relevant_edges] sides.sort() # the whole stem is part of this multiloop if sides == [2, 3] or sides == [0, 1]: residues += list(range(self.defines[s] [sides[0]], self.defines[s][sides[1]] + 1)) else: residues += [self.defines[s][sides[0]], self.defines[s][sides[1]]] for m in multis: residues += self.define_residue_num_iterator(m, adjacent=False) return residues
[docs] @profile def find_mlonly_multiloops(self): import networkx as nx ml_graph = nx.Graph() for d in it.chain(self.mloop_iterator(), self.floop_iterator(), self.tloop_iterator()): next_ml = self.get_next_ml_segment(d) if next_ml is not None: ml_graph.add_edge(d, next_ml) else: ml_graph.add_node(d) loops = [] for comp in nx.connected_components(ml_graph): # Order along the cycle, in arbitrary direction. # We need to start at a node with only 1 connection, if present for x in comp: if len(ml_graph.edges(x)) == 1: st_node = x break else: st_node = x # Just take any node # Sort nodes along the cycle loop = list(nx.dfs_preorder_nodes( ml_graph.subgraph(comp), st_node)) # See if we need to reverse the order for i, l in enumerate(loop): next_l = self.get_next_ml_segment(l) if i + 1 < len(loop): if loop[i + 1] == next_l: break else: if loop[0] == next_l: break else: loop.reverse() # Find first node first = min(loop, key=lambda x: sorted( self.flanking_nucleotides(x))) first_i = loop.index(first) loop = loop[first_i:] + loop[:first_i] loops.append(tuple(loop)) return list(sorted(loops))
[docs] def describe_multiloop(self, multiloop): """ :param multiloop: An iterable of nodes (only "m", "t" and "f" elements) """ descriptors = set() all_stems = col.Counter() angle_types = col.Counter() for elem in multiloop: if elem[0] in "ft": descriptors.add("open") continue elif elem[0] != "m": raise ValueError( "Non multiloop element '{}' encountered in describe_multiloop.".format(elem)) conn = self.connections(elem) ctype = abs(self.connection_type(elem, conn)) angle_types[ctype] += 1 if ctype == 5: descriptors.add("pseudoknot") all_stems.update(self.edges[elem]) # Odd number of occurrences for 2 stems. if sum(v % 2 for v in all_stems.values()) == 2: descriptors.add("open") elif "open" not in descriptors: # if any(v!=2 for v in all_stems.values()): # print(all_stems) # print(multiloop) # print(self.to_dotbracket_string()) # print (self.to_element_string(True)) assert sum(v % 2 for v in all_stems.values()) == 0 if angle_types[2] == 1 and angle_types[4] == 1 and "pseudoknot" not in descriptors: descriptors.add("regular_multiloop") elif angle_types[2] > 1 or angle_types[4] > 1: descriptors.add("pseudoknot") elif "open" in descriptors and (angle_types[2] > 0 or angle_types[4] > 0): descriptors.add("pseudoknot") return descriptors
def _zerolen_defines_a_between(self, stem1, stem2): log.debug("Searching for zerolen-coordinates") zl_coordinates = set() for k, l in it.product(range(4), repeat=2): if abs(self.defines[stem1][k] - self.defines[stem2][l]) == 1: d = [self.defines[stem1][k], self.defines[stem2][l]] d.sort() log.debug("Zero-length element found: %s", d) if d[0] not in self.seq.backbone_breaks_after: zl_coordinates.add(tuple(d)) else: log.debug("But backbone-break encountered!") return zl_coordinates
[docs] def log(self, level=logging.DEBUG): with log_at_caller(log): log.log(level, self.seq) log.log(level, self.to_dotbracket_string()) es = self.to_element_string(with_numbers=True).split("\n") log.log(level, es[0]) log.log(level, es[1]) log.log(level, "DEFINES: %s", pformat(self.defines)) log.log(level, "EDGES: %s", pformat(self.edges))
[docs] def sorted_stem_iterator(self): """ Iterate over a list of the stems sorted by the lowest numbered nucleotide in each stem. """ stems = [d for d in self.defines if d[0] == 's'] stems.sort(key=lambda s: self.defines[s][0]) for s in stems: yield s
[docs] def sorted_element_iterator(self): """ Iterate over a list of the coarse grained elements sorted by the lowest numbered nucleotide in each stem. Multiloops with no nucleotide coordinates come last. """ elements = [d for d in self.defines] elements.sort( key=lambda s: self.defines[s][0] if self.defines[s] else 10000 + int(s[1:])) for e in elements: yield e
[docs] def adjacent_stem_pairs_iterator(self): """ Iterate over all pairs of stems which are separated by some element. This will always yield triples of the form (s1, e1, s2) where s1 and s2 are the stem identifiers and e1 denotes the element that separates them. """ for d in self.defines.keys(): if len(self.edges[d]) == 2: edges = list(self.edges[d]) if edges[0][0] == 's' and edges[1][0] == 's': yield (edges[0], d, edges[1])
[docs] def get_connected_residues(self, s1, s2, bulge=None): """ Get the nucleotides which are connected by the element separating s1 and s2. They should be adjacent stems. :param s1, s2: 2 adjacent stems :param bulge: Optional: The bulge seperating the two stems. If s1 and s2 are connected by more than one element, this has to be given, or a ValueError will be raised. (useful for pseudoknots) The connected nucleotides are those which are spanned by a single interior loop or multiloop. In the case of an interior loop, this function will return a list of two tuples and in the case of multiloops if it will be a list of one tuple. If the two stems are not separated by a single element, then return an empty list. """ c1 = self.edges[s1] c2 = self.edges[s2] # find out which edges they share common_edges = c1.intersection(c2) if len(common_edges) == 0: # not connected return [] if len(common_edges) > 1 and bulge is None: raise ValueError("Too many connections between the stems. " "Please provide the connectiong bulge you are interested in.") if bulge and bulge not in common_edges: raise ValueError( "{} does not connecty the stems {} and {}.".format(bulge, s1, s2)) # the element linking the two stems if bulge: conn = bulge else: conn = list(common_edges)[0] # find out the sides of the stems that face the bulge (s1b, s1e) = self.get_sides(s1, conn) (s2b, s2e) = self.get_sides(s2, conn) # get the nucleotides on the side facing the stem s1_nucleotides = self.get_side_nucleotides(s1, s1b) s2_nucleotides = self.get_side_nucleotides(s2, s2b) # find out the distances between all the nucleotides flanking # the bulge dists = [] for n1 in s1_nucleotides: for n2 in s2_nucleotides: dists += [(abs(n2 - n1), n1, n2)] dists.sort() # return the ones which are closest to each other first if conn[0] == 'i': return sorted([list(dists[0][1:]), list(dists[1][1:])]) else: return [list(dists[0][1:])]
[docs] def get_side_nucleotides(self, stem, side): """ Get the nucleotide numbers on the given side of them stem. Side 0 corresponds to the 5' end of the stem whereas as side 1 corresponds to the 3' side of the stem. :param stem: The name of the stem :param side: Either 0 or 1, indicating the 5' or 3' end of the stem :return: A tuple of the nucleotide numbers on the given side of the stem. """ if side == 0: return (self.defines[stem][0], self.defines[stem][3]) elif side == 1: return (self.defines[stem][1], self.defines[stem][2]) raise ValueError("Invalid side (%d) for the stem (%s)." % (stem, side))
[docs] def get_stem_edge(self, stem, pos): """ Returns the side (strand) of the stem that position is on. Side 0 corresponds to the 5' pairing residues in the stem whereas as side 1 corresponds to the 3' pairing residues in the stem. :param stem: The name of the stem :param pos: A position in the stem :return: 0 if pos on 5' edge of stem """ fp_side = self.get_side_nucleotides(stem, 0) tp_side = self.get_side_nucleotides(stem, 1) fp_edge = range(fp_side[0], tp_side[0] + 1) tp_edge = range(tp_side[1], fp_side[1] + 1) if pos in fp_edge: return 0 elif pos in tp_edge: return 1 raise ValueError("Position (%d) not in stem (%s)." % (pos, stem))
[docs] def get_sides(self, s1, b): """ Get the side of s1 that is next to b. s1e -> s1b -> b :param s1: The stem. :param b: The bulge. :return: A tuple indicating which side is the one next to the bulge and which is away from the bulge. """ s1d = self.defines[s1] bd = self.defines[b] # Special case if the bulge is a length 0 multiloop if len(bd) == 0: bd = self.define_a(b) bd[0] += 1 bd[1] -= 1 if bd[1] + 1 == s1d[0]: return (0, 1) elif bd[1] + 1 == s1d[2]: return (1, 0) elif s1d[1] + 1 == bd[0]: return (1, 0) elif s1d[3] + 1 == bd[0]: return (0, 1) else: raise GraphIntegrityError( "Faulty bulge {}:{} connected to {}:{}".format(b, bd, s1, s1d))
[docs] def stem_resn_to_stem_vres_side(self, stem, res): d = self.defines[stem] if res <= d[1]: assert res >= d[0] pos = res - d[0] side = 0 elif res <= d[3]: assert res >= d[2] pos = d[3] - res side = 1 else: raise ValueError( "Residue {} not in stem {} with define {}".format(res, stem, d)) return pos, side
[docs] def stem_side_vres_to_resn(self, stem, side, vres): """ Return the residue number given the stem name, the strand (side) it's on and the virtual residue number. """ d = self.defines[stem] if side == 0: return d[0] + vres else: return d[3] - vres
[docs] def hloop_iterator(self): """ Iterator over all of the hairpin in the structure. """ for d in self.defines.keys(): if d[0] == 'h': yield d
[docs] def mloop_iterator(self): """ Iterator over all of the multiloops in the structure. """ for d in self.defines.keys(): if d[0] == 'm': yield d
[docs] def iloop_iterator(self): """ Iterator over all of the interior loops in the structure. """ for d in self.defines.keys(): if d[0] == 'i': yield d
[docs] def floop_iterator(self): """ Yield the name of the 5' prime unpaired region if it is present in the structure. """ for d in self.defines.keys(): if d[0] == 'f': yield d
[docs] def tloop_iterator(self): """ Yield the name of the 3' prime unpaired region if it is present in the structure. """ for d in self.defines.keys(): if d[0] == 't': yield d
[docs] def get_define_seq_str(self, elem, adjacent=False): """ Get a list containing the sequences for the given define. :param d: The element name for which to get the sequences :param adjacent: Boolean. Include adjacent nucleotides (for single stranded RNA only) :return: A list containing the sequence(s) corresponding to the defines """ if adjacent: define = self.define_a(elem) else: define = self.defines[elem] seqs = [] for i in range(0, len(define), 2): seqs.append(self.seq[define[i]:define[i + 1]]) if elem[0] == "i" and not adjacent: def_a = self.define_a(elem) if define[0] < def_a[1]: seqs.append("") else: seqs.insert(0, "") return seqs
[docs] def get_multiloop_side(self, m): """ Find out which strand a multiloop is on. An example of a situation in which the loop can be on both sides can be seen in the three-stemmed structure below: (.().().) In this case, the first multiloop section comes off of the 5' strand of the first stem (the prior stem is always the one with a lower numbered first residue). The second multiloop section comess of the 3' strand of the second stem and the third loop comes off the 3' strand of the third stem. """ c = self.connections(m) p1 = self._get_sides_plus(c[0], m) p2 = self._get_sides_plus(c[1], m) return (p1[0], p2[0])
# This function seems to be unused here. This code is possible duplicated somewhere. # Requires cleanup.
[docs] def get_strand(self, multiloop): """ Get the strand on which this multiloop is located. :param multiloop: The name of the multiloop :return: 0 for being on the lower numbered strand and 1 for being on the higher numbered strand. """ conn = self.connections(multiloop) t = self.connection_type(multiloop, conn) if abs(t) == 2: return 1 elif abs(t) == 3: return 0 else: return 2
[docs] def get_flanking_region(self, bulge_name, side=0): """ If a bulge is flanked by stems, return the lowest residue number of the previous stem and the highest residue number of the next stem. :param bulge_name: The name of the bulge :param side: The side of the bulge (indicating the strand) """ c = self.connections(bulge_name) if bulge_name[0] == 'h': s1 = self.defines[c[0]] return (s1[0], s1[3]) s1 = self.defines[c[0]] s2 = self.defines[c[1]] if bulge_name[0] == 'i': # interior loop if side == 0: return (s1[0], s2[1]) else: return (s2[2], s1[3]) elif bulge_name[0] == 'm': ss = self.get_multiloop_side(bulge_name) st = [s1, s2] ends = [] # go through the two sides and stems and pick # the other end of the same strand for i, s in enumerate(ss): if s == 0: ends += [st[i][1]] elif s == 1: ends += [st[i][0]] elif s == 2: ends += [st[i][3]] elif s == 3: ends += [st[i][2]] else: raise GraphIntegrityError("Weird multiloop sides: %s" % bulge_name) ends.sort() return tuple(ends) # multiloop return (None, None)
[docs] def get_flanking_sequence(self, bulge_name, side=0): """ Return the sequence of a bulge and the adjacent strand of the adjacent stems. :param bulge_name: The name of the bulge, e.g. 'h0' :param side: Used for interior loops: The strand of interest (0=forward, 1=backward) """ if len(self.seq) == 0: raise ValueError( "No sequence present in the bulge_graph: %s" % (self.name)) (m1, m2) = self.get_flanking_region(bulge_name, side) return self.seq[m1:m2] # 1 based indexing
[docs] def get_flanking_handles(self, bulge_name, side=0): """ Get the indices of the residues for fitting bulge regions. So if there is a loop like so (between residues 7 and 16):: (((...)))) 7890123456 ^ ^ Then residues 9 and 13 will be used as the handles against which to align the fitted region. In the fitted region, the residues (2,6) will be the ones that will be aligned to the handles. :return: (orig_chain_res1, orig_chain_res1, flanking_res1, flanking_res2) """ f1 = self.get_flanking_region(bulge_name, side) c = self.connections(bulge_name) if bulge_name[0] == 'h': s1 = self.defines[c[0]] ab = [s1[1], s1[2]] return (ab[0], ab[1], ab[0] - f1[0], ab[1] - f1[0]) s1 = self.defines[c[0]] s2 = self.defines[c[1]] if bulge_name[0] == 'm': sides = self.get_multiloop_side(bulge_name) ab = [s1[sides[0]], s2[sides[1]]] ab.sort() return (ab[0], ab[1], ab[0] - f1[0], ab[1] - f1[0]) if bulge_name[0] == 'i': if side == 0: ab = [s1[1], s2[0]] else: ab = [s2[3], s1[2]] return (ab[0], ab[1], ab[0] - f1[0], ab[1] - f1[0]) # probably still have to include the 5' and 3' regions, but that # will come a little later return None
[docs] def are_adjacent_stems(self, s1, s2, multiloops_count=True): """ Are two stems separated by only one element. If multiloops should not count as edges, then the appropriate parameter should be set. :param s1: The name of the first stem :param s2: The name of the second stem :param multiloops_count: Whether to count multiloops as an edge linking two stems """ for e in self.edges[s1]: if not multiloops_count and e[0] == 'm': continue if s2 in self.edges[e]: return True return False
[docs] def random_subgraph(self, subgraph_length=None): """ Return a random subgraph of this graph. :return: A list containing a the nodes comprising a random subgraph """ if subgraph_length == None: subgraph_length = random.randint(1, len(self.defines)) start_node = random.choice(list(self.defines)) curr_length = 0 visited = set() next_nodes = [start_node] new_graph = [] while curr_length < subgraph_length: curr_node = random.choice(next_nodes) if curr_node[0] == 'i' or curr_node[0] == 'm': # if it's an interior loop or a multiloop, then we have to # add the adjacent stems for e in self.edges[curr_node]: if e in new_graph: continue visited.add(e) new_graph += [e] next_nodes += list(self.edges[e]) curr_length += 1 visited.add(curr_node) next_nodes += list(self.edges[curr_node]) next_nodes = [n for n in next_nodes if n not in visited] new_graph += [curr_node] curr_length += 1 # self.element_length(curr_node) return new_graph
[docs] def get_resseqs(self, define, seq_ids=True): """ Return the pdb ids of the nucleotides in this define. :param define: The name of this element. :param: Return a tuple of two arrays containing the residue ids on each strand """ #raise NotImplementedError("Removed. Use define_residue_num_iterator instead!") resnames = [] ranges = zip(*[iter(self.defines[define])] * 2) for r in ranges: strand_resnames = [] for x in range(r[0], r[1] + 1): if seq_ids: try: res_id = self.seq.to_resid(x) except IndexError as e: with log_to_exception(log, e): log.error("Index %s not in seq_ids.", (x - 1)) raise if hasattr(self, "chain") and self.chain is not None: assert res_id in self.chain strand_resnames.append(res_id) else: strand_resnames += [x] resnames += [strand_resnames] return resnames
def _insert_cutpoints_into_seq(self): if self.seq: for breakpoint in self.backbone_breaks_after: log.debug("Inserting breakpoint into seq '%s'", self.seq) self.seq = self.seq.subseq_with_cutpoints( 1, breakpoint + 1) + "&" + self.seq.subseq_with_cutpoints(breakpoint + 1, None) log.info("seq now has %s cutpoints", self.seq.count('&')) # This function seems to be dead code, but might be useful in the future. # Consider adding this to whitelist.py
[docs] def connected_stem_iterator(self): """ Iterate over all pairs of connected stems. """ for l in it.chain(self.mloop_iterator(), self.iloop_iterator()): edge_list = list(self.edges[l]) yield (edge_list[0], l, edge_list[1])
[docs] def sorted_edges_for_mst(self): """ Keep track of all linked nodes. Used for the generation of the minimal spanning tree. """ priority = {'s': 1, 'i': 2, 'm': 3, 'f': 4, 't': 5} edges = sorted(it.chain(self.mloop_iterator(), self.iloop_iterator()), key=lambda x: (priority[x[0]], min(self.get_node_dimensions(x)), x)) return edges
[docs] def get_mst(self): """ Create a minimum spanning tree from this BulgeGraph. This is useful for constructing a structure where each section of a multiloop is sampled independently and we want to introduce a break at the largest multiloop section. """ # keep track of all linked nodes edges = self.sorted_edges_for_mst() mst = set(it.chain(self.stem_iterator(), self.floop_iterator(), self.tloop_iterator(), self.hloop_iterator())) # store all of the disconnected trees forest = [set([m]) for m in mst] # get the tree containing a particular element def get_tree(elem): for t in forest: if elem in t: return t while len(edges) > 0: conn = edges.pop(0) neighbors = list(self.edges[conn]) # get the trees containing the neighbors of this node # the node should be an interior loop or multiloop so # the neighbors should necessarily be stems, 5' or 3' t1 = get_tree(neighbors[0]) t2 = get_tree(neighbors[1]) if len(set.intersection(t1, t2)) == 0: # if this node connects two disparate trees, then add it to the mst new_tree = t1.union(t2) forest.remove(t1) forest.remove(t2) forest.append(new_tree) mst.add(conn) return mst
[docs] def traverse_graph(self): """ Traverse the graph to get the angle types. The angle type depends on which corners of the stem are connected by the multiloop or internal loop. :returns: A list of triples (stem, loop, stem) """ if self.mst is None: self.mst = self.get_mst() build_order = [] to_visit = [('s0', 'start')] visited = set(['s0']) build_paths = col.defaultdict(list) while len(to_visit) > 0: to_visit.sort(key=lambda x: min(self.get_node_dimensions(x[0]))) (current, prev) = to_visit.pop(0) for e in self.edges[current]: if e not in visited and e in self.mst: # make sure the node hasn't been visited # and is in the minimum spanning tree to_visit.append((e, current)) build_paths[e] += [e] build_paths[e] += build_paths[current] visited.add(e) if current[0] != 's' and len(self.edges[current]) == 2: # multiloop or interior loop # overkill method of getting the stem that isn't # equal to prev next_stem = set.difference(self.edges[current], set([prev])) build_order += [(prev, current, list(next_stem)[0])] self.build_order = build_order return build_order
[docs] def set_angle_types(self): """ Fill in the angle types based on the build order """ if self.build_order is None: self.traverse_graph() self.ang_types = dict() for (s1, b, s2) in self.build_order: self.ang_types[b] = self.connection_type(b, [s1, s2])
[docs] def get_angle_type(self, bulge, allow_broken=False): """ Return what type of angle this bulge is, based on the way this would be built using a breadth-first traversal along the minimum spanning tree. :param allow_broken: How to treat broken multiloop segments. * False (default): Return None * True: Return the angle type according to the build-order (i.e. from the first built stem to the last-built stem) """ if self.ang_types is None: self.set_angle_types() if bulge in self.ang_types: return self.ang_types[bulge] else: if allow_broken: stems = self.connections(bulge) s1, s2 = sorted(stems, key=self.buildorder_of) return self.connection_type(bulge, [s1, s2]) else: return None
[docs] def buildorder_of(self, element): """ Returns the index into build_order where the element FIRST appears. :param element: Element name, a string. e.g. "m0" or "s0" :returns: An index into self.build_order or None, if the element is not part of the build_order (e.g. hairpin loops) """ if self.build_order is None: self.traverse_graph() for i, elements in enumerate(self.build_order): if element in elements: return i return None
[docs] def is_loop_pseudoknot(self, loop): """ Is a particular loop a pseudoknot? :param loop: A list of elements that are part of the loop (only m,f and t elements). :return: Either True or false """ return "pseudoknot" in self.describe_multiloop(loop)
[docs] def iter_elements_along_backbone(self, startpos=1): """ Iterate all coarse grained elements along the backbone. Note that stems are yielded twice (for forward and backward strand). Interior loops may be yielded twice or once (if one side has no nucleotide) 0-length multiloop-segments are correctly yielded. :param startpos: The nucleotide position at which to start :yields: Coarse grained element names, like "s0", "i0" """ nuc = startpos try: node = self.get_node_from_residue_num(nuc) except: assert len(self.defines) == 0 raise StopIteration("Empty Graph") while True: log.debug("Yielding node %s with edges %s", node, self.edges[node]) yield node if node[0] in "si": # The strand matters if node[0] == "s": strand = self.get_stem_edge(node, nuc) if strand == 0: # forward nuc = self.flanking_nuc_at_stem_side(node, 1) else: nuc = self.flanking_nuc_at_stem_side(node, 3) else: f1, f2, f3, f4 = sorted(self.flanking_nucleotides(node)) if f1 < nuc < f2: nuc = f2 elif f3 < nuc < f4: nuc = f4 else: assert False try: next_node = self.get_node_from_residue_num(nuc) except LookupError: raise StopIteration("End of chain reached") else: # We need to make sure there is no 0-length multiloop between the two stems. intersect = self.edges[node] & self.edges[next_node] for el in intersect: if el[0] == "m" and self.defines[el] == []: log.debug("ML %s between %s and %s: %s", el, node, next_node, self.define_a(el)) for el in intersect: if el[0] == "m" and self.defines[el] == []: # In case of this structuire ([)], there are 2 0-length multiloops between the two stems. prev_nuc = min(self.flanking_nucleotides(el)) log.debug("prev_nuc = %s", prev_nuc) if nuc - 1 == prev_nuc: node = el break else: node = next_node else: try: f1, f2 = self.flanking_nucleotides(node) except ValueError as e: # Too few values to unpack if node[0] == "f": if len(self.defines) == 1: raise StopIteration("Single stranded-only RNA") nuc, = self.flanking_nucleotides(node) else: raise StopIteration("End of chain reached") else: if f1 > f2: nuc = f1 else: nuc = f2 node = self.get_node_from_residue_num(nuc) log.debug("Next nuc is %s (%s), node is %s", nuc, repr(nuc), node)
[docs] def pseudoknotted_basepairs(self, ignore_basepairs=()): """ Return a list of base-pairs that will be removed to remove pseudoknots using the knotted2nested.py script. :param ignore_basepairs: An optional list of basepairs that knested2knotted will not consider present in the structure. :return: A list of base-pairs that can be removed. """ # remove unpaired bases and redundant pairs (i.e. (2,3) and (3,2)) pairs = sorted([tuple(sorted(p)) for p in self.to_pair_tuples() if p[1] != 0]) pairs = set(pairs) pairs = [p for p in pairs if p not in ignore_basepairs and (p[1], p[0] not in ignore_basepairs)] import forgi._k2n_standalone.knots as fakk pk_function = fakk.eg nested_pairs, removed_pairs = pk_function(pairs, return_removed=True) return removed_pairs
[docs] def ss_distance(self, e1, e2): ''' Calculate the distance between two elements (e1, e2) along the secondary structure. The distance only starts at the edge of each element, and is the closest distance between the two elements. :param e1: The name or nucleotide number of the first element :param e2: The name or nucleotide number of the second element :return: The integer distance between the two elements / residues along the secondary structure. (if a element is given, we use its corner for the distance, otherwise the exact nucleotide) ''' # get the edge nucleotides # thanks to: # http://stackoverflow.com/questions/2154249/identify-groups-of-continuous-numbers-in-a-list # we get the edges, except that they might be one too close because we use adjacent # nucleotides, nevertheless we'll take care of that later d1_corners = [] d2_corners = [] correction = 0 if isinstance(e1, int): d1_corners=[e1] elif isinstance(e1, RESID): d1_corners=[self.seq.to_integer(e1)] else: d1_corners=self.defines[e1] if not d1_corners: d1_corners=self.define_a(e1) correction+=1 if isinstance(e2, int): d2_corners=[e2] elif isinstance(e2, RESID): d2_corners=[self.seq.to_integer(e2)] else: d2_corners=self.defines[e2] if not d2_corners: d2_corners=self.define_a(e2) correction+=1 log.debug("Corners for distance are %s and %s", d1_corners, d2_corners) import networkx as nx G = self.to_networkx() path_lengths = [] for c1, c2 in it.product(d1_corners, d2_corners): path_lengths += [nx.shortest_path_length(G, c1, c2)] return min(path_lengths)+correction
[docs] def define_residue_num_iterator(self, node, adjacent=False, seq_ids=False): """ Iterate over the residue numbers that belong to this node. :param node: The name of the node """ visited = set() for r in self.define_range_iterator(node, adjacent): for i in range(r[0], r[1] + 1): if seq_ids: if self.seq.to_resid(i) not in visited: visited.add(self.seq.to_resid(i)) yield self.seq.to_resid(i) else: if i not in visited: visited.add(i) yield i
[docs] def shortest_path(self, e1, e2): ''' Determine the shortest path between two elements (e1, e2) along the secondary structure. :param e1: The name of the first element :param e2: The name of the second element :return: A list of the element names along the shortest path ''' import networkx as nx # Get residue numbers of source and targets, for shortest_path in nx source = min([res for res in self.define_residue_num_iterator(e1)]) target = min([res for res in self.define_residue_num_iterator(e2)]) # Get nx graph, and the shortest path G = self.to_networkx() nx_sp = nx.shortest_path(G, source=source, target=target) # Convert shortest path of residue numbers to a shortest path of node names sp, sp_set = [], set() # Use set to keep track of additions for faster lookup for res in nx_sp: node = self.get_node_from_residue_num(res) if node not in sp_set: sp_set.add(node) sp.append(node) # assymetric bulges with a length of 0 on 1 side are missed, # two adjacent stems indicate a bulge with length 0 along the path shortest_path, sp_set = [], set() # Connections are ordered compared to connected_stem_iterator() traversal = self.traverse_graph() # Iterate through adjacent pairs of elements in the list for n1, n2 in zip(sp, sp[1:]): # If two elements are both stems if n1.startswith('s') and n2.startswith('s'): # Find their connection in graph traversal connection = list( [conn for conn in traversal if n1 in conn and n2 in conn][0]) # If we're moving 'backwards' on the traversal if connection.index(n1) > connection.index(n2): connection.reverse() for node in connection: if node not in sp_set: sp_set.add(node) shortest_path.append(node) else: if n1 not in sp_set: sp_set.add(n1) shortest_path.append(n1) if n2 not in sp_set: shortest_path.append(n2) # Append last item in path return shortest_path
[docs] def get_position_in_element(self, resnum): """ Return the position of the residue in the cg-element and the length of the element. :param resnum: An integer. The 1-based position in the total sequence. :returns: A tuple (p,l) where p is the position of the residue in the cg-element (0-based for stems, 1-based for loops) and p/l gives a measure for the position of the residue along the cg-element's axis (0 means at cg.coords[elem][0], 1 at cg.coords[elem][1] and 0.5 exactely in the middle of these two. ) """ node = self.get_node_from_residue_num(resnum) if node[0] == 's': if self.defines[node][0] <= resnum <= self.defines[node][1]: return resnum - self.defines[node][0], self.defines[node][1] - self.defines[node][0] else: return abs(resnum - self.defines[node][3]), self.defines[node][1] - self.defines[node][0] elif node[0] == 'i': s0, s1 = self.connections(node) if self.defines[s0][1] <= resnum <= self.defines[s1][0]: return resnum - self.defines[s0][1], self.defines[s1][0] - self.defines[s0][1] else: return abs(resnum - self.defines[s0][2]) - 1, self.defines[s0][2] - self.defines[s1][3] elif node[0] == 'h': pos1 = resnum - self.defines[node][0] pos2 = abs(resnum - self.defines[node][1]) return min(pos1, pos2) + 1, (self.defines[node][1] - self.defines[node][0] + 2) // 2 i = 0 while i < len(self.defines[node]): s = self.defines[node][i] e = self.defines[node][i + 1] if s <= resnum <= e: return resnum - s + 1, e - s + 2 i += 2 return None
[docs] def connected(self, n1, n2): ''' Are the nucleotides n1 and n2 connected? :param n1: A node in the BulgeGraph :param n2: Another node in the BulgeGraph :return: True or False indicating whether they are connected. ''' if n1 in self.edges[n2] or n2 in self.edges[n1]: return True # two multiloops can be considered connected if they both # link to the same side of the same stem if n1[0] == 'm' and n2[0] == 'm': common_stems = list(set.intersection( self.edges[n1], self.edges[n2])) if len(common_stems) == 0: return False common_stem = common_stems[0] (s1c, b1c) = self._get_sides_plus(common_stem, n1) (s2c, b1c) = self._get_sides_plus(common_stem, n2) if sorted([s1c, s2c]) == [0, 3] or sorted([s1c, s2c]) == [1, 2]: return True return False
[docs] def min_max_bp_distance(self, e1, e2): ''' Get the minimum and maximum base pair distance between these two elements. If they are connected, the minimum distance will be 1. The maximum will be 1 + length(e1) + length(e1) :param e1: The name of the first element :param e2: The name of the second element :return: A tuple containing the minimum and maximum distance between the two elements. ''' if (e1, e2) in self._elem_bp_dists: # Shortcut if cached. return self._elem_bp_dists[(e1, e2)] min_bp = sys.maxsize max_bp = 0 if self.nx_graph is None: self.nx_graph = self.to_networkx() if self.nuc_bp_dists is None: import networkx as nx self.nuc_bp_dists = np.zeros( (self.seq_length + 1, self.seq_length + 1)) # col.defaultdict(dict) self.nuc_bp_dists[0, :] = np.nan self.nuc_bp_dists[:, 0] = np.nan dist_matrix = np.array(nx.floyd_warshall_numpy(self.nx_graph)) for (i1, n1), (i2, n2) in it.product(enumerate(self.nx_graph.nodes()), enumerate(self.nx_graph.nodes())): self.nuc_bp_dists[n1, n2] = dist_matrix[i1][i2] for f1, f2 in it.product(set(self.defines[e1]), set(self.defines[e2])): #d = nx.dijkstra_path_length(self.nx_graph, f1, f2) d = self.nuc_bp_dists[f1, f2] if d < min_bp: min_bp = d if d > max_bp: max_bp = d self._elem_bp_dists[(e1, e2)] = (min_bp, max_bp) self._elem_bp_dists[(e2, e1)] = (min_bp, max_bp) return (min_bp, max_bp)
# Free functions def _seq_of_Ns_from_db(dotbracket): """ Get a sequence containing only 'N'-characters with the same length and the same position of cutpoints as the dotbracketstring. :param dotbracket: A string, optionally containing '&' to indicate seperate cofolded structures. """ seq = [] for substr in dotbracket.split('&'): seq.append("N" * len(substr)) return "&".join(seq) def _cleaned_bg(bg, dissolve_length_one_stems=False, remove_pseudoknots=False): """ Return a new BulgeGraph with a cleaned secondary structure or a reference to the original BulgeGraph. This function performs the cleaning operations requested by the flags dissolve_length_one_stems and remove_pseudoknots. :param dissolve_length_one_stems: If true, all stems with only a single basepair are removed from the structure. :param remove_pseudoknots: If true, nested2knotted is used to generate a pseudoknot-free structure by removing conflicting basepairs. :returns: A modified copy of or a reference to the original BulgeGraph """ log.debug("Cleaning BG with type %s", type(bg).__name__) bps_to_remove = [] if dissolve_length_one_stems: bps_to_remove.extend(bg.length_one_stem_basepairs()) if remove_pseudoknots: bps_to_remove.extend(bg.pseudoknotted_basepairs( ignore_basepairs=bps_to_remove)) if bps_to_remove: log.info("Recreating without the following " "basepairs: %s", bps_to_remove) new_tuples = bg.to_pair_tuples(bps_to_remove) c = _BulgeGraphConstruction(new_tuples) # Create and init new object with only the Structure changed. return type(bg)(c, bg.seq, bg.name, bg.infos) else: log.debug("Cleaning bg is no-op, returning bg with defines %s", bg.defines) return bg

Contents