介绍
Apache Kafka® 是 一个分布式流处理平台.
我们知道流处理平台有以下三种特性:
- 可以让你发布和订阅流式的记录。这一方面与消息队列或者企业消息系统类似。
- 可以储存流式的记录,并且有较好的容错性。
- 可以在流式记录产生时就进行处理。
Kafka适合什么样的场景?
它可以用于两大类别的应用:
- 构造实时流数据管道,它可以在系统或应用之间可靠地获取数据。 (相当于message queue)
- 构建实时流式应用程序,对这些流数据进行转换或者影响。 (就是流处理,通过kafka stream topic和topic之间内部进行变化)
首先是一些概念:
- Kafka作为一个集群,运行在一台或者多台服务器上.
- Kafka 通过 topic 对存储的流数据进行分类。
- 每条记录中包含一个key,一个value和一个timestamp(时间戳)。
Topics和日志
让我们首先深入了解下Kafka的核心概念:提供一串流式的记录— topic 。
Topic 就是数据主题,是数据记录发布的地方,可以用来区分业务系统。Kafka中的Topics总是多订阅者模式,一个topic可以拥有一个或者多个消费者来订阅它的数据。
对于每一个topic, Kafka集群都会维持一个分区日志,如下所示:
每个分区都是有序且顺序不可变的记录集,并且不断地追加到结构化的commit log文件。分区中的每一个记录都会分配一个id号来表示顺序,我们称之为offset,offset用来唯一的标识分区中每一条记录。
Kafka 集群保留所有发布的记录—无论他们是否已被消费—并通过一个可配置的参数——保留期限来控制. 举个例子, 如果保留策略设置为2天,一条记录发布后两天内,可以随时被消费,两天过后这条记录会被抛弃并释放磁盘空间。Kafka的性能和数据大小无关,所以长时间存储数据没有什么问题.
日志中的 partition(分区)有以下几个用途。
- 第一,当日志大小超过了单台服务器的限制,允许日志进行扩展。每个单独的分区都必须受限于主机的文件限制,不过一个主题可能有多个分区,因此可以处理无限量的数据。
- 第二,可以作为并行的单元集
分布式
日志的分区partition (分布)在Kafka集群的服务器上。每个服务器在处理数据和请求时,共享这些分区。每一个分区都会在已配置的服务器上进行备份,确保容错性.
每个分区都有一台 server 作为 “leader”,零台或者多台server作为 follwers 。leader server 处理一切对 partition (分区)的读写请求,而follwers只需被动的同步leader上的数据。当leader宕机了,followers 中的一台服务器会自动成为新的 leader。每台 server 都会成为某些分区的 leader 和某些分区的 follower,因此集群的负载是平衡的。
消费者
如果所有的消费者实例在同一消费组中,消息记录会负载平衡到每一个消费者实例.
如果所有的消费者实例在不同的消费组中,每条消息记录会广播到所有的消费者进程.
在Kafka中实现消费的方式是将日志中的分区划分到每一个消费者实例上,以便在任何时间,每个实例都是分区唯一的消费者。维护消费组中的消费关系由Kafka协议动态处理。如果新的实例加入组,他们将从组中其他成员处接管一些 partition 分区;如果一个实例消失,拥有的分区将被分发到剩余的实例。
Kafka 只保证分区内的记录是有序的,而不保证主题中不同分区的顺序。每个 partition 分区按照key值排序足以满足大多数应用程序的需求。但如果你需要总记录在所有记录的上面,可使用仅有一个分区的主题来实现,这意味着每个消费者组只有一个消费者进程。
Kafka的 topic 被分割成了一组完全有序的 partition,其中每一个 partition 在任意给定的时间内只能被每个订阅了这个 topic 的 consumer 组中的一个 consumer 消费。这意味着 partition 中 每一个 consumer 的位置仅仅是一个数字,即下一条要消费的消息的offset。这使得被消费的消息的状态信息相当少,每个 partition 只需要一个数字。这个状态信息还可以作为周期性的 checkpoint。这以非常低的代价实现了和消息确认机制等同的效果。
这种方式还有一个附加的好处。consumer 可以回退到之前的 offset 来再次消费之前的数据,这个操作违反了队列的基本原则,但事实证明对大多数 consumer 来说这是一个必不可少的特性。 例如,如果 consumer 的代码有 bug,并且在 bug 被发现前已经有一部分数据被消费了, 那么 consumer 可以在 bug 修复后通过回退到之前的 offset 来再次消费这些数据。
持久化
Kafka 对消息的存储和缓存严重依赖于文件系统。现代操作系统提供了 read-ahead 和 write-behind 技术,read-ahead 是以大的 data block 为单位预先读取数据,而 write-behind 是将多个小型的逻辑写合并成一次大型的物理磁盘写入。关于该问题的进一步讨论可以参考 ACM Queue article,他们发现实际上顺序磁盘访问在某些情况下比随机内存访问还要快!
这里给出了一个非常简单的设计:相比于维护尽可能多的 in-memory cache,并且在空间不足的时候匆忙将数据 flush 到文件系统,我们把这个过程倒过来。所有数据一开始就被写入到文件系统的持久化日志中,而不用在 cache 空间不足的时候 flush 到磁盘。实际上,这表明数据被转移到了内核的 pagecache 中。
使用文件系统和 pagecache 显得更有优势–我们可以通过自动访问所有空闲内存将可用缓存的容量至少翻倍,并且通过存储紧凑的字节结构而不是独立的对象,有望将缓存容量再翻一番。 这样使得32GB的机器缓存容量可以达到28-30GB,并且不会产生额外的 GC 负担。此外,即使服务重新启动,缓存依旧可用,而 in-process cache 则需要在内存中重建(重建一个10GB的缓存可能需要10分钟),否则进程就要从 cold cache 的状态开始(这意味着进程最初的性能表现十分糟糕)。 这同时也极大的简化了代码,因为所有保持 cache 和文件系统之间一致性的逻辑现在都被放到了 OS 中,这样做比一次性的进程内缓存更准确、更高效。如果你的磁盘使用更倾向于顺序读取,那么 read-ahead 可以有效的使用每次从磁盘中读取到的有用数据预先填充 cache。
持久化队列可以建立在简单的读取和向文件后追加两种操作之上,这和日志解决方案相同。这种架构的优点在于所有的操作复杂度都是O(1),而且读操作不会阻塞写操作,读操作之间也不会互相影响。这有着明显的性能优势,在不产生任何性能损失的情况下能够访问几乎无限的硬盘空间,这意味着我们可以提供一些其它消息系统不常见的特性。例如:在 Kafka 中,我们可以让消息保留相对较长的一段时间(比如一周),而不是试图在被消费后立即删除。正如我们后面将要提到的,这给消费者带来了很大的灵活性。
优化
减少数据拷贝
使用 sendfile 方法,可以允许操作系统将数据从 pagecache 直接发送到网络,这样避免重新复制数据。所以这种优化方式,只需要最后一步的copy操作,将数据复制到 NIC 缓冲区。pagecache 和 sendfile 的组合使用意味着,在一个kafka集群中,大多数 consumer 消费时,您将看不到磁盘上的读取活动,因为数据将完全由缓存提供。端到端批量压缩
Kafka 以高效的批处理格式支持一批消息可以压缩在一起发送到服务器。这批消息将以压缩格式写入,并且在日志中保持压缩,只会在 consumer 消费时解压缩。
生产者
负载均衡
生产者直接发送数据到主分区的服务器上,不需要经过任何中间路由。为了让生产者实现这个功能,所有的 kafka 服务器节点都能响应这样的元数据请求: 哪些服务器是活着的,主题的哪些分区是主分区,分配在哪个服务器上,这样生产者就能适当地直接发送它的请求到服务器上。
客户端控制消息发送数据到哪个分区,这个可以实现随机的负载均衡方式, 或者使用一些特定语义的分区函数。 我们有提供特定分区的接口让用于根据指定的键值进行hash分区(当然也有选项可以重写分区函数),例如,如果使用用户ID作为key,则用户相关的所有数据都会被分发到同一个分区上。
异步发送
批处理是提升性能的一个主要驱动,为了允许批量处理,kafka 生产者会尝试在内存中汇总数据,并用一次请求批次提交信息。 批处理,不仅仅可以配置指定的消息数量,也可以指定等待特定的延迟时间(如64k 或10ms),这允许汇总更多的数据后再发送,在服务器端也会减少更多的IO操作。 该缓冲是可配置的,并给出了一个机制,通过权衡少量额外的延迟时间获取更好的吞吐量。
消息交互语义
Kafka可以提供的消息交付语义保证有多种:
At most once——消息可能会丢失但绝不重传。
At least once——消息可以重传但绝不丢失。
在 0.11.0.0 之前的版本中, 如果 producer 没有收到表明消息已经被提交的响应, 那么 producer 除了将消息重传之外别无选择。 这里提供的是 at-least-once 的消息交付语义,因为如果最初的请求事实上执行成功了,那么重传过程中该消息就会被再次写入到 log 当中。Exactly once——这正是人们想要的, 每一条消息只被传递一次.
从 0.11.0.0 版本开始,Kafka producer新增了幂等性的传递选项,该选项保证重传不会在 log 中产生重复条目。 为实现这个目的, broker 给每个 producer 都分配了一个 ID ,并且 producer 给每条被发送的消息分配了一个序列号来避免产生重复的消息。 同样也是从 0.11.0.0 版本开始, producer 新增了使用类似事务性的语义将消息发送到多个 topic partition 的功能: 也就是说,要么所有的消息都被成功的写入到了 log,要么一个都没写进去。这种语义的主要应用场景就是 Kafka topic 之间的 exactly-once 的数据传递。
并非所有使用场景都需要这么强的保证。对于延迟敏感的应用场景,我们允许生产者指定它需要的持久性级别。如果 producer 指定了它想要等待消息被提交,则可以使用10ms的量级。然而, producer 也可以指定它想要完全异步地执行发送,或者它只想等待直到 leader 节点拥有该消息(follower 节点有没有无所谓)。
现在让我们从 consumer 的视角来描述语义。
假设 consumer 要读取一些消息——它有几个处理消息和更新位置的选项。
Consumer 可以先读取消息,然后将它的位置保存到 log 中,最后再对消息进行处理。在这种情况下,消费者进程可能会在保存其位置之后,带还没有保存消息处理的输出之前发生崩溃。而在这种情况下,即使在此位置之前的一些消息没有被处理,接管处理的进程将从保存的位置开始。在 consumer 发生故障的情况下,这对应于“at-most-once”的语义,可能会有消息得不到处理。
Consumer 可以先读取消息,然后处理消息,最后再保存它的位置。在这种情况下,消费者进程可能会在处理了消息之后,但还没有保存位置之前发生崩溃。而在这种情况下,当新的进程接管后,它最初收到的一部分消息都已经被处理过了。在 consumer 发生故障的情况下,这对应于“at-least-once”的语义。 在许多应用场景中,消息都设有一个主键,所以更新操作是幂等的(相同的消息接收两次时,第二次写入会覆盖掉第一次写入的记录)。
多副本
创建副本的单位是 topic 的 partition ,正常情况下, 每个分区都有一个 leader 和零或多个 followers 。 总的副本数是包含 leader 的总和。 所有的读写操作都由 leader 处理,一般 partition 的数量都比 broker 的数量多的多,各分区的 leader 均 匀的分布在brokers 中。所有的 followers 节点都同步 leader 节点的日志,日志中的消息和偏移量都和 leader 中的一致。(当然, 在任何给定时间, leader 节点的日志末尾时可能有几个消息尚未被备份完成)。
Followers 节点就像普通的 consumer 那样从 leader 节点那里拉取消息并保存在自己的日志文件中。Followers 节点可以从 leader 节点那里批量拉取消息日志到自己的日志文件中。
与大多数分布式系统一样,自动处理故障需要精确定义节点 “alive” 的概念。Kafka 判断节点是否存活有两种方式。
- 节点必须可以维护和 ZooKeeper 的连接,Zookeeper 通过心跳机制检查每个节点的连接。
- 如果节点是个 follower ,它必须能及时的同步 leader 的写操作,并且延时不能太久。
我们认为满足这两个条件的节点处于 “in sync” 状态,区别于 “alive” 和 “failed” 。 Leader会追踪所有 “in sync” 的节点。如果有节点挂掉了, 或是写超时, 或是心跳超时, leader 就会把它从同步副本列表中移除。 同步超时和写超时的时间由 replica.lag.time.max.ms 配置确定。
在所有时间里,Kafka 保证只要有至少一个同步中的节点存活,提交的消息就不会丢失。
Kafka分配Replica的算法如下:
- 将所有Broker(假设共n个Broker)和待分配的Partition排序
- 将第i个Partition分配到第(i mod n)个Broker上
- 将第i个Partition的第j个Replica分配到第((i + j) mod n)个Broker上
可用性和持久性保证
向 Kafka 写数据时,producers 设置 ack 是否提交完成,
- 0:不等待broker返回确认消息,
- 1: leader保存成功返回或,
- -1(all): 所有备份都保存成功返回.
请注意. 设置 “ack = all” 并不能保证所有的副本都写入了消息。默认情况下,当 acks = all 时,只要 ISR 副本同步完成,就会返回消息已经写入。例如,一个 topic 仅仅设置了两个副本,那么只有一个 ISR 副本,那么当设置acks = all时返回写入成功时,剩下了的那个副本数据也可能数据没有写入。 尽管这确保了分区的最大可用性,但是对于偏好数据持久性而不是可用性的一些用户,可能不想用这种策略,因此,我们提供了两个topic 配置,可用于优先配置消息数据持久性:
禁用 unclean leader 选举机制 - 如果所有的备份节点都挂了,分区数据就会不可用,直到最近的 leader 恢复正常。这种策略优先于数据丢失的风险, 参看上一节的 unclean leader 选举机制。
指定最小的 ISR 集合大小,只有当 ISR 的大小大于最小值,分区才能接受写入操作,以防止仅写入单个备份的消息丢失造成消息不可用的情况,这个设置只有在生产者使用 acks = all 的情况下才会生效,这至少保证消息被 ISR 副本写入。此设置是一致性和可用性 之间的折衷,对于设置更大的最小ISR大小保证了更好的一致性,因为它保证将消息被写入了更多的备份,减少了消息丢失的可能性。但是,这会降低可用性,因为如果 ISR 副本的数量低于最小阈值,那么分区将无法写入。
高可用
ISR
Kafka 动态维护了一个同步状态的备份的集合 (a set of in-sync replicas), 简称 ISR ,在这个集合中的节点都是和 leader 保持高度一致的,只有这个集合的成员才 有资格被选举为 leader,一条消息必须被这个集合 所有 节点读取并追加到日志中了,这条消息才能视为提交。这个 ISR 集合发生变化会在 ZooKeeper 持久化,正因为如此,这个集合中的任何一个节点都有资格被选为 leader 。因为 ISR 模型和 f+1 副本,一个 Kafka topic 冗余 f 个节点故障而不会丢失任何已经提交的消息。
Kafka 对于数据不会丢失的保证,是基于至少一个节点在保持同步状态,一旦分区上的所有备份节点都挂了,就无法保证了。但是,实际在运行的系统需要去考虑假设一旦所有的备份都挂了,怎么去保证数据不会丢失,这里有两种实现的方法
- 等待一个 ISR 的副本重新恢复正常服务,并选择这个副本作为领 leader (它有极大可能拥有全部数据)。
- 选择第一个重新恢复正常服务的副本(不一定是 ISR 中的)作为leader。
kafka 默认选择第二种策略,当所有的 ISR 副本都挂掉时,会选择一个可能不同步的备份作为 leader ,可以配置属性 unclean.leader.election.enable 禁用此策略,那么就会使用第 一种策略即停机时间优于不同步。
如何选举Leader
最简单最直观的方案是,所有Follower都在Zookeeper上设置一个Watch,一旦Leader宕机,其对应的ephemeral znode会自动删除,此时所有Follower都尝试创建该节点,而创建成功者(Zookeeper保证只有一个能创建成功)即是新的Leader,其它Replica即为Follower。
但是该方法会有3个问题:
split-brain 这是由Zookeeper的特性引起的,虽然Zookeeper能保证所有Watch按顺序触发,但并不能保证同一时刻所有Replica“看”到的状态是一样的,这就可能造成不同Replica的响应不一致
herd effect 如果宕机的那个Broker上的Partition比较多,会造成多个Watch被触发,造成集群内大量的调整
Zookeeper负载过重 每个Replica都要为此在Zookeeper上注册一个Watch,当集群规模增加到几千个Partition时Zookeeper负载会过重。
Kafka 0.8.* 的Leader Election方案解决了上述问题,它 在所有broker中选出一个controller,所有Partition的Leader选举都由controller决定。controller会将Leader的改变直接通过RPC的方式(比Zookeeper Queue的方式更高效)通知需为此作出响应的Broker。同时controller也负责增删Topic以及Replica的重新分配。如果 controller 节点挂了,其他 存活的 broker 都可能成为新的 controller 节点。
【更详细的分析可看】Kafka设计解析(二)- Kafka High Availability (上)
Custom Rebalance
Consumer Rebalance 的算法如下:
- 将目标 Topic 下的所有 Partirtion 排序,存于 PT
- 对某 Consumer Group 下所有 Consumer 排序,存于 CG,第 i 个 Consumer 记为 Ci
- N=size(PT)/size(CG),向上取整
- 解除 Ci 对原来分配的 Partition 的消费权(i 从 0 开始)
- 将第 i * N 到(i+1)* N−1 个 Partition 分配给 Ci
根据 Kafka 社区 wiki,Kafka 作者正在考虑在还未发布的 0.9.x 版本中使用中心协调器 (Coordinator) 。大体思想是为所有 Consumer Group 的子集选举出一个 Broker 作为 Coordinator,由它 Watch Zookeeper,从而判断是否有 Partition 或者 Consumer 的增减,然后生成 Rebalance 命令,并检查是否这些 Rebalance 在所有相关的 Consumer 中被执行成功,如果不成功则重试,若成功则认为此次 Rebalance 成功(这个过程跟 Replication Controller 非常类似):
Consumer
1) Consumer 启动时,先向 Broker 列表中的任意一个 Broker 发送 ConsumerMetadataRequest,并通过 ConsumerMetadataResponse 获取它所在 Group 的 Coordinator 信息。
2)Consumer 连接到 Coordinator 并发送 HeartbeatRequest:
如果返回的 HeartbeatResponse 没有任何错误码,Consumer 继续 fetch 数据。
若其中包含 IllegalGeneration 错误码,即说明 Coordinator 已经发起了 Rebalance 操作,此时 Consumer 停止 fetch 数据,commit offset,并发送 JoinGroupRequest 给它的 Coordinator,并在 JoinGroupResponse 中获得它应该拥有的所有 Partition 列表和它所属的 Group 的新的 Generation ID。此时 Rebalance 完成,Consumer 开始 fetch 数据。
故障检测机制
Consumer 成功加入 Group 后,Consumer 和相应的 Coordinator 同时开始故障探测程序。
Consumer 向 Coordinator 发起周期性的 Heartbeat(HeartbeatRequest)并等待响应,该
周期为 session.timeout.ms/heartbeat.frequency。
若 Consumer 在 session.timeout.ms 内未收到 HeartbeatResponse,或者发现相应的 Socket channel 断开,它即认为 Coordinator 已宕机并启动 Coordinator 探测程序。
若 Coordinator 在 session.timeout.ms 内没有收到一次 HeartbeatRequest,则它将该 Consumer 标记为宕机状态并为其所在 Group 触发一次 Rebalance 操作。
Coordinator Failover 过程中,Consumer 可能会在新的 Coordinator 完成 Failover 过程之前或之后发现新的 Coordinator 并向其发送 HeatbeatRequest。
对于后者,新的 Cooodinator 可能拒绝该请求,致使该 Consumer 重新探测 Coordinator 并发起新的连接请求。
如果该 Consumer 向新的 Coordinator 发送连接请求太晚,新的 Coordinator 可能已经在此之前将其标记为宕机状态而将之视为新加入的 Consumer 并触发一次 Rebalance 操作。
Coordinator
1)稳定状态下,Coordinator 通过上述故障探测机制跟踪其所管理的每个 Group 下的每个 Consumer 的健康状态。
2)刚启动时或选举完成后,Coordinator 从 Zookeeper 读取它所管理的 Group 列表及这些 Group 的成员列表。如果没有获取到 Group 成员信息,它不会做任何事情直到某个 Group 中有成员注册进来。
3)在 Coordinator 完成加载其管理的 Group 列表及其相应的成员信息之前,它将为 HeartbeatRequest,OffsetCommitRequest 和 JoinGroupRequests 返回 CoordinatorStartupNotComplete 错误码。此时,Consumer 会重新发送请求。
4)Coordinator 会跟踪被其所管理的任何 Consumer Group 注册的 Topic 的 Partition 的变化,并为该变化触发 Rebalance 操作。创建新的 Topic 也可能触发 Rebalance,因为 Consumer 可以在 Topic 被创建之前就已经订阅它了。
Coordinator 发起 Rebalance 操作流程如下所示。
Coordinator Failover
如前文所述,Rebalance 操作需要经历如下几个阶段
1)Topic/Partition 的改变或者新 Consumer 的加入或者已有 Consumer 停止,触发 Coordinator 注册在 Zookeeper 上的 watch,Coordinator 收到通知准备发起 Rebalance 操作。
2)Coordinator 通过在 HeartbeatResponse 中返回 IllegalGeneration 错误码发起 Rebalance 操作。
3)Consumer 发送 JoinGroupRequest
4)Coordinator 在 Zookeeper 中增加 Group 的 Generation ID 并将新的 Partition 分配情况写入 Zookeeper
5)Coordinator 发送 JoinGroupResponse