
I recently started to dive into CQRS / ES because I might need to apply it at work. It seems very promising in our case, as it would solve a lot of problems.

I sketched my rough understanding of what an ES / CQRS app should look like, in the context of a simplified banking use case (withdrawing money).

[Diagram: ES / CQRS]

Just to sum up, if person A withdraws some money:

  • a command is issued
  • command is handed over for validation / verification
  • an event is pushed to an event store if the validation succeeds
  • an aggregator dequeues the event to apply modifications on the aggregate

From what I understood, the event log is the source of truth because it is the log of FACTS; we can then derive any projection from it.


Now, what I don't understand, in this grand scheme of things, is what happens in this case:

  • rule: a balance cannot be negative
  • person A has a balance of 100e
  • person A issues a WithdrawCommand of 100e
  • validation passes and a MoneyWithdrewEvent of 100e is emitted
  • in the meantime, person A issues another WithdrawCommand of 100e
  • the first MoneyWithdrewEvent has not been aggregated yet, so validation passes, because validation checks against the aggregate (which has not been updated yet)
  • a MoneyWithdrewEvent of 100e is emitted a second time

==> We are in an inconsistent state: the balance is at -100e and the log contains 2 MoneyWithdrewEvent entries
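To make the scenario concrete, here is a minimal, single-threaded sketch of the flow as described in the question. The names `event_log`, `read_model`, `handle_withdraw` and `project_pending` are assumptions made for illustration, not part of any library. Validation reads a projection that is only updated later, so both commands pass.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class MoneyWithdrewEvent:
    account_id: str
    amount: int


event_log = []            # append-only log of facts
read_model = {"A": 100}   # projection, updated some time after events are logged


def handle_withdraw(account_id, amount):
    # Validation reads the projection, which may lag behind the log.
    if read_model[account_id] < amount:
        return False
    event_log.append(MoneyWithdrewEvent(account_id, amount))
    return True


def project_pending():
    # The "aggregator" catches up by applying every logged event.
    for event in event_log:
        read_model[event.account_id] -= event.amount


first = handle_withdraw("A", 100)
second = handle_withdraw("A", 100)  # projection has not caught up yet
project_pending()
print(first, second, read_model["A"])
```

Both commands are accepted and the projected balance ends up negative, which is exactly the inconsistency described above.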

As I understand there are several strategies to cope with this problem:

  • a) put the aggregate version id along with the event in the event store so if there is a version mismatch upon modification, nothing happens
  • b) use some locking strategy, implying that the verification layer has to somehow create the lock

Questions related to the strategies:

  • a) In this case, the event log is not the source of truth anymore. How should I deal with that? Also, we returned OK to the client even though it was totally wrong to allow the withdrawal. Is it better to use locks in this case?
  • b) Locks == deadlocks. Do you have any insights about best practices?

Overall, is my understanding of how to handle concurrency correct?

Note: I understand that the same person withdrawing money twice in such a short time window is impossible, but I took a simple example so as not to get lost in details.

Louis F.
  • Why not update the aggregate in step 4 instead of waiting until step 7? – Erik Eidt May 24 '17 at 19:34
  • So you mean that in this case, the event store is just a log that only gets read when the application starts, to recreate aggregates / other projections? – Louis F. May 24 '17 at 19:57

2 Answers


I sketched my rough understanding of what an ES / CQRS app should look like, in the context of a simplified banking use case (withdrawing money).

This is the perfect example of an event sourced application. Let's start.

Every time a command is processed or retried (you will understand, be patient) the following steps are performed:

  1. the command reaches a command handler, i.e. a service in the Application layer.
  2. the command handler identifies the Aggregate and loads it from the repository (in this case the loading is performed by new-ing an Aggregate instance, fetching all the previously emitted events of this aggregate and re-applying them to the Aggregate itself; the Aggregate version is stored for later use; after the events are applied the Aggregate is in its final state - i.e. the current account balance is computed as a number)
  3. the command handler calls the appropriate method on the Aggregate, like Account::withdrawMoney(100), and collects the yielded events, i.e. MoneyWithdrewEvent(AccountId, 100); if there is not enough money in the account (balance < 100) then an Exception is raised and everything is aborted; otherwise, the next step is performed.
  4. the command handler tries to persist the Aggregate to the repository (in this case the repository is the Event Store); it does so by appending the new events to the Event stream if and only if the version of the Aggregate is still the same as it was when the Aggregate was loaded. If the version is not the same, then the command is retried - go to step 1. If the version is the same, then the events are appended to the Event stream and the client is provided with the Success status.
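The four steps above can be sketched roughly as follows. This is an illustrative, in-memory approximation: `InMemoryEventStore`, `ConcurrencyError`, `InsufficientFunds`, `MoneyDeposited`, `handle_withdraw` and the `max_retries` parameter are all hypothetical names, and a real event store would have to perform the version check and the append atomically.

```python
from dataclasses import dataclass


class ConcurrencyError(Exception):
    """The stream changed since the aggregate was loaded."""


class InsufficientFunds(Exception):
    """The withdrawal would make the balance negative."""


@dataclass(frozen=True)
class MoneyDeposited:
    account_id: str
    amount: int


@dataclass(frozen=True)
class MoneyWithdrewEvent:
    account_id: str
    amount: int


class InMemoryEventStore:
    """Hypothetical store; a real one must make check-and-append atomic."""

    def __init__(self):
        self._streams = {}

    def load(self, stream_id):
        return list(self._streams.get(stream_id, []))

    def append(self, stream_id, events, expected_version):
        stream = self._streams.setdefault(stream_id, [])
        if len(stream) != expected_version:
            raise ConcurrencyError(stream_id)
        stream.extend(events)


class Account:
    def __init__(self, account_id):
        self.account_id = account_id
        self.balance = 0
        self.version = 0  # number of events applied so far

    def apply(self, event):
        if isinstance(event, MoneyDeposited):
            self.balance += event.amount
        elif isinstance(event, MoneyWithdrewEvent):
            self.balance -= event.amount
        self.version += 1

    def withdraw_money(self, amount):
        if self.balance < amount:
            raise InsufficientFunds(self.account_id)
        return [MoneyWithdrewEvent(self.account_id, amount)]


def handle_withdraw(store, account_id, amount, max_retries=3):
    for _ in range(max_retries):
        # Step 2: rebuild the aggregate from its past events.
        account = Account(account_id)
        for event in store.load(account_id):
            account.apply(event)
        # Step 3: the aggregate decides; may raise InsufficientFunds.
        new_events = account.withdraw_money(amount)
        # Step 4: append only if nobody else wrote in the meantime.
        try:
            store.append(account_id, new_events, expected_version=account.version)
            return "Success"
        except ConcurrencyError:
            continue  # go back to step 1
    raise ConcurrencyError(account_id)


store = InMemoryEventStore()
store.append("A", [MoneyDeposited("A", 100)], expected_version=0)
first = handle_withdraw(store, "A", 100)
try:
    second = handle_withdraw(store, "A", 100)
except InsufficientFunds:
    second = "Rejected"
print(first, second)
```

In this sketch the second withdrawal is rejected because the aggregate is rebuilt from the stream itself, so it already sees the first MoneyWithdrewEvent.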

This version checking is called optimistic locking and is a general locking mechanism. Another mechanism is pessimistic locking, in which other writes are blocked (as in not started) until the current one completes.
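For contrast, a pessimistic variant could look something like the sketch below, which assumes a single process and uses one `threading.Lock` per account. The names `streams`, `lock_for` and `withdraw` are hypothetical, and events are simplified to signed amounts. With several processes or machines the lock would have to live somewhere shared, which this sketch does not attempt.

```python
import threading
from collections import defaultdict

streams = defaultdict(list)           # account_id -> list of signed amounts
_locks = defaultdict(threading.Lock)  # one lock per account
_registry_guard = threading.Lock()    # protects the lock registry itself


def lock_for(account_id):
    with _registry_guard:
        return _locks[account_id]


def withdraw(account_id, amount):
    # Other writers to the same account wait here until we are done.
    with lock_for(account_id):
        balance = sum(streams[account_id])
        if balance < amount:
            return False
        streams[account_id].append(-amount)
        return True


streams["A"].append(100)
results = []
threads = [
    threading.Thread(target=lambda: results.append(withdraw("A", 100)))
    for _ in range(2)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(results), sum(streams["A"]))
```

Whichever thread gets the lock first succeeds; the other one only starts its check afterwards and is rejected.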

The term Event stream is an abstraction around all the events that were emitted by the same Aggregate.

You should understand that the Event store is just another kind of persistence, one in which all the changes to an Aggregate are stored, not just the final state.
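As a small illustration of "all the changes, not just the final state", the state can be seen as a fold over the stored events. The event tuples and the `apply` function below are assumptions made for the example.

```python
from functools import reduce

# Hypothetical event stream for one account: (event type, amount).
events = [
    ("MoneyDeposited", 100),
    ("MoneyWithdrew", 30),
    ("MoneyWithdrew", 20),
]


def apply(balance, event):
    kind, amount = event
    if kind == "MoneyDeposited":
        return balance + amount
    return balance - amount


current_balance = reduce(apply, events, 0)
balance_after_two_events = reduce(apply, events[:2], 0)
print(current_balance, balance_after_two_events)
```

Because every change is kept, the balance at any earlier point in the history can be recomputed the same way.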

a) In this case, the event log is not the source of truth anymore. How should I deal with that? Also, we returned OK to the client even though it was totally wrong to allow the withdrawal. Is it better to use locks in this case?

The Event store is always the source of truth.

b) Locks == deadlocks. Do you have any insights about best practices?

By using optimistic locking you have no locks, just command retrying.

Anyways, Locks != deadlocks

Constantin Galbenu
  • There are some optimizations regarding the loading of an `Aggregate` where you don't apply all the events but you keep a snapshot of the `Aggregate` up to a point in the past and apply only the events that happened after that point. – Constantin Galbenu May 25 '17 at 06:46
  • Ok, I think my confusion comes from assuming that event store == event bus (I have Kafka in mind), so re-constructing the aggregate might be costly as you might need to re-read a LOT of events. In the case of having a snapshot of the `Aggregate`, when should the snapshot be updated? Is the snapshot store the same as the event store, or is it a materialised view derived from the event bus? – Louis F. May 25 '17 at 08:33
  • There are some strategies about creating the snapshot. One is to make a snapshot every n events. You should store the snapshot along with the events, in the same place/persistence/database, on the same commit. The idea is that the snapshot is strongly related to the Aggregate's version. – Constantin Galbenu May 25 '17 at 08:36
  • Ok, I think I have a clearer vision of how to handle this. Now, last question: what is the role of the event bus in the end, if the aggregates are updated synchronously? – Louis F. May 25 '17 at 08:44
  • You shouldn't need an event bus. You need some mechanism to notify the Event handlers (Projections, Read models, Sagas etc) about new events, and maybe there you could use it. If commands are handled async you need a `Command bus`. – Constantin Galbenu May 25 '17 at 08:46
  • Ok, but maybe I have 5 different representations of my model, connected to 5 different databases, so I should do this asynchronously. But I understood: publish to the event store, and then publish to the event bus! No validation is done by reading the aggregated views (which are updated through my aggregate handlers, reading from my event bus). – Louis F. May 25 '17 at 10:17
  • Yes, you could use RabbitMQ or whatever channel you want to send the events to the read models asynchronously, but only after you persist them to the event store. Indeed, no event validation is done after they are persisted: the events represent facts that happened; a read model may or may not like that something has happened but it cannot change the history. – Constantin Galbenu May 25 '17 at 10:22
  • Yes! I think I understood everything! It is much clearer! Thanks a lot for this eye-opening conversation! – Louis F. May 25 '17 at 10:24
  • @ConstantinGalbenu In a use case where collisions are frequent, would you say pessimistic locking would be OK? – froi Oct 29 '19 at 19:42
  • @froi I would say it's no worse than pessimistic locking, at least. – Constantin Galbenu Oct 30 '19 at 01:48

I sketched my rough understanding of what an ES / CQRS app should look like, in the context of a simplified banking use case (withdrawing money).

Close. The problem is that the logic for updating your "aggregate" is in a weird place.

The more usual implementation is that the data model your command handler keeps in memory and the stream of events in the event store are kept synchronized.

An easy example to describe is the case where the command handler makes synchronous writes to the event store, and updates its local copy of the model if the connection to the event store indicates that the write succeeded.

If the command handler needs to resynchronize with the event store (because its internal model doesn't match that of the store), it does so by loading the history from the store and rebuilding its own internal state.
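A rough sketch of that arrangement, under the assumption of a toy in-memory store: the handler caches a balance and a version, writes synchronously, updates its cache only when the write succeeds, and rebuilds from the store's history when it turns out to be stale. `EventStore`, `ConflictError` and `CachingHandler` are hypothetical names, and events are simplified to signed amounts.

```python
class ConflictError(Exception):
    """The write did not match the current state of the store."""


class EventStore:
    def __init__(self):
        self.events = []  # signed amounts, oldest first

    def append(self, event, expected_version):
        if expected_version != len(self.events):
            raise ConflictError()
        self.events.append(event)


class CachingHandler:
    def __init__(self, store):
        self.store = store
        self.balance = 0
        self.version = 0
        self.resync()

    def resync(self):
        # Rebuild the local model from the full history in the store.
        self.balance = sum(self.store.events)
        self.version = len(self.store.events)

    def withdraw(self, amount):
        if self.balance < amount:
            return "rejected"
        try:
            self.store.append(-amount, expected_version=self.version)
        except ConflictError:
            self.resync()  # local model was stale
            return "conflict"
        # Write succeeded: only now update the local copy.
        self.balance -= amount
        self.version += 1
        return "ok"


store = EventStore()
store.append(100, expected_version=0)
handler_1 = CachingHandler(store)
handler_2 = CachingHandler(store)

outcomes = [
    handler_1.withdraw(100),  # succeeds
    handler_2.withdraw(100),  # stale cache, write is refused
    handler_2.withdraw(100),  # after resync, not enough money
]
print(outcomes)
```

Here the conflict is simply reported back; retrying after the resync would be another reasonable choice.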

In other words, arrows 2&3 (if present) would normally be connected to the event store, not to an aggregate store.

put the aggregate version id along with the event in the event store so if there is a version mismatch upon modification, nothing happens

Variations of this are the usual case: rather than appending to the event stream, we typically PUT to a specific location in the stream; if that operation is incompatible with the state of the store, the write fails, and the service can choose the appropriate failure mode (fail to client, retry, merge....). Using idempotent writes solves a number of problems in distributed messaging, but of course it does require having a store that supports an idempotent write.
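One way to picture a positional, idempotent write is the sketch below. It is an assumption about how such a store might behave, not the API of any particular product; `Stream`, `put` and `PositionConflict` are made-up names. Repeating the same write at the same position is a harmless no-op, while a different event at an occupied position is refused.

```python
class PositionConflict(Exception):
    """The requested position is incompatible with the stream's state."""


class Stream:
    def __init__(self):
        self._events = []

    def put(self, position, event):
        if position < len(self._events):
            if self._events[position] == event:
                return  # same write repeated (e.g. a retry): nothing to do
            raise PositionConflict(position)
        if position > len(self._events):
            raise PositionConflict(position)  # would leave a gap
        self._events.append(event)

    def __len__(self):
        return len(self._events)


stream = Stream()
stream.put(0, "MoneyWithdrewEvent(A, 100)")
stream.put(0, "MoneyWithdrewEvent(A, 100)")  # retry of the same message

try:
    stream.put(0, "MoneyWithdrewEvent(A, 50)")  # a competing writer
    conflict_detected = False
except PositionConflict:
    conflict_detected = True

print(len(stream), conflict_detected)
```

On a conflict the caller is free to pick a failure mode: report the failure to the client, reload and retry, or attempt a merge.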

VoiceOfUnreason
  • Hmm, I think I misunderstood the event store component then. I thought that everything should go through it and get streamed. What if my event store is Kafka and is read-only? I cannot afford to go through all the messages again in steps 2 and 3. It seems that overall my vision matched this one: https://medium.com/technology-learning/event-sourcing-and-cqrs-a-look-at-kafka-e0c1b90d17d8 – Louis F. May 24 '17 at 20:02