Skip to content

Commit

Permalink
Merge pull request confluentinc#68 from confluentinc/pdruley-themes
Browse files Browse the repository at this point in the history
Add Patrick Druley's industry theme demos
  • Loading branch information
rmoff authored May 16, 2019
2 parents 5a36a68 + 9736f22 commit e8c188d
Show file tree
Hide file tree
Showing 13 changed files with 4,379 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ You need to allocate Docker 8GB when running these. Avoid allocating all your ma
- Docker Compose for just the [community licensed components of Confluent Platform](cos)
- [Topic Tailer](topic-tailer), stream topics to the browser using websockets
- [KPay payment processing example](scalable-payment-processing)
- [Industry themes (e.g. banking Next Best Offer)](industry-themes)


## Feedback & Questions
Expand Down
1,000 changes: 1,000 additions & 0 deletions industry-themes/next_best_offer_banking/data/customer_activity.json

Large diffs are not rendered by default.

1,000 changes: 1,000 additions & 0 deletions industry-themes/next_best_offer_banking/data/customers_fin.json

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions industry-themes/next_best_offer_banking/data/offers.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{"offer_id":1,"offer_name":"new_savings","offer_url":"http://google.com.br/magnis/dis/parturient.json"}
{"offer_id":2,"offer_name":"new_checking","offer_url":"https://earthlink.net/in/ante.js"}
{"offer_id":3,"offer_name":"new_home_loan","offer_url":"https://webs.com/in/ante.jpg"}
{"offer_id":4,"offer_name":"new_auto_loan","offer_url":"http://squidoo.com/venenatis/non/sodales/sed/tincidunt/eu.js"}
{"offer_id":5,"offer_name":"no_offer","offer_url":"https://ezinearticles.com/ipsum/primis/in/faucibus/orci/luctus.html"}
151 changes: 151 additions & 0 deletions industry-themes/next_best_offer_banking/ksql/next_best_offer.ksql
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
SET 'auto.offset.reset'='earliest';

CREATE STREAM CUSTOMER_ACTIVITY_STREAM
(
ACTIVITY_ID INTEGER,
IP_ADDRESS STRING,
CUSTOMER_ID INTEGER,
ACTIVITY_TYPE STRING,
PROPENSITY_TO_BUY DOUBLE
)
WITH (
KAFKA_TOPIC = 'CUSTOMER_ACTIVITY_STREAM',
VALUE_FORMAT = 'JSON'
);

CREATE STREAM CUSTOMER_ACTIVITY_STREAM_KEY
WITH (
KAFKA_TOPIC = 'CUSTOMER_ACTIVITY_STREAM_KEY',
VALUE_FORMAT = 'AVRO'
) AS
SELECT
ACTIVITY_ID,
IP_ADDRESS,
CAST(CUSTOMER_ID AS STRING) AS CUSTOMER_ID_STR,
ACTIVITY_TYPE,
PROPENSITY_TO_BUY
FROM CUSTOMER_ACTIVITY_STREAM
PARTITION BY CUSTOMER_ID_STR;

CREATE STREAM CUSTOMERS_STREAM
(
CUSTOMER_ID INTEGER,
FIRST_NAME STRING,
LAST_NAME STRING,
EMAIL STRING,
GENDER STRING,
INCOME INTEGER,
FICO INTEGER
)
WITH (
KAFKA_TOPIC = 'CUSTOMERS_STREAM',
VALUE_FORMAT = 'JSON'
);

CREATE STREAM "CUSTOMERS_STREAM_KEY"
WITH (
KAFKA_TOPIC = 'CUSTOMERS_STREAM_KEY',
VALUE_FORMAT = 'AVRO'
) AS
SELECT
CAST(CUSTOMER_ID AS STRING) AS CUSTOMER_ID_STR,
FIRST_NAME,
LAST_NAME,
EMAIL,
GENDER,
INCOME,
FICO
FROM CUSTOMERS_STREAM
PARTITION BY CUSTOMER_ID_STR;

CREATE TABLE CUSTOMERS_TABLE
(
CUSTOMER_ID_STR STRING,
FIRST_NAME STRING,
LAST_NAME STRING,
EMAIL STRING,
GENDER STRING,
INCOME INTEGER,
FICO INTEGER
)
WITH (
KAFKA_TOPIC = 'CUSTOMERS_STREAM_KEY',
VALUE_FORMAT = 'AVRO',
KEY = 'CUSTOMER_ID_STR'
);

CREATE STREAM OFFERS_STREAM
(
OFFER_ID INTEGER,
OFFER_NAME STRING,
OFFER_URL STRING
)
WITH (
KAFKA_TOPIC = 'OFFERS_STREAM',
VALUE_FORMAT = 'JSON'
);

CREATE STREAM OFFERS_STREAM_KEY
WITH (
KAFKA_TOPIC = 'OFFERS_STREAM_KEY',
VALUE_FORMAT = 'AVRO'
) AS
SELECT
CAST(OFFER_ID AS STRING) AS OFFER_ID_STR,
OFFER_NAME,
OFFER_URL
FROM OFFERS_STREAM
PARTITION BY OFFER_ID_STR;

CREATE TABLE OFFERS_TABLE
(
OFFER_ID_STR STRING,
OFFER_NAME STRING,
OFFER_URL STRING
)
WITH (
KAFKA_TOPIC = 'OFFERS_STREAM_KEY',
VALUE_FORMAT = 'AVRO',
KEY = 'OFFER_ID_STR'
);

CREATE STREAM NEXT_BEST_OFFER
WITH (
KAFKA_TOPIC = 'NEXT_BEST_OFFER',
VALUE_FORMAT = 'AVRO'
) AS
SELECT
cask.ACTIVITY_ID,
cask.CUSTOMER_ID_STR as CUSTOMER_ID_STR,
cask.PROPENSITY_TO_BUY,
cask.ACTIVITY_TYPE,
ct.INCOME,
ct.FICO,
CASE
WHEN ct.INCOMe > 100000 AND ct.FICO < 700 AND cask.PROPENSITY_TO_BUY < 0.9 THEN '1'
WHEN ct.INCOME < 50000 AND cask.PROPENSITY_TO_BUY < 0.9 THEN '2'
WHEN ct.INCOME >= 50000 AND ct.FICO >= 600 AND cask.PROPENSITY_TO_BUY < 0.9 THEN '3'
WHEN ct.INCOME > 100000 AND ct.FICO >= 700 AND cask.PROPENSITY_TO_BUY < 0.9 THEN '4'
ELSE '5'
END AS OFFER_ID_STR
FROM CUSTOMER_ACTIVITY_STREAM_KEY cask
LEFT OUTER JOIN CUSTOMERS_TABLE ct
ON cask.CUSTOMER_ID_STR = ct.CUSTOMER_ID_STR;

CREATE STREAM NEXT_BEST_OFFER_LOOKUP
WITH (
KAFKA_TOPIC = 'NEXT_BEST_OFFER_LOOKUP',
VALUE_FORMAT = 'AVRO'
) AS
SELECT
nbo.ACTIVITY_ID,
nbo.CUSTOMER_ID_STR,
nbo.PROPENSITY_TO_BUY,
nbo.ACTIVITY_TYPE,
nbo.INCOME,
nbo.FICO,
ot.OFFER_NAME,
ot.OFFER_URL
FROM NEXT_BEST_OFFER nbo
INNER JOIN OFFERS_TABLE ot
ON nbo.OFFER_ID_STR = ot.OFFER_ID_STR;
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
TERMINATE CSAS_NEXT_BEST_OFFER_LOOKUP_4;

DROP STREAM NEXT_BEST_OFFER_LOOKUP;

CREATE STREAM NEXT_BEST_OFFER_LOOKUP
WITH (
KAFKA_TOPIC = 'NEXT_BEST_OFFER_LOOKUP',
VALUE_FORMAT = 'AVRO'
) AS
SELECT
nbo.ACTIVITY_ID,
nbo.CUSTOMER_ID_STR,
nbo.PROPENSITY_TO_BUY,
nbo.ACTIVITY_TYPE,
nbo.INCOME,
nbo.FICO,
ot.OFFER_ID_STR,
ot.OFFER_NAME,
ot.OFFER_URL
FROM NEXT_BEST_OFFER nbo
INNER JOIN OFFERS_TABLE ot
ON nbo.OFFER_ID_STR = ot.OFFER_ID_STR;
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
cat customers_fin.json | kafkacat -b localhost:9092 -t CUSTOMERS_STREAM
cat offers.json | kafkacat -b localhost:9092 -t OFFERS_STREAM
kafka-producer-perf-test \
--topic CUSTOMER_ACTIVITY_STREAM \
--throughput 1 \
--producer-props bootstrap.servers=localhost:9092 \
--payload-file customer_activity.json \
--num-records 100000
Loading

0 comments on commit e8c188d

Please sign in to comment.