Light up a ⭐️ Star · Light the way for open source
https://github.com/apache/dolphinscheduler

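- 1 Design and Strategy of DolphinScheduler
- 1.1 Distributed Design
- 1.1.1 Centralized
- 1.1.2 Decentralized
- 1.2 DolphinScheduler Architecture Design
- 1.3 Fault Tolerance
- 1.3.1 Crash Fault Tolerance
- 1.3.2 Failure Retry
- 1.4 Remote Log Access
- 2 DolphinScheduler Source Code Analysis
- 2.1 Project Modules and Configuration Files
- 2.1.1 Project Modules
- 2.1.2 Configuration Files
- 2.2 Main API Task Operation Interfaces
- 2.3 Quartz Architecture and Execution Flow
- 2.3.1 Concepts and Architecture
- 2.3.2 Initialization and Execution Flow
- 2.3.3 Cluster Operation
- 2.4 Master Startup and Execution Flow
- 2.4.1 Concepts and Execution Logic
- 2.4.2 Cluster and Slots
- 2.4.3 Code Execution Flow
- 2.5 Worker Startup and Execution Flow
- 2.5.1 Concepts and Execution Logic
- 2.5.2 Code Execution Flow
- 2.6 RPC Interaction
- 2.6.1 Master-Worker Interaction
- 2.6.2 Interaction Between Other Services and the Master
- 2.7 Load Balancing Algorithms
- 2.7.1 Weighted Random
- 2.7.2 Linear Load
- 2.7.3 Smooth Round Robin
- 2.8 Log Service
- 2.9 Alerting
- 3 Afterword
- 3.1 Make friends
- 3.2 References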
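Preface
1 Design and Strategy of DolphinScheduler
- Task definition: the various task types (SQL, Shell, Spark, MR, Python, etc.) that form the key building blocks of a process definition;
- Task instance: an instantiation of a task, which records the concrete execution state of that task;
- Process definition: a directed acyclic graph (DAG) built from a set of task nodes connected by their dependencies;
- Process instance: an instance of a process, created either manually or by a scheduled trigger;
- Scheduling: the system uses the Quartz distributed scheduler and supports visual generation of cron expressions.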
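A process definition is a DAG, so a task may start only after all of its upstream tasks have finished. The sketch below illustrates this with Kahn's topological sort. It orders the tasks of such a DAG and rejects any graph that contains a cycle. `DagOrder` and `topologicalOrder` are hypothetical names and are not DolphinScheduler's DAG implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: orders the task codes of a process definition (a DAG) for execution. */
final class DagOrder {

    private DagOrder() {}

    /**
     * Kahn's algorithm.
     *
     * @param tasks all task codes of the process definition
     * @param edges dependency pairs {upstream, downstream}
     * @throws IllegalArgumentException if the graph contains a cycle, i.e. is not a DAG
     */
    static List<String> topologicalOrder(List<String> tasks, List<String[]> edges) {
        Map<String, List<String>> downstream = new HashMap<>();
        Map<String, Integer> inDegree = new LinkedHashMap<>(); // keeps the input order for ties
        for (String t : tasks) {
            downstream.put(t, new ArrayList<>());
            inDegree.put(t, 0);
        }
        for (String[] e : edges) {
            downstream.get(e[0]).add(e[1]);
            inDegree.merge(e[1], 1, Integer::sum);
        }
        // Tasks without upstream dependencies can start immediately.
        Deque<String> ready = new ArrayDeque<>();
        inDegree.forEach((t, d) -> {
            if (d == 0) {
                ready.add(t);
            }
        });
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String t = ready.poll();
            order.add(t);
            for (String next : downstream.get(t)) {
                // A downstream task becomes ready once all of its upstream tasks are done.
                if (inDegree.merge(next, -1, Integer::sum) == 0) {
                    ready.add(next);
                }
            }
        }
        if (order.size() != tasks.size()) {
            throw new IllegalArgumentException("process definition contains a cycle");
        }
        return order;
    }
}
```

For the diamond A → {B, C} → D, it returns [A, B, C, D]. Adding the edge D → A makes it throw, because the graph is no longer acyclic.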
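1.1 Distributed Design
Distributed system architectures fall broadly into two categories, centralized and decentralized. Each has its own strengths and weaknesses, and the right choice depends on the business requirements.
1.1.1 Centralized

1.1.2 Decentralized

- PR link: https://github.com/apache/dolphinscheduler/issues/10874
- For an animated demonstration, see: http://thesecretlivesofdata.com/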
1.2 DolphinScheduler Architecture Design

1.3 Fault Tolerance
1.3.1 Crash Fault Tolerance


Master failover flowchart
Worker failover flowchart
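1.3.2 Failure Retry
- Task failure retry works at the task level and is performed automatically by the scheduler. For example, if a Shell task is configured with 3 retries, the task will be run again up to 3 more times after it fails.
- Process failure recovery works at the process level and is triggered manually. Recovery can only resume from the failed node or from the current node.
- Process failure rerun also works at the process level and is triggered manually. A rerun starts again from the start node.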
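The task-level retry described above is essentially a bounded loop. With a retry count of 3, a task that keeps failing runs at most 1 + 3 = 4 times. Below is a minimal sketch under that reading. `TaskRetry` and `Outcome` are hypothetical names. The sketch also omits the retry interval that DolphinScheduler tasks can be configured with.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of task-level failure retry (no retry interval, no persistence). */
final class TaskRetry {

    private TaskRetry() {}

    /** Result of running a task with retries. */
    static final class Outcome {
        final boolean succeeded;
        final int attempts;

        Outcome(boolean succeeded, int attempts) {
            this.succeeded = succeeded;
            this.attempts = attempts;
        }
    }

    /** Runs the task once and, while it keeps failing, retries it up to retryTimes more times. */
    static Outcome runWithRetry(BooleanSupplier task, int retryTimes) {
        int maxAttempts = 1 + Math.max(retryTimes, 0); // first run + retries
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (task.getAsBoolean()) {
                return new Outcome(true, attempt); // success stops further retries
            }
        }
        return new Outcome(false, maxAttempts);
    }
}
```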
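- One kind is the business node. Each business node corresponds to an actual script or processing statement, such as Shell, MR, Spark and dependent nodes.
- The other kind is the logic node. A logic node runs no actual script or statement and only handles the flow logic of the process, as sub-process nodes do.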
1.4 Remote Log Access
- Ship the logs to the Elasticsearch (ES) search engine;
- Fetch remote log information over Netty communication.
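2 DolphinScheduler Source Code Analysis
2.1 Project Modules and Configuration Files
2.1.1 Project Modules
- dolphinscheduler-alert: alerting module, provides the alert service;
- dolphinscheduler-api: web application module, provides the REST API service called by the UI;
- dolphinscheduler-common: common constants, enums, utility classes, data structures and base classes;
- dolphinscheduler-dao: provides database access and related operations;
- dolphinscheduler-remote: Netty-based client and server;
- dolphinscheduler-server: log and heartbeat services;
- dolphinscheduler-log-server: LoggerServer, used by the REST API to view logs over RPC;
- dolphinscheduler-master: the MasterServer service, mainly responsible for splitting the DAG and monitoring task state;
- dolphinscheduler-worker: the WorkerServer service, mainly responsible for submitting and executing tasks and updating task state;
- dolphinscheduler-service: service module containing the Quartz, ZooKeeper and log-client access services, called by the server and api modules;
- dolphinscheduler-ui: front-end module.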
2.1.2 Configuration Files
dolphinscheduler-common common.properties

```properties
# Local working directory for temporary files
data.basedir.path=/tmp/dolphinscheduler
# Resource storage type: HDFS, S3, NONE
resource.storage.type=NONE
# Resource storage path
resource.upload.path=/dolphinscheduler
# Whether Kerberos authentication is enabled for Hadoop
hadoop.security.authentication.startup.state=false
# Kerberos configuration path
java.security.krb5.conf.path=/opt/krb5.conf
# Kerberos login user
login.user.keytab.username=hdfs-mycluster@ESZ.COM
# Keytab of the Kerberos login user
login.user.keytab.path=/opt/hdfs.headless.keytab
# Kerberos expiration time, an integer in hours
kerberos.expire.time=2
# If the storage type is HDFS, configure a user that has the required operation permissions
hdfs.root.user=hdfs
# Request address. If resource.storage.type=S3, the value looks like s3a://dolphinscheduler. If resource.storage.type=HDFS and Hadoop is configured with HA, copy core-site.xml and hdfs-site.xml into the conf directory
fs.defaultFS=hdfs://mycluster:8020
aws.access.key.id=minioadmin
aws.secret.access.key=minioadmin
aws.region=us-east-1
aws.endpoint=http://localhost:9000
# resourcemanager port, the default value is 8088 if not specified
resource.manager.httpaddress.port=8088
# YARN ResourceManager address. If ResourceManager HA is enabled, enter the HA IP addresses (comma-separated); if ResourceManager is a single node, leave this value empty
yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx
# If ResourceManager HA is enabled or ResourceManager is not used, keep the default value. If ResourceManager is a single node, replace ds1 with the ResourceManager's hostname
yarn.application.status.address=http://ds1:%s/ws/v1/cluster/apps/%s
# job history status url when application number threshold is reached(default 10000, maybe it was set to 1000)
yarn.job.history.status.address=http://ds1:19888/ws/v1/history/mapreduce/jobs/%s
# datasource encryption enable
datasource.encryption.enable=false
# datasource encryption salt
datasource.encryption.salt=!@#$%^&*
# data quality option
data-quality.jar.name=dolphinscheduler-data-quality-dev-SNAPSHOT.jar
#data-quality.error.output.path=/tmp/data-quality-error-data
# Network IP gets priority, default inner outer
# Whether hive SQL is executed in the same session
support.hive.oneSession=false
# use sudo or not, if set true, executing user is tenant user and deploy user needs sudo permissions; if set false, executing user is the deploy user and doesn't need sudo permissions
sudo.enable=true
# network interface preferred like eth0, default: empty
#dolphin.scheduler.network.interface.preferred=
# network IP gets priority, default: inner outer
#dolphin.scheduler.network.priority.strategy=default
# system env path
#dolphinscheduler.env.path=dolphinscheduler_env.sh
# Whether the system runs in development mode
development.state=false
# rpc port
alert.rpc.port=50052
# Url endpoint for zeppelin RESTful API
zeppelin.rest.url=http://localhost:8080
```
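common.properties is a plain key=value file, so `java.util.Properties` is enough to read it. The sketch below is hypothetical and is not DolphinScheduler's own configuration loader. When a key is missing, it falls back to the value shown in the sample file above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical reader for a few keys of common.properties. */
final class CommonConfig {

    final String storageType;
    final boolean sudoEnabled;
    final int kerberosExpireHours;

    private CommonConfig(Properties p) {
        // Defaults mirror the sample file and apply only when a key is missing.
        storageType = p.getProperty("resource.storage.type", "NONE").trim();
        sudoEnabled = Boolean.parseBoolean(p.getProperty("sudo.enable", "true").trim());
        kerberosExpireHours = Integer.parseInt(p.getProperty("kerberos.expire.time", "2").trim());
    }

    static CommonConfig parse(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text)); // lines starting with '#' are treated as comments
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new CommonConfig(p);
    }

    /** True when resources are stored in HDFS or S3 rather than disabled (NONE). */
    boolean usesResourceStorage() {
        return "HDFS".equals(storageType) || "S3".equals(storageType);
    }
}
```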
dolphinscheduler-api application.yaml

```yaml
server:
  port: 12345
  servlet:
    session:
      timeout: 120m
    context-path: /dolphinscheduler/
  compression:
    enabled: true
    mime-types: text/html,text/xml,text/plain,text/css,text/javascript,application/javascript,application/json,application/xml
  jetty:
    max-http-form-post-size: 5000000

spring:
  application:
    name: api-server
  banner:
    charset: UTF-8
  jackson:
    time-zone: UTC
    date-format: "yyyy-MM-dd HH:mm:ss"
  servlet:
    multipart:
      max-file-size: 1024MB
      max-request-size: 1024MB
  messages:
    basename: i18n/messages
  datasource:
    # driver-class-name: org.postgresql.Driver
    # url: jdbc:postgresql://127.0.0.1:5432/dolphinscheduler
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
    username: root
    password: root
    hikari:
      connection-test-query: select 1
      minimum-idle: 5
      auto-commit: true
      validation-timeout: 3000
      pool-name: DolphinScheduler
      maximum-pool-size: 50
      connection-timeout: 30000
      idle-timeout: 600000
      leak-detection-threshold: 0
      initialization-fail-timeout: 1
  quartz:
    auto-startup: false
    job-store-type: jdbc
    jdbc:
      initialize-schema: never
    properties:
      org.quartz.threadPool:threadPriority: 5
      org.quartz.jobStore.isClustered: true
      org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
      org.quartz.scheduler.instanceId: AUTO
      org.quartz.jobStore.tablePrefix: QRTZ_
      org.quartz.jobStore.acquireTriggersWithinLock: true
      org.quartz.scheduler.instanceName: DolphinScheduler
      org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
      org.quartz.jobStore.useProperties: false
      org.quartz.threadPool.makeThreadsDaemons: true
      org.quartz.threadPool.threadCount: 25
      org.quartz.jobStore.misfireThreshold: 60000
      org.quartz.scheduler.makeSchedulerThreadDaemon: true
      # org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
      org.quartz.jobStore.clusterCheckinInterval: 5000

management:
  endpoints:
    web:
      exposure:
        include: '*'
  metrics:
    tags:
      application: ${spring.application.name}

registry:
  type: zookeeper
  zookeeper:
    namespace: dolphinscheduler
    # connect-string: localhost:2181
    connect-string: 10.255.158.70:2181
    retry-policy:
      base-sleep-time: 60ms
      max-sleep: 300ms
      max-retries: 5
    session-timeout: 30s
    connection-timeout: 9s
    block-until-connected: 600ms
    digest: ~

audit:
  enabled: false

metrics:
  enabled: true

python-gateway:
  # Whether to enable the Python gateway server. The default value is true.
  enabled: true
  # The address the Python gateway server listens on. Set it to `0.0.0.0` if your Python API runs on a different
  # host from the Python gateway server. It could also be a specific address like `127.0.0.1` or `localhost`.
  gateway-server-address: 0.0.0.0
  # The port the Python gateway server listens on, i.e. the port the Python API side connects to.
  gateway-server-port: 25333
  # The address of the Python callback client.
  python-address: 127.0.0.1
  # The port of the Python callback client.
  python-port: 25334
  # Close the socket server connection if no other request is accepted after x milliseconds. 0 means infinite,
  # in which case the socket server never closes even if no requests are accepted.
  connect-timeout: 0
  # Close each active socket server connection if the Python program is not active after x milliseconds. 0 means
  # infinite, in which case the socket server never closes even if no requests are accepted.
  read-timeout: 0

# Override by profile
---
spring:
  config:
    activate:
      on-profile: mysql
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8
  quartz:
    properties:
      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
```
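The `registry.zookeeper.retry-policy` settings bound how long the ZooKeeper client waits between reconnection attempts. The sketch below is a simplified, deterministic illustration. It builds a capped exponential backoff from `base-sleep-time`, `max-sleep` and `max-retries`. This formula is an assumption. If the client uses Curator's `ExponentialBackoffRetry`, the real sleeps are also randomized. `CappedBackoff` is a hypothetical class.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical deterministic capped exponential backoff (the real client may randomize sleeps). */
final class CappedBackoff {

    private CappedBackoff() {}

    /** Sleep before each of maxRetries retries: base, 2*base, 4*base, ..., never above maxSleep. */
    static List<Duration> schedule(Duration baseSleep, Duration maxSleep, int maxRetries) {
        long cap = maxSleep.toMillis();
        long next = Math.min(baseSleep.toMillis(), cap);
        List<Duration> sleeps = new ArrayList<>();
        for (int retry = 0; retry < maxRetries; retry++) {
            sleeps.add(Duration.ofMillis(next));
            next = Math.min(next * 2, cap); // doubling stops at the cap
        }
        return sleeps;
    }
}
```

With the values above (60ms, 300ms, 5 retries), it yields sleeps of 60, 120, 240, 300 and 300 ms.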
dolphinscheduler-master application.yaml

```yaml
spring:
  banner:
    charset: UTF-8
  application:
    name: master-server
  jackson:
    time-zone: UTC
    date-format: "yyyy-MM-dd HH:mm:ss"
  cache:
    # default enable cache, you can disable by `type: none`
    type: none
    cache-names:
      - tenant
      - user
      - processDefinition
      - processTaskRelation
      - taskDefinition
    caffeine:
      spec: maximumSize=100,expireAfterWrite=300s,recordStats
  datasource:
    #driver-class-name: org.postgresql.Driver
    #url: jdbc:postgresql://127.0.0.1:5432/dolphinscheduler
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
    username: root
    password:
    hikari:
      connection-test-query: select 1
      minimum-idle: 5
      auto-commit: true
      validation-timeout: 3000
      pool-name: DolphinScheduler
      maximum-pool-size: 50
      connection-timeout: 30000
      idle-timeout: 600000
      leak-detection-threshold: 0
      initialization-fail-timeout: 1
  quartz:
    job-store-type: jdbc
    jdbc:
      initialize-schema: never
    properties:
      org.quartz.threadPool:threadPriority: 5
      org.quartz.jobStore.isClustered: true
      org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
      org.quartz.scheduler.instanceId: AUTO
      org.quartz.jobStore.tablePrefix: QRTZ_
      org.quartz.jobStore.acquireTriggersWithinLock: true
      org.quartz.scheduler.instanceName: DolphinScheduler
      org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
      org.quartz.jobStore.useProperties: false
      org.quartz.threadPool.makeThreadsDaemons: true
      org.quartz.threadPool.threadCount: 25
      org.quartz.jobStore.misfireThreshold: 60000
      org.quartz.scheduler.makeSchedulerThreadDaemon: true
      # org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
      org.quartz.jobStore.clusterCheckinInterval: 5000

registry:
  type: zookeeper
  zookeeper:
    namespace: dolphinscheduler
    # connect-string: localhost:2181
    connect-string: 10.255.158.70:2181
    retry-policy:
      base-sleep-time: 60ms
      max-sleep: 300ms
      max-retries: 5
    session-timeout: 30s
    connection-timeout: 9s
    block-until-connected: 600ms
    digest: ~

master:
  listen-port: 5678
  # master fetch command num
  fetch-command-num: 10
  # master prepare execute thread number to limit handle commands in parallel
  pre-exec-threads: 10
  # master execute thread number to limit process instances in parallel
  exec-threads: 100
  # master dispatch task number per batch
  dispatch-task-number: 3
  # master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight
  host-selector: lower_weight
  # master heartbeat interval, the unit is second
  heartbeat-interval: 10
  # master commit task retry times
  task-commit-retry-times: 5
  # master commit task interval, the unit is millisecond
  task-commit-interval: 1000
  state-wheel-interval: 5
  # master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2
  max-cpu-load-avg: -1
  # master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G
  reserved-memory: 0.3
  # failover interval, the unit is minute
  failover-interval: 10
  # kill yarn job when failover taskInstance, default true
  kill-yarn-job-when-task-failover: true

server:
  port: 5679

management:
  endpoints:
    web:
      exposure:
        include: '*'
  metrics:
    tags:
      application: ${spring.application.name}

metrics:
  enabled: true

# Override by profile
---
spring:
  config:
    activate:
      on-profile: mysql
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
  quartz:
    properties:
      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
```
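The `max-cpu-load-avg` and `reserved-memory` comments above describe an overload guard. The server takes on work only while the system load average is below the limit and the available memory is above the reserved amount. A value of `-1` means "CPU cores × 2". The hedged sketch below expresses that rule as a pure function. The names are hypothetical, and whether DolphinScheduler compares strictly or inclusively is an assumption.

```java
/** Hypothetical overload guard mirroring the max-cpu-load-avg / reserved-memory comments. */
final class OverloadGuard {

    private OverloadGuard() {}

    /** -1 (the default) means "number of CPU cores * 2". */
    static double effectiveMaxLoad(double maxCpuLoadAvg, int cpuCores) {
        return maxCpuLoadAvg == -1 ? cpuCores * 2.0 : maxCpuLoadAvg;
    }

    /** Work is accepted only while the load is below the limit and available memory exceeds the reserve. */
    static boolean canAcceptWork(double systemLoadAvg, double availableMemoryGb,
                                 double maxCpuLoadAvg, double reservedMemoryGb, int cpuCores) {
        return systemLoadAvg < effectiveMaxLoad(maxCpuLoadAvg, cpuCores)
                && availableMemoryGb > reservedMemoryGb;
    }
}
```

In a running JVM, the load could come from `ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage()`, which returns a negative value when the platform cannot report it. The core count could come from `Runtime.getRuntime().availableProcessors()`.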
dolphinscheduler-worker application.yaml

```yaml
spring:
  banner:
    charset: UTF-8
  application:
    name: worker-server
  jackson:
    time-zone: UTC
    date-format: "yyyy-MM-dd HH:mm:ss"
  datasource:
    #driver-class-name: org.postgresql.Driver
    #url: jdbc:postgresql://127.0.0.1:5432/dolphinscheduler
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
    username: root
    #password: root
    password:
    hikari:
      connection-test-query: select 1
      minimum-idle: 5
      auto-commit: true
      validation-timeout: 3000
      pool-name: DolphinScheduler
      maximum-pool-size: 50
      connection-timeout: 30000
      idle-timeout: 600000
      leak-detection-threshold: 0
      initialization-fail-timeout: 1

registry:
  type: zookeeper
  zookeeper:
    namespace: dolphinscheduler
    # connect-string: localhost:2181
    connect-string: 10.255.158.70:2181
    retry-policy:
      base-sleep-time: 60ms
      max-sleep: 300ms
      max-retries: 5
    session-timeout: 30s
    connection-timeout: 9s
    block-until-connected: 600ms
    digest: ~

worker:
  # worker listener port
  listen-port: 1234
  # worker execute thread number to limit task instances in parallel
  exec-threads: 100
  # worker heartbeat interval, the unit is second
  heartbeat-interval: 10
  # worker host weight to dispatch tasks, default value 100
  host-weight: 100
  # worker tenant auto create
  tenant-auto-create: true
  # worker max cpuload avg, only higher than the system cpu load average, worker server can be dispatched tasks. default value -1: the number of cpu cores * 2
  max-cpu-load-avg: -1
  # worker reserved memory, only lower than system available memory, worker server can be dispatched tasks. default value 0.3, the unit is G
  reserved-memory: 0.3
  # default worker groups separated by comma, like 'worker.groups=default,test'
  groups:
    - default
  # alert server listen host
  alert-listen-host: localhost
  alert-listen-port: 50052

server:
  port: 1235

management:
  endpoints:
    web:
      exposure:
        include: '*'
  metrics:
    tags:
      application: ${spring.application.name}

metrics:
  enabled: true
```
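`host-weight` ("worker host weight to dispatch tasks") lets the master favour some workers over others when it dispatches. The hedged illustration below uses weighted random selection, which picks each worker with probability weight / total weight. It is not DolphinScheduler's host-selector code, and `WeightedRandomSelector` and `Host` are hypothetical.

```java
import java.util.List;
import java.util.Random;

/** Hypothetical weighted-random worker selection driven by each worker's host-weight. */
final class WeightedRandomSelector {

    static final class Host {
        final String address;
        final int weight;

        Host(String address, int weight) {
            this.address = address;
            this.weight = weight;
        }
    }

    private final Random random;

    WeightedRandomSelector(Random random) {
        this.random = random;
    }

    /** Picks a host with probability weight / totalWeight; hosts with weight <= 0 are never picked. */
    Host select(List<Host> hosts) {
        int total = 0;
        for (Host h : hosts) {
            total += Math.max(h.weight, 0);
        }
        if (total == 0) {
            throw new IllegalArgumentException("no host with a positive weight");
        }
        int point = random.nextInt(total); // uniform in [0, total)
        for (Host h : hosts) {
            point -= Math.max(h.weight, 0);
            if (point < 0) {
                return h; // the point fell inside this host's share of the range
            }
        }
        throw new IllegalStateException("unreachable");
    }
}
```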
2.2 Main API Task Operation Interfaces
```java
public Map<String, Object> setScheduleState(User loginUser,
                                            long projectCode,
                                            Integer id,
                                            ReleaseState scheduleStatus) {
    Map<String, Object> result = new HashMap<>();
    Project project = projectMapper.queryByCode(projectCode);
    // check project auth
    boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result);
    if (!hasProjectAndPerm) {
        return result;
    }
    // check schedule exists
    Schedule scheduleObj = scheduleMapper.selectById(id);
    if (scheduleObj == null) {
        putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, id);
        return result;
    }
    // check schedule release state
    if (scheduleObj.getReleaseState() == scheduleStatus) {
        logger.info("schedule release is already {},needn't to change schedule id: {} from {} to {}",
                scheduleObj.getReleaseState(), scheduleObj.getId(), scheduleObj.getReleaseState(), scheduleStatus);
        putMsg(result, Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, scheduleStatus);
        return result;
    }
    ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(scheduleObj.getProcessDefinitionCode());
    if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
        putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(scheduleObj.getProcessDefinitionCode()));
        return result;
    }
    List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, scheduleObj.getProcessDefinitionCode());
    if (processTaskRelations.isEmpty()) {
        putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
        return result;
    }
    if (scheduleStatus == ReleaseState.ONLINE) {
        // check process definition release state
        if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
            logger.info("not release process definition id: {} , name : {}",
                    processDefinition.getId(), processDefinition.getName());
            putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName());
            return result;
        }
        // check sub process definition release state
        List<Long> subProcessDefineCodes = new ArrayList<>();
        processService.recurseFindSubProcess(processDefinition.getCode(), subProcessDefineCodes);
        if (!subProcessDefineCodes.isEmpty()) {
            List<ProcessDefinition> subProcessDefinitionList =
                    processDefinitionMapper.queryByCodes(subProcessDefineCodes);
            if (subProcessDefinitionList != null && !subProcessDefinitionList.isEmpty()) {
                for (ProcessDefinition subProcessDefinition : subProcessDefinitionList) {
                    /**
                     * if there is no online process, exit directly
                     */
                    if (subProcessDefinition.getReleaseState() != ReleaseState.ONLINE) {
                        logger.info("not release process definition id: {} , name : {}",
                                subProcessDefinition.getId(), subProcessDefinition.getName());
                        putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, String.valueOf(subProcessDefinition.getId()));
                        return result;
                    }
                }
            }
        }
    }
    // check master server exists
    List<Server> masterServers = monitorService.getServerListFromRegistry(true);
    if (masterServers.isEmpty()) {
        putMsg(result, Status.MASTER_NOT_EXISTS);
        return result;
    }
    // set status
    scheduleObj.setReleaseState(scheduleStatus);
    scheduleMapper.updateById(scheduleObj);
    try {
        switch (scheduleStatus) {
            case ONLINE:
                logger.info("Call master client set schedule online, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers);
                setSchedule(project.getId(), scheduleObj);
                break;
            case OFFLINE:
                logger.info("Call master client set schedule offline, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers);
                deleteSchedule(project.getId(), id);
                break;
            default:
                putMsg(result, Status.SCHEDULE_STATUS_UNKNOWN, scheduleStatus.toString());
                return result;
        }
    } catch (Exception e) {
        result.put(Constants.MSG, scheduleStatus == ReleaseState.ONLINE ? "set online failure" : "set offline failure");
        throw new ServiceException(result.get(Constants.MSG).toString(), e);
    }
    putMsg(result, Status.SUCCESS);
    return result;
}
```
```java
public void setSchedule(int projectId, Schedule schedule) {
    logger.info("set schedule, project id: {}, scheduleId: {}", projectId, schedule.getId());
    quartzExecutor.addJob(ProcessScheduleJob.class, projectId, schedule);
}
```
```java
public void addJob(Class<? extends Job> clazz, int projectId, final Schedule schedule) {
    String jobName = this.buildJobName(schedule.getId());
    String jobGroupName = this.buildJobGroupName(projectId);
    Map<String, Object> jobDataMap = this.buildDataMap(projectId, schedule);
    String cronExpression = schedule.getCrontab();
    String timezoneId = schedule.getTimezoneId();
    /**
     * transform from server default timezone to schedule timezone
     * e.g. server default timezone is `UTC`
     * user set a schedule with startTime `2022-04-28 10:00:00`, timezone is `Asia/Shanghai`,
     * api skip to transform it and save into databases directly, startTime `2022-04-28 10:00:00`, timezone is `UTC`, which actually added 8 hours,
     * so when add job to quartz, it should recover by transform timezone
     */
    Date startDate = DateUtils.transformTimezoneDate(schedule.getStartTime(), timezoneId);
    Date endDate = DateUtils.transformTimezoneDate(schedule.getEndTime(), timezoneId);
    lock.writeLock().lock();
    try {
        JobKey jobKey = new JobKey(jobName, jobGroupName);
        JobDetail jobDetail;
        // add a task (if this task already exists, return this task directly)
        if (scheduler.checkExists(jobKey)) {
            jobDetail = scheduler.getJobDetail(jobKey);
            jobDetail.getJobDataMap().putAll(jobDataMap);
        } else {
            jobDetail = newJob(clazz).withIdentity(jobKey).build();
            jobDetail.getJobDataMap().putAll(jobDataMap);
            scheduler.addJob(jobDetail, false, true);
            logger.info("Add job, job name: {}, group name: {}",
                    jobName, jobGroupName);
        }
        TriggerKey triggerKey = new TriggerKey(jobName, jobGroupName);
        /*
         * Instructs the Scheduler that upon a mis-fire
         * situation, the CronTrigger wants to have its
         * next-fire-time updated to the next time in the schedule after the
         * current time (taking into account any associated Calendar),
         * but it does not want to be fired now.
         */
        CronTrigger cronTrigger = newTrigger()
                .withIdentity(triggerKey)
                .startAt(startDate)
                .endAt(endDate)
                .withSchedule(
                        cronSchedule(cronExpression)
                                .withMisfireHandlingInstructionDoNothing()
                                .inTimeZone(DateUtils.getTimezone(timezoneId))
                )
                .forJob(jobDetail).build();
        if (scheduler.checkExists(triggerKey)) {
            // updateProcessInstance scheduler trigger when scheduler cycle changes
            CronTrigger oldCronTrigger = (CronTrigger) scheduler.getTrigger(triggerKey);
            String oldCronExpression = oldCronTrigger.getCronExpression();
            if (!StringUtils.equalsIgnoreCase(cronExpression, oldCronExpression)) {
                // reschedule job trigger
                scheduler.rescheduleJob(triggerKey, cronTrigger);
                logger.info("reschedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}",
                        jobName, jobGroupName, cronExpression, startDate, endDate);
            }
        } else {
            scheduler.scheduleJob(cronTrigger);
            logger.info("schedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}",
                    jobName, jobGroupName, cronExpression, startDate, endDate);
        }
    } catch (Exception e) {
        throw new ServiceException("add job failed", e);
    } finally {
        lock.writeLock().unlock();
    }
}
```
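The comment in `addJob` describes a wall-clock fix-up. The stored start time reads 2022-04-28 10:00:00 in the server's UTC zone, but the user meant 10:00 in Asia/Shanghai. The hedged `java.time` sketch below keeps that wall-clock reading and reinterprets it in the schedule's zone. The class is hypothetical and is not the actual code of `DateUtils.transformTimezoneDate`.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;

/** Hypothetical illustration of reinterpreting a stored wall-clock time in the schedule's timezone. */
final class TimezoneTransform {

    private TimezoneTransform() {}

    /** Reads the wall-clock time of {@code stored} in serverZone and reinterprets it in scheduleZone. */
    static Date reinterpret(Date stored, ZoneId serverZone, ZoneId scheduleZone) {
        LocalDateTime wallClock = LocalDateTime.ofInstant(stored.toInstant(), serverZone);
        return Date.from(wallClock.atZone(scheduleZone).toInstant());
    }
}
```

For the example in the comment, the result is 2022-04-28T02:00:00Z, which is 10:00 in Asia/Shanghai. This removes the 8 extra hours.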
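2.3 Quartz Architecture and Execution Flow
2.3.1 Concepts and Architecture
- SchedulerFactory: the scheduler factory, mainly responsible for managing schedulers;
- Scheduler: the task scheduler, mainly responsible for scheduling tasks and exposing the interfaces for operating on them;
- Job: the task interface; its implementation classes contain the concrete business code;
- JobDetail: defines an instance of a job;
- Trigger: the task trigger, which holds the timing policy for a Job, e.g. how often it runs, when it runs and at what frequency;
- JobBuilder: used to define/build JobDetail instances;
- TriggerBuilder: used to define/build Trigger instances;
- Calendar: an extension of Trigger that can exclude or include specific points in time (e.g. exclude public holidays);
- JobStore: stores jobs and the state kept during scheduling.

A Scheduler's lifecycle begins when the SchedulerFactory creates it and ends when the Scheduler's shutdown() method is called.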
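2.3.2 Initialization and Execution Flow


DolphinScheduler's business class here is ProcessScheduleJob. Its main function is to write data into the command table based on the schedule information.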
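In other words, Quartz does not run the workflow itself. Each trigger fire only records a command, and masters pick those commands up later. The minimal in-memory sketch below shows that hand-off. The class, the simplified `Command` fields and the `"SCHEDULER"` type string are all assumptions standing in for ProcessScheduleJob and the t_ds_command table.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for ProcessScheduleJob plus the command table it writes to. */
final class ScheduleToCommand {

    /** Simplified command row; the field names are assumptions. */
    static final class Command {
        final String commandType;
        final long processDefinitionCode;
        final Instant scheduleTime;

        Command(String commandType, long processDefinitionCode, Instant scheduleTime) {
            this.commandType = commandType;
            this.processDefinitionCode = processDefinitionCode;
            this.scheduleTime = scheduleTime;
        }
    }

    private final List<Command> commandTable = new ArrayList<>(); // stands in for the database table

    /** Called on every trigger fire: records a command instead of running the workflow. */
    synchronized void onTriggerFired(long processDefinitionCode, Instant scheduledFireTime) {
        commandTable.add(new Command("SCHEDULER", processDefinitionCode, scheduledFireTime));
    }

    /** A snapshot of what a master would read when scanning for new commands. */
    synchronized List<Command> pendingCommands() {
        return new ArrayList<>(commandTable);
    }
}
```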
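2.3.3 Cluster Operation
- When Quartz is deployed as a cluster, jobs cannot be stored in memory, i.e. RAMJobStore cannot be used.
- In a Quartz cluster, the scan for triggers that need to be scheduled is protected by the database lock TRIGGER_ACCESS, which ensures that only one Quartz instance can perform the scan at a time. The code is as follows: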
```java
public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
        throws JobPersistenceException {
    String lockName;
    if (isAcquireTriggersWithinLock() || maxCount > 1) {
        lockName = LOCK_TRIGGER_ACCESS;
    } else {
        lockName = null;
    }
    return executeInNonManagedTXLock(lockName,
            new TransactionCallback<List<OperableTrigger>>() {
                public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
                    return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
                }
            },
            new TransactionValidator<List<OperableTrigger>>() {
                public Boolean validate(Connection conn, List<OperableTrigger> result) throws JobPersistenceException {
                    try {
                        List<FiredTriggerRecord> acquired = getDelegate().selectInstancesFiredTriggerRecords(conn, getInstanceId());
                        Set<String> fireInstanceIds = new HashSet<String>();
                        for (FiredTriggerRecord ft : acquired) {
                            fireInstanceIds.add(ft.getFireInstanceId());
                        }
                        for (OperableTrigger tr : result) {
                            if (fireInstanceIds.contains(tr.getFireInstanceId())) {
                                return true;
                            }
                        }
                        return false;
                    } catch (SQLException e) {
                        throw new JobPersistenceException("error validating trigger acquisition", e);
                    }
                }
            });
}
```
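Every Quartz node shares the same tables, which is why the lock matters. While one node holds TRIGGER_ACCESS and moves due triggers out of the WAITING state, no other node can pick the same triggers. In Quartz's JDBC job store, this lock is a row in the `QRTZ_LOCKS` table, whose prefix comes from `org.quartz.jobStore.tablePrefix`. The in-memory sketch below replaces that row lock with a `ReentrantLock` purely for illustration. `TriggerStore` is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical in-memory model of clustered trigger acquisition; the lock stands in for TRIGGER_ACCESS. */
final class TriggerStore {

    private static final class Trigger {
        final String name;
        final long nextFireTime;

        Trigger(String name, long nextFireTime) {
            this.name = name;
            this.nextFireTime = nextFireTime;
        }
    }

    private final ReentrantLock triggerAccess = new ReentrantLock();
    // Triggers in WAITING state, earliest next fire time first.
    private final PriorityQueue<Trigger> waiting =
            new PriorityQueue<>(Comparator.comparingLong((Trigger t) -> t.nextFireTime));

    void add(String name, long nextFireTime) {
        triggerAccess.lock();
        try {
            waiting.add(new Trigger(name, nextFireTime));
        } finally {
            triggerAccess.unlock();
        }
    }

    /** Moves up to maxCount triggers due no later than noLaterThan from WAITING to ACQUIRED. */
    List<String> acquireNextTriggers(long noLaterThan, int maxCount) {
        triggerAccess.lock(); // only one "scheduler instance" scans at a time
        try {
            List<String> acquired = new ArrayList<>();
            while (acquired.size() < maxCount && !waiting.isEmpty()
                    && waiting.peek().nextFireTime <= noLaterThan) {
                acquired.add(waiting.poll().name); // removed from WAITING, so nobody else can take it
            }
            return acquired;
        } finally {
            triggerAccess.unlock();
        }
    }
}
```

Two threads draining the same store never receive the same trigger. Each trigger is removed from the WAITING queue while the lock is held.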
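- When recovering failed instances in a cluster, note that each failed instance is recovered from its own records. This works because the database stores the instanceId of every scheduler instance. The code is as follows: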
```java
protected void clusterRecover(Connection conn, List<SchedulerStateRecord> failedInstances)
        throws JobPersistenceException {
    if (failedInstances.size() > 0) {
        long recoverIds = System.currentTimeMillis();
        logWarnIfNonZero(failedInstances.size(),
                "ClusterManager: detected " + failedInstances.size() + " failed or restarted instances.");
        try {
            for (SchedulerStateRecord rec : failedInstances) {
                getLog().info("ClusterManager: Scanning for instance \""
                        + rec.getSchedulerInstanceId() + "\"'s failed in-progress jobs.");
                List<FiredTriggerRecord> firedTriggerRecs =
                        getDelegate().selectInstancesFiredTriggerRecords(conn, rec.getSchedulerInstanceId());
                int acquiredCount = 0;
                int recoveredCount = 0;
                int otherCount = 0;
                Set<TriggerKey> triggerKeys = new HashSet<TriggerKey>();
                for (FiredTriggerRecord ftRec : firedTriggerRecs) {
                    TriggerKey tKey = ftRec.getTriggerKey();
                    JobKey jKey = ftRec.getJobKey();
                    triggerKeys.add(tKey);
                    // release blocked triggers..
                    if (ftRec.getFireInstanceState().equals(STATE_BLOCKED)) {
                        getDelegate().updateTriggerStatesForJobFromOtherState(conn, jKey, STATE_WAITING, STATE_BLOCKED);
                    } else if (ftRec.getFireInstanceState().equals(STATE_PAUSED_BLOCKED)) {
                        getDelegate().updateTriggerStatesForJobFromOtherState(conn, jKey, STATE_PAUSED, STATE_PAUSED_BLOCKED);
                    }
                    // release acquired triggers..
                    if (ftRec.getFireInstanceState().equals(STATE_ACQUIRED)) {
                        getDelegate().updateTriggerStateFromOtherState(conn, tKey, STATE_WAITING, STATE_ACQUIRED);
                        acquiredCount++;
                    } else if (ftRec.isJobRequestsRecovery()) {
                        // handle jobs marked for recovery that were not fully executed..
                        if (jobExists(conn, jKey)) {
                            @SuppressWarnings("deprecation")
                            SimpleTriggerImpl rcvryTrig = new SimpleTriggerImpl(
                                    "recover_" + rec.getSchedulerInstanceId() + "_" + String.valueOf(recoverIds++),
                                    Scheduler.DEFAULT_RECOVERY_GROUP,
                                    new Date(ftRec.getScheduleTimestamp()));
                            rcvryTrig.setJobName(jKey.getName());
                            rcvryTrig.setJobGroup(jKey.getGroup());
                            rcvryTrig.setMisfireInstruction(SimpleTrigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY);
                            rcvryTrig.setPriority(ftRec.getPriority());
                            JobDataMap jd = getDelegate().selectTriggerJobDataMap(conn, tKey.getName(), tKey.getGroup());
                            jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_NAME, tKey.getName());
                            jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_GROUP, tKey.getGroup());
                            jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_FIRETIME_IN_MILLISECONDS, String.valueOf(ftRec.getFireTimestamp()));
                            jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_SCHEDULED_FIRETIME_IN_MILLISECONDS, String.valueOf(ftRec.getScheduleTimestamp()));
                            rcvryTrig.setJobDataMap(jd);
                            rcvryTrig.computeFirstFireTime(null);
                            storeTrigger(conn, rcvryTrig, null, false, STATE_WAITING, false, true);
                            recoveredCount++;
                        } else {
                            getLog().warn("ClusterManager: failed job '" + jKey
                                    + "' no longer exists, cannot schedule recovery.");
                            otherCount++;
                        }
                    } else {
                        otherCount++;
                    }
                    // free up stateful job's triggers
                    if (ftRec.isJobDisallowsConcurrentExecution()) {
                        getDelegate().updateTriggerStatesForJobFromOtherState(conn, jKey, STATE_WAITING, STATE_BLOCKED);
                        getDelegate().updateTriggerStatesForJobFromOtherState(conn, jKey, STATE_PAUSED, STATE_PAUSED_BLOCKED);
                    }
                }
                getDelegate().deleteFiredTriggers(conn, rec.getSchedulerInstanceId());
                // Check if any of the fired triggers we just deleted were the last fired trigger
                // records of a COMPLETE trigger.
                int completeCount = 0;
                for (TriggerKey triggerKey : triggerKeys) {
                    if (getDelegate().selectTriggerState(conn, triggerKey).equals(STATE_COMPLETE)) {
                        List<FiredTriggerRecord> firedTriggers =
                                getDelegate().selectFiredTriggerRecords(conn, triggerKey.getName(), triggerKey.getGroup());
                        if (firedTriggers.isEmpty()) {
                            if (removeTrigger(conn, triggerKey)) {
                                completeCount++;
                            }
                        }
                    }
                }
                logWarnIfNonZero(acquiredCount, "ClusterManager: ......Freed " + acquiredCount + " acquired trigger(s).");
                logWarnIfNonZero(completeCount, "ClusterManager: ......Deleted " + completeCount + " complete triggers(s).");
                logWarnIfNonZero(recoveredCount, "ClusterManager: ......Scheduled " + recoveredCount + " recoverable job(s) for recovery.");
                logWarnIfNonZero(otherCount, "ClusterManager: ......Cleaned-up " + otherCount + " other failed job(s).");
                if (!rec.getSchedulerInstanceId().equals(getInstanceId())) {
                    getDelegate().deleteSchedulerState(conn, rec.getSchedulerInstanceId());
                }
            }
        } catch (Throwable e) {
            throw new JobPersistenceException("Failure recovering jobs: " + e.getMessage(), e);
        }
    }
}
```
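Stripped of its JDBC details, the loop above makes one decision per fired-trigger record of a failed instance. The simplified model below shows those three branches. It uses hypothetical names and no database. It also omits the blocked-trigger release, the job-existence check and the COMPLETE-trigger cleanup.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of the per-record decisions in clusterRecover. */
final class ClusterRecoverModel {

    enum FiredState { ACQUIRED, EXECUTING }

    static final class FiredRecord {
        final String triggerName;
        final String jobName;
        final FiredState state;
        final boolean requestsRecovery;

        FiredRecord(String triggerName, String jobName, FiredState state, boolean requestsRecovery) {
            this.triggerName = triggerName;
            this.jobName = jobName;
            this.state = state;
            this.requestsRecovery = requestsRecovery;
        }
    }

    static final class Outcome {
        final List<String> backToWaiting = new ArrayList<>();    // counted as acquiredCount above
        final List<String> recoveryTriggers = new ArrayList<>(); // counted as recoveredCount above
        final List<String> recoveredJobs = new ArrayList<>();
        int otherCount;
    }

    /** Processes the fired-trigger records left behind by one failed scheduler instance. */
    static Outcome recover(String failedInstanceId, List<FiredRecord> records, long recoverIds) {
        Outcome out = new Outcome();
        for (FiredRecord rec : records) {
            if (rec.state == FiredState.ACQUIRED) {
                // Acquired but never fired: simply make the trigger WAITING again.
                out.backToWaiting.add(rec.triggerName);
            } else if (rec.requestsRecovery) {
                // Was executing and asked for recovery: schedule a one-shot recovery trigger.
                out.recoveryTriggers.add("recover_" + failedInstanceId + "_" + recoverIds++);
                out.recoveredJobs.add(rec.jobName);
            } else {
                out.otherCount++; // nothing to re-run, only clean-up
            }
        }
        // The real code then deletes this instance's fired-trigger records (not modelled here).
        return out;
    }
}
```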
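2.4 Master Startup and Execution Flow
2.4.1 Concepts and Execution Logic
- Scheduler (the task scheduling container, usually a StdScheduler instance);
- ProcessScheduleJob (the business class that implements the Job interface of the Quartz scheduling framework and is dedicated to generating rows in DolphinScheduler's business table t_ds_command);

- NettyRemotingServer (the Netty server, containing the Netty `serverBootstrap` object and the server-side business handler `serverHandler`);
- NettyServerHandler (the Netty server-side business handler class, containing the various processors and the thread pools that run them);
- TaskPluginManager (the task plugin manager. Different task types are managed as plugins. At application startup, the factories that implement the TaskChannelFactory interface are loaded via @AutoService and their information is written to the database. The factory objects then load the various TaskChannel implementations into a cache);
- MasterRegistryClient (the master's ZooKeeper client, which encapsulates all of the master's ZooKeeper operations, such as registering, querying and deleting);
- MasterSchedulerService (the scanning service, containing the business execution threads and the Netty client used to reach workers. It is responsible for task scheduling. Slots ensure that tasks are not scheduled twice in cluster mode, and the underlying implementation is a ZooKeeper distributed lock);
- WorkflowExecuteThread (the thread that does the actual business processing. It fetches commands by slot and checks whether the slot has changed before execution; if the slot has changed, the command is not executed. Its key job is to build the task parameters, definitions, priorities and so on, and then put the tasks on a queue for the queue-processing thread to consume);
- CommonTaskProcessor (the common task processor, which implements the ITaskProcessor interface. Processors are split by business type into common, dependent, sub-process, blocking and conditions tasks. They handle task submission, running, dispatching, killing and so on, and are loaded via @AutoService);
- TaskPriorityQueueImpl (the task queue, responsible for storing and controlling queued tasks);
- TaskPriorityQueueConsumer (the task-queue consumer thread, responsible for dispatching tasks to workers for execution according to the load-balancing strategy);
- ServerNodeManager (the node information controller, responsible for updating node registration information and for slot changes; the underlying implementation uses a ZooKeeper distributed lock);
- EventExecuteService (the event processing thread. Using the cached workflow execution threads, it handles the events that each task registers in its thread's event queue during processing);
- FailoverExecuteThread (the failover thread, covering both master and worker failover);
- MasterRegistryDataListener (a failure listener hosted in Curator, the ZooKeeper client framework. It handles the addition and removal of the worker and master nodes registered in ZooKeeper).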
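Among the components above, TaskPriorityQueueImpl buffers tasks built by the workflow threads, and TaskPriorityQueueConsumer drains it and dispatches to workers. The sketch below models that producer/consumer pair with a `PriorityBlockingQueue`. The ordering it uses is an assumption made for illustration: process priority first, then task priority, then task id. The real comparator may use different fields, and `TaskQueueModel` is a hypothetical class.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

/** Hypothetical model of a task priority queue drained by a dispatcher thread. */
final class TaskQueueModel {

    static final class QueuedTask {
        final int processPriority; // smaller value = more urgent (assumption)
        final int taskPriority;    // smaller value = more urgent (assumption)
        final int taskId;

        QueuedTask(int processPriority, int taskPriority, int taskId) {
            this.processPriority = processPriority;
            this.taskPriority = taskPriority;
            this.taskId = taskId;
        }
    }

    // Assumed ordering: process priority, then task priority, then task id as a tie-breaker.
    static final Comparator<QueuedTask> ORDER = Comparator
            .comparingInt((QueuedTask t) -> t.processPriority)
            .thenComparingInt(t -> t.taskPriority)
            .thenComparingInt(t -> t.taskId);

    private final PriorityBlockingQueue<QueuedTask> queue = new PriorityBlockingQueue<>(16, ORDER);

    /** Producer side: what a workflow thread would do after building the task. */
    void submit(QueuedTask task) {
        queue.put(task);
    }

    /** Consumer side: the most urgent task, or null when the queue is empty. */
    QueuedTask next() {
        return queue.poll();
    }
}
```

A real consumer thread would block on `take()` rather than poll. This sketch uses `poll()` so that it can be exercised without threads.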
```java
private void failoverMasterWithLock(String masterHost) {
    String failoverPath = getFailoverLockPath(NodeType.MASTER, masterHost);
    try {
        registryClient.getLock(failoverPath);
        this.failoverMaster(masterHost);
    } catch (Exception e) {
        LOGGER.error("{} server failover failed, host:{}", NodeType.MASTER, masterHost, e);
    } finally {
        registryClient.releaseLock(failoverPath);
    }
}

/**
 * failover master
 * <p>
 * failover process instance and associated task instance
 *
 * @param masterHost master host
 */
private void failoverMaster(String masterHost) {
    if (StringUtils.isEmpty(masterHost)) {
        return;
    }
    Date serverStartupTime = getServerStartupTime(NodeType.MASTER, masterHost);
    long startTime = System.currentTimeMillis();
    List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost);
    LOGGER.info("start master[{}] failover, process list size:{}", masterHost, needFailoverProcessInstanceList.size());
    List<Server> workerServers = registryClient.getServerList(NodeType.WORKER);
    for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
        if (Constants.NULL.equals(processInstance.getHost())) {
            continue;
        }
        List<TaskInstance> validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
        for (TaskInstance taskInstance : validTaskInstanceList) {
            LOGGER.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
            failoverTaskInstance(processInstance, taskInstance, workerServers);
        }
        if (serverStartupTime != null && processInstance.getRestartTime() != null
                && processInstance.getRestartTime().after(serverStartupTime)) {
            continue;
        }
        LOGGER.info("failover process instance id: {}", processInstance.getId());
        // updateProcessInstance host is null and insert into command
        processInstance.setHost(Constants.NULL);
        processService.processNeedFailoverProcessInstances(processInstance);
    }
    LOGGER.info("master[{}] failover end, useTime:{}ms", masterHost, System.currentTimeMillis() - startTime);
}
```
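failoverMasterWithLock first takes a registry lock on a per-host failover path, so only one surviving master fails over a given host. That lock is outside the model below. The model captures only the per-instance decision in failoverMaster. It skips instances whose host is already cleared and instances restarted after the server's recorded startup time. Every other instance has its host cleared and gets a failover command. The names are hypothetical, and the string "NULL" stands in for `Constants.NULL`.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the per-instance decisions in failoverMaster (no registry, no database). */
final class MasterFailoverModel {

    static final String NULL_HOST = "NULL"; // stand-in for Constants.NULL

    static final class ProcessInstance {
        final int id;
        String host;
        final Instant restartTime; // may be null

        ProcessInstance(int id, String host, Instant restartTime) {
            this.id = id;
            this.host = host;
            this.restartTime = restartTime;
        }
    }

    /** Returns the ids of process instances for which a failover command would be inserted. */
    static List<Integer> failover(List<ProcessInstance> ofDeadMaster, Instant serverStartupTime) {
        List<Integer> failedOver = new ArrayList<>();
        for (ProcessInstance pi : ofDeadMaster) {
            if (NULL_HOST.equals(pi.host)) {
                continue; // host already cleared, skipped as in the code above
            }
            // (the code above fails over this instance's task instances at this point)
            if (serverStartupTime != null && pi.restartTime != null
                    && pi.restartTime.isAfter(serverStartupTime)) {
                continue; // restarted after the recorded startup time: left alone
            }
            pi.host = NULL_HOST;   // release ownership of the instance
            failedOver.add(pi.id); // stands in for processNeedFailoverProcessInstances
        }
        return failedOver;
    }
}
```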
2.4.2 Cluster and Slots
```java
private void updateMasterNodes() {
    MASTER_SLOT = 0;
    MASTER_SIZE = 0;
    this.masterNodes.clear();
    String nodeLock = Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_MASTERS;
    try {
        registryClient.getLock(nodeLock);
        Collection<String> currentNodes = registryClient.getMasterNodesDirectly();
        List<Server> masterNodes = registryClient.getServerList(NodeType.MASTER);
        syncMasterNodes(currentNodes, masterNodes);
    } catch (Exception e) {
        logger.error("update master nodes error", e);
    } finally {
        registryClient.releaseLock(nodeLock);
    }
}

/**
 * sync master nodes
 *
 * @param nodes master nodes
 */
private void syncMasterNodes(Collection<String> nodes, List<Server> masterNodes) {
    masterLock.lock();
    try {
        String addr = NetUtils.getAddr(NetUtils.getHost(), masterConfig.getListenPort());
        this.masterNodes.addAll(nodes);
        this.masterPriorityQueue.clear();
        this.masterPriorityQueue.putList(masterNodes);
        int index = masterPriorityQueue.getIndex(addr);
        if (index >= 0) {
            MASTER_SIZE = nodes.size();
            MASTER_SLOT = index;
        } else {
            logger.warn("current addr:{} is not in active master list", addr);
        }
        logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE, MASTER_SLOT, addr);
    } finally {
        masterLock.unlock();
    }
}
```
/**
* 1. get command by slot
* 2. do not handle command if slot is empty
*/
private void scheduleProcess() throws Exception {
List<Command> commands = findCommands();
if (CollectionUtils.isEmpty(commands)) {
//indicate that no command ,sleep for 1s
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
return;
}
List<ProcessInstance> processInstances = command2ProcessInstance(commands);
if (CollectionUtils.isEmpty(processInstances)) {
return;
}
for (ProcessInstance processInstance : processInstances) {
if (processInstance == null) {
continue;
}
WorkflowExecuteThread workflowExecuteThread = new WorkflowExecuteThread(
processInstance
, processService
, nettyExecutorManager
, processAlertManager
, masterConfig
, stateWheelExecuteThread);
this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteThread);
if (processInstance.getTimeout() > 0) {
stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
}
workflowExecuteThreadPool.startWorkflow(workflowExecuteThread);
}
}
private List<Command> findCommands() {
int pageNumber = 0;
int pageSize = masterConfig.getFetchCommandNum();
List<Command> result = new ArrayList<>();
if (Stopper.isRunning()) {
int thisMasterSlot = ServerNodeManager.getSlot();
int masterCount = ServerNodeManager.getMasterSize();
if (masterCount > 0) {
result = processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
}
}
return result;
}
@Override
public List<Command> findCommandPageBySlot(int pageSize, int pageNumber, int masterCount, int thisMasterSlot) {
if (masterCount <= 0) {
return Lists.newArrayList();
}
return commandMapper.queryCommandPageBySlot(pageSize, pageNumber * pageSize, masterCount, thisMasterSlot);
}
<select id="queryCommandPageBySlot" resultType="org.apache.dolphinscheduler.dao.entity.Command">
select *
from t_ds_command
where id % #{masterCount} = #{thisMasterSlot}
order by process_instance_priority, id asc
limit #{limit} offset #{offset}
</select>
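The `id % masterCount = thisMasterSlot` filter is what stops several Masters from picking up the same command. Below is a minimal stand-alone sketch of that partitioning. It assumes each Master already knows its slot (its index in the shared, ordered master list) and the total master count. `SlotPartition`, `owns` and `commandsFor` are hypothetical names, not DolphinScheduler APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the slot-based command sharding done by queryCommandPageBySlot.
final class SlotPartition {

    private SlotPartition() {
    }

    // Same predicate as the SQL filter: a command belongs to the master whose slot equals id % masterCount.
    static boolean owns(long commandId, int masterCount, int slot) {
        return masterCount > 0 && commandId % masterCount == slot;
    }

    // Returns, in input order, the command ids that the master with the given slot would fetch.
    static List<Long> commandsFor(List<Long> commandIds, int masterCount, int slot) {
        List<Long> owned = new ArrayList<>();
        for (long id : commandIds) {
            if (owns(id, masterCount, slot)) {
                owned.add(id);
            }
        }
        return owned;
    }
}
```

Take command ids 1 to 7 and three Masters. Slot 0 gets [3, 6], slot 1 gets [1, 4, 7] and slot 2 gets [2, 5], so every command has exactly one owner and no lock on the command table is needed. The cost is that the assignment shifts whenever the master count changes. This is why slotCheck re-evaluates the same predicate before a command is handled.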
## Slot check
private List<ProcessInstance> command2ProcessInstance(List<Command> commands) {
List<ProcessInstance> processInstances = Collections.synchronizedList(new ArrayList<>(commands.size()));
CountDownLatch latch = new CountDownLatch(commands.size());
for (final Command command : commands) {
masterPrepareExecService.execute(() -> {
try {
// slot check again
SlotCheckState slotCheckState = slotCheck(command);
if (slotCheckState.equals(SlotCheckState.CHANGE) || slotCheckState.equals(SlotCheckState.INJECT)) {
logger.info("handle command {} skip, slot check state: {}", command.getId(), slotCheckState);
return;
}
ProcessInstance processInstance = processService.handleCommand(logger,
getLocalAddress(),
command);
if (processInstance != null) {
processInstances.add(processInstance);
logger.info("handle command {} end, create process instance {}", command.getId(), processInstance.getId());
}
} catch (Exception e) {
logger.error("handle command error ", e);
processService.moveToErrorCommand(command, e.toString());
} finally {
latch.countDown();
}
});
}
try {
// make sure to finish handling command each time before next scan
latch.await();
} catch (InterruptedException e) {
logger.error("countDownLatch await error ", e);
}
return processInstances;
}
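command2ProcessInstance fans the commands out to a thread pool. It then waits on a CountDownLatch so that the whole batch is handled before the next scan. The sketch below reduces that pattern to its essentials. A plain String-to-String handler stands in for processService.handleCommand, and returning null stands in for a skipped command. `BatchHandler` and `handleAll` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;

// Hypothetical sketch of the fan-out / wait-for-all pattern used by command2ProcessInstance.
final class BatchHandler {

    private BatchHandler() {
    }

    // Handles every command on the pool and returns the non-null results once the whole batch is done.
    static List<String> handleAll(List<String> commands, ExecutorService pool,
                                  Function<String, String> handler) {
        // Synchronized list because several pool threads add results concurrently.
        List<String> results = Collections.synchronizedList(new ArrayList<>(commands.size()));
        CountDownLatch latch = new CountDownLatch(commands.size());
        for (String command : commands) {
            pool.execute(() -> {
                try {
                    String result = handler.apply(command);
                    if (result != null) {  // null stands for "skipped", e.g. a failed slot check
                        results.add(result);
                    }
                } catch (RuntimeException e) {
                    // The original logs the error and moves the command to the error table instead.
                } finally {
                    latch.countDown();  // always count down, otherwise await() could block forever
                }
            });
        }
        try {
            latch.await();  // finish the whole batch before the next scan
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return results;
    }
}
```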
private SlotCheckState slotCheck(Command command) {
int slot = ServerNodeManager.getSlot();
int masterSize = ServerNodeManager.getMasterSize();
SlotCheckState state;
if (masterSize <= 0) {
state = SlotCheckState.CHANGE;
} else if (command.getId() % masterSize == slot) {
state = SlotCheckState.PASS;
} else {
state = SlotCheckState.INJECT;
}
return state;
}
2.4.3 Code execution flow

2.5 Worker startup and execution flow
2.5.1 Concepts and execution logic
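- NettyRemotingServer (the Netty server embedded in the Worker);
- WorkerRegistryClient (ZooKeeper client wrapping the Worker's registry operations: registration, lookup, deletion, etc.);
- TaskPluginManager (task plugin manager; encapsulates plugin loading and the abstraction over the actual task execution logic);
- WorkerManagerThread (task worker thread generator; consumes the task information that the Netty processor pushes into the queue, creates task execution threads and submits them to a thread pool);
- TaskExecuteProcessor (Netty task execution processor; builds the task information the Master dispatched to the Worker and pushes it into the queue);
- TaskExecuteThread (task execution thread);
- TaskCallbackService (task callback service; communicates with the Netty client held by the Master);
- AbstractTask (abstract class for the actual task logic; subclasses such as SqlTask and DataXTask implement the concrete execution);
- RetryReportTaskStatusThread (not covered here).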
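The split between TaskExecuteProcessor, WorkerManagerThread and TaskExecuteThread is a producer/consumer hand-off. Below is a minimal sketch that assumes a BlockingQueue sits between the Netty processor and the manager thread, with a fixed thread pool for execution. `WorkerPipeline` and its methods are hypothetical stand-ins, not the real classes.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: producer (Netty processor) -> queue -> manager thread -> execution pool.
final class WorkerPipeline {

    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final ExecutorService executePool;
    private final Thread managerThread;
    private volatile boolean running = true;

    WorkerPipeline(int execThreads) {
        this.executePool = Executors.newFixedThreadPool(execThreads);
        // Plays the role of WorkerManagerThread: takes queued tasks and hands them to the pool.
        this.managerThread = new Thread(() -> {
            while (running || !taskQueue.isEmpty()) {
                try {
                    Runnable task = taskQueue.poll(100, TimeUnit.MILLISECONDS);
                    if (task != null) {
                        executePool.submit(task);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, "worker-manager");
        this.managerThread.start();
    }

    // Plays the role of TaskExecuteProcessor: runs on a Netty I/O thread, so it only enqueues.
    void offer(Runnable task) {
        taskQueue.offer(task);
    }

    // Lets the manager drain the queue, then waits for the submitted tasks to finish.
    boolean shutdown() {
        running = false;
        try {
            managerThread.join();
            executePool.shutdown();
            return executePool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            executePool.shutdownNow();
            return false;
        }
    }
}
```

Keeping the Netty handler down to an enqueue means a slow task can never block network I/O. The queue also gives a single place to apply back-pressure if needed.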
2.5.2 Code execution flow

2.6 RPC interaction

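2.6.1 Master and Worker interaction
On the Master side, startup initializes a NettyRemotingServer, registers the request processor for each command type with the Netty handler, and then starts the server: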
@PostConstruct
public void run() throws SchedulerException {
// init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(masterConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);
// logger server
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.start();
// install task plugin
this.taskPluginManager.installPlugin();
// self tolerant
this.masterRegistryClient.init();
this.masterRegistryClient.start();
this.masterRegistryClient.setRegistryStoppable(this);
this.masterSchedulerService.init();
this.masterSchedulerService.start();
this.eventExecuteService.start();
this.failoverExecuteThread.start();
this.scheduler.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (Stopper.isRunning()) {
close("shutdownHook");
}
}));
}
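Each registerProcessor call above binds a CommandType to the processor that handles commands of that type. Incoming commands are then routed by looking up their type. Below is a minimal sketch of such a dispatch table. The enum and the String-based processor are simplified, hypothetical stand-ins for CommandType and the real processor interface.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, trimmed-down command types; the real CommandType enum has many more values.
enum SketchCommandType { TASK_EXECUTE_RESPONSE, TASK_KILL_RESPONSE, GET_LOG_BYTES_REQUEST }

// Hypothetical sketch of a CommandType -> processor dispatch table.
final class ProcessorRegistry {

    // Filled once at startup, before the server starts serving (as in run() above), then only read.
    private final Map<SketchCommandType, Function<String, String>> processors =
            new EnumMap<>(SketchCommandType.class);

    void registerProcessor(SketchCommandType type, Function<String, String> processor) {
        processors.put(type, processor);
    }

    // Routes a command body to its processor; unknown types are rejected instead of silently dropped.
    String dispatch(SketchCommandType type, String body) {
        Function<String, String> processor = processors.get(type);
        if (processor == null) {
            throw new IllegalStateException("no processor registered for " + type);
        }
        return processor.apply(body);
    }
}
```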
/**
* server start
*/
public void start() {
if (isStarted.compareAndSet(false, true)) {
this.serverBootstrap
.group(this.bossGroup, this.workGroup)
.channel(NettyUtils.getServerSocketChannelClass())
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog())
.childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive())
.childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay())
.childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize())
.childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize())
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
initNettyChannel(ch);
}
});
ChannelFuture future;
try {
future = serverBootstrap.bind(serverConfig.getListenPort()).sync();
} catch (Exception e) {
logger.error("NettyRemotingServer bind fail {}, exit", e.getMessage(), e);
throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()));
}
if (future.isSuccess()) {
logger.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort());
} else if (future.cause() != null) {
throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()), future.cause());
} else {
throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()));
}
}
}
/**
* constructor
*/
public NettyExecutorManager() {
final NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
}
## Register the processors that handle Worker callbacks
@PostConstruct
public void init() {
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
}
public NettyRemotingClient(final NettyClientConfig clientConfig) {
this.clientConfig = clientConfig;
if (NettyUtils.useEpoll()) {
this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
private final AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
}
});
} else {
this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
private final AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
}
});
}
this.callbackExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10),
new CallerThreadExecutePolicy());
this.clientHandler = new NettyClientHandler(this, callbackExecutor);
this.responseFutureExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ResponseFutureExecutor"));
this.start();
}
/**
* start
*/
private void start() {
this.bootstrap
.group(this.workerGroup)
.channel(NettyUtils.getSocketChannelClass())
.option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive())
.option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay())
.option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize())
.option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize())
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getConnectTimeoutMillis())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast("client-idle-handler", new IdleStateHandler(Constants.NETTY_CLIENT_HEART_BEAT_TIME, 0, 0, TimeUnit.MILLISECONDS))
.addLast(new NettyDecoder(), clientHandler, encoder);
}
});
this.responseFutureExecutor.scheduleAtFixedRate(ResponseFuture::scanFutureTable, 5000, 1000, TimeUnit.MILLISECONDS);
isStarted.compareAndSet(false, true);
}
/**
* task dispatch
*
* @param context context
* @return result
* @throws ExecuteException if error throws ExecuteException
*/
public Boolean dispatch(final ExecutionContext context) throws ExecuteException {
/**
* get executor manager
*/
ExecutorManager<Boolean> executorManager = this.executorManagers.get(context.getExecutorType());
if (executorManager == null) {
throw new ExecuteException("no ExecutorManager for type : " + context.getExecutorType());
}
/**
* host select
*/
Host host = hostManager.select(context);
if (StringUtils.isEmpty(host.getAddress())) {
throw new ExecuteException(String.format("fail to execute : %s due to no suitable worker, "
+ "current task needs worker group %s to execute",
context.getCommand(),context.getWorkerGroup()));
}
context.setHost(host);
executorManager.beforeExecute(context);
try {
/**
* task execute
*/
return executorManager.execute(context);
} finally {
executorManager.afterExecute(context);
}
}
/**
* execute logic
*
* @param context context
* @return result
* @throws ExecuteException if error throws ExecuteException
*/
@Override
public Boolean execute(ExecutionContext context) throws ExecuteException {
/**
* all nodes
*/
Set<String> allNodes = getAllNodes(context);
/**
* fail nodes
*/
Set<String> failNodeSet = new HashSet<>();
/**
* build command accord executeContext
*/
Command command = context.getCommand();
/**
* execute task host
*/
Host host = context.getHost();
boolean success = false;
while (!success) {
try {
doExecute(host, command);
success = true;
context.setHost(host);
} catch (ExecuteException ex) {
logger.error(String.format("execute command : %s error", command), ex);
try {
failNodeSet.add(host.getAddress());
Set<String> tmpAllIps = new HashSet<>(allNodes);
Collection<String> remained = CollectionUtils.subtract(tmpAllIps, failNodeSet);
if (remained != null && remained.size() > 0) {
host = Host.of(remained.iterator().next());
logger.error("retry execute command : {} host : {}", command, host);
} else {
throw new ExecuteException("fail after try all nodes");
}
} catch (Throwable t) {
throw new ExecuteException("fail after try all nodes");
}
}
}
return success;
}
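execute() moves on to another worker that is not yet in failNodeSet whenever doExecute throws. It gives up once every node in the group has failed. In the original, the choice of next node follows HashSet iteration order, so it is effectively arbitrary. The sketch below keeps the same control flow. The `Sender` interface stands in for doExecute and IllegalStateException stands in for ExecuteException; both are hypothetical, as is `FailoverDispatcher`.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of "try the selected host, then any remaining host, until all have failed".
final class FailoverDispatcher {

    // Stand-in for doExecute(host, command); throws when the host cannot take the command.
    interface Sender {
        void send(String host, String command) throws Exception;
    }

    private FailoverDispatcher() {
    }

    // Returns the host that accepted the command, or throws once every host has failed.
    static String dispatch(String firstHost, Set<String> allHosts, String command, Sender sender) {
        Set<String> failed = new LinkedHashSet<>();
        String host = firstHost;
        while (true) {
            try {
                sender.send(host, command);
                return host;
            } catch (Exception e) {
                failed.add(host);
                List<String> remaining = new ArrayList<>(allHosts);
                remaining.removeAll(failed);
                if (remaining.isEmpty()) {
                    throw new IllegalStateException("fail after try all nodes", e);
                }
                host = remaining.get(0);  // the original takes remained.iterator().next()
            }
        }
    }
}
```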
/**
* execute logic
*
* @param host host
* @param command command
* @throws ExecuteException if error throws ExecuteException
*/
public void doExecute(final Host host, final Command command) throws ExecuteException {
/**
* retry count,default retry 3
*/
int retryCount = 3;
boolean success = false;
do {
try {
nettyRemotingClient.send(host, command);
success = true;
} catch (Exception ex) {
logger.error(String.format("send command : %s to %s error", command, host), ex);
retryCount--;
ThreadUtils.sleep(100);
}
} while (retryCount >= 0 && !success);
if (!success) {
throw new ExecuteException(String.format("send command : %s to %s error", command, host));
}
}
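In doExecute, `retryCount` starts at 3 and the loop runs while `retryCount >= 0`. That allows one initial send plus three retries, so a single host can receive up to four attempts, with a 100 ms pause after each failure. Below is a generic sketch of that bounded retry with the attempt count made explicit. `RetrySketch`, `Attempt` and `runWithRetry` are hypothetical names.

```java
// Hypothetical sketch of a bounded retry with a fixed pause, mirroring doExecute's loop.
final class RetrySketch {

    // Stand-in for nettyRemotingClient.send(host, command); throws on failure.
    interface Attempt {
        void run() throws Exception;
    }

    private RetrySketch() {
    }

    // Runs the action at most (1 + retries) times and returns the 1-based attempt that succeeded.
    static int runWithRetry(Attempt action, int retries, long pauseMillis) {
        Exception last = null;
        for (int attempt = 1; attempt <= retries + 1; attempt++) {
            try {
                action.run();
                return attempt;
            } catch (Exception e) {
                last = e;
            }
            try {
                Thread.sleep(pauseMillis);  // fixed pause, like ThreadUtils.sleep(100)
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        throw new IllegalStateException("send failed", last);
    }
}
```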
/**
* send task
*
* @param host host
* @param command command
*/
public void send(final Host host, final Command command) throws RemotingException {
Channel channel = getChannel(host);
if (channel == null) {
throw new RemotingException(String.format("connect to : %s fail", host));
}
try {
ChannelFuture future = channel.writeAndFlush(command).await();
if (future.isSuccess()) {
logger.debug("send command : {} , to : {} successfully.", command, host.getAddress());
} else {
String msg = String.format("send command : %s , to :%s failed", command, host.getAddress());
logger.error(msg, future.cause());
throw new RemotingException(msg);
}
} catch (Exception e) {
logger.error("Send command {} to address {} encounter error.", command, host.getAddress());
throw new RemotingException(String.format("Send command : %s , to :%s encounter error", command, host.getAddress()), e);
}
}
Likewise, on startup the Worker initializes a NettyRemotingServer, registers its processors and starts the server:
/**
* worker server run
*/
@PostConstruct
public void run() {
// init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(workerConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, taskExecuteProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor);
// logger server
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.start();
// install task plugin
this.taskPluginManager.installPlugin();
// worker registry
try {
this.workerRegistryClient.registry();
this.workerRegistryClient.setRegistryStoppable(this);
Set<String> workerZkPaths = this.workerRegistryClient.getWorkerZkPaths();
this.workerRegistryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP);
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e);
}
// task execute manager
this.workerManagerThread.start();
// retry report task status
this.retryReportTaskStatusThread.start();
/*
* registry hooks, which are called before the process exits
*/
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (Stopper.isRunning()) {
close("shutdownHook");
}
}));
}
public TaskCallbackService() {
final NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
}
/**
* send result
*
* @param taskInstanceId taskInstanceId
* @param command command
*/
public void send(int taskInstanceId, Command command) {
NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
if (nettyRemoteChannel != null) {
nettyRemoteChannel.writeAndFlush(command).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
// remove(taskInstanceId);
return;
}
}
});
}
}
2.6.2 Interaction between other services and the Master
@RequestParam(value = "taskInstanceId") int taskInstanceId,
@RequestParam(value = "skipLineNum") int skipNum,
@RequestParam(value = "limit") int limit) {
return loggerService.queryLog(taskInstanceId, skipNum, limit);
}
/**
* view log
*
* @param taskInstId task instance id
* @param skipLineNum skip line number
* @param limit limit
* @return log string data
*/
@Override
@SuppressWarnings("unchecked")
public Result<String> queryLog(int taskInstId, int skipLineNum, int limit) {
TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId);
if (taskInstance == null) {
return Result.error(Status.TASK_INSTANCE_NOT_FOUND);
}
if (StringUtils.isBlank(taskInstance.getHost())) {
return Result.error(Status.TASK_INSTANCE_HOST_IS_NULL);
}
Result<String> result = new Result<>(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());
String log = queryLog(taskInstance,skipLineNum,limit);
result.setData(log);
return result;
}
/**
* query log
*
* @param taskInstance task instance
* @param skipLineNum skip line number
* @param limit limit
* @return log string data
*/
private String queryLog(TaskInstance taskInstance, int skipLineNum, int limit) {
Host host = Host.of(taskInstance.getHost());
logger.info("log host : {} , logPath : {} , port : {}", host.getIp(), taskInstance.getLogPath(),
host.getPort());
StringBuilder log = new StringBuilder();
if (skipLineNum == 0) {
String head = String.format(LOG_HEAD_FORMAT,
taskInstance.getLogPath(),
host,
Constants.SYSTEM_LINE_SEPARATOR);
log.append(head);
}
log.append(logClient
.rollViewLog(host.getIp(), host.getPort(), taskInstance.getLogPath(), skipLineNum, limit));
return log.toString();
}
/**
* roll view log
*
* @param host host
* @param port port
* @param path path
* @param skipLineNum skip line number
* @param limit limit
* @return log content
*/
public String rollViewLog(String host, int port, String path, int skipLineNum, int limit) {
logger.info("roll view log, host : {}, port : {}, path {}, skipLineNum {} ,limit {}", host, port, path, skipLineNum, limit);
RollViewLogRequestCommand request = new RollViewLogRequestCommand(path, skipLineNum, limit);
String result = "";
final Host address = new Host(host, port);
try {
Command command = request.convert2Command();
Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
if (response != null) {
RollViewLogResponseCommand rollReviewLog = JSONUtils.parseObject(
response.getBody(), RollViewLogResponseCommand.class);
return rollReviewLog.getMsg();
}
} catch (Exception e) {
logger.error("roll view log error", e);
} finally {
this.client.closeChannel(address);
}
return result;
}
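On the receiving side, a ROLL_VIEW_LOG_REQUEST only has to return the lines in the window [skipLineNum, skipLineNum + limit) of the requested log file. That handler is not shown in this article. Below is a minimal sketch of the paging itself. It assumes the log is a plain UTF-8 text file read with java.nio. `LogPager` and `readPart` are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch of paging a log file by skipLineNum / limit.
final class LogPager {

    private LogPager() {
    }

    static List<String> readPart(Path logFile, int skipLineNum, int limit) {
        // Files.lines is lazy, so only the requested window is kept in memory;
        // try-with-resources closes the underlying file.
        try (Stream<String> lines = Files.lines(logFile, StandardCharsets.UTF_8)) {
            return lines.skip(skipLineNum).limit(limit).collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```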
/**
* sync send
*
* @param host host
* @param command command
* @param timeoutMillis timeoutMillis
* @return command
*/
public Command sendSync(final Host host, final Command command, final long timeoutMillis) throws InterruptedException, RemotingException {
final Channel channel = getChannel(host);
if (channel == null) {
throw new RemotingException(String.format("connect to : %s fail", host));
}
final long opaque = command.getOpaque();
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
channel.writeAndFlush(command).addListener(future -> {
if (future.isSuccess()) {
responseFuture.setSendOk(true);
return;
} else {
responseFuture.setSendOk(false);
}
responseFuture.setCause(future.cause());
responseFuture.putResponse(null);
logger.error("send command {} to host {} failed", command, host);
});
/*
* sync wait for result
*/
Command result = responseFuture.waitResponse();
if (result == null) {
if (responseFuture.isSendOK()) {
throw new RemotingTimeoutException(host.toString(), timeoutMillis, responseFuture.getCause());
} else {
throw new RemotingException(host.toString(), responseFuture.getCause());
}
}
return result;
}
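sendSync matches a request to its response through the command's opaque id. A ResponseFuture is created for that id and the caller blocks in waitResponse(). The client-side handler is then expected to complete the future when a response with the same opaque id arrives, while the scheduled scanFutureTable task clears out expired entries. Below is a minimal sketch of that correlation using a ConcurrentHashMap and a CountDownLatch. `PendingResponse` and its methods are hypothetical and do not reproduce the real ResponseFuture API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of opaque-id based request/response correlation, in the spirit of ResponseFuture.
final class PendingResponse {

    // In-flight requests, keyed by the request's opaque id.
    private static final Map<Long, PendingResponse> TABLE = new ConcurrentHashMap<>();

    private final long opaque;
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile String response;

    private PendingResponse(long opaque) {
        this.opaque = opaque;
    }

    // Called by the sender before writing the request to the channel.
    static PendingResponse register(long opaque) {
        PendingResponse pending = new PendingResponse(opaque);
        TABLE.put(opaque, pending);
        return pending;
    }

    // Called by the I/O handler when a response carrying this opaque id arrives.
    static boolean complete(long opaque, String response) {
        PendingResponse pending = TABLE.remove(opaque);
        if (pending == null) {
            return false;  // unknown or already timed-out request
        }
        pending.response = response;
        pending.latch.countDown();
        return true;
    }

    static int inFlight() {
        return TABLE.size();
    }

    // Blocks up to timeoutMillis; null means timeout (sendSync then raises a timeout exception).
    String await(long timeoutMillis) {
        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            TABLE.remove(opaque);  // never leak entries for requests that got no answer
        }
        return response;
    }
}
```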
/**
* construct client
*/
public LogClientService() {
this.clientConfig = new NettyClientConfig();
this.clientConfig.setWorkerThreads(4);
this.client = new NettyRemotingClient(clientConfig);
this.isRunning = true;
}
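2.7 Load-balancing algorithms
The Master chooses a HostManager implementation according to the configured host selector (RANDOM, ROUND_ROBIN or LOWER_WEIGHT):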
public HostManager hostManager() {
HostSelector selector = masterConfig.getHostSelector();
HostManager hostManager;
switch (selector) {
case RANDOM:
hostManager = new RandomHostManager();
break;
case ROUND_ROBIN:
hostManager = new RoundRobinHostManager();
break;
case LOWER_WEIGHT:
hostManager = new LowerWeightHostManager();
break;
default:
throw new IllegalArgumentException("unSupport selector " + selector);
}
beanFactory.autowireBean(hostManager);
return hostManager;
}
2.7.1 Weighted random
public HostWorker doSelect(final Collection<HostWorker> source) {
List<HostWorker> hosts = new ArrayList<>(source);
int size = hosts.size();
int[] weights = new int[size];
int totalWeight = 0;
int index = 0;
for (HostWorker host : hosts) {
totalWeight += host.getHostWeight();
weights[index] = host.getHostWeight();
index++;
}
if (totalWeight > 0) {
int offset = ThreadLocalRandom.current().nextInt(totalWeight);
for (int i = 0; i < size; i++) {
offset -= weights[i];
if (offset < 0) {
return hosts.get(i);
}
}
}
return hosts.get(ThreadLocalRandom.current().nextInt(size));
}
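doSelect draws a random offset in [0, totalWeight). It then subtracts the weights in order until the offset drops below zero, so a host with weight w is picked with probability w / totalWeight. If every weight is zero, it falls back to a uniform pick. Below is a stand-alone version where the offset is passed in, so the offset-to-host mapping can be checked deterministically. `WeightedRandom`, `pick` and `pickRandom` are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical stand-alone version of the weighted-random selection shown above.
final class WeightedRandom {

    private WeightedRandom() {
    }

    // Maps an offset in [0, sum(weights)) to an index: each index owns a sub-range as wide as its weight.
    static int pick(List<Integer> weights, int offset) {
        for (int i = 0; i < weights.size(); i++) {
            offset -= weights.get(i);
            if (offset < 0) {
                return i;
            }
        }
        throw new IllegalArgumentException("offset out of range");
    }

    // Random draw; mirrors doSelect, including the uniform fallback when all weights are zero.
    static int pickRandom(List<Integer> weights) {
        int total = 0;
        for (int w : weights) {
            total += w;
        }
        ThreadLocalRandom random = ThreadLocalRandom.current();
        return total > 0 ? pick(weights, random.nextInt(total)) : random.nextInt(weights.size());
    }
}
```

With weights [1, 3, 6], offset 0 maps to host 0, offsets 1 to 3 map to host 1, and offsets 4 to 9 map to host 2. That gives selection probabilities of 10%, 30% and 60%.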
2.7.2 Linear load
double calculatedWeight = cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR;
long uptime = System.currentTimeMillis() - startTime;
if (uptime > 0 && uptime < Constants.WARM_UP_TIME) {
// If the warm-up is not over, add the weight
return calculatedWeight * Constants.WARM_UP_TIME / uptime;
}
return calculatedWeight;
}
/**
* select
*
* @param sources sources
* @return HostWeight
*/
@Override
public HostWeight doSelect(Collection<HostWeight> sources) {
double totalWeight = 0;
double lowWeight = 0;
HostWeight lowerNode = null;
for (HostWeight hostWeight : sources) {
totalWeight += hostWeight.getWeight();
hostWeight.setCurrentWeight(hostWeight.getCurrentWeight() + hostWeight.getWeight());
if (lowerNode == null || lowWeight > hostWeight.getCurrentWeight()) {
lowerNode = hostWeight;
lowWeight = hostWeight.getCurrentWeight();
}
}
lowerNode.setCurrentWeight(lowerNode.getCurrentWeight() + totalWeight);
return lowerNode;
}
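In the lower-weight strategy the "weight" is a load score. It is a weighted sum of CPU, memory and load average, scaled up while a worker is still warming up, and the smallest running value wins. The sketch below covers both pieces. The factor values and warm-up length are placeholders chosen for illustration, not DolphinScheduler's constants. `LowerWeightSketch`, `loadScore` and `select` are hypothetical names.

```java
import java.util.List;

// Hypothetical sketch of the lower-weight strategy: a load score per host plus a smooth "pick the lowest" loop.
final class LowerWeightSketch {

    // Placeholder factors and warm-up length, for illustration only.
    static final double CPU_FACTOR = 0.1;
    static final double MEMORY_FACTOR = 0.2;
    static final double LOAD_AVERAGE_FACTOR = 0.7;
    static final long WARM_UP_MILLIS = 10 * 60 * 1000L;

    // Mutable per-host state; currentWeight accumulates across calls like HostWeight's current weight.
    static final class Host {
        final String name;
        final double weight;
        double currentWeight;

        Host(String name, double weight) {
            this.name = name;
            this.weight = weight;
        }
    }

    private LowerWeightSketch() {
    }

    // Higher score = busier host. A recently started host is scaled up by WARM_UP / uptime.
    static double loadScore(double cpu, double memory, double loadAverage, long uptimeMillis) {
        double score = cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR;
        if (uptimeMillis > 0 && uptimeMillis < WARM_UP_MILLIS) {
            return score * WARM_UP_MILLIS / uptimeMillis;
        }
        return score;
    }

    // Same steps as doSelect above (assumes a non-empty list): add each weight,
    // pick the lowest running total, then push the chosen host back by the sum of all weights.
    static Host select(List<Host> hosts) {
        double total = 0;
        Host lowest = null;
        for (Host h : hosts) {
            total += h.weight;
            h.currentWeight += h.weight;
            if (lowest == null || lowest.currentWeight > h.currentWeight) {
                lowest = h;
            }
        }
        lowest.currentWeight += total;
        return lowest;
    }
}
```

With loads of 1 for host a and 3 for host b, four consecutive calls return a, a, b, a. The less loaded host is chosen more often, and its picks are interleaved rather than consecutive.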
2.7.3 Smooth round-robin
public HostWorker doSelect(Collection<HostWorker> source) {
List<HostWorker> hosts = new ArrayList<>(source);
String key = hosts.get(0).getWorkerGroup();
ConcurrentMap<String, WeightedRoundRobin> map = workGroupWeightMap.get(key);
if (map == null) {
workGroupWeightMap.putIfAbsent(key, new ConcurrentHashMap<>());
map = workGroupWeightMap.get(key);
}
int totalWeight = 0;
long maxCurrent = Long.MIN_VALUE;
long now = System.currentTimeMillis();
HostWorker selectedHost = null;
WeightedRoundRobin selectWeightRoundRobin = null;
for (HostWorker host : hosts) {
String workGroupHost = host.getWorkerGroup() + host.getAddress();
WeightedRoundRobin weightedRoundRobin = map.get(workGroupHost);
int weight = host.getHostWeight();
if (weight < 0) {
weight = 0;
}
if (weightedRoundRobin == null) {
weightedRoundRobin = new WeightedRoundRobin();
// set weight
weightedRoundRobin.setWeight(weight);
map.putIfAbsent(workGroupHost, weightedRoundRobin);
weightedRoundRobin = map.get(workGroupHost);
}
if (weight != weightedRoundRobin.getWeight()) {
weightedRoundRobin.setWeight(weight);
}
long cur = weightedRoundRobin.increaseCurrent();
weightedRoundRobin.setLastUpdate(now);
if (cur > maxCurrent) {
maxCurrent = cur;
selectedHost = host;
selectWeightRoundRobin = weightedRoundRobin;
}
totalWeight += weight;
}
if (!updateLock.get() && hosts.size() != map.size() && updateLock.compareAndSet(false, true)) {
try {
ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
workGroupWeightMap.put(key, newMap);
} finally {
updateLock.set(false);
}
}
if (selectedHost != null) {
selectWeightRoundRobin.sel(totalWeight);
return selectedHost;
}
return hosts.get(0);
}
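This is the smooth weighted round-robin scheme known from Nginx. On every call, each host's weight is added to its running "current" value and the host with the largest current is chosen. The chosen host is then reduced by the total weight, which is the sel(totalWeight) step. The per-worker-group map and the RECYCLE_PERIOD cleanup only carry that state across calls and prune stale hosts. Below is a minimal sketch without that bookkeeping. `SmoothRoundRobin` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of smooth weighted round-robin, the scheme implemented by doSelect above.
final class SmoothRoundRobin {

    private final List<String> hosts = new ArrayList<>();
    private final List<Integer> weights = new ArrayList<>();
    private final List<Long> current = new ArrayList<>();

    void addHost(String host, int weight) {
        hosts.add(host);
        weights.add(Math.max(weight, 0));  // negative weights are clamped to 0, as in doSelect
        current.add(0L);
    }

    // Assumes at least one host. Not thread-safe; the original keeps per-group state in ConcurrentMaps.
    String next() {
        long total = 0;
        int best = -1;
        for (int i = 0; i < hosts.size(); i++) {
            long cur = current.get(i) + weights.get(i);  // increaseCurrent()
            current.set(i, cur);
            total += weights.get(i);
            if (best < 0 || cur > current.get(best)) {  // strict ">" keeps the first host on ties
                best = i;
            }
        }
        current.set(best, current.get(best) - total);  // sel(totalWeight)
        return hosts.get(best);
    }
}
```

With weights a=5, b=1, c=1, seven calls yield a, a, b, a, c, a, a. The heavy host still gets 5 of every 7 picks, but they are spread out instead of arriving as a burst of five.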
2.8 Log service
2.9 Alerting
3 Afterword
3.1 Make friends
3.2 References
- https://dolphinscheduler.apache.org/zh-cn/development/architecture-design.html
- https://juejin.cn/post/6844903729406148622
- https://www.w3cschool.cn/quartz_doc/quartz_doc-1xbu2clr.html
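Contributing
With the rapid rise of open source in China, the Apache DolphinScheduler community is flourishing. To build a more useful and easier-to-use scheduler, we sincerely welcome open-source enthusiasts to join the community, contribute to the rise of open source in China, and help homegrown open source go global.
There are many ways to contribute to the DolphinScheduler community, including:
- Reporting problems you run into as GitHub issues.
- Answering issues raised by others.
- Helping improve the documentation.
- Helping add test cases to the project.
- Adding comments to the code.
- Submitting PRs that fix bugs or add features.
- Publishing articles on use cases, scheduling-flow analysis, or other scheduling-related technical topics.
- Helping promote DolphinScheduler, for example by speaking at tech conferences or meetups.
Contribute your first PR (documentation or code). We want it to be simple: the first PR is for getting familiar with the submission process and community collaboration, and for experiencing how friendly the community is.
For example, add code comments, or pick an issue labeled "easy to fix" or another very simple issue (typos and the like), and use that first simple PR to get familiar with the submission process.
The community has compiled a list of issues suitable for newcomers: https://github.com/apache/dolphinscheduler/issues/5689
Issues for more experienced contributors: https://github.com/apache/dolphinscheduler/issues?q=is%3Aopen+is%3Aissue+label%3A%22volunteer+wanted%22
How to contribute: https://dolphinscheduler.apache.org/zh-cn/community/development/contribute.html
Come and join us: the DolphinScheduler open-source community needs you. Every contribution to the rise of open source in China counts; even a single small tile, pooled with many others, adds up to something big.
Contributing to open source lets you work side by side with skilled developers and improve your skills quickly. If you would like to contribute, we have a contributor incubation group: add the community assistant on WeChat (Leonard-ds) and you will be guided step by step (contributors of every level are welcome and every question gets an answer; what matters is the willingness to contribute).
When adding the assistant on WeChat, please mention that you want to contribute.
Come on, the open-source community is looking forward to your participation.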

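On September 17, 2022, the Apache DolphinScheduler and Apache RocketMQ communities will join forces, inviting big data experts from internet companies and core community developers to share talks on big data development technology and production practice.
Registration is now open; book your seat!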
