judge: use p-queue

pull/427/head
undefined 2 years ago
parent 636446c43a
commit 1394af0591

@ -15,10 +15,10 @@
~ */
import './utils';
import PQueue from 'p-queue';
import { getConfig } from './config';
import HydroHost from './hosts/hydro';
import log from './log';
import { Queue } from './utils';
declare global {
namespace NodeJS {
@ -51,18 +51,11 @@ process.on('unhandledRejection', (reason, p) => {
console.log('Unhandled Rejection at: Promise ', p);
});
async function worker(queue: Queue<any>) {
while ('Orz Soha') {
const [task] = await queue.get();
task.handle();
}
}
async function daemon() {
const _hosts = getConfig('hosts');
const hosts = {};
const queue = new Queue<any>();
worker(queue).catch((e) => log.error(e));
const queue = new PQueue({ concurrency: Infinity });
queue.on('error', (e) => log.error(e));
for (const i in _hosts) {
_hosts[i].host = _hosts[i].host || i;
hosts[i] = new HydroHost(_hosts[i]);

@ -13,7 +13,7 @@ import { getConfig } from '../config';
import { FormatError, SystemError } from '../error';
import log from '../log';
import { JudgeTask } from '../task';
import { Lock, Queue } from '../utils';
import { Lock } from '../utils';
function removeNixPath(text: string) {
return text.replace(/\/nix\/store\/[a-z0-9]{32}-/g, '/nix/');
@ -150,7 +150,7 @@ export default class Hydro {
};
}
async consume(queue: Queue<any>) {
async consume(queue: PQueue) {
log.info('正在连接 %sjudge/conn', this.config.server_url);
this.ws = new WebSocket(`${this.config.server_url.replace(/^http/i, 'ws')}judge/conn`, {
headers: {
@ -165,7 +165,7 @@ export default class Hydro {
this.ws.on('message', (data) => {
const request = JSON.parse(data.toString());
if (request.language) this.language = request.language;
if (request.task) queue.push(new JudgeTask(this, request.task));
if (request.task) queue.add(() => new JudgeTask(this, request.task).handle().catch((e) => log.error(e)));
});
this.ws.on('close', (data, reason) => {
log.warn(`[${this.config.host}] Websocket 断开:`, data, reason.toString());
@ -222,7 +222,7 @@ export default class Hydro {
}
}
async retry(queue: Queue<any>) {
async retry(queue: PQueue) {
this.consume(queue).catch(() => {
setTimeout(() => this.retry(queue), 30000);
});

@ -1,7 +1,5 @@
import { EventEmitter } from 'events';
import path from 'path';
import fs from 'fs-extra';
import _ from 'lodash';
import { parse } from 'shell-quote';
import { sleep } from '@hydrooj/utils/lib/utils';
import { FormatError } from './error';
@ -15,34 +13,6 @@ export function parseFilename(filePath: string) {
return t[t.length - 1];
}
export class Queue<T> extends EventEmitter {
queue: T[] = [];
waiting: any[] = [];
get(count = 1) {
if (this.queue.length < count) {
return new Promise<T[]>((resolve) => {
this.waiting.push({ count, resolve });
});
}
const items = [];
for (let i = 0; i < count; i++) items.push(this.queue[i]);
this.queue = _.drop(this.queue, count);
return items as T[];
}
push(value: T) {
this.queue.push(value);
if (this.waiting.length && this.waiting[0].count <= this.queue.length) {
const items = [];
for (let i = 0; i < this.waiting[0].count; i++) items.push(this.queue[i]);
this.queue = _.drop(this.queue, this.waiting[0].count);
this.waiting[0].resolve(items);
this.waiting.shift();
}
}
}
export namespace Lock {
const data = {};

Loading…
Cancel
Save