Skip to content

Commit

Permalink
1. cross compile libyuv (#115)
Browse files Browse the repository at this point in the history
2. fix connector cpu usage is too high
3. update cnstream system log
4. lock guard objs push back

Co-authored-by: zhupengdong <[email protected]>
  • Loading branch information
Lefttyre and zhupengdong authored Dec 8, 2020
1 parent d4a36af commit ef336c1
Show file tree
Hide file tree
Showing 90 changed files with 563 additions and 602 deletions.
2 changes: 2 additions & 0 deletions framework/core/include/cnstream_logging.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include <time.h>

#include <string>
#include <streambuf>
#include <ostream>

/**
* @brief Log filter.
Expand Down
4 changes: 2 additions & 2 deletions framework/core/include/cnstream_module.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
#include <cxxabi.h>
#include <unistd.h>

#include <glog/logging.h>
#include "cnstream_logging.hpp"

#include <atomic>
#include <functional>
Expand Down Expand Up @@ -362,7 +362,7 @@ class ModuleFactory {
static ModuleFactory *Instance() {
if (nullptr == factory_) {
factory_ = new (std::nothrow) ModuleFactory();
LOG_IF(FATAL, nullptr == factory_) << "ModuleFactory::Instance() new ModuleFactory failed.";
LOGF_IF(CORE, nullptr == factory_) << "ModuleFactory::Instance() new ModuleFactory failed.";
}
return (factory_);
}
Expand Down
4 changes: 1 addition & 3 deletions framework/core/include/cnstream_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class SourceModule : public Module {

int Process(std::shared_ptr<CNFrameInfo> data) override {
(void)data;
LOG(ERROR) << "As a source module, Process() should not be invoked\n";
LOGE(CORE) << "As a source module, Process() should not be invoked\n";
return 0;
}

Expand All @@ -84,8 +84,6 @@ class SourceModule : public Module {
public: // NOLINT
#endif

friend class SourceHandler;

uint32_t GetStreamIndex(const std::string &stream_id);
void ReturnStreamIndex(const std::string &stream_id);
/**
Expand Down
36 changes: 18 additions & 18 deletions framework/core/src/cnstream_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>

#include <glog/logging.h>
#include "cnstream_logging.hpp"

#include <algorithm>
#include <chrono>
Expand All @@ -42,7 +42,7 @@ namespace cnstream {
bool CNModuleConfig::ParseByJSONStr(const std::string& jstr) {
rapidjson::Document doc;
if (doc.Parse<rapidjson::kParseCommentsFlag>(jstr.c_str()).HasParseError()) {
LOG(ERROR) << "Parse module configuration failed. Error code [" << std::to_string(doc.GetParseError()) << "]"
LOGE(CORE) << "Parse module configuration failed. Error code [" << std::to_string(doc.GetParseError()) << "]"
<< " Offset [" << std::to_string(doc.GetErrorOffset()) << "]. JSON:" << jstr;
return false;
}
Expand All @@ -52,11 +52,11 @@ bool CNModuleConfig::ParseByJSONStr(const std::string& jstr) {

// className
if (end == doc.FindMember("class_name")) {
LOG(ERROR) << "Module has to have a class_name.";
LOGE(CORE) << "Module has to have a class_name.";
return false;
} else {
if (!doc["class_name"].IsString()) {
LOG(ERROR) << "class_name must be string type.";
LOGE(CORE) << "class_name must be string type.";
return false;
}
this->className = doc["class_name"].GetString();
Expand All @@ -65,13 +65,13 @@ bool CNModuleConfig::ParseByJSONStr(const std::string& jstr) {
// parallelism
if (end != doc.FindMember("parallelism")) {
if (!doc["parallelism"].IsUint()) {
LOG(ERROR) << "parallelism must be uint type.";
LOGE(CORE) << "parallelism must be uint type.";
return false;
}
this->parallelism = doc["parallelism"].GetUint();
if (this->className != "cnstream::DataSource" && this->className != "cnstream::TestDataSource" &&
this->className != "cnstream::ModuleIPC" && this->parallelism < 1) {
LOG(ERROR) << "parallelism must be larger than 0, when class name is " << this->className;
LOGE(CORE) << "parallelism must be larger than 0, when class name is " << this->className;
return false;
}
} else {
Expand All @@ -81,7 +81,7 @@ bool CNModuleConfig::ParseByJSONStr(const std::string& jstr) {
// maxInputQueueSize
if (end != doc.FindMember("max_input_queue_size")) {
if (!doc["max_input_queue_size"].IsUint()) {
LOG(ERROR) << "max_input_queue_size must be uint type.";
LOGE(CORE) << "max_input_queue_size must be uint type.";
return false;
}
this->maxInputQueueSize = doc["max_input_queue_size"].GetUint();
Expand All @@ -92,7 +92,7 @@ bool CNModuleConfig::ParseByJSONStr(const std::string& jstr) {
// enablePerfInfo
if (end != doc.FindMember("show_perf_info")) {
if (!doc["show_perf_info"].IsBool()) {
LOG(ERROR) << "show_perf_info must be Boolean type.";
LOGE(CORE) << "show_perf_info must be Boolean type.";
return false;
}
this->showPerfInfo = doc["show_perf_info"].GetBool();
Expand All @@ -103,13 +103,13 @@ bool CNModuleConfig::ParseByJSONStr(const std::string& jstr) {
// next
if (end != doc.FindMember("next_modules")) {
if (!doc["next_modules"].IsArray()) {
LOG(ERROR) << "next_modules must be array type.";
LOGE(CORE) << "next_modules must be array type.";
return false;
}
auto values = doc["next_modules"].GetArray();
for (auto iter = values.begin(); iter != values.end(); ++iter) {
if (!iter->IsString()) {
LOG(ERROR) << "next_modules must be an array of strings.";
LOGE(CORE) << "next_modules must be an array of strings.";
return false;
}
this->next.push_back(iter->GetString());
Expand All @@ -122,7 +122,7 @@ bool CNModuleConfig::ParseByJSONStr(const std::string& jstr) {
if (end != doc.FindMember("custom_params")) {
rapidjson::Value& custom_params = doc["custom_params"];
if (!custom_params.IsObject()) {
LOG(ERROR) << "custom_params must be an object.";
LOGE(CORE) << "custom_params must be an object.";
return false;
}
this->parameters.clear();
Expand All @@ -148,7 +148,7 @@ bool CNModuleConfig::ParseByJSONFile(const std::string& jfname) {
std::ifstream ifs(jfname);

if (!ifs.is_open()) {
LOG(ERROR) << "File open failed :" << jfname;
LOGE(CORE) << "File open failed :" << jfname;
return false;
}

Expand All @@ -173,7 +173,7 @@ bool CNModuleConfig::ParseByJSONFile(const std::string& jfname) {
jf_dir += '/';

if (this->parameters.end() != this->parameters.find(CNS_JSON_DIR_PARAM_NAME)) {
LOG(WARNING) << "Parameter [" << CNS_JSON_DIR_PARAM_NAME << "] does not take effect. It is set "
LOGW(CORE) << "Parameter [" << CNS_JSON_DIR_PARAM_NAME << "] does not take effect. It is set "
<< "up by cnstream as the directory where the configuration file is located and passed to the module.";
}

Expand All @@ -184,7 +184,7 @@ bool CNModuleConfig::ParseByJSONFile(const std::string& jfname) {
bool ConfigsFromJsonFile(const std::string& config_file, std::vector<CNModuleConfig>& configs) { // NOLINT
std::ifstream ifs(config_file);
if (!ifs.is_open()) {
LOG(ERROR) << "Failed to open file: " << config_file;
LOGE(CORE) << "Failed to open file: " << config_file;
return false;
}

Expand All @@ -195,7 +195,7 @@ bool ConfigsFromJsonFile(const std::string& config_file, std::vector<CNModuleCon
std::vector<std::string> namelist;
rapidjson::Document doc;
if (doc.Parse<rapidjson::kParseCommentsFlag>(jstr.c_str()).HasParseError()) {
LOG(ERROR) << "Parse pipeline configuration failed. Error code [" << std::to_string(doc.GetParseError()) << "]"
LOGE(CORE) << "Parse pipeline configuration failed. Error code [" << std::to_string(doc.GetParseError()) << "]"
<< " Offset [" << std::to_string(doc.GetErrorOffset()) << "]. ";
return false;
}
Expand All @@ -204,7 +204,7 @@ bool ConfigsFromJsonFile(const std::string& config_file, std::vector<CNModuleCon
CNModuleConfig mconf;
mconf.name = iter->name.GetString();
if (find(namelist.begin(), namelist.end(), mconf.name) != namelist.end()) {
LOG(ERROR) << "Module name should be unique in Jason file. Module name : [" << mconf.name + "]"
LOGE(CORE) << "Module name should be unique in Jason file. Module name : [" << mconf.name + "]"
<< " appeared more than one time.";
return false;
}
Expand All @@ -215,7 +215,7 @@ bool ConfigsFromJsonFile(const std::string& config_file, std::vector<CNModuleCon
iter->value.Accept(jwriter);

if (!mconf.ParseByJSONStr(std::string(sbuf.GetString()))) {
LOG(ERROR) << "Parse module config failed. Module name : [" << mconf.name << "]";
LOGE(CORE) << "Parse module config failed. Module name : [" << mconf.name << "]";
return false;
}

Expand All @@ -233,7 +233,7 @@ bool ConfigsFromJsonFile(const std::string& config_file, std::vector<CNModuleCon
jf_dir += '/';

if (mconf.parameters.end() != mconf.parameters.find(CNS_JSON_DIR_PARAM_NAME)) {
LOG(WARNING)
LOGW(CORE)
<< "Parameter [" << CNS_JSON_DIR_PARAM_NAME << "] does not take effect. It is set "
<< "up by cnstream as the directory where the configuration file is located and passed to the module.";
}
Expand Down
10 changes: 5 additions & 5 deletions framework/core/src/cnstream_eventbus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ const std::list<BusWatcher> &EventBus::GetBusWatchers() const {

bool EventBus::PostEvent(Event event) {
if (!running_.load()) {
LOG(WARNING) << "Post event failed, pipeline not running";
LOGW(CORE) << "Post event failed, pipeline not running";
return false;
}
// LOG(INFO) << "Recieve Event from [" << event.module->GetName() << "] :" << event.message;
// LOGI(CORE) << "Recieve Event from [" << event.module->GetName() << "] :" << event.message;
queue_.Push(event);
#ifdef UNIT_TEST
if (unit_test) {
Expand Down Expand Up @@ -105,10 +105,10 @@ void EventBus::EventLoop() {
while (IsRunning()) {
Event event = PollEvent();
if (event.type == EVENT_INVALID) {
LOG(INFO) << "[EventLoop] event type is invalid";
LOGI(CORE) << "[EventLoop] event type is invalid";
break;
} else if (event.type == EVENT_STOP) {
LOG(INFO) << "[EventLoop] Get stop event";
LOGI(CORE) << "[EventLoop] Get stop event";
break;
}
std::unique_lock<std::mutex> lk(watcher_mtx_);
Expand All @@ -122,7 +122,7 @@ void EventBus::EventLoop() {
break;
}
}
LOG(INFO) << "Event bus exit.";
LOGI(CORE) << "Event bus exit.";
}

#ifdef UNIT_TEST
Expand Down
20 changes: 10 additions & 10 deletions framework/core/src/cnstream_frame.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ bool CheckStreamEosReached(const std::string &stream_id, bool sync) {
if (iter != s_stream_eos_map_.end()) {
if (iter->second == true) {
s_stream_eos_map_.erase(iter);
// LOG(INFO) << "check stream eos reached, stream_id = " << stream_id;
// LOGI(CORE) << "check stream eos reached, stream_id = " << stream_id;
return true;
}
} else {
Expand Down Expand Up @@ -83,14 +83,14 @@ void SetStreamRemoved(const std::string &stream_id, bool value) {
} else {
s_stream_removed_map_[stream_id] = value;
}
// LOG(INFO) << "_____SetStreamRemoved " << stream_id << ":" << s_stream_removed_map_[stream_id];
// LOGI(CORE) << "_____SetStreamRemoved " << stream_id << ":" << s_stream_removed_map_[stream_id];
}

bool IsStreamRemoved(const std::string &stream_id) {
SpinLockGuard guard(s_remove_spinlock_);
auto iter = s_stream_removed_map_.find(stream_id);
if (iter != s_stream_removed_map_.end()) {
// LOG(INFO) << "_____IsStreamRemoved " << stream_id << ":" << s_stream_removed_map_[stream_id];
// LOGI(CORE) << "_____IsStreamRemoved " << stream_id << ":" << s_stream_removed_map_[stream_id];
return s_stream_removed_map_[stream_id];
}
return false;
Expand All @@ -99,12 +99,12 @@ bool IsStreamRemoved(const std::string &stream_id) {
std::shared_ptr<CNFrameInfo> CNFrameInfo::Create(const std::string& stream_id, bool eos,
std::shared_ptr<CNFrameInfo> payload) {
if (stream_id == "") {
LOG(ERROR) << "CNFrameInfo::Create() stream_id is empty string.";
LOGE(CORE) << "CNFrameInfo::Create() stream_id is empty string.";
return nullptr;
}
std::shared_ptr<CNFrameInfo> ptr(new (std::nothrow) CNFrameInfo());
if (!ptr) {
LOG(ERROR) << "CNFrameInfo::Create() new CNFrameInfo failed.";
LOGE(CORE) << "CNFrameInfo::Create() new CNFrameInfo failed.";
return nullptr;
}
ptr->stream_id = stream_id;
Expand All @@ -123,14 +123,14 @@ std::shared_ptr<CNFrameInfo> CNFrameInfo::Create(const std::string& stream_id, b
auto iter = stream_count_map_.find(stream_id);
if (iter == stream_count_map_.end()) {
stream_count_map_[stream_id] = 1;
// LOG(INFO) << "CNFrameInfo::Create() insert stream_id: " << stream_id;
// LOGI(CORE) << "CNFrameInfo::Create() insert stream_id: " << stream_id;
} else {
int count = stream_count_map_[stream_id];
if (count >= flow_depth_) {
return nullptr;
}
stream_count_map_[stream_id] = count + 1;
// LOG(INFO) << "CNFrameInfo::Create() add count stream_id " << stream_id << ":" << count;
// LOGI(CORE) << "CNFrameInfo::Create() add count stream_id " << stream_id << ":" << count;
}
}
return ptr;
Expand All @@ -155,13 +155,13 @@ CNFrameInfo::~CNFrameInfo() {
--count;
if (count <= 0) {
stream_count_map_.erase(iter);
// LOG(INFO) << "CNFrameInfo::~CNFrameInfo() erase stream_id " << frame.stream_id;
// LOGI(CORE) << "CNFrameInfo::~CNFrameInfo() erase stream_id " << frame.stream_id;
} else {
iter->second = count;
// LOG(INFO) << "CNFrameInfo::~CNFrameInfo() update stream_id " << frame.stream_id << " : " << count;
// LOGI(CORE) << "CNFrameInfo::~CNFrameInfo() update stream_id " << frame.stream_id << " : " << count;
}
} else {
LOG(ERROR) << "Invaid stream_id, please check\n";
LOGE(CORE) << "Invaid stream_id, please check\n";
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions framework/core/src/cnstream_module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ bool Module::PostEvent(EventType type, const std::string& msg) {
if (container_) {
return container_->GetEventBus()->PostEvent(event);
} else {
LOG(WARNING) << "[" << GetName() << "] module's container is not set";
LOGW(CORE) << "[" << GetName() << "] module's container is not set";
return false;
}
}
Expand All @@ -109,7 +109,7 @@ bool Module::PostEvent(Event e) {
if (container_) {
return container_->GetEventBus()->PostEvent(e);
} else {
LOG(WARNING) << "[" << GetName() << "] module's container is not set";
LOGW(CORE) << "[" << GetName() << "] module's container is not set";
return false;
}
}
Expand Down
Loading

0 comments on commit ef336c1

Please sign in to comment.