Skip to content

Commit

Permalink
Add PulsarClient#isClosed method (apache#8428)
Browse files Browse the repository at this point in the history
Currently there is no way to know if the Pulsar client has been already closed or not, resulting in AlreadyClosedException errors.
  • Loading branch information
eolivelli authored Nov 6, 2020
1 parent 3ff3b31 commit d7bac05
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -282,4 +282,13 @@ static ClientBuilder builder() {
* if the forceful shutdown fails
*/
void shutdown() throws PulsarClientException;

/**
* Return internal state of the client. Useful if you want to check that current client is valid.
* @return true is the client has been closed
* @see #shutdown()
* @see #close()
* @see #closeAsync()
*/
boolean isClosed();
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -295,7 +293,7 @@ private <T> CompletableFuture<Producer<T>> createProducerAsync(String topic,
} else {
producer = new ProducerImpl<>(PulsarClientImpl.this, topic, conf, producerCreatedFuture, -1, schema, interceptors);
}

producers.add(producer);
}).exceptionally(ex -> {
log.warn("[{}] Failed to get partitioned topic metadata: {}", topic, ex.getMessage());
Expand Down Expand Up @@ -385,7 +383,7 @@ private <T> CompletableFuture<Consumer<T>> doSingleTopicSubscribeAsync(ConsumerC
consumerSubscribedFuture,null, schema, interceptors,
true /* createTopicIfDoesNotExist */);
}

consumers.add(consumer);
}).exceptionally(ex -> {
log.warn("[{}] Failed to get partitioned topic metadata", topic, ex);
Expand All @@ -402,7 +400,7 @@ private <T> CompletableFuture<Consumer<T>> multiTopicSubscribeAsync(ConsumerConf
ConsumerBase<T> consumer = new MultiTopicsConsumerImpl<>(PulsarClientImpl.this, conf,
externalExecutorProvider.getExecutor(), consumerSubscribedFuture, schema, interceptors,
true /* createTopicIfDoesNotExist */);

consumers.add(consumer);

return consumerSubscribedFuture;
Expand Down Expand Up @@ -436,7 +434,7 @@ private <T> CompletableFuture<Consumer<T>> patternTopicSubscribeAsync(ConsumerCo
externalExecutorProvider.getExecutor(),
consumerSubscribedFuture,
schema, subscriptionMode, interceptors);

consumers.add(consumer);
})
.exceptionally(ex -> {
Expand Down Expand Up @@ -508,7 +506,7 @@ <T> CompletableFuture<Reader<T>> doCreateReaderAsync(ReaderConfigurationData<T>
// gets the next single threaded executor from the list of executors
ExecutorService listenerThread = externalExecutorProvider.getExecutor();
ReaderImpl<T> reader = new ReaderImpl<>(PulsarClientImpl.this, conf, listenerThread, consumerSubscribedFuture, schema);

consumers.add(reader.getConsumer());

consumerSubscribedFuture.thenRun(() -> {
Expand Down Expand Up @@ -548,8 +546,16 @@ public CompletableFuture<Optional<SchemaInfo>> getSchema(String topic) {
public void close() throws PulsarClientException {
try {
closeAsync().get();
} catch (Exception e) {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw PulsarClientException.unwrap(e);
} catch (ExecutionException e) {
PulsarClientException unwrapped = PulsarClientException.unwrap(e);
if (unwrapped instanceof PulsarClientException.AlreadyClosedException) {
// this is not a problem
return;
}
throw unwrapped;
}
}

Expand All @@ -562,7 +568,7 @@ public CompletableFuture<Void> closeAsync() {

final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
List<CompletableFuture<Void>> futures = Lists.newArrayList();

producers.forEach(p -> futures.add(p.closeAsync()));
consumers.forEach(c -> futures.add(c.closeAsync()));

Expand Down Expand Up @@ -602,6 +608,12 @@ public void shutdown() throws PulsarClientException {
}
}

@Override
public boolean isClosed() {
State currentState = state.get();
return currentState == State.Closed || currentState == State.Closing;
}

@Override
public synchronized void updateServiceUrl(String serviceUrl) throws PulsarClientException {
log.info("Updating service URL to {}", serviceUrl);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.ThreadFactory;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.testng.annotations.Test;

/**
* PulsarClientImpl unit tests.
*/
public class PulsarClientImplTest {

@Test
public void testIsClosed() throws Exception {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl("pulsar://localhost:6650");
ThreadFactory threadFactory = new DefaultThreadFactory("client-test-stats", Thread.currentThread().isDaemon());
EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
PulsarClientImpl clientImpl = new PulsarClientImpl(conf, eventLoopGroup);
assertFalse(clientImpl.isClosed());
clientImpl.close();
assertTrue(clientImpl.isClosed());
eventLoopGroup.shutdownGracefully().get();
}

}

0 comments on commit d7bac05

Please sign in to comment.