core: fallback for non-replicaset instances

pull/340/head
undefined 3 years ago
parent 429bacfaf2
commit 406e45975f

@ -562,7 +562,7 @@ export interface FileNode {
}
export interface EventDoc {
sender: string;
ack: string[];
event: number | string;
payload: string;
expire: Date;

@ -2,7 +2,7 @@ import { hostname } from 'os';
import moment from 'moment-timezone';
import { FilterQuery, ObjectID } from 'mongodb';
import { sleep } from '@hydrooj/utils/lib/utils';
import { BaseService, Task } from '../interface';
import { BaseService, EventDoc, Task } from '../interface';
import { Logger } from '../logger';
import * as bus from '../service/bus';
import db from '../service/db';
@ -161,18 +161,35 @@ bus.once('app/started', async () => {
});
}
await collEvent.createIndex({ expire: 1 }, { expireAfterSeconds: 0 });
const stream = collEvent.watch([{ $match: { sender: { $ne: id } } }]);
stream.on('change', async (change) => {
if (change.operationType !== 'insert') return;
const doc = change.fullDocument;
const stream = collEvent.watch();
const handleEvent = async (doc: EventDoc) => {
const payload = JSON.parse(doc.payload);
if (process.send) process.send({ type: 'hydro:broadcast', data: { event: doc.event, payload } });
if (doc) bus.parallel(doc.event, ...payload);
await bus.parallel(doc.event, ...payload);
};
stream.on('change', async (change) => {
if (change.operationType !== 'insert') return;
if (change.fullDocument.ack.includes(id)) return;
await handleEvent(change.fullDocument);
});
stream.on('error', async () => {
// The $changeStream stage is only supported on replica sets
logger.info('No replica set found.');
// eslint-disable-next-line no-constant-condition
while (true) {
// eslint-disable-next-line no-await-in-loop
const res = await collEvent.findOneAndUpdate(
{ ack: { $nin: [id] } },
{ $push: { ack: id } },
);
// eslint-disable-next-line no-await-in-loop
await (res.value ? handleEvent(res.value) : sleep(500));
}
});
});
bus.on('bus/broadcast', (event, payload) => {
collEvent.insertOne({
sender: id,
ack: [id],
event,
payload: JSON.stringify(payload),
expire: new Date(Date.now() + 10000),

Loading…
Cancel
Save