Coverage for src / competitive_verifier / oj / resolver.py: 88%
175 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-04-26 12:38 +0900
« prev ^ index » next coverage.py v7.13.1, created at 2026-04-26 12:38 +0900
1import fnmatch
2import hashlib
3import os
4import pathlib
5import traceback
6from argparse import ArgumentParser
7from collections.abc import Generator
8from functools import cached_property
9from itertools import chain
10from logging import getLogger
11from typing import Any, Literal
13from pydantic import Field, ValidationError
15from competitive_verifier import config, git
16from competitive_verifier.arg import IncludeExcludeArguments, VerboseArguments
17from competitive_verifier.log import GitHubMessageParams
18from competitive_verifier.models import (
19 AddtionalSource,
20 CommandVerification,
21 ConstVerification,
22 LocalProblemVerification,
23 ProblemVerification,
24 ResultStatus,
25 Verification,
26 VerificationFile,
27 VerificationInput,
28)
29from competitive_verifier.util import resolve_referenced_path
31from .languages import LanguageEnvironment, VerificationConfig
32from .problem import problem_from_url
34logger = getLogger(__name__)
37def _get_bundled_dir() -> pathlib.Path:
38 return config.get_config_dir() / "bundled"
41def _write_bundled(content: bytes, *, path: pathlib.Path) -> pathlib.Path:
42 """Write bundled code.
44 Returns:
45 output file path.
46 """
47 dest_dir = _get_bundled_dir()
48 dest_path = dest_dir / path
49 dest_path.parent.mkdir(parents=True, exist_ok=True)
50 logger.info("bundle_path=%s", dest_path.as_posix())
51 dest_path.write_bytes(content)
52 return dest_path
55class OjResolver:
56 include: list[str]
57 exclude: list[str]
58 config: VerificationConfig
59 _match_exclude_cache: dict[pathlib.Path, bool]
61 def __init__(
62 self,
63 *,
64 include: list[str],
65 exclude: list[str],
66 config: VerificationConfig,
67 ) -> None:
68 def _remove_slash(s: str):
69 s = os.path.normpath(s)
70 while len(s) > 1 and s[-1] == os.sep: 70 ↛ 71line 70 didn't jump to line 71 because the condition on line 70 was never true
71 s = s[:-1]
72 return s
74 self.include = list(map(_remove_slash, include))
75 self.exclude = list(map(_remove_slash, exclude))
76 self.config = config
77 self._match_exclude_cache = {}
79 def _match_exclude2(self, paths: list[pathlib.Path]) -> bool:
80 if not paths:
81 return False
82 path = paths.pop()
83 cache = self._match_exclude_cache.get(path, None)
84 if cache is not None:
85 return cache
87 for ex in self.exclude:
88 if fnmatch.fnmatch(path.as_posix(), ex):
89 self._match_exclude_cache[path] = True
90 return True
91 result = self._match_exclude2(paths)
92 self._match_exclude_cache[path] = result
93 return result
95 def _match_exclude(self, path: pathlib.Path) -> bool:
96 paths = list(path.parents)
97 paths.reverse()
98 paths.append(path)
99 return self._match_exclude2(paths)
101 @cached_property
102 def _lang_dict(self):
103 return self.config.get_dict()
105 @staticmethod
106 def env_to_verifications(
107 env: LanguageEnvironment,
108 *,
109 attr: dict[str, Any],
110 path: pathlib.Path,
111 basedir: pathlib.Path,
112 ) -> Generator[Verification, None, None]:
113 if "IGNORE" in attr:
114 yield ConstVerification(status=ResultStatus.SKIPPED)
115 return
117 error_str = attr.get("ERROR")
118 try:
119 error = float(error_str) if error_str else None
120 except ValueError:
121 error = None
123 tle_str = attr.get("TLE")
124 tle = float(tle_str) if tle_str else None
126 mle_str = attr.get("MLE")
127 mle = float(mle_str) if mle_str else None
129 yield from OjResolver._env_to_local_problem_verification(
130 env,
131 attr=attr,
132 path=path,
133 basedir=basedir,
134 error=error,
135 tle=tle,
136 mle=mle,
137 )
138 yield from OjResolver._env_to_problem_verification(
139 env,
140 attr=attr,
141 path=path,
142 basedir=basedir,
143 error=error,
144 tle=tle,
145 mle=mle,
146 )
147 yield from OjResolver._env_to_standalone_verification(
148 env,
149 attr=attr,
150 path=path,
151 basedir=basedir,
152 )
153 yield from OjResolver._env_to_unittest_verification(attr=attr)
155 @staticmethod
156 def _env_to_problem_verification(
157 env: LanguageEnvironment,
158 *,
159 attr: dict[str, Any],
160 path: pathlib.Path,
161 basedir: pathlib.Path,
162 error: float | None,
163 tle: float | None,
164 mle: float | None,
165 ) -> Generator[Verification, None, None]:
166 url = attr.get("PROBLEM")
168 if not url:
169 return
171 problem = problem_from_url(url)
172 if problem is None: 172 ↛ 173line 172 didn't jump to line 173 because the condition on line 172 was never true
173 logger.error(
174 'The URL "%s" is not supported',
175 url,
176 extra={"github": GitHubMessageParams()},
177 )
178 yield ConstVerification(status=ResultStatus.FAILURE)
179 else:
180 tempdir = problem.problem_directory
181 yield ProblemVerification(
182 name=env.name,
183 command=env.get_execute_command(path, basedir=basedir, tempdir=tempdir),
184 compile=env.get_compile_command(path, basedir=basedir, tempdir=tempdir),
185 problem=url,
186 error=error,
187 tle=tle,
188 mle=mle,
189 )
191 @staticmethod
192 def _env_to_local_problem_verification(
193 env: LanguageEnvironment,
194 *,
195 attr: dict[str, Any],
196 path: pathlib.Path,
197 basedir: pathlib.Path,
198 error: float | None,
199 tle: float | None,
200 mle: float | None,
201 ) -> Generator[Verification, None, None]:
202 casedir = attr.get("LOCALCASE")
203 if casedir is None:
204 return
205 casedir = resolve_referenced_path(casedir, basedir=path.parent)
206 if casedir is None: 206 ↛ 207line 206 didn't jump to line 207 because the condition on line 206 was never true
207 return
209 tempdir = (
210 config.get_cache_dir()
211 / "localcase"
212 / hashlib.md5(
213 path.as_posix().encode("utf-8"), usedforsecurity=False
214 ).hexdigest()
215 )
216 yield LocalProblemVerification(
217 name=env.name,
218 command=env.get_execute_command(path, basedir=basedir, tempdir=tempdir),
219 compile=env.get_compile_command(path, basedir=basedir, tempdir=tempdir),
220 input=casedir,
221 error=error,
222 tle=tle,
223 mle=mle,
224 tempdir=tempdir,
225 )
227 @staticmethod
228 def _env_to_standalone_verification(
229 env: LanguageEnvironment,
230 *,
231 attr: dict[str, Any],
232 path: pathlib.Path,
233 basedir: pathlib.Path,
234 ) -> Generator[Verification, None, None]:
235 if attr.get("STANDALONE") is None:
236 return
238 tempdir = (
239 config.get_cache_dir()
240 / "standalone"
241 / hashlib.md5(
242 path.as_posix().encode("utf-8"), usedforsecurity=False
243 ).hexdigest()
244 )
245 yield CommandVerification(
246 name=env.name,
247 command=env.get_execute_command(path, basedir=basedir, tempdir=tempdir),
248 compile=env.get_compile_command(path, basedir=basedir, tempdir=tempdir),
249 tempdir=tempdir,
250 )
252 @staticmethod
253 def _env_to_unittest_verification(
254 *,
255 attr: dict[str, Any],
256 ) -> Generator[Verification, None, None]:
257 unit_test_envvar = attr.get("UNITTEST")
258 if not unit_test_envvar:
259 return
260 var = os.getenv(unit_test_envvar)
261 if var is None: 261 ↛ 264line 261 didn't jump to line 264 because the condition on line 261 was always true
262 logger.warning("UNITTEST envvar %s is not defined.", unit_test_envvar)
263 yield ConstVerification(status=ResultStatus.FAILURE)
264 elif var.lower() == "false" or var == "0":
265 logger.info("UNITTEST envvar %s=%s is falsy.", unit_test_envvar, var)
266 yield ConstVerification(status=ResultStatus.FAILURE)
267 else:
268 logger.info("UNITTEST envvar %s=%s is truthy.", unit_test_envvar, var)
269 yield ConstVerification(status=ResultStatus.SUCCESS)
271 def resolve(self, *, bundle: bool) -> VerificationInput:
272 files: dict[pathlib.Path, VerificationFile] = {}
273 basedir = pathlib.Path.cwd()
275 for path in git.ls_files(*self.include):
276 if self._match_exclude(path):
277 logger.debug("exclude=%s", path)
278 continue
280 language = self._lang_dict.get(path.suffix)
281 if language is None:
282 continue
284 deps = set(git.ls_files(*language.list_dependencies(path, basedir=basedir)))
285 attr = language.list_attributes(path, basedir=basedir)
287 additonal_sources: list[AddtionalSource] = []
288 if bundle:
289 try:
290 bundled_code = language.bundle(path, basedir=basedir)
291 if bundled_code:
292 dest_path = _write_bundled(bundled_code, path=path)
293 additonal_sources.append(
294 AddtionalSource(name="bundled", path=dest_path)
295 )
296 except Exception: # noqa: BLE001
297 dest_path = _write_bundled(
298 traceback.format_exc().encode(), path=path
299 )
300 additonal_sources.append(
301 AddtionalSource(name="bundle error", path=dest_path)
302 )
304 verifications = list(
305 chain.from_iterable(
306 self.env_to_verifications(vs, attr=attr, path=path, basedir=basedir)
307 for vs in language.list_environments(path, basedir=basedir)
308 )
309 )
310 files[path] = VerificationFile(
311 dependencies=deps,
312 verification=verifications,
313 document_attributes=attr,
314 additonal_sources=additonal_sources,
315 )
316 return VerificationInput(files=files)
319class OjResolve(IncludeExcludeArguments, VerboseArguments):
320 subcommand: Literal["oj-resolve"] = Field(
321 default="oj-resolve",
322 description="Create verify_files json using `oj-verify`",
323 )
324 bundle: bool = True
325 config: pathlib.Path | VerificationConfig | None = None
327 @classmethod
328 def add_parser(cls, parser: ArgumentParser):
329 super().add_parser(parser)
330 parser.add_argument(
331 "--no-bundle",
332 dest="bundle",
333 action="store_false",
334 help="Disable bundle",
335 )
336 parser.add_argument(
337 "--config",
338 help="config.toml",
339 type=pathlib.Path,
340 )
342 def to_resolver(self) -> OjResolver:
343 if self.config is None:
344 logger.info("no config file")
345 config = VerificationConfig()
346 elif not isinstance(self.config, VerificationConfig): 346 ↛ 359line 346 didn't jump to line 359 because the condition on line 346 was always true
347 config_path = pathlib.Path(self.config)
348 try:
349 with config_path.open("rb") as fp:
350 config = VerificationConfig.load(fp)
351 logger.info("config file loaded: %s: %s", config_path, config)
352 except ValidationError:
353 logger.exception(
354 "config file validation error",
355 extra={"github": GitHubMessageParams(file=config_path)},
356 )
357 config = VerificationConfig()
358 else:
359 config = self.config
361 return OjResolver(
362 include=self.include,
363 exclude=self.exclude,
364 config=config,
365 )
367 def _run(self) -> bool:
368 logger.debug("arguments:%s", self)
369 resolved = self.to_resolver().resolve(bundle=self.bundle)
370 print(resolved.model_dump_json(exclude_none=True))
371 return True