.NET并发编程-TPL Dataflow并行工作流
阅读原文时间:2021年05月09日阅读:1

本系列学习在.NET中的并发并行编程模式,实战技巧

本小节了解TPL Dataflow并行工作流,在工作中如何利用现成的类库处理数据。旨在通过TDF实现数据流的并行处理。

TDF Block

数据流由一个一个的块组成,一个块处理完毕后链接到下一个块上。每一个块以消息的形式接收和缓存来自一个或多个源的数据,当接收到信息时,块通过将其行为应用于输入来作出反应,块的输出将传递到下一个块中。

TDF并不是作为.NET4.5框架的一部分分发,需要单独安装,用过nuget导入Microsoft.Tpl.Dataflow。4.5之上在System.Threading.Tasks.Dataflow类库中。TDF提供了一组丰富的组件(块),用于基于进程内消息传递语义来组合数据流和管道基础设施。

TDF最常用的块是标准的BufferBlock、ActionBlock和TransformBlock。它们每个都基于一个委托,该委托可以是匿名函数的形式,用于定义要计算的工作。

BufferBlock

BufferBlock是一个很好的工具,用于启用和实现异步生产者/消费者模式,其中内部的消息队列可以由多个源写入或从多个目标读取。保证先进先出的顺序。

以下展示基于TDF BufferBlock的生产者消费者模式

BufferBlock<int>&nbsp;buffer&nbsp;=&nbsp;new&nbsp;BufferBlock<int>();&nbsp;async&nbsp;Task&nbsp;Producer(IEnumerable<int>&nbsp;values){&nbsp;&nbsp;&nbsp;&nbsp;foreach&nbsp;(var&nbsp;value&nbsp;in&nbsp;values)&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;await&nbsp;buffer.SendAsync(value);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;buffer.Complete();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}async&nbsp;Task&nbsp;Consumer(Action<int>&nbsp;process){&nbsp;&nbsp;&nbsp;&nbsp;while&nbsp;(await&nbsp;buffer.OutputAvailableAsync())&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;process(await&nbsp;buffer.ReceiveAsync());&nbsp;&nbsp;&nbsp;}public&nbsp;async&nbsp;Task&nbsp;Run(){&nbsp;&nbsp;&nbsp;&nbsp;IEnumerable<int>&nbsp;range&nbsp;=&nbsp;Enumerable.Range(0,&nbsp;100);&nbsp;&nbsp;&nbsp;&nbsp;await&nbsp;Task.WhenAll(Producer(range),&nbsp;Consumer(n&nbsp;=>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Console.WriteLine($"value&nbsp;{n}")));}

IEnumerable值的条目通过buffer.Post方法发送到BufferBlock缓冲区,并使用buffer.ReceiveAsync方法异步检索它们。OutputAvailableAsync方法用于当下一个条目准备好可被检索时发出通知。

TransformBlock

用于映射转换,该转换函数以委托Func的形式作为参数传递

给定一组地址下载图片为例

var&nbsp;fetchImageFlag&nbsp;=&nbsp;new&nbsp;TransformBlock<string,&nbsp;(string,&nbsp;byte[])>(&nbsp;&nbsp;&nbsp;&nbsp;async&nbsp;urlImage&nbsp;=>&nbsp;&nbsp;&nbsp;&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;using&nbsp;(var&nbsp;webClient&nbsp;=&nbsp;new&nbsp;WebClient())&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;byte[]&nbsp;data&nbsp;=&nbsp;await&nbsp;webClient.DownloadDataTaskAsync(urlImage);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return&nbsp;(urlImage,&nbsp;data);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;});List<string>&nbsp;urlFlags&nbsp;=&nbsp;new&nbsp;List<string>{&nbsp;&nbsp;&nbsp;&nbsp;"Italy#/media/File:Flag_of_Italy.svg",&nbsp;&nbsp;&nbsp;&nbsp;"Spain#/media/File:Flag_of_Spain.svg",&nbsp;&nbsp;&nbsp;&nbsp;"United_States#/media/File:Flag_of_the_United_States.svg"&nbsp;&nbsp;&nbsp;&nbsp;};foreach&nbsp;(var&nbsp;urlFlag&nbsp;in&nbsp;urlFlags)&nbsp;&nbsp;&nbsp;&nbsp;fetchImageFlag.Post($"https://en.wikipedia.org/wiki/{urlFlag}");

TransformBlock<string, (string, byte[]) 块以元组字符串和字节数组格式来提取标记图像。转换得到字节数组对象后,此处还没有消费使用。下面通过另一个块组合将其保存到本地。

ActionBlock

通过名称就可以看出,该块用于接收数据时调用一个委托去处理。因为它没有输出,所以通常用于工作流的结束节点上。

前面通过转换块将图片地址下载转换成了字节数组,下面通过ActionBlock将其持久化本地。

var&nbsp;saveData&nbsp;=&nbsp;new&nbsp;ActionBlock<(string,&nbsp;byte[])>(async&nbsp;data&nbsp;=>{&nbsp;&nbsp;&nbsp;&nbsp;(string&nbsp;urlImage,&nbsp;byte[]&nbsp;image)&nbsp;=&nbsp;data;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;string&nbsp;filePath&nbsp;=&nbsp;urlImage.Substring(urlImage.IndexOf("File:")&nbsp;+&nbsp;5);&nbsp;&nbsp;&nbsp;&nbsp;await&nbsp;Agents.File.WriteAllBytesAsync(filePath,&nbsp;image);&nbsp;});fetchImageFlag.LinkTo(saveData);&nbsp;&nbsp;&nbsp;&nbsp;

ActionBlock块实例化传递给构造函数的参数可以是委托Action或Func。后者对每个消息输入异步执行内部操作。最后ActionBlock块saveData使用LinkTo扩展方法连接到前面的TransformBlock块上。通过这种方式,TransformBlock生成的输出会在可用时被立即推送到ActionBlock中。

最后粘贴一下File的扩展方法,用于异步读写文件。

public&nbsp;static&nbsp;class&nbsp;File{&nbsp;&nbsp;&nbsp;&nbsp;public&nbsp;static&nbsp;async&nbsp;Task<string[]>&nbsp;ReadAllLinesAsync(string&nbsp;path)&nbsp;&nbsp;&nbsp;&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;using&nbsp;(var&nbsp;sourceStream&nbsp;=&nbsp;new&nbsp;FileStream(path,&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;FileMode.Open,&nbsp;FileAccess.Read,&nbsp;FileShare.None,&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;bufferSize:&nbsp;4096,&nbsp;useAsync:&nbsp;true))&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;using&nbsp;(var&nbsp;reader&nbsp;=&nbsp;new&nbsp;StreamReader(sourceStream))&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;var&nbsp;fileText&nbsp;=&nbsp;await&nbsp;reader.ReadToEndAsync();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return&nbsp;fileText.Split(new[]&nbsp;{&nbsp;Environment.NewLine&nbsp;},&nbsp;StringSplitOptions.None);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;public&nbsp;static&nbsp;async&nbsp;Task&nbsp;WriteAllTextAsync(string&nbsp;path,&nbsp;string&nbsp;contents)&nbsp;&nbsp;&nbsp;&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;byte[]&nbsp;encodedText&nbsp;=&nbsp;Encoding.Unicode.GetBytes(contents);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;await&nbsp;WriteAllBytesAsync(path,&nbsp;encodedText);&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;public&nbsp;static&nbsp;async&nbsp;Task&nbsp;WriteAllBytesAsync(string&nbsp;path,&nbsp;byte[]&nbsp;bytes)&nbsp;&nbsp;&nbsp;&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;using&nbsp;(var&nbsp;sourceStream&nbsp;=&nbsp;new&nbsp;FileStream(path,&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;FileMode.Append,&nbsp;FileAccess.Write,&nbsp;FileShare.None,&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;bufferSize:&nbsp;4096,&nbsp;useAsync:&nbsp;true))&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;await&nbsp;sourceStream.WriteAsync(bytes,&nbsp;0,&nbsp;bytes.Length);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;};&nbsp;&nbsp;&nbsp;&nbsp;}}

ending

第一次做人,何不痛痛快快,潇潇洒洒,讨好自己

工作认认真真的完成,生活充充实实的过着