kafka 3.5 生产者请求中的acks,在服务端如何处理源码

2023-09-15 01:30:39

一、生产者客户端配置参数acks说明

首先,客户端需要配置一个acks参数,默认值是1,下面是acks各个值的说明
acks=-1,太慢,acks=0,有风险,acks=1,则是推荐,所以也是默认值的原因

1、acks=1

这意味着至少要等待leader已经成功将数据写入本地log,但是并没有等待所有follower是否成功写入。如果follower没有成功备份数据,而此时leader又无法提供服务,则消息会丢失。

2、acks=0

表示producer不需要等待任何确认收到的信息,副本将立即加到socket buffer并认为已经发送。没有任何保障可以保证此种情况下server已经成功接收数据,同时重试配置不会发生作用(因为客户端不知道是否失败)回馈的offset会总是设置为-1

3、acks=-1

这意味着leader需要等待ISR中所有备份都成功写入日志。只要任何一个备份存活,数据都不会丢失。min.insync.replicas指定必须确认写入才能被认为成功的副本的最小数量。

二、请求在写入Leader的数据管道之前,则会验证Leader的ISR副本数量和配置中的最小ISR数量

def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int,
                            requestLocal: RequestLocal): LogAppendInfo = {
    //函数首先获取leaderIsrUpdateLock的读锁,以确保对Leader和ISR(In-Sync Replica)的更新操作是同步的。
    val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
      //然后检查当前是否有Leader日志,
      leaderLogIfLocal match {
        //如果存在Leader日志,
        case Some(leaderLog) =>
          //则获取最小ISR(MinInSyncReplicas)的配置和ISR的大小。
          val minIsr = leaderLog.config.minInSyncReplicas
          val inSyncSize = partitionState.isr.size

          // Avoid writing to leader if there are not enough insync replicas to make it safe,如果没有足够的不同步副本来使其安全,请避免写入领导者
          //如果ISR的大小小于最小ISR要求,并且requiredAcks的值为-1(表示不需要确认),则抛出NotEnoughReplicasException异常。
          if (inSyncSize < minIsr && requiredAcks == -1) {
            throw new NotEnoughReplicasException(s"The size of the current ISR ${partitionState.isr} " +
              s"is insufficient to satisfy the min.isr requirement of $minIsr for partition $topicPartition")
          }
          //调用Leader日志的appendAsLeader方法将记录作为Leader追加到日志中,并传递相关参数。
          val info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, origin,
            interBrokerProtocolVersion, requestLocal)

          // we may need to increment high watermark since ISR could be down to 1,
          // 我们可能需要增加高水位线,因为 ISR 可能降至 1
          (info, maybeIncrementLeaderHW(leaderLog))
        //如果没有,则抛出NotLeaderOrFollowerException异常。
        case None =>
          throw new NotLeaderOrFollowerException("Leader not local for partition %s on broker %d"
            .format(topicPartition, localBrokerId))
      }
    }
    //返回追加记录的信息,并根据是否增加了Leader高水位线,将LeaderHwChange.INCREASED或LeaderHwChange.SAME复制给返回信息的副本。
    info.copy(if (leaderHWIncremented) LeaderHwChange.INCREASED else LeaderHwChange.SAME)
  }

1、Leader的ISR小于配置文件中minInSyncReplicas,并且acks=-1,则抛异常

会验证acks=-1并且当前Leader的ISR副本数量小于配置中规定的最小值

val minIsr = leaderLog.config.minInSyncReplicas
 val inSyncSize = partitionState.isr.size
 if (inSyncSize < minIsr && requiredAcks == -1) {
            throw new NotEnoughReplicasException(s"The size of the current ISR ${partitionState.isr} " +
              s"is insufficient to satisfy the min.isr requirement of $minIsr for partition $topicPartition")
    }

2、如果acks不等于-1,则就算Leader的ISR小于配置,也会正常执行写入数据管道操作

 //调用Leader日志的appendAsLeader方法将记录作为Leader追加到日志中,并传递相关参数。
  val info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, origin,
   interBrokerProtocolVersion, requestLocal)

三、请求把数据写入到Leader的数据管道后,acks=-1和非-1,有不同的逻辑

这里不从头开始,如果想知道推送的数据怎么到下面方法的,可以看kafka 3.5 kafka服务端接收生产者发送的数据源码

/**
  
   *将消息附加到分区的领导副本,并等待它们复制到其他副本;当超时或满足所需的 ACK 时,将触发回调函数;如果回调函数本身已经在某个对象上同步,则传递此对象以避免死锁
   * 请注意,所有挂起的延迟检查操作都存储在队列中。所有 ReplicaManager.appendRecords() 的调用方都应为所有受影响的分区调用 ActionQueue.tryCompleteActions,而不会保留任何冲突的锁。
   */
  def appendRecords(timeout: Long,
                    requiredAcks: Short,
                    internalTopicsAllowed: Boolean,
                    origin: AppendOrigin,
                    entriesPerPartition: Map[TopicPartition, MemoryRecords],
                    responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                    delayedProduceLock: Option[Lock] = None,
                    recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => (),
                    requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {
    //省略代码                
    //把数据中加入到本地Log                
    val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
        origin, entriesPerPartition, requiredAcks, requestLocal)
     //省略代码      
     //调用recordConversionStatsCallback方法,将每个分区的记录转换统计信息传递给回调函数。
      recordConversionStatsCallback(localProduceResults.map { case (k, v) => k -> v.info.recordConversionStats })
      //通过 delayedProduceRequestRequired 方法判断是否需要等待其它副本完成写入,如果 acks = -1,则需要等待所有副本的回应
      if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
        //根据条件判断是否需要创建延迟的produce操作。如果需要,创建一个DelayedProduce对象,并将它添加到delayedProducePurgatory中。
        val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
        val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)
        //创建(主题、分区)对的列表,以用作此延迟生成操作的键
        val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
        // 再一次尝试完成该延时请求
        //  如果暂时无法完成,则将对象放入到相应的Purgatory中等待后续处理
        delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)

      } else {
        //如果不需要延迟操作,直接将produce的结果返回给回调函数。
        val produceResponseStatus = produceStatus.map { case (k, status) => k -> status.responseStatus }
        responseCallback(produceResponseStatus)
      }else{
		 //如果不需要延迟操作,直接将produce的结果返回给回调函数。
        // we can respond immediately
        val produceResponseStatus = produceStatus.map { case (k, status) => k -> status.responseStatus }
        responseCallback(produceResponseStatus)
	}
      //省略代码
  }

1、如果acks=-1,则会创建延迟Produce请求,等待ISR中所有副本的响应

  //它用于判断是否需要延迟发送生产请求并等待复制完成
  // 1. required acks = -1 判断requiredAcks是否等于-1,即是否需要等待所有副本的确认。
  // 2. there is data to append 判断entriesPerPartition是否不为空,即是否有要追加的数据。
   // 3. at least one partition append was successful (fewer errors than partitions) 计算localProduceResults中异常定义的数量,判断其是否小于entriesPerPartition的大小,即是否至少有一个分区的追加操作成功(即比分区数少的错误,如果全错,就应该直接返回)。
  private def delayedProduceRequestRequired(requiredAcks: Short,
                                            entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                            localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
    requiredAcks == -1 &&
    entriesPerPartition.nonEmpty &&
    localProduceResults.values.count(_.exception.isDefined) < entriesPerPartition.size
  }
 /**
   *检查操作是否可以完成,如果没有,则根据给定的监视键进行监视
   *请注意,可以在多个密钥上监视延迟操作。对于某些键(但不是所有键),操作可能会在添加到监视列表后完成。
   * 在这种情况下,操作被视为已完成,不会添加到其余键的监视列表中。过期收割线程将从存在该操作的任何观察程序列表中删除此操作。
   * @param operation the delayed operation to be checked 要检查的延迟操作
   * @param watchKeys keys for bookkeeping the operation 用于监视的键
   * @return true iff the delayed operations can be completed by the caller 如果延迟操作可以由调用方完成,则为 true
   */
  def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
    assert(watchKeys.nonEmpty, "The watch key list can't be empty")
    //尝试完成操作,如果操作不能立即完成,则将操作添加到所有观察键的观察列表中,并递增estimatedTotalOperations计数器的值
    if (operation.safeTryCompleteOrElse {
      watchKeys.foreach(key => watchForOperation(key, operation))
      if (watchKeys.nonEmpty) estimatedTotalOperations.incrementAndGet()
    }) return true
    //如果操作仍未完成,则根据条件执行以下操作:
    if (!operation.isCompleted) {
      //如果启用了定时器(timerEnabled为真),则将操作添加到超时定时器中。
      if (timerEnabled)
        timeoutTimer.add(operation)
      //如果操作已完成,则取消定时器任务。
      if (operation.isCompleted) {
        // cancel the timer task
        operation.cancel()
      }
    }
    //返回false表示操作未完成。
    false
  }

2、如果acks不等于-1,写入到Leader的数据管道后,则直接执行回调函数返回结果

	   //如果不需要延迟操作,直接将produce的结果返回给回调函数。
        // we can respond immediately
        val produceResponseStatus = produceStatus.map { case (k, status) => k -> status.responseStatus }
        responseCallback(produceResponseStatus)

四、在返回response时,回调函数会遍历分区异常信息


    //用于发送 produce 响应的回调 ProduceResponse 的构造能够接受自动生成的协议数据,因此 KafkaApishandleProduceRequest 应应用自动生成的协议以避免额外的转换
    @nowarn("cat=deprecation")
    def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
      val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses ++ nonExistingTopicResponses ++ invalidRequestResponses
      var errorInResponse = false

      mergedResponseStatus.forKeyValue { (topicPartition, status) =>
        if (status.error != Errors.NONE) {
          errorInResponse = true
          debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
            request.header.correlationId,
            request.header.clientId,
            topicPartition,
            status.error.exceptionName))
        }
      }
      //记录带宽和请求配额特定的值,并在违反任何配额时通过静音通道来限制。如果违反了两个配额,请使用两个配额之间的最大限制时间。请注意,如果 acks == 0,则不会强制执行请求配额。
      val timeMs = time.milliseconds()
      val requestSize = request.sizeInBytes
      val bandwidthThrottleTimeMs = quotas.produce.maybeRecordAndGetThrottleTimeMs(request, requestSize, timeMs)
      val requestThrottleTimeMs =
        if (produceRequest.acks == 0) 0
        else quotas.request.maybeRecordAndGetThrottleTimeMs(request, timeMs)
      val maxThrottleTimeMs = Math.max(bandwidthThrottleTimeMs, requestThrottleTimeMs)
      if (maxThrottleTimeMs > 0) {
        request.apiThrottleTimeMs = maxThrottleTimeMs
        if (bandwidthThrottleTimeMs > requestThrottleTimeMs) {
          requestHelper.throttle(quotas.produce, request, bandwidthThrottleTimeMs)
        } else {
          requestHelper.throttle(quotas.request, request, requestThrottleTimeMs)
        }
      }
      //如果produceRequest.acks等于0,表示不需要响应。
      if (produceRequest.acks == 0) {
        //如果生产者请求,则无需操作;
        //但是,如果在处理请求时出现任何错误,由于生产者期望没有响应,服务器将关闭套接字服务器,以便生产者客户端知道发生了一些错误并刷新其元数据
        if (errorInResponse) {
          //如果errorInResponse为true,则关闭连接并发送包含错误信息的ProduceResponse响应
          val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>
            topicPartition -> status.error.exceptionName
          }.mkString(", ")
          info(
            s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " +
              s"from client id ${request.header.clientId} with ack=0\n" +
              s"Topic and partition to exceptions: $exceptionsSummary"
          )
          requestChannel.closeConnection(request, new ProduceResponse(mergedResponseStatus.asJava).errorCounts)
        } else {
          //如果没有异常,发送无操作的响应。
          requestHelper.sendNoOpResponseExemptThrottle(request)
        }
      } else {
        //如果produceRequest.acks不等于0,将mergedResponseStatus和maxThrottleTimeMs作为参数构造ProduceResponse响应,并通过requestChannel发送响应。
        requestChannel.sendResponse(request, new ProduceResponse(mergedResponseStatus.asJava, maxThrottleTimeMs), None)
      }
    }

1、如果acks=0,则关闭套接字服务器

		//如果生产者请求,则无需操作;
        //但是,如果在处理请求时出现任何错误,由于生产者期望没有响应,服务器将关闭套接字服务器,以便生产者客户端知道发生了一些错误并刷新其元数据
        if (errorInResponse) {
          //如果errorInResponse为true,则关闭连接并发送包含错误信息的ProduceResponse响应
          val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>
            topicPartition -> status.error.exceptionName
          }.mkString(", ")
          info(
            s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " +
              s"from client id ${request.header.clientId} with ack=0\n" +
              s"Topic and partition to exceptions: $exceptionsSummary"
          )
          requestChannel.closeConnection(request, new ProduceResponse(mergedResponseStatus.asJava).errorCounts)
        }

2、如果acks不等0,会返回异常信息

 requestChannel.sendResponse(request, new ProduceResponse(mergedResponseStatus.asJava, maxThrottleTimeMs), None)
更多推荐

7.前端·新建子模块与开发(自动生成)

文章目录学习地址视频笔记自动代码生成模式开发增删改查功能调试功能权限分配脚本实现权限分配学习地址https://www.bilibili.com/video/BV13g411Y7GS/?p=15&spm_id_from=pageDriver&vd_source=ed09a620bf87401694f763818a31c

企业电子招投标采购系统——功能模块&功能描述+数字化采购管理 采购招投标

功能描述1、门户管理:所有用户可在门户页面查看所有的公告信息及相关的通知信息。主要板块包含:招标公告、非招标公告、系统通知、政策法规。2、立项管理:企业用户可对需要采购的项目进行立项申请,并提交审批,查看所有的立项信息。主要功能包含:招标立项申请、非招标立项申请、采购立项管理。3、采购项目管理:可对项目采购过程全流程管

共聚焦显微镜在化学机械抛光课题研究中的应用

两个物体表面相互接触即会产生相互作用力,研究具有相对运动的相互作用表面间的摩擦、润滑与磨损及其三者之间关系即为摩擦学,目前摩擦学已涵盖了化学机械抛光、生物摩擦、流体摩擦等多个细分研究方向,其研究的数值量级也涵盖了亚纳米到百微米的区间。摩擦本身是一种能量损耗现象,然而得到合理地利用也能产生巨大的正面效益,因此,准确地测定

【Linux】自动化构建工具:make/Makefile

​👻内容专栏:Linux操作系统基础🐨本文概括:工具使用的背景、理解make/makefile工具、探索工作原理(文件修改时间的对比)、.PHONY伪目标、特性等。🐼本文作者:阿四啊🐸发布时间:2023.9.14背景“make”和“Makefile”是用于自动化构建和编译软件项目的工具和文件。它们通常用于编译源

企业电子招标采购系统源码之从供应商管理到采购招投标、采购合同、采购执行的全过程数字化管理

功能描述1、门户管理:所有用户可在门户页面查看所有的公告信息及相关的通知信息。主要板块包含:招标公告、非招标公告、系统通知、政策法规。2、立项管理:企业用户可对需要采购的项目进行立项申请,并提交审批,查看所有的立项信息。主要功能包含:招标立项申请、非招标立项申请、采购立项管理。3、采购项目管理:可对项目采购过程全流程管

【Linux学习笔记】 - 常用指令学习及其验证(上)

前言:本文主要记录对Linux常用指令的使用验证。环境为阿里云服务器CentOS7.9。关于环境如何搭建等问题,大家可到同平台等各大资源网进行搜索学习,本文不再赘述。由于本人对Linux学习程度尚且较浅,本文仅介绍验证常用指令的常用功能,可能无法展现指令及附带选项的所有功能,大家若想对相关指令内容有更全貌的了解还有劳搜

【Django】掌握models.py模型文件的使用

原文作者:我辈李想版权声明:文章原创,转载时请务必加上原文超链接、作者信息和本声明。文章目录前言一、models类继承(一)创建apps文件夹(二)settings.py文件配置(三)新建BaseModel(四)项目中调用二、时间字段(一)时间字段(二)默认时间(三)时间字段允许为空三、选择字段(枚举)(一)选择的基础

用Python实现链式调用

嗨喽,大家好呀~这里是爱看美女的茜茜呐我们在使用Django的models查询数据库时,可以看到有这种写法:formapp.modelsimportXXXquery=XXX.objects.all()query=query.filter(name=123,age=456).filter(salary=999)在这种写法

RHCE——二十、Ansible及安装与配置

Ansible一、自动化运维的背景运维自动化二、自动化运维的体系结构及组成1、系统预备2、配置管理3、监控报警4、常用工具三、自动化运维的发展概述1、运维工作内容分类2、运维自动化标准化2.1物理设备层面2.2操作系统层面2.3应用服务层面2.4运维操作层面四、Ansible的介绍1、什么是Ansible1.1Ansi

睿趣科技:抖音开店的操作流程有哪些

随着社交媒体的兴起,抖音已经成为了一款备受欢迎的短视频分享平台。许多人看到了在抖音上开店的商机,因此抖音开店也逐渐成为了一种新兴的商业模式。那么,抖音开店的操作流程究竟有哪些呢?下面将为您详细介绍。第一步:选定产品和目标市场在开店之前,您需要先确定您要销售的产品或服务以及目标市场。了解您的受众群体是谁,他们的需求是什么

【微信小程序】最新隐私弹窗组件

程序员何苦为难程序员微信小程序又发布了新一波政策就是获取头像昵称位置啥啥各种用户信息的时候都需要先搞个弹窗让用户确认才行小程序用户隐私保护指引内容介绍必须跟上啊咱公司的大佬马上搞了个组件贴出来学习一下顺便给大家参考<!--components/privacy/privacy.wxml--><!--隐私弹窗--><vie

热文推荐