Home > Web Front-end > JS Tutorial > Using iterators to handle waiting tasks in C#_Basic knowledge

Using iterators to handle waiting tasks in C#_Basic knowledge

WBOY
Release: 2016-05-16 15:50:31
Original
1036 people have browsed it

Introduction

Maybe you have read about C# 5 about async and await keywords and how they help simplify asynchronous programming. Unfortunately, just two years after upgrading VS2010, you are still not ready to upgrade to VS2012. After VS2010 and The async keyword is not available in C# 4, and you might be thinking "If I could write methods in VS 2010 that looked synchronous, but executed asynchronously. My code would be cleaner."

After reading this article, you will be able to do just that. We will develop a small infrastructure code that allows us to write "methods that look synchronous, but execute asynchronously", using the same VS2012 async keyword, and enjoying the features of C#5.

We must admit that async and await are very good syntactic sugar, and our method needs to write more "AsyncResultcallback" methods to adapt to this change. And when you finally upgrade to VS2012 (or later), this will be a problem A trivial thing, replacing this method with a C# keyword would be a simple syntax change rather than a painstaking structural rewrite.

Summary

async/await is a keyword based on the asynchronous task pattern. Since here already has a very complete documentation, I won’t explain it here. But it must be pointed out that TAP is extremely handsome! With it you can create a large number of small units of work (tasks) that will be completed at some time in the future; tasks can start other (nested) tasks and/or create some follow-up tasks that will only be started after the completion of the previous task. Predecessor and follow-up tasks can be linked in a one-to-many or many-to-one relationship. When the embedded task completes, the parent task does not need to be tied to a thread (a heavyweight resource!). You no longer have to worry about thread timing when executing tasks. You just need to make some small prompts and the framework will automatically handle these things for you. When the program starts running, all the tasks will reach their respective destinations like streams flowing into the ocean, bouncing off and interacting with each other like Pachinko's little iron balls.

However, we do not have async and await in C#4, but what is missing is just this little new feature of .Net5. We can either avoid these new features for a while, or we can build them ourselves. The key Task type Still available.


In a C#5 asynchronous (async) method, you have to wait for a Task. This does not cause the thread to wait; rather the method returns a Task to its caller, which can wait (if it is itself asynchronous) or attach a follow-up part. (It can also call Wait() on the task or on its result, but this will couple to the thread, so avoid doing that.) When the awaited task completes successfully, your asynchronous method will continue running where it left off.

Perhaps you will know that the C#5 compiler will rewrite its asynchronous method into a generated nested class that implements a state machine. C# happens to have one more feature (starting from 2.0): iterators (yield return method). The approach here is to build a state machine in C# 4 using an iterator method that returns a series of Tasks with waiting steps throughout the process. We can write a method that receives an enumeration of tasks returned from the iterator, and returns an overloaded Task to represent the completion of the entire sequence and provide its final result (if any).

Ultimate goal

Stephen Covey suggested that we prioritize our goals. This is what we do now. There are already a lot of examples showing us how to use async/await to implement SLAMs (synchronous-looking asynchronous methods). So how do we implement this function without using these keywords. Let's do a C#5 async example and see how to implement it in C#4. Then we discuss the general approach to converting these codes.

The following example shows how we implement the asynchronous reading and writing method Stream.CopyToAsync() in C#5. Assume that this method is not implemented in .NET5.

public static async Task CopyToAsync(
  this Stream input, Stream output,
  CancellationToken cancellationToken = default(CancellationToken))
{
  byte[] buffer = new byte[0x1000];  // 4 KiB
  while (true) {
    cancellationToken.ThrowIfCancellationRequested();
    int bytesRead = await input.ReadAsync(buffer, 0, buffer.Length);
    if (bytesRead == 0) break;
 
    cancellationToken.ThrowIfCancellationRequested();
    await output.WriteAsync(buffer, 0, bytesRead);
  }
}
Copy after login


For C#4, we will split it into two parts: one is a method with the same access capabilities, and the other is a private method with the same parameters but different return types. The private method implements the same process using iteration, and the result is a series of waiting tasks (IEnumerable). The actual tasks in the sequence can be non-generic or any combination of different types of generics. (Fortunately, the generic Task type is a subtype of the non-generic Task type)

The same access capability (public) method returns the same type as the corresponding async method: void, Task, or the generic Task. It will use the extension method to call the private iterator and convert it to Task or Task.

public static /*async*/ Task CopyToAsync(
  this Stream input, Stream output,
  CancellationToken cancellationToken = default(CancellationToken))
{
  return CopyToAsyncTasks(input, output, cancellationToken).ToTask();
}
private static IEnumerable<Task> CopyToAsyncTasks(
  Stream input, Stream output,
  CancellationToken cancellationToken)
{
  byte[] buffer = new byte[0x1000];  // 4 KiB
  while (true) {
    cancellationToken.ThrowIfCancellationRequested();
    var bytesReadTask = input.ReadAsync(buffer, 0, buffer.Length);
    yield return bytesReadTask;
    if (bytesReadTask.Result == 0) break;
 
    cancellationToken.ThrowIfCancellationRequested();
    yield return output.WriteAsync(buffer, 0, bytesReadTask.Result);
  }
}
Copy after login

异步方法通常以"Async"结尾命名(除非它是事件处理器如startButton_Click)。给迭代器以同样的名字后跟“Tasks”(如startButton_ClickTasks)。如果异步方法返回void值,它仍然会调用ToTask()但不会返回Task。如果异步方法返回Task,那么它就会调用通用的ToTask()扩展方法。对应三种返回类型,异步可替代的方法像下面这样:

public /*async*/ void DoSomethingAsync() {
  DoSomethingAsyncTasks().ToTask();
}
public /*async*/ Task DoSomethingAsync() {
  return DoSomethingAsyncTasks().ToTask();
}
public /*async*/ Task<String> DoSomethingAsync() {
  return DoSomethingAsyncTasks().ToTask<String>();
}
Copy after login

成对的迭代器方法不会更复杂。当异步方法等待非通用的Task时,迭代器简单的将控制权转给它。当异步方法等待task结果时,迭代器将task保存在一个变量中,转到该方法,之后再使用它的返回值。两种情况在上面的CopyToAsyncTasks()例子里都有显示。

对包含通用resultTask的SLAM,迭代器必须将控制转交给确切的类型。ToTask()将最终的task转换为那种类型以便提取其结果。经常的你的迭代器将计算来自中间task的结果数值,而且仅需要将其打包在Task中。.NET 5为此提供了一个方便的静态方法。而.NET 4没有,所以我们用TaskEx.FromResult(value)来实现它。

最后一件你需要知道的事情是如何处理中间返回的值。一个异步的方法可以从多重嵌套的块中返回;我们的迭代器简单的通过跳转到结尾来模仿它。


// C#5
public async Task<String> DoSomethingAsync() {
  while (…) {
    foreach (…) {
      return "Result";
    }
  }
}
 
// C#4; DoSomethingAsync() is necessary but omitted here.
private IEnumerable<Task> DoSomethingAsyncTasks() {
  while (…) {
    foreach (…) {
      yield return TaskEx.FromResult("Result");
      goto END;
    }
  }
END: ;
}
Copy after login

现在我们知道如何在C#4中写SLAM了,但是只有实现了FromResult()和两个 ToTask()扩展方法才能真正的做到。下面我们开始做吧。

简单的开端

我们将在类System.Threading.Tasks.TaskEx下实现3个方法, 先从简单的那2个方法开始。FromResult()方法先创建了一个TaskCompletionSource(), 然后给它的result赋值,最后返回Task。

public static Task<TResult> FromResult<TResult>(TResult resultValue) {
  var completionSource = new TaskCompletionSource<TResult>();
  completionSource.SetResult(resultValue);
  return completionSource.Task;
}
Copy after login

很显然, 这2个ToTask()方法基本相同, 唯一的区别就是是否给返回对象Task的Result属性赋值. 通常我们不会去写2段相同的代码, 所以我们会用其中的一个方法来实现另一个。 我们经常使用泛型来作为返回结果集,那样我们不用在意返回值同时也可以避免在最后进行类型转换。 接下来我们先实现那个没有用泛型的方法。

private abstract class VoidResult { }
 
public static Task ToTask(this IEnumerable<Task> tasks) {
  return ToTask<VoidResult>(tasks);
}
Copy after login

目前为止我们就剩下一个 ToTask()方法还没有实现。

第一次天真的尝试

对于我们第一次尝试实现的方法,我们将枚举每个任务的Wait()来完成,然后将最终的任务做为结果(如果合适的话)。当然,我们不想占用当前线程,我们将另一个线程来执行循环该任务。

// BAD CODE !
public static Task<TResult> ToTask<TResult>(this IEnumerable<Task> tasks)
{
  var tcs = new TaskCompletionSource<TResult>();
  Task.Factory.StartNew(() => {
    Task last = null;
    try {
      foreach (var task in tasks) {
        last = task;
        task.Wait();
      }
 
      // Set the result from the last task returned, unless no result is requested.
      tcs.SetResult(
        last == null || typeof(TResult) == typeof(VoidResult)
          &#63; default(TResult) : ((Task<TResult>) last).Result);
 
    } catch (AggregateException aggrEx) {
      // If task.Wait() threw an exception it will be wrapped in an Aggregate; unwrap it.
      if (aggrEx.InnerExceptions.Count != 1) tcs.SetException(aggrEx);
      else if (aggrEx.InnerException is OperationCanceledException) tcs.SetCanceled();
      else tcs.SetException(aggrEx.InnerException);
    } catch (OperationCanceledException cancEx) {
      tcs.SetCanceled();
    } catch (Exception ex) {
      tcs.SetException(ex);
    }
  });
  return tcs.Task;
}
Copy after login


这里有一些好东西,事实上它真的有用,只要不触及用户界面:
它准确的返回了一个TaskCompletionSource的Task,并且通过源代码设置了完成状态。

  • 它显示了我们怎么通过迭代器的最后一个任务设置task的最终Result,同时避免可能没有结果的情况。
  • 它从迭代器中捕获异常并设置Canceled或Faulted状态. 它也传播枚举的task状态 (这里是通过Wait(),该方法可能抛出一个包装了cancellation或fault的异常的集合).

但这里有些主要的问题。最严重的是:

  • 由于迭代器需要实现“异步态的”的诺言,当它从一个UI线程初始化以后,迭代器的方法将能访问UI控件。你能发现这里的foreach循环都是运行在后台;从那个时刻开始不要触摸UI!这种方法没有顾及SynchronizationContext。
  • 在UI之外我们也有麻烦。我们可能想制造大量大量的由SLAM实现的并行运行的Tasks。但是看看循环中的Wait()!当等待一个嵌套task时,可能远程需要一个很长的时间完成,我们会挂起一个线程。我们面临线程池的线程资源枯竭的情况。
  • 这种解包Aggregate异常的方法是不太自然的。我们需要捕获并传播它的完成状态而不抛出异常。
  • 有时SLAM可以立刻决定它的完成状态。那种情形下,C#5的async可以异步并且有效的操作。这里我们总是计划了一个后台task,因此失去了那种可能。

是需要想点办法的时候了!

连续循环

最大的想法是直接从迭代器中获取其所产生的第一个任务。 我们创建了一个延续,使其在完成时能够检查任务的状态并且(如果成功的话)能接收下一个任务和创建另一个延续直至其结束。(如果没有,即迭代器没有需要完成的需求。)


// 很牛逼,但是我们还没有。
public static Task<TResult> ToTask<TResult>(this IEnumerable<Task> tasks)
{
  var taskScheduler =
    SynchronizationContext.Current == null
      &#63; TaskScheduler.Default : TaskScheduler.FromCurrentSynchronizationContext();
  var tcs = new TaskCompletionSource<TResult>();
  var taskEnumerator = tasks.GetEnumerator();
  if (!taskEnumerator.MoveNext()) {
    tcs.SetResult(default(TResult));
    return tcs.Task;
  }
 
  taskEnumerator.Current.ContinueWith(
    t => ToTaskDoOneStep(taskEnumerator, taskScheduler, tcs, t),
    taskScheduler);
  return tcs.Task;
}
private static void ToTaskDoOneStep<TResult>(
  IEnumerator<Task> taskEnumerator, TaskScheduler taskScheduler,
  TaskCompletionSource<TResult> tcs, Task completedTask)
{
  var status = completedTask.Status;
  if (status == TaskStatus.Canceled) {
    tcs.SetCanceled();
 
  } else if (status == TaskStatus.Faulted) {
    tcs.SetException(completedTask.Exception);
 
  } else if (!taskEnumerator.MoveNext()) {
    // 设置最后任务返回的结果,直至无需结果为止。
    tcs.SetResult(
      typeof(TResult) == typeof(VoidResult)
        &#63; default(TResult) : ((Task<TResult>) completedTask).Result);
 
  } else {
    taskEnumerator.Current.ContinueWith(
      t => ToTaskDoOneStep(taskEnumerator, taskScheduler, tcs, t),
      taskScheduler);
  }
}
Copy after login

这里有许多值得分享的:


我们的后续部分(continuations)使用涉及SynchronizationContext的TaskScheduler,如果有的话。这使得我们的迭代器在UI线程初始化以后,立刻或者在一个继续点被调用,去访问UI控件。
进程不中断的运行,因此没有线程挂起等待!顺便说一下,在ToTaskDoOneStep()中对自身的调用不是递归调用;它是在taskEnumerator.Currenttask结束后调用的匿名函数,当前活动在调用ContinueWith()几乎立刻退出,它完全独立于后续部分。
此外,我们在继续点中验证每个嵌套task的状态,不是检查一个预测值。


然而,这儿至少有一个大问题和一些小一点的问题。

如果迭代器抛出一个未处理异常,或者抛出OperationCanceledException而取消,我们没有处理它或设置主task的状态。这是我们以前曾经做过的但在此版本丢失了。
为了修复问题1,我们不得不在两个方法中调用MoveNext()的地方引入同样的异常处理机制。即使是现在,两个方法中都有一样的后续部分建立。我们违背了“不要重复你自己”的信条。

如果异步方法被期望给出一个结果,但是迭代器没有提供就退出了会怎么样呢?或者它最后的task是错误的类型呢?第一种情形下,我们默默返回默认的结果类型;第二种情形,我们抛出一个未处理的InvalidCastException,主task永远不会到达结束状态!我们的程序将永久的挂起。

最后,如果一个嵌套的task取消或者发生错误呢?我们设置主task状态,再也不会调用迭代器。可能是在一个using块,或带有finally的try块的内部,并且有一些清理要做。我们应当遵守过程在中断的时候使它结束,而不要等垃圾收集器去做这些。我们怎么做到呢?当然通过一个后续部分!

为了解决这些问题,我们从ToTask()中移走MoveNext()调用,取而代之一个对ToTaskDoOneStep()的初始化的同步调用。然后我们将在一个提防增加合适的异常处理。

最终版本

这里是ToTask()的最终实现. 它用一个TaskCompletionSource返回主task,永远不会引起线程等待,如果有的话还会涉及SynchronizationContext,由迭代器处理异常,直接传播嵌套task的结束(而不是AggregateException),合适的时候向主task返回一个值,当期望一个结果而SLAM迭代器没有以正确的genericTask类型结束时,用一个友好的异常报错。

public static Task<TResult> ToTask<TResult>(this IEnumerable<Task> tasks) {
  var taskScheduler =
    SynchronizationContext.Current == null
      &#63; TaskScheduler.Default : TaskScheduler.FromCurrentSynchronizationContext();
  var taskEnumerator = tasks.GetEnumerator();
  var completionSource = new TaskCompletionSource<TResult>();
 
  // Clean up the enumerator when the task completes.
  completionSource.Task.ContinueWith(t => taskEnumerator.Dispose(), taskScheduler);
 
  ToTaskDoOneStep(taskEnumerator, taskScheduler, completionSource, null);
  return completionSource.Task;
}
 
private static void ToTaskDoOneStep<TResult>(
  IEnumerator<Task> taskEnumerator, TaskScheduler taskScheduler,
  TaskCompletionSource<TResult> completionSource, Task completedTask)
{
  // Check status of previous nested task (if any), and stop if Canceled or Faulted.
  TaskStatus status;
  if (completedTask == null) {
    // This is the first task from the iterator; skip status check.
  } else if ((status = completedTask.Status) == TaskStatus.Canceled) {
    completionSource.SetCanceled();
    return;
  } else if (status == TaskStatus.Faulted) {
    completionSource.SetException(completedTask.Exception);
    return;
  }
 
  // Find the next Task in the iterator; handle cancellation and other exceptions.
  Boolean haveMore;
  try {
    haveMore = taskEnumerator.MoveNext();
 
  } catch (OperationCanceledException cancExc) {
    completionSource.SetCanceled();
    return;
  } catch (Exception exc) {
    completionSource.SetException(exc);
    return;
  }
 
  if (!haveMore) {
    // No more tasks; set the result (if any) from the last completed task (if any).
    // We know it's not Canceled or Faulted because we checked at the start of this method.
    if (typeof(TResult) == typeof(VoidResult)) {    // No result
      completionSource.SetResult(default(TResult));
 
    } else if (!(completedTask is Task<TResult>)) {   // Wrong result
      completionSource.SetException(new InvalidOperationException(
        "Asynchronous iterator " + taskEnumerator +
          " requires a final result task of type " + typeof(Task<TResult>).FullName +
          (completedTask == null &#63; ", but none was provided." :
            "; the actual task type was " + completedTask.GetType().FullName)));
 
    } else {
      completionSource.SetResult(((Task<TResult>) completedTask).Result);
    }
 
  } else {
    // When the nested task completes, continue by performing this function again.
    taskEnumerator.Current.ContinueWith(
      nextTask => ToTaskDoOneStep(taskEnumerator, taskScheduler, completionSource, nextTask),
      taskScheduler);
  }
}
Copy after login

瞧! 现在你会在Visual Studio 2010中用没有async和await的 C#4 (或 VB10)写SLAMs(看起来同步的方法,但异步执行)。

有趣的地方

直到最后那个版本,我一直在给ToTask()传递一个CancellationTokenUp,并且将它传播进后续部分的ToTaskDoOneStep()。(这与本文毫不相关,所以我去掉了它们。你可以在样例代码中看注释掉的痕迹。)这有两个原因。第一,处理OperationCanceledException时,我会检查它的CancellationToken以确认它与这个操作是匹配的。如果不是,它将用一个错误来代替取消动作。虽然技术上没错,但不幸的是取消令牌可能会混淆,在其传递给ToTask()调用和后续部分之间的无关信息使它不值得。(如果你们这些 Task专家能给我一个注释里的可确认发生的好的用例,我会重新考虑)

第二个原因是我会检查令牌是否取消,在每次MoveNext()调用迭代器之前,立即取消主task时,和退出进程的时候。这使你可以不经过迭代器检查令牌,具有取消的行为。我不认为这是要做的正确事情(因为对一个异步进程在yield return处取消是不合适的)——更可能是它完全在迭代器进程控制之下——但我想试试。它无法工作。我发现在某些情形,task会取消而却后续部分不会触发。请看样例代码;我靠继续执行来恢复按钮可用,但它没有发生因此按钮在进程结束之后仍不可用。我在样例代码中留下了注释掉的取消检测;你可以将取消令牌的方法参数放回去并测试它。(如果你们Task专家能解释为什么会是这种情形,我将很感激!)

Related labels:
source:php.cn
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template