Kafka教程
Last Update:
本教程将指导您完成从零开始在类 RHEL (Red Hat, CentOS, Rocky Linux 等) 系统上安装和配置 Zookeeper 与 Kafka 的全过程。
与许多过时的教程不同,本指南将:
- 解决实际问题:包括处理
wget、java等依赖、JAVA_HOME环境变量以及棘手的sudo权限问题。 - 使用现代命令:完全采用 Kafka 2.8+ 推荐的、基于 Broker 的管理命令(
--bootstrap-server),而不是废弃的--zookeeper标记。 - 深入配置:不仅仅是“复制粘贴”。我们将把您提供的“知识”部分(第 3 章生产者、第 4 章消费者)完全融入教程中,详细解释每一个关键配置参数的含义和“为什么”。
- 解释核心概念:在您开始配置之前,我们将首先深入探讨“序列化”等 Kafka 核心概念,让您知其所以然。
第一部分:核心前置知识(The “Why”)
在敲下第一行命令之前,我们必须理解我们正在构建的系统的核心概念。
1.1 背景:为何 Kafka 仍(暂时)需要 Zookeeper?
您提供的教程背景知识非常到位:
在较新的 Kafka 版本(2.8.0+)中,引入了基于 KRaft 协议的无 Zookeeper 模式,这已成为未来的趋势。但在 KRaft 成为主流并完全稳定之前,以及在许多现有系统中,Kafka 依然依赖 Zookeeper 来管理和协调集群。Zookeeper 负责存储集群的元数据(如 broker、主题、分区信息)和消费者状态等关键信息。因此,一个稳定运行的 Zookeeper 服务是传统 Kafka 集群正常工作的前提。
本教程将构建的就是这种最广泛使用的、基于 Zookeeper 的 Kafka 集群。
1.2 Kafka 核心概念速览
- Broker:一台 Kafka 服务器。多个 Broker 组成了 Kafka 集群。
- Topic(主题):消息的类别。是 Kafka 中数据组织的基本单位。
- Partition(分区):Topic 的物理分组。一个 Topic 可以分为多个 Partition,分布在不同的 Broker 上。这是 Kafka 实现高吞吐量和水平扩展的关键。
- Producer(生产者):向 Kafka Topic 发送(写入)消息的客户端。
- Consumer(消费者):从 Kafka Topic 读取消息的客户端。
- Consumer Group(消费者组):一个或多个 Consumer 组成的逻辑分组。一个 Topic 的同一个 Partition,在同一时间只能被一个 Consumer Group 内的一个 Consumer 消费,这保证了消息的有序消费和负载均衡。
1.3 核心前置知识:序列化 (Serialization)
这是您提供的知识中最关键、也最容易被忽视的部分。在配置生产者/消费者之前,必须理解它。
1. 序列化到底是什么? (What is Serialization?)
想象一下,你在程序里创建了一个对象,比如一个用户信息对象,它在内存里包含了用户名(字符串)、年龄(整数)、注册时间(日期对象)等信息。现在,你想通过网络把这个用户信息发送给另一个系统(比如 Kafka Broker),或者把它存到硬盘上。
问题来了:网络和硬盘只能理解和传输字节流(一串 0 和 1),它们完全不认识你程序中的“对象”这个概念。
序列化(Serialization) 就是将内存中的数据结构或对象,转换成可以存储或传输的格式(通常是字节序列)的过程。
反序列化(Deserialization) 则是相反的过程,即将字节序列恢复为内存中的原始数据结构或对象。
简单来说,序列化就是为你的数据“打包”,方便运输和存储;反序列化就是“拆开包裹”,恢复数据。
2. 为什么 Kafka 强制要求使用序列化器?
Kafka 是一个分布式的消息系统。它的核心工作流程如下:
- 生产者 (Producer):在你的应用程序中,将消息(记录)发送到 Kafka Broker。
- Broker:Kafka 服务器,负责接收、存储和转发消息。
- 消费者 (Consumer):从 Kafka Broker 拉取消息,并在你的另一个应用程序中进行处理。
在这个过程中,消息至少要经过两次网络传输(Producer -> Broker, Broker -> Consumer)和一次磁盘存储(在 Broker 上)。如上所述,这些环节都只认字节。
因此,Kafka 强制要求:
- 生产者在发送数据前,必须配置一个序列化器 (Serializer),将键(key)和值(value)从它们的对象形式转换为字节数组 (byte[])。
- 消费者在接收数据后,必须配置一个反序列化器 (Deserializer),将从 Broker 收到的字节数组 (byte[]) 转换回它原本的对象形式,以便程序处理。
3. Kafka 内置的序列化器及其局限性
Kafka 自带了一些开箱即用的序列化器,位于 org.apache.kafka.common.serialization 包下:
StringSerializer: 用于序列化字符串。IntegerSerializer: 用于序列化整数。LongSerializer: 用于序列化长整型。ByteArraySerializer: 用于序列化字节数组(实际上它什么也不做,直接返回原始字节数组)。
局限性: 这些内置的序列化器只能处理非常简单的数据类型。在实际业务中,我们发送的消息通常是复杂的业务对象,比如订单、用户信息、日志事件等。这时,我们就需要自定义序列化器。
4. 推荐方案:使用 Avro 和 Schema Registry
您提供的资料中还提到了 JSON 作为自定义序列化方案,这是一个很好的起点。但它有几个缺点:
- 冗余信息:JSON 格式中包含大量的键名,浪费存储和网络带宽。
- 无 Schema 约束:JSON 本身不强制数据结构。生产者可以随意修改字段,导致消费者崩溃。
- Schema 演进困难:当业务发展,需要增删字段时,保证新旧版本兼容非常棘手。
为了解决这些问题,社区推荐使用 Apache Avro 配合 Schema Registry。
- Avro 是一个数据序列化系统。它使用基于 JSON 的 Schema (模式) 来定义数据结构,并将数据序列化成非常紧凑的二进制格式。
- Schema Registry 是一个独立的、管理所有 Avro Schema 的服务。
工作流程:
- 生产者将 Schema 注册到 Schema Registry,获得一个唯一的 Schema ID。
- 生产者使用 Avro 将对象序列化成二进制数据,并在消息中只包含这个 Schema ID 和二进制数据。
- 消费者读到消息,提取出 Schema ID。
- 消费者用这个 ID 去 Schema Registry 拉取完整的 Schema(如果本地没有缓存的话)。
- 消费者使用该 Schema 将二进制数据反序列化成对象。
巨大优势:
- 高效存储:二进制格式非常紧凑,Schema 不随每条消息发送,极大节省空间。
- 强数据一致性:所有数据都必须符合预先定义的 Schema。
- 强大的 Schema 演进:Avro 制定了详细的兼容性规则。你可以安全地为 Schema 添加可选字段等,同时保证新旧版本的生产者和消费者之间不会中断服务。
结论:对于学习,使用
StringSerializer即可。对于生产,强烈建议直接采用 Avro + Schema Registry 的方案。
第二部分:安装和配置 Zookeeper (3.8.x)
现在,我们开始动手实践。我们将严格按照您提供的“更新版”教程步骤进行。
第 1 步:准备工作 - 安装必要工具
在现代的 RHEL 系统(如 RHEL 8/9, Rocky Linux 8/9)上,wget 可能没有预装。
命令解释:
sudo:以管理员(root)权限执行命令。安装软件需要系统权限。dnf:是 RHEL 8+ 上的新一代包管理器(替代了yum)。install wget:告诉dnf我们要安装wget这个工具。wget用于从网络上下载文件。
执行:
1
2
3[qd@qd ~]$ sudo dnf install wget
# [输入您的用户密码]
# ... 等待安装完成 ...
第 2 步:安装和配置 Java 环境 (关键依赖)
Zookeeper 和 Kafka 都是用 Java 编写的,必须在 Java 虚拟机(JVM)上运行。您在日志中遇到的 Error: JAVA_HOME is not set 是一个经典错误。
命令解释与执行:
安装 Java:我们安装 OpenJDK 17,这是一个长期支持(LTS)版本,与 ZK 3.8+ 和新版 Kafka 兼容良好。
1
[qd@qd ~]$ sudo dnf install openjdk-17-jre查找 Java 安装路径:我们需要设置
JAVA_HOME环境变量,它告诉系统 Java 的“家”在哪里。1
2[qd@qd ~]$ readlink -f $(which java)
# 假设输出: /usr/lib/jvm/java-17-openjdk-17.0.x.x-x.el8.x86_64/bin/java设置永久环境变量:我们将其写入
~/.bashrc文件,这样每次登录时都会自动生效。JAVA_HOME的路径是上面查找到的路径去掉末尾的/bin/java。
1
[qd@qd ~]$ vi ~/.bashrc在文件末尾添加以下两行 (按
G到末尾,按o新起一行):1
2
3
4#
# 设置 JAVA_HOME
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-17.0.x.x-x.el8.x86_64
export PATH=$PATH:$JAVA_HOME/bin(请将路径替换为您上一步查到的真实路径)
保存退出 (按
Esc,输入:wq,回车),然后让配置立即生效:1
[qd@qd ~]$ source ~/.bashrc验证配置:
1
2
3
4
5[qd@qd ~]$ echo $JAVA_HOME
/usr/lib/jvm/java-17-openjdk-17.0.x.x-x.el8.x86_64
[qd@qd ~]$ java --version
# 应该能看到 Java 17 的版本信息
第 3 步:下载并解压 Zookeeper
注意:务必下载带有
-bin标志的二进制包 (binary)。源码包 (src) 无法直接运行。命令解释与执行:
1
2
3
4
5
6
7
8# 1. 使用 wget 下载 (以 3.8.5 为例)
[qd@qd ~]$ wget https://dlcdn.apache.org/zookeeper/zookeeper-3.8.5/apache-zookeeper-3.8.5-bin.tar.gz
# 2. 解压
# -z: 处理 .gz 压缩
# -x: 解压 (extract)
# -f: 指定文件名 (file)
[qd@qd ~]$ tar -zxf apache-zookeeper-3.8.5-bin.tar.gz
第 4 步:安装 Zookeeper 并创建数据目录
我们将 Zookeeper 安装在 /usr/local (用于存放用户安装的软件),并将其数据存放在 /var/lib (用于存放易变的应用状态数据)。
命令解释与执行:
1
2
3
4
5
6
7
8# 1. 移动并重命名文件夹
# /usr/local 是系统目录,必须用 sudo
[qd@qd ~]$ sudo mv apache-zookeeper-3.8.5-bin /usr/local/zookeeper
# 2. 创建 ZK 数据目录
# /var/lib 也是系统目录
# -p: 确保父目录不存在时也一并创建
[qd@qd ~]$ sudo mkdir -p /var/lib/zookeeper
第 5 步:配置 Zookeeper (zoo.cfg)
ZK 的配置非常简单。
命令解释与执行:
1
2
3
4
5
6
7
8# 1. 进入 ZK 配置目录
[qd@qd ~]$ cd /usr/local/zookeeper/conf/
# 2. 复制官方示例配置 (需要 sudo)
[qd@qd conf]$ sudo cp zoo_sample.cfg zoo.cfg
# 3. 编辑新配置文件 (需要 sudo)
[qd@qd conf]$ sudo vi zoo.cfg配置文件内容:
确保zoo.cfg文件中,dataDir的路径被正确修改,指向我们在上一步创建的目录。其他可以暂时保持默认。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15# Zookeeper 中时间的基本单位,毫秒
tickTime=2000
# ZK follower 连接和同步 leader 的初始时限
initLimit=10
# ZK follower 和 leader 之间请求和应答的时限
syncLimit=5
# !!!!这是唯一必须修改的配置!!!!
# 数据目录,必须与第 4 步创建的目录一致
dataDir=/var/lib/zookeeper
# 客户端连接 Zookeeper 的端口
clientPort=2181
第 6 步:解决权限问题并启动 Zookeeper
这是您在日志中遇到的 FAILED TO WRITE PID 权限错误。
问题根源:
/var/lib/zookeeper目录是我们用sudo创建的,其所有者是root用户。当我们尝试以普通用户qd启动 ZK 时,qd用户无权在该目录下写入数据(包括 PID 文件和快照)。解决方案 A (不推荐):
sudo /usr/local/zookeeper/bin/zkServer.sh start- 这能启动,但是以 root 身份运行服务是一个严重的安全隐患。
解决方案 B (推荐做法):更改目录所有权
- 我们将数据目录的所有权交给将要运行服务的用户(这里是
qd)。
1
2
3
4
5
6
7
8# 1. 将 /var/lib/zookeeper 目录及其下所有文件 (-R) 的所有者和所属组都更改为 qd。
[qd@qd ~]$ sudo chown -R qd:qd /var/lib/zookeeper
# 2. 完成后,就可以用普通用户身份直接启动了。
[qd@qd ~]$ /usr/local/zookeeper/bin/zkServer.sh start
# JMX enabled by default
# Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
# Starting zookeeper ... STARTED- 我们将数据目录的所有权交给将要运行服务的用户(这里是
第 7 步:验证 Zookeeper 服务
方法 1: 使用
jps(Java Process Status)jps可以列出当前用户的所有 Java 进程。1
2
3
4[qd@qd ~]$ jps
# 您应该能看到一个名为 QuorumPeerMain 的进程 (ZK 的主进程)
12345 QuorumPeerMain
12346 Jps方法 2: 使用四字命令 (Four Letter Words)
Zookeeper 监听一个端口,并能响应一组特殊的四字诊断命令。srvr是其中之一。1
2
3
4
5
6
7
8
9
10
11# 1. 安装 nc (Netcat),它比 telnet 更好用
[qd@qd ~]$ sudo dnf install nmap-ncat
# 2. 发送 "srvr" 命令到 2181 端口
[qd@qd ~]$ echo srvr | nc localhost 2181
# 3. 查看返回信息
Zookeeper version: 3.8.5-19f1842d4b6ee1f82cc8b05284959b26c6ae507d, built on 2025-09-03 21:35 UTC
...
Mode: standalone # 看到这个,表示是单机模式,成功!
Node count: 4
第三部分:安装和配置 Kafka
Zookeeper 基础打好了,现在我们来安装 Kafka。
第 1 步:下载并解压 Kafka
关键陷阱:您在日志中也遇到了这个
Classpath is empty错误。- 不要下载
kafka-3.x.x-src.tgz(源码包) - 必须下载
kafka_2.13-3.x.x.tgz(二进制包)。2.13是 Scala 版本号,3.x.x是 Kafka 版本号。
- 不要下载
命令解释与执行:
1
2
3
4
5# 1. 下载 Kafka (以 3.9.1 为例)
[qd@qd ~]$ wget https://dlcdn.apache.org/kafka/3.9.1/kafka_2.13-3.9.1.tgz
# 2. 解压
[qd@qd ~]$ tar -zxf kafka_2.13-3.9.1.tgz
第 2 步:安装 Kafka
与 Zookeeper 类似,我们将 Kafka 安装到 /usr/local。
命令解释与执行:
1
2# 移动并重命名 (需要 sudo)
[qd@qd ~]$ sudo mv kafka_2.13-3.9.1 /usr/local/kafka
第 3 步:配置 Kafka (server.properties)
这是 Kafka Broker 的核心配置文件。您提供的 server.properties 内容非常适合单机测试。
命令解释与执行:
1
2
3
4
5
6
7
8
9
10
11# 1. 进入 Kafka 配置目录
[qd@qd ~]$ cd /usr/local/kafka/config/
# 2. 编辑配置文件 (我们用普通用户 qd 启动,先确保我们有权限)
# 更改 /usr/local/kafka 目录的所有权为 qd (生产环境会有专门的 service-account)
[qd@qd config]$ cd /usr/local
[qd@qd local]$ sudo chown -R qd:qd kafka
# 3. 现在可以不用 sudo 编辑了
[qd@qd local]$ cd kafka/config
[qd@qd config]$ vi server.properties核心配置详解:
打开server.properties文件,确保以下几项配置正确。服务器基础 (Server Basics)
1
2
3# broker.id 在一个 Kafka 集群中必须是唯一的整数。
# 对于单节点测试,0 即可。在集群中,每个 broker 都必须有不同的 ID。
broker.id=0网络与套接字设置 (Socket Server Settings)
1
2
3
4
5
6# Kafka 服务监听的地址和端口。
# PLAINTEXT 是监听器名称,localhost:9092 是监听的主机和端口。
# 这意味着只有在本机上才能连接到 Kafka。
# 警告:如果您的生产者/消费者在另一台机器上,必须将 localhost 改为 0.0.0.0
# 或该主机的内网/外网 IP 地址。
listeners=PLAINTEXT://localhost:9092日志基础 (Log Basics)
1
2
3
4
5
6
7# Kafka 存储 topic 数据的目录。
# 警告:/tmp/kafka-logs 是默认的,但 /tmp 目录在系统重启时可能会被清空!
# 生产环境必须改为一个持久化目录,例如 /var/lib/kafka-logs
log.dirs=/tmp/kafka-logs
# 新建 topic 时默认的分区数量。
num.partitions=1内部主题设置 (Internal Topic Settings) - [单机关键]
1
2
3
4
5
6
7
8# Kafka 内部用于存储消费者位移(__consumer_offsets)的主题的副本因子。
# 在单节点环境中,这个值【只能是 1】。
# 如果是 3 (默认值),Kafka 会因找不到另外两个副本而无法启动。
offsets.topic.replication.factor=1
# 事务状态日志的副本因子,同上,单机必须为 1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1Zookeeper 连接 (Zookeeper) - [核心]
1
2
3
4
5
6
7# 这是最关键的配置之一!
# 指定 Zookeeper 集群的地址。
# 这里的 localhost:2181 必须与我们第二部分中 Zookeeper 的 clientPort 配置一致。
zookeeper.connect=localhost:2181
# 连接 Zookeeper 的超时时间(毫秒)。
zookeeper.connection.timeout.ms=18000
第 4 步:创建 Kafka 日志目录
根据 server.properties 中的 log.dirs 配置,我们需要手动创建该目录。
命令解释与执行:
1
2
3
4
5
6
7
8
9
10# 创建在配置文件中指定的日志目录
# 因为我们上一步把 /usr/local/kafka 给了 qd 用户,
# 我们也应该把数据目录 /tmp/kafka-logs (或 /var/lib/kafka-logs) 权限给 qd
# 1. 创建目录 (以 /tmp 为例)
[qd@qd ~]$ mkdir /tmp/kafka-logs
# (推荐) 如果您使用了 /var/lib/kafka-logs,请执行:
# [qd@qd ~]$ sudo mkdir /var/lib/kafka-logs
# [qd@qd ~]$ sudo chown -R qd:qd /var/lib/kafka-logs
第 5 步:启动 Kafka 服务器
一切准备就绪,现在我们可以启动 Kafka 了。
命令解释与执行:
1
2
3
4# /usr/local/kafka/bin/kafka-server-start.sh 是 Kafka 的启动脚本。
# -daemon 参数会让 Kafka 在后台作为守护进程运行。
# 最后一个参数是我们的配置文件路径。
[qd@qd ~]$ /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
第 6 步:验证 Kafka 服务
等待片刻,让 Kafka 完成启动。
使用
jps查看 Java 进程:
您应该能同时看到 Zookeeper 的QuorumPeerMain和 Kafka 的Kafka进程。1
2
3
4[qd@qd ~]$ jps
12345 Kafka # Kafka Broker 进程
54321 QuorumPeerMain # Zookeeper 进程
... Jps检查 Kafka 日志 (排错关键):
如果启动失败,最重要的排错信息通常在日志文件的末尾。1
2# 查看 Kafka 服务端日志
[qd@qd ~]$ tail -f /tmp/kafka-logs/server.log如果看到
[KafkaServer id=0] started类似的信息,说明 Kafka 已成功启动!
第四部分:Kafka 核心操作 (使用新版命令)
恭喜!您的 Kafka + Zookeeper 环境已成功运行。现在我们来测试它。
重要:命令演进 (The “New Way”)
过去,Kafka 的管理脚本 (如
kafka-topics.sh) 依赖 Zookeeper (使用--zookeeper参数) 来读写元数据。从 Kafka 2.8+ 开始,Kafka 社区大力推动将 Zookeeper 变为内部实现细节。所有管理命令都应通过 Kafka Broker 自己的 API 来执行。
因此,旧的
--zookeeper参数已被废弃,您必须使用--bootstrap-server来指向 Kafka Broker (e.g.,localhost:9092)。您提供的“新版命令”是完全正确的,我们将使用它。
1. 创建并验证主题 (Topic)
创建名为
test的主题:1
2
3
4
5# 注意:这里是关键变化,使用 --bootstrap-server 代替 --zookeeper
[qd@qd ~]$ /usr/local/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
# 成功后会看到提示:
# Created topic "test".查看
test主题的详细信息:1
2
3
4
5[qd@qd ~]$ /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test
# 输出结果:
# Topic: test TopicId: [ID] PartitionCount: 1 ReplicationFactor: 1 ...
# Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
2. 发布消息 (Produce)
启动控制台生产者:
1
2# 注意:参数名从 --broker-list 统一为 --bootstrap-server
[qd@qd ~]$ /usr/local/kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test输入消息并发送:
启动后,您会看到一个>提示符。1
2
3>Hello Kafka
>This is the new way
>...(输入完成后,按
Ctrl+C退出生产者,不要按 Ctrl+D,新版 Kafka 可能有 bug)
3. 读取消息 (Consume)
启动控制台消费者 (在新的终端窗口):
打开一个新的 SSH 会话或终端标签页,登录到您的服务器。1
2
3# 注意:消费者也使用 --bootstrap-server 连接
# --from-beginning: 确保我们能读到历史消息,而不是只读新消息
[qd@qd ~]$ /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning查看消费结果:
你将会在终端看到刚才发送的两条消息:1
2Hello Kafka
This is the new way(按
Ctrl+C停止消费者)
第五部分:深入 Kafka 生产者 (Producer)
您已经使用 kafka-console-producer.sh 成功发送了消息。但
在真实的应用中,您会使用 Java、Python 或 Go 的客户端来编写生产者程序。
此时,您提供的“第 3 章 知识”就变得至关重要。
3.2 创建 Kafka 生产者 (程序中)
在 Java 程序中创建生产者时,您需要提供一个 Properties 对象。其中有 3 个必选属性。
bootstrap.servers- 解释:指定 broker 的地址清单,格式为
host:port。 - 详细:清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找到其他 broker 的信息。不过建议至少要提供两个 broker 的信息,一旦其中一个宕机,生产者仍然能够连接到集群上。(对于我们的单机测试,
localhost:9092即可)。
- 解释:指定 broker 的地址清单,格式为
key.serializer- 解释:键(Key)的序列化器。
- 详细:Broker 希望接收到的消息的键和值都是字节数组。
key.serializer必须被设置为一个实现了Serializer接口的类。生产者会使用这个类把键对象序列化成字节数组。 - 参见第一部分:这就是我们详细讨论过的“序列化”。如果您使用
String作为键,这里就设置为org.apache.kafka.common.serialization.StringSerializer。
value.serializer- 解释:值(Value)的序列化器。
- 详细:与
key.serializer一样,value.serializer指定的类会将值序列化。如果您发送String消息,这里也设置为org.apache.kafka.common.serialization.StringSerializer。
3.4 生产者的关键配置详解
以下是您提供的知识库中,对生产者性能、可靠性、内存影响最大的参数。
01. acks (数据持久性)
该参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。
acks=0:“发后即忘”。生产者在成功写入消息之前不会等待任何来自服务器的响应。如果服务器没有收到消息(例如 Broker 宕机),生产者无从得知,消息会丢失。- 优点:极高的吞吐量和极低的延迟。
- 缺点:零持久性保证。
acks=1:“Leader 确认” (默认值)。只要集群的 Leader 节点收到消息,生产者就会收到一个来自服务器的成功响应。- 优点:良好的吞吐量和延迟。
- 缺点:如果 Leader 节点刚收到消息但还未复制给 Follower 节点就宕机了,那么新选举出的 Leader 上并没有这条消息,消息依然会丢失。
acks=all(或acks=-1):“全员确认”。只有当所有参与复制的节点(ISR - In-Sync Replicas)全部收到消息时,生产者才会收到一个来自服务器的成功响应。- 优点:最强的持久性保证。只要至少还有一个副本存活,消息就不会丢失。
- 缺点:延迟最高,吞吐量最低。
02. buffer.memory (内存管理)
- 解释:该参数用来设置生产者客户端内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。
- 详细:如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。默认 32MB。
03. compression.type (网络效率)
- 解释:默认情况下,消息发送时不会被压缩。该参数可以设置为
snappy、gzip、lz4或zstd。 - 详细:
snappy:由 Google 发明,占用较少的 CPU,提供较好的性能和相当可观的压缩比。gzip:占用较多的 CPU,但会提供更高的压缩比。
- 权衡:压缩会消耗客户端 CPU,但能极大降低网络传输开销和 Broker 存储开销。在生产环境中强烈推荐开启(例如
snappy)。
04. retries (可靠性)
- 解释:生产者从服务器收到的错误有可能是临时性的(比如分区找不到首领)。
retries参数的值决定了生产者可以重发消息的次数。 - 详细:默认值很大 (2147483647),配合
delivery.timeout.ms(默认 2 分钟) 来控制总重试时间。 - 陷阱:重试可能导致消息重复和消息乱序。
- 重复:生产者发送消息 A,Leader 收到并写入,但在响应生产者之前宕机了。生产者认为失败,重试,新 Leader 再次写入消息 A,导致重复。
- 乱序:生产者发送消息 A (失败)、B (成功)、A (重试成功)。最终顺序变为 B, A。
- 解决方案:设置
enable.idempotence=true(幂等生产者),可解决重复和乱序问题(要求acks=all且retries > 0)。
05. batch.size (吞吐量)
- 解释:当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小(字节数)。
- 详细:默认 16KB。批次被填满后,消息会被发送。但生产者不一定都会等到批次被填满。
06. linger.ms (延迟)
- 解释:该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。
- 详细:默认 0ms。这意味着只要有可用的线程,生产者就会立即把消息发送出去,即使批次里只有一个消息。
- 调优:
batch.size和linger.ms是提升吞吐量的关键。- 将
linger.ms设置成一个大于 0 的数(例如 5ms 或 20ms),可以以微小的延迟为代价,极大地提升吞吐量。 - 这使得生产者有时间“稍等片刻”,收集更多的消息组成一个大批次 (Batch),从而减少了网络请求次数,提高了效率。
- 将
07. client.id
- 解释:任意的字符串,服务器会用它来识别消息的来源,用在日志、监控和配额指标里。建议设置成有意义的名称(如 “payment-service-producer”)。
08. max.in.flight.requests.per.connection
- 解释:生产者在收到服务器响应之前可以发送多少个消息(批次)。
- 详细:默认 5。如果设为 1,可以保证消息是按照发送的顺序写入服务器的(即使发生重试),但会严重影响吞吐量。
- 注意:启用幂等性 (
enable.idempotence=true) 时,此值必须小于等于 5,并且能自动保证顺序。
09. max.request.size
- 解释:控制生产者发送的请求大小,即单个批次的总大小。默认 1MB。
- 注意:Broker 端也有一个
message.max.bytes(默认 1MB) 的配置。生产者的这个值不能大于 Broker 的值,否则消息将被拒绝。
第六部分:深入 Kafka 消费者 (Consumer)
您已经使用 kafka-console-consumer.sh 成功接收了消息。但真实的消费者要复杂得多,它需要处理消费者组、分区再均衡、偏移量提交等。
您提供的“第 4 章 知识”为我们提供了深入理解的基础。
4.2 创建 Kafka 消费者 (程序中)
创建消费者同样需要一个 Properties 对象,有 4 个关键属性。
bootstrap.servers- 解释:与生产者一样,指向
localhost:9092。
- 解释:与生产者一样,指向
key.deserializer- 解释:键(Key)的反序列化器。
- 详细:必须与生产者
key.serializer配对使用。如果生产者用StringSerializer,消费者必须用StringDeserializer。
value.deserializer- 解释:值(Value)的反序列化器。
- 详细:必须与生产者
value.serializer配对使用。
group.id- 解释:消费者最重要的配置。它指定了 KafkaConsumer 属于哪一个消费者群组。
- 详细:
- 同一个
group.id下的所有消费者,会分摊它们所订阅的 Topic 的所有分区。 - 一个分区最多只能被一个组内的消费者消费。
- 如果
group.id不同,那么它们是两个独立的组,会各自收到一份完整的消息(实现了“发布-订阅”模式)。 - 如果
group.id相同,它们是一个组,会分担消息处理(实现了“工作队列”模式)。
- 同一个
4.3 订阅主题
consumer.subscribe(Collections.singletonList("test"));- 解释:订阅一个或多个固定的主题。
consumer.subscribe("test.*");- 解释:使用正则表达式订阅所有与
test相关的主题。如果集群中新增了一个test-new主题,消费者会自动发现并开始消费。
- 解释:使用正则表达式订阅所有与
4.5 消费者的关键配置详解
以下是您提供的知识库中,对消费者性能、可用性、消息语义影响最大的参数。
01. fetch.min.bytes (网络效率)
- 解释:该属性指定了消费者从服务器获取记录的最小字节数。默认 1 字节。
- 详细:Broker 在收到消费者的数据请求时,如果可用的数据量小于此值,那么它会等待,直到有足够的可用数据时才返回给消费者(参见下一个参数)。
- 调优:如果 Topic 不活跃,消费者会频繁地拉取空数据,浪费 CPU 和网络。适当调高此值(例如 1KB 或更高)可以降低消费者和 Broker 的负载,实现高效的“长轮询”。
02. fetch.max.wait.ms (延迟)
- 解释:与
fetch.min.bytes配合使用。指定 Broker 的最大等待时间,默认 500ms。 - 详细:Kafka 在收到消费者的请求后,要么返回
fetch.min.bytes大小的数据,要么在等待fetch.max.wait.ms毫秒后返回所有可用的数据,看哪个条件先满足。 - 权衡:
fetch.min.bytes和fetch.max.wait.ms共同定义了长轮询。增加fetch.max.wait.ms会增加延迟,但能极大降低空轮询的系统开销。
03. max.partition.fetch.bytes (内存管理)
- 解释:该属性指定了服务器从每个分区里返回给消费者的最大字节数。默认 1MB。
- 详细:如果一个消费者订阅了 10 个分区,它在一次
poll()中最多可能收到 10MB 的数据。您需要确保为消费者分配了足够的内存。
04. session.timeout.ms (可用性 - 关键)
- 解释:该属性指定了消费者在被认为死亡之前可以与服务器断开连接的时间,默认 45 秒 (新版)。
- 详细:如果消费者没有在
session.timeout.ms指定的时间内发送心跳 (Heartbeat)给群组协调器,协调器就认为它已经死亡,会触发再均衡 (Rebalance),把它负责的分区分配给群组里的其他消费者。
heartbeat.interval.ms (可用性 - 关键)
- 解释:该属性指定了
poll()方法向协调器发送心跳的频率,默认 3 秒。 - 规则:
heartbeat.interval.ms必须比session.timeout.ms小,一般是session.timeout.ms的三分之一。 - 陷阱:如果消费者处理单条消息(或一批消息)的时间过长,超过了
session.timeout.ms,它将无法发送心跳,导致被误判为“死亡”,引发不必要的再均衡。 - 解决方案:
- 延长
session.timeout.ms。 - 缩短消息处理时间(例如使用异步处理)。
- 减小
max.poll.records(见下)。
- 延长
05. auto.offset.reset (新消费者 - 关键)
- 解释:该属性指定了消费者在一个新的
group.id(即没有历史偏移量记录)或者偏移量无效(例如数据已被删除)的情况下,该作何处理。 latest(默认值):消费者将从最新的记录开始读取数据(即在消费者启动之后生成的记录)。earliest:消费者将从起始位置(偏移量 0)读取分区的所有记录。- 常见问题:“为什么我的消费者启动了却收不到消息?”—— 答案多半是因为你是用
latest启动的,而启动后又没有新消息写入。
06. enable.auto.commit (消息语义 - 关键)
解释:指定了消费者是否自动提交偏移量,默认值是
true。详细:如果为
true,消费者会根据auto.commit.interval.ms(默认 5 秒)的频率,在后台自动提交poll()方法返回的最高偏移量。自动提交的陷阱 (At-Most-Once - 至多一次):
- 消费者
poll()了一批消息。 - 后台自动提交了偏移量。
- 消费者在处理这批消息的
过程中崩溃了。 - 消费者重启后,从已提交的偏移量开始,导致丢失了上一批未处理完的消息。
- 消费者
推荐做法:
enable.auto.commit = false- 手动提交偏移量。在您的代码中,当消息真正处理完毕后(例如写入数据库成功后),再调用
consumer.commitSync()或consumer.commitAsync()来手动提交偏移量。 - 这实现了 At-Least-Once (至少一次) 的语义,是生产环境中最常见的做法。
07. partition.assignment.strategy
- 解释:当消费者组内的成员发生变化时(加入、退出、崩溃),决定如何将分区分配给消费者的策略。
Range(默认):该策略会把主题的若干个连续的分区分配给消费者。RoundRobin:该策略把主题的所有分区逐个分配给消费者。
08. max.poll.records
- 解释:该属性用于控制单次调用
poll()方法能够返回的最大记录数量。 - 调优:如果您的消息处理很慢(例如
session.timeout.ms很容易超时),可以调低此值(例如 50 或 100),确保您可以在超时时间内处理完这一小批数据。
第七部分:结论与生产环境考量
您已成功搭建了一个单机版的 Kafka 环境,并深入学习了 Zookeeper、Kafka Broker、生产者和消费者的核心配置。
关于 Zookeeper 集群(Ensemble):
您提供的教程中最后一部分关于 ZK 集群的说明非常重要:
在生产环境中,绝不能使用单点 Zookeeper,因为它会成为整个系统的单点故障。您需要搭建一个由奇数个节点(通常是 3 个或 5 个)组成的 Zookeeper 集群(官方称为 Ensemble),以保证高可用性。
后续步骤:
- 练习:尝试使用 Java (或您熟悉的语言) 编写一个生产者和消费者,并实践“第五部分”和“第六部分”中的配置参数(尤其是
acks,linger.ms,enable.auto.commit=false)。 - 集群化:尝试在一个 3 节点的集群上重复此过程,搭建一个 3 节点的 ZK 集群和 3 节点的 Kafka 集群。
- 探索 KRaft:当您熟悉了 ZK 模式后,可以开始探索 Kafka 最新的 KRaft 模式,它将是未来的标准。