Kafka

初步概念

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调[1]的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等

从早期的论文Kafka: a Distributed Messaging System for Log Processing可以看出,kafka最初是设计用于处理大量数据日志的。

消息订阅模式

1. 点对点模式(Point-to-Point, Queue 模式)

  • 特点:一个消息只能被一个消费者接收。

  • 生产者把消息发送到队列里,多个消费者竞争消费,但每条消息只会被其中一个消费掉。

  • 应用场景:任务分发、工作队列。

  • 例子(JMS Queue / RabbitMQ Queue):

  • // 假设用 JMS
    Queue queue = session.createQueue("order-queue");
    MessageConsumer consumer = session.createConsumer(queue);
    
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14

    ### 2. **发布-订阅模式(Pub/Sub)**

    - **特点**:消息会广播给所有订阅者。

    - 生产者发消息到 Topic,所有订阅该 Topic 的消费者都能收到。

    - **应用场景**:新闻推送、通知广播、多服务监听。

    - **例子**(JMS Topic / Kafka Topic):

    - ```
    Topic topic = session.createTopic("news-topic");
    MessageConsumer consumer = session.createConsumer(topic);

设计架构

kafka的数据流的特殊格式为topic主题,一个consumer消费者从broker(经纪人,在这里是kafka的实例)可以订阅一个或者多个topic

要发布一个消息,生产者可以使用Byte类型,意味着由生产者决定如何序列化,并且发布到指定的topic上。

1
2
3
4
producer = new Producer(...);
message = new Message(“test message str”.getBytes());
set = new MessageSet(message);
producer.send(“topic1”, set);

要订阅一个主题(topic),消费者首先会为该主题创建一个或多个消息流(message stream)。
发布到该主题的消息会被均匀地分配到这些子流中。每个消息流都提供了一个迭代器接口,用于遍历源源不断产生的消息。消费者随后通过遍历流中的每一条消息来处理其载荷(payload)。与传统迭代器不同,消息流迭代器永远不会终止。
如果当前没有新的消息可供消费,迭代器会阻塞,直到有新的消息发布到该主题。

1
2
3
4
5
streams[] = Consumer.createMessageStreams(“topic1”, 1)
for (message : streams[0]) {
bytes = message.payload();
// do something with the bytes
}

image-20250930142805175

Producer:Producer即生产者,消息的产生者,是消息的入口。

Broker:Broker是kafka实例,每个服务器上有一个或多个kafka的实例,我们姑且认为每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的编号,如图中的broker-0、broker-1等

Topic:消息的主题,可以理解为消息的分类,kafka的数据就保存在topic。在每个broker上都可以创建多个topic。

Partition:Topic的分区,每个topic可以有多个分区,分区的作用是做负载,提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的,partition的表现形式就是一个一个的文件夹!

Replication:每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为Leader。在kafka中默认副本的最大数量是10个,且副本的数量不能大于Broker的数量,follower和leader绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。

Message:每一条发送的消息主体。Consumer:消费者,即消息的消费方,是消息的出口。

Consumer Group:我们可以将多个消费组组成一个消费者组,在kafka的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个topic的不同分区的数据,这也是为了提高kafka的吞吐量!

Zookeeper:kafka集群依赖zookeeper来保存集群的的元信息,来保证系统的可用性。

存储布局

Kafka 具有非常简单的存储布局。一个主题(topic)的每个分区(partition)对应一个逻辑日志(logical log)。在物理层面上,一个日志被实现为一组大小大致相同的段文件(segment file)(例如,1GB)。

每当生产者向某个分区发布消息时,broker 只需将该消息追加写入最后一个段文件。
为了获得更好的性能,段文件并不是每写一次就刷盘,而是在 发布了可配置数量的消息 或者 经过了一定时间 后才会刷盘到磁盘。只有当消息被刷盘后,它才会对消费者可见。

与典型的消息系统不同,Kafka 中存储的消息没有显式的消息 ID。相反,每条消息通过其在日志中的逻辑偏移量(offset)来寻址。这样避免了维护额外的、需要频繁随机访问的索引结构(将消息 ID 映射到消息实际位置)的开销。需要注意的是:我们的消息 ID 是递增的,但不是连续的。要计算下一条消息的 ID,我们需要将当前消息的长度加到它的 ID 上。从此以后,我们会将消息 ID 和偏移量(offset)交替使用。

消费者始终顺序地消费某个分区中的消息。如果消费者确认(acknowledge)了某个消息的偏移量,就意味着它已经收到了该分区中该偏移量之前的所有消息。

在底层,消费者会向 broker 发送异步的拉取请求(pull request),以便提前准备好一批数据供应用消费。每个拉取请求包含:

  • 开始消费的消息偏移量
  • 可接受的最大字节数

每个 broker 在内存中维护一个已排序的偏移量列表,包括每个段文件中第一条消息的偏移量。broker 会通过查找该偏移量列表,定位到所请求的消息所在的段文件,并将数据返回给消费者。

当消费者收到一条消息后,会计算出下一条消息的偏移量,并在下一次拉取请求中使用它。

image-20250930144654175

Partition结构

每个topic可以划分为一个或者多个Partition,在服务器上的表现为一段连续的磁盘存储,每个Partition下有多个Segment文件,每个Segment包含.index文件,.log文件,.timeindex文件。其中log文件为具体的消息,.index和.timeindex为索引。

偏移量索引(Offset Index)

  • 每个 Partition 都有一个 .index 文件。
  • 它是一个稀疏索引(sparse index),记录 逻辑 offset → 物理文件位置(position) 的映射。
  • Kafka 并不会为每条消息建立索引,而是隔固定字节数(默认约 4KB)写入一条索引。
  • 查找时:Kafka 先在索引文件里二分查找,找到最接近的 offset 对应的物理位置,再顺序扫描到目标消息。

时间戳索引(Time Index)

  • 每个 Partition 还有一个 .timeindex 文件。
  • 用于记录 时间戳 → 物理文件位置 的映射。
  • 查找消息时,如果用户按时间查找,Kafka 会在这个文件里二分查找,定位大致位置,然后再回到 .index 文件+日志文件扫描。
image-20250930145151958

.index是辅助查询的稀疏索引,而.log文件是真正的记录,要查找offset,需要经过稀疏索引的二次跳转。

image-20250930145739750

  1. 例如查找offset为368801的物理位置,先找到offset的368801message所在的segment文件(利用二分法查找),这里找到的就是在第二个segment文件。

  2. 打开找到的segment中的.index文件(也就是368796.index文件,该文件起始偏移量为368796+1,我们要查找的offset为368801的message在该index内的偏移量为368796+5=368801,所以这里要查找的相对offset为5)。由于该文件采用的是稀疏索引的方式存储着相对offset及对应message物理偏移量的关系,所以直接找相对offset为5的索引找不到,这里同样利用二分法查找相对offset小于或者等于指定的相对offset的索引条目中最大的那个相对offset,所以找到的是相对offset为4的这个索引。

  3. 根据找到的相对offset为4的索引确定message存储的物理偏移位置为256。打开数据文件,从位置为256的那个地方开始顺序扫描直到找到offset为368801的那条Message。这套机制是建立在offset为有序的基础上,利用segment+有序offset+稀疏索引+二分查找+顺序查找等多种手段来高效的查找数据!

message结构

在 Kafka 中,每条消息被封装成一个 Message,存储在 Partition 的 Log 中,主要包含以下部分:

字段 含义
Offset 分区内的唯一序号,标识消息在分区中的位置。消费者依赖它来跟踪消费进度。
Key 可选,用于分区路由(Producer 根据 Key 选择 Partition)。
Value 消息的实际内容,即业务数据。
Timestamp 消息的时间戳(创建时间或日志时间),可选用于时间索引。
Headers 可选的元数据,用于存储附加信息(比如 traceId、消息类型等)。
CRC 校验码,用于检测消息在存储或传输过程中的损坏。

注意:Kafka 0.10+ 引入了 批量消息(MessageBatch/MessageSet),为了减少磁盘 I/O,会把多条消息打包存储在同一段日志中。

存储策略

Kafka 支持两种策略删除旧消息:

  1. 按时间(log.retention.ms):保留最近 N 毫秒的消息,超过则删除。默认为7天。
  2. 按大小(log.retention.bytes):保留日志总大小,超过则删除最老的 segment。
  3. 压缩策略(log.cleanup.policy=compact)
    • 对于有 Key 的消息,同一个 Key 只保留最新的消息。
    • 适合状态消息、去重场景

定位消息

假设要读取 offset = O 的消息:

  1. 定位 Segment
    • Kafka 用 partition 的 segment 列表 + base offset 找到包含 offset O 的 segment。
    • 因为 segment 按 base offset 排序 → 可以用 二分查找 → 时间复杂度 O(log S),S = segment 数量。
  2. 查找 offset 在 segment 内的位置
    • 通过 offset 索引(稀疏索引),先定位到最接近的索引 entry → 索引查找可视作 二分查找 → 时间复杂度 O(log N_idx),N_idx = segment 内索引条目数。
    • 然后从索引位置向前或向后扫描若干条消息,找到确切 offset → 最多扫描 N_sparse 条消息(稀疏索引步长,通常 4KB 对应 ~1000 条消息),这个可以视为 常数级操作
  3. 读取消息
    • 消息在文件中是顺序存储的,找到位置后直接从页缓存或磁盘读取 → 顺序 I/O,非常快。

综合时间复杂度

  • 查找 segmentO(log S)
  • 查找 segment 内 offsetO(log N_idx)
  • 总复杂度: 接近 O(log S + log N_idx)
  • 读取消息:顺序 I/O,通常视为 O(1)(在页缓存命中情况下)

实际上,S(segment 数量)和 N_idx(segment 内索引条目数)都非常小,N_sparse 通常 1k 条左右,所以查找非常快

数据访问流程(存储角度)

  1. Producer 写消息 → append 到分区 segment → 写入页缓存(顺序写)。
  2. Broker 定期刷新页缓存到磁盘(fsync),或者由操作系统异步刷盘。
  3. Consumer 拉消息 → 从页缓存读取 → 消费后提交 offset。

生产者生产

image-20250930165926452

消息写入leader后,follower是主动的去leader进行同步的!producer采用push模式将数据发布到broker,每条消息追加到分区中,顺序写入磁盘,所以保证同一分区内的数据是有序的!写入示意图如下:

image-20250930170011478

分区的主要目的是:

  1. 方便扩展:因为一个topic可以有多个partition,所以我们可以通过扩展机器去轻松的应对日益增长的数据量。
  2. 提高并发:以partition为读写单位,可以多个消费者同时消费数据,提高了消息的处理效率。熟悉负载均衡的朋友应该知道,当我们向某个服务器发送请求的时候,服务端可能会对请求做一个负载,将流量分发到不同的服务器,那在kafka中,如果某个topic有多个partition,producer又怎么知道该将数据发往哪个partition呢?

kafka中有几个原则:partition在写入的时候可以指定需要写入的partition,如果有指定,则写入对应的partition。如果没有指定partition,但是设置了数据的key,则会根据key的值hash出一个partition。如果既没指定partition,又没有设置key,则会轮询选出一个partition。保证消息不丢失是一个消息队列中间件的基本保证,那producer在向kafka写入消息的时候,怎么保证消息不丢失呢?

其实上面的写入流程图中有描述出来,那就是通过ACK应答机制!在生产者向队列写入数据的时候可以设置参数来确定是否确认kafka接收到数据,这个参数可设置的值为0、1、all。

0代表producer往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。

1代表producer往集群发送数据只要leader应答就可以发送下一条,只确保leader发送成功。

all代表producer往集群发送数据需要所有的follower都完成从leader的同步才会发送下一条,确保leader发送成功和所有的副本都完成备份。安全性最高,但是效率最低。

最后要注意的是,如果往不存在的topic写数据,能不能写入成功呢?kafka会自动创建topic,分区和副本的数量根据默认配置都是1。

消费者消费

消息存储在log文件后,消费者就可以进行消费了。在讲消息队列通信的两种模式的时候讲到过点对点模式和发布订阅模式。Kafka采用的是发布订阅模式,消费者主动的去kafka集群拉取消息,与producer相同的是,消费者在拉取消息的时候也是找leader去拉取。

多个消费者可以组成一个消费者组(consumer group),每个消费者组都有一个组id!同一个消费组者的消费者可以消费同一topic下不同分区的数据,但是不会组内多个消费者消费同一分区的数据!如下图:

image-20250930171712978

消费者组

Kafka 中的消费者组(Consumer Group)是消息消费机制的核心,决定了消息的分发方式和并发消费能力。

基本概念

  • 消费者组:由一个或多个消费者实例组成,隶属于同一个 group.id。
  • 消费者组内的每个消费者实例共同消费一个或多个 topic 的消息,同一分区(Partition)的消息,在一个消费者组内,只会被组内一个消费者消费
  • 一个 topic 的每个 partition,在同一个消费者组内只能被一个消费者实例独占消费,但不同的消费者组之间可以相互独立,互不干扰。

工作原理

  • 一个 topic 可被多个消费者组订阅,每个组都能完整消费所有消息副本,相互之间互不影响。
  • 消费者组内部做负载均衡:每当消费者数量变化或分区数量发生变化时,都会自动”rebalance”(重分配)分区给消费者,从而实现分布式并发消费。
  • 消费进度(offset)由消费者组维护,存储在 Kafka 内部特殊的 __consumer_offsets topic 中。
  • 消息一次只会被同组的“某一个消费者”消费,但允许多个消费者组同时独立消费同一消息。

应用场景

  • 广播模式/多系统独立消费:每个系统建立自己的消费者组,实现消息广播。例如日志分析、实时监控、缓冲队列等场景。
    Kafka中有消费者组的概念,每一个topic消费时会发布到所有订阅该消息的消费者组上,但是只会被消费者组中的其中一个消费者消费。

高效传输

们在 Kafka 的数据进出传输上非常谨慎。之前我们已经展示过,生产者可以在一次发送请求中提交一组消息。尽管最终的消费者 API 是一次迭代一条消息,但在底层,每次消费者的拉取请求实际上也会获取多条消息,直到达到某个大小限制,通常是数百 KB。

我们做的另一个非常规选择是:避免在 Kafka 层显式地将消息缓存到内存中。相反,我们依赖底层文件系统的页缓存(page cache)。这样做的主要好处是避免了“双重缓冲”——消息只会缓存在页缓存中。(一般的消息队列可能会在进程中缓存)

这还有一个额外的好处:即使 broker 进程重启,缓存依然是“热的”。(页缓存是OS操控的,只要机器不重启,只重启实例,页缓存依然存在)

由于 Kafka 完全不在进程中缓存消息,它在垃圾回收内存方面的开销非常小,从而使基于虚拟机(VM)的语言中实现高效运行成为可能。(如果 Kafka 在 JVM 堆里缓存大量消息,那么这些对象要由 GC 管理。)

最后,因为生产者和消费者都是顺序访问段文件(segment files),并且消费者通常只比生产者落后很少一部分,所以操作系统的常见缓存机制非常有效(特别是写直通缓存和预读机制)。我们发现生产和消费的性能都能与数据规模保持线性增长,甚至能处理多 TB 的数据。

Kafka 写入是 顺序写日志文件

消费者拉取数据时,也是按顺序读取。

操作系统的页缓存对顺序 I/O 有两大优化:

  • 写直通缓存(write-back caching):写数据先放缓存,批量刷盘。
  • 预读(read-ahead):读数据时,OS 会提前把后面的数据页读进缓存。

因为生产/消费几乎是紧密跟随的,OS 缓存的命中率极高 → 性能非常好

此外,我们还对消费者的网络访问进行了优化。Kafka 是一个多订阅者系统,一条消息可能会被不同的消费者应用多次消费。通常,将本地文件中的字节发送到远程 socket 的步骤如下:

  1. 从存储介质读取数据到操作系统的页缓存;
  2. 将页缓存的数据复制到应用缓冲区;
  3. 将应用缓冲区复制到另一个内核缓冲区;
  4. 将内核缓冲区的数据发送到 socket。

这个过程涉及 4 次数据拷贝和 2 次系统调用。在 Linux 和其他 Unix 系统上,存在一个 sendfile API,它可以直接将文件通道中的字节传输到 socket 通道。这通常能避免第 (2) 和 (3) 步引入的 2 次拷贝和 1 次系统调用。Kafka 利用 sendfile API 高效地将日志段文件中的字节从 broker 传输给消费者。

注:这是零拷贝技术,用于操作IO接口或者Socket接口无论如何都会进入内核态,而可以跳过用户态,因此在内核态之间的数据传输可以避免用户态的转换。

无状态 broker

与大多数其他消息系统不同,在 Kafka 中,消费者消费到什么位置的信息并不是由 broker 维护的,而是由消费者自己维护。这种设计减少了 broker 的复杂性和开销。但这也带来了删除消息的难题,因为 broker 并不知道所有订阅者是否都消费了该消息。Kafka 用一种简单的基于时间的 SLA 作为保留策略来解决这个问题:如果一条消息在 broker 中保存超过一定时间(通常是 7 天),就会被自动删除。这个方案在实践中效果很好。大多数消费者(包括离线消费者)都会按日、按小时或实时完成消费。而且,由于 Kafka 的性能不会随着数据量增大而下降,使得长时间保留成为可行的。

这种设计还有一个重要的副作用:消费者可以有意回退到旧的偏移量(offset)重新消费数据。这虽然违反了队列的一般契约,但对很多消费者来说却是必不可少的功能。举例来说,当消费者应用逻辑出错时,修复错误后可以重新回放某些消息。这对将数据加载到数据仓库或 Hadoop 系统的 ETL 流程尤其重要。再比如,消费的数据可能只是定期(如批量)刷新到持久存储(例如全文索引器)。如果消费者在刷新前崩溃,未刷新的数据就会丢失。在这种情况下,消费者可以记录下未刷新消息的最小 offset,并在重启后从该 offset 重新消费。需要注意的是,相比推模式(push model),在拉模式(pull model)下支持消费者回退要容易得多。

注:Broker 只存储消息,不关心谁消费过

  • Broker 是“存储和分发消息的系统”,不记录每个消费者的消费进度
  • 好处:
    • 简化 Broker 设计
    • 高性能(不需要频繁更新元数据)
    • 消费者可以灵活管理消费策略(手动/自动/回溯)

消费者维护 offset

  • 早期 Kafka(0.8.x 及之前):
    • offset 存储在 Zookeeper
    • 消费者在消费消息后,定期提交 offset
  • 现在 Kafka(0.9+):
    • offset 可以存储在 Kafka 内部的特殊 topic __consumer_offsets
    • 消费者拉取、提交 offset → Kafka 仅做存储和分发

消费者可以自由回溯或跳过

  • 因为消费者维护 offset,它可以:
    • 回退 offset → 重复消费历史消息
    • 跳过 offset → 跳过消息

分区协调

现在我们描述生产者和消费者在分布式环境下的行为。每个生产者可以将消息发布到随机选择的分区,或者通过分区键和分区函数语义地决定分区。我们将重点放在消费者与 broker 的交互上。

Kafka 有 消费者组(consumer groups) 的概念。每个消费者组由一个或多个消费者组成,共同消费一组订阅的主题。也就是说,每条消息只会被组内的一个消费者接收。而不同的消费者组则会彼此独立地消费完整的订阅消息集,组与组之间不需要协调。组内的消费者可以运行在不同的进程或不同的机器上。我们的目标是:将 broker 中存储的消息尽可能平均地分配给消费者,同时避免过多的协调开销。

我们的第一个决定是:将主题中的分区作为最小的并行单元。这意味着在任何时刻,每个分区的所有消息,只会被组内的某一个消费者消费。如果允许多个消费者同时消费一个分区,它们就必须协调谁消费哪些消息,从而引入锁和状态维护的额外开销。相比之下,在我们的设计中,消费者进程只需要在负载重新分配(rebalance)时协调,而这是不频繁发生的。为了让负载真正均衡,我们要求一个主题的分区数远多于每个组内的消费者数。通过对主题进行过度分区(over-partitioning),我们可以很容易做到这一点。

我们的第二个决定是:不设置中心化的 master 节点,而是让消费者以去中心化的方式自行协调。增加一个 master 节点会使系统更复杂,还需要考虑 master 故障问题。为实现协调,我们使用了一个高可用的一致性服务 —— Zookeeper

Zookeeper 提供一个非常简单、类似文件系统的 API:可以创建路径、设置路径的值、读取路径的值、删除路径、列出子路径。除此之外,它还能做一些有趣的事情:
(a) 可以在路径上注册 watcher,当路径的值或子路径发生变化时会收到通知;
(b) 路径可以创建为 临时(ephemeral) 的(而不是持久的),这意味着如果创建它的客户端消失,该路径会被 Zookeeper 自动删除;
(c) Zookeeper 会将数据复制到多个服务器,使得数据高度可靠和可用。

Kafka 使用 Zookeeper 来完成以下任务:

  1. 监测 broker 和消费者的加入与移除;
  2. 当上述事件发生时触发每个消费者的重新分配(rebalance)流程;
  3. 维护消费关系,并记录每个分区的消费偏移量(offset)。

具体来说,当每个 broker 或消费者启动时,它会把自身信息存储在 Zookeeper 的 broker 或 consumer 注册表中。

  • broker 注册表:包含 broker 的主机名、端口,以及其存储的主题和分区集合。
  • consumer 注册表:包含消费者所属的消费者组,以及它订阅的主题集合。

每个消费者组在 Zookeeper 中还有一个 分区所有权注册表(ownership registry) 和一个 偏移量注册表(offset registry)

  • 所有权注册表:为每个订阅分区创建一个路径,路径的值为当前正在消费该分区的消费者 id(我们称该消费者“拥有”该分区);
  • 偏移量注册表:为每个订阅分区存储最近消费过的消息的 offset。

在 Zookeeper 中,broker 注册表、consumer 注册表和所有权注册表的路径是临时的(ephemeral),而偏移量注册表是持久的(persistent)。因此:

  • 如果一个 broker 故障,其上的所有分区会自动从 broker 注册表中移除;
  • 如果一个消费者故障,它会丢失在 consumer 注册表中的记录以及它在所有权注册表中拥有的所有分区。

每个消费者都会在 broker 注册表和 consumer 注册表上注册 Zookeeper watcher,当 broker 集合或消费者组发生变化时会收到通知。

每个消费者组都有自己的 offset 注册表__consumer_offsets 内部主题),彼此独立。

一个组的消费进度不会影响另一个组

在消费者初次启动时,或者通过 watcher 收到 broker/consumer 变化的通知时,消费者会发起一次 重新分配(rebalance) 流程,以确定它应该消费的新分区子集。该流程在算法 1 中描述:

  • 消费者通过 Zookeeper 读取 broker 和 consumer 注册表,先计算出每个订阅主题 T 的可用分区集合 (P_T),以及订阅 T 的消费者集合 (C_T);
  • 然后将 P_T 分区成 |C_T| 份,并确定性地选择其中一份作为自己的分区;
  • 对于它选择的每个分区,消费者会在所有权注册表中写入自己作为该分区的新 owner;
  • 最后,消费者启动线程,从 offset 注册表中记录的偏移量开始拉取数据。当消息被消费时,消费者会定期更新该分区最新的消费 offset。

当组内有多个消费者时,它们都会收到 broker 或消费者变化的通知。但这些通知到达的时间可能略有不同,因此可能会出现某个消费者试图获取另一个消费者仍在拥有的分区的情况。这时,第一个消费者会简单地释放自己当前拥有的所有分区,稍等一会儿后重新尝试 rebalance。实践中,rebalance 流程通常在几次重试后就能稳定下来。

当一个新的消费者组被创建时,偏移量注册表中是没有任何 offset 的。在这种情况下,消费者会从每个订阅分区上可用的最小或最大 offset 开始消费(具体取决于配置),通过我们在 broker 上提供的 API 来实现。

分发保障

一般来说,Kafka 只保证 至少一次(at-least-once) 的消息投递。精确一次(exactly-once) 投递通常需要使用 两阶段提交(two-phase commits),对于我们的应用场景并不必要。大多数情况下,每条消息都会被 准确地投递一次 到每个消费者组。然而,如果某个消费者进程在没有干净关闭的情况下崩溃,接管这些分区的其他消费者可能会收到一些重复消息,这些消息位于最后一次成功提交到 Zookeeper 的偏移量之后。如果应用程序对重复消息敏感,就必须自行实现 去重逻辑,可以使用我们返回给消费者的偏移量,或者利用消息中的某个唯一键。这通常比使用两阶段提交更具成本效益。

Kafka 保证来自同一分区的消息会 按顺序投递给消费者,但对于来自不同分区的消息,不保证顺序

注:如果一个partition上的消息发给consumer后没有收到ack和offset,kafka实际上并不知道这个消息是否成功被消费了,因此会尝试重复通知consumer拉取消息,这时候就可能出现重复消费的问题。因此消费者需要自己保证:

去重(使用消息唯一 key 或缓存 offset);

幂等处理(业务处理可重复执行不会影响结果)。

为了避免日志损坏,Kafka 会为每条消息在日志中存储 CRC 校验值。如果 Broker 出现任何 I/O 错误,Kafka 会运行恢复流程,删除那些 CRC 不一致的消息。在消息级别保存 CRC 还可以让我们在消息被生产或消费后,检查网络传输错误。

如果某个 Broker 崩溃,其上存储但尚未被消费的消息将 变得不可用。如果该 Broker 的存储系统永久损坏,任何未消费的消息将 永久丢失

未来,Kafka 计划增加 内置副本机制,将每条消息冗余存储到多个 Broker 上,从而提高可靠性。【这一个特性在kafka上已经实现了】

kafka性能测试

image-20250930174512189

Producer 测试

我们将所有系统的 Broker 配置为异步刷新消息到持久化存储。每个系统运行一个 Producer,发布总共 1000 万条消息,每条 200 字节。Kafka Producer 配置为批量发送,批大小为 1 和 50。ActiveMQ 和 RabbitMQ 没有方便的批量方式,我们假设其批大小为 1。结果如图 4 所示:x 轴表示随时间发送到 Broker 的数据量(MB),y 轴表示生产者吞吐量(条/秒)。平均而言,Kafka 批量大小为 1 时可达 50,000 条/秒,批量大小为 50 时可达 400,000 条/秒。这比 ActiveMQ 高几个数量级,比 RabbitMQ 至少高两倍。

Kafka 性能更高的原因包括:

  1. Kafka Producer 当前不等待 Broker 确认,尽可能快速发送消息,这显著提高了吞吐量。批量为 50 时,单个 Kafka Producer 几乎饱和了生产者与 Broker 之间的 1Gb 链路。这在日志聚合场景中是合理优化,因为数据必须异步发送以避免对实时流量产生延迟。未确认时无法保证每条消息都被 Broker 接收,但对于大多数日志数据,可以以较小的消息丢失换取吞吐量。未来我们计划针对关键数据解决持久化问题。
  2. Kafka 的存储格式更高效。平均每条消息开销为 9 字节,而 ActiveMQ 为 144 字节。ActiveMQ 为存储相同 1000 万条消息多消耗了约 70% 空间,原因包括 JMS 消息头开销以及维护索引结构的成本。ActiveMQ 最繁忙的线程大部分时间花在访问 B-Tree 维护消息元数据和状态。
  3. 批量发送显著提高吞吐量,通过摊薄 RPC 开销。Kafka 批量大小为 50 条时,吞吐量几乎提高一个数量级。

Consumer 测试

第二个实验测试消费者性能。所有系统使用单个消费者,获取总共 1000 万条消息。配置每次拉取大致相同数据量——最多 1000 条或约 200KB。ActiveMQ 和 RabbitMQ 消费者设置为自动确认模式。由于所有消息都能放入内存,系统都从底层文件系统的页面缓存或内存缓冲区提供数据。结果如图 5 所示。

平均而言,Kafka 消费 22,000 条/秒,是 ActiveMQ 和 RabbitMQ 的 4 倍以上。原因包括:

  1. Kafka 存储格式更高效,传输的数据量更少;
  2. ActiveMQ 和 RabbitMQ 的 Broker 需要维护每条消息的投递状态;
  3. ActiveMQ 的线程在测试中忙于将 KahaDB 页面写入磁盘,而 Kafka Broker 没有磁盘写操作;
  4. Kafka 使用 sendfile API,降低了传输开销。

kafka的缺陷

1. 跨分区无法保证全局顺序

  • 问题:Kafka 只保证 同一分区内顺序,不同分区的消息顺序无法保证。
    • 示例:Producer 发送 m1=购物 到 partition1,m2=结算 到 partition2 → 消费者可能先消费 m2 再消费 m1,导致业务逻辑错乱。
  • 原因:Kafka 分区独立存储、并行处理以提高吞吐量。
  • 系统设计时应该重点考虑跨分区可能存在的全局顺序问题。

解决方案

  1. 同 key 消息走同分区
    • 通过 key(例如用户 ID)保证相关消息落在同一分区。
  2. 应用层顺序控制
    • 消费端使用缓存/序号等待机制,保证业务顺序。
  3. 事务消息(Kafka 0.11+)
    • Producer 使用事务发送多条消息,保证原子提交,但跨分区顺序仍需应用层处理。

2. 没有内置死信队列(DLQ)

  • 问题:消费者处理失败的消息不会自动移入死信队列,需要自己处理。
  • 原因:Kafka 的设计哲学是存储+分发,消费逻辑由应用负责。

解决方案

  1. 应用层实现 DLQ
    • 消费失败的消息写入专门 topic(例如 topic.DLQ)。
  2. Kafka Connect / Kafka Streams 支持 DLQ
    • Connector 可以自动把处理失败的消息写入 DLQ topic。
  3. Retry Topic + DLQ
    • 构建多层重试 topic → 最终失败写入 DLQ,控制消费顺序和延迟。

3. 至少一次投递(可能重复消费)

  • 问题:如果消费者 crash 或处理失败,Kafka 会重新发送消息 → 可能重复消费。
  • 原因:Kafka 依赖 offset 提交,而不是 ACK。

解决方案

  1. 幂等消费
    • 消费端确保重复处理不会影响结果。
  2. 去重逻辑
    • 使用消息唯一 ID 或 offset 做去重。
  3. 事务消息
    • Kafka Producer/Consumer 支持 exactly-once semantics(EOS),保证事务性消费。

4. 消息延迟感知受限

  • Kafka 是 拉取(pull)模型,消费者拉取消息频率影响延迟。
  • 解决方案
    • 调整消费者拉取批次大小和轮询间隔(fetch.min.bytes, fetch.max.wait.ms)。
    • 高延迟敏感场景可以用 push 模式 MQ 或 Flink/Kafka Streams 做实时处理。

5. 单分区性能受限

  • 问题:一个分区只能由一个消费者消费 → 单分区吞吐有限。
  • 解决方案
    • 采用key对热点分区进行平衡。
    • 注:实际上一个consumer对应一个partition,不应该采用多个consumer并行消费一个partition,这可能导致潜在的单分区顺序问题。

6. 存储依赖 Broker 磁盘【已解决】

  • Broker 磁盘损坏 → 未消费消息可能丢失。
  • 解决方案
    • 多副本机制(Replication Factor > 1)
    • 生产者 ACK 配置 (acks=all) 确保消息写入所有副本。

7.重复生产【已解决】

在 0.11.0.0 之前,如果生产者未能收到指示消息已提交的响应,它别无选择,只能重新发送消息。这提供了至少一次传递语义,因为如果原始请求实际上已成功,则在重新发送期间消息可能会再次写入日志。从0.11.0.0开始,Kafka生产者还支持幂等传递选项,保证重新发送不会导致日志中出现重复条目。为了实现这一点,代理为每个生产者分配一个 ID,并使用生产者随每条消息发送的序列号来删除重复的消息。同样从 0.11.0.0 开始,生产者支持使用类似事务的语义将消息发送到多个主题分区的能力:即,要么所有消息都成功写入,要么没有一条消息成功写入。

未来的计划(原论文均已实现)

内置消息复制

  • 计划在多个 Broker 之间增加消息的内置复制,以在不可恢复的机器故障情况下保证 数据持久性和可用性
  • 希望支持 异步和同步复制 模型,让应用在生产者延迟和保证强度之间做权衡。
  • 应用可以根据对持久性、可用性和吞吐量的需求选择合适的冗余级别。

流处理能力

  • 在 Kafka 中增加流处理功能。
  • 实时应用在获取消息后,通常会执行类似 窗口统计、或者 与二级存储或其他流的消息做 join 的操作。
  • 在最低层,通过在发布时 按 join key 对消息进行语义分区,保证所有具有相同 key 的消息进入同一分区,从而到达同一个消费者进程。
  • 这为在消费者集群上处理分布式流奠定了基础。
  • 在此基础上,我们认为提供一套 流处理工具库(如各种窗口函数或 join 技术)对应用非常有帮助。

上面的计划在kafka3.x之后已经实现,分别为(1)副本机制,(2)流和表的转换,可以实现使用Ksql来查找指定的消息。

  1. 在4.0后完全移除对ZK的支持,默认KRaft作为元数据管理协议。‘


Kafka
https://yicizhang00.github.io/posts/消息队列/kafka/
作者
Yici Zhang
发布于
2025年8月12日
许可协议