Dapr-Actor构建块
阅读原文时间:2021年11月22日阅读:1

前言:

 前篇-绑定 文章对Dapr的绑定构建块进行了解,本篇继续对 Actor 构建块进行了解学习。

一、Actor简介:

 Actors 为最低级别的“计算单元”。 换句话说,您将代码写入独立单元 ( 称为actor) ,该单元接收消息并一次处理消息,而不进行任何类型的并行或线程处理。

 当代码处理一条消息时,它可以向其他参与者发送一条或多条消息,或者创建新的 Actors。 底层 运行时 将管理每个 actor 的运行方式,时机和位置,并在 Actors 之间传递消息。

 大量 Actors 可以同时执行,而 Actors 可以相互独立执行。

 Dapr 包含专门实现 virtual actors 模式 的运行时。 通过 Dapr 的实现,您可以根据 Actors 模型编写 Dapr Actor,而 Dapr 利用底层平台提供的可扩展性和可靠性保证。

 应用场景:

  • 您的问题空间涉及大量(数千或更多) 的独立和孤立的小单位和逻辑。
  • 您想要处理单线程对象,这些对象不需要外部组件的大量交互,例如在一组 Actors 之间查询状态。
  • 您的 actor 实例不会通过发出I/O操作来阻塞调用方。

 生命周期:

  Dapr Actors 是虚拟的,意思是他们的生命周期与他们的 in - memory 表现不相关。 因此,它们不需要显式创建或销毁。 Dapr Actors 运行时在第一次接收到该 actor ID 的请求时自动激活 actor。 如果 actor 在一段时间内未被使用,那么 Dapr Actors 运行时将回收内存对象。 如果以后需要重新启动,它还将保持对 actor 的一切原有数据。

  调用 actor 方法和 reminders 将重置空闲时间,例如,reminders 触发将使 actor 保持活动状态。 不论 actor 是否处于活动状态或不活动状态 Actor reminders 都会触发,对不活动 actor ,那么会首先激活 actor。 Actor timers 不会重置空闲时间,因此 timer 触发不会使参与者保持活动状态。 Timer 仅在 actor 活跃时被触发。

  空闲超时和扫描时间间隔 Dapr 运行时用于查看是否可以对 actor 进行垃圾收集。 当 Dapr 运行时调用 actor 服务以获取受支持的 actor 类型时,可以传递此信息。

  Virtual actors 生命周期抽象会将一些警告作为 virtual actors 模型的结果,而事实上, Dapr Actors 实施有时会偏离此模型。

  在第一次将消息发送到其 actor 标识时,将自动激活 actor ( 导致构造 actor 对象) 。 在一段时间后,actor 对象将被垃圾回收。 以后,再次使用 actor ID 访问,将构造新的 actor。 Actor 的状态比对象的生命周期更久,因为状态存储在 Dapr 运行时的配置状态提供程序中(也就是说Actor即使不在活跃状态,仍然可以读取它的状态)。

二、工作原理:

 Dapr Sidecar 提供了用于调用执行组件的 HTTP/gRPC API。 这是 HTTP API 的URL格式: 

http://localhost:/v1.0/actors///

  • <daprPort>: Dapr 侦听的 HTTP 端口。

  • <actorType>:执行组件类型。

  • <actorId>:要调用的特定参与者的 ID。

     a)Actor组件放置服务流程:

  1. 启动时,Sidecar 调用执行组件服务以获取注册的执行组件类型和执行组件的配置设置。

  2. Sidecar 将注册的执行组件类型的列表发送到放置服务。

  3. 放置服务会将更新的分区信息广播到所有执行组件服务实例。 每个实例都将保留分区信息的缓存副本,并使用它来调用执行组件。

    b)调用Actor组件方法流程:

      

  4. 服务在Sidecar 上调用执行组件 API。 请求正文中的 JSON 有效负载包含要发送到执行组件的数据。

  5. Sidecar 使用位置服务中的本地缓存的分区信息来确定哪个执行组件服务实例 (分区) 负责托管 ID 为的执行组件 3 。 在此示例中,它是 pod 2 中的服务实例。 调用将转发到相应的Sidecar 。

  6. Pod 2 中的Sidecar 实例调用服务实例以调用执行组件。 服务实例激活actor(如果它还没有激活)并执行actor 方法。

三、Actor timers(定时器) 和 reminders(提醒)

 可以使用计时器和提醒来计划自身的调用。 这两个概念都支持配置截止时间。 不同之处在于回调注册的生存期:

  • 只要激活执行组件,计时器就会保持活动状态。 计时器 不会 重置空闲计时器,因此它们不能使执行组件处于活动状态。

  • 提醒长于执行组件激活。 如果停用了某个执行组件,则会重新激活该执行组件。 提醒  重置空闲计时器。

    1、Timers定时器:

    Dapr Actor 运行时确保回调方法被顺序调用,而非并发调用。 这意味着,在此回调完成执行之前,不会有其他Actor方法或timer/remider回调被执行。

      Timer的下一个周期在回调完成执行后开始计算。 这意味着 timer 在回调执行时停止,并在回调完成时启动。

      Dapr Actor 运行时在回调完成时保存对actor的状态所作的更改。 如果在保存状态时发生错误,那么将取消激活该actor对象,并且将激活新实例。

      当actor作为垃圾回收(GC)的一部分被停用时,所有 timer 都会停止。 在此之后,将不会再调用 timer 的回调。 此外, Dapr Actors 运行时不会保留有关在失活之前运行的 timer 的任何信息。 也就是说,重新启动 actor 后将会激活的 timer 完全取决于注册时登记的 timer。

    a) 创建定时器:

POST/PUT http://localhost:3500/v1.0/actors///timers/

  Timer 的 duetime 和回调函数可以在请求主体中指定。 到期时间(due time)表示注册后 timer 将首次触发的时间。 period 表示timer在此之后触发的频率。 到期时间为0表示立即执行。 负 due times 和负 periods 都是无效。

  以下请求体配置了一个 timer, dueTime 9秒, period 3秒。 这意味着它将在9秒后首次触发,然后每3秒触发一次。

{
"dueTime":"0h0m9s0ms",
"period":"0h0m3s0ms"
}

b) 删除定时器:

DELETE http://localhost:3500/v1.0/actors///timers/

2、Reminders 提醒:

  Reminders 是一种在指定时间内触发 persistent 回调的机制。 它们的功能类似于 timer。 但与 timer 不同,在所有情况下 reminders 都会触发,直到 actor 显式取消注册 reminders 或删除 actor 。 具体而言, reminders 会在所有 actor 失活和故障时也会触发触发,因为Dapr Actors 运行时会将 reminders 信息持久化到 Dapr Actors 状态提供者中。

  a) 创建Reminders

POST/PUT http://localhost:3500/v1.0/actors///reminders/

Reminders 的 duetime 和回调函数可以在请求主体中指定。 到期时间(due time)表示注册后 reminders将首次触发的时间。 period 表示在此之后 reminders 将触发的频率。 到期时间为0表示立即执行。 负 due times 和负 periods 都是无效。 若要注册仅触发一次的 reminders ,请将 period 设置为空字符串。

  以下请求体配置了一个 reminders, dueTime 9秒, period 3秒。 这意味着它将在9秒后首次触发,然后每3秒触发一次。

{
"dueTime":"0h0m9s0ms",
"period":"0h0m3s0ms"
}

b) 获取Reminders****

GET http://localhost:3500/v1.0/actors///reminders/

c) 删除Reminders****

DELETE http://localhost:3500/v1.0/actors///reminders/

四、数据持久化:

  使用 Dapr 状态管理构建块保存执行组件状态。由于执行组件可以一轮执行多个状态操作,因此状态存储组件必须支持多项事务

  当前状态管理组件支持事务/Actors支持情况:

Name

CRUD

事务

ETag

Actors

状态

组件版本

自从

Aerospike

Alpha

v1

1.0

Apache Cassandra

Alpha

v1

1.0

Cloudstate

Alpha

v1

1.0

Couchbase

Alpha

v1

1.0

Hashicorp Consul

Alpha

v1

1.0

Hazelcast

Alpha

v1

1.0

Memcached

Alpha

v1

1.0

MongoDB

GA

v1

1.0

MySQL

Alpha

v1

1.0

PostgreSQL

Alpha

v1

1.0

Redis

GA

v1

1.0

RethinkDB

Alpha

v1

1.0

Zookeeper

Alpha

v1

1.0

 需要支持Actor状态存储需添加以下内容:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: statestore
spec:
type: state.redis
version: v1
metadata:

  • name: redisHost
    value: localhost:6379
  • name: redisPassword
    value: ""
  • name: actorStateStore
    value: "true"  

五、.NET Core 实例:

 1、添加nuget包引用:Dapr.ActorsDapr.Actors.AspNetCore。

2、定义IOrderStatusActor接口,需要继承自IActor

public interface IOrderStatusActor : IActor
{
Task Paid(string orderId);
Task GetStatus(string orderId);
}

  执行组件方法的返回类型必须为 Task 或 Task<T> 。 此外,执行组件方法最多只能有一个参数。 返回类型和参数都必须可 System.Text.Json 序列化。

 3、定义OrderStatusActor实现IOrderStatusActor,并继承自Actor

public class OrderStatusActor : Actor, IOrderStatusActor
{
public OrderStatusActor(ActorHost host) : base(host)
{
}
public async Task Paid(string orderId)
{
// change order status to paid
await StateManager.AddOrUpdateStateAsync(orderId, "init", (key, currentStatus) => "paid");
return orderId;
}
public async Task GetStatus(string orderId)
{
return await StateManager.GetStateAsync(orderId);
}
}

 4、修改Statup.cs文件

public void ConfigureServices(IServiceCollection services)
{
services.AddControllers();
//注册Actor
services.AddActors(option =>
{
option.Actors.RegisterActor();
});
}

public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
app.UseRouting();
app.UseAuthorization();
app.UseEndpoints(endpoints =>
{
endpoints.MapControllers();
//
endpoints.MapActorsHandlers();
});
}

 5、添加ActorController操作Actor

[Route("api/[controller]")]
[ApiController]
public class ActorController : ControllerBase
{
private readonly IActorProxyFactory _actorProxyFactory;

public ActorController(IActorProxyFactory actorProxyFactory)  
{  
    \_actorProxyFactory = actorProxyFactory;  
}

/// <summary>  
/// 方式一:ActorProxy.Create方式  
/// </summary>  
/// <param name="orderId"></param>  
/// <returns></returns>  
\[HttpGet("paid/{orderId}")\]  
public async Task<ActionResult> PaidAsync(string orderId)  
{  
    var actorId = new ActorId("myid-" + orderId);  
    var proxy = ActorProxy.Create<IOrderStatusActor>(actorId, "OrderStatusActor");  
    var result = await proxy.Paid(orderId);  
    return Ok(result);  
}

/// <summary>  
/// 方式二:依赖注入方式  
/// </summary>  
/// <param name="orderId"></param>  
/// <returns></returns>  
\[HttpGet("get/{orderId}")\]  
public async Task<ActionResult> GetAsync(string orderId)  
{  
    var proxy = \_actorProxyFactory.CreateActorProxy<IOrderStatusActor>(  
        new ActorId("myid-" + orderId), "OrderStatusActor");

    return Ok(await proxy.GetStatus(orderId));  
}  

}

 6、Timer应用:使用Actor基类的 RegisterTimerAsync 方法注册计时器:在OrderStatusActor类中新增方法

#region Timer操作

///

/// 启动Timer定时器 ///
/// 定时器名称
/// 定时器参数
///
public Task StartTimerAsync(string name, string text)
{
//注册立即执行的间隔3s执行的定时器
return RegisterTimerAsync(
name,
nameof(TimerCallbackAsync),
Encoding.UTF8.GetBytes(text),
TimeSpan.Zero,
TimeSpan.FromSeconds(3));
}

///

/// 定时器回调 ///
///
///
public Task TimerCallbackAsync(byte[] state)
{
var text = Encoding.UTF8.GetString(state);

Console.WriteLine($"Timer fired: {text}");

return Task.CompletedTask;

}

///

/// 停止定时器 ///
///
///
public Task StopTimerAsync(string name)
{
//停止计时器 UnregisterTimerAsync
return UnregisterTimerAsync(name);
}

#endregion

 7、Reminder操作:使用Actor基类的 RegisterReminderAsync 方法计划计时器。在OrderStatusActor类中新增方法

#region Reminder 操作

public Task SetReminderAsync(string text)
{
return RegisterReminderAsync(
"test-reminder",
Encoding.UTF8.GetBytes(text),
TimeSpan.Zero,
TimeSpan.FromSeconds(1));
}

///

/// Reminder触发处理(实现IRemindable接口处理触发) ///
///
///
///
///
///
public Task ReceiveReminderAsync(string reminderName, byte[] state, TimeSpan dueTime, TimeSpan period)
{
if (reminderName == "test-reminder")
{
var text = Encoding.UTF8.GetString(state);
Console.WriteLine($"reminder fired: {text}");
}
return Task.CompletedTask;
}
#endregion

 8、启动定时器:

public class OrderStatusActor : Actor, IOrderStatusActor, IRemindable
{
public OrderStatusActor(ActorHost host) : base(host)
{
//注册Timer
StartTimerAsync("test-timer", "this is a test timer").ConfigureAwait(false).GetAwaiter().GetResult();

    //设置Reminder  
    SetReminderAsync("this is a test reminder").ConfigureAwait(false).GetAwaiter().GetResult();  
}  
//其他处理逻辑  

}

总结:

 Dapr 执行组件构建基块可以更轻松地编写正确的并发系统。 执行组件是状态和逻辑的小单元。 它们使用基于轮次的访问模型,无需使用锁定机制编写线程安全代码。 执行组件是隐式创建的,在未执行任何操作时以无提示方式从内存中卸载。 重新激活执行组件时,自动持久保存并加载执行组件中存储的任何状态。 执行组件模型实现通常是为特定语言或平台创建的。 但是,借助 Dapr 执行组件构建基块,可以从任何语言或平台利用执行组件模型。

 Actor组件支持计时器和提醒来计划将来的工作。 计时器不会重置空闲计时器,并且允许执行组件在未执行其他操作时停用。 提醒会重置空闲计时器,并且也会自动保留。 计时器和提醒都遵守基于轮次的访问模型,确保在处理计时器/提醒事件时无法执行任何其他操作。

 使用 Dapr 状态管理构建基块 持久保存执行组件状态。 支持多项事务的任何状态存储都可用于存储执行组件状态。

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章