CodeSnippet.Cn
代码片段
Csharp
架构设计
.NetCore
西班牙语
kubernetes
MySql
Redis
Algorithm
Ubuntu
Linux
Other
.NetMvc
VisualStudio
Git
pm
Python
WPF
java
Plug-In
分布式
CSS
微服务架构
JavaScript
DataStructure
Shared
揭秘 .NET 中的 AsyncLocal
0
Csharp
小笨蛋
发布于:2023年03月03日
更新于:2023年03月03日
131
#custom-toc-container
### 线程本地存储及其局限性 如果想要整个.NET程序中共享一个变量,我们可以将想要共享的变量放在某个类的`静态属性`上来实现。 而在多线程的运行环境中,则可能会希望能将这个变量的共享范围缩小到单个线程内,这时候就需要使用线程本地存储了。 线程本地存储可以通过 `ThreadStaticAttribute` 或者 `ThreadLocal
` 来实现。 `ThreadStaticAttribute` 是一个特性,可以用来修饰字段或者属性,修饰后的字段或者属性就可以在每个线程中单独存储一份数据。 `ThreadLocal
` 是一个泛型类,可以用来存储每个线程的数据。 ``` class Program { [ThreadStatic] private static string _threadStatic; private static ThreadLocal
_threadLocal = new ThreadLocal
(); static void Main(string[] args) { Parallel.For(0, 4, _ => { var threadId = Thread.CurrentThread.ManagedThreadId; var value = $"这是来自线程{threadId}的数据"; _threadStatic ??= value; _threadLocal.Value ??= value; Console.WriteLine($"Use ThreadStaticAttribute; Thread:{threadId}; Value:{_threadStatic}"); Console.WriteLine($"Use ThreadLocal; Thread:{threadId}; Value:{_threadLocal.Value}"); }); Console.Read(); } } ``` > 输出结果 ``` Use ThreadStaticAttribute; Thread:1; Value:这是来自线程1的数据 Use ThreadLocal; Thread:1; Value:这是来自线程1的数据 Use ThreadStaticAttribute; Thread:6; Value:这是来自线程6的数据 Use ThreadLocal; Thread:6; Value:这是来自线程6的数据 Use ThreadStaticAttribute; Thread:10; Value:这是来自线程10的数据 Use ThreadLocal; Thread:10; Value:这是来自线程10的数据 Use ThreadStaticAttribute; Thread:7; Value:这是来自线程7的数据 Use ThreadLocal; Thread:7; Value:这是来自线程7的数据 ``` 除了可以使用 `ThreadStaticAttribute` 和 `ThreadLocal
` 外,我们还可以使用 `CallContext` 、`AsyncLocal
` 来实现一样的功能。由于 `.NET Core及以后版本` 不再实现 `CallContext`,所以下列代码只能在 `.NET Framework` 中执行。 ``` class Program { [ThreadStatic] private static string _threadStatic; private static ThreadLocal
_threadLocal = new ThreadLocal
(); private static AsyncLocal
_asyncLocal = new AsyncLocal
(); static void Main(string[] args) { Parallel.For(0, 4, _ => { var threadId = Thread.CurrentThread.ManagedThreadId; var value = $"这是来自线程{threadId}的数据"; _threadStatic ??= value; CallContext.SetData("value", value); _threadLocal.Value ??= value; _asyncLocal.Value ??= value; Console.WriteLine($"Use ThreadStaticAttribute; Thread:{threadId}; Value:{_threadStatic}"); Console.WriteLine($"Use CallContext; Thread:{threadId}; Value:{CallContext.GetData("value")}"); Console.WriteLine($"Use ThreadLocal; Thread:{threadId}; Value:{_threadLocal.Value}"); Console.WriteLine($"Use AsyncLocal; Thread:{threadId}; Value:{_asyncLocal.Value}"); }); Console.Read(); } } ``` > 输出结果 ``` Use ThreadStaticAttribute; Thread:3; Value:这是来自线程3的数据 Use ThreadStaticAttribute; Thread:4; Value:这是来自线程4的数据 Use ThreadStaticAttribute; Thread:1; Value:这是来自线程1的数据 Use CallContext; Thread:1; Value:这是来自线程1的数据 Use ThreadLocal; Thread:1; Value:这是来自线程1的数据 Use AsyncLocal; Thread:1; Value:这是来自线程1的数据 Use ThreadStaticAttribute; Thread:5; Value:这是来自线程5的数据 Use CallContext; Thread:5; Value:这是来自线程5的数据 Use ThreadLocal; Thread:5; Value:这是来自线程5的数据 Use AsyncLocal; Thread:5; Value:这是来自线程5的数据 Use CallContext; Thread:3; Value:这是来自线程3的数据 Use CallContext; Thread:4; Value:这是来自线程4的数据 Use ThreadLocal; Thread:4; Value:这是来自线程4的数据 Use AsyncLocal; Thread:4; Value:这是来自线程4的数据 Use ThreadLocal; Thread:3; Value:这是来自线程3的数据 Use AsyncLocal; Thread:3; Value:这是来自线程3的数据 ``` 上面的例子都只是在同一个线程中对线程进行存和取,但日常开发的过程中,我们会有很多异步的场景,这些场景可能会导致执行代码的线程发生切换。 比如下面的例子 ``` class Program { [ThreadStatic] private static string _threadStatic; private static ThreadLocal
_threadLocal = new ThreadLocal
(); private static AsyncLocal
_asyncLocal = new AsyncLocal
(); static void Main(string[] args) { _threadStatic = "ThreadStatic保存的数据"; _threadLocal.Value = "ThreadLocal保存的数据"; _asyncLocal.Value = "AsyncLocal保存的数据"; PrintValuesInAnotherThread(); Console.ReadKey(); } private static void PrintValuesInAnotherThread() { Task.Run(() => { Console.WriteLine($"ThreadStatic: {_threadStatic}"); Console.WriteLine($"ThreadLocal: {_threadLocal.Value}"); Console.WriteLine($"AsyncLocal: {_asyncLocal.Value}"); }); } } ``` > 输出结果 ThreadStatic: ThreadLocal: AsyncLocal: AsyncLocal保存的数据 在线程发生了切换之后,只有 AsyncLocal 还能够保留原来的值,当然,.NET Framework 中的 CallContext 也可以实现这个需求,下面给出一个相对完整的总结。 | 实现方式 | .NET FrameWork 可用 | .NET Core 可用 | 是否支持数据流向辅助线程 | | ---------------------------------------------------- | ------------------- | -------------- | ------------------------------------------------------------------ | | ThreadStaticAttribute | 是 | 是 | 否 | | ThreadLocal
| 是 | 是 | 否 | | CallContext.SetData(string name, object data) | 是 | 否 | 仅当参数 data 对应的类型实现了 ILogicalThreadAffinative 接口时支持 | | CallContext.LogicalSetData(string name, object data) | 是 | 否 | 是 | | AsyncLocal
| 是 | 是 | 是 | ### AsyncLocal 简介 AsyncLocal 能够在代码执行的逻辑上下文中存储数据,这个数据可以在当前线程及任意后续新创建的线程中被访问。 我们常用的 HttpContextAccessor 就是通过 AsyncLocal 来实现的。 AsyncLocal
为我们提供了两个功能 * 通过 Value 属性存取值。 * 通过构造函数注册回调函数监听任意线程中对值做出的改动,需记着这个功能,后面介绍源码的时候会有很多地方涉及。 监听回调的代码示例: ``` class Program { private static AsyncLocal
_asyncLocal = new AsyncLocal
(AsyncLocalValueChanged); private static int _number = 1; static void Main(string[] args) { _asyncLocal.Value = "Hello World!"; Console.WriteLine($"Main: {_asyncLocal.Value}, thread: {Thread.CurrentThread.ManagedThreadId}"); Task.Run(() => { Console.WriteLine($"Task: {_asyncLocal.Value}, thread: {Thread.CurrentThread.ManagedThreadId}"); _asyncLocal.Value = "Hello World from Task!"; }); Console.ReadLine(); } private static void AsyncLocalValueChanged(AsyncLocalValueChangedArgs
obj) { Console.WriteLine( $"AsyncLocalValueChanged_{_number++}: {obj.PreviousValue} -> {obj.CurrentValue}, thread: {Thread.CurrentThread.ManagedThreadId}"); } } ``` > 输出结果 ``` AsyncLocalValueChanged_1: -> Hello World!, thread: 1 Main: Hello World!, thread: 1 AsyncLocalValueChanged_2: -> Hello World!, thread: 4 Task: Hello World!, thread: 4 AsyncLocalValueChanged_3: Hello World! -> Hello World from Task!, thread: 4 AsyncLocalValueChanged_4: Hello World from Task! -> , thread: 4 ``` 这边先思考两个问题: 1. 为什么在主线程和 Task 中都会触发一次从 null 到 Hello World 的回调? 2. 为什么在 Task 中会触发一次值被清空的回调? 稍后会对这两个问题进行解答。 ### AsyncLocal 源码分析 源码地址:https://github.com/dotnet/runtime/blob/master/src/libraries/System.Private.CoreLib/src/System/Threading/AsyncLocal.cs ### 主体 AsyncLocal
AsyncLocal 内部代码相对简单,是对 ExecutionContext 的封装,下面是 AsyncLocal
的源码: ``` public sealed class AsyncLocal
: IAsyncLocal { private readonly Action
>? m_valueChangedHandler; // 无参构造 public AsyncLocal() { } // 可以注册回调的构造函数,当 Value 在任意线程被改动,将调用回调 public AsyncLocal(Action
>? valueChangedHandler) { m_valueChangedHandler = valueChangedHandler; } [MaybeNull] public T Value { get { // 从 ExecutionContext 中以自身为 Key 获取值 object? obj = ExecutionContext.GetLocalValue(this); return (obj == null) ? default : (T)obj; } // 是否注册回调将回影响到 ExecutionContext 是否保存其引用 set => ExecutionContext.SetLocalValue(this, value, m_valueChangedHandler != null); } // 在 ExecutionContext 如果判断到值发生了变化,此方法将被调用 void IAsyncLocal.OnValueChanged(object? previousValueObj, object? currentValueObj, bool contextChanged) { Debug.Assert(m_valueChangedHandler != null); T previousValue = previousValueObj == null ? default! : (T)previousValueObj; T currentValue = currentValueObj == null ? default! : (T)currentValueObj; m_valueChangedHandler(new AsyncLocalValueChangedArgs
(previousValue, currentValue, contextChanged)); } } internal interface IAsyncLocal { void OnValueChanged(object? previousValue, object? currentValue, bool contextChanged); } ``` 真正的数据存取是通过 ``` ExecutionContext.GetLocalValue ``` 和 ``` ExecutionContext.SetLocalValue ``` 实现的。 ``` public class ExecutionContext { internal static object? GetLocalValue(IAsyncLocal local); internal static void SetLocalValue( IAsyncLocal local, object? newValue, bool needChangeNotifications); } ``` 需要注意的是这边通过 `IAsyncLocal` 这一接口实现了 `AsyncLocal` 与 `ExcutionContext` 的解耦。 `ExcutionContext` 只关注数据的存取本身,接口定义的类型都是 object,而不关心具体的类型 `T`。 ### AsyncLocal
在 ExecutionContext 中的数据存取实现 在.NET 中,每个线程都关联着一个 `执行上下文`(execution context) 。 可以通过`Thread.CurrentThread.ExecutionContext` 属性进行访问,或者通过 `ExecutionContext.Capture()` 获取(前者的实现) 。 `AsyncLocal` 最终就是把数据保存在 `ExecutionContext` 上的,为了更深入地理解 `AsyncLocal` 我们需要先理解一下它。 源码地址:https://github.com/dotnet/runtime/blob/main/src/libraries/System.Private.CoreLib/src/System/Threading/ExecutionContext.cs #### ExecutionContext 与 线程的绑定关系 ExecutionContext 被保存 Thread 的 internal 修饰的 _executionContext 字段上。但`Thread.CurrentThread.ExecutionContext` 并不直接暴露 _executionContext 而与 `ExecutionContext.Capture()` 共用一套逻辑。 ``` class ExecutionContext { public static ExecutionContext? Capture() { ExecutionContext? executionContext = Thread.CurrentThread._executionContext; if (executionContext == null) { executionContext = Default; } else if (executionContext.m_isFlowSuppressed) { executionContext = null; } return executionContext; } } ``` 下面是经过整理的 Thread 的与 ExecutionContext 相关的部分,Thread 属于部分类,_executionContext 字段定义在 https://github.com/dotnet/runtime/blob/5fca04171171f118bca0f93aa9741f205b8cdc29/src/coreclr/src/System.Private.CoreLib/src/System/Threading/Thread.CoreCLR.cs#L119 文件中 ``` class Thread { // 保存当前线程所关联的 执行上下文 internal ExecutionContext? _executionContext; [ThreadStatic] private static Thread? t_currentThread; public static Thread CurrentThread => t_currentThread ?? InitializeCurrentThread(); public ExecutionContext? ExecutionContext => ExecutionContext.Capture(); } ``` #### ExecutionContext 的私有变量 ``` public sealed class ExecutionContext : IDisposable, ISerializable { // 默认执行上下文 internal static readonly ExecutionContext Default = new ExecutionContext(isDefault: true); // 执行上下文禁止流动后的默认上下文 internal static readonly ExecutionContext DefaultFlowSuppressed = new ExecutionContext(AsyncLocalValueMap.Empty, Array.Empty
(), isFlowSuppressed: true); // 保存所有注册了修改回调的 AsyncLocal 的 Value 值,本文暂不涉及对此字段的具体讨论 private readonly IAsyncLocalValueMap? m_localValues; // 保存所有注册了回调的 AsyncLocal 的对象引用 private readonly IAsyncLocal[]? m_localChangeNotifications; // 当前线程是否禁止上下文流动 private readonly bool m_isFlowSuppressed; // 当前上下文是否是默认上下文 private readonly bool m_isDefault; } ``` #### IAsyncLocalValueMap 接口及其实现 在同一个线程中,所有 `AsyncLocal` 所保存的 `Value` 都保存在 `ExecutionContext` 的 `m_localValues` 字段上。 ``` public class ExecutionContext { private readonly IAsyncLocalValueMap m_localValues; } ``` 为了优化查找值时的性能,微软为 IAsyncLocalValueMap 提供了6个实现 | 类型 | 元素个数 | | -------------------------------- | -------- | | `EmptyAsyncLocalValueMap` | 0 | | `OneElementAsyncLocalValueMap` | 1 | | `TwoElementAsyncLocalValueMap` | 2 | | `ThreeElementAsyncLocalValueMap` | 3 | | `MultiElementAsyncLocalValueMap` | 4 ~ 16 | | `ManyElementAsyncLocalValueMap` | \> 16 | 随着 ExecutionContext 所关联的 AsyncLocal 数量的增加,IAsyncLocalValueMap 的实现将会在ExecutionContext的SetLocalValue方法中被`不断替换`。查询的`时间复杂度和空间复杂度依次递增`。代码的实现与 AsyncLocal 同属于 一个文件。当然元素数量减少时也会替换成之前的实现。 ``` // 这个接口是用来在 ExecutionContext 中保存 IAsyncLocal => object 的映射关系。 // 其实现被设定为不可变的(immutable),随着元素的数量增加而变化,空间复杂度和时间复杂度也随之增加。 internal interface IAsyncLocalValueMap { bool TryGetValue(IAsyncLocal key, out object? value); // 通过此方法新增 AsyncLocal 或修改现有的 AsyncLocal // 如果数量无变化,返回同类型的 IAsyncLocalValueMap 实现类实例 // 如果数量发生变化(增加或减少,将value设值为null时会减少),则可能返回不同类型的 IAsyncLocalValueMap 实现类实例 IAsyncLocalValueMap Set(IAsyncLocal key, object? value, bool treatNullValueAsNonexistent); } ``` Map 的创建是以静态类 `AsyncLocalValueMap` 的 Create 方法作为创建的入口的。 ``` internal static class AsyncLocalValueMap { // EmptyAsyncLocalValueMap 设计上只在这边实例化,其他地方当作常量使用 public static IAsyncLocalValueMap Empty { get; } = new EmptyAsyncLocalValueMap(); public static bool IsEmpty(IAsyncLocalValueMap asyncLocalValueMap) { Debug.Assert(asyncLocalValueMap != null); Debug.Assert(asyncLocalValueMap == Empty || asyncLocalValueMap.GetType() != typeof(EmptyAsyncLocalValueMap)); return asyncLocalValueMap == Empty; } public static IAsyncLocalValueMap Create(IAsyncLocal key, object? value, bool treatNullValueAsNonexistent) { // 创建最初的实例 // 如果 AsyncLocal 注册了回调,则需要保存 null 的 Value,以便下次设置非null的值时因为值发生变化而触发回调 return value != null || !treatNullValueAsNonexistent ? new OneElementAsyncLocalValueMap(key, value) : Empty; } } ``` 此后每次更新元素时都必须调用 IAsyncLocalValueMap 实现类的 Set 方法,原实例是不会发生变化的,需保存 Set 的返回值。 接下来以 `ThreeElementAsyncLocalValueMap` 为例进行解释 ``` private sealed class ThreeElementAsyncLocalValueMap : IAsyncLocalValueMap { // 申明三个私有字段保存 key private readonly IAsyncLocal _key1, _key2, _key3; // 申明三个私有字段保存 private readonly object? _value1, _value2, _value3; public ThreeElementAsyncLocalValueMap(IAsyncLocal key1, object? value1, IAsyncLocal key2, object? value2, IAsyncLocal key3, object? value3) { _key1 = key1; _value1 = value1; _key2 = key2; _value2 = value2; _key3 = key3; _value3 = value3; } public IAsyncLocalValueMap Set(IAsyncLocal key, object? value, bool treatNullValueAsNonexistent) { // 如果 AsyncLocal 注册过回调,treatNullValueAsNonexistent 的值是 false, // 意思是就算 value 是 null,也认为它是有效的 if (value != null || !treatNullValueAsNonexistent) { // 如果现在的 map 已经保存过传入的 key ,则返回一个更新了 value 值的新 map 实例 if (ReferenceEquals(key, _key1)) return new ThreeElementAsyncLocalValueMap(key, value, _key2, _value2, _key3, _value3); if (ReferenceEquals(key, _key2)) return new ThreeElementAsyncLocalValueMap(_key1, _value1, key, value, _key3, _value3); if (ReferenceEquals(key, _key3)) return new ThreeElementAsyncLocalValueMap(_key1, _value1, _key2, _value2, key, value); // 如果当前Key不存在map里,则需要一个能存放第四个key的map var multi = new MultiElementAsyncLocalValueMap(4); multi.UnsafeStore(0, _key1, _value1); multi.UnsafeStore(1, _key2, _value2); multi.UnsafeStore(2, _key3, _value3); multi.UnsafeStore(3, key, value); return multi; } else { // value 是 null,对应的 key 会被忽略或者从 map 中去除,这边会有两种情况 // 1、如果当前的 key 存在于 map 当中,则将这个 key 去除,map 类型降级为 TwoElementAsyncLocalValueMap return ReferenceEquals(key, _key1) ? new TwoElementAsyncLocalValueMap(_key2, _value2, _key3, _value3) : ReferenceEquals(key, _key2) ? new TwoElementAsyncLocalValueMap(_key1, _value1, _key3, _value3) : ReferenceEquals(key, _key3) ? new TwoElementAsyncLocalValueMap(_key1, _value1, _key2, _value2) : // 2、当前 key 不存在于 map 中,则会被直接忽略 (IAsyncLocalValueMap)this; } } // 至多对比三次就能找到对应的 value public bool TryGetValue(IAsyncLocal key, out object? value) { if (ReferenceEquals(key, _key1)) { value = _value1; return true; } else if (ReferenceEquals(key, _key2)) { value = _value2; return true; } else if (ReferenceEquals(key, _key3)) { value = _value3; return true; } else { value = null; return false; } } } ``` #### ExecutionContext - SetLocalValue 需要注意的是这边会涉及到两个 _`Immutable`_ 结构,一个是 `ExecutionContext` 本身,另一个是 `IAsyncLocalValueMap` 的实现类。同一个 key 前后两次 value 发生变化后,会产生新的 ExecutionContext 的实例和 IAsyncLocalMap 实现类实例(在 `IAsyncLocalValueMap` 实现类的 `Set` 方法中完成)。 ``` internal static void SetLocalValue(IAsyncLocal local, object? newValue, bool needChangeNotifications) { // 获取当前执行上下文 ExecutionContext? current = Thread.CurrentThread._executionContext; object? previousValue = null; bool hadPreviousValue = false; if (current != null) { Debug.Assert(!current.IsDefault); Debug.Assert(current.m_localValues != null, "Only the default context should have null, and we shouldn't be here on the default context"); // 判断当前作为 Key 的 AsyncLocal 是否已经有对应的 Value hadPreviousValue = current.m_localValues.TryGetValue(local, out previousValue); } // 如果前后两次 Value 没有发生变化,则继续处理 if (previousValue == newValue) { return; } // 对于 treatNullValueAsNonexistent: !needChangeNotifications 的说明 // 如果 AsyncLocal 注册了回调,则 needChangeNotifications 为 ture,m_localValues 会保存 null 值以便下次触发change回调 IAsyncLocal[]? newChangeNotifications = null; IAsyncLocalValueMap newValues; bool isFlowSuppressed = false; if (current != null) { Debug.Assert(!current.IsDefault); Debug.Assert(current.m_localValues != null, "Only the default context should have null, and we shouldn't be here on the default context"); isFlowSuppressed = current.m_isFlowSuppressed; // 这一步很关键,通过调用 m_localValues.Set 对 map 进行修改,这会产生一个新的 map 实例。 newValues = current.m_localValues.Set(local, newValue, treatNullValueAsNonexistent: !needChangeNotifications); newChangeNotifications = current.m_localChangeNotifications; } else { // 如果当前上下文不存在,创建第一个 IAsyncLocalValueMap 实例 newValues = AsyncLocalValueMap.Create(local, newValue, treatNullValueAsNonexistent: !needChangeNotifications); } // 如果 AsyncLocal 注册了回调,则需要保存 AsyncLocal 的引用 // 这边会有两种情况,一个是数组未创建过,一个是数组已存在 if (needChangeNotifications) { if (hadPreviousValue) { Debug.Assert(newChangeNotifications != null); Debug.Assert(Array.IndexOf(newChangeNotifications, local) >= 0); } else if (newChangeNotifications == null) { newChangeNotifications = new IAsyncLocal[1] { local }; } else { int newNotificationIndex = newChangeNotifications.Length; // 这个方法会创建一个新数组并将原来的元素拷贝过去 Array.Resize(ref newChangeNotifications, newNotificationIndex + 1); newChangeNotifications[newNotificationIndex] = local; } } // 如果 AsyncLocal 存在有效值,且允许执行上下文流动,则创建新的 ExecutionContext实例,新实例会保存所有的AsyncLocal的值和所有需要通知的 AsyncLocal 引用。 Thread.CurrentThread._executionContext = (!isFlowSuppressed && AsyncLocalValueMap.IsEmpty(newValues)) ? null : // No values, return to Default context new ExecutionContext(newValues, newChangeNotifications, isFlowSuppressed); if (needChangeNotifications) { // 调用先前注册好的委托 local.OnValueChanged(previousValue, newValue, contextChanged: false); } } ``` #### ExecutionContext - GetLocalValue 值的获取实现相对简单 ``` internal static object? GetLocalValue(IAsyncLocal local) { ExecutionContext? current = Thread.CurrentThread._executionContext; if (current == null) { return null; } Debug.Assert(!current.IsDefault); Debug.Assert(current.m_localValues != null, "Only the default context should have null, and we shouldn't be here on the default context"); current.m_localValues.TryGetValue(local, out object? value); return value; } ``` ### ExecutionContext 的流动 在线程发生切换的时候,`ExecutionContext` 会在前一个线程中被默认捕获,流向下一个线程,它所保存的数据也就随之流动。 在所有会发生线程切换的地方,runtime 都为我们封装好了对执行上下文的捕获。 例如: * new Thread(ThreadStart start).Start() * Task.Run(Action action) * ThreadPool.QueueUserWorkItem(WaitCallback callBack) * await 语法糖 ``` class Program { static AsyncLocal
_asyncLocal = new AsyncLocal
(); static async Task Main(string[] args) { _asyncLocal.Value = "AsyncLocal保存的数据"; new Thread(() => { Console.WriteLine($"new Thread: {_asyncLocal.Value}"); }) { IsBackground = true }.Start(); ThreadPool.QueueUserWorkItem(_ => { Console.WriteLine($"ThreadPool.QueueUserWorkItem: {_asyncLocal.Value}"); }); Task.Run(() => { Console.WriteLine($"Task.Run: {_asyncLocal.Value}"); }); await Task.Delay(100); Console.WriteLine($"after await: {_asyncLocal.Value}"); } } ``` > 输出结果 ``` new Thread: AsyncLocal保存的数据 ThreadPool.QueueUserWorkItem: AsyncLocal保存的数据 Task.Run: AsyncLocal保存的数据 after await: AsyncLocal保存的数据 ``` ### 流动的禁止和恢复 `ExecutionContext` 为我们提供了 `SuppressFlow`(禁止流动) 和 `RestoreFlow` (恢复流动)这两个静态方法来控制当前线程的执行上下文是否像辅助线程流动。并可以通过 `IsFlowSuppressed` 静态方法来进行判断。 ``` class Program { static AsyncLocal
_asyncLocal = new AsyncLocal
(); static async Task Main(string[] args) { _asyncLocal.Value = "AsyncLocal保存的数据"; Console.WriteLine("默认:"); PrintAsync(); // 不 await,后面的线程不会发生切换 Thread.Sleep(1000); // 确保上面的方法内的所有线程都执行完 ExecutionContext.SuppressFlow(); Console.WriteLine("SuppressFlow:"); PrintAsync(); Thread.Sleep(1000); Console.WriteLine("RestoreFlow:"); ExecutionContext.RestoreFlow(); await PrintAsync(); Console.Read(); } static async ValueTask PrintAsync() { new Thread(() => { Console.WriteLine($" new Thread: {_asyncLocal.Value}"); }) { IsBackground = true }.Start(); Thread.Sleep(100); // 保证输出顺序 ThreadPool.QueueUserWorkItem(_ => { Console.WriteLine($" ThreadPool.QueueUserWorkItem: {_asyncLocal.Value}"); }); Thread.Sleep(100); Task.Run(() => { Console.WriteLine($" Task.Run: {_asyncLocal.Value}"); }); await Task.Delay(100); Console.WriteLine($" after await: {_asyncLocal.Value}"); Console.WriteLine(); } } ``` 输出结果: ``` 默认: new Thread: AsyncLocal保存的数据 ThreadPool.QueueUserWorkItem: AsyncLocal保存的数据 Task.Run: AsyncLocal保存的数据 after await: AsyncLocal保存的数据 SuppressFlow: new Thread: ThreadPool.QueueUserWorkItem: Task.Run: after await: RestoreFlow: new Thread: AsyncLocal保存的数据 ThreadPool.QueueUserWorkItem: AsyncLocal保存的数据 Task.Run: AsyncLocal保存的数据 after await: AsyncLocal保存的数据 ``` 需要注意的是,在线程A中创建线程B之前调用 `ExecutionContext.SuppressFlow` 只会影响 `ExecutionContext` 从线程A => 线程B的传递,线程B => 线程C 不受影响。 ``` class Program { static AsyncLocal
_asyncLocal = new AsyncLocal
(); static void Main(string[] args) { _asyncLocal.Value = "A => B"; ExecutionContext.SuppressFlow(); new Thread((() => { Console.WriteLine($"线程B:{_asyncLocal.Value}"); // 输出线程B: _asyncLocal.Value = "B => C"; new Thread((() => { Console.WriteLine($"线程C:{_asyncLocal.Value}"); // 输出线程C:B => C })) { IsBackground = true }.Start(); })) { IsBackground = true }.Start(); Console.Read(); } } ``` ### ExcutionContext 的流动实现 上面举例了四种场景,由于每一种场景的传递过程都比较复杂,目前先介绍其中一个。 但不管什么场景,都会涉及到 ExcutionContext 的 Run 方法。在Run 方法中会调用 RunInternal 方法, ``` public static void Run(ExecutionContext executionContext, ContextCallback callback, object? state) { if (executionContext == null) { ThrowNullContext(); } // 内部会调用 RestoreChangedContextToThread 方法 RunInternal(executionContext, callback, state); } ``` RunInternal 调用下面一个 RestoreChangedContextToThread 方法将 ExcutionContext.Run 方法传入的 ExcutionContext 赋值给当前线程的 _executionContext 字段。 ``` internal static void RestoreChangedContextToThread(Thread currentThread, ExecutionContext? contextToRestore, ExecutionContext? currentContext) { Debug.Assert(currentThread == Thread.CurrentThread); Debug.Assert(contextToRestore != currentContext); // 在这边把之前的 ExecutionContext 赋值给了当前线程 currentThread._executionContext = contextToRestore; if ((currentContext != null && currentContext.HasChangeNotifications) || (contextToRestore != null && contextToRestore.HasChangeNotifications)) { OnValuesChanged(currentContext, contextToRestore); } } ``` #### new Thread(ThreadStart start).Start() 为例说明 ExecutionContext 的流动 这边可以分为三个步骤: 在 Thread 的 Start 方法中捕获当前的 ExecutionContext,将其传递给 Thread 的构造函数中实例化的 ThreadHelper 实例,ExecutionContext 会暂存在 ThreadHelper 的实例字段中,线程创建完成后会调用ExecutionContext.RunInternal 将其赋值给新创建的线程。 代码位置: > https://github.com/dotnet/runtime/blob/5fca04171171f118bca0f93aa9741f205b8cdc29/src/coreclr/src/System.Private.CoreLib/src/System/Threading/Thread.CoreCLR.cs#L200 ``` public void Start() { ##if FEATURE_COMINTEROP_APARTMENT_SUPPORT // Eagerly initialize the COM Apartment state of the thread if we're allowed to. StartupSetApartmentStateInternal(); ##endif // FEATURE_COMINTEROP_APARTMENT_SUPPORT // Attach current thread's security principal object to the new // thread. Be careful not to bind the current thread to a principal // if it's not already bound. if (_delegate != null) { // If we reach here with a null delegate, something is broken. But we'll let the StartInternal method take care of // reporting an error. Just make sure we don't try to dereference a null delegate. Debug.Assert(_delegate.Target is ThreadHelper); // 由于 _delegate 指向 ThreadHelper 的实例方法,所以 _delegate.Target 指向 ThreadHelper 实例。 var t = (ThreadHelper)_delegate.Target; ExecutionContext? ec = ExecutionContext.Capture(); t.SetExecutionContextHelper(ec); } StartInternal(); } ``` > https://github.com/dotnet/runtime/blob/5fca04171171f118bca0f93aa9741f205b8cdc29/src/coreclr/src/System.Private.CoreLib/src/System/Threading/Thread.CoreCLR.cs#L26 ``` class ThreadHelper { internal ThreadHelper(Delegate start) { _start = start; } internal void SetExecutionContextHelper(ExecutionContext? ec) { _executionContext = ec; } // 这个方法是对 Thread 构造函数传入的委托的包装 internal void ThreadStart() { Debug.Assert(_start is ThreadStart); ExecutionContext? context = _executionContext; if (context != null) { // 将 ExecutionContext 与 CurrentThread 进行绑定 ExecutionContext.RunInternal(context, s_threadStartContextCallback, this); } else { InitializeCulture(); ((ThreadStart)_start)(); } } } ``` ### 对于 AsyncLocalValueChanged 回调触发的解释 前面我们演示了向 AsyncLocal 注册回调的例子,并留下了两个疑问。 ``` class Program { private static AsyncLocal
_asyncLocal = new AsyncLocal
(AsyncLocalValueChanged); private static int _number = 1; static void Main(string[] args) { _asyncLocal.Value = "Hello World!"; Console.WriteLine($"Main: {_asyncLocal.Value}, thread: {Thread.CurrentThread.ManagedThreadId}"); Task.Run(() => { Console.WriteLine($"Task: {_asyncLocal.Value}, thread: {Thread.CurrentThread.ManagedThreadId}"); _asyncLocal.Value = "Hello World from Task!"; }); Console.ReadLine(); } private static void AsyncLocalValueChanged(AsyncLocalValueChangedArgs
obj) { Console.WriteLine( $"AsyncLocalValueChanged_{_number++}: {obj.PreviousValue} -> {obj.CurrentValue}, thread: {Thread.CurrentThread.ManagedThreadId}"); } } ``` > 输出结果 ``` AsyncLocalValueChanged_1: -> Hello World!, thread: 1 Main: Hello World!, thread: 1 AsyncLocalValueChanged_2: -> Hello World!, thread: 4 Task: Hello World!, thread: 4 AsyncLocalValueChanged_3: Hello World! -> Hello World from Task!, thread: 4 AsyncLocalValueChanged_4: Hello World from Task! -> , thread: 4 ``` #### 1. 为什么在主线程和 Task 中都会触发一次从 null 到 Hello World 的回调? Task.Run 会将任务调度到线程池中,runtime 会把 ExecutionContext 从主线程拷贝到 Task 所在的线程池线程中,这个拷贝过程中会触发一次回调,因为线程池线程原来的 ExecutionContext 里没有保存 AsyncLocal 的值,新的 ExecutionContext 是有值的,所以会触发一次从 null 到 Hello World 的回调。 #### 2. 为什么在 Task 中会触发一次值被清空的回调? 线程池线程执行完任务后会将 ExecutionContext 重置为 null,这个重置过程中会触发一次回调,因为线程池线程原来的 ExecutionContext 里保存了 AsyncLocal 的值,新的 ExecutionContext 中没有值,所以会触发一次值被清空的回调。 ### 总结 1. AsyncLocal 本身不保存数据,数据保存在与代码执行线程绑定的一个 `ExecutionContext` 实例中。具体是名为 `m_localValues` 的私有字段上,其类型定义是 `IAsyncLocalMap` ,以 `IAsyncLocal => object` 的 Map 结构进行保存,且实现类型随着元素数量的变化而变化。 2. `ExecutionContext` 实例 保存在 `Thread.CurrentThread._executionContext` 上,实现与当前线程的关联。 3. 对于 IAsyncLocalMap 的实现类,如果 AsyncLocal 注册了回调,value 传 null 不会被忽略。 没注册回调时分为两种情况:`如果 key 存在`,则做删除处理,map 类型可能出现降级。`如果 key 不存在`,则直接忽略。 4. ExecutionContext 和 IAsyncLocalMap 的实现类都被设计成`不可变(immutable)`。同一个 key 前后两次 value 发生变化后,会产生新的 ExecutionContext 的实例和 IAsyncLocalMap 实现类实例。`这边需要注意一点,创建新的 IAsyncLocalMap 实例时,原来保存的数据是被值拷贝过去的。` 5. ExecutionContext 与当前线程绑定,`默认会在线程发生切换时向后流动(进行数据的拷贝)`,可以禁止流动和恢复流动,且禁止流动仅影响当前线程向其辅助线程的传递,不影响后续。 ### 参考资料 1. https://devblogs.microsoft.com/pfxteam/executioncontext-vs-synchronizationcontext/ 2. 《CLR via C#》27.3 章节 3. github 代码库 https://github.com/dotnet/runtime
这里⇓感觉得写点什么,要不显得有点空,但还没想好写什么...
返回顶部
About
京ICP备13038605号
© 代码片段 2024