Coverage for typed_stream/_impl/stream_abc.py: 98%

60 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-03-31 18:16 +0000

1# Licensed under the EUPL-1.2 or later. 

2# You may obtain a copy of the licence in all the official languages of the 

3# European Union at https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12 

4 

5"""ABC Stream classes for easier handling of iterables.""" 

6 

7from __future__ import annotations 

8 

9import abc 

10from collections.abc import AsyncIterable, Callable, Iterable 

11from types import EllipsisType 

12from typing import Generic, TypeVar 

13 

14from ..exceptions import StreamFinishedError 

15from ._types import Closeable, PrettyRepr 

16from ._typing import Self, override 

17 

18__all__ = ("StreamABC",) 

19 

20T = TypeVar("T", bound=object) 

21V = TypeVar("V") 

22 

23 

class StreamABC(Generic[T], Closeable, PrettyRepr, abc.ABC):
    """ABC for Streams.

    Stores the source iterable together with an optional callback that
    closes that source, and declares the operations that concrete stream
    classes have to implement.
    """

    # The wrapped iterable. None once the stream is finished, or when the
    # stream was created from ``...``. The name is mangled, so code outside
    # this class has to go through the ``_data`` property.
    __data: AsyncIterable[T] | Iterable[T] | None
    # Callback that closes the underlying source, or None if there is none.
    _close_source_callable: None | Callable[[], None]

    __slots__ = ("__data", "_close_source_callable")

    # Streams are explicitly unhashable.
    __hash__ = None  # type: ignore[assignment]

    def __init__(
        self,
        data: AsyncIterable[T] | Iterable[T] | EllipsisType,
        close_source_callable: Callable[[], None] | None = None,
    ) -> None:
        """Initialize self.

        data is the iterable to wrap; passing ``...`` creates a stream that
        is already finished. close_source_callable, if given, is stored so
        the source can be closed when the stream is finished.
        """
        self.__data = None if isinstance(data, EllipsisType) else data
        self._close_source_callable = close_source_callable

    @property
    def _data(self) -> AsyncIterable[T] | Iterable[T]:
        """Return the internal iterator.

        Raises StreamFinishedError if the stream has no data anymore.

        >>> from typed_stream import Stream
        >>> iterator = iter([1, 2, 3])
        >>> stream = Stream(iterator)
        >>> stream._data == iterator
        True
        >>> stream.close()
        >>> stream._data
        Traceback (most recent call last):
        ...
        typed_stream.exceptions.StreamFinishedError: Stream is finished.
        >>> Stream(...)._data
        Traceback (most recent call last):
        ...
        typed_stream.exceptions.StreamFinishedError: Stream is finished.
        """
        if self.__data is None:
            raise StreamFinishedError("Stream is finished.")
        return self.__data

    @_data.setter
    def _data(self, value: AsyncIterable[T] | Iterable[T]) -> None:
        """Set the internal iterator."""
        self.__data = value

    def _finish(self, ret: V, close_source: bool = False) -> V:
        """Mark this Stream as finished and return ret unchanged.

        If a close callback is set, it is either called (close_source is
        true) or handed over to ret when ret is itself a StreamABC, so the
        new stream becomes responsible for closing the source. Either way
        this stream forgets the callback and drops its data.
        """
        # Compare against None instead of relying on truthiness, so that a
        # callable object that happens to be falsy is not silently skipped.
        if self._close_source_callable is not None:
            if close_source:
                self._close_source_callable()
            elif isinstance(ret, StreamABC):
                # pylint: disable=protected-access
                ret._close_source_callable = self._close_source_callable
            self._close_source_callable = None
        self.__data = None
        return ret

    @override
    def _get_args(self) -> tuple[object, ...]:
        """Return the args used to initialize self.

        A finished stream reports ``...`` in place of its data. The close
        callback is only included when one is set.
        """
        # Only a missing source (None) is replaced by ``...``. Testing
        # truthiness here would also hide valid but empty sources such as
        # ``[]`` and would call ``__bool__``/``__len__`` on arbitrary
        # iterables, which may raise.
        data: object = ... if self.__data is None else self.__data
        if self._close_source_callable is None:
            return (data,)
        return data, self._close_source_callable

    @override
    def close(self) -> None:
        """Close this stream cleanly.

        Calls the close callback (if any) and marks the stream as finished.
        Closing an already closed stream does nothing.

        >>> from typed_stream import Stream
        >>> stream = Stream([1, 2, 3], lambda: print("abc"))
        >>> stream.close()
        abc
        >>> stream.close()
        """
        self._finish(None, close_source=True)

    def distinct(self, *, use_set: bool = True) -> Self:
        """Remove duplicate values.

        With use_set=True (the default) the values seen so far are kept in
        a set, so they have to be hashable. With use_set=False they are
        kept in a list instead, which also works for unhashable values but
        makes every membership test scan the list.

        >>> from typed_stream import Stream
        >>> StreamABC.distinct(Stream([1, 2, 2, 2, 3, 2, 2])).collect()
        (1, 2, 3)
        >>> StreamABC.distinct(Stream([{1}, {2}, {3}, {2}, {2}])).collect()
        Traceback (most recent call last):
        ...
        TypeError: unhashable type: 'set'
        >>> StreamABC.distinct(Stream([{1}, {2}, {1}]), use_set=False).collect()
        ({1}, {2})
        """
        encountered: set[T] | list[T]
        peek_fun: Callable[[T], None]
        if use_set:
            encountered = set()
            peek_fun = encountered.add
        else:
            encountered = []
            peek_fun = encountered.append
        # Values that were already encountered are excluded; every value
        # that passes is then recorded through peek.
        # pytype: disable=attribute-error
        return self.exclude(encountered.__contains__).peek(peek_fun)
        # pytype: enable=attribute-error

    @abc.abstractmethod
    def limit(self, count: int, /) -> Self:
        """Stop the Stream after count values.

        Example:
            - Stream([1, 2, 3, 4, 5]).limit(3)
        """

    @abc.abstractmethod
    def drop(self, count: int, /) -> Self:
        """Drop the first count values."""

    @abc.abstractmethod
    def peek(self, fun: Callable[[T], object], /) -> Self:
        """Peek at every value, without modifying the values in the Stream.

        Example:
            - Stream([1, 2, 3]).peek(print)
        """

    @abc.abstractmethod
    def exclude(self, fun: Callable[[T], object], /) -> Self:
        """Exclude values from the Stream if fun returns a truthy value."""