Skip to content

Commit

Permalink
Add LongSelectiveStreamReader
Browse files Browse the repository at this point in the history
Benchmark results:

```
Benchmark                                       (typeSignature)  (withNulls)  Mode  Cnt  Score    Error  Units
BenchmarkSelectiveStreamReaders.read                    boolean         true  avgt   20  0.040 ±  0.001   s/op
BenchmarkSelectiveStreamReaders.read                    boolean        false  avgt   20  0.029 ±  0.001   s/op
BenchmarkSelectiveStreamReaders.read                    integer         true  avgt   20  0.083 ±  0.004   s/op
BenchmarkSelectiveStreamReaders.read                    integer        false  avgt   20  0.083 ±  0.003   s/op
BenchmarkSelectiveStreamReaders.read                     bigint         true  avgt   20  0.081 ±  0.003   s/op
BenchmarkSelectiveStreamReaders.read                     bigint        false  avgt   20  0.101 ±  0.003   s/op
BenchmarkSelectiveStreamReaders.read                   smallint         true  avgt   20  0.077 ±  0.002   s/op
BenchmarkSelectiveStreamReaders.read                   smallint        false  avgt   20  0.080 ±  0.003   s/op
BenchmarkSelectiveStreamReaders.read                       date         true  avgt   20  0.081 ±  0.002   s/op
BenchmarkSelectiveStreamReaders.read                       date        false  avgt   20  0.094 ±  0.004   s/op

BenchmarkSelectiveStreamReaders.readWithFilter          boolean         true  avgt   20  0.083 ± 0.005   s/op
BenchmarkSelectiveStreamReaders.readWithFilter          boolean        false  avgt   20  0.100 ± 0.002   s/op
BenchmarkSelectiveStreamReaders.readWithFilter          integer         true  avgt   20  0.108 ± 0.002   s/op
BenchmarkSelectiveStreamReaders.readWithFilter          integer        false  avgt   20  0.126 ± 0.003   s/op
BenchmarkSelectiveStreamReaders.readWithFilter           bigint         true  avgt   20  0.121 ± 0.008   s/op
BenchmarkSelectiveStreamReaders.readWithFilter           bigint        false  avgt   20  0.152 ± 0.006   s/op
BenchmarkSelectiveStreamReaders.readWithFilter         smallint         true  avgt   20  0.107 ± 0.012   s/op
BenchmarkSelectiveStreamReaders.readWithFilter         smallint        false  avgt   20  0.121 ± 0.002   s/op
BenchmarkSelectiveStreamReaders.readWithFilter             date         true  avgt   20  0.111 ± 0.007   s/op
BenchmarkSelectiveStreamReaders.readWithFilter             date        false  avgt   20  0.132 ± 0.005   s/op

BenchmarkSelectiveStreamReaders.readAllNull          boolean  avgt   20  0.003 ±  0.001   s/op
BenchmarkSelectiveStreamReaders.readAllNull          integer  avgt   20  0.003 ±  0.001   s/op
BenchmarkSelectiveStreamReaders.readAllNull           bigint  avgt   20  0.003 ±  0.001   s/op
BenchmarkSelectiveStreamReaders.readAllNull         smallint  avgt   20  0.003 ±  0.001   s/op
BenchmarkSelectiveStreamReaders.readAllNull             date  avgt   20  0.003 ±  0.001   s/op
```
  • Loading branch information
mbasmanova committed Jul 16, 2019
1 parent d326587 commit 903e19c
Show file tree
Hide file tree
Showing 10 changed files with 1,179 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class TestHivePushdownFilterQueries
extends AbstractTestQueryFramework
{
private static final String WITH_LINEITEM_EX = "WITH lineitem_ex AS (" +
"SELECT linenumber, " +
"SELECT linenumber, orderkey, " +
" CASE WHEN linenumber % 7 = 0 THEN null ELSE shipmode = 'AIR' END AS ship_by_air, " +
" CASE WHEN linenumber % 5 = 0 THEN null ELSE returnflag = 'R' END AS is_returned " +
"FROM lineitem)";
Expand All @@ -50,11 +50,12 @@ private static QueryRunner createQueryRunner()
Optional.empty());

queryRunner.execute(noPushdownFilter(queryRunner.getDefaultSession()),
"CREATE TABLE lineitem_ex (linenumber, ship_by_air, is_returned) AS " +
"SELECT linenumber, " +
" IF (linenumber % 7 = 0, null, shipmode = 'AIR') AS ship_by_air, " +
" IF (linenumber % 5 = 0, null, returnflag = 'R') AS is_returned " +
"FROM lineitem");
"CREATE TABLE lineitem_ex (linenumber, orderkey, ship_by_air, is_returned) AS " +
"SELECT linenumber, " +
" orderkey, " +
" IF (linenumber % 7 = 0, null, shipmode = 'AIR') AS ship_by_air, " +
" IF (linenumber % 5 = 0, null, returnflag = 'R') AS is_returned " +
"FROM lineitem");

return queryRunner;
}
Expand Down Expand Up @@ -83,6 +84,24 @@ public void testBooleans()
assertQueryUsingH2Cte("SELECT COUNT(*) FROM lineitem_ex WHERE ship_by_air is not null AND is_returned = true");
}

@Test
public void testNumeric()
{
assertQuery("SELECT orderkey, custkey, orderdate, shippriority FROM orders");

assertQuery("SELECT count(*) FROM orders WHERE orderkey BETWEEN 100 AND 1000 AND custkey BETWEEN 500 AND 800");

assertQuery("SELECT custkey, orderdate, shippriority FROM orders WHERE orderkey BETWEEN 100 AND 1000 AND custkey BETWEEN 500 AND 800");

assertQuery("SELECT orderkey, orderdate FROM orders WHERE orderdate BETWEEN date '1994-01-01' AND date '1997-03-30'");

assertQueryUsingH2Cte("SELECT count(*) FROM lineitem_ex WHERE orderkey < 30000 AND ship_by_air = true");

assertQueryUsingH2Cte("SELECT linenumber, orderkey, ship_by_air, is_returned FROM lineitem_ex WHERE orderkey < 30000 AND ship_by_air = true");

assertQueryUsingH2Cte("SELECT linenumber, ship_by_air, is_returned FROM lineitem_ex WHERE orderkey < 30000 AND ship_by_air = true");
}

private void assertQueryUsingH2Cte(String query)
{
assertQuery(query, WITH_LINEITEM_EX + query);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ private static SelectiveStreamReader[] createStreamReaders(
streamReaders[columnId] = createStreamReader(
streamDescriptor,
Optional.ofNullable(filters.get(columnId)),
outputColumns.contains(columnId),
Optional.ofNullable(includedColumns.get(columnId)),
Optional.ofNullable(requiredSubfields.get(columnId)).orElse(ImmutableList.of()),
hiveStorageTimeZone,
systemMemoryContext);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.orc.reader;

import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.IntArrayBlock;
import com.facebook.presto.spi.block.LongArrayBlock;
import com.facebook.presto.spi.block.ShortArrayBlock;
import com.facebook.presto.spi.type.Type;

import javax.annotation.Nullable;

import java.util.Optional;

import static com.facebook.presto.spi.type.BigintType.BIGINT;
import static com.facebook.presto.spi.type.DateType.DATE;
import static com.facebook.presto.spi.type.IntegerType.INTEGER;
import static com.facebook.presto.spi.type.SmallintType.SMALLINT;
import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;

abstract class AbstractLongSelectiveStreamReader
implements SelectiveStreamReader
{
protected final boolean outputRequired;
@Nullable
protected final Type outputType;

@Nullable
protected long[] values;
@Nullable
protected boolean[] nulls;
@Nullable
protected int[] outputPositions;
protected int outputPositionCount;

protected AbstractLongSelectiveStreamReader(Optional<Type> outputType)
{
this.outputRequired = outputType.isPresent();
this.outputType = requireNonNull(outputType, "outputType is null").orElse(null);
}

@Override
public int[] getReadPositions()
{
return outputPositions;
}

protected Block buildOutputBlock(int[] positions, int positionCount, boolean includeNulls)
{
if (outputType == BIGINT) {
return getLongArrayBlock(positions, positionCount, includeNulls);
}

if (outputType == INTEGER || outputType == DATE) {
return getIntArrayBlock(positions, positionCount, includeNulls);
}

if (outputType == SMALLINT) {
return getShortArrayBlock(positions, positionCount, includeNulls);
}

throw new UnsupportedOperationException("Unsupported type: " + outputType);
}

private Block getLongArrayBlock(int[] positions, int positionCount, boolean includeNulls)
{
if (positionCount == outputPositionCount) {
LongArrayBlock block;
if (includeNulls) {
block = new LongArrayBlock(positionCount, Optional.ofNullable(nulls), values);
nulls = null;
}
else {
block = new LongArrayBlock(positionCount, Optional.empty(), values);
}
values = null;
return block;
}

long[] valuesCopy = new long[positionCount];
boolean[] nullsCopy = null;

if (includeNulls) {
nullsCopy = new boolean[positionCount];
}

int positionIndex = 0;
int nextPosition = positions[positionIndex];
for (int i = 0; i < outputPositionCount; i++) {
if (outputPositions[i] < nextPosition) {
continue;
}

assert outputPositions[i] == nextPosition;

valuesCopy[positionIndex] = this.values[i];
if (includeNulls) {
nullsCopy[positionIndex] = this.nulls[i];
}

positionIndex++;
if (positionIndex >= positionCount) {
break;
}

nextPosition = positions[positionIndex];
}

return new LongArrayBlock(positionCount, Optional.ofNullable(nullsCopy), valuesCopy);
}

private Block getIntArrayBlock(int[] positions, int positionCount, boolean includeNulls)
{
int[] valuesCopy = new int[positionCount];
boolean[] nullsCopy = null;
if (includeNulls) {
nullsCopy = new boolean[positionCount];
}

int positionIndex = 0;
int nextPosition = positions[positionIndex];
for (int i = 0; i < outputPositionCount; i++) {
if (outputPositions[i] < nextPosition) {
continue;
}

assert outputPositions[i] == nextPosition;

valuesCopy[positionIndex] = toIntExact(this.values[i]);
if (includeNulls) {
nullsCopy[positionIndex] = this.nulls[i];
}

positionIndex++;
if (positionIndex >= positionCount) {
break;
}

nextPosition = positions[positionIndex];
}

return new IntArrayBlock(positionCount, Optional.ofNullable(nullsCopy), valuesCopy);
}

private Block getShortArrayBlock(int[] positions, int positionCount, boolean includeNulls)
{
short[] valuesCopy = new short[positionCount];
boolean[] nullsCopy = null;
if (includeNulls) {
nullsCopy = new boolean[positionCount];
}

int positionIndex = 0;
int nextPosition = positions[positionIndex];
for (int i = 0; i < outputPositionCount; i++) {
if (outputPositions[i] < nextPosition) {
continue;
}

assert outputPositions[i] == nextPosition;

valuesCopy[positionIndex] = (short) this.values[i];
if (includeNulls) {
nullsCopy[positionIndex] = this.nulls[i];
}

positionIndex++;
if (positionIndex >= positionCount) {
break;
}

nextPosition = positions[positionIndex];
}

return new ShortArrayBlock(positionCount, Optional.ofNullable(nullsCopy), valuesCopy);
}

protected void ensureValuesCapacity(int capacity, boolean recordNulls)
{
if (values == null || values.length < capacity) {
values = new long[capacity];
}

if (recordNulls) {
if (nulls == null || nulls.length < capacity) {
nulls = new boolean[capacity];
}
}
}

protected void ensureOutputPositionsCapacity(int capacity)
{
if (outputPositions == null || outputPositions.length < capacity) {
outputPositions = new int[capacity];
}
}
}
Loading

0 comments on commit 903e19c

Please sign in to comment.