Here are some Java functions I developed for ksqlDB, following the guide at https://docs.ksqldb.io/en/0.29.0-ksqldb/how-to-guides/create-a-user-defined-function/.
ksqlDB currently lacks many basic SQL functions, and I haven't found many community projects actively working on this. I'll try to update this repo with new functions. Any contribution is appreciated.
Project :
.
├── kafka_dataset_generator.py - test tool (cf. hands-on)
└── KSQLDBExtensions - source eclipse project
Functions :
Name : DELTA
Overview : Difference between two successive values
Type : AGGREGATE
Variations :
Variation : DELTA(val1 DOUBLE)
Returns : DOUBLE
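To illustrate what "difference between two successive values" means, here is a minimal plain-Java sketch. It does not use the ksqlDB UDAF API and is not the code from this repo. The class name `DeltaSketch` and the choice to return 0.0 for the first value are assumptions made for the example.

```java
// Hypothetical sketch of the DELTA semantics; not the repo's implementation.
final class DeltaSketch {
    private Double previous; // last value seen, null until the first value arrives

    // Feeds one value and returns its difference with the previous one.
    // Assumption: the first value has no predecessor, so 0.0 is returned.
    double next(double current) {
        double delta = (previous == null) ? 0.0 : current - previous;
        previous = current;
        return delta;
    }

    public static void main(String[] args) {
        DeltaSketch delta = new DeltaSketch();
        for (double v : new double[] {10.0, 12.5, 11.0}) {
            System.out.println(v + " -> " + delta.next(v));
        }
    }
}
```

In a grouped query, one such state would be kept per group key.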
Name : MODULO
Overview : Modulo operator for scalar columns and constants
Type : SCALAR
Variations :
Variation : MODULO(v1 DOUBLE, v2 DOUBLE)
Returns : DOUBLE
Variation : MODULO(v1 DOUBLE, v2 BIGINT)
Returns : DOUBLE
Variation : MODULO(v1 DOUBLE, v2 INT)
Returns : DOUBLE
Variation : MODULO(v1 BIGINT, v2 BIGINT)
Returns : BIGINT
Variation : MODULO(v1 BIGINT, v2 INT)
Returns : BIGINT
Variation : MODULO(v1 INT, v2 INT)
Returns : INT
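The return types listed above follow the usual numeric widening rules. Here is a small plain-Java sketch of that idea. It is not the repo's code: the class name `ModuloSketch` is hypothetical, and it assumes Java's `%` remainder semantics, where the result takes the sign of the first operand.

```java
// Hypothetical sketch of the MODULO variations; not the repo's implementation.
final class ModuloSketch {
    // DOUBLE with DOUBLE, BIGINT or INT: the second operand widens to double.
    // A zero divisor yields NaN here.
    static double modulo(double v1, double v2) {
        return v1 % v2;
    }

    // BIGINT with BIGINT or INT: the second operand widens to long.
    // A zero divisor throws ArithmeticException.
    static long modulo(long v1, long v2) {
        return v1 % v2;
    }

    // INT with INT. A zero divisor throws ArithmeticException.
    static int modulo(int v1, int v2) {
        return v1 % v2;
    }

    public static void main(String[] args) {
        System.out.println(modulo(14.5, 12)); // resolves to the double overload
        System.out.println(modulo(14L, 12));  // resolves to the long overload
        System.out.println(modulo(-7, 3));    // resolves to the int overload
    }
}
```

Plain Java picks the overload through widening. Whether ksqlDB needs one annotated method per variation is not covered by this sketch.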
You can follow these steps to try these functions (assuming you have a ksqlDB server/cluster and the underlying Kafka cluster set up) :
-
Compile the project into an uber-jar (using Eclipse)
- Enable parameter reflection (gives ksqlDB access to the parameter names of the UDF methods, which it uses to generate the documentation at runtime) :
(SUBWINDOW) Project Explorer > (RCLICK) [Project] > (CLICK) Properties > (WINDOW) Properties for [Project] > (SUBWINDOW) Java Compiler > (SUBWINDOW) Classfile Generation > (TICK) Store information about method parameters (usable via reflection)
- Generate the .jar :
(SUBWINDOW) Project Explorer > (RCLICK) [Project] > (CLICK) Export > (WINDOW) Export > (SELECT) Java > (SELECT) JAR file > (CLICK) Next > (WINDOW) JAR Export > (SUBWINDOW) Select the resources to export: > (SELECT) src/main/java > (SELECT) lib > (TICK) Export generated class files and resources > (TICK) Compress the contents of the JAR file > (CLICK) Finish
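The Eclipse option "Store information about method parameters" corresponds to the `javac -parameters` flag. Here is a small sketch to check its effect on a compiled class. The class `ParameterNamesSketch` and its `modulo` method are hypothetical stand-ins, not part of this repo.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;

// Hypothetical class, used only to show what the compiler setting changes.
final class ParameterNamesSketch {
    // Stand-in for a UDF method; v1 and v2 are the names reflection may expose.
    static double modulo(double v1, double v2) {
        return v1 % v2;
    }

    // Returns one line per parameter, as seen through reflection.
    static String describe() {
        Method method;
        try {
            method = ParameterNamesSketch.class
                    .getDeclaredMethod("modulo", double.class, double.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
        StringBuilder out = new StringBuilder();
        for (Parameter p : method.getParameters()) {
            // isNamePresent() is true only when the class file stores parameter
            // names; otherwise getName() returns synthesized names (arg0, arg1).
            out.append(p.getName())
               .append(" (name stored in class file: ")
               .append(p.isNamePresent())
               .append(")\n");
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.print(describe());
    }
}
```

Compiled with the option enabled, the output lists `v1` and `v2`. Without it, the names fall back to `arg0` and `arg1`.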
-
Install the functions on your ksqlDB server (on the machine hosting the ksqlDB server)
- Create a folder for your extensions and copy the uber-jar into it
mkdir [path to your ksqldb extensions folder] && cp [path to your previously generated extensions uber-jar] [path to your ksqldb extensions folder]
- Specify the path to your extensions folder in your ksqldb config.
(EDIT) [path to your ksqldb]/etc/ksqldb/ksqldb-server.properties
[...]
ksql.extension.dir=[path to your ksqldb extensions folder]
[...]
- Restart your ksqldb server
[path to your ksqldb]/usr/bin/ksql-server-stop && \
[path to your ksqldb]/usr/bin/ksql-server-start [path to your ksqldb]/etc/ksqldb/ksqldb-server.properties
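Before restarting, it can help to confirm that the properties file really points at a folder containing the jar. Here is a minimal sketch of such a check. The class name `ExtensionDirCheck` is hypothetical, and the program expects the path to `ksqldb-server.properties` as its first argument.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

// Hypothetical helper; not part of ksqlDB or of this repo.
final class ExtensionDirCheck {
    // Returns the value of ksql.extension.dir, or null when the key is absent.
    static String extensionDir(Reader config) {
        Properties props = new Properties();
        try {
            props.load(config);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty("ksql.extension.dir");
    }

    public static void main(String[] args) throws IOException {
        String dir;
        // args[0]: path to ksqldb-server.properties
        try (Reader reader = Files.newBufferedReader(Paths.get(args[0]))) {
            dir = extensionDir(reader);
        }
        if (dir == null) {
            System.out.println("ksql.extension.dir is not set");
            return;
        }
        // List the jars that the server should find in the extension folder.
        try (DirectoryStream<Path> jars = Files.newDirectoryStream(Paths.get(dir), "*.jar")) {
            for (Path jar : jars) {
                System.out.println("found " + jar);
            }
        }
    }
}
```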
-
Check that your ksqlDB server has loaded the new functions
- List ksqldb functions
show functions;
- Describe newly added ksqldb functions
describe function [function];
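The same check can be scripted against the ksqlDB REST API by posting the statement to the `/ksql` endpoint. Here is a minimal sketch using the Java 11 HTTP client. The class name `ShowFunctionsSketch` is hypothetical, and the default URL `http://localhost:8088` is an assumption to replace with your own server address.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper; the default server URL in main is an assumption.
final class ShowFunctionsSketch {
    // Wraps a statement in the JSON body expected by the /ksql endpoint.
    static String body(String statement) {
        String escaped = statement.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"ksql\":\"" + escaped + "\",\"streamsProperties\":{}}";
    }

    // Builds the POST request without sending it.
    static HttpRequest request(String serverUrl, String statement) {
        return HttpRequest.newBuilder(URI.create(serverUrl + "/ksql"))
                .header("Content-Type", "application/vnd.ksql.v1+json; charset=utf-8")
                .POST(HttpRequest.BodyPublishers.ofString(body(statement)))
                .build();
    }

    public static void main(String[] args) throws Exception {
        String serverUrl = args.length > 0 ? args[0] : "http://localhost:8088";
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request(serverUrl, "SHOW FUNCTIONS;"),
                      HttpResponse.BodyHandlers.ofString());
        // The response is JSON; look for DELTA and MODULO in the function list.
        System.out.println(response.body());
    }
}
```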
-
Test the new functions
-
Set up a test environment
- Create a kafka topic
[path to kafka]/bin/kafka-topics.sh --create --bootstrap-server 10.0.72.10:9092 --topic TEST_TOPIC
- Extract a stream from it
CREATE OR REPLACE STREAM TEST_STREAM (NAME STRING, VALUE DOUBLE, TS STRING) WITH (KAFKA_TOPIC='TEST_TOPIC', KEY_FORMAT='KAFKA', TIMESTAMP='TS', TIMESTAMP_FORMAT='yyyy-MM-dd''T''HH:mm:ss.SSSX', VALUE_FORMAT='JSON');
- Write random data to your topic (a less flexible alternative : https://docs.ksqldb.io/en/0.10.2-ksqldb/developer-guide/test-and-debug/generate-custom-test-data/)
./kafka_dataset_generator.py 10.0.72.10:9092
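For reference, here is a sketch of the record shape that TEST_STREAM can parse: a JSON object with NAME, VALUE and TS, where TS follows the TIMESTAMP_FORMAT of the stream. This is not the Python generator from this repo, and I am assuming here that the generator emits records of this shape. The class name `TestRecordSketch` and the `sensor_a`/`sensor_b` names are hypothetical.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Locale;
import java.util.Random;

// Hypothetical sketch; it only prints records and does not talk to Kafka.
final class TestRecordSketch {
    // Same pattern as TIMESTAMP_FORMAT in the CREATE STREAM statement.
    private static final DateTimeFormatter TS_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSX").withZone(ZoneOffset.UTC);

    // Builds one JSON record matching the columns NAME, VALUE and TS.
    static String record(String name, double value, Instant ts) {
        return String.format(Locale.ROOT,
                "{\"NAME\":\"%s\",\"VALUE\":%.2f,\"TS\":\"%s\"}",
                name, value, TS_FORMAT.format(ts));
    }

    public static void main(String[] args) {
        Random random = new Random();
        String[] names = {"sensor_a", "sensor_b"}; // hypothetical NAME values
        for (int i = 0; i < 5; i++) {
            String name = names[random.nextInt(names.length)];
            System.out.println(record(name, random.nextDouble() * 100, Instant.now()));
        }
    }
}
```

Each printed line is one message value, so the output can be piped into any producer that sends one message per line.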
-
Try the new functions (from the ksqlDB CLI)
- modulo (UDF) :
SELECT *, modulo(value, 12) AS modulo_12 FROM TEST_STREAM EMIT CHANGES;
- delta (UDAF) :
SELECT name, delta(value) AS delta, latest_by_offset(value, 2) AS last_values, latest_by_offset(ts, 2) AS last_tss FROM TEST_STREAM GROUP BY name EMIT CHANGES;
-
Shut down the test environment
-
Stop the generator
-
Delete the test topic
[path to kafka]/bin/kafka-topics.sh --delete --topic TEST_TOPIC --bootstrap-server 10.0.72.10:9092
- Delete the test stream
drop stream TEST_STREAM;
Other notable repos on the subject (cf. https://github.com/search?q=ksqldb-udf&type=repositories) :
- https://github.com/hpgrahsl/ksqldb-datetime-functions : date manipulation functions and types built on Java 11's time library
- https://github.com/entechlog/ksqlDB-udf : specific operations (string/array manipulation)
- https://github.com/nbuesing/ksqldb-udf-geospatial : geospatial operations
Contact : [email protected]