// manager-disk.hpp (forked from ton-blockchain/ton)
/*
This file is part of TON Blockchain Library.
TON Blockchain Library is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
TON Blockchain Library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with TON Blockchain Library. If not, see <http://www.gnu.org/licenses/>.
Copyright 2017-2020 Telegram Systems LLP
*/
#pragma once
#include "interfaces/validator-manager.h"
#include "interfaces/db.h"
#include "validator-group.hpp"
#include "manager-init.h"
#include "manager-disk.h"
#include "queue-size-counter.hpp"
#include <map>
#include <set>
namespace ton {
namespace validator {
// Forward declarations. Within this header WaitBlockState and WaitBlockDataDisk are used only as
// td::actor::ActorId<> template arguments; WaitZeroState and WaitShardState are not referenced again here.
class WaitBlockState;
class WaitZeroState;
class WaitShardState;
class WaitBlockDataDisk;
/**
 * ValidatorManager implementation declared by manager-disk.hpp.
 *
 * Only part of the ValidatorManager interface has an out-of-line implementation; the remaining overrides are
 * defined inline below and are either marked UNREACHABLE(), do nothing, or answer their promise with a fixed value.
 * Members that are only declared here are defined elsewhere (presumably manager-disk.cpp -- TODO confirm).
 *
 * The constructor records a database root path together with a shard id and a block id (shard_to_generate_,
 * block_to_generate_); how those are consumed is not visible from this header.
 */
class ValidatorManagerImpl : public ValidatorManager {
 private:
  // External and IHR messages currently held by the manager.
  std::vector<td::Ref<ExtMessage>> ext_messages_;
  std::vector<td::Ref<IhrMessage>> ihr_messages_;

  // Comparator for the sets below: orders descriptions by comparing their block_id() with operator<.
  struct Compare {
    bool operator()(const td::Ref<ShardTopBlockDescription> &l, const td::Ref<ShardTopBlockDescription> &r) const {
      return l->block_id() < r->block_id();
    }
  };
  std::set<td::Ref<ShardTopBlockDescription>, Compare> shard_blocks_, out_shard_blocks_;
  // NOTE(review): presumably the serialized form of received shard block descriptions -- confirm in the .cpp.
  std::vector<td::BufferSlice> shard_blocks_raw_;

  // Per-block list of (timestamp, promise) pairs waiting for a ShardState, plus the id of a WaitBlockState actor.
  struct WaitBlockStateList {
    std::vector<std::pair<td::Timestamp, td::Promise<td::Ref<ShardState>>>> waiting_;
    td::actor::ActorId<WaitBlockState> actor_;
  };
  std::map<BlockIdExt, WaitBlockStateList> wait_state_;

  // Per-block list of (timestamp, promise) pairs waiting for BlockData, plus the id of a WaitBlockDataDisk actor.
  struct WaitBlockDataList {
    std::vector<std::pair<td::Timestamp, td::Promise<td::Ref<BlockData>>>> waiting_;
    td::actor::ActorId<WaitBlockDataDisk> actor_;
  };
  std::map<BlockIdExt, WaitBlockDataList> wait_block_data_;

  // Non-owning (weak) references to block handles, keyed by block id.
  std::map<BlockIdExt, std::weak_ptr<BlockHandleInterface>> handles_;

  std::unique_ptr<Callback> callback_;  // replaced by install_callback()
  td::actor::ActorOwn<Db> db_;          // database actor owned by this manager

  BlockSeqno last_masterchain_seqno_ = 0;
  bool started_ = false;
  td::Ref<MasterchainState> last_masterchain_state_;
  //BlockHandle last_masterchain_block_;

 public:
  // Takes ownership of `new_callback` (replacing any previous one) and completes `promise` immediately.
  void install_callback(std::unique_ptr<Callback> new_callback, td::Promise<td::Unit> promise) override {
    callback_ = std::move(new_callback);
    promise.set_value(td::Unit());
  }

  // Key management is not provided by this implementation: all four overrides are marked UNREACHABLE().
  void add_permanent_key(PublicKeyHash key, td::Promise<td::Unit> promise) override {
    UNREACHABLE();
  }
  void add_temp_key(PublicKeyHash key, td::Promise<td::Unit> promise) override {
    UNREACHABLE();
  }
  void del_permanent_key(PublicKeyHash key, td::Promise<td::Unit> promise) override {
    UNREACHABLE();
  }
  void del_temp_key(PublicKeyHash key, td::Promise<td::Unit> promise) override {
    UNREACHABLE();
  }

  // Block / proof validation entry points. Everything without an inline body is defined out of line.
  void validate_block_is_next_proof(BlockIdExt prev_block_id, BlockIdExt next_block_id, td::BufferSlice proof,
                                    td::Promise<td::Unit> promise) override;
  void validate_block_proof(BlockIdExt block_id, td::BufferSlice proof, td::Promise<td::Unit> promise) override;
  void validate_block_proof_link(BlockIdExt block_id, td::BufferSlice proof, td::Promise<td::Unit> promise) override;
  void validate_block_proof_rel(BlockIdExt block_id, BlockIdExt rel_block_id, td::BufferSlice proof,
                                td::Promise<td::Unit> promise) override {
    UNREACHABLE();
  }
  void validate_block(ReceivedBlock block, td::Promise<BlockHandle> promise) override;
  void prevalidate_block(BlockBroadcast broadcast, td::Promise<td::Unit> promise) override;

  //void create_validate_block(BlockId block, td::BufferSlice data, td::Promise<Block> promise) = 0;
  void sync_complete(td::Promise<td::Unit> promise) override;

  // Queries for blocks, states and proofs.
  void get_next_block(BlockIdExt block_id, td::Promise<BlockHandle> promise) override;
  void get_next_key_blocks(BlockIdExt block_id, td::uint32 cnt, td::Promise<std::vector<BlockIdExt>> promise) override {
    UNREACHABLE();
  }
  void get_block_data(BlockHandle handle, td::Promise<td::BufferSlice> promise) override;
  void check_zero_state_exists(BlockIdExt block_id, td::Promise<bool> promise) override;
  void get_zero_state(BlockIdExt block_id, td::Promise<td::BufferSlice> promise) override;
  void check_persistent_state_exists(BlockIdExt block_id, BlockIdExt masterchain_block_id,
                                     td::Promise<bool> promise) override;
  void get_persistent_state(BlockIdExt block_id, BlockIdExt masterchain_block_id,
                            td::Promise<td::BufferSlice> promise) override;
  void get_persistent_state_slice(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::int64 offset,
                                  td::int64 max_length, td::Promise<td::BufferSlice> promise) override {
    UNREACHABLE();
  }
  void get_block_proof(BlockHandle handle, td::Promise<td::BufferSlice> promise) override;
  void get_block_proof_link(BlockHandle block_id, td::Promise<td::BufferSlice> promise) override {
    UNREACHABLE();
  }
  void get_key_block_proof(BlockIdExt block_id, td::Promise<td::BufferSlice> promise) override;
  void get_key_block_proof_link(BlockIdExt block_id, td::Promise<td::BufferSlice> promise) override;

  //void get_block_description(BlockIdExt block_id, td::Promise<BlockDescription> promise) override;

  // Incoming messages and shard block descriptions.
  void new_external_message(td::BufferSlice data) override;
  void check_external_message(td::BufferSlice data, td::Promise<td::Ref<ExtMessage>> promise) override {
    UNREACHABLE();
  }
  void new_ihr_message(td::BufferSlice data) override;
  void new_shard_block(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) override;

  void add_ext_server_id(adnl::AdnlNodeIdShort id) override {
    UNREACHABLE();
  }
  void add_ext_server_port(td::uint16 port) override {
    UNREACHABLE();
  }

  // Storage of, and waiting for, per-block artefacts (state, data, proofs, signatures, candidates).
  void get_block_handle(BlockIdExt id, bool force, td::Promise<BlockHandle> promise) override;

  void set_block_state(BlockHandle handle, td::Ref<ShardState> state,
                       td::Promise<td::Ref<ShardState>> promise) override;
  void get_cell_db_reader(td::Promise<std::shared_ptr<vm::CellDbReader>> promise) override;
  void store_persistent_state_file(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::BufferSlice state,
                                   td::Promise<td::Unit> promise) override;
  void store_persistent_state_file_gen(BlockIdExt block_id, BlockIdExt masterchain_block_id,
                                       std::function<td::Status(td::FileFd&)> write_data,
                                       td::Promise<td::Unit> promise) override;
  void store_zero_state_file(BlockIdExt block_id, td::BufferSlice state, td::Promise<td::Unit> promise) override;
  void wait_block_state(BlockHandle handle, td::uint32 priority, td::Timestamp timeout,
                        td::Promise<td::Ref<ShardState>> promise) override;
  void wait_block_state_short(BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout,
                              td::Promise<td::Ref<ShardState>> promise) override;

  void set_block_data(BlockHandle handle, td::Ref<BlockData> data, td::Promise<td::Unit> promise) override;
  void wait_block_data(BlockHandle handle, td::uint32 priority, td::Timestamp,
                       td::Promise<td::Ref<BlockData>> promise) override;
  void wait_block_data_short(BlockIdExt block_id, td::uint32 priority, td::Timestamp,
                             td::Promise<td::Ref<BlockData>> promise) override;

  void set_block_proof(BlockHandle handle, td::Ref<Proof> proof, td::Promise<td::Unit> promise) override;
  void wait_block_proof(BlockHandle handle, td::Timestamp timeout, td::Promise<td::Ref<Proof>> promise) override;
  void wait_block_proof_short(BlockIdExt id, td::Timestamp timeout, td::Promise<td::Ref<Proof>> promise) override;

  void set_block_proof_link(BlockHandle handle, td::Ref<ProofLink> proof, td::Promise<td::Unit> promise) override;
  void wait_block_proof_link(BlockHandle handle, td::Timestamp timeout,
                             td::Promise<td::Ref<ProofLink>> promise) override;
  void wait_block_proof_link_short(BlockIdExt id, td::Timestamp timeout,
                                   td::Promise<td::Ref<ProofLink>> promise) override;

  void set_block_signatures(BlockHandle handle, td::Ref<BlockSignatureSet> signatures,
                            td::Promise<td::Unit> promise) override;
  void wait_block_signatures(BlockHandle handle, td::Timestamp timeout,
                             td::Promise<td::Ref<BlockSignatureSet>> promise) override;
  void wait_block_signatures_short(BlockIdExt id, td::Timestamp timeout,
                                   td::Promise<td::Ref<BlockSignatureSet>> promise) override;

  void set_block_candidate(BlockIdExt id, BlockCandidate candidate, td::Promise<td::Unit> promise) override;

  void wait_block_state_merge(BlockIdExt left_id, BlockIdExt right_id, td::uint32 priority, td::Timestamp timeout,
                              td::Promise<td::Ref<ShardState>> promise) override;
  void wait_prev_block_state(BlockHandle handle, td::uint32 priority, td::Timestamp timeout,
                             td::Promise<td::Ref<ShardState>> promise) override;

  void wait_block_message_queue(BlockHandle handle, td::uint32 priority, td::Timestamp timeout,
                                td::Promise<td::Ref<MessageQueue>> promise) override;
  void wait_block_message_queue_short(BlockIdExt id, td::uint32 priority, td::Timestamp timeout,
                                      td::Promise<td::Ref<MessageQueue>> promise) override;

  // Access to the queued messages / shard block descriptions and completion notifications for them.
  void get_external_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<ExtMessage>>> promise) override;
  void get_ihr_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<IhrMessage>>> promise) override;
  void get_shard_blocks(BlockIdExt masterchain_block_id,
                        td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>> promise) override;
  void complete_external_messages(std::vector<ExtMessage::Hash> to_delay,
                                  std::vector<ExtMessage::Hash> to_delete) override;
  void complete_ihr_messages(std::vector<IhrMessage::Hash> to_delay, std::vector<IhrMessage::Hash> to_delete) override;

  //void set_first_block(ZeroStateIdExt state, BlockIdExt block, td::Promise<td::Unit> promise) override;
  void set_next_block(BlockIdExt prev, BlockIdExt next, td::Promise<td::Unit> promise) override;

  // "*_from_db" lookups; the "_short" variants take a block id instead of a handle.
  void get_block_data_from_db(ConstBlockHandle handle, td::Promise<td::Ref<BlockData>> promise) override;
  void get_block_data_from_db_short(BlockIdExt block_id, td::Promise<td::Ref<BlockData>> promise) override;
  void get_shard_state_from_db(ConstBlockHandle handle, td::Promise<td::Ref<ShardState>> promise) override;
  void get_shard_state_from_db_short(BlockIdExt block_id, td::Promise<td::Ref<ShardState>> promise) override;
  void get_block_candidate_from_db(PublicKey source, BlockIdExt id, FileHash collated_data_file_hash,
                                   td::Promise<BlockCandidate> promise) override;
  void get_block_proof_from_db(ConstBlockHandle handle, td::Promise<td::Ref<Proof>> promise) override;
  void get_block_proof_from_db_short(BlockIdExt id, td::Promise<td::Ref<Proof>> promise) override;
  void get_block_proof_link_from_db(ConstBlockHandle handle, td::Promise<td::Ref<ProofLink>> promise) override;
  void get_block_proof_link_from_db_short(BlockIdExt id, td::Promise<td::Ref<ProofLink>> promise) override;

  void get_block_by_lt_from_db(AccountIdPrefixFull account, LogicalTime lt,
                               td::Promise<ConstBlockHandle> promise) override;
  void get_block_by_unix_time_from_db(AccountIdPrefixFull account, UnixTime ts,
                                      td::Promise<ConstBlockHandle> promise) override;
  void get_block_by_seqno_from_db(AccountIdPrefixFull account, BlockSeqno seqno,
                                  td::Promise<ConstBlockHandle> promise) override;

  // get block handle declared in parent class
  void write_handle(BlockHandle handle, td::Promise<td::Unit> promise) override;

  void new_block(BlockHandle handle, td::Ref<ShardState> state, td::Promise<td::Unit> promise) override;
  // Not an override: a second stage of new_block() (by its name; see the out-of-line definition).
  void new_block_cont(BlockHandle handle, td::Ref<ShardState> state, td::Promise<td::Unit> promise);
  void get_top_masterchain_state(td::Promise<td::Ref<MasterchainState>> promise) override;
  void get_top_masterchain_block(td::Promise<BlockIdExt> promise) override;
  void get_top_masterchain_state_block(td::Promise<std::pair<td::Ref<MasterchainState>, BlockIdExt>> promise) override;
  void get_last_liteserver_state_block(td::Promise<std::pair<td::Ref<MasterchainState>, BlockIdExt>> promise) override;

  // Outgoing requests. The three requests with inline bodies are marked UNREACHABLE().
  void send_get_block_request(BlockIdExt id, td::uint32 priority, td::Promise<ReceivedBlock> promise) override;
  void send_get_zero_state_request(BlockIdExt id, td::uint32 priority, td::Promise<td::BufferSlice> promise) override;
  void send_get_persistent_state_request(BlockIdExt id, BlockIdExt masterchain_block_id, td::uint32 priority,
                                         td::Promise<td::BufferSlice> promise) override;
  void send_get_block_proof_request(BlockIdExt block_id, td::uint32 priority,
                                    td::Promise<td::BufferSlice> promise) override {
    UNREACHABLE();
  }
  void send_get_block_proof_link_request(BlockIdExt block_id, td::uint32 priority,
                                         td::Promise<td::BufferSlice> promise) override {
    UNREACHABLE();
  }
  void send_get_next_key_blocks_request(BlockIdExt block_id, td::uint32 priority,
                                        td::Promise<std::vector<BlockIdExt>> promise) override {
    UNREACHABLE();
  }
  // "Sending" a message feeds its serialized form back into this manager's own new_*_message() handler.
  void send_external_message(td::Ref<ExtMessage> message) override {
    new_external_message(message->serialize());
  }
  void send_ihr_message(td::Ref<IhrMessage> message) override {
    new_ihr_message(message->serialize());
  }
  void send_top_shard_block_description(td::Ref<ShardTopBlockDescription> desc) override;
  // Intentionally empty: the broadcast is discarded.
  void send_block_broadcast(BlockBroadcast broadcast) override {
  }

  void update_shard_client_state(BlockIdExt masterchain_block_id, td::Promise<td::Unit> promise) override;
  void get_shard_client_state(bool from_db, td::Promise<BlockIdExt> promise) override;
  // Intentionally empty: shard subscriptions are ignored.
  void subscribe_to_shard(ShardIdFull shard) override {
  }

  void update_async_serializer_state(AsyncSerializerState state, td::Promise<td::Unit> promise) override {
    UNREACHABLE();
  }
  void get_async_serializer_state(td::Promise<AsyncSerializerState> promise) override {
    UNREACHABLE();
  }

  void try_get_static_file(FileHash file_hash, td::Promise<td::BufferSlice> promise) override;

  // Always rejects the request with the error "download disabled".
  void get_download_token(size_t download_size, td::uint32 priority, td::Timestamp timeout,
                          td::Promise<std::unique_ptr<DownloadToken>> promise) override {
    promise.set_error(td::Status::Error(ErrorCode::error, "download disabled"));
  }

  void get_archive_id(BlockSeqno masterchain_seqno, td::Promise<td::uint64> promise) override {
    UNREACHABLE();
  }
  void get_archive_slice(td::uint64 archive_id, td::uint64 offset, td::uint32 limit,
                         td::Promise<td::BufferSlice> promise) override {
    UNREACHABLE();
  }

  // Non-virtual helpers and actor lifecycle hooks, defined out of line.
  void add_shard_block_description(td::Ref<ShardTopBlockDescription> desc);

  void register_block_handle(BlockHandle handle, td::Promise<BlockHandle> promise);

  void finished_wait_state(BlockIdExt id, td::Result<td::Ref<ShardState>> R);
  void finished_wait_data(BlockIdExt id, td::Result<td::Ref<BlockData>> R);

  void start_up() override;
  void started(ValidatorManagerInitResult result);

  void write_fake(BlockCandidate candidate, std::vector<BlockIdExt> prev, BlockIdExt last,
                  td::Ref<ValidatorSet> val_set);
  void validate_fake(BlockCandidate candidate, std::vector<BlockIdExt> prev, BlockIdExt last,
                     td::Ref<ValidatorSet> val_set);
  void complete_fake(BlockIdExt candidate_id);

  // Answers from the options object; `block_id` must be a masterchain block (enforced with CHECK).
  void check_is_hardfork(BlockIdExt block_id, td::Promise<bool> promise) override {
    CHECK(block_id.is_masterchain());
    promise.set_result(opts_->is_hardfork(block_id));
  }
  // Answers with opts_->get_vertical_seqno(seqno).
  void get_vertical_seqno(BlockSeqno seqno, td::Promise<td::uint32> promise) override {
    promise.set_result(opts_->get_vertical_seqno(seqno));
  }
  void run_ext_query(td::BufferSlice data, td::Promise<td::BufferSlice> promise) override {
    UNREACHABLE();
  }

  /**
   * @param local_id          stored in local_id_
   * @param opts              manager options; moved into opts_
   * @param shard_id          stored in shard_to_generate_
   * @param shard_to_block_id stored in block_to_generate_
   * @param db_root           database root path; moved into db_root_
   *
   * The initializer list follows member declaration order (local_id_, opts_, db_root_, shard_to_generate_,
   * block_to_generate_). `db_root` is a by-value sink parameter, so it is moved rather than copied a second time.
   */
  ValidatorManagerImpl(PublicKeyHash local_id, td::Ref<ValidatorManagerOptions> opts, ShardIdFull shard_id,
                       BlockIdExt shard_to_block_id, std::string db_root)
      : local_id_(local_id)
      , opts_(std::move(opts))
      , db_root_(std::move(db_root))
      , shard_to_generate_(shard_id)
      , block_to_generate_(shard_to_block_id) {
  }

  // Garbage-collection hooks: the GC handle update completes immediately and every allow_*_gc query answers false.
  void update_gc_block_handle(BlockHandle handle, td::Promise<td::Unit> promise) override {
    promise.set_value(td::Unit());
  }
  void allow_block_data_gc(BlockIdExt block_id, bool is_archive, td::Promise<bool> promise) override {
    promise.set_result(false);
  }
  void allow_block_state_gc(BlockIdExt block_id, td::Promise<bool> promise) override {
    promise.set_result(false);
  }
  void allow_zero_state_file_gc(BlockIdExt block_id, td::Promise<bool> promise) override {
    promise.set_result(false);
  }
  void allow_persistent_state_file_gc(BlockIdExt block_id, BlockIdExt masterchain_block_id,
                                      td::Promise<bool> promise) override {
    promise.set_result(false);
  }
  void allow_block_signatures_gc(BlockIdExt block_id, td::Promise<bool> promise) override {
    promise.set_result(false);
  }
  void allow_block_proof_gc(BlockIdExt block_id, bool is_archive, td::Promise<bool> promise) override {
    promise.set_result(false);
  }
  void allow_block_proof_link_gc(BlockIdExt block_id, bool is_archive, td::Promise<bool> promise) override {
    promise.set_result(false);
  }
  void allow_block_candidate_gc(BlockIdExt block_id, td::Promise<bool> promise) override {
    promise.set_result(false);
  }
  void allow_block_info_gc(BlockIdExt block_id, td::Promise<bool> promise) override {
    promise.set_result(false);
  }

  // Forwards the archive request (handle and promise) to the database actor.
  void archive(BlockHandle handle, td::Promise<td::Unit> promise) override {
    td::actor::send_closure(db_, &Db::archive, std::move(handle), std::move(promise));
  }
  // Intentionally empty.
  void update_last_known_key_block(BlockHandle handle, bool send_request) override {
  }
  // NOTE(review): empty body -- `promise` is neither fulfilled nor rejected here, unlike update_gc_block_handle()
  // above. Confirm that leaving it unanswered is intended.
  void update_shard_client_block_handle(BlockHandle handle, td::Ref<MasterchainState> state,
                                        td::Promise<td::Unit> promise) override {
  }

  void prepare_stats(td::Promise<std::vector<std::pair<std::string, std::string>>> promise) override {
    UNREACHABLE();
  }

  void prepare_perf_timer_stats(td::Promise<std::vector<PerfTimerStats>> promise) override {
    UNREACHABLE();
  }

  // Intentionally empty: perf timer samples are discarded.
  void add_perf_timer_stat(std::string name, double duration) override {
  }

  void truncate(BlockSeqno seqno, ConstBlockHandle handle, td::Promise<td::Unit> promise) override {
    UNREACHABLE();
  }
  void wait_shard_client_state(BlockSeqno seqno, td::Timestamp timeout, td::Promise<td::Unit> promise) override {
    UNREACHABLE();
  }
  void log_validator_session_stats(BlockIdExt block_id, validatorsession::ValidatorSessionStats stats) override {
    UNREACHABLE();
  }

  // Creates the QueueSizeCounter actor on first use (with an empty MasterchainState ref and this actor's id),
  // then forwards the query to it.
  void get_out_msg_queue_size(BlockIdExt block_id, td::Promise<td::uint32> promise) override {
    if (queue_size_counter_.empty()) {
      queue_size_counter_ =
          td::actor::create_actor<QueueSizeCounter>("queuesizecounter", td::Ref<MasterchainState>{}, actor_id(this));
    }
    td::actor::send_closure(queue_size_counter_, &QueueSizeCounter::get_queue_size, block_id, std::move(promise));
  }

  // Lite-query variants: each delegates to the corresponding regular lookup. The handle lookup passes
  // force = false and converts the resulting BlockHandle to a ConstBlockHandle.
  void get_block_handle_for_litequery(BlockIdExt block_id, td::Promise<ConstBlockHandle> promise) override {
    get_block_handle(block_id, false, promise.wrap([](BlockHandle &&handle) -> ConstBlockHandle { return handle; }));
  }
  void get_block_by_lt_from_db_for_litequery(AccountIdPrefixFull account, LogicalTime lt,
                                             td::Promise<ConstBlockHandle> promise) override {
    get_block_by_lt_from_db(account, lt, std::move(promise));
  }
  void get_block_by_unix_time_from_db_for_litequery(AccountIdPrefixFull account, UnixTime ts,
                                                    td::Promise<ConstBlockHandle> promise) override {
    get_block_by_unix_time_from_db(account, ts, std::move(promise));
  }
  void get_block_by_seqno_from_db_for_litequery(AccountIdPrefixFull account, BlockSeqno seqno,
                                                td::Promise<ConstBlockHandle> promise) override {
    get_block_by_seqno_from_db(account, seqno, std::move(promise));
  }

 private:
  // Constructor-provided configuration. Declaration order matters: it is the member initialization order.
  PublicKeyHash local_id_;
  td::Ref<ValidatorManagerOptions> opts_;

  BlockIdExt last_masterchain_block_id_;
  BlockHandle last_masterchain_block_handle_;

  std::string db_root_;
  ShardIdFull shard_to_generate_;
  BlockIdExt block_to_generate_;

  // Counter plus the promises parked alongside it; maintained by the out-of-line code (see dec_pending_new_blocks()).
  int pending_new_shard_block_descr_{0};
  std::vector<td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>>> waiting_new_shard_block_descr_;

  // Created lazily by get_out_msg_queue_size().
  td::actor::ActorOwn<QueueSizeCounter> queue_size_counter_;

  void update_shards();
  void update_shard_blocks();
  void dec_pending_new_blocks();
  ValidatorSessionId get_validator_set_id(ShardIdFull shard, td::Ref<ValidatorSet> val_set);
};
} // namespace validator
} // namespace ton