In PulsarKafkaProducer use flush() from pulsar API (apache#3549)
merlimat authored Feb 11, 2019
1 parent 1b44aaa commit 894146a
Showing 1 changed file with 6 additions and 17 deletions.
@@ -32,6 +32,7 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Metric;
@@ -66,9 +67,6 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
     private final Partitioner partitioner;
     private volatile Cluster cluster = Cluster.empty();
 
-    /** Map that contains the last future for each producer */
-    private final ConcurrentMap<String, CompletableFuture<MessageId>> lastSendFuture = new ConcurrentHashMap<>();
-
     public PulsarKafkaProducer(Map<String, Object> configs) {
         this(configs, null, null);
     }
@@ -174,10 +172,7 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
         int messageSize = buildMessage(messageBuilder, record);;
 
         CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
-        CompletableFuture<MessageId> sendFuture = messageBuilder.sendAsync();
-        lastSendFuture.put(record.topic(), sendFuture);
-
-        sendFuture.thenAccept((messageId) -> {
+        messageBuilder.sendAsync().thenAccept((messageId) -> {
             future.complete(getRecordMetadata(record.topic(), messageBuilder, messageId, messageSize));
         }).exceptionally(ex -> {
             future.completeExceptionally(ex);
@@ -197,16 +192,10 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
 
     @Override
     public void flush() {
-        lastSendFuture.forEach((topic, future) -> {
-            try {
-                future.get();
-            } catch (InterruptedException | ExecutionException e) {
-                throw new RuntimeException(e);
-            }
-
-            // Remove the futures to remove eventually failed operations in order to trigger errors only once
-            lastSendFuture.remove(topic, future);
-        });
+        producers.values().stream()
+                .map(p -> p.flushAsync())
+                .collect(Collectors.toList())
+                .forEach(CompletableFuture::join);
     }
 
     @Override
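Because flush() now delegates to each Pulsar producer's flushAsync(), the per-topic lastSendFuture bookkeeping in send() is no longer needed, which is why the map and its updates are removed above. The sketch below shows the same flush pattern in a self-contained form; the AsyncFlushable interface, the FlushAllExample class, and the exact shape of the producers map are assumptions for illustration, not part of the commit.

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

// Hypothetical stand-in for the Pulsar producer, reduced to the one
// method this pattern relies on (Pulsar's Producer exposes flushAsync()).
interface AsyncFlushable {
    CompletableFuture<Void> flushAsync();
}

public class FlushAllExample {

    // Assumption: one producer per topic, mirroring the producers map
    // referenced by the new flush() in the diff.
    private final Map<String, AsyncFlushable> producers = new ConcurrentHashMap<>();

    // Same shape as the committed flush(): start a flush on every producer,
    // materialize the futures, then block until each one completes.
    // join() rethrows any failure as an unchecked CompletionException.
    public void flushAll() {
        List<CompletableFuture<Void>> pending = producers.values().stream()
                .map(AsyncFlushable::flushAsync)
                .collect(Collectors.toList());
        pending.forEach(CompletableFuture::join);
    }
}

Collecting the futures into a list before joining matters: every flush is started first and only then awaited, so the producers flush concurrently rather than one at a time.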
