From a91d9515dc3e2e694626a5fe2625a9d7402f67ef Mon Sep 17 00:00:00 2001 From: TsReaper Date: Fri, 12 Jul 2019 00:11:25 +0800 Subject: [PATCH] [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source (#9029) --- .../api/java/io/jdbc/JDBCLookupFunction.java | 2 +- .../api/java/io/jdbc/JDBCLookupOptions.java | 15 +- .../flink/api/java/io/jdbc/JDBCOptions.java | 18 +- .../api/java/io/jdbc/JDBCReadOptions.java | 146 +++++++++ .../api/java/io/jdbc/JDBCTableSource.java | 135 ++++++++- .../io/jdbc/JDBCTableSourceSinkFactory.java | 192 ++++++++++++ .../api/java/io/jdbc/JDBCUpsertTableSink.java | 17 ++ .../api/java/io/jdbc/dialect/JDBCDialect.java | 2 +- .../NumericBetweenParametersProvider.java | 75 +++-- .../table/descriptors/JDBCValidator.java | 133 +++++++++ ....apache.flink.table.factories.TableFactory | 16 + .../flink/api/java/io/jdbc/JDBCFullTest.java | 2 +- .../api/java/io/jdbc/JDBCInputFormatTest.java | 4 +- .../jdbc/JDBCTableSourceSinkFactoryTest.java | 276 ++++++++++++++++++ .../NumericBetweenParametersProviderTest.java | 122 ++++++++ 15 files changed, 1122 insertions(+), 33 deletions(-) create mode 100644 flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCReadOptions.java create mode 100644 flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java create mode 100644 flink-connectors/flink-jdbc/src/main/java/org/apache/flink/table/descriptors/JDBCValidator.java create mode 100644 flink-connectors/flink-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory create mode 100644 flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactoryTest.java create mode 100644 flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProviderTest.java diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunction.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunction.java index 4a4644124969e..80d612712b7a9 100644 --- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunction.java +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunction.java @@ -114,7 +114,7 @@ public void open(FunctionContext context) throws Exception { try { establishConnection(); statement = dbConn.prepareStatement(query); - this.cache = cacheMaxSize == -1 ? null : CacheBuilder.newBuilder() + this.cache = cacheMaxSize == -1 || cacheExpireMs == -1 ? 
null : CacheBuilder.newBuilder() .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS) .maximumSize(cacheMaxSize) .build(); diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupOptions.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupOptions.java index dd6dbbe358b8a..61373244fcdc6 100644 --- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupOptions.java +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupOptions.java @@ -19,6 +19,7 @@ package org.apache.flink.api.java.io.jdbc; import java.io.Serializable; +import java.util.Objects; import static org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.DEFAULT_MAX_RETRY_TIMES; @@ -53,8 +54,20 @@ public static Builder builder() { return new Builder(); } + @Override + public boolean equals(Object o) { + if (o instanceof JDBCLookupOptions) { + JDBCLookupOptions options = (JDBCLookupOptions) o; + return Objects.equals(cacheMaxSize, options.cacheMaxSize) && + Objects.equals(cacheExpireMs, options.cacheExpireMs) && + Objects.equals(maxRetryTimes, options.maxRetryTimes); + } else { + return false; + } + } + /** - * Builder of {@link JDBCOptions}. + * Builder of {@link JDBCLookupOptions}. */ public static class Builder { private long cacheMaxSize = -1L; diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOptions.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOptions.java index e026a32aff45b..f68b14a9f7b5c 100644 --- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOptions.java +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOptions.java @@ -21,12 +21,13 @@ import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect; import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects; +import java.util.Objects; import java.util.Optional; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * Options for the JDBC connector. + * Common options of {@link JDBCScanOptions} and {@link JDBCLookupOptions} for the JDBC connector. */ public class JDBCOptions { @@ -75,6 +76,21 @@ public static Builder builder() { return new Builder(); } + @Override + public boolean equals(Object o) { + if (o instanceof JDBCOptions) { + JDBCOptions options = (JDBCOptions) o; + return Objects.equals(dbURL, options.dbURL) && + Objects.equals(tableName, options.tableName) && + Objects.equals(driverName, options.driverName) && + Objects.equals(username, options.username) && + Objects.equals(password, options.password) && + Objects.equals(dialect.getClass().getName(), options.dialect.getClass().getName()); + } else { + return false; + } + } + /** * Builder of {@link JDBCOptions}. */ diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCReadOptions.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCReadOptions.java new file mode 100644 index 0000000000000..4591e1078bb39 --- /dev/null +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCReadOptions.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io.jdbc; + +import java.io.Serializable; +import java.util.Objects; +import java.util.Optional; + +/** + * Options for the JDBC scan. + */ +public class JDBCReadOptions implements Serializable { + + private final String partitionColumnName; + private final Long partitionLowerBound; + private final Long partitionUpperBound; + private final Integer numPartitions; + + private final int fetchSize; + + private JDBCReadOptions( + String partitionColumnName, + Long partitionLowerBound, + Long partitionUpperBound, + Integer numPartitions, + int fetchSize) { + this.partitionColumnName = partitionColumnName; + this.partitionLowerBound = partitionLowerBound; + this.partitionUpperBound = partitionUpperBound; + this.numPartitions = numPartitions; + + this.fetchSize = fetchSize; + } + + public Optional getPartitionColumnName() { + return Optional.ofNullable(partitionColumnName); + } + + public Optional getPartitionLowerBound() { + return Optional.ofNullable(partitionLowerBound); + } + + public Optional getPartitionUpperBound() { + return Optional.ofNullable(partitionUpperBound); + } + + public Optional getNumPartitions() { + return Optional.ofNullable(numPartitions); + } + + public int getFetchSize() { + return fetchSize; + } + + public static Builder builder() { + return new Builder(); + } + + @Override + public boolean equals(Object o) { + if (o instanceof JDBCReadOptions) { + JDBCReadOptions options = (JDBCReadOptions) o; + return Objects.equals(partitionColumnName, options.partitionColumnName) && + Objects.equals(partitionLowerBound, options.partitionLowerBound) && + Objects.equals(partitionUpperBound, options.partitionUpperBound) && + Objects.equals(numPartitions, options.numPartitions) && + Objects.equals(fetchSize, options.fetchSize); + } else { + return false; + } + } + + /** + * Builder of {@link JDBCReadOptions}. + */ + public static class Builder { + private String partitionColumnName; + private Long partitionLowerBound; + private Long partitionUpperBound; + private Integer numPartitions; + + private int fetchSize = 0; + + /** + * optional, name of the column used for partitioning the input. + */ + public Builder setPartitionColumnName(String partitionColumnName) { + this.partitionColumnName = partitionColumnName; + return this; + } + + /** + * optional, the smallest value of the first partition. + */ + public Builder setPartitionLowerBound(long partitionLowerBound) { + this.partitionLowerBound = partitionLowerBound; + return this; + } + + /** + * optional, the largest value of the last partition. + */ + public Builder setPartitionUpperBound(long partitionUpperBound) { + this.partitionUpperBound = partitionUpperBound; + return this; + } + + /** + * optional, the maximum number of partitions that can be used for parallelism in table reading. 
+ */ + public Builder setNumPartitions(int numPartitions) { + this.numPartitions = numPartitions; + return this; + } + + /** + * optional, the number of rows to fetch per round trip. + * default value is 0, according to the jdbc api, 0 means that fetchSize hint will be ignored. + */ + public Builder setFetchSize(int fetchSize) { + this.fetchSize = fetchSize; + return this; + } + + public JDBCReadOptions build() { + return new JDBCReadOptions( + partitionColumnName, partitionLowerBound, partitionUpperBound, numPartitions, fetchSize); + } + } +} diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSource.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSource.java index 82c3c3360722f..56298c7c5d21a 100644 --- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSource.java +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSource.java @@ -18,30 +18,81 @@ package org.apache.flink.api.java.io.jdbc; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect; +import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.functions.AsyncTableFunction; import org.apache.flink.table.functions.TableFunction; import org.apache.flink.table.sources.LookupableTableSource; +import org.apache.flink.table.sources.ProjectableTableSource; +import org.apache.flink.table.sources.StreamTableSource; import org.apache.flink.table.sources.TableSource; import org.apache.flink.types.Row; +import java.util.Arrays; +import java.util.Objects; + import static org.apache.flink.util.Preconditions.checkNotNull; /** * {@link TableSource} for JDBC. - * Now only support {@link LookupableTableSource}. 
*/ -public class JDBCTableSource implements LookupableTableSource { +public class JDBCTableSource implements + StreamTableSource, + ProjectableTableSource, + LookupableTableSource { private final JDBCOptions options; + private final JDBCReadOptions readOptions; private final JDBCLookupOptions lookupOptions; private final TableSchema schema; - public JDBCTableSource( - JDBCOptions options, JDBCLookupOptions lookupOptions, TableSchema schema) { + // index of fields selected, null means that all fields are selected + private final int[] selectFields; + private final RowTypeInfo returnType; + + private JDBCTableSource( + JDBCOptions options, JDBCReadOptions readOptions, JDBCLookupOptions lookupOptions, TableSchema schema) { + this(options, readOptions, lookupOptions, schema, null); + } + + private JDBCTableSource( + JDBCOptions options, JDBCReadOptions readOptions, JDBCLookupOptions lookupOptions, + TableSchema schema, int[] selectFields) { this.options = options; + this.readOptions = readOptions; this.lookupOptions = lookupOptions; this.schema = schema; + + this.selectFields = selectFields; + + final TypeInformation[] schemaTypeInfos = schema.getFieldTypes(); + final String[] schemaFieldNames = schema.getFieldNames(); + if (selectFields != null) { + TypeInformation[] typeInfos = new TypeInformation[selectFields.length]; + String[] typeNames = new String[selectFields.length]; + for (int i = 0; i < selectFields.length; i++) { + typeInfos[i] = schemaTypeInfos[selectFields[i]]; + typeNames[i] = schemaFieldNames[selectFields[i]]; + } + this.returnType = new RowTypeInfo(typeInfos, typeNames); + } else { + this.returnType = new RowTypeInfo(schemaTypeInfos, schemaFieldNames); + } + } + + @Override + public boolean isBounded() { + return true; + } + + @Override + public DataStream getDataStream(StreamExecutionEnvironment execEnv) { + return execEnv.createInput(getInputFormat(), getReturnType()).name(explainSource()); } @Override @@ -49,12 +100,22 @@ public TableFunction getLookupFunction(String[] lookupKeys) { return JDBCLookupFunction.builder() .setOptions(options) .setLookupOptions(lookupOptions) - .setFieldTypes(schema.getFieldTypes()) - .setFieldNames(schema.getFieldNames()) + .setFieldTypes(returnType.getFieldTypes()) + .setFieldNames(returnType.getFieldNames()) .setKeyNames(lookupKeys) .build(); } + @Override + public TypeInformation getReturnType() { + return returnType; + } + + @Override + public TableSource projectFields(int[] fields) { + return new JDBCTableSource(options, readOptions, lookupOptions, schema, fields); + } + @Override public AsyncTableFunction getAsyncLookupFunction(String[] lookupKeys) { throw new UnsupportedOperationException(); @@ -74,12 +135,57 @@ public static Builder builder() { return new Builder(); } + private JDBCInputFormat getInputFormat() { + JDBCInputFormat.JDBCInputFormatBuilder builder = JDBCInputFormat.buildJDBCInputFormat() + .setDrivername(options.getDriverName()) + .setDBUrl(options.getDbURL()) + .setUsername(options.getUsername()) + .setPassword(options.getPassword()) + .setRowTypeInfo(new RowTypeInfo(returnType.getFieldTypes(), returnType.getFieldNames())); + + if (readOptions.getFetchSize() != 0) { + builder.setFetchSize(readOptions.getFetchSize()); + } + + final JDBCDialect dialect = options.getDialect(); + String query = dialect.getSelectFromStatement( + options.getTableName(), returnType.getFieldNames(), new String[0]); + if (readOptions.getPartitionColumnName().isPresent()) { + long lowerBound = readOptions.getPartitionLowerBound().get(); + long 
upperBound = readOptions.getPartitionUpperBound().get(); + int numPartitions = readOptions.getNumPartitions().get(); + builder.setParametersProvider( + new NumericBetweenParametersProvider(lowerBound, upperBound).ofBatchNum(numPartitions)); + query += " WHERE " + + dialect.quoteIdentifier(readOptions.getPartitionColumnName().get()) + + " BETWEEN ? AND ?"; + } + builder.setQuery(query); + + return builder.finish(); + } + + @Override + public boolean equals(Object o) { + if (o instanceof JDBCTableSource) { + JDBCTableSource source = (JDBCTableSource) o; + return Objects.equals(options, source.options) && + Objects.equals(readOptions, source.readOptions) && + Objects.equals(lookupOptions, source.lookupOptions) && + Objects.equals(schema, source.schema) && + Arrays.equals(selectFields, source.selectFields); + } else { + return false; + } + } + /** * Builder for a {@link JDBCTableSource}. */ public static class Builder { private JDBCOptions options; + private JDBCReadOptions readOptions; private JDBCLookupOptions lookupOptions; private TableSchema schema; @@ -91,6 +197,15 @@ public Builder setOptions(JDBCOptions options) { return this; } + /** + * optional, scan related options. + * {@link JDBCReadOptions} will be only used for {@link StreamTableSource}. + */ + public Builder setReadOptions(JDBCReadOptions readOptions) { + this.readOptions = readOptions; + return this; + } + /** * optional, lookup related options. * {@link JDBCLookupOptions} only be used for {@link LookupableTableSource}. @@ -116,7 +231,13 @@ public Builder setSchema(TableSchema schema) { public JDBCTableSource build() { checkNotNull(options, "No options supplied."); checkNotNull(schema, "No schema supplied."); - return new JDBCTableSource(options, lookupOptions, schema); + if (readOptions == null) { + readOptions = JDBCReadOptions.builder().build(); + } + if (lookupOptions == null) { + lookupOptions = JDBCLookupOptions.builder().build(); + } + return new JDBCTableSource(options, readOptions, lookupOptions, schema); } } } diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java new file mode 100644 index 0000000000000..b90871cf5d000 --- /dev/null +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.io.jdbc; + +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.JDBCValidator; +import org.apache.flink.table.descriptors.SchemaValidator; +import org.apache.flink.table.factories.StreamTableSinkFactory; +import org.apache.flink.table.factories.StreamTableSourceFactory; +import org.apache.flink.table.sinks.StreamTableSink; +import org.apache.flink.table.sources.StreamTableSource; +import org.apache.flink.types.Row; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION; +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE; +import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_DRIVER; +import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_LOOKUP_CACHE_MAX_ROWS; +import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_LOOKUP_CACHE_TTL; +import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_LOOKUP_MAX_RETRIES; +import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_PASSWORD; +import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_READ_FETCH_SIZE; +import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_READ_PARTITION_COLUMN; +import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_READ_PARTITION_LOWER_BOUND; +import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_READ_PARTITION_NUM; +import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_READ_PARTITION_UPPER_BOUND; +import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_TABLE; +import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_TYPE_VALUE_JDBC; +import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_URL; +import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_USERNAME; +import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_WRITE_FLUSH_INTERVAL; +import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_WRITE_FLUSH_MAX_ROWS; +import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_WRITE_MAX_RETRIES; +import static org.apache.flink.table.descriptors.Schema.SCHEMA; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE; + +/** + * Factory for creating configured instances of {@link JDBCTableSource} and {@link JDBCUpsertTableSink}. 
+ */ +public class JDBCTableSourceSinkFactory implements + StreamTableSourceFactory, + StreamTableSinkFactory> { + + @Override + public Map requiredContext() { + Map context = new HashMap<>(); + context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_JDBC); // jdbc + context.put(CONNECTOR_PROPERTY_VERSION, "1"); // backwards compatibility + return context; + } + + @Override + public List supportedProperties() { + List properties = new ArrayList<>(); + + // common options + properties.add(CONNECTOR_DRIVER); + properties.add(CONNECTOR_URL); + properties.add(CONNECTOR_TABLE); + properties.add(CONNECTOR_USERNAME); + properties.add(CONNECTOR_PASSWORD); + + // scan options + properties.add(CONNECTOR_READ_PARTITION_COLUMN); + properties.add(CONNECTOR_READ_PARTITION_NUM); + properties.add(CONNECTOR_READ_PARTITION_LOWER_BOUND); + properties.add(CONNECTOR_READ_PARTITION_UPPER_BOUND); + properties.add(CONNECTOR_READ_FETCH_SIZE); + + // lookup options + properties.add(CONNECTOR_LOOKUP_CACHE_MAX_ROWS); + properties.add(CONNECTOR_LOOKUP_CACHE_TTL); + properties.add(CONNECTOR_LOOKUP_MAX_RETRIES); + + // sink options + properties.add(CONNECTOR_WRITE_FLUSH_MAX_ROWS); + properties.add(CONNECTOR_WRITE_FLUSH_INTERVAL); + properties.add(CONNECTOR_WRITE_MAX_RETRIES); + + // schema + properties.add(SCHEMA + ".#." + SCHEMA_TYPE); + properties.add(SCHEMA + ".#." + SCHEMA_NAME); + + return properties; + } + + @Override + public StreamTableSource createStreamTableSource(Map properties) { + final DescriptorProperties descriptorProperties = getValidatedProperties(properties); + + return JDBCTableSource.builder() + .setOptions(getJDBCOptions(descriptorProperties)) + .setReadOptions(getJDBCReadOptions(descriptorProperties)) + .setLookupOptions(getJDBCLookupOptions(descriptorProperties)) + .setSchema(descriptorProperties.getTableSchema(SCHEMA)) + .build(); + } + + @Override + public StreamTableSink> createStreamTableSink(Map properties) { + final DescriptorProperties descriptorProperties = getValidatedProperties(properties); + + final JDBCUpsertTableSink.Builder builder = JDBCUpsertTableSink.builder() + .setOptions(getJDBCOptions(descriptorProperties)) + .setTableSchema(descriptorProperties.getTableSchema(SCHEMA)); + + descriptorProperties.getOptionalInt(CONNECTOR_WRITE_FLUSH_MAX_ROWS).ifPresent(builder::setFlushMaxSize); + descriptorProperties.getOptionalDuration(CONNECTOR_WRITE_FLUSH_INTERVAL).ifPresent( + s -> builder.setFlushIntervalMills(s.toMillis())); + descriptorProperties.getOptionalInt(CONNECTOR_WRITE_MAX_RETRIES).ifPresent(builder::setMaxRetryTimes); + + return builder.build(); + } + + private DescriptorProperties getValidatedProperties(Map properties) { + final DescriptorProperties descriptorProperties = new DescriptorProperties(true); + descriptorProperties.putProperties(properties); + + new SchemaValidator(true, false, false).validate(descriptorProperties); + new JDBCValidator().validate(descriptorProperties); + + return descriptorProperties; + } + + private JDBCOptions getJDBCOptions(DescriptorProperties descriptorProperties) { + final String url = descriptorProperties.getString(CONNECTOR_URL); + final JDBCOptions.Builder builder = JDBCOptions.builder() + .setDBUrl(url) + .setTableName(descriptorProperties.getString(CONNECTOR_TABLE)) + .setDialect(JDBCDialects.get(url).get()); + + descriptorProperties.getOptionalString(CONNECTOR_DRIVER).ifPresent(builder::setDriverName); + descriptorProperties.getOptionalString(CONNECTOR_USERNAME).ifPresent(builder::setUsername); + 
descriptorProperties.getOptionalString(CONNECTOR_PASSWORD).ifPresent(builder::setPassword); + + return builder.build(); + } + + private JDBCReadOptions getJDBCReadOptions(DescriptorProperties descriptorProperties) { + final Optional partitionColumnName = + descriptorProperties.getOptionalString(CONNECTOR_READ_PARTITION_COLUMN); + final Optional partitionLower = descriptorProperties.getOptionalLong(CONNECTOR_READ_PARTITION_LOWER_BOUND); + final Optional partitionUpper = descriptorProperties.getOptionalLong(CONNECTOR_READ_PARTITION_UPPER_BOUND); + final Optional numPartitions = descriptorProperties.getOptionalInt(CONNECTOR_READ_PARTITION_NUM); + + final JDBCReadOptions.Builder builder = JDBCReadOptions.builder(); + if (partitionColumnName.isPresent()) { + builder.setPartitionColumnName(partitionColumnName.get()); + builder.setPartitionLowerBound(partitionLower.get()); + builder.setPartitionUpperBound(partitionUpper.get()); + builder.setNumPartitions(numPartitions.get()); + } + descriptorProperties.getOptionalInt(CONNECTOR_READ_FETCH_SIZE).ifPresent(builder::setFetchSize); + + return builder.build(); + } + + private JDBCLookupOptions getJDBCLookupOptions(DescriptorProperties descriptorProperties) { + final JDBCLookupOptions.Builder builder = JDBCLookupOptions.builder(); + + descriptorProperties.getOptionalLong(CONNECTOR_LOOKUP_CACHE_MAX_ROWS).ifPresent(builder::setCacheMaxSize); + descriptorProperties.getOptionalDuration(CONNECTOR_LOOKUP_CACHE_TTL).ifPresent( + s -> builder.setCacheExpireMs(s.toMillis())); + descriptorProperties.getOptionalInt(CONNECTOR_LOOKUP_MAX_RETRIES).ifPresent(builder::setMaxRetryTimes); + + return builder.build(); + } +} diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSink.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSink.java index 6a2df0d574d63..f6d0b9b67d9f4 100644 --- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSink.java +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSink.java @@ -33,6 +33,7 @@ import org.apache.flink.types.Row; import java.util.Arrays; +import java.util.Objects; import static org.apache.flink.api.java.io.jdbc.AbstractJDBCOutputFormat.DEFAULT_FLUSH_INTERVAL_MILLS; import static org.apache.flink.api.java.io.jdbc.AbstractJDBCOutputFormat.DEFAULT_FLUSH_MAX_SIZE; @@ -145,6 +146,22 @@ public static Builder builder() { return new Builder(); } + @Override + public boolean equals(Object o) { + if (o instanceof JDBCUpsertTableSink) { + JDBCUpsertTableSink sink = (JDBCUpsertTableSink) o; + return Objects.equals(schema, sink.schema) && + Objects.equals(options, sink.options) && + Objects.equals(flushMaxSize, sink.flushMaxSize) && + Objects.equals(flushIntervalMills, sink.flushIntervalMills) && + Objects.equals(maxRetryTime, sink.maxRetryTime) && + Arrays.equals(keyFields, sink.keyFields) && + Objects.equals(isAppendOnly, sink.isAppendOnly); + } else { + return false; + } + } + /** * Builder for a {@link JDBCUpsertTableSink}. 
*/ diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialect.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialect.java index 970fbb92c1577..8c53eaceec806 100644 --- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialect.java +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialect.java @@ -126,6 +126,6 @@ default String getSelectFromStatement(String tableName, String[] selectFields, S .map(f -> quoteIdentifier(f) + "=?") .collect(Collectors.joining(" AND ")); return "SELECT " + selectExpressions + " FROM " + - quoteIdentifier(tableName) + " WHERE " + fieldExpressions; + quoteIdentifier(tableName) + (conditionFields.length > 0 ? " WHERE " + fieldExpressions : ""); } } diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java index 4b8ecd65990c3..0e9011dfdaeb5 100644 --- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java @@ -18,9 +18,9 @@ package org.apache.flink.api.java.io.jdbc.split; -import java.io.Serializable; +import org.apache.flink.util.Preconditions; -import static org.apache.flink.util.Preconditions.checkArgument; +import java.io.Serializable; /** * This query parameters generator is an helper class to parameterize from/to queries on a numeric column. @@ -29,19 +29,32 @@ * *

 * <p>For example, if there's a table BOOKS with a numeric PK id, using a query like:
 * <pre>
- *   SELECT * FROM BOOKS WHERE id BETWEEN ? AND ?
+ * SELECT * FROM BOOKS WHERE id BETWEEN ? AND ?
 * </pre>
 *
 * <p>
You can take advantage of this class to automatically generate the parameters of the BETWEEN clause, * based on the passed constructor parameters. - * */ public class NumericBetweenParametersProvider implements ParameterValuesProvider { - private final long fetchSize; private final long minVal; private final long maxVal; + private long batchSize; + private int batchNum; + + /** + * NumericBetweenParametersProvider constructor. + * + * @param minVal the lower bound of the produced "from" values + * @param maxVal the upper bound of the produced "to" values + */ + public NumericBetweenParametersProvider(long minVal, long maxVal) { + Preconditions.checkArgument(minVal <= maxVal, "minVal must not be larger than maxVal"); + this.minVal = minVal; + this.maxVal = maxVal; + } + /** * NumericBetweenParametersProvider constructor. * @@ -50,27 +63,51 @@ public class NumericBetweenParametersProvider implements ParameterValuesProvider * @param maxVal the upper bound of the produced "to" values */ public NumericBetweenParametersProvider(long fetchSize, long minVal, long maxVal) { - checkArgument(fetchSize > 0, "Fetch size must be greater than 0."); - checkArgument(minVal <= maxVal, "Min value cannot be greater than max value."); - this.fetchSize = fetchSize; + Preconditions.checkArgument(minVal <= maxVal, "minVal must not be larger than maxVal"); this.minVal = minVal; this.maxVal = maxVal; + ofBatchSize(fetchSize); + } + + public NumericBetweenParametersProvider ofBatchSize(long batchSize) { + Preconditions.checkArgument(batchSize > 0, "Batch size must be positive"); + + long maxElemCount = (maxVal - minVal) + 1; + if (batchSize > maxElemCount) { + batchSize = maxElemCount; + } + this.batchSize = batchSize; + this.batchNum = new Double(Math.ceil((double) maxElemCount / batchSize)).intValue(); + return this; + } + + public NumericBetweenParametersProvider ofBatchNum(int batchNum) { + Preconditions.checkArgument(batchNum > 0, "Batch number must be positive"); + + long maxElemCount = (maxVal - minVal) + 1; + if (batchNum > maxElemCount) { + batchNum = (int) maxElemCount; + } + this.batchNum = batchNum; + this.batchSize = new Double(Math.ceil((double) maxElemCount / batchNum)).longValue(); + return this; } @Override public Serializable[][] getParameterValues() { - double maxElemCount = (maxVal - minVal) + 1; - int numBatches = new Double(Math.ceil(maxElemCount / fetchSize)).intValue(); - Serializable[][] parameters = new Serializable[numBatches][2]; - int batchIndex = 0; - for (long start = minVal; start <= maxVal; start += fetchSize, batchIndex++) { - long end = start + fetchSize - 1; - if (end > maxVal) { - end = maxVal; - } - parameters[batchIndex] = new Long[]{start, end}; + Preconditions.checkState(batchSize > 0, + "Batch size and batch number must be positive. Have you called `ofBatchSize` or `ofBatchNum`?"); + + long maxElemCount = (maxVal - minVal) + 1; + long bigBatchNum = maxElemCount - (batchSize - 1) * batchNum; + + Serializable[][] parameters = new Serializable[batchNum][2]; + long start = minVal; + for (int i = 0; i < batchNum; i++) { + long end = start + batchSize - 1 - (i >= bigBatchNum ? 
1 : 0); + parameters[i] = new Long[]{start, end}; + start = end + 1; } return parameters; } - } diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/table/descriptors/JDBCValidator.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/table/descriptors/JDBCValidator.java new file mode 100644 index 0000000000000..b0a7c181d8fe9 --- /dev/null +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/table/descriptors/JDBCValidator.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect; +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects; +import org.apache.flink.util.Preconditions; + +import java.util.Optional; + +/** + * The validator for JDBC. + */ +@Internal +public class JDBCValidator extends ConnectorDescriptorValidator { + + public static final String CONNECTOR_TYPE_VALUE_JDBC = "jdbc"; + + public static final String CONNECTOR_URL = "connector.url"; + public static final String CONNECTOR_TABLE = "connector.table"; + public static final String CONNECTOR_DRIVER = "connector.driver"; + public static final String CONNECTOR_USERNAME = "connector.username"; + public static final String CONNECTOR_PASSWORD = "connector.password"; + + public static final String CONNECTOR_READ_PARTITION_COLUMN = "connector.read.partition.column"; + public static final String CONNECTOR_READ_PARTITION_LOWER_BOUND = "connector.read.partition.lower-bound"; + public static final String CONNECTOR_READ_PARTITION_UPPER_BOUND = "connector.read.partition.upper-bound"; + public static final String CONNECTOR_READ_PARTITION_NUM = "connector.read.partition.num"; + public static final String CONNECTOR_READ_FETCH_SIZE = "connector.read.fetch-size"; + + public static final String CONNECTOR_LOOKUP_CACHE_MAX_ROWS = "connector.lookup.cache.max-rows"; + public static final String CONNECTOR_LOOKUP_CACHE_TTL = "connector.lookup.cache.ttl"; + public static final String CONNECTOR_LOOKUP_MAX_RETRIES = "connector.lookup.max-retries"; + + public static final String CONNECTOR_WRITE_FLUSH_MAX_ROWS = "connector.write.flush.max-rows"; + public static final String CONNECTOR_WRITE_FLUSH_INTERVAL = "connector.write.flush.interval"; + public static final String CONNECTOR_WRITE_MAX_RETRIES = "connector.write.max-retries"; + + @Override + public void validate(DescriptorProperties properties) { + super.validate(properties); + validateCommonProperties(properties); + validateReadProperties(properties); + validateLookupProperties(properties); + validateSinkProperties(properties); + } + + private void validateCommonProperties(DescriptorProperties properties) { + 
properties.validateString(CONNECTOR_URL, false, 1); + properties.validateString(CONNECTOR_TABLE, false, 1); + properties.validateString(CONNECTOR_DRIVER, true); + properties.validateString(CONNECTOR_USERNAME, true); + properties.validateString(CONNECTOR_PASSWORD, true); + + final String url = properties.getString(CONNECTOR_URL); + final Optional dialect = JDBCDialects.get(url); + Preconditions.checkState(dialect.isPresent(), "Cannot handle such jdbc url: " + url); + + Optional password = properties.getOptionalString(CONNECTOR_PASSWORD); + if (password.isPresent()) { + Preconditions.checkArgument( + properties.getOptionalString(CONNECTOR_USERNAME).isPresent(), + "Database username must be provided when database password is provided"); + } + } + + private void validateReadProperties(DescriptorProperties properties) { + properties.validateString(CONNECTOR_READ_PARTITION_COLUMN, true); + properties.validateLong(CONNECTOR_READ_PARTITION_LOWER_BOUND, true); + properties.validateLong(CONNECTOR_READ_PARTITION_UPPER_BOUND, true); + properties.validateInt(CONNECTOR_READ_PARTITION_NUM, true); + properties.validateInt(CONNECTOR_READ_FETCH_SIZE, true); + + Optional lowerBound = properties.getOptionalLong(CONNECTOR_READ_PARTITION_LOWER_BOUND); + Optional upperBound = properties.getOptionalLong(CONNECTOR_READ_PARTITION_UPPER_BOUND); + if (lowerBound.isPresent() && upperBound.isPresent()) { + Preconditions.checkArgument(lowerBound.get() <= upperBound.get(), + CONNECTOR_READ_PARTITION_LOWER_BOUND + " must not be larger than " + CONNECTOR_READ_PARTITION_UPPER_BOUND); + } + + checkAllOrNone(properties, new String[]{ + CONNECTOR_READ_PARTITION_COLUMN, + CONNECTOR_READ_PARTITION_LOWER_BOUND, + CONNECTOR_READ_PARTITION_UPPER_BOUND, + CONNECTOR_READ_PARTITION_NUM + }); + } + + private void validateLookupProperties(DescriptorProperties properties) { + properties.validateLong(CONNECTOR_LOOKUP_CACHE_MAX_ROWS, true); + properties.validateDuration(CONNECTOR_LOOKUP_CACHE_TTL, true, 1); + properties.validateInt(CONNECTOR_LOOKUP_MAX_RETRIES, true); + + checkAllOrNone(properties, new String[]{ + CONNECTOR_LOOKUP_CACHE_MAX_ROWS, + CONNECTOR_LOOKUP_CACHE_TTL + }); + } + + private void validateSinkProperties(DescriptorProperties properties) { + properties.validateInt(CONNECTOR_WRITE_FLUSH_MAX_ROWS, true); + properties.validateDuration(CONNECTOR_WRITE_FLUSH_INTERVAL, true, 1); + properties.validateInt(CONNECTOR_WRITE_MAX_RETRIES, true); + } + + private void checkAllOrNone(DescriptorProperties properties, String[] propertyNames) { + int presentCount = 0; + for (String name : propertyNames) { + if (properties.getOptionalString(name).isPresent()) { + presentCount++; + } + } + Preconditions.checkArgument(presentCount == 0 || presentCount == propertyNames.length, + "Either all or none of the following properties should be provided:\n" + String.join("\n", propertyNames)); + } +} diff --git a/flink-connectors/flink-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-connectors/flink-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory new file mode 100644 index 0000000000000..dbd648d597fbb --- /dev/null +++ b/flink-connectors/flink-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java index 91b175d128310..14dc85a3ef46e 100644 --- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java +++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java @@ -85,7 +85,7 @@ private void runTest(boolean exploitParallelism) throws Exception { //use a "splittable" query to exploit parallelism inputBuilder = inputBuilder .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID) - .setParametersProvider(new NumericBetweenParametersProvider(fetchSize, min, max)); + .setParametersProvider(new NumericBetweenParametersProvider(min, max).ofBatchSize(fetchSize)); } DataSet source = environment.createInput(inputBuilder.finish()); diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java index 81320e64698af..559976d06fe16 100644 --- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java +++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java @@ -221,7 +221,7 @@ public void testJDBCInputFormatWithParallelismAndNumericColumnSplitting() throws final int fetchSize = 1; final long min = TEST_DATA[0].id; final long max = TEST_DATA[TEST_DATA.length - fetchSize].id; - ParameterValuesProvider pramProvider = new NumericBetweenParametersProvider(fetchSize, min, max); + ParameterValuesProvider pramProvider = new NumericBetweenParametersProvider(min, max).ofBatchSize(fetchSize); jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() .setDrivername(DRIVER_CLASS) .setDBUrl(DB_URL) @@ -257,7 +257,7 @@ public void testJDBCInputFormatWithoutParallelismAndNumericColumnSplitting() thr final long min = TEST_DATA[0].id; final long max = TEST_DATA[TEST_DATA.length - 1].id; final long fetchSize = max + 1; //generate a single split - ParameterValuesProvider pramProvider = new NumericBetweenParametersProvider(fetchSize, min, max); + ParameterValuesProvider pramProvider = new NumericBetweenParametersProvider(min, max).ofBatchSize(fetchSize); jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() .setDrivername(DRIVER_CLASS) .setDBUrl(DB_URL) diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactoryTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactoryTest.java new file mode 100644 index 0000000000000..33c7b683dc1af --- /dev/null +++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactoryTest.java @@ 
-0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io.jdbc; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.factories.StreamTableSinkFactory; +import org.apache.flink.table.factories.StreamTableSourceFactory; +import org.apache.flink.table.factories.TableFactoryService; +import org.apache.flink.table.sinks.StreamTableSink; +import org.apache.flink.table.sources.StreamTableSource; +import org.apache.flink.table.sources.TableSource; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.FieldsDataType; + +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +/** + * Test for {@link JDBCTableSource} and {@link JDBCUpsertTableSink} created + * by {@link JDBCTableSourceSinkFactory}. + */ +public class JDBCTableSourceSinkFactoryTest { + + @Test + public void testJDBCCommonProperties() { + Map properties = getBasicProperties(); + properties.put("connector.driver", "org.apache.derby.jdbc.EmbeddedDriver"); + properties.put("connector.username", "user"); + properties.put("connector.password", "pass"); + + final StreamTableSource actual = TableFactoryService.find(StreamTableSourceFactory.class, properties) + .createStreamTableSource(properties); + + final JDBCOptions options = JDBCOptions.builder() + .setDBUrl("jdbc:derby:memory:mydb") + .setTableName("mytable") + .setDriverName("org.apache.derby.jdbc.EmbeddedDriver") + .setUsername("user") + .setPassword("pass") + .build(); + final TableSchema schema = TableSchema.builder() + .field("aaa", DataTypes.INT()) + .field("bbb", DataTypes.STRING()) + .field("ccc", DataTypes.DOUBLE()) + .build(); + final JDBCTableSource expected = JDBCTableSource.builder() + .setOptions(options) + .setSchema(schema) + .build(); + + assertEquals(expected, actual); + } + + @Test + public void testJDBCReadProperties() { + Map properties = getBasicProperties(); + properties.put("connector.read.partition.column", "aaa"); + properties.put("connector.read.partition.lower-bound", "-10"); + properties.put("connector.read.partition.upper-bound", "100"); + properties.put("connector.read.partition.num", "10"); + properties.put("connector.read.fetch-size", "20"); + + final StreamTableSource actual = TableFactoryService.find(StreamTableSourceFactory.class, properties) + .createStreamTableSource(properties); + + final JDBCOptions options = JDBCOptions.builder() + .setDBUrl("jdbc:derby:memory:mydb") + .setTableName("mytable") + .build(); + final JDBCReadOptions readOptions = JDBCReadOptions.builder() + 
.setPartitionColumnName("aaa") + .setPartitionLowerBound(-10) + .setPartitionUpperBound(100) + .setNumPartitions(10) + .setFetchSize(20) + .build(); + final TableSchema schema = TableSchema.builder() + .field("aaa", DataTypes.INT()) + .field("bbb", DataTypes.STRING()) + .field("ccc", DataTypes.DOUBLE()) + .build(); + final JDBCTableSource expected = JDBCTableSource.builder() + .setOptions(options) + .setReadOptions(readOptions) + .setSchema(schema) + .build(); + + assertEquals(expected, actual); + } + + @Test + public void testJDBCLookupProperties() { + Map properties = getBasicProperties(); + properties.put("connector.lookup.cache.max-rows", "1000"); + properties.put("connector.lookup.cache.ttl", "10s"); + properties.put("connector.lookup.max-retries", "10"); + + final StreamTableSource actual = TableFactoryService.find(StreamTableSourceFactory.class, properties) + .createStreamTableSource(properties); + + final JDBCOptions options = JDBCOptions.builder() + .setDBUrl("jdbc:derby:memory:mydb") + .setTableName("mytable") + .build(); + final JDBCLookupOptions lookupOptions = JDBCLookupOptions.builder() + .setCacheMaxSize(1000) + .setCacheExpireMs(10_000) + .setMaxRetryTimes(10) + .build(); + final TableSchema schema = TableSchema.builder() + .field("aaa", DataTypes.INT()) + .field("bbb", DataTypes.STRING()) + .field("ccc", DataTypes.DOUBLE()) + .build(); + final JDBCTableSource expected = JDBCTableSource.builder() + .setOptions(options) + .setLookupOptions(lookupOptions) + .setSchema(schema) + .build(); + + assertEquals(expected, actual); + } + + @Test + public void testJDBCSinkProperties() { + Map properties = getBasicProperties(); + properties.put("connector.write.flush.max-rows", "1000"); + properties.put("connector.write.flush.interval", "2min"); + properties.put("connector.write.max-retries", "5"); + + final StreamTableSink actual = TableFactoryService.find(StreamTableSinkFactory.class, properties) + .createStreamTableSink(properties); + + final JDBCOptions options = JDBCOptions.builder() + .setDBUrl("jdbc:derby:memory:mydb") + .setTableName("mytable") + .build(); + final TableSchema schema = TableSchema.builder() + .field("aaa", DataTypes.INT()) + .field("bbb", DataTypes.STRING()) + .field("ccc", DataTypes.DOUBLE()) + .build(); + final JDBCUpsertTableSink expected = JDBCUpsertTableSink.builder() + .setOptions(options) + .setTableSchema(schema) + .setFlushMaxSize(1000) + .setFlushIntervalMills(120_000) + .setMaxRetryTimes(5) + .build(); + + assertEquals(expected, actual); + } + + @Test + public void testJDBCWithFilter() { + Map properties = getBasicProperties(); + properties.put("connector.driver", "org.apache.derby.jdbc.EmbeddedDriver"); + properties.put("connector.username", "user"); + properties.put("connector.password", "pass"); + + final TableSource actual = ((JDBCTableSource) TableFactoryService + .find(StreamTableSourceFactory.class, properties) + .createStreamTableSource(properties)) + .projectFields(new int[] {0, 2}); + + Map projectedFields = ((FieldsDataType) actual.getProducedDataType()).getFieldDataTypes(); + assertEquals(projectedFields.get("aaa"), DataTypes.INT()); + assertNull(projectedFields.get("bbb")); + assertEquals(projectedFields.get("ccc"), DataTypes.DOUBLE()); + } + + @Test + public void testJDBCValidation() { + // only password, no username + try { + Map properties = getBasicProperties(); + properties.put("connector.password", "pass"); + + TableFactoryService.find(StreamTableSourceFactory.class, properties) + .createStreamTableSource(properties); + 
fail("exception expected"); + } catch (IllegalArgumentException ignored) { + } + + // read partition properties not complete + try { + Map properties = getBasicProperties(); + properties.put("connector.read.partition.column", "aaa"); + properties.put("connector.read.partition.lower-bound", "-10"); + properties.put("connector.read.partition.upper-bound", "100"); + + TableFactoryService.find(StreamTableSourceFactory.class, properties) + .createStreamTableSource(properties); + fail("exception expected"); + } catch (IllegalArgumentException ignored) { + } + + // read partition lower-bound > upper-bound + try { + Map properties = getBasicProperties(); + properties.put("connector.read.partition.column", "aaa"); + properties.put("connector.read.partition.lower-bound", "100"); + properties.put("connector.read.partition.upper-bound", "-10"); + properties.put("connector.read.partition.num", "10"); + + TableFactoryService.find(StreamTableSourceFactory.class, properties) + .createStreamTableSource(properties); + fail("exception expected"); + } catch (IllegalArgumentException ignored) { + } + + // lookup cache properties not complete + try { + Map properties = getBasicProperties(); + properties.put("connector.lookup.cache.max-rows", "10"); + + TableFactoryService.find(StreamTableSourceFactory.class, properties) + .createStreamTableSource(properties); + fail("exception expected"); + } catch (IllegalArgumentException ignored) { + } + + // lookup cache properties not complete + try { + Map properties = getBasicProperties(); + properties.put("connector.lookup.cache.ttl", "1s"); + + TableFactoryService.find(StreamTableSourceFactory.class, properties) + .createStreamTableSource(properties); + fail("exception expected"); + } catch (IllegalArgumentException ignored) { + } + } + + private Map getBasicProperties() { + Map properties = new HashMap<>(); + + properties.put("connector.type", "jdbc"); + properties.put("connector.property-version", "1"); + + properties.put("connector.url", "jdbc:derby:memory:mydb"); + properties.put("connector.table", "mytable"); + + properties.put("schema.0.name", "aaa"); + properties.put("schema.0.type", "INT"); + properties.put("schema.1.name", "bbb"); + properties.put("schema.1.type", "VARCHAR"); + properties.put("schema.2.name", "ccc"); + properties.put("schema.2.type", "DOUBLE"); + + return properties; + } +} diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProviderTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProviderTest.java new file mode 100644 index 0000000000000..51f77e7e4f055 --- /dev/null +++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProviderTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io.jdbc.split; + +import org.junit.Test; + +import java.io.Serializable; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link NumericBetweenParametersProvider}. + */ +public class NumericBetweenParametersProviderTest { + + @Test + public void testBatchSizeDivisible() { + NumericBetweenParametersProvider provider = new NumericBetweenParametersProvider(-5, 9).ofBatchSize(3); + Serializable[][] actual = provider.getParameterValues(); + + long[][] expected = { + new long[]{-5, -3}, + new long[]{-2, 0}, + new long[]{1, 3}, + new long[]{4, 6}, + new long[]{7, 9} + }; + check(expected, actual); + } + + @Test + public void testBatchSizeNotDivisible() { + NumericBetweenParametersProvider provider = new NumericBetweenParametersProvider(-5, 11).ofBatchSize(4); + Serializable[][] actual = provider.getParameterValues(); + + long[][] expected = { + new long[]{-5, -2}, + new long[]{-1, 2}, + new long[]{3, 5}, + new long[]{6, 8}, + new long[]{9, 11} + }; + check(expected, actual); + } + + @Test + public void testBatchSizeTooLarge() { + NumericBetweenParametersProvider provider = new NumericBetweenParametersProvider(0, 2).ofBatchSize(5); + Serializable[][] actual = provider.getParameterValues(); + + long[][] expected = {new long[]{0, 2}}; + check(expected, actual); + } + + @Test + public void testBatchNumDivisible() { + NumericBetweenParametersProvider provider = new NumericBetweenParametersProvider(-5, 9).ofBatchNum(5); + Serializable[][] actual = provider.getParameterValues(); + + long[][] expected = { + new long[]{-5, -3}, + new long[]{-2, 0}, + new long[]{1, 3}, + new long[]{4, 6}, + new long[]{7, 9} + }; + check(expected, actual); + } + + @Test + public void testBatchNumNotDivisible() { + NumericBetweenParametersProvider provider = new NumericBetweenParametersProvider(-5, 11).ofBatchNum(5); + Serializable[][] actual = provider.getParameterValues(); + + long[][] expected = { + new long[]{-5, -2}, + new long[]{-1, 2}, + new long[]{3, 5}, + new long[]{6, 8}, + new long[]{9, 11} + }; + check(expected, actual); + } + + @Test + public void testBatchNumTooLarge() { + NumericBetweenParametersProvider provider = new NumericBetweenParametersProvider(0, 2).ofBatchNum(5); + Serializable[][] actual = provider.getParameterValues(); + + long[][] expected = { + new long[]{0, 0}, + new long[]{1, 1}, + new long[]{2, 2}}; + check(expected, actual); + } + + private void check(long[][] expected, Serializable[][] actual) { + assertEquals(expected.length, actual.length); + for (int i = 0; i < expected.length; i++) { + for (int j = 0; j < 2; j++) { + assertEquals(expected[i][j], ((Long) actual[i][j]).longValue()); + } + } + } + +}
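A minimal sketch of exercising the new factory through a flat property map, mirroring the keys used in JDBCTableSourceSinkFactoryTest and validated by JDBCValidator; the Derby URL, table and column names, and the JdbcFactorySketch class are illustrative placeholders, not part of the patch.

import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

import java.util.HashMap;
import java.util.Map;

public class JdbcFactorySketch {
	public static void main(String[] args) {
		Map<String, String> properties = new HashMap<>();
		// required context checked by requiredContext()
		properties.put("connector.type", "jdbc");
		properties.put("connector.property-version", "1");
		// common options validated by JDBCValidator
		properties.put("connector.url", "jdbc:derby:memory:mydb");
		properties.put("connector.table", "books");
		// scan options: the four partition keys must be given all together or not at all
		properties.put("connector.read.partition.column", "id");
		properties.put("connector.read.partition.lower-bound", "0");
		properties.put("connector.read.partition.upper-bound", "1000");
		properties.put("connector.read.partition.num", "10");
		// table schema
		properties.put("schema.0.name", "id");
		properties.put("schema.0.type", "INT");
		properties.put("schema.1.name", "title");
		properties.put("schema.1.type", "VARCHAR");

		// the factory is discovered via the META-INF/services entry added in this patch
		StreamTableSource<Row> source = TableFactoryService
			.find(StreamTableSourceFactory.class, properties)
			.createStreamTableSource(properties);
		System.out.println(source.explainSource());
	}
}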
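The same source can also be assembled programmatically through the builders added in this patch; a sketch assuming a placeholder Derby database with a books(id, title) table (the JdbcSourceBuilderSketch class name is likewise illustrative).

import org.apache.flink.api.java.io.jdbc.JDBCOptions;
import org.apache.flink.api.java.io.jdbc.JDBCReadOptions;
import org.apache.flink.api.java.io.jdbc.JDBCTableSource;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;

public class JdbcSourceBuilderSketch {
	public static JDBCTableSource buildSource() {
		// connection options shared by scan, lookup and sink paths
		JDBCOptions options = JDBCOptions.builder()
			.setDBUrl("jdbc:derby:memory:mydb")   // placeholder URL; the dialect is derived from it
			.setTableName("books")                // placeholder table
			.build();

		// scan options: partitioned read over a numeric column plus a fetch-size hint
		JDBCReadOptions readOptions = JDBCReadOptions.builder()
			.setPartitionColumnName("id")
			.setPartitionLowerBound(0)
			.setPartitionUpperBound(1000)
			.setNumPartitions(10)
			.setFetchSize(200)
			.build();

		TableSchema schema = TableSchema.builder()
			.field("id", DataTypes.INT())
			.field("title", DataTypes.STRING())
			.build();

		return JDBCTableSource.builder()
			.setOptions(options)
			.setReadOptions(readOptions)
			.setSchema(schema)
			.build();
	}
}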
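How the refactored NumericBetweenParametersProvider carves a numeric range into splits can be seen in isolation; the expected batches below come directly from NumericBetweenParametersProviderTest, and the SplitParametersSketch class name is illustrative.

import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;

import java.io.Serializable;
import java.util.Arrays;

public class SplitParametersSketch {
	public static void main(String[] args) {
		// batches of size 3 over [-5, 9]: [-5,-3], [-2,0], [1,3], [4,6], [7,9]
		Serializable[][] bySize = new NumericBetweenParametersProvider(-5, 9)
			.ofBatchSize(3)
			.getParameterValues();

		// a fixed number of batches over the same range; batch sizes differ by at most one
		Serializable[][] byNum = new NumericBetweenParametersProvider(-5, 9)
			.ofBatchNum(5)
			.getParameterValues();

		// each pair is later bound to the generated "... WHERE <column> BETWEEN ? AND ?" query
		System.out.println(Arrays.deepToString(bySize));
		System.out.println(Arrays.deepToString(byNum));
	}
}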
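The first hunk of the patch ties the lookup cache to both limits being configured; a small sketch of the corresponding builder calls, with arbitrary example values.

import org.apache.flink.api.java.io.jdbc.JDBCLookupOptions;

public class JdbcLookupOptionsSketch {
	public static JDBCLookupOptions cachedLookup() {
		// both the row limit and the TTL must be set for caching to take effect;
		// leaving either at its default of -1 makes JDBCLookupFunction skip the cache entirely
		return JDBCLookupOptions.builder()
			.setCacheMaxSize(1000)
			.setCacheExpireMs(10_000)
			.setMaxRetryTimes(3)
			.build();
	}
}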