ORC: Support estimated length for unclosed file. (apache#3784)
hililiwei authored Mar 28, 2022
1 parent 340a0c5 commit 6d39f3c
Showing 16 changed files with 345 additions and 73 deletions.
1 change: 1 addition & 0 deletions build.gradle
@@ -457,6 +457,7 @@ project(':iceberg-orc') {
   dependencies {
     implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
     api project(':iceberg-api')
+    implementation project(':iceberg-common')
     implementation project(':iceberg-core')
     implementation("org.apache.avro:avro") {
       exclude group: 'org.tukaani' // xz compression is not supported
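The new iceberg-common dependency is presumably pulled in by the ORC length-estimation code elsewhere in this commit (iceberg-common hosts reflection helpers such as org.apache.iceberg.common.DynFields); the hunk here only records the build wiring.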
4 changes: 1 addition & 3 deletions core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
@@ -280,9 +280,7 @@ private void openCurrent() {
   }

   private boolean shouldRollToNewFile() {
-    // TODO: ORC file now not support target file size before closed
-    return !format.equals(FileFormat.ORC) &&
-        currentRows % ROWS_DIVISOR == 0 && length(currentWriter) >= targetFileSize;
+    return currentRows % ROWS_DIVISOR == 0 && length(currentWriter) >= targetFileSize;
   }

   private void closeCurrent() throws IOException {
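With the format check gone, length(currentWriter) must report a meaningful value while an ORC file is still open. A minimal sketch of the estimation idea, using hypothetical bookkeeping values rather than the commit's exact appender code:

    // Sketch only: approximate the eventual size of a still-open ORC file.
    // flushedBytes, bufferedRows and avgRowByteSize are hypothetical values the
    // appender would track; avgRowByteSize is a per-row estimate derived from
    // the write schema (see EstimateOrcAvgWidthVisitor later in this diff).
    static long estimatedLength(long flushedBytes, long bufferedRows, int avgRowByteSize) {
      // Bytes already persisted in completed stripes, plus an estimate for rows
      // still buffered in memory that have not reached the file yet.
      return flushedBytes + bufferedRows * (long) avgRowByteSize;
    }

Once the file is closed, the appender can return the exact length from the file system instead of the estimate.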
9 changes: 1 addition & 8 deletions core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java
@@ -24,7 +24,6 @@
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.StructLike;
-import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;

 /**
@@ -52,13 +51,7 @@ public ClusteredDataWriter(FileWriterFactory<T> writerFactory, OutputFileFactory

   @Override
   protected FileWriter<T, DataWriteResult> newWriter(PartitionSpec spec, StructLike partition) {
-    // TODO: support ORC rolling writers
-    if (fileFormat == FileFormat.ORC) {
-      EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition);
-      return writerFactory.newDataWriter(outputFile, spec, partition);
-    } else {
-      return new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
-    }
+    return new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
   }

   @Override
9 changes: 1 addition & 8 deletions core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java
@@ -24,7 +24,6 @@
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.StructLike;
-import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;

@@ -53,13 +52,7 @@ public ClusteredEqualityDeleteWriter(FileWriterFactory<T> writerFactory, OutputF

   @Override
   protected FileWriter<T, DeleteWriteResult> newWriter(PartitionSpec spec, StructLike partition) {
-    // TODO: support ORC rolling writers
-    if (fileFormat == FileFormat.ORC) {
-      EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition);
-      return writerFactory.newEqualityDeleteWriter(outputFile, spec, partition);
-    } else {
-      return new RollingEqualityDeleteWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
-    }
+    return new RollingEqualityDeleteWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
   }

   @Override
9 changes: 1 addition & 8 deletions core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java
@@ -25,7 +25,6 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.deletes.PositionDelete;
-import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.util.CharSequenceSet;

@@ -56,13 +55,7 @@ public ClusteredPositionDeleteWriter(FileWriterFactory<T> writerFactory, OutputF

   @Override
   protected FileWriter<PositionDelete<T>, DeleteWriteResult> newWriter(PartitionSpec spec, StructLike partition) {
-    // TODO: support ORC rolling writers
-    if (fileFormat == FileFormat.ORC) {
-      EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition);
-      return writerFactory.newPositionDeleteWriter(outputFile, spec, partition);
-    } else {
-      return new RollingPositionDeleteWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
-    }
+    return new RollingPositionDeleteWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
   }

   @Override
9 changes: 1 addition & 8 deletions core/src/main/java/org/apache/iceberg/io/FanoutDataWriter.java
@@ -24,7 +24,6 @@
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.StructLike;
-import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;

 /**
@@ -52,13 +51,7 @@ public FanoutDataWriter(FileWriterFactory<T> writerFactory, OutputFileFactory fi

   @Override
   protected FileWriter<T, DataWriteResult> newWriter(PartitionSpec spec, StructLike partition) {
-    // TODO: support ORC rolling writers
-    if (fileFormat == FileFormat.ORC) {
-      EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition);
-      return writerFactory.newDataWriter(outputFile, spec, partition);
-    } else {
-      return new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
-    }
+    return new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
   }

   @Override
1 change: 1 addition & 0 deletions core/src/test/java/org/apache/iceberg/io/TestBaseTaskWriter.java
@@ -58,6 +58,7 @@ public class TestBaseTaskWriter extends TableTestBase {
   public static Object[][] parameters() {
     return new Object[][] {
         {"avro"},
+        {"orc"},
         {"parquet"}
     };
   }
7 changes: 2 additions & 5 deletions core/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java
@@ -30,7 +30,6 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.junit.Assert;
-import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -39,15 +38,15 @@
 @RunWith(Parameterized.class)
 public abstract class TestRollingFileWriters<T> extends WriterTestBase<T> {

-  // TODO: add ORC once we support ORC rolling file writers
-
   @Parameterized.Parameters(name = "FileFormat={0}, Partitioned={1}")
   public static Object[] parameters() {
     return new Object[][] {
         new Object[]{FileFormat.AVRO, false},
         new Object[]{FileFormat.AVRO, true},
         new Object[]{FileFormat.PARQUET, false},
         new Object[]{FileFormat.PARQUET, true},
+        new Object[]{FileFormat.ORC, false},
+        new Object[]{FileFormat.ORC, true}
     };
   }
@@ -129,8 +128,6 @@ public void testRollingDataWriterSplitData() throws IOException {

   @Test
   public void testRollingEqualityDeleteWriterNoRecords() throws IOException {
-    Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
-
     List<Integer> equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId());
     Schema equalityDeleteRowSchema = table.schema().select("id");
     FileWriterFactory<T> writerFactory = newWriterFactory(table.schema(), equalityFieldIds, equalityDeleteRowSchema);
5 changes: 2 additions & 3 deletions
@@ -23,6 +23,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
+import java.util.UUID;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.configuration.CoreOptions;
@@ -50,7 +51,6 @@
 import org.apache.iceberg.types.Types;
 import org.junit.After;
 import org.junit.Assert;
-import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -322,7 +322,6 @@ public void testRewriteLargeTableHasResiduals() throws IOException {
    */
   @Test
   public void testRewriteAvoidRepeateCompress() throws IOException {
-    Assume.assumeFalse("ORC does not support getting length when file is opening", format.equals(FileFormat.ORC));
     List<Record> expected = Lists.newArrayList();
     Schema schema = icebergTableUnPartitioned.schema();
     GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(schema);
@@ -331,7 +330,7 @@ public void testRewriteAvoidRepeateCompress() throws IOException {
     try (FileAppender<Record> fileAppender = genericAppenderFactory.newAppender(Files.localOutput(file), format)) {
       long filesize = 20000;
       for (; fileAppender.length() < filesize; count++) {
-        Record record = SimpleDataUtil.createRecord(count, "iceberg");
+        Record record = SimpleDataUtil.createRecord(count, UUID.randomUUID().toString());
         fileAppender.add(record);
         expected.add(record);
       }
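The switch from a constant "iceberg" string to UUID.randomUUID().toString() presumably keeps the test data from being near-perfectly compressible: with a repeated constant, dictionary and run-length encodings let fileAppender.length() grow very slowly, so reaching the 20000-byte threshold would take far more rows than the test intends.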
4 changes: 0 additions & 4 deletions
@@ -233,10 +233,6 @@ public void testBoundedStreamCloseWithEmittingDataFiles() throws Exception {

   @Test
   public void testTableWithTargetFileSize() throws Exception {
-    // TODO: ORC file does not support target file size before closed.
-    if (format == FileFormat.ORC) {
-      return;
-    }
     // Adjust the target-file-size in table properties.
     table.updateProperties()
         .set(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, "4") // ~4 bytes; low enough to trigger
4 changes: 0 additions & 4 deletions
@@ -183,10 +183,6 @@ public void testCompleteFiles() throws IOException {

   @Test
   public void testRollingWithTargetFileSize() throws IOException {
-    // TODO ORC don't support target file size before closed.
-    if (format == FileFormat.ORC) {
-      return;
-    }
     try (TaskWriter<RowData> taskWriter = createTaskWriter(4)) {
       List<RowData> rows = Lists.newArrayListWithCapacity(8000);
       List<Record> records = Lists.newArrayListWithCapacity(8000);
75 changes: 75 additions & 0 deletions orc/src/main/java/org/apache/iceberg/orc/EstimateOrcAvgWidthVisitor.java
@@ -0,0 +1,75 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg.orc;

import java.util.List;
import java.util.Optional;
import org.apache.orc.TypeDescription;

public class EstimateOrcAvgWidthVisitor extends OrcSchemaVisitor<Integer> {

  @Override
  public Integer record(TypeDescription record, List<String> names, List<Integer> fieldWidths) {
    return fieldWidths.stream().reduce(Integer::sum).orElse(0);
  }

  @Override
  public Integer list(TypeDescription array, Integer elementWidth) {
    return elementWidth;
  }

  @Override
  public Integer map(TypeDescription map, Integer keyWidth, Integer valueWidth) {
    return keyWidth + valueWidth;
  }

  @Override
  public Integer primitive(TypeDescription primitive) {
    Optional<Integer> icebergIdOpt = ORCSchemaUtil.icebergID(primitive);

    if (!icebergIdOpt.isPresent()) {
      return 0;
    }

    switch (primitive.getCategory()) {
      case BYTE:
      case CHAR:
      case SHORT:
      case INT:
      case FLOAT:
      case BOOLEAN:
      case LONG:
      case DOUBLE:
      case DATE:
        return 8;
      case TIMESTAMP:
      case TIMESTAMP_INSTANT:
        return 12;
      case STRING:
      case VARCHAR:
      case BINARY:
        return 128;
      case DECIMAL:
        return primitive.getPrecision() + 2;
      default:
        throw new IllegalArgumentException("Can't handle " + primitive);
    }
  }
}
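A hedged usage sketch for the new visitor: record sums its children's widths, so visiting a schema yields an average byte width per row, which a writer can multiply by its buffered row count. The OrcSchemaVisitor.visit entry point and the ID-preserving ORCSchemaUtil.convert call below are assumptions about the surrounding iceberg-orc APIs, not lines from this commit:

    // Hypothetical usage: estimate the average encoded row width of a schema.
    // ORCSchemaUtil.convert is assumed to attach the Iceberg field IDs that
    // primitive() checks before counting a column.
    Schema icebergSchema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.required(2, "name", Types.StringType.get()),
        Types.NestedField.required(3, "ts", Types.TimestampType.withZone()));
    TypeDescription orcSchema = ORCSchemaUtil.convert(icebergSchema);
    int avgRowBytes = OrcSchemaVisitor.visit(orcSchema, new EstimateOrcAvgWidthVisitor());
    // With the widths above: 8 (long) + 128 (string) + 12 (timestamp) = 148 bytes.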