yayasensei/basic_ksqldb_udfs

Overview

Here are some Java functions I developed for ksqlDB, following this guide: https://docs.ksqldb.io/en/0.29.0-ksqldb/how-to-guides/create-a-user-defined-function/

ksqlDB currently lacks many basic SQL functions, and I haven't found many community projects actively working on them. I'll keep adding new functions to this repo; any contribution is appreciated.

Content

Project:

  .
  ├── kafka_dataset_generator.py    - test data generator (see Hands-on)
  └── KSQLDBExtensions              - Eclipse project containing the UDF sources

Functions:

Name        : DELTA
Overview    : Difference between two successive values
Type        : AGGREGATE
Variations  :
  Variation   : DELTA(val1 DOUBLE)
  Returns     : DOUBLE
Name        : MODULO
Overview    : Modulo operator for scalar columns and constants
Type        : SCALAR
Variations  :
  Variation   : MODULO(v1 DOUBLE, v2 DOUBLE)
  Returns     : DOUBLE
  Variation   : MODULO(v1 DOUBLE, v2 BIGINT)
  Returns     : DOUBLE
  Variation   : MODULO(v1 DOUBLE, v2 INT)
  Returns     : DOUBLE
  Variation   : MODULO(v1 BIGINT, v2 BIGINT)
  Returns     : BIGINT
  Variation   : MODULO(v1 BIGINT, v2 INT)
  Returns     : BIGINT
  Variation   : MODULO(v1 INT, v2 INT)
  Returns     : INT
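DELTA is declared as an aggregate (UDAF). In ksqlDB a UDAF implements `io.confluent.ksql.function.udaf.Udaf<I, A, O>`, whose `initialize`, `aggregate`, `merge` and `map` methods build and read per-key state. The following is a minimal, dependency-free sketch of how DELTA's logic could fit that contract. It is not the repo's implementation: it assumes the state holds the two most recent values, that DELTA returns latest minus previous, and that the result is null until two values have arrived. The class name `DeltaSketch` is hypothetical, and the methods are static so it runs without the ksqlDB dependency.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, dependency-free sketch of DELTA's logic (not the repo's source).
 * The four static methods mirror ksqlDB's Udaf<I, A, O> contract:
 * I = Double (input), A = List<Double> (state), O = Double (output).
 */
class DeltaSketch {

    // Empty state: no values seen yet.
    static List<Double> initialize() {
        return new ArrayList<>();
    }

    // Appends the new value and keeps only the two most recent ones.
    static List<Double> aggregate(Double current, List<Double> agg) {
        if (current == null) {
            return agg;                  // assumption: null inputs are skipped
        }
        List<Double> next = new ArrayList<>(agg);
        next.add(current);
        while (next.size() > 2) {
            next.remove(0);              // drop the oldest value
        }
        return next;
    }

    // Combines two states (used by session windows), assuming aggOne is the older one.
    static List<Double> merge(List<Double> aggOne, List<Double> aggTwo) {
        List<Double> merged = new ArrayList<>(aggOne);
        merged.addAll(aggTwo);
        while (merged.size() > 2) {
            merged.remove(0);
        }
        return merged;
    }

    // Latest value minus the previous one; null until two values are known (assumption).
    static Double map(List<Double> agg) {
        if (agg.size() < 2) {
            return null;
        }
        return agg.get(1) - agg.get(0);
    }

    public static void main(String[] args) {
        List<Double> agg = initialize();
        for (double v : new double[] {10.0, 12.5, 11.0}) {
            agg = aggregate(v, agg);
            System.out.println(v + " -> " + map(agg));
        }
        // prints, one per line: 10.0 -> null, 12.5 -> 2.5, 11.0 -> -1.5
    }
}
```

With `GROUP BY name`, ksqlDB keeps one such state per key, so each name gets its own delta.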
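Each MODULO variation above corresponds to one overload of a Java method. A minimal, dependency-free sketch (not the repo's code) could look like the class below. It assumes the overloads delegate to Java's `%` operator and return null for null inputs; returning null for an integral zero divisor instead of throwing is an additional assumption of this sketch. The class name `ModuloSketch` is hypothetical. In a real UDF the class would carry ksqlDB's `@UdfDescription` annotation and each method an `@Udf` annotation; they are omitted here so the code compiles without the ksqlDB dependency.

```java
/** Hypothetical sketch of the MODULO overloads; not the repo's source. */
class ModuloSketch {

    // DOUBLE dividend: Java's % on doubles keeps the sign of the dividend,
    // and a zero divisor yields NaN rather than an exception.
    static Double modulo(Double v1, Double v2) {
        if (v1 == null || v2 == null) return null;
        return v1 % v2;
    }

    static Double modulo(Double v1, Long v2) {
        if (v1 == null || v2 == null) return null;
        return v1 % v2;                 // v2 is promoted to double
    }

    static Double modulo(Double v1, Integer v2) {
        if (v1 == null || v2 == null) return null;
        return v1 % v2;                 // v2 is promoted to double
    }

    // Integral dividends: assumption, a zero divisor returns null instead of throwing.
    static Long modulo(Long v1, Long v2) {
        if (v1 == null || v2 == null || v2 == 0L) return null;
        return v1 % v2;
    }

    static Long modulo(Long v1, Integer v2) {
        if (v1 == null || v2 == null || v2 == 0) return null;
        return v1 % v2;                 // v2 is promoted to long
    }

    static Integer modulo(Integer v1, Integer v2) {
        if (v1 == null || v2 == null || v2 == 0) return null;
        return v1 % v2;
    }

    public static void main(String[] args) {
        System.out.println(modulo(7, 3));      // 1
        System.out.println(modulo(-7, 3));     // -1 (sign follows the dividend)
        System.out.println(modulo(7.5, 12));   // 7.5 (DOUBLE, INT variation)
        System.out.println(modulo(25L, 12));   // 1 (BIGINT, INT variation)
    }
}
```

Because Java's `%` keeps the sign of the dividend, `MODULO(-7, 3)` would return -1 under this sketch, unlike a floor-based modulo, which gives 2.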

Hands-on

Follow these steps to try the functions (this assumes you already have a ksqlDB server or cluster and its underlying Kafka cluster set up):

  1. Compile the project into an uber-jar (using Eclipse)
    1. Enable parameter reflection (this lets ksqlDB read the parameter names of the UDF methods, which it uses to generate the function documentation at runtime):
    (SUBWINDOW) Project Explorer > (RIGHT-CLICK) [Project] > (CLICK) Properties >
      (WINDOW) Properties for [Project] > (SUBWINDOW) Java Compiler >
        (SUBWINDOW) Classfile Generation > (TICK) Store information about method parameters (usable via reflection)
    
    2. Generate the .jar:
    (SUBWINDOW) Project Explorer > (RIGHT-CLICK) [Project] > (CLICK) Export >
      (WINDOW) Export > (SELECT) Java > (SELECT) JAR file; (CLICK) Next
      (WINDOW) JAR Export >
        (SUBWINDOW) Select the resources to export: >
          (SELECT) src/main/java
          (SELECT) lib
        (TICK) Export generated class files and resources
        (TICK) Compress the contents of the JAR file
        (CLICK) Finish
    
  2. Install the functions on your ksqlDB server (on the machine hosting the ksqlDB server)
    1. Create a folder for your extensions and copy the jar into it:
    mkdir [path to your ksqldb extensions folder] && cp [path to your previously generated extensions uber-jar] [path to your ksqldb extensions folder]
    2. Set the path to your extensions folder in your ksqlDB server configuration:
    (EDIT) [path to your ksqldb]/etc/ksqldb/ksqldb-server.properties
    "
    [...]
    ksql.extension.dir=[path to your ksqldb extensions folder]
    [...]
    "
    3. Restart your ksqlDB server:
    [path to ksqldb]/usr/bin/ksql-server-stop && \
    [path to ksqldb]/usr/bin/ksql-server-start [path to your ksqldb]/etc/ksqldb/ksqldb-server.properties
  3. Check that your ksqlDB server has loaded the new functions
    1. List the ksqlDB functions:
    show functions;
    2. Describe a newly added function:
    describe function [function];
  4. Test the new functions
    1. Set up a test environment

      1. Create a Kafka topic:
      [path to kafka]/bin/kafka-topics.sh --create --bootstrap-server 10.0.72.10:9092 --topic TEST_TOPIC
      2. Declare a stream on top of it:
      CREATE OR REPLACE STREAM TEST_STREAM (NAME STRING, VALUE DOUBLE, TS STRING) WITH (KAFKA_TOPIC='TEST_TOPIC', KEY_FORMAT='KAFKA', TIMESTAMP='TS', TIMESTAMP_FORMAT='yyyy-MM-dd''T''HH:mm:ss.SSSX', VALUE_FORMAT='JSON');
      3. Write random data to your topic (a less flexible alternative is described at https://docs.ksqldb.io/en/0.10.2-ksqldb/developer-guide/test-and-debug/generate-custom-test-data/):
      ./kafka_dataset_generator.py 10.0.72.10:9092
    2. Try the new functions (from the ksqlDB CLI)

      1. MODULO (UDF):
      SELECT *, modulo(value, 12) AS modulo_12 FROM TEST_STREAM EMIT CHANGES;
      2. DELTA (UDAF):
      SELECT name, delta(value) AS delta, latest_by_offset(value,2) last_values, LATEST_BY_OFFSET(ts, 2) as last_tss FROM TEST_STREAM GROUP BY name EMIT CHANGES;
    3. Shut down the test environment

      1. Stop the generator

      2. Delete the test topic

      [path to kafka]/bin/kafka-topics.sh --delete --topic TEST_TOPIC --bootstrap-server 10.0.72.10:9092
      3. Drop the test stream:
      drop stream TEST_STREAM;
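
The "Enable parameter reflection" step above is what lets ksqlDB show real parameter names. That Eclipse option should correspond to javac's `-parameters` flag, which writes parameter names into the class files. The small demo below (class and method names are hypothetical, and `modulo` is only a stand-in for a UDF method) prints the names reflection sees, so you can check the effect of the setting outside ksqlDB.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.StringJoiner;

/**
 * Shows what storing method parameter information changes:
 * compiled with javac -parameters, reflection returns the declared names;
 * compiled without it, it returns synthetic ones such as arg0, arg1.
 */
class ParamNamesDemo {

    // Hypothetical stand-in for a UDF method such as MODULO(v1, v2).
    static long modulo(long v1, long v2) {
        return v1 % v2;
    }

    // Renders a method signature using the parameter names visible via reflection.
    static String signature(String methodName, Class<?>... types) {
        try {
            Method m = ParamNamesDemo.class.getDeclaredMethod(methodName, types);
            StringJoiner joiner = new StringJoiner(", ", methodName + "(", ")");
            for (Parameter p : m.getParameters()) {
                joiner.add(p.getName());
            }
            return joiner.toString();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("no such method: " + methodName, e);
        }
    }

    public static void main(String[] args) {
        // javac -parameters ParamNamesDemo.java  -> modulo(v1, v2)
        // javac ParamNamesDemo.java              -> modulo(arg0, arg1)
        System.out.println(signature("modulo", long.class, long.class));
    }
}
```

Naming each parameter explicitly with ksqlDB's `@UdfParameter` annotation should be another way to get readable names in `DESCRIBE FUNCTION` without relying on this compiler setting.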
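
The TEST_STREAM declaration in the hands-on expects JSON message values with NAME, VALUE and TS fields, TS being parsed with the pattern `yyyy-MM-dd'T'HH:mm:ss.SSSX`. The generator script's exact output isn't documented here, so the following is a hypothetical Java sketch of records that should match that declaration; the class name, the sensor names and the value range are made up for illustration and are not taken from kafka_dataset_generator.py.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Locale;
import java.util.Random;

/**
 * Hypothetical sketch: builds JSON values shaped like the TEST_STREAM schema
 * (NAME STRING, VALUE DOUBLE, TS STRING). Not the repo's generator.
 */
class TestRecordSketch {

    // Same pattern as TIMESTAMP_FORMAT in the CREATE STREAM statement
    // (in SQL the quotes around T are doubled; in Java they are single).
    // Formatting in UTC makes the X offset render as "Z".
    static final DateTimeFormatter TS_FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSX").withZone(ZoneOffset.UTC);

    // Assumes name needs no JSON escaping (plain identifiers only).
    static String record(String name, double value, Instant ts) {
        // Locale.ROOT keeps '.' as the decimal separator whatever the JVM locale.
        return String.format(Locale.ROOT, "{\"NAME\": \"%s\", \"VALUE\": %s, \"TS\": \"%s\"}",
            name, Double.toString(value), TS_FORMAT.format(ts));
    }

    public static void main(String[] args) {
        Random rnd = new Random();
        String[] names = {"sensor_a", "sensor_b"};    // hypothetical keys for GROUP BY name
        for (int i = 0; i < 4; i++) {
            String name = names[rnd.nextInt(names.length)];
            double value = Math.round(rnd.nextDouble() * 1000) / 10.0;  // 0.0 to 100.0
            System.out.println(record(name, value, Instant.now()));
        }
    }
}
```

Each printed line is one message value to produce to TEST_TOPIC; reusing a small set of NAME values gives the DELTA query several successive values per key.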

Additional info

Other notable repos on the subject (see https://github.com/search?q=ksqldb-udf&type=repositories):

Contact: [email protected]
