In the previous part we developed a fake event store for our blogging application. This event store just kept all events in-memory, making it unsuitable for production use. But it did allow us to adapt our application to using an event store, and let us work out the details of the event store interface without having to worry about an actual persistence mechanism.

In this part we’ll develop an event store implementation on top of Redis, a key-value store with support for additional data structures (such as lists and hashes), publish/subscribe, and transactions. If you’re more interested in adding new application functionality, you can safely skip this part and go to part 4 – conflict resolution.

Other Parts

Running the application

So we already have an event store specification and a reference implementation. Implementing a Redis-based event store should be pretty straight forward, right? Well, it mostly is, but before we dive into the details let’s take a look at what we’ll be able to do once everything is up and running.

First, make sure you have a working Redis installation (you can use the redis-cli command to connect to Redis and check if everything is working). Redis 2.6 introduced support for server-side Lua scripts which the event store implementation uses automatically if available. Otherwise, the event store falls back to using the WATCH/MULTI/EXEC commands, so Redis 2.4 also works, although with a performance penalty when it comes to committing events.

If you use a Mac OS X with homebrew you can simple run brew install redis to install 2.4.15 or brew install redis --devel to install 2.6.0-RC5.

Now checkout the part-3 branch of the project, adopt conf/application.conf to match your Redis configuration, and start the application using play run or sbt run. Any blog posts you add, edit, or delete will now have the resulting events committed to Redis. If you restart the application the events are replayed on start-up, so you will no longer lose your data.

Start the Scala console (play console or sbt console) and paste the following Scala code (adjust the connection settings as needed).

import events._, eventstore._
val es = redis.RedisEventStore[PostEvent]("blog", "localhost", 6379)
es.publisher.subscribe(StoreRevision.Initial)(println)

This code connects to the event store and subscribes the println method. You’ll see that first all historical commits are printed, and as you use the application you’ll notice that new commits are immediately printed as well. This is one of the major differences with more traditional database backed systems: an event sourced system is open for extension and can communicate to the outside world what is going on right now, while a database based system is usually much more of a black box.

Now start a second instance of the application (play 'run 9001' or sbt 'run 9001'). This second instance also connects to the event store to replay and subscribe to the events being committed. The memory images of both instances stay synchronized as you make changes, while the Scala console keeps printing events as they are committed (make sure you close the event store before exiting the console, otherwise the process stays connected and will fail to terminate). The picture below shows the general setup:

So by using the Redis event store we can run multiple instances of the application, each running on a different server. Each commit from any of the application servers is pushed to all connected servers. This provides fault-tolerance and scalability at the application server level, and also makes it possible to upgrade application servers without affecting users (by doing rolling upgrades).

Event serialization

When data is stored in an external system we need to define some kind of serialization format. There is a wide variety of serialization formats and libraries available. For this example application we’ll use JSON. This may not be the most compact or fastest format, but is easy to use and read, which is great for this example. Play! defines the Format trait for JSON serialization. We just have to provide an implementation of this trait for each data type that we want to serialize.

Although Play! predefines various JSON format implementations for many of the standard Scala classes, it does not include any helpers to make it easy to define formats for our own types. But it is easy to supply our own helpers, which you can find in the JsonMapping object.

The objectFormat methods are used to define the JSON format for case classes such as PostContent:

implicit val PostContentFormat: Format[PostContent] =
  objectFormat("author", "title", "body")(PostContent.apply)(PostContent.unapply)

Here we simply list the JSON field names and the Scala generated apply and unapply methods to construct and deconstruct PostContent instances. Formats for class hierarchies like PostEvent are a bit more difficult to get right, but easy enough to define with the typeChoiceFormat helper:

implicit val PostEventFormat: Format[PostEvent] = typeChoiceFormat(
  "PostAdded" -> objectFormat("postId", "content")(PostAdded.apply)(PostAdded.unapply),
  "PostEdited" -> objectFormat("postId", "content")(PostEdited.apply)(PostEdited.unapply),
  "PostDeleted" -> objectFormat("postId")(PostDeleted.apply)(PostDeleted.unapply))

This may not be quite as convenient as using a reflection based serialization library, but avoids having to deal with various JVM class loader problems and other reflection related troubles.

The formats are defined as implicit values in the companion objects of the classes that we want to serialize so that the Scala compiler can find these formats and automatically pass them as parameters where needed, so that we don’t have to. (Yes, the type “checker” is filling in the blanks for you.)

Writing tests for correct serialization and deserialization is rather tedious, but very important. Fortunately, ScalaCheck makes writing these kind of tests almost trivial:

"Post events" should {
  "convert to and from JSON" in forAll(eventsForMultiplePosts.arbitrary) { events =>
    Json.fromJson[List[PostEvent]](Json.toJson(events)) must_== events
  }

  "parse example Post Added event" in {
    val event = PostAdded(PostId(UUID.fromString("5ab11526-477b-43b9-8fe6-4bb25a3dfcc6")), PostContent(author = "Author", title = "Title", body = "Body"))
    val json = """{"type":"PostAdded","data":{"postId":"5ab11526-477b-43b9-8fe6-4bb25a3dfcc6","content":{"author":"Author","title":"Title","body":"Body"}}}"""

    Json.fromJson[PostEvent](Json.parse(json)) must_== event
  }
}

For the serialization tests I typically test using a few examples, but the main testing happens by defining the property fromJson(toJson(events)) == events. ScalaCheck will randomly generate data to verify that this property holds, so we can be confident serialization works as expected.

The Redis event store implementation

The Redis event store implementation uses the Jedis client library. For reading and committing we use a JedisPool connection pool. To use a connection and return it to the pool we’ve implemented the loan pattern in the withJedis method:

class RedisEventStore[Event]
  (name: String, host: String, port: Int, config: Config)
  (implicit val eventFormat: Format[Event])
extends EventStore[Event] {

  val jedisPool = new JedisPool(config, host, port)

  def withJedis[A](f: Jedis => A): A = {
    val jedis = jedisPool.getResource
    try {
      f(jedis)
    } finally {
      jedisPool.returnResource(jedis: BinaryJedis)
    }
  }

  // [...]
}

Reading events

To store the commits we’ll use two Redis data structures:

  1. A Redis hash that maps commit ids to serialized commits. The commit id is equal to the commits store revision. We use a hash instead of a list as Redis lists are O(n) when accessing an arbitrary element, while hashes provide O(1) random element access.
  2. One Redis list for each event stream. Each element in the list is a commit id.

This means that if we want to read all past commits we simply need to read each hash value starting with commit id 1 up to the commit id equal to the size of the hash. The code for this can be found in the reader implementation of RedisEventStore.scala:

  override object reader extends CommitReader[Event] {
    override def storeRevision = withJedis { jedis =>
      StoreRevision(jedis.hlen(CommitsKey))
  }

  override def readCommits(since: StoreRevision, to: StoreRevision) = {
    val current = storeRevision
    if (since >= current) Stream.empty else {
      val revisionRange = (since.value + 1) to (to.value min current.value)
      doReadCommits(revisionRange.map(_.toString))
    }
  }

The current store revision is equal to the size of the commits hash (HLEN). The commits are all read in order using the doReadCommits method:

  val KeyPrefix = name + ":"
  val CommitsKey: String = KeyPrefix + "commits"

  def doReadCommits(commitIds: Seq[String]): Stream[Commit[Event]] = {
    val chunks = commitIds.grouped(ChunkSize).map(_.toArray)
    chunks.flatMap { chunk =>
      val serializedCommits = withJedis { _.hmget(CommitsKey, chunk: _*) }
      serializedCommits.asScala.par.map(deserializeCommit)
    }.toStream
  }

  def deserializeCommit(serialized: String) =
    Json.fromJson[Commit[Event]](Json.parse(serialized))

The list of commit ids to read is first split into chunks, each at most ChunkSize (10,000) elements in size. Each chunk is then read using a single get multiple values from hash (HMGET) command. By reading many commits at a time we greatly reduce the number of process context switches. The commits are then deserialized using the deserializeCommit method, which is just a simple wrapper around the Play! JSON deserializer. Since deserialization is a CPU intensive process we use Scala’s parallel collections (par) to get a nice speed-up on a multi-core machine.

The resulting commits are then transformed into a lazy Stream. This way we do not have to read all the commits into memory at once, but we’ll read them on-demand, one chunk at a time.

Committing

There are two implementations of the EventCommitter interface. The RedisWatchMultiExecEventCommitter uses the Redis WATCH/MULTI/EXEC commands and is compatible with Redis 2.4 and higher. But it’s slower and more complicated than the RedisLuaEventCommitter that we’ll describe below.

The RedisLuaEventCommitter uses a Lua script to atomically commit events in Redis. The script is fairly similar to the commit implementation of the FakeEventStore:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
trait RedisLuaEventCommitter[Event] { this: RedisEventStore[Event] =>
  val TryCommitScript: String = """
  | local commitsKey = KEYS[1]
  | local streamKey = KEYS[2]
  | local timestamp = tonumber(ARGV[1])
  | local streamId = ARGV[2]
  | local expected = tonumber(ARGV[3])
  | local events = ARGV[4]
  |
  | local actual = tonumber(redis.call(‘llen’, streamKey))
  | if actual ~= expected then
  | return {‘conflict’, tostring(actual)}
  | end
  |
  | local storeRevision = tonumber(redis.call(‘hlen’, commitsKey))
  | local commitId = storeRevision + 1
  | local commitData = string.format(‘{"storeRevision":%d,"timestamp":%d,"streamId":%s,"streamRevision":%d,"events":%s}’,
  | commitId, timestamp, cjson.encode(streamId), actual + 1, events)
  |
  | redis.call(‘hset’, commitsKey, commitId, commitData)
  | redis.call(‘rpush’, streamKey, commitId)
  | redis.call(‘publish’, commitsKey, commitData)
  |
  | return {‘commit’, tostring(commitId)}
  """.stripMargin

  // [...]
}

The RedisLuaEventCommitter must be mixed into the RedisEventStore class, so we specify this using a self type (line 1). This gives us access to all RedisEventStore methods.

The first section of the Lua script reads the command arguments (line 3-8). The second section checks if the expected stream revision matches the actual stream revision. If not, a conflict is returned (line 10-13).

Otherwise the current store revision is determined. The commit id is set to the next store revision and the serialized commit JSON is generated (line 15-18). Then three commands are executed to:

  1. Store the serialized commit in the commits hash (line 20).
  2. The commitId is appended to the commits of the affected stream (line 21).
  3. The serialized commit is published to all subscribers (line 22). The name of the publication channel is the same as the key for the commits hash, but this is a rather arbitrary implementation detail.

When starting the event store the above Lua script is uploaded to Redis (using SCRIPT LOAD) and the resulting script identifier (the SHA-1 hash of the script contents) is saved:

val TryCommitScriptId = withJedis { _.scriptLoad(TryCommitScript) }

The implementation of the tryCommit is now quite straightforward. We simple need to invoke the Lua script using the EVALSHA command and translate the results to Scala:

object committer extends EventCommitter[Event] {
  override def tryCommit(streamId: String, expected: StreamRevision, event: Event): CommitResult[Event] = {
    // Prepare parameters.
    val timestamp = DateTimeUtils.currentTimeMillis
    val serializedEvents = Json.stringify(Json.toJson(Seq(event))(Writes.seqWrites(eventFormat)))

    // Invoke Lua script.
    val response = withJedis { _.evalsha(TryCommitScriptId, 2,
      /* KEYS */ CommitsKey, keyForStream(streamId),
      /* ARGV */ timestamp.toString, streamId, expected.value.toString, serializedEvents)
    }

    // Parse response.
    try {
      response.asInstanceOf[java.util.List[_]].asScala match {
        case Seq("conflict", actual: String) =>
          val conflicting = reader.readStream(streamId, since = expected)
          Left(Conflict(streamId, StreamRevision(actual.toLong), expected, conflicting))
        case Seq("commit", storeRevision: String) =>
          Right(Commit(StoreRevision(storeRevision.toLong), timestamp, streamId, expected.next, Seq(event)))
      }
    } catch {
      case e: Exception =>
        throw new EventStoreException("Error parsing response from Redis TryCommit script: " + response, e)
    }
  }
}

Subscriptions

Redis has built-in support for publish/subscribe, which we will use to notify each subscriber when new events are committed. But before we actually subscribe to Redis, we first need to replay historical commits. The reason to not subscribe before replaying is that replaying might take some time (if you have many historical commits) and we do not want any new commits to queue-up while we’re still processing the older commits:

override object publisher extends CommitPublisher[Event] {
  import reader._

  override def subscribe(since: StoreRevision)(listener: CommitListener[Event]): Subscription = {
    @volatile var cancelled = false
    @volatile var last = since
    val unsubscribeToken = UUID.randomUUID.toString

    executor.execute(new Runnable {
      private def replayCommitsTo(to: StoreRevision) {
        if (last < to) {
          Logger.info("Replaying commits since " + last + " to " + to)
          readCommits(last, to).takeWhile(_ => !closed && !cancelled).foreach(listener)
          last = to
        }
      }
      // [...]
    }
  }
}

The last variable contains the store revision of the last commit we passed to the listener so far. It is initialized to the since parameter. We then start a new thread to perform the actual commit replay and Redis subscription. This is consistent with our fake event store implementation.

The cancelled flag is used to help with unsubscribing when the subscription is cancelled. In addition to this, the unsubscribeToken is sent to the subscriber as well so that the subscriber wakes up when required.

After replaying (line 7 below) the historical commits we perform the actual subscription using a new Redis connection (line 12):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
override def run {
  val currentRevision = storeRevision
  if (last > currentRevision) {
    Logger.warn("Last " + last + " is in the future, resetting it to current " + currentRevision)
    last = currentRevision
  } else {
    replayCommitsTo(currentRevision)
  }

  val jedis = new Jedis(host, port)
  try {
    jedis.subscribe(Subscriber, ControlChannel, CommitsKey)
  } finally {
    jedis.disconnect
  }
}

The Subscriber object implements the JedisPubSub interface:

object Subscriber extends JedisPubSub {

When the subscription is confirmed we check if we missed any commits that happened while we were performing the initial replay, but before completing the subscription:

  override def onSubscribe(channel: String, subscribedChannels: Int) = channel match {
    case ControlChannel =>
      // We may have missed the cancellation token while subscribing, so check the flag.
      if (closed || cancelled) unsubscribe
    case CommitsKey =>
      // We may have missed some commits while subscribing, so replay missing if needed.
      replayCommitsTo(storeRevision)
    case _ =>
      Logger.warn("message received on unknown channel ‘" + channel + "’")
  }

Once we’re subscribed we’ll receive a notification for each commit:

  override def onMessage(channel: String, message: String) = channel match {
    case ControlChannel =>
      if (message == CloseToken || message == unsubscribeToken) {
        unsubscribe
      }
    case CommitsKey =>
      val commit = deserializeCommit(message)
      if (last.next < commit.storeRevision) {
        Logger.warn("missing commits since " + last + " to " + commit.storeRevision + ", replaying...")
        replayCommitsTo(commit.storeRevision)
      } else if (last.next == commit.storeRevision) {
        listener(commit)
        last = commit.storeRevision
      } else {
        Logger.warn("Ignoring old commit " + commit.storeRevision + ", since we already processed everything up to " + last)
      }
    case _ =>
      Logger.warn("message received on unknown channel ‘" + channel + "’")
  }

We deserialize and forward any new commit to the listener. In some case we may receive old commits, which we ignore. In case we miss any commits, we replay the missing ones. The control channel is monitored for event store closing or subscription cancellation messages.

This completes the implementation of the Redis event store. For full details, check the eventstore.redis package.

Setup and configuration

Now that we have multiple event store implementations we need to provide some configuration options. This is done in the controllers.Global object in the onStart method. The actual configuration can be found in the conf/application.conf file.

Since our event store exposes a blocking API (for ease of implementation) we also increase the Play! thread-pool sizes1.

Redis durability

The primary concern of an event store is to ensure events are written to durable storage, so no events get lost. To configure Redis for maximum reliability of written data we need to edit the redis.conf file to:

  • Turn off saving snapshots by commenting out all the save settings.
  • Turn on appendonly mode (appendonly yes).
  • Sync to disk on every transaction by setting appendfsync always.
  • (Optional) disable automatic rewriting of the append-only file (auto-aof-rewrite-percentage 0). We’ll only ever be adding new data to Redis, so rewriting will not result in a smaller append-only file anyway.

Performance

To test and compare the performance of the various event store implementations the following setup was used:

  • The application and Redis run on deler (dutch for denominator), a 2010 dual-core 2.66GHz Core i7 with hyper-threading and a 7200-RPM HDD. The server was started using the start-server.sh script.
  • The performance test client runs on noemer (dutch for numerator), a 2012 quad-core 2.6GHz Core i7 with hyper-threading. The benchmark was started using the run-benchmark.sh script with the following arguments: http://deler.local:9000 50 500000.

The server and client machine were directly connected with a 1 Gigabit ethernet cable and were otherwise idle. This setup ensures that neither the client nor the network was a bottleneck so that we can fully test the performance of the server code. Both computers were running the Java SE 7u5 JDK.

The benchmark itself consists of a warmup phase (running 20,000 iterations of adding/listing/editing/reading/deleting blog posts) to ensure the server JVM has ample opportunity to optimize the generated code, and then 50 concurrent clients completed a total of 500,000 iterations (100,000 iterations for the Redis WATCH/MULTI/EXEC event store implementation) of benchmarking. Note that this is a full-stack benchmark, including the network, HTTP protocol, request routing, the event store, and template rendering.

The results are summarized below (first number is total iterations per second, second number is the 99th percentile latency in milliseconds):

Event store None Fake Redis Lua Redis WME
Iterations 500,000 100,000
Add post (POST) 5330.5/s (47 ms) 5154.2/s (59 ms) 2617.4/s (64 ms) 583.7/s (762 ms)
Read post (GET) 8284.1/s (31 ms) 8176.1/s (32 ms) 6755.8/s (36 ms) 6641.7/s (38 ms)
Edit post (GET-POST-GET) 2145.0/s (55 ms) 1845.1/s (64 ms) 1178.7/s (96 ms) 359.5/s (511 ms)
List posts (GET) 1290.8/s (98 ms) 1259.6/s (147 ms) 1174.7/s (155 ms) 1165.5/s (150 ms)
Java memory usage 700 MB 1750 MB 700 MB
Redis memory usage N/A N/A 958 MB

As you can see there is only a slight difference between having no event store at all or the fake event store. The switch to the Lua-based Redis event store causes a much bigger change. Committing events now requires us to wait for a disk write, and also adds the overhead of process switching and commit serialization and deserialization. Still, the performance is not bad at all. When falling back to the WATCH/MULTI/EXEC instructions commit performance drops again. With the Lua implementation Redis was still able to write multiple commits to disk using a single fsync2, without Lua each commit needs to wait for a full fsync to complete due to the use of optimistic locking.

When a more complex page is rendered (listing the last 20 blog posts) the performance differences become negligible. It seems the bottleneck here is the speed at which we can render the page. I believe the Play! 2.0 template engine implementation could use a bit more optimization here.

After completing a benchmark run there are 1,060,000 events in the event store, almost 1 Gigabyte of data. How much time does it take to replay these events on startup? On deler it takes about 26 seconds to fully restore the memory image from the committed events (about 40,500 commits per second). Noemer is more than twice as fast, since it has more and faster processors to perform event deserialization, the primary bottleneck.

Summary

Now that we have a working event store implementation our application could go into production, except for the obvious lack of features. There are obviously some enhancements that could be made to the event store:

  • Automatic recovery from Redis connection failures.
  • Adding arbitrary headers to a commit. These can be used to store metadata like client IP, application server node, authenticated user, etc.
  • Performance metrics, such as latency from commit to notification. Raise alerts when latency gets to high.
  • Etc.

But for now this event store implementation is good enough.

If you feel that we needed to do a lot of work to get to this place, you’re partly right. But we did gain complete control and understanding of the basic infrastructure of our application! Compare this with having to understand SQL, database schemas, JDBC, your favorite Object-Relational Mapper, transaction managers, etc.

Of course, it still remains to be seen how easy or hard it is to add new application functionality. So in the next parts we’ll focus on adding new features, and some of the problems (with solutions) that you will quickly encounter when using event sourcing.

Footnotes:

  1. A good exercise is to modify the event store’s tryCommit method to return the result asynchronously. The implementation can then use a dedicated thread pool for performing the actual commits, leaving the Play! action thread pool available for handling web requests. 

  2. Another good exercise is to implement our own write-combining of commits. This should be pretty straightforward with the Lua event store implementation. All you need is a single, dedicated thread that continuously gathers all pending commits and sends these to Redis. Redis pipelining and the disruptor are a perfect match for this!