环境:SpringBoot2.7.16 + RabbitMQ3.8.35
1. 简介
RabbitMQ 是一个开源的消息代理和队列服务器,用于通过轻量级和可靠的消息传递,在服务器之间进行通信。在Spring Boot项目中我们一般都是通过@RabbitListener进行消息监听。可以通过配置消息监听器并发数来提高系统的消息处理能力。
在实际应用中,根据业务场景的不同,我们可能需要动态调整 RabbitMQ 消息监听的并发数。例如,当RabbitMQ消息积压过多时,这时候我们就可以考虑通过动态调整并发数,以提高消息处理速度;而在系统自身负载过高时,这时候可以考虑通过减少并发数来减轻系统的整体压力。本篇文章将通过具体的示例来展示如何调整运行中消息监听处理器的并发数。
注意:动态调整并发监听数还可以帮助我们更好地控制系统的稳定性和可靠性。通过实时监测系统的负载情况和消息处理速度,我们可以及时发现潜在的问题并进行调整,从而确保系统的正常运行。
2. 实战案例
依赖管理
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2.1 配置管理
spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestvirtualHost: testpublisherConfirmType: correlatedpublisherReturns: truelistener:simple:# 手动应答acknowledgeMode: manualconcurrency: 2max-concurrency: 2
2.3 创建交换机及队列
通过管理界面创建交换机及队列。
交换机名:test.exchange类型为topic
队列名: test
将交换机与队列进行绑定路由key:akf.#
2.4 消息队列准备消息
通过如下接口,先往队列中插入100条消息
@Resourceprivate RabbitTemplate rabbitTemplate ;@GetMapping("/send")public String send() {new Thread(() -> {for (int i = 0; i < 100; i++) {rabbitTemplate.convertAndSend("test.exchange", "akf.a", "message - " + i) ;}}).start() ;return "success" ;}

2.5 消息监听器
@RabbitListener(queues = "test")public void listener1(String message) {System.out.printf("%s - 接收到消息:%s%n", Thread.currentThread().getName(), message) ;try {TimeUnit.SECONDS.sleep(2) ;} catch (InterruptedException e) {}}
2.6 测试
测试上面的消息监听器是正常的

2.7 调整并发数
在上一步的测试中我们发现控制台打印的始终是一个线程在执行消息处理。但是在一开始的配置文件中我们将concurrency属性设置的为2,起码这里应该是2个线程交替执行才对,这是为什么呢?
Spring监听RabbitMQ的消息时默认并不是一条一条的从RabbitMQ中去,是一次预期一批数据,这一批消费完后才进行下一批的获取,默认预期250条。而我们向队列中存入的数据才100条,所以控制台中你只能看到一个线程打印,因为你没有足够的消息供其它线程去获取处理。我们可以通过如下配置进行预期数的设置:
spring:rabbitmq:listener:simple:prefetch: 5
重新启动服务,测试如下

2个线程交替执行;接下来该如何实现动态调整并发数呢?
首先,修改消息监听器配置
@RabbitListener(id = "test-queue", queues = "test", ackMode = "AUTO")public void listener1(String message) {// ...}
id: 这里最好是设置唯一的id值,我们是要通过该id值来获取当前队列的消息监听容器。
ackMode: AUTO 这里设置的应答模式,用来覆盖配置文件中的设置。
其次,通过RabbitListenerEndpointRegistry操作
@Resourceprivate RabbitListenerEndpointRegistry registry ;@GetMapping("/modify/{count}")public Object modify(@PathVariable("count") Integer count) {// 这里通过id获取对应的队列监听器;所以上面一定要定义唯一的id值MessageListenerContainer listenerContainer = registry.getListenerContainer("test-queue") ;if (listenerContainer instanceof SimpleMessageListenerContainer container) {container.setConcurrentConsumers(count) ;}return String.format("并发接收消息:%d%n", count) ;}
最后,测试
首先将服务启动,控制输出如下(当前只有2个线程处理)

目前只有2个线程
调用上面的接口修改并发数为3个后,控制台输出

成功增加了一个消费者线程。
接下来再测试,如果修改的数量大于最大数(spring.rabbitmq.listener.simple.max-concurrency)

控制台抛出如下异常

不能超过最大数;再看看调小是否可以

可以动态调小。
我们也可以对消息监听器进行暂停消费和重新启动消息监听,这里就不在演示了,非常简单调用相应start/stop即可。
总结:在 Spring Boot 中动态调整 RabbitMQ 消息监听的并发数是一个重要的优化手段。通过合理设置并发数并根据系统负载情况进行动态调整,我们可以提高消息处理效率、节省系统资源、确保系统的稳定性和可靠性。在实际应用中,我们应该根据具体的业务场景和需求来选择合适的并发数调整策略,以达到最佳的性能和效果。
以上是本篇文章的全部内容,如对你有帮助帮忙点赞+转发+收藏
推荐文章
强大!Spring Cloud网关Gateway新特性及高级开发技巧
SpringBoot3.2开始Controller参数验证@Validated新功能
SpringBoot自带Controller接口监控,赶紧用起来



