Merge pull request confluentinc#201 from pdruley/pdruley/pizza-demo
new pizza orders demo
rmoff authored Apr 14, 2021
2 parents c3c33c7 + 6cfb262 commit 2c233a2
Showing 9 changed files with 3,463 additions and 0 deletions.
19 changes: 19 additions & 0 deletions industry-themes/README.md
@@ -0,0 +1,19 @@
## Welcome to Industry Themes

Modest interpretations of various industry-based use cases can be found here. Most include the data, producers, and ksqlDB code needed to demonstrate how stream processing can be applied to business problems.

## Requirements

In some cases a Docker Compose file is provided for easy deployment, but the producers can easily be modified to run on any Confluent environment. A few use Confluent Cloud, but these too can be modified for other deployments. Also keep in mind that these demos were developed over a period in which ksqlDB syntax changed, so some older use cases may not work on newer ksqlDB server versions without minor changes.

## Contents

- [Tracking field assets for utilities companies](field_management)
- [Client services for Foreign Exchange market trading](fx_client_services)
- [Looking up claim statistics by patient with FHIR events](healthcare_claims_microservices)
- [Medical device alerts from noisy sensors](medical_devices)
- Building next best offer engines with session information ([Banking](next_best_offer_banking) or [Insurance](next_best_offer_insurance))
- [Pizza order status tracking with nested JSON](pizza_orders)
- [Activity-based discounting for e-commerce](real_time_discounting)
- [Simple spend based segmentation for grocery](real_time_segmentation)
- [Truck GPS and sensor correlation for alerts](truck_sensors)
1,000 changes: 1,000 additions & 0 deletions industry-themes/pizza_orders/data/pizza_orders.json

Large diffs are not rendered by default.

1,000 changes: 1,000 additions & 0 deletions industry-themes/pizza_orders/data/pizza_orders_cancelled.json

Large diffs are not rendered by default.

1,000 changes: 1,000 additions & 0 deletions industry-themes/pizza_orders/data/pizza_orders_completed.json

Large diffs are not rendered by default.

130 changes: 130 additions & 0 deletions industry-themes/pizza_orders/docker-compose.yml
@@ -0,0 +1,130 @@
---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.1.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:6.1.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema-registry:
    image: confluentinc/cp-schema-registry:6.1.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  connect:
    image: cnfldemos/cp-server-connect-datagen:0.4.0-6.1.0
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-6.1.0.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR

  control-center:
    image: confluentinc/cp-enterprise-control-center:6.1.0
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - ksqldb-server
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:6.1.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'

244 changes: 244 additions & 0 deletions industry-themes/pizza_orders/ksqlDB/order-event-modeler.ksql
@@ -0,0 +1,244 @@
SET 'auto.offset.reset' = 'earliest';

CREATE STREAM PIZZA_ORDERS
(
storeNumber INT,
storeOrderId INT,
businessDate STRING,
status STRING,
coupons ARRAY<
STRUCT<code int>
>,
orderLines ARRAY<
STRUCT<
product_id INT,
category STRING,
quantity INT,
unit_price DECIMAL(10,2),
net_price DECIMAL(10,2)
>
>
)
WITH (
KAFKA_TOPIC = 'pizza_orders',
VALUE_FORMAT = 'JSON'
);
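
--For reference, a hypothetical pizza_orders event shaped like the schema
--above (illustrative values only; the real records are in
--data/pizza_orders.json):
--
--  {"storeNumber": 42, "storeOrderId": 1001, "businessDate": "2021-04-14",
--   "status": "accepted", "coupons": [{"code": 12}],
--   "orderLines": [{"product_id": 73, "category": "pizza", "quantity": 2,
--                   "unit_price": 9.99, "net_price": 19.98}]}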

CREATE STREAM PIZZA_ORDERS_COMPLETED
(
storeNumber INT,
storeOrderId INT,
businessDate STRING,
status STRING,
rackTimeSecs INT,
orderDeliveryTimeSecs INT
)
WITH (
KAFKA_TOPIC = 'pizza_orders_completed',
VALUE_FORMAT = 'JSON'
);

CREATE STREAM PIZZA_ORDERS_CANCELLED
(
storeNumber INT,
storeOrderId INT,
businessDate STRING,
status STRING
)
WITH (
KAFKA_TOPIC = 'pizza_orders_cancelled',
VALUE_FORMAT = 'JSON'
);
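
--As a quick sanity check, a push query can tail the raw stream. This is a
--sketch meant for an interactive ksqlDB CLI session rather than this script,
--so it is left commented out:
--
--SELECT storeNumber, storeOrderId, status
--FROM PIZZA_ORDERS
--EMIT CHANGES
--LIMIT 5;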

--Rekey and convert to AVRO
CREATE STREAM PIZZA_ORDERS_KEYED
WITH (
KAFKA_TOPIC = 'pizza_orders_keyed',
VALUE_FORMAT = 'AVRO'
)
AS
SELECT
CAST(storeNumber AS STRING) + '-' + CAST(storeOrderId AS STRING) + '-' + businessDate AS order_key,
storeNumber,
storeOrderId,
businessDate,
status,
coupons,
orderLines
FROM PIZZA_ORDERS
PARTITION BY (CAST(storeNumber AS STRING) + '-' + CAST(storeOrderId AS STRING) + '-' + businessDate);
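
--The PARTITION BY above yields a composite string key, e.g. a hypothetical
--'42-1001-2021-04-14' (store number, store order id, business date), so every
--event for a given order lands on the same partition and can be joined on.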

CREATE STREAM PIZZA_ORDERS_COMPLETED_KEYED
WITH (
KAFKA_TOPIC = 'pizza_orders_completed_keyed',
VALUE_FORMAT = 'AVRO'
)
AS
SELECT
CAST(storeNumber AS STRING) + '-' + CAST(storeOrderId AS STRING) + '-' + businessDate AS order_key,
storeNumber,
storeOrderId,
businessDate,
status,
rackTimeSecs,
orderDeliveryTimeSecs
FROM PIZZA_ORDERS_COMPLETED
PARTITION BY (CAST(storeNumber AS STRING) + '-' + CAST(storeOrderId AS STRING) + '-' + businessDate);


CREATE STREAM PIZZA_ORDERS_CANCELLED_KEYED
WITH (
KAFKA_TOPIC = 'pizza_orders_cancelled_keyed',
VALUE_FORMAT = 'AVRO'
)
AS
SELECT
CAST(storeNumber AS STRING) + '-' + CAST(storeOrderId AS STRING) + '-' + businessDate AS order_key,
storeNumber,
storeOrderId,
businessDate,
status
FROM PIZZA_ORDERS_CANCELLED
PARTITION BY (CAST(storeNumber AS STRING) + '-' + CAST(storeOrderId AS STRING) + '-' + businessDate);

CREATE STREAM PIZZA_ORDERS_COMMON
WITH (
KAFKA_TOPIC = 'pizza_orders_common',
VALUE_FORMAT = 'AVRO'
)
AS
SELECT
order_key,
storeNumber,
storeOrderId,
businessDate,
status,
CASE
WHEN ARRAY_LENGTH(coupons) > 0 THEN 'true'
ELSE 'false'
END AS usedCoupon,
ARRAY_LENGTH(orderLines) AS orderLineCount,
0 AS rackTimeSecs,
0 AS orderDeliveryTimeSecs
FROM PIZZA_ORDERS_KEYED;
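
--Worked example (hypothetical event): an order with one coupon and three
--orderLines entries produces usedCoupon = 'true' and orderLineCount = 3;
--an order with an empty coupons array produces usedCoupon = 'false'.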

--This is a different way to handle the join instead of doing
--INSERT INTO statements, and it allows ksqlDB to manage the
--state store better. It does require correct ordering of
--the event types in the COALESCE function.
--CREATE STREAM PIZZA_ORDERS_COMMON_ALT
--WITH (
-- KAFKA_TOPIC = 'pizza_orders_common_alt',
-- VALUE_FORMAT = 'AVRO'
--)
--AS
--SELECT
--ROWKEY AS order_key,
--COALESCE(podk.storeNumber, popk.storeNumber, pok.storeNumber) as storeNumber,
--COALESCE(podk.storeOrderId, popk.storeOrderId, pok.storeOrderId) as storeOrderId,
--COALESCE(podk.businessDate, popk.businessDate, pok.businessDate) as businessDate,
--COALESCE(popk.status, podk.status, pok.status) AS status,
--CASE
-- WHEN ARRAY_LENGTH(coupons) > 0 THEN 'true'
-- ELSE 'false'
--END AS usedCoupon,
--ARRAY_LENGTH(orderLines) AS orderLineCount,
--COALESCE(popk.rackTimeSecs, 0) AS rackTimeSecs,
--COALESCE(popk.orderDeliveryTimeSecs, 0) AS orderDeliveryTimeSecs
--FROM PIZZA_ORDERS_KEYED pok
--FULL OUTER JOIN PIZZA_ORDERS_COMPLETED_KEYED popk
--WITHIN 2 HOURS
--ON pok.order_key = popk.order_key
--FULL OUTER JOIN PIZZA_ORDERS_CANCELLED_KEYED podk
--WITHIN 2 HOURS
--ON pok.order_key = podk.order_key;


--This is the materialized table storing the current state of each order,
--based on the common model.
CREATE TABLE PIZZA_ORDERS_TABLE
WITH (
KAFKA_TOPIC = 'pizza_orders_table',
VALUE_FORMAT = 'AVRO'
)
AS
SELECT
order_key,
LATEST_BY_OFFSET(storeNumber) AS latest_storeNumber,
LATEST_BY_OFFSET(storeOrderId) AS latest_storeOrderId,
LATEST_BY_OFFSET(businessDate) AS latest_businessDate,
LATEST_BY_OFFSET(status) AS latest_status,
LATEST_BY_OFFSET(usedCoupon) AS latest_usedCoupon,
LATEST_BY_OFFSET(orderLineCount) AS latest_orderLineCount,
LATEST_BY_OFFSET(rackTimeSecs) AS latest_rackTimeSecs,
LATEST_BY_OFFSET(orderDeliveryTimeSecs) AS latest_orderDeliveryTimeSecs
FROM PIZZA_ORDERS_COMMON
WINDOW TUMBLING(SIZE 2 HOURS, RETENTION 6 HOURS, GRACE PERIOD 30 MINUTES)
GROUP BY order_key;
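
--Once materialized, the current state of a single order can be fetched with
--a pull query. A sketch with a hypothetical key; run it interactively from
--the CLI, as pull queries are not persistent statements:
--
--SELECT * FROM PIZZA_ORDERS_TABLE
--WHERE order_key = '42-1001-2021-04-14';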

--Now we insert the other order event types back into the common model
--and join to the table to get the latest values for fields we don't
--have in that type. This can easily be extended for multiple order event types.
INSERT INTO PIZZA_ORDERS_COMMON
SELECT
poc.order_key AS order_key,
poc.storeNumber AS storeNumber,
poc.storeOrderId AS storeOrderId,
poc.businessDate AS businessDate,
poc.status AS status,
pot.latest_usedCoupon AS usedCoupon,
pot.latest_orderLineCount AS orderLineCount,
pot.latest_rackTimeSecs AS rackTimeSecs,
pot.latest_orderDeliveryTimeSecs AS orderDeliveryTimeSecs
FROM PIZZA_ORDERS_CANCELLED_KEYED poc
LEFT OUTER JOIN PIZZA_ORDERS_TABLE pot
ON poc.order_key = pot.order_key;

INSERT INTO PIZZA_ORDERS_COMMON
SELECT
poc.order_key AS order_key,
poc.storeNumber AS storeNumber,
poc.storeOrderId AS storeOrderId,
poc.businessDate AS businessDate,
poc.status AS status,
pot.latest_usedCoupon AS usedCoupon,
pot.latest_orderLineCount AS orderLineCount,
poc.rackTimeSecs AS rackTimeSecs,
poc.orderDeliveryTimeSecs AS orderDeliveryTimeSecs
FROM PIZZA_ORDERS_COMPLETED_KEYED poc
LEFT OUTER JOIN PIZZA_ORDERS_TABLE pot
ON poc.order_key = pot.order_key;
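
--LEFT OUTER joins are used above so a completed or cancelled event still
--flows through even when no row for that order has been materialized in
--PIZZA_ORDERS_TABLE yet; the table-side columns are simply null in that case.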

--This section creates events from the order line detail
--and flattens the products out to an easy-to-consume model.
CREATE STREAM PIZZA_ORDER_LINES
WITH (
KAFKA_TOPIC = 'pizza_order_lines',
VALUE_FORMAT = 'AVRO'
)
AS
SELECT
order_key,
storeNumber,
storeOrderId,
businessDate,
EXPLODE(orderLines) AS orderLine
FROM PIZZA_ORDERS_KEYED;

CREATE STREAM PIZZA_ORDER_LINES_FLAT
WITH (
KAFKA_TOPIC = 'pizza_order_lines_flat',
VALUE_FORMAT = 'AVRO'
)
AS
SELECT
order_key,
storeNumber,
storeOrderId,
businessDate,
orderLine->product_id AS product_id,
orderLine->category AS category,
orderLine->quantity AS quantity,
orderLine->unit_price AS unit_price,
orderLine->net_price AS net_price
FROM PIZZA_ORDER_LINES;
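
--With the order lines flattened, per-category rollups become simple. A sketch
--(not part of the original demo) of a running revenue-by-category table,
--commented out so this script only creates the objects above:
--
--CREATE TABLE CATEGORY_REVENUE AS
--SELECT category,
--       SUM(net_price) AS revenue
--FROM PIZZA_ORDER_LINES_FLAT
--GROUP BY category;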
