Coverage for src / competitive_verifier / oj / verify / languages / rust.py: 70%

213 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-03-05 16:00 +0000

1import abc 

2import enum 

3import functools 

4import itertools 

5import json 

6import pathlib 

7import shutil 

8from collections import defaultdict 

9from collections.abc import Sequence 

10from enum import Enum 

11from logging import getLogger 

12from typing import Any, Literal 

13 

14from pydantic import BaseModel 

15 

16from competitive_verifier.exec import command_stdout 

17from competitive_verifier.models import ShellCommand 

18from competitive_verifier.oj.verify.models import ( 

19 Language, 

20 LanguageEnvironment, 

21 OjVerifyLanguageConfig, 

22) 

23from competitive_verifier.util import read_text_normalized 

24 

25# ruff: noqa: PLR2004 

26 

# Module-level logger, named after this module.
logger = getLogger(__name__)

# Cache of parsed `cargo metadata` output, keyed by the path of a `Cargo.toml`.
# Filled by `_cargo_metadata_by_manifest_path`.
_metadata_by_manifest_path: dict[pathlib.Path, dict[str, Any]] = {}
# Workspace roots for which `cargo check` has already been run by `_related_source_files`.
_cargo_checked_workspaces: set[pathlib.Path] = set()
# Cache of the (main source file) → (other related files) map, keyed by workspace root.
# Filled by `_related_source_files`.
_related_source_files_by_workspace: dict[
    pathlib.Path, dict[pathlib.Path, frozenset[pathlib.Path]]
] = {}

34 

35 

# Configuration model that selects how dependencies of a Rust source file are listed.
# `RustLanguage.__init__` reads both fields to choose between `_NoBackend` and `_CargoUdeps`.
class OjVerifyRustListDependenciesBackend(BaseModel):
    # Backend selector: "none" maps to `_NoBackend`, "cargo-udeps" maps to `_CargoUdeps`.
    kind: Literal["none", "cargo-udeps"]
    # Toolchain forwarded to `_CargoUdeps`; when `None`, `_CargoUdeps` keeps its "nightly" default.
    toolchain: str | None = None

39 

40 

# Rust-specific language configuration consumed by `RustLanguage.__init__`.
class OjVerifyRustConfig(OjVerifyLanguageConfig):
    # Optional backend selection; when missing (or falsy) `RustLanguage` uses `_NoBackend`.
    list_dependencies_backend: OjVerifyRustListDependenciesBackend | None = None

43 

44 

class _ListDependenciesBackend(abc.ABC):
    """Interface for the strategies that list the dependencies of a Rust source file.

    Deriving from `abc.ABC` makes `@abc.abstractmethod` effective: the interface
    itself cannot be instantiated, and a subclass that forgets to implement
    `list_dependencies` fails at construction time instead of silently
    returning `None` when the method is called.
    """

    @abc.abstractmethod
    def list_dependencies(
        self, path: pathlib.Path, *, basedir: pathlib.Path
    ) -> list[pathlib.Path]:
        """Lists the files that `path` depends on.

        Args:
            path (pathlib.Path): A main source file path of a target
            basedir (pathlib.Path): A parameter from `Language.list_dependencies`
        Returns:
            list[pathlib.Path]: A list of dependent file paths
        """

50 

51 

class _NoBackend(_ListDependenciesBackend):
    """Backend that lists dependencies without running `cargo-udeps`."""

    def list_dependencies(
        self, path: pathlib.Path, *, basedir: pathlib.Path
    ) -> list[pathlib.Path]:
        """Delegates to `_list_dependencies_by_crate` with no `cargo-udeps` toolchain.

        Args:
            path (pathlib.Path): A main source file path of a target
            basedir (pathlib.Path): A parameter from `Language.list_dependencies`
        Returns:
            list[pathlib.Path]: A list of dependent `.rs` file paths
        """
        # `None` turns the unused-dependency detection off.
        # The call shape (one positional, two keywords in this order) is kept
        # because the callee is memoized on its exact arguments.
        toolchain: str | None = None
        return _list_dependencies_by_crate(
            path, basedir=basedir, cargo_udeps_toolchain=toolchain
        )

59 

60 

class _CargoUdeps(_ListDependenciesBackend):
    """Backend that uses `cargo-udeps` to leave out unused dependencies."""

    # Class-level default; an instance attribute shadows it only when a toolchain is given.
    toolchain: str = "nightly"

    def __init__(self, *, toolchain: str | None):
        """Stores `toolchain` on the instance unless it is `None`.

        Args:
            toolchain (str | None): Toolchain to run `cargo-udeps` with, or `None` for the default
        """
        if toolchain is None:
            return
        self.toolchain = toolchain

    def list_dependencies(
        self, path: pathlib.Path, *, basedir: pathlib.Path
    ) -> list[pathlib.Path]:
        """Delegates to `_list_dependencies_by_crate` with this backend's toolchain.

        Args:
            path (pathlib.Path): A main source file path of a target
            basedir (pathlib.Path): A parameter from `Language.list_dependencies`
        Returns:
            list[pathlib.Path]: A list of dependent `.rs` file paths
        """
        # The call shape (one positional, two keywords in this order) is kept
        # because the callee is memoized on its exact arguments.
        selected_toolchain = self.toolchain
        return _list_dependencies_by_crate(
            path, basedir=basedir, cargo_udeps_toolchain=selected_toolchain
        )

74 

75 

# NOTE(review): `functools.cache` memoizes on the exact arguments and hands the
# very same list object back on every cache hit, so callers must not mutate
# the returned list.
@functools.cache
def _list_dependencies_by_crate(
    path: pathlib.Path, *, basedir: pathlib.Path, cargo_udeps_toolchain: str | None
) -> list[pathlib.Path]:
    """The `list_dependencies` implementation for `_NoBackend` and `_CargoUdeps`.

    The result is the sorted union of the source files of the target that
    `path` belongs to and the source files of the lib / proc-macro / build
    script targets of the main package and of its local (source-less)
    dependencies, minus the dependencies reported as unused by `cargo-udeps`
    when a toolchain is given.

    Args:
        path (pathlib.Path): A main source file path of a target
        basedir (pathlib.Path): A parameter from `Language.list_dependencies`
        cargo_udeps_toolchain (str | None): If not `None`, use `cargo-udeps` with the specified toolchain to detect unused dependencies
    Returns:
        list[pathlib.Path]: A list of dependent `.rs` file paths
    Raises:
        RuntimeError: If `cargo_udeps_toolchain` is given but `cargo-udeps` is not in `$PATH`.
            Failing cargo commands presumably raise as well; that depends on `command_stdout`, which is defined elsewhere.
    """
    path = basedir / path

    # A file under `<dir with Cargo.toml>/target/` is treated as generated and
    # is reported as depending only on itself.
    for parent in path.parents:
        if (parent.parent / "Cargo.toml").exists() and parent.parts[-1] == "target":
            logger.warning("This is a generated file!: %s", path)
            return [path]

    metadata = _cargo_metadata(cwd=path.parent)

    # First, collects source files in the same crate.
    common_result = set(
        _source_files_in_same_targets(path, _related_source_files(basedir, metadata))
    )

    # When `path` is not the root source file of any target, there is no
    # package to inspect, so only the files collected above are returned.
    main_package_and_target = _find_target(metadata, path)
    if not main_package_and_target:
        return sorted(common_result)
    main_package, main_target = main_package_and_target

    packages_by_id = {p["id"]: p for p in metadata["packages"]}

    # Normal and dev dependencies share one namespace of extern crate names;
    # build dependencies are kept in a separate one.
    class DependencyNamespace(Enum):
        NORMAL_DEVELOPMENT = enum.auto()
        BUILD = enum.auto()

        @classmethod
        def from_dep_kind(cls, kind: str):
            # Any kind other than "build" is mapped to the normal/dev namespace.
            if kind == "build":
                return cls.BUILD
            return cls.NORMAL_DEVELOPMENT

    # Collect the `(|dev-|build-)dependencies` into a <is a `build-dependency`> → (<"extern crate name"> → <package>) dictionary.
    dependencies: defaultdict[DependencyNamespace, dict[str, dict[str, Any]]] = (
        defaultdict(dict)
    )
    # `next` without a default: a missing resolve node for the main package
    # surfaces as `StopIteration`.
    for dep in next(
        n["deps"] for n in metadata["resolve"]["nodes"] if n["id"] == main_package["id"]
    ):
        # A dependency is taken when the target needs dev-dependencies, or when
        # one of its kinds is `None` (presumably a normal dependency in
        # `cargo metadata` output — confirm against the cargo docs).
        if _need_dev_deps(main_target) or any(
            k["kind"] is None for k in dep["dep_kinds"]
        ):
            dependencies[DependencyNamespace.NORMAL_DEVELOPMENT][dep["name"]] = (
                packages_by_id[dep["pkg"]]
            )
        if any(k["kind"] == "build" for k in dep["dep_kinds"]):
            dependencies[DependencyNamespace.BUILD][dep["name"]] = packages_by_id[
                dep["pkg"]
            ]

    # If `cargo_udeps_toolchain` is present, collects packages that are "unused" by `target`.
    unused_packages: defaultdict[DependencyNamespace, set[Any]] = defaultdict(set)
    if cargo_udeps_toolchain is not None:
        # (namespace, name) pairs of the dependencies that are renamed in the manifest.
        explicit_names_in_toml = {
            (DependencyNamespace.from_dep_kind(d["kind"]), d["rename"])
            for d in main_package["dependencies"]
            if d["rename"]
        }
        if not shutil.which("cargo-udeps"):
            raise RuntimeError("`cargo-udeps` not in $PATH")
        args: list[str] = [
            "rustup",
            "run",
            cargo_udeps_toolchain,
            "cargo",
            "udeps",
            "--output",
            "json",
            "--manifest-path",
            main_package["manifest_path"],
            *_target_option(main_target),
        ]
        # `check=False`: the output is parsed regardless of the exit status
        # (presumably cargo-udeps exits non-zero when it finds unused
        # dependencies — confirm against `command_stdout` / cargo-udeps).
        unused_deps = json.loads(
            command_stdout(args, cwd=metadata["workspace_root"], check=False)
        )["unused_deps"].values()
        # Only the report entry for the main package's manifest is of interest.
        unused_dep = next(
            (
                u
                for u in unused_deps
                if u["manifest_path"] == main_package["manifest_path"]
            ),
            None,
        )
        if unused_dep:
            names_in_toml: list[tuple[DependencyNamespace, Any]] = [
                (DependencyNamespace.NORMAL_DEVELOPMENT, name_in_toml)
                for name_in_toml in [*unused_dep["normal"], *unused_dep["development"]]
            ]
            names_in_toml.extend(
                (DependencyNamespace.BUILD, name_in_toml)
                for name_in_toml in unused_dep["build"]
            )
            for dependency_namespace, name_in_toml in names_in_toml:
                if (dependency_namespace, name_in_toml) in explicit_names_in_toml:
                    # If the `name_in_toml` is an explicitly renamed one, it equals the `extern_crate_name`.
                    unused_package: Any = dependencies[dependency_namespace][
                        name_in_toml
                    ]["id"]
                else:
                    # Otherwise, it equals the `package.name`.
                    # `next` without a default: an unknown name surfaces as `StopIteration`.
                    unused_package = next(
                        p["id"]
                        for p in dependencies[dependency_namespace].values()
                        if p["name"] == name_in_toml
                    )
                unused_packages[dependency_namespace].add(unused_package)

    # Finally, adds source files related to the depended crates except:
    #
    # - those detected by cargo-udeps
    # - those that come from Crates.io or Git repositories (e.g. `proconio`, other people's libraries including `ac-library-rs`)

    # `main_package` should always be included.
    # Note that cargo-udeps does not detect it if it is unused.
    # https://github.com/est31/cargo-udeps/pull/35
    depended_packages = [main_package]
    for dependency_namespace, values in dependencies.items():
        # A falsy `source` marks a package that does not come from a registry or Git.
        depended_packages.extend(
            depended_package
            for depended_package in values.values()
            if (
                depended_package["id"] not in unused_packages[dependency_namespace]
                and not depended_package["source"]
            )
        )

    # `ret` is the same set object as `common_result`; it is extended in place below.
    ret = common_result

    for depended_package in depended_packages:
        # The main target itself is skipped; its files are already in `ret`.
        depended_targets = [
            t
            for t in depended_package["targets"]
            if t != main_target and (_is_build(t) or _is_lib_or_proc_macro(t))
        ]
        # Presumably a package has at most one lib/proc-macro target and one build script — TODO confirm.
        assert len(depended_targets) <= 2
        for depended_target in depended_targets:
            # The dependency may live in another workspace, so its own metadata is looked up.
            related_source_files = _related_source_files(
                basedir,
                _cargo_metadata_by_manifest_path(
                    pathlib.Path(depended_package["manifest_path"])
                ),
            )
            ret |= _source_files_in_same_targets(
                pathlib.Path(depended_target["src_path"]).resolve(strict=True),
                related_source_files,
            )
    return sorted(ret)

238 

239 

def _split_dep_info_paths(rhs: str) -> list[str]:
    """Splits the right-hand side of a "dep-info" line into file names.

    File names are separated by whitespace, and a space that belongs to a
    file name is escaped with a backslash (`foo\\ bar.rs`). Such pieces are
    glued back together with a single space.

    Args:
        rhs (str): The text after `": "` in a dep-info line
    Returns:
        list[str]: The file names, in the order they appear
    Raises:
        StopIteration: If the text ends with a dangling backslash
    """
    names: list[str] = []
    tokens = iter(rhs.split())
    for token in tokens:
        name = token
        # A trailing backslash means the whitespace that followed was escaped,
        # so the next token is part of the same file name.
        while name.endswith("\\"):
            name = name.rstrip("\\") + " " + next(tokens)
        names.append(name)
    return names


def _related_source_files(
    basedir: pathlib.Path, metadata: dict[str, Any]
) -> dict[pathlib.Path, frozenset[pathlib.Path]]:
    """Collects all of the `.rs` files recognized by a workspace.

    The result is computed once per workspace root and cached in
    `_related_source_files_by_workspace`. `cargo check` is run at most once
    per workspace root to produce the "dep-info" (`.d`) files that are parsed
    here.

    Args:
        basedir (pathlib.Path): A parameter from `Language.list_dependencies`
        metadata (dict[str, Any]): "metadata" for a Cargo.toml file in the workspace
    Returns:
        dict[pathlib.Path, frozenset[pathlib.Path]]: A (main source file) → (other related files) map
    Raises:
        FileNotFoundError: If a file named in a dep-info line, or a target's `src_path`, does not exist
        RuntimeError: Presumably if the `cargo check` command fails (depends on `command_stdout`)
    """
    workspace_root = pathlib.Path(metadata["workspace_root"])
    if workspace_root in _related_source_files_by_workspace:
        return _related_source_files_by_workspace[workspace_root]

    # Runs `cargo check` to generate `$target_directory/debug/deps/*.d`.
    if workspace_root not in _cargo_checked_workspaces:
        command_stdout(
            [
                "cargo",
                "check",
                "--manifest-path",
                str(workspace_root / "Cargo.toml"),
                "--workspace",
                "--all-targets",
            ],
            cwd=metadata["workspace_root"],
        )
        _cargo_checked_workspaces.add(workspace_root)

    ret: dict[pathlib.Path, frozenset[pathlib.Path]] = {}

    targets_in_workspace = itertools.chain.from_iterable(
        p["targets"]
        for p in metadata["packages"]
        if p["id"] in metadata["workspace_members"]
    )
    for target in targets_in_workspace:
        # Finds the **latest** "dep-info" file that contains a line in the following format, and parses the line.
        #
        # ```
        # <relative/absolute path to the `.d` file itself>: <relative/absolute path to the root source file> <relative/absolute paths to the other related files>...
        # ```
        #
        # - https://github.com/rust-lang/cargo/blob/rust-1.49.0/src/cargo/core/compiler/fingerprint.rs#L1979-L1997
        # - https://github.com/rust-lang/cargo/blob/rust-1.49.0/src/cargo/core/compiler/fingerprint.rs#L1824-L1830
        pattern = f"{_crate_name(target)}-*.d"
        if _is_build(target):
            # Build scripts are searched recursively below `debug/build`.
            dep_info_paths = pathlib.Path(
                metadata["target_directory"], "debug", "build"
            ).rglob(pattern)
        elif _is_example(target):
            dep_info_paths = pathlib.Path(
                metadata["target_directory"], "debug", "examples"
            ).glob(pattern)
        else:
            dep_info_paths = pathlib.Path(
                metadata["target_directory"], "debug", "deps"
            ).glob(pattern)

        # Newest file first; the first file that yields a matching line wins.
        for dep_info_path in sorted(
            dep_info_paths, key=lambda p: p.stat().st_mtime_ns, reverse=True
        ):
            dep_info = read_text_normalized(dep_info_path)
            for line in dep_info.splitlines():
                fields = line.split(": ")
                # Only the line whose left-hand side names this `.d` file is relevant.
                if len(fields) != 2 or workspace_root / fields[0] != dep_info_path:
                    continue
                paths: list[pathlib.Path] = []
                for name in _split_dep_info_paths(fields[1]):
                    # The re-joined name is used, so that file names that
                    # contain escaped spaces resolve to the right file.
                    path = (workspace_root / name).resolve(strict=True)
                    # Ignores paths that don't start with the `basedir`. (e.g. `/dev/null`, `/usr/local/share/foo/bar`)
                    if path.is_relative_to(basedir):
                        paths.append(path)
                if paths[:1] == [pathlib.Path(target["src_path"]).resolve(strict=True)]:
                    ret[paths[0]] = frozenset(paths[1:])
                    break
            else:
                # No usable line in this file: try the next (older) one.
                continue
            # A usable line was found: stop looking at older files.
            break
        else:
            logger.error("no `.d` file for `%s`", target["name"])

    _related_source_files_by_workspace[workspace_root] = ret
    return ret

336 

337 

def _source_files_in_same_targets(
    path: pathlib.Path,
    related_source_files: dict[pathlib.Path, frozenset[pathlib.Path]],
) -> frozenset[pathlib.Path]:
    """Returns `.rs` file paths relating to `path`.

    Args:
        path (pathlib.Path): A source file path
        related_source_files (dict[pathlib.Path, frozenset[pathlib.Path]]): A (main source file) → (other related files) map
    Returns:
        frozenset[pathlib.Path]: `path` plus its related files when `path` is a main source file.
            Otherwise the union of every (main source file, related files) group that mentions `path`,
            or just `{path}` when no group mentions it.
    """
    if path not in related_source_files:
        # `path` is not a main source file. It may still be used by several
        # targets through `#[path = ".."] mod foo;` or something similar.
        gathered: set[pathlib.Path] = set()
        for main_file, others in related_source_files.items():
            if path in others:
                gathered.add(main_file)
                gathered.update(others)
        if gathered:
            return frozenset(gathered)
        return frozenset({path})

    # A main source file of a target does not belong to any other target
    # unless it is weirdly symlinked.
    return frozenset(related_source_files[path]) | {path}

362 

363 

class RustLanguageEnvironment(LanguageEnvironment):
    """The environment that builds and runs Rust targets with `cargo`."""

    @property
    def name(self) -> str:
        """The display name of this environment."""
        return "Rust"

    def get_compile_command(
        self, path: pathlib.Path, *, basedir: pathlib.Path, tempdir: pathlib.Path
    ) -> ShellCommand:
        """Builds the `cargo build --release` command for the target rooted at `path`.

        Args:
            path (pathlib.Path): A main source file path of a target, relative to `basedir`
            basedir (pathlib.Path): The directory `path` is joined onto
            tempdir (pathlib.Path): Not used by this environment
        Returns:
            ShellCommand: The build command, run from the directory of the source file
        Raises:
            RuntimeError: If `path` is not a main source file of any target
        """
        source = basedir / path
        workdir = source.parent
        target = _ensure_target(_cargo_metadata(cwd=workdir), source)
        build_args = ["cargo", "build", "--release"]
        build_args.extend(_target_option(target))
        return ShellCommand(command=build_args, cwd=workdir)

    def get_execute_command(
        self, path: pathlib.Path, *, basedir: pathlib.Path, tempdir: pathlib.Path
    ) -> str:
        """Returns the path of the release executable built for the target rooted at `path`.

        Args:
            path (pathlib.Path): A main source file path of a target, relative to `basedir`
            basedir (pathlib.Path): The directory `path` is joined onto
            tempdir (pathlib.Path): Not used by this environment
        Returns:
            str: `<target_directory>/release/<name>` for a `bin` target,
                `<target_directory>/release/examples/<name>` otherwise
        Raises:
            RuntimeError: If `path` is not a main source file of any target
        """
        source = basedir / path
        metadata = _cargo_metadata(cwd=source.parent)
        target = _ensure_target(metadata, source)
        output_dir = pathlib.Path(metadata["target_directory"], "release")
        if not _is_bin(target):
            # Everything that is not a `bin` target is looked up under `examples`.
            output_dir = output_dir / "examples"
        return str(output_dir / target["name"])

394 

395 

396class RustLanguage(Language): 

397 _list_dependencies_backend: _ListDependenciesBackend 

398 

399 def __init__(self, *, config: OjVerifyRustConfig | None): 

400 if config and config.list_dependencies_backend: 400 ↛ 401line 400 didn't jump to line 401 because the condition on line 400 was never true

401 list_dependencies_backend = config.list_dependencies_backend 

402 

403 if list_dependencies_backend.kind == "none": 

404 self._list_dependencies_backend = _NoBackend() 

405 elif list_dependencies_backend.kind == "cargo-udeps": 

406 self._list_dependencies_backend = _CargoUdeps( 

407 toolchain=list_dependencies_backend.toolchain, 

408 ) 

409 else: 

410 raise RuntimeError( 

411 "expected 'none' or 'cargo-udeps' for `languages.rust.list_dependencies_backend.kind`" 

412 ) 

413 else: 

414 self._list_dependencies_backend = _NoBackend() 

415 

416 def list_dependencies( 

417 self, path: pathlib.Path, *, basedir: pathlib.Path 

418 ) -> list[pathlib.Path]: 

419 return self._list_dependencies_backend.list_dependencies(path, basedir=basedir) 

420 

421 def list_environments( 

422 self, path: pathlib.Path, *, basedir: pathlib.Path 

423 ) -> Sequence[RustLanguageEnvironment]: 

424 return [RustLanguageEnvironment()] 

425 

426 

def _cargo_metadata(cwd: pathlib.Path) -> dict[str, Any]:
    """Returns "metadata" for the nearest `Cargo.toml` in `cwd` or its parent directories.

    Args:
        cwd (pathlib.Path): The directory the search starts from
    Returns:
        dict[str, Any]: Output of `cargo metadata` command
    Raises:
        ValueError: If `cwd` is not absolute or contains `..`
        RuntimeError: If no `Cargo.toml` is found
    """
    is_clean_absolute = cwd.is_absolute() and ".." not in cwd.parts
    if not is_clean_absolute:
        raise ValueError(
            f"the `cwd` parameter must be absolute and must not contain `..`: {cwd}"
        )

    # Walks from `cwd` up to the root and takes the first manifest that exists.
    # https://docs.rs/cargo/0.49.0/src/cargo/util/important_paths.rs.html#6-20
    candidates = (
        directory / "Cargo.toml" for directory in itertools.chain([cwd], cwd.parents)
    )
    found = next((manifest for manifest in candidates if manifest.exists()), None)
    if found is None:
        raise RuntimeError(
            f"could not find `Cargo.toml` in `{cwd}` or any parent directory"
        )
    return _cargo_metadata_by_manifest_path(found)

451 

452 

def _cargo_metadata_by_manifest_path(manifest_path: pathlib.Path) -> dict[str, Any]:
    """Returns "metadata" for a certain `Cargo.toml`, using a per-manifest cache.

    The metadata is always that of the workspace root manifest. It is cached
    under the root manifest path and under the manifest path of every
    workspace member.

    Args:
        manifest_path (pathlib.Path): Path to a `Cargo.toml`
    Returns:
        dict[str, Any]: Output of `cargo metadata` command
    Raises:
        RuntimeError: If the `cargo metadata` command fails
    """
    if manifest_path in _metadata_by_manifest_path:
        return _metadata_by_manifest_path[manifest_path]

    metadata = _run_cargo_metadata(manifest_path)
    root_manifest_path = pathlib.Path(metadata["workspace_root"], "Cargo.toml")
    if manifest_path != root_manifest_path:
        # The given manifest belongs to a member: ask again for the workspace root.
        metadata = _run_cargo_metadata(root_manifest_path)

    member_manifest_paths = [
        pathlib.Path(package["manifest_path"])
        for package in metadata["packages"]
        if package["id"] in metadata["workspace_members"]
    ]
    # All keys are computed before the cache is touched, then stored in one go.
    _metadata_by_manifest_path.update(
        dict.fromkeys([root_manifest_path, *member_manifest_paths], metadata)
    )

    return metadata

482 

483 

def _run_cargo_metadata(manifest_path: pathlib.Path) -> dict[str, Any]:
    """Runs `cargo metadata` for a certain `Cargo.toml` and parses its JSON output.

    This function is meant to be executed just once for every Cargo.toml in the repository.
    For detailed information about `cargo metadata`, see:

    - <https://doc.rust-lang.org/cargo/commands/cargo-metadata.html#output-format>
    - <https://docs.rs/cargo_metadata>

    Args:
        manifest_path (pathlib.Path): Path to a `Cargo.toml`
    Returns:
        dict[str, Any]: Output of `cargo metadata` command
    Raises:
        RuntimeError: If the `cargo metadata` command fails
    """
    command = ["cargo", "metadata", "--format-version", "1"]
    command += ["--manifest-path", str(manifest_path)]
    # The command runs from the directory that contains the manifest.
    raw_output = command_stdout(command, cwd=manifest_path.parent)
    return json.loads(raw_output)

513 

514 

515def _find_target( 

516 metadata: dict[str, Any], 

517 src_path: pathlib.Path, 

518) -> tuple[dict[str, Any], dict[str, Any]] | None: 

519 for package in metadata["packages"]: 519 ↛ 525line 519 didn't jump to line 525 because the loop on line 519 didn't complete

520 for target in package["targets"]: 

521 # A `src_path` may contain `..` 

522 # The path may not actually exist by being excluded from the package. 

523 if pathlib.Path(target["src_path"]).resolve() == src_path: 

524 return package, target 

525 return None 

526 

527 

def _ensure_target(metadata: dict[str, Any], src_path: pathlib.Path) -> dict[str, Any]:
    """Returns the target whose root source file is `src_path`.

    Args:
        metadata (dict[str, Any]): Output of `cargo metadata` command
        src_path (pathlib.Path): The source file path to look for
    Returns:
        dict[str, Any]: The matching target
    Raises:
        RuntimeError: If `src_path` is not a main source file of any target
    """
    found = _find_target(metadata, src_path)
    if found:
        # Only the target half of the (package, target) pair is needed.
        return found[1]
    raise RuntimeError(f"{src_path} is not a main source file of any target")

534 

535 

def _crate_name(target: dict[str, Any]) -> str:
    """Returns the crate name of a target: its `name` with `-` replaced by `_`.

    The return annotation is `str`; it used to say `bool`, although the
    function has always returned the transformed name.

    Args:
        target (dict[str, Any]): A target entry from `cargo metadata`
    Returns:
        str: The target name with every hyphen replaced by an underscore
    """
    return target["name"].replace("-", "_")

538 

539 

def _is_build(target: dict[str, Any]) -> bool:
    """Tells whether the target's `kind` is exactly `["custom-build"]`."""
    kinds = target["kind"]
    return kinds == ["custom-build"]

542 

543 

def _is_lib_or_proc_macro(target: dict[str, Any]) -> bool:
    """Tells whether the target's `kind` is exactly `["lib"]` or exactly `["proc-macro"]`."""
    kinds = target["kind"]
    return kinds == ["lib"] or kinds == ["proc-macro"]

546 

547 

def _is_bin(target: dict[str, Any]) -> bool:
    """Tells whether the target's `kind` is exactly `["bin"]`."""
    kinds = target["kind"]
    return kinds == ["bin"]

550 

551 

def _is_example(target: dict[str, Any]) -> bool:
    """Tells whether the target's `kind` is exactly `["example"]`."""
    kinds = target["kind"]
    return kinds == ["example"]

554 

555 

def _need_dev_deps(target: dict[str, Any]) -> bool:
    """Tells whether the target needs `dev-dependencies`.

    Every target needs them except lib, proc-macro and bin targets.
    """
    # Comes from https://docs.rs/cargo/0.49.0/cargo/ops/enum.CompileFilter.html#method.need_dev_deps
    return not _is_lib_or_proc_macro(target) and not _is_bin(target)

559 

560 

def _target_option(target: dict[str, Any]) -> list[str]:
    """Returns the cargo command-line option that selects `target`.

    Args:
        target (dict[str, Any]): A target entry from `cargo metadata`
    Returns:
        list[str]: `["--<kind>", <name>]` for a bin, example, test or bench target,
            and `["--lib"]` for anything else
    """
    kinds = target["kind"]
    # Checked in this order; the flag name is the kind name itself.
    for selectable in ("bin", "example", "test", "bench"):
        if kinds == [selectable]:
            return [f"--{selectable}", target["name"]]
    return ["--lib"]