forked from confluentinc/demo-scene
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdefine_source_data.ksql
80 lines (77 loc) · 3.19 KB
/
define_source_data.ksql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
-- York rainfall
DROP STREAM flood_monitoring_059793;
CREATE STREAM flood_monitoring_059793 \
(meta STRUCT<publisher VARCHAR, \
comment VARCHAR>, \
items STRUCT<eaRegionName VARCHAR, \
gridReference VARCHAR, \
lat DOUBLE, \
long DOUBLE, \
measures STRUCT<label VARCHAR, \
latestReading STRUCT<\
dateTime VARCHAR, \
value DOUBLE>,\
parameterName VARCHAR, \
unitName VARCHAR>, \
stationReference VARCHAR,\
town VARCHAR,\
eaAreaName VARCHAR,\
RiverName VARCHAR,\
label VARCHAR> \
) WITH (KAFKA_TOPIC='flood-monitoring-059793',VALUE_FORMAT='JSON');
DROP STREAM flood_monitoring_L2404;
CREATE STREAM flood_monitoring_L2404 \
(meta STRUCT<publisher VARCHAR, \
comment VARCHAR>, \
items STRUCT<eaRegionName VARCHAR, \
gridReference VARCHAR, \
lat DOUBLE, \
long DOUBLE, \
measures STRUCT<label VARCHAR, \
latestReading STRUCT<\
dateTime VARCHAR, \
value DOUBLE>,\
parameterName VARCHAR, \
unitName VARCHAR>, \
stationReference VARCHAR,\
town VARCHAR,\
eaAreaName VARCHAR,\
RiverName VARCHAR,\
label VARCHAR> \
) WITH (KAFKA_TOPIC='flood-monitoring-L2404',VALUE_FORMAT='JSON');
DROP STREAM flood_monitoring_L2481;
CREATE STREAM flood_monitoring_L2481 \
(meta STRUCT<publisher VARCHAR, \
comment VARCHAR>, \
items STRUCT<eaRegionName VARCHAR, \
gridReference VARCHAR, \
lat DOUBLE, \
long DOUBLE, \
measures STRUCT<label VARCHAR, \
latestReading STRUCT<\
dateTime VARCHAR, \
value DOUBLE>,\
parameterName VARCHAR, \
unitName VARCHAR>, \
stationReference VARCHAR,\
town VARCHAR,\
eaAreaName VARCHAR,\
RiverName VARCHAR,\
label VARCHAR> \
) WITH (KAFKA_TOPIC='flood-monitoring-L2481',VALUE_FORMAT='JSON');
DROP STREAM flood_monitoring_3680;
CREATE STREAM flood_monitoring_3680 \
(meta STRUCT<publisher VARCHAR, \
comment VARCHAR>, \
items STRUCT<eaRegionName VARCHAR, \
gridReference VARCHAR, \
lat DOUBLE, \
long DOUBLE, \
measures ARRAY<STRUCT<label VARCHAR, \
latestReading STRUCT<\
dateTime VARCHAR, \
value DOUBLE>,\
parameterName VARCHAR, \
unitName VARCHAR>>, \
stationReference VARCHAR> \
) WITH (KAFKA_TOPIC='flood-monitoring-3680',VALUE_FORMAT='JSON');