Skip to content

Commit

Permalink
Admin Tool: Adding support to peek batch Messages (apache#727)
Browse files Browse the repository at this point in the history
  • Loading branch information
jai1 authored Sep 15, 2017
1 parent 8f15858 commit dd635ac
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1016,6 +1016,9 @@ public Response peekNthMessage(@PathParam("property") String property, @PathPara
responseBuilder.header("X-Pulsar-publish-time",
DATE_FORMAT.format(Instant.ofEpochMilli(metadata.getPublishTime())));
}
if (metadata.hasNumMessagesInBatch()) {
responseBuilder.header("X-Pulsar-num-batch-message", metadata.getNumMessagesInBatch());
}

// Decode if needed
CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkArgument;

import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -48,6 +49,10 @@
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
import org.apache.pulsar.common.api.proto.PulsarApi.SingleMessageMetadata;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
Expand All @@ -65,9 +70,12 @@
import com.google.gson.Gson;
import com.google.gson.JsonObject;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class PersistentTopicsImpl extends BaseResource implements PersistentTopics {
private final WebTarget persistentTopics;

private final String BATCH_HEADER = "X-Pulsar-num-batch-message";
public PersistentTopicsImpl(WebTarget web, Authentication auth) {
super(auth);
this.persistentTopics = web.path("/persistent");
Expand Down Expand Up @@ -515,19 +523,18 @@ public CompletableFuture<Void> expireMessagesForAllSubscriptionsAsync(String des
Entity.entity("", MediaType.APPLICATION_JSON));
}

private CompletableFuture<Message> peekNthMessage(String destination, String subName, int messagePosition) {
private CompletableFuture<List<Message>> peekNthMessage(String destination, String subName, int messagePosition) {
DestinationName ds = validateTopic(destination);
String encodedSubName = Codec.encode(subName);
final CompletableFuture<Message> future = new CompletableFuture<Message>();
final CompletableFuture<List<Message>> future = new CompletableFuture<List<Message>>();
asyncGetRequest(persistentTopics.path(ds.getNamespace()).path(ds.getEncodedLocalName()).path("subscription")
.path(encodedSubName).path("position").path(String.valueOf(messagePosition)),
new InvocationCallback<Response>() {

@Override
public void completed(Response response) {
try {
Message msg = getMessageFromHttpResponse(response);
future.complete(msg);
future.complete(getMessageFromHttpResponse(response));
} catch (Exception e) {
future.completeExceptionally(getApiException(e));
}
Expand All @@ -543,7 +550,6 @@ public void failed(Throwable throwable) {

@Override
public List<Message> peekMessages(String destination, String subName, int numMessages) throws PulsarAdminException {

try {
return peekMessagesAsync(destination, subName, numMessages).get();
} catch (ExecutionException e) {
Expand All @@ -557,39 +563,38 @@ public List<Message> peekMessages(String destination, String subName, int numMes
@Override
public CompletableFuture<List<Message>> peekMessagesAsync(String destination, String subName, int numMessages) {
checkArgument(numMessages > 0);
List<Message> messages = Lists.newArrayList();
CompletableFuture<List<Message>> futures = new CompletableFuture<List<Message>>();
CompletableFuture<List<Message>> future = new CompletableFuture<List<Message>>();
peekMessagesAsync(destination, subName, numMessages, Lists.newArrayList(), future, 1);
return future;
}


private void peekMessagesAsync(String destination, String subName, int numMessages,
List<Message> messages, CompletableFuture<List<Message>> future, int nthMessage) {
if (numMessages <= 0) {
future.complete(messages);
return;
}

// if peeking first message succeeds, we know that the topic and subscription exists
peekNthMessage(destination, subName, 1).handle((r, ex) -> {
peekNthMessage(destination, subName, nthMessage).handle((r, ex) -> {
if (ex != null) {
futures.completeExceptionally(ex);
} else {
messages.add(r);
List<CompletableFuture<Message>> futureMessages = Lists.newArrayList();
for (int i = 2; i <= numMessages; i++) {
futureMessages.add(peekNthMessage(destination, subName, i));
}

try {
for (CompletableFuture<Message> futureMessage : futureMessages) {
messages.add(futureMessage.get());
}
} catch (Exception e) {
// if we get a not found exception, it means that the position for the message we are trying to get
// does not exist. At this point, we can return the already found messages.
if (!(e.getCause() instanceof NotFoundException)) {
futures.completeExceptionally(e.getCause());
return null;
// if we get a not found exception, it means that the position for the message we are trying to get
// does not exist. At this point, we can return the already found messages.
if (ex instanceof NotFoundException) {
log.warn("Exception '{}' occured while trying to peek Messages.", ex.getMessage());
future.complete(messages);
} else {
future.completeExceptionally(ex);
}
return null;
}

futures.complete(messages);
}
return null;
} );

return futures;
for (int i = 0; i < Math.min(r.size(), numMessages); i++) {
messages.add(r.get(i));
}
peekMessagesAsync(destination, subName, numMessages - r.size(), messages, future, nthMessage + 1);
return null;
});
}

@Override
Expand Down Expand Up @@ -655,7 +660,7 @@ private DestinationName validateTopic(String destination) {
return DestinationName.get(destination);
}

private Message getMessageFromHttpResponse(Response response) throws Exception {
private List<Message> getMessageFromHttpResponse(Response response) throws Exception {

if (response.getStatus() != Status.OK.getStatusCode()) {
if (response.getStatus() >= 500) {
Expand All @@ -676,25 +681,57 @@ private Message getMessageFromHttpResponse(Response response) throws Exception {

Map<String, String> properties = Maps.newTreeMap();
MultivaluedMap<String, Object> headers = response.getHeaders();
Object publishTime = headers.getFirst("X-Pulsar-publish-time");
if (publishTime != null) {
properties.put("publish-time", (String) publishTime);
Object tmp = headers.getFirst("X-Pulsar-publish-time");
if (tmp != null) {
properties.put("publish-time", (String) tmp);
}
tmp = headers.getFirst(BATCH_HEADER);
if (response.getHeaderString(BATCH_HEADER) != null) {
properties.put(BATCH_HEADER, (String)tmp);
return getIndividualMsgsFromBatch(msgId, data, properties);
}
for (Entry<String, List<Object>> entry : headers.entrySet()) {
String header = entry.getKey();
if (header.contains("X-Pulsar-PROPERTY-")) {
String keyName = header.substring(header.indexOf("X-Pulsar-PROPERTY-") + 1, header.length());
String keyName = header.substring("X-Pulsar-PROPERTY-".length(), header.length());
properties.put(keyName, (String) entry.getValue().get(0));
}
}

return new MessageImpl(msgId, properties, data);
return Lists.newArrayList(new MessageImpl(msgId, properties, data));
} finally {
if (stream != null) {
stream.close();
}
}
}

private List<Message> getIndividualMsgsFromBatch(String msgId, byte[] data, Map<String, String> properties) {
List<Message> ret = new ArrayList<Message>();
int batchSize = Integer.parseInt(properties.get(BATCH_HEADER));
for (int i = 0; i < batchSize; i++) {
String batchMsgId = msgId + ":" + i;
PulsarApi.SingleMessageMetadata.Builder singleMessageMetadataBuilder = PulsarApi.SingleMessageMetadata
.newBuilder();
ByteBuf buf = Unpooled.wrappedBuffer(data);
try {
ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(buf, singleMessageMetadataBuilder, i,
batchSize);
SingleMessageMetadata singleMessageMetadata = singleMessageMetadataBuilder.build();
if (singleMessageMetadata.getPropertiesCount() > 0) {
for (KeyValue entry : singleMessageMetadata.getPropertiesList()) {
properties.put(entry.getKey(), entry.getValue());
}
}
ret.add(new MessageImpl(batchMsgId, properties, singleMessagePayload));
} catch (Exception ex) {
log.error("Exception occured while trying to get BatchMsgId: {}", batchMsgId, ex);
}
buf.release();
singleMessageMetadataBuilder.recycle();
}
return ret;
}

private static final Logger log = LoggerFactory.getLogger(PersistentTopicsImpl.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;

import com.beust.jcommander.Parameter;
Expand Down Expand Up @@ -465,8 +466,13 @@ void run() throws PulsarAdminException {
if (++position != 1) {
System.out.println("-------------------------------------------------------------------------\n");
}
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId());
if (msg.getMessageId() instanceof BatchMessageIdImpl) {
BatchMessageIdImpl msgId = (BatchMessageIdImpl) msg.getMessageId();
System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":" + msgId.getBatchIndex());
} else {
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId());
}
if (msg.getProperties().size() > 0) {
System.out.println("Properties:");
print(msg.getProperties());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,14 @@ public MessageImpl(String msgId, Map<String, String> properties, byte[] payload)
}

public MessageImpl(String msgId, Map<String, String> properties, ByteBuf payload) {
long ledgerId = Long.parseLong(msgId.substring(0, msgId.indexOf(':')));
long entryId = Long.parseLong(msgId.substring(msgId.indexOf(':') + 1));
this.messageId = new MessageIdImpl(ledgerId, entryId, -1);
String[] data = msgId.split(":");
long ledgerId = Long.parseLong(data[0]);
long entryId = Long.parseLong(data[1]);
if (data.length == 3) {
this.messageId = new BatchMessageIdImpl(ledgerId, entryId, -1, Integer.parseInt(data[2]));
} else {
this.messageId = new MessageIdImpl(ledgerId, entryId, -1);
}
this.cnx = null;
this.payload = payload;
this.properties = Collections.unmodifiableMap(properties);
Expand Down

0 comments on commit dd635ac

Please sign in to comment.