Skip to content

Commit

Permalink
Refactor queue consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangxu19830126 committed Mar 27, 2020
1 parent c6d95ce commit e9b3063
Show file tree
Hide file tree
Showing 14 changed files with 603 additions and 1,300 deletions.
11 changes: 6 additions & 5 deletions src/main/java/cn/infinivision/dataforce/busybee/Consumer.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package cn.infinivision.dataforce.busybee;

import cn.infinivision.dataforce.busybee.pb.meta.Notify;
import cn.infinivision.dataforce.busybee.pb.rpc.QueueConcurrencyFetchRequest;
import cn.infinivision.dataforce.busybee.pb.rpc.QueueConcurrencyFetchResponse;
import cn.infinivision.dataforce.busybee.pb.rpc.QueueFetchRequest;
import cn.infinivision.dataforce.busybee.pb.rpc.QueueFetchResponse;
import cn.infinivision.dataforce.busybee.pb.rpc.QueueJoinGroupRequest;
import cn.infinivision.dataforce.busybee.pb.rpc.QueueJoinGroupResponse;
import cn.infinivision.dataforce.busybee.pb.rpc.Request;
Expand Down Expand Up @@ -178,7 +178,7 @@ void doFetch() {
consumer.client.transport.sent(Request.newBuilder()
.setId(consumer.client.id.incrementAndGet())
.setType(Type.FetchNotify)
.setQueueFetch(QueueConcurrencyFetchRequest.newBuilder()
.setQueueFetch(QueueFetchRequest.newBuilder()
.setId(consumer.tenantId)
.setGroup(ByteString.copyFromUtf8(consumer.group))
.setConsumer(index)
Expand All @@ -205,7 +205,7 @@ void onResponse(Response resp) {
return;
}

QueueConcurrencyFetchResponse fetchResp = resp.getFetchResp();
QueueFetchResponse fetchResp = resp.getFetchResp();
if (fetchResp.getRemoved()) {
log.info("{}/{}/v{} fetch from partition {} at {}, removed",
consumer.tenantId,
Expand All @@ -217,6 +217,7 @@ void onResponse(Response resp) {
return;
}

onFetch(fetchResp);
}

void onError(Throwable cause) {
Expand All @@ -234,7 +235,7 @@ void retryFetch() {
consumer.schedulers.schedule(this::doFetch, 1, TimeUnit.SECONDS);
}

void onFetch(QueueConcurrencyFetchResponse resp) {
void onFetch(QueueFetchResponse resp) {
consumer.client.opts.bizService.execute(() -> {
long after = 1;
try {
Expand Down
311 changes: 148 additions & 163 deletions src/main/java/cn/infinivision/dataforce/busybee/pb/rpc/PB.java

Large diffs are not rendered by default.

Loading

0 comments on commit e9b3063

Please sign in to comment.