From a94213fb8af13daf2b28f9ddf9bd18d04b21c4bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AD=A6=E5=AE=AE=E8=AA=A0?= Date: Fri, 16 Jun 2017 12:35:00 +0900 Subject: [PATCH] add abe's changes --- irohad/consensus/sumeragi.cpp | 79 +++++++++++++++------------ irohad/ordering/connection/client.cpp | 17 ++++-- schema/endpoint.proto | 2 +- 3 files changed, 57 insertions(+), 41 deletions(-) diff --git a/irohad/consensus/sumeragi.cpp b/irohad/consensus/sumeragi.cpp index 01377a12c3..0255255129 100644 --- a/irohad/consensus/sumeragi.cpp +++ b/irohad/consensus/sumeragi.cpp @@ -19,6 +19,8 @@ #include #include #include +#include +#include #include #include #include @@ -47,7 +49,6 @@ namespace consensus { using iroha::protocol::Block; using iroha::protocol::Signature; - connection::consensus::SumeragiClient sender; logger::Logger log("sumeragi"); static ThreadPool pool(ThreadPoolOptions{ @@ -65,29 +66,56 @@ namespace consensus { if ( /*check is_committed*/ false) { } else { - // send processTransaction(event) as a task to processing pool + // send processBlock(block) as a task to processing pool // this returns std::future object // (std::future).get() method locks processing until result of - // processTransaction will be available but processTransaction returns + // processBlock will be available but processBlock returns // void, so we don't have to call it and wait - // std::function&& task = - // std::bind(processTransaction, std::move(event)); - // pool.process(std::move(task)); - std::function &&task = std::bind(processBlock, block); pool.process(std::move(task)); } }); } + size_t getMaxFaulty() { + return (size_t)peer_service::monitor::getActivePeerSize() / 3; + } + + size_t getNumValidatingPeers() { + return getMaxFaulty() * 2 + 1; + } + + bool unicast(const iroha::protocol::Block& block, size_t peerOrder) { + auto peer = peer_service::monitor::getActivePeerAt((unsigned int)peerOrder); + connection::consensus::SumeragiClient client(peer->ip_, 50051); // TODO: Get port from config + auto response = client.Verify(block); + return response.code() == iroha::protocol::ResponseCode::OK; + } + + bool leaderMulticast(const iroha::protocol::Block& block) { + auto peerSize = getNumValidatingPeers(); + for (size_t i = 0; i < peerSize; i++) { + unicast(block, i); // Currently, return value is not used. + } + return true; + } + + bool commit(const iroha::protocol::Block& block) { + auto peerSize = (size_t)peer_service::monitor::getActivePeerSize(); + for (size_t i = 0; i < peerSize; i++) { + unicast(block, i); // Currently, return value is not used. + } + return true; + } + // TODO: Append block to db and calc merkle root. std::string appendBlock(const Block &block) { return std::string(); } Block createSignedBlock(const Block &block, const std::string &merkleRoot) { - auto pk = "pk"; // TODO: peer service - auto sk = "sk"; + auto pk = peer_service::self_state::getPublicKey(); + auto sk = peer_service::self_state::getPrivateKey(); auto sigblob = crypto::signature::sign(merkleRoot, pk, sk); std::string str_sigblob; @@ -105,25 +133,6 @@ namespace consensus { return ret; } - bool isLeader(const Block &block) { - //auto validLeader = true; // TODO: Use peer service - auto validNumOfSignatures = block.header().peer_signature().size() == 1; - if (validNumOfSignatures) return true; - return false; - } - - size_t getMaxFaulty() { - return 3;/*peer::service::getActivePeerList().size() / 3; */ // TODO: Peer service - } - - size_t getNumValidatingPeers() { - return getMaxFaulty() * 2 + 1; - } - - size_t getNumAllPeers() { - return 4; // TODO: peer service - } - void setTimeOutCommit(const Block &block) { timer::setAwkTimerForCurrentThread(3000, [block] { panic(block); @@ -136,8 +145,8 @@ namespace consensus { */ int getNextOrder() { - static int currentProxyTail = static_cast(getNumValidatingPeers()) - 1; - if (currentProxyTail >= getNumAllPeers()) { + thread_local int currentProxyTail = static_cast(getNumValidatingPeers()) - 1; + if (currentProxyTail >= peer_service::monitor::getActivePeerSize()) { return -1; } return currentProxyTail++; @@ -175,8 +184,8 @@ namespace consensus { auto merkleRoot = appendBlock(block); auto newBlock = createSignedBlock(block, merkleRoot); - if (isLeader(newBlock)) { - sender.broadCast(newBlock); + if (peer_service::self_state::isLeader()) { + leaderMulticast(newBlock); setTimeOutCommit(newBlock); return; } @@ -189,11 +198,11 @@ namespace consensus { log.error("getNextOrder() < 0 in processBlock"); return; } - sender.unicast(newBlock, static_cast(next)); + unicast(newBlock, static_cast(next)); setTimeOutCommit(newBlock); } else { if (numValidSignatures == getNumValidatingPeers()) { - sender.commit(newBlock); + commit(newBlock); setTimeOutCommit(newBlock); } } @@ -225,7 +234,7 @@ namespace consensus { log.info("否認"); return; } - sender.unicast(block, static_cast(next)); + unicast(block, static_cast(next)); setTimeOutCommit(block); } diff --git a/irohad/ordering/connection/client.cpp b/irohad/ordering/connection/client.cpp index 424e4cc1bc..96192be1d7 100644 --- a/irohad/ordering/connection/client.cpp +++ b/irohad/ordering/connection/client.cpp @@ -14,6 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ +#include #include #include "client.hpp" @@ -28,11 +29,17 @@ namespace ordering { // TODO } - QueueTransactionResponse* OrderingClient::QueueTransaction( - const iroha::protocol::Transaction& request) { - auto response = new QueueTransactionResponse(); + OrderingClient::OrderingClient(const std::string &ip, int port) { + auto channel = grpc::CreateChannel(ip + ":" + std::to_string(port), grpc::InsecureChannelCredentials()); + stub_ = iroha::protocol::OrderingService::NewStub(channel); + } + + QueueTransactionResponse OrderingClient::QueueTransaction( + const iroha::protocol::Transaction& tx) { + QueueTransactionResponse response; + stub_->QueueTransaction(&context_, tx, &response); return response; } - } // namespace consensus -} // namespace connection + } // namespace connection +} // namespace ordering \ No newline at end of file diff --git a/schema/endpoint.proto b/schema/endpoint.proto index 9ba1d5445b..83c43172d5 100644 --- a/schema/endpoint.proto +++ b/schema/endpoint.proto @@ -30,7 +30,7 @@ service QueryService { } service SumeragiService { - rpc Verify (Block) returns (QueueTransactionResponse); + rpc Verify (Block) returns (VerifyResponse); } service OrderingService {