Skip to content

lzshlzsh/hbase-replication-endpoint

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

3 Commits
 
 
 
 
 
 
 
 

Repository files navigation

背景

目前,Flink 社区 [1] 和云厂商 [2] 对于 HBase 作为数据源时,只支持批模式和维表,暂无增量数据读取方案。HBase WAL 日志类似 MySQL binlog,记录了 HBase 表的变更数据,但没有提供直接读取 WAL 日志的接口,Flink 直接读取 WAL 日志暂时不可行。这里采用自定义 HBase ReplicationEndpoint 的方案,引入中间存储,间接实现 Flink 读取 HBase 增量数据。

HBase 可以基于 WAL 复制 Cluster Replication[3] 功能将实时写入的数据从主集群复制到备份集群,且 HBase 支持自定义 ReplicationEndpoint,因此我们可以通过自定义 ReplicationEndpoint 来把增量数据推送至其它存储(例如 Kafka),业界有相关的实践[4]。WAL 推送 Kafa 后,我们可以通过 flink-connector-kafka 来消费 kafka 中的 WAL 日志,从而实现读取 HBase 增量数据的目的。

自定义 HBase ClusterReplicationEndpoint

本示例的自定义 ClusterReplicationEndpoint 参考自[4],并做了简化,通过直接继承自 HBaseInterClusterReplicationEndpoint,并 mock 一些方法来实现。这里暂未实现写 kafka 的功能,只是在日志中输出 WAL 信息。

  1. 编译项目

编译后得到 hbase-replication-endpoint-1.0.0.jar 包,放入 $HBASE_HOME/lib 目录下。

mvn pacakge
  1. HBase 表创建表

hbase shell 中执行

create 'person', {NAME=>'info',REPLICATION_SCOPE => '1'}

注意:REPLICATION_SCOPE 属于设置为 1,表示开启复制

  1. 创建复制链路

hbase shell 中执行

add_peer '111', CLUSTER_KEY => 'localhost:2181:/fake_hbase', ENDPOINT_CLASSNAME => 'com.tencent.cloud.oceanus.hbase.replication.KafkaInterClusterReplicationEndpoint', SERIAL => true, CONFIG => {"bootstrap-servers" => "127.0.0.1:9092", "topic" => "test" }, TABLE_CFS => { "person" => []}

注意:

  • HBase 2 必须设置 CLUSTER_KEY,可设置为一个假的 zookeeper 路径
  • SERIAL 设置为 true,开启串行复制
  1. 往示例表中写入数据、删除数据,观察 RegionServer 日志输出

hbase shell 中执行

put 'person', '1', 'info:c1', 'v1'
delete 'person', '1', 'info:c1'

日志输出如下

img

参考资料

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages