diff --git a/packages/hydrojudge/src/checkers.ts b/packages/hydrojudge/src/checkers.ts index 46e3d868..db0f72a6 100644 --- a/packages/hydrojudge/src/checkers.ts +++ b/packages/hydrojudge/src/checkers.ts @@ -1,7 +1,7 @@ /* eslint-disable no-template-curly-in-string */ import { STATUS } from '@hydrooj/utils/lib/status'; import { FormatError, SystemError } from './error'; -import { CopyInFile, run } from './sandbox'; +import { CopyInFile, runQueued } from './sandbox'; import { parse } from './testlib'; export interface CheckConfig { @@ -54,7 +54,7 @@ function parseDiffMsg(msg: string) { const checkers: Record = new Proxy({ async default(config) { - const { stdout } = await run('/usr/bin/diff -BZ usrout answer', { + const { stdout } = await runQueued('/usr/bin/diff -BZ usrout answer', { copyIn: { usrout: config.user_stdout, answer: config.output, @@ -76,7 +76,7 @@ const checkers: Record = new Proxy({ }, async strict(config) { - const { stdout } = await run('/usr/bin/diff usrout answer', { + const { stdout } = await runQueued('/usr/bin/diff usrout answer', { copyIn: { usrout: config.user_stdout, answer: config.output, @@ -98,7 +98,7 @@ const checkers: Record = new Proxy({ * exit code:返回判断结果 */ async hustoj(config) { - const { code, stdout } = await run(`${config.execute} input answer usrout`, { + const { code, stdout } = await runQueued(`${config.execute} input answer usrout`, { copyIn: { usrout: config.user_stdout, answer: config.output, @@ -123,7 +123,7 @@ const checkers: Record = new Proxy({ * argv[6]:输出错误报告的文件 */ async lemon(config) { - const { files, code } = await run(`${config.execute} input usrout answer ${config.score} score message`, { + const { files, code } = await runQueued(`${config.execute} input usrout answer ${config.score} score message`, { copyIn: { usrout: config.user_stdout, answer: config.output, @@ -156,7 +156,7 @@ const checkers: Record = new Proxy({ * exit code:返回判断结果 */ async qduoj(config) { - const { status, stdout } = await run(`${config.execute} input usrout`, { + const { status, stdout } = await runQueued(`${config.execute} input usrout`, { copyIn: { usrout: config.user_stdout, input: config.input, @@ -182,7 +182,7 @@ const checkers: Record = new Proxy({ */ async syzoj(config) { // eslint-disable-next-line prefer-const - let { status, stdout, stderr } = await run(config.execute, { + let { status, stdout, stderr } = await runQueued(config.execute, { copyIn: { input: config.input, user_out: config.user_stdout, @@ -198,7 +198,7 @@ const checkers: Record = new Proxy({ }, async testlib(config) { - const { stderr, status, code } = await run(`${config.execute} /w/in /w/user_out /w/answer`, { + const { stderr, status, code } = await runQueued(`${config.execute} /w/in /w/user_out /w/answer`, { copyIn: { in: config.input, user_out: config.user_stdout, diff --git a/packages/hydrojudge/src/compile.ts b/packages/hydrojudge/src/compile.ts index 43078149..ff1239fe 100644 --- a/packages/hydrojudge/src/compile.ts +++ b/packages/hydrojudge/src/compile.ts @@ -5,7 +5,7 @@ import checkers from './checkers'; import { CompileError, FormatError } from './error'; import { Execute } from './interface'; import { - CopyIn, CopyInFile, del, run, + CopyIn, CopyInFile, del, runQueued, } from './sandbox'; import { compilerText } from './utils'; @@ -17,7 +17,7 @@ export default async function compile( if (lang.compile) { const { status, stdout, stderr, fileIds, - } = await run( + } = await runQueued( copyIn['compile.sh'] ? '/bin/bash compile.sh' : lang.compile, { copyIn: { ...copyIn, [lang.code_file]: code }, @@ -26,6 +26,7 @@ export default async function compile( time: lang.compile_time_limit || 10000, memory: lang.compile_memory_limit || 256 * 1024 * 1024, }, + 3, ); if (status === STATUS.STATUS_TIME_LIMIT_EXCEEDED) next?.({ message: 'Compile timeout.' }); if (status === STATUS.STATUS_MEMORY_LIMIT_EXCEEDED) next?.({ message: 'Compile memory limit exceeded.' }); diff --git a/packages/hydrojudge/src/config.ts b/packages/hydrojudge/src/config.ts index cc285c4b..a3d2bb5c 100644 --- a/packages/hydrojudge/src/config.ts +++ b/packages/hydrojudge/src/config.ts @@ -17,6 +17,7 @@ const JudgeSettings = Schema.object({ total_time_limit: Schema.number().default(60), processLimit: Schema.number().default(32), parallelism: Schema.number().default(2), + concurrency: Schema.number(), singleTaskParallelism: Schema.number().default(2), rerun: Schema.number().description('Re-Run testcase if time-limit-exceeded (max per submission)').default(0), rate: Schema.number().default(1), diff --git a/packages/hydrojudge/src/hosts/builtin.ts b/packages/hydrojudge/src/hosts/builtin.ts index 72a335aa..463fb7db 100644 --- a/packages/hydrojudge/src/hosts/builtin.ts +++ b/packages/hydrojudge/src/hosts/builtin.ts @@ -80,8 +80,9 @@ export async function postInit() { logger.debug('Record not found: %o', t); return; } - (new JudgeTask(session, Object.assign(rdoc, t))).handle().catch(logger.error); + await (new JudgeTask(session, Object.assign(rdoc, t))).handle().catch(logger.error); }; - TaskModel.consume({ type: 'judge' }, handle); + const parallelism = Math.max(getConfig('parallelism'), 2); + for (let i = 1; i < parallelism; i++) TaskModel.consume({ type: 'judge' }, handle); TaskModel.consume({ type: 'judge', priority: { $gt: -50 } }, handle); } diff --git a/packages/hydrojudge/src/judge/default.ts b/packages/hydrojudge/src/judge/default.ts index 228db3d4..5bdd1b6b 100644 --- a/packages/hydrojudge/src/judge/default.ts +++ b/packages/hydrojudge/src/judge/default.ts @@ -5,7 +5,7 @@ import compile, { compileChecker } from '../compile'; import { runFlow } from '../flow'; import { Execute } from '../interface'; import { Logger } from '../log'; -import { del, run } from '../sandbox'; +import { del, run, runQueued } from '../sandbox'; import signals from '../signals'; import { NormalizedCase } from '../utils'; import { Context, ContextSubTask } from './interface'; @@ -14,7 +14,7 @@ const logger = new Logger('judge/default'); function judgeCase(c: NormalizedCase) { return async (ctx: Context, ctxSubtask: ContextSubTask, runner?: Function) => { - const res = await run( + const res = await runQueued( ctx.execute.execute, { stdin: { src: c.input }, @@ -63,7 +63,7 @@ function judgeCase(c: NormalizedCase) { if (langConfig.analysis && !ctx.analysis) { ctx.analysis = true; try { - const r = await run(langConfig.analysis, { + const r = await runQueued(langConfig.analysis, { copyIn: { ...ctx.execute.copyIn, input: { src: c.input }, diff --git a/packages/hydrojudge/src/judge/hack.ts b/packages/hydrojudge/src/judge/hack.ts index 98ea9911..548df6f6 100644 --- a/packages/hydrojudge/src/judge/hack.ts +++ b/packages/hydrojudge/src/judge/hack.ts @@ -5,7 +5,7 @@ import { STATUS } from '@hydrooj/utils/lib/status'; import checkers from '../checkers'; import compile, { compileChecker, compileValidator } from '../compile'; import { Execute } from '../interface'; -import { del, run } from '../sandbox'; +import { del, run, runQueued } from '../sandbox'; import signals from '../signals'; import { parseMemoryMB, parseTimeMS } from '../utils'; import { Context } from './interface'; @@ -28,7 +28,7 @@ export async function judge(ctx: Context) { ]); ctx.clean.push(() => fs.unlink(input)); ctx.next({ status: STATUS.STATUS_JUDGING, progress: 0 }); - const validateResult = await run( + const validateResult = await runQueued( validator.execute, { stdin: { src: input }, @@ -41,7 +41,7 @@ export async function judge(ctx: Context) { const message = `${validateResult.stdout || ''}\n${validateResult.stderr || ''}`.trim(); return ctx.end({ status: STATUS.STATUS_FORMAT_ERROR, message }); } - const res = await run( + const res = await runQueued( execute.execute, { stdin: { src: input }, @@ -76,9 +76,7 @@ export async function judge(ctx: Context) { if (code < 32) message = signals[code]; else message = { message: 'Your program returned {0}.', params: [code] }; } - await Promise.all( - Object.values(res.fileIds).map((id) => del(id)), - ).catch(() => { /* Ignore file doesn't exist */ }); + await Promise.allSettled(Object.values(res.fileIds).map((id) => del(id))); if (message) ctx.next({ message }); diff --git a/packages/hydrojudge/src/judge/run.ts b/packages/hydrojudge/src/judge/run.ts index f049902f..0ca2277b 100644 --- a/packages/hydrojudge/src/judge/run.ts +++ b/packages/hydrojudge/src/judge/run.ts @@ -2,7 +2,7 @@ import { STATUS } from '@hydrooj/utils/lib/status'; import compile from '../compile'; import { CompileError } from '../error'; import { Logger } from '../log'; -import { run } from '../sandbox'; +import { runQueued } from '../sandbox'; import signals from '../signals'; import { compilerText, parseMemoryMB, parseTimeMS } from '../utils'; import { Context } from './interface'; @@ -46,7 +46,7 @@ export const judge = async (ctx: Context) => { } ctx.clean.push(ctx.execute.clean); ctx.next({ status: STATUS.STATUS_JUDGING, progress: 0 }); - const res = await run( + const res = await runQueued( ctx.execute.execute, { stdin: { content: ctx.input }, @@ -55,6 +55,7 @@ export const judge = async (ctx: Context) => { time: parseTimeMS(ctx.config.time || '1s') * 2, memory: parseMemoryMB(ctx.config.memory || '128m'), }, + 1, ); const { code, time, memory } = res; let { status } = res; @@ -85,7 +86,7 @@ export const judge = async (ctx: Context) => { if (langConfig.analysis) { try { ctx.analysis = true; - const r = await run(langConfig.analysis, { + const r = await runQueued(langConfig.analysis, { copyIn: { ...ctx.execute.copyIn, input: { content: ctx.input }, diff --git a/packages/hydrojudge/src/judge/submit_answer.ts b/packages/hydrojudge/src/judge/submit_answer.ts index 3162252e..fda77332 100644 --- a/packages/hydrojudge/src/judge/submit_answer.ts +++ b/packages/hydrojudge/src/judge/submit_answer.ts @@ -4,7 +4,7 @@ import { STATUS } from '@hydrooj/utils/lib/status'; import checkers from '../checkers'; import { compileChecker } from '../compile'; import { runFlow } from '../flow'; -import { del, run } from '../sandbox'; +import { del, runQueued } from '../sandbox'; import { NormalizedCase } from '../utils'; import { Context } from './interface'; @@ -20,7 +20,7 @@ function judgeCase(c: NormalizedCase) { let score = 0; const fileIds = []; if (ctx.config.subType === 'multi') { - const res = await run( + const res = await runQueued( '/usr/bin/unzip foo.zip', { stdin: null, @@ -54,7 +54,7 @@ function judgeCase(c: NormalizedCase) { env: { ...ctx.env, HYDRO_TESTCASE: c.id.toString() }, })); } - await Promise.all(fileIds.map(del)).catch(() => { /* Ignore file doesn't exist */ }); + await Promise.allSettled(fileIds.map(del)); return { id: c.id, status, diff --git a/packages/hydrojudge/src/sandbox.ts b/packages/hydrojudge/src/sandbox.ts index afe3b20c..5b22d7a6 100644 --- a/packages/hydrojudge/src/sandbox.ts +++ b/packages/hydrojudge/src/sandbox.ts @@ -206,10 +206,10 @@ export async function run(execute: string, params?: Parameter): Promise run(execute, params), { priority }); + return queue.add(() => run(execute, params), { priority }) as Promise; } export * from './sandbox/interface';