Commit
Merge pull request confluentinc#235 from pdruley/pdruley/fraud-graphing
new use case for insurance fraud using neo4j
rmoff authored Nov 10, 2021
2 parents b9b64ae + 3ab2936 commit 77e6f50
Showing 9 changed files with 1,216 additions and 66 deletions.
1 change: 1 addition & 0 deletions industry-themes/README.md
@@ -11,6 +11,7 @@ In some cases, a Docker compose file is provided for easy deployment but the pro
- [Tracking field assets for utilities companies](field_management)
- [Client services for Foreign Exchange market trading](fx_client_services)
- [Looking up claim statistics by patient with FHIR events](healthcare_claims_microservices)
- [Using a graph model to detect insurance fraud](insurance_fraud_graphing)
- [Auto part tracking with enriched GPS events](logistics_enrichment)
- [Medical device alerts from noisy sensors](medical_devices)
- Building next best offer engines with session information ([Banking](next_best_offer_banking) or [Insurance](next_best_offer_insurance))
1,000 changes: 1,000 additions & 0 deletions industry-themes/insurance_fraud_graphing/data/auto_insurance_claims.json

Large diffs are not rendered by default.

62 changes: 62 additions & 0 deletions industry-themes/insurance_fraud_graphing/docker-compose.yml
@@ -0,0 +1,62 @@
---
version: '2'
services:

  neo4j:
    image: neo4j:4.3.3-enterprise
    hostname: neo4j
    container_name: neo4j
    ports:
      - "7474:7474"
      - "7687:7687"
    environment:
      NEO4J_ACCEPT_LICENSE_AGREEMENT: "yes"
      NEO4J_dbms_logs_debug_level: DEBUG
      NEO4J_dbms_memory_heap_max__size: 2G
      NEO4J_dbms_memory_heap_initial__size: 1G
      NEO4J_dbms_memory_pagecache_size: 1G
      NEO4J_AUTH: neo4j/admin

  connect:
    image: cnfldemos/cp-server-connect-datagen:0.5.0-6.2.0
    hostname: connect
    container_name: connect
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: $BOOTSTRAP_SERVERS
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: "connect"
      CONNECT_CONFIG_STORAGE_TOPIC: local-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: local-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: local-connect-status
      CONNECT_REPLICATION_FACTOR: 3
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "true"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: $SCHEMA_REGISTRY_URL
      CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: $BASIC_AUTH_CREDENTIALS_SOURCE
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO
      CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
      CONNECT_LOG4J_LOGGERS: org.reflections=ERROR
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-6.2.0.jar
      # Connect worker
      CONNECT_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
      CONNECT_SASL_MECHANISM: PLAIN
      # Connect producer
      CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
      CONNECT_PRODUCER_SASL_MECHANISM: PLAIN
      # Connect consumer
      CONNECT_CONSUMER_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
      CONNECT_CONSUMER_SASL_MECHANISM: PLAIN

claim-formatter.ksql
@@ -0,0 +1,26 @@
CREATE STREAM auto_insurance_claims
(
CLAIM_ID INT,
CLAIM_TYPE STRING,
CLAIM_AMOUNT_USD DECIMAL(10,2),
ADJUSTER STRING,
INSURED STRING,
PAYEE STRING
)
WITH (
KAFKA_TOPIC = 'auto_insurance_claims',
VALUE_FORMAT = 'JSON'
);

CREATE STREAM auto_insurance_claims_avro
WITH (
KAFKA_TOPIC = 'auto-insurance-claims-avro',
VALUE_FORMAT = 'AVRO'
) AS
SELECT
CLAIM_ID,
CLAIM_TYPE,
ADJUSTER,
INSURED,
PAYEE
FROM auto_insurance_claims;
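Taken together, the first statement registers the raw auto_insurance_claims JSON topic as a ksqlDB stream, and the second continuously rewrites just the fields the graph model needs (the claim amount is dropped) into the Avro topic auto-insurance-claims-avro, which is what the Neo4j sink below consumes through Schema Registry. As a quick sanity check of the payload shape, you can print the first record of the bundled data file; the sample record in the comment is invented for illustration, not taken from the file:

#run from industry-themes/insurance_fraud_graphing; each line of the file appears to be
#one JSON claim event with the fields declared above, roughly of this shape (values invented):
#{"CLAIM_ID": 1001, "CLAIM_TYPE": "collision", "CLAIM_AMOUNT_USD": 2500.00,
# "ADJUSTER": "Nebula", "INSURED": "Jane Doe", "PAYEE": "Acme Body Shop"}
head -n 1 data/auto_insurance_claims.json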
17 changes: 17 additions & 0 deletions industry-themes/insurance_fraud_graphing/neo4j-sink.json
@@ -0,0 +1,17 @@
{
  "name": "Neo4jSinkConnector",
  "config": {
    "topics": "auto-insurance-claims-avro",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "errors.retry.timeout": "-1",
    "errors.retry.delay.max.ms": "1000",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "neo4j.server.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "admin",
    "neo4j.encryption.enabled": false,
    "neo4j.topic.cypher.auto-insurance-claims-avro": "MERGE (c:Claim{claim_id: event.CLAIM_ID, claim_type: event.CLAIM_TYPE}) MERGE (a:Adjuster{name: event.ADJUSTER}) MERGE (i:Insured{name: event.INSURED}) MERGE (p:Payee{name: event.PAYEE}) MERGE (a)-[:ADJUSTED]->(c) MERGE (i)-[:FILED]->(c) MERGE (p)-[:PAID_BY]->(c)"
  }
}
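The cypher template above is what builds the graph: for every Avro record it MERGEs a Claim, an Adjuster, an Insured, and a Payee node plus the ADJUSTED, FILED, and PAID_BY relationships, and because MERGE matches existing nodes, claims that share an adjuster or payee converge on the same vertices, which is what makes suspicious clusters easy to spot. A minimal check that events are landing, assuming the neo4j container name and the neo4j/admin credentials from docker-compose.yml:

docker exec -it neo4j cypher-shell -u neo4j -p admin \
  "MATCH (n) RETURN labels(n) AS label, count(*) AS nodes ORDER BY nodes DESC"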
java-producer.properties
@@ -0,0 +1,10 @@
# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers=<bootstrap_server>
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<api_key>' password='<api_secret>';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips

# Best practice for Kafka producer to prevent data loss
acks=all
producers.sh
@@ -0,0 +1,6 @@
#sends records to the auto_insurance_claims topic, drawing payloads from the lines
#of the data file, throttled to ~1 record/sec, for 100,000 records in total
kafka-producer-perf-test \
--topic auto_insurance_claims \
--throughput 1 \
--producer.config java-producer.properties \
--payload-file ../data/auto_insurance_claims.json \
--num-records 100000
67 changes: 67 additions & 0 deletions industry-themes/insurance_fraud_graphing/setup.sh
@@ -0,0 +1,67 @@
#this demo uses Confluent Cloud with self-managed connectors, based on the cp-all-in-one-cloud setup
#full instructions can be found here
#https://docs.confluent.io/platform/current/tutorials/build-your-own-demos.html#cp-all-in-one-cloud

#get your client configs from Confluent Cloud and put them here
vi $HOME/.confluent/java.config

#download the ccloud library shell script
curl -sS -o ccloud_library.sh https://raw.githubusercontent.com/confluentinc/examples/latest/utils/ccloud_library.sh

#source the functions
source ./ccloud_library.sh

#generate the configs from your java config
ccloud::generate_configs $HOME/.confluent/java.config

#source the delta configs into environment variables
source delta_configs/env.delta

#start up the docker images
docker-compose up -d
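#optional: confirm both containers came up before continuing
docker-compose ps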

#bash into the connect container
docker exec -it connect bash

#install the Neo4j connector
confluent-hub install neo4j/kafka-connect-neo4j:1.0.9
#answer the installer prompts
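#(the prompts can also be skipped; the --no-prompt flag is assumed to be available
#in this version of confluent-hub)
#confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:1.0.9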

#restart connect
docker restart connect
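#optional: confirm the worker picked up the Neo4j plugin after the restart
#(/connector-plugins is part of the standard Connect REST API)
curl -s http://localhost:8083/connector-plugins | jq '.[].class' | grep -i neo4j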

#create topic auto_insurance_claims
ccloud kafka topic create auto_insurance_claims
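#optional: confirm the topic exists
ccloud kafka topic list | grep auto_insurance_claims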

#you'll need a ksqlDB app to run the ksql code
ccloud ksql app create claim_formatter

#run the statements in claim-formatter.ksql against the ksqlDB app
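#one way to submit those statements without the Cloud UI is the ksqlDB REST API;
#the endpoint, API key, and secret below are placeholders for your ksqlDB app, and
#the path to claim-formatter.ksql may differ
#jq -n --arg ksql "$(cat claim-formatter.ksql)" '{ksql: $ksql, streamsProperties: {}}' | \
#  curl -s -X POST https://<ksqldb_endpoint>/ksql \
#    -H 'Content-Type: application/vnd.ksql.v1+json' \
#    -u '<ksqldb_api_key>:<ksqldb_api_secret>' \
#    -d @-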

#upload the connector configuration
curl -X POST http://localhost:8083/connectors \
-H 'Content-Type:application/json' \
-H 'Accept:application/json' \
-d @neo4j-sink.json

#wait a bit for the connector to start, then check its status
curl -X GET http://localhost:8083/connectors/Neo4jSinkConnector/status | jq

#run the producer to Confluent Cloud
bash < producers.sh

#open the Neo4j browser and log in as neo4j/admin
#http://localhost:7474/browser/

#cypher queries to run in the Neo4j browser

#claim counts per adjuster/payee pairing
MATCH (a:Adjuster)-[r:ADJUSTED]->(c:Claim)<-[:PAID_BY]-(p:Payee)
RETURN a.name, count(r) as count, p.name
ORDER BY count DESC

#claims adjusted per adjuster
MATCH (n:Adjuster)-[r:ADJUSTED]->(:Claim)
RETURN n.name, count(r) as count
ORDER BY count DESC

#payees on the claims adjusted by 'Nebula'
MATCH (a:Adjuster{name:'Nebula'})-[r:ADJUSTED]->(c:Claim)<-[:PAID_BY]-(p:Payee)
RETURN p.name, count(r) as count
ORDER BY count DESC
93 changes: 27 additions & 66 deletions industry-themes/pizza_orders/ksqlDB/order-event-modeler.ksql
@@ -122,40 +122,34 @@ ARRAY_LENGTH(orderLines) AS orderLineCount,
0 AS orderDeliveryTimeSecs
FROM PIZZA_ORDERS_KEYED;

--This is a different way to handle the join instead of doing
--insert into statements and allows ksqlDB to manage the
--state store better. It does require correct ordering of
--the event types in the coalesce function.
--CREATE STREAM PIZZA_ORDERS_COMMON_ALT
--WITH (
-- KAFKA_TOPIC = 'pizza_orders_common_alt',
-- VALUE_FORMAT = 'AVRO'
--)
--AS
--SELECT
--ROWKEY AS order_key,
--COALESCE(podk.storeNumber, popk.storeNumber, pok.storeNumber) as storeNumber,
--COALESCE(podk.storeOrderId, popk.storeOrderId, pok.storeOrderId) as storeOrderId,
--COALESCE(podk.businessDate, popk.businessDate, pok.businessDate) as businessDate,
--COALESCE(popk.status, podk.status, pok.status) AS status,
--CASE
-- WHEN ARRAY_LENGTH(coupons) > 0 THEN 'true'
-- ELSE 'false'
--END AS usedCoupon,
--ARRAY_LENGTH(orderLines) AS orderLineCount,
--COALESCE(popk.rackTimeSecs, 0) AS rackTimeSecs,
--COALESCE(popk.orderDeliveryTimeSecs, 0) AS orderDeliveryTimeSecs
--FROM PIZZA_ORDERS_KEYED pok
--FULL OUTER JOIN PIZZA_ORDERS_COMPLETED_KEYED popk
--WITHIN 2 HOURS
--ON pok.order_key = popk.order_key
--FULL OUTER JOIN PIZZA_ORDERS_CANCELLED_KEYED podk
--WITHIN 2 HOURS
--ON pok.order_key = podk.order_key;
INSERT INTO PIZZA_ORDERS_COMMON
SELECT
order_key AS order_key,
storeNumber AS storeNumber,
storeOrderId AS storeOrderId,
businessDate AS businessDate,
status AS status,
CAST(NULL AS STRING) AS usedCoupon,
CAST(NULL AS INT) AS orderLineCount,
CAST(NULL AS INT) AS rackTimeSecs,
CAST(NULL AS INT) AS orderDeliveryTimeSecs
FROM PIZZA_ORDERS_CANCELLED_KEYED;


INSERT INTO PIZZA_ORDERS_COMMON
SELECT
order_key AS order_key,
storeNumber AS storeNumber,
storeOrderId AS storeOrderId,
businessDate AS businessDate,
status AS status,
CAST(NULL AS STRING) AS usedCoupon,
CAST(NULL AS INT) AS orderLineCount,
rackTimeSecs AS rackTimeSecs,
orderDeliveryTimeSecs AS orderDeliveryTimeSecs
FROM PIZZA_ORDERS_COMPLETED_KEYED;


--this is the materialized table for storing current state of each order based
--on the common model.
CREATE TABLE PIZZA_ORDERS_TABLE
WITH (
KAFKA_TOPIC = 'pizza_orders_table',
@@ -173,42 +167,9 @@ LATEST_BY_OFFSET(orderLineCount) AS latest_orderLineCount,
LATEST_BY_OFFSET(rackTimeSecs) AS latest_rackTimeSecs,
LATEST_BY_OFFSET(orderDeliveryTimeSecs) AS latest_orderDeliveryTimeSecs
FROM PIZZA_ORDERS_COMMON
WINDOW TUMBLING(SIZE 2 HOURS, RETENTION 6 HOURS, GRACE PERIOD 30 MINUTES)
WINDOW SESSION(30 MINUTES)
GROUP BY order_key;

--now we insert back into the common model the other order types
--and join to the table to get the latest values for fields we don't
--have in that type. This can easily be expanded for multiple order event types.
INSERT INTO PIZZA_ORDERS_COMMON
SELECT
poc.order_key AS order_key,
poc.storeNumber AS storeNumber,
poc.storeOrderId AS storeOrderId,
poc.businessDate AS businessDate,
poc.status AS status,
pot.latest_usedCoupon AS usedCoupon,
pot.latest_orderLineCount AS orderLineCount,
pot.latest_rackTimeSecs AS rackTimeSecs,
pot.latest_orderDeliveryTimeSecs AS orderDeliveryTimeSecs
FROM PIZZA_ORDERS_CANCELLED_KEYED poc
LEFT OUTER JOIN PIZZA_ORDERS_TABLE pot
ON poc.order_key = pot.order_key;

INSERT INTO PIZZA_ORDERS_COMMON
SELECT
poc.order_key AS order_key,
poc.storeNumber AS storeNumber,
poc.storeOrderId AS storeOrderId,
poc.businessDate AS businessDate,
poc.status AS status,
pot.latest_usedCoupon AS usedCoupon,
pot.latest_orderLineCount AS orderLineCount,
poc.rackTimeSecs AS rackTimeSecs,
poc.orderDeliveryTimeSecs AS orderDeliveryTimeSecs
FROM PIZZA_ORDERS_COMPLETED_KEYED poc
LEFT OUTER JOIN PIZZA_ORDERS_TABLE pot
ON poc.order_key = pot.order_key;

--This section creates events from the order line detail
--and flattens the products out to an easy-to-consume model.
CREATE STREAM PIZZA_ORDER_LINES
