Simple event sourcing - users, authentication, authorization (part 6)
Previously we spent some time preparing the code to support multiple kinds of events and data, rather than just supporting blog posts. In this part we’ll add user accounts, together with the required authentication and authorization code. We’ll again use event sourcing and the memory image to keep track of all users and currently active sessions. But the biggest changes to the application are related to security, and authorization in particular. It turns out event sourcing allows for an additional layer of authorization, which lets us whitelist every change a particular user is allowed to make.
Other parts
- Part 1 – Introduction
- Part 2 – Consistency
- Part 3 – Redis Event Store
- Part 4 – Conflict Resolution
- Part 5 – Refactoring and Transactions
- Part 6 – Users, Authentication, Authorization
Code
You can find the code associated with this part on GitHub on the part-6 branch.
Since quite some time has passed since the previous part, some other upgrades were made as well:
- Scala 2.10.0 and Play 2.1-RC1 were released. The code has been updated accordingly. This mainly reduced the amount of code needed to work with JSON.
- The stable version of Redis is now 2.6.7, which supports Lua scripts. So the 2.4.x Redis Watch/Multi/Exec event store implementation was removed.
User Accounts
Most applications need some level of user authentication and authorization support. Since we’re building an event sourced example application we’ll first define the events we need for managing users and active sessions:
sealed trait UserEvent extends DomainEvent {
  def userId: UserId
}
case class UserRegistered(userId: UserId, email: EmailAddress, displayName: String, password: Password) extends UserEvent
case class UserProfileChanged(userId: UserId, displayName: String) extends UserEvent
case class UserEmailAddressChanged(userId: UserId, email: EmailAddress) extends UserEvent
case class UserPasswordChanged(userId: UserId, password: Password) extends UserEvent
case class UserLoggedIn(userId: UserId, token: AuthenticationToken) extends UserEvent
case class UserLoggedOut(userId: UserId) extends UserEvent
From the event definitions it is quite clear what user related functionality our system supports. Besides registering, logging in and logging out, users can also change their profile information (display name), email address, and password.
To ensure we always hash passwords securely and never accidentally display them, a custom Password class is defined, which uses the scrypt password hashing algorithm:
case class Password private (hash: String) {
  require(hash.startsWith("$s0$"), "invalid password hash")
  def verify(password: String): Boolean = SCryptUtil.check(password, hash)
  override def toString = "<PASSWORD-HASH>"
}
object Password {
  def fromHash(hash: String): Password = Password(hash)
  def fromPlainText(password: String): Password = Password(SCryptUtil.scrypt(password, 1 << 14, 8, 2))
  implicit val PasswordFormat: Format[Password] = valueFormat(fromHash)(_.hash)
}
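The effect of overriding toString can be illustrated without the scrypt dependency. The following is only a sketch: PasswordSketch and the two-field RegisteredUser are simplified stand-ins introduced for illustration, and the hashing itself is left out.

```scala
object PasswordSketch {
  // Wrapper around an already-computed hash. The constructor is private, so
  // values can only be created through the companion's factory method.
  case class Password private (hash: String) {
    require(hash.startsWith("$s0$"), "invalid password hash")
    // Masks the hash whenever the value is printed, logged or interpolated.
    override def toString = "<PASSWORD-HASH>"
  }
  object Password {
    def fromHash(hash: String): Password = Password(hash)
  }

  // Simplified stand-in (assumption): shows that the generated case class
  // toString of a containing class does not leak the hash either.
  case class RegisteredUser(displayName: String, password: Password)
}
```

Because case classes build their toString from the toString of their fields, printing a user (for example in a log statement) shows the placeholder instead of the hash.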
Similar classes are defined for EmailAddress and AuthenticationToken. The authentication token is stored in the user’s session so we can look up the current user when an HTTP request is made. This is implemented as part of the Users class:
case class Users(
  private val byId: Map[UserId, RegisteredUser] = Map.empty,
  private val byEmail: Map[EmailAddress, UserId] = Map.empty,
  private val byAuthenticationToken: Map[AuthenticationToken, UserId] = Map.empty) {
  // [... code omitted ...]
  def findByAuthenticationToken(token: AuthenticationToken): Option[RegisteredUser] =
    byAuthenticationToken.get(token).flatMap(byId.get)
  // [... code omitted ...]
}
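The update logic is omitted above. As a rough idea of how the three indexes might be kept in sync as events arrive, here is a minimal sketch. It is not the code from the repository: the ids are plain strings, only four of the events are handled, and the update method shown is an assumption.

```scala
object UsersSketch {
  // Plain strings stand in for UserId, EmailAddress and AuthenticationToken.
  type UserId = String
  type Email  = String
  type Token  = String

  sealed trait UserEvent { def userId: UserId }
  case class UserRegistered(userId: UserId, email: Email, displayName: String) extends UserEvent
  case class UserEmailAddressChanged(userId: UserId, email: Email) extends UserEvent
  case class UserLoggedIn(userId: UserId, token: Token) extends UserEvent
  case class UserLoggedOut(userId: UserId) extends UserEvent

  case class RegisteredUser(id: UserId, email: Email, displayName: String, token: Option[Token] = None)

  case class Users(
      byId: Map[UserId, RegisteredUser] = Map.empty,
      byEmail: Map[Email, UserId] = Map.empty,
      byToken: Map[Token, UserId] = Map.empty) {

    def findByAuthenticationToken(token: Token): Option[RegisteredUser] =
      byToken.get(token).flatMap(byId.get)

    // Hypothetical update: each event yields a new immutable Users value.
    // Events for unknown users are ignored in this sketch.
    def update(event: UserEvent): Users = event match {
      case UserRegistered(id, email, name) =>
        copy(byId = byId.updated(id, RegisteredUser(id, email, name)),
             byEmail = byEmail.updated(email, id))
      case UserEmailAddressChanged(id, email) =>
        byId.get(id).fold(this) { user =>
          copy(byId = byId.updated(id, user.copy(email = email)),
               byEmail = (byEmail - user.email) + (email -> id))
        }
      case UserLoggedIn(id, token) =>
        byId.get(id).fold(this) { user =>
          // Drop the previous session token, if any, before adding the new one.
          copy(byId = byId.updated(id, user.copy(token = Some(token))),
               byToken = (byToken -- user.token) + (token -> id))
        }
      case UserLoggedOut(id) =>
        byId.get(id).fold(this) { user =>
          copy(byId = byId.updated(id, user.copy(token = None)),
               byToken = byToken -- user.token)
        }
    }
  }
}
```

Rebuilding the index from the event history is then a left fold over the events, starting from an empty Users().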
Now that we store users as well as posts, the global state of the application is captured in the ApplicationState class, which simply combines the Posts and Users classes and dispatches incoming updates to its parts:
case class ApplicationState(posts: Posts = Posts(), users: Users = Users()) {
  def update(event: DomainEvent, revision: StreamRevision) = event match {
    case event: PostEvent => copy(posts = posts.update(event, revision))
    case event: UserEvent => copy(users = users.update(event, revision))
    case _                => sys.error(s"unknown event: $event")
  }
  def updateMany(events: Seq[(DomainEvent, StreamRevision)]) = events.foldLeft(this) {
    case (state, (event, streamRevision)) => state.update(event, streamRevision)
  }
}
The User trait is defined as follows:
sealed trait User {
  def displayName: String
  /**
   * @return `Some(registeredUser)` if this is a registered user,
   *         `None` otherwise.
   */
  def registered: Option[RegisteredUser] = None
  // [... authorization code omitted ...]
}
By not having to worry about mapping classes to a (relational) database we can freely define our classes to match our application’s needs. In this case there are four different implementations of this trait. They all have a displayName and registered method in common, but are used in different situations:
/**
 * A deleted or non-existent user.
 */
case class UnknownUser(id: UserId) extends User {
  def displayName = "[deleted]"
}
/**
 * A user that we know the name of, but nothing else. Consider a comment posted
 * by a guest (where they only have to specify their name).
 */
case class PseudonymousUser(displayName: String) extends User
/**
 * A visitor to the site is represented by the guest user object.
 */
case object GuestUser extends User {
  def displayName = "Guest"
  // [... authorization code omitted ...]
}
/**
 * A registered user who may have an active session identified by the
 * `authenticationToken`.
 */
case class RegisteredUser(
  id: UserId,
  revision: StreamRevision,
  email: EmailAddress,
  displayName: String,
  password: Password,
  authenticationToken: Option[AuthenticationToken] = None
) extends User {
  override def registered = Some(this)
  // [... authorization code omitted ...]
}
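To make the roles of these implementations more concrete, the sketch below shows one way calling code might use them. The authorOf and byline helpers are hypothetical and do not appear in the article's code; the user classes are cut down to the fields needed here.

```scala
object UserKindsSketch {
  type UserId = String // stand-in for the article's UserId

  sealed trait User {
    def displayName: String
    def registered: Option[RegisteredUser] = None
  }
  case class UnknownUser(id: UserId) extends User { def displayName = "[deleted]" }
  case class PseudonymousUser(displayName: String) extends User
  case object GuestUser extends User { def displayName = "Guest" }
  case class RegisteredUser(id: UserId, displayName: String) extends User {
    override def registered = Some(this)
  }

  // Hypothetical helper: resolve an author id, falling back to UnknownUser
  // so callers never have to deal with a missing author.
  def authorOf(id: UserId, users: Map[UserId, RegisteredUser]): User =
    users.getOrElse(id, UnknownUser(id))

  // Hypothetical helper: `registered` gives access to the extra fields of a
  // registered user without a pattern match on the concrete class.
  def byline(user: User): String = user.registered match {
    case Some(member) => s"${member.displayName} (member ${member.id})"
    case None         => user.displayName
  }
}
```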
The current user
In most applications there is a current user. We want this user to be available to all our controller actions and views. We also want to store the current user’s id as part of a commit to our event store, so that we can easily trace who performed a specific action.
Instead of reimplementing this in every controller action we’ll add some helper methods, so that controller actions can stay focused. Play! makes it easy to do so using action composition.
First a new trait is defined so that we can decouple the controller from the raw memory image API. An instance of this trait will be passed to each controller when constructed:
/**
 * Actions available to controllers that make use of a memory image.
 */
trait ControllerActions[State, -Event] extends Controller { outer =>
  /**
   * The type of blocks that only need to query the current state of the
   * memory image.
   */
  type QueryBlock[A] = State => ApplicationRequest[A] => Result
  /**
   * The type of blocks that wish to read and modify the memory image.
   */
  type CommandBlock[A] = State => ApplicationRequest[A] => Transaction[Event, Result]
  /**
   * Runs the given `block` using the current state of the memory image.
   */
  def QueryAction(block: QueryBlock[AnyContent]): Action[AnyContent]
  /**
   * Runs the given `block` using the current state of the memory image
   * and applies the resulting transaction.
   */
  def CommandAction(block: CommandBlock[AnyContent]): Action[AnyContent]
  // [... code omitted ...]
}
As you can see two kinds of actions are provided:
- Query actions – used when you want to access the current user and application state, but do not need to modify it.
- Command actions – used when you want to generate and commit new events.
Each action expects a callback block as a parameter. The block will be invoked with the current memory image state and an ApplicationRequest, which wraps the standard Play! Request and adds additional user related information:
trait CurrentUserContext {
  /**
   * The current authenticated user or the guest user.
   */
  def currentUser: User
}
trait UsersContext {
  /**
   * All known users and sessions.
   */
  def users: Users
}
/**
 * Context for use as an implicit parameter to views.
 */
trait ViewContext extends CurrentUserContext {
  def flash: Flash
}
/**
 * Extend Play’s `Request` with user context information.
 */
class ApplicationRequest[A](
  request: Request[A],
  val currentUser: User,
  val users: Users
) extends WrappedRequest(request) with ViewContext with UsersContext
So instead of passing the Play! Request to views we now pass them a ViewContext, so that we can easily control which information is available to a view (in this case, just the current user and the flash message information).
The implementation of the ControllerActions trait uses a MemoryImage based on the ApplicationState class and can be found in MemoryImageActions.scala. Here’s the implementation of the QueryAction method:
/**
 * Implements `ControllerActions` using the (global) memory image containing
 * the `ApplicationState`.
 */
class MemoryImageActions
    (memoryImage: MemoryImage[ApplicationState, DomainEvent])
  extends ControllerActions[ApplicationState, DomainEvent] {
  override def QueryAction(block: QueryBlock[AnyContent]) = Action { request =>
    val state = memoryImage.get
    block(state)(buildApplicationRequest(request, state))
  }
  // [... CommandAction omitted ...]
  private def buildApplicationRequest[A](request: Request[A], state: ApplicationState) = {
    val currentUser = request.session.get("authenticationToken")
      .flatMap(AuthenticationToken.fromString)
      .flatMap(state.users.findByAuthenticationToken)
      .getOrElse(GuestUser)
    new ApplicationRequest(request, currentUser, state.users)
  }
}
As you can see, a QueryAction is implemented as a Play! Action that reads the current state of the memory image and uses it to build the ApplicationRequest with the current user. If there is no authentication token, or the token does not match an active session, the GuestUser is used instead. The provided block is then invoked. The CommandAction is similar, but uses MemoryImage.modify to also commit the generated events.
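The lookup chain in buildApplicationRequest can be tried out in isolation. The sketch below replaces the Play! session and the Users class with plain maps; the token format accepted by fromString (eight hex digits) is an assumption made only for this example.

```scala
object CurrentUserSketch {
  sealed trait User
  case object GuestUser extends User
  case class RegisteredUser(name: String) extends User

  // Hypothetical token type: fromString rejects malformed values.
  case class AuthenticationToken(value: String)
  object AuthenticationToken {
    def fromString(s: String): Option[AuthenticationToken] =
      if (s.matches("[0-9a-f]{8}")) Some(AuthenticationToken(s)) else None
  }

  // Every step returns an Option, so a missing cookie, a malformed token
  // and an unknown token all end up at the same fallback: the guest user.
  def currentUser(
      session: Map[String, String],
      sessions: Map[AuthenticationToken, RegisteredUser]): User =
    session.get("authenticationToken")
      .flatMap(AuthenticationToken.fromString)
      .flatMap(sessions.get)
      .getOrElse(GuestUser)
}
```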
Finally, we do not want to expose the entire application state to each controller. The UsersController only needs to see the users, while the PostsController only needs to see the posts. The view method on ControllerActions takes care of that:
trait ControllerActions[State, -Event] extends Controller { outer =>
  // [... action methods omitted ...]
  /**
   * Only expose part of the state of the memory image using the provided
   * function `f`.
   */
  def view[S, E <: Event](f: State => S) = new ControllerActions[S, E] {
    def QueryAction(block: QueryBlock[AnyContent]) = outer.QueryAction { state => request =>
      block(f(state))(request)
    }
    def CommandAction(block: CommandBlock[AnyContent]) = outer.CommandAction { state => request =>
      block(f(state))(request)
    }
  }
}
Controllers that use the memory image can now simply have an instance of ControllerActions passed in at construction time to define actions. For example, the PostsController.index action now becomes:
object PostsController
  extends PostsController(Global.MemoryImageActions.view(_.posts))
class PostsController(actions: ControllerActions[Posts, PostEvent]) {
  // Import the action and controller methods directly.
  import actions._
  /**
   * Show an overview of the most recent blog posts.
   */
  def index = QueryAction { posts => implicit request =>
    Ok(views.html.posts.index(posts.mostRecent(20)))
  }
  // [... other code omitted ...]
}
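The narrowing that view performs can be shown with a much smaller model. In this sketch, which is an illustration rather than the article's API, a "query" simply turns the state into a string, and Actions, InMemoryActions and AppState are hypothetical names.

```scala
object ViewSketch {
  case class AppState(posts: List[String], users: List[String])

  // Simplified counterpart of ControllerActions: only queries, no Play! types.
  trait Actions[State] { outer =>
    def query(block: State => String): String

    // Expose only the part of the state selected by `f`. The wrapped block
    // never sees the full state, only `f(state)`.
    def view[S](f: State => S): Actions[S] = new Actions[S] {
      def query(block: S => String): String =
        outer.query(state => block(f(state)))
    }
  }

  // Stand-in for MemoryImageActions: serves a fixed state.
  class InMemoryActions(state: AppState) extends Actions[AppState] {
    def query(block: AppState => String): String = block(state)
  }
}
```

A side effect of this design is that a controller written against the narrowed type can be constructed in a test with any implementation of that type, without a full application state.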
Uniqueness of email addresses
One common requirement is that email addresses must be unique across all users. In an application backed by a relational database this is as easy as adding a unique constraint (see footnote 1), but an event store does not provide anything similar. We could use the email address as the user’s event stream identifier, but that makes it hard for a user to change their email address without creating a brand new event stream. So what we’ll do instead is to store a map of email addresses to user ids and fill this map on demand. For unit testing the following implementation works fine:
val claimedEmailAddresses = collection.mutable.Map.empty[EmailAddress, UserId]
def claim(email: EmailAddress, requestedUserId: UserId): UserId =
  claimedEmailAddresses.getOrElseUpdate(email, requestedUserId)
In other words, the first time an email address is claimed the requested user id is recorded for it. Afterwards, the same user id is always returned, whatever id is requested. For the production implementation we use a Redis hash together with the HSETNX command:
class RedisEmailRegistry(jedis: Jedis, redisKey: String) {
  def claim(email: EmailAddress, requestedUserId: UserId): UserId = {
    val result: Long = jedis.hsetnx(redisKey, email.toString, requestedUserId.toString)
    result match {
      case 0L =>
        val existingUserId = jedis.hget(redisKey, email.toString)
        UserId.fromString(existingUserId).getOrElse(sys.error(s"cannot parse user id: $existingUserId"))
      case 1L =>
        requestedUserId
      case _ =>
        sys.error(s"unexpected Redis return value: $result")
    }
  }
}
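The "set if not exists, otherwise return the existing value" contract of claim can also be expressed with the JDK's ConcurrentHashMap.putIfAbsent, which is atomic within a single JVM. This is only a sketch for experimenting with the semantics: InMemoryEmailRegistry is a hypothetical name, the ids are plain strings, and unlike the Redis version it is not shared between processes.

```scala
import java.util.concurrent.ConcurrentHashMap

object EmailRegistrySketch {
  type EmailAddress = String
  type UserId = String

  class InMemoryEmailRegistry {
    private val claimed = new ConcurrentHashMap[EmailAddress, UserId]()

    def claim(email: EmailAddress, requestedUserId: UserId): UserId =
      // putIfAbsent returns null when the key was absent (our claim won),
      // or the previously stored id when the address was already taken.
      Option(claimed.putIfAbsent(email, requestedUserId)).getOrElse(requestedUserId)
  }
}
```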
So when we register a new user we first map the email address to a user id and then generate the UserRegistered event. If the email address is already taken, the claim returns the id of the existing user, whose event stream is no longer empty. The event store then reports a conflict, since we always expect the event stream to be empty for a new user:
class UsersController(
  actions: ControllerActions[Users, UserEvent],
  claimEmailAddress: (EmailAddress, UserId) => UserId
) {
  import actions._
  // [... other actions omitted ...]
  def register = CommandAction { state => implicit request =>
    val form = registrationForm.bindFromRequest
    form.fold(
      formWithErrors =>
        abort(BadRequest(views.html.users.register(formWithErrors))),
      registration => {
        val (email, displayName, password) = registration
        val userId = claimEmailAddress(email, UserId.generate)
        Changes(
          StreamRevision.Initial,
          UserRegistered(userId, email, displayName, password): UserEvent)
          .commit(
            onCommit = Redirect(routes.UsersController.registered),
            onConflict = _ => BadRequest(views.html.users.register(form.withGlobalError("duplicate.account"))))
      })
  }
}
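How the claim and the expected stream revision work together can be seen in a small model. The sketch below is an illustration under simplifying assumptions: events are plain strings, revisions are event counts, the Registration class is hypothetical, and nothing here is thread-safe.

```scala
object RegistrationSketch {
  type EmailAddress = String
  type UserId = String

  class Registration {
    private val claimed = scala.collection.mutable.Map.empty[EmailAddress, UserId]
    private val streams = scala.collection.mutable.Map.empty[UserId, Vector[String]]

    // Toy event store: a commit succeeds only when the stream is at the
    // expected revision; it returns the new revision on success.
    private def commit(streamId: UserId, expectedRevision: Int, event: String): Either[String, Int] = {
      val stream = streams.getOrElse(streamId, Vector.empty[String])
      if (stream.size != expectedRevision) Left("conflict")
      else {
        streams(streamId) = stream :+ event
        Right(stream.size + 1)
      }
    }

    def register(email: EmailAddress, newUserId: UserId): Either[String, Int] = {
      // For a taken address this yields the existing user's id, not newUserId.
      val userId = claimed.getOrElseUpdate(email, newUserId)
      // A new user must start from an empty stream (revision 0).
      commit(userId, expectedRevision = 0, event = s"UserRegistered($userId, $email)")
    }
  }
}
```

Registering the same address twice fails on the second attempt because the commit targets the first user's stream, which already contains an event.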
Authors and authorization
When it comes to security, authentication is often the “easy” part, mainly limiting itself to user registration and logging in. Authorization is often much harder, since it affects the entire application. Now that we have the concept of a user we change blog posts and comments to include the user id of the author or commenter. We then implement authorization rules to ensure only the author can delete a blog post, etc. We add the authorization methods to the User trait:
trait User {
  // [... code omitted ...]
  def canAddPost: Boolean = false
  def canAddComment(post: Post): Boolean = false
  def canEditPost(post: Post): Boolean = false
  def canDeletePost(post: Post): Boolean = false
  def canDeleteComment(post: Post, comment: Comment): Boolean = false
  // [... code omitted ...]
}
By default a user is not authorized to do anything, except for a RegisteredUser or a GuestUser:
case object GuestUser extends User {
  // [... other code omitted ...]
  override def canAddComment(post: Post) = true
}
case class RegisteredUser(/* ... code omitted ... */) extends User {
  // [... other code omitted ...]
  override def canAddPost = true
  override def canAddComment(post: Post) = true
  override def canEditPost(post: Post) = post.isAuthoredBy(this)
  override def canDeletePost(post: Post) = post.isAuthoredBy(this)
  override def canDeleteComment(post: Post, comment: Comment) =
    post.isAuthoredBy(this) || comment.isAuthoredBy(this)
}
As you can see, a guest user can only add comments to a post. Registered users can add posts and comments, and can edit or delete their own posts. They can delete a comment if they wrote it themselves or if it was made on one of their own posts.
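These rules are easy to check in isolation. The sketch below uses cut-down Post and Comment classes (assumptions: an author id on the post, an optional commenter id on the comment) and direct id comparisons in place of isAuthoredBy.

```scala
object AuthorizationSketch {
  type UserId = String

  // Simplified stand-ins: a comment by a guest has no commenter id.
  case class Comment(commenter: Option[UserId])
  case class Post(author: UserId)

  sealed trait User {
    // Deny by default, as in the User trait above.
    def canEditPost(post: Post): Boolean = false
    def canDeleteComment(post: Post, comment: Comment): Boolean = false
  }
  case object GuestUser extends User
  case class RegisteredUser(id: UserId) extends User {
    override def canEditPost(post: Post) = post.author == id
    // Either the post's author or the comment's author may delete a comment.
    override def canDeleteComment(post: Post, comment: Comment) =
      post.author == id || comment.commenter.contains(id)
  }
}
```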
Now controllers and views can easily invoke the correct authorization method to see if the user is allowed to see a certain button or to perform a certain action. Here’s an example for editing a post (from PostsController), which uses the AuthenticatedCommandAction to require a logged-in user:
def edit(id: PostId, expected: StreamRevision) = AuthenticatedCommandAction {
  user => posts => implicit request =>
    posts.get(id).filter(user.canEditPost) map { post =>
      // [... code omitted ...]
    } getOrElse {
      abort(notFound)
    }
}
Event authorization
Unfortunately, it is easy to forget to add the correct authorization check to a controller action, since any action is allowed by default. And an attacker only has to find a single mistake to break the security of an application. With event sourcing we can optionally add an additional layer of security: we can check the events being committed against the authorization level of a user. To do this we add one additional method to the User trait:
trait User {
  // [... code omitted ...]
  def authorizeEvent(state: ApplicationState): DomainEvent => Boolean =
    event => false
}
This method takes the current ApplicationState and returns a function that checks if this user is allowed to commit a specific domain event. The default implementation always forbids any change.
For guest users and registered users we override this method. Here’s the code for the guest user:
case object GuestUser extends User {
  // [... code omitted ...]
  override def authorizeEvent(state: ApplicationState) = {
    case event: UserRegistered => true
    case event: UserLoggedIn   => true
    case event: CommentAdded   => state.posts.get(event.postId).exists(this.canAddComment)
    case _                     => false
  }
}
Now we have everything to implement the CommandAction helper method from the MemoryImageActions class:
class MemoryImageActions(memoryImage: MemoryImage[ApplicationState, DomainEvent])
  extends ControllerActions[ApplicationState, DomainEvent] {
  // [... code omitted ...]
  override def CommandAction(block: CommandBlock[AnyContent]) = Action { request =>
    memoryImage.modify { state =>
      val applicationRequest = buildApplicationRequest(request, state)
      val currentUser = applicationRequest.currentUser
      val transaction = block(state)(applicationRequest)
      if (transaction.events.forall(currentUser.authorizeEvent(state))) {
        transaction.withHeaders(currentUser.registered.map(user => "currentUserId" -> user.id.toString).toSeq: _*)
      } else {
        Transaction.abort(notFound(request))
      }
    }
  }
  // [... code omitted ...]
}
As you can see, it first builds the ApplicationRequest, just like the QueryAction method. But it then inspects the transaction returned by the provided block to see if the current user is authorized to commit all the generated events. If so, it adds the current user id as a header to the commit and applies it to the memory image. Otherwise, the transaction is aborted with a 404 Not Found response.
The main advantage of this level of authorization is that events must be explicitly whitelisted. So when new functionality is added it will simply not work until the whitelist is updated. This is the opposite of views and controllers, where new functionality works by default and authorization checks must be explicitly added. A step that is easy to forget!
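The deny-by-default behaviour can be demonstrated with a stripped-down model. In the sketch below the events, the State class and the authorize helper are simplified assumptions; PostDeleted plays the role of a newly added event that nobody has whitelisted yet.

```scala
object EventWhitelistSketch {
  sealed trait DomainEvent
  case class UserRegistered(userId: String) extends DomainEvent
  case class CommentAdded(postId: String) extends DomainEvent
  case class PostDeleted(postId: String) extends DomainEvent

  // Stand-in for ApplicationState: just the ids of existing posts.
  case class State(postIds: Set[String])

  sealed trait User {
    // Deny by default: an event is rejected unless a user type allows it.
    def authorizeEvent(state: State): DomainEvent => Boolean = _ => false
  }
  case object GuestUser extends User {
    override def authorizeEvent(state: State): DomainEvent => Boolean = {
      case _: UserRegistered    => true
      case CommentAdded(postId) => state.postIds.contains(postId)
      case _                    => false
    }
  }

  // Hypothetical counterpart of the check in CommandAction: all events of a
  // transaction must be authorized, otherwise nothing is committed.
  def authorize(user: User, state: State, events: Seq[DomainEvent]): Either[String, Seq[DomainEvent]] =
    if (events.forall(user.authorizeEvent(state))) Right(events)
    else Left("not found")
}
```

Note that a single unauthorized event causes the whole transaction to be rejected, including events that would have been allowed on their own.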
Summary
Adding users to our example application caused some major redesign to manage authentication and authorization. Compared to this, event sourcing was just a minor detail: the same kind of redesign would have been needed if users were stored in a traditional database. Event sourcing does provide an additional layer of security by allowing us to authorize committed events. We can also fully audit all changes, as every commit made by a registered user includes the committing user’s id.
We have now built a fairly complete application. In the next part we’ll start making use of our event sourced model to integrate with the outside world. We’ll do this by reacting to the events as they are committed to the event store.
Footnotes:
1. Actually handling a unique constraint violation can be quite hard, however. Often you’ll need to catch an exception from an ORM framework when the transaction is committed and then try to derive which unique constraint was violated.