Coverage for typed_stream/__main__.py: 78%

143 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-08 22:58 +0000

1# Licensed under the EUPL-1.2 or later. 

2# You may obtain a copy of the licence in all the official languages of the 

3# European Union at https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12 

4 

5"""Easy interface for streamy handling of files.""" 

6 

7from __future__ import annotations 

8 

9import argparse 

10import builtins 

11import collections 

12import dataclasses 

13import inspect 

14import json 

15import operator 

16import sys 

17import textwrap 

18import traceback 

19from collections.abc import Callable, Mapping 

20from typing import cast 

21 

22from ._impl import Stream, functions 

23from ._impl._utils import count_required_positional_arguments 

24 

MODULES = (builtins, collections, functions, operator)

# Maps each module's ``__name__`` to the dotted name that is written into
# generated import statements; only ``functions`` gets a different name.
MODULE_FULL_NAMES: Mapping[str, str] = dict(
    (module.__name__, "typed_stream.functions")
    if module is functions
    else (module.__name__, module.__name__)
    for module in MODULES
)

# Globals handed to ``eval``: every name listed in ``__all__`` of all modules
# except the first one (builtins), followed by each module under its own
# ``__name__``. Later entries win on key collisions.
EVAL_GLOBALS: Mapping[str, object] = {
    **{
        public_name: getattr(module, public_name)
        for module in MODULES[1:]
        for public_name in module.__all__
    },
    **{module.__name__: module for module in MODULES},
}

37 

38 

39@dataclasses.dataclass(frozen=True, slots=True) 

40class Options: 

41 """The options for this cool program.""" 

42 

43 debug: bool 

44 bytes: bool 

45 keep_ends: bool 

46 no_eval: bool 

47 actions: tuple[str, ...] 

48 

49 

def _close_call(code: list[str]) -> None:
    """Terminate the call expression currently being written to *code*.

    A dangling argument separator is replaced by the closing parenthesis;
    otherwise the closing parenthesis is appended.
    """
    if code and code[-1] == ",":
        code[-1] = ")"
    else:
        code.append(")")


def run_program(options: Options) -> str | None:  # noqa: C901
    # pylint: disable=too-complex, too-many-branches, too-many-locals, too-many-statements, line-too-long
    """Run the program with the options.

    A ``Stream`` over stdin (``sys.stdin.buffer`` if ``options.bytes``) is
    created and, unless ``options.keep_ends`` is set, the trailing newline
    of every line is removed. Each entry of ``options.actions`` is then
    either the name of an attribute of the current stream object, which
    starts a new method call, or an argument for the pending call.

    Arguments are resolved in this order: whitespace-only strings are
    passed unchanged, decimal strings become ``int``, then names from
    ``functions.__all__``, ``collections.__all__``, ``operator.__all__``
    and attributes of ``builtins``. Anything else is evaluated with
    ``eval`` unless ``options.no_eval`` is set.

    A resulting ``Stream`` is printed element by element, a ``Mapping`` is
    printed as JSON and every other result that is not ``None`` is printed
    as is. With ``options.debug`` the equivalent Python one-liner is
    written to stderr.

    Returns an error message if the actions could not be processed and
    ``None`` on success.

    >>> import contextlib, io
    >>> in_ = sys.stdin
    >>> sys.stdin = io.StringIO("200\\n1000\\n30\\n4")
    >>> with contextlib.redirect_stderr(io.StringIO()) as err:
    ...     run_program(Options(
    ...         debug=True,
    ...         bytes=False,
    ...         keep_ends=False,
    ...         no_eval=False,
    ...         actions=("map", "int", "sum"),
    ...     ))
    1234
    >>> print("\\n".join(err.getvalue().split("\\n")[-2:]))
    import sys,typed_stream;print(typed_stream.Stream(sys.stdin).map(str.removesuffix, "\\n").map(int).sum())
    <BLANKLINE>
    >>> sys.stdin = io.StringIO("300\\n1000\\n20\\n4")
    >>> with contextlib.redirect_stderr(io.StringIO()) as err:
    ...     run_program(Options(
    ...         debug=True,
    ...         bytes=False,
    ...         keep_ends=False,
    ...         no_eval=False,
    ...         actions=("map", "int", "collect", "builtins.sum")
    ...     ))
    1324
    >>> print("\\n".join(err.getvalue().split("\\n")[-2:]))
    import builtins,sys,typed_stream;print(typed_stream.Stream(sys.stdin).map(str.removesuffix, "\\n").map(int).collect(builtins.sum))
    <BLANKLINE>
    >>> sys.stdin = io.StringIO("")
    >>> with contextlib.redirect_stderr(io.StringIO()) as err:
    ...     ret = run_program(Options(
    ...         debug=True,
    ...         bytes=False,
    ...         keep_ends=False,
    ...         no_eval=False,
    ...         actions=("map", "int", "(°_°)")
    ...     ))
    >>> assert not err.getvalue()
    >>> assert isinstance(ret, str)
    >>> assert "SyntaxError" in ret
    >>> assert "(°_°)" in ret
    >>> sys.stdin = io.StringIO("")
    >>> with contextlib.redirect_stderr(io.StringIO()) as err:
    ...     ret = run_program(Options(
    ...         debug=True,
    ...         bytes=False,
    ...         keep_ends=False,
    ...         no_eval=False,
    ...         actions=("map", "xxx")
    ...     ))
    >>> assert not err.getvalue()
    >>> assert isinstance(ret, str)
    >>> assert "NameError" in ret
    >>> assert "xxx" in ret
    >>> sys.stdin = io.StringIO("")
    >>> with contextlib.redirect_stderr(io.StringIO()) as err:
    ...     ret = run_program(Options(
    ...         debug=True,
    ...         bytes=False,
    ...         keep_ends=False,
    ...         no_eval=True,
    ...         actions=("map", "xxx")
    ...     ))
    >>> assert not err.getvalue()
    >>> print(ret)
    Can't parse 'xxx' without eval.
    >>> sys.stdin = io.StringIO("")
    >>> with contextlib.redirect_stderr(io.StringIO()) as err:
    ...     ret = run_program(Options(
    ...         debug=True,
    ...         bytes=False,
    ...         keep_ends=False,
    ...         no_eval=True,
    ...         actions=("map", "int", "collect", "sum")
    ...     ))
    >>> assert not err.getvalue()
    >>> print(ret)
    StreamableSequence object has no attribute 'sum'. \
To pass it as argument to Stream.collect use 'builtins.sum'.
    >>> sys.stdin = io.TextIOWrapper(io.BytesIO(b"200\\n1000\\n30\\n4"))
    >>> with contextlib.redirect_stderr(io.StringIO()) as err:
    ...     run_program(Options(
    ...         debug=True,
    ...         bytes=True,
    ...         keep_ends=True,
    ...         no_eval=True,
    ...         actions=("flat_map", "iter", "map", "hex", "collect", "Counter")
    ...     ))
    {"0x32": 1, "0x30": 6, "0xa": 3, "0x31": 1, "0x33": 1, "0x34": 1}
    >>> print("\\n".join(err.getvalue().split("\\n")[-2:]))
    import collections,json,sys,typed_stream;print(json.dumps(dict(typed_stream.Stream(sys.stdin.buffer).flat_map(iter).map(hex).collect(collections.Counter))))
    <BLANKLINE>
    >>> sys.stdin = io.TextIOWrapper(io.BytesIO(b"1\\n2\\n3\\n4"))
    >>> with contextlib.redirect_stderr(io.StringIO()) as err:
    ...     run_program(Options(
    ...         debug=True,
    ...         bytes=False,
    ...         keep_ends=True,
    ...         no_eval=True,
    ...         actions=("map", "int", "filter", "is_even", "map", "mul", "10")
    ...     ))
    20
    40
    >>> f"\\n{err.getvalue()}".endswith(
    ...     "Stream(sys.stdin).map(int).filter(typed_stream.functions.is_even)"
    ...     ".map(operator.mul,10).for_each(print)\\n"
    ... )
    True
    >>> sys.stdin = io.StringIO("0\\n0")
    >>> with contextlib.redirect_stderr(io.StringIO()) as err:
    ...     run_program(Options(
    ...         debug=False,
    ...         bytes=False,
    ...         keep_ends=False,
    ...         no_eval=True,
    ...         actions=("map", "int", "sum")
    ...     ))
    0
    >>> sys.stdin = io.StringIO("")
    >>> with contextlib.redirect_stderr(io.StringIO()) as err:
    ...     ret = run_program(Options(
    ...         debug=True,
    ...         bytes=False,
    ...         keep_ends=False,
    ...         no_eval=True,
    ...         actions=("map", "mul", "²")
    ...     ))
    >>> assert not err.getvalue()
    >>> print(ret)
    Can't parse '²' without eval.
    >>> sys.stdin = in_
    """  # noqa: D301
    # ``imports`` and ``code`` collect the pieces of the equivalent Python
    # one-liner that is printed in debug mode.
    imports: set[str] = {"typed_stream", "sys"}
    code: list[str]
    stream: Stream[bytes] | Stream[str] | object
    if options.bytes:
        stream = Stream(sys.stdin.buffer)
        code = ["typed_stream.Stream(sys.stdin.buffer)"]
        if not options.keep_ends:
            code.append(r""".map(bytes.removesuffix, b"\n")""")
            stream = stream.map(bytes.removesuffix, b"\n")
    else:
        stream = Stream(sys.stdin)
        code = ["typed_stream.Stream(sys.stdin)"]
        if not options.keep_ends:
            code.append(r""".map(str.removesuffix, "\n")""")
            stream = stream.map(str.removesuffix, "\n")

    # The pending method call and the arguments collected for it so far.
    method: None | Callable[[object], object] = None
    args: list[object] = []
    for index, action in Stream(options.actions).enumerate(1):
        if action.lstrip().startswith("_"):
            return f"{index}: {action!r} isn't allowed to start with '_'."
        # Number of required positional arguments the pending call lacks.
        args_left = (
            count_required_positional_arguments(method) - len(args)
            if method
            else 0
        )
        if args_left <= 0 and hasattr(stream, action):
            # The action names a method: finish the pending call first.
            if method:
                stream = method(*args)  # pylint: disable=not-callable
                args.clear()
                _close_call(code)
                # The result of the call may lack the requested attribute.
                if not hasattr(stream, action):
                    type_name = (
                        type(stream).__qualname__ or type(stream).__name__
                    )
                    meth_name = method.__qualname__ or method.__name__
                    if hasattr(builtins, action):
                        fix = f"builtins.{action}"
                        confident = True
                    else:
                        fix = f"({action})"
                        confident = action in EVAL_GLOBALS
                    use = "use" if confident else "try"
                    return (
                        f"{type_name} object has no attribute {action!r}. "
                        f"To pass it as argument to {meth_name} {use} {fix!r}."
                    )
            method = getattr(stream, action)
            code.append(f".{action}(")
        else:
            # The action is an argument for the pending method call.
            if not method:
                return f"{action!r} needs to be a Stream method."
            full_action_qual: str
            if action.isspace():
                args.append(action)
                full_action_qual = repr(action)
            elif action.isdecimal():
                # str.isdigit would also accept characters like "²" that
                # int() rejects with a ValueError, so isdecimal is used.
                number = int(action)
                args.append(number)
                # repr of the parsed value keeps the generated code valid
                # for inputs like "007".
                full_action_qual = repr(number)
            elif action in functions.__all__:
                args.append(getattr(functions, action))
                full_action_qual = f"typed_stream.functions.{action}"  # TODO??
            elif action in collections.__all__:
                args.append(getattr(collections, action))
                full_action_qual = f"collections.{action}"
                imports.add("collections")
            elif action in operator.__all__:
                args.append(getattr(operator, action))
                full_action_qual = f"operator.{action}"
                imports.add("operator")
            elif hasattr(builtins, action):
                args.append(getattr(builtins, action))
                full_action_qual = f"{action}"
            elif options.no_eval:
                return f"Can't parse {action!r} without eval."
            else:
                # NOTE: this evaluates a command line argument; it must
                # never be fed from an untrusted source.
                try:
                    # pylint: disable-next=eval-used
                    arg = eval(action, dict(EVAL_GLOBALS))  # nosec: B307
                # pylint: disable-next=broad-except
                except BaseException as exc:  # noqa: B036
                    err = traceback.format_exception_only(exc)[-1].strip()
                    return f"Failed to evaluate {action!r}: {err}"

                # Import every known module the expression is prefixed with.
                imports.update(
                    {
                        full
                        for mod, full in MODULE_FULL_NAMES.items()
                        if action.startswith(f"{mod}.")
                    }
                )
                args.append(arg)
                full_action_qual = action
            code.extend((full_action_qual, ","))
    if method:
        _close_call(code)
        stream = method(*args)

    if isinstance(stream, Stream):
        # pytype: disable=attribute-error
        stream.for_each(print)
        # pytype: enable=attribute-error
        code.append(".for_each(print)")
    elif isinstance(stream, Mapping):
        imports.add("json")
        print(json.dumps(dict(stream)))
        code.insert(0, "print(json.dumps(dict(")
        code.append(")))")
    elif stream is not None:
        # Falsy results such as 0 or "" are output too; only None is not.
        print(stream)
        code.insert(0, "print(")
        code.append(")")

    code.insert(0, f"import {','.join(sorted(imports))};")

    sys.stdout.flush()

    if options.debug:
        print("".join(code), file=sys.stderr, flush=True)
    return None

290 

291 

def main() -> str | None:  # noqa: C901
    """Parse the command line and run the requested actions.

    If the first action is ``help``, the documentation of the ``Stream``
    methods named by the remaining actions is printed (or the general
    usage help if there are none). Otherwise the result of ``run_program``
    is returned.
    """
    parser = argparse.ArgumentParser(
        prog="typed_stream",
        description="Easy interface for streamy handling of files.",
        epilog="Do not run this with arguments from an untrusted source.",
    )
    for flag in ("--debug", "--bytes", "--keep-ends"):
        parser.add_argument(flag, action="store_true")
    # "--no-eval" is not offered on the command line; no_eval is always False.
    parser.add_argument("actions", nargs="+")

    parsed = parser.parse_args()
    options = Options(
        debug=bool(parsed.debug),
        bytes=bool(parsed.bytes),
        keep_ends=bool(parsed.keep_ends),
        no_eval=False,
        actions=tuple(map(str, parsed.actions)),
    )

    if not options.actions or options.actions[0] != "help":
        return run_program(options)

    requested = options.actions[1:]
    if not requested:
        # Parsing "--help" makes argparse print the usage and exit.
        parser.parse_args([sys.argv[0], "--help"])

    for position, name in enumerate(requested):
        if position:
            # Blank line between the entries.
            print()
        print(f"Stream.{name}:")
        method = getattr(Stream, name, None)
        if not method:
            text = "Does not exist."
        else:
            doc = cast(str, getattr(method, "__doc__", ""))
            text = inspect.cleandoc(doc) if doc else "No docs."

        print(textwrap.indent(text, " " * 4))
    return None

332 

333 

if __name__ == "__main__":
    # The value returned by main() (an error message or None) becomes the
    # exit status.
    raise SystemExit(main())