go-workflows: Durable Workflows in Go Part 1

Lately I’ve been getting interested in “durable workflows” as implemented by Azure’s Durable Task Framework (DTFx), the basis for Azure Durable Functions and Temporal. To really understand how a library or a framework works, what problems it solves, and why certain decisions were made the way they were made, I often tend to try to re-implement my own version, even if only with minimal functionality.

So for durable workflows I started to build go-workflows, a not quite production-ready mix of Durable Tasks and Temporal written in Go. It started as a number of very small experiments over the holidays, but it kind of snowballed from there, and I’m approaching something that’s quite usable. For the next few posts I’ll try to explain what go-workflows is and how it works.

The library is still a work in progress so some of the details and the example code might’ve changed by now although I’ll try to keep everything in sync. The general concepts should still be valid, though.

What are durable workflows?

In modern software we often have to orchestrate calls to many different services to fulfill requests. If it’s all synchronous and the number of downstream services is small it might be possible to manage this manually. But when operations become more complex and it’s not enough to make a couple quick remote procedure calls to fulfill a task, it gets complicated. When you have to orchestrate long-running tasks across various services, maybe waiting a long time for results, soon keeping the code reliable becomes a burden. What happens if requests are interrupted in the middle? What happens if services crash during execution? What happens to in-flight orchestrations when a new version of the orchestrating service needs to get deployed quickly?

You can deal with all of that by manually persisting and reading state, using queues, retries, etc. Unfortunately, all of this is complex, requires a lot of code, and soon it’s hard to follow the original business logic amid all the code to try to make this reliable.

Both DTFx as well as Temporal are workflow engines that try to abstract this problem and allow you to write code that mostly looks like ordinary code that’s executed sequentially and focuses only on the business logic, while still dealing with crashing processes, dynamically scaling services, and so on. Their core premise is the same, but they have different approaches for solving this.

Core Concepts

Both frameworks differentiate between two kinds of code they can execute:

  • workflows (Temporal) / orchestrations (DTFx)
  • activities

Workflows are long-running, durable, and their main task is to orchestrate activities. While workflows mostly look like ordinary, sequential code, there are some differences and rules you have to follow, more on that later.

Activities are plain code, they can have side-effects, can use any language or library features.

Inversion of execution

How do these frameworks execute workflows in a durable way? They use event sourcing for recording an event history of everything that can happen in a workflow. This allows them to replay these events to get back to a previous state in case of a crash, or if a worker is re-balanced.

Looking at a very simple example workflow:

func OrderWorkflow(ctx workflow.Context, item string) error {
	workflow.ExecuteActivity(ctx, Process, item).Get(ctx, nil)

	workflow.ExecuteActivity(ctx, SendConfirmation, item).Get(ctx, nil)

	return nil
}

func Process(ctx context.Context, item string) error {
	// ...
	fmt.Println(item)
}

func SendConfirmation(ctx context.Context, item string) error {
	// ...
	fmt.Println("Confirmation for:", item)
}

We receive an item string as an input, and then schedule two activities Process, and SendConfirmation, before we return and end the workflow.

The key is that we call Process and SendConfirmation not directly, but we do it indirectly by passing them to: workflow.ExecuteActivity. What happens at runtime is that a workflow engine starts executing OrderWorkflow. When it gets to the first ExecuteActivity call (and the blocking Get to get its result), it pauses the execution of the workflow and instead of calling the activity code directly, records an ActivityScheduled event in the workflow event history. So once we reach that point of the workflow the history will look like this:

#NameAttributesExecuted
1WorkflowExecutionStartedOrderWorkflowx
2ActivityScheduledProcessx

Now the framework will execute the Process activity somewhere. It might run in the same process as the workflow, or it might be on another machine. In any case, assuming it succeeds its result is also written to the event history:

#NameAttributesExecuted
1WorkflowExecutionStartedOrderWorkflowx
2ActivityScheduledProcessx
3ActivityCompletedsome-output 

Now the workflow execution can be continued. If it’s still in memory we can just execute the ActivityCompleted event, which will unblock the

	workflow.ExecuteActivity(ctx, Process, item).Get(ctx, nil)
																							  👆🏻

.Get call and make the activity result available to the workflow code. In our example we are not interested in the result of the activity, so we just continue to the next. Again we schedule an activity, block the workflow, and wait for the activity result.

#NameAttributesExecuted
1WorkflowExecutionStartedOrderWorkflowx
2ActivityScheduledProcessx
3ActivityCompletedsome-outputx
2ActivityScheduledSendConfirmation 

Eventually we complete the workflow and can record its result. I’ll go over a more detailed example later.

Durable Task Framework (DTFx)

Azure’s Durable Task Framework makes use of .NET C#’s async/await support to allow developers to write persistent workflows, which it calls orchestrations in C#.

It’s consumed as a library, with various options for the backend implemented in what it calls providers. The original provider uses Azure’s Service Bus, but recent ones are available utilizing Azure Storage, Azure ServiceFabric, or Microsoft’s SQL Server either hosted as Azure SQL, or the on-premise product.

When looking at DTFx I’ve focused on the usage with a database, i.e. the SQL Server provider. In that case it’s a two-tier architecture. Clients and workers - which could also be in the same process - both talk directly to the same database.

Temporal

Temporal is a fork of Uber’s Cadence, and has some great documentation at https://temporal.io. It uses some of the same concepts as DTFx and was written by some of the engineers who were involved in DTFx as well.

It comes with a server with various roles that you interact with, see the documentation for a detailed introduction.

go-workflows

go-workflows is written from scratch, but borrows concepts very liberally from both DTFx and Temporal. While both DTFx and Temporal are quite similar due to their shared heritage, go-workflows’s internals for persistence align more with DTFx due to the provider concepts, while the user visible interface is more similar to Temporal’s, to work around Go’s indeterministic select statement behavior, for example.

Its overall architecture is the same as DTFx that there are clients and workers, and both interface via a pluggable implementation to a backend. What DTFx calls providers I called backends . So far I’ve written a Sqlite and a MySQL implementation, which also makes go-workflows programs two tier architectures.

When writing workflows in C#, you have to avoid certain library, but in general you can use all language features. async/await and exception handling are even central to writing workflows.

For Golang this is slightly different. There are a number of language features which at not deterministic by design. For example, iterating over a map with range yields a pseudo-random iteration order. Also a select statement with multiple cases that are ready will pick one at random. This prevents us from using native channels at all.

Example workflow

To give an impression of how authoring workflows looks like, let’s go over a small example:

Workflow

Workflows are written in Go code. The only exception is they must not use any of Go’s non-deterministic features (select, iteration over a map, etc.). Inputs and outputs for workflows and activities have to be serializable:

func Workflow1(ctx workflow.Context, input string) error {
	var r1, r2 int

	if err := workflow.ExecuteActivity(ctx, workflow.DefaultActivityOptions, Activity1, 35, 12).Get(ctx, &r1); err != nil {
		panic("error getting activity 1 result")
	}

	log.Println("A1 result:", r1)

	if err := workflow.ExecuteActivity(ctx, workflow.DefaultActivityOptions, Activity2).Get(ctx, &r2); err != nil {
		panic("error getting activity 1 result")
	}

	log.Println("A2 result:", r2)

	return nil
}

Activities

Activities can have side-effects and don’t have to be deterministic. They will be executed only once and the result is persisted:

func Activity1(ctx context.Context, a, b int) (int, error) {
	return a + b, nil
}

func Activity2(ctx context.Context) (int, error) {
	return 12, nil
}

Worker

The worker is responsible for executing Workflows and Activities, both need to be registered with it.

func runWorker(ctx context.Context, mb backend.Backend) {
	w := worker.New(mb, nil)

	r.RegisterWorkflow(Workflow1)

	w.RegisterActivity(Activity1)
	w.RegisterActivity(Activity2)

	if err := w.Start(ctx); err != nil {
		panic("could not start worker")
	}
}

Backend

The backend is responsible for persisting the workflow events. Currently there is an in-memory backend implementation for testing, one using SQLite, and one for MySql.

b := sqlite.NewSqliteBackend("simple.sqlite")

Putting it all together

We can start workflows from the same process the worker runs in – or they can be separate. Here we use the SQLite backend, spawn a single worker (which then executes both Workflows and Activities), and then start a single instance of our workflow

func main() {
	ctx := context.Background()

	b := sqlite.NewSqliteBackend("simple.sqlite")

	go runWorker(ctx, b)

	c := client.NewClient(b)

	wf, err := c.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{
		InstanceID: uuid.NewString(),
	}, Workflow1, "input-for-workflow")
	if err != nil {
		panic("could not start workflow")
	}

	c2 := make(chan os.Signal, 1)
signal.Notify(c2, os.Interrupt)
	signal.Notify(c2, os.Interrupt)
	<-c2
}

That’s it for Part 1.


© 2022 Christopher Schleiden. All rights reserved.

Powered by Hydejack v9.1.6