
Asynchronous APIs and CloudEvents in messaging architectures

Prologue

In this article I will discuss an architectural approach I usually follow when I design APIs and distributed messaging architectures. The architecture is designed to address non-functional requirements like scalability and failure tolerance.

Do we need to be resilient to failures? Are the business stakeholders willing to accept losing data in case of failure? Do we need to cope with variable peaks of workload over time?

The simple, traditional way to integrate a client application with a backend system seems to be the following (load balancer omitted, as it is a piece of infrastructure that can be used with both sync and async patterns).

This approach allows the development team to make quick steps forward, giving the impression of something that is working. Then the requirements keep coming, the operations between clients and backend services increase in complexity, the different apps, features or even tenants keep growing, and your architecture becomes something like the following.

The interaction between clients and backend services is done with synchronous calls. Even if your client gives you the impression that you can continue to do something else while waiting for the result, that is still a synchronous call. The HTTP protocol is dead simple: make a request, wait for the response. Stop. Your clients are making HTTP requests and are waiting for responses. If something goes wrong on the server side, the exception bubbles back up to the client and the business transaction is lost. You can log that, but you can't log the full content of the transaction.

A better way to “organise” the code is to separate sections into different groups, moving the code into separate repositories. Then use something like Istio or similar to expose a unified mesh of all the services to the clients. Is that something that solves the non-functional requirements? I don't think so. A service mesh can only alleviate the problems.

Asynchronous API

The proposed solution is to process all change requests asynchronously while continuing to process all read requests synchronously.

That actually reflects the proper use of the 202 HTTP status code as described in its documentation:

The HyperText Transfer Protocol (HTTP) 202 Accepted response status code indicates that the request has been accepted for processing, but the processing has not been completed; in fact, processing may not have started yet. The request might or might not eventually be acted upon, as it might be disallowed when processing actually takes place.

developer.mozilla.org

When the complexity grows with the number of different clients or projects, the logic can be organised and scaled as follows.

The consumers can be instances of a different microservice per project/context/feature. These microservices can run as Azure Functions, AWS Lambdas, GCP Functions or just services deployed in Kubernetes.

There are still issues to be addressed. What format will the messages use? How do we scale the processing? If we deal with the same internal system or database, how can we slice the logic and the related data to avoid a distributed monolith? How do we make the system failure tolerant?

CloudEvents specification: a common format for messages

As a developer, when I start exposing APIs to clients following the common REST approach, I end up with several HTTP controllers with plenty of different endpoints/paths mapped per resource. That works nicely for reads but it sucks for changes.

One way I found to simplify that REST jungle is to provide a single endpoint for any change request. For multi-tenant or multi-project solutions this allows us to build a platform where new tenants or ingestion pipelines can be plugged in at pace.

This single ingestion HTTP endpoint will accept requests with a payload that conforms to the CloudEvents specification.

The CloudEvents initiative is promoted by Microsoft, Google and others. The basic specification is enough to identify a set of attributes that all change request messages should adhere to.

The official repository provides SDKs for most programming languages. Personally, I don't like to bind my ingestion to one of these SDKs. I prefer to use the attributes from the spec and write my own routing and serialisation code, tailor-made for my needs.

Clients are usually happy to structure all their change requests in that nice and clean way. Thanks to the simple and clear DataSchema field, we can also hold and share the schemas with them in a consistent way.
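To make that concrete, here is a minimal sketch in Python of what such a change request could look like. The envelope attribute names come from the CloudEvents spec; the event type, source, schema URL and data values are made-up examples:

    # A change request expressed as a CloudEvents-style payload.
    # The envelope attributes come from the CloudEvents spec; the concrete
    # type, source, dataschema and data values are invented for illustration.
    change_request = {
        "specversion": "1.0",
        "id": "6f1c64ab-0d2e-4a3b-9c1f-7f1d2b3c4d5e",   # later used as the operation id
        "source": "/tenants/acme/policies",             # used to pick the right queue
        "type": "com.example.policy.bind.v1",           # the Command, e.g. BindPolicy
        "time": "2023-05-04T10:15:30Z",
        "datacontenttype": "application/json",
        "dataschema": "https://example.com/schemas/bind-policy-v1.json",
        "data": {"policyId": "POL-123", "customerId": "CUST-456"},
    }

    # The ingestion endpoint only needs the envelope to validate and route;
    # it never has to understand the business payload itself.
    required = {"specversion", "id", "source", "type", "data"}
    missing = required - set(change_request)
    if missing:
        raise ValueError(f"not a valid CloudEvent, missing attributes: {missing}")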

Once the client makes a POST/PUT request to your ingestion HTTP web API, it will receive an ack response with HTTP status code 202, meaning that the message has been received and will be processed. The logic in our API will only do the following:

  • make sure that the client has permissions
  • validate the payload using the schema from the DataSchema field
  • wrap the message and send it down the right ingestion queue based on the Source field

To pick the right queue, you can use the Source field and map it to the connection string of the queue you are using, like Azure Service Bus or AWS SQS. If our architecture is a multi-tenant platform, the Source field makes it easy to find the tenant settings in a consistent way.
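Putting the three steps together with the Source-based routing, a rough sketch of the ingestion logic could look like the following. The helpers (check_permissions, load_schema, validate, send_to_queue) and the source-to-queue map are hypothetical stand-ins for whatever your web framework and queue SDK provide:

    import json

    # Hypothetical Source -> queue mapping (per tenant/project settings).
    QUEUE_BY_SOURCE = {
        "/tenants/acme/policies": "sb://acme-commands",
        "/tenants/globex/orders": "sb://globex-commands",
    }

    def check_permissions(caller: str, source: str) -> bool:
        return True                      # stub: plug in your auth check here

    def load_schema(url: str) -> dict:
        return {}                        # stub: fetch and cache the JSON schema

    def validate(data: dict, schema: dict) -> list:
        return []                        # stub: e.g. run a JSON Schema validator

    def send_to_queue(queue: str, body: str) -> None:
        print(f"enqueued on {queue}")    # stub: Service Bus / SQS send call

    def handle_change_request(event: dict, caller: str) -> tuple:
        # 1. make sure that the client has permissions
        if not check_permissions(caller, event.get("source", "")):
            return 403, {"error": "forbidden"}

        # 2. validate the payload using the schema from the DataSchema field
        errors = validate(event["data"], load_schema(event["dataschema"]))
        if errors:
            return 400, {"error": "payload does not match schema", "details": errors}

        # 3. wrap the message and send it down the right queue based on Source
        queue = QUEUE_BY_SOURCE.get(event["source"])
        if queue is None:
            return 400, {"error": "unknown source"}
        send_to_queue(queue, json.dumps(event))

        # Ack only: processing has not happened (and may not have started) yet.
        return 202, {"status": "accepted", "operationId": event["id"]}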

Horizontal scaling with Competing Consumers pattern

It’s important to separate Commands from Events. They are both messages but with different intent.

When a client sends a message with the intent of making a change, that is a Command. For example BindPolicy, SubmitOrder, EnrollPatient. The result of processing that Command will eventually be an Event. For Commands, the conversation must be 1 to 1.

We need to have one authority for that Command and one source of truth. That authority belongs to a slice of the domain that in DDD jargon is called a Bounded Context. You can't have two of these, otherwise you can end up with conflicting results.

Once the processing is done, its result can be delivered to many interested subscribers. For Events the conversation is 1 to many: the basic Pub/Sub pattern.

For the Command scenario use a message queue. For the Event scenario use a message broker.
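As a tiny in-memory illustration of the two conversation shapes (assuming nothing about a specific broker): a Command goes on a queue and will be consumed by exactly one handler, while an Event is copied to every subscriber:

    from collections import deque

    command_queue = deque()      # 1 to 1: a single authority will consume it
    event_subscribers = []       # 1 to many: every subscriber gets a copy

    def send_command(command: dict) -> None:
        command_queue.append(command)          # exactly one consumer will pop this

    def publish_event(event: dict) -> None:
        for subscriber in event_subscribers:   # basic Pub/Sub fan-out
            subscriber(event)

    # Example usage with made-up message names
    event_subscribers.append(lambda e: print("read model updated:", e["type"]))
    event_subscribers.append(lambda e: print("notification sent:", e["type"]))

    send_command({"type": "BindPolicy", "policyId": "POL-123"})
    publish_event({"type": "PolicyBound", "policyId": "POL-123"})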

That difference is important because it lets us benefit from a pattern called Competing Consumers. That pattern works nicely with 1 to 1 communication; it can't work with 1 to many. In the following I will try to explain why.

A message queue like Azure Service Bus or AWS SQS FIFO (and others) follows the “at least once” delivery pattern. Some of these queues claim to offer the stricter “exactly once” delivery, but that is not necessary as long as you do your job well with idempotency. Using the “at least once” pattern, the queue will deliver a message to one of the available consumers. In other words, a message will be delivered to one consumer and not to all of them. That pattern lets you scale the processing horizontally by having multiple instances of the processing component waiting for messages.

The consumers in the group are identical twins, and a message from the queue will be delivered to at least one of them. The next message will be delivered to another available consumer.
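The following toy sketch shows the idea with Python's in-memory queue standing in for Azure Service Bus or SQS: three identical workers compete for the same stream of messages, each message is handled by exactly one of them, and the processed_ids set hints at the idempotency check that “at least once” delivery requires:

    import queue
    import threading

    commands = queue.Queue()     # stand-in for the message queue
    processed_ids = set()        # idempotency guard against duplicate deliveries
    lock = threading.Lock()

    def worker(name: str) -> None:
        while True:
            message = commands.get()
            if message is None:                  # poison pill: stop this consumer
                break
            with lock:
                duplicate = message["id"] in processed_ids
                processed_ids.add(message["id"])
            if not duplicate:
                print(f"{name} processed {message['id']}")
            commands.task_done()

    workers = [threading.Thread(target=worker, args=(f"consumer-{i}",)) for i in range(3)]
    for w in workers:
        w.start()

    for i in range(6):                           # each message goes to only one consumer
        commands.put({"id": f"msg-{i}", "type": "BindPolicy"})
    commands.join()

    for _ in workers:                            # shut the consumers down
        commands.put(None)

With a real queue the behaviour is the same: to scale out you simply run more instances of the consumer, with no code change.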

I have found it quite common that software architects and teams end up using the same messaging tool for everything, without adapting the decision to the scenario. Message brokers like Kafka, AWS Kinesis, AWS SNS and Azure Event Grid should be used to deliver Events with a Publish/Subscribe pattern. In that scenario, the different subscribers for a given Event wait to be notified. You can't scale the processing here, as all the subscribers receive all the messages. These systems should not be used to process Commands but only to order messages or fan out Events. With Kafka, for example, you have to split the ingestion into multiple partitions to spread the messages across multiple consumers. That is an infrastructure overhead that should be avoided.

The following is a view of the different tools that should be used for Commands and Events.

Failure Tolerance

Message queue tools provide a simple and valuable pattern to make the system resilient to failures. Failures can happen all the time for many different reasons. To mention a few: underlying system maintenance, network connectivity issues, wrong settings, wrong data.

The last one in this short list could be a sign that the client is sending messages using the wrong schema. In that case you can check the error, fix the problem in the integration code and reprocess the message, or manually delete it from the DLQ (dead-letter queue) if a fix is not possible.

All the other problems are usually transient, and it is easy to implement a simple function that automatically reprocesses the messages from the DLQ at intervals.

Designing the system with the DLQ pattern in mind is critical to being resilient to failures. This pattern needs to be combined with other concepts like Idempotency and Eventual Consistency. We should keep the reprocessing function as simple as possible.
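As a sketch, the reprocessing function can stay very small. receive_from_dlq and resend_to_queue are hypothetical wrappers around whatever SDK your queue provides, and the reason check is just an example of leaving non-transient failures (like schema errors) for a human to handle:

    def receive_from_dlq(dlq_name: str, max_messages: int) -> list:
        return []                            # stub: read a batch from the dead-letter queue

    def resend_to_queue(queue_name: str, message: dict) -> None:
        print(f"requeued on {queue_name}")   # stub: put the message back on the main queue

    def reprocess_dead_letters(dlq_name: str, target_queue: str) -> None:
        for message in receive_from_dlq(dlq_name, max_messages=50):
            if message.get("deadLetterReason") == "SchemaValidationFailed":
                continue                     # wrong data: needs a fix, not a retry
            resend_to_queue(target_queue, message)   # transient failure: try again

    # Run it at intervals, e.g. from a timer-triggered serverless function:
    # reprocess_dead_letters("policies-commands-dlq", "policies-commands")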

CQRS and how to handle the reads synchronously

CQRS is a pattern introduced by Greg Young quite a while ago, used to separate the write model, which contains the results of our processing, from the read models used for querying and reporting.

When we apply the proposed architecture with an asynchronous API, one of the first objections we will receive is… our clients need to read the result of the operation immediately.

That is the wrong angle from which to look at the requirement, and it has led a lot of developers down the wrong path.

Let's try to visualise the flow of our message processing. First the client sends the Command to our API. Our API checks that the client has permissions, validates the payload, shuffles the message down the queue and returns a 202 Accepted ack.

On the other side of the queue, a serverless function (or whatever is hosting your processing logic) is triggered by the message and processes it. Once done, it raises a Domain Event with the result of the processing.

This whole flow can take a short time or a long time depending on the operations, but that has nothing to do with doing it asynchronously: the time it takes is the same as it would be with a synchronous call. Let's say that most of the time it takes a few seconds or less.

When the Domain Event is published, we can deliver it to any interested subscriber, such as a little program that is waiting for events to keep the read model up to date in near real time. There we are, with an event-driven architecture and a separation between write and read models. The client is able to retrieve the result of the operation in near real time by polling a GET endpoint with the ID of the operation. If the client is a UI, we can display a spinner while we poll, waiting for the result in the same way we do with synchronous APIs.
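A minimal sketch of that little subscriber and of the GET endpoint the client polls, using an in-memory dictionary as a stand-in for the read-model database; the event and field names are invented for illustration:

    read_model = {}                      # stand-in for the query/read database

    def on_policy_bound(event: dict) -> None:
        # triggered by the broker when the Domain Event is published;
        # keyed by the same id the client received with the 202 response
        read_model[event["operationId"]] = {
            "status": "completed",
            "policyId": event["data"]["policyId"],
        }

    def get_operation(operation_id: str) -> tuple:
        # the GET endpoint the UI polls while it shows the spinner
        result = read_model.get(operation_id)
        if result is None:
            return 404, {"status": "pending or unknown"}
        return 200, result

    # Example flow
    on_policy_bound({"operationId": "6f1c64ab", "data": {"policyId": "POL-123"}})
    print(get_operation("6f1c64ab"))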

The Read-Model plays the role of a local cache

Imagine that the internal system or database is down and the commands are falling into the DLQ or waiting to be processed.

The clients are still able to retrieve data. It is then up to our Ops team to bring the system back up and running and to process the messages in the shortest amount of time possible, so the read model can be refreshed.

Depending on our use case, we can often benefit from keeping track of the past events. With an event-sourced system we can apply the past events and rebuild or reshape the Read Models. That is a modern data-centric solution: being able to provide the current state in any way the requirements need.

An event-sourced system will also solve the problem of long-running processes, or sagas. These are not rare use cases.

Imagine the need to onboard a patient with a series of steps. These steps can take days, and at intervals actions need to be taken, like sending notifications. This whole process can last for weeks. Instead of saving only the current state of the patient enrolment, would it not be better to keep track of all the events?

Another use case is when we need to integrate with several separate systems and keep track of where we are with each of those specific integrations.

Instead of just delivering the Domain Event with a bus, save it in an EventStore in order to keep track of the transitions of each of your entities.

In these cases, Event Sourcing patterns can help you build a time machine that lets you manage long-running processes in a solid and monitored way. It will also let you build and rebuild the Read Models with all the data the business needs to see what is going on, on reporting dashboards in near real time.
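Here is a minimal event-sourcing sketch built around the patient enrolment example, with an in-memory store standing in for a real event store database: every Domain Event is appended to the stream of its entity, and the current state (or any Read Model) is rebuilt by replaying that stream. The event names are made up for the example:

    from collections import defaultdict

    streams = defaultdict(list)              # one append-only stream per entity

    def append_event(stream_id: str, event: dict) -> None:
        streams[stream_id].append(event)     # append only, never update in place

    def rebuild_enrolment(stream_id: str) -> dict:
        state = {"status": "unknown", "steps": []}
        for event in streams[stream_id]:     # replay the past events
            if event["type"] == "EnrolmentStarted":
                state["status"] = "in progress"
            elif event["type"] == "StepCompleted":
                state["steps"].append(event["step"])
            elif event["type"] == "EnrolmentCompleted":
                state["status"] = "completed"
        return state

    append_event("patient-42", {"type": "EnrolmentStarted"})
    append_event("patient-42", {"type": "StepCompleted", "step": "consent-signed"})
    print(rebuild_enrolment("patient-42"))   # {'status': 'in progress', 'steps': ['consent-signed']}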

There are different databases that can be used to store streams of events. These are usually described as time-series, NoSQL, immutable, append-only databases. My favourite tool for this pattern is Event Store, but there are others in the arena trying to do the same job.
