refactor: infrastructure consolidation
This commit is contained in:
commit
058ed4765f
97
apps/bff/src/infra/audit/audit-log.service.ts
Normal file
97
apps/bff/src/infra/audit/audit-log.service.ts
Normal file
@ -0,0 +1,97 @@
|
||||
import { Injectable, Inject } from "@nestjs/common";
|
||||
import { Prisma, AuditAction } from "@prisma/client";
|
||||
import { PrismaService } from "../database/prisma.service.js";
|
||||
import { extractErrorMessage } from "@bff/core/utils/error.util.js";
|
||||
import { Logger } from "nestjs-pino";
|
||||
import { extractClientIp, extractUserAgent } from "@bff/core/http/request-context.util.js";
|
||||
|
||||
// Re-export AuditAction from Prisma for consumers
|
||||
export { AuditAction } from "@prisma/client";
|
||||
|
||||
export interface AuditLogData {
|
||||
userId?: string | undefined;
|
||||
action: AuditAction;
|
||||
resource?: string | undefined;
|
||||
details?: Record<string, unknown> | string | number | boolean | null | undefined;
|
||||
ipAddress?: string | undefined;
|
||||
userAgent?: string | undefined;
|
||||
success?: boolean | undefined;
|
||||
error?: string | undefined;
|
||||
}
|
||||
|
||||
/**
|
||||
* Minimal request shape for audit logging.
|
||||
* Compatible with Express Request but only requires the fields needed for IP/UA extraction.
|
||||
* Must be compatible with RequestContextLike from request-context.util.ts.
|
||||
*/
|
||||
export type AuditRequest = {
|
||||
headers?: Record<string, string | string[] | undefined> | undefined;
|
||||
ip?: string | undefined;
|
||||
connection?: { remoteAddress?: string | undefined } | undefined;
|
||||
socket?: { remoteAddress?: string | undefined } | undefined;
|
||||
};
|
||||
|
||||
/**
|
||||
* Service for writing audit log entries.
|
||||
*
|
||||
* Handles all audit write operations: creating log entries and
|
||||
* logging authentication events with request context extraction.
|
||||
*/
|
||||
@Injectable()
|
||||
export class AuditLogService {
|
||||
constructor(
|
||||
private readonly prisma: PrismaService,
|
||||
@Inject(Logger) private readonly logger: Logger
|
||||
) {}
|
||||
|
||||
async log(data: AuditLogData): Promise<void> {
|
||||
try {
|
||||
const createData: Parameters<typeof this.prisma.auditLog.create>[0]["data"] = {
|
||||
action: data.action,
|
||||
success: data.success ?? true,
|
||||
};
|
||||
|
||||
if (data.userId !== undefined) createData.userId = data.userId;
|
||||
if (data.resource !== undefined) createData.resource = data.resource;
|
||||
if (data.ipAddress !== undefined) createData.ipAddress = data.ipAddress;
|
||||
if (data.userAgent !== undefined) createData.userAgent = data.userAgent;
|
||||
if (data.error !== undefined) createData.error = data.error;
|
||||
if (data.details !== undefined) {
|
||||
createData.details =
|
||||
data.details === null
|
||||
? Prisma.JsonNull
|
||||
: (JSON.parse(JSON.stringify(data.details)) as Prisma.InputJsonValue);
|
||||
}
|
||||
|
||||
await this.prisma.auditLog.create({ data: createData });
|
||||
} catch (error) {
|
||||
this.logger.error("Audit logging failed", {
|
||||
errorType: error instanceof Error ? error.constructor.name : "Unknown",
|
||||
message: extractErrorMessage(error),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async logAuthEvent(
|
||||
action: AuditAction,
|
||||
userId?: string,
|
||||
details?: Record<string, unknown> | string | number | boolean | null,
|
||||
request?: AuditRequest,
|
||||
success: boolean = true,
|
||||
error?: string
|
||||
): Promise<void> {
|
||||
const ipAddress = extractClientIp(request);
|
||||
const userAgent = extractUserAgent(request);
|
||||
|
||||
await this.log({
|
||||
userId,
|
||||
action,
|
||||
resource: "auth",
|
||||
details,
|
||||
ipAddress,
|
||||
userAgent,
|
||||
success,
|
||||
error,
|
||||
});
|
||||
}
|
||||
}
|
||||
91
apps/bff/src/infra/audit/audit-query.service.ts
Normal file
91
apps/bff/src/infra/audit/audit-query.service.ts
Normal file
@ -0,0 +1,91 @@
|
||||
import { Injectable } from "@nestjs/common";
|
||||
import { Prisma, AuditAction } from "@prisma/client";
|
||||
import { PrismaService } from "../database/prisma.service.js";
|
||||
|
||||
/**
|
||||
* Service for reading and querying audit log entries.
|
||||
*
|
||||
* Handles all audit read operations: paginated log retrieval
|
||||
* and security analytics/statistics.
|
||||
*/
|
||||
@Injectable()
|
||||
export class AuditQueryService {
|
||||
constructor(private readonly prisma: PrismaService) {}
|
||||
|
||||
async getAuditLogs({
|
||||
page,
|
||||
limit,
|
||||
action,
|
||||
userId,
|
||||
}: {
|
||||
page: number;
|
||||
limit: number;
|
||||
action?: AuditAction;
|
||||
userId?: string;
|
||||
}) {
|
||||
const skip = (page - 1) * limit;
|
||||
const where: Prisma.AuditLogWhereInput = {};
|
||||
if (action) where.action = action;
|
||||
if (userId) where.userId = userId;
|
||||
|
||||
const [logs, total] = await Promise.all([
|
||||
this.prisma.auditLog.findMany({
|
||||
where,
|
||||
include: {
|
||||
user: {
|
||||
select: {
|
||||
id: true,
|
||||
email: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
orderBy: { createdAt: "desc" },
|
||||
skip,
|
||||
take: limit,
|
||||
}),
|
||||
this.prisma.auditLog.count({ where }),
|
||||
]);
|
||||
|
||||
return { logs, total };
|
||||
}
|
||||
|
||||
async getSecurityStats() {
|
||||
const today = new Date(new Date().setHours(0, 0, 0, 0));
|
||||
|
||||
const [totalUsers, lockedAccounts, failedLoginsToday, successfulLoginsToday] =
|
||||
await Promise.all([
|
||||
this.prisma.user.count(),
|
||||
this.prisma.user.count({
|
||||
where: {
|
||||
lockedUntil: {
|
||||
gt: new Date(),
|
||||
},
|
||||
},
|
||||
}),
|
||||
this.prisma.auditLog.count({
|
||||
where: {
|
||||
action: AuditAction.LOGIN_FAILED,
|
||||
createdAt: {
|
||||
gte: today,
|
||||
},
|
||||
},
|
||||
}),
|
||||
this.prisma.auditLog.count({
|
||||
where: {
|
||||
action: AuditAction.LOGIN_SUCCESS,
|
||||
createdAt: {
|
||||
gte: today,
|
||||
},
|
||||
},
|
||||
}),
|
||||
]);
|
||||
|
||||
return {
|
||||
totalUsers,
|
||||
lockedAccounts,
|
||||
failedLoginsToday,
|
||||
successfulLoginsToday,
|
||||
securityEventsToday: failedLoginsToday + successfulLoginsToday,
|
||||
};
|
||||
}
|
||||
}
|
||||
@ -1,9 +1,20 @@
|
||||
import { Global, Module } from "@nestjs/common";
|
||||
import { AuditService } from "./audit.service.js";
|
||||
import { AuditLogService } from "./audit-log.service.js";
|
||||
import { AuditQueryService } from "./audit-query.service.js";
|
||||
|
||||
/**
|
||||
* Global audit module.
|
||||
*
|
||||
* Provides:
|
||||
* - AuditLogService (write operations: log, logAuthEvent)
|
||||
* - AuditQueryService (read operations: getAuditLogs, getSecurityStats)
|
||||
*
|
||||
* Existing callers can continue to import { AuditService } from audit.service.ts,
|
||||
* which re-exports AuditLogService under the old name for backward compatibility.
|
||||
*/
|
||||
@Global()
|
||||
@Module({
|
||||
providers: [AuditService],
|
||||
exports: [AuditService],
|
||||
providers: [AuditLogService, AuditQueryService],
|
||||
exports: [AuditLogService, AuditQueryService],
|
||||
})
|
||||
export class AuditModule {}
|
||||
|
||||
@ -1,168 +1,15 @@
|
||||
import { Injectable, Inject } from "@nestjs/common";
|
||||
import { Prisma, AuditAction } from "@prisma/client";
|
||||
import { PrismaService } from "../database/prisma.service.js";
|
||||
import { extractErrorMessage } from "@bff/core/utils/error.util.js";
|
||||
import { Logger } from "nestjs-pino";
|
||||
import { extractClientIp, extractUserAgent } from "@bff/core/http/request-context.util.js";
|
||||
|
||||
// Re-export AuditAction from Prisma for consumers
|
||||
export { AuditAction } from "@prisma/client";
|
||||
|
||||
export interface AuditLogData {
|
||||
userId?: string | undefined;
|
||||
action: AuditAction;
|
||||
resource?: string | undefined;
|
||||
details?: Record<string, unknown> | string | number | boolean | null | undefined;
|
||||
ipAddress?: string | undefined;
|
||||
userAgent?: string | undefined;
|
||||
success?: boolean | undefined;
|
||||
error?: string | undefined;
|
||||
}
|
||||
|
||||
/**
|
||||
* Minimal request shape for audit logging.
|
||||
* Compatible with Express Request but only requires the fields needed for IP/UA extraction.
|
||||
* Must be compatible with RequestContextLike from request-context.util.ts.
|
||||
* Backward-compatible re-exports.
|
||||
*
|
||||
* The original AuditService has been split into:
|
||||
* - AuditLogService (write operations: log, logAuthEvent)
|
||||
* - AuditQueryService (read operations: getAuditLogs, getSecurityStats)
|
||||
*
|
||||
* Existing callers that import { AuditService } will get AuditLogService
|
||||
* via the type alias below, so no import path changes are needed.
|
||||
*/
|
||||
export type AuditRequest = {
|
||||
headers?: Record<string, string | string[] | undefined> | undefined;
|
||||
ip?: string | undefined;
|
||||
connection?: { remoteAddress?: string | undefined } | undefined;
|
||||
socket?: { remoteAddress?: string | undefined } | undefined;
|
||||
};
|
||||
|
||||
@Injectable()
|
||||
export class AuditService {
|
||||
constructor(
|
||||
private readonly prisma: PrismaService,
|
||||
@Inject(Logger) private readonly logger: Logger
|
||||
) {}
|
||||
|
||||
async log(data: AuditLogData): Promise<void> {
|
||||
try {
|
||||
const createData: Parameters<typeof this.prisma.auditLog.create>[0]["data"] = {
|
||||
action: data.action,
|
||||
success: data.success ?? true,
|
||||
};
|
||||
|
||||
if (data.userId !== undefined) createData.userId = data.userId;
|
||||
if (data.resource !== undefined) createData.resource = data.resource;
|
||||
if (data.ipAddress !== undefined) createData.ipAddress = data.ipAddress;
|
||||
if (data.userAgent !== undefined) createData.userAgent = data.userAgent;
|
||||
if (data.error !== undefined) createData.error = data.error;
|
||||
if (data.details !== undefined) {
|
||||
createData.details =
|
||||
data.details === null
|
||||
? Prisma.JsonNull
|
||||
: (JSON.parse(JSON.stringify(data.details)) as Prisma.InputJsonValue);
|
||||
}
|
||||
|
||||
await this.prisma.auditLog.create({ data: createData });
|
||||
} catch (error) {
|
||||
this.logger.error("Audit logging failed", {
|
||||
errorType: error instanceof Error ? error.constructor.name : "Unknown",
|
||||
message: extractErrorMessage(error),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async logAuthEvent(
|
||||
action: AuditAction,
|
||||
userId?: string,
|
||||
details?: Record<string, unknown> | string | number | boolean | null,
|
||||
request?: AuditRequest,
|
||||
success: boolean = true,
|
||||
error?: string
|
||||
): Promise<void> {
|
||||
const ipAddress = extractClientIp(request);
|
||||
const userAgent = extractUserAgent(request);
|
||||
|
||||
await this.log({
|
||||
userId,
|
||||
action,
|
||||
resource: "auth",
|
||||
details,
|
||||
ipAddress,
|
||||
userAgent,
|
||||
success,
|
||||
error,
|
||||
});
|
||||
}
|
||||
|
||||
async getAuditLogs({
|
||||
page,
|
||||
limit,
|
||||
action,
|
||||
userId,
|
||||
}: {
|
||||
page: number;
|
||||
limit: number;
|
||||
action?: AuditAction;
|
||||
userId?: string;
|
||||
}) {
|
||||
const skip = (page - 1) * limit;
|
||||
const where: Prisma.AuditLogWhereInput = {};
|
||||
if (action) where.action = action;
|
||||
if (userId) where.userId = userId;
|
||||
|
||||
const [logs, total] = await Promise.all([
|
||||
this.prisma.auditLog.findMany({
|
||||
where,
|
||||
include: {
|
||||
user: {
|
||||
select: {
|
||||
id: true,
|
||||
email: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
orderBy: { createdAt: "desc" },
|
||||
skip,
|
||||
take: limit,
|
||||
}),
|
||||
this.prisma.auditLog.count({ where }),
|
||||
]);
|
||||
|
||||
return { logs, total };
|
||||
}
|
||||
|
||||
async getSecurityStats() {
|
||||
const today = new Date(new Date().setHours(0, 0, 0, 0));
|
||||
|
||||
const [totalUsers, lockedAccounts, failedLoginsToday, successfulLoginsToday] =
|
||||
await Promise.all([
|
||||
this.prisma.user.count(),
|
||||
this.prisma.user.count({
|
||||
where: {
|
||||
lockedUntil: {
|
||||
gt: new Date(),
|
||||
},
|
||||
},
|
||||
}),
|
||||
this.prisma.auditLog.count({
|
||||
where: {
|
||||
action: AuditAction.LOGIN_FAILED,
|
||||
createdAt: {
|
||||
gte: today,
|
||||
},
|
||||
},
|
||||
}),
|
||||
this.prisma.auditLog.count({
|
||||
where: {
|
||||
action: AuditAction.LOGIN_SUCCESS,
|
||||
createdAt: {
|
||||
gte: today,
|
||||
},
|
||||
},
|
||||
}),
|
||||
]);
|
||||
|
||||
return {
|
||||
totalUsers,
|
||||
lockedAccounts,
|
||||
failedLoginsToday,
|
||||
successfulLoginsToday,
|
||||
securityEventsToday: failedLoginsToday + successfulLoginsToday,
|
||||
};
|
||||
}
|
||||
}
|
||||
export { AuditLogService as AuditService } from "./audit-log.service.js";
|
||||
export type { AuditLogData, AuditRequest } from "./audit-log.service.js";
|
||||
export { AuditAction } from "./audit-log.service.js";
|
||||
export { AuditQueryService } from "./audit-query.service.js";
|
||||
|
||||
163
apps/bff/src/infra/cache/cache-strategy.base.ts
vendored
Normal file
163
apps/bff/src/infra/cache/cache-strategy.base.ts
vendored
Normal file
@ -0,0 +1,163 @@
|
||||
import { CacheService } from "./cache.service.js";
|
||||
import type { CacheBucketMetrics } from "./cache.types.js";
|
||||
|
||||
/**
|
||||
* Configuration for a cache bucket.
|
||||
*
|
||||
* @property ttlSeconds - TTL in seconds. `null` means no TTL (CDC-driven invalidation only).
|
||||
* @property allowNull - Whether to cache null/undefined values from the fetcher.
|
||||
*/
|
||||
export interface CacheBucketConfig {
|
||||
ttlSeconds: number | null;
|
||||
allowNull?: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
* Options for the getOrSet operation.
|
||||
*/
|
||||
export interface GetOrSetOptions {
|
||||
/** Whether to cache null/undefined values (overrides bucket default) */
|
||||
allowNull?: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
* Base class for domain-specific cache services.
|
||||
*
|
||||
* Extracts the common patterns shared across OrdersCacheService,
|
||||
* SupportCacheService, and others:
|
||||
*
|
||||
* - **Request coalescing**: Deduplicates in-flight requests to the same key,
|
||||
* preventing thundering herd after cache miss or invalidation.
|
||||
*
|
||||
* - **Cache metrics tracking**: Tracks hits, misses, and invalidations
|
||||
* per named bucket.
|
||||
*
|
||||
* - **getOrSet pattern**: Check cache, coalesce, fetch, store -- with
|
||||
* configurable TTL (including null for CDC-driven caches) and null handling.
|
||||
*
|
||||
* Subclasses define their own bucket names, key builders, and invalidation methods.
|
||||
* The base handles the mechanical caching logic so subclasses focus on domain semantics.
|
||||
*
|
||||
* @example
|
||||
* ```typescript
|
||||
* class OrdersCacheService extends CacheStrategyBase<'summaries' | 'details'> {
|
||||
* constructor(cache: CacheService) {
|
||||
* super(cache, {
|
||||
* summaries: { ttlSeconds: null }, // CDC-driven, no TTL
|
||||
* details: { ttlSeconds: null, allowNull: true },
|
||||
* });
|
||||
* }
|
||||
* }
|
||||
* ```
|
||||
*/
|
||||
export abstract class CacheStrategyBase<TBucket extends string> {
|
||||
private readonly metrics: Record<TBucket, CacheBucketMetrics>;
|
||||
private readonly inflightRequests = new Map<string, Promise<unknown>>();
|
||||
private readonly bucketConfigs: Record<TBucket, CacheBucketConfig>;
|
||||
private invalidationCount = 0;
|
||||
|
||||
protected constructor(
|
||||
protected readonly cache: CacheService,
|
||||
bucketConfigs: Record<TBucket, CacheBucketConfig>
|
||||
) {
|
||||
this.bucketConfigs = bucketConfigs;
|
||||
|
||||
// Initialize metrics for each bucket
|
||||
const metrics = {} as Record<TBucket, CacheBucketMetrics>;
|
||||
for (const bucket of Object.keys(bucketConfigs) as TBucket[]) {
|
||||
metrics[bucket] = { hits: 0, misses: 0 };
|
||||
}
|
||||
this.metrics = metrics;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get or fetch a cached value with request coalescing.
|
||||
*
|
||||
* 1. Check Redis cache
|
||||
* 2. If cache miss, check for an in-flight request for the same key
|
||||
* 3. If no in-flight request, call the fetcher, store in cache, and return
|
||||
*
|
||||
* @param bucket - The metrics bucket to track hits/misses against
|
||||
* @param key - The Redis cache key
|
||||
* @param fetcher - Async function to fetch fresh data on cache miss
|
||||
* @param options - Optional overrides (e.g., allowNull)
|
||||
* @returns The cached or freshly fetched value
|
||||
*/
|
||||
protected async getOrSet<T>(
|
||||
bucket: TBucket,
|
||||
key: string,
|
||||
fetcher: () => Promise<T>,
|
||||
options?: GetOrSetOptions
|
||||
): Promise<T> {
|
||||
const config = this.bucketConfigs[bucket];
|
||||
const allowNull = options?.allowNull ?? config.allowNull ?? false;
|
||||
const ttlSeconds = config.ttlSeconds;
|
||||
|
||||
// Check Redis cache first
|
||||
const cached = await this.cache.get<T>(key);
|
||||
|
||||
if (cached !== null) {
|
||||
this.metrics[bucket].hits++;
|
||||
return cached;
|
||||
}
|
||||
|
||||
// Check for in-flight request (prevents thundering herd)
|
||||
const existingRequest = this.inflightRequests.get(key);
|
||||
if (existingRequest) {
|
||||
return existingRequest as Promise<T>;
|
||||
}
|
||||
|
||||
// Fetch fresh data
|
||||
this.metrics[bucket].misses++;
|
||||
|
||||
const fetchPromise = (async () => {
|
||||
try {
|
||||
const fresh = await fetcher();
|
||||
const valueToStore = allowNull ? (fresh ?? null) : fresh;
|
||||
|
||||
if (ttlSeconds === null) {
|
||||
await this.cache.set(key, valueToStore);
|
||||
} else {
|
||||
await this.cache.set(key, valueToStore, ttlSeconds);
|
||||
}
|
||||
|
||||
return fresh;
|
||||
} finally {
|
||||
this.inflightRequests.delete(key);
|
||||
}
|
||||
})();
|
||||
|
||||
this.inflightRequests.set(key, fetchPromise);
|
||||
return fetchPromise;
|
||||
}
|
||||
|
||||
/**
|
||||
* Invalidate a single cache key and track the invalidation.
|
||||
*/
|
||||
protected async invalidate(key: string): Promise<void> {
|
||||
this.invalidationCount++;
|
||||
await this.cache.del(key);
|
||||
}
|
||||
|
||||
/**
|
||||
* Invalidate all keys matching a pattern and track the invalidation.
|
||||
*/
|
||||
protected async invalidatePattern(pattern: string): Promise<void> {
|
||||
this.invalidationCount++;
|
||||
await this.cache.delPattern(pattern);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the current invalidation count.
|
||||
*/
|
||||
protected getInvalidationCount(): number {
|
||||
return this.invalidationCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a snapshot of metrics for a specific bucket.
|
||||
*/
|
||||
protected getBucketMetrics(bucket: TBucket): CacheBucketMetrics {
|
||||
return { ...this.metrics[bucket] };
|
||||
}
|
||||
}
|
||||
@ -2,6 +2,7 @@ import { Injectable, Inject } from "@nestjs/common";
|
||||
import { Logger } from "nestjs-pino";
|
||||
import { TransactionService, type TransactionOperation } from "./transaction.service.js";
|
||||
import { extractErrorMessage } from "@bff/core/utils/error.util.js";
|
||||
import { RollbackCoordinator } from "./rollback-coordinator.js";
|
||||
|
||||
export interface DistributedStep<TId extends string = string, TResult = unknown> {
|
||||
id: TId;
|
||||
@ -460,32 +461,19 @@ export class DistributedTransactionService {
|
||||
executedSteps: readonly string[],
|
||||
transactionId: string
|
||||
): Promise<number> {
|
||||
this.logger.warn(`Executing rollbacks for ${executedSteps.length} steps [${transactionId}]`);
|
||||
// Build rollback actions only for executed steps that have a rollback function
|
||||
const actions = executedSteps
|
||||
.map(stepId => {
|
||||
const step = steps.find(s => s.id === stepId);
|
||||
if (!step?.rollback) return null;
|
||||
return {
|
||||
execute: step.rollback,
|
||||
label: `step: ${stepId}`,
|
||||
};
|
||||
})
|
||||
.filter((a): a is NonNullable<typeof a> => a !== null);
|
||||
|
||||
let rollbacksExecuted = 0;
|
||||
|
||||
for (let i = executedSteps.length - 1; i >= 0; i--) {
|
||||
const stepId = executedSteps[i];
|
||||
const step = steps.find(s => s.id === stepId);
|
||||
|
||||
if (step?.rollback) {
|
||||
try {
|
||||
this.logger.debug(`Executing rollback for step: ${stepId} [${transactionId}]`);
|
||||
// eslint-disable-next-line no-await-in-loop -- Rollbacks must execute sequentially in reverse order (LIFO) to maintain consistency
|
||||
await step.rollback();
|
||||
rollbacksExecuted++;
|
||||
this.logger.debug(`Rollback completed for step: ${stepId} [${transactionId}]`);
|
||||
} catch (rollbackError) {
|
||||
this.logger.error(`Rollback failed for step: ${stepId} [${transactionId}]`, {
|
||||
error: extractErrorMessage(rollbackError),
|
||||
});
|
||||
// Continue with other rollbacks even if one fails
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
this.logger.log(`Completed ${rollbacksExecuted} rollbacks [${transactionId}]`);
|
||||
return rollbacksExecuted;
|
||||
return RollbackCoordinator.executeInReverse(actions, transactionId, this.logger);
|
||||
}
|
||||
|
||||
private async executeStepWithRetry<TResult>(
|
||||
|
||||
@ -3,3 +3,4 @@
|
||||
*/
|
||||
export * from "./transaction.service.js";
|
||||
export * from "./distributed-transaction.service.js";
|
||||
export * from "./rollback-coordinator.js";
|
||||
|
||||
83
apps/bff/src/infra/database/services/rollback-coordinator.ts
Normal file
83
apps/bff/src/infra/database/services/rollback-coordinator.ts
Normal file
@ -0,0 +1,83 @@
|
||||
import { extractErrorMessage } from "@bff/core/utils/error.util.js";
|
||||
|
||||
/**
|
||||
* A rollback action that can be executed during transaction failure recovery.
|
||||
*/
|
||||
export interface RollbackAction {
|
||||
/** Execute the rollback */
|
||||
execute: () => Promise<void>;
|
||||
/** Human-readable label for logging (e.g., step ID or index) */
|
||||
label: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Logger interface for the RollbackCoordinator.
|
||||
* Compatible with nestjs-pino Logger.
|
||||
*/
|
||||
export interface RollbackLogger {
|
||||
warn(message: string, ...args: unknown[]): void;
|
||||
debug(message: string, ...args: unknown[]): void;
|
||||
error(message: string, ...args: unknown[]): void;
|
||||
log(message: string, ...args: unknown[]): void;
|
||||
}
|
||||
|
||||
/**
|
||||
* Coordinates rollback execution across transaction services.
|
||||
*
|
||||
* Shared patterns:
|
||||
* - LIFO (last-in-first-out) execution order
|
||||
* - Continue on individual rollback failure (best-effort)
|
||||
* - Structured logging of progress and failures
|
||||
* - Returns count of successfully executed rollbacks
|
||||
*
|
||||
* Used by both TransactionService (database transactions with external rollbacks)
|
||||
* and DistributedTransactionService (multi-step distributed transactions).
|
||||
*/
|
||||
export class RollbackCoordinator {
|
||||
/**
|
||||
* Execute rollback actions in reverse order (LIFO).
|
||||
*
|
||||
* Each rollback is attempted independently; a failure in one rollback
|
||||
* does not prevent subsequent rollbacks from executing.
|
||||
*
|
||||
* @param actions - Ordered list of rollback actions (will be executed in reverse)
|
||||
* @param transactionId - Transaction identifier for log correlation
|
||||
* @param logger - Logger instance for structured output
|
||||
* @returns Number of successfully executed rollbacks
|
||||
*/
|
||||
static async executeInReverse(
|
||||
actions: readonly RollbackAction[],
|
||||
transactionId: string,
|
||||
logger: RollbackLogger
|
||||
): Promise<number> {
|
||||
if (actions.length === 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
logger.warn(`Executing ${actions.length} rollback actions [${transactionId}]`);
|
||||
|
||||
let rollbacksExecuted = 0;
|
||||
|
||||
for (let i = actions.length - 1; i >= 0; i--) {
|
||||
const action = actions[i];
|
||||
if (!action) continue;
|
||||
|
||||
try {
|
||||
// eslint-disable-next-line no-await-in-loop -- Rollbacks must execute sequentially in LIFO order
|
||||
await action.execute();
|
||||
rollbacksExecuted++;
|
||||
logger.debug(`Rollback ${action.label} completed [${transactionId}]`);
|
||||
} catch (rollbackError) {
|
||||
logger.error(`Rollback ${action.label} failed [${transactionId}]`, {
|
||||
error: extractErrorMessage(rollbackError),
|
||||
});
|
||||
// Continue with other rollbacks even if one fails
|
||||
}
|
||||
}
|
||||
|
||||
logger.log(
|
||||
`Completed ${rollbacksExecuted}/${actions.length} rollbacks [${transactionId}]`
|
||||
);
|
||||
return rollbacksExecuted;
|
||||
}
|
||||
}
|
||||
@ -3,6 +3,7 @@ import { Logger } from "nestjs-pino";
|
||||
import { Prisma } from "@prisma/client";
|
||||
import { PrismaService } from "../prisma.service.js";
|
||||
import { extractErrorMessage } from "@bff/core/utils/error.util.js";
|
||||
import { RollbackCoordinator } from "./rollback-coordinator.js";
|
||||
|
||||
export interface TransactionContext {
|
||||
id: string;
|
||||
@ -21,8 +22,6 @@ export type TransactionOperation<T> = (
|
||||
context: TransactionContext & TransactionContextHelpers
|
||||
) => Promise<T>;
|
||||
|
||||
export type SimpleTransactionOperation<T> = (tx: Prisma.TransactionClient) => Promise<T>;
|
||||
|
||||
export interface TransactionOptions {
|
||||
/**
|
||||
* Maximum time to wait for transaction to complete (ms)
|
||||
@ -225,25 +224,6 @@ export class TransactionService {
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a simple database-only transaction (no external operations)
|
||||
*/
|
||||
async executeSimpleTransaction<T>(
|
||||
operation: SimpleTransactionOperation<T>,
|
||||
options: Omit<TransactionOptions, "autoRollback"> = {}
|
||||
): Promise<T> {
|
||||
const result = await this.executeTransaction(async tx => operation(tx), {
|
||||
...options,
|
||||
autoRollback: false,
|
||||
});
|
||||
|
||||
if (!result.success) {
|
||||
throw new Error(result.error || "Transaction failed");
|
||||
}
|
||||
|
||||
return result.data!;
|
||||
}
|
||||
|
||||
private async executeTransactionAttempt<T>(
|
||||
operation: TransactionOperation<T>,
|
||||
context: TransactionContext,
|
||||
@ -288,33 +268,12 @@ export class TransactionService {
|
||||
return 0;
|
||||
}
|
||||
|
||||
this.logger.warn(
|
||||
`Executing ${context.rollbackActions.length} rollback actions [${context.id}]`
|
||||
);
|
||||
const actions = context.rollbackActions.map((fn, i) => ({
|
||||
execute: fn,
|
||||
label: `${i + 1}`,
|
||||
}));
|
||||
|
||||
let rollbacksExecuted = 0;
|
||||
|
||||
// Execute rollbacks in reverse order (LIFO)
|
||||
for (let i = context.rollbackActions.length - 1; i >= 0; i--) {
|
||||
const rollbackAction = context.rollbackActions[i];
|
||||
if (!rollbackAction) continue;
|
||||
try {
|
||||
// eslint-disable-next-line no-await-in-loop -- Rollbacks must execute sequentially in LIFO order
|
||||
await rollbackAction();
|
||||
rollbacksExecuted++;
|
||||
this.logger.debug(`Rollback ${i + 1} completed [${context.id}]`);
|
||||
} catch (rollbackError) {
|
||||
this.logger.error(`Rollback ${i + 1} failed [${context.id}]`, {
|
||||
error: extractErrorMessage(rollbackError),
|
||||
});
|
||||
// Continue with other rollbacks even if one fails
|
||||
}
|
||||
}
|
||||
|
||||
this.logger.log(
|
||||
`Completed ${rollbacksExecuted}/${context.rollbackActions.length} rollbacks [${context.id}]`
|
||||
);
|
||||
return rollbacksExecuted;
|
||||
return RollbackCoordinator.executeInReverse(actions, context.id, this.logger);
|
||||
}
|
||||
|
||||
private isRetryableError(error: unknown): boolean {
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
import { Injectable } from "@nestjs/common";
|
||||
import { CacheService } from "@bff/infra/cache/cache.service.js";
|
||||
import { CacheStrategyBase } from "@bff/infra/cache/cache-strategy.base.js";
|
||||
import type { CacheBucketMetrics } from "@bff/infra/cache/cache.types.js";
|
||||
import type { OrderDetails, OrderSummary } from "@customer-portal/domain/orders";
|
||||
|
||||
@ -21,18 +22,13 @@ interface OrdersCacheMetrics {
|
||||
* - Metrics tracking: Monitors hits, misses, and invalidations
|
||||
*/
|
||||
@Injectable()
|
||||
export class OrdersCacheService {
|
||||
private readonly metrics: OrdersCacheMetrics = {
|
||||
summaries: { hits: 0, misses: 0 },
|
||||
details: { hits: 0, misses: 0 },
|
||||
invalidations: 0,
|
||||
};
|
||||
|
||||
// Request coalescing: Prevents duplicate API calls when multiple users
|
||||
// request the same data after CDC invalidation
|
||||
private readonly inflightRequests = new Map<string, Promise<unknown>>();
|
||||
|
||||
constructor(private readonly cache: CacheService) {}
|
||||
export class OrdersCacheService extends CacheStrategyBase<"summaries" | "details"> {
|
||||
constructor(cache: CacheService) {
|
||||
super(cache, {
|
||||
summaries: { ttlSeconds: null },
|
||||
details: { ttlSeconds: null, allowNull: true },
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Get order summaries for an account (CDC-driven cache)
|
||||
@ -42,7 +38,7 @@ export class OrdersCacheService {
|
||||
fetcher: () => Promise<OrderSummary[]>
|
||||
): Promise<OrderSummary[]> {
|
||||
const key = this.buildAccountKey(sfAccountId);
|
||||
return this.getOrSet<OrderSummary[]>("summaries", key, fetcher, false);
|
||||
return this.getOrSet("summaries", key, fetcher);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -53,7 +49,7 @@ export class OrdersCacheService {
|
||||
fetcher: () => Promise<OrderDetails | null>
|
||||
): Promise<OrderDetails | null> {
|
||||
const key = this.buildOrderKey(orderId);
|
||||
return this.getOrSet<OrderDetails | null>("details", key, fetcher, true);
|
||||
return this.getOrSet("details", key, fetcher);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -61,8 +57,7 @@ export class OrdersCacheService {
|
||||
*/
|
||||
async invalidateAccountOrders(sfAccountId: string): Promise<void> {
|
||||
const key = this.buildAccountKey(sfAccountId);
|
||||
this.metrics.invalidations++;
|
||||
await this.cache.del(key);
|
||||
await this.invalidate(key);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -70,8 +65,7 @@ export class OrdersCacheService {
|
||||
*/
|
||||
async invalidateOrder(orderId: string): Promise<void> {
|
||||
const key = this.buildOrderKey(orderId);
|
||||
this.metrics.invalidations++;
|
||||
await this.cache.del(key);
|
||||
await this.invalidate(key);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -79,50 +73,12 @@ export class OrdersCacheService {
|
||||
*/
|
||||
getMetrics(): OrdersCacheMetrics {
|
||||
return {
|
||||
summaries: { ...this.metrics.summaries },
|
||||
details: { ...this.metrics.details },
|
||||
invalidations: this.metrics.invalidations,
|
||||
summaries: this.getBucketMetrics("summaries"),
|
||||
details: this.getBucketMetrics("details"),
|
||||
invalidations: this.getInvalidationCount(),
|
||||
};
|
||||
}
|
||||
|
||||
private async getOrSet<T>(
|
||||
bucket: keyof Pick<OrdersCacheMetrics, "summaries" | "details">,
|
||||
key: string,
|
||||
fetcher: () => Promise<T>,
|
||||
allowNull: boolean
|
||||
): Promise<T> {
|
||||
// Check Redis cache first
|
||||
const cached = await this.cache.get<T>(key);
|
||||
|
||||
if (cached !== null && (allowNull || cached !== null)) {
|
||||
this.metrics[bucket].hits++;
|
||||
return cached;
|
||||
}
|
||||
|
||||
// Check for in-flight request (prevents thundering herd)
|
||||
const existingRequest = this.inflightRequests.get(key);
|
||||
if (existingRequest) {
|
||||
return existingRequest as Promise<T>;
|
||||
}
|
||||
|
||||
// Fetch fresh data
|
||||
this.metrics[bucket].misses++;
|
||||
|
||||
const fetchPromise = (async () => {
|
||||
try {
|
||||
const fresh = await fetcher();
|
||||
const valueToStore = allowNull ? (fresh ?? null) : fresh;
|
||||
await this.cache.set(key, valueToStore);
|
||||
return fresh;
|
||||
} finally {
|
||||
this.inflightRequests.delete(key);
|
||||
}
|
||||
})();
|
||||
|
||||
this.inflightRequests.set(key, fetchPromise);
|
||||
return fetchPromise;
|
||||
}
|
||||
|
||||
private buildAccountKey(sfAccountId: string): string {
|
||||
return `orders:account:${sfAccountId}`;
|
||||
}
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
import { Injectable } from "@nestjs/common";
|
||||
import { CacheService } from "@bff/infra/cache/cache.service.js";
|
||||
import { CacheStrategyBase } from "@bff/infra/cache/cache-strategy.base.js";
|
||||
import type { CacheBucketMetrics } from "@bff/infra/cache/cache.types.js";
|
||||
import type { SupportCaseList, CaseMessageList } from "@customer-portal/domain/support";
|
||||
|
||||
@ -36,17 +37,13 @@ interface SupportCacheMetrics {
|
||||
* - Metrics tracking: Monitors hits, misses, and invalidations
|
||||
*/
|
||||
@Injectable()
|
||||
export class SupportCacheService {
|
||||
private readonly metrics: SupportCacheMetrics = {
|
||||
caseList: { hits: 0, misses: 0 },
|
||||
messages: { hits: 0, misses: 0 },
|
||||
invalidations: 0,
|
||||
};
|
||||
|
||||
// Request coalescing: Prevents duplicate API calls
|
||||
private readonly inflightRequests = new Map<string, Promise<unknown>>();
|
||||
|
||||
constructor(private readonly cache: CacheService) {}
|
||||
export class SupportCacheService extends CacheStrategyBase<"caseList" | "messages"> {
|
||||
constructor(cache: CacheService) {
|
||||
super(cache, {
|
||||
caseList: { ttlSeconds: CACHE_TTL.CASE_LIST },
|
||||
messages: { ttlSeconds: CACHE_TTL.CASE_MESSAGES },
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Get cached case list for an account
|
||||
@ -56,7 +53,7 @@ export class SupportCacheService {
|
||||
fetcher: () => Promise<SupportCaseList>
|
||||
): Promise<SupportCaseList> {
|
||||
const key = this.buildCaseListKey(sfAccountId);
|
||||
return this.getOrSet("caseList", key, fetcher, CACHE_TTL.CASE_LIST);
|
||||
return this.getOrSet("caseList", key, fetcher);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -67,7 +64,7 @@ export class SupportCacheService {
|
||||
fetcher: () => Promise<CaseMessageList>
|
||||
): Promise<CaseMessageList> {
|
||||
const key = this.buildMessagesKey(caseId);
|
||||
return this.getOrSet("messages", key, fetcher, CACHE_TTL.CASE_MESSAGES);
|
||||
return this.getOrSet("messages", key, fetcher);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -76,8 +73,7 @@ export class SupportCacheService {
|
||||
*/
|
||||
async invalidateCaseList(sfAccountId: string): Promise<void> {
|
||||
const key = this.buildCaseListKey(sfAccountId);
|
||||
this.metrics.invalidations++;
|
||||
await this.cache.del(key);
|
||||
await this.invalidate(key);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -86,8 +82,7 @@ export class SupportCacheService {
|
||||
*/
|
||||
async invalidateCaseMessages(caseId: string): Promise<void> {
|
||||
const key = this.buildMessagesKey(caseId);
|
||||
this.metrics.invalidations++;
|
||||
await this.cache.del(key);
|
||||
await this.invalidate(key);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -106,49 +101,12 @@ export class SupportCacheService {
|
||||
*/
|
||||
getMetrics(): SupportCacheMetrics {
|
||||
return {
|
||||
caseList: { ...this.metrics.caseList },
|
||||
messages: { ...this.metrics.messages },
|
||||
invalidations: this.metrics.invalidations,
|
||||
caseList: this.getBucketMetrics("caseList"),
|
||||
messages: this.getBucketMetrics("messages"),
|
||||
invalidations: this.getInvalidationCount(),
|
||||
};
|
||||
}
|
||||
|
||||
private async getOrSet<T>(
|
||||
bucket: "caseList" | "messages",
|
||||
key: string,
|
||||
fetcher: () => Promise<T>,
|
||||
ttlSeconds: number
|
||||
): Promise<T> {
|
||||
// Check Redis cache first
|
||||
const cached = await this.cache.get<T>(key);
|
||||
|
||||
if (cached !== null) {
|
||||
this.metrics[bucket].hits++;
|
||||
return cached;
|
||||
}
|
||||
|
||||
// Check for in-flight request (prevents thundering herd)
|
||||
const existingRequest = this.inflightRequests.get(key);
|
||||
if (existingRequest) {
|
||||
return existingRequest as Promise<T>;
|
||||
}
|
||||
|
||||
// Fetch fresh data
|
||||
this.metrics[bucket].misses++;
|
||||
|
||||
const fetchPromise = (async () => {
|
||||
try {
|
||||
const fresh = await fetcher();
|
||||
await this.cache.set(key, fresh, ttlSeconds);
|
||||
return fresh;
|
||||
} finally {
|
||||
this.inflightRequests.delete(key);
|
||||
}
|
||||
})();
|
||||
|
||||
this.inflightRequests.set(key, fetchPromise);
|
||||
return fetchPromise;
|
||||
}
|
||||
|
||||
private buildCaseListKey(sfAccountId: string): string {
|
||||
return `support:cases:${sfAccountId}`;
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user