Skip to content

Commit

Permalink
Flink: Fix UPSERT delete file metadata (apache#4364)
Browse files Browse the repository at this point in the history
Co-authored-by: liliwei <[email protected]>
  • Loading branch information
kbendick and hililiwei authored Mar 28, 2022
1 parent 74b51ca commit 340a0c5
Show file tree
Hide file tree
Showing 8 changed files with 331 additions and 3 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ subprojects {
}
})

maxHeapSize = "1500m"

testLogging {
events "failed"
exceptionFormat "full"
Expand Down
7 changes: 6 additions & 1 deletion core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ protected BaseEqualityDeltaWriter(StructLike partition, Schema schema, Schema de
*/
protected abstract StructLike asStructLike(T data);

/**
* Wrap the passed in key of a row as a {@link StructLike}
*/
protected abstract StructLike asStructLikeKey(T key);

public void write(T row) throws IOException {
PathOffset pathOffset = PathOffset.of(dataWriter.currentPath(), dataWriter.currentRows());

Expand Down Expand Up @@ -167,7 +172,7 @@ public void delete(T row) throws IOException {
* @param key is the projected data whose columns are the same as the equality fields.
*/
public void deleteKey(T key) throws IOException {
if (!internalPosDelete(asStructLike(key))) {
if (!internalPosDelete(asStructLikeKey(key))) {
eqDeleteWriter.write(key);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ public void delete(Record row) throws IOException {
deltaWriter.delete(row);
}

/**
 * Delete rows matching the given equality key.
 *
 * <p>The caller of this function is responsible for passing in a record with only the key fields
 * (i.e. the equality-field columns); the full-row schema must not be used here.
 *
 * @param key a record containing only the equality key fields
 * @throws IOException if the underlying delta writer fails to write the delete
 */
public void deleteKey(Record key) throws IOException {
deltaWriter.deleteKey(key);
}
Expand All @@ -479,6 +480,11 @@ private GenericEqualityDeltaWriter(PartitionKey partition, Schema schema, Schema
// Generic Records already implement StructLike, so no wrapping is required.
protected StructLike asStructLike(Record row) {
return row;
}

@Override
protected StructLike asStructLikeKey(Record data) {
// The key record is already a StructLike projected to the equality fields;
// return it as-is (no separate key wrapper is needed for generic Records).
return data;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,5 +103,10 @@ protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
// Wrap the Flink RowData in the full-row-schema wrapper to expose it as a StructLike.
protected StructLike asStructLike(RowData data) {
return wrapper.wrap(data);
}

@Override
protected StructLike asStructLikeKey(RowData data) {
  // Equality-key wrapping is not supported in the Flink 1.12 module, so the
  // upsert delete-by-key path must not reach this writer. The previous message
  // contained review-time leftover text ("during PR review"); keep the throw
  // but report a clean, user-facing reason.
  throw new UnsupportedOperationException("Not implemented for Flink 1.12");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,5 +103,10 @@ protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
// Wrap the Flink RowData in the full-row-schema wrapper to expose it as a StructLike.
protected StructLike asStructLike(RowData data) {
return wrapper.wrap(data);
}

@Override
protected StructLike asStructLikeKey(RowData data) {
  // Equality-key wrapping is not supported in the Flink 1.13 module, so the
  // upsert delete-by-key path must not reach this writer. The previous message
  // contained review-time leftover text ("during PR review"); keep the throw
  // but report a clean, user-facing reason.
  throw new UnsupportedOperationException("Not implemented for Flink 1.13");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.RowDataWrapper;
import org.apache.iceberg.flink.data.RowDataProjection;
import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.FileIO;
Expand All @@ -41,6 +43,8 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
private final Schema schema;
private final Schema deleteSchema;
private final RowDataWrapper wrapper;
private final RowDataWrapper keyWrapper;
private final RowDataProjection keyProjection;
private final boolean upsert;

BaseDeltaTaskWriter(PartitionSpec spec,
Expand All @@ -57,6 +61,8 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
this.schema = schema;
this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
this.keyWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(deleteSchema), deleteSchema.asStruct());
this.keyProjection = RowDataProjection.create(schema, deleteSchema);
this.upsert = upsert;
}

Expand All @@ -74,7 +80,7 @@ public void write(RowData row) throws IOException {
case INSERT:
case UPDATE_AFTER:
if (upsert) {
writer.delete(row);
writer.deleteKey(keyProjection.wrap(row));
}
writer.write(row);
break;
Expand Down Expand Up @@ -103,5 +109,10 @@ protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
// Wrap a full row with the wrapper built from the table's full row schema.
protected StructLike asStructLike(RowData data) {
return wrapper.wrap(data);
}

@Override
protected StructLike asStructLikeKey(RowData data) {
// Keys are projected to the equality-delete schema, so they must be wrapped
// with keyWrapper (built from the delete schema), not the full-row wrapper —
// using the full-row wrapper here would read fields at the wrong positions.
return keyWrapper.wrap(data);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.UnpartitionedWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.ArrayUtil;

public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
Expand Down Expand Up @@ -69,8 +71,13 @@ public RowDataTaskWriterFactory(Table table,

if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec);
} else if (upsert) {
// In upsert mode, only the new row is emitted using INSERT row kind. Therefore, any column of the inserted row
// may differ from the deleted row other than the primary key fields, and the delete file must contain values
// that are correct for the deleted row. Therefore, only write the equality delete fields.
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
ArrayUtil.toIntArray(equalityFieldIds), TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)), null);
} else {
// TODO provide the ability to customize the equality-delete row schema.
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
ArrayUtil.toIntArray(equalityFieldIds), schema, null);
}
Expand Down
Loading

0 comments on commit 340a0c5

Please sign in to comment.