datax读写ArgoDB(rdbmsreader/rdbmswriter)

  其他常见问题
内容纲要

概要描述


本文档主要介绍Argodb如何通过datax的rdbmsreader/rdbmswriter插件,读写argodb表。

版本要求: Argodb5.x无法像6.x版本可以使用自定义argodbwriter/argodbreader插件,可以才有本文档的jdbc方式

6.x及以后版本,建议使用argodbwriter/argodbReader,参考KB datax读写ArgoDB(自定义argodbwriter/argodbreader插件)

详细介绍


一. 配置与准备

1. 部署datax

官方 github 下载并解压,按要求准备 DataX 的运行环境依赖

2. 部署argodb驱动包

从manager页面获取对应quark版本的jdbc驱动包,放置到 datax 对应路径下

cd datax

# 如需【写】ArgoDB 的功能,执行以下操作
cp quark-driver-8.37.3.jar plugin/reader/rdbmswriter/libs/

# 如需【读】ArgoDB 的功能,执行以下操作
cp quark-driver-8.37.3.jar plugin/reader/rdbmsreader/libs/
3. 修改plugin.json

修改这2个plugin.json文件,在drivers里面追加上 "io.transwarp.jdbc.InceptorDriver"

datax/plugin/writer/rdbmswriter/plugin.json
datax/plugin/reader/rdbmsreader/plugin.json

以rdbmswriter为例:

{
    "name": "rdbmswriter",
    "class": "com.alibaba.datax.plugin.reader.rdbmswriter.RdbmsWriter",
    "description": "useScene: prod. mechanism: Jdbc connection using the database, execute select sql, retrieve data from the ResultSet. warn: The more you know about the database, the less problems you encounter.",
    "developer": "alibaba",
    "drivers": [
        "dm.jdbc.driver.DmDriver",
        "com.sybase.jdbc3.jdbc.SybDriver",
        "com.edb.Driver",
        "com.ibm.db2.jcc.DB2Driver",
        "io.transwarp.jdbc.InceptorDriver"
    ]
}

确保这个类在驱动jar中能够匹配到 grep -nir "io.transwarp.jdbc.InceptorDriver" datax/plugin/writer/rdbmswriter/libs/quark-driver-8.37.3.jar

4. 网络配置

DataX 进程需要能访问quark server所在节点,及参数配置项 hive.server2.thrift.port 指定的端口,默认10000

二. DataX Job 配置

1. 写任务

实现原理:

RDBMSWriter 通过 DataX 框架获取 Reader 生成的协议数据,RDBMSWriter 通过 JDBC 连接远程 RDBMS 数据库,并执行相应的 insert into … 的 sql 语句将数据写入 RDBMS。

部分参数说明: 参考 rdbmswriter介绍

参数 描述 必选 默认值
jdbcUrl 描述的是到对端数据库的JDBC连接信息,jdbcUrl按照RDBMS官方规范,并可以填写连接附件控制信息。请注意不同的数据库jdbc的格式是不同的,DataX会根据具体jdbc的格式选择合适的数据库驱动完成数据读取
username 数据源的用户名
password 数据源指定用户名的密码
table 目标表名称,如果表的schema信息和上述配置username不一致,请使用schema.table的格式填写table信息。
column 所配置的表中需要同步的列名集合。以英文逗号(,)进行分隔。我们强烈不推荐用户使用默认列情况
preSql 执行数据同步任务之前率先执行的sql语句,目前只允许执行一条SQL语句,例如清除旧数据。
postSql 执行数据同步任务之后执行的sql语句,目前只允许执行一条SQL语句,例如加上某一个时间戳。
batchSize 一次性批量提交的记录数大小,该值可以极大减少DataX与RDBMS的网络交互次数,并提升整体吞吐量。但是该值设置过大可能会造成DataX运行进程OOM情况。 1024
2. 读任务

实现原理:

简而言之,RDBMSReader通过JDBC连接器连接到远程的RDBMS数据库,并根据用户配置的信息生成查询SELECT SQL语句并发送到远程RDBMS数据库,并将该SQL执行返回结果使用DataX自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。

对于用户配置Table、Column、Where的信息,RDBMSReader将其拼接为SQL语句发送到RDBMS数据库;对于用户配置querySql信息,RDBMS直接将其发送到RDBMS数据库。

部分参数说明: 参考 rdbmsreader介绍

参数 描述 必选 默认值
jdbcUrl 描述的是到对端数据库的JDBC连接信息,jdbcUrl按照RDBMS官方规范,并可以填写连接附件控制信息。请注意不同的数据库jdbc的格式是不同的,DataX会根据具体jdbc的格式选择合适的数据库驱动完成数据读取
username 数据源的用户名
password 数据源指定用户名的密码
table 所选取的需要同步的表名
column 所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。用户使用代表默认使用所有列配置,例如[”]
splitPk RDBMSReader进行数据抽取时,如果指定splitPk,表示用户希望使用splitPk代表的字段进行数据分片,DataX因此会启动并发任务进行数据同步,这样可以大大提供数据同步的效能
where 筛选条件,RDBMSReader根据指定的column、table、where条件拼接SQL,并根据这个SQL进行数据抽取。例如在做测试时,可以将where条件指定为limit 10;在实际业务场景中,往往会选择当天的数据进行同步,可以将where条件指定为gmt_create > $bizdate
querySql 在有些业务场景下,where这一配置项不足以描述所筛选的条件,用户可以通过该配置型来自定义筛选SQL。当用户配置了这一项之后,DataX系统就会忽略table,column这些配置型,直接使用这个配置项的内容对数据进行筛选,例如需要进行多表join后同步数据,使用select a,b from table_a join table_b on table_a.id = table_b.id
fetchSize 该配置项定义了插件和数据库服务器端每次批量数据获取条数,该值决定了DataX和服务器端的网络交互次数,能够较大的提升数据抽取性能 1024

三. 场景举例

场景1. 读oracle写argodb

以下任务从oracle 11g 的emp表中读取数据,写入到 default.emp_holo 表中。

Oracle端 创建测试表

DROP TABLE EMP;
CREATE TABLE EMP(
       EMPNO int,
       ENAME varchar2(255),
       JOB varchar2(255),
       MGR INT,
       HIREDATE DATE,
       SAL INT,
       COMM INT,
       DEPTNO INT
);

INSERT INTO EMP VALUES (7369,'SMITH','CLERK',7902,to_date('17-12-1980','dd-mm-yyyy'),800,NULL,20);
INSERT INTO EMP VALUES (7499,'ALLEN','SALESMAN',7698,to_date('20-2-1981','dd-mm-yyyy'),1600,300,30);
INSERT INTO EMP VALUES (7521,'WARD','SALESMAN',7698,to_date('22-2-1981','dd-mm-yyyy'),1250,500,30);
INSERT INTO EMP VALUES (7566,'JONES','MANAGER',7839,to_date('2-4-1981','dd-mm-yyyy'),2975,NULL,20);
INSERT INTO EMP VALUES (7654,'MARTIN','SALESMAN',7698,to_date('28-9-1981','dd-mm-yyyy'),1250,1400,30);

oracleToArgodb_byJDBC.json 的内容如下:

{
    "job": {
        "setting": {
            "speed": {
                "channel": 10
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [{
                "reader": {
                    "name": "oraclereader",
                    "parameter": {
                        "username": "lkw",
                        "password": "123456",
                        "connection": [{
                                "querySql": [
                                    "select * from emp"
                                ],
                                "jdbcUrl": [
                                    "jdbc:oracle:thin:@172.18.131.175:1521/helowin"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "rdbmswriter",
                    "parameter": {
                        "connection": [{
                                "jdbcUrl": "jdbc:hive2://kv1:10000/default;guardianToken=so1GPbLhVmYJ459orice-KTDH",
                                "table": [
                                    "emp_holo"
                                ]
                            }
                        ],
                        "username": "hive",
                        "password": "123456",
                        "table": "emp_holo",
                        "column": [
                            "*"
                        ],
                        "preSql": []
                    }
                }
            }
        ]
    }
}

Job 启动命令如下:

python2 datax.py ./oracleToArgodb_byJDBC.json

执行完成之后,检查emp_holo表导入成功

场景2. 读argodb写oracle

以下任务从argodb的 default.emp_holo表,写入到 oracle 11g 的emp表中。

Argodb端 创建测试表

DROP TABLE IF EXISTS EMP_HOLO;
CREATE TABLE EMP_HOLO(
       EMPNO int,
       ENAME string,
       JOB string,
       MGR INT,
       HIREDATE DATE,
       SAL INT,
       COMM INT,
       DEPTNO INT
)CLUSTERED BY (empno) INTO 3 BUCKETS
STORED AS HOLODESK;

INSERT INTO EMP_HOLO VALUES (7369,'SMITH','CLERK',7902,tdh_todate('17-12-1980','dd-mm-yyyy'),800,NULL,20);

ArgodbToOracle_byJDBC.json 的内容如下:

{
    "job": {
        "setting": {
            "speed": {
                "channel": 10
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [{
                "reader": {
                    "name": "rdbmsreader",
                    "parameter": {
                        "username": "lkw",
                        "password": "123456",
                        "connection": [{
                                "querySql": [
                                    "select * from emp_holo"
                                ],
                                "jdbcUrl": [
                                    "jdbc:hive2://kv1:10000/default;guardianToken=so1GPbLhVmYJ459orice-KTDH"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "oraclewriter",
                    "parameter": {
                        "connection": [{
                                "jdbcUrl": "jdbc:oracle:thin:@172.18.131.175:1521/helowin",
                                "table": [
                                    "emp"
                                ]
                            }
                        ],
                        "username": "lkw",
                        "password": "123456",
                        "column": [
                            "*"
                        ],
                        "preSql": [],
                        "postSql": [],
                        "batchSize": 1024
                    }
                }
            }
        ]
    }
}

Job 启动命令如下:

python2 datax.py ./ArgodbToOracle_byJDBC.json

执行完成之后,检查emp表导入成功

这篇文章对您有帮助吗?

平均评分 0 / 5. 次数: 0

尚无评价,您可以第一个评哦!

非常抱歉,这篇文章对您没有帮助.

烦请您告诉我们您的建议与意见,以便我们改进,谢谢您。