2025-09-06 10:01:44 +09:00
|
|
|
import { Injectable, OnModuleInit, OnModuleDestroy, Inject } from "@nestjs/common";
|
|
|
|
|
import { ConfigService } from "@nestjs/config";
|
|
|
|
|
import { Logger } from "nestjs-pino";
|
|
|
|
|
import PubSubApiClientPkg from "salesforce-pubsub-api-client";
|
|
|
|
|
import { SalesforceConnection } from "../services/salesforce-connection.service";
|
2025-09-18 14:52:26 +09:00
|
|
|
import { ProvisioningQueueService } from "@bff/modules/orders/queue/provisioning.queue";
|
2025-09-17 18:43:43 +09:00
|
|
|
import { CacheService } from "@bff/infra/cache/cache.service";
|
2025-09-06 10:01:44 +09:00
|
|
|
import {
|
|
|
|
|
replayKey as sfReplayKey,
|
|
|
|
|
statusKey as sfStatusKey,
|
|
|
|
|
latestSeenKey as sfLatestSeenKey,
|
|
|
|
|
} from "./event-keys.util";
|
2025-09-25 16:23:24 +09:00
|
|
|
import type {
|
|
|
|
|
SalesforcePubSubEvent,
|
|
|
|
|
SalesforcePubSubError,
|
|
|
|
|
SalesforcePubSubSubscription,
|
|
|
|
|
SalesforcePubSubCallbackType,
|
|
|
|
|
} from "../types/pubsub-events.types";
|
2025-09-06 10:01:44 +09:00
|
|
|
|
|
|
|
|
type SubscribeCallback = (
|
2025-09-25 16:23:24 +09:00
|
|
|
subscription: SalesforcePubSubSubscription,
|
|
|
|
|
callbackType: SalesforcePubSubCallbackType,
|
|
|
|
|
data: SalesforcePubSubEvent | SalesforcePubSubError | unknown
|
2025-09-06 10:01:44 +09:00
|
|
|
) => void | Promise<void>;
|
|
|
|
|
|
|
|
|
|
interface PubSubClient {
|
|
|
|
|
connect(): Promise<void>;
|
2025-09-06 13:58:54 +09:00
|
|
|
subscribe(topic: string, cb: SubscribeCallback, numRequested?: number): Promise<void>;
|
2025-09-06 10:01:44 +09:00
|
|
|
subscribeFromReplayId(
|
|
|
|
|
topic: string,
|
|
|
|
|
cb: SubscribeCallback,
|
2025-09-06 13:58:54 +09:00
|
|
|
numRequested: number | null,
|
2025-09-06 10:01:44 +09:00
|
|
|
replayId: number
|
|
|
|
|
): Promise<void>;
|
|
|
|
|
subscribeFromEarliestEvent(
|
|
|
|
|
topic: string,
|
|
|
|
|
cb: SubscribeCallback,
|
2025-09-06 13:58:54 +09:00
|
|
|
numRequested?: number
|
2025-09-06 10:01:44 +09:00
|
|
|
): Promise<void>;
|
|
|
|
|
requestAdditionalEvents(topic: string, numRequested: number): Promise<void>;
|
|
|
|
|
close(): Promise<void>;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type PubSubCtor = new (opts: {
|
|
|
|
|
authType: string;
|
|
|
|
|
accessToken: string;
|
|
|
|
|
instanceUrl: string;
|
|
|
|
|
pubSubEndpoint: string;
|
|
|
|
|
}) => PubSubClient;
|
|
|
|
|
|
|
|
|
|
@Injectable()
|
|
|
|
|
export class SalesforcePubSubSubscriber implements OnModuleInit, OnModuleDestroy {
|
|
|
|
|
private client: PubSubClient | null = null;
|
|
|
|
|
private channel!: string;
|
2025-09-11 13:17:10 +09:00
|
|
|
private replayCorruptionRecovered = false;
|
|
|
|
|
private subscribeCallback!: SubscribeCallback;
|
2025-09-06 10:01:44 +09:00
|
|
|
|
|
|
|
|
constructor(
|
|
|
|
|
private readonly config: ConfigService,
|
|
|
|
|
private readonly sfConn: SalesforceConnection,
|
|
|
|
|
private readonly provisioningQueue: ProvisioningQueueService,
|
|
|
|
|
private readonly cache: CacheService,
|
|
|
|
|
@Inject(Logger) private readonly logger: Logger
|
|
|
|
|
) {}
|
|
|
|
|
|
|
|
|
|
async onModuleInit(): Promise<void> {
|
|
|
|
|
const enabled = this.config.get("SF_EVENTS_ENABLED", "false") === "true";
|
|
|
|
|
if (!enabled) {
|
|
|
|
|
this.logger.log("Salesforce Pub/Sub subscriber disabled", { enabled });
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
this.channel = this.config.get<string>(
|
|
|
|
|
"SF_PROVISION_EVENT_CHANNEL",
|
|
|
|
|
"/event/Order_Fulfilment_Requested__e"
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
await this.sfConn.connect();
|
|
|
|
|
const accessToken = this.sfConn.getAccessToken();
|
|
|
|
|
const instanceUrl = this.sfConn.getInstanceUrl();
|
|
|
|
|
if (!accessToken || !instanceUrl) {
|
|
|
|
|
throw new Error("Salesforce access token || instance URL unavailable");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const endpoint = this.config.get<string>(
|
|
|
|
|
"SF_PUBSUB_ENDPOINT",
|
|
|
|
|
"api.pubsub.salesforce.com:7443"
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
const maybeCtor: unknown =
|
|
|
|
|
(PubSubApiClientPkg as { default?: unknown })?.default ?? (PubSubApiClientPkg as unknown);
|
|
|
|
|
const Ctor = maybeCtor as PubSubCtor;
|
|
|
|
|
this.client = new Ctor({
|
|
|
|
|
authType: "user-supplied",
|
|
|
|
|
accessToken,
|
|
|
|
|
instanceUrl,
|
|
|
|
|
pubSubEndpoint: endpoint,
|
|
|
|
|
});
|
|
|
|
|
await this.client.connect();
|
|
|
|
|
if (!this.client) throw new Error("Pub/Sub client not initialized after connect");
|
|
|
|
|
const client = this.client;
|
|
|
|
|
|
2025-09-25 15:11:28 +09:00
|
|
|
const _replayKey = sfReplayKey(this.channel);
|
|
|
|
|
const _replayMode = this.config.get<string>("SF_EVENTS_REPLAY", "LATEST");
|
2025-09-06 10:01:44 +09:00
|
|
|
const numRequested = Number(this.config.get("SF_PUBSUB_NUM_REQUESTED", "50")) || 50;
|
|
|
|
|
const maxQueue = Number(this.config.get("SF_PUBSUB_QUEUE_MAX", "100")) || 100;
|
|
|
|
|
|
|
|
|
|
await this.cache.set(sfStatusKey(this.channel), {
|
|
|
|
|
status: "connecting",
|
|
|
|
|
since: Date.now(),
|
|
|
|
|
});
|
|
|
|
|
|
2025-09-11 13:17:10 +09:00
|
|
|
this.subscribeCallback = async (subscription, callbackType, data) => {
|
2025-09-06 10:01:44 +09:00
|
|
|
try {
|
|
|
|
|
const argTypes = [typeof subscription, typeof callbackType, typeof data];
|
2025-09-06 13:58:54 +09:00
|
|
|
const type = callbackType;
|
|
|
|
|
const typeNorm = String(type || "").toLowerCase();
|
2025-09-25 16:23:24 +09:00
|
|
|
const topic = subscription.topicName || this.channel;
|
2025-09-06 13:58:54 +09:00
|
|
|
|
|
|
|
|
if (typeNorm === "data" || typeNorm === "event") {
|
2025-09-25 16:23:24 +09:00
|
|
|
const event = data as SalesforcePubSubEvent;
|
2025-09-06 13:58:54 +09:00
|
|
|
// Basic breadcrumb to confirm we are handling data callbacks
|
|
|
|
|
this.logger.debug("SF Pub/Sub data callback received", {
|
|
|
|
|
topic,
|
|
|
|
|
argTypes,
|
2025-09-25 16:23:24 +09:00
|
|
|
hasPayload: Boolean(event?.payload),
|
2025-09-06 13:58:54 +09:00
|
|
|
});
|
2025-09-25 16:23:24 +09:00
|
|
|
const payload = event?.payload;
|
2025-09-06 10:01:44 +09:00
|
|
|
|
|
|
|
|
// Only check parsed payload
|
|
|
|
|
const orderIdVal = payload?.["OrderId__c"] ?? payload?.["OrderId"];
|
|
|
|
|
const orderId = typeof orderIdVal === "string" ? orderIdVal : undefined;
|
|
|
|
|
if (!orderId) {
|
2025-09-09 18:19:54 +09:00
|
|
|
this.logger.warn("Pub/Sub event missing OrderId__c; skipping", {
|
|
|
|
|
argTypes,
|
|
|
|
|
topic,
|
|
|
|
|
payloadKeys: payload ? Object.keys(payload) : [],
|
|
|
|
|
});
|
2025-09-06 10:01:44 +09:00
|
|
|
const depth = await this.provisioningQueue.depth();
|
|
|
|
|
if (depth < maxQueue) {
|
|
|
|
|
await client.requestAdditionalEvents(topic, 1);
|
|
|
|
|
}
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const replayVal = (event as { replayId?: unknown })?.replayId;
|
|
|
|
|
const idempotencyKey =
|
|
|
|
|
typeof replayVal === "number" || typeof replayVal === "string"
|
|
|
|
|
? String(replayVal)
|
|
|
|
|
: String(Date.now());
|
|
|
|
|
const pubsubReplayId = typeof replayVal === "number" ? replayVal : undefined;
|
|
|
|
|
|
|
|
|
|
await this.provisioningQueue.enqueue({
|
|
|
|
|
sfOrderId: orderId,
|
|
|
|
|
idempotencyKey,
|
|
|
|
|
pubsubReplayId,
|
|
|
|
|
});
|
|
|
|
|
this.logger.log("Enqueued provisioning job from SF event", {
|
|
|
|
|
sfOrderId: orderId,
|
|
|
|
|
replayId: pubsubReplayId,
|
|
|
|
|
topic,
|
|
|
|
|
});
|
|
|
|
|
|
2025-09-06 13:58:54 +09:00
|
|
|
// Do not request more here; rely on 'lastevent' to top-up
|
|
|
|
|
} else if (typeNorm === "lastevent") {
|
2025-09-06 10:01:44 +09:00
|
|
|
const depth = await this.provisioningQueue.depth();
|
|
|
|
|
const available = Math.max(0, maxQueue - depth);
|
|
|
|
|
const desired = Math.max(0, Math.min(numRequested, available));
|
|
|
|
|
if (desired > 0) {
|
|
|
|
|
await client.requestAdditionalEvents(topic, desired);
|
|
|
|
|
}
|
2025-09-06 13:58:54 +09:00
|
|
|
} else if (typeNorm === "grpckeepalive") {
|
|
|
|
|
const latestVal = (data as { latestReplayId?: unknown })?.latestReplayId;
|
2025-09-06 10:01:44 +09:00
|
|
|
const latest = typeof latestVal === "number" ? latestVal : undefined;
|
|
|
|
|
if (typeof latest === "number") {
|
|
|
|
|
await this.cache.set(sfLatestSeenKey(this.channel), {
|
|
|
|
|
id: String(latest),
|
|
|
|
|
at: Date.now(),
|
|
|
|
|
});
|
|
|
|
|
}
|
2025-09-06 13:58:54 +09:00
|
|
|
} else if (typeNorm === "grpcstatus" || typeNorm === "end") {
|
|
|
|
|
// No-op; informational
|
|
|
|
|
} else if (typeNorm === "error") {
|
|
|
|
|
this.logger.warn("SF Pub/Sub stream error", { topic, data });
|
2025-09-11 13:17:10 +09:00
|
|
|
try {
|
|
|
|
|
// Detect replay id corruption and auto-recover once by clearing the cursor and resubscribing
|
2025-09-25 16:23:24 +09:00
|
|
|
const errorData = data as SalesforcePubSubError;
|
|
|
|
|
const details = errorData.details || "";
|
|
|
|
|
const metadata = errorData.metadata || {};
|
2025-09-25 17:42:36 +09:00
|
|
|
const errorCodes = Array.isArray(metadata["error-code"])
|
|
|
|
|
? metadata["error-code"]
|
|
|
|
|
: [];
|
2025-09-25 16:23:24 +09:00
|
|
|
const hasCorruptionCode = errorCodes.some(code =>
|
|
|
|
|
String(code).includes("replayid.corrupted")
|
2025-09-17 18:43:43 +09:00
|
|
|
);
|
2025-09-11 13:17:10 +09:00
|
|
|
const mentionsReplayValidation = /Replay ID validation failed/i.test(details);
|
|
|
|
|
|
2025-09-17 18:43:43 +09:00
|
|
|
if (
|
|
|
|
|
(hasCorruptionCode || mentionsReplayValidation) &&
|
|
|
|
|
!this.replayCorruptionRecovered
|
|
|
|
|
) {
|
2025-09-11 13:17:10 +09:00
|
|
|
this.replayCorruptionRecovered = true;
|
|
|
|
|
const key = sfReplayKey(this.channel);
|
|
|
|
|
await this.cache.del(key);
|
2025-09-17 18:43:43 +09:00
|
|
|
this.logger.warn(
|
|
|
|
|
"Cleared invalid Salesforce Pub/Sub replay cursor; retrying subscription",
|
|
|
|
|
{
|
|
|
|
|
channel: this.channel,
|
|
|
|
|
key,
|
|
|
|
|
}
|
|
|
|
|
);
|
2025-09-11 13:17:10 +09:00
|
|
|
await this.cache.set(sfStatusKey(this.channel), {
|
|
|
|
|
status: "reconnecting",
|
|
|
|
|
since: Date.now(),
|
|
|
|
|
});
|
|
|
|
|
// Try re-subscribing without the invalid cursor
|
|
|
|
|
await this.subscribeWithPolicy();
|
|
|
|
|
}
|
|
|
|
|
} catch (recoveryErr) {
|
|
|
|
|
this.logger.warn("SF Pub/Sub replay corruption auto-recovery failed", {
|
|
|
|
|
error: recoveryErr instanceof Error ? recoveryErr.message : String(recoveryErr),
|
|
|
|
|
});
|
|
|
|
|
}
|
2025-09-06 13:58:54 +09:00
|
|
|
} else {
|
|
|
|
|
// Unknown callback type: log once with minimal context
|
2025-09-25 16:23:24 +09:00
|
|
|
const maybeEvent = data as SalesforcePubSubEvent | undefined;
|
|
|
|
|
const hasPayload = Boolean(maybeEvent?.payload);
|
2025-09-06 13:58:54 +09:00
|
|
|
this.logger.debug("SF Pub/Sub callback ignored (unknown type)", {
|
|
|
|
|
type,
|
|
|
|
|
topic,
|
|
|
|
|
argTypes,
|
|
|
|
|
hasPayload,
|
|
|
|
|
});
|
2025-09-06 10:01:44 +09:00
|
|
|
}
|
|
|
|
|
} catch (err) {
|
|
|
|
|
this.logger.error("Pub/Sub subscribe callback failed", {
|
|
|
|
|
error: err instanceof Error ? err.message : String(err),
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
2025-09-11 13:17:10 +09:00
|
|
|
await this.subscribeWithPolicy();
|
2025-09-06 10:01:44 +09:00
|
|
|
} catch (error) {
|
|
|
|
|
this.logger.error("Salesforce Pub/Sub subscription failed", {
|
|
|
|
|
error: error instanceof Error ? error.message : String(error),
|
|
|
|
|
});
|
|
|
|
|
try {
|
|
|
|
|
await this.cache.set(sfStatusKey(this.channel || "/event/OrderProvisionRequested__e"), {
|
|
|
|
|
status: "disconnected",
|
|
|
|
|
since: Date.now(),
|
|
|
|
|
});
|
|
|
|
|
} catch (cacheErr) {
|
|
|
|
|
this.logger.warn("Failed to set SF Pub/Sub disconnected status", {
|
|
|
|
|
error: cacheErr instanceof Error ? cacheErr.message : String(cacheErr),
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async onModuleDestroy(): Promise<void> {
|
|
|
|
|
try {
|
|
|
|
|
if (this.client) {
|
|
|
|
|
await this.client.close();
|
|
|
|
|
this.client = null;
|
|
|
|
|
}
|
|
|
|
|
await this.cache.set(sfStatusKey(this.channel), {
|
|
|
|
|
status: "disconnected",
|
|
|
|
|
since: Date.now(),
|
|
|
|
|
});
|
|
|
|
|
} catch (error) {
|
|
|
|
|
this.logger.warn("Error closing Salesforce Pub/Sub client", {
|
|
|
|
|
error: error instanceof Error ? error.message : String(error),
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2025-09-11 13:17:10 +09:00
|
|
|
private async subscribeWithPolicy(): Promise<void> {
|
|
|
|
|
if (!this.client) throw new Error("Pub/Sub client not initialized");
|
|
|
|
|
if (!this.subscribeCallback) throw new Error("Subscribe callback not initialized");
|
|
|
|
|
|
2025-09-25 15:11:28 +09:00
|
|
|
const _replayMode = this.config.get<string>("SF_EVENTS_REPLAY", "LATEST");
|
2025-09-11 13:17:10 +09:00
|
|
|
const numRequested = Number(this.config.get("SF_PUBSUB_NUM_REQUESTED", "50")) || 50;
|
2025-09-25 15:11:28 +09:00
|
|
|
const _replayKey = sfReplayKey(this.channel);
|
|
|
|
|
const storedReplay = _replayMode !== "ALL" ? await this.cache.get<string>(_replayKey) : null;
|
2025-09-11 13:17:10 +09:00
|
|
|
|
2025-09-25 15:11:28 +09:00
|
|
|
if (storedReplay && _replayMode !== "ALL") {
|
2025-09-11 13:17:10 +09:00
|
|
|
await this.client.subscribeFromReplayId(
|
|
|
|
|
this.channel,
|
|
|
|
|
this.subscribeCallback,
|
|
|
|
|
numRequested,
|
|
|
|
|
Number(storedReplay)
|
|
|
|
|
);
|
2025-09-25 15:11:28 +09:00
|
|
|
} else if (_replayMode === "ALL") {
|
2025-09-11 13:17:10 +09:00
|
|
|
await this.client.subscribeFromEarliestEvent(
|
|
|
|
|
this.channel,
|
|
|
|
|
this.subscribeCallback,
|
|
|
|
|
numRequested
|
|
|
|
|
);
|
|
|
|
|
} else {
|
|
|
|
|
await this.client.subscribe(this.channel, this.subscribeCallback, numRequested);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
await this.cache.set(sfStatusKey(this.channel), {
|
|
|
|
|
status: "connected",
|
|
|
|
|
since: Date.now(),
|
|
|
|
|
});
|
|
|
|
|
this.logger.log("Salesforce Pub/Sub subscription active", { channel: this.channel });
|
|
|
|
|
}
|
|
|
|
|
|
2025-09-06 10:01:44 +09:00
|
|
|
// keys moved to shared util
|
|
|
|
|
}
|