Coverage for src / competitive_verifier / verify / verifier.py: 100%

146 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-03-05 16:00 +0000

1import datetime 

2import pathlib 

3import time 

4from abc import ABC, abstractmethod 

5from functools import cached_property 

6from logging import getLogger 

7 

8from competitive_verifier import git, log 

9from competitive_verifier.download import download_files as run_download 

10from competitive_verifier.models import ( 

11 FileResult, 

12 ResultStatus, 

13 VerifcationTimeoutError, 

14 Verification, 

15 VerificationFile, 

16 VerificationInput, 

17 VerificationResult, 

18 VerifyCommandResult, 

19) 

20from competitive_verifier.resource import try_ulimit_stack 

21from competitive_verifier.verify.split_state import SplitState 

22 

23logger = getLogger(__name__) 

24 

25 

26def _now() -> datetime.datetime: 

27 return datetime.datetime.now(datetime.timezone.utc).astimezone() 

28 

29 

30class InputContainer(ABC): 

31 verifications: VerificationInput 

32 verification_time: datetime.datetime 

33 prev_result: VerifyCommandResult | None 

34 split_state: SplitState | None 

35 

36 def __init__( 

37 self, 

38 *, 

39 verifications: VerificationInput, 

40 verification_time: datetime.datetime, 

41 prev_result: VerifyCommandResult | None, 

42 split_state: SplitState | None, 

43 ) -> None: 

44 self.verifications = verifications 

45 self.verification_time = verification_time 

46 self.prev_result = prev_result 

47 self.split_state = split_state 

48 

49 @abstractmethod 

50 def get_file_timestamp(self, path: pathlib.Path) -> datetime.datetime: ... 

51 

52 def file_need_verification( 

53 self, 

54 path: pathlib.Path, 

55 file_result: FileResult, 

56 ) -> bool: 

57 if not path.exists(): 

58 return False 

59 base_time = min(self.verification_time, self.get_file_timestamp(path)) 

60 result = file_result.need_verification(base_time) 

61 if result: 

62 logger.info("%s needs verification. base_time: %s", path, base_time) 

63 else: 

64 logger.info("%s doesn't need verification. base_time: %s", path, base_time) 

65 return result 

66 

67 @cached_property 

68 def verification_files(self) -> dict[pathlib.Path, VerificationFile]: 

69 """List of verification files.""" 

70 return { 

71 p: f for p, f in self.verifications.files.items() if f.is_verification() 

72 } 

73 

74 @cached_property 

75 def skippable_verification_files(self) -> dict[pathlib.Path, VerificationFile]: 

76 return { 

77 p: f 

78 for p, f in self.verification_files.items() 

79 if f.is_lightweight_verification() 

80 } 

81 

82 @cached_property 

83 def remaining_verification_files(self) -> dict[pathlib.Path, VerificationFile]: 

84 """List of verification files that have not yet been verified.""" 

85 verification_files = { 

86 p: f 

87 for p, f in self.verification_files.items() 

88 if p not in self.skippable_verification_files 

89 } 

90 

91 if self.prev_result is None: 

92 return verification_files 

93 

94 not_updated_files = { 

95 k 

96 for k, v in self.verifications.filterd_files(self.prev_result.files) 

97 if not self.file_need_verification(k, v) 

98 } 

99 return { 

100 p: f for p, f in verification_files.items() if p not in not_updated_files 

101 } 

102 

103 @cached_property 

104 def current_verification_files(self) -> dict[pathlib.Path, VerificationFile]: 

105 """List of verification files that self should verify. 

106 

107 if ``split_state`` is None the property is ``remaining_verification_files``; 

108 

109 else ``split_state.split(remaining_verification_files)``. 

110 """ 

111 if self.split_state is None: 

112 return self.remaining_verification_files 

113 

114 lst = [(p, f) for p, f in self.remaining_verification_files.items()] 

115 lst.sort(key=lambda tup: tup[0]) 

116 

117 return dict(self.split_state.split(lst)) 

118 

119 

120class BaseVerifier(InputContainer): 

121 timeout: float 

122 default_tle: float | None 

123 default_mle: float | None 

124 split_state: SplitState | None 

125 

126 _result: VerifyCommandResult | None 

127 

128 def __init__( 

129 self, 

130 verifications: VerificationInput, 

131 *, 

132 timeout: float, 

133 default_tle: float | None, 

134 default_mle: float | None, 

135 prev_result: VerifyCommandResult | None, 

136 split_state: SplitState | None, 

137 verification_time: datetime.datetime | None = None, 

138 ) -> None: 

139 super().__init__( 

140 verifications=verifications, 

141 verification_time=verification_time or _now(), 

142 prev_result=prev_result, 

143 split_state=split_state, 

144 ) 

145 self._input = verifications 

146 self.timeout = timeout 

147 self.default_tle = default_tle 

148 self.default_mle = default_mle 

149 self._result = None 

150 

151 @property 

152 def is_first(self) -> bool: 

153 if not self.split_state: 

154 return True 

155 return self.split_state.index == 0 

156 

157 def _enumerate_verifications( 

158 self, 

159 p: pathlib.Path, 

160 f: VerificationFile, 

161 *, 

162 download: bool, 

163 deadline: float, 

164 ) -> list[VerificationResult]: 

165 logger.debug("%r", f) 

166 verifications = list[VerificationResult]() 

167 try: 

168 if time.perf_counter() > deadline: 

169 raise VerifcationTimeoutError # noqa: TRY301 

170 if download: 

171 run_download(f, check=True, group_log=False) 

172 except VerifcationTimeoutError: 

173 verifications.append( 

174 self.create_command_result(ResultStatus.SKIPPED, time.perf_counter()) 

175 ) 

176 logger.warning("Skip[Timeout]: %s", p) 

177 return verifications 

178 except BaseException: 

179 verifications.append( 

180 self.create_command_result(ResultStatus.FAILURE, time.perf_counter()) 

181 ) 

182 logger.exception( 

183 "Failed to download: %s", 

184 f.verification, 

185 extra={"github": log.GitHubMessageParams()}, 

186 ) 

187 return verifications 

188 

189 for ve in f.verification_list: 

190 logger.debug("command=%r", ve) 

191 prev_time = time.perf_counter() 

192 try: 

193 if prev_time > deadline: 

194 raise VerifcationTimeoutError # noqa: TRY301 

195 

196 rs, error_message = self.run_verification(ve, deadline=deadline) 

197 if error_message: 

198 logger.error( 

199 "%s: %s, verification=%s", 

200 error_message, 

201 p, 

202 ve.model_dump_json(exclude_unset=True), 

203 extra={"github": log.GitHubMessageParams(file=p)}, 

204 ) 

205 verifications.append( 

206 self.create_command_result(rs, prev_time, name=ve.name) 

207 ) 

208 except VerifcationTimeoutError: 

209 logger.warning("Skip[Timeout]: %s, %r", p, ve) 

210 verifications.append( 

211 self.create_command_result( 

212 ResultStatus.SKIPPED, 

213 prev_time, 

214 name=ve.name, 

215 ) 

216 ) 

217 except BaseException: 

218 logger.exception( 

219 "Failed to verify: %s, %r", 

220 p, 

221 ve, 

222 extra={"github": log.GitHubMessageParams()}, 

223 ) 

224 verifications.append( 

225 self.create_command_result( 

226 ResultStatus.FAILURE, 

227 prev_time, 

228 name=ve.name, 

229 ) 

230 ) 

231 return verifications 

232 

233 def verify(self, *, download: bool = True) -> VerifyCommandResult: 

234 start_time = time.perf_counter() 

235 deadline = start_time + self.timeout 

236 

237 with log.group("current_verification_files"): 

238 current_verification_files = self.current_verification_files 

239 logger.info( 

240 "current_verification_files: %s", 

241 " ".join(p.as_posix() for p in current_verification_files), 

242 ) 

243 try_ulimit_stack() 

244 

245 file_results: dict[pathlib.Path, FileResult] = ( 

246 { 

247 k: v.model_copy(update={"newest": False}) 

248 for k, v in self.verifications.filterd_files(self.prev_result.files) 

249 if k.exists() 

250 } 

251 if self.prev_result 

252 else {} 

253 ) 

254 

255 for p, f in current_verification_files.items(): 

256 with log.group(f"Verify: {p.as_posix()}"): 

257 file_results[p] = FileResult( 

258 verifications=self._enumerate_verifications( 

259 p, 

260 f, 

261 download=download, 

262 deadline=deadline, 

263 ) 

264 ) 

265 

266 sippable_file_results = self.skippable_results() 

267 self._result = VerifyCommandResult( 

268 total_seconds=time.perf_counter() - start_time, 

269 files=file_results | sippable_file_results, 

270 ) 

271 return self._result 

272 

273 def run_verification( 

274 self, 

275 verification: Verification, 

276 *, 

277 deadline: float = float("inf"), 

278 ) -> tuple[ResultStatus | VerificationResult, str | None]: 

279 """Run verification. 

280 

281 Returns: 

282 tuple[ResultStatus, Optional[str]]: (Result, error_message) 

283 """ 

284 if not verification.run_compile_command(): 

285 return ResultStatus.FAILURE, "Failed to compile" 

286 

287 if time.perf_counter() > deadline: 

288 raise VerifcationTimeoutError 

289 

290 rs = verification.run(self, deadline=deadline) 

291 

292 if rs.status != ResultStatus.SUCCESS: 

293 return rs, "Failed to test" 

294 return rs, None 

295 

296 def skippable_results(self) -> dict[pathlib.Path, FileResult]: 

297 """Run skippable verification.""" 

298 results = dict[pathlib.Path, FileResult]() 

299 if self.is_first: 

300 for p, f in self.skippable_verification_files.items(): 

301 logger.info("Start skippable: %s", p) 

302 verifications = list[VerificationResult]() 

303 prev_time = time.perf_counter() 

304 

305 for v in f.verification_list: 

306 rs = self.run_verification(v)[0] 

307 verifications.append( 

308 self.create_command_result(rs, prev_time, name=v.name) 

309 ) 

310 results[p] = FileResult( 

311 verifications=verifications, 

312 newest=True, 

313 ) 

314 return results 

315 

316 def create_command_result( 

317 self, 

318 status_or_result: ResultStatus | VerificationResult, 

319 prev_time: float, 

320 *, 

321 name: str | None = None, 

322 ) -> VerificationResult: 

323 if isinstance(status_or_result, VerificationResult): 

324 return status_or_result 

325 

326 elapsed = time.perf_counter() - prev_time 

327 return VerificationResult( 

328 verification_name=name, 

329 status=status_or_result, 

330 elapsed=elapsed, 

331 last_execution_time=self.verification_time, 

332 ) 

333 

334 

335class Verifier(BaseVerifier): 

336 use_git_timestamp: bool 

337 

338 def __init__( 

339 self, 

340 verifications: VerificationInput, 

341 *, 

342 timeout: float, 

343 default_tle: float | None, 

344 default_mle: float | None, 

345 prev_result: VerifyCommandResult | None, 

346 split_state: SplitState | None, 

347 verification_time: datetime.datetime | None = None, 

348 use_git_timestamp: bool, 

349 ) -> None: 

350 super().__init__( 

351 verifications=verifications, 

352 verification_time=verification_time or _now(), 

353 prev_result=prev_result, 

354 split_state=split_state, 

355 timeout=timeout, 

356 default_tle=default_tle, 

357 default_mle=default_mle, 

358 ) 

359 self.use_git_timestamp = use_git_timestamp 

360 

361 def get_file_timestamp(self, path: pathlib.Path) -> datetime.datetime: 

362 dependicies = self.verifications.transitive_depends_on[path] 

363 

364 if self.use_git_timestamp: 

365 return git.get_commit_time(dependicies) 

366 

367 timestamp = max(x.stat().st_mtime for x in dependicies) 

368 system_local_timezone = _now().tzinfo 

369 

370 # microsecond=0 is required because it's erased in git commit 

371 return datetime.datetime.fromtimestamp( 

372 timestamp, tz=system_local_timezone 

373 ).replace(microsecond=0)