Skip to content

Commit

Permalink
Fix Websocket Consume Messages in Partitioned Topics (apache#2829)
Browse files Browse the repository at this point in the history
* fix consume messages in partitioned topics on websocket

* add consumeMessagesInPartitionedTopicTest

* add fromByteArrayWithTopic

* remove public
  • Loading branch information
k2la authored and merlimat committed Oct 26, 2018
1 parent 1a1cf22 commit d79499d
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ public void testProxyStats() throws Exception {
+ "/my-sub?subscriptionType=Failover";
final String producerUri = "ws://localhost:" + port + "/ws/v2/producer/persistent/" + topic + "/";
final String readerUri = "ws://localhost:" + port + "/ws/v2/reader/persistent/" + topic;
System.out.println(consumerUri+", "+producerUri);
System.out.println(consumerUri + ", " + producerUri);
URI consumeUri = URI.create(consumerUri);
URI produceUri = URI.create(producerUri);
URI readUri = URI.create(readerUri);
Expand Down Expand Up @@ -424,6 +424,47 @@ public void testProxyStats() throws Exception {
}
}

@Test(timeOut = 10000)
public void consumeMessagesInPartitionedTopicTest() throws Exception {
final String namespace = "my-property/my-ns";
final String topic = namespace + "/" + "my-topic7";
admin.topics().createPartitionedTopic("persistent://" + topic, 3);

final String subscription = "my-sub";
final String consumerUri = "ws://localhost:" + port + "/ws/v2/consumer/persistent/" + topic + "/" + subscription;
final String producerUri = "ws://localhost:" + port + "/ws/v2/producer/persistent/" + topic;

URI consumeUri = URI.create(consumerUri);
URI produceUri = URI.create(producerUri);

WebSocketClient consumeClient = new WebSocketClient();
WebSocketClient produceClient = new WebSocketClient();

SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
SimpleProducerSocket produceSocket = new SimpleProducerSocket();

try {
produceClient.start();
ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
producerFuture.get();
produceSocket.sendMessage(100);
} finally {
stopWebSocketClient(produceClient);
}

Thread.sleep(500);

try {
consumeClient.start();
ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
consumerFuture.get();
} finally {
stopWebSocketClient(consumeClient);
}
}

private void verifyTopicStat(Client client, String baseUrl, String topic) {
String statUrl = baseUrl + topic + "/stats";
WebTarget webTarget = client.target(statUrl);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.io.Serializable;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;

/**
* Opaque unique identifier of a single message
Expand Down Expand Up @@ -49,6 +50,10 @@ public static MessageId fromByteArray(byte[] data) throws IOException {
return MessageIdImpl.fromByteArray(data);
}

public static MessageId fromByteArrayWithTopic(byte[] data, TopicName topicName) throws IOException {
return MessageIdImpl.fromByteArrayWithTopic(data, topicName);
}

public static final MessageId earliest = new MessageIdImpl(-1, -1, -1);

public static final MessageId latest = new MessageIdImpl(Long.MAX_VALUE, Long.MAX_VALUE, -1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
import org.apache.pulsar.shaded.com.google.protobuf.v241.UninitializedMessageException;
Expand Down Expand Up @@ -113,6 +114,36 @@ public static MessageId fromByteArray(byte[] data) throws IOException {
return messageId;
}

public static MessageId fromByteArrayWithTopic(byte[] data, TopicName topicName) throws IOException {
checkNotNull(data);
ByteBufCodedInputStream inputStream = ByteBufCodedInputStream.get(Unpooled.wrappedBuffer(data, 0, data.length));
PulsarApi.MessageIdData.Builder builder = PulsarApi.MessageIdData.newBuilder();

PulsarApi.MessageIdData idData;
try {
idData = builder.mergeFrom(inputStream, null).build();
} catch (UninitializedMessageException e) {
throw new IOException(e);
}

MessageId messageId;
if (idData.hasBatchIndex()) {
messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(),
idData.getBatchIndex());
} else {
messageId = new MessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition());
}
if (idData.getPartition() > -1 && topicName != null) {
messageId = new TopicMessageIdImpl(
topicName.getPartition(idData.getPartition()).toString(), topicName.toString(), messageId);
}

inputStream.recycle();
builder.recycle();
idData.recycle();
return messageId;
}

// batchIndex is -1 if message is non-batched message and has the batchIndex for a batch message
protected byte[] toByteArray(int batchIndex) {
MessageIdData.Builder builder = MessageIdData.newBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.pulsar.client.api.PulsarClientException.ConsumerBusyException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ConsumerAck;
Expand Down Expand Up @@ -220,7 +221,7 @@ public void onWebSocketText(String message) {
MessageId msgId;
try {
ConsumerAck ack = ObjectMapperFactory.getThreadLocal().readValue(message, ConsumerAck.class);
msgId = MessageId.fromByteArray(Base64.getDecoder().decode(ack.messageId));
msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(ack.messageId), topic);
} catch (IOException e) {
log.warn("Failed to deserialize message id: {}", message, e);
close(WebSocketError.FailedToDeserializeFromJSON);
Expand Down

0 comments on commit d79499d

Please sign in to comment.