Skip to content

Commit

Permalink
[FLINK-20812][hbase] Support 'properties.*' option to pass through all the HBase properties
Browse files Browse the repository at this point in the history

This closes apache#14536
  • Loading branch information
chaozwn authored Jan 14, 2021
1 parent 5905c7d commit 39dbd75
Show file tree
Hide file tree
Showing 16 changed files with 466 additions and 460 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,9 @@
package org.apache.flink.connector.hbase1;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.connector.hbase1.options.HBaseOptions;
import org.apache.flink.connector.hbase1.sink.HBaseDynamicTableSink;
import org.apache.flink.connector.hbase1.source.HBaseDynamicTableSource;
import org.apache.flink.table.api.TableSchema;
Expand All @@ -35,12 +32,22 @@
import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;

import java.time.Duration;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import static org.apache.flink.connector.hbase.options.HBaseOptions.NULL_STRING_LITERAL;
import static org.apache.flink.connector.hbase.options.HBaseOptions.PROPERTIES_PREFIX;
import static org.apache.flink.connector.hbase.options.HBaseOptions.SINK_BUFFER_FLUSH_INTERVAL;
import static org.apache.flink.connector.hbase.options.HBaseOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
import static org.apache.flink.connector.hbase.options.HBaseOptions.SINK_BUFFER_FLUSH_MAX_SIZE;
import static org.apache.flink.connector.hbase.options.HBaseOptions.TABLE_NAME;
import static org.apache.flink.connector.hbase.options.HBaseOptions.ZOOKEEPER_QUORUM;
import static org.apache.flink.connector.hbase.options.HBaseOptions.ZOOKEEPER_ZNODE_PARENT;
import static org.apache.flink.connector.hbase.options.HBaseOptions.getHBaseConfiguration;
import static org.apache.flink.connector.hbase.options.HBaseOptions.getHBaseWriteOptions;
import static org.apache.flink.connector.hbase.options.HBaseOptions.validatePrimaryKey;
import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
import static org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;

Expand All @@ -50,105 +57,47 @@ public class HBase1DynamicTableFactory

/** Identifier string of this factory; presumably the 'connector' value users specify (confirm). */
private static final String IDENTIFIER = "hbase-1.4";

/** Option 'table-name': name of the HBase table; string without a default value. */
private static final ConfigOption<String> TABLE_NAME =
ConfigOptions.key("table-name")
.stringType()
.noDefaultValue()
.withDescription("The name of HBase table to connect.");

/** Option 'zookeeper.quorum': HBase ZooKeeper quorum; string without a default value. */
private static final ConfigOption<String> ZOOKEEPER_QUORUM =
ConfigOptions.key("zookeeper.quorum")
.stringType()
.noDefaultValue()
.withDescription("The HBase Zookeeper quorum.");

/** Option 'zookeeper.znode.parent': ZooKeeper root dir of the HBase cluster; defaults to "/hbase". */
private static final ConfigOption<String> ZOOKEEPER_ZNODE_PARENT =
ConfigOptions.key("zookeeper.znode.parent")
.stringType()
.defaultValue("/hbase")
.withDescription("The root dir in Zookeeper for HBase cluster.");

/** Option 'null-string-literal': textual representation of null for string fields; defaults to "null". */
private static final ConfigOption<String> NULL_STRING_LITERAL =
ConfigOptions.key("null-string-literal")
.stringType()
.defaultValue("null")
.withDescription(
"Representation for null values for string fields. HBase source and "
+ "sink encodes/decodes empty bytes as null values for all types except string type.");

/** Option 'sink.buffer-flush.max-size': memory-size limit of buffered rows per write request; defaults to 2mb. */
private static final ConfigOption<MemorySize> SINK_BUFFER_FLUSH_MAX_SIZE =
ConfigOptions.key("sink.buffer-flush.max-size")
.memoryType()
.defaultValue(MemorySize.parse("2mb"))
.withDescription(
"Writing option, maximum size in memory of buffered rows for each "
+ "writing request. This can improve performance for writing data to HBase database, "
+ "but may increase the latency. Can be set to '0' to disable it. ");

/** Option 'sink.buffer-flush.max-rows': row-count limit of buffered rows per write request; defaults to 1000. */
private static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS =
ConfigOptions.key("sink.buffer-flush.max-rows")
.intType()
.defaultValue(1000)
.withDescription(
"Writing option, maximum number of rows to buffer for each writing request. "
+ "This can improve performance for writing data to HBase database, but may increase the latency. "
+ "Can be set to '0' to disable it.");

/** Option 'sink.buffer-flush.interval': interval at which buffered rows are flushed; defaults to 1 second. */
private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL =
ConfigOptions.key("sink.buffer-flush.interval")
.durationType()
.defaultValue(Duration.ofSeconds(1))
.withDescription(
"Writing option, the interval to flush any buffered rows. "
+ "This can improve performance for writing data to HBase database, but may increase the latency. "
+ "Can be set to '0' to disable it. Note, both 'sink.buffer-flush.max-size' and 'sink.buffer-flush.max-rows' "
+ "can be set to '0' with the flush interval set allowing for complete async processing of buffered actions.");

@Override
public DynamicTableSource createDynamicTableSource(Context context) {
TableFactoryHelper helper = createTableFactoryHelper(this, context);
helper.validate();
helper.validateExcept(PROPERTIES_PREFIX);

final ReadableConfig tableOptions = helper.getOptions();

TableSchema tableSchema = context.getCatalogTable().getSchema();
Map<String, String> options = context.getCatalogTable().getOptions();

validatePrimaryKey(tableSchema);

String hTableName = helper.getOptions().get(TABLE_NAME);
// create default configuration from current runtime env (`hbase-site.xml` in classpath)
// first,
Configuration hbaseClientConf = HBaseConfigurationUtil.getHBaseConfiguration();
hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, helper.getOptions().get(ZOOKEEPER_QUORUM));
hbaseClientConf.set(
HConstants.ZOOKEEPER_ZNODE_PARENT, helper.getOptions().get(ZOOKEEPER_ZNODE_PARENT));
String nullStringLiteral = helper.getOptions().get(NULL_STRING_LITERAL);
String tableName = tableOptions.get(TABLE_NAME);
Configuration hbaseClientConf = getHBaseConfiguration(options);
String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema);

return new HBaseDynamicTableSource(
hbaseClientConf, hTableName, hbaseSchema, nullStringLiteral);
hbaseClientConf, tableName, hbaseSchema, nullStringLiteral);
}

@Override
public DynamicTableSink createDynamicTableSink(Context context) {
TableFactoryHelper helper = createTableFactoryHelper(this, context);
helper.validate();
helper.validateExcept(PROPERTIES_PREFIX);

final ReadableConfig tableOptions = helper.getOptions();

TableSchema tableSchema = context.getCatalogTable().getSchema();
Map<String, String> options = context.getCatalogTable().getOptions();

validatePrimaryKey(tableSchema);

HBaseOptions.Builder hbaseOptionsBuilder = HBaseOptions.builder();
hbaseOptionsBuilder.setTableName(helper.getOptions().get(TABLE_NAME));
hbaseOptionsBuilder.setZkQuorum(helper.getOptions().get(ZOOKEEPER_QUORUM));
hbaseOptionsBuilder.setZkNodeParent(helper.getOptions().get(ZOOKEEPER_ZNODE_PARENT));

HBaseWriteOptions.Builder writeBuilder = HBaseWriteOptions.builder();
writeBuilder.setBufferFlushMaxSizeInBytes(
helper.getOptions().get(SINK_BUFFER_FLUSH_MAX_SIZE).getBytes());
writeBuilder.setBufferFlushIntervalMillis(
helper.getOptions().get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
writeBuilder.setBufferFlushMaxRows(helper.getOptions().get(SINK_BUFFER_FLUSH_MAX_ROWS));
writeBuilder.setParallelism(helper.getOptions().getOptional(SINK_PARALLELISM).orElse(null));
String nullStringLiteral = helper.getOptions().get(NULL_STRING_LITERAL);
String tableName = tableOptions.get(TABLE_NAME);
Configuration hbaseConf = getHBaseConfiguration(options);
HBaseWriteOptions hBaseWriteOptions = getHBaseWriteOptions(tableOptions);
String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema);

return new HBaseDynamicTableSink(
hbaseSchema, hbaseOptionsBuilder.build(), writeBuilder.build(), nullStringLiteral);
tableName, hbaseSchema, hbaseConf, hBaseWriteOptions, nullStringLiteral);
}

@Override
Expand All @@ -175,37 +124,4 @@ public Set<ConfigOption<?>> optionalOptions() {
set.add(SINK_PARALLELISM);
return set;
}

// ------------------------------------------------------------------------------------------

/**
 * Verifies that the given schema defines an HBase row key and, if a PRIMARY KEY constraint is
 * declared, that the constraint is defined on exactly that row key field.
 *
 * <p>A row key is an atomic-typed field, whereas column families and qualifiers are declared as
 * ROW types. The PRIMARY KEY constraint itself is optional; when it exists it must cover a
 * single column, and that column must be the row key field.
 *
 * @param schema the table schema to check
 * @throws IllegalArgumentException if no row key field is defined, if the primary key spans
 *     more than one column, or if the primary key is not defined on the row key field
 */
private static void validatePrimaryKey(TableSchema schema) {
    // Resolve the row key once; a schema without one is rejected immediately.
    final String rowKeyName =
            HBaseTableSchema.fromTableSchema(schema)
                    .getRowKeyName()
                    .orElseThrow(
                            () ->
                                    new IllegalArgumentException(
                                            "HBase table requires to define a row key field. "
                                                    + "A row key field is defined as an atomic type, "
                                                    + "column families and qualifiers are defined as ROW type."));

    schema.getPrimaryKey()
            .ifPresent(
                    primaryKey -> {
                        if (primaryKey.getColumns().size() > 1) {
                            throw new IllegalArgumentException(
                                    "HBase table doesn't support a primary Key on multiple columns. "
                                            + "The primary key of HBase table must be defined on row key field.");
                        }
                        final boolean definedOnRowKey =
                                rowKeyName.equals(primaryKey.getColumns().get(0));
                        if (!definedOnRowKey) {
                            throw new IllegalArgumentException(
                                    "Primary key of HBase table must be defined on the row key field. "
                                            + "A row key field is defined as an atomic type, "
                                            + "column families and qualifiers are defined as ROW type.");
                        }
                    });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.connector.hbase1.options.HBaseOptions;
import org.apache.flink.connector.hbase1.sink.HBaseUpsertTableSink;
import org.apache.flink.connector.hbase1.source.HBaseTableSource;
import org.apache.flink.table.api.TableSchema;
Expand All @@ -51,14 +50,15 @@
import java.util.List;
import java.util.Map;

import static org.apache.flink.connector.hbase1.HBaseValidator.CONNECTOR_PROPERTIES;
import static org.apache.flink.connector.hbase1.HBaseValidator.CONNECTOR_TABLE_NAME;
import static org.apache.flink.connector.hbase1.HBaseValidator.CONNECTOR_TYPE_VALUE_HBASE;
import static org.apache.flink.connector.hbase1.HBaseValidator.CONNECTOR_VERSION_VALUE_143;
import static org.apache.flink.table.descriptors.AbstractHBaseValidator.CONNECTOR_TABLE_NAME;
import static org.apache.flink.table.descriptors.AbstractHBaseValidator.CONNECTOR_TYPE_VALUE_HBASE;
import static org.apache.flink.table.descriptors.AbstractHBaseValidator.CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL;
import static org.apache.flink.table.descriptors.AbstractHBaseValidator.CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS;
import static org.apache.flink.table.descriptors.AbstractHBaseValidator.CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE;
import static org.apache.flink.table.descriptors.AbstractHBaseValidator.CONNECTOR_ZK_NODE_PARENT;
import static org.apache.flink.table.descriptors.AbstractHBaseValidator.CONNECTOR_ZK_QUORUM;
import static org.apache.flink.connector.hbase1.HBaseValidator.CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL;
import static org.apache.flink.connector.hbase1.HBaseValidator.CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS;
import static org.apache.flink.connector.hbase1.HBaseValidator.CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE;
import static org.apache.flink.connector.hbase1.HBaseValidator.CONNECTOR_ZK_NODE_PARENT;
import static org.apache.flink.connector.hbase1.HBaseValidator.CONNECTOR_ZK_QUORUM;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
Expand All @@ -82,12 +82,7 @@ public StreamTableSource<Row> createStreamTableSource(Map<String, String> proper
final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
// create default configuration from current runtime env (`hbase-site.xml` in classpath)
// first,
Configuration hbaseClientConf = HBaseConfigurationUtil.getHBaseConfiguration();
String hbaseZk = descriptorProperties.getString(CONNECTOR_ZK_QUORUM);
hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, hbaseZk);
descriptorProperties
.getOptionalString(CONNECTOR_ZK_NODE_PARENT)
.ifPresent(v -> hbaseClientConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, v));
Configuration hbaseClientConf = getHBaseConf(descriptorProperties);

String hTableName = descriptorProperties.getString(CONNECTOR_TABLE_NAME);
TableSchema tableSchema =
Expand All @@ -99,18 +94,15 @@ public StreamTableSource<Row> createStreamTableSource(Map<String, String> proper
@Override
public StreamTableSink<Tuple2<Boolean, Row>> createStreamTableSink(
Map<String, String> properties) {

final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
HBaseOptions.Builder hbaseOptionsBuilder = HBaseOptions.builder();
hbaseOptionsBuilder.setZkQuorum(descriptorProperties.getString(CONNECTOR_ZK_QUORUM));
hbaseOptionsBuilder.setTableName(descriptorProperties.getString(CONNECTOR_TABLE_NAME));
descriptorProperties
.getOptionalString(CONNECTOR_ZK_NODE_PARENT)
.ifPresent(hbaseOptionsBuilder::setZkNodeParent);

TableSchema tableSchema =
TableSchemaUtils.getPhysicalSchema(descriptorProperties.getTableSchema(SCHEMA));
HBaseTableSchema hbaseSchema = validateTableSchema(tableSchema);

Configuration hbaseClientConf = getHBaseConf(descriptorProperties);

HBaseWriteOptions.Builder writeBuilder = HBaseWriteOptions.builder();
descriptorProperties
.getOptionalInt(CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS)
Expand All @@ -123,7 +115,10 @@ public StreamTableSink<Tuple2<Boolean, Row>> createStreamTableSink(
.ifPresent(v -> writeBuilder.setBufferFlushIntervalMillis(v.toMillis()));

return new HBaseUpsertTableSink(
hbaseSchema, hbaseOptionsBuilder.build(), writeBuilder.build());
descriptorProperties.getString(CONNECTOR_TABLE_NAME),
hbaseSchema,
hbaseClientConf,
writeBuilder.build());
}

private HBaseTableSchema validateTableSchema(TableSchema schema) {
Expand Down Expand Up @@ -200,10 +195,30 @@ public List<String> supportedProperties() {
properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_NAME);
properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_COLUMNS);

// HBase properties
properties.add(CONNECTOR_PROPERTIES + ".*");

return properties;
}

/** Returns the connector version value of this factory, i.e. {@code CONNECTOR_VERSION_VALUE_143}. */
private String hbaseVersion() {
return CONNECTOR_VERSION_VALUE_143;
}

/**
 * Assembles the HBase client {@link Configuration} from the given descriptor properties.
 *
 * <p>The configuration obtained from {@code HBaseConfigurationUtil.createHBaseConf()} is used
 * as the base. The ZooKeeper quorum and the ZooKeeper znode parent are applied only when the
 * corresponding properties are present. Finally, every entry returned for the
 * {@code CONNECTOR_PROPERTIES} prefix is set on the configuration.
 *
 * @param descriptorProperties the connector properties to read from
 * @return the resulting HBase client configuration
 */
private static Configuration getHBaseConf(DescriptorProperties descriptorProperties) {
    final Configuration conf = HBaseConfigurationUtil.createHBaseConf();

    // Dedicated ZooKeeper settings are optional; leave the base values untouched when absent.
    descriptorProperties
            .getOptionalString(CONNECTOR_ZK_QUORUM)
            .ifPresent(quorum -> conf.set(HConstants.ZOOKEEPER_QUORUM, quorum));
    descriptorProperties
            .getOptionalString(CONNECTOR_ZK_NODE_PARENT)
            .ifPresent(znodeParent -> conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent));

    // Pass-through HBase properties are applied last.
    final Map<String, String> passThrough =
            descriptorProperties.getPropertiesWithPrefix(CONNECTOR_PROPERTIES);
    for (Map.Entry<String, String> entry : passThrough.entrySet()) {
        conf.set(entry.getKey(), entry.getValue());
    }
    return conf;
}
}
Loading

0 comments on commit 39dbd75

Please sign in to comment.