Skip to content

Commit

Permalink
[FLINK-22849][python] Move legacy testing utils to flink-python test jar
Browse files Browse the repository at this point in the history
  • Loading branch information
twalthr committed Jun 4, 2021
1 parent 214ade6 commit 578ccd8
Show file tree
Hide file tree
Showing 11 changed files with 782 additions and 37 deletions.
18 changes: 18 additions & 0 deletions flink-python/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,24 @@ under the License.
</execution>
</executions>
</plugin>
<!-- This is only a temporary solution until FLINK-22872 is fixed. -->
<!-- It compiles `org.apache.flink.table.legacyutils` containing code from the old planner. -->
<!-- We should not start adding more Scala code. Please remove this as soon as possible. -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<!-- Run Scala compiler in the process-test-resources phase, so that dependencies on
Scala classes can be resolved later in the (Java) test-compile phase -->
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
2 changes: 1 addition & 1 deletion flink-python/pyflink/pyflink_gateway_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,14 @@ def download_apache_avro():

def construct_test_classpath():
test_jar_patterns = [
"flink-table/flink-table-planner/target/flink-table-planner*-tests.jar",
"flink-runtime/target/flink-runtime*tests.jar",
"flink-streaming-java/target/flink-streaming-java*tests.jar",
"flink-formats/flink-csv/target/flink-csv*.jar",
"flink-formats/flink-avro/target/flink-avro*.jar",
"flink-formats/flink-avro/target/avro*.jar",
"flink-formats/flink-json/target/flink-json*.jar",
"flink-python/target/artifacts/testDataStream.jar",
"flink-python/target/flink-python*-tests.jar",
]
test_jars = []
flink_source_root = _find_flink_source_root()
Expand Down
8 changes: 4 additions & 4 deletions flink-python/pyflink/table/tests/test_correlate.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class CorrelateTests(PyFlinkBlinkStreamTableTestCase):
def test_join_lateral(self):
t_env = self.t_env
t_env.create_java_temporary_system_function("split",
"org.apache.flink.table.utils.TableFunc1")
"org.apache.flink.table.legacyutils.TableFunc1")
source = t_env.from_elements([("1", "1#3#5#7"), ("2", "2#4#6#8")], ["id", "words"])

result = source.join_lateral("split(words) as (word)")
Expand All @@ -37,7 +37,7 @@ def test_join_lateral(self):
def test_join_lateral_with_join_predicate(self):
t_env = self.t_env
t_env.create_java_temporary_system_function("split",
"org.apache.flink.table.utils.TableFunc1")
"org.apache.flink.table.legacyutils.TableFunc1")
source = t_env.from_elements([("1", "1#3#5#7"), ("2", "2#4#6#8")], ["id", "words"])

result = source.join_lateral(expr.call('split', source.words).alias('word'),
Expand All @@ -52,7 +52,7 @@ def test_join_lateral_with_join_predicate(self):
def test_left_outer_join_lateral(self):
t_env = self.t_env
t_env.create_java_temporary_system_function("split",
"org.apache.flink.table.utils.TableFunc1")
"org.apache.flink.table.legacyutils.TableFunc1")
source = t_env.from_elements([("1", "1#3#5#7"), ("2", "2#4#6#8")], ["id", "words"])

result = source.left_outer_join_lateral(expr.call('split', source.words).alias('word'))
Expand All @@ -65,7 +65,7 @@ def test_left_outer_join_lateral(self):
def test_left_outer_join_lateral_with_join_predicate(self):
t_env = self.t_env
t_env.create_java_temporary_system_function("split",
"org.apache.flink.table.utils.TableFunc1")
"org.apache.flink.table.legacyutils.TableFunc1")
source = t_env.from_elements([("1", "1#3#5#7"), ("2", "2#4#6#8")], ["id", "words"])

# only support "true" as the join predicate currently
Expand Down
25 changes: 12 additions & 13 deletions flink-python/pyflink/table/tests/test_descriptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -744,18 +744,18 @@ def test_timestamps_from_source(self):

def test_timestamps_from_extractor(self):
rowtime = Rowtime().timestamps_from_extractor(
"org.apache.flink.table.descriptors.RowtimeTest$CustomExtractor")
"org.apache.flink.table.legacyutils.CustomExtractor")

properties = rowtime.to_properties()
expected = {
'rowtime.timestamps.type': 'custom',
'rowtime.timestamps.class':
'org.apache.flink.table.descriptors.RowtimeTest$CustomExtractor',
'org.apache.flink.table.legacyutils.CustomExtractor',
'rowtime.timestamps.serialized':
'rO0ABXNyAD5vcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmRlc2NyaXB0b3JzLlJvd3RpbWVUZXN0JEN1c3R'
'vbUV4dHJhY3RvcoaChjMg55xwAgABTAAFZmllbGR0ABJMamF2YS9sYW5nL1N0cmluZzt4cgA-b3JnLm'
'FwYWNoZS5mbGluay50YWJsZS5zb3VyY2VzLnRzZXh0cmFjdG9ycy5UaW1lc3RhbXBFeHRyYWN0b3Jf1'
'Y6piFNsGAIAAHhwdAACdHM'}
'rO0ABXNyADJvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmxlZ2FjeXV0aWxzLkN1c3RvbUV4dHJhY3Rvctj'
'ZLTGK9XvxAgABTAAFZmllbGR0ABJMamF2YS9sYW5nL1N0cmluZzt4cgA-b3JnLmFwYWNoZS5mbGluay'
'50YWJsZS5zb3VyY2VzLnRzZXh0cmFjdG9ycy5UaW1lc3RhbXBFeHRyYWN0b3Jf1Y6piFNsGAIAAHhwd'
'AACdHM'}
self.assertEqual(expected, properties)

def test_watermarks_periodic_ascending(self):
Expand All @@ -782,19 +782,18 @@ def test_watermarks_from_source(self):

def test_watermarks_from_strategy(self):
rowtime = Rowtime().watermarks_from_strategy(
"org.apache.flink.table.descriptors.RowtimeTest$CustomAssigner")
"org.apache.flink.table.legacyutils.CustomAssigner")

properties = rowtime.to_properties()
expected = {
'rowtime.watermarks.type': 'custom',
'rowtime.watermarks.class':
'org.apache.flink.table.descriptors.RowtimeTest$CustomAssigner',
'org.apache.flink.table.legacyutils.CustomAssigner',
'rowtime.watermarks.serialized':
'rO0ABXNyAD1vcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmRlc2NyaXB0b3JzLlJvd3RpbWVUZXN0JEN1c3R'
'vbUFzc2lnbmVyeDcuDvfbu0kCAAB4cgBHb3JnLmFwYWNoZS5mbGluay50YWJsZS5zb3VyY2VzLndtc3'
'RyYXRlZ2llcy5QdW5jdHVhdGVkV2F0ZXJtYXJrQXNzaWduZXKBUc57oaWu9AIAAHhyAD1vcmcuYXBhY'
'2hlLmZsaW5rLnRhYmxlLnNvdXJjZXMud21zdHJhdGVnaWVzLldhdGVybWFya1N0cmF0ZWd53nt-g2OW'
'aT4CAAB4cA'}
'rO0ABXNyADFvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmxlZ2FjeXV0aWxzLkN1c3RvbUFzc2lnbmVyu_8'
'TLNBQBsACAAB4cgBHb3JnLmFwYWNoZS5mbGluay50YWJsZS5zb3VyY2VzLndtc3RyYXRlZ2llcy5QdW'
'5jdHVhdGVkV2F0ZXJtYXJrQXNzaWduZXKBUc57oaWu9AIAAHhyAD1vcmcuYXBhY2hlLmZsaW5rLnRhY'
'mxlLnNvdXJjZXMud21zdHJhdGVnaWVzLldhdGVybWFya1N0cmF0ZWd53nt-g2OWaT4CAAB4cA'}
self.assertEqual(expected, properties)


Expand Down
24 changes: 12 additions & 12 deletions flink-python/pyflink/table/tests/test_table_environment_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@ def test_register_functions(self):
"python_scalar_func", udf(lambda i: i, result_type=DataTypes.INT()))

t_env.register_java_function("scalar_func",
"org.apache.flink.table.expressions.utils.RichFunc0")
"org.apache.flink.table.legacyutils.RichFunc0")
t_env.register_java_function(
"agg_func", "org.apache.flink.table.functions.aggfunctions.ByteMaxAggFunction")
t_env.register_java_function("table_func", "org.apache.flink.table.utils.TableFunc1")
"agg_func", "org.apache.flink.table.legacyutils.ByteMaxAggFunction")
t_env.register_java_function("table_func", "org.apache.flink.table.legacyutils.TableFunc1")

actual = t_env.list_user_defined_functions()
expected = ['python_scalar_func', 'scalar_func', 'agg_func', 'table_func']
Expand Down Expand Up @@ -148,11 +148,11 @@ def test_create_and_drop_java_function(self):
t_env = self.t_env

t_env.create_java_temporary_system_function(
"scalar_func", "org.apache.flink.table.expressions.utils.RichFunc0")
"scalar_func", "org.apache.flink.table.legacyutils.RichFunc0")
t_env.create_java_function(
"agg_func", "org.apache.flink.table.functions.aggfunctions.ByteMaxAggFunction")
"agg_func", "org.apache.flink.table.legacyutils.ByteMaxAggFunction")
t_env.create_java_temporary_function(
"table_func", "org.apache.flink.table.utils.TableFunc1")
"table_func", "org.apache.flink.table.legacyutils.TableFunc1")
self.assert_equals(t_env.list_user_defined_functions(),
['scalar_func', 'agg_func', 'table_func'])

Expand Down Expand Up @@ -354,13 +354,13 @@ def test_register_java_function(self):
t_env = self.t_env

t_env.register_java_function(
"scalar_func", "org.apache.flink.table.expressions.utils.RichFunc0")
"scalar_func", "org.apache.flink.table.legacyutils.RichFunc0")

t_env.register_java_function(
"agg_func", "org.apache.flink.table.functions.aggfunctions.ByteMaxAggFunction")
"agg_func", "org.apache.flink.table.legacyutils.ByteMaxAggFunction")

t_env.register_java_function(
"table_func", "org.apache.flink.table.utils.TableFunc2")
"table_func", "org.apache.flink.table.legacyutils.TableFunc1")

actual = t_env.list_user_defined_functions()
expected = ['scalar_func', 'agg_func', 'table_func']
Expand Down Expand Up @@ -432,11 +432,11 @@ def test_create_and_drop_java_function(self):
t_env = self.t_env

t_env.create_java_temporary_system_function(
"scalar_func", "org.apache.flink.table.expressions.utils.RichFunc0")
"scalar_func", "org.apache.flink.table.legacyutils.RichFunc0")
t_env.create_java_function(
"agg_func", "org.apache.flink.table.functions.aggfunctions.ByteMaxAggFunction")
"agg_func", "org.apache.flink.table.legacyutils.ByteMaxAggFunction")
t_env.create_java_temporary_function(
"table_func", "org.apache.flink.table.utils.TableFunc1")
"table_func", "org.apache.flink.table.legacyutils.TableFunc1")
self.assert_equals(t_env.list_user_defined_functions(),
['scalar_func', 'agg_func', 'table_func'])

Expand Down
13 changes: 6 additions & 7 deletions flink-python/pyflink/testing/source_sink_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,16 @@ def _ensure_initialized(cls):

FLINK_SOURCE_ROOT_DIR = _find_flink_source_root()
filename_pattern = (
"flink-table/flink-table-planner/target/"
"flink-table-planner*-tests.jar")
"flink-python/target/flink-python*-tests.jar")
if not glob.glob(os.path.join(FLINK_SOURCE_ROOT_DIR, filename_pattern)):
raise unittest.SkipTest(
"'flink-table-planner*-tests.jar' is not available. Will skip the related tests.")
"'flink-python*-tests.jar' is not available. Will skip the related tests.")

gateway = get_gateway()
java_import(gateway.jvm, "org.apache.flink.table.runtime.stream.table.TestAppendSink")
java_import(gateway.jvm, "org.apache.flink.table.runtime.stream.table.TestRetractSink")
java_import(gateway.jvm, "org.apache.flink.table.runtime.stream.table.TestUpsertSink")
java_import(gateway.jvm, "org.apache.flink.table.runtime.stream.table.RowCollector")
java_import(gateway.jvm, "org.apache.flink.table.legacyutils.TestAppendSink")
java_import(gateway.jvm, "org.apache.flink.table.legacyutils.TestRetractSink")
java_import(gateway.jvm, "org.apache.flink.table.legacyutils.TestUpsertSink")
java_import(gateway.jvm, "org.apache.flink.table.legacyutils.RowCollector")

TestTableSink._inited = True

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.flink.table.legacyutils.TestCollectionTableFactory
Loading

0 comments on commit 578ccd8

Please sign in to comment.