Skip to content

Commit

Permalink
[Broker] Add verification when terminating non-persistent topic (apac…
Browse files Browse the repository at this point in the history
…he#11903)

* [Broker] Add verification when terminating non-persistent

* [Broker] add unit test

* code format
  • Loading branch information
shibd authored Sep 6, 2021
1 parent ad9efae commit d4055b5
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,13 @@ protected void validateTopicName(String property, String namespace, String encod
}
}

protected void validatePersistentTopicName(String property, String namespace, String encodedTopic) {
validateTopicName(property, namespace, encodedTopic);
if (topicName.getDomain() != TopicDomain.persistent) {
throw new RestException(Status.NOT_ACCEPTABLE, "Need to provide a persistent topic name");
}
}

protected void validatePartitionedTopicName(String tenant, String namespace, String encodedTopic) {
// first, it has to be a validate topic name
validateTopicName(tenant, namespace, encodedTopic);
Expand Down Expand Up @@ -278,6 +285,14 @@ protected void validateTopicName(String property, String cluster, String namespa
}
}

@Deprecated
protected void validatePersistentTopicName(String property, String cluster, String namespace, String encodedTopic) {
validateTopicName(property, cluster, namespace, encodedTopic);
if (topicName.getDomain() != TopicDomain.persistent) {
throw new RestException(Status.NOT_ACCEPTABLE, "Need to provide a persistent topic name");
}
}

protected Policies getNamespacePolicies(NamespaceName namespaceName) {
try {
final String namespace = namespaceName.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -673,11 +673,12 @@ public PersistentOfflineTopicStats getBacklog(@PathParam("property") String prop
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 405, message = "Operation not allowed on non-persistent topic"),
@ApiResponse(code = 406, message = "Need to provide a persistent topic name"),
@ApiResponse(code = 404, message = "Topic does not exist") })
public MessageId terminate(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(property, cluster, namespace, encodedTopic);
validatePersistentTopicName(property, cluster, namespace, encodedTopic);
return internalTerminate(authoritative);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2442,6 +2442,7 @@ public void removeMaxMessageSize(@Suspended final AsyncResponse asyncResponse,
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Termination of a partitioned topic is not allowed"),
@ApiResponse(code = 406, message = "Need to provide a persistent topic name"),
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
Expand All @@ -2454,7 +2455,7 @@ public MessageId terminate(
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
validatePersistentTopicName(tenant, namespace, encodedTopic);
return internalTerminate(authoritative);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,41 @@ protected String domain() {
};
}

private static AdminResource mockNonPersistentResource() {
return new AdminResource() {

@Override
protected String domain() {
return "non-persistent";
}
};
}

@Test
public void testValidatePersistentTopicNameSuccess() {
String tenant = "test-tenant";
String namespace = "test-namespace";
String topic = Codec.encode("test-topic");

AdminResource resource = mockResource();
resource.validatePersistentTopicName(tenant, namespace, topic);
}

@Test
public void testValidatePersistentTopicNameInvalid() {
String tenant = "test-tenant";
String namespace = "test-namespace";
String topic = Codec.encode("test-topic");

AdminResource nPResource = mockNonPersistentResource();
try {
nPResource.validatePersistentTopicName(tenant, namespace, topic);
fail("Should fail validation on non-persistent topic");
} catch (RestException e) {
assertEquals(Status.NOT_ACCEPTABLE.getStatusCode(), e.getResponse().getStatus());
}
}

@Test
public void testValidatePartitionedTopicNameSuccess() {
String tenant = "test-tenant";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,35 @@ public void testTerminatePartitionedTopic() {
verify(response, timeout(5000).times(1)).resume(Arrays.asList(new MessageIdImpl(3, -1, -1)));
}

@Test
public void testTerminate() {
String testLocalTopicName = "topic-not-found";

// 1) Create the nonPartitionTopic topic
persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, testLocalTopicName, true);

// 2) Create a subscription
AsyncResponse response = mock(AsyncResponse.class);
persistentTopics.createSubscription(response, testTenant, testNamespace, testLocalTopicName, "test", true,
(MessageIdImpl) MessageId.earliest, false);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

// 3) Assert terminate persistent topic
MessageId messageId = persistentTopics.terminate(testTenant, testNamespace, testLocalTopicName, true);
Assert.assertEquals(messageId, new MessageIdImpl(3, -1, -1));

// 4) Assert terminate non-persistent topic
String nonPersistentTopicName = "non-persistent-topic";
try {
nonPersistentTopic.terminate(testTenant, testNamespace, nonPersistentTopicName, true);
Assert.fail("Should fail validation on non-persistent topic");
} catch (RestException e) {
Assert.assertEquals(Response.Status.NOT_ACCEPTABLE.getStatusCode(), e.getResponse().getStatus());
}
}

@Test
public void testNonPartitionedTopics() {
final String nonPartitionTopic = "non-partitioned-topic";
Expand Down

0 comments on commit d4055b5

Please sign in to comment.