Skip to content

Commit

Permalink
1. add multi pipelines demo (#117)
Browse files Browse the repository at this point in the history
2. fix some issues on Edge platform

Co-authored-by: zhupengdong <[email protected]>
  • Loading branch information
Lefttyre and zhupengdong authored Dec 17, 2020
1 parent 7f86dd7 commit e45a544
Show file tree
Hide file tree
Showing 15 changed files with 996 additions and 54 deletions.
35 changes: 35 additions & 0 deletions docs/source/application/how_to_build_apps.rst
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,38 @@ Pipeline基本骨架的构建
^^^^^^^^^^^^^^^^^^^^

CNStream针对非配置文件方式提供了一些完整的、独立的应用程序开发示例。参见CNStream源代码中 ``samples/example/example.cpp``。

用户侧MessageHandle
---------------------

用户程序可以通过注册的事件监听监测Pipeline的Message信息,目前定义的用户侧Message信息包括EOS_MSG、FRAME_ERR_MSG、STREAM_ERR_MSG、ERROR_MSG(参见StreamMsgType定义)。

各消息处理示例可以参考CNStream源代码 ``samples/demo/demo.cpp``。

1. EOS_MSG
^^^^^^^^^^^^^^^^^^^^

EOS_MSG表示Pipeline数据处理结束,接收到该消息时,可以正常结束Pipeline释放资源等。

2. FRAME_ERR_MSG
^^^^^^^^^^^^^^^^^^^^

FRAME_ERR_MSG表示帧解码失败消息,当前仅支持使用mlu解码JPEG图片场景:

(1)JPEG图片文件形式时,用户侧接收到FRAME_ERR_MSG消息时,可以同时获取解码错误的图片帧信息,包含用户侧定义的stream_id和内部赋值定义的pts、frame_id信息;

(2)从内存中输入JPEG数据流时,用户侧接收到FRAME_ERR_MSG消息时,可以同时获取解码错误的图片帧信息,包含用户侧定义的stream_id、pts和内部赋值定义的frame_id信息;

接收到这些信息后,用户侧可以根据自己的业务逻辑处理解码失败的图片帧,比如丢弃、记录等。

3. STREAM_ERR_MSG
^^^^^^^^^^^^^^^^^^^^

STREAM_ERR_MSG表示某一路数据发生不可恢复错误,通常包括超过内存限制导致的解码器申请失败等。

用户侧接收到该信息时,若希望Pipeline继续进行,将出现错误的数据流移除掉即可(使用Source模块的RemoveSource方法进行特定数据流的卸载),该操作不影响其他正常处理的数据流。

4. ERROR_MSG
^^^^^^^^^^^^^^^^^^^^

ERROR_MSG表示普通的错误信息,目前表示不可恢复错误,建议直接停止Pipeline,并根据log信息进行错误定位。
310 changes: 266 additions & 44 deletions docs/source/modules/Module.rst

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions docs/source/web_ui/web_visualize.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ web可视化工具主要提供以下功能:
- 图形化界面设计和配置pipeline:

* 提供内置的pipeline示例配置,用户可以直接运行示例,快速体验如何使用CNStream开发应用。
* 支持在线设计和配置pipeline。提供内置模块的流程块,支持像绘制流程图一样在web端绘制pipeline,拖动表示模块的流程块至设计框,并通过连线连接数据流向。
* 支持在线设计和配置pipeline。提供内置模块的流程块,支持像绘制流程图一样在web端绘制pipeline,选择表示模块的流程块至设计框,并通过连线连接数据流向。

* 支持修改模块参数配置。
* 提供pipeline配置正确性自动检测,包括基本的模块参数配置和流程图的环自动检测。
* 流程图绘制完成后,可以通过下载为JSON文件或者跳转至主页面运行或预览。

- 支持部分数据源选用和上传:

* 默认支持cars、people、images三种类型的数据源
* 默认支持cars、people、images、objects四种类型的数据源
* 支持上传视频文件为数据源。

- 支持preview和status两种模式运行pipeline。
Expand Down Expand Up @@ -107,7 +107,7 @@ pipeline设计完成后,Web可视化工具会自动生成对应的JSON配置

Web可视化工具提供了pipeline示例,用户可以直接运行示例,并直接在页面查看运行结果。

1. 在web可视化工具主页面,从下拉菜单中选择数据源类型。目前支持cars、people、images三种类型的数据源
1. 在web可视化工具主页面,从下拉菜单中选择数据源类型。目前支持cars、people、images、objects四种类型的数据源
2. 在下拉菜单中选择任务类型类型,目前提供以下一种示例:Classification、Object Detection、Object Tracking、Secondary Classification。
3. 选择Preview或者Status标签。
4. 点击 **RUN** 按钮。
Expand Down
2 changes: 2 additions & 0 deletions framework/core/include/cnstream_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ class SourceModule : public Module {
public: // NOLINT
#endif

friend class SourceHandler;

uint32_t GetStreamIndex(const std::string &stream_id);
void ReturnStreamIndex(const std::string &stream_id);
/**
Expand Down
36 changes: 32 additions & 4 deletions modules/cnstream_frame_va.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
#include <sys/types.h>
#include <unistd.h>

#include <cstring>
#include <cmath>
#include <cstring>
#include <map>
#include <memory>
#include <mutex>
Expand Down Expand Up @@ -73,7 +73,34 @@ cv::Mat* CNDataFrame::ImageBGR() {
LOGF_IF(FRAME, nullptr == img_data) << "CNDataFrame::ImageBGR() failed to alloc memory";
uint8_t* t = img_data;
for (int i = 0; i < GetPlanes(); ++i) {
#ifdef CNS_MLU220_SOC
void* src = nullptr;
cnrtRet_t ret = cnrtMap(reinterpret_cast<void**>(&src), const_cast<void*>(data[i]->GetCpuData()));
if (ret != CNRT_RET_SUCCESS) {
LOGF(FRAME) << "cnrtMap: failed to cnrtMap(void **host_ptr, void *dev_ptr)";
}
ret = cnrtCacheOperation(src, CNRT_FLUSH_CACHE);
if (ret != CNRT_RET_SUCCESS) {
LOGF(FRAME) << "cnrtCacheOperation: failed to cnrtCacheOperation(void *host_ptr, cnrtCacheOps_t opr)";
}
void* dev_src_found_by_mapped = NULL;
ret = cnrtFindDevAddrByMappedAddr(reinterpret_cast<void*>(src), &dev_src_found_by_mapped);
if (ret != CNRT_RET_SUCCESS) {
LOGF(FRAME) << "cnrtFindDevAddrByMappedAddr: failed to"
<< "cnrtFindDevAddrByMappedAddr(void *mappped_host_ptr, void **dev_ptr)";
}
if (dev_src_found_by_mapped != data[i]->GetCpuData())
LOGF(FRAME) << ("find device address by mapped host failed!\n");
memcpy(t, src, GetPlaneBytes(i));
ret = cnrtUnmap(src);
if (ret != CNRT_RET_SUCCESS) {
LOGF(FRAME) << "cnrtUnmap: failed to"
<< "cnrtUnmap(void *host_ptr)";
}

#else
memcpy(t, data[i]->GetCpuData(), GetPlaneBytes(i));
#endif
t += GetPlaneBytes(i);
}
switch (fmt) {
Expand Down Expand Up @@ -185,7 +212,8 @@ void CNDataFrame::CopyToSyncMem(bool dst_mlu) {
bytes = ROUND_UP(bytes, 64 * 1024);
if (dst_mlu) {
if (dst_device_id < 0 || (ctx.dev_type == DevContext::MLU && ctx.dev_id != dst_device_id)) {
LOGF(FRAME) << "CopyToSyncMem: dst_device_id not set, or ctx.dev_id != dst_device_id" << "," << dst_device_id;
LOGF(FRAME) << "CopyToSyncMem: dst_device_id not set, or ctx.dev_id != dst_device_id"
<< "," << dst_device_id;
std::terminate();
return;
}
Expand Down Expand Up @@ -559,7 +587,7 @@ StringPairs CNInferObject::GetExtraAttributes() {
return StringPairs(extra_attributes_.begin(), extra_attributes_.end());
}

bool CNInferObject::AddFeature(const std::string &key, const CNInferFeature &feature) {
bool CNInferObject::AddFeature(const std::string& key, const CNInferFeature& feature) {
std::lock_guard<std::mutex> lk(feature_mutex_);
if (features_.find(key) != features_.end()) {
return false;
Expand All @@ -568,7 +596,7 @@ bool CNInferObject::AddFeature(const std::string &key, const CNInferFeature &fea
return true;
}

CNInferFeature CNInferObject::GetFeature(const std::string &key) {
CNInferFeature CNInferObject::GetFeature(const std::string& key) {
std::lock_guard<std::mutex> lk(feature_mutex_);
if (features_.find(key) != features_.end()) {
return features_[key];
Expand Down
14 changes: 11 additions & 3 deletions modules/display/src/sdl_video_player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ SDLVideoPlayer::SDLVideoPlayer() {}
SDLVideoPlayer::~SDLVideoPlayer() {}

bool SDLVideoPlayer::Init(int max_chn) {
std::cout << "before init" << std::endl;
LOGI(DISPLAYER) << "before init";
if (!SdlInitTool::instance()->init()) {
return false;
}
std::cout << "before create window" << std::endl;
LOGI(DISPLAYER) << "before create window";
window_ = SDL_CreateWindow("CNStream", 0, 0, window_w_, window_h_, 0);
if (nullptr == window_) {
LOGE(DISPLAYER) << "Create SDL window failed." << SDL_GetError();
Expand All @@ -76,7 +76,7 @@ bool SDLVideoPlayer::Init(int max_chn) {
LOGE(DISPLAYER) << "Create SDL renderer failed." << SDL_GetError();
return false;
}
std::cout << "before create texture" << std::endl;
LOGI(DISPLAYER) << "before create texture";
int pixelf = SDL_PIXELFORMAT_BGR24;
texture_ = SDL_CreateTexture(renderer_, pixelf, SDL_TEXTUREACCESS_STREAMING, window_w_, window_h_);
if (nullptr == texture_) {
Expand Down Expand Up @@ -136,14 +136,22 @@ void SDLVideoPlayer::EventLoop(const std::function<void()>& quit_callback) {
SDL_WaitEvent(&event);
switch (event.type) {
case SDL_MOUSEBUTTONDOWN:
LOGI(DISPLAYER) << "Get SDL_MOUSEBUTTONDOWN EVENT";
mouse_x = event.button.x;
mouse_y = event.button.y;
ClickEventProcess(mouse_x, mouse_y);
break;
case REFRESH_EVENT:
Refresh();
break;
case SDL_WINDOWEVENT:
if (SDL_WINDOWEVENT_CLOSE == event.window.event) {
LOGI(DISPLAYER) << "Get SDL Close Window EVENT";
if (quit_callback) quit_callback();
}
break;
case SDL_QUIT:
LOGI(DISPLAYER) << "Get SDL QUIT EVENT";
if (quit_callback) quit_callback();
break;
} // switch (event.type)
Expand Down
4 changes: 4 additions & 0 deletions modules/source/src/data_handler_rawimg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,11 @@ bool RawImgMemHandlerImpl::ProcessImage(const uint8_t *img_data, const int size,
if (!dataframe) return false;

if (param_.output_type_ == OUTPUT_MLU) {
#ifdef CNS_MLU220_SOC
dataframe->ctx.dev_type = DevContext::MLU_CPU;
#else
dataframe->ctx.dev_type = DevContext::MLU;
#endif
dataframe->ctx.dev_id = param_.device_id_;
dataframe->ctx.ddr_channel = -1; // FIXME
} else {
Expand Down
7 changes: 7 additions & 0 deletions modules/source/src/data_handler_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,20 @@ int SourceRender::Process(std::shared_ptr<CNFrameInfo> frame_info,
return -1;
}
}
#ifdef CNS_MLU220_SOC
dataframe->ctx.dev_type = DevContext::MLU_CPU;
#else
dataframe->ctx.dev_type = DevContext::MLU;
#endif
// device_id in callback-frame incorrect, use device_id saved in params instead
dataframe->ctx.dev_id = param_.device_id_;
dataframe->ctx.ddr_channel = -1;
for (int i = 0; i < dataframe->GetPlanes(); i++) {
dataframe->stride[i] = frame->stride[i];
dataframe->ptr_mlu[i] = reinterpret_cast<void *>(frame->plane[i]);
#ifdef CNS_MLU220_SOC
dataframe->ptr_cpu[i] = reinterpret_cast<void *>(frame->plane[i]);
#endif
}

if (OUTPUT_MLU == param_.output_type_) {
Expand Down
19 changes: 19 additions & 0 deletions modules/source/src/data_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ static int GetDeviceId(ModuleParamSet paramSet) {
}

bool DataSource::Open(ModuleParamSet paramSet) {
if(!CheckParamSet(paramSet)) {
return false;
}
if (paramSet.find("output_type") != paramSet.end()) {
std::string out_type = paramSet["output_type"];
if (out_type == "cpu") {
Expand Down Expand Up @@ -172,6 +175,12 @@ bool DataSource::CheckParamSet(const ModuleParamSet &paramSet) const {
LOGE(SOURCE) << "[DataSource] [output_type] MLU : device_id must be set";
ret = false;
}
#ifdef CNS_MLU220_SOC
if (paramSet.at("output_type") != "mlu") {
LOGE(SOURCE) << "[DataSource] [output_type] should be mlu for mlu220 edge";
ret = false;
}
#endif
}

std::string err_msg;
Expand All @@ -197,6 +206,16 @@ bool DataSource::CheckParamSet(const ModuleParamSet &paramSet) const {
ret = false;
}
}
#ifdef CNS_MLU220_SOC
if (dec_type != "mlu") {
LOGE(SOURCE) << "[DataSource] [decoder_type] should be mlu for mlu220 edge";
ret = false;
}
if (paramSet.find("reuse_cndec_buf") != paramSet.end() && paramSet.at("reuse_cndec_buf") != "true"){
LOGE(SOURCE) << "[DataSource] [reuse_cndec_buf] should be true for mlu220 edge";
ret = false;
}
#endif
}

return ret;
Expand Down
7 changes: 7 additions & 0 deletions samples/demo/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ include_directories("${CMAKE_CURRENT_SOURCE_DIR}/../../modules/ipc/include")
include_directories("${CMAKE_CURRENT_SOURCE_DIR}/../../modules/rtsp_sink/include")

list(APPEND Example_INCLUDE_DIRS ${3RDPARTY_INCLUDE_DIRS})
list(APPEND Example_INCLUDE_DIRS "${Example_DIR}/demo")
list(APPEND Example_INCLUDE_DIRS "${Example_DIR}/../include")
list(APPEND Example_INCLUDE_DIRS "${Example_DIR}/../framework/core/include")

Expand All @@ -38,6 +39,12 @@ endif()
add_dependencies(${name} cnstream_va cnstream_core)
target_link_libraries(${name} ${Example_LINKER_LIBS})


add_executable(multi_pipelines ${CMAKE_CURRENT_SOURCE_DIR}/multi_pipelines/multi_pipelines.cpp util.cpp ${preprocess_srcs} ${postprocess_srcs} ${obj_filter_srcs})

add_dependencies(multi_pipelines cnstream_va cnstream_core)
target_link_libraries(multi_pipelines ${Example_LINKER_LIBS})

if(build_modules_contrib)
#force load all the modules
set(CMAKE_EXE_LINKER_FLAGS "-Wl,--no-as-needed")
Expand Down
2 changes: 2 additions & 0 deletions samples/demo/multi_pipelines/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Instruction #
This Demo shows how to run two pipelines in one App. The first pipeline runs on card 0 while second works on card 1.
58 changes: 58 additions & 0 deletions samples/demo/multi_pipelines/first_pipeline.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
{
"source" : {
"class_name" : "cnstream::DataSource",
"parallelism" : 0,
"next_modules" : ["detector"],
"show_perf_info" : true,
"custom_params" : {
"output_type" : "mlu",
"decoder_type" : "mlu",
"output_buf_number" : 8,
"device_id" : 0
}
},

"detector" : {
"class_name" : "cnstream::Inferencer",
"parallelism" : 1,
"max_input_queue_size" : 20,
"next_modules" : ["osd"],
"show_perf_info" : true,
"custom_params" : {
"model_path" : "../../../data/models/MLU270/Primary_Detector/ssd/resnet34_ssd.cambricon",
"func_name" : "subnet0",
"postproc_name" : "PostprocSsd",
"threshold" : 0.5,
"batching_timeout" : 100,
"device_id" : 0
}
},

"osd" : {
"class_name" : "cnstream::Osd",
"parallelism" : 4,
"max_input_queue_size" : 20,
"next_modules" : ["displayer"],
"show_perf_info" : true,
"custom_params" : {
// "chinese_font_path" : "/user/include/wqy_zenhei.ttf",
"label_path" : "../../../data/models/MLU270/Primary_Detector/ssd/label_voc.txt"
}
},

"displayer" : {
"class_name" : "cnstream::Displayer",
"parallelism" : 1,
"max_input_queue_size" : 20,
"show_perf_info" : true,
"custom_params" : {
"window-width" : 1920,
"window-height" : 1080,
"refresh-rate" : 25,
"max-channels" : 32,
"show" : "false", // Please confirm build_display is ON before setting true
"full-screen" : "false"
}
}
}

Loading

0 comments on commit e45a544

Please sign in to comment.