Coverage for typed_stream / __main__.py: 79%

145 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-13 21:20 +0000

1# Licensed under the EUPL-1.2 or later. 

2# You may obtain a copy of the licence in all the official languages of the 

3# European Union at https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12 

4 

5"""Easy interface for streamy handling of files.""" 

6 

7from __future__ import annotations 

8 

9import argparse 

10import builtins 

11import collections 

12import dataclasses 

13import inspect 

14import json 

15import operator 

16import sys 

17import textwrap 

18import traceback 

19from collections.abc import Callable, Mapping 

20from typing import cast 

21 

22from ._impl import Stream, functions 

23from ._impl._utils import count_required_positional_arguments 

24 

# Modules whose members can be referenced by the actions on the command line.
MODULES = (builtins, collections, functions, operator)

# Maps the runtime ``__name__`` of every module in MODULES to the dotted name
# that is written into the generated import statement. Only ``functions``
# differs: it is always spelled "typed_stream.functions".
MODULE_FULL_NAMES: Mapping[str, str] = {
    **{mod.__name__: mod.__name__ for mod in MODULES},
    functions.__name__: "typed_stream.functions",
}

# The globals handed to eval() when an action has to be evaluated.
EVAL_GLOBALS: Mapping[str, object] = {
    # Every name exported through ``__all__``. builtins (MODULES[0]) is
    # skipped here -- presumably because it defines no ``__all__``.
    **{
        name: getattr(mod, name)
        for mod in MODULES[1:]
        for name in mod.__all__
    },
    # The modules themselves, keyed by ``__name__``; they are inserted last
    # and therefore win over an exported name that is spelled the same.
    **{mod.__name__: mod for mod in MODULES},
}

37 

38 

@dataclasses.dataclass(frozen=True, slots=True)
class Options:
    """The options for this cool program.

    Immutable bundle of the command line settings consumed by run_program.
    """

    # Write the equivalent Python one-liner to stderr after running.
    debug: bool
    # Read from sys.stdin.buffer (bytes) instead of sys.stdin (str).
    bytes: bool
    # Keep the trailing newline of every input line instead of removing it.
    keep_ends: bool
    # Return an error instead of calling eval() on an unrecognised action.
    no_eval: bool
    # Stream method names and their arguments, in command line order.
    actions: tuple[str, ...]

48 

49 

def _close_call(code: list[str]) -> None:
    """Close the method call that is currently open at the end of code."""
    if code and code[-1] == ",":
        # The separator written after the last argument becomes the paren.
        code[-1] = ")"
    else:
        code.append(")")


def run_program(options: Options) -> str | None:  # noqa: C901
    # pylint: disable=too-complex, too-many-branches, too-many-locals, too-many-statements, line-too-long
    """Run the program with the options.

    The lines of sys.stdin (sys.stdin.buffer if options.bytes is set) are
    wrapped in a Stream; unless options.keep_ends is set the trailing
    newline of every line is removed first. options.actions is then read
    from left to right: an action that names an attribute of the current
    value starts a new method call as soon as the pending call has all of
    its required positional arguments, every other action becomes an
    argument of the pending call. Arguments are resolved in this order:
    whitespace-only text (passed as str), digits (passed as int), names
    exported by typed_stream.functions, collections and operator, names in
    builtins and finally, unless options.no_eval is set, eval().

    The result is written to stdout: the items of a Stream are printed one
    per line, a Mapping is printed as JSON and anything else that is not
    None is printed as is. If options.debug is set, an equivalent Python
    one-liner is written to stderr afterwards.

    Args:
        options: The settings of this run.

    Returns:
        A message describing why the actions could not be run, None if
        everything went fine.

    >>> import contextlib, io
    >>> in_ = sys.stdin
    >>> sys.stdin = io.StringIO("200\\n1000\\n30\\n4")
    >>> with contextlib.redirect_stderr(io.StringIO()) as err:
    ...     run_program(Options(
    ...         debug=True,
    ...         bytes=False,
    ...         keep_ends=False,
    ...         no_eval=False,
    ...         actions=("map", "int", "sum"),
    ...     ))
    1234
    >>> print("\\n".join(err.getvalue().split("\\n")[-2:]))
    import sys,typed_stream;print(typed_stream.Stream(sys.stdin).map(str.removesuffix, "\\n").map(int).sum())
    <BLANKLINE>
    >>> sys.stdin = io.StringIO("300\\n1000\\n20\\n4")
    >>> with contextlib.redirect_stderr(io.StringIO()) as err:
    ...     run_program(Options(
    ...         debug=True,
    ...         bytes=False,
    ...         keep_ends=False,
    ...         no_eval=False,
    ...         actions=("map", "int", "collect", "builtins.sum")
    ...     ))
    1324
    >>> print("\\n".join(err.getvalue().split("\\n")[-2:]))
    import builtins,sys,typed_stream;print(typed_stream.Stream(sys.stdin).map(str.removesuffix, "\\n").map(int).collect(builtins.sum))
    <BLANKLINE>
    >>> sys.stdin = io.StringIO("")
    >>> with contextlib.redirect_stderr(io.StringIO()) as err:
    ...     ret = run_program(Options(
    ...         debug=True,
    ...         bytes=False,
    ...         keep_ends=False,
    ...         no_eval=False,
    ...         actions=("map", "int", "(°_°)")
    ...     ))
    >>> assert not err.getvalue()
    >>> assert isinstance(ret, str)
    >>> assert "SyntaxError" in ret
    >>> assert "(°_°)" in ret
    >>> sys.stdin = io.StringIO("")
    >>> with contextlib.redirect_stderr(io.StringIO()) as err:
    ...     ret = run_program(Options(
    ...         debug=True,
    ...         bytes=False,
    ...         keep_ends=False,
    ...         no_eval=False,
    ...         actions=("map", "xxx")
    ...     ))
    >>> assert not err.getvalue()
    >>> assert isinstance(ret, str)
    >>> assert "NameError" in ret
    >>> assert "xxx" in ret
    >>> sys.stdin = io.StringIO("")
    >>> with contextlib.redirect_stderr(io.StringIO()) as err:
    ...     ret = run_program(Options(
    ...         debug=True,
    ...         bytes=False,
    ...         keep_ends=False,
    ...         no_eval=True,
    ...         actions=("map", "xxx")
    ...     ))
    >>> assert not err.getvalue()
    >>> print(ret)
    Can't parse 'xxx' without eval.
    >>> sys.stdin = io.StringIO("")
    >>> with contextlib.redirect_stderr(io.StringIO()) as err:
    ...     ret = run_program(Options(
    ...         debug=True,
    ...         bytes=False,
    ...         keep_ends=False,
    ...         no_eval=True,
    ...         actions=("map", "int", "collect", "sum")
    ...     ))
    >>> assert not err.getvalue()
    >>> print(ret)
    StreamableSequence object has no attribute 'sum'. \
To pass it as argument to Stream.collect use 'builtins.sum'.
    >>> sys.stdin = io.TextIOWrapper(io.BytesIO(b"200\\n1000\\n30\\n4"))
    >>> with contextlib.redirect_stderr(io.StringIO()) as err:
    ...     run_program(Options(
    ...         debug=True,
    ...         bytes=True,
    ...         keep_ends=True,
    ...         no_eval=True,
    ...         actions=("flat_map", "iter", "map", "hex", "collect", "Counter")
    ...     ))
    {"0x32": 1, "0x30": 6, "0xa": 3, "0x31": 1, "0x33": 1, "0x34": 1}
    >>> print("\\n".join(err.getvalue().split("\\n")[-2:]))
    import collections,json,sys,typed_stream;print(json.dumps(dict(typed_stream.Stream(sys.stdin.buffer).flat_map(iter).map(hex).collect(collections.Counter))))
    <BLANKLINE>
    >>> sys.stdin = io.TextIOWrapper(io.BytesIO(b"1\\n2\\n3\\n4"))
    >>> with contextlib.redirect_stderr(io.StringIO()) as err:
    ...     run_program(Options(
    ...         debug=True,
    ...         bytes=False,
    ...         keep_ends=True,
    ...         no_eval=True,
    ...         actions=("map", "int", "filter", "is_even", "map", "mul", "10")
    ...     ))
    20
    40
    >>> f"\\n{err.getvalue()}".endswith(
    ...     "Stream(sys.stdin).map(int).filter(typed_stream.functions.is_even)"
    ...     ".map(operator.mul,10).for_each(print)\\n"
    ... )
    True
    >>> sys.stdin = io.StringIO("0")
    >>> run_program(Options(
    ...     debug=False,
    ...     bytes=False,
    ...     keep_ends=False,
    ...     no_eval=True,
    ...     actions=("map", "int", "sum"),
    ... ))
    0
    >>> sys.stdin = in_
    """  # noqa: D301
    # `code` mirrors every step as source text so that --debug can print an
    # equivalent one-liner; `imports` collects the modules that text needs.
    imports: set[str] = {"typed_stream", "sys"}
    code: list[str]
    stream: Stream[bytes] | Stream[str] | object
    if options.bytes:
        bytes_stream = Stream(sys.stdin.buffer)
        code = ["typed_stream.Stream(sys.stdin.buffer)"]
        if not options.keep_ends:
            code.append(r""".map(bytes.removesuffix, b"\n")""")
            bytes_stream = bytes_stream.map(bytes.removesuffix, b"\n")
        stream = bytes_stream
    else:
        str_stream = Stream(sys.stdin)
        code = ["typed_stream.Stream(sys.stdin)"]
        if not options.keep_ends:
            code.append(r""".map(str.removesuffix, "\n")""")
            str_stream = str_stream.map(str.removesuffix, "\n")
        stream = str_stream

    # The bound method whose call is currently being assembled and the
    # arguments that were collected for it so far.
    method: None | Callable[[object], object] = None
    args: list[object] = []
    for index, action in Stream(options.actions).enumerate(1):
        if action.lstrip().startswith("_"):
            return f"{index}: {action!r} isn't allowed to start with '_'."
        args_left = (
            count_required_positional_arguments(method) - len(args)
            if method
            else 0
        )
        if args_left <= 0 and hasattr(stream, action):
            # The pending call is complete and the action names an attribute
            # of the current value: treat it as the next method.
            if method:
                stream = method(*args)
                args.clear()
                _close_call(code)
                if not hasattr(stream, action):
                    # The attribute existed on the old value only, so the
                    # action was most likely meant as an argument.
                    type_name = (
                        type(stream).__qualname__ or type(stream).__name__
                    )
                    meth_name = method.__qualname__ or method.__name__
                    if hasattr(builtins, action):
                        fix = f"builtins.{action}"
                        confident = True
                    else:
                        fix = f"({action})"
                        confident = action in EVAL_GLOBALS
                    use = "use" if confident else "try"
                    return (
                        f"{type_name} object has no attribute {action!r}. "
                        f"To pass it as argument to {meth_name} {use} {fix!r}."
                    )
            method = getattr(stream, action)
            code.append(f".{action}(")
        else:
            if not method:
                return f"{action!r} needs to be a Stream method."
            full_action_qual: str
            if action.isspace():
                args.append(action)
                full_action_qual = repr(action)
            elif action.isdigit():
                args.append(int(action))
                full_action_qual = action
            elif action in functions.__all__:
                args.append(getattr(functions, action))
                full_action_qual = f"typed_stream.functions.{action}"  # TODO??
            elif action in collections.__all__:
                args.append(getattr(collections, action))
                full_action_qual = f"collections.{action}"
                imports.add("collections")
            elif action in operator.__all__:
                args.append(getattr(operator, action))
                full_action_qual = f"operator.{action}"
                imports.add("operator")
            elif hasattr(builtins, action):
                args.append(getattr(builtins, action))
                full_action_qual = f"{action}"
            elif options.no_eval:
                return f"Can't parse {action!r} without eval."
            else:
                # NOTE: eval() runs arbitrary code taken from the command
                # line; the arguments must come from a trusted source.
                try:
                    # pylint: disable-next=eval-used
                    arg = eval(action, dict(EVAL_GLOBALS))  # nosec: B307
                # pylint: disable-next=broad-except
                except BaseException as exc:  # noqa: B036
                    err = traceback.format_exception_only(exc)[-1].strip()
                    return f"Failed to evaluate {action!r}: {err}"

                # Import every known module the expression starts with.
                imports.update(
                    {
                        full
                        for mod, full in MODULE_FULL_NAMES.items()
                        if action.startswith(f"{mod}.")
                    }
                )
                args.append(arg)
                full_action_qual = action
            code.extend((full_action_qual, ","))
    if method:
        # Finish the call that was still open after the last action.
        _close_call(code)
        stream = method(*args)

    if isinstance(stream, Stream):
        # pytype: disable=attribute-error
        stream.for_each(print)
        # pytype: enable=attribute-error
        code.append(".for_each(print)")
    elif isinstance(stream, Mapping):
        imports.add("json")
        print(json.dumps(dict(stream)))
        code.insert(0, "print(json.dumps(dict(")
        code.append(")))")
    elif stream is not None:
        # Compare with None instead of testing truthiness so that falsy
        # results such as 0, False or an empty sequence are printed as well.
        print(stream)
        code.insert(0, "print(")
        code.append(")")

    code.insert(0, f"import {','.join(sorted(imports))};")

    sys.stdout.flush()

    if options.debug:
        print("".join(code), file=sys.stderr, flush=True)
    return None

292 

293 

def main() -> str | None:  # noqa: C901
    """Parse arguments and then run the program.

    If the first action is ``help``, nothing is read from stdin: the docs of
    the Stream methods named by the remaining actions are printed, or the
    general usage if no method is named.

    Returns:
        The error message of run_program, None if there was no error. The
        value is meant to be passed to sys.exit.
    """
    arg_parser = argparse.ArgumentParser(
        prog="typed_stream",
        description="Easy interface for streamy handling of files.",
        epilog="Do not run this with arguments from an untrusted source.",
    )
    arg_parser.add_argument("--debug", action="store_true")
    arg_parser.add_argument("--bytes", action="store_true")
    arg_parser.add_argument("--keep-ends", action="store_true")
    # NOTE(review): --no-eval is not exposed on the command line, which is
    # why no_eval is always False below.
    # arg_parser.add_argument("--no-eval", action="store_true")
    arg_parser.add_argument("actions", nargs="+")

    args = arg_parser.parse_args()
    options = Options(
        debug=bool(args.debug),
        bytes=bool(args.bytes),
        keep_ends=bool(args.keep_ends),
        no_eval=False,
        actions=tuple(map(str, args.actions)),
    )
    if options.actions and options.actions[0] == "help":
        methods = options.actions[1:]
        if not methods:
            # Print the usage directly instead of parsing a fabricated
            # "--help" command line and relying on argparse to exit.
            arg_parser.print_help()
            return None

        for i, name in enumerate(methods):
            if i:
                # Separate the docs of consecutive methods by a blank line.
                print()
            print(f"Stream.{name}:")
            if not (method := getattr(Stream, name, None)):
                to_print = "Does not exist."
            elif not (doc := cast(str, getattr(method, "__doc__", ""))):
                to_print = "No docs."
            else:
                to_print = inspect.cleandoc(doc)

            print(textwrap.indent(to_print, " " * 4))
        return None

    return run_program(options)

334 

335 

if __name__ == "__main__":
    # Use the result of main() as exit status: None exits with 0, an error
    # message is written to stderr and exits with 1.
    raise SystemExit(main())