Skip to content
This repository has been archived by the owner on Apr 17, 2019. It is now read-only.

Event loop for pipeline processing #2209

Merged
merged 16 commits into from
Apr 12, 2019

Conversation

lebdron
Copy link
Contributor

@lebdron lebdron commented Apr 2, 2019

Description of the Change

Create a synchronize subject in consensus to queue the events from the network without locking the operation of consensus

  • Patch rxcpp to remove data race and allow using current_thread scheduler with synchronize
  • Add subscriptions for subjects in irohad to manage their lifetime
  • Add round parameter to Yac to close rounds that are less than or equal to current round
  • Add publish().ref_count() to YacGateImpl observable to eliminate redundant calls to flat_map if multiple subscribers are present
  • Fix data race in getSpdlogLogLevel
  • Fix nondeterminism in consensus_sunny_day test 🎉
  • Allow usage of blocking_observable in TestSubscriber

Benefits

Consensus can process messages from the network while the pipeline is running

Possible Drawbacks

A separate thread is allocated to the pipeline

@lebdron lebdron requested review from luckychess and MBoldyrev April 2, 2019 16:42
@@ -1,6 +1,7 @@
add_library(rxcpp INTERFACE IMPORTED)

find_path(rxcpp_INCLUDE_DIR rxcpp/rx.hpp)
# TODO 2019-04-02 lebdron: IR-346 Update Rx commit
#find_path(rxcpp_INCLUDE_DIR rxcpp/rx.hpp)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we don't need this line anymore.

set_target_description(rxcpp "Library for reactive programming" ${URL} ${VERSION})


if (NOT rxcpp_FOUND)
#if (NOT rxcpp_FOUND)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe just remove?

TEST_COMMAND "" # remove test step
)
externalproject_get_property(reactive_extensions_rxcpp source_dir)
set(rxcpp_INCLUDE_DIR ${source_dir}/Rx/v2/src)
file(MAKE_DIRECTORY ${rxcpp_INCLUDE_DIR})

add_dependencies(rxcpp reactive_extensions_rxcpp)
endif ()
#endif ()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And here as well.

@luckychess
Copy link
Contributor

CI confuses me a bit - looks like all tests are passed but it shows failure status.

Copy link
Contributor

@MBoldyrev MBoldyrev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will finish the review later.

@@ -150,13 +172,17 @@ namespace iroha {

network_->sendState(current_leader, {vote});
cluster_order_.switchToNext();
if (cluster_order_.hasNext()) {
auto has_next = cluster_order_.hasNext();
lock.unlock();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and round_ btw

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They can be synchronized with shared lock as optimization, but it will introduce more complexity to code. I am not sure that it is required at the moment.
The accesses you mentioned are covered by lock in onState now.

timer_->invokeAfterDelay([this, vote] { this->votingStep(vote); });
}
}

void Yac::closeRound() {
timer_->deny();
void Yac::closeRound(Round round) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as for me, the closeRound function should better take no round argument, because we do not maintain more than one round at a time. Better would be to call it only if the proposal_round matches the condition here:
https://github.com/hyperledger/iroha/blob/6b97c3e467ca628d3a1e7f44a22b7d3420498813/irohad/consensus/yac/impl/yac.cpp#L247

                  if (proposal_round >= round_) {
                    this->closeRound();
                  }

@@ -98,7 +100,8 @@ namespace iroha {
/**
* Initializes on-demand ordering gate and ordering sevice components
*
* @param max_number_of_transactions maximum number of transaction in a proposal
* @param max_number_of_transactions maximum number of transaction in a
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* @param max_number_of_transactions maximum number of transaction in a
* @param max_number_of_transactions maximum number of transactions in a

return spdlog::level::critical;
default:
BOOST_ASSERT_MSG(false, "Unknown log level!");
return getSpdlogLogLevel(logger::kDefaultLogLevel);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like a place for stack overflow! if something changes and logger::kDefaultLogLevel will hit default too.. better something like

Suggested change
return getSpdlogLogLevel(logger::kDefaultLogLevel);
return getSpdlogLogLevel(spdlog::level::info);

@MBoldyrev
Copy link
Contributor

also it would be nice to give the patch patch/synchronize-lock-recursion-thread-safety.patch a more meaningful and descriptive name. I would suggest either renaming it with some rxcpp prefix or moving to a directory like patches/external/rxcpp (it other patches to rxcpp are probable).

@lebdron lebdron requested a review from MBoldyrev April 6, 2019 16:56
Signed-off-by: Andrei Lebedev <[email protected]>
@lebdron lebdron merged commit 681dc16 into hyperledger-iroha:develop Apr 12, 2019
@lebdron lebdron deleted the refactor/yac-threads branch April 12, 2019 09:21
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants