89 lines
2.7 KiB
TypeScript
89 lines
2.7 KiB
TypeScript
|
|
import { Inject, Injectable, Logger, OnModuleDestroy, OnModuleInit } from "@nestjs/common";
|
||
|
|
import { Redis } from "ioredis";
|
||
|
|
import { getErrorMessage } from "@bff/core/utils/error.util.js";
|
||
|
|
import type { RealtimePubSubMessage } from "./realtime.types.js";
|
||
|
|
|
||
|
|
/** In-process callback invoked with each validated message received on the realtime channel. */
type Handler = (message: RealtimePubSubMessage) => void;
|
||
|
|
|
||
|
|
/**
 * Redis Pub/Sub wrapper for realtime events.
 *
 * - Uses a dedicated subscriber connection (required by Redis pub/sub semantics)
 * - Publishes JSON messages to a single channel
 * - Dispatches messages to in-process handlers
 */
@Injectable()
export class RealtimePubSubService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(RealtimePubSubService.name);
  private readonly CHANNEL = "realtime:events";

  /** Dedicated subscriber connection; `null` before init and after destroy. */
  private subscriber: Redis | null = null;
  /** Handlers registered through {@link addHandler}. */
  private readonly handlers = new Set<Handler>();

  constructor(@Inject("REDIS_CLIENT") private readonly redis: Redis) {}

  /**
   * Duplicates the injected Redis client, wires its event listeners and
   * subscribes the duplicate to the realtime channel.
   *
   * @throws Rethrows the subscribe error after releasing the duplicated connection.
   */
  async onModuleInit(): Promise<void> {
    // Create a dedicated connection for subscriptions
    const subscriber = this.redis.duplicate();
    this.subscriber = subscriber;

    subscriber.on("error", err => {
      this.logger.warn("Realtime Redis subscriber error", { error: getErrorMessage(err) });
    });

    subscriber.on("end", () => {
      this.logger.warn("Realtime Redis subscriber connection ended");
    });

    // Attach the listener BEFORE subscribing so that a message delivered right
    // after the subscription is confirmed cannot arrive while nobody listens.
    subscriber.on("message", (_channel, raw) => {
      this.dispatch(raw);
    });

    try {
      await subscriber.subscribe(this.CHANNEL);
    } catch (error) {
      // Do not leave a half-initialized connection behind when subscribing fails.
      if (this.subscriber === subscriber) {
        this.subscriber = null;
      }
      subscriber.disconnect();
      throw error;
    }

    this.logger.log("Realtime Pub/Sub initialized", { channel: this.CHANNEL });
  }

  /**
   * Unsubscribes and closes the subscriber connection, then drops all handlers.
   * Falls back to a forced disconnect when the graceful shutdown fails; never throws
   * for a shutdown failure.
   */
  async onModuleDestroy(): Promise<void> {
    const subscriber = this.subscriber;
    if (!subscriber) return;

    try {
      await subscriber.unsubscribe(this.CHANNEL);
      await subscriber.quit();
    } catch (error) {
      this.logger.warn("Realtime Redis subscriber shutdown failed; forcing disconnect", {
        error: getErrorMessage(error),
      });
      subscriber.disconnect();
    } finally {
      this.subscriber = null;
      this.handlers.clear();
    }
  }

  /**
   * Serializes `message` as JSON and publishes it on the realtime channel
   * through the injected (non-subscriber) Redis client.
   *
   * @param message Message to broadcast.
   * @returns The promise returned by the Redis `publish` call.
   */
  publish(message: RealtimePubSubMessage): Promise<number> {
    return this.redis.publish(this.CHANNEL, JSON.stringify(message));
  }

  /**
   * Registers an in-process handler for incoming messages.
   *
   * @param handler Callback invoked for every valid message.
   * @returns A function that removes the handler again.
   */
  addHandler(handler: Handler): () => void {
    this.handlers.add(handler);
    return () => {
      this.handlers.delete(handler);
    };
  }

  /**
   * Parses a raw channel payload and forwards it to every registered handler.
   * Invalid payloads are dropped; a throwing handler is logged and does not
   * prevent the remaining handlers from running.
   */
  private dispatch(raw: string): void {
    const msg = this.safeParse(raw);
    if (!msg) return;

    for (const handler of this.handlers) {
      try {
        handler(msg);
      } catch (error) {
        this.logger.warn("Realtime handler threw", { error: getErrorMessage(error) });
      }
    }
  }

  /**
   * Parses `raw` as JSON and checks the minimal message shape.
   *
   * @returns The parsed message, or `null` when `raw` is not valid JSON, is not
   * an object, or lacks string `topic` / `event` fields.
   */
  private safeParse(raw: string): RealtimePubSubMessage | null {
    let parsed: unknown;
    try {
      parsed = JSON.parse(raw);
    } catch {
      return null;
    }

    if (typeof parsed !== "object" || parsed === null) return null;

    // Validate on an untyped view first; only assert the message type afterwards.
    const { topic, event } = parsed as Record<string, unknown>;
    if (typeof topic !== "string" || typeof event !== "string") return null;

    return parsed as RealtimePubSubMessage;
  }
}
|