Introduction

Sharpino is a little F# event sourcing framework. It deals with two kinds of event-sourced objects:

  • Contexts: objects that can be event sourced. There is no specific id for a context, so we just assume that a single instance and a single zero (initial state) instance exist for any context.

  • Aggregates: objects that can be event sourced. Many instances of an aggregate can exist at a time, so we need an id for each instance, represented by a Guid. The initial state of any aggregate ends up as an initial snapshot in the event store.

Contexts and aggregates need to define members that perform Add/Remove/Update operations functionally, using the Result type to handle errors. We typically handle errors in any Add/Remove/Update operation with railway-oriented programming (via the FsToolkit.ErrorHandling library).

Contexts and aggregates need to specify event types associated with their Add/Remove/Update operations. Event types need associated command types. Commands and events must be Discriminated Union types implementing the Command and the Event interfaces respectively.

This document is a guide to the Sharpino library. It is a work in progress and focuses on the following topics: contexts, events, commands, the application service layer, and the event store.

Sample application 1.

This sample is convoluted and represents unlikely situations, such as complex relations and interactions between different aggregates/contexts, so I would suggest skipping this part. The interesting proposal in this example is the refactoring technique: how to move from one distribution of concerns between contexts/aggregates to another, and how to test the migration of a context/aggregate. The idea is to create a new version of the context/aggregate and, at the same time, build a migration function.

Each todo contains references to Tags and Categories. I'll show how we can handle two versions of the same application that use different cluster configurations, which will help illustrate context refactoring.

The "version 1" has a context managing Todo and Categories, and another context managing Tags.

The "version 2" of the application consists of three different contexts: one for the Todo model, one for the Categories model, and another one for the Tags model.

Sample application 2. Booking system for seats in a stadium

Contexts represent rows, and some constraints are applied to the rows. The context is responsible for the seats, which are associated with the rows.

Sample application 4. Booking system for seats in a stadium

The same as application 2, but rows are aggregates (so there can be any number of seat-row instances). The application uses the SAFE stack template, so it can publish the system as a REST service using the Fable Remoting library.

Entities

In this example I am representing the Todo entity and a Todos context.

This is the Todo entity definition:

    type Todo =
        {
            Id: Guid
            CategoryIds : List<Guid>
            TagIds: List<Guid>
            Description: string
        }
    type Todos =
        {
            todos: List<Todo>
        }
        with
            static member Zero =
                {
                    todos = []
                }

The Todos context also needs some other members (to handle the stream name, the serialize/deserialize functions, the snapshot interval, and so on).

The context will define a Zero static member (the initial state). If we use Fable Remoting, then we need to share the definition of the entities between the client and the server side (the Shared project).
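For reference, the Todos model above can also expose functional Add/Remove members that return a Result, in the railway-oriented style mentioned earlier. A minimal sketch (the member bodies are my assumption, not necessarily the actual TodosModel.fs code):

    // hypothetical augmentation of the Todos type shown above:
    // Add/Remove operations return a Result instead of throwing
    type Todos with
        member this.AddTodo (t: Todo) =
            if this.todos |> List.exists (fun x -> x.Id = t.Id) then
                Error (sprintf "a todo with id '%A' already exists" t.Id)
            else
                Ok { this with todos = t :: this.todos }
        member this.RemoveTodo (id: Guid) =
            if this.todos |> List.exists (fun x -> x.Id = id) then
                Ok { this with todos = this.todos |> List.filter (fun x -> x.Id <> id) }
            else
                Error (sprintf "a todo with id '%A' does not exist" id)

Members like these are what the context-level operations shown later delegate to (e.g. this.todos.AddTodo).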

Source: TodosModel.fs

Context

A context is a class meant to be event-sourced, i.e. associated with a stream of events. To build the state of a context from the stored events we use the evolve function.

Events are associated with members of the cluster that end up adding, updating, or removing some entity.

Static members that are mandatory for any cluster of entities are:

  • Zero: the initial state (no events yet).

  • StorageName and Version: this combination uniquely identifies the cluster and lets the storage know in which stream to store events and snapshots. Whatever the storage is (in-memory, Postgres, EventStoreDb, etc.), the cluster will be stored in a stream named as the concatenation of the StorageName and the Version (e.g. "_todo_01").

  • SnapshotsInterval: the number of events that can be stored after a snapshot before creating a new snapshot (i.e. the number of events between snapshots).

The command handler, via the runCommand function, applies a command, stores the related events, and returns the EventStore (database) ids of those stored events and the KafkaDeliveryResult (if a Kafka broker is enabled).

Example of a cluster of entities handling the todos and the categories:

    type TodosCluster =
        {
            todos: Todos
            categories: Categories
        }
        static member Zero =
            {
                todos = Todos.Zero
                categories = Categories.Zero
            }
        static member StorageName =
            "_todo"
        static member Version =
            "_01"
        static member SnapshotsInterval =
            15

In the following example, the TodosCluster can check the validity of the categories referenced by any todo before adding it (to preserve the invariant that you can only add a todo with valid category id references). It uses the "result" computation expression from the FsToolkit.ErrorHandling library, which supports the railway-oriented programming style of handling errors.

Example:

    member this.AddTodo (t: Todo) =
        let checkCategoryExists (c: Guid ) =
            this.categories.GetCategories() 
            |> List.exists (fun x -> x.Id = c) 
            |> boolToResult (sprintf "A category with id '%A' does not exist" c)

        result
            {
                let! categoriesMustExist = t.CategoryIds |> catchErrors checkCategoryExists
                let! todos = this.todos.AddTodo t
                return 
                    {
                        this with
                            todos = todos
                    }
            }

Source: Context.fs

Events

Events are discriminated unions (DUs) with cases associated with members of the context that end up adding, updating, or deleting entities.

When we process an event it returns a new state or an error:

    type Event<'A> =
        abstract member Process: 'A -> Result<'A, string>

The 'A is the generic context or aggregate type the event is associated with.

This is an example of a concrete implementation of events related to the TodosCluster members AddTodo and RemoveTodo.

So, for example, the TodoAdded event is associated with the AddTodo member. The Process member of the event is implemented by calling the related cluster member.

    type TodoEvent =
        | TodoAdded of Todo
        | TodoRemoved of Guid
            interface Event<TodosCluster> with
                member this.Process (x: TodosCluster) =
                    match this with
                    | TodoAdded (t: Todo) -> 
                        x.AddTodo t
                    | TodoRemoved (g: Guid) -> 
                        x.RemoveTodo g

Source code: Events.fs

Commands


A Command type is a Discriminated Union. Executing a command on a specific cluster or aggregate means returning the new state and a proper list of events, or an error. You can also specify "command undoers", which allow you to compensate for the effect of a command in case it is part of a multiple-stream transaction that fails, as we will see later. An undoer issues the events that can reverse the effect of the related command. For example, the undoer of AddTodo is the related RemoveTodo (see the next paragraph).

In the following code we can see the signatures of the state viewers for any context or aggregate. State viewers correspond to read models: they provide the current state of an aggregate or context. Typically, that state may come from a cache, from the event store (by processing the events), or from a Kafka topic (or eventually any other message/event broker, even though I haven't completed any of those implementations yet).


    type StateViewer<'A> = unit -> Result<EventId * 'A, string>
    type AggregateViewer<'A> = Guid -> Result<EventId * 'A,string>
    
    type Aggregate<'F> =
        abstract member Id: Guid // use this one to be able to filter related events from the same stream
        abstract member Serialize: 'F
    
    type Event<'A> =
        abstract member Process: 'A -> Result<'A, string>

    type Command<'A, 'E when 'E :> Event<'A>> =
        abstract member Execute: 'A -> Result<'A * List<'E>, string>
        abstract member Undoer: Option<'A -> StateViewer<'A> -> Result<unit -> Result<List<'E>, string>, string>>
        
    type AggregateCommand<'A, 'E when 'E :> Event<'A>> =
        abstract member Execute: 'A -> Result<'A * List<'E>, string>
        abstract member Undoer: Option<'A -> AggregateViewer<'A> -> Result<unit -> Result<List<'E>, string>, string>>
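As an illustration of the StateViewer signature, here is a minimal sketch of a state viewer that rebuilds the state by folding an in-memory list of already stored events (it assumes EventId is an integer alias; it is not the library's implementation):

    // hypothetical in-memory state viewer: folds the stored events over the
    // zero state, keeping track of the id of the last processed event
    let inMemoryStateViewer<'A> (zero: 'A) (getEvents: unit -> List<int * Event<'A>>) : StateViewer<'A> =
        fun () ->
            getEvents ()
            |> List.fold
                (fun acc (eventId, event) ->
                    acc
                    |> Result.bind (fun (_, state) ->
                        event.Process state
                        |> Result.map (fun newState -> (eventId, newState))))
                (Ok (0, zero))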


Example of a command implementation:

    type TodoCommand =
        | AddTodo of Todo
        | RemoveTodo of Guid

        interface Command<TodosCluster, TodoEvent> with
            member this.Execute (x: TodosCluster) =
                match this with
                | AddTodo t ->
                    x.AddTodo t
                    |> Result.map (fun newState -> (newState, [TodoEvent.TodoAdded t]))
                | RemoveTodo g ->
                    x.RemoveTodo g
                    |> Result.map (fun newState -> (newState, [TodoEvent.TodoRemoved g]))
            member this.Undoer = None

A command may return more than one event:

    type TodoCommand =
        [...]
        | Add2Todos of Todo * Todo
        interface Command<TodosCluster, TodoEvent> with
            member this.Execute (x: TodosCluster) =
                [...]
                match this with
                | Add2Todos (t1, t2) ->
                    let events = [TodoEvent.TodoAdded t1; TodoEvent.TodoAdded t2]
                    // check that the events, processed on the current state, produce a valid state
                    events
                    |> evolveUNforgivingErrors x
                    |> Result.map (fun newState -> (newState, events))
            member this.Undoer = None

Any command must ensure that it returns Result.Ok (and therefore the new state and one or more events) only if the events to be returned, when processed on the current state, produce an Ok result, i.e. a valid state (and no error).

The evolve function tolerates inconsistent events: it just skips events that, when processed, return an error. This feature is associated with a specific permissive optimistic lock type that we will see later.
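As an illustration, an error-tolerant evolve can be sketched as a simple fold over the events (a minimal sketch, not Sharpino's actual implementation):

    // minimal sketch of an error-tolerant evolve: events whose Process
    // fails are skipped and the previous state is kept
    let evolveTolerant<'A> (initialState: 'A) (events: List<Event<'A>>) : 'A =
        events
        |> List.fold
            (fun state event ->
                match event.Process state with
                | Ok newState -> newState // apply the event
                | Error _ -> state)       // skip the inconsistent event
            initialState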

Undoer

The use of lambda expressions is a nice trick for the undoers: the undoer is returned as a lambda that retrieves the context needed to apply the undo and returns another lambda that can actually "undo" the command.

I haven't simplified any example of an undoer yet, but this is the idea, related to commands for adding and removing goods from a cart:

    module CartCommands =
        type CartCommands =
        | AddGood of Guid * int
        | RemoveGood of Guid
            interface AggregateCommand<Cart, CartEvents> with
                member this.Execute (cart: Cart) =
                    match this with
                    | AddGood (goodRef, quantity) ->
                        cart.AddGood (goodRef, quantity)
                        |> Result.map (fun s -> (s, [GoodAdded (goodRef, quantity)]))
                    | RemoveGood goodRef ->
                        cart.RemoveGood goodRef
                        |> Result.map (fun s -> (s, [GoodRemoved goodRef]))
                member this.Undoer =
                    match this with
                    | AddGood (goodRef, _) ->
                        Some
                            (fun (cart: Cart) (viewer: AggregateViewer<Cart>) ->
                                result {
                                    let! (i, _) = viewer (cart.Id)
                                    return
                                        fun () ->
                                            result {
                                                let! (j, state) = viewer (cart.Id)
                                                let! isGreater =
                                                    (j >= i)
                                                    |> Result.ofBool (sprintf "execution undo state '%d' must be after the undo command state '%d'" j i)
                                                let result =
                                                    state.RemoveGood goodRef
                                                    |> Result.map (fun _ -> [GoodRemoved goodRef])
                                                return! result
                                            }
                                    }
                            )
                    | RemoveGood goodRef ->
                        Some
                            (fun (cart: Cart) (viewer: AggregateViewer<Cart>) ->
                                result {
                                    let! (i, state) = viewer (cart.Id)
                                    let! goodQuantity = state.GetGoodAndQuantity goodRef
                                    return
                                        fun () ->
                                            result {
                                                let! (j, state) = viewer (cart.Id)
                                                let! isGreater =
                                                    // this check also depends on the number of events generated by the command (i.e. j >= (i + 1) if the command generates 2 events)
                                                    (j >= i)
                                                    |> Result.ofBool (sprintf "execution undo state '%d' must be after the undo command state '%d'" j i)
                                                let result =
                                                    state.AddGood (goodRef, goodQuantity)
                                                    |> Result.map (fun _ -> [GoodAdded (goodRef, goodQuantity)])
                                                return! result
                                            }
                                    }
                            )

To follow the example, it is probably worth reading the definition of an undoer again:

        abstract member Undoer: Option<'A -> AggregateViewer<'A> -> Result<unit -> Result<List<'E>, string>, string>>

This could be simplified, since the state of the aggregate can be accessed in different ways; however, this is the meaning:

Extract from the current state of the aggregate useful info for a future "rollback"/"undo" and return a function that, when applied to the current state of the aggregate, will return the events that will "undo" the effect of the command.

Saga-like transaction handling will probably need this logic. However, most of the time commands spanning multiple aggregates use db transactions (as in Postgres) and the undoer is not needed.

Commands.fs

Application service layer

An application service layer implements logic involving multiple contexts. Here is one of the simplest examples of a service entry involving a single context: it builds and runs an AddTag command.

        member this.AddTag tag =
            result {
                let! result =
                    tag
                    |> AddTag
                    |> runCommand<TagsContext, TagEvent> storage eventBroker tagsStateViewer
                return result 
            }

As in the previous example, the service layer sends commands to the command handler, which runs them, produces and stores the related events, and returns the EventStore ids of the stored events and the KafkaDeliveryResult (if a Kafka broker is enabled).

We may adopt strong and immediate consistency by enforcing the use of a pessimistic lock on the command handler. An example of explicit use of a lock:

    member this.addTodo todo =
        lock TagsCluster.LockObj <| fun () ->
            result {
                let! (_, tagState) = getState<TagsCluster, TagEvent>(storage)
                let tagIds = tagState.GetTags() |>> (fun x -> x.Id)

                let! tagIdIsValid =    
                    (todo.TagIds.IsEmpty ||
                    todo.TagIds |> List.forall (fun x -> (tagIds |> List.contains x)))
                    |> boolToResult "A tag reference contained in the todo is related to a tag that does not exist"

                return! 
                    todo
                    |> TodoCommand.AddTodo
                    |> (runCommand<TodosCluster, TodoEvent> storage)
            }

The todo can be added only if it contains valid tag references.

Running two commands to different clusters

This code builds two commands and makes the repository process them at the same time: it removes a tag along with any reference to it.

    member this.removeTag id =
        ResultCE.result {
            let removeTag = TagCommand.RemoveTag id
            let removeTagRef = TodoCommand.RemoveTagRef id
            return! runTwoCommands<TagsCluster, TodosCluster, TagEvent, TodoEvent> storage removeTag removeTagRef
        }

runTwoCommands uses the undoers if the current event store requires it (i.e. if it lacks multiple-stream transactions).
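Conceptually, the compensation works like this (a hypothetical sketch, not the actual runTwoCommands implementation; the prepareUndo1, runCommand1 and runCommand2 parameters stand in for the real command-handler plumbing):

    // sketch of undoer-based compensation across two streams: if the second
    // command fails, the undo events of the first command are stored
    let runTwoCommandsSketch
        (prepareUndo1: unit -> Result<unit -> Result<unit, string>, string>)
        (runCommand1: unit -> Result<unit, string>)
        (runCommand2: unit -> Result<unit, string>) =
        result {
            let! undo1 = prepareUndo1 ()   // capture what is needed to undo command 1
            do! runCommand1 ()             // run and store command 1
            match runCommand2 () with      // run and store command 2
            | Ok () -> return ()
            | Error e ->
                do! undo1 ()               // command 2 failed: store the undo events of command 1
                return! Error e            // and propagate the failure
        }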

Source: App.fs

Command Handler

The Command Handler has the following responsibilities (a conceptual sketch follows the list):

  • getting the current state
  • trying to run the commands passed to it and, if they succeed, storing the related events
  • making periodic snapshots, according to the SnapshotsInterval.
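A conceptual sketch of these steps for a single context (this is an assumption about the flow, not the actual CommandHandler.fs code; getState, storeEvents and makeSnapshotIfNeeded are placeholder parameters, and the Kafka notification step is omitted):

    // hypothetical outline of what running a command against one context involves
    let runCommandSketch<'A, 'E when 'E :> Event<'A>>
        (getState: StateViewer<'A>)
        (storeEvents: List<'E> -> Result<List<int>, string>)
        (makeSnapshotIfNeeded: 'A -> Result<unit, string>)
        (command: Command<'A, 'E>) =
        result {
            let! (_, state) = getState ()                    // 1. get the current state
            let! (newState, events) = command.Execute state  // 2. try to run the command
            let! eventIds = storeEvents events               // 3. store the related events
            do! makeSnapshotIfNeeded newState                // 4. periodic snapshot, per SnapshotsInterval
            return eventIds                                  // ids of the stored events
        }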

There are two different command handler implementations. The main one is:

CommandHandler.fs

There is also an experimental one, based on a publish/subscribe storage model (EventStoreDb). See lightrepository:

LightCommandHandler.fs

Note: from version 1.4.7 the command handler can use a Kafka broker to retrieve the current state. That means the command handler can be used in a distributed environment, where the state is stored in a Kafka topic. Moreover, it can still rely on the EventStoreDb to rebuild the current state in case the Kafka broker fails (for example, when events are out of sync).

EventStore (database)

An event store stores and retrieves the events and snapshots related to each single context.

An example of a storage implementation in Postgres: DbStorage.fs

EventStoreDb:

The alternative storage is the EventStoreBridge, which is a bridge to the EventStore database. It is still experimental. EventStoreStorage.fs

Refactoring strategy

By cluster refactoring I mean moving models (collections of entities) among contexts.

Here I am showing a strategy for refactoring in terms of:

  • moving the model's ownership between contexts,
  • introducing new contexts,
  • upgrading old contexts,
  • dropping contexts.

The problem arises because it looks overcomplicated to make upfront decisions about contexts. Consider that an application may start, for simplicity, with a single context and then, at a later stage, we may want to split it into multiple contexts.

Refactoring leaves the application service layer behavior unchanged.

The steps that may be followed are:

  • defining new contexts and, if needed, creating upgraded versions of current contexts,
  • moving collections of entities from one context to another,
  • creating an upgraded version of the application service layer using the new versions,
  • applying the equivalent tests of the previous service layer to the new one.

About this last point, a parametric testing strategy is also possible.

Here is an example of a list of triples combining application version configurations with migration functions:

    let allVersions =
        [
            (applicationPostgresStorage,        applicationPostgresStorage,       fun () -> () |> Result.Ok)
            (applicationShadowPostgresStorage,  applicationShadowPostgresStorage, fun () -> () |> Result.Ok)
            (applicationPostgresStorage,        applicationShadowPostgresStorage, applicationPostgresStorage._migrator.Value)

            (applicationMemoryStorage,          applicationMemoryStorage,         fun () -> () |> Result.Ok)
            (applicationShadowMemoryStorage,    applicationShadowMemoryStorage,   fun () -> () |> Result.Ok)
            (applicationMemoryStorage,          applicationShadowMemoryStorage,   applicationMemoryStorage._migrator.Value)
        ]

There are specific attributes to distinguish current and "upgrading" versions of elements of the application.

A migration function is needed to extract data from the current version and store it in the upgraded version.
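For instance, a migration function could read the relevant state from the current version of the application and replay it into the upgraded one. A hypothetical sketch (CurrentApp, UpgradedApp, GetAllTodos and AddTodo are placeholder names, not the actual sample API):

    // hypothetical migration: copy the todos from the current app version
    // into the upgraded one, stopping at the first error
    let migrate (current: CurrentApp) (upgraded: UpgradedApp) : Result<unit, string> =
        result {
            let! todos = current.GetAllTodos ()
            let! _ =
                todos
                |> List.traverseResultM (fun todo -> upgraded.AddTodo todo)
            return ()
        }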

Code here: MultiVersionsTests.fs

Testing

As I mentioned in the previous chapter we may have different versions of the same application based on different clusters.

We may want to test all of them, and we may also want to test the migration function from one version to another. The structure of a parametric test that covers various combinations of application versions and migration functions is the following:


        multipleTestCase "test something " versions <| fun (ap, apUpgd, migrator)  ->
            let _ = ap._reset()

            // test something on ap, which is the "initial version" 
            // of the application
            .....

            // then migrate from "ap" version to apUpgd version 
            // using its migrator function

            let migrated = migrator()
            Expect.isOk migrated "should be ok"

            // test something on apUpgd, which is the "upgraded version" of the application

In some cases, you just want to run your tests fast and you don't want to test everything against all the possible combinations of application versions.

Then you may just skip some of the versions set up in the "versions" triples (initversion, endversion, migrator) by temporarily commenting out the ones you don't want to spend time testing at the moment:

    let allVersions =
        [

            // (AppVersions.currentPostgresApp,        AppVersions.currentPostgresApp,     fun () -> () |> Result.Ok)
            // (AppVersions.upgradedPostgresApp,       AppVersions.upgradedPostgresApp,    fun () -> () |> Result.Ok)
            // (AppVersions.currentPostgresApp,        AppVersions.upgradedPostgresApp,    AppVersions.currentPostgresApp._migrator.Value)

            (AppVersions.currentMemoryApp,          AppVersions.currentMemoryApp,       fun () -> () |> Result.Ok)
            // (AppVersions.upgradedMemoryApp,         AppVersions.upgradedMemoryApp,      fun () -> () |> Result.Ok)
            // (AppVersions.currentMemoryApp,          AppVersions.upgradedMemoryApp,      AppVersions.currentMemoryApp._migrator.Value)

            // (AppVersions.evSApp,                    AppVersions.evSApp,                 fun () -> () |> Result.Ok)
        ]

The above code enables only the tests of the current version of the app that uses the in-memory storage.

Apache Kafka

The Apache Kafka development is still in progress.

We can use Apache Kafka to notify events after storing them, in the style of the outbox pattern (without a db). Examples of usage are here. Some tests are instrumented so that they can optionally listen to Apache Kafka; see here. Any "view" node may build its own view of the cluster state by listening to the events. KafkaStateViewer and KafkaAggregateStateViewer are able to build the current state by subscribing to events on specific topics and specific partitions of topics.

The command handler is able to use read models given by the KafkaStateViewer. A Kafka-based state viewer can detect anomalies by checking the progressive id of each event. In case of such an anomaly, the state viewer can access the event store to rebuild the state.

This part is still under development and needs to be optimized.