Skip to content

Commit

Permalink
[Issue 2912][pulsar-admin] add get-message-by-id cmd into pulsar-admin (
Browse files Browse the repository at this point in the history
apache#6331)

Fixes apache#2912 

### Motivation
Adding a new command `get-message-by-id` to the pulsar-admin which allows user to look at a single message by providing ledger id and entry id.

### Modifications
- pulsar-admin includes the new command `get-message-by-id`
- pulsar-broker v1/v2 apis to handle the get-message-by-id request from pulsar-admin
- managedCursor to read from ledgers with given ledger id and entry id
  • Loading branch information
nlu90 authored Apr 22, 2020
1 parent 514b6af commit d3ac61f
Show file tree
Hide file tree
Showing 11 changed files with 336 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import io.netty.buffer.ByteBuf;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand All @@ -55,11 +54,13 @@
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;

import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
Expand Down Expand Up @@ -1816,10 +1817,43 @@ protected void internalResetCursorOnPosition(String subName, boolean authoritati
}
}

protected Response internalPeekNthMessage(String subName, int messagePosition, boolean authoritative) {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
protected void internalGetMessageById(AsyncResponse asyncResponse, long ledgerId, long entryId,
boolean authoritative) {
verifyReadOperation(authoritative);

PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger();
try {
ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId), new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
asyncResponse.resume(new RestException(exception));
}

@Override
public void readEntryComplete(Entry entry, Object ctx) {
try {
asyncResponse.resume(generateResponseWithEntry(entry));
} catch (IOException exception) {
asyncResponse.resume(new RestException(exception));
} finally {
if (entry != null) {
entry.release();
}
}
}
}, null);
} catch (NullPointerException npe) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Message not found"));
} catch (Exception exception) {
log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}",
clientAppId(), ledgerId, entryId, topicName, exception);
asyncResponse.resume(new RestException(exception));
}
}

protected Response internalPeekNthMessage(String subName, int messagePosition, boolean authoritative) {
verifyReadOperation(authoritative);
// If the topic name is a partition name, no need to get partition topic metadata again
if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName, authoritative, false).partitions > 0) {
throw new RestException(Status.METHOD_NOT_ALLOWED, "Peek messages on a partitioned topic is not allowed");
Expand All @@ -1831,6 +1865,7 @@ protected Response internalPeekNthMessage(String subName, int messagePosition, b
throw new RestException(Status.METHOD_NOT_ALLOWED,
"Skip messages on a non-persistent topic is not allowed");
}

PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
PersistentReplicator repl = null;
PersistentSubscription sub = null;
Expand All @@ -1846,48 +1881,7 @@ protected Response internalPeekNthMessage(String subName, int messagePosition, b
} else {
entry = sub.peekNthMessage(messagePosition).get();
}
checkNotNull(entry);
PositionImpl pos = (PositionImpl) entry.getPosition();
ByteBuf metadataAndPayload = entry.getDataBuffer();

// moves the readerIndex to the payload
MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);

ResponseBuilder responseBuilder = Response.ok();
responseBuilder.header("X-Pulsar-Message-ID", pos.toString());
for (KeyValue keyValue : metadata.getPropertiesList()) {
responseBuilder.header("X-Pulsar-PROPERTY-" + keyValue.getKey(), keyValue.getValue());
}
if (metadata.hasPublishTime()) {
responseBuilder.header("X-Pulsar-publish-time", DateFormatter.format(metadata.getPublishTime()));
}
if (metadata.hasEventTime()) {
responseBuilder.header("X-Pulsar-event-time", DateFormatter.format(metadata.getEventTime()));
}
if (metadata.hasNumMessagesInBatch()) {
responseBuilder.header("X-Pulsar-num-batch-message", metadata.getNumMessagesInBatch());
}

// Decode if needed
CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression());
ByteBuf uncompressedPayload = codec.decode(metadataAndPayload, metadata.getUncompressedSize());

// Copy into a heap buffer for output stream compatibility
ByteBuf data = PulsarByteBufAllocator.DEFAULT.heapBuffer(uncompressedPayload.readableBytes(),
uncompressedPayload.readableBytes());
data.writeBytes(uncompressedPayload);
uncompressedPayload.release();

StreamingOutput stream = new StreamingOutput() {

@Override
public void write(OutputStream output) throws IOException, WebApplicationException {
output.write(data.array(), data.arrayOffset(), data.readableBytes());
data.release();
}
};

return responseBuilder.entity(stream).build();
return generateResponseWithEntry(entry);
} catch (NullPointerException npe) {
throw new RestException(Status.NOT_FOUND, "Message not found");
} catch (Exception exception) {
Expand All @@ -1901,6 +1895,57 @@ public void write(OutputStream output) throws IOException, WebApplicationExcepti
}
}

private void verifyReadOperation(boolean authoritative) {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
if (partitionMetadata.partitions > 0) {
throw new RestException(Status.METHOD_NOT_ALLOWED, "Peek messages on a partitioned topic is not allowed");
}
}

private Response generateResponseWithEntry(Entry entry) throws IOException {
checkNotNull(entry);
PositionImpl pos = (PositionImpl) entry.getPosition();
ByteBuf metadataAndPayload = entry.getDataBuffer();

// moves the readerIndex to the payload
MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);

ResponseBuilder responseBuilder = Response.ok();
responseBuilder.header("X-Pulsar-Message-ID", pos.toString());
for (KeyValue keyValue : metadata.getPropertiesList()) {
responseBuilder.header("X-Pulsar-PROPERTY-" + keyValue.getKey(), keyValue.getValue());
}
if (metadata.hasPublishTime()) {
responseBuilder.header("X-Pulsar-publish-time", DateFormatter.format(metadata.getPublishTime()));
}
if (metadata.hasEventTime()) {
responseBuilder.header("X-Pulsar-event-time", DateFormatter.format(metadata.getEventTime()));
}
if (metadata.hasNumMessagesInBatch()) {
responseBuilder.header("X-Pulsar-num-batch-message", metadata.getNumMessagesInBatch());
}

// Decode if needed
CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression());
ByteBuf uncompressedPayload = codec.decode(metadataAndPayload, metadata.getUncompressedSize());

// Copy into a heap buffer for output stream compatibility
ByteBuf data = PulsarByteBufAllocator.DEFAULT.heapBuffer(uncompressedPayload.readableBytes(),
uncompressedPayload.readableBytes());
data.writeBytes(uncompressedPayload);
uncompressedPayload.release();

StreamingOutput stream = output -> {
output.write(data.array(), data.arrayOffset(), data.readableBytes());
data.release();
};

return responseBuilder.entity(stream).build();
}

protected PersistentOfflineTopicStats internalGetBacklog(boolean authoritative) {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,28 @@ public Response peekNthMessage(@PathParam("property") String property, @PathPara
return internalPeekNthMessage(decode(encodedSubName), messagePosition, authoritative);
}

@GET
@Path("/{property}/{cluster}/{namespace}/{topic}/ledger/{ledgerId}/entry/{entryId}")
@ApiOperation(hidden = true, value = "Get message by its messageId.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 403, message = "Don't java admin permission"),
@ApiResponse(code = 404, message = "Topic, subscription or the messageId does not exist")
})
public void getMessageByID(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic, @PathParam("ledgerId") Long ledgerId,
@PathParam("entryId") Long entryId, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(property, cluster, namespace, encodedTopic);
internalGetMessageById(asyncResponse, ledgerId, entryId, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@GET
@Path("{property}/{cluster}/{namespace}/{topic}/backlog")
@ApiOperation(hidden = true, value = "Get estimated backlog for offline topic.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,43 @@ public Response peekNthMessage(
return internalPeekNthMessage(decode(encodedSubName), messagePosition, authoritative);
}

@GET
@Path("/{tenant}/{namespace}/{topic}/ledger/{ledgerId}/entry/{entryId}")
@ApiOperation(value = "Get message by its messageId.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or" +
"subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic, subscription or the message position does not exist"),
@ApiResponse(code = 405, message = "Skipping messages on a non-persistent topic is not allowed"),
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
public void getMessageById(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "The ledger id", required = true)
@PathParam("ledgerId") long ledgerId,
@ApiParam(value = "The entry id", required = true)
@PathParam("entryId") long entryId,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(tenant, namespace, encodedTopic);
internalGetMessageById(asyncResponse, ledgerId, entryId, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@GET
@Path("{tenant}/{namespace}/{topic}/backlog")
@ApiOperation(value = "Get estimated backlog for offline topic.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.ConcurrentFindCursorPositionException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.InvalidCursorPositionException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1147,6 +1147,32 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds)
*/
CompletableFuture<List<Message<byte[]>>> peekMessagesAsync(String topic, String subName, int numMessages);

/**
* Get a message by its messageId via a topic subscription.
* @param topic
* Topic name
* @param ledgerId
* Ledger id
* @param entryId
* Entry id
* @return the message indexed by the messageId
* @throws PulsarAdminException
* Unexpected error
*/
Message<byte[]> getMessageById(String topic, long ledgerId, long entryId) throws PulsarAdminException;

/**
* Get a message by its messageId via a topic subscription asynchronously.
* @param topic
* Topic name
* @param ledgerId
* Ledger id
* @param entryId
* Entry id
* @return a future that can be used to track when the message is returned
*/
CompletableFuture<Message<byte[]>> getMessageByIdAsync(String topic, long ledgerId, long entryId);

/**
* Create a new subscription on a topic.
*
Expand Down
Loading

0 comments on commit d3ac61f

Please sign in to comment.