Parsing JSON in Slipstream CDC Scenarios and Writing to Holodesk (Newer Versions)


【Test Background】

Kafka is used to stream real-time data into Holodesk tables via ArgoSlipstream. The CDC scenario captures database change-log data in real time and synchronizes it into ArgoDB tables, ensuring data timeliness and consistency. It fits applications with strict requirements on synchronization latency: insert, update, and delete operations captured from the source database are delivered to the target system, keeping the target consistent with the source. Change-log data from tools such as OGG, Canal, or Debezium serves as the data source, and the change events are pushed into ArgoDB for seamless synchronization.

【Applicable Versions】

ArgoDB 3.2.1: ArgoSlipstream 3.2.1 + Kafka 7.0.1 + Shiva 1.11.4

ArgoDB 3.2.2: paired with Kafka 7.0.1

ArgoDB 5.1.0: paired with Kafka 7.0.1

ArgoDB 5.2.0: paired with eventstore 910

ArgoDB 6.0: paired with eventstore 9.3

【JAR Used for Parsing】

First, download the jar matching your version:

TDH 6.2: formatter-8.0.2.jar
ArgoDB 5.2: formatter-8.31.2.jar
TDH 9.3: formatter-8.37.3.jar
ArgoDB 6: formatter-8.37.3.jar

You can load it with ADD JAR, but it is best to persist the jar into the Slipstream image; otherwise starting the streamjob may fail with a ClassNotFound error.
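For a quick test, the jar can be registered in the current beeline session before the stream is created. A minimal sketch, assuming the jar was downloaded to /tmp (adjust the path to wherever you placed it):

add jar /tmp/formatter-8.37.3.jar;

Since this registration only lives for the session, persisting the jar into the image remains the more reliable option for long-running jobs.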

【Test Steps】

1. Create a Kafka topic

Set the security options for the Kafka client:
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/eventstore1/conf/jaas.conf -Djava.security.krb5.conf=/etc/eventstore1/conf/krb5.conf"

Enter the client directory:
cd /TDH-Client/kafka/bin

Create the topic:
./kafka-topics.sh --bootstrap-server 172.22.37.181:9092,172.22.37.182:9092,172.22.37.183:9092 --create --topic demo1 --partitions 1 --replication-factor 1 --command-config /root/TDH-Client/kafka/config/security-client.properties

Verify that the topic was created successfully:
 ./kafka-topics.sh --list --bootstrap-server 172.22.37.181:9092,172.22.37.182:9092,172.22.37.183:9092 --command-config /root/TDH-Client/kafka/config/security-client.properties
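
To also check partition and replica placement, kafka-topics.sh supports --describe (same brokers and security config as above):

./kafka-topics.sh --describe --topic demo1 --bootstrap-server 172.22.37.181:9092,172.22.37.182:9092,172.22.37.183:9092 --command-config /root/TDH-Client/kafka/config/security-client.properties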

For related commands, see: Common Kafka client script commands for each TDH version, https://kb.transwarp.cn/posts/11406

2. Create the stream/streamjob in argo-slipstream

Connect to the Slipstream database:
beeline -u "jdbc:hive2://172.22.37.181:10010/default" -n hive -p 123456

Create the stream:
CREATE STREAM lj.stream1 (
  colum1 string DEFAULT NULL COMMENT '',
  colum2 bigint DEFAULT NULL COMMENT '',
  op_type string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001'
TBLPROPERTIES (
  'kafka.zookeeper'='172.22.37.181:2181,172.22.37.182:2181,172.22.37.183:2181',
  'kafka.broker.list'='172.22.37.181:9092,172.22.37.182:9092,172.22.37.183:9092',
  'kafka.decoder'='io.transwarp.slipstream.api.decoder.dbz.DBZBinlogDecoder',  -- one of the decoders used for CDC scenarios
  'kafka.decoder.columns.type'='string/bigint/string',  -- types of the decoded columns, in order
  'kafka.decoder.columns'='colum1/colum2/op_type',      -- decoded column names, matching the stream definition
  'transwarp.consumer.security.protocol'='SASL_PLAINTEXT',
  'transwarp.consumer.sasl.kerberos.service.name'='kafka',
  'transwarp.consumer.sasl.jaas.config'='com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/etc/slipstream1/conf/test.keytab\" principal=\"test@argodb6x\"',
  'topic'='demo1');

Once created, you can view it with show streams;.

Create the streamjob:
drop streamjob test_streamjob1;
create streamjob test_streamjob1 as
("insert /*+USE_DELETE_MARK*/ into lj.out_t1 select colum1,colum2,case when op_type='D' then true else false end as delete_mark from lj.stream1")
jobproperties ("morphling.hdfs.flush.interval.ms"="1000","morphling.job.enable.checkpoint"="true");

With the /*+USE_DELETE_MARK*/ hint, rows whose op_type is 'D' carry delete_mark=true, so delete events from the source database remove the matching rows from the target table instead of being appended as new rows.

Create the target Holodesk table:
CREATE TABLE lj.out_t1 (
  colum1 string DEFAULT NULL COMMENT '',
  colum2 bigint DEFAULT NULL COMMENT '')
CLUSTERED BY (colum2) INTO 9 BUCKETS
STORED AS HOLODESK
TBLPROPERTIES ('holodesk.rowkey'='colum2');

Start the streamjob:
start streamjob test_streamjob1;

You can view the running streamjobs with list streamjob.
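
Once the test is complete, the job can be shut down. A minimal sketch, assuming this Slipstream version also supports the stop streamjob statement:

list streamjob;
stop streamjob test_streamjob1;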

3. Produce and consume data with Kafka

Set the security options for the Kafka client in this shell as well:
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/eventstore1/conf/jaas.conf -Djava.security.krb5.conf=/etc/eventstore1/conf/krb5.conf"

Produce data:
./kafka-console-producer.sh --broker-list 172.22.37.181:9092,172.22.37.182:9092,172.22.37.183:9092 --topic demo1 --producer.config  /root/TDH-Client/kafka/config/security-client.properties

Open another shell window (exporting KAFKA_OPTS there as well) and consume the data:
./kafka-console-consumer.sh --bootstrap-server 172.22.37.181:9092,172.22.37.182:9092,172.22.37.183:9092 --topic demo1  --consumer.config  /root/TDH-Client/kafka/config/security-client.properties

Manually type some records into the producer shell. Example:

{"before":null,"after":{"colum1":"3","colum2":5656},"source":{"version":"1.9.0-SNAPSHOT","connector":"mysql","name":"9982bb3099ec459fbf53775194cb98cf","ts_ms":1730269165000,"snapshot":"false","db":"test_rt","sequence":null,"positionInfo":"{\n \"filename\" : \"mysql-bin.000007\",\n \"position\" : 217432819\n}","schemaVersion":"1","tableIdentifier":"7A7FE622BA3B2082CC612771B915EA89","table":"realtime_table","server_id":1,"gtid":null,"file":"mysql-bin.000007","pos":217432957,"nextPos":217433004,"row":0,"thread":23539,"query":null},"op":"c","syncLabel":"2024103014159982bb3099ec459fbf53775194cb98cf","ts_ms":1730269165411,"transaction":null}

The consumer shell receives the data in real time.

4. Query the Holodesk table; the streamed data should now be visible.
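A quick check from the same beeline session (the rows returned should match what was produced above):

select * from lj.out_t1;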
