netty之ObjectPool(对象池)

2023-09-20 10:12:33

对象池和我们的连接池一样就是对象放入一个池中循环使用。特别是在netty创建ByteBuf的时候buf循环使用大大减小了频繁创建对象,垃圾收集的压力。特别是在使用直接内存的时候。

netty的对象池对象 RecyclerObjectPool extends ObjectPool。RecyclerObjectPool只是外层抽象,实际对象池由其内部的Recycler对象来维护管理。对象池有两个重要操作对象的获取和回收,下面就从Recycler看起对象池。

获取对象

Recycler.get()

public final T get() {
    //recycler.get-1
    if (maxCapacityPerThread == 0) {//每个线程最大容量
        return newObject((Handle<T>) NOOP_HANDLE);
    }
    //recycler.get-2
    LocalPool<T> localPool = threadLocal.get();
    DefaultHandle<T> handle = localPool.claim();
    T obj;
    if (handle == null) {
        handle = localPool.newHandle();
        if (handle != null) {
            obj = newObject(handle);
            handle.set(obj);
        } else {
            obj = newObject((Handle<T>) NOOP_HANDLE);
        }
    } else {
        obj = handle.get();
    }

    return obj;
}

1、判断对象池容量

get方法首先判断maxCapacityPerThread 每个线程对象最大容量,这个值的初始化在Recycler静态块里

static {
    // In the future, we might have different maxCapacity for different object types.
    // e.g. io.netty.recycler.maxCapacity.writeTask
    //      io.netty.recycler.maxCapacity.outboundBuffer
    int maxCapacityPerThread = SystemPropertyUtil.getInt("io.netty.recycler.maxCapacityPerThread",
            SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity", DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD));
    if (maxCapacityPerThread < 0) {
        maxCapacityPerThread = DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD;
    }
}

会读取"io.netty.recycler.maxCapacityPerThread"和"io.netty.recycler.maxCapacity"配置,如果都没有才哦那个

DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD变量值,该静态变量初始值是4 * 1024。所以这里一般不为0.

2、获取LocalPool

通过threadLocal变量get获取localPool。

LocalPool<T> localPool = threadLocal.get();

这里threadLocal变量不是直接jdk里的ThreadLocal实例,是进行了一层包装

private final FastThreadLocal<LocalPool<T>> threadLocal = new FastThreadLocal<LocalPool<T>>() {
    @Override
    protected LocalPool<T> initialValue() {
        return new LocalPool<T>(maxCapacityPerThread, interval, chunkSize);
    }

    @Override
    protected void onRemoval(LocalPool<T> value) throws Exception {
        super.onRemoval(value);
        MessagePassingQueue<DefaultHandle<T>> handles = value.pooledHandles;
        value.pooledHandles = null;
        handles.clear();
    }
};

threadLocal是FastThreadLocal类型。其内部真正ThreadLocal是 ThreadLocal类型。InternalThreadLocalMap是存储线程变量实际对象。其内部有一个Object[] indexedVariables数组用来存储数据。

FastThreadLocal.get方法

public final V get() {
    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
    //按当前FastThreadLocal的index从InternalThreadLocalMap获取变量值
    Object v = threadLocalMap.indexedVariable(index);
    if (v != InternalThreadLocalMap.UNSET) {
        return (V) v;
    }
    //初始化线程变量
    return initialize(threadLocalMap);
}

首先从threadlocal里获取InternalThreadLocalMap,然后按照当前FastThreadLocal的索引获取其对应的数据对象。如果没有获取到则调用initialize方法初始化数据对象。这里initialize方法在上面new FastThreadLocal时候进行重写了,最后是执行new LocalPoo()新建了一个LocalPool。

FastThreadLocal.index变量是在构造方法里初始化的

public FastThreadLocal() {
    index = InternalThreadLocalMap.nextVariableIndex();
}

InternalThreadLocalMap里有一个AtomicInteger类型的变量用来获取唯一的index。这样最后每个FastThreadLocal有唯一一个索引,InternalThreadLocalMap内有一个Object[] 用来存储每个FastThreadLocal对应的线程变量。

InternalThreadLocalMap类主要结构:

class InternalThreadLocalMap{
  //真正的threadlocal变量
  ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap =
            new ThreadLocal<InternalThreadLocalMap>();
  //fastThreadlocal获取唯一index            
  AtomicInteger nextIndex = new AtomicInteger();
  //存储每个fastThreadLocal.index对应的变量
  Object[] indexedVariables;
}

看一下LocalPool的构造方法

LocalPool(int maxCapacity, int ratioInterval, int chunkSize) {
    this.ratioInterval = ratioInterval;
    if (BLOCKING_POOL) {
        pooledHandles = new BlockingMessageQueue<DefaultHandle<T>>(maxCapacity);
    } else {
        pooledHandles = (MessagePassingQueue<DefaultHandle<T>>) newMpscQueue(chunkSize, maxCapacity);
    }
    ratioCounter = ratioInterval; // Start at interval so the first one will be recycled.
}

这里看到pooledHandles是一个队列类型。根据是否是阻塞对象池创建BlockingMessageQueue或MessagePassingQueue类型队列实例。队列最大容量就是我们最开始看的maxCapacityPerThread。

3、对象实例获取

然后掉用队列的claim方法去获取一个handle

claim方法:

DefaultHandle<T> claim() {
    MessagePassingQueue<DefaultHandle<T>> handles = pooledHandles;
    if (handles == null) {
        return null;
    }
    DefaultHandle<T> handle;
    do {
        handle = handles.relaxedPoll();
    } while (handle != null && !handle.availableToClaim());
    return handle;
}

循环从队列中获取一个handle,handle可用条件:handle不为空且当前状态也不是claim的。 这时候队列是空获取handle为空,接着localPool.newHandle()创建一个handle。没有什么特殊除了,就是创建了一个DefaultHandle实例,将localPool引用传给handle

DefaultHandle(LocalPool<T> localPool) {
    this.localPool = localPool;
}

然后会调用recycler.newObject()方法去创建对象。newObject是一个抽象方法,在创建recycler实例的时候要实现该方法。也就是我们在这个方法里定义自己对象实例化方法。其有一个入参handle,由handle可以获取localPool的引用,后面我们就可以当前对象实例执行对象回收。

通过newObject方法创建实例对象后,在返回该实例前,还会调用handle.set(obj)方法将对象放到handle里,方便下次直接从handle里get对象。

到这里对象创建了,但是对象池里好像还没有保留对象的引用,这个需要继续看对象的回收。

回收对象

对象的回收通过handle.recylce()方法来执行。上面newObject方法创建对象实例的时候入参会传入handle,所以可以通过对象实例本身发起对象的回收。同时我们可以将当前必要的属性值进行清理以满足下次重复使用。

handle.recycle方法

public void recycle(Object object) {
    if (object != value) {//判断当前对象和set的值是否相等
        throw new IllegalArgumentException("object does not belong to handle");
    }
    //调用localPool release方法回收对象
    localPool.release(this);
}

release方法:

void release(DefaultHandle<T> handle) {
    MessagePassingQueue<DefaultHandle<T>> handles = pooledHandles;
    handle.toAvailable();//修改状态标识
    if (handles != null) {
        //handle入队
        handles.relaxedOffer(handle);
    }
}

release方法真正将handle入队,也就是对象放入线程池。

这里看到和我们的数据库连接池稍微不同是,不是先初始化几个连接放到池子里。而是先创建使用,然后回收的时候才入池。

这里总结一下对象属性关系:

对象池存放在threadLocal变量里,是一个LocalPool对象实例。LocalPool内部使用一个Queue来保存对象。Queue不是直接保留的对象实例,是保留的Handle实例。

ThreadLocal<LocalPool{
  Queue<DefaultHandle<Object>>
}>

大致关系如上

使用实例

1、定义一个简单对象

private static class User {
    private String name;
    private Recycler.Handle handle;
    public User(Recycler.Handle handle){
        this.handle = handle;
    }

    public void recycle(){
        this.name = null;
        //TODO 对象必要清理
        handle.recycle(this);
    }
}

这里我们用一个属性来保留handle的引用,主要是是在recycle里调起回收方法。

2、Recycler创建

static final Recycler<User> recycler = new Recycler<User>() {
    @Override
    protected User newObject(Handle<User> handle) {
        return new User(handle);
    }
};

重写其newObject方法。

3、对象池使用

User user1 = recycler.get();
user1.recycle();
User user2 = recycler.get();
System.out.println(user1 == user2);

这里可以看到user1和user2是同一个实例引用。

更多推荐

LoGoNet:基于局部到全局跨模态融合的精确 3D 目标检测

论文地址:https://arxiv.org/abs/2303.03595论文代码:https://github.com/sankin97/LoGoNet论文背景激光雷达传感器点云通常是稀疏的,无法提供足够的上下文来区分远处的区域,从而造成性能次优。激光雷达-摄像机融合方法在三维目标检测中表现出了良好的性能。目前先进的

Flask狼书笔记 | 08_个人博客(上)

文章目录8个人博客8.1大型项目结构8.2编写程序骨架8.3编写博客前台8个人博客个人博客是一个典型的CMS(内容管理系统),通常包含前台和后台两部分。这一张将涉及更高级的项目组织方式,以及一些新的Python包:Flask-Login,Unidecode。8.1大型项目结构本章将学习使用蓝本,和工厂函数,来进一步组织

设计模式:策略模式

目录组件代码示例源码中应用优缺点总结策略模式(StrategyPattern)是一种行为型设计模式,它允许在运行时根据不同的情况选择不同的算法或行为。策略模式通过将算法封装成独立的策略类,使得它们可以互相替换,而不影响客户端的使用。组件在策略模式中,有三个核心组件:策略接口(StrategyInterface):定义了

ARTS打卡第三周

概述infoq的arts打卡学习,贯彻左耳朵耗子的学习理念,活到老学到老,每天都精进一点,上个星期没有写打卡文档,只能用工作太忙为借口为自己开脱了一、Algorithm一道算法题最近工作使用算法场景较少,基本上是基于数据统计对系统进行优化,因此结合工作的算法暂时没有,最近在学习go,就用go来写一些经典算法吧,再配上单

机器学习实战:Python基于LR线性回归进行预测(十)

文章目录1前言1.1LR的介绍1.2LR的应用2.weather数据集实战演示2.1导入函数2.2导入数据2.3数据整体可视化2.4训练模型2.5预测模型2.6评估模型3.讨论1前言注意这里的LR指的是LinearRegression,线性回归。而非逻辑回归LogisticRegression,虽然二者简称都是LR,但

《DevOps实践指南》- 读书笔记(五)

DevOps实践指南Part4第二步:反馈的技术实践14.建立能发现并解决问题的遥测系统14.1建设集中式监控架构14.2建立生产环境的应用程序日志遥测14.3使用遥测指导问题的解决14.4将建立生产遥测融入日常工作14.5建立自助访问的遥测和信息辐射器14.6发现和填补遥测的盲区14.6.1应用程序和业务度量指标14

【窗体】Winform两个窗体之间通过委托事件进行值传递,基础篇

2023年,第38周。给自己一个目标,然后坚持总会有收货,不信你试试!在实际项目中,我们可能会用到一些窗体做一些小工具或者小功能。比如:运行程序,在主窗体A基础上,点击某个按钮希望能够弹出B窗体,在B窗体完成操作后,希望能够把值传递回到A窗体,然后进行其他业务操作。目录一、显示线上图片二、显示本地图片三、A窗体打开B窗

西瓜书读书笔记整理(六)—— 第六章 支持向量机

第六章支持向量机6.1间隔与支持向量6.1.1什么是支持向量机6.1.2支持向量与间隔6.1.3支持向量机的求解过程6.2对偶问题(dualproblem)6.2.1什么是对偶问题6.2.2如何求解支持向量机的对偶问题6.3核函数(kernelfunction)6.3.1什么是支持向量机的核函数6.3.2常见的几种核函

[NLP] LLM---<训练中文LLama2(五)>对SFT后的LLama2进行DPO训练

当前关于LLM的共识大型语言模型(LLM)使NLP中微调模型的过程变得更加复杂。最初,当ChatGPT等模型首次出现时,最主要的方法是先训练奖励模型,然后优化LLM策略。从人类反馈中强化学习(RLHF)极大地推动了NLP的发展,并将NLP中许多长期面临的挑战抛在了一边。基于人类反馈的强化学习(Reinforcement

算法通关村-----数组中元素出现次数问题

数组中出现次数超过一半的数字问题描述数组中有一个数字出现的次数超过数组长度的一半,请找出这个数字。你可以假设数组是非空的,并且给定的数组总是存在多数元素。详见剑指offer39问题分析最直接的方式就是使用hashMap,遍历给定数组,将数字和对应出现次数存储在hashMap中,然后再遍历hashMap,找到出现次数最大

代码随想录算法训练营第三十八天|理论基础 ● 509. 斐波那契数 ● 70. 爬楼梯 ● 746. 使用最小花费爬楼梯

理论基础无论大家之前对动态规划学到什么程度,一定要先看我讲的动态规划理论基础。如果没做过动态规划的题目,看我讲的理论基础,会有感觉是不是简单题想复杂了?其实并没有,我讲的理论基础内容,在动规章节所有题目都有运用,所以很重要!如果做过动态规划题目的录友,看我的理论基础就会感同身受了。代码随想录视频:从此再也不怕动态规划了

热文推荐