.NET的CancellationTokenSource和ManualResetEvent结合使用

2023-09-21 16:14:50

一、CancellationTokenSource 是 C# 中的一个类,用于取消异步操作。它提供了一种机制,可以取消一个或多个异步操作。

CancellationTokenSource 包含以下主要方法:

  1. Cancel(): 该方法会取消所有挂起的操作,并引发 OperationCanceledException 异常。如果在调用 Cancel() 方法时没有挂起的操作,则没有任何效果。
  2. Cancel(boolean): 这个重载方法允许您取消一个或多个特定的挂起操作。如果Cancel(true)被调用,那么所有挂起的操作都将被取消,并引发 OperationCanceledException 异常。如果Cancel(false)被调用,那么不会有任何操作被取消。
  3. IsCancellationRequested: 这个属性会返回一个 bool 值,表明是否已经请求取消操作。如果已经请求取消操作,则返回 true;否则返回 false。
  4. ThrowIfCancellationRequested(): 这个方法会检查是否已经请求取消操作。如果是这样,那么会引发 OperationCanceledException 异常。

CancellationTokenSource 通常与 CancellationToken 一起使用。CancellationToken 是一个结构,它提供了一种机制,可以在异步操作中请求取消操作。CancellationTokenSource 可以生成一个 CancellationToken,这个 token 可以传递给异步操作,以便在需要取消操作时请求取消。

以下是一个简单的示例,演示如何使用 CancellationTokenSource 和 CancellationToken 来取消一个异步操作:

CancellationTokenSource cts = new CancellationTokenSource();  
CancellationToken token = cts.Token;  
  
Task.Run(() =>   
{  
    while (true)  
    {  
        token.ThrowIfCancellationRequested();  
        // 异步操作代码...  
    }  
}, cts.Token);  
  
// 在需要取消操作时调用以下方法:  
cts.Cancel();

在这个示例中,我们创建了一个 CancellationTokenSource 对象 cts 和一个与之关联的 CancellationToken 对象 token。我们将 CancellationToken 对象传递给 Task.Run 方法,以便在异步操作中检查是否请求了取消操作。如果取消操作被请求,则会引发 OperationCanceledException 异常,从而取消异步操作。在需要取消操作时,我们调用 cts.Cancel() 方法来请求取消操作。 

 二、ManualResetEvent是一种同步对象,用于在多线程编程中控制线程之间的通信。它允许一个或多个线程等待某个事件发生,然后在事件发生后继续执行。

使用ManualResetEvent时,通常需要以下步骤:

  1. 创建一个ManualResetEvent对象。可以通过构造函数来指定初始状态,如果初始状态为真,则线程可以继续执行;否则,线程将被阻塞直到事件被触发。
  2. 在需要等待某个事件发生的线程中,调用ManualResetEvent的WaitOne()方法。这将会阻塞当前线程,直到另一个线程调用ManualResetEvent的Set()方法触发事件。
  3. 在事件发生后,另一个线程可以调用ManualResetEvent的Set()方法来触发事件,这将使得等待的线程可以继续执行。
  4. 如果需要手动重置ManualResetEvent的状态,可以调用Reset()方法。这样,任何等待的线程将会继续被阻塞,直到再次调用Set()方法触发事件。

总之,ManualResetEvent可以帮助线程间进行同步,使得线程可以等待另一个线程完成某个任务后才继续执行。

3、通用类示例

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Runtime.CompilerServices;

namespace Common
{
    /// <summary>
    /// 线程通用类
    /// </summary>
    public class TaskCommand
    {
        //用于取消异步操作
        CancellationTokenSource tokenSource = new CancellationTokenSource();
        //用于在多线程编程中控制线程之间的通信
        ManualResetEvent resetEvent = new ManualResetEvent(true);
        Thread thread = null;

        /// <summary>
        /// 队列对象
        /// </summary>
        private Queue<MeterAsyncQueue> AsyncQueues { get; set; }

        /// <summary>
        /// 并发任务数
        /// </summary>
        private int ParallelTaskCount { get; set; }

        /// <summary>
        /// 并行任务集合
        /// </summary>
        private List<Task> ParallelTasks { get; set; }

        /// <summary>
        /// 是否首次执行任务
        /// </summary>
        private bool IsInitTask { get; set; }

        /// <summary>
        /// 锁
        /// </summary>
        private readonly object _objLock = new object();

        /// <summary>
        /// 获取队列锁
        /// </summary>
        private readonly object _queueLock = new object();

        /// <summary>
        /// 开始任务
        /// </summary>
        public void StartData()
        {
            tokenSource = new CancellationTokenSource();
            resetEvent = new ManualResetEvent(true);

            List<int> Ids = new List<int>();
            for (int i = 0; i < 10000; i++)
            {
                Ids.Add(i);
            }
            thread = new Thread(new ThreadStart(() => StartTask(Ids)));
            thread.Start();
        }

        /// <summary>
        /// 暂停任务
        /// </summary>
        public void OutData()
        {
            //task暂停
            resetEvent.Reset();
        }

        /// <summary>
        /// 继续任务
        /// </summary>
        public void ContinueData()
        {
            //task继续
            resetEvent.Set();
        }

        /// <summary>
        /// 取消任务
        /// </summary>
        public void Cancel()
        {
            //释放对象
            resetEvent.Dispose();
            foreach (var CurrentTask in ParallelTasks)
            {
                if (CurrentTask != null)
                {
                    if (CurrentTask.Status == TaskStatus.Running) { }
                    {
                        //终止task线程
                        tokenSource.Cancel();
                    }
                }
            }
            thread.Abort();
        }

        /// <summary>
        /// 执行数据
        /// </summary>
        /// <param name="Index"></param>
        public void Execute(int Index)
        {
            //阻止当前线程
            resetEvent.WaitOne();

            Console.WriteLine("当前第" + Index + "个线程");

            Thread.Sleep(1000);
        }

        //控制线程并行数量
        public void StartTask(List<int> Ids)
        {
            IsInitTask = true;
            ParallelTasks = new List<Task>();
            AsyncQueues = new Queue<MeterAsyncQueue>();
            //获取并发数
            ParallelTaskCount = 5;
            //初始化异步队列
            InitAsyncQueue(Ids);
            //开始执行队列任务
            HandlingTask();

            Task.WaitAll(new Task[] { Task.WhenAll(ParallelTasks.ToArray()) });
        }

        /// <summary>
        /// 初始化异步队列
        /// </summary>
        private void InitAsyncQueue(List<int> Ids)
        {
            foreach (var item in Ids)
            {
                MeterInfo info = new MeterInfo();
                info.Id = item;
                AsyncQueues.Enqueue(new MeterAsyncQueue()
                {
                    MeterInfoTask = info
                });
            }
        }

        /// <summary>
        /// 开始执行队列任务
        /// </summary>
        private void HandlingTask()
        {
            lock (_objLock)
            {
                if (AsyncQueues.Count <= 0)
                {
                    return;
                }

                var loopCount = GetAvailableTaskCount();
                //并发处理队列
                for (int i = 0; i < loopCount; i++)
                {
                    HandlingQueue();
                }
                IsInitTask = false;
            }
        }

        /// <summary>
        /// 处理队列
        /// </summary>
        private void HandlingQueue()
        {
            CancellationToken token = tokenSource.Token;
            lock (_queueLock)
            {
                if (AsyncQueues.Count > 0)
                {
                    var asyncQueue = AsyncQueues.Dequeue();

                    if (asyncQueue == null) return;
                    var task = Task.Factory.StartNew(() =>
                    {
                        if (token.IsCancellationRequested)
                        {
                            return;
                        }
                        //阻止当前线程
                        resetEvent.WaitOne();
                        //执行任务
                        Execute(asyncQueue.MeterInfoTask.Id);

                    }, token).ContinueWith(t =>
                    {
                        HandlingTask();
                    }, TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.ExecuteSynchronously);
                    ParallelTasks.Add(task);
                }
            }
        }

        /// <summary>
        /// 获取当前有效并行的任务数
        /// </summary>
        /// <returns></returns>
        [MethodImpl(MethodImplOptions.Synchronized)]
        private int GetAvailableTaskCount()
        {
            if (IsInitTask)
                return ParallelTaskCount;
            return 1;
        }
    }

    // <summary>
    /// 并发对象
    /// </summary>
    public class MeterAsyncQueue
    {
        public MeterAsyncQueue()
        {
            MeterInfoTask = new MeterInfo();
        }
 
        public MeterInfo MeterInfoTask { get; set; }
    }

    public class MeterInfo
    {
        public MeterInfo()
        {
 
        }
        public int Id { get; set; }
    }
}

更多推荐

数据结构前瞻

集合框架JAVA的集合框架是定义在java.util包下的一组接口和实现类,用于将多个元素置于一个单元中,对这些元素进行快速,便捷的存储,减速和管理,即增删查改下面的格子,黄色代表接口,蓝色代表抽象类,棕色代表类是动态数组(顺序表)优先级队列是双向列表底层就是一颗红黑树重要的四个接口算法效率时间复杂度即算法中基本操作的

2022年贵州省职业院校技能大赛中职组网络安全赛项规程

2022年贵州省职业院校技能大赛中职组网络安全赛项规程一、赛项名称赛项名称:网络安全赛项归属:信息技术类二、竞赛目的为检验中职学校网络信息安全人才培养成效,促进网络信息安全专业教学改革,培养大批既满足国家网络安全战略需要有具备世界水平的优秀技能人才,在社会上营造“技能改变命运、匠心成就人生”的崇尚技能的氛围,国家教育部

深入理解WPF中MVVM的设计思想

近些年来,随着WPF在生产,制造,工业控制等领域应用越来越广发,很多企业对WPF开发的需求也逐渐增多,使得很多人看到潜在机会,不断从Web,WinForm开发转向了WPF开发,但是WPF开发也有很多新的概念及设计思想,如:数据驱动,数据绑定,依赖属性,命令,控件模板,数据模板,MVVM等,与传统WinForm,ASP.

Redis实现Session持久化

Redis实现Session持久化1.前言直接使用Session存储用户登录信息,此时的会话信息是存储在内中的,只要项目重启存储的Session信息就会丢失。而使用Redis存储Session的话就不会存在这种情况,即使项目重启也并不影响,也无需用户重新登录。使用Redis存储Session,还能让项目支持分布式的,比

如何将 Transformer 应用于时间序列模型

在机器学习的广阔前景中,transformers就像建筑奇迹一样高高耸立,以其复杂的设计和捕获复杂关系的能力重塑了我们处理和理解大量数据的方式。自2017年创建第一个Transformer以来,Transformer类型呈爆炸式增长,其中包括ChatGPT和DALL-E等强大的生成式AI模型。虽然transformer

Beyong compare 介绍

"BeyondCompare"是一个强大的文件和文件夹比较工具,它允许你比较两个或更多的目录/文件并显示差异。这个工具特别有用,当你需要找出两个不同版本的文件之间的改变时。如果你想要使用BeyondCompare,你需要按照以下步骤操作:1.**下载和安装**:首先,你需要从BeyondCompare的官方网站或其他可

接口文档规范

接口文档规范主要包括以下几个方面:1.接口基本信息应该在文档的开头提供接口的基本信息,包括接口名称、接口URL、请求方法、请求参数、返回值、返回状态码等。这些信息应该清晰明了,并且易于理解。2.请求参数接口文档应该明确列出请求参数,并提供每个参数的名称、类型、是否必填、描述等信息。对于复杂结构的请求参数,例如JSON或

从网约车平台合规问题看企业合规难题如何破解

随着互联网的快速发展,网约车行业逐渐崛起并成为人们出行的重要选择之一。然而,虽然网约车平台带来了便利和效率,但也引发了一系列合规问题。近日,西安市交通运输综合执法支队和西安市出租汽车管理处组织开展了西安市网约车行业“全过程执法、全链条监管、全合规营运百日攻坚行动”。约谈会上,西安市出租汽车管理处相关负责人通报了“百日攻

华为OD机试真题 Java 实现【简易内存池】【2023 B卷 200分 考生抽中题】

目录一、题目描述二、输入描述三、输出描述样例:输出样例:四、解题思路五、Java算法源码六、效果展示1、输入2、输出3、说明4、再输入5、再说明6、如果走后一次请求的是20,会怎么样呢?华为OD机试2023B卷题库疯狂收录中,刷题点这里一、题目描述请实现一个简易内存池,根据请求命令完成内存分配和释放。内存池支持两种操作

Shiro【散列算法、Shiro会话、退出登录 、权限表设计、注解配置鉴权 】(五)-全面详解(学习总结---从入门到深化)

目录Shiro认证_散列算法Shiro认证_过滤器Shiro认证_获取认证数据Shiro认证_Shiro会话Shiro认证_会话管理器Shiro认证_退出登录Shiro认证_RememberMeShiro授权_权限表设计Shiro授权_数据库查询权限Shiro授权_在Realm进行授权Shiro授权_过滤器配置鉴权Sh

轻量服务器2核与1核的区别

​1.核心数量轻量服务器2核与1核最明显的区别在于核心数量。1核服务器只有一个处理器核心,而2核服务器有两个处理器核心。这使得2核服务器在处理数据时能够同时执行更多的任务。2.并行处理能力由于只有1个核心,1核服务器不具备并行处理任务的能力。而2核服务器在处理数据时可以同时执行多个任务,提高了服务器的处理速度和效率。3

热文推荐