From e1c8b6c15e16fa2c40d2204e27790aabc48cd232 Mon Sep 17 00:00:00 2001 From: barsa Date: Mon, 15 Dec 2025 11:10:50 +0900 Subject: [PATCH] Integrate Realtime module and enhance event handling across various services - Added `RealtimeModule` and `RealtimeApiModule` to the BFF application for improved real-time capabilities. - Updated `CatalogCdcSubscriber` and `OrderCdcSubscriber` to utilize `RealtimeService` for publishing catalog and order updates, ensuring instant notifications across connected clients. - Enhanced `OrderEventsService` to leverage `RealtimeService` for order event subscriptions, improving reliability across multiple BFF instances. - Introduced `AccountEventsListener` in the portal layout to handle real-time account updates. - Removed stale time and garbage collection settings from several hooks to streamline data fetching processes. --- apps/bff/src/app.module.ts | 4 + apps/bff/src/core/config/router.config.ts | 2 + .../bff/src/infra/realtime/realtime.module.ts | 10 ++ .../bff/src/infra/realtime/realtime.pubsub.ts | 88 ++++++++++++ .../src/infra/realtime/realtime.service.ts | 132 ++++++++++++++++++ apps/bff/src/infra/realtime/realtime.types.ts | 35 +++++ .../events/catalog-cdc.subscriber.ts | 29 ++++ .../salesforce/events/order-cdc.subscriber.ts | 24 ++++ .../src/modules/orders/orders.controller.ts | 13 +- .../orders/services/order-events.service.ts | 96 +++---------- .../modules/realtime/realtime.controller.ts | 47 +++++++ .../src/modules/realtime/realtime.module.ts | 9 ++ .../portal/src/app/(authenticated)/layout.tsx | 8 +- .../src/features/catalog/hooks/useCatalog.ts | 6 - .../dashboard/hooks/useDashboardSummary.ts | 2 - .../features/orders/hooks/useOrdersList.ts | 5 - .../components/AccountEventsListener.tsx | 97 +++++++++++++ .../subscriptions/hooks/useSubscriptions.ts | 10 -- .../features/support/hooks/useSupportCase.ts | 2 - .../features/support/hooks/useSupportCases.ts | 1 - apps/portal/src/lib/hooks/useCurrency.ts | 2 - packages/domain/index.ts | 1 + packages/domain/package.json | 8 ++ packages/domain/realtime/events.ts | 21 +++ packages/domain/realtime/index.ts | 1 + packages/domain/tsconfig.json | 1 + 26 files changed, 545 insertions(+), 109 deletions(-) create mode 100644 apps/bff/src/infra/realtime/realtime.module.ts create mode 100644 apps/bff/src/infra/realtime/realtime.pubsub.ts create mode 100644 apps/bff/src/infra/realtime/realtime.service.ts create mode 100644 apps/bff/src/infra/realtime/realtime.types.ts create mode 100644 apps/bff/src/modules/realtime/realtime.controller.ts create mode 100644 apps/bff/src/modules/realtime/realtime.module.ts create mode 100644 apps/portal/src/features/realtime/components/AccountEventsListener.tsx create mode 100644 packages/domain/realtime/events.ts create mode 100644 packages/domain/realtime/index.ts diff --git a/apps/bff/src/app.module.ts b/apps/bff/src/app.module.ts index 742c7030..15e46b8c 100644 --- a/apps/bff/src/app.module.ts +++ b/apps/bff/src/app.module.ts @@ -15,6 +15,7 @@ import { RateLimitModule } from "@bff/core/rate-limiting/index.js"; import { PrismaModule } from "@bff/infra/database/prisma.module.js"; import { RedisModule } from "@bff/infra/redis/redis.module.js"; import { CacheModule } from "@bff/infra/cache/cache.module.js"; +import { RealtimeModule } from "@bff/infra/realtime/realtime.module.js"; import { QueueModule } from "@bff/infra/queue/queue.module.js"; import { AuditModule } from "@bff/infra/audit/audit.module.js"; import { EmailModule } from "@bff/infra/email/email.module.js"; @@ -33,6 +34,7 @@ import { InvoicesModule } from "@bff/modules/invoices/invoices.module.js"; import { SubscriptionsModule } from "@bff/modules/subscriptions/subscriptions.module.js"; import { CurrencyModule } from "@bff/modules/currency/currency.module.js"; import { SupportModule } from "@bff/modules/support/support.module.js"; +import { RealtimeApiModule } from "@bff/modules/realtime/realtime.module.js"; // System Modules import { HealthModule } from "@bff/modules/health/health.module.js"; @@ -63,6 +65,7 @@ import { HealthModule } from "@bff/modules/health/health.module.js"; PrismaModule, RedisModule, CacheModule, + RealtimeModule, QueueModule, AuditModule, EmailModule, @@ -81,6 +84,7 @@ import { HealthModule } from "@bff/modules/health/health.module.js"; SubscriptionsModule, CurrencyModule, SupportModule, + RealtimeApiModule, // === SYSTEM MODULES === HealthModule, diff --git a/apps/bff/src/core/config/router.config.ts b/apps/bff/src/core/config/router.config.ts index f9d41868..f82d32e0 100644 --- a/apps/bff/src/core/config/router.config.ts +++ b/apps/bff/src/core/config/router.config.ts @@ -9,6 +9,7 @@ import { SubscriptionsModule } from "@bff/modules/subscriptions/subscriptions.mo import { CurrencyModule } from "@bff/modules/currency/currency.module.js"; import { SecurityModule } from "@bff/core/security/security.module.js"; import { SupportModule } from "@bff/modules/support/support.module.js"; +import { RealtimeApiModule } from "@bff/modules/realtime/realtime.module.js"; export const apiRoutes: Routes = [ { @@ -24,6 +25,7 @@ export const apiRoutes: Routes = [ { path: "", module: CurrencyModule }, { path: "", module: SupportModule }, { path: "", module: SecurityModule }, + { path: "", module: RealtimeApiModule }, ], }, ]; diff --git a/apps/bff/src/infra/realtime/realtime.module.ts b/apps/bff/src/infra/realtime/realtime.module.ts new file mode 100644 index 00000000..b779d83d --- /dev/null +++ b/apps/bff/src/infra/realtime/realtime.module.ts @@ -0,0 +1,10 @@ +import { Global, Module } from "@nestjs/common"; +import { RealtimePubSubService } from "./realtime.pubsub.js"; +import { RealtimeService } from "./realtime.service.js"; + +@Global() +@Module({ + providers: [RealtimePubSubService, RealtimeService], + exports: [RealtimeService], +}) +export class RealtimeModule {} diff --git a/apps/bff/src/infra/realtime/realtime.pubsub.ts b/apps/bff/src/infra/realtime/realtime.pubsub.ts new file mode 100644 index 00000000..6476e326 --- /dev/null +++ b/apps/bff/src/infra/realtime/realtime.pubsub.ts @@ -0,0 +1,88 @@ +import { Inject, Injectable, Logger, OnModuleDestroy, OnModuleInit } from "@nestjs/common"; +import { Redis } from "ioredis"; +import { getErrorMessage } from "@bff/core/utils/error.util.js"; +import type { RealtimePubSubMessage } from "./realtime.types.js"; + +type Handler = (message: RealtimePubSubMessage) => void; + +/** + * Redis Pub/Sub wrapper for realtime events. + * + * - Uses a dedicated subscriber connection (required by Redis pub/sub semantics) + * - Publishes JSON messages to a single channel + * - Dispatches messages to in-process handlers + */ +@Injectable() +export class RealtimePubSubService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(RealtimePubSubService.name); + private readonly CHANNEL = "realtime:events"; + + private subscriber: Redis | null = null; + private handlers = new Set(); + + constructor(@Inject("REDIS_CLIENT") private readonly redis: Redis) {} + + async onModuleInit(): Promise { + // Create a dedicated connection for subscriptions + this.subscriber = this.redis.duplicate(); + + this.subscriber.on("error", err => { + this.logger.warn("Realtime Redis subscriber error", { error: getErrorMessage(err) }); + }); + + this.subscriber.on("end", () => { + this.logger.warn("Realtime Redis subscriber connection ended"); + }); + + await this.subscriber.subscribe(this.CHANNEL); + + this.subscriber.on("message", (_channel, raw) => { + const msg = this.safeParse(raw); + if (!msg) return; + for (const handler of this.handlers) { + try { + handler(msg); + } catch (error) { + this.logger.warn("Realtime handler threw", { error: getErrorMessage(error) }); + } + } + }); + + this.logger.log("Realtime Pub/Sub initialized", { channel: this.CHANNEL }); + } + + async onModuleDestroy(): Promise { + if (!this.subscriber) return; + try { + await this.subscriber.unsubscribe(this.CHANNEL); + await this.subscriber.quit(); + } catch { + this.subscriber.disconnect(); + } finally { + this.subscriber = null; + this.handlers.clear(); + } + } + + publish(message: RealtimePubSubMessage): Promise { + return this.redis.publish(this.CHANNEL, JSON.stringify(message)); + } + + addHandler(handler: Handler): () => void { + this.handlers.add(handler); + return () => { + this.handlers.delete(handler); + }; + } + + private safeParse(raw: string): RealtimePubSubMessage | null { + try { + const parsed = JSON.parse(raw) as RealtimePubSubMessage; + if (!parsed || typeof parsed !== "object") return null; + if (typeof parsed.topic !== "string" || typeof parsed.event !== "string") return null; + return parsed; + } catch { + return null; + } + } +} diff --git a/apps/bff/src/infra/realtime/realtime.service.ts b/apps/bff/src/infra/realtime/realtime.service.ts new file mode 100644 index 00000000..40d0e248 --- /dev/null +++ b/apps/bff/src/infra/realtime/realtime.service.ts @@ -0,0 +1,132 @@ +import { Injectable, Logger } from "@nestjs/common"; +import type { MessageEvent } from "@nestjs/common"; +import { Observable } from "rxjs"; +import { getErrorMessage } from "@bff/core/utils/error.util.js"; +import type { + RealtimeEventEnvelope, + RealtimePubSubMessage, + RealtimeStreamOptions, +} from "./realtime.types.js"; +import { RealtimePubSubService } from "./realtime.pubsub.js"; + +interface InternalObserver { + next: (event: MessageEvent) => void; + complete: () => void; + error: (error: unknown) => void; +} + +/** + * Production-ready realtime event hub. + * + * - Subscriptions are in-memory per instance + * - Publishes and receives via Redis Pub/Sub for multi-instance delivery + * - Provides consistent "ready" + "heartbeat" conventions + */ +@Injectable() +export class RealtimeService { + private readonly logger = new Logger(RealtimeService.name); + private readonly observersByTopic = new Map>(); + + constructor(private readonly pubsub: RealtimePubSubService) { + // Fan-in all Redis events and deliver to local subscribers + this.pubsub.addHandler(msg => this.deliver(msg)); + } + + subscribe(topic: string, options: RealtimeStreamOptions = {}): Observable { + const heartbeatMs = options.heartbeatMs ?? 30000; + const readyEvent = options.readyEvent === undefined ? "stream.ready" : options.readyEvent; + const heartbeatEvent = + options.heartbeatEvent === undefined ? "stream.heartbeat" : options.heartbeatEvent; + + return new Observable(subscriber => { + const wrappedObserver: InternalObserver = { + next: value => subscriber.next(value), + complete: () => subscriber.complete(), + error: error => subscriber.error(error), + }; + + const set = this.observersByTopic.get(topic) ?? new Set(); + set.add(wrappedObserver); + this.observersByTopic.set(topic, set); + + this.logger.debug("Realtime stream connected", { topic, listeners: set.size }); + + if (readyEvent) { + wrappedObserver.next( + this.buildMessage(readyEvent, { + timestamp: new Date().toISOString(), + }) + ); + } + + const heartbeat = + heartbeatMs > 0 && Boolean(heartbeatEvent) + ? setInterval(() => { + wrappedObserver.next( + this.buildMessage(heartbeatEvent as string, { + timestamp: new Date().toISOString(), + }) + ); + }, heartbeatMs) + : null; + + return () => { + if (heartbeat) { + clearInterval(heartbeat); + } + + const current = this.observersByTopic.get(topic); + if (current) { + current.delete(wrappedObserver); + if (current.size === 0) { + this.observersByTopic.delete(topic); + } + } + + this.logger.debug("Realtime stream disconnected", { + topic, + listeners: current?.size ?? 0, + }); + }; + }); + } + + publish(topic: string, event: TEvent, data: TData): void { + const message: RealtimePubSubMessage = { topic, event, data }; + void this.pubsub.publish(message).catch(error => { + this.logger.warn("Failed to publish realtime event", { + topic, + event, + error: getErrorMessage(error), + }); + }); + } + + private deliver(message: RealtimePubSubMessage): void { + const set = this.observersByTopic.get(message.topic); + if (!set || set.size === 0) { + return; + } + + const evt = this.buildMessage(message.event, message.data); + set.forEach(observer => { + try { + observer.next(evt); + } catch (error) { + this.logger.warn("Failed to notify realtime listener", { + topic: message.topic, + error: getErrorMessage(error), + }); + } + }); + } + + private buildMessage(event: TEvent, data: unknown): MessageEvent { + return { + data: { + event, + data, + } satisfies RealtimeEventEnvelope, + } satisfies MessageEvent; + } +} diff --git a/apps/bff/src/infra/realtime/realtime.types.ts b/apps/bff/src/infra/realtime/realtime.types.ts new file mode 100644 index 00000000..37964542 --- /dev/null +++ b/apps/bff/src/infra/realtime/realtime.types.ts @@ -0,0 +1,35 @@ +import type { MessageEvent } from "@nestjs/common"; + +export interface RealtimeEventEnvelope { + event: TEvent; + data: TData; +} + +export interface RealtimePubSubMessage { + /** + * Topic identifies which logical stream this event belongs to. + * Examples: + * - orders:sf:801xx0000001234 + * - catalog:eligibility:001xx000000abcd + */ + topic: string; + event: TEvent; + data: TData; +} + +export interface RealtimeStreamOptions { + /** + * Event emitted immediately after subscription is established. + */ + readyEvent?: string | null; + /** + * Heartbeat interval (ms). Set to 0 to disable. + */ + heartbeatMs?: number; + /** + * Heartbeat event name. + */ + heartbeatEvent?: string | null; +} + +export type RealtimeMessageEvent = MessageEvent; diff --git a/apps/bff/src/integrations/salesforce/events/catalog-cdc.subscriber.ts b/apps/bff/src/integrations/salesforce/events/catalog-cdc.subscriber.ts index 894c13ed..7466fc1b 100644 --- a/apps/bff/src/integrations/salesforce/events/catalog-cdc.subscriber.ts +++ b/apps/bff/src/integrations/salesforce/events/catalog-cdc.subscriber.ts @@ -5,6 +5,7 @@ import { Logger } from "nestjs-pino"; import PubSubApiClientPkg from "salesforce-pubsub-api-client"; import { SalesforceConnection } from "../services/salesforce-connection.service.js"; import { CatalogCacheService } from "@bff/modules/catalog/services/catalog-cache.service.js"; +import { RealtimeService } from "@bff/infra/realtime/realtime.service.js"; type PubSubCallback = ( subscription: { topicName?: string }, @@ -38,6 +39,7 @@ export class CatalogCdcSubscriber implements OnModuleInit, OnModuleDestroy { private readonly config: ConfigService, private readonly sfConnection: SalesforceConnection, private readonly catalogCache: CatalogCacheService, + private readonly realtime: RealtimeService, @Inject(Logger) private readonly logger: Logger ) { this.numRequested = this.resolveNumRequested(); @@ -180,7 +182,19 @@ export class CatalogCdcSubscriber implements OnModuleInit, OnModuleDestroy { } ); await this.invalidateAllCatalogs(); + // Full invalidation already implies all clients should refetch catalog + this.realtime.publish("global:catalog", "catalog.changed", { + reason: "product.cdc.fallback_full_invalidation", + timestamp: new Date().toISOString(), + }); + return; } + + // Product changes can affect catalog results for all users + this.realtime.publish("global:catalog", "catalog.changed", { + reason: "product.cdc", + timestamp: new Date().toISOString(), + }); } private async handlePricebookEvent( @@ -222,7 +236,17 @@ export class CatalogCdcSubscriber implements OnModuleInit, OnModuleDestroy { } ); await this.invalidateAllCatalogs(); + this.realtime.publish("global:catalog", "catalog.changed", { + reason: "pricebook.cdc.fallback_full_invalidation", + timestamp: new Date().toISOString(), + }); + return; } + + this.realtime.publish("global:catalog", "catalog.changed", { + reason: "pricebook.cdc", + timestamp: new Date().toISOString(), + }); } private async handleAccountEvent( @@ -255,6 +279,11 @@ export class CatalogCdcSubscriber implements OnModuleInit, OnModuleDestroy { await this.catalogCache.invalidateEligibility(accountId); await this.catalogCache.setEligibilityValue(accountId, eligibility ?? null); + + // Notify connected portals immediately (multi-instance safe via Redis pub/sub) + this.realtime.publish(`account:sf:${accountId}`, "catalog.eligibility.changed", { + timestamp: new Date().toISOString(), + }); } private async invalidateAllCatalogs(): Promise { diff --git a/apps/bff/src/integrations/salesforce/events/order-cdc.subscriber.ts b/apps/bff/src/integrations/salesforce/events/order-cdc.subscriber.ts index 0117f9f9..4c2b6b87 100644 --- a/apps/bff/src/integrations/salesforce/events/order-cdc.subscriber.ts +++ b/apps/bff/src/integrations/salesforce/events/order-cdc.subscriber.ts @@ -6,6 +6,7 @@ import PubSubApiClientPkg from "salesforce-pubsub-api-client"; import { SalesforceConnection } from "../services/salesforce-connection.service.js"; import { OrdersCacheService } from "@bff/modules/orders/services/orders-cache.service.js"; import { ProvisioningQueueService } from "@bff/modules/orders/queue/provisioning.queue.js"; +import { RealtimeService } from "@bff/infra/realtime/realtime.service.js"; type PubSubCallback = ( subscription: { topicName?: string }, @@ -79,6 +80,7 @@ export class OrderCdcSubscriber implements OnModuleInit, OnModuleDestroy { private readonly sfConnection: SalesforceConnection, private readonly ordersCache: OrdersCacheService, private readonly provisioningQueue: ProvisioningQueueService, + private readonly realtime: RealtimeService, @Inject(Logger) private readonly logger: Logger ) { this.numRequested = this.resolveNumRequested(); @@ -304,6 +306,17 @@ export class OrderCdcSubscriber implements OnModuleInit, OnModuleDestroy { if (accountId) { await this.ordersCache.invalidateAccountOrders(accountId); } + + // Notify portals for instant refresh (account-scoped + order-scoped) + const timestamp = new Date().toISOString(); + if (accountId) { + this.realtime.publish(`account:sf:${accountId}`, "orders.changed", { + timestamp, + }); + } + this.realtime.publish(`orders:sf:${orderId}`, "order.cdc.changed", { + timestamp, + }); } catch (error) { this.logger.warn("Failed to invalidate order cache from CDC event", { orderId, @@ -396,6 +409,7 @@ export class OrderCdcSubscriber implements OnModuleInit, OnModuleDestroy { const changedFields = this.extractChangedFields(payload); const orderId = this.extractStringField(payload, ["OrderId"]); + const accountId = this.extractStringField(payload, ["AccountId"]); if (!orderId) { this.logger.warn("OrderItem CDC event missing OrderId; skipping", { channel }); return; @@ -421,6 +435,16 @@ export class OrderCdcSubscriber implements OnModuleInit, OnModuleDestroy { try { await this.ordersCache.invalidateOrder(orderId); + + const timestamp = new Date().toISOString(); + if (accountId) { + this.realtime.publish(`account:sf:${accountId}`, "orders.changed", { + timestamp, + }); + } + this.realtime.publish(`orders:sf:${orderId}`, "orderitem.cdc.changed", { + timestamp, + }); } catch (error) { this.logger.warn("Failed to invalidate order cache from OrderItem CDC event", { orderId, diff --git a/apps/bff/src/modules/orders/orders.controller.ts b/apps/bff/src/modules/orders/orders.controller.ts index 081e55c8..8350e3de 100644 --- a/apps/bff/src/modules/orders/orders.controller.ts +++ b/apps/bff/src/modules/orders/orders.controller.ts @@ -2,6 +2,7 @@ import { Body, Controller, Get, + NotFoundException, Param, Post, Request, @@ -88,7 +89,17 @@ export class OrdersController { @Sse(":sfOrderId/events") @UsePipes(new ZodValidationPipe(sfOrderIdParamSchema)) - streamOrderUpdates(@Param() params: SfOrderIdParam): Observable { + @UseGuards(SalesforceReadThrottleGuard) + async streamOrderUpdates( + @Request() req: RequestWithUser, + @Param() params: SfOrderIdParam + ): Promise> { + // Ensure caller is allowed to access this order stream (avoid leaking existence) + try { + await this.orderOrchestrator.getOrderForUser(params.sfOrderId, req.user.id); + } catch { + throw new NotFoundException("Order not found"); + } return this.orderEvents.subscribe(params.sfOrderId); } diff --git a/apps/bff/src/modules/orders/services/order-events.service.ts b/apps/bff/src/modules/orders/services/order-events.service.ts index 122aac73..43d3d789 100644 --- a/apps/bff/src/modules/orders/services/order-events.service.ts +++ b/apps/bff/src/modules/orders/services/order-events.service.ts @@ -1,95 +1,33 @@ -import { Injectable, Logger } from "@nestjs/common"; +import { Injectable } from "@nestjs/common"; import type { MessageEvent } from "@nestjs/common"; import { Observable } from "rxjs"; import type { OrderUpdateEventPayload } from "@customer-portal/domain/orders"; +import { RealtimeService } from "@bff/infra/realtime/realtime.service.js"; -interface InternalObserver { - next: (event: MessageEvent) => void; - complete: () => void; - error: (error: unknown) => void; -} - +/** + * Order SSE publisher/subscriber adapter. + * + * Uses the shared RealtimeService (Redis-backed Pub/Sub) so updates are reliable + * across multiple BFF instances, while keeping the existing event names stable. + */ @Injectable() export class OrderEventsService { - private readonly logger = new Logger(OrderEventsService.name); - - private readonly observers = new Map>(); + constructor(private readonly realtime: RealtimeService) {} subscribe(orderId: string): Observable { - return new Observable(subscriber => { - const wrappedObserver: InternalObserver = { - next: value => subscriber.next(value), - complete: () => subscriber.complete(), - error: error => subscriber.error(error), - }; - - const orderObservers = this.observers.get(orderId) ?? new Set(); - orderObservers.add(wrappedObserver); - this.observers.set(orderId, orderObservers); - - this.logger.debug(`Order stream connected`, { orderId, listeners: orderObservers.size }); - - // Immediately notify client that stream is ready - wrappedObserver.next( - this.buildEvent("order.stream.ready", { - orderId, - timestamp: new Date().toISOString(), - }) - ); - - const heartbeat = setInterval(() => { - wrappedObserver.next( - this.buildEvent("order.stream.heartbeat", { - orderId, - timestamp: new Date().toISOString(), - }) - ); - }, 30000); - - return () => { - clearInterval(heartbeat); - const currentObservers = this.observers.get(orderId); - if (currentObservers) { - currentObservers.delete(wrappedObserver); - if (currentObservers.size === 0) { - this.observers.delete(orderId); - } - } - this.logger.debug(`Order stream disconnected`, { - orderId, - listeners: currentObservers?.size ?? 0, - }); - }; + const topic = this.topic(orderId); + return this.realtime.subscribe(topic, { + readyEvent: "order.stream.ready", + heartbeatEvent: "order.stream.heartbeat", + heartbeatMs: 30000, }); } publish(orderId: string, update: OrderUpdateEventPayload): void { - const currentObservers = this.observers.get(orderId); - if (!currentObservers || currentObservers.size === 0) { - this.logger.debug("No active listeners for order update", { orderId }); - return; - } - - const event = this.buildEvent("order.update", update); - - currentObservers.forEach(observer => { - try { - observer.next(event); - } catch (error) { - this.logger.warn("Failed to notify order update listener", { - orderId, - error: error instanceof Error ? error.message : String(error), - }); - } - }); + this.realtime.publish(this.topic(orderId), "order.update", update); } - private buildEvent(event: string, data: T): MessageEvent { - return { - data: { - event, - data, - }, - } satisfies MessageEvent; + private topic(orderId: string): string { + return `orders:sf:${orderId}`; } } diff --git a/apps/bff/src/modules/realtime/realtime.controller.ts b/apps/bff/src/modules/realtime/realtime.controller.ts new file mode 100644 index 00000000..736abc17 --- /dev/null +++ b/apps/bff/src/modules/realtime/realtime.controller.ts @@ -0,0 +1,47 @@ +import { Controller, Header, Request, Sse } from "@nestjs/common"; +import type { MessageEvent } from "@nestjs/common"; +import { Observable } from "rxjs"; +import { merge } from "rxjs"; +import type { RequestWithUser } from "@bff/modules/auth/auth.types.js"; +import { RealtimeService } from "@bff/infra/realtime/realtime.service.js"; +import { MappingsService } from "@bff/modules/id-mappings/mappings.service.js"; + +@Controller("events") +export class RealtimeController { + constructor( + private readonly realtime: RealtimeService, + private readonly mappings: MappingsService + ) {} + + /** + * Account-scoped realtime events stream. + * + * Single predictable SSE entrypoint for the portal. + * Backed by Redis pub/sub for multi-instance delivery. + */ + @Sse() + @Header("Cache-Control", "no-cache") + async stream(@Request() req: RequestWithUser): Promise> { + const mapping = await this.mappings.findByUserId(req.user.id); + const sfAccountId = mapping?.sfAccountId; + + const accountStream = this.realtime.subscribe( + sfAccountId ? `account:sf:${sfAccountId}` : "account:unknown", + { + // Always provide a single predictable ready + heartbeat for the main account stream. + readyEvent: "account.stream.ready", + heartbeatEvent: "account.stream.heartbeat", + heartbeatMs: 30000, + } + ); + + const globalCatalogStream = this.realtime.subscribe("global:catalog", { + // Avoid duplicate ready/heartbeat noise on the combined stream. + readyEvent: null, + heartbeatEvent: null, + heartbeatMs: 0, + }); + + return merge(accountStream, globalCatalogStream); + } +} diff --git a/apps/bff/src/modules/realtime/realtime.module.ts b/apps/bff/src/modules/realtime/realtime.module.ts new file mode 100644 index 00000000..ebe451da --- /dev/null +++ b/apps/bff/src/modules/realtime/realtime.module.ts @@ -0,0 +1,9 @@ +import { Module } from "@nestjs/common"; +import { MappingsModule } from "@bff/modules/id-mappings/mappings.module.js"; +import { RealtimeController } from "./realtime.controller.js"; + +@Module({ + imports: [MappingsModule], + controllers: [RealtimeController], +}) +export class RealtimeApiModule {} diff --git a/apps/portal/src/app/(authenticated)/layout.tsx b/apps/portal/src/app/(authenticated)/layout.tsx index 07c4e640..5f6a0e8e 100644 --- a/apps/portal/src/app/(authenticated)/layout.tsx +++ b/apps/portal/src/app/(authenticated)/layout.tsx @@ -1,6 +1,12 @@ import type { ReactNode } from "react"; import { AppShell } from "@/components/organisms"; +import { AccountEventsListener } from "@/features/realtime/components/AccountEventsListener"; export default function PortalLayout({ children }: { children: ReactNode }) { - return {children}; + return ( + + + {children} + + ); } diff --git a/apps/portal/src/features/catalog/hooks/useCatalog.ts b/apps/portal/src/features/catalog/hooks/useCatalog.ts index 1f5b2edc..236464ec 100644 --- a/apps/portal/src/features/catalog/hooks/useCatalog.ts +++ b/apps/portal/src/features/catalog/hooks/useCatalog.ts @@ -15,8 +15,6 @@ export function useInternetCatalog() { return useQuery({ queryKey: queryKeys.catalog.internet.combined(), queryFn: () => catalogService.getInternetCatalog(), - staleTime: 5 * 60 * 1000, - gcTime: 10 * 60 * 1000, }); } @@ -28,8 +26,6 @@ export function useSimCatalog() { return useQuery({ queryKey: queryKeys.catalog.sim.combined(), queryFn: () => catalogService.getSimCatalog(), - staleTime: 5 * 60 * 1000, - gcTime: 10 * 60 * 1000, }); } @@ -41,8 +37,6 @@ export function useVpnCatalog() { return useQuery({ queryKey: queryKeys.catalog.vpn.combined(), queryFn: () => catalogService.getVpnCatalog(), - staleTime: 5 * 60 * 1000, - gcTime: 10 * 60 * 1000, }); } diff --git a/apps/portal/src/features/dashboard/hooks/useDashboardSummary.ts b/apps/portal/src/features/dashboard/hooks/useDashboardSummary.ts index 214a2114..98f4d778 100644 --- a/apps/portal/src/features/dashboard/hooks/useDashboardSummary.ts +++ b/apps/portal/src/features/dashboard/hooks/useDashboardSummary.ts @@ -68,8 +68,6 @@ export function useDashboardSummary() { ); } }, - staleTime: 2 * 60 * 1000, // 2 minutes - gcTime: 5 * 60 * 1000, // 5 minutes (formerly cacheTime) enabled: isAuthenticated, retry: (failureCount, error) => { // Don't retry authentication errors diff --git a/apps/portal/src/features/orders/hooks/useOrdersList.ts b/apps/portal/src/features/orders/hooks/useOrdersList.ts index fc73b2bf..1a865dd4 100644 --- a/apps/portal/src/features/orders/hooks/useOrdersList.ts +++ b/apps/portal/src/features/orders/hooks/useOrdersList.ts @@ -6,9 +6,6 @@ import { queryKeys } from "@/lib/api"; import { ordersService } from "@/features/orders/services/orders.service"; import type { OrderSummary } from "@customer-portal/domain/orders"; -const STALE_TIME_MS = 2 * 60 * 1000; -const GC_TIME_MS = 10 * 60 * 1000; - export function useOrdersList() { const { isAuthenticated } = useAuthSession(); @@ -16,8 +13,6 @@ export function useOrdersList() { queryKey: queryKeys.orders.list(), queryFn: () => ordersService.getMyOrders(), enabled: isAuthenticated, - staleTime: STALE_TIME_MS, - gcTime: GC_TIME_MS, refetchOnWindowFocus: false, refetchOnReconnect: false, }); diff --git a/apps/portal/src/features/realtime/components/AccountEventsListener.tsx b/apps/portal/src/features/realtime/components/AccountEventsListener.tsx new file mode 100644 index 00000000..26811b42 --- /dev/null +++ b/apps/portal/src/features/realtime/components/AccountEventsListener.tsx @@ -0,0 +1,97 @@ +"use client"; + +import { useEffect, useRef } from "react"; +import { useQueryClient } from "@tanstack/react-query"; +import { resolveBaseUrl, queryKeys } from "@/lib/api"; +import { logger } from "@/lib/logger"; +import { useAuthSession } from "@/features/auth/services/auth.store"; + +type RealtimeEventEnvelope = { + event: TEvent; + data: TData; +}; + +export function AccountEventsListener() { + const { isAuthenticated } = useAuthSession(); + const queryClient = useQueryClient(); + + const clientRef = useRef<{ close: () => void } | null>(null); + + useEffect(() => { + if (!isAuthenticated) { + clientRef.current?.close(); + clientRef.current = null; + return; + } + + let cancelled = false; + let es: EventSource | null = null; + let reconnectTimeout: ReturnType | null = null; + + const baseUrl = resolveBaseUrl(); + const url = new URL("/api/events", baseUrl).toString(); + + const connect = () => { + if (cancelled) return; + + logger.debug("Connecting to account events stream", { url }); + es = new EventSource(url, { withCredentials: true }); + + const onMessage = (event: MessageEvent) => { + try { + const parsed = JSON.parse(event.data) as RealtimeEventEnvelope; + if (!parsed || typeof parsed !== "object") return; + + if (parsed.event === "catalog.eligibility.changed") { + void queryClient.invalidateQueries({ queryKey: queryKeys.catalog.all() }); + return; + } + + if (parsed.event === "catalog.changed") { + void queryClient.invalidateQueries({ queryKey: queryKeys.catalog.all() }); + return; + } + + if (parsed.event === "orders.changed") { + void queryClient.invalidateQueries({ queryKey: queryKeys.orders.list() }); + // Dashboard summary often depends on orders/subscriptions; cheap to keep in sync. + void queryClient.invalidateQueries({ queryKey: queryKeys.dashboard.summary() }); + return; + } + } catch (error) { + logger.warn("Failed to parse account event", { error }); + } + }; + + const onError = (error: Event) => { + logger.warn("Account events stream disconnected", { error }); + es?.close(); + es = null; + + if (!cancelled) { + reconnectTimeout = setTimeout(connect, 5000); + } + }; + + es.addEventListener("message", onMessage as EventListener); + es.onerror = onError; + }; + + connect(); + + clientRef.current = { + close: () => { + cancelled = true; + if (es) es.close(); + if (reconnectTimeout) clearTimeout(reconnectTimeout); + }, + }; + + return () => { + clientRef.current?.close(); + clientRef.current = null; + }; + }, [isAuthenticated, queryClient]); + + return null; +} diff --git a/apps/portal/src/features/subscriptions/hooks/useSubscriptions.ts b/apps/portal/src/features/subscriptions/hooks/useSubscriptions.ts index 9381fd69..8b8c3cc2 100644 --- a/apps/portal/src/features/subscriptions/hooks/useSubscriptions.ts +++ b/apps/portal/src/features/subscriptions/hooks/useSubscriptions.ts @@ -34,8 +34,6 @@ export function useSubscriptions(options: UseSubscriptionsOptions = {}) { ); return getDataOrThrow(response, "Failed to load subscriptions"); }, - staleTime: 5 * 60 * 1000, - gcTime: 10 * 60 * 1000, enabled: isAuthenticated, }); } @@ -52,8 +50,6 @@ export function useActiveSubscriptions() { const response = await apiClient.GET("/api/subscriptions/active"); return getDataOrThrow(response, "Failed to load active subscriptions"); }, - staleTime: 5 * 60 * 1000, - gcTime: 10 * 60 * 1000, enabled: isAuthenticated, }); } @@ -73,8 +69,6 @@ export function useSubscriptionStats() { } return subscriptionStatsSchema.parse(response.data); }, - staleTime: 5 * 60 * 1000, - gcTime: 10 * 60 * 1000, enabled: isAuthenticated, }); } @@ -93,8 +87,6 @@ export function useSubscription(subscriptionId: number) { }); return getDataOrThrow(response, "Failed to load subscription details"); }, - staleTime: 5 * 60 * 1000, - gcTime: 10 * 60 * 1000, enabled: isAuthenticated && subscriptionId > 0, }); } @@ -120,8 +112,6 @@ export function useSubscriptionInvoices( }); return getDataOrThrow(response, "Failed to load subscription invoices"); }, - staleTime: 60 * 1000, - gcTime: 5 * 60 * 1000, enabled: isAuthenticated && subscriptionId > 0, }); } diff --git a/apps/portal/src/features/support/hooks/useSupportCase.ts b/apps/portal/src/features/support/hooks/useSupportCase.ts index fcbc4f5c..1cc66fb4 100644 --- a/apps/portal/src/features/support/hooks/useSupportCase.ts +++ b/apps/portal/src/features/support/hooks/useSupportCase.ts @@ -15,7 +15,5 @@ export function useSupportCase(caseId: string | undefined) { return getDataOrThrow(response, "Failed to load support case"); }, enabled: isAuthenticated && !!caseId, - staleTime: 60 * 1000, }); } - diff --git a/apps/portal/src/features/support/hooks/useSupportCases.ts b/apps/portal/src/features/support/hooks/useSupportCases.ts index 7f94dd50..3a957ea4 100644 --- a/apps/portal/src/features/support/hooks/useSupportCases.ts +++ b/apps/portal/src/features/support/hooks/useSupportCases.ts @@ -19,6 +19,5 @@ export function useSupportCases(filters?: SupportCaseFilter) { return getDataOrThrow(response, "Failed to load support cases"); }, enabled: isAuthenticated, - staleTime: 60 * 1000, }); } diff --git a/apps/portal/src/lib/hooks/useCurrency.ts b/apps/portal/src/lib/hooks/useCurrency.ts index e3a2fba1..3ab0f5c4 100644 --- a/apps/portal/src/lib/hooks/useCurrency.ts +++ b/apps/portal/src/lib/hooks/useCurrency.ts @@ -9,8 +9,6 @@ export function useCurrency() { const { data, isLoading, isError, error } = useQuery({ queryKey: queryKeys.currency.default(), queryFn: () => currencyService.getDefaultCurrency(), - staleTime: 60 * 60 * 1000, // cache currency for 1 hour - gcTime: 2 * 60 * 60 * 1000, retry: 2, }); diff --git a/packages/domain/index.ts b/packages/domain/index.ts index ad70d807..914b9630 100644 --- a/packages/domain/index.ts +++ b/packages/domain/index.ts @@ -17,3 +17,4 @@ export * as Auth from "./auth/index.js"; export * as Customer from "./customer/index.js"; export * as Mappings from "./mappings/index.js"; export * as Dashboard from "./dashboard/index.js"; +export * as Realtime from "./realtime/index.js"; diff --git a/packages/domain/package.json b/packages/domain/package.json index 948ef8f9..4792a6ee 100644 --- a/packages/domain/package.json +++ b/packages/domain/package.json @@ -87,6 +87,14 @@ "import": "./dist/payments/*.js", "types": "./dist/payments/*.d.ts" }, + "./realtime": { + "import": "./dist/realtime/index.js", + "types": "./dist/realtime/index.d.ts" + }, + "./realtime/*": { + "import": "./dist/realtime/*.js", + "types": "./dist/realtime/*.d.ts" + }, "./sim": { "import": "./dist/sim/index.js", "types": "./dist/sim/index.d.ts" diff --git a/packages/domain/realtime/events.ts b/packages/domain/realtime/events.ts new file mode 100644 index 00000000..da6e3f0b --- /dev/null +++ b/packages/domain/realtime/events.ts @@ -0,0 +1,21 @@ +/** + * Realtime Events - Shared Contracts + * + * Shared SSE payload shapes for portal + BFF. + */ + +export interface RealtimeEventEnvelope { + event: TEvent; + data: TData; +} + +export interface CatalogEligibilityChangedPayload { + accountId: string; + eligibility: string | null; + timestamp: string; +} + +export type AccountStreamEvent = + | RealtimeEventEnvelope<"account.stream.ready", { topic: string; timestamp: string }> + | RealtimeEventEnvelope<"account.stream.heartbeat", { topic: string; timestamp: string }> + | RealtimeEventEnvelope<"catalog.eligibility.changed", CatalogEligibilityChangedPayload>; diff --git a/packages/domain/realtime/index.ts b/packages/domain/realtime/index.ts new file mode 100644 index 00000000..930686e7 --- /dev/null +++ b/packages/domain/realtime/index.ts @@ -0,0 +1 @@ +export * from "./events.js"; diff --git a/packages/domain/tsconfig.json b/packages/domain/tsconfig.json index 8757a636..d7bf71ec 100644 --- a/packages/domain/tsconfig.json +++ b/packages/domain/tsconfig.json @@ -20,6 +20,7 @@ "orders/**/*", "payments/**/*", "providers/**/*", + "realtime/**/*", "sim/**/*", "subscriptions/**/*", "support/**/*",