Event Sourcing: Invariants spanning multiple aggregates
This post should mostly stand on its own, but if you find yourself getting lost, you can refer to the post I wrote two weeks ago for some background: Event Sourcing in Elixir.
Introduction
When you are designing your aggregate boundaries in an event sourced system, you are usually thinking about how large or small you want to make your aggregates.
What is an aggregate? In simple terms, it’s a single stack of events, usually referred to by a universal unique identifier (UUID).
Let’s think for a moment about a system in which there is only a single aggregate. In this system, all state is represented by or computed from a single stack of events. Data consistency is never a concern, since all invariants are by definition encapsulated in this one aggregate. A drawback is that all write operations to the system will have to be done consecutively. So one trade-off between large and small aggregates (putting aside the usual concerns that larger aggregates will be more complex) is between having invariants encapsulated in a single aggregate, and enabling concurrent writes.
For this reason, we usually search for ways to break up our system into smaller aggregates.
This begs the question, however, what if you want to have smaller aggregates, but that results in an invariant that will be split across two aggregates?
Example: A Model Bank
Now is a good time to introduce a concrete example, so let’s do that.
Let’s say we want to model a bank. This simplified model bank will allow the following functionality: you can create a bank account, and transfer money from one account to another.
There are a number of invariants that we want to enforce in our model bank.
- You cannot withdraw more money from an account than there is money present in the account.
- In a money transfer, an equal amount of money must be withdrawn as deposited.
There are two approaches available to us in this scenario:
- All bank accounts are grouped in one aggregate.
- Each bank account has its own aggregate.
Approach #1 has the benefit that both invariants can be encapsulated in a single aggregate, but all transactions within the system will have to be done consecutively (which will severely limit the eventual throughput of the system).
Approach #2 solves the problem of concurrent updates, but introduces the problem that invariant #2 can not be encapsulated in a single aggregate.
Let’s assume for the purposes of this blog post that approach #1 is out of the question because it would limit the throughput of the system too much.
Enforcing an invariant across aggregate boundaries
The challenge will be to enforce invariant #2 across aggregate boundaries. The most obvious solution is to simply use a transaction of sorts to ensure that both halves of the transaction either complete or fail.
defmodule BankAccountCommandHandler do
alias Model.BankAccount
def execute(command = %BankAccount.TransferAmount{}) do
reference = UUID.uuid4()
Aggregate.Repository.with_aggregate(BankAccount, command.to_account, fn(to_state) ->
:ok = Aggregate.Repository.with_aggregate(BankAccount, command.from_account, fn(from_state) ->
from_state |> BankAccount.withdraw(reference, command.amount)
end)
to_state |> BankAccount.deposit(reference, command.amount)
end)
end
end
Let’s break this apart line by line. First, we create a unique reference ID for the transaction. The function given to Aggregate.Repository.with_aggregate/3
will be run by the process representing command.to_account
. From that process, we will again call Aggregate.Repository.with_aggregate/3
to do the actual work of the transaction. The process for command.to_account
will wait for the process representing command.from_account
to finish before it continues, guaranteeing that we only deposit the amount if it has already been withdrawn. There is no invariant being checked in BankAccount.deposit/3
so we can safely complete the transaction this way.
Is it safe? Imagine someone pulls the plug on your server right before you deposit the amount to complete the transaction. You should be able to cross-reference every deposit with every withdrawal (with reference_id
) to find incomplete transactions and reverse them. It’s not perfect, but it will work.
But there is a drawback to this approach. What if two people try to transfer money at exactly the same time? Let’s imagine one transaction from A to B, and another from B to A. Let’s name our two requests Y and Z. Request Y is going to start by locking process A. Request Z is going to lock process B. Then when request Y tries to lock process B, it has to wait, up to 5s (or whatever timeout you set), because process B is already locked by request Z. It’s a deadlock, and one or both of our transfers will fail!
To illustrate this, I ran 50 transactions, 4 at a time, on 2 bank accounts, and checked the results. It was pretty ugly; of the 50 requested transfers, only 18 of them succeeded. The system was also only able to achieve a throughput of 1.24 requests per second. Increasing the concurrency only results in a higher percentage of requests failing.
A Better Solution
Instead of trying to make the transfer atomic, we can attempt to coordinate the transfer between the two aggregates.
defmodule BankAccountCommandHandler do
alias Model.BankAccount
def execute(command = %BankAccount.TransferAmount{}) do
reference = UUID.uuid4()
:ok = Aggregate.Repository.with_aggregate(BankAccount, command.from_account, fn(state) ->
state |> BankAccount.reserve(reference, command.amount)
end)
:ok = Aggregate.Repository.with_aggregate(BankAccount, command.to_account, fn(state) ->
state |> BankAccount.deposit(reference, command.amount)
end)
:ok = Aggregate.Repository.with_aggregate(BankAccount, command.from_account, fn(state) ->
state
|> BankAccount.cancel_reservation(reference)
|> BankAccount.withdraw(reference, command.amount)
end)
end
end
Using this approach, we first reserve the amount in the account from which we are withdrawing. Then we deposit it in the second bank account. Finally, we cancel the reservation and withdraw the amount for real.
There are more events in this version, but the benefit is that only one aggregate is ever locked at a time, meaning we can avoid deadlocks entirely.
Is it safe? At any point in this process, if someone pulls the plug on our server, we will be able to examine the situation and determine whether a transfer has been completed. Better yet, we don’t have to examine all past transfers, only the “reservations” from each account.
Once again I ran 50 transactions, 4 at a time, on 2 bank accounts. The results: all 50 transactions succeeded, with a throughput of 48 requests per second.
This solution also has the benefit that it’s easier to read and understand, despite the extra events.
I was wondering what would happen if I cranked up the number of concurrent requests. So I ran it again with 800 transfers, 100 at a time. Results: all 800 transactions succeeded, throughput of 90 requests per second.
Final Thoughts
It should be pretty obvious that the second solution can deal with simultaneous requests in a much better way. Whether this is a useful pattern to apply in your own code will depend on whether you expect to see simultaneous requests in your application. I can tell you that this is not a problem for Jortt, where usually only one person per organization is doing bookkeeping at a time.
On the one hand, it’s never bad to be prepared for the worst, but on the other hand, if the naive approach works 99.999% of the time, maybe it’s good enough?
But it’s always good to know that there are clean solutions that work under high load for enforcing invariants across aggregate root boundaries.