大数跨境

Apache DolphinScheduler 2.0 时代 API 接口变化

Apache DolphinScheduler 2.0 时代 API 接口变化 海豚调度
2022-03-01
0
导读:在DS 2.0版本,接口已改为具有Restful风格的请求方式,本文重点介绍工作流、任务及工作流与任务关系的部分接口。




作者介绍:李进勇,优秀创作者,大数据架构师/Apache DolphinScheduler Committer





01


背景



众目期待的 Apache DolphinScheduler 2.0 版本终于在 2021 年 12 月 17 日 发布了,要知道 2.0 版本代码重构了约 70%,从 2021 年 1 月份计划启动大 JSON 拆分,历时 11 个月到 2021 年 12 月 5 号发布 2.0.0(由于非预料 bug 而下线),终于在 12 月 17 号成功发布 2.0.1,随后半个月又发布 2.0.2 及当前的 2.0.3 版本。
在 DS 2.0 版本,接口已改为具有 Restful 风格的请求方式,对应的 swagger 文档地址是 http://ip:port/dolphinscheduler/doc.html,从 swagger 文档可以看到,接口分类很明确。我们本次重点介绍工作流、任务及工作流与任务关系的部分接口。



02


工作流定义接口



工作流定义在当前版本有 25 个接口,接下来会介绍下比较常用的接口:



03


任务定义接口



接口名称 接口地址
请求方式
功能介绍
save
/dolphinscheduler/projects/{projectCode}/task-definition
POST
创建任务的接口,taskDefinitionJson 必须是 JSON Array 的方式
update
/dolphinscheduler/projects/{projectCode}/task-definition/{code}
PUT
修改任务的接口,taskDefinitionJsonObj 必须是 JSON Object 的方式
deleteTaskDefinition
/dolphinscheduler/projects/{projectCode}/task-definition/{code}
DELETE
删除任务的接口,根据任务 code 进行删除
deleteVersion
/dolphinscheduler/projects/{projectCode}/task-definition/{code}/versions/{version}
DELETE
根据任务 code 和 version 删除,只能删除非主表应用版本的数据
switchVersion
/dolphinscheduler/projects/{projectCode}/task-definition/{code}/versions/{version}
GET
根据任务 code 和 version 切换到指定版本
genTaskCodeList
/dolphinscheduler/projects/{projectCode}/task-definition/gen-task-codes
GET
获取 taskCode,根据 genNum 可获取多个
queryTaskDefinitionByCode
/dolphinscheduler/projects/{projectCode}/task-definition/{code}
GET
根据任务 code 查询任务详情信息
queryTaskDefinitionListPaging
/dolphinscheduler/projects/{projectCode}/task-definition
GET
分页查询任务



04


工作流任务关系接口



接口名称
接口地址
请求方式
功能介绍
save
/dolphinscheduler/projects/{projectCode}/process-task-relation
POST
工作流和任务绑定接口,支持绑定前置任务、绑定后置任务
deleteRelation
/dolphinscheduler/projects/{projectCode}/process-task-relation/{taskCode}
DELETE
工作流和任务解绑,当任务是条件分支、依赖任务、子工作流时同步删除任务
deleteDownstreamRelation
/dolphinscheduler/projects/{projectCode}/process-task-relation/{taskCode}/downstream
DELETE
删除任务的下游依赖,支持批量删除下游
deleteUpstreamRelation
/dolphinscheduler/projects/{projectCode}/process-task-relation/{taskCode}/upstream
DELETE
删除任务的上游依赖,支持批量删除上游
queryDownstreamRelation
/dolphinscheduler/projects/{projectCode}/process-task-relation/{taskCode}/downstream
GET
查询任务的下游依赖
queryUpstreamRelation
/dolphinscheduler/projects/{projectCode}/process-task-relation/{taskCode}/upstream
GET
查询任务的上游依赖




05


代码调用接口方式



如果单单使用代码调用接口,需要有个 token,这个 token 可以使用 admin 用户在界面生成(创建令牌),也可以通过调用接口获得
Maven 依赖
  
  
  
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.6</version>
</dependency>
代码示例
  
  
  
private static String DOLPHIN_BASE_URI = "http://ip:port";
private static String token = "xxx";
private static String sendPost(String uri, List<NameValuePair> params) throws Exception {
    CloseableHttpClient httpclient = HttpClients.createDefault();
    CloseableHttpResponse response = null;
    try {
        UrlEncodedFormEntity formEntity = new UrlEncodedFormEntity(params, Consts.UTF_8);
        HttpPost httpPost = new HttpPost(DOLPHIN_BASE_URI + uri);
        httpPost.setEntity(formEntity);
        httpPost.setHeader("token", token);
        response = httpclient.execute(httpPost);
        return EntityUtils.toString(response.getEntity(), Consts.UTF_8);
    } catch (Exception e) {
        throw new Exception(String.format("[dolphin] The %s call failed", uri));
    } finally {
        try {
            if (response != null) {
                response.close();
            }
            httpclient.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
private static String sendGet(String uri, List<NameValuePair> params) throws Exception {
    CloseableHttpClient httpclient = HttpClients.createDefault();
    CloseableHttpResponse response = null;
    try {
        HttpGet httpGet = new HttpGet(new URIBuilder(DOLPHIN_BASE_URI + uri).setParameters(params).build());
        httpGet.setHeader("token", token);
        response = httpclient.execute(httpGet);
        return EntityUtils.toString(response.getEntity(), Consts.UTF_8);
    } catch (Exception e) {
        throw new Exception(String.format("[dolphin] The %s call failed", uri));
    } finally {
        if (response != null) {
            response.close();
        }
        httpclient.close();
    }
}



06


创建工作流的两种方式



(1) 调用 createProcessDefinition

参数
参数说明
实例值
locations
通过接口调用时可不填写,DAG 界面会给默认值,用户可 format 后保存重新生成

name
工作流名称
lee-test-01
projectCode
项目 code,必须填写
4362891840832
taskDefinitionJson
task 所有信息组成 json Array,在这个接口中 task code 必须含有,task version 无需含有
[{"code":4143298469056,"name":"lee-test","description":"","delayTime":0,"taskType":"SHELL","taskParams":{"resourceList":[],"localParams":[],"rawScript":"echo 11333","dependence":{},"conditionResult":{"successNode":[],"failedNode":[]},"waitStartTimeout":{},"switchResult":{}},"flag":"YES","taskPriority":"MEDIUM","workerGroup":"default","failRetryTimes":0,"failRetryInterval":1,"timeoutFlag":"CLOSE","timeoutNotifyStrategy":"WARN","timeout":0,"environmentCode":-1}]
taskRelationJson
dag 上 task 关系描述,postTask 表述当前节点
[{"name":"","preTaskCode":0,"preTaskVersion":0,"postTaskCode":4143298469056,"conditionType":0,"conditionParams":{}}]
tenantCode
租户,对应于租户管理的操作系统租户
root
description
工作流描述信息

globalParams
全局参数
[]
timeout
工作流超时时长
0
  
  
  
    public static void main(String[] args) throws Exception {
        long projectCode = 4362891840832L;
        String uri = String.format("/dolphinscheduler/projects/%d/process-definition", projectCode);
        List<NameValuePair> params = new ArrayList<>();
        params.add(new BasicNameValuePair("name""lee-test-04"));
        params.add(new BasicNameValuePair("projectCode", projectCode + ""));
        String taskDefinitionJson = "[{\"code\":4143298469059,\"name\":\"lee-test-4\",\"description\":\"\",\"delayTime\":0,\"taskType\":\"SHELL\"," +
                "\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 11333\",\"dependence\":{},\"conditionResult\"" +
                ":{\"successNode\":[],\"failedNode\":[]},\"waitStartTimeout\":{},\"switchResult\":{}},\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\"," +
                "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":1,\"timeoutFlag\":\"CLOSE\",\"timeoutNotifyStrategy\":\"WARN\"," +
                "\"timeout\":0,\"environmentCode\":-1}]";
        params.add(new BasicNameValuePair("taskDefinitionJson", taskDefinitionJson));
        params.add(new BasicNameValuePair("taskRelationJson""[{\"name\":\"\",\"preTaskCode\":0,\"preTaskVersion\":0,\"postTaskCode\":4143298469059,\"conditionType\":0,\"conditionParams\":{}}]"));
        params.add(new BasicNameValuePair("tenantCode""root"));
        params.add(new BasicNameValuePair("description"""));
        params.add(new BasicNameValuePair("globalParams""[]"));
        params.add(new BasicNameValuePair("timeout""0"));
        sendPost(uri, params);
    }

(2) 调用工作流 createEmptyProcessDefinition、任务 save、工作流任务关系 save

ProcessDefinition createEmptyProcessDefinition api
参数
参数说明
示例值
name
工作流名称
lee-test-01
projectCode
项目 code,必须填写
4362891840832
scheduleJson
创建定时,可为空
{"warningType":"NONE","warningGroupId":1,"failureStrategy":"CONTINUE","workerGroup":"prod","environmentCode":-1,"processInstancePriority":"MEDIUM","startTime":"2022-02-07 00:00:00","endTime":"2027-02-07 00:00:00","crontab":"0 11 11 * * ? *","timezoneId":"Asia/Shanghai"}
tenantCode
租户,对应于租户管理的操作系统租户
root
description
工作流描述信息

globalParams
全局参数
[]
timeout
工作流超时时长
0
TaskDefinition save api
参数
参数说明
示例值
projectCode
项目 code,必须填写
4362891840832
taskDefinitionJson
task 所有信息组成 json Array,在这个接口中无需必须含有 task code 和 task version
[{"name":"lee-test","description":"","delayTime":0,"taskType":"SHELL","taskParams":{"resourceList":[],"localParams":[],"rawScript":"echo 11333","dependence":{},"conditionResult":{"successNode":[],"failedNode":[]},"waitStartTimeout":{},"switchResult":{}},"flag":"YES","taskPriority":"MEDIUM","workerGroup":"default","failRetryTimes":0,"failRetryInterval":1,"timeoutFlag":"CLOSE","timeoutNotifyStrategy":"WARN","timeout":0,"environmentCode":-1}]
ProcessTaskRelation save api
参数
参数说明 示例值
projectCode
项目 code,必须填写
4362891840832
postTaskCode
创建定时,可为空
{"warningType":"NONE","warningGroupId":1,"failureStrategy":"CONTINUE","workerGroup":"prod","environmentCode":-1,"processInstancePriority":"MEDIUM","startTime":"2022-02-07 00:00:00","endTime":"2027-02-07 00:00:00","crontab":"0 11 11 * * ? *","timezoneId":"Asia/Shanghai"}
preTaskCode
租户,对应于租户管理的操作系统租户
root
processDefinitionCode
工作流描述信息



07


参与贡献


随着国内开源的迅猛崛起,Apache DolphinScheduler 社区迎来蓬勃发展,为了做更好用、易用的调度,真诚欢迎热爱开源的伙伴加入到开源社区中来,为中国开源崛起献上一份自己的力量,让本土开源走向全球。


参与 DolphinScheduler 社区有非常多的参与贡献的方式,包括:


贡献第一个PR(文档、代码) 我们也希望是简单的,第一个PR用于熟悉提交的流程和社区协作以及感受社区的友好度。


社区汇总了以下适合新手的问题列表:https://github.com/apache/dolphinscheduler/issues/5689


非新手问题列表:https://github.com/apache/dolphinscheduler/issues?q=is%3Aopen+is%3Aissue+label%3A%22volunteer+wanted%22


如何参与贡献链接:https://dolphinscheduler.apache.org/zh-cn/docs/development/contribute.html


来吧,DolphinScheduler开源社区需要您的参与,为中国开源崛起添砖加瓦吧,哪怕只是小小的一块瓦,汇聚起来的力量也是巨大的。


参与开源可以近距离与各路高手切磋,迅速提升自己的技能,如果您想参与贡献,我们有个贡献者种子孵化群,可以添加社区小助手微信(Leonard-ds) ,手把手教会您( 贡献者不分水平高低,有问必答,关键是有一颗愿意贡献的心 )。


添加小助手微信时请说明想参与贡献。


来吧,开源社区非常期待您的参与。








社区官网
https://dolphinscheduler.apache.org/

代码仓地址https://github.com/apache/dolphinscheduler


您的 Star,是 Apache DolphinScheduler 为爱发电的动力❤️ 


投稿请添加社区小助手微信

(Leonard-ds)





☞Apache DolphinScheduler 版本控制核心原理揭

☞海豚调度在 Kubernetes 体系中的技术实战

☞倒计时 1 天 | Apache DolphinScheduler Meetup 2.26 准时上线!

☞Apache 基金会中国项目活跃度分析 Top20 发布,DolphinScheduler 位列第四

☞数据迁移 | Apache Dolphinscheduler 调度 DataX 从 MySQL 全量导入 Hive

☞Apache DolphinScheduler 2.0.3 发布,支持钉钉告警签名校验,数据源可从多个会话获取链接

Apache DolphinScheduler 董事会报告:社区健康运行,Commit 增长 123%

一文给你整明白多租户在 Apache DolphinScheduler 中的作用



点击阅读原文,加入开源!
【声明】内容源于网络
0
0
海豚调度
Apache DolphinScheduler是一个分布式、去中心化、易扩展的可视化DAG工作流任务调度系统,其致力于解决数据处理流程中错综复杂的依赖关系,使调度系统在数据处理流程中开箱即用。
内容 667
粉丝 0
海豚调度 Apache DolphinScheduler是一个分布式、去中心化、易扩展的可视化DAG工作流任务调度系统,其致力于解决数据处理流程中错综复杂的依赖关系,使调度系统在数据处理流程中开箱即用。
总阅读167
粉丝0
内容667