RocketMQ 源码分析——Producer

2023-09-19 17:00:30

消息发送代码实现

下面是一个生产者发送消息的demo(同步发送)

image.png

主要做了几件事:

  • 初始化一个生产者(DefaultMQProducer)对象
  • 设置 NameServer 的地址
  • 启动生产者
  • 发送消息

消息发送者启动流程

image.png

DefaultMQProducerImpl类start()

image.png

检查配置

DefaultMQProducerImpl

image.png

获得MQ客户端实例

整个JVM中只存在一个MQClientManager实例,维护一个MQClientInstance缓存表。DefaultMQProducerImpl类start()

image.png

一个clientId只会创建一个MQClientInstance

image.png

clientId生成规则:IP@instanceName@unitName

image.png

RocketMQ中消息发送者、消息消费者都属于”客户端“。每一个客户端就是一个MQClientInstance,每一个ClientConfig对应一个实例。

不同的生产者、消费端如果引用同一个客户端配置(ClientConfig),则它们共享一个MQClientInstance实例。所以我们在定义的的时候要注意这种问题(生产者和消费者如果分组名相同容易导致这个问题)

image.png

启动实例

MQClientInstance类start()

image.png

定时任务

MQClientInstance类startScheduledTask()

image.png

Producer 消息发送流程

我们从一个生产者案例的代码进入代码可知:DefaultMQProducerImpl中的sendDefaultImpl()是生产者消息发送的核心方法

image.png

image.png

从核心方法可知消息发送就是4个步骤:验证消息、查找路由、选择队列、消息发送。

image.png

image.png

选择队列

默认选择队列策略

采用了最简单的轮询算法,这种算法有个很好的特性就是,保证每一个Queue队列的消息投递数量尽可能均匀。这种算法只要消息投递过程中没有发生重试的话,基本上可以保证每一个Queue队列的消息投递数量尽可能均匀。当然如果投递中发生问题,比如第一次投递就失败,那么很大的可能性是集群状态下的一台Broker挂了,所以在重试发送中进行规避。这样设置也是比较合理的。

故障延迟机制策略*

采用此策略后,每次向Broker成功或者异常的发送,RocketMQ都会计算出该Borker的可用时间(发送结束时间-发送开始时间,失败的按照30S计算),并且保存,方便下次发送时做筛选。

image.png

除了记录Broker的发送消息时长之外,还要计算一个Broker的不可用时长。这里采用一个经验值:

如果发送时长在550ms之内,不可用时长为0。

达到550ms,不可用时长为30S

达到1000ms,不可用时长为60S

达到2000ms,不可用时长为120S

达到3000ms,不可用时长为180S

达到15000ms,不可用时长为600S

image.png

image.png

有了以上的Broker规避信息后发送消息就非常简单了。

在开启故障延迟机制策略步骤如下:

  1. 根据消息队列表时做轮训
  2. 选好一个队列
  3. 判断该队列所在Broker是否可用
  4. 如果是可用则返回该队列,队列选择逻辑结束
  5. 如果不可用,则接着步骤2继续
  6. 如果都不可用,则随机选一个

代码如下:

image.png

两种策略的选择

从这种策略上可以很明显看到,默认队列选择是轮训策略,而故障延迟选择队列则是优先考虑消息的发送时长短的队列。那么如何选择呢?

首先RocketMQ默认的发送失败有重试策略,默认是2,也就是如果向不同的Broker发送三次都失败了那么这条消息的发送就失败了,作为RocketMQ肯定是尽力要确保消息发送成功。所以给出以下建议。

如果是网络比较好的环境,推荐默认策略,毕竟网络问题导致的发送失败几率比较小。

如果是网络不太好的环境,推荐故障延迟机制,消息队列选择时,会在一段时间内过滤掉RocketMQ认为不可用的broker,以此来避免不断向宕机的broker发送消息,从而实现消息发送高可用。

当然以上成立的条件是一个Topic创建在2个Broker以上的的基础上。

技术亮点:ThreadLocal

image.png

image.png

image.png

更多推荐

小程序开发一个多少钱啊

在今天的数字化时代,小程序已经成为一种非常流行的应用程序形式。由于它们的便捷性、易用性和多功能性,小程序吸引了越来越多的用户和企业。但是,很多人在考虑开发一个小程序时,都会遇到同一个问题:开发一个小程序需要多少钱?小程序的开发费用因人而异,取决于多种因素。下面,我们将为您详细列出影响小程序开发费用的主要因素。1、功能需

Bean的生命周期

SpringBean的生命周期是从Bean实例化之后,即通过反射创建出对象之后,到Bean成为一个完整对象,最终存储到单例池中,这个过程被称为SpringBean的生命周期。小枫叶一,实例化1.1Bean工厂后处理器–BeanFactoryPostProcessorBeanFactoryPostProcessor是一个

怎样判断一个数是否为偶数

要求代码行数尽可能少;packagemainimport("fmt""strconv")funcmain(){fmt.Printf("传入的值是否为奇数:%t\n",Judge_is_even(7))}funcJudge_is_even(numint)bool{//fmt.Println(num%2)rs,_:=str

modbus的协议

在介绍Modbus协议之前,我们要先了解下RS485协议,因为Modbus协议是在RS485这个硬件层协议上搭建的软件层协议。RS485特性半双工。用缆线两端的电压差值来表示传递信号。RS485的特点包括1.S485的电气特性:逻辑“1”以两线间的电压差为+(2~6)V表示;逻辑“0”以两线间的电压差为-(2~6)V表

ChatGPT Prompting开发实战(八)

一.什么是归纳总结式的prompt开发有时候需要对一段文本进行归纳总结,那么可以采取以下的方案:-按照给定单词、句子或者字符的数量限制来让模型裁剪文本,使内容更精炼-基于聚焦的主题进行总结-只根据需求抽取相关的文本信息,不需要整段文本内容除了上面列出的几种方式之外,还可能有额外的一些需求,譬如给出多段文本,要求模型同时

JUnit测试进阶(Private测试)

Private测试前言一、间接调用二、Java反射机制调用前言在单元测试中,由于私有方法(PrivateMethod)无法直接被调用,因此对私有方法进行测试成为一项难题。一个可行的方法是:在测试时将私有方法改变为公有方法(PublicMethod),在测试完成后再将其修改为私有方法。然而,该方法操作过程比较复杂,不利于

Redis面试题(五)

文章目录前言一、使用过Redis做异步队列么,你是怎么用的?有什么缺点?二、什么是缓存穿透?如何避免?什么是缓存雪崩?何如避免?总结前言使用过Redis做异步队列么,你是怎么用的?有什么缺点?什么是缓存穿透?如何避免?什么是缓存雪崩?何如避免?`一、使用过Redis做异步队列么,你是怎么用的?有什么缺点?一般使用lis

Windows开机密码破解

目前可行的方法(目前只能通过进PE的方式进行密码的修改)通过本文最后“本文参考网页”下载Rufus写盘工具和Hiren’sBootCDPE镜像启动写盘工具,选择U盘和镜像U盘插入电脑时确保电脑为关机状态启动电脑,快速敲击Delete键,进入Bios界面(不同的电脑是通过不同的按键进入BIOS,可以利用搜索引擎查看你的电

腾讯会议核心存储治理:Redis分库和异地多活

👉导读会控为整个会议最为核心的业务,由于海量请求的高性能要求,后台存储全部为Redis。在业务飞速发展期,各模块边界不够清晰,大家对存储的使用处于失控状态,随着PCU的不断上涨,逐步暴露出存储和架构的诸多问题,同时也对系统容灾能力有了更高的要求。会控业务历史包袱重,存储改造伤筋动骨,要做到平滑迁移需要考虑的细节较多。

常用注解梳理

@RestController注解:将一个类标识为一个RESTful风格的控制器,用于处理HTTP请求和响应。@RequestMapping注解:用于将一个HTTP请求映射到控制器的处理方法上,可以用于类级别和方法级别。@PostMapping注解:用于将HTTPPOST请求映射到控制器的处理方法上。@GetMappi

浅谈xss

XSS简介XSS,全称CrossSiteScripting,即跨站脚本攻击,是最普遍的Web应用安全漏洞。这类漏洞能够使得攻击者嵌入恶意脚本代码到正常用户会访问到的页面中,当正常用户访问该页面时,则可导致嵌入的恶意脚本代码的执行,从而达到恶意攻击用户的目的。需要强调的是,XSS不仅仅限于JavaScript,还包括fl

热文推荐