import { Injectable, OnModuleInit, OnModuleDestroy, Inject } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { Logger } from "nestjs-pino";
import PubSubApiClientPkg from "salesforce-pubsub-api-client";
import { SalesforceConnection } from "../services/salesforce-connection.service";
import { ProvisioningQueueService } from "@bff/modules/orders/queue/provisioning.queue";
import { CacheService } from "@bff/infra/cache/cache.service";
import {
  replayKey as sfReplayKey,
  statusKey as sfStatusKey,
  latestSeenKey as sfLatestSeenKey,
} from "./event-keys.util";
import type {
  SalesforcePubSubEvent,
  SalesforcePubSubError,
  SalesforcePubSubSubscription,
  SalesforcePubSubCallbackType,
  SalesforcePubSubUnknownData,
} from "../types/pubsub-events.types";

/** Payload variants the Pub/Sub client may hand to the subscribe callback. */
type SubscribeCallbackData =
  | SalesforcePubSubEvent
  | SalesforcePubSubError
  | SalesforcePubSubUnknownData;

/** Callback invoked by the Pub/Sub client for every stream notification. */
type SubscribeCallback = (
  subscription: SalesforcePubSubSubscription,
  callbackType: SalesforcePubSubCallbackType,
  data: SubscribeCallbackData
) => void | Promise<void>;

/** Minimal structural view of the Pub/Sub API client that this service relies on. */
interface PubSubClient {
  connect(): Promise<void>;
  subscribe(topic: string, cb: SubscribeCallback, numRequested?: number): Promise<void>;
  subscribeFromReplayId(
    topic: string,
    cb: SubscribeCallback,
    numRequested: number | null,
    replayId: number
  ): Promise<void>;
  subscribeFromEarliestEvent(
    topic: string,
    cb: SubscribeCallback,
    numRequested?: number
  ): Promise<void>;
  requestAdditionalEvents(topic: string, numRequested: number): Promise<void>;
  close(): Promise<void>;
}

/** Constructor signature of the Pub/Sub API client. */
type PubSubCtor = new (opts: {
  authType: string;
  accessToken: string;
  instanceUrl: string;
  pubSubEndpoint: string;
}) => PubSubClient;

/**
 * Channel used when `SF_PROVISION_EVENT_CHANNEL` is not configured. Kept in one place so the
 * subscription and the status key written on failure always refer to the same channel.
 */
const DEFAULT_PROVISION_EVENT_CHANNEL = "/event/Order_Fulfilment_Requested__e";

/**
 * Subscribes to a Salesforce Pub/Sub channel and turns incoming events into provisioning
 * queue jobs.
 *
 * The subscription is started in {@link onModuleInit} when `SF_EVENTS_ENABLED` is `"true"`,
 * and the client is closed in {@link onModuleDestroy}. Connection status and the latest
 * replay id seen on keep-alive are written to the cache.
 */
@Injectable()
export class SalesforcePubSubSubscriber implements OnModuleInit, OnModuleDestroy {
  private client: PubSubClient | null = null;
  private clientAccessToken: string | null = null;
  private channel!: string;
  private replayCorruptionRecovered = false;
  private subscribeCallback!: SubscribeCallback;
  private pubSubCtor: PubSubCtor | null = null;
  private clientBuildInFlight: Promise<PubSubClient> | null = null;
  /** Set once the module is being destroyed so late stream errors do not trigger a reconnect. */
  private shuttingDown = false;

  constructor(
    private readonly config: ConfigService,
    private readonly sfConn: SalesforceConnection,
    private readonly provisioningQueue: ProvisioningQueueService,
    private readonly cache: CacheService,
    @Inject(Logger) private readonly logger: Logger
  ) {}

  /**
   * Starts the subscription when the feature flag is enabled.
   *
   * A subscription failure is logged and recorded as a "disconnected" status; it is not
   * rethrown, so it does not abort module initialization.
   */
  async onModuleInit(): Promise<void> {
    const enabledFlag: string = this.config.get("SF_EVENTS_ENABLED", "false");
    const enabled = enabledFlag === "true";
    if (!enabled) {
      this.logger.log("Salesforce Pub/Sub subscriber disabled", { enabled });
      return;
    }

    this.channel = this.config.get("SF_PROVISION_EVENT_CHANNEL", DEFAULT_PROVISION_EVENT_CHANNEL);

    try {
      this.subscribeCallback = this.buildSubscribeCallback();
      await this.subscribeWithPolicy(true);
    } catch (error) {
      this.logger.error("Salesforce Pub/Sub subscription failed", {
        error: error instanceof Error ? error.message : String(error),
      });
      await this.markDisconnectedBestEffort();
    }
  }

  /**
   * Closes the client and records a "disconnected" status.
   *
   * When the subscriber never started (feature disabled), no channel is set and no status
   * is written.
   */
  async onModuleDestroy(): Promise<void> {
    this.shuttingDown = true;
    try {
      await this.safeCloseClient();
      if (!this.channel) {
        return;
      }
      await this.cache.set(sfStatusKey(this.channel), {
        status: "disconnected",
        since: Date.now(),
      });
    } catch (error) {
      this.logger.warn("Error closing Salesforce Pub/Sub client", {
        error: error instanceof Error ? error.message : String(error),
      });
    }
  }

  /**
   * Builds the callback handed to the Pub/Sub client.
   *
   * The callback dispatches on the lower-cased callback type and never throws: any failure
   * inside a handler is logged as "Pub/Sub subscribe callback failed".
   */
  private buildSubscribeCallback(): SubscribeCallback {
    return async (subscription, callbackType, data) => {
      try {
        const argTypes = [typeof subscription, typeof callbackType, typeof data];
        const typeNorm = String(callbackType || "").toLowerCase();
        const topic = subscription.topicName || this.channel;

        switch (typeNorm) {
          case "data":
          case "event":
            await this.handleDataEvent(topic, argTypes, data);
            break;
          case "lastevent":
            await this.handleLastEvent(topic);
            break;
          case "grpckeepalive":
            await this.handleKeepAlive(data);
            break;
          case "grpcstatus":
          case "end":
            // Informational – no action required
            break;
          case "error":
            await this.handleStreamError(topic, data);
            break;
          default: {
            const maybeEvent = data as SalesforcePubSubEvent | undefined;
            const hasPayload = Boolean(maybeEvent?.payload);
            this.logger.debug("SF Pub/Sub callback ignored (unknown type)", {
              type: callbackType,
              topic,
              argTypes,
              hasPayload,
            });
          }
        }
      } catch (err) {
        this.logger.error("Pub/Sub subscribe callback failed", {
          error: err instanceof Error ? err.message : String(err),
        });
      }
    };
  }

  /**
   * Enqueues a provisioning job for an event that carries an order id.
   *
   * Events without a string `OrderId__c` / `OrderId` are skipped; in that case one more
   * event is requested if the queue is below its maximum size.
   */
  private async handleDataEvent(
    topic: string,
    argTypes: string[],
    data: SubscribeCallbackData
  ): Promise<void> {
    const event = data as SalesforcePubSubEvent;
    this.logger.debug("SF Pub/Sub data callback received", {
      topic,
      argTypes,
      hasPayload: Boolean(event?.payload),
    });

    const payload = event?.payload;
    const orderIdVal = payload?.["OrderId__c"] ?? payload?.["OrderId"];
    const orderId = typeof orderIdVal === "string" ? orderIdVal : undefined;

    if (!orderId) {
      this.logger.warn("Pub/Sub event missing OrderId__c; skipping", {
        argTypes,
        topic,
        payloadKeys: payload ? Object.keys(payload) : [],
      });
      const depth = await this.provisioningQueue.depth();
      if (depth < this.getMaxQueueSize()) {
        await this.requestMoreEvents(topic, 1);
      }
      return;
    }

    const replayVal = (event as { replayId?: unknown })?.replayId;
    // The replay id doubles as idempotency key; the timestamp is only a fallback when the
    // event carries no usable replay id.
    const idempotencyKey =
      typeof replayVal === "number" || typeof replayVal === "string"
        ? String(replayVal)
        : String(Date.now());
    const pubsubReplayId = typeof replayVal === "number" ? replayVal : undefined;

    await this.provisioningQueue.enqueue({
      sfOrderId: orderId,
      idempotencyKey,
      pubsubReplayId,
    });
    this.logger.log("Enqueued provisioning job from SF event", {
      sfOrderId: orderId,
      replayId: pubsubReplayId,
      topic,
    });
  }

  /**
   * Requests the next batch once the current one is exhausted, capped by both the configured
   * batch size and the remaining queue capacity.
   */
  private async handleLastEvent(topic: string): Promise<void> {
    const depth = await this.provisioningQueue.depth();
    const available = Math.max(0, this.getMaxQueueSize() - depth);
    const desired = Math.max(0, Math.min(this.getNumRequested(), available));
    if (desired > 0) {
      await this.requestMoreEvents(topic, desired);
    }
  }

  /** Records the latest replay id reported by a keep-alive, when it is numeric. */
  private async handleKeepAlive(data: SubscribeCallbackData): Promise<void> {
    const latestVal = (data as { latestReplayId?: unknown })?.latestReplayId;
    if (typeof latestVal === "number") {
      await this.cache.set(sfLatestSeenKey(this.channel), {
        id: String(latestVal),
        at: Date.now(),
      });
    }
  }

  /**
   * Handles a stream error: clears a corrupted replay cursor (at most once per client) and
   * then re-subscribes. Does nothing beyond logging once the module is shutting down.
   */
  private async handleStreamError(topic: string, data: SubscribeCallbackData): Promise<void> {
    this.logger.warn("SF Pub/Sub stream error", { topic, data });
    if (this.shuttingDown) {
      // Reconnecting now would open a client that nobody closes.
      return;
    }

    try {
      await this.clearReplayCursorIfCorrupted(data as SalesforcePubSubError);
    } catch (recoveryErr) {
      this.logger.warn("SF Pub/Sub replay corruption auto-recovery failed", {
        error: recoveryErr instanceof Error ? recoveryErr.message : String(recoveryErr),
      });
    }
    // Reconnect regardless of whether the cursor cleanup succeeded.
    await this.recoverFromStreamError();
  }

  /** Deletes the cached replay cursor when the error indicates the replay id was rejected. */
  private async clearReplayCursorIfCorrupted(errorData: SalesforcePubSubError): Promise<void> {
    const details = errorData.details || "";
    const metadata = errorData.metadata || {};
    const errorCodes = Array.isArray(metadata["error-code"]) ? metadata["error-code"] : [];
    const hasCorruptionCode = errorCodes.some(code =>
      String(code).includes("replayid.corrupted")
    );
    const mentionsReplayValidation = /Replay ID validation failed/i.test(details);

    if ((hasCorruptionCode || mentionsReplayValidation) && !this.replayCorruptionRecovered) {
      this.replayCorruptionRecovered = true;
      const key = sfReplayKey(this.channel);
      await this.cache.del(key);
      this.logger.warn("Cleared invalid Salesforce Pub/Sub replay cursor; retrying subscription", {
        channel: this.channel,
        key,
      });
    }
  }

  /** Asks the current client, if any, for `count` more events on `topic`. */
  private async requestMoreEvents(topic: string, count: number): Promise<void> {
    const activeClient = this.client;
    if (activeClient) {
      await activeClient.requestAdditionalEvents(topic, count);
    }
  }

  /** Batch size from `SF_PUBSUB_NUM_REQUESTED`; falls back to 50 when unset, zero or not numeric. */
  private getNumRequested(): number {
    return Number(this.config.get("SF_PUBSUB_NUM_REQUESTED", "50")) || 50;
  }

  /** Queue cap from `SF_PUBSUB_QUEUE_MAX`; falls back to 100 when unset, zero or not numeric. */
  private getMaxQueueSize(): number {
    return Number(this.config.get("SF_PUBSUB_QUEUE_MAX", "100")) || 100;
  }

  /** Resolves (and memoizes) the client constructor from the package's default or namespace export. */
  private getPubSubCtor(): PubSubCtor {
    if (this.pubSubCtor) {
      return this.pubSubCtor;
    }
    const maybeCtor =
      (PubSubApiClientPkg as { default?: unknown })?.default ?? (PubSubApiClientPkg as unknown);
    this.pubSubCtor = maybeCtor as PubSubCtor;
    return this.pubSubCtor;
  }

  /**
   * Returns a connected client, building one when none exists, when the access token
   * changed, or when `forceRefresh` is set.
   *
   * Callers that do not force a refresh share a build that is already in flight.
   */
  private async ensureClient(forceRefresh = false): Promise<PubSubClient> {
    if (this.clientBuildInFlight && !forceRefresh) {
      return this.clientBuildInFlight;
    }

    const build = this.buildClient(forceRefresh);
    this.clientBuildInFlight = build;
    try {
      return await build;
    } finally {
      // A forced refresh may have replaced the handle while this build was running;
      // only clear it when it still refers to this build.
      if (this.clientBuildInFlight === build) {
        this.clientBuildInFlight = null;
      }
    }
  }

  /**
   * Connects to Salesforce and creates a new Pub/Sub client unless the current one can be reused.
   *
   * @throws Error when the Salesforce connection exposes no access token or instance URL.
   */
  private async buildClient(forceRefresh: boolean): Promise<PubSubClient> {
    await this.sfConn.ensureConnected();
    const accessToken = this.sfConn.getAccessToken();
    const instanceUrl = this.sfConn.getInstanceUrl();
    if (!accessToken || !instanceUrl) {
      throw new Error("Salesforce access token or instance URL unavailable");
    }

    const existing = this.client;
    const tokenChanged = this.clientAccessToken !== accessToken;
    if (existing && !forceRefresh && !tokenChanged) {
      return existing;
    }

    await this.safeCloseClient();
    const endpoint: string = this.config.get(
      "SF_PUBSUB_ENDPOINT",
      "api.pubsub.salesforce.com:7443"
    );
    const Ctor = this.getPubSubCtor();
    const client = new Ctor({
      authType: "user-supplied",
      accessToken,
      instanceUrl,
      pubSubEndpoint: endpoint,
    });
    await client.connect();

    this.client = client;
    this.clientAccessToken = accessToken;
    // A fresh client gets a fresh chance at replay-cursor recovery.
    this.replayCorruptionRecovered = false;
    return client;
  }

  /**
   * Subscribes according to `SF_EVENTS_REPLAY`: `"ALL"` starts from the earliest event;
   * otherwise a valid cached replay id is resumed from, and without one a plain
   * subscription is made.
   *
   * @throws Error when the subscribe callback has not been built yet.
   */
  private async subscribeWithPolicy(forceClientRefresh = false): Promise<void> {
    if (!this.subscribeCallback) {
      throw new Error("Subscribe callback not initialized");
    }

    await this.cache.set(sfStatusKey(this.channel), {
      status: "connecting",
      since: Date.now(),
    });

    const client = await this.ensureClient(forceClientRefresh);
    const replayMode: string = this.config.get("SF_EVENTS_REPLAY", "LATEST");
    const numRequested = this.getNumRequested();

    if (replayMode === "ALL") {
      await client.subscribeFromEarliestEvent(this.channel, this.subscribeCallback, numRequested);
    } else {
      const replayKey = sfReplayKey(this.channel);
      const storedReplay: unknown = await this.cache.get(replayKey);
      const replayId = this.parseStoredReplayId(storedReplay, replayKey);
      if (replayId !== undefined) {
        await client.subscribeFromReplayId(
          this.channel,
          this.subscribeCallback,
          numRequested,
          replayId
        );
      } else {
        await client.subscribe(this.channel, this.subscribeCallback, numRequested);
      }
    }

    await this.cache.set(sfStatusKey(this.channel), {
      status: "connected",
      since: Date.now(),
    });
    this.logger.log("Salesforce Pub/Sub subscription active", { channel: this.channel });
  }

  /**
   * Converts the cached replay cursor to a number.
   *
   * @returns the replay id, or `undefined` when nothing is stored or the stored value is not
   * a finite number (which is logged and treated like a missing cursor).
   */
  private parseStoredReplayId(storedReplay: unknown, key: string): number | undefined {
    if (!storedReplay) {
      return undefined;
    }
    const replayId = Number(storedReplay);
    if (!Number.isFinite(replayId)) {
      this.logger.warn("Ignoring non-numeric Salesforce Pub/Sub replay cursor", {
        channel: this.channel,
        key,
      });
      return undefined;
    }
    return replayId;
  }

  /**
   * Drops the current client and subscribes again with a fresh one.
   *
   * If re-subscribing fails, the status is moved from "reconnecting" to "disconnected"
   * (best effort) and the original error is rethrown.
   */
  private async recoverFromStreamError(): Promise<void> {
    await this.cache.set(sfStatusKey(this.channel), {
      status: "reconnecting",
      since: Date.now(),
    });
    await this.safeCloseClient();
    try {
      await this.subscribeWithPolicy(true);
    } catch (error) {
      await this.markDisconnectedBestEffort();
      throw error;
    }
  }

  /** Writes a "disconnected" status; a cache failure is logged and swallowed. */
  private async markDisconnectedBestEffort(): Promise<void> {
    try {
      await this.cache.set(sfStatusKey(this.channel || DEFAULT_PROVISION_EVENT_CHANNEL), {
        status: "disconnected",
        since: Date.now(),
      });
    } catch (cacheErr) {
      this.logger.warn("Failed to set SF Pub/Sub disconnected status", {
        error: cacheErr instanceof Error ? cacheErr.message : String(cacheErr),
      });
    }
  }

  /** Closes the current client, if any; close errors are logged and the client is always forgotten. */
  private async safeCloseClient(): Promise<void> {
    if (!this.client) {
      return;
    }
    try {
      await this.client.close();
    } catch (error) {
      this.logger.warn("Failed to close Salesforce Pub/Sub client", {
        error: error instanceof Error ? error.message : String(error),
      });
    } finally {
      this.client = null;
      this.clientAccessToken = null;
    }
  }
}