Skip to content

Commit

Permalink
[FLINK-26177][Connector/pulsar] Use mocked pulsar runtime instead of …
Browse files Browse the repository at this point in the history
…embedded runtime and enable tests.
  • Loading branch information
syhily authored and fapaul committed Mar 15, 2022
1 parent 3bb6fa3 commit 1572908
Show file tree
Hide file tree
Showing 22 changed files with 745 additions and 173 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
org.apache.flink.connector.pulsar.source.PulsarSourceITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#
#Thu Mar 03 12:42:13 CST 2022
Tests\ inheriting\ from\ AbstractTestBase\ should\ have\ name\ ending\ with\ ITCase=3ac3a1dc-681f-4213-9990-b7b3298a20bc
ITCASE\ tests\ should\ use\ a\ MiniCluster\ resource\ or\ extension=f4d91193-72ba-4ce4-ad83-98f780dce581
17 changes: 16 additions & 1 deletion flink-connectors/flink-connector-pulsar/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,22 @@ under the License.

<!-- Pulsar bundles the latest bookkeeper & zookeeper, -->
<!-- we don't override the version here. -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>testmocks</artifactId>
<version>${pulsar.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
</exclusion>
<exclusion>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-testng</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-broker</artifactId>
Expand Down Expand Up @@ -236,7 +252,6 @@ under the License.
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skip>true</skip>
<!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
<forkCount>1</forkCount>
<argLine>-Xms256m -Xmx2048m -Dmvn.forkNumber=${surefire.forkNumber}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,13 @@ void writeRecordsToPulsar(DeliveryGuarantee guarantee) throws Exception {

ControlSource source =
new ControlSource(
sharedObjects, operator(), topic, guarantee, counts, Duration.ofMinutes(5));
sharedObjects,
operator(),
topic,
guarantee,
counts,
Duration.ofMillis(50),
Duration.ofMinutes(5));
PulsarSink<String> sink =
PulsarSink.builder()
.setServiceUrl(operator().serviceUrl())
Expand All @@ -79,8 +85,11 @@ void writeRecordsToPulsar(DeliveryGuarantee guarantee) throws Exception {
.build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(PARALLELISM);
env.enableCheckpointing(100L);
if (guarantee != DeliveryGuarantee.NONE) {
env.enableCheckpointing(500L);
}
env.addSource(source).sinkTo(sink);
env.execute();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.flink.util.UserCodeClassLoader;

import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

Expand All @@ -65,16 +64,9 @@ class PulsarWriterTest extends PulsarTestSuiteBase {

private static final SinkWriter.Context CONTEXT = new MockSinkWriterContext();

@Test
void writeMessageWithGuarantee() throws Exception {
writeMessageWithoutGuarantee(EXACTLY_ONCE);
}

@ParameterizedTest
@EnumSource(
value = DeliveryGuarantee.class,
names = {"AT_LEAST_ONCE", "NONE"})
void writeMessageWithoutGuarantee(DeliveryGuarantee guarantee) throws Exception {
@EnumSource(DeliveryGuarantee.class)
void writeMessages(DeliveryGuarantee guarantee) throws Exception {
String topic = randomAlphabetic(10);
operator().createTopic(topic, 8);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,38 +33,15 @@
import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
import org.apache.flink.streaming.api.CheckpointingMode;

import org.junit.jupiter.api.Disabled;

/** Unite test class for {@link PulsarSource}. */
@SuppressWarnings("unused")
class PulsarSourceITCase extends SourceTestSuiteBase<String> {

@Disabled // TODO: remove override after FLINK-26177 is fixed
@Override
public void testScaleUp(
TestEnvironment testEnv,
DataStreamSourceExternalContext<String> externalContext,
CheckpointingMode semantic)
throws Exception {
super.testScaleUp(testEnv, externalContext, semantic);
}

@Disabled // TODO: remove override after FLINK-26177 is fixed
@Override
public void testScaleDown(
TestEnvironment testEnv,
DataStreamSourceExternalContext<String> externalContext,
CheckpointingMode semantic)
throws Exception {
super.testScaleDown(testEnv, externalContext, semantic);
}

// Defines test environment on Flink MiniCluster
@TestEnv MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();

// Defines pulsar running environment
@TestExternalSystem
PulsarTestEnvironment pulsar = new PulsarTestEnvironment(PulsarRuntime.embedded());
PulsarTestEnvironment pulsar = new PulsarTestEnvironment(PulsarRuntime.mock());

@TestSemantics
CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};
Expand All @@ -78,4 +55,22 @@ public void testScaleDown(
@TestContext
PulsarTestContextFactory<String, MultipleTopicConsumingContext> multipleTopic =
new PulsarTestContextFactory<>(pulsar, MultipleTopicConsumingContext::new);

@Override
public void testScaleUp(
TestEnvironment testEnv,
DataStreamSourceExternalContext<String> externalContext,
CheckpointingMode semantic)
throws Exception {
super.testScaleUp(testEnv, externalContext, semantic);
}

@Override
public void testScaleDown(
TestEnvironment testEnv,
DataStreamSourceExternalContext<String> externalContext,
CheckpointingMode semantic)
throws Exception {
super.testScaleDown(testEnv, externalContext, semantic);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@
import org.apache.flink.core.testutils.CommonTestUtils;

import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.junit.jupiter.api.TestTemplate;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
Expand Down Expand Up @@ -139,22 +140,6 @@ private void setupSourceReader(
reader.notifyNoMoreSplits();
}

private void pollUntilReadExpectedNumberOfRecordsAndValidate(
PulsarSourceReaderBase<Integer> reader,
TestingReaderOutput<Integer> output,
int expectedRecords,
String topicNameWithPartition)
throws Exception {
pollUntil(
reader,
output,
() -> output.getEmittedRecords().size() == expectedRecords,
"The output didn't poll enough records before timeout.");
reader.close();
verifyAllMessageAcknowledged(expectedRecords, topicNameWithPartition);
assertThat(output.getEmittedRecords()).hasSize(expectedRecords);
}

private void pollUntil(
PulsarSourceReaderBase<Integer> reader,
ReaderOutput<Integer> output,
Expand All @@ -177,15 +162,19 @@ private void pollUntil(
}

private void verifyAllMessageAcknowledged(int expectedMessages, String partitionName)
throws PulsarAdminException {
TopicStats topicStats = operator().admin().topics().getStats(partitionName, true, true);
// verify if the messages has been consumed
Map<String, ? extends SubscriptionStats> subscriptionStats = topicStats.getSubscriptions();
assertThat(subscriptionStats).hasSizeGreaterThan(0);
subscriptionStats.forEach(
(subscription, stats) -> {
assertThat(stats.getUnackedMessages()).isZero();
assertThat(stats.getMsgOutCounter()).isEqualTo(expectedMessages);
});
throws PulsarAdminException, PulsarClientException {

Consumer<byte[]> consumer =
operator()
.client()
.newConsumer()
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
.subscriptionName("verify-message")
.topic(partitionName)
.subscribe();

assertThat(((MessageIdImpl) consumer.getLastMessageId()).getEntryId())
.isEqualTo(expectedMessages - 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -92,11 +91,6 @@ void beforeEach(String topicName) {
operator().setupTopic(topicName, Schema.INT32, () -> random.nextInt(20));
}

@AfterEach
void afterEach(String topicName) {
operator().deleteTopic(topicName);
}

@TestTemplate
void assignZeroSplitsCreatesZeroSubscription(
PulsarSourceReaderBase<Integer> reader, Boundedness boundedness, String topicName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public abstract class PulsarTestSuiteBase {
* pulsar broker. Override this method when needs.
*/
protected PulsarRuntime runtime() {
return PulsarRuntime.embedded();
return PulsarRuntime.mock();
}

/** Operate pulsar by acquiring a runtime operator. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.flink.testutils.junit.SharedObjectsExtension;
import org.apache.flink.testutils.junit.SharedReference;

import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.slf4j.Logger;
Expand Down Expand Up @@ -66,8 +68,10 @@ public ControlSource(
String topic,
DeliveryGuarantee guarantee,
int messageCounts,
Duration interval,
Duration timeout) {
MessageGenerator generator = new MessageGenerator(topic, guarantee, messageCounts);
MessageGenerator generator =
new MessageGenerator(topic, guarantee, messageCounts, interval);
StopSignal signal = new StopSignal(operator, topic, messageCounts, timeout);

this.sharedGenerator = sharedObjects.add(generator);
Expand Down Expand Up @@ -134,12 +138,15 @@ private static class MessageGenerator implements Iterator<String> {
private final DeliveryGuarantee guarantee;
private final int messageCounts;
private final List<String> expectedRecords;
private final Duration interval;

public MessageGenerator(String topic, DeliveryGuarantee guarantee, int messageCounts) {
public MessageGenerator(
String topic, DeliveryGuarantee guarantee, int messageCounts, Duration interval) {
this.topic = topic;
this.guarantee = guarantee;
this.messageCounts = messageCounts;
this.expectedRecords = new ArrayList<>(messageCounts);
this.interval = interval;
}

@Override
Expand All @@ -158,6 +165,9 @@ public String next() {
+ "-"
+ randomAlphanumeric(10);
expectedRecords.add(content);

// Make sure the message was generated in the fixed interval.
Uninterruptibles.sleepUninterruptibly(interval);
return content;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
import org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime;
import org.apache.flink.connector.pulsar.testutils.runtime.embedded.PulsarEmbeddedRuntime;
import org.apache.flink.connector.pulsar.testutils.runtime.mock.PulsarMockRuntime;

import org.testcontainers.containers.GenericContainer;

Expand All @@ -39,18 +40,23 @@ public interface PulsarRuntime {
void tearDown();

/**
* Return a operator for operating this pulsar runtime. This operator predefined a set of
* Return an operator for operating this pulsar runtime. This operator predefined a set of
* extremely useful methods for Pulsar. You can easily add new methods in this operator.
*/
PulsarRuntimeOperator operator();

/** Create a Pulsar instance which would mock all the backends. */
static PulsarRuntime mock() {
return new PulsarMockRuntime();
}

/**
* Create a standalone Pulsar instance in test thread. We would start a embedded zookeeper and
* Create a standalone Pulsar instance in test thread. We would start an embedded zookeeper and
* bookkeeper. The stream storage for bookkeeper is disabled. The function worker is disabled on
* Pulsar broker.
*
* <p>This runtime would be faster than {@link #container()} and behaves the same like the
* {@link #container()}.
* <p>This runtime would be faster than {@link #container()} and behaves the same as the {@link
* #container()}.
*/
static PulsarRuntime embedded() {
return new PulsarEmbeddedRuntime();
Expand Down
Loading

0 comments on commit 1572908

Please sign in to comment.