From 5d136a1132f8b8b76cdd49967b3ec4d422c0813e Mon Sep 17 00:00:00 2001 From: Andrey Yegorov <8622884+dlg99@users.noreply.github.com> Date: Wed, 4 Aug 2021 00:11:48 -0700 Subject: [PATCH] Fixing Debezium integration tests (#11154) Fixes #11099 ### Motivation Integration test for debezium connector to tun on CI ### Modifications Debezium integration test for postgres: added check for flush lsn updates Added pulsar-io tests to the CI --- .../workflows/ci-integration-pulsar-io.yaml | 122 ++++++++++++++++++ build/run_integration_group.sh | 7 +- .../integration/io/sources/SourceTester.java | 18 +++ .../DebeziumPostgreSqlSourceTester.java | 56 +++++++- .../debezium/PulsarDebeziumSourcesTest.java | 19 ++- .../PulsarIODebeziumSourceRunner.java | 78 +++++------ 6 files changed, 246 insertions(+), 54 deletions(-) create mode 100644 .github/workflows/ci-integration-pulsar-io.yaml diff --git a/.github/workflows/ci-integration-pulsar-io.yaml b/.github/workflows/ci-integration-pulsar-io.yaml new file mode 100644 index 0000000000000..2052e429c7517 --- /dev/null +++ b/.github/workflows/ci-integration-pulsar-io.yaml @@ -0,0 +1,122 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +name: CI - Integration - Pulsar-IO Sinks and Sources +on: + pull_request: + branches: + - master + push: + branches: + - branch-* + +env: + MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3 + +jobs: + + pulsar-io: + name: + runs-on: ubuntu-latest + timeout-minutes: 120 + + steps: + - name: checkout + uses: actions/checkout@v2 + + - name: Tune Runner VM + uses: ./.github/actions/tune-runner-vm + + - name: Detect changed files + id: changes + uses: apache/pulsar-test-infra/paths-filter@master + with: + filters: .github/changes-filter.yaml + + - name: Check changed files + id: check_changes + run: echo "::set-output name=docs_only::${{ fromJSON(steps.changes.outputs.all_count) == fromJSON(steps.changes.outputs.docs_count) && fromJSON(steps.changes.outputs.docs_count) > 0 }}" + + - name: Cache local Maven repository + if: ${{ steps.check_changes.outputs.docs_only != 'true' }} + uses: actions/cache@v2 + with: + path: | + ~/.m2/repository/*/*/* + !~/.m2/repository/org/apache/pulsar + key: ${{ runner.os }}-m2-dependencies-all-${{ hashFiles('**/pom.xml') }} + restore-keys: | + ${{ runner.os }}-m2-dependencies-core-modules-${{ hashFiles('**/pom.xml') }} + ${{ runner.os }}-m2-dependencies-core-modules- + + - name: Set up JDK 11 + uses: actions/setup-java@v2 + if: ${{ steps.check_changes.outputs.docs_only != 'true' }} + with: + distribution: 'adopt' + java-version: 11 + + - name: Replace maven's wagon-http version + if: ${{ steps.check_changes.outputs.docs_only != 'true' }} + run: sudo ./build/replace_maven-wagon-http-version.sh + + - name: clean disk + if: ${{ steps.check_changes.outputs.docs_only != 'true' }} + run: | + sudo swapoff -a + sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc + sudo apt clean + docker rmi $(docker images -q) -f + df -h + + - name: run install by skip tests + if: ${{ steps.check_changes.outputs.docs_only != 'true' }} + run: mvn -q -B -ntp clean install -DskipTests + + - name: build pulsar image + if: ${{ steps.check_changes.outputs.docs_only != 'true' }} + run: mvn -B -f docker/pulsar/pom.xml install -am -Pdocker,-main -DskipTests -Ddocker.nocache=true + + - name: build pulsar-all image + if: ${{ steps.check_changes.outputs.docs_only != 'true' }} + run: mvn -B -f docker/pulsar-all/pom.xml install -am -Pdocker,-main -DskipTests -Ddocker.nocache=true + + - name: build artifacts and docker image + if: ${{ steps.check_changes.outputs.docs_only != 'true' }} + run: mvn -B -f tests/docker-images/pom.xml install -am -Pdocker,-main -DskipTests + + - name: run integration tests + if: ${{ steps.check_changes.outputs.docs_only != 'true' }} + run: ./build/run_integration_group.sh PULSAR_IO + + - name: Upload container logs + uses: actions/upload-artifact@v2 + if: ${{ cancelled() || failure() }} + continue-on-error: true + with: + name: container-logs + path: tests/integration/target/container-logs + + - name: Upload surefire-reports + uses: actions/upload-artifact@v2 + if: ${{ cancelled() || failure() }} + continue-on-error: true + with: + name: surefire-reports + path: tests/integration/target/surefire-reports diff --git a/build/run_integration_group.sh b/build/run_integration_group.sh index 19d20cf1710da..b589202ef30cd 100755 --- a/build/run_integration_group.sh +++ b/build/run_integration_group.sh @@ -128,6 +128,11 @@ test_group_sql() { mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-sql.xml -DintegrationTests -DtestForkCount=1 -DtestReuseFork=false } +test_group_pulsar_io() { + mvn_run_integration_test --retry "$@" -DintegrationTestSuiteFile=pulsar-io-suite.xml -DintegrationTests -Dgroups=source + #mvn_run_integration_test --retry "$@" -DintegrationTestSuiteFile=pulsar-io-suite.xml -DintegrationTests -Dgroups=sink +} + echo "Test Group : $TEST_GROUP" test_group_function_name="test_group_$(echo "$TEST_GROUP" | tr '[:upper:]' '[:lower:]')" if [[ "$(LC_ALL=C type -t $test_group_function_name)" == "function" ]]; then @@ -135,4 +140,4 @@ if [[ "$(LC_ALL=C type -t $test_group_function_name)" == "function" ]]; then else echo "INVALID TEST GROUP" exit 1 -fi \ No newline at end of file +fi diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java index c2a7d9ad91c12..14f0bd452e27b 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java @@ -50,6 +50,8 @@ public abstract class SourceTester { protected final String sourceType; protected final Map sourceConfig; + protected int numEntriesToInsert = 1; + public static final Set DEBEZIUM_FIELD_SET = new HashSet() {{ add("before"); add("after"); @@ -85,11 +87,27 @@ public Map sourceConfig() { public void validateSourceResult(Consumer consumer, int number, String eventType, String converterClassName) throws Exception { + doPreValidationCheck(eventType); if (converterClassName.endsWith("AvroConverter")) { validateSourceResultAvro(consumer, number, eventType); } else { validateSourceResultJson(consumer, number, eventType); } + doPostValidationCheck(eventType); + } + + /** + * Execute before regular validation to check database-specific state. + */ + public void doPreValidationCheck(String eventType) { + log.info("pre-validation of {}", eventType); + } + + /** + * Execute after regular validation to check database-specific state. + */ + public void doPostValidationCheck(String eventType) { + log.info("post-validation of {}", eventType); } public void validateSourceResultJson(Consumer> consumer, int number, String eventType) throws Exception { diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java index 4b66d00709d23..e4596c3900e5c 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java @@ -20,13 +20,20 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.tests.integration.containers.DebeziumPostgreSqlContainer; import org.apache.pulsar.tests.integration.containers.PulsarContainer; +import org.apache.pulsar.tests.integration.docker.ContainerExecResult; import org.apache.pulsar.tests.integration.io.sources.SourceTester; import org.apache.pulsar.tests.integration.topologies.PulsarCluster; +import org.testng.Assert; import java.io.Closeable; import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; /** * A tester for testing Debezium Postgresql source. @@ -49,9 +56,19 @@ public class DebeziumPostgreSqlSourceTester extends SourceTester confirmedFlushLsn = new AtomicReference<>("not read yet"); + public DebeziumPostgreSqlSourceTester(PulsarCluster cluster) { super(NAME); this.pulsarCluster = cluster; + /* + todo (possibly solvable by debezium upgrade?): figure out why last message is lost with larger numEntriesToInsert. + I.e. numEntriesToInsert = 100 results in 99 events from debezium 1.0.0, 300 results in 299 events. + 10 is handled ok. + Likely this is related to https://issues.redhat.com/browse/DBZ-2288 + */ + this.numEntriesToInsert = 10; + pulsarServiceUrl = "pulsar://pulsar-proxy:" + PulsarContainer.BROKER_PORT; sourceConfig.put("database.hostname", DebeziumPostgreSqlContainer.NAME); @@ -81,31 +98,56 @@ public void prepareSource() { @Override public void prepareInsertEvent() throws Exception { - this.debeziumPostgresqlContainer.execCmd("/bin/bash", "-c", - "psql -h 127.0.0.1 -U postgres -d postgres -c \"select * from inventory.products;\""); this.debeziumPostgresqlContainer.execCmd("/bin/bash", "-c", "psql -h 127.0.0.1 -U postgres -d postgres " + "-c \"insert into inventory.products(name, description, weight) " + "values('test-debezium', 'description', 10);\""); + this.debeziumPostgresqlContainer.execCmd("/bin/bash", "-c", + "psql -h 127.0.0.1 -U postgres -d postgres "+ + "-c \"select count(1), max(id) from inventory.products where name='test-debezium' and weight=10;\""); } @Override public void prepareDeleteEvent() throws Exception { - this.debeziumPostgresqlContainer.execCmd("/bin/bash", "-c", - "psql -h 127.0.0.1 -U postgres -d postgres -c \"select * from inventory.products;\""); this.debeziumPostgresqlContainer.execCmd("/bin/bash", "-c", "psql -h 127.0.0.1 -U postgres -d postgres " + "-c \"delete from inventory.products where name='test-debezium';\""); + this.debeziumPostgresqlContainer.execCmd("/bin/bash", "-c", + "psql -h 127.0.0.1 -U postgres -d postgres -c \"select count(1) from inventory.products where name='test-debezium';\""); } @Override public void prepareUpdateEvent() throws Exception { - this.debeziumPostgresqlContainer.execCmd("/bin/bash", "-c", - "psql -h 127.0.0.1 -U postgres -d postgres -c \"select * from inventory.products;\""); this.debeziumPostgresqlContainer.execCmd("/bin/bash", "-c", "psql -h 127.0.0.1 -U postgres -d postgres " + "-c \"update inventory.products " + "set description='test-update-description', weight='20' where name='test-debezium';\""); + this.debeziumPostgresqlContainer.execCmd("/bin/bash", "-c", + "psql -h 127.0.0.1 -U postgres -d postgres -c " + + "\"select count(1) from inventory.products where name='test-debezium' and weight=20;\""); + } + + @Override + public void doPostValidationCheck(String eventType) { + super.doPostValidationCheck(eventType); + /* + confirmed_flush_lsn in pg_replication_slots table has to change, + otherwise postgres won't truncate WAL and the disk space will grow. + I.e. upgrade from debezium 1.0.0 to 1.0.3 resulted in confirmed_flush_lsn + not updating in insert-heavy load. + */ + try { + ContainerExecResult res = debeziumPostgresqlContainer.execCmd("/bin/bash", "-c", + "psql -h 127.0.0.1 -U postgres -d postgres -c \"select confirmed_flush_lsn from pg_replication_slots;\""); + res.assertNoStderr(); + String lastConfirmedFlushLsn = res.getStdout(); + log.info("Current confirmedFlushLsn: \n{} \nLast confirmedFlushLsn: \n{}", + confirmedFlushLsn.get(), lastConfirmedFlushLsn); + org.junit.Assert.assertNotEquals(confirmedFlushLsn.get(), lastConfirmedFlushLsn); + confirmedFlushLsn.set(lastConfirmedFlushLsn); + } catch (Exception e) { + Assert.fail("failed to get flush lsn", e); + } } @Override @@ -117,7 +159,7 @@ public Map produceSourceMessages(int numMessages) { @Override public void close() { if (pulsarCluster != null) { - pulsarCluster.stopService(DebeziumPostgreSqlContainer.NAME, debeziumPostgresqlContainer); + PulsarCluster.stopService(DebeziumPostgreSqlContainer.NAME, debeziumPostgresqlContainer); } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java index 56dbe38a27fc8..2e88dad12abcb 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java @@ -38,7 +38,7 @@ @Slf4j public class PulsarDebeziumSourcesTest extends PulsarIOTestBase { - protected final AtomicInteger testId = new AtomicInteger(0); + protected final AtomicInteger testId = new AtomicInteger(0); @Test(groups = "source") public void testDebeziumMySqlSourceJson() throws Exception { @@ -104,13 +104,13 @@ private void testDebeziumMySqlConnect(String converterClassName, boolean jsonWit sourceTester.setServiceContainer(mySQLContainer); PulsarIODebeziumSourceRunner runner = new PulsarIODebeziumSourceRunner(pulsarCluster, functionRuntimeType.toString(), - converterClassName, tenant, namespace, sourceName, outputTopicName, numMessages, jsonWithEnvelope, - consumeTopicName, client); + converterClassName, tenant, namespace, sourceName, outputTopicName, numMessages, jsonWithEnvelope, + consumeTopicName, client); runner.testSource(sourceTester); } - private void testDebeziumPostgreSqlConnect(String converterClassName, boolean jsonWithEnvelope) throws Exception { + private void testDebeziumPostgreSqlConnect(String converterClassName, boolean jsonWithEnvelope) throws Exception { final String tenant = TopicName.PUBLIC_TENANT; final String namespace = TopicName.DEFAULT_NAMESPACE; @@ -118,7 +118,6 @@ private void testDebeziumPostgreSqlConnect(String converterClassName, boolean js final String consumeTopicName = "debezium/postgresql/dbserver1.inventory.products"; final String sourceName = "test-source-debezium-postgersql-" + functionRuntimeType + "-" + randomName(8); - // This is the binlog count that contained in postgresql container. final int numMessages = 26; @@ -143,13 +142,13 @@ private void testDebeziumPostgreSqlConnect(String converterClassName, boolean js sourceTester.setServiceContainer(postgreSqlContainer); PulsarIODebeziumSourceRunner runner = new PulsarIODebeziumSourceRunner(pulsarCluster, functionRuntimeType.toString(), - converterClassName, tenant, namespace, sourceName, outputTopicName, numMessages, jsonWithEnvelope, - consumeTopicName, client); + converterClassName, tenant, namespace, sourceName, outputTopicName, numMessages, jsonWithEnvelope, + consumeTopicName, client); runner.testSource(sourceTester); } - private void testDebeziumMongoDbConnect(String converterClassName, boolean jsonWithEnvelope) throws Exception { + private void testDebeziumMongoDbConnect(String converterClassName, boolean jsonWithEnvelope) throws Exception { final String tenant = TopicName.PUBLIC_TENANT; final String namespace = TopicName.DEFAULT_NAMESPACE; @@ -182,8 +181,8 @@ private void testDebeziumMongoDbConnect(String converterClassName, boolean jsonW sourceTester.setServiceContainer(mongoDbContainer); PulsarIODebeziumSourceRunner runner = new PulsarIODebeziumSourceRunner(pulsarCluster, functionRuntimeType.toString(), - converterClassName, tenant, namespace, sourceName, outputTopicName, numMessages, jsonWithEnvelope, - consumeTopicName, client); + converterClassName, tenant, namespace, sourceName, outputTopicName, numMessages, jsonWithEnvelope, + consumeTopicName, client); runner.testSource(sourceTester); } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarIODebeziumSourceRunner.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarIODebeziumSourceRunner.java index 2766319d0b1de..6f0bbfd5cef7d 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarIODebeziumSourceRunner.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarIODebeziumSourceRunner.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.tests.integration.io.sources.debezium; +import com.google.common.base.Preconditions; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.SubscriptionInitialPosition; @@ -34,35 +35,35 @@ @Slf4j public class PulsarIODebeziumSourceRunner extends PulsarIOSourceRunner { - private String converterClassName; - private String tenant; - private String namespace; - private String sourceName; - private String outputTopicName; - private String consumeTopicName; - private int numMessages; - private boolean jsonWithEnvelope; - private PulsarClient client; - - public PulsarIODebeziumSourceRunner(PulsarCluster cluster, String functionRuntimeType, String converterClassName, - String tenant, String ns, String sourceName, String outputTopic, int numMessages, boolean jsonWithEnvelope, - String consumeTopicName, PulsarClient client) { - super(cluster, functionRuntimeType); - this.converterClassName = converterClassName; - this.tenant = tenant; - this.namespace = ns; - this.sourceName = sourceName; - this.outputTopicName = outputTopic; - this.numMessages = numMessages; - this.jsonWithEnvelope = jsonWithEnvelope; - this.consumeTopicName = consumeTopicName; - this.client = client; - } - - @Override - @SuppressWarnings({ "rawtypes", "unchecked" }) - public void testSource(SourceTester sourceTester) throws Exception { - // prepare the testing environment for source + private String converterClassName; + private String tenant; + private String namespace; + private String sourceName; + private String outputTopicName; + private String consumeTopicName; + private int numMessages; + private boolean jsonWithEnvelope; + private PulsarClient client; + + public PulsarIODebeziumSourceRunner(PulsarCluster cluster, String functionRuntimeType, String converterClassName, + String tenant, String ns, String sourceName, String outputTopic, int numMessages, boolean jsonWithEnvelope, + String consumeTopicName, PulsarClient client) { + super(cluster, functionRuntimeType); + this.converterClassName = converterClassName; + this.tenant = tenant; + this.namespace = ns; + this.sourceName = sourceName; + this.outputTopicName = outputTopic; + this.numMessages = numMessages; + this.jsonWithEnvelope = jsonWithEnvelope; + this.consumeTopicName = consumeTopicName; + this.client = client; + } + + @Override + @SuppressWarnings({ "rawtypes", "unchecked" }) + public void testSource(SourceTester sourceTester) throws Exception { + // prepare the testing environment for source prepareSource(sourceTester); // submit the source connector @@ -90,28 +91,33 @@ public void testSource(SourceTester sourceTester // validate the source result sourceTester.validateSourceResult(consumer, 9, null, converterClassName); - // prepare insert event - sourceTester.prepareInsertEvent(); + final int numEntriesToInsert = sourceTester.getNumEntriesToInsert(); + Preconditions.checkArgument(numEntriesToInsert >= 1); - // validate the source insert event - sourceTester.validateSourceResult(consumer, 1, SourceTester.INSERT, converterClassName); + for (int i = 1; i <= numEntriesToInsert; i++) { + // prepare insert event + sourceTester.prepareInsertEvent(); + log.info("inserted entry {} of {}", i, numEntriesToInsert); + // validate the source insert event + sourceTester.validateSourceResult(consumer, 1, SourceTester.INSERT, converterClassName); + } // prepare update event sourceTester.prepareUpdateEvent(); // validate the source update event - sourceTester.validateSourceResult(consumer, 1, SourceTester.UPDATE, converterClassName); + sourceTester.validateSourceResult(consumer, numEntriesToInsert, SourceTester.UPDATE, converterClassName); // prepare delete event sourceTester.prepareDeleteEvent(); // validate the source delete event - sourceTester.validateSourceResult(consumer, 1, SourceTester.DELETE, converterClassName); + sourceTester.validateSourceResult(consumer, numEntriesToInsert, SourceTester.DELETE, converterClassName); // delete the source deleteSource(tenant, namespace, sourceName); // get source info (source should be deleted) getSourceInfoNotFound(tenant, namespace, sourceName); - } + } }