Coverage for typed_stream/_impl/stream_abc.py: 98%
60 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-03-31 18:16 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-03-31 18:16 +0000
1# Licensed under the EUPL-1.2 or later.
2# You may obtain a copy of the licence in all the official languages of the
3# European Union at https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
5"""ABC Stream classes for easier handling of iterables."""
7from __future__ import annotations
9import abc
10from collections.abc import AsyncIterable, Callable, Iterable
11from types import EllipsisType
12from typing import Generic, TypeVar
14from ..exceptions import StreamFinishedError
15from ._types import Closeable, PrettyRepr
16from ._typing import Self, override
18__all__ = ("StreamABC",)
20T = TypeVar("T", bound=object)
21V = TypeVar("V")
24class StreamABC(Generic[T], Closeable, PrettyRepr, abc.ABC):
25 """ABC for Streams."""
27 __data: AsyncIterable[T] | Iterable[T] | None
28 _close_source_callable: None | Callable[[], None]
30 __slots__ = ("__data", "_close_source_callable")
32 __hash__ = None # type: ignore[assignment]
34 def __init__(
35 self,
36 data: AsyncIterable[T] | Iterable[T] | EllipsisType,
37 close_source_callable: Callable[[], None] | None = None,
38 ) -> None:
39 """Initialize self."""
40 self.__data = None if isinstance(data, EllipsisType) else data
41 self._close_source_callable = close_source_callable
43 @property
44 def _data(self) -> AsyncIterable[T] | Iterable[T]:
45 """Return the internal iterator.
47 >>> from typed_stream import Stream
48 >>> iterator = iter([1, 2, 3])
49 >>> stream = Stream(iterator)
50 >>> stream._data == iterator
51 True
52 >>> stream.close()
53 >>> stream._data
54 Traceback (most recent call last):
55 ...
56 typed_stream.exceptions.StreamFinishedError: Stream is finished.
57 >>> Stream(...)._data
58 Traceback (most recent call last):
59 ...
60 typed_stream.exceptions.StreamFinishedError: Stream is finished.
61 """
62 if self.__data is None:
63 raise StreamFinishedError("Stream is finished.")
64 return self.__data
66 @_data.setter
67 def _data(self, value: AsyncIterable[T] | Iterable[T]) -> None:
68 """Set the internal iterator."""
69 self.__data = value
71 def _finish(self, ret: V, close_source: bool = False) -> V:
72 """Mark this Stream as finished."""
73 if self._close_source_callable:
74 if close_source:
75 self._close_source_callable()
76 elif isinstance(ret, StreamABC):
77 # pylint: disable=protected-access
78 ret._close_source_callable = self._close_source_callable
79 self._close_source_callable = None
80 self.__data = None
81 return ret
83 @override
84 def _get_args(self) -> tuple[object, ...]:
85 """Return the args used to initializing self."""
86 data: object = self.__data or ...
87 if self._close_source_callable is None:
88 return (data,)
89 return data, self._close_source_callable
91 @override
92 def close(self) -> None:
93 """Close this stream cleanly.
95 >>> from typed_stream import Stream
96 >>> stream = Stream([1, 2, 3], lambda: print("abc"))
97 >>> stream.close()
98 abc
99 >>> stream.close()
100 """
101 self._finish(None, close_source=True)
103 def distinct(self, *, use_set: bool = True) -> Self:
104 """Remove duplicate values.
106 >>> from typed_stream import Stream
107 >>> StreamABC.distinct(Stream([1, 2, 2, 2, 3, 2, 2])).collect()
108 (1, 2, 3)
109 >>> StreamABC.distinct(Stream([{1}, {2}, {3}, {2}, {2}])).collect()
110 Traceback (most recent call last):
111 ...
112 TypeError: unhashable type: 'set'
113 >>> StreamABC.distinct(Stream([{1}, {2}, {1}]), use_set=False).collect()
114 ({1}, {2})
115 """
116 encountered: set[T] | list[T]
117 peek_fun: Callable[[T], None]
118 if use_set:
119 encountered = set()
120 peek_fun = encountered.add
121 else:
122 encountered = []
123 peek_fun = encountered.append
124 # pytype: disable=attribute-error
125 return self.exclude(encountered.__contains__).peek(peek_fun)
126 # pytype: enable=attribute-error
128 @abc.abstractmethod
129 def limit(self, count: int, /) -> Self:
130 """Stop the Stream after count values.
132 Example:
133 - Stream([1, 2, 3, 4, 5]).limit(3)
134 """
136 @abc.abstractmethod
137 def drop(self, count: int, /) -> Self:
138 """Drop the first count values."""
140 @abc.abstractmethod
141 def peek(self, fun: Callable[[T], object], /) -> Self:
142 """Peek at every value, without modifying the values in the Stream.
144 Example:
145 - Stream([1, 2, 3]).peek(print)
146 """
148 @abc.abstractmethod
149 def exclude(self, fun: Callable[[T], object], /) -> Self:
150 """Exclude values from the Stream if fun returns a truthy value."""