Skip to content

Commit

Permalink
Fixing Debezium integration tests (apache#11154)
Browse files Browse the repository at this point in the history
Fixes apache#11099 

### Motivation

Make the integration tests for the Debezium connector run on CI.

### Modifications

Debezium integration test for PostgreSQL: added a check that `confirmed_flush_lsn` is updated.

Added pulsar-io tests to the CI
  • Loading branch information
dlg99 authored Aug 4, 2021
1 parent b80b0be commit 5d136a1
Show file tree
Hide file tree
Showing 6 changed files with 246 additions and 54 deletions.
122 changes: 122 additions & 0 deletions .github/workflows/ci-integration-pulsar-io.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Builds Pulsar and its docker images, then runs the PULSAR_IO integration test group.
# Triggered by pull requests against master and by pushes to branch-* branches.
name: CI - Integration - Pulsar-IO Sinks and Sources
on:
  pull_request:
    branches:
      - master
  push:
    branches:
      - branch-*

env:
  MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3

jobs:

  pulsar-io:
    # NOTE(review): the job name is intentionally left as in the original (empty);
    # changing it may rename the reported check - confirm before setting one.
    name:
    runs-on: ubuntu-latest
    timeout-minutes: 120

    steps:
      - name: checkout
        uses: actions/checkout@v2

      - name: Tune Runner VM
        uses: ./.github/actions/tune-runner-vm

      - name: Detect changed files
        id: changes
        uses: apache/pulsar-test-infra/paths-filter@master
        with:
          filters: .github/changes-filter.yaml

      # docs_only becomes 'true' when all_count equals docs_count and docs_count > 0
      # (counts reported by the "changes" step); every build/test step below is
      # skipped in that case.
      - name: Check changed files
        id: check_changes
        run: echo "::set-output name=docs_only::${{ fromJSON(steps.changes.outputs.all_count) == fromJSON(steps.changes.outputs.docs_count) && fromJSON(steps.changes.outputs.docs_count) > 0 }}"

      - name: Cache local Maven repository
        if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
        uses: actions/cache@v2
        with:
          # Cache third-party dependencies only; org/apache/pulsar artifacts are excluded.
          path: |
            ~/.m2/repository/*/*/*
            !~/.m2/repository/org/apache/pulsar
          key: ${{ runner.os }}-m2-dependencies-all-${{ hashFiles('**/pom.xml') }}
          restore-keys: |
            ${{ runner.os }}-m2-dependencies-core-modules-${{ hashFiles('**/pom.xml') }}
            ${{ runner.os }}-m2-dependencies-core-modules-

      - name: Set up JDK 11
        uses: actions/setup-java@v2
        if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
        with:
          distribution: 'adopt'
          java-version: 11

      - name: Replace maven's wagon-http version
        if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
        run: sudo ./build/replace_maven-wagon-http-version.sh

      - name: clean disk
        if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
        run: |
          sudo swapoff -a
          sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
          sudo apt clean
          # Best-effort cleanup: when the runner has no local images, `docker images -q`
          # prints nothing and `docker rmi` fails for lack of arguments. Do not let
          # that abort the job.
          docker rmi $(docker images -q) -f || true
          df -h

      - name: run install by skip tests
        if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
        run: mvn -q -B -ntp clean install -DskipTests

      - name: build pulsar image
        if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
        run: mvn -B -f docker/pulsar/pom.xml install -am -Pdocker,-main -DskipTests -Ddocker.nocache=true

      - name: build pulsar-all image
        if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
        run: mvn -B -f docker/pulsar-all/pom.xml install -am -Pdocker,-main -DskipTests -Ddocker.nocache=true

      - name: build artifacts and docker image
        if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
        run: mvn -B -f tests/docker-images/pom.xml install -am -Pdocker,-main -DskipTests

      - name: run integration tests
        if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
        run: ./build/run_integration_group.sh PULSAR_IO

      # The two upload steps run only when the job was cancelled or a step failed;
      # an upload failure itself is tolerated (continue-on-error).
      - name: Upload container logs
        uses: actions/upload-artifact@v2
        if: ${{ cancelled() || failure() }}
        continue-on-error: true
        with:
          name: container-logs
          path: tests/integration/target/container-logs

      - name: Upload surefire-reports
        uses: actions/upload-artifact@v2
        if: ${{ cancelled() || failure() }}
        continue-on-error: true
        with:
          name: surefire-reports
          path: tests/integration/target/surefire-reports
7 changes: 6 additions & 1 deletion build/run_integration_group.sh
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,16 @@ test_group_sql() {
mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-sql.xml -DintegrationTests -DtestForkCount=1 -DtestReuseFork=false
}

# Runs the Pulsar IO integration suite (pulsar-io-suite.xml) with retries enabled,
# restricted to the tests in the "source" group. Any arguments given to this
# function are forwarded to mvn_run_integration_test.
test_group_pulsar_io() {
mvn_run_integration_test --retry "$@" -DintegrationTestSuiteFile=pulsar-io-suite.xml -DintegrationTests -Dgroups=source
# The "sink" group run below is disabled; the reason is not recorded here -
# TODO confirm before re-enabling.
#mvn_run_integration_test --retry "$@" -DintegrationTestSuiteFile=pulsar-io-suite.xml -DintegrationTests -Dgroups=sink
}

# Dispatch to the test_group_<name> function that matches $TEST_GROUP
# (compared in lower case). The current positional parameters are forwarded
# to that function; an unknown group aborts the script with exit code 1.
echo "Test Group : $TEST_GROUP"
test_group_function_name="test_group_$(echo "$TEST_GROUP" | tr '[:upper:]' '[:lower:]')"
if [[ "$(LC_ALL=C type -t "$test_group_function_name")" == "function" ]]; then
  # Invoke the function directly rather than through eval: eval would join and
  # re-parse the forwarded arguments, breaking any that contain spaces or
  # shell metacharacters.
  "$test_group_function_name" "$@"
else
  echo "INVALID TEST GROUP"
  exit 1
fi
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public abstract class SourceTester<ServiceContainerT extends GenericContainer> {
protected final String sourceType;
protected final Map<String, Object> sourceConfig;

protected int numEntriesToInsert = 1;

public static final Set<String> DEBEZIUM_FIELD_SET = new HashSet<String>() {{
add("before");
add("after");
Expand Down Expand Up @@ -85,11 +87,27 @@ public Map<String, Object> sourceConfig() {

/**
 * Validates the events available on the consumer, bracketed by the pre- and
 * post-validation hooks.
 *
 * <p>The Avro validation is used when {@code converterClassName} ends with
 * {@code "AvroConverter"}; every other converter goes through the JSON validation.
 * If the validation throws, the post-validation hook is not executed.
 *
 * @param consumer           consumer handed to the format-specific validation
 * @param number             forwarded to the format-specific validation
 *                           (presumably the expected event count - confirm)
 * @param eventType          event type passed to the hooks and the validation
 * @param converterClassName converter class name used to select the validation
 * @throws Exception if the format-specific validation fails
 */
public void validateSourceResult(Consumer consumer, int number,
                                 String eventType, String converterClassName) throws Exception {
    doPreValidationCheck(eventType);

    final boolean avroEncoded = converterClassName.endsWith("AvroConverter");
    if (!avroEncoded) {
        validateSourceResultJson(consumer, number, eventType);
    } else {
        validateSourceResultAvro(consumer, number, eventType);
    }

    doPostValidationCheck(eventType);
}

/**
 * Hook executed before the regular validation to check database-specific state.
 *
 * <p>{@code validateSourceResult} calls it first, before the converter-specific
 * validation. This base implementation only logs the event type.
 *
 * @param eventType the event type about to be validated; only logged here
 */
public void doPreValidationCheck(String eventType) {
log.info("pre-validation of {}", eventType);
}

/**
 * Hook executed after the regular validation to check database-specific state.
 *
 * <p>{@code validateSourceResult} calls it last, once the converter-specific
 * validation has returned. This base implementation only logs the event type.
 *
 * @param eventType the event type that was just validated; only logged here
 */
public void doPostValidationCheck(String eventType) {
log.info("post-validation of {}", eventType);
}

public void validateSourceResultJson(Consumer<KeyValue<byte[], byte[]>> consumer, int number, String eventType) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,20 @@

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.tests.integration.containers.DebeziumPostgreSqlContainer;
import org.apache.pulsar.tests.integration.containers.PulsarContainer;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.io.sources.SourceTester;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testng.Assert;

import java.io.Closeable;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
* A tester for testing Debezium Postgresql source.
Expand All @@ -49,9 +56,19 @@ public class DebeziumPostgreSqlSourceTester extends SourceTester<DebeziumPostgre

private final PulsarCluster pulsarCluster;

private final AtomicReference<String> confirmedFlushLsn = new AtomicReference<>("not read yet");

public DebeziumPostgreSqlSourceTester(PulsarCluster cluster) {
super(NAME);
this.pulsarCluster = cluster;
/*
todo (possibly solvable by debezium upgrade?): figure out why last message is lost with larger numEntriesToInsert.
I.e. numEntriesToInsert = 100 results in 99 events from debezium 1.0.0, 300 results in 299 events.
10 is handled ok.
Likely this is related to https://issues.redhat.com/browse/DBZ-2288
*/
this.numEntriesToInsert = 10;

pulsarServiceUrl = "pulsar://pulsar-proxy:" + PulsarContainer.BROKER_PORT;

sourceConfig.put("database.hostname", DebeziumPostgreSqlContainer.NAME);
Expand Down Expand Up @@ -81,31 +98,56 @@ public void prepareSource() {

/**
 * Inserts a 'test-debezium' product row by running psql through the PostgreSQL
 * container. The table is selected before the insert and the matching rows are
 * counted afterwards; the results of those queries are not inspected here.
 *
 * @throws Exception if executing a command in the container fails
 */
@Override
public void prepareInsertEvent() throws Exception {
    final String selectAllProducts =
            "psql -h 127.0.0.1 -U postgres -d postgres -c \"select * from inventory.products;\"";
    final String insertProduct =
            "psql -h 127.0.0.1 -U postgres -d postgres " +
            "-c \"insert into inventory.products(name, description, weight) " +
            "values('test-debezium', 'description', 10);\"";
    final String countInserted =
            "psql -h 127.0.0.1 -U postgres -d postgres "+
            "-c \"select count(1), max(id) from inventory.products where name='test-debezium' and weight=10;\"";

    this.debeziumPostgresqlContainer.execCmd("/bin/bash", "-c", selectAllProducts);
    this.debeziumPostgresqlContainer.execCmd("/bin/bash", "-c", insertProduct);
    this.debeziumPostgresqlContainer.execCmd("/bin/bash", "-c", countInserted);
}

/**
 * Deletes the 'test-debezium' product rows by running psql through the PostgreSQL
 * container. The table is selected before the delete and the remaining matching
 * rows are counted afterwards; the results of those queries are not inspected here.
 *
 * @throws Exception if executing a command in the container fails
 */
@Override
public void prepareDeleteEvent() throws Exception {
    final String selectAllProducts =
            "psql -h 127.0.0.1 -U postgres -d postgres -c \"select * from inventory.products;\"";
    final String deleteProducts =
            "psql -h 127.0.0.1 -U postgres -d postgres " +
            "-c \"delete from inventory.products where name='test-debezium';\"";
    final String countRemaining =
            "psql -h 127.0.0.1 -U postgres -d postgres -c \"select count(1) from inventory.products where name='test-debezium';\"";

    this.debeziumPostgresqlContainer.execCmd("/bin/bash", "-c", selectAllProducts);
    this.debeziumPostgresqlContainer.execCmd("/bin/bash", "-c", deleteProducts);
    this.debeziumPostgresqlContainer.execCmd("/bin/bash", "-c", countRemaining);
}

/**
 * Updates the description and weight of the 'test-debezium' product rows by running
 * psql through the PostgreSQL container. The table is selected before the update and
 * the updated rows are counted afterwards; the results of those queries are not
 * inspected here.
 *
 * @throws Exception if executing a command in the container fails
 */
@Override
public void prepareUpdateEvent() throws Exception {
    final String selectAllProducts =
            "psql -h 127.0.0.1 -U postgres -d postgres -c \"select * from inventory.products;\"";
    final String updateProducts =
            "psql -h 127.0.0.1 -U postgres -d postgres " +
            "-c \"update inventory.products " +
            "set description='test-update-description', weight='20' where name='test-debezium';\"";
    final String countUpdated =
            "psql -h 127.0.0.1 -U postgres -d postgres -c " +
            "\"select count(1) from inventory.products where name='test-debezium' and weight=20;\"";

    this.debeziumPostgresqlContainer.execCmd("/bin/bash", "-c", selectAllProducts);
    this.debeziumPostgresqlContainer.execCmd("/bin/bash", "-c", updateProducts);
    this.debeziumPostgresqlContainer.execCmd("/bin/bash", "-c", countUpdated);
}

/**
 * Verifies, after the regular validation, that {@code confirmed_flush_lsn} in
 * {@code pg_replication_slots} differs from the value seen by the previous check.
 *
 * <p>The raw psql output is compared with the previously stored output, and the
 * stored value is replaced only when the assertion passes. The test fails when the
 * value did not change or when the query could not be executed.
 *
 * @param eventType the event type that was just validated; used for logging and
 *                  in the assertion message
 */
@Override
public void doPostValidationCheck(String eventType) {
    super.doPostValidationCheck(eventType);
    /*
    confirmed_flush_lsn in pg_replication_slots table has to change,
    otherwise postgres won't truncate WAL and the disk space will grow.
    I.e. upgrade from debezium 1.0.0 to 1.0.3 resulted in confirmed_flush_lsn
    not updating in insert-heavy load.
    */
    try {
        ContainerExecResult res = debeziumPostgresqlContainer.execCmd("/bin/bash", "-c",
                "psql -h 127.0.0.1 -U postgres -d postgres -c \"select confirmed_flush_lsn from pg_replication_slots;\"");
        res.assertNoStderr();

        final String previousFlushLsn = confirmedFlushLsn.get();
        final String currentFlushLsn = res.getStdout();
        log.info("Previous confirmedFlushLsn: \n{} \nCurrent confirmedFlushLsn: \n{}",
                previousFlushLsn, currentFlushLsn);

        // Use the TestNG Assert this class already imports (and uses in the catch block)
        // instead of mixing in org.junit.Assert.
        Assert.assertNotEquals(currentFlushLsn, previousFlushLsn,
                "confirmed_flush_lsn did not change after validating " + eventType);
        confirmedFlushLsn.set(currentFlushLsn);
    } catch (Exception e) {
        // Assertion failures are Errors, so only execution problems end up here.
        Assert.fail("failed to get flush lsn", e);
    }
}

@Override
Expand All @@ -117,7 +159,7 @@ public Map<String, String> produceSourceMessages(int numMessages) {
/**
 * Stops the Debezium PostgreSQL service container, provided a cluster was supplied.
 */
@Override
public void close() {
    if (pulsarCluster != null) {
        // Stop the service exactly once, through the static method on PulsarCluster.
        // The former instance-qualified call to the same method was a duplicate.
        PulsarCluster.stopService(DebeziumPostgreSqlContainer.NAME, debeziumPostgresqlContainer);
    }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
@Slf4j
public class PulsarDebeziumSourcesTest extends PulsarIOTestBase {

protected final AtomicInteger testId = new AtomicInteger(0);
protected final AtomicInteger testId = new AtomicInteger(0);

@Test(groups = "source")
public void testDebeziumMySqlSourceJson() throws Exception {
Expand Down Expand Up @@ -104,21 +104,20 @@ private void testDebeziumMySqlConnect(String converterClassName, boolean jsonWit
sourceTester.setServiceContainer(mySQLContainer);

PulsarIODebeziumSourceRunner runner = new PulsarIODebeziumSourceRunner(pulsarCluster, functionRuntimeType.toString(),
converterClassName, tenant, namespace, sourceName, outputTopicName, numMessages, jsonWithEnvelope,
consumeTopicName, client);
converterClassName, tenant, namespace, sourceName, outputTopicName, numMessages, jsonWithEnvelope,
consumeTopicName, client);

runner.testSource(sourceTester);
}

private void testDebeziumPostgreSqlConnect(String converterClassName, boolean jsonWithEnvelope) throws Exception {
private void testDebeziumPostgreSqlConnect(String converterClassName, boolean jsonWithEnvelope) throws Exception {

final String tenant = TopicName.PUBLIC_TENANT;
final String namespace = TopicName.DEFAULT_NAMESPACE;
final String outputTopicName = "debe-output-topic-name-" + testId.getAndIncrement();
final String consumeTopicName = "debezium/postgresql/dbserver1.inventory.products";
final String sourceName = "test-source-debezium-postgersql-" + functionRuntimeType + "-" + randomName(8);


// This is the binlog count that contained in postgresql container.
final int numMessages = 26;

Expand All @@ -143,13 +142,13 @@ private void testDebeziumPostgreSqlConnect(String converterClassName, boolean js
sourceTester.setServiceContainer(postgreSqlContainer);

PulsarIODebeziumSourceRunner runner = new PulsarIODebeziumSourceRunner(pulsarCluster, functionRuntimeType.toString(),
converterClassName, tenant, namespace, sourceName, outputTopicName, numMessages, jsonWithEnvelope,
consumeTopicName, client);
converterClassName, tenant, namespace, sourceName, outputTopicName, numMessages, jsonWithEnvelope,
consumeTopicName, client);

runner.testSource(sourceTester);
}

private void testDebeziumMongoDbConnect(String converterClassName, boolean jsonWithEnvelope) throws Exception {
private void testDebeziumMongoDbConnect(String converterClassName, boolean jsonWithEnvelope) throws Exception {

final String tenant = TopicName.PUBLIC_TENANT;
final String namespace = TopicName.DEFAULT_NAMESPACE;
Expand Down Expand Up @@ -182,8 +181,8 @@ private void testDebeziumMongoDbConnect(String converterClassName, boolean jsonW
sourceTester.setServiceContainer(mongoDbContainer);

PulsarIODebeziumSourceRunner runner = new PulsarIODebeziumSourceRunner(pulsarCluster, functionRuntimeType.toString(),
converterClassName, tenant, namespace, sourceName, outputTopicName, numMessages, jsonWithEnvelope,
consumeTopicName, client);
converterClassName, tenant, namespace, sourceName, outputTopicName, numMessages, jsonWithEnvelope,
consumeTopicName, client);

runner.testSource(sourceTester);
}
Expand Down
Loading

0 comments on commit 5d136a1

Please sign in to comment.