Skip to content

Commit

Permalink
DBZ-175 Misc. adjustments;
Browse files Browse the repository at this point in the history
* Dedicated functional interface for halting predicates
* Using semantic temporal types
* Typo fixes
* Exposing SnapshotNewTables through connector config
* Static loggers
* Adding Moira to COPYRIGHT.txt
  • Loading branch information
gunnarmorling committed Jan 24, 2019
1 parent a6d791c commit 965b789
Show file tree
Hide file tree
Showing 12 changed files with 102 additions and 75 deletions.
1 change: 1 addition & 0 deletions COPYRIGHT.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ MaoXiang Pan
Mario Mueller
Matteo Capitanio
Matthias Wessendorf
Moira Tagle
Olavi Mustanoja
Omar Al-Safi
Ori Popowski
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
Expand Down Expand Up @@ -50,7 +49,7 @@ public abstract class AbstractReader implements Reader {
private final AtomicReference<Runnable> uponCompletion = new AtomicReference<>();
private final Duration pollInterval;

private final Predicate<SourceRecord> acceptAndContinue;
private final HaltingPredicate acceptAndContinue;

/**
* Create a snapshot reader.
Expand All @@ -62,7 +61,7 @@ public abstract class AbstractReader implements Reader {
* accepting records once {@link #enqueueRecord(SourceRecord)} is called with a record
* that tests as false. Can be null. If null, all records will be accepted.
*/
public AbstractReader(String name, MySqlTaskContext context, Predicate<SourceRecord> acceptAndContinue) {
public AbstractReader(String name, MySqlTaskContext context, HaltingPredicate acceptAndContinue) {
this.name = name;
this.context = context;
this.connectionContext = context.getConnectionContext();
Expand Down Expand Up @@ -313,7 +312,7 @@ protected void pollComplete(List<SourceRecord> batch) {
*/
protected boolean enqueueRecord(SourceRecord record) throws InterruptedException {
if (record != null) {
if (acceptAndContinue.test(record)) {
if (acceptAndContinue.accepts(record)) {
if (logger.isTraceEnabled()) {
logger.trace("Enqueuing source record: {}", record);
}
Expand All @@ -331,10 +330,10 @@ protected boolean enqueueRecord(SourceRecord record) throws InterruptedException
/**
* A predicate that returns true for all sourceRecords
*/
public static class AcceptAllPredicate implements Predicate<SourceRecord> {
public static class AcceptAllPredicate implements HaltingPredicate {

@Override
public boolean test(SourceRecord sourceRecord) {
public boolean accepts(SourceRecord sourceRecord) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public boolean equals(Object obj) {
* @param context the task context in which this reader is running; may not be null
* @param acceptAndContinue see {@link AbstractReader#AbstractReader(String, MySqlTaskContext, Predicate)}
*/
public BinlogReader(String name, MySqlTaskContext context, Predicate<SourceRecord> acceptAndContinue) {
public BinlogReader(String name, MySqlTaskContext context, HaltingPredicate acceptAndContinue) {
this(name, context, acceptAndContinue, context.serverId());
}

Expand All @@ -158,7 +158,7 @@ public BinlogReader(String name, MySqlTaskContext context, Predicate<SourceRecor
* @param acceptAndContinue see {@link AbstractReader#AbstractReader(String, MySqlTaskContext, Predicate)}
* @param serverId the server id to use for the {@link BinaryLogClient}
*/
public BinlogReader(String name, MySqlTaskContext context, Predicate<SourceRecord> acceptAndContinue, long serverId) {
public BinlogReader(String name, MySqlTaskContext context, HaltingPredicate acceptAndContinue, long serverId) {
super(name, context, acceptAndContinue);

connectionContext = context.getConnectionContext();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;

import org.apache.kafka.connect.source.SourceRecord;

/**
* A predicate invoked by {@link Reader} implementations in order to determine whether they should continue with
* processing records or not.
*
* @author Gunnar Morling
*/
@FunctionalInterface
public interface HaltingPredicate {

/**
* Whether this record should be processed by the calling reader or not.
*/
boolean accepts(SourceRecord record);
}
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ public static DdlParsingMode parse(String value, String defaultValue) {
.withWidth(Width.LONG)
.withImportance(Importance.HIGH)
.withDefault(10000L)
.withDescription("Only relevant in parallel snapshotting is configured. During "
.withDescription("Only relevant if parallel snapshotting is configured. During "
+ "parallel snapshotting, multiple (4) connections open to the database "
+ "client, and they each need their own unique connection ID. This offset is "
+ "used to generate those IDs from the base configured cluster ID.");
Expand Down Expand Up @@ -1092,6 +1092,7 @@ public static final Field MASK_COLUMN(int length) {
private final SnapshotLockingMode snapshotLockingMode;
private final DdlParsingMode ddlParsingMode;
private final GtidNewChannelPosition gitIdNewChannelPosition;
private final SnapshotNewTables snapshotNewTables;

public MySqlConnectorConfig(Configuration config) {
super(
Expand Down Expand Up @@ -1119,6 +1120,9 @@ public MySqlConnectorConfig(Configuration config) {

String gitIdNewChannelPosition = config.getString(MySqlConnectorConfig.GTID_NEW_CHANNEL_POSITION);
this.gitIdNewChannelPosition = GtidNewChannelPosition.parse(gitIdNewChannelPosition, MySqlConnectorConfig.GTID_NEW_CHANNEL_POSITION.defaultValueAsString());

String snapshotNewTables = config.getString(MySqlConnectorConfig.SNAPSHOT_NEW_TABLES);
this.snapshotNewTables = SnapshotNewTables.parse(snapshotNewTables, MySqlConnectorConfig.SNAPSHOT_NEW_TABLES.defaultValueAsString());
}

public SnapshotLockingMode getSnapshotLockingMode() {
Expand All @@ -1133,6 +1137,10 @@ public GtidNewChannelPosition gtidNewChannelPosition() {
return gitIdNewChannelPosition;
}

public SnapshotNewTables getSnapshotNewTables() {
return snapshotNewTables;
}

protected static ConfigDef configDef() {
ConfigDef config = new ConfigDef();
Field.group(config, "MySQL", HOSTNAME, PORT, USER, PASSWORD, ON_CONNECT_STATEMENTS, SERVER_NAME, SERVER_ID, SERVER_ID_OFFSET,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import io.debezium.util.LoggingContext;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
Expand All @@ -29,6 +29,7 @@
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Collect;
import io.debezium.util.LoggingContext;
import io.debezium.util.LoggingContext.PreviousContext;

/**
Expand Down Expand Up @@ -215,7 +216,7 @@ public synchronized void start(Configuration config) {
// if there are new tables
if (newTablesInConfig()) {
// and we are configured to run a parallel snapshot
if (taskContext.snapshotNewTables() == MySqlConnectorConfig.SnapshotNewTables.PARALLEL) {
if (taskContext.getConnectorConfig().getSnapshotNewTables() == MySqlConnectorConfig.SnapshotNewTables.PARALLEL) {
ServerIdGenerator serverIdGenerator =
new ServerIdGenerator(config.getLong(MySqlConnectorConfig.SERVER_ID),
config.getLong(MySqlConnectorConfig.SERVER_ID_OFFSET));
Expand Down Expand Up @@ -310,14 +311,13 @@ public long getConfiguredServerId() {
* @return the offset to restart from.
* @see RecordMakers#RecordMakers(MySqlSchema, SourceInfo, TopicSelector, boolean, Map)
*/
@SuppressWarnings("unchecked")
private Map<String, ?> getRestartOffset(Map<String, ?> storedOffset) {
Map<String, Object> restartOffset = new HashMap<>();
if (storedOffset != null) {
for (String key : storedOffset.keySet()){
if (key.startsWith(SourceInfo.RESTART_PREFIX)) {
String newKey = key.substring(SourceInfo.RESTART_PREFIX.length());
restartOffset.put(newKey, storedOffset.get(key));
for (Entry<String, ?> entry : storedOffset.entrySet()){
if (entry.getKey().startsWith(SourceInfo.RESTART_PREFIX)) {
String newKey = entry.getKey().substring(SourceInfo.RESTART_PREFIX.length());
restartOffset.put(newKey, entry.getValue());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.connector.mysql.MySqlConnectorConfig.GtidNewChannelPosition;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotNewTables;
import io.debezium.function.Predicates;
import io.debezium.relational.TableId;
import io.debezium.relational.history.DatabaseHistory;
Expand Down Expand Up @@ -246,11 +245,6 @@ protected SnapshotMode snapshotMode() {
return SnapshotMode.parse(value, MySqlConnectorConfig.SNAPSHOT_MODE.defaultValueAsString());
}

protected SnapshotNewTables snapshotNewTables() {
String value = config.getString(MySqlConnectorConfig.SNAPSHOT_NEW_TABLES);
return SnapshotNewTables.parse(value, MySqlConnectorConfig.SNAPSHOT_NEW_TABLES.defaultValueAsString());
}

public String getSnapshotSelectOverrides() {
return config.getString(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@
*/
package io.debezium.connector.mysql;

import io.debezium.config.Configuration;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.config.Configuration;

/**
* A reader that runs a {@link ChainedReader} consisting of a {@link SnapshotReader} and a {@link BinlogReader}
Expand Down Expand Up @@ -195,17 +195,17 @@ public String name() {
* the current time. Once a single record near the end of the binlog has been seen, we
* we assume the reader will stay near the end of the binlog.
*/
/*package local*/ static class ParallelHaltingPredicate implements Predicate<SourceRecord> {
/*package local*/ static class ParallelHaltingPredicate implements HaltingPredicate {

private final Logger logger = LoggerFactory.getLogger(getClass());
private static final Logger LOGGER = LoggerFactory.getLogger(ParallelHaltingPredicate.class);

private static final Duration DEFAULT_MIN_HALTING_DURATION = Duration.ofMinutes(5);

private volatile AtomicBoolean thisReaderNearEnd;
private volatile AtomicBoolean otherReaderNearEnd;

// The minimum duration we must be within before we attempt to halt.
private final Duration minHaltingDuration;
// is hard coded in as 5 minutes.
private static final Duration DEFAULT_MIN_HALTING_DURATION = Duration.ofMinutes(5);

/*package local*/ ParallelHaltingPredicate(AtomicBoolean thisReaderNearEndRef,
AtomicBoolean otherReaderNearEndRef) {
Expand All @@ -221,7 +221,7 @@ public String name() {
}

@Override
public boolean test(SourceRecord ourSourceRecord) {
public boolean accepts(SourceRecord ourSourceRecord) {
// we assume if we ever end up near the end of the binlog, then we will remain there.
if (!thisReaderNearEnd.get()) {
Long sourceRecordTimestamp = (Long) ourSourceRecord.sourceOffset().get(SourceInfo.TIMESTAMP_KEY);
Expand All @@ -232,7 +232,7 @@ public boolean test(SourceRecord ourSourceRecord) {
now);
if (durationToEnd.compareTo(minHaltingDuration) <= 0) {
// we are within minHaltingDuration of the end
logger.debug("Parallel halting predicate: this reader near end");
LOGGER.debug("Parallel halting predicate: this reader near end");
thisReaderNearEnd.set(true);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,20 @@
*/
package io.debezium.connector.mysql;

import io.debezium.document.Document;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static io.debezium.connector.mysql.SourceInfo.BINLOG_FILENAME_OFFSET_KEY;
import static io.debezium.connector.mysql.SourceInfo.BINLOG_POSITION_OFFSET_KEY;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

import static io.debezium.connector.mysql.SourceInfo.BINLOG_FILENAME_OFFSET_KEY;
import static io.debezium.connector.mysql.SourceInfo.BINLOG_POSITION_OFFSET_KEY;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.document.Document;

/**
* A reader that unifies the binlog positions of two binlog readers.
Expand All @@ -29,7 +30,7 @@
*/
public class ReconcilingBinlogReader implements Reader {

private final Logger logger = LoggerFactory.getLogger(getClass());
private static final Logger LOGGER = LoggerFactory.getLogger(ReconcilingBinlogReader.class);

private final BinlogReader binlogReaderA;
private final BinlogReader binlogReaderB;
Expand Down Expand Up @@ -104,11 +105,11 @@ public void start() {
public void stop() {
if (running.compareAndSet(true, false)){
try {
logger.info("Stopping the {} reader", reconcilingReader.name());
LOGGER.info("Stopping the {} reader", reconcilingReader.name());
reconcilingReader.stop();
reconcilingReader.context.shutdown();
} catch (Throwable t) {
logger.error("Unexpected error stopping the {} reader", reconcilingReader.name());
LOGGER.error("Unexpected error stopping the {} reader", reconcilingReader.name());
}
}
}
Expand All @@ -126,7 +127,7 @@ private void completeSuccessfully() {
if (completed.compareAndSet(false, true)){
stop();
setupUnifiedReader();
logger.info("Completed Reconciliation of Parallel Readers.");
LOGGER.info("Completed Reconciliation of Parallel Readers.");

Runnable completionHandler = uponCompletion.getAndSet(null); // set to null so that we call it only once
if (completionHandler != null) {
Expand Down Expand Up @@ -177,9 +178,9 @@ private void determineLeadingReader() {
}

if (aReaderLeading) {
logger.info("old tables leading; reading only from new tables");
LOGGER.info("old tables leading; reading only from new tables");
} else {
logger.info("new tables leading; reading only from old tables");
LOGGER.info("new tables leading; reading only from old tables");
}
}

Expand All @@ -203,10 +204,10 @@ private void checkLaggingLeadingInfo() {
/**
* A Predicate that returns false for any record beyond a given offset.
*/
/*package private*/ static class OffsetLimitPredicate implements Predicate<SourceRecord> {
/*package private*/ static class OffsetLimitPredicate implements HaltingPredicate {

private Document leadingReaderFinalOffsetDocument;
private Predicate<String> gtidFilter;
private final Document leadingReaderFinalOffsetDocument;
private final Predicate<String> gtidFilter;

/*package private*/ OffsetLimitPredicate(Map<String, ?> leadingReaderFinalOffset,
Predicate<String> gtidFilter) {
Expand All @@ -216,7 +217,7 @@ private void checkLaggingLeadingInfo() {
}

@Override
public boolean test(SourceRecord sourceRecord) {
public boolean accepts(SourceRecord sourceRecord) {
Document offsetDocument = SourceInfo.createDocumentFromOffset(sourceRecord.sourceOffset());
// .isPositionAtOrBefore is true IFF leadingReaderFinalOffsetDocument <= offsetDocument
// we should stop (return false) IFF leadingReaderFinalOffsetDocument <= offsetDocument
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,7 @@ protected void readBinlogPosition(int step, SourceInfo source, JdbcConnection my
* @return {@link Filters} that represent all the tables that this snapshot reader should CREATE
*/
private Filters getCreateTableFilters(Filters filters) {
MySqlConnectorConfig.SnapshotNewTables snapshotNewTables = context.snapshotNewTables();
MySqlConnectorConfig.SnapshotNewTables snapshotNewTables = context.getConnectorConfig().getSnapshotNewTables();
if (snapshotNewTables == MySqlConnectorConfig.SnapshotNewTables.PARALLEL) {
// if we are snapshotting new tables in parallel, we need to make sure all the tables in the configuration
// are created.
Expand Down
Loading

0 comments on commit 965b789

Please sign in to comment.