大数跨境

Apache DolphinScheduler Task Scheduling: Source Code Analysis of Version 3.1.0

海豚调度 (DolphinScheduler)
2022-12-21
Introduction: a detailed walkthrough of the Apache DolphinScheduler source code

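Contents (this article is based on version 3.1.0)

1. Database tables

2. Overall execution flow

3. Source code analysis

3.1 apiserver: entry point for task execution

3.2 Master: scheduling tasks

  • 3.2.1 Master startup
  • 3.2.2 Command scanning
  • 3.2.3 Consuming workflowEvents
  • 3.2.4 Workflow event handling logic
  • 3.2.5 WorkflowExecuteRunnable execution logic
  • 3.2.6 Task consumption
  • 3.2.7 Task dispatch

3.3 Worker: executing tasks

  • 3.3.1 Worker startup

  • 3.3.2 Worker receives the task-dispatch command

  • 3.3.3 workerManager consumes tasks

  • 3.3.4 Task execution

3.4 Master: receiving task feedback

  • 3.4.1 Master receives feedback messages

  • 3.4.2 taskEventService processes TaskEvents

  • 3.4.3 TaskResultEventHandler processes TaskEvents

3.5 Master: closing the loop by submitting downstream tasks

  • 3.5.1 EventExecuteService processes StateEvents

  • 3.5.2 WorkflowExecuteThread processes StateEvents

  • 3.5.3 TaskStateEventHandler processes StateEvents

  • 3.5.4 WorkflowExecuteThread schedules downstream tasks

4. Summary

4.1 Role of each component
4.2 Role of each thread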


01

Database tables

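t_ds_process_definition: workflow definition table

Stores workflows, one row per workflow.

t_ds_process_instance: workflow instance table. A row is generated each time the workflow is scheduled, for example on every crontab trigger.

t_ds_task_definition: task definition table

Stores the nodes on the workflow canvas, one row per node.

t_ds_process_task_relation: task relation table

When two tasks are connected by a line on the canvas, there is one row for that connection.

t_ds_task_instance: task instance table

A row is generated each time a task runs.

t_ds_command: command table. Starting a workflow sends an HTTP request to the apiserver, and the API then writes the information about the workflow to be run into this table.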
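Since t_ds_task_definition holds the nodes and t_ds_process_task_relation holds the edges, the DAG that the master builds later can in principle be reconstructed from these two tables alone. The sketch below is hypothetical and is not DolphinScheduler's own DAG code. It models each relation row as a `{preTaskCode, postTaskCode}` pair and assumes, for illustration, that `preTaskCode == 0` marks a task with no upstream. It finds the source tasks, computes a topological order with Kahn's algorithm, and rejects cycles.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: rebuild a workflow DAG from task codes (t_ds_task_definition)
// and {preTaskCode, postTaskCode} pairs (t_ds_process_task_relation).
// ASSUMPTION: preTaskCode == 0 means "no upstream", so such rows add no edge.
final class DagFromTables {

    /** Tasks that no edge points to: the first ones a scheduler would submit. */
    static List<Long> sources(Collection<Long> taskCodes, List<long[]> relations) {
        Set<Long> hasUpstream = new HashSet<>();
        for (long[] r : relations) {
            if (r[0] != 0) {
                hasUpstream.add(r[1]);
            }
        }
        List<Long> result = new ArrayList<>();
        for (Long code : taskCodes) {
            if (!hasUpstream.contains(code)) {
                result.add(code);
            }
        }
        return result;
    }

    /** Kahn's algorithm; throws IllegalStateException if the relations contain a cycle. */
    static List<Long> topologicalOrder(Collection<Long> taskCodes, List<long[]> relations) {
        Map<Long, Integer> inDegree = new LinkedHashMap<>();
        Map<Long, List<Long>> downstream = new HashMap<>();
        for (Long code : taskCodes) {
            inDegree.put(code, 0);
            downstream.put(code, new ArrayList<>());
        }
        for (long[] r : relations) {
            if (r[0] == 0) {
                continue;
            }
            downstream.get(r[0]).add(r[1]);
            inDegree.merge(r[1], 1, Integer::sum);
        }
        Deque<Long> ready = new ArrayDeque<>();
        inDegree.forEach((code, degree) -> {
            if (degree == 0) {
                ready.add(code);
            }
        });
        List<Long> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            Long code = ready.poll();
            order.add(code);
            for (Long next : downstream.get(code)) {
                if (inDegree.merge(next, -1, Integer::sum) == 0) {
                    ready.add(next);
                }
            }
        }
        if (order.size() != inDegree.size()) {
            throw new IllegalStateException("cycle detected in task relations");
        }
        return order;
    }
}
```

In a diamond-shaped workflow (1 → 2, 1 → 3, 2 → 4, 3 → 4), only task 1 is a source. Task 4 becomes ready only after both 2 and 3 have been visited. This mirrors the article's later point that downstream tasks are submitted only after their upstream tasks finish.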

02

Overall execution flow


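  • The user clicks the button that starts a workflow in the web UI.

  • The apiserver wraps the request into a command and persists it to the database (it inserts a row into t_ds_command).

  • The master scans for the command, builds and initializes the DAG, and submits the source tasks to the priority queue.

  • The taskConsumer takes tasks from the queue and selects a worker to dispatch each task to.

  • The worker receives the dispatch message and starts the task.

  • The worker reports the result back to the master, and the master updates the task information in the database.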
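The six steps above can be mimicked in a single process to make the hand-offs concrete. This is a hypothetical toy model, not DolphinScheduler code:

- A queue stands in for the t_ds_command table.
- A `PriorityBlockingQueue` stands in for the master's task priority queue. "Lower number = higher priority" is an assumption of the sketch.
- "Running" a task only records its name, at the point where the master would update t_ds_task_instance.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical toy model of the overall flow:
// apiserver -> t_ds_command -> master -> task priority queue -> worker -> master updates DB.
final class MiniFlow {

    static final class TaskItem implements Comparable<TaskItem> {
        final int priority;   // lower value = dispatched first (assumption of this sketch)
        final String name;

        TaskItem(int priority, String name) {
            this.priority = priority;
            this.name = name;
        }

        @Override
        public int compareTo(TaskItem other) {
            return Integer.compare(priority, other.priority);
        }
    }

    final Queue<Long> commandTable = new ConcurrentLinkedQueue<>();          // stands in for t_ds_command
    final PriorityBlockingQueue<TaskItem> taskQueue = new PriorityBlockingQueue<>();
    final List<String> finishedTasks = new ArrayList<>();                    // stands in for t_ds_task_instance updates

    /** apiserver: the start request only persists a command (here: a process definition code). */
    void apiStartWorkflow(long processDefinitionCode) {
        commandTable.add(processDefinitionCode);
    }

    /** master: consume commands and submit each workflow's source tasks to the priority queue. */
    void masterScan(Map<Long, List<TaskItem>> sourceTasksByDefinition) {
        Long code;
        while ((code = commandTable.poll()) != null) {
            taskQueue.addAll(sourceTasksByDefinition.getOrDefault(code, Collections.<TaskItem>emptyList()));
        }
    }

    /** consumer + worker + result feedback, collapsed into one loop. */
    void dispatchAndRun() {
        TaskItem task;
        while ((task = taskQueue.poll()) != null) {
            finishedTasks.add(task.name);    // the worker runs it; the master records the result
        }
    }
}
```

In the real system each stage runs on its own threads and machines. The point of the model is only the order of hand-offs: the apiserver never talks to a worker directly.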

03

DolphinScheduler source code analysis


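3.1 apiserver: entry point for task execution

When the user clicks Run in the front end, a request is sent to the DolphinScheduler API. It is ultimately handled by the startProcessInstance method of ExecutorController.


ExecutorController.startProcessInstance() method


In the end, a row describing the workflow to be run is inserted into the MySQL table t_ds_command.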


@PostMapping(value = "start-process-instance")
@ResponseStatus(HttpStatus.OK)
@ApiException(START_PROCESS_INSTANCE_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result startProcessInstance(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
                                   @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
                                   @RequestParam(value = "processDefinitionCode") long processDefinitionCode,
                                   @RequestParam(value = "scheduleTime") String scheduleTime,
                                   @RequestParam(value = "failureStrategy") FailureStrategy failureStrategy,
                                   @RequestParam(value = "startNodeList", required = false) String startNodeList,
                                   @RequestParam(value = "taskDependType", required = false) TaskDependType taskDependType,
                                   @RequestParam(value = "execType", required = false) CommandType execType,
                                   @RequestParam(value = "warningType") WarningType warningType,
                                   @RequestParam(value = "warningGroupId", required = false, defaultValue = "0") Integer warningGroupId,
                                   @RequestParam(value = "runMode", required = false) RunMode runMode,
                                   @RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority,
                                   @RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup,
                                   @RequestParam(value = "environmentCode", required = false, defaultValue = "-1") Long environmentCode,
                                   @RequestParam(value = "timeout", required = false) Integer timeout,
                                   @RequestParam(value = "startParams", required = false) String startParams,
                                   @RequestParam(value = "expectedParallelismNumber", required = false) Integer expectedParallelismNumber,
                                   @RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun,
                                   @RequestParam(value = "complementDependentMode", required = false) ComplementDependentMode complementDependentMode) {

    if (timeout == null) {
        timeout = Constants.MAX_TASK_TIMEOUT;
    }
    Map<String, String> startParamMap = null;
    if (startParams != null) {
        startParamMap = JSONUtils.toMap(startParams);
    }

    if (complementDependentMode == null) {
        complementDependentMode = ComplementDependentMode.OFF_MODE;
    }
    // build the command and persist it to the database
    Map<String, Object> result = execService.execProcessInstance(loginUser, projectCode, processDefinitionCode,
            scheduleTime, execType, failureStrategy,
            startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority,
            workerGroup, environmentCode, timeout, startParamMap, expectedParallelismNumber, dryRun,
            complementDependentMode);
    return returnDataList(result);
}
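To call this endpoint without the web UI, a client sends the same form parameters that the `@RequestParam` annotations above declare. The helper below only builds the URL and the form-encoded body; it does not send anything. Two details are assumptions that you should verify against your deployment, because neither is shown in the article:

- the `/dolphinscheduler/projects/{projectCode}/executors` prefix (the class-level mapping does not appear above);
- authenticating with a `token` header.

```java
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URLEncoder;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch of a client-side request builder for startProcessInstance().
// ASSUMPTION: the URL prefix below; only the "start-process-instance" suffix appears in the article.
final class StartProcessRequest {

    static URI startUri(String baseUrl, long projectCode) {
        return URI.create(baseUrl + "/dolphinscheduler/projects/" + projectCode
                + "/executors/start-process-instance");
    }

    /** application/x-www-form-urlencoded body; keeps the map's iteration order. */
    static String formBody(Map<String, String> params) {
        return params.entrySet().stream()
                .map(e -> encode(e.getKey()) + "=" + encode(e.getValue()))
                .collect(Collectors.joining("&"));
    }

    private static String encode(String s) {
        try {
            return URLEncoder.encode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }
}
```

According to the annotations, processDefinitionCode, scheduleTime, failureStrategy and warningType must be supplied. Optional parameters such as timeout and complementDependentMode fall back to the defaults applied at the top of the controller method. The body can be POSTed with any HTTP client using `Content-Type: application/x-www-form-urlencoded`.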


3.2 Master: scheduling tasks


3.2.1 Master startup

MasterServer.run() method

Starts the master's working threads.

public void run() throws SchedulerException {
    // init rpc server
    this.masterRPCServer.start(); // start the Netty RPC server used to communicate with workers

    // install task plugin
    this.taskPluginManager.loadPlugin(); // load the task plugins

    // self tolerant
    this.masterRegistryClient.init(); // load registry information used for high availability
    this.masterRegistryClient.start();
    this.masterRegistryClient.setRegistryStoppable(this);

    // command scanning thread
    this.masterSchedulerBootstrap.init();
    this.masterSchedulerBootstrap.start();

    // event processing threads
    this.eventExecuteService.start();
    this.failoverExecuteThread.start();

    // timed (cron) scheduling
    this.schedulerApi.start();

    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        if (Stopper.isRunning()) {
            close("MasterServer shutdownHook");
        }
    }));
}
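The last statement registers a JVM shutdown hook that calls close() only while the server is still running. The background loops elsewhere in this article poll the same flag through `while (Stopper.isRunning())`.

The hypothetical `MiniServer` below sketches that pattern. It uses `compareAndSet` so that close() runs at most once, even if the hook and an explicit stop race. This is slightly stricter than the check-then-act `if (Stopper.isRunning())` shown above.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical analogue of the Stopper flag + shutdown hook used by MasterServer.run().
final class MiniServer {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final AtomicInteger closeCount = new AtomicInteger();

    /** Background loops would check this, like `while (Stopper.isRunning())`. */
    boolean isRunning() {
        return running.get();
    }

    void registerShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> close("shutdownHook")));
    }

    /** Idempotent: only the first caller flips the flag and releases resources. */
    void close(String cause) {
        if (!running.compareAndSet(true, false)) {
            return;
        }
        closeCount.incrementAndGet();
        System.out.println("closing, cause: " + cause);  // real code would stop RPC, registry, threads...
    }

    int closeCount() {
        return closeCount.get();
    }
}
```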


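3.2.2 Command scanning

MasterSchedulerBootstrap.run() method


This thread is started in 3.2.1. Once running, it loops continuously, scanning the command table. The commands it finds are converted into processInstances and persisted. For each instance, it creates a WorkflowExecuteRunnable (an object used in many places later on) and caches it, then writes a START_WORKFLOW event to workflowEventQueue.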

public void run() {
    while (Stopper.isRunning()) {
        try {
            // todo: if the workflow event queue is much, we need to handle the back pressure
            boolean isOverload =
                    OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory());
            if (isOverload) {
                MasterServerMetrics.incMasterOverload();
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                continue;
            }
            List<Command> commands = findCommands();
            if (CollectionUtils.isEmpty(commands)) {
                // indicate that no command ,sleep for 1s
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                continue;
            }
            // convert the commands into processInstances and persist them
            List<ProcessInstance> processInstances = command2ProcessInstance(commands);
            if (CollectionUtils.isEmpty(processInstances)) {
                // indicate that the command transform to processInstance error, sleep for 1s
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                continue;
            }
            MasterServerMetrics.incMasterConsumeCommand(commands.size());

            processInstances.forEach(processInstance -> {
                try {
                    LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
                    if (processInstanceExecCacheManager.contains(processInstance.getId())) {
                        logger.error("The workflow instance is already been cached, this case shouldn't be happened");
                    }
                    WorkflowExecuteRunnable workflowRunnable = new WorkflowExecuteRunnable(processInstance,
                            processService,
                            nettyExecutorManager,
                            processAlertManager,
                            masterConfig,
                            stateWheelExecuteThread,
                            curingGlobalParamsService);
                    // cache the runnable in processInstanceExecCacheManager; workflowEventLooper fetches it from there
                    processInstanceExecCacheManager.cache(processInstance.getId(), workflowRunnable);
                    workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW,
                            processInstance.getId()));
                } finally {
                    LoggerUtils.removeWorkflowInstanceIdMDC();
                }
            });
        } catch (InterruptedException interruptedException) {
            logger.warn("Master schedule bootstrap interrupted, close the loop", interruptedException);
            Thread.currentThread().interrupt();
            break;
        } catch (Exception e) {
            logger.error("Master schedule workflow error", e);
            // sleep for 1s here to avoid the database down cause the exception boom
            ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
        }
    }
}
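One iteration of this loop can be reduced to the sketch below. It keeps two behaviours of the loop above:

- the back-off cases: the master is overloaded, or there are no commands;
- the ordering inside the per-instance step: the runnable is cached before the START_WORKFLOW event is published, so the event looper can always find it.

Everything here is hypothetical. Commands and instance ids are plain integers, and `commandId * 10` stands in for command2ProcessInstance().

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

// Hypothetical single iteration of MasterSchedulerBootstrap.run().
final class CommandScanner {
    final Map<Integer, String> execCache = new ConcurrentHashMap<>();                // ~ processInstanceExecCacheManager
    final BlockingQueue<Integer> startWorkflowEvents = new LinkedBlockingQueue<>();  // ~ workflowEventQueue

    private final BooleanSupplier overloaded;            // ~ OSUtils.isOverload(...)
    private final Supplier<List<Integer>> findCommands;  // ~ findCommands()

    CommandScanner(BooleanSupplier overloaded, Supplier<List<Integer>> findCommands) {
        this.overloaded = overloaded;
        this.findCommands = findCommands;
    }

    /** @return false when the caller should sleep before scanning again. */
    boolean scanOnce() {
        if (overloaded.getAsBoolean()) {
            return false;
        }
        List<Integer> commands = findCommands.get();
        if (commands.isEmpty()) {
            return false;
        }
        for (int commandId : commands) {
            int processInstanceId = commandId * 10;  // stand-in for command2ProcessInstance() + DB insert
            // 1) cache the runnable first ...
            execCache.put(processInstanceId, "WorkflowExecuteRunnable-" + processInstanceId);
            // 2) ... then publish the event that refers to it
            startWorkflowEvents.add(processInstanceId);
        }
        return true;
    }
}
```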


3.2.3 Consuming workflowEvents


When the command scanning thread is started, it also starts the workflowEventLooper thread, which consumes workflowEvents.

MasterSchedulerBootstrap.start() method

@Override
public synchronized void start() {
    logger.info("Master schedule bootstrap starting..");
    super.start();
    workflowEventLooper.start(); // start the workflow event looper thread
    logger.info("Master schedule bootstrap started...");
}

The looper pulls workflowEvents from workflowEventQueue and calls the matching workflowEventHandler to process each one.


WorkflowEventLooper.run() method

public void run() {
    WorkflowEvent workflowEvent = null;
    while (Stopper.isRunning()) {
        try {
            workflowEvent = workflowEventQueue.poolEvent(); // take a workflowEvent from the queue
            LoggerUtils.setWorkflowInstanceIdMDC(workflowEvent.getWorkflowInstanceId());
            logger.info("Workflow event looper receive a workflow event: {}, will handle this", workflowEvent);
            // look up the handler for this event type and let it process the event
            WorkflowEventHandler workflowEventHandler =
                workflowEventHandlerMap.get(workflowEvent.getWorkflowEventType());
            workflowEventHandler.handleWorkflowEvent(workflowEvent);
        } catch (InterruptedException e) {
            logger.warn("WorkflowEventLooper thread is interrupted, will close this loop", e);
            Thread.currentThread().interrupt();
            break;
        } catch (WorkflowEventHandleException workflowEventHandleException) {
            logger.error("Handle workflow event failed, will add this event to event queue again, event: {}",
                workflowEvent, workflowEventHandleException);
            workflowEventQueue.addEvent(workflowEvent);
            ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
        } catch (WorkflowEventHandleError workflowEventHandleError) {
            logger.error("Handle workflow event error, will drop this event, event: {}",
                         workflowEvent,
                         workflowEventHandleError);
        } catch (Exception unknownException) {
            logger.error(
                "Handle workflow event failed, get a unknown exception, will add this event to event queue again, event: {}",
                workflowEvent, unknownException);
            workflowEventQueue.addEvent(workflowEvent);
            ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
        } finally {
            LoggerUtils.removeWorkflowInstanceIdMDC();
        }
    }
}
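The interesting part of this loop is its error policy:

- a WorkflowEventHandleException, or any unexpected exception, puts the event back on the queue for another attempt;
- a WorkflowEventHandleError drops the event.

The sketch below reproduces only that policy. It uses hypothetical names (`RetryableException`, `FatalError`, `processOne`) and omits the one-second sleep.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.IntConsumer;

// Hypothetical sketch of WorkflowEventLooper's dispatch + retry/drop policy.
final class MiniEventLooper {
    enum EventType { START_WORKFLOW }
    enum Outcome { HANDLED, REQUEUED, DROPPED }

    static final class Event {
        final EventType type;
        final int workflowInstanceId;

        Event(EventType type, int workflowInstanceId) {
            this.type = type;
            this.workflowInstanceId = workflowInstanceId;
        }
    }

    static final class RetryableException extends RuntimeException {   // ~ WorkflowEventHandleException
        RetryableException(String message) { super(message); }
    }

    static final class FatalError extends RuntimeException {           // ~ WorkflowEventHandleError
        FatalError(String message) { super(message); }
    }

    final BlockingQueue<Event> queue = new LinkedBlockingQueue<>();
    private final Map<EventType, IntConsumer> handlers = new EnumMap<>(EventType.class);

    void register(EventType type, IntConsumer handler) {
        handlers.put(type, handler);
    }

    /** Takes one event (blocking) and applies the retry/drop policy. */
    Outcome processOne() throws InterruptedException {
        Event event = queue.take();
        try {
            handlers.get(event.type).accept(event.workflowInstanceId);
            return Outcome.HANDLED;
        } catch (FatalError e) {
            return Outcome.DROPPED;        // unrecoverable: log and forget
        } catch (RuntimeException e) {
            queue.add(event);              // retryable or unknown: try again later
            return Outcome.REQUEUED;
        }
    }
}
```

Re-queuing at the tail, rather than retrying immediately, lets other workflows' events make progress while a failing one waits.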

3.2.4 Workflow event handling logic

Because the event is of type START_WORKFLOW, WorkflowStartEventHandler.handleWorkflowEvent() is selected to handle it.


This method fetches the cached WorkflowExecuteRunnable and runs its call method asynchronously.

@Override
public void handleWorkflowEvent(WorkflowEvent workflowEvent) throws WorkflowEventHandleError {
    logger.info("Handle workflow start event, begin to start a workflow, event: {}", workflowEvent);
    // fetch the WorkflowExecuteRunnable cached by the command scanning thread
    WorkflowExecuteRunnable workflowExecuteRunnable =
       processInstanceExecCacheManager.getByProcessInstanceId(workflowEvent.getWorkflowInstanceId());
    if (workflowExecuteRunnable == null) {
        throw new WorkflowEventHandleError(
            "The workflow start event is invalid, cannot find the workflow instance from cache");
    }
    ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance();

    ProcessInstanceMetrics.incProcessInstanceSubmit();
    // invoke call() asynchronously to run the WorkflowExecuteRunnable logic
    CompletableFuture<WorkflowSubmitStatue> workflowSubmitFuture =
        CompletableFuture.supplyAsync(workflowExecuteRunnable::call, workflowExecuteThreadPool);
    workflowSubmitFuture.thenAccept(workflowSubmitStatue -> {
        // check whether the submission succeeded
        if (WorkflowSubmitStatue.SUCCESS == workflowSubmitStatue) {
            // submit failed will resend the event to workflow event queue
            logger.info("Success submit the workflow instance");
            if (processInstance.getTimeout() > 0) { // a timeout is configured: register the instance for timeout checks
                stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
            }
        } else {
            // on failure, retry: put the START_WORKFLOW event back on the queue so that call() runs again
            logger.error("Failed to submit the workflow instance, will resend the workflow start event: {}",
                         workflowEvent);
            workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW,
                                                          processInstance.getId()));
        }
    });
}
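The hand-off from the looper thread to workflowExecuteThreadPool follows a common CompletableFuture pattern: call() runs on a dedicated pool, and a callback reacts to the returned status instead of blocking the looper.

The minimal sketch below makes two simplifying assumptions:

- a submit status is just an enum;
- "resending the START_WORKFLOW event" means putting the instance id back on a queue.

As in the source, every non-SUCCESS status triggers a resend.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

// Hypothetical sketch of WorkflowStartEventHandler's asynchronous submit-and-retry.
final class AsyncWorkflowStart {
    enum SubmitStatus { SUCCESS, DUPLICATED_SUBMITTED, FAILED }

    static CompletableFuture<Void> start(int processInstanceId,
                                         Supplier<SubmitStatus> call,            // ~ workflowExecuteRunnable::call
                                         Executor workflowExecuteThreadPool,
                                         BlockingQueue<Integer> workflowEventQueue,
                                         BlockingQueue<Integer> timeoutChecks,   // ~ stateWheelExecuteThread
                                         boolean hasTimeout) {
        return CompletableFuture.supplyAsync(call, workflowExecuteThreadPool)
                .thenAccept(status -> {
                    if (status == SubmitStatus.SUCCESS) {
                        if (hasTimeout) {
                            timeoutChecks.add(processInstanceId);    // watch the instance for timeout
                        }
                    } else {
                        workflowEventQueue.add(processInstanceId);   // non-SUCCESS: resend START_WORKFLOW
                    }
                });
    }
}
```

Because the callback runs after call() returns, the looper thread is free to take the next event immediately.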


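3.2.5 WorkflowExecuteRunnable execution logic

WorkflowExecuteRunnable.call()


  • Initializes the workflow's DAG (directed acyclic graph).

  • Initializes the task scheduling configuration.

  • Submits the source tasks to the task priority queue.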

@Override
public WorkflowSubmitStatue call() {
    if (isStart()) {
        // This case should not been happened
        logger.warn("[WorkflowInstance-{}] The workflow has already been started", processInstance.getId());
        return WorkflowSubmitStatue.DUPLICATED_SUBMITTED;
    }

    try {
        LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
        if (workflowRunnableStatus == WorkflowRunnableStatus.CREATED) {
            buildFlowDag(); // build the DAG
            workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_DAG;
            logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
        }
        if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_DAG) {
            initTaskQueue(); // initialize the task scheduling configuration
            workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_QUEUE;
            logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
        }
        if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_QUEUE) {
            // submit tasks to the queue: only the source nodes are submitted now;
            // their downstream nodes are submitted after the source nodes finish
            submitPostNode(null);
            workflowRunnableStatus = WorkflowRunnableStatus.STARTED;
            logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
        }
        return WorkflowSubmitStatue.SUCCESS;
    } catch (Exception e) {
        logger.error("Start workflow error", e);
        return WorkflowSubmitStatue.FAILED;
    } finally {
        LoggerUtils.removeWorkflowInstanceIdMDC();
    }
}
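The status checks make call() resumable. Each phase advances workflowRunnableStatus only after it succeeds. So when a FAILED submission is retried through the event queue, phases that already completed are skipped rather than repeated.

The sketch below isolates that idea with hypothetical names. The phase bodies only record their names, and `failuresToInject` is a test hook, not part of DolphinScheduler.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical resumable start-up state machine modelled on WorkflowExecuteRunnable.call().
final class ResumableWorkflowStart {
    enum RunnableStatus { CREATED, INITIALIZE_DAG, INITIALIZE_QUEUE, STARTED }
    enum SubmitStatus { SUCCESS, DUPLICATED_SUBMITTED, FAILED }

    RunnableStatus status = RunnableStatus.CREATED;
    final List<String> executedPhases = new ArrayList<>();
    int failuresToInject;   // test hook: how many times the queue-initialisation phase should fail

    SubmitStatus call() {
        if (status == RunnableStatus.STARTED) {
            return SubmitStatus.DUPLICATED_SUBMITTED;
        }
        try {
            if (status == RunnableStatus.CREATED) {
                executedPhases.add("buildFlowDag");
                status = RunnableStatus.INITIALIZE_DAG;
            }
            if (status == RunnableStatus.INITIALIZE_DAG) {
                if (failuresToInject > 0) {
                    failuresToInject--;
                    throw new IllegalStateException("initTaskQueue failed");
                }
                executedPhases.add("initTaskQueue");
                status = RunnableStatus.INITIALIZE_QUEUE;
            }
            if (status == RunnableStatus.INITIALIZE_QUEUE) {
                executedPhases.add("submitPostNode");
                status = RunnableStatus.STARTED;
            }
            return SubmitStatus.SUCCESS;
        } catch (RuntimeException e) {
            return SubmitStatus.FAILED;   // status stays at the last completed phase
        }
    }
}
```

After one injected failure, the second call resumes at the queue-initialisation phase, so the DAG is built only once.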

3.2.6 Task consumption

TaskPriorityQueueConsumer.run() method. This thread is started by init(), which carries the @PostConstruct annotation.

// started through the @PostConstruct annotation
@PostConstruct
public void init() {
    this.consumerThreadPoolExecutor = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("TaskUpdateQueueConsumerThread", masterConfig.getDispatchTaskNumber());
    logger.info("Task priority queue consume thread staring");
    super.start();
    logger.info("Task priority queue consume thread started");
}

@Override
public void run() {
    int fetchTaskNum = masterConfig.getDispatchTaskNumber();
    while (Stopper.isRunning()) {
        try {
            List<TaskPriority> failedDispatchTasks = this.batchDispatch(fetchTaskNum);

            if (CollectionUtils.isNotEmpty(failedDispatchTasks)) {
                TaskMetrics.incTaskDispatchFailed(failedDispatchTasks.size());
                for (TaskPriority dispatchFailedTask : failedDispatchTasks) {
                    taskPriorityQueue.put(dispatchFailedTask);
                }
                // If the all task dispatch failed, will sleep for 1s to avoid the master cpu higher.
                if (fetchTaskNum == failedDispatchTasks.size()) {
                    TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
                }
            }
        } catch (Exception e) {
            TaskMetrics.incTaskDispatchError();
            logger.error("dispatcher task error", e);
        }
    }
}

Batch dispatching of tasks

TaskPriorityQueueConsumer.batchDispatch()

/**
 * batch dispatch with thread pool
 */
public List<TaskPriority> batchDispatch(int fetchTaskNum) throws TaskPriorityQueueException, InterruptedException {
    List<TaskPriority> failedDispatchTasks = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch latch = new CountDownLatch(fetchTaskNum);

    for (int i = 0; i < fetchTaskNum; i++) {
        // poll a task from the priority queue
        TaskPriority taskPriority = taskPriorityQueue.poll(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS);
        if (Objects.isNull(taskPriority)) {
            latch.countDown();
            continue;
        }

        // dispatch the task asynchronously on the consumer thread pool
        consumerThreadPoolExecutor.submit(() -> {
            try {
                boolean dispatchResult = this.dispatchTask(taskPriority);
                if (!dispatchResult) {
                    failedDispatchTasks.add(taskPriority);
                }
            } finally {
                // make sure the latch countDown
                latch.countDown();
            }
        });
    }

    latch.await();
    return failedDispatchTasks;
}
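The batch pattern does not depend on TaskPriority and can be written generically:

1. Poll up to N items, each with a timeout.
2. Dispatch each item on a pool.
3. Wait on a CountDownLatch sized to N, counting down immediately for empty polls.
4. Return the failures so that run() can re-queue them.

The sketch below is generic and uses hypothetical names. Unlike the code above, it also counts an exception thrown by the dispatch function as a failure. In the original this is unnecessary because dispatchTask() catches its own exceptions and returns false.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

// Generic, hypothetical version of TaskPriorityQueueConsumer.batchDispatch().
final class BatchDispatcher<T> {
    private final BlockingQueue<T> queue;
    private final ExecutorService pool;
    private final Predicate<T> dispatch;   // true = dispatched successfully

    BatchDispatcher(BlockingQueue<T> queue, ExecutorService pool, Predicate<T> dispatch) {
        this.queue = queue;
        this.pool = pool;
        this.dispatch = dispatch;
    }

    List<T> batchDispatch(int fetchTaskNum, long pollMillis) throws InterruptedException {
        List<T> failed = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch latch = new CountDownLatch(fetchTaskNum);
        for (int i = 0; i < fetchTaskNum; i++) {
            T task = queue.poll(pollMillis, TimeUnit.MILLISECONDS);
            if (task == null) {
                latch.countDown();           // nothing fetched: this slot is done
                continue;
            }
            pool.submit(() -> {
                try {
                    if (!dispatch.test(task)) {
                        failed.add(task);
                    }
                } catch (RuntimeException e) {
                    failed.add(task);        // treat unexpected errors as failed dispatches
                } finally {
                    latch.countDown();       // always release the slot
                }
            });
        }
        latch.await();                       // wait for the whole batch
        return failed;
    }
}
```

Sizing the latch to the requested batch, not to the number of tasks actually fetched, keeps the bookkeeping simple. Every loop iteration, successful or not, is guaranteed exactly one countDown().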


3.2.7 Task dispatch

TaskPriorityQueueConsumer.dispatchTask()


/**
 * Dispatch task to worker.
 *
 * @param taskPriority taskPriority
 * @return dispatch result, return true if dispatch success, return false if dispatch failed.
 */
protected boolean dispatchTask(TaskPriority taskPriority) {
    TaskMetrics.incTaskDispatch();
    boolean result = false;
    try {
        // get the WorkflowExecuteRunnable object
        WorkflowExecuteRunnable workflowExecuteRunnable =
                processInstanceExecCacheManager.getByProcessInstanceId(taskPriority.getProcessInstanceId());
        if (workflowExecuteRunnable == null) {
            logger.error("Cannot find the related processInstance of the task, taskPriority: {}", taskPriority);
            return true;
        }
        // get the task instance
        Optional<TaskInstance> taskInstanceOptional =
                workflowExecuteRunnable.getTaskInstance(taskPriority.getTaskId());
        if (!taskInstanceOptional.isPresent()) {
            logger.error("Cannot find the task instance from related processInstance, taskPriority: {}",
                    taskPriority);
            // we return true, so that we will drop this task.
            return true;
        }
        TaskInstance taskInstance = taskInstanceOptional.get();
        TaskExecutionContext context = taskPriority.getTaskExecutionContext();
        // build the execution context
        ExecutionContext executionContext =
                new ExecutionContext(toCommand(context), ExecutorType.WORKER, context.getWorkerGroup(),
                        taskInstance);

        if (isTaskNeedToCheck(taskPriority)) {
            if (taskInstanceIsFinalState(taskPriority.getTaskId())) {
                // when task finish, ignore this task, there is no need to dispatch anymore
                return true;
            }
        }

        // dispatch the task
        result = dispatcher.dispatch(executionContext);

        if (result) {
            logger.info("Master success dispatch task to worker, taskInstanceId: {}, worker: {}",
                    taskPriority.getTaskId(),
                    executionContext.getHost());
            addDispatchEvent(context, executionContext);
        } else {
            logger.info("Master failed to dispatch task to worker, taskInstanceId: {}, worker: {}",
                    taskPriority.getTaskId(),
                    executionContext.getHost());
        }
    } catch (RuntimeException | ExecuteException e) {
        logger.error("Master dispatch task to worker error, taskPriority: {}", taskPriority, e);
    }
    return result;
}


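Next, a worker node is selected and a command is sent to it over Netty, telling it to run the task. The dispatcher first picks a host. It then delegates to the executor manager, whose execute() method retries other workers if sending fails.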


/**
 * task dispatch
 *
 * @param context context
 * @return result
 * @throws ExecuteException if error throws ExecuteException
 */
public Boolean dispatch(final ExecutionContext context) throws ExecuteException {
    // get executor manager
    ExecutorManager<Boolean> executorManager = this.executorManagers.get(context.getExecutorType());
    if (executorManager == null) {
        throw new ExecuteException("no ExecutorManager for type : " + context.getExecutorType());
    }

    // host select: the worker selector chooses a worker node
    Host host = hostManager.select(context);
    if (StringUtils.isEmpty(host.getAddress())) {
        logger.warn("fail to execute : {} due to no suitable worker, current task needs worker group {} to execute",
                context.getCommand(), context.getWorkerGroup());
        return false;
    }
    context.setHost(host); // put the selected host into the context
    executorManager.beforeExecute(context);
    try {
        // task execute: send a message to the worker over Netty telling it to run this task
        return executorManager.execute(context);
    } finally {
        executorManager.afterExecute(context);
    }
}


/**
 * execute logic
 *
 * @param context context
 * @return result
 * @throws ExecuteException if error throws ExecuteException
 */
@Override
public Boolean execute(ExecutionContext context) throws ExecuteException {
    // all nodes
    Set<String> allNodes = getAllNodes(context);
    // fail nodes
    Set<String> failNodeSet = new HashSet<>();
    // build command accord executeContext
    Command command = context.getCommand();
    // execute task host
    Host host = context.getHost();
    boolean success = false;
    while (!success) {
        try {
            doExecute(host, command);
            success = true;
            context.setHost(host);
            // We set the host to taskInstance to avoid when the worker down, this taskInstance may not be failovered, due to the taskInstance's host
            // is not belongs to the down worker ISSUE-10842.
            context.getTaskInstance().setHost(host.getAddress());
        } catch (ExecuteException ex) {
            logger.error(String.format("execute command : %s error", command), ex);
            try {
                failNodeSet.add(host.getAddress());
                Set<String> tmpAllIps = new HashSet<>(allNodes);
                Collection<String> remained = CollectionUtils.subtract(tmpAllIps, failNodeSet);
                if (remained != null && remained.size() > 0) {
                    host = Host.of(remained.iterator().next());
                    logger.error("retry execute command : {} host : {}", command, host);
                } else {
                    throw new ExecuteException("fail after try all nodes");
                }
            } catch (Throwable t) {
                throw new ExecuteException("fail after try all nodes");
            }
        }
    }

    return success;
}
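Without the logging and the ExecutionContext, execute() is a simple failover loop. It tries the selected worker first. On failure it picks any worker that has not failed yet, and it stops when one accepts the command or every worker has been tried.

The hypothetical sketch below models "sending" as a predicate over the host address. Like the original, it takes the first element of the remaining set, so the retry order is simply that set's iteration order.

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical sketch of the retry-on-another-worker loop in execute().
final class FailoverSender {

    /** @return the host that accepted the command; throws once every host has failed. */
    static String sendWithFailover(String selectedHost, Set<String> allHosts, Predicate<String> send) {
        Set<String> failedHosts = new HashSet<>();
        String host = selectedHost;
        while (true) {
            if (send.test(host)) {
                return host;                        // success: this host runs the task
            }
            failedHosts.add(host);
            Set<String> remained = new LinkedHashSet<>(allHosts);
            remained.removeAll(failedHosts);
            if (remained.isEmpty()) {
                throw new IllegalStateException("fail after try all nodes");
            }
            host = remained.iterator().next();      // retry on a host that has not failed yet
        }
    }
}
```

The accepting host is returned so the caller can record it on the task instance, as the ISSUE-10842 comment explains: failover of a dead worker relies on knowing which host the task actually landed on.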


3.3 Worker: executing tasks


3.3.1 Worker startup

WorkerServer.run()

public void run() {
    this.workerRpcServer.start(); // start the RPC communication component
    this.workerRpcClient.start();
    this.taskPluginManager.loadPlugin();

    this.workerRegistryClient.registry();
    this.workerRegistryClient.setRegistryStoppable(this);
    this.workerRegistryClient.handleDeadServer();

    // start the workerManager thread, which consumes submitted tasks and runs them
    this.workerManagerThread.start();

    this.messageRetryRunner.start();

    /*
     * registry hooks, which are called before the process exits
     */
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        if (Stopper.isRunning()) {
            close("WorkerServer shutdown hook");
        }
    }));
}


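3.3.2 Worker receives the task-dispatch command

When the worker receives the message sent by the master, it ultimately calls TaskDispatchProcessor.process() to handle the command. This is the handler that was registered when the Netty RPC component was started in 3.3.1.


TaskDispatchProcessor.process()


The method extracts the task execution context from the command, wraps it in a TaskExecuteThread, and submits it to workerManager.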

@Counted(value = "ds.task.execution.count", description = "task execute total count")
@Timed(value = "ds.task.execution.duration", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
@Override
public void process(Channel channel, Command command) {
    Preconditions.checkArgument(CommandType.TASK_DISPATCH_REQUEST == command.getType(),
                                String.format("invalid command type : %s", command.getType()));

    TaskDispatchCommand taskDispatchCommand = JSONUtils.parseObject(command.getBody(), TaskDispatchCommand.class);

    if (taskDispatchCommand == null) {
        logger.error("task execute request command content is null");
        return;
    }
    final String masterAddress = taskDispatchCommand.getMessageSenderAddress();
    logger.info("task execute request message: {}", taskDispatchCommand);

    TaskExecutionContext taskExecutionContext = taskDispatchCommand.getTaskExecutionContext(); // get the execution context

    if (taskExecutionContext == null) {
        logger.error("task execution context is null");
        return;
    }
    try {
        LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
                                                    taskExecutionContext.getTaskInstanceId());

        TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());

        // set cache, it will be used when kill task
        TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);

        // todo custom logger

        taskExecutionContext.setHost(workerConfig.getWorkerAddress());
        taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));

        if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) { // not a dry run
            boolean osUserExistFlag;
            //if Using distributed is true and Currently supported systems are linux,Should not let it automatically
            //create tenants,so TenantAutoCreate has no effect
            if (workerConfig.isTenantDistributedUser() && SystemUtils.IS_OS_LINUX) {
                //use the id command to judge in linux
                osUserExistFlag = OSUtils.existTenantCodeInLinux(taskExecutionContext.getTenantCode());
            } else if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) {
                // if not exists this user, then create
                OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
                osUserExistFlag = OSUtils.getUserList().contains(taskExecutionContext.getTenantCode());
            } else {
                osUserExistFlag = OSUtils.getUserList().contains(taskExecutionContext.getTenantCode());
            }

            // check if the OS user exists
            if (!osUserExistFlag) {
                logger.error("tenantCode: {} does not exist, taskInstanceId: {}",
                             taskExecutionContext.getTenantCode(),
                             taskExecutionContext.getTaskInstanceId());
                TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
                taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
                taskExecutionContext.setEndTime(new Date());
                // report the failure to the master
                workerMessageSender.sendMessageWithRetry(taskExecutionContext,
                                                         masterAddress,
                                                         CommandType.TASK_EXECUTE_RESULT);
                return;
            }

            // local execute path
            String execLocalPath = getExecLocalPath(taskExecutionContext);
            logger.info("task instance local execute path : {}", execLocalPath);
            taskExecutionContext.setExecutePath(execLocalPath);

            try {
                FileUtils.createWorkDirIfAbsent(execLocalPath);
            } catch (Throwable ex) { // failed to create the working directory
                logger.error("create execLocalPath fail, path: {}, taskInstanceId: {}",
                             execLocalPath,
                             taskExecutionContext.getTaskInstanceId(),
                             ex);
                TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
                taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
                workerMessageSender.sendMessageWithRetry(taskExecutionContext,
                                                         masterAddress,
                                                         CommandType.TASK_EXECUTE_RESULT);
                return;
            }
        }

        // delay task process
        long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
                                                  taskExecutionContext.getDelayTime() * 60L);
        if (remainTime > 0) {
            logger.info("delay the execution of task instance {}, delay time: {} s",
                        taskExecutionContext.getTaskInstanceId(),
                        remainTime);
            taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION);
            taskExecutionContext.setStartTime(null);
            workerMessageSender.sendMessage(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT);
        }

        // submit task to manager: hand the task over to workerManager
        boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext,
                                                                  masterAddress,
                                                                  workerMessageSender,
                                                                  alertClientService,
                                                                  taskPluginManager,
                                                                  storageOperate));
        if (!offer) {
            logger.warn("submit task to wait queue error, queue is full, queue size is {}, taskInstanceId: {}",
                        workerManager.getWaitSubmitQueueSize(),
                        taskExecutionContext.getTaskInstanceId());
            workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_REJECT);
        }
    } finally {
        LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
    }
}
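Two decisions at the end of process() are easy to miss:

- while a configured delay has not yet elapsed, the task is reported as DELAY_EXECUTION;
- when workerManager cannot accept the task, the worker answers with TASK_REJECT.

The sketch below models both, under two stated assumptions:

- The remaining time is taken to be firstSubmitTime + delayTime × 60 - now, in seconds. This matches the `getDelayTime() * 60L` argument above, but DateUtils.getRemainTime's implementation is not shown in this article.
- The wait queue is modelled as a bounded LinkedBlockingQueue. The real workerManager.offer() may enforce its capacity differently.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the delay check and the reject path in TaskDispatchProcessor.process().
final class DispatchGate {
    private final BlockingQueue<Runnable> waitQueue;

    DispatchGate(int capacity) {
        this.waitQueue = new LinkedBlockingQueue<>(capacity);   // bounded: offer() fails when full
    }

    /** ASSUMPTION: remaining delay in seconds = firstSubmit + delayMinutes * 60 - now. */
    static long remainSeconds(long firstSubmitEpochSec, int delayMinutes, long nowEpochSec) {
        return firstSubmitEpochSec + delayMinutes * 60L - nowEpochSec;
    }

    /** @return "ACCEPTED", or "TASK_REJECT" (the command sent back to the master) when the queue is full. */
    String offer(Runnable task) {
        return waitQueue.offer(task) ? "ACCEPTED" : "TASK_REJECT";
    }
}
```

A task submitted 30 seconds ago with a 2-minute delay still has 90 seconds to wait. A non-positive result means no DELAY_EXECUTION message is sent.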


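3.3.3 workerManager consumes tasks

start() launches the manager as a daemon thread. submit() records each TaskExecuteThread in a map of running tasks and runs it on a thread pool. It also registers a callback that removes the task from the map when it finishes, whether it succeeded or failed.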


public void start() {
    logger.info("Worker manager thread starting");
    Thread thread = new Thread(this, this.getClass().getName());
    thread.setDaemon(true);
    thread.start();
    logger.info("Worker manager thread started");
}

public void submit(TaskExecuteThread taskExecuteThread) {
    // record the task in the map of running tasks
    taskExecuteThreadMap.put(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(), taskExecuteThread);
    // submit the task to the thread pool to run it
    ListenableFuture future = this.listeningExecutorService.submit(taskExecuteThread);
    FutureCallback futureCallback = new FutureCallback() {
        @Override
        public void onSuccess(Object o) { // callback invoked when the task completes normally
            taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId());
        }

        @Override
        public void onFailure(Throwable throwable) {
            logger.error("task execute failed, processInstanceId:{}, taskInstanceId:{}",
                         taskExecuteThread.getTaskExecutionContext().getProcessInstanceId(),
                         taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(),
                         throwable);
            // remove the task from the running map
            taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId());
        }
    };
    Futures.addCallback(future, futureCallback, this.listeningExecutorService);
}
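The bookkeeping in submit() does not depend on Guava. The article's code uses Guava's ListenableFuture and Futures.addCallback. The hypothetical sketch below swaps in the JDK's CompletableFuture.whenComplete, which fires on both success and failure, to show the same running-task map maintenance.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

// Hypothetical analogue of WorkerManagerThread.submit() using CompletableFuture instead of Guava.
final class RunningTasks {
    final Map<Integer, Runnable> running = new ConcurrentHashMap<>();   // ~ taskExecuteThreadMap
    private final Executor pool;

    RunningTasks(Executor pool) {
        this.pool = pool;
    }

    CompletableFuture<Void> submit(int taskInstanceId, Runnable task) {
        running.put(taskInstanceId, task);                    // visible as "running" before it starts
        return CompletableFuture.runAsync(task, pool)
                .whenComplete((ignored, error) -> {
                    if (error != null) {
                        System.err.println("task " + taskInstanceId + " failed: " + error);
                    }
                    running.remove(taskInstanceId);            // cleanup on success and on failure
                });
    }
}
```

Putting the entry into the map before submitting matters: a kill request arriving while the task is still queued in the pool can then still find it.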


3.3.4 Task execution

TaskExecuteThread.run()

@Override
public void run() {
    try {
        LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
                taskExecutionContext.getTaskInstanceId());
        if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) { // is this a dry run?
            taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.SUCCESS);
            taskExecutionContext.setStartTime(new Date());
            taskExecutionContext.setEndTime(new Date());
            TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
            workerMessageSender.sendMessageWithRetry(taskExecutionContext,
                    masterAddress,
                    CommandType.TASK_EXECUTE_RESULT);
            logger.info("Task dry run success");
            return;
        }
    } finally {
        LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
    }
    try {
        LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
                taskExecutionContext.getTaskInstanceId());
        logger.info("script path : {}", taskExecutionContext.getExecutePath());
        if (taskExecutionContext.getStartTime() == null) {
            taskExecutionContext.setStartTime(new Date());
        }
        logger.info("the task begins to execute. task instance id: {}", taskExecutionContext.getTaskInstanceId());

        // callback task execute running
        taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
        workerMessageSender.sendMessageWithRetry(taskExecutionContext,
                masterAddress,
                CommandType.TASK_EXECUTE_RUNNING);

        // copy hdfs/minio file to local
        List<Pair<String, String>> fileDownloads = downloadCheck(taskExecutionContext.getExecutePath(),
                taskExecutionContext.getResources());
        if (!fileDownloads.isEmpty()) {
            downloadResource(taskExecutionContext.getExecutePath(), logger, fileDownloads);
        }

        taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());

        taskExecutionContext.setTaskAppId(String.format("%s_%s",
                taskExecutionContext.getProcessInstanceId(),
                taskExecutionContext.getTaskInstanceId()));

        // look up the TaskChannel (task factory) for this task type
        TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType());
        if (null == taskChannel) {
            throw new ServiceException(String.format("%s Task Plugin Not Found,Please Check Config File.",
                    taskExecutionContext.getTaskType()));
        }
        String taskLogName = LoggerUtils.buildTaskId(taskExecutionContext.getFirstSubmitTime(),
                taskExecutionContext.getProcessDefineCode(),
                taskExecutionContext.getProcessDefineVersion(),
                taskExecutionContext.getProcessInstanceId(),
                taskExecutionContext.getTaskInstanceId());
        taskExecutionContext.setTaskLogName(taskLogName);

        // set the name of the current thread
        Thread.currentThread().setName(taskLogName);

        task = taskChannel.createTask(taskExecutionContext); // create the task

        // task init
        this.task.init(); // initialize the task

        //init varPool
        this.task.getParameters().setVarPool(taskExecutionContext.getVarPool());

        // task handle
        this.task.handle(); // the real task logic: each task type implements its own handle()

        // task result process
        if (this.task.getNeedAlert()) {
            sendAlert(this.task.getTaskAlertInfo(), this.task.getExitStatus().getCode());
        }

        // record the task's final status and result information
        taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.of(this.task.getExitStatus().getCode()));
        taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
        taskExecutionContext.setProcessId(this.task.getProcessId());
        taskExecutionContext.setAppIds(this.task.getAppIds());
        taskExecutionContext.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool()));
        logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(),
                this.task.getExitStatus());
    } catch (Throwable e) {
        logger.error("task scheduler failure", e);
        kill();
        taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE); // mark the task as failed
        taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
        taskExecutionContext.setProcessId(this.task.getProcessId());
        taskExecutionContext.setAppIds(this.task.getAppIds());
    } finally {
        TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
        // report the task result to the master
        workerMessageSender.sendMessageWithRetry(taskExecutionContext,
                masterAddress,
                CommandType.TASK_EXECUTE_RESULT);
        clearTaskExecPath();
        LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
    }
}
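The core of run() is a plugin lookup:

- the task type selects a TaskChannel;
- the channel creates the task;
- the task's exit status becomes the status reported in TASK_EXECUTE_RESULT.

Any exception, including a missing plugin, ends in FAILURE. The sketch below reduces this to hypothetical interfaces (`MiniChannel`, `MiniTask`) that are not the real TaskChannel/AbstractTask API. For simplicity it also assumes that exit code 0 means success.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, stripped-down version of the plugin lookup in TaskExecuteThread.run().
final class MiniTaskRunner {
    interface MiniTask {
        default void init() { }
        int handle();                        // returns an exit code
    }

    interface MiniChannel {
        MiniTask createTask(String taskParams);
    }

    private final Map<String, MiniChannel> channels = new HashMap<>();   // ~ taskPluginManager.getTaskChannelMap()

    void register(String taskType, MiniChannel channel) {
        channels.put(taskType, channel);
    }

    /** @return "SUCCESS" or "FAILURE", the status that would be sent back to the master. */
    String run(String taskType, String taskParams) {
        try {
            MiniChannel channel = channels.get(taskType);
            if (channel == null) {
                throw new IllegalStateException(taskType + " Task Plugin Not Found");
            }
            MiniTask task = channel.createTask(taskParams);
            task.init();
            return task.handle() == 0 ? "SUCCESS" : "FAILURE";   // ASSUMPTION: 0 = success
        } catch (RuntimeException e) {
            return "FAILURE";                                    // the real code also calls kill() here
        }
    }
}
```

Keying channels by task type is what lets a worker run Shell, SQL, Spark and other task types through one code path. Adding a type means registering one more channel.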


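3.4 Master: receiving task feedback


3.4.1 Master receives feedback messages


After running a task, the worker reports the result to the master over Netty. The master also starts a Netty server at startup and registers many handlers on it. Each message type corresponds to one piece of business logic and is handled by its own handler.

The worker reports task results with the TASK_EXECUTE_RESULT message type.


These handlers are registered in MasterRPCServer.init().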
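The "one message type, one handler" design amounts to a lookup table from command type to processor. The sketch below is a hypothetical illustration of that routing, not the code of MasterRPCServer.init(). Only the enum constant names come from this article, and plain strings stand in for Netty channels and command bodies.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical routing table in the spirit of the processors registered by the master's RPC server.
final class ProcessorRegistry {
    enum CommandType { TASK_EXECUTE_RUNNING, TASK_EXECUTE_RESULT, TASK_REJECT }

    private final Map<CommandType, Consumer<String>> processors = new EnumMap<>(CommandType.class);

    void register(CommandType type, Consumer<String> processor) {
        processors.put(type, processor);
    }

    /** @return false when no processor is registered for the incoming type. */
    boolean route(CommandType type, String body) {
        Consumer<String> processor = processors.get(type);
        if (processor == null) {
            return false;
        }
        processor.accept(body);
        return true;
    }
}
```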



The processor wraps the message into a taskResultEvent and submits it to taskEventService.


TaskExecuteResponseProcessor.process()

@Override
public void process(Channel channel, Command command) {
    Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESULT == command.getType(),
                                String.format("invalid command type : %s", command.getType()));
    TaskExecuteResultCommand taskExecuteResultMessage = JSONUtils.parseObject(command.getBody(), TaskExecuteResultCommand.class);
    TaskEvent taskResultEvent = TaskEvent.newResultEvent(taskExecuteResultMessage, channel, taskExecuteResultMessage.getMessageSenderAddress());
    try {
        LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskResultEvent.getProcessInstanceId(), taskResultEvent.getTaskInstanceId());
        logger.info("Received task execute result, event: {}", taskResultEvent);
        // submit the event to taskEventService
        taskEventService.addEvent(taskResultEvent);
    } finally {
        LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
    }
}


3.4.2 taskEventService handles TaskEvents


When TaskEventService starts, it launches two components: the task event dispatcher and the task event handler.


TaskEventService is made up of these two components.
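Splitting the service into a dispatcher and a handler is a producer/consumer arrangement. The RPC processor only enqueues events, and a separate thread drains the queue. The minimal sketch below assumes a BlockingQueue of plain String events and a hypothetical MiniEventService class. The poison-pill shutdown exists only so the sketch can stop cleanly; the real service threads keep looping until the master stops.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of the dispatcher half of a TaskEventService-like component.
// Events are plain Strings and MiniEventService is a hypothetical name.
class MiniEventService {
    // Sentinel that lets this sketch stop its thread; not part of DolphinScheduler.
    private static final String POISON = "__stop__";

    private final BlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
    // Events handed on by the dispatcher (the real code calls submitTaskEvent instead).
    final List<String> dispatched = Collections.synchronizedList(new ArrayList<>());
    private final Thread dispatcher = new Thread(this::dispatchLoop, "event-dispatcher");

    void start() {
        dispatcher.start();
    }

    // Called from the network thread: a cheap enqueue, no processing.
    void addEvent(String event) {
        eventQueue.add(event);
    }

    private void dispatchLoop() {
        try {
            while (true) {
                String event = eventQueue.take(); // blocks until an event arrives
                if (POISON.equals(event)) {
                    return;
                }
                dispatched.add(event);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void stopAndWait() throws InterruptedException {
        eventQueue.add(POISON);
        dispatcher.join();
    }

    public static void main(String[] args) throws InterruptedException {
        MiniEventService service = new MiniEventService();
        service.start();
        service.addEvent("result-of-task-1");
        service.addEvent("result-of-task-2");
        service.stopAndWait();
        System.out.println(service.dispatched);
    }
}
```

With this split, a slow database write never blocks the Netty I/O thread that received the message.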


TaskEventService.start() method



Picking up from 3.4.1, where the processor called taskEventService.addEvent():



This leads into the submitTaskEvent method:

public void submitTaskEvent(TaskEvent taskEvent) {
    if (!processInstanceExecCacheManager.contains(taskEvent.getProcessInstanceId())) {
        logger.warn("Cannot find workflowExecuteThread from cacheManager, event: {}", taskEvent);
        return;
    }
    // Create the TaskExecuteRunnable for this process instance if absent and add the event to it;
    // from then on this object handles every TaskEvent submitted for that process instance.
    TaskExecuteRunnable taskExecuteRunnable = taskExecuteThreadMap.computeIfAbsent(taskEvent.getProcessInstanceId(),
        (processInstanceId) -> new TaskExecuteRunnable(processInstanceId, taskEventHandlerMap));
    taskExecuteRunnable.addEvent(taskEvent);
}
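submitTaskEvent groups events by workflow instance. computeIfAbsent creates one TaskExecuteRunnable per process instance id, and events for instances that are no longer cached are dropped. The sketch below shows that grouping in simplified form. It uses a hypothetical set of running instance ids in place of processInstanceExecCacheManager and a queue of Strings in place of TaskExecuteRunnable.

```java
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch of per-workflow-instance event grouping; all names are hypothetical.
class MiniTaskExecutePool {
    // Stand-in for processInstanceExecCacheManager: ids of workflow instances still running.
    final Set<Integer> runningInstances = ConcurrentHashMap.newKeySet();
    // One event queue per workflow instance, like taskExecuteThreadMap.
    final Map<Integer, Queue<String>> perInstanceQueues = new ConcurrentHashMap<>();

    void submitTaskEvent(int processInstanceId, String event) {
        if (!runningInstances.contains(processInstanceId)) {
            // The workflow is no longer tracked by this master: drop the event.
            return;
        }
        perInstanceQueues
            .computeIfAbsent(processInstanceId, id -> new ConcurrentLinkedQueue<>())
            .add(event);
    }

    public static void main(String[] args) {
        MiniTaskExecutePool pool = new MiniTaskExecutePool();
        pool.runningInstances.add(1);
        pool.submitTaskEvent(1, "task-10 RUNNING");
        pool.submitTaskEvent(1, "task-10 RESULT");
        pool.submitTaskEvent(2, "task-20 RESULT"); // instance 2 is unknown, so this is dropped
        System.out.println(pool.perInstanceQueues);
    }
}
```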


The TaskEventHandlerThread processes task events. Each TaskEvent is ultimately handled by calling taskExecuteThread.run().

class TaskEventHandlerThread extends BaseDaemonThread {
    protected TaskEventHandlerThread() {
        super("TaskEventHandlerThread");
    }
    @Override
    public void run() {
        logger.info("event handler thread started");
        while (Stopper.isRunning()) {
            try {
                // handle events
                taskExecuteThreadPool.eventHandler();
                TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                logger.warn("TaskEvent handle thread interrupted, will return this loop");
                break;
            } catch (Exception e) {
                logger.error("event handler thread error", e);
            }
        }
    }
}

// the eventHandler method
public void eventHandler() {
    for (TaskExecuteRunnable taskExecuteThread : taskExecuteThreadMap.values()) {
        executeEvent(taskExecuteThread);
    }
}

// execute the events of one workflow instance
public void executeEvent(TaskExecuteRunnable taskExecuteThread) {
    if (taskExecuteThread.isEmpty()) {
        return;
    }
    if (multiThreadFilterMap.containsKey(taskExecuteThread.getKey())) {
        return;
    }
    // mark this workflow instance's event runnable as being processed
    multiThreadFilterMap.put(taskExecuteThread.getKey(), taskExecuteThread);
    ListenableFuture future = this.submitListenable(taskExecuteThread::run);
    future.addCallback(new ListenableFutureCallback() { // register callbacks
        @Override
        public void onFailure(Throwable ex) {
            Integer processInstanceId = taskExecuteThread.getProcessInstanceId();
            logger.error("[WorkflowInstance-{}] persist event failed", processInstanceId, ex);
            if (!processInstanceExecCacheManager.contains(processInstanceId)) {
                taskExecuteThreadMap.remove(processInstanceId);
                logger.info("[WorkflowInstance-{}] Cannot find processInstance from cacheManager, remove process instance from threadMap", processInstanceId);
            }
            multiThreadFilterMap.remove(taskExecuteThread.getKey());
        }
        @Override
        public void onSuccess(Object result) {
            Integer processInstanceId = taskExecuteThread.getProcessInstanceId();
            logger.info("[WorkflowInstance-{}] persist events succeeded", processInstanceId);
            if (!processInstanceExecCacheManager.contains(processInstanceId)) {
                taskExecuteThreadMap.remove(processInstanceId);
                logger.info("[WorkflowInstance-{}] Cannot find processInstance from cacheManager, remove process instance from threadMap", processInstanceId);
            }
            multiThreadFilterMap.remove(taskExecuteThread.getKey());
        }
    });
}
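multiThreadFilterMap ensures that each workflow instance's events are handled by at most one pool thread at a time. Both callbacks release the entry when the run completes. The sketch below reproduces that guarantee using CompletableFuture rather than the ListenableFuture callbacks used above. SerialPerKeyExecutor is a hypothetical name, and the sketch uses putIfAbsent so that the check-and-mark step is a single atomic call.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch: allow at most one in-flight job per key (here, a workflow instance id),
// like multiThreadFilterMap does. SerialPerKeyExecutor is a hypothetical name.
class SerialPerKeyExecutor {
    private final Map<Integer, Boolean> inFlight = new ConcurrentHashMap<>();
    private final ExecutorService pool = Executors.newFixedThreadPool(4);

    // Returns null if the key is already being processed (the call is skipped,
    // as executeEvent returns early); otherwise runs the work on the pool.
    CompletableFuture<Void> submit(int key, Runnable work) {
        if (inFlight.putIfAbsent(key, Boolean.TRUE) != null) {
            return null;
        }
        return CompletableFuture.runAsync(work, pool)
                // Release the key on success and on failure, like onSuccess/onFailure.
                .whenComplete((ignored, error) -> inFlight.remove(key));
    }

    void shutdown() {
        pool.shutdown();
    }

    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        SerialPerKeyExecutor executor = new SerialPerKeyExecutor();
        CountDownLatch release = new CountDownLatch(1);
        CompletableFuture<Void> first = executor.submit(1, () -> awaitQuietly(release));
        System.out.println("second submit skipped: " + (executor.submit(1, () -> { }) == null));
        release.countDown();
        first.join();
        System.out.println("accepted again after completion: " + (executor.submit(1, () -> { }) != null));
        executor.shutdown();
    }
}
```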


The asynchronous task that is finally submitted runs TaskExecuteRunnable.run():
TaskExecuteRunnable.run()
public void run() {
    while (!this.events.isEmpty()) {
        // we handle the task event belongs to one task serial, so if the event comes in wrong order,
        TaskEvent event = this.events.peek();
        try {
            LoggerUtils.setWorkflowAndTaskInstanceIDMDC(event.getProcessInstanceId(), event.getTaskInstanceId());
            logger.info("Handle task event begin: {}", event);
            // look up the handler for the event type and handle the event
            taskEventHandlerMap.get(event.getEvent()).handleTaskEvent(event);
            events.remove(event);
            logger.info("Handle task event finished: {}", event);
        } catch (TaskEventHandleException taskEventHandleException) {
            // we don't need to resubmit this event, since the worker will resubmit this event
            logger.error("Handle task event failed, this event will be retry later, event: {}", event, taskEventHandleException);
        } catch (TaskEventHandleError taskEventHandleError) {
            logger.error("Handle task event error, this event will be removed, event: {}", event, taskEventHandleError);
            events.remove(event);
        } catch (Exception unknownException) {
            logger.error("Handle task event error, get a unknown exception, this event will be removed, event: {}", event, unknownException);
            events.remove(event);
        } finally {
            LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
        }
    }
}
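TaskExecuteRunnable.run() peeks at the head event and removes it only in two cases: after it has been handled, or when the failure is judged unrecoverable. A TaskEventHandleException leaves the event in the queue. The sketch below captures those rules using hypothetical names (OrderedEventRunner, RetryableException). It differs from the loop above in one deliberate way. After a retryable failure, the sketch returns and leaves the event at the head for the next round, whereas the loop above keeps iterating on the same event.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Sketch of TaskExecuteRunnable.run()'s rules: peek, handle, then remove;
// a retryable failure keeps the event queued, any other failure drops it.
// OrderedEventRunner and RetryableException are hypothetical names.
class OrderedEventRunner {
    static class RetryableException extends RuntimeException {
        RetryableException(String message) {
            super(message);
        }
    }

    final Deque<String> events = new ArrayDeque<>();
    final List<String> handled = new ArrayList<>();

    void run(Consumer<String> handler) {
        while (!events.isEmpty()) {
            String event = events.peek(); // look at the head without removing it
            try {
                handler.accept(event);
                handled.add(event);
                events.poll(); // handled: remove the head
            } catch (RetryableException e) {
                // Keep the event at the head and stop this round; it is retried next time.
                return;
            } catch (RuntimeException e) {
                events.poll(); // unrecoverable: drop the event and continue
            }
        }
    }

    public static void main(String[] args) {
        OrderedEventRunner runner = new OrderedEventRunner();
        runner.events.add("e1");
        runner.events.add("e2-db-down");
        runner.events.add("e3");
        runner.run(e -> {
            if (e.contains("db-down")) {
                throw new RetryableException("database unavailable");
            }
        });
        System.out.println("handled=" + runner.handled + " pending=" + runner.events);
        runner.run(e -> { }); // the database is back: the remaining events go through in order
        System.out.println("handled=" + runner.handled + " pending=" + runner.events);
    }
}
```

Because it peeks rather than polls, a retryable event is never overtaken by later events for the same workflow instance.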


3.4.3 TaskResultEventHandler handles TaskEvents


TaskResultEventHandler.handleTaskEvent()




3.5 Master closes the loop by submitting downstream tasks


3.5.1 EventExecuteService handles stateEvents


EventExecuteService.run()

This service thread is started in 3.2.1.

@Override
public void run() {
    while (Stopper.isRunning()) {
        try {
            // handle events
            eventHandler();
            TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS_SHORT);
        } catch (InterruptedException interruptedException) {
            logger.warn("Master event service interrupted, will exit this loop", interruptedException);
            Thread.currentThread().interrupt();
            break;
        } catch (Exception e) {
            logger.error("Master event execute service error", e);
        }
    }
}
Entering the eventHandler method:
private void eventHandler() {
    for (WorkflowExecuteRunnable workflowExecuteThread : this.processInstanceExecCacheManager.getAll()) {
        try {
            LoggerUtils.setWorkflowInstanceIdMDC(workflowExecuteThread.getProcessInstance().getId());
            workflowExecuteThreadPool.executeEvent(workflowExecuteThread);
        } finally {
            LoggerUtils.removeWorkflowInstanceIdMDC();
        }
    }
}
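EventExecuteService.run() is a typical polling daemon loop. Each pass handles events and then sleeps briefly. On interruption, the loop restores the interrupt flag and exits; any other exception is only logged, so the loop survives. The stripped-down sketch below assumes an AtomicBoolean in place of Stopper.isRunning() and a counter in place of eventHandler(). PollingLoop is a hypothetical name.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a stoppable polling loop like EventExecuteService.run().
// `running` stands in for Stopper.isRunning() and `rounds` for eventHandler().
class PollingLoop implements Runnable {
    final AtomicBoolean running = new AtomicBoolean(true);
    final AtomicInteger rounds = new AtomicInteger();

    @Override
    public void run() {
        while (running.get()) {
            try {
                rounds.incrementAndGet(); // one pass over all workflows' events
                TimeUnit.MILLISECONDS.sleep(10);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and leave the loop.
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                // The real service only logs other errors; the loop keeps going.
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PollingLoop loop = new PollingLoop();
        Thread thread = new Thread(loop, "event-loop");
        thread.start();
        Thread.sleep(50);
        thread.interrupt();
        thread.join();
        System.out.println("rounds before interrupt: " + loop.rounds.get());
    }
}
```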

Eventually workflowExecuteThread.handleEvents() is called to process the events.

/**
 * Handle the events belong to the given workflow.
 */
public void executeEvent(final WorkflowExecuteRunnable workflowExecuteThread) {
    if (!workflowExecuteThread.isStart() || workflowExecuteThread.eventSize() == 0) {
        return;
    }
    if (multiThreadFilterMap.containsKey(workflowExecuteThread.getKey())) {
        logger.warn("The workflow has been executed by another thread");
        return;
    }
    multiThreadFilterMap.put(workflowExecuteThread.getKey(), workflowExecuteThread);
    int processInstanceId = workflowExecuteThread.getProcessInstance().getId();
    // submit handleEvents() to the thread pool asynchronously
    ListenableFuture<?> future = this.submitListenable(workflowExecuteThread::handleEvents);
    future.addCallback(new ListenableFutureCallback() {
        @Override
        public void onFailure(Throwable ex) {
            LoggerUtils.setWorkflowInstanceIdMDC(processInstanceId);
            try {
                logger.error("Workflow instance events handle failed", ex);
                multiThreadFilterMap.remove(workflowExecuteThread.getKey());
            } finally {
                LoggerUtils.removeWorkflowInstanceIdMDC();
            }
        }
        @Override
        public void onSuccess(Object result) {
            try {
                LoggerUtils.setWorkflowInstanceIdMDC(workflowExecuteThread.getProcessInstance().getId());
                if (workflowExecuteThread.workFlowFinish()) {
                    stateWheelExecuteThread.removeProcess4TimeoutCheck(workflowExecuteThread.getProcessInstance().getId());
                    processInstanceExecCacheManager.removeByProcessInstanceId(processInstanceId);
                    notifyProcessChanged(workflowExecuteThread.getProcessInstance());
                    logger.info("Workflow instance is finished.");
                }
            } catch (Exception e) {
                logger.error("Workflow instance is finished, but notify changed error", e);
            } finally {
                // make sure the process has been removed from multiThreadFilterMap
                multiThreadFilterMap.remove(workflowExecuteThread.getKey());
                LoggerUtils.removeWorkflowInstanceIdMDC();
            }
        }
    });
}

3.5.2 workflowExecuteThread handles stateEvent events


WorkflowExecuteRunnable.handleEvents()

/**
 * handle event
 */
public void handleEvents() {
    if (!isStart()) {
        logger.info(
                "The workflow instance is not started, will not handle its state event, current state event size: {}",
                stateEvents);
        return;
    }
    StateEvent stateEvent = null;
    while (!this.stateEvents.isEmpty()) {
        try {
            stateEvent = this.stateEvents.peek();
            LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(),
                    stateEvent.getTaskInstanceId());
            // if state handle success then will remove this state, otherwise will retry this state next time.
            // The state should always handle success except database error.
            checkProcessInstance(stateEvent);
            StateEventHandler stateEventHandler =
                    StateEventHandlerManager.getStateEventHandler(stateEvent.getType())
                            .orElseThrow(() -> new StateEventHandleError("Cannot find handler for the given state event"));
            logger.info("Begin to handle state event, {}", stateEvent);
            // call the stateEventHandler to handle the event
            if (stateEventHandler.handleStateEvent(this, stateEvent)) {
                this.stateEvents.remove(stateEvent);
            }
        } catch (StateEventHandleError stateEventHandleError) {
            logger.error("State event handle error, will remove this event: {}", stateEvent, stateEventHandleError);
            this.stateEvents.remove(stateEvent);
            ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
        } catch (StateEventHandleException stateEventHandleException) {
            logger.error("State event handle error, will retry this event: {}", stateEvent, stateEventHandleException);
            ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
        } catch (Exception e) {
            // we catch the exception here, since if the state event handle failed, the state event will still keep in the stateEvents queue.
            logger.error("State event handle error, get a unknown exception, will retry this event: {}", stateEvent, e);
            ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
        } finally {
            LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
        }
    }
}


3.5.3 TaskStateEventHandler handles stateEvent events



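The handler checks whether the event is a task-finished event. If it is, the handler calls workflowExecuteThread's taskFinished method, which submits the downstream tasks.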


TaskStateEventHandler.handleStateEvent()


@Override
public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent)
    throws StateEventHandleException, StateEventHandleError {
    measureTaskState(stateEvent);
    workflowExecuteRunnable.checkTaskInstanceByStateEvent(stateEvent);
    Optional<TaskInstance> taskInstanceOptional = workflowExecuteRunnable.getTaskInstance(stateEvent.getTaskInstanceId());
    TaskInstance task = taskInstanceOptional.orElseThrow(() -> new StateEventHandleError(
        "Cannot find task instance from taskMap by task instance id: " + stateEvent.getTaskInstanceId()));
    if (task.getState() == null) {
        throw new StateEventHandleError("Task state event handle error due to task state is null");
    }
    Map<Long, Integer> completeTaskMap = workflowExecuteRunnable.getCompleteTaskMap();
    if (task.getState().typeIsFinished()) {
        if (completeTaskMap.containsKey(task.getTaskCode())
            && completeTaskMap.get(task.getTaskCode()) == task.getId()) {
            logger.warn("The task instance is already complete, stateEvent: {}", stateEvent);
            return true;
        }
        // call WorkflowExecuteRunnable.taskFinished()
        workflowExecuteRunnable.taskFinished(task);
        if (task.getTaskGroupId() > 0) {
            workflowExecuteRunnable.releaseTaskGroup(task);
        }
        return true;
    }
    Map<Long, ITaskProcessor> activeTaskProcessMap = workflowExecuteRunnable.getActiveTaskProcessMap();
    if (activeTaskProcessMap.containsKey(task.getTaskCode())) {
        ITaskProcessor iTaskProcessor = activeTaskProcessMap.get(task.getTaskCode());
        iTaskProcessor.action(TaskAction.RUN);
        if (iTaskProcessor.taskInstance().getState().typeIsFinished()) {
            if (iTaskProcessor.taskInstance().getState() != task.getState()) {
                task.setState(iTaskProcessor.taskInstance().getState());
            }
            workflowExecuteRunnable.taskFinished(task);
        }
        return true;
    }
    throw new StateEventHandleException(
        "Task state event handle error, due to the task is not in activeTaskProcessorMaps");
}
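The first branch of handleStateEvent makes the handler idempotent for finished tasks. If completeTaskMap already maps the task code to this task instance id, the event is a duplicate and is acknowledged without calling taskFinished again. A later event for a different instance of the same task code is still processed; this sketch assumes, for example, a retry that runs as a new task instance. FinishedTaskGuard is hypothetical, and its finishCalls counter stands in for the downstream submission.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the duplicate-completion check in handleStateEvent.
// FinishedTaskGuard and finishCalls are hypothetical; finishCalls stands in
// for taskFinished() submitting downstream tasks.
class FinishedTaskGuard {
    // task code -> id of the task instance that completed it (like completeTaskMap)
    final Map<Long, Integer> completeTaskMap = new HashMap<>();
    int finishCalls = 0;

    // Handles a "task finished" event; returns true if taskFinished() ran.
    boolean onTaskFinished(long taskCode, int taskInstanceId) {
        Integer completedBy = completeTaskMap.get(taskCode);
        if (completedBy != null && completedBy == taskInstanceId) {
            return false; // duplicate event for an instance already handled
        }
        finishCalls++;
        completeTaskMap.put(taskCode, taskInstanceId);
        return true;
    }

    public static void main(String[] args) {
        FinishedTaskGuard guard = new FinishedTaskGuard();
        System.out.println(guard.onTaskFinished(100L, 1)); // first completion: processed
        System.out.println(guard.onTaskFinished(100L, 1)); // same instance again: ignored
        System.out.println(guard.onTaskFinished(100L, 2)); // new instance of the same task
        System.out.println("taskFinished calls: " + guard.finishCalls);
    }
}
```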


3.5.4 workflowExecuteThread schedules downstream tasks


WorkflowExecuteRunnable.taskFinished method
public void taskFinished(TaskInstance taskInstance) throws StateEventHandleException {
    logger.info("TaskInstance finished task code:{} state:{}", taskInstance.getTaskCode(), taskInstance.getState());
    try {
        activeTaskProcessorMaps.remove(taskInstance.getTaskCode());
        stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance, taskInstance);
        stateWheelExecuteThread.removeTask4RetryCheck(processInstance, taskInstance);
        stateWheelExecuteThread.removeTask4StateCheck(processInstance, taskInstance);
        if (taskInstance.getState().typeIsSuccess()) {
            completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
            // todo: merge the last taskInstance
            processInstance.setVarPool(taskInstance.getVarPool());
            processService.saveProcessInstance(processInstance);
            if (!processInstance.isBlocked()) {
                submitPostNode(Long.toString(taskInstance.getTaskCode()));
            }
        } else if (taskInstance.taskCanRetry() && processInstance.getState() != ExecutionStatus.READY_STOP) {
            // retry task
            logger.info("Retry taskInstance taskInstance state: {}", taskInstance.getState());
            retryTaskInstance(taskInstance);
        } else if (taskInstance.getState().typeIsFailure()) {
            completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
            // There are child nodes and the failure policy is: CONTINUE
            if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE && DagHelper.haveAllNodeAfterNode(
                    Long.toString(taskInstance.getTaskCode()), dag)) {
                submitPostNode(Long.toString(taskInstance.getTaskCode()));
            } else {
                errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
                if (processInstance.getFailureStrategy() == FailureStrategy.END) {
                    killAllTasks();
                }
            }
        } else if (taskInstance.getState().typeIsFinished()) {
            // todo: when the task instance type is pause, then it should not in completeTaskMap
            completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
        }
        logger.info("TaskInstance finished will try to update the workflow instance state, task code:{} state:{}",
                taskInstance.getTaskCode(), taskInstance.getState());
        this.updateProcessInstanceState();
    } catch (Exception ex) {
        logger.error("Task finish failed, get a exception, will remove this taskInstance from completeTaskMap", ex);
        // remove the task from complete map, so that we can finish in the next time.
        completeTaskMap.remove(taskInstance.getTaskCode());
        throw ex;
    }
}
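The branches of taskFinished can be summarized as a pure decision function. The sketch below is a simplification that assumes hypothetical enums (TaskState, Strategy, NextStep). Its boolean inputs stand in for taskCanRetry(), the READY_STOP check, DagHelper.haveAllNodeAfterNode() and isBlocked(). It returns what the master would do next rather than doing it.

```java
// Sketch of the branch structure of WorkflowExecuteRunnable.taskFinished() as a pure function.
// TaskState, Strategy and NextStep are hypothetical enums; the boolean parameters stand in
// for taskCanRetry(), the READY_STOP check, DagHelper.haveAllNodeAfterNode() and isBlocked().
class TaskFinishedDecision {
    static NextStep decide(TaskState state, boolean canRetry, boolean stopping,
                           Strategy strategy, boolean hasDownstream, boolean blocked) {
        if (state == TaskState.SUCCESS) {
            // completed; downstream nodes are submitted unless the workflow is blocked
            return blocked ? NextStep.COMPLETE_ONLY : NextStep.SUBMIT_DOWNSTREAM;
        }
        if (canRetry && !stopping) {
            return NextStep.RETRY;
        }
        if (state == TaskState.FAILURE) {
            if (strategy == Strategy.CONTINUE && hasDownstream) {
                return NextStep.SUBMIT_DOWNSTREAM; // failure tolerated: keep going
            }
            // recorded as an error task; END additionally kills all running tasks
            return strategy == Strategy.END ? NextStep.FAIL_AND_KILL_ALL : NextStep.FAIL;
        }
        return NextStep.COMPLETE_ONLY; // other final states, e.g. paused
    }

    public static void main(String[] args) {
        System.out.println(decide(TaskState.SUCCESS, false, false, Strategy.END, true, false));
        System.out.println(decide(TaskState.FAILURE, true, false, Strategy.END, true, false));
        System.out.println(decide(TaskState.FAILURE, false, false, Strategy.CONTINUE, true, false));
        System.out.println(decide(TaskState.FAILURE, false, false, Strategy.END, true, false));
    }
}

enum TaskState { SUCCESS, FAILURE, PAUSE }

enum Strategy { CONTINUE, END }

enum NextStep { SUBMIT_DOWNSTREAM, RETRY, FAIL, FAIL_AND_KILL_ALL, COMPLETE_ONLY }
```

The order of the checks matters. A failed task that can still be retried is retried before the failure strategy is consulted, unless the workflow is stopping.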


04

Summary


4.1 Role of each component


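  • ApiServer: serves the front end's API requests.

  • Master: schedules the workflow's tasks, picks a suitable worker for each task according to the configured strategy, and updates task state in the DB.

  • Worker: receives tasks from the master, runs them, and reports the results back to the master.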

4.2 Role of each thread


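  • MasterSchedulerBootstrap: the command-scanning thread. It scans the commands written by the apiserver and creates process instances in the DB.

  • WorkflowEventLooper: the WorkflowEvent processing thread. It handles the WorkflowEvents written by MasterSchedulerBootstrap.

  • WorkflowExecuteRunnable: handles workflow start-up. It builds the DAG (directed acyclic graph), initializes the scheduling configuration, and submits tasks to the task queue.

  • TaskPriorityQueueConsumer: consumes tasks from the priority queue, asynchronously selects a worker through the dispatcher, and sends that worker a command to start the task.

  • TaskDispatchProcessor: runs on the worker node and processes the commands sent by the master.

  • WorkerManagerThread: runs on the worker node and manages the tasks currently running there.

  • TaskExecuteThread: the task execution thread. It runs the task logic and returns the result to the master.

  • TaskExecuteResponseProcessor: processes the task results returned by workers and wraps them into TaskEvents.

  • TaskEventService: processes TaskEvents.

  • TaskResultEventHandler: called by TaskEventService to handle task results.

  • EventExecuteService: processes the events produced by workflows.

  • TaskStateEventHandler: handles WorkflowExecuteRunnable events of type TaskStateEvent. When the task state shows the task has finished, it also submits the downstream tasks for scheduling.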

These are my own takeaways from reading the DolphinScheduler source code. I'd be happy to discuss them, and if you spot a mistake, please point it out!


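Get involved


Open source is booming in China, and the Apache DolphinScheduler community is growing fast. To build a scheduler that is more useful and easier to use, we warmly welcome everyone who loves open source to join the community. Contribute to the rise of open source in China and help homegrown open source go global.


There are many ways to contribute to the DolphinScheduler community, including:


Contributing your first PR (documentation or code). We hope it can be something simple: the first PR is a way to get familiar with the submission process and community collaboration, and to see how friendly the community is.


The community has compiled a list of beginner-friendly issues: https://github.com/apache/dolphinscheduler/issues/5689


List of issues for more experienced contributors: https://github.com/apache/dolphinscheduler/issues?q=is%3Aopen+is%3Aissue+label%3A%22volunteer+wanted%22


How to contribute: https://dolphinscheduler.apache.org/zh-cn/community/development/contribute.html


Come on in: the DolphinScheduler open-source community needs you. Help build up open source in China; even a small tile adds up to something big.


Taking part in open source lets you learn from experts up close and sharpen your skills quickly. If you'd like to contribute, we run a contributor incubation group. Add the community assistant on WeChat (Leonard-ds) and they will walk you through it step by step. Contributors of every level are welcome and every question gets an answer; what matters is a willingness to contribute.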




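When you add the assistant on WeChat, please mention that you'd like to contribute.


Come join us: the open-source community is looking forward to your participation.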


