【kafka实战】03 SpringBoot使用kafka生产者和消费者示例

2023-09-22 11:41:32

本节主要介绍用SpringBoot进行开发时,使用kafka进行生产和消费

一、引入依赖

<dependencies>
   <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.6</version>
    </dependency>
</dependencies>

二、添加topic

使用offset explore软件,添加一个topic
在这里插入图片描述

三、配置文件

server:
  port: 8080
spring:
  kafka:
    consumer:
      # Kafka服务器
      bootstrap-servers: 192.168.56.201:9092
      # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
      #auto-commit-interval: 2s
      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
      # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录
      # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录)
      # none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      auto-offset-reset: latest
      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      enable-auto-commit: false
      # 键的反序列化方式
      #key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。
      # 这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息,
      # 如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,
      # 然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。
      # 要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数
      # 注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况
      max-poll-records: 100
    properties:
      # 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
      max:
        poll:
          interval:
            ms: 600000
      # 当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10s
      session:
        timeout:
          ms: 10000
    listener:
      # 在侦听器容器中运行的线程数,一般设置为 机器数*分区数
      concurrency: 4
      # 自动提交关闭,需要设置手动消息确认
      ack-mode: manual_immediate
      # 消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误
      missing-topics-fatal: false
      # 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
      poll-timeout: 600000

    producer:
      # Kafka服务器
      bootstrap-servers: 192.168.56.201:9092
      # 发生错误后,消息重发的次数,开启事务必须设置大于0。
      retries: 3
      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
      # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
      # 开启事务时,必须设置为all
      acks: all
      # 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
      batch-size: 16384
      # 生产者内存缓冲区的大小。
      buffer-memory: 1024000
      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

四、生产者代码

@Slf4j
@RestController
@RequestMapping("/msg")
public class SendMessageController {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("send")
    public String send() {
        for (int i = 0; i < 100; i++) {
            OrderInfo orderInfo = new OrderInfo();
            orderInfo.setAddress("成都市高新区");
            orderInfo.setOrderId(String.valueOf(i));
            orderInfo.setProductName("华为P60:" + i);
            kafkaTemplate.send("order.topic", "order:" + i, new Gson().toJson(orderInfo));
        }
        return "success";
    }
}

五、消费者代码

消费者代码采用手动ack的方式

@Slf4j
@Component
public class KafkaOrderConsumer {
    @KafkaListener(topics = "order.topic", groupId = "orderGroup")
    public void consumeOrder(ConsumerRecord<String, String> record, Acknowledgment ack) {
        log.info("消费订单消息key={},value={}", record.key(), record.value());
        ack.acknowledge();
    }
}

代码git仓库:https://gitee.com/syk1234/mqdmo

更多推荐

rust容器

标准库提供了常见的容器。包括向量(Vector)、映射(HashMap)、集合(HashSet)。一、向量Vector数组有一个缺点,就是它的长度在编译时就确定了,一旦确定就永不可更改。向量是一个长度可变的数组。向量的存储在堆上,因此长度可变。Rust在标准库中定义了结构体Vec用于表示向量。(一)定义向量一维向量1.

使用Docker构建轻量级Linux容器

Docker是一个开源的容器化平台,可以帮助用户快速创建、部署和管理应用程序的轻量级Linux容器。通过Docker,用户可以将应用程序及其所有依赖项打包成一个独立的容器镜像,并在各种环境中运行,无需担心环境差异和依赖冲突。下面将详细介绍使用Docker构建轻量级Linux容器的过程。一、Docker简介Docker基

SpringMVC之自定义注解

目录一.什么是Java注解1.简介2.注解的分类3.JDK元注解二.自定义注解1.自定义注解的分类1.1.标记Annotation:1.2.元数据Annotation:2.如何使用自定义注解3.案例演示3.1获取类、方法及属性上的注解值3.2@Inherited的使用3.3获取类属性上的注解属性值3.4获取参数修饰注解

【Django入门】第一个Django项目

Django,广为人知的PythonWeb框架,以其强大而又灵活的特点脱颖而出。其宣传口号是:“为完美主义者开发的框架”。这篇文章将为你揭示创建第一个Django项目的魔法以及Django项目的基本结构。为什么选择Django?在深入学习前,我们先看看为什么要选择Django:快速开发:Django遵循“约定优于配置”

《ADS2011射频电路设计与仿真实例》功率放大器设计的输入输出匹配

徐兴福这本书的6.6Smith圆图匹配这一节中具体匹配时,直接给出了电容与串联微带的值,没有给出推导过程,我一开始以为是省略了详细推导过程,后来发现好像基本上是可以随便自己设的。以输入匹配(书本6.6.4输入匹配电路的建立)为例:因为它这里要求要在Q=1.5的等Q圆里面进行匹配,可以按照书本的操作显示出1.5的等Q圆,

【MySQL入门实战5】-Linux PRM 包安装MySQL

📢📢📢📣📣📣哈喽!大家好,我是【IT邦德】,江湖人称jeames007,10余年DBA工作经验一位上进心十足的【大数据领域博主】!😜😜😜中国DBA联盟(ACDU)成员,目前从事DBA及程序编程擅长主流数据Oracle、MySQL、PG运维开发,备份恢复,安装迁移,性能优化、故障应急处理等。✨如果有对【

列表页面新增 字段查询 ,点击查询后,前端页面和后端控制台 出现红色报错信息,查询数据失败。

项目场景:项目场景简述:列表页面新增字段查询,点击查询后,前端页面和后端控制台出现红色报错信息,查询数据失败。问题描述问题描述:<el-selectv-model="dataForm.engineerId"clearable@focus="getEngineerList"placeholder="请选择"filtera

真实的产品开发中,后端的设计规约可以写哪些

真实的产品开发中,后端的设计规约可以写哪些产品开发的后端设计规约通常包括以下内容:数据模型设计:详细描述数据库的结构,包括数据表的设计、字段的定义和关系的设置等。业务逻辑设计:详细描述后端的业务逻辑,包括各种算法的设计、业务流程的控制等。API设计:定义后端提供的各种服务接口,包括接口的URL、请求方法、请求参数、响应

CMMI认证是什么?为什么IT类企业都在申请?

如今,越来越多的公司都会去申请获得CMMI的认证,也会以通过认证为荣,那么CMMI认证的意义和目的是什么?怎样可以拿到CMMI认证呢?什么是CMMI?CMMI是由美国卡内基梅隆大学的软件工程研究所(SEI)提出的能力成熟度模型集成,致力于以更加系统和一致的框架来指导组织改善软件过程,提高产品和服务的开发、获取和维护能力

华为云ROMA Connect亮相Gartner®全球应用创新及商业解决方案峰会,助力企业应用集成和数字化转型

9月13日-9月14日Gartner全球应用创新及商业解决方案峰会在伦敦举行本届峰会以“重塑软件交付,驱动业务价值”为主题,全球1000多位业内专家交流最新的企业应用、软件工程、解决方案架构、集成与自动化、API等企业IT战略和新兴技术热门话题。9月13日,华为流程ITROMA首席专家陆昕,华为云ROMAConnect

在JavaScript中,什么是柯里化(currying)?

聚沙成塔·每天进步一点点⭐专栏简介⭐柯里化(Currying)⭐写在最后⭐专栏简介前端入门之旅:探索Web开发的奇妙世界欢迎来到前端入门之旅!感兴趣的可以订阅本专栏哦!这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域的朋友们量身打造的。无论你是完全的新手还是有一些基础的开发者,这里都将为你提供一个系统而又亲切的学习

热文推荐