Kafka 实战应用¶
📖 目录¶
1. Spring Boot 集成 Kafka¶
1.1 添加依赖¶
1.1.1 Maven 依赖¶
<dependencies>
    <!-- Spring for Apache Kafka (KafkaTemplate, @KafkaListener) -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- Spring Boot Web (only needed when exposing a REST API) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- JSON serialization (only needed for JsonSerializer / JsonDeserializer).
         spring-kafka declares Jackson as an optional dependency, so it must be
         present on the classpath; spring-boot-starter-web already brings it in. -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
</dependencies>
1.1.2 Gradle 依赖¶
dependencies {
    // Spring for Apache Kafka (KafkaTemplate, @KafkaListener)
    implementation("org.springframework.kafka:spring-kafka")
    // Spring Boot Web starter (only needed when exposing a REST API)
    implementation("org.springframework.boot:spring-boot-starter-web")
}
1.2 配置文件¶
1.2.1 application.yml¶
spring:
  kafka:
    # Broker address list
    bootstrap-servers: localhost:9092
    # Producer settings
    producer:
      # Key serializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Value serializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Acknowledgement level required from the broker
      acks: all
      # Number of retries
      retries: 3
      # Batch size (bytes)
      batch-size: 32768
      # Total buffer memory (bytes)
      buffer-memory: 67108864
      # Compression type
      compression-type: snappy
      # Native Kafka client settings that have no dedicated
      # spring.kafka.producer.* key must go under "properties";
      # keys such as "linger-ms" or "enable-idempotence" are silently ignored.
      properties:
        # Time to wait for a batch to fill (milliseconds)
        linger.ms: 10
        # Idempotent producer
        enable.idempotence: true
    # Consumer settings
    consumer:
      # Consumer group id
      group-id: my-consumer-group
      # Key deserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Value deserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Where to start when no committed offset exists
      auto-offset-reset: earliest
      # Whether the Kafka client auto-commits offsets
      enable-auto-commit: false
      # Maximum records returned by a single poll
      max-poll-records: 500
      # Native Kafka client settings without a dedicated spring.kafka.consumer.* key
      properties:
        # Maximum interval between polls (milliseconds)
        max.poll.interval.ms: 300000
    # Listener container settings
    listener:
      # Ack mode: manual_immediate commits as soon as the listener calls
      # Acknowledgment.acknowledge(); manual queues the commit until the records
      # from the current poll are processed; batch lets the container commit after each poll.
      ack-mode: manual_immediate
      # Number of concurrent consumers
      concurrency: 3
1.2.2 application.properties¶
# Kafka broker address list
spring.kafka.bootstrap-servers=localhost:9092
# Producer settings
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.acks=all
spring.kafka.producer.retries=3
spring.kafka.producer.batch-size=32768
spring.kafka.producer.buffer-memory=67108864
spring.kafka.producer.compression-type=snappy
# Native Kafka client settings without a dedicated spring.kafka.producer.* key
# must be passed through "properties"; otherwise they are silently ignored.
spring.kafka.producer.properties.linger.ms=10
spring.kafka.producer.properties.enable.idempotence=true
# Consumer settings
spring.kafka.consumer.group-id=my-consumer-group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.max-poll-records=500
# Native Kafka client setting without a dedicated spring.kafka.consumer.* key
spring.kafka.consumer.properties.max.poll.interval.ms=300000
# Listener container settings
spring.kafka.listener.ack-mode=manual_immediate
spring.kafka.listener.concurrency=3
1.3 Producer 使用¶
1.3.1 注入 KafkaTemplate¶
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
/**
 * Kafka producer service: a thin wrapper around {@link KafkaTemplate} that
 * offers an asynchronous and a blocking way to publish String messages.
 *
 * <p>Both methods report the outcome on stdout/stderr only; failures are not
 * propagated to the caller.
 */
@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Sends a message asynchronously and logs the outcome from a callback.
     *
     * @param topic   destination topic
     * @param key     record key
     * @param message record value
     */
    public void sendMessage(String topic, String key, String message) {
        ListenableFuture<SendResult<String, String>> future =
                kafkaTemplate.send(topic, key, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                System.out.println("消息发送成功: " +
                        "topic=" + result.getRecordMetadata().topic() +
                        ", partition=" + result.getRecordMetadata().partition() +
                        ", offset=" + result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                System.err.println("消息发送失败: " + ex.getMessage());
            }
        });
    }

    /**
     * Sends a message and blocks until the send future completes.
     *
     * @param topic   destination topic
     * @param key     record key
     * @param message record value
     */
    public void sendMessageSync(String topic, String key, String message) {
        try {
            SendResult<String, String> result =
                    kafkaTemplate.send(topic, key, message).get();
            System.out.println("消息发送成功: " + result.getRecordMetadata().offset());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers and thread pools can still
            // observe the interruption after this method returns.
            Thread.currentThread().interrupt();
            System.err.println("消息发送失败: " + e.getMessage());
        } catch (Exception e) {
            System.err.println("消息发送失败: " + e.getMessage());
        }
    }
}
1.3.2 REST API 示例¶
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST controller that forwards posted messages to Kafka through
 * {@code KafkaProducerService}.
 */
@RestController
@RequestMapping("/api/kafka")
public class KafkaController {

    @Autowired
    private KafkaProducerService kafkaProducerService;

    /**
     * Hands the request's topic, key and message to the producer service.
     *
     * @param request body carrying topic, key and message
     * @return a fixed confirmation string
     */
    @PostMapping("/send")
    public String sendMessage(@RequestBody MessageRequest request) {
        final String topic = request.getTopic();
        final String key = request.getKey();
        final String payload = request.getMessage();
        kafkaProducerService.sendMessage(topic, key, payload);
        return "消息已发送";
    }

    /** Request DTO for {@code POST /api/kafka/send}. */
    public static class MessageRequest {

        private String topic;
        private String key;
        private String message;

        public String getTopic() {
            return topic;
        }

        public void setTopic(String topic) {
            this.topic = topic;
        }

        public String getKey() {
            return key;
        }

        public void setKey(String key) {
            this.key = key;
        }

        public String getMessage() {
            return message;
        }

        public void setMessage(String message) {
            this.message = message;
        }
    }
}
1.4 Consumer 使用¶
1.4.1 @KafkaListener 注解¶
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
/**
 * Kafka consumer service showing three listener styles: container-managed
 * commit, manual per-record acknowledgment, and manual batch acknowledgment.
 *
 * <p>On failure the manual-ack listeners log and rethrow. Merely skipping
 * {@code acknowledge()} does NOT make Kafka redeliver the record: the container
 * moves on to the next record and a later acknowledgment commits past the
 * failed one. Rethrowing hands the failure to the container's error handler,
 * which is what triggers seek/retry.
 */
@Component
public class KafkaConsumerService {

    /**
     * Consumes an order message without acknowledging explicitly; committing is
     * left to the listener container.
     *
     * <p>NOTE(review): this only commits when the container ack-mode is a
     * container-managed one (e.g. BATCH or RECORD). Under MANUAL or
     * MANUAL_IMMEDIATE a listener without an {@code Acknowledgment} parameter
     * never commits its offsets.
     */
    @KafkaListener(topics = "orders", groupId = "order-consumer-group")
    public void consumeOrderMessage(@Payload String message,
                                    @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                                    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                                    @Header(KafkaHeaders.OFFSET) long offset) {
        System.out.printf("收到消息: topic=%s, partition=%d, offset=%d, message=%s%n",
                topic, partition, offset, message);
        processOrderMessage(message);
    }

    /**
     * Consumes a payment message and acknowledges it only after it has been
     * processed successfully.
     *
     * @throws RuntimeException whatever processing threw, after logging it
     */
    @KafkaListener(topics = "payments", groupId = "payment-consumer-group")
    public void consumePaymentMessage(ConsumerRecord<String, String> record,
                                      Acknowledgment acknowledgment) {
        try {
            System.out.printf("收到消息: topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
                    record.topic(), record.partition(), record.offset(),
                    record.key(), record.value());
            processPaymentMessage(record.value());
            acknowledgment.acknowledge();
        } catch (RuntimeException e) {
            System.err.println("处理消息失败: " + e.getMessage());
            // Propagate so the container error handler can retry the record.
            throw e;
        }
    }

    /**
     * Consumes a batch of log messages and acknowledges the whole batch once
     * every message has been processed.
     *
     * @throws RuntimeException whatever processing threw, after logging it
     */
    @KafkaListener(topics = "logs", groupId = "log-consumer-group",
            containerFactory = "kafkaListenerContainerFactory")
    public void consumeLogMessages(@Payload List<String> messages,
                                   Acknowledgment acknowledgment) {
        try {
            for (String message : messages) {
                System.out.println("收到日志消息: " + message);
                processLogMessage(message);
            }
            acknowledgment.acknowledge();
        } catch (RuntimeException e) {
            System.err.println("处理消息失败: " + e.getMessage());
            // Propagate so the container error handler can redeliver the batch.
            throw e;
        }
    }

    /** Placeholder for order processing. */
    private void processOrderMessage(String message) {
        // Process order message
    }

    /** Placeholder for payment processing. */
    private void processPaymentMessage(String message) {
        // Process payment message
    }

    /** Placeholder for log processing. */
    private void processLogMessage(String message) {
        // Process log message
    }
}
1.4.2 批量消费配置¶
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import java.util.HashMap;
import java.util.Map;
/**
 * Kafka consumer configuration for batch listening with manual acknowledgment.
 *
 * <p>NOTE(review): the factory bean is named {@code kafkaListenerContainerFactory},
 * the name Spring Boot uses for its default factory. It therefore appears to
 * replace the default, so listeners that do not name another factory would also
 * become batch listeners. Confirm this is intended.
 */
@Configuration
public class KafkaConsumerConfig {

    /** Builds a String/String consumer factory with auto-commit disabled. */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> settings = new HashMap<>();
        // Connection and group identity
        settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        settings.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
        // Deserialization
        settings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        settings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Offset handling
        settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        settings.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // Upper bound on records per poll, i.e. per listener batch
        settings.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        return new DefaultKafkaConsumerFactory<>(settings);
    }

    /**
     * Builds a container factory with batch listening enabled, MANUAL_IMMEDIATE
     * ack mode and a concurrency of 3.
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory());
        containerFactory.setBatchListener(true);
        ContainerProperties containerProps = containerFactory.getContainerProperties();
        containerProps.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        containerFactory.setConcurrency(3);
        return containerFactory;
    }
}
2. 实际业务场景应用¶
2.1 订单系统¶
2.1.1 场景描述¶
订单创建后,需要:
1. 发送订单创建事件
2. 库存服务消费事件,扣减库存
3. 支付服务消费事件,创建支付订单
4. 物流服务消费事件,创建物流单
2.1.2 实现示例¶
订单服务(Producer):
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
/**
 * Order service: persists a new order and publishes an ORDER_CREATED event to
 * the {@code order-events} topic.
 *
 * <p>The database write and the Kafka publish are two independent operations;
 * a publish failure is logged but does not undo the saved order.
 */
@Service
public class OrderService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private OrderRepository orderRepository;

    /**
     * Creates and saves an order, then publishes the matching event keyed by
     * the order id.
     *
     * @param request order details supplied by the caller
     * @return the order as returned by the repository
     */
    public Order createOrder(OrderRequest request) {
        // 1. Create and persist the order
        Order order = new Order();
        order.setUserId(request.getUserId());
        order.setProductId(request.getProductId());
        order.setQuantity(request.getQuantity());
        order.setAmount(request.getAmount());
        order.setStatus("CREATED");
        order = orderRepository.save(order);

        // 2. Publish the order-created event
        OrderEvent event = new OrderEvent();
        event.setOrderId(order.getId());
        event.setUserId(order.getUserId());
        event.setProductId(order.getProductId());
        event.setQuantity(order.getQuantity());
        event.setAmount(order.getAmount());
        event.setEventType("ORDER_CREATED");
        event.setTimestamp(System.currentTimeMillis());

        final String orderKey = order.getId().toString();
        // The send is asynchronous: without a callback a failed publish would go
        // unnoticed and downstream services would never see the order.
        kafkaTemplate.send("order-events", orderKey, JSON.toJSONString(event))
                .addCallback(
                        result -> { },
                        ex -> System.err.println("订单事件发送失败: orderId=" + orderKey
                                + ", error=" + ex.getMessage()));
        return order;
    }
}
库存服务(Consumer):
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
/**
 * Inventory service: consumes order events and deducts stock for
 * ORDER_CREATED events.
 *
 * <p>Failures are logged and rethrown. Merely skipping {@code acknowledge()}
 * does not cause redelivery; rethrowing lets the container's error handler
 * decide on retry or recovery.
 */
@Service
public class InventoryService {

    @Autowired
    private InventoryRepository inventoryRepository;

    /**
     * Handles one order event and acknowledges it after successful processing.
     * Events of other types are acknowledged without further action.
     *
     * @throws RuntimeException if parsing or stock deduction fails
     */
    @KafkaListener(topics = "order-events", groupId = "inventory-service")
    public void handleOrderEvent(ConsumerRecord<String, String> record,
                                 Acknowledgment acknowledgment) {
        try {
            OrderEvent event = JSON.parseObject(record.value(), OrderEvent.class);
            if (event != null && "ORDER_CREATED".equals(event.getEventType())) {
                deductStock(event);
            }
            acknowledgment.acknowledge();
        } catch (RuntimeException e) {
            System.err.println("处理订单事件失败: " + e.getMessage());
            // Propagate so the container error handler can retry or recover.
            throw e;
        }
    }

    /** Deducts the ordered quantity from the product's stock, or fails. */
    private void deductStock(OrderEvent event) {
        Inventory inventory = inventoryRepository.findByProductId(event.getProductId());
        if (inventory == null) {
            // Fail with a descriptive error instead of a NullPointerException.
            throw new IllegalStateException("库存记录不存在: productId=" + event.getProductId());
        }
        if (inventory.getStock() >= event.getQuantity()) {
            inventory.setStock(inventory.getStock() - event.getQuantity());
            inventoryRepository.save(inventory);
            System.out.println("库存扣减成功: productId=" + event.getProductId() +
                    ", quantity=" + event.getQuantity());
        } else {
            throw new RuntimeException("库存不足");
        }
    }
}
2.2 日志收集系统¶
2.2.1 场景描述¶
应用日志发送到 Kafka,然后由日志处理服务消费,存储到 Elasticsearch。
2.2.2 实现示例¶
日志发送(Producer):
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
/**
 * Log sender: wraps a log line in a {@code LogEvent} and publishes it as JSON
 * to the {@code application-logs} topic, keyed by service name.
 */
@Component
public class LogSender {

    private static final Logger logger = LoggerFactory.getLogger(LogSender.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Publishes one log event; the send result is not inspected.
     *
     * @param level       log level text
     * @param message     log message
     * @param serviceName originating service, also used as the record key
     */
    public void sendLog(String level, String message, String serviceName) {
        LogEvent entry = new LogEvent();
        entry.setLevel(level);
        entry.setMessage(message);
        entry.setServiceName(serviceName);
        entry.setTimestamp(System.currentTimeMillis());
        entry.setThreadName(Thread.currentThread().getName());

        String json = JSON.toJSONString(entry);
        kafkaTemplate.send("application-logs", serviceName, json);
    }
}
日志处理(Consumer):
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
/**
 * Log processing service: consumes batches of JSON log events and bulk-indexes
 * them through {@code ElasticsearchService}.
 */
@Service
public class LogProcessingService {

    @Autowired
    private ElasticsearchService elasticsearchService;

    /**
     * Parses every message of the batch, bulk-indexes the result and then
     * acknowledges the batch.
     *
     * <p>Failures are logged and rethrown: swallowing them would leave the
     * batch unacknowledged yet never redelivered, and the next successful
     * batch would commit past it.
     *
     * @throws RuntimeException if parsing or indexing fails
     */
    @KafkaListener(topics = "application-logs", groupId = "log-processing-service",
            containerFactory = "kafkaListenerContainerFactory")
    public void processLogs(@Payload List<String> messages,
                            Acknowledgment acknowledgment) {
        try {
            List<LogEvent> logEvents = new ArrayList<>(messages.size());
            for (String message : messages) {
                logEvents.add(JSON.parseObject(message, LogEvent.class));
            }
            // Bulk store into Elasticsearch
            elasticsearchService.bulkIndex(logEvents);
            // Commit offsets only after the whole batch has been stored
            acknowledgment.acknowledge();
        } catch (RuntimeException e) {
            System.err.println("处理日志失败: " + e.getMessage());
            // Propagate so the container error handler can redeliver the batch.
            throw e;
        }
    }
}
3. 消息序列化与反序列化¶
3.1 JSON 序列化¶
3.1.1 配置 JSON 序列化器¶
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
/**
 * Kafka configuration that serializes record values as JSON on the producer
 * side and deserializes them from JSON on the consumer side.
 */
@Configuration
public class KafkaJsonConfig {

    /** Producer factory with String keys and JSON-serialized values. */
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> producerSettings = new HashMap<>();
        producerSettings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerSettings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerSettings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(producerSettings);
    }

    /** Template backed by {@link #producerFactory()}. */
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        ProducerFactory<String, Object> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }

    /** Consumer factory with String keys and JSON-deserialized values. */
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> consumerSettings = new HashMap<>();
        consumerSettings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerSettings.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
        consumerSettings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerSettings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        // SECURITY NOTE(review): "*" trusts every package, so any class named in a
        // record's type header may be instantiated. Acceptable for a demo; for
        // untrusted topics restrict this to the application's own event package(s).
        consumerSettings.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        return new DefaultKafkaConsumerFactory<>(consumerSettings);
    }
}
3.1.2 使用 JSON 序列化¶
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
/**
 * Example service that publishes an {@code OrderEvent} object as the record
 * value through a {@code KafkaTemplate<String, Object>}.
 */
@Service
public class JsonMessageService {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Publishes the event to {@code order-events}, keyed by its order id.
     *
     * @param event the event to publish
     */
    public void sendJsonMessage(OrderEvent event) {
        final String recordKey = event.getOrderId().toString();
        kafkaTemplate.send("order-events", recordKey, event);
    }
}
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
 * Example listener that receives record values of {@code order-events} as
 * {@code OrderEvent} objects.
 */
@Component
public class JsonMessageConsumer {

    /**
     * Prints the received order event; real handling would go here.
     *
     * @param event the order event passed in by the listener container
     */
    @KafkaListener(topics = "order-events", groupId = "order-consumer-group")
    public void consumeJsonMessage(OrderEvent event) {
        String line = "收到订单事件: " + event;
        System.out.println(line);
    }
}
3.2 Avro 序列化(高级)¶
Avro 是一种二进制序列化格式,性能优于 JSON,适合大数据场景。
配置示例:
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
/**
 * Producer factory with String keys and Avro-serialized values, using the
 * schema registry at {@code http://localhost:8081}.
 */
@Bean
public ProducerFactory<String, Object> avroProducerFactory() {
    Map<String, Object> avroSettings = new HashMap<>();
    avroSettings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // Where KafkaAvroSerializer looks up schemas
    avroSettings.put("schema.registry.url", "http://localhost:8081");
    avroSettings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    avroSettings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
    return new DefaultKafkaProducerFactory<>(avroSettings);
}
4. 错误处理与重试机制¶
4.1 错误处理¶
4.1.1 死信队列(DLQ)¶
配置死信队列:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
/**
 * Error-handling example: records that fail processing are copied to the
 * {@code orders-dlq} dead-letter topic before their offset is acknowledged.
 */
@Component
public class ErrorHandlingConsumer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Processes one order record. On failure the record is published to the
     * dead-letter topic, and the offset is acknowledged only once that publish
     * has been confirmed.
     *
     * @throws IllegalStateException if the dead-letter publish fails; the
     *         offset is then not acknowledged, so the record is not lost
     */
    @KafkaListener(topics = "orders", groupId = "order-consumer-group")
    public void consumeOrder(ConsumerRecord<String, String> record,
                             Acknowledgment acknowledgment) {
        try {
            processOrder(record.value());
        } catch (Exception e) {
            System.err.println("处理消息失败: " + e.getMessage());
            publishToDeadLetterTopic(record);
        }
        // Reached after success or after a confirmed dead-letter publish.
        acknowledgment.acknowledge();
    }

    /** Publishes the record to the dead-letter topic and waits for the result. */
    private void publishToDeadLetterTopic(ConsumerRecord<String, String> record) {
        try {
            // Block on the future: the send is asynchronous, and acknowledging
            // before it completes would drop the record if the publish failed.
            kafkaTemplate.send("orders-dlq", record.key(), record.value()).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while publishing to orders-dlq", e);
        } catch (Exception e) {
            throw new IllegalStateException("Failed to publish to orders-dlq", e);
        }
    }

    /** Placeholder for order processing. */
    private void processOrder(String orderJson) {
        // Business processing
    }
}
4.1.2 重试机制¶
配置重试:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;
/**
 * Declares a {@link DefaultErrorHandler} that retries a failed record on a
 * fixed back-off.
 *
 * <p>No recoverer is passed to the handler here, so nothing in this class
 * publishes exhausted records to a dead-letter topic.
 */
@Configuration
public class KafkaErrorHandlerConfig {

    /**
     * Builds the error handler: 1000 ms between attempts, at most 3 retries,
     * and the offset of a recovered record is committed.
     */
    @Bean
    public DefaultErrorHandler errorHandler() {
        DefaultErrorHandler handler = new DefaultErrorHandler(new FixedBackOff(1000L, 3L));
        // Commit the offset of a record once it has been recovered after the retries.
        handler.setCommitRecovered(true);
        return handler;
    }
}
使用重试机制:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.stereotype.Component;
/**
 * Listener whose failures are retried by the container's error handler.
 *
 * <p>The {@code errorHandler} attribute of {@code @KafkaListener} names a
 * {@code KafkaListenerErrorHandler} bean. A {@code DefaultErrorHandler} is a
 * container-level handler and cannot be referenced there, so the attribute is
 * not used. NOTE(review): Spring Boot's auto-configured container factory picks
 * up a single {@code CommonErrorHandler} bean; with a custom factory, call
 * {@code factory.setCommonErrorHandler(...)} instead.
 */
@Component
public class RetryConsumer implements ConsumerSeekAware {

    /**
     * Processes one order message; an exception thrown here is passed to the
     * container's error handler.
     */
    @KafkaListener(topics = "orders", groupId = "order-consumer-group")
    public void consumeOrder(String message) {
        processOrder(message);
    }

    /** Placeholder for order processing. */
    private void processOrder(String message) {
        // Business processing
    }
}
5. 监控与运维¶
5.1 监控指标¶
5.1.1 Producer 监控指标¶
- 消息发送速率 - record-send-rate, record-send-total
- 消息发送延迟 - request-latency-avg, request-latency-max
- 发送失败率 - record-error-rate
- 批次大小 - batch-size-avg
5.1.2 Consumer 监控指标¶
- 消息消费速率 - records-consumed-rate
- 消费延迟 - records-lag-max
- Offset 提交延迟 - commit-latency-avg
- 重平衡次数 - rebalance-total, rebalance-rate-per-hour
5.2 常用运维命令¶
# List topics
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
# Describe a topic
bin/kafka-topics.sh --describe --topic orders --bootstrap-server localhost:9092
# List consumer groups
bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
# Describe a consumer group
bin/kafka-consumer-groups.sh --describe \
--group my-consumer-group \
--bootstrap-server localhost:9092
# Describe a consumer group's offsets, with verbose output
bin/kafka-consumer-groups.sh --describe \
--group my-consumer-group \
--bootstrap-server localhost:9092 \
--verbose
# Reset a consumer group's offsets for topic "orders" to the earliest offset
# NOTE(review): Kafka only applies a reset while the group has no active
# members — confirm consumers are stopped first.
bin/kafka-consumer-groups.sh --reset-offsets \
--group my-consumer-group \
--topic orders \
--to-earliest \
--execute \
--bootstrap-server localhost:9092
6. 最佳实践¶
6.1 Producer 最佳实践¶
- 使用批量发送 - 设置 `batch.size` 和 `linger.ms`
- 启用压缩 - 设置 `compression.type=snappy` 或 `lz4`
- 启用幂等性 - 设置 `enable.idempotence=true`
- 合理设置 acks - 根据业务需求选择 `acks=all` 或 `acks=1`(启用幂等性时必须使用 `acks=all`)
- 异步发送 - 使用异步发送提高吞吐量
- 错误处理 - 实现 Callback 处理发送失败
6.2 Consumer 最佳实践¶
- 手动提交 Offset - 设置 `enable.auto.commit=false`
- 批量消费 - 设置 `max.poll.records` 和批量监听器
- 快速处理 - 避免长时间处理导致超时
- 幂等性处理 - 保证消息处理的幂等性
- 错误处理 - 实现死信队列和重试机制
- 监控消费延迟 - 监控 `records-lag-max` 指标
6.3 Topic 设计最佳实践¶
- 合理设置 Partition 数量 - 根据吞吐量和 Consumer 数量
- 设置副本因子 - `replication.factor>=3`
- 设置保留时间 - 根据业务需求设置 `retention.ms`
- 命名规范 - 使用有意义的 Topic 名称
- 避免过多 Topic - 过多的 Topic 会增加管理复杂度
7. 常见面试题¶
Q1: Spring Boot 如何集成 Kafka?¶
答案要点:
- 添加依赖 - `spring-kafka`
- 配置文件 - 配置 Producer 和 Consumer
- 使用 KafkaTemplate - 发送消息
- 使用 @KafkaListener - 消费消息
- 配置序列化器 - JSON、Avro 等
Q2: 如何保证消息不丢失?¶
答案要点:
- Producer 端 - `acks=all`, `retries`, `enable.idempotence=true`
- Broker 端 - `replication.factor>=3`, `min.insync.replicas>=2`
- Consumer 端 - `enable.auto.commit=false`, 手动提交 Offset
Q3: 如何处理消息重复消费?¶
答案要点:
- 幂等性处理 - 保证业务逻辑的幂等性
- 去重机制 - 使用 Redis 或数据库去重
- 唯一标识 - 使用消息的唯一 ID 去重
Q4: 如何提高 Kafka 的吞吐量?¶
答案要点:
- 批量发送 - 设置 `batch.size` 和 `linger.ms`
- 压缩 - 设置 `compression.type`
- 增加 Partition - 提高并行度
- 增加 Consumer - 提高消费速度
- 异步发送 - 使用异步发送提高吞吐量
📚 扩展阅读¶
💡 学习提示:Kafka 实战应用需要结合实际业务场景,建议从简单的场景开始,逐步深入。
🔄 持续更新中... | 最后更新:2025年1月