Skip to content

Commit

Permalink
PIP-254: Support configuring client version (apache#20009)
Browse files Browse the repository at this point in the history
  • Loading branch information
BewareMyPower authored Apr 10, 2023
1 parent 470b674 commit 21c7c62
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,26 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertEquals;

/**
* Test Mutual Authentication.
Expand Down Expand Up @@ -182,7 +188,7 @@ public AuthenticationState newAuthState(AuthData authData,
}
}

@BeforeMethod(alwaysRun = true)
@BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
mutualAuth = new MutualAuthentication();
Expand All @@ -205,7 +211,7 @@ protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
clientBuilder.authentication(mutualAuth);
}

@AfterMethod(alwaysRun = true)
@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
internalCleanup();
Expand All @@ -214,12 +220,13 @@ protected void cleanup() throws Exception {
@Test
public void testAuthentication() throws Exception {
log.info("-- Starting {} test --", methodName);
String topic = "persistent://my-property/my-ns/test-authentication";

Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic1")
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
.subscriptionName("my-subscriber-name")
.subscribe();
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic("persistent://my-property/my-ns/my-topic1")
.topic(topic)
.create();

for (int i = 0; i < 10; i++) {
Expand All @@ -239,4 +246,33 @@ public void testAuthentication() throws Exception {

log.info("-- Exiting {} test --", methodName);
}

@Test
public void testClientVersion() throws Exception {
String defaultClientVersion = "Pulsar-Java-v" + PulsarVersion.getVersion();
String topic = "persistent://my-property/my-ns/test-client-version";

Producer<byte[]> producer1 = pulsarClient.newProducer()
.topic(topic)
.create();
TopicStats stats = admin.topics().getStats(topic);
assertEquals(stats.getPublishers().size(), 1);
assertEquals(stats.getPublishers().get(0).getClientVersion(), defaultClientVersion);

PulsarClient client = ((ClientBuilderImpl) PulsarClient.builder())
.description("my-java-client")
.serviceUrl(lookupUrl.toString())
.authentication(mutualAuth)
.build();
Producer<byte[]> producer2 = client.newProducer().topic(topic).create();
stats = admin.topics().getStats(topic);
assertEquals(stats.getPublishers().size(), 2);

assertEquals(stats.getPublishers().stream().map(PublisherStats::getClientVersion).collect(Collectors.toSet()),
Sets.newHashSet(defaultClientVersion, defaultClientVersion + "-my-java-client"));

producer1.close();
producer2.close();
client.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,12 @@
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.ConsumerImpl;
Expand All @@ -105,6 +107,8 @@
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -4581,4 +4585,32 @@ public void testSendMsgGreaterThanBatchingMaxBytes() throws Exception {
// sendAsync should complete in time
assertNotNull(producer.sendAsync(msg).get(timeoutSec, TimeUnit.SECONDS));
}
}

@Test
public void testClientVersion() throws Exception {
String defaultClientVersion = "Pulsar-Java-v" + PulsarVersion.getVersion();
String topic = "persistent://my-property/my-ns/test-client-version";

Producer<byte[]> producer1 = pulsarClient.newProducer()
.topic(topic)
.create();
TopicStats stats = admin.topics().getStats(topic);
assertEquals(stats.getPublishers().size(), 1);
assertEquals(stats.getPublishers().get(0).getClientVersion(), defaultClientVersion);

PulsarClient client = ((ClientBuilderImpl) PulsarClient.builder())
.description("my-java-client")
.serviceUrl(lookupUrl.toString())
.build();
Producer<byte[]> producer2 = client.newProducer().topic(topic).create();
stats = admin.topics().getStats(topic);
assertEquals(stats.getPublishers().size(), 2);

assertEquals(stats.getPublishers().stream().map(PublisherStats::getClientVersion).collect(Collectors.toSet()),
Sets.newHashSet(defaultClientVersion, defaultClientVersion + "-my-java-client"));

producer1.close();
producer2.close();
client.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -410,4 +410,27 @@ public ClientBuilder socks5ProxyPassword(String socks5ProxyPassword) {
conf.setSocks5ProxyPassword(socks5ProxyPassword);
return this;
}

/**
* Set the description.
*
* <p> By default, when the client connects to the broker, a version string like "Pulsar-Java-v<x.y.z>" will be
* carried and saved by the broker. The client version string could be queried from the topic stats.
*
* <p> This method provides a way to add more description to a specific PulsarClient instance. If it's configured,
* the description will be appended to the original client version string, with '-' as the separator.
*
* <p>For example, if the client version is 3.0.0, and the description is "forked", the final client version string
* will be "Pulsar-Java-v3.0.0-forked".
*
* @param description the description of the current PulsarClient instance
* @throws IllegalArgumentException if the length of description exceeds 64
*/
public ClientBuilder description(String description) {
if (description != null && description.length() > 64) {
throw new IllegalArgumentException("description should be at most 64 characters");
}
conf.setDescription(description);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ public class ClientCnx extends PulsarHandler {
@Getter
private long lastDisconnectedTimestamp;

private final String clientVersion;

protected enum State {
None, SentConnectFrame, Ready, Failed, Connecting
}
Expand Down Expand Up @@ -252,6 +254,8 @@ public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, in
this.state = State.None;
this.protocolVersion = protocolVersion;
this.idleState = new ClientCnxIdleState(this);
this.clientVersion = "Pulsar-Java-v" + PulsarVersion.getVersion()
+ (conf.getDescription() == null ? "" : ("-" + conf.getDescription()));
}

@Override
Expand Down Expand Up @@ -293,8 +297,7 @@ protected ByteBuf newConnectCommand() throws Exception {
authenticationDataProvider = authentication.getAuthData(remoteHostName);
AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
String.format("Pulsar-Java-v%s", PulsarVersion.getVersion()), proxyToTargetBrokerAddress, null, null,
null);
clientVersion, proxyToTargetBrokerAddress, null, null, null);
}

@Override
Expand Down Expand Up @@ -411,7 +414,7 @@ protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
ByteBuf request = Commands.newAuthResponse(authentication.getAuthMethodName(),
authData,
this.protocolVersion,
String.format("Pulsar-Java-v%s", PulsarVersion.getVersion()));
clientVersion);

if (log.isDebugEnabled()) {
log.debug("{} Mutual auth {}", ctx.channel(), authentication.getAuthMethodName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,12 @@ public class ClientConfigurationData implements Serializable, Cloneable {
@Secret
private String socks5ProxyPassword;

@ApiModelProperty(
name = "description",
value = "The extra description of the client version. The length cannot exceed 64."
)
private String description;

/**
* Gets the authentication settings for the client.
*
Expand Down

0 comments on commit 21c7c62

Please sign in to comment.