Coverage for src / competitive_verifier / verify / verifier.py: 100%
146 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-03-05 16:00 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-03-05 16:00 +0000
1import datetime
2import pathlib
3import time
4from abc import ABC, abstractmethod
5from functools import cached_property
6from logging import getLogger
8from competitive_verifier import git, log
9from competitive_verifier.download import download_files as run_download
10from competitive_verifier.models import (
11 FileResult,
12 ResultStatus,
13 VerifcationTimeoutError,
14 Verification,
15 VerificationFile,
16 VerificationInput,
17 VerificationResult,
18 VerifyCommandResult,
19)
20from competitive_verifier.resource import try_ulimit_stack
21from competitive_verifier.verify.split_state import SplitState
23logger = getLogger(__name__)
26def _now() -> datetime.datetime:
27 return datetime.datetime.now(datetime.timezone.utc).astimezone()
30class InputContainer(ABC):
31 verifications: VerificationInput
32 verification_time: datetime.datetime
33 prev_result: VerifyCommandResult | None
34 split_state: SplitState | None
36 def __init__(
37 self,
38 *,
39 verifications: VerificationInput,
40 verification_time: datetime.datetime,
41 prev_result: VerifyCommandResult | None,
42 split_state: SplitState | None,
43 ) -> None:
44 self.verifications = verifications
45 self.verification_time = verification_time
46 self.prev_result = prev_result
47 self.split_state = split_state
49 @abstractmethod
50 def get_file_timestamp(self, path: pathlib.Path) -> datetime.datetime: ...
52 def file_need_verification(
53 self,
54 path: pathlib.Path,
55 file_result: FileResult,
56 ) -> bool:
57 if not path.exists():
58 return False
59 base_time = min(self.verification_time, self.get_file_timestamp(path))
60 result = file_result.need_verification(base_time)
61 if result:
62 logger.info("%s needs verification. base_time: %s", path, base_time)
63 else:
64 logger.info("%s doesn't need verification. base_time: %s", path, base_time)
65 return result
67 @cached_property
68 def verification_files(self) -> dict[pathlib.Path, VerificationFile]:
69 """List of verification files."""
70 return {
71 p: f for p, f in self.verifications.files.items() if f.is_verification()
72 }
74 @cached_property
75 def skippable_verification_files(self) -> dict[pathlib.Path, VerificationFile]:
76 return {
77 p: f
78 for p, f in self.verification_files.items()
79 if f.is_lightweight_verification()
80 }
82 @cached_property
83 def remaining_verification_files(self) -> dict[pathlib.Path, VerificationFile]:
84 """List of verification files that have not yet been verified."""
85 verification_files = {
86 p: f
87 for p, f in self.verification_files.items()
88 if p not in self.skippable_verification_files
89 }
91 if self.prev_result is None:
92 return verification_files
94 not_updated_files = {
95 k
96 for k, v in self.verifications.filterd_files(self.prev_result.files)
97 if not self.file_need_verification(k, v)
98 }
99 return {
100 p: f for p, f in verification_files.items() if p not in not_updated_files
101 }
103 @cached_property
104 def current_verification_files(self) -> dict[pathlib.Path, VerificationFile]:
105 """List of verification files that self should verify.
107 if ``split_state`` is None the property is ``remaining_verification_files``;
109 else ``split_state.split(remaining_verification_files)``.
110 """
111 if self.split_state is None:
112 return self.remaining_verification_files
114 lst = [(p, f) for p, f in self.remaining_verification_files.items()]
115 lst.sort(key=lambda tup: tup[0])
117 return dict(self.split_state.split(lst))
120class BaseVerifier(InputContainer):
121 timeout: float
122 default_tle: float | None
123 default_mle: float | None
124 split_state: SplitState | None
126 _result: VerifyCommandResult | None
128 def __init__(
129 self,
130 verifications: VerificationInput,
131 *,
132 timeout: float,
133 default_tle: float | None,
134 default_mle: float | None,
135 prev_result: VerifyCommandResult | None,
136 split_state: SplitState | None,
137 verification_time: datetime.datetime | None = None,
138 ) -> None:
139 super().__init__(
140 verifications=verifications,
141 verification_time=verification_time or _now(),
142 prev_result=prev_result,
143 split_state=split_state,
144 )
145 self._input = verifications
146 self.timeout = timeout
147 self.default_tle = default_tle
148 self.default_mle = default_mle
149 self._result = None
151 @property
152 def is_first(self) -> bool:
153 if not self.split_state:
154 return True
155 return self.split_state.index == 0
157 def _enumerate_verifications(
158 self,
159 p: pathlib.Path,
160 f: VerificationFile,
161 *,
162 download: bool,
163 deadline: float,
164 ) -> list[VerificationResult]:
165 logger.debug("%r", f)
166 verifications = list[VerificationResult]()
167 try:
168 if time.perf_counter() > deadline:
169 raise VerifcationTimeoutError # noqa: TRY301
170 if download:
171 run_download(f, check=True, group_log=False)
172 except VerifcationTimeoutError:
173 verifications.append(
174 self.create_command_result(ResultStatus.SKIPPED, time.perf_counter())
175 )
176 logger.warning("Skip[Timeout]: %s", p)
177 return verifications
178 except BaseException:
179 verifications.append(
180 self.create_command_result(ResultStatus.FAILURE, time.perf_counter())
181 )
182 logger.exception(
183 "Failed to download: %s",
184 f.verification,
185 extra={"github": log.GitHubMessageParams()},
186 )
187 return verifications
189 for ve in f.verification_list:
190 logger.debug("command=%r", ve)
191 prev_time = time.perf_counter()
192 try:
193 if prev_time > deadline:
194 raise VerifcationTimeoutError # noqa: TRY301
196 rs, error_message = self.run_verification(ve, deadline=deadline)
197 if error_message:
198 logger.error(
199 "%s: %s, verification=%s",
200 error_message,
201 p,
202 ve.model_dump_json(exclude_unset=True),
203 extra={"github": log.GitHubMessageParams(file=p)},
204 )
205 verifications.append(
206 self.create_command_result(rs, prev_time, name=ve.name)
207 )
208 except VerifcationTimeoutError:
209 logger.warning("Skip[Timeout]: %s, %r", p, ve)
210 verifications.append(
211 self.create_command_result(
212 ResultStatus.SKIPPED,
213 prev_time,
214 name=ve.name,
215 )
216 )
217 except BaseException:
218 logger.exception(
219 "Failed to verify: %s, %r",
220 p,
221 ve,
222 extra={"github": log.GitHubMessageParams()},
223 )
224 verifications.append(
225 self.create_command_result(
226 ResultStatus.FAILURE,
227 prev_time,
228 name=ve.name,
229 )
230 )
231 return verifications
233 def verify(self, *, download: bool = True) -> VerifyCommandResult:
234 start_time = time.perf_counter()
235 deadline = start_time + self.timeout
237 with log.group("current_verification_files"):
238 current_verification_files = self.current_verification_files
239 logger.info(
240 "current_verification_files: %s",
241 " ".join(p.as_posix() for p in current_verification_files),
242 )
243 try_ulimit_stack()
245 file_results: dict[pathlib.Path, FileResult] = (
246 {
247 k: v.model_copy(update={"newest": False})
248 for k, v in self.verifications.filterd_files(self.prev_result.files)
249 if k.exists()
250 }
251 if self.prev_result
252 else {}
253 )
255 for p, f in current_verification_files.items():
256 with log.group(f"Verify: {p.as_posix()}"):
257 file_results[p] = FileResult(
258 verifications=self._enumerate_verifications(
259 p,
260 f,
261 download=download,
262 deadline=deadline,
263 )
264 )
266 sippable_file_results = self.skippable_results()
267 self._result = VerifyCommandResult(
268 total_seconds=time.perf_counter() - start_time,
269 files=file_results | sippable_file_results,
270 )
271 return self._result
273 def run_verification(
274 self,
275 verification: Verification,
276 *,
277 deadline: float = float("inf"),
278 ) -> tuple[ResultStatus | VerificationResult, str | None]:
279 """Run verification.
281 Returns:
282 tuple[ResultStatus, Optional[str]]: (Result, error_message)
283 """
284 if not verification.run_compile_command():
285 return ResultStatus.FAILURE, "Failed to compile"
287 if time.perf_counter() > deadline:
288 raise VerifcationTimeoutError
290 rs = verification.run(self, deadline=deadline)
292 if rs.status != ResultStatus.SUCCESS:
293 return rs, "Failed to test"
294 return rs, None
296 def skippable_results(self) -> dict[pathlib.Path, FileResult]:
297 """Run skippable verification."""
298 results = dict[pathlib.Path, FileResult]()
299 if self.is_first:
300 for p, f in self.skippable_verification_files.items():
301 logger.info("Start skippable: %s", p)
302 verifications = list[VerificationResult]()
303 prev_time = time.perf_counter()
305 for v in f.verification_list:
306 rs = self.run_verification(v)[0]
307 verifications.append(
308 self.create_command_result(rs, prev_time, name=v.name)
309 )
310 results[p] = FileResult(
311 verifications=verifications,
312 newest=True,
313 )
314 return results
316 def create_command_result(
317 self,
318 status_or_result: ResultStatus | VerificationResult,
319 prev_time: float,
320 *,
321 name: str | None = None,
322 ) -> VerificationResult:
323 if isinstance(status_or_result, VerificationResult):
324 return status_or_result
326 elapsed = time.perf_counter() - prev_time
327 return VerificationResult(
328 verification_name=name,
329 status=status_or_result,
330 elapsed=elapsed,
331 last_execution_time=self.verification_time,
332 )
335class Verifier(BaseVerifier):
336 use_git_timestamp: bool
338 def __init__(
339 self,
340 verifications: VerificationInput,
341 *,
342 timeout: float,
343 default_tle: float | None,
344 default_mle: float | None,
345 prev_result: VerifyCommandResult | None,
346 split_state: SplitState | None,
347 verification_time: datetime.datetime | None = None,
348 use_git_timestamp: bool,
349 ) -> None:
350 super().__init__(
351 verifications=verifications,
352 verification_time=verification_time or _now(),
353 prev_result=prev_result,
354 split_state=split_state,
355 timeout=timeout,
356 default_tle=default_tle,
357 default_mle=default_mle,
358 )
359 self.use_git_timestamp = use_git_timestamp
361 def get_file_timestamp(self, path: pathlib.Path) -> datetime.datetime:
362 dependicies = self.verifications.transitive_depends_on[path]
364 if self.use_git_timestamp:
365 return git.get_commit_time(dependicies)
367 timestamp = max(x.stat().st_mtime for x in dependicies)
368 system_local_timezone = _now().tzinfo
370 # microsecond=0 is required because it's erased in git commit
371 return datetime.datetime.fromtimestamp(
372 timestamp, tz=system_local_timezone
373 ).replace(microsecond=0)