diff --git a/packages/hydrooj/src/interface.ts b/packages/hydrooj/src/interface.ts index 5fe7b363..b2492784 100644 --- a/packages/hydrooj/src/interface.ts +++ b/packages/hydrooj/src/interface.ts @@ -562,7 +562,7 @@ export interface FileNode { } export interface EventDoc { - sender: string; + ack: string[]; event: number | string; payload: string; expire: Date; diff --git a/packages/hydrooj/src/model/task.ts b/packages/hydrooj/src/model/task.ts index 6185e45e..a69a5d00 100644 --- a/packages/hydrooj/src/model/task.ts +++ b/packages/hydrooj/src/model/task.ts @@ -2,7 +2,7 @@ import { hostname } from 'os'; import moment from 'moment-timezone'; import { FilterQuery, ObjectID } from 'mongodb'; import { sleep } from '@hydrooj/utils/lib/utils'; -import { BaseService, Task } from '../interface'; +import { BaseService, EventDoc, Task } from '../interface'; import { Logger } from '../logger'; import * as bus from '../service/bus'; import db from '../service/db'; @@ -161,18 +161,35 @@ bus.once('app/started', async () => { }); } await collEvent.createIndex({ expire: 1 }, { expireAfterSeconds: 0 }); - const stream = collEvent.watch([{ $match: { sender: { $ne: id } } }]); - stream.on('change', async (change) => { - if (change.operationType !== 'insert') return; - const doc = change.fullDocument; + const stream = collEvent.watch(); + const handleEvent = async (doc: EventDoc) => { const payload = JSON.parse(doc.payload); if (process.send) process.send({ type: 'hydro:broadcast', data: { event: doc.event, payload } }); - if (doc) bus.parallel(doc.event, ...payload); + await bus.parallel(doc.event, ...payload); + }; + stream.on('change', async (change) => { + if (change.operationType !== 'insert') return; + if (change.fullDocument.ack.includes(id)) return; + await handleEvent(change.fullDocument); + }); + stream.on('error', async () => { + // The $changeStream stage is only supported on replica sets + logger.info('No replica set found.'); + // eslint-disable-next-line no-constant-condition + while (true) { + // eslint-disable-next-line no-await-in-loop + const res = await collEvent.findOneAndUpdate( + { ack: { $nin: [id] } }, + { $push: { ack: id } }, + ); + // eslint-disable-next-line no-await-in-loop + await (res.value ? handleEvent(res.value) : sleep(500)); + } }); }); bus.on('bus/broadcast', (event, payload) => { collEvent.insertOne({ - sender: id, + ack: [id], event, payload: JSON.stringify(payload), expire: new Date(Date.now() + 10000),