RabbitMQ —— 延迟队列

2023-09-19 10:00:00

前言

        前面荔枝梳理了有关死信队列的知识,延伸过来我们可以借助死信队列来理解延时队列。在实际需求中我们总是不可避免地需要一些定时任务,为了避免大量的计时操作消耗性能,我们常常采用的是延时队列来存储相应的消息,这时候死信队列的特点就出现了,死信队列使用的一个前提就是消息的TTL过期。在这篇文章中,荔枝会梳理延迟队列的相关知识,主要涉及两种实现延时队列的方式,希望能帮助到有需要的小伙伴~~~


文章目录

前言

一、延时队列

1.1 基本概念

1.2 有关TTL的优化

1.3 基于插件的延时队列 

1.4 延时队列的应用场景 

总结


一、延时队列

1.1 基本概念

        延时队列队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列,最简单的延时队列是基于死信队列的,我们通过设置TTL的方式来实现将消息按照指定的时间被丢到死信队列中被消费。

队列和交换机的配置类

        我们定义NORMAL_QUEUE_A和NORMAL_QUEUE_B作为普通队列,配置这两个队列中的routingKey,各自的TTL分别是10s和40s,定义DEAD_QUEUE作为死信队列,并绑定了这三种队列和交换机的关系。

package com.crj.rabbitmqtestspringboot.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class TTLQueueConfig {
    //普通交换机名称
    public static final String NORMAL_EXCHANGE = "NORMAL_EXCHANGE";
    //死信交换机名称
    public static final String DEAD_EXCHANGE = "DEAD_EXCHANGE";
    //普通队列名称
    public static final String NORMAL_QUEUE_A = "NORMAL_QUEUE_A";
    public static final String NORMAL_QUEUE_B = "NORMAL_QUEUE_B";
    //死信队列名称
    public static final String DEAD_QUEUE = "DEAD_QUEUE";

    //通用队列名称
    public static final String NORMAL_QUEUE_C = "NORMAL_QUEUE_C";

    //声明交换机
    @Bean("nExchange")
    public DirectExchange normalExchange(){
        return new DirectExchange(NORMAL_EXCHANGE);
    }
    @Bean("dExchange")
    public DirectExchange deadExchange(){
        return new DirectExchange(DEAD_EXCHANGE);
    }

    //声明普通队列
    @Bean("queueA")
    public Queue queueA(){
        Map<String, Object> arguments = new HashMap<>();
        //设置死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信routingKey
        arguments.put("x-dead-letter-routing-key","YD");
        arguments.put("x-message-ttl",10000);

        return QueueBuilder.durable(NORMAL_QUEUE_A).withArguments(arguments).build();
    }
    @Bean("queueB")
    public Queue queueB(){
        Map<String, Object> arguments = new HashMap<>();
        //设置死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信routingKey
        arguments.put("x-dead-letter-routing-key","YD");
        arguments.put("x-message-ttl",40000);

        return QueueBuilder.durable(NORMAL_QUEUE_B).withArguments(arguments).build();
    }
    //声明死信队列
    @Bean("queueD")
    public Queue queueD(){
        return QueueBuilder.durable(DEAD_QUEUE).build();
    }

    //交换机和队列绑定
    @Bean
    public Binding queueABindingNExchange(@Qualifier("queueA") Queue queueA ,
                                          @Qualifier("nExchange") DirectExchange nExchange){

        return BindingBuilder.bind(queueA).to(nExchange).with("XA");
    }
    @Bean
    public Binding queueBBindingNExchange(@Qualifier("queueB") Queue queueB ,
                                          @Qualifier("nExchange") DirectExchange nExchange){

        return BindingBuilder.bind(queueB).to(nExchange).with("XB");
    }
    @Bean
    public Binding queueDBindingDExchange(@Qualifier("queueD") Queue queueD ,
                                          @Qualifier("dExchange") DirectExchange dExchange){

        return BindingBuilder.bind(queueD).to(dExchange).with("YD");
    }
}

死信队列的消费者

        死信队列中的消费者负责消费因时间过期而被塞到死信队列中的消息,这里我们没有写NORMAL_QUEUE_A和NORMAL_QUEUE_B处理消息的方法,是为了让消息超过TTL而被丢到死信队列中。

package com.crj.rabbitmqtestspringboot.controller.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

@Slf4j
@Component
public class DeadLetterQueueConsumer {
    //接收消息
    @RabbitListener(queues = "DEAD_QUEUE")
    public void receiveD(Message message, Channel channel) throws Exception{
        String msg = new String(message.getBody());
        log.info("当前时间:{},发送一条消息给两个TTL队列:{}",new Date().toString(),msg);

    }
}

消息生产者

生产者会通过交换机向普通队列中发送消息,并等待消息的发布确认。 

package com.crj.rabbitmqtestspringboot.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {
    /**
     * 发送延迟消息
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message){
        log.info("当前时间:{},发送一条消息给两个TTL队列:{}",new Date().toString(),message);
        rabbitTemplate.convertAndSend("NORMAL_EXCHANGE","XA","消息来自ttl为10s的队列:"+message);
        rabbitTemplate.convertAndSend("NORMAL_EXCHANGE","XB","消息来自ttl为40s的队列:"+message);
    }
}

1.2 有关TTL的优化

        在上面的示例demo中,我们在普通队列中就定义好了相应的过期时间TTL,但是这样会导致我们在面对不同的TTL需求的时候需要认为创建大量的普通队列,这时候我们可以借助参数传递来实现由消息发布者来设定消息被正常队列消费前的等待时间

    public static final String NORMAL_QUEUE_C = "NORMAL_QUEUE_C";

//声明QC
    @Bean("queueC")
    public Queue ququeC(){
        Map arguments = new HashMap<>();
        //设置死信交换机
        //设置死信交换机
        arguments.put("x-dead-letter-exchange",NORMAL_QUEUE_C);
        //设置死信routingKey
        arguments.put("x-dead-letter-routing-key","YD");

        return QueueBuilder.durable(NORMAL_QUEUE_C).withArguments(arguments).build();
    }
    @Bean
    public Binding queueCBindingNExchange(@Qualifier("queueC") Queue queueC,@Qualifier("nExchange") DirectExchange nExchange){
        return BindingBuilder.bind(queueC).to(nExchange).with("XC");
    }

        在生产者发送消息的时候通过rabbitTemplate.convertAndSend()方法的第四个参数传入一个回调函数,在这个lambda表达式中 msg.getMessageProperties().setExpiration(ttlTime);来设置TTL的具体时间。

package com.crj.rabbitmqtestspringboot.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {
    /**
     * 发送延迟消息
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //带TTL的消息队列发送
    @GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
    public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){
        log.info("当前时间:{},发送一条时长为:{}ms的消息给TTL队列:{}",new Date().toString(),ttlTime,message);
        rabbitTemplate.convertAndSend("NORMAL_EXCHANGE","XA","消息来自ttl为10s的队列:"+message,(msg)->{
            //发送消息的延时时长
            msg.getMessageProperties().setExpiration(ttlTime);
            return msg;
        });
    }
}

1.3 基于插件的延时队列 

        基于死信队列的延时看起来似乎没什么问题,但是通过在消息属性上设置TTL的方式,消息可能并不会按照我们设置的TTL被消费,因为RabbitMQ只会检查一个消息是否过期如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先被执行。简单来说,延时队列中的消息传递并不是并发进行的,如果队列中的消息延时时间比较长的话就会导致队列信道阻塞,为了解决这个问题我们可以采用基于插件的延时队列来实现。

插件下载地址https://www.rabbitmq.com/community-plugins.html 

下载插件后将插件放在rabbitmq的插件目录下/plugins

/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins

rabbitmg-plugins enable rabbitmq_delayed_message_exchange

之后重启rabbitmq服务即可。

相比于上面的基于死信队列的延时队列的实现,基于插件的延时队列是通过交换机延时来实现延时的。

延时队列和交换机声明类

这里的CustomExchange类对象的实例化提供了五个参数:

  • String name:交换机名称
  • String type:交换机的类型
  • boolean durable:持久化与否
  • boolean autoDelete:是否自动删除
  • Map<String, Object> arguments:其它参数
package com.crj.rabbitmqtestspringboot.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class DelayQueueConfig {
//    队列
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
//    交换机
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
//  routingKey
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

    //声明队列
    @Bean
    public Queue delayedQueue() {
        return new Queue(DELAYED_QUEUE_NAME);
    }
    //声明交换机
    @Bean
    public CustomExchange delayedExchange(){
        /**
         * 1.交换机名称
         * 2.交换机的类型
         * 3.持久化与否
         * 4.是否自动删除
         * 5.其它参数
         */
        Map<String,Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type","direct");
        return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,arguments);
    }

    //绑定
    @Bean
    public Binding delayQueueBinding(@Qualifier("delayedQueue") Queue delayedQueue,
                                     @Qualifier("delayedExchange") CustomExchange delayedExchange
    ){
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();

    }
}

消息发布者

设置延时消息的延时时间通过rabbitTemplate对象的convertAndASend()方法中的一个lambda表达式设置延时消息:msg.getMessageProperties().setDelay(delayTime); 

package com.crj.rabbitmqtestspringboot.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {
    /**
     * 发送延迟消息
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendExpirationMsg/{message}/{delayTime}")
    public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime){
        log.info("当前时间:{},发送一条时长为:{}ms的消息给延时队列:{}",new Date().toString(),delayTime,message);
        rabbitTemplate.convertAndSend(DelayQueueConfig.DELAYED_EXCHANGE_NAME,DelayQueueConfig.DELAYED_ROUTING_KEY,message,msg->{
            //设置延时消息
            msg.getMessageProperties().setDelay(delayTime);
            return msg;
        });
    }
}

消费者

package com.crj.rabbitmqtestspringboot.controller.consumer;

import com.crj.rabbitmqtestspringboot.config.DelayQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

@Slf4j
@Component
public class DelayQueueConsumer {
    //接收消息
    @RabbitListener(queues = DelayQueueConfig.DELAYED_QUEUE_NAME)
    public void receiveDelay(Message message) throws Exception{
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到延时队列的消息:{}",new Date().toString(),msg);

    }
}

1.4 延时队列的应用场景 

        延时队列在需要延时处理的场景下非常有用,使用RabbitMQ来实现延时队列可以很好的利用RabbitMQ的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过RabbitMQ集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。
        当然,延时队列还有很多其它选择,比如利用Java的DelayQueue,利用Redis的zset,利用Quartz或者利用kafka的时间轮,不同的方式选择需要适用的实际的需求场景。


总结

        上面我们已经比较清楚地了解了延时队列的使用场景和两种延时队列的实现方式,希望能梳理清楚哈哈哈。在后续的文章中,荔枝也会继续输出有关RabbitMQ中间件的相关知识,继续努力学习ing~

今朝已然成为过去,明日依然向往未来!我是荔枝,在技术成长之路上与您相伴~~~

如果博文对您有帮助的话,可以给荔枝一键三连嘿,您的支持和鼓励是荔枝最大的动力!

如果博文内容有误,也欢迎各位大佬在下方评论区批评指正!!!

更多推荐

C++【个人笔记1】

1.C++的初识1.1简单入门#include<iostream>usingnamespacestd;intmain(){cout<<"helloworld"<<endl;return0;}#include<iostream>;预编译指令,引入头文件iostream.usingnamespacestd;使用标准命名空间

聊聊自动化测试路上会遇到的挑战~

一、测试范围无论是功能测试,还是自动化或者性能测试,第一步要做的,是明确测试范围和需求指标。对于自动化测试来说,特别是UI自动化,并不是所有的功能点都适合做UI自动化。根据具体的业务情况和项目稳定程度,选择UI自动化+API自动化结合,选择合适的业务点来进行针对性的自动化测试方案设计,才是最佳方案。①、使用频次较高,异

Logstash8.3.3 parse 包含a(AM PM)时间报错_dateparsefailure

遇到个奇奇怪怪的错误,我log中的时间是2023/08/2412:01:39AM,我写的格式化时间和转成东八区时间,但是就是一直报错_dateparsefailure,我反复检查了format没有问题,[xxxx][timestamp]这个字段从grok读出来也是正确的,百思不得其解。如果是这种的format的话,在g

Arduino程序设计(十三)触摸按键实验(TTP223)

触摸按键实验前言一、TTP223触摸按键模块二、触摸按键控制LED二、触摸按键状态检测前言本文介绍触摸按键控制LED的原理及实验,主要内容有:1、介绍TTP223触摸按键模块;2、触摸按键控制LED;3、触摸按键状态检测。一、TTP223触摸按键模块1、模块介绍:该模块是一个基于触摸检测IC(TTP223B)的电容式点

小样本目标检测:ECEA: Extensible Co-Existing Attention for Few-Shot Object Detection

论文作者:ZhimengXin,TianxuWu,ShimingChen,YixiongZou,LingShao,XingeYou作者单位:HuazhongUniversityofScienceandTechnology;UCAS-TerminusAILab论文链接:http://arxiv.org/abs/2309.

快速搭建接口自动化测试框架

1接口测试接口测试是对系统或组件之间的接口进行测试,主要是校验数据的交换,传递和控制管理过程,以及相互逻辑依赖关系。接口自动化相对于UI自动化来说,属于更底层的测试,这样带来的好处就是测试收益更大,且维护成本相对来说较低,是我们进行自动化测试的首选2框架选型目前接口自动化的框架比较多,比如jmeter,就可以集接口自动

企业数字化转型如何成功落地

企业数字化转型是当前趋势,是企业在面对日益变化的商业环境时必须采取的关键策略之一。然而,要实现数字化转型的成功落地并不容易,需要企业有明确的目标和正确的方法。数聚将探讨一些关键的步骤和策略,帮助企业实现数字化转型的顺利落地,并在竞争激烈的市场中取得优势。第一步是明确转型目标。企业在进行数字化转型时,必须清楚地知道自己的

【springMVC】高级部分

拦截器1拦截器(Interceptor)是一种动态拦截方法调用的机制#作用:1.在指定的方法调用前后执行预先设定后的的代码2.阻止原始方法的执行#核心原理:AOP思想#拦截器链:多个拦截器按照一定的顺序,对原始被调用功能进行增强2拦截器使用步骤1实现HandlerInterceptor接口/***三个方法的运行顺序为p

UI自动化测试用例管理平台搭建

用到的工具:python3+django2+mysql+RabbitMQ+celery+seleniumpython3和selenium这个网上很多教程,我不在这一一说明;平台功能介绍:项目管理:用于管理项目。每个项目可以设置多个环境,例如开发环境,测试环境,预发布环境,生产环境等。页面管理:主要用来方便对元素进行管理

通讯网关软件009——利用CommGate X2MQTT实现MQTT访问ODBC数据源

本文介绍利用CommGateX2MQTT实现MQTT访问ODBC数据源。CommGateX2MQTT是宁波科安网信开发的网关软件,软件可以登录到网信智汇(http://wangxinzhihui.com)下载。【案例】如下图所示,实现上位机通过MQTT来获取ODBC数据源的数据。【解决方案】设置网关机,与ODBC数据源

2023数A题——WLAN网络信道接入机制建模

A题——WLAN网络信道接入机制建模思路:该题主要考察的WLAN下退避机制建模仿真。资料获取问题1:假设AP发送包的载荷长度为1500Bytes(1Bytes=8bits),PHY头时长为13.6μs,MAC头为30Bytes,MAC头和有效载荷采用物理层速率455.8Mbps发送。AP之间的RSSI为-70dBm。大

热文推荐