概要描述
本文档主要介绍如何通过datax工具读写argodb表。
版本要求:
ArgoDB版本>=6, Patch版本>=20250930
详细介绍
一. 配置与准备
1. DataX 程序包
官方 github 下载并解压,按要求准备 DataX 的运行环境依赖
2. ArgoDB DataX 插件包
注:${version} 根据实际使用的包进行调整
在 ArgoDB patch 的配套工具里获取以下包:
argodb_client/argodb-datax/argodbwriter-${version}.jar
argodb_client/argodb-datax/argodbreader-${version}.jar
argodb_client/argodb-sink-client/quark-driver-${version}.jar
argodb_client/argodb-sink-client/argodb-sink-client-${version}-all-in-one.jar
在 datax 路径下创建目录并放置需要的包
cd datax
# 如需【写】ArgoDB 的功能,执行以下操作
mkdir -p plugin/writer/argodbwriter/libs
cp argodbwriter-${version}.jar plugin/writer/argodbwriter/
cp argodb-sink-client-${version}-all-in-one.jar plugin/writer/argodbwriter/libs/
cp quark-driver-${version}.jar plugin/writer/argodbwriter/libs/
# 如需【读】ArgoDB 的功能,执行以下操作
mkdir -p plugin/reader/argodbreader/libs
cp argodbreader-${version}.jar plugin/reader/argodbreader/
# 将对应版本 quark 镜像中 /usr/lib/inceptor/lib 的所有 jar 包准备好(后续会优化需要包的列表)
cp /usr/lib/inceptor/lib下的所有jar包 plugin/reader/argodbreader/libs/
3. ArgoDB DataX 插件配置
将 writer_plugin.json 文件放置在 datax/plugin/writer/argodbwriter/ 目录下并重命名为 plugin.json
将 reader_plugin.json 文件放置在 datax/plugin/reader/argodbreader/ 目录下并重命名为 plugin.json
4. 网络配置
DataX 进程需要能访问以下端口,需要开放的端口清单:
访问 Quark
- 配置项 hive.server2.thrift.port 指定的端口
访问 TDDMS
- 以配置项 master.rpc_service.master_service_port 开始的连续4个端口
- 以配置项 tabletserver.rpc_service.manage_service_port 开始的连续4个端口
二. DataX Job 配置
1. 写任务
部分参数说明:
# preSqlList - 执行写入前需要执行的 SQL 列表(如创建分区、清理等操作)
# postSqlList - 写入完成后需要执行的 SQL 列表
# compressionType - 写入数据的压缩方式;可选值有: SNAPPY, NOTCOMPRESSION, LZF, ZLIB,默认为 SNAPPY
# largeStringEnabled - 是否开启大 string 对象插入功能,默认 false;等价于执行 SQL 时的参数 argodb.insert.large.string.enabled
# maxLargeStringSize - 支持的最大的 string 大小,单位为 bytes,默认 3145728,只有当 largeStringEnabled 为 true 时才有效;等价于执行 SQL 时的参数 argodb.max.large.string.size
# columnList - 写入 Holodesk 表时,指定插入哪些列
{
"name": "argodbwriter",
"parameter": {
"quarkTableName": "",
"connection": {
"quarkUrl": "",
"quarkUserName": "",
"quarkPasswd": ""
},
"sink": {
"largeStringEnabled": false,
"maxLargeStringSize": 3145728,
"compressionType": "SNAPPY",
"columnList": []
},
"sql": {
"preSqlList": [],
"postSqlList": []
}
}
}
2. 读任务
部分参数说明:
# neededColumns - 需要读哪些列,使用半角逗号将多个列名串连;如果需要读所有列,设置为 *;
# 数据输出时,会按照表原始的列顺序输出,不论 neededColumns 参数如何定义
{
"name": "argodbreader",
"parameter": {
"quarkTableName": "",
"neededColumns": "",
"connection": {
"quarkUrl": "",
"quarkUserName": "",
"quarkPasswd": ""
}
}
}
三. 场景举例
场景1. 读argodb写argodb
以下任务从 default.out_tbl 表中读取数据,写入到 default.in_tbl 表中。(两张表均为 Holodesk 表)
创建测试表
DROP TABLE IF EXISTS default.out_tbl;
DROP TABLE IF EXISTS default.in_tbl;
CREATE TABLE default.out_tbl (a INT,b STRING ) STORED AS HOLODESK ;
CREATE TABLE default.in_tbl (a INT,b STRING ) STORED AS HOLODESK ;
INSERT INTO default.out_tbl SELECT 1,'a' FROM system.dual;
INSERT INTO default.out_tbl SELECT 2,'b' FROM system.dual;
job.json 的内容如下:
{
"job": {
"setting": {
"speed": {
"channel": 10
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [{
"reader": {
"name": "argodbreader",
"parameter": {
"quarkTableName": "default.out_tbl",
"neededColumns": "*",
"connection": {
"quarkUrl": "jdbc:hive2://kv1:10000/default;guardianToken=so1GPbLhVmYJ459orice-KTDH?use.linac=false",
"quarkUserName": "",
"quarkPasswd": ""
}
}
},
"writer": {
"name": "argodbwriter",
"parameter": {
"quarkTableName": "default.in_tbl",
"connection": {
"quarkUrl": "jdbc:hive2://kv1:10000/default;guardianToken=so1GPbLhVmYJ459orice-KTDH?use.linac=false",
"quarkUserName": "",
"quarkPasswd": ""
},
"sink": {
"largeStringEnabled": false,
"maxLargeStringSize": 3145728,
"compressionType": "SNAPPY",
"columnList": []
},
"sql": {
"preSqlList": [],
"postSqlList": []
}
}
}
}
]
}
}
Job 启动命令如下:
python datax/bin/datax.py job.json
执行完成之后,检查default.in_tbl表导入成功
场景2. 读oracle写argodb
以下任务从oracle 11g 的emp表中读取数据,写入到 default.emp_holo 表中。
oracle端 创建测试表
DROP TABLE EMP;
CREATE TABLE EMP(
EMPNO int,
ENAME varchar2(255),
JOB varchar2(255),
MGR INT,
HIREDATE DATE,
SAL INT,
COMM INT,
DEPTNO INT
);
INSERT INTO EMP VALUES (7369,'SMITH','CLERK',7902,to_date('17-12-1980','dd-mm-yyyy'),800,NULL,20);
INSERT INTO EMP VALUES (7499,'ALLEN','SALESMAN',7698,to_date('20-2-1981','dd-mm-yyyy'),1600,300,30);
INSERT INTO EMP VALUES (7521,'WARD','SALESMAN',7698,to_date('22-2-1981','dd-mm-yyyy'),1250,500,30);
INSERT INTO EMP VALUES (7566,'JONES','MANAGER',7839,to_date('2-4-1981','dd-mm-yyyy'),2975,NULL,20);
INSERT INTO EMP VALUES (7654,'MARTIN','SALESMAN',7698,to_date('28-9-1981','dd-mm-yyyy'),1250,1400,30);
oracleToArgodb.json 的内容如下:
{
"job": {
"setting": {
"speed": {
"channel": 10
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [{
"reader": {
"name": "oraclereader",
"parameter": {
"username": "lkw",
"password": "123456",
"connection": [{
"querySql": ["select * from emp"],
"jdbcUrl": ["jdbc:oracle:thin:@172.18.131.175:1521/helowin"]
}
],
}
},
"writer": {
"name": "argodbwriter",
"parameter": {
"quarkTableName": "default.emp_holo",
"connection": {
"quarkUrl": "jdbc:hive2://kv1:10000/default;guardianToken=so1GPbLhVmYJ459orice-KTDH?use.linac=false",
"quarkUserName": "",
"quarkPasswd": ""
},
"sink": {
"largeStringEnabled": false,
"maxLargeStringSize": 3145728,
"compressionType": "SNAPPY",
"columnList": []
},
"sql": {
"preSqlList": [],
"postSqlList": []
}
}
}
}
]
}
}
Job 启动命令如下:
python ./datax.py oracleToArgodb.json
执行完成之后,检查emp_holo表导入成功
注意事项(必读)
1.写入 ArgoDB 时,不保证原子性,需要用户自行处理错误情况(如:部分 Task 失败,那么成功的 Task 会把数据写入 ArgoDB,需要用户能自己进行错误恢复);
2.对于 blob/clob 类型,仅支持 performance 表,建表时需要设置 set argodb.performance.complex.types.enabled=true;对于 blob 类型的写入,目前仅支持能转成 string 的二进制值(后续会完善);
3.目前不支持读分区表;
4.在使用 writer 的时,需要用户保证从 reader 读到的输出,列数量和顺序与写入的目标表一致。在未指定 columnList 参数时,reader 读出的列的数量和顺序必须和 Holodesk 表的表定义一致;指定 columnList 参数时,reader 读出的列的数量和顺序必须和 columnList 指定的一致。
5.udf不支持linac,所以遇到 Check Dependency Version failed: EXECUTION FAILED: Task MAPRED-SPARK error PlanException: [Error 1] Found no matched plan 的报错,有可能是全局开启了linac,执行 SELECT showArgodbVersion() 报错导致的,可以在jdbc连接串后面加上?use.linac=false session级关闭linac。