import { Injectable, Inject } from "@nestjs/common"; import { Logger } from "nestjs-pino"; import { Prisma } from "@prisma/client"; import { PrismaService } from "../prisma.service.js"; import { getErrorMessage } from "@bff/core/utils/error.util.js"; export interface TransactionContext { id: string; startTime: Date; operations: string[]; rollbackActions: (() => Promise)[]; } export interface TransactionContextHelpers { addOperation: (description: string) => void; addRollback: (rollbackFn: () => Promise) => void; } export type TransactionOperation = ( tx: Prisma.TransactionClient, context: TransactionContext & TransactionContextHelpers ) => Promise; export type SimpleTransactionOperation = (tx: Prisma.TransactionClient) => Promise; export interface TransactionOptions { /** * Maximum time to wait for transaction to complete (ms) * Default: 30 seconds */ timeout?: number; /** * Maximum number of retry attempts on serialization failures * Default: 3 */ maxRetries?: number; /** * Custom isolation level for the transaction * Default: ReadCommitted */ isolationLevel?: "ReadUncommitted" | "ReadCommitted" | "RepeatableRead" | "Serializable"; /** * Description of the transaction for logging */ description?: string; /** * Whether to automatically rollback external operations on database rollback * Default: true */ autoRollback?: boolean; } export interface TransactionResult { success: boolean; data?: T; error?: string; duration: number; operationsCount: number; rollbacksExecuted: number; } /** * Service for managing database transactions with external operation coordination * Provides atomic operations across database and external systems */ @Injectable() export class TransactionService { private readonly defaultTimeout = 30000; // 30 seconds private readonly defaultMaxRetries = 3; constructor( private readonly prisma: PrismaService, @Inject(Logger) private readonly logger: Logger ) {} /** * Execute operations within a database transaction with rollback support * * @example * ```typescript * const result = await this.transactionService.executeTransaction( * async (tx, context) => { * // Database operations * const user = await tx.user.create({ data: userData }); * * // External operations with rollback * const whmcsClient = await this.whmcsService.createClient(user.email); * context.addRollback(async () => { * await this.whmcsService.deleteClient(whmcsClient.id); * }); * * // Salesforce operations with rollback * const sfAccount = await this.salesforceService.createAccount(user); * context.addRollback(async () => { * await this.salesforceService.deleteAccount(sfAccount.Id); * }); * * return { user, whmcsClient, sfAccount }; * }, * { * description: "User signup with external integrations", * timeout: 60000 * } * ); * ``` */ async executeTransaction( operation: TransactionOperation, options: TransactionOptions = {} ): Promise> { const { timeout = this.defaultTimeout, maxRetries = this.defaultMaxRetries, isolationLevel = "ReadCommitted", description = "Database transaction", autoRollback = true, } = options; const transactionId = this.generateTransactionId(); const startTime = new Date(); let context: TransactionContext = { id: transactionId, startTime, operations: [], rollbackActions: [], }; this.logger.log(`Starting transaction [${transactionId}]`, { description, timeout, isolationLevel, maxRetries, }); let attempt = 0; let lastError: Error | null = null; while (attempt < maxRetries) { attempt++; try { // Reset context for retry attempts if (attempt > 1) { context = { id: transactionId, startTime, operations: [], rollbackActions: [], }; } const result = await Promise.race([ this.executeTransactionAttempt(operation, context, isolationLevel), this.createTimeoutPromise(timeout, transactionId), ]); const duration = Date.now() - startTime.getTime(); this.logger.log(`Transaction completed successfully [${transactionId}]`, { description, duration, attempt, operationsCount: context.operations.length, }); return { success: true, data: result, duration, operationsCount: context.operations.length, rollbacksExecuted: 0, }; } catch (error) { lastError = error as Error; const duration = Date.now() - startTime.getTime(); this.logger.error(`Transaction attempt ${attempt} failed [${transactionId}]`, { description, error: getErrorMessage(error), duration, operationsCount: context.operations.length, rollbackActionsCount: context.rollbackActions.length, }); // Execute rollbacks if this is the final attempt or not a retryable error if (attempt === maxRetries || !this.isRetryableError(error)) { const rollbacksExecuted = await this.executeRollbacks(context, autoRollback); return { success: false, error: getErrorMessage(error), duration, operationsCount: context.operations.length, rollbacksExecuted, }; } // Wait before retry (exponential backoff) await this.delay(Math.pow(2, attempt - 1) * 1000); } } // This should never be reached, but just in case const duration = Date.now() - startTime.getTime(); return { success: false, error: lastError ? getErrorMessage(lastError) : "Unknown transaction error", duration, operationsCount: context.operations.length, rollbacksExecuted: 0, }; } /** * Execute a simple database-only transaction (no external operations) */ async executeSimpleTransaction( operation: SimpleTransactionOperation, options: Omit = {} ): Promise { const result = await this.executeTransaction(async tx => operation(tx), { ...options, autoRollback: false, }); if (!result.success) { throw new Error(result.error || "Transaction failed"); } return result.data!; } private async executeTransactionAttempt( operation: TransactionOperation, context: TransactionContext, isolationLevel: Prisma.TransactionIsolationLevel ): Promise { return await this.prisma.$transaction( async tx => { // Enhance context with helper methods const enhancedContext = this.enhanceContext(context); // Execute the operation return await operation(tx, enhancedContext); }, { isolationLevel, timeout: 30000, // Prisma transaction timeout } ); } private enhanceContext( context: TransactionContext ): TransactionContext & TransactionContextHelpers { const helpers: TransactionContextHelpers = { addOperation: (description: string) => { context.operations.push(`${new Date().toISOString()}: ${description}`); }, addRollback: (rollbackFn: () => Promise) => { context.rollbackActions.push(rollbackFn); }, }; return Object.assign(context, helpers); } private async executeRollbacks( context: TransactionContext, autoRollback: boolean ): Promise { if (!autoRollback || context.rollbackActions.length === 0) { return 0; } this.logger.warn( `Executing ${context.rollbackActions.length} rollback actions [${context.id}]` ); let rollbacksExecuted = 0; // Execute rollbacks in reverse order (LIFO) for (let i = context.rollbackActions.length - 1; i >= 0; i--) { try { await context.rollbackActions[i](); rollbacksExecuted++; this.logger.debug(`Rollback ${i + 1} completed [${context.id}]`); } catch (rollbackError) { this.logger.error(`Rollback ${i + 1} failed [${context.id}]`, { error: getErrorMessage(rollbackError), }); // Continue with other rollbacks even if one fails } } this.logger.log( `Completed ${rollbacksExecuted}/${context.rollbackActions.length} rollbacks [${context.id}]` ); return rollbacksExecuted; } private isRetryableError(error: unknown): boolean { const errorMessage = getErrorMessage(error).toLowerCase(); // Retry on serialization failures, deadlocks, and temporary connection issues return ( errorMessage.includes("serialization failure") || errorMessage.includes("deadlock") || errorMessage.includes("connection") || errorMessage.includes("timeout") || errorMessage.includes("lock wait timeout") ); } private async createTimeoutPromise(timeout: number, transactionId: string): Promise { return new Promise((_, reject) => { setTimeout(() => { reject(new Error(`Transaction timeout after ${timeout}ms [${transactionId}]`)); }, timeout); }); } private async delay(ms: number): Promise { return new Promise(resolve => setTimeout(resolve, ms)); } private generateTransactionId(): string { return `tx_${Date.now()}_${Math.random().toString(36).substring(2, 9)}`; } /** * Get transaction statistics for monitoring */ async getTransactionStats(): Promise<{ activeTransactions: number; totalTransactions: number; successRate: number; averageDuration: number; }> { return await Promise.resolve({ activeTransactions: 0, totalTransactions: 0, successRate: 0, averageDuration: 0, }); } }