Simple event sourcing - refactoring and transactions (part 5)
In the previous part we added blog post comment functionality. In this part we’ll do some refactoring and change the memory image implementation to automatically retry domain logic on optimistic locking conflicts, giving us a simplified form of transactions. We’ll also change the event store to support multiple types of event streams in a single event store.
Other Parts
- Part 1 – Introduction
- Part 2 – Consistency
- Part 3 – Redis Event Store
- Part 4 – Conflict Resolution
- Part 5 – Refactoring and Transactions
- Part 6 – Users, Authentication, Authorization
Code
You can find the code associated with this part on GitHub in the part-5 branch.
Refactoring
The main change we want to make before improving the current commit code to handle transient conflicts is to move it out of the PostsController, so that we can use it from other parts of the application. But the current implementation is tied directly to Posts and PostEvents. So first we remove the following dependencies from this method:
- PostEvent conflict resolution.
- How to derive the PostId from the PostEvent.
- How to turn a PostId into an event stream identifier (currently just using toString).
To solve the first problem we introduce a new trait, ConflictsWith:
/**
 * Compares committed events against attempted events to check for
 * conflicts.
 */
trait ConflictsWith[-Event] {
  /**
   * Checks each committed event from `conflict` for conflicts with the `attempted` events.
   * Any committed events that conflict are returned.
   */
  def conflicting[A <: Event, B <: Event](conflict: Conflict[A], attempted: Seq[B]): Option[Conflict[A]]
}
object ConflictsWith {
  /**
   * Builds a new `ConflictsWith` based on the `conflicts` predicate.
   */
  def apply[Event](conflicts: (Event, Event) => Boolean): ConflictsWith[Event] = new ConflictsWith[Event] {
    override def conflicting[A <: Event, B <: Event](conflict: Conflict[A], attempted: Seq[B]): Option[Conflict[A]] =
      conflict.filter(a => attempted.exists(b => conflicts(a.event, b)))
  }
}
ConflictsWith is a type class that defines how to resolve conflicts for a certain type of events. Making it a separate trait (instead of a function) lets the Scala compiler automatically pass implementations as an implicit parameter where needed. The definition of the Conflict class has also changed slightly, to make it easier to filter the events while preserving meta-information.
The apply method of the ConflictsWith companion object is a convenient way to create instances of ConflictsWith from a simple predicate that checks whether two events conflict. The PostEvent conflict resolution implementation becomes:
implicit val PostEventConflictsWith: ConflictsWith[PostEvent] = ConflictsWith {
  case (a: PostCommentEvent, b: PostCommentEvent) => a.commentId == b.commentId
  case (_: PostCommentEvent, _) => false
  case _ => true
}
We use Scala’s convenient partial function syntax to define all the cases, without having to explicitly use the match keyword.
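To see how the predicate-based ConflictsWith behaves, here is a minimal, self-contained sketch. Committed and this Conflict class are hypothetical, simplified stand-ins (the article’s Conflict carries more meta-information), and the event hierarchy with String post ids is an assumption made only for this example. The conflict rules are the same as in PostEventConflictsWith.

```scala
// Hypothetical stand-ins for the article's types, reduced to what the
// conflict check needs.
final case class Committed[+E](revision: Long, event: E)

final case class Conflict[E](actual: Long, committed: Seq[Committed[E]]) {
  // Keeps only the committed events matching `p`; None if none are left.
  def filter(p: Committed[E] => Boolean): Option[Conflict[E]] = {
    val kept = committed.filter(p)
    if (kept.isEmpty) None else Some(copy(committed = kept))
  }
}

trait ConflictsWith[-Event] {
  def conflicting[A <: Event, B <: Event](conflict: Conflict[A], attempted: Seq[B]): Option[Conflict[A]]
}

object ConflictsWith {
  def apply[Event](conflicts: (Event, Event) => Boolean): ConflictsWith[Event] =
    new ConflictsWith[Event] {
      def conflicting[A <: Event, B <: Event](conflict: Conflict[A], attempted: Seq[B]): Option[Conflict[A]] =
        conflict.filter(c => attempted.exists(b => conflicts(c.event, b)))
    }
}

// Hypothetical event hierarchy for a blog post.
sealed trait PostEvent { def postId: String }
final case class PostEdited(postId: String, content: String) extends PostEvent
sealed trait PostCommentEvent extends PostEvent { def commentId: Int }
final case class CommentAdded(postId: String, commentId: Int, text: String) extends PostCommentEvent

// Arguments are (committed, attempted): comment events only clash on the same
// comment id, a committed comment never clashes with other attempted events,
// and any other committed event clashes with everything.
implicit val PostEventConflictsWith: ConflictsWith[PostEvent] = ConflictsWith[PostEvent] {
  case (a: PostCommentEvent, b: PostCommentEvent) => a.commentId == b.commentId
  case (_: PostCommentEvent, _) => false
  case _ => true
}

// Two comments were committed after the revision the user saw.
val unseen = Conflict[PostEvent](actual = 3, committed = Seq(
  Committed(2, CommentAdded("p1", 1, "first")),
  Committed(3, CommentAdded("p1", 2, "second"))))
```

With these rules, attempting a comment with a fresh id (3) yields None, while re-using comment id 2 yields a Conflict that contains only the commit at revision 3.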
To solve the second and third problems, and to support different types of event streams in a single event store, we introduce another trait, EventStreamType:
/**
 * Event stream type information.
 */
trait EventStreamType[StreamId, Event] {
  /**
   * Convert a stream identifier to a string. Used by the event store to persist
   * the stream identifier.
   */
  def toString(streamId: StreamId): String
  /**
   * Extract stream identifier from `event`.
   */
  def streamId(event: Event): StreamId
  /**
   * Cast `event` to the `Event` type.
   *
   * @throws ClassCastException if `event` is not of type `Event`.
   */
  def cast(event: Any): Event
}
object EventStreamType {
  def apply[StreamId, Event]
      (writeStreamId: StreamId => String,
       eventToStreamId: Event => StreamId)
      (implicit manifest: Manifest[Event]) =
    // [...]
}
This trait captures the relationship between an event stream identifier type and the type of events that can be stored in the associated stream. It also provides methods to extract the event stream identifier from an event and to turn an event stream identifier into its string representation. Finally, it allows casting a value of any type to the event type. With these two traits defined we can move the commit method from the PostsController into the MemoryImage class. But first we look into using the event store for different event stream types.
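The body of EventStreamType.apply is elided above. A minimal sketch of what such a factory could look like follows; it is hypothetical, and it uses scala.reflect.ClassTag in place of the article’s Manifest to perform the runtime type test in cast. PostId, PostAdded and the "post-" stream name prefix are assumptions for illustration only.

```scala
import scala.reflect.ClassTag

trait EventStreamType[StreamId, Event] {
  def toString(streamId: StreamId): String
  def streamId(event: Event): StreamId
  def cast(event: Any): Event
}

object EventStreamType {
  // Hypothetical factory body; ClassTag replaces the article's Manifest.
  def apply[StreamId, Event](writeStreamId: StreamId => String, eventToStreamId: Event => StreamId)
                            (implicit tag: ClassTag[Event]): EventStreamType[StreamId, Event] =
    new EventStreamType[StreamId, Event] {
      def toString(streamId: StreamId): String = writeStreamId(streamId)
      def streamId(event: Event): StreamId = eventToStreamId(event)
      // The type test `e: Event` is checked at runtime through the ClassTag,
      // so data of the wrong type fails fast.
      def cast(event: Any): Event = event match {
        case e: Event => e
        case other => throw new ClassCastException(
          s"expected ${tag.runtimeClass.getName} but got: $other")
      }
    }
}

// Hypothetical post types, for illustration only.
final case class PostId(value: String)
sealed trait PostEvent { def postId: PostId }
final case class PostAdded(postId: PostId, title: String) extends PostEvent

implicit val PostEventStreamType: EventStreamType[PostId, PostEvent] =
  EventStreamType[PostId, PostEvent](id => s"post-${id.value}", _.postId)

val added = PostAdded(PostId("42"), "Hello")
```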
Multiple event stream types
Now that we’ve associated an event stream’s identifier type with its event type using the EventStreamType trait, we can make our event store contravariant. In other words, when we have an event store of type EventStore[DomainEvent] we can use it as an EventStore[PostEvent], if PostEvent is a subtype of DomainEvent.
But we can only do this by changing the event store methods that return events so that they take the expected event type into account (through a Manifest or an EventStreamType); otherwise the Scala compiler will not allow us to treat an EventStore[DomainEvent] as an EventStore[PostEvent] (since reading from such a store could return DomainEvents, while the caller would only expect PostEvents). So the definition of CommitReader becomes:
/**
 * Reads commits from the event store.
 */
trait CommitReader[-Event] {
  // [...]
  /**
   * Reads all commits `since` (exclusive) up `to` (inclusive). Events are
   * filtered to only include events of type `E`.
   */
  def readCommits[E <: Event]
      (since: StoreRevision, to: StoreRevision)
      (implicit manifest: Manifest[E]): Stream[Commit[E]]
  /**
   * Reads all commits from the stream identified by `streamId` that occurred
   * `since` (exclusive) up `to` (inclusive).
   *
   * @throws ClassCastException the stream contained commits that did not
   *                            have the correct type `E`.
   */
  def readStream[StreamId, E <: Event]
      (streamId: StreamId,
       since: StreamRevision = StreamRevision.Initial,
       to: StreamRevision = StreamRevision.Maximum)
      (implicit descriptor: EventStreamType[StreamId, E]): Stream[Commit[E]]
}
Other methods in the event store traits are changed similarly. The methods can be used much like before, except that you’ll have to provide the expected event type (readCommits), or Scala will look for an appropriate instance of EventStreamType (readStream):
implicit val PostEventStreamType: EventStreamType[PostId, PostEvent] =
  EventStreamType(streamId => streamId.toString, event => event.postId)
val myCommits = eventStore.reader.readStream(myPostId)
Here the Scala compiler knows the type of myPostId and will infer that the events must be of type PostEvent[1], based on the existence of PostEventStreamType. So Scala will correctly infer the type of myCommits to be Stream[Commit[PostEvent]], and you will not have to do any type casting.
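The contravariance and the implicit lookup can be seen in isolation in the following sketch. It is hypothetical: an InMemoryCommitReader over a Seq stands in for the Redis event store, readCommits drops the revision range parameters, ClassTag replaces Manifest, and DomainEvent, UserRegistered and the stream naming scheme are invented for the example.

```scala
import scala.reflect.ClassTag

// Hypothetical, simplified types: stream names are plain strings.
final case class Commit[+E](streamId: String, revision: Long, events: Seq[E])

trait EventStreamType[StreamId, Event] {
  def toString(streamId: StreamId): String
  def streamId(event: Event): StreamId
  def cast(event: Any): Event
}

// Contravariant: a CommitReader[DomainEvent] can be used as a CommitReader[PostEvent],
// because every read method asks the caller which event type it expects.
trait CommitReader[-Event] {
  def readCommits[E <: Event](implicit tag: ClassTag[E]): Seq[Commit[E]]
  def readStream[StreamId, E <: Event](streamId: StreamId)
                                      (implicit descriptor: EventStreamType[StreamId, E]): Seq[Commit[E]]
}

final class InMemoryCommitReader[Event](commits: Seq[Commit[Event]]) extends CommitReader[Event] {
  // Keeps only events of type E (runtime check via ClassTag) and drops commits left empty.
  def readCommits[E <: Event](implicit tag: ClassTag[E]): Seq[Commit[E]] =
    commits
      .map(c => c.copy(events = c.events.collect { case e: E => e }))
      .filter(_.events.nonEmpty)

  // Casts every event in the stream; a wrong type fails fast with ClassCastException.
  def readStream[StreamId, E <: Event](streamId: StreamId)
                                      (implicit descriptor: EventStreamType[StreamId, E]): Seq[Commit[E]] = {
    val name = descriptor.toString(streamId)
    commits.filter(_.streamId == name).map(c => c.copy(events = c.events.map(descriptor.cast)))
  }
}

trait DomainEvent
final case class PostId(value: String)
sealed trait PostEvent extends DomainEvent { def postId: PostId }
final case class PostAdded(postId: PostId, title: String) extends PostEvent
final case class UserRegistered(email: String) extends DomainEvent

implicit val PostEventStreamType: EventStreamType[PostId, PostEvent] =
  new EventStreamType[PostId, PostEvent] {
    def toString(id: PostId): String = s"post-${id.value}"
    def streamId(event: PostEvent): PostId = event.postId
    def cast(event: Any): PostEvent = event.asInstanceOf[PostEvent]
  }

// One store holding two kinds of streams.
val store: CommitReader[DomainEvent] = new InMemoryCommitReader[DomainEvent](Seq(
  Commit("post-1", 1, Seq(PostAdded(PostId("1"), "Hello"))),
  Commit("user-a", 1, Seq(UserRegistered("a@example.com")))))

val posts: CommitReader[PostEvent] = store // accepted thanks to contravariance
// StreamId = PostId comes from the argument; E = PostEvent is found via PostEventStreamType.
val myCommits: Seq[Commit[PostEvent]] = posts.readStream(PostId("1"))
```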
Commit parameters
Another change is that we introduce a new Changes type that combines the three parameters provided to the event store committer’s tryCommit method (streamId, expected stream revision, and the event to append). The interface is:
/**
 * Represents the changes that can be committed atomically to the event store.
 */
sealed trait Changes[Event] {
  type StreamId
  def eventStreamType: EventStreamType[StreamId, Event]
  def streamId: StreamId
  def expected: StreamRevision
  def events: Seq[Event]
  def withExpectedRevision(expected: StreamRevision): Changes[Event]
}
object Changes {
  // [...]
  def apply[StreamId, Event]
      (expected: StreamRevision, event: Event)
      (implicit streamType: EventStreamType[StreamId, Event]): Changes[Event] =
    apply(streamType.streamId(event), expected, event)
}
So besides the three parameters, it also captures the type of the stream identifier and the associated EventStreamType instance, so that only events of the correct type can be committed to the event stream. The withExpectedRevision method returns a new copy of the changes with an updated expected StreamRevision, which is useful for retrying a commit after a conflict has been resolved.
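A minimal sketch of how Changes and withExpectedRevision might be implemented follows, assuming simplified StreamRevision and EventStreamType types. The Impl class and the forStream constructor (standing in for the elided three-argument apply) are hypothetical names, not part of the article’s code.

```scala
// Hypothetical, simplified supporting types.
final case class StreamRevision(value: Long)

trait EventStreamType[StreamId, Event] {
  def toString(streamId: StreamId): String
  def streamId(event: Event): StreamId
}

sealed trait Changes[Event] {
  type StreamId
  def eventStreamType: EventStreamType[StreamId, Event]
  def streamId: StreamId
  def expected: StreamRevision
  def events: Seq[Event]
  def withExpectedRevision(expected: StreamRevision): Changes[Event]
}

object Changes {
  // Hypothetical implementation class; the article does not show one.
  private final class Impl[S, E](val streamId: S, val expected: StreamRevision, val events: Seq[E],
                                 val eventStreamType: EventStreamType[S, E]) extends Changes[E] {
    type StreamId = S
    // Same stream, type information and events; only the expected revision changes.
    def withExpectedRevision(expected: StreamRevision): Changes[E] =
      new Impl(streamId, expected, events, eventStreamType)
  }

  // Hypothetical name for the constructor that takes an explicit stream id.
  def forStream[S, E](streamId: S, expected: StreamRevision, events: Seq[E])
                     (implicit streamType: EventStreamType[S, E]): Changes[E] =
    new Impl(streamId, expected, events, streamType)

  // As in the article: the stream id is derived from the event itself.
  def apply[S, E](expected: StreamRevision, event: E)
                 (implicit streamType: EventStreamType[S, E]): Changes[E] =
    forStream(streamType.streamId(event), expected, Seq(event))
}

// Hypothetical post types, for illustration only.
final case class PostId(value: String)
sealed trait PostEvent { def postId: PostId }
final case class PostAdded(postId: PostId, title: String) extends PostEvent

implicit val PostEventStreamType: EventStreamType[PostId, PostEvent] =
  new EventStreamType[PostId, PostEvent] {
    def toString(id: PostId): String = s"post-${id.value}"
    def streamId(event: PostEvent): PostId = event.postId
  }

val changes = Changes(StreamRevision(3), PostAdded(PostId("7"), "Hi"): PostEvent)
// After resolving a conflict, retry the same changes against the actual revision.
val retried = changes.withExpectedRevision(StreamRevision(5))
```

Because retried is a stable value, the compiler knows that retried.streamId has exactly the type expected by retried.eventStreamType, which is what the StreamId type member provides.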
With all these changes in place we can get back to solving transient transaction conflicts.
Atomic memory image modifications
Going back to our application’s storage architecture the following picture emerges:
Remember that in the blog post application there is one event store with many memory images (one per application server), all concurrently committing new events based on the actions performed by the users. We already detect the conflicts that can occur when users work with (potentially) outdated information by comparing the expected stream revision to the actual revision (see the previous part for more details). Let’s call these kinds of conflicts user conflicts.
But by adding the blog comment functionality we introduced a potential conflict in the interaction between the memory image and the event store. In the add comment action of PostsController we first read the current state of a blog post and use it to assign a CommentId to the new comment. We then commit a CommentAdded event. If multiple users happen to add comments to a blog post simultaneously, an unresolvable conflict will occur: both new comments will have the same id. This is a transaction conflict[2][3].
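The race can be reproduced with a tiny, hypothetical model (not the application’s actual Post class) in which the next comment id is derived from the comments already in the state:

```scala
// Hypothetical, simplified post: the next comment id is derived from the current state.
final case class Comment(id: Int, text: String)
final case class Post(comments: Vector[Comment]) {
  def nextCommentId: Int = comments.size + 1
  def addComment(text: String): Post =
    copy(comments = comments :+ Comment(nextCommentId, text))
}

val snapshot = Post(Vector(Comment(1, "first")))

// Two application servers run "add comment" against the same snapshot...
val fromServerA = snapshot.addComment("A")
val fromServerB = snapshot.addComment("B")
// ...and both assign id 2, so committing both would duplicate the comment id.

// Rerunning B's logic against the state that already includes A's comment avoids the clash.
val retriedB = fromServerA.addComment("B")
```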
In the current version of the application the commit fails and we notify the user that someone else added a comment with the same id. The user must then resubmit the comment, which will then succeed. But why ask the user to retry an action because of these kinds of transient conflicts, when computers are much better at doing repetitive work automatically?
What we have to do is automatically rerun the code that reads the memory image and then generates the event. So we will replace the MemoryImage’s tryCommit method with a new modify method:
class MemoryImage[State, Event] /* [...] */ {
  // [...]
  def modify[A](body: State => Transaction[Event, A]): A = // ...
}
The body parameter is a function that uses the current state of the memory image and returns a Transaction value. This transaction value contains all the information we need to try to commit to the event store. If it turns out that a transient transaction conflict occurred, we rerun the body function with the updated memory image and retry the commit. When the transaction succeeds we return the transaction’s result of type A.
The possible values for a Transaction are shown below:
/**
 * The transaction to commit to the event store when modifying the memory image.
 */
sealed trait Transaction[+Event, +A] {
  /**
   * Maps the result of this transaction from `A` to `B` using `f`.
   */
  def map[B](f: A => B): Transaction[Event, B]
}
object Transaction {
  /**
   * Transaction result that completes with `onAbort` when run,
   * without committing anything to the event store.
   */
  def abort[A](onAbort: => A): Transaction[Nothing, A] =
    new TransactionAbort(() => onAbort)

  implicit def ChangesOps[Event](changes: Changes[Event]) = new ChangesOps(changes)
  class ChangesOps[Event](changes: Changes[Event]) {
    /**
     * Transaction result that will commit the `changes` to the event store.
     */
    def commit[A](onCommit: => A, onConflict: Conflict[Event] => A)
                 (implicit conflictsWith: ConflictsWith[Event])
                 : Transaction[Event, A] =
      new TransactionCommit(changes, () => onCommit, onConflict, conflictsWith)
  }

  implicit def OptionTransactionOps[Event, A](m: Option[Transaction[Event, A]]) = new OptionTransactionOps(m)
  class OptionTransactionOps[Event, A](value: Option[Transaction[Event, A]]) {
    /**
     * Turns an `Option[Transaction[Event, A]]` into `Transaction[Event, Option[A]]`.
     */
    def sequence: Transaction[Event, Option[A]] = value match {
      case None => abort(None)
      case Some(transaction) => transaction.map(Some(_))
    }
  }
}
The TransactionCommit and TransactionAbort classes (not listed) are simple case classes that hold the parameters until we are ready to commit (or abort) the transaction. ChangesOps enriches the Changes class with a new commit method that instantiates a TransactionCommit (Scala 2.10 will offer a nicer syntax for “extending” existing classes with new methods using implicit classes). Values of type Option[Transaction[Event, A]] are also extended with the sequence method, which flips the optionality to return a Transaction[Event, Option[A]].
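To illustrate map and sequence, here is a cut-down, hypothetical version of Transaction in which a commit only carries its events and an onCommit callback. The events and result() helpers are assumptions that exist only so the example can be inspected; the article’s TransactionCommit holds Changes, an onConflict handler and a ConflictsWith instance instead.

```scala
// Hypothetical, simplified Transaction for illustration.
sealed trait Transaction[+Event, +A] {
  def map[B](f: A => B): Transaction[Event, B]
  def events: Seq[Event] // what would be committed (empty for an abort)
  def result(): A        // runs the onCommit/onAbort callback
}

final case class TransactionAbort[A](onAbort: () => A) extends Transaction[Nothing, A] {
  def map[B](f: A => B): Transaction[Nothing, B] = TransactionAbort(() => f(onAbort()))
  def events: Seq[Nothing] = Seq.empty
  def result(): A = onAbort()
}

final case class TransactionCommit[Event, A](events: Seq[Event], onCommit: () => A)
    extends Transaction[Event, A] {
  def map[B](f: A => B): Transaction[Event, B] = TransactionCommit(events, () => f(onCommit()))
  def result(): A = onCommit()
}

object Transaction {
  def abort[A](onAbort: => A): Transaction[Nothing, A] = TransactionAbort(() => onAbort)

  implicit class OptionTransactionOps[Event, A](value: Option[Transaction[Event, A]]) {
    // None becomes an abort yielding None; Some(t) keeps t's events and wraps its result in Some.
    def sequence: Transaction[Event, Option[A]] = value match {
      case None              => abort(None)
      case Some(transaction) => transaction.map(Some(_))
    }
  }
}

import Transaction._

val found: Option[Transaction[String, Int]] = Some(TransactionCommit(Seq("PostEdited"), () => 200))
val missing: Option[Transaction[String, Int]] = None

val whenFound = found.sequence     // Transaction[String, Option[Int]]
val whenMissing = missing.sequence // an abort whose result is None
```

This is the same shape used by updatePost later on: looking up a post yields an Option, mapping the transaction body over it yields an Option[Transaction[...]], and sequence turns that into a single transaction whose result is an Option.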
To modify the memory image safely, we can now write code like:
memoryImage.modify { posts => Changes(/* ... */).commit(/* ... */) }
The flow chart below describes the full MemoryImage modify algorithm, including user and transaction conflict resolution:
That’s quite a few steps! Let’s go through each step in the left column:
1. Read the current state of the memory image.
2. Pass the state to the provided transaction body.
3. Is the result of the transaction an abort? If yes, return the onAbort result and stop.
4. Try to commit the Changes to the event store using the expected stream revision.
5. Did the commit succeed? If yes, return the onCommit result and stop.
6. Was there an update to the event stream that was not yet applied to the memory image state used to run this transaction? If yes, we have a transaction conflict, so go back to step 1 to retry.
7. Is there an unresolvable user conflict? If yes, return the result of the onConflict handler and stop.
8. Try to commit again, but now use the actual stream revision.
9. Did the commit succeed? If yes, return the onCommit result and stop. If not, we have a transaction conflict, so go back to step 1 to retry.
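The steps above can be sketched in code as follows. This is a hypothetical, single-stream, single-process simulation, not the actual MemoryImage.modify: the in-memory EventStore, the invariant Transaction with Abort and Commit cases, the plain (committed, attempted) predicate standing in for ConflictsWith, and the Long revisions are all simplifications. The step numbers in the comments follow the order of the steps above.

```scala
import scala.annotation.tailrec

// The committed events the writer did not expect, plus the stream's actual revision.
final case class Conflict[E](actual: Long, events: Seq[E])

// Hypothetical single-stream, in-memory event store with optimistic locking.
final class EventStore[E] {
  private var log = Vector.empty[E]
  def revision: Long = synchronized(log.size.toLong)
  def readSince(revision: Long): Seq[E] = synchronized(log.drop(revision.toInt))
  // Appends `events` only if nobody committed since `expected`.
  def tryCommit(expected: Long, events: Seq[E]): Either[Conflict[E], Long] = synchronized {
    if (expected == log.size.toLong) { log ++= events; Right(log.size.toLong) }
    else Left(Conflict(log.size.toLong, log.drop(expected.toInt)))
  }
}

// Simplified, invariant Transaction (the article's version is covariant).
sealed trait Transaction[E, A]
final case class Abort[E, A](onAbort: () => A) extends Transaction[E, A]
final case class Commit[E, A](
    expected: Long,              // stream revision the user's change was based on
    events: Seq[E],
    onCommit: () => A,
    onConflict: Conflict[E] => A,
    conflicts: (E, E) => Boolean // (committed, attempted) => do they clash?
) extends Transaction[E, A]

final class MemoryImage[S, E](store: EventStore[E], initial: S)(update: (S, E) => S) {
  private var state = initial
  private var revision = 0L

  // Step 1: apply events committed by any writer since our last read.
  private def refresh(): (S, Long) = synchronized {
    val newEvents = store.readSince(revision)
    state = newEvents.foldLeft(state)(update)
    revision += newEvents.size
    (state, revision)
  }

  @tailrec
  def modify[A](body: S => Transaction[E, A]): A = {
    val (current, seen) = refresh()
    body(current) match {                                     // step 2
      case Abort(onAbort) => onAbort()                        // step 3
      case Commit(expected, events, onCommit, onConflict, conflicts) =>
        store.tryCommit(expected, events) match {             // step 4
          case Right(_) => onCommit()                         // step 5
          case Left(c) if c.actual > seen => modify(body)     // step 6: stale image, retry
          case Left(c) =>
            val clashing = c.events.filter(a => events.exists(b => conflicts(a, b)))
            if (clashing.nonEmpty) onConflict(c.copy(events = clashing)) // step 7
            else store.tryCommit(c.actual, events) match {    // step 8
              case Right(_) => onCommit()                     // step 9
              case Left(_)  => modify(body)                   // lost the race again, retry
            }
        }
    }
  }
}

final case class CommentAdded(id: Int, text: String)

val store = new EventStore[CommentAdded]
val image = new MemoryImage[Vector[CommentAdded], CommentAdded](store, Vector.empty)(_ :+ _)

var attempts = 0
val newId = image.modify { comments =>
  attempts += 1
  // Simulate another server committing comment 1 right after our first read.
  if (attempts == 1) store.tryCommit(store.revision, Seq(CommentAdded(comments.size + 1, "other server")))
  val id = comments.size + 1 // id derived from the (possibly stale) state
  Commit[CommentAdded, Int](0L, Seq(CommentAdded(id, "mine")), () => id,
    _ => -1, (a, b) => a.id == b.id)
}
```

In the demo, the first attempt loses the race (step 6) and is rerun against the updated state, which assigns comment id 2. The commit against the user’s outdated revision 0 then succeeds at step 8, because the committed comment has a different id and therefore is not a user conflict.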
This entire process is implemented in MemoryImage.modify. The method is a bit larger than I would like, but it seems to be one of those cases that don’t get any more understandable when split up into smaller pieces.
Now that we have basic transaction support in place we can modify the controller methods to work with this. Here’s the post edit action together with the updatePost helper method:
object edit {
  // [...]
  def submit(id: PostId, expected: StreamRevision) = Action { implicit request =>
    updatePost(id) { post =>
      postContentForm.bindFromRequest.fold(
        formWithErrors =>
          abort(BadRequest(views.html.posts.edit(id, expected, formWithErrors))),
        postContent =>
          Changes(expected, PostEdited(id, postContent): PostEvent).commit(
            onCommit =
              Redirect(routes.PostsController.show(id)).flashing("info" -> "Post saved."),
            onConflict = conflict =>
              Conflict(views.html.posts.edit(id, conflict.actual, postContentForm.fill(postContent), conflict.events))))
    } getOrElse {
      notFound
    }
  }
}
/**
 * Runs the transaction `body` against the post identified by `id` and
 * returns the result, if it exists. Otherwise `None` is returned.
 */
def updatePost[A](id: PostId)
    (body: Post => Transaction[PostEvent, A]): Option[A] =
  memoryImage.modify { _.get(id).map(body).sequence }
As you can see, if the form validation fails, we abort the transaction and return a BadRequest as usual. Otherwise we return a commit result with the PostEdited event and the appropriate onCommit and onConflict handlers.
Summary
With the addition of the modify method to the MemoryImage, we now have a place to put domain logic. Since the example application’s logic is still extremely simple, we can just keep it inside the PostsController. As your application gets more complicated, you should extract the domain logic into separate functions and classes.
The event store (and memory image) now also support multiple types of event streams. We’ll use that in the next part to add user accounts to the example application. We’ll also look into ensuring uniqueness of the email addresses associated with these user accounts.
Footnotes:
1. Of course, the event store is a boundary. The stored events are outside of the application’s control and there is no guarantee that the events in a stream are of the correct type, or can be deserialized. This is why we fail fast when we get bad data out of the event store, so the problem can be detected and fixed quickly.
2. Transient transaction conflicts are not specific to memory images or event stores. In relational databases with multi-version concurrency control (such as Oracle or PostgreSQL) you can get a “non-serializable transaction” error, in which case you should also retry your transaction.
3. Instead of trying to resolve transient conflicts, we can also prevent them by adhering to the Single Writer Principle. By making sure only one application server can write to the event store, there can be no conflicts. All Changes can then be sent to this writer process to be committed to the event store. This is actually a great solution, but it does require some kind of cluster communication to decide which server is the “writer”. Currently cluster communication (and leader election) is not provided out-of-the-box by Play! or Akka, but Akka 2.1 should have built-in cluster management.