大数跨境
0
0

Apache SeaTunnel 如何将 CDC 数据流转换为 Append-Only 模式?

Apache SeaTunnel 如何将 CDC 数据流转换为 Append-Only 模式? SeaTunnel
2025-11-17
2
导读:本文将介绍 RowKindExtractor 的核心功能,其在 CDC 数据同步场景下的使用方法,以及配置选项、注意事项及多种应用示例。

点击蓝字



关注我们


RowKindExtractor 是 Apache SeaTunnel 的一个转换插件,它能将 CDC 数据流转为 Append-Only 模式,并提取原始 RowKind 信息为新字段。本文将介绍 RowKindExtractor 的核心功能,其在 CDC 数据同步场景下的使用方法,以及配置选项、注意事项及多种应用示例。

 

RowKindExtractor

RowKindExtractor 转换插件

描述

RowKindExtractor 转换插件用于将 CDC(Change Data Capture)数据流转换为 Append-Only(仅追加)模式,同时将原始的 RowKind 信息提取为一个新的字段。

核心功能:

  • • 将所有数据行的 RowKind 统一改为 +I(INSERT),实现 Append-Only 模式
  • • 将原始的 RowKind 信息(INSERT、UPDATE_BEFORE、UPDATE_AFTER、DELETE)保存到新增的字段中
  • • 支持短格式和完整格式两种输出方式

为什么需要这个插件?

在 CDC 数据同步场景中,数据行带有 RowKind 标记(+I、-U、+U、-D),表示不同的变更类型。但某些下游系统(如数据湖、分析系统)只支持 Append-Only 模式,不支持 UPDATE 和 DELETE 操作。此时需要:

  1. 1. 将所有数据转换为 INSERT 类型(Append-Only)
  2. 2. 将原始的变更类型保存为普通字段,供后续分析使用

转换示例:


   
    
   输入(CDC 数据):
  RowKind: -D (DELETE)
  数据: id=1, name="test1", age=20

输出(Append-Only 数据):
  RowKind: +I (INSERT)
  数据: id=1, name="test1", age=20, row_kind="DELETE"

典型应用场景:

  • • 将 CDC 数据写入只支持 Append 的数据湖
  • • 需要在数据仓库中保留完整的变更历史记录
  • • 需要对不同类型的变更进行统计分析

配置选项

参数名
类型
是否必填
默认值
说明
custom_field_name
string
row_kind
新增字段的名称,用于存储原始的 RowKind 信息
transform_type
enum
SHORT
RowKind 的输出格式,可选值:SHORT(短格式)或 FULL(完整格式)

custom_field_name [string]

指定新增字段的名称,该字段用于存储原始的 RowKind 信息。

默认值:row_kind

注意事项:

  • • 字段名不能与原有字段重名,否则会报错
  • • 建议使用有意义的名称,如 operation_typechange_typecdc_op 等

示例:


   
    
   custom_field_name = "operation_type"  # 使用自定义字段名

transform_type [enum]

指定 RowKind 字段值的输出格式。

可选值:

格式
说明
输出值
SHORT
短格式(符号表示)
+I
-U+U-D
FULL
完整格式(英文名称)
INSERT
UPDATE_BEFOREUPDATE_AFTERDELETE

默认值:SHORT

各值含义:

RowKind 类型
SHORT 格式
FULL 格式
说明
INSERT
+I
INSERT
插入操作
UPDATE_BEFORE
-U
UPDATE_BEFORE
更新前的值
UPDATE_AFTER
+U
UPDATE_AFTER
更新后的值
DELETE
-D
DELETE
删除操作

选择建议:

  • • SHORT 格式:节省存储空间,适合对存储敏感的场景
  • • FULL 格式:可读性更好,适合需要人工查看或分析的场景

示例:


   
    
   transform_type = FULL  # 使用完整格式

完整示例

示例 1:使用默认配置(SHORT 格式)

使用默认配置,将 CDC 数据转换为 Append-Only 模式,RowKind 以短格式保存。


   
    
   env {
  parallelism
 = 1
  job.mode
 = "STREAMING"
}

source
 {
  MySQL-CDC
 {
    plugin_output
 = "cdc_source"
    server-id
 = 5652
    username
 = "root"
    password
 = "your_password"
    table-names
 = ["mydb.users"]
    url
 = "jdbc:mysql://localhost:3306/mydb"
  }
}

transform
 {
  RowKindExtractor
 {
    plugin_input
 = "cdc_source"
    plugin_output
 = "append_only_data"
    # 使用默认配置:

    # custom_field_name = "row_kind"

    # transform_type = SHORT

  }
}

sink
 {
  Console
 {
    plugin_input
 = "append_only_data"
  }
}

数据转换过程:


   
    
   输入数据(CDC 格式):
  1. RowKind=+I, id=1, name="张三", age=25
  2. RowKind=-U, id=1, name="张三", age=25
  3. RowKind=+U, id=1, name="张三", age=26
  4. RowKind=-D, id=1, name="张三", age=26

输出数据(Append-Only 格式):
  1. RowKind=+I, id=1, name="张三", age=25, row_kind="+I"
  2. RowKind=+I, id=1, name="张三", age=25, row_kind="-U"
  3. RowKind=+I, id=1, name="张三", age=26, row_kind="+U"
  4. RowKind=+I, id=1, name="张三", age=26, row_kind="-D"

示例 2:使用 FULL 格式和自定义字段名

使用完整格式输出 RowKind,并自定义字段名称。


   
    
   env {
  parallelism
 = 1
  job.mode
 = "STREAMING"
}

source
 {
  MySQL-CDC
 {
    plugin_output
 = "cdc_source"
    server-id
 = 5652
    username
 = "root"
    password
 = "your_password"
    table-names
 = ["mydb.orders"]
    url
 = "jdbc:mysql://localhost:3306/mydb"
  }
}

transform
 {
  RowKindExtractor
 {
    plugin_input
 = "cdc_source"
    plugin_output
 = "append_only_data"
    custom_field_name
 = "operation_type"  # 自定义字段名
    transform_type
 = FULL                 # 使用完整格式
  }
}

sink
 {
  Iceberg
 {
    plugin_input
 = "append_only_data"
    catalog_name
 = "iceberg_catalog"
    database
 = "mydb"
    table
 = "orders_history"
    # Iceberg 表会包含 operation_type 字段,记录每条数据的变更类型

  }
}

数据转换过程:


   
    
   输入数据(CDC 格式):
  1. RowKind=+I, order_id=1001, amount=100.00
  2. RowKind=-U, order_id=1001, amount=100.00
  3. RowKind=+U, order_id=1001, amount=150.00
  4. RowKind=-D, order_id=1001, amount=150.00

输出数据(Append-Only 格式,FULL 格式):
  1. RowKind=+I, order_id=1001, amount=100.00, operation_type="INSERT"
  2. RowKind=+I, order_id=1001, amount=100.00, operation_type="UPDATE_BEFORE"
  3. RowKind=+I, order_id=1001, amount=150.00, operation_type="UPDATE_AFTER"
  4. RowKind=+I, order_id=1001, amount=150.00, operation_type="DELETE"

示例 3:完整的测试示例(使用 FakeSource)

使用 FakeSource 生成测试数据,演示各种 RowKind 的转换效果。


   
    
   env {
  parallelism
 = 1
  job.mode
 = "BATCH"
}

source
 {
  FakeSource
 {
    plugin_output
 = "fake_cdc_data"
    schema
 = {
      fields
 {
        pk_id
 = bigint
        name
 = string
        score
 = int
      }
      primaryKey
 {
        name
 = "pk_id"
        columnNames
 = [pk_id]
      }
    }
    rows
 = [
      {
        kind
 = INSERT
        fields
 = [1, "A", 100]
      },
      {
        kind
 = INSERT
        fields
 = [2, "B", 100]
      },
      {
        kind
 = UPDATE_BEFORE
        fields
 = [1, "A", 100]
      },
      {
        kind
 = UPDATE_AFTER
        fields
 = [1, "A_updated", 95]
      },
      {
        kind
 = UPDATE_BEFORE
        fields
 = [2, "B", 100]
      },
      {
        kind
 = UPDATE_AFTER
        fields
 = [2, "B_updated", 98]
      },
      {
        kind
 = DELETE
        fields
 = [1, "A_updated", 95]
      }
    ]
  }
}

transform
 {
  RowKindExtractor
 {
    plugin_input
 = "fake_cdc_data"
    plugin_output
 = "transformed_data"
    custom_field_name
 = "change_type"
    transform_type
 = FULL
  }
}

sink
 {
  Console
 {
    plugin_input
 = "transformed_data"
  }
}

预期输出:


   
    
   +I, pk_id=1, name="A", score=100, change_type="INSERT"
+I, pk_id=2, name="B", score=100, change_type="INSERT"
+I, pk_id=1, name="A", score=100, change_type="UPDATE_BEFORE"
+I, pk_id=1, name="A_updated", score=95, change_type="UPDATE_AFTER"
+I, pk_id=2, name="B", score=100, change_type="UPDATE_BEFORE"
+I, pk_id=2, name="B_updated", score=98, change_type="UPDATE_AFTER"
+I, pk_id=1, name="A_updated", score=95, change_type="DELETE"

 

 

 

 


 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

Apache SeaTunnel

Apache SeaTunnel是一个云原生的多模态、高性能海量数据集成工具。北京时间 2023 年 6 月1 日,全球最大的开源软件基金会ApacheSoftware Foundation正式宣布Apache SeaTunnel毕业成为Apache顶级项目。目前,SeaTunnel在GitHub上Star数量已达8k+,社区达到6000+人规模。SeaTunnel支持在云数据库、本地数据源、SaaS、大模型等170多种数据源之间进行数据实时和批量同步,支持CDC、DDL变更、整库同步等功能,更是可以和大模型打通,让大模型链接企业内部的数据。




同步Demo

MySQL→Doris | MySQLCDC | MySQL→Hive | HTTP → Doris  | HTTP → MySQL | MySQL→StarRocks|MySQL→Elasticsearch |Kafka→ClickHouse

新手入门

SeaTunnel 让数据集成变得 So easy!3 分钟入门指南
 0 到 1 快速入门 /初探/深入理解 
  分布式集群部署 | CDC数据同步管道 | Oracle-CDC
图片

最佳实践

中控技术天翼云多点OPPO | 清风马蜂窝孩子王哔哩哔哩唯品会众安保险兆原数通 | 亚信科技|映客|翼康济世|信也科技|华润置地|Shopee|京东科技|58同城|互联网银行|JPMorgan
图片

测试报告

SeaTunnel VS GLUE |  VS Airbyte |  VS DataX|SeaTunnel 与 DataX 、Sqoop、Flume、Flink CDC 对比

图片

源码解析

Zeta引擎源码解析(一) |(二) |(三)| API 源码解析 |2.1.1源码解析|封装 Flink 连接数据库解析





仓库地址: 
https://github.com/apache/seatunnel
网址:
https://seatunnel.apache.org/
Apache SeaTunnel 下载地址:
https://seatunnel.apache.org/download
衷心欢迎更多人加入!
我们相信,在Community Over Code(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」(精英管理)、以及「多样性与共识决策」等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!
我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!
提交问题和建议:
https://github.com/apache/seatunnel/issues
贡献代码:
https://github.com/apache/seatunnel/pulls
订阅社区开发邮件列表 : 
dev-subscribe@seatunnel.apache.org
开发邮件列表:
dev@seatunnel.apache.org
加入 Slack:
https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1kcxzyrxz-lKcF3BAyzHEmpcc4OSaCjQ
关注 X.com: 
https://x.com/ASFSeaTunnel


【声明】内容源于网络
0
0
SeaTunnel
Apache SeaTunnel 是新一代超高性能数据集成工具,同时支持海量数据离线和实时同步,每天可以稳定高效同步万亿级数据,已在数百家公司生产使用,致力于为企业提供高效、可靠的数据处理方案,助力构建数据驱动的智能应用。
内容 472
粉丝 0
SeaTunnel Apache SeaTunnel 是新一代超高性能数据集成工具,同时支持海量数据离线和实时同步,每天可以稳定高效同步万亿级数据,已在数百家公司生产使用,致力于为企业提供高效、可靠的数据处理方案,助力构建数据驱动的智能应用。
总阅读36
粉丝0
内容472