大数跨境

海豚调度清理:使用 API 轻松清理工作流历史版本记录,一键减少关系日志和任务定义日志表的数据量

海豚调度清理:使用 API 轻松清理工作流历史版本记录,一键减少关系日志和任务定义日志表的数据量 海豚调度
2024-09-05
0
导读:一键减少关系日志和任务定义日志表的数据量!

💡  本系列文章是 DolphinScheduler 由浅入深的教程,涵盖搭建、二开迭代、核心原理解读、运维和管理等一系列内容。适用于想对 DolphinScheduler了解或想要加深理解的读者。

推荐阅读:

祝开卷有益 :)


大家好,我是小陶,今天是清理调度数据的第二篇文章,之前分享过如何使用API清理工作流实例和任务实例,可以看这篇文章:海豚调度清理:使用 API 轻松清理历史工作流实例以及日志文件

我们知道 DolphinScheduler 的工作流是有版本控制的,每一次更新任务、添加任务、修改任务等等操作,都会生成一个新的版本号,同时 process_definition_log 和 process_task_relation_log 的数据也会增加,久而久之,会积累大量的"无用数据",MySQL 的记录越来越多,会影响调度的服务,进而影响用户使用体验和 MySQL 服务。


来看一个例子,往下看。👇👇👇

如下图所示,该工作流随着迭代,已经积累了 600 多个版本,我们用了这么长时间的调度,没有发生过需要切换历史版本的情况,历史的版本数据基本都算做“无用”数据了,同时为了保持稳定性,和数仓同学协商,只保留最近 20 个版本。

所以,需要清理以上历史版本记录,保证页面影响速度和 MySQL 服务。

清理调度任务历史版本记录,依然是使用API的方式,直接操作数据库风险比较高。

本文的内容也比较简单,先是说明 API 的逻辑,最后再介绍如何使用一个 Python 脚本来调用 API 删除历史版本记录。


1.API 逻辑介绍

DolphinScheduler 本身提供了删除版本记录的接口,请求类型:DELETE,接口地址:process-definition/{dag_code}/versions/{version} ,接口逻辑比较简单,这里就不赘述了。


2.使用 Python 脚本调用API

Python脚本的逻辑比较简单,使用了4个API,按照顺序是:

1.获取项目列表
2.获取工作流列表
3.获取当前工作流版本信息列表
4.删除历史版本

第三步,需要注意的是,获取版本信息列表的时候,指定了分页大小是 20 ,从第二页开始。因为我们要保留最近的 20 个版本记录。

入参:无

Python 环境 2.7

具体的代码如下:

#!/usr/bin/python
# -*- coding: utf8 -*-
## 清理调度任务历史版本记录,依然是使用API的方式,直接操作数据库风险比较高。
## 会减少 process_definition_log 和 process_task_relation_log 的数据。

import io
import subprocess
import requests
import json
import time
import datetime

# 配置信息: ip 端口 token自行修改
base_url = 'http://xxxx:xxxx'
token = 'xxxxx'
# 获取项目列表
def get_project_list():
    url = "{base_url}/dolphinscheduler/projects?pageSize=100&pageNo=1&searchVal=&_t=0.3741042528841678".format(base_url=base_url)
    payload={}
    headers = {
      'Connection': 'keep-alive',
      'Accept': 'application/json, text/plain, */*',
      'language': 'zh_CN',
      'sessionId': '680b2a0e-624c-4804-9e9e-58c7d4a0b44c',
      'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36',
      'Referer': "{base_url}/dolphinscheduler/ui/".format(base_url=base_url),
      'Accept-Language': 'zh-CN,zh;q=0.9,pt;q=0.8,en;q=0.7',
      'token':token
    }
    response = requests.request("GET", url, headers=headers, data=payload)
    response_data = json.loads(response.text)
    totalList = response_data['data']['totalList']
    return totalList

# 获取工作定义列表
def get_definition_detail(project_code):
    payload={}
    headers = {
      'Connection': 'keep-alive',
      'Accept': 'application/json, text/plain, */
*',
      'language': 'zh_CN',
      'sessionId': '680b2a0e-624c-4804-9e9e-58c7d4a0b44c',
      'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36',
      'Referer': "{base_url}/dolphinscheduler/ui/".format(base_url=base_url),
      'Accept-Language': 'zh-CN,zh;q=0.9,pt;q=0.8,en;q=0.7',
      'token':token
    }
    all_data = []
    pageNo = 1
    while True:
        url = "{base_url}/dolphinscheduler/projects/{project_code}/process-definition?searchVal=&pageSize=50&pageNo={pageNo}".format(project_code=project_code,pageNo=pageNo,base_url=base_url)
        response = requests.request("GET", url, headers=headers, data=payload)
        response_data = json.loads(response.text)
        page_data = response_data['data']['totalList']
        totalPage = response_data['data']['totalPage']

        if len(page_data) == 0:
            print('工作定义列表为空,退出循环...')
            break
        all_data.extend(page_data)

        if pageNo >= totalPage:
            print('工作定义列表到头了,退出循环...')
            break
        pageNo += 1
    # 返回全部数据
    return all_data

# 获取工作定义的版本信息列表,注意,这里从第二页开始!!!size是 20
def get_version_detail(project_code,dag_code,current_version):
    payload={}
    headers = {
      'Connection': 'keep-alive',
      'Accept': 'application/json, text/plain, */*',
      'language': 'zh_CN',
      'sessionId': '680b2a0e-624c-4804-9e9e-58c7d4a0b44c',
      'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36',
      'Referer': "{base_url}/dolphinscheduler/ui/".format(base_url=base_url),
      'Accept-Language': 'zh-CN,zh;q=0.9,pt;q=0.8,en;q=0.7',
      'token':token
    }

    all_version = []
    pageNo = 2

    while True:
        if pageNo <= 1:
            print('获取工作定义的版本信息列表,pageNo 必须大于1!!!')
            break

        url = "{base_url}/dolphinscheduler/projects/{project_code}/process-definition/{dag_code}/versions?searchVal=&pageSize=20&pageNo={pageNo}".format(project_code=project_code,dag_code=dag_code,pageNo=pageNo,base_url=base_url)
        response = requests.request("GET", url, headers=headers, data=payload)
        response_data = json.loads(response.text)
        page_data = response_data['data']['totalList']
        totalPage = response_data['data']['totalPage']

        if len(page_data) == 0:
            print('version列表为空,退出循环...')
            break

        for page in page_data:
            version = int(page['version'])
            # 保留近20个版本
            if version + 20 <= current_version:
                all_version.append(version)

        if pageNo >= totalPage:
            print('version列表到头了,退出循环...')
            break

        pageNo += 1

    # TODO 分析all_data里面是否包含 current_version

    # 返回正常的数据
    return all_version

def delete(project_code,dag_code,version):
    print('即将删除的项目,工作流以及版本')
    print(project_code)
    print(dag_code)
    print(version)
    url = "{base_url}/dolphinscheduler/projects/{project_code}/process-definition/{dag_code}/versions/{version}".format(project_code=project_code,dag_code=dag_code,version=version,base_url=base_url)
    # 'processInstanceIds=89767'
    payload={}
    headers = {
      'Connection': 'keep-alive',
      'Accept': 'application/json, text/plain, */
*',
      'language': 'zh_CN',
      'sessionId': '680b2a0e-624c-4804-9e9e-58c7d4a0b44c',
      'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36',
      'Content-Type': 'application/x-www-form-urlencoded',
      'Origin': 'http://10.1.19.150:7080',
      'Referer': 'http://10.1.19.150:7080/dolphinscheduler/ui/',
      'Accept-Language': 'zh-CN,zh;q=0.9,pt;q=0.8,en;q=0.7',
      'token':token,
      'Cookie': 'sessionId=680b2a0e-624c-4804-9e9e-58c7d4a0b44c; language=zh_CN; userName=admin; HERA_Token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzc29JZCI6Ii0xIiwic3NvX25hbWUiOiJhZG1pbiIsImF1ZCI6IjJkZmlyZSIsImlzcyI6ImhlcmEiLCJleHAiOjE2NDYwMjk3MDYsInVzZXJJZCI6IjEiLCJpYXQiOjE2NDU3NzA1MDYsInVzZXJuYW1lIjoiYWRtaW4ifQ.YEhr9Mi7FDsQIAn5GJorB0U3lL92KQA8YvP26QMhh9g; sessionId=680b2a0e-624c-4804-9e9e-58c7d4a0b44c'
    }
    response = requests.request("DELETE", url, headers=headers, data=payload)
    print('执行结果如下:')
    print(response.text)

if __name__ == '__main__':
    # # 需要处理的项目
    projects = get_project_list()
    # 依次处理项目
    for project in projects:
        project_code = project['code']
        print('正在处理项目:'+ str(project_code))
        all_dags = get_definition_detail(project_code)
        for dag in all_dags:
            # 工作流code和当前版本
            dag_code = dag['code']
            current_version = dag['version']
            print(dag_code)
            print(current_version)
            # 获取该工作流历史版本记录...
            all_data = get_version_detail(project_code,dag_code,current_version)
            # TODO 删除
            print(all_data)
            for v in all_data:
                delete(project_code,dag_code,v)

使用示例:dolphin_clean_version.py 是上面的脚本。

python  dolphin_clean_version.py

脚本在 GitHub 也维护了一份,欢迎 star 

https://github.com/aikuyun/dolphin_practices/blob/main/dolphin_clean_version.py

3.注意事项

1.token 获取的方式


以上就使用 API 一键减少关系日志表和任务定义日志表的数据量的过程,如果有任何疑问,都可以与我交流,希望可以帮到你,下次见。


🌍 本次 Meetup 汇聚技术大咖和实战案例,为你揭示如何在多云环境中高效部署调度系统,学习哔哩哔哩如何基于海豚调度进行场景适配解决业务问题




9月26日,和我们一起探索 Apache DolphinScheduler 的最新功能、跨云调度策略,以及智能化调度的未来趋势!📊 不论你是开发者、数据工程师,还是对开源项目感兴趣的技术爱好者,这次活动都不容错过!快来报名,与全球社区一同交流吧!🔗


 

<🐬🐬 >

推荐阅读

用户实践案例
奇富科技  蜀海供应链 联通数科 拈花云科
蔚来汽车 长城汽车 集度 长安汽车
思科网讯 生鲜电商 联通医疗 联想
新网银行 消费金融  腾讯音乐 自如
有赞 伊利 当贝大数据
联想 传智教育 Bigo
通信行业  作业帮 太美医疗
某新能源

迁移实践
Azkaban   Ooize   
Airflow (有赞案例) Air2phin(迁移工具)
Airflow迁移实践
Apache DolphinScheduler 3.0.0 升级到 3.1.8 教程
Apache DolphinScheduler 1.3.4升级至3.1.2版本解决方案合集

新手入门
选择Apache DolphinScheduler的10个理由
Apache DolphinScheduler 3.1.8 保姆级教程【安装、介绍、项目运用、邮箱预警设置】轻松拿捏!
Apache DolphinScheduler 如何实现自动化打包+单机/集群部署?
Apache DolphinScheduler-3.1.3 版本安装部署详细教程
Apache DolphinScheduler 在大数据环境中的应用与调优

< 🐬🐬 >
参与社区

参与Apache DolphinScheduler 社区有非常多的参与贡献的方式,包括:


贡献第一个PR(文档、代码) 我们也希望是简单的,第一个PR用于熟悉提交的流程和社区协作以及感受社区的友好度。

社区汇总了以下适合新手的问题列表:https://github.com/apache/dolphinscheduler/issues/5689

非新手问题列表:https://github.com/apache/dolphinscheduler/issues?
q=is%3Aopen+is%3Aissue+label%3A%22volunteer+wanted%22

如何参与贡献链接:https://dolphinscheduler.apache.org/zh-cn/community/development/contribute.html

来吧,DolphinScheduler开源社区需要您的参与,为中国开源崛起添砖加瓦吧,哪怕只是小小的一块瓦,汇聚起来的力量也是巨大的!

【声明】内容源于网络
0
0
海豚调度
Apache DolphinScheduler是一个分布式、去中心化、易扩展的可视化DAG工作流任务调度系统,其致力于解决数据处理流程中错综复杂的依赖关系,使调度系统在数据处理流程中开箱即用。
内容 667
粉丝 0
海豚调度 Apache DolphinScheduler是一个分布式、去中心化、易扩展的可视化DAG工作流任务调度系统,其致力于解决数据处理流程中错综复杂的依赖关系,使调度系统在数据处理流程中开箱即用。
总阅读167
粉丝0
内容667