Skip to content

Commit

Permalink
Validator patch: state download, adnl stats (ton-blockchain#1257)
Browse files Browse the repository at this point in the history
* Persistent state download improvements

1) Don't start over on restart
2) Download shards one at a time to reduce RAM usage
3) More logs

* Remove old peers from adnl stats
  • Loading branch information
SpyCheese authored Oct 9, 2024
1 parent 1da94e6 commit b69214b
Show file tree
Hide file tree
Showing 19 changed files with 135 additions and 76 deletions.
27 changes: 16 additions & 11 deletions adnl/adnl-local-id.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ void AdnlLocalId::update_packet(AdnlPacket packet, bool update_id, bool sign, td
}
}

void AdnlLocalId::get_stats(td::Promise<tl_object_ptr<ton_api::adnl_stats_localId>> promise) {
void AdnlLocalId::get_stats(bool all, td::Promise<tl_object_ptr<ton_api::adnl_stats_localId>> promise) {
auto stats = create_tl_object<ton_api::adnl_stats_localId>();
stats->short_id_ = short_id_.bits256_value();
for (auto &[ip, x] : inbound_rate_limiter_) {
Expand All @@ -317,22 +317,22 @@ void AdnlLocalId::get_stats(td::Promise<tl_object_ptr<ton_api::adnl_stats_localI
}
prepare_packet_stats();
stats->packets_recent_ = packet_stats_prev_.tl();
stats->packets_total_ = packet_stats_total_.tl();
stats->packets_total_ = packet_stats_total_.tl(all);
stats->packets_total_->ts_start_ = (double)Adnl::adnl_start_time();
stats->packets_total_->ts_end_ = td::Clocks::system();
promise.set_result(std::move(stats));
}

// Records one decrypted packet received from `addr`: after calling
// prepare_packet_stats(), the per-address entry is bumped in both
// packet_stats_cur_ and packet_stats_total_.
// NOTE(review): this text is a rendered diff, so two versions of the update are
// shown together. The `++` pair looks like the removed form and the `.inc()`
// pair its replacement (the map value type becomes PacketStats::Counter in
// adnl-local-id.h) - confirm against the commit before treating this as code.
void AdnlLocalId::add_decrypted_packet_stats(td::IPAddress addr) {
prepare_packet_stats();
++packet_stats_cur_.decrypted_packets[addr];
++packet_stats_total_.decrypted_packets[addr];
packet_stats_cur_.decrypted_packets[addr].inc();
packet_stats_total_.decrypted_packets[addr].inc();
}

// Records one dropped packet from `addr`: after calling prepare_packet_stats(),
// the per-address entry is bumped in both packet_stats_cur_ and
// packet_stats_total_.
// NOTE(review): rendered diff - the `++` pair looks like the removed form and
// the `.inc()` pair its replacement (map value type becomes
// PacketStats::Counter in adnl-local-id.h); confirm against the commit.
void AdnlLocalId::add_dropped_packet_stats(td::IPAddress addr) {
prepare_packet_stats();
++packet_stats_cur_.dropped_packets[addr];
++packet_stats_total_.dropped_packets[addr];
packet_stats_cur_.dropped_packets[addr].inc();
packet_stats_total_.dropped_packets[addr].inc();
}

void AdnlLocalId::prepare_packet_stats() {
Expand All @@ -351,17 +351,22 @@ void AdnlLocalId::prepare_packet_stats() {
}
}

// Serializes this PacketStats into an adnl_stats_localIdPackets TL object:
// copies ts_start/ts_end and emits one adnl_stats_ipPackets entry per address
// for decrypted and for dropped packets. Each address is rendered as
// "<ip>:<port>", or as an empty string when ip.is_valid() is false.
//
// In the `tl(bool all)` form, an entry is emitted only when its
// last_packet_ts is >= threshold.
//
// NOTE(review): this text is a rendered diff. The parameterless signature and
// the unconditional push_back calls that pass `packets` directly look like the
// removed version; the `tl(bool all)` signature and the push_back calls guarded
// by `last_packet_ts >= threshold` (passing `packets.packets`) look like the
// added version. Confirm against the commit.
tl_object_ptr<ton_api::adnl_stats_localIdPackets> AdnlLocalId::PacketStats::tl() const {
tl_object_ptr<ton_api::adnl_stats_localIdPackets> AdnlLocalId::PacketStats::tl(bool all) const {
// all == true: threshold is -1.0. Counter::last_packet_ts starts at 0.0, which
// already satisfies `>= -1.0`, so no entry is filtered out.
// all == false: threshold is 600 seconds before td::Clocks::system().
double threshold = all ? -1.0 : td::Clocks::system() - 600.0;
auto obj = create_tl_object<ton_api::adnl_stats_localIdPackets>();
obj->ts_start_ = ts_start;
obj->ts_end_ = ts_end;
for (const auto &[ip, packets] : decrypted_packets) {
obj->decrypted_packets_.push_back(create_tl_object<ton_api::adnl_stats_ipPackets>(
ip.is_valid() ? PSTRING() << ip.get_ip_str() << ":" << ip.get_port() : "", packets));
if (packets.last_packet_ts >= threshold) {
obj->decrypted_packets_.push_back(create_tl_object<ton_api::adnl_stats_ipPackets>(
ip.is_valid() ? PSTRING() << ip.get_ip_str() << ":" << ip.get_port() : "", packets.packets));
}
}
for (const auto &[ip, packets] : dropped_packets) {
obj->dropped_packets_.push_back(create_tl_object<ton_api::adnl_stats_ipPackets>(
ip.is_valid() ? PSTRING() << ip.get_ip_str() << ":" << ip.get_port() : "", packets));
if (packets.last_packet_ts >= threshold) {
obj->dropped_packets_.push_back(create_tl_object<ton_api::adnl_stats_ipPackets>(
ip.is_valid() ? PSTRING() << ip.get_ip_str() << ":" << ip.get_port() : "", packets.packets));
}
}
return obj;
}
Expand Down
18 changes: 14 additions & 4 deletions adnl/adnl-local-id.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class AdnlLocalId : public td::actor::Actor {
void update_packet(AdnlPacket packet, bool update_id, bool sign, td::int32 update_addr_list_if,
td::int32 update_priority_addr_list_if, td::Promise<AdnlPacket> promise);

void get_stats(td::Promise<tl_object_ptr<ton_api::adnl_stats_localId>> promise);
void get_stats(bool all, td::Promise<tl_object_ptr<ton_api::adnl_stats_localId>> promise);

td::uint32 get_mode() {
return mode_;
Expand Down Expand Up @@ -111,10 +111,20 @@ class AdnlLocalId : public td::actor::Actor {
std::map<td::IPAddress, InboundRateLimiter> inbound_rate_limiter_;
// Per-address packet statistics for a time window [ts_start, ts_end].
// Three instances are declared: packet_stats_cur_, packet_stats_prev_ and
// packet_stats_total_.
// NOTE(review): this text is a rendered diff, so the struct shows two versions
// of its members at once. The td::uint64-valued maps and the parameterless
// tl() look like the removed declarations; Counter, the Counter-valued maps and
// tl(bool all = true) look like the added ones. Confirm against the commit.
struct PacketStats {
double ts_start = 0.0, ts_end = 0.0;
std::map<td::IPAddress, td::uint64> decrypted_packets;
std::map<td::IPAddress, td::uint64> dropped_packets;

tl_object_ptr<ton_api::adnl_stats_localIdPackets> tl() const;
// Packet count for one address plus the time of the most recent increment.
struct Counter {
td::uint64 packets = 0;
// 0.0 until the first inc(); afterwards the td::Clocks::system() value taken
// at the latest inc().
double last_packet_ts = 0.0;

// Adds one packet and stamps last_packet_ts with td::Clocks::system().
void inc() {
++packets;
last_packet_ts = td::Clocks::system();
}
};
std::map<td::IPAddress, Counter> decrypted_packets;
std::map<td::IPAddress, Counter> dropped_packets;

// Builds the TL representation. `all` defaults to true, so existing
// parameterless call sites keep compiling.
tl_object_ptr<ton_api::adnl_stats_localIdPackets> tl(bool all = true) const;
} packet_stats_cur_, packet_stats_prev_, packet_stats_total_;
void add_decrypted_packet_stats(td::IPAddress addr);
void add_dropped_packet_stats(td::IPAddress addr);
Expand Down
6 changes: 3 additions & 3 deletions adnl/adnl-peer-table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ void AdnlPeerTableImpl::get_conn_ip_str(AdnlNodeIdShort l_id, AdnlNodeIdShort p_
td::actor::send_closure(it->second, &AdnlPeer::get_conn_ip_str, l_id, std::move(promise));
}

void AdnlPeerTableImpl::get_stats(td::Promise<tl_object_ptr<ton_api::adnl_stats>> promise) {
void AdnlPeerTableImpl::get_stats(bool all, td::Promise<tl_object_ptr<ton_api::adnl_stats>> promise) {
class Cb : public td::actor::Actor {
public:
explicit Cb(td::Promise<tl_object_ptr<ton_api::adnl_stats>> promise) : promise_(std::move(promise)) {
Expand Down Expand Up @@ -440,7 +440,7 @@ void AdnlPeerTableImpl::get_stats(td::Promise<tl_object_ptr<ton_api::adnl_stats>

for (auto &[id, local_id] : local_ids_) {
td::actor::send_closure(callback, &Cb::inc_pending);
td::actor::send_closure(local_id.local_id, &AdnlLocalId::get_stats,
td::actor::send_closure(local_id.local_id, &AdnlLocalId::get_stats, all,
[id = id, callback](td::Result<tl_object_ptr<ton_api::adnl_stats_localId>> R) {
if (R.is_error()) {
VLOG(ADNL_NOTICE)
Expand All @@ -454,7 +454,7 @@ void AdnlPeerTableImpl::get_stats(td::Promise<tl_object_ptr<ton_api::adnl_stats>
for (auto &[id, peer] : peers_) {
td::actor::send_closure(callback, &Cb::inc_pending);
td::actor::send_closure(
peer, &AdnlPeer::get_stats,
peer, &AdnlPeer::get_stats, all,
[id = id, callback](td::Result<std::vector<tl_object_ptr<ton_api::adnl_stats_peerPair>>> R) {
if (R.is_error()) {
VLOG(ADNL_NOTICE) << "failed to get stats for peer " << id << " : " << R.move_as_error();
Expand Down
2 changes: 1 addition & 1 deletion adnl/adnl-peer-table.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class AdnlPeerTableImpl : public AdnlPeerTable {
td::Promise<std::pair<td::actor::ActorOwn<AdnlTunnel>, AdnlAddress>> promise) override;
void get_conn_ip_str(AdnlNodeIdShort l_id, AdnlNodeIdShort p_id, td::Promise<td::string> promise) override;

void get_stats(td::Promise<tl_object_ptr<ton_api::adnl_stats>> promise) override;
void get_stats(bool all, td::Promise<tl_object_ptr<ton_api::adnl_stats>> promise) override;

struct PrintId {};
PrintId print_id() const {
Expand Down
18 changes: 14 additions & 4 deletions adnl/adnl-peer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -808,7 +808,15 @@ void AdnlPeerPairImpl::get_conn_ip_str(td::Promise<td::string> promise) {
promise.set_value("undefined");
}

void AdnlPeerPairImpl::get_stats(td::Promise<tl_object_ptr<ton_api::adnl_stats_peerPair>> promise) {
void AdnlPeerPairImpl::get_stats(bool all, td::Promise<tl_object_ptr<ton_api::adnl_stats_peerPair>> promise) {
if (!all) {
double threshold = td::Clocks::system() - 600.0;
if (last_in_packet_ts_ < threshold && last_out_packet_ts_ < threshold) {
promise.set_value(nullptr);
return;
}
}

auto stats = create_tl_object<ton_api::adnl_stats_peerPair>();
stats->local_id_ = local_id_.bits256_value();
stats->peer_id_ = peer_id_short_.bits256_value();
Expand Down Expand Up @@ -993,15 +1001,17 @@ void AdnlPeerImpl::update_addr_list(AdnlNodeIdShort local_id, td::uint32 local_m
td::actor::send_closure(it->second, &AdnlPeerPair::update_addr_list, std::move(addr_list));
}

void AdnlPeerImpl::get_stats(td::Promise<std::vector<tl_object_ptr<ton_api::adnl_stats_peerPair>>> promise) {
void AdnlPeerImpl::get_stats(bool all, td::Promise<std::vector<tl_object_ptr<ton_api::adnl_stats_peerPair>>> promise) {
class Cb : public td::actor::Actor {
public:
explicit Cb(td::Promise<std::vector<tl_object_ptr<ton_api::adnl_stats_peerPair>>> promise)
: promise_(std::move(promise)) {
}

void got_peer_pair_stats(tl_object_ptr<ton_api::adnl_stats_peerPair> peer_pair) {
result_.push_back(std::move(peer_pair));
if (peer_pair) {
result_.push_back(std::move(peer_pair));
}
dec_pending();
}

Expand All @@ -1027,7 +1037,7 @@ void AdnlPeerImpl::get_stats(td::Promise<std::vector<tl_object_ptr<ton_api::adnl

for (auto &[local_id, peer_pair] : peer_pairs_) {
td::actor::send_closure(callback, &Cb::inc_pending);
td::actor::send_closure(peer_pair, &AdnlPeerPair::get_stats,
td::actor::send_closure(peer_pair, &AdnlPeerPair::get_stats, all,
[local_id = local_id, peer_id = peer_id_short_,
callback](td::Result<tl_object_ptr<ton_api::adnl_stats_peerPair>> R) {
if (R.is_error()) {
Expand Down
4 changes: 2 additions & 2 deletions adnl/adnl-peer.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class AdnlPeerPair : public td::actor::Actor {
virtual void update_peer_id(AdnlNodeIdFull id) = 0;
virtual void update_addr_list(AdnlAddressList addr_list) = 0;
virtual void get_conn_ip_str(td::Promise<td::string> promise) = 0;
virtual void get_stats(td::Promise<tl_object_ptr<ton_api::adnl_stats_peerPair>> promise) = 0;
virtual void get_stats(bool all, td::Promise<tl_object_ptr<ton_api::adnl_stats_peerPair>> promise) = 0;

static td::actor::ActorOwn<AdnlPeerPair> create(td::actor::ActorId<AdnlNetworkManager> network_manager,
td::actor::ActorId<AdnlPeerTable> peer_table, td::uint32 local_mode,
Expand Down Expand Up @@ -101,7 +101,7 @@ class AdnlPeer : public td::actor::Actor {
td::actor::ActorId<AdnlLocalId> local_actor, AdnlAddressList addr_list) = 0;
virtual void update_dht_node(td::actor::ActorId<dht::Dht> dht_node) = 0;
virtual void get_conn_ip_str(AdnlNodeIdShort l_id, td::Promise<td::string> promise) = 0;
virtual void get_stats(td::Promise<std::vector<tl_object_ptr<ton_api::adnl_stats_peerPair>>> promise) = 0;
virtual void get_stats(bool all, td::Promise<std::vector<tl_object_ptr<ton_api::adnl_stats_peerPair>>> promise) = 0;
};

} // namespace adnl
Expand Down
4 changes: 2 additions & 2 deletions adnl/adnl-peer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class AdnlPeerPairImpl : public AdnlPeerPair {
void update_peer_id(AdnlNodeIdFull id) override;

void get_conn_ip_str(td::Promise<td::string> promise) override;
void get_stats(td::Promise<tl_object_ptr<ton_api::adnl_stats_peerPair>> promise) override;
void get_stats(bool all, td::Promise<tl_object_ptr<ton_api::adnl_stats_peerPair>> promise) override;

void got_data_from_db(td::Result<AdnlDbItem> R);
void got_data_from_static_nodes(td::Result<AdnlNode> R);
Expand Down Expand Up @@ -302,7 +302,7 @@ class AdnlPeerImpl : public AdnlPeer {
AdnlAddressList addr_list) override;
void update_dht_node(td::actor::ActorId<dht::Dht> dht_node) override;
void get_conn_ip_str(AdnlNodeIdShort l_id, td::Promise<td::string> promise) override;
void get_stats(td::Promise<std::vector<tl_object_ptr<ton_api::adnl_stats_peerPair>>> promise) override;
void get_stats(bool all, td::Promise<std::vector<tl_object_ptr<ton_api::adnl_stats_peerPair>>> promise) override;
//void check_signature(td::BufferSlice data, td::BufferSlice signature, td::Promise<td::Unit> promise) override;

AdnlPeerImpl(td::actor::ActorId<AdnlNetworkManager> network_manager, td::actor::ActorId<AdnlPeerTable> peer_table,
Expand Down
2 changes: 1 addition & 1 deletion adnl/adnl.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class Adnl : public AdnlSenderInterface {
virtual void create_tunnel(AdnlNodeIdShort dst, td::uint32 size,
td::Promise<std::pair<td::actor::ActorOwn<AdnlTunnel>, AdnlAddress>> promise) = 0;

virtual void get_stats(td::Promise<tl_object_ptr<ton_api::adnl_stats>> promise) = 0;
virtual void get_stats(bool all, td::Promise<tl_object_ptr<ton_api::adnl_stats>> promise) = 0;

static td::actor::ActorOwn<Adnl> create(std::string db, td::actor::ActorId<keyring::Keyring> keyring);

Expand Down
2 changes: 1 addition & 1 deletion tl/generate/scheme/ton_api.tl
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,7 @@ engine.validator.setStateSerializerEnabled enabled:Bool = engine.validator.Succe
engine.validator.setCollatorOptionsJson json:string = engine.validator.Success;
engine.validator.getCollatorOptionsJson = engine.validator.JsonConfig;

engine.validator.getAdnlStats = adnl.Stats;
engine.validator.getAdnlStats all:Bool = adnl.Stats;
engine.validator.getActorTextStats = engine.validator.TextStats;

---types---
Expand Down
Binary file modified tl/generate/scheme/ton_api.tlo
Binary file not shown.
20 changes: 18 additions & 2 deletions validator-engine-console/validator-engine-console-query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1306,13 +1306,21 @@ td::Status GetCollatorOptionsJsonQuery::receive(td::BufferSlice data) {

// Parses the arguments of the query: a mandatory file name token stored in
// file_name_, then an optional second token.
// The only accepted second token is the literal "all", which sets all_ = true;
// any other token yields an "unexpected token" error status.
// Finally tokenizer_.check_endl() must succeed for the function to return OK.
td::Status GetAdnlStatsJsonQuery::run() {
TRY_RESULT_ASSIGN(file_name_, tokenizer_.get_token<std::string>());
if (!tokenizer_.endl()) {
TRY_RESULT(s, tokenizer_.get_token<std::string>());
if (s == "all") {
all_ = true;
} else {
return td::Status::Error(PSTRING() << "unexpected token " << s);
}
}
TRY_STATUS(tokenizer_.check_endl());
return td::Status::OK();
}

// Serializes an engine_validator_getAdnlStats request and hands it to the
// console actor via ValidatorEngineConsole::envelope_send_query, together with
// the promise from create_promise(). Always returns OK.
// NOTE(review): rendered diff - the two lines following `auto b =` are
// alternative right-hand sides. The one without arguments looks like the
// removed form and the one passing all_ its replacement (the TL scheme line
// gains an `all:Bool` field); confirm against the commit.
td::Status GetAdnlStatsJsonQuery::send() {
auto b =
ton::create_serialize_tl_object<ton::ton_api::engine_validator_getAdnlStats>();
ton::create_serialize_tl_object<ton::ton_api::engine_validator_getAdnlStats>(all_);
td::actor::send_closure(console_, &ValidatorEngineConsole::envelope_send_query, std::move(b), create_promise());
return td::Status::OK();
}
Expand All @@ -1327,13 +1335,21 @@ td::Status GetAdnlStatsJsonQuery::receive(td::BufferSlice data) {
}

// Parses the arguments of the query: at most one optional token.
// The only accepted token is the literal "all", which sets all_ = true; any
// other token yields an "unexpected token" error status.
// Finally tokenizer_.check_endl() must succeed for the function to return OK.
td::Status GetAdnlStatsQuery::run() {
if (!tokenizer_.endl()) {
TRY_RESULT(s, tokenizer_.get_token<std::string>());
if (s == "all") {
all_ = true;
} else {
return td::Status::Error(PSTRING() << "unexpected token " << s);
}
}
TRY_STATUS(tokenizer_.check_endl());
return td::Status::OK();
}

// Serializes an engine_validator_getAdnlStats request and hands it to the
// console actor via ValidatorEngineConsole::envelope_send_query, together with
// the promise from create_promise(). Always returns OK.
// NOTE(review): rendered diff - the two lines following `auto b =` are
// alternative right-hand sides. The one without arguments looks like the
// removed form and the one passing all_ its replacement; confirm against the
// commit.
td::Status GetAdnlStatsQuery::send() {
auto b =
ton::create_serialize_tl_object<ton::ton_api::engine_validator_getAdnlStats>();
ton::create_serialize_tl_object<ton::ton_api::engine_validator_getAdnlStats>(all_);
td::actor::send_closure(console_, &ValidatorEngineConsole::envelope_send_query, std::move(b), create_promise());
return td::Status::OK();
}
Expand Down
8 changes: 6 additions & 2 deletions validator-engine-console/validator-engine-console-query.h
Original file line number Diff line number Diff line change
Expand Up @@ -1327,14 +1327,16 @@ class GetAdnlStatsJsonQuery : public Query {
return "getadnlstatsjson";
}
static std::string get_help() {
return "getadnlstatsjson <filename>\tsave adnl stats to <filename>";
return "getadnlstatsjson <filename> [all]\tsave adnl stats to <filename>. all - returns all peers (default - only "
"peers with traffic in the last 10 minutes)";
}
std::string name() const override {
return get_name();
}

private:
std::string file_name_;
bool all_ = false;
};

class GetAdnlStatsQuery : public Query {
Expand All @@ -1349,12 +1351,14 @@ class GetAdnlStatsQuery : public Query {
return "getadnlstats";
}
static std::string get_help() {
return "getadnlstats\tdisplay adnl stats";
return "getadnlstats [all]\tdisplay adnl stats. all - returns all peers (default - only peers with traffic in the "
"last 10 minutes)";
}
std::string name() const override {
return get_name();
}

private:
std::string file_name_;
bool all_ = false;
};
2 changes: 1 addition & 1 deletion validator-engine/validator-engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3913,7 +3913,7 @@ void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_getAdnlSt
return;
}
td::actor::send_closure(
adnl_, &ton::adnl::Adnl::get_stats,
adnl_, &ton::adnl::Adnl::get_stats, query.all_,
[promise = std::move(promise)](td::Result<ton::tl_object_ptr<ton::ton_api::adnl_stats>> R) mutable {
if (R.is_ok()) {
promise.set_value(ton::serialize_tl_object(R.move_as_ok(), true));
Expand Down
14 changes: 13 additions & 1 deletion validator/downloaders/download-state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,16 @@ void DownloadShardState::start_up() {
// Stores the block handle and decides whether a download is needed.
// If handle_->received_state() is true, a warning is logged and the state is
// requested from the manager via get_shard_state_from_db; the result is
// forwarded to written_shard_state. Otherwise download_state() is called.
// NOTE(review): rendered diff - the bare `download_state();` directly after the
// handle assignment looks like the removed unconditional call, and the
// if/else below it looks like its replacement; confirm against the commit.
void DownloadShardState::got_block_handle(BlockHandle handle) {
handle_ = std::move(handle);

download_state();
if (handle_->received_state()) {
LOG(WARNING) << "shard state " << block_id_.to_str() << " already stored in db";
td::actor::send_closure(manager_, &ValidatorManagerInterface::get_shard_state_from_db, handle_,
[SelfId = actor_id(this)](td::Result<td::Ref<ShardState>> R) {
// NOTE(review): no error branch here - R.ensure() presumably treats
// a failed db read as fatal; verify that is intended.
R.ensure();
td::actor::send_closure(SelfId, &DownloadShardState::written_shard_state, R.move_as_ok());
});
} else {
download_state();
}
}

void DownloadShardState::retry() {
Expand Down Expand Up @@ -165,6 +174,7 @@ void DownloadShardState::downloaded_shard_state(td::BufferSlice data) {
}

void DownloadShardState::checked_shard_state() {
LOG(WARNING) << "checked shard state " << block_id_.to_str();
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::Unit> R) {
R.ensure();
td::actor::send_closure(SelfId, &DownloadShardState::written_shard_state_file);
Expand All @@ -179,6 +189,7 @@ void DownloadShardState::checked_shard_state() {
}

void DownloadShardState::written_shard_state_file() {
LOG(WARNING) << "written shard state file " << block_id_.to_str();
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::Ref<ShardState>> R) {
R.ensure();
td::actor::send_closure(SelfId, &DownloadShardState::written_shard_state, R.move_as_ok());
Expand Down Expand Up @@ -207,6 +218,7 @@ void DownloadShardState::written_shard_state(td::Ref<ShardState> state) {
}

// Final step shown for this downloader: logs (at WARNING level) that the shard
// state for block_id_ has been downloaded and stored, then calls
// finish_query().
void DownloadShardState::written_block_handle() {
LOG(WARNING) << "finished downloading and storing shard state " << block_id_.to_str();
finish_query();
}

Expand Down
2 changes: 1 addition & 1 deletion validator/manager-init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ void ValidatorManagerMasterchainReiniter::choose_masterchain_state() {
}
if (!p || ValidatorManager::is_persistent_state(h->unix_time(), p->unix_time())) {
auto ttl = ValidatorManager::persistent_state_ttl(h->unix_time());
double time_to_download = 3600 * 3;
double time_to_download = 3600 * 8;
if (ttl > td::Clocks::system() + time_to_download) {
handle = h;
break;
Expand Down
Loading

0 comments on commit b69214b

Please sign in to comment.