[FLINK-26843][table-planner] Cleanup TableConfig & ReadableConfig
Clean up the usage and naming of variables that hold `TableConfig` and
`ReadableConfig` instances, so that it is clear what is stored in
variables and what is passed as method arguments.

This closes apache#19205.
matriv authored and twalthr committed Mar 24, 2022
1 parent 38217d6 commit 3ddd74e
Showing 16 changed files with 70 additions and 70 deletions.
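
The rename also documents a type relationship the diffs below rely on: a `TableConfig` is passed directly to methods that declare `ReadableConfig` parameters (e.g. `FlinkBatchProgram.buildProgram`), so read-only callees keep the narrow interface while the variable name says what the object actually is. A minimal sketch of the convention — the helper method here is hypothetical; only the config and option classes come from the diff:

```scala
import org.apache.flink.configuration.ReadableConfig
import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.api.config.OptimizerConfigOptions

object NamingConventionSketch {

  // A read-only callee declares the narrow ReadableConfig interface,
  // but the parameter is still named `tableConfig` because that is
  // what callers actually pass in.
  def isJoinReorderEnabled(tableConfig: ReadableConfig): Boolean =
    tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED)

  def main(args: Array[String]): Unit = {
    // TableConfig implements ReadableConfig, so it is accepted as-is.
    val tableConfig = TableConfig.getDefault
    println(isJoinReorderEnabled(tableConfig)) // false by default
  }
}
```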
@@ -78,14 +78,14 @@ class StreamTableEnvironmentImplTest {
   private def getStreamTableEnvironment(
       env: StreamExecutionEnvironment,
       elements: DataStream[Int]) = {
-    val config = TableConfig.getDefault
+    val tableConfig = TableConfig.getDefault
     val catalogManager = CatalogManagerMocks.createEmptyCatalogManager()
     val moduleManager = new ModuleManager
     new StreamTableEnvironmentImpl(
       catalogManager,
       moduleManager,
-      new FunctionCatalog(config, catalogManager, moduleManager),
-      config,
+      new FunctionCatalog(tableConfig, catalogManager, moduleManager),
+      tableConfig,
       env,
       new TestPlanner(elements.javaStream.getTransformation),
       new ExecutorMock,
@@ -78,9 +78,9 @@ class BatchCommonSubGraphBasedOptimizer(planner: BatchPlanner)
    * @return The optimized [[RelNode]] tree
    */
   private def optimizeTree(relNode: RelNode): RelNode = {
-    val config = planner.getTableConfig
-    val programs = TableConfigUtils.getCalciteConfig(config).getBatchProgram
-      .getOrElse(FlinkBatchProgram.buildProgram(config))
+    val tableConfig = planner.getTableConfig
+    val programs = TableConfigUtils.getCalciteConfig(tableConfig).getBatchProgram
+      .getOrElse(FlinkBatchProgram.buildProgram(tableConfig))
     Preconditions.checkNotNull(programs)
 
     val context = unwrapContext(relNode)
@@ -89,7 +89,7 @@ class BatchCommonSubGraphBasedOptimizer(planner: BatchPlanner)
 
   override def isBatchMode: Boolean = true
 
-  override def getTableConfig: TableConfig = config
+  override def getTableConfig: TableConfig = tableConfig
 
   override def getFunctionCatalog: FunctionCatalog = planner.functionCatalog
 
@@ -258,12 +258,12 @@ class RelNodeWrapper(relNode: RelNode) {
 /**
  * Builds [[RelNodeBlock]] plan
  */
-class RelNodeBlockPlanBuilder private(config: ReadableConfig) {
+class RelNodeBlockPlanBuilder private(tableConfig: ReadableConfig) {
 
   private val node2Wrapper = new util.IdentityHashMap[RelNode, RelNodeWrapper]()
   private val node2Block = new util.IdentityHashMap[RelNode, RelNodeBlock]()
 
-  private val isUnionAllAsBreakPointEnabled = config
+  private val isUnionAllAsBreakPointEnabled = tableConfig
     .get(RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED)
 
   /**
@@ -415,7 +415,7 @@ object RelNodeBlockPlanBuilder {
    */
   def buildRelNodeBlockPlan(
       sinkNodes: Seq[RelNode],
-      config: ReadableConfig): Seq[RelNodeBlock] = {
+      tableConfig: ReadableConfig): Seq[RelNodeBlock] = {
     require(sinkNodes.nonEmpty)
 
     // expand QueryOperationCatalogViewTable in TableScan
@@ -426,8 +426,8 @@
       Seq(new RelNodeBlock(convertedRelNodes.head))
     } else {
       // merge multiple RelNode trees to RelNode dag
-      val relNodeDag = reuseRelNodes(convertedRelNodes, config)
-      val builder = new RelNodeBlockPlanBuilder(config)
+      val relNodeDag = reuseRelNodes(convertedRelNodes, tableConfig)
+      val builder = new RelNodeBlockPlanBuilder(tableConfig)
       builder.buildRelNodeBlockPlan(relNodeDag)
     }
   }
@@ -438,8 +438,8 @@
    * @param relNodes RelNode trees
    * @return RelNode dag which reuse common subPlan in each tree
    */
-  private def reuseRelNodes(relNodes: Seq[RelNode], config: ReadableConfig): Seq[RelNode] = {
-    val findOpBlockWithDigest = config
+  private def reuseRelNodes(relNodes: Seq[RelNode], tableConfig: ReadableConfig): Seq[RelNode] = {
+    val findOpBlockWithDigest = tableConfig
       .get(RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED)
     if (!findOpBlockWithDigest) {
       return relNodes
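`RelNodeBlockPlanBuilder` accepts the planner's configuration as `ReadableConfig`, which statically guarantees that the block planner only reads options. A self-contained sketch of that pattern — the option and the consumer class are hypothetical; only `ReadableConfig`, `Configuration`, and `ConfigOptions` are real Flink classes:

```scala
import org.apache.flink.configuration.{ConfigOption, ConfigOptions, Configuration, ReadableConfig}

object ReadOnlyConfigSketch {

  // Hypothetical option, declared the same way Flink declares
  // TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED and friends.
  val SAMPLE_BREAKPOINT_ENABLED: ConfigOption[java.lang.Boolean] =
    ConfigOptions.key("example.breakpoint-enabled")
      .booleanType()
      .defaultValue(java.lang.Boolean.FALSE)

  // Hypothetical consumer: with ReadableConfig only .get(...) is in
  // scope, so planning code cannot mutate the session configuration.
  class PlanBuilder(tableConfig: ReadableConfig) {
    val breakpointEnabled: Boolean = tableConfig.get(SAMPLE_BREAKPOINT_ENABLED)
  }

  def main(args: Array[String]): Unit = {
    val conf = new Configuration() // Configuration implements ReadableConfig
    conf.set(SAMPLE_BREAKPOINT_ENABLED, java.lang.Boolean.TRUE)
    println(new PlanBuilder(conf).breakpointEnabled) // true
  }
}
```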
@@ -154,10 +154,10 @@ class StreamCommonSubGraphBasedOptimizer(planner: StreamPlanner)
       miniBatchInterval: MiniBatchInterval,
       isSinkBlock: Boolean): RelNode = {
 
-    val config = planner.getTableConfig
-    val calciteConfig = TableConfigUtils.getCalciteConfig(config)
+    val tableConfig = planner.getTableConfig
+    val calciteConfig = TableConfigUtils.getCalciteConfig(tableConfig)
     val programs = calciteConfig.getStreamProgram
-      .getOrElse(FlinkStreamProgram.buildProgram(config))
+      .getOrElse(FlinkStreamProgram.buildProgram(tableConfig))
     Preconditions.checkNotNull(programs)
 
     val context = unwrapContext(relNode)
@@ -166,7 +166,7 @@ class StreamCommonSubGraphBasedOptimizer(planner: StreamPlanner)
 
   override def isBatchMode: Boolean = false
 
-  override def getTableConfig: TableConfig = config
+  override def getTableConfig: TableConfig = tableConfig
 
   override def getFunctionCatalog: FunctionCatalog = planner.functionCatalog
 
@@ -44,7 +44,7 @@ object FlinkBatchProgram {
   val PHYSICAL = "physical"
   val PHYSICAL_REWRITE = "physical_rewrite"
 
-  def buildProgram(config: ReadableConfig): FlinkChainedProgram[BatchOptimizeContext] = {
+  def buildProgram(tableConfig: ReadableConfig): FlinkChainedProgram[BatchOptimizeContext] = {
     val chainedProgram = new FlinkChainedProgram[BatchOptimizeContext]()
 
     chainedProgram.addLast(
@@ -151,7 +151,7 @@ object FlinkBatchProgram {
         .build())
 
     // join reorder
-    if (config.get(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED)) {
+    if (tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED)) {
       chainedProgram.addLast(
         JOIN_REORDER,
         FlinkGroupProgramBuilder.newBuilder[BatchOptimizeContext]
@@ -43,7 +43,7 @@ object FlinkStreamProgram {
   val PHYSICAL = "physical"
   val PHYSICAL_REWRITE = "physical_rewrite"
 
-  def buildProgram(config: ReadableConfig): FlinkChainedProgram[StreamOptimizeContext] = {
+  def buildProgram(tableConfig: ReadableConfig): FlinkChainedProgram[StreamOptimizeContext] = {
     val chainedProgram = new FlinkChainedProgram[StreamOptimizeContext]()
 
     // rewrite sub-queries to joins
@@ -161,7 +161,7 @@ object FlinkStreamProgram {
         .build())
 
     // join reorder
-    if (config.get(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED)) {
+    if (tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED)) {
      chainedProgram.addLast(
        JOIN_REORDER,
        FlinkGroupProgramBuilder.newBuilder[StreamOptimizeContext]
@@ -242,12 +242,12 @@ class FlinkLogicalRankRuleForConstantRange extends FlinkLogicalRankRuleBase {
     val predicate = calc.getProgram.expandLocalRef(condition)
     // the rank function is the last field of FlinkLogicalOverAggregate
     val rankFieldIndex = window.getRowType.getFieldCount - 1
-    val config = unwrapTableConfig(calc)
+    val tableConfig = unwrapTableConfig(calc)
     val (rankRange, remainingPreds) = RankUtil.extractRankRange(
       predicate,
       rankFieldIndex,
       calc.getCluster.getRexBuilder,
-      config)
+      tableConfig)
 
     // remaining predicate must not access rank field attributes
     val remainingPredsAccessRank = remainingPreds.isDefined &&
@@ -72,8 +72,8 @@ class StreamPhysicalGroupWindowAggregateRule
     val providedTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
     val newInput: RelNode = RelOptRule.convert(input, requiredTraitSet)
 
-    val config = ShortcutUtils.unwrapTableConfig(rel)
-    val emitStrategy = WindowEmitStrategy(config, agg.getWindow)
+    val tableConfig = ShortcutUtils.unwrapTableConfig(rel)
+    val emitStrategy = WindowEmitStrategy(tableConfig, agg.getWindow)
 
     new StreamPhysicalGroupWindowAggregate(
       cluster,
@@ -70,8 +70,8 @@ class StreamPhysicalGroupWindowTableAggregateRule
     val providedTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
     val newInput: RelNode = RelOptRule.convert(input, requiredTraitSet)
 
-    val config = ShortcutUtils.unwrapTableConfig(rel)
-    val emitStrategy = WindowEmitStrategy(config, agg.getWindow)
+    val tableConfig = ShortcutUtils.unwrapTableConfig(rel)
+    val emitStrategy = WindowEmitStrategy(tableConfig, agg.getWindow)
 
     new StreamPhysicalGroupWindowTableAggregate(
       cluster,
@@ -63,7 +63,7 @@ class StreamPhysicalTableSourceScanRule
   def convert(rel: RelNode): RelNode = {
     val scan = rel.asInstanceOf[FlinkLogicalTableSourceScan]
     val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
-    val config = ShortcutUtils.unwrapContext(rel.getCluster).getTableConfig
+    val tableConfig = ShortcutUtils.unwrapContext(rel.getCluster).getTableConfig
     val table = scan.getTable.asInstanceOf[TableSourceTable]
 
     val newScan = new StreamPhysicalTableSourceScan(
@@ -74,7 +74,7 @@
     val resolvedSchema = table.contextResolvedTable.getResolvedSchema
 
     if (isUpsertSource(resolvedSchema, table.tableSource) ||
-      isSourceChangeEventsDuplicate(resolvedSchema, table.tableSource, config)) {
+      isSourceChangeEventsDuplicate(resolvedSchema, table.tableSource, tableConfig)) {
       // generate changelog normalize node
       // primary key has been validated in CatalogSourceTable
       val primaryKey = resolvedSchema.getPrimaryKey.get()
@@ -35,31 +35,31 @@
 public class TypeConversionsTest extends ScalarOperatorsTestBase {
     @Test
     public void testTimestampWithLocalTimeZoneToString() {
-        config().setLocalTimeZone(ZoneOffset.ofHours(2));
+        tableConfig().setLocalTimeZone(ZoneOffset.ofHours(2));
         testTableApi(lit(Instant.EPOCH).cast(DataTypes.STRING()), "1970-01-01 02:00:00");
     }
 
     @Test
     public void testTimestampWithLocalTimeZoneToDate() {
-        config().setLocalTimeZone(ZoneOffset.ofHours(4));
+        tableConfig().setLocalTimeZone(ZoneOffset.ofHours(4));
         testTableApi(lit(Instant.EPOCH).cast(DataTypes.DATE()), "1970-01-01");
     }
 
     @Test
     public void testTimestampWithLocalTimeZoneToTime() {
-        config().setLocalTimeZone(ZoneOffset.ofHours(4));
+        tableConfig().setLocalTimeZone(ZoneOffset.ofHours(4));
         testTableApi(lit(Instant.EPOCH).cast(DataTypes.TIME(0)), "04:00:00");
     }
 
     @Test
     public void testTimestampWithLocalTimeZoneToTimestamp() {
-        config().setLocalTimeZone(ZoneOffset.ofHours(3));
+        tableConfig().setLocalTimeZone(ZoneOffset.ofHours(3));
         testTableApi(lit(Instant.EPOCH).cast(DataTypes.TIMESTAMP(0)), "1970-01-01 03:00:00");
     }
 
     @Test
     public void testStringToTimestampWithLocalTimeZone() {
-        config().setLocalTimeZone(ZoneOffset.ofHours(2));
+        tableConfig().setLocalTimeZone(ZoneOffset.ofHours(2));
         testTableApi(
                 lit("1970-01-01 00:00:00").cast(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(0)),
                 "1970-01-01 00:00:00");
@@ -71,7 +71,7 @@ public void testStringToTimestampWithLocalTimeZone() {
 
     @Test
     public void testTimestampToTimestampWithLocalTimeZone() {
-        config().setLocalTimeZone(ZoneOffset.ofHours(2));
+        tableConfig().setLocalTimeZone(ZoneOffset.ofHours(2));
         testTableApi(
                 lit(LocalDateTime.parse("1970-01-01T00:00:00"))
                         .cast(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(0)),
@@ -84,7 +84,7 @@ public void testTimestampToTimestampWithLocalTimeZone() {
 
     @Test
     public void testTimeToTimestampWithLocalTimeZone() {
-        config().setLocalTimeZone(ZoneOffset.ofHours(2));
+        tableConfig().setLocalTimeZone(ZoneOffset.ofHours(2));
         testTableApi(
                 lit(LocalTime.parse("12:00:00")).cast(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(0)),
                 "1970-01-01 12:00:00");
@@ -96,7 +96,7 @@ public void testTimeToTimestampWithLocalTimeZone() {
 
     @Test
     public void testDateToTimestampWithLocalTimeZone() {
-        config().setLocalTimeZone(ZoneOffset.ofHours(2));
+        tableConfig().setLocalTimeZone(ZoneOffset.ofHours(2));
         testTableApi(
                 lit(LocalDate.parse("1970-02-01"))
                         .cast(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(0)),
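Every test in this file follows the same pattern: set the session time zone on the `TableConfig`, then assert how a literal converts. A standalone sketch of the same effect through the public Table API — not part of the commit, and it assumes the 1.15-era `TableEnvironment` and the built-in `TO_TIMESTAMP_LTZ` function:

```scala
import java.time.ZoneOffset

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object LocalTimeZoneSketch {
  def main(args: Array[String]): Unit = {
    val tEnv = TableEnvironment.create(
      EnvironmentSettings.newInstance().inStreamingMode().build())

    // The session time zone on TableConfig controls how TIMESTAMP_LTZ
    // values are rendered: the epoch at UTC+2 prints as a local time
    // of 1970-01-01 02:00:00.000.
    tEnv.getConfig.setLocalTimeZone(ZoneOffset.ofHours(2))
    tEnv.sqlQuery("SELECT CAST(TO_TIMESTAMP_LTZ(0, 3) AS STRING) AS ts")
      .execute()
      .print()
  }
}
```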
@@ -53,13 +53,13 @@ import java.util.Collections
 class WatermarkGeneratorCodeGenTest(useDefinedConstructor: Boolean) {
 
   // mock FlinkPlannerImpl to avoid discovering TableEnvironment and Executor.
-  val config = TableConfig.getDefault
+  val tableConfig = TableConfig.getDefault
   val moduleManager = new ModuleManager
   val catalogManager: CatalogManager = CatalogManagerMocks.createEmptyCatalogManager()
-  val functionCatalog = new FunctionCatalog(config, catalogManager, moduleManager)
+  val functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager)
   val plannerContext = new PlannerContext(
     false,
-    config,
+    tableConfig,
     moduleManager,
     functionCatalog,
     catalogManager,
@@ -79,11 +79,11 @@ class NonDeterministicTests extends ExpressionTestBase {
   @Test
   def testTemporalFunctionsInBatchMode(): Unit = {
     val zoneId = ZoneId.of("Asia/Shanghai")
-    config.setLocalTimeZone(zoneId)
-    config.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH)
+    tableConfig.setLocalTimeZone(zoneId)
+    tableConfig.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH)
 
-    config.set(InternalConfigOptions.TABLE_QUERY_START_EPOCH_TIME, Long.box(1123L))
-    config.set(
+    tableConfig.set(InternalConfigOptions.TABLE_QUERY_START_EPOCH_TIME, Long.box(1123L))
+    tableConfig.set(
       InternalConfigOptions.TABLE_QUERY_START_LOCAL_TIME,
       Long.box(1123L + TimeZone.getTimeZone(zoneId).getOffset(1123L)))
 
@@ -110,7 +110,7 @@
 
   @Test
   def testCurrentRowTimestampFunctionsInBatchMode(): Unit = {
-    config.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH)
+    tableConfig.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH)
     val temporalFunctions = getCodeGenFunctions(List("CURRENT_ROW_TIMESTAMP()"))
 
     val round1 = evaluateFunctionResult(temporalFunctions)
@@ -135,7 +135,7 @@
   }
 
   private def testTemporalTimestamp(zoneId: ZoneId): Unit = {
-    config.setLocalTimeZone(zoneId)
+    tableConfig.setLocalTimeZone(zoneId)
     val localDateTime = LocalDateTime.now(zoneId)
 
     val formattedLocalTime = localDateTime