Kafka消费者组重平衡(二)

2023-09-15 07:25:12

概要

上一篇Kafka消费者组重平衡主要介绍了重平衡相关的概念,本篇主要梳理重平衡发生的流程。

为了更好地观察,数据准备如下:
kafka版本:kafka_2.13-3.2.1
控制台创建topic (2个分区1个副本):
bin/kafka-topics.sh --create --bootstrap-server node1:9092 --replication-factor 1 --partitions 2 --topic test-rebalance

本地启动两个SpringBoot项目实例,代码如下

@KafkaListener(topics = "test-rebalance", groupId = "test-group")
public void rebalanceConsumer(ConsumerRecord<String, String> recordInfo) {
    int partition = recordInfo.partition();
    System.out.println("partition:" + partition + " value:" + recordInfo.value());
}

重平衡通知机制

Kafka Java 消费者需要定期地发送心跳请求(Heartbeat Request)到 Broker 端的协调者,以表明它还存活着。在 Kafka 0.10.1.0 版本之前,发送心跳请求是在消费者主线程完成的,也就是调用 KafkaConsumer.poll 方法的那个线程。
这样的设计存在弊端,一旦消息处理消耗了过长的时间,心跳请求将无法及时发到协调者那里,导致协调者“错误地”认为该消费者已“死”。自 0.10.1.0 版本开始,Kafka引入了一个单独的心跳线程来专门执行心跳请求发送,避免了这个问题。

重平衡的通知机制正是通过心跳线程来完成的。当协调者决定开启新一轮重平衡后,它会将“REBALANCE_IN_PROGRESS”封装进心跳请求的响应中,发还给消费者实例。当消费者实例发现心跳响应中包含了“REBALANCE_IN_PROGRESS”,就能立马知道重平衡又开始了,这就是重平衡的通知机制。

消费组组状态

Kafka 为消费者组定义了 5 种状态,它们分别是:Empty、Dead、PreparingRebalance、CompletingRebalance 和 Stable。那么,这 5 种状态的含义如下:

状态含义
Empty组内没有任何成员
Dead组内没有任何成员,但组的元数据已经在协调者端删除
PreparingRebalance消费组组准备开启重平衡,此时所有成员都要重新请求加入消费者组
CompletingRebalance消费者组下所有成员已经加入,各个成员正在等待分配方案
Stable消费者组的稳定状态,该状态表明重平衡已经完成,组内各成员都能够正常消费数据了

以下是各个状态的流转:

在这里插入图片描述

一个消费者组最开始是 Empty 状态,当重平衡过程开启后,它会被置于PreparingRebalance 状态等待成员加入,之后变更到CompletingRebalance 状态等待分配方案,最后流转到 Stable 状态完成重平衡。当有新成员加入或已有成员退出时,消费者组的状态从 Stable 直接跳到PreparingRebalance 状态,此时,所有现存成员就必须重新申请加入组。

创建一个topic并逐步启动两个消费者实例:
服务端日志:

INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group test-group in Empty state. Created a new member id consumer-1-a6c11c9f-ec26-4e66-adeb-832f699f1247 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
INFO [GroupCoordinator 0]: Preparing to rebalance group test-group in state PreparingRebalance with old generation 3 (__consumer_offsets-12) (reason: Adding new member consumer-1-a6c11c9f-ec26-4e66-adeb-832f699f1247 with group instance id None; client reason: not provided) (kafka.coordinator.group.GroupCoordinator)
INFO [GroupCoordinator 0]: Stabilized group test-group generation 4 (__consumer_offsets-12) with 1 members (kafka.coordinator.group.GroupCoordinator)
INFO [GroupCoordinator 0]: Assignment received from leader consumer-1-a6c11c9f-ec26-4e66-adeb-832f699f1247 for group test-group for generation 4. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)

客户端一启动日志

o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] (Re-)joining group
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] (Re-)joining group
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Successfully joined group with generation 4
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Setting newly assigned partitions: test-rebalance-1, test-rebalance-0
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Setting offset for partition test-rebalance-1 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=node1:9092 (id: 0 rack: null), epoch=0}}
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Setting offset for partition test-rebalance-0 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=node1:9092 (id: 0 rack: null), epoch=0}}
o.s.k.l.KafkaMessageListenerContainer    : test-group: partitions assigned: [test-rebalance-1, test-rebalance-0]

有新成员加入后broker日志

INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group test-group in Stable state. Created a new member id consumer-1-97eba1d4-7f5c-4d78-b979-ba6ab9c82395 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
INFO [GroupCoordinator 0]: Preparing to rebalance group test-group in state PreparingRebalance with old generation 4 (__consumer_offsets-12) (reason: Adding new member consumer-1-97eba1d4-7f5c-4d78-b979-ba6ab9c82395 with group instance id None; client reason: not provided) (kafka.coordinator.group.GroupCoordinator)
INFO [GroupCoordinator 0]: Stabilized group test-group generation 5 (__consumer_offsets-12) with 2 members (kafka.coordinator.group.GroupCoordinator)
INFO [GroupCoordinator 0]: Assignment received from leader consumer-1-a6c11c9f-ec26-4e66-adeb-832f699f1247 for group test-group for generation 5. The group has 2 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)

接下来启动客户端二

客户端一日志

 o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] (Re-)joining group
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] (Re-)joining group
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Successfully joined group with generation 5
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Setting newly assigned partitions: test-rebalance-0
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Setting offset for partition test-rebalance-0 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=node1:9092 (id: 0 rack: null), epoch=0}}
o.s.k.l.KafkaMessageListenerContainer    : test-group: partitions assigned: [test-rebalance-0]

客户端二日志


o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Attempt to heartbeat failed since group is rebalancing
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Revoking previously assigned partitions [test-rebalance-1, test-rebalance-0]
o.s.k.l.KafkaMessageListenerContainer    : test-group: partitions revoked: [test-rebalance-1, test-rebalance-0]
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] (Re-)joining group
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Successfully joined group with generation 5
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Setting newly assigned partitions: test-rebalance-1
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Setting offset for partition test-rebalance-1 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, o.s.k.l.KafkaMessageListenerContainer    : test-group: partitions assigned: [test-rebalance-1]

当所有成员都退出组后,消费者组状态变更为 Empty。Kafka 定期自动删除过期位移的条件就是,组要处于 Empty 状态。因此,如果你的消费者组停掉了很长时间(超过 7 天),那么 Kafka 很可能就把该组的位移数据删除了。

消费端重平衡流程

重平衡的完整流程需要消费者端和协调者组件(什么是协调者)共同参与才能完成。下面先梳理消费者端的重平衡流程。主要分为3各阶段。

第一阶段:确定组协调器
关于如何确定组协调器,参考Kafka消费者重平衡(一)

第二阶段:JoinGroup

在此阶段的消费者会向Group Coordinator 发送 JoinGroupRequest 请求,并处理响应。然后GroupCoordinator 从一个consumer group中选择第一个(通常情况下)加入group的consumer作为leader(消费组协调器),把consumer group情况发送给这个leader,接着这个leader会负责制定分区方案。

在这一步中,领导者向协调者发送 SyncGroup 请求,将刚刚做出的分配方案发给协调者。值得注意的是,其他成员也会向协调者发送 SyncGroup 请求,只不过请求体中并没有实际的内容。这一步的主要目的是让协调者接收分配方案,然后统一以 SyncGroup 响应的方式分发给所有成员,这样组内所有成员就都知道自己该消费哪些分区了。

在这里插入图片描述

第三阶段:SyncGroup

待领导者制定好分配方案后,重平衡流程进入到 SyncGroup 请求阶段

在这里插入图片描述
SyncGroup 请求的主要目的,就是让协调者把领导者制定的分配方案下发给各个组内成员。当所有成员都成功接收到分配方案后,消费者组进入到 Stable 状态,即开始正常的消费工作。

Broker端重平衡流程

下面只要从几个常见的场景梳理Broker端重平衡的流程。

新成员加入

当协调者收到新的 JoinGroup 请求后,它会通过心跳请求响应的方式通知组内现有的所有成员,强制它们开启新一轮的重平衡。
具体流程如下:

在这里插入图片描述

组成员主动离组

消费者实例所在线程或进程调用 close() 方法时,就会主动通知协调者要退出组。以下时具体的流程

在这里插入图片描述

组成员崩溃离组

崩溃离组是指消费者实例出现严重故障,突然宕机导致的离组。它和主动离组是有区别的,因为后者是主动发起的离组,协调者能马上感知并处理。但崩溃离组是被动的,协调者通常需要等待一段时间才能感知到,这段时间一般是由消费者端参数 session.timeout.ms 控制。
在这里插入图片描述

更多推荐

​CVPR 2023 | STAR Loss:减少人脸关键点标注歧义,实现人脸关键点SOTA精度

论文链接:https://arxiv.org/pdf/2306.02763.pdf代码链接:https://github.com/ZhenglinZhou/STAR要解决的问题:人脸关键点检测标注中存在语义歧义问题。语义歧义是指不同的标注者对同一个面部特征点的位置有不同的理解,导致标注结果不一致,影响模型的收敛和准确性

设计模式-代理模式

“阁下有什么问题可以和我的代理律师谈即可”,为什么会有律师这个职业呢?随着法律法规的逐步完善,日益复杂,导致大部分的普通民众掌握的法律知识明显不足,进而无法在合适的时间点进行维权、规避风险。可见,律师的作用就是利用自身的专业知识帮助案件当事人处理其无法处理的事情。不仅只有律师,生活中处处可见这种代理模式的存在,比如婚庆

麦肯锡:中国生成式AI市场现状和未来发展趋势

本文来自《麦肯锡中国金融业CEO季刊》,版权归麦肯锡所有。该季刊主要围绕生成式AI(以下简称“GenAI”)主题,通过4大章节共8篇文章,全面深入分析了GenAI对各主要行业的影响、价值链投资机会、中国GenAI市场现状和未来趋势以及企业如何布局GenAI,从而真正挖掘其价值。随着ChatGPT的火爆出圈,GenAI成

2023年8月京东空调行业品牌销售排行榜(京东数据报告)

鲸参谋监测的京东平台8月份空调市场销售数据已出炉!鲸参谋数据显示,今年8月份,京东平台上空调的销量将近146万,环比降低约44%,同比降低约37%;销售额为41亿+,环比下降约45%,同比下降约40%。可以看到,8月份空调市场整体下滑。*数据源于鲸参谋-行业趋势分析(来自公开渠道获取,数据仅供参考)空调市场中,格力品牌

Python编程指南:利用HTTP和HTTPS适配器实现智能路由

目录HTTP和HTTPS适配器什么是智能路由利用HTTP和HTTPS适配器实现智能路由总结在Python编程中,利用HTTP和HTTPS适配器实现智能路由是一项非常实用的技能。智能路由可以根据不同的条件选择不同的路由,从而提高网络性能和用户体验。在本文中,我们将介绍如何使用Python编程语言和HTTP/HTTPS适配

非对称加密、解密原理及openssl中的RSA示例代码

一、【原理简介】非对称加密非对称加密,也被称为公钥加密,其中使用一对相关的密钥:一个公钥和一个私钥。公钥用于加密数据,私钥用于解密数据。公钥可以公开分享,而私钥必须保密。密钥生成:当一个用户或设备希望使用非对称加密时,要生成一对密钥:一个公钥和一个私钥。这两个密钥是数学上相关的,但从公钥中计算出私钥在计算上是不可行的。

【操作系统笔记】内存寻址

物理寻址主存(内存)计算机主存也可以称为物理内存,内存可以看成由若干个连续字节大小的单元组成的数组每个字节都有一个唯一的物理地址(PhysicalAddress)CPU访问内存前,先拿到内存地址,然后,通过内存地址访问内存中数据指令总线的分工数据总线:负责传输实际数据的地址总线:负责传输数据地址的,用来确定到底把数据传

FPGA千兆网 UDP 网络视频传输,基于88E1518 PHY实现,提供工程和QT上位机源码加技术支持

目录1、前言版本更新说明免责声明2、我这里已有的以太网方案3、设计思路框架视频源选择OV5640摄像头配置及采集动态彩条UDP协议栈UDP视频数据组包UDP协议栈数据发送UDP协议栈数据缓冲IP地址、端口号的修改TriModeEthernetMAC介绍以及移植注意事项88E1518PHYQT上位机和源码4、vivado

AOSP源码中Android.mk文件中的反斜杠符号(\)的作用和使用

简介在AOSP(AndroidOpenSourceProject)源码中的Android.mk文件中,反斜杠符号(\)的主要作用是将一行代码拆分成多行,以提高可读性并帮助组织较长的代码块。这对于定义复杂的构建规则和变量时特别有用。以下是\符号在Android.mk文件中的作用以及如何使用它的示例:多行命令:Androi

Nacos使用教程(三)——nacos注册中心(2)

文章目录什么是注册中心注册中心的作用1.服务注册2.服务发现3.负载均衡4.故障恢复注册中心的解决的问题1.服务管理问题2.服务调用问题3.负载均衡问题4.故障恢复问题服务的发现与注册的实现模式服务注册表Nacos注册中心的部署与使用什么是Nacos注册中心Nacos注册中心的部署下载Nacos安装包解压安装包修改配置

【图像处理】怎么让图片背景变透明?怎么改变图片中线条的颜色?在线PS软件

文章目录前言一、打开图二、背景透明处理三、改变线条颜色总结前言我们处理图片将会使用到PS软件,如果安装了PS那直接打开使用。如果没有安装PS,可使用在线PS软件:https://www.uupoop.com/#/一、打开图第一步,从这个入口打开图片。第二步:打开图片,我们打开一个MySQL数据库的图标。二、背景透明处理

热文推荐