From cbaa87800064431458355508e96971a4fbf6ffc0 Mon Sep 17 00:00:00 2001 From: barsa Date: Thu, 6 Nov 2025 17:47:55 +0900 Subject: [PATCH] Refactor Salesforce event handling and caching mechanisms - Removed the deprecated SF_PROVISION_EVENT_CHANNEL from environment configuration to streamline event handling. - Enhanced CatalogCdcSubscriber to utilize product IDs for cache invalidation, improving cache management during CDC events. - Updated OrderCdcSubscriber to trigger provisioning based on Activation_Status__c changes, refining order processing logic. - Improved CatalogCacheService to support dependency tracking for cached values, enabling more efficient cache invalidation. - Refactored provisioning processor to simplify job handling and removed unnecessary replay ID management, enhancing clarity and performance. --- apps/bff/src/core/config/env.validation.ts | 1 - .../events/catalog-cdc.subscriber.ts | 60 ++- .../salesforce/events/event-keys.util.ts | 11 - .../salesforce/events/events.module.ts | 2 - .../salesforce/events/order-cdc.subscriber.ts | 78 ++-- .../salesforce/events/pubsub.subscriber.ts | 366 ------------------ .../salesforce/salesforce.module.ts | 1 + .../catalog/services/catalog-cache.service.ts | 191 ++++++++- .../services/internet-catalog.service.ts | 158 ++++---- .../catalog/services/sim-catalog.service.ts | 148 ++++--- .../orders/queue/provisioning.processor.ts | 38 +- .../orders/queue/provisioning.queue.ts | 7 +- docs/CDC_ONLY_ORDER_IMPLEMENTATION.md | 66 ++-- env/portal-backend.env.sample | 1 - 14 files changed, 489 insertions(+), 639 deletions(-) delete mode 100644 apps/bff/src/integrations/salesforce/events/event-keys.util.ts delete mode 100644 apps/bff/src/integrations/salesforce/events/pubsub.subscriber.ts diff --git a/apps/bff/src/core/config/env.validation.ts b/apps/bff/src/core/config/env.validation.ts index 2efdbb8a..e3a1de4a 100644 --- a/apps/bff/src/core/config/env.validation.ts +++ b/apps/bff/src/core/config/env.validation.ts @@ -85,7 +85,6 @@ export const envSchema = z.object({ SF_QUEUE_LONG_RUNNING_TIMEOUT_MS: z.coerce.number().int().positive().default(600000), SF_EVENTS_ENABLED: z.enum(["true", "false"]).default("false"), - SF_PROVISION_EVENT_CHANNEL: z.string().default("/event/Order_Fulfilment_Requested__e"), SF_CATALOG_EVENT_CHANNEL: z.string().default("/event/Product_and_Pricebook_Change__e"), SF_ACCOUNT_EVENT_CHANNEL: z.string().default("/event/Account_Internet_Eligibility_Update__e"), SF_ORDER_EVENT_CHANNEL: z.string().optional(), diff --git a/apps/bff/src/integrations/salesforce/events/catalog-cdc.subscriber.ts b/apps/bff/src/integrations/salesforce/events/catalog-cdc.subscriber.ts index 56db6ff0..a2ff7148 100644 --- a/apps/bff/src/integrations/salesforce/events/catalog-cdc.subscriber.ts +++ b/apps/bff/src/integrations/salesforce/events/catalog-cdc.subscriber.ts @@ -139,11 +139,24 @@ export class CatalogCdcSubscriber implements OnModuleInit, OnModuleDestroy { data: unknown ): Promise { if (!this.isDataCallback(callbackType)) return; - this.logger.log("Product2 CDC event received, invalidating catalogs", { + const payload = this.extractPayload(data); + const productIds = this.extractRecordIds(payload); + + this.logger.log("Product2 CDC event received", { channel, topicName: subscription.topicName, + productIds, }); - await this.invalidateAllCatalogs(); + + const invalidated = await this.catalogCache.invalidateProducts(productIds); + + if (!invalidated) { + this.logger.debug("No catalog cache entries were linked to product IDs; falling back to full invalidation", { + channel, + productIds, + }); + await this.invalidateAllCatalogs(); + } } private async handlePricebookEvent( @@ -165,11 +178,26 @@ export class CatalogCdcSubscriber implements OnModuleInit, OnModuleDestroy { return; } - this.logger.log("PricebookEntry CDC event received, invalidating catalogs", { + const productId = this.extractStringField(payload, ["Product2Id", "ProductId"]); + + this.logger.log("PricebookEntry CDC event received", { channel, pricebookId, + productId, }); - await this.invalidateAllCatalogs(); + + const invalidated = await this.catalogCache.invalidateProducts( + productId ? [productId] : [] + ); + + if (!invalidated) { + this.logger.debug("No catalog cache entries mapped to product from pricebook event; performing full invalidation", { + channel, + pricebookId, + productId, + }); + await this.invalidateAllCatalogs(); + } } private async handleAccountEvent( @@ -248,5 +276,29 @@ export class CatalogCdcSubscriber implements OnModuleInit, OnModuleDestroy { } return undefined; } + + private extractRecordIds(payload: Record | undefined): string[] { + if (!payload) { + return []; + } + + const header = this.extractChangeEventHeader(payload); + const ids = header?.recordIds ?? []; + if (Array.isArray(ids)) { + return ids.filter((id): id is string => typeof id === "string" && id.trim().length > 0); + } + + return []; + } + + private extractChangeEventHeader( + payload: Record + ): { recordIds?: unknown; changedFields?: unknown } | undefined { + const header = payload["ChangeEventHeader"]; + if (header && typeof header === "object") { + return header as { recordIds?: unknown; changedFields?: unknown }; + } + return undefined; + } } diff --git a/apps/bff/src/integrations/salesforce/events/event-keys.util.ts b/apps/bff/src/integrations/salesforce/events/event-keys.util.ts deleted file mode 100644 index ee4664bd..00000000 --- a/apps/bff/src/integrations/salesforce/events/event-keys.util.ts +++ /dev/null @@ -1,11 +0,0 @@ -export function replayKey(channel: string): string { - return `sf:pe:replay:${channel}`; -} - -export function statusKey(channel: string): string { - return `sf:pe:status:${channel}`; -} - -export function latestSeenKey(channel: string): string { - return `sf:pe:latestSeen:${channel}`; -} diff --git a/apps/bff/src/integrations/salesforce/events/events.module.ts b/apps/bff/src/integrations/salesforce/events/events.module.ts index a57e09c5..b557b606 100644 --- a/apps/bff/src/integrations/salesforce/events/events.module.ts +++ b/apps/bff/src/integrations/salesforce/events/events.module.ts @@ -3,14 +3,12 @@ import { ConfigModule } from "@nestjs/config"; import { IntegrationsModule } from "@bff/integrations/integrations.module"; import { OrdersModule } from "@bff/modules/orders/orders.module"; import { CatalogModule } from "@bff/modules/catalog/catalog.module"; -import { SalesforcePubSubSubscriber } from "./pubsub.subscriber"; import { CatalogCdcSubscriber } from "./catalog-cdc.subscriber"; import { OrderCdcSubscriber } from "./order-cdc.subscriber"; @Module({ imports: [ConfigModule, IntegrationsModule, OrdersModule, CatalogModule], providers: [ - SalesforcePubSubSubscriber, // Platform Event for order provisioning CatalogCdcSubscriber, // CDC for catalog cache invalidation OrderCdcSubscriber, // CDC for order cache invalidation ], diff --git a/apps/bff/src/integrations/salesforce/events/order-cdc.subscriber.ts b/apps/bff/src/integrations/salesforce/events/order-cdc.subscriber.ts index ae03bbcf..8b29bb8b 100644 --- a/apps/bff/src/integrations/salesforce/events/order-cdc.subscriber.ts +++ b/apps/bff/src/integrations/salesforce/events/order-cdc.subscriber.ts @@ -28,7 +28,9 @@ type PubSubCtor = new (opts: { /** * CDC Subscriber for Order changes * - * Strategy: Only invalidate cache for customer-facing field changes, NOT internal system fields + * Strategy: + * 1. Trigger provisioning when Salesforce sets Activation_Status__c to "Activating" + * 2. Only invalidate cache for customer-facing field changes, NOT internal system fields * * CUSTOMER-FACING FIELDS (invalidate cache): * - Status (Draft, Pending Review, Completed, Cancelled) @@ -202,9 +204,9 @@ export class OrderCdcSubscriber implements OnModuleInit, OnModuleDestroy { return; } - // 1. CHECK FOR PROVISIONING TRIGGER (Status change) - if (payload && changedFields.has("Status")) { - await this.handleStatusChange(payload, orderId, changedFields); + // 1. CHECK FOR PROVISIONING TRIGGER (Activation status change) + if (payload && changedFields.has("Activation_Status__c")) { + await this.handleActivationStatusChange(payload, orderId); } // 2. CACHE INVALIDATION (existing logic) @@ -245,45 +247,33 @@ export class OrderCdcSubscriber implements OnModuleInit, OnModuleDestroy { } /** - * Handle Status field changes and trigger provisioning if needed + * Handle Activation_Status__c changes and trigger provisioning when Salesforce moves an order to "Activating" */ - private async handleStatusChange( + private async handleActivationStatusChange( payload: Record, - orderId: string, - changedFields: Set + orderId: string ): Promise { - const newStatus = this.extractStringField(payload, ["Status"]); const activationStatus = this.extractStringField(payload, ["Activation_Status__c"]); + const status = this.extractStringField(payload, ["Status"]); const whmcsOrderId = this.extractStringField(payload, ["WHMCS_Order_ID__c"]); - // Guard: Only provision for specific statuses - if (!newStatus || !this.PROVISION_TRIGGER_STATUSES.has(newStatus)) { - this.logger.debug("Status changed but not a provision trigger", { - orderId, - newStatus, - }); - return; - } - - // Guard: Don't trigger if already provisioning - if (activationStatus === "Activating") { - this.logger.debug("Order already provisioning, skipping", { + if (activationStatus !== "Activating") { + this.logger.debug("Activation status changed but not to Activating; skipping provisioning", { orderId, activationStatus, }); return; } - // Guard: Don't trigger if already activated - if (activationStatus === "Activated") { - this.logger.debug("Order already activated, skipping", { + if (status && !this.PROVISION_TRIGGER_STATUSES.has(status)) { + this.logger.debug("Activation status set to Activating but order status is not a provisioning trigger", { orderId, activationStatus, + status, }); return; } - // Guard: Check if WHMCS Order ID already exists (idempotency) if (whmcsOrderId) { this.logger.log("Order already has WHMCS Order ID, skipping provisioning", { orderId, @@ -292,28 +282,29 @@ export class OrderCdcSubscriber implements OnModuleInit, OnModuleDestroy { return; } - // Trigger provisioning - this.logger.log("Order status changed to provision trigger via CDC, enqueuing fulfillment", { + this.logger.log("Order activation moved to Activating via CDC, enqueuing fulfillment", { orderId, - status: newStatus, activationStatus, + status, }); try { await this.provisioningQueue.enqueue({ sfOrderId: orderId, - idempotencyKey: `cdc-status-${Date.now()}-${orderId}`, + idempotencyKey: `cdc-activation-${Date.now()}-${orderId}`, correlationId: `cdc-order-${orderId}`, }); - this.logger.log("Successfully enqueued provisioning job from CDC Status change", { + this.logger.log("Successfully enqueued provisioning job from activation change", { orderId, - trigger: `Status → ${newStatus}`, + activationStatus, + status, }); } catch (error) { - this.logger.error("Failed to enqueue provisioning job from CDC", { + this.logger.error("Failed to enqueue provisioning job from activation change", { orderId, - newStatus, + activationStatus, + status, error: error instanceof Error ? error.message : String(error), }); } @@ -408,6 +399,12 @@ export class OrderCdcSubscriber implements OnModuleInit, OnModuleDestroy { private extractChangedFields(payload: Record | undefined): Set { if (!payload) return new Set(); + const header = this.extractChangeEventHeader(payload); + const headerChangedFields = Array.isArray(header?.changedFields) + ? (header?.changedFields as unknown[]) + .filter((field): field is string => typeof field === "string" && field.length > 0) + : []; + // CDC provides changed fields in different formats depending on API version // Try to extract from common locations const changedFieldsArray = @@ -415,7 +412,10 @@ export class OrderCdcSubscriber implements OnModuleInit, OnModuleDestroy { ((payload.changeOrigin as { changedFields?: string[] })?.changedFields) || []; - return new Set(changedFieldsArray); + return new Set([ + ...headerChangedFields, + ...changedFieldsArray.filter(field => typeof field === "string" && field.length > 0), + ]); } private isDataCallback(callbackType: string): boolean { @@ -452,5 +452,15 @@ export class OrderCdcSubscriber implements OnModuleInit, OnModuleDestroy { } return undefined; } + + private extractChangeEventHeader( + payload: Record + ): { changedFields?: unknown } | undefined { + const header = payload["ChangeEventHeader"]; + if (header && typeof header === "object") { + return header as { changedFields?: unknown }; + } + return undefined; + } } diff --git a/apps/bff/src/integrations/salesforce/events/pubsub.subscriber.ts b/apps/bff/src/integrations/salesforce/events/pubsub.subscriber.ts deleted file mode 100644 index 25bcc6c0..00000000 --- a/apps/bff/src/integrations/salesforce/events/pubsub.subscriber.ts +++ /dev/null @@ -1,366 +0,0 @@ -import { Injectable, OnModuleInit, OnModuleDestroy, Inject } from "@nestjs/common"; -import { ConfigService } from "@nestjs/config"; -import { Logger } from "nestjs-pino"; -import PubSubApiClientPkg from "salesforce-pubsub-api-client"; -import { SalesforceConnection } from "../services/salesforce-connection.service"; -import { ProvisioningQueueService } from "@bff/modules/orders/queue/provisioning.queue"; -import { CacheService } from "@bff/infra/cache/cache.service"; -import { - replayKey as sfReplayKey, - statusKey as sfStatusKey, - latestSeenKey as sfLatestSeenKey, -} from "./event-keys.util"; -import type { - SalesforceOrderProvisionEvent, - SalesforcePubSubError, - SalesforcePubSubSubscription, - SalesforcePubSubCallbackType, - SalesforcePubSubUnknownData, -} from "@customer-portal/domain/orders"; - -type SubscribeCallback = ( - subscription: SalesforcePubSubSubscription, - callbackType: SalesforcePubSubCallbackType, - data: SalesforceOrderProvisionEvent | SalesforcePubSubError | SalesforcePubSubUnknownData -) => void | Promise; - -interface PubSubClient { - connect(): Promise; - subscribe(topic: string, cb: SubscribeCallback, numRequested?: number): Promise; - subscribeFromReplayId( - topic: string, - cb: SubscribeCallback, - numRequested: number | null, - replayId: number - ): Promise; - subscribeFromEarliestEvent( - topic: string, - cb: SubscribeCallback, - numRequested?: number - ): Promise; - requestAdditionalEvents(topic: string, numRequested: number): Promise; - close(): Promise; -} - -type PubSubCtor = new (opts: { - authType: string; - accessToken: string; - instanceUrl: string; - pubSubEndpoint: string; -}) => PubSubClient; - -@Injectable() -export class SalesforcePubSubSubscriber implements OnModuleInit, OnModuleDestroy { - private client: PubSubClient | null = null; - private clientAccessToken: string | null = null; - private channel!: string; - private replayCorruptionRecovered = false; - private subscribeCallback!: SubscribeCallback; - private pubSubCtor: PubSubCtor | null = null; - private clientBuildInFlight: Promise | null = null; - - constructor( - private readonly config: ConfigService, - private readonly sfConn: SalesforceConnection, - private readonly provisioningQueue: ProvisioningQueueService, - private readonly cache: CacheService, - @Inject(Logger) private readonly logger: Logger - ) {} - - async onModuleInit(): Promise { - const enabled = this.config.get("SF_EVENTS_ENABLED", "false") === "true"; - if (!enabled) { - this.logger.log("Salesforce Pub/Sub subscriber disabled", { enabled }); - return; - } - - this.channel = this.config.get( - "SF_PROVISION_EVENT_CHANNEL", - "/event/Order_Fulfilment_Requested__e" - ); - - try { - this.subscribeCallback = this.buildSubscribeCallback(); - await this.subscribeWithPolicy(true); - } catch (error) { - this.logger.error("Salesforce Pub/Sub subscription failed", { - error: error instanceof Error ? error.message : String(error), - }); - try { - await this.cache.set(sfStatusKey(this.channel || "/event/OrderProvisionRequested__e"), { - status: "disconnected", - since: Date.now(), - }); - } catch (cacheErr) { - this.logger.warn("Failed to set SF Pub/Sub disconnected status", { - error: cacheErr instanceof Error ? cacheErr.message : String(cacheErr), - }); - } - } - } - - async onModuleDestroy(): Promise { - try { - await this.safeCloseClient(); - await this.cache.set(sfStatusKey(this.channel), { - status: "disconnected", - since: Date.now(), - }); - } catch (error) { - this.logger.warn("Error closing Salesforce Pub/Sub client", { - error: error instanceof Error ? error.message : String(error), - }); - } - } - - private buildSubscribeCallback(): SubscribeCallback { - return async (subscription, callbackType, data) => { - try { - const argTypes = [typeof subscription, typeof callbackType, typeof data]; - const type = callbackType; - const typeNorm = String(type || "").toLowerCase(); - const topic = subscription.topicName || this.channel; - - if (typeNorm === "data" || typeNorm === "event") { - const event = data as SalesforceOrderProvisionEvent; - this.logger.debug("SF Pub/Sub data callback received", { - topic, - argTypes, - hasPayload: Boolean(event?.payload), - }); - const payload = event?.payload; - - const orderIdVal = payload?.["OrderId__c"] ?? payload?.["OrderId"]; - const orderId = typeof orderIdVal === "string" ? orderIdVal : undefined; - if (!orderId) { - this.logger.warn("Pub/Sub event missing OrderId__c; skipping", { - argTypes, - topic, - payloadKeys: payload ? Object.keys(payload) : [], - }); - const depth = await this.provisioningQueue.depth(); - if (depth < this.getMaxQueueSize()) { - const activeClient = this.client; - if (activeClient) { - await activeClient.requestAdditionalEvents(topic, 1); - } - } - return; - } - - const replayVal = (event as { replayId?: unknown })?.replayId; - const idempotencyKey = - typeof replayVal === "number" || typeof replayVal === "string" - ? String(replayVal) - : String(Date.now()); - const pubsubReplayId = typeof replayVal === "number" ? replayVal : undefined; - - await this.provisioningQueue.enqueue({ - sfOrderId: orderId, - idempotencyKey, - pubsubReplayId, - }); - this.logger.log("Enqueued provisioning job from SF event", { - sfOrderId: orderId, - replayId: pubsubReplayId, - topic, - }); - } else if (typeNorm === "lastevent") { - const depth = await this.provisioningQueue.depth(); - const available = Math.max(0, this.getMaxQueueSize() - depth); - const desired = Math.max(0, Math.min(this.getNumRequested(), available)); - if (desired > 0) { - const activeClient = this.client; - if (activeClient) { - await activeClient.requestAdditionalEvents(topic, desired); - } - } - } else if (typeNorm === "grpckeepalive") { - const latestVal = (data as { latestReplayId?: unknown })?.latestReplayId; - const latest = typeof latestVal === "number" ? latestVal : undefined; - if (typeof latest === "number") { - await this.cache.set(sfLatestSeenKey(this.channel), { - id: String(latest), - at: Date.now(), - }); - } - } else if (typeNorm === "grpcstatus" || typeNorm === "end") { - // Informational – no action required - } else if (typeNorm === "error") { - this.logger.warn("SF Pub/Sub stream error", { topic, data }); - try { - const errorData = data as SalesforcePubSubError; - const details = errorData.details || ""; - const metadata = errorData.metadata || {}; - const errorCodes = Array.isArray(metadata["error-code"]) ? metadata["error-code"] : []; - const hasCorruptionCode = errorCodes.some(code => - String(code).includes("replayid.corrupted") - ); - const mentionsReplayValidation = /Replay ID validation failed/i.test(details); - - if ( - (hasCorruptionCode || mentionsReplayValidation) && - !this.replayCorruptionRecovered - ) { - this.replayCorruptionRecovered = true; - const key = sfReplayKey(this.channel); - await this.cache.del(key); - this.logger.warn( - "Cleared invalid Salesforce Pub/Sub replay cursor; retrying subscription", - { - channel: this.channel, - key, - } - ); - } - } catch (recoveryErr) { - this.logger.warn("SF Pub/Sub replay corruption auto-recovery failed", { - error: recoveryErr instanceof Error ? recoveryErr.message : String(recoveryErr), - }); - } finally { - await this.recoverFromStreamError(); - } - } else { - const maybeEvent = data as SalesforceOrderProvisionEvent | undefined; - const hasPayload = Boolean(maybeEvent?.payload); - this.logger.debug("SF Pub/Sub callback ignored (unknown type)", { - type, - topic, - argTypes, - hasPayload, - }); - } - } catch (err) { - this.logger.error("Pub/Sub subscribe callback failed", { - error: err instanceof Error ? err.message : String(err), - }); - } - }; - } - - private getNumRequested(): number { - return Number(this.config.get("SF_PUBSUB_NUM_REQUESTED", "50")) || 50; - } - - private getMaxQueueSize(): number { - return Number(this.config.get("SF_PUBSUB_QUEUE_MAX", "100")) || 100; - } - - private getPubSubCtor(): PubSubCtor { - if (this.pubSubCtor) { - return this.pubSubCtor; - } - const maybeCtor = - (PubSubApiClientPkg as { default?: unknown })?.default ?? (PubSubApiClientPkg as unknown); - this.pubSubCtor = maybeCtor as PubSubCtor; - return this.pubSubCtor; - } - - private async ensureClient(forceRefresh = false): Promise { - if (this.clientBuildInFlight && !forceRefresh) { - return this.clientBuildInFlight; - } - - this.clientBuildInFlight = (async () => { - await this.sfConn.ensureConnected(); - const accessToken = this.sfConn.getAccessToken(); - const instanceUrl = this.sfConn.getInstanceUrl(); - if (!accessToken || !instanceUrl) { - throw new Error("Salesforce access token || instance URL unavailable"); - } - - const tokenChanged = this.clientAccessToken !== accessToken; - - if (!this.client || forceRefresh || tokenChanged) { - await this.safeCloseClient(); - - const endpoint = this.config.get( - "SF_PUBSUB_ENDPOINT", - "api.pubsub.salesforce.com:7443" - ); - const Ctor = this.getPubSubCtor(); - const client = new Ctor({ - authType: "user-supplied", - accessToken, - instanceUrl, - pubSubEndpoint: endpoint, - }); - - await client.connect(); - this.client = client; - this.clientAccessToken = accessToken; - this.replayCorruptionRecovered = false; - } - - return this.client; - })(); - - try { - return await this.clientBuildInFlight; - } finally { - this.clientBuildInFlight = null; - } - } - - private async subscribeWithPolicy(forceClientRefresh = false): Promise { - if (!this.subscribeCallback) { - throw new Error("Subscribe callback not initialized"); - } - - await this.cache.set(sfStatusKey(this.channel), { - status: "connecting", - since: Date.now(), - }); - - const client = await this.ensureClient(forceClientRefresh); - - const replayMode = this.config.get("SF_EVENTS_REPLAY", "LATEST"); - const replayKey = sfReplayKey(this.channel); - const storedReplay = replayMode !== "ALL" ? await this.cache.get(replayKey) : null; - const numRequested = this.getNumRequested(); - - if (storedReplay && replayMode !== "ALL") { - await client.subscribeFromReplayId( - this.channel, - this.subscribeCallback, - numRequested, - Number(storedReplay) - ); - } else if (replayMode === "ALL") { - await client.subscribeFromEarliestEvent(this.channel, this.subscribeCallback, numRequested); - } else { - await client.subscribe(this.channel, this.subscribeCallback, numRequested); - } - - await this.cache.set(sfStatusKey(this.channel), { - status: "connected", - since: Date.now(), - }); - this.logger.log("Salesforce Pub/Sub subscription active", { channel: this.channel }); - } - - private async recoverFromStreamError(): Promise { - await this.cache.set(sfStatusKey(this.channel), { - status: "reconnecting", - since: Date.now(), - }); - await this.safeCloseClient(); - await this.subscribeWithPolicy(true); - } - - private async safeCloseClient(): Promise { - if (!this.client) { - return; - } - try { - await this.client.close(); - } catch (error) { - this.logger.warn("Failed to close Salesforce Pub/Sub client", { - error: error instanceof Error ? error.message : String(error), - }); - } finally { - this.client = null; - this.clientAccessToken = null; - } - } -} diff --git a/apps/bff/src/integrations/salesforce/salesforce.module.ts b/apps/bff/src/integrations/salesforce/salesforce.module.ts index 31bd02c3..7e7b5226 100644 --- a/apps/bff/src/integrations/salesforce/salesforce.module.ts +++ b/apps/bff/src/integrations/salesforce/salesforce.module.ts @@ -20,6 +20,7 @@ import { SalesforceWriteThrottleGuard } from "./guards/salesforce-write-throttle SalesforceWriteThrottleGuard, ], exports: [ + QueueModule, SalesforceService, SalesforceConnection, SalesforceOrderService, diff --git a/apps/bff/src/modules/catalog/services/catalog-cache.service.ts b/apps/bff/src/modules/catalog/services/catalog-cache.service.ts index 1b110bff..03d35f35 100644 --- a/apps/bff/src/modules/catalog/services/catalog-cache.service.ts +++ b/apps/bff/src/modules/catalog/services/catalog-cache.service.ts @@ -14,6 +14,16 @@ export interface CatalogCacheSnapshot { invalidations: number; } +interface CacheDependencies { + productIds?: string[]; +} + +interface WrappedCatalogValue { + value: T | null; + __catalogCache: true; + dependencies?: CacheDependencies; +} + /** * Catalog-specific caching service * @@ -25,10 +35,10 @@ export class CatalogCacheService { // Hybrid approach: CDC for real-time invalidation + TTL for backup cleanup // Primary: CDC events invalidate cache when data changes (real-time) // Backup: TTL expires unused cache entries (memory management) - private readonly CATALOG_TTL = 86400; // 24 hours - general catalog data - private readonly STATIC_TTL = 604800; // 7 days - rarely changing data - private readonly ELIGIBILITY_TTL = 3600; // 1 hour - user-specific eligibility - private readonly VOLATILE_TTL = 60; // 1 minute - real-time data (availability, inventory) + private readonly CATALOG_TTL: number | null = null; // CDC-driven invalidation + private readonly STATIC_TTL: number | null = null; // CDC-driven invalidation + private readonly ELIGIBILITY_TTL: number | null = null; // CDC-driven invalidation + private readonly VOLATILE_TTL = 60; // Volatile data still uses TTL private readonly metrics: CatalogCacheSnapshot = { catalog: { hits: 0, misses: 0 }, @@ -48,8 +58,12 @@ export class CatalogCacheService { /** * Get or fetch catalog data (long-lived cache, event-driven invalidation) */ - async getCachedCatalog(key: string, fetchFn: () => Promise): Promise { - return this.getOrSet("catalog", key, this.CATALOG_TTL, fetchFn); + async getCachedCatalog( + key: string, + fetchFn: () => Promise, + options?: CatalogCacheOptions + ): Promise { + return this.getOrSet("catalog", key, this.CATALOG_TTL, fetchFn, options); } /** @@ -70,7 +84,9 @@ export class CatalogCacheService { * Get or fetch eligibility data (long-lived cache) */ async getCachedEligibility(key: string, fetchFn: () => Promise): Promise { - return this.getOrSet("eligibility", key, this.ELIGIBILITY_TTL, fetchFn, true); + return this.getOrSet("eligibility", key, this.ELIGIBILITY_TTL, fetchFn, { + allowNull: true, + }); } /** @@ -90,6 +106,7 @@ export class CatalogCacheService { async invalidateCatalog(catalogType: string): Promise { this.metrics.invalidations++; await this.cache.delPattern(`catalog:${catalogType}:*`); + await this.flushProductDependencyIndex(); } /** @@ -107,12 +124,13 @@ export class CatalogCacheService { async invalidateAllCatalogs(): Promise { this.metrics.invalidations++; await this.cache.delPattern("catalog:*"); + await this.flushProductDependencyIndex(); } getTtlConfiguration(): { - catalogSeconds: number; - eligibilitySeconds: number; - staticSeconds: number; + catalogSeconds: number | null; + eligibilitySeconds: number | null; + staticSeconds: number | null; volatileSeconds: number; } { return { @@ -142,7 +160,11 @@ export class CatalogCacheService { typeof eligibility === "string" ? { Id: accountId, Internet_Eligibility__c: eligibility } : null; - await this.cache.set(key, this.wrapCachedValue(payload)); + if (this.ELIGIBILITY_TTL === null) { + await this.cache.set(key, this.wrapCachedValue(payload)); + } else { + await this.cache.set(key, this.wrapCachedValue(payload), this.ELIGIBILITY_TTL); + } } private async getOrSet( @@ -150,8 +172,9 @@ export class CatalogCacheService { key: string, ttlSeconds: number | null, fetchFn: () => Promise, - allowNull = false + options?: CatalogCacheOptions ): Promise { + const allowNull = options?.allowNull ?? false; // 1. Check Redis cache first (fastest path) const cached = await this.cache.get(key); const unwrapped = this.unwrapCachedValue(cached); @@ -179,12 +202,27 @@ export class CatalogCacheService { try { const fresh = await fetchFn(); const valueToStore = allowNull ? (fresh ?? null) : fresh; + const dependencies = options?.resolveDependencies + ? await options.resolveDependencies(fresh) + : undefined; + + if (unwrapped.dependencies) { + await this.unlinkDependenciesForKey(key, unwrapped.dependencies); + } // Store in Redis for future requests if (ttlSeconds === null) { - await this.cache.set(key, this.wrapCachedValue(valueToStore)); + await this.cache.set(key, this.wrapCachedValue(valueToStore, dependencies)); } else { - await this.cache.set(key, this.wrapCachedValue(valueToStore), ttlSeconds); + await this.cache.set( + key, + this.wrapCachedValue(valueToStore, dependencies), + ttlSeconds + ); + } + + if (dependencies) { + await this.linkDependencies(key, dependencies); } return fresh; @@ -200,7 +238,46 @@ export class CatalogCacheService { return fetchPromise; } - private unwrapCachedValue(cached: unknown): { hit: boolean; value: T | null } { + async invalidateProducts(productIds: string[]): Promise { + const uniqueIds = Array.from(new Set((productIds ?? []).filter(Boolean))); + if (uniqueIds.length === 0) { + return false; + } + + const keysToInvalidate = new Set(); + + for (const productId of uniqueIds) { + const indexKey = this.buildProductDependencyKey(productId); + const index = await this.cache.get<{ keys?: string[] }>(indexKey); + const keys = index?.keys ?? []; + keys.forEach(k => keysToInvalidate.add(k)); + if (keys.length === 0) { + continue; + } + } + + if (keysToInvalidate.size === 0) { + return false; + } + + for (const key of keysToInvalidate) { + const cached = await this.cache.get(key); + const unwrapped = this.unwrapCachedValue(cached); + if (unwrapped.dependencies) { + await this.unlinkDependenciesForKey(key, unwrapped.dependencies); + } + await this.cache.del(key); + this.metrics.invalidations++; + } + + return true; + } + + private unwrapCachedValue(cached: unknown): { + hit: boolean; + value: T | null; + dependencies?: CacheDependencies; + } { if (cached === null || cached === undefined) { return { hit: false, value: null }; } @@ -210,14 +287,90 @@ export class CatalogCacheService { cached !== null && Object.prototype.hasOwnProperty.call(cached, "__catalogCache") ) { - const wrapper = cached as { value: T | null }; - return { hit: true, value: wrapper.value ?? null }; + const wrapper = cached as WrappedCatalogValue; + return { + hit: true, + value: wrapper.value ?? null, + dependencies: wrapper.dependencies, + }; } return { hit: true, value: (cached as T) ?? null }; } - private wrapCachedValue(value: T | null): { value: T | null; __catalogCache: true } { - return { value: value ?? null, __catalogCache: true }; + private wrapCachedValue( + value: T | null, + dependencies?: CacheDependencies + ): WrappedCatalogValue { + return { + value: value ?? null, + __catalogCache: true, + dependencies: dependencies && this.normalizeDependencies(dependencies), + }; + } + + private normalizeDependencies(dependencies: CacheDependencies): CacheDependencies | undefined { + const productIds = dependencies.productIds?.filter(Boolean); + if (!productIds || productIds.length === 0) { + return undefined; + } + return { productIds: Array.from(new Set(productIds)) }; + } + + private async linkDependencies(key: string, dependencies: CacheDependencies): Promise { + const normalized = this.normalizeDependencies(dependencies); + if (!normalized) { + return; + } + + if (normalized.productIds) { + for (const productId of normalized.productIds) { + const indexKey = this.buildProductDependencyKey(productId); + const existing = (await this.cache.get<{ keys?: string[] }>(indexKey))?.keys ?? []; + if (!existing.includes(key)) { + existing.push(key); + } + await this.cache.set(indexKey, { keys: existing }); + } + } + } + + private async unlinkDependenciesForKey( + key: string, + dependencies: CacheDependencies + ): Promise { + const normalized = this.normalizeDependencies(dependencies); + if (!normalized) { + return; + } + + if (normalized.productIds) { + for (const productId of normalized.productIds) { + const indexKey = this.buildProductDependencyKey(productId); + const existing = await this.cache.get<{ keys?: string[] }>(indexKey); + if (!existing?.keys?.length) { + continue; + } + const nextKeys = existing.keys.filter(k => k !== key); + if (nextKeys.length === 0) { + await this.cache.del(indexKey); + } else { + await this.cache.set(indexKey, { keys: nextKeys }); + } + } + } + } + + private buildProductDependencyKey(productId: string): string { + return `catalog:deps:product:${productId}`; + } + + private async flushProductDependencyIndex(): Promise { + await this.cache.delPattern("catalog:deps:product:*"); } } + +export interface CatalogCacheOptions { + allowNull?: boolean; + resolveDependencies?: (value: T) => CacheDependencies | Promise | undefined; +} diff --git a/apps/bff/src/modules/catalog/services/internet-catalog.service.ts b/apps/bff/src/modules/catalog/services/internet-catalog.service.ts index de4f4e0a..569234e6 100644 --- a/apps/bff/src/modules/catalog/services/internet-catalog.service.ts +++ b/apps/bff/src/modules/catalog/services/internet-catalog.service.ts @@ -41,87 +41,113 @@ export class InternetCatalogService extends BaseCatalogService { async getPlans(): Promise { const cacheKey = this.catalogCache.buildCatalogKey("internet", "plans"); - return this.catalogCache.getCachedCatalog(cacheKey, async () => { - const soql = this.buildCatalogServiceQuery("Internet", [ - "Internet_Plan_Tier__c", - "Internet_Offering_Type__c", - "Catalog_Order__c", - ]); - const records = await this.executeQuery( - soql, - "Internet Plans" - ); + return this.catalogCache.getCachedCatalog( + cacheKey, + async () => { + const soql = this.buildCatalogServiceQuery("Internet", [ + "Internet_Plan_Tier__c", + "Internet_Offering_Type__c", + "Catalog_Order__c", + ]); + const records = await this.executeQuery( + soql, + "Internet Plans" + ); - return records.map(record => { - const entry = this.extractPricebookEntry(record); - const plan = CatalogProviders.Salesforce.mapInternetPlan(record, entry); - return enrichInternetPlanMetadata(plan); - }); - }); + return records.map(record => { + const entry = this.extractPricebookEntry(record); + const plan = CatalogProviders.Salesforce.mapInternetPlan(record, entry); + return enrichInternetPlanMetadata(plan); + }); + }, + { + resolveDependencies: plans => ({ + productIds: plans.map(plan => plan.id).filter((id): id is string => Boolean(id)), + }), + } + ); } async getInstallations(): Promise { const cacheKey = this.catalogCache.buildCatalogKey("internet", "installations"); - return this.catalogCache.getCachedCatalog(cacheKey, async () => { - const soql = this.buildProductQuery("Internet", "Installation", [ - "Billing_Cycle__c", - "Catalog_Order__c", - ]); - const records = await this.executeQuery( - soql, - "Internet Installations" - ); + return this.catalogCache.getCachedCatalog( + cacheKey, + async () => { + const soql = this.buildProductQuery("Internet", "Installation", [ + "Billing_Cycle__c", + "Catalog_Order__c", + ]); + const records = await this.executeQuery( + soql, + "Internet Installations" + ); - this.logger.log(`Found ${records.length} installation records`); + this.logger.log(`Found ${records.length} installation records`); - return records - .map(record => { - const entry = this.extractPricebookEntry(record); - const installation = CatalogProviders.Salesforce.mapInternetInstallation(record, entry); - return { - ...installation, - catalogMetadata: { - ...installation.catalogMetadata, - installationTerm: inferInstallationTermFromSku(installation.sku ?? ""), - }, - }; - }) - .sort((a, b) => (a.displayOrder ?? 0) - (b.displayOrder ?? 0)); - }); + return records + .map(record => { + const entry = this.extractPricebookEntry(record); + const installation = CatalogProviders.Salesforce.mapInternetInstallation(record, entry); + return { + ...installation, + catalogMetadata: { + ...installation.catalogMetadata, + installationTerm: inferInstallationTermFromSku(installation.sku ?? ""), + }, + }; + }) + .sort((a, b) => (a.displayOrder ?? 0) - (b.displayOrder ?? 0)); + }, + { + resolveDependencies: installations => ({ + productIds: installations + .map(item => item.id) + .filter((id): id is string => Boolean(id)), + }), + } + ); } async getAddons(): Promise { const cacheKey = this.catalogCache.buildCatalogKey("internet", "addons"); - return this.catalogCache.getCachedCatalog(cacheKey, async () => { - const soql = this.buildProductQuery("Internet", "Add-on", [ - "Billing_Cycle__c", - "Catalog_Order__c", - "Bundled_Addon__c", - "Is_Bundled_Addon__c", - ]); - const records = await this.executeQuery( - soql, - "Internet Add-ons" - ); + return this.catalogCache.getCachedCatalog( + cacheKey, + async () => { + const soql = this.buildProductQuery("Internet", "Add-on", [ + "Billing_Cycle__c", + "Catalog_Order__c", + "Bundled_Addon__c", + "Is_Bundled_Addon__c", + ]); + const records = await this.executeQuery( + soql, + "Internet Add-ons" + ); - this.logger.log(`Found ${records.length} addon records`); + this.logger.log(`Found ${records.length} addon records`); - return records - .map(record => { - const entry = this.extractPricebookEntry(record); - const addon = CatalogProviders.Salesforce.mapInternetAddon(record, entry); - return { - ...addon, - catalogMetadata: { - ...addon.catalogMetadata, - addonType: inferAddonTypeFromSku(addon.sku ?? ""), - }, - }; - }) - .sort((a, b) => (a.displayOrder ?? 0) - (b.displayOrder ?? 0)); - }); + return records + .map(record => { + const entry = this.extractPricebookEntry(record); + const addon = CatalogProviders.Salesforce.mapInternetAddon(record, entry); + return { + ...addon, + catalogMetadata: { + ...addon.catalogMetadata, + addonType: inferAddonTypeFromSku(addon.sku ?? ""), + }, + }; + }) + .sort((a, b) => (a.displayOrder ?? 0) - (b.displayOrder ?? 0)); + }, + { + resolveDependencies: addons => ({ + productIds: addons.map(addon => addon.id).filter((id): id is string => Boolean(id)), + }), + } + ); } async getCatalogData() { diff --git a/apps/bff/src/modules/catalog/services/sim-catalog.service.ts b/apps/bff/src/modules/catalog/services/sim-catalog.service.ts index 22f7ff5c..8335d1b0 100644 --- a/apps/bff/src/modules/catalog/services/sim-catalog.service.ts +++ b/apps/bff/src/modules/catalog/services/sim-catalog.service.ts @@ -29,64 +29,21 @@ export class SimCatalogService extends BaseCatalogService { async getPlans(): Promise { const cacheKey = this.catalogCache.buildCatalogKey("sim", "plans"); - return this.catalogCache.getCachedCatalog(cacheKey, async () => { - const soql = this.buildCatalogServiceQuery("SIM", [ - "SIM_Data_Size__c", - "SIM_Plan_Type__c", - "SIM_Has_Family_Discount__c", - "Catalog_Order__c", - ]); - const records = await this.executeQuery( - soql, - "SIM Plans" - ); + return this.catalogCache.getCachedCatalog( + cacheKey, + async () => { + const soql = this.buildCatalogServiceQuery("SIM", [ + "SIM_Data_Size__c", + "SIM_Plan_Type__c", + "SIM_Has_Family_Discount__c", + "Catalog_Order__c", + ]); + const records = await this.executeQuery( + soql, + "SIM Plans" + ); - return records.map(record => { - const entry = this.extractPricebookEntry(record); - const product = CatalogProviders.Salesforce.mapSimProduct(record, entry); - - return { - ...product, - description: product.description ?? product.name, - } satisfies SimCatalogProduct; - }); - }); - } - - async getActivationFees(): Promise { - const cacheKey = this.catalogCache.buildCatalogKey("sim", "activation-fees"); - - return this.catalogCache.getCachedCatalog(cacheKey, async () => { - const soql = this.buildProductQuery("SIM", "Activation", []); - const records = await this.executeQuery( - soql, - "SIM Activation Fees" - ); - - return records.map(record => { - const entry = this.extractPricebookEntry(record); - return CatalogProviders.Salesforce.mapSimActivationFee(record, entry); - }); - }); - } - - async getAddons(): Promise { - const cacheKey = this.catalogCache.buildCatalogKey("sim", "addons"); - - return this.catalogCache.getCachedCatalog(cacheKey, async () => { - const soql = this.buildProductQuery("SIM", "Add-on", [ - "Billing_Cycle__c", - "Catalog_Order__c", - "Bundled_Addon__c", - "Is_Bundled_Addon__c", - ]); - const records = await this.executeQuery( - soql, - "SIM Add-ons" - ); - - return records - .map(record => { + return records.map(record => { const entry = this.extractPricebookEntry(record); const product = CatalogProviders.Salesforce.mapSimProduct(record, entry); @@ -94,9 +51,80 @@ export class SimCatalogService extends BaseCatalogService { ...product, description: product.description ?? product.name, } satisfies SimCatalogProduct; - }) - .sort((a, b) => (a.displayOrder ?? 0) - (b.displayOrder ?? 0)); - }); + }); + }, + { + resolveDependencies: plans => ({ + productIds: plans.map(plan => plan.id).filter((id): id is string => Boolean(id)), + }), + } + ); + } + + async getActivationFees(): Promise { + const cacheKey = this.catalogCache.buildCatalogKey("sim", "activation-fees"); + + return this.catalogCache.getCachedCatalog( + cacheKey, + async () => { + const soql = this.buildProductQuery("SIM", "Activation", []); + const records = await this.executeQuery( + soql, + "SIM Activation Fees" + ); + + return records.map(record => { + const entry = this.extractPricebookEntry(record); + return CatalogProviders.Salesforce.mapSimActivationFee(record, entry); + }); + }, + { + resolveDependencies: products => ({ + productIds: products + .map(product => product.id) + .filter((id): id is string => Boolean(id)), + }), + } + ); + } + + async getAddons(): Promise { + const cacheKey = this.catalogCache.buildCatalogKey("sim", "addons"); + + return this.catalogCache.getCachedCatalog( + cacheKey, + async () => { + const soql = this.buildProductQuery("SIM", "Add-on", [ + "Billing_Cycle__c", + "Catalog_Order__c", + "Bundled_Addon__c", + "Is_Bundled_Addon__c", + ]); + const records = await this.executeQuery( + soql, + "SIM Add-ons" + ); + + return records + .map(record => { + const entry = this.extractPricebookEntry(record); + const product = CatalogProviders.Salesforce.mapSimProduct(record, entry); + + return { + ...product, + description: product.description ?? product.name, + } satisfies SimCatalogProduct; + }) + .sort((a, b) => (a.displayOrder ?? 0) - (b.displayOrder ?? 0)); + }, + { + resolveDependencies: products => ({ + productIds: products + .map(product => product.id) + .filter((id): id is string => Boolean(id)), + }), + } + ); } async getPlansForUser(userId: string): Promise { diff --git a/apps/bff/src/modules/orders/queue/provisioning.processor.ts b/apps/bff/src/modules/orders/queue/provisioning.processor.ts index 151749c6..6f16bc22 100644 --- a/apps/bff/src/modules/orders/queue/provisioning.processor.ts +++ b/apps/bff/src/modules/orders/queue/provisioning.processor.ts @@ -4,10 +4,7 @@ import { Logger } from "nestjs-pino"; import { OrderFulfillmentOrchestrator } from "../services/order-fulfillment-orchestrator.service"; import { SalesforceService } from "@bff/integrations/salesforce/salesforce.service"; import type { ProvisioningJobData } from "./provisioning.queue"; -import { CacheService } from "@bff/infra/cache/cache.service"; -import { ConfigService } from "@nestjs/config"; import { QUEUE_NAMES } from "@bff/infra/queue/queue.constants"; -import { replayKey as sfReplayKey } from "@bff/integrations/salesforce/events/event-keys.util"; @Processor(QUEUE_NAMES.PROVISIONING) @Injectable() @@ -15,8 +12,6 @@ export class ProvisioningProcessor extends WorkerHost { constructor( private readonly orchestrator: OrderFulfillmentOrchestrator, private readonly salesforceService: SalesforceService, - private readonly cache: CacheService, - private readonly config: ConfigService, @Inject(Logger) private readonly logger: Logger ) { super(); @@ -28,7 +23,6 @@ export class ProvisioningProcessor extends WorkerHost { sfOrderId, idempotencyKey, correlationId: job.data.correlationId, - pubsubReplayId: job.data.pubsubReplayId, }); // Guard: Only process if Salesforce Order is currently 'Activating' @@ -41,7 +35,6 @@ export class ProvisioningProcessor extends WorkerHost { sfOrderId, currentStatus: status, }); - await this.commitReplay(job); return; // Ack + no-op to safely handle duplicate/old events } @@ -52,36 +45,11 @@ export class ProvisioningProcessor extends WorkerHost { currentStatus: status, lastErrorCode, }); - await this.commitReplay(job); return; } - try { - // Execute the same orchestration used by the webhook path, but without payload validation - await this.orchestrator.executeFulfillment(sfOrderId, {}, idempotencyKey); - this.logger.log("Provisioning job completed", { sfOrderId }); - } finally { - // Commit processed replay id for Pub/Sub resume (commit regardless of success to avoid replay storms) - await this.commitReplay(job); - } - } - - private async commitReplay(job: { data: ProvisioningJobData }): Promise { - if (typeof job.data.pubsubReplayId !== "number") return; - try { - const channel = this.config.get( - "SF_PROVISION_EVENT_CHANNEL", - "/event/Order_Fulfilment_Requested__e" - ); - const replayKey = sfReplayKey(channel); - const prev = Number((await this.cache.get(replayKey)) ?? 0); - if (job.data.pubsubReplayId > prev) { - await this.cache.set(replayKey, String(job.data.pubsubReplayId)); - } - } catch (e) { - this.logger.warn("Failed to commit Pub/Sub replay id", { - error: e instanceof Error ? e.message : String(e), - }); - } + // Execute the same orchestration used by the webhook path, but without payload validation + await this.orchestrator.executeFulfillment(sfOrderId, {}, idempotencyKey); + this.logger.log("Provisioning job completed", { sfOrderId }); } } diff --git a/apps/bff/src/modules/orders/queue/provisioning.queue.ts b/apps/bff/src/modules/orders/queue/provisioning.queue.ts index 750f6ded..2923664f 100644 --- a/apps/bff/src/modules/orders/queue/provisioning.queue.ts +++ b/apps/bff/src/modules/orders/queue/provisioning.queue.ts @@ -8,7 +8,6 @@ export interface ProvisioningJobData { sfOrderId: string; idempotencyKey: string; correlationId?: string; - pubsubReplayId?: number; } @Injectable() @@ -19,10 +18,7 @@ export class ProvisioningQueueService { ) {} async enqueue(job: ProvisioningJobData): Promise { - const jobId = - typeof job.pubsubReplayId === "number" - ? `sf:${job.sfOrderId}:replay:${job.pubsubReplayId}` - : `sf:${job.sfOrderId}`; + const jobId = job.idempotencyKey || `sf:${job.sfOrderId}`; try { await this.queue.add("provision", job, { jobId, @@ -35,7 +31,6 @@ export class ProvisioningQueueService { sfOrderId: job.sfOrderId, idempotencyKey: job.idempotencyKey, correlationId: job.correlationId, - pubsubReplayId: job.pubsubReplayId, }); } catch (err: unknown) { const msg = err instanceof Error ? err.message : String(err); diff --git a/docs/CDC_ONLY_ORDER_IMPLEMENTATION.md b/docs/CDC_ONLY_ORDER_IMPLEMENTATION.md index 4fb21bd1..b6c7c54d 100644 --- a/docs/CDC_ONLY_ORDER_IMPLEMENTATION.md +++ b/docs/CDC_ONLY_ORDER_IMPLEMENTATION.md @@ -25,11 +25,13 @@ Enqueues provisioning job ``` Salesforce Order Status → "Approved" ↓ -CDC: OrderChangeEvent (automatic) +Salesforce Flow: set Activation_Status__c = "Activating" (clear errors) + ↓ +CDC: OrderChangeEvent (Activation_Status__c) ↓ OrderCdcSubscriber receives event ↓ -Detects: Status changed to "Approved" +Detects: Activation_Status__c changed to "Activating" ↓ Enqueues provisioning job ``` @@ -41,7 +43,7 @@ Enqueues provisioning job ### **OrderCdcSubscriber Now Handles:** 1. **Order Provisioning** (NEW) - - Detects when Status changes to "Approved" or "Reactivate" + - Detects when Salesforce sets `Activation_Status__c` to `"Activating"` - Validates order is not already provisioning/provisioned - Enqueues provisioning job @@ -52,16 +54,13 @@ Enqueues provisioning job ### **Key Guards Added:** ```typescript -// 1. Only trigger for specific statuses -PROVISION_TRIGGER_STATUSES = ["Approved", "Reactivate"] +// 1. Only continue when Activation_Status__c === "Activating" +if (activationStatus !== "Activating") return; -// 2. Don't trigger if already provisioning -if (activationStatus === "Activating") return; +// 2. (Optional) If Status is present, require Approved/Reactivate +if (status && !PROVISION_TRIGGER_STATUSES.has(status)) return; -// 3. Don't trigger if already activated -if (activationStatus === "Activated") return; - -// 4. Don't trigger if already has WHMCS Order ID +// 3. Don't trigger if already has WHMCS Order ID if (whmcsOrderId) return; ``` @@ -74,55 +73,54 @@ if (whmcsOrderId) return; ``` TIME: 10:00:00 - Admin clicks "Approve" in Salesforce ↓ - Order Status: "Pending Review" → "Approved" + Salesforce Flow: + - Sets Activation_Status__c = "Activating" + - Clears Activation_Error_* fields ↓ TIME: 10:00:01 - CDC event published (automatic) { "Id": "801xxx", - "Status": "Approved", - "Activation_Status__c": null, - "WHMCS_Order_ID__c": null, - "changedFields": ["Status"] + "Activation_Status__c": "Activating", + "Activation_Error_Code__c": null, + "Activation_Error_Message__c": null, + "changedFields": ["Activation_Status__c", "Activation_Error_Code__c", "Activation_Error_Message__c"] } ↓ TIME: 10:00:02 - OrderCdcSubscriber.handleOrderEvent() ↓ - Step 1: Check if Status field changed - → Yes, Status in changedFields + Step 1: Check if Activation_Status__c changed + → Yes, value is "Activating" ↓ - Step 2: handleStatusChange() - → newStatus = "Approved" ✅ - → activationStatus = null ✅ (not provisioning) + Step 2: handleActivationStatusChange() + → activationStatus = "Activating" ✅ + → status (if provided) = "Approved" ✅ → whmcsOrderId = null ✅ (not provisioned) ↓ Step 3: Enqueue provisioning job provisioningQueue.enqueue({ sfOrderId: "801xxx", - idempotencyKey: "cdc-status-1699999999999-801xxx", + idempotencyKey: "cdc-activation-1699999999999-801xxx", correlationId: "cdc-order-801xxx" }) ↓ - Log: "Order status changed to provision trigger via CDC" + Log: "Order activation moved to Activating via CDC, enqueuing fulfillment" ↓ TIME: 10:00:03 - Provisioning processor picks up job ↓ - Executes fulfillment + Executes fulfillment (Activation_Status__c already = "Activating" so guard passes) ↓ Updates Salesforce: - - Activation_Status__c: "Activating" - - Then: "Activated" - - WHMCS_Order_ID__c: "12345" - Status: "Completed" + - Activation_Status__c: "Activated" + - WHMCS_Order_ID__c: "12345" ↓ TIME: 10:00:05 - CDC events for status updates - Event 1: Activation_Status__c changed - → OrderCdcSubscriber checks - → Is internal field → Skip cache invalidation ✅ - → Status didn't change → Skip provisioning ✅ - + Event 1: Activation_Status__c changed to "Activated" + → Internal field → Skip cache invalidation ✅ + → Not "Activating" → Skip provisioning ✅ + Event 2: Status → "Completed" - → OrderCdcSubscriber checks - → Status changed but not "Approved" → Skip provisioning ✅ + → Status changed but not "Approved"/"Reactivate" → Skip provisioning ✅ → Customer-facing field → Invalidate cache ✅ ``` diff --git a/env/portal-backend.env.sample b/env/portal-backend.env.sample index c20b9cd9..8d478bc3 100644 --- a/env/portal-backend.env.sample +++ b/env/portal-backend.env.sample @@ -96,7 +96,6 @@ SF_QUEUE_LONG_RUNNING_TIMEOUT_MS=600000 # Salesforce Platform Events (Provisioning) SF_EVENTS_ENABLED=true -SF_PROVISION_EVENT_CHANNEL=/event/Order_Fulfilment_Requested__e SF_CATALOG_EVENT_CHANNEL=/event/Product_and_Pricebook_Change__e SF_ACCOUNT_EVENT_CHANNEL=/event/Account_Internet_Eligibility_Update__e SF_ORDER_EVENT_CHANNEL=/event/Order_Fulfilment_Requested__e