forked from ton-blockchain/ton
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcatchain-receiver.hpp
240 lines (189 loc) · 9.44 KB
/
catchain-receiver.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
/*
This file is part of TON Blockchain Library.
TON Blockchain Library is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
TON Blockchain Library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with TON Blockchain Library. If not, see <http://www.gnu.org/licenses/>.
Copyright 2017-2019 Telegram Systems LLP
*/
#pragma once
#include <list>
#include <queue>
#include <map>
#include "catchain-types.h"
#include "catchain-receiver.h"
#include "catchain-receiver-source.h"
#include "catchain-received-block.h"
#include "td/db/KeyValueAsync.h"
namespace ton {
namespace catchain {
/**
 * Concrete implementation of the CatChainReceiver interface.
 *
 * This header declares the receiver's state (sources, known blocks, overlay / ADNL / keyring handles and the
 * block database) together with its entry points for overlay messages, queries and broadcasts. Apart from the
 * small inline accessors below, member functions are only declared here and are defined out of line.
 */
class CatChainReceiverImpl : public CatChainReceiver {
 public:
  // Builds the identifier used when printing this receiver: the incarnation plus the local id.
  PrintId print_id() const override {
    return PrintId{incarnation_, local_id_};
  }

  // Adds a block carrying `data` with an empty dependency list.
  void add_prepared_event(td::BufferSlice data) override {
    add_block(std::move(data), std::vector<CatChainBlockHash>());
  }

  CatChainSessionId get_incarnation() const override {
    return incarnation_;
  }

  void run_block(CatChainReceivedBlock *block) override;

  // Current value of the fork counter.
  td::uint32 get_forks_cnt() const override {
    return total_forks_;
  }

  // Number of registered sources.
  td::uint32 get_sources_cnt() const override {
    return static_cast<td::uint32>(sources_.size());
  }

  // Returns the source with index `source_id`, or nullptr when the index is out of range.
  // The returned pointer is non-owning; the object is owned by `sources_`.
  CatChainReceiverSource *get_source(td::uint32 source_id) const override {
    if (source_id >= get_sources_cnt()) {
      return nullptr;
    }
    return sources_[source_id].get();
  }

  PublicKeyHash get_source_hash(td::uint32 source_id) const override;
  CatChainReceiverSource *get_source_by_hash(PublicKeyHash source_hash) const;
  CatChainReceiverSource *get_source_by_adnl_id(adnl::AdnlNodeIdShort source_hash) const;

  td::uint32 add_fork() override;

  void deliver_block(CatChainReceivedBlock *block) override;

  // Entry points invoked (via send_closure) by the overlay callback created in make_callback().
  void receive_message_from_overlay(adnl::AdnlNodeIdShort src, td::BufferSlice data);
  void receive_query_from_overlay(adnl::AdnlNodeIdShort src, td::BufferSlice data,
                                  td::Promise<td::BufferSlice> promise);

  // Handlers for the catchain queries that have a dedicated overload.
  void process_query(adnl::AdnlNodeIdShort src, ton_api::catchain_getBlock &query,
                     td::Promise<td::BufferSlice> promise);
  void process_query(adnl::AdnlNodeIdShort src, ton_api::catchain_getBlocks &query,
                     td::Promise<td::BufferSlice> promise);
  void process_query(adnl::AdnlNodeIdShort src, ton_api::catchain_getBlockHistory &query,
                     td::Promise<td::BufferSlice> promise);
  void process_query(adnl::AdnlNodeIdShort src, ton_api::catchain_getDifference &query,
                     td::Promise<td::BufferSlice> promise);

  // Fallback for every other query type: the query is re-serialized and handed to the callback as a custom query.
  // If `src` does not resolve to a source the promise is failed instead of dereferencing a null pointer
  // (get_source_by_adnl_id() is defined elsewhere; its pointer result is treated as nullable here).
  template <class T>
  void process_query(adnl::AdnlNodeIdShort src, T &query, td::Promise<td::BufferSlice> promise) {
    //LOG(WARNING) << this << ": unknown query from " << src;
    auto *source = get_source_by_adnl_id(src);
    if (source == nullptr) {
      promise.set_error(td::Status::Error("catchain query from unknown source"));
      return;
    }
    callback_->on_custom_query(source->get_hash(), serialize_tl_object(&query, true), std::move(promise));
  }

  void receive_broadcast_from_overlay(PublicKeyHash src, td::BufferSlice data);

  void receive_block(adnl::AdnlNodeIdShort src, tl_object_ptr<ton_api::catchain_block> block, td::BufferSlice payload);
  void receive_block_answer(adnl::AdnlNodeIdShort src, td::BufferSlice);
  //void send_block(PublicKeyHash src, tl_object_ptr<ton_api::catchain_block> block, td::BufferSlice payload);

  CatChainReceivedBlock *create_block(tl_object_ptr<ton_api::catchain_block> block, td::SharedSlice payload) override;
  CatChainReceivedBlock *create_block(tl_object_ptr<ton_api::catchain_block_dep> block) override;

  td::Status validate_block_sync(tl_object_ptr<ton_api::catchain_block_dep> &dep) override;
  td::Status validate_block_sync(tl_object_ptr<ton_api::catchain_block> &block, td::Slice payload) override;

  void send_fec_broadcast(td::BufferSlice data) override;
  void send_custom_query_data(PublicKeyHash dst, std::string name, td::Promise<td::BufferSlice> promise,
                              td::Timestamp timeout, td::BufferSlice query) override;
  void send_custom_query_data_via(PublicKeyHash dst, std::string name, td::Promise<td::BufferSlice> promise,
                                  td::Timestamp timeout, td::BufferSlice query, td::uint64 max_answer_size,
                                  td::actor::ActorId<adnl::AdnlSenderInterface> via) override;
  void send_custom_message_data(PublicKeyHash dst, td::BufferSlice query) override;

  void run_scheduler();

  // Block creation is declared as a chain of continuation steps (add_block -> add_block_cont -> _2 -> _3);
  // the steps themselves are defined out of line.
  void add_block(td::BufferSlice data, std::vector<CatChainBlockHash> deps) override;
  void add_block_cont(tl_object_ptr<ton_api::catchain_block> block, td::BufferSlice payload);
  void add_block_cont_2(tl_object_ptr<ton_api::catchain_block> block, td::BufferSlice payload);
  void add_block_cont_3(tl_object_ptr<ton_api::catchain_block> block, td::BufferSlice payload);

  void debug_add_fork(td::BufferSlice payload, CatChainBlockHeight height,
                      std::vector<CatChainBlockHash> deps) override;
  void debug_add_fork_cont(tl_object_ptr<ton_api::catchain_block> block, td::BufferSlice payload);

  // Forwards the blame for source index `src` to the callback.
  void on_blame(td::uint32 src) override {
    callback_->blame(src);
  }

  // Intentionally a no-op in this implementation.
  void blame_node(td::uint32 idx) override {
  }

  const CatChainOptions &opts() const override {
    return opts_;
  }

  void got_fork_proof(td::BufferSlice data);
  void synchronize_with(CatChainReceiverSource *source);

  void alarm() override;
  void start_up() override;
  void tear_down() override;

  void read_db();
  void read_db_from(CatChainBlockHash id);
  void read_block_from_db(CatChainBlockHash id, td::BufferSlice data);
  void block_written_to_db(CatChainBlockHash hash);

  void destroy() override;

  CatChainReceivedBlock *get_block(CatChainBlockHash hash) const;

  CatChainReceiverImpl(std::unique_ptr<Callback> callback, CatChainOptions opts,
                       td::actor::ActorId<keyring::Keyring> keyring, td::actor::ActorId<adnl::Adnl> adnl,
                       td::actor::ActorId<overlay::Overlays>, std::vector<CatChainNode> ids, PublicKeyHash local_id,
                       CatChainBlockHash unique_hash, std::string db_root);

 private:
  // Creates the overlay callback. Each overlay event is forwarded to this actor with send_closure;
  // the overlay id argument is not forwarded.
  std::unique_ptr<overlay::Overlays::Callback> make_callback() {
    class Callback : public overlay::Overlays::Callback {
     public:
      void receive_message(adnl::AdnlNodeIdShort src, overlay::OverlayIdShort overlay_id,
                           td::BufferSlice data) override {
        td::actor::send_closure(id_, &CatChainReceiverImpl::receive_message_from_overlay, src, std::move(data));
      }
      void receive_query(adnl::AdnlNodeIdShort src, overlay::OverlayIdShort overlay_id, td::BufferSlice data,
                         td::Promise<td::BufferSlice> promise) override {
        td::actor::send_closure(id_, &CatChainReceiverImpl::receive_query_from_overlay, src, std::move(data),
                                std::move(promise));
      }
      void receive_broadcast(PublicKeyHash src, overlay::OverlayIdShort overlay_id, td::BufferSlice data) override {
        td::actor::send_closure(id_, &CatChainReceiverImpl::receive_broadcast_from_overlay, src, std::move(data));
      }
      // explicit: an actor id must not silently convert into a callback object.
      explicit Callback(td::actor::ActorId<CatChainReceiverImpl> id) : id_(std::move(id)) {
      }

     private:
      td::actor::ActorId<CatChainReceiverImpl> id_;
    };

    return std::make_unique<Callback>(actor_id(this));
  }

  // A payload together with the dependency hashes it was submitted with.
  struct PendingBlock {
    td::BufferSlice payload_;
    std::vector<CatChainBlockHash> deps_;

    PendingBlock(td::BufferSlice &&payload, std::vector<CatChainBlockHash> &&deps)
        : payload_(std::move(payload)), deps_(std::move(deps)) {
    }
  };

  std::list<std::unique_ptr<PendingBlock>> pending_blocks_;
  bool active_send_ = false;
  bool read_db_ = false;
  td::uint32 pending_in_db_ = 0;
  CatChainBlockHash db_root_block_ = CatChainBlockHash::zero();

  void choose_neighbours();

  // Sources are owned here; the two maps translate a key hash / ADNL id into a td::uint32
  // (presumably the index into sources_ -- confirm against the constructor).
  std::vector<std::unique_ptr<CatChainReceiverSource>> sources_;
  std::map<PublicKeyHash, td::uint32> sources_hashes_;
  std::map<adnl::AdnlNodeIdShort, td::uint32> sources_adnl_addrs_;
  td::uint32 total_forks_ = 0;

  // Blocks are owned by blocks_. The raw pointers below are non-owning and start out null so that
  // they never hold an indeterminate value before the out-of-line code assigns them.
  std::map<CatChainBlockHash, std::unique_ptr<CatChainReceivedBlock>> blocks_;
  CatChainReceivedBlock *root_block_ = nullptr;
  CatChainReceivedBlock *last_sent_block_ = nullptr;

  CatChainSessionId incarnation_;

  std::unique_ptr<Callback> callback_;
  CatChainOptions opts_;

  std::vector<td::uint32> neighbours_;

  //std::queue<tl_object_ptr<ton_api::catchain_block_inner_Data>> events_;
  //std::queue<td::BufferSlice> raw_events_;

  td::actor::ActorId<keyring::Keyring> keyring_;
  td::actor::ActorId<adnl::Adnl> adnl_;
  td::actor::ActorId<overlay::Overlays> overlay_manager_;
  overlay::OverlayIdShort overlay_id_;
  overlay::OverlayIdFull overlay_full_id_;
  PublicKeyHash local_id_;
  // Zero-initialized for determinism; the real value is expected to be assigned by the constructor.
  td::uint32 local_idx_ = 0;

  td::Timestamp next_sync_;
  td::Timestamp next_rotate_;

  std::string db_root_;

  // Asynchronous key-value store mapping block hash -> serialized data.
  using DbType = td::KeyValueAsync<CatChainBlockHash, td::BufferSlice>;
  DbType db_;

  bool intentional_fork_ = false;

  std::list<CatChainReceivedBlock *> to_run_;
};
} // namespace catchain
} // namespace ton
namespace td {
// Streams a receiver (given by pointer) into a StringBuilder by writing its print id.
// The pointer is dereferenced unconditionally, so it must not be null.
inline td::StringBuilder &operator<<(td::StringBuilder &sb, const ton::catchain::CatChainReceiverImpl *catchain) {
  const auto id = catchain->print_id();
  sb << id;
  return sb;
}
} // namespace td