how-cqrs-and-eventsourcing-work

How CQRS & Event Sourcing Work Using a Nestjs Module

published at 14.11.2024 by Yaroslav Pavliuk

Hello there!

My name is Yaroslav, and in this article, I'm sharing with you my small investigation on the @nestjs/cqrs module. At EasyLayer, we use CQRS together with Event Sourcing.


This article isn't a full guide on CQRS or Event Sourcing. We'll quickly look at what these are and why they're used. I'll focus mainly on how the Nestjs CQRS module works and what tools it offers for using CQRS and Event Sourcing.


Table of Contents:


  • Concept of @nestjs/cqrs.
  • What are CQRS and Event Sourcing?
  • How does the @nestjs/cqrs module work? Example with a block diagram;
  • Command workflow with a sequence diagram;
  • Query workflow with a sequence diagram;
  • Conclusion;


Concept of @nestjs/cqrs

The @nestjs/cqrs package is designed more as a tool rather than a complete framework for CQRS. This means it can be extended and modified to fit the needs of your project. The package takes full advantage of reflect-metadata and rxjs, making it very powerful and flexible.


reflect-metadata allows for handling metadata in TypeScript, simplifying the creation and use of decorators for dependency injection and class behavior management.


rxjs provides asynchronous data handling through publishers and subscribers. This approach effectively manages data streams and events, which is crucial in a CQRS architecture.


What are CQRS and Event Sourcing?

CQRS is an architectural pattern that separates read and write operations in an application. It emphasises that methods should only return a value if they do not cause side effects (for example, do not change the state of an object or external variables). This principle is called Command Query Responsibility Segregation (CQRS).


CQRS separates the logic of handling commands (actions that change the state) from queries (read operations). This allows for separately testing, optimising, and scaling read and write infrastructures, which is critical for large and complex systems.


Often, Event Sourcing is used alongside CQRS. Without going into deep detail, Event Sourcing plays a key role in ensuring transactionality in the CQRS architecture, but it is not the only solution.


Instead of repeating what you can find online, check out a detailed document on CQRS by Greg Young, a CQRS guide from Microsoft, and info on Event Sourcing at microservices.io.


Now, let’s talk about the @nestjs/cqrs module.


How does the @nestjs/cqrs module work? Example with a block diagram

I have a great repository that's set up as a sandbox where I've already configured everything for testing and exploring this module, so go ahead and download it now. I specifically included the @nestjs/cqrs module and its source code so you can debug and study it.


cqrs-block-diagram


I will use class and interface names from the @nestjs/cqrs module.


As shown in the diagram, we divide the system into two parts: the state change flow and the data query flow, which are managed by the ICommand and IQuery interfaces.


Commands (ICommand) represent intentions to change the system's state; these are objects that encapsulate both action and context.


The command flow in the @nestjs/cqrs module is detailed with the use of CommandBus, which acts as a central mechanism for dispatching commands. CommandBus serves as a coordinator, directing commands to the appropriate CommandHandlers—these handlers implement the ICommandHandler interface and then activate the AggregateRoot model, which is responsible for executing the business logic and maintaining the system's state.


Often, AggregateRoot is considered a domain in the context of Domain-Driven Design (DDD), but essentially, it's just a model representing the state of your system.


The result of a successful state update is an IEvent. In Event Sourcing, a crucial point is that a single event changes the state of an aggregate and is recorded in the database. After processing a command, it can generate one or more events that record the changes that occurred. These events are saved in a Write-database, which functions as an Event Store. The state of the aggregate is changed only if the event is successfully recorded in the database, thus ensuring the persistence of the system's state changes.


Successful saving of events in the Event Store leads to their publication. This is where the EventBus and the publisher mechanism come into play. The EventBus distributes events among different parts of the system. The published events are then asynchronously intercepted by EventHandlers. These event handlers are responsible for updating the Read-database, ensuring that data is up-to-date for read operations.


In the CQRS architecture implemented through the @nestjs/cqrs module, the Query flow differs from the command flow because it focuses exclusively on reading data without making changes to the system's state. The main component here is the QueryBus, which is similar to the CommandBus but is used for handling queries.


The IQuery interface defines the format of a query. Each query describes a specific set of data that needs to be retrieved from the system. Classes that implement the IQueryHandler interface are responsible for handling these queries. These handlers receive queries through the QueryBus and carry out data retrieval operations.


The QueryBus acts as a mediator, receiving queries and directing them to the appropriate handlers. This ensures a clear encapsulation of query logic and its distribution within the system.


An important aspect of the query flow is that all read operations are performed with the Read-database. This provides high performance and scalability for read operations, as they do not affect the main database where writing occurs.


Command workflow with a sequence diagram

Now, let's move deeper into what happens when a client makes a request to execute a command. We will examine each of the main classes involved in this flow. 


command-workflow-with-sequence-diagram


Suppose a client makes a REST request to the server. The request in the Controller initiates the action CommandBus.execute(command: ICommand).


class CommandBus extends ObservableBus {
  register(handlers: ICommandHandler[]): void;
  execute(command: T): Promise;
}


Under the hood, CommandBus has two main methods:


  • register(handlers: ICommandHandler[]) - This method registers all ICommandHandlers in the system at module start-up. Using the @CommandHandler() decorator and the reflect-metadata library, it links handlers to specific commands.


First, the @CommandHandler decorator adds the annotation __commandHandler__ to the handler's metadata and the annotation __command__ to the ICommand of this handler.


const CommandHandler = (command) => {
  return (target) => {
    if (!Reflect.hasOwnMetadata('__command__', command)) {
      Reflect.defineMetadata('__command__', { id: (0, uuid_1.v4)() }, command);
    }
    Reflect.defineMetadata('__commandHandler__', command, target);
  };
};


Second, the decorator adds a unique identifier metadata.id for the command specified in the decorator and links the specific command to the specific handler using this id. This allows two different handlers to handle the same command. In CommandBus, these links are stored in a Map() structure, where the key is the automatically generated metadata.id = uuid() for a specific command, and the value in this structure is an instance of a specific ICommandHandler class extracted from the Dependency Injection (DI) container.


  • execute(command: ICommand) - This method performs two important actions:


  execute(command) {
	…
    this._publisher.publish(command);
    return handler.execute(command);
  }


It first asynchronously publishes the command using the built-in ICommandPublisher. What does publish mean? — CommandBus inherits from ObservableBus, and it has a variable this.subject$ which is a Subject from rxjs, allowing system components to subscribe to it and asynchronously process commands. 


import { Observable, Subject } from 'rxjs';
class ObservableBus extends Observable {
  protected _subject$: Subject;
  get subject$(): Subject;
}


By sending commands through Subject.next(), CommandBus allows the system to respond to command events in real-time without waiting for responses. This reactive ability ensures that commands can trigger further actions or responses immediately as they occur.


Currently, in @nestjs/cqrs, there are no built-in mechanisms for subscribing to command events, although such features might be introduced in the future or could be implemented by developers to enhance functionality.


Then, the method calls handler.execute(command) on the appropriate handler. It's important to note that handler.execute() returns a promise, which allows the CommandBus to wait for the processing of the command result or an error.


The work of ICommandHandler involves an asynchronous method async execute(command: ICommand). It's expected that ICommandHandler will perform several actions:


  • Change the State of AggregateRoot: First, it should modify the state of the AggregateRoot model. If the model was previously created, we try to restore it from the write database, or create a new one. Importantly, AggregateRoot is a model and it does not depend on services.


abstract class AggregateRoot {
  apply(event: T, isFromHistory?: boolean): void;
}


In @nestjs/cqrs, there is a mechanism that allows us to merge event publication methods into the model instance. More on this later.


  • Save State Changes to the Write Database: In Event Sourcing, each change in state is considered a separate event and is stored in the Event Store. 


@nestjs/cqrs does not provide an implementation for a write storage; this is left to the developer's discretion as there can be many use cases.


  • Publish an Event about the Successful State Change: This is managed by the AggregateRoot model itself and its commit() method. Under the hood, this method retrieves all unpublished events added to the model and publishes them on the EventBus. An important note is that the commit() method in the @nestjs/cqrs implementation is synchronous; once commit() is executed, the command handler completes and a response is sent to the user.


How does the AggregateRoot model publish an event on the EventBus if it's just a model and has no dependencies on services?


In @nestjs/cqrs, the merging of objects is used where methods such as publish() and publishAll() from the EventBus are merged into the AggregateRoot model using the EventPublisher class. This allows the model to be self-contained and independent.


The EventPublisher class is simply a service, does not have an interface, and should not be confused with IEventPublisher.


export declare class EventPublisher {
  private readonly eventBus;
  constructor(eventBus: EventBus< IEvent >);
  mergeClassContext>>(metatype: T): T;
  mergeObjectContext>(object: T): T;
}


Now, let's understand what the publish method does and how the EventBus works under the hood. Like the CommandBus, the EventBus also inherits from ObservableBus and has a Subject stream from rxjs, which acts as both an observer and an observable. This means it can generate events and we can subscribe to it.


class EventBus extends ObservableBus< IEvent> implements IEventBus< IEvent > {
  private _publisher: IEventPublisher;

  publish(event, context) {
    return this._publisher.publish(event, context);
  }
  publishAll(events, context) {
    if (this._publisher.publishAll) {
      return this._publisher.publishAll(events, context);
    }
    return (events || []).map((event) => this._publisher.publish(event, context));
  }
}


When the publish method is called on the EventBus, it executes subject.next(event). This is the primary action of the publication mechanism, where the provided event is placed into the event stream stored in the Subject. All subscribers to this topic will immediately receive the new event.


interface IEventPublisher {
  publish(event: IEvent, context?: unknown): any;
  publishAll?(events: IEvent[], context?: unknown): any;
}


The IEventPublisher interface typically provides a level of abstraction for publishing events. It defines how events are sent to the EventBus. This might include additional logic, such as event transformation or preprocessing before calling subject.next(event). Essentially, IEventPublisher ensures the consistency of the event publication process and encapsulates all necessary operations that must occur before the actual generation of the event.


At the start of the module, event handlers are registered. In the @nestjs/cqrs module, there are two types: IEventHandler and Saga, handled by the methods register(handler: IEventHandler[]) and registerSagas(funcs: Function[]).


class EventBus {
  register(handlers: IEventHandler[]): void;
  registerSagas(funcs: Function[]): void;
}


Event Handlers Registration:


import { from, Observable } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

class EventBus extends ObservableBus {
  private subscriptions = [];

  register(handlers = []) {
    handlers.forEach(handler => this.registerHandler(handler));
  }

  registerHandler(handler) {
    const events = this.reflectEvents(handler);
    events.forEach(event => this.bind(handler, event));
  }

  reflectEvents(handler) {
    return Reflect.getMetadata('events', handler) || [];
  }

  bind(handler, event) {
    const stream$ = from([event]);
    const subscription = stream$
      .pipe(mergeMap(event => Promise.resolve(handler.handle(event))))
      .subscribe();

    this.subscriptions.push(subscription);
  }
}


  1. Event handlers are registered using the @EventHandler() decorator. Like command handlers, this decorator generates metadata including annotations __eventHandler__ and __event__, assigning an identifier to each event and linking it to one or more handlers. It's crucial to understand that a single event can trigger multiple handlers, or multiple events can be handled by a single handler.
  2. Handlers are registered by subscribing to the stream from IEventPublisher, and these subscriptions are stored in an array in EventBus.subscriptions = [].
  3. Events are processed through mergeMap() from rxjs, meaning all event handlers run in parallel. This ensures that event processing is efficient and non-blocking.
  4. The handle(event) method is invoked during processing, which is then wrapped in a forced Promise.resolve(handler.handle(event)). This likely accommodates handlers that may be inherently synchronous, allowing them to be seamlessly integrated into the asynchronous event processing system.
  5. In case of an error in the stream, the error goes to a special class UnhandledExceptionBus. We'll look at this later, but it's important to understand that catching a handler's error cannot be done synchronously.
  6. Thus, we asynchronously invoke event handlers and can maintain projections in the read database. The read database stores projections, which are denormalised views of data optimised for read operations.


Sagas Registration:


import { from, Observable, of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

class EventBus extends ObservableBus {
  private subscriptions = [];
  private commandBus = new CommandBus();

  registerSagas(sagas = []) {
    sagas.forEach(saga => this.registerSaga(saga));
  }

  registerSaga(saga) {
    const events = this.reflectEvents(saga);
    events.forEach(event => this.bind(saga, event));
  }

  reflectEvents(handler) {
    return Reflect.getMetadata('sagas', handler) || [];
  }

  bind(saga, event) {
    const stream$ = from([event]);
    const subscription = stream$
      .pipe(mergeMap(command => defer(() => this.commandBus.execute(command))))
      .subscribe();

    this.subscriptions.push(subscription);
  }
}


  1. Simplified with the @Saga() decorator, which is applied to a method rather than an entire class. This decorator adds the metadata __saga__ to the method and links it with a specific event, similar to IEventHandler, supporting subscription. It also uses mergeMap() from rxjs.
  2. The main difference from IEventHandler is that Sagas operate through CommandBus, invoking the CommandBus.execute(command) method with the command returned by the method subscribed to the Saga. The error detection mechanism is the same as in IEventHandler, where errors are handled using UnhandledExceptionBus.
  3. Sagas act as part of the write model and can trigger commands, while IEventHandlers are part of the read database and projections processing.


Note that I am only discussing how the current implementation of the @nestjs/cqrs module is structured, not providing solutions to problems. Some aspects might have been changed or improved in this module, so I plan to write a few articles on how this module can be enhanced or altered.


UnhandledExceptionBus, mentioned earlier, is where errors from both IEventHandler and Saga are directed. 


class UnhandledExceptionBus extends ObservableBus {}


It's important to note that both types of handlers use the same error handler. 


UnhandledExceptionBus also inherits from the ObservableBus class, like other buses, and uses the rxjs subject stream. This allows for manual subscription to monitor errors from specific handlers.


Query workflow with a sequence diagram

Now let's consider the Query request flow.


query workflow with sequence diagram


Queries are operations that do not alter the state but are designed to retrieve projections from the read database and return them to the client. Queries are also executed via QueryBus, whose structure is similar to CommandBus.


class QueryBus extends ObservableBus {
  register(handlers: IQueryHandler[]): void;
  execute(query: T): Promise;
}


A key difference is that it uses its own decorator for handlers, @QueryHandler(), which assigns the metadata __queryHandler__ to the handler and __query__ to the query object itself. QueryBus has similar methods to CommandBus and operates in a similar way through the QueryBus.execute() method, which publishes an event in the subject publisher and returns the result of the execution method. This method also allows us to await the result or detect an error.


  async execute(query: IQuery) {
    this._publisher.publish(query);
    const result = await handler.execute(query);
    return result;
  }


Conclusion

This exploration into the @nestjs/cqrs module highlights the architectural advantages of CQRS, such as improved performance and scalability due to the separation of concerns. Furthermore, it facilitates a cleaner design by segregating the command and query responsibilities, which aligns well with principles like Single Responsibility and Event Sourcing. As developers and architects consider implementing CQRS, understanding these nuances helps in creating systems that are robust, maintainable, and adaptable to change.


I hope this has given you a clear understanding of how @nestjs/cqrs works in detail.


GitHub repository - https://github.com/EasyLayer/cqrs-poc

Discussion - https://github.com/EasyLayer/core/discussions/2