Kafka进阶-基本原理

集群架构图

kafka_cluster_architecture

使用场景

  • 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。

  • 消息系统:解耦和生产者和消费者、缓存消息等。

  • 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。

  • 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。

  • 流式处理:比如spark streaming和storm

  • 事件源

如何做到高吞吐、低延迟

Kafka写数据的大致方式:先写操作系统的页缓存(Page Cache),然后由操作系统自行决定何时刷到磁盘

因此 Kafka 达到高吞吐、低延迟的原因主要有以下4点:

  • 页缓存是在内存中分配的,所以消息写入的速度很快。

  • Kafka不必和底层的文件系统进行交互,所有繁琐的I/O操作都由操作系统来处理

  • Kafka采用追加写的方式,避免了磁盘随机写操作

  • 使用以Sendfile为代表的零拷贝技术提高了读取数据的效率。

PS: 使用页缓存而非堆内存还有一个好处,就是当Kafka broker的进程崩溃时,堆内存的数据会丢失,但是页缓存的数据依然存在,重启Kafka broker后可以继续提供服务。

Producer工作流程

序列化消息 && 计算partition

根据key和value的配置对消息进行序列化,然后计算partition:

  • ProducerRecord对象中如果指定了partition,就使用这个partition;

  • 否则根据key和topic的partition数目取余;

  • 如果key也没有的话就随机生成一个counter,使用这个counter来和partition数目取余。这个counter每次使用的时候递增。

发送到batch && 唤醒Sender线程

根据topic-partition获取对应的batchs(Dueue<ProducerBatch>),然后将消息append到batch中. 如果有batch满了则唤醒Sender线程。队列的操作是加锁执行,所以batch内消息是有序的,后续的Sender操作为异步操作。

Sender把消息有序发到broker(tp replia leader)

确定tp relica leader 所在的broker

  • Kafka中每台broker都保存了kafka集群的metadata信息,metadata信息里包括了每个topic的所有partition的信息: leader, leader_epoch, controller_epoch, isr, replicas等; Kafka客户端从任一broker都可以获取到需要的metadata信息; sender线程通过metadata信息可以知道tp leader的brokerId

  • producer也保存了metada信息,同时根据metadata更新策略(定期更新metadata.max.age.ms、失效检测,强制更新):检查到metadata失效以后,调用metadata.requestUpdate()强制更新

幂等性发送
为实现Producer的幂等性,Kafka引入了Producer ID(即PID)和Sequence Number对于每个PID,该Producer发送消息的每个<Topic, Partition>都对应一个单调递增的Sequence Number。同样,Broker端也会为每个<PID, Topic, Partition>维护一个序号,并且每Commit一条消息时将其对应序号递增。对于接收的每条消息,如果其序号比Broker维护的序号)大一,则Broker会接受它,否则将其丢弃:

  • 如果消息序号比Broker维护的序号差值比一大,说明中间有数据尚未写入,即乱序,此时Broker拒绝该消息,Producer抛出InvalidSequenceNumber

  • 如果消息序号小于等于Broker维护的序号,说明该消息已被保存,即为重复消息,Broker直接丢弃该消息,Producer抛出DuplicateSequenceNumber

Sender处理broker发来的produce response

一旦broker处理完Sender的produce请求,就会发送produce response给Sender,此时producer将执行我们为send()设置的回调函数。至此producer的send执行完毕。

Consumer工作流程

Poll消息

  • 消费者通过fetch线程拉消息(单线程)

  • 消费者通过心跳线程来与broker发送心跳。超时会认为挂掉

  • 每个consumer groupbroker上都有一个coordnator来管理,消费者加入和退出,以及消费消息的位移都由coordnator处理。

位移管理

consumer的消息位移代表了当前group对topic-partition的消费进度,consumer宕机重启后可以继续从该offset开始消费。

在kafka0.8之前,位移信息存放在zookeeper上,由于zookeeper不适合高并发的读写,新版本Kafka把位移信息当成消息,发往 __consumers_offsets 这个 topic 所在的 broker,__consumers_offsets 默认有50个分区。
消息的key 是 groupId+topic_partition, value 是offset.
-w887

Kafka Group状态

  • Empty:初始状态,Group 没有任何成员,如果所有的 offsets 都过期的话就会变成 Dead

  • PreparingRebalance:Group 正在准备进行 Rebalance

  • AwaitingSync:Group 正在等待 group leader 的分配方案

  • Stable:稳定的状态(Group is stable);

  • Dead: Group 内已经没有成员,并且它的 Metadata 已经被移除

重平衡Reblance

当一些原因导致consumer对partition消费不再均匀时,kafka 会自动执行rebalance,使得consumer对partition的消费再次平衡。

什么时候发生rebalance?:

  • 组订阅topic数变更

  • topic partition数变更

  • consumer成员变更

Reblance过程

  • 举例1 consumer被检测为崩溃引起的rebalance
    比如心跳线程在 timeout 时间内没和 broker 发送心跳,此时 coordinator 认为该group应该进行rebalance。接下来其他consumer发来fetch请求后,coordinator将回复他们进行rebalance通知。当consumer成员收到请求后,只有leader会根据分配策略进行分配,然后把各自的分配结果返回给coordinator。 这个时候只有consumer leader返回的是实质数据,其他返回的都为空。收到分配方法后,coordinator将会把分配策略同步给各consumer.

  • 举例2 consumer加入引起的rebalance

    • 使用 join 协议,表示有consumer 要加入到group中

    • 使用 sync 协议,根据分配规则进行分配

Rebalance机制存在的问题

在大型系统中,一个topic可能对应数百个consumer实例。

  • 这些consumer陆续加入到一个空消费组将导致多次的rebalance;

  • 此外consumer 实例启动的时间不可控,很有可能超出coordinator确定的rebalance timeout(即max.poll.interval.ms),将会再次触发rebalance,而每次rebalance的代价又相当地大,因为很多状态都需要在rebalance前被持久化,而在rebalance后被重新初始化

    新版本改进

    通过延迟进入 PreparingRebalance 状态减少 rebalance 次数

    新版本新增了 group.initial.rebalance.delay.ms 参数。空消费组接受到成员加入请求时,不立即转化到 PreparingRebalance 状态来开启rebalance。当时间超过group.initial.rebalance.delay.ms后,再把group状态改为PreparingRebalance(开启rebalance)

    实现机制是在coordinator底层新增一个group状态:InitialRebalance。假设此时有多个consumer陆续启动,那么group状态先转化为 InitialRebalance,待group.initial.rebalance.delay.ms 时间后,再转换为PreparingRebalance(开启rebalance).

消息传输一致

Kafka提供3种消息传输一致性语义:最多1次,最少1次,恰好1次。

  • at most once: 消费者fetch消息, 然后保存offset,然后处理消息; 当client保存offset之后,但是在消息处理过程中consumer进程失效(crash), 导致部分消息未能继续处理.那么此后可能其他consumer会接管,但是因为offset已经提前保存,那么新的consumer将不能fetch到offset之前的消息(尽管它们尚没有被处理), 这就是”at most once”.** 可能会出现数据丢失情况;**

  • at least once: 消费者fetch消息, 然后处理消息, 然后保存offset. 如果消息处理成功之后, 但是在保存offset阶段zookeeper异常或者consumer失效,导致保存offset操作未能执行成功, 这就导致接下来再次fetch时可能获得上次已经处理过的消息,这就是”at least once”.可能会重传数据,有可能出现数据被重复处理的情况;

  • exactly once:并不是指真正只传输1次,只不过有一个机制。确保不会出现“数据被重复处理”和“数据丢失”的情况。消费者的场景中可以采取以下方案来得到“恰好1次”的一致性语义:

    最少1次 + 消费者的输出中额外增加已处理消息最大编号:由于已处理消息最大编号的存在,不会出现重复处理消息的情况

Broker设计原理

Broker 是Kafka 集群中的节点。负责处理生产者发送过来的消息,消费者消费的请求。以及集群节点的管理等。

broker消息存储

  • Kafka的消息以二进制的方式紧凑地存储,节省了很大空间

  • 此外消息存在 ByteBuffer 而不是堆,这样broker进程挂掉时,数据不会丢失,同时避免了gc问题

  • 通过零拷贝和顺序寻址,让消息存储和读取速度都非常快

  • 处理fetch请求的时候通过 zero-copy 加快速度

broker状态数据

broker设计中,每台机器都保存了相同的状态数据。主要包括以下:

  • Controller所在的broker ID,即保存了当前集群中controller是哪台broker;

  • 集群中所有broker的信息:比如每台broker的ID、机架信息以及配置的若干组连接信息

  • 集群中所有节点的信息:严格来说,它和上一个有些重复,不过此项是按照broker ID监听器类型进行分组的。 对于超大集群来说,使用这一项缓存可以快速地定位和查找给定节点信息,而无需遍历上一项中的内容,算是一个优化吧

  • 集群中所有分区的信息:所谓分区信息指的是分区的leader、ISR和AR信息以及当前处于offline状态的副本集合。 这部分数据按照 topic-partitionID 进行分组,可以快速地查找到每个分区的当前状态。(注:AR表示assigned replicas,即创建topic时为该分区分配的副本集合)

broker负载均衡

  • 分区数量负载:各台broker的partition数量应该均匀
    partition Replica分配算法如下:

    • 将所有Broker(假设共n个Broker)和待分配的Partition排序;

    • 将第i个Partition分配到第(i mod n)个Broker上

    • 将第i个Partition的第j个Replica分配到第((i + j) mod n)个Broker上

  • 容量大小负载:每台broker的硬盘占用大小应该均匀
    在kafka1.1之前,Kafka能够保证各台broker上partition数量均匀,但由于每个partition内的消息数不同,可能存在不同硬盘之间内存占用差异大的情况。在Kafka1.1中增加了副本跨路径迁移功能 kafka-reassign-partitions.sh,我们可以结合它和监控系统,实现自动化的负载均衡

Kafaka重要参数

  • acks

    producer收到多少broker的答复才算真的发送成功
    acks = 0 : 不接收发送结果
    acks = all 或者 -1: 表示发送消息时,不仅要写入本地日志,还要等待所有副本写入成功。
    acks = 1: 写入本地日志即可,是上述二者的折衷方案,也是默认值。

  • retries

    默认为 0,即不重试,立即失败。
    一个大于 0 的值,表示重试次数。

  • buffer.memory

    指定 producer 端用于缓存消息的缓冲区的大小,默认 32M;
    适当提升该参数值,可以增加一定的吞吐量, 但是batch太大会增大延迟,可搭配linger_ms参数使用

  • linger_ms

    如果batch太大,或者producer qps不高,batch添加的会很慢,我们可以强制在linger_ms时间后发送batch数据

  • batch.size

    producer 会将发送分区的多条数据封装在一个 batch 中进行发送,这里的参数指的就是 batch 的大小。
    该参数值过小的话,会降低吞吐量,过大的话,会带来较大的内存压力。
    默认为 16K,建议合理增加该值。

丢失数据的场景及解决方案

consumer端

不是严格意义的丢失,其实只是漏消费了。
设置了 auto.commit.enable=true ,当 consumer fetch 了一些数据但还没有完全处理掉的时候,刚好到 commit interval 触发了提交 offset 操作,接着 consumer 挂掉。这时已经fetch的数据还没有处理完成但已经被commit掉,因此没有机会再次被处理,数据丢失。

解决方案:
enable.auto.commit=false 关闭自动提交位移,在消息被完整处理之后再手动提交位移

producer端

I/O 线程发送消息之前,producer 崩溃, 则 producer 的内存缓冲区的数据将丢失

解决方案:

  • 同步发送,性能差,不推荐。

  • 仍然异步发送,通过“无消息丢失配置”(来自胡夕的《Apache Kafka 实战》)极大降低丢失的可能性:

    • block.on.buffer.full = true 尽管该参数在0.9.0.0已经被标记为“deprecated”,但鉴于它的含义非常直观,所以这里还是显式设置它为true,使得producer将一直等待缓冲区直至其变为可用。否则如果producer生产速度过快耗尽了缓冲区,producer将抛出异常

    • acks=all 很好理解,所有follower都响应了才认为消息提交成功,即”committed”

    • retries = MAX 无限重试,直到你意识到出现了问题:)

    • max.in.flight.requests.per.connection = 1 限制客户端在单个连接上能够发送的未响应请求的个数。设置此值是1表示kafka broker在响应请求之前client不能再向同一个broker发送请求。注意:设置此参数是为了避免消息乱序

    • 使用KafkaProducer.send(record, callback)而不是send(record)方法 自定义回调逻辑处理消息发送失败

    • callback逻辑中最好显式关闭producer:close(0) 注意:设置此参数是为了避免消息乱序

    • unclean.leader.election.enable=false 关闭unclean leader选举,即不允许非ISR中的副本被选举为leader,以避免数据丢失

    • replication.factor >= 3 这个完全是个人建议了,参考了Hadoop及业界通用的三备份原则

    • min.insync.replicas > 1 消息至少要被写入到这么多副本才算成功,也是提升数据持久性的一个参数。与acks配合使用

    • 保证replication.factor > min.insync.replicas 如果两者相等,当一个副本挂掉了分区也就没法正常工作了。通常设置replication.factor = min.insync.replicas + 1 即可

如何选择Partiton的数量

  • 在创建 Topic 的时候可以指定 Partiton 数量,也可以在创建完后手动修改。但 Partiton 数量只能增加不能减少。中途增加 Partiton,partition里面的message不会重新进行分配,原来的partition里面的message数据不会变,新加的这个partition刚开始是空的,随后进入这个topic的message就会重新参与所有partition的load balance。

  • Partition 的数量直接决定了该 Topic 的并发处理能力。但也并不是越多越好。Partition 的数量对消息延迟性会产生影响

  • 一般建议选择 Broker Num * Consumer Num,这样平均每个 Consumer 会同时读取 Broker 数目个 Partition , 这些 Partition 压力可以平摊到每台 Broker 上

controller的职责

在 kafka 集群中,某个 broker 会被选举承担特殊的角色,即控制器(controller),用于管理和协调 kafka 集群,具体职责如下:

  • 管理副本和分区的状态

  • 更新集群元数据信息

  • 创建、删除 topic

  • 分区重分配

  • leader 副本选举

  • topic 分区扩展

  • broker 加入、退出集群

  • 受控关闭

  • controller leader选举

节点异常情形

leader挂了(leader failover)

当 leader 挂了之后,controller 默认会从 ISR 中选择一个 replica 作为 leader 继续工作,条件是新 leader 必须有挂掉 leader 的所有数据

如果为了系统的可用性,而容忍降低数据的一致性的话,可以将 unclean.leader.election.enable = true,开启 kafka 的“脏 leader 选举”。当 ISR 中没有 replica,则选一个幸存的replica作为leader 继续响应请求,如此操作提高了 Kafka 的分区容忍度,但是数据一致性降低了。

broker挂了(broker failover)

broker上面有很多 partition 和多个 leader 。因此至少需要处理如下内容:

  • 更新该 broker 上所有 follower 的状态

  • 重新给 leader 在该 broker 上的 partition 选举 leader

  • 选举完成后,要更新 partition 的状态,比如谁是 leader 等

kafka 集群启动后,所有的 broker 都会被 controller 监控,一旦有 broker 宕机,ZK 的监听机制会通知到 controller, controller 拿到挂掉 broker 中所有的 partition,以及它上面的存在的 leader,然后从 partition的 ISR 中选择一个 follower 作为 leader,更改 partition 的 follower 和 leader 状态。

controller挂了(controller failover)

  • 由于每个 broker 都会在 zookeeper 的 “/controller” 节点注册 watcher,当 controller 宕机时 zookeeper 中的临时节点消失,所有存活的 broker 收到 fire 的通知

  • 存活的 broker 收到 fire 的通知后,每个 broker 都尝试创建新的 controller path,只有一个竞选成功并当选为 controller。

Zookeeper在Kafka中作用

  • 管理 broker 与 consumer 的动态加入与离开。(Producer 不需要管理,随便一台计算机都可以作为Producer 向 Kakfa Broker 发消息)

  • 触发负载均衡,当 broker 或 consumer 加入或离开时会触发负载均衡算法,使得一个 consumer group 内的多个 consumer 的消费负载平衡。(因为一个 comsumer 消费一个或多个partition,一个 partition 只能被一个 consumer 消费)

  • 维护消费关系及每个 partition 的消费信息

Page Cache带来的好处

Linux 总会把系统中还没被应用使用的内存挪来给 Page Cache,在命令行输入free,或者 cat /proc/meminfo ,“Cached”的部分就是 Page Cache。

Page Cache 中每个文件是一棵 Radix 树(又称 PAT 位树, 一种多叉搜索树),节点由 4k 大小的 Page 组成,可以通过文件的偏移量(如 0x1110001)快速定位到某个Page。

当写操作发生时,它只是将数据写入 Page Cache 中,并将该页置上 dirty 标志。

当读操作发生时,它会首先在 Page Cache 中查找,如果有就直接返回,没有的话就会从磁盘读取文件写入 Page Cache 再读取

可见,只要生产者与消费者的速度相差不大,消费者会直接读取之前生产者写入Page Cache的数据,大家在内存里完成接力,根本没有磁盘访问

而比起在内存中维护一份消息数据的传统做法,这既不会重复浪费一倍的内存,Page Cache 又不需要 GC (可以放心使用60G内存了),而且即使 Kafka 重启了,Page Cache 还依然在

参考:
[1] Kafka常见问题 [知乎]
[2] Kafka系统设计开篇
[3] Kafka史上最详细原理总结