Skip to content

Commit

Permalink
[FLINK-4581] [table] Fix Table API throwing "No suitable driver found…
Browse files Browse the repository at this point in the history
… for jdbc:calcite"

This closes apache#2506
This closes apache#1491 // closing stale PR
This closes apache#997  // closing stale PR
  • Loading branch information
twalthr authored and fhueske committed Oct 22, 2016
1 parent 770f2f8 commit f0e451a
Showing 1 changed file with 11 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,25 @@

package org.apache.flink.api.table

import java.util.Collections

import org.apache.calcite.jdbc.CalciteSchema
import org.apache.calcite.plan.{Context, RelOptCluster, RelOptPlanner, RelOptSchema}
import org.apache.calcite.plan._
import org.apache.calcite.plan.volcano.VolcanoPlanner
import org.apache.calcite.prepare.CalciteCatalogReader
import org.apache.calcite.rex.{RexExecutorImpl, RexBuilder}
import org.apache.calcite.schema.{Schemas, SchemaPlus}
import org.apache.calcite.tools.Frameworks.PlannerAction
import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RelBuilder}
import org.apache.calcite.rex.RexBuilder
import org.apache.calcite.tools.{FrameworkConfig, RelBuilder}

/**
* Flink specific [[RelBuilder]] that changes the default type factory to a [[FlinkTypeFactory]].
*/
class FlinkRelBuilder(
context: Context,
cluster: RelOptCluster,
relOptCluster: RelOptCluster,
relOptSchema: RelOptSchema)
extends RelBuilder(
context,
cluster,
relOptCluster,
relOptSchema) {

def getPlanner: RelOptPlanner = cluster.getPlanner
Expand All @@ -49,37 +50,20 @@ class FlinkRelBuilder(
object FlinkRelBuilder {

def create(config: FrameworkConfig): FlinkRelBuilder = {
// prepare planner and collect context instances
val clusters: Array[RelOptCluster] = Array(null)
val relOptSchemas: Array[RelOptSchema] = Array(null)
val rootSchemas: Array[SchemaPlus] = Array(null)
Frameworks.withPlanner(new PlannerAction[Void] {
override def apply(
cluster: RelOptCluster,
relOptSchema: RelOptSchema,
rootSchema: SchemaPlus)
: Void = {
clusters(0) = cluster
relOptSchemas(0) = relOptSchema
rootSchemas(0) = rootSchema
null
}
})
val planner = clusters(0).getPlanner
planner.setExecutor(config.getExecutor)
val defaultRelOptSchema = relOptSchemas(0).asInstanceOf[CalciteCatalogReader]

// create Flink type factory
val typeSystem = config.getTypeSystem
val typeFactory = new FlinkTypeFactory(typeSystem)

// create context instances with Flink type factory
val planner = new VolcanoPlanner(Contexts.empty())
planner.addRelTraitDef(ConventionTraitDef.INSTANCE)
val cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory))
val calciteSchema = CalciteSchema.from(config.getDefaultSchema)
val relOptSchema = new CalciteCatalogReader(
calciteSchema,
config.getParserConfig.caseSensitive(),
defaultRelOptSchema.getSchemaName,
Collections.emptyList(),
typeFactory)

new FlinkRelBuilder(config.getContext, cluster, relOptSchema)
Expand Down

0 comments on commit f0e451a

Please sign in to comment.