Kafka 消息机制

Kafka 消息丢失情况分析

消息生产端

Kafka Producer 消息发送(producer.type)有两种,syncasync,默认是同步sync,而如果设置成异步async,生产者可以以 batch 的形式批量 push 数据,这样可以减少网络和磁盘 IO 的次数,以少量的延迟换取 broker 的吞吐量,但同时也增加了丢失数据的风险。producer.type=async下,相对应的几个参数

  • queue.buffering.max.ms(default:5000) 启用异步模式时,producer 缓存消息的时间。比如我们设置成 1000 时,它会缓存 1s 的数据再一次发送出去,这样可以极大的增加 broker 吞吐量,但也会造成时效性的降低。
  • queue.buffering.max.messages(default:10000) 启用异步模式时,producer 缓存队列里最大缓存的消息数量,如果超过这个值,producer 就会阻塞或者丢掉消息。
  • queue.enqueue.timeout.ms(default:-1) 当达到上面参数时 producer 会阻塞等待的时间。
    • 0:buffer队列满时 producer 不会阻塞,消息直接被丢掉。
    • -1:producer 会被阻塞直到队列有空间为止,不会丢消息。
  • batch.num.messages() 启用异步模式时,一个batch缓存的消息数量。达到这个数值时,producer 才会发送消息,即每次批量发送的数量。

Kafka Producer 消息确认(acks)有三种:

  • acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won’t generally know of any failures). The offset given back for each record will always be set to -1.
  • acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost.
  • acks=all This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting.

简而言之,ack=0不进行消息接受成功的确认,ack=1leader写入成功后返回确认,ack=allleaderfollower都写入成功后返回确认。

总结一下在消息发送时丢失会丢失数据的情况

  • request.required.acks不为-1的情况下,根本不确认(acks=0)有可能会丢失数据,只保证写入leader成功的话(acks=1),在异步同步 follower 之前,刚好 leader 挂了,也会丢数据。
  • producer.type=async,当queue.buffering.max.messages满了,并且queue.enqueue.timeout.ms不为-1,可能会立即丢掉,或者在等待超时后丢掉数据

消息消费端

Kafka 消息消费有两种 Consumer API

  • Low-Level,消费者自己维护 offset,实现对 Kafka 的完全控制
  • High-Level,封装了 partition 和 offset 的管理,使用简单,根据 Kafka 版本不同存储在不同的地方,zookeeper 或者 kafka 自身。

如果使用High-level API,并且enable.auto.commit=true,当消费者从集群中把消息取出来,并 commit 了新的消息 offset 值后,还没来得及消费就挂掉了,那么下次再消费时之前没消费成功的消息就“诡异”的消失了,这种情况下可以关闭自动提交enable.auto.commit=false,在确认数据被处理完成之后,在手动 commit offset。

Kafka 数据一致性的保证

在上边的参数都保证消息不会丢失,数据持久性的情况下,Kafka 对于节点间数据一致性的保证是基于至少有一个节点在保持同步状态,一旦分区上的所有备份节点都挂了,就无法保证了。当分区所有备份都挂了,并且 leader 也挂了,为了保证可用性,Kafka 有两种实现方式

  • 等待一个ISR的副本重新恢复正常服务,并选择这个副本作为 leader,但是ISR中的 follower 并不一定是和 leader 完全一致的,只是一致的可能性比较高,取决于HW(High Watermark)LEO(Log End Offset)LEO > HW的 follower 会加入ISR
  • 选择第一个重新恢复正常的 replica(不一定是ISR中的) 作为 leader

这也是一致性和可用性之间的简单妥协,Kafka 默认采用第二种策略,当ISR中 replica 所在的节点全挂掉的时候,会选择一个非ISR集合中的 replica,不过可以通过unclean.leader.election.enable=false禁用此策略,从而使用第一种策略,死等。这种可用性和一致性的权衡,存在于任何 quorum-based 规则中,例如在多数投票算法中,如果大多数服务器永久性的挂了,要么就当数据100%丢失,要么就违背数据的一致性原则,选择一个存活的节点作为数据可信的来源。

Kafka 消费事务 - Exactly Once

Kafka 消息顺序保证

Reference

qin

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码支持
扫码打赏

打开支付宝扫一扫,即可进行扫码打赏哦