Skip to content

Commit

Permalink
Fix TQueue binlog replaying.
Browse files Browse the repository at this point in the history
GitOrigin-RevId: 172d70847206253de981d38fed329276b2f778e0
  • Loading branch information
levlam committed Jul 24, 2020
1 parent db29976 commit 85ba9e5
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 8 deletions.
16 changes: 10 additions & 6 deletions tddb/td/db/TQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,12 @@ class TQueueImpl : public TQueue {

bool do_push(QueueId queue_id, RawEvent &&raw_event) override {
CHECK(raw_event.event_id.is_valid());
// raw_event.data can be empty when replaying binlog
if (raw_event.data.size() > MAX_EVENT_LENGTH) {
return false;
}
auto &q = queues_[queue_id];
if (q.events.size() >= MAX_QUEUE_EVENTS || raw_event.data.empty() || raw_event.data.size() > MAX_EVENT_LENGTH ||
q.total_event_length > MAX_TOTAL_EVENT_LENGTH - raw_event.data.size()) {
if (q.events.size() >= MAX_QUEUE_EVENTS || q.total_event_length > MAX_TOTAL_EVENT_LENGTH - raw_event.data.size()) {
return false;
}
if (q.events.empty() || q.events.back().event_id < raw_event.event_id) {
Expand All @@ -116,16 +119,17 @@ class TQueueImpl : public TQueue {
}

Result<EventId> push(QueueId queue_id, string data, double expires_at, int64 extra, EventId hint_new_id) override {
auto &q = queues_[queue_id];
if (q.events.size() >= MAX_QUEUE_EVENTS) {
return Status::Error("Queue is full");
}
if (data.empty()) {
return Status::Error("Data is empty");
}
if (data.size() > MAX_EVENT_LENGTH) {
return Status::Error("Data is too big");
}

auto &q = queues_[queue_id];
if (q.events.size() >= MAX_QUEUE_EVENTS) {
return Status::Error("Queue is full");
}
if (q.total_event_length > MAX_TOTAL_EVENT_LENGTH - data.size()) {
return Status::Error("Queue size is too big");
}
Expand Down
2 changes: 1 addition & 1 deletion tddb/td/db/TQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class TQueueBinlog : public TQueue::StorageCallback {

uint64 push(QueueId queue_id, const RawEvent &event) override;
void pop(uint64 logevent_id) override;
Status replay(const BinlogEvent &binlog_event, TQueue &q) const;
Status replay(const BinlogEvent &binlog_event, TQueue &q) const TD_WARN_UNUSED_RESULT;

void set_binlog(std::shared_ptr<BinlogT> binlog) {
binlog_ = std::move(binlog);
Expand Down
4 changes: 3 additions & 1 deletion test/tqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ class TestTQueue {
binlog_ = td::TQueue::create();
auto tqueue_binlog = td::make_unique<td::TQueueBinlog<td::Binlog>>();
auto binlog = std::make_shared<td::Binlog>();
binlog->init(binlog_path().str(), [&](const td::BinlogEvent &event) { tqueue_binlog->replay(event, *binlog_); })
binlog
->init(binlog_path().str(),
[&](const td::BinlogEvent &event) { tqueue_binlog->replay(event, *binlog_).ignore(); })
.ensure();
tqueue_binlog->set_binlog(std::move(binlog));
binlog_->set_callback(std::move(tqueue_binlog));
Expand Down

0 comments on commit 85ba9e5

Please sign in to comment.