Skip to content

Commit

Permalink
Support exclude the message when reset cursor by message ID (apache#8306
Browse files Browse the repository at this point in the history
)

Fixes apache#8259
### Motivation
Currently, when reset the cursor to a position, the broker will set the mark delete position to the previous position of the reset position. For some usecase, we don't want to consume the reset position again, so it's better to provide a way to reset the cursor to a specific position and exclude this position. So that the consumers under the subscription can start consume messages from the next position of the reset position.

### Modifications
Add a new API to exclude the message when reset cursor by message ID

### Verifying this change
SimpleProducerConsumerTest#testResetPosition
  • Loading branch information
315157973 authored Oct 21, 2020
1 parent a5b4146 commit 656bcb2
Show file tree
Hide file tree
Showing 18 changed files with 424 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2006,7 +2006,7 @@ private void internalCreateSubscriptionForNonPartitionedTopic(AsyncResponse asyn
}

protected void internalResetCursorOnPosition(AsyncResponse asyncResponse, String subName, boolean authoritative,
MessageIdImpl messageId) {
MessageIdImpl messageId, boolean isExcluded) {
if (topicName.isGlobal()) {
try {
validateGlobalNamespaceOwnership(namespaceName);
Expand Down Expand Up @@ -2038,7 +2038,9 @@ protected void internalResetCursorOnPosition(AsyncResponse asyncResponse, String
try {
PersistentSubscription sub = topic.getSubscription(subName);
Preconditions.checkNotNull(sub);
sub.resetCursor(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())).thenRun(() -> {
PositionImpl position = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
position = isExcluded ? position.getNext() : position;
sub.resetCursor(position).thenRun(() -> {
log.info("[{}][{}] successfully reset cursor on subscription {} to position {}", clientAppId(),
topicName, subName, messageId);
asyncResponse.resume(Response.noContent().build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ public void resetCursorOnPosition(@Suspended final AsyncResponse asyncResponse,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId) {
try {
validateTopicName(property, cluster, namespace, encodedTopic);
internalResetCursorOnPosition(asyncResponse, decode(encodedSubName), authoritative, messageId);
internalResetCursorOnPosition(asyncResponse, decode(encodedSubName), authoritative, messageId, false);
} catch (Exception e) {
resumeAsyncResponseExceptionally(asyncResponse, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin.v2;

import org.apache.pulsar.client.impl.ResetCursorData;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import static org.apache.pulsar.common.util.Codec.decode;

Expand Down Expand Up @@ -1216,15 +1217,17 @@ public void resetCursorOnPosition(
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(name = "messageId", value = "messageId to reset back to (ledgerId:entryId)")
MessageIdImpl messageId) {
ResetCursorData resetCursorData) {
try {
validateTopicName(tenant, namespace, encodedTopic);
internalResetCursorOnPosition(asyncResponse, decode(encodedSubName), authoritative, messageId);
internalResetCursorOnPosition(asyncResponse, decode(encodedSubName), authoritative
, new MessageIdImpl(resetCursorData.getLedgerId(), resetCursorData.getEntryId(), resetCursorData.getPartitionIndex())
, resetCursorData.isExcluded());
} catch (Exception e) {
resumeAsyncResponseExceptionally(asyncResponse, e);
}
}

@GET
@Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/position/{messagePosition}")
@ApiOperation(value = "Peek nth message on a topic subscription.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3468,4 +3468,54 @@ public void testReceiveAsyncCompletedWhenClosing() throws Exception {
}).start();
countDownLatch3.await();
}

@Test(timeOut = 20000)
public void testResetPosition() throws Exception {
final String topicName = "persistent://my-property/my-ns/testResetPosition";
final String subName = "my-sub";
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(false).topic(topicName).create();
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topicName).subscriptionName(subName).subscribe();
for (int i = 0; i < 50; i++) {
producer.send("msg" + i);
}
Message<String> lastMsg = null;
for (int i = 0; i < 10; i++) {
lastMsg = consumer.receive();
assertNotNull(lastMsg);
consumer.acknowledge(lastMsg);
}
MessageIdImpl lastMessageId = (MessageIdImpl)lastMsg.getMessageId();
consumer.close();
producer.close();

admin.topics().resetCursor(topicName, subName, lastMsg.getMessageId());
Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subName).subscribe();
Message<String> message = consumer2.receive(1, TimeUnit.SECONDS);
assertEquals(message.getMessageId(), lastMsg.getMessageId());
consumer2.close();

admin.topics().resetCursor(topicName, subName, lastMsg.getMessageId(), true);
Consumer<String> consumer3 = pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subName).subscribe();
message = consumer3.receive(1, TimeUnit.SECONDS);
assertNotEquals(message.getMessageId(), lastMsg.getMessageId());
MessageIdImpl messageId = (MessageIdImpl)message.getMessageId();
assertEquals(messageId.getEntryId() - 1, lastMessageId.getEntryId());
consumer3.close();

admin.topics().resetCursorAsync(topicName, subName, lastMsg.getMessageId(), true).get(3, TimeUnit.SECONDS);
Consumer<String> consumer4 = pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subName).subscribe();
message = consumer4.receive(1, TimeUnit.SECONDS);
assertNotEquals(message.getMessageId(), lastMsg.getMessageId());
messageId = (MessageIdImpl)message.getMessageId();
assertEquals(messageId.getEntryId() - 1, lastMessageId.getEntryId());
consumer4.close();

admin.topics().resetCursorAsync(topicName, subName, lastMsg.getMessageId()).get(3, TimeUnit.SECONDS);
Consumer<String> consumer5 = pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subName).subscribe();
message = consumer5.receive(1, TimeUnit.SECONDS);
assertEquals(message.getMessageId(), lastMsg.getMessageId());
consumer5.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1302,6 +1302,18 @@ void createSubscription(String topic, String subscriptionName, MessageId message
*/
void resetCursor(String topic, String subName, long timestamp) throws PulsarAdminException;

/**
* Reset cursor position on a topic subscription.
* <p/>
* and start consume messages from the next position of the reset position.
* @param topic
* @param subName
* @param messageId
* @param isExcluded
* @throws PulsarAdminException
*/
void resetCursor(String topic, String subName, MessageId messageId, boolean isExcluded) throws PulsarAdminException;

/**
* Reset cursor position on a topic subscription.
*
Expand All @@ -1314,6 +1326,18 @@ void createSubscription(String topic, String subscriptionName, MessageId message
*/
CompletableFuture<Void> resetCursorAsync(String topic, String subName, long timestamp);

/**
* Reset cursor position on a topic subscription.
* <p/>
* and start consume messages from the next position of the reset position.
* @param topic
* @param subName
* @param messageId
* @param isExcluded
* @return
*/
CompletableFuture<Void> resetCursorAsync(String topic, String subName, MessageId messageId, boolean isExcluded);

/**
* Reset cursor position on a topic subscription.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ResetCursorData;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
import org.apache.pulsar.common.api.proto.PulsarApi.SingleMessageMetadata;
Expand Down Expand Up @@ -1132,22 +1133,50 @@ public CompletableFuture<Void> resetCursorAsync(String topic, String subName, lo
@Override
public void resetCursor(String topic, String subName, MessageId messageId) throws PulsarAdminException {
try {
TopicName tn = validateTopic(topic);
String encodedSubName = Codec.encode(subName);
WebTarget path = topicPath(tn, "subscription", encodedSubName, "resetcursor");
request(path).post(Entity.entity(messageId, MediaType.APPLICATION_JSON),
ErrorData.class);
resetCursorAsync(topic, subName, messageId).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
} catch (Exception e) {
throw getApiException(e);
}
}

@Override
public void resetCursor(String topic, String subName, MessageId messageId
, boolean isExcluded) throws PulsarAdminException {
try {
resetCursorAsync(topic, subName, messageId, isExcluded).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
} catch (Exception e) {
throw getApiException(e);
}
}

@Override
public CompletableFuture<Void> resetCursorAsync(String topic, String subName, MessageId messageId) {
return resetCursorAsync(topic, subName, messageId, false);
}

@Override
public CompletableFuture<Void> resetCursorAsync(String topic, String subName
, MessageId messageId, boolean isExcluded) {
TopicName tn = validateTopic(topic);
String encodedSubName = Codec.encode(subName);
final WebTarget path = topicPath(tn, "subscription", encodedSubName, "resetcursor");
return asyncPostRequest(path, Entity.entity(messageId, MediaType.APPLICATION_JSON));
ResetCursorData resetCursorData = new ResetCursorData(messageId);
resetCursorData.setExcluded(isExcluded);
return asyncPostRequest(path, Entity.entity(resetCursorData, MediaType.APPLICATION_JSON));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.policies.data.AuthAction;
Expand Down Expand Up @@ -792,6 +793,12 @@ public boolean matches(Long timestamp) {

cmdTopics.run(split("last-message-id persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getLastMessageId(eq("persistent://myprop/clust/ns1/ds1"));

//cmd with option cannot be executed repeatedly.
cmdTopics = new CmdTopics(admin);
cmdTopics.run(split("reset-cursor persistent://myprop/clust/ns1/ds2 -s sub1 -m 1:1 -e"));
verify(mockTopics).resetCursor(eq("persistent://myprop/clust/ns1/ds2"), eq("sub1")
, eq(new MessageIdImpl(1, 1, -1)), eq(true));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -640,12 +640,20 @@ private class ResetCursor extends CliCommand {
"-m" }, description = "messageId to reset back to (ledgerId:entryId)", required = false)
private String resetMessageIdStr;

@Parameter(names = { "-e", "--exclude-reset-position" },
description = "Exclude the reset position, start consume messages from the next position.", required = false)
private boolean excludeResetPosition = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
if (isNotBlank(resetMessageIdStr)) {
MessageId messageId = validateMessageIdString(resetMessageIdStr);
topics.resetCursor(persistentTopic, subName, messageId);
if (excludeResetPosition) {
topics.resetCursor(persistentTopic, subName, messageId, true);
} else {
topics.resetCursor(persistentTopic, subName, messageId);
}
} else if (isNotBlank(resetTimeStr)) {
long resetTimeInMillis = TimeUnit.SECONDS
.toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(resetTimeStr));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;


import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.pulsar.client.api.MessageId;

@Data
@NoArgsConstructor
public class ResetCursorData {
protected long ledgerId;
protected long entryId;
protected int partitionIndex = -1;
protected boolean isExcluded = false;

public ResetCursorData(long ledgerId, long entryId) {
this.ledgerId = ledgerId;
this.entryId = entryId;
}

public ResetCursorData(long ledgerId, long entryId, boolean isExcluded) {
this.ledgerId = ledgerId;
this.entryId = entryId;
this.isExcluded = isExcluded;
}

public ResetCursorData(MessageId messageId) {
if (messageId instanceof MessageIdImpl) {
MessageIdImpl messageIdImpl = (MessageIdImpl) messageId;
this.ledgerId = messageIdImpl.getLedgerId();
this.entryId = messageIdImpl.getEntryId();
} else if (messageId instanceof TopicMessageIdImpl) {
throw new IllegalArgumentException("Not supported operation on partitioned-topic");
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.backwardscompatibility;


import org.apache.pulsar.tests.integration.topologies.ClientTestBase;
import org.testng.annotations.Test;

public class ClientTest2_2 extends PulsarStandaloneTestSuite2_2 {

private final ClientTestBase clientTestBase = new ClientTestBase();

@Test(dataProvider = "StandaloneServiceUrlAndHttpUrl")
public void testResetCursorCompatibility(String serviceUrl, String httpServiceUrl) throws Exception {
String topicName = generateTopicName("test-reset-cursor-compatibility", true);
clientTestBase.resetCursorCompatibility(serviceUrl, httpServiceUrl, topicName);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.backwardscompatibility;


import org.apache.pulsar.tests.integration.topologies.ClientTestBase;
import org.testng.annotations.Test;

public class ClientTest2_3 extends PulsarStandaloneTestSuite2_3 {

private final ClientTestBase clientTestBase = new ClientTestBase();

@Test(dataProvider = "StandaloneServiceUrlAndHttpUrl")
public void testResetCursorCompatibility(String serviceUrl, String httpServiceUrl) throws Exception {
String topicName = generateTopicName("test-reset-cursor-compatibility", true);
clientTestBase.resetCursorCompatibility(serviceUrl, httpServiceUrl, topicName);
}

}
Loading

0 comments on commit 656bcb2

Please sign in to comment.