大数跨境
0
0

数据库同步实践(SUISHEN-CDC)

数据库同步实践(SUISHEN-CDC) 微鲤技术团队
2024-05-27
2
导读:本文介绍了数据库同步的实践流程。







01


需求背景



 需要将业务数据库的数据,向数仓同步,目前包括两种数据库:mongo、mysql。






02



总体实现方案


1、总体流程



  • a、定时任务,加载数据库事件偏移信息,统一监听数据库变更事件;

  • b、本地缓存收集事件变更信息(一定的数据量、一定的收集时间);

  • c、收集达到阈值后,向消息队列发送消息;

  • d、发送消息成功后,使用redis记录最后一条消息的偏移信息;

  • e、消息队列批量读取事件,批量向数仓同步数据。


 2、mongo实现方案


mongo-java-driver库,提供了Change Streams API来监听和获取实时变更(change)事件。通过Change Streams,可以监视集合中的插入、更新和删除等操作,并对这些变更事件做出响应。


@Testpublic void tst() {    // 获取指定数据库连接    MongoDatabase database = mongoTemplate.getDb().getMongoClient().getDatabase("database");    // 过滤需要监听的表    Document matchStage = new Document("$match", new Document("ns.coll", new Document("$in",            Arrays.asList("colletion", "label"))));    // 开启监听    ChangeStreamIterable<Document> changeStream = database            .watch(Arrays.asList(matchStage)).fullDocument(FullDocument.UPDATE_LOOKUP)            // 设置事件偏移信息            .resumeAfter(BsonDocument.parse("{\"_data\": \"8265AB99800000000129295A10048EBDB1DB4C23440CAD9BD906E9098378463C5F6964003C3134313334000004\"}"));
for (ChangeStreamDocument<Document> document : changeStream) { // 根据操作类型进行相应的操作 if (document.getOperationType() == OperationType.INSERT) { // 处理插入操作 System.out.println(JSON.toJSONString(document.getFullDocument())); } else if (document.getOperationType() == OperationType.UPDATE) { // 处理更新操作 System.out.println(JSON.toJSONString(document.getFullDocument())); } else if (document.getOperationType() == OperationType.DELETE) { // 处理删除操作 System.out.println(JSON.toJSONString(document.getDocumentKey())); } // 获取偏移信息 BsonDocument resumeToken = document.getResumeToken();
System.out.println(resumeToken.toJson()); }}



注意点

  • mongo的changeStream支持订阅指定某些表,但是如果后续要新增监听的表,会导致就的偏移信息不可用,所以建议监听整个库,由业务对不需要处理的表过滤;

  • 一些表存在过期索引,对于这种数据库自动过期的数据变更是否需要处理,业务也要自行处理。

  • 对于偏移的更新,除了业务关心的数据变更事件以外,其余的事件也需要及时的更新偏移信息,避免重启后读取的数据量过大。


3、mysql实现方案

mysql-binlog-connector-java库可以连接到MySQL服务器并订阅binlog事件,监听和解析MySQL的二进制日志(binlog)。

public void tst() throws InterruptedException, IOException {    // 连接mysql    BinaryLogClient client = new BinaryLogClient("127.0.0.1", 3306,            "root", "password");    // 设置偏移信息    client.setBinlogFilename("mysql-bin.062888");    client.setBinlogPosition(6080);
client.registerEventListener(event -> { System.out.println(JSON.toJSONString(event)); EventData data = event.getData(); if (data instanceof WriteRowsEventData) { WriteRowsEventData writeRowsEventData = (WriteRowsEventData) data; System.out.println("Insert operation: " + JSON.toJSONString(writeRowsEventData.getRows())); // TODO: 处理插入操作 } else if (data instanceof UpdateRowsEventData) { UpdateRowsEventData updateRowsEventData = (UpdateRowsEventData) data; System.out.println("Update operation: " + JSON.toJSONString(updateRowsEventData.getRows()));
// TODO: 处理更新操作 } else if (data instanceof DeleteRowsEventData) { DeleteRowsEventData deleteRowsEventData = (DeleteRowsEventData) data; System.out.println("Delete operation: " + JSON.toJSONString(deleteRowsEventData.getRows())); // TODO: 处理删除操作 } else if (event.getHeader().getEventType() == EventType.TABLE_MAP) { // 处理表信息 TableMapEventData eventData = event.getData(); System.out.println("Database name: " + eventData.getDatabase()); System.out.println("Table name: " + eventData.getTable()); } long binlogPosition = client.getBinlogPosition(); System.out.println(client.getBinlogFilename()); System.out.println(binlogPosition); });
client.connect();
}


注意点

  • 对同一数据库监听时,尽量指定serverId,一个serverId同一时间只能有一个客户端监听;

  • 无法单独订阅某一个库、某一个表和某一个事件,需要在handle处理中自行过滤;

  • 单独的事件中不包含语句所执行的库和表信息,只有一个tableId,需要监听TABLE_MAP事件,缓存tableId和具体表的映射,在具体的执行语句中通过此映射找到具体的表信息;

  • mysql对binlog有定期清理策略,需要注意binlog的缓存时间,避免重启时无法找到对应的binlog文件;

  • 对于偏移的处理,除了业务关心的数据变更事件以外,其余的事件也需要及时的更新偏移信息,避免重启后读取的数据量过大。

  • 注意mysql用户权限CREATE USER 'userxxx'@'%' IDENTIFIED BY 'Qwe123!!!';
    GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON . TO 'userxxx' IDENTIFIED BY 'Qwe123!!!';
    FLUSH PRIVILEGES;





03



suishen-cdc核心实现



CdcDataDeque 事件本地缓存队列

这是一个线程安全的单向链表,提供的添加节点、查询头节点及重置头节点方法。

OffsetStorage 偏移量存储器

用户保存和读取数据库的偏移信息,默认提供了基于redis保存的RedisOffsetStorage存储器

DataProcessor 事件处理器

默认提供了基于suishen-queue消息队列的SuishenQueueDataProcessor实现。

当使用queue消息队列的SuishenQueueDataProcessor时,可以通过实现CdcDataOperator完成对具体的队列事件消费,默认提供了WeryaiCdcDataOperator向weryAi服务同步。

AbstractSynchronizer 数据库同步器

  • 继承Runnalbe接口,用于启动具体的监听任务;

  • 继承DisposableBean接口,用户停止监听任务;

  • 通过定时任务,指定时间内批量处理事件。

  • 当前提供了mongo同步的MongoSynchronizer同步器和mysql同步的MysqlSynchronizer同步器实现

public abstract class AbstractSynchronizer implements Runnable, DisposableBean {    // 本地缓存队列    private final CdcDataDeque<CdcData> queue = new CdcDataDeque<>();    // 偏移量存储器    protected final OffsetStorage offsetStorage;    // 事件处理器    protected final DataProcessor dataProcessor;
protected AbstractSynchronizer(OffsetStorage offsetStorage, DataProcessor dataProcessor) { this.offsetStorage = offsetStorage; this.dataProcessor = dataProcessor; }
protected void handler(CdcData data) { log.info("cdc AbstractSynchronizer:{}", JSON.toJSONString(data)); queue.put(data); }
protected void handler(String offset) { if (StringUtils.isEmpty(offset)) { return; } log.info("cdc AbstractSynchronizer offset:{}", offset); queue.put(new CdcData().setOffset(offset)); }
/** * 定时任务,每5s消费一次 */ @PostConstruct public void consume() { ThreadPoolTaskScheduler scheduledThreadPoolExecutor = new ThreadPoolTaskScheduler(); scheduledThreadPoolExecutor.setPoolSize(2); scheduledThreadPoolExecutor.initialize();
scheduledThreadPoolExecutor.execute(this); scheduledThreadPoolExecutor.scheduleWithFixedDelay(() -> { CdcDataDeque.Node<CdcData> first = queue.get(); if (first == null) { return; } String offset; List<CdcData> list = Lists.newArrayList(first.getItem()); while (true) { CdcDataDeque.Node<CdcData> next = first.getNext(); if (Objects.isNull(next)) { break; } CdcData item = next.getItem(); if (StringUtils.isNotEmpty(item.getId())) { list.add(item); } offset = item.getOffset(); // 限制一次性处理数据量 if (list.size() >= 90) { // 发送数据 dataProcessor.handle(list); // 保存偏移 offsetStorage.setOffset(offset); // 重置头节点 queue.resetFirst(next); list = Lists.newArrayList(); } first = next; } if (CollectionUtils.isNotEmpty(list)) { // 发送数据 dataProcessor.handle(list); } // 保存偏移 offsetStorage.setOffset(first.getItem().getOffset()); // 重置头节点 queue.resetFirst(first); // 释放对象 list = null; first = null; }, 5000); }}




作者 | 郑亚腾 资深服务端开发工程师

本文来自微鲤技术团队,转载请注明出处。

【声明】内容源于网络
0
0
微鲤技术团队
践行数据驱动理念,相信技术改变世界。
内容 25
粉丝 0
微鲤技术团队 践行数据驱动理念,相信技术改变世界。
总阅读42
粉丝0
内容25