Coverage for src / competitive_verifier / oj / verify / languages / user_defined.py: 85%

68 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-03-05 16:00 +0000

1# Python Version: 3.x 

2import glob 

3import pathlib 

4from collections.abc import Sequence 

5from logging import getLogger 

6from tempfile import TemporaryDirectory 

7 

8from pydantic import BaseModel 

9 

10from competitive_verifier.models import ShellCommand, ShellCommandLike 

11from competitive_verifier.oj.verify.models import ( 

12 Language, 

13 LanguageEnvironment, 

14 OjVerifyUserDefinedConfig, 

15) 

16 

17from . import special_comments 

18 

19logger = getLogger(__name__) 

20 

21StrPath = pathlib.Path | str 

22 

23 

24class PathContainer(BaseModel): 

25 path: StrPath 

26 basedir: StrPath 

27 tempdir: StrPath 

28 

29 def _format(self, fmt: str): 

30 return fmt.format( 

31 path=str(self.path), 

32 dir=str(pathlib.Path(self.path).parent), 

33 basedir=str(self.basedir), 

34 tempdir=str(self.tempdir), 

35 ) 

36 

37 def format_command(self, command: ShellCommandLike) -> ShellCommandLike: 

38 if isinstance(command, str): 

39 return self._format(command) 

40 

41 if not isinstance(command, ShellCommand): 41 ↛ 42line 41 didn't jump to line 42 because the condition on line 41 was never true

42 return list(map(self._format, command)) 

43 

44 cmd = ( 

45 self._format(command.command) 

46 if isinstance(command.command, str) 

47 else list(map(self._format, command.command)) 

48 ) 

49 

50 env = ( 

51 {self._format(k): self._format(v) for k, v in command.env.items()} 

52 if command.env 

53 else None 

54 ) 

55 

56 cwd = pathlib.Path(self._format(str(command.cwd))) if command.cwd else None 

57 

58 return ShellCommand( 

59 command=cmd, 

60 env=env, 

61 cwd=cwd, 

62 ) 

63 

64 def parse_command(self, command: ShellCommandLike) -> ShellCommand: 

65 return ShellCommand.parse_command_like(self.format_command(command)) 

66 

67 

68class UserDefinedLanguageEnvironment(LanguageEnvironment): 

69 config: OjVerifyUserDefinedConfig 

70 _name: str 

71 

72 def __init__(self, *, name: str, config: OjVerifyUserDefinedConfig): 

73 self.config = config 

74 self._name = name 

75 

76 @property 

77 def name(self) -> str: 

78 return self._name 

79 

80 def get_compile_command( 

81 self, path: pathlib.Path, *, basedir: pathlib.Path, tempdir: pathlib.Path 

82 ) -> ShellCommandLike | None: 

83 if self.config.compile: 

84 return PathContainer( 

85 path=path, basedir=basedir, tempdir=tempdir 

86 ).format_command(self.config.compile) 

87 return None 

88 

89 def get_execute_command( 

90 self, path: pathlib.Path, *, basedir: pathlib.Path, tempdir: pathlib.Path 

91 ) -> ShellCommandLike: 

92 return PathContainer( 

93 path=path, basedir=basedir, tempdir=tempdir 

94 ).format_command(self.config.execute) 

95 

96 

97class UserDefinedLanguage(Language): 

98 extension: str 

99 config: OjVerifyUserDefinedConfig 

100 

101 def __init__(self, *, extension: str, config: OjVerifyUserDefinedConfig): 

102 self.extension = extension 

103 self.config = config 

104 

105 def list_attributes( 

106 self, path: pathlib.Path, *, basedir: pathlib.Path 

107 ) -> dict[str, str]: 

108 if self.config.list_attributes is None: 108 ↛ 111line 108 didn't jump to line 111 because the condition on line 108 was always true

109 return dict(special_comments.list_special_comments(path)) 

110 

111 with TemporaryDirectory() as tempdir: 

112 text = ( 

113 PathContainer(path=path, basedir=basedir, tempdir=tempdir) 

114 .parse_command(self.config.list_attributes) 

115 .exec_command(text=False, capture_output=True) 

116 .stdout 

117 ) 

118 attributes: dict[str, str] = {} 

119 for line in text.splitlines(): 

120 key, _, value = line.decode().partition(" ") 

121 attributes[key] = value 

122 return attributes 

123 

124 def list_dependencies( 

125 self, path: pathlib.Path, *, basedir: pathlib.Path 

126 ) -> list[pathlib.Path]: 

127 if self.config.list_dependencies is None: 

128 logger.warning( 

129 "The functionality to list dependencies of .%s file is not implemented yet.", 

130 self.extension, 

131 ) 

132 return [ 

133 path 

134 for path in map(pathlib.Path, glob.glob("**", recursive=True)) 

135 if path.suffix == "." + self.extension 

136 ] 

137 

138 with TemporaryDirectory() as tempdir: 

139 text = ( 

140 PathContainer(path=path, basedir=basedir, tempdir=tempdir) 

141 .parse_command(self.config.list_dependencies) 

142 .exec_command(text=False, capture_output=True) 

143 .stdout 

144 ) 

145 dependencies = [path] 

146 dependencies.extend(pathlib.Path(line.decode()) for line in text.splitlines()) 

147 return dependencies 

148 

149 def bundle(self, path: pathlib.Path, *, basedir: pathlib.Path) -> bytes | None: 

150 if self.config.bundle is None: 

151 return None 

152 with TemporaryDirectory() as tempdir: 

153 return ( 

154 PathContainer(path=path, basedir=basedir, tempdir=tempdir) 

155 .parse_command(self.config.bundle) 

156 .exec_command(text=False, capture_output=True) 

157 .stdout 

158 ) 

159 

160 def list_environments( 

161 self, path: pathlib.Path, *, basedir: pathlib.Path 

162 ) -> Sequence[LanguageEnvironment]: 

163 return [UserDefinedLanguageEnvironment(name=self.extension, config=self.config)]