Skip to content

Commit

Permalink
TQueue fixes.
Browse files Browse the repository at this point in the history
GitOrigin-RevId: 87dd51ab556869a6066e9837cf8da34530cfbc4b
  • Loading branch information
levlam committed Jun 12, 2020
1 parent fc8d92b commit 00c30aa
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 41 deletions.
86 changes: 52 additions & 34 deletions tddb/td/db/TQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "td/db/TQueue.h"

#include "td/db/binlog/Binlog.h"
#include "td/db/binlog/BinlogEvent.h"
#include "td/db/binlog/BinlogHelper.h"
#include "td/db/binlog/BinlogInterface.h"

Expand All @@ -28,10 +29,6 @@ namespace td {

using EventId = TQueue::EventId;

static constexpr int32 MAX_DELAY = 7 * 86400;
static constexpr size_t MAX_EVENT_LEN = 65536 * 8;
static constexpr size_t MAX_QUEUE_EVENTS = 1000000;

EventId::EventId() {
}

Expand All @@ -42,10 +39,6 @@ Result<EventId> EventId::from_int32(int32 id) {
return EventId(id);
}

EventId EventId::create_random() {
return from_int32(Random::fast(MAX_QUEUE_EVENTS + 1, MAX_ID / 2)).move_as_ok();
}

bool EventId::is_valid() const {
return !empty() && is_valid_id(id_);
}
Expand Down Expand Up @@ -92,6 +85,9 @@ bool EventId::is_valid_id(int32 id) {
}

class TQueueImpl : public TQueue {
static constexpr size_t MAX_EVENT_LEN = 65536 * 8;
static constexpr size_t MAX_QUEUE_EVENTS = 1000000;

public:
void set_callback(unique_ptr<StorageCallback> callback) override {
callback_ = std::move(callback);
Expand All @@ -100,15 +96,19 @@ class TQueueImpl : public TQueue {
return std::move(callback_);
}

void do_push(QueueId queue_id, RawEvent &&raw_event) override {
bool do_push(QueueId queue_id, RawEvent &&raw_event) override {
// LOG(ERROR) << "Push to queue " << queue_id << " " << raw_event.event_id;
CHECK(raw_event.event_id.is_valid());
if (raw_event.logevent_id == 0 && callback_ != nullptr) {
raw_event.logevent_id = callback_->push(queue_id, raw_event);
}
auto &q = queues_[queue_id];
q.tail_id = raw_event.event_id.next().move_as_ok();
q.events.push(std::move(raw_event));
if (q.events.empty() || q.events.back().event_id < raw_event.event_id) {
if (raw_event.logevent_id == 0 && callback_ != nullptr) {
raw_event.logevent_id = callback_->push(queue_id, raw_event);
}
q.tail_id = raw_event.event_id.next().move_as_ok();
q.events.push(std::move(raw_event));
return true;
}
return false;
}

Result<EventId> push(QueueId queue_id, string data, double expires_at, int64 extra, EventId hint_new_id) override {
Expand All @@ -119,10 +119,17 @@ class TQueueImpl : public TQueue {
if (data.empty()) {
return Status::Error("Data is empty");
}
if (data.size() > MAX_EVENT_LEN) {
return Status::Error("Data is too big");
}
EventId event_id;
while (true) {
if (q.tail_id.empty()) {
q.tail_id = hint_new_id.empty() ? EventId::create_random() : hint_new_id;
if (hint_new_id.empty()) {
q.tail_id = EventId::from_int32(Random::fast(2 * MAX_QUEUE_EVENTS + 1, EventId::MAX_ID / 2)).move_as_ok();
} else {
q.tail_id = hint_new_id;
}
}
event_id = q.tail_id;
CHECK(event_id.is_valid());
Expand All @@ -142,7 +149,8 @@ class TQueueImpl : public TQueue {
raw_event.data = std::move(data);
raw_event.expires_at = expires_at;
raw_event.extra = extra;
do_push(queue_id, std::move(raw_event));
bool is_added = do_push(queue_id, std::move(raw_event));
CHECK(is_added);
return event_id;
}

Expand Down Expand Up @@ -199,7 +207,7 @@ class TQueueImpl : public TQueue {
return Status::Error("Specified from_id is in the past");
}

auto from_events = q.events.as_mutable_span();
MutableSpan<RawEvent> from_events;
size_t ready_n = 0;
size_t i = 0;

Expand All @@ -224,11 +232,7 @@ class TQueueImpl : public TQueue {
break;
}

if (from.event_id < from_id) {
// should not happend
UNREACHABLE();
continue;
}
CHECK(!(from.event_id < from_id));

auto &to = result_events[ready_n];
to.data = from.data;
Expand All @@ -239,7 +243,7 @@ class TQueueImpl : public TQueue {
}

// compactify skipped events
if ((first_i + ready_n) * 2 < i) {
if ((ready_n + 1) * 2 < i + first_i) {
compactify(q.events, i);
continue;
}
Expand Down Expand Up @@ -269,7 +273,11 @@ class TQueueImpl : public TQueue {
std::unordered_map<QueueId, Queue> queues_;
unique_ptr<StorageCallback> callback_;

void compactify(VectorQueue<RawEvent> &events, size_t prefix) {
static void compactify(VectorQueue<RawEvent> &events, size_t prefix) {
if (prefix == events.size()) {
CHECK(!events.empty());
prefix--;
}
auto processed = events.as_mutable_span().substr(0, prefix);
auto removed_n =
processed.rend() - std::remove_if(processed.rbegin(), processed.rend(), [](auto &e) { return e.data.empty(); });
Expand Down Expand Up @@ -365,13 +373,14 @@ uint64 TQueueBinlog<BinlogT>::push(QueueId queue_id, const RawEvent &event) {
TQueueLogEvent log_event;
log_event.queue_id = queue_id;
log_event.event_id = event.event_id.value();
log_event.expires_at = static_cast<int32>(event.expires_at + diff_);
log_event.expires_at = static_cast<int32>(event.expires_at + diff_ + 1);
log_event.data = event.data;
log_event.extra = event.extra;
auto magic = magic_ + (log_event.extra != 0);
if (event.logevent_id == 0) {
return binlog_->add(magic_ + (log_event.extra != 0), log_event);
return binlog_->add(magic, log_event);
}
binlog_->rewrite(event.logevent_id, magic_ + (log_event.extra != 0), log_event);
binlog_->rewrite(event.logevent_id, magic, log_event);
return event.logevent_id;
}

Expand All @@ -381,19 +390,26 @@ void TQueueBinlog<BinlogT>::pop(uint64 logevent_id) {
}

template <class BinlogT>
Status TQueueBinlog<BinlogT>::replay(const BinlogEvent &binlog_event, TQueue &q) {
Status TQueueBinlog<BinlogT>::replay(const BinlogEvent &binlog_event, TQueue &q) const {
TQueueLogEvent event;
TlParser parser(binlog_event.data_);
event.parse(parser, binlog_event.type_ - magic_);
int32 has_extra = binlog_event.type_ - magic_;
if (has_extra != 0 && has_extra != 1) {
return Status::Error("Wrong magic");
}
event.parse(parser, has_extra);
parser.fetch_end();
TRY_STATUS(parser.get_status());
TRY_RESULT(event_id, EventId::from_int32(event.event_id));
RawEvent raw_event;
raw_event.logevent_id = binlog_event.id_;
raw_event.event_id = event_id;
raw_event.expires_at = event.expires_at - diff_ + 1;
raw_event.expires_at = event.expires_at - diff_;
raw_event.data = event.data.str();
raw_event.extra = event.extra;
q.do_push(event.queue_id, std::move(raw_event));
if (!q.do_push(event.queue_id, std::move(raw_event))) {
return Status::Error("Failed to add event");
}
return Status::OK();
}

Expand All @@ -405,15 +421,17 @@ uint64 TQueueMemoryStorage::push(QueueId queue_id, const RawEvent &event) {
events_[logevent_id] = std::make_pair(queue_id, event);
return logevent_id;
}

void TQueueMemoryStorage::pop(uint64 logevent_id) {
events_.erase(logevent_id);
}

void TQueueMemoryStorage::replay(TQueue &q) {
for (auto e : events_) {
void TQueueMemoryStorage::replay(TQueue &q) const {
for (auto &e : events_) {
auto x = e.second;
x.second.logevent_id = e.first;
q.do_push(x.first, std::move(x.second));
bool is_added = q.do_push(x.first, std::move(x.second));
CHECK(is_added);
}
}

Expand Down
8 changes: 3 additions & 5 deletions tddb/td/db/TQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ class TQueue {

static Result<EventId> from_int32(int32 id);

static EventId create_random();

bool is_valid() const;

int32 value() const;
Expand Down Expand Up @@ -98,7 +96,7 @@ class TQueue {
virtual void set_callback(unique_ptr<StorageCallback> callback) = 0;
virtual unique_ptr<StorageCallback> extract_callback() = 0;

virtual void do_push(QueueId queue_id, RawEvent &&raw_event) = 0;
virtual bool do_push(QueueId queue_id, RawEvent &&raw_event) = 0;

virtual Result<EventId> push(QueueId queue_id, string data, double expires_at, int64 extra, EventId hint_new_id) = 0;

Expand All @@ -124,7 +122,7 @@ class TQueueBinlog : public TQueue::StorageCallback {

uint64 push(QueueId queue_id, const RawEvent &event) override;
void pop(uint64 logevent_id) override;
Status replay(const BinlogEvent &binlog_event, TQueue &q);
Status replay(const BinlogEvent &binlog_event, TQueue &q) const;

void set_binlog(std::shared_ptr<BinlogT> binlog) {
binlog_ = std::move(binlog);
Expand All @@ -140,7 +138,7 @@ class TQueueMemoryStorage : public TQueue::StorageCallback {
public:
uint64 push(QueueId queue_id, const RawEvent &event) override;
void pop(uint64 logevent_id) override;
void replay(TQueue &q);
void replay(TQueue &q) const;

private:
uint64 next_logevent_id_{1};
Expand Down
10 changes: 8 additions & 2 deletions test/tqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,16 @@ TEST(TQueue, hands) {
auto qid = 12;
ASSERT_EQ(true, tqueue->get_head(qid).empty());
ASSERT_EQ(true, tqueue->get_tail(qid).empty());
tqueue->push(qid, "hello", 0, 0, td::Auto());
tqueue->push(qid, "hello", 0, 0, td::TQueue::EventId());
auto head = tqueue->get_head(qid);
ASSERT_EQ(head.next().ok(), tqueue->get_tail(qid));
auto tail = tqueue->get_tail(qid);
ASSERT_EQ(head.next().ok(), tail);
ASSERT_EQ(1u, tqueue->get(qid, head, true, 0, events_span).move_as_ok());
ASSERT_EQ(1u, tqueue->get(qid, head, true, 0, events_span).move_as_ok());
ASSERT_EQ(0u, tqueue->get(qid, tail, false, 0, events_span).move_as_ok());
ASSERT_EQ(1u, tqueue->get(qid, head, true, 0, events_span).move_as_ok());
ASSERT_EQ(0u, tqueue->get(qid, tail, true, 0, events_span).move_as_ok());
ASSERT_EQ(0u, tqueue->get(qid, head, true, 0, events_span).move_as_ok());
}

class TestTQueue {
Expand Down

0 comments on commit 00c30aa

Please sign in to comment.