springboot集成kafka

2023-09-14 17:58:47

创建工程

  • 父工程pom

父工程做了子工程管理和包管理

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.15</version>
    </parent>

    <groupId>com.wzw</groupId>
    <artifactId>kafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>

    <packaging>pom</packaging>

    <modules>
        <module>producer</module>
    </modules>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
                <version>2.4.13</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
                <version>2.8.1</version>
            </dependency>
        </dependencies>
    </dependencyManagement>


</project>

发布者

  • pom
    引入包,继承父工程
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>com.wzw</groupId>
        <artifactId>kafka</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <groupId>com.wzw</groupId>
    <artifactId>producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>producer</name>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>


</project>

  • yml
server:
  port: 7070
spring:
  kafka:
    bootstrap-servers: 192.168.3.32:19092
    producer: # 生产者
      retries: 3 # 设置大于 0 的值,则客户端会将发送失败的记录重新发送
      batch-size: 16384
      buffer-memory: 33554432
      acks: 1
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default-group
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 500
    listener:
      # 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
      # RECORD
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
      # BATCH
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
      # TIME
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
      # COUNT
      # TIME | COUNT 有一个条件满足时提交
      # COUNT_TIME
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
      # MANUAL
      # 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种
      # MANUAL_IMMEDIATE
      ack-mode: MANUAL_IMMEDIATE
  • config配置类
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}
  • 请求测试接口
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/send")
    public void sendMessage(String message) {
        kafkaTemplate.send("my-topic", message);
    }

}

订阅者

  • pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.wzw</groupId>
        <artifactId>kafka</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>consumer</name>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>

</project>

  • yml
server:
  port: 7071
spring:
  kafka:
    bootstrap-servers: 192.168.3.32:19092
    producer: # 生产者
      retries: 3 # 设置大于 0 的值,则客户端会将发送失败的记录重新发送
      batch-size: 16384
      buffer-memory: 33554432
      acks: 1
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default-group
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 500
    listener:
      # 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
      # RECORD
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
      # BATCH
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
      # TIME
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
      # COUNT
      # TIME | COUNT 有一个条件满足时提交
      # COUNT_TIME
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
      # MANUAL
      # 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种
      # MANUAL_IMMEDIATE
      ack-mode: MANUAL_IMMEDIATE
  • 配置类
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

  • 监听类
    有消息进来就输出
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "my-topic", groupId = "default-group")
    public void consume(String message) {
        System.out.println("Received message: " + message);
    }

}

测试

在这里插入图片描述

注意事项

kafka的配置文件kafka_2.13-2.8.1/config/server.properties,里面有三个配置需要注意,不然的话服务器测试正常,但是java或者其它地址的客户端报错
在这里插入图片描述
我的环境是docker搭建的,

  • listeners
    kafka容器的网络地址是172.18.0.5,kafka的端口是9092
    listeners=PLAINTEXT://172.18.0.5:9092
    我的虚拟机ip是192.168.3.32,然后启动kafka容器的时候,映射的地址是-p 19092:9092
    不配置正确的话,会连接不到,可能报错Connection to node 1 (kafka1/172.18.0.2:9092) could not be established. Broker may not be available.
  • advertised.listeners
    advertised.listeners=PLAINTEXT://192.168.3.32:39092
    不配置正确的话,会连接不到,可能报错kafka disconnecting from node xxx due to request timeout.
  • advertised.host.name
    最底下有个属性advertised.host.name
    配置成kafka容器的网络地址172.18.0.5
    advertised.host.name=172.18.0.5
    如果这里不配置advertised.host.name为ip,而是使用名称,会报错,
    配置为:advertised.host.name=kafka
    报错如下
    Error connecting to node kafka:19092 (id: 1001 rack: null)
    
    解决方法:访问的客户端需要修改hosts,太麻烦,所以直接修改成ip
更多推荐

【PyTorch 攻略(5/7)】训练和模型

一、说明训练模型是一个迭代过程。每次迭代称为纪元。该模型对输出进行猜测,计算其猜测中的误差(损失),收集误差相对于其参数的导数,并使用梯度下降优化这些参数。我们从这里加载前面的代码。%matplotlibinlineimporttorchfromtorchimportnnfromtorch.utils.dataimpo

1399: 最小生成树

题目描述最小生成树问题是实际生产生活中十分重要的一类问题。假设需要在n个城市之间建立通信联络网,则连通n个城市只需要n-1条线路。这时,自然需要考虑这样一个问题,即如何在最节省经费的前提下建立这个通信网。可以用连通网来表示n个城市以及n个城市之间可能设置的通信线路,其中网的顶点表示城市,边表示两个城市之间的线路,赋于边

Python中的函数未定义的错误

前言:嗨喽~大家好呀,这里是魔王呐❤~!python更多源码/资料/解答/教程等点击此处跳转文末名片免费获取通过这个解释,我们将了解当Python程序显示类似NameError:name‘’isnotdefined的错误时,即使该函数存在于脚本中,也会出现这种情况。我们还学习了当我们使用拼写错误的变量或没有导入的内置函

Zookeeper集群 + Kafka集群

目录1、Zookeeper1.1Zookeeper概述1.1.1Zookeeper定义1.1.2Zookeeper工作机制1.1.3Zookeeper特点1.1.4Zookeeper数据结构1.1.5Zookeeper应用场景1.2Zookeeper选举机制1.3部署Zookeeper集群1.3.1安装前准备1.3.2

肖sir___环境的讲解详情__002

一、环境讲解1、jdk什么是JDK?JDK的作用?JDK是java语言的软件开发工具包,能解释java程序,编译java语言,没有jdk的话无法编译Java程序。包含了各种类库和工具,机器不是直接识别语言的,会借助工具的编译器,可以理解为翻译官,将一门语言翻译为机器可以识别的语言,Jdk能对Java进行编译,我们的工具

人机合作的有效性、安全性和可信度

确定人机协同中权力归属的原则和方法可以根据具体情境和任务的要求进行灵活选择。以下是一些常见的方法:专业领域授权:在专业领域中,权力可能更多地授予具有相关知识和经验的人类专家。他们能够理解和分析复杂的情况,并基于其专业判断做出决策。机器可以作为工具或辅助手段,提供数据分析、模型预测等支持。分工合作:根据任务的不同阶段或特

java版Spring Cloud+Mybatis+Oauth2+分布式+微服务+实现工程管理系统

鸿鹄工程项目管理系统SpringCloud+SpringBoot+Mybatis+Vue+ElementUI+前后端分离构建工程项目管理系统1.项目背景一、随着公司的快速发展,企业人员和经营规模不断壮大。为了提高工程管理效率、减轻劳动强度、提高信息处理速度和准确性,公司对内部工程管理的提升提出了更高的要求。二、企业通过

【JavaScript】video标签配置及相关事件:

文章目录一、标签配置:二、事件:三、案例:一、标签配置:标签名描述src要播放的路径地址autoplay是否自动播放,默认值是false,(Boolean)loop是否循环播放,默认值是false,(Boolean)muted是否静音播放,默认值是false,(Boolean)initial-time制定和视频初始播放

线性代数的本质(八)——内积空间

文章目录内积空间内积空间正交矩阵与正交变换正交投影施密特正交化实对称矩阵的对角化内积空间内积空间三维几何空间是线性空间的一个重要例子,如果分析一下三维几何空间,我们就会发现它还具有一般线性空间不具备的重要性质:三维几何空间中向量有长度和夹角,这称为三维几何空间的度量性质。现在,我们在一般线性空间中引入度量有关的概念。我

【面试刷题】——Qt使用的编译器

Qt可以使用多种不同的编译器,具体取决于你的开发环境和目标平台。以下是一些常见的Qt支持的编译器:MinGW:MinGW(MinimalistGNUforWindows)是Windows下的一个轻量级开发工具集,通常与Qt一起使用来开发Windows应用程序。Qt的官方发行版通常包括了MinGW编译器,以便在Windo

关于Qt适配不同分辨率和缩放率时可能遇到的问题和解决方案

如果没有特殊的处理,Qt的UI窗口在不同的分辨率和缩放率下,其显示效果可能会出现问题,常见的有:子控件堆叠,无法显示完整窗口尺寸变大,超出屏幕的显示范围控件变形,长宽比不合理界面模糊字体变大,控件尺寸却没有变化有两种方式可以对UI界面进行良好的缩放:Qt不做任何事情,由windows系统负责缩放windows系统不做任

热文推荐