Skip to content

Commit

Permalink
Merge pull request steemit#211 from emfrias/p2p-sync-fixes
Browse files Browse the repository at this point in the history
p2p sync fixes
  • Loading branch information
bytemaster authored Jul 26, 2016
2 parents 68fbfdb + 233461e commit 489c99e
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 20 deletions.
3 changes: 2 additions & 1 deletion libraries/net/include/graphene/net/peer_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,8 @@ namespace graphene { namespace net
bool peer_needs_sync_items_from_us;
bool we_need_sync_items_from_peer;
fc::optional<boost::tuple<std::vector<item_hash_t>, fc::time_point> > item_ids_requested_from_peer; /// we check this to detect a timed-out request and in busy()
item_to_time_map_type sync_items_requested_from_peer; /// ids of blocks we've requested from this peer during sync. fetch from another peer if this peer disconnects
fc::time_point last_sync_item_received_time; /// the time we received the last sync item or the time we sent the last batch of sync item requests to this peer
std::set<item_hash_t> sync_items_requested_from_peer; /// ids of blocks we've requested from this peer during sync. fetch from another peer if this peer disconnects
item_hash_t last_block_delegate_has_seen; /// the hash of the last block this peer has told us about that the peer knows
fc::time_point_sec last_block_time_delegate_has_seen;
bool inhibit_fetching_sync_blocks;
Expand Down
38 changes: 19 additions & 19 deletions libraries/net/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1000,8 +1000,8 @@ namespace graphene { namespace net { namespace detail {
dlog( "requesting item ${item_hash} from peer ${endpoint}", ("item_hash", item_to_request )("endpoint", peer->get_remote_endpoint() ) );
item_id item_id_to_request( graphene::net::block_message_type, item_to_request );
_active_sync_requests.insert( active_sync_requests_map::value_type(item_to_request, fc::time_point::now() ) );
peer->sync_items_requested_from_peer.insert( peer_connection::item_to_time_map_type::value_type(item_id_to_request, fc::time_point::now() ) );
std::vector<item_hash_t> items_to_fetch;
peer->last_sync_item_received_time = fc::time_point::now();
peer->sync_items_requested_from_peer.insert(item_to_request);
peer->send_message( fetch_items_message(item_id_to_request.item_type, std::vector<item_hash_t>{item_id_to_request.item_hash} ) );
}

Expand All @@ -1013,8 +1013,8 @@ namespace graphene { namespace net { namespace detail {
for (const item_hash_t& item_to_request : items_to_request)
{
_active_sync_requests.insert( active_sync_requests_map::value_type(item_to_request, fc::time_point::now() ) );
item_id item_id_to_request( graphene::net::block_message_type, item_to_request );
peer->sync_items_requested_from_peer.insert( peer_connection::item_to_time_map_type::value_type(item_id_to_request, fc::time_point::now() ) );
peer->last_sync_item_received_time = fc::time_point::now();
peer->sync_items_requested_from_peer.insert(item_to_request);
}
peer->send_message(fetch_items_message(graphene::net::block_message_type, items_to_request));
}
Expand Down Expand Up @@ -1378,14 +1378,14 @@ namespace graphene { namespace net { namespace detail {
else
{
bool disconnect_due_to_request_timeout = false;
for (const peer_connection::item_to_time_map_type::value_type& item_and_time : active_peer->sync_items_requested_from_peer)
if (item_and_time.second < active_ignored_request_threshold)
{
wlog("Disconnecting peer ${peer} because they didn't respond to my request for sync item ${id}",
("peer", active_peer->get_remote_endpoint())("id", item_and_time.first.item_hash));
disconnect_due_to_request_timeout = true;
break;
}
if (!active_peer->sync_items_requested_from_peer.empty() &&
active_peer->last_sync_item_received_time < active_ignored_request_threshold)
{
wlog("Disconnecting peer ${peer} because they haven't made any progress on my remaining ${count} sync item requests",
("peer", active_peer->get_remote_endpoint())("count", active_peer->sync_items_requested_from_peer.size()));
disconnect_due_to_request_timeout = true;
break;
}
if (!disconnect_due_to_request_timeout &&
active_peer->item_ids_requested_from_peer &&
active_peer->item_ids_requested_from_peer->get<1>() < active_ignored_request_threshold)
Expand Down Expand Up @@ -1486,7 +1486,7 @@ namespace graphene { namespace net { namespace detail {

if (!_node_is_shutting_down && !_terminate_inactive_connections_loop_done.canceled())
_terminate_inactive_connections_loop_done = fc::schedule( [this](){ terminate_inactive_connections_loop(); },
fc::time_point::now() + fc::seconds(GRAPHENE_NET_PEER_HANDSHAKE_INACTIVITY_TIMEOUT / 2),
fc::time_point::now() + fc::seconds(1),
"terminate_inactive_connections_loop" );
}

Expand Down Expand Up @@ -2775,7 +2775,7 @@ namespace graphene { namespace net { namespace detail {
return;
}

auto sync_item_iter = originating_peer->sync_items_requested_from_peer.find(requested_item);
auto sync_item_iter = originating_peer->sync_items_requested_from_peer.find(requested_item.item_hash);
if (sync_item_iter != originating_peer->sync_items_requested_from_peer.end())
{
originating_peer->sync_items_requested_from_peer.erase(sync_item_iter);
Expand All @@ -2785,7 +2785,7 @@ namespace graphene { namespace net { namespace detail {
else
disconnect_from_peer(originating_peer, "You are missing a sync item you claim to have, your database is probably corrupted. Try --rebuild-index.",true,
fc::exception(FC_LOG_MESSAGE(error,"You are missing a sync item you claim to have, your database is probably corrupted. Try --rebuild-index.",
("item_id",requested_item))));
("item_id", requested_item))));
wlog("Peer doesn't have the requested sync item. This really shouldn't happen");
trigger_fetch_sync_items_loop();
return;
Expand Down Expand Up @@ -2963,8 +2963,8 @@ namespace graphene { namespace net { namespace detail {
// received yet, reschedule them to be fetched from another peer
if (!originating_peer->sync_items_requested_from_peer.empty())
{
for (auto sync_item_and_time : originating_peer->sync_items_requested_from_peer)
_active_sync_requests.erase(sync_item_and_time.first.item_hash);
for (auto sync_item : originating_peer->sync_items_requested_from_peer)
_active_sync_requests.erase(sync_item);
trigger_fetch_sync_items_loop();
}

Expand Down Expand Up @@ -3454,11 +3454,11 @@ namespace graphene { namespace net { namespace detail {
else
{
// not during normal operation. see if we requested it during sync
auto sync_item_iter = originating_peer->sync_items_requested_from_peer.find(item_id(graphene::net::block_message_type,
block_message_to_process.block_id));
auto sync_item_iter = originating_peer->sync_items_requested_from_peer.find( block_message_to_process.block_id);
if (sync_item_iter != originating_peer->sync_items_requested_from_peer.end())
{
originating_peer->sync_items_requested_from_peer.erase(sync_item_iter);
originating_peer->last_sync_item_received_time = fc::time_point::now();
_active_sync_requests.erase(block_message_to_process.block_id);
process_block_during_sync(originating_peer, block_message_to_process, message_hash);
if (originating_peer->idle())
Expand Down

0 comments on commit 489c99e

Please sign in to comment.