- Register for an account at https://datafeeds.networkrail.co.uk/
- Set your username and password in /data/credentials.properties and /data/set_credentials_env.sh
- Make sure you've allocated Docker plenty of memory: at least 8GB. If you don't, you'll see containers appearing to die at random and you'll get frustrated 😕
- Check how much memory Docker has been allocated:
docker system info | grep Memory
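The allocation shows up as a single line; for example (your figure will differ):

Total Memory: 7.775GiB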
- Launch the stack:
docker-compose up -d
- Check health:
docker-compose ps
- Launch the ksqlDB CLI:
$ docker exec -it ksqldb bash -c 'echo -e "\n\n⏳ Waiting for ksqlDB to be available before launching CLI\n"; while : ; do curl_status=$(curl -s -o /dev/null -w %{http_code} http://ksqldb:8088/info) ; echo -e $(date) " ksqlDB server listener HTTP state: " $curl_status " (waiting for 200)" ; if [ $curl_status -eq 200 ] ; then break ; fi ; sleep 5 ; done ; ksql http://ksqldb:8088'

                  ===========================================
                  =       _              _ ____  ____       =
                  =      | | _____  __ _| |  _ \| __ )      =
                  =      | |/ / __|/ _` | | | | |  _ \      =
                  =      |   <\__ \ (_| | | |_| | |_) |     =
                  =      |_|\_\___/\__, |_|____/|____/      =
                  =                   |_|                   =
                  =  Event Streaming Database purpose-built =
                  =        for stream processing apps       =
                  ===========================================

Copyright 2017-2020 Confluent Inc.

CLI v0.14.0-rc732, Server v0.14.0-rc732 located at http://ksqldb:8088
Server Status: RUNNING

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql>
N.B. A lot of the code is complete but not documented below. The canonical version is the code; the docs below may not be accurate or complete. The code is sensibly named and laid out, though, so it should be easy to follow.
cd data
source ./set_credentials_env.sh
cd ./ingest/movements/
./00_load_canx_reason_code.sh
cd ../cif_schedule
./00_load_opencage.sh
./00_ingest_schedule.sh
You can ignore any jq: error (at <stdin>:16515): Cannot iterate over null (null) messages.
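This is jq's standard complaint when it's asked to iterate over a field that is null in some records; a minimal reproduction:

# Harmless: the field being iterated is null in this record
echo '{"schedule": null}' | jq '.schedule[]'
# jq: error (at <stdin>:1): Cannot iterate over null (null)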
ksql> PRINT CIF_FULL_DAILY FROM BEGINNING LIMIT 1;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: JSON or KAFKA_STRING
rowtime: 2021/01/05 11:49:54.286 Z, key: <null>, value: {"JsonTimetableV1":{"classification":"public","timestamp":1609805484,"owner":"Network Rail","Sender":{"organisation":"Rockshore","application":"NTROD","component":"SCHEDULE"},"Metadata":{"type":"full","sequence":3127}}}
Topic printing ceased
ksql>
Do this before starting on the movement/cancellation data below. From the ksqlDB CLI:
run script '/data/ksql/02_cif_schedule/00_opencage.ksql';
PRINT 'LOCATION_GEOCODE' FROM BEGINNING LIMIT 1;
run script '/data/ksql/02_cif_schedule/01_geocode_table.ksql';
run script '/data/ksql/02_cif_schedule/01_schedule_raw.ksql';
run script '/data/ksql/02_cif_schedule/02_location.ksql';
PRINT 'TIPLOC_FLAT_KEYED' FROM BEGINNING LIMIT 1;
PRINT 'STANOX_GEO' FROM BEGINNING LIMIT 1;
run script '/data/ksql/02_cif_schedule/03_location_table.ksql';
run script '/data/ksql/02_cif_schedule/04_schedule.ksql';
PRINT 'SCHEDULE_00' FROM BEGINNING LIMIT 1;
run script '/data/ksql/02_cif_schedule/05_schedule_table.ksql';
If you get Avro schema for message values on topic TIPLOC_FLAT_KEYED does not exist in the Schema Registry. after running 02_tiploc.ksql, wait a few moments and then re-run it; the topic on which it's built is created and populated by the preceding step, and that may not have completed yet. You can also run PRINT 'SCHEDULE_02' FROM BEGINNING LIMIT 5; and wait until you get some data.
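If you'd rather not re-run by hand, a small polling loop works too. This is a sketch only (not part of the repo), assuming the Schema Registry is exposed on localhost:8081 and uses the default <topic>-value subject naming:

# Wait until the value schema for TIPLOC_FLAT_KEYED is registered,
# then re-run the dependent ksqlDB script.
until [ "$(curl -s -o /dev/null -w '%{http_code}' \
        http://localhost:8081/subjects/TIPLOC_FLAT_KEYED-value/versions/latest)" -eq 200 ]; do
  echo "$(date) Waiting for TIPLOC_FLAT_KEYED-value schema to be registered…"
  sleep 5
done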
ksql> SHOW STREAMS;
Stream Name | Kafka Topic | Key Format | Value Format | Windowed
------------------------------------------------------------------------------------------------------
CIF_RAW | CIF_FULL_DAILY | KAFKA | JSON | false
KSQL_PROCESSING_LOG | confluent_rmoff_02ksql_processing_log | KAFKA | JSON | false
LOCATION_GEOCODE | LOCATION_GEOCODE | KAFKA | AVRO | false
OPENCAGE | opencage | KAFKA | JSON | false
SCHEDULE_00 | SCHEDULE_00 | KAFKA | AVRO | false
STANOX_FLAT | STANOX_FLAT | KAFKA | AVRO | false
STANOX_GEO | STANOX_GEO | KAFKA | AVRO | false
TIPLOC_FLAT_KEYED | TIPLOC_FLAT_KEYED | KAFKA | AVRO | false
TIPLOC_FLAT_KEYED_DUP | TIPLOC_FLAT_KEYED_DUP | KAFKA | AVRO | false
------------------------------------------------------------------------------------------------------
SET 'auto.offset.reset' = 'earliest';
SELECT SCHEDULE_KEY,
TRAIN_STATUS,
POWER_TYPE,
SEATING_CLASSES,
ORIGIN_TPS_DESCRIPTION, ORIGIN_PUBLIC_DEPARTURE_TIME,
DESTINATION_TPS_DESCRIPTION, DESTINATION_PUBLIC_ARRIVAL_TIME
FROM SCHEDULE_00
WHERE ORIGIN_PUBLIC_DEPARTURE_TIME IS NOT NULL
EMIT CHANGES
LIMIT 1;
Y62982/2019-09-03/O | Passenger & Parcels (Permanent - WTT) | Electric Multiple Unit | Standard only | BRADFORD FORSTER SQUARE | 1841 | SKIPTON | 1922
Limit Reached
Query terminated
ksql> SHOW TABLES;
Table Name | Kafka Topic | Key Format | Value Format | Windowed
-----------------------------------------------------------------------------------
LOCATION_GEOCODE_T | LOCATION_GEOCODE | KAFKA | AVRO | false
SCHEDULE_T | SCHEDULE_00 | KAFKA | AVRO | false
STANOX | STANOX_GEO | KAFKA | AVRO | false
TIPLOC | TIPLOC_FLAT_KEYED | KAFKA | AVRO | false
TIPLOC_DUP | TIPLOC_FLAT_KEYED_DUP | KAFKA | AVRO | false
-----------------------------------------------------------------------------------
SET 'auto.offset.reset' = 'earliest';
SELECT TIPLOC_CODE,
STANOX,
TPS_DESCRIPTION,
OPENCAGE_TOTAL_RESULTS,
GEO_OSM_URL
FROM STANOX
WHERE OPENCAGE_TOTAL_RESULTS>0
EMIT CHANGES
LIMIT 1;
+---------------+-------+----------------+-----------------------+-----------+
|TIPLOC_CODE |STANOX |TPS_DESCRIPTION |OPENCAGE_TOTAL_RESULTS |GEO_OSM_URL|
+---------------+-------+----------------+-----------------------+-----------+
|SHDN |15831 |SHILDON |4 |null |
Limit Reached
Query terminated
- Create Kafka Connect connector(s):
./data/ingest/movements/00_ingest.sh
Check status:
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \ jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \ column -s : -t| sed 's/\"//g'| sort
source  |  source-activemq-networkrail-TRAIN_MVT_EA_TOC-01  |  RUNNING  |  RUNNING  |  io.confluent.connect.activemq.ActiveMQSourceConnector
source  |  source-activemq-networkrail-TRAIN_MVT_ED_TOC-01  |  RUNNING  |  RUNNING  |  io.confluent.connect.activemq.ActiveMQSourceConnector
source  |  source-activemq-networkrail-TRAIN_MVT_EM_TOC-01  |  RUNNING  |  RUNNING  |  io.confluent.connect.activemq.ActiveMQSourceConnector
source  |  source-activemq-networkrail-TRAIN_MVT_HB_TOC-01  |  RUNNING  |  RUNNING  |  io.confluent.connect.activemq.ActiveMQSourceConnector
Check there’s some data:
docker exec kafkacat kafkacat -b broker:29092 -t networkrail_TRAIN_MVT -C -c1

{"messageID":"ID:opendata-backend.rockshore.net-38745-1609843645734-11:1:2:32:1194"}{"messageID":"ID:opendata-backend.rockshore.net-38745-1609843645734-11:1:2:32:1194","messageType":"text","timestamp":1609868889020,"deliveryMode":2,"correlationID":null,"replyTo":null,"destination":{"destinationType":"topic","name":"TRAIN_MVT_EA_TOC"},"redelivered":false,"type":null,"expiration":1609869189020,"priority":4,"properties":{},"bytes":null,"map":null,"text":"[{\"header\":{\"msg_type\":\"0003\",\"source_dev_id\":\"\",\"user_id\":\"\",\"original_data_source\":\"SMART\",\"msg_queue_timestamp\":\"1609868887000\",\"source_system_id\":\"TRUST\"},\"body\":{\"event_type\":\"DEPARTURE\",\"gbtt_timestamp\":\"\",\"original_loc_stanox\":\"\",\"planned_timestamp\":\"1609868910000\",\"timetable_variation\":\"1\",\"original_loc_timestamp\":\"\",\"current_train_id\":\"\",\"delay_monitoring_point\":\"false\",\"next_report_run_time\":\"2\",\"reporting_stanox\":\"00000\",\"actual_timestamp\":\"1609868820000\",\"correction_ind\":\"false\",\"event_source\":\"AUTOMATIC\",\"train_file_address\":null,\"platform\":\"\",\"division_code\":\"20\",\"train_terminated\":\"false\",\"train_id\":\"129M30MP05\",\"offroute_ind\":\"false\",\"variation_status\":\"EARLY\",\"train_service_code\":\"21731000\",\"toc_id\":\"20\",\"loc_stanox\":\"35439\",\"auto_expected\":\"true\",\"direction_ind\":\"DOWN\",\"route\":\"2\",\"planned_event_type\":\"DEPARTURE\",\"next_report_stanox\":\"36605\",\"line_ind\":\"\"}},{\"header\":{\"msg_type\":\"0003\",\"source_dev_id\":\"\",\"user_id\":\"\",\"original_data_source\":\"SMART\",\"msg_queue_timestamp\":\"1609868887000\",\"source_system_id\":\"TRUST\"},\"body\":{\"event_type\":\"DEPARTURE\",\"gbtt_timestamp\":\"\",\"original_loc_stanox\":\"\",\"planned_timestamp\":\"1609868940000\",\"timetable_variation\":\"2\",\"original_loc_timestamp\":\"\",\"current_train_id\":\"\",\"delay_monitoring_point\":\"true\",\"next_report_run_time\":\"6\",\"reporting_stanox\":\"16602\",\"actual_timestamp\":\"1609868820000\",\"correction_ind\":\"false\",\"event_source\":\"AUTOMATIC\",\"train_file_address\":null,\"platform\":\" 2\",\"division_code\":\"20\",\"train_terminated\":\"false\",\"train_id\":\"191K28MU05\",\"offroute_ind\":\"false\",\"variation_status\":\"EARLY\",\"train_service_code\":\"21733000\",\"toc_id\":\"20\",\"loc_stanox\":\"16602\",\"auto_expected\":\"true\",\"direction_ind\":\"UP\",\"route\":\"1\",\"planned_event_type\":\"DEPARTURE\",\"next_report_stanox\":\"17112\",\"line_ind\":\"M\"}}]"}
A hacky way to keep the connector running by restarting it after network glitches etc.:

while [ 1 -eq 1 ]; do
  ./data/ingest/movements/check_latest_timestamp_mac.sh
  ./data/ingest/movements/restart_failed_connector_tasks.sh
  sleep 300
done
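For reference, restarting failed tasks boils down to calls against the Kafka Connect REST API. A sketch of the idea (the bundled restart_failed_connector_tasks.sh is the canonical version; this assumes Connect on localhost:8083 as above):

# Find FAILED tasks and POST a restart for each one.
curl -s "http://localhost:8083/connectors?expand=status" |
  jq -r '.[] | .status.name as $n
         | .status.tasks[]
         | select(.state=="FAILED")
         | "\($n)/tasks/\(.id)"' |
  while read -r path; do
    echo "Restarting ${path}"
    curl -s -X POST "http://localhost:8083/connectors/${path}/restart"
  done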
- Set the pipeline running to split out payload batches into single messages:
./data/ingest/movements/01_explode.sh &
Group tm_explode rebalanced (memberid rdkafka-c53a4270-e767-493a-b5de-2244b389e645): assigned: networkrail_TRAIN_MVT [0]
% Reached end of topic networkrail_TRAIN_MVT [0] at offset 189
…
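Conceptually, each source message's text field holds a JSON array of movement events, and the explode step emits one Kafka message per array element (onto networkrail_TRAIN_MVT_X). A rough one-shot illustration of the transformation using the same kafkacat container:

# Illustration only: parse one envelope and split its batched payload.
docker exec kafkacat kafkacat -b broker:29092 -t networkrail_TRAIN_MVT -C -c1 | \
  jq -c '.text | fromjson | .[]'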
Check the data on the target topic:

$ docker exec kafkacat kafkacat -b broker:29092 -t networkrail_TRAIN_MVT_X -C -c1 | jq '.'
{
  "header": {
    "msg_type": "0003",
    "source_dev_id": "",
    "user_id": "",
    "original_data_source": "SMART",
    "msg_queue_timestamp": "1567674217000",
    "source_system_id": "TRUST"
  },
  "body": {
    "event_type": "DESTINATION",
    "gbtt_timestamp": "1567677780000",
    "original_loc_stanox": "",
    "planned_timestamp": "1567677660000",
    "timetable_variation": "4",
    "original_loc_timestamp": "",
    "current_train_id": "",
    "delay_monitoring_point": "true",
    "next_report_run_time": "",
    "reporting_stanox": "54311",
    "actual_timestamp": "1567677900000",
    "correction_ind": "false",
    "event_source": "AUTOMATIC",
    "train_file_address": null,
    "platform": " 1",
    "division_code": "61",
    "train_terminated": "true",
    "train_id": "121Y14M605",
    "offroute_ind": "false",
    "variation_status": "LATE",
    "train_service_code": "21700001",
    "toc_id": "61",
    "loc_stanox": "54311",
    "auto_expected": "true",
    "direction_ind": "UP",
    "route": "0",
    "planned_event_type": "DESTINATION",
    "next_report_stanox": "",
    "line_ind": ""
  }
}
RUN SCRIPT '/data/ksql/03_movements/01_canx_reason.ksql';
RUN SCRIPT '/data/ksql/03_movements/01_movement_raw.ksql';
RUN SCRIPT '/data/ksql/03_movements/02_activations.ksql';
PRINT 'TRAIN_ACTIVATIONS_00' FROM BEGINNING LIMIT 1;
RUN SCRIPT '/data/ksql/03_movements/03_activations_table.ksql';
RUN SCRIPT '/data/ksql/03_movements/04_cancellations_nway.ksql';
RUN SCRIPT '/data/ksql/03_movements/04_movements_nway.ksql';
SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') as ACTUAL_TIMESTAMP,
EVENT_TYPE,
LOC_NLCDESC,
PLATFORM,
VARIATION,
TOC,
TRAIN_ID,
GEOHASH
FROM TRAIN_MOVEMENTS
WHERE ORIGIN_TPS_DESCRIPTION = 'LEEDS'
EMIT CHANGES;
+---------------------+-----------+-------------------+-----------+------------+-----------------------------+-----------+----------------------+
|ACTUAL_TIMESTAMP |EVENT_TYPE |LOC_NLCDESC |PLATFORM |VARIATION |TOC |TRAIN_ID |GEOHASH |
+---------------------+-----------+-------------------+-----------+------------+-----------------------------+-----------+----------------------+
|2021-01-25 11:16:09 |DEPARTURE |LEEDS |Platform 6 |ON TIME |London North Eastern Railway |171A26MI25 |gcwfhcebdvxtmzmvpgye |
|2021-01-25 11:17:44 |DEPARTURE |WHITEHALL JN | |1 MINS LATE |London North Eastern Railway |171A26MI25 |gcwfh2xcgev4kgnq8t02 |
|2021-01-25 11:18:04 |DEPARTURE |HOLBECK JUNCTION | |1 MINS LATE |London North Eastern Railway |171A26MI25 |gcwcuxs5fnfj87uf6hx9 |
|2021-01-25 11:25:54 |ARRIVAL |WAKEFIELD WESTGATE |Platform 1 |1 MINS LATE |London North Eastern Railway |171A26MI25 |gcwcmsxdb89yec9h6um3 |
|2021-01-25 11:29:09 |DEPARTURE |WAKEFIELD WESTGATE |Platform 1 |ON TIME |London North Eastern Railway |171A26MI25 |gcwcmsxdb89yec9h6um3 |
Regarding activations:

Most trains are called automatically (auto-call) before the train is due to run, either one or two hours ahead depending on the train's class. The TRUST mainframe runs an internal process every 30 seconds throughout the day, so potentially two batches of train activation messages are received every minute.

Therefore, at the point at which you start the pipeline there may be movement messages for trains whose activation message was sent before the pipeline started. Those movements won't be linked to schedules, because activations provide the conduit between the two.
Once all the pipelines are up and running, execute ./data/configure_topics.sh to set the retention period to 26 weeks on each topic.
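Under the covers this amounts to setting retention.ms per topic; 26 weeks is 15724800000 ms. A hand-rolled equivalent for a single topic might look like this (assuming the broker container is named broker, matching the broker:29092 listener used elsewhere in this stack):

# 26 weeks x 7 days x 24 h x 3600 s x 1000 = 15724800000 ms
docker exec broker kafka-configs --bootstrap-server broker:29092 \
  --alter --entity-type topics --entity-name networkrail_TRAIN_MVT_X \
  --add-config retention.ms=15724800000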
Set up the sink connectors:
./data/egress/elasticsearch/00_create_template.sh
./data/egress/elasticsearch/01_create_sinks.sh
./data/egress/elasticsearch/02_create_kibana_metadata.sh
Status
./data/egress/elasticsearch/list_indices_stats.sh
Connectors
----------
sink-elastic-schedule_02-v01 | RUNNING | RUNNING
sink-elastic-train_cancellations_02-v01 | RUNNING | RUNNING
sink-elastic-train_cancellations_activations_schedule_00-v01 | RUNNING | RUNNING
sink-elastic-train_movements_01-v01 | RUNNING | RUNNING
sink-elastic-train_movements_activations_schedule_00-v01 | RUNNING | RUNNING
Indices and doc count
---------------------
train_movements_01 0
train_movements_activations_schedule_00 0
train_cancellations_activations_schedule_00 0
train_cancellations_02 0
schedule_02 42529
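For a quick manual check without the script, you can hit Elasticsearch's cat API directly (assuming the default localhost:9200 port mapping):

# Document counts for the indices populated by the sinks
curl -s "http://localhost:9200/_cat/indices/train_*,schedule_*?h=index,docs.count"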
- Explore the data in Kibana's Discover view.
- Use Kibana's Management → Saved Objects → Import option to import the /data/egress/elasticsearch/kibana_objects.json file.
Create the Postgres sink connector:

./data/egress/postgres/00_create_sink.sh
$ docker-compose exec postgres bash -c 'echo "select count(*) from \"TRAIN_MOVEMENTS_ACTIVATIONS_SCHEDULE_00\";" | psql -U $POSTGRES_USER $POSTGRES_DB'
count
-------
450
(1 row)
SELECT "ACTUAL_TIMESTAMP", to_timestamp("ACTUAL_TIMESTAMP"/1000) FROM "TRAIN_MOVEMENTS_ACTIVATIONS_SCHEDULE_00" ORDER BY "ACTUAL_TIMESTAMP" DESC LIMIT 5;