From 965b789daba3bad753dd1f20a4592c96478138b1 Mon Sep 17 00:00:00 2001 From: Gunnar Morling Date: Mon, 14 Jan 2019 16:36:46 +0100 Subject: [PATCH] DBZ-175 Misc. adjustments; * Dedicated functional interface for halting predicates * Using semantic temporal types * Typo fixes * Exposing SnapshotNewTables through connector config * Static loggers * Adding Moira to COPYRIGHT.txt --- COPYRIGHT.txt | 1 + .../connector/mysql/AbstractReader.java | 11 +++--- .../connector/mysql/BinlogReader.java | 4 +- .../connector/mysql/HaltingPredicate.java | 23 ++++++++++++ .../connector/mysql/MySqlConnectorConfig.java | 10 ++++- .../connector/mysql/MySqlConnectorTask.java | 14 +++---- .../connector/mysql/MySqlTaskContext.java | 6 --- .../mysql/ParallelSnapshotReader.java | 24 ++++++------ .../mysql/ReconcilingBinlogReader.java | 33 +++++++++-------- .../connector/mysql/SnapshotReader.java | 2 +- .../mysql/ParallelSnapshotReaderTest.java | 37 ++++++++++--------- .../mysql/ReconcilingBinlogReaderTest.java | 12 +++--- 12 files changed, 102 insertions(+), 75 deletions(-) create mode 100644 debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/HaltingPredicate.java diff --git a/COPYRIGHT.txt b/COPYRIGHT.txt index 356bd2c6241..3725971f40d 100644 --- a/COPYRIGHT.txt +++ b/COPYRIGHT.txt @@ -43,6 +43,7 @@ MaoXiang Pan Mario Mueller Matteo Capitanio Matthias Wessendorf +Moira Tagle Olavi Mustanoja Omar Al-Safi Ori Popowski diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/AbstractReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/AbstractReader.java index fdb6f69e1ab..adc7aa4febc 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/AbstractReader.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/AbstractReader.java @@ -13,7 +13,6 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Predicate; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.source.SourceRecord; @@ -50,7 +49,7 @@ public abstract class AbstractReader implements Reader { private final AtomicReference uponCompletion = new AtomicReference<>(); private final Duration pollInterval; - private final Predicate acceptAndContinue; + private final HaltingPredicate acceptAndContinue; /** * Create a snapshot reader. @@ -62,7 +61,7 @@ public abstract class AbstractReader implements Reader { * accepting records once {@link #enqueueRecord(SourceRecord)} is called with a record * that tests as false. Can be null. If null, all records will be accepted. */ - public AbstractReader(String name, MySqlTaskContext context, Predicate acceptAndContinue) { + public AbstractReader(String name, MySqlTaskContext context, HaltingPredicate acceptAndContinue) { this.name = name; this.context = context; this.connectionContext = context.getConnectionContext(); @@ -313,7 +312,7 @@ protected void pollComplete(List batch) { */ protected boolean enqueueRecord(SourceRecord record) throws InterruptedException { if (record != null) { - if (acceptAndContinue.test(record)) { + if (acceptAndContinue.accepts(record)) { if (logger.isTraceEnabled()) { logger.trace("Enqueuing source record: {}", record); } @@ -331,10 +330,10 @@ protected boolean enqueueRecord(SourceRecord record) throws InterruptedException /** * A predicate that returns true for all sourceRecords */ - public static class AcceptAllPredicate implements Predicate { + public static class AcceptAllPredicate implements HaltingPredicate { @Override - public boolean test(SourceRecord sourceRecord) { + public boolean accepts(SourceRecord sourceRecord) { return true; } } diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java index da1e3ce325b..3c52eac5271 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java @@ -146,7 +146,7 @@ public boolean equals(Object obj) { * @param context the task context in which this reader is running; may not be null * @param acceptAndContinue see {@link AbstractReader#AbstractReader(String, MySqlTaskContext, Predicate)} */ - public BinlogReader(String name, MySqlTaskContext context, Predicate acceptAndContinue) { + public BinlogReader(String name, MySqlTaskContext context, HaltingPredicate acceptAndContinue) { this(name, context, acceptAndContinue, context.serverId()); } @@ -158,7 +158,7 @@ public BinlogReader(String name, MySqlTaskContext context, Predicate acceptAndContinue, long serverId) { + public BinlogReader(String name, MySqlTaskContext context, HaltingPredicate acceptAndContinue, long serverId) { super(name, context, acceptAndContinue); connectionContext = context.getConnectionContext(); diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/HaltingPredicate.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/HaltingPredicate.java new file mode 100644 index 00000000000..bb2a22dc9f6 --- /dev/null +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/HaltingPredicate.java @@ -0,0 +1,23 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.mysql; + +import org.apache.kafka.connect.source.SourceRecord; + +/** + * A predicate invoked by {@link Reader} implementations in order to determine whether they should continue with + * processing records or not. + * + * @author Gunnar Morling + */ +@FunctionalInterface +public interface HaltingPredicate { + + /** + * Whether this record should be processed by the calling reader or not. + */ + boolean accepts(SourceRecord record); +} diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java index 704aa34a731..85efdb70926 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java @@ -631,7 +631,7 @@ public static DdlParsingMode parse(String value, String defaultValue) { .withWidth(Width.LONG) .withImportance(Importance.HIGH) .withDefault(10000L) - .withDescription("Only relevant in parallel snapshotting is configured. During " + .withDescription("Only relevant if parallel snapshotting is configured. During " + "parallel snapshotting, multiple (4) connections open to the database " + "client, and they each need their own unique connection ID. This offset is " + "used to generate those IDs from the base configured cluster ID."); @@ -1092,6 +1092,7 @@ public static final Field MASK_COLUMN(int length) { private final SnapshotLockingMode snapshotLockingMode; private final DdlParsingMode ddlParsingMode; private final GtidNewChannelPosition gitIdNewChannelPosition; + private final SnapshotNewTables snapshotNewTables; public MySqlConnectorConfig(Configuration config) { super( @@ -1119,6 +1120,9 @@ public MySqlConnectorConfig(Configuration config) { String gitIdNewChannelPosition = config.getString(MySqlConnectorConfig.GTID_NEW_CHANNEL_POSITION); this.gitIdNewChannelPosition = GtidNewChannelPosition.parse(gitIdNewChannelPosition, MySqlConnectorConfig.GTID_NEW_CHANNEL_POSITION.defaultValueAsString()); + + String snapshotNewTables = config.getString(MySqlConnectorConfig.SNAPSHOT_NEW_TABLES); + this.snapshotNewTables = SnapshotNewTables.parse(snapshotNewTables, MySqlConnectorConfig.SNAPSHOT_NEW_TABLES.defaultValueAsString()); } public SnapshotLockingMode getSnapshotLockingMode() { @@ -1133,6 +1137,10 @@ public GtidNewChannelPosition gtidNewChannelPosition() { return gitIdNewChannelPosition; } + public SnapshotNewTables getSnapshotNewTables() { + return snapshotNewTables; + } + protected static ConfigDef configDef() { ConfigDef config = new ConfigDef(); Field.group(config, "MySQL", HOSTNAME, PORT, USER, PASSWORD, ON_CONNECT_STATEMENTS, SERVER_NAME, SERVER_ID, SERVER_ID_OFFSET, diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java index f911bd14532..0f6982fd7c5 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java @@ -10,13 +10,13 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.stream.Collectors; import java.util.stream.Stream; -import io.debezium.util.LoggingContext; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; @@ -29,6 +29,7 @@ import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode; import io.debezium.schema.TopicSelector; import io.debezium.util.Collect; +import io.debezium.util.LoggingContext; import io.debezium.util.LoggingContext.PreviousContext; /** @@ -215,7 +216,7 @@ public synchronized void start(Configuration config) { // if there are new tables if (newTablesInConfig()) { // and we are configured to run a parallel snapshot - if (taskContext.snapshotNewTables() == MySqlConnectorConfig.SnapshotNewTables.PARALLEL) { + if (taskContext.getConnectorConfig().getSnapshotNewTables() == MySqlConnectorConfig.SnapshotNewTables.PARALLEL) { ServerIdGenerator serverIdGenerator = new ServerIdGenerator(config.getLong(MySqlConnectorConfig.SERVER_ID), config.getLong(MySqlConnectorConfig.SERVER_ID_OFFSET)); @@ -310,14 +311,13 @@ public long getConfiguredServerId() { * @return the offset to restart from. * @see RecordMakers#RecordMakers(MySqlSchema, SourceInfo, TopicSelector, boolean, Map) */ - @SuppressWarnings("unchecked") private Map getRestartOffset(Map storedOffset) { Map restartOffset = new HashMap<>(); if (storedOffset != null) { - for (String key : storedOffset.keySet()){ - if (key.startsWith(SourceInfo.RESTART_PREFIX)) { - String newKey = key.substring(SourceInfo.RESTART_PREFIX.length()); - restartOffset.put(newKey, storedOffset.get(key)); + for (Entry entry : storedOffset.entrySet()){ + if (entry.getKey().startsWith(SourceInfo.RESTART_PREFIX)) { + String newKey = entry.getKey().substring(SourceInfo.RESTART_PREFIX.length()); + restartOffset.put(newKey, entry.getValue()); } } } diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java index 39c2b3bc4c1..b81913d730b 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java @@ -17,7 +17,6 @@ import io.debezium.connector.common.CdcSourceTaskContext; import io.debezium.connector.mysql.MySqlConnectorConfig.GtidNewChannelPosition; import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode; -import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotNewTables; import io.debezium.function.Predicates; import io.debezium.relational.TableId; import io.debezium.relational.history.DatabaseHistory; @@ -246,11 +245,6 @@ protected SnapshotMode snapshotMode() { return SnapshotMode.parse(value, MySqlConnectorConfig.SNAPSHOT_MODE.defaultValueAsString()); } - protected SnapshotNewTables snapshotNewTables() { - String value = config.getString(MySqlConnectorConfig.SNAPSHOT_NEW_TABLES); - return SnapshotNewTables.parse(value, MySqlConnectorConfig.SNAPSHOT_NEW_TABLES.defaultValueAsString()); - } - public String getSnapshotSelectOverrides() { return config.getString(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE); } diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/ParallelSnapshotReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/ParallelSnapshotReader.java index b50e5f474bd..792e02ca912 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/ParallelSnapshotReader.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/ParallelSnapshotReader.java @@ -5,17 +5,17 @@ */ package io.debezium.connector.mysql; -import io.debezium.config.Configuration; -import org.apache.kafka.connect.source.SourceRecord; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.time.Duration; import java.time.Instant; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Predicate; + +import org.apache.kafka.connect.source.SourceRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.config.Configuration; /** * A reader that runs a {@link ChainedReader} consisting of a {@link SnapshotReader} and a {@link BinlogReader} @@ -195,17 +195,17 @@ public String name() { * the current time. Once a single record near the end of the binlog has been seen, we * we assume the reader will stay near the end of the binlog. */ - /*package local*/ static class ParallelHaltingPredicate implements Predicate { + /*package local*/ static class ParallelHaltingPredicate implements HaltingPredicate { - private final Logger logger = LoggerFactory.getLogger(getClass()); + private static final Logger LOGGER = LoggerFactory.getLogger(ParallelHaltingPredicate.class); + + private static final Duration DEFAULT_MIN_HALTING_DURATION = Duration.ofMinutes(5); private volatile AtomicBoolean thisReaderNearEnd; private volatile AtomicBoolean otherReaderNearEnd; // The minimum duration we must be within before we attempt to halt. private final Duration minHaltingDuration; - // is hard coded in as 5 minutes. - private static final Duration DEFAULT_MIN_HALTING_DURATION = Duration.ofMinutes(5); /*package local*/ ParallelHaltingPredicate(AtomicBoolean thisReaderNearEndRef, AtomicBoolean otherReaderNearEndRef) { @@ -221,7 +221,7 @@ public String name() { } @Override - public boolean test(SourceRecord ourSourceRecord) { + public boolean accepts(SourceRecord ourSourceRecord) { // we assume if we ever end up near the end of the binlog, then we will remain there. if (!thisReaderNearEnd.get()) { Long sourceRecordTimestamp = (Long) ourSourceRecord.sourceOffset().get(SourceInfo.TIMESTAMP_KEY); @@ -232,7 +232,7 @@ public boolean test(SourceRecord ourSourceRecord) { now); if (durationToEnd.compareTo(minHaltingDuration) <= 0) { // we are within minHaltingDuration of the end - logger.debug("Parallel halting predicate: this reader near end"); + LOGGER.debug("Parallel halting predicate: this reader near end"); thisReaderNearEnd.set(true); } } diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/ReconcilingBinlogReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/ReconcilingBinlogReader.java index 4bf427cb993..ac512794b68 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/ReconcilingBinlogReader.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/ReconcilingBinlogReader.java @@ -5,10 +5,8 @@ */ package io.debezium.connector.mysql; -import io.debezium.document.Document; -import org.apache.kafka.connect.source.SourceRecord; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import static io.debezium.connector.mysql.SourceInfo.BINLOG_FILENAME_OFFSET_KEY; +import static io.debezium.connector.mysql.SourceInfo.BINLOG_POSITION_OFFSET_KEY; import java.util.List; import java.util.Map; @@ -16,8 +14,11 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; -import static io.debezium.connector.mysql.SourceInfo.BINLOG_FILENAME_OFFSET_KEY; -import static io.debezium.connector.mysql.SourceInfo.BINLOG_POSITION_OFFSET_KEY; +import org.apache.kafka.connect.source.SourceRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.document.Document; /** * A reader that unifies the binlog positions of two binlog readers. @@ -29,7 +30,7 @@ */ public class ReconcilingBinlogReader implements Reader { - private final Logger logger = LoggerFactory.getLogger(getClass()); + private static final Logger LOGGER = LoggerFactory.getLogger(ReconcilingBinlogReader.class); private final BinlogReader binlogReaderA; private final BinlogReader binlogReaderB; @@ -104,11 +105,11 @@ public void start() { public void stop() { if (running.compareAndSet(true, false)){ try { - logger.info("Stopping the {} reader", reconcilingReader.name()); + LOGGER.info("Stopping the {} reader", reconcilingReader.name()); reconcilingReader.stop(); reconcilingReader.context.shutdown(); } catch (Throwable t) { - logger.error("Unexpected error stopping the {} reader", reconcilingReader.name()); + LOGGER.error("Unexpected error stopping the {} reader", reconcilingReader.name()); } } } @@ -126,7 +127,7 @@ private void completeSuccessfully() { if (completed.compareAndSet(false, true)){ stop(); setupUnifiedReader(); - logger.info("Completed Reconciliation of Parallel Readers."); + LOGGER.info("Completed Reconciliation of Parallel Readers."); Runnable completionHandler = uponCompletion.getAndSet(null); // set to null so that we call it only once if (completionHandler != null) { @@ -177,9 +178,9 @@ private void determineLeadingReader() { } if (aReaderLeading) { - logger.info("old tables leading; reading only from new tables"); + LOGGER.info("old tables leading; reading only from new tables"); } else { - logger.info("new tables leading; reading only from old tables"); + LOGGER.info("new tables leading; reading only from old tables"); } } @@ -203,10 +204,10 @@ private void checkLaggingLeadingInfo() { /** * A Predicate that returns false for any record beyond a given offset. */ - /*package private*/ static class OffsetLimitPredicate implements Predicate { + /*package private*/ static class OffsetLimitPredicate implements HaltingPredicate { - private Document leadingReaderFinalOffsetDocument; - private Predicate gtidFilter; + private final Document leadingReaderFinalOffsetDocument; + private final Predicate gtidFilter; /*package private*/ OffsetLimitPredicate(Map leadingReaderFinalOffset, Predicate gtidFilter) { @@ -216,7 +217,7 @@ private void checkLaggingLeadingInfo() { } @Override - public boolean test(SourceRecord sourceRecord) { + public boolean accepts(SourceRecord sourceRecord) { Document offsetDocument = SourceInfo.createDocumentFromOffset(sourceRecord.sourceOffset()); // .isPositionAtOrBefore is true IFF leadingReaderFinalOffsetDocument <= offsetDocument // we should stop (return false) IFF leadingReaderFinalOffsetDocument <= offsetDocument diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java index da7a33d7520..fe8cb0f899b 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java @@ -766,7 +766,7 @@ protected void readBinlogPosition(int step, SourceInfo source, JdbcConnection my * @return {@link Filters} that represent all the tables that this snapshot reader should CREATE */ private Filters getCreateTableFilters(Filters filters) { - MySqlConnectorConfig.SnapshotNewTables snapshotNewTables = context.snapshotNewTables(); + MySqlConnectorConfig.SnapshotNewTables snapshotNewTables = context.getConnectorConfig().getSnapshotNewTables(); if (snapshotNewTables == MySqlConnectorConfig.SnapshotNewTables.PARALLEL) { // if we are snapshotting new tables in parallel, we need to make sure all the tables in the configuration // are created. diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/ParallelSnapshotReaderTest.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/ParallelSnapshotReaderTest.java index 448c39d6f3e..35f7bc4b9b4 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/ParallelSnapshotReaderTest.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/ParallelSnapshotReaderTest.java @@ -5,21 +5,23 @@ */ package io.debezium.connector.mysql; -import org.apache.kafka.connect.source.SourceRecord; -import org.junit.Assert; -import org.junit.Test; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; -import static io.debezium.connector.mysql.ParallelSnapshotReader.ParallelHaltingPredicate; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.Assert; +import org.junit.Test; + +import io.debezium.connector.mysql.ParallelSnapshotReader.ParallelHaltingPredicate; /** * @author Moira Tagle @@ -170,12 +172,11 @@ public void testHaltingPredicateHonorsTimeRange() { AtomicBoolean thisReaderNearEnd = new AtomicBoolean(false); AtomicBoolean otherReaderNearEnd = new AtomicBoolean(false); - long durationSec = 5 * 60; // five minutes - Duration duration = Duration.ofSeconds(durationSec); + Duration duration = Duration.ofMinutes(5); ParallelHaltingPredicate parallelHaltingPredicate = new ParallelHaltingPredicate(thisReaderNearEnd, otherReaderNearEnd, duration); - boolean testResult = parallelHaltingPredicate.test(createSourceRecordWithTimestamp(System.currentTimeMillis()/1000 - (durationSec * 2))); + boolean testResult = parallelHaltingPredicate.accepts(createSourceRecordWithTimestamp(Instant.now().minus(duration.multipliedBy(2)))); Assert.assertTrue(testResult); @@ -192,11 +193,11 @@ public void testHaltingPredicateFlipsthisReaderNearEnd() { AtomicBoolean thisReaderNearEnd = new AtomicBoolean(false); AtomicBoolean otherReaderNearEnd = new AtomicBoolean(false); - Duration duration = Duration.ofSeconds(5 * 60); // five minutes + Duration duration = Duration.ofMinutes(5); ParallelHaltingPredicate parallelHaltingPredicate = new ParallelHaltingPredicate(thisReaderNearEnd, otherReaderNearEnd, duration); - boolean testResult = parallelHaltingPredicate.test(createSourceRecordWithTimestamp(System.currentTimeMillis()/1000)); + boolean testResult = parallelHaltingPredicate.accepts(createSourceRecordWithTimestamp(Instant.now())); Assert.assertTrue(testResult); @@ -212,16 +213,16 @@ public void testHaltingPredicateHalts() { AtomicBoolean thisReaderNearEnd = new AtomicBoolean(false); AtomicBoolean otherReaderNearEnd = new AtomicBoolean(true); - Duration duration = Duration.ofSeconds(5 * 60); // five minutes + Duration duration = Duration.ofMinutes(5); ParallelHaltingPredicate parallelHaltingPredicate = new ParallelHaltingPredicate(thisReaderNearEnd, otherReaderNearEnd, duration); boolean testResult = - parallelHaltingPredicate.test(createSourceRecordWithTimestamp(System.currentTimeMillis()/1000)); + parallelHaltingPredicate.accepts(createSourceRecordWithTimestamp(Instant.now())); Assert.assertFalse(testResult); - + Assert.assertTrue(thisReaderNearEnd.get()); Assert.assertTrue(otherReaderNearEnd.get()); } @@ -230,11 +231,11 @@ public void testHaltingPredicateHalts() { * Create an "offset" containing a single timestamp element with the given value. * Needed because {@link ParallelSnapshotReader.ParallelHaltingPredicate} halts based on how * close the record's timestamp is to the present time. - * @param tsSec the timestamp (in seconds) in the resulting offset. + * @param tsSec the timestamp in the resulting offset. * @return an "offset" containing the given timestamp. */ - private SourceRecord createSourceRecordWithTimestamp(long tsSec) { - Map offset = Collections.singletonMap(SourceInfo.TIMESTAMP_KEY, tsSec); + private SourceRecord createSourceRecordWithTimestamp(Instant ts) { + Map offset = Collections.singletonMap(SourceInfo.TIMESTAMP_KEY, ts.getEpochSecond()); return new SourceRecord(null, offset, null, null, null); } } diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/ReconcilingBinlogReaderTest.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/ReconcilingBinlogReaderTest.java index ffb33439091..77270b1be6f 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/ReconcilingBinlogReaderTest.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/ReconcilingBinlogReaderTest.java @@ -5,15 +5,15 @@ */ package io.debezium.connector.mysql; -import org.apache.kafka.connect.source.SourceRecord; -import org.junit.Assert; -import org.junit.Test; - import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.Assert; +import org.junit.Test; + /** * @author Moira Tagle */ @@ -27,7 +27,7 @@ public void haltAfterPredicateTrue() { SourceRecord testSourceRecord = createSourceRecordWithOffset(offsets.get(0)); // tested record (0) is before limit (1), so we should return true. - Assert.assertTrue(offsetLimitPredicate.test(testSourceRecord)); + Assert.assertTrue(offsetLimitPredicate.accepts(testSourceRecord)); } @Test @@ -38,7 +38,7 @@ public void haltAfterPredicateFalse() { SourceRecord testSourceRecord = createSourceRecordWithOffset(offsets.get(1)); // tested record (1) is beyond limit (0), so we should return false. - Assert.assertFalse(offsetLimitPredicate.test(testSourceRecord)); + Assert.assertFalse(offsetLimitPredicate.accepts(testSourceRecord)); } private final int SERVER_ID = 0;