CCDB-4340 - Add support for the new Timestamp granularity features in the jdbc source connector (#1152)

* CCDB-4340 - Add support for the new Timestamp granularity features in the jdbc source connector

* Changed the nanos config

* Fix checkstyle changes

* Update the doc string for timestamp.granularity

* Change all the conversion logic to an enum

* Address review comments and minor nit fixes
ddasarathan authored Jan 12, 2022
1 parent 0f1a8ac commit 574a90e
Showing 13 changed files with 350 additions and 48 deletions.
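For orientation before the per-file diffs: the commit adds a single new source-connector property, timestamp.granularity. Below is a minimal sketch of a config that exercises it; only the property name and its allowed values come from this commit, while the connector class, connection settings, and column name are illustrative stand-ins.

import java.util.HashMap;
import java.util.Map;

public class TimestampGranularityConfigSketch {
  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("connector.class", "io.confluent.connect.jdbc.JdbcSourceConnector");
    props.put("connection.url", "jdbc:postgresql://localhost:5432/mydb"); // illustrative
    props.put("mode", "timestamp");
    props.put("timestamp.column.name", "updated_at"); // illustrative column
    // Added by this commit; one of: connect_logical (default), nanos_long,
    // nanos_string, nanos_iso_datetime_string.
    props.put("timestamp.granularity", "nanos_long");
    props.forEach((k, v) -> System.out.println(k + "=" + v));
  }
}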
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
@@ -22,5 +22,5 @@
files="(DataConverter|GenericDatabaseDialect|JdbcSourceTask).java"/>

<suppress checks="ParameterNumber"
files="(ColumnDefinition|GenericDatabaseDialect|SqlServerDatabaseDialect|PostgreSqlDatabaseDialect).java"/>
files="(ColumnDefinition|GenericDatabaseDialect|SqlServerDatabaseDialect|PostgreSqlDatabaseDialect|TimestampIncrementingTableQuerier).java"/>
</suppressions>
src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java
@@ -75,6 +75,7 @@
import io.confluent.connect.jdbc.source.ColumnMapping;
import io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig;
import io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig.NumericMapping;
import io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig.TimestampGranularity;
import io.confluent.connect.jdbc.source.JdbcSourceTaskConfig;
import io.confluent.connect.jdbc.source.TimestampIncrementingCriteria;
import io.confluent.connect.jdbc.util.ColumnDefinition;
@@ -152,6 +153,7 @@ public DatabaseDialect create(AbstractConfig config) {
private volatile JdbcDriverInfo jdbcDriverInfo;
private final int batchMaxRows;
private final TimeZone timeZone;
private final JdbcSourceConnectorConfig.TimestampGranularity tsGranularity;

/**
* Create a new dialect instance with the given connector configuration.
@@ -208,6 +210,12 @@ protected GenericDatabaseDialect(
} else {
timeZone = TimeZone.getTimeZone(ZoneOffset.UTC);
}

if (config instanceof JdbcSourceConnectorConfig) {
tsGranularity = TimestampGranularity.get((JdbcSourceConnectorConfig) config);
} else {
tsGranularity = TimestampGranularity.CONNECT_LOGICAL;
}
}

@Override
@@ -1154,11 +1162,7 @@ protected String addFieldToSchema(

// Timestamp is a date + time
case Types.TIMESTAMP: {
SchemaBuilder tsSchemaBuilder = org.apache.kafka.connect.data.Timestamp.builder();
if (optional) {
tsSchemaBuilder.optional();
}
builder.field(fieldName, tsSchemaBuilder.build());
builder.field(fieldName, tsGranularity.schemaFunction.apply(optional));
break;
}

@@ -1385,7 +1389,10 @@ protected ColumnConverter columnConverterFor(

// Timestamp is a date + time
case Types.TIMESTAMP: {
return rs -> rs.getTimestamp(col, DateTimeUtils.getTimeZoneCalendar(timeZone));
return rs -> {
Timestamp timestamp = rs.getTimestamp(col, DateTimeUtils.getTimeZoneCalendar(timeZone));
return tsGranularity.fromTimestamp.apply(timestamp);
};
}

// Datalink is basically a URL -> string
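Taken together, the two GenericDatabaseDialect changes above move both the field schema and the value conversion for TIMESTAMP columns from a hard-coded Connect logical Timestamp to the configured granularity. A minimal sketch of what those two paths now produce under nanos_long; schemaFunction, fromTimestamp, and the DateTimeUtils helpers are the ones in this diff, while the sample value and printing are illustrative.

import java.sql.Timestamp;
import org.apache.kafka.connect.data.Schema;
import io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig.TimestampGranularity;

public class DialectGranularitySketch {
  public static void main(String[] args) {
    TimestampGranularity g = TimestampGranularity.NANOS_LONG;

    // addFieldToSchema path: the schema now comes from the enum instead of
    // org.apache.kafka.connect.data.Timestamp.builder().
    Schema fieldSchema = g.schemaFunction.apply(true); // Schema.OPTIONAL_INT64_SCHEMA

    // columnConverterFor path: the JDBC Timestamp read from the ResultSet is
    // converted by the enum (for nanos_long, via DateTimeUtils.toEpochNanos).
    Timestamp fromDb = Timestamp.valueOf("2022-01-12 10:15:30.123456789");
    Object fieldValue = g.fromTimestamp.apply(fromDb); // Long: nanos since epoch

    System.out.println(fieldSchema.type() + " -> " + fieldValue);
  }
}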
src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java
@@ -15,6 +15,7 @@

package io.confluent.connect.jdbc.source;

import java.sql.Timestamp;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.HashMap;
@@ -26,10 +27,12 @@
import java.util.concurrent.atomic.AtomicReference;

import io.confluent.connect.jdbc.util.DatabaseDialectRecommender;
import io.confluent.connect.jdbc.util.DateTimeUtils;
import io.confluent.connect.jdbc.util.EnumRecommender;
import io.confluent.connect.jdbc.util.QuoteMethod;
import io.confluent.connect.jdbc.util.TimeZoneValidator;

import java.util.function.Function;
import java.util.regex.Pattern;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
@@ -40,6 +43,8 @@
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -187,6 +192,19 @@ public class JdbcSourceConnectorConfig extends AbstractConfig {
+ "Use -1 to use the current time. If not specified, all data will be retrieved.";
public static final String TIMESTAMP_INITIAL_DISPLAY = "Unix time value of initial timestamp";

public static final String TIMESTAMP_GRANULARITY_CONFIG = "timestamp.granularity";
public static final String TIMESTAMP_GRANULARITY_DOC =
"Define the granularity of the Timestamp column. Options include: \n"
+ " * connect_logical (default): represents timestamp values using Kafka Connect's "
+ "built-in representations \n"
+ " * nanos_long: represents timestamp values as nanos since epoch\n"
+ " * nanos_string: represents timestamp values as nanos since epoch in string\n"
+ " * nanos_iso_datetime_string: uses the iso format 'yyyy-MM-dd'T'HH:mm:ss.n'\n";
public static final String TIMESTAMP_GRANULARITY_DISPLAY = "Timestamp granularity for "
+ "timestamp columns";
private static final EnumRecommender TIMESTAMP_GRANULARITY_RECOMMENDER =
EnumRecommender.in(TimestampGranularity.values());

public static final String TABLE_POLL_INTERVAL_MS_CONFIG = "table.poll.interval.ms";
private static final String TABLE_POLL_INTERVAL_MS_DOC =
"Frequency in ms to poll for new or removed tables, which may result in updated task "
@@ -643,7 +661,19 @@ public void ensureValid(final String name, final Object value) {
CONNECTOR_GROUP,
++orderInGroup,
Width.MEDIUM,
DB_TIMEZONE_CONFIG_DISPLAY);
DB_TIMEZONE_CONFIG_DISPLAY
).define(
TIMESTAMP_GRANULARITY_CONFIG,
Type.STRING,
TimestampGranularity.DEFAULT,
TIMESTAMP_GRANULARITY_RECOMMENDER,
Importance.LOW,
TIMESTAMP_GRANULARITY_DOC,
CONNECTOR_GROUP,
++orderInGroup,
Width.MEDIUM,
TIMESTAMP_GRANULARITY_DISPLAY,
TIMESTAMP_GRANULARITY_RECOMMENDER);
}

public static final ConfigDef CONFIG_DEF = baseConfigDef();
@@ -786,6 +816,62 @@ public static NumericMapping get(JdbcSourceConnectorConfig config) {
}
}

public enum TimestampGranularity {
CONNECT_LOGICAL(optional -> optional
? org.apache.kafka.connect.data.Timestamp.builder().optional().build()
: org.apache.kafka.connect.data.Timestamp.builder().build(),
timestamp -> timestamp,
timestamp -> (Timestamp) timestamp),

NANOS_LONG(optional -> optional ? Schema.OPTIONAL_INT64_SCHEMA : Schema.INT64_SCHEMA,
DateTimeUtils::toEpochNanos,
timestamp -> DateTimeUtils.toTimestamp((long) timestamp)),

NANOS_STRING(optional -> optional ? Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA,
timestamp -> String.valueOf(DateTimeUtils.toEpochNanos(timestamp)),
timestamp -> {
try {
return DateTimeUtils.toTimestamp((String) timestamp);
} catch (NumberFormatException e) {
throw new ConnectException(
"Invalid value for timestamp column with nanos-string granularity: "
+ timestamp
+ e.getMessage());
}
}),

NANOS_ISO_DATETIME_STRING(optional -> optional
? Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA,
DateTimeUtils::toIsoDateTimeString,
timestamp -> DateTimeUtils.toTimestampFromIsoDateTime((String) timestamp));

public final Function<Boolean, Schema> schemaFunction;
public final Function<Timestamp, Object> fromTimestamp;
public final Function<Object, Timestamp> toTimestamp;

public static final String DEFAULT = CONNECT_LOGICAL.name().toLowerCase(Locale.ROOT);

private static final Map<String, TimestampGranularity> reverse = new HashMap<>(values().length);
static {
for (TimestampGranularity val : values()) {
reverse.put(val.name().toLowerCase(Locale.ROOT), val);
}
}

public static TimestampGranularity get(JdbcSourceConnectorConfig config) {
String tsGranularity = config.getString(TIMESTAMP_GRANULARITY_CONFIG);
return reverse.get(tsGranularity.toLowerCase(Locale.ROOT));
}

TimestampGranularity(Function<Boolean, Schema> schemaFunction,
Function<Timestamp, Object> fromTimestamp,
Function<Object, Timestamp> toTimestamp) {
this.schemaFunction = schemaFunction;
this.fromTimestamp = fromTimestamp;
this.toTimestamp = toTimestamp;
}
}

protected JdbcSourceConnectorConfig(ConfigDef subclassConfigDef, Map<String, String> props) {
super(subclassConfigDef, props);
}
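Each TimestampGranularity constant above bundles three functions: schemaFunction maps the column's optionality to a Connect schema, fromTimestamp converts the java.sql.Timestamp read from the database into the record field value, and toTimestamp reverses that conversion when the offset logic reads the field back. A small round-trip sketch over all four constants; the sample value is illustrative, the functions are the enum fields in this diff.

import java.sql.Timestamp;
import io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig.TimestampGranularity;

public class GranularityRoundTripSketch {
  public static void main(String[] args) {
    Timestamp original = Timestamp.valueOf("2022-01-12 10:15:30.123456789");
    for (TimestampGranularity g : TimestampGranularity.values()) {
      Object asField = g.fromTimestamp.apply(original); // value placed in the record
      Timestamp back = g.toTimestamp.apply(asField);    // value recovered for the offset
      System.out.printf("%s: %s -> %s%n", g, asField, back);
    }
  }
}

Note also that DEFAULT is the lower-cased name connect_logical, and get(config) resolves the configured string through the lower-cased reverse map, so the property value is case-insensitive.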
src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java
@@ -201,6 +201,8 @@ public void start(Map<String, String> properties) {
offset = computeInitialOffset(tableOrQuery, offset, timeZone);

String topicPrefix = config.topicPrefix();
JdbcSourceConnectorConfig.TimestampGranularity timestampGranularity
= JdbcSourceConnectorConfig.TimestampGranularity.get(config);

if (mode.equals(JdbcSourceTaskConfig.MODE_BULK)) {
tableQueue.add(
@@ -224,7 +226,8 @@
offset,
timestampDelayInterval,
timeZone,
suffix
suffix,
timestampGranularity
)
);
} else if (mode.equals(JdbcSourceTaskConfig.MODE_TIMESTAMP)) {
@@ -238,7 +241,8 @@
offset,
timestampDelayInterval,
timeZone,
suffix
suffix,
timestampGranularity
)
);
} else if (mode.endsWith(JdbcSourceTaskConfig.MODE_TIMESTAMP_INCREMENTING)) {
@@ -253,7 +257,8 @@
offset,
timestampDelayInterval,
timeZone,
suffix
suffix,
timestampGranularity
)
);
}
src/main/java/io/confluent/connect/jdbc/source/TimestampIncrementingCriteria.java
@@ -186,16 +186,18 @@ protected void setQueryParametersTimestamp(
* @param schema the record's schema; never null
* @param record the record's struct; never null
* @param previousOffset a previous timestamp offset if the table has timestamp columns
* @param timestampGranularity defines the configured granularity of the timestamp field
* @return the timestamp for this row; may not be null
*/
public TimestampIncrementingOffset extractValues(
Schema schema,
Struct record,
TimestampIncrementingOffset previousOffset
TimestampIncrementingOffset previousOffset,
JdbcSourceConnectorConfig.TimestampGranularity timestampGranularity
) {
Timestamp extractedTimestamp = null;
if (hasTimestampColumns()) {
extractedTimestamp = extractOffsetTimestamp(schema, record);
extractedTimestamp = extractOffsetTimestamp(schema, record, timestampGranularity);
assert previousOffset == null || (previousOffset.getTimestampOffset() != null
&& previousOffset.getTimestampOffset().compareTo(
extractedTimestamp) <= 0
@@ -218,15 +220,17 @@
*
* @param schema the record's schema; never null
* @param record the record's struct; never null
* @param timestampGranularity defines the configured granularity of the timestamp field
* @return the timestamp for this row; may not be null
*/
protected Timestamp extractOffsetTimestamp(
Schema schema,
Struct record
Struct record,
JdbcSourceConnectorConfig.TimestampGranularity timestampGranularity
) {
caseAdjustedTimestampColumns.computeIfAbsent(schema, this::findCaseSensitiveTimestampColumns);
for (String timestampColumn : caseAdjustedTimestampColumns.get(schema)) {
Timestamp ts = (Timestamp) record.get(timestampColumn);
Timestamp ts = timestampGranularity.toTimestamp.apply(record.get(timestampColumn));
if (ts != null) {
return ts;
}
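The criteria change above is needed because, under a non-default granularity, the record field backing the timestamp offset is no longer a java.sql.Timestamp — it may be an INT64 or STRING field — so the previous (Timestamp) record.get(timestampColumn) cast would throw. toTimestamp undoes whatever fromTimestamp produced. A minimal sketch under nanos_string; the schema and field name are illustrative.

import java.sql.Timestamp;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig.TimestampGranularity;

public class OffsetExtractionSketch {
  public static void main(String[] args) {
    TimestampGranularity g = TimestampGranularity.NANOS_STRING;
    Schema schema = SchemaBuilder.struct()
        .field("updated_at", g.schemaFunction.apply(false)) // STRING_SCHEMA here
        .build();
    Struct record = new Struct(schema)
        .put("updated_at", g.fromTimestamp.apply(Timestamp.valueOf("2022-01-12 10:15:30.5")));
    // What extractOffsetTimestamp now does for each configured timestamp column:
    Timestamp ts = g.toTimestamp.apply(record.get("updated_at"));
    System.out.println(ts); // 2022-01-12 10:15:30.5
  }
}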
src/main/java/io/confluent/connect/jdbc/source/TimestampIncrementingTableQuerier.java
@@ -35,6 +35,7 @@
import java.util.Map;

import io.confluent.connect.jdbc.dialect.DatabaseDialect;
import io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig.TimestampGranularity;
import io.confluent.connect.jdbc.source.SchemaMapping.FieldSetter;
import io.confluent.connect.jdbc.source.TimestampIncrementingCriteria.CriteriaValues;
import io.confluent.connect.jdbc.util.ColumnDefinition;
@@ -70,6 +71,7 @@ public class TimestampIncrementingTableQuerier extends TableQuerier implements C
protected TimestampIncrementingCriteria criteria;
protected final Map<String, String> partition;
protected final String topic;
protected final TimestampGranularity timestampGranularity;
private final List<ColumnId> timestampColumns;
private String incrementingColumnName;
private final long timestampDelay;
@@ -80,7 +82,8 @@ public TimestampIncrementingTableQuerier(DatabaseDialect dialect, QueryMode mode
List<String> timestampColumnNames,
String incrementingColumnName,
Map<String, Object> offsetMap, Long timestampDelay,
TimeZone timeZone, String suffix) {
TimeZone timeZone, String suffix,
TimestampGranularity timestampGranularity) {
super(dialect, mode, name, topicPrefix, suffix);
this.incrementingColumnName = incrementingColumnName;
this.timestampColumnNames = timestampColumnNames != null
@@ -111,6 +114,7 @@ public TimestampIncrementingTableQuerier(DatabaseDialect dialect, QueryMode mode
}

this.timeZone = timeZone;
this.timestampGranularity = timestampGranularity;
}

/**
@@ -220,7 +224,7 @@ public SourceRecord extractRecord() throws SQLException {
throw new DataException(e);
}
}
offset = criteria.extractValues(schemaMapping.schema(), record, offset);
offset = criteria.extractValues(schemaMapping.schema(), record, offset, timestampGranularity);
return new SourceRecord(partition, offset.toMap(), topic, record.schema(), record);
}

src/main/java/io/confluent/connect/jdbc/source/TimestampTableQuerier.java
@@ -33,6 +33,7 @@
import java.util.Map;

import io.confluent.connect.jdbc.dialect.DatabaseDialect;
import io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig.TimestampGranularity;
import io.confluent.connect.jdbc.source.SchemaMapping.FieldSetter;

/**
@@ -62,7 +63,8 @@ public TimestampTableQuerier(
Map<String, Object> offsetMap,
Long timestampDelay,
TimeZone timeZone,
String suffix
String suffix,
TimestampGranularity timestampGranularity
) {
super(
dialect,
@@ -74,7 +76,8 @@
offsetMap,
timestampDelay,
timeZone,
suffix
suffix,
timestampGranularity
);

this.latestCommittableTimestamp = this.offset.getTimestampOffset();
@@ -145,7 +148,8 @@ private PendingRecord doExtractRecord() {
throw new DataException(e);
}
}
this.offset = criteria.extractValues(schemaMapping.schema(), record, offset);
this.offset = criteria.extractValues(schemaMapping.schema(), record, offset,
timestampGranularity);
Timestamp timestamp = offset.hasTimestampOffset() ? offset.getTimestampOffset() : null;
return new PendingRecord(partition, timestamp, topic, record.schema(), record);
}