消息队列保证消息可靠,一般来说都可以从以下3点入手。
- 消息生产可靠
- 消息存储可靠
- 消息消费可靠
本次记录一下RabbitMQ如何保证消息可靠的。
1.消息生产可靠
RabbitMQ的3种确认模式:
/**
* The type of publisher confirms to use.
*/
public enum ConfirmType {
/**
* Use {@code RabbitTemplate#waitForConfirms()} (or {@code waitForConfirmsOrDie()}
* within scoped operations.
*/
SIMPLE,
/**
* Use with {@code CorrelationData} to correlate confirmations with sent
* messsages.
*/
CORRELATED,
/**
* Publisher confirms are disabled (default).
*/
NONE
}
设置发布者的confirmType:
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
在rabbitTemplate中实现、设置ConfirmCallback方法:
rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean isSuccess, String cause) -> {
if (!isSuccess) {
log.warn("send message to mq failed, correlationData: [{}], reason: [{}]", correlationData, cause);
}
});
当程序发送消息后就会收到回调,我们可以在回调函数中手动控制成功或失败的处理。
2. 消息存储可靠
首先有可能在broker中消息不能正确路由到对应的队列中,默认情况mq会将消息丢弃。
我们可以在rabbitTemplate中设置开启监听回调:
rabbitTemplate.setMandatory(true);
并实现ReturnCallback方法:
rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKey) -> {
log.warn("cannot reach specified queue, message: [{}], replyCode: [{}], replyText: [{}], exchange: [{}], routingKey: [{}]",
message, replyCode, replyText, exchange, routingKey);
});
其次还需要考虑消息的持久化。
3. 消息消费可靠
RabbitMQ消费者确认分三种: 自动确认AUTO(默认) 和 手动确认MANUAL 和 不确认NONE
自动确认: 消息发送到消费者后即认为发送完成,不论消费者是否已经成功消费或后续消费出异常。
手动确认: 在消费者业务流程处理完成后,手动确认,才认为消息被成功消费。
自动确认下,如果消费异常,消息可能会丢失,所以需要在代码中捕获异常并记录消息,方便后续补偿。或者实现RabbitListenerErrorHandler接口,捕获@RabbitListener注解中发生的异常:
@RabbitListener(
bindings = @QueueBinding(
exchange = @Exchange(value = MqExchangeConstant.RECHARGE_SUCCESS_EXCHANGE, type = BaseConstant.FANOUT),
value = @Queue(value = MqQueueConstant.DIVERSION_VIP_SUCCESS_FANOUT_QUEUE)
),
errorHandler = "rabbitListenerErrorHandler"
)
@Bean("rabbitListenerErrorHandler")
public RabbitListenerErrorHandler handle() {
return (amqpMessage, message, exception) -> {
String messageContent = message.getPayload().toString();
log.info("error message content: [{}]", messageContent);
Long mqConsumeDetailId = JSON.parseObject(messageContent).getLong(TRACEABLE_MQ_CONSUME_DETAIL_ID);
if (mqConsumeDetailId != null) {
if (exception.getCause() != null) {
mqConsumeDetailDao.update(mqConsumeDetailId, MqConsumeDetailStatusEnum.FAILED.getStatus(), exception.getCause().getMessage());
} else {
mqConsumeDetailDao.update(mqConsumeDetailId, MqConsumeDetailStatusEnum.FAILED.getStatus(), null);
}
}
return null;
};
}
手动确认下,如果消费者异常,没有确认消息,那么消息会重新排队,等待下次消费,需要处理好幂等。当然也会出现失败-排队-失败的死循环中,可以设置重试次数,如果到达重试次数,则会丢弃消息:
#开启重试
spring.rabbitmq.listener.simple.retry.enabled=true
#默认重试的次数为3
spring.rabbitmq.listener.simple.retry.max-attempts=5
如果有多个消费者,自动确认会平均分配消息,手动确认会按照消费者消费的速度分配消息。