Skip to content

Commit

Permalink
[Broker] Fix create partitioned topic in replicated namespace (apache…
Browse files Browse the repository at this point in the history
…#10963)

Fixes apache#10673 Bug-2

### Motivation

Currently, create a partitioned topic in the replicated namespace will not create metadata path `/managed-ledgers` on replicated clusters.

### Modifications

Add a new flag `createLocalTopicOnly` to indicate whether create the partitioned path in replicated clusters or not.
If the flag is false, make remote calls to create partitioned topics on replicated clusters.
  • Loading branch information
gaoran10 authored Jun 21, 2021
1 parent 87bcee3 commit 2f8c175
Show file tree
Hide file tree
Showing 9 changed files with 188 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.naming.Constants;
import org.apache.pulsar.common.naming.NamespaceBundle;
Expand Down Expand Up @@ -619,7 +620,8 @@ protected List<String> getTopicPartitionList(TopicDomain topicDomain) {
return topicPartitions;
}

protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int numPartitions) {
protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int numPartitions,
boolean createLocalTopicOnly) {
Integer maxTopicsPerNamespace = null;

try {
Expand Down Expand Up @@ -672,55 +674,57 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
"Number of partitions should be less than or equal to " + maxPartitions));
return;
}

List<CompletableFuture<Void>> createFutureList = new ArrayList<>();

CompletableFuture<Void> createLocalFuture = new CompletableFuture<>();
createFutureList.add(createLocalFuture);
checkTopicExistsAsync(topicName).thenAccept(exists -> {
if (exists) {
log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
asyncResponse.resume(new RestException(Status.CONFLICT, "This topic already exists"));
} else {

try {
String path = ZkAdminPaths.partitionedTopicPath(topicName);
namespaceResources().getPartitionedTopicResources()
.createAsync(path, new PartitionedTopicMetadata(numPartitions)).thenAccept(r -> {
log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
tryCreatePartitionsAsync(numPartitions).thenAccept(v -> {
log.info("[{}] Successfully created partitions for topic {}", clientAppId(),
topicName);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(e -> {
log.error("[{}] Failed to create partitions for topic {}", clientAppId(),
topicName);
// The partitioned topic is created but there are some partitions create failed
asyncResponse.resume(new RestException(e));
return null;
});
}).exceptionally(ex -> {
if (ex.getCause() instanceof AlreadyExistsException) {
log.warn("[{}] Failed to create already existing partitioned topic {}",
clientAppId(), topicName);
asyncResponse.resume(
new RestException(Status.CONFLICT, "Partitioned topic already exists"));
} else if (ex.getCause() instanceof BadVersionException) {
log.warn("[{}] Failed to create partitioned topic {}: concurrent modification",
clientAppId(), topicName);
asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
} else {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName,
ex.getCause());
asyncResponse.resume(new RestException(ex.getCause()));
}
return null;
});
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
}
return;
}

provisionPartitionedTopicPath(asyncResponse, numPartitions, createLocalTopicOnly)
.thenCompose(ignored -> tryCreatePartitionsAsync(numPartitions))
.whenComplete((ignored, ex) -> {
if (ex != null) {
createLocalFuture.completeExceptionally(ex);
return;
}
createLocalFuture.complete(null);
});
}).exceptionally(ex -> {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});

if (!createLocalTopicOnly && topicName.isGlobal() && isNamespaceReplicated(namespaceName)) {
getNamespaceReplicatedClusters(namespaceName)
.stream()
.filter(cluster -> !cluster.equals(pulsar().getConfiguration().getClusterName()))
.forEach(cluster -> createFutureList.add(
((TopicsImpl) pulsar().getBrokerService().getClusterPulsarAdmin(cluster).topics())
.createPartitionedTopicAsync(
topicName.getPartitionedTopicName(), numPartitions, true)));
}

FutureUtil.waitForAll(createFutureList).whenComplete((ignored, ex) -> {
if (ex != null) {
log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName, ex.getCause());
if (ex.getCause() instanceof RestException) {
asyncResponse.resume(ex.getCause());
} else {
resumeAsyncResponseExceptionally(asyncResponse, ex.getCause());
}
return;
}
log.info("[{}] Successfully created partitions for topic {} in cluster {}",
clientAppId(), topicName, pulsar().getConfiguration().getClusterName());
asyncResponse.resume(Response.noContent().build());
});
}

/**
Expand All @@ -747,6 +751,42 @@ protected CompletableFuture<Boolean> checkTopicExistsAsync(TopicName topicName)
});
}

private CompletableFuture<Void> provisionPartitionedTopicPath(AsyncResponse asyncResponse,
int numPartitions,
boolean createLocalTopicOnly) {
CompletableFuture<Void> future = new CompletableFuture<>();
String partitionedTopicPath = ZkAdminPaths.partitionedTopicPath(topicName);
namespaceResources()
.getPartitionedTopicResources()
.createAsync(partitionedTopicPath, new PartitionedTopicMetadata(numPartitions))
.whenComplete((ignored, ex) -> {
if (ex != null) {
if (ex instanceof AlreadyExistsException) {
if (createLocalTopicOnly) {
future.complete(null);
return;
}
log.warn("[{}] Failed to create already existing partitioned topic {}",
clientAppId(), topicName);
future.completeExceptionally(
new RestException(Status.CONFLICT, "Partitioned topic already exists"));
} else if (ex instanceof BadVersionException) {
log.warn("[{}] Failed to create partitioned topic {}: concurrent modification",
clientAppId(), topicName);
future.completeExceptionally(
new RestException(Status.CONFLICT, "Concurrent modification"));
} else {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, ex);
future.completeExceptionally(new RestException(ex.getCause()));
}
return;
}
log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
future.complete(null);
});
return future;
}

protected void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable throwable) {
if (throwable instanceof WebApplicationException) {
asyncResponse.resume((WebApplicationException) throwable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,17 @@ public PersistentTopicInternalStats getInternalStats(@PathParam("property") Stri
@ApiResponse(code = 406, message = "The number of partitions should be more than 0 and less than or equal"
+ " to maxNumPartitionsPerPartitionedTopic"),
@ApiResponse(code = 409, message = "Partitioned topic already exist")})
public void createPartitionedTopic(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded
String encodedTopic,
int numPartitions) {
public void createPartitionedTopic(
@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
int numPartitions,
@QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean createLocalTopicOnly) {
try {
validateTopicName(property, cluster, namespace, encodedTopic);
internalCreatePartitionedTopic(asyncResponse, numPartitions);
internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly);
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,17 @@ public void revokePermissionsOnTopic(@PathParam("property") String property,
@ApiResponse(code = 406, message = "The number of partitions should be "
+ "more than 0 and less than or equal to maxNumPartitionsPerPartitionedTopic"),
@ApiResponse(code = 409, message = "Partitioned topic already exist")})
public void createPartitionedTopic(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded
String encodedTopic,
int numPartitions) {
public void createPartitionedTopic(
@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
int numPartitions,
@QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean createLocalTopicOnly) {
try {
validateTopicName(property, cluster, namespace, encodedTopic);
internalCreatePartitionedTopic(asyncResponse, numPartitions);
internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly);
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,12 @@ public void createPartitionedTopic(
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "The number of partitions for the topic",
required = true, type = "int", defaultValue = "0")
int numPartitions) {

int numPartitions,
@QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean createLocalTopicOnly) {
try {
validateGlobalNamespaceOwnership(tenant, namespace);
validateTopicName(tenant, namespace, encodedTopic);
internalCreatePartitionedTopic(asyncResponse, numPartitions);
internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly);
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,13 @@ public void createPartitionedTopic(
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "The number of partitions for the topic",
required = true, type = "int", defaultValue = "0")
int numPartitions) {
int numPartitions,
@QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean createLocalTopicOnly) {
try {
validateGlobalNamespaceOwnership(tenant, namespace);
validatePartitionedTopicName(tenant, namespace, encodedTopic);
validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE);
internalCreatePartitionedTopic(asyncResponse, numPartitions);
internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly);
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,7 @@ public void persistentTopics() throws Exception {
assertEquals(persistentTopics.getPartitionedTopicList(property, cluster, namespace), Lists.newArrayList());
response = mock(AsyncResponse.class);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.createPartitionedTopic(response, property, cluster, namespace, topic, 5);
persistentTopics.createPartitionedTopic(response, property, cluster, namespace, topic, 5, false);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
assertEquals(persistentTopics.getPartitionedTopicList(property, cluster, namespace), Lists
Expand Down
Loading

0 comments on commit 2f8c175

Please sign in to comment.