Introduction

Sharpino is a little F# event sourcing framework. Two types of event sourced objects are supported: Contexts and Aggregates. Contexts: There is no specific id for a context so we just assume that a single instance and a single zero (initial state) instance exist for any Context.

Note: "Contexts" will no longer be used. A plain aggregate with a specific constant Id can replace a context (as in example 15). From here on, only "aggregates" (event sourced objects with an Id) will be mentioned.

Aggregates: Many instances of an aggregate type can exist at a time so we need an Id for each instance represented by a Guid. The initial state for any aggregate is its initial snapshot in the event store.

An aggregate implements at least one transformation member that returns a new instance of it or an error (using the Result data type), i.e. it returns Result<A, string>. An aggregate must also specify, as static members, how to serialize and deserialize it, the stream name, and the snapshot interval. Examples of aggregates are given in the sample applications. An aggregate must implement the Aggregate interface based on a generic type that can be string or byte[], depending on the serialization method used. Json serialization will use string while binary serialization will use byte[].
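As a rough illustration of the description above, the following self-contained sketch defines a tiny aggregate. The `Aggregate<'F>` interface is re-declared locally so that the snippet runs on its own (in a real application it comes from the library); the `Counter` type, its members, its limit of 100, and the use of `System.Text.Json` are assumptions made for this example only.

```fsharp
open System
open System.Text.Json

// Local stand-ins for the library definitions, so the sketch runs on its own.
type AggregateId = Guid

type Aggregate<'F> =
    abstract member Id: AggregateId
    abstract member Serialize: 'F

// Hypothetical aggregate: a named counter that can never exceed 100.
type Counter =
    { Id: Guid
      Name: string
      Value: int }

    // Transformation member: returns a new instance or an error.
    member this.Increment (amount: int) : Result<Counter, string> =
        if amount <= 0 then Error "amount must be positive"
        elif this.Value + amount > 100 then Error "limit of 100 exceeded"
        else Ok { this with Value = this.Value + amount }

    // Static members identifying the stream and the snapshot policy.
    static member StorageName = "_counter"
    static member Version = "_01"
    static member SnapshotsInterval = 15

    // Json serialization, so the generic serialization type is string.
    static member Deserialize (json: string) : Result<Counter, string> =
        try
            JsonSerializer.Deserialize<Counter>(json) |> Ok
        with ex ->
            Error ex.Message

    interface Aggregate<string> with
        member this.Id = this.Id
        member this.Serialize = JsonSerializer.Serialize(this)
```

A binary variant would implement `Aggregate<byte[]>` instead and take a `byte[]` in `Deserialize`.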

Transformative members can be associated with events that, when processed, will return a new instance of the aggregate with a new state. Commands are functions that given the current state of an aggregate and some parameters will return a pair of a new state of the aggregate and the list of events that, processed, will return such state.
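The relationship between transformation members, events, and commands can be sketched as follows. This is a minimal, library-free illustration: the `Event` interface is re-declared locally, and the `Account` type, the `withdraw` command, and the `replay` helper are hypothetical names introduced only for the example.

```fsharp
// Local stand-in mirroring the Event interface described in this guide.
type Event<'A> =
    abstract member Process: 'A -> Result<'A, string>

// Hypothetical aggregate state: a simple balance.
type Account =
    { Balance: int }
    member this.Deposit (amount: int) : Result<Account, string> =
        if amount > 0 then Ok { this with Balance = this.Balance + amount }
        else Error "amount must be positive"
    member this.Withdraw (amount: int) : Result<Account, string> =
        if amount <= 0 then Error "amount must be positive"
        elif amount > this.Balance then Error "insufficient funds"
        else Ok { this with Balance = this.Balance - amount }

// Each event case is tied to one transformation member.
type AccountEvent =
    | Deposited of int
    | Withdrawn of int
    interface Event<Account> with
        member this.Process (account: Account) =
            match this with
            | Deposited amount -> account.Deposit amount
            | Withdrawn amount -> account.Withdraw amount

// A command as a plain function: parameters + current state -> (new state, events).
let withdraw (amount: int) (account: Account) : Result<Account * AccountEvent list, string> =
    account.Withdraw amount
    |> Result.map (fun newState -> newState, [ Withdrawn amount ])

// Processing the returned events on the starting state must lead to the same new state.
let replay (events: AccountEvent list) (start: Account) : Result<Account, string> =
    events
    |> List.fold
        (fun state ev -> state |> Result.bind (fun s -> (ev :> Event<Account>).Process s))
        (Ok start)
```

The property worth keeping in mind is that `replay events start` gives back the state returned by the command; this is what allows the state to be rebuilt from the stored events alone.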

Note: Beware that there is a specific command type for aggregates, called AggregateCommand. See the definition in the source code.

Commands and events can be easily implemented using Discriminated Union types implementing the Command and the Event interfaces respectively.

Note: events must be serializable and deserializable. Those functions use a generic serialization type (string or byte[]) in the same way as for aggregates.

Important note: many examples use the cross-aggregate transaction feature that has some advantages by allowing bidirectional references between aggregates making the navigation easier from any side. However, by adopting an in memory materialized view approach (see example 15) the need for cross-aggregate transactions is greatly reduced as the "navigation" can be done using the materialized view (or detailed view).

This document is a small guide to the Sharpino library (note: I cannot ensure that this is always up to date, read the examples starting from the end). Focus on the following topics:

The sample application 1.

This application will be deprecated as it fails to show the main features of the library.

Sample application 2. Booking system for seats in a stadium

Contexts represent rows. Some constraints are applied to the rows. The context is responsible for the seats. The seats are associated with the rows. Note: there is a better way to implement this application using aggregates for the rows (see application 3).

Sample application 3. Booking system for seats in a stadium

The same as application 2 but where rows are aggregates (so I can have any number of instances of seat rows).

Sample application 6. Pub system

Objects: Dishes, Ingredients, and Suppliers.

Sample application 7. Shopping cart

Objects: Shopping Cart, Goods, and a container with references to the existing goods. Note: the container ("context") can be ditched. The application has two different versions: one using binary serialization and another using text/json serialization.

Sample application 8. Tycoon Transport

Partial implementation of the problem described here: Transport Tycoon. Note: this will be revised by moving from bidirectional references to unidirectional references and materialized views/details.

Sample application 9. Classes, Teachers, Students, Reservations, Items

Classes, teachers, students. Introducing the problem of course creation and cancellation fees, showing transactions on multiple objects (i.e. course, students, teachers, balance...). An experimental feature is related to "cross aggregates constraints" passed as lambda expressions to a command: a command may query the state of objects that are not directly related to the command.

Sample application 10. Multiple commands of any type

This example shows a way to execute multiple commands of any type in a single transaction. Note: it is not advised to abuse this feature, as a proper design using transactions scoped to a single aggregate type is better and safer. However, in some cases this may be useful and is a fair alternative to the use of "sagas" or "process managers" or "orchestrators" or "compensators" (which are supported anyway on the command side).

Sample application 11. Students and Courses. Some performance measurements

The examples check the performance by creating a massive number of students and courses. If RabbitMQ is installed and running, it is possible to use it as the event bus to decouple the event store from the read model update process.

To check with RabbitMQ just run with the following command line: dotnet run --configuration:RabbitMQ

Sample application 12. Use of binary serialization

The serialization library provided with the framework is based on FsPickler and supports both binary and json (even though any library can be used on the application side). This time binary is used.

Sample application 13. Reservation pattern

In theory, constraints can be violated by concurrent commands despite the use of optimistic concurrency control. To guard against this possibility, a reservation pattern can be used.

Sample application 14.

Using FSharp.SystemTextJson for serialization/deserialization instead of the built in json serializer. Using type Id for any aggregate instead of Guid (primitive obsession). By facing primitive obsession it is possible to get better type safety and avoid errors due to wrong Ids usage.
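A minimal sketch of the typed-Id idea is shown below. It is only an illustration: the single-case union shape of `StudentId` and `CourseId`, the `Id` property that exposes the underlying Guid, and the `describeEnrollment` function are assumptions made for this example and may differ from the definitions used in the sample application.

```fsharp
open System

// Hypothetical typed identifiers: one single-case union per aggregate type.
type StudentId =
    | StudentId of Guid
    // Exposes the underlying Guid for APIs that still expect a raw Guid.
    member this.Id =
        let (StudentId guid) = this
        guid

type CourseId =
    | CourseId of Guid
    member this.Id =
        let (CourseId guid) = this
        guid

// With raw Guids the two arguments could be swapped silently;
// with typed ids, swapping them is a compile-time error.
let describeEnrollment (studentId: StudentId) (courseId: CourseId) : string =
    sprintf "student %O enrolled in course %O" studentId.Id courseId.Id
```

For instance, `describeEnrollment courseId studentId` would be rejected by the compiler, whereas the same mistake with two plain Guid parameters would go unnoticed.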

Sample application 15.

Use of details (materialized views) instead of cross-aggregate transactions. Using refreshable details to make the details able to be cached and refreshed when needed, i.e. when any event related to the streams that it depends on is committed.
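The idea of a refreshable detail can be sketched without the library as follows. Everything here is hypothetical and much simpler than the cache used in the example: the `Student`, `Course`, and `StudentDetails` records, the `RefreshableDetails` wrapper, and the `studentDetails` builder are names invented for this illustration.

```fsharp
open System

// Hypothetical aggregates with unidirectional references: only the course knows its students.
type Student = { StudentId: Guid; Name: string }
type Course = { CourseId: Guid; Title: string; Enrolled: Guid list }

// Read-only view that joins a student with the courses referring to it.
type StudentDetails = { Student: Student; Courses: Course list }

// Caches the result of `build` until it is invalidated.
type RefreshableDetails<'D> (build: unit -> Result<'D, string>) =
    let mutable cached: Result<'D, string> option = None

    member this.Value =
        match cached with
        | Some value -> value
        | None ->
            let value = build ()
            cached <- Some value
            value

    // To be called when an event is committed on a stream the view depends on.
    member this.Invalidate () = cached <- None

// Builds the view from two viewers instead of navigating bidirectional references.
let studentDetails
    (getStudent: Guid -> Result<Student, string>)
    (getCourses: unit -> Result<Course list, string>)
    (studentId: Guid) =
    RefreshableDetails (fun () ->
        getStudent studentId
        |> Result.bind (fun student ->
            getCourses ()
            |> Result.map (fun courses ->
                { Student = student
                  Courses = courses |> List.filter (fun c -> List.contains studentId c.Enrolled) })))
```

The design choice illustrated here is that the "navigation" from a student to its courses lives in the view, so the Student aggregate does not need to store course references and no cross-aggregate transaction is required to keep both sides aligned.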

General note: more often than not, the examples can be executed using RabbitMQ message sending. Check the .fsproj files: if the RabbitMQ configuration is present, it is possible to run the example using RabbitMQ as the event bus with the command line below: dotnet run --configuration:RabbitMQ

Contexts

This content is deprecated: instead of contexts use aggregates (see sample applications 2 and 3).

A context is a class meant to be event-sourced, i.e., associated with a stream of events. To build the state of a context using stored events, we use the evolve function.

Events are associated with members of the cluster that end up in the Update/Delete/Remove of some entity.

Static members that are mandatory for any cluster of entities are:

  • Zero: the initial state (no events yet).

  • StorageName and Version: this combination uniquely identifies the cluster and lets the storage know in which stream to store events and snapshots. Whatever the storage is (memory, Postgres, EventstoreDb, etc.), the cluster will be stored in a stream named as the concatenation of the StorageName and the Version (e.g. "_todo_01").

  • SnapshotsInterval: the number of events that can be stored after a snapshot before creating a new snapshot (i.e. the number of events between snapshots).

The Command handler, by the runCommand function, applies a command, then stores the related events and returns the EventStore (database) IDs of those stored events and the KafkaDeliveryResult (if Kafka broker is enabled).

Example of a cluster of entities handling the todos and the categories:

    // The context aggregates the todos and the categories.
    type TodosContext =
        {
            todos: Todos
            categories: Categories
        }
        // Initial state: no events yet.
        static member Zero =
            {
                todos = Todos.Zero
                categories = Categories.Zero
            }
        // StorageName + Version give the stream name "_todo_01".
        static member StorageName =
            "_todo"
        static member Version =
            "_01"
        static member SnapshotsInterval =
            15
        static member Deserialize (s: string) =
            jsonSerializer.Deserialize<TodosContext> s
        member this.Serialize =
            this
            |> jsonSerializer.Serialize

In the following example, the TodosContext can check the validity of the categories referenced by any todo before adding it (to preserve the invariant rule that you can add only a todo with a valid category ID reference). It uses the "result" computation expression included in the FsToolkit.ErrorHandling library, which supports the railway-oriented programming pattern of handling errors.

Example:

    member this.AddTodo (t: Todo) =
        let checkCategoryExists (c: Guid ) =
            this.categories.GetCategories() 
            |> List.exists (fun x -> x.Id = c) 
            |> boolToResult (sprintf "A category with id '%A' does not exist" c)

        result
            {
                let! categoriesMustExist = t.CategoryIds |> catchErrors checkCategoryExists
                let! todos = this.todos.AddTodo t
                return 
                    {
                        this with
                            todos = todos
                    }
            }

Todo: Context.fs

Events

It is easy to represent events by discriminated unions (DU) with cases associated with transformation members. We need to implement the Event interface and so the Process member.

When we process an event it returns a new state or an error:

    type Event<'A> =
        abstract member Process: 'A -> Result<'A, string>

'A is the generic context or aggregate type that the event is associated with.

This is an example of a concrete implementation of events related to the TodoCluster members Add and Remove.

So, for example, the TodoAdded event is associated with the AddTodo member. The Process member of the event is implemented by calling the related cluster's member.

    type TodoEvent =
        | TodoAdded of Todo
        | TodoRemoved of Guid
            // The events are processed on the context, so the interface is Event<TodosContext>.
            interface Event<TodosContext> with
                member this.Process (x: TodosContext) =
                    match this with
                    | TodoAdded (t: Todo) -> 
                        x.AddTodo t
                    | TodoRemoved (g: Guid) -> 
                        x.RemoveTodo g

Note: overloading the members of an aggregate may make the association between events and members less clear, as there is no valid way to express member overloading in the event types directly (it would be an invalid DU). A workaround is easy, though: for example, create a specific DU for the parameters to express the overloading.
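One possible shape of that workaround is sketched below. It does not come from the library sources: the `TodoList` type with its two `Remove` overloads, the `RemoveTarget` DU, and the `TodoListEvent` type are hypothetical, and the `Event` interface is re-declared locally so that the snippet runs on its own.

```fsharp
// Local stand-in for the Event interface.
type Event<'A> =
    abstract member Process: 'A -> Result<'A, string>

// Hypothetical aggregate with an overloaded member: removal by position or by text.
type TodoList =
    { Items: string list }
    member this.Remove (index: int) : Result<TodoList, string> =
        if index < 0 || index >= this.Items.Length then
            Error "index out of range"
        else
            let remaining =
                this.Items
                |> List.indexed
                |> List.filter (fun (i, _) -> i <> index)
                |> List.map snd
            Ok { Items = remaining }
    member this.Remove (text: string) : Result<TodoList, string> =
        if this.Items |> List.contains text then
            Ok { Items = this.Items |> List.filter (fun item -> item <> text) }
        else
            Error "item not found"

// One case per overload: the parameters carry the information about which overload to call.
type RemoveTarget =
    | ByIndex of int
    | ByText of string

// A single event case stays associated with the single member name.
type TodoListEvent =
    | Removed of RemoveTarget
    interface Event<TodoList> with
        member this.Process (todoList: TodoList) =
            match this with
            | Removed (ByIndex index) -> todoList.Remove index
            | Removed (ByText text) -> todoList.Remove text
```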

Source code: Events.fs

Commands

(this section needs an update as we have a new )

A Command type can be represented by a Discriminated Union. Executing the command on a specific context or aggregate means returning a new state and, accordingly, a list of events, or an error. You can also specify "command undoers", which allow you to compensate for the effect of a command. An undoer returns a new function that can be executed in the future to return the events that reverse the effect of the command itself. For example, the "undoer" of AddTodo is the related RemoveTodo (see next paragraph).

In the following code we can see the signatures of the state viewers for any context or aggregate. State viewers correspond to read models: they provide the current state of an aggregate or context. Typically, that state may come from a cache, from the event store (by processing the events), or from a Kafka topic (or eventually any other message/event broker, even though I haven't completely implemented any of them yet).


    type StateViewer<'A> = unit -> Result<EventId * 'A, string>
    type AggregateViewer<'A> = AggregateId -> Result<EventId * 'A,string>
   
   
    type Aggregate<'F> =
        abstract member Id: AggregateId 
        abstract member Serialize: 'F
    
    type Event<'A> =
        abstract member Process: 'A -> Result<'A, string>

    type CommandUndoer<'A, 'E> =  Option<'A -> StateViewer<'A> -> Result<unit -> Result<List<'E>, string>, string>>
    type AggregateCommandUndoer<'A, 'E> = Option<'A -> AggregateViewer<'A> -> Result<unit -> Result<'A * List<'E>, string>, string>>
    type Command<'A, 'E when 'E :> Event<'A>> =
        abstract member Execute: 'A -> Result<'A * List<'E>, string>
        abstract member Undoer: CommandUndoer<'A, 'E>
        
    type AggregateCommand<'A, 'E when 'E :> Event<'A>> =
        abstract member Execute: 'A -> Result<'A * List<'E>, string>
        abstract member Undoer: AggregateCommandUndoer<'A, 'E>

Example:


    type CourseEvents =
        | StudentEnrolled of StudentId
        | StudentUnenrolled of StudentId
        | Renamed of string
        interface Event<Course> with
            member this.Process (course: Course) =
                match this with
                | StudentEnrolled studentId -> course.EnrollStudent studentId
                | StudentUnenrolled studentId -> course.UnenrollStudent studentId
                | Renamed name -> course.Rename name
       
        static member Deserialize (x: string): Result<CourseEvents, string> =
            try
                JsonSerializer.Deserialize<CourseEvents> (x, jsonOptions) |> Ok
            with
            | ex ->
                Error (ex.Message)
        member this.Serialize =
            JsonSerializer.Serialize (this, jsonOptions)

A command returns a new state and a list of one or more events:

    type CourseCommands =
        | EnrollStudent of StudentId
        | UnenrollStudent of StudentId
        | Rename of string
        interface AggregateCommand<Course, CourseEvents> with
            member this.Execute (course: Course) =
                match this with
                | EnrollStudent studentId ->
                    course.EnrollStudent studentId
                    |> Result.map (fun s -> (s, [ StudentEnrolled studentId]))
                | UnenrollStudent studentId ->
                    course.UnenrollStudent studentId
                    |> Result.map (fun s -> (s, [ StudentUnenrolled studentId]))
                | Rename name ->
                    course.Rename name
                    |> Result.map (fun s -> (s, [ Renamed name]))    
            member this.Undoer = None


Any command must ensure that it returns an Ok result (and therefore one or more events) only if the events to be returned, when processed on the current state, return an Ok result, i.e. a valid state (and no error).

There are two versions of _evolve_: one tolerates inconsistent events, and the other fails as soon as a single event returns an error.
The way the events are stored in the event store ensures that no stored event returns an inconsistent state when processed. Therefore future releases of the library will probably use the unforgiving version of the _evolve_ function by default.
Currently the policy is to use the "forgiving" version only as a fallback, logging the error and skipping the inconsistent events.
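The difference between the two flavours can be sketched as two folds over the events. This is not the library's implementation: the function names `evolveUnforgiving` and `evolveForgiving`, the `Counter` state, and the `CounterEvent` type are hypothetical, and the forgiving variant returns the skipped errors instead of logging them so that the snippet stays self-contained.

```fsharp
// Local stand-in for the Event interface.
type Event<'A> =
    abstract member Process: 'A -> Result<'A, string>

// Unforgiving: the first event that cannot be processed makes the whole evolve fail.
let evolveUnforgiving<'A, 'E when 'E :> Event<'A>> (initial: 'A) (events: 'E list) : Result<'A, string> =
    events
    |> List.fold
        (fun state (ev: 'E) -> state |> Result.bind (fun s -> ev.Process s))
        (Ok initial)

// Forgiving: events that fail are skipped and their errors are collected.
let evolveForgiving<'A, 'E when 'E :> Event<'A>> (initial: 'A) (events: 'E list) : 'A * string list =
    let state, errors =
        events
        |> List.fold
            (fun (state, errors) (ev: 'E) ->
                match ev.Process state with
                | Ok next -> next, errors
                | Error e -> state, e :: errors)
            (initial, [])
    state, List.rev errors

// Hypothetical state and event used to exercise the two functions.
type Counter = { Value: int }

type CounterEvent =
    | Added of int
    interface Event<Counter> with
        member this.Process (c: Counter) =
            match this with
            | Added n when n > 0 -> Ok { Value = c.Value + n }
            | Added _ -> Error "non-positive increment"
```

With the events `[ Added 1; Added (-5); Added 2 ]`, the unforgiving version stops with the error of the second event, while the forgiving one skips it and keeps processing the remaining events.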

## Undoer

An undoer would be useful in case the only way to make two streams consistent is to "rollback" the effect of a command on one stream if the command on another stream fails.

(this should be handled at the application level, though)

Unfortunately there is no simple way to express the undoer yet. The good news is that an undoer is optional and is generally set to None.

```fsharp
    member this.Undoer =
        match this with
        | EnrollStudent studentId ->
            (
                fun (course: Course) (viewer: AggregateViewer<Course>) ->
                    result {
                        return 
                            fun () ->
                                result {
                                    let! _, state = viewer course.Id.Id
                                    let result =
                                        state.UnenrollStudent studentId
                                        |> Result.map (fun s -> s, [ StudentUnenrolled studentId])
                                    return! result
                                }
                    }
            )
            |> Some
        | UnenrollStudent studentId ->
            (
                fun (course: Course) (viewer: AggregateViewer<Course>) ->
                    result {
                        return 
                            fun () ->
                                result {
                                    let! _, state = viewer course.Id.Id
                                    let result =
                                        state.EnrollStudent studentId
                                        |> Result.map (fun s -> s, [StudentEnrolled studentId])
                                    return! result
                                }
                    }
            )
            |> Some
        | Rename _ ->
            None
```

The meaning is: if a command has already been executed and the related events are stored, we can ask that command to provide a function that can be executed in the future to return another function that, when executed, returns a result with the list of events that can "undo" the effect of the command itself.

This is the abstract definition of the undoer of an aggregate:

    type AggregateCommandUndoer<'A, 'E> = Option<'A -> AggregateViewer<'A> -> Result<unit -> Result<'A * List<'E>, string>, string>>

The meaning is: Extract from the current state of the aggregate useful info for a future "rollback"/"undo" and return a function that, when applied to the current state of the aggregate, will return the events that will "undo" the effect of this command.

You may need undoers if the event store doesn't support multiple-stream transactions or if you use a distributed architecture with many nodes handling different streams of events. By using PostgreSQL as the event store you can just set the undoer to None, as the event store will handle the cross-stream transactions for us.

Commands.fs

Application service layer

An application service layer implements the logic and makes business logic calls, or queries, available. Here is one of the simplest examples of an entry for a service involving a single context, by building and running an AddTag command.

Initializing the application service layer means providing the correct "viewers" for the streams (which can be based on the event store, mediated by the cache, or can be based on an event broker listener, as in the RabbitMq provided examples) plus some compound viewers if needed (i.e. to read the state of all instances of an aggregate type) and a "messageSenders" object that can be passed to the command handler to let it send messages to the event broker (if any) in a fire and forget way:


    type CourseManager
        (
            eventStore: IEventStore<string>,
            courseViewer: AggregateViewer<Course>,
            studentViewer: AggregateViewer<Student>,
            messageSenders: MessageSenders,
            allStudentsAggregateStatesViewer: unit -> Result<(Definitions.EventId * Student) list, string>
        )

A member of the service to provide the initialization of an aggregate:

        member this.AddStudent (student: Student) =
            result
                {
                    return!
                        runInit<Student, StudentEvents, string>
                        eventStore
                        messageSenders
                        student
                }

A member of the service to retrieve a student via its id:


        member this.GetStudent (id: StudentId)  =
            result
                {
                    let! _, student = studentViewer id.Id
                    return student
                }

Note: by using the typed Id, we need to extract the proper Guid Id via the Id property if using the studentViewer.

An example using two commands related to two streams. The example assumes that we prefer the enrollments to be stored in both the Student aggregate and the Course aggregate (i.e. bidirectional references). So we need to run two commands in a single transaction to preserve consistency:

        member this.EnrollStudentToCourse (studentId: StudentId) (courseId: CourseId) =
            result
                {
                    let addCourseToStudentEnrollments = StudentCommands.Enroll courseId
                    let addStudentToCourseEnrollments = CourseCommands.EnrollStudent studentId
                    return!
                        runTwoAggregateCommands
                            studentId.Id
                            courseId.Id
                            eventStore
                            messageSenders
                            addCourseToStudentEnrollments
                            addStudentToCourseEnrollments
                }

In the following example, the enrollment is a single object type that registers enrollments in a single stream (a more conventional approach):

        member this.CreateEnrollment (studentId: StudentId) (courseId: CourseId) =
            result {
                let! enrollments = this.GetOrCreateEnrollments()
                do! 
                    enrollments.Enrollments
                    |> List.exists (fun e -> e.StudentId = studentId && e.CourseId = courseId)
                    |> not
                    |> Result.ofBool "Student is already enrolled in this course."

                let enrollmentItem = 
                    { CourseId = courseId
                      StudentId = studentId
                      EnrollmentDate = DateTime.UtcNow }
                    
                let studentDetailsKey =  DetailsCacheKey (typeof<RefreshableStudentDetails>, studentId.Id)
                let _ =
                    DetailsCache.Instance.UpdateMultipleAggregateIdAssociationRef [|courseId.Id|] studentDetailsKey ((TimeSpan.FromMinutes 10.0) |> Some)
                    
                let command = EnrollmentCommands.AddEnrollment enrollmentItem
                let! result = 
                    runAggregateCommand<Enrollments, EnrollmentEvents, string>
                        enrollmentId.Id
                        eventStore
                        messageSenders
                        command
                return result
            }

Note: the examples show the complexity of handling the cache of the details. This aspect will be covered, and hopefully simplified, in a future version of this documentation. Just note that any existing RefreshableStudentDetails object needs to be notified that a new enrollment has been created so that it can refresh its state when needed.

The quoted code is related to example 15.

Command Handler

The Command Handler has the responsibility of running the commands which means doing the following steps:

  • get the state(s) of the aggregate(s) involved in the command,
  • try to run the commands on those states,
  • try to store the resulting events,
  • send the events to the event broker/message bus (if any),
  • make periodic snapshots, according to the SnapshotsInterval.

Those steps are wrapped in a result computation expression that expresses the "happy path" explicitly and the "error path" implicitly, using monadic operators.

Note: the only real success occurs once the events are stored in the event store. The delivery of the messages does not affect the result of running the command. Any message listener can/should implement a "fallback" policy so that it can be rehydrated by a call to the event store to restore the correct state of the aggregate.
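The steps listed above can be sketched with a toy in-memory stream. This is only an illustration of the flow, not the code in CommandHandler.fs: the `InMemoryStream` record, the `runCommand` signature used here, the `publish` callback, and the snapshot interval of 3 are all assumptions made for the example, and plain pattern matching is used in place of the result computation expression to avoid external dependencies.

```fsharp
// Local stand-ins so the sketch runs without the library.
type Event<'A> =
    abstract member Process: 'A -> Result<'A, string>

type Counter =
    { Value: int }
    member this.Add (n: int) : Result<Counter, string> =
        if n > 0 then Ok { Value = this.Value + n } else Error "n must be positive"

type CounterEvent =
    | Added of int
    interface Event<Counter> with
        member this.Process (c: Counter) =
            match this with
            | Added n -> c.Add n

// Hypothetical in-memory store for a single stream: last snapshot plus later events.
type InMemoryStream =
    { mutable Snapshot: Counter
      mutable EventsSinceSnapshot: CounterEvent list }

let snapshotsInterval = 3

// Step 1: the current state is the snapshot plus the events stored after it.
let currentState (stream: InMemoryStream) : Result<Counter, string> =
    stream.EventsSinceSnapshot
    |> List.fold
        (fun state ev -> state |> Result.bind (fun s -> (ev :> Event<Counter>).Process s))
        (Ok stream.Snapshot)

let runCommand
    (publish: CounterEvent list -> unit)
    (command: Counter -> Result<Counter * CounterEvent list, string>)
    (stream: InMemoryStream)
    : Result<unit, string> =
    // Step 2: run the command on the current state.
    match currentState stream |> Result.bind command with
    | Error e -> Error e
    | Ok (newState, events) ->
        // Step 3: store the resulting events.
        stream.EventsSinceSnapshot <- stream.EventsSinceSnapshot @ events
        // Step 4: fire-and-forget delivery; a failure here does not change the result.
        try publish events with _ -> ()
        // Step 5: periodic snapshot.
        if stream.EventsSinceSnapshot.Length >= snapshotsInterval then
            stream.Snapshot <- newState
            stream.EventsSinceSnapshot <- []
        Ok ()
```

Note how a failing `publish` is swallowed: in this sketch, as in the description above, success is decided by the storing step alone.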

CommandHandler.fs

EventStore (database)

An eventstore stores and retrieves events and snapshots related to each single context. There are plenty of members to store and retrieve events and snapshots in different ways. Particularly some of them may use explicitly the Task return type and cancellation token. The standard behavior is still the use of a globally configured timeout.

Some (but not all) of the commandHandler functions use Task and an optional cancellation token as well. The goal is a full TaskResult-based API with an optional cancellation token in the next major release.

The Postgres (json and binary) implementations use optimistic concurrency by checking the eventId before storing new events.
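The principle of that check can be illustrated with a small in-memory sketch. This is not the code in PgEventStore.fs, where the check is performed against the database: the `InMemoryStream` class, its `TryAppend` member, and the use of the event count as the event id are assumptions made for the example.

```fsharp
// Hypothetical event id: the position of the last event in the stream (0 = empty stream).
type EventId = int

type InMemoryStream () =
    let events = ResizeArray<string> ()
    let gate = obj ()

    member this.LastEventId : EventId = events.Count

    // Appends only if nobody else has written since the caller read `expectedLastEventId`.
    member this.TryAppend (expectedLastEventId: EventId) (newEvents: string list) : Result<EventId, string> =
        lock gate (fun () ->
            if events.Count <> expectedLastEventId then
                Error (sprintf "concurrency conflict: expected event id %d but found %d" expectedLastEventId events.Count)
            else
                events.AddRange newEvents
                Ok events.Count)
```

Two writers that read the same last event id cannot both succeed: the second one gets an error and has to reload the state and run its command again.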

The in-memory implementation is useful for testing and prototyping.

An example of a storage implementation in Postgres: PgEventStore.fs

Refactoring Aggregates via upcasting

If you need to change an aggregate (or context) for new requirements, you need to refactor the aggregate so that it can handle the old and the new formats.

An upcasting technique is based on being able to create an aggregate that mimics the old one and an upcast function to convert from the old one to the new one.

It is convenient to do a bulk upcast of all the existing aggregates by making a snapshot of all of them to be able to drop the old aggregate definition from the code. The new snapshots will use the new aggregate format, so the definition of the old aggregate will be unnecessary after the bulk upcast and resnapshot.

To do an upcast, you can create a shadow aggregate that mimics the old one in the same module of the current one, provided that the module is defined as rec (recursive).

The mechanism is based on handling the failure of the Deserialization by using as fallback the Deserialization using the old aggregate and then the upcast function to the new one.

Note: it may depend on the serialization library and its configuration as it is not always true that you can deserialize the old aggregate changing its target type with a different name than the original one.

The provided examples all use FsPickler, configured in a way that allows the upcast from the old aggregate with a different name to the new one.

Quick example of an upcast from an old aggregate to a new one:

The current version of the Course aggregate is:

type Course =
    {
        Id: Guid
        Name: string
        Students: List<Guid>
        MaxNumberOfStudents: int
    }

We want to add a new field Teachers to the aggregate so it will become:

type Course =
    {
        Id: Guid
        Name: string
        Students: List<Guid>
        MaxNumberOfStudents: int
        Teachers: List<Guid>
    }

We want to be able to read the old version of the aggregate and upcast it to the new one. So we redefine a type Course001 that mimics the old version of the aggregate:

type Course001 =
    {
        Id: Guid
        Name: string
        Students: List<Guid>
        MaxNumberOfStudents: int
    }

We instrument Course001 with the deserialization and upcast functions:

type Course001 =
    {
        Id: Guid
        Name: string
        Students: List<Guid>
        MaxNumberOfStudents: int
    }
    with
        static member Deserialize x =
            jsonPSerializer.Deserialize<Course001> x
        // The old format has no teachers, so the new field starts empty.
        member this.Upcast(): Course =
            {   Name = this.Name
                Id = this.Id
                Students = this.Students
                MaxNumberOfStudents = this.MaxNumberOfStudents
                Teachers = []
            }

Finally we make sure that the current Deserialize function of the Course aggregate can handle both the old and the new format:
```fsharp
    static member Deserialize(x: string): Result<Course, string> =
        // First try the current format.
        let firstTry = jsonPSerializer.Deserialize<Course> x
        match firstTry with
        | Ok x -> x |> Ok
        | Error e ->
            // Fall back to the old format and upcast it.
            let secondTry = jsonPSerializer.Deserialize<Course001> x
            match secondTry with
            | Ok x -> x.Upcast() |> Ok
            | Error e1 ->
                Error (e + " " + e1)
```

Examples using FSharp.SystemTextJson are also provided. It's important to test any library to be sure that the "mimic old type" deserialization and upcast work as in this example.

Testing

There are plenty of tests in the examples folders of the Sharpino repository. The structure is based on a setup that wipes the event store and the cache before each test case. The tests must be run in sequence to avoid interference between them through the state of the event store and the cache.

An extension of Expecto provides a "multipleTestCase" function that makes the tests parametric.

A particular use of multipleTestCase is to run the same test case with different event store implementations (i.e. in-memory, Postgres json, Postgres binary, possibly in conjunction with RabbitMQ).

When RabbitMQ is used as the event broker, the tests need an additional parameter: a delay, applied while running the tests in sequence, that allows the messages to propagate between the different consumers. The value of the delay therefore depends on which configuration is under test.

An example of setting "instances" (which should mean 'parameters') to feed the test cases is the following:

let instances =
    [
        #if RABBITMQ
            (fun () -> setUp pgEventStore),  ItemManager(pgEventStore, rabbitMqItemStateViewer, rabbitMqReservationStateViewer, messageSenders), 100
        #else
            (fun () -> setUp(pgEventStore)), ItemManager(pgEventStore, pgStorageItemViewer, pgStorageReservationViewer), 0
            (fun () -> setUp(memEventStore)),  ItemManager(memEventStore, memoryStorageItemViewer, memoryStorageReservationViewer), 0
        #endif
    ]

That configuration uses two (or three if RabbitMQ is defined) different event stores and viewers to run the same test cases.

In the examples we see that there exist "viewers" based on the event store and "viewers" based on RabbitMQ consumers.

Any "viewer" based on the eventStore (which uses also the cache in between) has the following signature:

    type AggregateViewer<'A> = AggregateId -> Result<EventId * 'A,string>

By using a viewer based on the event store we can rely on this function present in the command handler:

    let inline getAggregateStorageFreshStateViewer<'A, 'E, 'F
        when 'A :> Aggregate<'F> 
        and 'A : (static member Deserialize: 'F -> Result<'A, string>) 
        and 'A : (static member StorageName: string) 
        and 'A : (static member Version: string) 
        and 'E :> Event<'A>
        and 'E: (static member Deserialize: 'F -> Result<'E, string>)
        >
        (eventStore: IEventStore<'F>) 
        =
            fun (id: Guid) ->
                result
                    {
                        let! (eventId, result) = getAggregateFreshState<'A, 'E, 'F> id eventStore
                        return
                            (eventId, result :?> 'A) 
                    }

Note: a critical point is that, given that the cache layer is type agnostic, some casting/boxing is needed to return the proper type of the aggregate. This use seems safe enough as long as the following constraint is satisfied: whenever an aggregate of type 'A is stored in the event store, it is retrieved as type 'A as well. This aspect should be invisible to the application using the library but, still, it is worth mentioning. Also note that the boxing/unboxing operations may have a performance impact, but that is outweighed by the empirically observed improvement given by the cache layer.

A viewer based on RabbitMQ deserves a different chapter. However, the provided examples are based on a blueprint that makes it easier to handle the aggregate states of a certain stream as a ConcurrentDictionary:

    ConcurrentDictionary<AggregateId, (EventId * 'A)>

Many examples based on RabbitMQ use such blueprints to implement a "Consumer" that, among other things, provides a viewer function that can be used in the service layer.

        member this.GetAggregateState (id: AggregateId) =
            if (statePerAggregate.ContainsKey id) then
                statePerAggregate.[id]
                |> Result.Ok
            else
                Result.Error "No state" 
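To make the shape of such a consumer more concrete, here is a minimal library-free sketch. It leaves out the RabbitMQ wiring entirely: the `InMemoryConsumer` class and its `SetState` member are hypothetical, `AggregateId` and `EventId` are re-declared locally as assumptions, and `SetState` merely stands in for whatever the message listener does when a message arrives.

```fsharp
open System
open System.Collections.Concurrent

// Local stand-ins for the library type abbreviations (assumed shapes).
type AggregateId = Guid
type EventId = int

// Hypothetical consumer keeping the latest known state of each aggregate of one stream.
type InMemoryConsumer<'A> () =
    let statePerAggregate = ConcurrentDictionary<AggregateId, EventId * 'A> ()

    // Stands in for the message listener updating the state when a message arrives.
    member this.SetState (id: AggregateId) (eventId: EventId) (state: 'A) : unit =
        statePerAggregate.[id] <- (eventId, state)

    // Has the AggregateViewer shape: AggregateId -> Result<EventId * 'A, string>.
    member this.GetAggregateState (id: AggregateId) : Result<EventId * 'A, string> =
        // TryGetValue reads the entry in a single step.
        match statePerAggregate.TryGetValue id with
        | true, state -> Ok state
        | _ -> Error "No state"
```

A service could then receive `consumer.GetAggregateState` wherever an event-store-based viewer is expected, since both have the same signature.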

In essence: by instrumenting any stream with RabbitMq consumer, and by also instrumenting the tests to launch the consumers as service, it will be possible to use and test the viewers based on RabbitMq.