Skip to content

Commit

Permalink
[hotfix][table-planner] Remove ExecNodeConfig#getLocalTimeZone
Browse files Browse the repository at this point in the history
Remove `ExecNodeConfig#getLocalTimeZone` by replacing its usages with
the newly introduced method in the utility class `TableConfigUtils`.
  • Loading branch information
matriv authored and twalthr committed Mar 22, 2022
1 parent d526585 commit bc0cc0c
Show file tree
Hide file tree
Showing 10 changed files with 27 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil;

import java.time.ZoneId;
import java.util.Optional;

/**
Expand Down Expand Up @@ -86,18 +85,12 @@ public long getStateRetentionTime() {

// See https://issues.apache.org/jira/browse/FLINK-26190
/**
* Using {@link #tableConfig} to satisify tests like {@code OverAggregateHarnessTest}, which
* use {@code HarnessTestBase#TestTableConfig} to individually manipulate the
* Using {@link #tableConfig} to satisify tests like {@code OverAggregateHarnessTest}, which use
* {@code HarnessTestBase#TestTableConfig} to individually manipulate the
* maxIdleStateRetentionTime. See {@link TableConfig#getMaxIdleStateRetentionTime()}.
*/
@Deprecated
public long getMaxIdleStateRetentionTime() {
return tableConfig.getMaxIdleStateRetentionTime();
}

// See https://issues.apache.org/jira/browse/FLINK-26190
/** See {@link TableConfig#getLocalTimeZone()}. */
public ZoneId getLocalTimeZone() {
return tableConfig.getLocalTimeZone();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.utils.TableConfigUtils;
import org.apache.flink.table.runtime.operators.window.TimeWindow;
import org.apache.flink.table.runtime.operators.window.WindowTableFunctionOperator;
import org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
Expand Down Expand Up @@ -80,7 +81,8 @@ protected Transformation<RowData> translateToPlanInternal(
WindowAssigner<TimeWindow> windowAssigner = createWindowAssigner(windowingStrategy);
final ZoneId shiftTimeZone =
TimeWindowUtil.getShiftTimeZone(
windowingStrategy.getTimeAttributeType(), config.getLocalTimeZone());
windowingStrategy.getTimeAttributeType(),
TableConfigUtils.getLocalTimeZone(config));
WindowTableFunctionOperator windowTableFunctionOperator =
new WindowTableFunctionOperator(
windowAssigner, windowingStrategy.getTimeAttributeIndex(), shiftTimeZone);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.flink.table.planner.plan.utils.AggregateUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.TableConfigUtils;
import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty;
import org.apache.flink.table.runtime.groupwindow.WindowProperty;
Expand Down Expand Up @@ -151,7 +152,8 @@ protected Transformation<RowData> translateToPlanInternal(

final ZoneId shiftTimeZone =
TimeWindowUtil.getShiftTimeZone(
windowing.getTimeAttributeType(), config.getLocalTimeZone());
windowing.getTimeAttributeType(),
TableConfigUtils.getLocalTimeZone(config));
final SliceAssigner sliceAssigner = createSliceAssigner(windowing, shiftTimeZone);

final AggregateInfoList localAggInfoList =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.planner.plan.utils.WindowEmitStrategy;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.TableConfigUtils;
import org.apache.flink.table.runtime.generated.GeneratedClass;
import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedNamespaceTableAggsHandleFunction;
Expand Down Expand Up @@ -226,7 +227,7 @@ protected Transformation<RowData> translateToPlanInternal(
final ZoneId shiftTimeZone =
TimeWindowUtil.getShiftTimeZone(
window.timeAttribute().getOutputDataType().getLogicalType(),
config.getLocalTimeZone());
TableConfigUtils.getLocalTimeZone(config));

final boolean[] aggCallNeedRetractions = new boolean[aggCalls.length];
Arrays.fill(aggCallNeedRetractions, needRetraction);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.flink.table.planner.plan.utils.AggregateUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.TableConfigUtils;
import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.aggregate.window.LocalSlicingWindowAggOperator;
Expand Down Expand Up @@ -137,7 +138,8 @@ protected Transformation<RowData> translateToPlanInternal(

final ZoneId shiftTimeZone =
TimeWindowUtil.getShiftTimeZone(
windowing.getTimeAttributeType(), config.getLocalTimeZone());
windowing.getTimeAttributeType(),
TableConfigUtils.getLocalTimeZone(config));
final SliceAssigner sliceAssigner = createSliceAssigner(windowing, shiftTimeZone);

final AggregateInfoList aggInfoList =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.flink.table.planner.plan.utils.PythonUtil;
import org.apache.flink.table.planner.plan.utils.WindowEmitStrategy;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.TableConfigUtils;
import org.apache.flink.table.runtime.dataview.DataViewSpec;
import org.apache.flink.table.runtime.generated.GeneratedProjection;
import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty;
Expand Down Expand Up @@ -224,7 +225,7 @@ protected Transformation<RowData> translateToPlanInternal(
final ZoneId shiftTimeZone =
TimeWindowUtil.getShiftTimeZone(
window.timeAttribute().getOutputDataType().getLogicalType(),
config.getLocalTimeZone());
TableConfigUtils.getLocalTimeZone(config));
Tuple2<WindowAssigner<?>, Trigger<?>> windowAssignerAndTrigger =
generateWindowAssignerAndTrigger();
WindowAssigner<?> windowAssigner = windowAssignerAndTrigger.f0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.flink.table.planner.plan.utils.AggregateUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.TableConfigUtils;
import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty;
import org.apache.flink.table.runtime.groupwindow.WindowProperty;
Expand Down Expand Up @@ -152,7 +153,8 @@ protected Transformation<RowData> translateToPlanInternal(

final ZoneId shiftTimeZone =
TimeWindowUtil.getShiftTimeZone(
windowing.getTimeAttributeType(), config.getLocalTimeZone());
windowing.getTimeAttributeType(),
TableConfigUtils.getLocalTimeZone(config));
final SliceAssigner sliceAssigner = createSliceAssigner(windowing, shiftTimeZone);

// Hopping window requires additional COUNT(*) to determine whether to register next timer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.planner.utils.TableConfigUtils;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.deduplicate.window.RowTimeWindowDeduplicateOperatorBuilder;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
Expand Down Expand Up @@ -149,7 +150,8 @@ protected Transformation<RowData> translateToPlanInternal(

ZoneId shiftTimeZone =
TimeWindowUtil.getShiftTimeZone(
windowing.getTimeAttributeType(), config.getLocalTimeZone());
windowing.getTimeAttributeType(),
TableConfigUtils.getLocalTimeZone(config));

RowType inputType = (RowType) inputEdge.getOutputType();
RowDataKeySelector selector =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.JoinUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.planner.utils.TableConfigUtils;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.join.window.WindowJoinOperator;
Expand Down Expand Up @@ -160,7 +161,8 @@ protected Transformation<RowData> translateToPlanInternal(

ZoneId shiftTimeZone =
TimeWindowUtil.getShiftTimeZone(
leftWindowing.getTimeAttributeType(), config.getLocalTimeZone());
leftWindowing.getTimeAttributeType(),
TableConfigUtils.getLocalTimeZone(config));
WindowJoinOperator operator =
WindowJoinOperatorBuilder.builder()
.leftSerializer(leftTypeInfo.toRowSerializer())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.planner.utils.TableConfigUtils;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.rank.ConstantRankRange;
Expand Down Expand Up @@ -218,7 +219,8 @@ protected Transformation<RowData> translateToPlanInternal(

ZoneId shiftTimeZone =
TimeWindowUtil.getShiftTimeZone(
windowing.getTimeAttributeType(), config.getLocalTimeZone());
windowing.getTimeAttributeType(),
TableConfigUtils.getLocalTimeZone(config));
GeneratedRecordComparator sortKeyComparator =
ComparatorCodeGenerator.gen(
config,
Expand Down

0 comments on commit bc0cc0c

Please sign in to comment.