Skip to content

Commit

Permalink
[Issue 5585][pulsar-client] Fix producer Semaphore release error (apa…
Browse files Browse the repository at this point in the history
…che#5587)

### Motivation

Fixes apache#5585 

This PR is try to fix the dead lock issue which mentioned in apache#5585. The core problem is the semaphore release count calculation in replicate scenario.

Replicator send a batch message(from the original producer) without batcher, so that the batch message acquire 1 each batch message but release it by number of messages in the batch. This PR will handle the semaphore release in replicate scenario.

### Modifications

Release only one semaphore in replicate scenario.
  • Loading branch information
weishuisheng authored and sijie committed Nov 11, 2019
1 parent 006f291 commit f2f801a
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import lombok.Cleanup;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.util.FutureUtil;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ProducerSemaphoreTest extends ProducerConsumerBase {

@Override
@BeforeMethod
public void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@Override
@AfterMethod
public void cleanup() throws Exception {
super.internalCleanup();
}

@Test(timeOut = 30000)
public void testProducerSemaphoreAcquireAndRelease() throws PulsarClientException, ExecutionException, InterruptedException {

final int pendingQueueSize = 100;

@Cleanup
ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
.topic("testProducerSemaphoreAcquire")
.maxPendingMessages(pendingQueueSize)
.enableBatching(false)
.create();

final int messages = 10;
final List<CompletableFuture<MessageId>> futures = new ArrayList<>(messages);
for (int i = 0; i < messages; i++) {
futures.add(producer.newMessage().value(("Semaphore-test-" + i).getBytes()).sendAsync());
}

FutureUtil.waitForAll(futures).get();
Assert.assertEquals(producer.getSemaphore().availablePermits(), pendingQueueSize);
futures.clear();

// Simulate replicator, non batching message but `numMessagesInBatch` of message metadata > 1
for (int i = 0; i < messages / 2; i++) {
PulsarApi.MessageMetadata.Builder builder = PulsarApi.MessageMetadata.newBuilder();
builder.setNumMessagesInBatch(10);
MessageImpl<byte[]> msg = MessageImpl.create(builder, ByteBuffer.wrap(new byte[0]), Schema.BYTES);
futures.add(producer.sendAsync(msg));
}
FutureUtil.waitForAll(futures).get();

// Here must ensure that the semaphore available permits is 0
Assert.assertEquals(producer.getSemaphore().availablePermits(), pendingQueueSize);

// Acquire 5 and not wait the send ack call back
for (int i = 0; i < messages / 2; i++) {
producer.newMessage().value(("Semaphore-test-" + i).getBytes()).sendAsync();
}

// Here must ensure that the Semaphore a acquired 5
Assert.assertEquals(producer.getSemaphore().availablePermits(), pendingQueueSize - messages / 2);

}

/**
* We use semaphore to limit the pending send, so we must ensure that the thread of sending message never block
* at the pending message queue. If not, the dead lock might occur. Here is the related issue to describe the
* dead lock happens {https://github.com/apache/pulsar/issues/5585}
*/
@Test(timeOut = 30000)
public void testEnsureNotBlockOnThePendingQueue() throws PulsarClientException {

final int pendingQueueSize = 10;

@Cleanup
ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
.topic("testProducerSemaphoreAcquire")
.maxPendingMessages(pendingQueueSize)
.enableBatching(false)
.blockIfQueueFull(true)
.create();

final int messages = 20;
final List<CompletableFuture<MessageId>> futures = new ArrayList<>(messages);

// Simulate replicator, non batching message but `numMessagesInBatch` of message metadata > 1
for (int i = 0; i < messages; i++) {
PulsarApi.MessageMetadata.Builder builder = PulsarApi.MessageMetadata.newBuilder();
builder.setNumMessagesInBatch(10);
MessageImpl<byte[]> msg = MessageImpl.create(builder, ByteBuffer.wrap(new byte[0]), Schema.BYTES);
futures.add(producer.sendAsync(msg));
}

FutureUtil.waitForAll(futures);
futures.clear();

for (int i = 0; i < messages; i++) {
futures.add(producer.newMessage().value(("Semaphore-test-" + i).getBytes()).sendAsync());
}
Assert.assertEquals(producer.getSemaphore().availablePermits(), 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static org.apache.pulsar.common.protocol.Commands.hasChecksum;
import static org.apache.pulsar.common.protocol.Commands.readChecksum;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Queues;

import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -795,7 +796,7 @@ void ackReceived(ClientCnx cnx, long sequenceId, long ledgerId, long entryId) {
log.debug("[{}] [{}] Received ack for msg {} ", topic, producerName, sequenceId);
}
pendingMessages.remove();
semaphore.release(op.numMessagesInBatch);
releaseSemaphoreForSendOp(op);
callback = true;
pendingCallbacks.add(op);
}
Expand All @@ -819,6 +820,10 @@ void ackReceived(ClientCnx cnx, long sequenceId, long ledgerId, long entryId) {
}
}

private void releaseSemaphoreForSendOp(OpSendMsg op) {
semaphore.release(isBatchMessagingEnabled() ? op.numMessagesInBatch : 1);
}

/**
* Checks message checksum to retry if message was corrupted while sending to broker. Recomputes checksum of the
* message header-payload again.
Expand All @@ -845,7 +850,7 @@ protected synchronized void recoverChecksumError(ClientCnx cnx, long sequenceId)
if (corrupted) {
// remove message from pendingMessages queue and fail callback
pendingMessages.remove();
semaphore.release(op.numMessagesInBatch);
releaseSemaphoreForSendOp(op);
try {
op.callback.sendComplete(
new PulsarClientException.ChecksumException("Checksum failed on corrupt message"));
Expand Down Expand Up @@ -1267,8 +1272,9 @@ public void run(Timeout timeout) throws Exception {
private void failPendingMessages(ClientCnx cnx, PulsarClientException ex) {
if (cnx == null) {
final AtomicInteger releaseCount = new AtomicInteger();
final boolean batchMessagingEnabled = isBatchMessagingEnabled();
pendingMessages.forEach(op -> {
releaseCount.addAndGet(op.numMessagesInBatch);
releaseCount.addAndGet(batchMessagingEnabled ? op.numMessagesInBatch: 1);
try {
// Need to protect ourselves from any exception being thrown in the future handler from the
// application
Expand All @@ -1280,12 +1286,14 @@ private void failPendingMessages(ClientCnx cnx, PulsarClientException ex) {
ReferenceCountUtil.safeRelease(op.cmd);
op.recycle();
});
semaphore.release(releaseCount.get());

pendingMessages.clear();
pendingCallbacks.clear();
if (isBatchMessagingEnabled()) {
semaphore.release(releaseCount.get());
if (batchMessagingEnabled) {
failPendingBatchMessages(ex);
}

} else {
// If we have a connection, we schedule the callback and recycle on the event loop thread to avoid any
// race condition since we also write the message on the socket from this thread
Expand All @@ -1305,9 +1313,9 @@ private void failPendingBatchMessages(PulsarClientException ex) {
if (batchMessageContainer.isEmpty()) {
return;
}
int numMessagesInBatch = batchMessageContainer.getNumMessagesInBatch();
semaphore.release(numMessagesInBatch);
final int numMessagesInBatch = batchMessageContainer.getNumMessagesInBatch();
batchMessageContainer.discard(ex);
semaphore.release(numMessagesInBatch);
}

TimerTask batchMessageAndSendTask = new TimerTask() {
Expand Down Expand Up @@ -1410,12 +1418,12 @@ private void processOpSendMsg(OpSendMsg op) {
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
semaphore.release(op.numMessagesInBatch);
releaseSemaphoreForSendOp(op);
if (op != null) {
op.callback.sendComplete(new PulsarClientException(ie));
}
} catch (Throwable t) {
semaphore.release(op.numMessagesInBatch);
releaseSemaphoreForSendOp(op);
log.warn("[{}] [{}] error while closing out batch -- {}", topic, producerName, t);
if (op != null) {
op.callback.sendComplete(new PulsarClientException(t));
Expand Down Expand Up @@ -1533,5 +1541,10 @@ void grabCnx() {
this.connectionHandler.grabCnx();
}

@VisibleForTesting
Semaphore getSemaphore() {
return semaphore;
}

private static final Logger log = LoggerFactory.getLogger(ProducerImpl.class);
}

0 comments on commit f2f801a

Please sign in to comment.