
01
—
需求背景
需要将业务数据库的数据,向数仓同步,目前包括两种数据库:mongo、mysql。
02
—
总体实现方案
1、总体流程

a、定时任务,加载数据库事件偏移信息,统一监听数据库变更事件;
b、本地缓存收集事件变更信息(一定的数据量、一定的收集时间);
c、收集达到阈值后,向消息队列发送消息;
d、发送消息成功后,使用redis记录最后一条消息的偏移信息;
e、消息队列批量读取事件,批量向数仓同步数据。
2、mongo实现方案
mongo-java-driver库,提供了Change Streams API来监听和获取实时变更(change)事件。通过Change Streams,可以监视集合中的插入、更新和删除等操作,并对这些变更事件做出响应。
@Testpublic void tst() {// 获取指定数据库连接MongoDatabase database = mongoTemplate.getDb().getMongoClient().getDatabase("database");// 过滤需要监听的表Document matchStage = new Document("$match", new Document("ns.coll", new Document("$in",Arrays.asList("colletion", "label"))));// 开启监听ChangeStreamIterable<Document> changeStream = database.watch(Arrays.asList(matchStage)).fullDocument(FullDocument.UPDATE_LOOKUP)// 设置事件偏移信息.resumeAfter(BsonDocument.parse("{\"_data\": \"8265AB99800000000129295A10048EBDB1DB4C23440CAD9BD906E9098378463C5F6964003C3134313334000004\"}"));for (ChangeStreamDocument<Document> document : changeStream) {// 根据操作类型进行相应的操作if (document.getOperationType() == OperationType.INSERT) {// 处理插入操作System.out.println(JSON.toJSONString(document.getFullDocument()));} else if (document.getOperationType() == OperationType.UPDATE) {// 处理更新操作System.out.println(JSON.toJSONString(document.getFullDocument()));} else if (document.getOperationType() == OperationType.DELETE) {// 处理删除操作System.out.println(JSON.toJSONString(document.getDocumentKey()));}// 获取偏移信息BsonDocument resumeToken = document.getResumeToken();System.out.println(resumeToken.toJson());}}
注意点
mongo的changeStream支持订阅指定某些表,但是如果后续要新增监听的表,会导致就的偏移信息不可用,所以建议监听整个库,由业务对不需要处理的表过滤;
一些表存在过期索引,对于这种数据库自动过期的数据变更是否需要处理,业务也要自行处理。
对于偏移的更新,除了业务关心的数据变更事件以外,其余的事件也需要及时的更新偏移信息,避免重启后读取的数据量过大。
3、mysql实现方案
mysql-binlog-connector-java库可以连接到MySQL服务器并订阅binlog事件,监听和解析MySQL的二进制日志(binlog)。
public void tst() throws InterruptedException, IOException {// 连接mysqlBinaryLogClient client = new BinaryLogClient("127.0.0.1", 3306,"root", "password");// 设置偏移信息client.setBinlogFilename("mysql-bin.062888");client.setBinlogPosition(6080);client.registerEventListener(event -> {System.out.println(JSON.toJSONString(event));EventData data = event.getData();if (data instanceof WriteRowsEventData) {WriteRowsEventData writeRowsEventData = (WriteRowsEventData) data;System.out.println("Insert operation: " + JSON.toJSONString(writeRowsEventData.getRows()));// TODO: 处理插入操作} else if (data instanceof UpdateRowsEventData) {UpdateRowsEventData updateRowsEventData = (UpdateRowsEventData) data;System.out.println("Update operation: " + JSON.toJSONString(updateRowsEventData.getRows()));// TODO: 处理更新操作} else if (data instanceof DeleteRowsEventData) {DeleteRowsEventData deleteRowsEventData = (DeleteRowsEventData) data;System.out.println("Delete operation: " + JSON.toJSONString(deleteRowsEventData.getRows()));// TODO: 处理删除操作} else if (event.getHeader().getEventType() == EventType.TABLE_MAP) {// 处理表信息TableMapEventData eventData = event.getData();System.out.println("Database name: " + eventData.getDatabase());System.out.println("Table name: " + eventData.getTable());}long binlogPosition = client.getBinlogPosition();System.out.println(client.getBinlogFilename());System.out.println(binlogPosition);});client.connect();}
注意点
对同一数据库监听时,尽量指定serverId,一个serverId同一时间只能有一个客户端监听;
无法单独订阅某一个库、某一个表和某一个事件,需要在handle处理中自行过滤;
单独的事件中不包含语句所执行的库和表信息,只有一个tableId,需要监听TABLE_MAP事件,缓存tableId和具体表的映射,在具体的执行语句中通过此映射找到具体的表信息;
mysql对binlog有定期清理策略,需要注意binlog的缓存时间,避免重启时无法找到对应的binlog文件;
对于偏移的处理,除了业务关心的数据变更事件以外,其余的事件也需要及时的更新偏移信息,避免重启后读取的数据量过大。
注意mysql用户权限CREATE USER 'userxxx'@'%' IDENTIFIED BY 'Qwe123!!!';
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON . TO 'userxxx' IDENTIFIED BY 'Qwe123!!!';
FLUSH PRIVILEGES;
03
—
suishen-cdc核心实现
CdcDataDeque 事件本地缓存队列
这是一个线程安全的单向链表,提供的添加节点、查询头节点及重置头节点方法。
OffsetStorage 偏移量存储器
用户保存和读取数据库的偏移信息,默认提供了基于redis保存的RedisOffsetStorage存储器
DataProcessor 事件处理器
默认提供了基于suishen-queue消息队列的SuishenQueueDataProcessor实现。
当使用queue消息队列的SuishenQueueDataProcessor时,可以通过实现CdcDataOperator完成对具体的队列事件消费,默认提供了WeryaiCdcDataOperator向weryAi服务同步。
AbstractSynchronizer 数据库同步器
继承Runnalbe接口,用于启动具体的监听任务;
继承DisposableBean接口,用户停止监听任务;
通过定时任务,指定时间内批量处理事件。
当前提供了mongo同步的MongoSynchronizer同步器和mysql同步的MysqlSynchronizer同步器实现
public abstract class AbstractSynchronizer implements Runnable, DisposableBean {// 本地缓存队列private final CdcDataDeque<CdcData> queue = new CdcDataDeque<>();// 偏移量存储器protected final OffsetStorage offsetStorage;// 事件处理器protected final DataProcessor dataProcessor;protected AbstractSynchronizer(OffsetStorage offsetStorage, DataProcessor dataProcessor) {this.offsetStorage = offsetStorage;this.dataProcessor = dataProcessor;}protected void handler(CdcData data) {log.info("cdc AbstractSynchronizer:{}", JSON.toJSONString(data));queue.put(data);}protected void handler(String offset) {if (StringUtils.isEmpty(offset)) {return;}log.info("cdc AbstractSynchronizer offset:{}", offset);queue.put(new CdcData().setOffset(offset));}/*** 定时任务,每5s消费一次*/@PostConstructpublic void consume() {ThreadPoolTaskScheduler scheduledThreadPoolExecutor = new ThreadPoolTaskScheduler();scheduledThreadPoolExecutor.setPoolSize(2);scheduledThreadPoolExecutor.initialize();scheduledThreadPoolExecutor.execute(this);scheduledThreadPoolExecutor.scheduleWithFixedDelay(() -> {CdcDataDeque.Node<CdcData> first = queue.get();if (first == null) {return;}String offset;List<CdcData> list = Lists.newArrayList(first.getItem());while (true) {CdcDataDeque.Node<CdcData> next = first.getNext();if (Objects.isNull(next)) {break;}CdcData item = next.getItem();if (StringUtils.isNotEmpty(item.getId())) {list.add(item);}offset = item.getOffset();// 限制一次性处理数据量if (list.size() >= 90) {// 发送数据dataProcessor.handle(list);// 保存偏移offsetStorage.setOffset(offset);// 重置头节点queue.resetFirst(next);list = Lists.newArrayList();}first = next;}if (CollectionUtils.isNotEmpty(list)) {// 发送数据dataProcessor.handle(list);}// 保存偏移offsetStorage.setOffset(first.getItem().getOffset());// 重置头节点queue.resetFirst(first);// 释放对象list = null;first = null;}, 5000);}}
作者 | 郑亚腾 资深服务端开发工程师

