// 370 lines · 12 KiB · TypeScript
import { Injectable, OnModuleInit, OnModuleDestroy, Inject } from "@nestjs/common";
|
||
import { ConfigService } from "@nestjs/config";
|
||
import { Logger } from "nestjs-pino";
|
||
import PubSubApiClientPkg from "salesforce-pubsub-api-client";
|
||
import { SalesforceConnection } from "../services/salesforce-connection.service";
|
||
import { ProvisioningQueueService } from "@bff/modules/orders/queue/provisioning.queue";
|
||
import { CacheService } from "@bff/infra/cache/cache.service";
|
||
import {
|
||
replayKey as sfReplayKey,
|
||
statusKey as sfStatusKey,
|
||
latestSeenKey as sfLatestSeenKey,
|
||
} from "./event-keys.util";
|
||
import type {
|
||
SalesforcePubSubEvent,
|
||
SalesforcePubSubError,
|
||
SalesforcePubSubSubscription,
|
||
SalesforcePubSubCallbackType,
|
||
SalesforcePubSubUnknownData,
|
||
} from "../types/pubsub-events.types";
|
||
|
||
/**
 * Payload handed to a {@link SubscribeCallback}: a decoded event, a stream
 * error, or any other callback data the subscriber does not model explicitly.
 */
type SubscribeCallbackData =
  | SalesforcePubSubEvent
  | SalesforcePubSubError
  | SalesforcePubSubUnknownData;

/**
 * Signature of the handler registered with the Pub/Sub client.
 *
 * @param subscription - Subscription the callback belongs to.
 * @param callbackType - Discriminator telling the handler how to interpret `data`.
 * @param data - Callback payload; its shape depends on `callbackType`.
 * @returns Nothing, either synchronously or through a promise.
 */
type SubscribeCallback = (
  subscription: SalesforcePubSubSubscription,
  callbackType: SalesforcePubSubCallbackType,
  data: SubscribeCallbackData
) => void | Promise<void>;
|
||
|
||
/**
 * Minimal structural view of the `salesforce-pubsub-api-client` client: only
 * the members this file calls are declared.
 */
interface PubSubClient {
  // --- Connection lifecycle ------------------------------------------------

  /** Opens the connection; must resolve before any subscribe call is made. */
  connect(): Promise<void>;

  /** Closes the connection. */
  close(): Promise<void>;

  // --- Subscription entry points -------------------------------------------

  /**
   * Subscribes to `topic` without a replay position.
   *
   * @param topic - Channel name, e.g. `/event/My_Event__e`.
   * @param cb - Handler invoked for every callback on the subscription.
   * @param numRequested - Optional number of events to request up front.
   */
  subscribe(topic: string, cb: SubscribeCallback, numRequested?: number): Promise<void>;

  /**
   * Subscribes to `topic` starting from the given replay ID.
   *
   * @param topic - Channel name.
   * @param cb - Handler invoked for every callback on the subscription.
   * @param numRequested - Number of events to request, or `null`.
   * @param replayId - Replay cursor to start from.
   */
  subscribeFromReplayId(
    topic: string,
    cb: SubscribeCallback,
    numRequested: number | null,
    replayId: number
  ): Promise<void>;

  /**
   * Subscribes to `topic` starting from the earliest event.
   *
   * @param topic - Channel name.
   * @param cb - Handler invoked for every callback on the subscription.
   * @param numRequested - Optional number of events to request up front.
   */
  subscribeFromEarliestEvent(
    topic: string,
    cb: SubscribeCallback,
    numRequested?: number
  ): Promise<void>;

  // --- Flow control --------------------------------------------------------

  /**
   * Requests `numRequested` further events on an existing subscription.
   *
   * @param topic - Channel name of the existing subscription.
   * @param numRequested - Number of additional events to request.
   */
  requestAdditionalEvents(topic: string, numRequested: number): Promise<void>;
}
|
||
|
||
/** Options object passed to the Pub/Sub client constructor by this file. */
interface PubSubClientOptions {
  /** Authentication mode understood by the client library. */
  authType: string;
  /** Salesforce access token. */
  accessToken: string;
  /** Base URL of the Salesforce instance. */
  instanceUrl: string;
  /** Pub/Sub API endpoint in `host:port` form. */
  pubSubEndpoint: string;
}

/** Constructor shape the imported Pub/Sub client package is expected to expose. */
type PubSubCtor = new (opts: PubSubClientOptions) => PubSubClient;
|
||
|
||
/**
 * Subscribes to a Salesforce Pub/Sub channel and turns incoming platform
 * events into provisioning jobs.
 *
 * Lifecycle:
 * - `onModuleInit` subscribes when `SF_EVENTS_ENABLED` is `"true"`.
 * - The subscribe callback enqueues one provisioning job per event that
 *   carries an order ID, requests more events based on queue depth, records
 *   the latest replay ID seen on keep-alives, and reconnects on stream errors.
 * - `onModuleDestroy` closes the client and records a "disconnected" status.
 *
 * Connection status is written to the cache under `sfStatusKey(channel)`.
 */
@Injectable()
export class SalesforcePubSubSubscriber implements OnModuleInit, OnModuleDestroy {
  /** Channel used when `SF_PROVISION_EVENT_CHANNEL` is unset or empty. */
  private static readonly DEFAULT_CHANNEL = "/event/Order_Fulfilment_Requested__e";

  private client: PubSubClient | null = null;
  /** Access token the current client was built with; used to detect token rotation. */
  private clientAccessToken: string | null = null;
  /** Assigned in `onModuleInit`; stays unset when the subscriber is disabled. */
  private channel!: string;
  /** Ensures the stored replay cursor is cleared at most once per client instance. */
  private replayCorruptionRecovered = false;
  private subscribeCallback!: SubscribeCallback;
  private pubSubCtor: PubSubCtor | null = null;
  private clientBuildInFlight: Promise<PubSubClient> | null = null;
  /** Set by `onModuleDestroy` so late stream errors do not trigger a reconnect. */
  private shuttingDown = false;

  constructor(
    private readonly config: ConfigService,
    private readonly sfConn: SalesforceConnection,
    private readonly provisioningQueue: ProvisioningQueueService,
    private readonly cache: CacheService,
    @Inject(Logger) private readonly logger: Logger
  ) {}

  /**
   * Starts the subscription when `SF_EVENTS_ENABLED` is `"true"`.
   *
   * Subscription failures are logged and recorded as a "disconnected" status;
   * they are not rethrown, so module start-up continues.
   */
  async onModuleInit(): Promise<void> {
    const enabled = this.config.get("SF_EVENTS_ENABLED", "false") === "true";
    if (!enabled) {
      this.logger.log("Salesforce Pub/Sub subscriber disabled", { enabled });
      return;
    }

    const configuredChannel = this.config.get<string>(
      "SF_PROVISION_EVENT_CHANNEL",
      SalesforcePubSubSubscriber.DEFAULT_CHANNEL
    );
    // An empty value is treated like an unset one so status keys and the
    // subscription always refer to the same, real channel.
    this.channel = configuredChannel || SalesforcePubSubSubscriber.DEFAULT_CHANNEL;

    try {
      this.subscribeCallback = this.buildSubscribeCallback();
      await this.subscribeWithPolicy(true);
    } catch (error) {
      this.logger.error("Salesforce Pub/Sub subscription failed", {
        error: error instanceof Error ? error.message : String(error),
      });
      await this.markDisconnected();
    }
  }

  /**
   * Closes the client and records a "disconnected" status.
   *
   * Errors are logged, never rethrown. When the subscriber was never started
   * (no channel assigned) no status is written.
   */
  async onModuleDestroy(): Promise<void> {
    // Closing the client can surface stream errors; make sure they do not
    // start a fresh connection while the module is going away.
    this.shuttingDown = true;
    try {
      await this.safeCloseClient();
      if (!this.channel) {
        return;
      }
      await this.cache.set(sfStatusKey(this.channel), {
        status: "disconnected",
        since: Date.now(),
      });
    } catch (error) {
      this.logger.warn("Error closing Salesforce Pub/Sub client", {
        error: error instanceof Error ? error.message : String(error),
      });
    }
  }

  /**
   * Builds the callback registered with the Pub/Sub client.
   *
   * The callback dispatches on the lower-cased callback type and never
   * throws: any failure inside a handler is logged and swallowed.
   */
  private buildSubscribeCallback(): SubscribeCallback {
    return async (subscription, callbackType, data) => {
      try {
        const argTypes: string[] = [typeof subscription, typeof callbackType, typeof data];
        const topic = subscription?.topicName || this.channel;

        switch (String(callbackType || "").toLowerCase()) {
          case "data":
          case "event":
            await this.handleDataEvent(topic, data as SalesforcePubSubEvent, argTypes);
            break;
          case "lastevent":
            await this.handleLastEvent(topic);
            break;
          case "grpckeepalive":
            await this.handleKeepAlive(data);
            break;
          case "grpcstatus":
          case "end":
            // Informational – no action required
            break;
          case "error":
            await this.handleStreamError(topic, data as SalesforcePubSubError);
            break;
          default:
            this.logger.debug("SF Pub/Sub callback ignored (unknown type)", {
              type: callbackType,
              topic,
              argTypes,
              hasPayload: Boolean((data as SalesforcePubSubEvent | undefined)?.payload),
            });
        }
      } catch (err) {
        this.logger.error("Pub/Sub subscribe callback failed", {
          error: err instanceof Error ? err.message : String(err),
        });
      }
    };
  }

  /**
   * Handles a `data`/`event` callback: enqueues a provisioning job for the
   * order referenced by the event, or skips the event when it has no order ID.
   */
  private async handleDataEvent(
    topic: string,
    event: SalesforcePubSubEvent,
    argTypes: string[]
  ): Promise<void> {
    const payload = event?.payload;
    this.logger.debug("SF Pub/Sub data callback received", {
      topic,
      argTypes,
      hasPayload: Boolean(payload),
    });

    const orderIdVal = payload?.["OrderId__c"] ?? payload?.["OrderId"];
    const orderId = typeof orderIdVal === "string" ? orderIdVal : undefined;
    if (!orderId) {
      this.logger.warn("Pub/Sub event missing OrderId__c; skipping", {
        argTypes,
        topic,
        payloadKeys: payload ? Object.keys(payload) : [],
      });
      // The skipped event used up one requested slot; ask for a replacement
      // as long as the queue still has room.
      const depth = await this.provisioningQueue.depth();
      if (depth < this.getMaxQueueSize()) {
        await this.client?.requestAdditionalEvents(topic, 1);
      }
      return;
    }

    const replayVal = (event as { replayId?: unknown })?.replayId;
    // Without a replay ID there is nothing stable to key on, so the key falls
    // back to the current timestamp.
    const idempotencyKey =
      typeof replayVal === "number" || typeof replayVal === "string"
        ? String(replayVal)
        : String(Date.now());
    const pubsubReplayId = typeof replayVal === "number" ? replayVal : undefined;

    await this.provisioningQueue.enqueue({
      sfOrderId: orderId,
      idempotencyKey,
      pubsubReplayId,
    });
    this.logger.log("Enqueued provisioning job from SF event", {
      sfOrderId: orderId,
      replayId: pubsubReplayId,
      topic,
    });
  }

  /**
   * Handles a `lastevent` callback: requests the next batch, capped by the
   * remaining queue capacity.
   */
  private async handleLastEvent(topic: string): Promise<void> {
    const depth = await this.provisioningQueue.depth();
    const available = Math.max(0, this.getMaxQueueSize() - depth);
    const desired = Math.max(0, Math.min(this.getNumRequested(), available));
    // NOTE(review): when the queue is full nothing is requested here;
    // presumably another component resumes the flow once it drains — confirm.
    if (desired > 0) {
      await this.client?.requestAdditionalEvents(topic, desired);
    }
  }

  /**
   * Handles a `grpckeepalive` callback: stores the latest replay ID reported
   * by the server, when it is a number.
   */
  private async handleKeepAlive(data: unknown): Promise<void> {
    const latest = (data as { latestReplayId?: unknown } | null | undefined)?.latestReplayId;
    if (typeof latest !== "number") {
      return;
    }
    await this.cache.set(sfLatestSeenKey(this.channel), {
      id: String(latest),
      at: Date.now(),
    });
  }

  /**
   * Handles an `error` callback: clears the stored replay cursor when the
   * error reports it as invalid, then always attempts to reconnect.
   */
  private async handleStreamError(topic: string, data: SalesforcePubSubError): Promise<void> {
    this.logger.warn("SF Pub/Sub stream error", { topic, data });
    try {
      const details = data.details || "";
      const metadata = data.metadata || {};
      const errorCodes = Array.isArray(metadata["error-code"]) ? metadata["error-code"] : [];
      const hasCorruptionCode = errorCodes.some(code =>
        String(code).includes("replayid.corrupted")
      );
      const mentionsReplayValidation = /Replay ID validation failed/i.test(details);

      if ((hasCorruptionCode || mentionsReplayValidation) && !this.replayCorruptionRecovered) {
        this.replayCorruptionRecovered = true;
        const key = sfReplayKey(this.channel);
        await this.cache.del(key);
        this.logger.warn(
          "Cleared invalid Salesforce Pub/Sub replay cursor; retrying subscription",
          {
            channel: this.channel,
            key,
          }
        );
      }
    } catch (recoveryErr) {
      this.logger.warn("SF Pub/Sub replay corruption auto-recovery failed", {
        error: recoveryErr instanceof Error ? recoveryErr.message : String(recoveryErr),
      });
    } finally {
      await this.recoverFromStreamError();
    }
  }

  /** Number of events requested per batch (`SF_PUBSUB_NUM_REQUESTED`, default 50). */
  private getNumRequested(): number {
    return this.readPositiveInt("SF_PUBSUB_NUM_REQUESTED", 50);
  }

  /** Queue depth above which no further events are requested (`SF_PUBSUB_QUEUE_MAX`, default 100). */
  private getMaxQueueSize(): number {
    return this.readPositiveInt("SF_PUBSUB_QUEUE_MAX", 100);
  }

  /**
   * Reads a numeric setting, falling back to `fallback` when the value is
   * missing, not a number, or below 1. Fractional values are rounded down.
   */
  private readPositiveInt(key: string, fallback: number): number {
    const parsed = Number(this.config.get(key, String(fallback)));
    return Number.isFinite(parsed) && parsed >= 1 ? Math.floor(parsed) : fallback;
  }

  /**
   * Resolves and memoises the client constructor from the imported package,
   * accepting either a `default` export or the module value itself.
   *
   * @throws Error when the resolved export is not a function.
   */
  private getPubSubCtor(): PubSubCtor {
    if (this.pubSubCtor) {
      return this.pubSubCtor;
    }
    const maybeCtor =
      (PubSubApiClientPkg as { default?: unknown })?.default ?? (PubSubApiClientPkg as unknown);
    if (typeof maybeCtor !== "function") {
      throw new Error("salesforce-pubsub-api-client did not export a constructor");
    }
    this.pubSubCtor = maybeCtor as PubSubCtor;
    return this.pubSubCtor;
  }

  /**
   * Returns a connected client, building a new one when none exists, when the
   * Salesforce access token has changed, or when `forceRefresh` is set.
   *
   * Non-forced calls made while a build is in progress share that build.
   *
   * @param forceRefresh - Discard the current client and build a fresh one.
   * @throws Error when the Salesforce connection has no access token or
   *   instance URL, or when the client fails to connect.
   */
  private async ensureClient(forceRefresh = false): Promise<PubSubClient> {
    if (this.clientBuildInFlight && !forceRefresh) {
      return this.clientBuildInFlight;
    }

    const build = (async (): Promise<PubSubClient> => {
      await this.sfConn.ensureConnected();
      const accessToken = this.sfConn.getAccessToken();
      const instanceUrl = this.sfConn.getInstanceUrl();
      if (!accessToken || !instanceUrl) {
        throw new Error("Salesforce access token or instance URL unavailable");
      }

      const tokenChanged = this.clientAccessToken !== accessToken;
      let client = this.client;

      if (!client || forceRefresh || tokenChanged) {
        await this.safeCloseClient();

        const endpoint = this.config.get<string>(
          "SF_PUBSUB_ENDPOINT",
          "api.pubsub.salesforce.com:7443"
        );
        const Ctor = this.getPubSubCtor();
        const created = new Ctor({
          authType: "user-supplied",
          accessToken,
          instanceUrl,
          pubSubEndpoint: endpoint,
        });

        await created.connect();
        this.client = created;
        this.clientAccessToken = accessToken;
        // A fresh client gets a fresh chance at replay-cursor recovery.
        this.replayCorruptionRecovered = false;
        client = created;
      }

      return client;
    })();
    this.clientBuildInFlight = build;

    try {
      return await build;
    } finally {
      // Only clear the slot if it still holds this build; a forced refresh
      // started in the meantime owns the slot now.
      if (this.clientBuildInFlight === build) {
        this.clientBuildInFlight = null;
      }
    }
  }

  /**
   * Subscribes to the channel according to `SF_EVENTS_REPLAY`:
   * - `"ALL"`: from the earliest event, ignoring any stored cursor;
   * - otherwise: from the stored replay cursor when one exists and is a
   *   valid non-negative integer, else without a replay position.
   *
   * Writes "connecting" before and "connected" after subscribing.
   *
   * @param forceClientRefresh - Build a fresh client before subscribing.
   * @throws Error when the callback has not been built, or when any cache,
   *   connection or subscribe call fails.
   */
  private async subscribeWithPolicy(forceClientRefresh = false): Promise<void> {
    if (!this.subscribeCallback) {
      throw new Error("Subscribe callback not initialized");
    }

    await this.cache.set(sfStatusKey(this.channel), {
      status: "connecting",
      since: Date.now(),
    });

    const client = await this.ensureClient(forceClientRefresh);

    const replayMode = this.config.get<string>("SF_EVENTS_REPLAY", "LATEST");
    const numRequested = this.getNumRequested();

    if (replayMode === "ALL") {
      await client.subscribeFromEarliestEvent(this.channel, this.subscribeCallback, numRequested);
    } else {
      const storedReplay = await this.cache.get<string>(sfReplayKey(this.channel));
      const replayId = storedReplay ? Number(storedReplay) : Number.NaN;

      if (Number.isInteger(replayId) && replayId >= 0) {
        await client.subscribeFromReplayId(
          this.channel,
          this.subscribeCallback,
          numRequested,
          replayId
        );
      } else {
        if (storedReplay) {
          // A malformed cursor must not reach the client as NaN.
          this.logger.warn("Ignoring malformed Salesforce Pub/Sub replay cursor", {
            channel: this.channel,
            storedReplay,
          });
        }
        await client.subscribe(this.channel, this.subscribeCallback, numRequested);
      }
    }

    await this.cache.set(sfStatusKey(this.channel), {
      status: "connected",
      since: Date.now(),
    });
    this.logger.log("Salesforce Pub/Sub subscription active", { channel: this.channel });
  }

  /**
   * Tears down the current client and re-subscribes with a fresh one.
   *
   * Does nothing once the module is shutting down. When the reconnect fails
   * the status is set to "disconnected" (best effort) and the error is
   * rethrown to the caller.
   */
  private async recoverFromStreamError(): Promise<void> {
    if (this.shuttingDown) {
      return;
    }
    try {
      await this.cache.set(sfStatusKey(this.channel), {
        status: "reconnecting",
        since: Date.now(),
      });
      await this.safeCloseClient();
      await this.subscribeWithPolicy(true);
    } catch (error) {
      // Do not leave a stale "connecting"/"reconnecting" status behind.
      await this.markDisconnected();
      throw error;
    }
  }

  /** Best-effort write of a "disconnected" status; cache failures are only logged. */
  private async markDisconnected(): Promise<void> {
    try {
      await this.cache.set(sfStatusKey(this.channel), {
        status: "disconnected",
        since: Date.now(),
      });
    } catch (cacheErr) {
      this.logger.warn("Failed to set SF Pub/Sub disconnected status", {
        error: cacheErr instanceof Error ? cacheErr.message : String(cacheErr),
      });
    }
  }

  /**
   * Closes the current client, if any, and forgets it together with its
   * access token. Close failures are logged, never rethrown.
   */
  private async safeCloseClient(): Promise<void> {
    if (!this.client) {
      return;
    }
    try {
      await this.client.close();
    } catch (error) {
      this.logger.warn("Failed to close Salesforce Pub/Sub client", {
        error: error instanceof Error ? error.message : String(error),
      });
    } finally {
      this.client = null;
      this.clientAccessToken = null;
    }
  }
}
|