Skip to content

Commit

Permalink
[FLINK-32689][table] Improve validation of local time zone
Browse files Browse the repository at this point in the history
  • Loading branch information
twalthr committed Aug 7, 2023
1 parent 6e138f1 commit 8c9b176
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.table.factories.BulkReaderFormatFactory;
import org.apache.flink.connector.file.table.factories.BulkWriterFormatFactory;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
Expand All @@ -38,6 +39,7 @@
import org.apache.flink.table.factories.SerializationFormatFactory;
import org.apache.flink.table.factories.TableFactory;

import java.time.ZoneId;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
Expand All @@ -46,8 +48,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.time.ZoneId.SHORT_IDS;

/**
* File system {@link TableFactory}.
*
Expand Down Expand Up @@ -155,18 +155,9 @@ private void validate(FactoryUtil.TableFactoryHelper helper) {
helper.validateExcept(helper.getOptions().get(FactoryUtil.FORMAT) + ".");

// validate time zone of watermark
String watermarkTimeZone =
validateTimeZone(
helper.getOptions()
.get(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE);
if (watermarkTimeZone.startsWith("UTC+")
|| watermarkTimeZone.startsWith("UTC-")
|| SHORT_IDS.containsKey(watermarkTimeZone)) {
throw new ValidationException(
String.format(
"The supported watermark time zone is either a full name such as 'America/Los_Angeles',"
+ " or a custom time zone id such as 'GMT-08:00', but configured time zone is '%s'.",
watermarkTimeZone));
}
.get(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE));
}

private <I, F extends DecodingFormatFactory<I>> DecodingFormat<I> discoverDecodingFormat(
Expand Down Expand Up @@ -220,4 +211,28 @@ private boolean formatFactoryExists(Context context, Class<?> factoryClass) {

return !matchingFactories.isEmpty();
}

/** Similar logic as for {@link TableConfig}. */
private void validateTimeZone(String zone) {
boolean isValid;
try {
// We enforce a zone string that is compatible with both java.util.TimeZone and
// java.time.ZoneId to avoid bugs.
// In general, advertising either TZDB ID, GMT+xx:xx, or UTC is the best we can do.
isValid = java.util.TimeZone.getTimeZone(zone).toZoneId().equals(ZoneId.of(zone));
} catch (Exception e) {
isValid = false;
}

if (!isValid) {
throw new ValidationException(
String.format(
"Invalid time zone for '%s'. The value should be a Time Zone Database (TZDB) ID "
+ "such as 'America/Los_Angeles' to include daylight saving time. Fixed "
+ "offsets are supported using 'GMT-03:00' or 'GMT+03:00'. Or use 'UTC' "
+ "without time zone and daylight saving time.",
FileSystemConnectorOptions.SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE
.key()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,12 @@

import java.time.Duration;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import static java.time.ZoneId.SHORT_IDS;
import static org.apache.flink.table.api.internal.TableConfigValidation.validateTimeZone;

/**
* Configuration for the current {@link TableEnvironment} session to adjust Table & SQL API
Expand Down Expand Up @@ -242,11 +243,12 @@ public void setSqlDialect(SqlDialect sqlDialect) {
* @see org.apache.flink.table.types.logical.LocalZonedTimestampType
*/
public ZoneId getLocalTimeZone() {
String zone = configuration.getString(TableConfigOptions.LOCAL_TIME_ZONE);
final String zone = configuration.getString(TableConfigOptions.LOCAL_TIME_ZONE);
if (TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zone)) {
return ZoneId.systemDefault();
}
validateTimeZone(zone);
return TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zone)
? ZoneId.systemDefault()
: ZoneId.of(zone);
return ZoneId.of(zone);
}

/**
Expand Down Expand Up @@ -296,22 +298,17 @@ public ZoneId getLocalTimeZone() {
* @see org.apache.flink.table.types.logical.LocalZonedTimestampType
*/
public void setLocalTimeZone(ZoneId zoneId) {
validateTimeZone(zoneId.toString());
configuration.setString(TableConfigOptions.LOCAL_TIME_ZONE, zoneId.toString());
}

/** Validates user configured time zone. */
private void validateTimeZone(String zone) {
final String zoneId = zone.toUpperCase();
if (zoneId.startsWith("UTC+")
|| zoneId.startsWith("UTC-")
|| SHORT_IDS.containsKey(zoneId)) {
throw new IllegalArgumentException(
String.format(
"The supported Zone ID is either a full name such as 'America/Los_Angeles',"
+ " or a custom timezone id such as 'GMT-08:00', but configured Zone ID is '%s'.",
zone));
final String zone;
if (zoneId instanceof ZoneOffset) {
// Give ZoneOffset a timezone for backwards compatibility reasons.
// In general, advertising either TZDB ID, GMT+xx:xx, or UTC is the best we can do.
zone = ZoneId.ofOffset("GMT", (ZoneOffset) zoneId).toString();
} else {
zone = zoneId.toString();
}
validateTimeZone(zone);

configuration.setString(TableConfigOptions.LOCAL_TIME_ZONE, zone);
}

/** Returns the current configuration of Planner for Table API and SQL queries. */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.api.internal;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.ValidationException;

import java.time.ZoneId;

/** Utilities around validation to keep {@link TableConfig} clean. */
@Internal
public class TableConfigValidation {

/** Validates user configured time zone. */
public static void validateTimeZone(String zone) {
boolean isValid;
try {
// We enforce a zone string that is compatible with both java.util.TimeZone and
// java.time.ZoneId to avoid bugs.
// In general, advertising either TZDB ID, GMT+xx:xx, or UTC is the best we can do.
isValid = java.util.TimeZone.getTimeZone(zone).toZoneId().equals(ZoneId.of(zone));
} catch (Exception e) {
isValid = false;
}

if (!isValid) {
throw new ValidationException(
"Invalid time zone. The value should be a Time Zone Database (TZDB) ID "
+ "such as 'America/Los_Angeles' to include daylight saving time. Fixed "
+ "offsets are supported using 'GMT-03:00' or 'GMT+03:00'. Or use 'UTC' "
+ "without time zone and daylight saving time.");
}
}

private TableConfigValidation() {
// No instantiation
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.table.api;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.config.TableConfigOptions;

import org.junit.jupiter.api.Test;
Expand All @@ -35,12 +34,10 @@ public class TableConfigTest {

private static final TableConfig CONFIG_BY_METHOD = TableConfig.getDefault();
private static final TableConfig CONFIG_BY_CONFIGURATION = TableConfig.getDefault();
private static final Configuration configuration = new Configuration();

@Test
void testSetAndGetSqlDialect() {
configuration.setString("table.sql-dialect", "HIVE");
CONFIG_BY_CONFIGURATION.addConfiguration(configuration);
CONFIG_BY_CONFIGURATION.set("table.sql-dialect", "HIVE");
CONFIG_BY_METHOD.setSqlDialect(SqlDialect.HIVE);

assertThat(CONFIG_BY_METHOD.getSqlDialect()).isEqualTo(SqlDialect.HIVE);
Expand All @@ -49,8 +46,7 @@ void testSetAndGetSqlDialect() {

@Test
void testSetAndGetMaxGeneratedCodeLength() {
configuration.setString("table.generated-code.max-length", "5000");
CONFIG_BY_CONFIGURATION.addConfiguration(configuration);
CONFIG_BY_CONFIGURATION.set("table.generated-code.max-length", "5000");
CONFIG_BY_METHOD.setMaxGeneratedCodeLength(5000);

assertThat(CONFIG_BY_METHOD.getMaxGeneratedCodeLength()).isEqualTo(Integer.valueOf(5000));
Expand All @@ -60,29 +56,28 @@ void testSetAndGetMaxGeneratedCodeLength() {

@Test
void testSetAndGetLocalTimeZone() {
configuration.setString("table.local-time-zone", "Asia/Shanghai");
CONFIG_BY_CONFIGURATION.addConfiguration(configuration);
CONFIG_BY_CONFIGURATION.set("table.local-time-zone", "Asia/Shanghai");
CONFIG_BY_METHOD.setLocalTimeZone(ZoneId.of("Asia/Shanghai"));

assertThat(CONFIG_BY_METHOD.getLocalTimeZone()).isEqualTo(ZoneId.of("Asia/Shanghai"));
assertThat(CONFIG_BY_CONFIGURATION.getLocalTimeZone())
.isEqualTo(ZoneId.of("Asia/Shanghai"));

configuration.setString("table.local-time-zone", "GMT-08:00");
CONFIG_BY_CONFIGURATION.addConfiguration(configuration);
CONFIG_BY_CONFIGURATION.set("table.local-time-zone", "GMT-08:00");
CONFIG_BY_METHOD.setLocalTimeZone(ZoneId.of("GMT-08:00"));

assertThat(CONFIG_BY_METHOD.getLocalTimeZone()).isEqualTo(ZoneId.of("GMT-08:00"));
assertThat(CONFIG_BY_CONFIGURATION.getLocalTimeZone()).isEqualTo(ZoneId.of("GMT-08:00"));

CONFIG_BY_CONFIGURATION.set("table.local-time-zone", "UTC");
CONFIG_BY_METHOD.setLocalTimeZone(ZoneId.of("UTC"));
assertThat(CONFIG_BY_METHOD.getLocalTimeZone()).isEqualTo(ZoneId.of("UTC"));
assertThat(CONFIG_BY_CONFIGURATION.getLocalTimeZone()).isEqualTo(ZoneId.of("UTC"));
}

@Test
public void testSetInvalidLocalTimeZone() {
assertThatThrownBy(() -> CONFIG_BY_METHOD.setLocalTimeZone(ZoneId.of("UTC-10:00")))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(
"The supported Zone ID is either a full name such as 'America/Los_Angeles',"
+ " or a custom timezone id such as 'GMT-08:00', but configured Zone ID is 'UTC-10:00'.");
.isInstanceOf(ValidationException.class)
.hasMessageContaining("Invalid time zone.");
}

@Test
Expand All @@ -93,31 +88,32 @@ public void testInvalidGmtLocalTimeZone() {
}

@Test
void testGetInvalidLocalTimeZone() {
configuration.setString("table.local-time-zone", "UTC+8");
CONFIG_BY_CONFIGURATION.addConfiguration(configuration);
void testGetInvalidLocalTimeZoneUTC() {
CONFIG_BY_CONFIGURATION.set("table.local-time-zone", "UTC+8");
assertThatThrownBy(CONFIG_BY_CONFIGURATION::getLocalTimeZone)
.isInstanceOf(ValidationException.class)
.hasMessageContaining("Invalid time zone.");
}

@Test
void testGetInvalidLocalTimeZoneUT() {
CONFIG_BY_CONFIGURATION.set("table.local-time-zone", "UT+8");
assertThatThrownBy(CONFIG_BY_CONFIGURATION::getLocalTimeZone)
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(
"The supported Zone ID is either a full name such as 'America/Los_Angeles',"
+ " or a custom timezone id such as 'GMT-08:00', but configured Zone ID is 'UTC+8'.");
.isInstanceOf(ValidationException.class)
.hasMessageContaining("Invalid time zone.");
}

@Test
void testGetInvalidAbbreviationLocalTimeZone() {
configuration.setString("table.local-time-zone", "PST");
CONFIG_BY_CONFIGURATION.addConfiguration(configuration);
CONFIG_BY_CONFIGURATION.set("table.local-time-zone", "PST");
assertThatThrownBy(CONFIG_BY_CONFIGURATION::getLocalTimeZone)
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(
"The supported Zone ID is either a full name such as 'America/Los_Angeles',"
+ " or a custom timezone id such as 'GMT-08:00', but configured Zone ID is 'PST'.");
.isInstanceOf(ValidationException.class)
.hasMessageContaining("Invalid time zone.");
}

@Test
void testSetAndGetIdleStateRetention() {
configuration.setString("table.exec.state.ttl", "1 h");
CONFIG_BY_CONFIGURATION.addConfiguration(configuration);
CONFIG_BY_CONFIGURATION.set("table.exec.state.ttl", "1 h");
CONFIG_BY_METHOD.setIdleStateRetention(Duration.ofHours(1));

assertThat(CONFIG_BY_METHOD.getIdleStateRetention()).isEqualTo(Duration.ofHours(1));
Expand All @@ -126,8 +122,7 @@ void testSetAndGetIdleStateRetention() {

@Test
void testDisplayMaxColumnLength() {
configuration.setString("table.display.max-column-width", "100");
CONFIG_BY_CONFIGURATION.addConfiguration(configuration);
CONFIG_BY_CONFIGURATION.set("table.display.max-column-width", "100");
CONFIG_BY_METHOD.set(TableConfigOptions.DISPLAY_MAX_COLUMN_WIDTH, 100);

assertThat(CONFIG_BY_METHOD.get(TableConfigOptions.DISPLAY_MAX_COLUMN_WIDTH))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
import java.util.HashSet;
import java.util.Set;

import static java.time.ZoneId.SHORT_IDS;
import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS;
import static org.apache.flink.table.api.config.OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY;
import static org.apache.flink.table.api.internal.TableConfigValidation.validateTimeZone;

/** Utility class for {@link TableConfig} related helper functions. */
public class TableConfigUtils {
Expand Down Expand Up @@ -101,11 +101,12 @@ public static CalciteConfig getCalciteConfig(TableConfig tableConfig) {
* @see TableConfig#getLocalTimeZone()
*/
public static ZoneId getLocalTimeZone(ReadableConfig tableConfig) {
String zone = tableConfig.get(TableConfigOptions.LOCAL_TIME_ZONE);
final String zone = tableConfig.get(TableConfigOptions.LOCAL_TIME_ZONE);
if (TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zone)) {
return ZoneId.systemDefault();
}
validateTimeZone(zone);
return TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zone)
? ZoneId.systemDefault()
: ZoneId.of(zone);
return ZoneId.of(zone);
}

/**
Expand All @@ -118,21 +119,6 @@ public static long getMaxIdleStateRetentionTime(ReadableConfig tableConfig) {
return tableConfig.get(ExecutionConfigOptions.IDLE_STATE_RETENTION).toMillis() * 3 / 2;
}

/** Validates user configured time zone. */
private static void validateTimeZone(String zone) {
final String zoneId = zone.toUpperCase();
if (zoneId.startsWith("UTC+")
|| zoneId.startsWith("UTC-")
|| SHORT_IDS.containsKey(zoneId)) {
throw new IllegalArgumentException(
String.format(
"The supported Zone ID is either a full name such as "
+ "'America/Los_Angeles', or a custom timezone id such as "
+ "'GMT-08:00', but configured Zone ID is '%s'.",
zone));
}
}

// Make sure that we cannot instantiate this class
private TableConfigUtils() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,7 @@ public void testUnsupportedWatermarkTimeZone() {
.satisfies(
anyCauseMatches(
ValidationException.class,
"The supported watermark time zone is either a full name such "
+ "as 'America/Los_Angeles', or a custom time zone id such "
+ "as 'GMT-08:00', but configured time zone is 'UTC+8'."));
"Invalid time zone for 'sink.partition-commit.watermark-time-zone'."));
}

@Test
Expand Down

0 comments on commit 8c9b176

Please sign in to comment.