工作流引擎Temporalio介绍

August 3, 2023

Temporalio 简介

简介

引用wiki来介绍工作流引擎的定义:

A workflow engine manages and monitors the state of activities in a workflow, such as the processing and approval of a loan application form, and determines which new activity to transition to according to defined processes (workflows). The actions may be anything from saving an application form in a document management system to sending a reminder e-mail to users or escalating overdue items to management. A workflow engine facilitates the flow of information, tasks, and events. Workflow engines may also be referred to as Workflow Orchestration Engines.

Temporal是一个使用golang开发分布式的工作流引擎。Temporal 是原 Cadence 联合创始人离开 Cadence 后基于其 Fork的新分支,从某种程度上来讲,Temporal 可能是 Cadence 的一个更商业化的版本,毕竟后者原先只是 Uber 内部的一个组件。在2022年初Temporal已经是估值15亿美元的独角兽企业。

基本概念

  • Temporal System:任务调度平台,负责workflow任务存储、调度等。
  • Worker Process:Worker Process为任务的执行环境,负责具体任务的执行,任务类型包括workflow和Activity两种。
    • work process启动后需要连接到Temporal服务端,并自行注册需要支持的任务类型及名称、队列等信息。
    • A Worker Process is responsible for polling a Task Queue, dequeueing a Task, executing your code in response to a Task, and responding to the Temporal Cluster with the results. # worker会一直循环从预定义好的队列中取task并执行后返回结果。
  • workflow & Activity:
    • Workflow由多个Activities编排组成,每个Activity只执行一个单一、已定义好的动作,比如调用外部服务、文件转码、发送邮件,可以被各种类型的Workflow复用。
    • 在Go中Workflow通常就是一个函数,接收workflow Context和输入input参数。

使用方法

使用官方demo来本地演示下Temporalio的基本使用方法。demo演示了一次完整的转账操作,包括容错处理。

  • 安装及启动Temporalio
    • 本地安装(linux)
    Loading...
    • 启动Temporalio服务,可以通过ui-ip参数指定本地ip,默认为localhost
    Loading...
  • 通过SDK实现workflow及activity,代码见官方demo
    • MoneyTransfer:workflow的具体实现
      • 实现了一个从source到target账号的转账操作。先从source中取(对应Withdraw activity),然后转给target(对应Deposit activity实现)。
      • 如果deposit失败,为保证事务的原子性,需要回滚前一步的Withdraw操作(Refund activity实现)
      • 入参:PaymentDetails定义,通常将入参封装在一个struct中,后续在新增参数时会更加方便
    • activity定义:3个activity分别实现见withdraw、deposit和refund函数。activity的参数定义与workflow并没有本质的区别。
    • 重试逻辑:workflow会自动重试,也可以通过RetryPolicy参数指定重试逻辑。注意:
    By default, Temporal retries failed Activities forever, but you can specify some errors that Temporal should not attempt to retry
  • 启动workflow
    • 两种方法可以启动workflow:SDK或者命令行工具(tctl)
    • 代码中已有sdk的视线:start/main.go中使用sdk启动workflow。指定参数(input、队列名称等)以及workflow名称,然后等待结果返回即可。
    • 启动workflow
    Loading...
  • 查看workflow状态

    启动后就可以在任务面板中看到workflow的基础和详细信息了。

    可见workflow虽然在runing状态,但是并没有被执行,因为还没有对应的worker来承担具体的执行工作。只有worker启动并连接到对应的task queue之后workflow才会被执行。

  • 启动worker执行task
Loading...

启动worker后可以观察到之前启动的workflow任务被立即执行了,并打印了输出。面板中的workflow状态也变成了Completed。

明细中可以看到有两类task分别被执行:

  • WorkflowTask:编排activity的任务
  • ActivityTask:执行具体动作的任务,此处分别为Withdraw、Deposit。

总结

可见temporalio本质上是将一个workflow拆分为多个可复用的activity,通过消息队列的方式分发到多个work中去调度,这样可以达到代码以及执行资源上的复用,对于业务流程复杂、多变的场景比较合适,比如DevOps流水线、数据分析pipeline等场景,需要根据各种业务需求来自定义特殊处理流程并部署,如果使用工作流引擎来编排各个activity可以快速的满足需求。

但是一个workflow的执行过程会被拆解到多个activity,并在分布式环境中去调度,会有多次的RTT操作以及序列化等操作,已定会有一定的性能损耗。对流量大、时延要求高微服务是否适合还没有去深入研究验证。另外对外服务也要考虑资源冲突等带来的影响。

相关参考

See all postsSee all posts