Source code for pygwas.boundary

import collections
from exceptions import InvalidBoundarySpec
from . import BuildReportLine
import os

__copyright__ = "Todd Edwards, Chun Li & Eric Torstenson"
__license__ = "GPL3.0"
#     This file is part of pyGWAS.
#
#     pyGWAS is free software: you can redistribute it and/or modify
#     it under the terms of the GNU General Public License as published by
#     the Free Software Foundation, either version 3 of the License, or
#     (at your option) any later version.
#
#     pyGWAS is distributed in the hope that it will be useful,
#     but WITHOUT ANY WARRANTY; without even the implied warranty of
#     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#     GNU General Public License for more details.
#
#     You should have received a copy of the GNU General Public License
#     along with MVtest.  If not, see <http://www.gnu.org/licenses/>.


[docs]class BoundaryCheck(object): """Record boundary specifications from user to control traversal. Default boundaries are specified in numerical positions along a single chromosome. Users are permitted to provide boundaries in 3 forms: Bases, Kilobasees and Megabases. All are recorded as single base offsets from the beginning of the chromosome (starting at 1). The valid setting doesn't mean the boundary object is invalid, only that no actual boundary ranges have been provided. This is done to allow the user interface code to be a little simpler (i.e. if the user didn't provide bounds using numerical boundaries, it can try instantiating a SnpBoundary and pass the relevant arguments to that object. If none are valid, then either can be used, at which point both act as chromosome boundaries or simple SNP filters) If chrom is specfied, all SNPs and boundaries are expected to reside on that chromosome. """ chrom = -1 def __init__(self, bp=(None, None), kb=(None, None), mb=(None, None)): """Initialize boundary :param bp: limit range in base pairs :param kb: limit range in kilobases :param mb: limit range in megabases :return: None If any of the range objects contains a valid pair, valid is set to be True. Only one boundary pair is accepted. So, if multiples are provided, the most specific is accepted (bp is most specific). """ #: List of RS Numbers to be ignored self.ignored_rs = [] #: List of RS Numbers to be targeted (ignors all but those listed) self.target_rs = [] #: Indices of loci that are to be dropped #: {chr=>[pos1, pos2, ..., posN]} self.dropped_snps = collections.defaultdict(set) #: Actual boundary details in BP self.bounds = [] #: True if boundary conditions remain true self.valid = True if bp[0] != None and bp[0] + bp[1] > 0: self.bounds = bp elif kb[0] != None and kb[0] + kb[1] > 0: self.bounds = [1000*i for i in kb] elif mb[0] != None and mb[0] + mb[1] > 0: self.bounds = [1000000*i for i in mb] else: self.valid = False if len(self.bounds) > 0: if BoundaryCheck.chrom == -1: raise InvalidBoundarySpec(("--chr must be present for " + "positional filtering to work")) # If there is a meaningful boundary configuration but a meaningless # chromosome in place, then there is a problem try: chr = int(BoundaryCheck.chrom) except: self.valid = False #: Is set once the upper limit has been exceeded self.beyond_upper_bound = False
[docs] def LoadExclusions(self, snps): """ Load locus exclusions. :param snps: Can either be a list of rsids or a file containing rsids. :return: None If snps is a file, the file must only contain RSIDs separated by whitespace (tabs, spaces and return characters). """ snp_names = [] if len(snps) == 1 and os.path.isfile(snps[0]): snp_names = open(snps).read().strip().split() else: snp_names = snps for snp in snp_names: if len(snp.strip()) > 0: self.ignored_rs.append(snp)
[docs] def TestBoundary(self, chr, pos, rsid): """Test if locus is within the boundaries and not to be ignored. :param chr: Chromosome of locus :param pos: BP position of locus :param rsid: RSID (used to check for exclusions) :return: True if locus isn't to be ignored """ # We can skip over anything that has a negative boundary if pos < 0: return False # Skip over anything that has been explicitly ignored if rsid in self.ignored_rs: return False # If Chromosome isn't defined, then we have no bounds if BoundaryCheck.chrom == -1: return pos not in self.dropped_snps[chr] self.beyond_upper_bound = chr > BoundaryCheck.chrom if chr == BoundaryCheck.chrom: if len(self.bounds) == 0: return pos not in self.dropped_snps[chr] if (pos>=self.bounds[0]) and (pos<=self.bounds[1]): return pos not in self.dropped_snps[chr] self.beyond_upper_bound = pos > self.bounds[1] return rsid in self.target_rs
[docs] def NoExclusions(self): """Determine that there are no exclusion criterion in play :return: True if there is no real boundary specification of any kind. Simple method allowing parsers to short circuit the determination of missingness, which can be moderately compute intensive. """ if len(self.ignored_rs) + len(self.target_rs) + len(self.bounds) == 0: return BoundaryCheck.chrom == -1 return False
[docs] def ReportConfiguration(self, f): """Report the boundary configuration details :param f: File (or standard out/err) :return: None """ if BoundaryCheck.chrom != -1: print >> f, BuildReportLine("CHROM", BoundaryCheck.chrom) if len(self.bounds) > 0: print >> f, BuildReportLine("SNP BOUNDARY", "-".join( [str(x) for x in self.bounds])) if len(self.ignored_rs) > 0: print >> f, BuildReportLine("IGNORED RS", ",".join(self.ignored_rs)) if len(self.target_rs) > 0: print >> f, BuildReportLine("TARGET RS", ",".join(self.target_rs))
[docs] def LoadSNPs(self, snps=[]): """Define the SNP inclusions (by RSID). This overrides true boundary \ definition. :param snps: array of RSIDs :return: None This doesn't define RSID ranges, so it throws InvalidBoundarySpec if it encounters what appears to be a range (SNP contains a "-") """ for snp in snps: bounds = snp.split("-") if len(bounds) == 1: if bounds[0] != "": self.target_rs.append(bounds[0]) else: raise InvalidBoundarySpec(snp)