/* eslint-disable no-await-in-loop */
import cluster from 'cluster';
import { Db, ObjectID } from 'mongodb';
import { Mdoc, Rdoc, User } from '../interface';

/** Registered listeners, keyed by event name, kept in invocation order. */
const _hooks: Record<keyof any, Array<(...args: any[]) => any>> = {};

/**
 * Disposers collected on every `addListener` / `prependListener` call.
 * For the `'dispose'` event the listener itself is stored instead of its remover.
 * NOTE(review): nothing in the code visible here reads or drains this array.
 */
const _disposables: Disposable[] = [];

/**
 * Tells whether a listener result should stop a `serial` / `bail` dispatch.
 * Any value other than `null`, `false` or `undefined` counts as "bailed".
 */
function isBailed(value: any) {
    return value !== null && value !== false && value !== undefined;
}

/** Callback that undoes a registration. */
export type Disposable = () => void

/** Maps every known event name to the signature its listeners must have. */
interface EventMap {
    'app/started': () => void
    'app/exit': () => Promise<void> | void
    'dispose': () => void

    'database/connect': (db: Db) => void

    'user/message': (uid: number, mdoc: Mdoc, udoc: User) => void

    'document/add': (doc: any) => Promise<string | void> | string | void
    'document/set': (
        domainId: string, docType: number, docId: ObjectID | string | number, args: any,
    ) => Promise<string | void> | string | void

    'record/change': (rdoc: Rdoc, $set?: any, $push?: any) => void
}

/**
 * Returns the listener array for `name`, creating it on first use.
 * Logs a warning when the array already holds 128 or more listeners.
 */
function getHooks<K extends keyof EventMap>(name: K) {
    const hooks = _hooks[name] || (_hooks[name] = []);
    if (hooks.length >= 128) {
        console.warn(
            'max listener count (128) for event "%s" exceeded, which may be caused by a memory leak',
            name,
        );
    }
    return hooks;
}

/**
 * Removes the first registration of `listener` for event `name`.
 *
 * @returns `true` when a registration was found and removed, `false` otherwise.
 */
export function removeListener<K extends keyof EventMap>(name: K, listener: EventMap[K]) {
    const index = (_hooks[name] || []).findIndex((callback) => callback === listener);
    if (index >= 0) {
        _hooks[name].splice(index, 1);
        return true;
    }
    return false;
}

/**
 * Appends `listener` to the end of the listener list for `name`.
 *
 * @returns a function that removes this listener again.
 */
export function addListener<K extends keyof EventMap>(name: K, listener: EventMap[K]) {
    getHooks(name).push(listener);
    const dispose = () => removeListener(name, listener);
    _disposables.push(name === 'dispose' ? listener as Disposable : dispose);
    return dispose;
}

/**
 * Inserts `listener` at the front of the listener list for `name`,
 * so it runs before listeners that are already registered.
 *
 * @returns a function that removes this listener again.
 */
export function prependListener<K extends keyof EventMap>(name: K, listener: EventMap[K]) {
    getHooks(name).unshift(listener);
    const dispose = () => removeListener(name, listener);
    _disposables.push(name === 'dispose' ? listener as Disposable : dispose);
    return dispose;
}

/**
 * Registers `listener` so that it unregisters itself right before its first invocation.
 *
 * @returns a function that removes the registration without it ever firing.
 */
export function once<K extends keyof EventMap>(name: K, listener: EventMap[K]) {
    // @ts-ignore
    const dispose = addListener(name, (...args: Parameters<EventMap[K]>) => {
        dispose();
        // @ts-ignore
        return listener.apply(this, args);
    });
    return dispose;
}

/** Alias of {@link addListener}. */
export function on<K extends keyof EventMap>(name: K, listener: EventMap[K]) {
    return addListener(name, listener);
}

/** Alias of {@link removeListener}. */
export function off<K extends keyof EventMap>(name: K, listener: EventMap[K]) {
    return removeListener(name, listener);
}

/**
 * Invokes every listener of `name` with `args` without waiting between calls,
 * then waits for all returned promises to settle successfully.
 *
 * Rejects if any listener throws or returns a rejecting promise.
 */
export async function parallel<K extends keyof EventMap>(
    name: K, ...args: Parameters<EventMap[K]>
): Promise<void> {
    // Iterate over a snapshot: a `once` listener splices itself out of the live
    // array while running, which would make the loop skip the next listener.
    const listeners = (_hooks[name] || []).slice();
    const tasks: Promise<any>[] = [];
    for (const callback of listeners) {
        tasks.push(callback.apply(this, args));
    }
    await Promise.all(tasks);
}

/** Alias of {@link parallel}. */
export function emit<K extends keyof EventMap>(name: K, ...args: Parameters<EventMap[K]>) {
    return parallel(name, ...args);
}

/**
 * Invokes the listeners of `name` one after another, awaiting each result.
 * Stops at the first result that is not `null`, `false` or `undefined`.
 *
 * @returns that first "bailed" result, or `null` when no listener produced one.
 */
export async function serial<K extends keyof EventMap>(
    name: K, ...args: Parameters<EventMap[K]>
): Promise<ReturnType<EventMap[K]>> {
    // Snapshot for the same reason as in `parallel`: listeners may remove
    // themselves (or others) while the dispatch is in progress.
    const listeners = (_hooks[name] || []).slice();
    for (const callback of listeners) {
        const result = await callback.apply(this, args);
        if (isBailed(result)) return result;
    }
    return null;
}

/**
 * Synchronous counterpart of {@link serial}: results are not awaited, so a
 * listener returning a promise bails immediately with that promise.
 *
 * @returns the first "bailed" result, or `null` when no listener produced one.
 */
export function bail<K extends keyof EventMap>(
    name: K, ...args: Parameters<EventMap[K]>
): ReturnType<EventMap[K]> {
    // Snapshot so that self-removing listeners do not cause neighbours to be skipped.
    const listeners = (_hooks[name] || []).slice();
    for (const callback of listeners) {
        const result = callback.apply(this, args);
        if (isBailed(result)) return result;
    }
    return null;
}

/**
 * Publishes an event beyond the current process when possible.
 *
 * If an IPC channel exists and this process is not the cluster master, the
 * event is sent to the parent as `{ event: 'bus', eventName, payload }`.
 * Otherwise the event is dispatched locally through {@link parallel}.
 */
export function boardcast<K extends keyof EventMap>(event: K, ...payload: Parameters<EventMap[K]>) {
    // Process forked by pm2 would also have process.send
    if (process.send && !cluster.isMaster) {
        process.send({
            event: 'bus',
            eventName: event,
            payload,
        });
    } else parallel(event, ...payload); // NOTE(review): not awaited; a listener failure is not reported to the caller
}

// Expose the bus on the global Hydro service registry.
global.Hydro.service.bus = {
    addListener, bail, boardcast, emit, on, off, once, parallel, prependListener, removeListener, serial,
};