Skip to content

Commit

Permalink
fix flaky test testDuplicateConcurrentSubscribeCommand (apache#11089)
Browse files Browse the repository at this point in the history
### Motivation
When running test, it get the following exception accidentally
```
Error:  testDuplicateConcurrentSubscribeCommand(org.apache.pulsar.broker.service.ServerCnxTest)  Time elapsed: 0.75 s  <<< FAILURE!
java.lang.AssertionError: Response is not CommandError but org.apache.pulsar.common.api.proto.CommandSuccess@6d68db expected [true] but found [false]
	at org.testng.Assert.fail(Assert.java:99)
	at org.testng.Assert.failNotEquals(Assert.java:1037)
	at org.testng.Assert.assertTrue(Assert.java:45)
	at org.apache.pulsar.broker.service.ServerCnxTest.testDuplicateConcurrentSubscribeCommand(ServerCnxTest.java:767)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:132)
	at org.testng.internal.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:45)
	at org.testng.internal.InvokeMethodRunnable.call(InvokeMethodRunnable.java:73)
	at org.testng.internal.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
```
The reason is in the test logic, it subscribe the topic in `Exclusive` mode first, and then subscribe second time and check the subscribe status. If the first subscribe status returns faster than the second subscribe, it will get the above exception.

### Modifications
1. Use `Awaitility.await().untilAsserted` to wait the second subscribe status return, otherwise the test will be terminated by test timeout(30s)
2. format `ServerCnx#handleSubscribe` code style
  • Loading branch information
hangc0276 authored Jun 25, 2021
1 parent 0b67438 commit b71bc65
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 133 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -938,145 +938,145 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
remoteAddress, getPrincipal());
}

log.info("[{}] Subscribing on topic {} / {}", remoteAddress, topicName, subscriptionName);
try {
Metadata.validateMetadata(metadata);
} catch (IllegalArgumentException iae) {
final String msg = iae.getMessage();
commandSender.sendErrorResponse(requestId, ServerError.MetadataError, msg);
return null;
}
CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();
CompletableFuture<Consumer> existingConsumerFuture = consumers.putIfAbsent(consumerId,
consumerFuture);

if (existingConsumerFuture != null) {
if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
Consumer consumer = existingConsumerFuture.getNow(null);
log.info("[{}] Consumer with the same id is already created:"
+ " consumerId={}, consumer={}",
remoteAddress, consumerId, consumer);
commandSender.sendSuccessResponse(requestId);
return null;
} else {
// There was an early request to create a consumer with same consumerId. This can happen
// when
// client timeout is lower the broker timeouts. We need to wait until the previous
// consumer
// creation request either complete or fails.
log.warn("[{}][{}][{}] Consumer with id is already present on the connection,"
+ " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
ServerError error = null;
if (!existingConsumerFuture.isDone()) {
error = ServerError.ServiceNotReady;
} else {
error = getErrorCode(existingConsumerFuture);
consumers.remove(consumerId, existingConsumerFuture);
}
commandSender.sendErrorResponse(requestId, error,
"Consumer is already present on the connection");
return null;
}
log.info("[{}] Subscribing on topic {} / {}", remoteAddress, topicName, subscriptionName);
try {
Metadata.validateMetadata(metadata);
} catch (IllegalArgumentException iae) {
final String msg = iae.getMessage();
commandSender.sendErrorResponse(requestId, ServerError.MetadataError, msg);
return null;
}
CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();
CompletableFuture<Consumer> existingConsumerFuture = consumers.putIfAbsent(consumerId,
consumerFuture);

if (existingConsumerFuture != null) {
if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
Consumer consumer = existingConsumerFuture.getNow(null);
log.info("[{}] Consumer with the same id is already created:"
+ " consumerId={}, consumer={}",
remoteAddress, consumerId, consumer);
commandSender.sendSuccessResponse(requestId);
return null;
} else {
// There was an early request to create a consumer with same consumerId. This can happen
// when
// client timeout is lower the broker timeouts. We need to wait until the previous
// consumer
// creation request either complete or fails.
log.warn("[{}][{}][{}] Consumer with id is already present on the connection,"
+ " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
ServerError error = null;
if (!existingConsumerFuture.isDone()) {
error = ServerError.ServiceNotReady;
} else {
error = getErrorCode(existingConsumerFuture);
consumers.remove(consumerId, existingConsumerFuture);
}
commandSender.sendErrorResponse(requestId, error,
"Consumer is already present on the connection");
return null;
}
}

boolean createTopicIfDoesNotExist = forceTopicCreation
&& service.isAllowAutoTopicCreation(topicName.toString());
boolean createTopicIfDoesNotExist = forceTopicCreation
&& service.isAllowAutoTopicCreation(topicName.toString());

service.getTopic(topicName.toString(), createTopicIfDoesNotExist)
.thenCompose(optTopic -> {
if (!optTopic.isPresent()) {
return FutureUtil
.failedFuture(new TopicNotFoundException(
"Topic " + topicName + " does not exist"));
}
service.getTopic(topicName.toString(), createTopicIfDoesNotExist)
.thenCompose(optTopic -> {
if (!optTopic.isPresent()) {
return FutureUtil
.failedFuture(new TopicNotFoundException(
"Topic " + topicName + " does not exist"));
}

Topic topic = optTopic.get();
Topic topic = optTopic.get();

boolean rejectSubscriptionIfDoesNotExist = isDurable
&& !service.isAllowAutoSubscriptionCreation(topicName.toString())
&& !topic.getSubscriptions().containsKey(subscriptionName);
boolean rejectSubscriptionIfDoesNotExist = isDurable
&& !service.isAllowAutoSubscriptionCreation(topicName.toString())
&& !topic.getSubscriptions().containsKey(subscriptionName);

if (rejectSubscriptionIfDoesNotExist) {
return FutureUtil
.failedFuture(
new SubscriptionNotFoundException(
"Subscription does not exist"));
}
if (rejectSubscriptionIfDoesNotExist) {
return FutureUtil
.failedFuture(
new SubscriptionNotFoundException(
"Subscription does not exist"));
}

if (schema != null) {
return topic.addSchemaIfIdleOrCheckCompatible(schema)
.thenCompose(v -> topic.subscribe(
ServerCnx.this, subscriptionName, consumerId,
subType, priorityLevel, consumerName, isDurable,
startMessageId, metadata,
readCompacted, initialPosition, startMessageRollbackDurationSec,
isReplicated, keySharedMeta));
} else {
return topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
subType, priorityLevel, consumerName, isDurable,
startMessageId, metadata, readCompacted, initialPosition,
startMessageRollbackDurationSec, isReplicated, keySharedMeta);
}
})
.thenAccept(consumer -> {
if (consumerFuture.complete(consumer)) {
log.info("[{}] Created subscription on topic {} / {}",
remoteAddress, topicName, subscriptionName);
commandSender.sendSuccessResponse(requestId);
} else {
// The consumer future was completed before by a close command
try {
consumer.close();
log.info("[{}] Cleared consumer created after timeout on client side {}",
remoteAddress, consumer);
} catch (BrokerServiceException e) {
log.warn(
"[{}] Error closing consumer created"
+ " after timeout on client side {}: {}",
remoteAddress, consumer, e.getMessage());
}
consumers.remove(consumerId, consumerFuture);
}
if (schema != null) {
return topic.addSchemaIfIdleOrCheckCompatible(schema)
.thenCompose(v -> topic.subscribe(
ServerCnx.this, subscriptionName, consumerId,
subType, priorityLevel, consumerName, isDurable,
startMessageId, metadata,
readCompacted, initialPosition, startMessageRollbackDurationSec,
isReplicated, keySharedMeta));
} else {
return topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
subType, priorityLevel, consumerName, isDurable,
startMessageId, metadata, readCompacted, initialPosition,
startMessageRollbackDurationSec, isReplicated, keySharedMeta);
}
})
.thenAccept(consumer -> {
if (consumerFuture.complete(consumer)) {
log.info("[{}] Created subscription on topic {} / {}",
remoteAddress, topicName, subscriptionName);
commandSender.sendSuccessResponse(requestId);
} else {
// The consumer future was completed before by a close command
try {
consumer.close();
log.info("[{}] Cleared consumer created after timeout on client side {}",
remoteAddress, consumer);
} catch (BrokerServiceException e) {
log.warn(
"[{}] Error closing consumer created"
+ " after timeout on client side {}: {}",
remoteAddress, consumer, e.getMessage());
}
consumers.remove(consumerId, consumerFuture);
}

})
.exceptionally(exception -> {
if (exception.getCause() instanceof ConsumerBusyException) {
if (log.isDebugEnabled()) {
log.debug(
"[{}][{}][{}] Failed to create consumer because exclusive consumer"
+ " is already connected: {}",
remoteAddress, topicName, subscriptionName,
exception.getCause().getMessage());
}
} else if (exception.getCause() instanceof BrokerServiceException) {
log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}",
remoteAddress, topicName, subscriptionName,
consumerId, exception.getCause().getMessage());
} else {
log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}",
remoteAddress, topicName, subscriptionName,
consumerId, exception.getCause().getMessage(), exception);
}
})
.exceptionally(exception -> {
if (exception.getCause() instanceof ConsumerBusyException) {
if (log.isDebugEnabled()) {
log.debug(
"[{}][{}][{}] Failed to create consumer because exclusive consumer"
+ " is already connected: {}",
remoteAddress, topicName, subscriptionName,
exception.getCause().getMessage());
}
} else if (exception.getCause() instanceof BrokerServiceException) {
log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}",
remoteAddress, topicName, subscriptionName,
consumerId, exception.getCause().getMessage());
} else {
log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}",
remoteAddress, topicName, subscriptionName,
consumerId, exception.getCause().getMessage(), exception);
}

// If client timed out, the future would have been completed by subsequent close.
// Send error
// back to client, only if not completed already.
if (consumerFuture.completeExceptionally(exception)) {
commandSender.sendErrorResponse(requestId,
BrokerServiceException.getClientErrorCode(exception),
exception.getCause().getMessage());
}
consumers.remove(consumerId, consumerFuture);
// If client timed out, the future would have been completed by subsequent close.
// Send error
// back to client, only if not completed already.
if (consumerFuture.completeExceptionally(exception)) {
commandSender.sendErrorResponse(requestId,
BrokerServiceException.getClientErrorCode(exception),
exception.getCause().getMessage());
}
consumers.remove(consumerId, consumerFuture);

return null;
return null;

});
} else {
String msg = "Client is not authorized to subscribe";
log.warn("[{}] {} with role {}", remoteAddress, msg, getPrincipal());
ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
}
return null;
});
} else {
String msg = "Client is not authorized to subscribe";
log.warn("[{}] {} with role {}", remoteAddress, msg, getPrincipal());
ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
}
return null;
}).exceptionally(ex -> {
logAuthException(remoteAddress, "subscribe", getPrincipal(), Optional.of(topicName), ex);
commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, ex.getMessage());
Expand Down
Loading

0 comments on commit b71bc65

Please sign in to comment.