本系列学习在.NET中的并发并行编程模式,实战技巧
本小节了解TPL Dataflow并行工作流,在工作中如何利用现成的类库处理数据。旨在通过TDF实现数据流的并行处理。
数据流由一个一个的块组成,一个块处理完毕后链接到下一个块上。每一个块以消息的形式接收和缓存来自一个或多个源的数据,当接收到信息时,块通过将其行为应用于输入来作出反应,块的输出将传递到下一个块中。
TDF并不是作为.NET4.5框架的一部分分发,需要单独安装,用过nuget导入Microsoft.Tpl.Dataflow。4.5之上在System.Threading.Tasks.Dataflow类库中。TDF提供了一组丰富的组件(块),用于基于进程内消息传递语义来组合数据流和管道基础设施。
TDF最常用的块是标准的BufferBlock、ActionBlock和TransformBlock。它们每个都基于一个委托,该委托可以是匿名函数的形式,用于定义要计算的工作。
BufferBlock是一个很好的工具,用于启用和实现异步生产者/消费者模式,其中内部的消息队列可以由多个源写入或从多个目标读取。保证先进先出的顺序。
以下展示基于TDF BufferBlock的生产者消费者模式
BufferBlock<int> buffer = new BufferBlock<int>(); async Task Producer(IEnumerable<int> values){ foreach (var value in values) await buffer.SendAsync(value); buffer.Complete(); }async Task Consumer(Action<int> process){ while (await buffer.OutputAvailableAsync()) process(await buffer.ReceiveAsync()); }public async Task Run(){ IEnumerable<int> range = Enumerable.Range(0, 100); await Task.WhenAll(Producer(range), Consumer(n => Console.WriteLine($"value {n}")));}
IEnumerable值的条目通过buffer.Post方法发送到BufferBlock缓冲区,并使用buffer.ReceiveAsync方法异步检索它们。OutputAvailableAsync方法用于当下一个条目准备好可被检索时发出通知。
用于映射转换,该转换函数以委托Func
给定一组地址下载图片为例
var fetchImageFlag = new TransformBlock<string, (string, byte[])>( async urlImage => { using (var webClient = new WebClient()) { byte[] data = await webClient.DownloadDataTaskAsync(urlImage); return (urlImage, data); } });List<string> urlFlags = new List<string>{ "Italy#/media/File:Flag_of_Italy.svg", "Spain#/media/File:Flag_of_Spain.svg", "United_States#/media/File:Flag_of_the_United_States.svg" };foreach (var urlFlag in urlFlags) fetchImageFlag.Post($"https://en.wikipedia.org/wiki/{urlFlag}");
TransformBlock<string, (string, byte[]) 块以元组字符串和字节数组格式来提取标记图像。转换得到字节数组对象后,此处还没有消费使用。下面通过另一个块组合将其保存到本地。
通过名称就可以看出,该块用于接收数据时调用一个委托去处理。因为它没有输出,所以通常用于工作流的结束节点上。
前面通过转换块将图片地址下载转换成了字节数组,下面通过ActionBlock将其持久化本地。
var saveData = new ActionBlock<(string, byte[])>(async data =>{ (string urlImage, byte[] image) = data; string filePath = urlImage.Substring(urlImage.IndexOf("File:") + 5); await Agents.File.WriteAllBytesAsync(filePath, image); });fetchImageFlag.LinkTo(saveData);
ActionBlock块实例化传递给构造函数的参数可以是委托Action或Func
最后粘贴一下File的扩展方法,用于异步读写文件。
public static class File{ public static async Task<string[]> ReadAllLinesAsync(string path) { using (var sourceStream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.None, bufferSize: 4096, useAsync: true)) using (var reader = new StreamReader(sourceStream)) { var fileText = await reader.ReadToEndAsync(); return fileText.Split(new[] { Environment.NewLine }, StringSplitOptions.None); } } public static async Task WriteAllTextAsync(string path, string contents) { byte[] encodedText = Encoding.Unicode.GetBytes(contents); await WriteAllBytesAsync(path, encodedText); } public static async Task WriteAllBytesAsync(string path, byte[] bytes) { using (var sourceStream = new FileStream(path, FileMode.Append, FileAccess.Write, FileShare.None, bufferSize: 4096, useAsync: true)) { await sourceStream.WriteAsync(bytes, 0, bytes.Length); }; }}
第一次做人,何不痛痛快快,潇潇洒洒,讨好自己
工作认认真真的完成,生活充充实实的过着
手机扫一扫
移动阅读更方便
你可能感兴趣的文章