大数跨境
0
0

C# 使用ReactiveExtensions(简介和将普通集合转换为异步的可观察集合)

C# 使用ReactiveExtensions(简介和将普通集合转换为异步的可观察集合) 知识代码AI
2025-10-29
5
导读:简介之前我们已经学习过了,有好几种方式在.NET和C#中创建异步程序。其中一个是基 于事件的异步模式,在之前的章节中已经提及过。

简介

之前我们已经学习过了,有好几种方式在.NET和C#中创建异步程序。其中一个是基 于事件的异步模式,在之前的章节中已经提及过。引人事件的初始目的是简化观察者设计模 式的实现该模式常用于实现对象间的通讯。

当我们讨论任务并行库时,我们注意到事件的主要缺点是它们不能有效地相互结合。另一 个缺点是基于事件的异步模式不能处理通知的顺序。想象一下我们有IEnurnerable 提供 给我们字符串值。然而当迭代它时,不知道每个迭代会花费多长时间。如果使用常规的foreach 或其他的同步迭代构造方式,我们将阻塞线程直到得到下一个值,这可能会导致整个处理非常 慢。这种作为客户端从生产者那里拉取值的场景被称为 基于拉取 (pull-based)的方式。

相反的方式是基于推送(push-based)的方式,即生产者通知客户端有新值要处理。这将 把工作推给生产者,而客户端在等待另一个值的时候可以做些其他事情。因此,目标是实现 类似于的异步版本的一个机制,可以生产一组序列值并按顺序通知消费者处理 这些值,直到序列处理完成或抛出异常。

.NET Framework从生4.0版本开始包含了接口IObservabIe 和IObserver 的定义,它们一起代表了异步的基于推送的集合及其客户端。它们都来自叫做Reacuve Extensions(简称Rx)的库,其由微软创建,用于使用可观察的集合来有效地构造事件序列, 以及实际上任何其他类型的异步程序。这些接口包括在.Net Framework中,但这些接口的实 现类以及所有其他机制仍单独的分布在库中。

最迷人的事情是可观察的集合与LINQ是兼容的。因此我们可以使用声明式查询以异步 的方式来转换和组合这些集合。也可以使用扩展方法给Rx程序添加功能,从而具备通常的 LINQ查询的同样的功能。Rx程序也支持从所有异步编程模式(包括异步编程模型基于事 件的异步模式,以及任务并行库)转换到可观察的集合,并且支持以Rx内置方式运行异步 操作,该方式与TPL很相似。

Reacuve Extensions库是一个非常强大和复杂的工具,值得单独写本书。在本章中我 将展示最常用的场景,即如何高效地与异步事件序列工作。我们将了解ReactweExtensions 框架中的关键类型,学习以不同的方式创建和维护序列。最后,将展示如何使用Reactive Extensions来运行异步操作并管理操作选项。

将普通集合转换为异步的可观察集合

本节将展示如何使用EnumerabIe类创建可观察的集合,并且异步处理该集合。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

namespace 将普通集合转换为异步的可观察集合
{
    internal class Program
    {
        static void Main(string[] args)
        {
            foreach(int i inEnumerableEventSequence())
            {
                Console.Write(i);
            }

            Console.WriteLine();
            Console.WriteLine("IEnumerable");

            IObservable<int> o = EnumerableEventSequence().ToObservable();
            using(IDisposable subscription=o.Subscribe(Console.Write))
            {
                Console.WriteLine();
                Console.WriteLine("IObservable");
            }

            o = EnumerableEventSequence().ToObservable().SubscribeOn(TaskPoolScheduler.Default);
            using(IDisposable subscription=o.Subscribe(Console.Write))
            {
                Console.WriteLine();
                Console.WriteLine("IObservable async");
                Console.ReadLine();
            }
        }

        static IEnumerable<int>EnumerableEventSequence()
        {
            for(int i=0;i<10;i++)
            {
                Thread.Sleep(TimeSpan.FromSeconds(0.5));
                yield return i; 
            }
        }


    }
}

运行结果

0123456789
IEnumerable
0123456789
IObservable

IObservable async
0123456789

工作原理

我们使用EnumerableEventSequence方法模拟了一个效率不高的可枚举的集合。然后使 用常用的foreach循环来迭代它,可以看到这一如既往的慢。等待直到所有迭代完成。

然后借助于Reactive Extensions库中的ToObservable扩展方法把可枚举的集合转换为可 观察的集合。接下来订阅该可观察集合的更新,提供console.write方法作为操作,其将在每 次更新该集合时被执行。结果我们得到了与上一个例子完全一样的行为。我们会等待所有迭 代完成,因为我们使用了主线程来订阅这些更新。

为了异步化该程序.我们使用SubscribeOn方法并提供其TPL任务池调度程序。该调度 程序将把订阅信息放置到TPL任务池中,卸除主线程的任务这可以使UI在集合更新时仍 保持响应并做些其他事情。为了检查该行为,你可以从代码中除移最后的Console.ReadLine调用。这样主线程会立即完成,从而强制所有的后台线程(包括TPL任务池工作线程)也一 起结束,我们将不会从异步集合中得到任何输出。

如果使用了任何UI框架,就只能从UI线程与U[控制器进行交互。为了实现该点,我 们应该对相应的调度程序使用ObserveOn方法。对于WindowsPresentationFoundation,我 们拥有定义在名为Rx-XAML(或ReacuveExtensionsXAML支持库)的单独的NuGet包中 的类和ObserveOnDtspatcher扩展方法。其他的平台也有相应的单独的 NuGet包。


【声明】内容源于网络
0
0
知识代码AI
技术基底 机器视觉全栈 × 光学成像 × 图像处理算法 编程栈 C++/C#工业开发 | Python智能建模 工具链 Halcon/VisionPro工业部署 | PyTorch/TensorFlow模型炼金术 | 模型压缩&嵌入式移植
内容 366
粉丝 0
知识代码AI 技术基底 机器视觉全栈 × 光学成像 × 图像处理算法 编程栈 C++/C#工业开发 | Python智能建模 工具链 Halcon/VisionPro工业部署 | PyTorch/TensorFlow模型炼金术 | 模型压缩&嵌入式移植
总阅读211
粉丝0
内容366