Coverage for typed_stream / __main__.py: 79%
145 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-13 21:20 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-13 21:20 +0000
1# Licensed under the EUPL-1.2 or later.
2# You may obtain a copy of the licence in all the official languages of the
3# European Union at https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
5"""Easy interface for streamy handling of files."""
7from __future__ import annotations
9import argparse
10import builtins
11import collections
12import dataclasses
13import inspect
14import json
15import operator
16import sys
17import textwrap
18import traceback
19from collections.abc import Callable, Mapping
20from typing import cast
22from ._impl import Stream, functions
23from ._impl._utils import count_required_positional_arguments
25MODULES = (builtins, collections, functions, operator)
26MODULE_FULL_NAMES: Mapping[str, str] = {
27 mod.__name__: (
28 "typed_stream.functions" if mod is functions else mod.__name__
29 )
30 for mod in MODULES
31}
32EVAL_GLOBALS: Mapping[str, object] = dict(
33 Stream(MODULES[1:])
34 .flat_map(lambda mod: ((name, getattr(mod, name)) for name in mod.__all__))
35 .chain((mod.__name__, mod) for mod in MODULES)
36)
39@dataclasses.dataclass(frozen=True, slots=True)
40class Options:
41 """The options for this cool program."""
43 debug: bool
44 bytes: bool
45 keep_ends: bool
46 no_eval: bool
47 actions: tuple[str, ...]
50def run_program(options: Options) -> str | None: # noqa: C901
51 # pylint: disable=too-complex, too-many-branches, too-many-locals, too-many-statements, line-too-long
52 """Run the program with the options.
54 >>> import contextlib, io
55 >>> in_ = sys.stdin
56 >>> sys.stdin = io.StringIO("200\\n1000\\n30\\n4")
57 >>> with contextlib.redirect_stderr(io.StringIO()) as err:
58 ... run_program(Options(
59 ... debug=True,
60 ... bytes=False,
61 ... keep_ends=False,
62 ... no_eval=False,
63 ... actions=("map", "int", "sum"),
64 ... ))
65 1234
66 >>> print("\\n".join(err.getvalue().split("\\n")[-2:]))
67 import sys,typed_stream;print(typed_stream.Stream(sys.stdin).map(str.removesuffix, "\\n").map(int).sum())
68 <BLANKLINE>
69 >>> sys.stdin = io.StringIO("300\\n1000\\n20\\n4")
70 >>> with contextlib.redirect_stderr(io.StringIO()) as err:
71 ... run_program(Options(
72 ... debug=True,
73 ... bytes=False,
74 ... keep_ends=False,
75 ... no_eval=False,
76 ... actions=("map", "int", "collect", "builtins.sum")
77 ... ))
78 1324
79 >>> print("\\n".join(err.getvalue().split("\\n")[-2:]))
80 import builtins,sys,typed_stream;print(typed_stream.Stream(sys.stdin).map(str.removesuffix, "\\n").map(int).collect(builtins.sum))
81 <BLANKLINE>
82 >>> sys.stdin = io.StringIO("")
83 >>> with contextlib.redirect_stderr(io.StringIO()) as err:
84 ... ret = run_program(Options(
85 ... debug=True,
86 ... bytes=False,
87 ... keep_ends=False,
88 ... no_eval=False,
89 ... actions=("map", "int", "(°_°)")
90 ... ))
91 >>> assert not err.getvalue()
92 >>> assert isinstance(ret, str)
93 >>> assert "SyntaxError" in ret
94 >>> assert "(°_°)" in ret
95 >>> sys.stdin = io.StringIO("")
96 >>> with contextlib.redirect_stderr(io.StringIO()) as err:
97 ... ret = run_program(Options(
98 ... debug=True,
99 ... bytes=False,
100 ... keep_ends=False,
101 ... no_eval=False,
102 ... actions=("map", "xxx")
103 ... ))
104 >>> assert not err.getvalue()
105 >>> assert isinstance(ret, str)
106 >>> assert "NameError" in ret
107 >>> assert "xxx" in ret
108 >>> sys.stdin = io.StringIO("")
109 >>> with contextlib.redirect_stderr(io.StringIO()) as err:
110 ... ret = run_program(Options(
111 ... debug=True,
112 ... bytes=False,
113 ... keep_ends=False,
114 ... no_eval=True,
115 ... actions=("map", "xxx")
116 ... ))
117 >>> assert not err.getvalue()
118 >>> print(ret)
119 Can't parse 'xxx' without eval.
120 >>> sys.stdin = io.StringIO("")
121 >>> with contextlib.redirect_stderr(io.StringIO()) as err:
122 ... ret = run_program(Options(
123 ... debug=True,
124 ... bytes=False,
125 ... keep_ends=False,
126 ... no_eval=True,
127 ... actions=("map", "int", "collect", "sum")
128 ... ))
129 >>> assert not err.getvalue()
130 >>> print(ret)
131 StreamableSequence object has no attribute 'sum'. \
132To pass it as argument to Stream.collect use 'builtins.sum'.
133 >>> sys.stdin = io.TextIOWrapper(io.BytesIO(b"200\\n1000\\n30\\n4"))
134 >>> with contextlib.redirect_stderr(io.StringIO()) as err:
135 ... run_program(Options(
136 ... debug=True,
137 ... bytes=True,
138 ... keep_ends=True,
139 ... no_eval=True,
140 ... actions=("flat_map", "iter", "map", "hex", "collect", "Counter")
141 ... ))
142 {"0x32": 1, "0x30": 6, "0xa": 3, "0x31": 1, "0x33": 1, "0x34": 1}
143 >>> print("\\n".join(err.getvalue().split("\\n")[-2:]))
144 import collections,json,sys,typed_stream;print(json.dumps(dict(typed_stream.Stream(sys.stdin.buffer).flat_map(iter).map(hex).collect(collections.Counter))))
145 <BLANKLINE>
146 >>> sys.stdin = io.TextIOWrapper(io.BytesIO(b"1\\n2\\n3\\n4"))
147 >>> with contextlib.redirect_stderr(io.StringIO()) as err:
148 ... run_program(Options(
149 ... debug=True,
150 ... bytes=False,
151 ... keep_ends=True,
152 ... no_eval=True,
153 ... actions=("map", "int", "filter", "is_even", "map", "mul", "10")
154 ... ))
155 20
156 40
157 >>> f"\\n{err.getvalue()}".endswith(
158 ... "Stream(sys.stdin).map(int).filter(typed_stream.functions.is_even)"
159 ... ".map(operator.mul,10).for_each(print)\\n"
160 ... )
161 True
162 >>> sys.stdin = in_
163 """ # noqa: D301
164 imports: set[str] = {"typed_stream", "sys"}
165 code: list[str]
166 stream: Stream[bytes] | Stream[str] | object
167 if options.bytes:
168 bytes_stream = Stream(sys.stdin.buffer)
169 code = ["typed_stream.Stream(sys.stdin.buffer)"]
170 if not options.keep_ends:
171 code.append(r""".map(bytes.removesuffix, b"\n")""")
172 bytes_stream = bytes_stream.map(bytes.removesuffix, b"\n")
173 stream = bytes_stream
174 else:
175 str_stream = Stream(sys.stdin)
176 code = ["typed_stream.Stream(sys.stdin)"]
177 if not options.keep_ends:
178 code.append(r""".map(str.removesuffix, "\n")""")
179 str_stream = str_stream.map(str.removesuffix, "\n")
180 stream = str_stream
182 method: None | Callable[[object], object] = None
183 args: list[object] = []
184 for index, action in Stream(options.actions).enumerate(1):
185 if action.lstrip().startswith("_"):
186 return f"{index}: {action!r} isn't allowed to start with '_'."
187 args_left = (
188 count_required_positional_arguments(method) - len(args)
189 if method
190 else 0
191 )
192 if (not args_left or args_left < 0) and hasattr(stream, action):
193 if method:
194 stream = method(*args)
195 args.clear()
196 if code and code[-1] == ",":
197 code[-1] = ")"
198 else:
199 code.append(")")
200 if not hasattr(stream, action):
201 type_name = (
202 type(stream).__qualname__ or type(stream).__name__
203 )
204 meth_name = method.__qualname__ or method.__name__
205 if hasattr(builtins, action):
206 fix = f"builtins.{action}"
207 confident = True
208 else:
209 fix = f"({action})"
210 confident = action in EVAL_GLOBALS
211 use = "use" if confident else "try"
212 return (
213 f"{type_name} object has no attribute {action!r}. "
214 f"To pass it as argument to {meth_name} {use} {fix!r}."
215 )
216 method = getattr(stream, action)
217 code.append(f".{action}(")
218 else:
219 if not method:
220 return f"{action!r} needs to be a Stream method."
221 full_action_qual: str
222 if action.isspace():
223 args.append(action)
224 full_action_qual = repr(action)
225 elif action.isdigit():
226 args.append(int(action))
227 full_action_qual = action
228 elif action in functions.__all__:
229 args.append(getattr(functions, action))
230 full_action_qual = f"typed_stream.functions.{action}" # TODO??
231 elif action in collections.__all__:
232 args.append(getattr(collections, action))
233 full_action_qual = f"collections.{action}"
234 imports.add("collections")
235 elif action in operator.__all__:
236 args.append(getattr(operator, action))
237 full_action_qual = f"operator.{action}"
238 imports.add("operator")
239 elif hasattr(builtins, action):
240 args.append(getattr(builtins, action))
241 full_action_qual = f"{action}"
242 elif options.no_eval:
243 return f"Can't parse {action!r} without eval."
244 else:
245 try:
246 # pylint: disable-next=eval-used
247 arg = eval(action, dict(EVAL_GLOBALS)) # nosec: B307
248 # pylint: disable-next=broad-except
249 except BaseException as exc: # noqa: B036
250 err = traceback.format_exception_only(exc)[-1].strip()
251 return f"Failed to evaluate {action!r}: {err}"
253 imports.update(
254 {
255 full
256 for mod, full in MODULE_FULL_NAMES.items()
257 if action.startswith(f"{mod}.")
258 }
259 )
260 args.append(arg)
261 full_action_qual = action
262 code.extend((full_action_qual, ","))
263 if method:
264 if code and code[-1] == ",":
265 code[-1] = ")"
266 else:
267 code.append(")")
268 stream = method(*args)
270 if isinstance(stream, Stream):
271 # pytype: disable=attribute-error
272 stream.for_each(print)
273 # pytype: enable=attribute-error
274 code.append(".for_each(print)")
275 elif isinstance(stream, Mapping):
276 imports.add("json")
277 print(json.dumps(dict(stream)))
278 code.insert(0, "print(json.dumps(dict(")
279 code.append(")))")
280 elif stream:
281 print(stream)
282 code.insert(0, "print(")
283 code.append(")")
285 code.insert(0, f"import {','.join(sorted(imports))};")
287 sys.stdout.flush()
289 if options.debug:
290 print("".join(code), file=sys.stderr, flush=True)
291 return None
294def main() -> str | None: # noqa: C901
295 """Parse arguments and then run the program."""
296 arg_parser = argparse.ArgumentParser(
297 prog="typed_stream",
298 description="Easy interface for streamy handling of files.",
299 epilog="Do not run this with arguments from an untrusted source.",
300 )
301 arg_parser.add_argument("--debug", action="store_true")
302 arg_parser.add_argument("--bytes", action="store_true")
303 arg_parser.add_argument("--keep-ends", action="store_true")
304 # arg_parser.add_argument("--no-eval", action="store_true")
305 arg_parser.add_argument("actions", nargs="+")
307 args = arg_parser.parse_args()
308 options = Options(
309 debug=bool(args.debug),
310 bytes=bool(args.bytes),
311 keep_ends=bool(args.keep_ends),
312 no_eval=False,
313 actions=tuple(map(str, args.actions)),
314 )
315 if options.actions and options.actions[0] == "help":
316 if not (methods := options.actions[1:]):
317 arg_parser.parse_args([sys.argv[0], "--help"])
319 for i, name in enumerate(methods):
320 if i:
321 print()
322 print(f"Stream.{name}:")
323 if not (method := getattr(Stream, name, None)):
324 to_print = "Does not exist."
325 elif not (doc := cast(str, getattr(method, "__doc__", ""))):
326 to_print = "No docs."
327 else:
328 to_print = inspect.cleandoc(doc)
330 print(textwrap.indent(to_print, " " * 4))
331 return None
333 return run_program(options)
336if __name__ == "__main__":
337 sys.exit(main())