概述
本文介绍如何通过 DataX 的 rdbmsreader / rdbmswriter 通用插件,以 JDBC 方式读写 ArgoDB 表数据。该方案适用于 ArgoDB 5.x 版本,利用 ArgoDB 兼容 Hive Server2 的 JDBC 接口完成数据同步。
版本选择指引
- **ArgoDB 5.x**:使用本文所述 JDBC 方式(rdbmsreader / rdbmswriter)
- **ArgoDB 6.x 及以上**:推荐使用专用的 argodbwriter / argodbreader 插件,性能更优、配置更简捷,详见 → [DataX 读写 ArgoDB(自定义插件方式)](https://kb.transwarp.cn/posts/18088)
环境准备
1. 部署 DataX
从 DataX 官方 GitHub 下载最新稳定版并解压,确保运行环境满足 DataX 依赖要求(JDK 1.8+、Python 2.7+)。
2. 部署 ArgoDB JDBC 驱动
从 Manager 页面获取与当前 Quark 版本匹配的 JDBC 驱动包(quark-driver-x.x.x.jar),将其放置到 DataX 对应插件目录下:
cd ${DATAX_HOME}
# 写 ArgoDB → 放入 rdbmswriter 插件目录
cp quark-driver-8.37.3.jar plugin/writer/rdbmswriter/libs/
# 读 ArgoDB → 放入 rdbmsreader 插件目录
cp quark-driver-8.37.3.jar plugin/reader/rdbmsreader/libs/
驱动包版本需与集群中 Quark 版本保持一致,否则可能出现兼容性问题。请从 Manager 页面下载对应版本,勿混用。
3. 注册驱动类
分别编辑以下两个 plugin.json 文件,在 drivers 数组中追加 io.transwarp.jdbc.InceptorDriver:
${DATAX_HOME}/plugin/writer/rdbmswriter/plugin.json
${DATAX_HOME}/plugin/reader/rdbmsreader/plugin.json
以 rdbmswriter 为例,修改后的 plugin.json:
{
"name": "rdbmswriter",
"class": "com.alibaba.datax.plugin.reader.rdbmswriter.RdbmsWriter",
"description": "useScene: prod. mechanism: Jdbc connection using the database, execute select sql, retrieve data from the ResultSet.",
"developer": "alibaba",
"drivers": [
"dm.jdbc.driver.DmDriver",
"com.sybase.jdbc3.jdbc.SybDriver",
"com.edb.Driver",
"com.ibm.db2.jcc.DB2Driver",
"io.transwarp.jdbc.InceptorDriver"
]
}
验证驱动注册
执行以下命令确认驱动类已包含在 jar 包中:
grep -nir "io.transwarp.jdbc.InceptorDriver" plugin/writer/rdbmswriter/libs/quark-driver-8.37.3.jar
4. 网络连通性
确保 DataX 进程所在节点可以访问:
| 目标 | 说明 |
|---|---|
| Quark Server 节点 | DataX 需通过 JDBC 连接 Quark Server |
hive.server2.thrift.port 端口 |
默认 10000,以实际集群配置为准 |
可通过以下命令快速验证连通性:
telnet 10000
参数参考
写入参数(rdbmswriter)
rdbmswriter 通过 JDBC 连接目标数据库,执行 INSERT INTO 语句批量写入数据。
| 参数 | 说明 | 必选 | 默认值 |
|---|---|---|---|
jdbcUrl |
目标数据库的 JDBC 连接地址,遵循 ArgoDB JDBC 规范,格式为 jdbc:hive2://<host>:<port>/<db>;guardianToken=<token> |
✅ | — |
username |
数据库用户名 | ✅ | — |
password |
数据库密码 | ✅ | — |
table |
目标表名;若 schema 与 username 不一致,需使用 schema.table 格式 |
✅ | — |
column |
需写入的列名集合,以英文逗号分隔 | ✅ | — |
preSql |
写入前执行的 SQL 语句(仅支持单条),如清除旧数据 | ❌ | — |
postSql |
写入后执行的 SQL 语句(仅支持单条),如添加时间戳标记 | ❌ | — |
batchSize |
单次批量提交的记录数,值越大网络交互越少、吞吐越高,但需注意内存占用 | ❌ | 1024 |
📖 完整参数说明参见 rdbmswriter 官方文档
读取参数(rdbmsreader)
rdbmsreader 通过 JDBC 连接源数据库,根据配置生成 SELECT 查询并拉取数据。
| 参数 | 说明 | 必选 | 默认值 |
|---|---|---|---|
jdbcUrl |
源数据库的 JDBC 连接地址 | ✅ | — |
username |
数据库用户名 | ✅ | — |
password |
数据库密码 | ✅ | — |
table |
需同步的源表名 | ✅ | — |
column |
需同步的列名集合,JSON 数组格式,["*"] 表示所有列 |
✅ | — |
splitPk |
数据分片字段,指定后 DataX 将启动并发任务提升同步效能 | ❌ | — |
where |
筛选条件,与 table、column 拼接为 SELECT 语句 |
❌ | — |
querySql |
自定义查询 SQL,配置后 table、column、where 将被忽略,适用于多表 JOIN 等复杂场景 |
❌ | — |
fetchSize |
每次从服务端批量获取的记录条数,影响网络交互次数 | ❌ | 1024 |
📖 完整参数说明参见 rdbmsreader 官方文档
使用示例
示例 1:Oracle → ArgoDB(读 Oracle 写 ArgoDB)
以下任务从 Oracle 11g 的 emp 表读取数据,写入 ArgoDB 的 default.emp_holo 表。
1)Oracle 端准备测试数据
DROP TABLE EMP;
CREATE TABLE EMP (
EMPNO INT,
ENAME VARCHAR2(255),
JOB VARCHAR2(255),
MGR INT,
HIREDATE DATE,
SAL INT,
COMM INT,
DEPTNO INT
);
INSERT INTO EMP VALUES (7369,'SMITH','CLERK',7902,TO_DATE('17-12-1980','dd-mm-yyyy'),800,NULL,20);
INSERT INTO EMP VALUES (7499,'ALLEN','SALESMAN',7698,TO_DATE('20-2-1981','dd-mm-yyyy'),1600,300,30);
INSERT INTO EMP VALUES (7521,'WARD','SALESMAN',7698,TO_DATE('22-2-1981','dd-mm-yyyy'),1250,500,30);
INSERT INTO EMP VALUES (7566,'JONES','MANAGER',7839,TO_DATE('2-4-1981','dd-mm-yyyy'),2975,NULL,20);
INSERT INTO EMP VALUES (7654,'MARTIN','SALESMAN',7698,TO_DATE('28-9-1981','dd-mm-yyyy'),1250,1400,30);
COMMIT;
2)编写 DataX Job 配置
创建 oracle_to_argodb.json:
{
"job": {
"setting": {
"speed": {
"channel": 10
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [{
"reader": {
"name": "oraclereader",
"parameter": {
"username": "",
"password": "",
"connection": [{
"querySql": ["SELECT * FROM emp"],
"jdbcUrl": ["jdbc:oracle:thin:@:1521/"]
}]
}
},
"writer": {
"name": "rdbmswriter",
"parameter": {
"connection": [{
"jdbcUrl": "jdbc:hive2://:10000/default;guardianToken=",
"table": ["emp_holo"]
}],
"username": "",
"password": "",
"table": "emp_holo",
"column": ["*"],
"preSql": []
}
}
}]
}
}
3)执行同步任务
python2 ${DATAX_HOME}/datax.py ./oracle_to_argodb.json
执行完成后,在 ArgoDB 中查询 emp_holo 表验证数据写入。
示例 2:ArgoDB → Oracle(读 ArgoDB 写 Oracle)
以下任务从 ArgoDB 的 default.emp_holo 表读取数据,写入 Oracle 11g 的 emp 表。
1)ArgoDB 端准备测试数据
DROP TABLE IF EXISTS emp_holo;
CREATE TABLE emp_holo (
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate DATE,
sal INT,
comm INT,
deptno INT
) CLUSTERED BY (empno) INTO 3 BUCKETS
STORED AS HOLODESK;
INSERT INTO emp_holo VALUES (7369,'SMITH','CLERK',7902,tdh_todate('17-12-1980','dd-mm-yyyy'),800,NULL,20);
2)编写 DataX Job 配置
创建 argodb_to_oracle.json:
{
"job": {
"setting": {
"speed": {
"channel": 10
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [{
"reader": {
"name": "rdbmsreader",
"parameter": {
"username": "",
"password": "",
"connection": [{
"querySql": ["SELECT * FROM emp_holo"],
"jdbcUrl": ["jdbc:hive2://:10000/default;guardianToken="]
}]
}
},
"writer": {
"name": "oraclewriter",
"parameter": {
"connection": [{
"jdbcUrl": "jdbc:oracle:thin:@:1521/",
"table": ["emp"]
}],
"username": "",
"password": "",
"column": ["*"],
"preSql": [],
"postSql": [],
"batchSize": 1024
}
}
}]
}
}
3)执行同步任务
python2 ${DATAX_HOME}/datax.py ./argodb_to_oracle.json
执行完成后,在 Oracle 中查询 emp 表验证数据写入。
常见问题
Q1:执行时报 ClassNotFoundException: io.transwarp.jdbc.InceptorDriver
原因:驱动 jar 包未正确放置,或 plugin.json 中未注册驱动类。
解决:
- 确认
quark-driver-x.x.x.jar存在于对应插件的libs/目录 - 确认
plugin.json的drivers数组中包含io.transwarp.jdbc.InceptorDriver - 使用
grep命令验证 jar 包中确实包含该驱动类
Q2:连接 ArgoDB 超时
原因:网络不通或端口未开放。
解决:
- 使用
telnet或nc测试 Quark Server 节点及端口连通性 - 检查防火墙规则是否放行
hive.server2.thrift.port(默认 10000) - 确认
jdbcUrl中的guardianToken参数有效
Q3:写入数据时出现 OOM
原因:batchSize 设置过大,导致内存溢出。
解决:适当降低 batchSize 值(如 512 或 256),在吞吐与内存之间取得平衡。