前不久,我们发布了《选择 .NET 的 n 个理由》。它提供了对平台的高层次概述,总结了各种组件和设计决策,并承诺对所涉及的领域发表更深入的文章。这是第一篇这样深入探讨 C# 和 .NET 中 async/await 的历史、背后的设计决策和实现细节的文章。
对 async/await 的支持已经存在了十年之久。在这段时间里,它改变了为 .NET 编写可扩展代码的方式,而在不了解其底层逻辑的情况下使用该功能是可行的,也是非常常见的。在这篇文章中,我们将深入探讨 await 在语言、编译器和库级别的工作原理,以便你可以充分利用这些有价值的功能。
不过,要做到这一点,我们需要追溯到 async/await 之前,以了解在没有它的情况下最先进的异步代码是什么样子的。
最初的样子
早在 .NET Framework 1.0中,就有异步编程模型模式,又称 APM 模式、Begin/End 模式、IAsyncResult 模式。在高层次上,该模式很简单。对于同步操作 DoStuff:
class Handler
{
public int DoStuff(string arg);
}
作为模式的一部分,将有两个相应的方法:BeginDoStuff 方法和 EndDoStuff 方法:
class Handler
{
public int DoStuff(string arg);
public IAsyncResult BeginDoStuff(string arg, AsyncCallback? callback, object? state);
public int EndDoStuff(IAsyncResult asyncResult);
}
BeginDoStuff 会像 DoStuff 一样接受所有相同的参数,但除此之外,它还会接受 AsyncCallback 委托和一个不透明的状态对象,其中一个或两个都可以为 null。Begin 方法负责初始化异步操作,如果提供了回调(通常称为初始操作的“延续”),它还负责确保在异步操作完成时调用回调。Begin 方法还将构造一个实现了 IAsyncResult 的类型实例,使用可选状态填充 IAsyncResult 的 AsyncState 属性:
namespace System
{
public interface IAsyncResult
{
object? AsyncState { get; }
WaitHandle AsyncWaitHandle { get; }
bool IsCompleted { get; }
bool CompletedSynchronously { get; }
}
public delegate void AsyncCallback(IAsyncResult ar);
}
然后,这个 IAsyncResult 实例将从 Begin 方法返回,并在最终调用 AsyncCallback 时传递给它。当准备使用操作的结果时,调用者将把 IAsyncResult 实例传递给 End 方法,该方法负责确保操作已完成(如果没有完成,则通过阻塞同步等待操作完成),然后返回操作的任何结果,包括传播可能发生的任何错误和异常。因此,不用像下面这样写代码来同步执行操作:
try
{
int i = handler.DoStuff(arg);
Use(i);
}
catch (Exception e)
{
... // handle exceptions from DoStuff and Use
}
可以按以下方式使用 Begin/End 方法异步执行相同的操作:
try
{
handler.BeginDoStuff(arg, iar =>
{
try
{
Handler handler = (Handler)iar.AsyncState!;
int i = handler.EndDoStuff(iar);
Use(i);
}
catch (Exception e2)
{
... // handle exceptions from EndDoStuff and Use
}
}, handler);
}
catch (Exception e)
{
... // handle exceptions thrown from the synchronous call to BeginDoStuff
}
对于在任何语言中处理过基于回调的 API 的人来说,这应该感觉很熟悉。
然而,事情从此变得更加复杂。例如,有一个”stack dives”的问题。stack dives 是指代码反复调用,在堆栈中越陷越深,以至于可能出现堆栈溢出。如果操作同步完成,Begin 方法被允许同步调用回调,这意味着对 Begin 的调用本身可能直接调用回调。同步完成的 “异步 “操作实际上是很常见的;它们不是 “异步”,因为它们被保证异步完成,而只是被允许这样做。
这是一种真实的可能性,很容易再现。在 .NET Core 上试试这个程序:
using System.NET;
using System.NET.Sockets;
using Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
listener.Bind(new IPEndPoint(IPAddress.Loopback, 0));
listener.Listen();
using Socket client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
client.Connect(listener.LocalEndPoint!);
using Socket server = listener.Accept();
_ = server.SendAsync(new byte[100_000]);
var mres = new ManualResetEventSlim();
byte[] buffer = new byte[1];
var stream = new NetworkStream(client);
void ReadAgain()
{
stream.BeginRead(buffer, 0, 1, iar =>
{
if (stream.EndRead(iar) != 0)
{
ReadAgain(); // uh oh!
}
else
{
mres.Set();
}
}, null);
};
ReadAgain();
mres.Wait();
在这里,我设置了一个相互连接的简单客户端套接字和服务器套接字。服务器向客户端发送100,000字节,然后客户端继续使用 BeginRead/EndRead 来“异步”地每次读取一个字节。传给 BeginRead 的回调函数通过调用 EndRead 来完成读取,然后如果它成功读取了所需的字节,它会通过递归调用 ReadAgain 局部函数来发出另一个 BeginRead。然而,在 .NET Core 中,套接字操作比在 .NET Framework 上快得多,并且如果操作系统能够满足同步操作,它将同步完成(注意内核本身有一个缓冲区用于满足套接字接收操作)。因此,这个堆栈会溢出:
因此,APM 模型中内置了补偿机制。有两种可能的方法可以弥补这一点:
1.不要允许 AsyncCallback 被同步调用。如果一直异步调用它,即使操作以同步方式完成,那么 stack dives 的风险也会消失。但是性能也是如此,因为同步完成的操作(或者快到无法观察到它们的区别)是非常常见的,强迫每个操作排队回调会增加可测量的开销。
2.使用一种机制,允许调用方而不是回调方在操作同步完成时执行延续工作。这样,您就可以避开额外的方法框架,继续执行后续工作,而不深入堆栈。
APM 模式与方法2一起使用。为此,IAsyncResult 接口公开了两个相关但不同的成员:IsCompleted 和 CompletedSynchronously。IsCompleted 告诉你操作是否已经完成,可以多次检查它,最终它会从 false 转换为 true,然后保持不变。相比之下,CompletedSynchronously 永远不会改变(如果改变了,那就是一个令人讨厌的 bug)。它用于 Begin 方法的调用者和 AsyncCallback 之间的通信,他们中的一个负责执行任何延续工作。如果 CompletedSynchronously 为 false,则操作是异步完成的,响应操作完成的任何后续工作都应该留给回调;毕竟,如果工作没有同步完成,Begin 的调用方无法真正处理它,因为还不知道操作已经完成(如果调用方只是调用 End,它将阻塞直到操作完成)。然而,如果 CompletedSynchronously 为真,如果回调要处理延续工作,那么它就有 stack dives 的风险,因为它将在堆栈上执行比开始时更深的延续工作。因此,任何涉及到这种堆栈潜水的实现都需要检查 CompletedSynchronously,并让 Begin 方法的调用者执行延续工作(如果它为真),这意味着回调不需要执行延续工作。这也是 CompletedSynchronously 永远不能更改的原因,调用方和回调方需要看到相同的值,以确保不管竞争条件如何,延续工作只执行一次。
我们都习惯了现代语言中的控制流结构为我们提供的强大和简单性,一旦引入了任何合理的复杂性,而基于回调的方法通常会与这种结构相冲突。其他主流语言也没有更好的替代方案。
我们需要一种更好的方法,一种从 APM 模式中学习的方法,融合它正确的东西,同时避免它的陷阱。值得注意的是,APM 模式只是一种模式。运行时间、核心库和编译器在使用或实现该模式时并没有提供任何帮助。
基于事件的异步模式
.NET Framework 2.0引入了一些 API,实现了处理异步操作的不同模式,这种模式主要用于在客户端应用程序上下文中处理异步操作。这种基于事件的异步模式或 EAP 也作为一对成员出现,这次是一个用于初始化异步操作的方法和一个用于侦听其完成的事件。因此,我们之前的 DoStuff 示例可能被公开为一组成员,如下所示:
class Handler
{
public int DoStuff(string arg);
public void DoStuffAsync(string arg, object? userToken);
public event DoStuffEventHandler? DoStuffCompleted;
}
public delegate void DoStuffEventHandler(object sender, DoStuffEventArgs e);
public class DoStuffEventArgs : AsyncCompletedEventArgs
{
public DoStuffEventArgs(int result, Exception? error, bool canceled, object? userToken) :
base(error, canceled, usertoken) => Result = result;
public int Result { get; }
}
你需要用 DoStuffCompleted 事件注册你的后续工作,然后调用 DoStuffAsync 方法;它将启动该操作,并且在该操作完成时,调用者将异步地引发 DoStuffCompleted 事件。然后,处理程序可以继续执行后续工作,可能会验证所提供的 userToken 与它所期望的进行匹配,从而允许多个处理程序同时连接到事件。
这种模式使一些用例变得更简单,同时使其他用例变得更加困难(考虑到前面的 APM CopyStreamToStream 示例,这说明了一些问题)。它没有以广泛的方式推出,只是在一个单独的 .NET Framework 版本中匆匆的出现又消失了,尽管留下了它使用期间添加的 api,如 Ping.SendAsync/Ping.PingCompleted:
public class Ping : Component
{
public void SendAsync(string hostNameOrAddress, object? userToken);
public event PingCompletedEventHandler? PingCompleted;
...
}
然而,它确实取得了一个 APM 模式完全没有考虑到的显著进步,并且这一点一直延续到我们今天所接受的模型中: SynchronizationContext。
考虑到像 Windows Forms 这样的 UI 框架。与 Windows 上的大多数 UI 框架一样,控件与特定的线程相关联,该线程运行一个消息泵,该消息泵运行能够与这些控件交互的工作,只有该线程应该尝试操作这些控件,而任何其他想要与控件交互的线程都应该通过发送消息由 UI 线程的泵消耗来完成操作。Windows 窗体使用 ControlBeginInvoke 等方法使这变得很容易,它将提供的委托和参数排队,由与该控件相关联的任何线程运行。因此,你可以这样编写代码:
private void button1_Click(object sender, EventArgs e)
{
ThreadPool.QueueUserWorkItem(_ =>
{
string message = ComputeMessage();
button1.BeginInvoke(() =>
{
button1.Text = message;
});
});
}
这将卸载在 ThreadPool 线程上完成的 ComputeMessage()工作(以便在处理 UI 的过程中保持 UI 的响应性),然后在工作完成时,将委托队列返回到与 button1 相关的线程,以更新 button1 的标签。这很简单,WPF 也有类似的东西,只是用它的 Dispatcher 类型:
private void button1_Click(object sender, RoutedEventArgs e){
ThreadPool.QueueUserWorkItem(_ =>
{
string message = ComputeMessage();
button1.Dispatcher.InvokeAsync(() =>
{
button1.Content = message;
});
});}
.NET MAUI 也有类似的功能。但如果我想把这个逻辑放到辅助方法中呢?
E.g.
// Call ComputeMessage and then invoke the update action to update controls.internal static void ComputeMessageAndInvokeUpdate(Action update) { ... }
然后我可以这样使用它:
private void button1_Click(object sender, EventArgs e){
ComputeMessageAndInvokeUpdate(message => button1.Text = message);}
但是如何实现 ComputeMessageAndInvokeUpdate,使其能够在这些应用程序中工作呢?是否需要硬编码才能了解每个可能的 UI 框架?这就是 SynchronizationContext 的魅力所在。我们可以这样实现这个方法:
internal static void ComputeMessageAndInvokeUpdate(Action update){
SynchronizationContext? sc = SynchronizationContext.Current;
ThreadPool.QueueUserWorkItem(_ =>
{
string message = ComputeMessage();
if (sc is not null)
{
sc.Post(_ => update(message), null);
}
else
{
update(message);
}
});}
它使用 SynchronizationContext 作为一个抽象,目标是任何“调度器”,应该用于回到与 UI 交互的必要环境。然后,每个应用程序模型确保它作为 SynchronizationContext.Current 发布一个 SynchronizationContext-derived 类型,去做 “正确的事情”。例如,Windows Forms 有这个:
public sealed class WindowsFormsSynchronizationContext : SynchronizationContext, IDisposable{
public override void Post(SendOrPostCallback d, object? state) =>
_controlToSendTo?.BeginInvoke(d, new object?[] { state });
...}
WPF 有这个:
public sealed class DispatcherSynchronizationContext : SynchronizationContext{
public override void Post(SendOrPostCallback d, Object state) =>
_dispatcher.BeginInvoke(_priority, d, state);
...}
ASP.NET 曾经有一个,它实际上并不关心工作在什么线程上运行,而是关心给定的请求相关的工作被序列化,这样多个线程就不会并发地访问给定的 HttpContext:
internal sealed class AspNetSynchronizationContext : AspNetSynchronizationContextBase{
public override void Post(SendOrPostCallback callback, Object state) =>
_state.Helper.QueueAsynchronous(() => callback(state));
...}
这也不限于这些主要的应用程序模型。例如,xunit 是一个流行的单元测试框架,是 .NET 核心存储库用于单元测试的框架,它也采用了多个自定义的 SynchronizationContext。例如,你可以允许并行运行测试,但限制允许并发运行的测试数量。这是如何实现的呢?通过 SynchronizationContext:
public class MaxConcurrencySyncContext : SynchronizationContext, IDisposable{
public override void Post(SendOrPostCallback d, object? state)
{
var context = ExecutionContext.Capture();
workQueue.Enqueue((d, state, context));
workReady.Set();
}}
MaxConcurrencySyncContext 的 Post 方法只是将工作排到自己的内部工作队列中,然后在它自己的工作线程上处理它,它根据所需的最大并发数来控制有多少工作线程。
这与基于事件的异步模式有什么联系?EAP 和 SynchronizationContext 是同时引入的,当异步操作被启动时,EAP 规定完成事件应该排队到当前任何 SynchronizationContext 中。为了稍微简化一下,System.ComponentModel 中也引入了一些辅助类型,尤其是 AsyncOperation 和 AsyncOperationManager。前者只是一个元组,封装了用户提供的状态对象和捕获的 SynchronizationContext,而后者只是作为一个简单的工厂来捕获并创建 AsyncOperation 实例。然后 EAP 实现将使用这些,例如 Ping.SendAsync 调用 AsyncOperationManager.CreateOperation 来捕获 SynchronizationContext。当操作完成时,AsyncOperation 的 PostOperationCompleted 方法将被调用,以调用存储的 SynchronizationContext 的 Post 方法。
我们需要比 APM 模式更好的东西,接下来出现的 EAP 引入了一些新的事务,但并没有真正解决我们面临的核心问题。我们仍然需要更好的东西。
输入任务
.NET Framework 4.0 引入了 System.Threading.Tasks.Task 类型。从本质上讲,Task 只是一个数据结构,表示某些异步操作的最终完成(其他框架将类似的类型称为“promise”或“future”)。创建 Task 是为了表示某些操作,然后当它表示的操作逻辑上完成时,结果存储到该 Task 中。但是 Task 提供的关键特性使它比 IAsyncResult 更有用,它在自己内部内置了 continuation 的概念。这一特性意味着您可以访问任何 Task,并在其完成时请求异步通知,由任务本身处理同步,以确保继续被调用,无论任务是否已经完成、尚未完成、还是与通知请求同时完成。为什么会有如此大的影响?如果你还记得我们对旧 APM 模式的讨论,有两个主要问题。
- 你必须为每个操作实现一个自定义的 IAsyncResult 实现:没有内置的 IAsyncResult 实现,任何人都可以根据需要使用。
- 在 Begin 方法被调用之前,你必须知道当它完成时要做什么。这使得实现组合器和其他用于消耗和组合任意异步实现的通用例程成为一个重大挑战。
现在,让我们更好地理解它的实际含义。我们先从几个字段开始:
class MyTask{
private bool _completed;
private Exception? _error;
private Action? _continuation;
private ExecutionContext? _ec;
...}
我们需要一个字段来知道任务是否完成(_completed),还需要一个字段来存储导致任务失败的任何错误(_error);如果我们还要实现一个通用的 MyTask
如前所述,与以前的模型相比,Task 的一个基本进步是能够在操作开始后提供延续工作(回调)。我们需要一个方法来做到这一点,所以让我们添加 ContinueWith:
public void ContinueWith(Action action){
lock (this)
{
if (_completed)
{
ThreadPool.QueueUserWorkItem(_ => action(this));
}
else if (_continuation is not null)
{
throw new InvalidOperationException("Unlike Task, this implementation only supports a single continuation.");
}
else
{
_continuation = action;
_ec = ExecutionContext.Capture();
}
}}
如果任务在 ContinueWith 被调用时已经被标记为完成,ContinueWith 只是排队执行委托。否则,该方法将存储该委托,以便在任务完成时可以排队继续执行(它还存储了一个叫做 ExecutionContext 的东西,然后在以后调用该委托时使用它)。
然后,我们需要能够将 MyTask 标记为完成,这意味着它所代表的异步操作已经完成。为此,我们将提供两个方法,一个用于标记完成(” SetResult “),另一个用于标记完成并返回错误(” SetException “):
public void SetResult() => Complete(null);
public void SetException(Exception error) => Complete(error);
private void Complete(Exception? error){
lock (this)
{
if (_completed)
{
throw new InvalidOperationException("Already completed");
}
_error = error;
_completed = true;
if (_continuation is not null)
{
ThreadPool.QueueUserWorkItem(_ =>
{
if (_ec is not null)
{
ExecutionContext.Run(_ec, _ => _continuation(this), null);
}
else
{
_continuation(this);
}
});
}
}}
我们存储任何错误,将任务标记为已完成,然后如果之前已经注册了 continuation,则将其排队等待调用。
最后,我们需要一种方法来传播任务中可能发生的任何异常(并且,如果这是一个泛型 MyTask
public void Wait(){
ManualResetEventSlim? mres = null;
lock (this)
{
if (!_completed)
{
mres = new ManualResetEventSlim();
ContinueWith(_ => mres.Set());
}
}
mres?.Wait();
if (_error is not null)
{
ExceptionDispatchInfo.Throw(_error);
}}
基本上就是这样。现在可以肯定的是,真正的 Task 要复杂得多,有更高效的实现,支持任意数量的 continuation,有大量关于它应该如何表现的按钮(例如,continuation 应该像这里所做的那样排队,还是应该作为任务完成的一部分同步调用),能够存储多个异常而不是一个异常,具有取消的特殊知识,有大量的辅助方法用于执行常见操作,例如 Task.Run,它创建一个 Task 来表示线程池上调用的委托队列等等。
你可能还注意到,我简单的 MyTask 直接有公共的 SetResult/SetException 方法,而 Task 没有。实际上,Task 确实有这样的方法,它们只是内部的,System.Threading.Tasks.TaskCompletionSource 类型作为任务及其完成的独立“生产者”;这样做不是出于技术上的需要,而是为了让完成方法远离只用于消费的东西。然后,你就可以把 Task 分发出去,而不必担心它会在你下面完成;完成信号是创建任务的实现细节,并且通过保留 TaskCompletionSource 本身来保留完成它的权利。(CancellationToken 和 CancellationTokenSource 遵循类似的模式:CancellationToken 只是 CancellationTokenSource 的一个结构封装器,只提供与消费取消信号相关的公共区域,但没有产生取消信号的能力,而产生取消信号的能力仅限于能够访问 CancellationTokenSource的人。)
当然,我们可以为这个 MyTask 实现组合器和辅助器,就像 Task 提供的那样。想要一个简单的 MyTask.WhenAll?
public static MyTask WhenAll(MyTask t1, MyTask t2){
var t = new MyTask();
int remaining = 2;
Exception? e = null;
Action continuation = completed =>
{
e ??= completed._error; // just store a single exception for simplicity
if (Interlocked.Decrement(ref remaining) == 0)
{
if (e is not null) t.SetException(e);
else t.SetResult();
}
};
t1.ContinueWith(continuation);
t2.ContinueWith(continuation);
return t;}
想要一个 MyTask.Run?你得到了它:
public static MyTask Run(Action action){
var t = new MyTask();
ThreadPool.QueueUserWorkItem(_ =>
{
try
{
action();
t.SetResult();
}
catch (Exception e)
{
t.SetException(e);
}
});
return t;}
一个 MyTask.Delay 怎么样?当然可以:
public static MyTask Delay(TimeSpan delay){
var t = new MyTask();
var timer = new Timer(_ => t.SetResult());
timer.Change(delay, Timeout.InfiniteTimeSpan);
return t;}
有了 Task,.NET 中之前的所有异步模式都将成为过去。在以前使用 APM 模式或 EAP 模式实现异步实现的地方,都会公开新的 Task 返回方法。
▌ValueTasks
时至今日,Task 仍然是 .NET 中异步处理的主力,每次发布都有新方法公开,并且在整个生态系统中都例行地返回 Task 和 Task
缓存一些 Task
这就是 ValueTask
public readonly struct ValueTask{
private readonly Task? _task;
private readonly TResult _result;
...}
然后,一个方法可以返回这样一个 ValueTask
然而,在一些超级极端的高性能场景中,即使在异步完成的情况下,您也希望能够避免 Task
这就是 System.Threading.Tasks.Sources.IValueTaskSource
public interface IValueTaskSource{
ValueTaskSourceStatus GetStatus(short token);
void OnCompleted(Action
IValueTaskSource
public readonly struct ValueTask{
private readonly object? _obj;
private readonly TResult _result;
...}
以前 _task 字段要么是 Task
我提到了 ValueTask
因此,我们有 Task、Task
下期文章,我们将继续介绍 C# 迭代器,欢迎持续关注。