Skip to content

Commit

Permalink
Add getType(field) to RecordCursor
Browse files Browse the repository at this point in the history
  • Loading branch information
dain authored and martint committed Oct 29, 2013
1 parent d47d563 commit f58b438
Show file tree
Hide file tree
Showing 12 changed files with 87 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,12 @@ public long getCompletedBytes()
return completedBytes;
}

@Override
public ColumnType getType(int field)
{
return types[field];
}

@Override
public boolean advanceNextPosition()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,12 @@ public long getCompletedBytes()
return completedBytes;
}

@Override
public ColumnType getType(int field)
{
return types[field];
}

@Override
public boolean advanceNextPosition()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,7 @@ public static class FilterAndProjectOperatorFactory
private final List<TupleInfo> tupleInfos;
private boolean closed;

public FilterAndProjectOperatorFactory(int operatorId, FilterFunction filterFunction, ProjectionFunction... projections)
{
this(operatorId, filterFunction, ImmutableList.copyOf(checkNotNull(projections, "projections is null")));
}

public FilterAndProjectOperatorFactory(int operatorId, FilterFunction filterFunction, List<ProjectionFunction> projections)
public FilterAndProjectOperatorFactory(int operatorId, FilterFunction filterFunction, Iterable<? extends ProjectionFunction> projections)
{
this.operatorId = operatorId;
this.filterFunction = checkNotNull(filterFunction, "filterFunction is null");
Expand Down Expand Up @@ -72,12 +67,7 @@ public void close()
private final FilterFunction filterFunction;
private final List<ProjectionFunction> projections;

public FilterAndProjectOperator(OperatorContext operatorContext, FilterFunction filterFunction, ProjectionFunction... projections)
{
this(operatorContext, filterFunction, ImmutableList.copyOf(checkNotNull(projections, "projections is null")));
}

public FilterAndProjectOperator(OperatorContext operatorContext, FilterFunction filterFunction, List<ProjectionFunction> projections)
public FilterAndProjectOperator(OperatorContext operatorContext, FilterFunction filterFunction, Iterable<? extends ProjectionFunction> projections)
{
super(operatorContext, toTupleInfos(checkNotNull(projections, "projections is null")));
this.filterFunction = checkNotNull(filterFunction, "filterFunction is null");
Expand Down Expand Up @@ -112,7 +102,7 @@ protected void filterAndProjectRowOriented(Block[] blocks, PageBuilder pageBuild
}
}

private static List<TupleInfo> toTupleInfos(List<ProjectionFunction> projections)
private static List<TupleInfo> toTupleInfos(Iterable<? extends ProjectionFunction> projections)
{
ImmutableList.Builder<TupleInfo> tupleInfos = ImmutableList.builder();
for (ProjectionFunction projection : projections) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,7 @@ public ScanFilterAndProjectOperatorFactory(
DataStreamProvider dataStreamProvider,
Iterable<ColumnHandle> columns,
FilterFunction filterFunction,
ProjectionFunction... projections)
{
this(operatorId, sourceId, dataStreamProvider, columns, filterFunction, ImmutableList.copyOf(checkNotNull(projections, "projections is null")));
}

public ScanFilterAndProjectOperatorFactory(
int operatorId,
PlanNodeId sourceId,
DataStreamProvider dataStreamProvider,
Iterable<ColumnHandle> columns,
FilterFunction filterFunction,
Iterable<ProjectionFunction> projections)
Iterable<? extends ProjectionFunction> projections)
{
this.operatorId = operatorId;
this.sourceId = checkNotNull(sourceId, "sourceId is null");
Expand Down Expand Up @@ -106,23 +95,7 @@ public ScanFilterAndProjectOperator(
DataStreamProvider dataStreamProvider,
Iterable<ColumnHandle> columns,
FilterFunction filterFunction,
ProjectionFunction... projections)
{
this(operatorContext,
sourceId,
dataStreamProvider,
columns,
filterFunction,
ImmutableList.copyOf(checkNotNull(projections, "projections is null")));
}

public ScanFilterAndProjectOperator(
OperatorContext operatorContext,
PlanNodeId sourceId,
DataStreamProvider dataStreamProvider,
Iterable<ColumnHandle> columns,
FilterFunction filterFunction,
Iterable<ProjectionFunction> projections)
Iterable<? extends ProjectionFunction> projections)
{
super(operatorContext,
sourceId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ public long getCompletedBytes()
return delegate.getCompletedBytes();
}

@Override
public ColumnType getType(int field)
{
return delegate.getType(field);
}

@Override
public boolean advanceNextPosition()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public class TupleInputResolver
private TupleReadable[] inputs;

private RecordCursor cursor;
private Map<Input, Type> inputTypes;

public void setInputs(TupleReadable[] inputs)
{
Expand All @@ -43,7 +42,6 @@ public void setCursor(RecordCursor cursor, Map<Input, Type> tupleInfo)
{
checkState(inputs == null, "%s already has inputs set", getClass().getName());
this.cursor = checkNotNull(cursor, "cursor is null");
this.inputTypes = checkNotNull(tupleInfo, "tupleInfo is null");
}

@Override
Expand Down Expand Up @@ -78,14 +76,14 @@ else if (cursor != null) {
return null;
}

switch (inputTypes.get(input)) {
switch (cursor.getType(input.getChannel())) {
case BOOLEAN:
return cursor.getBoolean(channel);
case BIGINT:
case LONG:
return cursor.getLong(channel);
case DOUBLE:
return cursor.getDouble(channel);
case VARCHAR:
case STRING:
return Slices.wrappedBuffer(cursor.getString(channel));
default:
throw new UnsupportedOperationException("not yet implemented");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ public long getCompletedBytes()
return 0;
}

@Override
public ColumnType getType(int field)
{
return columns.get(field).getType();
}

@Override
public boolean advanceNextPosition()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ protected List<? extends OperatorFactory> createOperatorFactories()
{
BlockIterable blockIterable = getBlockIterable("orders", "totalprice", BlocksFileEncoding.RAW);
AlignmentOperatorFactory alignmentOperator = new AlignmentOperatorFactory(0, blockIterable);
FilterAndProjectOperatorFactory filterAndProjectOperator = new FilterAndProjectOperatorFactory(1, new DoubleFilter(50000.00), singleColumn(Type.DOUBLE, 0, 0));
FilterAndProjectOperatorFactory filterAndProjectOperator = new FilterAndProjectOperatorFactory(
1,
new DoubleFilter(50000.00),
ImmutableList.of(singleColumn(Type.DOUBLE, 0, 0)));

return ImmutableList.of(alignmentOperator, filterAndProjectOperator);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.facebook.presto.tuple.TupleInfo;
import com.facebook.presto.tuple.TupleReadable;
import com.facebook.presto.util.MaterializedResult;
import com.google.common.collect.ImmutableList;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand All @@ -28,6 +29,7 @@
import java.util.concurrent.ExecutorService;

import static com.facebook.presto.operator.OperatorAssertion.assertOperatorEquals;
import static com.facebook.presto.operator.ProjectionFunctions.concat;
import static com.facebook.presto.operator.ProjectionFunctions.singleColumn;
import static com.facebook.presto.operator.RowPagesBuilder.rowPagesBuilder;
import static com.facebook.presto.tuple.TupleInfo.SINGLE_LONG;
Expand Down Expand Up @@ -67,22 +69,25 @@ public void testAlignment()
.addSequencePage(100, 0, 0)
.build();

OperatorFactory operatorFactory = new FilterAndProjectOperatorFactory(0, new FilterFunction()
{
@Override
public boolean filter(TupleReadable... cursors)
{
long value = cursors[1].getLong(0);
return 10 <= value && value < 20;
}
OperatorFactory operatorFactory = new FilterAndProjectOperatorFactory(
0,
new FilterFunction()
{
@Override
public boolean filter(TupleReadable... cursors)
{
long value = cursors[1].getLong(0);
return 10 <= value && value < 20;
}

@Override
public boolean filter(RecordCursor cursor)
{
long value = cursor.getLong(0);
return 10 <= value && value < 20;
}
}, ProjectionFunctions.concat(singleColumn(VARIABLE_BINARY, 0, 0), singleColumn(FIXED_INT_64, 1, 0)));
@Override
public boolean filter(RecordCursor cursor)
{
long value = cursor.getLong(0);
return 10 <= value && value < 20;
}
},
ImmutableList.of(concat(singleColumn(VARIABLE_BINARY, 0, 0), singleColumn(FIXED_INT_64, 1, 0))));

Operator operator = operatorFactory.createOperator(driverContext);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
import com.facebook.presto.spi.RecordCursor;
import com.facebook.presto.spi.RecordSet;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;

import java.util.List;

import static com.google.common.base.Preconditions.checkNotNull;

public class InfiniteRecordSet
implements RecordSet
{
Expand All @@ -42,18 +44,19 @@ public List<ColumnType> getColumnTypes()
@Override
public RecordCursor cursor()
{
return new InMemoryRecordCursor(record);
return new InMemoryRecordCursor(types, record);
}

private static class InMemoryRecordCursor
implements RecordCursor
{
private final List<ColumnType> types;
private final List<?> record;

private InMemoryRecordCursor(List<?> record)
private InMemoryRecordCursor(List<ColumnType> types, List<?> record)
{
Preconditions.checkNotNull(record, "record is null");
this.record = record;
this.types = checkNotNull(ImmutableList.copyOf(types), "types is null");
this.record = checkNotNull(ImmutableList.copyOf(record), "record is null");
}

@Override
Expand All @@ -74,32 +77,38 @@ public boolean advanceNextPosition()
return true;
}

@Override
public ColumnType getType(int field)
{
return types.get(field);
}

@Override
public boolean getBoolean(int field)
{
Preconditions.checkNotNull(record.get(field), "value is null");
checkNotNull(record.get(field), "value is null");
return (Boolean) record.get(field);
}

@Override
public long getLong(int field)
{
Preconditions.checkNotNull(record.get(field), "value is null");
checkNotNull(record.get(field), "value is null");
return (Long) record.get(field);
}

@Override
public double getDouble(int field)
{
Preconditions.checkNotNull(record.get(field), "value is null");
checkNotNull(record.get(field), "value is null");
return (Double) record.get(field);
}

@Override
public byte[] getString(int field)
{
Object value = record.get(field);
Preconditions.checkNotNull(value, "value is null");
checkNotNull(value, "value is null");
if (value instanceof byte[]) {
return (byte[]) value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,22 @@ public List<ColumnType> getColumnTypes()
@Override
public RecordCursor cursor()
{
return new InMemoryRecordCursor(records.iterator(), totalBytes);
return new InMemoryRecordCursor(types, records.iterator(), totalBytes);
}

private static class InMemoryRecordCursor
implements RecordCursor
{
private final List<ColumnType> types;
private final Iterator<? extends List<?>> records;
private final long totalBytes;
private List<?> record;
private long completedBytes;

private InMemoryRecordCursor(Iterator<? extends List<?>> records, long totalBytes)
private InMemoryRecordCursor(List<ColumnType> types, Iterator<? extends List<?>> records, long totalBytes)
{
this.types = types;

this.records = records;

this.totalBytes = totalBytes;
Expand All @@ -79,6 +82,12 @@ public long getCompletedBytes()
return completedBytes;
}

@Override
public ColumnType getType(int field)
{
return types.get(field);
}

@Override
public boolean advanceNextPosition()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ public interface RecordCursor

long getCompletedBytes();

ColumnType getType(int field);

boolean advanceNextPosition();

boolean getBoolean(int field);
Expand Down

0 comments on commit f58b438

Please sign in to comment.