189 lines
5.2 KiB
TypeScript
189 lines
5.2 KiB
TypeScript
|
|
/**
|
||
|
|
* Distributed Lock Service
|
||
|
|
*
|
||
|
|
* Redis-based distributed locking for preventing race conditions
|
||
|
|
* in operations that span multiple systems (e.g., Salesforce + Portal).
|
||
|
|
*
|
||
|
|
* Uses Redis SET NX PX pattern for atomic lock acquisition with TTL.
|
||
|
|
*/
|
||
|
|
|
||
|
|
import { Injectable, Inject } from "@nestjs/common";
|
||
|
|
import { Logger } from "nestjs-pino";
|
||
|
|
import type { Redis } from "ioredis";
|
||
|
|
|
||
|
|
const LOCK_PREFIX = "lock:";
|
||
|
|
const DEFAULT_TTL_MS = 30_000; // 30 seconds
|
||
|
|
const DEFAULT_RETRY_DELAY_MS = 100;
|
||
|
|
const DEFAULT_MAX_RETRIES = 50; // 5 seconds total with 100ms delay
|
||
|
|
|
||
|
|
export interface LockOptions {
|
||
|
|
/** Lock TTL in milliseconds (default: 30000) */
|
||
|
|
ttlMs?: number;
|
||
|
|
/** Delay between retry attempts in milliseconds (default: 100) */
|
||
|
|
retryDelayMs?: number;
|
||
|
|
/** Maximum number of retry attempts (default: 50) */
|
||
|
|
maxRetries?: number;
|
||
|
|
}
|
||
|
|
|
||
|
|
export interface Lock {
|
||
|
|
/** The lock key */
|
||
|
|
key: string;
|
||
|
|
/** Unique token for this lock instance */
|
||
|
|
token: string;
|
||
|
|
/** Release the lock */
|
||
|
|
release: () => Promise<void>;
|
||
|
|
}
|
||
|
|
|
||
|
|
@Injectable()
|
||
|
|
export class DistributedLockService {
|
||
|
|
constructor(
|
||
|
|
@Inject("REDIS_CLIENT") private readonly redis: Redis,
|
||
|
|
@Inject(Logger) private readonly logger: Logger
|
||
|
|
) {}
|
||
|
|
|
||
|
|
/**
|
||
|
|
* Acquire a distributed lock
|
||
|
|
*
|
||
|
|
* @param key - Unique key identifying the resource to lock
|
||
|
|
* @param options - Lock options
|
||
|
|
* @returns Lock object if acquired, null if unable to acquire
|
||
|
|
*/
|
||
|
|
async acquire(key: string, options?: LockOptions): Promise<Lock | null> {
|
||
|
|
const lockKey = LOCK_PREFIX + key;
|
||
|
|
const token = this.generateToken();
|
||
|
|
const ttlMs = options?.ttlMs ?? DEFAULT_TTL_MS;
|
||
|
|
const retryDelayMs = options?.retryDelayMs ?? DEFAULT_RETRY_DELAY_MS;
|
||
|
|
const maxRetries = options?.maxRetries ?? DEFAULT_MAX_RETRIES;
|
||
|
|
|
||
|
|
for (let attempt = 0; attempt <= maxRetries; attempt++) {
|
||
|
|
// SET key token NX PX ttl - atomic set if not exists with TTL
|
||
|
|
const result = await this.redis.set(lockKey, token, "PX", ttlMs, "NX");
|
||
|
|
|
||
|
|
if (result === "OK") {
|
||
|
|
this.logger.debug("Lock acquired", { key: lockKey, attempt });
|
||
|
|
return {
|
||
|
|
key: lockKey,
|
||
|
|
token,
|
||
|
|
release: () => this.release(lockKey, token),
|
||
|
|
};
|
||
|
|
}
|
||
|
|
|
||
|
|
// Lock is held by someone else, wait and retry
|
||
|
|
if (attempt < maxRetries) {
|
||
|
|
await this.delay(retryDelayMs);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
this.logger.warn("Failed to acquire lock after max retries", {
|
||
|
|
key: lockKey,
|
||
|
|
maxRetries,
|
||
|
|
});
|
||
|
|
return null;
|
||
|
|
}
|
||
|
|
|
||
|
|
/**
|
||
|
|
* Execute a function with a lock
|
||
|
|
*
|
||
|
|
* Automatically acquires lock before execution and releases after.
|
||
|
|
* If lock cannot be acquired, throws an error.
|
||
|
|
*
|
||
|
|
* @param key - Unique key identifying the resource to lock
|
||
|
|
* @param fn - Function to execute while holding the lock
|
||
|
|
* @param options - Lock options
|
||
|
|
* @returns Result of the function
|
||
|
|
*/
|
||
|
|
async withLock<T>(key: string, fn: () => Promise<T>, options?: LockOptions): Promise<T> {
|
||
|
|
const lock = await this.acquire(key, options);
|
||
|
|
|
||
|
|
if (!lock) {
|
||
|
|
throw new Error(`Unable to acquire lock for key: ${key}`);
|
||
|
|
}
|
||
|
|
|
||
|
|
try {
|
||
|
|
return await fn();
|
||
|
|
} finally {
|
||
|
|
await lock.release();
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
/**
|
||
|
|
* Try to execute a function with a lock
|
||
|
|
*
|
||
|
|
* Unlike withLock, this returns null if lock cannot be acquired
|
||
|
|
* instead of throwing an error.
|
||
|
|
*
|
||
|
|
* @param key - Unique key identifying the resource to lock
|
||
|
|
* @param fn - Function to execute while holding the lock
|
||
|
|
* @param options - Lock options
|
||
|
|
* @returns Result of the function, or null if lock not acquired
|
||
|
|
*/
|
||
|
|
async tryWithLock<T>(
|
||
|
|
key: string,
|
||
|
|
fn: () => Promise<T>,
|
||
|
|
options?: LockOptions
|
||
|
|
): Promise<{ success: true; result: T } | { success: false; result: null }> {
|
||
|
|
const lock = await this.acquire(key, {
|
||
|
|
...options,
|
||
|
|
maxRetries: 0, // Don't retry for try semantics
|
||
|
|
});
|
||
|
|
|
||
|
|
if (!lock) {
|
||
|
|
return { success: false, result: null };
|
||
|
|
}
|
||
|
|
|
||
|
|
try {
|
||
|
|
const result = await fn();
|
||
|
|
return { success: true, result };
|
||
|
|
} finally {
|
||
|
|
await lock.release();
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
/**
|
||
|
|
* Release a lock
|
||
|
|
*
|
||
|
|
* Uses a Lua script to ensure we only release our own lock.
|
||
|
|
*/
|
||
|
|
private async release(lockKey: string, token: string): Promise<void> {
|
||
|
|
// Lua script: only delete if the token matches
|
||
|
|
const script = `
|
||
|
|
if redis.call("get", KEYS[1]) == ARGV[1] then
|
||
|
|
return redis.call("del", KEYS[1])
|
||
|
|
else
|
||
|
|
return 0
|
||
|
|
end
|
||
|
|
`;
|
||
|
|
|
||
|
|
try {
|
||
|
|
const result = await this.redis.eval(script, 1, lockKey, token);
|
||
|
|
|
||
|
|
if (result === 1) {
|
||
|
|
this.logger.debug("Lock released", { key: lockKey });
|
||
|
|
} else {
|
||
|
|
this.logger.warn("Lock release failed - token mismatch or expired", {
|
||
|
|
key: lockKey,
|
||
|
|
});
|
||
|
|
}
|
||
|
|
} catch (error) {
|
||
|
|
this.logger.error("Error releasing lock", {
|
||
|
|
key: lockKey,
|
||
|
|
error: error instanceof Error ? error.message : String(error),
|
||
|
|
});
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
/**
|
||
|
|
* Generate a unique token for lock ownership
|
||
|
|
*/
|
||
|
|
private generateToken(): string {
|
||
|
|
return `${Date.now()}-${Math.random().toString(36).substring(2, 15)}`;
|
||
|
|
}
|
||
|
|
|
||
|
|
/**
|
||
|
|
* Delay helper
|
||
|
|
*/
|
||
|
|
private delay(ms: number): Promise<void> {
|
||
|
|
return new Promise(resolve => setTimeout(resolve, ms));
|
||
|
|
}
|
||
|
|
}
|