Skip to content

Commit

Permalink
Remove Persistent Topics v3 API - use custom media type instead (apac…
Browse files Browse the repository at this point in the history
…he#14117)

* Remove Persistent Topics v3 API - use custom media type instead

* Don't pass properties if there are none

* Cleanup remains of topicV3, use old API method when no properties are set

* Fix tests
  • Loading branch information
lhotari authored Feb 4, 2022
1 parent 4e29a1e commit 996b301
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
Expand Down Expand Up @@ -243,6 +244,49 @@ public void createPartitionedTopic(
}
}

@PUT
@Consumes(PartitionedTopicMetadata.MEDIA_TYPE)
@Path("/{tenant}/{namespace}/{topic}/partitions")
@ApiOperation(value = "Create a partitioned topic.",
notes = "It needs to be called before creating a producer on a partitioned topic.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant does not exist"),
@ApiResponse(code = 406, message = "The number of partitions should be more than 0 and"
+ " less than or equal to maxNumPartitionsPerPartitionedTopic"),
@ApiResponse(code = 409, message = "Partitioned topic already exist"),
@ApiResponse(code = 412,
message = "Failed Reason : Name is invalid or Namespace does not have any clusters configured"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
})
public void createPartitionedTopic(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "The metadata for the topic",
required = true, type = "PartitionedTopicMetadata") PartitionedTopicMetadata metadata,
@QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean createLocalTopicOnly) {
try {
validateNamespaceName(tenant, namespace);
validateGlobalNamespaceOwnership();
validatePartitionedTopicName(tenant, namespace, encodedTopic);
validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE);
validateCreateTopic(topicName);
internalCreatePartitionedTopic(asyncResponse, metadata.partitions, createLocalTopicOnly,
metadata.properties);
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
}
}

@PUT
@Path("/{tenant}/{namespace}/{topic}")
@ApiOperation(value = "Create a non-partitioned topic.",
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@
public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {

private PersistentTopics persistentTopics;
private org.apache.pulsar.broker.admin.v3.PersistentTopics persistentTopicsV3;
private final String testTenant = "my-tenant";
private final String testLocalCluster = "use";
private final String testNamespace = "my-namespace";
Expand All @@ -124,23 +123,13 @@ protected void setup() throws Exception {
persistentTopics = spy(PersistentTopics.class);
persistentTopics.setServletContext(new MockServletContext());
persistentTopics.setPulsar(pulsar);
persistentTopicsV3 = spy(org.apache.pulsar.broker.admin.v3.PersistentTopics.class);
persistentTopicsV3.setServletContext(new MockServletContext());
persistentTopicsV3.setPulsar(pulsar);
doReturn(false).when(persistentTopics).isRequestHttps();
doReturn(null).when(persistentTopics).originalPrincipal();
doReturn("test").when(persistentTopics).clientAppId();
doReturn(TopicDomain.persistent.value()).when(persistentTopics).domain();
doNothing().when(persistentTopics).validateAdminAccessForTenant(this.testTenant);
doReturn(mock(AuthenticationDataHttps.class)).when(persistentTopics).clientAuthData();

doReturn(false).when(persistentTopicsV3).isRequestHttps();
doReturn(null).when(persistentTopicsV3).originalPrincipal();
doReturn("test").when(persistentTopicsV3).clientAppId();
doReturn(TopicDomain.persistent.value()).when(persistentTopicsV3).domain();
doNothing().when(persistentTopicsV3).validateAdminAccessForTenant(this.testTenant);
doReturn(mock(AuthenticationDataHttps.class)).when(persistentTopicsV3).clientAuthData();

nonPersistentTopic = spy(NonPersistentTopics.class);
nonPersistentTopic.setServletContext(new MockServletContext());
nonPersistentTopic.setPulsar(pulsar);
Expand Down Expand Up @@ -448,7 +437,7 @@ public void testCreatePartitionedTopic() {
Map<String, String> topicMetadata = Maps.newHashMap();
topicMetadata.put("key1", "value1");
PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(2, topicMetadata);
persistentTopicsV3.createPartitionedTopic(response, testTenant, testNamespace, topicName2, metadata, true);
persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, topicName2, metadata, true);
Awaitility.await().untilAsserted(() -> {
PartitionedTopicMetadata pMetadata2 = persistentTopics.getPartitionedMetadata(
testTenant, testNamespace, topicName2, true, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
* Metadata of a partitioned topic.
*/
public class PartitionedTopicMetadata {
public static final String MEDIA_TYPE = "application/vnd.partitioned-topic-metadata+json";

/* Number of partitions for the topic */
public int partitions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@
public class TopicsImpl extends BaseResource implements Topics {
private final WebTarget adminTopics;
private final WebTarget adminV2Topics;
private final WebTarget adminV3Topics;
// CHECKSTYLE.OFF: MemberName
private static final String BATCH_HEADER = "X-Pulsar-num-batch-message";
private static final String BATCH_SIZE_HEADER = "X-Pulsar-batch-size";
Expand Down Expand Up @@ -133,7 +132,6 @@ public TopicsImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
super(auth, readTimeoutMs);
adminTopics = web.path("/admin");
adminV2Topics = web.path("/admin/v2");
adminV3Topics = web.path("/admin/v3");
}

@Override
Expand Down Expand Up @@ -347,12 +345,12 @@ public CompletableFuture<Void> createPartitionedTopicAsync(
String topic, int numPartitions, boolean createLocalTopicOnly, Map<String, String> properties) {
checkArgument(numPartitions > 0, "Number of partitions should be more than 0");
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, properties, "partitions")
WebTarget path = topicPath(tn, "partitions")
.queryParam("createLocalTopicOnly", Boolean.toString(createLocalTopicOnly));
Entity entity;
if (properties != null) {
if (properties != null && !properties.isEmpty()) {
PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(numPartitions, properties);
entity = Entity.entity(metadata, MediaType.APPLICATION_JSON);
entity = Entity.entity(metadata, MediaType.valueOf(PartitionedTopicMetadata.MEDIA_TYPE));
} else {
entity = Entity.entity(numPartitions, MediaType.APPLICATION_JSON);
}
Expand Down Expand Up @@ -1251,22 +1249,6 @@ private WebTarget namespacePath(String domain, NamespaceName namespace, String..
return namespacePath;
}

/**
* As we support topic metadata, user can add some properties when create topic.
* For compatibility, we have to define a new method, so when metadata is not null, v3 will be called.
* Details could be found here : https://github.com/apache/pulsar/pull/12818#discussion_r789340203
* @param topic
* @param metadata
* @param parts
* @return
*/
private WebTarget topicPath(TopicName topic, Map<String, String> metadata, String... parts) {
final WebTarget base = metadata != null ? adminV3Topics : (topic.isV2() ? adminV2Topics : adminTopics);
WebTarget topicPath = base.path(topic.getRestPath());
topicPath = WebTargets.addParts(topicPath, parts);
return topicPath;
}

private WebTarget topicPath(TopicName topic, String... parts) {
final WebTarget base = topic.isV2() ? adminV2Topics : adminTopics;
WebTarget topicPath = base.path(topic.getRestPath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1409,7 +1409,7 @@ public void topics() throws Exception {
verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest);

cmdTopics.run(split("create-partitioned-topic persistent://myprop/clust/ns1/ds1 --partitions 32"));
verify(mockTopics).createPartitionedTopic("persistent://myprop/clust/ns1/ds1", 32, new HashMap<>());
verify(mockTopics).createPartitionedTopic("persistent://myprop/clust/ns1/ds1", 32, null);

cmdTopics.run(split("create-missed-partitions persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).createMissedPartitions("persistent://myprop/clust/ns1/ds1");
Expand Down Expand Up @@ -1848,7 +1848,7 @@ public void nonPersistentTopics() throws Exception {
verify(mockTopics).getInternalStats("non-persistent://myprop/ns1/ds1", false);

topics.run(split("create-partitioned-topic non-persistent://myprop/ns1/ds1 --partitions 32"));
verify(mockTopics).createPartitionedTopic("non-persistent://myprop/ns1/ds1", 32, new HashMap<>());
verify(mockTopics).createPartitionedTopic("non-persistent://myprop/ns1/ds1", 32, null);

topics.run(split("list myprop/ns1"));
verify(mockTopics).getList("myprop/ns1", null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,8 +514,9 @@ private class CreatePartitionedCmd extends CliCommand {
@Override
void run() throws Exception {
String topic = validateTopicName(params);
Map<String, String> map = new HashMap<>();
if (metadata != null) {
Map<String, String> map = null;
if (metadata != null && !metadata.isEmpty()) {
map = new HashMap<>();
for (String property : metadata) {
if (!property.contains("=")) {
throw new ParameterException(String.format("Invalid key value pair '%s', "
Expand Down

0 comments on commit 996b301

Please sign in to comment.