Enhance Salesforce request handling and metrics tracking
- Introduced new metrics for daily API usage, including dailyApiLimit and dailyUsagePercent, to monitor API consumption effectively. - Added route-level metrics tracking to capture request success and failure rates for better performance insights. - Implemented degradation state management to handle rate limits and usage thresholds, improving resilience during high load. - Enhanced SalesforceRequestQueueService to include detailed logging for route-level metrics, aiding in debugging and performance analysis. - Updated Salesforce module to export new SalesforceReadThrottleGuard for improved request rate limiting across services. - Refactored various services to utilize the new metrics and logging features, ensuring consistent behavior and improved maintainability.
This commit is contained in:
parent
b3e3689315
commit
c79488a6a4
@ -13,6 +13,10 @@ export interface SalesforceQueueMetrics {
|
||||
lastRequestTime?: Date;
|
||||
lastErrorTime?: Date;
|
||||
lastRateLimitTime?: Date;
|
||||
dailyApiLimit?: number;
|
||||
dailyUsagePercent?: number;
|
||||
routeBreakdown?: Record<string, SalesforceRouteMetricsSnapshot>;
|
||||
degradation?: SalesforceDegradationSnapshot;
|
||||
}
|
||||
|
||||
export interface SalesforceRequestOptions {
|
||||
@ -21,6 +25,30 @@ export interface SalesforceRequestOptions {
|
||||
retryAttempts?: number; // Number of retry attempts
|
||||
retryDelay?: number; // Base delay between retries in ms
|
||||
isLongRunning?: boolean; // Mark as long-running request (>20s expected)
|
||||
label?: string; // Logical label for route-level metrics
|
||||
}
|
||||
|
||||
interface SalesforceRouteMetricsInternal {
|
||||
label: string;
|
||||
totalRequests: number;
|
||||
failedRequests: number;
|
||||
lastSuccessTime?: Date;
|
||||
lastErrorTime?: Date;
|
||||
}
|
||||
|
||||
interface SalesforceRouteMetricsSnapshot {
|
||||
totalRequests: number;
|
||||
failedRequests: number;
|
||||
successRate: number;
|
||||
lastSuccessTime?: Date;
|
||||
lastErrorTime?: Date;
|
||||
}
|
||||
|
||||
interface SalesforceDegradationSnapshot {
|
||||
degraded: boolean;
|
||||
reason: "rate-limit" | "usage-threshold" | null;
|
||||
cooldownExpiresAt?: Date;
|
||||
usagePercent: number;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -76,12 +104,24 @@ export class SalesforceRequestQueueService implements OnModuleInit, OnModuleDest
|
||||
private readonly executionTimes: number[] = [];
|
||||
private readonly maxMetricsHistory = 100;
|
||||
private dailyUsageResetTime: Date;
|
||||
private readonly routeMetrics = new Map<string, SalesforceRouteMetricsInternal>();
|
||||
private readonly usageWarningLevels = [0.7, 0.85, 0.95];
|
||||
private highestUsageWarningIssued = 0;
|
||||
private readonly dailyApiLimit: number;
|
||||
private readonly rateLimitCooldownMs: number;
|
||||
private degradeState: {
|
||||
until: Date | null;
|
||||
reason: "rate-limit" | "usage-threshold" | null;
|
||||
} = { until: null, reason: null };
|
||||
|
||||
constructor(
|
||||
@Inject(Logger) private readonly logger: Logger,
|
||||
private readonly configService: ConfigService
|
||||
) {
|
||||
this.dailyUsageResetTime = this.getNextDayReset();
|
||||
this.dailyApiLimit = this.resolveDailyApiLimit();
|
||||
this.rateLimitCooldownMs =
|
||||
this.parseNumericConfig(this.configService.get("SF_RATE_LIMIT_COOLDOWN_MS")) ?? 60000;
|
||||
}
|
||||
|
||||
private async loadPQueue(): Promise<PQueueCtor> {
|
||||
@ -199,20 +239,23 @@ export class SalesforceRequestQueueService implements OnModuleInit, OnModuleDest
|
||||
options: SalesforceRequestOptions = {}
|
||||
): Promise<T> {
|
||||
const { standardQueue, longRunningQueue } = await this.ensureQueuesInitialized();
|
||||
// Check daily API usage
|
||||
// Check daily usage
|
||||
this.checkDailyUsage();
|
||||
|
||||
const startTime = Date.now();
|
||||
const requestId = this.generateRequestId();
|
||||
const isLongRunning = options.isLongRunning || false;
|
||||
const queue = isLongRunning ? longRunningQueue : standardQueue;
|
||||
const label = this.normalizeLabel(options.label);
|
||||
|
||||
try {
|
||||
const result = (await queue.add(
|
||||
async () => {
|
||||
this.metrics.totalRequests++;
|
||||
this.metrics.dailyApiUsage++;
|
||||
this.recordRouteStart(label);
|
||||
this.updateQueueMetrics();
|
||||
this.maybeWarnOnUsage();
|
||||
|
||||
this.logger.debug("Executing Salesforce request", {
|
||||
requestId,
|
||||
@ -221,6 +264,7 @@ export class SalesforceRequestQueueService implements OnModuleInit, OnModuleDest
|
||||
pending: queue.pending,
|
||||
priority: options.priority || 0,
|
||||
dailyUsage: this.metrics.dailyApiUsage,
|
||||
label,
|
||||
});
|
||||
|
||||
const waitTime = Date.now() - startTime;
|
||||
@ -235,6 +279,7 @@ export class SalesforceRequestQueueService implements OnModuleInit, OnModuleDest
|
||||
this.recordExecutionTime(executionTime);
|
||||
this.metrics.completedRequests++;
|
||||
this.metrics.lastRequestTime = new Date();
|
||||
this.recordRouteSuccess(label);
|
||||
|
||||
this.logger.debug("Salesforce request completed", {
|
||||
requestId,
|
||||
@ -242,6 +287,7 @@ export class SalesforceRequestQueueService implements OnModuleInit, OnModuleDest
|
||||
waitTime,
|
||||
executionTime,
|
||||
totalTime: Date.now() - startTime,
|
||||
label,
|
||||
});
|
||||
|
||||
return response;
|
||||
@ -250,13 +296,16 @@ export class SalesforceRequestQueueService implements OnModuleInit, OnModuleDest
|
||||
this.recordExecutionTime(executionTime);
|
||||
this.metrics.failedRequests++;
|
||||
this.metrics.lastErrorTime = new Date();
|
||||
this.recordRouteFailure(label);
|
||||
|
||||
// Check if it's a rate limit error
|
||||
if (this.isRateLimitError(error)) {
|
||||
this.metrics.lastRateLimitTime = new Date();
|
||||
this.activateDegradeWindow("rate-limit");
|
||||
this.logger.warn("Salesforce rate limit encountered", {
|
||||
requestId,
|
||||
dailyUsage: this.metrics.dailyApiUsage,
|
||||
label,
|
||||
});
|
||||
}
|
||||
|
||||
@ -266,6 +315,7 @@ export class SalesforceRequestQueueService implements OnModuleInit, OnModuleDest
|
||||
waitTime,
|
||||
executionTime,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
label,
|
||||
});
|
||||
|
||||
throw error;
|
||||
@ -285,15 +335,28 @@ export class SalesforceRequestQueueService implements OnModuleInit, OnModuleDest
|
||||
/**
|
||||
* Execute high-priority Salesforce request (jumps queue)
|
||||
*/
|
||||
async executeHighPriority<T>(requestFn: () => Promise<T>, isLongRunning = false): Promise<T> {
|
||||
return this.execute(requestFn, { priority: 10, isLongRunning });
|
||||
async executeHighPriority<T>(
|
||||
requestFn: () => Promise<T>,
|
||||
options: SalesforceRequestOptions = {}
|
||||
): Promise<T> {
|
||||
return this.execute(requestFn, {
|
||||
...options,
|
||||
priority: options.priority ?? 10,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute long-running Salesforce request (uses separate queue)
|
||||
*/
|
||||
async executeLongRunning<T>(requestFn: () => Promise<T>): Promise<T> {
|
||||
return this.execute(requestFn, { isLongRunning: true, timeout: 600000 });
|
||||
async executeLongRunning<T>(
|
||||
requestFn: () => Promise<T>,
|
||||
options: SalesforceRequestOptions = {}
|
||||
): Promise<T> {
|
||||
return this.execute(requestFn, {
|
||||
...options,
|
||||
isLongRunning: true,
|
||||
timeout: options.timeout ?? 600000,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
@ -301,7 +364,16 @@ export class SalesforceRequestQueueService implements OnModuleInit, OnModuleDest
|
||||
*/
|
||||
getMetrics(): SalesforceQueueMetrics {
|
||||
this.updateQueueMetrics();
|
||||
return { ...this.metrics };
|
||||
const dailyUsagePercent =
|
||||
this.dailyApiLimit > 0 ? this.metrics.dailyApiUsage / this.dailyApiLimit : 0;
|
||||
|
||||
return {
|
||||
...this.metrics,
|
||||
dailyApiLimit: this.dailyApiLimit,
|
||||
dailyUsagePercent,
|
||||
routeBreakdown: this.getRouteMetricsSnapshot(),
|
||||
degradation: this.getDegradationState(),
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
@ -320,9 +392,9 @@ export class SalesforceRequestQueueService implements OnModuleInit, OnModuleDest
|
||||
const errorRate =
|
||||
this.metrics.totalRequests > 0 ? this.metrics.failedRequests / this.metrics.totalRequests : 0;
|
||||
|
||||
// Estimate daily limit (conservative: 150,000 for ~50 users)
|
||||
const estimatedDailyLimit = 150000;
|
||||
const dailyUsagePercent = this.metrics.dailyApiUsage / estimatedDailyLimit;
|
||||
const effectiveDailyLimit = this.dailyApiLimit > 0 ? this.dailyApiLimit : 150000;
|
||||
const dailyUsagePercent =
|
||||
effectiveDailyLimit > 0 ? this.metrics.dailyApiUsage / effectiveDailyLimit : 0;
|
||||
|
||||
let status: "healthy" | "degraded" | "unhealthy" = "healthy";
|
||||
|
||||
@ -350,13 +422,19 @@ export class SalesforceRequestQueueService implements OnModuleInit, OnModuleDest
|
||||
usage: number;
|
||||
resetTime: Date;
|
||||
hoursUntilReset: number;
|
||||
limit: number;
|
||||
usagePercent: number;
|
||||
} {
|
||||
const limit = this.dailyApiLimit;
|
||||
const usagePercent = limit > 0 ? this.metrics.dailyApiUsage / limit : 0;
|
||||
return {
|
||||
usage: this.metrics.dailyApiUsage,
|
||||
resetTime: this.dailyUsageResetTime,
|
||||
hoursUntilReset: Math.ceil(
|
||||
(this.dailyUsageResetTime.getTime() - Date.now()) / (1000 * 60 * 60)
|
||||
),
|
||||
limit,
|
||||
usagePercent,
|
||||
};
|
||||
}
|
||||
|
||||
@ -445,9 +523,12 @@ export class SalesforceRequestQueueService implements OnModuleInit, OnModuleDest
|
||||
if (now >= this.dailyUsageResetTime) {
|
||||
this.metrics.dailyApiUsage = 0;
|
||||
this.dailyUsageResetTime = this.getNextDayReset();
|
||||
this.highestUsageWarningIssued = 0;
|
||||
this.degradeState = { until: null, reason: null };
|
||||
|
||||
this.logger.log("Daily Salesforce API usage reset", {
|
||||
resetTime: this.dailyUsageResetTime,
|
||||
dailyApiLimit: this.dailyApiLimit,
|
||||
});
|
||||
}
|
||||
}
|
||||
@ -459,6 +540,33 @@ export class SalesforceRequestQueueService implements OnModuleInit, OnModuleDest
|
||||
return tomorrow;
|
||||
}
|
||||
|
||||
private resolveDailyApiLimit(): number {
|
||||
const configuredLimit = this.parseNumericConfig(this.configService.get("SF_DAILY_API_LIMIT"));
|
||||
if (configuredLimit && configuredLimit > 0) {
|
||||
return configuredLimit;
|
||||
}
|
||||
|
||||
const userCount =
|
||||
this.parseNumericConfig(this.configService.get("SF_FULL_USER_COUNT")) ||
|
||||
this.parseNumericConfig(this.configService.get("SF_USERS")) ||
|
||||
50;
|
||||
|
||||
return 100000 + userCount * 1000;
|
||||
}
|
||||
|
||||
private parseNumericConfig(value: unknown): number | undefined {
|
||||
if (typeof value === "number" && Number.isFinite(value)) {
|
||||
return value;
|
||||
}
|
||||
if (typeof value === "string" && value.trim().length > 0) {
|
||||
const parsed = Number(value);
|
||||
if (!Number.isNaN(parsed)) {
|
||||
return parsed;
|
||||
}
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
private setupQueueListeners(): void {
|
||||
if (!this.standardQueue || !this.longRunningQueue) {
|
||||
return;
|
||||
@ -494,6 +602,8 @@ export class SalesforceRequestQueueService implements OnModuleInit, OnModuleDest
|
||||
return;
|
||||
}
|
||||
|
||||
this.clearDegradeWindowIfElapsed();
|
||||
|
||||
this.metrics.queueSize = this.standardQueue.size + this.longRunningQueue.size;
|
||||
this.metrics.pendingRequests = this.standardQueue.pending + this.longRunningQueue.pending;
|
||||
|
||||
@ -526,4 +636,133 @@ export class SalesforceRequestQueueService implements OnModuleInit, OnModuleDest
|
||||
private generateRequestId(): string {
|
||||
return `sf_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
|
||||
}
|
||||
|
||||
private normalizeLabel(label?: string): string {
|
||||
if (!label) {
|
||||
return "generic";
|
||||
}
|
||||
const trimmed = label.trim();
|
||||
return trimmed.length > 0 ? trimmed : "generic";
|
||||
}
|
||||
|
||||
private recordRouteStart(label: string): void {
|
||||
const metric = this.getOrCreateRouteMetric(label);
|
||||
metric.totalRequests += 1;
|
||||
}
|
||||
|
||||
private recordRouteSuccess(label: string): void {
|
||||
const metric = this.getOrCreateRouteMetric(label);
|
||||
metric.lastSuccessTime = new Date();
|
||||
}
|
||||
|
||||
private recordRouteFailure(label: string): void {
|
||||
const metric = this.getOrCreateRouteMetric(label);
|
||||
metric.failedRequests += 1;
|
||||
metric.lastErrorTime = new Date();
|
||||
}
|
||||
|
||||
private getOrCreateRouteMetric(label: string): SalesforceRouteMetricsInternal {
|
||||
const existing = this.routeMetrics.get(label);
|
||||
if (existing) {
|
||||
return existing;
|
||||
}
|
||||
|
||||
const metric: SalesforceRouteMetricsInternal = {
|
||||
label,
|
||||
totalRequests: 0,
|
||||
failedRequests: 0,
|
||||
};
|
||||
this.routeMetrics.set(label, metric);
|
||||
return metric;
|
||||
}
|
||||
|
||||
private getRouteMetricsSnapshot(): Record<string, SalesforceRouteMetricsSnapshot> {
|
||||
const snapshot: Record<string, SalesforceRouteMetricsSnapshot> = {};
|
||||
|
||||
for (const [label, metric] of this.routeMetrics.entries()) {
|
||||
const total = metric.totalRequests;
|
||||
const failures = metric.failedRequests;
|
||||
const successRate = total > 0 ? (total - failures) / total : 1;
|
||||
|
||||
snapshot[label] = {
|
||||
totalRequests: total,
|
||||
failedRequests: failures,
|
||||
successRate,
|
||||
lastSuccessTime: metric.lastSuccessTime,
|
||||
lastErrorTime: metric.lastErrorTime,
|
||||
};
|
||||
}
|
||||
|
||||
return snapshot;
|
||||
}
|
||||
|
||||
private maybeWarnOnUsage(): void {
|
||||
if (this.dailyApiLimit <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const usagePercent = this.metrics.dailyApiUsage / this.dailyApiLimit;
|
||||
if (usagePercent >= 0.95) {
|
||||
this.activateDegradeWindow("usage-threshold");
|
||||
}
|
||||
|
||||
const threshold = this.usageWarningLevels
|
||||
.slice()
|
||||
.reverse()
|
||||
.find(level => usagePercent >= level && level > this.highestUsageWarningIssued);
|
||||
|
||||
if (threshold !== undefined) {
|
||||
this.highestUsageWarningIssued = threshold;
|
||||
this.logger.warn("Salesforce daily API usage approaching limit", {
|
||||
usage: this.metrics.dailyApiUsage,
|
||||
limit: this.dailyApiLimit,
|
||||
usagePercent,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
getDegradationState(): SalesforceDegradationSnapshot {
|
||||
this.clearDegradeWindowIfElapsed();
|
||||
const usagePercent = this.dailyApiLimit > 0 ? this.metrics.dailyApiUsage / this.dailyApiLimit : 0;
|
||||
return {
|
||||
degraded: this.degradeState.until !== null,
|
||||
reason: this.degradeState.reason,
|
||||
cooldownExpiresAt: this.degradeState.until ?? undefined,
|
||||
usagePercent,
|
||||
};
|
||||
}
|
||||
|
||||
isDegraded(): boolean {
|
||||
return this.getDegradationState().degraded;
|
||||
}
|
||||
|
||||
shouldThrottleLowPriority(): boolean {
|
||||
return this.isDegraded();
|
||||
}
|
||||
|
||||
private activateDegradeWindow(reason: "rate-limit" | "usage-threshold"): void {
|
||||
if (this.rateLimitCooldownMs <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const now = Date.now();
|
||||
const newUntil = new Date(now + this.rateLimitCooldownMs);
|
||||
const active = this.degradeState.until && now < this.degradeState.until.getTime();
|
||||
|
||||
if (!active || this.degradeState.reason !== reason) {
|
||||
this.logger.warn("Salesforce circuit breaker engaged", {
|
||||
reason,
|
||||
cooldownMs: this.rateLimitCooldownMs,
|
||||
until: newUntil,
|
||||
});
|
||||
}
|
||||
|
||||
this.degradeState = { until: newUntil, reason };
|
||||
}
|
||||
|
||||
private clearDegradeWindowIfElapsed(): void {
|
||||
if (this.degradeState.until && Date.now() >= this.degradeState.until.getTime()) {
|
||||
this.degradeState = { until: null, reason: null };
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,32 @@
|
||||
import { CanActivate, ExecutionContext, Inject, Injectable, TooManyRequestsException } from "@nestjs/common";
|
||||
import type { Request } from "express";
|
||||
import { Logger } from "nestjs-pino";
|
||||
import { SalesforceRequestQueueService } from "@bff/core/queue/services/salesforce-request-queue.service";
|
||||
|
||||
@Injectable()
|
||||
export class SalesforceReadThrottleGuard implements CanActivate {
|
||||
constructor(
|
||||
private readonly queue: SalesforceRequestQueueService,
|
||||
@Inject(Logger) private readonly logger: Logger
|
||||
) {}
|
||||
|
||||
canActivate(context: ExecutionContext): boolean {
|
||||
const state = this.queue.getDegradationState();
|
||||
if (!state.degraded) {
|
||||
return true;
|
||||
}
|
||||
|
||||
const request = context.switchToHttp().getRequest<Request>();
|
||||
this.logger.warn("Throttling Salesforce-backed read due to degraded state", {
|
||||
path: request?.originalUrl ?? request?.url,
|
||||
reason: state.reason,
|
||||
cooldownExpiresAt: state.cooldownExpiresAt,
|
||||
usagePercent: state.usagePercent,
|
||||
});
|
||||
|
||||
throw new TooManyRequestsException(
|
||||
"We're experiencing high load right now. Please try again in a moment."
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@ -6,6 +6,7 @@ import { SalesforceConnection } from "./services/salesforce-connection.service";
|
||||
import { SalesforceAccountService } from "./services/salesforce-account.service";
|
||||
import { SalesforceOrderService } from "./services/salesforce-order.service";
|
||||
import { OrderFieldConfigModule } from "@bff/modules/orders/config/order-field-config.module";
|
||||
import { SalesforceReadThrottleGuard } from "./guards/salesforce-read-throttle.guard";
|
||||
|
||||
@Module({
|
||||
imports: [QueueModule, ConfigModule, OrderFieldConfigModule],
|
||||
@ -14,7 +15,13 @@ import { OrderFieldConfigModule } from "@bff/modules/orders/config/order-field-c
|
||||
SalesforceAccountService,
|
||||
SalesforceOrderService,
|
||||
SalesforceService,
|
||||
SalesforceReadThrottleGuard,
|
||||
],
|
||||
exports: [
|
||||
SalesforceService,
|
||||
SalesforceConnection,
|
||||
SalesforceOrderService,
|
||||
SalesforceReadThrottleGuard,
|
||||
],
|
||||
exports: [SalesforceService, SalesforceConnection, SalesforceOrderService],
|
||||
})
|
||||
export class SalesforceModule {}
|
||||
|
||||
@ -56,6 +56,12 @@ export class SalesforceService implements OnModuleInit {
|
||||
return this.accountService.findByCustomerNumber(customerNumber);
|
||||
}
|
||||
|
||||
async findAccountWithDetailsByCustomerNumber(
|
||||
customerNumber: string
|
||||
): Promise<{ id: string; WH_Account__c?: string | null; Name?: string | null } | null> {
|
||||
return this.accountService.findWithDetailsByCustomerNumber(customerNumber);
|
||||
}
|
||||
|
||||
async getAccountDetails(
|
||||
accountId: string
|
||||
): Promise<{ id: string; WH_Account__c?: string | null; Name?: string | null } | null> {
|
||||
@ -122,7 +128,8 @@ export class SalesforceService implements OnModuleInit {
|
||||
AccountId, Account.Name
|
||||
FROM Order
|
||||
WHERE Id = '${orderId}'
|
||||
LIMIT 1`
|
||||
LIMIT 1`,
|
||||
{ label: "orders:integration:getOrder" }
|
||||
)) as { records: SalesforceOrderRecord[]; totalSize: number };
|
||||
|
||||
return result.records?.[0] || null;
|
||||
|
||||
@ -29,7 +29,8 @@ export class SalesforceAccountService {
|
||||
|
||||
try {
|
||||
const result = (await this.connection.query(
|
||||
`SELECT Id FROM Account WHERE SF_Account_No__c = '${this.safeSoql(validCustomerNumber)}'`
|
||||
`SELECT Id FROM Account WHERE SF_Account_No__c = '${this.safeSoql(validCustomerNumber)}'`,
|
||||
{ label: "auth:findAccountByCustomerNumber" }
|
||||
)) as SalesforceResponse<SalesforceAccountRecord>;
|
||||
return result.totalSize > 0 ? { id: result.records[0]?.Id ?? "" } : null;
|
||||
} catch (error) {
|
||||
@ -40,6 +41,39 @@ export class SalesforceAccountService {
|
||||
}
|
||||
}
|
||||
|
||||
async findWithDetailsByCustomerNumber(
|
||||
customerNumber: string
|
||||
): Promise<{
|
||||
id: string;
|
||||
Name?: string | null;
|
||||
WH_Account__c?: string | null;
|
||||
} | null> {
|
||||
const validCustomerNumber = customerNumberSchema.parse(customerNumber);
|
||||
|
||||
try {
|
||||
const result = (await this.connection.query(
|
||||
`SELECT Id, Name, WH_Account__c FROM Account WHERE SF_Account_No__c = '${this.safeSoql(validCustomerNumber)}'`,
|
||||
{ label: "auth:findAccountWithDetails" }
|
||||
)) as SalesforceResponse<SalesforceAccountRecord>;
|
||||
|
||||
if (result.totalSize === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const record = result.records[0];
|
||||
return {
|
||||
id: record?.Id ?? "",
|
||||
Name: record?.Name,
|
||||
WH_Account__c: record?.WH_Account__c || undefined,
|
||||
};
|
||||
} catch (error) {
|
||||
this.logger.error("Failed to find account with details by customer number", {
|
||||
error: getErrorMessage(error),
|
||||
});
|
||||
throw new Error("Failed to find account");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get account details including WH_Account__c field
|
||||
* Used in signup workflow to check if account is already linked to WHMCS
|
||||
@ -51,7 +85,8 @@ export class SalesforceAccountService {
|
||||
|
||||
try {
|
||||
const result = (await this.connection.query(
|
||||
`SELECT Id, Name, WH_Account__c FROM Account WHERE Id = '${this.safeSoql(validAccountId)}'`
|
||||
`SELECT Id, Name, WH_Account__c FROM Account WHERE Id = '${this.safeSoql(validAccountId)}'`,
|
||||
{ label: "auth:getAccountDetails" }
|
||||
)) as SalesforceResponse<SalesforceAccountRecord>;
|
||||
|
||||
if (result.totalSize === 0) {
|
||||
|
||||
@ -53,7 +53,11 @@ describe("SalesforceConnection", () => {
|
||||
|
||||
expect(requestQueue.execute).toHaveBeenCalledTimes(1);
|
||||
const [, options] = (requestQueue.execute as jest.Mock).mock.calls[0];
|
||||
expect(options).toMatchObject({ priority: 8, isLongRunning: false });
|
||||
expect(options).toMatchObject({
|
||||
priority: 8,
|
||||
isLongRunning: false,
|
||||
label: "salesforce:query:account",
|
||||
});
|
||||
expect(queryMock).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
@ -64,5 +68,7 @@ describe("SalesforceConnection", () => {
|
||||
await sobject.create({ Name: "Test" });
|
||||
|
||||
expect(requestQueue.executeHighPriority).toHaveBeenCalledTimes(1);
|
||||
const [, options] = (requestQueue.executeHighPriority as jest.Mock).mock.calls[0];
|
||||
expect(options).toMatchObject({ label: "salesforce:sobject:Order:create" });
|
||||
});
|
||||
});
|
||||
|
||||
@ -241,9 +241,10 @@ export class SalesforceConnection {
|
||||
}
|
||||
|
||||
// Expose connection methods with automatic re-authentication
|
||||
async query(soql: string): Promise<unknown> {
|
||||
async query(soql: string, options: { label?: string } = {}): Promise<unknown> {
|
||||
const priority = this.getQueryPriority(soql);
|
||||
const isLongRunning = this.isLongRunningQuery(soql);
|
||||
const label = options.label ?? this.deriveQueryLabel(soql);
|
||||
|
||||
try {
|
||||
return await this.requestQueue.execute(
|
||||
@ -283,7 +284,7 @@ export class SalesforceConnection {
|
||||
throw error;
|
||||
}
|
||||
},
|
||||
{ priority, isLongRunning }
|
||||
{ priority, isLongRunning, label }
|
||||
);
|
||||
} catch (error: unknown) {
|
||||
throw error;
|
||||
@ -351,7 +352,7 @@ export class SalesforceConnection {
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
});
|
||||
}, { label: `salesforce:sobject:${type}:create` });
|
||||
},
|
||||
|
||||
update: async (data: object & { Id: string }) => {
|
||||
@ -396,11 +397,66 @@ export class SalesforceConnection {
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
});
|
||||
}, { label: `salesforce:sobject:${type}:update` });
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
async compositeTree<T>(
|
||||
sobjectType: string,
|
||||
body: unknown,
|
||||
options: { allOrNone?: boolean; priority?: number; label?: string } = {}
|
||||
): Promise<T> {
|
||||
const priority = options.priority ?? 5;
|
||||
const allOrNone = options.allOrNone ?? true;
|
||||
const path = this.buildCompositeTreePath(sobjectType, allOrNone);
|
||||
const label = options.label ?? `salesforce:composite:${sobjectType}`;
|
||||
|
||||
return this.requestQueue.execute(async () => {
|
||||
await this.ensureConnected();
|
||||
|
||||
try {
|
||||
return (await this.connection.requestPost(path, body)) as T;
|
||||
} catch (error) {
|
||||
if (this.isSessionExpiredError(error)) {
|
||||
const reAuthStartTime = Date.now();
|
||||
this.logger.warn("Salesforce session expired during composite tree request, retrying", {
|
||||
sobjectType,
|
||||
originalError: getErrorMessage(error),
|
||||
});
|
||||
|
||||
await this.connect(true);
|
||||
const reAuthDuration = Date.now() - reAuthStartTime;
|
||||
|
||||
this.logger.debug("Retrying composite tree request after re-authentication", {
|
||||
sobjectType,
|
||||
reAuthDuration,
|
||||
});
|
||||
|
||||
return (await this.connection.requestPost(path, body)) as T;
|
||||
}
|
||||
|
||||
throw error;
|
||||
}
|
||||
}, { priority, label });
|
||||
}
|
||||
|
||||
private buildCompositeTreePath(sobjectType: string, allOrNone: boolean): string {
|
||||
const version = this.connection.version || "58.0";
|
||||
const normalizedVersion = version.startsWith("v") ? version.slice(1) : version;
|
||||
const encodedType = encodeURIComponent(sobjectType);
|
||||
const query = `?allOrNone=${allOrNone ? "true" : "false"}`;
|
||||
return `/services/data/v${normalizedVersion}/composite/tree/${encodedType}${query}`;
|
||||
}
|
||||
|
||||
private deriveQueryLabel(soql: string): string {
|
||||
const match = /from\s+([a-z0-9_]+)/i.exec(soql);
|
||||
if (!match) {
|
||||
return "salesforce:query:unknown";
|
||||
}
|
||||
return `salesforce:query:${match[1]}`;
|
||||
}
|
||||
|
||||
isConnected(): boolean {
|
||||
return !!this.connection.accessToken;
|
||||
}
|
||||
@ -449,7 +505,8 @@ export class SalesforceConnection {
|
||||
/**
|
||||
* Execute a high-priority Salesforce request (jumps queue)
|
||||
*/
|
||||
async queryHighPriority(soql: string): Promise<unknown> {
|
||||
async queryHighPriority(soql: string, options: { label?: string } = {}): Promise<unknown> {
|
||||
const label = options.label ?? `${this.deriveQueryLabel(soql)}:high`;
|
||||
return this.requestQueue.executeHighPriority(async () => {
|
||||
await this.ensureConnected();
|
||||
try {
|
||||
@ -474,7 +531,7 @@ export class SalesforceConnection {
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
});
|
||||
}, { label });
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -40,6 +40,8 @@ export class SalesforceOrderService {
|
||||
private readonly orderFieldMap: OrderFieldMapService
|
||||
) {}
|
||||
|
||||
private readonly compositeOrderReference = "order_ref";
|
||||
|
||||
/**
|
||||
* Get order by ID with full details including order items
|
||||
*/
|
||||
@ -76,8 +78,12 @@ export class SalesforceOrderService {
|
||||
try {
|
||||
// Execute queries in parallel
|
||||
const [orderResult, itemsResult] = await Promise.all([
|
||||
this.sf.query(orderSoql) as Promise<SalesforceResponse<SalesforceOrderRecord>>,
|
||||
this.sf.query(orderItemsSoql) as Promise<SalesforceResponse<SalesforceOrderItemRecord>>,
|
||||
this.sf.query(orderSoql, { label: "orders:getOrderById" }) as Promise<
|
||||
SalesforceResponse<SalesforceOrderRecord>
|
||||
>,
|
||||
this.sf.query(orderItemsSoql, { label: "orders:getOrderItemsById" }) as Promise<
|
||||
SalesforceResponse<SalesforceOrderItemRecord>
|
||||
>,
|
||||
]);
|
||||
|
||||
const order = orderResult.records?.[0];
|
||||
@ -129,6 +135,84 @@ export class SalesforceOrderService {
|
||||
}
|
||||
}
|
||||
|
||||
async createOrderWithItems(
|
||||
orderFields: Record<string, unknown>,
|
||||
items: Array<{ pricebookEntryId: string; unitPrice: number; quantity: number; sku?: string }>
|
||||
): Promise<{ id: string }> {
|
||||
if (!items.length) {
|
||||
this.logger.warn(
|
||||
"No order items provided for composite order creation; falling back to single order create"
|
||||
);
|
||||
return this.createOrder(orderFields);
|
||||
}
|
||||
|
||||
const typeField = this.orderFieldMap.fields.order.type;
|
||||
const payload = {
|
||||
records: [
|
||||
{
|
||||
attributes: { type: "Order", referenceId: this.compositeOrderReference },
|
||||
...orderFields,
|
||||
OrderItems: {
|
||||
records: items.map((item, index) => ({
|
||||
attributes: { type: "OrderItem", referenceId: `order_item_${index + 1}` },
|
||||
PricebookEntryId: item.pricebookEntryId,
|
||||
Quantity: item.quantity,
|
||||
UnitPrice: item.unitPrice,
|
||||
})),
|
||||
},
|
||||
},
|
||||
],
|
||||
} satisfies SalesforceCompositeTreeRequest;
|
||||
|
||||
try {
|
||||
const result = await this.sf.compositeTree<SalesforceCompositeTreeResponse>(
|
||||
"Order",
|
||||
payload,
|
||||
{ allOrNone: true, priority: 8, label: "orders:createWithItems" }
|
||||
);
|
||||
|
||||
if (result.hasErrors) {
|
||||
const errorDetails = this.flattenCompositeErrors(result)
|
||||
.map(err => `[${err.statusCode}] ${err.message}`)
|
||||
.join("; ");
|
||||
|
||||
throw new Error(
|
||||
errorDetails || "Salesforce composite tree returned errors during order creation"
|
||||
);
|
||||
}
|
||||
|
||||
const orderResult = result.results.find(
|
||||
entry => entry.referenceId === this.compositeOrderReference
|
||||
);
|
||||
|
||||
if (!orderResult?.id) {
|
||||
throw new Error("Salesforce composite tree response missing order ID");
|
||||
}
|
||||
|
||||
this.logger.log(
|
||||
{
|
||||
orderId: orderResult.id,
|
||||
orderType: orderFields[typeField],
|
||||
orderItems: items.length,
|
||||
},
|
||||
"Salesforce order created via composite tree"
|
||||
);
|
||||
|
||||
return { id: orderResult.id };
|
||||
} catch (error: unknown) {
|
||||
this.logger.error("Failed to create Salesforce order with composite tree", {
|
||||
error: getErrorMessage(error),
|
||||
orderType: orderFields[typeField],
|
||||
orderItems: items.length,
|
||||
});
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
private flattenCompositeErrors(response: SalesforceCompositeTreeResponse) {
|
||||
return (response.results ?? []).flatMap(entry => entry.errors ?? []);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get orders for a Salesforce account with item summaries
|
||||
*/
|
||||
@ -158,9 +242,9 @@ export class SalesforceOrderService {
|
||||
|
||||
try {
|
||||
// Fetch orders
|
||||
const ordersResult = (await this.sf.query(
|
||||
ordersSoql
|
||||
)) as SalesforceResponse<SalesforceOrderRecord>;
|
||||
const ordersResult = (await this.sf.query(ordersSoql, {
|
||||
label: "orders:listByAccount",
|
||||
})) as SalesforceResponse<SalesforceOrderRecord>;
|
||||
const orders = ordersResult.records || [];
|
||||
|
||||
if (orders.length === 0) {
|
||||
@ -186,9 +270,9 @@ export class SalesforceOrderService {
|
||||
ORDER BY CreatedDate ASC
|
||||
`;
|
||||
|
||||
const itemsResult = (await this.sf.query(
|
||||
itemsSoql
|
||||
)) as SalesforceResponse<SalesforceOrderItemRecord>;
|
||||
const itemsResult = (await this.sf.query(itemsSoql, {
|
||||
label: "orders:listItemsByAccount",
|
||||
})) as SalesforceResponse<SalesforceOrderItemRecord>;
|
||||
const allItems = itemsResult.records || [];
|
||||
|
||||
// Group items by order ID
|
||||
@ -233,3 +317,27 @@ export class SalesforceOrderService {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
interface SalesforceCompositeTreeRequest {
|
||||
records: Array<{
|
||||
attributes: { type: string; referenceId: string };
|
||||
OrderItems?: {
|
||||
records: Array<{
|
||||
attributes: { type: string; referenceId: string };
|
||||
PricebookEntryId: string;
|
||||
Quantity: number;
|
||||
UnitPrice: number;
|
||||
}>;
|
||||
};
|
||||
[key: string]: unknown;
|
||||
}>;
|
||||
}
|
||||
|
||||
interface SalesforceCompositeTreeResponse {
|
||||
hasErrors: boolean;
|
||||
results: Array<{
|
||||
referenceId?: string;
|
||||
id?: string;
|
||||
errors?: Array<{ statusCode: string; message: string; fields?: string[] }>;
|
||||
}>;
|
||||
}
|
||||
|
||||
@ -32,8 +32,21 @@ type _SanitizedPrismaUser = Omit<
|
||||
"passwordHash" | "failedLoginAttempts" | "lockedUntil"
|
||||
>;
|
||||
|
||||
interface SignupAccountSnapshot {
|
||||
id: string;
|
||||
Name?: string | null;
|
||||
WH_Account__c?: string | null;
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
export class SignupWorkflowService {
|
||||
private readonly accountLookupCache = new Map<
|
||||
string,
|
||||
{ value: SignupAccountSnapshot | null; expiresAt: number }
|
||||
>();
|
||||
private readonly accountCacheTtlMs = 30_000;
|
||||
private readonly accountCacheMaxEntries = 500;
|
||||
|
||||
constructor(
|
||||
private readonly usersFacade: UsersFacade,
|
||||
private readonly mappingsService: MappingsService,
|
||||
@ -51,8 +64,8 @@ export class SignupWorkflowService {
|
||||
const { sfNumber } = validateData;
|
||||
|
||||
try {
|
||||
const sfAccount = await this.salesforceService.findAccountByCustomerNumber(sfNumber);
|
||||
if (!sfAccount) {
|
||||
const accountSnapshot = await this.getAccountSnapshot(sfNumber);
|
||||
if (!accountSnapshot) {
|
||||
await this.auditService.logAuthEvent(
|
||||
AuditAction.SIGNUP,
|
||||
undefined,
|
||||
@ -64,12 +77,12 @@ export class SignupWorkflowService {
|
||||
throw new BadRequestException("Customer number not found in Salesforce");
|
||||
}
|
||||
|
||||
const existingMapping = await this.mappingsService.findBySfAccountId(sfAccount.id);
|
||||
const existingMapping = await this.mappingsService.findBySfAccountId(accountSnapshot.id);
|
||||
if (existingMapping) {
|
||||
await this.auditService.logAuthEvent(
|
||||
AuditAction.SIGNUP,
|
||||
undefined,
|
||||
{ sfNumber, sfAccountId: sfAccount.id, reason: "Already has mapping" },
|
||||
{ sfNumber, sfAccountId: accountSnapshot.id, reason: "Already has mapping" },
|
||||
request,
|
||||
false,
|
||||
"Customer number already registered"
|
||||
@ -79,15 +92,14 @@ export class SignupWorkflowService {
|
||||
);
|
||||
}
|
||||
|
||||
const accountDetails = await this.salesforceService.getAccountDetails(sfAccount.id);
|
||||
if (accountDetails?.WH_Account__c && accountDetails.WH_Account__c.trim() !== "") {
|
||||
if (accountSnapshot.WH_Account__c && accountSnapshot.WH_Account__c.trim() !== "") {
|
||||
await this.auditService.logAuthEvent(
|
||||
AuditAction.SIGNUP,
|
||||
undefined,
|
||||
{
|
||||
sfNumber,
|
||||
sfAccountId: sfAccount.id,
|
||||
whAccount: accountDetails.WH_Account__c,
|
||||
sfAccountId: accountSnapshot.id,
|
||||
whAccount: accountSnapshot.WH_Account__c,
|
||||
reason: "WH Account not empty",
|
||||
},
|
||||
request,
|
||||
@ -102,14 +114,14 @@ export class SignupWorkflowService {
|
||||
await this.auditService.logAuthEvent(
|
||||
AuditAction.SIGNUP,
|
||||
undefined,
|
||||
{ sfNumber, sfAccountId: sfAccount.id, step: "validation" },
|
||||
{ sfNumber, sfAccountId: accountSnapshot.id, step: "validation" },
|
||||
request,
|
||||
true
|
||||
);
|
||||
|
||||
return {
|
||||
valid: true,
|
||||
sfAccountId: sfAccount.id,
|
||||
sfAccountId: accountSnapshot.id,
|
||||
message: "Customer number validated successfully",
|
||||
};
|
||||
} catch (error) {
|
||||
@ -176,13 +188,19 @@ export class SignupWorkflowService {
|
||||
const passwordHash = await bcrypt.hash(password, saltRounds);
|
||||
|
||||
try {
|
||||
const sfAccount = await this.salesforceService.findAccountByCustomerNumber(sfNumber);
|
||||
if (!sfAccount) {
|
||||
const accountSnapshot = await this.getAccountSnapshot(sfNumber);
|
||||
if (!accountSnapshot) {
|
||||
throw new BadRequestException(
|
||||
`Salesforce account not found for Customer Number: ${sfNumber}`
|
||||
);
|
||||
}
|
||||
|
||||
if (accountSnapshot.WH_Account__c && accountSnapshot.WH_Account__c.trim() !== "") {
|
||||
throw new ConflictException(
|
||||
"You already have an account. Please use the login page to access your existing account."
|
||||
);
|
||||
}
|
||||
|
||||
let whmcsClient: { clientId: number };
|
||||
try {
|
||||
try {
|
||||
@ -289,7 +307,7 @@ export class SignupWorkflowService {
|
||||
data: {
|
||||
userId: created.id,
|
||||
whmcsClientId: whmcsClient.clientId,
|
||||
sfAccountId: sfAccount.id,
|
||||
sfAccountId: accountSnapshot.id,
|
||||
},
|
||||
});
|
||||
|
||||
@ -415,15 +433,15 @@ export class SignupWorkflowService {
|
||||
return result;
|
||||
}
|
||||
|
||||
const sfAccount = await this.salesforceService.findAccountByCustomerNumber(sfNumber);
|
||||
if (!sfAccount) {
|
||||
const accountSnapshot = await this.getAccountSnapshot(sfNumber);
|
||||
if (!accountSnapshot) {
|
||||
result.nextAction = "fix_input";
|
||||
result.messages.push("Customer number not found in Salesforce");
|
||||
return result;
|
||||
}
|
||||
result.salesforce.accountId = sfAccount.id;
|
||||
result.salesforce.accountId = accountSnapshot.id;
|
||||
|
||||
const existingMapping = await this.mappingsService.findBySfAccountId(sfAccount.id);
|
||||
const existingMapping = await this.mappingsService.findBySfAccountId(accountSnapshot.id);
|
||||
if (existingMapping) {
|
||||
result.salesforce.alreadyMapped = true;
|
||||
result.nextAction = "login";
|
||||
@ -468,4 +486,65 @@ export class SignupWorkflowService {
|
||||
result.messages.push("All checks passed. Ready to create your account.");
|
||||
return result;
|
||||
}
|
||||
|
||||
private async getAccountSnapshot(sfNumber: string): Promise<SignupAccountSnapshot | null> {
|
||||
const normalized = this.normalizeCustomerNumber(sfNumber);
|
||||
if (!normalized) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const now = Date.now();
|
||||
this.pruneExpiredAccountSnapshots(now);
|
||||
|
||||
const cached = this.accountLookupCache.get(normalized);
|
||||
if (cached && cached.expiresAt > now) {
|
||||
return cached.value;
|
||||
}
|
||||
|
||||
if (cached) {
|
||||
this.accountLookupCache.delete(normalized);
|
||||
}
|
||||
|
||||
const resolved = await this.salesforceService.findAccountWithDetailsByCustomerNumber(normalized);
|
||||
if (resolved && resolved.id) {
|
||||
this.storeAccountSnapshot(normalized, resolved);
|
||||
return resolved;
|
||||
}
|
||||
|
||||
this.storeAccountSnapshot(normalized, null);
|
||||
return null;
|
||||
}
|
||||
|
||||
private normalizeCustomerNumber(sfNumber: string): string | null {
|
||||
if (typeof sfNumber !== "string") {
|
||||
return null;
|
||||
}
|
||||
const trimmed = sfNumber.trim();
|
||||
return trimmed.length > 0 ? trimmed : null;
|
||||
}
|
||||
|
||||
private pruneExpiredAccountSnapshots(referenceTime: number): void {
|
||||
for (const [key, entry] of this.accountLookupCache.entries()) {
|
||||
if (entry.expiresAt <= referenceTime) {
|
||||
this.accountLookupCache.delete(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private storeAccountSnapshot(
|
||||
sfNumber: string,
|
||||
snapshot: SignupAccountSnapshot | null
|
||||
): void {
|
||||
this.accountLookupCache.set(sfNumber, {
|
||||
value: snapshot,
|
||||
expiresAt: Date.now() + this.accountCacheTtlMs,
|
||||
});
|
||||
|
||||
if (this.accountLookupCache.size > this.accountCacheMaxEntries) {
|
||||
const oldestKey = this.accountLookupCache.keys().next().value;
|
||||
if (typeof oldestKey === "string") {
|
||||
this.accountLookupCache.delete(oldestKey);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -24,6 +24,7 @@ import { LoginResultInterceptor } from "./interceptors/login-result.interceptor"
|
||||
import { Public } from "../../decorators/public.decorator";
|
||||
import { ZodValidationPipe } from "@customer-portal/validation/nestjs";
|
||||
import type { RequestWithUser } from "@bff/modules/auth/auth.types";
|
||||
import { SalesforceReadThrottleGuard } from "@bff/integrations/salesforce/guards/salesforce-read-throttle.guard";
|
||||
|
||||
// Import Zod schemas from domain
|
||||
import {
|
||||
@ -132,7 +133,7 @@ export class AuthController {
|
||||
|
||||
@Public()
|
||||
@Post("validate-signup")
|
||||
@UseGuards(AuthThrottleGuard)
|
||||
@UseGuards(AuthThrottleGuard, SalesforceReadThrottleGuard)
|
||||
@Throttle({ default: { limit: 20, ttl: 600 } }) // 20 validations per 10 minutes per IP
|
||||
@UsePipes(new ZodValidationPipe(validateSignupRequestSchema))
|
||||
async validateSignup(@Body() validateData: ValidateSignupRequest, @Req() req: Request) {
|
||||
@ -147,7 +148,7 @@ export class AuthController {
|
||||
|
||||
@Public()
|
||||
@Post("signup-preflight")
|
||||
@UseGuards(AuthThrottleGuard)
|
||||
@UseGuards(AuthThrottleGuard, SalesforceReadThrottleGuard)
|
||||
@Throttle({ default: { limit: 20, ttl: 600 } }) // 20 validations per 10 minutes per IP
|
||||
@UsePipes(new ZodValidationPipe(signupRequestSchema))
|
||||
@HttpCode(200)
|
||||
|
||||
22
apps/bff/src/modules/catalog/catalog-health.controller.ts
Normal file
22
apps/bff/src/modules/catalog/catalog-health.controller.ts
Normal file
@ -0,0 +1,22 @@
|
||||
import { Controller, Get } from "@nestjs/common";
|
||||
import { CatalogCacheService } from "./services/catalog-cache.service";
|
||||
|
||||
@Controller("health/catalog")
|
||||
export class CatalogHealthController {
|
||||
constructor(private readonly catalogCache: CatalogCacheService) {}
|
||||
|
||||
@Get("cache")
|
||||
getCacheMetrics() {
|
||||
return {
|
||||
timestamp: new Date().toISOString(),
|
||||
metrics: this.catalogCache.getMetrics(),
|
||||
ttl: {
|
||||
catalogSeconds: 3600,
|
||||
eligibilitySeconds: 900,
|
||||
staticSeconds: 900,
|
||||
volatileSeconds: 60,
|
||||
},
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@ -15,9 +15,10 @@ import {
|
||||
import { InternetCatalogService } from "./services/internet-catalog.service";
|
||||
import { SimCatalogService } from "./services/sim-catalog.service";
|
||||
import { VpnCatalogService } from "./services/vpn-catalog.service";
|
||||
import { SalesforceReadThrottleGuard } from "@bff/integrations/salesforce/guards/salesforce-read-throttle.guard";
|
||||
|
||||
@Controller("catalog")
|
||||
@UseGuards(ThrottlerGuard)
|
||||
@UseGuards(SalesforceReadThrottleGuard, ThrottlerGuard)
|
||||
export class CatalogController {
|
||||
constructor(
|
||||
private internetCatalog: InternetCatalogService,
|
||||
@ -27,7 +28,7 @@ export class CatalogController {
|
||||
|
||||
@Get("internet/plans")
|
||||
@Throttle({ default: { limit: 20, ttl: 60 } }) // 20 requests per minute
|
||||
@Header("Cache-Control", "public, max-age=300, s-maxage=300") // 5 minutes
|
||||
@Header("Cache-Control", "public, max-age=3600, s-maxage=3600") // 60 minutes
|
||||
async getInternetPlans(@Request() req: RequestWithUser): Promise<{
|
||||
plans: InternetPlanCatalogItem[];
|
||||
installations: InternetInstallationCatalogItem[];
|
||||
@ -49,20 +50,20 @@ export class CatalogController {
|
||||
}
|
||||
|
||||
@Get("internet/addons")
|
||||
@Header("Cache-Control", "public, max-age=300, s-maxage=300") // 5 minutes
|
||||
@Header("Cache-Control", "public, max-age=3600, s-maxage=3600") // 60 minutes
|
||||
async getInternetAddons(): Promise<InternetAddonCatalogItem[]> {
|
||||
return this.internetCatalog.getAddons();
|
||||
}
|
||||
|
||||
@Get("internet/installations")
|
||||
@Header("Cache-Control", "public, max-age=300, s-maxage=300") // 5 minutes
|
||||
@Header("Cache-Control", "public, max-age=3600, s-maxage=3600") // 60 minutes
|
||||
async getInternetInstallations(): Promise<InternetInstallationCatalogItem[]> {
|
||||
return this.internetCatalog.getInstallations();
|
||||
}
|
||||
|
||||
@Get("sim/plans")
|
||||
@Throttle({ default: { limit: 20, ttl: 60 } }) // 20 requests per minute
|
||||
@Header("Cache-Control", "public, max-age=300, s-maxage=300") // 5 minutes
|
||||
@Header("Cache-Control", "public, max-age=3600, s-maxage=3600") // 60 minutes
|
||||
async getSimCatalogData(@Request() req: RequestWithUser): Promise<SimCatalogCollection> {
|
||||
const userId = req.user?.id;
|
||||
if (!userId) {
|
||||
@ -83,26 +84,26 @@ export class CatalogController {
|
||||
}
|
||||
|
||||
@Get("sim/activation-fees")
|
||||
@Header("Cache-Control", "public, max-age=300, s-maxage=300") // 5 minutes
|
||||
@Header("Cache-Control", "public, max-age=3600, s-maxage=3600") // 60 minutes
|
||||
async getSimActivationFees(): Promise<SimActivationFeeCatalogItem[]> {
|
||||
return this.simCatalog.getActivationFees();
|
||||
}
|
||||
|
||||
@Get("sim/addons")
|
||||
@Header("Cache-Control", "public, max-age=300, s-maxage=300") // 5 minutes
|
||||
@Header("Cache-Control", "public, max-age=3600, s-maxage=3600") // 60 minutes
|
||||
async getSimAddons(): Promise<SimCatalogProduct[]> {
|
||||
return this.simCatalog.getAddons();
|
||||
}
|
||||
|
||||
@Get("vpn/plans")
|
||||
@Throttle({ default: { limit: 20, ttl: 60 } }) // 20 requests per minute
|
||||
@Header("Cache-Control", "public, max-age=300, s-maxage=300") // 5 minutes
|
||||
@Header("Cache-Control", "public, max-age=3600, s-maxage=3600") // 60 minutes
|
||||
async getVpnPlans(): Promise<VpnCatalogProduct[]> {
|
||||
return this.vpnCatalog.getPlans();
|
||||
}
|
||||
|
||||
@Get("vpn/activation-fees")
|
||||
@Header("Cache-Control", "public, max-age=300, s-maxage=300") // 5 minutes
|
||||
@Header("Cache-Control", "public, max-age=3600, s-maxage=3600") // 60 minutes
|
||||
async getVpnActivationFees(): Promise<VpnCatalogProduct[]> {
|
||||
return this.vpnCatalog.getActivationFees();
|
||||
}
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
import { Module } from "@nestjs/common";
|
||||
import { CatalogController } from "./catalog.controller";
|
||||
import { CatalogHealthController } from "./catalog-health.controller";
|
||||
import { IntegrationsModule } from "@bff/integrations/integrations.module";
|
||||
import { MappingsModule } from "@bff/modules/id-mappings/mappings.module";
|
||||
import { CoreConfigModule } from "@bff/core/config/config.module";
|
||||
@ -13,7 +14,7 @@ import { CatalogCacheService } from "./services/catalog-cache.service";
|
||||
|
||||
@Module({
|
||||
imports: [IntegrationsModule, MappingsModule, CoreConfigModule, CacheModule],
|
||||
controllers: [CatalogController],
|
||||
controllers: [CatalogController, CatalogHealthController],
|
||||
providers: [
|
||||
BaseCatalogService,
|
||||
InternetCatalogService,
|
||||
|
||||
@ -40,7 +40,9 @@ export class BaseCatalogService {
|
||||
context: string
|
||||
): Promise<TRecord[]> {
|
||||
try {
|
||||
const res = (await this.sf.query(soql)) as SalesforceResponse<TRecord>;
|
||||
const res = (await this.sf.query(soql, {
|
||||
label: `catalog:${context.replace(/\s+/g, "_" ).toLowerCase()}`,
|
||||
})) as SalesforceResponse<TRecord>;
|
||||
return res.records ?? [];
|
||||
} catch (error: unknown) {
|
||||
this.logger.error(`Query failed: ${context}`, {
|
||||
|
||||
@ -1,6 +1,19 @@
|
||||
import { Injectable } from "@nestjs/common";
|
||||
import { CacheService } from "@bff/infra/cache/cache.service";
|
||||
|
||||
interface CacheBucketMetrics {
|
||||
hits: number;
|
||||
misses: number;
|
||||
}
|
||||
|
||||
interface CatalogCacheSnapshot {
|
||||
catalog: CacheBucketMetrics;
|
||||
static: CacheBucketMetrics;
|
||||
volatile: CacheBucketMetrics;
|
||||
eligibility: CacheBucketMetrics;
|
||||
invalidations: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Catalog-specific caching service
|
||||
*
|
||||
@ -9,36 +22,54 @@ import { CacheService } from "@bff/infra/cache/cache.service";
|
||||
*/
|
||||
@Injectable()
|
||||
export class CatalogCacheService {
|
||||
// 5 minutes for catalog data (plans, SKUs, pricing)
|
||||
private readonly CATALOG_TTL = 300;
|
||||
// 60 minutes for catalog data (plans, SKUs, pricing)
|
||||
private readonly CATALOG_TTL = 3600;
|
||||
|
||||
// 15 minutes for relatively static data (categories, metadata)
|
||||
private readonly STATIC_TTL = 900;
|
||||
|
||||
// 15 minutes for account eligibility snapshots
|
||||
private readonly ELIGIBILITY_TTL = 900;
|
||||
|
||||
// 1 minute for volatile data (availability, inventory)
|
||||
private readonly VOLATILE_TTL = 60;
|
||||
|
||||
private readonly metrics: CatalogCacheSnapshot = {
|
||||
catalog: { hits: 0, misses: 0 },
|
||||
static: { hits: 0, misses: 0 },
|
||||
volatile: { hits: 0, misses: 0 },
|
||||
eligibility: { hits: 0, misses: 0 },
|
||||
invalidations: 0,
|
||||
};
|
||||
|
||||
constructor(private readonly cache: CacheService) {}
|
||||
|
||||
/**
|
||||
* Get or fetch catalog data with standard 5-minute TTL
|
||||
* Get or fetch catalog data with standard 60-minute TTL
|
||||
*/
|
||||
async getCachedCatalog<T>(key: string, fetchFn: () => Promise<T>): Promise<T> {
|
||||
return this.cache.getOrSet(key, fetchFn, this.CATALOG_TTL);
|
||||
return this.getOrSet("catalog", key, this.CATALOG_TTL, fetchFn);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get or fetch static catalog data with 15-minute TTL
|
||||
*/
|
||||
async getCachedStatic<T>(key: string, fetchFn: () => Promise<T>): Promise<T> {
|
||||
return this.cache.getOrSet(key, fetchFn, this.STATIC_TTL);
|
||||
return this.getOrSet("static", key, this.STATIC_TTL, fetchFn);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get or fetch volatile catalog data with 1-minute TTL
|
||||
*/
|
||||
async getCachedVolatile<T>(key: string, fetchFn: () => Promise<T>): Promise<T> {
|
||||
return this.cache.getOrSet(key, fetchFn, this.VOLATILE_TTL);
|
||||
return this.getOrSet("volatile", key, this.VOLATILE_TTL, fetchFn);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get or fetch eligibility data with 15-minute TTL
|
||||
*/
|
||||
async getCachedEligibility<T>(key: string, fetchFn: () => Promise<T>): Promise<T> {
|
||||
return this.getOrSet("eligibility", key, this.ELIGIBILITY_TTL, fetchFn, true);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -48,10 +79,15 @@ export class CatalogCacheService {
|
||||
return `catalog:${catalogType}:${parts.join(":")}`;
|
||||
}
|
||||
|
||||
buildEligibilityKey(catalogType: string, accountId: string): string {
|
||||
return `catalog:${catalogType}:eligibility:${accountId}`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Invalidate catalog cache by pattern
|
||||
*/
|
||||
async invalidateCatalog(catalogType: string): Promise<void> {
|
||||
this.metrics.invalidations++;
|
||||
await this.cache.delPattern(`catalog:${catalogType}:*`);
|
||||
}
|
||||
|
||||
@ -59,6 +95,62 @@ export class CatalogCacheService {
|
||||
* Invalidate all catalog cache
|
||||
*/
|
||||
async invalidateAllCatalogs(): Promise<void> {
|
||||
this.metrics.invalidations++;
|
||||
await this.cache.delPattern("catalog:*");
|
||||
}
|
||||
|
||||
getMetrics(): CatalogCacheSnapshot {
|
||||
return {
|
||||
catalog: { ...this.metrics.catalog },
|
||||
static: { ...this.metrics.static },
|
||||
volatile: { ...this.metrics.volatile },
|
||||
eligibility: { ...this.metrics.eligibility },
|
||||
invalidations: this.metrics.invalidations,
|
||||
};
|
||||
}
|
||||
|
||||
private async getOrSet<T>(
|
||||
bucket: "catalog" | "static" | "volatile" | "eligibility",
|
||||
key: string,
|
||||
ttlSeconds: number,
|
||||
fetchFn: () => Promise<T>,
|
||||
allowNull = false
|
||||
): Promise<T> {
|
||||
const cached = await this.cache.get<unknown>(key);
|
||||
const unwrapped = this.unwrapCachedValue<T>(cached);
|
||||
|
||||
if (unwrapped.hit) {
|
||||
if (allowNull || unwrapped.value !== null) {
|
||||
this.metrics[bucket].hits++;
|
||||
return unwrapped.value as T;
|
||||
}
|
||||
}
|
||||
|
||||
this.metrics[bucket].misses++;
|
||||
const fresh = await fetchFn();
|
||||
const valueToStore = allowNull ? (fresh ?? null) : fresh;
|
||||
await this.cache.set(key, this.wrapCachedValue(valueToStore), ttlSeconds);
|
||||
return fresh;
|
||||
}
|
||||
|
||||
private unwrapCachedValue<T>(cached: unknown): { hit: boolean; value: T | null } {
|
||||
if (cached === null || cached === undefined) {
|
||||
return { hit: false, value: null };
|
||||
}
|
||||
|
||||
if (
|
||||
typeof cached === "object" &&
|
||||
cached !== null &&
|
||||
Object.prototype.hasOwnProperty.call(cached, "__catalogCache")
|
||||
) {
|
||||
const wrapper = cached as { value: T | null };
|
||||
return { hit: true, value: wrapper.value ?? null };
|
||||
}
|
||||
|
||||
return { hit: true, value: (cached as T) ?? null };
|
||||
}
|
||||
|
||||
private wrapCachedValue<T>(value: T | null): { value: T | null; __catalogCache: true } {
|
||||
return { value: value ?? null, __catalogCache: true };
|
||||
}
|
||||
}
|
||||
|
||||
@ -147,15 +147,21 @@ export class InternetCatalogService extends BaseCatalogService {
|
||||
|
||||
// Get customer's eligibility from Salesforce
|
||||
const sfAccountId = assertSalesforceId(mapping.sfAccountId, "sfAccountId");
|
||||
const soql = buildAccountEligibilityQuery(sfAccountId);
|
||||
const accounts = await this.executeQuery(soql, "Customer Eligibility");
|
||||
const eligibilityKey = this.catalogCache.buildEligibilityKey("internet", sfAccountId);
|
||||
const account = await this.catalogCache.getCachedEligibility<SalesforceAccount | null>(
|
||||
eligibilityKey,
|
||||
async () => {
|
||||
const soql = buildAccountEligibilityQuery(sfAccountId);
|
||||
const accounts = await this.executeQuery(soql, "Customer Eligibility");
|
||||
return accounts.length > 0 ? (accounts[0] as unknown as SalesforceAccount) : null;
|
||||
}
|
||||
);
|
||||
|
||||
if (accounts.length === 0) {
|
||||
if (!account) {
|
||||
this.logger.warn(`No Salesforce account found for user ${userId}, returning all plans`);
|
||||
return allPlans;
|
||||
}
|
||||
|
||||
const account = accounts[0] as unknown as SalesforceAccount;
|
||||
const eligibility = account.Internet_Eligibility__c;
|
||||
|
||||
if (!eligibility) {
|
||||
|
||||
@ -25,6 +25,7 @@ import {
|
||||
import { apiSuccessResponseSchema } from "@customer-portal/domain/common";
|
||||
import { Observable } from "rxjs";
|
||||
import { OrderEventsService } from "./services/order-events.service";
|
||||
import { SalesforceReadThrottleGuard } from "@bff/integrations/salesforce/guards/salesforce-read-throttle.guard";
|
||||
|
||||
@Controller("orders")
|
||||
@UseGuards(ThrottlerGuard)
|
||||
@ -67,12 +68,14 @@ export class OrdersController {
|
||||
}
|
||||
|
||||
@Get("user")
|
||||
@UseGuards(SalesforceReadThrottleGuard)
|
||||
async getUserOrders(@Request() req: RequestWithUser) {
|
||||
return this.orderOrchestrator.getOrdersForUser(req.user.id);
|
||||
}
|
||||
|
||||
@Get(":sfOrderId")
|
||||
@UsePipes(new ZodValidationPipe(sfOrderIdParamSchema))
|
||||
@UseGuards(SalesforceReadThrottleGuard)
|
||||
async get(@Request() req: RequestWithUser, @Param() params: SfOrderIdParam) {
|
||||
return this.orderOrchestrator.getOrder(params.sfOrderId);
|
||||
}
|
||||
|
||||
@ -7,6 +7,7 @@ import { UsersModule } from "@bff/modules/users/users.module";
|
||||
import { CoreConfigModule } from "@bff/core/config/config.module";
|
||||
import { DatabaseModule } from "@bff/core/database/database.module";
|
||||
import { CatalogModule } from "@bff/modules/catalog/catalog.module";
|
||||
import { CacheModule } from "@bff/infra/cache/cache.module";
|
||||
|
||||
// Clean modular order services
|
||||
import { OrderValidator } from "./services/order-validator.service";
|
||||
@ -17,6 +18,7 @@ import { OrderOrchestrator } from "./services/order-orchestrator.service";
|
||||
import { PaymentValidatorService } from "./services/payment-validator.service";
|
||||
import { CheckoutService } from "./services/checkout.service";
|
||||
import { OrderEventsService } from "./services/order-events.service";
|
||||
import { OrdersCacheService } from "./services/orders-cache.service";
|
||||
|
||||
// Clean modular fulfillment services
|
||||
import { OrderFulfillmentValidator } from "./services/order-fulfillment-validator.service";
|
||||
@ -35,6 +37,7 @@ import { OrderFieldConfigModule } from "./config/order-field-config.module";
|
||||
CoreConfigModule,
|
||||
DatabaseModule,
|
||||
CatalogModule,
|
||||
CacheModule,
|
||||
OrderFieldConfigModule,
|
||||
],
|
||||
controllers: [OrdersController, CheckoutController],
|
||||
@ -49,6 +52,7 @@ import { OrderFieldConfigModule } from "./config/order-field-config.module";
|
||||
OrderItemBuilder,
|
||||
OrderPricebookService,
|
||||
OrderOrchestrator,
|
||||
OrdersCacheService,
|
||||
CheckoutService,
|
||||
|
||||
// Order fulfillment services (modular)
|
||||
|
||||
@ -12,6 +12,7 @@ import { SimFulfillmentService } from "./sim-fulfillment.service";
|
||||
import { DistributedTransactionService } from "@bff/core/database/services/distributed-transaction.service";
|
||||
import { getErrorMessage } from "@bff/core/utils/error.util";
|
||||
import { OrderEventsService } from "./order-events.service";
|
||||
import { OrdersCacheService } from "./orders-cache.service";
|
||||
import {
|
||||
type OrderDetails,
|
||||
type OrderFulfillmentValidationResult,
|
||||
@ -58,7 +59,8 @@ export class OrderFulfillmentOrchestrator {
|
||||
private readonly orderFulfillmentErrorService: OrderFulfillmentErrorService,
|
||||
private readonly simFulfillmentService: SimFulfillmentService,
|
||||
private readonly distributedTransactionService: DistributedTransactionService,
|
||||
private readonly orderEvents: OrderEventsService
|
||||
private readonly orderEvents: OrderEventsService,
|
||||
private readonly ordersCache: OrdersCacheService
|
||||
) {}
|
||||
|
||||
/**
|
||||
@ -116,6 +118,7 @@ export class OrderFulfillmentOrchestrator {
|
||||
whmcsOrderId: context.validation.whmcsOrderId,
|
||||
},
|
||||
});
|
||||
await this.invalidateOrderCaches(sfOrderId, context.validation?.sfOrder?.AccountId);
|
||||
return context;
|
||||
}
|
||||
} catch (error) {
|
||||
@ -379,8 +382,10 @@ export class OrderFulfillmentOrchestrator {
|
||||
duration: fulfillmentResult.duration,
|
||||
});
|
||||
|
||||
await this.invalidateOrderCaches(sfOrderId, context.validation?.sfOrder?.AccountId);
|
||||
return context;
|
||||
} catch (error) {
|
||||
await this.invalidateOrderCaches(sfOrderId, context.validation?.sfOrder?.AccountId);
|
||||
await this.handleFulfillmentError(context, error as Error);
|
||||
this.orderEvents.publish(sfOrderId, {
|
||||
orderId: sfOrderId,
|
||||
@ -417,13 +422,32 @@ export class OrderFulfillmentOrchestrator {
|
||||
return steps;
|
||||
}
|
||||
|
||||
private extractConfigurations(value: unknown): Record<string, unknown> {
|
||||
if (value && typeof value === "object") {
|
||||
return value as Record<string, unknown>;
|
||||
private async extractConfigurations(
|
||||
rawConfigurations: unknown
|
||||
): Promise<Record<string, unknown>> {
|
||||
if (rawConfigurations && typeof rawConfigurations === "object") {
|
||||
return rawConfigurations as Record<string, unknown>;
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
private async invalidateOrderCaches(orderId: string, accountId?: string | null): Promise<void> {
|
||||
const tasks: Array<Promise<unknown>> = [this.ordersCache.invalidateOrder(orderId)];
|
||||
if (accountId) {
|
||||
tasks.push(this.ordersCache.invalidateAccountOrders(accountId));
|
||||
}
|
||||
|
||||
try {
|
||||
await Promise.all(tasks);
|
||||
} catch (error) {
|
||||
this.logger.warn("Failed to invalidate order caches", {
|
||||
orderId,
|
||||
accountId: accountId ?? undefined,
|
||||
error: getErrorMessage(error),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle fulfillment errors and update Salesforce
|
||||
*/
|
||||
|
||||
@ -1,8 +1,6 @@
|
||||
import { Injectable, BadRequestException, NotFoundException, Inject } from "@nestjs/common";
|
||||
import { Logger } from "nestjs-pino";
|
||||
import { SalesforceConnection } from "@bff/integrations/salesforce/services/salesforce-connection.service";
|
||||
import { OrderPricebookService } from "./order-pricebook.service";
|
||||
import { PrismaService } from "@bff/infra/database/prisma.service";
|
||||
import { createOrderRequestSchema } from "@customer-portal/domain/orders";
|
||||
|
||||
/**
|
||||
@ -12,33 +10,31 @@ import { createOrderRequestSchema } from "@customer-portal/domain/orders";
|
||||
export class OrderItemBuilder {
|
||||
constructor(
|
||||
@Inject(Logger) private readonly logger: Logger,
|
||||
private readonly sf: SalesforceConnection,
|
||||
private readonly pricebookService: OrderPricebookService,
|
||||
private readonly prisma: PrismaService
|
||||
private readonly pricebookService: OrderPricebookService
|
||||
) {}
|
||||
|
||||
/**
|
||||
* Create OrderItems directly from SKU array
|
||||
* Prepare OrderItems for Salesforce composite tree creation
|
||||
*/
|
||||
async createOrderItemsFromSKUs(
|
||||
orderId: string,
|
||||
async buildOrderItemsPayload(
|
||||
skus: string[],
|
||||
pricebookId: string
|
||||
): Promise<void> {
|
||||
): Promise<OrderItemCompositePayload[]> {
|
||||
const { skus: validatedSkus } = buildItemsSchema.parse({ skus });
|
||||
if (pricebookId.length === 0) {
|
||||
throw new BadRequestException("Product SKU is required");
|
||||
}
|
||||
|
||||
this.logger.log({ orderId, skus }, "Creating OrderItems from SKU array");
|
||||
this.logger.log({ skus }, "Preparing OrderItems payload from SKU array");
|
||||
|
||||
const metaMap = await this.pricebookService.fetchProductMeta(pricebookId, skus);
|
||||
|
||||
// Create OrderItems for each SKU
|
||||
const payload: OrderItemCompositePayload[] = [];
|
||||
|
||||
for (const sku of validatedSkus) {
|
||||
const normalizedSkuValue = sku?.trim();
|
||||
if (!normalizedSkuValue) {
|
||||
this.logger.error({ orderId }, "Encountered empty SKU while creating order items");
|
||||
this.logger.error("Encountered empty SKU while preparing order items payload");
|
||||
throw new BadRequestException("Product SKU is required");
|
||||
}
|
||||
|
||||
@ -58,33 +54,20 @@ export class OrderItemBuilder {
|
||||
throw new Error(`PricebookEntry for SKU ${normalizedSkuValue} has no UnitPrice set`);
|
||||
}
|
||||
|
||||
this.logger.log(
|
||||
{
|
||||
sku: normalizedSkuValue,
|
||||
pbeId: meta.pricebookEntryId,
|
||||
unitPrice: meta.unitPrice,
|
||||
},
|
||||
"Creating OrderItem"
|
||||
);
|
||||
|
||||
try {
|
||||
// Salesforce requires explicit UnitPrice even with PricebookEntryId
|
||||
await this.sf.sobject("OrderItem").create({
|
||||
OrderId: orderId,
|
||||
PricebookEntryId: meta.pricebookEntryId,
|
||||
Quantity: 1,
|
||||
UnitPrice: meta.unitPrice,
|
||||
});
|
||||
|
||||
this.logger.log({ orderId, sku: normalizedSkuValue }, "OrderItem created successfully");
|
||||
} catch (error) {
|
||||
this.logger.error(
|
||||
{ error, orderId, sku: normalizedSkuValue },
|
||||
"Failed to create OrderItem"
|
||||
);
|
||||
throw error;
|
||||
}
|
||||
payload.push({
|
||||
sku: normalizedSkuValue,
|
||||
pricebookEntryId: meta.pricebookEntryId,
|
||||
quantity: 1,
|
||||
unitPrice: meta.unitPrice,
|
||||
});
|
||||
}
|
||||
|
||||
this.logger.log(
|
||||
{ itemCount: payload.length, skus: payload.map(item => item.sku) },
|
||||
"Prepared OrderItems payload"
|
||||
);
|
||||
|
||||
return payload;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -130,4 +113,11 @@ export class OrderItemBuilder {
|
||||
}
|
||||
}
|
||||
|
||||
export interface OrderItemCompositePayload {
|
||||
sku: string;
|
||||
pricebookEntryId: string;
|
||||
quantity: number;
|
||||
unitPrice: number;
|
||||
}
|
||||
|
||||
const buildItemsSchema = createOrderRequestSchema.pick({ skus: true });
|
||||
|
||||
@ -3,7 +3,8 @@ import { Logger } from "nestjs-pino";
|
||||
import { SalesforceOrderService } from "@bff/integrations/salesforce/services/salesforce-order.service";
|
||||
import { OrderValidator } from "./order-validator.service";
|
||||
import { OrderBuilder } from "./order-builder.service";
|
||||
import { OrderItemBuilder } from "./order-item-builder.service";
|
||||
import { OrderItemBuilder, OrderItemCompositePayload } from "./order-item-builder.service";
|
||||
import { OrdersCacheService } from "./orders-cache.service";
|
||||
import { type OrderDetails, type OrderSummary } from "@customer-portal/domain/orders";
|
||||
import { assertSalesforceId } from "@bff/integrations/salesforce/utils/soql.util";
|
||||
|
||||
@ -21,7 +22,8 @@ export class OrderOrchestrator {
|
||||
private readonly salesforceOrderService: SalesforceOrderService,
|
||||
private readonly orderValidator: OrderValidator,
|
||||
private readonly orderBuilder: OrderBuilder,
|
||||
private readonly orderItemBuilder: OrderItemBuilder
|
||||
private readonly orderItemBuilder: OrderItemBuilder,
|
||||
private readonly ordersCache: OrdersCacheService
|
||||
) {}
|
||||
|
||||
/**
|
||||
@ -51,20 +53,24 @@ export class OrderOrchestrator {
|
||||
validatedBody.userId
|
||||
);
|
||||
|
||||
// 3) Create Order in Salesforce via integration service
|
||||
const created = await this.salesforceOrderService.createOrder(orderFields);
|
||||
const orderItemsPayload: OrderItemCompositePayload[] =
|
||||
await this.orderItemBuilder.buildOrderItemsPayload(validatedBody.skus, pricebookId);
|
||||
|
||||
// 4) Create OrderItems from SKUs
|
||||
await this.orderItemBuilder.createOrderItemsFromSKUs(
|
||||
created.id,
|
||||
validatedBody.skus,
|
||||
pricebookId
|
||||
const created = await this.salesforceOrderService.createOrderWithItems(
|
||||
orderFields,
|
||||
orderItemsPayload
|
||||
);
|
||||
|
||||
if (userMapping.sfAccountId) {
|
||||
await this.ordersCache.invalidateAccountOrders(userMapping.sfAccountId);
|
||||
}
|
||||
await this.ordersCache.invalidateOrder(created.id);
|
||||
|
||||
this.logger.log(
|
||||
{
|
||||
orderId: created.id,
|
||||
skuCount: validatedBody.skus.length,
|
||||
orderItemCount: orderItemsPayload.length,
|
||||
},
|
||||
"Order creation workflow completed successfully"
|
||||
);
|
||||
@ -84,7 +90,9 @@ export class OrderOrchestrator {
|
||||
this.logger.log({ orderId: safeOrderId }, "Fetching order details");
|
||||
|
||||
// Use integration service - it handles queries and transformations
|
||||
return this.salesforceOrderService.getOrderById(safeOrderId);
|
||||
return this.ordersCache.getOrderDetails(safeOrderId, () =>
|
||||
this.salesforceOrderService.getOrderById(safeOrderId)
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -105,6 +113,8 @@ export class OrderOrchestrator {
|
||||
}
|
||||
|
||||
// Use integration service - it handles queries and transformations
|
||||
return this.salesforceOrderService.getOrdersForAccount(sfAccountId);
|
||||
return this.ordersCache.getOrderSummaries(sfAccountId, () =>
|
||||
this.salesforceOrderService.getOrdersForAccount(sfAccountId)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@ -39,7 +39,9 @@ export class OrderPricebookService {
|
||||
const soql = `SELECT Id, Name FROM Pricebook2 WHERE IsActive = true AND Name LIKE '%${sanitizeSoqlLiteral(name)}%' LIMIT 1`;
|
||||
|
||||
try {
|
||||
const result = (await this.sf.query(soql)) as SalesforceResponse<{ Id?: string }>;
|
||||
const result = (await this.sf.query(soql, {
|
||||
label: "orders:pricebook:findPortal",
|
||||
})) as SalesforceResponse<{ Id?: string }>;
|
||||
if (result.records?.length) {
|
||||
const resolved = result.records[0]?.Id;
|
||||
if (resolved) {
|
||||
@ -48,7 +50,8 @@ export class OrderPricebookService {
|
||||
}
|
||||
|
||||
const std = (await this.sf.query(
|
||||
"SELECT Id FROM Pricebook2 WHERE IsStandard = true AND IsActive = true LIMIT 1"
|
||||
"SELECT Id FROM Pricebook2 WHERE IsStandard = true AND IsActive = true LIMIT 1",
|
||||
{ label: "orders:pricebook:findStandard" }
|
||||
)) as SalesforceResponse<{ Id?: string }>;
|
||||
|
||||
const pricebookId = std.records?.[0]?.Id;
|
||||
@ -93,7 +96,9 @@ export class OrderPricebookService {
|
||||
`WHERE Pricebook2Id='${safePricebookId}' AND IsActive=true AND Product2.StockKeepingUnit IN ${whereIn}`;
|
||||
|
||||
try {
|
||||
const res = (await this.sf.query(soql)) as SalesforceResponse<
|
||||
const res = (await this.sf.query(soql, {
|
||||
label: "orders:pricebook:fetchProducts",
|
||||
})) as SalesforceResponse<
|
||||
SalesforcePricebookEntryRecord & { Product2?: SalesforceProduct2Record | null }
|
||||
>;
|
||||
|
||||
|
||||
130
apps/bff/src/modules/orders/services/orders-cache.service.ts
Normal file
130
apps/bff/src/modules/orders/services/orders-cache.service.ts
Normal file
@ -0,0 +1,130 @@
|
||||
import { Injectable } from "@nestjs/common";
|
||||
import { CacheService } from "@bff/infra/cache/cache.service";
|
||||
import type { OrderDetails, OrderSummary } from "@customer-portal/domain/orders";
|
||||
|
||||
interface CacheBucketMetrics {
|
||||
hits: number;
|
||||
misses: number;
|
||||
}
|
||||
|
||||
interface OrdersCacheMetrics {
|
||||
summaries: CacheBucketMetrics;
|
||||
details: CacheBucketMetrics;
|
||||
invalidations: number;
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
export class OrdersCacheService {
|
||||
private readonly SUMMARY_TTL_SECONDS = 120;
|
||||
private readonly DETAIL_TTL_SECONDS = 90;
|
||||
|
||||
private readonly metrics: OrdersCacheMetrics = {
|
||||
summaries: { hits: 0, misses: 0 },
|
||||
details: { hits: 0, misses: 0 },
|
||||
invalidations: 0,
|
||||
};
|
||||
|
||||
constructor(private readonly cache: CacheService) {}
|
||||
|
||||
async getOrderSummaries(
|
||||
sfAccountId: string,
|
||||
fetcher: () => Promise<OrderSummary[]>
|
||||
): Promise<OrderSummary[]> {
|
||||
const key = this.buildAccountKey(sfAccountId);
|
||||
return this.getOrSet<OrderSummary[]>(
|
||||
"summaries",
|
||||
key,
|
||||
this.SUMMARY_TTL_SECONDS,
|
||||
fetcher,
|
||||
false
|
||||
);
|
||||
}
|
||||
|
||||
async getOrderDetails(
|
||||
orderId: string,
|
||||
fetcher: () => Promise<OrderDetails | null>
|
||||
): Promise<OrderDetails | null> {
|
||||
const key = this.buildOrderKey(orderId);
|
||||
return this.getOrSet<OrderDetails | null>(
|
||||
"details",
|
||||
key,
|
||||
this.DETAIL_TTL_SECONDS,
|
||||
fetcher,
|
||||
true
|
||||
);
|
||||
}
|
||||
|
||||
async invalidateAccountOrders(sfAccountId: string): Promise<void> {
|
||||
const key = this.buildAccountKey(sfAccountId);
|
||||
this.metrics.invalidations++;
|
||||
await this.cache.del(key);
|
||||
}
|
||||
|
||||
async invalidateOrder(orderId: string): Promise<void> {
|
||||
const key = this.buildOrderKey(orderId);
|
||||
this.metrics.invalidations++;
|
||||
await this.cache.del(key);
|
||||
}
|
||||
|
||||
getMetrics(): OrdersCacheMetrics {
|
||||
return {
|
||||
summaries: { ...this.metrics.summaries },
|
||||
details: { ...this.metrics.details },
|
||||
invalidations: this.metrics.invalidations,
|
||||
};
|
||||
}
|
||||
|
||||
private async getOrSet<T>(
|
||||
bucket: keyof Pick<OrdersCacheMetrics, "summaries" | "details">,
|
||||
key: string,
|
||||
ttlSeconds: number,
|
||||
fetcher: () => Promise<T>,
|
||||
allowNull: boolean
|
||||
): Promise<T> {
|
||||
const cached = await this.cache.get<unknown>(key);
|
||||
const unwrapped = this.unwrapCachedValue<T>(cached);
|
||||
|
||||
if (unwrapped.hit) {
|
||||
if (allowNull || unwrapped.value !== null) {
|
||||
this.metrics[bucket].hits++;
|
||||
return unwrapped.value as T;
|
||||
}
|
||||
}
|
||||
|
||||
this.metrics[bucket].misses++;
|
||||
const fresh = await fetcher();
|
||||
const valueToStore = allowNull ? (fresh ?? null) : fresh;
|
||||
await this.cache.set(key, this.wrapCachedValue(valueToStore), ttlSeconds);
|
||||
return fresh;
|
||||
}
|
||||
|
||||
private unwrapCachedValue<T>(cached: unknown): { hit: boolean; value: T | null } {
|
||||
if (cached === null || cached === undefined) {
|
||||
return { hit: false, value: null };
|
||||
}
|
||||
|
||||
if (
|
||||
typeof cached === "object" &&
|
||||
cached !== null &&
|
||||
Object.prototype.hasOwnProperty.call(cached, "__ordersCache")
|
||||
) {
|
||||
const wrapper = cached as { value: T | null };
|
||||
return { hit: true, value: wrapper.value ?? null };
|
||||
}
|
||||
|
||||
return { hit: true, value: (cached as T) ?? null };
|
||||
}
|
||||
|
||||
private wrapCachedValue<T>(value: T | null): { value: T | null; __ordersCache: true } {
|
||||
return { value: value ?? null, __ordersCache: true };
|
||||
}
|
||||
|
||||
private buildAccountKey(sfAccountId: string): string {
|
||||
return `orders:account:${sfAccountId}`;
|
||||
}
|
||||
|
||||
private buildOrderKey(orderId: string): string {
|
||||
return `orders:detail:${orderId}`;
|
||||
}
|
||||
}
|
||||
|
||||
84
docs/salesforce-shadow-sync-plan.md
Normal file
84
docs/salesforce-shadow-sync-plan.md
Normal file
@ -0,0 +1,84 @@
|
||||
# Salesforce Shadow Data Sync Plan
|
||||
|
||||
## Objectives
|
||||
- Reduce repetitive Salesforce reads for hot catalog and eligibility data.
|
||||
- Provide resilient fallbacks when Salesforce limits are reached by serving data from Postgres shadow tables.
|
||||
- Maintain data freshness within minutes via event-driven updates, with scheduled backstops.
|
||||
|
||||
## Scope
|
||||
- **Catalog metadata**: `Product2`, `PricebookEntry`, add-on metadata (SIM/Internet/VPN).
|
||||
- **Pricing snapshots**: Unit price, currency, and active flags per SKU.
|
||||
- **Account eligibility**: `Account.Internet_Eligibility__c` and related readiness fields used by personalized catalogs.
|
||||
|
||||
## Proposed Schema (Postgres)
|
||||
|
||||
```sql
|
||||
CREATE TABLE sf_product_shadow (
|
||||
product_id TEXT PRIMARY KEY,
|
||||
sku TEXT UNIQUE NOT NULL,
|
||||
name TEXT NOT NULL,
|
||||
item_class TEXT,
|
||||
offering_type TEXT,
|
||||
plan_tier TEXT,
|
||||
vpn_region TEXT,
|
||||
updated_at TIMESTAMP WITH TIME ZONE NOT NULL,
|
||||
raw_payload JSONB NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE sf_pricebook_shadow (
|
||||
pricebook_entry_id TEXT PRIMARY KEY,
|
||||
product_id TEXT NOT NULL REFERENCES sf_product_shadow(product_id) ON DELETE CASCADE,
|
||||
pricebook_id TEXT NOT NULL,
|
||||
unit_price NUMERIC(12,2) NOT NULL,
|
||||
currency_iso_code TEXT NOT NULL,
|
||||
is_active BOOLEAN NOT NULL,
|
||||
updated_at TIMESTAMP WITH TIME ZONE NOT NULL,
|
||||
raw_payload JSONB NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE sf_account_eligibility_shadow (
|
||||
account_id TEXT PRIMARY KEY,
|
||||
internet_eligibility TEXT,
|
||||
eligibility_source TEXT,
|
||||
updated_at TIMESTAMP WITH TIME ZONE NOT NULL,
|
||||
raw_payload JSONB NOT NULL
|
||||
);
|
||||
```
|
||||
|
||||
## Sync Strategy
|
||||
|
||||
| Phase | Approach | Tooling |
|
||||
| --- | --- | --- |
|
||||
| Backfill | Bulk API v2 query for each object (Product2, PricebookEntry, Account) to seed tables. | New CLI job (`pnpm nx run bff:salesforce-backfill-shadow`) |
|
||||
| Incremental updates | Subscribe to Platform Events or Change Data Capture streams for Product2, PricebookEntry, and Account. Push events onto existing SalesforceRequestQueue, enqueue to BullMQ worker that upserts into shadow tables. | Extend provisioning queue or add new `SF_SHADOW_SYNC` queue |
|
||||
| Catch-up | Nightly scheduled Bulk API delta query (using `SystemModstamp`) to reconcile missed events. | Cron worker (same Bull queue) |
|
||||
|
||||
### Upsert Flow
|
||||
1. Event payload arrives from Salesforce Pub/Sub → persisted to queue (reuse `SalesforceRequestQueueService` backoff).
|
||||
2. Worker normalizes payload (maps relationship fields, handles deletions).
|
||||
3. Performs PostgreSQL `INSERT ... ON CONFLICT` using transaction to keep product ↔ pricebook relationships consistent.
|
||||
4. Invalidate Redis keys (`catalog:*`, `eligibility:*`) via `CatalogCacheService.invalidateAllCatalogs()` or targeted invalidation when specific SKU/account changes.
|
||||
|
||||
## Integration Points
|
||||
- **Catalog services**: attempt to read from shadow tables via Prisma before falling back to Salesforce query; only hit Salesforce on cache miss _and_ shadow miss.
|
||||
- **Eligibility lookup**: `InternetCatalogService.getPlansForUser` first loads from `sf_account_eligibility_shadow`; if stale (>15 min) fallback to Salesforce + refresh row asynchronously.
|
||||
- **Order flows**: continue using live Salesforce (writes) but use shadow data for price lookups where possible.
|
||||
|
||||
## Monitoring & Alerts
|
||||
- Add Prometheus counters: `sf_shadow_sync_events_total`, `sf_shadow_sync_failures_total`.
|
||||
- Track lag metrics: `MAX(now() - updated_at)` per table.
|
||||
- Hook into existing queue health endpoint to expose shadow worker backlog.
|
||||
|
||||
## Rollout Checklist
|
||||
1. Implement schema migrations (SQL or Prisma) under feature flag.
|
||||
2. Build bulk backfill command; run in staging, verify record counts vs Salesforce SOQL.
|
||||
3. Enable event ingestion in staging, monitor for 48h, validate cache invalidation.
|
||||
4. Update catalog services to prefer shadow reads; release behind environment variable `ENABLE_SF_SHADOW_READS`.
|
||||
5. Roll to production gradually: run backfill, enable read flag, then enable event consumer.
|
||||
6. Document operational runbooks (replay events, manual backfill, clearing caches).
|
||||
|
||||
## Open Questions
|
||||
- Do we mirror additional fields (e.g., localization strings) needed for future UX changes?
|
||||
- Should eligibility sync include other readiness signals (credit status, serviceability flags)?
|
||||
- Confirm retention strategy for `raw_payload` column (e.g., prune older versions weekly).
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user