消息队列保证消息可靠,一般来说都可以从以下3点入手。
- 消息生产可靠
- 消息存储可靠
- 消息消费可靠
本次记录一下kafka如何保证消息可靠的。
1.消息生产可靠
首先生产者要保证消息发送到broker。
Kafka为生产者生产消息提供了一个 send(msg)
方法,另有一个重载的方法 send(msg, callback)
。
- send(msg)
该方法可以将一条消息发送出去,但是对发送出去的消息没有掌控能力,无法得知其最后是不是到达了Kafka,所以这是一种不可靠的发送方式。
- send(msg, callback)
该方法可以将一条消息发送出去,并且可以从callback回调中得到该条消息的发送结果,并且callback是异步回调,所以在兼具性能的情况下,也对消息具有比较好的掌控。
例如:
kafkaTemplate.send(TOPIC, JSONObject.toJSONString(data)).addCallback(
success -> {
//发送成功后的处理,目前看好像不需要做什么操作
log.info("send event success.");
},
failure -> {
//发送失败后的处理,先记录个日志
log.error("send event error.", failure);
});
其次还需要注意生产者的配置。
生产者配置例子:
public ProducerFactory<String, String> producerFactory(String bootstrapServers) {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.ACKS_CONFIG, "all");
configProps.put(ProducerConfig.RETRIES_CONFIG, "3");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
配置中有2个值需要注意:
- ProducerConfig.RETRIES_CONFIG
生产者发送失败后的重试次数。默认为0.
- ProducerConfig.ACKS_CONFIG
- acks=0,那么生产者将完全不会管服务器是否收到消息,该记录将立即添加到套接字缓冲区中并视为已发送,不能保证broker正常接受到消息,返回到偏移量始终等于-1。优点是吞吐量大,一般配合send(msg)使用。
- acks=1,当broker的leader接受到消息就会直接给客户端返回成功,不管是否同步到了follower。只有leader挂了同时数据还没有同步到follower才会导致数据丢失。
- acks=all或者-1,当broker的leader接受到消息并同步给指定数量的follower才会通知生产者消息发送成功。这种方式几乎不会丢失数据。同步到follower的数量在broker中配置。
2.消息存储可靠
为了保证高可用,broker都是以集群部署的。为了保证消息不丢失,需要将消息发送到多个broker节点中作为副本。
- min.insync.replicas 最小同步副本数
当acks=all时,写入的副本数就必须大于等于min.insync.replicas时才会认为消息发送成功。
如果达不到这个要求,生产者端会收到一个either NotEnoughReplicas or NotEnoughReplicasAfterAppend的异常。
这个数配置多少需要由实际的集群数量有关。
比如集群中有3个broker,那么replication.factor=3,我们至少设置min.insync.replicas=2即可。如果设为1则没有意义,因为leader中就包含1了。如果设置为3,那么当集群中down掉一个的时候,就永远达不到最小副本数,导致无法正常使用。
所以replication.factor应当大于min.insync.replicas才能保证整个系统可用。
- unclean.leader.election.enable
unclean.leader.election.enable=false代表数据落后较多的follower是否能在leader当机后被选举为新leader。
如果设置为true,那么可能会出现落后较多的broker节点被选举为leader,造成部分数据丢失。
关于broker如何安全的存放消息还有更多的内容,但是从一般使用来说注意这些配置就够了。
3.消息消费可靠
public KafkaListenerContainerFactory<?> walletKafkaListenerFactory(
@Value("${alarm.kafka.bootstrap-servers}") String bootstrapServers) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new
ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs(bootstrapServers)));
factory.setConcurrency(2);
factory.getContainerProperties().setPollTimeout(5000);
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
return factory;
}
private Map<String, Object> consumerConfigs(String bootstrapServers) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "nw");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 20);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
return props;
}
一般来说消费端的消息可靠最为重要,首先是消费者的配置,不论什么MQ一般为了防止消息丢失,我们都会关闭自动提交之类的功能,改为手动提交。例如kafka:
设置ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG=false,关闭自动提交位移
设置ackMode为MANUAL,即消费成功后手动确认。保证不会因为自动ack造成offset位移造成消息消费失败后还不知道。
除了配置,在消费消息的时候一定也要注意消费端的幂等。