Skip to content

Commit

Permalink
TQueue: store timeout in system time, drop old event when possible
Browse files Browse the repository at this point in the history
GitOrigin-RevId: c1aa277d76e85e411828577ce7d6efd2d374058e
  • Loading branch information
arseny30 committed Aug 7, 2019
1 parent 734a7e0 commit 5685591
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 6 deletions.
17 changes: 11 additions & 6 deletions tddb/td/db/TQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class TQueueImpl : public TQueue {
if (event_id.next().is_ok()) {
break;
}
confirm_read(q, event_id);
confirm_read(q, event_id, 0);
q.tail_id = {};
}

Expand Down Expand Up @@ -161,7 +161,7 @@ class TQueueImpl : public TQueue {
if (from_id.value() < q.tail_id.value() - narrow_cast<int32>(MAX_QUEUE_EVENTS) * 2) {
return Status::Error("from_id is in past");
}
confirm_read(q, from_id);
confirm_read(q, from_id, now);
if (q.events.empty()) {
return 0;
}
Expand Down Expand Up @@ -200,8 +200,9 @@ class TQueueImpl : public TQueue {
std::unordered_map<QueueId, Queue> queues_;
unique_ptr<Callback> callback_;

void confirm_read(Queue &q, EventId till_id) {
while (!q.events.empty() && q.events.front().event_id.value() < till_id.value()) {
void confirm_read(Queue &q, EventId till_id, double now) {
while (!q.events.empty() &&
(q.events.front().event_id.value() < till_id.value() || q.events.front().expire_at < now)) {
if (callback_) {
callback_->pop(q.events.front().logevent_id);
}
Expand Down Expand Up @@ -255,12 +256,16 @@ struct TQueueLogEvent : public Storer {
}
};

template <class BinlogT>
TQueueBinlog<BinlogT>::TQueueBinlog() {
diff_ = Clocks::system() - Time::now();
}
template <class BinlogT>
int64 TQueueBinlog<BinlogT>::push(QueueId queue_id, const RawEvent &event) {
TQueueLogEvent log_event;
log_event.queue_id = queue_id;
log_event.event_id = event.event_id.value();
log_event.expire_at = static_cast<int32>(event.expire_at);
log_event.expire_at = static_cast<int32>(event.expire_at + diff_);
log_event.data = event.data;
return binlog_->add(magic_, log_event);
}
Expand All @@ -280,7 +285,7 @@ Status TQueueBinlog<BinlogT>::replay(const BinlogEvent &binlog_event, TQueue &q)
RawEvent raw_event;
raw_event.logevent_id = binlog_event.id_;
raw_event.event_id = event_id;
raw_event.expire_at = event.expire_at;
raw_event.expire_at = event.expire_at - diff_ + 1;
raw_event.data = event.data.str();
q.do_push(event.queue_id, std::move(raw_event));
return Status::OK();
Expand Down
2 changes: 2 additions & 0 deletions tddb/td/db/TQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ struct BinlogEvent;
template <class BinlogT>
class TQueueBinlog : public TQueue::Callback {
public:
TQueueBinlog();
int64 push(QueueId queue_id, const RawEvent &event) override;
void pop(int64 logevent_id) override;
Status replay(const BinlogEvent &binlog_event, TQueue &q);
Expand All @@ -92,6 +93,7 @@ class TQueueBinlog : public TQueue::Callback {
private:
std::shared_ptr<BinlogT> binlog_;
int32 magic_{2314};
double diff_{0};
};

class MemoryStorage : public TQueue::Callback {
Expand Down

0 comments on commit 5685591

Please sign in to comment.