refactor: infrastructure consolidation
- Create RollbackCoordinator shared by TransactionService and DistributedTransactionService - Remove unused executeSimpleTransaction() - Split AuditService into AuditLogService (writes) and AuditQueryService (reads) - Create CacheStrategyBase with request coalescing, metrics, and getOrSet pattern - Refactor orders and support cache services to extend CacheStrategyBase
This commit is contained in:
parent
6e51012d21
commit
34abe1981f
97
apps/bff/src/infra/audit/audit-log.service.ts
Normal file
97
apps/bff/src/infra/audit/audit-log.service.ts
Normal file
@ -0,0 +1,97 @@
|
|||||||
|
import { Injectable, Inject } from "@nestjs/common";
|
||||||
|
import { Prisma, AuditAction } from "@prisma/client";
|
||||||
|
import { PrismaService } from "../database/prisma.service.js";
|
||||||
|
import { extractErrorMessage } from "@bff/core/utils/error.util.js";
|
||||||
|
import { Logger } from "nestjs-pino";
|
||||||
|
import { extractClientIp, extractUserAgent } from "@bff/core/http/request-context.util.js";
|
||||||
|
|
||||||
|
// Re-export AuditAction from Prisma for consumers
|
||||||
|
export { AuditAction } from "@prisma/client";
|
||||||
|
|
||||||
|
export interface AuditLogData {
|
||||||
|
userId?: string | undefined;
|
||||||
|
action: AuditAction;
|
||||||
|
resource?: string | undefined;
|
||||||
|
details?: Record<string, unknown> | string | number | boolean | null | undefined;
|
||||||
|
ipAddress?: string | undefined;
|
||||||
|
userAgent?: string | undefined;
|
||||||
|
success?: boolean | undefined;
|
||||||
|
error?: string | undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Minimal request shape for audit logging.
|
||||||
|
* Compatible with Express Request but only requires the fields needed for IP/UA extraction.
|
||||||
|
* Must be compatible with RequestContextLike from request-context.util.ts.
|
||||||
|
*/
|
||||||
|
export type AuditRequest = {
|
||||||
|
headers?: Record<string, string | string[] | undefined> | undefined;
|
||||||
|
ip?: string | undefined;
|
||||||
|
connection?: { remoteAddress?: string | undefined } | undefined;
|
||||||
|
socket?: { remoteAddress?: string | undefined } | undefined;
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Service for writing audit log entries.
|
||||||
|
*
|
||||||
|
* Handles all audit write operations: creating log entries and
|
||||||
|
* logging authentication events with request context extraction.
|
||||||
|
*/
|
||||||
|
@Injectable()
|
||||||
|
export class AuditLogService {
|
||||||
|
constructor(
|
||||||
|
private readonly prisma: PrismaService,
|
||||||
|
@Inject(Logger) private readonly logger: Logger
|
||||||
|
) {}
|
||||||
|
|
||||||
|
async log(data: AuditLogData): Promise<void> {
|
||||||
|
try {
|
||||||
|
const createData: Parameters<typeof this.prisma.auditLog.create>[0]["data"] = {
|
||||||
|
action: data.action,
|
||||||
|
success: data.success ?? true,
|
||||||
|
};
|
||||||
|
|
||||||
|
if (data.userId !== undefined) createData.userId = data.userId;
|
||||||
|
if (data.resource !== undefined) createData.resource = data.resource;
|
||||||
|
if (data.ipAddress !== undefined) createData.ipAddress = data.ipAddress;
|
||||||
|
if (data.userAgent !== undefined) createData.userAgent = data.userAgent;
|
||||||
|
if (data.error !== undefined) createData.error = data.error;
|
||||||
|
if (data.details !== undefined) {
|
||||||
|
createData.details =
|
||||||
|
data.details === null
|
||||||
|
? Prisma.JsonNull
|
||||||
|
: (JSON.parse(JSON.stringify(data.details)) as Prisma.InputJsonValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
await this.prisma.auditLog.create({ data: createData });
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.error("Audit logging failed", {
|
||||||
|
errorType: error instanceof Error ? error.constructor.name : "Unknown",
|
||||||
|
message: extractErrorMessage(error),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async logAuthEvent(
|
||||||
|
action: AuditAction,
|
||||||
|
userId?: string,
|
||||||
|
details?: Record<string, unknown> | string | number | boolean | null,
|
||||||
|
request?: AuditRequest,
|
||||||
|
success: boolean = true,
|
||||||
|
error?: string
|
||||||
|
): Promise<void> {
|
||||||
|
const ipAddress = extractClientIp(request);
|
||||||
|
const userAgent = extractUserAgent(request);
|
||||||
|
|
||||||
|
await this.log({
|
||||||
|
userId,
|
||||||
|
action,
|
||||||
|
resource: "auth",
|
||||||
|
details,
|
||||||
|
ipAddress,
|
||||||
|
userAgent,
|
||||||
|
success,
|
||||||
|
error,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
91
apps/bff/src/infra/audit/audit-query.service.ts
Normal file
91
apps/bff/src/infra/audit/audit-query.service.ts
Normal file
@ -0,0 +1,91 @@
|
|||||||
|
import { Injectable } from "@nestjs/common";
|
||||||
|
import { Prisma, AuditAction } from "@prisma/client";
|
||||||
|
import { PrismaService } from "../database/prisma.service.js";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Service for reading and querying audit log entries.
|
||||||
|
*
|
||||||
|
* Handles all audit read operations: paginated log retrieval
|
||||||
|
* and security analytics/statistics.
|
||||||
|
*/
|
||||||
|
@Injectable()
|
||||||
|
export class AuditQueryService {
|
||||||
|
constructor(private readonly prisma: PrismaService) {}
|
||||||
|
|
||||||
|
async getAuditLogs({
|
||||||
|
page,
|
||||||
|
limit,
|
||||||
|
action,
|
||||||
|
userId,
|
||||||
|
}: {
|
||||||
|
page: number;
|
||||||
|
limit: number;
|
||||||
|
action?: AuditAction;
|
||||||
|
userId?: string;
|
||||||
|
}) {
|
||||||
|
const skip = (page - 1) * limit;
|
||||||
|
const where: Prisma.AuditLogWhereInput = {};
|
||||||
|
if (action) where.action = action;
|
||||||
|
if (userId) where.userId = userId;
|
||||||
|
|
||||||
|
const [logs, total] = await Promise.all([
|
||||||
|
this.prisma.auditLog.findMany({
|
||||||
|
where,
|
||||||
|
include: {
|
||||||
|
user: {
|
||||||
|
select: {
|
||||||
|
id: true,
|
||||||
|
email: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
orderBy: { createdAt: "desc" },
|
||||||
|
skip,
|
||||||
|
take: limit,
|
||||||
|
}),
|
||||||
|
this.prisma.auditLog.count({ where }),
|
||||||
|
]);
|
||||||
|
|
||||||
|
return { logs, total };
|
||||||
|
}
|
||||||
|
|
||||||
|
async getSecurityStats() {
|
||||||
|
const today = new Date(new Date().setHours(0, 0, 0, 0));
|
||||||
|
|
||||||
|
const [totalUsers, lockedAccounts, failedLoginsToday, successfulLoginsToday] =
|
||||||
|
await Promise.all([
|
||||||
|
this.prisma.user.count(),
|
||||||
|
this.prisma.user.count({
|
||||||
|
where: {
|
||||||
|
lockedUntil: {
|
||||||
|
gt: new Date(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
this.prisma.auditLog.count({
|
||||||
|
where: {
|
||||||
|
action: AuditAction.LOGIN_FAILED,
|
||||||
|
createdAt: {
|
||||||
|
gte: today,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
this.prisma.auditLog.count({
|
||||||
|
where: {
|
||||||
|
action: AuditAction.LOGIN_SUCCESS,
|
||||||
|
createdAt: {
|
||||||
|
gte: today,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
]);
|
||||||
|
|
||||||
|
return {
|
||||||
|
totalUsers,
|
||||||
|
lockedAccounts,
|
||||||
|
failedLoginsToday,
|
||||||
|
successfulLoginsToday,
|
||||||
|
securityEventsToday: failedLoginsToday + successfulLoginsToday,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -1,9 +1,20 @@
|
|||||||
import { Global, Module } from "@nestjs/common";
|
import { Global, Module } from "@nestjs/common";
|
||||||
import { AuditService } from "./audit.service.js";
|
import { AuditLogService } from "./audit-log.service.js";
|
||||||
|
import { AuditQueryService } from "./audit-query.service.js";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Global audit module.
|
||||||
|
*
|
||||||
|
* Provides:
|
||||||
|
* - AuditLogService (write operations: log, logAuthEvent)
|
||||||
|
* - AuditQueryService (read operations: getAuditLogs, getSecurityStats)
|
||||||
|
*
|
||||||
|
* Existing callers can continue to import { AuditService } from audit.service.ts,
|
||||||
|
* which re-exports AuditLogService under the old name for backward compatibility.
|
||||||
|
*/
|
||||||
@Global()
|
@Global()
|
||||||
@Module({
|
@Module({
|
||||||
providers: [AuditService],
|
providers: [AuditLogService, AuditQueryService],
|
||||||
exports: [AuditService],
|
exports: [AuditLogService, AuditQueryService],
|
||||||
})
|
})
|
||||||
export class AuditModule {}
|
export class AuditModule {}
|
||||||
|
|||||||
@ -1,168 +1,15 @@
|
|||||||
import { Injectable, Inject } from "@nestjs/common";
|
|
||||||
import { Prisma, AuditAction } from "@prisma/client";
|
|
||||||
import { PrismaService } from "../database/prisma.service.js";
|
|
||||||
import { extractErrorMessage } from "@bff/core/utils/error.util.js";
|
|
||||||
import { Logger } from "nestjs-pino";
|
|
||||||
import { extractClientIp, extractUserAgent } from "@bff/core/http/request-context.util.js";
|
|
||||||
|
|
||||||
// Re-export AuditAction from Prisma for consumers
|
|
||||||
export { AuditAction } from "@prisma/client";
|
|
||||||
|
|
||||||
export interface AuditLogData {
|
|
||||||
userId?: string | undefined;
|
|
||||||
action: AuditAction;
|
|
||||||
resource?: string | undefined;
|
|
||||||
details?: Record<string, unknown> | string | number | boolean | null | undefined;
|
|
||||||
ipAddress?: string | undefined;
|
|
||||||
userAgent?: string | undefined;
|
|
||||||
success?: boolean | undefined;
|
|
||||||
error?: string | undefined;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Minimal request shape for audit logging.
|
* Backward-compatible re-exports.
|
||||||
* Compatible with Express Request but only requires the fields needed for IP/UA extraction.
|
*
|
||||||
* Must be compatible with RequestContextLike from request-context.util.ts.
|
* The original AuditService has been split into:
|
||||||
|
* - AuditLogService (write operations: log, logAuthEvent)
|
||||||
|
* - AuditQueryService (read operations: getAuditLogs, getSecurityStats)
|
||||||
|
*
|
||||||
|
* Existing callers that import { AuditService } will get AuditLogService
|
||||||
|
* via the type alias below, so no import path changes are needed.
|
||||||
*/
|
*/
|
||||||
export type AuditRequest = {
|
|
||||||
headers?: Record<string, string | string[] | undefined> | undefined;
|
|
||||||
ip?: string | undefined;
|
|
||||||
connection?: { remoteAddress?: string | undefined } | undefined;
|
|
||||||
socket?: { remoteAddress?: string | undefined } | undefined;
|
|
||||||
};
|
|
||||||
|
|
||||||
@Injectable()
|
export { AuditLogService as AuditService } from "./audit-log.service.js";
|
||||||
export class AuditService {
|
export type { AuditLogData, AuditRequest } from "./audit-log.service.js";
|
||||||
constructor(
|
export { AuditAction } from "./audit-log.service.js";
|
||||||
private readonly prisma: PrismaService,
|
export { AuditQueryService } from "./audit-query.service.js";
|
||||||
@Inject(Logger) private readonly logger: Logger
|
|
||||||
) {}
|
|
||||||
|
|
||||||
async log(data: AuditLogData): Promise<void> {
|
|
||||||
try {
|
|
||||||
const createData: Parameters<typeof this.prisma.auditLog.create>[0]["data"] = {
|
|
||||||
action: data.action,
|
|
||||||
success: data.success ?? true,
|
|
||||||
};
|
|
||||||
|
|
||||||
if (data.userId !== undefined) createData.userId = data.userId;
|
|
||||||
if (data.resource !== undefined) createData.resource = data.resource;
|
|
||||||
if (data.ipAddress !== undefined) createData.ipAddress = data.ipAddress;
|
|
||||||
if (data.userAgent !== undefined) createData.userAgent = data.userAgent;
|
|
||||||
if (data.error !== undefined) createData.error = data.error;
|
|
||||||
if (data.details !== undefined) {
|
|
||||||
createData.details =
|
|
||||||
data.details === null
|
|
||||||
? Prisma.JsonNull
|
|
||||||
: (JSON.parse(JSON.stringify(data.details)) as Prisma.InputJsonValue);
|
|
||||||
}
|
|
||||||
|
|
||||||
await this.prisma.auditLog.create({ data: createData });
|
|
||||||
} catch (error) {
|
|
||||||
this.logger.error("Audit logging failed", {
|
|
||||||
errorType: error instanceof Error ? error.constructor.name : "Unknown",
|
|
||||||
message: extractErrorMessage(error),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async logAuthEvent(
|
|
||||||
action: AuditAction,
|
|
||||||
userId?: string,
|
|
||||||
details?: Record<string, unknown> | string | number | boolean | null,
|
|
||||||
request?: AuditRequest,
|
|
||||||
success: boolean = true,
|
|
||||||
error?: string
|
|
||||||
): Promise<void> {
|
|
||||||
const ipAddress = extractClientIp(request);
|
|
||||||
const userAgent = extractUserAgent(request);
|
|
||||||
|
|
||||||
await this.log({
|
|
||||||
userId,
|
|
||||||
action,
|
|
||||||
resource: "auth",
|
|
||||||
details,
|
|
||||||
ipAddress,
|
|
||||||
userAgent,
|
|
||||||
success,
|
|
||||||
error,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
async getAuditLogs({
|
|
||||||
page,
|
|
||||||
limit,
|
|
||||||
action,
|
|
||||||
userId,
|
|
||||||
}: {
|
|
||||||
page: number;
|
|
||||||
limit: number;
|
|
||||||
action?: AuditAction;
|
|
||||||
userId?: string;
|
|
||||||
}) {
|
|
||||||
const skip = (page - 1) * limit;
|
|
||||||
const where: Prisma.AuditLogWhereInput = {};
|
|
||||||
if (action) where.action = action;
|
|
||||||
if (userId) where.userId = userId;
|
|
||||||
|
|
||||||
const [logs, total] = await Promise.all([
|
|
||||||
this.prisma.auditLog.findMany({
|
|
||||||
where,
|
|
||||||
include: {
|
|
||||||
user: {
|
|
||||||
select: {
|
|
||||||
id: true,
|
|
||||||
email: true,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
orderBy: { createdAt: "desc" },
|
|
||||||
skip,
|
|
||||||
take: limit,
|
|
||||||
}),
|
|
||||||
this.prisma.auditLog.count({ where }),
|
|
||||||
]);
|
|
||||||
|
|
||||||
return { logs, total };
|
|
||||||
}
|
|
||||||
|
|
||||||
async getSecurityStats() {
|
|
||||||
const today = new Date(new Date().setHours(0, 0, 0, 0));
|
|
||||||
|
|
||||||
const [totalUsers, lockedAccounts, failedLoginsToday, successfulLoginsToday] =
|
|
||||||
await Promise.all([
|
|
||||||
this.prisma.user.count(),
|
|
||||||
this.prisma.user.count({
|
|
||||||
where: {
|
|
||||||
lockedUntil: {
|
|
||||||
gt: new Date(),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}),
|
|
||||||
this.prisma.auditLog.count({
|
|
||||||
where: {
|
|
||||||
action: AuditAction.LOGIN_FAILED,
|
|
||||||
createdAt: {
|
|
||||||
gte: today,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}),
|
|
||||||
this.prisma.auditLog.count({
|
|
||||||
where: {
|
|
||||||
action: AuditAction.LOGIN_SUCCESS,
|
|
||||||
createdAt: {
|
|
||||||
gte: today,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}),
|
|
||||||
]);
|
|
||||||
|
|
||||||
return {
|
|
||||||
totalUsers,
|
|
||||||
lockedAccounts,
|
|
||||||
failedLoginsToday,
|
|
||||||
successfulLoginsToday,
|
|
||||||
securityEventsToday: failedLoginsToday + successfulLoginsToday,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
163
apps/bff/src/infra/cache/cache-strategy.base.ts
vendored
Normal file
163
apps/bff/src/infra/cache/cache-strategy.base.ts
vendored
Normal file
@ -0,0 +1,163 @@
|
|||||||
|
import { CacheService } from "./cache.service.js";
|
||||||
|
import type { CacheBucketMetrics } from "./cache.types.js";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configuration for a cache bucket.
|
||||||
|
*
|
||||||
|
* @property ttlSeconds - TTL in seconds. `null` means no TTL (CDC-driven invalidation only).
|
||||||
|
* @property allowNull - Whether to cache null/undefined values from the fetcher.
|
||||||
|
*/
|
||||||
|
export interface CacheBucketConfig {
|
||||||
|
ttlSeconds: number | null;
|
||||||
|
allowNull?: boolean;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Options for the getOrSet operation.
|
||||||
|
*/
|
||||||
|
export interface GetOrSetOptions {
|
||||||
|
/** Whether to cache null/undefined values (overrides bucket default) */
|
||||||
|
allowNull?: boolean;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Base class for domain-specific cache services.
|
||||||
|
*
|
||||||
|
* Extracts the common patterns shared across OrdersCacheService,
|
||||||
|
* SupportCacheService, and others:
|
||||||
|
*
|
||||||
|
* - **Request coalescing**: Deduplicates in-flight requests to the same key,
|
||||||
|
* preventing thundering herd after cache miss or invalidation.
|
||||||
|
*
|
||||||
|
* - **Cache metrics tracking**: Tracks hits, misses, and invalidations
|
||||||
|
* per named bucket.
|
||||||
|
*
|
||||||
|
* - **getOrSet pattern**: Check cache, coalesce, fetch, store -- with
|
||||||
|
* configurable TTL (including null for CDC-driven caches) and null handling.
|
||||||
|
*
|
||||||
|
* Subclasses define their own bucket names, key builders, and invalidation methods.
|
||||||
|
* The base handles the mechanical caching logic so subclasses focus on domain semantics.
|
||||||
|
*
|
||||||
|
* @example
|
||||||
|
* ```typescript
|
||||||
|
* class OrdersCacheService extends CacheStrategyBase<'summaries' | 'details'> {
|
||||||
|
* constructor(cache: CacheService) {
|
||||||
|
* super(cache, {
|
||||||
|
* summaries: { ttlSeconds: null }, // CDC-driven, no TTL
|
||||||
|
* details: { ttlSeconds: null, allowNull: true },
|
||||||
|
* });
|
||||||
|
* }
|
||||||
|
* }
|
||||||
|
* ```
|
||||||
|
*/
|
||||||
|
export abstract class CacheStrategyBase<TBucket extends string> {
|
||||||
|
private readonly metrics: Record<TBucket, CacheBucketMetrics>;
|
||||||
|
private readonly inflightRequests = new Map<string, Promise<unknown>>();
|
||||||
|
private readonly bucketConfigs: Record<TBucket, CacheBucketConfig>;
|
||||||
|
private invalidationCount = 0;
|
||||||
|
|
||||||
|
protected constructor(
|
||||||
|
protected readonly cache: CacheService,
|
||||||
|
bucketConfigs: Record<TBucket, CacheBucketConfig>
|
||||||
|
) {
|
||||||
|
this.bucketConfigs = bucketConfigs;
|
||||||
|
|
||||||
|
// Initialize metrics for each bucket
|
||||||
|
const metrics = {} as Record<TBucket, CacheBucketMetrics>;
|
||||||
|
for (const bucket of Object.keys(bucketConfigs) as TBucket[]) {
|
||||||
|
metrics[bucket] = { hits: 0, misses: 0 };
|
||||||
|
}
|
||||||
|
this.metrics = metrics;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get or fetch a cached value with request coalescing.
|
||||||
|
*
|
||||||
|
* 1. Check Redis cache
|
||||||
|
* 2. If cache miss, check for an in-flight request for the same key
|
||||||
|
* 3. If no in-flight request, call the fetcher, store in cache, and return
|
||||||
|
*
|
||||||
|
* @param bucket - The metrics bucket to track hits/misses against
|
||||||
|
* @param key - The Redis cache key
|
||||||
|
* @param fetcher - Async function to fetch fresh data on cache miss
|
||||||
|
* @param options - Optional overrides (e.g., allowNull)
|
||||||
|
* @returns The cached or freshly fetched value
|
||||||
|
*/
|
||||||
|
protected async getOrSet<T>(
|
||||||
|
bucket: TBucket,
|
||||||
|
key: string,
|
||||||
|
fetcher: () => Promise<T>,
|
||||||
|
options?: GetOrSetOptions
|
||||||
|
): Promise<T> {
|
||||||
|
const config = this.bucketConfigs[bucket];
|
||||||
|
const allowNull = options?.allowNull ?? config.allowNull ?? false;
|
||||||
|
const ttlSeconds = config.ttlSeconds;
|
||||||
|
|
||||||
|
// Check Redis cache first
|
||||||
|
const cached = await this.cache.get<T>(key);
|
||||||
|
|
||||||
|
if (cached !== null) {
|
||||||
|
this.metrics[bucket].hits++;
|
||||||
|
return cached;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check for in-flight request (prevents thundering herd)
|
||||||
|
const existingRequest = this.inflightRequests.get(key);
|
||||||
|
if (existingRequest) {
|
||||||
|
return existingRequest as Promise<T>;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetch fresh data
|
||||||
|
this.metrics[bucket].misses++;
|
||||||
|
|
||||||
|
const fetchPromise = (async () => {
|
||||||
|
try {
|
||||||
|
const fresh = await fetcher();
|
||||||
|
const valueToStore = allowNull ? (fresh ?? null) : fresh;
|
||||||
|
|
||||||
|
if (ttlSeconds === null) {
|
||||||
|
await this.cache.set(key, valueToStore);
|
||||||
|
} else {
|
||||||
|
await this.cache.set(key, valueToStore, ttlSeconds);
|
||||||
|
}
|
||||||
|
|
||||||
|
return fresh;
|
||||||
|
} finally {
|
||||||
|
this.inflightRequests.delete(key);
|
||||||
|
}
|
||||||
|
})();
|
||||||
|
|
||||||
|
this.inflightRequests.set(key, fetchPromise);
|
||||||
|
return fetchPromise;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Invalidate a single cache key and track the invalidation.
|
||||||
|
*/
|
||||||
|
protected async invalidate(key: string): Promise<void> {
|
||||||
|
this.invalidationCount++;
|
||||||
|
await this.cache.del(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Invalidate all keys matching a pattern and track the invalidation.
|
||||||
|
*/
|
||||||
|
protected async invalidatePattern(pattern: string): Promise<void> {
|
||||||
|
this.invalidationCount++;
|
||||||
|
await this.cache.delPattern(pattern);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the current invalidation count.
|
||||||
|
*/
|
||||||
|
protected getInvalidationCount(): number {
|
||||||
|
return this.invalidationCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a snapshot of metrics for a specific bucket.
|
||||||
|
*/
|
||||||
|
protected getBucketMetrics(bucket: TBucket): CacheBucketMetrics {
|
||||||
|
return { ...this.metrics[bucket] };
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -2,6 +2,7 @@ import { Injectable, Inject } from "@nestjs/common";
|
|||||||
import { Logger } from "nestjs-pino";
|
import { Logger } from "nestjs-pino";
|
||||||
import { TransactionService, type TransactionOperation } from "./transaction.service.js";
|
import { TransactionService, type TransactionOperation } from "./transaction.service.js";
|
||||||
import { extractErrorMessage } from "@bff/core/utils/error.util.js";
|
import { extractErrorMessage } from "@bff/core/utils/error.util.js";
|
||||||
|
import { RollbackCoordinator } from "./rollback-coordinator.js";
|
||||||
|
|
||||||
export interface DistributedStep<TId extends string = string, TResult = unknown> {
|
export interface DistributedStep<TId extends string = string, TResult = unknown> {
|
||||||
id: TId;
|
id: TId;
|
||||||
@ -460,32 +461,19 @@ export class DistributedTransactionService {
|
|||||||
executedSteps: readonly string[],
|
executedSteps: readonly string[],
|
||||||
transactionId: string
|
transactionId: string
|
||||||
): Promise<number> {
|
): Promise<number> {
|
||||||
this.logger.warn(`Executing rollbacks for ${executedSteps.length} steps [${transactionId}]`);
|
// Build rollback actions only for executed steps that have a rollback function
|
||||||
|
const actions = executedSteps
|
||||||
|
.map(stepId => {
|
||||||
|
const step = steps.find(s => s.id === stepId);
|
||||||
|
if (!step?.rollback) return null;
|
||||||
|
return {
|
||||||
|
execute: step.rollback,
|
||||||
|
label: `step: ${stepId}`,
|
||||||
|
};
|
||||||
|
})
|
||||||
|
.filter((a): a is NonNullable<typeof a> => a !== null);
|
||||||
|
|
||||||
let rollbacksExecuted = 0;
|
return RollbackCoordinator.executeInReverse(actions, transactionId, this.logger);
|
||||||
|
|
||||||
for (let i = executedSteps.length - 1; i >= 0; i--) {
|
|
||||||
const stepId = executedSteps[i];
|
|
||||||
const step = steps.find(s => s.id === stepId);
|
|
||||||
|
|
||||||
if (step?.rollback) {
|
|
||||||
try {
|
|
||||||
this.logger.debug(`Executing rollback for step: ${stepId} [${transactionId}]`);
|
|
||||||
// eslint-disable-next-line no-await-in-loop -- Rollbacks must execute sequentially in reverse order (LIFO) to maintain consistency
|
|
||||||
await step.rollback();
|
|
||||||
rollbacksExecuted++;
|
|
||||||
this.logger.debug(`Rollback completed for step: ${stepId} [${transactionId}]`);
|
|
||||||
} catch (rollbackError) {
|
|
||||||
this.logger.error(`Rollback failed for step: ${stepId} [${transactionId}]`, {
|
|
||||||
error: extractErrorMessage(rollbackError),
|
|
||||||
});
|
|
||||||
// Continue with other rollbacks even if one fails
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
this.logger.log(`Completed ${rollbacksExecuted} rollbacks [${transactionId}]`);
|
|
||||||
return rollbacksExecuted;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private async executeStepWithRetry<TResult>(
|
private async executeStepWithRetry<TResult>(
|
||||||
|
|||||||
@ -3,3 +3,4 @@
|
|||||||
*/
|
*/
|
||||||
export * from "./transaction.service.js";
|
export * from "./transaction.service.js";
|
||||||
export * from "./distributed-transaction.service.js";
|
export * from "./distributed-transaction.service.js";
|
||||||
|
export * from "./rollback-coordinator.js";
|
||||||
|
|||||||
83
apps/bff/src/infra/database/services/rollback-coordinator.ts
Normal file
83
apps/bff/src/infra/database/services/rollback-coordinator.ts
Normal file
@ -0,0 +1,83 @@
|
|||||||
|
import { extractErrorMessage } from "@bff/core/utils/error.util.js";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A rollback action that can be executed during transaction failure recovery.
|
||||||
|
*/
|
||||||
|
export interface RollbackAction {
|
||||||
|
/** Execute the rollback */
|
||||||
|
execute: () => Promise<void>;
|
||||||
|
/** Human-readable label for logging (e.g., step ID or index) */
|
||||||
|
label: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Logger interface for the RollbackCoordinator.
|
||||||
|
* Compatible with nestjs-pino Logger.
|
||||||
|
*/
|
||||||
|
export interface RollbackLogger {
|
||||||
|
warn(message: string, ...args: unknown[]): void;
|
||||||
|
debug(message: string, ...args: unknown[]): void;
|
||||||
|
error(message: string, ...args: unknown[]): void;
|
||||||
|
log(message: string, ...args: unknown[]): void;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Coordinates rollback execution across transaction services.
|
||||||
|
*
|
||||||
|
* Shared patterns:
|
||||||
|
* - LIFO (last-in-first-out) execution order
|
||||||
|
* - Continue on individual rollback failure (best-effort)
|
||||||
|
* - Structured logging of progress and failures
|
||||||
|
* - Returns count of successfully executed rollbacks
|
||||||
|
*
|
||||||
|
* Used by both TransactionService (database transactions with external rollbacks)
|
||||||
|
* and DistributedTransactionService (multi-step distributed transactions).
|
||||||
|
*/
|
||||||
|
export class RollbackCoordinator {
|
||||||
|
/**
|
||||||
|
* Execute rollback actions in reverse order (LIFO).
|
||||||
|
*
|
||||||
|
* Each rollback is attempted independently; a failure in one rollback
|
||||||
|
* does not prevent subsequent rollbacks from executing.
|
||||||
|
*
|
||||||
|
* @param actions - Ordered list of rollback actions (will be executed in reverse)
|
||||||
|
* @param transactionId - Transaction identifier for log correlation
|
||||||
|
* @param logger - Logger instance for structured output
|
||||||
|
* @returns Number of successfully executed rollbacks
|
||||||
|
*/
|
||||||
|
static async executeInReverse(
|
||||||
|
actions: readonly RollbackAction[],
|
||||||
|
transactionId: string,
|
||||||
|
logger: RollbackLogger
|
||||||
|
): Promise<number> {
|
||||||
|
if (actions.length === 0) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.warn(`Executing ${actions.length} rollback actions [${transactionId}]`);
|
||||||
|
|
||||||
|
let rollbacksExecuted = 0;
|
||||||
|
|
||||||
|
for (let i = actions.length - 1; i >= 0; i--) {
|
||||||
|
const action = actions[i];
|
||||||
|
if (!action) continue;
|
||||||
|
|
||||||
|
try {
|
||||||
|
// eslint-disable-next-line no-await-in-loop -- Rollbacks must execute sequentially in LIFO order
|
||||||
|
await action.execute();
|
||||||
|
rollbacksExecuted++;
|
||||||
|
logger.debug(`Rollback ${action.label} completed [${transactionId}]`);
|
||||||
|
} catch (rollbackError) {
|
||||||
|
logger.error(`Rollback ${action.label} failed [${transactionId}]`, {
|
||||||
|
error: extractErrorMessage(rollbackError),
|
||||||
|
});
|
||||||
|
// Continue with other rollbacks even if one fails
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.log(
|
||||||
|
`Completed ${rollbacksExecuted}/${actions.length} rollbacks [${transactionId}]`
|
||||||
|
);
|
||||||
|
return rollbacksExecuted;
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -3,6 +3,7 @@ import { Logger } from "nestjs-pino";
|
|||||||
import { Prisma } from "@prisma/client";
|
import { Prisma } from "@prisma/client";
|
||||||
import { PrismaService } from "../prisma.service.js";
|
import { PrismaService } from "../prisma.service.js";
|
||||||
import { extractErrorMessage } from "@bff/core/utils/error.util.js";
|
import { extractErrorMessage } from "@bff/core/utils/error.util.js";
|
||||||
|
import { RollbackCoordinator } from "./rollback-coordinator.js";
|
||||||
|
|
||||||
export interface TransactionContext {
|
export interface TransactionContext {
|
||||||
id: string;
|
id: string;
|
||||||
@ -21,8 +22,6 @@ export type TransactionOperation<T> = (
|
|||||||
context: TransactionContext & TransactionContextHelpers
|
context: TransactionContext & TransactionContextHelpers
|
||||||
) => Promise<T>;
|
) => Promise<T>;
|
||||||
|
|
||||||
export type SimpleTransactionOperation<T> = (tx: Prisma.TransactionClient) => Promise<T>;
|
|
||||||
|
|
||||||
export interface TransactionOptions {
|
export interface TransactionOptions {
|
||||||
/**
|
/**
|
||||||
* Maximum time to wait for transaction to complete (ms)
|
* Maximum time to wait for transaction to complete (ms)
|
||||||
@ -225,25 +224,6 @@ export class TransactionService {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Execute a simple database-only transaction (no external operations)
|
|
||||||
*/
|
|
||||||
async executeSimpleTransaction<T>(
|
|
||||||
operation: SimpleTransactionOperation<T>,
|
|
||||||
options: Omit<TransactionOptions, "autoRollback"> = {}
|
|
||||||
): Promise<T> {
|
|
||||||
const result = await this.executeTransaction(async tx => operation(tx), {
|
|
||||||
...options,
|
|
||||||
autoRollback: false,
|
|
||||||
});
|
|
||||||
|
|
||||||
if (!result.success) {
|
|
||||||
throw new Error(result.error || "Transaction failed");
|
|
||||||
}
|
|
||||||
|
|
||||||
return result.data!;
|
|
||||||
}
|
|
||||||
|
|
||||||
private async executeTransactionAttempt<T>(
|
private async executeTransactionAttempt<T>(
|
||||||
operation: TransactionOperation<T>,
|
operation: TransactionOperation<T>,
|
||||||
context: TransactionContext,
|
context: TransactionContext,
|
||||||
@ -288,33 +268,12 @@ export class TransactionService {
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
this.logger.warn(
|
const actions = context.rollbackActions.map((fn, i) => ({
|
||||||
`Executing ${context.rollbackActions.length} rollback actions [${context.id}]`
|
execute: fn,
|
||||||
);
|
label: `${i + 1}`,
|
||||||
|
}));
|
||||||
|
|
||||||
let rollbacksExecuted = 0;
|
return RollbackCoordinator.executeInReverse(actions, context.id, this.logger);
|
||||||
|
|
||||||
// Execute rollbacks in reverse order (LIFO)
|
|
||||||
for (let i = context.rollbackActions.length - 1; i >= 0; i--) {
|
|
||||||
const rollbackAction = context.rollbackActions[i];
|
|
||||||
if (!rollbackAction) continue;
|
|
||||||
try {
|
|
||||||
// eslint-disable-next-line no-await-in-loop -- Rollbacks must execute sequentially in LIFO order
|
|
||||||
await rollbackAction();
|
|
||||||
rollbacksExecuted++;
|
|
||||||
this.logger.debug(`Rollback ${i + 1} completed [${context.id}]`);
|
|
||||||
} catch (rollbackError) {
|
|
||||||
this.logger.error(`Rollback ${i + 1} failed [${context.id}]`, {
|
|
||||||
error: extractErrorMessage(rollbackError),
|
|
||||||
});
|
|
||||||
// Continue with other rollbacks even if one fails
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
this.logger.log(
|
|
||||||
`Completed ${rollbacksExecuted}/${context.rollbackActions.length} rollbacks [${context.id}]`
|
|
||||||
);
|
|
||||||
return rollbacksExecuted;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private isRetryableError(error: unknown): boolean {
|
private isRetryableError(error: unknown): boolean {
|
||||||
|
|||||||
@ -1,5 +1,6 @@
|
|||||||
import { Injectable } from "@nestjs/common";
|
import { Injectable } from "@nestjs/common";
|
||||||
import { CacheService } from "@bff/infra/cache/cache.service.js";
|
import { CacheService } from "@bff/infra/cache/cache.service.js";
|
||||||
|
import { CacheStrategyBase } from "@bff/infra/cache/cache-strategy.base.js";
|
||||||
import type { CacheBucketMetrics } from "@bff/infra/cache/cache.types.js";
|
import type { CacheBucketMetrics } from "@bff/infra/cache/cache.types.js";
|
||||||
import type { OrderDetails, OrderSummary } from "@customer-portal/domain/orders";
|
import type { OrderDetails, OrderSummary } from "@customer-portal/domain/orders";
|
||||||
|
|
||||||
@ -21,18 +22,13 @@ interface OrdersCacheMetrics {
|
|||||||
* - Metrics tracking: Monitors hits, misses, and invalidations
|
* - Metrics tracking: Monitors hits, misses, and invalidations
|
||||||
*/
|
*/
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class OrdersCacheService {
|
export class OrdersCacheService extends CacheStrategyBase<"summaries" | "details"> {
|
||||||
private readonly metrics: OrdersCacheMetrics = {
|
constructor(cache: CacheService) {
|
||||||
summaries: { hits: 0, misses: 0 },
|
super(cache, {
|
||||||
details: { hits: 0, misses: 0 },
|
summaries: { ttlSeconds: null },
|
||||||
invalidations: 0,
|
details: { ttlSeconds: null, allowNull: true },
|
||||||
};
|
});
|
||||||
|
}
|
||||||
// Request coalescing: Prevents duplicate API calls when multiple users
|
|
||||||
// request the same data after CDC invalidation
|
|
||||||
private readonly inflightRequests = new Map<string, Promise<unknown>>();
|
|
||||||
|
|
||||||
constructor(private readonly cache: CacheService) {}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get order summaries for an account (CDC-driven cache)
|
* Get order summaries for an account (CDC-driven cache)
|
||||||
@ -42,7 +38,7 @@ export class OrdersCacheService {
|
|||||||
fetcher: () => Promise<OrderSummary[]>
|
fetcher: () => Promise<OrderSummary[]>
|
||||||
): Promise<OrderSummary[]> {
|
): Promise<OrderSummary[]> {
|
||||||
const key = this.buildAccountKey(sfAccountId);
|
const key = this.buildAccountKey(sfAccountId);
|
||||||
return this.getOrSet<OrderSummary[]>("summaries", key, fetcher, false);
|
return this.getOrSet("summaries", key, fetcher);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -53,7 +49,7 @@ export class OrdersCacheService {
|
|||||||
fetcher: () => Promise<OrderDetails | null>
|
fetcher: () => Promise<OrderDetails | null>
|
||||||
): Promise<OrderDetails | null> {
|
): Promise<OrderDetails | null> {
|
||||||
const key = this.buildOrderKey(orderId);
|
const key = this.buildOrderKey(orderId);
|
||||||
return this.getOrSet<OrderDetails | null>("details", key, fetcher, true);
|
return this.getOrSet("details", key, fetcher);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -61,8 +57,7 @@ export class OrdersCacheService {
|
|||||||
*/
|
*/
|
||||||
async invalidateAccountOrders(sfAccountId: string): Promise<void> {
|
async invalidateAccountOrders(sfAccountId: string): Promise<void> {
|
||||||
const key = this.buildAccountKey(sfAccountId);
|
const key = this.buildAccountKey(sfAccountId);
|
||||||
this.metrics.invalidations++;
|
await this.invalidate(key);
|
||||||
await this.cache.del(key);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -70,8 +65,7 @@ export class OrdersCacheService {
|
|||||||
*/
|
*/
|
||||||
async invalidateOrder(orderId: string): Promise<void> {
|
async invalidateOrder(orderId: string): Promise<void> {
|
||||||
const key = this.buildOrderKey(orderId);
|
const key = this.buildOrderKey(orderId);
|
||||||
this.metrics.invalidations++;
|
await this.invalidate(key);
|
||||||
await this.cache.del(key);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -79,50 +73,12 @@ export class OrdersCacheService {
|
|||||||
*/
|
*/
|
||||||
getMetrics(): OrdersCacheMetrics {
|
getMetrics(): OrdersCacheMetrics {
|
||||||
return {
|
return {
|
||||||
summaries: { ...this.metrics.summaries },
|
summaries: this.getBucketMetrics("summaries"),
|
||||||
details: { ...this.metrics.details },
|
details: this.getBucketMetrics("details"),
|
||||||
invalidations: this.metrics.invalidations,
|
invalidations: this.getInvalidationCount(),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
private async getOrSet<T>(
|
|
||||||
bucket: keyof Pick<OrdersCacheMetrics, "summaries" | "details">,
|
|
||||||
key: string,
|
|
||||||
fetcher: () => Promise<T>,
|
|
||||||
allowNull: boolean
|
|
||||||
): Promise<T> {
|
|
||||||
// Check Redis cache first
|
|
||||||
const cached = await this.cache.get<T>(key);
|
|
||||||
|
|
||||||
if (cached !== null && (allowNull || cached !== null)) {
|
|
||||||
this.metrics[bucket].hits++;
|
|
||||||
return cached;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check for in-flight request (prevents thundering herd)
|
|
||||||
const existingRequest = this.inflightRequests.get(key);
|
|
||||||
if (existingRequest) {
|
|
||||||
return existingRequest as Promise<T>;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fetch fresh data
|
|
||||||
this.metrics[bucket].misses++;
|
|
||||||
|
|
||||||
const fetchPromise = (async () => {
|
|
||||||
try {
|
|
||||||
const fresh = await fetcher();
|
|
||||||
const valueToStore = allowNull ? (fresh ?? null) : fresh;
|
|
||||||
await this.cache.set(key, valueToStore);
|
|
||||||
return fresh;
|
|
||||||
} finally {
|
|
||||||
this.inflightRequests.delete(key);
|
|
||||||
}
|
|
||||||
})();
|
|
||||||
|
|
||||||
this.inflightRequests.set(key, fetchPromise);
|
|
||||||
return fetchPromise;
|
|
||||||
}
|
|
||||||
|
|
||||||
private buildAccountKey(sfAccountId: string): string {
|
private buildAccountKey(sfAccountId: string): string {
|
||||||
return `orders:account:${sfAccountId}`;
|
return `orders:account:${sfAccountId}`;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,5 +1,6 @@
|
|||||||
import { Injectable } from "@nestjs/common";
|
import { Injectable } from "@nestjs/common";
|
||||||
import { CacheService } from "@bff/infra/cache/cache.service.js";
|
import { CacheService } from "@bff/infra/cache/cache.service.js";
|
||||||
|
import { CacheStrategyBase } from "@bff/infra/cache/cache-strategy.base.js";
|
||||||
import type { CacheBucketMetrics } from "@bff/infra/cache/cache.types.js";
|
import type { CacheBucketMetrics } from "@bff/infra/cache/cache.types.js";
|
||||||
import type { SupportCaseList, CaseMessageList } from "@customer-portal/domain/support";
|
import type { SupportCaseList, CaseMessageList } from "@customer-portal/domain/support";
|
||||||
|
|
||||||
@ -36,17 +37,13 @@ interface SupportCacheMetrics {
|
|||||||
* - Metrics tracking: Monitors hits, misses, and invalidations
|
* - Metrics tracking: Monitors hits, misses, and invalidations
|
||||||
*/
|
*/
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class SupportCacheService {
|
export class SupportCacheService extends CacheStrategyBase<"caseList" | "messages"> {
|
||||||
private readonly metrics: SupportCacheMetrics = {
|
constructor(cache: CacheService) {
|
||||||
caseList: { hits: 0, misses: 0 },
|
super(cache, {
|
||||||
messages: { hits: 0, misses: 0 },
|
caseList: { ttlSeconds: CACHE_TTL.CASE_LIST },
|
||||||
invalidations: 0,
|
messages: { ttlSeconds: CACHE_TTL.CASE_MESSAGES },
|
||||||
};
|
});
|
||||||
|
}
|
||||||
// Request coalescing: Prevents duplicate API calls
|
|
||||||
private readonly inflightRequests = new Map<string, Promise<unknown>>();
|
|
||||||
|
|
||||||
constructor(private readonly cache: CacheService) {}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get cached case list for an account
|
* Get cached case list for an account
|
||||||
@ -56,7 +53,7 @@ export class SupportCacheService {
|
|||||||
fetcher: () => Promise<SupportCaseList>
|
fetcher: () => Promise<SupportCaseList>
|
||||||
): Promise<SupportCaseList> {
|
): Promise<SupportCaseList> {
|
||||||
const key = this.buildCaseListKey(sfAccountId);
|
const key = this.buildCaseListKey(sfAccountId);
|
||||||
return this.getOrSet("caseList", key, fetcher, CACHE_TTL.CASE_LIST);
|
return this.getOrSet("caseList", key, fetcher);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -67,7 +64,7 @@ export class SupportCacheService {
|
|||||||
fetcher: () => Promise<CaseMessageList>
|
fetcher: () => Promise<CaseMessageList>
|
||||||
): Promise<CaseMessageList> {
|
): Promise<CaseMessageList> {
|
||||||
const key = this.buildMessagesKey(caseId);
|
const key = this.buildMessagesKey(caseId);
|
||||||
return this.getOrSet("messages", key, fetcher, CACHE_TTL.CASE_MESSAGES);
|
return this.getOrSet("messages", key, fetcher);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -76,8 +73,7 @@ export class SupportCacheService {
|
|||||||
*/
|
*/
|
||||||
async invalidateCaseList(sfAccountId: string): Promise<void> {
|
async invalidateCaseList(sfAccountId: string): Promise<void> {
|
||||||
const key = this.buildCaseListKey(sfAccountId);
|
const key = this.buildCaseListKey(sfAccountId);
|
||||||
this.metrics.invalidations++;
|
await this.invalidate(key);
|
||||||
await this.cache.del(key);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -86,8 +82,7 @@ export class SupportCacheService {
|
|||||||
*/
|
*/
|
||||||
async invalidateCaseMessages(caseId: string): Promise<void> {
|
async invalidateCaseMessages(caseId: string): Promise<void> {
|
||||||
const key = this.buildMessagesKey(caseId);
|
const key = this.buildMessagesKey(caseId);
|
||||||
this.metrics.invalidations++;
|
await this.invalidate(key);
|
||||||
await this.cache.del(key);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -106,49 +101,12 @@ export class SupportCacheService {
|
|||||||
*/
|
*/
|
||||||
getMetrics(): SupportCacheMetrics {
|
getMetrics(): SupportCacheMetrics {
|
||||||
return {
|
return {
|
||||||
caseList: { ...this.metrics.caseList },
|
caseList: this.getBucketMetrics("caseList"),
|
||||||
messages: { ...this.metrics.messages },
|
messages: this.getBucketMetrics("messages"),
|
||||||
invalidations: this.metrics.invalidations,
|
invalidations: this.getInvalidationCount(),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
private async getOrSet<T>(
|
|
||||||
bucket: "caseList" | "messages",
|
|
||||||
key: string,
|
|
||||||
fetcher: () => Promise<T>,
|
|
||||||
ttlSeconds: number
|
|
||||||
): Promise<T> {
|
|
||||||
// Check Redis cache first
|
|
||||||
const cached = await this.cache.get<T>(key);
|
|
||||||
|
|
||||||
if (cached !== null) {
|
|
||||||
this.metrics[bucket].hits++;
|
|
||||||
return cached;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check for in-flight request (prevents thundering herd)
|
|
||||||
const existingRequest = this.inflightRequests.get(key);
|
|
||||||
if (existingRequest) {
|
|
||||||
return existingRequest as Promise<T>;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fetch fresh data
|
|
||||||
this.metrics[bucket].misses++;
|
|
||||||
|
|
||||||
const fetchPromise = (async () => {
|
|
||||||
try {
|
|
||||||
const fresh = await fetcher();
|
|
||||||
await this.cache.set(key, fresh, ttlSeconds);
|
|
||||||
return fresh;
|
|
||||||
} finally {
|
|
||||||
this.inflightRequests.delete(key);
|
|
||||||
}
|
|
||||||
})();
|
|
||||||
|
|
||||||
this.inflightRequests.set(key, fetchPromise);
|
|
||||||
return fetchPromise;
|
|
||||||
}
|
|
||||||
|
|
||||||
private buildCaseListKey(sfAccountId: string): string {
|
private buildCaseListKey(sfAccountId: string): string {
|
||||||
return `support:cases:${sfAccountId}`;
|
return `support:cases:${sfAccountId}`;
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user