消息队列-rabbitMq

2023-09-18 21:06:16

消息队列(MQ)到底能干什么?
MQ全称为Message Queue,也就是消息队列,是应用程序和应用程序之间的通信方法。
在微服务盛行的当下,MQ被使用的也是越来越多,一般常用来进行业务异步解耦、解耦微服务、流量削峰填谷、消息分发、分布式事务的数据一致性。

1、业务异步解耦
最常见的场景就是用户注册之后,需要发送注册短信、邮件通知,以告知相关信息。使用MQ,只需要在处理完用户信息之后,给MQ发送两个消息即可,邮件服务、短信服务监听MQ的任务消息,根据消息进行发送即可。
直销银行中台处理支付操作,将支付报文发给支付中心MQ,支付中心监听到请求,会处理发送第三方,将最终处理结果发给中台系统接收mq。
中台系统也可以轮询查询终态。

2、流量削峰填谷
MQ比较常用的一个场景,一般在秒杀、搞活动中使用广泛。使用MQ,可以将需要处理的消息全部放入其中,系统按照最大处理能力,去获取消息进行消费,这样就可以将一瞬间过来的请求,分散到一段时间内进行处理,避免了系统的崩溃。

3、消息分发
多个系统对同一个数据感兴趣,只需要监听同一类消息即可。
例如付款系统,在付款成功之后,正常做法是通知外围系统这个单子付款成功了,或者是外围系统定时来拉取付款结果,
使用MQ后,付款系统可以在付款成功之后,将消息放到MQ里面,想知道这个结果的系统订阅这个主题的消息即可,非常方便,也不需要定时去拉取数据了。

4、分布式事务的数据一致性
RocketMQ事务消息发送步骤如下:
1、发送方将半事务消息发送至消息队列RocketMQ。
2、消息队列RocketMQ将消息持久化成功之后,向发送方返回Ack确认消息已经发送成功,此时消息为半事务消息。
3、发送方开始执行本地事务逻辑。
4、发送方根据本地事务执行结果向服务端提交二次确认(Commit或是Rollback),服务端收到Commit状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到Rollback状态则删除半事务消息,订阅方将不会接受该消息。

事务消息回查步骤如下:
1、在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
2、发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
3、发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。

RabbitMQ入门
Exchange:交换器,Exchange有4种类型: direct(默认), fanout, topic,和headers,用来接收生产者发送的消息并将这些消息路由给服务器中的队列
Queue:消息队列,用来保存消息直到发送给消费者。
Binding:用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

1、基本消息类型
rabbitTemplate.converAndSend(queue, message)
@RabbitListener(queue)

2、工作队列
rabbitTemplate.converAndSend(queue, message)
多个监听器消费

3、发布和订阅模型
3.1、Fanout广播:将收到的消息路由给所有绑定的queue
定义配置类,配置FanoutExchange、Queue、Binding
rabbitTemplate.converAndSend(Exchange, “”, message)
@RabbitListener(queue)

3.2、DriectExchage:将收到的消息发给指定queue
定义配置类,配置DirectExchange、Queue、Binding(with指定 routingKey )
rabbitTemplate.converAndSend(Exchange, routingKey, message)
@RabbitListener(queues)

3.3、TopicExchange话题广播
定义配置类,配置DirectExchange、Queue、Binding(with指定 routingKey ) #:0或者多个 *:一个单词
rabbitTemplate.converAndSend(Exchange, routingKey, message)
@RabbitListener(queues)

什么是事务?
数据库事务。原子性、一致性、隔离性、持久性。

本地事务?
整个服务操作只能涉及一个单一的数据库资源。

分布式事务?
分布式事务指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。分布式事务就是为了保证不同数据库的数据一致性。

分布式事务应用架构?
微服务架构的分布式应用环境下,越来越多的应用要求对多个数据库资源,多个服务的访问都能纳入到同一个事务当中,分布式事务应运而生。

分布式事务有几种解决方案: 两阶段提交/XA MQ
第一阶段(prepare):即所有的参与者RM准备执行事务并锁住需要的资源。参与者ready时,向TM报告已准备就绪。
第二阶段 (commit/rollback):当事务管理者™确认所有参与者(RM)都ready后,向所有参与者发送commit命令。

https://blog.csdn.net/qq_43410878/article/details/123656765 RabbitMQ使用详解

基本用法

1、简单队列模式

生产者
// 1、获取到通道
Channel channel = RabbitMqUtils.getChannel();

// 2、声明(创建)队列
// queue:队列的名称
// durable:是否持久化
// exclusive:设置是否排他。
// autoDelete:设置是否自动删除。至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。
// arguments:设置队列的其他一些参数
channel.queueDeclare(“test”, true, false, false, null);

// exchange:要将消息发送到的Exchange(交换器)
// routingKey: 路由KEY
// props: 消息的其它属性,如:路由头等
// body: 消息体
channel.basicPublish(“”, “test”, null, (i + “hello”).getBytes());

消费者
// 1、获取通道
Channel channel = RabbitMqUtils.getChannel();

// 2、接收到消息回调此函数
// queue:队列名
// autoAck:true 接收到传递消息自动应答,false 接收到消息后不自动应答服务器
// deliverCallback: 当一个消息发送过来后的回调接口
channel.basicConsume(“test”, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer-one: 收到的消息: " + new String(body));
}
});

2、消息自动确认
channel.basicQos(1); // 批量确认模式:通过设置channel.basicQos方法的prefetchCount参数为大于1的值,同样需要手动调用channel.basicAck方法来确认消息。
channel.queueDeclare(“firstQueue”, false, false, false, null);

// 消息改为手动确认
channel.basicConsume(“firstQueue”, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(new String(body, Charset.defaultCharset()));
// 手动确认消息已经被消费了, 第一个参数是当前消费的消息的标签(递增的整数)
// 第二个参数是否确认多条消息,包括之前消费的消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
});

发布订阅模型(fanout)
exchange类型有如下3种:
fanout 广播, 把消费转发给所有 绑定到该交换机的所有队列;
direct 定向, 把消息转发给符合 指定 routing key 路由键的队列;
topic 通配符, 把消息交给 routing pattern(路由模式)的队列;

exchange 交换机, 只负责转发消息, 不具备存储消息的能力; 因此如果没有任何队列与 exchange 绑定, 或者没有符合规则的队列, 那么消息会丢失;

每个消费者监听自己的队列;
生产者把消息发送给 broker, 由交换机把消息转发到绑定此交换机的所有队列;交换机需要与队列绑定, 绑定之后,一个消息可以被多个消费者收到;

// 声明一个交换机, 交换机的类型为 fanout
参数1-交换机名称
参数2-交换机类型(fanout, topic, direct, headers)
channel.exchangeDeclare(“multiple”, BuiltinExchangeType.FANOUT);

channel.basicPublish(“multiple”, “”, null, “fanout-message”.getBytes());

RabbitMQUtils.close(channel, connection);

//创建了一个非持久的、排他的、自动删除的队列,并生成了一个名称
String qName = channel.queueDeclare().getQueue(); //创建临时队列
//交换机与队列进行绑定-如果没有队列与交换机进行绑定,那么消费者接受不到生产者的消息,消息会丢失
channel.queueBind(qName, “multiple”, “”);

channel.basicConsume(qName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body, Charset.defaultCharset()));
}
});

直连模型(direct)
消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)

channel.exchangeDeclare(“direct-module”, BuiltinExchangeType.DIRECT);

channel.basicPublish(“direct-module”, “success”, null, “成功信息”.getBytes());

String qName = channel.queueDeclare().getQueue(); //创建临时队列
channel.queueBind(qName, “direct-module”, “success”);

channel.basicConsume(qName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body, Charset.defaultCharset()));
}
});

主题模式(topic)
Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符,多个单词之间以”.”分割

统配符

	* (star) can substitute for exactly one word.    匹配不多不少恰好1个词
	# (hash) can substitute for zero or more words.  匹配一个或多个词

如:

	audit.#    匹配audit.irs.corporate或者audit.irs 等
    audit.*   只能匹配 audit.irs

// 声明一个交换机, 交换机的类型为 fanout
channel.exchangeDeclare(“topic-module”, BuiltinExchangeType.TOPIC);

channel.basicPublish(“topic-module”, “company.java”, null, “通知信息”.getBytes());

RabbitMQUtils.close(channel, connection);

String qName = channel.queueDeclare().getQueue(); //创建临时队列
channel.queueBind(qName, “topic-module”, “company.#”);
channel.queueBind(qName, “topic-module”, “company.java.#”);
channel.queueBind(qName, “topic-module”, “company.html.*”);

channel.basicConsume(qName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body, Charset.defaultCharset()));
}
});

RabbitMQ与springboot整合
1、添加依赖 spring-boot-starter-amqp
2、配置RabbitConfig,包含RabbitListenerContainerFactory、RabbitTemplate

简单模型
@SpringBootTest
class SpringbootRabbitmqApplicationTests {
@Resource
private RabbitTemplate rabbitTemplate;

@Test
public void simpleMessageSend() {
    rabbitTemplate.convertAndSend("simpleQueue", new User(1, "张"));
}

}

@RabbitListener(queuesToDeclare = {@Queue(“simpleQueue”)})
public void simpleModel(User user) {
log.info(“message: {}”, user);
}

发布订阅模型
// fanout模型
@Test
public void fanoutMessageSend() {
for (int i = 0; i < 5; i++) {
rabbitTemplate.convertAndSend(“fanout-ex”, “”, new User(i, “张三”));
}
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

// value=@Queue 创建临时队列
// exchange创建交换机
@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
exchange = @Exchange(value = “fanout-ex”, type = ExchangeTypes.FANOUT))
})
public void receiveMessage1(User user) {
System.out.println(String.format(“消费者 【one】: %s”, user));
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
exchange = @Exchange(value = “fanout-ex”, type = ExchangeTypes.FANOUT))
})
public void receiveMessage2(User user) {
System.out.println(String.format(“消费者 【two】: %s”, user));
}

直连模式(direct)
@Test
public void directMessageSend() {
//rabbitTemplate.convertAndSend(“direct-ex”, “success”, new User(2, “张三”));
rabbitTemplate.convertAndSend(“direct-ex”, “error”, new User(2, “张三”));
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
key = {“error”, “success”},
exchange = @Exchange(value = “direct-ex”, type = ExchangeTypes.DIRECT))
})
public void receiveMessage1(User user) {
System.out.println(String.format(“消费者 【one】: %s”, user));
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
key = {“error”},
exchange = @Exchange(value = “direct-ex”, type = ExchangeTypes.DIRECT))
})
public void receiveMessage2(User user) {
System.out.println(String.format(“消费者 【two】: %s”, user));
}

topic模型
@Test
public void topicMessageSend() {
//rabbitTemplate.convertAndSend(“topic-ex”, “company”, new User(2, “张三”));
rabbitTemplate.convertAndSend(“topic-ex”, “company.java”, new User(2, “张三”));
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
key = {“company.#”},
exchange = @Exchange(value = “topic-ex”, type = ExchangeTypes.TOPIC))
})
public void receiveMessage1(User user) {
System.out.println(String.format(“消费者 【one】: %s”, user));
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
key = {“company.java.#”},
exchange = @Exchange(value = “topic-ex”, type = ExchangeTypes.TOPIC))
})
public void receiveMessage2(User user) {
System.out.println(String.format(“消费者 【two】: %s”, user));
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
key = {“company.html.*”},
exchange = @Exchange(value = “topic-ex”, type = ExchangeTypes.TOPIC))
})
public void receiveMessage3(User user) {
System.out.println(String.format(“消费者 【three】: %s”, user));
}

消息的手动确认
spring:
rabbitmq:
listener:
simple:
# 提交方式为手动
acknowledge-mode: MANUAL

factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

面试:a. 如何保证消息不丢失?

​ b. 如何保证消息的不重复消费?

​ c. 如何使用mq来是实现分布式事务?

​ d. 在工作中mq用在哪里?支付回调。

RabbitMQ如何做到消息不丢失?

1.持久化

发送消息时设置delivery_mode属性为2,使消息被持久化保存到磁盘,即使RabbitMQ服务器宕机也能保证消息不丢失。同时,创建队列时设置durable属性为True,以确保队列也被持久化保存。

2.确认机制

消费者通过basic.ack命令向RabbitMQ服务器确认已经消费了消息。如果消费者处理消息时发生错误或宕机,RabbitMQ会重新将消息发送给其他消费者。RabbitMQ在接收到消费者确认消息前会将消息保存在内存中,在确认后才会删除消息。

3.发布者确认

RabbitMQ支持发布者确认机制,即发布者在将消息发送到队列后,等待RabbitMQ服务器的确认消息。成功保存到队列的消息会返回确认消息给发布者,如果无法保存则返回Nack(Negative Acknowledgement)消息。通过发布者确认机制,可以确保消息成功发送到RabbitMQ服务器。

4.备份队列

RabbitMQ支持备份队列(Alternate Exchange)机制,即在消息发送到队列之前,先将消息发送到备份队列。如果主队列无法接收消息,RabbitMQ会将消息发送到备份队列中。备份队列通常是一个交换机,在创建队列时可以通过x-dead-letter-exchange属性指定备份队列。

三、注意事项 在使用RabbitMQ消息确认机制时,需注意以下几点:

1、确认模式的选择: 根据具体业务需求选择合适的确认模式。如果对消息传递的可靠性要求较高,建议使用手动确认模式或批量确认模式。

2、设置消息持久化: 在发布消息时,通过设置消息的持久化属性,可以确保即使RabbitMQ服务器意外关闭或重启,消息仍然能够被保存下来。

3、处理未确认消息: 如果消费者在处理消息时发生异常,导致无法发送确认信号给RabbitMQ,那么这些消息将保持在队列中,并可能被重新投递。需要设定适当的重试策略和错误处理机制来处理这些未确认的消息。

4、监听确认超时: 在使用批量确认模式时,如果长时间没有收到确认信号,需要设置合理的超时时间,并采取相应措施来处理超时的情况。

RabbitMQ消息确认机制详解:确保交付成功

消息确认机制是通过发布者(Producer)和消费者(Consumer)之间的交互来实现的。
当发布者发送消息到RabbitMQ后,会等待确认结果。如果消息成功被消费者接收并处理,消费者会发送一个确认信号给RabbitMQ,告知消息已经处理完成。而RabbitMQ则会根据接收到的确认信号,判断消息是否成功交付。
消息确认机制一般有两种模式:简单模式和批量模式。在简单模式中,每发送一条消息后就等待确认;而在批量模式中,可以一次发送多条消息,然后等待批量确认。无论是简单模式还是批量模式,都可以提高消息传递的可靠性。

https://blog.csdn.net/weixin_52278591/article/details/128841155 RabbitMq 消息确认机制详解

https://blog.csdn.net/u011942456/article/details/128198956 总结RabbitMq消息丢失和消息重复消费问题

https://www.zhihu.com/question/54756562?utm_id=0 rabbitmq是什么,主要用于哪些方面?

https://zhuanlan.zhihu.com/p/583520436?utm_id=0 用了8年MQ!聊聊消息队列的技术选型,哪个最香!

更多推荐

Vuex —— 状态管理 | Module

在前面讲到了关于Vuex数据状态管理的内容,讲了Vuex的五大核心属性,在这五大核心属性中就state、mutation和actions在前面介绍Vuex状态管理和讲Vuex中的同步和异步操作已经比较熟悉了,getter是基于state的计算属性,vue中computed从data中派生出的计算属性,vuex中gett

【校招VIP】java语言考点之嵌套类&内部类

考点介绍:嵌套类&内部类问题在校招面试中经常出现。以在一个类的内部定义另一个类,这种类称为嵌套类(nestedclasses),它有两种类型:静态嵌套类和非静态嵌套类。静态嵌套类使用很少最重要的是非静态嵌套类,也即是被称作为内部类(inner)。java语言考点之嵌套类&内部类-相关题目及解析内容可点击文章末尾链接查看

如何搭建Linux环境

W...Y的主页😊代码仓库分享💕当我们想要搭建一个Linux系统,我们应该怎么使用呢?今天我就带领大家搭建Linux系统!!!目录Linux环境安装双系统(不推荐)powwershell(不推荐)虚拟机+centos7镜像使用云服务器(推荐)XShell下的复制粘贴Linux环境安装双系统(不推荐)在计算机上安装L

Linux MQTT智能家居(源码使用分析)

文章目录前言一、连接服务器1.初始化客户端2.设置端口号设置IP地址3.连接服务器二、发布消息三、订阅消息总结前言本篇文章开始我们来分析一下大佬写的MQTT源码,并且来看看怎么样使用MQTT连接到服务器。MQTT源码地址:源码地址这里找到源码中的test.c进行分析:一、连接服务器1.初始化客户端首先使用mqtt_le

Linux MQTT智能家居(ubantu和ARM中使用MQTT)

文章目录前言一、在ubantu中使用MQTT1.安装cmake2.编译MQTT库二、在ARM中使用MQTT三、使用自己的服务器四、ARM板服务器MQTTX三者关系五、MQTTX的使用六、ARM使用MQTT的方法1.修改MQTT源码2.使用库3.把MQTT源码加入到自己的工程总结前言本篇文章将会带大家在ubantu和AR

elementui-slider 滑动时会重置为0的问题解决

文章目录问题描述:问题排查问题解决总结问题描述:首次打开有elementui-slider的页面,不管滑动哪个滑块,滑动时都会自动归0(划得动,但是会自动回到最左侧0的位置),但是他确实触发了change函数。问题排查尝试了很多方法,包括将slider的值设置为number类型,虽然它本身就是number,用了一个干净

Webserver项目解析

一.webserver的组成部分Buffer类用于存储需要读写的数据Channel类存储文件描述符和相应的事件,当发生事件时,调用对应的回调函数ChannelMap类Channel数组,用于保存一系列的ChannelDispatcher监听器,可以设置为epoll类型/select类型/poll类型。HttpReque

云计算的下一个飞跃:容器编排与Kubernetes最新趋势解析

文章目录容器和容器编排的基础什么是容器?什么是容器编排?Kubernetes的崛起Kubernetes的最新趋势1.云原生生态系统的增长2.云原生安全性3.边缘计算支持4.多集群管理5.GitOps的兴起总结🎉欢迎来到云计算技术应用专栏~探索Java中的静态变量与实例变量☆*o(≧▽≦)o*☆嗨~我是IT·陈寒🍹✨

CDN加速器有哪些?

一、前端使用CDN加速的优缺点1.CDN优点(1).提高网站性能:CDN(内容分发网络)可以将静态资源(如脚本、样式表、图片等)缓存在服务器节点上,并通过就近访问用户,从而提供更快的加载速度和更好的用户体验。(2).减轻源服务器负载:CDN可以将用户请求的静态资源分发到全球各个边缘节点,从而将访问流量分散到多个服务器上

WebGIS开发教程:天地图的地理坐标系和投影坐标系的区别

天地图是⼀个在线地图服务提供商,⽀持多种地图投影和坐标系。在使用天地图时,需要了解地理坐标系、投影坐标系之间的区别。地理坐标系地理坐标系是⼀个用于表示地球表面上点位置的坐标系统,通常使用经度和纬度来表示。地理坐标系是⼀个三维坐标系统,以地球质心为原点,以地球⾚道面和⼀个固定点(例如北极点)为基准面。投影坐标系投影坐标系

【ONE·Linux || 进程间通信】

总言进程间通信:简述进程间通信,介绍一些通信方式,管道通信(匿名、名命)、共享内存等。文章目录总言1、进程间通信简述2、管道2.1、简介2.2、匿名管道2.2.1、匿名管道的原理2.2.2、编码理解:用fork来共享管道2.2.2.1、准备工作一:创建一个管道pipe2.2.2.2、准备工作二:创建子进程,对父子进程构

热文推荐