大数跨境

DolphinScheduler - 1.3 系列核心表结构剖析

DolphinScheduler - 1.3 系列核心表结构剖析 海豚调度
2020-07-03
2
导读:Apache DolphinScheduler 是一个分布式去中心化,易扩展的可视化 DAG 工作流任务调度。近日,伯毅同学给社区贡献了工作流核心表结构的剖析文章,非常细致,喜欢的伙伴请转走


Apache DolphinScheduler 是一个分布式去中心化,易扩展的可视化 DAG 工作流任务调度系统。致力于解决数据处理流程中错综复杂的依赖关系,使调度系统在数据处理流程中开箱即用。


近日,伯毅同学给社区贡献了工作流核心表结构的剖析文章,非常细致,喜欢的伙伴请转走


1. 工作流总体存储结构

在 dolphinscheduler 库中创建的所有工作流定义(模板)都保存在 t_ds_process_definition 表中.

该数据库表结构如下表所示:

序号

字段

类型

描述

1

id

int(11)

主键

2

name

varchar(255)

流程定义名称

3

version

int(11)

流程定义版本

4

release_state

tinyint(4)

流程定义的发布状态:0 未上线 , 1已上线

5

project_id

int(11)

项目id

6

user_id

int(11)

流程定义所属用户id

7

process_definition_json

longtext

流程定义JSON

8

description

text

流程定义描述

9

global_params

text

全局参数

10

flag

tinyint(4)

流程是否可用:0 不可用,1 可用

11

locations

text

节点坐标信息

12

connects

text

节点连线信息

13

receivers

text

收件人

14

receivers_cc

text

抄送人

15

create_time

datetime

创建时间

16

timeout

int(11)

超时时间

17

tenant_id

int(11)

租户id

18

update_time

datetime

更新时间

19

modify_by

varchar(36)

修改用户

20

resource_ids

varchar(255)

资源ids

其中 process_definition_json 字段为核心字段, 定义了 DAG 图中的任务信息.该数据以JSON 的方式进行存储.

公共的数据结构如下表

序号

字段

类型

描述

1

globalParams

Array

全局参数

2

tasks

Array

流程中的任务集合 [ 各个类型的结构请参考如下章节]

3

tenantId

int

租户id

4

timeout

int

超时时间

数据示例:

{

    "globalParams":[

        {

            "prop":"golbal_bizdate",

            "direct":"IN",

            "type":"VARCHAR",

            "value":"${system.biz.date}"

        }

    ],

    "tasks":Array[1],

    "tenantId":0,

    "timeout":0

}


2. 各任务类型存储结构详解

2.1 Shell 节点

Shell 节点数据结构如下:

序号

参数名


类型

描述

描述

1

id


String

任务编码


2

type


String

类型

SHELL

3

name


String

名称


4

params


Object

自定义参数

Json 格式

5


rawScript

String

Shell脚本


6


localParams

Array

自定义参数


7


resourceList

Array

资源文件


8

description


String

描述


9

runFlag


String

运行标识


10

conditionResult


Object

条件分支


11


successNode

Array

成功跳转节点


12


failedNode

Array

失败跳转节点


13

dependence


Object

任务依赖

params互斥

14

maxRetryTimes


String

最大重试次数


15

retryInterval


String

重试间隔


16

timeout


Object

超时控制


17

taskInstancePriority


String

任务优先级


18

workerGroup


String

Worker 分组


19

preTasks


Array

前置任务


Shell 节点数据样例:

{

    "type":"SHELL",

    "id":"tasks-80760",

    "name":"Shell Task",

    "params":{

        "resourceList":[

            {

                "id":3,

                "name":"run.sh",

                "res":"run.sh"

            }

        ],

        "localParams":[

 

        ],

        "rawScript":"echo "This is a shell script""

    },

    "description":"",

    "runFlag":"NORMAL",

    "conditionResult":{

        "successNode":[

            ""

        ],

        "failedNode":[

            ""

        ]

    },

    "dependence":{

 

    },

    "maxRetryTimes":"0",

    "retryInterval":"1",

    "timeout":{

        "strategy":"",

        "interval":null,

        "enable":false

    },

    "taskInstancePriority":"MEDIUM",

    "workerGroup":"default",

    "preTasks":[

 

    ]

}


2.2 SQL节点

通过 SQL 对指定的数据源进行数据查询、更新操作.

SQL 节点数据结构如下:

序号

参数名


类型

描述

描述

1

id


String

任务编码


2

type


String

类型

SQL

3

name


String

名称


4

params


Object

自定义参数

Json 格式

5


type

String

数据库类型


6


datasource

Int

数据源id


7


sql

String

查询SQL语句


8


udfs

String

udf函数

UDF函数id,以逗号分隔.

9


sqlType

String

SQL节点类型

0 查询 , 1 非查询

10


title

String

邮件标题


11


receivers

String

收件人


12


receiversCc

String

抄送人


13


showType

String

邮件显示类型

TABLE 表格 , ATTACHMENT附件

14


connParams

String

连接参数


15


preStatements

Array

前置SQL


16


postStatements

Array

后置SQL


17


localParams

Array

自定义参数


18

description


String

描述


19

runFlag


String

运行标识


20

conditionResult


Object

条件分支


21


successNode

Array

成功跳转节点


22


failedNode

Array

失败跳转节点


23

dependence


Object

任务依赖

params互斥

24

maxRetryTimes


String

最大重试次数


25

retryInterval


String

重试间隔


26

timeout


Object

超时控制


27

taskInstancePriority


String

任务优先级


28

workerGroup


String

Worker 分组


29

preTasks


Array

前置任务


SQL 节点数据样例:

{

    "type":"SQL",

    "id":"tasks-95648",

    "name":"SqlTask-Query",

    "params":{

        "type":"MYSQL",

        "datasource":1,

        "sql":"select id , namge , age from emp where id =  ${id}",

        "udfs":"",

        "sqlType":"0",

        "title":"xxxx@xxx.com",

        "receivers":"xxxx@xxx.com",

        "receiversCc":"",

        "showType":"TABLE",

        "localParams":[

            {

                "prop":"id",

                "direct":"IN",

                "type":"INTEGER",

                "value":"1"

            }

        ],

        "connParams":"",

        "preStatements":[

            "insert into emp ( id,name ) value (1,'Li' )"

        ],

        "postStatements":[

 

        ]

    },

    "description":"",

    "runFlag":"NORMAL",

    "conditionResult":{

        "successNode":[

            ""

        ],

        "failedNode":[

            ""

        ]

    },

    "dependence":{

 

    },

    "maxRetryTimes":"0",

    "retryInterval":"1",

    "timeout":{

        "strategy":"",

        "interval":null,

        "enable":false

    },

    "taskInstancePriority":"MEDIUM",

    "workerGroup":"default",

    "preTasks":[

 

    ]

}


2.2 Spark 节点

Spark 节点数据结构如下:

序号

参数名


类型

描述

描述

1

id


String

任务编码


2

type


String

类型

SPARK

3

name


String

名称


4

params


Object

自定义参数

Json 格式

5


mainClass

String

运行主类


6


mainArgs

String

运行参数


7


others

String

其他参数


8


mainJar

Object

程序 jar


9


deployMode

String

部署模式

local,client,cluster

10


driverCores

String

driver核数


11


driverMemory

String

driver 内存数


12


numExecutors

String

executor数量


13


executorMemory

String

executor内存


14


executorCores

String

executor核数


15


programType

String

程序类型

JAVA,SCALA,PYTHON

16


sparkVersion

String

Spark 版本

SPARK1 , SPARK2

17


localParams

Array

自定义参数


18


resourceList

Array

资源文件


19

description


String

描述


20

runFlag


String

运行标识


21

conditionResult


Object

条件分支


22


successNode

Array

成功跳转节点


23


failedNode

Array

失败跳转节点


24

dependence


Object

任务依赖

params互斥

25

maxRetryTimes


String

最大重试次数


26

retryInterval


String

重试间隔


27

timeout


Object

超时控制


28

taskInstancePriority


String

任务优先级


29

workerGroup


String

Worker 分组


30

preTasks


Array

前置任务


Spark 节点数据样例:

{

    "type":"SPARK",

    "id":"tasks-87430",

    "name":"SparkTask",

    "params":{

        "mainClass":"org.apache.spark.examples.SparkPi",

        "mainJar":{

            "id":4

        },

        "deployMode":"cluster",

        "resourceList":[

            {

                "id":3,

                "name":"run.sh",

                "res":"run.sh"

            }

        ],

        "localParams":[

 

        ],

        "driverCores":1,

        "driverMemory":"512M",

        "numExecutors":2,

        "executorMemory":"2G",

        "executorCores":2,

        "mainArgs":"10",

        "others":"",

        "programType":"SCALA",

        "sparkVersion":"SPARK2"

    },

    "description":"",

    "runFlag":"NORMAL",

    "conditionResult":{

        "successNode":[

            ""

        ],

        "failedNode":[

            ""

        ]

    },

    "dependence":{

 

    },

    "maxRetryTimes":"0",

    "retryInterval":"1",

    "timeout":{

        "strategy":"",

        "interval":null,

        "enable":false

    },

    "taskInstancePriority":"MEDIUM",

    "workerGroup":"default",

    "preTasks":[

 

    ]

}


2.3 MapReduce(MR)节点

MapReduce(MR) 节点数据结构如下:

序号

参数名


类型

描述

描述

1

id


String

任务编码


2

type


String

类型

MR

3

name


String

名称


4

params


Object

自定义参数

Json 格式

5


mainClass

String

运行主类


6


mainArgs

String

运行参数


7


others

String

其他参数


8


mainJar

Object

程序 jar


9


programType

String

程序类型

JAVA,PYTHON

10


localParams

Array

自定义参数


11


resourceList

Array

资源文件


12

description


String

描述


13

runFlag


String

运行标识


14

conditionResult


Object

条件分支


15


successNode

Array

成功跳转节点


16


failedNode

Array

失败跳转节点


17

dependence


Object

任务依赖

params互斥

18

maxRetryTimes


String

最大重试次数


19

retryInterval


String

重试间隔


20

timeout


Object

超时控制


21

taskInstancePriority


String

任务优先级


22

workerGroup


String

Worker 分组


23

preTasks


Array

前置任务


MapReduce(MR) 节点数据样例:

{

    "type":"MR",

    "id":"tasks-28997",

    "name":"MRTask",

    "params":{

        "mainClass":"wordcount",

        "mainJar":{

            "id":5

        },

        "resourceList":[

            {

                "id":3,

                "name":"run.sh",

                "res":"run.sh"

            }

        ],

        "localParams":[

 

        ],

        "mainArgs":"/tmp/wordcount/input /tmp/wordcount/output/",

        "others":"",

        "programType":"JAVA"

    },

    "description":"",

    "runFlag":"NORMAL",

    "conditionResult":{

        "successNode":[

            ""

        ],

        "failedNode":[

            ""

        ]

    },

    "dependence":{

 

    },

    "maxRetryTimes":"0",

    "retryInterval":"1",

    "timeout":{

        "strategy":"",

        "interval":null,

        "enable":false

    },

    "taskInstancePriority":"MEDIUM",

    "workerGroup":"default",

    "preTasks":[

 

    ]

}


2.4 Python节点

Python 节点数据结构如下:

序号

参数名


类型

描述

描述

1

id


String

任务编码


2

type


String

类型

PYTHON

3

name


String

名称


4

params


Object

自定义参数

Json 格式

5


rawScript

String

Python脚本


6


localParams

Array

自定义参数


7


resourceList

Array

资源文件


8

description


String

描述


9

runFlag


String

运行标识


10

conditionResult


Object

条件分支


11


successNode

Array

成功跳转节点


12


failedNode

Array

失败跳转节点


13

dependence


Object

任务依赖

params互斥

14

maxRetryTimes


String

最大重试次数


15

retryInterval


String

重试间隔


16

timeout


Object

超时控制


17

taskInstancePriority


String

任务优先级


18

workerGroup


String

Worker 分组


19

preTasks


Array

前置任务


Python 节点数据样例:

{

    "type":"PYTHON",

    "id":"tasks-5463",

    "name":"Python Task",

    "params":{

        "resourceList":[

            {

                "id":3,

                "name":"run.sh",

                "res":"run.sh"

            }

        ],

        "localParams":[

 

        ],

        "rawScript":"print("This is a python script")"

    },

    "description":"",

    "runFlag":"NORMAL",

    "conditionResult":{

        "successNode":[

            ""

        ],

        "failedNode":[

            ""

        ]

    },

    "dependence":{

 

    },

    "maxRetryTimes":"0",

    "retryInterval":"1",

    "timeout":{

        "strategy":"",

        "interval":null,

        "enable":false

    },

    "taskInstancePriority":"MEDIUM",

    "workerGroup":"default",

    "preTasks":[

 

    ]

}


2.5 Flink 节点

Flink 节点数据结构如下:

序号

参数名


类型

描述

描述

1

id


String

任务编码


2

type


String

类型

FLINK

3

name


String

名称


4

params


Object

自定义参数

Json 格式

5


mainClass

String

运行主类


6


mainArgs

String

运行参数


7


others

String

其他参数


8


mainJar

Object

程序 jar


9


deployMode

String

部署模式

local,client,cluster

10


slot

String

slot数量


11


taskManager

String

taskManage数量


12


taskManagerMemory

String

taskManager内存数


13


jobManagerMemory

String

jobManager内存数


14


programType

String

程序类型

JAVA,SCALA,PYTHON

15


localParams

Array

自定义参数


16


resourceList

Array

资源文件


17

description


String

描述


18

runFlag


String

运行标识


19

conditionResult


Object

条件分支


20


successNode

Array

成功跳转节点


21


failedNode

Array

失败跳转节点


22

dependence


Object

任务依赖

params互斥

23

maxRetryTimes


String

最大重试次数


24

retryInterval


String

重试间隔


25

timeout


Object

超时控制


26

taskInstancePriority


String

任务优先级


27

workerGroup


String

Worker 分组


38

preTasks


Array

前置任务


Flink 节点数据样例:

{

    "type":"FLINK",

    "id":"tasks-17135",

    "name":"FlinkTask",

    "params":{

        "mainClass":"com.flink.demo",

        "mainJar":{

            "id":6

        },

        "deployMode":"cluster",

        "resourceList":[

            {

                "id":3,

                "name":"run.sh",

                "res":"run.sh"

            }

        ],

        "localParams":[

 

        ],

        "slot":1,

【声明】内容源于网络
0
0
海豚调度
Apache DolphinScheduler是一个分布式、去中心化、易扩展的可视化DAG工作流任务调度系统,其致力于解决数据处理流程中错综复杂的依赖关系,使调度系统在数据处理流程中开箱即用。
内容 667
粉丝 0
海豚调度 Apache DolphinScheduler是一个分布式、去中心化、易扩展的可视化DAG工作流任务调度系统,其致力于解决数据处理流程中错综复杂的依赖关系,使调度系统在数据处理流程中开箱即用。
总阅读167
粉丝0
内容667