In this post we will explore how we can use some of Elixir’s more interesting features to implement event sourcing. OTP is the star of this show, enabling us to do some very nice tricks that would be practically impossible with Ruby. OTP stands for Open Telecom Platform, and is Erlang’s answer to the problem of concurrency. OTP relies on immutability of data to provide some very nice abstractions that make working with it very easy.

I will assume that the reader is familiar with OTP, Elixir syntax, and Event Sourcing. If not, I would recommend at least skimming the following links:

Elixir Getting Started - Pattern Matching

What is OTP?

Elixir Getting Started - Agent

Greg Young - CQRS and Event Sourcing - Code on the Beach 2014

General approach

If you are building an event-sourced system in Ruby, the usual way you would process a command is like this:

  1. read all events from an aggregate into memory (possibly use of snapshotting will limit this)
  2. build up internal aggregate state
  3. evaluate write request based on internal state (apply invariants)
  4. write new events to disk
  5. process events (update the view model)

With OTP, we can reduce steps 1 and 2 (which are by far the most expensive parts of the process for large aggregates), reducing the entire operation from O(N) to O(1) for an aggregate that has already been loaded into memory.

This approach also eliminates the “optimistic locking” that you usually see in the DB layer in event sourced solutions. With optimistic locking you generally accept that some requests will fail if they are in a race against one other. With this approach, they will all succeed (unless there is application logic that prevents that).

This is possible if we build our system with the following guidelines:

  • aggregate state is encapsulated in a process, one process per aggregate
  • all computation related to a particular aggregate occurs inside that aggregate’s process
  • aggregate repository maintains a cache of aggregate processes and controls access to them

Event Store

Let’s dive right into the code.

I’ve chosen Redis to back my event store for demonstration purposes but it should be trivial to write an event store using any other storage system. An event store is on its most basic level just a key-value store, with every key pointing to a stack of events.

The EventStore stores events in the event store, and notifies listeners via EventHandler (this will come back later). It has only two functions. commit will add events to a particular aggregate, identified by its uuid. load simply loads events from a particular aggregate.

#lib/event_store.ex
defmodule EventStore do
  @event_store RedisEventStore

  def commit(uuid, events) when is_list(events) do
    @event_store.commit(uuid, (events |> Enum.map(&EventSerializer.serialize/1)))

    notify(uuid, events)
  end
  def commit(uuid, event), do: commit(uuid, [event])

  def load(uuid), do: @event_store.load(uuid) |> Enum.map(&EventSerializer.deserialize/1)

  defp notify(_uuid, []), do: :ok
  defp notify(uuid, [event | events]) do
    EventHandler.handle({uuid, event})
    notify(uuid, events)
  end
end

We also need a serializer to convert our events from structs to strings and back again.

#lib/event_store.ex
defmodule EventSerializer do
  def serialize(event) do
    event |> Map.from_struct |> Map.merge(%{__struct__: to_string(event.__struct__)}) |> Poison.encode!
  end

  def deserialize(serialized_event) do
    serialized_event |> Poison.decode!(keys: :atoms) |> decode()
  end

  def decode(event_map) do
    new_event = event_map |> Enum.reduce(%{}, fn ({key, val}, acc) -> Map.put(acc, key, val) end)
    [Map.fetch!(new_event, :__struct__)] |> Module.concat() |> struct(new_event)
  end
end

Finally, the bit of code that writes and reads from Redis. If you want to implement an event store using another back-end, this is the code you’ll have to adapt.

#lib/event_store.ex
defmodule RedisEventStore do
  def commit(_uuid, []), do: nil
  def commit(uuid, [event|events]) do
    {:ok, _} = Redix.command(:redix, ["RPUSH", uuid, event])
    commit(uuid, events)
  end

  def load(uuid) do
    {:ok, result} = Redix.command(:redix, ~w(LRANGE #{uuid} 0 -1))
    result
  end
end

Note that we are not making use of OTP here. As you will see in a moment, we will isolate each aggregate into its own process, which will provide us with the consistency guarantee that we need.

We have made use of the pattern matching that Elixir affords us to deal with lists of events, and check responses of function calls.

Event Handlers

The event handler is unremarkable. It has only one function, handle, which dispatches an event to multiple handlers. Again we are making use of pattern matching to help us process the list of handlers.

# lib/event_handler.ex
defmodule EventHandler do
  @handlers [CounterEventHandler]

  def handle(_, []), do: :ok
  def handle({uuid, event}, [handler | handlers] \\ @handlers) do
    :ok = handler.handle(event)
    handle({uuid, event}, handlers)
  end
end

The event handler for a simple counter might look like this:

# lib/counter.ex
defmodule CounterEventHandler do
  alias Model.Counter

  def handle(%Counter.Created{aggregate_id: aggregate_id}) do
    %Eslixir.Counter{aggregate_id: aggregate_id, count: 0}
    |> Eslixir.Repo.insert

    :ok
  end

  def handle(%Counter.Incremented{aggregate_id: aggregate_id, count: count}) do
    import Ecto.Query, only: [from: 2, update: 2, where: 2]

    from(c in Eslixir.Counter, where: c.aggregate_id == ^aggregate_id)
    |> Eslixir.Repo.update_all(set: [count: count])

    :ok
  end

  def handle(_), do: :ok
end

Note the last function definition: def handle(_), do: :ok. This is necessary to prevent Elixir from complaining that a particular event handler doesn’t have a function definition for a particular event. So we simply return :ok for all events not otherwise handled by a particular event handler.

Aggregate

The aggregate is the first part of our system that will be built using OTP. It’s important for our strategy to ensure that there is only ever one process running per aggregate. In order to do this, we are making use of Registry, which ensures that there is only one process alive per model / aggregate_id combination. This eliminates race conditions and guarantees data consistency.

#lib/aggregate_root.ex
defmodule Aggregate do
  use GenServer

  def with_aggregate(model, aggregate_id, function) do
    :ok = start_link(model, aggregate_id)
    via_tuple(model, aggregate_id) |> GenServer.call(function)
  end

  def start_link(model, aggregate_id) do
    name = via_tuple(model, aggregate_id)

    case GenServer.start_link(__MODULE__, {model, aggregate_id}, name: name) do
      {:ok, _} ->
        GenServer.cast(name, :finish_init)
        :ok
      {:error, {:already_started, _}} ->
        :ok
    end
  end

  defp via_tuple(model, aggregate_id) do
    {:via, Registry, {:aggregate_repository, {model, aggregate_id}}}
  end

  # callbacks
  #
  def init({model, aggregate_id}) do
    {:ok, {model, %{aggregate_id: aggregate_id}}}
  end

  def handle_cast(:finish_init, {model, state}) do
    events = if state.aggregate_id, do: EventStore.load(state.aggregate_id), else: []

    {:noreply, {model, state_from_events(model, events) || nil}}
  end

  def handle_call(function, _from, {model, state}) do
    case function.({state, []}) do
      {_, {:error, reason}} ->
        {:reply, {:error, reason}, {model, state}}
      {new_state, new_events} ->
        EventStore.commit(new_state.aggregate_id, Enum.reverse(new_events))
        {:reply, :ok, {model, new_state}}
    end
  end

  defp state_from_events(model, events) do
    events
    |> Enum.reduce(nil, fn(event, state) -> model.next_state(event, state) end)
  end
end

The aggregate instance is also a GenServer, capturing the state of the aggregate, and ensuring data consistency by ensuring that all writes for a particular aggregate get executed in a single process. This eliminates race conditions present in typical CRUD applications (which may have multiple “read -> compute new state -> write” processes computing at once).

In start_link, we send an asynchronous message to the aggregate to tell it to finish its own initialization. The aggregate must read its events from the event store and build up its state before it is ready to do any computation. start_link is always run in the Aggregate.Repository process, but we want EventStore.load to execute in the Aggregate.Instance process. This will reduce the amount of computation done in Aggregate.Repository, improving its throughput. Aggregate.Repository is a bottleneck in the system, since every write request will be using it.

Because we are writing this in a functional language, appending to lists is an expensive O(N) operation, while prepending to lists is O(1). This is due to immutability and the structure of linked lists. So while it may seem logical to append new events, we will actually prepend them and then reverse the order later with Enum.reverse.

If the model returns {:error, reason}, then the state reverts to the old state (or more accurately is simply not updated), and nothing is written to the event store. Only when the result of function is a valid new state, does the state get updated and the new events get written to the event store.

In order to have as many aggregates loaded in memory as possible it makes sense to limit the amount of state that will be tracked to only that which is needed to perform business logic. Data that is not used in invariants should not be tracked in the State struct of an aggregate.

The function Aggregate.with_aggregate/3 is worth mentioning. It takes a model, aggregate id, and function as arguments. It calls GenServer.call with the via tuple (which routes the call through Registry) and function as arguments. This sends the function to the aggregate process, which then executes it. This approach eliminates race conditions and guarantees that our data will always be consistent.

Of course, if you run this in a production environment, sooner or later you are going to run out of memory as your system loads more and more aggregates into memory. Setting an upper limit on the number of processes would be a good idea. There are many strategies possible when deciding which processes to kill when you reach the upper limit, but that’s far outside of the scope of this post.

Aggregate Model

#lib/counter.ex
defmodule Model.Counter do
  defmodule State, do: defstruct [:aggregate_id, :count]

  defmodule Create, do: defstruct [:aggregate_id]
  defmodule Increment, do: defstruct [:aggregate_id]

  defmodule Created, do: defstruct [:aggregate_id]
  defmodule Incremented, do: defstruct [:aggregate_id, :count]

  def create({state, new_events}, aggregate_id) do
    if(state) do
      {state, {:error, "Already created"}}
    else
      {state, new_events} |> apply_event(%Created{aggregate_id: aggregate_id})
    end
  end

  def increment({state, new_events}) do
    {state, new_events} |> apply_event(%Incremented{aggregate_id: state.aggregate_id, count: state.count + 1})
  end

  def next_state(%Created{aggregate_id: aggregate_id}, _), do: %State{aggregate_id: aggregate_id, count: 0}
  def next_state(%Incremented{aggregate_id: aggregate_id, count: count}, state), do: %State{state | count: count}

  defp apply_event({state, {:error, reason}}, _), do: {state, {:error, reason}}
  defp apply_event({state, new_events}, event) do
    { next_state(event, state), [event | new_events] }
  end
end

This model is defining a simple counter. Every model has a State struct, that’s what’s going to be held in memory in the Aggregate.Instance process. There are also some commands defined here (Create, Increment) and some events (Created, Incremented).

Model.Counter.create/2 applies Created, but only if the counter has not already been created.

Model.Counter.increment/2 applied Incremented, and increases the count by 1.

Model.Counter.next_state is used to translate events to state. This is used both when the aggregate is being loaded from the event store, and when a new event is applied to the aggregate. This way the state is always kept up to date.

Model.Counter.apply_event/3 simply coordinates setting the new state and prepending the new event.

Command Service

The command service will handle dispatching a command to the right command handler.

#lib/command_service.ex
defprotocol CommandService do
  def execute(command)
end

CommandService is defined using a protocol (since we only want a command to be processed once). Each command will have its own implementation of the CommandService protocol.

#lib/counter.ex
defimpl CommandService, for: Model.Counter.Create do
  alias Model.Counter
  def execute(command) do
    Aggregate.with_aggregate(Counter, command.aggregate_id, fn(state) ->
      state |> Counter.create(command.aggregate_id)
    end)
  end
end

defimpl CommandService, for: Model.Counter.Increment do
  alias Model.Counter
  def execute(command) do
    Aggregate.with_aggregate(Counter, command.aggregate_id, fn(state) ->
      state |> Counter.increment
    end)
  end
end

One execute function can also describe multiple instructions. These are chainable, so the following code is also possible:

def execute(%Counter.Increment2{}) do
  Aggregate.with_aggregate(Counter, aggregate_id, fn(state) ->
    state
    |> Counter.increment
    |> Counter.increment
  end)
end

Conclusion

With Elixir we can easily implement a robust event sourcing system. Thanks to OTP, we can maintain a single, lightweight process per aggregate, that is accessible to the entire Elixir runtime (even transparently across multiple servers). We can avoid the complexity of snapshotting, and increase throughput by requiring fewer database accesses (and in some cases – if an invariant is violated – none), increasing throughput. Perhaps more importantly, because our data is immutable, this can be done without locking, and concurrently across however many processes or servers we would like.

Edited 19 May 2017: Use Registry instead of homemade Aggregate.Repository and implement CommandService as a protocol.