Coverage for src / competitive_verifier / oj / languages / user_defined.py: 85%

65 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-04-26 12:38 +0900

1# Python Version: 3.x 

2import glob 

3import pathlib 

4from collections.abc import Sequence 

5from logging import getLogger 

6from tempfile import TemporaryDirectory 

7 

8from pydantic import BaseModel 

9 

10from competitive_verifier.models import ShellCommand, ShellCommandLike 

11 

12from . import special_comments 

13from .base import Language, LanguageEnvironment, OjVerifyUserDefinedConfig 

14 

15logger = getLogger(__name__) 

16 

17StrPath = pathlib.Path | str 

18 

19 

20class PathContainer(BaseModel): 

21 path: StrPath 

22 basedir: StrPath 

23 tempdir: StrPath 

24 

25 def _format(self, fmt: str): 

26 return fmt.format( 

27 path=str(self.path), 

28 dir=str(pathlib.Path(self.path).parent), 

29 basedir=str(self.basedir), 

30 tempdir=str(self.tempdir), 

31 ) 

32 

33 def format_command(self, command: ShellCommandLike) -> ShellCommandLike: 

34 if isinstance(command, str): 

35 return self._format(command) 

36 

37 if not isinstance(command, ShellCommand): 37 ↛ 38line 37 didn't jump to line 38 because the condition on line 37 was never true

38 return list(map(self._format, command)) 

39 

40 cmd = ( 

41 self._format(command.command) 

42 if isinstance(command.command, str) 

43 else list(map(self._format, command.command)) 

44 ) 

45 

46 env = ( 

47 {self._format(k): self._format(v) for k, v in command.env.items()} 

48 if command.env 

49 else None 

50 ) 

51 

52 cwd = pathlib.Path(self._format(str(command.cwd))) if command.cwd else None 

53 

54 return ShellCommand( 

55 command=cmd, 

56 env=env, 

57 cwd=cwd, 

58 ) 

59 

60 def parse_command(self, command: ShellCommandLike) -> ShellCommand: 

61 return ShellCommand.parse_command_like(self.format_command(command)) 

62 

63 

64class UserDefinedLanguageEnvironment(LanguageEnvironment): 

65 config: OjVerifyUserDefinedConfig 

66 _name: str 

67 

68 def __init__(self, *, name: str, config: OjVerifyUserDefinedConfig): 

69 self.config = config 

70 self._name = name 

71 

72 @property 

73 def name(self) -> str: 

74 return self._name 

75 

76 def get_compile_command( 

77 self, path: pathlib.Path, *, basedir: pathlib.Path, tempdir: pathlib.Path 

78 ) -> ShellCommandLike | None: 

79 if self.config.compile: 

80 return PathContainer( 

81 path=path, basedir=basedir, tempdir=tempdir 

82 ).format_command(self.config.compile) 

83 return None 

84 

85 def get_execute_command( 

86 self, path: pathlib.Path, *, basedir: pathlib.Path, tempdir: pathlib.Path 

87 ) -> ShellCommandLike: 

88 return PathContainer( 

89 path=path, basedir=basedir, tempdir=tempdir 

90 ).format_command(self.config.execute) 

91 

92 

93class UserDefinedLanguage(Language): 

94 extension: str 

95 config: OjVerifyUserDefinedConfig 

96 

97 def list_attributes( 

98 self, path: pathlib.Path, *, basedir: pathlib.Path 

99 ) -> dict[str, str]: 

100 if self.config.list_attributes is None: 100 ↛ 103line 100 didn't jump to line 103 because the condition on line 100 was always true

101 return dict(special_comments.list_special_comments(path)) 

102 

103 with TemporaryDirectory() as tempdir: 

104 text = ( 

105 PathContainer(path=path, basedir=basedir, tempdir=tempdir) 

106 .parse_command(self.config.list_attributes) 

107 .exec_command(text=False, capture_output=True) 

108 .stdout 

109 ) 

110 attributes: dict[str, str] = {} 

111 for line in text.splitlines(): 

112 key, _, value = line.decode().partition(" ") 

113 attributes[key] = value 

114 return attributes 

115 

116 def list_dependencies( 

117 self, path: pathlib.Path, *, basedir: pathlib.Path 

118 ) -> list[pathlib.Path]: 

119 if self.config.list_dependencies is None: 

120 logger.warning( 

121 "The functionality to list dependencies of .%s file is not implemented yet.", 

122 self.extension, 

123 ) 

124 return [ 

125 path 

126 for path in map(pathlib.Path, glob.glob("**", recursive=True)) 

127 if path.suffix == "." + self.extension 

128 ] 

129 

130 with TemporaryDirectory() as tempdir: 

131 text = ( 

132 PathContainer(path=path, basedir=basedir, tempdir=tempdir) 

133 .parse_command(self.config.list_dependencies) 

134 .exec_command(text=False, capture_output=True) 

135 .stdout 

136 ) 

137 dependencies = [path] 

138 dependencies.extend(pathlib.Path(line.decode()) for line in text.splitlines()) 

139 return dependencies 

140 

141 def bundle(self, path: pathlib.Path, *, basedir: pathlib.Path) -> bytes | None: 

142 if self.config.bundle is None: 

143 return None 

144 with TemporaryDirectory() as tempdir: 

145 return ( 

146 PathContainer(path=path, basedir=basedir, tempdir=tempdir) 

147 .parse_command(self.config.bundle) 

148 .exec_command(text=False, capture_output=True) 

149 .stdout 

150 ) 

151 

152 def list_environments( 

153 self, path: pathlib.Path, *, basedir: pathlib.Path 

154 ) -> Sequence[LanguageEnvironment]: 

155 return [UserDefinedLanguageEnvironment(name=self.extension, config=self.config)]