Skip to content

Commit

Permalink
[REST] Rest API Produce message. (apache#8125)
Browse files Browse the repository at this point in the history
### Motivation
PIP 64: https://github.com/apache/pulsar/wiki/PIP-64%3A-Introduce-REST-endpoints-for-producing%2C-consuming-and-reading-messages

Tested with Postman

### Modifications

Add produce message rest api.
  • Loading branch information
MarvinCai authored Sep 29, 2021
1 parent 102e3d2 commit b2678be
Show file tree
Hide file tree
Showing 11 changed files with 1,879 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,8 @@ private void addWebServerHandlers(WebService webService,
"org.apache.pulsar.broker.admin.v3", true, attributeMap);
webService.addRestResources("/lookup",
"org.apache.pulsar.broker.lookup", true, attributeMap);
webService.addRestResources("/topics",
"org.apache.pulsar.broker.rest", true, attributeMap);

// Add metrics servlet
webService.addServlet("/metrics",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.rest;

import io.netty.util.Recycler;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.Topic;

/**
* PublishContext implementation for REST message publishing.
*/
@Slf4j
public class RestMessagePublishContext implements Topic.PublishContext {

private Topic topic;
private long startTimeNs;
private CompletableFuture<PositionImpl> positionFuture;

/**
* Executed from managed ledger thread when the message is persisted.
*/
@Override
public void completed(Exception exception, long ledgerId, long entryId) {
if (exception != null) {
positionFuture.completeExceptionally(exception);
if (log.isInfoEnabled()) {
log.info("Failed to write entry for rest produce request: ledgerId: {}, entryId: {}. "
+ "triggered send callback.",
ledgerId, entryId);
}
} else {
if (log.isInfoEnabled()) {
log.info("Success write topic for rest produce request: {}, ledgerId: {}, entryId: {}. "
+ "triggered send callback.",
topic.getName(), ledgerId, entryId);
}
topic.recordAddLatency(System.nanoTime() - startTimeNs, TimeUnit.MICROSECONDS);
positionFuture.complete(PositionImpl.get(ledgerId, entryId));
}
recycle();
}

// recycler
public static RestMessagePublishContext get(CompletableFuture<PositionImpl> positionFuture, Topic topic,
long startTimeNs) {
RestMessagePublishContext callback = RECYCLER.get();
callback.positionFuture = positionFuture;
callback.topic = topic;
callback.startTimeNs = startTimeNs;
return callback;
}

private final Recycler.Handle<RestMessagePublishContext> recyclerHandle;

private RestMessagePublishContext(Recycler.Handle<RestMessagePublishContext> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}

private static final Recycler<RestMessagePublishContext> RECYCLER = new Recycler<RestMessagePublishContext>() {
protected RestMessagePublishContext newObject(Handle<RestMessagePublishContext> handle) {
return new RestMessagePublishContext(handle);
}
};

public void recycle() {
topic = null;
startTimeNs = -1;
recyclerHandle.recycle(this);
}
}
160 changes: 160 additions & 0 deletions pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/Topics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.rest;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import org.apache.pulsar.websocket.data.ProducerMessages;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Path("/")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@Api(value = "/persistent", description = "Apis for produce,consume and ack message on topics.", tags = "topics")
public class Topics extends TopicsBase {
private static final Logger log = LoggerFactory.getLogger(Topics.class);

@POST
@Path("/persistent/{tenant}/{namespace}/{topic}")
@ApiOperation(value = "Produce message to a persistent topic.", response = String.class, responseContainer = "List")
@ApiResponses(value = {
@ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"),
@ApiResponse(code = 412, message = "Namespace name is not valid"),
@ApiResponse(code = 500, message = "Internal server error") })
public void produceOnPersistentTopic(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
ProducerMessages producerMessages) {
try {
validateTopicName(tenant, namespace, encodedTopic);
validateProducePermission();
publishMessages(asyncResponse, producerMessages, authoritative);
} catch (Exception e) {
log.error("[{}] Failed to produce on topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
}
}

@POST
@Path("/persistent/{tenant}/{namespace}/{topic}/partitions/{partition}")
@ApiOperation(value = "Produce message to a partition of a persistent topic.",
response = String.class, responseContainer = "List")
@ApiResponses(value = {
@ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"),
@ApiResponse(code = 412, message = "Namespace name is not valid"),
@ApiResponse(code = 500, message = "Internal server error") })
public void produceOnPersistentTopicPartition(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Specify topic partition", required = true)
@PathParam("partition") int partition,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
ProducerMessages producerMessages) {
try {
validateTopicName(tenant, namespace, encodedTopic);
validateProducePermission();
publishMessagesToPartition(asyncResponse, producerMessages, authoritative, partition);
} catch (Exception e) {
log.error("[{}] Failed to produce on topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
}
}

@POST
@Path("/non-persistent/{tenant}/{namespace}/{topic}")
@ApiOperation(value = "Produce message to a persistent topic.", response = String.class, responseContainer = "List")
@ApiResponses(value = {
@ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"),
@ApiResponse(code = 412, message = "Namespace name is not valid"),
@ApiResponse(code = 500, message = "Internal server error") })
public void produceOnNonPersistentTopic(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false")
boolean authoritative,
ProducerMessages producerMessages) {
try {
validateTopicName(tenant, namespace, encodedTopic);
validateProducePermission();
publishMessages(asyncResponse, producerMessages, authoritative);
} catch (Exception e) {
log.error("[{}] Failed to produce on topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
}
}

@POST
@Path("/non-persistent/{tenant}/{namespace}/{topic}/partitions/{partition}")
@ApiOperation(value = "Produce message to a partition of a persistent topic.",
response = String.class, responseContainer = "List")
@ApiResponses(value = {
@ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"),
@ApiResponse(code = 412, message = "Namespace name is not valid"),
@ApiResponse(code = 500, message = "Internal server error") })
public void produceOnNonPersistentTopicPartition(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Specify topic partition", required = true)
@PathParam("partition") int partition,
@QueryParam("authoritative") @DefaultValue("false")
boolean authoritative,
ProducerMessages producerMessages) {
try {
validateTopicName(tenant, namespace, encodedTopic);
validateProducePermission();
publishMessagesToPartition(asyncResponse, producerMessages, authoritative, partition);
} catch (Exception e) {
log.error("[{}] Failed to produce on topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
}
}

}
Loading

0 comments on commit b2678be

Please sign in to comment.