import { Inject, Injectable, Logger } from "@nestjs/common"; import type { OnModuleDestroy, OnModuleInit } from "@nestjs/common"; import { Redis } from "ioredis"; import { getErrorMessage } from "@bff/core/utils/error.util.js"; import type { RealtimePubSubMessage } from "./realtime.types.js"; type Handler = (message: RealtimePubSubMessage) => void; /** * Redis Pub/Sub wrapper for realtime events. * * - Uses a dedicated subscriber connection (required by Redis pub/sub semantics) * - Publishes JSON messages to a single channel * - Dispatches messages to in-process handlers */ @Injectable() export class RealtimePubSubService implements OnModuleInit, OnModuleDestroy { private readonly logger = new Logger(RealtimePubSubService.name); private readonly CHANNEL = "realtime:events"; private subscriber: Redis | null = null; private handlers = new Set(); constructor(@Inject("REDIS_CLIENT") private readonly redis: Redis) {} async onModuleInit(): Promise { // Create a dedicated connection for subscriptions this.subscriber = this.redis.duplicate(); this.subscriber.on("error", err => { this.logger.warn("Realtime Redis subscriber error", { error: getErrorMessage(err) }); }); this.subscriber.on("end", () => { this.logger.warn("Realtime Redis subscriber connection ended"); }); await this.subscriber.subscribe(this.CHANNEL); this.subscriber.on("message", (_channel, raw) => { const msg = this.safeParse(raw); if (!msg) return; for (const handler of this.handlers) { try { handler(msg); } catch (error) { this.logger.warn("Realtime handler threw", { error: getErrorMessage(error) }); } } }); this.logger.log("Realtime Pub/Sub initialized", { channel: this.CHANNEL }); } async onModuleDestroy(): Promise { // Capture the connection reference up-front and null out the field early. // This makes shutdown idempotent and avoids races if the hook is invoked more than once. const subscriber = this.subscriber; if (!subscriber) return; this.subscriber = null; this.handlers.clear(); try { await subscriber.unsubscribe(this.CHANNEL); await subscriber.quit(); } catch { // Best-effort immediate close if graceful shutdown fails. try { subscriber.disconnect(); } catch { // ignore } } } publish(message: RealtimePubSubMessage): Promise { return this.redis.publish(this.CHANNEL, JSON.stringify(message)); } addHandler(handler: Handler): () => void { this.handlers.add(handler); return () => { this.handlers.delete(handler); }; } private safeParse(raw: string): RealtimePubSubMessage | null { try { const parsed = JSON.parse(raw) as RealtimePubSubMessage; if (!parsed || typeof parsed !== "object") return null; if (typeof parsed.topic !== "string" || typeof parsed.event !== "string") return null; return parsed; } catch { return null; } } }