Source code for RNAsketch.RNARedPrintSampler

from __future__ import print_function

'''
    RNARedPrintSampler.py: Class that uses system calls to sample sequences with the RNARedPrint program. It implements the same interface as the DependencyGraph of RNAblueprint. This makes it possible to use it within RNAsketch to optimize sequences.
'''

__author__ = "Stefan Hammer"
__copyright__ = "Copyright 2017"
__version__ = "0.1"
__maintainer__ = "Stefan Hammer"
__email__ = "s.hammer@univie.ac.at"

from distutils.dir_util import copy_tree
import os
import sys
import uuid
import shutil
import timeit
import subprocess as sp
import math
from RNAsketch import *

import numpy as np
import re
import scipy.stats as stats

from Structure import RNAStructure

[docs]class RPSampler(object): ''' RPSampler is a python wrapper for RNARedPrint and implements a similar interface to the RNAblueprint DependencyGraphMT object. :param structures: List of Dot-bracket structure strings :param constraint: Sequence constraints in IUPACK notation :param model: String specifying the model, can be 'uniform', 'nussinov', 'basepairs' or 'stacking' :param weights: List of integers specifying the boltzmann weight for the target structures, range in ]0, +infinity] :param gcweight: Integer specifying the GC-weight, ]0, +infinity] :param temperature: Temperature for the folding predictions (default: 37.0 degree celsius) :param stacksize: Size of one sequence sampling batch (default: 1000 sequences) :param StopConstruct: Bool specifying if we want to benchmark the construction time. Measured time can be obtained from construction_time after construction (default: False) :param RedPrintFolder: Location of the RNARedPrint project folder (default: './RNARedPrint/') :param debug: Bool to print debug statements (default: False) ''' def __init__(self, structures, constraint='', model='nussinov', weights=[], gcweight=1, temperature=37.0, stacksize=1000, StopConstruct=False, RedPrintFolder = None, debug=False): self._structures = structures self._constraint = constraint self.model = model self._samplestack = [] self._stacksize = stacksize self.sample_time = 0 self.construction_time = 0 self.number_of_connected_components = 1 self._current = 0 self.gcweight = gcweight self._temperature = temperature self.weights = weights self._debug = debug if not RedPrintFolder: RedPrintFolder = self._get_path('') if not RedPrintFolder: RedPrintFolder = './RNARedPrint/' # copy RNARedprint binary to temp toDirectory for multithread self._copy_RNAredprint_folder(RedPrintFolder) # call RNAredprint to get construction time if StopConstruct: self._call_RNAredprint(0) def __del__(self): shutil.rmtree(self._RedPrintFolder, ignore_errors=True) def _get_path(self, subfolder): ''' Try to detect the RNARedPrint project folder location :param subfolder: string specifying a subfolder, e.g. 'bin/' :return: string with the complete project path ''' if 'REDPRINT' in os.environ: return os.environ['REDPRINT'] + subfolder else: return subfolder @property def model(self): ''' String specifying the sampling model. :return: string, can be 'uniform', 'nussinov', 'basepairs' or 'stacking' ''' return self._model @model.setter def model(self, value): modelarg = None if value == 'uniform': modelarg = 0 elif value == 'nussinov': modelarg = 1 elif value == 'basepairs': modelarg = 2 elif value == 'stacking': modelarg = 3 else: raise ValueError('Modelarg not set!') self._modelarg = modelarg self._model = value @property def weights(self): ''' List of integers specifying the boltzmann weight for the target structures, range in ]0, +infinity] ''' return self._weights @weights.setter def weights(self, value): # standard weights for boltzmann sampling if not value: value = [math.exp(1/((self._temperature + 273.15)*0.00198717))] * len(self._structures) # check amount of weights if len(value) != len(self._structures): raise ValueError('Amount of weights must be equal to the amount of structures') for w in value: if w <= 0: raise ValueError('Weights must be in range ]0, +infinity]') self._weights = value @property def gcweight(self): ''' Integer specifying the GC-weight, ]0, +infinity] ''' return self._gcweight @gcweight.setter def gcweight(self, value): if value <= 0: raise ValueError('GC weight must be in range ]0, +infinity]') else: self._gcweight = value
[docs] def dump_new_stack(self): ''' Draws a sample from RNARedPrint and returns the list of generated sequences at once. The variable stacksize specifies the amount of sequences. :return: List of RNA sequence strings ''' self._samplestack = [] self._current = 0 newseqs, energies = self._call_RNAredprint(number=(self._stacksize)) return newseqs, energies
[docs] def sample(self): ''' Sample a new sequence. :return: Returns 0 for compatibility to DependencyGraphMT. ''' self._current += 1 return 0
[docs] def sample_clocal(self): ''' Compatibility to the DependencyGraphMT object, calls sample() internally. ''' self.sample()
[docs] def sample_plocal(self): ''' Compatibility to the DependencyGraphMT object, calls sample() internally. ''' self.sample()
[docs] def get_sequence(self): ''' Get the sampled sequence as a string. :return: RNA Sequence string in IUPACK notation ''' if len(self._samplestack) <= self._current: if self._stacksize <= self._current: self._stacksize += 1000 self._get_new_sample() return self._samplestack[self._current]
[docs] def revert_sequence(self, amount): ''' Compatibility to the DependencyGraphMT object, does nothing internally. ''' #self._current -= amount pass
[docs] def set_sequence(self, seq): ''' Compatibility to the DependencyGraphMT object, does nothing internally. ''' pass
[docs] def set_history_size(self, size): ''' Compatibility to the DependencyGraphMT object, does nothing internally. ''' pass
#if self._stacksize != size: # self._stacksize = size # self._get_new_sample() def _get_new_sample(self): # generate list of sequences with RNAredprint if (self._debug): print('# getting new sequences: ', self._stacksize, len(self._samplestack), str(self._stacksize - len(self._samplestack))) newseqs, _ = self._call_RNAredprint(number=(self._stacksize - len(self._samplestack))) self._samplestack.extend(newseqs) def _copy_RNAredprint_folder(self, RedPrintFolder): # copy subdirectory example fromDirectory = RedPrintFolder toDirectory = "/tmp/RNARedPrint-" + str(uuid.uuid4()) self._RedPrintFolder = toDirectory copy_tree(fromDirectory, toDirectory) def _call_RNAredprint(self, number=1000): # resolve PKs structuresNoPK = {} weights = {} for i, s in enumerate(self._structures): structuresNoPK[s] = RNAStructure(s).resolvePKs() weights[s] = self._weights[i] # create structure and weight list structures_cmd = [] weights_cmd = [] for struct, NoPKs in structuresNoPK.items(): for NoPK in NoPKs: structures_cmd.append('"'+str(NoPK)+'"') weights_cmd.append(weights[struct]) cmd = ['./RNARedPrint'] + structures_cmd + ['--num', str(number), '--model', str(self._modelarg), '-gcw', str(self._gcweight), '--weights', ','.join(map(str, weights_cmd))] #cmd=['ls'] cmdstring = " ".join(cmd) if (self._debug): print("# ", cmdstring) #cmd = ['RNAblueprint', '-n', str(number)] #stdin = '\n'.join(structures+[constraint]) #p = sp.Popen(cmd, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE) start = timeit.default_timer() p = sp.Popen(cmdstring, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE, shell=True, cwd=self._RedPrintFolder + "/bin/") #out, err = p.communicate(input=stdin) out, err = p.communicate(input=None) if err: exit(err) if number == 0: self.construction_time = timeit.default_timer() - start else: self.sample_time += timeit.default_timer() - start if (self._debug): print("# ", out, err) lines = out.splitlines() seqpattern = re.compile(r"^[AUGC]+") strucpattern = re.compile(r"^[\.\(\)]+$") epattern = re.compile(r"E(\d)=(-?[\d]+\.?[\d]*)") structures = [] sequences = [] energies = [] for l in lines: s = re.search(strucpattern, l, flags=0) if s: structures.append(s.group(0)) else: s = re.search(seqpattern, l, flags=0) if s: sequences.append(s.group(0)) struct_energy = {} for e in re.finditer(epattern, l, flags=0): struct_energy[structures[int(e.group(1))-1]] = float(e.group(2)) energy = {} for i, s in enumerate(self._structures): energy[i] = 0 for sNoPKs in structuresNoPK[s]: energy[i] += struct_energy[sNoPKs] energies.append(energy) return sequences, energies