使用c#的 async/await编写 长时间运行的基于代码的工作流的 持久任务框架
阅读原文时间:2023年07月08日阅读:2

持久任务框架 (DTF) 是基于async/await 工作流执行框架。工作流的解决方案很多,包括Windows Workflow Foundation,BizTalk,Logic Apps, Workflow-CoreElsa-Core。最近我在Dapr 的仓库里跟踪工作流构建块的进展,DTFx 正好是.NET开发的,所以对他多了几分关注,以前没有深入进去看看,现在我觉得是值得推荐给大家的一个工作流方案,它足够轻量级,而且非常简单,依赖很少。

持久任务框架是一个开源框架,它为 .NET 平台中的工作流即代码提供了基础。GitHub上:https://github.com/Azure/durabletask

它有两个主要组件:业务流程和任务。业务流程“编排”应用程序逻辑,以内联方式执行自定义代码并调用任务。自定义业务流程派生自 TaskOrchestration自定义任务派生自 TaskActivity

推荐大家从这两个仓库可用来学习和生产使用。

Microsoft.Extensions.Hosting包装器: https://github.com/jviau/durabletask-hosting

持久任务框架扩展: https://github.com/lucaslorentz/durabletask-extensions

我们一起来看下持久任务框架的Hello world: 代码来自https://github.com/jviau/durabletask-hosting 的 DurableTask.Samples:

这个非常简单的业务流程“GreetingsOrchestration”,有两个称为任务“GetUserTask”,它执行名称提示和“SendGreetingTask”,它将问候语写入控制台。

GreetingsOrchestration 派生自 TaskOrchestration 并具有调用 GetUserTask 和 SendGreetingTask 的 RunTask 方法。

using DurableTask.Core;

namespace DurableTask.Samples.Greetings;

///

/// A task orchestration for greeting a user.

///

public class GreetingsOrchestration : TaskOrchestration

{
     ///
     public override async Task RunTask(OrchestrationContext context, string input)
     {
         string user = await context.ScheduleTask(typeof(GetUserTask));
         string greeting = await context.ScheduleTask(typeof(SendGreetingTask), user);
         return greeting;
     }

}

GetUserTask 派生自 TaskActivity 并实现了 Execute 方法

using DurableTask.Core;

namespace DurableTask.Samples.Greetings;

///

/// A task activity for getting a username from console.

///

public class GetUserTask : TaskActivity

{
     private readonly IConsole _console;

///

     /// Initializes a new instance of the class.      ///
     /// The console output helper.
     public GetUserTask(IConsole console)
     {
         _console = console ?? throw new ArgumentNullException(nameof(console));
     }

///
     protected override string Execute(TaskContext context, string input)
     {
         _console.WriteLine("Please enter your name:");
         return _console.ReadLine();
     }

}

SendGreetingTask 派生自 TaskActivity 并实现了 Excute 方法

using DurableTask.Core;

namespace DurableTask.Samples.Greetings;

///

/// A task for sending a greeting.

///

public sealed class SendGreetingTask : AsyncTaskActivity

{
     private readonly IConsole _console;

///

     /// Initializes a new instance of the class.      ///
     /// The console output helper.
     public SendGreetingTask(IConsole console)
     {
         _console = console ?? throw new ArgumentNullException(nameof(console));
     }

///
     protected override async Task ExecuteAsync(TaskContext context, string user)
     {
         string message;
         if (!string.IsNullOrWhiteSpace(user) && user.Equals("TimedOut"))
         {
             message = "GetUser Timed out!!!";
             _console.WriteLine(message);
         }
         else
         {
             _console.WriteLine("Sending greetings to user: " + user + "…");
             await Task.Delay(5 * 1000);
             message = "Greeting sent to " + user;
             _console.WriteLine(message);
         }

return message;
     }

}

上面的这个例子非常基础,我们在项目中要把它用起来就要用到这个扩展项目 https://github.com/lucaslorentz/durabletask-extensions。这个项目通过更多功能扩展持久任务框架,并使其更易于使用,目前还在开发过程中,尚未达到投入生产的程度。包含了下列这些功能,让你在任何地方都可以运行。

  • 更多定义存储功能的接口
  • 依赖注入集成
  • EF Core MySql/PostgreSQL/SqlServer storages
  • 分布式工作线程:允许在多个工作线程中拆分业务流程/活动实现
  • 通过 GRPC 协议进行间接存储访问:将您的存储选择和配置集中在单个组件中。
  • 用户界面
  • BPMN 运行器

示例文件夹中,您可以找到经典书籍《飞行、汽车、酒店》的实现,其中包含补偿问题。

该示例旨在演示具有以下组件的微服务体系结构:

  • 服务器:连接到存储并将其公开为 GRPC 终结点。
  • 应用程序接口:公开 REST API 以管理业务流程。
  • 用户界面:公开用于管理业务流程的 UI。
  • 业务流程工作线程:为给定问题实现BookParallelBookSquential业务流程。
  • 飞行工作人员:实施预订航班和取消航班活动。
  • 车夫:实施“预订汽车”和“取消汽车”活动。
  • 酒店工作人员:实施预订酒店和取消酒店活动。
  • BPMNWorker:一个建立在持久任务之上的实验性 BPMN 运行器。对于给定的问题,还有BookParallelBookSequentialBPMN 工作流。