Skip to content

Commit

Permalink
[minor][python][tests] Add some test cases for binary and char types
Browse files Browse the repository at this point in the history
  • Loading branch information
dianfu committed Jan 13, 2023
1 parent 2ef6f26 commit 6b3a5db
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 14 deletions.
4 changes: 2 additions & 2 deletions flink-python/pyflink/fn_execution/coders.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,11 @@ def _to_data_type(cls, field_type):
elif field_type.type_name == flink_fn_execution_pb2.Schema.DOUBLE:
return DoubleType(field_type.nullable)
elif field_type.type_name == flink_fn_execution_pb2.Schema.VARCHAR:
return VarCharType(0x7fffffff, field_type.nullable)
return VarCharType(field_type.var_char_info.length, field_type.nullable)
elif field_type.type_name == flink_fn_execution_pb2.Schema.BINARY:
return BinaryType(field_type.binary_info.length, field_type.nullable)
elif field_type.type_name == flink_fn_execution_pb2.Schema.VARBINARY:
return VarBinaryType(0x7fffffff, field_type.nullable)
return VarBinaryType(field_type.var_binary_info.length, field_type.nullable)
elif field_type.type_name == flink_fn_execution_pb2.Schema.DECIMAL:
return DecimalType(field_type.decimal_info.precision,
field_type.decimal_info.scale,
Expand Down
55 changes: 48 additions & 7 deletions flink-python/pyflink/table/tests/test_udf.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,12 +391,45 @@ def decimal_cut_func(decimal_param):
'decimal_param is wrong value %s !' % decimal_param
return decimal_param

@udf(result_type=DataTypes.BINARY(5))
def binary_func(binary_param):
assert len(binary_param) == 5
return binary_param

@udf(result_type=DataTypes.CHAR(7))
def char_func(char_param):
assert len(char_param) == 7
return char_param

@udf(result_type=DataTypes.VARCHAR(10))
def varchar_func(varchar_param):
assert len(varchar_param) <= 10
return varchar_param

sink_table = generate_random_table_name()
sink_table_ddl = f"""
CREATE TABLE {sink_table}(
a BIGINT, b BIGINT, c TINYINT, d BOOLEAN, e SMALLINT, f INT, g FLOAT, h DOUBLE, i BYTES,
j STRING, k DATE, l TIME, m TIMESTAMP(3), n ARRAY<BIGINT>, o MAP<BIGINT, STRING>,
p DECIMAL(38, 18), q DECIMAL(38, 18)) WITH ('connector'='test-sink')
a BIGINT,
b BIGINT,
c TINYINT,
d BOOLEAN,
e SMALLINT,
f INT,
g FLOAT,
h DOUBLE,
i BYTES,
j STRING,
k DATE,
l TIME,
m TIMESTAMP(3),
n ARRAY<BIGINT>,
o MAP<BIGINT, STRING>,
p DECIMAL(38, 18),
q DECIMAL(38, 18),
r BINARY(5),
s CHAR(7),
t VARCHAR(10)
) WITH ('connector'='test-sink')
"""
self.t_env.execute_sql(sink_table_ddl)

Expand All @@ -408,7 +441,8 @@ def decimal_cut_func(decimal_param):
datetime.time(hour=12, minute=0, second=0, microsecond=123000),
datetime.datetime(2018, 3, 11, 3, 0, 0, 123000), [[1, 2, 3]],
{1: 'flink', 2: 'pyflink'}, decimal.Decimal('1000000000000000000.05'),
decimal.Decimal('1000000000000000000.05999999999999999899999999999'))],
decimal.Decimal('1000000000000000000.05999999999999999899999999999'),
bytearray(b'flink'), 'pyflink', 'pyflink')],
DataTypes.ROW(
[DataTypes.FIELD("a", DataTypes.BIGINT()),
DataTypes.FIELD("b", DataTypes.BIGINT()),
Expand All @@ -426,7 +460,10 @@ def decimal_cut_func(decimal_param):
DataTypes.FIELD("n", DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.BIGINT()))),
DataTypes.FIELD("o", DataTypes.MAP(DataTypes.BIGINT(), DataTypes.STRING())),
DataTypes.FIELD("p", DataTypes.DECIMAL(38, 18)),
DataTypes.FIELD("q", DataTypes.DECIMAL(38, 18))]))
DataTypes.FIELD("q", DataTypes.DECIMAL(38, 18)),
DataTypes.FIELD("r", DataTypes.BINARY(5)),
DataTypes.FIELD("s", DataTypes.CHAR(7)),
DataTypes.FIELD("t", DataTypes.VARCHAR(10))]))

t.select(
bigint_func(t.a),
Expand All @@ -445,7 +482,10 @@ def decimal_cut_func(decimal_param):
array_func(t.n),
map_func(t.o),
decimal_func(t.p),
decimal_cut_func(t.q)) \
decimal_cut_func(t.q),
binary_func(t.r),
char_func(t.s),
varchar_func(t.t)) \
.execute_insert(sink_table).wait()
actual = source_sink_utils.results()
# Currently the sink result precision of DataTypes.TIME(precision) only supports 0.
Expand All @@ -454,7 +494,8 @@ def decimal_cut_func(decimal_param):
"[102, 108, 105, 110, 107], pyflink, 2014-09-13, 12:00:00.123, "
"2018-03-11T03:00:00.123, [1, 2, 3], "
"{1=flink, 2=pyflink}, 1000000000000000000.050000000000000000, "
"1000000000000000000.059999999999999999]"])
"1000000000000000000.059999999999999999, [102, 108, 105, 110, 107], "
"pyflink, pyflink]"])

def test_all_data_types(self):
def boolean_func(bool_param):
Expand Down
5 changes: 0 additions & 5 deletions flink-python/pyflink/table/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -2360,8 +2360,6 @@ def CHAR(length: int, nullable: bool = True) -> CharType:
:param length: int, the string representation length. It must have a value
between 1 and 2147483647(0x7fffffff) (both inclusive).
:param nullable: boolean, whether the type can be null (None) or not.
.. note:: `CharType` is still not supported yet.
"""
return CharType(length, nullable)

Expand Down Expand Up @@ -2409,8 +2407,6 @@ def BINARY(length: int, nullable: bool = True) -> BinaryType:
:param length: int, the number of bytes. It must have a value between
1 and 2147483647(0x7fffffff) (both inclusive).
:param nullable: boolean, whether the type can be null (None) or not.
.. note:: `BinaryType` is still not supported yet.
"""
return BinaryType(length, nullable)

Expand All @@ -2423,7 +2419,6 @@ def VARBINARY(length: int, nullable: bool = True) -> VarBinaryType:
between 1 and 2147483647(0x7fffffff) (both inclusive).
:param nullable: boolean, whether the type can be null (None) or not.
.. note:: The length limit must be 0x7fffffff(2147483647) currently.
.. seealso:: :func:`~DataTypes.BYTES`
"""
return VarBinaryType(length, nullable)
Expand Down

0 comments on commit 6b3a5db

Please sign in to comment.