概要描述
本文档主要介绍Argodb如何通过datax的rdbmsreader/rdbmswriter插件,读写argodb表。
版本要求: Argodb5.x无法像6.x版本可以使用自定义argodbwriter/argodbreader插件,可以才有本文档的jdbc方式
6.x及以后版本,建议使用argodbwriter/argodbReader,参考KB datax读写ArgoDB(自定义argodbwriter/argodbreader插件)
详细介绍
一. 配置与准备
1. 部署datax
官方 github 下载并解压,按要求准备 DataX 的运行环境依赖
2. 部署argodb驱动包
从manager页面获取对应quark版本的jdbc驱动包,放置到 datax 对应路径下
cd datax
# 如需【写】ArgoDB 的功能,执行以下操作
cp quark-driver-8.37.3.jar plugin/reader/rdbmswriter/libs/
# 如需【读】ArgoDB 的功能,执行以下操作
cp quark-driver-8.37.3.jar plugin/reader/rdbmsreader/libs/
3. 修改plugin.json
修改这2个plugin.json文件,在drivers里面追加上 "io.transwarp.jdbc.InceptorDriver"
datax/plugin/writer/rdbmswriter/plugin.json
datax/plugin/reader/rdbmsreader/plugin.json
以rdbmswriter为例:
{
"name": "rdbmswriter",
"class": "com.alibaba.datax.plugin.reader.rdbmswriter.RdbmsWriter",
"description": "useScene: prod. mechanism: Jdbc connection using the database, execute select sql, retrieve data from the ResultSet. warn: The more you know about the database, the less problems you encounter.",
"developer": "alibaba",
"drivers": [
"dm.jdbc.driver.DmDriver",
"com.sybase.jdbc3.jdbc.SybDriver",
"com.edb.Driver",
"com.ibm.db2.jcc.DB2Driver",
"io.transwarp.jdbc.InceptorDriver"
]
}
确保这个类在驱动jar中能够匹配到 grep -nir "io.transwarp.jdbc.InceptorDriver" datax/plugin/writer/rdbmswriter/libs/quark-driver-8.37.3.jar
4. 网络配置
DataX 进程需要能访问quark server所在节点,及参数配置项 hive.server2.thrift.port 指定的端口,默认10000
二. DataX Job 配置
1. 写任务
实现原理:
RDBMSWriter 通过 DataX 框架获取 Reader 生成的协议数据,RDBMSWriter 通过 JDBC 连接远程 RDBMS 数据库,并执行相应的 insert into … 的 sql 语句将数据写入 RDBMS。
部分参数说明: 参考 rdbmswriter介绍 :
| 参数 | 描述 | 必选 | 默认值 |
|---|---|---|---|
| jdbcUrl | 描述的是到对端数据库的JDBC连接信息,jdbcUrl按照RDBMS官方规范,并可以填写连接附件控制信息。请注意不同的数据库jdbc的格式是不同的,DataX会根据具体jdbc的格式选择合适的数据库驱动完成数据读取 | 是 | 无 |
| username | 数据源的用户名 | 是 | 无 |
| password | 数据源指定用户名的密码 | 是 | 无 |
| table | 目标表名称,如果表的schema信息和上述配置username不一致,请使用schema.table的格式填写table信息。 | 是 | 无 |
| column | 所配置的表中需要同步的列名集合。以英文逗号(,)进行分隔。我们强烈不推荐用户使用默认列情况 | 是 | 无 |
| preSql | 执行数据同步任务之前率先执行的sql语句,目前只允许执行一条SQL语句,例如清除旧数据。 | 否 | 无 |
| postSql | 执行数据同步任务之后执行的sql语句,目前只允许执行一条SQL语句,例如加上某一个时间戳。 | 否 | 无 |
| batchSize | 一次性批量提交的记录数大小,该值可以极大减少DataX与RDBMS的网络交互次数,并提升整体吞吐量。但是该值设置过大可能会造成DataX运行进程OOM情况。 | 否 | 1024 |
2. 读任务
实现原理:
简而言之,RDBMSReader通过JDBC连接器连接到远程的RDBMS数据库,并根据用户配置的信息生成查询SELECT SQL语句并发送到远程RDBMS数据库,并将该SQL执行返回结果使用DataX自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。
对于用户配置Table、Column、Where的信息,RDBMSReader将其拼接为SQL语句发送到RDBMS数据库;对于用户配置querySql信息,RDBMS直接将其发送到RDBMS数据库。
部分参数说明: 参考 rdbmsreader介绍 :
| 参数 | 描述 | 必选 | 默认值 |
|---|---|---|---|
| jdbcUrl | 描述的是到对端数据库的JDBC连接信息,jdbcUrl按照RDBMS官方规范,并可以填写连接附件控制信息。请注意不同的数据库jdbc的格式是不同的,DataX会根据具体jdbc的格式选择合适的数据库驱动完成数据读取 | 是 | 无 |
| username | 数据源的用户名 | 是 | 无 |
| password | 数据源指定用户名的密码 | 是 | 无 |
| table | 所选取的需要同步的表名 | 是 | 无 |
| column | 所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。用户使用代表默认使用所有列配置,例如[”] | 是 | 无 |
| splitPk | RDBMSReader进行数据抽取时,如果指定splitPk,表示用户希望使用splitPk代表的字段进行数据分片,DataX因此会启动并发任务进行数据同步,这样可以大大提供数据同步的效能 | 否 | 空 |
| where | 筛选条件,RDBMSReader根据指定的column、table、where条件拼接SQL,并根据这个SQL进行数据抽取。例如在做测试时,可以将where条件指定为limit 10;在实际业务场景中,往往会选择当天的数据进行同步,可以将where条件指定为gmt_create > $bizdate | 否 | 无 |
| querySql | 在有些业务场景下,where这一配置项不足以描述所筛选的条件,用户可以通过该配置型来自定义筛选SQL。当用户配置了这一项之后,DataX系统就会忽略table,column这些配置型,直接使用这个配置项的内容对数据进行筛选,例如需要进行多表join后同步数据,使用select a,b from table_a join table_b on table_a.id = table_b.id | 否 | 无 |
| fetchSize | 该配置项定义了插件和数据库服务器端每次批量数据获取条数,该值决定了DataX和服务器端的网络交互次数,能够较大的提升数据抽取性能 | 否 | 1024 |
三. 场景举例
场景1. 读oracle写argodb
以下任务从oracle 11g 的emp表中读取数据,写入到 default.emp_holo 表中。
Oracle端 创建测试表
DROP TABLE EMP;
CREATE TABLE EMP(
EMPNO int,
ENAME varchar2(255),
JOB varchar2(255),
MGR INT,
HIREDATE DATE,
SAL INT,
COMM INT,
DEPTNO INT
);
INSERT INTO EMP VALUES (7369,'SMITH','CLERK',7902,to_date('17-12-1980','dd-mm-yyyy'),800,NULL,20);
INSERT INTO EMP VALUES (7499,'ALLEN','SALESMAN',7698,to_date('20-2-1981','dd-mm-yyyy'),1600,300,30);
INSERT INTO EMP VALUES (7521,'WARD','SALESMAN',7698,to_date('22-2-1981','dd-mm-yyyy'),1250,500,30);
INSERT INTO EMP VALUES (7566,'JONES','MANAGER',7839,to_date('2-4-1981','dd-mm-yyyy'),2975,NULL,20);
INSERT INTO EMP VALUES (7654,'MARTIN','SALESMAN',7698,to_date('28-9-1981','dd-mm-yyyy'),1250,1400,30);
oracleToArgodb_byJDBC.json 的内容如下:
{
"job": {
"setting": {
"speed": {
"channel": 10
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [{
"reader": {
"name": "oraclereader",
"parameter": {
"username": "lkw",
"password": "123456",
"connection": [{
"querySql": [
"select * from emp"
],
"jdbcUrl": [
"jdbc:oracle:thin:@172.18.131.175:1521/helowin"
]
}
]
}
},
"writer": {
"name": "rdbmswriter",
"parameter": {
"connection": [{
"jdbcUrl": "jdbc:hive2://kv1:10000/default;guardianToken=so1GPbLhVmYJ459orice-KTDH",
"table": [
"emp_holo"
]
}
],
"username": "hive",
"password": "123456",
"table": "emp_holo",
"column": [
"*"
],
"preSql": []
}
}
}
]
}
}
Job 启动命令如下:
python2 datax.py ./oracleToArgodb_byJDBC.json
执行完成之后,检查emp_holo表导入成功
场景2. 读argodb写oracle
以下任务从argodb的 default.emp_holo表,写入到 oracle 11g 的emp表中。
Argodb端 创建测试表
DROP TABLE IF EXISTS EMP_HOLO;
CREATE TABLE EMP_HOLO(
EMPNO int,
ENAME string,
JOB string,
MGR INT,
HIREDATE DATE,
SAL INT,
COMM INT,
DEPTNO INT
)CLUSTERED BY (empno) INTO 3 BUCKETS
STORED AS HOLODESK;
INSERT INTO EMP_HOLO VALUES (7369,'SMITH','CLERK',7902,tdh_todate('17-12-1980','dd-mm-yyyy'),800,NULL,20);
ArgodbToOracle_byJDBC.json 的内容如下:
{
"job": {
"setting": {
"speed": {
"channel": 10
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [{
"reader": {
"name": "rdbmsreader",
"parameter": {
"username": "lkw",
"password": "123456",
"connection": [{
"querySql": [
"select * from emp_holo"
],
"jdbcUrl": [
"jdbc:hive2://kv1:10000/default;guardianToken=so1GPbLhVmYJ459orice-KTDH"
]
}
]
}
},
"writer": {
"name": "oraclewriter",
"parameter": {
"connection": [{
"jdbcUrl": "jdbc:oracle:thin:@172.18.131.175:1521/helowin",
"table": [
"emp"
]
}
],
"username": "lkw",
"password": "123456",
"column": [
"*"
],
"preSql": [],
"postSql": [],
"batchSize": 1024
}
}
}
]
}
}
Job 启动命令如下:
python2 datax.py ./ArgodbToOracle_byJDBC.json
执行完成之后,检查emp表导入成功