学习笔记 --- RabbitMQ

2023-08-22 14:32:10

简介

RabbitMQ是一款开源的消息队列中间件,它实现了高级消息队列协议(AMQP)标准。作为一个消息代理,RabbitMQ可以在应用程序之间可靠地传递和存储消息,并支持多种消息传递模式。

基本概念和特性

  • 消息:在RabbitMQ中,消息是传输的基本单位。它由消息体和可选的属性组成,消息体是要传递的实际数据,而属性则包含有关消息的元数据信息。

  • 队列:队列是消息的容器,它类似于一个缓冲区,用于存储待处理的消息。生产者将消息发送到队列,消费者从队列中接收和处理消息。

  • 交换机:交换机是消息路由的核心组件,它接收来自生产者的消息,并根据特定的路由规则将消息分发给一个或多个绑定到它上面的队列。

  • 绑定:绑定定义了交换机和队列之间的关系,它指定了消息在被发送到交换机时如何被路由到与之绑定的队列。

  • 路由模式:RabbitMQ支持多种路由模式,包括直连、主题、扇出和头部路由等。不同的路由模式提供了不同的消息分发机制,以满足不同的应用需求。

  • 可靠性:RabbitMQ提供了持久化消息、手动确认和事务等机制来确保消息的可靠性传递和处理。

  • 高可用性:通过设置集群和镜像队列,RabbitMQ可以实现高可用性,确保即使某个节点或队列发生故障,系统仍然可用。

  • 插件生态系统:RabbitMQ具有丰富的插件生态系统,可以扩展其功能和集成其他系统。

Spring Boot 整合 RabbitMQ 简单示例

  • 添加依赖:在Spring Boot项目的pom.xml文件中添加以下依赖,以引入RabbitMQ客户端库:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 配置连接信息:在application.properties(或application.yml)文件中配置RabbitMQ的连接信息,如下所示
spring:
  # rabbitmq相关配置
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: demo
    password: demo
    #虚拟host 可以不设置,使用server默认host
    virtual-host: /demo
    # 链接超时时间
    connection-timeout: 1000
    # 缓存配置
    cache:
      channel:
        # 要保留在高速缓存中的通道数,注意此值不能超过 requested-channel-max 
        size: 2980
        # 如果已达到高速缓存大小,则等待获取通道的持续时间。如果为0,则始终创建一个新通道。
        checkout-timeout: 500
      connection:
        # 链接模式 channel 和 connection 两种,建议channel,channel使用ThreadLocal绑定,记得使用线程池
        mode: channel
    publisher-returns: true
    publisher-confirm-type: simple
    # 监听器相关配置(可以看作消费者链接工厂配置)
    listener:
      # 设置默认连接器(simple)
      type: simple
      # 简单的链接工厂(默认)
      simple:
        # 可选最大消费者数量 cpu * 2
        max-concurrency: 20
        # 手动确认消费
        acknowledge-mode: manual
        # 初始化消费者数量
        concurrency: 10
        # 消费者一次从MQ服务器拉取的数据量,
        prefetch: 250
    # 最大channel 数量,该数量和rabbit mq server中配置相关,二者取最小值
    requested-channel-max: 3000
    # 生产者配置
    template:
      retry:
        # 是否开启重试
        enabled: true
        # 最大重试次数 5 次
        max-attempts: 5
  • 创建消息发送者:创建一个用于发送消息的消息发送者(生产者)类。
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MessageSender {

    private final AmqpTemplate rabbitTemplate;

    @Autowired
    public MessageSender(AmqpTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendMessage(String message) {
        rabbitTemplate.convertAndSend("<交换机名称>", "<路由键>", message);
    }
}

  • 创建消息接收者:创建一个用于接收消息的消息接收者(消费者)类。
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MessageReceiver {

    @RabbitListener(queues = "<队列名称>")
    public void receiveMessage(String message) {
        // 处理接收到的消息
        System.out.println("Received message: " + message);
    }
}

注意事项

channel 部分

  • channel的最大限制,取决于 requested-channel-max 和Rabbit-MQ server中配置相关,二者取最小值
  • channel 是线程绑定对象,使用ThreadLocal存储,多线程环境下一定要注意
  • channel 的数量,是针对每个应用,也就是说,如果server配置5000,requested-channel-max=5000,那么每个应用都可以开启5000个 channel
  • channel 是消费者和生产者共享
  • 如果channel 不足,则会抛出异常 org.springframework.amqp.AmqpTimeoutException: No available channels,需要做好容错处理

消费者部分

  • 在RabbitMQ的程序中,消费者称之为 Listener
  • 消费者有两种模式,分别是 SIMPLE 和 DIRECT,推荐使用simple,通过配置参数 spring.rabbitmq.listener.type 进行设置
    • SIMPLE:RabbitMQ 使用者将消息分派到调用者线程的容器
    • DIRECT :在 RabbitMQ 消费者线程上直接调用侦听器的容器。
  • simple和direct 都有自己的独立配置,不要混用,混用也不会生效
  • simple中 prefetch 参数是针对每个消费者的,具体工作流程参考下图
  • concurrency 和 max-concurrency 数量差距不要太大,要不然数据会有比较严重的积压,因为扩充到 max也需要一定时间

生产者部分

  • 连接管理:确保在处理完消息后正确关闭连接,以避免资源泄漏。建议使用连接池来管理和复用连接。连接池推荐使用 CachingConnectionFactory ,默认的也是这种

  • 消息确认:当发送消息到 RabbitMQ 时,可以选择等待确认。这样可以确保消息被成功处理,或者在发生错误时进行重试。确保在代码中实现消息确认机制,以保证消息的可靠性。

  • 消息持久化:为了防止消息丢失,在发送消息时应将消息标记为持久化。这样即使 RabbitMQ 服务意外关闭,消息也会被保存在磁盘上,并在重新启动后恢复。

  • 序列化与反序列化:在将对象转换为消息发送到 RabbitMQ 之前,需要进行序列化操作。同样,在接收到消息后,需要进行相应的反序列化操作。确保选择一种适合你的数据类型和语言的序列化方式。并且你的生产者和消费者的序列化方式必须相同

  • 错误处理:当发送消息时,要考虑可能出现的异常情况。例如,RabbitMQ 服务器不可用或消息队列已满。在代码中实现适当的错误处理机制,可以记录日志、重试发送或采取其他措施

附注,简易工作流程图

自己整理的 Spring boot 中RabbitMQ工作的简易流程图,可能有不对的地方,仅供参考

请添加图片描述

更多推荐

【Linux】自制shell

本期我们利用之前学过的知识,写一个shell命令行程序目录一、初始代码二、使用户输入的ls指令带有颜色分类三、解决cd指令后用户所在路径不变化问题3.1chdir函数四、关于环境变量的问题一、初始代码#include<stdio.h>#include<unistd.h>#include<stdlib.h>#includ

模块化开发_php中使用redis

redis介绍和安装redis数据库,支持数据持久化,常用与分布式锁,支持事务,持久化,非关心型数据库区别:关系型数据库:硬盘,安全,结构简单,易于理解,浪费空间非关系型数据库:内存,断电丢失数据,读写速度快,内存的速度是硬盘的100倍redis:用于缓存压力,提升网站访问速度三种类型:持久化(将数据保存到硬盘中,再开

02. Springboot集成Flyway

目录1、前言2、什么是Flyway?3、为什么要使用Flyway?4、简单示例4.1、创建SpringBoot工程4.2、添加Flyway依赖4.3、Springboot添加Flyway配置4.4、创建执行SQL脚本4.5、启动测试4.6、Flyway版本管理5、SQL脚本文件命名规则6、使用注意事项1、前言在现代应用

kafka介绍

1.kafka概述消息中间件对比特性ActiveMQRabbitMQRocketMQKafka开发语言javaerlangjavascala单机吞吐量万级万级10万级100万级时效性msusmsms级以内可用性高(主从)高(主从)非常高(分布式)非常高(分布式)功能特性成熟的产品、较全的文档、各种协议支持好并发能力强、

SkyWalking快速上手(二)——架构剖析1

文章目录介绍架构概述一、Agent组件介绍Agent的配置配置参数详解service_namesample_n_per_3_secsnamespacecollector.backend_serviceAgent的工作原理二、Collector组件什么是Collector组件?Collector组件的配置配置Collec

List<HashMap<String,String>>实现自定义字符串排序(key排序、Value排序)

系列文章目录SpringBoot+Vue3实现登录验证码功能Java实现发送邮件(定时自动发送邮件)换个角度使用Redis去解决跨域存取Session问题Redis缓存穿透、击穿、雪崩问题及解决方法SpringCache的使用–快速上手篇更多该系列文章请查看我的主页哦文章目录系列文章目录前言一、根据Key值排序1.1.

【Spring Cloud系列】Feign详解与实战

Feign详解与实战文章目录Feign详解与实战一、概述二、什么是Feign三、Feign特性四、Feign简单使用3.1Feign使用步骤3.2Feign具体使用1.引入依赖2.启动类上添加注解3.编写FeignClient接口五、使用Feign发起http请求5.1Maven导入Feign配置,并集成Jackson

全球性区块链服务网络(BSN)机制体系、关键技术和应用项目科技成果鉴定会在北京举行

原标题:《全球性区块链服务网络(BSN)机制体系、关键技术和应用项目科技成果鉴定会在北京举行》2023年9月17日,“全球性区块链服务网络(BSN)机制体系、关键技术和应用”科技成果鉴定会在北京举行。鉴定委员会由北京交通大学移动专用网络国家工程研究中心主任、中国工程院院士张宏科,中国航天科技集团五院研究员、中国科学院院

Android Kotlin 协程详解

前言关于Kotlin基础和高阶函数又不熟悉的可以先参考文章:AndroidKotlin基础详解_袁震的博客-CSDN博客AndroidKotlin高阶详解_袁震的博客-CSDN博客什么是协程?要理解协程,就要将它和线程联系起来理解。线程是什么?我想大家都清楚,而协程,它比线程更加轻量级,一个线程上面可以有多个协程。如果

无人车开源软件架构

参考视频:Apollo自动驾驶入门课程开源软件架构开放式软件层分为三个子层:实时操作系统、运行时框架和应用程序模块层实时操作系统(RTOS)可确保在给定时间内完成特定任务,“实时”是指无人车的操作系统能够及时进行计算,分析并执行相应的操作。以上是在汽车传感器收集到外界数据后的短时间内完成。假设无人车的传感器检测到车辆前

requests模块高级用法练习

文章目录模拟浏览器指纹发送get请求发送post请求文件上传服务器超时模拟浏览器指纹打开http://10.9.75.164/php/functions/setcookie.php网页,找到请求头的UA字段,这段信息是浏览器的指纹(包括当前系统、浏览器名称和版本):在Python脚本中新建一个headers字段,将该U

热文推荐