C++多线程的用法(包含线程池小项目)

2023-09-13 11:45:04

一些小tips: 

编译命令如下:

 g++ 7.thread_pool.cpp -lpthread

查看运行时间:

time ./a.out

 获得本进程的进程id:

this_thread::get_id()

 需要引入的库函数有:

#include<thread> // 引入线程库
#include<mutex> // 加入锁机制需要引入库函数mutex
#include<condition_variable> // 引入信号量机制

定义信号量、锁:

condition_variable m_cond
std::mutex m_mutex;

所谓的多线程只不过就是指定的某一个函数为入口函数,的另外一套执行流程。

什么是临界资源?多线程情况下,大家都能访问到的资源。

进程是资源分配的最基本单位,线程是进程中的概念。

线程也是操作系统分配的一批资源。一个线程的栈所占用的空间有多大?——8M。查看命令:

ulimit -a

简单用法:

#include<iostream>
#include<thread>
using namespace std;

#define BEGINS(x) namespace x{
#define ENDS(x) }

BEGINS(thread_usage)
void func() {
    cout << "hello wolrd" << endl;
    return ;
}
int main() {
    thread t1(func);  // t1已经开始运行了
    t1.join();        // 等待t1线程结束
    return 0;
}
ENDS(thread_usage)

int main() {
    thread_usage::main();

    return 0;
}

那么如何给入口函数传参呢? 在C++中就很简单:

void print(int a, int b) {
    cout << a << " " << b << endl;
    return ;
}
int main() {
    thread t2(print, 3, 4);
    t2.join();
    return 0;
}

多线程封装思维:在相应的功能函数内部,不要去访问全局变量,类似于一个原子操作的功能。本身就支持多线程并行。

多线程程序设计:线程功能函数和入口函数要分开。(高内聚低耦合)

能不用锁就不用锁,因为这个锁机制会给程序运行效率带来极大的影响。

实现素数统计的功能

不加锁版:

#include<iostream>
#include<thread>
#include<cmath>
#include<mutex>
using namespace std;

#define BEGINS(x) namespace x{
#define ENDS(x) }

BEGINS(is_prime)
bool is_prime(int x) {
    for (int i = 2, I = sqrt(x); i <= I; i++) {
        if (x % i == 0) return false;
    }
    return true;
}
// 多线程——功能函数 
int prime_count(int l, int r) {
    // 从l到r范围内素数的数量
    int ans = 0;
    for (int i = l; i <= r; i++) {
        ans += is_prime(i);
    }
    return ans;
}
// 多线程——入口函数 
void worker(int l, int r, int &ret) {
    cout << this_thread::get_id() << "begin" << endl;
    ret = prime_count(l, r);
    cout << this_thread::get_id() << "dnoe" << endl;
}
int main() {
    #define batch 500000
    thread *t[10];
    int ret[10];
    for (int i = 0, j = 1; i < 10; i++, j += batch) {
        t[i] = new thread(worker, j, j + batch - 1, ref(ret[i]));
    }
    for (auto x : t) x->join();
    int ans = 0;
    for (auto x : ret) ans += x;
    for (auto x : t) delete x;
    cout << ans << endl;
    #undef batch
    return 0;
}
ENDS(is_prime)

int main() {
    // thread_usage::main();
    is_prime::main();
    return 0;
}

加锁版:

#include<iostream>
#include<thread>
#include<cmath>
#include<mutex>
using namespace std;

#define BEGINS(x) namespace x{
#define ENDS(x) }
BEGINS(prime_count2) 
int ans = 0;
std::mutex m_mutex;
bool is_prime(int x) {
    for (int i = 2, I = sqrt(x); i <= I; i++) {
        if (x % i == 0) return false;
    }
    return true;
}
void prime_count(int l, int r) {
    cout << this_thread::get_id() << " begin\n";
    for (int i = l; i <= r; i++) {
        std::unique_lock<std::mutex> lock(m_mutex); // 临界区
        ans += is_prime(i);
        lock.unlock();    
    }
    cout << this_thread::get_id() << " done\n";
    return ;
}
int main() {
    #define batch 500000
    thread *t[10];
    for (int i = 0, j = 1; i < 10; i++, j += batch) {
        t[i] = new thread(prime_count, j, j + batch - 1);
    }
    for (auto x : t) x->join();
    for (auto x : t) delete x;
    cout << ans << endl;
    #undef batch
    return 0;
}
ENDS(prime_count2)

int main() {
    prime_count2::main();
    return 0;
}

为什么不用++而是用+=

因为后者是原子操作,而前者不是,在多线程情况下可能存在覆盖写的问题。

__sync_fetch_and_add

这个函数也是原子操作。

#include<iostream>
#include<thread>
#include<cmath>
#include<mutex>
using namespace std;

#define BEGINS(x) namespace x{
#define ENDS(x) }

BEGINS(thread3)
int ans = 0;
bool is_prime(int x) {
    for (int i = 2, I = sqrt(x); i <= I; i++) {
        if (x % i == 0) return false;
    }
    return true;
}
void prime_count(int l, int r) {
    cout << this_thread::get_id() << "begin\n";
    for (int i = l; i <= r; i++) {
        int ret = is_prime(i);
        __sync_fetch_and_add(&ans, ret);
    }
    cout << this_thread::get_id() << "done\n";
}
int main() {
    #define batch 500000
    thread *t[10];
    for (int i = 0, j = 1; i < 10; i++, j += batch) {
        t[i] = new thread(prime_count, j, j + batch - 1);
    }
    for (auto x : t) x->join();
    for (auto x : t) delete x;
    cout << ans << endl;
    #undef batch 
    return 0;
}
ENDS(thread3)

int main() {
    thread3::main();
    return 0;
}

根据你提供的输出结果 `./a.out 2.06s user 0.02s system 385% cpu 0.538 total`,可以对各项含义进行解释:

  • - `2.06s user`:表示程序在用户态消耗的CPU时间为2.06秒。这是程序执行期间用于执行用户代码的时间。
  • - `0.02s system`:表示程序在内核态消耗的CPU时间为0.02秒。这是程序执行期间用于执行内核代码的时间。
  • - `385% cpu`:表示程序使用了385%的CPU资源。这个值超过100%是因为程序可能在多个CPU核心上并行执行。所以 总时间 < 用户态+内核态时间 是非常正常的。
  • - `0.538 total`:表示程序从开始执行到结束的总时间为0.538秒。这是包括了实际执行时间、等待时间和其他非CPU消耗的时间。

综上所述,这个输出结果告诉你程序的执行时间情况。用户态和内核态的CPU时间可以帮助你了解程序在不同执行环境中的资源消耗情况。CPU使用率告诉你程序在执行期间使用了多少CPU资源。总时间则是程序的整体执行时间。

线程池的实现

我们希望申请内存的动作是可控的。线程作为一种内存资源,在通常的设计模式下,申请线程资源并不可控。

解决办法就是线程池。在线程池内部,线程数量是可控的。把计算任务打包,扔到一个任务队列中。

什么是计算任务?——分为过程(函数方法)和数据(函数参数),如何进行打包——bind()方法。

线程池解决了传统的多线程程序设计中,面对不同任务我们需要实现不同程序的麻烦:而是直接往里塞任务就行了。这样就实现了资源的有效利用和管控。

m_cond.notify_one() 对于这个进程来说,是否同时也解锁了互斥资源?

是的,m_cond.notify_one()会解锁互斥资源。在调用m_cond.notify_one()之前,通常会先调用m_mutex.lock()来锁定互斥资源,然后在适当的时候调用m_cond.notify_one()来唤醒等待该条件的线程。在唤醒线程后,该线程会重新尝试获取互斥资源的锁,而其他等待的线程会继续等待。所以,m_cond.notify_one()不仅唤醒等待的线程,还会解锁互斥资源,使得等待的线程有机会获取互斥资源的锁。

子进程的中wait函数对互斥量进行解锁,同时线程进入阻塞或者等待状态。

/*************************************************************************
        > File Name: threadpool.cpp
        > Author: jby
        > Mail: 
        > Created Time: Wed 13 Sep 2023 08:48:23 AM CST
 ************************************************************************/

#include<iostream>
#include<cmath>
#include<vector>
#include<unordered_map>
#include<queue>
#include<mutex>
#include<thread>
#include<condition_variable>
#include<functional>
using namespace std;

#define BEGINS(x) namespace x {
#define ENDS(x) }

BEGINS(thread_pool_test)
class Task {
public:
    template<typename FUNC_T, typename ...ARGS>
    Task(FUNC_T func, ARGS ...args) {
        this->func = bind(func, forward<ARGS>(args)...);
    }
    void run() {
        func();
        return ;
    }
private:
    function<void()> func; // 任意函数
};
class ThreadPool {
public:
    ThreadPool(int n = 1) : thread_size(n), threads(n), starting(false) {
        this->start();
        return ;
    }
    void worker() {
        auto id = this_thread::get_id(); // 获得本进程号
        running[id] = true;
        while (running[id]) {
            // 取任务
            Task *t = get_task(); 
            t->run();
            delete t;
        }
        return ;
    }
    void start() {
        if (starting == true) return ; // 如果已经开始了就不用启动了
        for (int i = 0; i < thread_size; i++) {
            threads[i] = new thread(&ThreadPool::worker, this);
        }
        starting = true;
        return ;
    }
    template<typename FUNC_T, typename ...ARGS>
    void add_task(FUNC_T func, ARGS ...args) {
        unique_lock<mutex> lock(m_mutex); 
        tasks.push(new Task(func, forward<ARGS>(args)...)); // 任务池相当于临界资源
        m_cond.notify_one(); // 生产者消费者模型
        return ;
    }
    void stop() {
        if (starting == false) return ; // 如果已经关了就不用再关了
        for (int i = 0; i < threads.size(); i++) { // 往队列末尾投递毒药任务
            this->add_task(&ThreadPool::stop_runnnig, this);
        }
        for (int i = 0; i < threads.size(); i++) {
            threads[i]->join();
        }
        for (int i = 0; i < threads.size(); i++) {
            delete threads[i]; // 释放那片进程剩余的空间
            threads[i] = nullptr; // 进程指针指向空
        }
        starting = false;
        return ;
    }
    ~ThreadPool() {
        this->stop();
        while (!tasks.empty()) { // 如果任务队列里还有任务没执行完,全部丢弃
            delete tasks.front();
            tasks.pop();
        }
        return ;
    }
private:
    bool starting;
    int thread_size;
    Task *get_task() {
        unique_lock<mutex> lock(m_mutex);
        while (tasks.empty()) m_cond.wait(lock); // 生产者消费者模型 //子进程的中wait函数对互斥量进行解锁,同时线程进入阻塞或者等待状态。
        Task *t = tasks.front();
        tasks.pop();
        return t;
    }
    std::mutex m_mutex;
    std::condition_variable m_cond;
    vector<thread *> threads; // 线程池子
    unordered_map<decltype(this_thread::get_id()), bool> running; // 进程号到运行状态的哈希映射
    queue<Task *> tasks;  // 任务队列
    void stop_runnnig() { // 毒药任务
        auto id = this_thread::get_id();
        running[id] = false;
        return ;
    }
};
bool is_prime(int x) {
    for (int i = 2, I = sqrt(x); i <= I; i++) {
        if (x % i == 0) return false;
    }
    return true;
}
int prime_count(int l, int r) {
    int ans = 0;
    for (int i = l; i <= r; i++) {
        ans += is_prime(i);
    }
    return ans;
}
void worker(int l, int r, int &ret) {
    cout << this_thread::get_id() << "begin\n";
    ret = prime_count(l, r);
    cout << this_thread::get_id() << "done\n";
    return ;
}
int main() {
    #define batch 500000
    ThreadPool tp(5); // 五个任务的窗口队列
    int ret[10];
    for (int i = 0, j = 1; i < 10; i++, j += batch) {
        tp.add_task(worker, j, j + batch - 1, ref(ret[i]));
    }
    tp.stop();
    int ans = 0;
    for (auto x : ret) ans += x;
    cout << ans << endl;
    #undef batch
    return 0;
}
ENDS(thread_pool_test)

int main() {
    thread_pool_test::main();
    return 0;
}

更多推荐

Wi-Fi7将带来前所未有的快捷、稳定的互联网,更快的传输速度

随着科技的飞速发展,无线连接技术也日渐成熟与先进。Wi-Fi,作为我们日常生活和工作中不可或缺的一部分,也正在迎来技术的革新。我们将有机会见识到新一代的Wi-Fi技术--Wi-Fi7,它将带来前所未有的快捷、稳定的互联网体验。首先,Wi-Fi7将会为我们带来更快的传输速度。预计新一代Wi-Fi(Wi-Fi7)的最高理论

快速入门:如何使用HTTP代理进行网络请求

今天我要和大家分享如何使用HTTP代理进行网络请求的快速入门指南。如果你想了解如何通过代理服务器发送和接收网络请求,那么这篇文章将为你提供一个简单而全面的指南。第一部分:HTTP代理的基础知识HTTP代理服务器充当了客户端和目标服务器之间的中间人角色。当你发送网络请求时,请求首先发送到代理服务器,然后由代理服务器将请求

基于Xml方法的Bean的配置-实例化Bean的方法-构造方法

SpringBean的配置详解Bean的实例化配置Spring的实例化方法主要由以下两种构造方法实例化底层通过构造方法对bean进行实例化构造方法实例化bean又分为无参方法实例化和有参方法实例化,在Spring中配置的<bean>几乎都是无参构造该方式,默认是无参构造方法,在此处不赘述。下面讲解有参构造方法实例化Be

Mybatis的mapper.xml批量插入、修改sql

今天要有个功能,要进行一批数据的插入和修改,为了不频繁调用数据库,所以想到了批量插入和修改,因为从毕业后,就没写过批量插入和批量修改,所以在这里记录一下,避免后续再遇到忘记怎么写了批量插入(传入的参数是List<实体>list):<insertid="insertList"keyColumn="id"keyProper

【Spring Boot】专栏合集,快速入门大全

作者简介前言作者之前写过一个SpringBoot的系列,包含自动装配原理、MVC、安全、监控、集成数据库、集成Redis、日志、定时任务、异步任务等内容,本文将会一文拉通来总结这所有内容,不骗人,一文快速入门SpringBoot。专栏地址:https://blog.csdn.net/joker_zjn/category

优化系统报错提示信息,提高人机交互(一)

1、常规报错及处理packagecom.example.demo.controller;importcom.example.demo.service.IDemoService;importlombok.AllArgsConstructor;importlombok.extern.slf4j.Slf4j;importor

XAPI项目架构:应对第三方签名认证的设计与调整

《XAPI项目》:GitHub仓库(勿打🚫小破站一个)该项目是基于鱼皮的《API开发平台》项目的需求和架构设计上进行Golang版本开发的。这篇文章,主要内容是记录在《XAPI项目》的原架构上,为了应对第三方签名认证的设计,而对原架构的调整修改。目录原设计架构遇到的问题:当【第三方项目】需要验证自己项目的签名认证时,

“全景江西·南昌专场”数字技术应用场景发布会 | 万广明市长莅临拓世集团展位,一览AIGC科技魅力

随着数字技术的迅猛发展,传统产业正在发生深刻的变革,新兴产业蓬勃兴起。但要想实现数字经济超常规发展,就要在数字产业化上培育新优势,大力实施数字经济核心产业提速行动,加快推进“一核三基地”建设。在这个数字经济时代,创新和科技成为了推动社会进步的关键力量。数字经济的崛起也意味着更多的机会和挑战。它不仅为企业提供了更多创新和

【微信小程序】文章样式,标题样式,及设置背景~

|background-size设置背景图片大小。图片可以保有其原有的尺寸,或者拉伸到新的尺寸,或者在保持其原有比例的同时缩放到元素的可用空间的尺寸。|background-size:cover;适配屏幕大小文章样式,标题样式,及设置背景~index.wxml<viewclass="about"><viewclass=

Fork() 函数:“父” 与 “子” 进程的交互(进程的创建)

阅读导航前言一、fork函数初识1.基本概念2.fork函数返回值二、fork函数的写时拷贝三、总结温馨提示前言前面我们讲了C语言的基础知识,也了解了一些数据结构,并且讲了有关C++的一些知识,也学习了一些Linux的基本操作,也了解并学习了有关Linux开发工具vim、gcc/g++使用、yum工具以及git命令行提

蓝桥杯2023年第十四届省赛真题-更小的数--题解

目录蓝桥杯2023年第十四届省赛真题-更小的数题目描述输入格式输出格式样例输入样例输出提示【思路解析】【代码实现】蓝桥杯2023年第十四届省赛真题-更小的数时间限制:3s内存限制:320MB提交:895解决:303题目描述小蓝有一个长度均为n且仅由数字字符0∼9组成的字符串,下标从0到n−1,你可以将其视作是一个具有n

热文推荐