Coverage for src / competitive_verifier / oj / resolver.py: 88%

175 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-04-26 12:38 +0900

1import fnmatch 

2import hashlib 

3import os 

4import pathlib 

5import traceback 

6from argparse import ArgumentParser 

7from collections.abc import Generator 

8from functools import cached_property 

9from itertools import chain 

10from logging import getLogger 

11from typing import Any, Literal 

12 

13from pydantic import Field, ValidationError 

14 

15from competitive_verifier import config, git 

16from competitive_verifier.arg import IncludeExcludeArguments, VerboseArguments 

17from competitive_verifier.log import GitHubMessageParams 

18from competitive_verifier.models import ( 

19 AddtionalSource, 

20 CommandVerification, 

21 ConstVerification, 

22 LocalProblemVerification, 

23 ProblemVerification, 

24 ResultStatus, 

25 Verification, 

26 VerificationFile, 

27 VerificationInput, 

28) 

29from competitive_verifier.util import resolve_referenced_path 

30 

31from .languages import LanguageEnvironment, VerificationConfig 

32from .problem import problem_from_url 

33 

34logger = getLogger(__name__) 

35 

36 

def _get_bundled_dir() -> pathlib.Path:
    """Return the ``bundled`` directory located under the config directory."""
    config_dir = config.get_config_dir()
    return config_dir / "bundled"

39 

40 

def _write_bundled(content: bytes, *, path: pathlib.Path) -> pathlib.Path:
    """Store bundled output under the bundled directory.

    Args:
        content: Raw bytes to write.
        path: Location of the file, joined onto the bundled directory.

    Returns:
        The path of the file that was written.
    """
    target = _get_bundled_dir() / path
    # Create any missing parent directories before writing.
    target.parent.mkdir(parents=True, exist_ok=True)
    logger.info("bundle_path=%s", target.as_posix())
    target.write_bytes(content)
    return target

53 

54 

class OjResolver:
    """Build a ``VerificationInput`` from the files listed by ``git.ls_files``.

    Files are listed for the ``include`` patterns, filtered by the ``exclude``
    glob patterns, and processed by the language object that
    ``config.get_dict()`` registers for the file suffix.
    """

    include: list[str]
    exclude: list[str]
    config: VerificationConfig
    # Memoized exclusion result per path (the path itself or any ancestor).
    _match_exclude_cache: dict[pathlib.Path, bool]

    def __init__(
        self,
        *,
        include: list[str],
        exclude: list[str],
        config: VerificationConfig,
    ) -> None:
        """Store normalized include/exclude patterns and the configuration.

        Args:
            include: Patterns passed to ``git.ls_files``.
            exclude: ``fnmatch`` patterns; a file is skipped when it or one of
                its parent directories matches.
            config: Verification configuration providing the language table.
        """

        def _remove_slash(s: str) -> str:
            s = os.path.normpath(s)
            # Drop any trailing separator that survived normalization, but
            # never reduce the string to an empty one.
            while len(s) > 1 and s[-1] == os.sep:
                s = s[:-1]
            return s

        self.include = list(map(_remove_slash, include))
        self.exclude = list(map(_remove_slash, exclude))
        self.config = config
        self._match_exclude_cache = {}

    def _match_exclude2(self, paths: list[pathlib.Path]) -> bool:
        """Return whether the last path in ``paths`` or any earlier one is excluded.

        ``paths`` is consumed: entries are popped from the end, so the most
        specific path is tested first and its ancestors afterwards. Results
        are memoized in ``_match_exclude_cache``.
        """
        if not paths:
            return False
        path = paths.pop()
        cached = self._match_exclude_cache.get(path)
        if cached is not None:
            return cached

        posix = path.as_posix()
        if any(fnmatch.fnmatch(posix, ex) for ex in self.exclude):
            result = True
        else:
            # Not matched directly: the answer is inherited from the ancestors.
            result = self._match_exclude2(paths)
        self._match_exclude_cache[path] = result
        return result

    def _match_exclude(self, path: pathlib.Path) -> bool:
        """Return ``True`` when ``path`` or one of its parents matches ``exclude``."""
        # Order from the outermost parent to the path itself so that
        # ``_match_exclude2`` pops the path first.
        paths = list(path.parents)
        paths.reverse()
        paths.append(path)
        return self._match_exclude2(paths)

    @cached_property
    def _lang_dict(self):
        """Language table from ``config.get_dict()``, looked up by file suffix."""
        return self.config.get_dict()

    @staticmethod
    def _parse_optional_float(attr: dict[str, Any], key: str) -> float | None:
        """Read ``attr[key]`` as a float.

        Returns ``None`` when the key is missing, empty, or not a valid number;
        an invalid value is reported with a warning instead of raising.
        """
        raw = attr.get(key)
        if not raw:
            return None
        try:
            return float(raw)
        except (TypeError, ValueError):
            logger.warning("%s value %r is not a number; ignored.", key, raw)
            return None

    @staticmethod
    def _hashed_cache_dir(kind: str, path: pathlib.Path) -> pathlib.Path:
        """Return ``<cache dir>/<kind>/<md5 of the POSIX form of path>``."""
        digest = hashlib.md5(
            path.as_posix().encode("utf-8"), usedforsecurity=False
        ).hexdigest()
        return config.get_cache_dir() / kind / digest

    @staticmethod
    def env_to_verifications(
        env: LanguageEnvironment,
        *,
        attr: dict[str, Any],
        path: pathlib.Path,
        basedir: pathlib.Path,
    ) -> Generator[Verification, None, None]:
        """Yield the verifications described by the attributes of one file.

        Args:
            env: Language environment used to build compile/execute commands.
            attr: Attributes of the file. The keys read here are ``IGNORE``,
                ``ERROR``, ``TLE``, ``MLE``, ``LOCALCASE``, ``PROBLEM``,
                ``STANDALONE`` and ``UNITTEST``.
            path: Path of the file being verified.
            basedir: Base directory passed to the environment.

        Yields:
            A single skipped ``ConstVerification`` when ``IGNORE`` is present;
            otherwise the local-problem, problem, standalone and unittest
            verifications, in that order, for the attributes that are set.
        """
        if "IGNORE" in attr:
            yield ConstVerification(status=ResultStatus.SKIPPED)
            return

        # A malformed limit must not abort the whole resolve run, so all three
        # numeric attributes are parsed with the same tolerant helper.
        error = OjResolver._parse_optional_float(attr, "ERROR")
        tle = OjResolver._parse_optional_float(attr, "TLE")
        mle = OjResolver._parse_optional_float(attr, "MLE")

        yield from OjResolver._env_to_local_problem_verification(
            env,
            attr=attr,
            path=path,
            basedir=basedir,
            error=error,
            tle=tle,
            mle=mle,
        )
        yield from OjResolver._env_to_problem_verification(
            env,
            attr=attr,
            path=path,
            basedir=basedir,
            error=error,
            tle=tle,
            mle=mle,
        )
        yield from OjResolver._env_to_standalone_verification(
            env,
            attr=attr,
            path=path,
            basedir=basedir,
        )
        yield from OjResolver._env_to_unittest_verification(attr=attr)

    @staticmethod
    def _env_to_problem_verification(
        env: LanguageEnvironment,
        *,
        attr: dict[str, Any],
        path: pathlib.Path,
        basedir: pathlib.Path,
        error: float | None,
        tle: float | None,
        mle: float | None,
    ) -> Generator[Verification, None, None]:
        """Yield the verification for the ``PROBLEM`` attribute, if it is set.

        A URL for which ``problem_from_url`` returns ``None`` is logged as an
        error and yields a failed ``ConstVerification``.
        """
        url = attr.get("PROBLEM")

        if not url:
            return

        problem = problem_from_url(url)
        if problem is None:
            logger.error(
                'The URL "%s" is not supported',
                url,
                extra={"github": GitHubMessageParams()},
            )
            yield ConstVerification(status=ResultStatus.FAILURE)
        else:
            tempdir = problem.problem_directory
            yield ProblemVerification(
                name=env.name,
                command=env.get_execute_command(path, basedir=basedir, tempdir=tempdir),
                compile=env.get_compile_command(path, basedir=basedir, tempdir=tempdir),
                problem=url,
                error=error,
                tle=tle,
                mle=mle,
            )

    @staticmethod
    def _env_to_local_problem_verification(
        env: LanguageEnvironment,
        *,
        attr: dict[str, Any],
        path: pathlib.Path,
        basedir: pathlib.Path,
        error: float | None,
        tle: float | None,
        mle: float | None,
    ) -> Generator[Verification, None, None]:
        """Yield the verification for the ``LOCALCASE`` attribute, if it is set.

        The case directory is resolved relative to the directory of ``path``;
        nothing is yielded when it cannot be resolved.
        """
        casedir = attr.get("LOCALCASE")
        if casedir is None:
            return
        casedir = resolve_referenced_path(casedir, basedir=path.parent)
        if casedir is None:
            return

        tempdir = OjResolver._hashed_cache_dir("localcase", path)
        yield LocalProblemVerification(
            name=env.name,
            command=env.get_execute_command(path, basedir=basedir, tempdir=tempdir),
            compile=env.get_compile_command(path, basedir=basedir, tempdir=tempdir),
            input=casedir,
            error=error,
            tle=tle,
            mle=mle,
            tempdir=tempdir,
        )

    @staticmethod
    def _env_to_standalone_verification(
        env: LanguageEnvironment,
        *,
        attr: dict[str, Any],
        path: pathlib.Path,
        basedir: pathlib.Path,
    ) -> Generator[Verification, None, None]:
        """Yield a ``CommandVerification`` when ``STANDALONE`` is present."""
        if attr.get("STANDALONE") is None:
            return

        tempdir = OjResolver._hashed_cache_dir("standalone", path)
        yield CommandVerification(
            name=env.name,
            command=env.get_execute_command(path, basedir=basedir, tempdir=tempdir),
            compile=env.get_compile_command(path, basedir=basedir, tempdir=tempdir),
            tempdir=tempdir,
        )

    @staticmethod
    def _env_to_unittest_verification(
        *,
        attr: dict[str, Any],
    ) -> Generator[Verification, None, None]:
        """Yield a constant result taken from the variable named by ``UNITTEST``.

        The attribute holds the name of an environment variable. An undefined
        variable, ``"0"`` or (case-insensitive) ``"false"`` yields a failure;
        any other value yields a success.
        """
        unit_test_envvar = attr.get("UNITTEST")
        if not unit_test_envvar:
            return
        var = os.getenv(unit_test_envvar)
        if var is None:
            logger.warning("UNITTEST envvar %s is not defined.", unit_test_envvar)
            yield ConstVerification(status=ResultStatus.FAILURE)
        elif var.lower() == "false" or var == "0":
            logger.info("UNITTEST envvar %s=%s is falsy.", unit_test_envvar, var)
            yield ConstVerification(status=ResultStatus.FAILURE)
        else:
            logger.info("UNITTEST envvar %s=%s is truthy.", unit_test_envvar, var)
            yield ConstVerification(status=ResultStatus.SUCCESS)

    def resolve(self, *, bundle: bool) -> VerificationInput:
        """Collect dependencies, attributes and verifications for every file.

        Args:
            bundle: When true, each file is bundled by its language and the
                output is written with ``_write_bundled``. If bundling raises,
                the traceback is written instead and recorded under the name
                ``"bundle error"``.

        Returns:
            A ``VerificationInput`` keyed by file path.
        """
        files: dict[pathlib.Path, VerificationFile] = {}
        basedir = pathlib.Path.cwd()

        for path in git.ls_files(*self.include):
            if self._match_exclude(path):
                logger.debug("exclude=%s", path)
                continue

            # Files whose suffix has no registered language are skipped.
            language = self._lang_dict.get(path.suffix)
            if language is None:
                continue

            deps = set(git.ls_files(*language.list_dependencies(path, basedir=basedir)))
            attr = language.list_attributes(path, basedir=basedir)

            additonal_sources: list[AddtionalSource] = []
            if bundle:
                try:
                    bundled_code = language.bundle(path, basedir=basedir)
                    if bundled_code:
                        dest_path = _write_bundled(bundled_code, path=path)
                        additonal_sources.append(
                            AddtionalSource(name="bundled", path=dest_path)
                        )
                except Exception:  # noqa: BLE001
                    # Best effort: a bundling failure is recorded as an
                    # additional source rather than stopping the run.
                    dest_path = _write_bundled(
                        traceback.format_exc().encode(), path=path
                    )
                    additonal_sources.append(
                        AddtionalSource(name="bundle error", path=dest_path)
                    )

            verifications = list(
                chain.from_iterable(
                    self.env_to_verifications(vs, attr=attr, path=path, basedir=basedir)
                    for vs in language.list_environments(path, basedir=basedir)
                )
            )
            files[path] = VerificationFile(
                dependencies=deps,
                verification=verifications,
                document_attributes=attr,
                additonal_sources=additonal_sources,
            )
        return VerificationInput(files=files)

317 

318 

class OjResolve(IncludeExcludeArguments, VerboseArguments):
    """Arguments and entry point of the ``oj-resolve`` subcommand."""

    subcommand: Literal["oj-resolve"] = Field(
        default="oj-resolve",
        description="Create verify_files json using `oj-verify`",
    )
    bundle: bool = True
    config: pathlib.Path | VerificationConfig | None = None

    @classmethod
    def add_parser(cls, parser: ArgumentParser):
        """Register ``--no-bundle`` and ``--config`` after the inherited options."""
        super().add_parser(parser)
        parser.add_argument(
            "--no-bundle",
            dest="bundle",
            action="store_false",
            help="Disable bundle",
        )
        parser.add_argument(
            "--config",
            help="config.toml",
            type=pathlib.Path,
        )

    def to_resolver(self) -> OjResolver:
        """Create an ``OjResolver`` from these arguments.

        ``config`` may already be a ``VerificationConfig``, may be a path to a
        file to load, or may be ``None`` (a default configuration is used). A
        file that fails validation is logged and replaced by the default
        configuration.
        """
        source = self.config
        if isinstance(source, VerificationConfig):
            resolved = source
        elif source is None:
            logger.info("no config file")
            resolved = VerificationConfig()
        else:
            config_path = pathlib.Path(source)
            try:
                with config_path.open("rb") as fp:
                    resolved = VerificationConfig.load(fp)
            except ValidationError:
                logger.exception(
                    "config file validation error",
                    extra={"github": GitHubMessageParams(file=config_path)},
                )
                resolved = VerificationConfig()
            else:
                logger.info("config file loaded: %s: %s", config_path, resolved)

        return OjResolver(
            include=self.include,
            exclude=self.exclude,
            config=resolved,
        )

    def _run(self) -> bool:
        """Resolve the verification input and print it as JSON to stdout."""
        logger.debug("arguments:%s", self)
        resolver = self.to_resolver()
        verification_input = resolver.resolve(bundle=self.bundle)
        print(verification_input.model_dump_json(exclude_none=True))
        return True

371 return True