diff --git a/apps/bff/src/core/config/env.validation.ts b/apps/bff/src/core/config/env.validation.ts index ff78dec0..bb0be8fe 100644 --- a/apps/bff/src/core/config/env.validation.ts +++ b/apps/bff/src/core/config/env.validation.ts @@ -89,7 +89,8 @@ export const envSchema = z.object({ SF_QUEUE_TIMEOUT_MS: z.coerce.number().int().positive().default(30000), SF_QUEUE_LONG_RUNNING_TIMEOUT_MS: z.coerce.number().int().positive().default(600000), - SF_EVENTS_ENABLED: z.enum(["true", "false"]).default("false"), + // Default ON: the portal relies on Salesforce events for real-time cache invalidation. + SF_EVENTS_ENABLED: z.enum(["true", "false"]).default("true"), SF_CATALOG_EVENT_CHANNEL: z.string().default("/event/Product_and_Pricebook_Change__e"), SF_ACCOUNT_EVENT_CHANNEL: z.string().default("/event/Account_Internet_Eligibility_Update__e"), SF_EVENTS_REPLAY: z.enum(["LATEST", "ALL"]).default("LATEST"), diff --git a/apps/bff/src/infra/realtime/realtime.service.ts b/apps/bff/src/infra/realtime/realtime.service.ts index 40d0e248..8a754488 100644 --- a/apps/bff/src/infra/realtime/realtime.service.ts +++ b/apps/bff/src/infra/realtime/realtime.service.ts @@ -123,10 +123,9 @@ export class RealtimeService { private buildMessage(event: TEvent, data: unknown): MessageEvent { return { - data: { - event, - data, - } satisfies RealtimeEventEnvelope, + // Always serialize explicitly so the browser EventSource receives valid JSON text. + // This avoids environments where SSE payloads may be coerced to "[object Object]". + data: JSON.stringify({ event, data } satisfies RealtimeEventEnvelope), } satisfies MessageEvent; } } diff --git a/apps/bff/src/integrations/salesforce/events/catalog-cdc.subscriber.ts b/apps/bff/src/integrations/salesforce/events/catalog-cdc.subscriber.ts index 7466fc1b..8ba18a2b 100644 --- a/apps/bff/src/integrations/salesforce/events/catalog-cdc.subscriber.ts +++ b/apps/bff/src/integrations/salesforce/events/catalog-cdc.subscriber.ts @@ -46,42 +46,53 @@ export class CatalogCdcSubscriber implements OnModuleInit, OnModuleDestroy { } async onModuleInit(): Promise { + // Catalog CDC subscriptions can be optionally disabled (high-volume). + // Account eligibility updates are expected to be delivered via a Platform Event + // and should always be subscribed (best UX + explicit payload). + const cdcEnabled = this.config.get("SF_EVENTS_ENABLED", "true") === "true"; + const productChannel = this.config.get("SF_CATALOG_PRODUCT_CDC_CHANNEL")?.trim() || "/data/Product2ChangeEvent"; const pricebookChannel = this.config.get("SF_CATALOG_PRICEBOOKENTRY_CDC_CHANNEL")?.trim() || "/data/PricebookEntryChangeEvent"; - const accountChannel = this.config.get("SF_ACCOUNT_ELIGIBILITY_CHANNEL")?.trim(); + // Always use Platform Event for eligibility updates. + // Default is set in env schema: /event/Account_Internet_Eligibility_Update__e + const accountChannel = this.config.get("SF_ACCOUNT_EVENT_CHANNEL")!.trim(); try { const client = await this.ensureClient(); - this.productChannel = productChannel; - await client.subscribe( - productChannel, - this.handleProductEvent.bind(this, productChannel), - this.numRequested - ); - this.logger.log("Subscribed to Product2 CDC channel", { productChannel }); - - this.pricebookChannel = pricebookChannel; - await client.subscribe( - pricebookChannel, - this.handlePricebookEvent.bind(this, pricebookChannel), - this.numRequested - ); - this.logger.log("Subscribed to PricebookEntry CDC channel", { pricebookChannel }); - - if (accountChannel) { - this.accountChannel = accountChannel; + if (cdcEnabled) { + this.productChannel = productChannel; await client.subscribe( - accountChannel, - this.handleAccountEvent.bind(this, accountChannel), + productChannel, + this.handleProductEvent.bind(this, productChannel), this.numRequested ); - this.logger.log("Subscribed to account eligibility channel", { accountChannel }); + this.logger.log("Subscribed to Product2 CDC channel", { productChannel }); + + this.pricebookChannel = pricebookChannel; + await client.subscribe( + pricebookChannel, + this.handlePricebookEvent.bind(this, pricebookChannel), + this.numRequested + ); + this.logger.log("Subscribed to PricebookEntry CDC channel", { pricebookChannel }); + } else { + this.logger.debug("Catalog CDC subscriptions disabled (SF_EVENTS_ENABLED=false)"); } + + this.accountChannel = accountChannel; + await client.subscribe( + accountChannel, + this.handleAccountEvent.bind(this, accountChannel), + this.numRequested + ); + this.logger.log("Subscribed to account eligibility platform event channel", { + accountChannel, + }); } catch (error) { this.logger.warn("Failed to initialize catalog CDC subscriber", { error: error instanceof Error ? error.message : String(error), @@ -257,7 +268,7 @@ export class CatalogCdcSubscriber implements OnModuleInit, OnModuleDestroy { ): Promise { if (!this.isDataCallback(callbackType)) return; const payload = this.extractPayload(data); - const accountId = this.extractStringField(payload, ["AccountId__c", "AccountId"]); + const accountId = this.extractStringField(payload, ["AccountId__c", "AccountId", "Id"]); const eligibility = this.extractStringField(payload, [ "Internet_Eligibility__c", "InternetEligibility__c", @@ -273,8 +284,7 @@ export class CatalogCdcSubscriber implements OnModuleInit, OnModuleDestroy { this.logger.log("Account eligibility event received", { channel, - accountId, - eligibility, + accountIdTail: accountId.slice(-4), }); await this.catalogCache.invalidateEligibility(accountId); diff --git a/apps/bff/src/modules/catalog/catalog.controller.ts b/apps/bff/src/modules/catalog/catalog.controller.ts index 667b26b0..3b85bde1 100644 --- a/apps/bff/src/modules/catalog/catalog.controller.ts +++ b/apps/bff/src/modules/catalog/catalog.controller.ts @@ -28,7 +28,7 @@ export class CatalogController { @Get("internet/plans") @RateLimit({ limit: 20, ttl: 60 }) // 20 requests per minute - @Header("Cache-Control", "private, max-age=300") // Personalised responses: prevent shared caching + @Header("Cache-Control", "private, no-store") // Personalised responses: avoid browser caching (realtime invalidation relies on refetch) async getInternetPlans(@Request() req: RequestWithUser): Promise<{ plans: InternetPlanCatalogItem[]; installations: InternetInstallationCatalogItem[]; @@ -63,7 +63,7 @@ export class CatalogController { @Get("sim/plans") @RateLimit({ limit: 20, ttl: 60 }) // 20 requests per minute - @Header("Cache-Control", "private, max-age=300") // Personalised responses: prevent shared caching + @Header("Cache-Control", "private, no-store") // Personalised responses: avoid browser caching (realtime invalidation relies on refetch) async getSimCatalogData(@Request() req: RequestWithUser): Promise { const userId = req.user?.id; if (!userId) { diff --git a/apps/bff/src/modules/realtime/realtime-connection-limiter.service.ts b/apps/bff/src/modules/realtime/realtime-connection-limiter.service.ts new file mode 100644 index 00000000..869877f4 --- /dev/null +++ b/apps/bff/src/modules/realtime/realtime-connection-limiter.service.ts @@ -0,0 +1,34 @@ +import { Injectable } from "@nestjs/common"; + +/** + * Simple per-instance SSE connection limiter. + * + * This prevents a single user (or a runaway client) from opening an unbounded number of + * long-lived SSE connections to a single BFF instance. + * + * Note: This is intentionally in-memory (fast, low overhead). RateLimitGuard still protects + * connection attempts globally (Redis-backed) across instances. + */ +@Injectable() +export class RealtimeConnectionLimiterService { + private readonly maxPerUser = 3; + private readonly counts = new Map(); + + tryAcquire(userId: string): boolean { + const current = this.counts.get(userId) ?? 0; + if (current >= this.maxPerUser) { + return false; + } + this.counts.set(userId, current + 1); + return true; + } + + release(userId: string): void { + const current = this.counts.get(userId) ?? 0; + if (current <= 1) { + this.counts.delete(userId); + return; + } + this.counts.set(userId, current - 1); + } +} diff --git a/apps/bff/src/modules/realtime/realtime.controller.ts b/apps/bff/src/modules/realtime/realtime.controller.ts index 736abc17..9c71a826 100644 --- a/apps/bff/src/modules/realtime/realtime.controller.ts +++ b/apps/bff/src/modules/realtime/realtime.controller.ts @@ -1,16 +1,31 @@ -import { Controller, Header, Request, Sse } from "@nestjs/common"; +import { + Controller, + Header, + Request, + Sse, + Inject, + HttpException, + HttpStatus, + UseGuards, +} from "@nestjs/common"; import type { MessageEvent } from "@nestjs/common"; import { Observable } from "rxjs"; import { merge } from "rxjs"; +import { finalize } from "rxjs/operators"; import type { RequestWithUser } from "@bff/modules/auth/auth.types.js"; import { RealtimeService } from "@bff/infra/realtime/realtime.service.js"; import { MappingsService } from "@bff/modules/id-mappings/mappings.service.js"; +import { Logger } from "nestjs-pino"; +import { RateLimit, RateLimitGuard } from "@bff/core/rate-limiting/index.js"; +import { RealtimeConnectionLimiterService } from "./realtime-connection-limiter.service.js"; @Controller("events") export class RealtimeController { constructor( private readonly realtime: RealtimeService, - private readonly mappings: MappingsService + private readonly mappings: MappingsService, + private readonly limiter: RealtimeConnectionLimiterService, + @Inject(Logger) private readonly logger: Logger ) {} /** @@ -20,11 +35,28 @@ export class RealtimeController { * Backed by Redis pub/sub for multi-instance delivery. */ @Sse() - @Header("Cache-Control", "no-cache") + @UseGuards(RateLimitGuard) + @RateLimit({ limit: 30, ttl: 60 }) // protect against reconnect storms / refresh spam + @Header("Cache-Control", "private, no-store") + @Header("X-Accel-Buffering", "no") // nginx: disable response buffering for SSE async stream(@Request() req: RequestWithUser): Promise> { + if (!this.limiter.tryAcquire(req.user.id)) { + throw new HttpException( + "Too many concurrent realtime connections", + HttpStatus.TOO_MANY_REQUESTS + ); + } + const mapping = await this.mappings.findByUserId(req.user.id); const sfAccountId = mapping?.sfAccountId; + // Intentionally log minimal info for debugging connection issues. + this.logger.log("Account realtime stream connected", { + userId: req.user.id, + hasSfAccountId: Boolean(sfAccountId), + sfAccountIdTail: sfAccountId ? sfAccountId.slice(-4) : null, + }); + const accountStream = this.realtime.subscribe( sfAccountId ? `account:sf:${sfAccountId}` : "account:unknown", { @@ -42,6 +74,13 @@ export class RealtimeController { heartbeatMs: 0, }); - return merge(accountStream, globalCatalogStream); + return merge(accountStream, globalCatalogStream).pipe( + finalize(() => { + this.limiter.release(req.user.id); + this.logger.debug("Account realtime stream disconnected", { + userId: req.user.id, + }); + }) + ); } } diff --git a/apps/bff/src/modules/realtime/realtime.module.ts b/apps/bff/src/modules/realtime/realtime.module.ts index ebe451da..192ae894 100644 --- a/apps/bff/src/modules/realtime/realtime.module.ts +++ b/apps/bff/src/modules/realtime/realtime.module.ts @@ -1,9 +1,11 @@ import { Module } from "@nestjs/common"; import { MappingsModule } from "@bff/modules/id-mappings/mappings.module.js"; import { RealtimeController } from "./realtime.controller.js"; +import { RealtimeConnectionLimiterService } from "./realtime-connection-limiter.service.js"; @Module({ imports: [MappingsModule], controllers: [RealtimeController], + providers: [RealtimeConnectionLimiterService], }) export class RealtimeApiModule {} diff --git a/apps/portal/next-env.d.ts b/apps/portal/next-env.d.ts index 9edff1c7..c4b7818f 100644 --- a/apps/portal/next-env.d.ts +++ b/apps/portal/next-env.d.ts @@ -1,6 +1,6 @@ /// /// -import "./.next/types/routes.d.ts"; +import "./.next/dev/types/routes.d.ts"; // NOTE: This file should not be edited // see https://nextjs.org/docs/app/api-reference/config/typescript for more information. diff --git a/apps/portal/src/features/realtime/components/AccountEventsListener.tsx b/apps/portal/src/features/realtime/components/AccountEventsListener.tsx index 26811b42..570f2ea6 100644 --- a/apps/portal/src/features/realtime/components/AccountEventsListener.tsx +++ b/apps/portal/src/features/realtime/components/AccountEventsListener.tsx @@ -43,16 +43,19 @@ export function AccountEventsListener() { if (!parsed || typeof parsed !== "object") return; if (parsed.event === "catalog.eligibility.changed") { + logger.info("Received catalog.eligibility.changed; invalidating catalog queries"); void queryClient.invalidateQueries({ queryKey: queryKeys.catalog.all() }); return; } if (parsed.event === "catalog.changed") { + logger.info("Received catalog.changed; invalidating catalog queries"); void queryClient.invalidateQueries({ queryKey: queryKeys.catalog.all() }); return; } if (parsed.event === "orders.changed") { + logger.info("Received orders.changed; invalidating orders + dashboard queries"); void queryClient.invalidateQueries({ queryKey: queryKeys.orders.list() }); // Dashboard summary often depends on orders/subscriptions; cheap to keep in sync. void queryClient.invalidateQueries({ queryKey: queryKeys.dashboard.summary() }); diff --git a/scripts/prod/manage.sh b/scripts/prod/manage.sh index 8a39b8fa..71cc6425 100755 --- a/scripts/prod/manage.sh +++ b/scripts/prod/manage.sh @@ -141,7 +141,7 @@ status() { if docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" -p "$PROJECT_NAME" ps proxy >/dev/null 2>&1; then docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" -p "$PROJECT_NAME" exec proxy wget --spider -q http://localhost/healthz && echo "✅ Proxy/Frontend healthy" || echo "❌ Proxy/Frontend unhealthy" else - docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" -p "$PROJECT_NAME" exec frontend wget --spider -q http://localhost:3000/api/health && echo "✅ Frontend healthy" || echo "❌ Frontend unhealthy" + docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" -p "$PROJECT_NAME" exec frontend wget --spider -q http://localhost:3000/_health && echo "✅ Frontend healthy" || echo "❌ Frontend unhealthy" fi docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" -p "$PROJECT_NAME" exec backend wget --spider -q http://localhost:4000/health && echo "✅ Backend healthy" || echo "❌ Backend unhealthy" }