forked from rrwick/Unicycler
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathoverlap_removal_test.py
130 lines (101 loc) · 4.83 KB
/
overlap_removal_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
"""
Copyright 2017 Ryan Wick ([email protected])
https://github.com/rrwick/Unicycler
This script generates random sequences with lots of repeats and then removes overlaps. It outputs
a table of information with the time taken for overlap removal.
This file is part of Unicycler. Unicycler is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by the Free Software Foundation,
either version 3 of the License, or (at your option) any later version. Unicycler is distributed in
the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
details. You should have received a copy of the GNU General Public License along with Unicycler. If
not, see <http://www.gnu.org/licenses/>.
"""
import os
import subprocess
import shutil
import random
import datetime
import sys
sys.path.insert(0, os.getcwd())
import unicycler.assembly_graph
import unicycler.misc
import unicycler.log
import test.fake_reads
col_widths = [10, 12, 13, 10, 12, 9]
def main():
random.seed(0)
print()
unicycler.log.logger = unicycler.log.Log(log_filename=None, stdout_verbosity_level=0)
header_row = ['Seq length', 'Repeat count', 'Segment count', 'Edge count', 'Overlap/kmer',
'Time (ms)']
unicycler.misc.print_table([header_row], col_separation=3, header_format='underline', indent=0,
alignments='RRRRRR', fixed_col_widths=col_widths, verbosity=0)
try:
while True:
overlap_removal_test()
# The user exits this script with Ctrl-C
except KeyboardInterrupt:
temp_dir = 'TEST_TEMP_' + str(os.getpid())
if os.path.isdir(temp_dir):
shutil.rmtree(temp_dir)
def overlap_removal_test():
random_seq_length = random.randint(8, 20) ** 4
repeat_count = random.randint(1, random_seq_length // 10)
random_seq = make_repeaty_sequence(random_seq_length, repeat_count)
out_dir = test.fake_reads.make_fake_reads(random_seq)
output_line = [str(random_seq_length), str(repeat_count)]
with open(os.path.join(out_dir, 'original_seq.fasta'), 'w') as fasta:
fasta.write('>seq')
fasta.write('\n')
fasta.write(random_seq)
fasta.write('\n')
run_spades(out_dir)
check_one_graph_overlap(out_dir, 21, output_line)
check_one_graph_overlap(out_dir, 41, output_line)
check_one_graph_overlap(out_dir, 61, output_line)
check_one_graph_overlap(out_dir, 81, output_line)
shutil.rmtree(out_dir)
def make_repeaty_sequence(length, repeat_count):
seq = unicycler.misc.get_random_sequence(length)
for i in range(repeat_count):
repeat_length = random.randint(10, 500)
repeat_instances = random.randint(2, 20)
repeat_seq = unicycler.misc.get_random_sequence(repeat_length)
for j in range(repeat_instances):
if random.randint(0, 1) == 1:
repeat_seq = unicycler.misc.reverse_complement(repeat_seq)
repeat_pos = random.randint(0, length-repeat_length)
seq = seq[:repeat_pos] + repeat_seq + seq[repeat_pos + repeat_length:]
assert len(seq) == length
return seq
def run_spades(out_dir):
reads_1 = os.path.join(out_dir, 'reads_1.fastq')
reads_2 = os.path.join(out_dir, 'reads_2.fastq')
reads_unpaired = os.path.join(out_dir, 'reads_unpaired.fastq')
spades_cmd = ['spades.py', '-o', out_dir, '--only-assembler', '-k', '21,41,61,81']
if os.path.isfile(reads_1):
spades_cmd += ['-1', reads_1, '-2', reads_2]
if os.path.isfile(reads_unpaired):
spades_cmd += ['-s', reads_unpaired]
p = subprocess.Popen(spades_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
return stdout.decode(), stderr.decode()
def check_one_graph_overlap(out_dir, k_size, output_line):
k_dir = 'K' + str(k_size)
graph_file = os.path.join(out_dir, k_dir, 'assembly_graph.fastg')
graph = unicycler.assembly_graph.AssemblyGraph(graph_file, k_size)
seg_count = len(graph.segments)
edge_count = sum(len(x) for x in graph.forward_links.values()) // 2
start_time = datetime.datetime.now()
graph.remove_all_overlaps()
end_time = datetime.datetime.now()
milliseconds = (end_time - start_time).total_seconds() * 1000
assert graph.overlap == 0
graph.save_to_gfa(graph_file + '.gfa')
table_row = output_line + [str(seg_count), str(edge_count), str(k_size), '%.1f' % milliseconds]
unicycler.misc.print_table([table_row], col_separation=3, header_format='normal', indent=0,
alignments='RRRRRR', fixed_col_widths=col_widths, verbosity=0,
left_align_header=False, bottom_align_header=False)
if __name__ == '__main__':
main()