Coverage for typed_stream/__main__.py: 78%
143 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-08 22:58 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-08 22:58 +0000
1# Licensed under the EUPL-1.2 or later.
2# You may obtain a copy of the licence in all the official languages of the
3# European Union at https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
5"""Easy interface for streamy handling of files."""
7from __future__ import annotations
9import argparse
10import builtins
11import collections
12import dataclasses
13import inspect
14import json
15import operator
16import sys
17import textwrap
18import traceback
19from collections.abc import Callable, Mapping
20from typing import cast
22from ._impl import Stream, functions
23from ._impl._utils import count_required_positional_arguments
25MODULES = (builtins, collections, functions, operator)
26MODULE_FULL_NAMES: Mapping[str, str] = {
27 mod.__name__: (
28 "typed_stream.functions" if mod is functions else mod.__name__
29 )
30 for mod in MODULES
31}
32EVAL_GLOBALS: Mapping[str, object] = dict(
33 Stream(MODULES[1:])
34 .flat_map(lambda mod: ((name, getattr(mod, name)) for name in mod.__all__))
35 .chain((mod.__name__, mod) for mod in MODULES)
36)
39@dataclasses.dataclass(frozen=True, slots=True)
40class Options:
41 """The options for this cool program."""
43 debug: bool
44 bytes: bool
45 keep_ends: bool
46 no_eval: bool
47 actions: tuple[str, ...]
50def run_program(options: Options) -> str | None: # noqa: C901
51 # pylint: disable=too-complex, too-many-branches, too-many-locals, too-many-statements, line-too-long
52 """Run the program with the options.
54 >>> import contextlib, io
55 >>> in_ = sys.stdin
56 >>> sys.stdin = io.StringIO("200\\n1000\\n30\\n4")
57 >>> with contextlib.redirect_stderr(io.StringIO()) as err:
58 ... run_program(Options(
59 ... debug=True,
60 ... bytes=False,
61 ... keep_ends=False,
62 ... no_eval=False,
63 ... actions=("map", "int", "sum"),
64 ... ))
65 1234
66 >>> print("\\n".join(err.getvalue().split("\\n")[-2:]))
67 import sys,typed_stream;print(typed_stream.Stream(sys.stdin).map(str.removesuffix, "\\n").map(int).sum())
68 <BLANKLINE>
69 >>> sys.stdin = io.StringIO("300\\n1000\\n20\\n4")
70 >>> with contextlib.redirect_stderr(io.StringIO()) as err:
71 ... run_program(Options(
72 ... debug=True,
73 ... bytes=False,
74 ... keep_ends=False,
75 ... no_eval=False,
76 ... actions=("map", "int", "collect", "builtins.sum")
77 ... ))
78 1324
79 >>> print("\\n".join(err.getvalue().split("\\n")[-2:]))
80 import builtins,sys,typed_stream;print(typed_stream.Stream(sys.stdin).map(str.removesuffix, "\\n").map(int).collect(builtins.sum))
81 <BLANKLINE>
82 >>> sys.stdin = io.StringIO("")
83 >>> with contextlib.redirect_stderr(io.StringIO()) as err:
84 ... ret = run_program(Options(
85 ... debug=True,
86 ... bytes=False,
87 ... keep_ends=False,
88 ... no_eval=False,
89 ... actions=("map", "int", "(°_°)")
90 ... ))
91 >>> assert not err.getvalue()
92 >>> assert isinstance(ret, str)
93 >>> assert "SyntaxError" in ret
94 >>> assert "(°_°)" in ret
95 >>> sys.stdin = io.StringIO("")
96 >>> with contextlib.redirect_stderr(io.StringIO()) as err:
97 ... ret = run_program(Options(
98 ... debug=True,
99 ... bytes=False,
100 ... keep_ends=False,
101 ... no_eval=False,
102 ... actions=("map", "xxx")
103 ... ))
104 >>> assert not err.getvalue()
105 >>> assert isinstance(ret, str)
106 >>> assert "NameError" in ret
107 >>> assert "xxx" in ret
108 >>> sys.stdin = io.StringIO("")
109 >>> with contextlib.redirect_stderr(io.StringIO()) as err:
110 ... ret = run_program(Options(
111 ... debug=True,
112 ... bytes=False,
113 ... keep_ends=False,
114 ... no_eval=True,
115 ... actions=("map", "xxx")
116 ... ))
117 >>> assert not err.getvalue()
118 >>> print(ret)
119 Can't parse 'xxx' without eval.
120 >>> sys.stdin = io.StringIO("")
121 >>> with contextlib.redirect_stderr(io.StringIO()) as err:
122 ... ret = run_program(Options(
123 ... debug=True,
124 ... bytes=False,
125 ... keep_ends=False,
126 ... no_eval=True,
127 ... actions=("map", "int", "collect", "sum")
128 ... ))
129 >>> assert not err.getvalue()
130 >>> print(ret)
131 StreamableSequence object has no attribute 'sum'. \
132To pass it as argument to Stream.collect use 'builtins.sum'.
133 >>> sys.stdin = io.TextIOWrapper(io.BytesIO(b"200\\n1000\\n30\\n4"))
134 >>> with contextlib.redirect_stderr(io.StringIO()) as err:
135 ... run_program(Options(
136 ... debug=True,
137 ... bytes=True,
138 ... keep_ends=True,
139 ... no_eval=True,
140 ... actions=("flat_map", "iter", "map", "hex", "collect", "Counter")
141 ... ))
142 {"0x32": 1, "0x30": 6, "0xa": 3, "0x31": 1, "0x33": 1, "0x34": 1}
143 >>> print("\\n".join(err.getvalue().split("\\n")[-2:]))
144 import collections,json,sys,typed_stream;print(json.dumps(dict(typed_stream.Stream(sys.stdin.buffer).flat_map(iter).map(hex).collect(collections.Counter))))
145 <BLANKLINE>
146 >>> sys.stdin = io.TextIOWrapper(io.BytesIO(b"1\\n2\\n3\\n4"))
147 >>> with contextlib.redirect_stderr(io.StringIO()) as err:
148 ... run_program(Options(
149 ... debug=True,
150 ... bytes=False,
151 ... keep_ends=True,
152 ... no_eval=True,
153 ... actions=("map", "int", "filter", "is_even", "map", "mul", "10")
154 ... ))
155 20
156 40
157 >>> f"\\n{err.getvalue()}".endswith(
158 ... "Stream(sys.stdin).map(int).filter(typed_stream.functions.is_even)"
159 ... ".map(operator.mul,10).for_each(print)\\n"
160 ... )
161 True
162 >>> sys.stdin = in_
163 """ # noqa: D301
164 imports: set[str] = {"typed_stream", "sys"}
165 code: list[str]
166 stream: Stream[bytes] | Stream[str] | object
167 if options.bytes:
168 stream = Stream(sys.stdin.buffer)
169 code = ["typed_stream.Stream(sys.stdin.buffer)"]
170 if not options.keep_ends:
171 code.append(r""".map(bytes.removesuffix, b"\n")""")
172 stream = stream.map(bytes.removesuffix, b"\n")
173 else:
174 stream = Stream(sys.stdin)
175 code = ["typed_stream.Stream(sys.stdin)"]
176 if not options.keep_ends:
177 code.append(r""".map(str.removesuffix, "\n")""")
178 stream = stream.map(str.removesuffix, "\n")
180 method: None | Callable[[object], object] = None
181 args: list[object] = []
182 for index, action in Stream(options.actions).enumerate(1):
183 if action.lstrip().startswith("_"):
184 return f"{index}: {action!r} isn't allowed to start with '_'."
185 args_left = (
186 count_required_positional_arguments(method) - len(args)
187 if method
188 else 0
189 )
190 if (not args_left or args_left < 0) and hasattr(stream, action):
191 if method:
192 stream = method(*args) # pylint: disable=not-callable
193 args.clear()
194 if code and code[-1] == ",":
195 code[-1] = ")"
196 else:
197 code.append(")")
198 if not hasattr(stream, action):
199 type_name = (
200 type(stream).__qualname__ or type(stream).__name__
201 )
202 meth_name = method.__qualname__ or method.__name__
203 if hasattr(builtins, action):
204 fix = f"builtins.{action}"
205 confident = True
206 else:
207 fix = f"({action})"
208 confident = action in EVAL_GLOBALS
209 use = "use" if confident else "try"
210 return (
211 f"{type_name} object has no attribute {action!r}. "
212 f"To pass it as argument to {meth_name} {use} {fix!r}."
213 )
214 method = getattr(stream, action)
215 code.append(f".{action}(")
216 else:
217 if not method:
218 return f"{action!r} needs to be a Stream method."
219 full_action_qual: str
220 if action.isspace():
221 args.append(action)
222 full_action_qual = repr(action)
223 elif action.isdigit():
224 args.append(int(action))
225 full_action_qual = action
226 elif action in functions.__all__:
227 args.append(getattr(functions, action))
228 full_action_qual = f"typed_stream.functions.{action}" # TODO??
229 elif action in collections.__all__:
230 args.append(getattr(collections, action))
231 full_action_qual = f"collections.{action}"
232 imports.add("collections")
233 elif action in operator.__all__:
234 args.append(getattr(operator, action))
235 full_action_qual = f"operator.{action}"
236 imports.add("operator")
237 elif hasattr(builtins, action):
238 args.append(getattr(builtins, action))
239 full_action_qual = f"{action}"
240 elif options.no_eval:
241 return f"Can't parse {action!r} without eval."
242 else:
243 try:
244 # pylint: disable-next=eval-used
245 arg = eval(action, dict(EVAL_GLOBALS)) # nosec: B307
246 # pylint: disable-next=broad-except
247 except BaseException as exc: # noqa: B036
248 err = traceback.format_exception_only(exc)[-1].strip()
249 return f"Failed to evaluate {action!r}: {err}"
251 imports.update(
252 {
253 full
254 for mod, full in MODULE_FULL_NAMES.items()
255 if action.startswith(f"{mod}.")
256 }
257 )
258 args.append(arg)
259 full_action_qual = action
260 code.extend((full_action_qual, ","))
261 if method:
262 if code and code[-1] == ",":
263 code[-1] = ")"
264 else:
265 code.append(")")
266 stream = method(*args)
268 if isinstance(stream, Stream):
269 # pytype: disable=attribute-error
270 stream.for_each(print)
271 # pytype: enable=attribute-error
272 code.append(".for_each(print)")
273 elif isinstance(stream, Mapping):
274 imports.add("json")
275 print(json.dumps(dict(stream)))
276 code.insert(0, "print(json.dumps(dict(")
277 code.append(")))")
278 elif stream:
279 print(stream)
280 code.insert(0, "print(")
281 code.append(")")
283 code.insert(0, f"import {','.join(sorted(imports))};")
285 sys.stdout.flush()
287 if options.debug:
288 print("".join(code), file=sys.stderr, flush=True)
289 return None
292def main() -> str | None: # noqa: C901
293 """Parse arguments and then run the program."""
294 arg_parser = argparse.ArgumentParser(
295 prog="typed_stream",
296 description="Easy interface for streamy handling of files.",
297 epilog="Do not run this with arguments from an untrusted source.",
298 )
299 arg_parser.add_argument("--debug", action="store_true")
300 arg_parser.add_argument("--bytes", action="store_true")
301 arg_parser.add_argument("--keep-ends", action="store_true")
302 # arg_parser.add_argument("--no-eval", action="store_true")
303 arg_parser.add_argument("actions", nargs="+")
305 args = arg_parser.parse_args()
306 options = Options(
307 debug=bool(args.debug),
308 bytes=bool(args.bytes),
309 keep_ends=bool(args.keep_ends),
310 no_eval=False,
311 actions=tuple(map(str, args.actions)),
312 )
313 if options.actions and options.actions[0] == "help":
314 if not (methods := options.actions[1:]):
315 arg_parser.parse_args([sys.argv[0], "--help"])
317 for i, name in enumerate(methods):
318 if i:
319 print()
320 print(f"Stream.{name}:")
321 if not (method := getattr(Stream, name, None)):
322 to_print = "Does not exist."
323 elif not (doc := cast(str, getattr(method, "__doc__", ""))):
324 to_print = "No docs."
325 else:
326 to_print = inspect.cleandoc(doc)
328 print(textwrap.indent(to_print, " " * 4))
329 return None
331 return run_program(options)
334if __name__ == "__main__":
335 sys.exit(main())