C++11的半同步半异步线程池

2023-09-19 16:03:04

简介

半同步半异步线程池用的比较多,实现也比较简单。

其中同步层包括同步服务层和排队层,指的是将接收的任务排队,将所有的任务排队到一个队列中,等待处理;

异步层指多个线程处理任务,异步处理层从同步层取出任务,并发处理任务。

在这里插入图片描述

同步队列

同步队列属于同步层的内容,主要作用是保证队列中共享数据线程安全,同时也提供新增任务的接口,以及提供取任务的接口。

这里使用C++11的锁、条件变量、右值引用、std::move和std::forward来实现。

同步队列主要包括三个函数,Take、Add和Stop。

Take函数

这里实现重载了两个Take函数,可支持一次获取多个任务,或者一次获取一个任务。

//可一次性获取多个任务,放在list中,减少互斥锁阻塞时间
	void Take(std::list<T>& list)
	{
		std::unique_lock<std::mutex> locker(m_mutex);
		m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });
		if (m_needStop)
		{
			return;
		}
		list = std::move(m_queue);
		m_notFull.notify_one();
	}
	//获取单个任务
	void Take(T& t)
	{
		std::unique_lock<std::mutex> locker(m_mutex);
		m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });
		if (m_needStop)
		{
			return;
		}
		t = m_queue.front();
		m_queue.pop_front();
		m_notFull.notify_one();
	}

先创建一个unique *lock 获取 mutex,然后再通过条件变量 m_*notEmpty 来等待判断式。判断式由两个条件组成,一个是停止的标志,另一个是不为空的条件,当不满足任何一个条件时,条件变量会释放 mutex 并将线程置于 waiting 状态,等待其他线程调用 notify_one/notify all 将其唤醒;当满足任何一个条件时,则继续往下执行后面的逻辑,即将队列中的任务取出,并唤醒一个正处于等待状态的添加任务的线程去添加任务。当处于 waiting 状态的线程被 notify_one 或notify all 唤醒时,条件变量会先重新获取 mutex,然后再检查条件是否满足,如果满足,则往下执行,如果不满足,则释放 mutex 继续等待。

Add函数

Add 的过程和 Take 的过程是类似的,也是先获取 mutex,然后检查条件是否满足,不满足条件时,释放 mutex 继续等待,如果满足条件,则将新的任务插入到队列中,并唤醒取任务的线程去取数据。

template<typename F>
	void Add(F &&x)
	{
		std::unique_lock<std::mutex> locker(m_mutex);
		m_notFull.wait(locker, [this] {return m_needStop || NotFull(); });
		if (m_needStop)
			return;
		m_queue.emplace_back(std::forward<F>(x));
		m_notEmpty.notify_one();
	}

Stop函数

Stop 函数先获取 mutex,然后将停止标志置为 true。注意,为了保证线程安全,这里需要先获取 mutex,在将其标志置为 true 之后,再唤醒所有等待的线,因为等待的条件是m_needStop,并且满足条件,所以线程会继续往下执行。由于线程在 m_needStop 为 true 时会退出,所以所有的等待线程会相继退出。

另外一个值得注意的地方是,我们把 m notFull.notify_all0放到lock_guard 保护范围之外了,这里也可以将 m_notFull.notify all0)放到ockguard保护范围之内,放到外面是为了做一点优化。因为 notify_one 或 notify_all 会唤醒一个在等待的线程,线程被唤醒后会先获取 mutex 再检查条件是否满足,如果这时被 lock guard保护,被唤醒的线程则需要 lock guard 析构释放 mutex 才能获取(即stop函数执行完了才释放)。如果在 lock_guard 之外notify_one 或notify_all,被唤醒的线程获取锁的时候不需要等待 lock_guard 释放锁,性能会好一点,所以在执行 notify_one或notify_all 时不需要加锁保护。

void Stop()
	{
		{
			std::lock_guard<std::mutex> locker(m_mutex);
			m_needStop = true;
		}
		m_notFull.notify_all();
		m_notEmpty.notify_all();
	}

SyncQueue完整代码

”SyncQueue.h”

同步队列整体代码:

#pragma once
#include <iostream>
#include <list>
#include <mutex>

using namespace std;

template<typename T>
class SyncQueue
{
public:
	SyncQueue(int maxSize) :m_maxSize(maxSize), m_needStop(false)
	{
	}
	void Put(const T &x)
	{
		Add(x);
	}

	void Put(T &&x)
	{
		Add(std::forward<T>(x));
	}
	//可一次性获取多个任务,放在list中,减少互斥锁阻塞时间
	void Take(std::list<T>& list)
	{
		std::unique_lock<std::mutex> locker(m_mutex);
		m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });
		if (m_needStop)
		{
			return;
		}
		list = std::move(m_queue);
		m_notFull.notify_one();
	}
	//获取单个任务
	void Take(T& t)
	{
		std::unique_lock<std::mutex> locker(m_mutex);
		m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });
		if (m_needStop)
		{
			return;
		}
		t = m_queue.front();
		m_queue.pop_front();
		m_notFull.notify_one();
	}
	void Stop()
	{
		{
			std::lock_guard<std::mutex> locker(m_mutex);
			m_needStop = true;
		}
		m_notFull.notify_all();
		m_notEmpty.notify_all();
	}
	bool Empty()
	{
		std::lock_guard<std::mutex> locker(m_mutex);
		return m_queue.empty();
	}
	bool Full()
	{
		std::lock_guard<std::mutex> locker(m_mutex);
		return m_queue.size() == m_maxSize;
	}
	//可以获取任务数量
	int Count()
	{
		return m_queue.size();
	}
private:
	bool NotFull() const
	{
		bool full = m_queue.size() >= m_maxSize;
		if (full)
		{
			cout << "缓冲区满了,需要等待。。。" << endl;
		}
		return !full;
	}
	bool NotEmpty() const
	{
		bool empty = m_queue.empty();
		if (empty)
		{
			cout << "缓冲区空了,需要等待。。。,异步层的线程ID:" << this_thread::get_id() << endl;
		}
		return !empty;
	}
	template<typename F>
	void Add(F &&x)
	{
		std::unique_lock<std::mutex> locker(m_mutex);
		m_notFull.wait(locker, [this] {return m_needStop || NotFull(); });
		if (m_needStop)
			return;
		m_queue.emplace_back(std::forward<F>(x));
		m_notEmpty.notify_one();
	}
private:
	std::list<T> m_queue; //缓冲区
	std::mutex m_mutex; //互斥量
	std::condition_variable m_notEmpty; //不为空的条件变量
	std::condition_variable m_notFull; //没有满的条件变量
	int m_maxSize; //同步队列最大的size
	bool m_needStop; //停止的标志
};

线程池

“ThreadPool.h”

线程池ThreadPool有3个成员变量,一个是线程组,这个线程组中的线程是预先创建的,应该创建多少个线程由外面传人,一般建议创建 CPU 核数的线程以达到最优的效率,线程组循环从同步队列中取出任务并执行,如果线程池为空,线程组将处于等待状态,等待任务的到来。

另一个成员变量是同步队列,它不仅用来做线程同步,还用来限制同步队列的上限,这个上限也是由使用者设置的。

第三个成员变量是用来停止线程池的,为了保证线程安全,我们用到了原子变量 atomic bool。下一节中将展示使用这个半同步半异步的线程池的实例。

#include<list>
#include<thread>
#include<functional>
#include<memory>
#include<atomic>
#include "SyncQueue.h"

const int MaxTaskCount = 100;
class ThreadPool
{
public:
	using Task = std::function<void()>;
	ThreadPool(int numThreads = std::thread::hardware_concurrency()) : m_queue(MaxTaskCount)
	{
		Start(numThreads);
	}
	~ThreadPool(void)
	{
		Stop();
	}
	void Stop()
	{
		//保证多线程情况下只调用一次 StopThreadGroup
		std::call_once(m_flag, [this] {StopThreadGroup(); });
	}
	//可输入右值,例如lambda表达式
	void AddTask(Task&& task)
	{
		m_queue.Put(std::forward<Task>(task));
	}
	void AddTask(const Task& task)
	{
		m_queue.Put(task);
	}
	void Start(int numThreads)
	{
		m_running = true;
		//创建线程组
		for (int i = 0; i < numThreads; ++i)
		{
			m_threadgroup.emplace_back(std::make_shared<std::thread>(&ThreadPool::RunInThread, this));
		}
	}
private:
	
	void RunInThread()
	{
		while (m_running)
		{
			//取任务分别执行
			std::list<Task> list;
			m_queue.Take(list);

			for (auto& task : list)
			{
				if (!m_running)
					return;
				task();
			}
		}
	}
	void StopThreadGroup()
	{
		m_queue.Stop(); //让同步队列中的线程停止
		m_running = false; //置为false,让内部线程跳出循环并退出

		for (auto thread : m_threadgroup)
		{
			if (thread)
				thread->join();
		}
		m_threadgroup.clear();

	}
	std::list<std::shared_ptr<std::thread>> m_threadgroup; //处理任务的线程组
	SyncQueue<Task> m_queue; //同步队列
	atomic_bool m_running; //是否停止的标志
	std::once_flag m_flag;
};

主函数测试

#include <iostream>
#include "ThreadPool.h"
using namespace std;

void TestThdPool()
{
	ThreadPool pool(2);//创建一个2个线程的线程池

	//创建一个线程来添加10个任务1
	std::thread thd1([&pool] {
		for (int i = 0; i < 10; i++)
		{
			auto thdId = this_thread::get_id();
			pool.AddTask([thdId] {//添加任务可以使用lambda表达式,代码中实现了右值作为参数输入
				cout << "同步线程1的线程ID:" << thdId << endl;
			});
		}
	});
	//创建一个线程来添加20个任务2
	std::thread thd2([&pool] {
		for (int i = 0; i < 20; i++)
		{
			auto thdId = this_thread::get_id();
			pool.AddTask([thdId] {
				cout << "同步线程2的线程ID:" << thdId << endl;
			});
		}
	});

	this_thread::sleep_for(std::chrono::seconds(2));
	getchar();
	pool.Stop();
	thd1.join();
	thd2.join();
}
int main()
{
	TestThdPool();
	return 0;
}

运行结果:
在这里插入图片描述

更多推荐

汽车自适应巡航系统控制策略研究

摘要自适应巡航控制系统是高级驾驶辅助系统的重要组成部分,可以有效地减轻驾驶员操作负担,提高汽车的舒适性、燃油经济性以及道路通行率,具有重要的工程应用价值。控制策略是自适应巡航系统研究的核心内容,因此针对自适应巡航系统控制策略的研究具有重要意义。本文首先针对汽车自适应巡航控制系统的功能需求进行了分析,对固定车间时距算法和

中国汽车供应商远赴德国,中国智驾方案能否远渡重洋?

作者|Amy编辑|德新今年的上海车展,中国智能汽车的进步有目共睹,吸引了大批外企高管和研发人员的关注,甚至引发了海外车企一系列的动作和调整。而在刚刚结束的慕尼黑车展,中国车企及汽车供应链把「肌肉」秀到了现代汽车起源地——德国。此次在慕尼黑举办的IAAMOBILITY是世界五大车展之一,也是欧洲规模最大的国际性车展。这里

基于SSM+Vue的汽车售票网站的设计与实现

末尾获取源码开发语言:JavaJava开发工具:JDK1.8后端框架:SSM前端:采用Vue技术开发数据库:MySQL5.7和Navicat管理工具结合服务器:Tomcat8.5开发软件:IDEA/Eclipse是否Maven项目:是目录一、项目简介二、系统设计设计原则三、系统项目截图客运班次管理会员充值管理购票记录管

笔记本、台式机、平板二合一?Mac、Win、Linux?

电脑选型根据日常使用的需求进行选择,笔记本、台式机、平板和二合一电脑我都有尝试过,目前而言,最适合我个人的是笔记本。笔记本如果你犹豫笔记本和台式机,选择笔记本;如果你犹豫笔记本和二合一电脑,选择笔记本。笔记本电脑比较中规中矩,是绝大多数场景下都可以作为第一或者备选方案的选择。在选择笔记本的时候,需要重点考虑如下几个因素

MySQL MHA

目录概述MHAMHA的组成MHA的特点搭建MySQLMHA配置主从复制1.关防火墙,安全机制2.修改Master、Slave1、Slave2节点的主机名3.在Master、Slave1、Slave2添加主机映射关系4.修改Master、Slave1、Slave2节点的Mysql主配置文件/etc/my.cnf5.在Ma

Mac 安装ZooKeeper+kafka基本使用

为什么Kafka依赖ZooKeeper?下面ZooKeeper基本介绍:1、基本功能ZooKeeper为分布式系统提供了一种配置管理的服务:集中管理配置,即将全局配置信息保存在ZooKeeper服务中,方便进行修改和管理,省去了手动拷贝配置的过程,同时还保证了可靠和一致性。2、命名服务在分布式系统中,经常需要对应用或者

大数据驱动业务增长:数据分析和洞察力的新纪元

文章目录引言大数据分析的重要性1.数据驱动的决策2.洞察力和预测3.个性化服务大数据分析的关键组成部分1.数据收集2.数据存储3.数据清洗和预处理4.数据分析和建模5.数据可视化数据驱动业务增长的案例1.亚马逊的个性化推荐2.谷歌的广告优化3.零售业的库存管理数据驱动文化的建立1.数据教育和培训2.数据可访问性3.数据

log4j2原理分析及漏洞复现CVE-2021-44228

文章目录log4j2原理分析及漏洞复现0x01log4j2简介Log4j2特点Log4j2组件的应用0x02CVE-2021-44228漏洞简介:漏洞适用版本漏洞原理lookup功能jndi解析器jndi是什么ldap服务RMI0x03攻击过程0x04漏洞复现漏洞环境1.访问靶机2.dns回显验证3.将bash反弹sh

深入解析:自己实现 MyBatis 底层机制的任务阶段1 - 读取配置文件与建立数据库连接

😀前言.本文将深入探讨如何在自己实现MyBatis底层机制的过程中完成第一个任务阶段,即读取配置文件并建立数据库连接。这一关键步骤是了解MyBatis内部工作原理的第一步,也是自定义MyBatis底层机制的基础。.在任务阶段1中,我们将从头开始创建必要的配置文件和Java类,逐步解析配置文件,获取数据库连接,以为后续

单片机论文参考:5、基于单片机的自动打铃系统

摘要本次设计中的LED数码管电子时钟电路采用24小时制记时方式,本次设计采用AT89C51单片机的扩展芯片和6个PNP三极管做驱动,由三块LED数码管构成的显示系统,与传统的基于8/16位普通单片机的LED显示系统相比较,本系统在不显著地增加系统成本的情况下,可支持更多的LED数码管稳定显示。设计采用AT98C51单片

详解SpringBoot的常用注解

详解SpringBoot的常用注解在SpringBoot中,注解是一种非常重要的编程方式,它可以简化代码,提高开发效率。本文将详细介绍SpringBoot中的常用注解,以及它们的使用方法和场景。1.@SpringBootApplication1.1概述@SpringBootApplication是SpringBoot应

热文推荐