Skip to content

Commit

Permalink
[INSTX-1190] Fix up places that weren't moving values into sinks
Browse files Browse the repository at this point in the history
This should catch issues like INSTX-1190 from happening again.
  • Loading branch information
blawrence-ont committed Aug 23, 2023
1 parent a1bd076 commit 6c9e736
Show file tree
Hide file tree
Showing 9 changed files with 23 additions and 22 deletions.
2 changes: 1 addition & 1 deletion dorado/read_pipeline/BaseSpaceDuplexCallerNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ void BaseSpaceDuplexCallerNode::basespace(const std::string& template_read_id,
duplex_read->read_id = template_read->read_id + ";" + complement_read->read_id;
duplex_read->read_tag = template_read->read_tag;

send_message_to_sink(duplex_read);
send_message_to_sink(std::move(duplex_read));
}
edlibFreeAlignResult(result);
}
Expand Down
10 changes: 5 additions & 5 deletions dorado/read_pipeline/DuplexReadTaggingNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ void DuplexReadTaggingNode::worker_thread() {
// the ones whose duplex offsprings never came. They are retagged to not be
// duplex parents and then sent downstream.
if (!read->is_duplex && !read->is_duplex_parent) {
send_message_to_sink(read);
send_message_to_sink(std::move(read));
} else if (read->is_duplex) {
std::string template_read_id = read->read_id.substr(0, read->read_id.find(';'));
std::string complement_read_id =
read->read_id.substr(read->read_id.find(';') + 1, read->read_id.length());

send_message_to_sink(read);
send_message_to_sink(std::move(read));

for (auto& rid : {template_read_id, complement_read_id}) {
if (m_parents_processed.find(rid) != m_parents_processed.end()) {
Expand All @@ -61,7 +61,7 @@ void DuplexReadTaggingNode::worker_thread() {
if (find_parent != m_duplex_parents.end()) {
// Parent read has been seen. Process it and send it
// downstream.
send_message_to_sink(find_parent->second);
send_message_to_sink(std::move(find_parent->second));
m_parents_processed.insert(rid);
m_duplex_parents.erase(find_parent);
} else {
Expand All @@ -76,8 +76,8 @@ void DuplexReadTaggingNode::worker_thread() {
// If a read is in the parents wanted list, then sent it downstream
// and add it to the set of processed reads. It will also be removed
// from the parent reads being looked for.
send_message_to_sink(read);
m_parents_processed.insert(read->read_id);
send_message_to_sink(std::move(read));
m_parents_wanted.erase(find_read);
} else {
// No duplex offspring is seen so far, so hold it and track
Expand All @@ -89,7 +89,7 @@ void DuplexReadTaggingNode::worker_thread() {

for (auto& [k, v] : m_duplex_parents) {
v->is_duplex_parent = false;
send_message_to_sink(v);
send_message_to_sink(std::move(v));
}
}

Expand Down
2 changes: 1 addition & 1 deletion dorado/read_pipeline/FakeDataLoader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ void FakeDataLoader::load_reads(const int num_reads) {
fake_read->raw_data = torch::randint(0, 10000, {read_size}, torch::kInt16);
fake_read->read_id = "Placeholder-read-id";

m_pipeline.push_message(fake_read);
m_pipeline.push_message(std::move(fake_read));
}
}

Expand Down
2 changes: 1 addition & 1 deletion dorado/read_pipeline/ModBaseCallerNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ void ModBaseCallerNode::input_worker_thread() {
++m_working_reads_size;
} else {
// No modbases to call, pass directly to next node
send_message_to_sink(read);
send_message_to_sink(std::move(read));
++m_num_non_mod_base_reads_pushed;
}
break;
Expand Down
18 changes: 9 additions & 9 deletions dorado/read_pipeline/PairingNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,9 @@ void PairingNode::pair_generating_worker_thread(int tid) {
std::unique_lock<std::mutex> lock(m_pairing_mtx);
auto flush_message = std::get<CacheFlushMessage>(message);
auto& read_cache = m_read_caches[flush_message.client_id];
for (const auto& [key, reads_list] : read_cache.channel_mux_read_map) {
for (auto& [key, reads_list] : read_cache.channel_mux_read_map) {
// kv is a std::pair<UniquePoreIdentifierKey, std::list<std::shared_ptr<Read>>>
for (const auto& read_ptr : reads_list) {
for (auto& read_ptr : reads_list) {
// Push each read message
send_message_to_sink(std::move(read_ptr));
}
Expand Down Expand Up @@ -378,8 +378,8 @@ void PairingNode::pair_generating_worker_thread(int tid) {
ok_to_clear = true;
}
if (ok_to_clear) {
send_message_to_sink(std::move(*to_clear_itr));
to_clear_itr = m_reads_to_clear.erase(to_clear_itr);
auto read_handle = m_reads_to_clear.extract(*to_clear_itr++);
send_message_to_sink(std::move(read_handle.value()));
} else {
++to_clear_itr;
}
Expand All @@ -391,14 +391,14 @@ void PairingNode::pair_generating_worker_thread(int tid) {
std::unique_lock<std::mutex> lock(m_pairing_mtx);
// There are still reads in channel_mux_read_map. Push them to the sink.
// Last thread alive is responsible for cleaning up the cache.
for (const auto& [client_id, read_cache] : m_read_caches) {
for (const auto& kv : read_cache.channel_mux_read_map) {
for (auto& [client_id, read_cache] : m_read_caches) {
for (auto& kv : read_cache.channel_mux_read_map) {
// kv is a std::pair<UniquePoreIdentifierKey, std::list<std::shared_ptr<Read>>>
const auto& reads_list = kv.second;
auto& reads_list = kv.second;

for (const auto& read_ptr : reads_list) {
for (auto& read_ptr : reads_list) {
// Push each read message
send_message_to_sink(read_ptr);
send_message_to_sink(std::move(read_ptr));
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion dorado/read_pipeline/ReadFilterNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ void ReadFilterNode::worker_thread() {
(m_read_ids_to_filter.find(read->read_id) != m_read_ids_to_filter.end())) {
log_filtering();
} else {
send_message_to_sink(read);
send_message_to_sink(std::move(read));
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion dorado/read_pipeline/ScalerNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ void ScalerNode::worker_thread() {
read->num_trimmed_samples = trim_start;

// Pass the read to the next node
send_message_to_sink(read);
send_message_to_sink(std::move(read));
}
}

Expand Down
3 changes: 2 additions & 1 deletion dorado/read_pipeline/StereoDuplexEncoderNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,8 @@ void StereoDuplexEncoderNode::worker_thread() {
read_pair->read_1, read_pair->read_2, read_pair->read_1_start,
read_pair->read_1_end, read_pair->read_2_start, read_pair->read_2_end);

send_message_to_sink(stereo_encoded_read); // Stereo-encoded read created, send it to sink
send_message_to_sink(
std::move(stereo_encoded_read)); // Stereo-encoded read created, send it to sink
}
}

Expand Down
4 changes: 2 additions & 2 deletions tests/ReadFilterNodeTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ TEST_CASE("ReadFilterNode: Filter read based on read name", TEST_GROUP) {
read_2->attributes.fast5_filename = "batch_0.fast5";

dorado::ReadFilterNode filter(sink, 0 /*min_qscore*/, 0, {"read_2"}, 2 /*threads*/);
filter.push_message(read_1);
filter.push_message(read_2);
filter.push_message(std::move(read_1));
filter.push_message(std::move(read_2));
}

auto messages = sink.get_messages();
Expand Down

0 comments on commit 6c9e736

Please sign in to comment.