Coverage for src / competitive_verifier / oj / languages / rust.py: 72%

212 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-04-26 12:38 +0900

1import abc 

2import enum 

3import functools 

4import itertools 

5import json 

6import pathlib 

7import shutil 

8from collections import defaultdict 

9from collections.abc import Sequence 

10from enum import Enum 

11from logging import getLogger 

12from typing import Any, Literal 

13 

14from pydantic import BaseModel, Field 

15 

16from competitive_verifier.exec import command_stdout 

17from competitive_verifier.models import ShellCommand 

18from competitive_verifier.util import read_text_normalized 

19 

20from .base import Language, LanguageEnvironment, OjVerifyLanguageConfig 

21 

22# ruff: noqa: PLR2004 

23 

# Module-level logger for this language backend.
logger = getLogger(__name__)

# Cache of parsed `cargo metadata` output, keyed by the path of a `Cargo.toml`.
# Filled by `_cargo_metadata_by_manifest_path`.
_metadata_by_manifest_path: dict[pathlib.Path, dict[str, Any]] = {}
# Workspace roots for which `cargo check` has already been executed.
_cargo_checked_workspaces: set[pathlib.Path] = set()
# Cache of `_related_source_files` results, keyed by workspace root.
# Each value maps a target's main source file to the other files it uses.
_related_source_files_by_workspace: dict[
    pathlib.Path, dict[pathlib.Path, frozenset[pathlib.Path]]
] = {}

31 

32 

# Configuration model that selects how dependencies of a Rust target are listed.
class OjVerifyRustListDependenciesBackend(BaseModel):
    # Backend selector; only "none" and "cargo-udeps" are accepted.
    kind: Literal["none", "cargo-udeps"]
    # Toolchain name passed to `_CargoUdeps` when `kind` is "cargo-udeps".
    # When unset (or empty), `_CargoUdeps` uses its own default toolchain.
    toolchain: str | None = None

36 

37 

# Rust-specific language configuration.
class OjVerifyRustConfig(OjVerifyLanguageConfig):
    # Optional backend choice; `RustLanguage` treats `None` the same as kind "none".
    list_dependencies_backend: OjVerifyRustListDependenciesBackend | None = None

40 

41 

# Abstract interface for the strategies that list the files a Rust target depends on.
class _ListDependenciesBackend(abc.ABC, BaseModel):
    @abc.abstractmethod
    def list_dependencies(
        self, path: pathlib.Path, *, basedir: pathlib.Path
    ) -> list[pathlib.Path]:
        """Lists the source files that `path` depends on.

        Args:
            path (pathlib.Path): A main source file path of a target
            basedir (pathlib.Path): A parameter from `Language.list_dependencies`
        Returns:
            list[pathlib.Path]: Paths of the files `path` depends on
        """
        ...

47 

48 

# Backend that lists dependencies per crate without consulting `cargo-udeps`.
class _NoBackend(_ListDependenciesBackend):
    def list_dependencies(
        self, path: pathlib.Path, *, basedir: pathlib.Path
    ) -> list[pathlib.Path]:
        """Lists dependencies of `path` with unused-dependency detection disabled.

        Args:
            path (pathlib.Path): A main source file path of a target
            basedir (pathlib.Path): A parameter from `Language.list_dependencies`
        Returns:
            list[pathlib.Path]: The result of `_list_dependencies_by_crate`
        """
        # Passing `None` as the toolchain switches the `cargo-udeps` step off.
        dependent_files = _list_dependencies_by_crate(
            path,
            basedir=basedir,
            cargo_udeps_toolchain=None,
        )
        return dependent_files

56 

57 

# Backend that additionally runs `cargo-udeps` to drop unused dependencies.
class _CargoUdeps(_ListDependenciesBackend):
    # Rust toolchain used to run `cargo udeps` (via `rustup run <toolchain>`).
    toolchain: str = "nightly"

    def list_dependencies(
        self, path: pathlib.Path, *, basedir: pathlib.Path
    ) -> list[pathlib.Path]:
        """Lists dependencies of `path`, excluding those `cargo-udeps` reports as unused.

        Args:
            path (pathlib.Path): A main source file path of a target
            basedir (pathlib.Path): A parameter from `Language.list_dependencies`
        Returns:
            list[pathlib.Path]: The result of `_list_dependencies_by_crate`
        """
        udeps_toolchain = self.toolchain
        dependent_files = _list_dependencies_by_crate(
            path,
            basedir=basedir,
            cargo_udeps_toolchain=udeps_toolchain,
        )
        return dependent_files

67 

68 

@functools.cache
def _list_dependencies_by_crate(
    path: pathlib.Path, *, basedir: pathlib.Path, cargo_udeps_toolchain: str | None
) -> list[pathlib.Path]:
    """The `list_dependencies` implementation for `_NoBackend` and `_CargoUdeps`.

    The function is wrapped in `functools.cache`, so calls with equal arguments
    return the very same list object; callers should treat it as read-only.

    Args:
        path (pathlib.Path): A main source file path of a target
        basedir (pathlib.Path): A parameter from `Language.list_dependencies`
        cargo_udeps_toolchain (str | None): If not `None`, use `cargo-udeps` with the specified toolchain to detect unused dependencies
    Returns:
        list[pathlib.Path]: A sorted list of dependent `.rs` file paths
    Raises:
        RuntimeError: If `cargo-udeps` is requested but is not found in `$PATH`.
            Errors from the underlying cargo invocations presumably surface here
            as well (depends on `command_stdout` -- TODO confirm).
    """
    path = basedir / path

    # We regard that a generated file does not depend on any files.
    # A file counts as generated when one of its ancestors is a directory named
    # `target` that sits next to a `Cargo.toml`.
    for parent in path.parents:
        if (parent.parent / "Cargo.toml").exists() and parent.parts[-1] == "target":
            logger.warning("This is a generated file!: %s", path)
            return [path]

    metadata = _cargo_metadata(cwd=path.parent)

    # First, collects source files in the same crate.
    common_result = set(
        _source_files_in_same_targets(path, _related_source_files(basedir, metadata))
    )

    # If `path` is not the main source file of any target, only the files sharing
    # a target with it are reported.
    main_package_and_target = _find_target(metadata, path)
    if not main_package_and_target:
        return sorted(common_result)
    main_package, main_target = main_package_and_target

    packages_by_id = {p["id"]: p for p in metadata["packages"]}

    class DependencyNamespace(Enum):
        # Normal and dev dependencies share one namespace; build dependencies
        # live in their own.
        NORMAL_DEVELOPMENT = enum.auto()
        BUILD = enum.auto()

        @classmethod
        def from_dep_kind(cls, kind: str):
            # Only the kind "build" maps to `BUILD`; every other value
            # (including `None`) maps to `NORMAL_DEVELOPMENT`.
            if kind == "build":
                return cls.BUILD
            return cls.NORMAL_DEVELOPMENT

    # Collect the `(|dev-|build-)dependencies` into a <is a `build-dependency`> → (<"extern crate name"> → <package>) dictionary.
    dependencies: defaultdict[DependencyNamespace, dict[str, dict[str, Any]]] = (
        defaultdict(dict)
    )
    for dep in next(
        n["deps"] for n in metadata["resolve"]["nodes"] if n["id"] == main_package["id"]
    ):
        # A dependency is taken as normal/dev when the target needs dev
        # dependencies, or when at least one of its kinds is `None`.
        if _need_dev_deps(main_target) or any(
            k["kind"] is None for k in dep["dep_kinds"]
        ):
            dependencies[DependencyNamespace.NORMAL_DEVELOPMENT][dep["name"]] = (
                packages_by_id[dep["pkg"]]
            )
        if any(k["kind"] == "build" for k in dep["dep_kinds"]):
            dependencies[DependencyNamespace.BUILD][dep["name"]] = packages_by_id[
                dep["pkg"]
            ]

    # If `cargo_udeps_toolchain` is present, collects packages that are "unused" by `target`.
    unused_packages: defaultdict[DependencyNamespace, set[Any]] = defaultdict(set)
    if cargo_udeps_toolchain is not None:
        # Names of dependencies that are explicitly renamed in the manifest.
        explicit_names_in_toml = {
            (DependencyNamespace.from_dep_kind(d["kind"]), d["rename"])
            for d in main_package["dependencies"]
            if d["rename"]
        }
        if not shutil.which("cargo-udeps"):
            raise RuntimeError("`cargo-udeps` not in $PATH")
        args: list[str] = [
            "rustup",
            "run",
            cargo_udeps_toolchain,
            "cargo",
            "udeps",
            "--output",
            "json",
            "--manifest-path",
            main_package["manifest_path"],
            *_target_option(main_target),
        ]
        # `check=False` is passed explicitly; presumably it keeps a non-zero exit
        # status of `cargo udeps` from being treated as an error -- TODO confirm
        # against `command_stdout`.
        unused_deps = json.loads(
            command_stdout(args, cwd=metadata["workspace_root"], check=False)
        )["unused_deps"].values()
        # Only the report entry for the main package's manifest is relevant.
        unused_dep = next(
            (
                u
                for u in unused_deps
                if u["manifest_path"] == main_package["manifest_path"]
            ),
            None,
        )
        if unused_dep:
            names_in_toml: list[tuple[DependencyNamespace, Any]] = [
                (DependencyNamespace.NORMAL_DEVELOPMENT, name_in_toml)
                for name_in_toml in [*unused_dep["normal"], *unused_dep["development"]]
            ]
            names_in_toml.extend(
                (DependencyNamespace.BUILD, name_in_toml)
                for name_in_toml in unused_dep["build"]
            )
            for dependency_namespace, name_in_toml in names_in_toml:
                if (dependency_namespace, name_in_toml) in explicit_names_in_toml:
                    # If the `name_in_toml` is explicitly renamed one, it equals to the `extern_crate_name`.
                    unused_package: Any = dependencies[dependency_namespace][
                        name_in_toml
                    ]["id"]
                else:
                    # Otherwise, it equals to the `package.name`.
                    unused_package = next(
                        p["id"]
                        for p in dependencies[dependency_namespace].values()
                        if p["name"] == name_in_toml
                    )
                unused_packages[dependency_namespace].add(unused_package)

    # Finally, adds source files related to the depended crates except:
    #
    # - those detected by cargo-udeps
    # - those come from Crates.io or Git repositories (e.g. `proconio`, other people's libraries including `ac-library-rs`)

    # `main_package` should always be included.
    # Note that cargo-udeps does not detect it if it is unused.
    # https://github.com/est31/cargo-udeps/pull/35
    depended_packages = [main_package]
    for dependency_namespace, values in dependencies.items():
        # A falsy `source` is what distinguishes the packages kept here from the
        # registry/Git ones excluded above.
        depended_packages.extend(
            depended_package
            for depended_package in values.values()
            if (
                depended_package["id"] not in unused_packages[dependency_namespace]
                and not depended_package["source"]
            )
        )

    # `ret` aliases `common_result`; the in-place `|=` below extends that same set.
    ret = common_result

    for depended_package in depended_packages:
        # Only build scripts and library / proc-macro targets of a depended
        # package are followed, and never the main target itself.
        depended_targets = [
            t
            for t in depended_package["targets"]
            if t != main_target and (_is_build(t) or _is_lib_or_proc_macro(t))
        ]
        assert len(depended_targets) <= 2
        for depended_target in depended_targets:
            related_source_files = _related_source_files(
                basedir,
                _cargo_metadata_by_manifest_path(
                    pathlib.Path(depended_package["manifest_path"])
                ),
            )
            ret |= _source_files_in_same_targets(
                pathlib.Path(depended_target["src_path"]).resolve(strict=True),
                related_source_files,
            )
    return sorted(ret)

231 

232 

def _related_source_files(
    basedir: pathlib.Path, metadata: dict[str, Any]
) -> dict[pathlib.Path, frozenset[pathlib.Path]]:
    """Collects all of the `.rs` files recognized by a workspace.

    The result is memoized per workspace root in
    `_related_source_files_by_workspace`, and `cargo check` is run at most once
    per workspace root (tracked in `_cargo_checked_workspaces`).

    Args:
        basedir (pathlib.Path): A parameter from `Language.list_dependencies`
        metadata (dict[str, Any]): "metadata" for a Cargo.toml file in the workspace
    Returns:
        dict[pathlib.Path, frozenset[pathlib.Path]]: A (main source file) → (other related files) map
    Raises:
        FileNotFoundError: If a path listed in a dep-info file (or a target's
            `src_path`) does not exist, because paths are resolved strictly.
        RuntimeError: Presumably if the `cargo check` command fails (depends on
            `command_stdout` -- TODO confirm).
    """
    workspace_root = pathlib.Path(metadata["workspace_root"])

    if workspace_root in _related_source_files_by_workspace:
        return _related_source_files_by_workspace[workspace_root]

    # Runs `cargo check` to generate `$target_directory/debug/deps/*.d`.
    if workspace_root not in _cargo_checked_workspaces:
        command_stdout(
            [
                "cargo",
                "check",
                "--manifest-path",
                str(workspace_root / "Cargo.toml"),
                "--workspace",
                "--all-targets",
            ],
            cwd=metadata["workspace_root"],
        )
        _cargo_checked_workspaces.add(workspace_root)

    ret: dict[pathlib.Path, frozenset[pathlib.Path]] = {}

    targets_in_workspace = itertools.chain.from_iterable(
        p["targets"]
        for p in metadata["packages"]
        if p["id"] in metadata["workspace_members"]
    )
    for target in targets_in_workspace:
        # Finds the **latest** "dep-info" file that contains a line in the following format, and parses the line.
        #
        # ```
        # <relative/absolute path to the `.d` file itself>: <relative/absolute path to the root source file> <relative/absolute paths to the other related files>...
        # ```
        #
        # - https://github.com/rust-lang/cargo/blob/rust-1.49.0/src/cargo/core/compiler/fingerprint.rs#L1979-L1997
        # - https://github.com/rust-lang/cargo/blob/rust-1.49.0/src/cargo/core/compiler/fingerprint.rs#L1824-L1830
        debug_dir = pathlib.Path(metadata["target_directory"], "debug")
        pattern = f"{_crate_name(target)}-*.d"
        if _is_build(target):
            # Build scripts are searched recursively below `debug/build`.
            dep_info_paths = (debug_dir / "build").rglob(pattern)
        elif _is_example(target):
            dep_info_paths = (debug_dir / "examples").glob(pattern)
        else:
            dep_info_paths = (debug_dir / "deps").glob(pattern)

        target_src_path = pathlib.Path(target["src_path"]).resolve(strict=True)

        # Newest file first; the first dep-info file whose own line starts with
        # this target's main source file wins.
        for dep_info_path in sorted(
            dep_info_paths, key=lambda p: p.stat().st_mtime_ns, reverse=True
        ):
            dep_info = read_text_normalized(dep_info_path)
            for line in dep_info.splitlines():
                fields = line.split(": ")
                if len(fields) != 2 or workspace_root / fields[0] != dep_info_path:
                    continue

                paths: list[pathlib.Path] = []
                tokens = iter(fields[1].split())
                for token in tokens:
                    # A token ending with a backslash is a path containing an
                    # escaped space: glue the following token(s) back on.
                    joined = token
                    while joined.endswith("\\"):
                        joined = joined.rstrip("\\") + " " + next(tokens)
                    # Resolve the re-joined path, not the raw token.
                    path = (workspace_root / joined).resolve(strict=True)
                    # Ignores paths that don't start with the `basedir`. (e.g. `/dev/null`, `/usr/local/share/foo/bar`)
                    if path.is_relative_to(basedir):
                        paths.append(path)

                if paths[:1] == [target_src_path]:
                    ret[paths[0]] = frozenset(paths[1:])
                    break
            else:
                # No matching line in this dep-info file; try the next older one.
                continue
            break
        else:
            logger.error("no `.d` file for `%s`", target["name"])

    _related_source_files_by_workspace[workspace_root] = ret
    return ret

329 

330 

def _source_files_in_same_targets(
    path: pathlib.Path,
    related_source_files: dict[pathlib.Path, frozenset[pathlib.Path]],
) -> frozenset[pathlib.Path]:
    """Returns the `.rs` file paths that share a target with `path`.

    Args:
        path (pathlib.Path): A source file path (a target's main file or any other file)
        related_source_files (dict[pathlib.Path, frozenset[pathlib.Path]]): A (main source file) → (other related files) map
    Returns:
        frozenset[pathlib.Path]: `path` together with the files of every target it
            belongs to. When `path` is unknown to `related_source_files`, the
            result contains `path` alone.
    """
    # A target's main source file does not belong to any other target unless it
    # is weirdly symlinked, so its own entry is the whole answer.
    own_related = related_source_files.get(path)
    if own_related is not None:
        return frozenset(own_related) | {path}

    # Otherwise, it may be used by multiple targets with `#[path = ".."] mod foo;` or something.
    shared: set[pathlib.Path] = set()
    for main_file, other_files in related_source_files.items():
        if path in other_files:
            shared.add(main_file)
            shared.update(other_files)

    if shared:
        return frozenset(shared)
    return frozenset({path})

355 

356 

# Environment that builds and runs Rust targets through Cargo.
class RustLanguageEnvironment(LanguageEnvironment):
    @property
    def name(self) -> str:
        """The display name of this environment."""
        return "Rust"

    def get_compile_command(
        self, path: pathlib.Path, *, basedir: pathlib.Path, tempdir: pathlib.Path
    ) -> ShellCommand:
        """Builds the `cargo build --release` command for the target rooted at `path`.

        Args:
            path (pathlib.Path): A main source file path of a target
            basedir (pathlib.Path): The directory `path` is joined onto
            tempdir (pathlib.Path): Unused by this implementation
        Returns:
            ShellCommand: The build command, run from the source file's directory
        Raises:
            RuntimeError: If `path` is not a main source file of any target
        """
        source = basedir / path
        source_dir = source.parent
        cargo_target = _ensure_target(_cargo_metadata(cwd=source_dir), source)
        build_command = ["cargo", "build", "--release"]
        build_command.extend(_target_option(cargo_target))
        return ShellCommand(command=build_command, cwd=source_dir)

    def get_execute_command(
        self, path: pathlib.Path, *, basedir: pathlib.Path, tempdir: pathlib.Path
    ) -> str:
        """Returns the path of the release binary built for the target rooted at `path`.

        Args:
            path (pathlib.Path): A main source file path of a target
            basedir (pathlib.Path): The directory `path` is joined onto
            tempdir (pathlib.Path): Unused by this implementation
        Returns:
            str: `<target_directory>/release/<name>` for `bin` targets, and
                `<target_directory>/release/examples/<name>` for everything else
        Raises:
            RuntimeError: If `path` is not a main source file of any target
        """
        source = basedir / path
        cargo_metadata = _cargo_metadata(cwd=source.parent)
        cargo_target = _ensure_target(cargo_metadata, source)

        segments = [cargo_metadata["target_directory"], "release"]
        if not _is_bin(cargo_target):
            # Non-`bin` targets are looked up in the `examples` subdirectory.
            segments.append("examples")
        segments.append(cargo_target["name"])
        return str(pathlib.Path(*segments))

387 

388 

# Language definition for Rust sources.
class RustLanguage(Language):
    # Rust-specific configuration; defaults to an empty `OjVerifyRustConfig`.
    config: OjVerifyRustConfig = Field(default_factory=OjVerifyRustConfig)

    @functools.cached_property
    def _list_dependencies_backend(self) -> _ListDependenciesBackend:
        """Builds (once per instance) the backend selected by the configuration.

        Returns:
            _ListDependenciesBackend: `_NoBackend` when no backend is configured
                or its kind is "none"; `_CargoUdeps` for kind "cargo-udeps",
                with the configured toolchain when one is given
        Raises:
            RuntimeError: If the configured kind is neither "none" nor "cargo-udeps"
        """
        selected = self.config.list_dependencies_backend
        if selected is not None and selected.kind != "none":
            if selected.kind != "cargo-udeps":
                raise RuntimeError(
                    "expected 'none' or 'cargo-udeps' for `languages.rust.list_dependencies_backend.kind`"
                )
            # An unset or empty toolchain leaves `_CargoUdeps` on its default.
            udeps_options = (
                {"toolchain": selected.toolchain} if selected.toolchain else {}
            )
            return _CargoUdeps(**udeps_options)
        return _NoBackend()

    def list_dependencies(
        self, path: pathlib.Path, *, basedir: pathlib.Path
    ) -> list[pathlib.Path]:
        """Lists the files `path` depends on, using the configured backend."""
        backend = self._list_dependencies_backend
        return backend.list_dependencies(path, basedir=basedir)

    def list_environments(
        self, path: pathlib.Path, *, basedir: pathlib.Path
    ) -> Sequence[RustLanguageEnvironment]:
        """Returns the single Rust environment, regardless of `path`."""
        environment = RustLanguageEnvironment()
        return [environment]

417 

418 

def _cargo_metadata(cwd: pathlib.Path) -> dict[str, Any]:
    """Returns "metadata" for the nearest `Cargo.toml` at or above `cwd`.

    Args:
        cwd (pathlib.Path): The current working directory
    Returns:
        dict[str, Any]: Output of `cargo metadata` command
    Raises:
        ValueError: If `cwd` is not absolute or contains `..`
        RuntimeError: If no `Cargo.toml` is found
    """
    if ".." in cwd.parts or not cwd.is_absolute():
        raise ValueError(
            f"the `cwd` parameter must be absolute and must not contain `..`: {cwd}"
        )

    # Walk from `cwd` up to the filesystem root and take the first manifest.
    # https://docs.rs/cargo/0.49.0/src/cargo/util/important_paths.rs.html#6-20
    candidates = (ancestor / "Cargo.toml" for ancestor in (cwd, *cwd.parents))
    nearest_manifest = next(
        (candidate for candidate in candidates if candidate.exists()), None
    )
    if nearest_manifest is None:
        raise RuntimeError(
            f"could not find `Cargo.toml` in `{cwd}` or any parent directory"
        )
    return _cargo_metadata_by_manifest_path(nearest_manifest)

443 

444 

def _cargo_metadata_by_manifest_path(manifest_path: pathlib.Path) -> dict[str, Any]:
    """Returns "metadata" for a certain `Cargo.toml`, using the module-level cache.

    On a cache miss the metadata is always taken from the workspace root
    manifest, and the same object is then cached under the root manifest and
    under the manifest of every workspace member.

    Args:
        manifest_path (pathlib.Path): Path to a `Cargo.toml`
    Returns:
        dict[str, Any]: Output of `cargo metadata` command
    Raises:
        RuntimeError: Presumably if the `cargo metadata` command fails (depends
            on `command_stdout` -- TODO confirm)
    """
    cached = _metadata_by_manifest_path.get(manifest_path)
    if cached is not None:
        return cached

    metadata = _run_cargo_metadata(manifest_path)
    root_manifest_path = pathlib.Path(metadata["workspace_root"], "Cargo.toml")
    if root_manifest_path != manifest_path:
        # Re-run against the workspace root so every member shares one result.
        metadata = _run_cargo_metadata(root_manifest_path)

    # Collect all keys first so the cache is only touched once they are known.
    member_ids = metadata["workspace_members"]
    cache_keys = [root_manifest_path]
    cache_keys.extend(
        pathlib.Path(package["manifest_path"])
        for package in metadata["packages"]
        if package["id"] in member_ids
    )
    _metadata_by_manifest_path.update(dict.fromkeys(cache_keys, metadata))

    return metadata

474 

475 

def _run_cargo_metadata(manifest_path: pathlib.Path) -> dict[str, Any]:
    """Runs `cargo metadata` for a certain `Cargo.toml` and parses its JSON output.

    This function is considered to be executed just once for every Cargo.toml in the repository.
    For detailed information about `cargo metadata`, see:

    - <https://doc.rust-lang.org/cargo/commands/cargo-metadata.html#output-format>
    - <https://docs.rs/cargo_metadata>

    Args:
        manifest_path (pathlib.Path): Path to a `Cargo.toml`
    Returns:
        dict[str, Any]: Output of `cargo metadata` command
    Raises:
        RuntimeError: Presumably if the `cargo metadata` command fails (depends
            on `command_stdout` -- TODO confirm)
    """
    cargo_command = [
        "cargo",
        "metadata",
        "--format-version",
        "1",
        "--manifest-path",
        str(manifest_path),
    ]
    # The command runs from the directory that holds the manifest.
    raw_output = command_stdout(cargo_command, cwd=manifest_path.parent)
    return json.loads(raw_output)

505 

506 

507def _find_target( 

508 metadata: dict[str, Any], 

509 src_path: pathlib.Path, 

510) -> tuple[dict[str, Any], dict[str, Any]] | None: 

511 for package in metadata["packages"]: 511 ↛ 517line 511 didn't jump to line 517 because the loop on line 511 didn't complete

512 for target in package["targets"]: 

513 # A `src_path` may contain `..` 

514 # The path may not actually exist by being excluded from the package. 

515 if pathlib.Path(target["src_path"]).resolve() == src_path: 

516 return package, target 

517 return None 

518 

519 

def _ensure_target(metadata: dict[str, Any], src_path: pathlib.Path) -> dict[str, Any]:
    """Returns the target whose main source file is `src_path`.

    Args:
        metadata (dict[str, Any]): Output of `cargo metadata` command
        src_path (pathlib.Path): A main source file path of a target
    Returns:
        dict[str, Any]: The matching target entry
    Raises:
        RuntimeError: If `src_path` is not a main source file of any target
    """
    found = _find_target(metadata, src_path)
    if found:
        # `found` is a `(package, target)` pair; only the target is needed.
        return found[1]
    raise RuntimeError(f"{src_path} is not a main source file of any target")

526 

527 

def _crate_name(target: dict[str, Any]) -> str:
    """Returns the target's name with every `-` replaced by `_`.

    The result is used to build the glob pattern for the target's dep-info files.

    Args:
        target (dict[str, Any]): A target entry from `cargo metadata`
    Returns:
        str: The target name with hyphens turned into underscores
    """
    return target["name"].replace("-", "_")

530 

531 

def _is_build(target: dict[str, Any]) -> bool:
    """Tells whether the target's kind is exactly `["custom-build"]`."""
    kinds = target["kind"]
    return kinds == ["custom-build"]

534 

535 

def _is_lib_or_proc_macro(target: dict[str, Any]) -> bool:
    """Tells whether the target's kind is exactly `["lib"]` or `["proc-macro"]`."""
    kinds = target["kind"]
    return kinds == ["lib"] or kinds == ["proc-macro"]

538 

539 

def _is_bin(target: dict[str, Any]) -> bool:
    """Tells whether the target's kind is exactly `["bin"]`."""
    kinds = target["kind"]
    return kinds == ["bin"]

542 

543 

def _is_example(target: dict[str, Any]) -> bool:
    """Tells whether the target's kind is exactly `["example"]`."""
    kinds = target["kind"]
    return kinds == ["example"]

546 

547 

def _need_dev_deps(target: dict[str, Any]) -> bool:
    """Tells whether dev-dependencies are taken into account for the target.

    Returns `True` for every target that is neither a lib / proc-macro nor a bin.
    """
    # Comes from https://docs.rs/cargo/0.49.0/cargo/ops/enum.CompileFilter.html#method.need_dev_deps
    return not _is_lib_or_proc_macro(target) and not _is_bin(target)

551 

552 

def _target_option(target: dict[str, Any]) -> list[str]:
    """Returns the Cargo command-line option that selects the given target.

    Args:
        target (dict[str, Any]): A target entry from `cargo metadata`
    Returns:
        list[str]: `[flag, name]` for `bin`, `example`, `test` and `bench`
            targets, and `["--lib"]` for anything else
    """
    flag_by_kind = {
        "bin": "--bin",
        "example": "--example",
        "test": "--test",
        "bench": "--bench",
    }
    kinds = target["kind"]
    for kind_name, flag in flag_by_kind.items():
        # Only a kind list holding exactly this one kind selects the flag.
        if kinds == [kind_name]:
            return [flag, target["name"]]
    return ["--lib"]