Storage Handler that allows users to connect/analyze/transform Kafka topics. The workflow is as follows:
- First, the user will create an external table that is a view over one Kafka topic
- Second, the user will be able to run any SQL query including write back to the same table or different Kafka backed table
Kafka Java client version: 2.x
This handler does not commit offsets of topic partition reads either using the intrinsic Kafka capability or in an external storage. This means a query over a Kafka topic backed table will be a full topic read unless partitions are filtered manually, via SQL, by the methods described below. In the ETL section, a method for storing topic offsets in Hive tables is provided for tracking consumer position but this is not a part of the handler itself.
Use the following statement to create a table:
CREATE EXTERNAL TABLE
kafka_table (
`timestamp` TIMESTAMP,
`page` STRING,
`newPage` BOOLEAN,
`added` INT,
`deleted` BIGINT,
`delta` DOUBLE)
STORED BY
'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
"kafka.topic" = "test-topic",
"kafka.bootstrap.servers" = "localhost:9092");
The table property kafka.topic
is the Kafka topic to connect to and kafka.bootstrap.servers
is the Kafka broker connection string.
Both properties are mandatory.
On the write path if such a topic does not exist the topic will be created if Kafka broker admin policy allows for
auto topic creation.
By default the serializer and deserializer is JSON, specifically org.apache.hadoop.hive.serde2.JsonSerDe
.
If you want to change the serializer/deserializer classes you can update the TBLPROPERTIES with SQL syntax ALTER TABLE
.
ALTER TABLE
kafka_table
SET TBLPROPERTIES (
"kafka.serde.class" = "org.apache.hadoop.hive.serde2.avro.AvroSerDe");
If you use Confluent's Avro serialzier or deserializer with the Confluent Schema Registry, you will need to remove five bytes from the beginning of each message. These five bytes represent a magic byte and a four-byte schema ID from the registry.
This can be done by setting "avro.serde.type"="skip"
and "avro.serde.skip.bytes"="5"
. In this case it is also recommended to set the Avro schema either via "avro.schema.url"="http://hostname/SimpleDocument.avsc"
or "avro.schema.literal"="{"type" : "record","name" : "SimpleRecord","..."}
. If both properties are set then avro.schema.literal
has higher priority.
List of supported serializers and deserializers:
Supported Serializers and Deserializers |
---|
org.apache.hadoop.hive.serde2.JsonSerDe |
org.apache.hadoop.hive.serde2.OpenCSVSerde |
org.apache.hadoop.hive.serde2.avro.AvroSerDe |
org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe |
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe |
In addition to the user defined column schema, this handler will append additional columns allowing the user to query the Kafka metadata fields:
__key
Kafka record key (byte array)__partition
Kafka record partition identifier (int 32)__offset
Kafka record offset (int 64)__timestamp
Kafka record timestamp (int 64)
List the table properties and all the partition/offsets information for the topic.
DESCRIBE EXTENDED
kafka_table;
Count the number of records where the record timestamp is within the last 10 minutes of query execution time.
SELECT
COUNT(*)
FROM
kafka_table
WHERE
`__timestamp` > 1000 * TO_UNIX_TIMESTAMP(CURRENT_TIMESTAMP - INTERVAL '10' MINUTES);
The storage handler allows these metadata fields to filter push-down read optimizations to Kafka. For instance, the query above will only read the records with timestamp satisfying the filter predicate. Please note that such time based filtering (Kafka consumer partition seek) is only viable if the Kafka broker version allows time based look up (Kafka 0.11 or later versions)
In addition to time based filtering, the storage handler reader is able to filter based on a
particular partition offset using the SQL WHERE clause.
Currently supports operators OR
and AND
with comparison operators <
, <=
, >=
, >
.
SELECT
COUNT(*)
FROM
kafka_table
WHERE
(`__offset` < 10 AND `__offset` >3 AND `__partition` = 0)
OR
(`__partition` = 0 AND `__offset` < 105 AND `__offset` > 99)
OR (`__offset` = 109);
Keep in mind that partitions can grow and shrink within the Kafka cluster without the consumer's knowledge. This partition and offset capability is good for replay of specific partitions when the consumer knows that something has gone wrong down stream or replay is required. Apache Hive users may or may not understand the underlying architecture of Kafka therefore, filtering on the record timestamp metadata column is arguably the best filter to use since it requires no partition knowledge.
The user can define a view to take of the last 15 minutes and mask what ever column as follows:
CREATE VIEW
last_15_minutes_of_kafka_table
AS
SELECT
`timestamp`,
`user`,
`delta`,
`added`
FROM
kafka_table
WHERE
`__timestamp` > 1000 * TO_UNIX_TIMESTAMP(CURRENT_TIMESTAMP - INTERVAL '15' MINUTES);
Join the Kafka topic to Hive table. For instance, assume you want to join the last 15 minutes of the topic to a dimension table with the following example:
CREATE TABLE
user_table (
`user` STRING,
`first_name` STRING,
`age` INT,
`gender` STRING,
`comments` STRING )
STORED AS ORC;
Join the view of the last 15 minutes to user_table
, group by the gender
column and compute aggregates
over metrics from the fact and dimension tables.
SELECT
SUM(`added`) AS `added`,
SUM(`deleted`) AS `deleted`,
AVG(`delta`) AS `delta`,
AVG(`age`) AS `avg_age`,
`gender`
FROM
last_15_minutes_of_kafka_table
JOIN
user_table ON
last_15_minutes_of_kafka_table.`user` = user_table.`user`
GROUP BY
`gender`
LIMIT 10;
In cases where you want to perform some ad-hoc analysis over the last 15 minutes of topic data, you can join it on itself. In the following example, we show how you can perform classical user retention analysis over the Kafka topic.
-- Topic join over the view itself
-- The example is adapted from https://www.periscopedata.com/blog/how-to-calculate-cohort-retention-in-sql
-- Assuming l15min_wiki is a view of the last 15 minutes based on the topic's timestamp record metadata
SELECT
COUNT(DISTINCT `activity`.`user`) AS `active_users`,
COUNT(DISTINCT `future_activity`.`user`) AS `retained_users`
FROM
l15min_wiki AS activity
LEFT JOIN
l15min_wiki AS future_activity
ON
activity.`user` = future_activity.`user`
AND
activity.`timestamp` = future_activity.`timestamp` - INTERVAL '5' MINUTES;
-- Topic to topic join
-- Assuming wiki_kafka_hive is the entire topic
SELECT
FLOOR_HOUR(activity.`timestamp`),
COUNT(DISTINCT activity.`user`) AS `active_users`,
COUNT(DISTINCT future_activity.`user`) AS retained_users
FROM
wiki_kafka_hive AS activity
LEFT JOIN
wiki_kafka_hive AS future_activity
ON
activity.`user` = future_activity.`user`
AND
activity.`timestamp` = future_activity.`timestamp` - INTERVAL '1' HOUR
GROUP BY
FLOOR_HOUR(activity.`timestamp`);
Property | Description | Mandatory | Default |
---|---|---|---|
kafka.topic | Kafka topic name to map the table to. | Yes | null |
kafka.bootstrap.servers | Table property indicating Kafka broker(s) connection string. | Yes | null |
kafka.serde.class | Serializer and Deserializer class implementation. | No | org.apache.hadoop.hive.serde2.JsonSerDe |
hive.kafka.poll.timeout.ms | Parameter indicating Kafka Consumer poll timeout period in millis. FYI this is independent from internal Kafka consumer timeouts. | No | 5000 (5 Seconds) |
hive.kafka.max.retries | Number of retries for Kafka metadata fetch operations. | No | 6 |
hive.kafka.metadata.poll.timeout.ms | Number of milliseconds before consumer timeout on fetching Kafka metadata. | No | 30000 (30 Seconds) |
kafka.write.semantic | Writer semantics, allowed values (AT_LEAST_ONCE, EXACTLY_ONCE) | No | AT_LEAST_ONCE |
hive.kafka.ssl.credential.keystore | Location of credential store that holds SSL credentials. Used to avoid plaintext passwords in table properties | No | |
hive.kafka.ssl.truststore.password | The key in the credential store used to retrieve the truststore password. This is NOT the password itself. | No | |
hive.kafka.ssl.keystore.password | The key in the credential store used to retrieve the keystore password. This is NOT the password itself. Used for 2-way auth. | No | |
hive.kafka.ssl.key.password | The key in the credential store used to retrieve the key password. This is NOT the password itself. | No | |
hive.kafka.ssl.truststore.location | The location of the SSL truststore. Requires HDFS for queries that require jobs. Kafka requires this to be local, so pull it down. | No | |
hive.kafka.ssl.keystore.location | The location of the SSL keystore. Requires HDFS for queries that require jobs. Kafka requires this to be local, so pull it down. | No |
The user can create SSL connections to Kafka, via the properties described in the table properties. These properties are used to retrieve passwords from a credential store to avoid being in plaintext table properties. To ensure security, the credential store should have appropriate permissions applied. Clients that query the table without being able to read the credentials store will have the query fail.
Normally, the <consumer/producer>.ssl.truststore.location
and <consumer/producer>.ssl.keystore.location
would have to be local. Any job that requires a
job can retrieve these from an HDFS location, and they will be sourced from HDFS and pulled locally for this purpose.
The producer and consumer stores are both sourced from the same property, e.g. hive.kafka.ssl.truststore.location
, rather than kafka.consumer.ssl.truststore.location
.
Table creation is very simple, simply create a table as normal, and supply the appropriate Kafka
configs (e.g. kafka.consumer.security.protocol
), along with the credential store configs (e.g. hive.kafka.ssl.credential.store
).
CREATE EXTERNAL TABLE
kafka_ssl (
`data` STRING
)
STORED BY
'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
"kafka.topic" = "test-topic",
"kafka.bootstrap.servers" = 'localhost:9093',
'hive.kafka.ssl.credential.keystore'='jceks://hdfs/tmp/test.jceks',
'hive.kafka.ssl.keystore.password'='keystore.password',
'hive.kafka.ssl.truststore.password'='truststore.password',
'kafka.consumer.security.protocol'='SSL',
'hive.kafka.ssl.keystore.location'='hdfs://cluster/tmp/keystore.jks',
'hive.kafka.ssl.truststore.location'='hdfs://cluster/tmp/keystore.jks'
);
Now we can query the table as normal.
SELECT * FROM kafka_ssl LIMIT 10;
Our truststore and keystore are located in HDFS, which means we can also run more complex queries that result in jobs. These will still connect from Kafka as expected.
SELECT `data` FROM kafka_ssl where `__offset` > 0 AND `__offset` < 10000 group by `data`;
The user can inject custom Kafka consumer/producer properties via the table properties.
To do so, the user can add any key/value pair of Kafka config to the Hive table property
by prefixing the key with kafka.consumer
for consumer configs and kafka.producer
for producer configs.
For instance the following alter table query adds the table property "kafka.consumer.max.poll.records" = "5000"
and will inject max.poll.records=5000
to the Kafka Consumer.
ALTER TABLE
kafka_table
SET TBLPROPERTIES
("kafka.consumer.max.poll.records" = "5000");
In this example we will load topic data only once. The goal is to read data and commit both data and offsets in a single Transaction
First, create the offset table.
DROP TABLE
kafka_table_offsets;
CREATE TABLE
kafka_table_offsets (
`partition_id` INT,
`max_offset` BIGINT,
`insert_time` TIMESTAMP);
Initialize the table
INSERT OVERWRITE TABLE
kafka_table_offsets
SELECT
`__partition`,
MIN(`__offset`) - 1,
CURRENT_TIMESTAMP
FROM
wiki_kafka_hive
GROUP BY
`__partition`,
CURRENT_TIMESTAMP;
Create the final Hive table for warehouse use,
DROP TABLE
orc_kafka_table;
CREATE TABLE
orc_kafka_table (
`partition_id` INT,
`koffset` BIGINT,
`ktimestamp` BIGINT,
`timestamp` TIMESTAMP,
`page` STRING,
`user` STRING,
`diffurl` STRING,
`isrobot` BOOLEAN,
`added` INT,
`deleted` INT,
`delta` BIGINT)
STORED AS ORC;
This is an example that inserts up to offset = 2 only.
FROM
wiki_kafka_hive ktable
JOIN
kafka_table_offsets offset_table
ON (
ktable.`__partition` = offset_table.`partition_id`
AND
ktable.`__offset` > offset_table.`max_offset`
AND
ktable.`__offset` < 3 )
INSERT INTO TABLE
orc_kafka_table
SELECT
`__partition`,
`__offset`,
`__timestamp`,
`timestamp`,
`page`,
`user`,
`diffurl`,
`isrobot`,
`added`,
`deleted`,
`delta`
INSERT OVERWRITE TABLE
kafka_table_offsets
SELECT
`__partition`,
max(`__offset`),
CURRENT_TIMESTAMP
GROUP BY
`__partition`,
CURRENT_TIMESTAMP;
Double check the insert.
SELECT
max(`koffset`)
FROM
orc_kafka_table
LIMIT 10;
SELECT
COUNT(*) AS `c`
FROM
orc_kafka_table
GROUP BY
`partition_id`,
`koffset`
HAVING
`c` > 1;
Conduct this as data becomes available on the topic.
FROM
wiki_kafka_hive ktable
JOIN
kafka_table_offsets offset_table
ON (
ktable.`__partition` = offset_table.`partition_id`
AND
ktable.`__offset` > `offset_table.max_offset`)
INSERT INTO TABLE
orc_kafka_table
SELECT
`__partition`,
`__offset`,
`__timestamp`,
`timestamp`,
`page`,
`user`,
`diffurl`,
`isrobot`,
`added`,
`deleted`,
`delta`
INSERT OVERWRITE TABLE
kafka_table_offsets
SELECT
`__partition`,
max(`__offset`),
CURRENT_TIMESTAMP
GROUP BY
`__partition`,
CURRENT_TIMESTAMP;
First create the table in Hive that will be the target table. Now all the inserts will go to the topic mapped by this table. Be aware that the Avro SerDe used below is regular Apache Avro (with schema) and not Confluent serialized Avro which is popular with Kafka usage
CREATE EXTERNAL TABLE
moving_avg_wiki_kafka_hive (
`channel` STRING,
`namespace` STRING,
`page` STRING,
`timestamp` TIMESTAMP,
`avg_delta` DOUBLE)
STORED BY
'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES(
"kafka.topic" = "moving_avg_wiki_kafka_hive_2",
"kafka.bootstrap.servers"="localhost:9092",
-- STORE AS AVRO IN KAFKA
"kafka.serde.class"="org.apache.hadoop.hive.serde2.avro.AvroSerDe");
Then, insert data into the table. Keep in mind that Kafka is append only thus you can not use insert overwrite.
INSERT INTO TABLE
moving_avg_wiki_kafka_hive
SELECT
`channel`,
`namespace`,
`page`,
`timestamp`,
avg(`delta`) OVER (ORDER BY `timestamp` ASC ROWS BETWEEN 60 PRECEDING AND CURRENT ROW) AS `avg_delta`,
null AS `__key`,
null AS `__partition`,
-1,
-1
FROM
l15min_wiki;