Skip to content

Commit

Permalink
[refactor][client] Reusing multi-topics reader to implement TableView…
Browse files Browse the repository at this point in the history
… auto-update partition (apache#15589)

### Motivation

Currently, the TableViewImpl itself maintains the reader per partition to support auto-update partition, but the reader support muti-partition topic and auto-update partitions, we should reuse the `MultiTopicsReaderImpl` to implement the TableView.

### Modifications

Reusing `MultiTopicsReaderImpl` to implement TableView auto-update partition.
  • Loading branch information
Demogorgon314 authored May 17, 2022
1 parent 5678409 commit 08a7a3e
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 137 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,12 @@
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -182,57 +178,4 @@ public void testTableViewUpdatePartitions() throws Exception {
});
Assert.assertEquals(tv.keySet(), keys2);
}


@Test(timeOut = 30 * 1000)
// Regression test for making sure partition changes are always periodically checked even after a check returned
// exceptionally.
public void testTableViewUpdatePartitionsTriggeredDespiteExceptions() throws Exception {
String topic = "persistent://public/default/tableview-test-update-partitions-triggered-despite-exceptions";
admin.topics().createPartitionedTopic(topic, 3);
int count = 20;
Set<String> keys = this.publishMessages(topic, count, false);
PulsarClient spyPulsarClient = Mockito.spy(pulsarClient);
@Cleanup
TableView<byte[]> tv = spyPulsarClient.newTableViewBuilder(Schema.BYTES)
.topic(topic)
.autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
.create();
log.info("start tv size: {}", tv.size());
tv.forEachAndListen((k, v) -> log.info("{} -> {}", k, new String(v)));
Awaitility.await().untilAsserted(() -> {
log.info("Current tv size: {}", tv.size());
Assert.assertEquals(tv.size(), count);
});
Assert.assertEquals(tv.keySet(), keys);
tv.forEachAndListen((k, v) -> log.info("checkpoint {} -> {}", k, new String(v)));

// Let update partition check throw an exception
Mockito.doReturn(FutureUtil.failedFuture(new PulsarClientException("")))
.when(spyPulsarClient)
.getPartitionsForTopic(Mockito.any());

admin.topics().updatePartitionedTopic(topic, 4);
TopicName topicName = TopicName.get(topic);

// Make sure the get partitions callback is called; it should throw an exception
Mockito.verify(spyPulsarClient).getPartitionsForTopic(Mockito.any());

// Send more data to partition 3, which is not in the current TableView, need update partitions
Set<String> keys2 =
this.publishMessages(topicName.getPartition(3).toString(), count * 2, false);

// Wait for 10 seconds; verify that the messages haven't arrived, which would have happened if the partitions
// has been updated
TimeUnit.SECONDS.sleep(10);
Assert.assertEquals(tv.size(), count);

// Let update partition check succeed, and check the messages eventually arrives
Mockito.doCallRealMethod().when(spyPulsarClient).getPartitionsForTopic(Mockito.any());
Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
log.info("Current tv size: {}", tv.size());
Assert.assertEquals(tv.size(), count * 2);
});
Assert.assertEquals(tv.keySet(), keys2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@

package org.apache.pulsar.client.impl;

import io.netty.util.Timeout;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -34,84 +32,46 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.common.util.FutureUtil;

@Slf4j
public class TableViewImpl<T> implements TableView<T> {

private final PulsarClientImpl client;
private final Schema<T> schema;
private final TableViewConfigurationData conf;

private final ConcurrentMap<String, T> data;
private final Map<String, T> immutableData;

private final ConcurrentMap<String, Reader<T>> readers;
private final CompletableFuture<Reader<T>> reader;

private final List<BiConsumer<String, T>> listeners;
private final ReentrantLock listenersMutex;

TableViewImpl(PulsarClientImpl client, Schema<T> schema, TableViewConfigurationData conf) {
this.client = client;
this.schema = schema;
this.conf = conf;
this.data = new ConcurrentHashMap<>();
this.immutableData = Collections.unmodifiableMap(data);
this.readers = new ConcurrentHashMap<>();
this.listeners = new ArrayList<>();
this.listenersMutex = new ReentrantLock();
this.reader = client.newReader(schema)
.topic(conf.getTopicName())
.startMessageId(MessageId.earliest)
.autoUpdatePartitions(true)
.autoUpdatePartitionsInterval((int) conf.getAutoUpdatePartitionsSeconds(), TimeUnit.SECONDS)
.readCompacted(true)
.poolMessages(true)
.createAsync();
}

CompletableFuture<TableView<T>> start() {
return client.getPartitionsForTopic(conf.getTopicName())
.thenCompose(partitions -> {
Set<String> partitionsSet = new HashSet<>(partitions);
List<CompletableFuture<?>> futures = new ArrayList<>();

// Add new Partitions
partitions.forEach(partition -> {
if (!readers.containsKey(partition)) {
futures.add(newReader(partition));
}
});

// Remove partitions that are not used anymore
readers.forEach((existingPartition, existingReader) -> {
if (!partitionsSet.contains(existingPartition)) {
futures.add(existingReader.closeAsync()
.thenRun(() -> readers.remove(existingPartition, existingReader)));
}
});

return FutureUtil.waitForAll(futures)
.thenRun(() -> schedulePartitionsCheck());
}).thenApply(__ -> this);
}

private void schedulePartitionsCheck() {
client.timer()
.newTimeout(this::checkForPartitionsChanges, conf.getAutoUpdatePartitionsSeconds(), TimeUnit.SECONDS);
}

private void checkForPartitionsChanges(Timeout timeout) {
if (timeout.isCancelled()) {
return;
}

start().whenComplete((tw, ex) -> {
if (ex != null) {
log.warn("Failed to check for changes in number of partitions:", ex);
schedulePartitionsCheck();
}
});
return reader.thenCompose(this::readAllExistingMessages)
.thenApply(__ -> this);
}

@Override
Expand Down Expand Up @@ -171,11 +131,7 @@ public void forEachAndListen(BiConsumer<String, T> action) {

@Override
public CompletableFuture<Void> closeAsync() {
return FutureUtil.waitForAll(
readers.values().stream()
.map(Reader::closeAsync)
.collect(Collectors.toList())
);
return reader.thenCompose(Reader::closeAsync);
}

@Override
Expand Down Expand Up @@ -217,30 +173,6 @@ private void handleMessage(Message<T> msg) {
}
}

private CompletableFuture<Reader<T>> newReader(String partition) {
return client.newReader(schema)
.topic(partition)
.startMessageId(MessageId.earliest)
.readCompacted(true)
.poolMessages(true)
.createAsync()
.thenCompose(this::cacheNewReader)
.thenCompose(this::readAllExistingMessages);
}

private CompletableFuture<Reader<T>> cacheNewReader(Reader<T> reader) {
CompletableFuture<Reader<T>> future = new CompletableFuture<>();
if (this.readers.containsKey(reader.getTopic())) {
future.completeExceptionally(
new IllegalArgumentException("reader on partition " + reader.getTopic() + " already existed"));
} else {
this.readers.put(reader.getTopic(), reader);
future.complete(reader);
}

return future;
}

private CompletableFuture<Reader<T>> readAllExistingMessages(Reader<T> reader) {
long startTime = System.nanoTime();
AtomicLong messagesRead = new AtomicLong();
Expand Down

0 comments on commit 08a7a3e

Please sign in to comment.