- Published on
go-workflows: Durable Workflows in Go
- Authors
- Name
- Christopher Schleiden
- @cschleiden
Lately I've been getting interested in "durable workflows" as implemented by Azure's Durable Task Framework (DTFx), the basis for Azure Durable Functions and Temporal. To really understand how a library or a framework works, what problems it solves, and why certain decisions were made the way they were made, I often tend to try to re-implement my own version, even if only with minimal functionality.
So for durable workflows I started to build go-workflows, a not quite production-ready mix of Durable Tasks and Temporal written in Go. It started as a number of very small experiments over the holidays, but it kind of snowballed from there, and I'm approaching something that's quite usable. For the next few posts I'll try to explain what go-workflows
is and how it works.
The library is still a work in progress so some of the details and the example code might've changed by now although I'll try to keep everything in sync. The general concepts should still be valid, though.
What are durable workflows?
In modern software we often have to orchestrate calls to many different services to fulfill requests. If it's all synchronous and the number of downstream services is small it might be possible to manage this manually. But when operations become more complex and it's not enough to make a couple quick remote procedure calls to fulfill a task, it gets complicated. When you have to orchestrate long-running tasks across various services, maybe waiting a long time for results, soon keeping the code reliable becomes a burden. What happens if requests are interrupted in the middle? What happens if services crash during execution? What happens to in-flight orchestrations when a new version of the orchestrating service needs to get deployed quickly?
You can deal with all of that by manually persisting and reading state, using queues, retries, etc. Unfortunately, all of this is complex, requires a lot of code, and soon it's hard to follow the original business logic amid all the code to try to make this reliable.
Both DTFx as well as Temporal are workflow engines that try to abstract this problem and allow you to write code that mostly looks like ordinary code that's executed sequentially and focuses only on the business logic, while still dealing with crashing processes, dynamically scaling services, and so on. Their core premise is the same, but they have different approaches for solving this.
Core Concepts
Both frameworks differentiate between two kinds of code they can execute:
- workflows (Temporal) / orchestrations (DTFx)
- activities
Workflows are long-running, durable, and their main task is to orchestrate activities. While workflows mostly look like ordinary, sequential code, there are some differences and rules you have to follow, more on that later.
Activities are plain code, they can have side-effects, can use any language or library features.
Inversion of execution
How do these frameworks execute workflows in a durable way? They use event sourcing for recording an event history of everything that can happen in a workflow. This allows them to replay these events to get back to a previous state in case of a crash, or if a worker is re-balanced.
Looking at a very simple example workflow:
func OrderWorkflow(ctx workflow.Context, item string) error {
workflow.ExecuteActivity(ctx, Process, item).Get(ctx, nil)
workflow.ExecuteActivity(ctx, SendConfirmation, item).Get(ctx, nil)
return nil
}
func Process(ctx context.Context, item string) error {
// ...
fmt.Println(item)
}
func SendConfirmation(ctx context.Context, item string) error {
// ...
fmt.Println("Confirmation for:", item)
}
We receive an item string
as an input, and then schedule two activities Process
, and SendConfirmation
, before we return and end the workflow.
The key is that we call Process
and SendConfirmation
not directly, but we do it indirectly by passing them to: workflow.ExecuteActivity
. What happens at runtime is that a workflow engine starts executing OrderWorkflow
. When it gets to the first ExecuteActivity
call (and the blocking Get
to get its result), it pauses the execution of the workflow and instead of calling the activity code directly, records an ActivityScheduled
event in the workflow event history. So once we reach that point of the workflow the history will look like this:
# | Name | Attributes | Executed |
---|---|---|---|
1 | WorkflowExecutionStarted | OrderWorkflow | x |
2 | ActivityScheduled | Process | x |
Now the framework will execute the Process
activity somewhere. It might run in the same process as the workflow, or it might be on another machine. In any case, assuming it succeeds its result is also written to the event history:
# | Name | Attributes | Executed |
---|---|---|---|
1 | WorkflowExecutionStarted | OrderWorkflow | x |
2 | ActivityScheduled | Process | x |
3 | ActivityCompleted | some-output |
Now the workflow execution can be continued. If it's still in memory we can just execute the ActivityCompleted
event, which will unblock the
workflow.ExecuteActivity(ctx, Process, item).Get(ctx, nil)
👆🏻
.Get
call and make the activity result available to the workflow code. In our example we are not interested in the result of the activity, so we just continue to the next. Again we schedule an activity, block the workflow, and wait for the activity result.
# | Name | Attributes | Executed |
---|---|---|---|
1 | WorkflowExecutionStarted | OrderWorkflow | x |
2 | ActivityScheduled | Process | x |
3 | ActivityCompleted | some-output | x |
2 | ActivityScheduled | SendConfirmation |
Eventually we complete the workflow and can record its result. I'll go over a more detailed example later.
Durable Task Framework (DTFx)
Azure's Durable Task Framework makes use of .NET C#'s async
/await
support to allow developers to write persistent workflows, which it calls orchestrations in C#.
It's consumed as a library, with various options for the backend implemented in what it calls providers. The original provider uses Azure's Service Bus, but recent ones are available utilizing Azure Storage, Azure ServiceFabric, or Microsoft's SQL Server either hosted as Azure SQL, or the on-premise product.
When looking at DTFx I've focused on the usage with a database, i.e. the SQL Server provider. In that case it's a two-tier architecture. Clients and workers - which could also be in the same process - both talk directly to the same database.
Temporal
Temporal is a fork of Uber's Cadence, and has some great documentation at https://temporal.io. It uses some of the same concepts as DTFx and was written by some of the engineers who were involved in DTFx as well.
It comes with a server with various roles that you interact with, see the documentation for a detailed introduction.
go-workflows
go-workflows
is written from scratch, but borrows concepts very liberally from both DTFx and Temporal. While both DTFx and Temporal are quite similar due to their shared heritage, go-workflows's internals for persistence align more with DTFx due to the provider concepts, while the user visible interface is more similar to Temporal's, to work around Go's indeterministic select
statement behavior, for example.
Its overall architecture is the same as DTFx that there are clients and workers, and both interface via a pluggable implementation to a backend. What DTFx calls providers I called backends . So far I've written a Sqlite and a MySQL implementation, which also makes go-workflows programs two tier architectures.
When writing workflows in C#, you have to avoid certain library, but in general you can use all language features. async
/await
and exception handling are even central to writing workflows.
For Golang this is slightly different. There are a number of language features which at not deterministic by design. For example, iterating over a map
with range
yields a pseudo-random iteration order. Also a select
statement with multiple case
s that are ready will pick one at random. This prevents us from using native channels at all.
Example workflow
To give an impression of how authoring workflows looks like, let's go over a small example:
Workflow
Workflows are written in Go code. The only exception is they must not use any of Go's non-deterministic features (select
, iteration over a map
, etc.). Inputs and outputs for workflows and activities have to be serializable:
func Workflow1(ctx workflow.Context, input string) error {
var r1, r2 int
if err := workflow.ExecuteActivity(ctx, workflow.DefaultActivityOptions, Activity1, 35, 12).Get(ctx, &r1); err != nil {
panic("error getting activity 1 result")
}
log.Println("A1 result:", r1)
if err := workflow.ExecuteActivity(ctx, workflow.DefaultActivityOptions, Activity2).Get(ctx, &r2); err != nil {
panic("error getting activity 1 result")
}
log.Println("A2 result:", r2)
return nil
}
Activities
Activities can have side-effects and don't have to be deterministic. They will be executed only once and the result is persisted:
func Activity1(ctx context.Context, a, b int) (int, error) {
return a + b, nil
}
func Activity2(ctx context.Context) (int, error) {
return 12, nil
}
Worker
The worker is responsible for executing Workflows
and Activities
, both need to be registered with it.
func runWorker(ctx context.Context, mb backend.Backend) {
w := worker.New(mb, nil)
r.RegisterWorkflow(Workflow1)
w.RegisterActivity(Activity1)
w.RegisterActivity(Activity2)
if err := w.Start(ctx); err != nil {
panic("could not start worker")
}
}
Backend
The backend is responsible for persisting the workflow events. Currently there is an in-memory backend implementation for testing, one using SQLite, and one for MySql.
b := sqlite.NewSqliteBackend("simple.sqlite")
Putting it all together
We can start workflows from the same process the worker runs in -- or they can be separate. Here we use the SQLite backend, spawn a single worker (which then executes both Workflows
and Activities
), and then start a single instance of our workflow
func main() {
ctx := context.Background()
b := sqlite.NewSqliteBackend("simple.sqlite")
go runWorker(ctx, b)
c := client.NewClient(b)
wf, err := c.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{
InstanceID: uuid.NewString(),
}, Workflow1, "input-for-workflow")
if err != nil {
panic("could not start workflow")
}
c2 := make(chan os.Signal, 1)
signal.Notify(c2, os.Interrupt)
signal.Notify(c2, os.Interrupt)
<-c2
}
That's it for Part 1.
- Part 1 - Durable workflows and event sourcing
- Part 2 - Cooperative scheduler and events in-depth
- Part 3 - Architecture & Backends