Kafka 源码分析——Consumer

2023-09-20 11:24:43

前言

当生产者将消息发送到Broker时,这些消息将被存储在磁盘上。消费者是如何消费这些消息呢?

Consumer 消费流程

image.png
站在源码的核心角度,可以把Consumer分成以下几个核心部分:

  1. Consumer初始化
  2. 如何选举Consumer Leader
  3. Consumer Leader是如何指定分区
  4. Consumer如何拉取数据
  5. Consumer偏移量提交

Consumer初始化

从KafkaConsumer的构造方法出发,跟踪到核心实现方法

image.png

这个方法的前面代码部分都是一些配置

Consumer与Broker的核心通讯组件

image.png

ConsumerCoordinator:在Kafka消费中是组消费,协调器在具体进行消费之前要做很多的组织协调工作。

image.png

Fetcher:提取器,因为Kafka消费是拉数据的,所以这个Fetcher就是拉取数据的核心类

image.png

这个核心类中,有很多很多的参数设置,这些就跟消费的配置有关系

fetch.min.bytes

每次fetch请求时,server应该返回的最小字节数。如果没有足够的数据返回,请求会等待,直到足够的数据才会返回,缺省为1个字节。多消费者下,可以设大这个值,以降低broker的工作负载。

fetch.max.bytes

每次fetch请求时,server应该返回的最大字节数。这个参数决定了可以成功消费到的最大数据。

比如这个参数设置的是50M,那么consumer能成功消费50M以下的数据,但是最终会卡在消费大于10M的数据上无限重试。fetch.max.bytes一定要设置到大于等于最大单条数据的大小才行。

image.png

fetch.wait.max.ms

如果没有足够的数据能够满足fetch.min.bytes,则此项配置是指在应答fetch请求之前,server会阻塞的最大时间。缺省为500毫秒。和上面的fetch.min.bytes结合起来。

max.partition.fetch.bytes

指定了服务器从每个分区里返回给消费者的最大字节数,默认1MB。

假设一个主题有20个分区和5个消费者,那么每个消费者至少要有4MB的可用内存来接收记录,而且一旦有消费者崩溃,这个内存还需更大。注意,这个参数要比服务器的message.max.bytes更大,否则消费者可能无法读取消息。

max.poll.records

控制每次poll方法返回的最大记录数量。

image.png

如何选举Consumer Leader

消费者协调器与组协调器的通讯

image.png

image.png

image.png

image.png

image.png

向Broker发送请求

image.png
对Broker响应进行处理
image.png

消费者协调器发起入组请求

image.png

image.png

image.png

image.png

image.png

消费者分区策略

partition.assignment.strategy,分区分配给消费者的策略。默认为Range。允许自定义策略。

Range

把主题的连续分区分配给消费者。(如果分区数量无法被消费者整除、第一个消费者会分到更多分区)

RoundRobin

把主题的分区循环分配给消费者。

image.png

StickyAssignor:初始分区,和RoundRobin是一样。每一次分配变更相对上一次分配做最少的变动。

目标:

1、分区的分配尽量的均衡

2、每一次重分配的结果尽量与上一次分配结果保持一致

当这两个目标发生冲突时,优先保证第一个目标

比如有3个消费者(C0、C1、C2)、4个topic(T0、T1、T2、T34),每个topic有2个分区(P1、P2)

image.png

自定义策略

extends 类AbstractPartitionAssignor,然后在消费者端增加参数:

properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,类.class.getName());

分区策略源码分析

image.png

image.png

image.png

image.png

image.png

Consumer拉取数据

拉取数据核心Fetch类

image.png

image.png

image.png

提交偏移量

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png

当然,自动提交auto.commit.interval.ms

image.png

默认5s

image.png

maybeAutoCommitOffsetsAsync 最后这个就是poll的时候会自动提交,而且没到auto.commit.interval.ms间隔时间也不会提交,如果没到下次自动提交的时间也不会提交。

更多推荐

新能源汽车运行安全性能检验规程需要哪些CAN数据才符合标准

新能源汽车的前生命周期包括了整车制造、使用、转让市场及报废回收这几个主要阶段,在政策大力扶持下,国内新能源汽车的制造产业链完善,补贴培育市场取得丰硕的果实。目前来说,我国新能源汽车有着技术领先、设计先进、低成本优势,在全球范围内都具备很大的吸引力。纯电动汽车及电车技术的出现和发展,实现了国内新能源汽车弯道超车,跨越了百

Opencv之区域生长和分裂

区域生长1.基本原理区域生长法是较为基础的一种区域分割方法它的基本思想我说的通俗些,即是一开始有一个生长点(可以一个像素也可以是一个小区域),从这个生长点开始往外扩充,扩充的意思就是它会把跟自己有相似特征的像素或者区域拉到自己的队伍里,以此壮大自己的势力范围,每次扩大后的势力范围就是一个新的生长点,一直生长一直生长,直

Matlab进阶绘图第30期—冲击图

冲击图是一种特殊的堆叠柱状图。与堆叠柱状图相比,冲击图添加了相邻柱子中相同组分之间的连线,可以更加清晰地表达各组分占比情况。由于Matlab中未收录冲击图的绘制函数,因此需要大家自行解决。本文使用自制的Fbarstacked小工具进行冲击图的绘制,先来看一下成品效果:特别提示:本期内容『数据+代码』已上传资源群中,加群

cmake:target属性POSITION_INDEPENDENT_CODE和INTERFACE_POSITION_INDEPENDENT_CODE的区别

cmake定义的target有两个名字类似的属性:POSITION_INDEPENDENT_CODE和INTERFACE_POSITION_INDEPENDENT_CODE,本文说明它们的含义和区别-fPIC介绍POSITION_INDEPENDENT_CODE和INTERFACE_POSITION_INDEPENDE

matlab读写json文件

Background通常,在matlab中使用mat文件进行数据存储。MAT文件是MATLAB中用来存储数据的二进制文件格式。MAT文件可以包含各种数据类型,包括数字、矩阵、向量、结构体、字符和函数等。但是,当和其他语言有交互时,mat文件会不太方便。而json格式在许多编程语言中,包括MATLAB,都有提供解析和创建

【LQR】离散代数黎卡提方程的求解,附Matlab/python代码(笔记)

LQR的核心是设计QRN,并求解对应的黎卡提方程对于连续状态空间方程系统,先求连续LQR后离散和先离散后求离散LQR方程的结果是不一样的1.离散代数黎卡提方程注:LQR算法中含N项离散系统:在matlab里有现成的函数dlqr(),但为了搞清楚其内核,编写matlab代码展示其求解过程matlab帮助文件里的dlqr(

淘宝拍立淘插件转链和商业化图片生成接口介绍,图片搜索商品接口,按图搜索接口,图片识别商品接口介绍

淘宝拍立淘是淘宝网推出的一种搜索方式,通过拍立淘,用户可以输入文字描述或上传图片来搜索商品。拍立淘通过与淘宝网进行数据接入和授权,使用淘宝提供的API获取商品信息和操作权限,拍立淘使用图像识别技术,通过深度学习算法和计算机视觉技术,对用户拍摄的商品照片进行识别,拍立淘插件转链API用于为淘宝客提供开启拍立淘插件(根据图

基于改进莱维飞行和混沌映射的粒子群优化BP神经网络预测股票价格研究(Matlab代码实现)

💥💥💞💞欢迎来到本博客❤️❤️💥💥🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。⛳️座右铭:行百里者,半于九十。📋📋📋本文目录如下:🎁🎁🎁目录💥1概述📚2运行结果🎉3参考文献🌈4Matlab代码实现💥1概述基于改进莱维飞行和混沌映射的粒子群优化BP神经网络

SunTorque亮相GAF2023数字化智能装配工程与装备技术大会

智能扭矩系统-智能拧紧系统-智能扭矩控制-SunTorqueGAF2023数字化智能装配工程与装备技术大会在中国上海汽车会展中心盛大开幕,青创智通与装配领域、智能制造、数字化应用等相关先进智造技术的知名企业一齐亮相。本次展会,我们带来了扭矩相关解决方案,包含智能扭矩系统软件、工具存储设备、扭矩校验设备、智能手持终端、扭

neo4j下载安装配置步骤

目录一、介绍简介Neo4j和JDK版本对应二、下载官网下载直接获取三、解压缩安装四、配置环境变量五、启动测试一、介绍简介Neo4j是一款高性能的图数据库,专门用于存储和处理图形数据。它采用节点、关系和属性的图形结构,非常适用于表示和查询复杂的实体关系。Neo4j具有高性能、事务支持、可扩展性和直观的Cypher查询语言

喜报 | 亮相2023数博会,摘得首届数智金融创新大赛优秀奖

河北正定,千年古城,这里不仅有一幕幕刀光剑影,鼓角争鸣的故事,还有驰名中外的人“一寺四塔”,有宜人的气候,也有汇聚高科技的天下英雄会。图源于网络2023年9月6日,河北正定,中国国际数字经济博览会(以下简称数博会)正式开幕,坚定“工业互联网赋能千行百业”的科技信仰,奔向“数字经济引领高质量发展”的未来世界。图源于网络据

热文推荐