From 3807d12168d7f5b32bd10cf38ba7cd2ed2395df6 Mon Sep 17 00:00:00 2001 From: undefined Date: Mon, 30 Jan 2023 07:27:45 +0800 Subject: [PATCH] core: add @subscribe decorator --- packages/hydrooj/src/handler/home.ts | 13 ++----------- packages/hydrooj/src/handler/problem.ts | 11 +++++------ packages/hydrooj/src/handler/status.ts | 7 +++---- packages/hydrooj/src/model/task.ts | 2 +- packages/hydrooj/src/service/decorators.ts | 7 +++++++ packages/hydrooj/src/service/server.ts | 12 +++++++----- packages/hydrooj/src/upgrade.ts | 14 ++++++-------- 7 files changed, 31 insertions(+), 35 deletions(-) diff --git a/packages/hydrooj/src/handler/home.ts b/packages/hydrooj/src/handler/home.ts index d149151b..42b5454b 100644 --- a/packages/hydrooj/src/handler/home.ts +++ b/packages/hydrooj/src/handler/home.ts @@ -27,9 +27,8 @@ import * as system from '../model/system'; import token from '../model/token'; import * as training from '../model/training'; import user from '../model/user'; -import * as bus from '../service/bus'; import { - ConnectionHandler, Handler, param, query, requireSudo, Types, + ConnectionHandler, Handler, param, query, requireSudo, subscribe, Types, } from '../service/server'; import { camelCase, md5 } from '../utils'; @@ -519,22 +518,14 @@ class HomeMessagesHandler extends Handler { class HomeMessagesConnectionHandler extends ConnectionHandler { category = '#message'; - dispose: bus.Disposable; - - async prepare() { - this.dispose = bus.on('user/message', this.onMessageReceived.bind(this)); - } + @subscribe('user/message') async onMessageReceived(uid: number, mdoc: MessageDoc) { if (uid !== this.user._id) return; const udoc = (await user.getById(this.args.domainId, mdoc.from))!; udoc.avatarUrl = avatar(udoc.avatar, 64); this.send({ udoc, mdoc }); } - - async cleanup() { - if (this.dispose) this.dispose(); - } } export async function apply(ctx: Context) { diff --git a/packages/hydrooj/src/handler/problem.ts b/packages/hydrooj/src/handler/problem.ts index 3a550cd9..ba424cff 100644 --- a/packages/hydrooj/src/handler/problem.ts +++ b/packages/hydrooj/src/handler/problem.ts @@ -30,7 +30,6 @@ import solution from '../model/solution'; import storage from '../model/storage'; import * as system from '../model/system'; import user from '../model/user'; -import * as bus from '../service/bus'; import { Handler, param, post, query, route, Types, } from '../service/server'; @@ -168,7 +167,7 @@ export class ProblemMainHandler extends Handler { }); sort = result.hits; } - await bus.parallel('problem/list', query, this); + await this.ctx.parallel('problem/list', query, this); // eslint-disable-next-line prefer-const let [pdocs, ppcount, pcount] = fail ? [[], 0, 0] @@ -297,7 +296,7 @@ export class ProblemRandomHandler extends Handler { .map((i) => i.split('category:')[1]?.split(','))); const q = buildQuery(this.user); if (category.length) q.$and = category.map((tag) => ({ tag })); - await bus.parallel('problem/list', q, this); + await this.ctx.parallel('problem/list', q, this); const pid = await problem.random(domainId, q); if (!pid) throw new NoProblemError(); this.response.body = { pid }; @@ -352,7 +351,7 @@ export class ProblemDetailHandler extends ContestDetailBaseHandler { if (this.domain.langs) t.push(this.domain.langs.split(',').map((i) => i.trim()).filter((i) => i)); this.pdoc.config.langs = intersection(baseLangs, ...t); } - await bus.parallel('problem/get', this.pdoc, this); + await this.ctx.parallel('problem/get', this.pdoc, this); [this.psdoc, this.udoc] = await Promise.all([ problem.getStatus(domainId, this.pdoc.docId, this.user._id), user.getById(domainId, this.pdoc.owner), @@ -547,7 +546,7 @@ export class ProblemSubmitHandler extends ProblemDetailHandler { tid && contest.updateStatus(domainId, tid, this.user._id, rid, this.pdoc.docId), ]); } - bus.broadcast('record/change', rdoc); + this.ctx.broadcast('record/change', rdoc); if (tid && !pretest && !contest.canShowSelfRecord.call(this, this.tdoc)) { this.response.body = { tid }; this.response.redirect = this.url(this.tdoc.rule === 'homework' ? 'homework_detail' : 'contest_detail', { tid }); @@ -608,7 +607,7 @@ export class ProblemHackHandler extends ProblemDetailHandler { ); const rdoc = await record.get(domainId, rid); // TODO contest: update status; - bus.broadcast('record/change', rdoc); + this.ctx.broadcast('record/change', rdoc); this.response.body = { rid }; this.response.redirect = this.url('record_detail', { rid }); } diff --git a/packages/hydrooj/src/handler/status.ts b/packages/hydrooj/src/handler/status.ts index 94fb0e76..dd9e4a2d 100644 --- a/packages/hydrooj/src/handler/status.ts +++ b/packages/hydrooj/src/handler/status.ts @@ -1,9 +1,9 @@ +import { Context } from '../context'; import { PRIV } from '../model/builtin'; import * as DocumentModel from '../model/document'; import DomainModel from '../model/domain'; import RecordModel from '../model/record'; import UserModel from '../model/user'; -import * as bus from '../service/bus'; import db from '../service/db'; import { Handler } from '../service/server'; @@ -70,10 +70,9 @@ class StatusUpdateHandler extends Handler { } } -bus.on('ready', () => coll.createIndex('updateAt', { expireAfterSeconds: 24 * 3600 })); - -export async function apply(ctx) { +export async function apply(ctx: Context) { ctx.Route('status', '/status', StatusHandler); ctx.Route('status_admin', '/.status', AdminStatusHandler); ctx.Route('status_update', '/status/update', StatusUpdateHandler); + await db.ensureIndexes(coll, { key: { updateAt: 1 }, expireAfterSeconds: 24 * 2600 }); } diff --git a/packages/hydrooj/src/model/task.ts b/packages/hydrooj/src/model/task.ts index 4706306b..eaa6a9b6 100644 --- a/packages/hydrooj/src/model/task.ts +++ b/packages/hydrooj/src/model/task.ts @@ -112,7 +112,6 @@ export async function apply(ctx: Context) { }); if (process.env.NODE_APP_INSTANCE !== '0') return; - await collEvent.createIndex({ expire: 1 }, { expireAfterSeconds: 0 }); const stream = collEvent.watch(); const handleEvent = async (doc: EventDoc) => { const payload = JSON.parse(doc.payload); @@ -138,6 +137,7 @@ export async function apply(ctx: Context) { await (res.value ? handleEvent(res.value) : sleep(500)); } }); + await db.ensureIndexes(collEvent, { name: 'expire', key: { expire: 1 }, expireAfterSeconds: 0 }); await db.ensureIndexes(coll, { name: 'task', key: { type: 1, subType: 1, priority: -1 } }); } diff --git a/packages/hydrooj/src/service/decorators.ts b/packages/hydrooj/src/service/decorators.ts index 4ad2c283..ff163e60 100644 --- a/packages/hydrooj/src/service/decorators.ts +++ b/packages/hydrooj/src/service/decorators.ts @@ -1,6 +1,7 @@ import { Time } from '@hydrooj/utils'; import { ValidationError } from '../error'; import { Converter, Type, Validator } from '../lib/validator'; +import { EventMap } from './bus'; import type { Handler } from './server'; type MethodDecorator = (target: any, name: string, obj: any) => any; @@ -81,6 +82,12 @@ export const post: DescriptorBuilder = (name, ...args) => _descriptor(_buildPara export const route: DescriptorBuilder = (name, ...args) => _descriptor(_buildParam(name, 'route', ...args)); export const param: DescriptorBuilder = (name, ...args) => _descriptor(_buildParam(name, 'all', ...args)); +export const subscribe: (name: keyof EventMap) => MethodDecorator = (name) => (target, funcName, obj) => { + target.__subscribe ||= []; + target.__subscribe.push({ name, target: obj.value }); + return obj; +}; + export function requireSudo(target: any, funcName: string, obj: any) { const originalMethod = obj.value; obj.value = function sudo(this: Handler, ...args: any[]) { diff --git a/packages/hydrooj/src/service/server.ts b/packages/hydrooj/src/service/server.ts index 21f3cd12..ea27e4fb 100644 --- a/packages/hydrooj/src/service/server.ts +++ b/packages/hydrooj/src/service/server.ts @@ -374,22 +374,24 @@ export function Connection( await bus.parallel('connection/create', h); ctx.handler = h; h.conn = conn; + const disposables = []; try { checker.call(h); if (h._prepare) await h._prepare(args); if (h.prepare) await h.prepare(args); - if (h.message) { - conn.onmessage = (e) => { - h.message(JSON.parse(e.data.toString())); - }; - } + // eslint-disable-next-line @typescript-eslint/no-shadow + for (const { name, target } of h.__subscribe || []) disposables.push(bus.on(name, target.bind(h))); + conn.onmessage = (e) => h.message?.(JSON.parse(e.data.toString())); conn.onclose = () => { bus.emit('connection/close', h); + disposables.forEach((d) => d()); h.cleanup?.(args); }; await bus.parallel('connection/active', h); } catch (e) { await h.onerror(e); + } finally { + disposables.forEach((d) => d()); } }); return router.disposeLastOp; diff --git a/packages/hydrooj/src/upgrade.ts b/packages/hydrooj/src/upgrade.ts index 2518635a..e67de50e 100644 --- a/packages/hydrooj/src/upgrade.ts +++ b/packages/hydrooj/src/upgrade.ts @@ -524,14 +524,12 @@ const scripts: UpgradeScript[] = [ if (defaultPriv & list[key]) defaultPriv -= list[key]; } await system.set('default.priv', defaultPriv); - return iterateAllUser(async (udoc) => { - if (udoc.priv < 0) return; - const old = udoc.priv; - for (const key in list) { - if (udoc.priv & list[key]) udoc.priv -= list[key]; - } - if (old !== udoc.priv) await user.setById(udoc._id, { priv: udoc.priv }); - }); + for (const key in list) { + await user.coll.updateMany( + { priv: { $bitsAllSet: list[key] } }, + { $inc: { priv: -list[key] } }, + ); + } }, ];