Revert "[FLINK-17435][hive] Hive non-partitioned source supports stre…
Browse files Browse the repository at this point in the history
…aming read"

This reverts commit 32bd094.
rmetzger committed May 15, 2020
1 parent 32bd094 commit e69d21b
Showing 8 changed files with 40 additions and 329 deletions.

This file was deleted.

@@ -20,21 +20,16 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connectors.hive.read.HiveContinuousMonitoringFunction;
-import org.apache.flink.connectors.hive.read.HiveTableFileInputFormat;
import org.apache.flink.connectors.hive.read.HiveTableInputFormat;
import org.apache.flink.connectors.hive.read.TimestampedHiveInputSplit;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
-import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperatorFactory;
-import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
-import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.catalog.CatalogTable;
@@ -62,6 +57,7 @@
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.utils.TableConnectorUtils;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TimeUtils;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Partition;
@@ -86,7 +82,6 @@
import java.util.Properties;
import java.util.stream.Collectors;

-import static org.apache.flink.table.filesystem.DefaultPartTimeExtractor.toMills;
import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_CLASS;
import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_KIND;
import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN;
@@ -178,10 +173,10 @@ public DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) {

        if (isStreamingSource()) {
            if (catalogTable.getPartitionKeys().isEmpty()) {
-               return createStreamSourceForNonPartitionTable(execEnv, typeInfo, inputFormat, allHivePartitions.get(0));
-           } else {
-               return createStreamSourceForPartitionTable(execEnv, typeInfo, inputFormat);
+               throw new UnsupportedOperationException(
+                       "Non-partition table does not support streaming read now.");
            }
+           return createStreamSource(execEnv, typeInfo, inputFormat);
        } else {
            return createBatchSource(execEnv, typeInfo, inputFormat);
        }
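The net effect of this hunk: streaming consumption of a non-partitioned Hive table now fails fast at planning time instead of taking the file-monitoring path. A minimal, self-contained sketch of the restored guard, with illustrative names rather than the connector's actual classes:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Mirrors the restored branch structure of HiveTableSource#getDataStream:
// streaming is only planned for partitioned tables; batch reads are unaffected.
public class StreamingGuardSketch {

    static String planSource(boolean streaming, List<String> partitionKeys) {
        if (streaming) {
            if (partitionKeys.isEmpty()) {
                // Same message the revert restores.
                throw new UnsupportedOperationException(
                        "Non-partition table does not support streaming read now.");
            }
            return "createStreamSource";
        }
        return "createBatchSource";
    }

    public static void main(String[] args) {
        System.out.println(planSource(false, Collections.emptyList()));  // createBatchSource
        System.out.println(planSource(true, Arrays.asList("pt_day")));   // createStreamSource
        planSource(true, Collections.emptyList());                       // throws
    }
}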
@@ -226,20 +221,28 @@ private DataStream<RowData> createBatchSource(StreamExecutionEnvironment execEnv
        return source.name(explainSource());
    }

-   private DataStream<RowData> createStreamSourceForPartitionTable(
+   private DataStream<RowData> createStreamSource(
            StreamExecutionEnvironment execEnv,
            TypeInformation<RowData> typeInfo,
            HiveTableInputFormat inputFormat) {
-       Configuration configuration = new Configuration();
-       catalogTable.getOptions().forEach(configuration::setString);
-
-       String consumeOrderStr = configuration.get(STREAMING_SOURCE_CONSUME_ORDER);
-       ConsumeOrder consumeOrder = ConsumeOrder.getConsumeOrder(consumeOrderStr);
-       String consumeOffset = configuration.get(STREAMING_SOURCE_CONSUME_START_OFFSET);
-       String extractorKind = configuration.get(PARTITION_TIME_EXTRACTOR_KIND);
-       String extractorClass = configuration.get(PARTITION_TIME_EXTRACTOR_CLASS);
-       String extractorPattern = configuration.get(PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN);
-       Duration monitorInterval = configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL);
+       final Map<String, String> properties = catalogTable.getOptions();
+
+       String consumeOrder = properties.getOrDefault(
+               STREAMING_SOURCE_CONSUME_ORDER.key(),
+               STREAMING_SOURCE_CONSUME_ORDER.defaultValue());
+       String consumeOffset = properties.getOrDefault(
+               STREAMING_SOURCE_CONSUME_START_OFFSET.key(),
+               STREAMING_SOURCE_CONSUME_START_OFFSET.defaultValue());
+       String extractorKind = properties.getOrDefault(
+               PARTITION_TIME_EXTRACTOR_KIND.key(),
+               PARTITION_TIME_EXTRACTOR_KIND.defaultValue());
+       String extractorClass = properties.get(PARTITION_TIME_EXTRACTOR_CLASS.key());
+       String extractorPattern = properties.get(PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN.key());
+
+       String monitorIntervalStr = properties.get(STREAMING_SOURCE_MONITOR_INTERVAL.key());
+       Duration monitorInterval = monitorIntervalStr != null ?
+               TimeUtils.parseDuration(monitorIntervalStr) :
+               STREAMING_SOURCE_MONITOR_INTERVAL.defaultValue();

        HiveContinuousMonitoringFunction monitoringFunction = new HiveContinuousMonitoringFunction(
                hiveShim,
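The other change in this hunk is how options are read: the reverted code (minus side) copied the table options into a Configuration and used typed ConfigOption lookups, while the restored code (plus side) reads the raw properties map and parses the monitor interval by hand with TimeUtils.parseDuration. A self-contained sketch of the two styles; the option key and default here mirror the constant above but are assumptions, not taken from the connector:

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.TimeUtils;

public class OptionReadingSketch {

    // Stand-in for STREAMING_SOURCE_MONITOR_INTERVAL; key and default assumed.
    static final ConfigOption<Duration> MONITOR_INTERVAL = ConfigOptions
            .key("streaming-source.monitor-interval")
            .durationType()
            .defaultValue(Duration.ofMinutes(1));

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put(MONITOR_INTERVAL.key(), "10 s");

        // Style removed by the revert: typed lookup through Configuration.
        Configuration configuration = new Configuration();
        properties.forEach(configuration::setString);
        Duration viaConfigOption = configuration.get(MONITOR_INTERVAL);

        // Style restored by the revert: raw lookup plus manual parsing.
        String raw = properties.get(MONITOR_INTERVAL.key());
        Duration viaRawMap = raw != null
                ? TimeUtils.parseDuration(raw)
                : MONITOR_INTERVAL.defaultValue();

        System.out.println(viaConfigOption + " / " + viaRawMap); // PT10S / PT10S
    }
}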
@@ -265,45 +268,6 @@ private DataStream<RowData> createStreamSourceForPartitionTable(
        return new DataStreamSource<>(source);
    }

-
-   private DataStream<RowData> createStreamSourceForNonPartitionTable(
-           StreamExecutionEnvironment execEnv,
-           TypeInformation<RowData> typeInfo,
-           HiveTableInputFormat inputFormat,
-           HiveTablePartition hiveTable) {
-       HiveTableFileInputFormat fileInputFormat = new HiveTableFileInputFormat(inputFormat, hiveTable);
-
-       Configuration configuration = new Configuration();
-       catalogTable.getOptions().forEach(configuration::setString);
-       String consumeOrderStr = configuration.get(STREAMING_SOURCE_CONSUME_ORDER);
-       ConsumeOrder consumeOrder = ConsumeOrder.getConsumeOrder(consumeOrderStr);
-       if (consumeOrder != ConsumeOrder.CREATE_TIME_ORDER) {
-           throw new UnsupportedOperationException(
-                   "Only " + ConsumeOrder.CREATE_TIME_ORDER + " is supported for non partition table.");
-       }
-
-       String consumeOffset = configuration.get(STREAMING_SOURCE_CONSUME_START_OFFSET);
-       long currentReadTime = toMills(consumeOffset);
-
-       Duration monitorInterval = configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL);
-
-       ContinuousFileMonitoringFunction<RowData> monitoringFunction =
-               new ContinuousFileMonitoringFunction<>(
-                       fileInputFormat,
-                       FileProcessingMode.PROCESS_CONTINUOUSLY,
-                       execEnv.getParallelism(),
-                       monitorInterval.toMillis(),
-                       currentReadTime);
-
-       ContinuousFileReaderOperatorFactory<RowData, TimestampedFileInputSplit> factory =
-               new ContinuousFileReaderOperatorFactory<>(fileInputFormat);
-
-       String sourceName = "HiveFileMonitoringFunction";
-       SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, sourceName)
-               .transform("Split Reader: " + sourceName, typeInfo, factory);
-
-       return new DataStreamSource<>(source);
-   }

    @VisibleForTesting
    HiveTableInputFormat getInputFormat(List<HiveTablePartition> allHivePartitions, boolean useMapRedReader) {
        return new HiveTableInputFormat(
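For reference, the deleted createStreamSourceForNonPartitionTable wired up Flink's generic continuous-file-source pattern: a parallelism-1 monitoring function periodically lists the location and emits timestamped splits, and a chained operator reads those splits in parallel. A sketch of that topology using the same (internal) file classes the deleted code used, with a plain TextInputFormat standing in for the deleted HiveTableFileInputFormat; the path and interval are placeholders:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperatorFactory;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;

public class FileMonitorSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        TextInputFormat format = new TextInputFormat(new Path("/tmp/some-dir"));
        long monitorIntervalMillis = 60_000L;

        // Parallelism-1 function that repeatedly lists the directory and emits splits.
        ContinuousFileMonitoringFunction<String> monitor =
                new ContinuousFileMonitoringFunction<>(
                        format,
                        FileProcessingMode.PROCESS_CONTINUOUSLY,
                        env.getParallelism(),
                        monitorIntervalMillis);

        // Parallel operator that actually reads the emitted splits.
        ContinuousFileReaderOperatorFactory<String, TimestampedFileInputSplit> factory =
                new ContinuousFileReaderOperatorFactory<>(format);

        String name = "FileMonitoringFunction";
        DataStream<String> lines = env.addSource(monitor, name)
                .transform("Split Reader: " + name, TypeInformation.of(String.class), factory);

        lines.print();
        // env.execute("file-monitor-sketch");
    }
}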
@@ -23,7 +23,6 @@
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.connectors.hive.ConsumeOrder;
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.connectors.hive.HiveTableSource;
import org.apache.flink.connectors.hive.JobConfWrapper;
@@ -88,6 +87,9 @@ public class HiveContinuousMonitoringFunction

    private static final Logger LOG = LoggerFactory.getLogger(HiveContinuousMonitoringFunction.class);

+   private static final String CREATE_TIME_ORDER = "create-time";
+   private static final String PARTITION_TIME_ORDER = "partition-time";
+
    /** The parallelism of the downstream readers. */
    private final int readerParallelism;

@@ -107,7 +109,7 @@ public class HiveContinuousMonitoringFunction
    private final DataType[] fieldTypes;

    // consumer variables
-   private final ConsumeOrder consumeOrder;
+   private final String consumeOrder;
    private final String consumeOffset;

    // extractor variables
@@ -144,7 +146,7 @@ public HiveContinuousMonitoringFunction(
            ObjectPath tablePath,
            CatalogTable catalogTable,
            int readerParallelism,
-           ConsumeOrder consumeOrder,
+           String consumeOrder,
            String consumeOffset,
            String extractorKind,
            String extractorClass,
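This change and the field change above swap the ConsumeOrder enum back to a plain String, matching the "create-time"/"partition-time" constants restored earlier in this file; the enum's import is dropped above, and the file deleted at the top of this commit is presumably the enum itself. A rough reconstruction of such an enum, inferred only from the call sites visible in this diff (ConsumeOrder.getConsumeOrder(...), and ConsumeOrder.CREATE_TIME_ORDER printing as "create-time" in an exception message); the actual deleted file may differ:

// Hypothetical reconstruction of the deleted ConsumeOrder enum, inferred from
// its call sites in this diff; not copied from the deleted source file.
public enum ConsumeOrder {
    CREATE_TIME_ORDER("create-time"),
    PARTITION_TIME_ORDER("partition-time");

    private final String order;

    ConsumeOrder(String order) {
        this.order = order;
    }

    @Override
    public String toString() {
        return order;
    }

    public static ConsumeOrder getConsumeOrder(String order) {
        for (ConsumeOrder consumeOrder : values()) {
            if (consumeOrder.order.equalsIgnoreCase(order)) {
                return consumeOrder;
            }
        }
        throw new IllegalArgumentException("Invalid consume order: " + order);
    }
}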
@@ -252,7 +254,9 @@ public long extractTimestamp(
            this.distinctPartitions.addAll(this.distinctPartsState.get().iterator().next());
        } else {
            LOG.info("No state to restore for the {}.", getClass().getSimpleName());
-           this.currentReadTime = toMills(consumeOffset);
+           if (consumeOffset != null) {
+               this.currentReadTime = toMills(consumeOffset);
+           }
        }
    }

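The restored null check matters because a raw properties lookup, unlike a ConfigOption with a default, can yield no value at all: when the consume-start-offset option is unset, currentReadTime keeps its default rather than toMills failing on null. A self-contained sketch; the "yyyy-MM-dd HH:mm:ss" timestamp shape is an assumption based on DefaultPartTimeExtractor, not something shown in this diff:

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class ConsumeOffsetSketch {

    // Local stand-in for DefaultPartTimeExtractor.toMills; pattern assumed.
    static long toMills(String timestamp) {
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        return LocalDateTime.parse(timestamp, fmt)
                .atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        String consumeOffset = System.getenv("CONSUME_OFFSET"); // may be null
        long currentReadTime = 0L; // default: consume from the beginning
        if (consumeOffset != null) {
            // Without the guard, an unset option would make toMills(null) throw.
            currentReadTime = toMills(consumeOffset); // e.g. "2020-05-15 00:00:00"
        }
        System.out.println(currentReadTime);
    }
}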

This file was deleted.

@@ -120,10 +120,6 @@ public HiveTableInputFormat(
        this.useMapRedReader = useMapRedReader;
    }

-   public JobConf getJobConf() {
-       return jobConf;
-   }
-
    @Override
    public void configure(org.apache.flink.configuration.Configuration parameters) {
    }
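As far as this diff shows, the removed getJobConf accessor existed for the benefit of the now-deleted HiveTableFileInputFormat, which wrapped a HiveTableInputFormat (see the `new HiveTableFileInputFormat(inputFormat, hiveTable)` call deleted above); with the wrapper gone, the getter has no callers. A generic sketch of that wrapper/accessor shape, with illustrative classes only:

// Why a wrapping input format can force the wrapped one to expose internals.
// Names are stand-ins, not the connector's real classes.
class InnerFormat {
    private final String jobConf = "job-conf"; // stand-in for the Hadoop JobConf field

    String getJobConf() { // the kind of accessor this hunk removes
        return jobConf;
    }
}

class WrappingFormat {
    private final InnerFormat inner;

    WrappingFormat(InnerFormat inner) {
        this.inner = inner;
    }

    String describe() {
        // The wrapper was the accessor's only caller; once the wrapper is
        // reverted away, the accessor is dead code.
        return "wraps a format configured by " + inner.getJobConf();
    }
}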
