datax读写ArgoDB(rdbmsreader/rdbmswriter)

  其他常见问题
内容纲要

概述

本文介绍如何通过 DataX 的 rdbmsreader / rdbmswriter 通用插件,以 JDBC 方式读写 ArgoDB 表数据。该方案适用于 ArgoDB 5.x 版本,利用 ArgoDB 兼容 Hive Server2 的 JDBC 接口完成数据同步。

版本选择指引

- **ArgoDB 5.x**:使用本文所述 JDBC 方式(rdbmsreader / rdbmswriter)
- **ArgoDB 6.x 及以上**:推荐使用专用的 argodbwriter / argodbreader 插件,性能更优、配置更简捷,详见 → [DataX 读写 ArgoDB(自定义插件方式)](https://kb.transwarp.cn/posts/18088)

环境准备

1. 部署 DataX

DataX 官方 GitHub 下载最新稳定版并解压,确保运行环境满足 DataX 依赖要求(JDK 1.8+、Python 2.7+)。

2. 部署 ArgoDB JDBC 驱动

从 Manager 页面获取与当前 Quark 版本匹配的 JDBC 驱动包(quark-driver-x.x.x.jar),将其放置到 DataX 对应插件目录下:

cd ${DATAX_HOME}

# 写 ArgoDB → 放入 rdbmswriter 插件目录
cp quark-driver-8.37.3.jar plugin/writer/rdbmswriter/libs/

# 读 ArgoDB → 放入 rdbmsreader 插件目录
cp quark-driver-8.37.3.jar plugin/reader/rdbmsreader/libs/

驱动包版本需与集群中 Quark 版本保持一致,否则可能出现兼容性问题。请从 Manager 页面下载对应版本,勿混用。

3. 注册驱动类

分别编辑以下两个 plugin.json 文件,在 drivers 数组中追加 io.transwarp.jdbc.InceptorDriver

${DATAX_HOME}/plugin/writer/rdbmswriter/plugin.json
${DATAX_HOME}/plugin/reader/rdbmsreader/plugin.json

rdbmswriter 为例,修改后的 plugin.json

{
    "name": "rdbmswriter",
    "class": "com.alibaba.datax.plugin.reader.rdbmswriter.RdbmsWriter",
    "description": "useScene: prod. mechanism: Jdbc connection using the database, execute select sql, retrieve data from the ResultSet.",
    "developer": "alibaba",
    "drivers": [
        "dm.jdbc.driver.DmDriver",
        "com.sybase.jdbc3.jdbc.SybDriver",
        "com.edb.Driver",
        "com.ibm.db2.jcc.DB2Driver",
        "io.transwarp.jdbc.InceptorDriver"
    ]
}

验证驱动注册
执行以下命令确认驱动类已包含在 jar 包中:
grep -nir "io.transwarp.jdbc.InceptorDriver" plugin/writer/rdbmswriter/libs/quark-driver-8.37.3.jar

4. 网络连通性

确保 DataX 进程所在节点可以访问:

目标 说明
Quark Server 节点 DataX 需通过 JDBC 连接 Quark Server
hive.server2.thrift.port 端口 默认 10000,以实际集群配置为准

可通过以下命令快速验证连通性:

telnet  10000

参数参考

写入参数(rdbmswriter)

rdbmswriter 通过 JDBC 连接目标数据库,执行 INSERT INTO 语句批量写入数据。

参数 说明 必选 默认值
jdbcUrl 目标数据库的 JDBC 连接地址,遵循 ArgoDB JDBC 规范,格式为 jdbc:hive2://<host>:<port>/<db>;guardianToken=<token>
username 数据库用户名
password 数据库密码
table 目标表名;若 schema 与 username 不一致,需使用 schema.table 格式
column 需写入的列名集合,以英文逗号分隔
preSql 写入前执行的 SQL 语句(仅支持单条),如清除旧数据
postSql 写入后执行的 SQL 语句(仅支持单条),如添加时间戳标记
batchSize 单次批量提交的记录数,值越大网络交互越少、吞吐越高,但需注意内存占用 1024

📖 完整参数说明参见 rdbmswriter 官方文档

读取参数(rdbmsreader)

rdbmsreader 通过 JDBC 连接源数据库,根据配置生成 SELECT 查询并拉取数据。

参数 说明 必选 默认值
jdbcUrl 源数据库的 JDBC 连接地址
username 数据库用户名
password 数据库密码
table 需同步的源表名
column 需同步的列名集合,JSON 数组格式,["*"] 表示所有列
splitPk 数据分片字段,指定后 DataX 将启动并发任务提升同步效能
where 筛选条件,与 tablecolumn 拼接为 SELECT 语句
querySql 自定义查询 SQL,配置后 tablecolumnwhere 将被忽略,适用于多表 JOIN 等复杂场景
fetchSize 每次从服务端批量获取的记录条数,影响网络交互次数 1024

📖 完整参数说明参见 rdbmsreader 官方文档


使用示例

示例 1:Oracle → ArgoDB(读 Oracle 写 ArgoDB)

以下任务从 Oracle 11g 的 emp 表读取数据,写入 ArgoDB 的 default.emp_holo 表。

1)Oracle 端准备测试数据

DROP TABLE EMP;
CREATE TABLE EMP (
    EMPNO    INT,
    ENAME    VARCHAR2(255),
    JOB      VARCHAR2(255),
    MGR      INT,
    HIREDATE DATE,
    SAL      INT,
    COMM     INT,
    DEPTNO   INT
);

INSERT INTO EMP VALUES (7369,'SMITH','CLERK',7902,TO_DATE('17-12-1980','dd-mm-yyyy'),800,NULL,20);
INSERT INTO EMP VALUES (7499,'ALLEN','SALESMAN',7698,TO_DATE('20-2-1981','dd-mm-yyyy'),1600,300,30);
INSERT INTO EMP VALUES (7521,'WARD','SALESMAN',7698,TO_DATE('22-2-1981','dd-mm-yyyy'),1250,500,30);
INSERT INTO EMP VALUES (7566,'JONES','MANAGER',7839,TO_DATE('2-4-1981','dd-mm-yyyy'),2975,NULL,20);
INSERT INTO EMP VALUES (7654,'MARTIN','SALESMAN',7698,TO_DATE('28-9-1981','dd-mm-yyyy'),1250,1400,30);
COMMIT;

2)编写 DataX Job 配置

创建 oracle_to_argodb.json

{
    "job": {
        "setting": {
            "speed": {
                "channel": 10
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [{
            "reader": {
                "name": "oraclereader",
                "parameter": {
                    "username": "",
                    "password": "",
                    "connection": [{
                        "querySql": ["SELECT * FROM emp"],
                        "jdbcUrl": ["jdbc:oracle:thin:@:1521/"]
                    }]
                }
            },
            "writer": {
                "name": "rdbmswriter",
                "parameter": {
                    "connection": [{
                        "jdbcUrl": "jdbc:hive2://:10000/default;guardianToken=",
                        "table": ["emp_holo"]
                    }],
                    "username": "",
                    "password": "",
                    "table": "emp_holo",
                    "column": ["*"],
                    "preSql": []
                }
            }
        }]
    }
}

3)执行同步任务

python2 ${DATAX_HOME}/datax.py ./oracle_to_argodb.json

执行完成后,在 ArgoDB 中查询 emp_holo 表验证数据写入。


示例 2:ArgoDB → Oracle(读 ArgoDB 写 Oracle)

以下任务从 ArgoDB 的 default.emp_holo 表读取数据,写入 Oracle 11g 的 emp 表。

1)ArgoDB 端准备测试数据

DROP TABLE IF EXISTS emp_holo;
CREATE TABLE emp_holo (
    empno    INT,
    ename    STRING,
    job      STRING,
    mgr      INT,
    hiredate DATE,
    sal      INT,
    comm     INT,
    deptno   INT
) CLUSTERED BY (empno) INTO 3 BUCKETS
STORED AS HOLODESK;

INSERT INTO emp_holo VALUES (7369,'SMITH','CLERK',7902,tdh_todate('17-12-1980','dd-mm-yyyy'),800,NULL,20);

2)编写 DataX Job 配置

创建 argodb_to_oracle.json

{
    "job": {
        "setting": {
            "speed": {
                "channel": 10
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [{
            "reader": {
                "name": "rdbmsreader",
                "parameter": {
                    "username": "",
                    "password": "",
                    "connection": [{
                        "querySql": ["SELECT * FROM emp_holo"],
                        "jdbcUrl": ["jdbc:hive2://:10000/default;guardianToken="]
                    }]
                }
            },
            "writer": {
                "name": "oraclewriter",
                "parameter": {
                    "connection": [{
                        "jdbcUrl": "jdbc:oracle:thin:@:1521/",
                        "table": ["emp"]
                    }],
                    "username": "",
                    "password": "",
                    "column": ["*"],
                    "preSql": [],
                    "postSql": [],
                    "batchSize": 1024
                }
            }
        }]
    }
}

3)执行同步任务

python2 ${DATAX_HOME}/datax.py ./argodb_to_oracle.json

执行完成后,在 Oracle 中查询 emp 表验证数据写入。


常见问题

Q1:执行时报 ClassNotFoundException: io.transwarp.jdbc.InceptorDriver

原因:驱动 jar 包未正确放置,或 plugin.json 中未注册驱动类。

解决

  1. 确认 quark-driver-x.x.x.jar 存在于对应插件的 libs/ 目录
  2. 确认 plugin.jsondrivers 数组中包含 io.transwarp.jdbc.InceptorDriver
  3. 使用 grep 命令验证 jar 包中确实包含该驱动类

Q2:连接 ArgoDB 超时

原因:网络不通或端口未开放。

解决

  1. 使用 telnetnc 测试 Quark Server 节点及端口连通性
  2. 检查防火墙规则是否放行 hive.server2.thrift.port(默认 10000)
  3. 确认 jdbcUrl 中的 guardianToken 参数有效

Q3:写入数据时出现 OOM

原因batchSize 设置过大,导致内存溢出。

解决:适当降低 batchSize 值(如 512 或 256),在吞吐与内存之间取得平衡。


相关文档

这篇文章对您有帮助吗?

平均评分 0 / 5. 次数: 0

尚无评价,您可以第一个评哦!

非常抱歉,这篇文章对您没有帮助.

烦请您告诉我们您的建议与意见,以便我们改进,谢谢您。