Skip to content

Commit

Permalink
add abe's changes
Browse files Browse the repository at this point in the history
  • Loading branch information
takemiyamakoto committed Jun 16, 2017
1 parent 59ed936 commit a94213f
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 41 deletions.
79 changes: 44 additions & 35 deletions irohad/consensus/sumeragi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include <connection/consensus/service.hpp>
#include <connection/consensus/client.hpp>
#include <logger/logger.hpp>
#include <peer_service/self_state.hpp>
#include <peer_service/monitor.hpp>
#include <common/timer.hpp>
#include <common/datetime.hpp>
#include <thread_pool.hpp>
Expand Down Expand Up @@ -47,7 +49,6 @@ namespace consensus {
using iroha::protocol::Block;
using iroha::protocol::Signature;

connection::consensus::SumeragiClient sender;
logger::Logger log("sumeragi");

static ThreadPool pool(ThreadPoolOptions{
Expand All @@ -65,29 +66,56 @@ namespace consensus {
if ( /*check is_committed*/ false) {

} else {
// send processTransaction(event) as a task to processing pool
// send processBlock(block) as a task to processing pool
// this returns std::future<void> object
// (std::future).get() method locks processing until result of
// processTransaction will be available but processTransaction returns
// processBlock will be available but processBlock returns
// void, so we don't have to call it and wait
// std::function<void()>&& task =
// std::bind(processTransaction, std::move(event));
// pool.process(std::move(task));

std::function<void()> &&task = std::bind(processBlock, block);
pool.process(std::move(task));
}
});
}

size_t getMaxFaulty() {
return (size_t)peer_service::monitor::getActivePeerSize() / 3;
}

size_t getNumValidatingPeers() {
return getMaxFaulty() * 2 + 1;
}

bool unicast(const iroha::protocol::Block& block, size_t peerOrder) {
auto peer = peer_service::monitor::getActivePeerAt((unsigned int)peerOrder);
connection::consensus::SumeragiClient client(peer->ip_, 50051); // TODO: Get port from config
auto response = client.Verify(block);
return response.code() == iroha::protocol::ResponseCode::OK;
}

bool leaderMulticast(const iroha::protocol::Block& block) {
auto peerSize = getNumValidatingPeers();
for (size_t i = 0; i < peerSize; i++) {
unicast(block, i); // Currently, return value is not used.
}
return true;
}

bool commit(const iroha::protocol::Block& block) {
auto peerSize = (size_t)peer_service::monitor::getActivePeerSize();
for (size_t i = 0; i < peerSize; i++) {
unicast(block, i); // Currently, return value is not used.
}
return true;
}

// TODO: Append block to db and calc merkle root.
std::string appendBlock(const Block &block) {
return std::string();
}

Block createSignedBlock(const Block &block, const std::string &merkleRoot) {
auto pk = "pk"; // TODO: peer service
auto sk = "sk";
auto pk = peer_service::self_state::getPublicKey();
auto sk = peer_service::self_state::getPrivateKey();

auto sigblob = crypto::signature::sign(merkleRoot, pk, sk);
std::string str_sigblob;
Expand All @@ -105,25 +133,6 @@ namespace consensus {
return ret;
}

bool isLeader(const Block &block) {
//auto validLeader = true; // TODO: Use peer service
auto validNumOfSignatures = block.header().peer_signature().size() == 1;
if (validNumOfSignatures) return true;
return false;
}

size_t getMaxFaulty() {
return 3;/*peer::service::getActivePeerList().size() / 3; */ // TODO: Peer service
}

size_t getNumValidatingPeers() {
return getMaxFaulty() * 2 + 1;
}

size_t getNumAllPeers() {
return 4; // TODO: peer service
}

void setTimeOutCommit(const Block &block) {
timer::setAwkTimerForCurrentThread(3000, [block] {
panic(block);
Expand All @@ -136,8 +145,8 @@ namespace consensus {
*/

int getNextOrder() {
static int currentProxyTail = static_cast<int>(getNumValidatingPeers()) - 1;
if (currentProxyTail >= getNumAllPeers()) {
thread_local int currentProxyTail = static_cast<int>(getNumValidatingPeers()) - 1;
if (currentProxyTail >= peer_service::monitor::getActivePeerSize()) {
return -1;
}
return currentProxyTail++;
Expand Down Expand Up @@ -175,8 +184,8 @@ namespace consensus {
auto merkleRoot = appendBlock(block);
auto newBlock = createSignedBlock(block, merkleRoot);

if (isLeader(newBlock)) {
sender.broadCast(newBlock);
if (peer_service::self_state::isLeader()) {
leaderMulticast(newBlock);
setTimeOutCommit(newBlock);
return;
}
Expand All @@ -189,11 +198,11 @@ namespace consensus {
log.error("getNextOrder() < 0 in processBlock");
return;
}
sender.unicast(newBlock, static_cast<size_t>(next));
unicast(newBlock, static_cast<size_t>(next));
setTimeOutCommit(newBlock);
} else {
if (numValidSignatures == getNumValidatingPeers()) {
sender.commit(newBlock);
commit(newBlock);
setTimeOutCommit(newBlock);
}
}
Expand Down Expand Up @@ -225,7 +234,7 @@ namespace consensus {
log.info("否認");
return;
}
sender.unicast(block, static_cast<size_t>(next));
unicast(block, static_cast<size_t>(next));
setTimeOutCommit(block);
}

Expand Down
17 changes: 12 additions & 5 deletions irohad/ordering/connection/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

#include <grpc++/grpc++.h>
#include <endpoint.grpc.pb.h>

#include "client.hpp"
Expand All @@ -28,11 +29,17 @@ namespace ordering {
// TODO
}

QueueTransactionResponse* OrderingClient::QueueTransaction(
const iroha::protocol::Transaction& request) {
auto response = new QueueTransactionResponse();
OrderingClient::OrderingClient(const std::string &ip, int port) {
auto channel = grpc::CreateChannel(ip + ":" + std::to_string(port), grpc::InsecureChannelCredentials());
stub_ = iroha::protocol::OrderingService::NewStub(channel);
}

QueueTransactionResponse OrderingClient::QueueTransaction(
const iroha::protocol::Transaction& tx) {
QueueTransactionResponse response;
stub_->QueueTransaction(&context_, tx, &response);
return response;
}

} // namespace consensus
} // namespace connection
} // namespace connection
} // namespace ordering
2 changes: 1 addition & 1 deletion schema/endpoint.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ service QueryService {
}

service SumeragiService {
rpc Verify (Block) returns (QueueTransactionResponse);
rpc Verify (Block) returns (VerifyResponse);
}

service OrderingService {
Expand Down

0 comments on commit a94213f

Please sign in to comment.