大数跨境
0
0

DataX入门指南:从零开始掌握数据同步

DataX入门指南:从零开始掌握数据同步 Lisa聊外贸
2025-10-16
2
导读:什么是DataX?DataX 是阿里的开源的,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。

什么是DataX?
DataX 是阿里的开源的,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS, databend 等各种异构数据源之间高效的数据同步功能。

离线同步工具和在线同步工具的区别?
特性维度
离线数据同步
在线数据同步
核心目标
批量(Batch) 一次处理大量数据
实时(Real-Time) 持续处理数据流
数据延迟
高延迟(小时级、天级)
低延迟(秒级、毫秒级)
数据处理模型
对某个时间点的数据快照进行操作
对连续不断的数据流进行操作
对源端压力
瞬时压力大:在同步窗口内会大量读取源库
持续压力小:平滑地读取增量变化
典型场景
历史数据迁移
缓存同步
代表工具
DataX
Canel
总结:
  • 离线数据同步:像货运卡车。它定期(比如每天一次)将整批货物从A仓库运到B仓库。单次运量大,但频率低,有延迟。
  • 在线数据同步:像传送带。货物(数据)在A仓库一生产出来,就立刻被放到传送带上,源源不断地、以很低的延迟运往B仓库。

DataX架构

DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

  • Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  • Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

    简单来说,DataX架构由核心框架和读写插件组件,假设我们有一个数据同步任务,将MySQL的数据同步到Oracle中,那么数据同步流程就是使用MySQL读插件从MySQL数据库中读取数据,然后将数据写入DataX核心框架,然后使用Oracle写插件从DataX核心框架中拿到MySQL读插件写入的数据,写入Oracle数据库中。

    这其中,MySQL读插件、FrameWork、Oracle写插件都为我们提供好了,对于使用者来说,只需在一个json配置文件中配置好这个数据同步任务所需的参数,比如需要什么读插件、什么写插件、同步并发度等,最后将配置文件交给DataX框架执行即可,DataX框架会根据我们的配置自动从我们配置的读插件读取数据,然后将数据写入到我们配置的写插件地址。

DataX插件设计有什么好处?
    想想,在没有插件设计之前,我们想写一个从MySQL同步到Oracle的数据同步任务,那么我们需要对此单独写一份代码,如果此时有一个新任务,将MySQL数据同步到HDFS,那又得写一份新代码,后面又来从MySQL同步到其他数据源的任务,还得写一份,每一份代码中从MySQL中读取数据的基础代码是其实一致,这样做代码不可复用,扩展性差。
    当使用DataX的插件开发模式后,我们只要开发好MySQL的读插件,就可以一直重复使用,写插件也是如此。
DataX的设计理念就是如此,将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。

DataX插件体系:
 经过几年积累,DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入。DataX目前支持数据如下:
类型
数据源
Reader(读)
Writer(写)
文档
RDBMS 关系型数据库
MySQL
读 、写
           
Oracle    
    √    
    √    
读 、写
           
OceanBase  
    √    
    √    
读 、写

SQLServer
读 、写

PostgreSQL
读 、写

DRDS
读 、写

达梦
读 、写

通用RDBMS(支持所有关系型数据库)
读 、写
阿里云数仓数据存储
ODPS
读 、写

ADS


OSS
读 、写

OCS
读 、写
NoSQL数据存储
OTS
读 、写

Hbase0.94
读 、写

Hbase1.1
读 、写

MongoDB
读 、写

Hive
读 、写
无结构化数据存储
TxtFile
读 、写

FTP
读 、写

HDFS
读 、写

Elasticsearch

上面表格列举出了DataX现在支持的插件,原本点击文档一列的 ‘读’ 和 ‘写’ 能查看到对应的读写插件配置详解, 但微信公众号的格式我不知道怎么调, 导致文档链接丢失了  ,可以直接使用以下地址查看插件使用文档:
https://github.com/alibaba/DataX

DataX使用
下载DataX
https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202308/datax.tar.gz
环境需要
windows和linux都可
JDK(1.8以上,推荐1.8)
python(2或者3都可)
DataX核心代码是Java编写,所以支持跨平台,并且需要JDK环境,DataX的数据同步任务使用python脚本启动,所以需要下载Python环境。(不熟悉python没关系,DataX只是用python执行脚本,并不设计python的复杂操作)
jdk下载地址:
https://www.oracle.com/cn/java/technologies/downloads/
python下载地址:
https://www.python.org/downloads/

我本次打算在window系统上演示:
DataX下载后解压:

bin为python执行脚本所在目录。我们主要关注datax.py, 后续执行的所有数据同步任务都是以该脚本执行。
conf为防止datax核心配置文件。包含日志配置和core.json, 这个core.json的内容可以理解为每个同步任务所继承的父任务配置
job放置使用案例,学习时可参考
lib包含datax执行时所需库。


plugin是dataX支持的插件目录,里面包含读插件和写插件:


其他目录不过多介绍。



如何执行一个数据同步任务?
上面我们说过,执行datax同步任务使用data.py脚本,当我们在本机配置了python环境后,使用如下命令:
python /path/datax.py /path/job.json
python:  python编译器指令,安装好python默认存在。
/path/datax.py:  datax.py脚本的路径地址。
/path/job.json: 同步任务的配置,文件名随意,但必须是json格式。

接下来需要说一下同步任务配置文件的格式:
上面截图我直接拿的datax安装目录里job目录的job.json文件,这是一个实例文件,我会用它来讲解任务基本格式。

文件内容为json格式,外部一个大Json对象,第一层有一个名为 'job' 的Key,在DataX框架中,每一次数据同步被定义为一个Job任务,这个Job任务对应到配置文件里的 'job',  'job'的内容又分为两部分,'setting'和 'content', 'setting'中为本次任务全局设置,比如配置传输通道数量,或者错误限制信息。'content' 是一个数组,里面配置读插件和写插件。
setting部分配置
content部分的读插件设置:
content部分的写插件设置:
上面截图中,我对每一部分内容都添加了详细的注释。

我稍微调整一下上面的参数,直接运行以下这个数据同步任务查看效果,我将 'sliceRecordCount' 的值调整为10,防止控制台打印过多数据,将写插件的 'print' 改为true,表示控制台打印数据。
然后在DataX安装目录执行以下命令启动数据同步任务:
python bin/datax.py job/job.json

执行日志如下:
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.

2025-10-16 10:12:46.279 [main] INFO  MessageSource - JVM TimeZone: GMT+08:00, Locale: zh_CN2025-10-16 10:12:46.280 [main] INFO  MessageSource - use Locale: zh_CN timeZone: sun.util.calendar.ZoneInfo[id="GMT+08:00",offset=28800000,dstSavings=0,useDaylight=false,transitions=0,lastRule=null]2025-10-16 10:12:46.296 [main] INFO  VMInfo - VMInfo# operatingSystem class => com.sun.management.internal.OperatingSystemImpl2025-10-16 10:12:46.298 [main] INFO  Engine - the machine info  =>
        osInfo: Windows 10 amd64 10.0        jvmInfo:        Oracle Corporation 21 21.0.8+12-LTS-250        cpu num:        20
        totalPhysicalMemory:    -0.00G        freePhysicalMemory:     -0.00G        maxFileDescriptorCount: -1        currentOpenFileDescriptorCount: -1
        GC Names        [G1 Young Generation, G1 Old Generation, G1 Concurrent GC]
        MEMORY_NAME                    | allocation_size                | init_size        CodeHeap 'profiled nmethods'   | 116.31MB                       | 2.44MB        G1 Old Gen                     | 1,024.00MB                     | 973.00MB        G1 Survivor Space              | -0.00MB                        | 0.00MB        CodeHeap 'non-profiled nmethods' | 116.38MB                       | 2.44MB        Compressed Class Space         | 1,024.00MB                     | 0.00MB        Metaspace                      | -0.00MB                        | 0.00MB        G1 Eden Space                  | -0.00MB                        | 51.00MB        CodeHeap 'non-nmethods'        | 7.31MB                         | 2.44MB

2025-10-16 10:12:46.331 [main] INFO  Engine -{        "setting":{                "speed":{                        "channel":1                },                "errorLimit":{                        "record":0,                        "percentage":0.02                }        },        "content":[                {                        "reader":{                                "name":"streamreader",                                "parameter":{                                        "column":[                                                {                                                        "value":"DataX",                                                        "type":"string"                                                },                                                {                                                        "value":20250101,                                                        "type":"long"                                                },                                                {                                                        "value":"2025-01-01 00:00:00",                                                        "type":"date"                                                },                                                {                                                        "value":true,                                                        "type":"bool"                                                },                                                {                                                        "value":"test",                                                        "type":"bytes"                                                }                                        ],                                        "sliceRecordCount":10                                }                        },                        "writer":{                                "name":"streamwriter",                                "parameter":{                                        "print":true,                                        "encoding":"UTF-8"                                }                        }                }        ]}
2025-10-16 10:12:46.340 [main] INFO  PerfTrace - PerfTrace traceId=job_-1, isEnable=false2025-10-16 10:12:46.340 [main] INFO  JobContainer - DataX jobContainer starts job.2025-10-16 10:12:46.341 [main] INFO  JobContainer - Set jobId = 02025-10-16 10:12:46.348 [job-0] INFO  JobContainer - jobContainer starts to do prepare ...2025-10-16 10:12:46.349 [job-0] INFO  JobContainer - DataX Reader.Job [streamreader] do prepare work .2025-10-16 10:12:46.349 [job-0] INFO  JobContainer - DataX Writer.Job [streamwriter] do prepare work .2025-10-16 10:12:46.349 [job-0] INFO  JobContainer - jobContainer starts to do split ...2025-10-16 10:12:46.349 [job-0] INFO  JobContainer - Job set Channel-Number to 1 channels.2025-10-16 10:12:46.350 [job-0] INFO  JobContainer - DataX Reader.Job [streamreader] splits to [1] tasks.2025-10-16 10:12:46.350 [job-0] INFO  JobContainer - DataX Writer.Job [streamwriter] splits to [1] tasks.2025-10-16 10:12:46.363 [job-0] INFO  JobContainer - jobContainer starts to do schedule ...2025-10-16 10:12:46.365 [job-0] INFO  JobContainer - Scheduler starts [1] taskGroups.2025-10-16 10:12:46.366 [job-0] INFO  JobContainer - Running by standalone Mode.2025-10-16 10:12:46.369 [taskGroup-0] INFO  TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.2025-10-16 10:12:46.372 [taskGroup-0] INFO  Channel - Channel set byte_speed_limit to -1, No bps activated.2025-10-16 10:12:46.372 [taskGroup-0] INFO  Channel - Channel set record_speed_limit to -1, No tps activated.2025-10-16 10:12:46.379 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is startedDataX   20250101        2025-01-01 00:00:00     true    testDataX   20250101        2025-01-01 00:00:00     true    testDataX   20250101        2025-01-01 00:00:00     true    testDataX   20250101        2025-01-01 00:00:00     true    testDataX   20250101        2025-01-01 00:00:00     true    testDataX   20250101        2025-01-01 00:00:00     true    testDataX   20250101        2025-01-01 00:00:00     true    testDataX   20250101        2025-01-01 00:00:00     true    testDataX   20250101        2025-01-01 00:00:00     true    testDataX   20250101        2025-01-01 00:00:00     true    test2025-10-16 10:12:46.482 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[104]ms2025-10-16 10:12:46.482 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] completed it's tasks.2025-10-16 10:12:56.384 [job-0] INFO  StandAloneJobContainerCommunicator - Total 10 records, 260 bytes | Speed 26B/s, 1 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%2025-10-16 10:12:56.385 [job-0] INFO  AbstractScheduler - Scheduler accomplished all tasks.2025-10-16 10:12:56.387 [job-0] INFO  JobContainer - DataX Writer.Job [streamwriter] do post work.2025-10-16 10:12:56.388 [job-0] INFO  JobContainer - DataX Reader.Job [streamreader] do post work.2025-10-16 10:12:56.389 [job-0] INFO  JobContainer - DataX jobId [0] completed successfully.2025-10-16 10:12:56.392 [job-0] INFO  HookInvoker - No hook invoked, because base dir not exists or is a file: E:\develop-env\datax\hook2025-10-16 10:12:56.395 [job-0] INFO  JobContainer -         [total cpu info] =>                averageCpu                     | maxDeltaCpu                    | minDeltaCpu                -1.00%                         | -1.00%                         | -1.00%

         [total gc info] =>                 NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime                 G1 Young Generation  | 0                  | 0                  | 0                  | 0.000s             | 0.000s             | 0.000s                 G1 Old Generation    | 0                  | 0                  | 0                  | 0.000s             | 0.000s             | 0.000s                 G1 Concurrent GC     | 0                  | 0                  | 0                  | 0.000s             | 0.000s             | 0.000s
2025-10-16 10:12:56.396 [job-0] INFO  JobContainer - PerfTrace not enable!2025-10-16 10:12:56.397 [job-0] INFO  StandAloneJobContainerCommunicator - Total 10 records, 260 bytes | Speed 26B/s, 1 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%2025-10-16 10:12:56.398 [job-0] INFO  JobContainer -任务启动时刻                    : 2025-10-16 10:12:46任务结束时刻                    : 2025-10-16 10:12:56任务总计耗时                    :                 10s任务平均流量                    :               26B/s记录写入速度                    :              1rec/s读出记录总数                    :                  10读写失败总数                    :                   0

我们发现,DataX执行数据同步任务时,给我们打印了很全面的日志信息,最开始给我们打印了执行任务机器的相关信息,包括jvm信息等:
然后打印了我们 任务配置的json内容
后面我们能看到我们读取的数据在控制台打印输出了:
当DataX一次任务执行完毕后,会显示一个任务总的统计信息:


实战案例
上面执行的只是DataX本身提供的最简单案例,下面我们稍微操作一个复杂一点的案例,我想把一个MYSQL表数据同步到另一个MYSQL表中。

整体思路:
对我们来说,只需编写好相应的配置文件交给DataX执行即可,所以我们重点关注配置文件的三部分内容。
我们是从MYSQL读数据,所以读数据选择mysqlreader插件,目录的名字也是插件的名字。

我们需要将数据同步到MYSQL中,所以写数据选择mysqlwriter插件

DataX内部每开发一个插件会为插件配备一个完善的属性配置文档,这个文档会告诉你这个插件能配置哪些属性,哪些属性必填,哪些非必填,以及对这些属性的详细解释等。
以我们选择的mysqlreader和mysqlwriter插件为例,他们的配置文档地址如下:
mysqlreader: 
https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md
mysqlwriter:
https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md

这些文档在Github的DataX项目中可以找到,我就不详细解释了。

新建一个json配置文件,名字为mysql2msql.json(文件名字随意,格式必须为Json文件),内容如下:
mysql2mysql.json
{  "job": {    "setting": {      "speed": {        "channel": 1  #设置传输并发      }    },    "content": [      {        "reader": {  # 读插件配置          "name""mysqlreader"# 插件名,完整名字可在plugin/reader下找到          "parameter": {  # mysqlreader的配置参数            "username""root"# 连接mysql的用户名            "password""root"# 连接mysql的密码            "column": [   #需要查询的列 此处代表查询[id,name,age,gender]列              "id",              "name",              "age",              "gender"            ],            "splitPk""id"# 分片字段, DataX如果并发执行                             # 需要根据这个字段对任务进行分片推荐使用主键            "connection": [ #连接信息              {                "table": [ #表名                  "users"                ],                "jdbcUrl": [ #url                  "jdbc:mysql://127.0.0.1:3306/lyc_test?useSSL=false"                ]              }            ]          }        },        "writer": {  #写插件配置          "name""mysqlwriter",# 插件名,完整名字可在plugin/writer下找到          "parameter": { #写插件的参数配置            "writeMode""insert"#写入模式insert代表直接插入数据但是遇到id冲突会报错            "username""root"#写入数据库用户名            "password""root"#写入数据库密码            "column": [ #写入哪些列              "id",              "name",              "age",              "gender"            ],            "connection": [ #连接信息              {                "jdbcUrl""jdbc:mysql://127.0.0.1:3306/lyc_test_copy?useSSL=false",                "table": [                  "users_copy"                ]              }            ]          }        }      }    ]  }}
上面的配置注释已经很详细了,配置内容代表的操作就是从读插件配置的数据库lyc_test的表users中获取 [id,name,age,gender] 这四列数据,然后写入到写插件配置的数据库lyc_test_copy的表users_copy中。
可以看到,我现在源表有数据,目标表数据为空,现在我开始执行数据同步任务,
我是将mysql2mysql.json文件放在datax安装目录,所以命令为
python bin/datax.py ./mysql2mysql.json
执行后,控制台看到如下统计信息,说明任务结束了:
我们去目的数据库查看,发现数据已经同步过来了:


总结一下,其实DataX的使用很简单,我们首先需要明确我们的同步任务是从哪个数据源到哪个数据源,然后选择合适的插件在json文件中进行配置,插件参数的配置文档参考Github/DataX即可:
https://github.com/alibaba/DataX

DataX的插件式设计扩展性很高,当我们遇到DataX不支持的数据源插件时,我们可以自定义读写插件,加入到DataX插件体系中,能与之前定义的所有插件完美兼容,后续我会出一篇关于如何自定义DataX插件的文章,欢迎大家持续关注.

【声明】内容源于网络
0
0
Lisa聊外贸
跨境分享吧 | 长期输出优质内容
内容 44193
粉丝 1
Lisa聊外贸 跨境分享吧 | 长期输出优质内容
总阅读228.0k
粉丝1
内容44.2k