Skip to content

Commit

Permalink
Ack response implement (apache#8161)
Browse files Browse the repository at this point in the history
## Motivation
Now acknowledge messages will not get response, but acknowledge messages with transaction will check the acknowledge whether conflict with other transaction in pendingack. We can commit or abort the transaction after we get the response.

## Modification
- We add requestId field in ack protocol and it type is optional.
- Server handle the ack command, if the command carry the requestId, server will return the response.
- In normal, we don't need to get response. Unless we ack with transaction.

## Compatibility
This PR is for transaction, and don't change any client api, so we don't need to think about the compatibility question.
  • Loading branch information
congbobo184 authored Oct 14, 2020
1 parent 757cd5a commit 59e0cfb
Show file tree
Hide file tree
Showing 21 changed files with 518 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;

/**
Expand Down Expand Up @@ -214,6 +215,8 @@ private static PulsarApi.ServerError getClientErrorCode(Throwable t, boolean che
return ServerError.InvalidTxnStatus;
} else if (t instanceof NotAllowedException) {
return ServerError.NotAllowedError;
} else if (t instanceof TransactionConflictException) {
return ServerError.TransactionConflict;
} else {
if (checkCauseIfUnknown) {
return getClientErrorCode(t.getCause(), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
Expand All @@ -60,6 +61,7 @@
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.stats.Rate;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.SafeCollectionUtils;
import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
import org.slf4j.Logger;
Expand Down Expand Up @@ -388,24 +390,22 @@ void doUnsubscribe(final long requestId) {
});
}

void messageAcked(CommandAck ack) {
CompletableFuture<Void> messageAcked(CommandAck ack) {
this.lastAckedTimestamp = System.currentTimeMillis();
Map<String,Long> properties = Collections.emptyMap();
if (ack.getPropertiesCount() > 0) {
properties = ack.getPropertiesList().stream()
.collect(Collectors.toMap((e) -> e.getKey(),
(e) -> e.getValue()));
.collect(Collectors.toMap(PulsarApi.KeyLongValue::getKey,
PulsarApi.KeyLongValue::getValue));
}

if (ack.getAckType() == AckType.Cumulative) {
if (ack.getMessageIdCount() != 1) {
log.warn("[{}] [{}] Received multi-message ack", subscription, consumerId);
return;
}

if (Subscription.isIndividualAckMode(subType)) {
log.warn("[{}] [{}] Received cumulative ack on shared subscription, ignoring", subscription, consumerId);
return;
}
PositionImpl position = PositionImpl.earliest;
if (ack.getMessageIdCount() == 1) {
Expand All @@ -418,7 +418,7 @@ void messageAcked(CommandAck ack) {
}
List<Position> positionsAcked = Collections.singletonList(position);
if (ack.hasTxnidMostBits() && ack.hasTxnidLeastBits()) {
transactionAcknowledge(ack.getTxnidMostBits(), ack.getTxnidLeastBits(), positionsAcked, AckType.Cumulative);
return transactionAcknowledge(ack.getTxnidMostBits(), ack.getTxnidLeastBits(), positionsAcked, AckType.Cumulative);
} else {
subscription.acknowledgeMessage(positionsAcked, AckType.Cumulative, properties);
}
Expand All @@ -445,24 +445,24 @@ void messageAcked(CommandAck ack) {
}
}
if (ack.hasTxnidMostBits() && ack.hasTxnidLeastBits()) {
transactionAcknowledge(ack.getTxnidMostBits(), ack.getTxnidLeastBits(), positionsAcked, AckType.Individual);
return transactionAcknowledge(ack.getTxnidMostBits(), ack.getTxnidLeastBits(), positionsAcked, AckType.Individual);
} else {
subscription.acknowledgeMessage(positionsAcked, AckType.Individual, properties);
}
}

return CompletableFuture.completedFuture(null);
}

private void transactionAcknowledge(long txnidMostBits, long txnidLeastBits,
private CompletableFuture<Void> transactionAcknowledge(long txnidMostBits, long txnidLeastBits,
List<Position> positionList, AckType ackType) {
if (subscription instanceof PersistentSubscription) {
TxnID txnID = new TxnID(txnidMostBits, txnidLeastBits);
try {
((PersistentSubscription) subscription).acknowledgeMessage(txnID, positionList, ackType);
} catch (TransactionConflictException e) {
log.error("Transaction acknowledge failed for txn " + txnID, e);
}
return ((PersistentSubscription) subscription).acknowledgeMessage(txnID, positionList, ackType);
} else {
log.error("Transaction acknowledge only support the `PersistentSubscription`.");
String error = "Transaction acknowledge only support the `PersistentSubscription`.";
log.error(error);
return FutureUtil.failedFuture(new TransactionConflictException(error));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1264,7 +1264,19 @@ protected void handleAck(CommandAck ack) {
CompletableFuture<Consumer> consumerFuture = consumers.get(ack.getConsumerId());

if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
consumerFuture.getNow(null).messageAcked(ack);
consumerFuture.getNow(null).messageAcked(ack).thenRun(() -> {
if (ack.hasRequestId()) {
ctx.writeAndFlush(Commands.newAckResponse(
ack.getRequestId(), null, null, ack.getConsumerId()));
}
}).exceptionally(e -> {
if (ack.hasRequestId()) {
ctx.writeAndFlush(Commands.newAckResponse(ack.getRequestId(),
BrokerServiceException.getClientErrorCode(e),
e.getMessage(), ack.getConsumerId()));
}
return null;
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
Expand Down Expand Up @@ -416,7 +415,7 @@ public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<St
* cumulative ack or try to single ack message already acked by any ongoing transaction.
* @throws IllegalArgumentException if try to cumulative ack but passed in multiple positions.
*/
public synchronized void acknowledgeMessage(TxnID txnId, List<Position> positions, AckType ackType) throws TransactionConflictException {
public synchronized CompletableFuture<Void> acknowledgeMessage(TxnID txnId, List<Position> positions, AckType ackType) {
checkArgument(txnId != null, "TransactionID can not be null.");
if (AckType.Cumulative == ackType) {
// Check if another transaction is already using cumulative ack on this subscription.
Expand All @@ -425,14 +424,14 @@ public synchronized void acknowledgeMessage(TxnID txnId, List<Position> position
" try to cumulative ack message while transaction:" + this.pendingCumulativeAckTxnId +
" already cumulative acked messages.";
log.error(errorMsg);
throw new TransactionConflictException(errorMsg);
return FutureUtil.failedFuture(new TransactionConflictException(errorMsg));
}

if (positions.size() != 1) {
String errorMsg = "[" + topicName + "][" + subName + "] Transaction:" + txnId +
" invalid cumulative ack received with multiple message ids.";
log.error(errorMsg);
throw new IllegalArgumentException(errorMsg);
return FutureUtil.failedFuture(new TransactionConflictException(errorMsg));
}

Position position = positions.get(0);
Expand All @@ -443,7 +442,7 @@ public synchronized void acknowledgeMessage(TxnID txnId, List<Position> position
" try to cumulative ack position: " + position + " within range of cursor's " +
"markDeletePosition: " + cursor.getMarkDeletedPosition();
log.error(errorMsg);
throw new TransactionConflictException(errorMsg);
return FutureUtil.failedFuture(new TransactionConflictException(errorMsg));
}

if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -481,21 +480,22 @@ public synchronized void acknowledgeMessage(TxnID txnId, List<Position> position
String errorMsg = "[" + topicName + "][" + subName + "] Transaction:" + txnId +
" try to ack message:" + position + " in pending ack status.";
log.error(errorMsg);
throw new TransactionConflictException(errorMsg);
return FutureUtil.failedFuture(new TransactionConflictException(errorMsg));
}

// If try to ack message already acked by committed transaction or normal acknowledge, throw exception.
if (((ManagedCursorImpl) cursor).isMessageDeleted(position)) {
String errorMsg = "[" + topicName + "][" + subName + "] Transaction:" + txnId +
" try to ack message:" + position + " already acked before.";
log.error(errorMsg);
throw new TransactionConflictException(errorMsg);
return FutureUtil.failedFuture(new TransactionConflictException(errorMsg));
}

pendingAckMessageForCurrentTxn.add(position);
this.pendingAckMessages.add(position);
}
}
return CompletableFuture.completedFuture(null);
}

private final MarkDeleteCallback markDeleteCallback = new MarkDeleteCallback() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

Expand All @@ -62,7 +63,6 @@
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.ZooKeeper;
Expand Down Expand Up @@ -165,7 +165,7 @@ public void teardown() throws Exception {
}

@Test
public void testCanAcknowledgeAndCommitForTransaction() throws TransactionConflictException {
public void testCanAcknowledgeAndCommitForTransaction() {
List<Position> expectedSinglePositions = new ArrayList<>();
expectedSinglePositions.add(new PositionImpl(1, 1));
expectedSinglePositions.add(new PositionImpl(1, 3));
Expand Down Expand Up @@ -208,7 +208,7 @@ public void testCanAcknowledgeAndCommitForTransaction() throws TransactionConfli
}

@Test
public void testCanAcknowledgeAndAbortForTransaction() throws TransactionConflictException, BrokerServiceException {
public void testCanAcknowledgeAndAbortForTransaction() throws BrokerServiceException, InterruptedException {
List<Position> positions = new ArrayList<>();
positions.add(new PositionImpl(2, 1));
positions.add(new PositionImpl(2, 3));
Expand Down Expand Up @@ -242,10 +242,10 @@ public void testCanAcknowledgeAndAbortForTransaction() throws TransactionConflic

// Can not single ack message already acked.
try {
persistentSubscription.acknowledgeMessage(txnID2, positions, AckType.Individual);
persistentSubscription.acknowledgeMessage(txnID2, positions, AckType.Individual).get();
fail("Single acknowledge for transaction2 should fail. ");
} catch (TransactionConflictException e) {
assertEquals(e.getMessage(),"[persistent://prop/use/ns-abc/successTopic][subscriptionName] " +
} catch (ExecutionException e) {
assertEquals(e.getCause().getMessage(),"[persistent://prop/use/ns-abc/successTopic][subscriptionName] " +
"Transaction:(1,2) try to ack message:2:1 in pending ack status.");
}

Expand All @@ -254,11 +254,10 @@ public void testCanAcknowledgeAndAbortForTransaction() throws TransactionConflic

// Can not cumulative ack message for another txn.
try {
persistentSubscription.acknowledgeMessage(txnID2, positions, AckType.Cumulative);
persistentSubscription.acknowledgeMessage(txnID2, positions, AckType.Cumulative).get();
fail("Cumulative acknowledge for transaction2 should fail. ");
} catch (TransactionConflictException e) {
System.out.println(e.getMessage());
assertEquals(e.getMessage(),"[persistent://prop/use/ns-abc/successTopic][subscriptionName] " +
} catch (ExecutionException e) {
assertEquals(e.getCause().getMessage(),"[persistent://prop/use/ns-abc/successTopic][subscriptionName] " +
"Transaction:(1,2) try to cumulative ack message while transaction:(1,1) already cumulative acked messages.");
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import lombok.Cleanup;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;

import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;


public class ConsumerAckResponseTest extends ProducerConsumerBase {

private final static TransactionImpl transaction = mock(TransactionImpl.class);

@BeforeClass
public void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
doReturn(1L).when(transaction).getTxnIdLeastBits();
doReturn(1L).when(transaction).getTxnIdMostBits();
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
doReturn(completableFuture).when(transaction).registerAckOp(any());
doNothing().when(transaction).registerAckedTopic(any(), any());

Thread.sleep(1000 * 3);
}

@AfterClass
public void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void testAckResponse() throws PulsarClientException, InterruptedException, ExecutionException {
String topic = "testAckResponse";
@Cleanup
Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
.topic(topic)
.enableBatching(false)
.create();
@Cleanup
ConsumerImpl<Integer> consumer = (ConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName("sub")
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.subscribe();
producer.send(1);
producer.send(2);
try {
consumer.acknowledgeAsync(new MessageIdImpl(1, 1, 1), transaction).get();
fail();
} catch (ExecutionException e) {
Assert.assertTrue(e.getCause() instanceof PulsarClientException.TransactionConflictException);
}
Message<Integer> message = consumer.receive();
consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.transaction;
package org.apache.pulsar.client.impl;

import static java.nio.charset.StandardCharsets.UTF_8;

Expand Down Expand Up @@ -57,7 +57,7 @@
* End to end transaction test.
*/
@Slf4j
public class EndToEndTest extends TransactionTestBase {
public class TransactionEndToEndTest extends TransactionTestBase {

private final static int TOPIC_PARTITION = 3;

Expand Down Expand Up @@ -248,9 +248,9 @@ private void txnAckTest(boolean batchEnable, int maxBatchSize,
Message<byte[]> message = consumer.receive();
Assert.assertNotNull(message);
log.info("receive msgId: {}", message.getMessageId());
consumer.acknowledgeAsync(message.getMessageId(), txn);
consumer.acknowledgeAsync(message.getMessageId(), txn).get();
}
Thread.sleep(2000);
Thread.sleep(1000);

consumer.redeliverUnacknowledgedMessages();

Expand All @@ -266,7 +266,7 @@ private void txnAckTest(boolean batchEnable, int maxBatchSize,
for (int i = 0; i < messageCnt; i++) {
message = consumer.receive(2, TimeUnit.SECONDS);
Assert.assertNotNull(message);
consumer.acknowledgeAsync(message.getMessageId(), commitTxn);
consumer.acknowledgeAsync(message.getMessageId(), commitTxn).get();
log.info("receive msgId: {}", message.getMessageId());
}

Expand Down
Loading

0 comments on commit 59e0cfb

Please sign in to comment.