Skip to content

Commit

Permalink
Improve multi-threaded performance
Browse files: browse the repository at this point in the history
  • Loading branch information
DerrickWood committed Dec 14, 2014
1 parent 8812161 commit 7730317
Showing 1 changed file with 39 additions and 43 deletions.
82 changes: 39 additions & 43 deletions src/classify.cpp
Original file line number | Diff line number | Diff line change
Expand Up @@ -155,52 +155,48 @@ void process_file(char *filename) {
else
reader = new FastaReader(file_str);

vector<DNASequence> work_units[Num_threads];
ostringstream kraken_output[Num_threads],
classified_output[Num_threads],
unclassified_output[Num_threads];
while (reader->is_valid()) {
// Get work for each thread
for (int i = 0; i < Num_threads; i++) {
work_units[i].clear();
#pragma omp parallel
{
vector<DNASequence> work_unit;
ostringstream kraken_output_ss, classified_output_ss, unclassified_output_ss;

while (reader->is_valid()) {
work_unit.clear();
size_t total_nt = 0;
while (total_nt < Work_unit_size) {
dna = reader->next_sequence();
if (! reader->is_valid())
break;
total_sequences++;
work_units[i].push_back(dna);
total_nt += dna.seq.size();
#pragma omp critical(get_input)
{
while (total_nt < Work_unit_size) {
dna = reader->next_sequence();
if (! reader->is_valid())
break;
work_unit.push_back(dna);
total_nt += dna.seq.size();
}
}
if (total_nt == 0)
break;

kraken_output_ss.str("");
classified_output_ss.str("");
unclassified_output_ss.str("");
for (size_t j = 0; j < work_unit.size(); j++)
classify_sequence( work_unit[j], kraken_output_ss,
classified_output_ss, unclassified_output_ss );

#pragma omp critical(write_output)
{
if (Print_kraken)
(*Kraken_output) << kraken_output_ss.str();
if (Print_classified)
(*Classified_output) << classified_output_ss.str();
if (Print_unclassified)
(*Unclassified_output) << unclassified_output_ss.str();
total_sequences += work_unit.size();
total_bases += total_nt;
cerr << "\rProcessed " << total_sequences << " sequences (" << total_bases << " bp) ...";
}
total_bases += total_nt;
}

// Classification loop
#pragma omp parallel for
for (int i = 0; i < Num_threads; i++) {
if (Print_kraken)
kraken_output[i].str("");
if (Print_classified)
classified_output[i].str("");
if (Print_unclassified)
unclassified_output[i].str("");
for (size_t j = 0; j < work_units[i].size(); j++)
classify_sequence( work_units[i][j], kraken_output[i],
classified_output[i], unclassified_output[i] );
}

// Report loop
for (int i = 0; i < Num_threads; i++) {
if (Print_kraken)
(*Kraken_output) << kraken_output[i].str();
if (Print_classified)
(*Classified_output) << classified_output[i].str();
if (Print_unclassified)
(*Unclassified_output) << unclassified_output[i].str();
}

cerr << "\rProcessed " << total_sequences << " sequences (" << total_bases << " bp) ...";
}
} // end parallel section

delete reader;
}
Expand Down

0 comments on commit 7730317

Please sign in to comment.