Skip to content

leochinaliu/flink-connector-kafka-extend

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

19 Commits
 
 
 
 
 
 
 
 
 
 

Repository files navigation

背景

官方table sql kafka-connector 在无数据流入时watermark不能自动上涨,在做基于eventtime的temporal join时由于维度表的数据一般是很少变动的,导致watermark不能上涨,数据无法输出。 本项目是基于官方的kafka-connector上作的扩展,解决了上述问题,通过新增2个选项可定义当kakfa数据源空闲指定时间后,水印自动上涨至某个时间点。

使用方法

本项目新增2种类型的kafka-connector:kafka-extendupsert-kafka-extend,分别是在官方kafkaupsert-kafka 2种connector上做的扩展,新增2个选项:

  • extend.idleTimeOut:空闲超时时间,单位秒,超过此时间水印将自动上涨,默认5分钟
  • extend.watermarkInterval:水印上涨到离当前时间的间隔,单位秒,默认10分钟。比如设置为10分钟,当前时间2021-05-01 12:30,当空闲时间已到,watermark会上涨至2021-05-01 12:20

使用示例:

CREATE TABLE movie_test (
    _id String COMMENT '影片UUID',--
    movie_id INT COMMENT '影片id',
    update_time TIMESTAMP COMMENT '更新时间',
    PRIMARY KEY (_id) NOT ENFORCED,
    WATERMARK FOR update_time AS update_time - INTERVAL '10' MINUTE
)
WITH (
  'connector' = 'upsert-kafka-extend',
  'extend.idleTimeOut' = '300',
  'extend.watermarkInterval' = '600',
  'topic' = 'test-topic',
  'properties.bootstrap.servers' = 'kafka-address',
  'properties.group.id' = 'test-group',
  'key.format' = 'json',
  'value.format' = 'json'
)

About

extend flink-connector-kafka

Topics

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages