共计 1664 个字符,预计需要花费 5 分钟才能阅读完成。
消费者确认机制
为了确认消费者是否成功处理消息,RabbitMQ 提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向 RabbitMQ 发送一个回执,告知 RabbitMQ 自己消息处理状态。回执有三种可选值:
ack:成功处理消息,RabbitMQ 从队列中删除该消息
nack:消息处理失败,RabbitMQ 需要再次投递消息
reject:消息处理失败并拒绝该消息,RabbitMQ 从队列中删除该消息
一般 reject 方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过 try catch
机制捕获,消息处理成功时返回 ack,处理失败时返回 nack.
由于消息回执的处理代码比较统一,因此 SpringAMQP 帮我们实现了消息确认。并允许我们通过配置文件设置 ACK 处理方式,有三种模式:
none
:不处理。即消息投递给消费者后立刻 ack,消息会立刻从 MQ 删除。非常不安全,不建议使用
manual
: 手动模式。需要自己在业务代码中调用 api,发送 ack
或reject
,存在业务入侵,但更灵活
auto:自动模式。SpringAMQP 利用 AOP 对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack
. 当业务出现异常时,根据异常判断返回不同结果:
如果是 业务异常,会自动返回nack
;
如果是 消息处理或校验异常,自动返回reject
;
通过下面的配置可以修改 SpringAMQP 的 ACK 处理方式:
java
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none # 不做处理
修改 consumer 服务的 SpringRabbitListener 类中的方法,模拟一个消息处理的异常:
java
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {log.info("spring 消费者接收到消息:【" + msg + "】");
if (true) {throw new MessageConversionException("故意的");
}
log.info("消息处理完成");
}
测试可以发现:当消息处理发生异常时,消息依然被 RabbitMQ 删除了。
重复机制
当消费者出现异常后,消息会不断 requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次 requeue 到队列,再次投递,直到消息处理成功为止。
极端情况就是消费者一直无法执行成功,那么消息 requeue 就会无限循环,导致 mq 的消息处理飙升,带来不必要的压力,为了应对上述情况 Spring 又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的 requeue 到 mq 队列。
修改 consumer 服务的 application.yml 文件,添加内容:
XML
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初识的失败等待时长为 1 秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true 无状态;false 有状态。如果业务中包含事务,这里改为 false
重启 consumer 服务,重复之前的测试。消费者在失败后消息没有重新回到 MQ 无限重新投递,而是在本地重试了 3 次,本地重试 3 次以后,抛出了 AmqpRejectAndDontRequeueException
异常。查看 RabbitMQ 控制台,发现消息被删除了,说明最后 SpringAMQP 返回的是reject。
文章来源: RabbitMQ 消费者确认和重复机制