type
status
date
slug
summary
tags
category
icon
password
由于之前对Java使用了解不够深入,做毕设期间学到很多,在这记录一下

1. Spring Kafka 简介

Spring Kafka 是 Spring 框架的一个子项目,专门为 Apache Kafka 提供集成支持。它简化了 Kafka 消息的生产和消费过程,并且与 Spring 生态系统无缝结合。通过 Spring Kafka,我们可以在 Spring Boot 应用中轻松实现消息的发送和接收。

2. @KafkaListener 注解

@KafkaListener 是 Spring Kafka 的核心注解,用于标记一个方法作为 Kafka 消息的监听器。
  • 所属包:org.springframework.kafka.annotation
  • 作用:监听指定的 Kafka 主题,当主题中有新消息时,自动调用被注解的方法。
  • 主要属性
    • topics:指定监听的 Kafka 主题,可以是静态主题名或动态配置。
    • containerFactory:指定使用的 Kafka 监听器容器工厂,用于自定义消息处理逻辑。

示例代码

 
  • topics = "#{'${austin.business.topic.name}'}":通过 Spring 的 SpEL(Spring Expression Language)从配置文件中动态读取主题名称。
  • containerFactory = "filterContainerFactory":使用自定义的容器工厂来处理消息。

3. 自定义容器工厂(Container Factory)

Spring Kafka 支持通过自定义容器工厂来灵活控制消息处理行为,例如实现消息过滤、并发处理等。
  • 默认容器工厂:Spring Kafka 内置了默认的 kafkaListenerContainerFactory。
  • 自定义容器工厂:可以手动定义容器工厂,并在 @KafkaListener 中通过 containerFactory 属性引用。

3.1 消息过滤示例

通过自定义容器工厂,可以过滤掉不符合条件的消息。

定义自定义容器工厂

  • setRecordFilterStrategy:设置过滤策略,当方法返回 true 时,消息会被过滤掉(不处理)。

使用自定义容器工厂

 

4. 动态配置 Kafka 主题

Spring Kafka 支持通过占位符和 SpEL 动态配置 Kafka 主题名称。

示例代码

 
  • topics = "#{'${austin.business.topic.name}'}":从配置文件(如 application.properties 或 application.yml)中读取 austin.business.topic.name 的值作为主题名称。

5. 并发处理和批量处理

自定义容器工厂还支持消息的并发处理和批量处理。
  • 并发处理:通过 setConcurrency 设置消费者线程数量。
    • 批量处理:通过 setBatchListener 启用批量消息处理。

      6. 错误处理

      Spring Kafka 允许通过自定义错误处理器来处理消息消费中的异常。

      示例代码

      • setErrorHandler:设置自定义的错误处理器 MyErrorHandler,用于捕获和处理异常。

      7. 总结

      Spring Kafka 提供了一套简洁而强大的 API,帮助开发者轻松实现 Kafka 消息的发送和接收。通过 @KafkaListener 注解和自定义容器工厂,我们可以灵活地控制消息消费行为,满足不同的业务需求。这篇记录涵盖了注解用法、自定义容器工厂、动态主题配置、并发与批量处理等核心知识点,适合作为学习笔记保存。