- 低代码接入
Connect 提供丰富的官方连接器(Connector),如 Debezium MySQL Connector(用于捕获 MySQL 数据变更)和Elasticsearch Connector(用于写入数据至 ES),无需从零开发数据同步逻辑,仅需配置即可实现核心功能。 - 减少重复工作
避免手动处理数据库 binlog 解析、数据格式转换、异常重试等底层细节,开发者可专注于业务逻辑,降低开发周期与维护成本。
-
通过 Debezium 连接器监听 MySQL 的 binlog 日志,实时捕获订单数据的新增、修改、删除操作(CDC,Change Data Capture),同步延迟可控制在秒级,满足业务对 “实时性” 的需求。
-
Exactly-Once 语义:Connect 结合 Kafka 的事务机制与连接器的重试策略,确保数据在同步过程中不丢失、不重复,保障 MySQL 与 Elasticsearch 的数据一致性。
-
Connect 支持集群部署,单个节点故障后,其他节点可自动接管任务,避免单点风险,满足生产环境的高可用要求。
-
可通过增加 Connect Worker 节点数量,提升数据同步的吞吐量,轻松应对订单量激增(如大促场景)的同步需求。
-
支持通过 REST API 动态创建、修改、删除同步任务,无需重启服务即可更新同步规则(如新增订单表字段同步、调整同步频率)。
-
可通过配置内置或自定义的 Converter/Transformer,实现数据格式转换(如 MySQL datetime 转 ES timestamp)、字段过滤(如仅同步有效订单)、数据脱敏(如隐藏订单中的敏感信息)等个性化需求。
-
Connect 作为 Kafka 生态的一部分,可借助 Kafka 的消息队列能力,实现数据的缓冲与削峰,避免源端(MySQL)或目标端(ES)压力突增导致的同步异常。
-
除 MySQL 与 Elasticsearch 外,Connect 还支持 PostgreSQL、MongoDB、HDFS 等多种数据源与目标,未来业务扩展时可快速复用现有架构。
- Netflix
使用 Connect 将用户行为数据、订单数据从关系型数据库同步至 Elasticsearch 与数据湖,支撑用户推荐与业务分析。 - Uber
通过 Connect 实现全球订单数据的跨区域同步,结合 Elasticsearch 提供实时订单查询与轨迹追踪功能。 - 阿里巴巴
在电商业务中,利用 Connect 将 MySQL 中的商品、订单数据同步至 Elasticsearch,保障双 11 等大促场景下的高并发查询需求。 - LinkedIn
通过 Connect 构建数据管道,将用户资料、互动数据同步至 Elasticsearch,优化社交搜索体验。 - Airbnb
使用 Connect 同步房源与订单数据至 Elasticsearch,实现房源的多维度筛选与实时价格查询。
- 日志实时分析
将应用日志通过 Connect 同步至 Elasticsearch,结合 Kibana 实现日志的实时检索与可视化分析。 - 用户画像构建
将用户行为数据(如浏览、购买记录)从 MySQL 同步至 Elasticsearch,支撑用户画像的实时更新与精准推荐。 - 库存实时监控
将商品库存数据从 MySQL 同步至 Elasticsearch,实现库存的实时查询与低库存预警。 - 多系统数据打通
通过 Connect 将核心业务数据(如订单、用户)同步至 ES、Redis、HDFS 等多目标端,实现 “一次同步,多端复用”。
-
JDK 1.8+
-
SpringBoot 2.7.x(稳定版)
-
MySQL 8.0(开启 binlog,binlog 格式设为 ROW,用于 CDC 捕获)
-
Kafka 2.8.x(Connect 依赖 Kafka 集群,建议至少 3 节点部署)
-
Elasticsearch 7.14.x(单节点或集群均可,需提前创建订单索引)
-
Debezium MySQL Connector 1.9.x(捕获 MySQL 数据变更)
-
Elasticsearch Connector 13.1.x(写入数据至 ES)
<dependencies><!-- SpringBoot Web:提供HTTP接口 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Kafka Connect Client:管理Connect任务 --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.8.0</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>connect-api</artifactId><version>2.8.0</version></dependency><!-- MySQL Driver:数据源配置 --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope></dependency><!-- Elasticsearch Client:操作ES索引 --><dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>7.14.0</version></dependency><!-- SpringBoot Test:单元测试 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>
spring:# MySQL数据源配置(用于初始化订单表,非Connect依赖)datasource:order-db:url: jdbc:mysql://localhost:3306/order_db?useSSL=false&serverTimezone=UTC&binlog_format=ROWusername: rootpassword: rootdriver-class-name: com.mysql.cj.jdbc.Driver# Kafka Connect配置kafka:connect:url: http://localhost:8083 # Connect集群的REST API地址(默认端口8083)worker:group-id: order-sync-group # Connect Worker组ID(用于高可用)# Elasticsearch配置elasticsearch:host: localhostport: 9200order-index: order_index # 订单数据在ES中的索引名username: elastic # ES用户名(默认elastic)password: elastic # ES密码(默认elastic)
order_db数据库中创建orders表,用于存储订单数据(需开启 binlog):
CREATE TABLE `orders` (`id` BIGINT PRIMARY KEY AUTO_INCREMENT COMMENT '订单ID',`user_id` BIGINT NOT NULL COMMENT '用户ID',`amount` DECIMAL(10,2) NOT NULL COMMENT '订单金额',`status` VARCHAR(20) NOT NULL COMMENT '订单状态(PENDING:待支付,PAID:已支付,CANCELLED:已取消)',`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间') ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT '电商订单表';
order_index索引,定义字段类型(如create_time设为date类型,支持时间范围查询):
package com.example.ordersync.es;import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;import org.elasticsearch.client.RequestOptions;import org.elasticsearch.client.RestHighLevelClient;import org.elasticsearch.common.settings.Settings;import org.elasticsearch.common.xcontent.XContentBuilder;import org.elasticsearch.common.xcontent.json.JsonXContent;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import org.springframework.boot.CommandLineRunner;import org.springframework.stereotype.Component;import java.io.IOException;public class EsIndexInitializer implements CommandLineRunner {private RestHighLevelClient esClient;private String orderIndex;// 初始化ES订单索引public void run(String... args) throws Exception {if (!indexExists()) {createOrderIndex();System.out.println("Elasticsearch订单索引[" + orderIndex + "]创建成功");} else {System.out.println("Elasticsearch订单索引[" + orderIndex + "]已存在");}}// 判断索引是否已存在private boolean indexExists() throws IOException {return esClient.indices().exists(new org.elasticsearch.action.admin.indices.exists.IndicesExistsRequest(orderIndex),RequestOptions.DEFAULT);}// 创建订单索引(定义字段类型)private void createOrderIndex() throws IOException {CreateIndexRequest request = new CreateIndexRequest(orderIndex);// 设置索引分片与副本(单节点环境副本数设为0)request.settings(Settings.builder().put("number_of_shards", 3).put("number_of_replicas", 0));// 定义索引的字段映射XContentBuilder mapping = JsonXContent.contentBuilder().startObject().startObject("properties").startObject("id").field("type", "long") // 订单ID:长整型.endObject().startObject("user_id").field("type", "long") // 用户ID:长整型.endObject().startObject("amount").field("type", "double") // 订单金额:浮点型.endObject().startObject("status").field("type", "keyword") // 订单状态:keyword(精确匹配).endObject().startObject("create_time").field("type", "date") // 创建时间:date类型(支持时间范围查询).field("format", "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis").endObject().startObject("update_time").field("type", "date").field("format", "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis").endObject().endObject().endObject();request.mapping(mapping);CreateIndexResponse response = esClient.indices().create(request, RequestOptions.DEFAULT);if (!response.isAcknowledged()) {throw new RuntimeException("Elasticsearch订单索引创建失败");}}}
package com.example.ordersync.connect;import com.alibaba.fastjson.JSONObject;import org.springframework.beans.factory.annotation.Value;import org.springframework.http.HttpEntity;import org.springframework.http.HttpHeaders;import org.springframework.http.HttpMethod;import org.springframework.http.MediaType;import org.springframework.stereotype.Component;import org.springframework.web.client.RestTemplate;public class ConnectClient {private final RestTemplate restTemplate = new RestTemplate();("${kafka.connect.url}")private String connectUrl;// 创建Connect连接器(源连接器/汇连接器)public JSONObject createConnector(String connectorName, JSONObject config) {String url = connectUrl + "/connectors";HttpHeaders headers = new HttpHeaders();headers.setContentType(MediaType.APPLICATION_JSON);// 构建请求体:包含连接器名称与配置JSONObject requestBody = new JSONObject();requestBody.put("name", connectorName);requestBody.put("config", config);HttpEntity<String> entity = new HttpEntity<>(requestBody.toString(), headers);return restTemplate.postForObject(url, entity, JSONObject.class);}// 查询指定连接器的状态public JSONObject getConnectorStatus(String connectorName) {String url = connectUrl + "/connectors/" + connectorName + "/status";return restTemplate.getForObject(url, JSONObject.class);}// 删除连接器public void deleteConnector(String connectorName) {String url = connectUrl + "/connectors/" + connectorName;restTemplate.delete(url);}}
orders表,将数据变更(新增 / 修改 / 删除)发送至 Kafka 主题:
package com.example.ordersync.connect;import com.alibaba.fastjson.JSONObject;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Service;public class MysqlSourceConnectorService {private ConnectClient connectClient;("${spring.datasource.order-db.url}")private String mysqlUrl;("${spring.datasource.order-db.username}")private String mysqlUsername;("${spring.datasource.order-db.password}")private String mysqlPassword;("${kafka.connect.worker.group-id}")private String workerGroupId;// 连接器名称(唯一标识)private static final String CONNECTOR_NAME = "mysql-order-source-connector";// 创建MySQL源连接器(捕获订单表变更)public JSONObject createMysqlSourceConnector() {JSONObject config = new JSONObject();// 1. 基础配置config.put("connector.class", "io.debezium.connector.mysql.MySqlConnector"); // Debezium MySQL连接器类config.put("tasks.max", 1); // 任务数(根据表数量调整)config.put("group.id", workerGroupId); // Worker组IDconfig.put("name", CONNECTOR_NAME);// 2. MySQL连接配置config.put("database.hostname", extractHostFromUrl(mysqlUrl)); // MySQL主机config.put("database.port", extractPortFromUrl(mysqlUrl)); // MySQL端口config.put("database.user", mysqlUsername); // MySQL用户名config.put("database.password", mysqlPassword); // MySQL密码config.put("database.dbname", extractDbNameFromUrl(mysqlUrl)); // 数据库名(order_db)config.put("database.server.name", "order-db-server"); // 逻辑服务器名(用于Kafka主题前缀)// 3. CDC配置(仅同步orders表,忽略其他表)config.put("table.include.list", "order_db.orders"); // 需同步的表(库名.表名)config.put("database.history.kafka.bootstrap.servers", "localhost:9092"); // Kafka集群地址config.put("database.history.kafka.topic", "order-db-history"); // 存储数据库结构变更的Kafka主题config.put("include.schema.changes", "false"); // 是否同步表结构变更(关闭)// 4. 数据格式配置(输出JSON格式,便于ES解析)config.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");config.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");config.put("key.converter.schemas.enable", "false"); // 关闭key的schema(简化数据)config.put("value.converter.schemas.enable", "false"); // 关闭value的schema// 调用Connect客户端创建连接器return connectClient.createConnector(CONNECTOR_NAME, config);}// 从MySQL URL中提取主机(如jdbc:mysql://localhost:3306/order_db → localhost)private String extractHostFromUrl(String url) {return url.split("//")[1].split(":")[0];}// 从MySQL URL中提取端口(如jdbc:mysql://localhost:3306/order_db → 3306)private int extractPortFromUrl(String url) {String portStr = url.split("//")[1].split(":")[1].split("/")[0];return Integer.parseInt(portStr);}// 从MySQL URL中提取数据库名(如jdbc:mysql://localhost:3306/order_db → order_db)private String extractDbNameFromUrl(String url) {return url.split("/")[3].split("\\?")[0];}}
order_index索引:
package com.example.ordersync.connect;import com.alibaba.fastjson.JSONObject;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Service;public class EsSinkConnectorService {private ConnectClient connectClient;private String esHost;private int esPort;private String esOrderIndex;private String esUsername;private String esPassword;private String workerGroupId;// 连接器名称(唯一标识)private static final String CONNECTOR_NAME = "es-order-sink-connector";// 创建Elasticsearch汇连接器(写入订单数据至ES)public JSONObject createEsSinkConnector() {JSONObject config = new JSONObject();// 1. 基础配置config.put("connector.class", "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"); // ES连接器类config.put("tasks.max", 1); // 任务数config.put("group.id", workerGroupId);config.put("name", CONNECTOR_NAME);// 2. Kafka主题配置(读取源连接器输出的主题:order-db-server.order_db.orders)config.put("topics", "order-db-server.order_db.orders"); // Kafka主题名(由源连接器的database.server.name + 库名 + 表名组成)// 3. Elasticsearch连接配置config.put("connection.url", "http://" + esHost + ":" + esPort); // ES连接地址config.put("connection.username", esUsername); // ES用户名config.put("connection.password", esPassword); // ES密码// 4. 数据写入配置config.put("index.name", esOrderIndex); // ES索引名config.put("key.ignore", "false"); // 不忽略Kafka的key(使用订单ID作为ES文档的ID,避免重复)config.put("schema.ignore", "true"); // 忽略schema(源连接器已关闭schema)// 5. 错误处理配置(重试3次,失败后记录到死信队列)config.put("retries", 3); // 重试次数config.put("deadletterqueue.topic.name", "order-sync-dead-letter-topic"); // 死信队列主题config.put("deadletterqueue.topic.replication.factor", 1); // 死信队列副本数// 调用Connect客户端创建连接器return connectClient.createConnector(CONNECTOR_NAME, config);}}
package com.example.ordersync.controller;import com.alibaba.fastjson.JSONObject;import com.example.ordersync.connect.EsSinkConnectorService;import com.example.ordersync.connect.MysqlSourceConnectorService;import com.example.ordersync.es.EsClient;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import org.springframework.web.bind.annotation.*;import java.util.Map;("/order/sync")public class OrderSyncController {private MysqlSourceConnectorService mysqlSourceService;private EsSinkConnectorService esSinkService;private EsClient esClient;("${elasticsearch.order-index}")private String esOrderIndex;// 1. 创建完整同步任务(源连接器+汇连接器)("/start")public JSONObject startSync() {JSONObject result = new JSONObject();try {// 先创建源连接器(MySQL → Kafka)JSONObject mysqlSourceResult = mysqlSourceService.createMysqlSourceConnector();// 再创建汇连接器(Kafka → Elasticsearch)JSONObject esSinkResult = esSinkService.createEsSinkConnector();result.put("code", 200);result.put("message", "订单同步任务启动成功");result.put("mysqlSourceConnector", mysqlSourceResult);result.put("esSinkConnector", esSinkResult);} catch (Exception e) {result.put("code", 500);result.put("message", "订单同步任务启动失败:" + e.getMessage());}return result;}// 2. 查询同步任务状态("/status")public JSONObject getSyncStatus( String connectorName) {JSONObject result = new JSONObject();try {JSONObject status = connectClient.getConnectorStatus(connectorName);result.put("code", 200);result.put("status", status);} catch (Exception e) {result.put("code", 500);result.put("message", "查询连接器状态失败:" + e.getMessage());}return result;}// 3. 测试数据同步(根据订单ID查询ES中的数据)("/test")public JSONObject testSync( Long orderId) {JSONObject result = new JSONObject();try {// 从ES中查询订单数据Map<String, Object> orderData = esClient.getDocument(esOrderIndex, orderId.toString());if (orderData != null) {result.put("code", 200);result.put("message", "数据同步成功");result.put("orderData", orderData);} else {result.put("code", 404);result.put("message", "ES中未查询到订单数据(可能同步未完成)");}} catch (Exception e) {result.put("code", 500);result.put("message", "测试同步失败:" + e.getMessage());}return result;}// 4. 停止同步任务(删除连接器)("/stop")public JSONObject stopSync( String connectorName) {JSONObject result = new JSONObject();try {connectClient.deleteConnector(connectorName);result.put("code", 200);result.put("message", "连接器[" + connectorName + "]已删除,同步任务停止");} catch (Exception e) {result.put("code", 500);result.put("message", "停止同步任务失败:" + e.getMessage());}return result;}}
POST
http://localhost:8080/order/sync/start{"code": 200,"message": "订单同步任务启动成功","mysqlSourceConnector": {"name": "mysql-order-source-connector","config": { ... },"tasks": [ ... ]},"esSinkConnector": {"name": "es-order-sink-connector","config": { ... },"tasks": [ ... ]}}
orders表中插入一条测试数据:
INSERT INTO orders (user_id, amount, status, create_time, update_time)VALUES (1001, 299.99, 'PAID', '2025-04-15 10:30:00', '2025-04-15 10:30:00');
GET
http://localhost:8080/order/sync/test?orderId=1{"code": 200,"message": "数据同步成功","orderData": {"id": 1,"user_id": 1001,"amount": 299.99,"status": "PAID","create_time": "2025-04-15 10:30:00","update_time": "2025-04-15 10:30:00"}}
-
基于 Connect 的官方连接器,无需自定义 binlog 解析与数据写入逻辑,同时保障数据不丢失、不重复;
-
通过 CDC 机制捕获 MySQL 数据变更,同步延迟秒级,满足业务对实时查询的需求;
-
Connect 集群支持水平扩展,且可通过 REST API 动态管理同步任务,便于运维。
8
交流技术 职位内推 行业探讨
9

