In this recipe, we join each customer’s credit card transactions with the customer’s average credit card spend and aggregate them over a two-hour window. If the total spend within a two-hour window exceeds the customer’s average credit card spend, the account is flagged as a possible case of credit card theft.
-
Docker
-
If running on Mac/Windows, at least 4GB allocated to Docker:
docker system info | grep Memory
Should return a value greater than 8GB - if not, the Kafka stack will probably not work.
Minimum version is Confluent Platform 5.0
-
Clone this repository
git clone https://github.com/confluentinc/ksql-recipes-try-it-at-home.git
-
Launch:
cd ksql-recipes-try-it-at-home/detecting-unusual-card-activity
docker-compose up -d
-
Run KSQL CLI:
docker-compose exec ksql-cli ksql http://ksql-server:8088
-
Register the existing transactions topic as a KSQL stream:
CREATE STREAM TRANSACTIONS_RAW (ACCOUNT_ID VARCHAR, \
    TIMESTAMP VARCHAR, \
    CARD_TYPE VARCHAR, \
    AMOUNT DOUBLE, \
    IP_ADDRESS VARCHAR, \
    TRANSACTION_ID VARCHAR) \
  WITH (KAFKA_TOPIC='transactions', \
    VALUE_FORMAT='JSON');
-
Repartition the stream on account_id, and use Avro for the target stream (this is optional):
CREATE STREAM TRANSACTIONS_SOURCE \
  WITH (VALUE_FORMAT='AVRO') AS \
  SELECT * \
  FROM TRANSACTIONS_RAW \
  PARTITION BY ACCOUNT_ID;
-
Register the existing stream of customer data from Oracle in the topic customers as a KSQL stream:
CREATE STREAM CUST_RAW_STREAM (ID BIGINT, \
    FIRST_NAME VARCHAR, \
    LAST_NAME VARCHAR, \
    EMAIL VARCHAR, \
    AVG_CREDIT_SPEND DOUBLE) \
  WITH (KAFKA_TOPIC='customers', \
    VALUE_FORMAT='JSON');
-
Repartition the customer data stream by ID (the customer’s account ID) to prepare for the join, and use Avro for the target stream (this is optional):
CREATE STREAM CUSTOMER_REKEYED \
  WITH (VALUE_FORMAT='AVRO') AS \
  SELECT * \
  FROM CUST_RAW_STREAM \
  PARTITION BY ID;
-
Register the partitioned customer data topic as a KSQL table used for the join with the incoming stream of transactions:
CREATE TABLE customer \
  WITH (KAFKA_TOPIC='CUSTOMER_REKEYED', \
    VALUE_FORMAT='AVRO', \
    KEY='ID');
-
Join the transactions to customer information:
CREATE STREAM TRANSACTIONS_ENRICHED AS \
  SELECT T.ACCOUNT_ID, T.CARD_TYPE, T.AMOUNT, \
    C.FIRST_NAME + ' ' + C.LAST_NAME AS FULL_NAME, \
    C.AVG_CREDIT_SPEND \
  FROM TRANSACTIONS_SOURCE T \
  INNER JOIN CUSTOMER C \
  ON T.ACCOUNT_ID = C.ID;
-
Aggregate the stream of transactions for each account ID using a two-hour tumbling window, and filter for accounts in which the total spend in a two-hour period is greater than the customer’s average:
CREATE TABLE POSSIBLE_STOLEN_CARD AS \
  SELECT TIMESTAMPTOSTRING(WindowStart(), 'yyyy-MM-dd HH:mm:ss Z') AS WINDOW_START, \
    T.ACCOUNT_ID, T.CARD_TYPE, SUM(T.AMOUNT) AS TOTAL_CREDIT_SPEND, \
    T.FULL_NAME, MAX(T.AVG_CREDIT_SPEND) AS AVG_CREDIT_SPEND \
  FROM TRANSACTIONS_ENRICHED T \
  WINDOW TUMBLING (SIZE 2 HOURS) \
  GROUP BY T.ACCOUNT_ID, T.CARD_TYPE, T.FULL_NAME \
  HAVING SUM(T.AMOUNT) > MAX(T.AVG_CREDIT_SPEND);
Examine the output:
ksql> SELECT WINDOW_START, ACCOUNT_ID, CARD_TYPE, \
  TOTAL_CREDIT_SPEND, FULL_NAME, AVG_CREDIT_SPEND \
  FROM POSSIBLE_STOLEN_CARD;
2019-01-11 16:00:00 +0000 | 100019 | jcb | 90.69 | Horatius Keefe | 60.58
2019-01-11 16:00:00 +0000 | 100012 | mastercard | 84.04 | Juditha Shwalbe | 53.94
2019-01-11 16:00:00 +0000 | 100016 | maestro | 76.01 | Milo Drewes | 68.33
2019-01-11 16:00:00 +0000 | 100035 | visa-electron | 69.61 | Roxine Furminger | 59.68
…
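To make the logic of the join and the windowed aggregation easier to follow, here is a rough Python sketch of what the KSQL statements above compute. It is only an illustration, not how KSQL is implemented: it processes a finite list rather than a continuous stream, it assumes tumbling windows are aligned to the Unix epoch (which is consistent with the 16:00:00 window start in the output above), and the helper names (`window_start`, `possible_stolen_cards`, `ms`) and the sample accounts are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timezone

WINDOW_MS = 2 * 60 * 60 * 1000  # two-hour tumbling window, in milliseconds


def window_start(ts_ms, size_ms=WINDOW_MS):
    """Start of the tumbling window containing ts_ms (assumes epoch-aligned windows)."""
    return ts_ms - (ts_ms % size_ms)


def possible_stolen_cards(transactions, customers):
    """Enrich transactions with customer data, then sum spend per window and group.

    transactions: iterable of dicts with account_id, card_type, amount, ts_ms
    customers:    dict mapping account_id to a dict with full_name, avg_credit_spend
    """
    totals = defaultdict(float)
    averages = {}
    for tx in transactions:
        customer = customers.get(tx["account_id"])
        if customer is None:
            continue  # inner join: transactions without a matching customer are dropped
        # Mirrors GROUP BY account_id, card_type, full_name within each window
        key = (window_start(tx["ts_ms"]), tx["account_id"],
               tx["card_type"], customer["full_name"])
        totals[key] += tx["amount"]
        averages[key] = max(averages.get(key, float("-inf")),
                            customer["avg_credit_spend"])
    # Mirrors HAVING SUM(amount) > MAX(avg_credit_spend)
    return {key: (total, averages[key])
            for key, total in totals.items() if total > averages[key]}


def ms(hour, minute):
    """Hypothetical helper: epoch milliseconds for a time on 2019-01-11 (UTC)."""
    moment = datetime(2019, 1, 11, hour, minute, tzinfo=timezone.utc)
    return int(moment.timestamp() * 1000)


# Made-up sample data, for illustration only
customers = {
    "A1": {"full_name": "Example Customer", "avg_credit_spend": 50.0},
    "A2": {"full_name": "Another Customer", "avg_credit_spend": 80.0},
}
transactions = [
    {"account_id": "A1", "card_type": "visa", "amount": 30.0, "ts_ms": ms(16, 5)},
    {"account_id": "A1", "card_type": "visa", "amount": 25.0, "ts_ms": ms(17, 55)},
    {"account_id": "A1", "card_type": "visa", "amount": 40.0, "ts_ms": ms(18, 0)},
    {"account_id": "A2", "card_type": "jcb", "amount": 60.0, "ts_ms": ms(16, 10)},
    {"account_id": "A9", "card_type": "jcb", "amount": 99.0, "ts_ms": ms(16, 20)},
]

flagged = possible_stolen_cards(transactions, customers)
for (start, account, card, name), (total, average) in flagged.items():
    print(start, account, card, total, name, average)
```

In this sample, only account A1 is flagged, and only for the window starting at 16:00: its two transactions in that window sum to more than its average, while the 18:00 transaction falls into the next window and is counted separately.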