forked from brentp/bw-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbw.py
120 lines (98 loc) · 3.92 KB
/
bw.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
from ._bigwig import lib, ffi
from collections import namedtuple
import sys
import array
Interval = namedtuple('Interval', ['chrom', 'start', 'end', 'value'])
class BigWig(object):
"""
>>> b = BigWig("libBigWig/test/test.bw")
>>> b
BigWig('libBigWig/test/test.bw')
>>> for interval in b("1", 0, 99):
... interval
Interval(chrom='1', start=0, end=1, value=0.10000000149011612)
Interval(chrom='1', start=1, end=2, value=0.20000000298023224)
Interval(chrom='1', start=2, end=3, value=0.30000001192092896)
# default is to include all values
>>> b.values("1", 0, 9)
array('f', [0.10000000149011612, 0.20000000298023224, 0.30000001192092896, nan, nan, nan, nan, nan, nan])
>>> b.values("1", 0, 9, False)
array('f', [0.10000000149011612, 0.20000000298023224, 0.30000001192092896])
>>> b.stats("1", 0, 9)
0.2000000054637591
>>> b.stats("1", 0, 9, stat="stdev")
0.10000000521540645
>>> b.stats("1", 0, 4, stat="coverage")
0.75
>>> b.stats("1", 0, 4, stat="coverage", nBins=2)
array('d', [1.0, 0.5])
#b.stats("chr1", 0, 10)
>>> b.close()
"""
def __init__(self, path):
self.bw = lib.bwOpen(path.encode(), ffi.NULL, "r".encode())
self.path = path
def close(self):
return lib.bwClose(self.bw)
def __enter__(self):
return self
def __exit__(self, *args):
return self.close()
@property
def chroms(self):
seqs = self.bw.cl
return [(ffi.string(seqs.chrom[i]).decode(),
seqs.len[i]) for i in range(seqs.nKeys)]
def __repr__(self):
return "%s('%s')" % (self.__class__.__name__, self.path)
def __call__(self, chrom, start, end, includeNA=False):
intervals = lib.bwGetValues(self.bw, chrom.encode(), start, end, int(includeNA))
for i in range(intervals.l):
yield Interval(chrom, intervals.start[i], intervals.start[i] + 1, intervals.value[i])
lib.bwDestroyOverlappingIntervals(intervals)
def values(self, chrom, start=0, end=-1, includeNA=True):
if end == -1:
chroms = dict(self.chroms)
end = chroms[chrom] - 1
intervals = lib.bwGetValues(self.bw, chrom.encode(), start, end, int(includeNA))
a = array.array('f')
if intervals != ffi.NULL and intervals.l != 0:
if sys.version_info >= (3, 2):
a.frombytes(ffi.buffer(intervals.value[0:intervals.l]))
else:
a.fromstring(ffi.buffer(intervals.value[0:intervals.l]))
lib.bwDestroyOverlappingIntervals(intervals)
return a
def intervals(self, chrom, start=0, end=-1, includeNA=True):
if end == -1:
chroms = dict(self.chroms)
end = chroms[chrom] - 1
intervals = lib.bwGetOverlappingIntervals(self.bw, chrom.encode(), start, end)
value = array.array('f')
start = array.array('I')
end = array.array('I')
if intervals != ffi.NULL and intervals.l != 0:
value.frombytes(ffi.buffer(intervals.value[0:intervals.l]))
start.frombytes(ffi.buffer(intervals.start[0:intervals.l]))
end.frombytes(ffi.buffer(intervals.end[0:intervals.l]))
lib.bwDestroyOverlappingIntervals(intervals)
return start, end, value
def stats(self, chrom, start, end, stat="mean", nBins=1):
ops = ("mean", "stdev", "max", "min", "coverage")
assert stat in ops, stat
itype = ops.index(stat)
res = lib.bwStats(self.bw, chrom.encode(), start, end, nBins, itype)
if res == ffi.NULL:
return [] if nBins > 1 else None
a = array.array('d')
a.fromstring(ffi.buffer(res[0:nBins]))
ffi.gc(res, lib.free)
if nBins == 1:
return a[0]
return a
def open(fname, mode="r"):
return BigWig(fname)
if __name__ == "__main__":
import doctest
import sys
sys.stderr.write(str(doctest.testmod()) + "\n")