Skip to content

Commit

Permalink
Feature - support seek() on Reader (apache#4031)
Browse files Browse the repository at this point in the history
*Motivation*

 fix apache#3976

According to what was discussed in pull apache#3983 it would be an acceptable solution
to add seek() command to Reader in order to reset a non durable cursor after
Reader instance was build.

*Modifications*

  - Bugfix reset() by timestamp on a non-durable consumer, previously the
    cached cursor was not present, therefore the state set by reset() was missed
    resulting in a reset() at the beginning of the cursor instead of a reset()
    at the expected position.
  - Copy seek() commands to Reader interface from Consumer interface.
  - Fix inconsistency with lastDequeuedMessage field after seek() command was
    performed successfully.
  - Fix consumer discarding messages on receive (after seek() command) due to
    messages being present on acknowledge grouping tacker.
  • Loading branch information
lovelle authored and sijie committed Apr 23, 2019
1 parent bd7b0cf commit c70438c
Show file tree
Hide file tree
Showing 8 changed files with 237 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -850,8 +850,23 @@ public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cu
checkManagedLedgerIsOpen();
checkFenced();

return new NonDurableCursorImpl(bookKeeper, config, this, cursorName,
ManagedCursor cachedCursor = cursors.get(cursorName);
if (cachedCursor != null) {
if (log.isDebugEnabled()) {
log.debug("[{}] Cursor was already created {}", name, cachedCursor);
}
return cachedCursor;
}

NonDurableCursorImpl cursor = new NonDurableCursorImpl(bookKeeper, config, this, cursorName,
(PositionImpl) startCursorPosition);

log.info("[{}] Opened new cursor: {}", name, cursor);
synchronized (this) {
cursors.add(cursor);
}

return cursor;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,4 +510,126 @@ public void testReaderNonDurableIsAbleToSeekRelativeTime() throws Exception {
reader.close();
producer.close();
}

@Test
public void testReaderIsAbleToSeekWithTimeOnBeginningOfTopic() throws Exception {
final String topicName = "persistent://my-property/my-ns/ReaderSeekWithTimeOnBeginningOfTopic";
final int numOfMessage = 10;

Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName).create();

for (int i = 0; i < numOfMessage; i++) {
producer.send(String.format("msg num %d", i).getBytes());
}

Reader<byte[]> reader = pulsarClient.newReader().topic(topicName)
.startMessageId(MessageId.earliest).create();

assertTrue(reader.hasMessageAvailable());

// Read all messages the first time
for (int i = 0; i < numOfMessage; i++) {
Message<byte[]> message = reader.readNext();
Assert.assertEquals(message.getData(), String.format("msg num %d", i).getBytes());
}

assertFalse(reader.hasMessageAvailable());

// Perform cursor reset by time
reader.seek(RelativeTimeUtil.parseRelativeTimeInSeconds("-1m"));

// Read all messages a second time after seek()
for (int i = 0; i < numOfMessage; i++) {
Message<byte[]> message = reader.readNext();
Assert.assertEquals(message.getData(), String.format("msg num %d", i).getBytes());
}

// Reader should be finished
assertTrue(reader.isConnected());
assertFalse(reader.hasMessageAvailable());
assertEquals(((ReaderImpl) reader).getConsumer().numMessagesInQueue(), 0);

reader.close();
producer.close();
}

@Test
public void testReaderIsAbleToSeekWithMessageIdOnMiddleOfTopic() throws Exception {
final String topicName = "persistent://my-property/my-ns/ReaderSeekWithMessageIdOnMiddleOfTopic";
final int numOfMessage = 100;
final int halfMessages = numOfMessage / 2;

Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName).create();

for (int i = 0; i < numOfMessage; i++) {
producer.send(String.format("msg num %d", i).getBytes());
}

Reader<byte[]> reader = pulsarClient.newReader().topic(topicName)
.startMessageId(MessageId.earliest).create();

assertTrue(reader.hasMessageAvailable());

// Read all messages the first time
MessageId midmessageToSeek = null;
for (int i = 0; i < numOfMessage; i++) {
Message<byte[]> message = reader.readNext();
Assert.assertEquals(message.getData(), String.format("msg num %d", i).getBytes());

if (i == halfMessages) {
midmessageToSeek = message.getMessageId();
}
}

assertFalse(reader.hasMessageAvailable());

// Perform cursor reset by MessageId to half of the topic
reader.seek(midmessageToSeek);

// Read all halved messages after seek()
for (int i = halfMessages + 1; i < numOfMessage; i++) {
Message<byte[]> message = reader.readNext();
Assert.assertEquals(message.getData(), String.format("msg num %d", i).getBytes());
}

// Reader should be finished
assertTrue(reader.isConnected());
assertFalse(reader.hasMessageAvailable());
assertEquals(((ReaderImpl) reader).getConsumer().numMessagesInQueue(), 0);

reader.close();
producer.close();
}

@Test
public void testReaderIsAbleToSeekWithTimeOnMiddleOfTopic() throws Exception {
final String topicName = "persistent://my-property/my-ns/ReaderIsAbleToSeekWithTimeOnMiddleOfTopic";
final int numOfMessage = 10;
final int halfMessages = numOfMessage / 2;

Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName).create();

long l = System.currentTimeMillis();
for (int i = 0; i < numOfMessage; i++) {
producer.send(String.format("msg num %d", i).getBytes());
Thread.sleep(100);
}

Reader<byte[]> reader = pulsarClient.newReader().topic(topicName)
.startMessageId(MessageId.earliest).create();

int plusTime = (halfMessages + 1) * 100;
reader.seek(l + plusTime);

for (int i = halfMessages; i < numOfMessage; i++) {
Message<byte[]> message = reader.readNext();
Assert.assertEquals(message.getData(), String.format("msg num %d", i).getBytes());
}

reader.close();
producer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,63 @@ public interface Reader<T> extends Closeable {
* @return Whether the reader is connected to the broker
*/
boolean isConnected();

/**
* Reset the subscription associated with this reader to a specific message id.
* <p>
*
* The message id can either be a specific message or represent the first or last messages in the topic.
* <p>
* <ul>
* <li><code>MessageId.earliest</code> : Reset the reader on the earliest message available in the topic
* <li><code>MessageId.latest</code> : Reset the reader on the latest message in the topic
* </ul>
*
* Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on
* the individual partitions.
*
* @param messageId the message id where to reposition the reader
*/
void seek(MessageId messageId) throws PulsarClientException;

/**
* Reset the subscription associated with this reader to a specific message publish time.
*
* Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on
* the individual partitions.
*
* @param timestamp the message publish time where to reposition the reader
*/
void seek(long timestamp) throws PulsarClientException;

/**
* Reset the subscription associated with this reader to a specific message id.
* <p>
*
* The message id can either be a specific message or represent the first or last messages in the topic.
* <p>
* <ul>
* <li><code>MessageId.earliest</code> : Reset the reader on the earliest message available in the topic
* <li><code>MessageId.latest</code> : Reset the reader on the latest message in the topic
* </ul>
*
* Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on
* the individual partitions.
*
* @param messageId the message id where to position the reader
* @return a future to track the completion of the seek operation
*/
CompletableFuture<Void> seekAsync(MessageId messageId);

/**
* Reset the subscription associated with this reader to a specific message publish time.
*
* Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on
* the individual partitions.
*
* @param timestamp
* the message publish time where to position the reader
* @return a future to track the completion of the seek operation
*/
CompletableFuture<Void> seekAsync(long timestamp);
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ public interface AcknowledgmentsGroupingTracker extends AutoCloseable {
@Override
void close();

void flushAndClean();
}
Original file line number Diff line number Diff line change
Expand Up @@ -1377,6 +1377,9 @@ public CompletableFuture<Void> seekAsync(long timestamp) {

cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
log.info("[{}][{}] Successfully reset subscription to publish time {}", topic, subscription, timestamp);
acknowledgmentsGroupingTracker.flushAndClean();
lastDequeuedMessage = MessageId.earliest;
incomingMessages.clear();
seekFuture.complete(null);
}).exceptionally(e -> {
log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());
Expand Down Expand Up @@ -1408,6 +1411,9 @@ public CompletableFuture<Void> seekAsync(MessageId messageId) {

cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
log.info("[{}][{}] Successfully reset subscription to message id {}", topic, subscription, messageId);
acknowledgmentsGroupingTracker.flushAndClean();
lastDequeuedMessage = messageId;
incomingMessages.clear();
seekFuture.complete(null);
}).exceptionally(e -> {
log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,9 @@ public void flush() {
public void close() {
// no-op
}

@Override
public void flushAndClean() {
// no-op
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,13 @@ public void flush() {
}
}

@Override
public void flushAndClean() {
flush();
lastCumulativeAck = (MessageIdImpl) MessageId.earliest;
pendingIndividualAcks.clear();
}

@Override
public void close() {
flush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderListener;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
Expand Down Expand Up @@ -156,4 +149,24 @@ public CompletableFuture<Boolean> hasMessageAvailableAsync() {
public boolean isConnected() {
return consumer.isConnected();
}

@Override
public void seek(MessageId messageId) throws PulsarClientException {
consumer.seek(messageId);
}

@Override
public void seek(long timestamp) throws PulsarClientException {
consumer.seek(timestamp);
}

@Override
public CompletableFuture<Void> seekAsync(MessageId messageId) {
return consumer.seekAsync(messageId);
}

@Override
public CompletableFuture<Void> seekAsync(long timestamp) {
return consumer.seekAsync(timestamp);
}
}

0 comments on commit c70438c

Please sign in to comment.