介紹
可能你已經閱讀C#5 關於async 和await 關鍵字以及它們如何幫助簡化異步編程的,可惜的是在升級VS2010後短短兩年時間,任然沒有準備好升級到VS2012,在VS2010和C#4中不能使用非同步關鍵字,你可能會想「如果我能在VS 2010中寫看起來同步的方法,但異步執行.我的程式碼會更清晰.」
看完這篇文章後,您將能夠做到這一點。我們將開發一個小的基礎結構程式碼,讓我們寫"看起來同步的方法,但異步執行"的方法,這個VS2012 異步關鍵字一樣, 享受C#5的特性.
我們必須承認,async 和await 是非常好的語法糖,我們的方法需要編寫更多的"AsyncResultcallback"方法適應這種變化.而當你終於升級到VS2012(或以後),這將是一件微不足道的小事,用C#關鍵字取代這個方法,只要簡單的語法變化,而不是一個艱苦的結構重寫。
概要
async/await 是基於非同步任務模式的關鍵字。鑑於 這裡已經有了非常完整的文檔描述,這裡我就不再加以說明。但必須指出的是,TAP簡直帥到極點了!透過它你可以創造大量的將在未來某個時間完成的小型單元工作(任務);任務可以啟動其他的(嵌套)任務 並且/或 建立一些僅當前置任務完成後才會啟動的後續任務。前置與後續任務則可連結為一對多或是多對一的關係。當內嵌任務完成時,父級任務無需與執行緒(重量級資源!)相綁定。執行任務時也不必再擔心執行緒的時序安排,只要做一些小提示,框架就會自動為你處理這些事情。當程式開始運行,所有的任務將如溪流匯入大海般各自走向終點,又像柏青哥的小鐵球一樣相互反彈相互作用。
然而在C#4裡面我們卻沒有async和await,不過缺少的也只是這一點點.Net5的新特性而已,這些新特性我們要么可以稍作迴避,要么可以自己構建,關鍵的Task類型還是可用的。
在一個C#5的非同步(async)方法裡,你要等待一個Task。這不會導致執行緒等待;而是這個方法傳回一個Task給它的呼叫者,這個Task能夠等待(如果它自己是非同步的)或附上後續部分。 (它同樣能在任務中或它的結果中呼叫Wait(),但這會和執行緒耦合,所以避免那樣做。)當等待的任務成功完成,你的非同步方法會在它中斷的地方繼續運行。
也許你會知道,C#5的編譯器會重寫它的非同步方法為一個產生的實作了狀態機的巢狀類別。 C#正好還有一個特徵(從2.0開始):迭代器(yield return 的方式)。這裡的方法是使用一個迭代器方法在C#4中建造狀態機,傳回一系列在全部處理過程中的等待步驟的Task。我們可以編寫一個方法接收一個從迭代器返回的任務的枚舉,返回一個重載過的Task來代表全部序列的完成以及提供它的最終結果(如果有)。
最終目標
Stephen Covey 建議我們目標有先後。這就是我們現在做的。已經有大量例子來告訴我們如何使用async/await來實現SLAMs(synchronous-looking asynchronous methods)。那我們不使用這些關鍵字如何實現這個功能。讓我們來做一個C#5 async的例子,看看如何在C#4裡實現它。然後我們討論一下轉換這些程式碼的一般方法。
下面的範例展示了我們在C#5裡實作非同步讀寫方法Stream.CopyToAsync()的一種寫法。假設這個方法並沒有在.NET5裡實作。
public static async Task CopyToAsync( this Stream input, Stream output, CancellationToken cancellationToken = default(CancellationToken)) { byte[] buffer = new byte[0x1000]; // 4 KiB while (true) { cancellationToken.ThrowIfCancellationRequested(); int bytesRead = await input.ReadAsync(buffer, 0, buffer.Length); if (bytesRead == 0) break; cancellationToken.ThrowIfCancellationRequested(); await output.WriteAsync(buffer, 0, bytesRead); } }
對C#4,我們將分成兩塊:一個是相同存取能力的方法,另一個是私有方法,參數相同但傳回型別不同。私有方法用迭代實現同樣的處理,結果是一連串等待的任務(IEnumerable
相同存取能力(公用)方法傳回與對應async方法一致的型別:void,Task,或泛型Task
public static /*async*/ Task CopyToAsync( this Stream input, Stream output, CancellationToken cancellationToken = default(CancellationToken)) { return CopyToAsyncTasks(input, output, cancellationToken).ToTask(); } private static IEnumerable<Task> CopyToAsyncTasks( Stream input, Stream output, CancellationToken cancellationToken) { byte[] buffer = new byte[0x1000]; // 4 KiB while (true) { cancellationToken.ThrowIfCancellationRequested(); var bytesReadTask = input.ReadAsync(buffer, 0, buffer.Length); yield return bytesReadTask; if (bytesReadTask.Result == 0) break; cancellationToken.ThrowIfCancellationRequested(); yield return output.WriteAsync(buffer, 0, bytesReadTask.Result); } }
异步方法通常以"Async"结尾命名(除非它是事件处理器如startButton_Click)。给迭代器以同样的名字后跟“Tasks”(如startButton_ClickTasks)。如果异步方法返回void值,它仍然会调用ToTask()但不会返回Task。如果异步方法返回Task
public /*async*/ void DoSomethingAsync() { DoSomethingAsyncTasks().ToTask(); } public /*async*/ Task DoSomethingAsync() { return DoSomethingAsyncTasks().ToTask(); } public /*async*/ Task<String> DoSomethingAsync() { return DoSomethingAsyncTasks().ToTask<String>(); }
成对的迭代器方法不会更复杂。当异步方法等待非通用的Task时,迭代器简单的将控制权转给它。当异步方法等待task结果时,迭代器将task保存在一个变量中,转到该方法,之后再使用它的返回值。两种情况在上面的CopyToAsyncTasks()例子里都有显示。
对包含通用resultTask
最后一件你需要知道的事情是如何处理中间返回的值。一个异步的方法可以从多重嵌套的块中返回;我们的迭代器简单的通过跳转到结尾来模仿它。
// C#5 public async Task<String> DoSomethingAsync() { while (…) { foreach (…) { return "Result"; } } } // C#4; DoSomethingAsync() is necessary but omitted here. private IEnumerable<Task> DoSomethingAsyncTasks() { while (…) { foreach (…) { yield return TaskEx.FromResult("Result"); goto END; } } END: ; }
现在我们知道如何在C#4中写SLAM了,但是只有实现了FromResult
简单的开端
我们将在类System.Threading.Tasks.TaskEx下实现3个方法, 先从简单的那2个方法开始。FromResult()方法先创建了一个TaskCompletionSource(), 然后给它的result赋值,最后返回Task。
public static Task<TResult> FromResult<TResult>(TResult resultValue) { var completionSource = new TaskCompletionSource<TResult>(); completionSource.SetResult(resultValue); return completionSource.Task; }
很显然, 这2个ToTask()方法基本相同, 唯一的区别就是是否给返回对象Task的Result属性赋值. 通常我们不会去写2段相同的代码, 所以我们会用其中的一个方法来实现另一个。 我们经常使用泛型来作为返回结果集,那样我们不用在意返回值同时也可以避免在最后进行类型转换。 接下来我们先实现那个没有用泛型的方法。
private abstract class VoidResult { } public static Task ToTask(this IEnumerable<Task> tasks) { return ToTask<VoidResult>(tasks); }
目前为止我们就剩下一个 ToTask
第一次天真的尝试
对于我们第一次尝试实现的方法,我们将枚举每个任务的Wait()来完成,然后将最终的任务做为结果(如果合适的话)。当然,我们不想占用当前线程,我们将另一个线程来执行循环该任务。
// BAD CODE ! public static Task<TResult> ToTask<TResult>(this IEnumerable<Task> tasks) { var tcs = new TaskCompletionSource<TResult>(); Task.Factory.StartNew(() => { Task last = null; try { foreach (var task in tasks) { last = task; task.Wait(); } // Set the result from the last task returned, unless no result is requested. tcs.SetResult( last == null || typeof(TResult) == typeof(VoidResult) ? default(TResult) : ((Task<TResult>) last).Result); } catch (AggregateException aggrEx) { // If task.Wait() threw an exception it will be wrapped in an Aggregate; unwrap it. if (aggrEx.InnerExceptions.Count != 1) tcs.SetException(aggrEx); else if (aggrEx.InnerException is OperationCanceledException) tcs.SetCanceled(); else tcs.SetException(aggrEx.InnerException); } catch (OperationCanceledException cancEx) { tcs.SetCanceled(); } catch (Exception ex) { tcs.SetException(ex); } }); return tcs.Task; }
这里有一些好东西,事实上它真的有用,只要不触及用户界面:
它准确的返回了一个TaskCompletionSource的Task,并且通过源代码设置了完成状态。
但这里有些主要的问题。最严重的是:
是需要想点办法的时候了!
连续循环
最大的想法是直接从迭代器中获取其所产生的第一个任务。 我们创建了一个延续,使其在完成时能够检查任务的状态并且(如果成功的话)能接收下一个任务和创建另一个延续直至其结束。(如果没有,即迭代器没有需要完成的需求。)
// 很牛逼,但是我们还没有。 public static Task<TResult> ToTask<TResult>(this IEnumerable<Task> tasks) { var taskScheduler = SynchronizationContext.Current == null ? TaskScheduler.Default : TaskScheduler.FromCurrentSynchronizationContext(); var tcs = new TaskCompletionSource<TResult>(); var taskEnumerator = tasks.GetEnumerator(); if (!taskEnumerator.MoveNext()) { tcs.SetResult(default(TResult)); return tcs.Task; } taskEnumerator.Current.ContinueWith( t => ToTaskDoOneStep(taskEnumerator, taskScheduler, tcs, t), taskScheduler); return tcs.Task; } private static void ToTaskDoOneStep<TResult>( IEnumerator<Task> taskEnumerator, TaskScheduler taskScheduler, TaskCompletionSource<TResult> tcs, Task completedTask) { var status = completedTask.Status; if (status == TaskStatus.Canceled) { tcs.SetCanceled(); } else if (status == TaskStatus.Faulted) { tcs.SetException(completedTask.Exception); } else if (!taskEnumerator.MoveNext()) { // 设置最后任务返回的结果,直至无需结果为止。 tcs.SetResult( typeof(TResult) == typeof(VoidResult) ? default(TResult) : ((Task<TResult>) completedTask).Result); } else { taskEnumerator.Current.ContinueWith( t => ToTaskDoOneStep(taskEnumerator, taskScheduler, tcs, t), taskScheduler); } }
这里有许多值得分享的:
我们的后续部分(continuations)使用涉及SynchronizationContext的TaskScheduler,如果有的话。这使得我们的迭代器在UI线程初始化以后,立刻或者在一个继续点被调用,去访问UI控件。
进程不中断的运行,因此没有线程挂起等待!顺便说一下,在ToTaskDoOneStep()中对自身的调用不是递归调用;它是在taskEnumerator.Currenttask结束后调用的匿名函数,当前活动在调用ContinueWith()几乎立刻退出,它完全独立于后续部分。
此外,我们在继续点中验证每个嵌套task的状态,不是检查一个预测值。
然而,这儿至少有一个大问题和一些小一点的问题。
如果迭代器抛出一个未处理异常,或者抛出OperationCanceledException而取消,我们没有处理它或设置主task的状态。这是我们以前曾经做过的但在此版本丢失了。
为了修复问题1,我们不得不在两个方法中调用MoveNext()的地方引入同样的异常处理机制。即使是现在,两个方法中都有一样的后续部分建立。我们违背了“不要重复你自己”的信条。
如果异步方法被期望给出一个结果,但是迭代器没有提供就退出了会怎么样呢?或者它最后的task是错误的类型呢?第一种情形下,我们默默返回默认的结果类型;第二种情形,我们抛出一个未处理的InvalidCastException,主task永远不会到达结束状态!我们的程序将永久的挂起。
最后,如果一个嵌套的task取消或者发生错误呢?我们设置主task状态,再也不会调用迭代器。可能是在一个using块,或带有finally的try块的内部,并且有一些清理要做。我们应当遵守过程在中断的时候使它结束,而不要等垃圾收集器去做这些。我们怎么做到呢?当然通过一个后续部分!
为了解决这些问题,我们从ToTask()中移走MoveNext()调用,取而代之一个对ToTaskDoOneStep()的初始化的同步调用。然后我们将在一个提防增加合适的异常处理。
最终版本
这里是ToTask
public static Task<TResult> ToTask<TResult>(this IEnumerable<Task> tasks) { var taskScheduler = SynchronizationContext.Current == null ? TaskScheduler.Default : TaskScheduler.FromCurrentSynchronizationContext(); var taskEnumerator = tasks.GetEnumerator(); var completionSource = new TaskCompletionSource<TResult>(); // Clean up the enumerator when the task completes. completionSource.Task.ContinueWith(t => taskEnumerator.Dispose(), taskScheduler); ToTaskDoOneStep(taskEnumerator, taskScheduler, completionSource, null); return completionSource.Task; } private static void ToTaskDoOneStep<TResult>( IEnumerator<Task> taskEnumerator, TaskScheduler taskScheduler, TaskCompletionSource<TResult> completionSource, Task completedTask) { // Check status of previous nested task (if any), and stop if Canceled or Faulted. TaskStatus status; if (completedTask == null) { // This is the first task from the iterator; skip status check. } else if ((status = completedTask.Status) == TaskStatus.Canceled) { completionSource.SetCanceled(); return; } else if (status == TaskStatus.Faulted) { completionSource.SetException(completedTask.Exception); return; } // Find the next Task in the iterator; handle cancellation and other exceptions. Boolean haveMore; try { haveMore = taskEnumerator.MoveNext(); } catch (OperationCanceledException cancExc) { completionSource.SetCanceled(); return; } catch (Exception exc) { completionSource.SetException(exc); return; } if (!haveMore) { // No more tasks; set the result (if any) from the last completed task (if any). // We know it's not Canceled or Faulted because we checked at the start of this method. if (typeof(TResult) == typeof(VoidResult)) { // No result completionSource.SetResult(default(TResult)); } else if (!(completedTask is Task<TResult>)) { // Wrong result completionSource.SetException(new InvalidOperationException( "Asynchronous iterator " + taskEnumerator + " requires a final result task of type " + typeof(Task<TResult>).FullName + (completedTask == null ? ", but none was provided." : "; the actual task type was " + completedTask.GetType().FullName))); } else { completionSource.SetResult(((Task<TResult>) completedTask).Result); } } else { // When the nested task completes, continue by performing this function again. taskEnumerator.Current.ContinueWith( nextTask => ToTaskDoOneStep(taskEnumerator, taskScheduler, completionSource, nextTask), taskScheduler); } }
瞧! 现在你会在Visual Studio 2010中用没有async和await的 C#4 (或 VB10)写SLAMs(看起来同步的方法,但异步执行)。
有趣的地方
直到最后那个版本,我一直在给ToTask()传递一个CancellationTokenUp,并且将它传播进后续部分的ToTaskDoOneStep()。(这与本文毫不相关,所以我去掉了它们。你可以在样例代码中看注释掉的痕迹。)这有两个原因。第一,处理OperationCanceledException时,我会检查它的CancellationToken以确认它与这个操作是匹配的。如果不是,它将用一个错误来代替取消动作。虽然技术上没错,但不幸的是取消令牌可能会混淆,在其传递给ToTask()调用和后续部分之间的无关信息使它不值得。(如果你们这些 Task专家能给我一个注释里的可确认发生的好的用例,我会重新考虑)
第二个原因是我会检查令牌是否取消,在每次MoveNext()调用迭代器之前,立即取消主task时,和退出进程的时候。这使你可以不经过迭代器检查令牌,具有取消的行为。我不认为这是要做的正确事情(因为对一个异步进程在yield return处取消是不合适的)——更可能是它完全在迭代器进程控制之下——但我想试试。它无法工作。我发现在某些情形,task会取消而却后续部分不会触发。请看样例代码;我靠继续执行来恢复按钮可用,但它没有发生因此按钮在进程结束之后仍不可用。我在样例代码中留下了注释掉的取消检测;你可以将取消令牌的方法参数放回去并测试它。(如果你们Task专家能解释为什么会是这种情形,我将很感激!)