Skip to content

Commit

Permalink
[FLINK-22202][parquet] Thread safety in ParquetColumnarRowInputFormat
Browse files Browse the repository at this point in the history
This closes apache#15572
  • Loading branch information
JingsongLi authored Apr 13, 2021
1 parent b78ce3b commit 413ff6a
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ public ParquetColumnarRowInputFormat(
this.producedType = producedType;
}

/**
 * Restricts this input format to a single circulating batch.
 *
 * <p>Dictionaries inside a {@code VectorizedColumnBatch} are deserialized lazily, and that
 * deserialization relies on internal structures. Keeping more than one batch alive at the
 * same time may therefore cause thread-safety problems, so the configured value is ignored
 * and exactly one batch is circulated.
 *
 * @param config the reader configuration; not consulted by this override
 * @return always {@code 1}
 */
@Override
protected int numBatchesToCirculate(org.apache.flink.configuration.Configuration config) {
    // Exactly one batch: see the Javadoc above for the thread-safety rationale.
    return 1;
}

@Override
protected ParquetReaderBatch<RowData> createReaderBatch(
WritableColumnVector[] writableVectors,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,16 @@ public ParquetReader createReader(final Configuration config, final SplitT split

checkSchema(fileSchema, requestedSchema);

final int numBatchesToCirculate =
config.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY);
final Pool<ParquetReaderBatch<T>> poolOfBatches =
createPoolOfBatches(split, requestedSchema, numBatchesToCirculate);
createPoolOfBatches(split, requestedSchema, numBatchesToCirculate(config));

return new ParquetReader(reader, requestedSchema, totalRowCount, poolOfBatches);
}

/**
 * Returns the number of reader batches to circulate.
 *
 * <p>This implementation reads the value of
 * {@code SourceReaderOptions.ELEMENT_QUEUE_CAPACITY} from the given configuration. The
 * method is {@code protected} so that subclasses can supply a different number.
 *
 * @param config the configuration to read the element queue capacity from
 * @return the configured element queue capacity
 */
protected int numBatchesToCirculate(Configuration config) {
    final int elementQueueCapacity =
            config.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY);
    return elementQueueCapacity;
}

@Override
public ParquetReader restoreReader(final Configuration config, final SplitT split)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand All @@ -73,6 +74,7 @@
import static org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath;
import static org.apache.parquet.schema.Types.primitive;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;

/** Test for {@link ParquetColumnarRowInputFormat}. */
Expand Down Expand Up @@ -393,9 +395,17 @@ private int testReadingSplit(
CheckpointedPosition.NO_OFFSET, seekToRow)));

AtomicInteger cnt = new AtomicInteger(0);
final AtomicReference<RowData> previousRow = new AtomicReference<>();
forEachRemaining(
reader,
row -> {
if (previousRow.get() == null) {
previousRow.set(row);
} else {
// ParquetColumnarRowInputFormat should only have one row instance.
assertSame(previousRow.get(), row);
}

Integer v = expected.get(cnt.get());
if (v == null) {
assertTrue(row.isNullAt(0));
Expand Down

0 comments on commit 413ff6a

Please sign in to comment.