官方table sql kafka-connector 在无数据流入时watermark不能自动上涨,在做基于eventtime的temporal join
时由于维度表的数据一般是很少变动的,导致watermark
不能上涨,数据无法输出。
本项目是基于官方的kafka-connector上作的扩展,解决了上述问题,通过新增2个选项可定义当kakfa数据源空闲指定时间后,水印自动上涨至某个时间点。
本项目新增2种类型的kafka-connector:kafka-extend
和upsert-kafka-extend
,分别是在官方kafka
和upsert-kafka
2种connector上做的扩展,新增2个选项:
- extend.idleTimeOut:空闲超时时间,单位秒,超过此时间水印将自动上涨,默认5分钟
- extend.watermarkInterval:水印上涨到离当前时间的间隔,单位秒,默认10分钟。比如设置为10分钟,当前时间2021-05-01 12:30,当空闲时间已到,watermark会上涨至2021-05-01 12:20
使用示例:
CREATE TABLE movie_test (
_id String COMMENT '影片UUID',--√
movie_id INT COMMENT '影片id',
update_time TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (_id) NOT ENFORCED,
WATERMARK FOR update_time AS update_time - INTERVAL '10' MINUTE
)
WITH (
'connector' = 'upsert-kafka-extend',
'extend.idleTimeOut' = '300',
'extend.watermarkInterval' = '600',
'topic' = 'test-topic',
'properties.bootstrap.servers' = 'kafka-address',
'properties.group.id' = 'test-group',
'key.format' = 'json',
'value.format' = 'json'
)