Skip to content

Commit

Permalink
Merge pull request apache#5918: [BEAM-4562] [SQL] Fix INSERT VALUES i…
Browse files Browse the repository at this point in the history
…n JDBC
  • Loading branch information
xumingmin authored Jul 11, 2018
2 parents 109f03c + 1275032 commit 39db610
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public TableModify toModificationRel(
updateColumnList,
sourceExpressionList,
flattened,
beamTable);
beamTable,
pipelineOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@

import com.google.auto.service.AutoService;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Map;
import java.util.Properties;
import org.apache.beam.sdk.extensions.sql.impl.parser.impl.BeamSqlParserImpl;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelDataTypeSystem;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRuleSets;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.ReleaseInfo;
Expand All @@ -37,7 +39,12 @@
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.jdbc.Driver;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.rel.rules.CalcRemoveRule;
import org.apache.calcite.runtime.Hook;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.tools.RuleSet;

/**
* Calcite JDBC driver with Beam defaults.
Expand Down Expand Up @@ -73,6 +80,21 @@ public class JdbcDriver extends Driver {
} finally {
Thread.currentThread().setContextClassLoader(origLoader);
}
// inject beam rules into planner
Hook.PLANNER.addThread(
new Function<RelOptPlanner, Void>() {
@Override
public Void apply(RelOptPlanner planner) {
for (RuleSet ruleSet : BeamRuleSets.getRuleSets()) {
for (RelOptRule rule : ruleSet) {
planner.addRule(rule);
}
}
planner.removeRule(CalcRemoveRule.INSTANCE);
return null;
}
});
// register JDBC driver
INSTANCE.register();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static com.google.common.base.Preconditions.checkArgument;

import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.impl.rule.BeamIOSinkRule;
import org.apache.beam.sdk.transforms.PTransform;
Expand All @@ -41,6 +42,7 @@ public class BeamIOSinkRel extends TableModify
implements BeamRelNode, RelStructuredTypeFlattener.SelfFlatteningRel {

private final BeamSqlTable sqlTable;
private final Map<String, String> pipelineOptions;
private boolean isFlattening = false;

public BeamIOSinkRel(
Expand All @@ -52,7 +54,8 @@ public BeamIOSinkRel(
List<String> updateColumnList,
List<RexNode> sourceExpressionList,
boolean flattened,
BeamSqlTable sqlTable) {
BeamSqlTable sqlTable,
Map<String, String> pipelineOptions) {
super(
cluster,
cluster.traitSetOf(BeamLogicalConvention.INSTANCE),
Expand All @@ -64,6 +67,7 @@ public BeamIOSinkRel(
sourceExpressionList,
flattened);
this.sqlTable = sqlTable;
this.pipelineOptions = pipelineOptions;
}

@Override
Expand All @@ -79,7 +83,8 @@ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
getUpdateColumnList(),
getSourceExpressionList(),
flattened,
sqlTable);
sqlTable,
pipelineOptions);
newRel.traitSet = traitSet;
return newRel;
}
Expand Down Expand Up @@ -120,4 +125,9 @@ public PCollection<Row> expand(PCollectionList<Row> pinput) {
return input;
}
}

@Override
public Map<String, String> getPipelineOptions() {
return pipelineOptions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,12 @@
*/
package org.apache.beam.sdk.extensions.sql.impl.rel;

import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRuleSets;
import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.ConventionTraitDef;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelTrait;
import org.apache.calcite.plan.RelTraitDef;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.rules.CalcRemoveRule;
import org.apache.calcite.tools.RuleSet;

/** Convertion for Beam SQL. */
public enum BeamLogicalConvention implements Convention {
Expand All @@ -53,14 +49,7 @@ public boolean satisfies(RelTrait trait) {
}

@Override
public void register(RelOptPlanner planner) {
for (RuleSet ruleSet : BeamRuleSets.getRuleSets()) {
for (RelOptRule rule : ruleSet) {
planner.addRule(rule);
}
}
planner.removeRule(CalcRemoveRule.INSTANCE);
}
public void register(RelOptPlanner planner) {}

@Override
public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ public void testToEnumerable_count() {
null,
null,
false,
new FakeTable());
new FakeTable(),
null);

Enumerable<Object> enumerable = BeamEnumerableConverter.toEnumerable(options, node);
Enumerator<Object> enumerator = enumerable.enumerator();
Expand Down

0 comments on commit 39db610

Please sign in to comment.