kafka如何保证消息可靠性 - ZhangTory's NoteBlog - 张耀誉的笔记博客

kafka如何保证消息可靠性

消息队列保证消息可靠,一般来说都可以从以下3点入手。

  1. 消息生产可靠
  2. 消息存储可靠
  3. 消息消费可靠

本次记录一下kafka如何保证消息可靠的。

1.消息生产可靠

首先生产者要保证消息发送到broker。

Kafka为生产者生产消息提供了一个 send(msg) 方法,另有一个重载的方法 send(msg, callback)

  • send(msg)

该方法可以将一条消息发送出去,但是对发送出去的消息没有掌控能力,无法得知其最后是不是到达了Kafka,所以这是一种不可靠的发送方式。

  • send(msg, callback)

该方法可以将一条消息发送出去,并且可以从callback回调中得到该条消息的发送结果,并且callback是异步回调,所以在兼具性能的情况下,也对消息具有比较好的掌控。
例如:

kafkaTemplate.send(TOPIC, JSONObject.toJSONString(data)).addCallback(
    success -> {
        //发送成功后的处理,目前看好像不需要做什么操作
        log.info("send event success.");
    },
    failure -> {
        //发送失败后的处理,先记录个日志
        log.error("send event error.", failure);
    });

其次还需要注意生产者的配置。

生产者配置例子:

public ProducerFactory<String, String> producerFactory(String bootstrapServers) {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    configProps.put(ProducerConfig.ACKS_CONFIG, "all");
    configProps.put(ProducerConfig.RETRIES_CONFIG, "3");
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

配置中有2个值需要注意:

  • ProducerConfig.RETRIES_CONFIG

生产者发送失败后的重试次数。默认为0.

  • ProducerConfig.ACKS_CONFIG
  1. acks=0,那么生产者将完全不会管服务器是否收到消息,该记录将立即添加到套接字缓冲区中并视为已发送,不能保证broker正常接受到消息,返回到偏移量始终等于-1。优点是吞吐量大,一般配合send(msg)使用。
  2. acks=1,当broker的leader接受到消息就会直接给客户端返回成功,不管是否同步到了follower。只有leader挂了同时数据还没有同步到follower才会导致数据丢失。
  3. acks=all或者-1,当broker的leader接受到消息并同步给指定数量的follower才会通知生产者消息发送成功。这种方式几乎不会丢失数据。同步到follower的数量在broker中配置。

2.消息存储可靠

为了保证高可用,broker都是以集群部署的。为了保证消息不丢失,需要将消息发送到多个broker节点中作为副本。

  • min.insync.replicas 最小同步副本数

当acks=all时,写入的副本数就必须大于等于min.insync.replicas时才会认为消息发送成功。
如果达不到这个要求,生产者端会收到一个either NotEnoughReplicas or NotEnoughReplicasAfterAppend的异常。
这个数配置多少需要由实际的集群数量有关。
比如集群中有3个broker,那么replication.factor=3,我们至少设置min.insync.replicas=2即可。如果设为1则没有意义,因为leader中就包含1了。如果设置为3,那么当集群中down掉一个的时候,就永远达不到最小副本数,导致无法正常使用。
所以replication.factor应当大于min.insync.replicas才能保证整个系统可用。

  • unclean.leader.election.enable

unclean.leader.election.enable=false代表数据落后较多的follower是否能在leader当机后被选举为新leader。
如果设置为true,那么可能会出现落后较多的broker节点被选举为leader,造成部分数据丢失。

关于broker如何安全的存放消息还有更多的内容,但是从一般使用来说注意这些配置就够了。

3.消息消费可靠

public KafkaListenerContainerFactory<?> walletKafkaListenerFactory(
        @Value("${alarm.kafka.bootstrap-servers}") String bootstrapServers) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new
            ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs(bootstrapServers)));
    factory.setConcurrency(2);
    factory.getContainerProperties().setPollTimeout(5000);
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
    return factory;
}

private Map<String, Object> consumerConfigs(String bootstrapServers) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "nw");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 20);
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
    return props;
}

一般来说消费端的消息可靠最为重要,首先是消费者的配置,不论什么MQ一般为了防止消息丢失,我们都会关闭自动提交之类的功能,改为手动提交。例如kafka:
设置ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG=false,关闭自动提交位移
设置ackMode为MANUAL,即消费成功后手动确认。保证不会因为自动ack造成offset位移造成消息消费失败后还不知道。

除了配置,在消费消息的时候一定也要注意消费端的幂等。

添加新评论

电子邮件地址不会被公开,评论内容可能需要管理员审核后显示。