Refactor Salesforce event handling and caching mechanisms

- Removed the deprecated SF_PROVISION_EVENT_CHANNEL from environment configuration to streamline event handling.
- Enhanced CatalogCdcSubscriber to utilize product IDs for cache invalidation, improving cache management during CDC events.
- Updated OrderCdcSubscriber to trigger provisioning based on Activation_Status__c changes, refining order processing logic.
- Improved CatalogCacheService to support dependency tracking for cached values, enabling more efficient cache invalidation.
- Refactored provisioning processor to simplify job handling and removed unnecessary replay ID management, enhancing clarity and performance.
This commit is contained in:
barsa 2025-11-06 17:47:55 +09:00
parent 309dac630f
commit cbaa878000
14 changed files with 489 additions and 639 deletions

View File

@ -85,7 +85,6 @@ export const envSchema = z.object({
SF_QUEUE_LONG_RUNNING_TIMEOUT_MS: z.coerce.number().int().positive().default(600000),
SF_EVENTS_ENABLED: z.enum(["true", "false"]).default("false"),
SF_PROVISION_EVENT_CHANNEL: z.string().default("/event/Order_Fulfilment_Requested__e"),
SF_CATALOG_EVENT_CHANNEL: z.string().default("/event/Product_and_Pricebook_Change__e"),
SF_ACCOUNT_EVENT_CHANNEL: z.string().default("/event/Account_Internet_Eligibility_Update__e"),
SF_ORDER_EVENT_CHANNEL: z.string().optional(),

View File

@ -139,11 +139,24 @@ export class CatalogCdcSubscriber implements OnModuleInit, OnModuleDestroy {
data: unknown
): Promise<void> {
if (!this.isDataCallback(callbackType)) return;
this.logger.log("Product2 CDC event received, invalidating catalogs", {
const payload = this.extractPayload(data);
const productIds = this.extractRecordIds(payload);
this.logger.log("Product2 CDC event received", {
channel,
topicName: subscription.topicName,
productIds,
});
await this.invalidateAllCatalogs();
const invalidated = await this.catalogCache.invalidateProducts(productIds);
if (!invalidated) {
this.logger.debug("No catalog cache entries were linked to product IDs; falling back to full invalidation", {
channel,
productIds,
});
await this.invalidateAllCatalogs();
}
}
private async handlePricebookEvent(
@ -165,11 +178,26 @@ export class CatalogCdcSubscriber implements OnModuleInit, OnModuleDestroy {
return;
}
this.logger.log("PricebookEntry CDC event received, invalidating catalogs", {
const productId = this.extractStringField(payload, ["Product2Id", "ProductId"]);
this.logger.log("PricebookEntry CDC event received", {
channel,
pricebookId,
productId,
});
await this.invalidateAllCatalogs();
const invalidated = await this.catalogCache.invalidateProducts(
productId ? [productId] : []
);
if (!invalidated) {
this.logger.debug("No catalog cache entries mapped to product from pricebook event; performing full invalidation", {
channel,
pricebookId,
productId,
});
await this.invalidateAllCatalogs();
}
}
private async handleAccountEvent(
@ -248,5 +276,29 @@ export class CatalogCdcSubscriber implements OnModuleInit, OnModuleDestroy {
}
return undefined;
}
private extractRecordIds(payload: Record<string, unknown> | undefined): string[] {
if (!payload) {
return [];
}
const header = this.extractChangeEventHeader(payload);
const ids = header?.recordIds ?? [];
if (Array.isArray(ids)) {
return ids.filter((id): id is string => typeof id === "string" && id.trim().length > 0);
}
return [];
}
private extractChangeEventHeader(
payload: Record<string, unknown>
): { recordIds?: unknown; changedFields?: unknown } | undefined {
const header = payload["ChangeEventHeader"];
if (header && typeof header === "object") {
return header as { recordIds?: unknown; changedFields?: unknown };
}
return undefined;
}
}

View File

@ -1,11 +0,0 @@
export function replayKey(channel: string): string {
return `sf:pe:replay:${channel}`;
}
export function statusKey(channel: string): string {
return `sf:pe:status:${channel}`;
}
export function latestSeenKey(channel: string): string {
return `sf:pe:latestSeen:${channel}`;
}

View File

@ -3,14 +3,12 @@ import { ConfigModule } from "@nestjs/config";
import { IntegrationsModule } from "@bff/integrations/integrations.module";
import { OrdersModule } from "@bff/modules/orders/orders.module";
import { CatalogModule } from "@bff/modules/catalog/catalog.module";
import { SalesforcePubSubSubscriber } from "./pubsub.subscriber";
import { CatalogCdcSubscriber } from "./catalog-cdc.subscriber";
import { OrderCdcSubscriber } from "./order-cdc.subscriber";
@Module({
imports: [ConfigModule, IntegrationsModule, OrdersModule, CatalogModule],
providers: [
SalesforcePubSubSubscriber, // Platform Event for order provisioning
CatalogCdcSubscriber, // CDC for catalog cache invalidation
OrderCdcSubscriber, // CDC for order cache invalidation
],

View File

@ -28,7 +28,9 @@ type PubSubCtor = new (opts: {
/**
* CDC Subscriber for Order changes
*
* Strategy: Only invalidate cache for customer-facing field changes, NOT internal system fields
* Strategy:
* 1. Trigger provisioning when Salesforce sets Activation_Status__c to "Activating"
* 2. Only invalidate cache for customer-facing field changes, NOT internal system fields
*
* CUSTOMER-FACING FIELDS (invalidate cache):
* - Status (Draft, Pending Review, Completed, Cancelled)
@ -202,9 +204,9 @@ export class OrderCdcSubscriber implements OnModuleInit, OnModuleDestroy {
return;
}
// 1. CHECK FOR PROVISIONING TRIGGER (Status change)
if (payload && changedFields.has("Status")) {
await this.handleStatusChange(payload, orderId, changedFields);
// 1. CHECK FOR PROVISIONING TRIGGER (Activation status change)
if (payload && changedFields.has("Activation_Status__c")) {
await this.handleActivationStatusChange(payload, orderId);
}
// 2. CACHE INVALIDATION (existing logic)
@ -245,45 +247,33 @@ export class OrderCdcSubscriber implements OnModuleInit, OnModuleDestroy {
}
/**
* Handle Status field changes and trigger provisioning if needed
* Handle Activation_Status__c changes and trigger provisioning when Salesforce moves an order to "Activating"
*/
private async handleStatusChange(
private async handleActivationStatusChange(
payload: Record<string, unknown>,
orderId: string,
changedFields: Set<string>
orderId: string
): Promise<void> {
const newStatus = this.extractStringField(payload, ["Status"]);
const activationStatus = this.extractStringField(payload, ["Activation_Status__c"]);
const status = this.extractStringField(payload, ["Status"]);
const whmcsOrderId = this.extractStringField(payload, ["WHMCS_Order_ID__c"]);
// Guard: Only provision for specific statuses
if (!newStatus || !this.PROVISION_TRIGGER_STATUSES.has(newStatus)) {
this.logger.debug("Status changed but not a provision trigger", {
orderId,
newStatus,
});
return;
}
// Guard: Don't trigger if already provisioning
if (activationStatus === "Activating") {
this.logger.debug("Order already provisioning, skipping", {
if (activationStatus !== "Activating") {
this.logger.debug("Activation status changed but not to Activating; skipping provisioning", {
orderId,
activationStatus,
});
return;
}
// Guard: Don't trigger if already activated
if (activationStatus === "Activated") {
this.logger.debug("Order already activated, skipping", {
if (status && !this.PROVISION_TRIGGER_STATUSES.has(status)) {
this.logger.debug("Activation status set to Activating but order status is not a provisioning trigger", {
orderId,
activationStatus,
status,
});
return;
}
// Guard: Check if WHMCS Order ID already exists (idempotency)
if (whmcsOrderId) {
this.logger.log("Order already has WHMCS Order ID, skipping provisioning", {
orderId,
@ -292,28 +282,29 @@ export class OrderCdcSubscriber implements OnModuleInit, OnModuleDestroy {
return;
}
// Trigger provisioning
this.logger.log("Order status changed to provision trigger via CDC, enqueuing fulfillment", {
this.logger.log("Order activation moved to Activating via CDC, enqueuing fulfillment", {
orderId,
status: newStatus,
activationStatus,
status,
});
try {
await this.provisioningQueue.enqueue({
sfOrderId: orderId,
idempotencyKey: `cdc-status-${Date.now()}-${orderId}`,
idempotencyKey: `cdc-activation-${Date.now()}-${orderId}`,
correlationId: `cdc-order-${orderId}`,
});
this.logger.log("Successfully enqueued provisioning job from CDC Status change", {
this.logger.log("Successfully enqueued provisioning job from activation change", {
orderId,
trigger: `Status → ${newStatus}`,
activationStatus,
status,
});
} catch (error) {
this.logger.error("Failed to enqueue provisioning job from CDC", {
this.logger.error("Failed to enqueue provisioning job from activation change", {
orderId,
newStatus,
activationStatus,
status,
error: error instanceof Error ? error.message : String(error),
});
}
@ -408,6 +399,12 @@ export class OrderCdcSubscriber implements OnModuleInit, OnModuleDestroy {
private extractChangedFields(payload: Record<string, unknown> | undefined): Set<string> {
if (!payload) return new Set();
const header = this.extractChangeEventHeader(payload);
const headerChangedFields = Array.isArray(header?.changedFields)
? (header?.changedFields as unknown[])
.filter((field): field is string => typeof field === "string" && field.length > 0)
: [];
// CDC provides changed fields in different formats depending on API version
// Try to extract from common locations
const changedFieldsArray =
@ -415,7 +412,10 @@ export class OrderCdcSubscriber implements OnModuleInit, OnModuleDestroy {
((payload.changeOrigin as { changedFields?: string[] })?.changedFields) ||
[];
return new Set(changedFieldsArray);
return new Set([
...headerChangedFields,
...changedFieldsArray.filter(field => typeof field === "string" && field.length > 0),
]);
}
private isDataCallback(callbackType: string): boolean {
@ -452,5 +452,15 @@ export class OrderCdcSubscriber implements OnModuleInit, OnModuleDestroy {
}
return undefined;
}
private extractChangeEventHeader(
payload: Record<string, unknown>
): { changedFields?: unknown } | undefined {
const header = payload["ChangeEventHeader"];
if (header && typeof header === "object") {
return header as { changedFields?: unknown };
}
return undefined;
}
}

View File

@ -1,366 +0,0 @@
import { Injectable, OnModuleInit, OnModuleDestroy, Inject } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { Logger } from "nestjs-pino";
import PubSubApiClientPkg from "salesforce-pubsub-api-client";
import { SalesforceConnection } from "../services/salesforce-connection.service";
import { ProvisioningQueueService } from "@bff/modules/orders/queue/provisioning.queue";
import { CacheService } from "@bff/infra/cache/cache.service";
import {
replayKey as sfReplayKey,
statusKey as sfStatusKey,
latestSeenKey as sfLatestSeenKey,
} from "./event-keys.util";
import type {
SalesforceOrderProvisionEvent,
SalesforcePubSubError,
SalesforcePubSubSubscription,
SalesforcePubSubCallbackType,
SalesforcePubSubUnknownData,
} from "@customer-portal/domain/orders";
type SubscribeCallback = (
subscription: SalesforcePubSubSubscription,
callbackType: SalesforcePubSubCallbackType,
data: SalesforceOrderProvisionEvent | SalesforcePubSubError | SalesforcePubSubUnknownData
) => void | Promise<void>;
interface PubSubClient {
connect(): Promise<void>;
subscribe(topic: string, cb: SubscribeCallback, numRequested?: number): Promise<void>;
subscribeFromReplayId(
topic: string,
cb: SubscribeCallback,
numRequested: number | null,
replayId: number
): Promise<void>;
subscribeFromEarliestEvent(
topic: string,
cb: SubscribeCallback,
numRequested?: number
): Promise<void>;
requestAdditionalEvents(topic: string, numRequested: number): Promise<void>;
close(): Promise<void>;
}
type PubSubCtor = new (opts: {
authType: string;
accessToken: string;
instanceUrl: string;
pubSubEndpoint: string;
}) => PubSubClient;
@Injectable()
export class SalesforcePubSubSubscriber implements OnModuleInit, OnModuleDestroy {
private client: PubSubClient | null = null;
private clientAccessToken: string | null = null;
private channel!: string;
private replayCorruptionRecovered = false;
private subscribeCallback!: SubscribeCallback;
private pubSubCtor: PubSubCtor | null = null;
private clientBuildInFlight: Promise<PubSubClient> | null = null;
constructor(
private readonly config: ConfigService,
private readonly sfConn: SalesforceConnection,
private readonly provisioningQueue: ProvisioningQueueService,
private readonly cache: CacheService,
@Inject(Logger) private readonly logger: Logger
) {}
async onModuleInit(): Promise<void> {
const enabled = this.config.get("SF_EVENTS_ENABLED", "false") === "true";
if (!enabled) {
this.logger.log("Salesforce Pub/Sub subscriber disabled", { enabled });
return;
}
this.channel = this.config.get<string>(
"SF_PROVISION_EVENT_CHANNEL",
"/event/Order_Fulfilment_Requested__e"
);
try {
this.subscribeCallback = this.buildSubscribeCallback();
await this.subscribeWithPolicy(true);
} catch (error) {
this.logger.error("Salesforce Pub/Sub subscription failed", {
error: error instanceof Error ? error.message : String(error),
});
try {
await this.cache.set(sfStatusKey(this.channel || "/event/OrderProvisionRequested__e"), {
status: "disconnected",
since: Date.now(),
});
} catch (cacheErr) {
this.logger.warn("Failed to set SF Pub/Sub disconnected status", {
error: cacheErr instanceof Error ? cacheErr.message : String(cacheErr),
});
}
}
}
async onModuleDestroy(): Promise<void> {
try {
await this.safeCloseClient();
await this.cache.set(sfStatusKey(this.channel), {
status: "disconnected",
since: Date.now(),
});
} catch (error) {
this.logger.warn("Error closing Salesforce Pub/Sub client", {
error: error instanceof Error ? error.message : String(error),
});
}
}
private buildSubscribeCallback(): SubscribeCallback {
return async (subscription, callbackType, data) => {
try {
const argTypes = [typeof subscription, typeof callbackType, typeof data];
const type = callbackType;
const typeNorm = String(type || "").toLowerCase();
const topic = subscription.topicName || this.channel;
if (typeNorm === "data" || typeNorm === "event") {
const event = data as SalesforceOrderProvisionEvent;
this.logger.debug("SF Pub/Sub data callback received", {
topic,
argTypes,
hasPayload: Boolean(event?.payload),
});
const payload = event?.payload;
const orderIdVal = payload?.["OrderId__c"] ?? payload?.["OrderId"];
const orderId = typeof orderIdVal === "string" ? orderIdVal : undefined;
if (!orderId) {
this.logger.warn("Pub/Sub event missing OrderId__c; skipping", {
argTypes,
topic,
payloadKeys: payload ? Object.keys(payload) : [],
});
const depth = await this.provisioningQueue.depth();
if (depth < this.getMaxQueueSize()) {
const activeClient = this.client;
if (activeClient) {
await activeClient.requestAdditionalEvents(topic, 1);
}
}
return;
}
const replayVal = (event as { replayId?: unknown })?.replayId;
const idempotencyKey =
typeof replayVal === "number" || typeof replayVal === "string"
? String(replayVal)
: String(Date.now());
const pubsubReplayId = typeof replayVal === "number" ? replayVal : undefined;
await this.provisioningQueue.enqueue({
sfOrderId: orderId,
idempotencyKey,
pubsubReplayId,
});
this.logger.log("Enqueued provisioning job from SF event", {
sfOrderId: orderId,
replayId: pubsubReplayId,
topic,
});
} else if (typeNorm === "lastevent") {
const depth = await this.provisioningQueue.depth();
const available = Math.max(0, this.getMaxQueueSize() - depth);
const desired = Math.max(0, Math.min(this.getNumRequested(), available));
if (desired > 0) {
const activeClient = this.client;
if (activeClient) {
await activeClient.requestAdditionalEvents(topic, desired);
}
}
} else if (typeNorm === "grpckeepalive") {
const latestVal = (data as { latestReplayId?: unknown })?.latestReplayId;
const latest = typeof latestVal === "number" ? latestVal : undefined;
if (typeof latest === "number") {
await this.cache.set(sfLatestSeenKey(this.channel), {
id: String(latest),
at: Date.now(),
});
}
} else if (typeNorm === "grpcstatus" || typeNorm === "end") {
// Informational no action required
} else if (typeNorm === "error") {
this.logger.warn("SF Pub/Sub stream error", { topic, data });
try {
const errorData = data as SalesforcePubSubError;
const details = errorData.details || "";
const metadata = errorData.metadata || {};
const errorCodes = Array.isArray(metadata["error-code"]) ? metadata["error-code"] : [];
const hasCorruptionCode = errorCodes.some(code =>
String(code).includes("replayid.corrupted")
);
const mentionsReplayValidation = /Replay ID validation failed/i.test(details);
if (
(hasCorruptionCode || mentionsReplayValidation) &&
!this.replayCorruptionRecovered
) {
this.replayCorruptionRecovered = true;
const key = sfReplayKey(this.channel);
await this.cache.del(key);
this.logger.warn(
"Cleared invalid Salesforce Pub/Sub replay cursor; retrying subscription",
{
channel: this.channel,
key,
}
);
}
} catch (recoveryErr) {
this.logger.warn("SF Pub/Sub replay corruption auto-recovery failed", {
error: recoveryErr instanceof Error ? recoveryErr.message : String(recoveryErr),
});
} finally {
await this.recoverFromStreamError();
}
} else {
const maybeEvent = data as SalesforceOrderProvisionEvent | undefined;
const hasPayload = Boolean(maybeEvent?.payload);
this.logger.debug("SF Pub/Sub callback ignored (unknown type)", {
type,
topic,
argTypes,
hasPayload,
});
}
} catch (err) {
this.logger.error("Pub/Sub subscribe callback failed", {
error: err instanceof Error ? err.message : String(err),
});
}
};
}
private getNumRequested(): number {
return Number(this.config.get("SF_PUBSUB_NUM_REQUESTED", "50")) || 50;
}
private getMaxQueueSize(): number {
return Number(this.config.get("SF_PUBSUB_QUEUE_MAX", "100")) || 100;
}
private getPubSubCtor(): PubSubCtor {
if (this.pubSubCtor) {
return this.pubSubCtor;
}
const maybeCtor =
(PubSubApiClientPkg as { default?: unknown })?.default ?? (PubSubApiClientPkg as unknown);
this.pubSubCtor = maybeCtor as PubSubCtor;
return this.pubSubCtor;
}
private async ensureClient(forceRefresh = false): Promise<PubSubClient> {
if (this.clientBuildInFlight && !forceRefresh) {
return this.clientBuildInFlight;
}
this.clientBuildInFlight = (async () => {
await this.sfConn.ensureConnected();
const accessToken = this.sfConn.getAccessToken();
const instanceUrl = this.sfConn.getInstanceUrl();
if (!accessToken || !instanceUrl) {
throw new Error("Salesforce access token || instance URL unavailable");
}
const tokenChanged = this.clientAccessToken !== accessToken;
if (!this.client || forceRefresh || tokenChanged) {
await this.safeCloseClient();
const endpoint = this.config.get<string>(
"SF_PUBSUB_ENDPOINT",
"api.pubsub.salesforce.com:7443"
);
const Ctor = this.getPubSubCtor();
const client = new Ctor({
authType: "user-supplied",
accessToken,
instanceUrl,
pubSubEndpoint: endpoint,
});
await client.connect();
this.client = client;
this.clientAccessToken = accessToken;
this.replayCorruptionRecovered = false;
}
return this.client;
})();
try {
return await this.clientBuildInFlight;
} finally {
this.clientBuildInFlight = null;
}
}
private async subscribeWithPolicy(forceClientRefresh = false): Promise<void> {
if (!this.subscribeCallback) {
throw new Error("Subscribe callback not initialized");
}
await this.cache.set(sfStatusKey(this.channel), {
status: "connecting",
since: Date.now(),
});
const client = await this.ensureClient(forceClientRefresh);
const replayMode = this.config.get<string>("SF_EVENTS_REPLAY", "LATEST");
const replayKey = sfReplayKey(this.channel);
const storedReplay = replayMode !== "ALL" ? await this.cache.get<string>(replayKey) : null;
const numRequested = this.getNumRequested();
if (storedReplay && replayMode !== "ALL") {
await client.subscribeFromReplayId(
this.channel,
this.subscribeCallback,
numRequested,
Number(storedReplay)
);
} else if (replayMode === "ALL") {
await client.subscribeFromEarliestEvent(this.channel, this.subscribeCallback, numRequested);
} else {
await client.subscribe(this.channel, this.subscribeCallback, numRequested);
}
await this.cache.set(sfStatusKey(this.channel), {
status: "connected",
since: Date.now(),
});
this.logger.log("Salesforce Pub/Sub subscription active", { channel: this.channel });
}
private async recoverFromStreamError(): Promise<void> {
await this.cache.set(sfStatusKey(this.channel), {
status: "reconnecting",
since: Date.now(),
});
await this.safeCloseClient();
await this.subscribeWithPolicy(true);
}
private async safeCloseClient(): Promise<void> {
if (!this.client) {
return;
}
try {
await this.client.close();
} catch (error) {
this.logger.warn("Failed to close Salesforce Pub/Sub client", {
error: error instanceof Error ? error.message : String(error),
});
} finally {
this.client = null;
this.clientAccessToken = null;
}
}
}

View File

@ -20,6 +20,7 @@ import { SalesforceWriteThrottleGuard } from "./guards/salesforce-write-throttle
SalesforceWriteThrottleGuard,
],
exports: [
QueueModule,
SalesforceService,
SalesforceConnection,
SalesforceOrderService,

View File

@ -14,6 +14,16 @@ export interface CatalogCacheSnapshot {
invalidations: number;
}
interface CacheDependencies {
productIds?: string[];
}
interface WrappedCatalogValue<T> {
value: T | null;
__catalogCache: true;
dependencies?: CacheDependencies;
}
/**
* Catalog-specific caching service
*
@ -25,10 +35,10 @@ export class CatalogCacheService {
// Hybrid approach: CDC for real-time invalidation + TTL for backup cleanup
// Primary: CDC events invalidate cache when data changes (real-time)
// Backup: TTL expires unused cache entries (memory management)
private readonly CATALOG_TTL = 86400; // 24 hours - general catalog data
private readonly STATIC_TTL = 604800; // 7 days - rarely changing data
private readonly ELIGIBILITY_TTL = 3600; // 1 hour - user-specific eligibility
private readonly VOLATILE_TTL = 60; // 1 minute - real-time data (availability, inventory)
private readonly CATALOG_TTL: number | null = null; // CDC-driven invalidation
private readonly STATIC_TTL: number | null = null; // CDC-driven invalidation
private readonly ELIGIBILITY_TTL: number | null = null; // CDC-driven invalidation
private readonly VOLATILE_TTL = 60; // Volatile data still uses TTL
private readonly metrics: CatalogCacheSnapshot = {
catalog: { hits: 0, misses: 0 },
@ -48,8 +58,12 @@ export class CatalogCacheService {
/**
* Get or fetch catalog data (long-lived cache, event-driven invalidation)
*/
async getCachedCatalog<T>(key: string, fetchFn: () => Promise<T>): Promise<T> {
return this.getOrSet("catalog", key, this.CATALOG_TTL, fetchFn);
async getCachedCatalog<T>(
key: string,
fetchFn: () => Promise<T>,
options?: CatalogCacheOptions<T>
): Promise<T> {
return this.getOrSet("catalog", key, this.CATALOG_TTL, fetchFn, options);
}
/**
@ -70,7 +84,9 @@ export class CatalogCacheService {
* Get or fetch eligibility data (long-lived cache)
*/
async getCachedEligibility<T>(key: string, fetchFn: () => Promise<T>): Promise<T> {
return this.getOrSet("eligibility", key, this.ELIGIBILITY_TTL, fetchFn, true);
return this.getOrSet("eligibility", key, this.ELIGIBILITY_TTL, fetchFn, {
allowNull: true,
});
}
/**
@ -90,6 +106,7 @@ export class CatalogCacheService {
async invalidateCatalog(catalogType: string): Promise<void> {
this.metrics.invalidations++;
await this.cache.delPattern(`catalog:${catalogType}:*`);
await this.flushProductDependencyIndex();
}
/**
@ -107,12 +124,13 @@ export class CatalogCacheService {
async invalidateAllCatalogs(): Promise<void> {
this.metrics.invalidations++;
await this.cache.delPattern("catalog:*");
await this.flushProductDependencyIndex();
}
getTtlConfiguration(): {
catalogSeconds: number;
eligibilitySeconds: number;
staticSeconds: number;
catalogSeconds: number | null;
eligibilitySeconds: number | null;
staticSeconds: number | null;
volatileSeconds: number;
} {
return {
@ -142,7 +160,11 @@ export class CatalogCacheService {
typeof eligibility === "string"
? { Id: accountId, Internet_Eligibility__c: eligibility }
: null;
await this.cache.set(key, this.wrapCachedValue(payload));
if (this.ELIGIBILITY_TTL === null) {
await this.cache.set(key, this.wrapCachedValue(payload));
} else {
await this.cache.set(key, this.wrapCachedValue(payload), this.ELIGIBILITY_TTL);
}
}
private async getOrSet<T>(
@ -150,8 +172,9 @@ export class CatalogCacheService {
key: string,
ttlSeconds: number | null,
fetchFn: () => Promise<T>,
allowNull = false
options?: CatalogCacheOptions<T>
): Promise<T> {
const allowNull = options?.allowNull ?? false;
// 1. Check Redis cache first (fastest path)
const cached = await this.cache.get<unknown>(key);
const unwrapped = this.unwrapCachedValue<T>(cached);
@ -179,12 +202,27 @@ export class CatalogCacheService {
try {
const fresh = await fetchFn();
const valueToStore = allowNull ? (fresh ?? null) : fresh;
const dependencies = options?.resolveDependencies
? await options.resolveDependencies(fresh)
: undefined;
if (unwrapped.dependencies) {
await this.unlinkDependenciesForKey(key, unwrapped.dependencies);
}
// Store in Redis for future requests
if (ttlSeconds === null) {
await this.cache.set(key, this.wrapCachedValue(valueToStore));
await this.cache.set(key, this.wrapCachedValue(valueToStore, dependencies));
} else {
await this.cache.set(key, this.wrapCachedValue(valueToStore), ttlSeconds);
await this.cache.set(
key,
this.wrapCachedValue(valueToStore, dependencies),
ttlSeconds
);
}
if (dependencies) {
await this.linkDependencies(key, dependencies);
}
return fresh;
@ -200,7 +238,46 @@ export class CatalogCacheService {
return fetchPromise;
}
private unwrapCachedValue<T>(cached: unknown): { hit: boolean; value: T | null } {
async invalidateProducts(productIds: string[]): Promise<boolean> {
const uniqueIds = Array.from(new Set((productIds ?? []).filter(Boolean)));
if (uniqueIds.length === 0) {
return false;
}
const keysToInvalidate = new Set<string>();
for (const productId of uniqueIds) {
const indexKey = this.buildProductDependencyKey(productId);
const index = await this.cache.get<{ keys?: string[] }>(indexKey);
const keys = index?.keys ?? [];
keys.forEach(k => keysToInvalidate.add(k));
if (keys.length === 0) {
continue;
}
}
if (keysToInvalidate.size === 0) {
return false;
}
for (const key of keysToInvalidate) {
const cached = await this.cache.get<unknown>(key);
const unwrapped = this.unwrapCachedValue<unknown>(cached);
if (unwrapped.dependencies) {
await this.unlinkDependenciesForKey(key, unwrapped.dependencies);
}
await this.cache.del(key);
this.metrics.invalidations++;
}
return true;
}
private unwrapCachedValue<T>(cached: unknown): {
hit: boolean;
value: T | null;
dependencies?: CacheDependencies;
} {
if (cached === null || cached === undefined) {
return { hit: false, value: null };
}
@ -210,14 +287,90 @@ export class CatalogCacheService {
cached !== null &&
Object.prototype.hasOwnProperty.call(cached, "__catalogCache")
) {
const wrapper = cached as { value: T | null };
return { hit: true, value: wrapper.value ?? null };
const wrapper = cached as WrappedCatalogValue<T>;
return {
hit: true,
value: wrapper.value ?? null,
dependencies: wrapper.dependencies,
};
}
return { hit: true, value: (cached as T) ?? null };
}
private wrapCachedValue<T>(value: T | null): { value: T | null; __catalogCache: true } {
return { value: value ?? null, __catalogCache: true };
private wrapCachedValue<T>(
value: T | null,
dependencies?: CacheDependencies
): WrappedCatalogValue<T> {
return {
value: value ?? null,
__catalogCache: true,
dependencies: dependencies && this.normalizeDependencies(dependencies),
};
}
private normalizeDependencies(dependencies: CacheDependencies): CacheDependencies | undefined {
const productIds = dependencies.productIds?.filter(Boolean);
if (!productIds || productIds.length === 0) {
return undefined;
}
return { productIds: Array.from(new Set(productIds)) };
}
private async linkDependencies(key: string, dependencies: CacheDependencies): Promise<void> {
const normalized = this.normalizeDependencies(dependencies);
if (!normalized) {
return;
}
if (normalized.productIds) {
for (const productId of normalized.productIds) {
const indexKey = this.buildProductDependencyKey(productId);
const existing = (await this.cache.get<{ keys?: string[] }>(indexKey))?.keys ?? [];
if (!existing.includes(key)) {
existing.push(key);
}
await this.cache.set(indexKey, { keys: existing });
}
}
}
private async unlinkDependenciesForKey(
key: string,
dependencies: CacheDependencies
): Promise<void> {
const normalized = this.normalizeDependencies(dependencies);
if (!normalized) {
return;
}
if (normalized.productIds) {
for (const productId of normalized.productIds) {
const indexKey = this.buildProductDependencyKey(productId);
const existing = await this.cache.get<{ keys?: string[] }>(indexKey);
if (!existing?.keys?.length) {
continue;
}
const nextKeys = existing.keys.filter(k => k !== key);
if (nextKeys.length === 0) {
await this.cache.del(indexKey);
} else {
await this.cache.set(indexKey, { keys: nextKeys });
}
}
}
}
private buildProductDependencyKey(productId: string): string {
return `catalog:deps:product:${productId}`;
}
private async flushProductDependencyIndex(): Promise<void> {
await this.cache.delPattern("catalog:deps:product:*");
}
}
export interface CatalogCacheOptions<T> {
allowNull?: boolean;
resolveDependencies?: (value: T) => CacheDependencies | Promise<CacheDependencies | undefined> | undefined;
}

View File

@ -41,87 +41,113 @@ export class InternetCatalogService extends BaseCatalogService {
async getPlans(): Promise<InternetPlanCatalogItem[]> {
const cacheKey = this.catalogCache.buildCatalogKey("internet", "plans");
return this.catalogCache.getCachedCatalog(cacheKey, async () => {
const soql = this.buildCatalogServiceQuery("Internet", [
"Internet_Plan_Tier__c",
"Internet_Offering_Type__c",
"Catalog_Order__c",
]);
const records = await this.executeQuery<SalesforceProduct2WithPricebookEntries>(
soql,
"Internet Plans"
);
return this.catalogCache.getCachedCatalog(
cacheKey,
async () => {
const soql = this.buildCatalogServiceQuery("Internet", [
"Internet_Plan_Tier__c",
"Internet_Offering_Type__c",
"Catalog_Order__c",
]);
const records = await this.executeQuery<SalesforceProduct2WithPricebookEntries>(
soql,
"Internet Plans"
);
return records.map(record => {
const entry = this.extractPricebookEntry(record);
const plan = CatalogProviders.Salesforce.mapInternetPlan(record, entry);
return enrichInternetPlanMetadata(plan);
});
});
return records.map(record => {
const entry = this.extractPricebookEntry(record);
const plan = CatalogProviders.Salesforce.mapInternetPlan(record, entry);
return enrichInternetPlanMetadata(plan);
});
},
{
resolveDependencies: plans => ({
productIds: plans.map(plan => plan.id).filter((id): id is string => Boolean(id)),
}),
}
);
}
async getInstallations(): Promise<InternetInstallationCatalogItem[]> {
const cacheKey = this.catalogCache.buildCatalogKey("internet", "installations");
return this.catalogCache.getCachedCatalog(cacheKey, async () => {
const soql = this.buildProductQuery("Internet", "Installation", [
"Billing_Cycle__c",
"Catalog_Order__c",
]);
const records = await this.executeQuery<SalesforceProduct2WithPricebookEntries>(
soql,
"Internet Installations"
);
return this.catalogCache.getCachedCatalog(
cacheKey,
async () => {
const soql = this.buildProductQuery("Internet", "Installation", [
"Billing_Cycle__c",
"Catalog_Order__c",
]);
const records = await this.executeQuery<SalesforceProduct2WithPricebookEntries>(
soql,
"Internet Installations"
);
this.logger.log(`Found ${records.length} installation records`);
this.logger.log(`Found ${records.length} installation records`);
return records
.map(record => {
const entry = this.extractPricebookEntry(record);
const installation = CatalogProviders.Salesforce.mapInternetInstallation(record, entry);
return {
...installation,
catalogMetadata: {
...installation.catalogMetadata,
installationTerm: inferInstallationTermFromSku(installation.sku ?? ""),
},
};
})
.sort((a, b) => (a.displayOrder ?? 0) - (b.displayOrder ?? 0));
});
return records
.map(record => {
const entry = this.extractPricebookEntry(record);
const installation = CatalogProviders.Salesforce.mapInternetInstallation(record, entry);
return {
...installation,
catalogMetadata: {
...installation.catalogMetadata,
installationTerm: inferInstallationTermFromSku(installation.sku ?? ""),
},
};
})
.sort((a, b) => (a.displayOrder ?? 0) - (b.displayOrder ?? 0));
},
{
resolveDependencies: installations => ({
productIds: installations
.map(item => item.id)
.filter((id): id is string => Boolean(id)),
}),
}
);
}
async getAddons(): Promise<InternetAddonCatalogItem[]> {
const cacheKey = this.catalogCache.buildCatalogKey("internet", "addons");
return this.catalogCache.getCachedCatalog(cacheKey, async () => {
const soql = this.buildProductQuery("Internet", "Add-on", [
"Billing_Cycle__c",
"Catalog_Order__c",
"Bundled_Addon__c",
"Is_Bundled_Addon__c",
]);
const records = await this.executeQuery<SalesforceProduct2WithPricebookEntries>(
soql,
"Internet Add-ons"
);
return this.catalogCache.getCachedCatalog(
cacheKey,
async () => {
const soql = this.buildProductQuery("Internet", "Add-on", [
"Billing_Cycle__c",
"Catalog_Order__c",
"Bundled_Addon__c",
"Is_Bundled_Addon__c",
]);
const records = await this.executeQuery<SalesforceProduct2WithPricebookEntries>(
soql,
"Internet Add-ons"
);
this.logger.log(`Found ${records.length} addon records`);
this.logger.log(`Found ${records.length} addon records`);
return records
.map(record => {
const entry = this.extractPricebookEntry(record);
const addon = CatalogProviders.Salesforce.mapInternetAddon(record, entry);
return {
...addon,
catalogMetadata: {
...addon.catalogMetadata,
addonType: inferAddonTypeFromSku(addon.sku ?? ""),
},
};
})
.sort((a, b) => (a.displayOrder ?? 0) - (b.displayOrder ?? 0));
});
return records
.map(record => {
const entry = this.extractPricebookEntry(record);
const addon = CatalogProviders.Salesforce.mapInternetAddon(record, entry);
return {
...addon,
catalogMetadata: {
...addon.catalogMetadata,
addonType: inferAddonTypeFromSku(addon.sku ?? ""),
},
};
})
.sort((a, b) => (a.displayOrder ?? 0) - (b.displayOrder ?? 0));
},
{
resolveDependencies: addons => ({
productIds: addons.map(addon => addon.id).filter((id): id is string => Boolean(id)),
}),
}
);
}
async getCatalogData() {

View File

@ -29,64 +29,21 @@ export class SimCatalogService extends BaseCatalogService {
async getPlans(): Promise<SimCatalogProduct[]> {
const cacheKey = this.catalogCache.buildCatalogKey("sim", "plans");
return this.catalogCache.getCachedCatalog(cacheKey, async () => {
const soql = this.buildCatalogServiceQuery("SIM", [
"SIM_Data_Size__c",
"SIM_Plan_Type__c",
"SIM_Has_Family_Discount__c",
"Catalog_Order__c",
]);
const records = await this.executeQuery<SalesforceProduct2WithPricebookEntries>(
soql,
"SIM Plans"
);
return this.catalogCache.getCachedCatalog(
cacheKey,
async () => {
const soql = this.buildCatalogServiceQuery("SIM", [
"SIM_Data_Size__c",
"SIM_Plan_Type__c",
"SIM_Has_Family_Discount__c",
"Catalog_Order__c",
]);
const records = await this.executeQuery<SalesforceProduct2WithPricebookEntries>(
soql,
"SIM Plans"
);
return records.map(record => {
const entry = this.extractPricebookEntry(record);
const product = CatalogProviders.Salesforce.mapSimProduct(record, entry);
return {
...product,
description: product.description ?? product.name,
} satisfies SimCatalogProduct;
});
});
}
async getActivationFees(): Promise<SimActivationFeeCatalogItem[]> {
const cacheKey = this.catalogCache.buildCatalogKey("sim", "activation-fees");
return this.catalogCache.getCachedCatalog(cacheKey, async () => {
const soql = this.buildProductQuery("SIM", "Activation", []);
const records = await this.executeQuery<SalesforceProduct2WithPricebookEntries>(
soql,
"SIM Activation Fees"
);
return records.map(record => {
const entry = this.extractPricebookEntry(record);
return CatalogProviders.Salesforce.mapSimActivationFee(record, entry);
});
});
}
async getAddons(): Promise<SimCatalogProduct[]> {
const cacheKey = this.catalogCache.buildCatalogKey("sim", "addons");
return this.catalogCache.getCachedCatalog(cacheKey, async () => {
const soql = this.buildProductQuery("SIM", "Add-on", [
"Billing_Cycle__c",
"Catalog_Order__c",
"Bundled_Addon__c",
"Is_Bundled_Addon__c",
]);
const records = await this.executeQuery<SalesforceProduct2WithPricebookEntries>(
soql,
"SIM Add-ons"
);
return records
.map(record => {
return records.map(record => {
const entry = this.extractPricebookEntry(record);
const product = CatalogProviders.Salesforce.mapSimProduct(record, entry);
@ -94,9 +51,80 @@ export class SimCatalogService extends BaseCatalogService {
...product,
description: product.description ?? product.name,
} satisfies SimCatalogProduct;
})
.sort((a, b) => (a.displayOrder ?? 0) - (b.displayOrder ?? 0));
});
});
},
{
resolveDependencies: plans => ({
productIds: plans.map(plan => plan.id).filter((id): id is string => Boolean(id)),
}),
}
);
}
async getActivationFees(): Promise<SimActivationFeeCatalogItem[]> {
const cacheKey = this.catalogCache.buildCatalogKey("sim", "activation-fees");
return this.catalogCache.getCachedCatalog(
cacheKey,
async () => {
const soql = this.buildProductQuery("SIM", "Activation", []);
const records = await this.executeQuery<SalesforceProduct2WithPricebookEntries>(
soql,
"SIM Activation Fees"
);
return records.map(record => {
const entry = this.extractPricebookEntry(record);
return CatalogProviders.Salesforce.mapSimActivationFee(record, entry);
});
},
{
resolveDependencies: products => ({
productIds: products
.map(product => product.id)
.filter((id): id is string => Boolean(id)),
}),
}
);
}
async getAddons(): Promise<SimCatalogProduct[]> {
const cacheKey = this.catalogCache.buildCatalogKey("sim", "addons");
return this.catalogCache.getCachedCatalog(
cacheKey,
async () => {
const soql = this.buildProductQuery("SIM", "Add-on", [
"Billing_Cycle__c",
"Catalog_Order__c",
"Bundled_Addon__c",
"Is_Bundled_Addon__c",
]);
const records = await this.executeQuery<SalesforceProduct2WithPricebookEntries>(
soql,
"SIM Add-ons"
);
return records
.map(record => {
const entry = this.extractPricebookEntry(record);
const product = CatalogProviders.Salesforce.mapSimProduct(record, entry);
return {
...product,
description: product.description ?? product.name,
} satisfies SimCatalogProduct;
})
.sort((a, b) => (a.displayOrder ?? 0) - (b.displayOrder ?? 0));
},
{
resolveDependencies: products => ({
productIds: products
.map(product => product.id)
.filter((id): id is string => Boolean(id)),
}),
}
);
}
async getPlansForUser(userId: string): Promise<SimCatalogProduct[]> {

View File

@ -4,10 +4,7 @@ import { Logger } from "nestjs-pino";
import { OrderFulfillmentOrchestrator } from "../services/order-fulfillment-orchestrator.service";
import { SalesforceService } from "@bff/integrations/salesforce/salesforce.service";
import type { ProvisioningJobData } from "./provisioning.queue";
import { CacheService } from "@bff/infra/cache/cache.service";
import { ConfigService } from "@nestjs/config";
import { QUEUE_NAMES } from "@bff/infra/queue/queue.constants";
import { replayKey as sfReplayKey } from "@bff/integrations/salesforce/events/event-keys.util";
@Processor(QUEUE_NAMES.PROVISIONING)
@Injectable()
@ -15,8 +12,6 @@ export class ProvisioningProcessor extends WorkerHost {
constructor(
private readonly orchestrator: OrderFulfillmentOrchestrator,
private readonly salesforceService: SalesforceService,
private readonly cache: CacheService,
private readonly config: ConfigService,
@Inject(Logger) private readonly logger: Logger
) {
super();
@ -28,7 +23,6 @@ export class ProvisioningProcessor extends WorkerHost {
sfOrderId,
idempotencyKey,
correlationId: job.data.correlationId,
pubsubReplayId: job.data.pubsubReplayId,
});
// Guard: Only process if Salesforce Order is currently 'Activating'
@ -41,7 +35,6 @@ export class ProvisioningProcessor extends WorkerHost {
sfOrderId,
currentStatus: status,
});
await this.commitReplay(job);
return; // Ack + no-op to safely handle duplicate/old events
}
@ -52,36 +45,11 @@ export class ProvisioningProcessor extends WorkerHost {
currentStatus: status,
lastErrorCode,
});
await this.commitReplay(job);
return;
}
try {
// Execute the same orchestration used by the webhook path, but without payload validation
await this.orchestrator.executeFulfillment(sfOrderId, {}, idempotencyKey);
this.logger.log("Provisioning job completed", { sfOrderId });
} finally {
// Commit processed replay id for Pub/Sub resume (commit regardless of success to avoid replay storms)
await this.commitReplay(job);
}
}
private async commitReplay(job: { data: ProvisioningJobData }): Promise<void> {
if (typeof job.data.pubsubReplayId !== "number") return;
try {
const channel = this.config.get<string>(
"SF_PROVISION_EVENT_CHANNEL",
"/event/Order_Fulfilment_Requested__e"
);
const replayKey = sfReplayKey(channel);
const prev = Number((await this.cache.get<string>(replayKey)) ?? 0);
if (job.data.pubsubReplayId > prev) {
await this.cache.set(replayKey, String(job.data.pubsubReplayId));
}
} catch (e) {
this.logger.warn("Failed to commit Pub/Sub replay id", {
error: e instanceof Error ? e.message : String(e),
});
}
// Execute the same orchestration used by the webhook path, but without payload validation
await this.orchestrator.executeFulfillment(sfOrderId, {}, idempotencyKey);
this.logger.log("Provisioning job completed", { sfOrderId });
}
}

View File

@ -8,7 +8,6 @@ export interface ProvisioningJobData {
sfOrderId: string;
idempotencyKey: string;
correlationId?: string;
pubsubReplayId?: number;
}
@Injectable()
@ -19,10 +18,7 @@ export class ProvisioningQueueService {
) {}
async enqueue(job: ProvisioningJobData): Promise<void> {
const jobId =
typeof job.pubsubReplayId === "number"
? `sf:${job.sfOrderId}:replay:${job.pubsubReplayId}`
: `sf:${job.sfOrderId}`;
const jobId = job.idempotencyKey || `sf:${job.sfOrderId}`;
try {
await this.queue.add("provision", job, {
jobId,
@ -35,7 +31,6 @@ export class ProvisioningQueueService {
sfOrderId: job.sfOrderId,
idempotencyKey: job.idempotencyKey,
correlationId: job.correlationId,
pubsubReplayId: job.pubsubReplayId,
});
} catch (err: unknown) {
const msg = err instanceof Error ? err.message : String(err);

View File

@ -25,11 +25,13 @@ Enqueues provisioning job
```
Salesforce Order Status → "Approved"
CDC: OrderChangeEvent (automatic)
Salesforce Flow: set Activation_Status__c = "Activating" (clear errors)
CDC: OrderChangeEvent (Activation_Status__c)
OrderCdcSubscriber receives event
Detects: Status changed to "Approved"
Detects: Activation_Status__c changed to "Activating"
Enqueues provisioning job
```
@ -41,7 +43,7 @@ Enqueues provisioning job
### **OrderCdcSubscriber Now Handles:**
1. **Order Provisioning** (NEW)
- Detects when Status changes to "Approved" or "Reactivate"
- Detects when Salesforce sets `Activation_Status__c` to `"Activating"`
- Validates order is not already provisioning/provisioned
- Enqueues provisioning job
@ -52,16 +54,13 @@ Enqueues provisioning job
### **Key Guards Added:**
```typescript
// 1. Only trigger for specific statuses
PROVISION_TRIGGER_STATUSES = ["Approved", "Reactivate"]
// 1. Only continue when Activation_Status__c === "Activating"
if (activationStatus !== "Activating") return;
// 2. Don't trigger if already provisioning
if (activationStatus === "Activating") return;
// 2. (Optional) If Status is present, require Approved/Reactivate
if (status && !PROVISION_TRIGGER_STATUSES.has(status)) return;
// 3. Don't trigger if already activated
if (activationStatus === "Activated") return;
// 4. Don't trigger if already has WHMCS Order ID
// 3. Don't trigger if already has WHMCS Order ID
if (whmcsOrderId) return;
```
@ -74,55 +73,54 @@ if (whmcsOrderId) return;
```
TIME: 10:00:00 - Admin clicks "Approve" in Salesforce
Order Status: "Pending Review" → "Approved"
Salesforce Flow:
- Sets Activation_Status__c = "Activating"
- Clears Activation_Error_* fields
TIME: 10:00:01 - CDC event published (automatic)
{
"Id": "801xxx",
"Status": "Approved",
"Activation_Status__c": null,
"WHMCS_Order_ID__c": null,
"changedFields": ["Status"]
"Activation_Status__c": "Activating",
"Activation_Error_Code__c": null,
"Activation_Error_Message__c": null,
"changedFields": ["Activation_Status__c", "Activation_Error_Code__c", "Activation_Error_Message__c"]
}
TIME: 10:00:02 - OrderCdcSubscriber.handleOrderEvent()
Step 1: Check if Status field changed
→ Yes, Status in changedFields
Step 1: Check if Activation_Status__c changed
→ Yes, value is "Activating"
Step 2: handleStatusChange()
newStatus = "Approved" ✅
activationStatus = null ✅ (not provisioning)
Step 2: handleActivationStatusChange()
activationStatus = "Activating" ✅
status (if provided) = "Approved" ✅
→ whmcsOrderId = null ✅ (not provisioned)
Step 3: Enqueue provisioning job
provisioningQueue.enqueue({
sfOrderId: "801xxx",
idempotencyKey: "cdc-status-1699999999999-801xxx",
idempotencyKey: "cdc-activation-1699999999999-801xxx",
correlationId: "cdc-order-801xxx"
})
Log: "Order status changed to provision trigger via CDC"
Log: "Order activation moved to Activating via CDC, enqueuing fulfillment"
TIME: 10:00:03 - Provisioning processor picks up job
Executes fulfillment
Executes fulfillment (Activation_Status__c already = "Activating" so guard passes)
Updates Salesforce:
- Activation_Status__c: "Activating"
- Then: "Activated"
- WHMCS_Order_ID__c: "12345"
- Status: "Completed"
- Activation_Status__c: "Activated"
- WHMCS_Order_ID__c: "12345"
TIME: 10:00:05 - CDC events for status updates
Event 1: Activation_Status__c changed
→ OrderCdcSubscriber checks
→ Is internal field → Skip cache invalidation ✅
→ Status didn't change → Skip provisioning ✅
Event 1: Activation_Status__c changed to "Activated"
→ Internal field → Skip cache invalidation ✅
→ Not "Activating" → Skip provisioning ✅
Event 2: Status → "Completed"
→ OrderCdcSubscriber checks
→ Status changed but not "Approved" → Skip provisioning ✅
→ Status changed but not "Approved"/"Reactivate" → Skip provisioning ✅
→ Customer-facing field → Invalidate cache ✅
```

View File

@ -96,7 +96,6 @@ SF_QUEUE_LONG_RUNNING_TIMEOUT_MS=600000
# Salesforce Platform Events (Provisioning)
SF_EVENTS_ENABLED=true
SF_PROVISION_EVENT_CHANNEL=/event/Order_Fulfilment_Requested__e
SF_CATALOG_EVENT_CHANNEL=/event/Product_and_Pricebook_Change__e
SF_ACCOUNT_EVENT_CHANNEL=/event/Account_Internet_Eligibility_Update__e
SF_ORDER_EVENT_CHANNEL=/event/Order_Fulfilment_Requested__e