大数跨境

王炸组合:Dolphinscheudler 3.1.*搭配SeaTunnel 2.3.*高效完成异构数据数据集成

王炸组合:Dolphinscheudler 3.1.*搭配SeaTunnel 2.3.*高效完成异构数据数据集成 海豚调度
2025-01-13
2
导读:本篇主要介绍如何通过Dolphinscheduler海豚调度搭配Seatunnel完成异构数据源之间的数据同步功能,这个在大数据流批一体数仓建设的过程中是一个非常好的解决方案, 稳定高效,只要用上了你

点击蓝字,关注我们

转载自嫣夜来

01


概述

本篇主要介绍如何通过Dolphinscheduler海豚调度搭配Seatunnel完成异构数据源之间的数据同步功能,这个在大数据流批一体数仓建设的过程中是一个非常好的解决方案, 稳定高效,只要用上了你肯定爱不释手。


02


环境准备



  • dolphinscheduler集群 >= 3.1.5
  • dolphinscheduler3.1.5版本源码
  • Seatunnel集群 >= 2.3.3
没有安装好以上准备环境的童鞋,请先参考我的另外两篇文章完成基础环境搭建基于Seatunnel最新2.3.5版本分布式集群安装部署指南(小白版)及dolphinscheduler分布式集群部署指南(小白版)再回到章节继续。


03


配置文件修改




这里说明一下, 通过海豚调度配置的Seatunnel数据同步任务最后都会被分配到DS集群的某个Worker组或者某个worker节点上进行执行,所以你要保证你的DS集群的目标worker节点上也安装了Seatunnel服务。这很重要,因为实际dolphisncheduler中定义的seatunnel任务实例到最后都是需要调用worker节点上的seatunnel服务在本地执行seatunnel的任务启动命令来完成任务提交和运行。

Dolphinscheduler的配置文件修改

因为我们需要使用seatunnel完成数据集成,所以我们需要在dolphinscheduler的系统环境变量中将我们的Seatunnel的安装目录进行配置。
找到你的dolphinscheduler主节点的安装目录下的$DOLPHINSCHEDULER_HOME/bin/env/dolphinscheduler_env.sh
设置SEATUNNEL_HOME的访问目录,将SEATUNNEL_HOME设置为你自己的SeaTunnel安装目录。
export SEATUNNEL_HOME=${SEATUNNEL_HOME:-/opt/software/seatunnel-2.3.5}
然后保存重启Dolphinscheduler集群即可完成配置修改同步到所有的api-server、master-server及worker-server节点。

Dolphinscheduler部分源码修改

为什么要修改Dolphinscheduler的源码?
因为我这里使用的Seatunnel的版本是2.3.5,使用的引擎不是Seatunnel的默认引擎, 用的是Spark引擎, Spark我用的版本是2.4.5, 所有我最后在命令执行的命令如下:
 
 
 
$SEATUNNEL_HOME/bin/start-seatunnel-spark-2-connector-v2.sh --master local[4] --deploy-mode client --config /opt/software/seatunnel-2.3.5/config/app-config/v2.batch.config.template
如果我用的是Spark3.X的版本,我执行命令如下:
 
 
 
$SEATUNNEL_HOME/bin/start-seatunnel-spark-3-connector-v2.sh --master local[4] --deploy-mode client --config /opt/software/seatunnel-2.3.5/config/app-config/v2.batch.config.template
然而在Dolphinscheduler3.1.5版本的Seatunnel任务插件中,存在一些问题没办法兼容, 首先是前端,这里引擎只支持Spark和Flink,没有针对具体的版本进行兼容,没办法自由的选择使用Spark2、Spark3还是FIink13、Flink15。
其次就是后端的代码。
找到EngineEnum类, 修改一下代码如下:
 
 
 
public enum EngineEnum {

// FLINK("${SEATUNNEL_HOME}/bin/start-seatunnel-flink.sh"),
// SPARK("${SEATUNNEL_HOME}/bin/start-seatunnel-spark.sh");
FLINK("${SEATUNNEL_HOME}/bin/start-seatunnel-flink-13-connector-v2.sh"),
FLINK15("${SEATUNNEL_HOME}/bin/start-seatunnel-flink-15-connector-v2.sh"),
SPARK("${SEATUNNEL_HOME}/bin/start-seatunnel-spark-2-connector-v2.sh"),
SPARK3("${SEATUNNEL_HOME}/bin/start-seatunnel-spark-3-connector-v2.sh");

private String command;

EngineEnum(String command) {
this.command = command;
}

public String getCommand() {
return command;
}
}
这样修改完毕之后就OK了, 然后编译打包dolphinscheduler的源码。

更新Dolphinscheduler集群中的SeaTunnel任务插件


项目编译打包完成之后,找到dolphinscheduler-task-seatunnel工程下的target目录下的dolphinscheduler-task-seatunnel-3.1.5.jar包, 上传到你的dolphinscheduler集群的主节点。
然后将主节点上DS安装目录下的api-server/libsmaster-server/libsworker-server/libsalert-server/libs目录(其实这里可以只替换woker-server/libs目录)下的dolphinscheduler-task-seatunnel-3.1.5.jar重命名为dolphinscheduler-task-seatunnel-3.1.5.jar.20240606(带上日期方便知道变更时间)。
然后将我们编译的dolphinscheduler-task-seatunnel-3.1.5.jar拷贝到这几个目录(api-server/libs、master-server/libs、worker-server/libs、alert-server/libs目录,确认一下是不是所有目录下都有这个dolphinscheduler-task-seatunnel-3.1.5.jar,没有的目录就直接略过)下。
然后使用主节点上的分发脚本,将api-server/libsmaster-server/libsworker-server/libsalert-server/libs的修改同步到其他的DS节点上,分发完成之后,检查一下分发是否成功。
最后就是重启我们的DS集群,通过以上步骤我们就完成了Dolphisncheduler中SeaTunnel插件的升级适配。


04


测试验证



我们通过dolphinscheduler的工作流定义页面定义一个Seatunnel数据同步的任务, 完成Oracle数据库表采集到MySQL数据库的任务, 下面我们来操作。
关于seatunnel任务配置脚本文件,官网的文档介绍如下:
  • Source: https://seatunnel.incubator.apache.org/zh-CN/docs/category/source-v2
  • Transform: https://seatunnel.incubator.apache.org/zh-CN/docs/category/transform-v2
  • Sink: https://seatunnel.incubator.apache.org/zh-CN/docs/category/sink-v2/

Source输入源配置定义说明

这里我们的输入原始Oracle, 所以直接从Source中查找Oracle相关的配置如何定义,官网给我们提供了不少任务示例,:

简单任务示例

 
 
 
# Defining the runtime environment
env {
parallelism = 4
job.mode = "BATCH"
}
source{
Jdbc {
url = "jdbc:oracle:thin:@datasource01:1523:xe"
driver = "oracle.jdbc.OracleDriver"
user = "root"
password = "123456"
query = "SELECT * FROM TEST_TABLE"
}
}

transform {}

sink {
Console {}
}

按分区列并行任务示例

并行读取你配置的分片字段和分片数据,如果你想读取整个表,可以这样做
 
 
 
env {
parallelism = 4
job.mode = "BATCH"
}
source {
Jdbc {
url = "jdbc:oracle:thin:@datasource01:1523:xe"
driver = "oracle.jdbc.OracleDriver"
connection_check_timeout_sec = 100
user = "root"
password = "123456"
# 根据需要定义查询逻辑
query = "SELECT * FROM TEST_TABLE"
# 设置并行分片读取字段
partition_column = "ID"
# 分区切片数量
partition_num = 10
properties {
database.oracle.jdbc.timezoneAsRegion = "false"
}
}
}
sink {
Console {}
}

按主键或唯一索引并行任务示例

配置table_path会开启自动分割,可以配置split.*来调整分割策略
 
 
 
env {
parallelism = 4
job.mode = "BATCH"
}
source {
Jdbc {
url = "jdbc:oracle:thin:@datasource01:1523:xe"
driver = "oracle.jdbc.OracleDriver"
connection_check_timeout_sec = 100
user = "root"
password = "123456"
table_path = "DA.SCHEMA1.TABLE1"
query = "select * from SCHEMA1.TABLE1"
split.size = 10000
}
}

sink {
Console {}
}

并行上下限任务示例

指定查询的上下限内的数据效率更高按照你配置的上下限来读取你的数据源效率更高
 
 
 
source {
Jdbc {
url = "jdbc:oracle:thin:@datasource01:1523:xe"
driver = "oracle.jdbc.OracleDriver"
connection_check_timeout_sec = 100
user = "root"
password = "123456"
# Define query logic as required
query = "SELECT * FROM TEST_TABLE"
partition_column = "ID"
# Read start boundary
partition_lower_bound = 1
# Read end boundary
partition_upper_bound = 500
partition_num = 10
}
}

多表读取任务示例

配置table_list会开启自动分割,可以通过配置split.来调整分割策略*
 
 
 
env {
job.mode = "BATCH"
parallelism = 4
}
source {
Jdbc {
url = "jdbc:oracle:thin:@datasource01:1523:xe"
driver = "oracle.jdbc.OracleDriver"
connection_check_timeout_sec = 100
user = "root"
password = "123456"
"table_list"=[
{
"table_path"="XE.TEST.USER_INFO"
},
{
"table_path"="XE.TEST.YOURTABLENAME"
}
]
#where_condition= "where id > 100"
split.size = 10000
#split.even-distribution.factor.upper-bound = 100
#split.even-distribution.factor.lower-bound = 0.05
#split.sample-sharding.threshold = 1000
#split.inverse-sampling.rate = 1000
}
}

sink {
Console {}
}

Sink输出源配置定义说明

简单任务示例

本示例定义了一个SeaTunnel同步任务,通过FakeSource自动生成数据并发送到JDBC Sink。FakeSource一共生成16行数据(row.num=16),每行有两个字段name(string类型)和age(int类型)。最终的目标表为test_table,表中同样会有16行数据。运行此作业之前,你需要在mysql中创建数据库test和表test_table。如果你还没有安装和部署SeaTunnel,你需要按照安装SeaTunnel中的说明安装并部署SeaTunnel。然后按照快速开始使用SeaTunnel引擎中的说明运行此作业。
 
 
 
env {
parallelism = 1
job.mode = "BATCH"
}

source {
FakeSource {
parallelism = 1
result_table_name = "fake"
row.num = 16
schema = {
fields {
name = "string"
age = "int"
}
}
}
}

transform {}

sink {
jdbc {
url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
password = "123456"
query = "insert into test_table(name,age) values(?,?)"
}
}

生成输出SQL任务示例

本示例无需编写复杂的sql语句,您可以配置输出端数据库名称表名称来自动为您生成添加语句
 
 
 
sink {
jdbc {
url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
password = "123456"
# 根据数据库表名自动生成sql语句
generate_sink_sql = true
database = test
table = test_table
}
}

精确任务示例

对于需要精确写入场景,我们保证精确一次。
 
 
 
sink {
jdbc {
url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
max_retries = 0
user = "root"
password = "123456"
query = "insert into test_table(name,age) values(?,?)"
is_exactly_once = "true"
xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource"
}
}

CDC(变更数据捕获)事件

我们也支持CDC变更数据在这种情况下,您需要配置数据库,表和primary_keys。
 
 
 
sink {
jdbc {
url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
password = "123456"
generate_sink_sql = true
# You need to configure both database and table
database = test
table = sink_table
primary_keys = ["id","name"]
field_ide = UPPERCASE
schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
data_save_mode="APPEND_DATA"
}
}

完整测试脚本配置文件

下面给出本示例中完整的配置文件示例
 
 
 
env {
parallelism = 4
job.mode = "BATCH"
}
source{
Jdbc {
url = "jdbc:oracle:thin:@192.168.11.101:15210:YLAPP"
driver = "oracle.jdbc.OracleDriver"
user = "appuser001"
password = "appuser001"
query = "SELECT * FROM YL_APP.MET_COM_ICDOPERATION_LS"
}
}

transform {}

sink {
jdbc {
url = "jdbc:mysql://192.168.10.210:13306/yl-app-new?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
user = "appuser001"
password = "appuser001"
generate_sink_sql = "true"
database = "hive"
table = "met_com_icdoperation_ls"
schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
data_save_mode="APPEND_DATA"
}
}

将上述脚本中的数据库配置信息修改成你的数据连接配置, 然后将脚本覆盖到上图脚本输入中, 保存工作流, 上线之后启动工作流。
到对应数据库验证。
原来的Oracle数据库表
同步之后的MySQL数据库表
任务成功了, 数据也成功同步过来了, OK,测试通过!大家接下来可以在这个Demo的基础上进行更多的扩展和挖掘, 实战的多了, 你对于Dolphinscheduler和Seatunnel的架构和原理的理解就会越来越深入了,慢慢你也可以通过扩展源码来升级和拓展这些优秀开源框架的功能了。创作不易,如果我的文章对你有帮助欢迎点赞,收藏,送你一朵小红花~~~
原文链接:https://blog.csdn.net/qq_41865652/article/details/140971419

<🐬🐬 >

推荐阅读

用户实践案例
奇富科技  腾讯音乐 联通数科 拈花云科
蔚来汽车 长城汽车 集度 长安汽车
思科网讯 食行生鲜 联通医疗 联想
新网银行 唯品富邦消费金融  蜀海供应链 
自如 有赞 伊利 当贝大数据
珍岛集团 传智教育 Bigo
YY直播  作业帮 太美医疗
某新能源 中电信翼康
迁移实践
Azkaban   Ooize(当贝迁移案例)   
Airflow (有赞迁移案例) 
Air2phin(迁移工具)
Airflow迁移实践

新手入门
选择Apache DolphinScheduler的10个理由
Apache DolphinScheduler 3.1.8 保姆级教程【安装、介绍、项目运用、邮箱预警设置】轻松拿捏!
Apache DolphinScheduler 如何实现自动化打包+单机/集群部署?
DolphinScheduler快速上手:基于Docker Compose的安装与配置全攻略
Apache DolphinScheduler 在大数据环境中的应用与调优
Apache DolphinScheduler-3.2.0集群部署教程

< 🐬🐬 >
参与社区

参与Apache DolphinScheduler 社区有非常多的参与贡献的方式,包括:


贡献第一个PR(文档、代码) 我们也希望是简单的,第一个PR用于熟悉提交的流程和社区协作以及感受社区的友好度。

社区汇总了以下适合新手的问题列表:https://github.com/apache/dolphinscheduler/issues/5689

非新手问题列表:https://github.com/apache/dolphinscheduler/issues?
q=is%3Aopen+is%3Aissue+label%3A%22volunteer+wanted%22

如何参与贡献链接:https://dolphinscheduler.apache.org/zh-cn/community/development/contribute.html

来吧,DolphinScheduler开源社区需要您的参与,为中国开源崛起添砖加瓦吧,哪怕只是小小的一块瓦,汇聚起来的力量也是巨大的!


球分享

球点赞

球在看

【声明】内容源于网络
0
0
海豚调度
Apache DolphinScheduler是一个分布式、去中心化、易扩展的可视化DAG工作流任务调度系统,其致力于解决数据处理流程中错综复杂的依赖关系,使调度系统在数据处理流程中开箱即用。
内容 667
粉丝 0
海豚调度 Apache DolphinScheduler是一个分布式、去中心化、易扩展的可视化DAG工作流任务调度系统,其致力于解决数据处理流程中错综复杂的依赖关系,使调度系统在数据处理流程中开箱即用。
总阅读167
粉丝0
内容667