Skip to content

Commit

Permalink
[pulsar-client] remove duplicate cnx method (apache#6490)
Browse files Browse the repository at this point in the history
### Motivation
Remove duplicate `cnx()` method for `producer`
  • Loading branch information
rdhabalia authored Mar 6, 2020
1 parent ffe9a92 commit f9ada10
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void testGeneratedNameProducerReconnect(TopicDomain domain) throws Pulsar
//simulate create producer timeout.
Thread.sleep(3000);

producer.getConnectionHandler().connectionClosed(producer.getConnectionHandler().getClientCnx());
producer.getConnectionHandler().connectionClosed(producer.getConnectionHandler().cnx());
Assert.assertFalse(producer.isConnected());
Thread.sleep(3000);
Assert.assertEquals(producer.getConnectionHandler().getEpoch(), 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,19 +125,15 @@ protected void resetBackoff() {
backoff.reset();
}

protected ClientCnx cnx() {
@VisibleForTesting
public ClientCnx cnx() {
return CLIENT_CNX_UPDATER.get(this);
}

protected boolean isRetriableError(PulsarClientException e) {
return e instanceof PulsarClientException.LookupException;
}

@VisibleForTesting
public ClientCnx getClientCnx() {
return CLIENT_CNX_UPDATER.get(this);
}

protected void setClientCnx(ClientCnx clientCnx) {
CLIENT_CNX_UPDATER.set(this, clientCnx);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1842,7 +1842,7 @@ void connectionClosed(ClientCnx cnx) {

@VisibleForTesting
public ClientCnx getClientCnx() {
return this.connectionHandler.getClientCnx();
return this.connectionHandler.cnx();
}

void setClientCnx(ClientCnx clientCnx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,8 +572,8 @@ protected ByteBufPair sendMessage(long producerId, long lowestSequenceId, long h
}

private ChecksumType getChecksumType() {
if (connectionHandler.getClientCnx() == null
|| connectionHandler.getClientCnx().getRemoteEndpointProtocolVersion() >= brokerChecksumSupportedVersion()) {
if (connectionHandler.cnx() == null
|| connectionHandler.cnx().getRemoteEndpointProtocolVersion() >= brokerChecksumSupportedVersion()) {
return ChecksumType.Crc32c;
} else {
return ChecksumType.None;
Expand Down Expand Up @@ -782,11 +782,11 @@ public CompletableFuture<Void> closeAsync() {

@Override
public boolean isConnected() {
return connectionHandler.getClientCnx() != null && (getState() == State.Ready);
return connectionHandler.cnx() != null && (getState() == State.Ready);
}

public boolean isWritable() {
ClientCnx cnx = connectionHandler.getClientCnx();
ClientCnx cnx = connectionHandler.cnx();
return cnx != null && cnx.channel().isWritable();
}

Expand Down Expand Up @@ -1605,7 +1605,7 @@ void connectionClosed(ClientCnx cnx) {
}

ClientCnx getClientCnx() {
return this.connectionHandler.getClientCnx();
return this.connectionHandler.cnx();
}

void setClientCnx(ClientCnx clientCnx) {
Expand Down

0 comments on commit f9ada10

Please sign in to comment.