diff --git a/data/src/test/java/org/apache/iceberg/data/orc/TestOrcRowIterator.java b/data/src/test/java/org/apache/iceberg/data/orc/TestOrcRowIterator.java
new file mode 100644
index 000000000000..cde3f1e6c140
--- /dev/null
+++ b/data/src/test/java/org/apache/iceberg/data/orc/TestOrcRowIterator.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.orc;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.DataTestHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.OrcConf;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestOrcRowIterator {
+
+  private static final Schema DATA_SCHEMA = new Schema(
+      required(100, "id", Types.LongType.get())
+  );
+
+  private static final int NUM_ROWS = 8000;
+  private static final List<Record> DATA_ROWS;
+
+  static {
+    DATA_ROWS = Lists.newArrayListWithCapacity(NUM_ROWS);
+    for (long i = 0; i < NUM_ROWS; i++) {
+      Record row = GenericRecord.create(DATA_SCHEMA);
+      row.set(0, i);
+      DATA_ROWS.add(row);
+    }
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  private File testFile;
+
+  @Before
+  public void writeFile() throws IOException {
+    testFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", testFile.delete());
+
+    try (FileAppender<Record> writer = ORC.write(Files.localOutput(testFile))
+        .createWriterFunc(GenericOrcWriter::buildWriter)
+        .schema(DATA_SCHEMA)
+        // write in such a way that the file contains 2 stripes, each with 4 row groups of 1000 rows
+        .config("iceberg.orc.vectorbatch.size", "1000")
+        .config(OrcConf.ROW_INDEX_STRIDE.getAttribute(), "1000")
+        .config(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), "4000")
+        .config(OrcConf.STRIPE_SIZE.getAttribute(), "1")
+        .build()) {
+      writer.addAll(DATA_ROWS);
+    }
+  }
+
+  @Test
+  public void testReadAllStripes() throws IOException {
+    // With the default batch size of 1024, this reads the following batches:
+    // Stripe 1: 1024, 1024, 1024, 928
+    // Stripe 2: 1024, 1024, 1024, 928
+    readAndValidate(Expressions.alwaysTrue(), DATA_ROWS);
+  }
+
+  @Test
+  public void testReadFilteredRowGroupInMiddle() throws IOException {
+    // The filter skips the 2nd row group [1000, 2000) in Stripe 1
+    // With the default batch size of 1024, this reads the following batches:
+    // Stripe 1: 1000, 1024, 976
+    readAndValidate(Expressions.in("id", 500, 2500, 3500),
+        Lists.newArrayList(Iterables.concat(DATA_ROWS.subList(0, 1000), DATA_ROWS.subList(2000, 4000))));
+  }
+
+  private void readAndValidate(Expression filter, List<Record> expected) throws IOException {
+    List<Record> rows;
+    try (CloseableIterable<Record> reader = ORC.read(Files.localInput(testFile))
+        .project(DATA_SCHEMA)
+        .filter(filter)
+        .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(DATA_SCHEMA, fileSchema))
+        .build()) {
+      rows = Lists.newArrayList(reader);
+    }
+
+    for (int i = 0; i < expected.size(); i += 1) {
+      DataTestHelpers.assertEquals(DATA_SCHEMA.asStruct(), expected.get(i), rows.get(i));
+    }
+  }
+}
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java b/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
index 2560f053d26d..d3eaa410fdbd 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
@@ -133,6 +133,7 @@ private static class OrcRowIterator<T> implements CloseableIterator<T> {
 
     private int nextRow;
     private VectorizedRowBatch current;
+    private int currentBatchSize;
 
     private final VectorizedRowBatchIterator batchIter;
     private final OrcRowReader<T> reader;
@@ -142,18 +143,20 @@ private static class OrcRowIterator<T> implements CloseableIterator<T> {
       this.reader = reader;
       current = null;
       nextRow = 0;
+      currentBatchSize = 0;
     }
 
     @Override
     public boolean hasNext() {
-      return (current != null && nextRow < current.size) || batchIter.hasNext();
+      return (current != null && nextRow < currentBatchSize) || batchIter.hasNext();
     }
 
     @Override
     public T next() {
-      if (current == null || nextRow >= current.size) {
+      if (current == null || nextRow >= currentBatchSize) {
         Pair<VectorizedRowBatch, Long> nextBatch = batchIter.next();
         current = nextBatch.first();
+        currentBatchSize = current.size;
         nextRow = 0;
         this.reader.setBatchContext(nextBatch.second());
       }
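
A note on why the OrcIterable change is needed: Iceberg's VectorizedRowBatchIterator hands out one reused VectorizedRowBatch, and its hasNext() can perform a look-ahead read that refills that shared object. Since `current` points at the same batch, `current.size` could flip to the size of the next batch before the current one was fully consumed, so hasNext()/next() compared `nextRow` against the wrong bound; snapshotting `currentBatchSize` when the batch is fetched pins the bound to the batch actually being iterated. The sketch below illustrates the reuse hazard with the plain ORC reader API; it is a minimal illustration under assumed inputs (the file path and batch sizes are hypothetical), not Iceberg code:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.orc.OrcFile;
    import org.apache.orc.Reader;
    import org.apache.orc.RecordReader;

    public class BatchReuseSketch {
      public static void main(String[] args) throws Exception {
        // hypothetical input file; any ORC file with more than one batch shows the behavior
        Reader reader = OrcFile.createReader(
            new Path("/tmp/data.orc"),
            OrcFile.readerOptions(new Configuration()));
        RecordReader rows = reader.rows();
        VectorizedRowBatch batch = reader.getSchema().createRowBatch();

        rows.nextBatch(batch);         // fills the batch, e.g. batch.size == 1024
        int sizeAtFetch = batch.size;  // snapshot, as the patched iterator now does

        rows.nextBatch(batch);         // the next read REUSES the same object,
                                       // so batch.size may now be e.g. 928

        // sizeAtFetch still describes the batch that was being consumed, while
        // batch.size reflects the refill; bounding iteration by batch.size at
        // this point is exactly the off-by-a-batch bug the patch fixes
        rows.close();
      }
    }

Caching the size as a plain int is cheaper than defensively copying the batch, and it keeps the look-ahead inside batchIter.hasNext() safe to call at any point in the iteration.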