【Linux】线程池 | 自旋锁 | 读写锁

2023-09-19 19:05:29


一、线程池

1. 线程池模型和应用场景

线程池是一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。

💕 线程池模型

线程池模型本质上也是生产者消费者模型,线程池的实现原理是:在线程池中预先准备好并创建一批线程,然后上层将任务push到任务队列中,休眠的线程如果检测到任务队列中有任务,就直接被操作系统唤醒,然后去消费并处理任务,唤醒一个线程的代价比创建一个线程的代价小的很多。

在这里插入图片描述

任务线程指的是生产者,任务队列指的是交易场所,右边的一大批线程指的是消费者,因此。线程池的本质还是生产消费模型。

💕 线程池的应用场景

  1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
  2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
  3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误。

2. 单例模式实现线程池(懒汉模式)

💕 ThreadPool.hpp

#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <queue>
#include <unistd.h>
#include "Thread.hpp"
#include "Task.hpp"
#include "lockGuard.hpp"
using namespace std;

const static int N = 5;

// 将此代码设计成单例模式————懒汉模式

template <class T>
class ThreadPool
{
private:
	ThreadPool(int num = N) : _num(num)
	{
		pthread_mutex_init(&_lock, nullptr);
		pthread_cond_init(&_cond, nullptr);
	}
	ThreadPool(const ThreadPool<T>& tp) = delete;
	void operator=(const ThreadPool<T>& tp) = delete;
public:
	// 设计一个静态成员函数来返回创建的对象
	static ThreadPool<T>* getinstance()
	{
		if(_instance == nullptr)
		{
			LockGuard lockguard(&_instance_lock);
			{
				if(_instance == nullptr)
				{
					_instance = new ThreadPool<T>();
					_instance->init();
					_instance->start();
				}
			}
		}
		return _instance;
	}

	pthread_mutex_t *getlock()
	{
		return &_lock;
	}

	void threadWait()
	{
		pthread_cond_wait(&_cond, &_lock);
	}

	void threadWake()
	{
		pthread_cond_signal(&_cond);
	}

	bool isEmpty()
	{
		return _tasks.empty();
	}

	void init()
	{
		for (int i = 0; i < _num; i++)
		{
			_threads.push_back(Thread(i + 1, threadRoutine, this));
		}
	}

	void start()
	{
		for (auto &t : _threads)
		{
			t.run();
		}
	}

	void check()
	{
		for (auto &t : _threads)
			cout << t.threadname() << " running..." << endl;
	}

	static void threadRoutine(void *args)
	{
		ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
		while (true)
		{
			T t;
			// 检测此时有没有任务, 如果有任务就处理任务, 否则就挂起等待
			{
				LockGuard lockguard(tp->getlock());
				while (tp->isEmpty())
				{
					tp->threadWait();
				}
				t = tp->popTask();
			}
			t();
			cout << "thread handler done, result: " << t.formatRes() << endl;
		}
	}

	T popTask()
	{
		T t = _tasks.front();
		_tasks.pop();
		return t;
	}

	void pushTask(const T &t)
	{
		LockGuard lockguard(&_lock);
		_tasks.push(t);
		threadWake();
	}

	~ThreadPool()
	{
		for (auto &t : _threads)
		{
			t.join();
		}
		pthread_mutex_destroy(&_lock);
		pthread_cond_destroy(&_cond);
	}

private:
	vector<Thread> _threads;
	int _num;

	queue<T> _tasks; // 使用stl的自动扩容机制
	pthread_mutex_t _lock;
	pthread_cond_t _cond;

	static ThreadPool<T>* _instance;
	static pthread_mutex_t _instance_lock;
};

template<class T>
ThreadPool<T>* ThreadPool<T>::_instance = nullptr;

template<class T>
pthread_mutex_t ThreadPool<T>::_instance_lock = PTHREAD_MUTEX_INITIALIZER;

💕 Thread.hpp

#pragma once

#include <iostream>
#include <cstdlib>
#include <string>
#include <pthread.h>
using namespace std;

class Thread
{
public:
    typedef enum{
        NEW = 0,
        RUNNING,
        EXITED
    } ThreadStatus;
    typedef void (*func_t)(void*);

public:
    Thread(int num, func_t func, void* args) :_tid(0), _status(NEW),_func(func),_args(args)
    {
        char name[128];
        snprintf(name, 128, "thread-%d", num);
        _name = name;
    }

    int status(){ return _status; }
    string threadname(){ return _name; }

    pthread_t get_id()
    {
        if(_status == RUNNING)
            return _tid;
        else
            return 0;
    }

    static void* thread_run(void* args)
    {
        Thread* ti = static_cast<Thread*>(args);
        (*ti)();
        return nullptr;
    }

    void operator()()
    {
        if(_func != nullptr)
            _func(_args);
    }

    void run() // 封装线程运行
    {
        int n = pthread_create(&_tid, nullptr, thread_run, this);
        if(n != 0)
            exit(-1);
        _status = RUNNING; // 线程状态变为运行
    }

    void join() // 疯转线程等待
    {
        int n = pthread_join(_tid, nullptr);
        if(n != 0)
        {
            cout << "main thread join thread: " << _name << "error" << endl;
            return;
        }
        _status = EXITED;
    }

    ~Thread(){}
private:
    pthread_t _tid;
    string _name;
    func_t _func; // 线程未来要执行的回调
    void* _args;
    ThreadStatus _status;
};

💕 Task.hpp

#pragma once
#include <iostream>
#include <string>
using namespace std;

class Task
{
public:
    Task()
    {}
    Task(int x, int y, char op):_x(x), _y(y), _op(op), _result(0), _exitcode(0)
    {}
    void operator()()
    {
        switch (_op)
        {
        case '+':
            _result = _x + _y; 
            break;
        case '-':
            _result = _x - _y;
            break;
        case '*':
            _result = _x * _y;
            break;
        case '/':
        {
            if(_y == 0)
                _exitcode = -1;
            else 
                _result = _x / _y;
        }
        break;
        case '%':
        {
            if(_y == 0)
                _exitcode = -1;
            else 
                _result = _x % _y;
        }
        break;
        default:
            break;
        }
    }

    string formatArge()
    {
        return to_string(_x) + _op + to_string(_y) + "=";
    }

    string formatRes()
    {
        return to_string(_result) + "(" + to_string(_exitcode) + ")";
    }

    ~Task()
    {}

private:
    int _x;
    int _y;
    char _op;

    int _result;
    int _exitcode;
};

💕 lockGuard.hpp

#pragma once

#include <iostream>
#include <pthread.h>

using namespace std;

class Mutex // 自己不维护锁,有外部传入
{
public:
    Mutex(pthread_mutex_t *mutex):_pmutex(mutex)
    {}
    void lock()
    {
        pthread_mutex_lock(_pmutex);
    }
    void unlock()
    {
        pthread_mutex_unlock(_pmutex);
    }
    ~Mutex()
    {}
private:
    pthread_mutex_t *_pmutex;
};

class LockGuard // 自己不维护锁,有外部传入
{
public:
    LockGuard(pthread_mutex_t *mutex):_mutex(mutex)
    {
        _mutex.lock();
    }
    ~LockGuard()
    {
        _mutex.unlock();
    }
private:
    Mutex _mutex;
};

💕 main.cc

#include "ThreadPool_V4.hpp"
#include "Task.hpp"
#include <memory>

const string ops = "+-*/%";

int main()
{
    srand(time(nullptr) ^ getpid());

    while(true)
    {
        sleep(1);
        int x = rand() % 100;
        int y = rand() % 100;
        char op = ops[(x + y) % ops.size()];
        Task t(x, y, op);
		ThreadPool<Task>::getinstance()->pushTask(t);
        // tp->pushTask(t);
        cout << "the question is what: " << t.formatArge() << " ? " << endl;
    }
    return 0;
}

在这里插入图片描述


二、其他常见的锁

1. STL、智能指针和线程安全

💕 STL中的容器是否是线程安全的?

不是;原因是, STL 的设计初衷是将性能挖掘到极致, 而一旦涉及到加锁保证线程安全, 会对性能造成巨大的影响,而且对于不同的容器, 加锁方式的不同, 性能可能也不同(例如hash表的锁表和锁桶).
因此 STL 默认不是线程安全. 如果需要在多线程环境下使用, 往往需要调用者自行保证线程安全。

💕 智能指针是线程安全的吗?

智能指针是线程安全的吗?unique_ptr 是和资源强关联,只是在当前代码块范围内生效,因此不涉及线程安全问题。对于 shared_ptr,多个对象需要共有一个引用计数变量,所以会存在线程安全问题。但是标准库实现的时候也考虑到了这个问题,就基于原子操作(Compare And Swap(CAS)) 的方式保证 shared_ptr 能够高效原子地操作引用计数。shared_ptr 是线程安全的,但不意味着对其管理的资源进行操作是线程安全的,所以对 shared_ptr 管理的资源进行操作时也可能需要进行加锁保护。


2. 其他常见的锁

  • 悲观锁:悲观锁做事比较悲观,它认为多线程同时修改共享资源的概率比较高,于是很容易出现冲突,所以访问贡献资源前,先要进行加锁保护。常见的悲观锁有:互斥锁、自旋锁和读写锁等。
  • 乐观锁:乐观锁做事比较乐观,它乐观地认为共享数据不会被其他线程修改,因此不上锁。它的工作方式是:先修改完共享数据,再判断这段时间内有没有发生冲突。如果其他线程没有修改共享数据,那么则操作成功。如果发现其他线程已经修改该共享数据,就放弃本次操作。乐观锁全程并没有加锁,所以它也叫无锁编程。乐观锁主要采取两种方式:版本号机制(Gitee等)和 CAS 操作。乐观锁虽然去除了加锁和解锁的操作,但是一旦发生冲突,重试的成本是很高的,所以只有在冲突概率非常低,且加锁成本非常高的场景下,才考虑使用乐观锁。
  • CAS 操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试。
  • 自旋锁:使用自旋锁的时候,当多线程发生竞争锁的情况时,加锁失败的线程会忙等待(这里的忙等待可以用 while 循环等待实现),直到它拿到锁。而互斥锁加锁失败后,线程会让出 CPU 资源给其他线程使用,然后该线程会被阻塞挂起。如果临界区代码执行时间过长,自旋的线程会长时间占用 CPU 资源,所以自旋的时间和临界区代码执行的时间是成正比的关系。如果临界区代码执行的时间很短,就不应该使用互斥锁,而应该选用自旋锁。因为互斥锁加锁失败,是需要发生上下文切换的,如果临界区执行的时间比较短,那可能上下文切换的时间会比临界区代码执行的时间还要长。

三、读者写者问题

1. 读者写者模型

在编写多线程的时候,有一种情况是十分常见的。那就是,有些公共数据修改的机会比较少。相比较改写,它们读的机会反而高的多。通常而言,在读的过程中,往往伴随着查找的操作,中间耗时很长。给这种代码段加锁,会极大地降低我们程序的效率。那么有没有一种方法,可以专门处理这种多读少写的情况呢?

这就需要我们的读者写者模型出场了,读者写者模型其实也是维护321原则;三种关系:读者与读者、读者与写者、写者与写者。两种对象:读者和写者。一个交易场所:需要写入和从中读取的缓冲区。

下面我们来看一下读者写者模型的三种关系:

  • 读者与读者:没有关系
  • 读者与写者:互斥与同步
  • 写者与写者:互斥

那么,为什么在生产者消费者模型中,消费者和消费者是互斥关系,而在读者写者问题中,读者和读者之间没有关系呢?

读者写者模型和生产者消费者模型的最大区别就是:消费者会将数据拿走,而读者不会拿走数据,读者仅仅是对数据做读取,并不会进行任何修改的操作,因此共享资源也不会因为有多个读者来读取而导致数据不一致的问题。


2. 读写锁

在读者写者模型中,pthread库为我们提供了 读写锁 来维护其中的同步与互斥关系。读写锁由读锁写锁两部分构成,如果只读取共享资源用读锁加锁,如果要修改共享资源则用写锁加锁。所以,读写锁适用于能明确区分读操作和写操作的场景。

读写锁的工作原理:

当写锁没有被写线程持有时,多个读线程能够并发地持有读锁,这大大提高了共享资源的访问效率。因为读锁是用于读取共享资源的场景,所以多个线程同时持有读锁也不会破坏共享资源的数据。但是,一旦写锁被写进程持有后,读线程获取读锁的操作会被阻塞,而其它写线程的获取写锁的操作也会被阻塞。

伪代码:

// 写者进程/线程执行的函数
void Writer()
{
	while(true)
	{
		P(wCountMutex); // 进入临界区
		if(wCount == 0)
			P(rMutex); // 当第一个写者进入,如果有读者则阻塞读者
		wCount++;// 写者计数 + 1
		V(wCountMutex); // 离开临界区

		P(wDataMutex); // 写者写操作之间互斥,进入临界区
		write(); // 写数据
		V(wDataMutex); // 离开临界区

		P(wCountMutex); // 进入临界区
		wCount--; // 写完数据,准备离开
		if(wCount == 0)
		{
			V(rMutex);  // 最后一个写者离开了,则唤醒读者
		}
		V(wCountMutex); //离开临界区
	}
}

// 读者进程/线程执行的次数
void reader()
{
	while(TRUE)
	{
		P(rMutex);
		P(rCountMutex); // 进入临界区
		if ( rCount == 0 )
			P(wDataMutex); // 当第一个读者进入,如果有写者则阻塞写者写操作
		rCount++;
		V(rCountMutex); // 离开临界区
		V(rMutex);
		read( ); // 读数据

		P(rCountMutex); // 进入临界区
		rCount--;
		if ( rCount == 0 )
			V(wDataMutex); // 当没有读者了,则唤醒阻塞中写者的写操作
		V(rCountMutex); // 离开临界区
	}
}

在这里插入图片描述

初始化

int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,const pthread_rwlockattr_t
*restrict attr);

销毁

int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

加锁和解锁

int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);

读者写者问题很明显会存在读者优先还是写者优先的问题,如果是读者优先的话,可能就会带来写者饥饿的问题。而写者优先可以保证写线程不会饿死,但如果一直有写线程获取写锁,那么读者也会被饿死。所以使用读写锁时,需要考虑应用场景。读写锁通常用于数据被读取的频率非常高,而被修改的频率非常低。注:Linux 下的读写锁默认是读者优先的。

更多推荐

建造者模式

1、概述在软件开发过程中有时需要创建一个复杂的对象,这个复杂对象通常由多个子部件按一定的步骤组合而成。例如,计算机是由CPU、主板、内存、硬盘、显卡、机箱、显示器、键盘、鼠标等部件组装而成的,采购员不可能自己去组装计算机,而是将计算机的配置要求告诉计算机销售公司,计算机销售公司安排技术人员去组装计算机,然后再交给要买计

微服务是个坏主意吗?

曾几何时,我记得我的手指疯狂地敲打键盘,与庞大而杂乱的代码库搏斗。那是巨石的时代,代码就像古老的城堡一样,由一块块石头砌成一个令人印象深刻的庞然大物。几年过去了,时代变了。开发人员口中的流行语变成了“微服务”。微服务革命——承诺成为我们的救世主。我们被告知,通过将庞然大物分割成更小、自包含的独立服务,我们将获得无与伦比

Dynatrace获评《2023 Gartner® 应用性能监控(APM)和可观测性魔力象限™》“领导者”,前瞻性与执行力均表现卓越

中国上海,2023年9月12日——统一的可观测性和安全解决方案公司Dynatrace(纽交所代码:DT)日前宣布在《2023Gartner应用性能监控(ApplicationPerformanceMonitoring,APM)和可观测性魔力象限》报告中获评“领导者”。Gartner对19家厂商进行了评测,Dynatra

数据采集:数据挖掘的基础

⭐️⭐️⭐️⭐️⭐️欢迎来到我的博客⭐️⭐️⭐️⭐️⭐️🐴作者:秋无之地🐴简介:CSDN爬虫、后端、大数据领域创作者。目前从事python爬虫、后端和大数据等相关工作,主要擅长领域有:爬虫、后端、大数据开发、数据分析等。🐴欢迎小伙伴们点赞👍🏻、收藏⭐️、留言💬、关注🤝,关注必回关上一篇文章已经跟大家介绍过

简单讲讲在一台机器上用docker部署hadoop HDFS

为什么写这篇文章?老东西叫我用vmvare部署hadoop,我觉得这简直蠢毙了,让我们用docker和docker-compose来快速的过一遍如何使用docker-compose来部署简单的hadoop集群范例写在前面,一定要看我!!!还有注意!Hadoop中的主机名不能带-或者_注意了!一定注意存储空间大小,确保机

【C++】map与set的封装

文章目录前言正文1.类型的泛化2.仿函数3.迭代器3.1正向迭代器3.1.1++3.1.2--3.1.3*3.1.4->3.1.5!=完整版代码4.[](map)框架1.红黑树2.set3.map总结前言在学习了红黑树之后,我们便可以尝试初步的在红黑树的基础上封装出map与set,好了,话不多说,进入今天的学习吧!所需

AVL 树

文章目录一、AVL树的概念二、AVL树的实现1.AVL树的存储结构2.AVL树的插入一、AVL树的概念在二叉搜索树中,当我们连续插入有序的数据时,二叉搜索树可能会呈现单枝树的情况,此时二叉搜索树的查找效率为O(N)俄罗斯的两位数学家G.M.Adelson-Velsky和E.M.Landis发明了AVL树可以解决上述问题

深度学习之模型压缩、加速模型推理

简介当将一个机器学习模型部署到生产环境中时,通常需要满足一些在模型原型阶段没有考虑到的要求。例如,在生产中使用的模型将不得不处理来自不同用户的大量请求。因此,您将希望进行优化,以获得较低的延迟和/或吞吐量。延迟:是任务完成所需的时间,就像单击链接后加载网页所需的时间。它是开始某项任务和看到结果之间的等待时间。吞吐量:是

Linux内核源码分析 (B.9)深度解读 Linux 内核级通用内存池 —— kmalloc 体系

Linux内核源码分析(B.9)深度解读Linux内核级通用内存池——kmalloc体系文章目录Linux内核源码分析(B.9)深度解读Linux内核级通用内存池——kmalloc体系1\.kmalloc内存池中都有哪些尺寸的内存块2\.kmalloc内存池如何选取合适尺寸的内存块3\.kmalloc内存池的整体架构4

计网第五章(运输层)(六)(TCP可靠传输的实现)

目录一、基本概述二、具体实现1.前后沿:2.利用指针描述发送窗口的状态3.有差错情况之前在数据链路层时已经讨论过可靠传输(计网第三章(数据链路层)(二)(可靠传输)),也在其中提到过可靠传输并不局限于数据链路层。一、基本概述TCP通过以字节为单位的滑动窗口来实现可靠传输。可靠传输的概念在之前已经提到过,这里不再做赘述。

计算机视觉与深度学习-卷积神经网络-纹理表示&卷积神经网络-卷积神经网络-[北邮鲁鹏]

这里写目录标题参考文章全连接神经网络全连接神经网络的瓶颈全连接神经网络应用场景卷积神经网络卷积层(CONV)卷积核卷积操作卷积层设计卷积步长(stride)边界填充特征响应图组尺寸计算激活层池化层(POOL)池化操作定义池化操作作用池化层超参数常见池化操作全连接层(FC)样本增强翻转随机缩放&抠图色彩抖动参考文章计算机

热文推荐