Skip to content

Commit

Permalink
Merge pull request hyperledger-iroha#385 from hyperledger/feature/net…
Browse files Browse the repository at this point in the history
…work-stubs

Network stubs
  • Loading branch information
grimadas authored Jun 26, 2017
2 parents a6b1f9e + 7d104e7 commit 807c006
Show file tree
Hide file tree
Showing 13 changed files with 246 additions and 65 deletions.
20 changes: 10 additions & 10 deletions cmake/dependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -466,24 +466,24 @@ set_target_properties(gflags PROPERTIES
add_dependencies(gflags gflags_gflags)

##########################
# observable #
# rx c++ #
##########################

ExternalProject_Add(ddinu_observable
GIT_REPOSITORY "https://github.com/ddinu/observable"
ExternalProject_Add(reactive_extensions_rxcpp
GIT_REPOSITORY "https://github.com/Reactive-Extensions/RxCpp"
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND "" # remove install step
UPDATE_COMMAND "" # remove update step
TEST_COMMAND "" # remove test step
)
ExternalProject_Get_Property(ddinu_observable source_dir)
set(OBSERVABLE_INCLUDE_DIRS ${source_dir}/include)
file(MAKE_DIRECTORY ${OBSERVABLE_INCLUDE_DIRS})
ExternalProject_Get_Property(reactive_extensions_rxcpp source_dir binary_dir)
set(RXCPP_INCLUDE_DIRS ${source_dir}/Rx/v2/src)
file(MAKE_DIRECTORY ${RXCPP_INCLUDE_DIRS})

add_library(observable INTERFACE IMPORTED)
set_target_properties(observable PROPERTIES
INTERFACE_INCLUDE_DIRECTORIES ${OBSERVABLE_INCLUDE_DIRS}
add_library(rxcpp INTERFACE IMPORTED)
set_target_properties(rxcpp PROPERTIES
INTERFACE_INCLUDE_DIRECTORIES ${RXCPP_INCLUDE_DIRS}
)

add_dependencies(observable ddinu_observable)
add_dependencies(rxcpp reactive_extensions_rxcpp)
1 change: 1 addition & 0 deletions irohad/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ add_subdirectory(main)
add_subdirectory(ordering)
add_subdirectory(peer_service)
add_subdirectory(validation)
add_subdirectory(network)
7 changes: 3 additions & 4 deletions irohad/ametsuchi/storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@

#include <cstdint>
#include <memory>
#include <vector>
#include <string>
#include <nonstd/optional.hpp>
#include <string>
#include <vector>

namespace iroha {

Expand All @@ -49,8 +49,7 @@ namespace iroha {
// TODO merge into one method?
virtual nonstd::optional<uint64_t> get_block_id_by_tx_hash(
const std::string &hash) = 0;
virtual nonstd::optional<uint64_t> get_tx_id(
const std::string &hash) = 0;
virtual nonstd::optional<uint64_t> get_tx_id(const std::string &hash) = 0;
virtual nonstd::optional<uint64_t> last_block_id_index() = 0;

virtual std::vector<std::string> get_tx_hash(
Expand Down
8 changes: 8 additions & 0 deletions irohad/network/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
add_library(networking
impl/network_api.cpp
)

target_link_libraries(networking PUBLIC
rxcpp
dao
)
17 changes: 17 additions & 0 deletions irohad/network/impl/network_api.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
Copyright Soramitsu Co., Ltd. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#include <network/network_api.h>
87 changes: 87 additions & 0 deletions irohad/network/network_api.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
Copyright Soramitsu Co., Ltd. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef IROHA_NETWORK_H
#define IROHA_NETWORK_H

#include "dao/dao.hpp"
#include "rxcpp/rx-observable.hpp"

namespace iroha {
namespace network {

/**
* Interface provides methods for fetching consensus-related data.
*/
class ConsensusListener {
public:
/**
* Event is triggered when proposal arrives from network.
* @return observable with Proposals.
* (List of Proposals)
*/
virtual rxcpp::observable<iroha::dao::Proposal> on_proposal() = 0;

/**
* Event is triggered when commit block arrives.
* @return observable with sequence of committed blocks.
* In common case observable<Block> will contain one element.
* But there are scenarios when consensus provide many blocks, e.g.
* on peer startup - peer will get all actual blocks.
*/
virtual rxcpp::observable<rxcpp::observable<iroha::dao::Block>>
on_commit() = 0;
};

/**
* Interface for downloading blocks from a network
*/
class BlockLoaderApi {
public:
/**
* Method requests missed blocks from external peer starting from it's top
* block.
* Note, that blocks will be in order: from the newest
* to your actual top block.
* This order is required for verify blocks before storing in a ledger.
* @param target_peer - peer for requesting blocks
* @param topBlock - your last actual block
* @return observable with blocks
*/
virtual rxcpp::observable<iroha::dao::Block> requestBlocks(
iroha::dao::Peer &target_peer, iroha::dao::Block &topBlock) = 0;
};

/**
* Interface for propagating transaction in a network
*/
class TransactionPropagator {
public:
/**
* Method spreads transaction to other members of a network
* @param tx - transaction for propagation
*/
virtual void propagate_transaction(iroha::dao::Transaction &tx) = 0;
};

/**
* Public API interface for communication between current peer and other
* peers in a network
*/
class PeerCommunicationService : public TransactionPropagator,
public ConsensusListener {};
}
}
#endif // IROHA_NETWORK_H
1 change: 1 addition & 0 deletions irohad/validation/stateful/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ ADD_LIBRARY(stateful_validator STATIC validator.cpp)

target_link_libraries(stateful_validator
schema
optional
)
1 change: 1 addition & 0 deletions libs/dao/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ add_library(dao

target_link_libraries(dao
schema
optional
)
4 changes: 4 additions & 0 deletions libs/dao/dao.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
#include "transaction.hpp"
#include "asset.hpp"
#include "account.hpp"
#include "peer.hpp"
#include "singature.hpp"
#include "dao_crypto_provider.hpp"
#include "dao_hash_provider.hpp"

/**
* DAO - Data Access Object.
Expand Down
2 changes: 1 addition & 1 deletion libs/dao/peer.h → libs/dao/peer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ namespace iroha {
/**
* Public key of peer
*/
const iroha::ed25519::pubkey_t pubkey;
const iroha::crypto::ed25519::pubkey_t pubkey;

/*
* Peer account
Expand Down
6 changes: 3 additions & 3 deletions test/module/vendor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ target_link_libraries(uvw_test
uvw
)

AddTest(observable_test observable_test.cpp)
target_link_libraries(observable_test
observable
AddTest(rxcpp_test rxcpp_test.cpp)
target_link_libraries(rxcpp_test
rxcpp
)
47 changes: 0 additions & 47 deletions test/module/vendor/observable_test.cpp

This file was deleted.

110 changes: 110 additions & 0 deletions test/module/vendor/rxcpp_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
Copyright Soramitsu Co., Ltd. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#include <gtest/gtest.h>

#include "rxcpp/rx.hpp"
namespace Rx {
using namespace rxcpp;
using namespace rxcpp::sources;
using namespace rxcpp::operators;
using namespace rxcpp::util;
}
using namespace Rx;

#include <regex>
#include <random>
using namespace std;
using namespace std::chrono;

TEST(rxcppTest, usage_observable_test) {
random_device rd; // non-deterministic generator
mt19937 gen(rd());
uniform_int_distribution<> dist(4, 18);

// for testing purposes, produce byte stream that from lines of text
auto bytes = range(0, 10) |
flat_map([&](int i){
auto body = from((uint8_t)('A' + i)) |
repeat(dist(gen)) |
as_dynamic();
auto delim = from((uint8_t)'\r');
return from(body, delim) | concat();
}) |
window(17) |
flat_map([](observable<uint8_t> w){
return w |
reduce(
vector<uint8_t>(),
[](vector<uint8_t> v, uint8_t b){
v.push_back(b);
return v;
}) |
as_dynamic();
}) |
tap([](vector<uint8_t>& v){
// print input packet of bytes
copy(v.begin(), v.end(), ostream_iterator<long>(cout, " "));
cout << endl;
});

//
// recover lines of text from byte stream
//

auto removespaces = [](string s){
s.erase(remove_if(s.begin(), s.end(), ::isspace), s.end());
return s;
};

// create strings split on \r
auto strings = bytes |
concat_map([](vector<uint8_t> v){
string s(v.begin(), v.end());
regex delim(R"/(\r)/");
cregex_token_iterator cursor(&s[0], &s[0] + s.size(), delim, {-1, 0});
cregex_token_iterator end;
vector<string> splits(cursor, end);
return iterate(move(splits));
}) |
filter([](const string& s){
return !s.empty();
}) |
publish() |
ref_count();

// filter to last string in each line
auto closes = strings |
filter(
[](const string& s){
return s.back() == '\r';
}) |
Rx::map([](const string&){return 0;});

// group strings by line
auto linewindows = strings |
window_toggle(closes | start_with(0), [=](int){return closes;});

// reduce the strings for a line into one string
auto lines = linewindows |
flat_map([&](observable<string> w) {
return w | start_with<string>("") | sum() | Rx::map(removespaces);
});

// print result
lines |
subscribe<string>(println(cout));
}

0 comments on commit 807c006

Please sign in to comment.