Coverage for src / competitive_verifier / oj / verify / languages / rust.py: 70%
213 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-03-05 16:00 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-03-05 16:00 +0000
1import abc
2import enum
3import functools
4import itertools
5import json
6import pathlib
7import shutil
8from collections import defaultdict
9from collections.abc import Sequence
10from enum import Enum
11from logging import getLogger
12from typing import Any, Literal
14from pydantic import BaseModel
16from competitive_verifier.exec import command_stdout
17from competitive_verifier.models import ShellCommand
18from competitive_verifier.oj.verify.models import (
19 Language,
20 LanguageEnvironment,
21 OjVerifyLanguageConfig,
22)
23from competitive_verifier.util import read_text_normalized
25# ruff: noqa: PLR2004
logger = getLogger(__name__)

# Process-wide memoization tables shared by the helpers in this module.

# `Cargo.toml` path -> parsed output of `cargo metadata`.
_metadata_by_manifest_path: dict[pathlib.Path, dict[str, Any]] = {}

# Workspace roots for which `cargo check` has already been run.
_cargo_checked_workspaces: set[pathlib.Path] = set()

# Workspace root -> ((main source file) -> (other related source files)).
_related_source_files_by_workspace: dict[
    pathlib.Path,
    dict[pathlib.Path, frozenset[pathlib.Path]],
] = {}
class OjVerifyRustListDependenciesBackend(BaseModel):
    """Configuration model selecting how Rust dependencies are listed."""

    # Which backend to use: plain crate-based listing ("none") or the
    # variant that additionally consults `cargo-udeps`.
    kind: Literal["none", "cargo-udeps"]

    # Optional toolchain name handed to the `cargo-udeps` backend.
    toolchain: str | None = None
class OjVerifyRustConfig(OjVerifyLanguageConfig):
    """Rust-specific language configuration."""

    # Optional backend selection; when absent, `RustLanguage` falls back to
    # the backend that does not use `cargo-udeps`.
    list_dependencies_backend: OjVerifyRustListDependenciesBackend | None = None
class _ListDependenciesBackend(abc.ABC):
    """Abstract interface for strategies that list a Rust file's dependencies.

    Deriving from `abc.ABC` is what makes `@abc.abstractmethod` effective:
    without it the decorator is inert and the base class (or a subclass that
    forgot to implement `list_dependencies`) could be instantiated silently.
    """

    @abc.abstractmethod
    def list_dependencies(
        self, path: pathlib.Path, *, basedir: pathlib.Path
    ) -> list[pathlib.Path]:
        """Returns the source files that `path` depends on.

        Args:
            path (pathlib.Path): A main source file path of a target
            basedir (pathlib.Path): The base directory `path` is joined onto
        Returns:
            list[pathlib.Path]: The dependent file paths
        """
        ...
class _NoBackend(_ListDependenciesBackend):
    """Backend that lists dependencies per crate without using `cargo-udeps`."""

    def list_dependencies(
        self, path: pathlib.Path, *, basedir: pathlib.Path
    ) -> list[pathlib.Path]:
        """Delegates to `_list_dependencies_by_crate` with no udeps toolchain."""
        dependencies = _list_dependencies_by_crate(
            path,
            basedir=basedir,
            cargo_udeps_toolchain=None,
        )
        return dependencies
class _CargoUdeps(_ListDependenciesBackend):
    """Backend that lists dependencies per crate and prunes them with `cargo-udeps`."""

    # Toolchain used when the caller does not supply one.
    toolchain: str = "nightly"

    def __init__(self, *, toolchain: str | None):
        """Stores `toolchain` on the instance unless it is `None`.

        When `None` is given, the class-level default stays in effect.
        """
        if toolchain is None:
            return
        self.toolchain = toolchain

    def list_dependencies(
        self, path: pathlib.Path, *, basedir: pathlib.Path
    ) -> list[pathlib.Path]:
        """Delegates to `_list_dependencies_by_crate` with this backend's toolchain."""
        dependencies = _list_dependencies_by_crate(
            path,
            basedir=basedir,
            cargo_udeps_toolchain=self.toolchain,
        )
        return dependencies
@functools.cache
def _list_dependencies_by_crate(
    path: pathlib.Path, *, basedir: pathlib.Path, cargo_udeps_toolchain: str | None
) -> list[pathlib.Path]:
    """Shared `list_dependencies` implementation for `_NoBackend` and `_CargoUdeps`.

    The result is memoized by `functools.cache`, keyed on all three arguments.

    Args:
        path (pathlib.Path): A main source file path of a target, joined onto `basedir`
        basedir (pathlib.Path): A parameter from `Language.list_dependencies`
        cargo_udeps_toolchain (str | None): If not `None`, `cargo-udeps` is run with this toolchain to detect unused dependencies
    Returns:
        list[pathlib.Path]: A sorted list of dependent source file paths
    Raises:
        RuntimeError: If `cargo-udeps` is requested but is not found in `$PATH`
    """
    path = basedir / path

    # A file located below `<crate>/target/` is treated as generated output
    # and is considered to depend on nothing but itself.
    for ancestor in path.parents:
        if (ancestor.parent / "Cargo.toml").exists() and ancestor.parts[
            -1
        ] == "target":
            logger.warning("This is a generated file!: %s", path)
            return [path]

    metadata = _cargo_metadata(cwd=path.parent)

    # Step 1: the source files that share a target with `path`.
    workspace_sources = _related_source_files(basedir, metadata)
    collected = set(_source_files_in_same_targets(path, workspace_sources))

    found = _find_target(metadata, path)
    if not found:
        return sorted(collected)
    main_package, main_target = found

    package_of = {package["id"]: package for package in metadata["packages"]}

    class DependencyNamespace(Enum):
        NORMAL_DEVELOPMENT = enum.auto()
        BUILD = enum.auto()

        @classmethod
        def from_dep_kind(cls, kind: str):
            return cls.BUILD if kind == "build" else cls.NORMAL_DEVELOPMENT

    # Step 2: group the `(|dev-|build-)dependencies` of the main package as
    # <namespace> -> (<"extern crate name"> -> <package>).
    dependencies: defaultdict[DependencyNamespace, dict[str, dict[str, Any]]] = (
        defaultdict(dict)
    )
    main_node_deps = next(
        node["deps"]
        for node in metadata["resolve"]["nodes"]
        if node["id"] == main_package["id"]
    )
    for dep in main_node_deps:
        if _need_dev_deps(main_target) or any(
            dep_kind["kind"] is None for dep_kind in dep["dep_kinds"]
        ):
            dependencies[DependencyNamespace.NORMAL_DEVELOPMENT][dep["name"]] = (
                package_of[dep["pkg"]]
            )
        if any(dep_kind["kind"] == "build" for dep_kind in dep["dep_kinds"]):
            dependencies[DependencyNamespace.BUILD][dep["name"]] = package_of[
                dep["pkg"]
            ]

    # Step 3: when a toolchain is given, ask `cargo-udeps` which packages
    # the main target does not actually use.
    unused_ids: defaultdict[DependencyNamespace, set[Any]] = defaultdict(set)
    if cargo_udeps_toolchain is not None:
        renamed_in_toml = {
            (DependencyNamespace.from_dep_kind(declared["kind"]), declared["rename"])
            for declared in main_package["dependencies"]
            if declared["rename"]
        }
        if not shutil.which("cargo-udeps"):
            raise RuntimeError("`cargo-udeps` not in $PATH")
        udeps_command: list[str] = [
            "rustup",
            "run",
            cargo_udeps_toolchain,
            "cargo",
            "udeps",
            "--output",
            "json",
            "--manifest-path",
            main_package["manifest_path"],
            *_target_option(main_target),
        ]
        udeps_output = command_stdout(
            udeps_command, cwd=metadata["workspace_root"], check=False
        )
        # Only the report entry for the main package's manifest is relevant.
        report = None
        for candidate in json.loads(udeps_output)["unused_deps"].values():
            if candidate["manifest_path"] == main_package["manifest_path"]:
                report = candidate
                break
        if report:
            flagged: list[tuple[DependencyNamespace, Any]] = []
            for section, namespace in (
                ("normal", DependencyNamespace.NORMAL_DEVELOPMENT),
                ("development", DependencyNamespace.NORMAL_DEVELOPMENT),
                ("build", DependencyNamespace.BUILD),
            ):
                flagged.extend((namespace, name) for name in report[section])
            for namespace, toml_name in flagged:
                known = dependencies[namespace]
                if (namespace, toml_name) in renamed_in_toml:
                    # An explicitly renamed entry is keyed by its extern crate name.
                    unused_id: Any = known[toml_name]["id"]
                else:
                    # Otherwise the name in the manifest is the `package.name`.
                    unused_id = next(
                        package["id"]
                        for package in known.values()
                        if package["name"] == toml_name
                    )
                unused_ids[namespace].add(unused_id)

    # Step 4: add the sources of the depended crates, skipping
    #
    # - packages reported as unused by cargo-udeps
    # - packages with a `source` (e.g. from Crates.io or Git repositories)
    #
    # `main_package` is always included; cargo-udeps does not report it even
    # when it is unused. https://github.com/est31/cargo-udeps/pull/35
    depended_packages = [main_package]
    for namespace, packages_by_name in dependencies.items():
        for candidate in packages_by_name.values():
            if candidate["id"] in unused_ids[namespace]:
                continue
            if candidate["source"]:
                continue
            depended_packages.append(candidate)

    for package in depended_packages:
        linked_targets = [
            target
            for target in package["targets"]
            if target != main_target
            and (_is_build(target) or _is_lib_or_proc_macro(target))
        ]
        assert len(linked_targets) <= 2
        for linked_target in linked_targets:
            package_sources = _related_source_files(
                basedir,
                _cargo_metadata_by_manifest_path(
                    pathlib.Path(package["manifest_path"])
                ),
            )
            collected |= _source_files_in_same_targets(
                pathlib.Path(linked_target["src_path"]).resolve(strict=True),
                package_sources,
            )
    return sorted(collected)
def _related_source_files(
    basedir: pathlib.Path, metadata: dict[str, Any]
) -> dict[pathlib.Path, frozenset[pathlib.Path]]:
    """Collects all of the `.rs` files recognized by a workspace.

    The result is memoized per workspace root in
    `_related_source_files_by_workspace`. On the first call for a workspace,
    `cargo check` is run (once, tracked in `_cargo_checked_workspaces`) so that
    the "dep-info" (`.d`) files exist, and those files are then parsed.

    Args:
        basedir (pathlib.Path): A parameter from `Language.list_dependencies`; paths outside it are ignored
        metadata (dict[str, Any]): "metadata" for a Cargo.toml file in the workspace
    Returns:
        dict[pathlib.Path, frozenset[pathlib.Path]]: A (main source file) → (other related files) map
    Raises:
        RuntimeError: If any cargo command fails (as reported by `command_stdout`; not verifiable from this function alone)
        FileNotFoundError: If a path listed in a dep-info file, or a target's `src_path`, does not exist
    """
    workspace_root = pathlib.Path(metadata["workspace_root"])
    if workspace_root in _related_source_files_by_workspace:
        return _related_source_files_by_workspace[workspace_root]

    # Runs `cargo check` to generate `$target_directory/debug/deps/*.d`.
    if workspace_root not in _cargo_checked_workspaces:
        command_stdout(
            [
                "cargo",
                "check",
                "--manifest-path",
                str(pathlib.Path(metadata["workspace_root"], "Cargo.toml")),
                "--workspace",
                "--all-targets",
            ],
            cwd=metadata["workspace_root"],
        )
        _cargo_checked_workspaces.add(workspace_root)

    ret: dict[pathlib.Path, frozenset[pathlib.Path]] = {}

    targets_in_workspace = itertools.chain.from_iterable(
        p["targets"]
        for p in metadata["packages"]
        if p["id"] in metadata["workspace_members"]
    )
    for target in targets_in_workspace:
        # Finds the **latest** "dep-info" file that contains a line in the following format, and parses the line.
        #
        # ```
        # <relative/absolute path to the `.d` file itself>: <relative/absolute path to the root source file> <relative/absolute paths to the other related files>...
        # ```
        #
        # - https://github.com/rust-lang/cargo/blob/rust-1.49.0/src/cargo/core/compiler/fingerprint.rs#L1979-L1997
        # - https://github.com/rust-lang/cargo/blob/rust-1.49.0/src/cargo/core/compiler/fingerprint.rs#L1824-L1830
        if _is_build(target):
            dep_info_paths = pathlib.Path(
                metadata["target_directory"], "debug", "build"
            ).rglob(f"{_crate_name(target)}-*.d")
        elif _is_example(target):
            dep_info_paths = pathlib.Path(
                metadata["target_directory"], "debug", "examples"
            ).glob(f"{_crate_name(target)}-*.d")
        else:
            dep_info_paths = pathlib.Path(
                metadata["target_directory"], "debug", "deps"
            ).glob(f"{_crate_name(target)}-*.d")

        # Newest file first; the first file whose own line names this
        # target's root source wins.
        for dep_info_path in sorted(
            dep_info_paths, key=lambda p: p.stat().st_mtime_ns, reverse=True
        ):
            dep_info = read_text_normalized(dep_info_path)
            for line in dep_info.splitlines():
                ss = line.split(": ")
                if (
                    len(ss) == 2
                    and pathlib.Path(metadata["workspace_root"], ss[0]) == dep_info_path
                ):
                    paths: list[pathlib.Path] = []
                    tokens = iter(ss[1].split())
                    for token in tokens:
                        # A token ending in a backslash is the first half of a
                        # path containing an escaped space: glue the following
                        # token(s) back on.
                        raw_path = token
                        while raw_path.endswith("\\"):
                            raw_path = raw_path.rstrip("\\") + " " + next(tokens)
                        # Resolve the *joined* path, not the raw first token.
                        path = pathlib.Path(
                            metadata["workspace_root"], raw_path
                        ).resolve(strict=True)
                        # Ignores paths that don't start with the `basedir`. (e.g. `/dev/null`, `/usr/local/share/foo/bar`)
                        if path.is_relative_to(basedir):
                            paths.append(path)
                    if paths[:1] == [
                        pathlib.Path(target["src_path"]).resolve(strict=True)
                    ]:
                        ret[paths[0]] = frozenset(paths[1:])
                        break
            else:
                # No usable line in this file; try the next (older) one.
                continue
            break
        else:
            logger.error("no `.d` file for `%s`", target["name"])

    _related_source_files_by_workspace[workspace_root] = ret
    return ret
def _source_files_in_same_targets(
    path: pathlib.Path,
    related_source_files: dict[pathlib.Path, frozenset[pathlib.Path]],
) -> frozenset[pathlib.Path]:
    """Returns the source file paths that share a target with `path`.

    Args:
        path (pathlib.Path): A source file path, usually the main source file of a target
        related_source_files (dict[pathlib.Path, frozenset[pathlib.Path]]): A (main source file) → (other related files) map
    Returns:
        frozenset[pathlib.Path]: `path` plus its related files when `path` is a key of the map;
        otherwise the union of every entry that lists `path` among its related files;
        otherwise just `{path}`.
    """
    # A target's own main source file: it is assumed not to belong to any
    # other target (unless it is weirdly symlinked).
    if path in related_source_files:
        return frozenset({path, *related_source_files[path]})

    # Not a main source file: it may be pulled into several targets, e.g. via
    # `#[path = ".."] mod foo;`, so gather every target that mentions it.
    gathered: set[pathlib.Path] = set()
    for main_source, others in related_source_files.items():
        if path in others:
            gathered.add(main_source)
            gathered.update(others)

    if gathered:
        return frozenset(gathered)
    return frozenset({path})
class RustLanguageEnvironment(LanguageEnvironment):
    """Language environment that builds and runs Rust targets through cargo."""

    @property
    def name(self) -> str:
        """The display name of this environment."""
        return "Rust"

    def get_compile_command(
        self, path: pathlib.Path, *, basedir: pathlib.Path, tempdir: pathlib.Path
    ) -> ShellCommand:
        """Builds the `cargo build --release` command for the target owning `path`.

        `tempdir` is accepted but not used. `_ensure_target` raises if `path`
        is not the main source file of any target.
        """
        source = basedir / path
        target = _ensure_target(_cargo_metadata(cwd=source.parent), source)
        build_command = ["cargo", "build", "--release"]
        build_command.extend(_target_option(target))
        return ShellCommand(command=build_command, cwd=source.parent)

    def get_execute_command(
        self, path: pathlib.Path, *, basedir: pathlib.Path, tempdir: pathlib.Path
    ) -> str:
        """Returns the path of the release binary built for the target owning `path`.

        Non-`bin` targets are looked up in the `examples` subdirectory of the
        release directory. `tempdir` is accepted but not used.
        """
        source = basedir / path
        metadata = _cargo_metadata(cwd=source.parent)
        target = _ensure_target(metadata, source)
        output_dir = pathlib.Path(metadata["target_directory"], "release")
        if not _is_bin(target):
            output_dir = output_dir / "examples"
        return str(output_dir / target["name"])
class RustLanguage(Language):
    """Rust language support, delegating dependency listing to a configurable backend."""

    _list_dependencies_backend: _ListDependenciesBackend

    def __init__(self, *, config: OjVerifyRustConfig | None):
        """Selects the dependency-listing backend from `config`.

        With no config, or no `list_dependencies_backend` in it, `_NoBackend`
        is used.

        Raises:
            RuntimeError: If the configured backend kind is neither 'none' nor 'cargo-udeps'
        """
        backend_config = config.list_dependencies_backend if config else None
        if not backend_config:
            self._list_dependencies_backend = _NoBackend()
            return

        kind = backend_config.kind
        if kind == "none":
            self._list_dependencies_backend = _NoBackend()
        elif kind == "cargo-udeps":
            self._list_dependencies_backend = _CargoUdeps(
                toolchain=backend_config.toolchain,
            )
        else:
            raise RuntimeError(
                "expected 'none' or 'cargo-udeps' for `languages.rust.list_dependencies_backend.kind`"
            )

    def list_dependencies(
        self, path: pathlib.Path, *, basedir: pathlib.Path
    ) -> list[pathlib.Path]:
        """Lists the files `path` depends on using the selected backend."""
        backend = self._list_dependencies_backend
        return backend.list_dependencies(path, basedir=basedir)

    def list_environments(
        self, path: pathlib.Path, *, basedir: pathlib.Path
    ) -> Sequence[RustLanguageEnvironment]:
        """Returns a single `RustLanguageEnvironment`, regardless of the arguments."""
        environment = RustLanguageEnvironment()
        return [environment]
def _cargo_metadata(cwd: pathlib.Path) -> dict[str, Any]:
    """Returns "metadata" for the nearest `Cargo.toml` at or above `cwd`.

    Args:
        cwd (pathlib.Path): The directory to start searching from
    Returns:
        dict[str, Any]: Output of `cargo metadata` command
    Raises:
        ValueError: If `cwd` is not absolute or contains `..`
        RuntimeError: If no `Cargo.toml` is found
    """
    if ".." in cwd.parts or not cwd.is_absolute():
        raise ValueError(
            f"the `cwd` parameter must be absolute and must not contain `..`: {cwd}"
        )

    # Walk upwards from `cwd` to the filesystem root, like cargo itself does.
    # https://docs.rs/cargo/0.49.0/src/cargo/util/important_paths.rs.html#6-20
    directory = cwd
    while True:
        candidate = directory / "Cargo.toml"
        if candidate.exists():
            return _cargo_metadata_by_manifest_path(candidate)
        parent = directory.parent
        if parent == directory:
            # Reached the root without finding a manifest.
            break
        directory = parent

    raise RuntimeError(
        f"could not find `Cargo.toml` in `{cwd}` or any parent directory"
    )
def _cargo_metadata_by_manifest_path(manifest_path: pathlib.Path) -> dict[str, Any]:
    """Returns "metadata" for a certain `Cargo.toml`, using a module-level cache.

    On a cache miss, the metadata of the workspace root manifest is fetched
    and stored under the root manifest path and under the manifest path of
    every workspace member.

    Args:
        manifest_path (pathlib.Path): Path to a `Cargo.toml`
    Returns:
        dict[str, Any]: Output of `cargo metadata` command
    Raises:
        RuntimeError: If the `cargo metadata` command fails (as reported by `_run_cargo_metadata`)
    """
    try:
        return _metadata_by_manifest_path[manifest_path]
    except KeyError:
        pass

    metadata = _run_cargo_metadata(manifest_path)
    root_manifest_path = pathlib.Path(metadata["workspace_root"], "Cargo.toml")
    if root_manifest_path != manifest_path:
        # Re-run against the workspace root manifest so that the cached
        # value is the same for every manifest of the workspace.
        metadata = _run_cargo_metadata(root_manifest_path)

    member_ids = metadata["workspace_members"]
    cache_keys = [root_manifest_path]
    cache_keys.extend(
        pathlib.Path(package["manifest_path"])
        for package in metadata["packages"]
        if package["id"] in member_ids
    )
    _metadata_by_manifest_path.update(dict.fromkeys(cache_keys, metadata))

    return metadata
def _run_cargo_metadata(manifest_path: pathlib.Path) -> dict[str, Any]:
    """Runs `cargo metadata` for a certain `Cargo.toml` and parses its JSON output.

    This function performs no caching itself; it is meant to be executed just
    once for every Cargo.toml in the repository.
    For detailed information about `cargo metadata`, see:

    - <https://doc.rust-lang.org/cargo/commands/cargo-metadata.html#output-format>
    - <https://docs.rs/cargo_metadata>

    Args:
        manifest_path (pathlib.Path): Path to a `Cargo.toml`
    Returns:
        dict[str, Any]: Output of `cargo metadata` command
    Raises:
        RuntimeError: If the `cargo metadata` command fails (as reported by `command_stdout`)
    """
    arguments = [
        "cargo",
        "metadata",
        "--format-version",
        "1",
        "--manifest-path",
        str(manifest_path),
    ]
    # Run from the manifest's own directory.
    raw_output = command_stdout(arguments, cwd=manifest_path.parent)
    return json.loads(raw_output)
515def _find_target(
516 metadata: dict[str, Any],
517 src_path: pathlib.Path,
518) -> tuple[dict[str, Any], dict[str, Any]] | None:
519 for package in metadata["packages"]: 519 ↛ 525line 519 didn't jump to line 525 because the loop on line 519 didn't complete
520 for target in package["targets"]:
521 # A `src_path` may contain `..`
522 # The path may not actually exist by being excluded from the package.
523 if pathlib.Path(target["src_path"]).resolve() == src_path:
524 return package, target
525 return None
def _ensure_target(metadata: dict[str, Any], src_path: pathlib.Path) -> dict[str, Any]:
    """Returns the target whose main source file is `src_path`.

    Raises:
        RuntimeError: If `_find_target` finds no such target
    """
    found = _find_target(metadata, src_path)
    if found:
        _package, target = found
        return target
    raise RuntimeError(f"{src_path} is not a main source file of any target")
def _crate_name(target: dict[str, Any]) -> str:
    """Returns the target's name with every `-` replaced by `_`.

    The result is used in this file to build the glob pattern for the
    target's dep-info (`.d`) files.

    Args:
        target (dict[str, Any]): A target entry from `cargo metadata`
    Returns:
        str: The normalized crate name
    """
    return target["name"].replace("-", "_")
def _is_build(target: dict[str, Any]) -> bool:
    """Tells whether the target's `kind` is exactly `["custom-build"]`."""
    kind = target["kind"]
    return kind == ["custom-build"]
def _is_lib_or_proc_macro(target: dict[str, Any]) -> bool:
    """Tells whether the target's `kind` is exactly `["lib"]` or `["proc-macro"]`."""
    kind = target["kind"]
    return kind == ["lib"] or kind == ["proc-macro"]
def _is_bin(target: dict[str, Any]) -> bool:
    """Tells whether the target's `kind` is exactly `["bin"]`."""
    kind = target["kind"]
    return kind == ["bin"]
def _is_example(target: dict[str, Any]) -> bool:
    """Tells whether the target's `kind` is exactly `["example"]`."""
    kind = target["kind"]
    return kind == ["example"]
def _need_dev_deps(target: dict[str, Any]) -> bool:
    """Tells whether dev-dependencies count for `target`.

    True for every target that is neither a lib/proc-macro nor a bin.
    Comes from https://docs.rs/cargo/0.49.0/cargo/ops/enum.CompileFilter.html#method.need_dev_deps
    """
    return not _is_lib_or_proc_macro(target) and not _is_bin(target)
def _target_option(target: dict[str, Any]) -> list[str]:
    """Returns the cargo command-line option that selects `target`.

    Args:
        target (dict[str, Any]): A target entry from `cargo metadata`
    Returns:
        list[str]: `["--<kind>", <name>]` for a bin, example, test or bench target; `["--lib"]` for anything else
    """
    kind = target["kind"]
    # Same precedence as the option names cargo accepts for named targets.
    for selectable in ("bin", "example", "test", "bench"):
        if kind == [selectable]:
            return [f"--{selectable}", target["name"]]
    return ["--lib"]