datax读写ArgoDB

  其他常见问题
内容纲要

概要描述


本文档主要介绍如何通过datax工具读写argodb表。

版本要求:
ArgoDB版本>=6, Patch版本>=20250930

详细介绍


一. 配置与准备

1. DataX 程序包

官方 github 下载并解压,按要求准备 DataX 的运行环境依赖

2. ArgoDB DataX 插件包

注:${version} 根据实际使用的包进行调整
在 ArgoDB patch 的配套工具里获取以下包:

argodb_client/argodb-datax/argodbwriter-${version}.jar
argodb_client/argodb-datax/argodbreader-${version}.jar 
argodb_client/argodb-sink-client/quark-driver-${version}.jar
argodb_client/argodb-sink-client/argodb-sink-client-${version}-all-in-one.jar

在 datax 路径下创建目录并放置需要的包

cd datax

# 如需【写】ArgoDB 的功能,执行以下操作
mkdir -p plugin/writer/argodbwriter/libs
cp argodbwriter-${version}.jar plugin/writer/argodbwriter/
cp argodb-sink-client-${version}-all-in-one.jar plugin/writer/argodbwriter/libs/
cp quark-driver-${version}.jar plugin/writer/argodbwriter/libs/

# 如需【读】ArgoDB 的功能,执行以下操作
mkdir -p plugin/reader/argodbreader/libs
cp argodbreader-${version}.jar plugin/reader/argodbreader/
# 将对应版本 quark 镜像中 /usr/lib/inceptor/lib 的所有 jar 包准备好(后续会优化需要包的列表)
cp /usr/lib/inceptor/lib下的所有jar包 plugin/reader/argodbreader/libs/
3. ArgoDB DataX 插件配置

将 writer_plugin.json 文件放置在 datax/plugin/writer/argodbwriter/ 目录下并重命名为 plugin.json
将 reader_plugin.json 文件放置在 datax/plugin/reader/argodbreader/ 目录下并重命名为 plugin.json

4. 网络配置

DataX 进程需要能访问以下端口,需要开放的端口清单:

访问 Quark

  • 配置项 hive.server2.thrift.port 指定的端口

访问 TDDMS

  • 以配置项 master.rpc_service.master_service_port 开始的连续4个端口
  • 以配置项 tabletserver.rpc_service.manage_service_port 开始的连续4个端口

二. DataX Job 配置

1. 写任务

部分参数说明:

# preSqlList - 执行写入前需要执行的 SQL 列表(如创建分区、清理等操作)
# postSqlList - 写入完成后需要执行的 SQL 列表
# compressionType - 写入数据的压缩方式;可选值有: SNAPPY, NOTCOMPRESSION, LZF, ZLIB,默认为 SNAPPY
# largeStringEnabled - 是否开启大 string 对象插入功能,默认 false;等价于执行 SQL 时的参数  argodb.insert.large.string.enabled
# maxLargeStringSize - 支持的最大的 string 大小,单位为 bytes,默认 3145728,只有当 largeStringEnabled 为 true 时才有效;等价于执行 SQL 时的参数 argodb.max.large.string.size
# columnList - 写入 Holodesk 表时,指定插入哪些列
{
    "name": "argodbwriter",
    "parameter": {
        "quarkTableName": "",
        "connection": {
          "quarkUrl": "",
          "quarkUserName": "",
          "quarkPasswd": ""
        },
        "sink": {
          "largeStringEnabled": false,
          "maxLargeStringSize": 3145728,
          "compressionType": "SNAPPY",
          "columnList": []
        },
        "sql": {
          "preSqlList": [],
          "postSqlList": []
        }
    }
}
2. 读任务

部分参数说明:

# neededColumns - 需要读哪些列,使用半角逗号将多个列名串连;如果需要读所有列,设置为 *;
# 数据输出时,会按照表原始的列顺序输出,不论 neededColumns 参数如何定义
{
    "name": "argodbreader",
    "parameter": {
        "quarkTableName": "",
        "neededColumns": "",
        "connection": {
          "quarkUrl": "",
          "quarkUserName": "",
          "quarkPasswd": ""
        }
    }
}

三. 场景举例

场景1. 读argodb写argodb

以下任务从 default.out_tbl 表中读取数据,写入到 default.in_tbl 表中。(两张表均为 Holodesk 表)

创建测试表

DROP TABLE IF EXISTS default.out_tbl;
DROP TABLE IF EXISTS default.in_tbl;
CREATE TABLE  default.out_tbl (a INT,b STRING ) STORED AS HOLODESK ;
CREATE TABLE  default.in_tbl (a INT,b STRING ) STORED AS HOLODESK ;
INSERT INTO default.out_tbl SELECT 1,'a' FROM system.dual;
INSERT INTO default.out_tbl SELECT 2,'b' FROM system.dual;

job.json 的内容如下:

{
    "job": {
        "setting": {
            "speed": {
                "channel": 10
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [{
                "reader": {
                    "name": "argodbreader",
                    "parameter": {
                        "quarkTableName": "default.out_tbl",
                        "neededColumns": "*",
                        "connection": {
                            "quarkUrl": "jdbc:hive2://kv1:10000/default;guardianToken=so1GPbLhVmYJ459orice-KTDH?use.linac=false",
                            "quarkUserName": "",
                            "quarkPasswd": ""
                        }
                    }
                },
                "writer": {
                    "name": "argodbwriter",
                    "parameter": {
                        "quarkTableName": "default.in_tbl",
                        "connection": {
                            "quarkUrl": "jdbc:hive2://kv1:10000/default;guardianToken=so1GPbLhVmYJ459orice-KTDH?use.linac=false",
                            "quarkUserName": "",
                            "quarkPasswd": ""
                        },
                        "sink": {
                            "largeStringEnabled": false,
                            "maxLargeStringSize": 3145728,
                            "compressionType": "SNAPPY",
                            "columnList": []
                        },
                        "sql": {
                            "preSqlList": [],
                            "postSqlList": []
                        }
                    }
                }
            }
        ]
    }
}

Job 启动命令如下:

python datax/bin/datax.py job.json

执行完成之后,检查default.in_tbl表导入成功

场景2. 读oracle写argodb

以下任务从oracle 11g 的emp表中读取数据,写入到 default.emp_holo 表中。

oracle端 创建测试表

DROP TABLE EMP;
CREATE TABLE EMP(
       EMPNO int,
       ENAME varchar2(255),
       JOB varchar2(255),
       MGR INT,
       HIREDATE DATE,
       SAL INT,
       COMM INT,
       DEPTNO INT
);

INSERT INTO EMP VALUES (7369,'SMITH','CLERK',7902,to_date('17-12-1980','dd-mm-yyyy'),800,NULL,20);
INSERT INTO EMP VALUES (7499,'ALLEN','SALESMAN',7698,to_date('20-2-1981','dd-mm-yyyy'),1600,300,30);
INSERT INTO EMP VALUES (7521,'WARD','SALESMAN',7698,to_date('22-2-1981','dd-mm-yyyy'),1250,500,30);
INSERT INTO EMP VALUES (7566,'JONES','MANAGER',7839,to_date('2-4-1981','dd-mm-yyyy'),2975,NULL,20);
INSERT INTO EMP VALUES (7654,'MARTIN','SALESMAN',7698,to_date('28-9-1981','dd-mm-yyyy'),1250,1400,30);

oracleToArgodb.json 的内容如下:

{
    "job": {
        "setting": {
            "speed": {
                "channel": 10
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [{
                "reader": {
                    "name": "oraclereader",
                    "parameter": {
                        "username": "lkw",
                        "password": "123456",
                        "connection": [{
                                "querySql": ["select * from emp"],
                                "jdbcUrl": ["jdbc:oracle:thin:@172.18.131.175:1521/helowin"]
                            }
                        ],
                    }
                },
                "writer": {
                    "name": "argodbwriter",
                    "parameter": {
                        "quarkTableName": "default.emp_holo",
                        "connection": {
                            "quarkUrl": "jdbc:hive2://kv1:10000/default;guardianToken=so1GPbLhVmYJ459orice-KTDH?use.linac=false",
                            "quarkUserName": "",
                            "quarkPasswd": ""
                        },
                        "sink": {
                            "largeStringEnabled": false,
                            "maxLargeStringSize": 3145728,
                            "compressionType": "SNAPPY",
                            "columnList": []
                        },
                        "sql": {
                            "preSqlList": [],
                            "postSqlList": []
                        }
                    }
                }
            }
        ]
    }
}

Job 启动命令如下:

python ./datax.py oracleToArgodb.json

执行完成之后,检查emp_holo表导入成功

注意事项(必读)

1.写入 ArgoDB 时,不保证原子性,需要用户自行处理错误情况(如:部分 Task 失败,那么成功的 Task 会把数据写入 ArgoDB,需要用户能自己进行错误恢复);
2.对于 blob/clob 类型,仅支持 performance 表,建表时需要设置 set argodb.performance.complex.types.enabled=true;对于 blob 类型的写入,目前仅支持能转成 string 的二进制值(后续会完善);
3.目前不支持读分区表;
4.在使用 writer 的时,需要用户保证从 reader 读到的输出,列数量和顺序与写入的目标表一致。在未指定 columnList 参数时,reader 读出的列的数量和顺序必须和 Holodesk 表的表定义一致;指定 columnList 参数时,reader 读出的列的数量和顺序必须和 columnList 指定的一致。
5.udf不支持linac,所以遇到 Check Dependency Version failed: EXECUTION FAILED: Task MAPRED-SPARK error PlanException: [Error 1] Found no matched plan 的报错,有可能是全局开启了linac,执行 SELECT showArgodbVersion() 报错导致的,可以在jdbc连接串后面加上?use.linac=false session级关闭linac。

这篇文章对您有帮助吗?

平均评分 0 / 5. 次数: 0

尚无评价,您可以第一个评哦!

非常抱歉,这篇文章对您没有帮助.

烦请您告诉我们您的建议与意见,以便我们改进,谢谢您。