Coverage for src / competitive_verifier / oj / resolver.py: 89%
176 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-03-05 16:00 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-03-05 16:00 +0000
1import fnmatch
2import hashlib
3import os
4import pathlib
5import traceback
6from argparse import ArgumentParser
7from collections.abc import Generator
8from functools import cached_property
9from itertools import chain
10from logging import getLogger
11from typing import Any, Literal
13from pydantic import Field, ValidationError
15from competitive_verifier import config, git
16from competitive_verifier.arg import IncludeExcludeArguments, VerboseArguments
17from competitive_verifier.log import GitHubMessageParams
18from competitive_verifier.models import (
19 AddtionalSource,
20 CommandVerification,
21 ConstVerification,
22 LocalProblemVerification,
23 ProblemVerification,
24 ResultStatus,
25 Verification,
26 VerificationFile,
27 VerificationInput,
28)
29from competitive_verifier.util import resolve_relative_or_abs_path
31from .problem import problem_from_url
32from .verify.list import OjVerifyConfig
33from .verify.models import LanguageEnvironment
35logger = getLogger(__name__)
38def _get_bundled_dir() -> pathlib.Path:
39 return config.get_config_dir() / "bundled"
42def _write_bundled(content: bytes, *, path: pathlib.Path) -> pathlib.Path:
43 """Write bundled code.
45 Returns:
46 output file path.
47 """
48 dest_dir = _get_bundled_dir()
49 dest_path = dest_dir / path
50 dest_path.parent.mkdir(parents=True, exist_ok=True)
51 logger.info("bundle_path=%s", dest_path.as_posix())
52 dest_path.write_bytes(content)
53 return dest_path
56class OjResolver:
57 include: list[str]
58 exclude: list[str]
59 config: OjVerifyConfig
60 _match_exclude_cache: dict[pathlib.Path, bool]
62 def __init__(
63 self,
64 *,
65 include: list[str],
66 exclude: list[str],
67 config: OjVerifyConfig,
68 ) -> None:
69 def _remove_slash(s: str):
70 s = os.path.normpath(s)
71 while len(s) > 1 and s[-1] == os.sep: 71 ↛ 72line 71 didn't jump to line 72 because the condition on line 71 was never true
72 s = s[:-1]
73 return s
75 self.include = list(map(_remove_slash, include))
76 self.exclude = list(map(_remove_slash, exclude))
77 self.config = config
78 self._match_exclude_cache = {}
80 def _match_exclude2(self, paths: list[pathlib.Path]) -> bool:
81 if not paths:
82 return False
83 path = paths.pop()
84 cache = self._match_exclude_cache.get(path, None)
85 if cache is not None:
86 return cache
88 for ex in self.exclude:
89 if fnmatch.fnmatch(path.as_posix(), ex):
90 self._match_exclude_cache[path] = True
91 return True
92 result = self._match_exclude2(paths)
93 self._match_exclude_cache[path] = result
94 return result
96 def _match_exclude(self, path: pathlib.Path) -> bool:
97 paths = list(path.parents)
98 paths.reverse()
99 paths.append(path)
100 return self._match_exclude2(paths)
102 @cached_property
103 def _lang_dict(self):
104 return self.config.get_dict()
106 @staticmethod
107 def env_to_verifications(
108 env: LanguageEnvironment,
109 *,
110 attr: dict[str, Any],
111 path: pathlib.Path,
112 basedir: pathlib.Path,
113 ) -> Generator[Verification, None, None]:
114 if "IGNORE" in attr:
115 yield ConstVerification(status=ResultStatus.SKIPPED)
116 return
118 error_str = attr.get("ERROR")
119 try:
120 error = float(error_str) if error_str else None
121 except ValueError:
122 error = None
124 tle_str = attr.get("TLE")
125 tle = float(tle_str) if tle_str else None
127 mle_str = attr.get("MLE")
128 mle = float(mle_str) if mle_str else None
130 yield from OjResolver._env_to_local_problem_verification(
131 env,
132 attr=attr,
133 path=path,
134 basedir=basedir,
135 error=error,
136 tle=tle,
137 mle=mle,
138 )
139 yield from OjResolver._env_to_problem_verification(
140 env,
141 attr=attr,
142 path=path,
143 basedir=basedir,
144 error=error,
145 tle=tle,
146 mle=mle,
147 )
148 yield from OjResolver._env_to_standalone_verification(
149 env,
150 attr=attr,
151 path=path,
152 basedir=basedir,
153 )
154 yield from OjResolver._env_to_unittest_verification(attr=attr)
156 @staticmethod
157 def _env_to_problem_verification(
158 env: LanguageEnvironment,
159 *,
160 attr: dict[str, Any],
161 path: pathlib.Path,
162 basedir: pathlib.Path,
163 error: float | None,
164 tle: float | None,
165 mle: float | None,
166 ) -> Generator[Verification, None, None]:
167 url = attr.get("PROBLEM")
169 if not url:
170 return
172 problem = problem_from_url(url)
173 if problem is None: 173 ↛ 174line 173 didn't jump to line 174 because the condition on line 173 was never true
174 logger.error(
175 'The URL "%s" is not supported',
176 url,
177 extra={"github": GitHubMessageParams()},
178 )
179 yield ConstVerification(status=ResultStatus.FAILURE)
180 else:
181 tempdir = problem.problem_directory
182 yield ProblemVerification(
183 name=env.name,
184 command=env.get_execute_command(path, basedir=basedir, tempdir=tempdir),
185 compile=env.get_compile_command(path, basedir=basedir, tempdir=tempdir),
186 problem=url,
187 error=error,
188 tle=tle,
189 mle=mle,
190 )
192 @staticmethod
193 def _env_to_local_problem_verification(
194 env: LanguageEnvironment,
195 *,
196 attr: dict[str, Any],
197 path: pathlib.Path,
198 basedir: pathlib.Path,
199 error: float | None,
200 tle: float | None,
201 mle: float | None,
202 ) -> Generator[Verification, None, None]:
203 casedir = attr.get("LOCALCASE")
204 if casedir is None:
205 return
206 casedir = resolve_relative_or_abs_path(casedir, basedir=path.parent)
207 if casedir is None: 207 ↛ 208line 207 didn't jump to line 208 because the condition on line 207 was never true
208 return
210 tempdir = (
211 config.get_cache_dir()
212 / "localcase"
213 / hashlib.md5(
214 path.as_posix().encode("utf-8"), usedforsecurity=False
215 ).hexdigest()
216 )
217 yield LocalProblemVerification(
218 name=env.name,
219 command=env.get_execute_command(path, basedir=basedir, tempdir=tempdir),
220 compile=env.get_compile_command(path, basedir=basedir, tempdir=tempdir),
221 input=casedir,
222 error=error,
223 tle=tle,
224 mle=mle,
225 tempdir=tempdir,
226 )
228 @staticmethod
229 def _env_to_standalone_verification(
230 env: LanguageEnvironment,
231 *,
232 attr: dict[str, Any],
233 path: pathlib.Path,
234 basedir: pathlib.Path,
235 ) -> Generator[Verification, None, None]:
236 if attr.get("STANDALONE") is None:
237 return
239 tempdir = (
240 config.get_cache_dir()
241 / "standalone"
242 / hashlib.md5(
243 path.as_posix().encode("utf-8"), usedforsecurity=False
244 ).hexdigest()
245 )
246 yield CommandVerification(
247 name=env.name,
248 command=env.get_execute_command(path, basedir=basedir, tempdir=tempdir),
249 compile=env.get_compile_command(path, basedir=basedir, tempdir=tempdir),
250 tempdir=tempdir,
251 )
253 @staticmethod
254 def _env_to_unittest_verification(
255 *,
256 attr: dict[str, Any],
257 ) -> Generator[Verification, None, None]:
258 unit_test_envvar = attr.get("UNITTEST")
259 if not unit_test_envvar:
260 return
261 var = os.getenv(unit_test_envvar)
262 if var is None: 262 ↛ 265line 262 didn't jump to line 265 because the condition on line 262 was always true
263 logger.warning("UNITTEST envvar %s is not defined.", unit_test_envvar)
264 yield ConstVerification(status=ResultStatus.FAILURE)
265 elif var.lower() == "false" or var == "0":
266 logger.info("UNITTEST envvar %s=%s is falsy.", unit_test_envvar, var)
267 yield ConstVerification(status=ResultStatus.FAILURE)
268 else:
269 logger.info("UNITTEST envvar %s=%s is truthy.", unit_test_envvar, var)
270 yield ConstVerification(status=ResultStatus.SUCCESS)
272 def resolve(self, *, bundle: bool) -> VerificationInput:
273 files: dict[pathlib.Path, VerificationFile] = {}
274 basedir = pathlib.Path.cwd()
276 for path in git.ls_files(*self.include):
277 if self._match_exclude(path):
278 logger.debug("exclude=%s", path)
279 continue
281 language = self._lang_dict.get(path.suffix)
282 if language is None:
283 continue
285 deps = set(git.ls_files(*language.list_dependencies(path, basedir=basedir)))
286 attr = language.list_attributes(path, basedir=basedir)
288 additonal_sources: list[AddtionalSource] = []
289 if bundle:
290 try:
291 bundled_code = language.bundle(path, basedir=basedir)
292 if bundled_code:
293 dest_path = _write_bundled(bundled_code, path=path)
294 additonal_sources.append(
295 AddtionalSource(name="bundled", path=dest_path)
296 )
297 except Exception: # noqa: BLE001
298 dest_path = _write_bundled(
299 traceback.format_exc().encode(), path=path
300 )
301 additonal_sources.append(
302 AddtionalSource(name="bundle error", path=dest_path)
303 )
305 verifications = list(
306 chain.from_iterable(
307 self.env_to_verifications(vs, attr=attr, path=path, basedir=basedir)
308 for vs in language.list_environments(path, basedir=basedir)
309 )
310 )
311 files[path] = VerificationFile(
312 dependencies=deps,
313 verification=verifications,
314 document_attributes=attr,
315 additonal_sources=additonal_sources,
316 )
317 return VerificationInput(files=files)
320class OjResolve(IncludeExcludeArguments, VerboseArguments):
321 subcommand: Literal["oj-resolve"] = Field(
322 default="oj-resolve",
323 description="Create verify_files json using `oj-verify`",
324 )
325 bundle: bool = True
326 config: pathlib.Path | OjVerifyConfig | None = None
328 @classmethod
329 def add_parser(cls, parser: ArgumentParser):
330 super().add_parser(parser)
331 parser.add_argument(
332 "--no-bundle",
333 dest="bundle",
334 action="store_false",
335 help="Disable bundle",
336 )
337 parser.add_argument(
338 "--config",
339 help="config.toml",
340 type=pathlib.Path,
341 )
343 def to_resolver(self) -> OjResolver:
344 if self.config is None:
345 logger.info("no config file")
346 config = OjVerifyConfig()
347 elif not isinstance(self.config, OjVerifyConfig): 347 ↛ 360line 347 didn't jump to line 360 because the condition on line 347 was always true
348 config_path = pathlib.Path(self.config)
349 try:
350 with config_path.open("rb") as fp:
351 config = OjVerifyConfig.load(fp)
352 logger.info("config file loaded: %s: %s", config_path, config)
353 except ValidationError:
354 logger.exception(
355 "config file validation error",
356 extra={"github": GitHubMessageParams(file=config_path)},
357 )
358 config = OjVerifyConfig()
359 else:
360 config = self.config
362 return OjResolver(
363 include=self.include,
364 exclude=self.exclude,
365 config=config,
366 )
368 def _run(self) -> bool:
369 logger.debug("arguments:%s", self)
370 resolved = self.to_resolver().resolve(bundle=self.bundle)
371 print(resolved.model_dump_json(exclude_none=True))
372 return True