core: pm2 event bus

pull/220/head
undefined 3 years ago
parent fc7aa51571
commit 3adde783a3

@ -1,6 +1,6 @@
{
"name": "hydrooj",
"version": "2.36.15",
"version": "2.37.0",
"bin": "bin/hydrooj.js",
"main": "src/loader",
"module": "src/loader",
@ -64,6 +64,10 @@
"@types/source-map-support": "^0",
"@types/superagent": "^4.1.12",
"formidable": "^1.2.2",
"moment": "^2.29.1"
"moment": "^2.29.1",
"pm2": "^5.1.1"
},
"peerDependencies": {
"pm2": "*"
}
}

@ -261,7 +261,6 @@ class HomeSettingsHandler extends Handler {
}
for (const key in booleanKeys) if (!args[key]) $set[key] = false;
await setter($set);
await sleep(100);
this.back();
}
}

@ -179,7 +179,6 @@ class SystemSettingHandler extends SystemHandler {
}
tasks.push(bus.parallel('system/setting', args));
await Promise.all(tasks);
await sleep(100);
this.back();
}
}

@ -135,12 +135,14 @@ class TaskModel {
static Worker = Worker;
}
const id = hostname();
Worker.addHandler('task.daily', async () => {
await global.Hydro.script.rp?.run({}, new Logger('task/rp').debug);
await global.Hydro.script.problemStat?.run({}, new Logger('task/problem').debug);
});
bus.on('domain/delete', (domainId) => coll.deleteMany({ domainId }));
bus.once('app/started', async () => {
if (process.env.NODE_APP_INSTANCE !== '0') return;
if (!await TaskModel.count({ type: 'schedule', subType: 'task.daily' })) {
await TaskModel.add({
type: 'schedule',
@ -150,7 +152,6 @@ bus.once('app/started', async () => {
});
}
await collEvent.createIndex({ expire: 1 }, { expireAfterSeconds: 0 });
const id = hostname() + process.env.NODE_APP_INSTANCE;
(async () => {
// eslint-disable-next-line no-constant-condition
while (true) {
@ -167,7 +168,7 @@ bus.once('app/started', async () => {
});
bus.on('bus/broadcast', (event, payload) => {
collEvent.insertOne({
ack: [],
ack: [id],
event,
payload: JSON.stringify(payload),
expire: new Date(Date.now() + 10000),

@ -183,6 +183,24 @@ export function broadcast<K extends keyof EventMap>(event: K, ...payload: Parame
return parallel('bus/broadcast', event, payload);
}
try {
if (!process.send) throw new Error();
const pm2: typeof import('pm2') = require('pm2');
pm2.launchBus((err, bus) => {
if (err) throw new Error();
bus.on('hydro:broadcast', (packet) => {
parallel(packet.data.event, ...packet.data.payload);
});
on('bus/broadcast', (event, payload) => {
process.send({ type: 'hydro:broadcast', data: { event, payload } });
});
console.debug('Using pm2 event bus');
});
} catch (e) {
on('bus/broadcast', (event, payload) => parallel(event, ...payload));
console.debug('Using mongodb external event bus');
}
global.Hydro.service.bus = {
addListener, bail, broadcast, emit, on, off, once, parallel, prependListener, removeListener, serial,
};

@ -887,13 +887,17 @@ export function Connection(
let started = false;
// TODO use postInit?
export function start() {
export async function start() {
if (started) return;
const port = system.get('server.port');
app.use(router.routes()).use(router.allowedMethods());
server.listen(argv.options.port || port);
logger.success('Server listening at: %d', argv.options.port || port);
started = true;
await new Promise((resolve) => {
server.listen(argv.options.port || port, () => {
logger.success('Server listening at: %d', argv.options.port || port);
started = true;
resolve(true);
});
});
}
global.Hydro.service.server = {

Loading…
Cancel
Save