Coverage for typed_stream / _impl / stream.py: 99%
221 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-17 08:56 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-17 08:56 +0000
1# Licensed under the EUPL-1.2 or later.
2# You may obtain a copy of the licence in all the official languages of the
3# European Union at https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
5"""Typed Stream class for easier handling of iterables."""
7from __future__ import annotations
9import collections
10import concurrent.futures
11import functools
12import itertools
13import operator
14import sys
15import typing
16from collections.abc import Callable, Iterable, Iterator, Mapping
17from numbers import Number, Real
18from types import EllipsisType
20from .. import exceptions, streamable
21from . import _iteration_utils, functions, stream_abc
22from ._default_value import DEFAULT_VALUE as _DEFAULT_VALUE
23from ._default_value import DefaultValueType as _DefaultValueType
24from ._typing import Self, TypeVarTuple, Unpack, override
25from ._utils import map_with_additional_args
27# pylint: disable=too-many-lines
# The only public name exported by this module.
__all__: tuple[typing.Literal["Stream"]] = ("Stream",)

# These modules are only referenced in (lazy) annotations and TypeVar bounds,
# so they are imported for type checkers only.
if typing.TYPE_CHECKING:
    from . import _types, _utils

# Unconstrained placeholders used throughout the method signatures.
K = typing.TypeVar("K")
T = typing.TypeVar("T")
U = typing.TypeVar("U")
V = typing.TypeVar("V")

# Constrained / bounded placeholders.
Prim = typing.TypeVar("Prim", int, str, bool, complex, Number, Real)
Exc = typing.TypeVar("Exc", bound=BaseException)
SA = typing.TypeVar("SA", bound="_types.SupportsAdd")
SC = typing.TypeVar("SC", bound="_types.SupportsComparison")

# Variadic placeholder for the extra positional arguments of map-like methods.
Tvt = TypeVarTuple("Tvt")

# operator.add with a signature restricted to values that support addition.
add: Callable[[SA, SA], SA] = operator.add
48class Stream(stream_abc.StreamABC[T], Iterable[T]):
49 """Typed Stream class for easier handling of iterables.
51 It is not recommended to store Stream instances in variables,
52 instead use method chaining to handle the values and collect them when finished.
53 """
55 __slots__ = ()
57 _data: Iterable[T]
59 def __init__(
60 self,
61 data: Iterable[T] | EllipsisType,
62 close_source_callable: Callable[[], None] | None = None,
63 ) -> None:
64 """Create a new Stream.
66 To create a finished Stream do Stream(...).
67 """
68 super().__init__(
69 ... if isinstance(data, EllipsisType) else data,
70 close_source_callable,
71 )
73 def __contains__(self, value: T, /) -> bool:
74 """Check whether this stream contains the given value.
76 >>> 2 in Stream((1, 2, 3))
77 True
78 >>> 4 in Stream((1, 2, 3))
79 False
80 >>> 3 in Stream((1, 2, 3, 4, 5, 6, 7, 8)).peek(print)
81 1
82 2
83 3
84 True
85 """
86 for element in self._data:
87 if element == value:
88 return self._finish(True, close_source=True)
89 return self._finish(False, close_source=True)
91 @typing.overload
92 def __getitem__(self, item: int, /) -> T:
93 """Nobody inspects the spammish repetition."""
95 @typing.overload
96 def __getitem__(
97 self, item: slice[int | None, int | None, int | None], / # noqa: W504
98 ) -> streamable.StreamableSequence[T]:
99 """Nobody inspects the spammish repetition."""
101 def __getitem__(
102 self,
103 item: slice[int | None, int | None, int | None] | int,
104 /, # noqa: W504
105 ) -> streamable.StreamableSequence[T] | T:
106 """Finish the stream by collecting.
108 >>> Stream((1, 2, 3))[1]
109 2
110 >>> Stream((1, 2, 3))[1:3]
111 (2, 3)
112 """
113 if isinstance(item, int):
114 return self.nth(item)
115 return self._get_slice(start=item.start, stop=item.stop, step=item.step)
117 @override
118 def __iter__(self) -> Iterator[T]:
119 """Iterate over the values of this Stream. This finishes the Stream.
121 >>> for value in Stream((1, 2, 3)):
122 ... print(value)
123 1
124 2
125 3
126 """
127 return _iteration_utils.IterWithCleanUp(self._data, self.close)
129 def __length_hint__(self) -> int:
130 """Return an estimated length for this Stream.
132 >>> from operator import length_hint
133 >>> length_hint(Stream([1, 2, 3]))
134 3
135 >>> length_hint(Stream.range(100))
136 100
137 """
138 return operator.length_hint(self._data)
140 def __reversed__(self) -> Iterator[T]:
141 """Return the items of this Stream in reversed order.
143 This finishes the Stream and collects all the element.
145 Equivalent to reversed(self.collect()).
147 >>> tuple(reversed(Stream((1, 2, 3))))
148 (3, 2, 1)
149 >>> "".join(reversed(Stream("abc")))
150 'cba'
151 """
152 return reversed(self.collect())
154 def _get_slice( # noqa: C901
155 self,
156 *,
157 start: int | None = None,
158 stop: int | None = None,
159 step: int | None = None,
160 ) -> streamable.StreamableSequence[T]:
161 """Implement __getitem__ with slices."""
162 if start is stop is step is None:
163 return self.collect()
164 if ( # pylint: disable=too-many-boolean-expressions
165 (start is None or start >= 0)
166 and (step is None or step >= 0)
167 and (stop is None or stop >= 0)
168 ):
169 return self._finish(
170 streamable.StreamableSequence(
171 itertools.islice(self._data, start, stop, step)
172 ),
173 close_source=True,
174 )
175 if (
176 start is not None
177 and start < 0
178 and step in {None, 1}
179 and stop is None
180 ):
181 return self.tail(abs(start))
182 return self.collect()[start:stop:step]
184 @classmethod
185 @override
186 def _module(cls) -> str:
187 if cls == Stream:
188 return "typed_stream"
189 return cls.__module__
191 @staticmethod
192 def counting(start: int = 0, step: int = 1) -> Stream[int]:
193 """Create an endless counting Stream.
195 >>> Stream.counting().limit(5).collect()
196 (0, 1, 2, 3, 4)
197 >>> Stream.counting(5, 2).limit(5).collect()
198 (5, 7, 9, 11, 13)
199 """
200 return Stream(itertools.count(start, step))
202 @staticmethod
203 def from_value(value: K) -> Stream[K]:
204 """Create an endless Stream of the same value.
206 >>> Stream.from_value(1).limit(5).collect()
207 (1, 1, 1, 1, 1)
208 """
209 return Stream(itertools.repeat(value))
211 @typing.overload
212 @staticmethod
213 def range(stop: int, /) -> Stream[int]: ... # noqa: D102
215 @typing.overload
216 @staticmethod
217 def range(*, stop: int) -> Stream[int]: ... # noqa: D102
219 @typing.overload
220 @staticmethod
221 def range(*, start: int, stop: int) -> Stream[int]: ... # noqa: D102
223 @typing.overload
224 @staticmethod
225 def range(start: int, stop: int, /) -> Stream[int]: ... # noqa: D102
227 @typing.overload
228 @staticmethod
229 def range(start: int, /, *, stop: int) -> Stream[int]: ... # noqa: D102
231 @typing.overload
232 @staticmethod
233 def range( # noqa: D102
234 start: int, stop: int, /, *, step: int
235 ) -> Stream[int]: ...
237 @typing.overload
238 @staticmethod
239 def range( # noqa: D102
240 start: int, stop: int, step: int, / # noqa: W504
241 ) -> Stream[int]: ...
243 @typing.overload
244 @staticmethod
245 def range( # noqa: D102
246 start: int, /, *, stop: int, step: int
247 ) -> Stream[int]: ...
249 @typing.overload
250 @staticmethod
251 def range( # noqa: D102
252 *, start: int, stop: int, step: int
253 ) -> Stream[int]: ...
255 @staticmethod
256 def range( # noqa: C901
257 *args: int,
258 start: int | _DefaultValueType = _DEFAULT_VALUE,
259 stop: int | _DefaultValueType = _DEFAULT_VALUE,
260 step: int | _DefaultValueType = _DEFAULT_VALUE,
261 ) -> Stream[int]:
262 """Create a Stream[int] from a range.
264 The arguments behave like to the built-in range function:
265 - Stream.range(stop) -> Stream[int]
266 - Stream.range(start, stop[, step]) -> Stream[int]
268 >>> Stream.range(5).collect() == Stream(range(5)).collect()
269 True
270 >>> Stream.range(1, 13).collect() == Stream(range(1, 13)).collect()
271 True
272 >>> Stream.range(1, 9, 2).collect() == Stream(range(1, 9, 2)).collect()
273 True
274 >>> Stream.range(start=1, stop=7, step=2).collect()
275 (1, 3, 5)
276 """
277 # pylint: disable=confusing-consecutive-elif
278 if not isinstance(start, _DefaultValueType):
279 if not args and not isinstance(stop, _DefaultValueType):
280 return Stream(
281 range(start, stop)
282 if isinstance(step, _DefaultValueType)
283 else range(start, stop, step)
284 )
285 elif isinstance(stop, _DefaultValueType):
286 if isinstance(step, _DefaultValueType):
287 return Stream(range(*args)) # no kwarg given
288 if len(args) == 2:
289 return Stream(range(args[0], args[1], step))
290 elif isinstance(step, _DefaultValueType):
291 if len(args) == 1:
292 return Stream(range(args[0], stop))
293 if not args:
294 return Stream(range(stop))
295 elif len(args) == 1:
296 return Stream(range(args[0], stop, step))
297 raise TypeError("Unexpected arguments to Stream.range()")
299 def all(self) -> bool:
300 """Check whether all values are Truthy. This finishes the Stream.
302 Returns False if there is any false value in the Stream.
304 >>> Stream([1, 2, 3]).peek(print).all()
305 1
306 2
307 3
308 True
309 >>> Stream([1, 2, 0, 4, 5, 6, 7, 8]).peek(print).all()
310 1
311 2
312 0
313 False
314 >>> Stream([]).all()
315 True
316 """
317 return self._finish(all(self._data), close_source=True)
319 @typing.overload
320 def catch( # noqa: D102
321 self,
322 *exception_class: type[Exc],
323 ) -> Self: ...
325 @typing.overload
326 def catch( # noqa: D102
327 self,
328 *exception_class: type[Exc],
329 handler: Callable[[Exc], object],
330 ) -> Self: ...
332 @typing.overload
333 def catch( # noqa: D102
334 self,
335 *exception_class: type[Exc],
336 default: Callable[[Exc], K] | Callable[[], K],
337 ) -> Stream[T | K]: ...
339 @typing.overload
340 def catch( # noqa: D102
341 self,
342 *exception_class: type[Exc],
343 handler: Callable[[Exc], object],
344 default: Callable[[Exc], K] | Callable[[], K],
345 ) -> Stream[T | K]: ...
347 def catch(
348 self,
349 *exception_class: type[Exc],
350 handler: Callable[[Exc], object] | None = None,
351 default: Callable[[Exc], K] | Callable[[], K] | None = None,
352 ) -> Stream[T | K]:
353 """Catch exceptions.
355 >>> Stream("1a2").map(int).catch(ValueError, handler=print).collect()
356 invalid literal for int() with base 10: 'a'
357 (1, 2)
358 >>> Stream("1a2").map(int).catch(ValueError, default=lambda _:_).collect()
359 (1, ValueError("invalid literal for int() with base 10: 'a'"), 2)
360 >>> Stream("1a2").map(int).peek(print) \
361 .catch(ValueError, handler=print).collect()
362 1
363 invalid literal for int() with base 10: 'a'
364 2
365 (1, 2)
366 """
367 return self._finish(
368 Stream(
369 _iteration_utils.ExceptionHandler(
370 self._data, exception_class, handler, default
371 )
372 )
373 )
375 def chain(self, *iterables: Iterable[T]) -> Self:
376 """Add another iterable to the end of the Stream.
378 >>> Stream([1, 2, 3]).chain([4, 5, 6]).collect()
379 (1, 2, 3, 4, 5, 6)
380 >>> Stream([1, 2, 3]).chain([4, 5, 6], [7, 8, 9]).collect()
381 (1, 2, 3, 4, 5, 6, 7, 8, 9)
382 >>> Stream("abc").chain("def", "ghi", "jkl").collect("".join)
383 'abcdefghijkl'
384 """
385 self._data = itertools.chain(self._data, *iterables)
386 return self
388 if sys.version_info >= (3, 12) and hasattr(itertools, "batched"):
390 def chunk(self, size: int) -> Stream[tuple[T, ...]]:
391 """Split the stream into chunks of the specified size.
393 The last chunk may be shorter.
395 >>> Stream([1, 2, 3, 4, 5, 6]).chunk(2).collect()
396 ((1, 2), (3, 4), (5, 6))
397 >>> Stream([1, 2, 3, 4, 5, 6]).chunk(3).collect()
398 ((1, 2, 3), (4, 5, 6))
399 >>> Stream([1, 2, 3, 4, 5, 6, 7]).chunk(3).collect()
400 ((1, 2, 3), (4, 5, 6), (7,))
401 """
402 return self._finish(
403 Stream(
404 # add strict=False if min version is 3.13 (is the default)
405 itertools.batched(self._data, size), # noqa: B911
406 )
407 )
409 else: # pragma: no cover
411 def chunk(self, size: int) -> Stream[tuple[T, ...]]:
412 """Split the stream into chunks of the specified size.
414 The last chunk may be shorter.
416 >>> Stream([1, 2, 3, 4, 5, 6]).chunk(2).collect()
417 ((1, 2), (3, 4), (5, 6))
418 >>> Stream([1, 2, 3, 4, 5, 6]).chunk(3).collect()
419 ((1, 2, 3), (4, 5, 6))
420 >>> Stream([1, 2, 3, 4, 5, 6, 7]).chunk(3).collect()
421 ((1, 2, 3), (4, 5, 6), (7,))
422 """
423 return self._finish(
424 _iteration_utils.Chunked(self._data, size).stream()
425 )
427 @typing.overload
428 def collect(self, /) -> streamable.StreamableSequence[T]: ... # noqa: D102
430 @typing.overload
431 def collect( # noqa: D102
432 self,
433 fun: Callable[[Iterable[T]], streamable.StreamableSequence[T]],
434 /,
435 ) -> streamable.StreamableSequence[T]: ...
437 @typing.overload
438 def collect( # noqa: D102
439 self,
440 fun: type[collections.Counter[T]],
441 /,
442 ) -> collections.Counter[T]: ...
444 @typing.overload
445 def collect( # noqa: D102
446 self, fun: Callable[[Iterable[T]], tuple[T, ...]], / # noqa: W504
447 ) -> tuple[T, ...]: ...
449 @typing.overload
450 def collect( # noqa: D102
451 self, fun: Callable[[Iterable[T]], list[T]], / # noqa: W504
452 ) -> list[T]: ...
454 @typing.overload
455 def collect( # noqa: D102
456 self: Stream[SA], fun: Callable[[Iterable[SA]], SA], / # noqa: W504
457 ) -> SA: ...
459 @typing.overload
460 def collect( # noqa: D102
461 self, fun: Callable[[Iterable[T]], set[T]], / # noqa: W504
462 ) -> set[T]: ...
464 @typing.overload
465 def collect( # noqa: D102
466 self, fun: Callable[[Iterable[T]], frozenset[T]], / # noqa: W504
467 ) -> frozenset[T]: ...
469 @typing.overload
470 def collect( # noqa: D102
471 self: Stream[tuple[K, V]],
472 fun: Callable[[Iterable[tuple[K, V]]], dict[K, V]],
473 /,
474 ) -> dict[K, V]: ...
476 @typing.overload
477 def collect( # noqa: D102
478 self: Stream[tuple[K, V]],
479 fun: Callable[[Iterable[tuple[K, V]]], Mapping[K, V]],
480 /,
481 ) -> Mapping[K, V]: ...
483 @typing.overload
484 def collect( # noqa: D102
485 self, fun: Callable[[Iterable[T]], K], / # noqa: W504
486 ) -> K: ...
488 def collect(
489 self: Stream[U],
490 fun: Callable[[Iterable[U]], object] = streamable.StreamableSequence,
491 /,
492 ) -> object:
493 """Collect the values of this Stream. This finishes the Stream.
495 >>> Stream([1, 2, 3]).collect(list)
496 [1, 2, 3]
497 >>> Stream([1, 2, 3]).collect(sum)
498 6
499 >>> Stream([1, 2, 3]).collect(dict.fromkeys)
500 {1: None, 2: None, 3: None}
501 >>> Stream([(1, 2), (3, 4)]).collect(dict)
502 {1: 2, 3: 4}
503 """
504 return self._finish(fun(self._data), close_source=True)
506 def concurrent_map(
507 self, fun: Callable[[T], K], /, max_workers: int | None = None
508 ) -> Stream[K]:
509 """Map values concurrently.
511 See: https://docs.python.org/3/library/concurrent.futures.html
513 >>> Stream("123").concurrent_map(int).collect()
514 (1, 2, 3)
515 """
516 with concurrent.futures.ProcessPoolExecutor(
517 max_workers=max_workers
518 ) as executor:
519 return self._finish(Stream(executor.map(fun, self._data)))
521 def conditional_map(
522 self,
523 condition: Callable[[T], object],
524 if_true: Callable[[T], U],
525 if_false: Callable[[T], V] | None = None,
526 ) -> Stream[U | V]:
527 """Map values conditionally.
529 >>> Stream("1x2x3x").conditional_map(str.isdigit, int).collect()
530 (1, 2, 3)
531 >>> Stream("1x2x3x").conditional_map(str.isdigit, int, ord).collect()
532 (1, 120, 2, 120, 3, 120)
533 """
534 return self._finish(
535 Stream(
536 _iteration_utils.IfElseMap(
537 self._data, condition, if_true, if_false
538 )
539 )
540 )
542 def count(self) -> int:
543 """Count the elements in this Stream. This finishes the Stream.
545 Equivalent to: Stream(...).map(lambda x: 1).sum()
547 >>> Stream([1, 2, 3]).count()
548 3
549 >>> Stream("abcdef").count()
550 6
551 """
552 return self._finish(
553 _iteration_utils.count(self._data), close_source=True
554 )
556 def dedup(self, *, key: None | Callable[[T], object] = None) -> Self:
557 """Remove consecutive equal values.
559 If the input is sorted this is the same as Stream.distinct().
561 >>> Stream([1] * 100).dedup().collect(list)
562 [1]
563 >>> Stream([1, 2, 3, 1]).dedup().collect()
564 (1, 2, 3, 1)
565 >>> Stream([1, 1, 2, 2, 2, 2, 3, 1]).dedup().collect()
566 (1, 2, 3, 1)
567 >>> Stream([]).dedup().collect()
568 ()
569 >>> Stream("ABC").dedup(key=str.lower).collect("".join)
570 'ABC'
571 >>> Stream("aAaAbbbcCCAaBbCc").dedup(key=str.lower).collect("".join)
572 'abcABC'
573 """
574 # Inspired by the unique_justseen itertools recipe
575 # https://docs.python.org/3/library/itertools.html#itertools-recipes
576 self._data = map(
577 next,
578 map(
579 operator.itemgetter(1),
580 itertools.groupby(self._data, key=key),
581 ),
582 )
583 return self
585 def dedup_counting(self) -> Stream[tuple[T, int]]:
586 """Group the stream and count the items in the group.
588 >>> Stream("abba").dedup_counting().starmap(print).for_each()
589 a 1
590 b 2
591 a 1
592 >>> Stream("AaaaBBcccc").dedup_counting().starmap(print).for_each()
593 A 1
594 a 3
595 B 2
596 c 4
597 """
599 def _map(k: T, g: Iterator[T]) -> tuple[T, int]:
600 return (k, _iteration_utils.count(g))
602 return Stream(itertools.starmap(_map, itertools.groupby(self)))
604 @override
605 def distinct(self, *, use_set: bool = True) -> Self:
606 """Remove duplicate values.
608 >>> from typed_stream import Stream
609 >>> Stream([1, 2, 2, 2, 3, 2, 2]).distinct().collect()
610 (1, 2, 3)
611 >>> Stream([{1}, {2}, {3}, {2}, {2}]).distinct().collect() # doctest: +ELLIPSIS
612 Traceback (most recent call last):
613 ...
614 TypeError: ...unhashable type: 'set'...
615 >>> Stream([{1}, {2}, {3}, {2}, {2}]).distinct(use_set=False).collect()
616 ({1}, {2}, {3})
617 """
618 # pylint: disable=duplicate-code
619 encountered: set[T] | list[T]
620 peek_fun: Callable[[T], None]
621 if use_set:
622 encountered = set()
623 peek_fun = encountered.add
624 else:
625 encountered = []
626 peek_fun = encountered.append
628 self._data = map(
629 _iteration_utils.Peeker(peek_fun),
630 itertools.filterfalse(encountered.__contains__, self._data),
631 )
632 return self
634 @override
635 def drop(self, c: int, /) -> Self:
636 """Drop the first count values.
638 >>> Stream([1, 2, 3, 4, 5]).drop(2).collect()
639 (3, 4, 5)
640 """
641 self._data = itertools.islice(self._data, c, None)
642 return self
644 def drop_while(self, fun: Callable[[T], object], /) -> Self:
645 """Drop values as long as the function returns a truthy value.
647 See: https://docs.python.org/3/library/itertools.html#itertools.dropwhile
649 >>> Stream([1, 2, 3, 4, 1]).drop_while(lambda x: x < 3).collect()
650 (3, 4, 1)
651 """
652 self._data = itertools.dropwhile(fun, self._data)
653 return self
655 def empty(self) -> bool:
656 """Check whether this doesn't contain any value. This finishes the Stream.
658 >>> Stream([1, 2, 3]).empty()
659 False
660 >>> Stream([]).empty()
661 True
662 """
663 try:
664 self.first()
665 except exceptions.StreamEmptyError:
666 return True
667 return False
669 def enumerate(
670 self, start_index: int = 0, / # noqa: W504
671 ) -> Stream[_utils.IndexValueTuple[T]]:
672 """Map the values to a tuple of index and value.
674 >>> Stream([1, 2, 3]).enumerate().collect()
675 ((0, 1), (1, 2), (2, 3))
676 >>> Stream("abc").enumerate().collect()
677 ((0, 'a'), (1, 'b'), (2, 'c'))
678 >>> Stream("abc").enumerate(100).collect()
679 ((100, 'a'), (101, 'b'), (102, 'c'))
680 >>> Stream("abc").enumerate().map(lambda el: {el.idx: el.val}).collect()
681 ({0: 'a'}, {1: 'b'}, {2: 'c'})
682 """
683 return self._finish(
684 Stream(_iteration_utils.Enumerator(self._data, start_index))
685 )
687 @typing.overload
688 def exclude( # noqa: D102
689 self: Stream[K | Prim],
690 fun: _utils.InstanceChecker[Prim],
691 /, # noqa: W504
692 ) -> Stream[K]: ...
694 @typing.overload
695 def exclude( # noqa: D102
696 self: Stream[K | U], fun: _utils.InstanceChecker[U], / # noqa: W504
697 ) -> Stream[K]: ...
699 @typing.overload
700 def exclude( # noqa: D102
701 self: Stream[K | None], fun: _utils.NoneChecker, / # noqa: W504
702 ) -> Stream[K]: ...
704 # @typing.overload
705 # def exclude( # noqa: D102
706 # self: Stream[K | U], fun: TypeGuardingCallable[U, K | U]
707 # ) -> Stream[K]:
708 # ...
710 @typing.overload
711 def exclude( # noqa: D102
712 self: Stream[T], fun: Callable[[T], object], / # noqa: W504
713 ) -> Stream[T]: ...
715 @override
716 def exclude(self, fun: Callable[[T], object], /) -> object:
717 """Exclude values if the function returns a truthy value.
719 See: https://docs.python.org/3/library/itertools.html#itertools.filterfalse
721 >>> Stream([1, 2, 3, 4, 5]).exclude(lambda x: x % 2).collect()
722 (2, 4)
723 >>> Stream([1, 2, None, 3]).exclude(lambda x: x is None).collect()
724 (1, 2, 3)
725 """
726 self._data = itertools.filterfalse(fun, self._data)
727 return self
729 @typing.overload
730 def filter( # noqa: D102
731 self: Stream[K | None], fun: _utils.NotNoneChecker
732 ) -> Stream[K]: ...
734 @typing.overload
735 def filter(self: Stream[K | None]) -> Stream[K]: ... # noqa: D102
737 @typing.overload
738 def filter(self: Stream[T]) -> Stream[T]: ... # noqa: D102
740 @typing.overload
741 def filter( # noqa: D102
742 self, fun: _utils.InstanceChecker[K]
743 ) -> Stream[K]: # pragma: no cover
744 ...
746 @typing.overload
747 def filter( # noqa: D102
748 self, fun: _types.TypeGuardingCallable[K, T]
749 ) -> Stream[K]: # pragma: no cover
750 ...
752 @typing.overload
753 def filter( # noqa: D102
754 self: Stream[T], fun: Callable[[T], object]
755 ) -> Stream[T]: ...
757 def filter(self, fun: Callable[[T], object] | None = None) -> object:
758 """Use built-in filter to filter values.
760 >>> Stream([1, 2, 3, 4, 5]).filter(lambda x: x % 2).collect()
761 (1, 3, 5)
762 """
763 self._data = filter(fun, self._data)
764 return self
766 def first(self, default: K | _DefaultValueType = _DEFAULT_VALUE) -> T | K:
767 """Return the first element of the Stream. This finishes the Stream.
769 >>> Stream([1, 2, 3]).first()
770 1
771 >>> Stream("abc").first()
772 'a'
773 >>> Stream([]).first()
774 Traceback (most recent call last):
775 ...
776 typed_stream.exceptions.StreamEmptyError
777 >>> Stream([]).first(default="default")
778 'default'
779 """
780 try:
781 first = next(iter(self._data))
782 except StopIteration:
783 if not isinstance(default, _DefaultValueType):
784 return default
785 raise exceptions.StreamEmptyError() from None
786 finally:
787 self._finish(None, close_source=True)
788 return first
790 @typing.overload
791 def flat_map( # noqa: D102
792 self, fun: Callable[[T], Iterable[K]], / # noqa: W504
793 ) -> Stream[K]: ...
795 @typing.overload
796 def flat_map( # noqa: D102
797 self,
798 fun: Callable[[T, Unpack[Tvt]], Iterable[K]],
799 /,
800 *args: Unpack[Tvt],
801 ) -> Stream[K]: ...
803 def flat_map(
804 self,
805 fun: Callable[[T, Unpack[Tvt]], Iterable[K]],
806 /,
807 *args: Unpack[Tvt],
808 ) -> Stream[K]:
809 """Map each value to another.
811 This lazily finishes the current Stream and creates a new one.
813 >>> Stream([1, 4, 7]).flat_map(lambda x: [x, x + 1, x + 2]).collect()
814 (1, 2, 3, 4, 5, 6, 7, 8, 9)
815 >>> Stream(["abc", "def"]).flat_map(str.encode, "ASCII").collect()
816 (97, 98, 99, 100, 101, 102)
817 """
818 return Stream(
819 itertools.chain.from_iterable(
820 map_with_additional_args(self._data, fun, args)
821 )
822 )
824 def for_each(self, fun: Callable[[T], object] = functions.noop, /) -> None:
825 """Consume all the values of the Stream with the callable.
827 >>> Stream([1, 2, 3]).for_each(print)
828 1
829 2
830 3
831 """
832 for value in self._data:
833 fun(value)
834 self._finish(None, close_source=True)
836 def last(self) -> T:
837 """Return the last element of the Stream. This finishes the Stream.
839 raises StreamEmptyError if stream is empty.
841 >>> Stream([1, 2, 3]).last()
842 3
843 >>> Stream([]).last()
844 Traceback (most recent call last):
845 ...
846 typed_stream.exceptions.StreamEmptyError
847 """
848 if tail := self.tail(1):
849 return tail[-1]
850 raise exceptions.StreamEmptyError()
852 @override
853 def limit(self, c: int, /) -> Self:
854 """Stop the Stream after count values.
856 >>> Stream([1, 2, 3, 4, 5]).limit(3).collect()
857 (1, 2, 3)
858 >>> Stream.from_value(3).limit(1000).collect() == (3,) * 1000
859 True
860 """
861 self._data = itertools.islice(self._data, c)
862 return self
864 @typing.overload
865 def map(self, fun: Callable[[T], K], /) -> Stream[K]: ... # noqa: D102
867 @typing.overload
868 def map( # noqa: D102
869 self, fun: Callable[[T, Unpack[Tvt]], K], /, *args: Unpack[Tvt]
870 ) -> Stream[K]: ...
872 def map(
873 self, fun: Callable[[T, Unpack[Tvt]], K], /, *args: Unpack[Tvt]
874 ) -> Stream[K]:
875 """Map each value to another.
877 This lazily finishes the current Stream and creates a new one.
879 >>> Stream([1, 2, 3]).map(lambda x: x * 3).collect()
880 (3, 6, 9)
881 >>> Stream([1, 2, 3]).map(operator.mul, 3).collect()
882 (3, 6, 9)
883 """
884 return self._finish(
885 Stream(map_with_additional_args(self._data, fun, args))
886 )
888 @typing.overload
889 def max(self: Stream[SC]) -> SC: ... # noqa: D102
891 @typing.overload
892 def max( # noqa: D102
893 self: Stream[SC], default: K | _DefaultValueType = _DEFAULT_VALUE
894 ) -> SC | K: ...
896 @typing.overload
897 def max( # noqa: D102
898 self,
899 default: K | _DefaultValueType = _DEFAULT_VALUE,
900 *,
901 key: Callable[[T], SC],
902 ) -> T | K: ...
904 def max(
905 self,
906 default: object = _DEFAULT_VALUE,
907 *,
908 key: Callable[[T], SC] | None = None,
909 ) -> object:
910 """Return the biggest element of the stream.
912 >>> Stream([3, 2, 1]).max()
913 3
914 >>> Stream(["a", "b", "c"]).max()
915 'c'
916 >>> Stream(["abc", "de", "f"]).max(key=len)
917 'abc'
918 >>> Stream([]).max(default=0)
919 0
920 >>> Stream([]).max()
921 Traceback (most recent call last):
922 ...
923 typed_stream.exceptions.StreamEmptyError
924 """
925 max_ = max(self._data, default=default, key=key) # type: ignore[type-var,arg-type]
926 if isinstance(max_, _DefaultValueType):
927 raise exceptions.StreamEmptyError() from None
928 return self._finish(max_, close_source=True)
930 @typing.overload
931 def min(self: Stream[SC]) -> SC: ... # noqa: D102
933 @typing.overload
934 def min( # noqa: D102
935 self: Stream[SC], default: K | _DefaultValueType = _DEFAULT_VALUE
936 ) -> SC | K: ...
938 @typing.overload
939 def min( # noqa: D102
940 self,
941 default: K | _DefaultValueType = _DEFAULT_VALUE,
942 *,
943 key: Callable[[T], SC],
944 ) -> T | K: ...
946 def min(
947 self,
948 default: object = _DEFAULT_VALUE,
949 *,
950 key: Callable[[T], SC] | None = None,
951 ) -> object:
952 """Return the smallest element of the stream.
954 >>> Stream([1, 2, 3]).min()
955 1
956 >>> Stream(["a", "b", "c"]).min()
957 'a'
958 >>> Stream(["abc", "de", "f"]).min(key=len)
959 'f'
960 >>> Stream([]).min(default=0)
961 0
962 >>> Stream([]).min()
963 Traceback (most recent call last):
964 ...
965 typed_stream.exceptions.StreamEmptyError
966 """
967 min_ = min(self._data, default=default, key=key) # type: ignore[type-var,arg-type]
968 if isinstance(min_, _DefaultValueType):
969 raise exceptions.StreamEmptyError() from None
970 return self._finish(min_, close_source=True)
972 @typing.overload
973 def nth(self, index: int, /) -> T: ... # noqa: D102
975 @typing.overload
976 def nth(self, index: int, /, default: T) -> T: ... # noqa: D102
978 @typing.overload
979 def nth(self, index: int, /, default: K) -> T | K: ... # noqa: D102
981 def nth( # noqa: C901
982 self,
983 index: int,
984 /,
985 default: K | _DefaultValueType = _DEFAULT_VALUE,
986 ) -> T | K:
987 """Return the nth item of the stream.
989 Raises StreamIndexError if no default value is given and the Stream
990 does not have an item at the given index.
992 Stream(...).nth(0) gets the first element of the stream.
994 >>> Stream([1, 2, 3]).nth(0)
995 1
996 >>> Stream("abc").nth(1)
997 'b'
998 >>> Stream([]).nth(22)
999 Traceback (most recent call last):
1000 ...
1001 typed_stream.exceptions.StreamIndexError
1002 >>> Stream([]).nth(22, default=42)
1003 42
1004 """
1005 value: T | _DefaultValueType
1006 if index < 0:
1007 tail = self.tail(abs(index))
1008 value = tail[0] if len(tail) == abs(index) else _DEFAULT_VALUE
1009 else: # value >= 0
1010 try:
1011 value = self.drop(index).first()
1012 except exceptions.StreamEmptyError:
1013 value = _DEFAULT_VALUE
1015 if not isinstance(value, _DefaultValueType):
1016 return value
1018 if isinstance(default, _DefaultValueType):
1019 raise exceptions.StreamIndexError()
1021 return default
1023 @typing.overload
1024 def nwise( # noqa: D102
1025 self, size: typing.Literal[1], / # noqa: W504
1026 ) -> Stream[tuple[T]]: ...
1028 @typing.overload
1029 def nwise( # noqa: D102
1030 self, size: typing.Literal[2], / # noqa: W504
1031 ) -> Stream[tuple[T, T]]: ... # noqa: D102
1033 @typing.overload
1034 def nwise( # noqa: D102
1035 self, size: int, / # noqa: W504
1036 ) -> Stream[tuple[T, ...]]: ...
1038 def nwise(
1039 self, size: int, / # noqa: W504
1040 ) -> Stream[tuple[T, ...]] | Stream[tuple[T, T]] | Stream[tuple[T]]:
1041 """Return a Stream of overlapping n-lets.
1043 This is often called a sliding window.
1044 For n=2 it behaves like pairwise from itertools.
1046 The returned Stream will consist of tuples of length n.
1047 If n is bigger than the count of values in self, it will be empty.
1049 >>> Stream([1, 2, 3]).nwise(1).collect()
1050 ((1,), (2,), (3,))
1051 >>> Stream([1, 2, 3]).nwise(2).collect()
1052 ((1, 2), (2, 3))
1053 >>> Stream([1, 2, 3, 4]).nwise(3).collect()
1054 ((1, 2, 3), (2, 3, 4))
1055 >>> Stream([1, 2, 3, 4, 5]).nwise(4).collect()
1056 ((1, 2, 3, 4), (2, 3, 4, 5))
1057 """
1058 return self._finish(
1059 Stream(_iteration_utils.sliding_window(self._data, size))
1060 )
1062 @override
1063 def peek(self, fun: Callable[[T], object], /) -> Self:
1064 """Peek at every value, without modifying the values in the Stream.
1066 >>> stream = Stream([1, 2, 3]).peek(print)
1067 >>> stream.map(str).collect()
1068 1
1069 2
1070 3
1071 ('1', '2', '3')
1072 """
1073 self._data = map(_iteration_utils.Peeker(fun), self._data)
1074 return self
1076 def reduce(
1077 self,
1078 fun: Callable[[T, T], T],
1079 initial: T | _DefaultValueType = _DEFAULT_VALUE,
1080 ) -> T:
1081 """Reduce the values of this stream. This finishes the Stream.
1083 If no initial value is provided a StreamEmptyError is raised if
1084 the stream is empty.
1086 >>> Stream([1, 2, 3]).reduce(operator.add)
1087 6
1088 >>> Stream([1, 2, 3]).reduce(operator.mul)
1089 6
1090 >>> Stream([]).reduce(operator.mul)
1091 Traceback (most recent call last):
1092 ...
1093 typed_stream.exceptions.StreamEmptyError
1094 """
1095 iterator = iter(self._data)
1096 if isinstance(initial, _DefaultValueType):
1097 try:
1098 initial = next(iterator)
1099 except StopIteration:
1100 raise exceptions.StreamEmptyError() from None
1101 return self._finish(functools.reduce(fun, iterator, initial), True)
1103 def starcollect(self, fun: _types.StarCallable[T, K]) -> K:
1104 """Collect the values of this Stream. This finishes the Stream.
1106 >>> Stream([1, 2, 3]).starcollect(lambda *args: args)
1107 (1, 2, 3)
1108 >>> Stream([]).starcollect(lambda *args: args)
1109 ()
1110 """
1111 return self._finish(fun(*self._data), close_source=True)
1113 @typing.overload
1114 def starmap( # noqa: D102
1115 self: Stream[_utils.IndexValueTuple[K]],
1116 fun: Callable[[int, K], U],
1117 /,
1118 ) -> Stream[U]: ...
1120 @typing.overload
1121 def starmap( # noqa: D102
1122 self: Stream[tuple[Unpack[Tvt]]],
1123 fun: Callable[[Unpack[Tvt]], U],
1124 /,
1125 ) -> Stream[U]: ...
1127 def starmap( # noqa: D102
1128 self: Stream[tuple[Unpack[Tvt]]],
1129 fun: Callable[[Unpack[Tvt]], U],
1130 /,
1131 ) -> Stream[U]:
1132 """Map each value to another.
1134 This lazily finishes the current Stream and creates a new one.
1136 >>> Stream([(1, 2), (3, 4)]).starmap(operator.mul).collect()
1137 (2, 12)
1138 >>> Stream([(2, "x"), (3, "y")]).starmap(operator.mul).collect()
1139 ('xx', 'yyy')
1140 """
1141 return self._finish(Stream(itertools.starmap(fun, self._data)))
1143 def sum(self: Stream[SA]) -> SA:
1144 """Calculate the sum of the elements.
1146 This works for every type that supports addition.
1148 For numbers stream.collect(sum) could be faster.
1149 For strings stream.collect("".join) could be faster.
1150 For lists stream.flat_map(lambda _: _).collect(list) could be faster.
1151 For tuples stream.flat_map(lambda _: _).collect(tuple) could be faster.
1153 >>> Stream([1, 2, 3]).sum()
1154 6
1155 >>> Stream(["a", "b", "c"]).sum()
1156 'abc'
1157 >>> Stream([(1, 2), (3, 4)]).sum()
1158 (1, 2, 3, 4)
1159 >>> Stream(([1], [2], [3])).sum()
1160 [1, 2, 3]
1161 """
1162 return self.reduce(add)
1164 def tail(self, c: int, /) -> streamable.StreamableSequence[T]:
1165 """Return a Sequence with the last count items.
1167 >>> Stream([1, 2, 3]).tail(2)
1168 (2, 3)
1169 >>> Stream.range(100).tail(10)
1170 (90, 91, 92, 93, 94, 95, 96, 97, 98, 99)
1171 """
1172 return self._finish(
1173 streamable.StreamableSequence(
1174 collections.deque(self._data, maxlen=c)
1175 ),
1176 close_source=True,
1177 )
1179 def take_while(self, fun: Callable[[T], object]) -> Self:
1180 """Take values as long the function returns a truthy value.
1182 See: https://docs.python.org/3/library/itertools.html#itertools.takewhile
1184 >>> Stream([1, 2, 3, 4, 1]).take_while(lambda x: x < 4).collect()
1185 (1, 2, 3)
1186 >>> Stream([1, 2, 3, -4, -1]).take_while(lambda x: x <= 0).collect()
1187 ()
1188 """
1189 self._data = itertools.takewhile(fun, self._data)
1190 return self