【Test Background】
Kafka is used to stream real-time data into an ArgoSlipstream holodesk table. The CDC scenario captures database change-log data in real time and synchronizes it to an ArgoDB table, ensuring data timeliness and consistency. This scenario suits applications with strict requirements on synchronization latency: insert, update, and delete operations are captured from the source database and delivered to the target system, keeping the target consistent with the source. Change-log data from OGG, Canal, Debezium, and similar tools serves as the data source, and the change events are pushed to ArgoDB for seamless synchronization.
【Applicable Versions】
ArgoDB 3.2.1: ArgoSlipstream 3.2.1 + kafka 7.0.1 + shiva 1.11.4
ArgoDB 3.2.2: with kafka 7.0.1
ArgoDB 5.1.0: with kafka 7.0.1
ArgoDB 5.2.0: with eventstore 910
ArgoDB 6.0: with eventstore 9.3
【JAR Used for Parsing】
First, download the jar matching your version:
TDH 6.2: formatter-8.0.2.jar
ArgoDB 5.2: formatter-8.31.2.jar
TDH 9.3: formatter-8.37.3.jar
ArgoDB 6: formatter-8.37.3.jar
You can register the jar at session level with add jar, but it is best to persist the jar into the slipstream image; otherwise a ClassNotFound error may occur when you start the streamjob.
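For a quick test, a session-level registration sketch in beeline (the path is an assumption; substitute the actual location of the downloaded jar on the server):
add jar /path/to/formatter-8.37.3.jar;  -- hypothetical path; persist the jar into the image for anything long-running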
【Test Steps】
1. Create a Kafka topic
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/eventstore1/conf/jaas.conf -Djava.security.krb5.conf=/etc/eventstore1/conf/krb5.conf"
Enter the client directory:
cd /TDH-Client/kafka/bin
Create the topic:
./kafka-topics.sh --bootstrap-server 172.22.37.181:9092,172.22.37.182:9092,172.22.37.183:9092 --create --topic demo1 --partitions 1 --replication-factor 1 --command-config /root/TDH-Client/kafka/config/security-client.properties
Verify that the topic was created:
./kafka-topics.sh --list --bootstrap-server 172.22.37.181:9092,172.22.37.182:9092,172.22.37.183:9092 --command-config /root/TDH-Client/kafka/config/security-client.properties
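You can also inspect the partition and replica assignment of the new topic (a sketch reusing the same brokers and security config):
./kafka-topics.sh --describe --topic demo1 --bootstrap-server 172.22.37.181:9092,172.22.37.182:9092,172.22.37.183:9092 --command-config /root/TDH-Client/kafka/config/security-client.properties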
For related commands, see "Common Kafka client script commands across TDH versions": https://kb.transwarp.cn/posts/11406
2. Create the stream/streamjob in argo-slipstream
Connect to the slipstream database:
beeline -u "jdbc:hive2://172.22.37.181:10010/default" -n hive -p 123456
Create the stream:
CREATE STREAM lj.stream1 (
  colum1 string DEFAULT NULL COMMENT '',
  colum2 bigint DEFAULT NULL COMMENT '',
  op_type string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001'
TBLPROPERTIES (
  'kafka.zookeeper'='172.22.37.181:2181,172.22.37.182:2181,172.22.37.183:2181',
  'kafka.broker.list'='172.22.37.181:9092,172.22.37.182:9092,172.22.37.183:9092',
  'kafka.decoder'='io.transwarp.slipstream.api.decoder.dbz.DBZBinlogDecoder', -- one of the decoders used in CDC scenarios
  'kafka.decoder.columns.type'='string/bigint/string',
  'kafka.decoder.columns'='colum1/colum2/op_type',
  'transwarp.consumer.security.protocol'='SASL_PLAINTEXT',
  'transwarp.consumer.sasl.kerberos.service.name'='kafka',
  'transwarp.consumer.sasl.jaas.config'='com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/etc/slipstream1/conf/test.keytab\" principal=\"test@argodb6x\"',
  'topic'='demo1');
After it is created, you can check it with show streams;
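To double-check the column mapping, the stream can also be described (a sketch, assuming DESC works on streams the same way it does on tables):
desc lj.stream1;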
Create the streamjob. The /*+USE_DELETE_MARK*/ hint, together with the delete_mark column computed from op_type, lets delete events in the CDC stream remove the corresponding rows from the target table:
drop streamjob test_streamjob1;
create streamjob test_streamjob1 as(
"insert /*+USE_DELETE_MARK*/ into lj.out_t1 select colum1, colum2, case when op_type='D' then true else false end as delete_mark from lj.stream1"
) jobproperties ("morphling.hdfs.flush.interval.ms"="1000","morphling.job.enable.checkpoint"="true");
Create the target holodesk table:
CREATE TABLE lj.out_t1 (
  colum1 string DEFAULT NULL COMMENT '',
  colum2 bigint DEFAULT NULL COMMENT '')
CLUSTERED BY (colum2) INTO 9 BUCKETS
STORED AS HOLODESK
TBLPROPERTIES ('holodesk.rowkey'='colum2');
Start the streamjob:
start streamjob test_streamjob1;
You can check the running streamjobs with list streamjob.
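When the test is finished, the job can be stopped before dropping it (a sketch, assuming the usual stop streamjob syntax):
stop streamjob test_streamjob1;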
3. Produce and consume data with Kafka
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/eventstore1/conf/jaas.conf -Djava.security.krb5.conf=/etc/eventstore1/conf/krb5.conf"
Produce data:
./kafka-console-producer.sh --broker-list 172.22.37.181:9092,172.22.37.182:9092,172.22.37.183:9092 --topic demo1 --producer.config /root/TDH-Client/kafka/config/security-client.properties
Open another shell window.
Consume data:
./kafka-console-consumer.sh --bootstrap-server 172.22.37.181:9092,172.22.37.182:9092,172.22.37.183:9092 --topic demo1 --consumer.config /root/TDH-Client/kafka/config/security-client.properties
Manually type some data into the producer shell.
Example:
{"before":null,"after":{"colum1":"3","colum2":5656},"source":{"version":"1.9.0-SNAPSHOT","connector":"mysql","name":"9982bb3099ec459fbf53775194cb98cf","ts_ms":1730269165000,"snapshot":"false","db":"test_rt","sequence":null,"positionInfo":"{\n \"filename\" : \"mysql-bin.000007\",\n \"position\" : 217432819\n}","schemaVersion":"1","tableIdentifier":"7A7FE622BA3B2082CC612771B915EA89","table":"realtime_table","server_id":1,"gtid":null,"file":"mysql-bin.000007","pos":217432957,"nextPos":217433004,"row":0,"thread":23539,"query":null},"op":"c","syncLabel":"2024103014159982bb3099ec459fbf53775194cb98cf","ts_ms":1730269165411,"transaction":null}

The consumer shell receives the data in real time.

4. Query the holodesk table to see the streamed data.
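A quick check from the beeline session, using the target table created above:
select * from lj.out_t1;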
