Skip to content

Commit

Permalink
Fix flaky MessageIdTest and introduce some testing improvements (apac…
Browse files Browse the repository at this point in the history
…he#9286)

* Refactor PulsarClient initialization and lifecycle management in tests

* Add getter and setter to access remoteEndpointProtocolVersion field

- it makes it easier to override for tests

* Add hooks for overriding the producer implementation in PulsarClientImpl

- useful for tests. Instead of relying on Mockito, there's a pure Java
  way to inject behavior to producer implementations for testing purposes

* Introduce PulsarTestClient that contains ways to prevent race conditions and test flakiness

- provides features for simulating failure conditions, for example
  the case of the broker connection disconnecting

* Add solution for using Enums classes as source for TestNG DataProvider

* Fix flaky MessageIdTest and move checksum related tests to new class

* Fix NPE in PartitionedProducerImplTest
  • Loading branch information
lhotari authored Jan 26, 2021
1 parent 9335c49 commit 19e6546
Show file tree
Hide file tree
Showing 28 changed files with 845 additions and 539 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests;

import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.testng.annotations.DataProvider;

/**
* TestNG DataProvider for passing all Enum values as parameters to a test method.
*
* Supports currently a single Enum parameter for a test method.
*/
public abstract class EnumValuesDataProvider {
@DataProvider
public static final Object[][] values(Method testMethod) {
Class<?> enumClass = Arrays.stream(testMethod.getParameterTypes())
.findFirst()
.filter(Class::isEnum)
.orElseThrow(() -> new IllegalArgumentException("The test method should have an enum parameter."));
return toDataProviderArray((Class<? extends Enum<?>>) enumClass);
}

/*
* Converts all values of an Enum class to a TestNG DataProvider object array
*/
public static Object[][] toDataProviderArray(Class<? extends Enum<?>> enumClass) {
Enum<?>[] enumValues = enumClass.getEnumConstants();
return Stream.of(enumValues)
.map(enumValue -> new Object[]{enumValue})
.collect(Collectors.toList())
.toArray(new Object[0][]);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests;

import static org.testng.Assert.assertEquals;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import org.testng.annotations.Test;

public class EnumValuesDataProviderTest {
enum Sample {
A, B, C
}

@Test(dataProviderClass = EnumValuesDataProvider.class, dataProvider = "values")
void testEnumValuesProvider(Sample sample) {
System.out.println(sample);
}

@Test
void shouldContainAllEnumValues() {
verifyTestParameters(EnumValuesDataProvider.toDataProviderArray(Sample.class));
}

@Test
void shouldDetermineEnumValuesFromMethod() {
Method testMethod = Arrays.stream(getClass().getDeclaredMethods())
.filter(method -> method.getName().equals("testEnumValuesProvider"))
.findFirst()
.get();
verifyTestParameters(EnumValuesDataProvider.values(testMethod));
}

private void verifyTestParameters(Object[][] testParameters) {
Set<Sample> enumValuesFromDataProvider = Arrays.stream(testParameters)
.map(element -> element[0])
.map(Sample.class::cast)
.collect(Collectors.toSet());
assertEquals(enumValuesFromDataProvider, new HashSet<>(Arrays.asList(Sample.values())));
}

@Test(expectedExceptions = IllegalArgumentException.class)
void shouldFailIfEnumParameterIsMissing() {
Method testMethod = Arrays.stream(getClass().getDeclaredMethods())
.filter(method -> method.getName().equals("shouldFailIfEnumParameterIsMissing"))
.findFirst()
.get();
EnumValuesDataProvider.values(testMethod);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,10 @@ protected void setup() throws Exception {

lookupUrl = new URI(pulsar.getWebServiceAddress());

pulsarClient = PulsarClient.builder()
replacePulsarClient(PulsarClient.builder()
.serviceUrl(lookupUrl.toString())
.statsInterval(0, TimeUnit.SECONDS)
.authentication(authSasl).build();
.authentication(authSasl));

// set admin auth, to verify admin web resources
Map<String, String> clientSaslConfig = Maps.newHashMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ ByteBuf createConsumerStatsResponse(Consumer consumer, long requestId) {
private void completeConnect(int clientProtoVersion, String clientVersion) {
ctx.writeAndFlush(Commands.newConnected(clientProtoVersion, maxMessageSize));
state = State.Connected;
remoteEndpointProtocolVersion = clientProtoVersion;
setRemoteEndpointProtocolVersion(clientProtoVersion);
if (isNotBlank(clientVersion) && !clientVersion.contains(" ") /* ignore default version: pulsar client */) {
this.clientVersion = clientVersion.intern();
}
Expand Down Expand Up @@ -662,7 +662,8 @@ public void refreshAuthenticationCredentials() {
try {
AuthData brokerData = authState.refreshAuthentication();

ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData, remoteEndpointProtocolVersion));
ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData,
getRemoteEndpointProtocolVersion()));
if (log.isDebugEnabled()) {
log.debug("[{}] Sent auth challenge to client to refresh credentials with method: {}.",
remoteAddress, authMethod);
Expand Down Expand Up @@ -1958,7 +1959,7 @@ protected void interceptCommand(BaseCommand command) throws InterceptException {
public void closeProducer(Producer producer) {
// removes producer-connection from map and send close command to producer
safelyRemoveProducer(producer);
if (remoteEndpointProtocolVersion >= v5.getValue()) {
if (getRemoteEndpointProtocolVersion() >= v5.getValue()) {
ctx.writeAndFlush(Commands.newCloseProducer(producer.getProducerId(), -1L));
} else {
close();
Expand All @@ -1970,7 +1971,7 @@ public void closeProducer(Producer producer) {
public void closeConsumer(Consumer consumer) {
// removes consumer-connection from map and send close command to consumer
safelyRemoveConsumer(consumer);
if (remoteEndpointProtocolVersion >= v5.getValue()) {
if (getRemoteEndpointProtocolVersion() >= v5.getValue()) {
ctx.writeAndFlush(Commands.newCloseConsumer(consumer.consumerId(), -1L));
} else {
close();
Expand Down Expand Up @@ -2224,7 +2225,7 @@ boolean hasConsumer(long consumerId) {

@Override
public boolean isBatchMessageCompatibleVersion() {
return remoteEndpointProtocolVersion >= ProtocolVersion.v4.getValue();
return getRemoteEndpointProtocolVersion() >= ProtocolVersion.v4.getValue();
}

boolean supportsAuthenticationRefresh() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.lang.reflect.Field;
import java.net.URI;
import java.net.URL;
Expand All @@ -40,7 +38,6 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;

import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
Expand All @@ -53,6 +50,7 @@
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.ClusterData;
Expand All @@ -69,7 +67,7 @@
import org.slf4j.LoggerFactory;

/**
* Base class for all tests that need a Pulsar instance without a ZK and BK cluster
* Base class for all tests that need a Pulsar instance without a ZK and BK cluster.
*/
@PowerMockIgnore(value = {"org.slf4j.*", "com.sun.org.apache.xerces.*" })
public abstract class MockedPulsarServiceBaseTest {
Expand Down Expand Up @@ -117,17 +115,29 @@ protected final void internalSetup(ServiceConfiguration serviceConfiguration) th
internalSetup();
}

protected final void internalSetup(boolean isPreciseDispatcherFlowControl) throws Exception {
init(isPreciseDispatcherFlowControl);
lookupUrl = new URI(brokerUrl.toString());
if (isTcpLookup) {
lookupUrl = new URI(pulsar.getBrokerServiceUrl());
}
pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
protected PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException {
ClientBuilder clientBuilder =
PulsarClient.builder()
.serviceUrl(url)
.statsInterval(intervalInSecs, TimeUnit.SECONDS);
customizeNewPulsarClientBuilder(clientBuilder);
return createNewPulsarClient(clientBuilder);
}

protected PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException {
return PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, TimeUnit.SECONDS).build();
protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {

}

protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder) throws PulsarClientException {
return clientBuilder.build();
}

protected PulsarClient replacePulsarClient(ClientBuilder clientBuilder) throws PulsarClientException {
if (pulsarClient != null) {
pulsarClient.shutdown();
}
pulsarClient = createNewPulsarClient(clientBuilder);
return pulsarClient;
}

protected final void internalSetupForStatsTest() throws Exception {
Expand Down Expand Up @@ -163,27 +173,6 @@ protected final void init() throws Exception {
startBroker();
}

protected final void init(boolean isPreciseDispatcherFlowControl) throws Exception {
this.conf.setBrokerServicePort(Optional.of(0));
this.conf.setBrokerServicePortTls(Optional.of(0));
this.conf.setAdvertisedAddress("localhost");
this.conf.setWebServicePort(Optional.of(0));
this.conf.setWebServicePortTls(Optional.of(0));
this.conf.setPreciseDispatcherFlowControl(isPreciseDispatcherFlowControl);
this.conf.setNumExecutorThreadPoolSize(5);

sameThreadOrderedSafeExecutor = new SameThreadOrderedSafeExecutor();
bkExecutor = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat("mock-pulsar-bk")
.setUncaughtExceptionHandler((thread, ex) -> log.info("Uncaught exception", ex))
.build());

mockZooKeeper = createMockZooKeeper();
mockBookKeeper = createMockBookKeeper(mockZooKeeper, bkExecutor);

startBroker();
}

protected final void internalCleanup() throws Exception {
// if init fails, some of these could be null, and if so would throw
// an NPE in shutdown, obscuring the real error
Expand All @@ -196,7 +185,7 @@ protected final void internalCleanup() throws Exception {
pulsarClient = null;
}
if (pulsar != null) {
pulsar.close();
stopBroker();
pulsar = null;
}
if (mockBookKeeper != null) {
Expand Down Expand Up @@ -244,6 +233,8 @@ protected void restartBroker() throws Exception {
}

protected void stopBroker() throws Exception {
log.info("Stopping Pulsar broker. brokerServiceUrl: {} webServiceAddress: {}", pulsar.getBrokerServiceUrl(),
pulsar.getWebServiceAddress());
pulsar.close();
pulsar = null;
// Simulate cleanup of ephemeral nodes
Expand Down Expand Up @@ -274,6 +265,8 @@ protected PulsarService startBroker(ServiceConfiguration conf) throws Exception
conf.setAuthorizationEnabled(true);
pulsar.start();
conf.setAuthorizationEnabled(isAuthorizationEnabled);
log.info("Pulsar started. brokerServiceUrl: {} webServiceAddress: {}", pulsar.getBrokerServiceUrl(),
pulsar.getWebServiceAddress());

return pulsar;
}
Expand Down Expand Up @@ -363,9 +356,9 @@ public void reallyShutdown() {
public CompletableFuture<ZooKeeper> create(String serverList, SessionType sessionType,
int zkSessionTimeoutMillis) {

if (serverList != null &&
(serverList.equalsIgnoreCase(conf.getConfigurationStoreServers())
|| serverList.equalsIgnoreCase(GLOBAL_DUMMY_VALUE))) {
if (serverList != null
&& (serverList.equalsIgnoreCase(conf.getConfigurationStoreServers())
|| serverList.equalsIgnoreCase(GLOBAL_DUMMY_VALUE))) {
return CompletableFuture.completedFuture(mockZooKeeperGlobal);
}

Expand Down Expand Up @@ -408,7 +401,8 @@ public static boolean retryStrategically(Predicate<Void> predicate, int retryCou
return false;
}

public static void setFieldValue(Class<?> clazz, Object classObj, String fieldName, Object fieldValue) throws Exception {
public static void setFieldValue(Class<?> clazz, Object classObj, String fieldName,
Object fieldValue) throws Exception {
Field field = clazz.getDeclaredField(fieldName);
field.setAccessible(true);
field.set(classObj, fieldValue);
Expand All @@ -418,7 +412,8 @@ protected static ServiceConfiguration getDefaultConf() {
ServiceConfiguration configuration = new ServiceConfiguration();
configuration.setAdvertisedAddress("localhost");
configuration.setClusterName(configClusterName);
configuration.setAdvertisedAddress("localhost"); // there are TLS tests in here, they need to use localhost because of the certificate
// there are TLS tests in here, they need to use localhost because of the certificate
configuration.setAdvertisedAddress("localhost");
configuration.setManagedLedgerCacheSizeMB(8);
configuration.setActiveConsumerFailoverDelayTimeMillis(0);
configuration.setDefaultNumberOfNamespaceBundles(1);
Expand All @@ -435,4 +430,4 @@ protected static ServiceConfiguration getDefaultConf() {
}

private static final Logger log = LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ protected final void internalSetup(Authentication auth) throws Exception {
} else {
lookupUrl = pulsar.getBrokerServiceUrlTls();
}
pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl).statsInterval(0, TimeUnit.SECONDS)
replacePulsarClient(PulsarClient.builder().serviceUrl(lookupUrl).statsInterval(0, TimeUnit.SECONDS)
.tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).allowTlsInsecureConnection(true).authentication(auth)
.enableTls(true).build();
.enableTls(true));
}

@AfterMethod(alwaysRun = true)
Expand Down Expand Up @@ -241,9 +241,8 @@ public void testAnonymousSyncProducerAndConsumer(int batchMessageDelayMs) throws
EnumSet.allOf(AuthAction.class));

// setup the client
pulsarClient.close();
pulsarClient = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl())
.operationTimeout(1, TimeUnit.SECONDS).build();
replacePulsarClient(PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl())
.operationTimeout(1, TimeUnit.SECONDS));

// unauthorized topic test
Exception pulsarClientException = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,11 @@ protected void setupClient() throws Exception {
admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrlTls.toString())
.tlsTrustCertsFilePath(TLS_MIM_TRUST_CERT_FILE_PATH).allowTlsInsecureConnection(true)
.authentication(authTls).build());
pulsarClient = PulsarClient.builder()
replacePulsarClient(PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrlTls())
.statsInterval(0, TimeUnit.SECONDS)
.tlsTrustCertsFilePath(TLS_MIM_TRUST_CERT_FILE_PATH).allowTlsInsecureConnection(true)
.authentication(authTls).enableTls(true).enableTlsHostnameVerification(hostnameVerificationEnabled)
.build();
.authentication(authTls).enableTls(true).enableTlsHostnameVerification(hostnameVerificationEnabled));

admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));

Expand Down
Loading

0 comments on commit 19e6546

Please sign in to comment.