
Commit

add masking
rmoff committed Aug 31, 2018
1 parent 0242cad commit e72392b
Showing 2 changed files with 60 additions and 4 deletions.
64 changes: 60 additions & 4 deletions gcp-pipeline/env-data/data-wrangling-with-ksql.adoc
@@ -1,7 +1,7 @@
= Data Wrangling with KSQL

Robin Moffatt <robin@confluent.io>
v1.00, August 31, 2018
v1.10, August 31, 2018

KSQL, the SQL streaming engine for Apache Kafka, puts the power of stream processing into the hands of anyone who knows SQL. It's fun to use for exploring data in Kafka topics, but its real power comes in building stream processing applications. By continually streaming messages from one Kafka topic to another, applying transformations expressed in SQL, it is possible to build powerful applications doing common data wrangling tasks such as:

@@ -47,7 +47,7 @@ Create the connectors (one per station):
}
}
----
[https://github.com/confluentinc/demo-scene/blob/master/gcp-pipeline/env-data/connect_source.sh[full code here]]
_https://github.com/confluentinc/demo-scene/blob/master/gcp-pipeline/env-data/connect_source.sh[-> full code on github]_

With this running, I have a Kafka topic per weather station, and data in each:

@@ -484,6 +484,62 @@ SELECT * FROM ENVIRONMENT_DATA_WITH_TS \
WHERE GEO_DISTANCE(LAT,LONG,53.919066, -1.815725,'KM') < 100;
----
== Masking data with KSQL
As well as 'row' filtering as shown above, KSQL can also be used to filter 'columns' from a message. Imagine you have a field in your source data that you don't want to persist downstream—with KSQL you simply create a derived stream and omit the column(s) in question from the projection:
[source,sql]
----
ksql> DESCRIBE ENVIRONMENT_DATA;

Name : ENVIRONMENT_DATA
Field | Type
----------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
STATIONREFERENCE | VARCHAR(STRING)
EAREGIONNAME | VARCHAR(STRING)
LABEL | VARCHAR(STRING)
LAT | DOUBLE
LONG | DOUBLE
READING_TS | VARCHAR(STRING)
PARAMETERNAME | VARCHAR(STRING)
READING_VALUE | DOUBLE
UNITNAME | VARCHAR(STRING)
----------------------------------------------

ksql> CREATE STREAM ENVIRONMENT_DATA_MINIMAL AS \
SELECT STATIONREFERENCE, READING_TS, READING_VALUE \
FROM ENVIRONMENT_DATA;

ksql> DESCRIBE ENVIRONMENT_DATA_MINIMAL;

Name : ENVIRONMENT_DATA_MINIMAL
Field | Type
----------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
STATIONREFERENCE | VARCHAR(STRING)
READING_TS | VARCHAR(STRING)
READING_VALUE | DOUBLE
----------------------------------------------
----
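A quick query against the new stream confirms that only the projected columns flow through (the row shown is illustrative; actual values depend on your source data):
[source,sql]
----
ksql> SELECT STATIONREFERENCE, READING_TS, READING_VALUE \
FROM ENVIRONMENT_DATA_MINIMAL;
L2404 | 2018-08-31T13:00:00Z | 0.121
----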
As well as simply dropping a column, KSQL ships with functions to mask data:
[source,sql]
----
ksql> SELECT STATIONREFERENCE, EAREGIONNAME \
FROM ENVIRONMENT_DATA;
L2404 | North East

ksql> SELECT STATIONREFERENCE, EAREGIONNAME, MASK_RIGHT(EAREGIONNAME,4) AS REGION_NAME_MASKED \
FROM ENVIRONMENT_DATA;
L2404 | North East | North Xxxx
----
There are several `MASK`-based functions, and if you have your own special sauce you'd like to use here, KSQL https://docs.confluent.io/current/ksql/docs/udf.html#example-udf-class[supports UDFs as of 5.0].
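For instance, among the built-in masking functions, `MASK` obfuscates an entire string whilst `MASK_KEEP_LEFT` leaves a given number of leading characters in the clear (the output shown is illustrative, assuming the default mask characters of `X`, `x`, `n`, and `-`):
[source,sql]
----
ksql> SELECT EAREGIONNAME, \
MASK(EAREGIONNAME) AS REGION_MASKED, \
MASK_KEEP_LEFT(EAREGIONNAME,5) AS REGION_PARTIALLY_MASKED \
FROM ENVIRONMENT_DATA;
North East | Xxxxx-Xxxx | North-Xxxx
----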
== Recap
So far, we've ingested data from several sources, with similar but varying data models. Using KSQL we've wrangled the data:
@@ -494,7 +550,7 @@ So far, we've ingested data from several sources, with similar but varying data
* Set the message partitioning key
* Set the message timestamp metadata to the correct logical value
* Created derived columns in the transformation (see the sketch after this list)
* Filtered the data to include messages only matching a given pattern
* Filtered and masked the data
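By way of illustration, the pattern behind the middle three steps looks something like the following hypothetical sketch; the stream name, column list, derived column, and timestamp format string are assumptions for this example rather than the exact DDL used earlier in the article:
[source,sql]
----
-- Hypothetical recap sketch: re-key the stream by station, set the message
-- timestamp from the string reading timestamp, and add a derived column.
CREATE STREAM ENVIRONMENT_DATA_RECAP
  WITH (TIMESTAMP='READING_TS',
        TIMESTAMP_FORMAT='yyyy-MM-dd''T''HH:mm:ssX') AS
SELECT STATIONREFERENCE, READING_TS, READING_VALUE,
       GEO_DISTANCE(LAT, LONG, 53.919066, -1.815725, 'KM') AS DIST_FROM_BASE_KM
  FROM ENVIRONMENT_DATA
  PARTITION BY STATIONREFERENCE;
----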
image::../images/recap.png[]
@@ -522,7 +578,7 @@ For our analytics, we're going to land the data to Google's Cloud Data Warehouse
"keyfile":"/root/creds/gcp_creds.json"
[...]
----
[https://github.com/confluentinc/demo-scene/blob/master/gcp-pipeline/env-data/connect_sink_gbq.sh[full code here]]
_https://github.com/confluentinc/demo-scene/blob/master/gcp-pipeline/env-data/connect_sink_gbq.sh[-> full code on github]_
Once deployed, we can see data arriving in BigQuery using the Console:
Binary file modified gcp-pipeline/images/recap.png
