fix(frontend): fix changelog out_col_change error and add snowflake upsert sink demo (risingwavelabs#17515)
xxhZs authored Jul 2, 2024
1 parent 74e0842 commit d9a90d6
Showing 8 changed files with 108 additions and 8 deletions.
@@ -33,6 +33,9 @@ create materialized view mv6 as with sub as changelog from t3 select * from sub;
statement ok
create materialized view mv7(col1,col2,col3) as with sub as changelog from t3 select * from sub;

statement ok
create materialized view mv8 as with sub as changelog from t2 select *, _changelog_row_id as row_id from sub;

statement ok
insert into t1 values(1,1),(2,2);

@@ -133,6 +136,17 @@ select * from mv7_rename order by col1;
5 500 3
6 6 1

query I rowsort
select v1 from mv8 order by v1;
----
1
1
1
2

statement ok
drop materialized view mv8;

statement ok
drop materialized view mv7_rename;

24 changes: 24 additions & 0 deletions integration_tests/snowflake-sink/README.md
@@ -43,3 +43,27 @@ launch your risingwave cluster, and execute the following sql commands respectively.
- `create_sink.sql`

note: the column name(s) in your materialized view must exactly match those in your pre-defined snowflake table, because the snowflake pipe we specified earlier in `snowflake_prep.sql` matches columns by name.
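For reference, a Snowflake pipe that matches columns by name looks roughly like this (a sketch with placeholder names; see `snowflake_prep.sql` for the actual definition):

```sql
CREATE OR REPLACE PIPE example_snowflake_pipe AS
COPY INTO example_table
FROM @example_stage
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
FILE_FORMAT = (TYPE = JSON);
```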

## 3. Sink data into Snowflake with upsert

1. To sink data into Snowflake with upsert semantics, first set up Snowflake and S3 as in step 1.

2. Execute the following SQL files in order:
- `upsert/create_source.sql`
- `upsert/create_mv.sql`
- `upsert/create_sink.sql`

After executing these, RisingWave will continuously import its data change log into the Snowflake table.
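The Snowflake table receiving the changelog must have columns matching the materialized view in `upsert/create_mv.sql`. Below is a sketch of a suitable landing-table DDL (the name `t3` matches the dynamic-table example that follows; the exact types are assumptions):

```sql
CREATE OR REPLACE TABLE t3 (
    user_id INT,
    target_id VARCHAR,
    event_timestamp TIMESTAMP_NTZ,
    __op INT,        -- RisingWave changelog operation type
    __row_id BIGINT  -- changelog row id, used below for deduplication
);
```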

3. Then create the dynamic table with the following SQL statement; selecting from it yields the result of the upsert. Replace `{primary_key}` with the primary key column(s) of your table, as shown in the concrete example after this block.
```sql
CREATE OR REPLACE DYNAMIC TABLE user_behaviors
TARGET_LAG = '1 minute'
WAREHOUSE = test_warehouse
AS SELECT *
FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY {primary_key} ORDER BY __row_id DESC) AS dedupe_id
    FROM t3
) AS subquery
WHERE dedupe_id = 1 AND (__op = 1 OR __op = 3)
```
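In this demo the primary key is `user_id` (see `upsert/create_source.sql`), and the statement above assumes the sink lands in a Snowflake table named `t3`. A concrete instantiation would therefore look like the sketch below: the window function keeps only the latest change per key, and the `__op` filter (1 = INSERT, 3 = UPDATE_INSERT in RisingWave's changelog encoding) drops keys whose latest change is a delete.

```sql
CREATE OR REPLACE DYNAMIC TABLE user_behaviors
TARGET_LAG = '1 minute'
WAREHOUSE = test_warehouse
AS SELECT *
FROM (
    -- rank all changes for each user, newest first
    SELECT *, ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY __row_id DESC) AS dedupe_id
    FROM t3
) AS subquery
-- keep only the newest change, and only if it is an insert or update
WHERE dedupe_id = 1 AND (__op = 1 OR __op = 3)
```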
13 changes: 13 additions & 0 deletions integration_tests/snowflake-sink/upsert/create_mv.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- please note that the column name(s) for your mv should be *exactly*
-- the same as the column name(s) in your snowflake table, since we are matching column by name.

CREATE MATERIALIZED VIEW ss_mv AS
WITH sub AS changelog FROM user_behaviors
SELECT
user_id,
target_id,
event_timestamp AT TIME ZONE 'America/Indiana/Indianapolis' as event_timestamp,
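-- changelog_op encodes the change type: 1 = INSERT, 2 = DELETE, 3 = UPDATE_INSERT, 4 = UPDATE_DELETE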
changelog_op AS __op,
_changelog_row_id::bigint AS __row_id
FROM
sub;
19 changes: 19 additions & 0 deletions integration_tests/snowflake-sink/upsert/create_sink.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
CREATE SINK snowflake_sink FROM ss_mv WITH (
connector = 'snowflake',
type = 'append-only',
snowflake.database = 'EXAMPLE_DB',
snowflake.schema = 'EXAMPLE_SCHEMA',
snowflake.pipe = 'EXAMPLE_SNOWFLAKE_PIPE',
snowflake.account_identifier = '<ORG_NAME>-<ACCOUNT_NAME>',
snowflake.user = 'XZHSEH',
snowflake.rsa_public_key_fp = 'EXAMPLE_FP',
snowflake.private_key = 'EXAMPLE_PK',
snowflake.s3_bucket = 'EXAMPLE_S3_BUCKET',
snowflake.aws_access_key_id = 'EXAMPLE_AWS_ID',
snowflake.aws_secret_access_key = 'EXAMPLE_SECRET_KEY',
snowflake.aws_region = 'EXAMPLE_REGION',
snowflake.s3_path = 'EXAMPLE_S3_PATH',
-- depends on your mv setup, note that snowflake sink *only* supports
-- `append-only` mode at present.
force_append_only = 'true'
);
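After running `upsert/create_sink.sql`, you can sanity-check the pipeline from RisingWave; these are standard RisingWave statements, offered as a hint rather than part of the demo scripts:

```sql
SHOW SINKS;                  -- should list snowflake_sink
SELECT COUNT(*) FROM ss_mv;  -- number of changelog rows staged for the sink
```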
19 changes: 19 additions & 0 deletions integration_tests/snowflake-sink/upsert/create_source.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- please note that this will create a table whose datagen connector generates 1,000 rows in 10 seconds
-- you may want to change the configuration for better testing / demo purposes

CREATE TABLE user_behaviors (
user_id int,
target_id VARCHAR,
target_type VARCHAR,
event_timestamp TIMESTAMPTZ,
behavior_type VARCHAR,
parent_target_type VARCHAR,
parent_target_id VARCHAR,
PRIMARY KEY(user_id)
) WITH (
connector = 'datagen',
fields.user_id.kind = 'sequence',
fields.user_id.start = '1',
fields.user_id.end = '1000',
datagen.rows.per.second = '100'
) FORMAT PLAIN ENCODE JSON;
1 change: 1 addition & 0 deletions proto/stream_plan.proto
@@ -293,6 +293,7 @@ message FilterNode {
}

message ChangeLogNode {
// Whether the op column is included in the final output.
bool need_op = 1;
}

3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/generic/changelog.rs
@@ -30,7 +30,8 @@ pub struct ChangeLog<PlanRef> {
     pub input: PlanRef,
     // If there is no op column in the output, this is false, e.g. `create materialized view mv1 as with sub as changelog from t1 select v1 from sub;`
     pub need_op: bool,
-    // False before rewrite, true after rewrite
+    // Before rewrite: false if there is no changelog_row_id in the output.
+    // After rewrite: always true.
     pub need_changelog_row_id: bool,
 }
 impl<PlanRef: GenericPlanRef> DistillUnit for ChangeLog<PlanRef> {
23 changes: 16 additions & 7 deletions src/frontend/src/optimizer/plan_node/logical_changelog.rs
@@ -66,15 +66,24 @@ impl PlanTreeNodeUnary for LogicalChangeLog {
         input: PlanRef,
         input_col_change: ColIndexMapping,
     ) -> (Self, ColIndexMapping) {
         let changelog = Self::new(input, self.core.need_op, true);
-        if self.core.need_op {
-            let mut output_vec = input_col_change.to_parts().0.to_vec();
-            let len = input_col_change.to_parts().1;
+
+        let out_col_change = if self.core.need_op {
+            let (mut output_vec, len) = input_col_change.into_parts();
             output_vec.push(Some(len));
-            let out_col_change = ColIndexMapping::new(output_vec, len + 1);
-            (changelog, out_col_change)
+            ColIndexMapping::new(output_vec, len + 1)
         } else {
-            (changelog, input_col_change)
-        }
+            input_col_change
+        };
+
+        let (mut output_vec, len) = out_col_change.into_parts();
+        let out_col_change = if self.core.need_changelog_row_id {
+            output_vec.push(Some(len));
+            ColIndexMapping::new(output_vec, len + 1)
+        } else {
+            ColIndexMapping::new(output_vec, len + 1)
+        };
+
+        (changelog, out_col_change)
     }
 }
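For context on the fix: the old code extended the column mapping only for the appended op column, while the rewritten changelog node (created with `need_changelog_row_id = true`) also appends `_changelog_row_id`, leaving `out_col_change` one column short. A minimal sketch of the corrected mapping arithmetic, using only the `ColIndexMapping` calls visible in this diff (the values are hypothetical):

```rust
// Suppose the input has 2 columns, mapped identically.
let input_col_change = ColIndexMapping::new(vec![Some(0), Some(1)], 2);

// Extend the mapping for the appended op column...
let (mut output_vec, len) = input_col_change.into_parts();
output_vec.push(Some(len));
let with_op = ColIndexMapping::new(output_vec, len + 1);

// ...and again for the appended `_changelog_row_id` column,
// which the old code forgot to do.
let (mut output_vec, len) = with_op.into_parts();
output_vec.push(Some(len));
let out_col_change = ColIndexMapping::new(output_vec, len + 1); // 4 target columns
```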
