Enhance real-time event handling and configuration in BFF application

- Updated environment validation to enable Salesforce events by default, improving real-time cache invalidation.
- Modified `CatalogCdcSubscriber` to conditionally subscribe to CDC channels based on the new configuration, enhancing flexibility for high-volume scenarios.
- Improved message serialization in `RealtimeService` to ensure valid JSON for browser EventSource.
- Adjusted cache control headers in `CatalogController` to prevent browser caching, ensuring real-time data accuracy.
- Enhanced `RealtimeController` with connection limiting and improved logging for better monitoring of real-time streams.
- Updated health check endpoint in production management script for consistency.
This commit is contained in:
barsa 2025-12-15 11:55:55 +09:00
parent e1bdae08bd
commit 22c9428c77
10 changed files with 126 additions and 38 deletions

View File

@ -89,7 +89,8 @@ export const envSchema = z.object({
SF_QUEUE_TIMEOUT_MS: z.coerce.number().int().positive().default(30000),
SF_QUEUE_LONG_RUNNING_TIMEOUT_MS: z.coerce.number().int().positive().default(600000),
SF_EVENTS_ENABLED: z.enum(["true", "false"]).default("false"),
// Default ON: the portal relies on Salesforce events for real-time cache invalidation.
SF_EVENTS_ENABLED: z.enum(["true", "false"]).default("true"),
SF_CATALOG_EVENT_CHANNEL: z.string().default("/event/Product_and_Pricebook_Change__e"),
SF_ACCOUNT_EVENT_CHANNEL: z.string().default("/event/Account_Internet_Eligibility_Update__e"),
SF_EVENTS_REPLAY: z.enum(["LATEST", "ALL"]).default("LATEST"),

View File

@ -123,10 +123,9 @@ export class RealtimeService {
private buildMessage<TEvent extends string>(event: TEvent, data: unknown): MessageEvent {
return {
data: {
event,
data,
} satisfies RealtimeEventEnvelope<TEvent, unknown>,
// Always serialize explicitly so the browser EventSource receives valid JSON text.
// This avoids environments where SSE payloads may be coerced to "[object Object]".
data: JSON.stringify({ event, data } satisfies RealtimeEventEnvelope<TEvent, unknown>),
} satisfies MessageEvent;
}
}

View File

@ -46,42 +46,53 @@ export class CatalogCdcSubscriber implements OnModuleInit, OnModuleDestroy {
}
async onModuleInit(): Promise<void> {
// Catalog CDC subscriptions can be optionally disabled (high-volume).
// Account eligibility updates are expected to be delivered via a Platform Event
// and should always be subscribed (best UX + explicit payload).
const cdcEnabled = this.config.get("SF_EVENTS_ENABLED", "true") === "true";
const productChannel =
this.config.get<string>("SF_CATALOG_PRODUCT_CDC_CHANNEL")?.trim() ||
"/data/Product2ChangeEvent";
const pricebookChannel =
this.config.get<string>("SF_CATALOG_PRICEBOOKENTRY_CDC_CHANNEL")?.trim() ||
"/data/PricebookEntryChangeEvent";
const accountChannel = this.config.get<string>("SF_ACCOUNT_ELIGIBILITY_CHANNEL")?.trim();
// Always use Platform Event for eligibility updates.
// Default is set in env schema: /event/Account_Internet_Eligibility_Update__e
const accountChannel = this.config.get<string>("SF_ACCOUNT_EVENT_CHANNEL")!.trim();
try {
const client = await this.ensureClient();
this.productChannel = productChannel;
await client.subscribe(
productChannel,
this.handleProductEvent.bind(this, productChannel),
this.numRequested
);
this.logger.log("Subscribed to Product2 CDC channel", { productChannel });
this.pricebookChannel = pricebookChannel;
await client.subscribe(
pricebookChannel,
this.handlePricebookEvent.bind(this, pricebookChannel),
this.numRequested
);
this.logger.log("Subscribed to PricebookEntry CDC channel", { pricebookChannel });
if (accountChannel) {
this.accountChannel = accountChannel;
if (cdcEnabled) {
this.productChannel = productChannel;
await client.subscribe(
accountChannel,
this.handleAccountEvent.bind(this, accountChannel),
productChannel,
this.handleProductEvent.bind(this, productChannel),
this.numRequested
);
this.logger.log("Subscribed to account eligibility channel", { accountChannel });
this.logger.log("Subscribed to Product2 CDC channel", { productChannel });
this.pricebookChannel = pricebookChannel;
await client.subscribe(
pricebookChannel,
this.handlePricebookEvent.bind(this, pricebookChannel),
this.numRequested
);
this.logger.log("Subscribed to PricebookEntry CDC channel", { pricebookChannel });
} else {
this.logger.debug("Catalog CDC subscriptions disabled (SF_EVENTS_ENABLED=false)");
}
this.accountChannel = accountChannel;
await client.subscribe(
accountChannel,
this.handleAccountEvent.bind(this, accountChannel),
this.numRequested
);
this.logger.log("Subscribed to account eligibility platform event channel", {
accountChannel,
});
} catch (error) {
this.logger.warn("Failed to initialize catalog CDC subscriber", {
error: error instanceof Error ? error.message : String(error),
@ -257,7 +268,7 @@ export class CatalogCdcSubscriber implements OnModuleInit, OnModuleDestroy {
): Promise<void> {
if (!this.isDataCallback(callbackType)) return;
const payload = this.extractPayload(data);
const accountId = this.extractStringField(payload, ["AccountId__c", "AccountId"]);
const accountId = this.extractStringField(payload, ["AccountId__c", "AccountId", "Id"]);
const eligibility = this.extractStringField(payload, [
"Internet_Eligibility__c",
"InternetEligibility__c",
@ -273,8 +284,7 @@ export class CatalogCdcSubscriber implements OnModuleInit, OnModuleDestroy {
this.logger.log("Account eligibility event received", {
channel,
accountId,
eligibility,
accountIdTail: accountId.slice(-4),
});
await this.catalogCache.invalidateEligibility(accountId);

View File

@ -28,7 +28,7 @@ export class CatalogController {
@Get("internet/plans")
@RateLimit({ limit: 20, ttl: 60 }) // 20 requests per minute
@Header("Cache-Control", "private, max-age=300") // Personalised responses: prevent shared caching
@Header("Cache-Control", "private, no-store") // Personalised responses: avoid browser caching (realtime invalidation relies on refetch)
async getInternetPlans(@Request() req: RequestWithUser): Promise<{
plans: InternetPlanCatalogItem[];
installations: InternetInstallationCatalogItem[];
@ -63,7 +63,7 @@ export class CatalogController {
@Get("sim/plans")
@RateLimit({ limit: 20, ttl: 60 }) // 20 requests per minute
@Header("Cache-Control", "private, max-age=300") // Personalised responses: prevent shared caching
@Header("Cache-Control", "private, no-store") // Personalised responses: avoid browser caching (realtime invalidation relies on refetch)
async getSimCatalogData(@Request() req: RequestWithUser): Promise<SimCatalogCollection> {
const userId = req.user?.id;
if (!userId) {

View File

@ -0,0 +1,34 @@
import { Injectable } from "@nestjs/common";
/**
 * In-memory cap on concurrent SSE connections per user, scoped to this BFF instance.
 *
 * Stops a runaway client (or one user with many tabs) from holding an unbounded
 * number of long-lived SSE streams open against a single instance.
 *
 * Deliberately not distributed: the Redis-backed RateLimitGuard already throttles
 * connection *attempts* across instances, so a cheap per-instance Map suffices here.
 */
@Injectable()
export class RealtimeConnectionLimiterService {
  // Maximum simultaneous streams one user may hold on this instance.
  private readonly maxPerUser = 3;

  // userId -> number of currently open streams; an absent key means zero.
  private readonly counts = new Map<string, number>();

  /**
   * Try to register one additional stream for `userId`.
   * @returns true when a slot was granted, false when the user is at the cap.
   */
  tryAcquire(userId: string): boolean {
    const open = this.counts.get(userId) ?? 0;
    if (open < this.maxPerUser) {
      this.counts.set(userId, open + 1);
      return true;
    }
    return false;
  }

  /**
   * Release one stream slot for `userId`.
   *
   * Drops the map entry entirely once the count reaches zero, so the map never
   * accumulates stale user ids over the life of the process.
   */
  release(userId: string): void {
    const open = this.counts.get(userId) ?? 0;
    if (open > 1) {
      this.counts.set(userId, open - 1);
    } else {
      this.counts.delete(userId);
    }
  }
}

View File

@ -1,16 +1,31 @@
import { Controller, Header, Request, Sse } from "@nestjs/common";
import {
Controller,
Header,
Request,
Sse,
Inject,
HttpException,
HttpStatus,
UseGuards,
} from "@nestjs/common";
import type { MessageEvent } from "@nestjs/common";
import { Observable } from "rxjs";
import { merge } from "rxjs";
import { finalize } from "rxjs/operators";
import type { RequestWithUser } from "@bff/modules/auth/auth.types.js";
import { RealtimeService } from "@bff/infra/realtime/realtime.service.js";
import { MappingsService } from "@bff/modules/id-mappings/mappings.service.js";
import { Logger } from "nestjs-pino";
import { RateLimit, RateLimitGuard } from "@bff/core/rate-limiting/index.js";
import { RealtimeConnectionLimiterService } from "./realtime-connection-limiter.service.js";
@Controller("events")
export class RealtimeController {
constructor(
private readonly realtime: RealtimeService,
private readonly mappings: MappingsService
private readonly mappings: MappingsService,
private readonly limiter: RealtimeConnectionLimiterService,
@Inject(Logger) private readonly logger: Logger
) {}
/**
@ -20,11 +35,28 @@ export class RealtimeController {
* Backed by Redis pub/sub for multi-instance delivery.
*/
@Sse()
@Header("Cache-Control", "no-cache")
@UseGuards(RateLimitGuard)
@RateLimit({ limit: 30, ttl: 60 }) // protect against reconnect storms / refresh spam
@Header("Cache-Control", "private, no-store")
@Header("X-Accel-Buffering", "no") // nginx: disable response buffering for SSE
async stream(@Request() req: RequestWithUser): Promise<Observable<MessageEvent>> {
if (!this.limiter.tryAcquire(req.user.id)) {
throw new HttpException(
"Too many concurrent realtime connections",
HttpStatus.TOO_MANY_REQUESTS
);
}
const mapping = await this.mappings.findByUserId(req.user.id);
const sfAccountId = mapping?.sfAccountId;
// Intentionally log minimal info for debugging connection issues.
this.logger.log("Account realtime stream connected", {
userId: req.user.id,
hasSfAccountId: Boolean(sfAccountId),
sfAccountIdTail: sfAccountId ? sfAccountId.slice(-4) : null,
});
const accountStream = this.realtime.subscribe(
sfAccountId ? `account:sf:${sfAccountId}` : "account:unknown",
{
@ -42,6 +74,13 @@ export class RealtimeController {
heartbeatMs: 0,
});
return merge(accountStream, globalCatalogStream);
return merge(accountStream, globalCatalogStream).pipe(
finalize(() => {
this.limiter.release(req.user.id);
this.logger.debug("Account realtime stream disconnected", {
userId: req.user.id,
});
})
);
}
}

View File

@ -1,9 +1,11 @@
import { Module } from "@nestjs/common";
import { MappingsModule } from "@bff/modules/id-mappings/mappings.module.js";
import { RealtimeController } from "./realtime.controller.js";
import { RealtimeConnectionLimiterService } from "./realtime-connection-limiter.service.js";
// Wires up the SSE "events" endpoint for the portal.
// - MappingsModule is imported because RealtimeController looks up the user's
//   Salesforce account id via MappingsService before subscribing streams.
// - RealtimeConnectionLimiterService is a per-instance provider that caps how many
//   concurrent SSE connections a single user may hold.
// NOTE(review): RealtimeService and the pino Logger injected by RealtimeController
// are not provided here — presumably registered globally elsewhere; confirm.
@Module({
  imports: [MappingsModule],
  controllers: [RealtimeController],
  providers: [RealtimeConnectionLimiterService],
})
export class RealtimeApiModule {}

View File

@ -1,6 +1,6 @@
/// <reference types="next" />
/// <reference types="next/image-types/global" />
import "./.next/types/routes.d.ts";
import "./.next/dev/types/routes.d.ts";
// NOTE: This file should not be edited
// see https://nextjs.org/docs/app/api-reference/config/typescript for more information.

View File

@ -43,16 +43,19 @@ export function AccountEventsListener() {
if (!parsed || typeof parsed !== "object") return;
if (parsed.event === "catalog.eligibility.changed") {
logger.info("Received catalog.eligibility.changed; invalidating catalog queries");
void queryClient.invalidateQueries({ queryKey: queryKeys.catalog.all() });
return;
}
if (parsed.event === "catalog.changed") {
logger.info("Received catalog.changed; invalidating catalog queries");
void queryClient.invalidateQueries({ queryKey: queryKeys.catalog.all() });
return;
}
if (parsed.event === "orders.changed") {
logger.info("Received orders.changed; invalidating orders + dashboard queries");
void queryClient.invalidateQueries({ queryKey: queryKeys.orders.list() });
// Dashboard summary often depends on orders/subscriptions; cheap to keep in sync.
void queryClient.invalidateQueries({ queryKey: queryKeys.dashboard.summary() });

View File

@ -141,7 +141,7 @@ status() {
if docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" -p "$PROJECT_NAME" ps proxy >/dev/null 2>&1; then
docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" -p "$PROJECT_NAME" exec proxy wget --spider -q http://localhost/healthz && echo "✅ Proxy/Frontend healthy" || echo "❌ Proxy/Frontend unhealthy"
else
docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" -p "$PROJECT_NAME" exec frontend wget --spider -q http://localhost:3000/api/health && echo "✅ Frontend healthy" || echo "❌ Frontend unhealthy"
docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" -p "$PROJECT_NAME" exec frontend wget --spider -q http://localhost:3000/_health && echo "✅ Frontend healthy" || echo "❌ Frontend unhealthy"
fi
docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" -p "$PROJECT_NAME" exec backend wget --spider -q http://localhost:4000/health && echo "✅ Backend healthy" || echo "❌ Backend unhealthy"
}