RabbitMQ如何保证消息可靠性 - ZhangTory's NoteBlog - 张耀誉的笔记博客

RabbitMQ如何保证消息可靠性

消息队列保证消息可靠,一般来说都可以从以下3点入手。

  1. 消息生产可靠
  2. 消息存储可靠
  3. 消息消费可靠

本次记录一下RabbitMQ如何保证消息可靠的。

1.消息生产可靠

RabbitMQ的3种确认模式:

/**
 * The type of publisher confirms to use.
 */
public enum ConfirmType {
   /**
    * Use {@code RabbitTemplate#waitForConfirms()} (or {@code waitForConfirmsOrDie()}
    * within scoped operations.
    */
   SIMPLE,
   /**
    * Use with {@code CorrelationData} to correlate confirmations with sent
    * messsages.
    */
   CORRELATED,
   /**
    * Publisher confirms are disabled (default).
    */
   NONE
}

设置发布者的confirmType:

connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);

在rabbitTemplate中实现、设置ConfirmCallback方法:

rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean isSuccess, String cause) -> {
    if (!isSuccess) {
        log.warn("send message to mq failed, correlationData: [{}], reason: [{}]", correlationData, cause);
    }
});

当程序发送消息后就会收到回调,我们可以在回调函数中手动控制成功或失败的处理。

2. 消息存储可靠

首先有可能在broker中消息不能正确路由到对应的队列中,默认情况mq会将消息丢弃。
我们可以在rabbitTemplate中设置开启监听回调:

rabbitTemplate.setMandatory(true);

并实现ReturnCallback方法:

rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKey) -> {
    log.warn("cannot reach specified queue, message: [{}], replyCode: [{}], replyText: [{}], exchange: [{}], routingKey: [{}]",
            message, replyCode, replyText, exchange, routingKey);
});

其次还需要考虑消息的持久化。

3. 消息消费可靠

RabbitMQ消费者确认分三种: 自动确认AUTO(默认) 和 手动确认MANUAL 和 不确认NONE
自动确认: 消息发送到消费者后即认为发送完成,不论消费者是否已经成功消费或后续消费出异常。
手动确认: 在消费者业务流程处理完成后,手动确认,才认为消息被成功消费。

自动确认下,如果消费异常,消息可能会丢失,所以需要在代码中捕获异常并记录消息,方便后续补偿。或者实现RabbitListenerErrorHandler接口,捕获@RabbitListener注解中发生的异常:

@RabbitListener(
        bindings = @QueueBinding(
                exchange = @Exchange(value = MqExchangeConstant.RECHARGE_SUCCESS_EXCHANGE, type = BaseConstant.FANOUT),
                value = @Queue(value = MqQueueConstant.DIVERSION_VIP_SUCCESS_FANOUT_QUEUE)
        ),
        errorHandler = "rabbitListenerErrorHandler"
)


@Bean("rabbitListenerErrorHandler")
public RabbitListenerErrorHandler handle() {
    return (amqpMessage, message, exception) -> {
        String messageContent = message.getPayload().toString();
        log.info("error message content: [{}]", messageContent);
        Long mqConsumeDetailId = JSON.parseObject(messageContent).getLong(TRACEABLE_MQ_CONSUME_DETAIL_ID);
        if (mqConsumeDetailId != null) {
            if (exception.getCause() != null) {
                mqConsumeDetailDao.update(mqConsumeDetailId, MqConsumeDetailStatusEnum.FAILED.getStatus(), exception.getCause().getMessage());
            } else {
                mqConsumeDetailDao.update(mqConsumeDetailId, MqConsumeDetailStatusEnum.FAILED.getStatus(), null);
            }
        }
        return null;
    };
}

手动确认下,如果消费者异常,没有确认消息,那么消息会重新排队,等待下次消费,需要处理好幂等。当然也会出现失败-排队-失败的死循环中,可以设置重试次数,如果到达重试次数,则会丢弃消息:

#开启重试
spring.rabbitmq.listener.simple.retry.enabled=true
#默认重试的次数为3
spring.rabbitmq.listener.simple.retry.max-attempts=5

如果有多个消费者,自动确认会平均分配消息,手动确认会按照消费者消费的速度分配消息。

添加新评论

电子邮件地址不会被公开,评论内容可能需要管理员审核后显示。