core: add @subscribe decorator

pull/501/head
undefined 2 years ago
parent 88461a78cd
commit 3807d12168

@ -27,9 +27,8 @@ import * as system from '../model/system';
import token from '../model/token';
import * as training from '../model/training';
import user from '../model/user';
import * as bus from '../service/bus';
import {
ConnectionHandler, Handler, param, query, requireSudo, Types,
ConnectionHandler, Handler, param, query, requireSudo, subscribe, Types,
} from '../service/server';
import { camelCase, md5 } from '../utils';
@ -519,22 +518,14 @@ class HomeMessagesHandler extends Handler {
class HomeMessagesConnectionHandler extends ConnectionHandler {
category = '#message';
dispose: bus.Disposable;
async prepare() {
this.dispose = bus.on('user/message', this.onMessageReceived.bind(this));
}
@subscribe('user/message')
async onMessageReceived(uid: number, mdoc: MessageDoc) {
if (uid !== this.user._id) return;
const udoc = (await user.getById(this.args.domainId, mdoc.from))!;
udoc.avatarUrl = avatar(udoc.avatar, 64);
this.send({ udoc, mdoc });
}
async cleanup() {
if (this.dispose) this.dispose();
}
}
export async function apply(ctx: Context) {

@ -30,7 +30,6 @@ import solution from '../model/solution';
import storage from '../model/storage';
import * as system from '../model/system';
import user from '../model/user';
import * as bus from '../service/bus';
import {
Handler, param, post, query, route, Types,
} from '../service/server';
@ -168,7 +167,7 @@ export class ProblemMainHandler extends Handler {
});
sort = result.hits;
}
await bus.parallel('problem/list', query, this);
await this.ctx.parallel('problem/list', query, this);
// eslint-disable-next-line prefer-const
let [pdocs, ppcount, pcount] = fail
? [[], 0, 0]
@ -297,7 +296,7 @@ export class ProblemRandomHandler extends Handler {
.map((i) => i.split('category:')[1]?.split(',')));
const q = buildQuery(this.user);
if (category.length) q.$and = category.map((tag) => ({ tag }));
await bus.parallel('problem/list', q, this);
await this.ctx.parallel('problem/list', q, this);
const pid = await problem.random(domainId, q);
if (!pid) throw new NoProblemError();
this.response.body = { pid };
@ -352,7 +351,7 @@ export class ProblemDetailHandler extends ContestDetailBaseHandler {
if (this.domain.langs) t.push(this.domain.langs.split(',').map((i) => i.trim()).filter((i) => i));
this.pdoc.config.langs = intersection(baseLangs, ...t);
}
await bus.parallel('problem/get', this.pdoc, this);
await this.ctx.parallel('problem/get', this.pdoc, this);
[this.psdoc, this.udoc] = await Promise.all([
problem.getStatus(domainId, this.pdoc.docId, this.user._id),
user.getById(domainId, this.pdoc.owner),
@ -547,7 +546,7 @@ export class ProblemSubmitHandler extends ProblemDetailHandler {
tid && contest.updateStatus(domainId, tid, this.user._id, rid, this.pdoc.docId),
]);
}
bus.broadcast('record/change', rdoc);
this.ctx.broadcast('record/change', rdoc);
if (tid && !pretest && !contest.canShowSelfRecord.call(this, this.tdoc)) {
this.response.body = { tid };
this.response.redirect = this.url(this.tdoc.rule === 'homework' ? 'homework_detail' : 'contest_detail', { tid });
@ -608,7 +607,7 @@ export class ProblemHackHandler extends ProblemDetailHandler {
);
const rdoc = await record.get(domainId, rid);
// TODO contest: update status;
bus.broadcast('record/change', rdoc);
this.ctx.broadcast('record/change', rdoc);
this.response.body = { rid };
this.response.redirect = this.url('record_detail', { rid });
}

@ -1,9 +1,9 @@
import { Context } from '../context';
import { PRIV } from '../model/builtin';
import * as DocumentModel from '../model/document';
import DomainModel from '../model/domain';
import RecordModel from '../model/record';
import UserModel from '../model/user';
import * as bus from '../service/bus';
import db from '../service/db';
import { Handler } from '../service/server';
@ -70,10 +70,9 @@ class StatusUpdateHandler extends Handler {
}
}
bus.on('ready', () => coll.createIndex('updateAt', { expireAfterSeconds: 24 * 3600 }));
export async function apply(ctx) {
export async function apply(ctx: Context) {
ctx.Route('status', '/status', StatusHandler);
ctx.Route('status_admin', '/.status', AdminStatusHandler);
ctx.Route('status_update', '/status/update', StatusUpdateHandler);
await db.ensureIndexes(coll, { key: { updateAt: 1 }, expireAfterSeconds: 24 * 2600 });
}

@ -112,7 +112,6 @@ export async function apply(ctx: Context) {
});
if (process.env.NODE_APP_INSTANCE !== '0') return;
await collEvent.createIndex({ expire: 1 }, { expireAfterSeconds: 0 });
const stream = collEvent.watch();
const handleEvent = async (doc: EventDoc) => {
const payload = JSON.parse(doc.payload);
@ -138,6 +137,7 @@ export async function apply(ctx: Context) {
await (res.value ? handleEvent(res.value) : sleep(500));
}
});
await db.ensureIndexes(collEvent, { name: 'expire', key: { expire: 1 }, expireAfterSeconds: 0 });
await db.ensureIndexes(coll, { name: 'task', key: { type: 1, subType: 1, priority: -1 } });
}

@ -1,6 +1,7 @@
import { Time } from '@hydrooj/utils';
import { ValidationError } from '../error';
import { Converter, Type, Validator } from '../lib/validator';
import { EventMap } from './bus';
import type { Handler } from './server';
type MethodDecorator = (target: any, name: string, obj: any) => any;
@ -81,6 +82,12 @@ export const post: DescriptorBuilder = (name, ...args) => _descriptor(_buildPara
export const route: DescriptorBuilder = (name, ...args) => _descriptor(_buildParam(name, 'route', ...args));
export const param: DescriptorBuilder = (name, ...args) => _descriptor(_buildParam(name, 'all', ...args));
export const subscribe: (name: keyof EventMap) => MethodDecorator = (name) => (target, funcName, obj) => {
target.__subscribe ||= [];
target.__subscribe.push({ name, target: obj.value });
return obj;
};
export function requireSudo(target: any, funcName: string, obj: any) {
const originalMethod = obj.value;
obj.value = function sudo(this: Handler, ...args: any[]) {

@ -374,22 +374,24 @@ export function Connection(
await bus.parallel('connection/create', h);
ctx.handler = h;
h.conn = conn;
const disposables = [];
try {
checker.call(h);
if (h._prepare) await h._prepare(args);
if (h.prepare) await h.prepare(args);
if (h.message) {
conn.onmessage = (e) => {
h.message(JSON.parse(e.data.toString()));
};
}
// eslint-disable-next-line @typescript-eslint/no-shadow
for (const { name, target } of h.__subscribe || []) disposables.push(bus.on(name, target.bind(h)));
conn.onmessage = (e) => h.message?.(JSON.parse(e.data.toString()));
conn.onclose = () => {
bus.emit('connection/close', h);
disposables.forEach((d) => d());
h.cleanup?.(args);
};
await bus.parallel('connection/active', h);
} catch (e) {
await h.onerror(e);
} finally {
disposables.forEach((d) => d());
}
});
return router.disposeLastOp;

@ -524,14 +524,12 @@ const scripts: UpgradeScript[] = [
if (defaultPriv & list[key]) defaultPriv -= list[key];
}
await system.set('default.priv', defaultPriv);
return iterateAllUser(async (udoc) => {
if (udoc.priv < 0) return;
const old = udoc.priv;
for (const key in list) {
if (udoc.priv & list[key]) udoc.priv -= list[key];
}
if (old !== udoc.priv) await user.setById(udoc._id, { priv: udoc.priv });
});
for (const key in list) {
await user.coll.updateMany(
{ priv: { $bitsAllSet: list[key] } },
{ $inc: { priv: -list[key] } },
);
}
},
];

Loading…
Cancel
Save