
作 者 | 杜沁园(悬衡)
导语:引入消息队列可以帮助我们解耦业务逻辑,提升性能,让主链路更加清晰。但是消息链路的代码腐化和一致性问题也给业务带来了很多困扰,本文阐述了钉钉审批消息链路重构的设计和解决方案。注:Metaq 是阿里 RocketMQ 消息队列的内网版本。
概述
引入消息队列可以帮助我们解耦业务逻辑,提升性能,让主链路更加清晰。Metaq 也确实可靠,重试机制能够保障足够的一致性。
钉钉审批将审批中的关键事件,比如审批单发起,任务开始,任务结束以及审批单结束等等作为消息发布出去,将审批的主体流程和周边业务清晰地分开。
但是经过数年的产品迭代,周边业务越来越多,消息链路越来越复杂(详见 “消息链路的美好幻想与残酷现实” 章节):
1.不同的业务逻辑堆砌在一起互相影响,单个消息处理方法就能有上千行代码。
2.因为逻辑太复杂无法实现幂等,只能放弃重试,不一致问题严重。每个业务都必须额外开发复杂的对账,补偿机制才能避免客诉。
钉钉审批每分钟都会数十万条消息的发出,从监控可以看出平均每分钟都会有百条消息失败(有的时候会有千条),失败率大约 0.5%。

通过本文的最佳实践,将能实现:
1.将堆砌在一起的逻辑拆分成一个个业务 Listener 类(详见 “朴素的拆分想法” 章节);
2.当有业务 Listener 失败时,可以实现失败业务精准重试,而不是粗暴地全部重试(详见 “精准重试” 章节);
3.高性能地构建 Listener 的统一上下文,降低读扩散,并且避免其随着迭代腐化(详解 “统一上下文” 章节);
4.最后,本文的实践不需要额外的存储,也不需要建立额外的 Consumer,原来的基础设施可以直接复用;
通过下图监控可以看出,经过本文方案的治理,消息链路每天只有非常零星的失败(约几十个),而审批每天要发送两亿以上的消息,失败率只有约 0.000005%。

图中只有顶点处存在失败,其他都是 Sunfire 的自动连线,不存在失败。
消息链路的美好幻想与残酷现实
美好幻想
消息队列在设计之初就给业务规划好了一条康庄大道:
-
主业务链路作为 Producer 发出消息 -
数十个甚至更多 Consumer 订阅该消息,分别执行自己快速且幂等的原子逻辑

-
逻辑相互影响,修改风险高;
-
链路脆弱,容易中断,一个调用失败,后续所有逻辑将不会执行;
-
没有重试:大泥球无法做到原子和幂等,整体重试代价太大,所以直接异步执行放弃重试 ;
-
消息队列引以为傲的重试功能反而会成为故障的温床,导致雪崩。
为什么不直接把大泥球拆分成前面的多个 Consumer 呢?这确实也是一种方案,但是对于大泥球 Consumer,可能会拆出几十个 Consumer,这会导致非常严重的读扩散。举个例子,审批单发起的消息中只含有审批单的 id,内容需要从数据库反查,原本在“大泥球”中,只需要查询一次就复用,而拆分后可能要多查几十次。这还只是众多扩散问题的其中一个,如果为了治理大泥球,却加重了扩散问题,就得不偿失了。
public interface ProcessEventListener extends EventListener {
/**
* 审批单发起事件
*/
void onProcessInstanceStart(InstanceEventContext instanceEventContext);
/**
* 审批单结束事件
*/
void onProcessInstanceFinish(InstanceEventContext instanceEventContext);
/**
* 审批任务生成事件
*/
void onTaskActivated(TaskEventContext taskEventContext);
// 省略其他事件
// ...
}

// 接到审批单发起消息
InstanceEventContext instanceEventContext = buildContext();
for (handler : handlers) {
try {
handler.onProcessInstanceStart(instanceEventContext);
} catch (Exception e) {
// 打印监控日志等等
// ...
}
}
-
某个处理器因为网络超时失败了,如何重试?:我们仅仅是将逻辑拆开了,执行的时候还是一串 “大泥球”,如果仅仅依靠消息队列本身机制,要重试只能一起重试,这显然无法满足诉求;
-
如何高性能地构建庞大的统一上下文(即 Context 参数):为了满足众多处理器对数据查询的诉求,需要提供庞大的上下文,除了性能风险外,也是代码腐败的温床
-
Ignore:非重要业务,失败就算了,不需要重试;
-
Concurrent:在另外线程池中并发执行,也不会重试。适合一些容易影响后续执行的长耗时的处理器;
-
Retry Now:立即重试。会将任务放到本地的一个延迟队列中,100~500ms 后重试。适合时效性比较强的处理器;
-
Retry Later:重投消息,精确重试失败的处理器,遵从 Metaq 的重投延迟,前三次重试分别是 1s 5s 10s ,因此不适合时效性强的处理器;
为了方便使用,我们将这些策略做成了注解的形式,比如审批抄送时效性没这么强,可以使用 Retry Later 策略:
public class CcListener implements ProcessEventListener {
// Retry Later 策略, 最多重试两次
@Policy(value = PolicyType.RETRY_LATER, retry = 2)
@Override
public void onProcessInstanceStart(InstanceEventContext instanceEventContext) {
//... 逻辑省略
}
}
public class SyncTodoTaskListener implements ProcessEventListener {
// Retry Now 策略, 最多重试三次
@Policy(value = PolicyType.RETRY_NOW, retry = 3)
@Override
public void onProcessInstanceStart(InstanceEventContext instanceEventContext) {
//... 逻辑省略
}
}
前三种策略都比较好理解,就不多说了。下文重点讨论最后一种策略是如何实现的。
{
// 总体重试次数, 第一次重试(第 0 次代表正常执行)
"globalCnt": 1,
// 每个处理器的执行状态
"cntMap": {
// handler1 第 1 次重试, 读取该属性可以判断 handler1 是否还有重试机会
"handler1": 1,
// -1 表示 handler2 已经执行成功, 不需要再执行
"handler2": -1,
// -2 表示 handler3 已经彻底执行失败(一般是超过了设置的最大重试次数), 不需要再执行
"handler3": -2
}
}
Message message = new Message();
// 重投到专门的重试主题
message.setTopic("my-retry-topic");
// 消息体保持不变
message.setBody(preBody);
// nextCnt 是重试的次数
// 设置 DelayTimeLevel 能够让重投有一定的延时
message.setDelayTimeLevel(nextCnt);
// 将本次执行状态存储到 user property 中
message.putUserProperty("RETRY_STORE", "{\"globalCnt\":1,\"cntMap\":{\"handler1\":1,\"handler2\":-1,\"handler3\":-2}}");
// 发送消息
mqPublishService.send(message);

图中的重投都是指往 topic 再发一遍消息,而不是 Metaq 自身的重投机制。
-
为了满足所有处理器的需求,上下文往往会很庞大,因此构建性能差。
-
外部无法感知处理器内部需要使用上下文的哪些字段,只能一股脑地将所有字段都填充好,传递进去,而且内部很有可能一个字段都不使用,白白损耗了性能。
-
上下文中存在一些幽灵字段,在某个处理器中设置进去,又在某几个处理器中读取,也就是它有时候为 null,有时候又有值,维护难度巨大,从中取个值都要战战兢兢。
-
读扩散问题:每个处理器都去读相同的数据,导致链路数十倍的读扩散。
public class User {
// 用户 id
private Long uid;
// 用户的部门,为了保持示例简单,这里就用普通的字符串
// 需要远程调用 通讯录系统 获得
private final Lazy<String> department;
public User(Long uid, Lazy<String> department) {
this.uid = uid;
this.department = department;
}
public Long getUid() {
return uid;
}
public String getDepartment() {
return department.get();
}
}
// 构建 User 实体
Long uid = 1L;
User user = new User(uid, Lazy.of(() -> departmentService.getDepartment(uid)));
// 使用 User 实体,部门属性用起来和普通属性一样
user.getDepartment();
Lazy 框架的具体实现可以参考另一篇文章 利用惰性写出高性能且抽象的代码。


// 通过用户获得部门
Lazy<String> departmentLazy = Lazy.of(() -> departmentService.getDepartment(uid));
// 通过部门获得主管
// department -> supervisor
Lazy<Long> supervisorLazy = departmentLazy.map(
department -> SupervisorService.getSupervisor(department)
);
-
审批单实例上下文 -
审批单发起消息 -
审批单结束消息 -
审批单撤销消息 -
...
-
活动上下文 -
活动开始消息 -
活动结束消息 -
...
-
任务上下文 -
任务开始消息 -
任务结束消息 -
任务取消消息 -
...

-
rebalance 导致重试次数归 0; -
消费者执行超时,重试次数从 3 开始继续重投;
private boolean isDuplicateMessage(Message msg) {
try {
Integer consumedCount = ltairManager
.incr("dingflow_mq_consume_" + msg.getMsgId() + "_" + msg.getReconsumeTimes(), 1, 30);
if (consumedCount != null && consumedCount > 1) {
return true;
}
} catch (Throwable throwable) {
// 打印错误日志
// ...
}
return false;
}

如图,OpenEventCallbackListener 处理器因为 NPE 问题,失败量突然大幅度增加。
-
将消息链路拆分成多个处理器;
-
利用 Metaq 的 UserProperty 存储每次处理器的执行状态,精准重试失败的处理器;
-
用懒加载机制构建统一上下文,提升构建性能,降低读扩散,最终设计出最符合业务的上下文;

