forgi 2.0.0 documentation

Contents

Source code for forgi.utilities.stuff

from builtins import next
from builtins import range
import itertools as it
import contextlib
import random
import shutil
import tempfile as tf
import collections as col
import sys
import subprocess

import logging
log = logging.getLogger(__name__)

import forgi
import forgi.utilities.debug as cud
from .exceptions import GraphConstructionError


bracket_left = "([{<ABCDEFGHIJKLMNOPQRSTUVWXYZ"
bracket_right = ")]}>abcdefghijklmnopqrstuvwxyz"


[docs]def is_string_type(stri): if sys.version_info < (3,): return isinstance(stri, (str, unicode)) else: return isinstance(stri, str)
[docs]def get_version_string(): """ If installed from github, print the version obtained by `git describe` On my local machine, when run within a git directory, get the commit hash directly using `git describe` """ try: # Installed with setup.py from a gitrepo label = "forgi {}".format(forgi.__complete_version__) except: try: # On my local machine, run from git directory. repo = subprocess.check_output( ["git", "rev-parse", "--show-toplevel"]).decode('ascii') if "forgi" not in repo: raise OSError("") label = subprocess.check_output( ["git", "describe", "--dirty"]).decode('ascii') label = "forgi {}".format(label) except OSError: # In production, use the version variable label = "forgi {}".format(forgi.__version__) return label
# COVERAGE: Not used
[docs]def grouped(iterable, n): ''' Return a list of every n elements in iterable. http://stackoverflow.com/questions/5389507/iterating-over-every-two-elements-in-a-list s -> (s0,s1,s2,...sn-1), (sn,sn+1,sn+2,...s2n-1), (s2n,s2n+1,s2n+2,...s3n-1), ... ''' return zip(*[iter(iterable)] * n)
[docs]def merge_intervals(intervals, diff=0): ''' Take a set of intervals, and combine them whenever the endpoints match. I.e. [(42,47), (55,60), (60,63), (1,9), (63,71)] Should yield [(1,9),(42,47), (55,71)] There should be no overlapping intervals. @param intervals: A set of tuples indicating intervals @return: A list of merged intervals ''' intervals.sort() iter_intervals = iter(intervals) # get the first interval curr_interval = list(next(iter_intervals)) merged_intervals = [] for i in iter_intervals: if abs(i[0] - curr_interval[1]) <= diff: # the start of this interval is equal to the end of the # current merged interval, so we merge it curr_interval[1] = i[1] else: # start a new interval and add the current merged one # to the list of intervals to return merged_intervals += [curr_interval] curr_interval = list(i) merged_intervals += [curr_interval] return merged_intervals
[docs]def gen_random_sequence(l): ''' Generate a random RNA sequence of length l. ''' return "".join([random.choice(['A', 'C', 'G', 'U']) for i in range(l)])
[docs]@contextlib.contextmanager def make_temp_directory(): ''' Yanked from: http://stackoverflow.com/questions/13379742/right-way-to-clean-up-a-temporary-folder-in-python-class ''' temp_dir = tf.mkdtemp() try: yield temp_dir finally: shutil.rmtree(temp_dir)
[docs]def insert_into_stack(stack, i, j): # print "add", i,j k = 0 while len(stack[k]) > 0 and stack[k][len(stack[k]) - 1] < j: k += 1 stack[k].append(j) return k
[docs]def delete_from_stack(stack, j): # print "del", j k = 0 while len(stack[k]) == 0 or stack[k][len(stack[k]) - 1] != j: k += 1 stack[k].pop() return k
[docs]def pairtable_to_dotbracket(pt): """ Converts arbitrary pair table array (ViennaRNA format) to structure in dot bracket format. """ stack = col.defaultdict(list) seen = set() res = "" for i in range(1, pt[0] + 1): if pt[i] != 0 and pt[i] in seen: raise ValueError('Invalid pairtable contains duplicate entries') seen.add(pt[i]) if pt[i] == 0: res += '.' else: # '(' check if we can stack it... if pt[i] > i: res += bracket_left[insert_into_stack(stack, i, pt[i])] else: # ')' res += bracket_right[delete_from_stack(stack, i)] return res
[docs]def inverse_brackets(bracket): res = col.defaultdict(int) for i, a in enumerate(bracket): res[a] = i return res
[docs]def dotbracket_to_pairtable(struct): """ Converts arbitrary structure in dot bracket format to pair table (ViennaRNA format). """ if len(struct) == 0: raise ValueError("Cannot convert empty structure to pairtable") pt = [0] * ((len(struct) + 1) - struct.count("&")) pt[0] = len(struct) - struct.count("&") stack = col.defaultdict(list) inverse_bracket_left = inverse_brackets(bracket_left) inverse_bracket_right = inverse_brackets(bracket_right) i = 0 for a in struct: if a == '&': continue i += 1 # print i,a, pt log.debug("Parsing bracket %r", a) if a == ".": pt[i] = 0 else: if a in inverse_bracket_left: stack[inverse_bracket_left[a]].append(i) else: assert a in inverse_bracket_right if len(stack[inverse_bracket_right[a]]) == 0: raise ValueError('Too many closing brackets!') j = stack[inverse_bracket_right[a]].pop() pt[i] = j pt[j] = i if len(stack[inverse_bracket_left[a]]) != 0: raise ValueError('Too many opening brackets!') return pt
[docs]def pairtable_to_tuples(pt): ''' Convert a pairtable to a list of base pair tuples. i.e. [4,3,4,1,2] -> [(1,3),(2,4),(3,1),(4,2)] :param pt: A pairtable :return: A list paired tuples ''' pt = iter(pt) # get rid of the first element which contains the length # of the sequence. We'll figure it out after the traversal next(pt) tuples = [] for i, p in enumerate(pt): tuples += [(i + 1, p)] return tuples
[docs]def tuples_to_pairtable(pair_tuples, seq_length=None): ''' Convert a representation of an RNA consisting of a list of tuples to a pair table: i.e. [(1,3),(2,4),(3,1),(4,2)] -> [4,3,4,1,2] :param tuples: A list of pair tuples :param seq_length: How long is the sequence? Only needs to be passed in when the unpaired nucleotides aren't passed in as (x,0) tuples. :return: A pair table ''' if seq_length is None: max_bp = max([max(x) for x in pair_tuples]) else: max_bp = seq_length pt = [0] * (max_bp + 1) pt[0] = max_bp for tup in pair_tuples: pt[tup[0]] = tup[1] return pt
[docs]def pairtable_to_elements(pt, level, i, j): ''' Convert a pair table to a list of secondary structure elements: [['s',1,[2,3]] The 's' indicates that an element can be a stem. It can also be an interior loop ('i'), a hairpin loop ('h') or a multiloop ('m') The second number (1 in this case) indicates the depth or how many base pairs have to be broken to get to this element. Finally, there is the list of nucleotides which are part of of this element. ''' elements = [] u5 = [i - 1] u3 = [j + 1] if (i > j): return [] # iterate over the unpaired regions on either side # this is either 5' and 3' unpaired if level == 0 # or an interior loop or a multiloop while (pt[i] == 0): u5.append(i) i += 1 while (pt[j] == 0): u3.append(j) j -= 1 if (i > j): # hairpin loop or one large unpaired molecule u5.append(i) if (level == 0): return [['e', level, sorted(u5)]] else: # check to see if we have chain breaks due # to multiple strands in the input external = False left = [] right = [] for k in range(0, len(u5)): if (external): right.append(u5[k]) else: left.append(u5[k]) return [['h', level, sorted(u5)]] if (pt[i] != j): # multiloop m = u5 k = i # the nucleotide before and the starting nucleotide m.append(k) while (k <= j): # recurse into a stem elements += pairtable_to_elements(pt, level, k, pt[k]) # add the nucleotides between stems m.append(pt[k]) k = pt[k] + 1 while (pt[k] == 0 and k <= j): m.append(k) k += 1 m.append(k) m.pop() m += u3 if (len(m) > 0): if (level == 0): elements.append(['e', level, sorted(m)]) else: elements.append(['m', level, sorted(m)]) return elements if (pt[i] == j): # interior loop u5.append(i) u3.append(j) combined = u5 + u3 if len(combined) > 4: if (level == 0): elements.append(['e', level, sorted(u5 + u3)]) else: elements.append(['i', level, sorted(u5 + u3)]) s = [] # go through the stem while (pt[i] == j and i < j): # one stem s.append(i) s.append(j) i += 1 j -= 1 level += 1 u5 = [i - 1] u3 = [j + 1] elements.append(['s', level, sorted(s)]) return elements + pairtable_to_elements(pt, level, i, j)
[docs]def bpseq_to_tuples_and_seq(bpseq_str): """ Convert a bpseq string to a list of pair tuples and a sequence dictionary. The return value is a tuple of the list of pair tuples and a sequence string. :param bpseq_str: The bpseq string :return: ([(1,5),(2,4),(3,0),(4,2),(5,1)], 'ACCAA') """ lines = bpseq_str.split('\n') seq = [] tuples = [] pairing_partner = {} for line in lines: parts = line.split() if len(parts) == 0: continue (t1, s, t2) = (int(parts[0]), parts[1], int(parts[2])) if t2 in pairing_partner and t1 != pairing_partner[t2]: raise GraphConstructionError("Faulty bpseq string. {} pairs with {}, " "but {} pairs with {}".format(t2, pairing_partner[t2], t1, t2)) if t1 in pairing_partner and t2 != pairing_partner[t1]: raise GraphConstructionError("Faulty bpseq string. {} pairs with {}, " "but {} pairs with {}".format(pairing_partner[t1], t1, t1, t2)) pairing_partner[t1] = t2 if t2 != 0: pairing_partner[t2] = t1 tuples += [(t1, t2)] seq += [s] seq = "".join(seq).upper().replace('T', 'U') return (tuples, seq)
[docs]def renumber_bpseq(bpseq_triples): """ :param bpseq_triples: A list of triples (from, res, to) """ out = [] mapping = {} for i, triple in enumerate(bpseq_triples): mapping[triple[0]] = i + 1 for triple in bpseq_triples: from_, res, to_ = triple if to_ not in [0, '0']: to_ = mapping[to_] from_ = mapping[from_] out.append("{} {} {}".format(from_, res, to_)) return "\n".join(out)

Contents