Assist_Design/apps/bff/src/infra/queue/services/salesforce-queue-degradation.service.ts

214 lines
6.4 KiB
TypeScript
Raw Normal View History

/**
* Service responsible for Salesforce queue degradation/circuit breaker logic
*/
import { Injectable, Inject } from "@nestjs/common";
import { Logger } from "nestjs-pino";
import { ConfigService } from "@nestjs/config";
import { SalesforceQueueMetricsService } from "./salesforce-queue-metrics.service.js";
export type DegradationReason = "rate-limit" | "usage-threshold" | "queue-pressure";
export interface SalesforceDegradationSnapshot {
degraded: boolean;
reason: DegradationReason | null;
cooldownExpiresAt?: Date;
usagePercent: number;
}
@Injectable()
export class SalesforceQueueDegradationService {
private readonly rateLimitCooldownMs: number;
private readonly queueSizeDegradeThreshold: number;
private readonly queuePendingDegradeThreshold: number;
private readonly averageWaitDegradeMs: number;
private readonly dailyApiLimit: number;
private readonly usageWarningLevels = [0.7, 0.85, 0.95];
private highestUsageWarningIssued = 0;
private degradeState: {
until: Date | null;
reason: DegradationReason | null;
} = { until: null, reason: null };
constructor(
@Inject(Logger) private readonly logger: Logger,
private readonly configService: ConfigService,
private readonly metrics: SalesforceQueueMetricsService
) {
this.rateLimitCooldownMs =
this.parseNumericConfig(this.configService.get("SF_RATE_LIMIT_COOLDOWN_MS")) ?? 60000;
this.queueSizeDegradeThreshold =
this.parseNumericConfig(this.configService.get("SF_QUEUE_PRESSURE_SIZE_THRESHOLD")) ?? 120;
this.queuePendingDegradeThreshold =
this.parseNumericConfig(this.configService.get("SF_QUEUE_PRESSURE_PENDING_THRESHOLD")) ?? 45;
this.averageWaitDegradeMs =
this.parseNumericConfig(this.configService.get("SF_QUEUE_PRESSURE_WAIT_MS")) ?? 1500;
this.dailyApiLimit = this.resolveDailyApiLimit();
}
/**
* Get daily API limit
*/
getDailyApiLimit(): number {
return this.dailyApiLimit;
}
/**
* Get current degradation state
*/
getDegradationState(): SalesforceDegradationSnapshot {
this.clearDegradeWindowIfElapsed();
const usagePercent =
this.dailyApiLimit > 0 ? this.metrics.getDailyApiUsage() / this.dailyApiLimit : 0;
return {
degraded: this.degradeState.until !== null,
reason: this.degradeState.reason,
cooldownExpiresAt: this.degradeState.until ?? undefined,
usagePercent,
};
}
/**
* Check if queue is in degraded state
*/
isDegraded(): boolean {
return this.getDegradationState().degraded;
}
/**
* Check if low priority requests should be throttled
*/
shouldThrottleLowPriority(): boolean {
return this.isDegraded();
}
/**
* Activate degradation window
*/
activateDegradeWindow(reason: DegradationReason): void {
if (this.rateLimitCooldownMs <= 0) {
return;
}
const now = Date.now();
const newUntil = new Date(now + this.rateLimitCooldownMs);
const active = this.degradeState.until && now < this.degradeState.until.getTime();
if (!active || this.degradeState.reason !== reason) {
this.logger.warn("Salesforce circuit breaker engaged", {
reason,
cooldownMs: this.rateLimitCooldownMs,
until: newUntil,
});
}
this.degradeState = { until: newUntil, reason };
}
/**
* Check if usage is approaching limit and trigger warnings/degradation
*/
maybeWarnOnUsage(): void {
if (this.dailyApiLimit <= 0) {
return;
}
const usagePercent = this.metrics.getDailyApiUsage() / this.dailyApiLimit;
if (usagePercent >= 0.95) {
this.activateDegradeWindow("usage-threshold");
}
const threshold = this.usageWarningLevels
.slice()
.reverse()
.find(level => usagePercent >= level && level > this.highestUsageWarningIssued);
if (threshold !== undefined) {
this.highestUsageWarningIssued = threshold;
this.logger.warn("Salesforce daily API usage approaching limit", {
usage: this.metrics.getDailyApiUsage(),
limit: this.dailyApiLimit,
usagePercent,
});
}
}
/**
* Check for queue pressure and trigger degradation if needed
*/
maybeTriggerQueuePressure(
queueSize: number,
pendingRequests: number,
averageWaitTime: number
): void {
const sizeExceeded = queueSize >= this.queueSizeDegradeThreshold;
const pendingExceeded = pendingRequests >= this.queuePendingDegradeThreshold;
const waitExceeded = averageWaitTime >= this.averageWaitDegradeMs;
if (sizeExceeded || pendingExceeded || waitExceeded) {
const now = Date.now();
const queuePressureActive =
this.degradeState.reason === "queue-pressure" &&
this.degradeState.until !== null &&
now < this.degradeState.until.getTime();
if (!queuePressureActive) {
this.logger.warn("Salesforce queue pressure detected", {
queueSize,
pending: pendingRequests,
averageWaitMs: Math.round(averageWaitTime),
sizeThreshold: this.queueSizeDegradeThreshold,
pendingThreshold: this.queuePendingDegradeThreshold,
waitThresholdMs: this.averageWaitDegradeMs,
});
}
this.activateDegradeWindow("queue-pressure");
}
}
/**
* Reset usage warning state (called on daily reset)
*/
resetUsageWarnings(): void {
this.highestUsageWarningIssued = 0;
this.degradeState = { until: null, reason: null };
}
/**
* Clear degradation window if elapsed
*/
clearDegradeWindowIfElapsed(): void {
if (this.degradeState.until && Date.now() >= this.degradeState.until.getTime()) {
this.degradeState = { until: null, reason: null };
}
}
private resolveDailyApiLimit(): number {
const configuredLimit = this.parseNumericConfig(this.configService.get("SF_DAILY_API_LIMIT"));
if (configuredLimit && configuredLimit > 0) {
return configuredLimit;
}
const userCount =
this.parseNumericConfig(this.configService.get("SF_FULL_USER_COUNT")) ||
this.parseNumericConfig(this.configService.get("SF_USERS")) ||
50;
return 100000 + userCount * 1000;
}
private parseNumericConfig(value: unknown): number | undefined {
if (typeof value === "number" && Number.isFinite(value)) {
return value;
}
if (typeof value === "string" && value.trim().length > 0) {
const parsed = Number(value);
if (!Number.isNaN(parsed)) {
return parsed;
}
}
return undefined;
}
}