forgi 2.0.0 documentation


Source code for forgi.threedee.classification.aminor

from __future__ import print_function, division, absolute_import

import pkgutil
import logging
import warnings
import json

try:
    from io import StringIO
except ImportError:
    from StringIO import StringIO

import numpy as np

import pandas as pd

from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
from sklearn.neighbors import KernelDensity
from sklearn.metrics import confusion_matrix

import forgi.threedee.utilities.vector as ftuv
import forgi.threedee.utilities.graph_pdb as ftug

import matplotlib.pyplot as plt

log = logging.getLogger(__name__)


# Maximal loop-stem distance (in Angstrom) that is considered as a potential interaction.
CUTOFFDIST = 30
# The distance (in Angstrom) is divided by this factor, so it is on a scale
# comparable to the angles (in radians).
ANGLEWEIGHT = 10

"""
This module contains code for classifying a coarse-grained geometry as A-Minor
interaction.

.. warning:: This is intended for low-resolution data, as it does not take the
             orientation of individual bases into account. If you have all-atom
             data, dedicated tools like FR3D will be more accurate.

If you just want to classify interactions in a given structure, you only need
the functions `classify_interaction` or `all_interactions`.

To train your own version of the classifier, to modify its parameters, or to
perform cross-validation, use the AMinorClassifier.

Access the default training data with the get_trainings_data(loop_type)
function.
"""


P_INTERACTION = 0.05  # 0.03644949066213922

################# Just classify my structure ############################


[docs]def loop_potential_interactions(cg, loop, domain=None):
    """
    Iterate over all stems and return those loop-stem pairs that will be
    passed to the AMinor classification and are not ruled out beforehand.
    """
    geos = []
    labels = []
    if domain is not None:
        stems = (s for s in domain if s[0] == "s")
    else:
        stems = cg.stem_iterator()
    for stem in stems:
        if stem in cg.edges[loop]:
            continue
        # To save computation time
        if not ftuv.elements_closer_than(cg.coords[loop][0],
                                         cg.coords[loop][1],
                                         cg.coords[stem][0],
                                         cg.coords[stem][1],
                                         CUTOFFDIST):
            continue
        geos.append(get_relative_orientation(cg, loop, stem))
        labels.append([loop, stem])
    geos = np.array(geos)
    if len(geos) > 0:
        geos[:, 0] /= ANGLEWEIGHT
    return geos, labels

[docs]def potential_interactions(cg, loop_type, domain=None):
    """
    :returns: A tuple `geos`, `labels`.
              `geos` is an Nx3 array, where N is the number of potential
              interactions and the inner dimension is dist, angle1, angle2.
              `geos` can be passed to the AMinor classifier.
              `labels` is an Nx2 array, where the inner dimension is
              loopname, stemname.
    """
    labels = []
    geos = []
    for loop in cg.defines:
        if domain is not None and loop not in domain:
            continue
        if loop[0] != loop_type:
            continue
        if 'A' not in "".join(cg.get_define_seq_str(loop)):
            continue
        loop_geos, loop_labels = loop_potential_interactions(cg, loop, domain)
        geos.extend(loop_geos)
        labels.extend(loop_labels)
    return np.array(geos), np.array(labels)

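# Example (an illustrative sketch, not part of the original forgi source):
# feeding the output of potential_interactions into a classifier by hand.
# The file name "example.cg" is hypothetical; _get_default_clf (defined
# further below) loads the pretrained classifier for the given loop type.
def _example_potential_interactions():
    import forgi
    cg, = forgi.load_rna("example.cg")
    geos, labels = potential_interactions(cg, "h")  # hairpin loops
    # geos has shape (N, 3): distance/ANGLEWEIGHT, angle1, angle2
    # labels has shape (N, 2): loop name, stem name
    if len(geos):
        clf = _get_default_clf("h")
        for (loop, stem), prob in zip(labels, clf.predict_proba(geos)):
            print("%s-%s: interaction probability %.2f" % (loop, stem, prob))
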
[docs]def all_interactions(cg, clfs=None):
    """
    Get a list of all predicted A-Minor interactions in a cg-object.

    This is more efficient than using classify_interaction iteratively,
    because it uses vectorization.

    :param clfs: A dictionary {loop_type: AMinorClassifier} where loop_type
                 is one of "i", "h", "m".
                 If clfs is None or a key is missing, uses the default
                 pretrained classifier.
    :returns: A list of tuples (loop, stem)
    """
    interactions = []
    for loop_type in ["i", "h"]:
        if clfs is not None and loop_type not in clfs:
            warnings.warn("No classifier specified for loop type %s (only for %s), "
                          "using default classifier."
                          % (loop_type, ",".join(clfs.keys())))
        if clfs is None or loop_type not in clfs:
            clf = _get_default_clf(loop_type)
        else:
            clf = clfs[loop_type]
        geos, labels = potential_interactions(cg, loop_type)
        interactions.extend(
            _classify_potential_interactions(clf, geos, labels))
    return interactions

[docs]def classify_interaction(cg, loop, stem=None, clf=None):
    """
    Return the interaction pair (loop, stem) as a tuple, or False if no
    interaction exists.

    :param cg: The CoarseGrainRNA
    :param loop: The loop name, e.g. "i0"
    :param stem: A stem name, e.g. "s0", to consider interactions of loop
                 with any other stem as False and only an interaction with
                 this stem as True.

                 .. warning:: Our statistical modelling allows for at most
                              1 interaction per loop. This means that we
                              have to calculate the interaction probability
                              of this loop with all stems, even if stem is
                              given. If another stem has a higher
                              interaction probability than the given stem,
                              this function will return False, regardless of
                              the interaction probability between stem and
                              loop.

    :param clf: A trained AMinorClassifier or None (use the default
                classifier for the loop type)

                .. note:: `all_interactions` is more efficient, if you are
                          interested in all loop-stem pairs.

    :returns: A tuple (loop, stem) or False
    """
    if clf is None:
        clf = _get_default_clf(loop[0])
    geos, labels = loop_potential_interactions(cg, loop)
    interactions = _classify_potential_interactions(clf, geos, labels)
    # Convert the labels to tuples, so they can be compared and returned
    # as documented.
    interactions = [tuple(i) for i in interactions]
    if not interactions:
        return False
    if stem is not None and interactions != [(loop, stem)]:
        return False
    interaction, = interactions
    return interaction

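# Example (an illustrative sketch, not part of the original forgi source)
# of the stem-argument semantics described in the docstring above. The
# element names and the file name are hypothetical.
def _example_classify_interaction():
    import forgi
    cg, = forgi.load_rna("example.cg")
    # Best interaction partner of loop "i0", or False if it interacts with no stem:
    print(classify_interaction(cg, "i0"))
    # ("i0", "s0") only if "s0" is also the most probable partner of "i0";
    # False if "i0" does not interact at all or prefers a different stem:
    print(classify_interaction(cg, "i0", "s0"))
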
############# Roll your own classifier ###########################

[docs]def get_trainings_data(loop):
    loop_type = loop[0]
    return _DefaultClf._get_data(loop_type)

[docs]class AMinorClassifier(BaseEstimator, ClassifierMixin):
    """
    A classifier that predicts A-Minor interactions based on the following
    formula:

    .. math:: P(I|geo)=f(geo|I)/f(geo)*P(I)

    where :math:`f` is a probability density, :math:`P` is a probability
    and :math:`I` means interaction.

    Since there are no interactions at distances above 30 Angstrom, since
    interactions require an A in the loop sequence, and since estimating
    the numerator and denominator separately could lead to probabilities
    greater than 1, we use the following formula:

    .. math:: P(I|(geo, d<30, A \\in seq)) = num/denom

    where

    .. math::

        num = f(geo|I)*P(I|(d<30, A \\in seq))

        denom = num + f(geo|(\\not I, d<30))*(1-P(I|(d<30, A \\in seq)))

    We estimate the probability densities as kernel density estimates.
    For :math:`f(geo|I)` we use annotations created by FR3D-searches for
    all 4 types of A-minor motifs.
    We assume that :math:`f(geo|(\\not I, A \\in seq)) == f(geo|(\\not I, A \\notin seq))`
    and that FR3D might miss some true interactions. Thus we estimate
    :math:`f(geo|(\\not I, A \\in seq, d<30))` as
    :math:`f(geo|(\\not I, A \\notin seq, d<30))`.

    Since both densities are normalized, it does not matter that we use a
    different number of datapoints for their estimation.

    We estimate :math:`P(I|(d<30, A \\in seq))` as the number of occurrences
    where a loop with an A is in an A-minor interaction, over the number of
    all loop-stem pairs with less than 30 Angstrom distance and an A in the
    sequence.
    """

    def __init__(self, kernel="linear", bandwidth=0.3, symmetric=True,
                 p_I=P_INTERACTION):
        self.p_I = p_I
        self.symmetric = symmetric
        self.kernel = kernel
        self.bandwidth = bandwidth

[docs]    def fit(self, X, y):
        """
        Train the model.

        :param X: An Nx3 array, where the features are
                  distance(Angstrom)/10, angle1(rad), angle2(rad)

                  The **distance** is the closest distance between the two
                  line segments (i.e. coarse-grained elements).

                  **angle1** is the angle between the line along the stem
                  vector and the line along the shortest connection between
                  the two elements. As an angle between two straight lines,
                  it is defined between 0 and 90 degrees.

                  **angle2** is the angle between the connecting vector
                  (pointing from the stem to the loop), projected onto the
                  plane normal to the stem direction, and the twist vector
                  (location of the minor groove) at the point closest to the
                  interaction. As an angle between two vectors, it is
                  defined between 0 and 180 degrees.
        :param y: An array of length N. 0 means no interaction,
                  1 means interaction.
        """
        # Check that X and y have correct shape
        X, y = check_X_y(X, y)
        log.info("Training data has shape %s", X.shape)
        log.info("We have %s known interactions", sum(y))
        if X.shape[1] != 3:
            raise TypeError(
                "Expect exactly 3 features, found {}".format(X.shape[1]))
        if not all(yi in [0, 1] for yi in y):
            raise ValueError("y should only contain the values 1 and 0")
        ame = X[np.where(y)]
        non_ame = X[np.where(y == 0)]
        if self.symmetric:
            ame = self._make_symmetric(ame)
            non_ame = self._make_symmetric(non_ame)
        log.info("Fitting. First positive sample: %s", X[np.where(y)][0])
        self.ame_kde_ = KernelDensity(
            kernel=self.kernel, bandwidth=self.bandwidth).fit(ame).score_samples
        self.non_ame_kde_ = KernelDensity(
            kernel=self.kernel, bandwidth=self.bandwidth).fit(non_ame).score_samples
        self.X_ = X
        self.y_ = y

    @staticmethod
    def _make_symmetric(geos):
        """
        Make the training data symmetric around those bounds of the angular
        variables where we expect a high density.

        Kernel density estimation does not work too well out of the box on
        bounded support: samples close to the boundary create a significant
        density outside the support.
        Due to symmetry considerations of angular data, we can avoid this
        issue by adding mirrored datapoints the following way:
        mirror all datapoints around the plane angle1 = 90 degrees (both
        angles between the two lines are equivalent). Mirroring around the
        plane angle2 = 0 degrees (both rotational directions are equivalent)
        is not needed, because negative values of angle2 are allowed.

        Adding the mirrored datapoints avoids bias near the boundary.
        """
        geos = np.concatenate([geos,
                               [(d, np.pi - a1, a2) for d, a1, a2 in geos]])
        # We allow negative angles for angle2, so there is no need to make
        # it symmetric.
        # geos = np.concatenate([geos,
        #                        [(d, a1, -a2) for d, a1, a2 in geos]])
        return geos

[docs]    def predict(self, X):
        return self.predict_proba(X) > 0.5

[docs]    def score(self, X, y):
        """
        A weighted average of sensitivity (weight 0.8) and
        specificity (weight 0.2).
        """
        y_pred = self.predict(X)
        tn, fp, fn, tp = confusion_matrix(y, y_pred).ravel()
        specificity = tn / (tn + fp)
        sensitivity = tp / (tp + fn)
        return 0.8 * sensitivity + 0.2 * specificity

[docs]    def predict_proba(self, X):
        check_is_fitted(self, ['ame_kde_', 'non_ame_kde_'])
        X = check_array(X)
        log.debug("Predicting for %s", X)
        numerator = np.exp(self.ame_kde_(X)) * self.p_I
        denom = numerator + np.exp(self.non_ame_kde_(X)) * (1 - self.p_I)
        with warnings.catch_warnings():  # division by 0
            warnings.simplefilter("ignore", RuntimeWarning)
            return np.nan_to_num(numerator / denom)

[docs]    def set_params(self, **kwargs):
        super(AMinorClassifier, self).set_params(**kwargs)
        # If the classifier was already fitted, we must propagate the
        # parameter changes to the child-KDEs by refitting the same data.
        if hasattr(self, "X_"):
            self.fit(self.X_, self.y_)
        return self

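# Example (an illustrative sketch, not part of the original forgi source):
# retraining and cross-validating the classifier on the bundled training
# data and then using it for hairpin loops. The parameter values and the
# file name "example.cg" are only placeholders.
def _example_train_classifier():
    from sklearn.model_selection import cross_val_score
    X, y = get_trainings_data("h")                 # bundled hairpin data
    clf = AMinorClassifier(kernel="linear", bandwidth=0.3)
    # cross_val_score uses AMinorClassifier.score, i.e. the weighted
    # average of sensitivity and specificity defined above.
    print(cross_val_score(clf, X, y, cv=5))
    clf.fit(X, y)
    import forgi
    cg, = forgi.load_rna("example.cg")
    # The fitted classifier replaces the default one for hairpin loops:
    print(all_interactions(cg, clfs={"h": clf}))
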
############## get orientation #########################
[docs]def get_loop_flexibility(cg, loop):
    """
    Unused. We tried to see if the length of the loop vs. the number of
    bases had an effect on the interaction probability.
    """
    assert loop[0] == "i"
    d = cg.define_a(loop)
    nt1, nt2 = d[1] - d[0], d[3] - d[2]
    max_nts = max(nt1, nt2)
    loop_length = ftuv.magnitude(cg.coords.get_direction(loop))
    # The number of nucleotide links (or phosphate groups) per Angstrom.
    # 9.2 is the sum of average bond lengths for bonds in the nucleotide
    # linkage. Bond lengths taken from: DOI: 10.1021/ja9528846
    # A value of 1 means all bonds are fully stretched.
    # Ideal helices have a value of 4.41.
    # A value below 1 should be rare.
    # Higher values mean higher flexibility.
    return max_nts / loop_length * 9.2

[docs]def get_relative_orientation(cg, loop, stem):
    '''
    Return how loop is related to stem in terms of three parameters.

    The stem is the receptor of a potential A-Minor interaction, whereas
    the loop is the donor.

    The 3 parameters are:

        1.  Distance between the closest points of the two elements
        2.  The angle between the stem and the vector between the two
        3.  The angle between the minor groove of the stem and the
            projection of the vector between stem and loop onto the plane
            normal to the stem direction.
    '''
    point_on_stem, point_on_loop = ftuv.line_segment_distance(cg.coords[stem][0],
                                                              cg.coords[stem][1],
                                                              cg.coords[loop][0],
                                                              cg.coords[loop][1])
    conn_vec = point_on_loop - point_on_stem
    dist = ftuv.magnitude(conn_vec)
    angle1 = ftuv.vec_angle(cg.coords.get_direction(stem), conn_vec)
    # The direction of the stem vector is irrelevant, so
    # choose the smaller of the two angles between the two lines.
    if angle1 > np.pi / 2:
        angle1 = np.pi - angle1
    tw = cg.get_twists(stem)
    if dist == 0:
        angle2 = float("nan")
    else:
        if stem[0] != 's':
            raise ValueError(
                "The receptor needs to be a stem, not {}".format(stem))
        else:
            stem_len = cg.stem_length(stem)
            # Where along the helix our A-residue points to the minor groove.
            # This can be between residues. We express it as floating point
            # nucleotide coordinates: 0.0 means at the first basepair, while
            # 1.5 means between the second and the third basepair.
            pos = ftuv.magnitude(point_on_stem - cg.coords[stem][0]) / ftuv.magnitude(
                cg.coords.get_direction(stem)) * (stem_len - 1)
            # The vector pointing to the minor groove, even if we are not
            # at a virtual residue (pos is a float value).
            virt_twist = ftug.virtual_res_3d_pos_core(
                cg.coords[stem], cg.twists[stem], pos, stem_len)[1]
            # The projection of the connection vector onto the plane normal
            # to the stem direction.
            conn_proj = ftuv.vector_rejection(
                conn_vec, cg.coords.get_direction(stem))
            try:
                # Note: here the directions of both vectors are well defined,
                # so angles > 90 degrees make sense.
                angle2 = ftuv.vec_angle(virt_twist, conn_proj)
            except ValueError:
                if np.all(virt_twist == 0):
                    angle2 = float("nan")
                else:
                    raise
            # Furthermore, the sign of the second angle is meaningful.
            # We use a positive angle if the cross-product of the two
            # vectors points in the same direction as the stem vector and
            # a negative angle otherwise.
            cr = np.cross(virt_twist, conn_proj)
            sign = ftuv.is_almost_parallel(cr, cg.coords.get_direction(stem))
            # assert sign != 0, "{} vs {} not (anti) parallel".format(
            #     cr, cg.coords.get_direction(stem))
            angle2 *= sign
    return dist, angle1, angle2

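# Example (an illustrative sketch, not part of the original forgi source):
# computing the raw geometry descriptors for one loop-stem pair. The
# element names and the file name are hypothetical.
def _example_relative_orientation():
    import forgi
    cg, = forgi.load_rna("example.cg")
    dist, angle1, angle2 = get_relative_orientation(cg, "h0", "s0")
    print("distance: %.1f A, angle1: %.2f rad, angle2: %.2f rad"
          % (dist, angle1, angle2))
    # Note: the classifier expects the distance divided by ANGLEWEIGHT,
    # as done in loop_potential_interactions above.
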
############# Private ##########################################


def _get_masks(df, loop_type):
    lt_mask = df.loop_type == loop_type
    cutoff_mask = df.dist < CUTOFFDIST
    in_support = lt_mask & cutoff_mask
    has_a = df.loop_sequence.str.contains("A")
    is_i = df.is_interaction
    mask_ame = in_support & is_i & has_a
    mask_non_ame = in_support & np.invert(is_i | has_a)
    mask_non_fred = in_support & np.invert(is_i) & has_a
    return mask_ame, mask_non_ame, mask_non_fred


def _classify_potential_interactions(clf, geos, labels):
    if len(geos) == 0:
        return []
    score = clf.predict_proba(geos)
    y = score > 0.5
    log.info("Classifying %s", labels)
    log.info("score %s", score)
    log.info("# hits (not unique)=%s", sum(y))
    # We modelled the probabilities in a way that each loop can have only
    # 1 interaction. So for multiple interactions, use only the best one.
    best_interactions = {}
    for i, label in enumerate(labels):
        loop = label[0]
        if score[i] < 0.5:
            continue
        if loop not in best_interactions or score[i] > best_interactions[loop][0]:
            best_interactions[loop] = (score[i], label)
    log.info("%s unique interacting loops.", len(best_interactions))
    log.debug(best_interactions)
    return [best_i[1] for best_i in best_interactions.values()]


class _DefaultClf(object):
    """Just to put global (cached) variables into a separate scope."""
    _clfs = {}  # The pretrained classifiers, lazily loaded.
    _geo_df = None

    @classmethod
    def get_default_clf(cls, loop_type):
        if loop_type not in cls._clfs:
            clf = AMinorClassifier()
            rawdata = pkgutil.get_data(
                'forgi', 'threedee/data/aminor_params.json')
            params = json.load(StringIO(rawdata.decode("ascii")))
            clf.set_params(**params[loop_type])
            X, y = cls._get_data(loop_type)
            clf.fit(X, y)
            cls._clfs[loop_type] = clf
        return cls._clfs[loop_type]

    @classmethod
    def get_dataframe(cls):
        if cls._geo_df is None:
            rawdata = pkgutil.get_data(
                'forgi', 'threedee/data/aminor_geometries.csv')
            cls._geo_df = pd.read_csv(
                StringIO(rawdata.decode("ascii")), comment="#", sep=" ")
        return cls._geo_df

    @classmethod
    def _get_data(cls, loop_type):
        df = cls.get_dataframe()
        return df_to_data_labels(df, loop_type)


def _get_default_clf(loop):
    """
    :param loop: An element name, e.g. "i0", or a loop type, e.g. "i"
    """
    loop_type = loop[0]
    return _DefaultClf.get_default_clf(loop_type)

[docs]def df_to_data_labels(df, loop_type):
    """
    Create the training data as two arrays X and y (or data and labels)
    from the initial dataframe.

    :returns: X, y
    """
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        df["loop_name"] = df.pdb_id + \
            df.interaction.str.split("-").apply(lambda x: x[0])
    mask_ame, mask_non_ame, mask_non_fred = _get_masks(df, loop_type)
    data = pd.concat([df[mask_ame].drop_duplicates(["loop_name"]),
                      df[mask_non_ame]])
    data = np.array([[x.dist / ANGLEWEIGHT, x.angle1, x.angle2, x.is_interaction]
                     for x in data.itertuples()])
    return data[:, :3], data[:, 3]
