Dynamic Consistency Boundaries

Kill the Aggregate

In software, Consistency may refer to many things - Code Consistency, Data Consistency, User Interface Consistency, etc. In this article, I am focusing on Data Consistency. Simply put, Data Consistency ensures that data remains accurate and reliable across different parts of a system. More formally, Data Consistency is one of the ACID database transaction properties, defined as:

Consistency (or correctness) ensures that a transaction can only bring the database from one consistent state to another, preserving database invariants. 

The invariants may be enforced in various ways depending on the database system. For example, Relational Databases offer many mechanisms for enforcing consistency: constraints (primary keys, foreign keys, uniqueness, etc.), referential integrity, the ACID transaction guarantees (Atomicity, Consistency, Isolation, Durability), and many more.

The common consensus (so far) for achieving consistency in Event Stores is that event streams define the atom of consistency. Consistency across event streams is not possible. Inside an event stream, which is append-only, consistency is ensured by preventing multiple events from being appended at the same position (index).

Event streams are commonly associated with a concept borrowed from DDD (Domain-Driven Design) - the Aggregate:

An aggregate is a cluster of associated objects that we treat as a unit for the purpose of data changes.

Dynamic Consistency Boundaries (DCB) redefine the granularity level of consistency for Event Stores, moving from event streams (aggregates) to individual events. You could still say that DCB deals with dynamically defined event streams. The original application of DCB was for Event Stores; however, it might be broadened to any messaging system with an append-only nature. 

Messaging

Broadly defined, messaging is two or more participants exchanging information via various means. For us, the interesting type of messaging is a (semi)durable append-only log with pub/sub characteristics. In other words, multiple producers write messages to the log, and self-paced consumers read from it.

Each message in the log has its position (a.k.a. index), uniquely identifying it. This property helps consumers with deduplication. The consumer takes a message with its index from the log, processes it, and remembers the index. The consumer can be restarted at any time and, upon restarting, resumes reading the log from the remembered index.
Also, if a message gets re-delivered to the consumer for various reasons (usually associated with the fallacies of Distributed Systems), the consumer can easily deduce whether it has already processed it: if the index of the received message is less than or equal to the last remembered index, the message has already been processed.
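
A minimal sketch of this index-based deduplication (all names here are illustrative, not taken from any specific library):

import java.util.List;

interface Log {
    // Returns messages with an index greater than the given one, in order.
    List<Message> readAfter(long index);
}

record Message(long index, String payload) { }

class Consumer {
    private long lastProcessedIndex = -1; // persisted durably in a real system

    void poll(Log log) {
        for (Message message : log.readAfter(lastProcessedIndex)) {
            if (message.index() <= lastProcessedIndex) {
                continue; // re-delivery: already processed, safe to skip
            }
            process(message);
            lastProcessedIndex = message.index(); // remember progress
        }
    }

    private void process(Message message) {
        System.out.println("processing " + message.payload());
    }
}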

Message Streams

A large message log may be divided into multiple message streams for various reasons, such as domain semantics, security (to prevent mixing messages from different domains in a single stream), scalability, etc.

Usually, a message stream is backed by a physical structure, such as an array of files storing messages. Consistency across physical streams is not guaranteed - the client cannot write to two message streams atomically (what if writing to one stream fails? what if a rollback fails? etc.). The consistency guarantee of this type of messaging system ensures that a new message can be appended to the stream only if its index follows sequentially after the last one - without overwriting or gaps.
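
A sketch of that guarantee in code (an assumed API, for illustration only): the append succeeds only when the caller's expected index matches the current end of the stream.

import java.util.ArrayList;
import java.util.List;

class MessageStream {
    private final List<String> messages = new ArrayList<>();

    // expectedIndex is the index the new message should receive.
    synchronized void append(String message, long expectedIndex) {
        if (expectedIndex != messages.size()) {
            // Someone else appended in the meantime - no gaps, no overwrites.
            throw new IllegalStateException(
                    "conflict: expected " + expectedIndex + " but stream is at " + messages.size());
        }
        messages.add(message);
    }
}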

Once we decide which (types of) messages belong to which stream, changing this ownership comes at a price. While splitting one stream into multiple smaller ones is not a complex operation (the order of messages from the previous stream is preserved in the resulting streams), the merging operation is not so trivial. When merging multiple streams, we must decide on the order of messages. We could use the messages' timestamps to determine the order; however, in distributed messaging systems this is not feasible (we cannot rely on the clocks of multiple servers being aligned). Usually, manual intervention in cooperation with the business is required. This makes message streams somewhat rigid when it comes to refactoring them.

Event Sourcing

Event sourcing, often misunderstood, is "just" a way of persistence. Instead of persisting the current state of our system, we persist a series of ordered events. When we need to figure out the system's current state in order to make a decision, we "replay" the historical events. Although this sounds trivial, employing it correctly requires quite a mind shift.
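
As a minimal sketch (using the course domain that appears later in this article, with assumed event shapes), rehydration is just a left fold over the history:

import java.util.List;

sealed interface CourseEvent permits CourseCreated, CourseCapacityChanged { }
record CourseCreated(String courseId, int capacity) implements CourseEvent { }
record CourseCapacityChanged(String courseId, int newCapacity) implements CourseEvent { }

record CourseState(int capacity) {
    static CourseState initial() {
        return new CourseState(0);
    }

    CourseState apply(CourseEvent event) {
        return switch (event) {
            case CourseCreated created -> new CourseState(created.capacity());
            case CourseCapacityChanged changed -> new CourseState(changed.newCapacity());
        };
    }
}

class Rehydration {
    static CourseState replay(List<CourseEvent> history) {
        CourseState state = CourseState.initial();
        for (CourseEvent event : history) {
            state = state.apply(event); // each event moves the state forward
        }
        return state;
    }
}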

Event Stores are the most suitable database technology for durably storing events. Essentially, an Event Store is a Messaging System with an append-only log and pub/sub (a.k.a. append/read) characteristics, but with a twist. The twist is that it stores messages (events, in this case) forever. Everything we said about Messaging Systems and Message Streams applies here as well.

Some upfront design process is required before we start implementing an event-sourced system. This process can be Event Modeling, Event Storming, or anything else found useful for identifying which events are essential for the system. Usually, as one of the steps in the design process, we determine consistency boundaries. We typically try to fit events into aggregates as our consistency boundary. Translating this to Event Store terms means mapping aggregates to event streams in a one-to-one relationship.
But wait. We said that streams are not convenient when changing the ownership of events. This obstacle implies that once we establish consistency boundaries (aggregates), modifying them becomes difficult after the system is deployed to production.
Issues don't stop there. Sometimes, there are business rules that cannot fit inside a single stream. To maintain consistency across streams, we must then rely on techniques other than aggregates - techniques that provide eventual rather than immediate consistency (process managers are one example).
Also, specific use cases require that a single event belong to multiple streams. An event may affect multiple decisions spread across numerous streams (aggregates). By definition of an event stream, this is impossible - a single event belongs to one stream only!

Does this mean streams are a poor choice for an Event Store? Not necessarily. The issue lies in the granularity of consistency, which isn't well-adjusted. Mapping aggregates to streams lacks flexibility. But what if we rethink the concept of a stream? Rather than aligning with an aggregate, a stream is better suited to a broader scope. Revisiting Eric Evans' book on DDD (Domain-Driven Design: Tackling Complexity in the Heart of Software), the Bounded Context seems like a more fitting analogy. Within a Bounded Context, we store events related to a specific sub-domain. The challenge then becomes maintaining consistency boundaries within the Bounded Context. This is where DCB comes into play, adjusting the granularity level.

Dynamic Consistency Boundaries

DCB provides a way of guaranteeing consistency during an append to the Event Store. The Event Store client reads (only) the events necessary to rehydrate the state and make a decision. As a result of the read, the client gets the events plus a consistency marker. This consistency marker is used to maintain consistency in the Event Store. When the client decides to append new events, it forms a transaction consisting of the events to be appended, the criteria used to filter events during the read, and the consistency marker. The Event Store uses the criteria to check whether any matching events have been appended in the meantime (after the read and before the append). If none have, the transaction is accepted; otherwise, it is rejected. In relational databases, this mechanism is known as optimistic locking.

Although simple, the DCB mechanism might be too much to grasp without an example. Let's build a simple in-memory Event Store that supports DCB and use a simple domain to exemplify it.

Before diving into the Event Store details, let's get familiar with the terminology.

Event Store terminology
global sequence - Each event in the Event Store is associated with a global sequence number, which determines its position in the globally ordered Event Store log. The global sequence of the first event is 0.

head - The global sequence of the next event to be appended to the Event Store.

tag - Specifies the event in more detail. It is just a key-value pair, e.g. tag{key="email", value="student@university.com"}. The Event Store must store tags together with events and provide a search based on them. Usually, an Event Store indexes events based on tags for faster retrieval.

criterion - A building block of the criteria. It is composed of tags, joined with an AND operator - for an event to meet the criterion, all its tags must match the criterion's tags.

criteria - Filters events from the Event Store. It is composed of criterions, joined with an OR operator - for an event to meet the criteria, at least one criterion must be satisfied.
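
To make these definitions concrete, here is a minimal sketch of tags, criterions, and criteria in code (the exact shapes are an assumption; the article only relies on criteria matching an event's tags):

import java.util.List;
import java.util.Set;

// A key-value pair attached to an event, e.g. tag("courseId", "abc").
public record Tag(String key, String value) { }

// AND semantics: an event meets the criterion if it carries all of its tags.
public record Criterion(Set<Tag> tags) {

    public boolean matches(Set<Tag> eventTags) {
        return eventTags.containsAll(tags);
    }
}

// OR semantics: an event meets the criteria if at least one criterion matches.
public record Criteria(List<Criterion> criterions) {

    public boolean matches(Set<Tag> eventTags) {
        return criterions.stream().anyMatch(criterion -> criterion.matches(eventTags));
    }
}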

The Event Store

In our simplistic version of the Event Store API, we support only two operations:

  1. read - reads a finite stream of events from the Event Store based on provided criteria.
  2. append - appends events at the end of the Event Store log. It accepts the consistency condition as the parameter used to check the consistency of this append.
public interface EventStore {

    // Reads a finite stream of events matching the given criteria.
    MarkedEvents read(Criteria criteria);

    // Appends events, guarded by the given consistency condition.
    void append(List<Event> events, ConsistencyCondition condition);
}

public record MarkedEvents(long consistencyMarker, Stream<Event> events) { }
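
The Event type itself isn't shown in this article; a minimal shape consistent with the rest of the code (an assumption for illustration) could be:

import java.util.Set;

// The domain payload plus the tags the Event Store stores and filters on.
public record Event(Object payload, Set<Tag> tags) { }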

Since our implementation is entirely in memory, we will use a SortedMap to store our events. The key of this map is the global sequence of the event, and the value is the event with its corresponding tags. We want to support concurrent access to the Event Store; hence, we use the ConcurrentSkipListMap implementation. However, dealing with concurrency in full would unnecessarily pollute the code, so I'll skip it and show only the essence.

SortedMap<Long, Event> events = new ConcurrentSkipListMap<>();

The read operation provides MarkedEvents - all events matching the given criteria, marked with the Event Store's consistency marker. Here, the Event Store's head is used as the consistency marker. Later, while appending events, the consistency marker lets us skip the events the client has already used to make its decision. This little trick narrows the search space for finding conflicts.

public MarkedEvents read(Criteria criteria) {
    var consistencyMarker = head();
    var sourced = events.values()
                        .stream()
                        .filter(event -> criteria.matches(event.tags()));
    return new MarkedEvents(consistencyMarker, sourced);
}
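
The head() helper isn't spelled out here; given that the first event has global sequence 0 and head is the sequence of the next event to be appended, a minimal implementation could be:

// The global sequence of the next event to be appended.
private long head() {
    return events.isEmpty() ? 0 : events.lastKey() + 1;
}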

During the append, each event is described in more detail with tags that associate it with specific concepts from the Domain. append accepts a transaction (a list of events) to be appended and a ConsistencyCondition denoting the consistency requirements for the append. The ConsistencyCondition is composed of a consistency marker and criteria.

The consistency marker tells the Event Store to search for events that match the given criteria after its position. If no events match the criteria after the consistency marker, the consistency condition is fulfilled, and the transaction is accepted; otherwise, it's not.

public void append(List<Event> events, ConsistencyCondition condition) {
    if (!validate(condition)) {
        throw new InvalidConsistencyConditionException();
    }

    events.forEach(e -> this.events.put(head(), e));
}

private boolean validate(ConsistencyCondition condition) {
    // Look only at events appended at or after the consistency marker;
    // the transaction is valid if none of them match the criteria.
    return events.tailMap(condition.consistencyMarker())
                 .values()
                 .stream()
                 .noneMatch(e -> condition.matches(e.tags()));
}
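
The article describes the ConsistencyCondition as a consistency marker plus criteria; a minimal record matching that description (an assumption, as its exact shape isn't shown) could be:

import java.util.Set;

public record ConsistencyCondition(long consistencyMarker, Criteria criteria) {

    // Delegates matching to the criteria, as used by validate above.
    boolean matches(Set<Tag> eventTags) {
        return criteria.matches(eventTags);
    }
}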

The Client

The architecture of the client application is beyond the scope of this article. Therefore, we will introduce only minimal abstractions to illustrate how DCB functions in practice. Our client application consists of multiple request handlers, each responsible for constructing criteria based on the incoming request. These handlers derive their state from a sequence of events retrieved from the Event Store according to the defined criteria and then process the request. Upon handling a request, the handler generates a list of events to be appended to the Event Store. The interface defining a request handler is shown below.

public interface RequestHandler<R, S> { // R = request type, S = state type

    // The criteria used to source this handler's state for the given request.
    Criteria criteria(R request);

    S initialState();

    S source(Object event, S state);

    // Returns the new events to be appended to the Event Store.
    List<Object> handle(R request, S state);
}

We derive the handler's state from the events fetched from the Event Store. Once the state is entirely sourced, it is safe to pass the request to the handler. The handler will return a list of events to be appended to the Event Store. The dispatcher creates a consistency condition using the consistency marker obtained from the Event Store and the same criteria used to read the events. The dispatcher appends the events to the Event Store with this consistency condition. Meanwhile, new events that match our criteria may have been appended to the Event Store. If this happens, our transaction will be rejected; otherwise, it will succeed.
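
The dispatcher itself is not shown in this article; below is a minimal sketch of the loop it performs (the Dispatcher class, its dispatch method, and the toTaggedEvents helper are illustrative assumptions, not part of the article's API):

import java.util.List;
import java.util.Set;

public class Dispatcher {

    private final EventStore eventStore;

    public Dispatcher(EventStore eventStore) {
        this.eventStore = eventStore;
    }

    public <R, S> void dispatch(R request, RequestHandler<R, S> handler) {
        // Read only the events the handler declared interest in.
        var criteria = handler.criteria(request);
        var marked = eventStore.read(criteria);

        // Source the handler's state from the matching events.
        S state = marked.events()
                        .reduce(handler.initialState(),
                                (s, event) -> handler.source(event.payload(), s),
                                (left, right) -> right);

        // Append the new events, guarded by the same criteria and the
        // consistency marker obtained during the read.
        List<Object> newEvents = handler.handle(request, state);
        var condition = new ConsistencyCondition(marked.consistencyMarker(), criteria);
        eventStore.append(toTaggedEvents(newEvents), condition);
    }

    // How domain events get their tags is application-specific;
    // this placeholder wraps them with no tags at all.
    private List<Event> toTaggedEvents(List<Object> payloads) {
        return payloads.stream()
                       .map(payload -> new Event(payload, Set.of()))
                       .toList();
    }
}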

How to use criteria for filtering?

Using an example is the best way to understand how criteria-based querying works. In this case, we have a Student that can subscribe to a Course. There are events for student and course creation, course renaming, course capacity changes, and a student subscribing to a course. The event stream is depicted in the image below.

All these events are tagged with specific Domain Identifiers. Certain events, like a student subscribing to a course, are tagged with two tags - the student and the course - because the event belongs to both Domain concepts.

To handle the request to subscribe the student to the course, we need to source our model from the events we are interested in. Those events tell us whether the course capacity has changed, who has subscribed to the course, and whether the student has subscribed to any courses. Let's see how to form the criteria for this use case:

anyOf(allOf(tag("eventType", "CourseCapacityChanged"),
            tag("courseId", "abc")),
      allOf(tag("eventType", "StudentSubscribedToCourse"),
            tag("courseId", "abc")),
      allOf(tag("eventType", "StudentSubscribedToCourse"),
            tag("studentId", "xyz")))

Another interesting scenario would be to check whether a specific student has subscribed to a specific course:

anyOf(allOf(tag("eventType", "StudentSubscribedToCourse"),
            tag("studentId", "xyz"),
            tag("courseId", "abc")))

Conclusion

DCB introduces a different view on consistency in event-sourced systems. Further, DCB can be applied to any Messaging System with an append-only log and a pub/sub nature. Let's sum up how DCB changes the current state of event-sourced systems.

  • Reduces the number of events needed to rehydrate the system's current state by finely filtering events based on the criteria.
  • Removes the need for auxiliary techniques (such as process managers) when a business rule doesn't fit a single event stream. Now, we can dynamically define the consistency boundary by pulling exactly the events we need to make the decision.
  • Reduces the append contention. Finely defined criteria for request handlers have less chance of conflicting since they source only the necessary events.
  • If applied without caution, the consistency boundary of particular request handlers might be too large, causing frequent conflicts. This concern is not new to event-sourced systems; it also exists with aggregates. However, with DCB, refactoring consistency boundaries is much more flexible since our event stream stays intact.
  • Removes the burden of getting the initial design exactly right, since refactoring the consistency boundaries is not as tricky as it is with aggregates.

All code samples are backed by the GitHub repository. Note that the code presented in this article is simplified to show the essence of DCB. Also, the code in the repository embraces CQRS and deals with Command Handlers rather than Request Handlers. Introducing CQRS concepts here is out of scope and doesn't add to the understanding of DCB.
