简介
之前我们已经学习过了,有好几种方式在.NET和C#中创建异步程序。其中一个是基 于事件的异步模式,在之前的章节中已经提及过。引人事件的初始目的是简化观察者设计模 式的实现该模式常用于实现对象间的通讯。
当我们讨论任务并行库时,我们注意到事件的主要缺点是它们不能有效地相互结合。另一 个缺点是基于事件的异步模式不能处理通知的顺序。想象一下我们有IEnurnerable
相反的方式是基于推送(push-based)的方式,即生产者通知客户端有新值要处理。这将 把工作推给生产者,而客户端在等待另一个值的时候可以做些其他事情。因此,目标是实现 类似于的异步版本的一个机制,可以生产一组序列值并按顺序通知消费者处理 这些值,直到序列处理完成或抛出异常。
.NET Framework从生4.0版本开始包含了接口IObservabIe
最迷人的事情是可观察的集合与LINQ是兼容的。因此我们可以使用声明式查询以异步 的方式来转换和组合这些集合。也可以使用扩展方法给Rx程序添加功能,从而具备通常的 LINQ查询的同样的功能。Rx程序也支持从所有异步编程模式(包括异步编程模型基于事 件的异步模式,以及任务并行库)转换到可观察的集合,并且支持以Rx内置方式运行异步 操作,该方式与TPL很相似。
Reacuve Extensions库是一个非常强大和复杂的工具,值得单独写本书。在本章中我 将展示最常用的场景,即如何高效地与异步事件序列工作。我们将了解ReactweExtensions 框架中的关键类型,学习以不同的方式创建和维护序列。最后,将展示如何使用Reactive Extensions来运行异步操作并管理操作选项。
将普通集合转换为异步的可观察集合
本节将展示如何使用EnumerabIe类创建可观察的集合,并且异步处理该集合。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
namespace 将普通集合转换为异步的可观察集合
{
internal class Program
{
static void Main(string[] args)
{
foreach(int i inEnumerableEventSequence())
{
Console.Write(i);
}
Console.WriteLine();
Console.WriteLine("IEnumerable");
IObservable<int> o = EnumerableEventSequence().ToObservable();
using(IDisposable subscription=o.Subscribe(Console.Write))
{
Console.WriteLine();
Console.WriteLine("IObservable");
}
o = EnumerableEventSequence().ToObservable().SubscribeOn(TaskPoolScheduler.Default);
using(IDisposable subscription=o.Subscribe(Console.Write))
{
Console.WriteLine();
Console.WriteLine("IObservable async");
Console.ReadLine();
}
}
static IEnumerable<int>EnumerableEventSequence()
{
for(int i=0;i<10;i++)
{
Thread.Sleep(TimeSpan.FromSeconds(0.5));
yield return i;
}
}
}
}
运行结果
0123456789
IEnumerable
0123456789
IObservable
IObservable async
0123456789
工作原理
我们使用EnumerableEventSequence方法模拟了一个效率不高的可枚举的集合。然后使 用常用的foreach循环来迭代它,可以看到这一如既往的慢。等待直到所有迭代完成。
然后借助于Reactive Extensions库中的ToObservable扩展方法把可枚举的集合转换为可 观察的集合。接下来订阅该可观察集合的更新,提供console.write方法作为操作,其将在每 次更新该集合时被执行。结果我们得到了与上一个例子完全一样的行为。我们会等待所有迭 代完成,因为我们使用了主线程来订阅这些更新。
为了异步化该程序.我们使用SubscribeOn方法并提供其TPL任务池调度程序。该调度 程序将把订阅信息放置到TPL任务池中,卸除主线程的任务这可以使UI在集合更新时仍 保持响应并做些其他事情。为了检查该行为,你可以从代码中除移最后的Console.ReadLine调用。这样主线程会立即完成,从而强制所有的后台线程(包括TPL任务池工作线程)也一 起结束,我们将不会从异步集合中得到任何输出。
如果使用了任何UI框架,就只能从UI线程与U[控制器进行交互。为了实现该点,我 们应该对相应的调度程序使用ObserveOn方法。对于WindowsPresentationFoundation,我 们拥有定义在名为Rx-XAML(或ReacuveExtensionsXAML支持库)的单独的NuGet包中 的类和ObserveOnDtspatcher扩展方法。其他的平台也有相应的单独的 NuGet包。

