并行化LINQ查询
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
using System.Diagnostics;
namespace 并行化LINQ查询
{
internal class Program
{
static void Main(string[] args)
{
var sw = new Stopwatch();
sw.Start();
var query = from t inGetTypes() selectEmulateProcessing(t);
foreach(string typeName in query)
{
PrintInfo(typeName);
}
sw.Stop();
Console.WriteLine("---");
Console.WriteLine("Sequential LINQ query.");
Console.WriteLine("Time elapsed:{0}", sw.Elapsed);
Console.WriteLine("Press ENTER to continue...");
Console.WriteLine();
Console.Clear();
sw.Restart();
sw.Start();
var parallelQuery = from t in ParallelEnumerable.AsParallel(GetTypes())
selectEmulateProcessing(t);
foreach(string typeName in parallelQuery)
{
PrintInfo(typeName);
}
sw.Stop();
Console.WriteLine("---");
Console.WriteLine("Parallel LINQ query.The results are being merged on a single thread");
Console.WriteLine("Time elapsed: {0}", sw.Elapsed);
Console.ReadLine();
Console.Clear();
sw.Reset();
sw.Start();
parallelQuery = from t inGetTypes().AsParallel() selectEmulateProcessing(t);
parallelQuery.ForAll(PrintInfo);
sw.Stop();
Console.WriteLine("---");
Console.WriteLine("Paraller LINQ query. The results are being processed in parallel");
Console.WriteLine("Time elapsed:{0}", sw.Elapsed);
Console.WriteLine("Press ENTER to conintue...");
Console.ReadLine();
Console.Clear();
sw.Reset();
sw.Start();
query=from t inGetTypes().AsParallel().AsSequential()
selectEmulateProcessing(t);
foreach(var typeName in query)
{
PrintInfo(typeName);
}
sw.Stop();
Console.WriteLine("---");
Console.WriteLine("Parallel LINQ query,transformed into sequential.");
Console.WriteLine("Time elapsed:{0}", sw.Elapsed);
Console.WriteLine("Press ENTER to coninue...");
Console.ReadLine();
Console.Clear();
}
static void PrintInfo(string typeName)
{
Thread.Sleep(TimeSpan.FromMilliseconds(150));
Console.WriteLine("{0} type was printed on a thread id {1}",
typeName,Thread.CurrentThread.ManagedThreadId );
}
static string EmulateProcessing(string typeName)
{
Thread.Sleep(TimeSpan.FromMilliseconds(150));
Console.WriteLine("{0} type was processed on a thread id {1}",
typeName, Thread.CurrentThread.ManagedThreadId);
return typeName;
}
static IEnumerable<string>GetTypes()
{
return from assembly in
AppDomain.CurrentDomain.GetAssemblies()
from type in assembly.GetExportedTypes()
where type.Name.StartsWith("Web")
select type.Name;
}
}
}
运行结果
WebSocket type was processed on a thread id 11
WebResponse type was processed on a thread id 3
WebRequestMethods type was processed on a thread id 4
WebHeaderCollection type was processed on a thread id 8
WebException type was processed on a thread id 9
WebSocketException type was processed on a thread id 18
WebSocketCloseStatus type was processed on a thread id 5
WebPermissionAttribute type was processed on a thread id 10
WebSocketError type was processed on a thread id 16
WebPermission type was processed on a thread id 12
WebRequest type was processed on a thread id 13
WebClient type was processed on a thread id 6
WebExceptionStatus type was processed on a thread id 17
WebSocketContext type was processed on a thread id 14
WebUtility type was processed on a thread id 15
WebProxy type was processed on a thread id 7
WebRequestModuleElementCollection type was processed on a thread id 18
WebSocketMessageType type was processed on a thread id 11
WebSocketReceiveResult type was processed on a thread id 3
WebSocketState type was processed on a thread id 4
WebProxyScriptElement type was processed on a thread id 8
WebUtilityElement type was processed on a thread id 10
WebRequestModulesSection type was processed on a thread id 5
WebSocketContext type was printed on a thread id 1
WebRequestModuleElement type was processed on a thread id 9
WebRequest type was printed on a thread id 1
WebExceptionStatus type was printed on a thread id 1
WebResponse type was printed on a thread id 1
WebUtility type was printed on a thread id 1
WebSocketError type was printed on a thread id 1
WebSocketException type was printed on a thread id 1
WebRequestMethods type was printed on a thread id 1
WebSocketCloseStatus type was printed on a thread id 1
WebClient type was printed on a thread id 1
WebException type was printed on a thread id 1
WebProxy type was printed on a thread id 1
WebPermission type was printed on a thread id 1
WebPermissionAttribute type was printed on a thread id 1
WebSocket type was printed on a thread id 1
WebHeaderCollection type was printed on a thread id 1
WebSocketReceiveResult type was printed on a thread id 1
WebRequestModuleElementCollection type was printed on a thread id 1
WebSocketState type was printed on a thread id 1
WebRequestModulesSection type was printed on a thread id 1
WebRequestModuleElement type was printed on a thread id 1
WebUtilityElement type was printed on a thread id 1
WebSocketMessageType type was printed on a thread id 1
WebProxyScriptElement type was printed on a thread id 1
---
Parallel LINQ query.The results are being merged on a single thread
Time elapsed: 00:00:03.9786911
工作原理
当程序运行时,我们创建了一个LINQ查询,其使用反射API来查询加载到当前应用程 序域中的所有组件中名称以"Web”开头的类型。我们使用EmuIateProcessing方法模拟处理 每个项时间的延迟,并使用Printlnfo方法打印结果。我们也使用了stopwatch类来测量每个 查询的执行时间。
首先我们运行了一个通常的顺序LINQ查询。此时并没有并行化,所有任何操作都运 行在当前线程。该查询的第二版显式地使用了ParallelEnumerable类。ParallelEnumerable包 含了PLINQ的逻辑实现,并且作为IEnumerable集合功能的一组扩展方法。通常无需显式 地使用该类,在这里是为了演示PLINQ的实际工作方式。第二个版本以并行的方式运行 EmulateProcessing操作然而,默认情况下结果会被合并到单个线程中,所以查询的执行时 间应该比第一个版本少几秒。
第三个版本展示了如何使用AsParaIIel方法来将LINQ查询按声明的方式并行化运行 这里我们并不关心实现细节,只是为了说明我们想以并行的方式运行。然而,该版本的关键 不同处是我们使用了ForAII方法来打印查询结果。打印结果操作与任务被处理的线程是同一 个线程,跳过了结果合并步骤。它允许我们也能以并行的方式运行PrintInfo。方法,甚至该版 本运行速度比之前的版本更快。
最后一个例子展示了如何使用AsSequential方法将PLINQ查以顺序方式运行可以 看到该查询运行方式与第一个示例完全一样。
加整PLINQ查询的参数
本节展示了使用PLINQ查询时如何管理并行处理选项,以及查询执行时这些选项的效果。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Linq;
using System.Threading;
namespace 加整PLINQ查询的参数
{
internal class Program
{
static void Main(string[] args)
{
var paraellQuery = from t inGetTypes().AsParallel()
selectEmulateProcessing(t);
var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(3));
try
{
paraellQuery.WithDegreeOfParallelism
(Environment.ProcessorCount).WithExecutionMode
(ParallelExecutionMode.ForceParallelism)
.WithMergeOptions(ParallelMergeOptions.Default)
.WithCancellation(cts.Token).ForAll(Console.WriteLine);
}
catch (OperationCanceledException)
{
Console.WriteLine("----");
Console.WriteLine("Operations has been canceled!");
}
Console.WriteLine("---");
Console.WriteLine("Unordered PLINQ query execution");
var unorderedQuery = from i in ParallelEnumerable.Range(1, 30) select i;
foreach(var i in unorderedQuery)
{
Console.WriteLine(i);
}
Console.WriteLine("---");
Console.WriteLine("Ordered PLINQ query execution");
var orderedQuery = from i in ParallelEnumerable.Range(1, 30).AsOrdered() select i;
foreach(var i in orderedQuery)
{
Console.WriteLine(i);
}
}
static string EmulateProcessing(string typeName)
{
Thread.Sleep(TimeSpan.FromMilliseconds(
new Random(DateTime.Now.Millisecond).Next(250, 350)));
Console.WriteLine("{0} type was processed on a thread id {1}", typeName, Thread.CurrentThread.ManagedThreadId);
return typeName;
}
static IEnumerable<string>GetTypes()
{
return from assembly in
AppDomain.CurrentDomain.GetAssemblies()
from type in assembly.GetExportedTypes()
where type.Name.StartsWith("Web")
orderby type.Name.Length
select type.Name;
}
}
}
运行结果
WebResponse type was processed on a thread id 7
WebResponse
WebUtility type was processed on a thread id 6
WebSocketException type was processed on a thread id 18
WebUtilityElement type was processed on a thread id 16
WebException type was processed on a thread id 8
WebSocketState type was processed on a thread id 13
WebClient type was processed on a thread id 1
WebSocketError type was processed on a thread id 12
WebRequestMethods type was processed on a thread id 15
WebPermission type was processed on a thread id 11
WebProxy type was processed on a thread id 10
WebSocket type was processed on a thread id 4
WebExceptionStatus type was processed on a thread id 17
WebHeaderCollection type was processed on a thread id 19
WebSocketContext type was processed on a thread id 14
WebSocketContext
WebUtilityElement
WebProxy
WebExceptionStatus
WebSocketException
WebSocketState
WebClient
WebSocketError
WebPermission
WebRequestMethods
WebRequest type was processed on a thread id 5
WebRequest
WebUtility
WebSocket
WebException
WebHeaderCollection
WebRequestModulesSection type was processed on a thread id 13
WebRequestModulesSection
WebSocketReceiveResult type was processed on a thread id 17
WebSocketReceiveResult
WebSocketCloseStatus type was processed on a thread id 7
WebSocketCloseStatus
WebRequestModuleElementCollection type was processed on a thread id 1
WebRequestModuleElementCollection
WebRequestModuleElement type was processed on a thread id 18
WebRequestModuleElement
WebProxyScriptElement type was processed on a thread id 16
WebProxyScriptElement
WebSocketMessageType type was processed on a thread id 14
WebSocketMessageType
WebPermissionAttribute type was processed on a thread id 10
WebPermissionAttribute
---
Unordered PLINQ query execution
1
3
5
7
9
11
13
15
17
19
21
23
25
27
29
30
2
4
6
8
10
12
14
16
18
20
22
24
26
28
---
Ordered PLINQ query execution
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
工作原理
该程序演示了多个可供程序员使用的非常有用的PLINQ选项。我们先创建了一个 PLINQ查询,然后又创建了一个调整了PLINQ选项的查询。
先从取消选项开始。接受一个取消标志对象的WithCancellation方法可用于取消查询。 本例中我们三秒后发出取消标志信号,这导致查询中抛出OperauonCanceledException异常, 并且取消了剩余的工作。
然后可以为查询指定并行度这是被用于执行查询时实际并行分割数。在第一节中我们 使用了ParaIIeI.ForEach循环,其拥有最大并行度选项。该选项与本节中的不一样,这是因为 它指定了一个最大的分割值,但如果基础设施决定最好使用较少的并行度以节省资源和提高 性能,那么并行度会小于最大值。
另一个有意思的选项是使用WithExecutionMode方法来重载查执行的模式。PLINQ基 础设施如果认为并行化某查询只会增加工作量并且运行更慢,那么将会以顺序模式执行该查 询。但我们可以强制该查询以并行的方式运行。
可以使用WithMergeOptions方法调整对查询结果的处理。默认模式是PLINQ基础设施 在查询返回结果之前会缓存一定数量的结果如果查询花费了大量的时间,更合理的方式是 关闭结果缓存从而尽可能快地得到结果。
最后一个选项是AsOrdered方法。当使用并行执行时,集合中的项有可能不是被顺序 处理的。集合中稍后的项可能会比稍前的项先处理为了防止该情况,我们可以显式的对 并行查询调用AsOrdered方法,来告诉PLINQ基础设施我们打算按项在集合中的顺序来进行 处理。

