-
Notifications
You must be signed in to change notification settings - Fork 296
Event loop for pipeline processing #2209
Event loop for pipeline processing #2209
Conversation
Signed-off-by: Andrei Lebedev <[email protected]>
Signed-off-by: Andrei Lebedev <[email protected]>
Signed-off-by: Andrei Lebedev <[email protected]>
cmake/Modules/Findrxcpp.cmake
Outdated
@@ -1,6 +1,7 @@ | |||
add_library(rxcpp INTERFACE IMPORTED) | |||
|
|||
find_path(rxcpp_INCLUDE_DIR rxcpp/rx.hpp) | |||
# TODO 2019-04-02 lebdron: IR-346 Update Rx commit | |||
#find_path(rxcpp_INCLUDE_DIR rxcpp/rx.hpp) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess we don't need this line anymore.
cmake/Modules/Findrxcpp.cmake
Outdated
set_target_description(rxcpp "Library for reactive programming" ${URL} ${VERSION}) | ||
|
||
|
||
if (NOT rxcpp_FOUND) | ||
#if (NOT rxcpp_FOUND) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe just remove?
cmake/Modules/Findrxcpp.cmake
Outdated
TEST_COMMAND "" # remove test step | ||
) | ||
externalproject_get_property(reactive_extensions_rxcpp source_dir) | ||
set(rxcpp_INCLUDE_DIR ${source_dir}/Rx/v2/src) | ||
file(MAKE_DIRECTORY ${rxcpp_INCLUDE_DIR}) | ||
|
||
add_dependencies(rxcpp reactive_extensions_rxcpp) | ||
endif () | ||
#endif () |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And here as well.
CI confuses me a bit - looks like all tests are passed but it shows failure status. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will finish the review later.
@@ -150,13 +172,17 @@ namespace iroha { | |||
|
|||
network_->sendState(current_leader, {vote}); | |||
cluster_order_.switchToNext(); | |||
if (cluster_order_.hasNext()) { | |||
auto has_next = cluster_order_.hasNext(); | |||
lock.unlock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should not we synchronize the other accesses to cluster_order_
for example with shared locks?
https://github.com/hyperledger/iroha/blob/6b97c3e467ca628d3a1e7f44a22b7d3420498813/irohad/consensus/yac/impl/yac.cpp#L190
https://github.com/hyperledger/iroha/blob/6b97c3e467ca628d3a1e7f44a22b7d3420498813/irohad/consensus/yac/impl/yac.cpp#L203
https://github.com/hyperledger/iroha/blob/6b97c3e467ca628d3a1e7f44a22b7d3420498813/irohad/consensus/yac/impl/yac.cpp#L286
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and round_
btw
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They can be synchronized with shared lock as optimization, but it will introduce more complexity to code. I am not sure that it is required at the moment.
The accesses you mentioned are covered by lock in onState
now.
irohad/consensus/yac/impl/yac.cpp
Outdated
timer_->invokeAfterDelay([this, vote] { this->votingStep(vote); }); | ||
} | ||
} | ||
|
||
void Yac::closeRound() { | ||
timer_->deny(); | ||
void Yac::closeRound(Round round) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as for me, the closeRound
function should better take no round
argument, because we do not maintain more than one round at a time. Better would be to call it only if the proposal_round
matches the condition here:
https://github.com/hyperledger/iroha/blob/6b97c3e467ca628d3a1e7f44a22b7d3420498813/irohad/consensus/yac/impl/yac.cpp#L247
if (proposal_round >= round_) {
this->closeRound();
}
@@ -98,7 +100,8 @@ namespace iroha { | |||
/** | |||
* Initializes on-demand ordering gate and ordering sevice components | |||
* | |||
* @param max_number_of_transactions maximum number of transaction in a proposal | |||
* @param max_number_of_transactions maximum number of transaction in a |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* @param max_number_of_transactions maximum number of transaction in a | |
* @param max_number_of_transactions maximum number of transactions in a |
libs/logger/logger_spdlog.cpp
Outdated
return spdlog::level::critical; | ||
default: | ||
BOOST_ASSERT_MSG(false, "Unknown log level!"); | ||
return getSpdlogLogLevel(logger::kDefaultLogLevel); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks like a place for stack overflow! if something changes and logger::kDefaultLogLevel
will hit default
too.. better something like
return getSpdlogLogLevel(logger::kDefaultLogLevel); | |
return getSpdlogLogLevel(spdlog::level::info); |
also it would be nice to give the patch |
Signed-off-by: Andrei Lebedev <[email protected]>
Signed-off-by: Andrei Lebedev <[email protected]>
Signed-off-by: Andrei Lebedev <[email protected]>
Signed-off-by: Andrei Lebedev <[email protected]>
Signed-off-by: Andrei Lebedev <[email protected]>
Signed-off-by: Andrei Lebedev <[email protected]>
Signed-off-by: Andrei Lebedev <[email protected]>
Signed-off-by: Andrei Lebedev <[email protected]>
Signed-off-by: Andrei Lebedev <[email protected]>
Signed-off-by: Andrei Lebedev <[email protected]>
Signed-off-by: Andrei Lebedev <[email protected]>
Description of the Change
Create a
synchronize
subject in consensus to queue the events from the network without locking the operation of consensuscurrent_thread
scheduler withsynchronize
publish().ref_count()
to YacGateImpl observable to eliminate redundant calls toflat_map
if multiple subscribers are presentgetSpdlogLogLevel
consensus_sunny_day
test 🎉blocking_observable
in TestSubscriberBenefits
Consensus can process messages from the network while the pipeline is running
Possible Drawbacks
A separate thread is allocated to the pipeline