Skip to content

Commit

Permalink
support pipeline multiple starts (#66)
Browse files Browse the repository at this point in the history
Co-authored-by: zhupengdong <[email protected]>
  • Loading branch information
Lefttyre and zhupengdong authored Jul 28, 2020
1 parent 63d202d commit d73ee1e
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 10 deletions.
1 change: 0 additions & 1 deletion framework/core/include/cnstream_pipeline.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,6 @@ class Pipeline {
std::unordered_map<std::string, ModuleAssociatedInfo> modules_;
std::unordered_map<std::string, CNModuleConfig> modules_config_;
std::unordered_map<std::string, std::vector<std::string>> connections_config_;
std::mutex stop_mtx_;
uint64_t eos_mask_ = 0;

std::vector<std::string> stream_ids_;
Expand Down
3 changes: 2 additions & 1 deletion framework/core/src/cnstream_pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,8 @@ bool Pipeline::QueryLinkStatus(LinkStatus* status, const std::string& link_id) {
}

bool Pipeline::Start() {
if (IsRunning()) return true;

// set eos mask
SetEOSMask();
// open modules
Expand Down Expand Up @@ -391,7 +393,6 @@ bool Pipeline::Start() {
}

bool Pipeline::Stop() {
std::lock_guard<std::mutex> lk(stop_mtx_);
if (!IsRunning()) return true;

// stop data transmit
Expand Down
4 changes: 0 additions & 4 deletions modules/source/src/data_handler_rtsp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,10 +363,6 @@ bool RtspHandlerImpl::Open() {
// updated with paramSet
DataSource *source = dynamic_cast<DataSource *>(module_);
param_ = source->GetSourceParam();
if (param_.output_type_ != OUTPUT_MLU) {
LOG(ERROR) << "output_type not supported:" << param_.output_type_;
return false;
}
this->interval_ = param_.interval_;

SetPerfManager(source->GetPerfManager(stream_id_));
Expand Down
14 changes: 10 additions & 4 deletions samples/demo/demo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ class MsgObserver : cnstream::StreamMsgObserver {
LOG(ERROR) << "[Observer] received ERROR_MSG";
stop_ = true;
}
wakener_.notify_one();
if (stop_) {
wakener_.notify_one();
}
}

void WaitForStop() {
Expand All @@ -98,6 +100,7 @@ class MsgObserver : cnstream::StreamMsgObserver {

void IncreaseStreamCnt() { stream_cnt_++; }
void DecreaseStreamCnt() { stream_cnt_--; }
int GetStreamCnt() { return stream_cnt_.load(); }

private:
std::atomic<int> stream_cnt_;
Expand Down Expand Up @@ -294,9 +297,6 @@ int main(int argc, char** argv) {
cnstream::DataSource* source = dynamic_cast<cnstream::DataSource*>(pipeline.GetModule("source"));
#ifdef BUILD_IPC
cnstream::ModuleIPC* ipc = dynamic_cast<cnstream::ModuleIPC*>(pipeline.GetModule("ipc"));
if (ipc != nullptr) {
ipc->SetChannelCount(video_urls.size());
}
if (nullptr == source && (nullptr == ipc)) {
LOG(ERROR) << "DataSource && ModuleIPC module both not found.";
#else
Expand Down Expand Up @@ -357,6 +357,12 @@ int main(int argc, char** argv) {
}
}

#ifdef BUILD_IPC
if (nullptr != ipc) {
ipc->SetChannelCount(msg_observer.GetStreamCnt());
}
#endif

auto quit_callback = [&pipeline, streams, &source]() {
// stop feed-data threads before remove-sources...
thread_running.store(false);
Expand Down

0 comments on commit d73ee1e

Please sign in to comment.