Skip to content

Commit

Permalink
[FLINK-28868][connector/hbase] Migrate HBase table connector to the new LookupFunction interface
Browse files Browse the repository at this point in the history

This closes apache#20495
  • Loading branch information
PatrickRen committed Aug 9, 2022
1 parent c5a8b0f commit bf81768
Show file tree
Hide file tree
Showing 15 changed files with 325 additions and 347 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
Constructor <org.apache.flink.connector.hbase.sink.HBaseSinkFunction.<init>(java.lang.String, org.apache.hadoop.conf.Configuration, org.apache.flink.connector.hbase.sink.HBaseMutationConverter, long, long, long)> has parameter of type <org.apache.hadoop.conf.Configuration> in (HBaseSinkFunction.java:0)
Constructor <org.apache.flink.connector.hbase.source.AbstractHBaseDynamicTableSource.<init>(org.apache.hadoop.conf.Configuration, java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema, java.lang.String, org.apache.flink.connector.hbase.options.HBaseLookupOptions)> has parameter of type <org.apache.hadoop.conf.Configuration> in (AbstractHBaseDynamicTableSource.java:0)
Constructor <org.apache.flink.connector.hbase.source.HBaseRowDataLookupFunction.<init>(org.apache.hadoop.conf.Configuration, java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema, java.lang.String, org.apache.flink.connector.hbase.options.HBaseLookupOptions)> has parameter of type <org.apache.hadoop.conf.Configuration> in (HBaseRowDataLookupFunction.java:0)
Constructor <org.apache.flink.connector.hbase.source.AbstractHBaseDynamicTableSource.<init>(org.apache.hadoop.conf.Configuration, java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema, java.lang.String, int, org.apache.flink.table.connector.source.lookup.cache.LookupCache)> has parameter of type <org.apache.hadoop.conf.Configuration> in (AbstractHBaseDynamicTableSource.java:0)
Constructor <org.apache.flink.connector.hbase.source.HBaseRowDataLookupFunction.<init>(org.apache.hadoop.conf.Configuration, java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema, java.lang.String, int)> has parameter of type <org.apache.hadoop.conf.Configuration> in (HBaseRowDataLookupFunction.java:0)
Constructor <org.apache.flink.connector.hbase1.sink.HBaseDynamicTableSink.<init>(java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema, org.apache.hadoop.conf.Configuration, org.apache.flink.connector.hbase.options.HBaseWriteOptions, java.lang.String)> has parameter of type <org.apache.hadoop.conf.Configuration> in (HBaseDynamicTableSink.java:0)
Constructor <org.apache.flink.connector.hbase1.source.AbstractTableInputFormat.<init>(org.apache.hadoop.conf.Configuration)> has parameter of type <org.apache.hadoop.conf.Configuration> in (AbstractTableInputFormat.java:0)
Constructor <org.apache.flink.connector.hbase1.source.HBaseDynamicTableSource.<init>(org.apache.hadoop.conf.Configuration, java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema, java.lang.String, org.apache.flink.connector.hbase.options.HBaseLookupOptions)> has parameter of type <org.apache.hadoop.conf.Configuration> in (HBaseDynamicTableSource.java:0)
Constructor <org.apache.flink.connector.hbase1.source.HBaseDynamicTableSource.<init>(org.apache.hadoop.conf.Configuration, java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema, java.lang.String, int, org.apache.flink.table.connector.source.lookup.cache.LookupCache)> has parameter of type <org.apache.hadoop.conf.Configuration> in (HBaseDynamicTableSource.java:0)
Constructor <org.apache.flink.connector.hbase1.source.HBaseRowDataInputFormat.<init>(org.apache.hadoop.conf.Configuration, java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema, java.lang.String)> has parameter of type <org.apache.hadoop.conf.Configuration> in (HBaseRowDataInputFormat.java:0)
Constructor <org.apache.flink.connector.hbase2.sink.HBaseDynamicTableSink.<init>(java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema, org.apache.hadoop.conf.Configuration, org.apache.flink.connector.hbase.options.HBaseWriteOptions, java.lang.String)> has parameter of type <org.apache.hadoop.conf.Configuration> in (HBaseDynamicTableSink.java:0)
Constructor <org.apache.flink.connector.hbase2.source.AbstractTableInputFormat.<init>(org.apache.hadoop.conf.Configuration)> has parameter of type <org.apache.hadoop.conf.Configuration> in (AbstractTableInputFormat.java:0)
Constructor <org.apache.flink.connector.hbase2.source.HBaseDynamicTableSource.<init>(org.apache.hadoop.conf.Configuration, java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema, java.lang.String, org.apache.flink.connector.hbase.options.HBaseLookupOptions)> has parameter of type <org.apache.hadoop.conf.Configuration> in (HBaseDynamicTableSource.java:0)
Constructor <org.apache.flink.connector.hbase2.source.HBaseRowDataAsyncLookupFunction.<init>(org.apache.hadoop.conf.Configuration, java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema, java.lang.String, org.apache.flink.connector.hbase.options.HBaseLookupOptions)> has parameter of type <org.apache.hadoop.conf.Configuration> in (HBaseRowDataAsyncLookupFunction.java:0)
Constructor <org.apache.flink.connector.hbase2.source.HBaseDynamicTableSource.<init>(org.apache.hadoop.conf.Configuration, java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema, java.lang.String, int, boolean, org.apache.flink.table.connector.source.lookup.cache.LookupCache)> has parameter of type <org.apache.hadoop.conf.Configuration> in (HBaseDynamicTableSource.java:0)
Constructor <org.apache.flink.connector.hbase2.source.HBaseRowDataAsyncLookupFunction.<init>(org.apache.hadoop.conf.Configuration, java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema, java.lang.String, int)> has parameter of type <org.apache.hadoop.conf.Configuration> in (HBaseRowDataAsyncLookupFunction.java:0)
Constructor <org.apache.flink.connector.hbase2.source.HBaseRowDataInputFormat.<init>(org.apache.hadoop.conf.Configuration, java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema, java.lang.String)> has parameter of type <org.apache.hadoop.conf.Configuration> in (HBaseRowDataInputFormat.java:0)
Field <org.apache.flink.connector.hbase.source.AbstractHBaseDynamicTableSource.conf> has type <org.apache.hadoop.conf.Configuration> in (AbstractHBaseDynamicTableSource.java:0)
Field <org.apache.flink.connector.hbase1.sink.HBaseDynamicTableSink.hbaseConf> has type <org.apache.hadoop.conf.Configuration> in (HBaseDynamicTableSink.java:0)
Expand Down Expand Up @@ -92,4 +92,4 @@ Method <org.apache.flink.connector.jdbc.internal.converter.OracleRowConverter.la
Method <org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.lambda$createPostgresArrayConverter$4f4cdb95$1(java.lang.Class, org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter$JdbcDeserializationConverter, java.lang.Object)> calls method <org.postgresql.jdbc.PgArray.getArray()> in (PostgresRowConverter.java:90)
Method <org.apache.flink.streaming.connectors.cassandra.CassandraSink.addSink(org.apache.flink.streaming.api.scala.DataStream)> calls method <org.apache.flink.streaming.api.scala.DataStream.javaStream()> in (CassandraSink.java:205)
Method <org.apache.flink.streaming.connectors.cassandra.CassandraSink.addSink(org.apache.flink.streaming.api.scala.DataStream)> has generic parameter type <org.apache.flink.streaming.api.scala.DataStream<IN>> with type argument depending on <org.apache.flink.streaming.api.scala.DataStream> in (CassandraSink.java:0)
Method <org.apache.flink.streaming.connectors.cassandra.CassandraSink.addSink(org.apache.flink.streaming.api.scala.DataStream)> has parameter of type <org.apache.flink.streaming.api.scala.DataStream> in (CassandraSink.java:0)
Method <org.apache.flink.streaming.connectors.cassandra.CassandraSink.addSink(org.apache.flink.streaming.api.scala.DataStream)> has parameter of type <org.apache.flink.streaming.api.scala.DataStream> in (CassandraSink.java:0)
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,16 @@
import org.apache.flink.connector.hbase1.source.HBaseDynamicTableSource;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.lookup.LookupOptions;
import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;

import org.apache.hadoop.conf.Configuration;

import java.time.Duration;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -52,7 +56,6 @@
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_ZNODE_PARENT;
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.PROPERTIES_PREFIX;
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.getHBaseConfiguration;
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.getHBaseLookupOptions;
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.getHBaseWriteOptions;
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.validatePrimaryKey;
import static org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;
Expand All @@ -78,13 +81,31 @@ public DynamicTableSource createDynamicTableSource(Context context) {
String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
HBaseTableSchema hbaseSchema =
HBaseTableSchema.fromDataType(context.getPhysicalRowDataType());
LookupCache cache = null;

// Backward compatible to legacy caching options
if (tableOptions.get(LOOKUP_CACHE_MAX_ROWS) > 0
&& tableOptions.get(LOOKUP_CACHE_TTL).compareTo(Duration.ZERO) > 0) {
cache =
DefaultLookupCache.newBuilder()
.maximumSize(tableOptions.get(LOOKUP_CACHE_MAX_ROWS))
.expireAfterWrite(tableOptions.get(LOOKUP_CACHE_TTL))
.build();
}

if (tableOptions
.get(LookupOptions.CACHE_TYPE)
.equals(LookupOptions.LookupCacheType.PARTIAL)) {
cache = DefaultLookupCache.fromConfig(tableOptions);
}

return new HBaseDynamicTableSource(
hbaseClientConf,
tableName,
hbaseSchema,
nullStringLiteral,
getHBaseLookupOptions(tableOptions));
tableOptions.get(LookupOptions.MAX_RETRIES),
cache);
}

@Override
Expand Down Expand Up @@ -133,6 +154,12 @@ public Set<ConfigOption<?>> optionalOptions() {
set.add(LOOKUP_CACHE_MAX_ROWS);
set.add(LOOKUP_CACHE_TTL);
set.add(LOOKUP_MAX_RETRIES);
set.add(LookupOptions.CACHE_TYPE);
set.add(LookupOptions.MAX_RETRIES);
set.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS);
set.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE);
set.add(LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY);
set.add(LookupOptions.PARTIAL_CACHE_MAX_ROWS);
return set;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,19 @@
package org.apache.flink.connector.hbase1.source;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
import org.apache.flink.connector.hbase.source.AbstractHBaseDynamicTableSource;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.data.RowData;

import org.apache.hadoop.conf.Configuration;

import javax.annotation.Nullable;

import java.util.Objects;

/** HBase table source implementation. */
@Internal
public class HBaseDynamicTableSource extends AbstractHBaseDynamicTableSource {
Expand All @@ -38,23 +41,38 @@ public HBaseDynamicTableSource(
String tableName,
HBaseTableSchema hbaseSchema,
String nullStringLiteral,
HBaseLookupOptions lookupOptions) {
super(conf, tableName, hbaseSchema, nullStringLiteral, lookupOptions);
int maxRetryTimes,
@Nullable LookupCache cache) {
super(conf, tableName, hbaseSchema, nullStringLiteral, maxRetryTimes, cache);
}

@Override
public DynamicTableSource copy() {
return new HBaseDynamicTableSource(
conf, tableName, hbaseSchema, nullStringLiteral, lookupOptions);
conf, tableName, hbaseSchema, nullStringLiteral, maxRetryTimes, cache);
}

/**
 * Builds the {@link InputFormat} that performs batch scans of the configured
 * HBase table, emitting rows as {@link RowData}.
 */
@Override
public InputFormat<RowData, ?> getInputFormat() {
    final HBaseRowDataInputFormat scanFormat =
            new HBaseRowDataInputFormat(conf, tableName, hbaseSchema, nullStringLiteral);
    return scanFormat;
}

@VisibleForTesting
public HBaseLookupOptions getLookupOptions() {
return this.lookupOptions;
/**
 * Two sources are equal iff their full configuration matches: HBase
 * configuration, table name, table schema, null-string literal, lookup retry
 * count and lookup cache. Must be kept consistent with {@link #hashCode()}.
 *
 * @param o the object to compare against, may be {@code null}
 * @return {@code true} if {@code o} is an equally-configured source
 */
@Override
public boolean equals(Object o) {
    // Fast path for the common reflexive case (Effective Java equals idiom).
    if (this == o) {
        return true;
    }
    // instanceof is null-safe, so no separate null check is needed.
    if (!(o instanceof HBaseDynamicTableSource)) {
        return false;
    }
    HBaseDynamicTableSource that = (HBaseDynamicTableSource) o;
    return Objects.equals(conf, that.conf)
            && Objects.equals(tableName, that.tableName)
            && Objects.equals(hbaseSchema, that.hbaseSchema)
            && Objects.equals(nullStringLiteral, that.nullStringLiteral)
            && Objects.equals(maxRetryTimes, that.maxRetryTimes)
            && Objects.equals(cache, that.cache);
}

/**
 * Hash derived from exactly the fields compared in {@code equals}, so equal
 * sources always hash to the same value.
 */
@Override
public int hashCode() {
    // NOTE: keep this field list in sync with equals().
    final Object[] state = {
        conf, tableName, hbaseSchema, nullStringLiteral, maxRetryTimes, cache
    };
    return Objects.hash(state);
}
}
Loading

0 comments on commit bf81768

Please sign in to comment.