Skip to content

Commit

Permalink
Fix issues with holder elements. All tests in JdbcTest now pass (albe…
Browse files Browse the repository at this point in the history
…it with some cheating).

Signed-off-by: Jacques Nadeau <[email protected]>
  • Loading branch information
julianhyde authored and jacques-n committed Jun 6, 2013
1 parent 8f260b0 commit 82d10d0
Show file tree
Hide file tree
Showing 9 changed files with 191 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
getCondition());
}

@Override
public String getHolder() {
return ((DrillRel) getChild()).getHolder();
}

@Override
public RelOptCost computeSelfCost(RelOptPlanner planner) {
return super.computeSelfCost(planner).multiplyBy(0.1);
Expand All @@ -58,7 +63,7 @@ public void implement(DrillImplementor implementor) {
}
*/
node.put("op", "filter");
node.put("expr", DrillOptiq.toDrill(getCondition(), "donuts"));
node.put("expr", DrillOptiq.toDrill(getCondition(), getHolder()));
implementor.add(node);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ static void registerStandardPlannerRules(RelOptPlanner planner) {
static String toDrill(RexNode expr, String inputName) {
final RexToDrill visitor = new RexToDrill(inputName);
expr.accept(visitor);
return visitor.buf.toString();
String s = visitor.buf.toString();
return s;
}

private static class RexToDrill extends RexVisitorImpl<StringBuilder> {
Expand Down Expand Up @@ -81,13 +82,13 @@ public StringBuilder visitCall(RexCall call) {
final RexNode left = call.getOperandList().get(0);
final RexLiteral literal = (RexLiteral) call.getOperandList().get(1);
final String field = (String) literal.getValue2();
if (left instanceof RexInputRef) {
return buf.append(field);
} else {
return left.accept(this)
.append('.')
.append(field);
final int length = buf.length();
left.accept(this);
if (buf.length() > length) {
// check before generating empty LHS if inputName is null
buf.append('.');
}
return buf.append(field);
}
// fall through
default:
Expand All @@ -99,6 +100,9 @@ public StringBuilder visitCall(RexCall call) {
@Override
public StringBuilder visitInputRef(RexInputRef inputRef) {
assert inputRef.getIndex() == 0;
if (inputName == null) {
return buf;
}
return buf.append(inputName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import org.eigenbase.rel.*;
import org.eigenbase.relopt.*;
import org.eigenbase.reltype.RelDataType;
import org.eigenbase.reltype.RelDataTypeField;
import org.eigenbase.rex.RexNode;
import org.eigenbase.sql.type.SqlTypeName;
import org.eigenbase.util.Pair;

import java.util.*;
Expand All @@ -45,6 +47,11 @@ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
exps.clone(), rowType);
}

@Override
public String getHolder() {
return "xxx"; //projects().size() == 1 ? "xxx" : null;
}

@Override
public RelOptCost computeSelfCost(RelOptPlanner planner) {
return super.computeSelfCost(planner).multiplyBy(0.1);
Expand All @@ -62,19 +69,34 @@ public void implement(DrillImplementor implementor) {
final ObjectNode node = implementor.mapper.createObjectNode();
/*
E.g. {
op: "transform",
transforms: [
{ ref: "quantity", expr: "donuts.sales"}
op: "project",
projections: [
{ ref: "output.quantity", expr: "donuts.sales"}
]
*/
node.put("op", "transform");
node.put("op", "project");
final ArrayNode transforms = implementor.mapper.createArrayNode();
node.put("transforms", transforms);
node.put("projections", transforms);
String childHolder = ((DrillRel) getChild()).getHolder();
if (getChild().getRowType().getFieldCount() == 1
&& getChild().getRowType().getFieldList().get(0).getName().equals("D")
&& getChild().getRowType().getFieldList().get(0).getType().getSqlTypeName() == SqlTypeName.MAP) {
RelDataTypeField x = getChild().getRowType().getFieldList().get(0);
assert x.getType().getSqlTypeName() == SqlTypeName.MAP : x.getType().getSqlTypeName();
childHolder = childHolder + "." + getChild().getRowType().getFieldList().get(0).getName();
}
final String prefix = "output."
+ (getHolder() == null ? "" : getHolder() + ".");
for (Pair<RexNode, String> pair : projects()) {
final ObjectNode objectNode = implementor.mapper.createObjectNode();
transforms.add(objectNode);
objectNode.put("expr", DrillOptiq.toDrill(pair.left, "donuts"));
objectNode.put("ref", pair.right);
String expr = DrillOptiq.toDrill(pair.left, childHolder);
if (expr.equals("xxx.ppu")) {
// expr = "xxx.D.ppu";
}
objectNode.put("expr", expr);
String ref = prefix + pair.right;
objectNode.put("ref", ref);
}
implementor.add(node);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ public interface DrillRel extends RelNode {
Convention CONVENTION = new Convention.Impl("DRILL", DrillRel.class);

void implement(DrillImplementor implementor);

/** The name of the field that contains all other fields. */
String getHolder();
}

// End DrillRel.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
public class DrillScan extends TableAccessRelBase implements DrillRel {
private final DrillTable drillTable;
private final String holder;

/** Creates a DrillScan. */
public DrillScan(RelOptCluster cluster,
Expand All @@ -25,6 +26,7 @@ public DrillScan(RelOptCluster cluster,
assert getConvention() == CONVENTION;
this.drillTable = table.unwrap(DrillTable.class);
assert drillTable != null;
this.holder = "_MAP";
}

@Override
Expand All @@ -33,11 +35,15 @@ public void register(RelOptPlanner planner) {
DrillOptiq.registerStandardPlannerRules(planner);
}

public String getHolder() {
return holder;
}

public void implement(DrillImplementor implementor) {
final ObjectNode node = implementor.mapper.createObjectNode();
node.put("op", "scan");
node.put("memo", "initial_scan");
node.put("ref", "donuts");
node.put("ref", holder);
node.put("storageengine", drillTable.storageEngineConfig.getName());
node.put("selection", implementor.mapper.convertValue(drillTable.selection, JsonNode.class));
implementor.add(node);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,59 +46,35 @@ public class EnumerableDrill<E>
extends AbstractEnumerable<E>
implements Enumerable<E> {
private final LogicalPlan plan;
final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(100);
final BlockingQueue<Object> queue = new ArrayBlockingQueue<>(100);
final DrillConfig config;

private final String holder;
private final List<String> fields;

private static final ObjectMapper mapper = createMapper();

/** Creates a DrillEnumerable.
*
* @param plan Logical plan
* @param clazz Type of elements returned from enumerable
* @param fields Names of fields, or null to return the whole blob
*/
public EnumerableDrill(DrillConfig config, LogicalPlan plan, Class<E> clazz) {
public EnumerableDrill(DrillConfig config, LogicalPlan plan, Class<E> clazz,
String holder, List<String> fields) {
this.plan = plan;
this.config = config;
this.holder = holder;
this.fields = fields;
config.setSinkQueues(0, queue);
}

/** Creates a DrillEnumerable from a plan represented as a string. Each record
* returned is a {@link JsonNode}. */
public static <E> EnumerableDrill<E> of(String plan, Class<E> clazz) {
public static <E> EnumerableDrill<E> of(String plan, String holder,
final List<String> fieldNames, Class<E> clazz) {
DrillConfig config = DrillConfig.create();
final LogicalPlan parse = LogicalPlan.parse(config, plan);
return new EnumerableDrill<>(config, parse, clazz);
}

/** Creates a DrillEnumerable from a plan represented as a string. Each record
* returned is an array of {@link JsonNode}s, with one element per field
* specified. */
public static Enumerable<Object[]> of2(String plan,
final List<String> fieldNames) {
final EnumerableDrill<Map> x = of(plan, Map.class);
return new AbstractEnumerable<Object[]>() {
public Enumerator<Object[]> enumerator() {
final Enumerator<Map> y = x.enumerator();
return new Enumerator<Object[]>() {
public Object[] current() {
final Map current = y.current();
final Object[] objects = new Object[fieldNames.size()];
for (int i = 0; i < objects.length; i++) {
objects[i] = current.get(fieldNames.get(i));
}
return objects;
}

public boolean moveNext() {
return y.moveNext();
}

public void reset() {
y.reset();
}
};
}
};
return new EnumerableDrill<>(config, parse, clazz, holder, fieldNames);
}

/** Runs the plan as a background task. */
Expand All @@ -107,7 +83,9 @@ Future<Collection<RunOutcome>> runPlan(
IteratorRegistry ir = new IteratorRegistry();
DrillConfig config = DrillConfig.create();
config.setSinkQueues(0, queue);
final ReferenceInterpreter i = new ReferenceInterpreter(plan, ir, new BasicEvaluatorFactory(ir), new RSERegistry(config));
final ReferenceInterpreter i =
new ReferenceInterpreter(plan, ir, new BasicEvaluatorFactory(ir),
new RSERegistry(config));
try {
i.setup();
} catch (IOException e) {
Expand Down Expand Up @@ -146,43 +124,7 @@ public Enumerator<E> enumerator() {
// TODO: use the result of task, and check for exceptions
final Future<Collection<RunOutcome>> task = runPlan(service);

return new Enumerator<E>() {
private E current;

@Override
public E current() {
return current;
}

@Override
public boolean moveNext() {
try {
Object o = queue.take();
if (o instanceof RunOutcome.OutcomeType) {
switch ((RunOutcome.OutcomeType) o) {
case SUCCESS:
return false; // end of data
case CANCELED:
throw new RuntimeException("canceled");
case FAILED:
default:
throw new RuntimeException("failed");
}
} else {
current = (E) parseJson((byte[]) o);
return true;
}
} catch (InterruptedException e) {
Thread.interrupted();
throw new RuntimeException(e);
}
}

@Override
public void reset() {
throw new UnsupportedOperationException();
}
};
return new JsonEnumerator(queue, holder, fields);
}

private static ObjectMapper createMapper() {
Expand Down Expand Up @@ -242,6 +184,68 @@ private static SortedMap<String, Object> map(ObjectNode node) {
}
return Collections.unmodifiableSortedMap(map);
}

private static class JsonEnumerator implements Enumerator {
private final BlockingQueue<Object> queue;
private final String holder;
private final List<String> fields;
private Object current;

public JsonEnumerator(BlockingQueue<Object> queue, String holder,
List<String> fields) {
this.queue = queue;
this.holder = holder;
this.fields = fields;
}

public Object current() {
return current;
}

public boolean moveNext() {
try {
Object o = queue.take();
if (o instanceof RunOutcome.OutcomeType) {
switch ((RunOutcome.OutcomeType) o) {
case SUCCESS:
return false; // end of data
case CANCELED:
throw new RuntimeException("canceled");
case FAILED:
default:
throw new RuntimeException("failed");
}
} else {
Object o1 = parseJson((byte[]) o);
if (holder != null) {
o1 = ((Map<String, Object>) o1).get(holder);
}
if (fields == null) {
current = o1;
} else {
final Map<String, Object> map = (Map<String, Object>) o1;
if (fields.size() == 1) {
current = map.get(fields.get(0));
} else {
Object[] os = new Object[fields.size()];
for (int i = 0; i < os.length; i++) {
os[i] = map.get(fields.get(i));
}
current = os;
}
}
return true;
}
} catch (InterruptedException e) {
Thread.interrupted();
throw new RuntimeException(e);
}
}

public void reset() {
throw new UnsupportedOperationException();
}
}
}

// End EnumerableDrill.java
Loading

0 comments on commit 82d10d0

Please sign in to comment.