Skip to content

Commit

Permalink
Issue 1118: refine HandlerBase to let only ConsumerImpl and ProducerI…
Browse files Browse the repository at this point in the history
…mpl have client-cnx (apache#1354)

* refine handlerBase

* change following @sijie's comments
  • Loading branch information
zhaijack authored and merlimat committed Mar 14, 2018
1 parent 381ccc0 commit 9251a44
Show file tree
Hide file tree
Showing 11 changed files with 217 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.HandlerBase.State;
import org.apache.pulsar.client.impl.HandlerState.State;
import org.apache.pulsar.common.api.PulsarHandler;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,13 +454,8 @@ public void testCorruptMessageRemove() throws Exception {
}

// 5. Verify

// (5.1) Verify: producer's recoverChecksumError and updateChecksum invoked
verify(producer, times(1)).recoverChecksumError(any(), anyLong());
verify(producer, times(1)).verifyLocalBufferIsNotCorrupted(any());

/**
* (5.3) verify: ProducerImpl.verifyLocalBufferIsNotCorrupted() => validates if message is corrupt
* verify: ProducerImpl.verifyLocalBufferIsNotCorrupted() => validates if message is corrupt
*/
MessageImpl<byte[]> msg2 = (MessageImpl<byte[]>) MessageBuilder.create().setContent("message-1".getBytes())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,72 +20,61 @@

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.UnaryOperator;

import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.HandlerState.State;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class HandlerBase {
protected final PulsarClientImpl client;
protected final String topic;
private static final AtomicReferenceFieldUpdater<HandlerBase, State> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(HandlerBase.class, State.class, "state");
@SuppressWarnings("unused")
private volatile State state = null;

private static final AtomicReferenceFieldUpdater<HandlerBase, ClientCnx> CLIENT_CNX_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(HandlerBase.class, ClientCnx.class, "clientCnx");
class ConnectionHandler {
private static final AtomicReferenceFieldUpdater<ConnectionHandler, ClientCnx> CLIENT_CNX_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(ConnectionHandler.class, ClientCnx.class, "clientCnx");
@SuppressWarnings("unused")
private volatile ClientCnx clientCnx = null;

protected final HandlerState state;
protected final Backoff backoff;

enum State {
Uninitialized, // Not initialized
Connecting, // Client connecting to broker
Ready, // Handler is being used
Closing, // Close cmd has been sent to broker
Closed, // Broker acked the close
Terminated, // Topic associated with this handler
// has been terminated
Failed // Handler is failed
};

public HandlerBase(PulsarClientImpl client, String topic, Backoff backoff) {
this.client = client;
this.topic = topic;
interface Connection {
void connectionFailed(PulsarClientException exception);
void connectionOpened(ClientCnx cnx);
}

protected Connection connection;

protected ConnectionHandler(HandlerState state, Backoff backoff, Connection connection) {
this.state = state;
this.connection = connection;
this.backoff = backoff;
STATE_UPDATER.set(this, State.Uninitialized);
CLIENT_CNX_UPDATER.set(this, null);
}

protected void grabCnx() {
if (CLIENT_CNX_UPDATER.get(this) != null) {
log.warn("[{}] [{}] Client cnx already set, ignoring reconnection request", topic, getHandlerName());
log.warn("[{}] [{}] Client cnx already set, ignoring reconnection request", state.topic, state.getHandlerName());
return;
}

if (!isValidStateForReconnection()) {
// Ignore connection closed when we are shutting down
log.info("[{}] [{}] Ignoring reconnection request (state: {})", topic, getHandlerName(), STATE_UPDATER.get(this));
log.info("[{}] [{}] Ignoring reconnection request (state: {})", state.topic, state.getHandlerName(), state.getState());
return;
}

try {
client.getConnection(topic) //
.thenAccept(this::connectionOpened) //
state.client.getConnection(state.topic) //
.thenAccept(cnx -> connection.connectionOpened(cnx)) //
.exceptionally(this::handleConnectionError);
} catch (Throwable t) {
log.warn("[{}] [{}] Exception thrown while getting connection: ", topic, getHandlerName(), t);
log.warn("[{}] [{}] Exception thrown while getting connection: ", state.topic, state.getHandlerName(), t);
reconnectLater(t);
}
}

private Void handleConnectionError(Throwable exception) {
log.warn("[{}] [{}] Error connecting to broker: {}", topic, getHandlerName(), exception.getMessage());
connectionFailed(new PulsarClientException(exception));
log.warn("[{}] [{}] Error connecting to broker: {}", state.topic, state.getHandlerName(), exception.getMessage());
connection.connectionFailed(new PulsarClientException(exception));

State state = STATE_UPDATER.get(this);
State state = this.state.getState();
if (state == State.Uninitialized || state == State.Connecting || state == State.Ready) {
reconnectLater(exception);
}
Expand All @@ -96,31 +85,31 @@ private Void handleConnectionError(Throwable exception) {
protected void reconnectLater(Throwable exception) {
CLIENT_CNX_UPDATER.set(this, null);
if (!isValidStateForReconnection()) {
log.info("[{}] [{}] Ignoring reconnection request (state: {})", topic, getHandlerName(), STATE_UPDATER.get(this));
log.info("[{}] [{}] Ignoring reconnection request (state: {})", state.topic, state.getHandlerName(), state.getState());
return;
}
long delayMs = backoff.next();
log.warn("[{}] [{}] Could not get connection to broker: {} -- Will try again in {} s", topic, getHandlerName(),
log.warn("[{}] [{}] Could not get connection to broker: {} -- Will try again in {} s", state.topic, state.getHandlerName(),
exception.getMessage(), delayMs / 1000.0);
STATE_UPDATER.set(this, State.Connecting);
client.timer().newTimeout(timeout -> {
log.info("[{}] [{}] Reconnecting after connection was closed", topic, getHandlerName());
state.setState(State.Connecting);
state.client.timer().newTimeout(timeout -> {
log.info("[{}] [{}] Reconnecting after connection was closed", state.topic, state.getHandlerName());
grabCnx();
}, delayMs, TimeUnit.MILLISECONDS);
}

protected void connectionClosed(ClientCnx cnx) {
if (CLIENT_CNX_UPDATER.compareAndSet(this, cnx, null)) {
if (!isValidStateForReconnection()) {
log.info("[{}] [{}] Ignoring reconnection request (state: {})", topic, getHandlerName(), STATE_UPDATER.get(this));
log.info("[{}] [{}] Ignoring reconnection request (state: {})", state.topic, state.getHandlerName(), state.getState());
return;
}
long delayMs = backoff.next();
STATE_UPDATER.set(this, State.Connecting);
log.info("[{}] [{}] Closed connection {} -- Will try again in {} s", topic, getHandlerName(), cnx.channel(),
state.setState(State.Connecting);
log.info("[{}] [{}] Closed connection {} -- Will try again in {} s", state.topic, state.getHandlerName(), cnx.channel(),
delayMs / 1000.0);
client.timer().newTimeout(timeout -> {
log.info("[{}] [{}] Reconnecting after timeout", topic, getHandlerName());
state.client.timer().newTimeout(timeout -> {
log.info("[{}] [{}] Reconnecting after timeout", state.topic, state.getHandlerName());
grabCnx();
}, delayMs, TimeUnit.MILLISECONDS);
}
Expand All @@ -138,24 +127,6 @@ protected boolean isRetriableError(PulsarClientException e) {
return e instanceof PulsarClientException.LookupException;
}

// moves the state to ready if it wasn't closed
protected boolean changeToReadyState() {
return (STATE_UPDATER.compareAndSet(this, State.Uninitialized, State.Ready)
|| STATE_UPDATER.compareAndSet(this, State.Connecting, State.Ready));
}

protected State getState() {
return STATE_UPDATER.get(this);
}

protected void setState(State s) {
STATE_UPDATER.set(this, s);
}

protected State getAndUpdateState(final UnaryOperator<State> updater) {
return STATE_UPDATER.getAndUpdate(this, updater);
}

protected ClientCnx getClientCnx() {
return CLIENT_CNX_UPDATER.get(this);
}
Expand All @@ -165,28 +136,22 @@ protected void setClientCnx(ClientCnx clientCnx) {
}

private boolean isValidStateForReconnection() {
State state = STATE_UPDATER.get(this);
State state = this.state.getState();
switch (state) {
case Uninitialized:
case Connecting:
case Ready:
// Ok
return true;

case Closing:
case Closed:
case Failed:
case Terminated:
return false;
case Uninitialized:
case Connecting:
case Ready:
// Ok
return true;

case Closing:
case Closed:
case Failed:
case Terminated:
return false;
}
return false;
}

abstract void connectionFailed(PulsarClientException exception);

abstract void connectionOpened(ClientCnx cnx);

abstract String getHandlerName();

private static final Logger log = LoggerFactory.getLogger(HandlerBase.class);
private static final Logger log = LoggerFactory.getLogger(ConnectionHandler.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;

public abstract class ConsumerBase<T> extends HandlerBase implements Consumer<T> {
public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T> {

enum ConsumerType {
PARTITIONED, NON_PARTITIONED
Expand All @@ -61,9 +61,10 @@ enum ConsumerType {
protected int maxReceiverQueueSize;
protected Schema<T> schema;

protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf, int receiverQueueSize,
ExecutorService listenerExecutor, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema) {
super(client, topic, new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS));
protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
int receiverQueueSize, ExecutorService listenerExecutor,
CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema) {
super(client, topic);
this.maxReceiverQueueSize = receiverQueueSize;
this.subscription = conf.getSubscriptionName();
this.conf = conf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

public class ConsumerImpl<T> extends ConsumerBase<T> {
public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandler.Connection {
private static final int MAX_REDELIVER_UNACKNOWLEDGED = 1000;

private final long consumerId;
Expand Down Expand Up @@ -123,6 +123,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> {

private final boolean readCompacted;

private final ConnectionHandler connectionHandler;

enum SubscriptionMode {
// Make the subscription to be backed by a durable cursor that will retain messages and persist the current
// position
Expand Down Expand Up @@ -183,9 +185,17 @@ enum SubscriptionMode {
metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties()));
}

this.connectionHandler = new ConnectionHandler(this,
new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS),
this);

grabCnx();
}

public ConnectionHandler getConnectionHandler() {
return connectionHandler;
}

public UnAckedMessageTracker getUnAckedMessageTracker() {
return unAckedMessageTracker;
}
Expand Down Expand Up @@ -533,7 +543,7 @@ public void operationComplete(Future<Void> future) throws Exception {
}

@Override
void connectionOpened(final ClientCnx cnx) {
public void connectionOpened(final ClientCnx cnx) {
setClientCnx(cnx);
cnx.registerConsumer(consumerId, this);

Expand Down Expand Up @@ -612,7 +622,7 @@ void connectionOpened(final ClientCnx cnx) {
return null;
}
log.warn("[{}][{}] Failed to subscribe to topic on {}", topic, subscription, cnx.channel().remoteAddress());
if (e.getCause() instanceof PulsarClientException && isRetriableError((PulsarClientException) e.getCause())
if (e.getCause() instanceof PulsarClientException && getConnectionHandler().isRetriableError((PulsarClientException) e.getCause())
&& System.currentTimeMillis() < subscribeTimeout) {
reconnectLater(e.getCause());
return null;
Expand Down Expand Up @@ -677,7 +687,7 @@ void sendFlowPermitsToBroker(ClientCnx cnx, int numMessages) {
}

@Override
void connectionFailed(PulsarClientException exception) {
public void connectionFailed(PulsarClientException exception) {
if (System.currentTimeMillis() > subscribeTimeout && subscribeFuture.completeExceptionally(exception)) {
setState(State.Failed);
log.info("[{}] Consumer creation failed for consumer {}", topic, consumerId);
Expand Down Expand Up @@ -1431,6 +1441,35 @@ public int hashCode() {
return Objects.hash(topic, subscription, consumerName);
}

// wrapper for connection methods
ClientCnx cnx() {
return this.connectionHandler.cnx();
}

void resetBackoff() {
this.connectionHandler.resetBackoff();
}

void connectionClosed(ClientCnx cnx) {
this.connectionHandler.connectionClosed(cnx);
}

ClientCnx getClientCnx() {
return this.connectionHandler.getClientCnx();
}

void setClientCnx(ClientCnx clientCnx) {
this.connectionHandler.setClientCnx(clientCnx);
}

void reconnectLater(Throwable exception) {
this.connectionHandler.reconnectLater(exception);
}

void grabCnx() {
this.connectionHandler.grabCnx();
}

private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class);

}
Loading

0 comments on commit 9251a44

Please sign in to comment.