[hotfix][connector-hive] Avoid serializing TableConfig
Use and pass around only `threadNum`, which is the only option read,
instead of the whole `TableConfig`, so that the serialized classes no
longer try to serialize the `TableConfig` as well.
matriv authored and twalthr committed Mar 22, 2022
1 parent 07150c4 commit 7fcd693
Showing 11 changed files with 79 additions and 67 deletions.
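
The pattern behind the change, shown here as a minimal self-contained sketch (the class and option names below are illustrative stand-ins, not the actual Flink types): read the single option once while the configuration object is still at hand, and let the serializable component store only the resulting primitive, so serializing that component never pulls the configuration object into the payload.

import java.io.Serializable;

/**
 * Minimal sketch of the pattern, with hypothetical names: the serializable
 * component keeps only the int it needs instead of the whole config object.
 */
public class ThreadNumSketch {

    /** Stand-in for a non-serializable configuration holder such as TableConfig. */
    static class Config {
        private final int loadPartitionSplitsThreadNum;

        Config(int loadPartitionSplitsThreadNum) {
            this.loadPartitionSplitsThreadNum = loadPartitionSplitsThreadNum;
        }

        int loadPartitionSplitsThreadNum() {
            return loadPartitionSplitsThreadNum;
        }
    }

    /** Serializable worker: holds the primitive, never the Config. */
    static class EnumeratorSketch implements Serializable {
        private static final long serialVersionUID = 1L;

        private final int threadNum;

        EnumeratorSketch(int threadNum) {
            if (threadNum < 1) {
                throw new IllegalArgumentException("threadNum cannot be less than 1");
            }
            this.threadNum = threadNum;
        }

        int threadNum() {
            return threadNum;
        }
    }

    public static void main(String[] args) {
        Config config = new Config(4);
        // Read the option once on the client side; only the int travels with the worker.
        EnumeratorSketch enumerator = new EnumeratorSketch(config.loadPartitionSplitsThreadNum());
        System.out.println("threadNum = " + enumerator.threadNum());
    }
}

In the actual commit, the int is read from HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM in the builder and factory classes and handed to HiveSource, HiveSourceFileEnumerator, and the other serializable classes shown in the diff below.
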
@@ -23,7 +23,6 @@
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
@@ -78,7 +77,7 @@ public ContinuousHiveSplitEnumerator(
Collection<List<String>> seenPartitionsSinceOffset,
FileSplitAssigner splitAssigner,
long discoveryInterval,
ReadableConfig flinkConf,
int threadNum,
JobConf jobConf,
ObjectPath tablePath,
ContinuousPartitionFetcher<Partition, T> fetcher,
@@ -95,7 +94,7 @@ public ContinuousHiveSplitEnumerator(
currentReadOffset,
seenPartitionsSinceOffset,
tablePath,
flinkConf,
threadNum,
jobConf,
fetcher,
fetcherContext);
@@ -188,7 +187,7 @@ static class PartitionMonitor<T extends Comparable<T>>
private final Set<List<String>> seenPartitionsSinceOffset;

private final ObjectPath tablePath;
private final ReadableConfig flinkConf;
private final int threadNum;
private final JobConf jobConf;
private final ContinuousPartitionFetcher<Partition, T> fetcher;
private final HiveContinuousPartitionContext<Partition, T> fetcherContext;
@@ -197,14 +196,14 @@ static class PartitionMonitor<T extends Comparable<T>>
T currentReadOffset,
Collection<List<String>> seenPartitionsSinceOffset,
ObjectPath tablePath,
ReadableConfig flinkConf,
int threadNum,
JobConf jobConf,
ContinuousPartitionFetcher<Partition, T> fetcher,
HiveContinuousPartitionContext<Partition, T> fetcherContext) {
this.currentReadOffset = currentReadOffset;
this.seenPartitionsSinceOffset = new HashSet<>(seenPartitionsSinceOffset);
this.tablePath = tablePath;
this.flinkConf = flinkConf;
this.threadNum = threadNum;
this.jobConf = jobConf;
this.fetcher = fetcher;
this.fetcherContext = fetcherContext;
@@ -244,7 +243,7 @@ public NewSplitsAndState<T> call() throws Exception {
0,
Collections.singletonList(
fetcherContext.toHiveTablePartition(partition)),
flinkConf,
threadNum,
jobConf));
}
}
@@ -242,7 +242,7 @@ private TableFunction<RowData> getLookupFunction(int[] keys) {

PartitionReader<HiveTablePartition, RowData> partitionReader =
new HiveInputFormatPartitionReader(
flinkConf,
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM),
jobConf,
hiveVersion,
tablePath,
@@ -22,7 +22,6 @@
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.file.src.AbstractFileSource;
import org.apache.flink.connector.file.src.ContinuousEnumerationSettings;
import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
@@ -60,7 +59,7 @@ public class HiveSource<T> extends AbstractFileSource<T, HiveSourceSplit> {

private static final long serialVersionUID = 1L;

private final ReadableConfig flinkConf;
private final int threadNum;
private final JobConfWrapper jobConfWrapper;
private final List<String> partitionKeys;
private final ContinuousPartitionFetcher<Partition, ?> fetcher;
@@ -73,7 +72,7 @@ public class HiveSource<T> extends AbstractFileSource<T, HiveSourceSplit> {
FileSplitAssigner.Provider splitAssigner,
BulkFormat<T, HiveSourceSplit> readerFormat,
@Nullable ContinuousEnumerationSettings continuousEnumerationSettings,
ReadableConfig flinkConf,
int threadNum,
JobConf jobConf,
ObjectPath tablePath,
List<String> partitionKeys,
@@ -86,10 +85,10 @@ public class HiveSource<T> extends AbstractFileSource<T, HiveSourceSplit> {
readerFormat,
continuousEnumerationSettings);
Preconditions.checkArgument(
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM) >= 1,
threadNum >= 1,
HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM.key()
+ " cannot be less than 1");
this.flinkConf = flinkConf;
this.threadNum = threadNum;
this.jobConfWrapper = new JobConfWrapper(jobConf);
this.tablePath = tablePath;
this.partitionKeys = partitionKeys;
@@ -164,7 +163,7 @@ private boolean continuousPartitionedEnumerator() {
seenPartitions,
getAssignerFactory().create(new ArrayList<>(splits)),
getContinuousEnumerationSettings().getDiscoveryInterval().toMillis(),
flinkConf,
threadNum,
jobConfWrapper.conf(),
tablePath,
fetcher,
@@ -68,6 +68,7 @@
import static org.apache.flink.connectors.hive.HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL;
import static org.apache.flink.connectors.hive.HiveOptions.STREAMING_SOURCE_PARTITION_INCLUDE;
import static org.apache.flink.connectors.hive.HiveOptions.STREAMING_SOURCE_PARTITION_ORDER;
import static org.apache.flink.connectors.hive.HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER;
import static org.apache.flink.table.catalog.hive.util.HiveTableUtil.checkAcidTable;
import static org.apache.flink.util.Preconditions.checkArgument;

@@ -78,7 +79,8 @@ public class HiveSourceBuilder {
private static final Duration DEFAULT_SCAN_MONITOR_INTERVAL = Duration.ofMinutes(1L);

private final JobConf jobConf;
private final ReadableConfig flinkConf;
private final int threadNum;
private final boolean fallbackMappedReader;

private final ObjectPath tablePath;
private final Map<String, String> tableOptions;
@@ -110,7 +112,9 @@ public HiveSourceBuilder(
@Nonnull String tableName,
@Nonnull Map<String, String> tableOptions) {
this.jobConf = jobConf;
this.flinkConf = flinkConf;
this.threadNum =
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM);
this.fallbackMappedReader = flinkConf.get(TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER);
this.tablePath = new ObjectPath(dbName, tableName);
this.hiveVersion = hiveVersion == null ? HiveShimLoader.getHiveVersion() : hiveVersion;
HiveConf hiveConf = HiveConfUtils.create(jobConf);
@@ -147,7 +151,9 @@ public HiveSourceBuilder(
@Nullable String hiveVersion,
@Nonnull CatalogTable catalogTable) {
this.jobConf = jobConf;
this.flinkConf = flinkConf;
this.threadNum =
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM);
this.fallbackMappedReader = flinkConf.get(TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER);
this.tablePath = tablePath;
this.hiveVersion = hiveVersion == null ? HiveShimLoader.getHiveVersion() : hiveVersion;
this.fullSchema = catalogTable.getSchema();
@@ -231,12 +237,12 @@ public <T> HiveSource<T> buildWithBulkFormat(BulkFormat<T, HiveSourceSplit> bulk
new Path[1],
new HiveSourceFileEnumerator.Provider(
partitions != null ? partitions : Collections.emptyList(),
flinkConf,
threadNum,
new JobConfWrapper(jobConf)),
splitAssigner,
bulkFormat,
continuousSourceSettings,
flinkConf,
threadNum,
jobConf,
tablePath,
partitionKeys,
@@ -318,7 +324,7 @@ private BulkFormat<RowData, HiveSourceSplit> createDefaultBulkFormat() {
fullSchema.getFieldDataTypes(),
hiveVersion,
getProducedRowType(),
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER)),
fallbackMappedReader),
limit);
}
}
@@ -18,7 +18,6 @@

package org.apache.flink.connectors.hive;

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
import org.apache.flink.connectors.hive.read.HiveSourceSplit;
@@ -45,33 +44,27 @@ public class HiveSourceFileEnumerator implements FileEnumerator {
// For a non-partitioned hive table, partitions contains only one partition, whose
// partitionValues is empty.
private final List<HiveTablePartition> partitions;
private final ReadableConfig flinkConf;
private final int threadNum;
private final JobConf jobConf;

public HiveSourceFileEnumerator(
List<HiveTablePartition> partitions, ReadableConfig flinkConf, JobConf jobConf) {
List<HiveTablePartition> partitions, int threadNum, JobConf jobConf) {
this.partitions = partitions;
this.flinkConf = flinkConf;
this.threadNum = threadNum;
this.jobConf = jobConf;
}

@Override
public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)
throws IOException {
return new ArrayList<>(createInputSplits(minDesiredSplits, partitions, flinkConf, jobConf));
return new ArrayList<>(createInputSplits(minDesiredSplits, partitions, threadNum, jobConf));
}

public static List<HiveSourceSplit> createInputSplits(
int minNumSplits,
List<HiveTablePartition> partitions,
ReadableConfig flinkConf,
JobConf jobConf)
int minNumSplits, List<HiveTablePartition> partitions, int threadNum, JobConf jobConf)
throws IOException {
List<HiveSourceSplit> hiveSplits = new ArrayList<>();
try (MRSplitsGetter splitsGetter =
new MRSplitsGetter(
flinkConf.get(
HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM))) {
try (MRSplitsGetter splitsGetter = new MRSplitsGetter(threadNum)) {
for (HiveTablePartitionSplits partitionSplits :
splitsGetter.getHiveTablePartitionMRSplits(minNumSplits, partitions, jobConf)) {
HiveTablePartition partition = partitionSplits.getHiveTablePartition();
@@ -109,21 +102,19 @@ public static class Provider implements FileEnumerator.Provider {
private static final long serialVersionUID = 1L;

private final List<HiveTablePartition> partitions;
private final ReadableConfig flinkConf;
private final int threadNum;
private final JobConfWrapper jobConfWrapper;

public Provider(
List<HiveTablePartition> partitions,
ReadableConfig flinkConf,
JobConfWrapper jobConfWrapper) {
List<HiveTablePartition> partitions, int threadNum, JobConfWrapper jobConfWrapper) {
this.partitions = partitions;
this.flinkConf = flinkConf;
this.threadNum = threadNum;
this.jobConfWrapper = jobConfWrapper;
}

@Override
public FileEnumerator create() {
return new HiveSourceFileEnumerator(partitions, flinkConf, jobConfWrapper.conf());
return new HiveSourceFileEnumerator(partitions, threadNum, jobConfWrapper.conf());
}
}
}
@@ -108,7 +108,8 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su

private static final Logger LOG = LoggerFactory.getLogger(HiveTableSink.class);

private final ReadableConfig flinkConf;
private final boolean fallbackMappedReader;
private final boolean fallbackMappedWriter;
private final JobConf jobConf;
private final CatalogTable catalogTable;
private final ObjectIdentifier identifier;
@@ -128,7 +129,24 @@ public HiveTableSink(
ObjectIdentifier identifier,
CatalogTable table,
@Nullable Integer configuredParallelism) {
this.flinkConf = flinkConf;
this(
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER),
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER),
jobConf,
identifier,
table,
configuredParallelism);
}

private HiveTableSink(
boolean fallbackMappedReader,
boolean fallbackMappedWriter,
JobConf jobConf,
ObjectIdentifier identifier,
CatalogTable table,
@Nullable Integer configuredParallelism) {
this.fallbackMappedReader = fallbackMappedReader;
this.fallbackMappedWriter = fallbackMappedWriter;
this.jobConf = jobConf;
this.identifier = identifier;
this.catalogTable = table;
@@ -298,7 +316,7 @@ private DataStreamSink<?> createStreamSink(
org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(sd.getLocation());

BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> builder;
if (flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER)) {
if (fallbackMappedWriter) {
builder =
bucketsBuilderForMRWriter(
recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig);
@@ -377,7 +395,7 @@ private CompactReader.Factory<RowData> createCompactReaderFactory(
catalogTable,
hiveVersion,
(RowType) tableSchema.toRowDataType().getLogicalType(),
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER));
fallbackMappedReader);
}

private HiveTableMetaStoreFactory msFactory() {
@@ -487,7 +505,12 @@ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
public DynamicTableSink copy() {
HiveTableSink sink =
new HiveTableSink(
flinkConf, jobConf, identifier, catalogTable, configuredParallelism);
fallbackMappedReader,
fallbackMappedWriter,
jobConf,
identifier,
catalogTable,
configuredParallelism);
sink.staticPartitionSpec = staticPartitionSpec;
sink.overwrite = overwrite;
sink.dynamicGrouping = dynamicGrouping;
@@ -146,6 +146,8 @@ protected DataStream<RowData> getDataStream(
catalogTable.getPartitionKeys(),
remainingPartitions);

int threadNum =
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM);
int parallelism =
new HiveParallelismInference(tablePath, flinkConf)
.infer(
@@ -156,7 +158,7 @@
HiveSourceFileEnumerator.createInputSplits(
0,
hivePartitionsToRead,
flinkConf,
threadNum,
jobConf)
.size())
.limit(limit);
@@ -18,7 +18,6 @@

package org.apache.flink.connectors.hive.read;

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.file.table.PartitionReader;
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.connectors.hive.JobConfWrapper;
@@ -36,7 +35,7 @@ public class HiveInputFormatPartitionReader
implements PartitionReader<HiveTablePartition, RowData> {

private static final long serialVersionUID = 1L;
private final ReadableConfig flinkConf;
private final int threadNum;
private final JobConfWrapper jobConfWrapper;
private final String hiveVersion;
protected final ObjectPath tablePath;
@@ -51,7 +50,7 @@ public class HiveInputFormatPartitionReader
private transient int readingSplitId;

public HiveInputFormatPartitionReader(
ReadableConfig flinkConf,
int threadNum,
JobConf jobConf,
String hiveVersion,
ObjectPath tablePath,
@@ -60,7 +59,7 @@ public HiveInputFormatPartitionReader(
List<String> partitionKeys,
int[] selectedFields,
boolean useMapRedReader) {
this.flinkConf = flinkConf;
this.threadNum = threadNum;
this.jobConfWrapper = new JobConfWrapper(jobConf);
this.hiveVersion = hiveVersion;
this.tablePath = tablePath;
@@ -75,7 +74,7 @@ public HiveInputFormatPartitionReader(
public void open(List<HiveTablePartition> partitions) throws IOException {
hiveTableInputFormat =
new HiveTableInputFormat(
this.flinkConf,
this.threadNum,
this.jobConfWrapper.conf(),
this.partitionKeys,
this.fieldTypes,