Skip to content

Commit

Permalink
feat: introduce include clause to add additional connector columns (r…
Browse files Browse the repository at this point in the history
…isingwavelabs#13707)

Signed-off-by: tabVersion <[email protected]>
Co-authored-by: xxchan <[email protected]>
  • Loading branch information
tabVersion and xxchan authored Dec 20, 2023
1 parent 66f6bc0 commit feadac7
Show file tree
Hide file tree
Showing 48 changed files with 952 additions and 428 deletions.
4 changes: 3 additions & 1 deletion e2e_test/sink/kafka/avro.slt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
statement ok
create table from_kafka with (
create table from_kafka ( primary key (some_key) )
include key as some_key
with (
connector = 'kafka',
topic = 'test-rw-sink-upsert-avro',
properties.bootstrap.server = 'message_queue:29092')
Expand Down
63 changes: 63 additions & 0 deletions e2e_test/source/basic/inlcude_key_as.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Negative case: FORMAT UPSERT requires a primary key. This table includes the
# message key as rw_key but declares no primary key at all, so CREATE TABLE is
# expected to be rejected.
# NOTE(review): no expected error message is pinned, so any failure satisfies
# this record - consider matching the specific error text.
statement error
CREATE TABLE upsert_students_default_key (
"ID" INT,
"firstName" VARCHAR,
"lastName" VARCHAR,
age INT,
height REAL,
weight REAL
)
INCLUDE KEY AS rw_key
WITH (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'upsert_json')
FORMAT UPSERT ENCODE JSON

# Negative case: with FORMAT UPSERT the primary key must be the included key
# column. Here the primary key is declared on "ID" instead of rw_key, so
# CREATE TABLE is expected to be rejected.
# NOTE(review): no expected error message is pinned, so any failure satisfies
# this record - consider matching the specific error text.
statement error
CREATE TABLE upsert_students_default_key (
"ID" INT primary key,
"firstName" VARCHAR,
"lastName" VARCHAR,
age INT,
height REAL,
weight REAL
)
INCLUDE KEY AS rw_key
WITH (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'upsert_json')
FORMAT UPSERT ENCODE JSON

# Positive case: the same column list with INCLUDE KEY AS rw_key is expected to
# be accepted under FORMAT PLAIN even though no primary key is declared.
# The column list deliberately has no trailing comma, matching the two
# CREATE TABLE statements above and standard SQL.
statement ok
CREATE TABLE upsert_students_default_key (
"ID" INT,
"firstName" VARCHAR,
"lastName" VARCHAR,
age INT,
height REAL,
weight REAL
)
INCLUDE KEY AS rw_key
WITH (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'upsert_json')
FORMAT PLAIN ENCODE JSON

# Confirm the freshly created table can be scanned without error; the rows
# returned are not checked here.
statement ok
select * from upsert_students_default_key;

# Wait long enough for SourceExecutor to consume all Kafka data before the
# row-count assertion that follows.
sleep 3s

# count(rw_key) counts only rows whose included key column is non-NULL.
# NOTE(review): the expected 15 presumably equals the number of keyed messages
# in the 'upsert_json' topic - verify against the test data if it changes.
query I
select count(rw_key) from upsert_students_default_key
----
15

# Clean up the table created by this test file.
statement ok
drop table upsert_students_default_key
10 changes: 7 additions & 3 deletions e2e_test/source/basic/kafka.slt
Original file line number Diff line number Diff line change
Expand Up @@ -415,8 +415,10 @@ CREATE TABLE upsert_students_default_key (
"lastName" VARCHAR,
age INT,
height REAL,
weight REAL
weight REAL,
primary key (rw_key)
)
INCLUDE KEY AS rw_key
WITH (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
Expand All @@ -425,13 +427,15 @@ FORMAT UPSERT ENCODE JSON

statement ok
CREATE TABLE upsert_students (
"ID" INT PRIMARY KEY,
"ID" INT,
"firstName" VARCHAR,
"lastName" VARCHAR,
age INT,
height REAL,
weight REAL
weight REAL,
primary key (rw_key)
)
INCLUDE KEY AS rw_key
WITH (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
Expand Down
20 changes: 6 additions & 14 deletions e2e_test/source/basic/nosim_kafka.slt
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,17 @@

# If we cannot extract key schema, use message key as varchar primary key
statement ok
CREATE TABLE upsert_avro_json_default_key ()
CREATE TABLE upsert_avro_json_default_key ( primary key (rw_key) )
INCLUDE KEY AS rw_key
WITH (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'upsert_avro_json')
FORMAT UPSERT ENCODE AVRO (schema.registry = 'http://message_queue:8081');


# key schema should be a subset of value schema
statement error
CREATE TABLE upsert_student_key_not_subset_of_value_avro_json ()
WITH (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'upsert_student_key_not_subset_of_value_avro_json')
FORMAT UPSERT ENCODE AVRO (schema.registry = 'http://message_queue:8081');


statement ok
CREATE TABLE upsert_student_avro_json ()
CREATE TABLE upsert_student_avro_json ( primary key (rw_key) )
INCLUDE KEY AS rw_key
WITH (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
Expand Down Expand Up @@ -68,7 +59,8 @@ CREATE TABLE kafka_json_schema_plain with (
) FORMAT PLAIN ENCODE JSON (schema.registry = 'http://schemaregistry:8082');

statement ok
CREATE TABLE kafka_json_schema_upsert (PRIMARY KEY(id))
CREATE TABLE kafka_json_schema_upsert (PRIMARY KEY(rw_key))
INCLUDE KEY AS rw_key
with (
connector = 'kafka',
kafka.topic = 'kafka_upsert_json_schema',
Expand Down
47 changes: 5 additions & 42 deletions e2e_test/source/basic/old_row_format_syntax/kafka.slt
Original file line number Diff line number Diff line change
Expand Up @@ -368,33 +368,20 @@ WITH (
topic = 'debezium_mongo_json_customers_no_schema_field')
ROW FORMAT DEBEZIUM_MONGO_JSON

statement ok
CREATE TABLE upsert_students_default_key (
"ID" INT,
"firstName" VARCHAR,
"lastName" VARCHAR,
age INT,
height REAL,
weight REAL
)
WITH (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'upsert_json')
ROW FORMAT UPSERT_JSON

statement ok
CREATE TABLE upsert_students (
"ID" INT PRIMARY KEY,
"ID" INT,
"firstName" VARCHAR,
"lastName" VARCHAR,
age INT,
height REAL,
weight REAL
weight REAL,
primary key (rw_key)
)
INCLUDE KEY AS rw_key
WITH (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
properties.bootstrap.server = 'message_queue:29092',
topic = 'upsert_json')
ROW FORMAT UPSERT_JSON

Expand Down Expand Up @@ -682,27 +669,6 @@ ORDER BY
6 Leah Davis 18 5.7 140
9 Jacob Anderson 20 5.8 155

query II
SELECT
"ID",
"firstName",
"lastName",
"age",
"height",
"weight"
FROM
upsert_students_default_key
ORDER BY
"ID";
----
1 Ethan Martinez 18 6.1 180
2 Emily Jackson 19 5.4 110
3 Noah Thompson 21 6.3 195
4 Emma Brown 20 5.3 130
5 Michael Williams 22 6.2 190
6 Leah Davis 18 5.7 140
9 Jacob Anderson 20 5.8 155

query II
select
L_ORDERKEY,
Expand Down Expand Up @@ -791,8 +757,5 @@ DROP TABLE mongo_customers_no_schema_field;
statement ok
DROP TABLE upsert_students;

statement ok
DROP TABLE upsert_students_default_key;

statement ok
drop table dbz_ignore_case_json;
138 changes: 0 additions & 138 deletions e2e_test/source/basic/old_row_format_syntax/nosim_kafka.slt

This file was deleted.

4 changes: 3 additions & 1 deletion e2e_test/source/basic/schema_registry.slt
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ create table t1 () with (
);

statement ok
create table t1 () with (
create table t1 (primary key(rw_key))
INCLUDE KEY AS rw_key
with (
connector = 'kafka',
topic = 'upsert_avro_json-topic-record',
properties.bootstrap.server = 'message_queue:29092'
Expand Down
Loading

0 comments on commit feadac7

Please sign in to comment.