Coverage for src / competitive_verifier / oj / languages / rust.py: 72%
212 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-04-26 12:38 +0900
« prev ^ index » next coverage.py v7.13.1, created at 2026-04-26 12:38 +0900
1import abc
2import enum
3import functools
4import itertools
5import json
6import pathlib
7import shutil
8from collections import defaultdict
9from collections.abc import Sequence
10from enum import Enum
11from logging import getLogger
12from typing import Any, Literal
14from pydantic import BaseModel, Field
16from competitive_verifier.exec import command_stdout
17from competitive_verifier.models import ShellCommand
18from competitive_verifier.util import read_text_normalized
20from .base import Language, LanguageEnvironment, OjVerifyLanguageConfig
22# ruff: noqa: PLR2004
logger = getLogger(__name__)

# Cache of `cargo metadata` output, keyed by the path of a `Cargo.toml`.
# Filled by `_cargo_metadata_by_manifest_path`.
_metadata_by_manifest_path: dict[pathlib.Path, dict[str, Any]] = {}
# Workspace roots for which `cargo check` has already been run by `_related_source_files`.
_cargo_checked_workspaces: set[pathlib.Path] = set()
# Cache of the (main source file) → (other related files) maps, keyed by workspace root.
_related_source_files_by_workspace: dict[
    pathlib.Path, dict[pathlib.Path, frozenset[pathlib.Path]]
] = {}
class OjVerifyRustListDependenciesBackend(BaseModel):
    """Configuration selecting how dependencies of Rust sources are listed."""

    # Which backend to use; only "none" and "cargo-udeps" are accepted.
    kind: Literal["none", "cargo-udeps"]
    # Toolchain name handed to the `cargo-udeps` backend; when unset (or empty),
    # `RustLanguage` falls back to the default declared on `_CargoUdeps`.
    toolchain: str | None = None
class OjVerifyRustConfig(OjVerifyLanguageConfig):
    """Rust-specific language configuration."""

    # Settings of the dependency-listing backend; `None` is treated by
    # `RustLanguage` the same way as `kind == "none"`.
    list_dependencies_backend: OjVerifyRustListDependenciesBackend | None = None
class _ListDependenciesBackend(abc.ABC, BaseModel):
    """Abstract base for the strategies that list dependencies of a Rust source file."""

    @abc.abstractmethod
    def list_dependencies(
        self, path: pathlib.Path, *, basedir: pathlib.Path
    ) -> list[pathlib.Path]:
        """Lists the files that `path` depends on.

        Args:
            path (pathlib.Path): A main source file path of a target
            basedir (pathlib.Path): A parameter from `Language.list_dependencies`
        Returns:
            list[pathlib.Path]: Paths of the files `path` depends on
        """
        ...
class _NoBackend(_ListDependenciesBackend):
    """Backend that lists dependencies per crate without running `cargo-udeps`."""

    def list_dependencies(
        self, path: pathlib.Path, *, basedir: pathlib.Path
    ) -> list[pathlib.Path]:
        """Lists the dependencies of `path` with unused-dependency detection disabled.

        Args:
            path (pathlib.Path): A main source file path of a target
            basedir (pathlib.Path): A parameter from `Language.list_dependencies`
        Returns:
            list[pathlib.Path]: The result of `_list_dependencies_by_crate`
        """
        # Passing `None` as the toolchain switches the `cargo-udeps` step off.
        dependency_paths = _list_dependencies_by_crate(
            path, basedir=basedir, cargo_udeps_toolchain=None
        )
        return dependency_paths
class _CargoUdeps(_ListDependenciesBackend):
    """Backend that lists dependencies per crate and filters them with `cargo-udeps`."""

    # Toolchain used to run `cargo udeps`.
    toolchain: str = "nightly"

    def list_dependencies(
        self, path: pathlib.Path, *, basedir: pathlib.Path
    ) -> list[pathlib.Path]:
        """Lists the dependencies of `path`, using `cargo-udeps` with `self.toolchain`.

        Args:
            path (pathlib.Path): A main source file path of a target
            basedir (pathlib.Path): A parameter from `Language.list_dependencies`
        Returns:
            list[pathlib.Path]: The result of `_list_dependencies_by_crate`
        """
        selected_toolchain = self.toolchain
        dependency_paths = _list_dependencies_by_crate(
            path, basedir=basedir, cargo_udeps_toolchain=selected_toolchain
        )
        return dependency_paths
@functools.cache
def _list_dependencies_by_crate(
    path: pathlib.Path, *, basedir: pathlib.Path, cargo_udeps_toolchain: str | None
) -> list[pathlib.Path]:
    """The `list_dependencies` implementation for `_NoBackend` and `_CargoUdeps`.

    The result is memoized per argument combination by `functools.cache`, so the
    same list object is handed to every caller that passes equal arguments.

    Args:
        path (pathlib.Path): A main source file path of a target
        basedir (pathlib.Path): A parameter from `Language.list_dependencies`
        cargo_udeps_toolchain (str | None): If not `None`, use `cargo-udeps` with the specified toolchain to detect unused dependencies
    Returns:
        list[pathlib.Path]: A sorted list of dependent `.rs` file paths
    Raises:
        RuntimeError: If any cargo command fails, or if `cargo-udeps` is requested but is not in `$PATH`
    """
    path = basedir / path

    # We regard that a generated file does not depend on any files.
    # A file counts as generated when one of its ancestors is a directory named
    # `target` that sits next to a `Cargo.toml`.
    for parent in path.parents:
        if (parent.parent / "Cargo.toml").exists() and parent.parts[-1] == "target":
            logger.warning("This is a generated file!: %s", path)
            return [path]

    metadata = _cargo_metadata(cwd=path.parent)

    # First, collects source files in the same crate.
    common_result = set(
        _source_files_in_same_targets(path, _related_source_files(basedir, metadata))
    )

    # If `path` is not the main source file of any target, there is no package
    # whose dependencies could be followed, so only the files above are returned.
    main_package_and_target = _find_target(metadata, path)
    if not main_package_and_target:
        return sorted(common_result)
    main_package, main_target = main_package_and_target

    packages_by_id = {p["id"]: p for p in metadata["packages"]}

    class DependencyNamespace(Enum):
        # `build-dependencies` are tracked separately from normal/dev dependencies.
        NORMAL_DEVELOPMENT = enum.auto()
        BUILD = enum.auto()

        @classmethod
        def from_dep_kind(cls, kind: str):
            """Maps a dependency kind to its namespace: "build" → `BUILD`, anything else → `NORMAL_DEVELOPMENT`."""
            if kind == "build":
                return cls.BUILD
            return cls.NORMAL_DEVELOPMENT

    # Collect the `(|dev-|build-)dependencies` into a <is a `build-dependency`> → (<"extern crate name"> → <package>) dictionary.
    dependencies: defaultdict[DependencyNamespace, dict[str, dict[str, Any]]] = (
        defaultdict(dict)
    )
    # Only the resolve node of `main_package` is inspected, i.e. its direct dependencies.
    for dep in next(
        n["deps"] for n in metadata["resolve"]["nodes"] if n["id"] == main_package["id"]
    ):
        # A dependency is taken into the normal/dev namespace when the target needs
        # dev-dependencies, or when one of its kinds is `None`.
        if _need_dev_deps(main_target) or any(
            k["kind"] is None for k in dep["dep_kinds"]
        ):
            dependencies[DependencyNamespace.NORMAL_DEVELOPMENT][dep["name"]] = (
                packages_by_id[dep["pkg"]]
            )
        if any(k["kind"] == "build" for k in dep["dep_kinds"]):
            dependencies[DependencyNamespace.BUILD][dep["name"]] = packages_by_id[
                dep["pkg"]
            ]

    # If `cargo_udeps_toolchain` is present, collects packages that are "unused" by `target`.
    unused_packages: defaultdict[DependencyNamespace, set[Any]] = defaultdict(set)
    if cargo_udeps_toolchain is not None:
        # (namespace, name) pairs of the dependencies that are renamed in Cargo.toml.
        explicit_names_in_toml = {
            (DependencyNamespace.from_dep_kind(d["kind"]), d["rename"])
            for d in main_package["dependencies"]
            if d["rename"]
        }
        if not shutil.which("cargo-udeps"):
            raise RuntimeError("`cargo-udeps` not in $PATH")
        args: list[str] = [
            "rustup",
            "run",
            cargo_udeps_toolchain,
            "cargo",
            "udeps",
            "--output",
            "json",
            "--manifest-path",
            main_package["manifest_path"],
            *_target_option(main_target),
        ]
        # `check=False` is passed to `command_stdout`; presumably the exit status of
        # `cargo udeps` is not treated as an error here — confirm against `command_stdout`.
        unused_deps = json.loads(
            command_stdout(args, cwd=metadata["workspace_root"], check=False)
        )["unused_deps"].values()
        # Picks the report entry that belongs to `main_package`, if there is one.
        unused_dep = next(
            (
                u
                for u in unused_deps
                if u["manifest_path"] == main_package["manifest_path"]
            ),
            None,
        )
        if unused_dep:
            names_in_toml: list[tuple[DependencyNamespace, Any]] = [
                (DependencyNamespace.NORMAL_DEVELOPMENT, name_in_toml)
                for name_in_toml in [*unused_dep["normal"], *unused_dep["development"]]
            ]
            names_in_toml.extend(
                (DependencyNamespace.BUILD, name_in_toml)
                for name_in_toml in unused_dep["build"]
            )
            for dependency_namespace, name_in_toml in names_in_toml:
                if (dependency_namespace, name_in_toml) in explicit_names_in_toml:
                    # If the `name_in_toml` is explicitly renamed one, it equals to the `extern_crate_name`.
                    unused_package: Any = dependencies[dependency_namespace][
                        name_in_toml
                    ]["id"]
                else:
                    # Otherwise, it equals to the `package.name`.
                    unused_package = next(
                        p["id"]
                        for p in dependencies[dependency_namespace].values()
                        if p["name"] == name_in_toml
                    )
                unused_packages[dependency_namespace].add(unused_package)

    # Finally, adds source files related to the depended crates except:
    #
    # - those detected by cargo-udeps
    # - those come from Crates.io or Git repositories (e.g. `proconio`, other people's libraries including `ac-library-rs`)

    # `main_package` should always be included.
    # Note that cargo-udeps does not detect it if it is unused.
    # https://github.com/est31/cargo-udeps/pull/35
    depended_packages = [main_package]
    for dependency_namespace, values in dependencies.items():
        # Packages with a truthy `source` are skipped (see the exceptions listed above).
        depended_packages.extend(
            depended_package
            for depended_package in values.values()
            if (
                depended_package["id"] not in unused_packages[dependency_namespace]
                and not depended_package["source"]
            )
        )

    # NOTE: `ret` is the same set object as `common_result`; it is extended in place below.
    ret = common_result

    for depended_package in depended_packages:
        # Only build scripts and lib/proc-macro targets of a depended package are followed.
        depended_targets = [
            t
            for t in depended_package["targets"]
            if t != main_target and (_is_build(t) or _is_lib_or_proc_macro(t))
        ]
        # Presumably a package has at most one build script and one lib/proc-macro target — TODO confirm.
        assert len(depended_targets) <= 2
        for depended_target in depended_targets:
            related_source_files = _related_source_files(
                basedir,
                _cargo_metadata_by_manifest_path(
                    pathlib.Path(depended_package["manifest_path"])
                ),
            )
            ret |= _source_files_in_same_targets(
                pathlib.Path(depended_target["src_path"]).resolve(strict=True),
                related_source_files,
            )
    return sorted(ret)
def _split_dep_info_paths(field: str) -> list[str]:
    """Splits the dependency part of a dep-info line into path strings.

    A space inside a path is written as a backslash followed by a space, so a
    whitespace-separated token ending with a backslash is joined with the token after it.

    Args:
        field (str): The text after `": "` in a dep-info line
    Returns:
        list[str]: The path strings with the escaped spaces restored
    """
    names: list[str] = []
    tokens = iter(field.split())
    for token in tokens:
        name = token
        while name.endswith("\\"):
            continuation = next(tokens, None)
            if continuation is None:
                # A dangling backslash at the end of the line: nothing is left to join.
                name = name.rstrip("\\")
                break
            name = name.rstrip("\\") + " " + continuation
        names.append(name)
    return names


def _related_source_files(
    basedir: pathlib.Path, metadata: dict[str, Any]
) -> dict[pathlib.Path, frozenset[pathlib.Path]]:
    """Collects all of the `.rs` files recognized by a workspace.

    The result is cached per workspace root in `_related_source_files_by_workspace`.

    Args:
        basedir (pathlib.Path): A parameter from `Language.list_dependencies`
        metadata (dict[str, Any]): "metadata" for a Cargo.toml file in the workspace
    Returns:
        dict[pathlib.Path, frozenset[pathlib.Path]]: A (main source file) → (other related files) map
    Raises:
        RuntimeError: If any cargo command fails
    """
    workspace_root = pathlib.Path(metadata["workspace_root"])
    if workspace_root in _related_source_files_by_workspace:
        return _related_source_files_by_workspace[workspace_root]

    # Runs `cargo check` to generate `$target_directory/debug/deps/*.d`.
    if workspace_root not in _cargo_checked_workspaces:
        command_stdout(
            [
                "cargo",
                "check",
                "--manifest-path",
                str(workspace_root / "Cargo.toml"),
                "--workspace",
                "--all-targets",
            ],
            cwd=metadata["workspace_root"],
        )
        _cargo_checked_workspaces.add(workspace_root)

    ret: dict[pathlib.Path, frozenset[pathlib.Path]] = {}

    targets_in_workspace = itertools.chain.from_iterable(
        p["targets"]
        for p in metadata["packages"]
        if p["id"] in metadata["workspace_members"]
    )
    for target in targets_in_workspace:
        # Finds the **latest** "dep-info" file that contains a line in the following format, and parses the line.
        #
        # ```
        # <relative/absolute path to the `.d` file itself>: <relative/absolute path to the root source file> <relative/absolute paths to the other related files>...
        # ```
        #
        # - https://github.com/rust-lang/cargo/blob/rust-1.49.0/src/cargo/core/compiler/fingerprint.rs#L1979-L1997
        # - https://github.com/rust-lang/cargo/blob/rust-1.49.0/src/cargo/core/compiler/fingerprint.rs#L1824-L1830
        if _is_build(target):
            dep_info_paths = pathlib.Path(
                metadata["target_directory"], "debug", "build"
            ).rglob(f"{_crate_name(target)}-*.d")
        elif _is_example(target):
            dep_info_paths = pathlib.Path(
                metadata["target_directory"], "debug", "examples"
            ).glob(f"{_crate_name(target)}-*.d")
        else:
            dep_info_paths = pathlib.Path(
                metadata["target_directory"], "debug", "deps"
            ).glob(f"{_crate_name(target)}-*.d")
        # Newest file first, so the first matching line wins.
        for dep_info_path in sorted(
            dep_info_paths, key=lambda p: p.stat().st_mtime_ns, reverse=True
        ):
            dep_info = read_text_normalized(dep_info_path)
            for line in dep_info.splitlines():
                fields = line.split(": ")
                if len(fields) == 2 and workspace_root / fields[0] == dep_info_path:
                    paths: list[pathlib.Path] = []
                    # The joined (unescaped) name must be resolved, not the raw token;
                    # otherwise a path containing spaces is looked up by its fragments.
                    for name in _split_dep_info_paths(fields[1]):
                        path = (workspace_root / name).resolve(strict=True)
                        # Ignores paths that don't start with the `basedir`. (e.g. `/dev/null`, `/usr/local/share/foo/bar`)
                        if path.is_relative_to(basedir):
                            paths.append(path)
                    if paths[:1] == [
                        pathlib.Path(target["src_path"]).resolve(strict=True)
                    ]:
                        ret[paths[0]] = frozenset(paths[1:])
                        break
            else:
                # No usable line in this file; try the next (older) dep-info file.
                continue
            break
        else:
            logger.error("no `.d` file for `%s`", target["name"])

    _related_source_files_by_workspace[workspace_root] = ret
    return ret
def _source_files_in_same_targets(
    path: pathlib.Path,
    related_source_files: dict[pathlib.Path, frozenset[pathlib.Path]],
) -> frozenset[pathlib.Path]:
    """Returns the `.rs` file paths that share a target with `path`.

    Args:
        path (pathlib.Path): A source file path
        related_source_files (dict[pathlib.Path, frozenset[pathlib.Path]]): A (main source file) → (other related files) map
    Returns:
        frozenset[pathlib.Path]: `path` plus its related files when `path` is a main source file;
            otherwise every target (main file and related files) that lists `path`;
            or just `path` when nothing refers to it
    """
    # A main source file of a target does not belong to any other target unless it's weirdly symlinked.
    try:
        own_related = related_source_files[path]
    except KeyError:
        pass
    else:
        return frozenset([path]).union(own_related)

    # Otherwise, the file may be used by multiple targets with `#[path = ".."] mod foo;` or something.
    collected: set[pathlib.Path] = set()
    for main_file, others in related_source_files.items():
        if path in others:
            collected.add(main_file)
            collected.update(others)
    if collected:
        return frozenset(collected)
    return frozenset({path})
class RustLanguageEnvironment(LanguageEnvironment):
    """Environment that builds and runs Rust targets through `cargo`."""

    @property
    def name(self) -> str:
        """The display name of this environment."""
        return "Rust"

    def get_compile_command(
        self, path: pathlib.Path, *, basedir: pathlib.Path, tempdir: pathlib.Path
    ) -> ShellCommand:
        """Builds the `cargo build --release` command for the target whose main source is `path`.

        Args:
            path (pathlib.Path): A main source file path of a target
            basedir (pathlib.Path): The directory `path` is joined onto
            tempdir (pathlib.Path): Not used by this implementation
        Returns:
            ShellCommand: The build command, run from the directory of the source file
        Raises:
            RuntimeError: If `path` is not a main source file of any target
        """
        source = basedir / path
        target = _ensure_target(_cargo_metadata(cwd=source.parent), source)
        cargo_build = ["cargo", "build", "--release"]
        cargo_build.extend(_target_option(target))
        return ShellCommand(command=cargo_build, cwd=source.parent)

    def get_execute_command(
        self, path: pathlib.Path, *, basedir: pathlib.Path, tempdir: pathlib.Path
    ) -> str:
        """Returns the path of the release executable built for `path`.

        Args:
            path (pathlib.Path): A main source file path of a target
            basedir (pathlib.Path): The directory `path` is joined onto
            tempdir (pathlib.Path): Not used by this implementation
        Returns:
            str: `<target_directory>/release/<name>` for a `bin` target,
                `<target_directory>/release/examples/<name>` for anything else
        Raises:
            RuntimeError: If `path` is not a main source file of any target
        """
        source = basedir / path
        metadata = _cargo_metadata(cwd=source.parent)
        target = _ensure_target(metadata, source)
        executable = pathlib.Path(metadata["target_directory"], "release")
        if not _is_bin(target):
            executable = executable / "examples"
        return str(executable / target["name"])
class RustLanguage(Language):
    """Language implementation for Rust sources managed by Cargo."""

    config: OjVerifyRustConfig = Field(default_factory=OjVerifyRustConfig)

    @functools.cached_property
    def _list_dependencies_backend(self) -> _ListDependenciesBackend:
        """The dependency-listing backend selected by `self.config`, computed once.

        Raises:
            RuntimeError: If the configured `kind` is neither 'none' nor 'cargo-udeps'
        """
        backend_config = self.config.list_dependencies_backend
        if backend_config is not None and backend_config.kind != "none":
            if backend_config.kind == "cargo-udeps":
                toolchain = backend_config.toolchain
                # An unset or empty toolchain leaves the default of `_CargoUdeps` in place.
                return _CargoUdeps(toolchain=toolchain) if toolchain else _CargoUdeps()
            raise RuntimeError(
                "expected 'none' or 'cargo-udeps' for `languages.rust.list_dependencies_backend.kind`"
            )
        return _NoBackend()

    def list_dependencies(
        self, path: pathlib.Path, *, basedir: pathlib.Path
    ) -> list[pathlib.Path]:
        """Lists the files `path` depends on by delegating to the configured backend."""
        backend = self._list_dependencies_backend
        return backend.list_dependencies(path, basedir=basedir)

    def list_environments(
        self, path: pathlib.Path, *, basedir: pathlib.Path
    ) -> Sequence[RustLanguageEnvironment]:
        """Returns the single Rust environment; `path` and `basedir` are not consulted."""
        environment = RustLanguageEnvironment()
        return [environment]
def _cargo_metadata(cwd: pathlib.Path) -> dict[str, Any]:
    """Returns "metadata" for the nearest `Cargo.toml` in `cwd` or its parent directories.

    Args:
        cwd (pathlib.Path): The current working directory
    Returns:
        dict[str, Any]: Output of `cargo metadata` command
    Raises:
        ValueError: If `cwd` is not absolute or contains `..`
        RuntimeError: If no `Cargo.toml` is found
    """
    if ".." in cwd.parts or not cwd.is_absolute():
        raise ValueError(
            f"the `cwd` parameter must be absolute and must not contain `..`: {cwd}"
        )

    # Walks from `cwd` up to the root, as Cargo does when it looks for a manifest.
    # https://docs.rs/cargo/0.49.0/src/cargo/util/important_paths.rs.html#6-20
    candidates = (directory / "Cargo.toml" for directory in (cwd, *cwd.parents))
    manifest_path = next(
        (candidate for candidate in candidates if candidate.exists()), None
    )
    if manifest_path is None:
        raise RuntimeError(
            f"could not find `Cargo.toml` in `{cwd}` or any parent directory"
        )
    return _cargo_metadata_by_manifest_path(manifest_path)
def _cargo_metadata_by_manifest_path(manifest_path: pathlib.Path) -> dict[str, Any]:
    """Returns "metadata" for a certain `Cargo.toml`, using the module-level cache.

    Args:
        manifest_path (pathlib.Path): Path to a `Cargo.toml`
    Returns:
        dict[str, Any]: Output of `cargo metadata` command
    Raises:
        RuntimeError: If the `cargo metadata` command fails
    """
    try:
        return _metadata_by_manifest_path[manifest_path]
    except KeyError:
        pass

    metadata = _run_cargo_metadata(manifest_path)
    root_manifest_path = pathlib.Path(metadata["workspace_root"], "Cargo.toml")
    # The metadata that gets cached is always the one obtained from the workspace root manifest.
    if root_manifest_path != manifest_path:
        metadata = _run_cargo_metadata(root_manifest_path)

    # Registers the same metadata under the root manifest and under every workspace member.
    cache_keys = [root_manifest_path]
    cache_keys.extend(
        pathlib.Path(package["manifest_path"])
        for package in metadata["packages"]
        if package["id"] in metadata["workspace_members"]
    )
    _metadata_by_manifest_path.update(dict.fromkeys(cache_keys, metadata))

    return metadata
def _run_cargo_metadata(manifest_path: pathlib.Path) -> dict[str, Any]:
    """Runs `cargo metadata` for a certain `Cargo.toml` and parses its JSON output.

    This function is meant to be executed just once for every Cargo.toml in the repository.
    For detailed information about `cargo metadata`, see:

    - <https://doc.rust-lang.org/cargo/commands/cargo-metadata.html#output-format>
    - <https://docs.rs/cargo_metadata>

    Args:
        manifest_path (pathlib.Path): Path to a `Cargo.toml`
    Returns:
        dict[str, Any]: Output of `cargo metadata` command
    Raises:
        RuntimeError: If the `cargo metadata` command fails
    """
    cargo_args = [
        "cargo",
        "metadata",
        "--format-version",
        "1",
        "--manifest-path",
        str(manifest_path),
    ]
    # The command is run from the directory that contains the manifest.
    raw_output = command_stdout(cargo_args, cwd=manifest_path.parent)
    return json.loads(raw_output)
507def _find_target(
508 metadata: dict[str, Any],
509 src_path: pathlib.Path,
510) -> tuple[dict[str, Any], dict[str, Any]] | None:
511 for package in metadata["packages"]: 511 ↛ 517line 511 didn't jump to line 517 because the loop on line 511 didn't complete
512 for target in package["targets"]:
513 # A `src_path` may contain `..`
514 # The path may not actually exist by being excluded from the package.
515 if pathlib.Path(target["src_path"]).resolve() == src_path:
516 return package, target
517 return None
def _ensure_target(metadata: dict[str, Any], src_path: pathlib.Path) -> dict[str, Any]:
    """Returns the target whose main source file is `src_path`.

    Args:
        metadata (dict[str, Any]): Output of `cargo metadata` command
        src_path (pathlib.Path): A main source file path of a target
    Returns:
        dict[str, Any]: The target found by `_find_target`
    Raises:
        RuntimeError: If `src_path` is not a main source file of any target
    """
    found = _find_target(metadata, src_path)
    if found:
        # `found` is a `(package, target)` pair; only the target is needed.
        return found[1]
    raise RuntimeError(f"{src_path} is not a main source file of any target")
def _crate_name(target: dict[str, Any]) -> str:
    """Returns the crate name of a target: its `name` with every `-` replaced by `_`.

    Args:
        target (dict[str, Any]): A target entry that has a `name` key
    Returns:
        str: The name with hyphens turned into underscores
    """
    return target["name"].replace("-", "_")
def _is_build(target: dict[str, Any]) -> bool:
    """Tells whether the `kind` of `target` is exactly `["custom-build"]`."""
    kinds = target["kind"]
    return kinds == ["custom-build"]
def _is_lib_or_proc_macro(target: dict[str, Any]) -> bool:
    """Tells whether the `kind` of `target` is exactly `["lib"]` or exactly `["proc-macro"]`."""
    kinds = target["kind"]
    return kinds == ["lib"] or kinds == ["proc-macro"]
def _is_bin(target: dict[str, Any]) -> bool:
    """Tells whether the `kind` of `target` is exactly `["bin"]`."""
    kinds = target["kind"]
    return kinds == ["bin"]
def _is_example(target: dict[str, Any]) -> bool:
    """Tells whether the `kind` of `target` is exactly `["example"]`."""
    kinds = target["kind"]
    return kinds == ["example"]
def _need_dev_deps(target: dict[str, Any]) -> bool:
    """Tells whether `target` is neither a lib/proc-macro target nor a bin target."""
    # Comes from https://docs.rs/cargo/0.49.0/cargo/ops/enum.CompileFilter.html#method.need_dev_deps
    return not _is_lib_or_proc_macro(target) and not _is_bin(target)
def _target_option(target: dict[str, Any]) -> list[str]:
    """Returns the `cargo` command-line option that selects `target`.

    Args:
        target (dict[str, Any]): A target entry that has `kind` and `name` keys
    Returns:
        list[str]: `[flag, name]` for bin/example/test/bench targets, `["--lib"]` for anything else
    """
    flags_by_kind = (
        ("bin", "--bin"),
        ("example", "--example"),
        ("test", "--test"),
        ("bench", "--bench"),
    )
    for kind, flag in flags_by_kind:
        # Only a `kind` list holding exactly this one entry matches.
        if target["kind"] == [kind]:
            return [flag, target["name"]]
    return ["--lib"]