Introduction
Sharpino is a little F# event sourcing framework. With Sharpino you can build an event sourced application using a functional approach. Two types of event sourced objects are supported: Contexts and Aggregates. Contexts: There is no specific id for a context so we just assume that a single instance and a single zero (initial state) instance exist for any Context. Aggregates: Many instances of an aggregate can exist at a time so we need an Id for each instance represented by a Guid. The initial state for any aggregate is its initial snapshot in the event store.
Contexts and aggregates will implement transformation members that will return a new object with a new state using the Result data type. We tipically use Railway oriented programming is used to handle errors in any transformation operation (by the FsTookit.ErrorHandling library).
We associate transformation members with events: processing an event will return a new object with a new state by invoking the transformation member associated with the event. Commands are function that given a state of a context/aggregate and some parameters will return a pari of a new state of the context/aggregate and the list of the events that, processed, will return such state.
Commands and events can be easily implemented using Discriminated Union types implementing the Command and the Event interfaces respectively.
This document is a guide to the Sharpino library. It is a work in progress. Focus on the following topics: Contexts, events, commands, service application layer, Event Store.
The sample application 1.
This application will be deprecated as it fails in showing the main features of the library.
Sample application 2. Booking system for seats in a stadium
Contexts represent rows. Some constraints are applied to the rows. The context is responsible for the seats. The seats are associated with the rows.
Sample application 3. Booking system for seats in a stadium
The same as application 2 but where rows are aggregates (so I can have any number of instances of seat rows). The application uses the SAFE based stack based template son can publish the system as a rest service using the Fable Remoting library.
Sample application 6. Pub system
Context
I am representing in this example the Todo entity and a Todo context
This is the Todo entity definition:
type Todo =
{
Id: Guid
CategoryIds : List<Guid>
TagIds: List<Guid>
Description: string
}
type Todos =
{
todos: List<Todo>
}
with
static member Zero =
{
todos = []
}
The Todos context needs also some other members (to handle the stream name, the serialize/deserialize functions, and the snapshots interval...)
The Context will define a Zero static member (initial state). In case we use Fable remoting) then we need to share the definition of the entities between the client and the server side. (Shared project)
Source: TodosModel.fs
Context
A context is a class meant to be event-sourced, i.e. associated with a stream of events. To buld the state of a context using stored events we use the evolve function.
events are associated to members of the cluster that end up in Update/Delete/Remove of some entity
Static members that are mandatory for any cluster of entities are:
-
Zero: the initial state (no events yet).
-
StorageName and Version: this combination uniquely identifies the cluster and lets the storage know in which stream to store events and snapshots. Whatever will be the storage (memory, Postgres, EventstoreDb, etc.) the cluster will be stored in a stream named as the concatenation of the StorageName and the Version (i.e. "_todo_01")
-
SnapshotsInterval: the number of events that can be stored after a snapshot before creating a new snapshot (i.e. the number of events between snapshots).
The Command handler, by the runCommand function, applies a command, then stores the related events and returns the EventStore (database) IDs of those stored events and the KafkaDeliveryResult (if Kafka broker is enabled).
Example of a cluster of entities handling the todos and the categories:
type Todos =
{
todos: Todos
categories: Categories
}
static member Zero =
{
todos = Todos.Zero
categories = Categories.Zero
}
static member StorageName =
"_todo"
static member Version =
"_01"
static member SnapshotsInterval =
15
static member Deserialize (s: string) =
jsonSerializer.Deserialize<Todos> s
member Serialize =
this
|> jsonSerializer.Serialize
In the following example, the TodosContext can check the validity of the categories referenced by any todo before adding it (to preserve the invariant rule that you can add only todo with valid category ID references). It uses the "result" computational expression included in the FsToolkit.ErrorHandling library which supports the railway-oriented programming pattern of handling errors.
Example:
member this.AddTodo (t: Todo) =
let checkCategoryExists (c: Guid ) =
this.categories.GetCategories()
|> List.exists (fun x -> x.Id = c)
|> boolToResult (sprintf "A category with id '%A' does not exist" c)
result
{
let! categoriesMustExist = t.CategoryIds |> catchErrors checkCategoryExists
let! todos = this.todos.AddTodo t
return
{
this with
todos = todos
}
}
Todo: Context.fs
Events
Events are discriminated unions (DU) with cases associated with transformation members.
When we process an event it returns a new state or an error:
type Event<'A> =
abstract member Process: 'A -> Result<'A, string>
The 'A is the generic context or aggregate type, the event is associated with.
This is an example of a concrete implementation of an event related to the TodoCluster members Add and Remove.
So, for example, the TodoAdded event is associated with the AddTodo member. The Process member of the event is implemented by calling the related clusters member.
type TodoEvent =
| TodoAdded of Todo
| TodoRemoved of Guid
interface Event<TodosCluster> with
member this.Process (x: TodosContext ) =
match this with
| TodoAdded (t: Todo) ->
x.AddTodo t
| TodoRemoved (g: Guid) ->
x.RemoveTodo g
Source code: Events.fs
Commands
(this section needs an update as we have a new )
A Command type is a Discriminated Union. Executing the command on a specific context or aggregate means returning a new state and, accordingly, a list of events, or an error. You can also specify "command undoers", that allow you to compensate the effect of a command. An undoer returns a new function that in the future can be executed to return the events that can reverse the effect of the command itself. For example, the "under" of AddTodo is the related RemoveTodo (see next paragraph).
In the following code we can see the signature for any state viewers for any context or aggregate. State viewer corresponds to read models: they will provide the current state of aggregate or context. Typically, that state may come from a cache, from the event store (by processing the events) or from a topic of Kafa (or eventually any other message/event broker, even though I haven't implemented completed any of them yet).
type StateViewer<'A> = unit -> Result<EventId * 'A, string>
type AggregateViewer<'A> = Guid -> Result<EventId * 'A,string>
type Aggregate<'F> =
abstract member Id: Guid // use this one to be able to filter related events from same string
abstract member Serialize: 'F
type Event<'A> =
abstract member Process: 'A -> Result<'A, string>
type Command<'A, 'E when 'E :> Event<'A>> =
abstract member Execute: 'A -> Result<'A * List<'E>, string>
abstract member Undoer: Option<'A -> StateViewer<'A> -> Result<unit -> Result<List<'E>, string>, string>>
type AggregateCommand<'A, 'E when 'E :> Event<'A>> =
abstract member Execute: 'A -> Result<'A * List<'E>, string>
abstract member Undoer: Option<'A -> AggregateViewer<'A> -> Result<unit -> Result<List<'E>, string>, string>>
Example:
type TodoCommand =
| AddTodo of Todo
| RemoveTodo of Guid
interface Command<TodosContext, TodoEvent> with
member this.Execute (x: TodosContext) =
match this with
| AddTodo t ->
match x.AddTodo t with
| Ok _ -> [TodoAdded] |> Ok
| Error x -> x |> Error
| RemoveTodo g ->
match
x.RemoveTodo g with
| Ok _ -> [TodoEvent.TodoRemoved g] |> Ok
| Error x -> x |> Error
member this.Undoer = None
A command may return more than one event:
type TodoCommand =
[...]
| Add2Todos of Todo * Todo
interface Command<TodosCluster, TodoEvent> with
member this.Execute (x: TodosContext) =
[...]
match this with
| Add2Todos (t1, t2) ->
let evolved =
fun () ->
[TodoEvent.TodoAdded t1; TodoEvent.TodoAdded t2]
|> evolveUNforgivingErrors x
match evolved() with
| Ok _ -> [TodoEvent.TodoAdded t1; TodoEvent.TodoAdded t2] |> Ok
| Error x -> x |> Error
member this.Undoer
Any command must ensure that it will return Result.OK (and therefore, one or more events) only if the events to be returned, when processed on the current state, return an Ok result, i.e. a valid state (and no error).
There are two version of the evolve: one tolerates inconsistent events and another one will fail in case just an event will return an error. The way the evens are stored in the event store ensures that no stored event will return inconsistent state when processed. Therefore future releases of the library will probably use by default the unforgiving version of the evolve function.
Undoer
Here is an example of the use of an undoer for a command:
module CartCommands =
type CartCommands =
| AddGood of Guid * int
| RemoveGood of Guid
interface AggregateCommand<Cart, CartEvents> with
member this.Execute (cart: Cart) =
match this with
| AddGood (goodRef, quantity) ->
cart.AddGood (goodRef, quantity)
|> Result.map (fun s -> (s, [GoodAdded (goodRef, quantity)]))
| RemoveGood goodRef ->
cart.RemoveGood goodRef
|> Result.map (fun s -> (s, [GoodRemoved goodRef]))
member this.Undoer =
match this with
| AddGood (goodRef, _) ->
Some
(fun (cart: Cart) (viewer: AggregateViewer<Cart>) ->
result {
let! (i, _) = viewer (cart.Id)
return
fun () ->
result {
let! (j, state) = viewer (cart.Id)
let! isGreater =
(j >= i)
|> Result.ofBool (sprintf "execution undo state '%d' must be after the undo command state '%d'" j i)
let result =
state.RemoveGood goodRef
|> Result.map (fun _ -> [GoodRemoved goodRef])
return! result
}
}
)
| RemoveGood goodRef ->
Some
(fun (cart: Cart) (viewer: AggregateViewer<Cart>) ->
result {
let! (i, state) = viewer (cart.Id)
let! goodQuantity = state.GetGoodAndQuantity goodRef
return
fun () ->
result {
let! (j, state) = viewer (cart.Id)
let! isGreater =
// this check depends also on the number of events generated by the command (i.e. the j >= (i+1) if command generates 2 event)
(j >= i)
|> Result.ofBool (sprintf "execution undo state '%d' must be after the undo command state '%d'" j i)
let result =
state.AddGood (goodRef, goodQuantity)
|> Result.map (fun _ -> [GoodAdded (goodRef, goodQuantity)])
return! result
}
}
)
This is the abstract definition of an of the undoer of an aggregate.
abstract member Undoer: Option<'A -> AggregateViewer<'A> -> Result<unit -> Result<List<'E>, string>, string>>
The meaning is: Extract from the current state of the aggregate useful info for a future "rollback"/"undo" and return a function that, when applied to the current state of the aggregate, will return the events that will "undo" the effect of this command.
You may need undoers if the event store doesn't support multiple stream transactions or if you will use a distributed architecture with many nodes handling different streams of events. By using PostgresSQL as event store you can just set the undoer to None as the event store will handle the cross-streams transactions for us.
Application service layer
An application service layer implements the logic to the outside world. Here is one of the simplest examples of an entry for a service involving a single context, by building and running an AddTag command.
member this.AddTag tag =
result {
let! result =
tag
|> AddTag
return result
}
The service layer sends commands to the Command Handler so that this one can run it producing and storing the related events, returning the EventStore Ids of the stored events.
Running two commands to different clusters
This code removes the tag with any reference to it. It builds two commands and makes the repository process them at the same time. This code removes a tag and any reference to it.
member this.removeTag id =
ResultCE.result {
let removeTag = TagCommand.RemoveTag id
let removeTagRef = TodoCommand.RemoveTagRef id
return! runTwoCommands<Tag, Todo, TagEvent, TodoEvent> eventStore eventBroker removeTag removeTagRef
}
Source: App.fs
Command Handler
The Command Handler has the responsibility of:
- getting the state
- trying to run commands passed, and eventually storing the related events.
- making periodic snapshots, according to the SnapshotsInterval.
There are two different command handler implementations:
EventStore (database)
An event-store, stores and retrieves events and snapshots related to each single context.
An example of a storage implementation in Postgres: PgEventStore.fs
Refactoring Aggregates
If you need to change an aggregate (or context) for new requirements we need to refactor the aggregate.
The technique is based on being able to create an aggregate that mimics the old one and an upcast function from the old one to the new one.
Beside this, you may do a bulk upcast of all the existing aggregates by making a snapthot of all of them. The new snapthot will use the new aggregate format so the definition of the old aggregate will be unnecessary after the bulk upcast and resnapshot.
You will create the shadow aggregate that mimic the old one in the same module of the current one, providing that the module is defined as rec.
The mechanism is based on handling the failure of the Deserialization by using as fallback the Deserialization using the old aggregate and then the upcast function to the new one.
Testing
As I mentioned in the previous chapter we may have different versions of the same application based on different clusters.
We may want to test all of them and also we may want to test the migration function from one version to another in those tests. A structure of a parametric test that considers various possible combinations of application versions and migration functions is the following:
multipleTestCase "test something " versions <| fun (ap, apUpgd, migrator) ->
let _ = ap._reset()
// test something on ap, which is the "initial version"
// of the application
.....
// then migrate from "ap" version to apUpgd version
// using its migrator function
let migrated = migrator()
Expect.isOk migrated "should be ok"
// test something on apUpgd, which is the "upgraded version" of the application
Apache Kafka
We will see later if and when an event broker is needed.
Anyway the mechanism should be: after the event is stored in the event store, it will be published to a queue/topic/message bus.
The command handler will still use the event store to get the state of the context and to store the events.
An application layer may read the state of some aggregate/context by listening the events.