Coverage for typed_stream/_impl/stream.py: 99%
331 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-12 21:24 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-12 21:24 +0000
1# Licensed under the EUPL-1.2 or later.
2# You may obtain a copy of the licence in all the official languages of the
3# European Union at https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
5"""Typed Stream class for easier handling of iterables."""
7from __future__ import annotations
9import collections
10import concurrent.futures
11import functools
12import itertools
13import operator
14import sys
15import typing
16from collections.abc import Callable, Iterable, Iterator, Mapping
17from numbers import Number, Real
18from types import EllipsisType
20from .. import exceptions, streamable
21from . import _iteration_utils, functions, stream_abc
22from ._default_value import DEFAULT_VALUE as _DEFAULT_VALUE
23from ._default_value import DefaultValueType as _DefaultValueType
24from ._typing import Self, TypeVarTuple, Unpack, override
26# pylint: disable=too-many-lines
27__all__: tuple[typing.Literal["Stream"]] = ("Stream",)
29if typing.TYPE_CHECKING:
30 from . import _types, _utils
32K = typing.TypeVar("K")
33T = typing.TypeVar("T")
34U = typing.TypeVar("U")
35V = typing.TypeVar("V")
36Prim = typing.TypeVar("Prim", int, str, bool, complex, Number, Real)
37Exc = typing.TypeVar("Exc", bound=BaseException)
39SA = typing.TypeVar("SA", bound="_types.SupportsAdd")
40SC = typing.TypeVar("SC", bound="_types.SupportsComparison")
42Tvt = TypeVarTuple("Tvt")
44add: Callable[[SA, SA], SA] = operator.add
47class Stream(stream_abc.StreamABC[T], Iterable[T]):
48 """Typed Stream class for easier handling of iterables.
50 It is not recommended to store Stream instances in variables,
51 instead use method chaining to handle the values and collect them when finished.
52 """
54 __slots__ = ()
56 _data: Iterable[T]
58 def __init__(
59 self,
60 data: Iterable[T] | EllipsisType,
61 close_source_callable: Callable[[], None] | None = None,
62 ) -> None:
63 """Create a new Stream.
65 To create a finished Stream do Stream(...).
66 """
67 super().__init__(
68 ... if isinstance(data, EllipsisType) else data,
69 close_source_callable,
70 )
72 def __contains__(self, value: T, /) -> bool:
73 """Check whether this stream contains the given value.
75 >>> 2 in Stream((1, 2, 3))
76 True
77 >>> 4 in Stream((1, 2, 3))
78 False
79 >>> 3 in Stream((1, 2, 3, 4, 5, 6, 7, 8)).peek(print)
80 1
81 2
82 3
83 True
84 """
85 for element in self._data:
86 if element == value:
87 return self._finish(True, close_source=True)
88 return self._finish(False, close_source=True)
90 @typing.overload
91 def __getitem__(self, item: int, /) -> T:
92 """Nobody inspects the spammish repetition."""
94 @typing.overload
95 def __getitem__(
96 self, item: slice[int | None, int | None, int | None], / # noqa: W504
97 ) -> streamable.StreamableSequence[T]:
98 """Nobody inspects the spammish repetition."""
100 def __getitem__(
101 self,
102 item: slice[int | None, int | None, int | None] | int,
103 /, # noqa: W504
104 ) -> streamable.StreamableSequence[T] | T:
105 """Finish the stream by collecting.
107 >>> Stream((1, 2, 3))[1]
108 2
109 >>> Stream((1, 2, 3))[1:3]
110 (2, 3)
111 """
112 if isinstance(item, int):
113 return self.nth(item)
114 return self._get_slice(start=item.start, stop=item.stop, step=item.step)
116 @override
117 def __iter__(self) -> Iterator[T]:
118 """Iterate over the values of this Stream. This finishes the Stream.
120 >>> for value in Stream((1, 2, 3)):
121 ... print(value)
122 1
123 2
124 3
125 """
126 return _iteration_utils.IterWithCleanUp(self._data, self.close)
128 def __length_hint__(self) -> int:
129 """Return an estimated length for this Stream.
131 >>> from operator import length_hint
132 >>> length_hint(Stream([1, 2, 3]))
133 3
134 >>> length_hint(Stream.range(100))
135 100
136 """
137 return operator.length_hint(self._data)
139 def __reversed__(self) -> Iterator[T]:
140 """Return the items of this Stream in reversed order.
142 This finishes the Stream and collects all the element.
144 Equivalent to reversed(self.collect()).
146 >>> tuple(reversed(Stream((1, 2, 3))))
147 (3, 2, 1)
148 >>> "".join(reversed(Stream("abc")))
149 'cba'
150 """
151 return reversed(self.collect())
153 def _get_slice( # noqa: C901
154 self,
155 *,
156 start: int | None = None,
157 stop: int | None = None,
158 step: int | None = None,
159 ) -> streamable.StreamableSequence[T]:
160 """Implement __getitem__ with slices."""
161 if start is stop is step is None:
162 return self.collect()
163 if ( # pylint: disable=too-many-boolean-expressions
164 (start is None or start >= 0)
165 and (step is None or step >= 0)
166 and (stop is None or stop >= 0)
167 ):
168 return self._finish(
169 streamable.StreamableSequence(
170 itertools.islice(self._data, start, stop, step)
171 ),
172 close_source=True,
173 )
174 if (
175 start is not None
176 and start < 0
177 and step in {None, 1}
178 and stop is None
179 ):
180 return self.tail(abs(start))
181 return self.collect()[start:stop:step]
183 @classmethod
184 @override
185 def _module(cls) -> str:
186 if cls == Stream:
187 return "typed_stream"
188 return cls.__module__
190 @staticmethod
191 def counting(start: int = 0, step: int = 1) -> Stream[int]:
192 """Create an endless counting Stream.
194 >>> Stream.counting().limit(5).collect()
195 (0, 1, 2, 3, 4)
196 >>> Stream.counting(5, 2).limit(5).collect()
197 (5, 7, 9, 11, 13)
198 """
199 return Stream(itertools.count(start, step))
201 @staticmethod
202 def from_value(value: K) -> Stream[K]:
203 """Create an endless Stream of the same value.
205 >>> Stream.from_value(1).limit(5).collect()
206 (1, 1, 1, 1, 1)
207 """
208 return Stream(itertools.repeat(value))
210 @typing.overload
211 @staticmethod
212 def range(stop: int, /) -> Stream[int]: ... # noqa: D102
214 @typing.overload
215 @staticmethod
216 def range(*, stop: int) -> Stream[int]: ... # noqa: D102
218 @typing.overload
219 @staticmethod
220 def range(*, start: int, stop: int) -> Stream[int]: ... # noqa: D102
222 @typing.overload
223 @staticmethod
224 def range(start: int, stop: int, /) -> Stream[int]: ... # noqa: D102
226 @typing.overload
227 @staticmethod
228 def range(start: int, /, *, stop: int) -> Stream[int]: ... # noqa: D102
230 @typing.overload
231 @staticmethod
232 def range( # noqa: D102
233 start: int, stop: int, /, *, step: int
234 ) -> Stream[int]: ...
236 @typing.overload
237 @staticmethod
238 def range( # noqa: D102
239 start: int, stop: int, step: int, / # noqa: W504
240 ) -> Stream[int]: ...
242 @typing.overload
243 @staticmethod
244 def range( # noqa: D102
245 start: int, /, *, stop: int, step: int
246 ) -> Stream[int]: ...
248 @typing.overload
249 @staticmethod
250 def range( # noqa: D102
251 *, start: int, stop: int, step: int
252 ) -> Stream[int]: ...
254 @staticmethod
255 def range( # noqa: C901
256 *args: int,
257 start: int | _DefaultValueType = _DEFAULT_VALUE,
258 stop: int | _DefaultValueType = _DEFAULT_VALUE,
259 step: int | _DefaultValueType = _DEFAULT_VALUE,
260 ) -> Stream[int]:
261 """Create a Stream[int] from a range.
263 The arguments behave like to the built-in range function:
264 - Stream.range(stop) -> Stream[int]
265 - Stream.range(start, stop[, step]) -> Stream[int]
267 >>> Stream.range(5).collect() == Stream(range(5)).collect()
268 True
269 >>> Stream.range(1, 13).collect() == Stream(range(1, 13)).collect()
270 True
271 >>> Stream.range(1, 9, 2).collect() == Stream(range(1, 9, 2)).collect()
272 True
273 >>> Stream.range(start=1, stop=7, step=2).collect()
274 (1, 3, 5)
275 """
276 # pylint: disable=confusing-consecutive-elif
277 if not isinstance(start, _DefaultValueType):
278 if not args and not isinstance(stop, _DefaultValueType):
279 return Stream(
280 range(start, stop)
281 if isinstance(step, _DefaultValueType)
282 else range(start, stop, step)
283 )
284 elif isinstance(stop, _DefaultValueType):
285 if isinstance(step, _DefaultValueType):
286 return Stream(range(*args)) # no kwarg given
287 if len(args) == 2:
288 return Stream(range(args[0], args[1], step))
289 elif isinstance(step, _DefaultValueType):
290 if len(args) == 1:
291 return Stream(range(args[0], stop))
292 if not args:
293 return Stream(range(stop))
294 elif len(args) == 1:
295 return Stream(range(args[0], stop, step))
296 raise TypeError("Unexpected arguments to Stream.range()")
298 def all(self) -> bool:
299 """Check whether all values are Truthy. This finishes the Stream.
301 Returns False if there is any false value in the Stream.
303 >>> Stream([1, 2, 3]).peek(print).all()
304 1
305 2
306 3
307 True
308 >>> Stream([1, 2, 0, 4, 5, 6, 7, 8]).peek(print).all()
309 1
310 2
311 0
312 False
313 >>> Stream([]).all()
314 True
315 """
316 return self._finish(all(self._data), close_source=True)
318 @typing.overload
319 def catch( # noqa: D102
320 self,
321 *exception_class: type[Exc],
322 ) -> Self: ...
324 @typing.overload
325 def catch( # noqa: D102
326 self,
327 *exception_class: type[Exc],
328 handler: Callable[[Exc], object],
329 ) -> Self: ...
331 @typing.overload
332 def catch( # noqa: D102
333 self,
334 *exception_class: type[Exc],
335 default: Callable[[Exc], K] | Callable[[], K],
336 ) -> Stream[T | K]: ...
338 @typing.overload
339 def catch( # noqa: D102
340 self,
341 *exception_class: type[Exc],
342 handler: Callable[[Exc], object],
343 default: Callable[[Exc], K] | Callable[[], K],
344 ) -> Stream[T | K]: ...
346 def catch(
347 self,
348 *exception_class: type[Exc],
349 handler: Callable[[Exc], object] | None = None,
350 default: Callable[[Exc], K] | Callable[[], K] | None = None,
351 ) -> Stream[T | K]:
352 """Catch exceptions.
354 >>> Stream("1a2").map(int).catch(ValueError, handler=print).collect()
355 invalid literal for int() with base 10: 'a'
356 (1, 2)
357 >>> Stream("1a2").map(int).catch(ValueError, default=lambda _:_).collect()
358 (1, ValueError("invalid literal for int() with base 10: 'a'"), 2)
359 >>> Stream("1a2").map(int).peek(print) \
360 .catch(ValueError, handler=print).collect()
361 1
362 invalid literal for int() with base 10: 'a'
363 2
364 (1, 2)
365 """
366 return self._finish(
367 Stream(
368 _iteration_utils.ExceptionHandler(
369 self._data, exception_class, handler, default
370 )
371 )
372 )
374 def chain(self, *iterables: Iterable[T]) -> Self:
375 """Add another iterable to the end of the Stream.
377 >>> Stream([1, 2, 3]).chain([4, 5, 6]).collect()
378 (1, 2, 3, 4, 5, 6)
379 >>> Stream([1, 2, 3]).chain([4, 5, 6], [7, 8, 9]).collect()
380 (1, 2, 3, 4, 5, 6, 7, 8, 9)
381 >>> Stream("abc").chain("def", "ghi", "jkl").collect("".join)
382 'abcdefghijkl'
383 """
384 self._data = itertools.chain(self._data, *iterables)
385 return self
387 if sys.version_info >= (3, 12) and hasattr(itertools, "batched"):
389 def chunk(self, size: int) -> Stream[tuple[T, ...]]:
390 """Split the stream into chunks of the specified size.
392 The last chunk may be shorter.
394 >>> Stream([1, 2, 3, 4, 5, 6]).chunk(2).collect()
395 ((1, 2), (3, 4), (5, 6))
396 >>> Stream([1, 2, 3, 4, 5, 6]).chunk(3).collect()
397 ((1, 2, 3), (4, 5, 6))
398 >>> Stream([1, 2, 3, 4, 5, 6, 7]).chunk(3).collect()
399 ((1, 2, 3), (4, 5, 6), (7,))
400 """
401 return self._finish(
402 Stream(
403 # add strict=False if min version is 3.13 (is the default)
404 itertools.batched(self._data, size), # noqa: B911
405 )
406 )
408 else: # pragma: no cover
410 def chunk(self, size: int) -> Stream[tuple[T, ...]]:
411 """Split the stream into chunks of the specified size.
413 The last chunk may be shorter.
415 >>> Stream([1, 2, 3, 4, 5, 6]).chunk(2).collect()
416 ((1, 2), (3, 4), (5, 6))
417 >>> Stream([1, 2, 3, 4, 5, 6]).chunk(3).collect()
418 ((1, 2, 3), (4, 5, 6))
419 >>> Stream([1, 2, 3, 4, 5, 6, 7]).chunk(3).collect()
420 ((1, 2, 3), (4, 5, 6), (7,))
421 """
422 return self._finish(
423 _iteration_utils.Chunked(self._data, size).stream()
424 )
426 @typing.overload
427 def collect(self, /) -> streamable.StreamableSequence[T]: ... # noqa: D102
429 @typing.overload
430 def collect( # noqa: D102
431 self,
432 fun: Callable[[Iterable[T]], streamable.StreamableSequence[T]],
433 /,
434 ) -> streamable.StreamableSequence[T]: ...
436 @typing.overload
437 def collect( # noqa: D102
438 self,
439 fun: type[collections.Counter[T]],
440 /,
441 ) -> collections.Counter[T]: ...
443 @typing.overload
444 def collect( # noqa: D102
445 self, fun: Callable[[Iterable[T]], tuple[T, ...]], / # noqa: W504
446 ) -> tuple[T, ...]: ...
448 @typing.overload
449 def collect( # noqa: D102
450 self, fun: Callable[[Iterable[T]], list[T]], / # noqa: W504
451 ) -> list[T]: ...
453 @typing.overload
454 def collect( # noqa: D102
455 self: Stream[SA], fun: Callable[[Iterable[SA]], SA], / # noqa: W504
456 ) -> SA: ...
458 @typing.overload
459 def collect( # noqa: D102
460 self, fun: Callable[[Iterable[T]], set[T]], / # noqa: W504
461 ) -> set[T]: ...
463 @typing.overload
464 def collect( # noqa: D102
465 self, fun: Callable[[Iterable[T]], frozenset[T]], / # noqa: W504
466 ) -> frozenset[T]: ...
468 @typing.overload
469 def collect( # noqa: D102
470 self: Stream[tuple[K, V]],
471 fun: Callable[[Iterable[tuple[K, V]]], dict[K, V]],
472 /,
473 ) -> dict[K, V]: ...
475 @typing.overload
476 def collect( # noqa: D102
477 self: Stream[tuple[K, V]],
478 fun: Callable[[Iterable[tuple[K, V]]], Mapping[K, V]],
479 /,
480 ) -> Mapping[K, V]: ...
482 @typing.overload
483 def collect( # noqa: D102
484 self, fun: Callable[[Iterable[T]], K], / # noqa: W504
485 ) -> K: ...
487 def collect(
488 self: Stream[U],
489 fun: Callable[[Iterable[U]], object] = streamable.StreamableSequence,
490 /,
491 ) -> object:
492 """Collect the values of this Stream. This finishes the Stream.
494 >>> Stream([1, 2, 3]).collect(list)
495 [1, 2, 3]
496 >>> Stream([1, 2, 3]).collect(sum)
497 6
498 >>> Stream([1, 2, 3]).collect(dict.fromkeys)
499 {1: None, 2: None, 3: None}
500 >>> Stream([(1, 2), (3, 4)]).collect(dict)
501 {1: 2, 3: 4}
502 """
503 return self._finish(fun(self._data), close_source=True)
505 def concurrent_map(
506 self, fun: Callable[[T], K], /, max_workers: int | None = None
507 ) -> Stream[K]:
508 """Map values concurrently.
510 See: https://docs.python.org/3/library/concurrent.futures.html
512 >>> Stream("123").concurrent_map(int).collect()
513 (1, 2, 3)
514 """
515 with concurrent.futures.ProcessPoolExecutor(
516 max_workers=max_workers
517 ) as executor:
518 return self._finish(Stream(executor.map(fun, self._data)))
520 def conditional_map(
521 self,
522 condition: Callable[[T], object],
523 if_true: Callable[[T], U],
524 if_false: Callable[[T], V] | None = None,
525 ) -> Stream[U | V]:
526 """Map values conditionally.
528 >>> Stream("1x2x3x").conditional_map(str.isdigit, int).collect()
529 (1, 2, 3)
530 >>> Stream("1x2x3x").conditional_map(str.isdigit, int, ord).collect()
531 (1, 120, 2, 120, 3, 120)
532 """
533 return self._finish(
534 Stream(
535 _iteration_utils.IfElseMap(
536 self._data, condition, if_true, if_false
537 )
538 )
539 )
541 def count(self) -> int:
542 """Count the elements in this Stream. This finishes the Stream.
544 Equivalent to: Stream(...).map(lambda x: 1).sum()
546 >>> Stream([1, 2, 3]).count()
547 3
548 >>> Stream("abcdef").count()
549 6
550 """
551 return self._finish(
552 _iteration_utils.count(self._data), close_source=True
553 )
555 def dedup(self, *, key: None | Callable[[T], object] = None) -> Self:
556 """Remove consecutive equal values.
558 If the input is sorted this is the same as Stream.distinct().
560 >>> Stream([1] * 100).dedup().collect(list)
561 [1]
562 >>> Stream([1, 2, 3, 1]).dedup().collect()
563 (1, 2, 3, 1)
564 >>> Stream([1, 1, 2, 2, 2, 2, 3, 1]).dedup().collect()
565 (1, 2, 3, 1)
566 >>> Stream([]).dedup().collect()
567 ()
568 >>> Stream("ABC").dedup(key=str.lower).collect("".join)
569 'ABC'
570 >>> Stream("aAaAbbbcCCAaBbCc").dedup(key=str.lower).collect("".join)
571 'abcABC'
572 """
573 # Inspired by the unique_justseen itertools recipe
574 # https://docs.python.org/3/library/itertools.html#itertools-recipes
575 self._data = map(
576 next,
577 map(
578 operator.itemgetter(1),
579 itertools.groupby(self._data, key=key),
580 ),
581 )
582 return self
584 def dedup_counting(self) -> Stream[tuple[T, int]]:
585 """Group the stream and count the items in the group.
587 >>> Stream("abba").dedup_counting().starmap(print).for_each()
588 a 1
589 b 2
590 a 1
591 >>> Stream("AaaaBBcccc").dedup_counting().starmap(print).for_each()
592 A 1
593 a 3
594 B 2
595 c 4
596 """
598 def _map(k: T, g: Iterator[T]) -> tuple[T, int]:
599 return (k, _iteration_utils.count(g))
601 return Stream(itertools.starmap(_map, itertools.groupby(self)))
603 @override
604 def distinct(self, *, use_set: bool = True) -> Self:
605 """Remove duplicate values.
607 >>> from typed_stream import Stream
608 >>> Stream([1, 2, 2, 2, 3, 2, 2]).distinct().collect()
609 (1, 2, 3)
610 >>> Stream([{1}, {2}, {3}, {2}, {2}]).distinct().collect()
611 Traceback (most recent call last):
612 ...
613 TypeError: unhashable type: 'set'
614 >>> Stream([{1}, {2}, {3}, {2}, {2}]).distinct(use_set=False).collect()
615 ({1}, {2}, {3})
616 """
617 # pylint: disable=duplicate-code
618 encountered: set[T] | list[T]
619 peek_fun: Callable[[T], None]
620 if use_set:
621 encountered = set()
622 peek_fun = encountered.add
623 else:
624 encountered = []
625 peek_fun = encountered.append
627 self._data = map(
628 _iteration_utils.Peeker(peek_fun),
629 itertools.filterfalse(encountered.__contains__, self._data),
630 )
631 return self
633 @override
634 def drop(self, c: int, /) -> Self:
635 """Drop the first count values.
637 >>> Stream([1, 2, 3, 4, 5]).drop(2).collect()
638 (3, 4, 5)
639 """
640 self._data = itertools.islice(self._data, c, None)
641 return self
643 def drop_while(self, fun: Callable[[T], object], /) -> Self:
644 """Drop values as long as the function returns a truthy value.
646 See: https://docs.python.org/3/library/itertools.html#itertools.dropwhile
648 >>> Stream([1, 2, 3, 4, 1]).drop_while(lambda x: x < 3).collect()
649 (3, 4, 1)
650 """
651 self._data = itertools.dropwhile(fun, self._data)
652 return self
654 def empty(self) -> bool:
655 """Check whether this doesn't contain any value. This finishes the Stream.
657 >>> Stream([1, 2, 3]).empty()
658 False
659 >>> Stream([]).empty()
660 True
661 """
662 try:
663 self.first()
664 except exceptions.StreamEmptyError:
665 return True
666 return False
668 def enumerate(
669 self, start_index: int = 0, / # noqa: W504
670 ) -> Stream[_utils.IndexValueTuple[T]]:
671 """Map the values to a tuple of index and value.
673 >>> Stream([1, 2, 3]).enumerate().collect()
674 ((0, 1), (1, 2), (2, 3))
675 >>> Stream("abc").enumerate().collect()
676 ((0, 'a'), (1, 'b'), (2, 'c'))
677 >>> Stream("abc").enumerate(100).collect()
678 ((100, 'a'), (101, 'b'), (102, 'c'))
679 >>> Stream("abc").enumerate().map(lambda el: {el.idx: el.val}).collect()
680 ({0: 'a'}, {1: 'b'}, {2: 'c'})
681 """
682 return self._finish(
683 Stream(_iteration_utils.Enumerator(self._data, start_index))
684 )
686 @typing.overload
687 def exclude( # noqa: D102
688 self: Stream[K | Prim],
689 fun: _utils.InstanceChecker[Prim],
690 /, # noqa: W504
691 ) -> Stream[K]: ...
693 @typing.overload
694 def exclude( # noqa: D102
695 self: Stream[K | U], fun: _utils.InstanceChecker[U], / # noqa: W504
696 ) -> Stream[K]: ...
698 @typing.overload
699 def exclude( # noqa: D102
700 self: Stream[K | None], fun: _utils.NoneChecker, / # noqa: W504
701 ) -> Stream[K]: ...
703 # @typing.overload
704 # def exclude( # noqa: D102
705 # self: Stream[K | U], fun: TypeGuardingCallable[U, K | U]
706 # ) -> Stream[K]:
707 # ...
709 @typing.overload
710 def exclude( # noqa: D102
711 self: Stream[T], fun: Callable[[T], object], / # noqa: W504
712 ) -> Stream[T]: ...
714 @override
715 def exclude(self, fun: Callable[[T], object], /) -> object:
716 """Exclude values if the function returns a truthy value.
718 See: https://docs.python.org/3/library/itertools.html#itertools.filterfalse
720 >>> Stream([1, 2, 3, 4, 5]).exclude(lambda x: x % 2).collect()
721 (2, 4)
722 >>> Stream([1, 2, None, 3]).exclude(lambda x: x is None).collect()
723 (1, 2, 3)
724 """
725 self._data = itertools.filterfalse(fun, self._data)
726 return self
728 @typing.overload
729 def filter( # noqa: D102
730 self: Stream[K | None], fun: _utils.NotNoneChecker
731 ) -> Stream[K]: ...
733 @typing.overload
734 def filter(self: Stream[K | None]) -> Stream[K]: ... # noqa: D102
736 @typing.overload
737 def filter(self: Stream[T]) -> Stream[T]: ... # noqa: D102
739 @typing.overload
740 def filter( # noqa: D102
741 self, fun: _utils.InstanceChecker[K]
742 ) -> Stream[K]: # pragma: no cover
743 ...
745 @typing.overload
746 def filter( # noqa: D102
747 self, fun: _types.TypeGuardingCallable[K, T]
748 ) -> Stream[K]: # pragma: no cover
749 ...
751 @typing.overload
752 def filter( # noqa: D102
753 self: Stream[T], fun: Callable[[T], object]
754 ) -> Stream[T]: ...
756 def filter(self, fun: Callable[[T], object] | None = None) -> object:
757 """Use built-in filter to filter values.
759 >>> Stream([1, 2, 3, 4, 5]).filter(lambda x: x % 2).collect()
760 (1, 3, 5)
761 """
762 self._data = filter(fun, self._data)
763 return self
765 def first(self, default: K | _DefaultValueType = _DEFAULT_VALUE) -> T | K:
766 """Return the first element of the Stream. This finishes the Stream.
768 >>> Stream([1, 2, 3]).first()
769 1
770 >>> Stream("abc").first()
771 'a'
772 >>> Stream([]).first()
773 Traceback (most recent call last):
774 ...
775 typed_stream.exceptions.StreamEmptyError
776 >>> Stream([]).first(default="default")
777 'default'
778 """
779 try:
780 first = next(iter(self._data))
781 except StopIteration:
782 if not isinstance(default, _DefaultValueType):
783 return default
784 raise exceptions.StreamEmptyError() from None
785 finally:
786 self._finish(None, close_source=True)
787 return first
789 @typing.overload
790 def flat_map( # noqa: D102
791 self, fun: Callable[[T], Iterable[K]], / # noqa: W504
792 ) -> Stream[K]: ...
794 @typing.overload
795 def flat_map( # noqa: D102
796 self,
797 fun: Callable[[T, Unpack[Tvt]], Iterable[K]],
798 /,
799 *args: Unpack[Tvt],
800 ) -> Stream[K]: ...
802 def flat_map(
803 self,
804 fun: Callable[[T, Unpack[Tvt]], Iterable[K]],
805 /,
806 *args: Unpack[Tvt],
807 ) -> Stream[K]:
808 """Map each value to another.
810 This lazily finishes the current Stream and creates a new one.
812 >>> Stream([1, 4, 7]).flat_map(lambda x: [x, x + 1, x + 2]).collect()
813 (1, 2, 3, 4, 5, 6, 7, 8, 9)
814 >>> Stream(["abc", "def"]).flat_map(str.encode, "ASCII").collect()
815 (97, 98, 99, 100, 101, 102)
816 """
817 return Stream(
818 itertools.chain.from_iterable(
819 map(fun, self._data, *(itertools.repeat(arg) for arg in args))
820 )
821 )
823 def for_each(self, fun: Callable[[T], object] = functions.noop, /) -> None:
824 """Consume all the values of the Stream with the callable.
826 >>> Stream([1, 2, 3]).for_each(print)
827 1
828 2
829 3
830 """
831 for value in self._data:
832 fun(value)
833 self._finish(None, close_source=True)
835 def last(self) -> T:
836 """Return the last element of the Stream. This finishes the Stream.
838 raises StreamEmptyError if stream is empty.
840 >>> Stream([1, 2, 3]).last()
841 3
842 >>> Stream([]).last()
843 Traceback (most recent call last):
844 ...
845 typed_stream.exceptions.StreamEmptyError
846 """
847 if tail := self.tail(1):
848 return tail[-1]
849 raise exceptions.StreamEmptyError()
851 @override
852 def limit(self, c: int, /) -> Self:
853 """Stop the Stream after count values.
855 >>> Stream([1, 2, 3, 4, 5]).limit(3).collect()
856 (1, 2, 3)
857 >>> Stream.from_value(3).limit(1000).collect() == (3,) * 1000
858 True
859 """
860 self._data = itertools.islice(self._data, c)
861 return self
863 @typing.overload
864 def map(self, fun: Callable[[T], K], /) -> Stream[K]: ... # noqa: D102
866 @typing.overload
867 def map( # noqa: D102
868 self, fun: Callable[[T, Unpack[Tvt]], K], /, *args: Unpack[Tvt]
869 ) -> Stream[K]: ...
871 def map(
872 self, fun: Callable[[T, Unpack[Tvt]], K], /, *args: Unpack[Tvt]
873 ) -> Stream[K]:
874 """Map each value to another.
876 This lazily finishes the current Stream and creates a new one.
878 >>> Stream([1, 2, 3]).map(lambda x: x * 3).collect()
879 (3, 6, 9)
880 >>> Stream([1, 2, 3]).map(operator.mul, 3).collect()
881 (3, 6, 9)
882 """
883 return self._finish(
884 Stream(
885 map(fun, self._data, *(itertools.repeat(arg) for arg in args))
886 )
887 )
889 @typing.overload
890 def max(self: Stream[SC]) -> SC: ... # noqa: D102
892 @typing.overload
893 def max( # noqa: D102
894 self: Stream[SC], default: K | _DefaultValueType = _DEFAULT_VALUE
895 ) -> SC | K: ...
897 @typing.overload
898 def max( # noqa: D102
899 self,
900 default: K | _DefaultValueType = _DEFAULT_VALUE,
901 *,
902 key: Callable[[T], SC],
903 ) -> T | K: ...
905 def max(
906 self,
907 default: object = _DEFAULT_VALUE,
908 *,
909 key: Callable[[T], SC] | None = None,
910 ) -> object:
911 """Return the biggest element of the stream.
913 >>> Stream([3, 2, 1]).max()
914 3
915 >>> Stream(["a", "b", "c"]).max()
916 'c'
917 >>> Stream(["abc", "de", "f"]).max(key=len)
918 'abc'
919 >>> Stream([]).max(default=0)
920 0
921 >>> Stream([]).max()
922 Traceback (most recent call last):
923 ...
924 typed_stream.exceptions.StreamEmptyError
925 """
926 max_ = max(self._data, default=default, key=key) # type: ignore[type-var,arg-type]
927 if isinstance(max_, _DefaultValueType):
928 raise exceptions.StreamEmptyError() from None
929 return self._finish(max_, close_source=True)
931 @typing.overload
932 def min(self: Stream[SC]) -> SC: ... # noqa: D102
934 @typing.overload
935 def min( # noqa: D102
936 self: Stream[SC], default: K | _DefaultValueType = _DEFAULT_VALUE
937 ) -> SC | K: ...
939 @typing.overload
940 def min( # noqa: D102
941 self,
942 default: K | _DefaultValueType = _DEFAULT_VALUE,
943 *,
944 key: Callable[[T], SC],
945 ) -> T | K: ...
947 def min(
948 self,
949 default: object = _DEFAULT_VALUE,
950 *,
951 key: Callable[[T], SC] | None = None,
952 ) -> object:
953 """Return the smallest element of the stream.
955 >>> Stream([1, 2, 3]).min()
956 1
957 >>> Stream(["a", "b", "c"]).min()
958 'a'
959 >>> Stream(["abc", "de", "f"]).min(key=len)
960 'f'
961 >>> Stream([]).min(default=0)
962 0
963 >>> Stream([]).min()
964 Traceback (most recent call last):
965 ...
966 typed_stream.exceptions.StreamEmptyError
967 """
968 min_ = min(self._data, default=default, key=key) # type: ignore[type-var,arg-type]
969 if isinstance(min_, _DefaultValueType):
970 raise exceptions.StreamEmptyError() from None
971 return self._finish(min_, close_source=True)
973 @typing.overload
974 def nth(self, index: int, /) -> T: ... # noqa: D102
976 @typing.overload
977 def nth(self, index: int, /, default: T) -> T: ... # noqa: D102
979 @typing.overload
980 def nth(self, index: int, /, default: K) -> T | K: ... # noqa: D102
982 def nth( # noqa: C901
983 self,
984 index: int,
985 /,
986 default: K | _DefaultValueType = _DEFAULT_VALUE,
987 ) -> T | K:
988 """Return the nth item of the stream.
990 Raises StreamIndexError if no default value is given and the Stream
991 does not have an item at the given index.
993 Stream(...).nth(0) gets the first element of the stream.
995 >>> Stream([1, 2, 3]).nth(0)
996 1
997 >>> Stream("abc").nth(1)
998 'b'
999 >>> Stream([]).nth(22)
1000 Traceback (most recent call last):
1001 ...
1002 typed_stream.exceptions.StreamIndexError
1003 >>> Stream([]).nth(22, default=42)
1004 42
1005 """
1006 value: T | _DefaultValueType
1007 if index < 0:
1008 tail = self.tail(abs(index))
1009 value = tail[0] if len(tail) == abs(index) else _DEFAULT_VALUE
1010 else: # value >= 0
1011 try:
1012 value = self.drop(index).first()
1013 except exceptions.StreamEmptyError:
1014 value = _DEFAULT_VALUE
1016 if not isinstance(value, _DefaultValueType):
1017 return value
1019 if isinstance(default, _DefaultValueType):
1020 raise exceptions.StreamIndexError()
1022 return default
1024 @typing.overload
1025 def nwise( # noqa: D102
1026 self, size: typing.Literal[1], / # noqa: W504
1027 ) -> Stream[tuple[T]]: ...
1029 @typing.overload
1030 def nwise( # noqa: D102
1031 self, size: typing.Literal[2], / # noqa: W504
1032 ) -> Stream[tuple[T, T]]: ... # noqa: D102
1034 @typing.overload
1035 def nwise( # noqa: D102
1036 self, size: int, / # noqa: W504
1037 ) -> Stream[tuple[T, ...]]: ...
1039 def nwise(
1040 self, size: int, / # noqa: W504
1041 ) -> Stream[tuple[T, ...]] | Stream[tuple[T, T]] | Stream[tuple[T]]:
1042 """Return a Stream of overlapping n-lets.
1044 This is often called a sliding window.
1045 For n=2 it behaves like pairwise from itertools.
1047 The returned Stream will consist of tuples of length n.
1048 If n is bigger than the count of values in self, it will be empty.
1050 >>> Stream([1, 2, 3]).nwise(1).collect()
1051 ((1,), (2,), (3,))
1052 >>> Stream([1, 2, 3]).nwise(2).collect()
1053 ((1, 2), (2, 3))
1054 >>> Stream([1, 2, 3, 4]).nwise(3).collect()
1055 ((1, 2, 3), (2, 3, 4))
1056 >>> Stream([1, 2, 3, 4, 5]).nwise(4).collect()
1057 ((1, 2, 3, 4), (2, 3, 4, 5))
1058 """
1059 return self._finish(
1060 Stream(_iteration_utils.sliding_window(self._data, size))
1061 )
1063 @override
1064 def peek(self, fun: Callable[[T], object], /) -> Self:
1065 """Peek at every value, without modifying the values in the Stream.
1067 >>> stream = Stream([1, 2, 3]).peek(print)
1068 >>> stream.map(str).collect()
1069 1
1070 2
1071 3
1072 ('1', '2', '3')
1073 """
1074 self._data = map(_iteration_utils.Peeker(fun), self._data)
1075 return self
1077 def reduce(
1078 self,
1079 fun: Callable[[T, T], T],
1080 initial: T | _DefaultValueType = _DEFAULT_VALUE,
1081 ) -> T:
1082 """Reduce the values of this stream. This finishes the Stream.
1084 If no initial value is provided a StreamEmptyError is raised if
1085 the stream is empty.
1087 >>> Stream([1, 2, 3]).reduce(operator.add)
1088 6
1089 >>> Stream([1, 2, 3]).reduce(operator.mul)
1090 6
1091 >>> Stream([]).reduce(operator.mul)
1092 Traceback (most recent call last):
1093 ...
1094 typed_stream.exceptions.StreamEmptyError
1095 """
1096 iterator = iter(self._data)
1097 if isinstance(initial, _DefaultValueType):
1098 try:
1099 initial = next(iterator)
1100 except StopIteration:
1101 raise exceptions.StreamEmptyError() from None
1102 return self._finish(functools.reduce(fun, iterator, initial), True)
1104 def starcollect(self, fun: _types.StarCallable[T, K]) -> K:
1105 """Collect the values of this Stream. This finishes the Stream.
1107 >>> Stream([1, 2, 3]).starcollect(lambda *args: args)
1108 (1, 2, 3)
1109 >>> Stream([]).starcollect(lambda *args: args)
1110 ()
1111 """
1112 return self._finish(fun(*self._data), close_source=True)
1114 @typing.overload
1115 def starmap( # noqa: D102
1116 self: Stream[_utils.IndexValueTuple[K]],
1117 fun: Callable[[int, K], U],
1118 /,
1119 ) -> Stream[U]: ...
1121 @typing.overload
1122 def starmap( # noqa: D102
1123 self: Stream[tuple[Unpack[Tvt]]],
1124 fun: Callable[[Unpack[Tvt]], U],
1125 /,
1126 ) -> Stream[U]: ...
1128 def starmap( # noqa: D102
1129 self: Stream[tuple[Unpack[Tvt]]],
1130 fun: Callable[[Unpack[Tvt]], U],
1131 /,
1132 ) -> Stream[U]:
1133 """Map each value to another.
1135 This lazily finishes the current Stream and creates a new one.
1137 >>> Stream([(1, 2), (3, 4)]).starmap(operator.mul).collect()
1138 (2, 12)
1139 >>> Stream([(2, "x"), (3, "y")]).starmap(operator.mul).collect()
1140 ('xx', 'yyy')
1141 """
1142 return self._finish(Stream(itertools.starmap(fun, self._data)))
1144 def sum(self: Stream[SA]) -> SA:
1145 """Calculate the sum of the elements.
1147 This works for every type that supports addition.
1149 For numbers stream.collect(sum) could be faster.
1150 For strings stream.collect("".join) could be faster.
1151 For lists stream.flat_map(lambda _: _).collect(list) could be faster.
1152 For tuples stream.flat_map(lambda _: _).collect(tuple) could be faster.
1154 >>> Stream([1, 2, 3]).sum()
1155 6
1156 >>> Stream(["a", "b", "c"]).sum()
1157 'abc'
1158 >>> Stream([(1, 2), (3, 4)]).sum()
1159 (1, 2, 3, 4)
1160 >>> Stream(([1], [2], [3])).sum()
1161 [1, 2, 3]
1162 """
1163 return self.reduce(add)
1165 def tail(self, c: int, /) -> streamable.StreamableSequence[T]:
1166 """Return a Sequence with the last count items.
1168 >>> Stream([1, 2, 3]).tail(2)
1169 (2, 3)
1170 >>> Stream.range(100).tail(10)
1171 (90, 91, 92, 93, 94, 95, 96, 97, 98, 99)
1172 """
1173 return self._finish(
1174 streamable.StreamableSequence(
1175 collections.deque(self._data, maxlen=c)
1176 ),
1177 close_source=True,
1178 )
1180 def take_while(self, fun: Callable[[T], object]) -> Self:
1181 """Take values as long the function returns a truthy value.
1183 See: https://docs.python.org/3/library/itertools.html#itertools.takewhile
1185 >>> Stream([1, 2, 3, 4, 1]).take_while(lambda x: x < 4).collect()
1186 (1, 2, 3)
1187 >>> Stream([1, 2, 3, -4, -1]).take_while(lambda x: x <= 0).collect()
1188 ()
1189 """
1190 self._data = itertools.takewhile(fun, self._data)
1191 return self