Skip to content

Commit

Permalink
Bug fixes/Improvement for notify pending receive method (apache#3337)
Browse files Browse the repository at this point in the history
### Motivation

Prevent 2 bugs and refactoring for method [notifyPendingReceivedCallback()](https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L912) 

### Modifications

  - Bugfix interceptor missed event when prefetch messages is disabled by
    consumer `ReceiverQueueSize == 0`.
  - Bugfix when message is null and no exception is present. Previously to this
    commit testing if message was null was made by `checkNotNull()` method leaving
    consumed future without completion resulting in a hanged future.
  - Refactor to favour simplicity and readability for control flow.
  - Add unit tests exploiting bug fixes.

### Result

Bug fixes and more maintainable code.
  • Loading branch information
lovelle authored and sijie committed Jan 15, 2019
1 parent 49a271f commit 4738991
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -912,27 +912,44 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, ByteBuf heade
* @param message
*/
void notifyPendingReceivedCallback(final Message<T> message, Exception exception) {
if (!pendingReceives.isEmpty()) {
// fetch receivedCallback from queue
CompletableFuture<Message<T>> receivedFuture = pendingReceives.poll();
if (exception == null) {
checkNotNull(message, "received message can't be null");
if (receivedFuture != null) {
if (conf.getReceiverQueueSize() == 0) {
// return message to receivedCallback
receivedFuture.complete(message);
} else {
// increase permits for available message-queue
Message<T> interceptMsg = beforeConsume(message);
messageProcessed(interceptMsg);
// return message to receivedCallback
listenerExecutor.execute(() -> receivedFuture.complete(interceptMsg));
}
}
} else {
listenerExecutor.execute(() -> receivedFuture.completeExceptionally(exception));
}
if (pendingReceives.isEmpty()) {
return;
}

// fetch receivedCallback from queue
final CompletableFuture<Message<T>> receivedFuture = pendingReceives.poll();
if (receivedFuture == null) {
return;
}

if (exception != null) {
listenerExecutor.execute(() -> receivedFuture.completeExceptionally(exception));
return;
}

if (message == null) {
IllegalStateException e = new IllegalStateException("received message can't be null");
listenerExecutor.execute(() -> receivedFuture.completeExceptionally(e));
return;
}

if (conf.getReceiverQueueSize() == 0) {
// call interceptor and complete received callback
interceptAndComplete(message, receivedFuture);
return;
}

// increase permits for available message-queue
messageProcessed(message);
// call interceptor and complete received callback
interceptAndComplete(message, receivedFuture);
}

private void interceptAndComplete(final Message<T> message, final CompletableFuture<Message<T>> receivedFuture) {
// call proper interceptor
final Message<T> interceptMessage = beforeConsume(message);
// return message to receivedCallback
listenerExecutor.execute(() -> receivedFuture.complete(interceptMessage));
}

private void triggerZeroQueueSizeListener(final Message<T> message) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.mockito.Mockito.*;

public class ConsumerImplTest {

private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private ConsumerImpl<ConsumerImpl> consumer;
private ConsumerConfigurationData consumerConf;

@BeforeMethod
public void setUp() {
consumerConf = new ConsumerConfigurationData<>();
ClientConfigurationData clientConf = new ClientConfigurationData();
PulsarClientImpl client = mock(PulsarClientImpl.class);
CompletableFuture<ClientCnx> clientCnxFuture = new CompletableFuture<>();
CompletableFuture<Consumer<ConsumerImpl>> subscribeFuture = new CompletableFuture<>();
String topic = "non-persistent://tenant/ns1/my-topic";

// Mock connection for grabCnx()
when(client.getConnection(anyString())).thenReturn(clientCnxFuture);
clientConf.setOperationTimeoutMs(100);
clientConf.setStatsIntervalSeconds(0);
when(client.getConfiguration()).thenReturn(clientConf);

consumerConf.setSubscriptionName("test-sub");
consumer = new ConsumerImpl<ConsumerImpl>(client, topic, consumerConf,
executorService, -1, subscribeFuture, null, null);
}

@Test(invocationTimeOut = 1000)
public void testNotifyPendingReceivedCallback_EmptyQueueNotThrowsException() {
consumer.notifyPendingReceivedCallback(null, null);
}

@Test(invocationTimeOut = 1000)
public void testNotifyPendingReceivedCallback_CompleteWithException() {
CompletableFuture<Message<ConsumerImpl>> receiveFuture = new CompletableFuture<>();
consumer.pendingReceives.add(receiveFuture);
Exception exception = new PulsarClientException.InvalidMessageException("some random exception");
consumer.notifyPendingReceivedCallback(null, exception);

try {
receiveFuture.join();
} catch (CompletionException e) {
// Completion exception must be the same we provided at calling time
Assert.assertEquals(e.getCause(), exception);
}

Assert.assertTrue(receiveFuture.isCompletedExceptionally());
}

@Test(invocationTimeOut = 1000)
public void testNotifyPendingReceivedCallback_CompleteWithExceptionWhenMessageIsNull() {
CompletableFuture<Message<ConsumerImpl>> receiveFuture = new CompletableFuture<>();
consumer.pendingReceives.add(receiveFuture);
consumer.notifyPendingReceivedCallback(null, null);

try {
receiveFuture.join();
} catch (CompletionException e) {
Assert.assertEquals("received message can't be null", e.getCause().getMessage());
}

Assert.assertTrue(receiveFuture.isCompletedExceptionally());
}

@Test(invocationTimeOut = 1000)
public void testNotifyPendingReceivedCallback_InterceptorsWorksWithPrefetchDisabled() {
CompletableFuture<Message<ConsumerImpl>> receiveFuture = new CompletableFuture<>();
MessageImpl message = mock(MessageImpl.class);
ConsumerImpl<ConsumerImpl> spy = spy(consumer);

consumer.pendingReceives.add(receiveFuture);
consumerConf.setReceiverQueueSize(0);
doReturn(message).when(spy).beforeConsume(any());
spy.notifyPendingReceivedCallback(message, null);
Message<ConsumerImpl> receivedMessage = receiveFuture.join();

verify(spy, times(1)).beforeConsume(message);
Assert.assertTrue(receiveFuture.isDone());
Assert.assertFalse(receiveFuture.isCompletedExceptionally());
Assert.assertEquals(receivedMessage, message);
}

@Test(invocationTimeOut = 1000)
public void testNotifyPendingReceivedCallback_WorkNormally() {
CompletableFuture<Message<ConsumerImpl>> receiveFuture = new CompletableFuture<>();
MessageImpl message = mock(MessageImpl.class);
ConsumerImpl<ConsumerImpl> spy = spy(consumer);

consumer.pendingReceives.add(receiveFuture);
doReturn(message).when(spy).beforeConsume(any());
doNothing().when(spy).messageProcessed(message);
spy.notifyPendingReceivedCallback(message, null);
Message<ConsumerImpl> receivedMessage = receiveFuture.join();

verify(spy, times(1)).beforeConsume(message);
verify(spy, times(1)).messageProcessed(message);
Assert.assertTrue(receiveFuture.isDone());
Assert.assertFalse(receiveFuture.isCompletedExceptionally());
Assert.assertEquals(receivedMessage, message);
}
}

0 comments on commit 4738991

Please sign in to comment.