Kafka 基础入门¶
📖 目录¶
1. 什么是 Kafka¶
1.1 定义¶
Apache Kafka 是一个开源的分布式流式处理平台(Distributed Streaming Platform),最初由 LinkedIn 开发,后捐赠给 Apache 基金会。
官方定义
Kafka 是一个分布式、分区化、可复制的提交日志服务(Distributed, Partitioned, Replicated Commit Log Service)
1.2 核心特性¶
- 高吞吐量 - 单机百万级消息/秒,支持水平扩展
- 持久化存储 - 消息持久化到磁盘,支持消息回溯
- 分布式架构 - 天然支持集群,高可用、可扩展
- 流式处理 - 支持流式数据处理,与 Flink、Spark 集成
- 多订阅者 - 支持多个消费者组同时消费同一主题
1.3 应用场景¶
| 场景 | 说明 | 示例 |
|---|---|---|
| 日志收集 | 集中式日志收集和分析 | ELK 栈(Elasticsearch + Logstash + Kibana) |
| 流式处理 | 实时数据流处理 | 实时推荐系统、实时风控 |
| 事件溯源 | 事件驱动架构 | 微服务间的事件通信 |
| 消息总线 | 微服务消息通信 | 服务解耦、异步处理 |
| 指标监控 | 应用指标收集 | Prometheus、Grafana 集成 |
2. 为什么需要 Kafka¶
2.1 传统消息队列的局限性¶
❌ 问题 1:吞吐量不足¶
传统消息队列(如 RabbitMQ)在单机场景下吞吐量有限,难以满足大数据场景的需求。
❌ 问题 2:消息持久化能力弱¶
传统消息队列通常将消息存储在内存中,消息消费后即删除,不支持消息回溯。
❌ 问题 3:扩展性差¶
传统消息队列难以水平扩展,集群管理复杂。
2.2 Kafka 的优势¶
Kafka 的核心优势
- 高吞吐量 - 通过顺序写入磁盘、零拷贝等技术实现高性能
- 持久化存储 - 消息持久化到磁盘,支持按时间、偏移量回溯
- 水平扩展 - 通过增加 Broker 和 Partition 实现水平扩展
- 流式处理 - 原生支持流式数据处理,与大数据生态无缝集成
2.3 Kafka vs 传统消息队列¶
| 对比项 | Kafka | RabbitMQ | RocketMQ |
|---|---|---|---|
| 设计目标 | 大数据流式处理 | 企业级消息队列 | 金融级消息队列 |
| 吞吐量 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ |
| 消息持久化 | ✅ 磁盘持久化 | ⚠️ 可选 | ✅ 磁盘持久化 |
| 消息回溯 | ✅ 支持 | ❌ 不支持 | ✅ 支持 |
| 顺序保证 | ✅ Partition 内有序 | ✅ 队列有序 | ✅ 队列有序 |
| 适用场景 | 大数据、日志 | 业务解耦 | 电商、金融 |
3. Kafka 核心架构¶
3.1 整体架构图¶
graph TB
subgraph "Producer 生产者"
P1[Producer 1]
P2[Producer 2]
end
subgraph "Kafka Cluster 集群"
subgraph "Broker 1"
T1[Topic: orders<br/>Partition 0]
T2[Topic: orders<br/>Partition 1]
end
subgraph "Broker 2"
T3[Topic: orders<br/>Partition 0 Replica]
T4[Topic: orders<br/>Partition 1]
end
subgraph "Broker 3"
T5[Topic: orders<br/>Partition 0]
T6[Topic: orders<br/>Partition 1 Replica]
end
ZK[Zookeeper<br/>协调服务]
end
subgraph "Consumer Group 消费者组"
CG1[Consumer Group A<br/>Consumer 1, 2]
CG2[Consumer Group B<br/>Consumer 1]
end
P1 --> T1
P1 --> T2
P2 --> T1
P2 --> T2
T1 -.副本同步.-> T3
T1 -.副本同步.-> T5
T2 -.副本同步.-> T4
T2 -.副本同步.-> T6
CG1 --> T1
CG1 --> T2
CG2 --> T1
CG2 --> T2
ZK -.协调.-> Broker 1
ZK -.协调.-> Broker 2
ZK -.协调.-> Broker 3
3.2 核心组件¶
3.2.1 Broker(代理服务器)¶
定义:Kafka 集群中的每个服务器节点称为 Broker。
职责: - 存储消息(Topic 和 Partition) - 处理 Producer 的写入请求 - 处理 Consumer 的读取请求 - 副本同步和故障转移
特点: - 每个 Broker 都有唯一的 ID(broker.id) - Broker 之间通过 Zookeeper 协调 - 支持水平扩展(增加 Broker 数量)
3.2.2 Topic(主题)¶
定义:消息的逻辑分类,类似于数据库中的表。
特点: - 一个 Topic 可以有多个 Partition(分区) - 多个 Producer 可以向同一个 Topic 发送消息 - 多个 Consumer Group 可以消费同一个 Topic
示例:
3.2.3 Partition(分区)¶
定义:Topic 的物理分割,每个 Partition 是一个有序的消息队列。
特点: - 顺序性:Partition 内的消息是有序的 - 并行性:不同 Partition 可以并行处理 - 扩展性:通过增加 Partition 数量提高吞吐量 - 副本机制:每个 Partition 可以有多个副本(Replica)
分区策略:
3.2.4 Producer(生产者)¶
定义:向 Kafka Topic 发送消息的客户端。
职责: - 创建消息 - 选择目标 Partition - 发送消息到 Broker
关键配置:
- acks - 消息确认机制(0/1/all)
- retries - 重试次数
- batch.size - 批次大小
- linger.ms - 等待时间
3.2.5 Consumer(消费者)¶
定义:从 Kafka Topic 读取消息的客户端。
职责: - 订阅 Topic - 从 Partition 读取消息 - 提交消费偏移量(Offset)
关键配置:
- group.id - 消费者组 ID
- auto.offset.reset - 偏移量重置策略(earliest/latest)
- enable.auto.commit - 是否自动提交偏移量
3.2.6 Consumer Group(消费者组)¶
定义:一组 Consumer 的集合,共同消费一个 Topic。
特点: - 负载均衡:同一个 Consumer Group 内的 Consumer 平均分配 Partition - 并行消费:多个 Consumer Group 可以同时消费同一个 Topic - 故障转移:Consumer 故障时,其 Partition 会分配给其他 Consumer
示例:
Topic: orders (3个 Partition)
Consumer Group A: Consumer1, Consumer2, Consumer3
- Consumer1 -> Partition 0
- Consumer2 -> Partition 1
- Consumer3 -> Partition 2
Consumer Group B: Consumer1
- Consumer1 -> Partition 0, 1, 2 (全部)
3.2.7 Offset(偏移量)¶
定义:Consumer 在 Partition 中的消费位置。
特点:
- 每个 Partition 维护独立的 Offset
- Offset 存储在 Kafka 内部 Topic(__consumer_offsets)或 Zookeeper
- 支持手动提交和自动提交
示例:
3.2.8 Replica(副本)¶
定义:Partition 的副本,用于保证高可用。
类型: - Leader Replica:处理读写请求的主副本 - Follower Replica:从 Leader 同步数据的副本
ISR(In-Sync Replicas): - 与 Leader 保持同步的副本集合 - 只有 ISR 中的副本才能被选为新的 Leader
3.3 消息存储机制¶
3.3.1 顺序写入¶
Kafka 采用**顺序写入磁盘**的方式,充分利用磁盘顺序写入的高性能。
3.3.2 零拷贝(Zero Copy)¶
Kafka 使用零拷贝技术,减少数据在内核态和用户态之间的拷贝次数。
sequenceDiagram
participant App as 应用程序
participant Kernel as 内核
participant Disk as 磁盘
participant Network as 网络
Note over App,Network: 传统方式(4次拷贝)
App->>Kernel: 1. read() 系统调用
Kernel->>Disk: 2. 从磁盘读取到内核缓冲区
Kernel->>App: 3. 拷贝到用户缓冲区
App->>Kernel: 4. write() 系统调用
Kernel->>Network: 5. 拷贝到 Socket 缓冲区
Kernel->>Network: 6. 发送到网络
Note over App,Network: 零拷贝方式(2次拷贝)
App->>Kernel: sendfile() 系统调用
Kernel->>Disk: 1. 从磁盘读取到内核缓冲区
Kernel->>Network: 2. 直接发送到网络
3.3.3 分段存储(Segment)¶
Kafka 将 Partition 分成多个 Segment 文件,每个 Segment 文件大小固定(默认 1GB)。
Topic: orders
Partition 0/
├── 00000000000000000000.log (Segment 1)
├── 00000000000000000000.index
├── 00000000000001000000.log (Segment 2)
├── 00000000000001000000.index
└── ...
优势: - 便于日志清理(删除旧的 Segment) - 提高查找效率(通过索引文件) - 支持消息压缩
4. 安装与部署¶
4.1 环境要求¶
- Java:JDK 8 或以上
- 操作系统:Linux、macOS、Windows
- 内存:建议 4GB 以上
- 磁盘:SSD 推荐,用于消息存储
4.2 下载 Kafka¶
# 下载 Kafka(以 3.6.0 为例)
wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
# 解压
tar -xzf kafka_2.13-3.6.0.tgz
cd kafka_2.13-3.6.0
4.3 单机部署¶
4.3.1 启动 Zookeeper¶
注意
Kafka 3.0+ 版本支持 KRaft 模式(不需要 Zookeeper),但为了兼容性,这里仍使用 Zookeeper 模式。
# 启动 Zookeeper(Kafka 自带)
bin/zookeeper-server-start.sh config/zookeeper.properties
# 或者使用后台运行
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper.log 2>&1 &
4.3.2 启动 Kafka Broker¶
# 启动 Kafka Broker
bin/kafka-server-start.sh config/server.properties
# 或者使用后台运行
nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &
4.3.3 验证安装¶
# 查看 Topic 列表
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
# 如果没有输出,说明安装成功(还没有创建 Topic)
4.4 Docker 部署(推荐)¶
4.4.1 使用 Docker Compose¶
创建 docker-compose.yml:
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
启动服务:
4.5 集群部署¶
4.5.1 配置文件¶
创建多个 Broker 配置文件:
# Broker 1
cp config/server.properties config/server-1.properties
# 修改配置
broker.id=1
listeners=PLAINTEXT://localhost:9092
log.dirs=/tmp/kafka-logs-1
# Broker 2
cp config/server.properties config/server-2.properties
# 修改配置
broker.id=2
listeners=PLAINTEXT://localhost:9093
log.dirs=/tmp/kafka-logs-2
# Broker 3
cp config/server.properties config/server-3.properties
# 修改配置
broker.id=3
listeners=PLAINTEXT://localhost:9094
log.dirs=/tmp/kafka-logs-3
4.5.2 启动集群¶
# 启动所有 Broker
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &
bin/kafka-server-start.sh config/server-3.properties &
5. 快速开始¶
5.1 创建 Topic¶
# 创建一个名为 "test-topic" 的 Topic,1个分区,1个副本
bin/kafka-topics.sh --create \
--topic test-topic \
--bootstrap-server localhost:9092 \
--partitions 1 \
--replication-factor 1
# 查看 Topic 详情
bin/kafka-topics.sh --describe \
--topic test-topic \
--bootstrap-server localhost:9092
输出示例:
Topic: test-topic PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: test-topic Partition: 0 Leader: 1 Replicas: 1 Isr: 1
5.2 发送消息(Producer)¶
# 启动 Producer,输入消息后按 Enter 发送
bin/kafka-console-producer.sh \
--topic test-topic \
--bootstrap-server localhost:9092
交互示例:
5.3 消费消息(Consumer)¶
# 启动 Consumer,从最新位置开始消费
bin/kafka-console-consumer.sh \
--topic test-topic \
--from-beginning \
--bootstrap-server localhost:9092
输出示例:
5.4 Java 客户端示例¶
5.4.1 添加依赖¶
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.0</version>
</dependency>
5.4.2 Producer 示例¶
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
* Kafka Producer 示例
* Kafka Producer Example
*/
public class SimpleProducer {
public static void main(String[] args) {
// 配置 Producer 属性
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 创建 Producer
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 发送消息
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(
"test-topic", // Topic
"key-" + i, // Key
"message-" + i // Value
);
// 异步发送
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.println("消息发送成功: " +
"topic=" + metadata.topic() +
", partition=" + metadata.partition() +
", offset=" + metadata.offset());
} else {
System.err.println("消息发送失败: " + exception.getMessage());
}
});
}
// 关闭 Producer
producer.close();
}
}
5.4.3 Consumer 示例¶
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
* Kafka Consumer 示例
* Kafka Consumer Example
*/
public class SimpleConsumer {
public static void main(String[] args) {
// 配置 Consumer 属性
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 从最早的消息开始消费
// 创建 Consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅 Topic
consumer.subscribe(Collections.singletonList("test-topic"));
// 消费消息
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf(
"topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
record.topic(),
record.partition(),
record.offset(),
record.key(),
record.value()
);
}
}
} finally {
consumer.close();
}
}
}
6. 常见面试题¶
Q1: Kafka 为什么这么快?¶
答案要点:
- 顺序写入磁盘 - 充分利用磁盘顺序写入的高性能(600-700 MB/s)
- 零拷贝技术 - 使用
sendfile()系统调用,减少数据拷贝次数 - 批量发送 - Producer 批量发送消息,减少网络开销
- 分区并行 - 多个 Partition 可以并行处理,提高吞吐量
- 页缓存 - 利用操作系统的页缓存,减少磁盘 I/O
扩展: - 传统随机写入:100-200 MB/s - 顺序写入:600-700 MB/s - 零拷贝可以减少 50% 的 CPU 使用率
Q2: Kafka 如何保证消息不丢失?¶
答案要点:
Producer 端: 1. acks=all - 等待所有 ISR 副本确认 2. retries - 设置重试次数 3. max.in.flight.requests.per.connection=1 - 保证消息顺序
Broker 端: 1. 副本机制 - 每个 Partition 有多个副本 2. ISR 机制 - 只有 ISR 中的副本才能被选为 Leader 3. min.insync.replicas - 设置最小同步副本数
Consumer 端:
1. 关闭自动提交 - enable.auto.commit=false
2. 手动提交 Offset - 消息处理完成后再提交 Offset
3. 幂等性处理 - 保证消息处理的幂等性
Q3: Kafka 如何保证消息顺序?¶
答案要点:
- Partition 内有序 - Kafka 只保证 Partition 内的消息有序
- Key 分区策略 - 相同 Key 的消息发送到同一个 Partition
- 单 Partition 单 Consumer - 一个 Partition 只能被一个 Consumer 消费
- max.in.flight.requests.per.connection=1 - Producer 端保证顺序
限制:
- Kafka 不保证全局有序,只保证 Partition 内有序
- 如果需要全局有序,可以设置 partitions=1,但会降低吞吐量
Q4: Kafka 的副本机制是什么?¶
答案要点:
- Leader 和 Follower - 每个 Partition 有一个 Leader 和多个 Follower
- 读写分离 - 只有 Leader 处理读写请求,Follower 只同步数据
- ISR(In-Sync Replicas) - 与 Leader 保持同步的副本集合
- 故障转移 - Leader 故障时,从 ISR 中选择新的 Leader
配置:
- replication.factor - 副本因子(建议 3)
- min.insync.replicas - 最小同步副本数(建议 2)
Q5: Consumer Group 的作用是什么?¶
答案要点:
- 负载均衡 - 同一个 Consumer Group 内的 Consumer 平均分配 Partition
- 并行消费 - 多个 Consumer 可以并行消费不同的 Partition
- 故障转移 - Consumer 故障时,其 Partition 会分配给其他 Consumer
- 多订阅者 - 多个 Consumer Group 可以同时消费同一个 Topic
示例:
Topic: orders (3个 Partition)
Consumer Group A: Consumer1, Consumer2
- Consumer1 -> Partition 0, 1
- Consumer2 -> Partition 2
Consumer Group B: Consumer1
- Consumer1 -> Partition 0, 1, 2 (全部)
📚 扩展阅读¶
💡 学习提示:Kafka 是分布式系统的核心基础设施,建议先理解核心概念,再深入学习高级特性。
🔄 持续更新中... | 最后更新:2025年1月