From 791d4f317242186b874acc0da2d8d86f05ebdfec Mon Sep 17 00:00:00 2001 From: Arseny Smirnov Date: Wed, 7 Aug 2019 18:13:10 +0300 Subject: [PATCH] TQueue: some fixes. GitOrigin-RevId: f0521fd9c323e05ffaf4877b92ad42a17ee71dcd --- tddb/td/db/TQueue.cpp | 18 ++++++++++++------ tddb/td/db/TQueue.h | 2 +- test/tqueue.cpp | 12 ++++++------ 3 files changed, 19 insertions(+), 13 deletions(-) diff --git a/tddb/td/db/TQueue.cpp b/tddb/td/db/TQueue.cpp index 446e279b8356..fdd1bc160495 100644 --- a/tddb/td/db/TQueue.cpp +++ b/tddb/td/db/TQueue.cpp @@ -71,6 +71,9 @@ bool EventId::is_valid(int32 id) { class TQueueImpl : public TQueue { public: + static constexpr int32 MAX_DELAY = 7 * 86400; + static constexpr size_t MAX_EVENT_LEN = 65536 * 8; + void set_callback(unique_ptr callback) override { callback_ = std::move(callback); } @@ -141,7 +144,7 @@ class TQueueImpl : public TQueue { return q.tail_id; } - Result get(QueueId queue_id, EventId from_id, double now, MutableSpan events) override { + Result get(QueueId queue_id, EventId from_id, double now, MutableSpan &events) override { auto it = queues_.find(queue_id); if (it == queues_.end()) { return 0; @@ -154,7 +157,8 @@ class TQueueImpl : public TQueue { } auto from_events = q.events.as_span(); - size_t res_n = 0; + size_t ready_n = 0; + size_t left_n = 0; for (size_t i = 0; i < from_events.size(); i++) { auto &from = from_events[i]; if (from.expire_at < now) { @@ -162,17 +166,19 @@ class TQueueImpl : public TQueue { continue; } - auto &to = events[res_n]; + auto &to = events[ready_n]; to.data = from.data; to.id = from.event_id; to.expire_at = from.expire_at; - res_n++; - if (res_n == events.size()) { + ready_n++; + if (ready_n == events.size()) { + left_n += from_events.size() - i - 1; break; } } - return res_n; + events.truncate(ready_n); + return ready_n + left_n; } private: diff --git a/tddb/td/db/TQueue.h b/tddb/td/db/TQueue.h index ff78b13ac2cd..18cb307297b3 100644 --- a/tddb/td/db/TQueue.h +++ b/tddb/td/db/TQueue.h @@ -68,7 +68,7 @@ class TQueue { virtual EventId get_head(QueueId queue_id) const = 0; virtual EventId get_tail(QueueId queue_id) const = 0; - virtual Result get(QueueId queue_id, EventId from_id, double now, MutableSpan events) = 0; + virtual Result get(QueueId queue_id, EventId from_id, double now, MutableSpan &events) = 0; static unique_ptr create(unique_ptr callback = {}); }; diff --git a/test/tqueue.cpp b/test/tqueue.cpp index 478db0d75c55..a0373479ff7a 100644 --- a/test/tqueue.cpp +++ b/test/tqueue.cpp @@ -115,12 +115,12 @@ class TestTQueue { if (tmp.is_ok()) { a_from = tmp.move_as_ok(); } - auto a_size = baseline_->get(qid, a_from, 0, a_span).move_as_ok(); - auto b_size = memory_->get(qid, a_from, 0, b_span).move_as_ok(); - auto c_size = binlog_->get(qid, a_from, 0, c_span).move_as_ok(); - ASSERT_EQ(a_size, b_size); - ASSERT_EQ(a_size, c_size); - for (size_t i = 0; i < a_size; i++) { + baseline_->get(qid, a_from, 0, a_span).move_as_ok(); + memory_->get(qid, a_from, 0, b_span).move_as_ok(); + binlog_->get(qid, a_from, 0, c_span).move_as_ok(); + ASSERT_EQ(a_span.size(), b_span.size()); + ASSERT_EQ(a_span.size(), c_span.size()); + for (size_t i = 0; i < a_span.size(); i++) { ASSERT_EQ(a_span[i].id, b_span[i].id); ASSERT_EQ(a_span[i].id, c_span[i].id); ASSERT_EQ(a_span[i].data, b_span[i].data);