fix: unit of work now produces a correlationId to solve issue with using the same queryRunner for every request

This commit is contained in:
user
2021-09-19 19:15:17 +02:00
parent 6646099455
commit e89c5b8081
17 changed files with 221 additions and 125 deletions

View File

@@ -260,7 +260,7 @@ Some CQS purists may say that a `Command` shouldn't return anything at all. But
Though, violating this rule and returning some metadata, like `ID` of a created item, redirect link, confirmation message, status, or other metadata is a more practical approach than following dogmas.
All changes done by `Commands` (or by events or anything else) across multiple aggregates should be saved in a single database transaction (if you are using a single database). This means that inside a single process, one command/request to your application usually should execute **only one** [transactional operation](https://en.wikipedia.org/wiki/Database_transaction) to save **all** changes (or cancel **all** changes of that command/request in case if something fails). This should be done to maintain consistency. To do that something like [Unit of Work](https://www.c-sharpcorner.com/UploadFile/b1df45/unit-of-work-in-repository-pattern/) or similar patterns can be used. Example: [create-user.service.ts](src/modules/user/commands/create-user/create-user.service.ts) - notice how everything is wrapped in a UnitOfWork method `CreateUserUoW.execute(...)`.
All changes done by `Commands` (or by events or anything else) across multiple aggregates should be saved in a single database transaction (if you are using a single database). This means that inside a single process, one command/request to your application usually should execute **only one** [transactional operation](https://en.wikipedia.org/wiki/Database_transaction) to save **all** changes (or cancel **all** changes of that command/request in case if something fails). This should be done to maintain consistency. To do that something like [Unit of Work](https://www.c-sharpcorner.com/UploadFile/b1df45/unit-of-work-in-repository-pattern/) or similar patterns can be used. Example: [create-user.service.ts](src/modules/user/commands/create-user/create-user.service.ts) - notice how everything is wrapped in a UnitOfWork method `UnitOfWork.execute(...)`.
**Note**: `Command` is not the same as [Command Pattern](https://refactoring.guru/design-patterns/command), it is just a convenient name to represent that this object executes some state-changing action. Both `Commands` and `Queries` in this example are just simple objects that carry data between layers.
@@ -1025,6 +1025,10 @@ Example tools:
- [k6](https://github.com/grafana/k6)
- [Artillery](https://www.npmjs.com/package/artillery) is a load testing tool based on NodeJS.
Example files:
- [create-user.artillery.yaml](tests/user/create-user/create-user.artillery.yaml) - Artillery load testing config file. Also can be useful for seeding database with dummy data.
More info:
- [Top 6 Tools for API & Load Testing](https://medium.com/@Dickson_Mwendia/top-6-tools-for-api-load-testing-7ff51d1ac1e8).

5
package-lock.json generated
View File

@@ -9831,6 +9831,11 @@
"thenify-all": "^1.0.0"
}
},
"nanoid": {
"version": "3.1.25",
"resolved": "https://registry.npmjs.org/nanoid/-/nanoid-3.1.25.tgz",
"integrity": "sha512-rdwtIXaXCLFAQbnfqDRnI6jaRHp9fTcYBjtFKE8eezcZ7LuLjhUaQGNeMXf1HmRoCH32CLz6XwX0TtxEOS/A3Q=="
},
"nanomatch": {
"version": "1.2.13",
"resolved": "https://registry.npmjs.org/nanomatch/-/nanomatch-1.2.13.tgz",

View File

@@ -37,6 +37,7 @@
"class-transformer": "^0.3.1",
"class-validator": "^0.12.2",
"dotenv": "^8.2.0",
"nanoid": "^3.1.25",
"nest-event": "^1.0.8",
"nestjs-console": "^7.0.0",
"pg": "^8.5.1",

View File

@@ -12,5 +12,6 @@ export const typeormConfig: TypeOrmModuleOptions = {
database: process.env.DB_NAME,
entities: [],
autoLoadEntities: true,
connectTimeoutMS: 2000,
logging: ['error', 'migration', 'schema'],
};

View File

@@ -0,0 +1,26 @@
import { UnitOfWorkOrm } from '@src/libs/ddd/infrastructure/database/base-classes/unit-of-work-orm';
import { UserOrmEntity } from '@modules/user/database/user.orm-entity';
import { UserRepository } from '@modules/user/database/user.repository';
import { WalletOrmEntity } from '@modules/wallet/database/wallet.orm-entity';
import { WalletRepository } from '@modules/wallet/database/wallet.repository';
export class UnitOfWork extends UnitOfWorkOrm {
// Add new repositories below to use this generic UnitOfWork
init(): string {
return UnitOfWork.init();
}
// Convert TypeOrm Repository to a Domain Repository
getUserRepository(correlationId: string): UserRepository {
return new UserRepository(
UnitOfWork.getOrmRepository(UserOrmEntity, correlationId),
).setCorrelationId(correlationId);
}
getWalletRepository(correlationId: string): WalletRepository {
return new WalletRepository(
UnitOfWork.getOrmRepository(WalletOrmEntity, correlationId),
).setCorrelationId(correlationId);
}
}

View File

@@ -1,15 +0,0 @@
import { UnitOfWork } from '@libs/ddd/infrastructure/database/base-classes/unit-of-work.base';
import { UserOrmEntity } from '@modules/user/database/user.orm-entity';
import { UserRepository } from '@modules/user/database/user.repository';
import { WalletOrmEntity } from '@modules/wallet/database/wallet.orm-entity';
import { WalletRepository } from '@modules/wallet/database/wallet.repository';
export class CreateUserUoW extends UnitOfWork {
static getUserRepository(): UserRepository {
return new UserRepository(this.getOrmRepository(UserOrmEntity));
}
static getWalletRepository(): WalletRepository {
return new WalletRepository(this.getOrmRepository(WalletOrmEntity));
}
}

View File

@@ -2,5 +2,6 @@ export abstract class DomainEvent {
constructor(
public readonly aggregateId: string,
public readonly dateOccurred: number,
public correlationId?: string,
) {}
}

View File

@@ -38,7 +38,11 @@ export class DomainEvents {
}
}
public static async publishEvents(id: ID, logger: Logger): Promise<void> {
public static async publishEvents(
id: ID,
logger: Logger,
correlationId?: string,
): Promise<void> {
const aggregate = this.findAggregateByID(id);
if (aggregate) {
@@ -47,6 +51,10 @@ export class DomainEvents {
logger.debug(
`[Domain Event published]: ${event.constructor.name} ${aggregate.id.value}`,
);
if (correlationId && !event.correlationId) {
// eslint-disable-next-line no-param-reassign
event.correlationId = correlationId;
}
return this.publish(event);
}),
);

View File

@@ -3,4 +3,5 @@ export interface Logger {
error(message: string, trace?: unknown, ...meta: unknown[]): void;
warn(message: string, ...meta: unknown[]): void;
debug(message: string, ...meta: unknown[]): void;
setContext(context: string): void;
}

View File

@@ -40,7 +40,11 @@ export abstract class TypeormRepositoryBase<
async save(entity: Entity): Promise<Entity> {
const ormEntity = this.mapper.toOrmEntity(entity);
const result = await this.repository.save(ormEntity);
await DomainEvents.publishEvents(entity.id, this.logger);
await DomainEvents.publishEvents(
entity.id,
this.logger,
this.correlationId,
);
this.logger.debug(
`[Entity persisted]: ${this.tableName} ${entity.id.value}`,
);
@@ -52,7 +56,7 @@ export abstract class TypeormRepositoryBase<
const result = await this.repository.save(ormEntities);
await Promise.all(
entities.map(entity =>
DomainEvents.publishEvents(entity.id, this.logger),
DomainEvents.publishEvents(entity.id, this.logger, this.correlationId),
),
);
this.logger.debug(
@@ -126,8 +130,28 @@ export abstract class TypeormRepositoryBase<
async delete(entity: Entity): Promise<Entity> {
await this.repository.remove(this.mapper.toOrmEntity(entity));
await DomainEvents.publishEvents(entity.id, this.logger);
await DomainEvents.publishEvents(
entity.id,
this.logger,
this.correlationId,
);
this.logger.debug(`[Entity deleted]: ${this.tableName} ${entity.id.value}`);
return entity;
}
protected correlationId?: string;
setCorrelationId(correlationId: string): this {
this.correlationId = correlationId;
this.setContext();
return this;
}
private setContext() {
if (this.correlationId) {
this.logger.setContext(`${this.constructor.name}:${this.correlationId}`);
} else {
this.logger.setContext(this.constructor.name);
}
}
}

View File

@@ -0,0 +1,75 @@
import { Logger } from '@nestjs/common';
import { EntityTarget, getConnection, QueryRunner, Repository } from 'typeorm';
import { nanoid } from 'nanoid';
export class UnitOfWorkOrm {
private static queryRunners: Map<string, QueryRunner> = new Map();
/**
* Create a connection pool and get its ID.
* ID is used for correlation purposes (to use a correct query runner, correlate logs etc)
*/
static init(): string {
const queryRunner = getConnection().createQueryRunner();
const correlationId = nanoid(8);
this.queryRunners.set(correlationId, queryRunner);
return correlationId;
}
static getQueryRunner(correlationId: string): QueryRunner {
const queryRunner = this.queryRunners.get(correlationId);
if (!queryRunner) {
throw new Error(
'Query runner not found. UnitOfWork must be initiated first. Use "UnitOfWork.init()" method.',
);
}
return queryRunner;
}
static getOrmRepository<Entity>(
entity: EntityTarget<Entity>,
correlationId: string,
): Repository<Entity> {
const queryRunner = this.getQueryRunner(correlationId);
return queryRunner.manager.getRepository(entity);
}
/**
* Execute a UnitOfWork.
* Database operations wrapped in a UnitOfWork will execute in a single
* transactional operation, so everything gets saved or nothing.
* Make sure to generate and inject correct repositories for this to work.
*/
static async execute<T>(
correlationId: string,
callback: () => Promise<T>,
): Promise<T> {
const logger = new Logger(`${this.name}:${correlationId}`);
logger.debug(`[Starting transaction]`);
const queryRunner = this.getQueryRunner(correlationId);
await queryRunner.startTransaction();
let result: T;
try {
result = await callback();
} catch (error) {
await queryRunner.rollbackTransaction();
logger.debug(`[Error]: ${error.message}`);
logger.debug(`[Transaction rolled back]`);
await this.finish(correlationId);
throw error;
}
await queryRunner.commitTransaction();
await this.finish(correlationId);
logger.debug(`[Transaction committed]`);
return result;
}
static async finish(correlationId: string): Promise<void> {
const queryRunner = this.getQueryRunner(correlationId);
queryRunner.release();
this.queryRunners.delete(correlationId);
}
}

View File

@@ -1,60 +0,0 @@
import { Logger } from '@nestjs/common';
import { EntityTarget, getConnection, QueryRunner, Repository } from 'typeorm';
export abstract class UnitOfWork {
private static queryRunner: QueryRunner;
static init(): void {
if (!this.queryRunner) {
this.queryRunner = getConnection().createQueryRunner();
}
}
static getQueryRunner(): QueryRunner {
this.validate();
return this.queryRunner;
}
static getOrmRepository<Entity>(
entity: EntityTarget<Entity>,
): Repository<Entity> {
this.validate();
return this.queryRunner.manager.getRepository(entity);
}
/**
* Execute a UnitOfWork.
* Database operations wrapped in a UnitOfWork will execute in a single
* transactional operation, so everything gets saved or nothing.
* Make sure to generate and inject correct repositories for this to work.
*/
static async execute<T>(callback: () => Promise<T>): Promise<T> {
this.validate();
const logger = new Logger(this.name);
logger.debug('Starting transaction');
await this.queryRunner.startTransaction();
let result: T;
try {
result = await callback();
} catch (error) {
await this.queryRunner.rollbackTransaction();
logger.debug(`Error: ${error.message}`);
logger.debug('Transaction rolled back');
throw error;
}
await this.queryRunner.commitTransaction();
logger.debug('Transaction committed');
return result;
}
private static validate(): void {
if (!this.queryRunner) {
throw new Error(
'UnitOfWork must be initiated. Use "UnitOfWork.init()" method first.',
);
}
}
}

View File

@@ -3,38 +3,46 @@ import { UserRepositoryPort } from '@modules/user/database/user.repository.port'
import { ConflictException } from '@libs/exceptions';
import { Address } from '@modules/user/domain/value-objects/address.value-object';
import { Email } from '@modules/user/domain/value-objects/email.value-object';
import { CreateUserUoW } from '@src/infrastructure/database/units-of-work';
import { UnitOfWork } from '@src/infrastructure/database/unit-of-work';
import { CreateUserCommand } from './create-user.command';
import { UserEntity } from '../../domain/entities/user.entity';
export class CreateUserService {
constructor(
// no direct dependency on a repository, instead depends on a port
private readonly userRepo: UserRepositoryPort,
) {}
constructor(private readonly unitOfWork: UnitOfWork) {}
async create(
command: CreateUserCommand,
userRepo: UserRepositoryPort,
): Promise<ID> {
// user uniqueness guard
if (await userRepo.exists(command.email)) {
throw new ConflictException('User already exists');
}
const user = UserEntity.create({
email: new Email(command.email),
address: new Address({
country: command.country,
postalCode: command.postalCode,
street: command.street,
}),
});
user.someBusinessLogic();
const created = await userRepo.save(user);
return created.id;
}
async execute(command: CreateUserCommand): Promise<ID> {
// Wrapping everything in a UnitOfWork so events get included in a transaction
return CreateUserUoW.execute(async () => {
// user uniqueness guard
if (await this.userRepo.exists(command.email)) {
throw new ConflictException('User already exists');
}
const user = UserEntity.create({
email: new Email(command.email),
address: new Address({
country: command.country,
postalCode: command.postalCode,
street: command.street,
}),
});
user.someBusinessLogic();
const created = await this.userRepo.save(user);
return created.id;
});
const correlationId = this.unitOfWork.init();
const userRepo: UserRepositoryPort = this.unitOfWork.getUserRepository(
correlationId,
);
// Wrapping user creation in a UnitOfWork so events get included in a transaction
return UnitOfWork.execute(correlationId, async () =>
this.create(command, userRepo),
);
}
}

View File

@@ -1,5 +1,5 @@
import { Logger, Provider } from '@nestjs/common';
import { CreateUserUoW } from '@src/infrastructure/database/units-of-work';
import { UnitOfWork } from '@src/infrastructure/database/unit-of-work';
import { UserRepository } from './database/user.repository';
import { CreateUserService } from './commands/create-user/create-user.service';
import { DeleteUserService } from './commands/delete-user/delete-user.service';
@@ -17,10 +17,7 @@ export const createUserSymbol = Symbol('createUser');
export const createUserProvider: Provider = {
provide: createUserSymbol,
useFactory: (): CreateUserService => {
// Initiating UnitOfWork and injecting a transactional repository
CreateUserUoW.init();
const userRepo = CreateUserUoW.getUserRepository();
return new CreateUserService(userRepo);
return new CreateUserService(new UnitOfWork());
},
inject: [UserRepository],
};

View File

@@ -2,18 +2,22 @@ import { UserCreatedDomainEvent } from '@modules/user/domain/events/user-created
import { WalletRepositoryPort } from '@modules/wallet/database/wallet.repository.port';
import { DomainEventHandler } from '@libs/ddd/domain/domain-events';
import { UUID } from '@libs/ddd/domain/value-objects/uuid.value-object';
import { UnitOfWork } from '@src/infrastructure/database/unit-of-work';
import { WalletEntity } from '../../domain/entities/wallet.entity';
export class CreateWalletWhenUserIsCreatedDomainEventHandler extends DomainEventHandler {
constructor(private readonly walletRepo: WalletRepositoryPort) {
constructor(private readonly unitOfWork: UnitOfWork) {
super(UserCreatedDomainEvent);
}
// Do changes to other aggregates or prepare Integration Event for dispatching.
async handle(event: UserCreatedDomainEvent): Promise<void> {
const walletRepo: WalletRepositoryPort = this.unitOfWork.getWalletRepository(
event.correlationId as string,
);
const wallet = WalletEntity.create({
userId: new UUID(event.aggregateId),
});
await this.walletRepo.save(wallet);
await walletRepo.save(wallet);
}
}

View File

@@ -1,23 +1,14 @@
import { Provider } from '@nestjs/common';
import { CreateUserUoW } from '@src/infrastructure/database/units-of-work';
import { WalletRepository } from './database/wallet.repository';
import { UnitOfWork } from '@src/infrastructure/database/unit-of-work';
import { CreateWalletWhenUserIsCreatedDomainEventHandler } from './application/event-handlers/create-wallet-when-user-is-created.domain-event-handler';
export const createWalletWhenUserIsCreatedProvider: Provider = {
provide: CreateWalletWhenUserIsCreatedDomainEventHandler,
useFactory: (): CreateWalletWhenUserIsCreatedDomainEventHandler => {
/**
* Creating event handler with a transactional repository
* provided by a UnitOfWork so all the changes across the domain
* are saved in a single transaction (or rolled back in case of failure).
*/
CreateUserUoW.init();
const walletRepo = CreateUserUoW.getWalletRepository();
const eventHandler = new CreateWalletWhenUserIsCreatedDomainEventHandler(
walletRepo,
new UnitOfWork(),
);
eventHandler.listen();
return eventHandler;
},
inject: [WalletRepository],
};

View File

@@ -0,0 +1,25 @@
# Load testing with Artillery.
# Can also be good for seeding database with lots of dummy data.
# https://www.npmjs.com/package/artillery
# https://www.npmjs.com/package/artillery-plugin-faker
config:
target: http://localhost:3000
phases:
- duration: 1
arrivalRate: 50
plugins:
faker:
locale: en
variables:
email: '$faker.internet.email'
country: '$faker.address.country'
street: '$faker.address.streetName'
scenarios:
- flow:
- post:
url: '/users'
json:
email: '{{ email }}'
country: '{{ country }}'
postalCode: '12345'
street: '{{ street }}'