Kafka 源码分析——Producer

2023-09-20 10:52:16

前言

在 Kafka 中, 把产生消息的一方称为 Producer 即 生产者,它是 Kafka 的核心组件之一, 也是消息的来源所在。它的主要功能是将客户端的请求打包封装发送到 kafka 集群的某个 Topic 的某个分区上。那么这些生产者产生的消息是怎么传到 Kafka 服务端的呢?

Producer 整体流程

Kafka一条消息发送和消费的流程

image.png

站在源码的核心角度,可以把Producer分成以下几个核心部分:

  1. Producer初始化
  2. Producer发送流程
  3. Producer缓冲区
  4. Producer参数与调优

Producer 初始化

image.png

因为源码中有非常多的一些额外处理,所以解读源码没必要每行都读,只需要根据梳理的主流程找到核心代码进行解读就可以。

设置分区器(partitioner),分区器是支持自定义的

image.png

设置重试时间

重试时间(retryBackoffMs)默认100ms。如果发送消息到broker时抛出异常,且是允许重试的异常,那么就会最大重试retries参数指定的次数,同时retryBackoffMs是重试的间隔。

image.png

设置序列化器

image.png

设置拦截器(interceptors)

image.png

拦截器一般用得不多,可以为消息统一添加字段或统计发送失败成功次数,这些逻辑会拖慢producer的消息发送效率,不推荐生产中使用。

想要实现拦截器,我们需要先实现ProducerInterceptor接口即可,然后在生产者中设置进去即可。

image.png

image.png

image.png
上图的一些设置

  1. 设置最大的消息为多大(maxRequestSize), 默认最大1M, 生产环境可以提高到10M

  2. 设置缓存大小(totalMemorySize) 默认是32M

  3. 设置压缩格式(compressionType)

  4. 初始化RecordAccumulator也就是缓冲区指定为32M

设置缓冲区

image.png

设置消息累加器

因为生产者是通过缓冲的方式发送,所以需要一个消息累加器配合才能完成消息的发送。

image.png

初始化集群元数据(metadata)

image.png

创建Sender线程

image.png

这里还初始化了一个重要的管理网路的组件 NetworkClient

image.png

KafkaThread将Sender设置为守护线程并启动

image.png

Producer 发送流程

执行拦截器逻辑

执行拦截器逻辑,预处理消息,封装 Producer Record

image.png

获取集群元数据

从 Kafka Broker 集群获取集群元数据metadata

image.png

序列化

调用Serializer.serialize()方法进行消息的key/value序列化

image.png

选择分区

调用partition()选择合适的分区策略,给消息体 Producer Record 分配要发送的 topic 分区号

image.png

消息累加进缓存

将消息缓存到RecordAccumulator 收集器中, 最后判断是否要发送。

image.png

消息发送

真正的消息发送是Sender线程来做,并且还要结合缓冲区来处理。这里我们只需要知道发送的条件:缓冲区数据大小达到 batch.size 或者 linger.ms 达到上限。

Producer缓冲区

Kafka生产者的缓冲区,也就是内存池,可以将其类比为连接池(DB, Redis),主要是避免不必要的创建连接的开销,。这样内存池可以对 RecordBatch 做到反复利用,防止引起Full GC问题。

核心就是这段代码:

image.png

image.png

Kafka 内存设计有两部分,可用的内存(未分配的内存,初始的时候是 32M)和已经被分配了的内存,每个小 Batch 是 16K,然后这一个个的 Batch 就可以被反复利用,不需要每次都申请内存, 两部分加起来是 32M。

申请内存的过程

发送流程中会把消息放入 accumulator中,即调用 accumulator.append() 追加, 然后把消息封装成一个个Batch 进行发送,然后去申请内存(free.allocate())。

image.png

  1. 如果申请的内存大小超过了整个缓存池的大小,则抛异常出来。
  2. 如果申请的大小是每个 recordBatch 的大小(16K),并且已分配内存不为空,则直接取出来一个返回。
  3. 如果整个内存池大小比要申请的内存大小大 (this.availableMemory + freeListSize >= size),则直接从可用内存申请一块内存。

Producer 参数调优

在 Kafka 实际使用中,Producer 端既要保证吞吐量,又要确保无消息丢失,一些核心参数的配置就显得至关重要。

acks

参数说明:对于 Kafka Producer 来说是一个非常重要的参数,它表示指定分区中成功写入消息的副本数量,是 Kafka 生产端消息的持久性的保证。

max.request.size

参数说明:这个参数对于 Kafka Producer 也比较重要, 表示生产端能够发送的最大消息大小,默认值为1048576(1M)

调优建议:这个配置对于生产环境来说有点小, 为了避免因消息过大导致发送失败,生产环境建议适当调大,比如可以调到10485760(10M)

retries

参数说明:表示生产端消息发送失败时的重试次数,默认值为0,即不重试。 这个参数一般是为了解决因系统瞬时故障导致的消息发送失败,比如网络抖动、Leader 选举及重选举,其中瞬时的 Leader 重选举是比较常见的。因此这个参数的设置对于 Kafka Producer 就显得非常重要

调优建议:这里建议设置为一个大于0的值,比如3次。

retry.backoff.ms

参数说明:**设定两次重试之间的时间间隔,避免无效的频繁重试,默认值为100, **主要跟 retries 配合使用, 在配置 retries 和 retry.backoff.ms 之前,最好先估算一下可能的异常恢复时间,需要设定总的重试时间要大于异常恢复时间,避免生产者过早的放弃重试。

connections.max.idele.ms

参数说明:主要用来判断多久之后关闭空闲的链接,默认值540000(ms)即9分钟。

compression.type

参数说明: 该参数表示生产端是否要对消息进行压缩,默认值为不压缩(none)。 压缩可以显著减少网络IO传输、磁盘IO以及磁盘空间,从而提升整体吞吐量,但也是以牺牲CPU开销为代价的。

调优建议:出于提升吞吐量的考虑,建议在生产端对消息进行压缩。对于Kafka来说,综合考虑吞吐量与压缩比,建议选择lz4压缩。如果追求最高的压缩比则推荐zstd压缩。

buffer.memory

参数说明: 该参数表示生产端消息缓冲池或缓冲区的大小,默认值为即33554432(32M) 。这个参数基本可以认为是 Producer 程序所使用的内存大小。

调优建议:通常我们应尽量保证生产端整体吞吐量,建议适当调大该参数,也意味着生产客户端会占用更多的内存。

batch.size

参数说明: 该参数表示发送到缓冲区中的消息会被封装成一个一个的Batch,分批次的发送到 Broker 端,默认值为16KB。 因此减小 batch 大小有利于降低消息延时,增加 batch 大小有利于提升吞吐量。

调优建议:通常合理调大该参数值,能够显著提升生产端吞吐量,比如可以调整到32KB,调大也意味着消息会有相对较大的延时。

linger.ms

参数说明: 该参数表示用来控制 Batch 最大的空闲时间,超过该时间的 Batch 也会自动被发送到 Broker 端。 实际情况中, 这是吞吐量与延时之间的权衡。默认值为0,表示消息需要被立即发送,无需关系 batch 是否被填满。

调优建议:通常为了减少请求次数、提升整体吞吐量,建议设置一个大于0的值,比如设置为100,此时会在负载低的情况下带来100ms的延时。

更多推荐

Linux下git安装及使用

Linux下Git使用1.git的安装sudoaptinstallgit安装完,使用git--version查看git版本2.配置gitgitconfig--globaluser.name"YourName“##配置用户gitconfig--globaluser.emailemail@example.com##配置邮箱

vuex实现简易购物车加购效果

目录一、加购效果动图二、前提条件三、开始操作四、解决vuex刷新数据丢失问题五、最终效果一、加购效果动图二、前提条件创建了vue项目,安装了vuex三、开始操作目录结构如下:main.js文件中引入store:importVuefrom'vue'importAppfrom'./App.vue'importstorefr

服务器的维护是如何操作

服务器的维护是如何操作服务器可以说是不可或缺的资源,因为现在网络技术发达,我们的生活也都离不开网络的存在,我们想要获取的业务、资料等大多是通过网络进行,所以想要顺应潮流并获得发展,肯定需要服务器来将企业的相关信息与产品等发布到网络中,供客户选择。那应该如何维护好服务器呢?硬件维护1、增加内存和硬盘容量的工作。增加内存是

linux 杂乱汇总

SO_LINGER作用设置函数close()关闭TCP连接时的行为。缺省close()的行为是,如果有数据残留在socket发送缓冲区中则系统将继续发送这些数据给对方,等待被确认,然后返回。利用此选项,可以将此缺省行为设置为以下两种a.立即关闭该连接,通过发送RST分组(而不是用正常的FIN|ACK|FIN|ACK四个

一文搞懂并查集

一文搞懂并查集1背景意义2原理讲解3路径压缩4代码模板1背景意义首先要知道并查集可以解决什么问题呢?并查集常用来解决连通性问题。大白话就是当我们需要判断两个元素是否在同一个集合里的时候,我们就要想到用并查集。并查集主要有两个功能:将两个元素添加到一个集合中;判断两个元素在不在同一个集合。接下来围绕并查集的这两个功能来展

JWT 安全及案例实战

文章目录一、JWT(jsonwebtoken)安全1.Cookie(放在浏览器)2.Session(放在服务器)3.Token4.JWT(jsonwebtoken)4.1头部4.1.1alg4.1.2typ4.2payload4.3签名4.4通信流程5.防御措施二、漏洞实例(webgoat)1.第四关2.第五关3.第七

uniappAndroid平台签名证书(.keystore)生成

一、安装JRE环境https://www.oracle.com/java/technologies/downloads/#java8记住下载默认安装地址。ps:我都默认安装地址C:\ProgramFiles\Java\jdk-1.8二、安装成功后配置环境变量系统变量配置AVA_HOME放到环境变量去%JAVA_HOME

nginx部署多个项目

前言实现在一台服务器上使用nginx部署多个项目的方法查看并修改nginx安装的默认配置文件在Linux操作系统中,Nginx在编译安装时默认的配置文件路径是/usr/local/nginx/conf/nginx.conf。如果是通过发行版的包管理器安装,则默认的配置文件路径可能会相应改变,例如在Ubuntu下为/et

Vue中使用VueAMap

npm安装npminstallvue-amap--save注册:高德地图//在main.js中注册:高德地图importVueAMapfrom"vue-amap";Vue.use(VueAMap);VueAMap.initAMapApiLoader({key:"你的高德key",plugin:["AMap.AutoCo

【K8S系列】深入解析k8s网络插件—Flannel

序言做一件事并不难,难的是在于坚持。坚持一下也不难,难的是坚持到底。文章标记颜色说明:黄色:重要标题红色:用来标记结论绿色:用来标记论点蓝色:用来标记论点Kubernetes(k8s)是一个容器编排平台,允许在容器中运行应用程序和服务。今天学习一下k8s网络插件相关知识希望这篇文章能让你不仅有一定的收获,而且可以愉快的

C++ 类(1)

你知道吗,C++类是编程世界中的一种强大工具,它可以帮助我们更好地组织和管理代码。接下来,我将为你呈现一篇近万字的C++类的教程,希望能帮助你熟悉这个概念。首先,让我们从C++类的定义开始。类是一个模板,它描述了一种具有相同属性和行为的数据类型。听起来很复杂吗?别担心,我来给你举个例子。假设我们要创建一个名为"Car"

热文推荐