RabbitMQ 几种模式

2023-09-13 11:28:51

一、Hello World 模式

        在这一部分中,我们将用 Java 编写两个程序。发送单个消息的生产者和接收消息并打印出来的消费者。模型如下所示:

        在下图中,“ P” 是我们的生产者,“ C” 是我们的消费者。中间的框是一个队列 RabbitMQ 代表使用者保留的消息缓冲区。

1.1 生产者

1.1.1 添加依赖

  <!--指定 jdk 编译版本 -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <!--rabbitmq 依赖客户端 -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.8.0</version>
        </dependency>
        <!--操作文件流的一个依赖 -->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.6</version>
        </dependency>
    </dependencies>

1.1.2 编写生产者代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

// 生产者,用于推送消息
public class Producer {
    
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception {
        // 创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.229.146");
        factory.setUsername("admin");
        factory.setPassword("123");
        
        // channel 实现了自动 close 接口 自动关闭 不需要显示关闭
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        /**
         * 生成一个队列
         * 1.队列名称
         * 2.队列里面的消息是否持久化 默认消息存储在内存中
         * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
         * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
         * 5.其他参数
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        String message="hello world";
        /**
         * 发送一个消息
         * 1.发送到那个交换机
         * 2.路由的 key 是哪个
         * 3.其他的参数信息
         * 4.发送消息的消息体
         */
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("消息发送完毕");
    }
}

1.1.3 推送数据

        运行代码,然后在我们的 rabbitmq 管理界面上可以看到,此时我们的 hello 队列里面,有一个消息等待被消费,如下所示:

1.2 消费者

1.2.1 编写消费者代码

import com.rabbitmq.client.*;

// 接收消息的消费者
public class Consumer {
    // 队列的名称
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.229.146");
        factory.setUsername("admin");
        factory.setPassword("123");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        System.out.println("等待接收消息....");

        // 接收到消息时的回调函数
        DeliverCallback deliverCallback=(consumerTag, message)->{
            String resultMessage= new String(message.getBody());
            System.out.println(resultMessage);
        };
        // 取消消费时的回调函数,比如在消费的时候队列被删除掉了
        CancelCallback cancelCallback=(consumerTag)->{
            System.out.println("消息消费被中断");
        };
        /**
         * 消费者消费消息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
         * 3.消费者未成功消费的回调
         * 4.消费者取消消费的回调
         */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

1.2.2 接收数据

        启动代码,可以看到,我们成功的消费了 rabbitmq 里面的消息。

二、Work Queues 模式

        工作队列(又称任务队列)的主要思想是假设生产者发送大量的消息,会把消息放入队列之中,后面有多个工作线程去接收处理消息。

2.1 轮询分发消息

        在这个案例中我们会启动三个工作线程,一个消息发送线程,我们来看看他们三个工作线程是如何工作的。轮询模式就是你处理一个,我处理一个,按顺序一个一个的进行处理。

2.1.1 抽取工具类

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMqUtils {
   // 得到一个连接的 channel
   public static Channel getChannel() throws Exception{
      // 创建一个连接工厂
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("192.168.229.147");
      factory.setUsername("admin");
      factory.setPassword("123");
      Connection connection = factory.newConnection();
      Channel channel = connection.createChannel();
      return channel;
   }
}

2.1.2 启动三个工作线程

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.utils.RabbitMqUtils;

public class Worker01 {
    private static final String QUEUE_NAME="hello";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback=(consumerTag, delivery)->{
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收到消息:"+receivedMessage);
        };
        CancelCallback cancelCallback=(consumerTag)->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        };
        System.out.println("C1 消费者启动等待消费......");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.utils.RabbitMqUtils;

public class Worker02 {
    private static final String QUEUE_NAME="hello";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback=(consumerTag, delivery)->{
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收到消息:"+receivedMessage);
        };
        CancelCallback cancelCallback=(consumerTag)->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        };
        System.out.println("C2 消费者启动等待消费......");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.utils.RabbitMqUtils;

public class Worker03 {
    private static final String QUEUE_NAME="hello";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback=(consumerTag, delivery)->{
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收到消息:"+receivedMessage);
        };
        CancelCallback cancelCallback=(consumerTag)->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        };
        System.out.println("C3 消费者启动等待消费......");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

2.1.3 启动一个发送线程

import com.rabbitmq.client.Channel;
import com.rabbitmq.utils.RabbitMqUtils;

public class Task01 {
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        for (int i = 0; i < 10; i++) {
            String message = "I am message " + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("发送消息完成:" + message);
        }
    }
}

2.1.4 结果展示

        通过程序执行发现生产者总共发送 10 个消息,消费者二和消费者分别分得 3 个消息,消费者一分得 4 个消息,并且是按照有序的一个接收一次消息。

2.2 消息应答

2.2.1 概念

        消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并且只完成了一部分任务,这时它突然挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们就会丢失正在处理的消息。后续发送给该消费者的消息也都会丢失。

        为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。

        消息应答分为两种,一种是自动应答,一种是手动应答

2.2.2 自动应答

        消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,假设采用自动应答的模式,如果消息在被消费者接收到之前,出现了连接关闭或者 channel 关闭的情况,那么消息就丢失了。

        假设消费者接收的消息很多,且没有对传递的消息数量进行限制,这样就有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。

        下面方法里面的第二个参数如果设置为 true 则为自动应答。

/**
 * 消费者消费消息
 * 1.消费哪个队列
 * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
 * 3.消费者未成功消费的回调
 * 4.消费者取消消费的回调
*/
 channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);

2.2.3 手动应答

        手动应答有三种模式,第一种是 Channel.basicAck 肯定确认 ,如下所示,RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了。

   public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback=(consumerTag, delivery)->{
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收到消息:"+receivedMessage);
            /**
             * 1、消息标记 tag
             * 2、false 表示只应答接收到的传递消息;true 表示应答所有传递过来的消息
             * */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        CancelCallback cancelCallback=(consumerTag)->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        };
        System.out.println("C1 消费者启动等待消费......");
        // 手动应答
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }

        第二种是 Channel.basicNack 否定确认,第三种是 Channel.basicReject  也是用于否定确认,Channel.basicReject 比 Channel.basicNack 少一个参数,表示不处理该消息了直接拒绝,可以将其丢弃了。

2.2.4 消息自动重新入队

        如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。

2.2.5 消息手动应答编码

        消息生产者代码如下:

import com.rabbitmq.client.Channel;
import com.rabbitmq.utils.RabbitMqUtils;

// 记得先创建下这个队列,执行下就可以了
public class Task01 {
    private static final String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();)
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        for (int i = 0; i < 2; i++) {
            String message = "I am message " + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("发送消息完成:" + message);
        }
    }
}

        两个消费者的代码如下所示,系统默认消息采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改为手动应答。

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.utils.RabbitMqUtils;

public class Worker01 {
    private static final String QUEUE_NAME="ack_queue";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("c1 等待接收消息时间较短");
        DeliverCallback deliverCallback=(consumerTag, delivery)->{
            String receivedMessage = new String(delivery.getBody());
            try {
                Thread.sleep(1000l);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            /**
             * 1、消息标记 tag
             * 2、false 表示只应答接收到的传递消息;true 表示应答所有传递过来的消息
             * */
            System.out.println("接收到消息:"+receivedMessage);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        CancelCallback cancelCallback=(consumerTag)->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        };
        // 手动应答
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.utils.RabbitMqUtils;

public class Worker02 {
    private static final String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("c2 等待接收消息时间较长");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String receivedMessage = new String(delivery.getBody());
            try {
                Thread.sleep(30000l);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            /**
             * 1、消息标记 tag
             * 2、false 表示只应答接收到的传递消息;true 表示应答所有传递过来的消息
             * */
            System.out.println("接收到消息:" + receivedMessage);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
        };
        // 手动应答
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
    }
}

2.2.6 结果展示

        启动消费者和生产者,消费者 1 由于阻塞时间短,很快的处理完消息,而消费者 2 则会阻塞一会,等到 30s 之后才会开始处理消息,如下所示:

        等过了一会之后,消息成功被消费者 2 消费,如下所示:

        如果消费者 2 等待的过程中,手动将其关闭的话,会发生什么?可以发现消息被消费者 1 消费了,并没有丢失消息。

2.3 RabbitMQ 持久化

2.3.1 概念

        上一节我们学习了消息应答可以保证处理的任务不会丢失,但是如何保障当 RabbitMQ 服务停掉以后消息生产者发送过来的消息不丢失呢?默认情况下 RabbitMQ 退出或由于某种原因崩溃时,队列和消息就都丢了。除非配置相关的参数。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化

2.3.2 队列持久化

        之前我们创建的队列都是非持久化的,rabbitmq 如果重启的话,该队列就会被删除掉,如果要队列实现持久化,需要在声明队列的时候把 durable 参数设置为持久化,如下所示:

// 设置消息队列持久化
boolean durable=true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

        但是需要注意的就是如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个持久化的队列,不然就会出现错误

        删除完毕之后,再次启动生产者,可以看到 D 表示的就是这个队列是持久化的。

        此时,即使重启 rabbitmq 队列也依然存在。 

2.3.3 消息持久化

        将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。如果需要更强有力的持久化策略,后面会详细说明。

        要想让消息实现持久化需要在消息生产者修改代码,添加如下这个属性。

# 第三个参数为 null 表示不持久化
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

# 第三个参数设置为 MessageProperties.PERSISTENT_TEXT_PLAIN 表示消息要持久化
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

2.3.4 不公平分发

        在最开始的时候我们学习到 RabbitMQ 分发消息采用的轮询分发,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理任务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 2 处理速度却很慢,这个时候如果我们还是采用轮询分发的话,就会导致处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是 RabbitMQ 并不知道这种情况它依然很公平的进行分发。

        为了避免这种情况,我们可以使用不公平分发,在消费者代码里面设置参数 channel.basicQos(1) 即可。如下:

# 默认等于0,表示轮询,如果等于1的话就表示不公平分发
int prefetchCount = 1;
channel.basicQos(prefetchCount);

        分别启动消费者和生产者,打开 rabbitmq 的管理界面如下:

        意思就是如果这个任务我还没有处理完或者我还没有应答你,你先别分配给我,我目前只能处理一个任务,然后 rabbitmq 就会把该任务分配给没有那么忙的那个空闲消费者,当然如果所有的消费者都没有完成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加新的 worker 或者改变其他存储任务的策略。 

2.3.5 预取值

        rabbitmq 本身消息的发送就是异步发送的,所以在任何时候,channel 上肯定不止只有一个消息,另外来自消费者的手动确认本质上也是异步的。因此这里就存在一个未确认的消息缓冲区,因此希望开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。

        这个时候就可以通过使用 basic.qos 方法设置 “预取计数” 值来完成的。该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认,类似于 channel 的最大容量。

        例如,假设在通道上有未确认的消息 5、6、7,8,并且通道的预取计数设置为 4,此时 RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ack。比方说 tag=6 这个消息刚刚被确认 ACKRabbitMQ 将会感知这个情况到并再发送一条消息。消息应答和 QoS 预取值对用户吞吐量有重大影响。

        通常,增加预取值将提高向消费者传递消息的速度。虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的 RAM 消耗(随机存取存储器),应该小心使用具有无限预处理的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的内存消耗变大,所以找到合适的预取值是一个反复试验的过程。

        不同的负载预取值也不同, 100 300 范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。预取值为 1 是最保守的。当然这将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,特别是在消费者连接等待时间较长的环境中。对于大多数应用来说,稍微高一点的值将是最佳的。

        说白了就是根据你的消费者的性能优劣,动态的去设置 channel ,性能好的,处理速度快的设置大些,性能不好的设置小些,通过 channel.basicQos 参数进行设置,默认等于 0 表示轮询,等于 1 表示不公平分发,设置成其他值则表示预约值,即消息缓冲区的大小。

三、发布确认

3.1 发布确认概念

        我们在前面的几个章节说过,为了防止数据丢失,我们可以将队列和消息都进行持久化的操作。但是,这还是不够好的,如果生产者再向 rabbitmq 发送消息,而 rabbitmq 还没来得及存储到磁盘的时候崩了,消息就丢失了,这个时候该怎么办?此时就引入了发布确认的概念,即生产者向 rabbitmq 发送消息,而 rabbitmq 给生产者个反馈,无论是否可以成功的接收到消息,都给一个反馈,这样就比较合理了。

3.2 发布确认策略

3.2.1 开启发布确认

   发布确认默认是没有开启的,如果要开启需要调用方法 confirmSelect,每当你要想使用发布确认,都需要在 channel 上调用该方法。

// 只需要在创建完 channel 信道之后创建即可
channel.confirmSelect();

3.2.2 单个确认发布

        这是一种最简单的同步确认发布的方式,只有前面发送的消息被确认发布了,后续的消息才能继续发布。

        这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某些应用程序来说这可能已经足够了。

import com.rabbitmq.client.Channel;
import com.rabbitmq.utils.RabbitMqUtils;

import java.util.UUID;

public class Task01 {
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        // 声明一个队列
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,false,false,false,null);
        // 开启发布确认
        channel.confirmSelect();
        long begin = System.currentTimeMillis();
        for(int i=0;i<1000;i++){
            String message = i +"";
            channel.basicPublish("",queueName,null,message.getBytes());
            // 单个消息马上进行发布确认,可以通过返回值进行判断是否通知成功
            boolean flag = channel.waitForConfirms();
        }
        long end = System.currentTimeMillis();
        System.out.println("发布1000个单独确认消息,耗时:"+(end-begin)+"ms");
    }
}

3.2.3 批量确认发布

        上面那种方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。

import com.rabbitmq.client.Channel;
import com.rabbitmq.utils.RabbitMqUtils;

import java.util.UUID;

public class Task02 {
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        // 声明一个队列
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName, false, false, false, null);
        // 开启发布确认
        channel.confirmSelect();
        // 批量确认消息大小
        int batchSize = 100;
        // 未确认消息个数
        int outstandingMessageCount = 0;
        long begin = System.currentTimeMillis();
        for (int i = 0; i < 1000; i++) {
            String message = i + "";
            channel.basicPublish("", queueName, null, message.getBytes());
            outstandingMessageCount++;
            if (outstandingMessageCount == batchSize) {
                channel.waitForConfirms();
                outstandingMessageCount = 0;
            }
        }
        long end = System.currentTimeMillis();
        System.out.println("发布1000个批量确认消息,耗时:" + (end - begin) + "ms");
    }
}

3.2.4 异步确认发布

        异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功,下面就让我们来详细讲解异步确认是怎么实现的。

        如下图,首先消息生产者发送消息到队列中(信道),数据在信道中的存储类型类似于 map key 存储消息序号,value 存储具体的消息内容。在信道里面会根据 key 为消息排列顺序,这样的好处是消息推送的成功与否完全可以根据 key 来识别出来。

       broker rabbitmq 的消息实体,当它接收到 1 号消息的时候,它就会进行一次确认收到的回调函数,告诉消息生产者消息我收到了,如果没有收到 1 号消息,它就会调用未收到消息的回调函数通知生产者,消息我没有收到。

        所以作为消息生产者,我只管一直发送消息即可,将来会由 broker 告诉我哪些消息接收到了,哪些消息没有接收到。只需要将没接收到的消息重新发送即可。收到的不做任何处理。且 broker 的通知类型为异步,速度会很快。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.utils.RabbitMqUtils;

import java.util.UUID;

public class Task03 {
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        // 声明一个队列
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName, false, false, false, null);
        // 开启发布确认
        channel.confirmSelect();
        // 开始的时间
        long begin = System.currentTimeMillis();

        // 消息确认成功的回调函数
        ConfirmCallback ackCallback = (deliveryTag,multiple)->{
            System.out.println("确认的消息:"+deliveryTag);
        };
        // 消息确认失败的回调函数
        // 第一个参数:消息的标记。第二个参数:是否为批量确认
        ConfirmCallback nackCallback = (deliveryTag,multiple)->{
            System.out.println("未确认的消息:"+deliveryTag);
        };
        // 添加一个消息的监听器,用于监听哪些消息成功了,哪些消息失败了
        // 异步通知
        // 第一个参数:监听哪些消息成功了。第二个参数:监听哪些消息失败了
        channel.addConfirmListener(ackCallback,nackCallback);
        // 批量发送消息
        for(int i=0;i<1000;i++){
            String message = i+"";
            channel.basicPublish("",queueName,null,message.getBytes());
        }
        // 结束的时间
        long end = System.currentTimeMillis();
        System.out.println("发布1000个异步确认消息,耗时:" + (end - begin) + "ms");
    }
}

3.2.5 处理异步未确认消息

        最好的解决的解决方案就是把未确认的消息放到一个基于内存且能被发布线程访问的队列,比如说用 ConcurrentLinkedQueue, 这个队列可以在 confirm callbacks 与发布线程之间进行消息的传递。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.utils.RabbitMqUtils;

import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class Task03 {
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        // 声明一个队列
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName, false, false, false, null);
        // 开启发布确认
        channel.confirmSelect();
        /** 创建一个线程安全有序的哈希表,适用于高并发的情况
         * 1、将序号与消息进行关联
         * 2、可以根据需要批量的删除消息条目
         * 3、支持高并发
         */
        ConcurrentSkipListMap<Long,String>  outstandingConfirms =new ConcurrentSkipListMap<>();
        // 开始的时间
        long begin = System.currentTimeMillis();

        // 消息确认成功的回调函数
        ConfirmCallback ackCallback = (deliveryTag,multiple)->{
            if(multiple){
                // 第二步:删除掉已经确认的消息,剩下的就是未确认的消息
                ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag);
                confirmed.clear();
            }else{
                outstandingConfirms.remove(deliveryTag);
            }
            System.out.println("确认的消息:"+deliveryTag);
        };
        // 消息确认失败的回调函数
        // 第一个参数:消息的标记。第二个参数:是否为批量确认
        ConfirmCallback nackCallback = (deliveryTag,multiple)->{
            // 第三步:打印未确认的消息有哪些
            String message = outstandingConfirms.get(deliveryTag);
            System.out.println("未确认的消息是:"+message+":::未确认消息的标记为:"+deliveryTag);
        };
        // 添加一个消息的监听器,用于监听哪些消息成功了,哪些消息失败了
        // 异步通知
        // 第一个参数:监听哪些消息成功了。第二个参数:监听哪些消息失败了
        channel.addConfirmListener(ackCallback,nackCallback);
        // 批量发送消息
        for(int i=0;i<1000;i++){
            String message = i+"";
            // 第一步:记录下所有要发送的消息
            outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
            channel.basicPublish("",queueName,null,message.getBytes());  
        }
        // 结束的时间
        long end = System.currentTimeMillis();
        System.out.println("发布1000个异步确认消息,耗时:" + (end - begin) + "ms");
    }
}

3.2.6 三种发布确认速度对比

        单独发布消息:同步等待确认,简单,但吞吐量非常有限。

        批量发布消息:批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是那条消息出现了问题。

        异步发布处理:最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些。

更多推荐

基于STM32CUBEMX驱动TMOS模块STHS34PF80(5)----配置嵌入式函数

基于STM32CUBEMX驱动TMOS模块STHS34PF80----4.中断获取信号概述视频教学样品申请视频教程参考Demo参考Demo完整代码下载内嵌函数地址串口配置IIC配置IO口设置串口重定向参考程序初始化IIC写函数IIC读函数获取ID设备的自动引导过程和关机模式配置省电模式温度数据的灵敏度值设置低通滤波器温

【面试题】智力题

文章目录腾讯1000瓶毒药里面只有1瓶是有毒的,问需要多少只老鼠才能在24小时后试出那瓶有毒。有两根不规则的绳子,两根绳子从头烧到尾均需要一个小时,现在有一个45分钟的比赛,裁判员忘记带计时器,你能否通过烧绳子的方式来为这场比赛计时?有25匹马,5条赛道,每条赛道同时只能有一匹马跑,假设每匹马的水平都很稳定,在没有计时

go并发处理业务

引言实际上,在服务端程序开发和爬虫程序开发时,我们的大多数业务都是IO密集型业务,什么是IO密集型业务,通俗地说就是CPU运行时间只占整个业务执行时间的一小部分,而剩余的大部分时间都在等待IO操作。IO操作包括http请求、数据库查询、文件读取、摄像设备录音设备的输入等等。这些IO操作会引起中断,使业务线程暂时放弃cp

PaddlePaddle Hackathon 飞桨黑客马拉松热身赛上线!

挑战自我拓展技能激发创新挑战极限再次相遇黑客松我们期待你的加入!第五期PaddlePaddleHackathon飞桨黑客马拉松热身赛上线,本次活动是面向全球开发者的深度学习领域编程活动,鼓励开发者了解和参与飞桨深度学习开源项目与文心大模型开发实践。本次飞桨黑客马拉松活动包含三大赛道:开源贡献个人挑战赛、大模型应用与创意

Mybatis - 常用 SQL 语句设计思路及具体实现 - 数据存在则更新,不存在则插入、批量更新、批量插入、连表查询 + - 字段加减法

目录序言一、数据存在则更新,不存在则插入1、ONDUPLICATEKEYUPDATE的具体xml用法:(虽然有点问题,但没准以后有用到的时候)onduplicatekeyupdate用法总结:二、批量更新方法一:(数据量越多,容易变成慢SQL,不太推荐)方法二方法三(推荐)三、批量插入四、连表查询+-字段加减法1、连表

SQL故障和排查解决浅析

MySQL长连接MySQL长连接是指应用程序与MySQL数据库之间的连接在执行完一个操作后不会立即关闭,而是保持活动状态以供后续使用。这种连接模式在某些情况下可以提高性能,但也可能导致一些问题。以下是MySQL长连接的一些现象和排查方法:现象连接数积累增加:如果应用程序中的长连接没有得到正确管理和释放,数据库服务器上的

《PostgreSQL数据分区:原理与实战》

🌷🍁博主猫头虎(🐅🐾)带您GotoNewWorld✨🍁🐅🐾猫头虎建议程序员必备技术栈一览表📖:🛠️全栈技术FullStack:📚MERN/MEAN/MEVNStack|🌐Jamstack|🌍GraphQL|🔁RESTfulAPI|⚡WebSockets|🔄CI/CD|🌐Git&Versio

Makefile详解&实战

title:Makefile详解&实战date:2023-09-2012:01:24comments:true#是否可评论toc:true#是否显示文章目录categories:#分类-CMaketags:#标签-CMake-Makefilesummary:Makefile详解&实战Makefile详解&实战什么是Ma

《打造高可用PostgreSQL:策略与工具》

🌷🍁博主猫头虎(🐅🐾)带您GotoNewWorld✨🍁🐅🐾猫头虎建议程序员必备技术栈一览表📖:🛠️全栈技术FullStack:📚MERN/MEAN/MEVNStack|🌐Jamstack|🌍GraphQL|🔁RESTfulAPI|⚡WebSockets|🔄CI/CD|🌐Git&Versio

什么是JVM常用调优策略?分别有哪些?

目录一、JVM调优二、堆内存大小调整三、垃圾回收器调优四、线程池调优一、JVM调优Java虚拟机(JVM)的调优主要是为了提高应用程序的性能,包括提高应用程序的响应速度和吞吐量。以下是一些常用的JVM调优策略:堆内存大小调整:JVM的堆内存是用于存储对象实例的内存区域。通过调整堆内存的大小,可以找到最适合你的应用程序的

【案例教学】华为云API对话机器人的魅力—体验AI垃圾分类机器人

云服务、API、SDK,调试,查看,我都行阅读短文您可以学习到:人工智能AI自言语言的情感分析、文本分词、文本翻译1IntelliJIDEA之API插件介绍API插件支持VSCodeIDE、IntelliJIDEA等平台、以及华为云自研CodeArtsIDE,基于华为云服务提供的能力,帮助开发者更高效、便捷的搭建应用。

热文推荐