See the full article here: env-data/data-wrangling-with-kafka-and-ksql.adoc
ccloud topic create current-datetime
# Current time
# Registers the "source_rest_current-datetime" connector with the Kafka Connect
# REST API on localhost:8083. The connector issues a GET to worldclockapi.com
# every 60000 ms and routes the payload to the "current-datetime" topic.
# rest.source.properties holds the request headers as comma-separated
# name:value pairs (exactly one colon between header name and value).
curl -i -X POST -H "Accept:application/json" \
     -H "Content-Type:application/json" http://localhost:8083/connectors/ \
     -d '{
  "name": "source_rest_current-datetime",
  "config": {
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "connector.class": "com.tm.kafka.connect.rest.RestSourceConnector",
    "tasks.max": "1",
    "rest.source.poll.interval.ms": "60000",
    "rest.source.method": "GET",
    "rest.source.url": "http://worldclockapi.com/api/json/utc/now",
    "rest.source.payload.converter.class": "com.tm.kafka.connect.rest.converter.StringPayloadConverter",
    "rest.source.properties": "Content-Type:application/json,Accept:application/json",
    "rest.source.topic.selector": "com.tm.kafka.connect.rest.selector.SimpleTopicSelector",
    "rest.source.destination.topics": "current-datetime"
  }
}'
Check status:
# Prints one "name | connector state | task state(s)" row per connector:
#   1. GET /connectors and unpack the returned array with jq '.[]'.
#   2. xargs substitutes each name into a GET /connectors/<name>/status call.
#   3. jq joins name, connector state and every task state with ":|:".
#   4. column splits on ":" to align the fields, leaving "|" as the visible
#      separator; sed strips the double quotes; sort orders the rows.
$ curl -s "http://localhost:8083/connectors"| jq '.[]'| xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status"| jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| column -s : -t| sed 's/\"//g'| sort
source_rest_current-datetime | RUNNING | RUNNING
source_rest_flood-monitoring-L2404 | RUNNING | RUNNING
Check the data:
"2018-08-07T09:55Z"
"2018-08-07T09:56Z"
#!/bin/bash
# Registers the "source-rest-weather_york" connector with the Kafka Connect
# REST API on localhost:8083. The connector issues a GET to the OpenWeatherMap
# current-weather endpoint for York,uk every 900000 ms and routes the payload
# to the "york-weather" topic.
#
# The API key is taken from the OPENWEATHERMAP_API_KEY environment variable so
# that the credential is not stored in this file; the ${VAR:?} expansion makes
# the shell abort with a message instead of posting a config without a key.
# The single-quoted JSON is closed and reopened around the expansion because
# variables are not expanded inside single quotes.
#
# rest.source.properties holds the request headers as comma-separated
# name:value pairs (exactly one colon between header name and value).
curl -i -X POST -H "Accept:application/json" \
     -H "Content-Type:application/json" http://localhost:8083/connectors/ \
     -d '{
  "name": "source-rest-weather_york",
  "config": {
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "connector.class": "com.tm.kafka.connect.rest.RestSourceConnector",
    "tasks.max": "1",
    "rest.source.poll.interval.ms": "900000",
    "rest.source.method": "GET",
    "rest.source.url": "https://api.openweathermap.org/data/2.5/weather?q=York,uk&appid='"${OPENWEATHERMAP_API_KEY:?export OPENWEATHERMAP_API_KEY before running}"'",
    "rest.source.payload.converter.class": "com.tm.kafka.connect.rest.converter.StringPayloadConverter",
    "rest.source.properties": "Content-Type:application/json,Accept:application/json",
    "rest.source.topic.selector": "com.tm.kafka.connect.rest.selector.SimpleTopicSelector",
    "rest.source.destination.topics": "york-weather"
  }
}'
-- Declares the `weather` stream over the JSON-formatted `london-weather` topic.
-- Columns: `weather` (array of structs with icon, description, main, id),
-- `main` (struct with temp, pressure, humidity), `visibility`,
-- `wind` (struct with speed, deg) and `name`.
-- NOTE(review): the only weather connector shown in these notes writes to
-- `york-weather`; the sample rows below are for London, so a London connector
-- is presumably registered elsewhere. Confirm the intended topic.
ksql> create stream weather (weather array<struct<icon varchar ,description varchar, main varchar,id int>>,main struct<temp double,pressure bigint, humidity bigint>, visibility bigint, wind struct<speed double,deg int>,name varchar) with (kafka_topic='london-weather',value_format='json');
Message
----------------
Stream created
----------------
-- Selects the name, the whole first struct of the `weather` array, the raw
-- main->temp value, and main->temp minus 273.15 aliased as temp_c
-- (presumably a Kelvin-to-Celsius conversion; confirm the unit the API returns).
ksql> select name,weather[0], main->temp, main->temp - 273.15 as temp_c from weather;
London | {ICON=04n, DESCRIPTION=overcast clouds, MAIN=Clouds, ID=804} | 291.64 | 18.49000000000001
London | {ICON=04n, DESCRIPTION=overcast clouds, MAIN=Clouds, ID=804} | 291.63 | 18.480000000000018
London | {ICON=04n, DESCRIPTION=overcast clouds, MAIN=Clouds, ID=804} | 291.69 | 18.54000000000002
^CQuery terminated
-- Selects the name, only the `description` field of the first `weather` array
-- element, the raw main->temp value, and main->temp minus 273.15 aliased as
-- temp_c (presumably a Kelvin-to-Celsius conversion; confirm the API's unit).
ksql> select name,weather[0]->description, main->temp, main->temp - 273.15 as temp_c from weather;
London | overcast clouds | 291.64 | 18.49000000000001
London | overcast clouds | 291.63 | 18.480000000000018
London | overcast clouds | 291.69 | 18.54000000000002
ccloud topic create iex-stock-aapl-quote
ccloud topic create iex-stock-aapl-company
# IEX AAPL quote
# https://iextrading.com/developer/
# Registers the "source_rest_iex-stock-aapl-quote" connector with the Kafka
# Connect REST API on localhost:8083. The connector issues a GET to the IEX
# quote endpoint for AAPL every 60000 ms and routes the payload to the
# "iex-stock-aapl-quote" topic.
# rest.source.properties holds the request headers as comma-separated
# name:value pairs (exactly one colon between header name and value).
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '{
  "name": "source_rest_iex-stock-aapl-quote",
  "config": {
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "connector.class": "com.tm.kafka.connect.rest.RestSourceConnector",
    "tasks.max": "1",
    "rest.source.method": "GET",
    "rest.source.payload.converter.class": "com.tm.kafka.connect.rest.converter.StringPayloadConverter",
    "rest.source.properties": "Content-Type:application/json,Accept:application/json",
    "rest.source.topic.selector": "com.tm.kafka.connect.rest.selector.SimpleTopicSelector",
    "rest.source.url": "https://api.iextrading.com/1.0/stock/aapl/quote",
    "rest.source.poll.interval.ms": "60000",
    "rest.source.destination.topics": "iex-stock-aapl-quote"
  }
}'
# IEX AAPL company
# https://iextrading.com/developer/
# Registers the "source_rest_iex-stock-aapl-company" connector with the Kafka
# Connect REST API on localhost:8083. The connector issues a GET to the IEX
# company endpoint for AAPL every 600000 ms and routes the payload to the
# "iex-stock-aapl-company" topic.
# rest.source.properties holds the request headers as comma-separated
# name:value pairs (exactly one colon between header name and value).
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '{
  "name": "source_rest_iex-stock-aapl-company",
  "config": {
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "connector.class": "com.tm.kafka.connect.rest.RestSourceConnector",
    "tasks.max": "1",
    "rest.source.method": "GET",
    "rest.source.payload.converter.class": "com.tm.kafka.connect.rest.converter.StringPayloadConverter",
    "rest.source.properties": "Content-Type:application/json,Accept:application/json",
    "rest.source.topic.selector": "com.tm.kafka.connect.rest.selector.SimpleTopicSelector",
    "rest.source.url": "https://api.iextrading.com/1.0/stock/aapl/company",
    "rest.source.poll.interval.ms": "600000",
    "rest.source.destination.topics": "iex-stock-aapl-company"
  }
}'
Check the data:
$ ccloud consume --from-beginning --topic iex-stock-aapl-quote
{"symbol":"AAPL","companyName":"Apple Inc.","primaryExchange":"Nasdaq Global Select","sector":"Technology","calculationPrice":"close","open":207.93,"openTime":1533562200581,"close":209.07,"closeTime":1533585600168,"high":209.25,"low":207.07,"latestPrice":209.07,"latestSource":"Close","latestTime":"August 6, 2018","latestUpdate":1533585600168,"latestVolume":25390079,"iexRealtimePrice":null,"iexRealtimeSize":null,"iexLastUpdated":null,"delayedPrice":209.06,"delayedPriceTime":1533585600229,"extendedPrice":209.02,"extendedChange":-0.05,"extendedChangePercent":-0.00024,"extendedPriceTime":1533589186272,"previousClose":207.99,"change":1.08,"changePercent":0.00519,"iexMarketPercent":null,"iexVolume":null,"avgTotalVolume":23922439,"iexBidPrice":null,"iexBidSize":null,"iexAskPrice":null,"iexAskSize":null,"marketCap":1009792628820,"peRatio":20.18,"week52High":209.25,"week52Low":149.16,"ytdChange":0.22852624924298495}
Check the data:
$ ccloud consume --from-beginning --topic iex-stock-aapl-company
{"symbol":"AAPL","companyName":"Apple Inc.","exchange":"Nasdaq Global Select","industry":"Computer Hardware","website":"http://www.apple.com","description":"Apple Inc is designs, manufactures and markets mobile communication and media devices and personal computers, and sells a variety of related software, services, accessories, networking solutions and third-party digital content and applications.","CEO":"Timothy D. Cook","issueType":"cs","sector":"Technology","tags":["Technology","Consumer Electronics","Computer Hardware"]}