Skip to content

Commit

Permalink
Replace map with set (apache#8051)
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- authored Sep 21, 2020
1 parent 5d31568 commit 01e3074
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.lang.reflect.Field;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
Expand Down Expand Up @@ -767,7 +766,7 @@ public void testCleanProducer() throws Exception {
Field prodField = PulsarClientImpl.class.getDeclaredField("producers");
prodField.setAccessible(true);
@SuppressWarnings("unchecked")
IdentityHashMap<ProducerBase<byte[]>, Boolean> producers = (IdentityHashMap<ProducerBase<byte[]>, Boolean>) prodField
Set<ProducerBase<byte[]>> producers = (Set<ProducerBase<byte[]>>) prodField
.get(pulsarClient);
assertTrue(producers.isEmpty());
pulsarClient.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import io.netty.channel.EventLoopGroup;
import io.netty.util.HashedWheelTimer;
Expand All @@ -35,10 +34,11 @@
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -102,8 +102,8 @@ public enum State {
}

private AtomicReference<State> state = new AtomicReference<>();
private final IdentityHashMap<ProducerBase<?>, Boolean> producers;
private final IdentityHashMap<ConsumerBase<?>, Boolean> consumers;
private final Set<ProducerBase<?>> producers;
private final Set<ConsumerBase<?>> consumers;

private final AtomicLong producerIdGenerator = new AtomicLong();
private final AtomicLong consumerIdGenerator = new AtomicLong();
Expand Down Expand Up @@ -151,8 +151,8 @@ public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGr
lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(), conf.isUseTls(), externalExecutorProvider.getExecutor());
}
timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS);
producers = Maps.newIdentityHashMap();
consumers = Maps.newIdentityHashMap();
producers = Collections.newSetFromMap(new ConcurrentHashMap<>());
consumers = Collections.newSetFromMap(new ConcurrentHashMap<>());

if (conf.isEnableTransaction()) {
tcClient = new TransactionCoordinatorClientImpl(this);
Expand Down Expand Up @@ -292,10 +292,8 @@ private <T> CompletableFuture<Producer<T>> createProducerAsync(String topic,
} else {
producer = new ProducerImpl<>(PulsarClientImpl.this, topic, conf, producerCreatedFuture, -1, schema, interceptors);
}

synchronized (producers) {
producers.put(producer, Boolean.TRUE);
}

producers.add(producer);
}).exceptionally(ex -> {
log.warn("[{}] Failed to get partitioned topic metadata: {}", topic, ex.getMessage());
producerCreatedFuture.completeExceptionally(ex);
Expand Down Expand Up @@ -384,10 +382,8 @@ private <T> CompletableFuture<Consumer<T>> doSingleTopicSubscribeAsync(ConsumerC
consumerSubscribedFuture,null, schema, interceptors,
true /* createTopicIfDoesNotExist */);
}

synchronized (consumers) {
consumers.put(consumer, Boolean.TRUE);
}

consumers.add(consumer);
}).exceptionally(ex -> {
log.warn("[{}] Failed to get partitioned topic metadata", topic, ex);
consumerSubscribedFuture.completeExceptionally(ex);
Expand All @@ -403,10 +399,8 @@ private <T> CompletableFuture<Consumer<T>> multiTopicSubscribeAsync(ConsumerConf
ConsumerBase<T> consumer = new MultiTopicsConsumerImpl<>(PulsarClientImpl.this, conf,
externalExecutorProvider.getExecutor(), consumerSubscribedFuture, schema, interceptors,
true /* createTopicIfDoesNotExist */);

synchronized (consumers) {
consumers.put(consumer, Boolean.TRUE);
}

consumers.add(consumer);

return consumerSubscribedFuture;
}
Expand Down Expand Up @@ -439,10 +433,8 @@ private <T> CompletableFuture<Consumer<T>> patternTopicSubscribeAsync(ConsumerCo
externalExecutorProvider.getExecutor(),
consumerSubscribedFuture,
schema, subscriptionMode, interceptors);

synchronized (consumers) {
consumers.put(consumer, Boolean.TRUE);
}

consumers.add(consumer);
})
.exceptionally(ex -> {
log.warn("[{}] Failed to get topics under namespace", namespaceName);
Expand Down Expand Up @@ -513,10 +505,8 @@ <T> CompletableFuture<Reader<T>> doCreateReaderAsync(ReaderConfigurationData<T>
// gets the next single threaded executor from the list of executors
ExecutorService listenerThread = externalExecutorProvider.getExecutor();
ReaderImpl<T> reader = new ReaderImpl<>(PulsarClientImpl.this, conf, listenerThread, consumerSubscribedFuture, schema);

synchronized (consumers) {
consumers.put(reader.getConsumer(), Boolean.TRUE);
}

consumers.add(reader.getConsumer());

consumerSubscribedFuture.thenRun(() -> {
readerFuture.complete(reader);
Expand Down Expand Up @@ -569,18 +559,9 @@ public CompletableFuture<Void> closeAsync() {

final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
List<CompletableFuture<Void>> futures = Lists.newArrayList();

synchronized (producers) {
// Copy to a new list, because the closing will trigger a removal from the map
// and invalidate the iterator
List<ProducerBase<?>> producersToClose = Lists.newArrayList(producers.keySet());
producersToClose.forEach(p -> futures.add(p.closeAsync()));
}

synchronized (consumers) {
List<ConsumerBase<?>> consumersToClose = Lists.newArrayList(consumers.keySet());
consumersToClose.forEach(c -> futures.add(c.closeAsync()));
}

producers.forEach(p -> futures.add(p.closeAsync()));
consumers.forEach(c -> futures.add(c.closeAsync()));

// Need to run the shutdown sequence in a separate thread to prevent deadlocks
// If there are consumers or producers that need to be shutdown we cannot use the same thread
Expand Down

0 comments on commit 01e3074

Please sign in to comment.