From 9251a4488c10c47e307fa0f1ebf7cb8ee3c2186a Mon Sep 17 00:00:00 2001 From: Jia Zhai Date: Tue, 13 Mar 2018 23:03:28 -0700 Subject: [PATCH] Issue 1118: refine HandlerBase to let only ConsumerImpl and ProducerImpl have client-cnx (#1354) * refine handlerBase * change following @sijie's comments --- .../impl/BrokerClientIntegrationTest.java | 2 +- .../pulsar/client/impl/MessageIdTest.java | 7 +- ...andlerBase.java => ConnectionHandler.java} | 129 +++++++----------- .../pulsar/client/impl/ConsumerBase.java | 9 +- .../pulsar/client/impl/ConsumerImpl.java | 47 ++++++- .../pulsar/client/impl/HandlerState.java | 69 ++++++++++ .../client/impl/PartitionedConsumerImpl.java | 14 +- .../client/impl/PartitionedProducerImpl.java | 10 -- .../pulsar/client/impl/ProducerBase.java | 5 +- .../pulsar/client/impl/ProducerImpl.java | 56 ++++++-- .../client/impl/TopicsConsumerImpl.java | 14 +- 11 files changed, 217 insertions(+), 145 deletions(-) rename pulsar-client/src/main/java/org/apache/pulsar/client/impl/{HandlerBase.java => ConnectionHandler.java} (54%) create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java index d7d0d73b3a382..c5447b199b072 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java @@ -59,7 +59,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.client.impl.HandlerBase.State; +import org.apache.pulsar.client.impl.HandlerState.State; import org.apache.pulsar.common.api.PulsarHandler; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java index 323000622093a..4a9912dce107a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java @@ -454,13 +454,8 @@ public void testCorruptMessageRemove() throws Exception { } // 5. Verify - - // (5.1) Verify: producer's recoverChecksumError and updateChecksum invoked - verify(producer, times(1)).recoverChecksumError(any(), anyLong()); - verify(producer, times(1)).verifyLocalBufferIsNotCorrupted(any()); - /** - * (5.3) verify: ProducerImpl.verifyLocalBufferIsNotCorrupted() => validates if message is corrupt + * verify: ProducerImpl.verifyLocalBufferIsNotCorrupted() => validates if message is corrupt */ MessageImpl msg2 = (MessageImpl) MessageBuilder.create().setContent("message-1".getBytes()) .build(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java similarity index 54% rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerBase.java rename to pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java index 867424e39b2bf..d04e8a847a2a4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java @@ -20,72 +20,61 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import java.util.function.UnaryOperator; - import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.HandlerState.State; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -abstract class HandlerBase { - protected final PulsarClientImpl client; - protected final String topic; - private static final AtomicReferenceFieldUpdater STATE_UPDATER = - AtomicReferenceFieldUpdater.newUpdater(HandlerBase.class, State.class, "state"); - @SuppressWarnings("unused") - private volatile State state = null; - - private static final AtomicReferenceFieldUpdater CLIENT_CNX_UPDATER = - AtomicReferenceFieldUpdater.newUpdater(HandlerBase.class, ClientCnx.class, "clientCnx"); +class ConnectionHandler { + private static final AtomicReferenceFieldUpdater CLIENT_CNX_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(ConnectionHandler.class, ClientCnx.class, "clientCnx"); @SuppressWarnings("unused") private volatile ClientCnx clientCnx = null; + + protected final HandlerState state; protected final Backoff backoff; - enum State { - Uninitialized, // Not initialized - Connecting, // Client connecting to broker - Ready, // Handler is being used - Closing, // Close cmd has been sent to broker - Closed, // Broker acked the close - Terminated, // Topic associated with this handler - // has been terminated - Failed // Handler is failed - }; - - public HandlerBase(PulsarClientImpl client, String topic, Backoff backoff) { - this.client = client; - this.topic = topic; + interface Connection { + void connectionFailed(PulsarClientException exception); + void connectionOpened(ClientCnx cnx); + } + + protected Connection connection; + + protected ConnectionHandler(HandlerState state, Backoff backoff, Connection connection) { + this.state = state; + this.connection = connection; this.backoff = backoff; - STATE_UPDATER.set(this, State.Uninitialized); CLIENT_CNX_UPDATER.set(this, null); } protected void grabCnx() { if (CLIENT_CNX_UPDATER.get(this) != null) { - log.warn("[{}] [{}] Client cnx already set, ignoring reconnection request", topic, getHandlerName()); + log.warn("[{}] [{}] Client cnx already set, ignoring reconnection request", state.topic, state.getHandlerName()); return; } if (!isValidStateForReconnection()) { // Ignore connection closed when we are shutting down - log.info("[{}] [{}] Ignoring reconnection request (state: {})", topic, getHandlerName(), STATE_UPDATER.get(this)); + log.info("[{}] [{}] Ignoring reconnection request (state: {})", state.topic, state.getHandlerName(), state.getState()); return; } try { - client.getConnection(topic) // - .thenAccept(this::connectionOpened) // + state.client.getConnection(state.topic) // + .thenAccept(cnx -> connection.connectionOpened(cnx)) // .exceptionally(this::handleConnectionError); } catch (Throwable t) { - log.warn("[{}] [{}] Exception thrown while getting connection: ", topic, getHandlerName(), t); + log.warn("[{}] [{}] Exception thrown while getting connection: ", state.topic, state.getHandlerName(), t); reconnectLater(t); } } private Void handleConnectionError(Throwable exception) { - log.warn("[{}] [{}] Error connecting to broker: {}", topic, getHandlerName(), exception.getMessage()); - connectionFailed(new PulsarClientException(exception)); + log.warn("[{}] [{}] Error connecting to broker: {}", state.topic, state.getHandlerName(), exception.getMessage()); + connection.connectionFailed(new PulsarClientException(exception)); - State state = STATE_UPDATER.get(this); + State state = this.state.getState(); if (state == State.Uninitialized || state == State.Connecting || state == State.Ready) { reconnectLater(exception); } @@ -96,15 +85,15 @@ private Void handleConnectionError(Throwable exception) { protected void reconnectLater(Throwable exception) { CLIENT_CNX_UPDATER.set(this, null); if (!isValidStateForReconnection()) { - log.info("[{}] [{}] Ignoring reconnection request (state: {})", topic, getHandlerName(), STATE_UPDATER.get(this)); + log.info("[{}] [{}] Ignoring reconnection request (state: {})", state.topic, state.getHandlerName(), state.getState()); return; } long delayMs = backoff.next(); - log.warn("[{}] [{}] Could not get connection to broker: {} -- Will try again in {} s", topic, getHandlerName(), + log.warn("[{}] [{}] Could not get connection to broker: {} -- Will try again in {} s", state.topic, state.getHandlerName(), exception.getMessage(), delayMs / 1000.0); - STATE_UPDATER.set(this, State.Connecting); - client.timer().newTimeout(timeout -> { - log.info("[{}] [{}] Reconnecting after connection was closed", topic, getHandlerName()); + state.setState(State.Connecting); + state.client.timer().newTimeout(timeout -> { + log.info("[{}] [{}] Reconnecting after connection was closed", state.topic, state.getHandlerName()); grabCnx(); }, delayMs, TimeUnit.MILLISECONDS); } @@ -112,15 +101,15 @@ protected void reconnectLater(Throwable exception) { protected void connectionClosed(ClientCnx cnx) { if (CLIENT_CNX_UPDATER.compareAndSet(this, cnx, null)) { if (!isValidStateForReconnection()) { - log.info("[{}] [{}] Ignoring reconnection request (state: {})", topic, getHandlerName(), STATE_UPDATER.get(this)); + log.info("[{}] [{}] Ignoring reconnection request (state: {})", state.topic, state.getHandlerName(), state.getState()); return; } long delayMs = backoff.next(); - STATE_UPDATER.set(this, State.Connecting); - log.info("[{}] [{}] Closed connection {} -- Will try again in {} s", topic, getHandlerName(), cnx.channel(), + state.setState(State.Connecting); + log.info("[{}] [{}] Closed connection {} -- Will try again in {} s", state.topic, state.getHandlerName(), cnx.channel(), delayMs / 1000.0); - client.timer().newTimeout(timeout -> { - log.info("[{}] [{}] Reconnecting after timeout", topic, getHandlerName()); + state.client.timer().newTimeout(timeout -> { + log.info("[{}] [{}] Reconnecting after timeout", state.topic, state.getHandlerName()); grabCnx(); }, delayMs, TimeUnit.MILLISECONDS); } @@ -138,24 +127,6 @@ protected boolean isRetriableError(PulsarClientException e) { return e instanceof PulsarClientException.LookupException; } - // moves the state to ready if it wasn't closed - protected boolean changeToReadyState() { - return (STATE_UPDATER.compareAndSet(this, State.Uninitialized, State.Ready) - || STATE_UPDATER.compareAndSet(this, State.Connecting, State.Ready)); - } - - protected State getState() { - return STATE_UPDATER.get(this); - } - - protected void setState(State s) { - STATE_UPDATER.set(this, s); - } - - protected State getAndUpdateState(final UnaryOperator updater) { - return STATE_UPDATER.getAndUpdate(this, updater); - } - protected ClientCnx getClientCnx() { return CLIENT_CNX_UPDATER.get(this); } @@ -165,28 +136,22 @@ protected void setClientCnx(ClientCnx clientCnx) { } private boolean isValidStateForReconnection() { - State state = STATE_UPDATER.get(this); + State state = this.state.getState(); switch (state) { - case Uninitialized: - case Connecting: - case Ready: - // Ok - return true; - - case Closing: - case Closed: - case Failed: - case Terminated: - return false; + case Uninitialized: + case Connecting: + case Ready: + // Ok + return true; + + case Closing: + case Closed: + case Failed: + case Terminated: + return false; } return false; } - abstract void connectionFailed(PulsarClientException exception); - - abstract void connectionOpened(ClientCnx cnx); - - abstract String getHandlerName(); - - private static final Logger log = LoggerFactory.getLogger(HandlerBase.class); + private static final Logger log = LoggerFactory.getLogger(ConnectionHandler.class); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index f51b1b3dba0c6..cc718f3d5fc15 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -43,7 +43,7 @@ import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue; -public abstract class ConsumerBase extends HandlerBase implements Consumer { +public abstract class ConsumerBase extends HandlerState implements Consumer { enum ConsumerType { PARTITIONED, NON_PARTITIONED @@ -61,9 +61,10 @@ enum ConsumerType { protected int maxReceiverQueueSize; protected Schema schema; - protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurationData conf, int receiverQueueSize, - ExecutorService listenerExecutor, CompletableFuture> subscribeFuture, Schema schema) { - super(client, topic, new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS)); + protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurationData conf, + int receiverQueueSize, ExecutorService listenerExecutor, + CompletableFuture> subscribeFuture, Schema schema) { + super(client, topic); this.maxReceiverQueueSize = receiverQueueSize; this.subscription = conf.getSubscriptionName(); this.conf = conf; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index ab812a0b396cb..a76529dcf001e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -79,7 +79,7 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; -public class ConsumerImpl extends ConsumerBase { +public class ConsumerImpl extends ConsumerBase implements ConnectionHandler.Connection { private static final int MAX_REDELIVER_UNACKNOWLEDGED = 1000; private final long consumerId; @@ -123,6 +123,8 @@ public class ConsumerImpl extends ConsumerBase { private final boolean readCompacted; + private final ConnectionHandler connectionHandler; + enum SubscriptionMode { // Make the subscription to be backed by a durable cursor that will retain messages and persist the current // position @@ -183,9 +185,17 @@ enum SubscriptionMode { metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties())); } + this.connectionHandler = new ConnectionHandler(this, + new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS), + this); + grabCnx(); } + public ConnectionHandler getConnectionHandler() { + return connectionHandler; + } + public UnAckedMessageTracker getUnAckedMessageTracker() { return unAckedMessageTracker; } @@ -533,7 +543,7 @@ public void operationComplete(Future future) throws Exception { } @Override - void connectionOpened(final ClientCnx cnx) { + public void connectionOpened(final ClientCnx cnx) { setClientCnx(cnx); cnx.registerConsumer(consumerId, this); @@ -612,7 +622,7 @@ void connectionOpened(final ClientCnx cnx) { return null; } log.warn("[{}][{}] Failed to subscribe to topic on {}", topic, subscription, cnx.channel().remoteAddress()); - if (e.getCause() instanceof PulsarClientException && isRetriableError((PulsarClientException) e.getCause()) + if (e.getCause() instanceof PulsarClientException && getConnectionHandler().isRetriableError((PulsarClientException) e.getCause()) && System.currentTimeMillis() < subscribeTimeout) { reconnectLater(e.getCause()); return null; @@ -677,7 +687,7 @@ void sendFlowPermitsToBroker(ClientCnx cnx, int numMessages) { } @Override - void connectionFailed(PulsarClientException exception) { + public void connectionFailed(PulsarClientException exception) { if (System.currentTimeMillis() > subscribeTimeout && subscribeFuture.completeExceptionally(exception)) { setState(State.Failed); log.info("[{}] Consumer creation failed for consumer {}", topic, consumerId); @@ -1431,6 +1441,35 @@ public int hashCode() { return Objects.hash(topic, subscription, consumerName); } + // wrapper for connection methods + ClientCnx cnx() { + return this.connectionHandler.cnx(); + } + + void resetBackoff() { + this.connectionHandler.resetBackoff(); + } + + void connectionClosed(ClientCnx cnx) { + this.connectionHandler.connectionClosed(cnx); + } + + ClientCnx getClientCnx() { + return this.connectionHandler.getClientCnx(); + } + + void setClientCnx(ClientCnx clientCnx) { + this.connectionHandler.setClientCnx(clientCnx); + } + + void reconnectLater(Throwable exception) { + this.connectionHandler.reconnectLater(exception); + } + + void grabCnx() { + this.connectionHandler.grabCnx(); + } + private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java new file mode 100644 index 0000000000000..618958305ad31 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.UnaryOperator; + +abstract class HandlerState { + protected final PulsarClientImpl client; + protected final String topic; + + private static final AtomicReferenceFieldUpdater STATE_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(HandlerState.class, State.class, "state"); + @SuppressWarnings("unused") + private volatile State state = null; + + enum State { + Uninitialized, // Not initialized + Connecting, // Client connecting to broker + Ready, // Handler is being used + Closing, // Close cmd has been sent to broker + Closed, // Broker acked the close + Terminated, // Topic associated with this handler + // has been terminated + Failed // Handler is failed + }; + + public HandlerState(PulsarClientImpl client, String topic) { + this.client = client; + this.topic = topic; + STATE_UPDATER.set(this, State.Uninitialized); + } + + // moves the state to ready if it wasn't closed + protected boolean changeToReadyState() { + return (STATE_UPDATER.compareAndSet(this, State.Uninitialized, State.Ready) + || STATE_UPDATER.compareAndSet(this, State.Connecting, State.Ready)); + } + + protected State getState() { + return STATE_UPDATER.get(this); + } + + protected void setState(State s) { + STATE_UPDATER.set(this, s); + } + + abstract String getHandlerName(); + + protected State getAndUpdateState(final UnaryOperator updater) { + return STATE_UPDATER.getAndUpdate(this, updater); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java index f46c2b8b4967d..2d8ad4ed8b2c7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java @@ -130,7 +130,7 @@ private void start() { private void starReceivingMessages() throws PulsarClientException { for (ConsumerImpl consumer : consumers) { - consumer.sendFlowPermitsToBroker(consumer.cnx(), conf.getReceiverQueueSize()); + consumer.sendFlowPermitsToBroker(consumer.getConnectionHandler().cnx(), conf.getReceiverQueueSize()); receiveMessageFromConsumer(consumer); } } @@ -365,18 +365,6 @@ public boolean isConnected() { return consumers.stream().allMatch(ConsumerImpl::isConnected); } - @Override - void connectionFailed(PulsarClientException exception) { - // noop - - } - - @Override - void connectionOpened(ClientCnx cnx) { - // noop - - } - void messageReceived(Message message) { lock.writeLock().lock(); try { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java index 23a86e612a2c6..494fbce275121 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java @@ -217,16 +217,6 @@ public synchronized ProducerStatsRecorderImpl getStats() { private static final Logger log = LoggerFactory.getLogger(PartitionedProducerImpl.class); - @Override - void connectionFailed(PulsarClientException exception) { - // noop - } - - @Override - void connectionOpened(ClientCnx cnx) { - // noop - } - @Override String getHandlerName() { return "partition-producer"; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java index 14a9af0c8d03b..45453aedb6b51 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java @@ -30,7 +30,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; -public abstract class ProducerBase extends HandlerBase implements Producer { +public abstract class ProducerBase extends HandlerState implements Producer { protected final CompletableFuture> producerCreatedFuture; protected final ProducerConfigurationData conf; @@ -38,8 +38,7 @@ public abstract class ProducerBase extends HandlerBase implements Producer protected ProducerBase(PulsarClientImpl client, String topic, ProducerConfigurationData conf, CompletableFuture> producerCreatedFuture, Schema schema) { - super(client, topic, new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, - Math.max(100, conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS)); + super(client, topic); this.producerCreatedFuture = producerCreatedFuture; this.conf = conf; this.schema = schema; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 8f0586bbd84bb..75661cbc9c088 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -69,7 +69,7 @@ import io.netty.util.TimerTask; import io.netty.util.concurrent.ScheduledFuture; -public class ProducerImpl extends ProducerBase implements TimerTask { +public class ProducerImpl extends ProducerBase implements TimerTask, ConnectionHandler.Connection { // Producer id, used to identify a producer within a single connection private final long producerId; @@ -104,6 +104,8 @@ public class ProducerImpl extends ProducerBase implements TimerTask { private final Map metadata; + private final ConnectionHandler connectionHandler; + @SuppressWarnings("rawtypes") private static final AtomicLongFieldUpdater msgIdGeneratorUpdater = AtomicLongFieldUpdater .newUpdater(ProducerImpl.class, "msgIdGenerator"); @@ -172,9 +174,16 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties())); } + this.connectionHandler = new ConnectionHandler(this, + new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, Math.max(100, conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS), + this); grabCnx(); } + public ConnectionHandler getConnectionHandler() { + return connectionHandler; + } + private boolean isBatchMessagingEnabled() { return conf.isBatchingEnabled(); } @@ -379,8 +388,8 @@ private ByteBufPair sendMessage(long producerId, long sequenceId, int numMessage ByteBuf compressedPayload) throws IOException { ChecksumType checksumType; - if (getClientCnx() == null - || getClientCnx().getRemoteEndpointProtocolVersion() >= brokerChecksumSupportedVersion()) { + if (connectionHandler.getClientCnx() == null + || connectionHandler.getClientCnx().getRemoteEndpointProtocolVersion() >= brokerChecksumSupportedVersion()) { checksumType = ChecksumType.Crc32c; } else { checksumType = ChecksumType.None; @@ -568,11 +577,11 @@ public CompletableFuture closeAsync() { @Override public boolean isConnected() { - return getClientCnx() != null && (getState() == State.Ready); + return connectionHandler.getClientCnx() != null && (getState() == State.Ready); } public boolean isWritable() { - ClientCnx cnx = getClientCnx(); + ClientCnx cnx = connectionHandler.getClientCnx(); return cnx != null && cnx.channel().isWritable(); } @@ -808,10 +817,10 @@ protected OpSendMsg newObject(Handle handle) { } @Override - void connectionOpened(final ClientCnx cnx) { + public void connectionOpened(final ClientCnx cnx) { // we set the cnx reference before registering the producer on the cnx, so if the cnx breaks before creating the // producer, it will try to grab a new cnx - setClientCnx(cnx); + connectionHandler.setClientCnx(cnx); cnx.registerProducer(producerId, this); log.info("[{}] [{}] Creating producer on cnx {}", topic, producerName, cnx.ctx().channel()); @@ -892,7 +901,7 @@ void connectionOpened(final ClientCnx cnx) { producerCreatedFuture.completeExceptionally(cause); client.cleanupProducer(this); } else if (producerCreatedFuture.isDone() || // - (cause instanceof PulsarClientException && isRetriableError((PulsarClientException) cause) + (cause instanceof PulsarClientException && connectionHandler.isRetriableError((PulsarClientException) cause) && System.currentTimeMillis() < createProducerTimeout)) { // Either we had already created the producer once (producerCreatedFuture.isDone()) or we are // still within the initial timeout budget and we are dealing with a retriable error @@ -908,7 +917,7 @@ void connectionOpened(final ClientCnx cnx) { } @Override - void connectionFailed(PulsarClientException exception) { + public void connectionFailed(PulsarClientException exception) { if (System.currentTimeMillis() > createProducerTimeout && producerCreatedFuture.completeExceptionally(exception)) { log.info("[{}] Producer creation failed for producer {}", topic, producerId); @@ -1248,5 +1257,34 @@ public String getProducerName() { return producerName; } + // wrapper for connection methods + ClientCnx cnx() { + return this.connectionHandler.cnx(); + } + + void resetBackoff() { + this.connectionHandler.resetBackoff(); + } + + void connectionClosed(ClientCnx cnx) { + this.connectionHandler.connectionClosed(cnx); + } + + ClientCnx getClientCnx() { + return this.connectionHandler.getClientCnx(); + } + + void setClientCnx(ClientCnx clientCnx) { + this.connectionHandler.setClientCnx(clientCnx); + } + + void reconnectLater(Throwable exception) { + this.connectionHandler.reconnectLater(exception); + } + + void grabCnx() { + this.connectionHandler.grabCnx(); + } + private static final Logger log = LoggerFactory.getLogger(ProducerImpl.class); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java index e8c018cdfb04f..e39de96c1d236 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java @@ -189,7 +189,7 @@ private void startReceivingMessages(List> newConsumers) throws P } if (getState() == State.Ready) { newConsumers.forEach(consumer -> { - consumer.sendFlowPermitsToBroker(consumer.cnx(), conf.getReceiverQueueSize()); + consumer.sendFlowPermitsToBroker(consumer.getConnectionHandler().cnx(), conf.getReceiverQueueSize()); receiveMessageFromConsumer(consumer); }); } @@ -469,18 +469,6 @@ public boolean isConnected() { return consumers.values().stream().allMatch(consumer -> consumer.isConnected()); } - @Override - void connectionFailed(PulsarClientException exception) { - // noop - - } - - @Override - void connectionOpened(ClientCnx cnx) { - // noop - - } - @Override String getHandlerName() { return subscription;