From c79488a6a4ddaa78dc395c90c70875eb81bd77a3 Mon Sep 17 00:00:00 2001 From: barsa Date: Thu, 6 Nov 2025 13:26:30 +0900 Subject: [PATCH] Enhance Salesforce request handling and metrics tracking - Introduced new metrics for daily API usage, including dailyApiLimit and dailyUsagePercent, to monitor API consumption effectively. - Added route-level metrics tracking to capture request success and failure rates for better performance insights. - Implemented degradation state management to handle rate limits and usage thresholds, improving resilience during high load. - Enhanced SalesforceRequestQueueService to include detailed logging for route-level metrics, aiding in debugging and performance analysis. - Updated Salesforce module to export new SalesforceReadThrottleGuard for improved request rate limiting across services. - Refactored various services to utilize the new metrics and logging features, ensuring consistent behavior and improved maintainability. --- .../salesforce-request-queue.service.ts | 257 +++++++++++++++++- .../guards/salesforce-read-throttle.guard.ts | 32 +++ .../salesforce/salesforce.module.ts | 9 +- .../salesforce/salesforce.service.ts | 9 +- .../services/salesforce-account.service.ts | 39 ++- .../salesforce-connection.service.spec.ts | 8 +- .../services/salesforce-connection.service.ts | 69 ++++- .../services/salesforce-order.service.ts | 124 ++++++++- .../workflows/signup-workflow.service.ts | 113 ++++++-- .../auth/presentation/http/auth.controller.ts | 5 +- .../catalog/catalog-health.controller.ts | 22 ++ .../src/modules/catalog/catalog.controller.ts | 19 +- .../bff/src/modules/catalog/catalog.module.ts | 3 +- .../catalog/services/base-catalog.service.ts | 4 +- .../catalog/services/catalog-cache.service.ts | 104 ++++++- .../services/internet-catalog.service.ts | 14 +- .../src/modules/orders/orders.controller.ts | 3 + apps/bff/src/modules/orders/orders.module.ts | 4 + .../order-fulfillment-orchestrator.service.ts | 32 ++- .../services/order-item-builder.service.ts | 66 ++--- .../services/order-orchestrator.service.ts | 32 ++- .../services/order-pricebook.service.ts | 11 +- .../orders/services/orders-cache.service.ts | 130 +++++++++ docs/salesforce-shadow-sync-plan.md | 84 ++++++ 24 files changed, 1069 insertions(+), 124 deletions(-) create mode 100644 apps/bff/src/integrations/salesforce/guards/salesforce-read-throttle.guard.ts create mode 100644 apps/bff/src/modules/catalog/catalog-health.controller.ts create mode 100644 apps/bff/src/modules/orders/services/orders-cache.service.ts create mode 100644 docs/salesforce-shadow-sync-plan.md diff --git a/apps/bff/src/core/queue/services/salesforce-request-queue.service.ts b/apps/bff/src/core/queue/services/salesforce-request-queue.service.ts index b85b67b1..a19ca9a3 100644 --- a/apps/bff/src/core/queue/services/salesforce-request-queue.service.ts +++ b/apps/bff/src/core/queue/services/salesforce-request-queue.service.ts @@ -13,6 +13,10 @@ export interface SalesforceQueueMetrics { lastRequestTime?: Date; lastErrorTime?: Date; lastRateLimitTime?: Date; + dailyApiLimit?: number; + dailyUsagePercent?: number; + routeBreakdown?: Record; + degradation?: SalesforceDegradationSnapshot; } export interface SalesforceRequestOptions { @@ -21,6 +25,30 @@ export interface SalesforceRequestOptions { retryAttempts?: number; // Number of retry attempts retryDelay?: number; // Base delay between retries in ms isLongRunning?: boolean; // Mark as long-running request (>20s expected) + label?: string; // Logical label for route-level metrics +} + +interface SalesforceRouteMetricsInternal { + label: string; + totalRequests: number; + failedRequests: number; + lastSuccessTime?: Date; + lastErrorTime?: Date; +} + +interface SalesforceRouteMetricsSnapshot { + totalRequests: number; + failedRequests: number; + successRate: number; + lastSuccessTime?: Date; + lastErrorTime?: Date; +} + +interface SalesforceDegradationSnapshot { + degraded: boolean; + reason: "rate-limit" | "usage-threshold" | null; + cooldownExpiresAt?: Date; + usagePercent: number; } /** @@ -76,12 +104,24 @@ export class SalesforceRequestQueueService implements OnModuleInit, OnModuleDest private readonly executionTimes: number[] = []; private readonly maxMetricsHistory = 100; private dailyUsageResetTime: Date; + private readonly routeMetrics = new Map(); + private readonly usageWarningLevels = [0.7, 0.85, 0.95]; + private highestUsageWarningIssued = 0; + private readonly dailyApiLimit: number; + private readonly rateLimitCooldownMs: number; + private degradeState: { + until: Date | null; + reason: "rate-limit" | "usage-threshold" | null; + } = { until: null, reason: null }; constructor( @Inject(Logger) private readonly logger: Logger, private readonly configService: ConfigService ) { this.dailyUsageResetTime = this.getNextDayReset(); + this.dailyApiLimit = this.resolveDailyApiLimit(); + this.rateLimitCooldownMs = + this.parseNumericConfig(this.configService.get("SF_RATE_LIMIT_COOLDOWN_MS")) ?? 60000; } private async loadPQueue(): Promise { @@ -199,20 +239,23 @@ export class SalesforceRequestQueueService implements OnModuleInit, OnModuleDest options: SalesforceRequestOptions = {} ): Promise { const { standardQueue, longRunningQueue } = await this.ensureQueuesInitialized(); - // Check daily API usage + // Check daily usage this.checkDailyUsage(); const startTime = Date.now(); const requestId = this.generateRequestId(); const isLongRunning = options.isLongRunning || false; const queue = isLongRunning ? longRunningQueue : standardQueue; + const label = this.normalizeLabel(options.label); try { const result = (await queue.add( async () => { this.metrics.totalRequests++; this.metrics.dailyApiUsage++; + this.recordRouteStart(label); this.updateQueueMetrics(); + this.maybeWarnOnUsage(); this.logger.debug("Executing Salesforce request", { requestId, @@ -221,6 +264,7 @@ export class SalesforceRequestQueueService implements OnModuleInit, OnModuleDest pending: queue.pending, priority: options.priority || 0, dailyUsage: this.metrics.dailyApiUsage, + label, }); const waitTime = Date.now() - startTime; @@ -235,6 +279,7 @@ export class SalesforceRequestQueueService implements OnModuleInit, OnModuleDest this.recordExecutionTime(executionTime); this.metrics.completedRequests++; this.metrics.lastRequestTime = new Date(); + this.recordRouteSuccess(label); this.logger.debug("Salesforce request completed", { requestId, @@ -242,6 +287,7 @@ export class SalesforceRequestQueueService implements OnModuleInit, OnModuleDest waitTime, executionTime, totalTime: Date.now() - startTime, + label, }); return response; @@ -250,13 +296,16 @@ export class SalesforceRequestQueueService implements OnModuleInit, OnModuleDest this.recordExecutionTime(executionTime); this.metrics.failedRequests++; this.metrics.lastErrorTime = new Date(); + this.recordRouteFailure(label); // Check if it's a rate limit error if (this.isRateLimitError(error)) { this.metrics.lastRateLimitTime = new Date(); + this.activateDegradeWindow("rate-limit"); this.logger.warn("Salesforce rate limit encountered", { requestId, dailyUsage: this.metrics.dailyApiUsage, + label, }); } @@ -266,6 +315,7 @@ export class SalesforceRequestQueueService implements OnModuleInit, OnModuleDest waitTime, executionTime, error: error instanceof Error ? error.message : String(error), + label, }); throw error; @@ -285,15 +335,28 @@ export class SalesforceRequestQueueService implements OnModuleInit, OnModuleDest /** * Execute high-priority Salesforce request (jumps queue) */ - async executeHighPriority(requestFn: () => Promise, isLongRunning = false): Promise { - return this.execute(requestFn, { priority: 10, isLongRunning }); + async executeHighPriority( + requestFn: () => Promise, + options: SalesforceRequestOptions = {} + ): Promise { + return this.execute(requestFn, { + ...options, + priority: options.priority ?? 10, + }); } /** * Execute long-running Salesforce request (uses separate queue) */ - async executeLongRunning(requestFn: () => Promise): Promise { - return this.execute(requestFn, { isLongRunning: true, timeout: 600000 }); + async executeLongRunning( + requestFn: () => Promise, + options: SalesforceRequestOptions = {} + ): Promise { + return this.execute(requestFn, { + ...options, + isLongRunning: true, + timeout: options.timeout ?? 600000, + }); } /** @@ -301,7 +364,16 @@ export class SalesforceRequestQueueService implements OnModuleInit, OnModuleDest */ getMetrics(): SalesforceQueueMetrics { this.updateQueueMetrics(); - return { ...this.metrics }; + const dailyUsagePercent = + this.dailyApiLimit > 0 ? this.metrics.dailyApiUsage / this.dailyApiLimit : 0; + + return { + ...this.metrics, + dailyApiLimit: this.dailyApiLimit, + dailyUsagePercent, + routeBreakdown: this.getRouteMetricsSnapshot(), + degradation: this.getDegradationState(), + }; } /** @@ -320,9 +392,9 @@ export class SalesforceRequestQueueService implements OnModuleInit, OnModuleDest const errorRate = this.metrics.totalRequests > 0 ? this.metrics.failedRequests / this.metrics.totalRequests : 0; - // Estimate daily limit (conservative: 150,000 for ~50 users) - const estimatedDailyLimit = 150000; - const dailyUsagePercent = this.metrics.dailyApiUsage / estimatedDailyLimit; + const effectiveDailyLimit = this.dailyApiLimit > 0 ? this.dailyApiLimit : 150000; + const dailyUsagePercent = + effectiveDailyLimit > 0 ? this.metrics.dailyApiUsage / effectiveDailyLimit : 0; let status: "healthy" | "degraded" | "unhealthy" = "healthy"; @@ -350,13 +422,19 @@ export class SalesforceRequestQueueService implements OnModuleInit, OnModuleDest usage: number; resetTime: Date; hoursUntilReset: number; + limit: number; + usagePercent: number; } { + const limit = this.dailyApiLimit; + const usagePercent = limit > 0 ? this.metrics.dailyApiUsage / limit : 0; return { usage: this.metrics.dailyApiUsage, resetTime: this.dailyUsageResetTime, hoursUntilReset: Math.ceil( (this.dailyUsageResetTime.getTime() - Date.now()) / (1000 * 60 * 60) ), + limit, + usagePercent, }; } @@ -445,9 +523,12 @@ export class SalesforceRequestQueueService implements OnModuleInit, OnModuleDest if (now >= this.dailyUsageResetTime) { this.metrics.dailyApiUsage = 0; this.dailyUsageResetTime = this.getNextDayReset(); + this.highestUsageWarningIssued = 0; + this.degradeState = { until: null, reason: null }; this.logger.log("Daily Salesforce API usage reset", { resetTime: this.dailyUsageResetTime, + dailyApiLimit: this.dailyApiLimit, }); } } @@ -459,6 +540,33 @@ export class SalesforceRequestQueueService implements OnModuleInit, OnModuleDest return tomorrow; } + private resolveDailyApiLimit(): number { + const configuredLimit = this.parseNumericConfig(this.configService.get("SF_DAILY_API_LIMIT")); + if (configuredLimit && configuredLimit > 0) { + return configuredLimit; + } + + const userCount = + this.parseNumericConfig(this.configService.get("SF_FULL_USER_COUNT")) || + this.parseNumericConfig(this.configService.get("SF_USERS")) || + 50; + + return 100000 + userCount * 1000; + } + + private parseNumericConfig(value: unknown): number | undefined { + if (typeof value === "number" && Number.isFinite(value)) { + return value; + } + if (typeof value === "string" && value.trim().length > 0) { + const parsed = Number(value); + if (!Number.isNaN(parsed)) { + return parsed; + } + } + return undefined; + } + private setupQueueListeners(): void { if (!this.standardQueue || !this.longRunningQueue) { return; @@ -494,6 +602,8 @@ export class SalesforceRequestQueueService implements OnModuleInit, OnModuleDest return; } + this.clearDegradeWindowIfElapsed(); + this.metrics.queueSize = this.standardQueue.size + this.longRunningQueue.size; this.metrics.pendingRequests = this.standardQueue.pending + this.longRunningQueue.pending; @@ -526,4 +636,133 @@ export class SalesforceRequestQueueService implements OnModuleInit, OnModuleDest private generateRequestId(): string { return `sf_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`; } + + private normalizeLabel(label?: string): string { + if (!label) { + return "generic"; + } + const trimmed = label.trim(); + return trimmed.length > 0 ? trimmed : "generic"; + } + + private recordRouteStart(label: string): void { + const metric = this.getOrCreateRouteMetric(label); + metric.totalRequests += 1; + } + + private recordRouteSuccess(label: string): void { + const metric = this.getOrCreateRouteMetric(label); + metric.lastSuccessTime = new Date(); + } + + private recordRouteFailure(label: string): void { + const metric = this.getOrCreateRouteMetric(label); + metric.failedRequests += 1; + metric.lastErrorTime = new Date(); + } + + private getOrCreateRouteMetric(label: string): SalesforceRouteMetricsInternal { + const existing = this.routeMetrics.get(label); + if (existing) { + return existing; + } + + const metric: SalesforceRouteMetricsInternal = { + label, + totalRequests: 0, + failedRequests: 0, + }; + this.routeMetrics.set(label, metric); + return metric; + } + + private getRouteMetricsSnapshot(): Record { + const snapshot: Record = {}; + + for (const [label, metric] of this.routeMetrics.entries()) { + const total = metric.totalRequests; + const failures = metric.failedRequests; + const successRate = total > 0 ? (total - failures) / total : 1; + + snapshot[label] = { + totalRequests: total, + failedRequests: failures, + successRate, + lastSuccessTime: metric.lastSuccessTime, + lastErrorTime: metric.lastErrorTime, + }; + } + + return snapshot; + } + + private maybeWarnOnUsage(): void { + if (this.dailyApiLimit <= 0) { + return; + } + + const usagePercent = this.metrics.dailyApiUsage / this.dailyApiLimit; + if (usagePercent >= 0.95) { + this.activateDegradeWindow("usage-threshold"); + } + + const threshold = this.usageWarningLevels + .slice() + .reverse() + .find(level => usagePercent >= level && level > this.highestUsageWarningIssued); + + if (threshold !== undefined) { + this.highestUsageWarningIssued = threshold; + this.logger.warn("Salesforce daily API usage approaching limit", { + usage: this.metrics.dailyApiUsage, + limit: this.dailyApiLimit, + usagePercent, + }); + } + } + + getDegradationState(): SalesforceDegradationSnapshot { + this.clearDegradeWindowIfElapsed(); + const usagePercent = this.dailyApiLimit > 0 ? this.metrics.dailyApiUsage / this.dailyApiLimit : 0; + return { + degraded: this.degradeState.until !== null, + reason: this.degradeState.reason, + cooldownExpiresAt: this.degradeState.until ?? undefined, + usagePercent, + }; + } + + isDegraded(): boolean { + return this.getDegradationState().degraded; + } + + shouldThrottleLowPriority(): boolean { + return this.isDegraded(); + } + + private activateDegradeWindow(reason: "rate-limit" | "usage-threshold"): void { + if (this.rateLimitCooldownMs <= 0) { + return; + } + + const now = Date.now(); + const newUntil = new Date(now + this.rateLimitCooldownMs); + const active = this.degradeState.until && now < this.degradeState.until.getTime(); + + if (!active || this.degradeState.reason !== reason) { + this.logger.warn("Salesforce circuit breaker engaged", { + reason, + cooldownMs: this.rateLimitCooldownMs, + until: newUntil, + }); + } + + this.degradeState = { until: newUntil, reason }; + } + + private clearDegradeWindowIfElapsed(): void { + if (this.degradeState.until && Date.now() >= this.degradeState.until.getTime()) { + this.degradeState = { until: null, reason: null }; + } + } } diff --git a/apps/bff/src/integrations/salesforce/guards/salesforce-read-throttle.guard.ts b/apps/bff/src/integrations/salesforce/guards/salesforce-read-throttle.guard.ts new file mode 100644 index 00000000..7134c724 --- /dev/null +++ b/apps/bff/src/integrations/salesforce/guards/salesforce-read-throttle.guard.ts @@ -0,0 +1,32 @@ +import { CanActivate, ExecutionContext, Inject, Injectable, TooManyRequestsException } from "@nestjs/common"; +import type { Request } from "express"; +import { Logger } from "nestjs-pino"; +import { SalesforceRequestQueueService } from "@bff/core/queue/services/salesforce-request-queue.service"; + +@Injectable() +export class SalesforceReadThrottleGuard implements CanActivate { + constructor( + private readonly queue: SalesforceRequestQueueService, + @Inject(Logger) private readonly logger: Logger + ) {} + + canActivate(context: ExecutionContext): boolean { + const state = this.queue.getDegradationState(); + if (!state.degraded) { + return true; + } + + const request = context.switchToHttp().getRequest(); + this.logger.warn("Throttling Salesforce-backed read due to degraded state", { + path: request?.originalUrl ?? request?.url, + reason: state.reason, + cooldownExpiresAt: state.cooldownExpiresAt, + usagePercent: state.usagePercent, + }); + + throw new TooManyRequestsException( + "We're experiencing high load right now. Please try again in a moment." + ); + } +} + diff --git a/apps/bff/src/integrations/salesforce/salesforce.module.ts b/apps/bff/src/integrations/salesforce/salesforce.module.ts index d0204301..9dfee43a 100644 --- a/apps/bff/src/integrations/salesforce/salesforce.module.ts +++ b/apps/bff/src/integrations/salesforce/salesforce.module.ts @@ -6,6 +6,7 @@ import { SalesforceConnection } from "./services/salesforce-connection.service"; import { SalesforceAccountService } from "./services/salesforce-account.service"; import { SalesforceOrderService } from "./services/salesforce-order.service"; import { OrderFieldConfigModule } from "@bff/modules/orders/config/order-field-config.module"; +import { SalesforceReadThrottleGuard } from "./guards/salesforce-read-throttle.guard"; @Module({ imports: [QueueModule, ConfigModule, OrderFieldConfigModule], @@ -14,7 +15,13 @@ import { OrderFieldConfigModule } from "@bff/modules/orders/config/order-field-c SalesforceAccountService, SalesforceOrderService, SalesforceService, + SalesforceReadThrottleGuard, + ], + exports: [ + SalesforceService, + SalesforceConnection, + SalesforceOrderService, + SalesforceReadThrottleGuard, ], - exports: [SalesforceService, SalesforceConnection, SalesforceOrderService], }) export class SalesforceModule {} diff --git a/apps/bff/src/integrations/salesforce/salesforce.service.ts b/apps/bff/src/integrations/salesforce/salesforce.service.ts index 229cc74c..4f56842e 100644 --- a/apps/bff/src/integrations/salesforce/salesforce.service.ts +++ b/apps/bff/src/integrations/salesforce/salesforce.service.ts @@ -56,6 +56,12 @@ export class SalesforceService implements OnModuleInit { return this.accountService.findByCustomerNumber(customerNumber); } + async findAccountWithDetailsByCustomerNumber( + customerNumber: string + ): Promise<{ id: string; WH_Account__c?: string | null; Name?: string | null } | null> { + return this.accountService.findWithDetailsByCustomerNumber(customerNumber); + } + async getAccountDetails( accountId: string ): Promise<{ id: string; WH_Account__c?: string | null; Name?: string | null } | null> { @@ -122,7 +128,8 @@ export class SalesforceService implements OnModuleInit { AccountId, Account.Name FROM Order WHERE Id = '${orderId}' - LIMIT 1` + LIMIT 1`, + { label: "orders:integration:getOrder" } )) as { records: SalesforceOrderRecord[]; totalSize: number }; return result.records?.[0] || null; diff --git a/apps/bff/src/integrations/salesforce/services/salesforce-account.service.ts b/apps/bff/src/integrations/salesforce/services/salesforce-account.service.ts index 51d45717..8211b823 100644 --- a/apps/bff/src/integrations/salesforce/services/salesforce-account.service.ts +++ b/apps/bff/src/integrations/salesforce/services/salesforce-account.service.ts @@ -29,7 +29,8 @@ export class SalesforceAccountService { try { const result = (await this.connection.query( - `SELECT Id FROM Account WHERE SF_Account_No__c = '${this.safeSoql(validCustomerNumber)}'` + `SELECT Id FROM Account WHERE SF_Account_No__c = '${this.safeSoql(validCustomerNumber)}'`, + { label: "auth:findAccountByCustomerNumber" } )) as SalesforceResponse; return result.totalSize > 0 ? { id: result.records[0]?.Id ?? "" } : null; } catch (error) { @@ -40,6 +41,39 @@ export class SalesforceAccountService { } } + async findWithDetailsByCustomerNumber( + customerNumber: string + ): Promise<{ + id: string; + Name?: string | null; + WH_Account__c?: string | null; + } | null> { + const validCustomerNumber = customerNumberSchema.parse(customerNumber); + + try { + const result = (await this.connection.query( + `SELECT Id, Name, WH_Account__c FROM Account WHERE SF_Account_No__c = '${this.safeSoql(validCustomerNumber)}'`, + { label: "auth:findAccountWithDetails" } + )) as SalesforceResponse; + + if (result.totalSize === 0) { + return null; + } + + const record = result.records[0]; + return { + id: record?.Id ?? "", + Name: record?.Name, + WH_Account__c: record?.WH_Account__c || undefined, + }; + } catch (error) { + this.logger.error("Failed to find account with details by customer number", { + error: getErrorMessage(error), + }); + throw new Error("Failed to find account"); + } + } + /** * Get account details including WH_Account__c field * Used in signup workflow to check if account is already linked to WHMCS @@ -51,7 +85,8 @@ export class SalesforceAccountService { try { const result = (await this.connection.query( - `SELECT Id, Name, WH_Account__c FROM Account WHERE Id = '${this.safeSoql(validAccountId)}'` + `SELECT Id, Name, WH_Account__c FROM Account WHERE Id = '${this.safeSoql(validAccountId)}'`, + { label: "auth:getAccountDetails" } )) as SalesforceResponse; if (result.totalSize === 0) { diff --git a/apps/bff/src/integrations/salesforce/services/salesforce-connection.service.spec.ts b/apps/bff/src/integrations/salesforce/services/salesforce-connection.service.spec.ts index 82be45aa..50bb1bc7 100644 --- a/apps/bff/src/integrations/salesforce/services/salesforce-connection.service.spec.ts +++ b/apps/bff/src/integrations/salesforce/services/salesforce-connection.service.spec.ts @@ -53,7 +53,11 @@ describe("SalesforceConnection", () => { expect(requestQueue.execute).toHaveBeenCalledTimes(1); const [, options] = (requestQueue.execute as jest.Mock).mock.calls[0]; - expect(options).toMatchObject({ priority: 8, isLongRunning: false }); + expect(options).toMatchObject({ + priority: 8, + isLongRunning: false, + label: "salesforce:query:account", + }); expect(queryMock).toHaveBeenCalledTimes(1); }); @@ -64,5 +68,7 @@ describe("SalesforceConnection", () => { await sobject.create({ Name: "Test" }); expect(requestQueue.executeHighPriority).toHaveBeenCalledTimes(1); + const [, options] = (requestQueue.executeHighPriority as jest.Mock).mock.calls[0]; + expect(options).toMatchObject({ label: "salesforce:sobject:Order:create" }); }); }); diff --git a/apps/bff/src/integrations/salesforce/services/salesforce-connection.service.ts b/apps/bff/src/integrations/salesforce/services/salesforce-connection.service.ts index b2241f66..af76227f 100644 --- a/apps/bff/src/integrations/salesforce/services/salesforce-connection.service.ts +++ b/apps/bff/src/integrations/salesforce/services/salesforce-connection.service.ts @@ -241,9 +241,10 @@ export class SalesforceConnection { } // Expose connection methods with automatic re-authentication - async query(soql: string): Promise { + async query(soql: string, options: { label?: string } = {}): Promise { const priority = this.getQueryPriority(soql); const isLongRunning = this.isLongRunningQuery(soql); + const label = options.label ?? this.deriveQueryLabel(soql); try { return await this.requestQueue.execute( @@ -283,7 +284,7 @@ export class SalesforceConnection { throw error; } }, - { priority, isLongRunning } + { priority, isLongRunning, label } ); } catch (error: unknown) { throw error; @@ -351,7 +352,7 @@ export class SalesforceConnection { } throw error; } - }); + }, { label: `salesforce:sobject:${type}:create` }); }, update: async (data: object & { Id: string }) => { @@ -396,11 +397,66 @@ export class SalesforceConnection { } throw error; } - }); + }, { label: `salesforce:sobject:${type}:update` }); }, }; } + async compositeTree( + sobjectType: string, + body: unknown, + options: { allOrNone?: boolean; priority?: number; label?: string } = {} + ): Promise { + const priority = options.priority ?? 5; + const allOrNone = options.allOrNone ?? true; + const path = this.buildCompositeTreePath(sobjectType, allOrNone); + const label = options.label ?? `salesforce:composite:${sobjectType}`; + + return this.requestQueue.execute(async () => { + await this.ensureConnected(); + + try { + return (await this.connection.requestPost(path, body)) as T; + } catch (error) { + if (this.isSessionExpiredError(error)) { + const reAuthStartTime = Date.now(); + this.logger.warn("Salesforce session expired during composite tree request, retrying", { + sobjectType, + originalError: getErrorMessage(error), + }); + + await this.connect(true); + const reAuthDuration = Date.now() - reAuthStartTime; + + this.logger.debug("Retrying composite tree request after re-authentication", { + sobjectType, + reAuthDuration, + }); + + return (await this.connection.requestPost(path, body)) as T; + } + + throw error; + } + }, { priority, label }); + } + + private buildCompositeTreePath(sobjectType: string, allOrNone: boolean): string { + const version = this.connection.version || "58.0"; + const normalizedVersion = version.startsWith("v") ? version.slice(1) : version; + const encodedType = encodeURIComponent(sobjectType); + const query = `?allOrNone=${allOrNone ? "true" : "false"}`; + return `/services/data/v${normalizedVersion}/composite/tree/${encodedType}${query}`; + } + + private deriveQueryLabel(soql: string): string { + const match = /from\s+([a-z0-9_]+)/i.exec(soql); + if (!match) { + return "salesforce:query:unknown"; + } + return `salesforce:query:${match[1]}`; + } + isConnected(): boolean { return !!this.connection.accessToken; } @@ -449,7 +505,8 @@ export class SalesforceConnection { /** * Execute a high-priority Salesforce request (jumps queue) */ - async queryHighPriority(soql: string): Promise { + async queryHighPriority(soql: string, options: { label?: string } = {}): Promise { + const label = options.label ?? `${this.deriveQueryLabel(soql)}:high`; return this.requestQueue.executeHighPriority(async () => { await this.ensureConnected(); try { @@ -474,7 +531,7 @@ export class SalesforceConnection { } throw error; } - }); + }, { label }); } /** diff --git a/apps/bff/src/integrations/salesforce/services/salesforce-order.service.ts b/apps/bff/src/integrations/salesforce/services/salesforce-order.service.ts index f1c7d4f4..1fcf61f3 100644 --- a/apps/bff/src/integrations/salesforce/services/salesforce-order.service.ts +++ b/apps/bff/src/integrations/salesforce/services/salesforce-order.service.ts @@ -40,6 +40,8 @@ export class SalesforceOrderService { private readonly orderFieldMap: OrderFieldMapService ) {} + private readonly compositeOrderReference = "order_ref"; + /** * Get order by ID with full details including order items */ @@ -76,8 +78,12 @@ export class SalesforceOrderService { try { // Execute queries in parallel const [orderResult, itemsResult] = await Promise.all([ - this.sf.query(orderSoql) as Promise>, - this.sf.query(orderItemsSoql) as Promise>, + this.sf.query(orderSoql, { label: "orders:getOrderById" }) as Promise< + SalesforceResponse + >, + this.sf.query(orderItemsSoql, { label: "orders:getOrderItemsById" }) as Promise< + SalesforceResponse + >, ]); const order = orderResult.records?.[0]; @@ -129,6 +135,84 @@ export class SalesforceOrderService { } } + async createOrderWithItems( + orderFields: Record, + items: Array<{ pricebookEntryId: string; unitPrice: number; quantity: number; sku?: string }> + ): Promise<{ id: string }> { + if (!items.length) { + this.logger.warn( + "No order items provided for composite order creation; falling back to single order create" + ); + return this.createOrder(orderFields); + } + + const typeField = this.orderFieldMap.fields.order.type; + const payload = { + records: [ + { + attributes: { type: "Order", referenceId: this.compositeOrderReference }, + ...orderFields, + OrderItems: { + records: items.map((item, index) => ({ + attributes: { type: "OrderItem", referenceId: `order_item_${index + 1}` }, + PricebookEntryId: item.pricebookEntryId, + Quantity: item.quantity, + UnitPrice: item.unitPrice, + })), + }, + }, + ], + } satisfies SalesforceCompositeTreeRequest; + + try { + const result = await this.sf.compositeTree( + "Order", + payload, + { allOrNone: true, priority: 8, label: "orders:createWithItems" } + ); + + if (result.hasErrors) { + const errorDetails = this.flattenCompositeErrors(result) + .map(err => `[${err.statusCode}] ${err.message}`) + .join("; "); + + throw new Error( + errorDetails || "Salesforce composite tree returned errors during order creation" + ); + } + + const orderResult = result.results.find( + entry => entry.referenceId === this.compositeOrderReference + ); + + if (!orderResult?.id) { + throw new Error("Salesforce composite tree response missing order ID"); + } + + this.logger.log( + { + orderId: orderResult.id, + orderType: orderFields[typeField], + orderItems: items.length, + }, + "Salesforce order created via composite tree" + ); + + return { id: orderResult.id }; + } catch (error: unknown) { + this.logger.error("Failed to create Salesforce order with composite tree", { + error: getErrorMessage(error), + orderType: orderFields[typeField], + orderItems: items.length, + }); + throw error; + } + } + + private flattenCompositeErrors(response: SalesforceCompositeTreeResponse) { + return (response.results ?? []).flatMap(entry => entry.errors ?? []); + } + /** * Get orders for a Salesforce account with item summaries */ @@ -158,9 +242,9 @@ export class SalesforceOrderService { try { // Fetch orders - const ordersResult = (await this.sf.query( - ordersSoql - )) as SalesforceResponse; + const ordersResult = (await this.sf.query(ordersSoql, { + label: "orders:listByAccount", + })) as SalesforceResponse; const orders = ordersResult.records || []; if (orders.length === 0) { @@ -186,9 +270,9 @@ export class SalesforceOrderService { ORDER BY CreatedDate ASC `; - const itemsResult = (await this.sf.query( - itemsSoql - )) as SalesforceResponse; + const itemsResult = (await this.sf.query(itemsSoql, { + label: "orders:listItemsByAccount", + })) as SalesforceResponse; const allItems = itemsResult.records || []; // Group items by order ID @@ -233,3 +317,27 @@ export class SalesforceOrderService { } } } + +interface SalesforceCompositeTreeRequest { + records: Array<{ + attributes: { type: string; referenceId: string }; + OrderItems?: { + records: Array<{ + attributes: { type: string; referenceId: string }; + PricebookEntryId: string; + Quantity: number; + UnitPrice: number; + }>; + }; + [key: string]: unknown; + }>; +} + +interface SalesforceCompositeTreeResponse { + hasErrors: boolean; + results: Array<{ + referenceId?: string; + id?: string; + errors?: Array<{ statusCode: string; message: string; fields?: string[] }>; + }>; +} diff --git a/apps/bff/src/modules/auth/infra/workflows/workflows/signup-workflow.service.ts b/apps/bff/src/modules/auth/infra/workflows/workflows/signup-workflow.service.ts index 18c297a2..ac0549fc 100644 --- a/apps/bff/src/modules/auth/infra/workflows/workflows/signup-workflow.service.ts +++ b/apps/bff/src/modules/auth/infra/workflows/workflows/signup-workflow.service.ts @@ -32,8 +32,21 @@ type _SanitizedPrismaUser = Omit< "passwordHash" | "failedLoginAttempts" | "lockedUntil" >; +interface SignupAccountSnapshot { + id: string; + Name?: string | null; + WH_Account__c?: string | null; +} + @Injectable() export class SignupWorkflowService { + private readonly accountLookupCache = new Map< + string, + { value: SignupAccountSnapshot | null; expiresAt: number } + >(); + private readonly accountCacheTtlMs = 30_000; + private readonly accountCacheMaxEntries = 500; + constructor( private readonly usersFacade: UsersFacade, private readonly mappingsService: MappingsService, @@ -51,8 +64,8 @@ export class SignupWorkflowService { const { sfNumber } = validateData; try { - const sfAccount = await this.salesforceService.findAccountByCustomerNumber(sfNumber); - if (!sfAccount) { + const accountSnapshot = await this.getAccountSnapshot(sfNumber); + if (!accountSnapshot) { await this.auditService.logAuthEvent( AuditAction.SIGNUP, undefined, @@ -64,12 +77,12 @@ export class SignupWorkflowService { throw new BadRequestException("Customer number not found in Salesforce"); } - const existingMapping = await this.mappingsService.findBySfAccountId(sfAccount.id); + const existingMapping = await this.mappingsService.findBySfAccountId(accountSnapshot.id); if (existingMapping) { await this.auditService.logAuthEvent( AuditAction.SIGNUP, undefined, - { sfNumber, sfAccountId: sfAccount.id, reason: "Already has mapping" }, + { sfNumber, sfAccountId: accountSnapshot.id, reason: "Already has mapping" }, request, false, "Customer number already registered" @@ -79,15 +92,14 @@ export class SignupWorkflowService { ); } - const accountDetails = await this.salesforceService.getAccountDetails(sfAccount.id); - if (accountDetails?.WH_Account__c && accountDetails.WH_Account__c.trim() !== "") { + if (accountSnapshot.WH_Account__c && accountSnapshot.WH_Account__c.trim() !== "") { await this.auditService.logAuthEvent( AuditAction.SIGNUP, undefined, { sfNumber, - sfAccountId: sfAccount.id, - whAccount: accountDetails.WH_Account__c, + sfAccountId: accountSnapshot.id, + whAccount: accountSnapshot.WH_Account__c, reason: "WH Account not empty", }, request, @@ -102,14 +114,14 @@ export class SignupWorkflowService { await this.auditService.logAuthEvent( AuditAction.SIGNUP, undefined, - { sfNumber, sfAccountId: sfAccount.id, step: "validation" }, + { sfNumber, sfAccountId: accountSnapshot.id, step: "validation" }, request, true ); return { valid: true, - sfAccountId: sfAccount.id, + sfAccountId: accountSnapshot.id, message: "Customer number validated successfully", }; } catch (error) { @@ -176,13 +188,19 @@ export class SignupWorkflowService { const passwordHash = await bcrypt.hash(password, saltRounds); try { - const sfAccount = await this.salesforceService.findAccountByCustomerNumber(sfNumber); - if (!sfAccount) { + const accountSnapshot = await this.getAccountSnapshot(sfNumber); + if (!accountSnapshot) { throw new BadRequestException( `Salesforce account not found for Customer Number: ${sfNumber}` ); } + if (accountSnapshot.WH_Account__c && accountSnapshot.WH_Account__c.trim() !== "") { + throw new ConflictException( + "You already have an account. Please use the login page to access your existing account." + ); + } + let whmcsClient: { clientId: number }; try { try { @@ -289,7 +307,7 @@ export class SignupWorkflowService { data: { userId: created.id, whmcsClientId: whmcsClient.clientId, - sfAccountId: sfAccount.id, + sfAccountId: accountSnapshot.id, }, }); @@ -415,15 +433,15 @@ export class SignupWorkflowService { return result; } - const sfAccount = await this.salesforceService.findAccountByCustomerNumber(sfNumber); - if (!sfAccount) { + const accountSnapshot = await this.getAccountSnapshot(sfNumber); + if (!accountSnapshot) { result.nextAction = "fix_input"; result.messages.push("Customer number not found in Salesforce"); return result; } - result.salesforce.accountId = sfAccount.id; + result.salesforce.accountId = accountSnapshot.id; - const existingMapping = await this.mappingsService.findBySfAccountId(sfAccount.id); + const existingMapping = await this.mappingsService.findBySfAccountId(accountSnapshot.id); if (existingMapping) { result.salesforce.alreadyMapped = true; result.nextAction = "login"; @@ -468,4 +486,65 @@ export class SignupWorkflowService { result.messages.push("All checks passed. Ready to create your account."); return result; } + + private async getAccountSnapshot(sfNumber: string): Promise { + const normalized = this.normalizeCustomerNumber(sfNumber); + if (!normalized) { + return null; + } + + const now = Date.now(); + this.pruneExpiredAccountSnapshots(now); + + const cached = this.accountLookupCache.get(normalized); + if (cached && cached.expiresAt > now) { + return cached.value; + } + + if (cached) { + this.accountLookupCache.delete(normalized); + } + + const resolved = await this.salesforceService.findAccountWithDetailsByCustomerNumber(normalized); + if (resolved && resolved.id) { + this.storeAccountSnapshot(normalized, resolved); + return resolved; + } + + this.storeAccountSnapshot(normalized, null); + return null; + } + + private normalizeCustomerNumber(sfNumber: string): string | null { + if (typeof sfNumber !== "string") { + return null; + } + const trimmed = sfNumber.trim(); + return trimmed.length > 0 ? trimmed : null; + } + + private pruneExpiredAccountSnapshots(referenceTime: number): void { + for (const [key, entry] of this.accountLookupCache.entries()) { + if (entry.expiresAt <= referenceTime) { + this.accountLookupCache.delete(key); + } + } + } + + private storeAccountSnapshot( + sfNumber: string, + snapshot: SignupAccountSnapshot | null + ): void { + this.accountLookupCache.set(sfNumber, { + value: snapshot, + expiresAt: Date.now() + this.accountCacheTtlMs, + }); + + if (this.accountLookupCache.size > this.accountCacheMaxEntries) { + const oldestKey = this.accountLookupCache.keys().next().value; + if (typeof oldestKey === "string") { + this.accountLookupCache.delete(oldestKey); + } + } + } } diff --git a/apps/bff/src/modules/auth/presentation/http/auth.controller.ts b/apps/bff/src/modules/auth/presentation/http/auth.controller.ts index d4b3eb76..4d931284 100644 --- a/apps/bff/src/modules/auth/presentation/http/auth.controller.ts +++ b/apps/bff/src/modules/auth/presentation/http/auth.controller.ts @@ -24,6 +24,7 @@ import { LoginResultInterceptor } from "./interceptors/login-result.interceptor" import { Public } from "../../decorators/public.decorator"; import { ZodValidationPipe } from "@customer-portal/validation/nestjs"; import type { RequestWithUser } from "@bff/modules/auth/auth.types"; +import { SalesforceReadThrottleGuard } from "@bff/integrations/salesforce/guards/salesforce-read-throttle.guard"; // Import Zod schemas from domain import { @@ -132,7 +133,7 @@ export class AuthController { @Public() @Post("validate-signup") - @UseGuards(AuthThrottleGuard) + @UseGuards(AuthThrottleGuard, SalesforceReadThrottleGuard) @Throttle({ default: { limit: 20, ttl: 600 } }) // 20 validations per 10 minutes per IP @UsePipes(new ZodValidationPipe(validateSignupRequestSchema)) async validateSignup(@Body() validateData: ValidateSignupRequest, @Req() req: Request) { @@ -147,7 +148,7 @@ export class AuthController { @Public() @Post("signup-preflight") - @UseGuards(AuthThrottleGuard) + @UseGuards(AuthThrottleGuard, SalesforceReadThrottleGuard) @Throttle({ default: { limit: 20, ttl: 600 } }) // 20 validations per 10 minutes per IP @UsePipes(new ZodValidationPipe(signupRequestSchema)) @HttpCode(200) diff --git a/apps/bff/src/modules/catalog/catalog-health.controller.ts b/apps/bff/src/modules/catalog/catalog-health.controller.ts new file mode 100644 index 00000000..cf7e7b10 --- /dev/null +++ b/apps/bff/src/modules/catalog/catalog-health.controller.ts @@ -0,0 +1,22 @@ +import { Controller, Get } from "@nestjs/common"; +import { CatalogCacheService } from "./services/catalog-cache.service"; + +@Controller("health/catalog") +export class CatalogHealthController { + constructor(private readonly catalogCache: CatalogCacheService) {} + + @Get("cache") + getCacheMetrics() { + return { + timestamp: new Date().toISOString(), + metrics: this.catalogCache.getMetrics(), + ttl: { + catalogSeconds: 3600, + eligibilitySeconds: 900, + staticSeconds: 900, + volatileSeconds: 60, + }, + }; + } +} + diff --git a/apps/bff/src/modules/catalog/catalog.controller.ts b/apps/bff/src/modules/catalog/catalog.controller.ts index 0f494de0..6b6eebd2 100644 --- a/apps/bff/src/modules/catalog/catalog.controller.ts +++ b/apps/bff/src/modules/catalog/catalog.controller.ts @@ -15,9 +15,10 @@ import { import { InternetCatalogService } from "./services/internet-catalog.service"; import { SimCatalogService } from "./services/sim-catalog.service"; import { VpnCatalogService } from "./services/vpn-catalog.service"; +import { SalesforceReadThrottleGuard } from "@bff/integrations/salesforce/guards/salesforce-read-throttle.guard"; @Controller("catalog") -@UseGuards(ThrottlerGuard) +@UseGuards(SalesforceReadThrottleGuard, ThrottlerGuard) export class CatalogController { constructor( private internetCatalog: InternetCatalogService, @@ -27,7 +28,7 @@ export class CatalogController { @Get("internet/plans") @Throttle({ default: { limit: 20, ttl: 60 } }) // 20 requests per minute - @Header("Cache-Control", "public, max-age=300, s-maxage=300") // 5 minutes + @Header("Cache-Control", "public, max-age=3600, s-maxage=3600") // 60 minutes async getInternetPlans(@Request() req: RequestWithUser): Promise<{ plans: InternetPlanCatalogItem[]; installations: InternetInstallationCatalogItem[]; @@ -49,20 +50,20 @@ export class CatalogController { } @Get("internet/addons") - @Header("Cache-Control", "public, max-age=300, s-maxage=300") // 5 minutes + @Header("Cache-Control", "public, max-age=3600, s-maxage=3600") // 60 minutes async getInternetAddons(): Promise { return this.internetCatalog.getAddons(); } @Get("internet/installations") - @Header("Cache-Control", "public, max-age=300, s-maxage=300") // 5 minutes + @Header("Cache-Control", "public, max-age=3600, s-maxage=3600") // 60 minutes async getInternetInstallations(): Promise { return this.internetCatalog.getInstallations(); } @Get("sim/plans") @Throttle({ default: { limit: 20, ttl: 60 } }) // 20 requests per minute - @Header("Cache-Control", "public, max-age=300, s-maxage=300") // 5 minutes + @Header("Cache-Control", "public, max-age=3600, s-maxage=3600") // 60 minutes async getSimCatalogData(@Request() req: RequestWithUser): Promise { const userId = req.user?.id; if (!userId) { @@ -83,26 +84,26 @@ export class CatalogController { } @Get("sim/activation-fees") - @Header("Cache-Control", "public, max-age=300, s-maxage=300") // 5 minutes + @Header("Cache-Control", "public, max-age=3600, s-maxage=3600") // 60 minutes async getSimActivationFees(): Promise { return this.simCatalog.getActivationFees(); } @Get("sim/addons") - @Header("Cache-Control", "public, max-age=300, s-maxage=300") // 5 minutes + @Header("Cache-Control", "public, max-age=3600, s-maxage=3600") // 60 minutes async getSimAddons(): Promise { return this.simCatalog.getAddons(); } @Get("vpn/plans") @Throttle({ default: { limit: 20, ttl: 60 } }) // 20 requests per minute - @Header("Cache-Control", "public, max-age=300, s-maxage=300") // 5 minutes + @Header("Cache-Control", "public, max-age=3600, s-maxage=3600") // 60 minutes async getVpnPlans(): Promise { return this.vpnCatalog.getPlans(); } @Get("vpn/activation-fees") - @Header("Cache-Control", "public, max-age=300, s-maxage=300") // 5 minutes + @Header("Cache-Control", "public, max-age=3600, s-maxage=3600") // 60 minutes async getVpnActivationFees(): Promise { return this.vpnCatalog.getActivationFees(); } diff --git a/apps/bff/src/modules/catalog/catalog.module.ts b/apps/bff/src/modules/catalog/catalog.module.ts index 3b55f95b..430a917e 100644 --- a/apps/bff/src/modules/catalog/catalog.module.ts +++ b/apps/bff/src/modules/catalog/catalog.module.ts @@ -1,5 +1,6 @@ import { Module } from "@nestjs/common"; import { CatalogController } from "./catalog.controller"; +import { CatalogHealthController } from "./catalog-health.controller"; import { IntegrationsModule } from "@bff/integrations/integrations.module"; import { MappingsModule } from "@bff/modules/id-mappings/mappings.module"; import { CoreConfigModule } from "@bff/core/config/config.module"; @@ -13,7 +14,7 @@ import { CatalogCacheService } from "./services/catalog-cache.service"; @Module({ imports: [IntegrationsModule, MappingsModule, CoreConfigModule, CacheModule], - controllers: [CatalogController], + controllers: [CatalogController, CatalogHealthController], providers: [ BaseCatalogService, InternetCatalogService, diff --git a/apps/bff/src/modules/catalog/services/base-catalog.service.ts b/apps/bff/src/modules/catalog/services/base-catalog.service.ts index 6d2ae570..832e1f31 100644 --- a/apps/bff/src/modules/catalog/services/base-catalog.service.ts +++ b/apps/bff/src/modules/catalog/services/base-catalog.service.ts @@ -40,7 +40,9 @@ export class BaseCatalogService { context: string ): Promise { try { - const res = (await this.sf.query(soql)) as SalesforceResponse; + const res = (await this.sf.query(soql, { + label: `catalog:${context.replace(/\s+/g, "_" ).toLowerCase()}`, + })) as SalesforceResponse; return res.records ?? []; } catch (error: unknown) { this.logger.error(`Query failed: ${context}`, { diff --git a/apps/bff/src/modules/catalog/services/catalog-cache.service.ts b/apps/bff/src/modules/catalog/services/catalog-cache.service.ts index 6d109ded..bf87afc7 100644 --- a/apps/bff/src/modules/catalog/services/catalog-cache.service.ts +++ b/apps/bff/src/modules/catalog/services/catalog-cache.service.ts @@ -1,6 +1,19 @@ import { Injectable } from "@nestjs/common"; import { CacheService } from "@bff/infra/cache/cache.service"; +interface CacheBucketMetrics { + hits: number; + misses: number; +} + +interface CatalogCacheSnapshot { + catalog: CacheBucketMetrics; + static: CacheBucketMetrics; + volatile: CacheBucketMetrics; + eligibility: CacheBucketMetrics; + invalidations: number; +} + /** * Catalog-specific caching service * @@ -9,36 +22,54 @@ import { CacheService } from "@bff/infra/cache/cache.service"; */ @Injectable() export class CatalogCacheService { - // 5 minutes for catalog data (plans, SKUs, pricing) - private readonly CATALOG_TTL = 300; + // 60 minutes for catalog data (plans, SKUs, pricing) + private readonly CATALOG_TTL = 3600; // 15 minutes for relatively static data (categories, metadata) private readonly STATIC_TTL = 900; + // 15 minutes for account eligibility snapshots + private readonly ELIGIBILITY_TTL = 900; + // 1 minute for volatile data (availability, inventory) private readonly VOLATILE_TTL = 60; + private readonly metrics: CatalogCacheSnapshot = { + catalog: { hits: 0, misses: 0 }, + static: { hits: 0, misses: 0 }, + volatile: { hits: 0, misses: 0 }, + eligibility: { hits: 0, misses: 0 }, + invalidations: 0, + }; + constructor(private readonly cache: CacheService) {} /** - * Get or fetch catalog data with standard 5-minute TTL + * Get or fetch catalog data with standard 60-minute TTL */ async getCachedCatalog(key: string, fetchFn: () => Promise): Promise { - return this.cache.getOrSet(key, fetchFn, this.CATALOG_TTL); + return this.getOrSet("catalog", key, this.CATALOG_TTL, fetchFn); } /** * Get or fetch static catalog data with 15-minute TTL */ async getCachedStatic(key: string, fetchFn: () => Promise): Promise { - return this.cache.getOrSet(key, fetchFn, this.STATIC_TTL); + return this.getOrSet("static", key, this.STATIC_TTL, fetchFn); } /** * Get or fetch volatile catalog data with 1-minute TTL */ async getCachedVolatile(key: string, fetchFn: () => Promise): Promise { - return this.cache.getOrSet(key, fetchFn, this.VOLATILE_TTL); + return this.getOrSet("volatile", key, this.VOLATILE_TTL, fetchFn); + } + + /** + * Get or fetch eligibility data with 15-minute TTL + */ + async getCachedEligibility(key: string, fetchFn: () => Promise): Promise { + return this.getOrSet("eligibility", key, this.ELIGIBILITY_TTL, fetchFn, true); } /** @@ -48,10 +79,15 @@ export class CatalogCacheService { return `catalog:${catalogType}:${parts.join(":")}`; } + buildEligibilityKey(catalogType: string, accountId: string): string { + return `catalog:${catalogType}:eligibility:${accountId}`; + } + /** * Invalidate catalog cache by pattern */ async invalidateCatalog(catalogType: string): Promise { + this.metrics.invalidations++; await this.cache.delPattern(`catalog:${catalogType}:*`); } @@ -59,6 +95,62 @@ export class CatalogCacheService { * Invalidate all catalog cache */ async invalidateAllCatalogs(): Promise { + this.metrics.invalidations++; await this.cache.delPattern("catalog:*"); } + + getMetrics(): CatalogCacheSnapshot { + return { + catalog: { ...this.metrics.catalog }, + static: { ...this.metrics.static }, + volatile: { ...this.metrics.volatile }, + eligibility: { ...this.metrics.eligibility }, + invalidations: this.metrics.invalidations, + }; + } + + private async getOrSet( + bucket: "catalog" | "static" | "volatile" | "eligibility", + key: string, + ttlSeconds: number, + fetchFn: () => Promise, + allowNull = false + ): Promise { + const cached = await this.cache.get(key); + const unwrapped = this.unwrapCachedValue(cached); + + if (unwrapped.hit) { + if (allowNull || unwrapped.value !== null) { + this.metrics[bucket].hits++; + return unwrapped.value as T; + } + } + + this.metrics[bucket].misses++; + const fresh = await fetchFn(); + const valueToStore = allowNull ? (fresh ?? null) : fresh; + await this.cache.set(key, this.wrapCachedValue(valueToStore), ttlSeconds); + return fresh; + } + + private unwrapCachedValue(cached: unknown): { hit: boolean; value: T | null } { + if (cached === null || cached === undefined) { + return { hit: false, value: null }; + } + + if ( + typeof cached === "object" && + cached !== null && + Object.prototype.hasOwnProperty.call(cached, "__catalogCache") + ) { + const wrapper = cached as { value: T | null }; + return { hit: true, value: wrapper.value ?? null }; + } + + return { hit: true, value: (cached as T) ?? null }; + } + + private wrapCachedValue(value: T | null): { value: T | null; __catalogCache: true } { + return { value: value ?? null, __catalogCache: true }; + } } diff --git a/apps/bff/src/modules/catalog/services/internet-catalog.service.ts b/apps/bff/src/modules/catalog/services/internet-catalog.service.ts index 27afb8b0..de4f4e0a 100644 --- a/apps/bff/src/modules/catalog/services/internet-catalog.service.ts +++ b/apps/bff/src/modules/catalog/services/internet-catalog.service.ts @@ -147,15 +147,21 @@ export class InternetCatalogService extends BaseCatalogService { // Get customer's eligibility from Salesforce const sfAccountId = assertSalesforceId(mapping.sfAccountId, "sfAccountId"); - const soql = buildAccountEligibilityQuery(sfAccountId); - const accounts = await this.executeQuery(soql, "Customer Eligibility"); + const eligibilityKey = this.catalogCache.buildEligibilityKey("internet", sfAccountId); + const account = await this.catalogCache.getCachedEligibility( + eligibilityKey, + async () => { + const soql = buildAccountEligibilityQuery(sfAccountId); + const accounts = await this.executeQuery(soql, "Customer Eligibility"); + return accounts.length > 0 ? (accounts[0] as unknown as SalesforceAccount) : null; + } + ); - if (accounts.length === 0) { + if (!account) { this.logger.warn(`No Salesforce account found for user ${userId}, returning all plans`); return allPlans; } - const account = accounts[0] as unknown as SalesforceAccount; const eligibility = account.Internet_Eligibility__c; if (!eligibility) { diff --git a/apps/bff/src/modules/orders/orders.controller.ts b/apps/bff/src/modules/orders/orders.controller.ts index d4518ba6..659187cc 100644 --- a/apps/bff/src/modules/orders/orders.controller.ts +++ b/apps/bff/src/modules/orders/orders.controller.ts @@ -25,6 +25,7 @@ import { import { apiSuccessResponseSchema } from "@customer-portal/domain/common"; import { Observable } from "rxjs"; import { OrderEventsService } from "./services/order-events.service"; +import { SalesforceReadThrottleGuard } from "@bff/integrations/salesforce/guards/salesforce-read-throttle.guard"; @Controller("orders") @UseGuards(ThrottlerGuard) @@ -67,12 +68,14 @@ export class OrdersController { } @Get("user") + @UseGuards(SalesforceReadThrottleGuard) async getUserOrders(@Request() req: RequestWithUser) { return this.orderOrchestrator.getOrdersForUser(req.user.id); } @Get(":sfOrderId") @UsePipes(new ZodValidationPipe(sfOrderIdParamSchema)) + @UseGuards(SalesforceReadThrottleGuard) async get(@Request() req: RequestWithUser, @Param() params: SfOrderIdParam) { return this.orderOrchestrator.getOrder(params.sfOrderId); } diff --git a/apps/bff/src/modules/orders/orders.module.ts b/apps/bff/src/modules/orders/orders.module.ts index 6a73fd42..6186df7b 100644 --- a/apps/bff/src/modules/orders/orders.module.ts +++ b/apps/bff/src/modules/orders/orders.module.ts @@ -7,6 +7,7 @@ import { UsersModule } from "@bff/modules/users/users.module"; import { CoreConfigModule } from "@bff/core/config/config.module"; import { DatabaseModule } from "@bff/core/database/database.module"; import { CatalogModule } from "@bff/modules/catalog/catalog.module"; +import { CacheModule } from "@bff/infra/cache/cache.module"; // Clean modular order services import { OrderValidator } from "./services/order-validator.service"; @@ -17,6 +18,7 @@ import { OrderOrchestrator } from "./services/order-orchestrator.service"; import { PaymentValidatorService } from "./services/payment-validator.service"; import { CheckoutService } from "./services/checkout.service"; import { OrderEventsService } from "./services/order-events.service"; +import { OrdersCacheService } from "./services/orders-cache.service"; // Clean modular fulfillment services import { OrderFulfillmentValidator } from "./services/order-fulfillment-validator.service"; @@ -35,6 +37,7 @@ import { OrderFieldConfigModule } from "./config/order-field-config.module"; CoreConfigModule, DatabaseModule, CatalogModule, + CacheModule, OrderFieldConfigModule, ], controllers: [OrdersController, CheckoutController], @@ -49,6 +52,7 @@ import { OrderFieldConfigModule } from "./config/order-field-config.module"; OrderItemBuilder, OrderPricebookService, OrderOrchestrator, + OrdersCacheService, CheckoutService, // Order fulfillment services (modular) diff --git a/apps/bff/src/modules/orders/services/order-fulfillment-orchestrator.service.ts b/apps/bff/src/modules/orders/services/order-fulfillment-orchestrator.service.ts index ad47a27c..123de890 100644 --- a/apps/bff/src/modules/orders/services/order-fulfillment-orchestrator.service.ts +++ b/apps/bff/src/modules/orders/services/order-fulfillment-orchestrator.service.ts @@ -12,6 +12,7 @@ import { SimFulfillmentService } from "./sim-fulfillment.service"; import { DistributedTransactionService } from "@bff/core/database/services/distributed-transaction.service"; import { getErrorMessage } from "@bff/core/utils/error.util"; import { OrderEventsService } from "./order-events.service"; +import { OrdersCacheService } from "./orders-cache.service"; import { type OrderDetails, type OrderFulfillmentValidationResult, @@ -58,7 +59,8 @@ export class OrderFulfillmentOrchestrator { private readonly orderFulfillmentErrorService: OrderFulfillmentErrorService, private readonly simFulfillmentService: SimFulfillmentService, private readonly distributedTransactionService: DistributedTransactionService, - private readonly orderEvents: OrderEventsService + private readonly orderEvents: OrderEventsService, + private readonly ordersCache: OrdersCacheService ) {} /** @@ -116,6 +118,7 @@ export class OrderFulfillmentOrchestrator { whmcsOrderId: context.validation.whmcsOrderId, }, }); + await this.invalidateOrderCaches(sfOrderId, context.validation?.sfOrder?.AccountId); return context; } } catch (error) { @@ -379,8 +382,10 @@ export class OrderFulfillmentOrchestrator { duration: fulfillmentResult.duration, }); + await this.invalidateOrderCaches(sfOrderId, context.validation?.sfOrder?.AccountId); return context; } catch (error) { + await this.invalidateOrderCaches(sfOrderId, context.validation?.sfOrder?.AccountId); await this.handleFulfillmentError(context, error as Error); this.orderEvents.publish(sfOrderId, { orderId: sfOrderId, @@ -417,13 +422,32 @@ export class OrderFulfillmentOrchestrator { return steps; } - private extractConfigurations(value: unknown): Record { - if (value && typeof value === "object") { - return value as Record; + private async extractConfigurations( + rawConfigurations: unknown + ): Promise> { + if (rawConfigurations && typeof rawConfigurations === "object") { + return rawConfigurations as Record; } return {}; } + private async invalidateOrderCaches(orderId: string, accountId?: string | null): Promise { + const tasks: Array> = [this.ordersCache.invalidateOrder(orderId)]; + if (accountId) { + tasks.push(this.ordersCache.invalidateAccountOrders(accountId)); + } + + try { + await Promise.all(tasks); + } catch (error) { + this.logger.warn("Failed to invalidate order caches", { + orderId, + accountId: accountId ?? undefined, + error: getErrorMessage(error), + }); + } + } + /** * Handle fulfillment errors and update Salesforce */ diff --git a/apps/bff/src/modules/orders/services/order-item-builder.service.ts b/apps/bff/src/modules/orders/services/order-item-builder.service.ts index e7d4e039..ba6f828b 100644 --- a/apps/bff/src/modules/orders/services/order-item-builder.service.ts +++ b/apps/bff/src/modules/orders/services/order-item-builder.service.ts @@ -1,8 +1,6 @@ import { Injectable, BadRequestException, NotFoundException, Inject } from "@nestjs/common"; import { Logger } from "nestjs-pino"; -import { SalesforceConnection } from "@bff/integrations/salesforce/services/salesforce-connection.service"; import { OrderPricebookService } from "./order-pricebook.service"; -import { PrismaService } from "@bff/infra/database/prisma.service"; import { createOrderRequestSchema } from "@customer-portal/domain/orders"; /** @@ -12,33 +10,31 @@ import { createOrderRequestSchema } from "@customer-portal/domain/orders"; export class OrderItemBuilder { constructor( @Inject(Logger) private readonly logger: Logger, - private readonly sf: SalesforceConnection, - private readonly pricebookService: OrderPricebookService, - private readonly prisma: PrismaService + private readonly pricebookService: OrderPricebookService ) {} /** - * Create OrderItems directly from SKU array + * Prepare OrderItems for Salesforce composite tree creation */ - async createOrderItemsFromSKUs( - orderId: string, + async buildOrderItemsPayload( skus: string[], pricebookId: string - ): Promise { + ): Promise { const { skus: validatedSkus } = buildItemsSchema.parse({ skus }); if (pricebookId.length === 0) { throw new BadRequestException("Product SKU is required"); } - this.logger.log({ orderId, skus }, "Creating OrderItems from SKU array"); + this.logger.log({ skus }, "Preparing OrderItems payload from SKU array"); const metaMap = await this.pricebookService.fetchProductMeta(pricebookId, skus); - // Create OrderItems for each SKU + const payload: OrderItemCompositePayload[] = []; + for (const sku of validatedSkus) { const normalizedSkuValue = sku?.trim(); if (!normalizedSkuValue) { - this.logger.error({ orderId }, "Encountered empty SKU while creating order items"); + this.logger.error("Encountered empty SKU while preparing order items payload"); throw new BadRequestException("Product SKU is required"); } @@ -58,33 +54,20 @@ export class OrderItemBuilder { throw new Error(`PricebookEntry for SKU ${normalizedSkuValue} has no UnitPrice set`); } - this.logger.log( - { - sku: normalizedSkuValue, - pbeId: meta.pricebookEntryId, - unitPrice: meta.unitPrice, - }, - "Creating OrderItem" - ); - - try { - // Salesforce requires explicit UnitPrice even with PricebookEntryId - await this.sf.sobject("OrderItem").create({ - OrderId: orderId, - PricebookEntryId: meta.pricebookEntryId, - Quantity: 1, - UnitPrice: meta.unitPrice, - }); - - this.logger.log({ orderId, sku: normalizedSkuValue }, "OrderItem created successfully"); - } catch (error) { - this.logger.error( - { error, orderId, sku: normalizedSkuValue }, - "Failed to create OrderItem" - ); - throw error; - } + payload.push({ + sku: normalizedSkuValue, + pricebookEntryId: meta.pricebookEntryId, + quantity: 1, + unitPrice: meta.unitPrice, + }); } + + this.logger.log( + { itemCount: payload.length, skus: payload.map(item => item.sku) }, + "Prepared OrderItems payload" + ); + + return payload; } /** @@ -130,4 +113,11 @@ export class OrderItemBuilder { } } +export interface OrderItemCompositePayload { + sku: string; + pricebookEntryId: string; + quantity: number; + unitPrice: number; +} + const buildItemsSchema = createOrderRequestSchema.pick({ skus: true }); diff --git a/apps/bff/src/modules/orders/services/order-orchestrator.service.ts b/apps/bff/src/modules/orders/services/order-orchestrator.service.ts index be8d1b52..61d5e836 100644 --- a/apps/bff/src/modules/orders/services/order-orchestrator.service.ts +++ b/apps/bff/src/modules/orders/services/order-orchestrator.service.ts @@ -3,7 +3,8 @@ import { Logger } from "nestjs-pino"; import { SalesforceOrderService } from "@bff/integrations/salesforce/services/salesforce-order.service"; import { OrderValidator } from "./order-validator.service"; import { OrderBuilder } from "./order-builder.service"; -import { OrderItemBuilder } from "./order-item-builder.service"; +import { OrderItemBuilder, OrderItemCompositePayload } from "./order-item-builder.service"; +import { OrdersCacheService } from "./orders-cache.service"; import { type OrderDetails, type OrderSummary } from "@customer-portal/domain/orders"; import { assertSalesforceId } from "@bff/integrations/salesforce/utils/soql.util"; @@ -21,7 +22,8 @@ export class OrderOrchestrator { private readonly salesforceOrderService: SalesforceOrderService, private readonly orderValidator: OrderValidator, private readonly orderBuilder: OrderBuilder, - private readonly orderItemBuilder: OrderItemBuilder + private readonly orderItemBuilder: OrderItemBuilder, + private readonly ordersCache: OrdersCacheService ) {} /** @@ -51,20 +53,24 @@ export class OrderOrchestrator { validatedBody.userId ); - // 3) Create Order in Salesforce via integration service - const created = await this.salesforceOrderService.createOrder(orderFields); + const orderItemsPayload: OrderItemCompositePayload[] = + await this.orderItemBuilder.buildOrderItemsPayload(validatedBody.skus, pricebookId); - // 4) Create OrderItems from SKUs - await this.orderItemBuilder.createOrderItemsFromSKUs( - created.id, - validatedBody.skus, - pricebookId + const created = await this.salesforceOrderService.createOrderWithItems( + orderFields, + orderItemsPayload ); + if (userMapping.sfAccountId) { + await this.ordersCache.invalidateAccountOrders(userMapping.sfAccountId); + } + await this.ordersCache.invalidateOrder(created.id); + this.logger.log( { orderId: created.id, skuCount: validatedBody.skus.length, + orderItemCount: orderItemsPayload.length, }, "Order creation workflow completed successfully" ); @@ -84,7 +90,9 @@ export class OrderOrchestrator { this.logger.log({ orderId: safeOrderId }, "Fetching order details"); // Use integration service - it handles queries and transformations - return this.salesforceOrderService.getOrderById(safeOrderId); + return this.ordersCache.getOrderDetails(safeOrderId, () => + this.salesforceOrderService.getOrderById(safeOrderId) + ); } /** @@ -105,6 +113,8 @@ export class OrderOrchestrator { } // Use integration service - it handles queries and transformations - return this.salesforceOrderService.getOrdersForAccount(sfAccountId); + return this.ordersCache.getOrderSummaries(sfAccountId, () => + this.salesforceOrderService.getOrdersForAccount(sfAccountId) + ); } } diff --git a/apps/bff/src/modules/orders/services/order-pricebook.service.ts b/apps/bff/src/modules/orders/services/order-pricebook.service.ts index 902de01a..5dcc7787 100644 --- a/apps/bff/src/modules/orders/services/order-pricebook.service.ts +++ b/apps/bff/src/modules/orders/services/order-pricebook.service.ts @@ -39,7 +39,9 @@ export class OrderPricebookService { const soql = `SELECT Id, Name FROM Pricebook2 WHERE IsActive = true AND Name LIKE '%${sanitizeSoqlLiteral(name)}%' LIMIT 1`; try { - const result = (await this.sf.query(soql)) as SalesforceResponse<{ Id?: string }>; + const result = (await this.sf.query(soql, { + label: "orders:pricebook:findPortal", + })) as SalesforceResponse<{ Id?: string }>; if (result.records?.length) { const resolved = result.records[0]?.Id; if (resolved) { @@ -48,7 +50,8 @@ export class OrderPricebookService { } const std = (await this.sf.query( - "SELECT Id FROM Pricebook2 WHERE IsStandard = true AND IsActive = true LIMIT 1" + "SELECT Id FROM Pricebook2 WHERE IsStandard = true AND IsActive = true LIMIT 1", + { label: "orders:pricebook:findStandard" } )) as SalesforceResponse<{ Id?: string }>; const pricebookId = std.records?.[0]?.Id; @@ -93,7 +96,9 @@ export class OrderPricebookService { `WHERE Pricebook2Id='${safePricebookId}' AND IsActive=true AND Product2.StockKeepingUnit IN ${whereIn}`; try { - const res = (await this.sf.query(soql)) as SalesforceResponse< + const res = (await this.sf.query(soql, { + label: "orders:pricebook:fetchProducts", + })) as SalesforceResponse< SalesforcePricebookEntryRecord & { Product2?: SalesforceProduct2Record | null } >; diff --git a/apps/bff/src/modules/orders/services/orders-cache.service.ts b/apps/bff/src/modules/orders/services/orders-cache.service.ts new file mode 100644 index 00000000..4d48eeab --- /dev/null +++ b/apps/bff/src/modules/orders/services/orders-cache.service.ts @@ -0,0 +1,130 @@ +import { Injectable } from "@nestjs/common"; +import { CacheService } from "@bff/infra/cache/cache.service"; +import type { OrderDetails, OrderSummary } from "@customer-portal/domain/orders"; + +interface CacheBucketMetrics { + hits: number; + misses: number; +} + +interface OrdersCacheMetrics { + summaries: CacheBucketMetrics; + details: CacheBucketMetrics; + invalidations: number; +} + +@Injectable() +export class OrdersCacheService { + private readonly SUMMARY_TTL_SECONDS = 120; + private readonly DETAIL_TTL_SECONDS = 90; + + private readonly metrics: OrdersCacheMetrics = { + summaries: { hits: 0, misses: 0 }, + details: { hits: 0, misses: 0 }, + invalidations: 0, + }; + + constructor(private readonly cache: CacheService) {} + + async getOrderSummaries( + sfAccountId: string, + fetcher: () => Promise + ): Promise { + const key = this.buildAccountKey(sfAccountId); + return this.getOrSet( + "summaries", + key, + this.SUMMARY_TTL_SECONDS, + fetcher, + false + ); + } + + async getOrderDetails( + orderId: string, + fetcher: () => Promise + ): Promise { + const key = this.buildOrderKey(orderId); + return this.getOrSet( + "details", + key, + this.DETAIL_TTL_SECONDS, + fetcher, + true + ); + } + + async invalidateAccountOrders(sfAccountId: string): Promise { + const key = this.buildAccountKey(sfAccountId); + this.metrics.invalidations++; + await this.cache.del(key); + } + + async invalidateOrder(orderId: string): Promise { + const key = this.buildOrderKey(orderId); + this.metrics.invalidations++; + await this.cache.del(key); + } + + getMetrics(): OrdersCacheMetrics { + return { + summaries: { ...this.metrics.summaries }, + details: { ...this.metrics.details }, + invalidations: this.metrics.invalidations, + }; + } + + private async getOrSet( + bucket: keyof Pick, + key: string, + ttlSeconds: number, + fetcher: () => Promise, + allowNull: boolean + ): Promise { + const cached = await this.cache.get(key); + const unwrapped = this.unwrapCachedValue(cached); + + if (unwrapped.hit) { + if (allowNull || unwrapped.value !== null) { + this.metrics[bucket].hits++; + return unwrapped.value as T; + } + } + + this.metrics[bucket].misses++; + const fresh = await fetcher(); + const valueToStore = allowNull ? (fresh ?? null) : fresh; + await this.cache.set(key, this.wrapCachedValue(valueToStore), ttlSeconds); + return fresh; + } + + private unwrapCachedValue(cached: unknown): { hit: boolean; value: T | null } { + if (cached === null || cached === undefined) { + return { hit: false, value: null }; + } + + if ( + typeof cached === "object" && + cached !== null && + Object.prototype.hasOwnProperty.call(cached, "__ordersCache") + ) { + const wrapper = cached as { value: T | null }; + return { hit: true, value: wrapper.value ?? null }; + } + + return { hit: true, value: (cached as T) ?? null }; + } + + private wrapCachedValue(value: T | null): { value: T | null; __ordersCache: true } { + return { value: value ?? null, __ordersCache: true }; + } + + private buildAccountKey(sfAccountId: string): string { + return `orders:account:${sfAccountId}`; + } + + private buildOrderKey(orderId: string): string { + return `orders:detail:${orderId}`; + } +} + diff --git a/docs/salesforce-shadow-sync-plan.md b/docs/salesforce-shadow-sync-plan.md new file mode 100644 index 00000000..a4628abd --- /dev/null +++ b/docs/salesforce-shadow-sync-plan.md @@ -0,0 +1,84 @@ +# Salesforce Shadow Data Sync Plan + +## Objectives +- Reduce repetitive Salesforce reads for hot catalog and eligibility data. +- Provide resilient fallbacks when Salesforce limits are reached by serving data from Postgres shadow tables. +- Maintain data freshness within minutes via event-driven updates, with scheduled backstops. + +## Scope +- **Catalog metadata**: `Product2`, `PricebookEntry`, add-on metadata (SIM/Internet/VPN). +- **Pricing snapshots**: Unit price, currency, and active flags per SKU. +- **Account eligibility**: `Account.Internet_Eligibility__c` and related readiness fields used by personalized catalogs. + +## Proposed Schema (Postgres) + +```sql +CREATE TABLE sf_product_shadow ( + product_id TEXT PRIMARY KEY, + sku TEXT UNIQUE NOT NULL, + name TEXT NOT NULL, + item_class TEXT, + offering_type TEXT, + plan_tier TEXT, + vpn_region TEXT, + updated_at TIMESTAMP WITH TIME ZONE NOT NULL, + raw_payload JSONB NOT NULL +); + +CREATE TABLE sf_pricebook_shadow ( + pricebook_entry_id TEXT PRIMARY KEY, + product_id TEXT NOT NULL REFERENCES sf_product_shadow(product_id) ON DELETE CASCADE, + pricebook_id TEXT NOT NULL, + unit_price NUMERIC(12,2) NOT NULL, + currency_iso_code TEXT NOT NULL, + is_active BOOLEAN NOT NULL, + updated_at TIMESTAMP WITH TIME ZONE NOT NULL, + raw_payload JSONB NOT NULL +); + +CREATE TABLE sf_account_eligibility_shadow ( + account_id TEXT PRIMARY KEY, + internet_eligibility TEXT, + eligibility_source TEXT, + updated_at TIMESTAMP WITH TIME ZONE NOT NULL, + raw_payload JSONB NOT NULL +); +``` + +## Sync Strategy + +| Phase | Approach | Tooling | +| --- | --- | --- | +| Backfill | Bulk API v2 query for each object (Product2, PricebookEntry, Account) to seed tables. | New CLI job (`pnpm nx run bff:salesforce-backfill-shadow`) | +| Incremental updates | Subscribe to Platform Events or Change Data Capture streams for Product2, PricebookEntry, and Account. Push events onto existing SalesforceRequestQueue, enqueue to BullMQ worker that upserts into shadow tables. | Extend provisioning queue or add new `SF_SHADOW_SYNC` queue | +| Catch-up | Nightly scheduled Bulk API delta query (using `SystemModstamp`) to reconcile missed events. | Cron worker (same Bull queue) | + +### Upsert Flow +1. Event payload arrives from Salesforce Pub/Sub → persisted to queue (reuse `SalesforceRequestQueueService` backoff). +2. Worker normalizes payload (maps relationship fields, handles deletions). +3. Performs PostgreSQL `INSERT ... ON CONFLICT` using transaction to keep product ↔ pricebook relationships consistent. +4. Invalidate Redis keys (`catalog:*`, `eligibility:*`) via `CatalogCacheService.invalidateAllCatalogs()` or targeted invalidation when specific SKU/account changes. + +## Integration Points +- **Catalog services**: attempt to read from shadow tables via Prisma before falling back to Salesforce query; only hit Salesforce on cache miss _and_ shadow miss. +- **Eligibility lookup**: `InternetCatalogService.getPlansForUser` first loads from `sf_account_eligibility_shadow`; if stale (>15 min) fallback to Salesforce + refresh row asynchronously. +- **Order flows**: continue using live Salesforce (writes) but use shadow data for price lookups where possible. + +## Monitoring & Alerts +- Add Prometheus counters: `sf_shadow_sync_events_total`, `sf_shadow_sync_failures_total`. +- Track lag metrics: `MAX(now() - updated_at)` per table. +- Hook into existing queue health endpoint to expose shadow worker backlog. + +## Rollout Checklist +1. Implement schema migrations (SQL or Prisma) under feature flag. +2. Build bulk backfill command; run in staging, verify record counts vs Salesforce SOQL. +3. Enable event ingestion in staging, monitor for 48h, validate cache invalidation. +4. Update catalog services to prefer shadow reads; release behind environment variable `ENABLE_SF_SHADOW_READS`. +5. Roll to production gradually: run backfill, enable read flag, then enable event consumer. +6. Document operational runbooks (replay events, manual backfill, clearing caches). + +## Open Questions +- Do we mirror additional fields (e.g., localization strings) needed for future UX changes? +- Should eligibility sync include other readiness signals (credit status, serviceability flags)? +- Confirm retention strategy for `raw_payload` column (e.g., prune older versions weekly). +