[FLINK-26843][table-planner] Use TableConfigUtils to get local timezone.
Replace usages of `TableConfig#getLocalTimeZone()` with
`TableConfigUtils.getLocalTimeZone(tableConfig)` to facilitate
passing `ReadableConfig` instead of concrete `TableConfig`.
matriv authored and twalthr committed Mar 24, 2022
1 parent d74859e · commit 33ce844
Showing 8 changed files with 24 additions and 12 deletions.
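
For context: the helper the commit switches to resolves the session time zone from a `ReadableConfig` rather than requiring a concrete `TableConfig`. Below is a minimal sketch of such a helper, assuming it reads `TableConfigOptions.LOCAL_TIME_ZONE` and treats the option's default value as "use the JVM zone"; the real `TableConfigUtils#getLocalTimeZone` may differ in its details.

// Sketch only: approximates org.apache.flink.table.planner.utils.TableConfigUtils#getLocalTimeZone.
import java.time.ZoneId;

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.config.TableConfigOptions;

public final class LocalTimeZoneSketch {

    // Resolves the session time zone from any ReadableConfig, not just a concrete TableConfig.
    public static ZoneId getLocalTimeZone(ReadableConfig config) {
        final String zone = config.get(TableConfigOptions.LOCAL_TIME_ZONE);
        // The option's default value means: fall back to the JVM's zone.
        return TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zone)
                ? ZoneId.systemDefault()
                : ZoneId.of(zone);
    }

    private LocalTimeZoneSketch() {}
}

With such a signature, call sites that only hold a `ReadableConfig` can resolve the zone without unwrapping a full `TableConfig`, which is what the changes below prepare for.
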
@@ -28,6 +28,7 @@
 import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
 import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
 import org.apache.flink.table.types.logical.RowType;

 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -81,7 +82,8 @@ public static SupportsFilterPushDown.Result apply(
                        context.getSourceRowType().getFieldNames().toArray(new String[0]),
                        context.getFunctionCatalog(),
                        context.getCatalogManager(),
-                        TimeZone.getTimeZone(context.getTableConfig().getLocalTimeZone()));
+                        TimeZone.getTimeZone(
+                                TableConfigUtils.getLocalTimeZone(context.getTableConfig())));
        List<Expression> filters =
                predicates.stream()
                        .map(

@@ -33,6 +33,7 @@
 import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
 import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
 import org.apache.flink.table.planner.utils.ShortcutUtils;
+import org.apache.flink.table.planner.utils.TableConfigUtils;

 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
@@ -134,7 +135,8 @@ protected Tuple2<RexNode[], RexNode[]> extractPredicates(
                        inputNames,
                        context.getFunctionCatalog(),
                        context.getCatalogManager(),
-                        TimeZone.getTimeZone(context.getTableConfig().getLocalTimeZone()));
+                        TimeZone.getTimeZone(
+                                TableConfigUtils.getLocalTimeZone(context.getTableConfig())));

        return RexNodeExtractor.extractConjunctiveConditions(
                filterExpression, maxCnfNodeCount, rexBuilder, converter);

@@ -49,6 +49,7 @@
 import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
 import org.apache.flink.table.planner.utils.CatalogTableStatisticsConverter;
 import org.apache.flink.table.planner.utils.ShortcutUtils;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
 import org.apache.flink.table.types.logical.LogicalType;

 import org.apache.calcite.plan.RelOptRule;
@@ -341,7 +342,8 @@ private List<Map<String, String>> readPartitionFromCatalogAndPrune(
                        allFieldNames.toArray(new String[0]),
                        context.getFunctionCatalog(),
                        context.getCatalogManager(),
-                        TimeZone.getTimeZone(context.getTableConfig().getLocalTimeZone()));
+                        TimeZone.getTimeZone(
+                                TableConfigUtils.getLocalTimeZone(context.getTableConfig())));
        ArrayList<Expression> partitionFilters = new ArrayList<>();
        Option<ResolvedExpression> subExpr;
        for (RexNode node : JavaConversions.seqAsJavaList(partitionPredicate)) {

@@ -26,7 +26,6 @@ import org.apache.calcite.rel.logical.LogicalTableModify
 import org.apache.calcite.tools.FrameworkConfig
 import org.apache.flink.annotation.VisibleForTesting
 import org.apache.flink.api.dag.Transformation
-import org.apache.flink.configuration.ReadableConfig
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.graph.StreamGraph
 import org.apache.flink.table.api._
@@ -60,6 +59,7 @@ import org.apache.flink.table.planner.sinks.DataStreamTableSink
 import org.apache.flink.table.planner.sinks.TableSinkUtils.{inferSinkPhysicalSchema, validateLogicalPhysicalTypesCompatible, validateTableSink}
 import org.apache.flink.table.planner.utils.InternalConfigOptions.{TABLE_QUERY_START_EPOCH_TIME, TABLE_QUERY_START_LOCAL_TIME}
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.{toJava, toScala}
+import org.apache.flink.table.planner.utils.TableConfigUtils
 import org.apache.flink.table.runtime.generated.CompileUtils
 import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter
@@ -443,7 +443,7 @@ abstract class PlannerBase(
     val epochTime :JLong = System.currentTimeMillis()
     tableConfig.set(TABLE_QUERY_START_EPOCH_TIME, epochTime)
     val localTime :JLong = epochTime +
-      TimeZone.getTimeZone(tableConfig.getLocalTimeZone).getOffset(epochTime)
+      TimeZone.getTimeZone(TableConfigUtils.getLocalTimeZone(tableConfig)).getOffset(epochTime)
     tableConfig.set(TABLE_QUERY_START_LOCAL_TIME, localTime)

     // We pass only the configuration to avoid reconfiguration with the rootConfiguration

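The `TABLE_QUERY_START_LOCAL_TIME` value set above is the query-start epoch shifted by the session zone's UTC offset. A small, self-contained illustration of that arithmetic (the zone and the printed value are made up, not taken from the commit):

import java.time.ZoneId;
import java.util.TimeZone;

public class QueryStartTimeSketch {
    public static void main(String[] args) {
        // Epoch millis are zone-independent ("query start epoch time").
        long epochTime = System.currentTimeMillis();
        // Stand-in for the configured session time zone.
        ZoneId sessionZone = ZoneId.of("Europe/Berlin");
        // Shifting by the zone's offset yields millis that render as the session's
        // wall-clock time when interpreted in UTC ("query start local time").
        long localTime = epochTime + TimeZone.getTimeZone(sessionZone).getOffset(epochTime);
        System.out.println(localTime - epochTime); // e.g. 3_600_000 or 7_200_000 for Berlin
    }
}
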
@@ -26,6 +26,7 @@ import org.apache.flink.table.planner.expressions.converter.ExpressionConverter
 import org.apache.flink.table.planner.plan.schema.{FlinkPreparingTableBase, LegacyTableSourceTable}
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic
 import org.apache.flink.table.planner.plan.utils.{FlinkRelOptUtil, FlinkRexUtil, RexNodeExtractor}
+import org.apache.flink.table.planner.utils.{ShortcutUtils, TableConfigUtils}
 import org.apache.flink.table.sources.FilterableTableSource

 import org.apache.calcite.plan.RelOptRule.{none, operand}
@@ -94,8 +95,8 @@ class PushFilterIntoLegacyTableSourceScanRule extends RelOptRule(
      relBuilder.getRexBuilder,
      context.getFunctionCatalog,
      context.getCatalogManager,
-      TimeZone.getTimeZone(scan.getCluster.getPlanner.getContext
-        .unwrap(classOf[FlinkContext]).getTableConfig.getLocalTimeZone))
+      TimeZone.getTimeZone(
+        TableConfigUtils.getLocalTimeZone(ShortcutUtils.unwrapTableConfig(scan))))

    if (predicates.isEmpty) {
      // no condition can be translated to expression

@@ -25,6 +25,7 @@ import org.apache.flink.table.api.{TableConfig, TableException}
 import org.apache.flink.table.data.{DecimalDataUtils, GenericRowData, StringData, TimestampData}
 import org.apache.flink.table.planner.codegen.CodeGenUtils.DEFAULT_COLLECTOR_TERM
 import org.apache.flink.table.planner.codegen.{ConstantCodeGeneratorContext, ExprCodeGenerator, FunctionCodeGenerator}
+import org.apache.flink.table.planner.utils.TableConfigUtils
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
 import org.apache.flink.table.types.logical.LogicalTypeRoot._
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks
@@ -124,7 +125,10 @@ object PartitionPruner {
     // do filter against all partitions
     allPartitions.foreach { partition =>
       val row = convertPartitionToRow(
-        tableConfig.getLocalTimeZone, partitionFieldNames, partitionFieldTypes, partition)
+        TableConfigUtils.getLocalTimeZone(tableConfig),
+        partitionFieldNames,
+        partitionFieldTypes,
+        partition)
       collector.collect(richMapFunction.map(row))
     }
   } finally {

@@ -53,6 +53,7 @@
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.expressions.utils.ResolvedExpressionMock;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
 import org.apache.flink.table.types.AtomicDataType;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LocalZonedTimestampType;
@@ -529,7 +530,7 @@ public void testToDataStreamCustomEventTime() throws Exception {
        final TableConfig tableConfig = tableEnv.getConfig();

        // session time zone should not have an impact on the conversion
-       final ZoneId originalZone = tableConfig.getLocalTimeZone();
+       final ZoneId originalZone = TableConfigUtils.getLocalTimeZone(tableConfig);
        tableConfig.setLocalTimeZone(ZoneId.of("Europe/Berlin"));

        final LocalDateTime localDateTime1 = LocalDateTime.parse("1970-01-01T00:00:00.000");

@@ -22,7 +22,7 @@ import org.apache.flink.table.api._
 import org.apache.flink.table.expressions.TimeIntervalUnit
 import org.apache.flink.table.planner.codegen.CodeGenException
 import org.apache.flink.table.planner.expressions.utils.ExpressionTestBase
-import org.apache.flink.table.planner.utils.DateTimeTestUtil
+import org.apache.flink.table.planner.utils.{DateTimeTestUtil, TableConfigUtils}
 import org.apache.flink.table.planner.utils.DateTimeTestUtil._
 import org.apache.flink.table.types.DataType
 import org.apache.flink.types.Row
@@ -1735,9 +1735,9 @@ class TemporalTypesTest extends ExpressionTestBase {
     testData.setField(21, 44L)
     testData.setField(22, 3)
     testData.setField(23, localDateTime("1970-01-01 00:00:00.123456789")
-      .atZone(config.getLocalTimeZone).toInstant)
+      .atZone(TableConfigUtils.getLocalTimeZone(config)).toInstant)
     testData.setField(24, localDateTime("1970-01-01 00:00:00.123456789")
-      .atZone(config.getLocalTimeZone).toInstant)
+      .atZone(TableConfigUtils.getLocalTimeZone(config)).toInstant)
     testData.setField(25, localDateTime("1970-01-01 00:00:00.123456789").toInstant(ZoneOffset.UTC))
     testData setField(26, new Integer(124).byteValue())
     testData
