Coverage for src / competitive_verifier / oj / resolver.py: 89%

176 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-03-05 16:00 +0000

1import fnmatch 

2import hashlib 

3import os 

4import pathlib 

5import traceback 

6from argparse import ArgumentParser 

7from collections.abc import Generator 

8from functools import cached_property 

9from itertools import chain 

10from logging import getLogger 

11from typing import Any, Literal 

12 

13from pydantic import Field, ValidationError 

14 

15from competitive_verifier import config, git 

16from competitive_verifier.arg import IncludeExcludeArguments, VerboseArguments 

17from competitive_verifier.log import GitHubMessageParams 

18from competitive_verifier.models import ( 

19 AddtionalSource, 

20 CommandVerification, 

21 ConstVerification, 

22 LocalProblemVerification, 

23 ProblemVerification, 

24 ResultStatus, 

25 Verification, 

26 VerificationFile, 

27 VerificationInput, 

28) 

29from competitive_verifier.util import resolve_relative_or_abs_path 

30 

31from .problem import problem_from_url 

32from .verify.list import OjVerifyConfig 

33from .verify.models import LanguageEnvironment 

34 

35logger = getLogger(__name__) 

36 

37 

38def _get_bundled_dir() -> pathlib.Path: 

39 return config.get_config_dir() / "bundled" 

40 

41 

42def _write_bundled(content: bytes, *, path: pathlib.Path) -> pathlib.Path: 

43 """Write bundled code. 

44 

45 Returns: 

46 output file path. 

47 """ 

48 dest_dir = _get_bundled_dir() 

49 dest_path = dest_dir / path 

50 dest_path.parent.mkdir(parents=True, exist_ok=True) 

51 logger.info("bundle_path=%s", dest_path.as_posix()) 

52 dest_path.write_bytes(content) 

53 return dest_path 

54 

55 

56class OjResolver: 

57 include: list[str] 

58 exclude: list[str] 

59 config: OjVerifyConfig 

60 _match_exclude_cache: dict[pathlib.Path, bool] 

61 

62 def __init__( 

63 self, 

64 *, 

65 include: list[str], 

66 exclude: list[str], 

67 config: OjVerifyConfig, 

68 ) -> None: 

69 def _remove_slash(s: str): 

70 s = os.path.normpath(s) 

71 while len(s) > 1 and s[-1] == os.sep: 71 ↛ 72line 71 didn't jump to line 72 because the condition on line 71 was never true

72 s = s[:-1] 

73 return s 

74 

75 self.include = list(map(_remove_slash, include)) 

76 self.exclude = list(map(_remove_slash, exclude)) 

77 self.config = config 

78 self._match_exclude_cache = {} 

79 

80 def _match_exclude2(self, paths: list[pathlib.Path]) -> bool: 

81 if not paths: 

82 return False 

83 path = paths.pop() 

84 cache = self._match_exclude_cache.get(path, None) 

85 if cache is not None: 

86 return cache 

87 

88 for ex in self.exclude: 

89 if fnmatch.fnmatch(path.as_posix(), ex): 

90 self._match_exclude_cache[path] = True 

91 return True 

92 result = self._match_exclude2(paths) 

93 self._match_exclude_cache[path] = result 

94 return result 

95 

96 def _match_exclude(self, path: pathlib.Path) -> bool: 

97 paths = list(path.parents) 

98 paths.reverse() 

99 paths.append(path) 

100 return self._match_exclude2(paths) 

101 

102 @cached_property 

103 def _lang_dict(self): 

104 return self.config.get_dict() 

105 

106 @staticmethod 

107 def env_to_verifications( 

108 env: LanguageEnvironment, 

109 *, 

110 attr: dict[str, Any], 

111 path: pathlib.Path, 

112 basedir: pathlib.Path, 

113 ) -> Generator[Verification, None, None]: 

114 if "IGNORE" in attr: 

115 yield ConstVerification(status=ResultStatus.SKIPPED) 

116 return 

117 

118 error_str = attr.get("ERROR") 

119 try: 

120 error = float(error_str) if error_str else None 

121 except ValueError: 

122 error = None 

123 

124 tle_str = attr.get("TLE") 

125 tle = float(tle_str) if tle_str else None 

126 

127 mle_str = attr.get("MLE") 

128 mle = float(mle_str) if mle_str else None 

129 

130 yield from OjResolver._env_to_local_problem_verification( 

131 env, 

132 attr=attr, 

133 path=path, 

134 basedir=basedir, 

135 error=error, 

136 tle=tle, 

137 mle=mle, 

138 ) 

139 yield from OjResolver._env_to_problem_verification( 

140 env, 

141 attr=attr, 

142 path=path, 

143 basedir=basedir, 

144 error=error, 

145 tle=tle, 

146 mle=mle, 

147 ) 

148 yield from OjResolver._env_to_standalone_verification( 

149 env, 

150 attr=attr, 

151 path=path, 

152 basedir=basedir, 

153 ) 

154 yield from OjResolver._env_to_unittest_verification(attr=attr) 

155 

156 @staticmethod 

157 def _env_to_problem_verification( 

158 env: LanguageEnvironment, 

159 *, 

160 attr: dict[str, Any], 

161 path: pathlib.Path, 

162 basedir: pathlib.Path, 

163 error: float | None, 

164 tle: float | None, 

165 mle: float | None, 

166 ) -> Generator[Verification, None, None]: 

167 url = attr.get("PROBLEM") 

168 

169 if not url: 

170 return 

171 

172 problem = problem_from_url(url) 

173 if problem is None: 173 ↛ 174line 173 didn't jump to line 174 because the condition on line 173 was never true

174 logger.error( 

175 'The URL "%s" is not supported', 

176 url, 

177 extra={"github": GitHubMessageParams()}, 

178 ) 

179 yield ConstVerification(status=ResultStatus.FAILURE) 

180 else: 

181 tempdir = problem.problem_directory 

182 yield ProblemVerification( 

183 name=env.name, 

184 command=env.get_execute_command(path, basedir=basedir, tempdir=tempdir), 

185 compile=env.get_compile_command(path, basedir=basedir, tempdir=tempdir), 

186 problem=url, 

187 error=error, 

188 tle=tle, 

189 mle=mle, 

190 ) 

191 

192 @staticmethod 

193 def _env_to_local_problem_verification( 

194 env: LanguageEnvironment, 

195 *, 

196 attr: dict[str, Any], 

197 path: pathlib.Path, 

198 basedir: pathlib.Path, 

199 error: float | None, 

200 tle: float | None, 

201 mle: float | None, 

202 ) -> Generator[Verification, None, None]: 

203 casedir = attr.get("LOCALCASE") 

204 if casedir is None: 

205 return 

206 casedir = resolve_relative_or_abs_path(casedir, basedir=path.parent) 

207 if casedir is None: 207 ↛ 208line 207 didn't jump to line 208 because the condition on line 207 was never true

208 return 

209 

210 tempdir = ( 

211 config.get_cache_dir() 

212 / "localcase" 

213 / hashlib.md5( 

214 path.as_posix().encode("utf-8"), usedforsecurity=False 

215 ).hexdigest() 

216 ) 

217 yield LocalProblemVerification( 

218 name=env.name, 

219 command=env.get_execute_command(path, basedir=basedir, tempdir=tempdir), 

220 compile=env.get_compile_command(path, basedir=basedir, tempdir=tempdir), 

221 input=casedir, 

222 error=error, 

223 tle=tle, 

224 mle=mle, 

225 tempdir=tempdir, 

226 ) 

227 

228 @staticmethod 

229 def _env_to_standalone_verification( 

230 env: LanguageEnvironment, 

231 *, 

232 attr: dict[str, Any], 

233 path: pathlib.Path, 

234 basedir: pathlib.Path, 

235 ) -> Generator[Verification, None, None]: 

236 if attr.get("STANDALONE") is None: 

237 return 

238 

239 tempdir = ( 

240 config.get_cache_dir() 

241 / "standalone" 

242 / hashlib.md5( 

243 path.as_posix().encode("utf-8"), usedforsecurity=False 

244 ).hexdigest() 

245 ) 

246 yield CommandVerification( 

247 name=env.name, 

248 command=env.get_execute_command(path, basedir=basedir, tempdir=tempdir), 

249 compile=env.get_compile_command(path, basedir=basedir, tempdir=tempdir), 

250 tempdir=tempdir, 

251 ) 

252 

253 @staticmethod 

254 def _env_to_unittest_verification( 

255 *, 

256 attr: dict[str, Any], 

257 ) -> Generator[Verification, None, None]: 

258 unit_test_envvar = attr.get("UNITTEST") 

259 if not unit_test_envvar: 

260 return 

261 var = os.getenv(unit_test_envvar) 

262 if var is None: 262 ↛ 265line 262 didn't jump to line 265 because the condition on line 262 was always true

263 logger.warning("UNITTEST envvar %s is not defined.", unit_test_envvar) 

264 yield ConstVerification(status=ResultStatus.FAILURE) 

265 elif var.lower() == "false" or var == "0": 

266 logger.info("UNITTEST envvar %s=%s is falsy.", unit_test_envvar, var) 

267 yield ConstVerification(status=ResultStatus.FAILURE) 

268 else: 

269 logger.info("UNITTEST envvar %s=%s is truthy.", unit_test_envvar, var) 

270 yield ConstVerification(status=ResultStatus.SUCCESS) 

271 

272 def resolve(self, *, bundle: bool) -> VerificationInput: 

273 files: dict[pathlib.Path, VerificationFile] = {} 

274 basedir = pathlib.Path.cwd() 

275 

276 for path in git.ls_files(*self.include): 

277 if self._match_exclude(path): 

278 logger.debug("exclude=%s", path) 

279 continue 

280 

281 language = self._lang_dict.get(path.suffix) 

282 if language is None: 

283 continue 

284 

285 deps = set(git.ls_files(*language.list_dependencies(path, basedir=basedir))) 

286 attr = language.list_attributes(path, basedir=basedir) 

287 

288 additonal_sources: list[AddtionalSource] = [] 

289 if bundle: 

290 try: 

291 bundled_code = language.bundle(path, basedir=basedir) 

292 if bundled_code: 

293 dest_path = _write_bundled(bundled_code, path=path) 

294 additonal_sources.append( 

295 AddtionalSource(name="bundled", path=dest_path) 

296 ) 

297 except Exception: # noqa: BLE001 

298 dest_path = _write_bundled( 

299 traceback.format_exc().encode(), path=path 

300 ) 

301 additonal_sources.append( 

302 AddtionalSource(name="bundle error", path=dest_path) 

303 ) 

304 

305 verifications = list( 

306 chain.from_iterable( 

307 self.env_to_verifications(vs, attr=attr, path=path, basedir=basedir) 

308 for vs in language.list_environments(path, basedir=basedir) 

309 ) 

310 ) 

311 files[path] = VerificationFile( 

312 dependencies=deps, 

313 verification=verifications, 

314 document_attributes=attr, 

315 additonal_sources=additonal_sources, 

316 ) 

317 return VerificationInput(files=files) 

318 

319 

320class OjResolve(IncludeExcludeArguments, VerboseArguments): 

321 subcommand: Literal["oj-resolve"] = Field( 

322 default="oj-resolve", 

323 description="Create verify_files json using `oj-verify`", 

324 ) 

325 bundle: bool = True 

326 config: pathlib.Path | OjVerifyConfig | None = None 

327 

328 @classmethod 

329 def add_parser(cls, parser: ArgumentParser): 

330 super().add_parser(parser) 

331 parser.add_argument( 

332 "--no-bundle", 

333 dest="bundle", 

334 action="store_false", 

335 help="Disable bundle", 

336 ) 

337 parser.add_argument( 

338 "--config", 

339 help="config.toml", 

340 type=pathlib.Path, 

341 ) 

342 

343 def to_resolver(self) -> OjResolver: 

344 if self.config is None: 

345 logger.info("no config file") 

346 config = OjVerifyConfig() 

347 elif not isinstance(self.config, OjVerifyConfig): 347 ↛ 360line 347 didn't jump to line 360 because the condition on line 347 was always true

348 config_path = pathlib.Path(self.config) 

349 try: 

350 with config_path.open("rb") as fp: 

351 config = OjVerifyConfig.load(fp) 

352 logger.info("config file loaded: %s: %s", config_path, config) 

353 except ValidationError: 

354 logger.exception( 

355 "config file validation error", 

356 extra={"github": GitHubMessageParams(file=config_path)}, 

357 ) 

358 config = OjVerifyConfig() 

359 else: 

360 config = self.config 

361 

362 return OjResolver( 

363 include=self.include, 

364 exclude=self.exclude, 

365 config=config, 

366 ) 

367 

368 def _run(self) -> bool: 

369 logger.debug("arguments:%s", self) 

370 resolved = self.to_resolver().resolve(bundle=self.bundle) 

371 print(resolved.model_dump_json(exclude_none=True)) 

372 return True