Skip to content

Commit

Permalink
Fix flakiness in ProducerSemaphoreTest (apache#6285)
Browse files Browse the repository at this point in the history
The test was asserting on the value of the semaphore, but this value
depended on how quickly the broker responded.

I fixed this by making the producer's channel unreadable before sending
the messages. Even if the broker responds, the client won't see the
response until the channel is made readable again, so we can reliably
check the value of the semaphore.

For testEnsureNotBlockOnThePendingQueue, I switched it to throw on full
rather than block on full because whether something is blocking is
hard to assert in a test.

Finally, there was a bug in testEnsureNotBlockOnThePendingQueue where
waitForAll() was called but nothing was done with the returned
future. This is a really badly named utility, as the name implies that
the method itself does the waiting. It should be renamed.
  • Loading branch information
ivankelly authored Feb 11, 2020
1 parent e1f3409 commit 485f2d2
Showing 1 changed file with 88 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerSemaphoreTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(ProducerSemaphoreTest.class);

@Override
@BeforeMethod
Expand Down Expand Up @@ -65,34 +69,54 @@ public void testProducerSemaphoreAcquireAndRelease() throws PulsarClientExceptio

final int messages = 10;
final List<CompletableFuture<MessageId>> futures = new ArrayList<>(messages);
for (int i = 0; i < messages; i++) {
futures.add(producer.newMessage().value(("Semaphore-test-" + i).getBytes()).sendAsync());
producer.getClientCnx().channel().config().setAutoRead(false);
try {
for (int i = 0; i < messages; i++) {
futures.add(producer.newMessage().value(("Semaphore-test-" + i).getBytes()).sendAsync());
}
Assert.assertEquals(producer.getSemaphore().availablePermits(), pendingQueueSize - messages);
} finally {
producer.getClientCnx().channel().config().setAutoRead(true);
}

FutureUtil.waitForAll(futures).get();
Assert.assertEquals(producer.getSemaphore().availablePermits(), pendingQueueSize);
futures.clear();

// Simulate replicator, non batching message but `numMessagesInBatch` of message metadata > 1
for (int i = 0; i < messages / 2; i++) {
PulsarApi.MessageMetadata.Builder builder = PulsarApi.MessageMetadata.newBuilder();
builder.setNumMessagesInBatch(10);
MessageImpl<byte[]> msg = MessageImpl.create(builder, ByteBuffer.wrap(new byte[0]), Schema.BYTES);
futures.add(producer.sendAsync(msg));
producer.getClientCnx().channel().config().setAutoRead(false);
try {
for (int i = 0; i < messages / 2; i++) {
PulsarApi.MessageMetadata.Builder builder = PulsarApi.MessageMetadata.newBuilder();
builder.setNumMessagesInBatch(10);
MessageImpl<byte[]> msg = MessageImpl.create(builder, ByteBuffer.wrap(new byte[0]), Schema.BYTES);
futures.add(producer.sendAsync(msg));
}
Assert.assertEquals(producer.getSemaphore().availablePermits(), pendingQueueSize - messages/2);
} finally {
producer.getClientCnx().channel().config().setAutoRead(true);
}
FutureUtil.waitForAll(futures).get();
Assert.assertEquals(producer.getSemaphore().availablePermits(), pendingQueueSize);
futures.clear();

// Here must ensure that the semaphore available permits are back to pendingQueueSize
Assert.assertEquals(producer.getSemaphore().availablePermits(), pendingQueueSize);

// Acquire 5 and not wait the send ack call back
for (int i = 0; i < messages / 2; i++) {
producer.newMessage().value(("Semaphore-test-" + i).getBytes()).sendAsync();
}
producer.getClientCnx().channel().config().setAutoRead(false);
try {
for (int i = 0; i < messages / 2; i++) {
futures.add(producer.newMessage().value(("Semaphore-test-" + i).getBytes()).sendAsync());
}

// Here must ensure that the Semaphore has acquired 5 permits
Assert.assertEquals(producer.getSemaphore().availablePermits(), pendingQueueSize - messages / 2);
// Here must ensure that the Semaphore has acquired 5 permits
Assert.assertEquals(producer.getSemaphore().availablePermits(), pendingQueueSize - messages / 2);
} finally {
producer.getClientCnx().channel().config().setAutoRead(true);

}
FutureUtil.waitForAll(futures).get();
Assert.assertEquals(producer.getSemaphore().availablePermits(), pendingQueueSize);
}

/**
Expand All @@ -101,35 +125,71 @@ public void testProducerSemaphoreAcquireAndRelease() throws PulsarClientExceptio
* dead lock happens {https://github.com/apache/pulsar/issues/5585}
*/
// NOTE(review): this span is a rendered diff hunk without +/- markers, so lines from both
// sides of the change appear together (two method signatures, two `futures` declarations,
// two waitForAll calls). It is not compilable as shown; read it as "before and after".
//
// Purpose: fill the producer's pending queue (size pendingQueueSize) and check, via the
// producer semaphore's available permits, that one further send is rejected with
// ProducerQueueIsFullError, first for replicator-style messages and then for normal ones.
@Test(timeOut = 30000)
public void testEnsureNotBlockOnThePendingQueue() throws PulsarClientException {

public void testEnsureNotBlockOnThePendingQueue() throws Exception {
final int pendingQueueSize = 10;

@Cleanup
ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
.topic("testProducerSemaphoreAcquire")
.maxPendingMessages(pendingQueueSize)
.enableBatching(false)
// NOTE(review): the commit message says the test was switched to throw on full rather than
// block on full, so this line is presumably on the removed side of the diff -- confirm.
.blockIfQueueFull(true)
.create();

final int messages = 20;
final List<CompletableFuture<MessageId>> futures = new ArrayList<>(messages);
final List<CompletableFuture<MessageId>> futures = new ArrayList<>();

// Simulate replicator, non batching message but `numMessagesInBatch` of message metadata > 1
for (int i = 0; i < messages; i++) {
PulsarApi.MessageMetadata.Builder builder = PulsarApi.MessageMetadata.newBuilder();
builder.setNumMessagesInBatch(10);
MessageImpl<byte[]> msg = MessageImpl.create(builder, ByteBuffer.wrap(new byte[0]), Schema.BYTES);
futures.add(producer.sendAsync(msg));
// Test that when we fill the queue with "replicator" messages, we are notified
// (replicator itself would block)
Assert.assertEquals(producer.getSemaphore().availablePermits(), pendingQueueSize);
// Stop reading from the channel: per the commit message, broker responses are then not
// seen by the client until auto-read is re-enabled, so the permit count can be asserted
// without depending on how quickly the broker responds.
producer.getClientCnx().channel().config().setAutoRead(false);
try {
for (int i = 0; i < pendingQueueSize; i++) {
PulsarApi.MessageMetadata.Builder builder = PulsarApi.MessageMetadata.newBuilder();
builder.setNumMessagesInBatch(10);

MessageImpl<byte[]> msg = MessageImpl.create(builder, ByteBuffer.wrap(new byte[0]), Schema.BYTES);
futures.add(producer.sendAsync(msg));
}
// pendingQueueSize sends are outstanding, so no permits should remain.
Assert.assertEquals(producer.getSemaphore().availablePermits(), 0);
try {
PulsarApi.MessageMetadata.Builder builder = PulsarApi.MessageMetadata.newBuilder();
builder.setNumMessagesInBatch(10);

MessageImpl<byte[]> msg = MessageImpl.create(builder, ByteBuffer.wrap(new byte[0]), Schema.BYTES);
// One send beyond the queue size: expected to complete exceptionally.
producer.sendAsync(msg).get();
Assert.fail("Shouldn't be able to send message");
} catch (ExecutionException ee) {
Assert.assertEquals(ee.getCause().getClass(),
PulsarClientException.ProducerQueueIsFullError.class);
// The rejected send must not have changed the permit count.
Assert.assertEquals(producer.getSemaphore().availablePermits(), 0);
}
} finally {
// Always restore auto-read so the outstanding sends can be acknowledged.
producer.getClientCnx().channel().config().setAutoRead(true);
}

// NOTE(review): the returned future is discarded on the next line, so nothing is awaited;
// the commit message describes this as the bug, fixed by the `.get()` variant below it.
FutureUtil.waitForAll(futures);
FutureUtil.waitForAll(futures).get();
futures.clear();

for (int i = 0; i < messages; i++) {
futures.add(producer.newMessage().value(("Semaphore-test-" + i).getBytes()).sendAsync());
// Test that when we fill the queue with normal messages, we get an error
Assert.assertEquals(producer.getSemaphore().availablePermits(), pendingQueueSize);
// Same technique as above: pause channel reads while asserting on the semaphore.
producer.getClientCnx().channel().config().setAutoRead(false);
try {
for (int i = 0; i < pendingQueueSize; i++) {
futures.add(producer.newMessage().value(("Semaphore-test-" + i).getBytes()).sendAsync());
}
Assert.assertEquals(producer.getSemaphore().availablePermits(), 0);

try {
// NOTE(review): unlike the replicator case above, there is no Assert.fail(...) after this
// send, so this block does not fail if the send unexpectedly succeeds -- confirm intent.
producer.newMessage().value(("Semaphore-test-Q-full").getBytes()).sendAsync().get();
} catch (ExecutionException ee) {
Assert.assertEquals(ee.getCause().getClass(),
PulsarClientException.ProducerQueueIsFullError.class);
Assert.assertEquals(producer.getSemaphore().availablePermits(), 0);

}
} finally {
producer.getClientCnx().channel().config().setAutoRead(true);
}
// NOTE(review): auto-read was re-enabled just above, so responses may already be releasing
// permits when this runs; this assertion looks timing-dependent -- confirm or move it
// inside the try block.
Assert.assertEquals(producer.getSemaphore().availablePermits(), 0);
// Wait for all outstanding sends; afterwards every permit should be available again.
FutureUtil.waitForAll(futures).get();
Assert.assertEquals(producer.getSemaphore().availablePermits(), pendingQueueSize);
}
}

0 comments on commit 485f2d2

Please sign in to comment.