大数跨境
0
0

面试官揪着问:如何保证MQ消息不丢失?重复消费如何保证幂等?

面试官揪着问:如何保证MQ消息不丢失?重复消费如何保证幂等? 码哥跳动
2025-11-10
0
导读:消息零丢失的三位一体架构本质上是对不确定性的系统化防御​

你好,见字如面。我是《Redis 高手心法》作者码哥,腾讯云架构师同盟深圳区理事会成员、InfoQ 签约作者,是一个手持菜刀砍电线,一路火花带闪电的靓仔。

面试官在面试候选人时,如果发现候选人的简历中写了在项目中使用了 MQ 技术(如 Kafka、RabbitMQ、RocketMQ),基本都会抛出一个问题:在使用 MQ 的时候,怎么确保消息 100% 不丢失?重复消费如何保证幂等?

这两个问题在实际工作中也很常见,既能考察你对 MQ 的掌握程度又能很好的判断是否有对应的实战经验。

本文将深入剖析消息丢失的本质原因,揭示 MQ 核心实现原理,并提供一套完整的 Java 实战解决方案。

消息传递的生命周期

如下图所示,阿斗被邀请去休闲养生 SPA 享受,服务包含泡脚、按摩、吃水果、看电视,玩真人 CS。

  • 生产者:休闲养生 SPA 系统,发送一条消息到 MQ。
  • MQ 消息队列:存储消息。
  • 消息消费者:享受泡脚技师帮泡脚、按摩技师肩背按摩、推油技师推背,同时吃水果看电视(估计是不会看电视了)。

此间乐不思蜀也……

消息的生命周期如下图所示。

你可以发现,从生产者发送消息,MQ 保存消息,消费者消费消息,每一个环节都有可能丢失消息。


各环节丢失概率统计

环节
故障概率
平均恢复时间
网络传输
0.1%-1%
秒级
内存存储
0.01%-0.1%
分钟级
磁盘故障
0.001%-0.01%
小时级
程序异常
0.1%-5%
分钟级

典型业务场景代价

  • 支付系统:单条消息丢失 ≈ 平均订单金额(如 1000 元)
  • 库存系统:1%消息丢失率 ≈10 倍超卖风险
  • 物流追踪:消息丢失率>0.1%≈ 客户投诉率提升 300%

消息生产者

当生产者往 MQ 中写数据时,以下场景会导致消息丢失:

  1. 网络闪断:发送过程中网络中断
  2. ACK 丢失:MQ 成功处理但确认丢失
  3. 发送超时:网络延迟导致超时误判
  4. 程序崩溃:处理中进程意外退出

生产者发送消息,主流消息队列都支持同步发送和异步发送。

如果使用同步发送,生产者发送消息后,会同步等待 Broker 返回的 ACK,收到 ACK 消息,就认为消息发送成功。如果长时间没有收到,则会认为消息发送失败,需要进行重试。

本地消息表 + 异步重试

消息发送的流程如下图所示,基于本地消息表 + 业务数据表构成本地事务。

通过消息一步发送并接受消息队列的 ACK 来更新消息表状态,若果未发送则继续重试发送,保证消息一定发送出去。

代码案例如下所示:

@Service
publicclass ReliableProducer {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Transactional
    public void createOrder(Order order) {
        // 1. 业务数据入库
        jdbcTemplate.update(
            "INSERT INTO orders(id, amount) VALUES(?, ?)",
            order.getId(), order.getAmount());

        // 2. 消息记录入库
        String msgId = UUID.randomUUID().toString();
        jdbcTemplate.update(
            "INSERT INTO message_log(msg_id, topic, message, status) VALUES(?, ?, ?, ?)",
            msgId, "orders", JsonUtil.toJson(order), 0); // 0-待发送

        // 事务提交后触发异步发送
        CompletableFuture.runAsync(() -> sendWithRetry(msgId));
    }

    // 这里其实可以使用 xxl-job 等分布式调度框架查询未发送成功的消息发送。
    private void sendWithRetry(String msgId) {
        MessageRecord msg = jdbcTemplate.queryForObject(
            "SELECT * FROM message_log WHERE msg_id = ?",
            new MessageRecordRowMapper(), msgId);

        int attempt = 0;
        while (attempt < MAX_RETRIES) {
            try {
                ListenableFuture<SendResult<String, String>> future =
                    kafkaTemplate.send(msg.getTopic(), msg.getMessage());

                future.addCallback(result -> {
                    // 更新发送状态
                    jdbcTemplate.update("UPDATE message_log SET status = 1 WHERE msg_id = ?", msgId);
                }, ex -> {
                    scheduleRetry(msgId, attempt); // 失败重试
                });

                return;
            } catch (Exception e) {
                scheduleRetry(msgId, attempt);
                attempt++;
            }
        }
    }

    private void scheduleRetry(String msgId, int attempt) {
        long delay = (long) Math.pow(2, attempt) * 1000// 指数退避
        scheduler.schedule(() -> sendWithRetry(msgId), delay, TimeUnit.MILLISECONDS);
    }
}

ACK 机制原理对比

MQ 类型
ACK 机制
可靠性
性能影响
Kafka
acks=0
最低
Kafka
acks=1
中等
Kafka
acks=all
最高
RabbitMQ
无确认
RabbitMQ
生产者确认
中等
RocketMQ
同步刷盘
最高

MQ 服务端:消息 100%存储原理

生产者发送消息成功,也不能保证消息绝对不丢失。因为即使消息发送到 Broker,如果在消费者拉取到消息之前,Broker 宕机了,消息还没有落盘,也会导致消息丢失。

kafka 存储架构剖析

  1. Producer(生产者):发送消息的一方,负责发布消息到 Kafka 主题(Topic)。

  2. Consumer(消费者):接受消息的一方,订阅主题并处理消息。Kafka 有 ConsumerGroup 的概念,每个 Consumer 只能消费所分配到的 Partition 的消息,每一个 Partition 只能被一个 ConsumerGroup 中的一个 Consumer 所消费,所以同一个 ConsumerGroup 中 Consumer 的数量如果超过了 Partiton 的数量,将会出现有些 Consumer 分配不到 partition 消费。

  3. Broker(代理):服务代理节点,Kafka 集群中的一台服务器就是一个 broker,可以水平无限扩展,同一个 Topic 的消息可以分布在多个 broker 中

  4. Topic(主题)与 Partition(分区) :Kafka 中的消息以 Topic 为单位进行划分,生产者将消息发送到特定的 Topic,而消费者负责订阅 Topic 的消息并进行消费。图中 TopicA 有三个 Partiton(TopicA-par0、TopicA-par1、TopicA-par2)

    为了提升整个集群的吞吐量,Topic 在物理上还可以细分多个 Partition,一个 Partition 在磁盘上对应一个文件夹。

  5. Replica(副本):副本,是 Kafka 保证数据高可用的方式,Kafka 同一 Partition 的数据可以在多 Broker 上存在多个副本,通常只有 leader 副本对外提供读写服务,当 leader 副本所在 broker 崩溃或发生网络一场,Kafka 会在 Controller 的管理下会重新选择新的 Leader 副本对外提供读写服务。

  6. ZooKeeper:管理 Kafka 集群的元数据和分布式协调。

同步刷盘

kafka 为了得到更高的性能和吞吐量,将数据异步批量的存储在磁盘中。

消息的刷盘过程,为了提高性能,减少刷盘次数,kafka 采用了批量刷盘的做法。即,按照一定的消息量,和时间间隔进行刷盘。

这种机制也是由于 linux 操作系统决定的。

将数据存储到 linux 操作系统种,会先存储到页缓存(Page cache)中,按照时间或者其他条件进行刷盘(从 page cache 到 file),或者通过 fsync 命令强制刷盘。

数据在 page cache 中时,如果系统挂掉,数据会丢失。

kafka 可靠性黄金配置

如图所示的 kafka 集群,一个 Broker 的 Topic 其中一个 partition 一共有三 副本(包含 Leader)。

试想一种情况:假如 leader 副本所在的 broker 突然挂掉,那么就要从 follower 副本重新选出一个 leader ,但是 leader 的数据还有一些没有被 follower 副本的同步的话,就会造成消息丢失。

解决办法就是我们设置 acks = all。acks 是 Kafka 生产者(Producer) 很重要的一个参数。

acks 的默认值即为 1,代表我们的消息被 leader 副本接收之后就算被成功发送。当我们配置 acks = all 代表则所有副本都要接收到该消息之后该消息才算真正成功被发送。

该场景的 Kafka Broker 黄金高可靠配置如下:

# Kafka配置示例
acks=all
min.insync.replicas=2 // 最小同步副本数
replication.factor=3  // 每个分区的 总副本数量(含 Leader)
unclean.leader.election.enable=false
log.flush.interval.messages=10000
log.flush.interval.ms=1000
  • acks=all:生产者要求所有 ISR(In-Sync Replicas)副本 都成功写入消息后才返回确认。
  • min.insync.replicas:定义 最小同步副本数,必须至少有 2 个副本处于同步状态(含 Leader)。
    • 当 replication.factor=3 且 min.insync.replicas=2 时:允许 1 个副本宕机(如 Broker 故障)、若 2 个副本不可用,则生产会被阻塞
  • replication.factor=3:每个分区的 总副本数量(含 Leader),为了保证整个 Kafka 服务的高可用性,你还需要确保 replication.factor > min.insync.replicas ,一般推荐设置成 replication.factor = min.insync.replicas + 1。
  • unclean.leader.election.enable=false:禁止 非同步副本(Out-of-Sync) 成为 Leader。若允许非同步副本成为 Leader,可能导致已提交数据被覆盖,金融场景必须设为 false
    • 我们最开始也说了我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。
    • 多个 follower 副本之间的消息同步情况不一样,当我们配置了 unclean.leader.election.enable = false 的话,当 leader 副本发生故障时就不会从 follower 副本中和 leader 同步程度达不到要求的副本中选择出 leader ,即只从 ISR 中选择 leader,这样降低了消息丢失的可能性。
  • log.flush.interval.messages=10000:每累积 10000 条消息 强制刷盘一次。
  • log.flush.interval.ms=1000:每 1000 毫秒(1 秒) 强制刷盘一次。

消费者保证 100% 处理原理

消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移(offset)。

偏移量(offset)表示 Consumer 当前消费到的 Partition(分区)的所在的位置。Kafka 通过偏移量(offset)可以保证消息在分区内的顺序性。

当消费者拉取到了分区的某个消息之后,消费者会自动提交了 offset。

自动提交的话会有一个问题,试想一下,当消费者刚拿到这个消息准备进行真正消费的时候,突然挂掉了,消息实际上并没有被消费,但是 offset 却被自动提交了。

解决办法也比较粗暴,我们手动关闭自动提交 offset,每次在真正消费完消息之后之后再自己手动提交 offset 。

这样会带来消息被重新消费的问题。比如你刚刚消费完消息之后,还没提交 offset,结果自己挂掉了,那么这个消息理论上就会被消费两次。

开启手动提交的时候消费端需要去保证幂等性。

幂等消费 + 死信队列

@Slf4j
@Component
publicclass ReliableConsumer {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    private OrderService orderService;

    @KafkaListener(topics = "orders")
    public void consume(ConsumerRecord<String, String> record) {
        String msgId = record.key();
        Order order = JsonUtil.fromJson(record.value(), Order.class);

        // 1. 幂等检查
        if (isProcessed(msgId)) {
            log.info("消息重复消费,已跳过: {}", msgId);
            return;
        }

        // 2. 获取分布式锁
        Lock lock = redisLockFactory.getLock("LOCK:" + msgId);
        if (!lock.tryLock(3, TimeUnit.SECONDS)) {
            thrownew ConcurrentAccessException("获取锁失败");
        }

        try {
            // 3. 二次幂等检查(防并发)
            if (isProcessed(msgId)) {
                return;
            }

            // 4. 业务处理
            orderService.processOrder(order);

            // 5. 记录处理状态(设置24小时过期)
            markProcessed(msgId);
        } catch (BusinessException e) {
            // 6. 业务异常处理
            handleFailure(record, e);
        } finally {
            lock.unlock();
        }
    }

    private boolean isProcessed(String msgId) {
        return"PROCESSED".equals(
            redisTemplate.opsForValue().get("MSG:" + msgId));
    }

    private void markProcessed(String msgId) {
        redisTemplate.opsForValue().set(
            "MSG:" + msgId, "PROCESSED"24, TimeUnit.HOURS);
    }

    private void handleFailure(ConsumerRecord<?, ?> record, Exception e) {
        // 失败计数
        int failCount = incrementFailCounter(record.key());

        if (failCount < 3) {
            thrownew RetryableException(e); // 触发重试
        } else {
            sendToDlq(record); // 转移死信队列
        }
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        // 配置批量ACK(性能与可靠性的平衡)
        factory.getContainerProperties().setAckMode(
            AckMode.BATCH);

        // 消费并发控制
        factory.setConcurrency(3);

        return factory;
    }
}

端到端保障:构建全链路防御体系

除了对生产者、MQ 中间件、消费端保证不丢失消息的处理手段,还可以对消息轨迹进行监控。

自动化对账系统实现代码案例。

@Service
@Slf4j
publicclass ReconciliationService {

    @Scheduled(cron = "0 0 2 * * ?"// 每天凌晨2点执行
    public void dailyReconciliation() {
        // 1. 生产端计数
        long produced = countProducerMessages();

        // 2. MQ端计数
        long stored = countMQMessages();

        // 3. 消费端计数
        long consumed = countConsumerMessages();

        // 4. 数据对比
        if (produced != stored) {
            handleLoss(produced - stored, "生产到MQ丢失");
        }

        if (stored != consumed) {
            handleLoss(stored - consumed, "MQ到消费丢失");
        }

        log.info("对账完成: 生产={}, MQ存储={}, 消费={}",
            produced, stored, consumed);
    }

    private void handleLoss(long lossCount, String stage) {
        log.error("消息丢失告警: 阶段={}, 数量={}", stage, lossCount);
        // 1. 通知运维团队
        alertService.notifyStaff(stage, lossCount);

        // 2. 自动恢复机制
        if (lossCount < 1000) {
            recoveryService.recoverFromBackup();
        } else {
            // 重大事故,启动紧急预案
            emergencyService.handleDisaster();
        }
    }
}

总结

消息零丢失的三位一体架构本质上是对不确定性的系统化防御

  1. 生产者防御:建立冗余记录(消息表)对抗网络不确定性
  2. 存储层防御:通过副本机制抵御物理故障
  3. 消费者防御:依靠幂等性消除重试副作用
  4. 监控层防御:用全局视角捕捉异常情况

在 Java 生态中,我们拥有强大的工具集实现这套防御:

  • Spring 事务管理:确保本地事务一致性
  • Kafka/RabbitMQ 客户端:提供精细化的 ACK 控制
  • Redis 分布式锁:实现高并发下的幂等控制

最后介绍我人生的第一本书《Redis 高手心法》本书基于 Redis 7.0 版本,复杂的概念与实际案例相结合,以简洁、诙谐、幽默的方式揭示了Redis的精髓。

本书完美契合你对一个具体技术学习的期望: Redis 核心原理、关键细节、应用场景以及如何取舍......

从 Redis 的第一人称视角出发,拟人故事化方式和诙谐幽默的言语与各路“神仙”对话,配合 158 张图,由浅入深循序渐进的讲解 Redis 的数据结构实现原理、开发技巧、运维技术和高阶使用,让人轻松愉快地学习。

如下图所示,上市后得到了许多读者的较好口碑评价,而且上过京东榜单!其中还有一些业界大佬、公司 CTO 的推荐。

现在优惠力度特别大,原价 100,推荐大家趁着这个机会,赶紧冲一波,拍下《Redis 高手心法》 秘籍,早日修炼天阶斗技。

往期推荐



21 张图 9千字详解Kafka为何支持每秒上百万的高并发写入?架构是怎样的?

从 12s 到 200ms,MySQL 两千万订单数据 6 种深度分页优化全解析

38 张图详解 Redis:核心架构、发布订阅机制、9大数据类型底层原理、RDB和AOF 持久化、高可用架构、性能问题排查和调优

阿里天猫二面:MySQL Double Write Buffer是什么?架构是怎样的?为什么能保证崩溃恢复...


【声明】内容源于网络
0
0
码哥跳动
《Redis 高手心法》作者,后端架构师,宗旨是拥抱技术和对象,面向人民币编程。
内容 510
粉丝 0
码哥跳动 《Redis 高手心法》作者,后端架构师,宗旨是拥抱技术和对象,面向人民币编程。
总阅读134
粉丝0
内容510