Skip to content

Commit

Permalink
TQueue: interface for webhooks
Browse files Browse the repository at this point in the history
GitOrigin-RevId: 521f25f183d76bcbb9143270660dd48b3bf8fc83
  • Loading branch information
arseny30 committed Aug 27, 2019
1 parent de5cc3e commit 7a48b9b
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 10 deletions.
47 changes: 42 additions & 5 deletions tddb/td/db/TQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ bool EventId::operator==(const EventId &other) const {
return id_ == other.id_;
}

bool EventId::operator<(const EventId &other) const {
return id_ < other.id_;
}

StringBuilder &operator<<(StringBuilder &sb, const EventId id) {
return sb << "EventId{" << id.value() << "}";
}
Expand All @@ -86,6 +90,7 @@ class TQueueImpl : public TQueue {
}

void do_push(QueueId queue_id, RawEvent &&raw_event) override {
//LOG(ERROR) << "Push " << queue_id << " " << raw_event.event_id;
CHECK(!raw_event.event_id.empty());
if (raw_event.logevent_id == 0 && callback_) {
raw_event.logevent_id = callback_->push(queue_id, raw_event);
Expand Down Expand Up @@ -149,7 +154,24 @@ class TQueueImpl : public TQueue {
return q.tail_id;
}

Result<size_t> get(QueueId queue_id, EventId from_id, double now, MutableSpan<Event> &events) override {
void forget(QueueId queue_id, EventId event_id) override {
auto q_it = queues_.find(queue_id);
if (q_it == queues_.end()) {
return;
}
auto &q = q_it->second;
auto from_events = q.events.as_mutable_span();
auto it = std::lower_bound(from_events.begin(), from_events.end(), event_id,
[](auto &event, EventId event_id) { return event.event_id < event_id; });
if (it == from_events.end() || !(it->event_id == event_id)) {
return;
}
try_pop(queue_id, *it, {}, q.tail_id, 0, true /*force*/);
}

Result<size_t> get(QueueId queue_id, EventId from_id, bool forget_previous, double now,
MutableSpan<Event> &events) override {
//LOG(ERROR) << "Get " << queue_id << " " << from_id;
auto it = queues_.find(queue_id);
if (it == queues_.end()) {
events.truncate(0);
Expand All @@ -171,10 +193,16 @@ class TQueueImpl : public TQueue {
while (true) {
from_events = q.events.as_mutable_span();
ready_n = 0;
i = 0;
for (; i < from_events.size(); i++) {
size_t first_i = 0;
if (!forget_previous) {
first_i = std::lower_bound(from_events.begin(), from_events.end(), from_id,
[](auto &event, EventId event_id) { return event.event_id < event_id; }) -
from_events.begin();
}
//LOG(ERROR) << tag("first_i", first_i) << tag("size", from_events.size());
for (i = first_i; i < from_events.size(); i++) {
auto &from = from_events[i];
try_pop(queue_id, from, from_id, q.tail_id, now);
try_pop(queue_id, from, forget_previous ? from_id : EventId{}, q.tail_id, now);
if (from.data.empty()) {
continue;
}
Expand All @@ -183,6 +211,12 @@ class TQueueImpl : public TQueue {
break;
}

if (from.event_id < from_id) {
// should not happend
UNREACHABLE();
continue;
}

auto &to = events[ready_n];
to.data = from.data;
to.id = from.event_id;
Expand All @@ -191,7 +225,7 @@ class TQueueImpl : public TQueue {
}

// compactify skipped events
if (ready_n * 2 < i) {
if ((first_i + ready_n) * 2 < i) {
compactify(q.events, i);
continue;
}
Expand Down Expand Up @@ -229,6 +263,8 @@ class TQueueImpl : public TQueue {
}

void try_pop(QueueId queue_id, RawEvent &event, EventId from_id, EventId tail_id, double now, bool force = false) {
//LOG(ERROR) << event.expire_at << " < " << now << " = " << (event.expire_at < now) << " "
//<< (event.event_id.value() < from_id.value()) << " " << force << " " << event.data.empty();
bool should_drop = event.expire_at < now || event.event_id.value() < from_id.value() || force || event.data.empty();
if (!callback_ || event.logevent_id == 0) {
if (should_drop) {
Expand All @@ -241,6 +277,7 @@ class TQueueImpl : public TQueue {
return;
}

//LOG(ERROR) << "Drop " << queue_id << " " << event.event_id;
if (event.event_id.value() + 1 == tail_id.value()) {
if (!event.data.empty()) {
event.data = {};
Expand Down
6 changes: 5 additions & 1 deletion tddb/td/db/TQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class TQueue {
bool empty() const;

bool operator==(const EventId &other) const;
bool operator<(const EventId &other) const;

private:
int32 id_{0};
Expand Down Expand Up @@ -67,10 +68,13 @@ class TQueue {

virtual Result<EventId> push(QueueId queue_id, string data, double expire_at, EventId new_id = EventId()) = 0;

virtual void forget(QueueId queue_id, EventId event_id) = 0;

virtual EventId get_head(QueueId queue_id) const = 0;
virtual EventId get_tail(QueueId queue_id) const = 0;

virtual Result<size_t> get(QueueId queue_id, EventId from_id, double now, MutableSpan<Event> &events) = 0;
virtual Result<size_t> get(QueueId queue_id, EventId from_id, bool forget_previous, double now,
MutableSpan<Event> &events) = 0;

virtual void run_gc(double now) = 0;

Expand Down
8 changes: 4 additions & 4 deletions test/tqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ TEST(TQueue, hands) {
tqueue->push(qid, "hello", 0);
auto head = tqueue->get_head(qid);
ASSERT_EQ(head.next().ok(), tqueue->get_tail(qid));
ASSERT_EQ(1u, tqueue->get(qid, head, 0, events_span).move_as_ok());
ASSERT_EQ(1u, tqueue->get(qid, head, true, 0, events_span).move_as_ok());
}

class TestTQueue {
Expand Down Expand Up @@ -127,9 +127,9 @@ class TestTQueue {
if (tmp.is_ok()) {
a_from = tmp.move_as_ok();
}
baseline_->get(qid, a_from, now, a_span).move_as_ok();
memory_->get(qid, a_from, now, b_span).move_as_ok();
binlog_->get(qid, a_from, now, c_span).move_as_ok();
baseline_->get(qid, a_from, true, now, a_span).move_as_ok();
memory_->get(qid, a_from, true, now, b_span).move_as_ok();
binlog_->get(qid, a_from, true, now, c_span).move_as_ok();
ASSERT_EQ(a_span.size(), b_span.size());
ASSERT_EQ(a_span.size(), c_span.size());
for (size_t i = 0; i < a_span.size(); i++) {
Expand Down

0 comments on commit 7a48b9b

Please sign in to comment.