Coverage for typed_stream / _impl / stream.py: 99%

221 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-17 08:56 +0000

1# Licensed under the EUPL-1.2 or later. 

2# You may obtain a copy of the licence in all the official languages of the 

3# European Union at https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12 

4 

5"""Typed Stream class for easier handling of iterables.""" 

6 

7from __future__ import annotations 

8 

9import collections 

10import concurrent.futures 

11import functools 

12import itertools 

13import operator 

14import sys 

15import typing 

16from collections.abc import Callable, Iterable, Iterator, Mapping 

17from numbers import Number, Real 

18from types import EllipsisType 

19 

20from .. import exceptions, streamable 

21from . import _iteration_utils, functions, stream_abc 

22from ._default_value import DEFAULT_VALUE as _DEFAULT_VALUE 

23from ._default_value import DefaultValueType as _DefaultValueType 

24from ._typing import Self, TypeVarTuple, Unpack, override 

25from ._utils import map_with_additional_args 

26 

# pylint: disable=too-many-lines
# Public names of this module: only the Stream class is exported.
__all__: tuple[typing.Literal["Stream"]] = ("Stream",)

# _types and _utils are imported for static type checkers only; at runtime
# this branch is skipped.
if typing.TYPE_CHECKING:
    from . import _types, _utils

# General-purpose type variables used in the signatures of Stream.
K = typing.TypeVar("K")
T = typing.TypeVar("T")
U = typing.TypeVar("U")
V = typing.TypeVar("V")
# Type variable constrained to a fixed set of built-in/numeric types.
Prim = typing.TypeVar("Prim", int, str, bool, complex, Number, Real)
# Type variable for exception classes.
Exc = typing.TypeVar("Exc", bound=BaseException)

# The bounds are given as strings because _types is only imported when
# type checking.
SA = typing.TypeVar("SA", bound="_types.SupportsAdd")
SC = typing.TypeVar("SC", bound="_types.SupportsComparison")

# Variadic type variable for signatures taking additional positional args.
Tvt = TypeVarTuple("Tvt")

# operator.add with a narrower annotation (both operands and result are SA).
add: Callable[[SA, SA], SA] = operator.add

46 

47 

48class Stream(stream_abc.StreamABC[T], Iterable[T]): 

49 """Typed Stream class for easier handling of iterables. 

50 

51 It is not recommended to store Stream instances in variables, 

52 instead use method chaining to handle the values and collect them when finished. 

53 """ 

54 

55 __slots__ = () 

56 

57 _data: Iterable[T] 

58 

def __init__(
    self,
    data: Iterable[T] | EllipsisType,
    close_source_callable: Callable[[], None] | None = None,
) -> None:
    """Initialise the Stream with its source data.

    Passing the Ellipsis, i.e. Stream(...), creates an already finished
    Stream.
    """
    if isinstance(data, EllipsisType):
        # Forward the literal Ellipsis as the "finished" marker.
        super().__init__(..., close_source_callable)
    else:
        super().__init__(data, close_source_callable)

72 

def __contains__(self, value: T, /) -> bool:
    """Tell whether the given value occurs in this stream.

    Iteration stops at the first element that compares equal.

    >>> 2 in Stream((1, 2, 3))
    True
    >>> 4 in Stream((1, 2, 3))
    False
    >>> 3 in Stream((1, 2, 3, 4, 5, 6, 7, 8)).peek(print)
    1
    2
    3
    True
    """
    # any() short-circuits exactly like an early return from a loop.
    found = any(candidate == value for candidate in self._data)
    return self._finish(found, close_source=True)

90 

@typing.overload
def __getitem__(self, item: int, /) -> T:
    """Nobody inspects the spammish repetition."""

@typing.overload
def __getitem__(
    self, item: slice[int | None, int | None, int | None], /  # noqa: W504
) -> streamable.StreamableSequence[T]:
    """Nobody inspects the spammish repetition."""

def __getitem__(
    self,
    item: slice[int | None, int | None, int | None] | int,
    /,  # noqa: W504
) -> streamable.StreamableSequence[T] | T:
    """Finish the stream by picking one item or a slice of it.

    An int returns the item at that position, a slice returns the
    selected items as a sequence.

    >>> Stream((1, 2, 3))[1]
    2
    >>> Stream((1, 2, 3))[1:3]
    (2, 3)
    """
    if not isinstance(item, int):
        return self._get_slice(
            start=item.start, stop=item.stop, step=item.step
        )
    return self.nth(item)

116 

@override
def __iter__(self) -> Iterator[T]:
    """Return an iterator over the values. This finishes the Stream.

    >>> for value in Stream((1, 2, 3)):
    ...     print(value)
    1
    2
    3
    """
    # The wrapper receives self.close as its clean-up callable.
    iterator = _iteration_utils.IterWithCleanUp(self._data, self.close)
    return iterator

128 

def __length_hint__(self) -> int:
    """Estimate how many values this Stream holds.

    The estimate is the length hint of the underlying data.

    >>> from operator import length_hint
    >>> length_hint(Stream([1, 2, 3]))
    3
    >>> length_hint(Stream.range(100))
    100
    """
    estimate = operator.length_hint(self._data)
    return estimate

139 

def __reversed__(self) -> Iterator[T]:
    """Iterate over the items of this Stream back to front.

    This finishes the Stream and collects all the elements.

    Equivalent to reversed(self.collect()).

    >>> tuple(reversed(Stream((1, 2, 3))))
    (3, 2, 1)
    >>> "".join(reversed(Stream("abc")))
    'cba'
    """
    collected = self.collect()
    return reversed(collected)

153 

def _get_slice(  # noqa: C901
    self,
    *,
    start: int | None = None,
    stop: int | None = None,
    step: int | None = None,
) -> streamable.StreamableSequence[T]:
    """Return the slice of this stream described by start, stop and step."""
    if start is None and stop is None and step is None:
        # A bare [:] is simply everything.
        return self.collect()

    # Without negative bounds the slice can be taken via islice.
    if all(bound is None or bound >= 0 for bound in (start, step, stop)):
        window = itertools.islice(self._data, start, stop, step)
        return self._finish(
            streamable.StreamableSequence(window), close_source=True
        )

    # [-n:] and [-n::1] are served by tail().
    wants_tail = (
        start is not None
        and start < 0
        and step in {None, 1}
        and stop is None
    )
    if wants_tail:
        return self.tail(abs(start))

    # Every other combination: collect first, then slice the result.
    return self.collect()[start:stop:step]

183 

@classmethod
@override
def _module(cls) -> str:
    # Stream itself reports "typed_stream"; any other class reports its
    # own __module__.
    return "typed_stream" if cls == Stream else cls.__module__

190 

@staticmethod
def counting(start: int = 0, step: int = 1) -> Stream[int]:
    """Return a never-ending Stream that counts up from start by step.

    >>> Stream.counting().limit(5).collect()
    (0, 1, 2, 3, 4)
    >>> Stream.counting(5, 2).limit(5).collect()
    (5, 7, 9, 11, 13)
    """
    counter = itertools.count(start, step)
    return Stream(counter)

201 

@staticmethod
def from_value(value: K) -> Stream[K]:
    """Return a never-ending Stream that repeats the given value.

    >>> Stream.from_value(1).limit(5).collect()
    (1, 1, 1, 1, 1)
    """
    repeated = itertools.repeat(value)
    return Stream(repeated)

210 

@typing.overload
@staticmethod
def range(stop: int, /) -> Stream[int]: ...  # noqa: D102

@typing.overload
@staticmethod
def range(*, stop: int) -> Stream[int]: ...  # noqa: D102

@typing.overload
@staticmethod
def range(*, start: int, stop: int) -> Stream[int]: ...  # noqa: D102

@typing.overload
@staticmethod
def range(start: int, stop: int, /) -> Stream[int]: ...  # noqa: D102

@typing.overload
@staticmethod
def range(start: int, /, *, stop: int) -> Stream[int]: ...  # noqa: D102

@typing.overload
@staticmethod
def range(  # noqa: D102
    start: int, stop: int, /, *, step: int
) -> Stream[int]: ...

@typing.overload
@staticmethod
def range(  # noqa: D102
    start: int, stop: int, step: int, /  # noqa: W504
) -> Stream[int]: ...

@typing.overload
@staticmethod
def range(  # noqa: D102
    start: int, /, *, stop: int, step: int
) -> Stream[int]: ...

@typing.overload
@staticmethod
def range(  # noqa: D102
    *, start: int, stop: int, step: int
) -> Stream[int]: ...

@staticmethod
def range(  # noqa: C901
    *args: int,
    start: int | _DefaultValueType = _DEFAULT_VALUE,
    stop: int | _DefaultValueType = _DEFAULT_VALUE,
    step: int | _DefaultValueType = _DEFAULT_VALUE,
) -> Stream[int]:
    """Build a Stream[int] over a range of integers.

    The arguments behave like those of the built-in range function:
    - Stream.range(stop) -> Stream[int]
    - Stream.range(start, stop[, step]) -> Stream[int]

    Positional and keyword arguments may be combined as listed in the
    overloads; any other combination raises TypeError.

    >>> Stream.range(5).collect() == Stream(range(5)).collect()
    True
    >>> Stream.range(1, 13).collect() == Stream(range(1, 13)).collect()
    True
    >>> Stream.range(1, 9, 2).collect() == Stream(range(1, 9, 2)).collect()
    True
    >>> Stream.range(start=1, stop=7, step=2).collect()
    (1, 3, 5)
    """
    message = "Unexpected arguments to Stream.range()"

    # start given as keyword: stop must be a keyword too, no positionals.
    if not isinstance(start, _DefaultValueType):
        if args or isinstance(stop, _DefaultValueType):
            raise TypeError(message)
        if isinstance(step, _DefaultValueType):
            return Stream(range(start, stop))
        return Stream(range(start, stop, step))

    # Neither start nor stop given as keyword.
    if isinstance(stop, _DefaultValueType):
        if isinstance(step, _DefaultValueType):
            return Stream(range(*args))  # no kwarg given
        if len(args) == 2:
            return Stream(range(args[0], args[1], step))
        raise TypeError(message)

    # stop given as keyword, step not given.
    if isinstance(step, _DefaultValueType):
        if len(args) == 1:
            return Stream(range(args[0], stop))
        if not args:
            return Stream(range(stop))
        raise TypeError(message)

    # stop and step given as keywords: start must be the only positional.
    if len(args) == 1:
        return Stream(range(args[0], stop, step))
    raise TypeError(message)

298 

def all(self) -> bool:
    """Tell whether every value is truthy. This finishes the Stream.

    Evaluation stops at the first falsy value, in which case False is
    returned.

    >>> Stream([1, 2, 3]).peek(print).all()
    1
    2
    3
    True
    >>> Stream([1, 2, 0, 4, 5, 6, 7, 8]).peek(print).all()
    1
    2
    0
    False
    >>> Stream([]).all()
    True
    """
    # Inside the method body "all" is the built-in, not this method.
    everything_truthy = all(self._data)
    return self._finish(everything_truthy, close_source=True)

318 

@typing.overload
def catch(  # noqa: D102
    self,
    *exception_class: type[Exc],
) -> Self: ...

@typing.overload
def catch(  # noqa: D102
    self,
    *exception_class: type[Exc],
    handler: Callable[[Exc], object],
) -> Self: ...

@typing.overload
def catch(  # noqa: D102
    self,
    *exception_class: type[Exc],
    default: Callable[[Exc], K] | Callable[[], K],
) -> Stream[T | K]: ...

@typing.overload
def catch(  # noqa: D102
    self,
    *exception_class: type[Exc],
    handler: Callable[[Exc], object],
    default: Callable[[Exc], K] | Callable[[], K],
) -> Stream[T | K]: ...

def catch(
    self,
    *exception_class: type[Exc],
    handler: Callable[[Exc], object] | None = None,
    default: Callable[[Exc], K] | Callable[[], K] | None = None,
) -> Stream[T | K]:
    """Catch exceptions raised while producing the values.

    The optional handler and default callables are used as shown in the
    examples below.

    >>> Stream("1a2").map(int).catch(ValueError, handler=print).collect()
    invalid literal for int() with base 10: 'a'
    (1, 2)
    >>> Stream("1a2").map(int).catch(ValueError, default=lambda _:_).collect()
    (1, ValueError("invalid literal for int() with base 10: 'a'"), 2)
    >>> Stream("1a2").map(int).peek(print) \
        .catch(ValueError, handler=print).collect()
    1
    invalid literal for int() with base 10: 'a'
    2
    (1, 2)
    """
    guarded = _iteration_utils.ExceptionHandler(
        self._data, exception_class, handler, default
    )
    return self._finish(Stream(guarded))

374 

def chain(self, *iterables: Iterable[T]) -> Self:
    """Append the given iterables, in order, to the end of the Stream.

    >>> Stream([1, 2, 3]).chain([4, 5, 6]).collect()
    (1, 2, 3, 4, 5, 6)
    >>> Stream([1, 2, 3]).chain([4, 5, 6], [7, 8, 9]).collect()
    (1, 2, 3, 4, 5, 6, 7, 8, 9)
    >>> Stream("abc").chain("def", "ghi", "jkl").collect("".join)
    'abcdefghijkl'
    """
    extended = itertools.chain(self._data, *iterables)
    self._data = extended
    return self

387 

if sys.version_info >= (3, 12) and hasattr(itertools, "batched"):

    def chunk(self, size: int) -> Stream[tuple[T, ...]]:
        """Group the values into tuples of the given size.

        The last chunk may be shorter.

        >>> Stream([1, 2, 3, 4, 5, 6]).chunk(2).collect()
        ((1, 2), (3, 4), (5, 6))
        >>> Stream([1, 2, 3, 4, 5, 6]).chunk(3).collect()
        ((1, 2, 3), (4, 5, 6))
        >>> Stream([1, 2, 3, 4, 5, 6, 7]).chunk(3).collect()
        ((1, 2, 3), (4, 5, 6), (7,))
        """
        # add strict=False if min version is 3.13 (is the default)
        batches = itertools.batched(self._data, size)  # noqa: B911
        return self._finish(Stream(batches))

else:  # pragma: no cover

    def chunk(self, size: int) -> Stream[tuple[T, ...]]:
        """Group the values into tuples of the given size.

        The last chunk may be shorter.

        >>> Stream([1, 2, 3, 4, 5, 6]).chunk(2).collect()
        ((1, 2), (3, 4), (5, 6))
        >>> Stream([1, 2, 3, 4, 5, 6]).chunk(3).collect()
        ((1, 2, 3), (4, 5, 6))
        >>> Stream([1, 2, 3, 4, 5, 6, 7]).chunk(3).collect()
        ((1, 2, 3), (4, 5, 6), (7,))
        """
        # Fallback used when itertools.batched is not available.
        chunked = _iteration_utils.Chunked(self._data, size).stream()
        return self._finish(chunked)

426 

@typing.overload
def collect(self, /) -> streamable.StreamableSequence[T]: ...  # noqa: D102

@typing.overload
def collect(  # noqa: D102
    self,
    fun: Callable[[Iterable[T]], streamable.StreamableSequence[T]],
    /,
) -> streamable.StreamableSequence[T]: ...

@typing.overload
def collect(  # noqa: D102
    self,
    fun: type[collections.Counter[T]],
    /,
) -> collections.Counter[T]: ...

@typing.overload
def collect(  # noqa: D102
    self, fun: Callable[[Iterable[T]], tuple[T, ...]], /  # noqa: W504
) -> tuple[T, ...]: ...

@typing.overload
def collect(  # noqa: D102
    self, fun: Callable[[Iterable[T]], list[T]], /  # noqa: W504
) -> list[T]: ...

@typing.overload
def collect(  # noqa: D102
    self: Stream[SA], fun: Callable[[Iterable[SA]], SA], /  # noqa: W504
) -> SA: ...

@typing.overload
def collect(  # noqa: D102
    self, fun: Callable[[Iterable[T]], set[T]], /  # noqa: W504
) -> set[T]: ...

@typing.overload
def collect(  # noqa: D102
    self, fun: Callable[[Iterable[T]], frozenset[T]], /  # noqa: W504
) -> frozenset[T]: ...

@typing.overload
def collect(  # noqa: D102
    self: Stream[tuple[K, V]],
    fun: Callable[[Iterable[tuple[K, V]]], dict[K, V]],
    /,
) -> dict[K, V]: ...

@typing.overload
def collect(  # noqa: D102
    self: Stream[tuple[K, V]],
    fun: Callable[[Iterable[tuple[K, V]]], Mapping[K, V]],
    /,
) -> Mapping[K, V]: ...

@typing.overload
def collect(  # noqa: D102
    self, fun: Callable[[Iterable[T]], K], /  # noqa: W504
) -> K: ...

def collect(
    self: Stream[U],
    fun: Callable[[Iterable[U]], object] = streamable.StreamableSequence,
    /,
) -> object:
    """Feed all values to the given callable. This finishes the Stream.

    The callable receives the underlying iterable and its result is
    returned. Without an argument a StreamableSequence is built.

    >>> Stream([1, 2, 3]).collect(list)
    [1, 2, 3]
    >>> Stream([1, 2, 3]).collect(sum)
    6
    >>> Stream([1, 2, 3]).collect(dict.fromkeys)
    {1: None, 2: None, 3: None}
    >>> Stream([(1, 2), (3, 4)]).collect(dict)
    {1: 2, 3: 4}
    """
    collected = fun(self._data)
    return self._finish(collected, close_source=True)

505 

def concurrent_map(
    self, fun: Callable[[T], K], /, max_workers: int | None = None
) -> Stream[K]:
    """Apply the function to every value using a process pool.

    See: https://docs.python.org/3/library/concurrent.futures.html

    >>> Stream("123").concurrent_map(int).collect()
    (1, 2, 3)
    """
    pool = concurrent.futures.ProcessPoolExecutor(max_workers=max_workers)
    with pool as executor:
        # The new Stream is created and self is finished while the pool
        # is still open; leaving the with-block shuts the pool down.
        mapped = Stream(executor.map(fun, self._data))
        return self._finish(mapped)

520 

def conditional_map(
    self,
    condition: Callable[[T], object],
    if_true: Callable[[T], U],
    if_false: Callable[[T], V] | None = None,
) -> Stream[U | V]:
    """Map each value with if_true or if_false, chosen by the condition.

    >>> Stream("1x2x3x").conditional_map(str.isdigit, int).collect()
    (1, 2, 3)
    >>> Stream("1x2x3x").conditional_map(str.isdigit, int, ord).collect()
    (1, 120, 2, 120, 3, 120)
    """
    branching = _iteration_utils.IfElseMap(
        self._data, condition, if_true, if_false
    )
    return self._finish(Stream(branching))

541 

def count(self) -> int:
    """Return the number of elements. This finishes the Stream.

    Equivalent to: Stream(...).map(lambda x: 1).sum()

    >>> Stream([1, 2, 3]).count()
    3
    >>> Stream("abcdef").count()
    6
    """
    total = _iteration_utils.count(self._data)
    return self._finish(total, close_source=True)

555 

def dedup(self, *, key: None | Callable[[T], object] = None) -> Self:
    """Collapse runs of consecutive equal values into their first value.

    If the input is sorted this is the same as Stream.distinct().

    >>> Stream([1] * 100).dedup().collect(list)
    [1]
    >>> Stream([1, 2, 3, 1]).dedup().collect()
    (1, 2, 3, 1)
    >>> Stream([1, 1, 2, 2, 2, 2, 3, 1]).dedup().collect()
    (1, 2, 3, 1)
    >>> Stream([]).dedup().collect()
    ()
    >>> Stream("ABC").dedup(key=str.lower).collect("".join)
    'ABC'
    >>> Stream("aAaAbbbcCCAaBbCc").dedup(key=str.lower).collect("".join)
    'abcABC'
    """
    # Inspired by the unique_justseen itertools recipe
    # https://docs.python.org/3/library/itertools.html#itertools-recipes
    runs = itertools.groupby(self._data, key=key)
    # groupby yields (key, group) pairs; keep the group iterator ...
    run_members = map(operator.itemgetter(1), runs)
    # ... and take the first member of every run.
    self._data = map(next, run_members)
    return self

584 

def dedup_counting(self) -> Stream[tuple[T, int]]:
    """Collapse runs of equal values into (value, run length) tuples.

    >>> Stream("abba").dedup_counting().starmap(print).for_each()
    a 1
    b 2
    a 1
    >>> Stream("AaaaBBcccc").dedup_counting().starmap(print).for_each()
    A 1
    a 3
    B 2
    c 4
    """

    def _with_length(value: T, members: Iterator[T]) -> tuple[T, int]:
        return (value, _iteration_utils.count(members))

    runs = itertools.groupby(self)
    return Stream(itertools.starmap(_with_length, runs))

603 

@override
def distinct(self, *, use_set: bool = True) -> Self:
    """Keep only the first occurrence of every value.

    With use_set=True the values seen so far are kept in a set and must
    be hashable; with use_set=False a list is used instead.

    >>> from typed_stream import Stream
    >>> Stream([1, 2, 2, 2, 3, 2, 2]).distinct().collect()
    (1, 2, 3)
    >>> Stream([{1}, {2}, {3}, {2}, {2}]).distinct().collect()  # doctest: +ELLIPSIS
    Traceback (most recent call last):
    ...
    TypeError: ...unhashable type: 'set'...
    >>> Stream([{1}, {2}, {3}, {2}, {2}]).distinct(use_set=False).collect()
    ({1}, {2}, {3})
    """
    # pylint: disable=duplicate-code
    seen: set[T] | list[T]
    remember: Callable[[T], None]
    if not use_set:
        seen = []
        remember = seen.append
    else:
        seen = set()
        remember = seen.add

    # Drop values that were already seen, then record every survivor.
    unseen = itertools.filterfalse(seen.__contains__, self._data)
    self._data = map(_iteration_utils.Peeker(remember), unseen)
    return self

633 

@override
def drop(self, c: int, /) -> Self:
    """Skip the first c values.

    >>> Stream([1, 2, 3, 4, 5]).drop(2).collect()
    (3, 4, 5)
    """
    remaining = itertools.islice(self._data, c, None)
    self._data = remaining
    return self

643 

def drop_while(self, fun: Callable[[T], object], /) -> Self:
    """Skip leading values for as long as the function returns truthy.

    See: https://docs.python.org/3/library/itertools.html#itertools.dropwhile

    >>> Stream([1, 2, 3, 4, 1]).drop_while(lambda x: x < 3).collect()
    (3, 4, 1)
    """
    remaining = itertools.dropwhile(fun, self._data)
    self._data = remaining
    return self

654 

def empty(self) -> bool:
    """Tell whether the Stream has no values. This finishes the Stream.

    >>> Stream([1, 2, 3]).empty()
    False
    >>> Stream([]).empty()
    True
    """
    try:
        self.first()
    except exceptions.StreamEmptyError:
        # first() found nothing to return.
        return True
    else:
        return False

668 

def enumerate(
    self, start_index: int = 0, /  # noqa: W504
) -> Stream[_utils.IndexValueTuple[T]]:
    """Pair every value with its index, counting from start_index.

    >>> Stream([1, 2, 3]).enumerate().collect()
    ((0, 1), (1, 2), (2, 3))
    >>> Stream("abc").enumerate().collect()
    ((0, 'a'), (1, 'b'), (2, 'c'))
    >>> Stream("abc").enumerate(100).collect()
    ((100, 'a'), (101, 'b'), (102, 'c'))
    >>> Stream("abc").enumerate().map(lambda el: {el.idx: el.val}).collect()
    ({0: 'a'}, {1: 'b'}, {2: 'c'})
    """
    indexed = _iteration_utils.Enumerator(self._data, start_index)
    return self._finish(Stream(indexed))

686 

@typing.overload
def exclude(  # noqa: D102
    self: Stream[K | Prim],
    fun: _utils.InstanceChecker[Prim],
    /,  # noqa: W504
) -> Stream[K]: ...

@typing.overload
def exclude(  # noqa: D102
    self: Stream[K | U], fun: _utils.InstanceChecker[U], /  # noqa: W504
) -> Stream[K]: ...

@typing.overload
def exclude(  # noqa: D102
    self: Stream[K | None], fun: _utils.NoneChecker, /  # noqa: W504
) -> Stream[K]: ...

# @typing.overload
# def exclude(  # noqa: D102
#     self: Stream[K | U], fun: TypeGuardingCallable[U, K | U]
# ) -> Stream[K]:
#     ...

@typing.overload
def exclude(  # noqa: D102
    self: Stream[T], fun: Callable[[T], object], /  # noqa: W504
) -> Stream[T]: ...

@override
def exclude(self, fun: Callable[[T], object], /) -> object:
    """Drop every value for which the function returns a truthy value.

    See: https://docs.python.org/3/library/itertools.html#itertools.filterfalse

    >>> Stream([1, 2, 3, 4, 5]).exclude(lambda x: x % 2).collect()
    (2, 4)
    >>> Stream([1, 2, None, 3]).exclude(lambda x: x is None).collect()
    (1, 2, 3)
    """
    kept = itertools.filterfalse(fun, self._data)
    self._data = kept
    return self

728 

@typing.overload
def filter(  # noqa: D102
    self: Stream[K | None], fun: _utils.NotNoneChecker
) -> Stream[K]: ...

@typing.overload
def filter(self: Stream[K | None]) -> Stream[K]: ...  # noqa: D102

@typing.overload
def filter(self: Stream[T]) -> Stream[T]: ...  # noqa: D102

@typing.overload
def filter(  # noqa: D102
    self, fun: _utils.InstanceChecker[K]
) -> Stream[K]:  # pragma: no cover
    ...

@typing.overload
def filter(  # noqa: D102
    self, fun: _types.TypeGuardingCallable[K, T]
) -> Stream[K]:  # pragma: no cover
    ...

@typing.overload
def filter(  # noqa: D102
    self: Stream[T], fun: Callable[[T], object]
) -> Stream[T]: ...

def filter(self, fun: Callable[[T], object] | None = None) -> object:
    """Keep the values accepted by the built-in filter.

    The function (or None) is handed to the built-in filter unchanged.

    >>> Stream([1, 2, 3, 4, 5]).filter(lambda x: x % 2).collect()
    (1, 3, 5)
    """
    # Inside the method body "filter" is the built-in, not this method.
    kept = filter(fun, self._data)
    self._data = kept
    return self

765 

def first(self, default: K | _DefaultValueType = _DEFAULT_VALUE) -> T | K:
    """Return the first element, or the default. This finishes the Stream.

    Raises StreamEmptyError if the Stream is empty and no default was
    given.

    >>> Stream([1, 2, 3]).first()
    1
    >>> Stream("abc").first()
    'a'
    >>> Stream([]).first()
    Traceback (most recent call last):
    ...
    typed_stream.exceptions.StreamEmptyError
    >>> Stream([]).first(default="default")
    'default'
    """
    try:
        head = next(iter(self._data))
    except StopIteration:
        if isinstance(default, _DefaultValueType):
            raise exceptions.StreamEmptyError() from None
        return default
    else:
        return head
    finally:
        # Runs on every exit path, including the raise above.
        self._finish(None, close_source=True)

789 

@typing.overload
def flat_map(  # noqa: D102
    self, fun: Callable[[T], Iterable[K]], /  # noqa: W504
) -> Stream[K]: ...

@typing.overload
def flat_map(  # noqa: D102
    self,
    fun: Callable[[T, Unpack[Tvt]], Iterable[K]],
    /,
    *args: Unpack[Tvt],
) -> Stream[K]: ...

def flat_map(
    self,
    fun: Callable[[T, Unpack[Tvt]], Iterable[K]],
    /,
    *args: Unpack[Tvt],
) -> Stream[K]:
    """Map each value to an iterable and flatten the results.

    Additional positional arguments are passed on to the function.

    This lazily finishes the current Stream and creates a new one.

    >>> Stream([1, 4, 7]).flat_map(lambda x: [x, x + 1, x + 2]).collect()
    (1, 2, 3, 4, 5, 6, 7, 8, 9)
    >>> Stream(["abc", "def"]).flat_map(str.encode, "ASCII").collect()
    (97, 98, 99, 100, 101, 102)
    """
    nested = map_with_additional_args(self._data, fun, args)
    flattened = itertools.chain.from_iterable(nested)
    return Stream(flattened)

823 

def for_each(self, fun: Callable[[T], object] = functions.noop, /) -> None:
    """Call the callable once for every value, consuming the Stream.

    >>> Stream([1, 2, 3]).for_each(print)
    1
    2
    3
    """
    for item in self._data:
        fun(item)
    # Everything was consumed: finish the Stream and close the source.
    self._finish(None, close_source=True)

835 

def last(self) -> T:
    """Return the final element of the Stream. This finishes the Stream.

    Raises StreamEmptyError if the Stream is empty.

    >>> Stream([1, 2, 3]).last()
    3
    >>> Stream([]).last()
    Traceback (most recent call last):
    ...
    typed_stream.exceptions.StreamEmptyError
    """
    tail = self.tail(1)
    if not tail:
        raise exceptions.StreamEmptyError()
    return tail[-1]

851 

@override
def limit(self, c: int, /) -> Self:
    """Cut the Stream off after c values.

    >>> Stream([1, 2, 3, 4, 5]).limit(3).collect()
    (1, 2, 3)
    >>> Stream.from_value(3).limit(1000).collect() == (3,) * 1000
    True
    """
    truncated = itertools.islice(self._data, c)
    self._data = truncated
    return self

863 

@typing.overload
def map(self, fun: Callable[[T], K], /) -> Stream[K]: ...  # noqa: D102

@typing.overload
def map(  # noqa: D102
    self, fun: Callable[[T, Unpack[Tvt]], K], /, *args: Unpack[Tvt]
) -> Stream[K]: ...

def map(
    self, fun: Callable[[T, Unpack[Tvt]], K], /, *args: Unpack[Tvt]
) -> Stream[K]:
    """Apply the function to every value.

    Additional positional arguments are passed on to the function.

    This lazily finishes the current Stream and creates a new one.

    >>> Stream([1, 2, 3]).map(lambda x: x * 3).collect()
    (3, 6, 9)
    >>> Stream([1, 2, 3]).map(operator.mul, 3).collect()
    (3, 6, 9)
    """
    mapped = map_with_additional_args(self._data, fun, args)
    return self._finish(Stream(mapped))

887 

@typing.overload
def max(self: Stream[SC]) -> SC: ...  # noqa: D102

@typing.overload
def max(  # noqa: D102
    self: Stream[SC], default: K | _DefaultValueType = _DEFAULT_VALUE
) -> SC | K: ...

@typing.overload
def max(  # noqa: D102
    self,
    default: K | _DefaultValueType = _DEFAULT_VALUE,
    *,
    key: Callable[[T], SC],
) -> T | K: ...

def max(
    self,
    default: object = _DEFAULT_VALUE,
    *,
    key: Callable[[T], SC] | None = None,
) -> object:
    """Return the biggest element of the stream. This finishes the Stream.

    default is returned if the stream is empty; key is passed on to the
    built-in max. Raises StreamEmptyError if the stream is empty and no
    default was given.

    >>> Stream([3, 2, 1]).max()
    3
    >>> Stream(["a", "b", "c"]).max()
    'c'
    >>> Stream(["abc", "de", "f"]).max(key=len)
    'abc'
    >>> Stream([]).max(default=0)
    0
    >>> Stream([]).max()
    Traceback (most recent call last):
    ...
    typed_stream.exceptions.StreamEmptyError
    """
    try:
        # Inside the method body "max" is the built-in, not this method.
        max_ = max(self._data, default=default, key=key)  # type: ignore[type-var,arg-type]
    finally:
        # Finish the stream on every exit path, as first() does, so the
        # source is also closed when StreamEmptyError is raised below.
        self._finish(None, close_source=True)
    if isinstance(max_, _DefaultValueType):
        raise exceptions.StreamEmptyError() from None
    return max_

929 

@typing.overload
def min(self: Stream[SC]) -> SC: ...  # noqa: D102

@typing.overload
def min(  # noqa: D102
    self: Stream[SC], default: K | _DefaultValueType = _DEFAULT_VALUE
) -> SC | K: ...

@typing.overload
def min(  # noqa: D102
    self,
    default: K | _DefaultValueType = _DEFAULT_VALUE,
    *,
    key: Callable[[T], SC],
) -> T | K: ...

def min(
    self,
    default: object = _DEFAULT_VALUE,
    *,
    key: Callable[[T], SC] | None = None,
) -> object:
    """Return the smallest element of the stream. This finishes the Stream.

    default is returned if the stream is empty; key is passed on to the
    built-in min. Raises StreamEmptyError if the stream is empty and no
    default was given.

    >>> Stream([1, 2, 3]).min()
    1
    >>> Stream(["a", "b", "c"]).min()
    'a'
    >>> Stream(["abc", "de", "f"]).min(key=len)
    'f'
    >>> Stream([]).min(default=0)
    0
    >>> Stream([]).min()
    Traceback (most recent call last):
    ...
    typed_stream.exceptions.StreamEmptyError
    """
    try:
        # Inside the method body "min" is the built-in, not this method.
        min_ = min(self._data, default=default, key=key)  # type: ignore[type-var,arg-type]
    finally:
        # Finish the stream on every exit path, as first() does, so the
        # source is also closed when StreamEmptyError is raised below.
        self._finish(None, close_source=True)
    if isinstance(min_, _DefaultValueType):
        raise exceptions.StreamEmptyError() from None
    return min_

971 

@typing.overload
def nth(self, index: int, /) -> T: ...  # noqa: D102

@typing.overload
def nth(self, index: int, /, default: T) -> T: ...  # noqa: D102

@typing.overload
def nth(self, index: int, /, default: K) -> T | K: ...  # noqa: D102

def nth(  # noqa: C901
    self,
    index: int,
    /,
    default: K | _DefaultValueType = _DEFAULT_VALUE,
) -> T | K:
    """Return the item at the given index of the stream.

    Raises StreamIndexError if no default value is given and the Stream
    does not have an item at the given index.

    Stream(...).nth(0) gets the first element of the stream; negative
    indices are resolved through tail().

    >>> Stream([1, 2, 3]).nth(0)
    1
    >>> Stream("abc").nth(1)
    'b'
    >>> Stream([]).nth(22)
    Traceback (most recent call last):
    ...
    typed_stream.exceptions.StreamIndexError
    >>> Stream([]).nth(22, default=42)
    42
    """
    value: T | _DefaultValueType
    if index >= 0:
        try:
            value = self.drop(index).first()
        except exceptions.StreamEmptyError:
            value = _DEFAULT_VALUE
    else:
        wanted = abs(index)
        tail = self.tail(wanted)
        # A shorter tail means the stream has fewer than `wanted` items.
        value = tail[0] if len(tail) == wanted else _DEFAULT_VALUE

    if not isinstance(value, _DefaultValueType):
        return value
    if not isinstance(default, _DefaultValueType):
        return default
    raise exceptions.StreamIndexError()

1022 

@typing.overload
def nwise(  # noqa: D102
    self, size: typing.Literal[1], /  # noqa: W504
) -> Stream[tuple[T]]: ...

@typing.overload
def nwise(  # noqa: D102
    self, size: typing.Literal[2], /  # noqa: W504
) -> Stream[tuple[T, T]]: ...  # noqa: D102

@typing.overload
def nwise(  # noqa: D102
    self, size: int, /  # noqa: W504
) -> Stream[tuple[T, ...]]: ...

def nwise(
    self, size: int, /  # noqa: W504
) -> Stream[tuple[T, ...]] | Stream[tuple[T, T]] | Stream[tuple[T]]:
    """Return a Stream of overlapping tuples of n consecutive values.

    This is often called a sliding window.
    For n=2 it behaves like pairwise from itertools.

    The returned Stream will consist of tuples of length n.
    If n is bigger than the count of values in self, it will be empty.

    >>> Stream([1, 2, 3]).nwise(1).collect()
    ((1,), (2,), (3,))
    >>> Stream([1, 2, 3]).nwise(2).collect()
    ((1, 2), (2, 3))
    >>> Stream([1, 2, 3, 4]).nwise(3).collect()
    ((1, 2, 3), (2, 3, 4))
    >>> Stream([1, 2, 3, 4, 5]).nwise(4).collect()
    ((1, 2, 3, 4), (2, 3, 4, 5))
    """
    windows = _iteration_utils.sliding_window(self._data, size)
    return self._finish(Stream(windows))

1061 

@override
def peek(self, fun: Callable[[T], object], /) -> Self:
    """Call ``fun`` on every value while leaving the values untouched.

    The call happens lazily, i.e. only when the value is actually
    pulled through the Stream. The same Stream instance is returned so
    calls can be chained.

    >>> stream = Stream([1, 2, 3]).peek(print)
    >>> stream.map(str).collect()
    1
    2
    3
    ('1', '2', '3')
    """
    # Peeker wraps fun so it can be used as a mapping function.
    observer = _iteration_utils.Peeker(fun)
    self._data = map(observer, self._data)
    return self

1075 

def reduce(
    self,
    fun: Callable[[T, T], T],
    initial: T | _DefaultValueType = _DEFAULT_VALUE,
) -> T:
    """Fold all values of this stream into one. This finishes the Stream.

    ``fun`` receives the accumulated value and the next value and
    returns the new accumulated value. If ``initial`` is given it is
    used as the starting value, otherwise the first value of the stream
    is used.

    Without an initial value a StreamEmptyError is raised if the stream
    is empty.

    >>> Stream([1, 2, 3]).reduce(operator.add)
    6
    >>> Stream([1, 2, 3]).reduce(operator.mul)
    6
    >>> Stream([]).reduce(operator.mul)
    Traceback (most recent call last):
    ...
    typed_stream.exceptions.StreamEmptyError
    """
    values = iter(self._data)
    start: T
    if not isinstance(initial, _DefaultValueType):
        start = initial
    else:
        # No explicit start value: seed the fold with the first element.
        try:
            start = next(values)
        except StopIteration:
            raise exceptions.StreamEmptyError() from None
    result = functools.reduce(fun, values, start)
    return self._finish(result, True)

1102 

def starcollect(self, fun: _types.StarCallable[T, K]) -> K:
    """Pass all values as positional arguments to ``fun``.

    The result of that single call is returned.
    This finishes the Stream.

    >>> Stream([1, 2, 3]).starcollect(lambda *args: args)
    (1, 2, 3)
    >>> Stream([]).starcollect(lambda *args: args)
    ()
    """
    # Unpacking consumes the whole underlying iterable before the call.
    collected = fun(*self._data)
    return self._finish(collected, close_source=True)

1112 

@typing.overload
def starmap(  # noqa: D102
    self: Stream[_utils.IndexValueTuple[K]],
    fun: Callable[[int, K], U],
    /,
) -> Stream[U]: ...

@typing.overload
def starmap(  # noqa: D102
    self: Stream[tuple[Unpack[Tvt]]],
    fun: Callable[[Unpack[Tvt]], U],
    /,
) -> Stream[U]: ...

def starmap(  # noqa: D102
    self: Stream[tuple[Unpack[Tvt]]],
    fun: Callable[[Unpack[Tvt]], U],
    /,
) -> Stream[U]:
    """Map each tuple to a value by unpacking it into ``fun``.

    Every value of the Stream is spread into the positional arguments
    of ``fun``; the results form the new Stream.

    This lazily finishes the current Stream and creates a new one.

    >>> Stream([(1, 2), (3, 4)]).starmap(operator.mul).collect()
    (2, 12)
    >>> Stream([(2, "x"), (3, "y")]).starmap(operator.mul).collect()
    ('xx', 'yyy')
    """
    mapped = itertools.starmap(fun, self._data)
    return self._finish(Stream(mapped))

1142 

def sum(self: Stream[SA]) -> SA:
    """Add up all elements of the Stream.

    Any element type that supports addition works. The values are
    combined via reduce without an initial value, so an empty Stream
    raises a StreamEmptyError.

    For numbers stream.collect(sum) could be faster.
    For strings stream.collect("".join) could be faster.
    For lists stream.flat_map(lambda _: _).collect(list) could be faster.
    For tuples stream.flat_map(lambda _: _).collect(tuple) could be faster.

    >>> Stream([1, 2, 3]).sum()
    6
    >>> Stream(["a", "b", "c"]).sum()
    'abc'
    >>> Stream([(1, 2), (3, 4)]).sum()
    (1, 2, 3, 4)
    >>> Stream(([1], [2], [3])).sum()
    [1, 2, 3]
    """
    total = self.reduce(add)
    return total

1163 

def tail(self, c: int, /) -> streamable.StreamableSequence[T]:
    """Return a sequence holding the last ``c`` items.

    This finishes the Stream.

    >>> Stream([1, 2, 3]).tail(2)
    (2, 3)
    >>> Stream.range(100).tail(10)
    (90, 91, 92, 93, 94, 95, 96, 97, 98, 99)
    """
    # A bounded deque keeps only the most recent c values while the
    # whole iterable is consumed.
    last_items = collections.deque(self._data, maxlen=c)
    return self._finish(
        streamable.StreamableSequence(last_items), close_source=True
    )

1178 

def take_while(self, fun: Callable[[T], object]) -> Self:
    """Keep values until ``fun`` returns a falsy value for the first time.

    Everything from that value onwards is discarded. The same Stream
    instance is returned so calls can be chained.

    See: https://docs.python.org/3/library/itertools.html#itertools.takewhile

    >>> Stream([1, 2, 3, 4, 1]).take_while(lambda x: x < 4).collect()
    (1, 2, 3)
    >>> Stream([1, 2, 3, -4, -1]).take_while(lambda x: x <= 0).collect()
    ()
    """
    upstream = self._data
    self._data = itertools.takewhile(fun, upstream)
    return self