CodeSnippet.Cn
代码片段
Csharp
架构设计
.NetCore
西班牙语
kubernetes
MySql
Redis
Algorithm
Other
Ubuntu
Linux
.NetMvc
VisualStudio
Python
Git
pm
WPF
java
Plug-In
分布式
CSS
微服务架构
JavaScript
DataStructure
Shared
啥是Task?
0
.NetCore
Csharp
小笨蛋
发布于:2022年05月11日
更新于:2022年05月11日
209
#custom-toc-container
> 文中所有例子均出于解释目的,并非具有实际意义的代码。有返回值的 Task 和无返回值的 Task 实际区别不是很大,下文大多数举例不做特别区分。不纠结 api 的使用细节,只讲 Task 的整体设计思路。 代码运行截图是在 .NET 6 中的,其他版本的设计没有大的改动,不影响学习。 笔者解读并非权威解读,只是希望能给大家一个理解 Task 的方法。 ### 从表象讲起 #### Task 从何而来 以下仅做典型举例,并非全部 - new Task ```csharp new Task(_ => { Console.WriteLine("Hello World!"); }, null).Start(); ``` - TaskFactory.StartNew ```csharp new TaskFactory().StartNew(() => { Console.WriteLine("Hello World!"); }); ``` - Task.Run ```csharp Task.Run(() => { Console.WriteLine("Hello World!"); }); ``` - Task.FromResult 等直接创建一个已完成的 Task ```csharp Task.FromResult("Hello World!"); var task = Task.CompletedTask; ``` - 某个不知道其内部实现的 async 方法 `async Task
FooAsync();` #### Task 常见用法 - 注册一个回调,等待 Task 执行完成时获取结果并执行回调 ```csharp var task = Task.Run
(() => "Hello World!"); task.ContinueWith(t => Console.WriteLine(t.Result)); ``` - await 一个 Task 并得到结果 ```csharp var task = Task.Run
(() => "Hello World!"); var result = await task; Console.WriteLine(result); ``` - 直接 GetResult ```csharp var task = Task.Run
(() => "Hello World!"); // 等效于 task.Result var result = task.GetAwaiter().GetResult(); Console.WriteLine(result); ``` #### Task 的分类 ##### 按是否包含 Result 分,也就是是否是泛型 Task - `Task` - `Task
` ##### 按得到 Task 的方式,可以分为 我知道这个 Task 是怎么来的,这种情况下,我们自己参与了 Task 的创建过程,知道这个 Task 是在干啥。比如: `Task task = Task.Run
(() => 1 + 2);` 计算 1 + 2,并将结果作为 Task 的结果。 不知道这个 Task 是怎么来的。比如: `Task task = new HttpClient().GetStringAsync("http://localhost:5000/api/values");` 而这两种获取方式的不同对应的是两种完全不同的侧重点: 1. Task 是一个白盒,关注 Task 里干了什么,在哪执行里面这些代码。 2. Task 是一个黑盒,关注 Task 能给到我什么,Task 完成执行之后,我该干什么。 ##### 对 Task 进行分解 按功能点可以将 Task 分为三个部分 1. 任务执行:通过 Task.Run 等方式执行一段我们自定义的逻辑。 2. 回调通知及回调执行:注册一个回调,等待 Task 完成时执行。 3. await 语法支持:脱离了 await,task 的上述两个功能依旧可以完整执行。但却会丧失代码的简洁性。 ### Task 在哪执行? #### 线程池 Task 可以作为 ThreadPool 队列系统的基本单元被 ThreadPool 调度执行。 下面这些常见的创建 Task 的方式,默认情况都是在 ThreadPool 中被调度执行的,这几个本质上是一样的,只是使用方式上和可支持传入的自定义选项上的区别。 - new Task ```csharp new Task(_ => { Console.WriteLine("Hello World!"); }, null).Start(); ``` - TaskFactory.StartNew ```csharp new TaskFactory().StartNew(() => { Console.WriteLine("Hello World!"); }); ``` - Task.Run ```csharp // 可以看做简化版的 TaskFactory.StartNew Task.Run(() => { Console.WriteLine("Hello World!"); }); ``` 以 `Task.Run` 为例来看下里面到底做了些什么。 在 `PortableThreadPool.TryCreateWorkerThread` 和实际要要执行的 `lambda` 表达式中打上断点,我们便可以清晰的看到整个执行过程。   整理一下的话,主要就是这个样子,为简化理解,ThreadPool 中的调用细节已省略。  Task 关键代码摘录: ```csharp class Task { // 任务的主体,我们要执行的实际逻辑 // 可能有返回值,可能没有 internal Delegate m_action; // 任务的状态 internal volatile int m_stateFlags; // ThreadPool 调用入口,由于 JIT 的内联优化,调用栈里只能看到 ExecuteEntryUnsafe,看不到这个方法 internal virtual void ExecuteFromThreadPool(Thread threadPoolThread) => ExecuteEntryUnsafe(threadPoolThread); internal void ExecuteEntryUnsafe(Thread? threadPoolThread) { // 设置 Task 状态为已经执行 m_stateFlags |= (int)TaskStateFlags.DelegateInvoked; if (!IsCancellationRequested & !IsCanceled) { ExecuteWithThreadLocal(ref t_currentTask, threadPoolThread); } else { ExecuteEntryCancellationRequestedOrCanceled(); } } // 创建 Task 的时候可传入的数据,用于执行时使用 // new Task(state => Console.WriteLine(state), "Hello World").Start(); internal object? m_stateObject; private void ExecuteWithThreadLocal(ref Task currentTaskSlot, Thread threadPoolThread = null) { // 执行上下文维护着代码执行逻辑上下文的一些数据,如 AsyncLocal ExecutionContext? ec = CapturedContext; if (ec == null) { // 没有执行上下文,直接执行 InnerInvoke(); } else { // 是否是在 ThreadPool 线程上执行 if (threadPoolThread is null) { ExecutionContext.RunInternal(ec, s_ecCallback, this); } else { ExecutionContext.RunFromThreadPoolDispatchLoop(threadPoolThread, ec, s_ecCallback, this); } } } // 不管 ExecuteWithThreadLocal 分支如何,最后会走到 InnerInvoke internal virtual void InnerInvoke() { if (m_action is Action action) { action(); return; } if (m_action is Action
actionWithState) { actionWithState(m_stateObject); } } } ``` 可以看到 Task 以 `ThreadPoolTaskScheduler` 为媒介,进入了 `ThreadPool`。`ThreadPool` 调用 Task.`ExecuteFromThreadPool` 方法最终触发 `Task` 所封装的 `action` 的执行。 与 `ThreadPool` 中另一种基本单元 `IThreadPoolWorkItem` 一样,`Task` 在进入 `ThreadPoolWorkQueue` 时会有两种可能,进入全局队列或者本地队列。 理解这个问题,我们需要看一下 `ThreadPoolTaskScheduler.QueueTask` 里做了些什么。 ```csharp internal sealed class ThreadPoolTaskScheduler : TaskScheduler { protected internal override void QueueTask(Task task) { TaskCreationOptions options = task.Options; if (Thread.IsThreadStartSupported && (options & TaskCreationOptions.LongRunning) != 0) { // 创建独立线程,和线程池无关 new Thread(s_longRunningThreadWork) { IsBackground = true, Name = ".NET Long Running Task" }.UnsafeStart(task); } else { // 第二个参数是 preferLocal // options & TaskCreationOptions.PreferFairness 这个位标志的枚举用法可查看官方资料 // https://docs.microsoft.com/zh-cn/dotnet/csharp/language-reference/builtin-types/enum#enumeration-types-as-bit-flags ThreadPool.UnsafeQueueUserWorkItemInternal(task, (options & TaskCreationOptions.PreferFairness) == 0); } } } ``` 上面代码里的 `TaskCreationOptions` 是我们在创建 `Task` 的时候可以指定的一个选项,默认是 `None`。 `Task.Run` 不支持传入该选项,可使用 `TaskFactory.StartNew` 的重载进行指定: ```csharp new TaskFactory().StartNew(() => { Console.WriteLine("Hello World!"); }, TaskCreationOptions.PreferFairness); ``` 根据 TaskCreationOptions 的不同,出现了三个分支 - LongRunning:独立线程,和线程池无关 - 包含 PreferFairness时:preferLocal=false,进入全局队列 - 不包含 PreferFairness时:preferLocal=ture,进入本地队列 进入全局队列的任务能够公平地被各个线程池中的线程领取执行,也是就是 `prefer fairness` 这个词组的字面意思了。 下图中 `Task666` 先进入全局队列,随后被 `Thread1` 领走。`Thread3` 通过 `WorkStealing` 机制窃取了 `Thread2` 中的 `Task2`。  #### 一个独立的后台线程中 也就是上文提到的创建 Task 时使用 `TaskCreationOptions.LongRunning`,如果你需要一个执行一个长时间的任务,比如一段耗时很久的同步代码,就可以使用这个。执行异步代码(指 await xxx)时不推荐使用,后面会讲原因。 ```csharp new TaskFactory().StartNew(() => { // 耗时较长的同步代码 }, TaskCreationOptions.LongRunning); ``` `ThreadPool` 管理的线程是出于可复用的目的设计的,不停地从队列系统中领取任务执行。如果一个 `WorkThread` 阻塞在一个耗时较长的任务上,它就没办法处理其他任务,`ThreadPool` 的吞吐率会受影响。 当然并不意味着 `ThreadPool` 不能处理这样的任务。举个极端的例子,如果线程池目前的 `WorkThread` 全在处理 `LongRunning Task`。在 `Starvation Avoidance` 机制(每隔500ms)创建新的 `WorkThread` 之前,`ThreadPool` 没法执行新的任务。 `LongRunning` 的 `Task` 生命周期与 `ThreadPool` 设计目的不符合,因此需独立开来。 #### 自定义的TaskScheduler里 除了 `ThreadPoolTaskScheduler` 外,我们还可以定义自己的 `TaskScheduler`。 首先需要继承 `TaskScheduler` 这个抽象类,有三个抽象方法需要我们实现。 ```csharp public abstract class TaskScheduler { // 入口,待调度执行的 Task 会通过该方法传入 protected internal abstract void QueueTask(Task task); // 这个是在执行 Task 回调的时候才会被执行到的方法,放到后面再讲 protected abstract bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued); // 获取所有调度到该 TaskScheduler 的 Task protected abstract IEnumerable
? GetScheduledTasks(); } ``` 在我们自定义的 `TaskScheduler` 里,在 `QueueTask` 被执行时会拿到 `Task`,但是 `Task` 要怎么去触发里面的 `action` 呢。 `Task` 针对 `ThreadPool` 的调用场景暴露了一个 `ExecuteFromThreadPool` 的 `internal` 方法,同时也提供了一个 `ExecuteEntry` 方法供其他场景调用,但是这个方法也是 `internal` 的。只能通过 `TaskScheduler` 的 protect 方法进行间接调用。 ```csharp public abstract class TaskScheduler { protected bool TryExecuteTask(Task task) { if (task.ExecutingTaskScheduler != this) { throw new InvalidOperationException(SR.TaskScheduler_ExecuteTask_WrongTaskScheduler); } return task.ExecuteEntry(); } } 下面是一个自定义的 TaskScheduler,在一个固定的线程上顺序执行 Task。 class CustomTaskScheduler : TaskScheduler { private readonly BlockingCollection
_queue = new(); public CustomTaskScheduler() { new Thread(() => { while (true) { var task = _queue.Take(); Console.WriteLine($"task {task.Id} is going to be executed"); TryExecuteTask(task); Console.WriteLine($"task {task.Id} has been executed"); } }) { IsBackground = true }.Start(); } protected override IEnumerable
GetScheduledTasks() { return _queue.ToArray(); } protected override void QueueTask(Task task) { _queue.Add(task); } protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) { return false; } } ``` 在 `TaskFactory` 的构造函数中可以传入我们自定义的 `TaskScheduler` ```csharp var taskFactory = new TaskFactory(new CustomTaskScheduler()); taskFactory.StartNew(() => Console.WriteLine($"task {Task.CurrentId}" + $" threadId: {Thread.CurrentThread.ManagedThreadId}")); taskFactory.StartNew(() => Console.WriteLine($"task {Task.CurrentId}" + $" threadId: {Thread.CurrentThread.ManagedThreadId}")); Console.ReadLine(); ``` 输出结果如下: ```csharp var taskFactory = new TaskFactory(new CustomTaskScheduler()); taskFactory.StartNew(() => Console.WriteLine($"task {Task.CurrentId}" + $" threadId: {Thread.CurrentThread.ManagedThreadId}")); taskFactory.StartNew(() => Console.WriteLine($"task {Task.CurrentId}" + $" threadId: {Thread.CurrentThread.ManagedThreadId}")); Console.ReadLine(); ``` ```csharp task 1 is going to be executed task 1 threadId: 10 task 1 has been executed task 2 is going to be executed task 2 threadId: 10 task 2 has been executed ``` 所有的 Task 都会在一个线程里被调度执行。 #### Task 可以封装任何类型的别的任务 上面两种情况,Task 都存在明确的执行实体,但有时候,可能是没有的。看下面这样的例子。 ```csharp var task = FooAsync(); var action = typeof(Task).GetField("m_action", BindingFlags.NonPublic | BindingFlags.Instance).GetValue(task); Console.WriteLine($"Task action is null: {action == null}"); task.ContinueWith(t => Console.WriteLine(t.Result)); // 回调可以注册多个 task.ContinueWith(t => Console.WriteLine(t.Result)); Task
FooAsync() { var tsc = new TaskCompletionSource
(); new Thread(() => { Thread.Sleep(1000); tsc.SetResult("Hello World"); }) { IsBackground = true }.Start(); return tsc.Task; } ``` 输出: ```csharp Task action is null: True Hello World Hello World ``` 从 FooAsync 外部和内部两个角度来看这个问题 - FooAsync 外:拿到了一个 Task 并注册了回调 - FooAsync 内:相当于间接的持有了这个回调,并通过 tsc.SetResult 间接地调用了这个回调。 下面是关键代码的摘录 ```csharp class Task { // 保存一个或一组回调 private volatile object? m_continuationObject; internal void FinishContinuations() { // 处理回调的执行 } } class Task
: Task { internal bool TrySetResult(TResult result) { // ... this.m_result = result; // 复用父类的逻辑 FinishContinuations(); // ... } } public class TaskCompletionSource
{ public TaskCompletionSource() => _task = new Task
(); public Task
Task => _task; public void SetResult(TResult result) { TrySetResult(result); } public bool TrySetResult(TResult result) { _task.TrySetResult(result); // ... } } ``` 有时候 Task.TrySetResult() 的触发源可能是一个异步IO完成事件导致的,也就是我们常说的异步IO,硬件有自己的处理芯片,在异步IO完成通知CPU(硬件中断 hardware interrupt)之前,CPU并不需要参与,这也是异步IO的价值所在。 ### 小结 Task 是个已经完成或者将在未来某个时间点完成的任务,可以向其注册一个回调等待任务完成时被执行。
这里⇓感觉得写点什么,要不显得有点空,但还没想好写什么...
返回顶部
About
京ICP备13038605号
© 代码片段 2025