-
Notifications
You must be signed in to change notification settings - Fork 157
/
Copy pathprepare_dbsnp.py
107 lines (97 loc) · 3.51 KB
/
prepare_dbsnp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
"""Prepare sorted and consolidated dbSNP resources for mouse mm10/GRCh38 in VCF format.
"""
import datetime
import ftplib
import gzip
import os
import subprocess
from argparse import ArgumentParser
import re
import shutil
FTP = "ftp.ncbi.nih.gov"
REMOTES = {"mm10": "snp/organisms/mouse_10090/VCF",
"canFam3": "snp/organisms/dog_9615/VCF/"}
def main(org):
work_dir = "tmp-dbsnp-%s" % org
if not os.path.exists(work_dir):
os.makedirs(work_dir)
conn = ftplib.FTP(FTP, "anonymous", "[email protected]")
conn.cwd(REMOTES[org])
os.chdir(work_dir)
files = []
def add_files(x):
if x.endswith("vcf.gz"):
files.append(get_file(x, REMOTES[org], conn))
conn.retrlines("NLST", add_files)
out_file = "%s-dbSNP-%s.vcf" % (org, datetime.datetime.now().strftime("%Y-%m-%d"))
with open(out_file, "w") as out_handle:
for i, f in enumerate(karyotype_sort(files)):
with gzip.open(f) as in_handle:
for line in in_handle:
if line.startswith("#"):
if i == 0:
out_handle.write(line)
else:
out_handle.write("\t".join(fix_info(fix_chrom(line.rstrip().split("\t")))) + "\n")
subprocess.check_call(["bgzip", out_file])
shutil.move(out_file + ".gz", os.path.join(os.pardir, out_file + ".gz"))
os.chdir(os.pardir)
subprocess.check_call(["tabix", "-p", "vcf", out_file + ".gz"])
shutil.rmtree(work_dir)
multi_whitespace = re.compile(r"\s+")
def fix_info(parts):
"""Fix the INFO file to remove whitespace.
"""
parts[7] = multi_whitespace.sub("_", parts[7])
return parts
def fix_chrom(parts):
MAX_CHROMOSOMES = 50
if parts[0] in [str(x) for x in range(1, MAX_CHROMOSOMES)] + ["X", "Y"]:
new_chrom = "chr%s" % parts[0]
elif parts[0] == "MT":
new_chrom = "chrM"
else:
raise NotImplementedError(parts)
parts[0] = new_chrom
return parts
def get_file(x, ftp_dir, conn):
if not os.path.exists(x):
print "Retrieving %s" % x
with open(x, "wb") as out_handle:
conn = ftplib.FTP(FTP, "anonymous", "[email protected]")
conn.cwd(ftp_dir)
conn.retrbinary("RETR %s" % x, out_handle.write)
return x
def karyotype_sort(xs):
"""Sort in karyotypic order to work with GATK's defaults.
"""
def karyotype_keyfn(x):
for suffix in [".gz"]:
if x.endswith(suffix):
x = x[:-len(suffix)]
base = os.path.splitext(os.path.basename(x))[0]
for prefix in ["chr", "vcf_chr_"]:
if base.startswith(prefix):
base = base[len(prefix):]
parts = base.split("_")
try:
parts[0] = int(parts[0])
except ValueError:
pass
# unplaced at the very end
if isinstance(parts[0], basestring) and parts[0].startswith(("Un", "Alt", "Multi", "NotOn")):
parts.insert(0, "z")
# mitochondrial special case -- after X/Y
elif parts[0] in ["M", "MT"]:
parts.insert(0, "x")
# sort random and extra chromosomes after M
elif len(parts) > 1:
parts.insert(0, "y")
return parts
return sorted(xs, key=karyotype_keyfn)
if __name__ == "__main__":
parser = ArgumentParser(description="Prepare a dbSNP file from NCBI.")
parser.add_argument("org_build", choices=REMOTES.keys(),
help="genome build")
args = parser.parse_args()
main(args.org_build)