Coverage for typed_stream/_impl/stream.py: 99%

331 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-02-12 21:24 +0000

1# Licensed under the EUPL-1.2 or later. 

2# You may obtain a copy of the licence in all the official languages of the 

3# European Union at https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12 

4 

5"""Typed Stream class for easier handling of iterables.""" 

6 

7from __future__ import annotations 

8 

9import collections 

10import concurrent.futures 

11import functools 

12import itertools 

13import operator 

14import sys 

15import typing 

16from collections.abc import Callable, Iterable, Iterator, Mapping 

17from numbers import Number, Real 

18from types import EllipsisType 

19 

20from .. import exceptions, streamable 

21from . import _iteration_utils, functions, stream_abc 

22from ._default_value import DEFAULT_VALUE as _DEFAULT_VALUE 

23from ._default_value import DefaultValueType as _DefaultValueType 

24from ._typing import Self, TypeVarTuple, Unpack, override 

25 

26# pylint: disable=too-many-lines 

27__all__: tuple[typing.Literal["Stream"]] = ("Stream",) 

28 

29if typing.TYPE_CHECKING: 

30 from . import _types, _utils 

31 

# Unconstrained generic type variables used by the annotations in this module.
K = typing.TypeVar("K")
T = typing.TypeVar("T")
U = typing.TypeVar("U")
V = typing.TypeVar("V")
# Constrained type variable: must be exactly one of the listed types.
Prim = typing.TypeVar("Prim", int, str, bool, complex, Number, Real)
# Any exception type, including direct BaseException subclasses.
Exc = typing.TypeVar("Exc", bound=BaseException)

# The bounds are string forward references because _types is only imported
# under typing.TYPE_CHECKING.
SA = typing.TypeVar("SA", bound="_types.SupportsAdd")
SC = typing.TypeVar("SC", bound="_types.SupportsComparison")

# Variadic type variable; used to type extra positional arguments (*args).
Tvt = TypeVarTuple("Tvt")

# operator.add, annotated so both operands and the result share the type SA.
add: Callable[[SA, SA], SA] = operator.add

45 

46 

47class Stream(stream_abc.StreamABC[T], Iterable[T]): 

48 """Typed Stream class for easier handling of iterables. 

49 

50 It is not recommended to store Stream instances in variables, 

51 instead use method chaining to handle the values and collect them when finished. 

52 """ 

53 

54 __slots__ = () 

55 

56 _data: Iterable[T] 

57 

def __init__(
    self,
    data: Iterable[T] | EllipsisType,
    close_source_callable: Callable[[], None] | None = None,
) -> None:
    """Initialise the Stream from an iterable.

    Passing ``...`` as data, i.e. Stream(...), creates a finished Stream.
    Both arguments are forwarded to the base class initialiser.
    """
    source: Iterable[T] | EllipsisType
    if isinstance(data, EllipsisType):
        source = ...
    else:
        source = data
    super().__init__(source, close_source_callable)

71 

def __contains__(self, value: T, /) -> bool:
    """Tell whether any element of this stream equals value.

    Iteration stops at the first match; afterwards the stream is finished
    and its source is closed.

    >>> 2 in Stream((1, 2, 3))
    True
    >>> 4 in Stream((1, 2, 3))
    False
    >>> 3 in Stream((1, 2, 3, 4, 5, 6, 7, 8)).peek(print)
    1
    2
    3
    True
    """
    found = False
    for candidate in self._data:
        if candidate == value:
            found = True
            break
    return self._finish(found, close_source=True)

89 

90 @typing.overload 

91 def __getitem__(self, item: int, /) -> T: 

92 """Nobody inspects the spammish repetition.""" 

93 

94 @typing.overload 

95 def __getitem__( 

96 self, item: slice[int | None, int | None, int | None], / # noqa: W504 

97 ) -> streamable.StreamableSequence[T]: 

98 """Nobody inspects the spammish repetition.""" 

99 

def __getitem__(
    self,
    item: slice[int | None, int | None, int | None] | int,
    /,  # noqa: W504
) -> streamable.StreamableSequence[T] | T:
    """Return a single item (int index) or a sequence (slice).

    Integer indices are delegated to nth(), slices to _get_slice().

    >>> Stream((1, 2, 3))[1]
    2
    >>> Stream((1, 2, 3))[1:3]
    (2, 3)
    """
    if not isinstance(item, int):
        return self._get_slice(
            start=item.start, stop=item.stop, step=item.step
        )
    return self.nth(item)

115 

@override
def __iter__(self) -> Iterator[T]:
    """Return an iterator over the values. This finishes the Stream.

    The iterator is an IterWithCleanUp built from the stream data and
    self.close.

    >>> for value in Stream((1, 2, 3)):
    ...     print(value)
    1
    2
    3
    """
    source, clean_up = self._data, self.close
    return _iteration_utils.IterWithCleanUp(source, clean_up)

127 

def __length_hint__(self) -> int:
    """Estimate how many values this Stream holds.

    The estimate is operator.length_hint() of the underlying data.

    >>> from operator import length_hint
    >>> length_hint(Stream([1, 2, 3]))
    3
    >>> length_hint(Stream.range(100))
    100
    """
    estimate = operator.length_hint(self._data)
    return estimate

138 

def __reversed__(self) -> Iterator[T]:
    """Iterate over the items of this Stream back to front.

    This finishes the Stream and collects all of its elements first.

    Equivalent to reversed(self.collect()).

    >>> tuple(reversed(Stream((1, 2, 3))))
    (3, 2, 1)
    >>> "".join(reversed(Stream("abc")))
    'cba'
    """
    collected = self.collect()
    return reversed(collected)

152 

def _get_slice(  # noqa: C901
    self,
    *,
    start: int | None = None,
    stop: int | None = None,
    step: int | None = None,
) -> streamable.StreamableSequence[T]:
    """Implement __getitem__ with slices.

    Slices without negative values are served lazily through
    itertools.islice; a pure "last n items" slice uses tail(); everything
    else collects the stream and slices the result.
    """
    bounds = (start, step, stop)
    if all(bound is None for bound in bounds):
        return self.collect()
    if all(bound is None or bound >= 0 for bound in bounds):
        window = itertools.islice(self._data, start, stop, step)
        return self._finish(
            streamable.StreamableSequence(window), close_source=True
        )
    wants_plain_tail = (
        start is not None
        and start < 0
        and step in (None, 1)
        and stop is None
    )
    if wants_plain_tail:
        return self.tail(abs(start))
    return self.collect()[start:stop:step]

182 

@classmethod
@override
def _module(cls) -> str:
    """Return "typed_stream" for Stream itself, else the class's module."""
    return "typed_stream" if cls == Stream else cls.__module__

189 

@staticmethod
def counting(start: int = 0, step: int = 1) -> Stream[int]:
    """Build an endless Stream counting from start in increments of step.

    >>> Stream.counting().limit(5).collect()
    (0, 1, 2, 3, 4)
    >>> Stream.counting(5, 2).limit(5).collect()
    (5, 7, 9, 11, 13)
    """
    counter = itertools.count(start, step)
    return Stream(counter)

200 

@staticmethod
def from_value(value: K) -> Stream[K]:
    """Build an endless Stream that yields value over and over.

    >>> Stream.from_value(1).limit(5).collect()
    (1, 1, 1, 1, 1)
    """
    repeated = itertools.repeat(value)
    return Stream(repeated)

209 

210 @typing.overload 

211 @staticmethod 

212 def range(stop: int, /) -> Stream[int]: ... # noqa: D102 

213 

214 @typing.overload 

215 @staticmethod 

216 def range(*, stop: int) -> Stream[int]: ... # noqa: D102 

217 

218 @typing.overload 

219 @staticmethod 

220 def range(*, start: int, stop: int) -> Stream[int]: ... # noqa: D102 

221 

222 @typing.overload 

223 @staticmethod 

224 def range(start: int, stop: int, /) -> Stream[int]: ... # noqa: D102 

225 

226 @typing.overload 

227 @staticmethod 

228 def range(start: int, /, *, stop: int) -> Stream[int]: ... # noqa: D102 

229 

230 @typing.overload 

231 @staticmethod 

232 def range( # noqa: D102 

233 start: int, stop: int, /, *, step: int 

234 ) -> Stream[int]: ... 

235 

236 @typing.overload 

237 @staticmethod 

238 def range( # noqa: D102 

239 start: int, stop: int, step: int, / # noqa: W504 

240 ) -> Stream[int]: ... 

241 

242 @typing.overload 

243 @staticmethod 

244 def range( # noqa: D102 

245 start: int, /, *, stop: int, step: int 

246 ) -> Stream[int]: ... 

247 

248 @typing.overload 

249 @staticmethod 

250 def range( # noqa: D102 

251 *, start: int, stop: int, step: int 

252 ) -> Stream[int]: ... 

253 

@staticmethod
def range(  # noqa: C901
    *args: int,
    start: int | _DefaultValueType = _DEFAULT_VALUE,
    stop: int | _DefaultValueType = _DEFAULT_VALUE,
    step: int | _DefaultValueType = _DEFAULT_VALUE,
) -> Stream[int]:
    """Create a Stream[int] from a range.

    The arguments behave like those of the built-in range function:
    - Stream.range(stop) -> Stream[int]
    - Stream.range(start, stop[, step]) -> Stream[int]

    Each of start, stop and step may be given positionally or by keyword,
    as long as positional values come first. Combinations that do not
    match any supported form raise TypeError.

    >>> Stream.range(5).collect() == Stream(range(5)).collect()
    True
    >>> Stream.range(1, 13).collect() == Stream(range(1, 13)).collect()
    True
    >>> Stream.range(1, 9, 2).collect() == Stream(range(1, 9, 2)).collect()
    True
    >>> Stream.range(start=1, stop=7, step=2).collect()
    (1, 3, 5)
    """
    # pylint: disable=confusing-consecutive-elif
    if not isinstance(start, _DefaultValueType):
        # start= by keyword: stop must be a keyword too, no positionals.
        if not args and not isinstance(stop, _DefaultValueType):
            return Stream(
                range(start, stop)
                if isinstance(step, _DefaultValueType)
                else range(start, stop, step)
            )
    elif isinstance(stop, _DefaultValueType):
        # Neither start= nor stop= given by keyword.
        if isinstance(step, _DefaultValueType):
            return Stream(range(*args))  # no kwarg given
        # Only step= by keyword: start and stop must both be positional.
        if len(args) == 2:
            return Stream(range(args[0], args[1], step))
    elif isinstance(step, _DefaultValueType):
        # Only stop= by keyword: at most one positional (the start).
        if len(args) == 1:
            return Stream(range(args[0], stop))
        if not args:
            return Stream(range(stop))
    elif len(args) == 1:
        # stop= and step= by keyword, start positional.
        return Stream(range(args[0], stop, step))
    # Every unsupported combination falls through to here.
    raise TypeError("Unexpected arguments to Stream.range()")

297 

def all(self) -> bool:
    """Tell whether every value is truthy. This finishes the Stream.

    Evaluation stops at the first falsy value; an empty Stream yields True.

    >>> Stream([1, 2, 3]).peek(print).all()
    1
    2
    3
    True
    >>> Stream([1, 2, 0, 4, 5, 6, 7, 8]).peek(print).all()
    1
    2
    0
    False
    >>> Stream([]).all()
    True
    """
    everything_truthy = all(self._data)
    return self._finish(everything_truthy, close_source=True)

317 

318 @typing.overload 

319 def catch( # noqa: D102 

320 self, 

321 *exception_class: type[Exc], 

322 ) -> Self: ... 

323 

324 @typing.overload 

325 def catch( # noqa: D102 

326 self, 

327 *exception_class: type[Exc], 

328 handler: Callable[[Exc], object], 

329 ) -> Self: ... 

330 

331 @typing.overload 

332 def catch( # noqa: D102 

333 self, 

334 *exception_class: type[Exc], 

335 default: Callable[[Exc], K] | Callable[[], K], 

336 ) -> Stream[T | K]: ... 

337 

338 @typing.overload 

339 def catch( # noqa: D102 

340 self, 

341 *exception_class: type[Exc], 

342 handler: Callable[[Exc], object], 

343 default: Callable[[Exc], K] | Callable[[], K], 

344 ) -> Stream[T | K]: ... 

345 

def catch(
    self,
    *exception_class: type[Exc],
    handler: Callable[[Exc], object] | None = None,
    default: Callable[[Exc], K] | Callable[[], K] | None = None,
) -> Stream[T | K]:
    """Handle the given exception classes while the stream is iterated.

    The exception classes, handler and default are passed on to
    _iteration_utils.ExceptionHandler, which wraps the stream data.

    >>> Stream("1a2").map(int).catch(ValueError, handler=print).collect()
    invalid literal for int() with base 10: 'a'
    (1, 2)
    >>> Stream("1a2").map(int).catch(ValueError, default=lambda _:_).collect()
    (1, ValueError("invalid literal for int() with base 10: 'a'"), 2)
    >>> Stream("1a2").map(int).peek(print) \
        .catch(ValueError, handler=print).collect()
    1
    invalid literal for int() with base 10: 'a'
    2
    (1, 2)
    """
    guarded = _iteration_utils.ExceptionHandler(
        self._data, exception_class, handler, default
    )
    return self._finish(Stream(guarded))

373 

def chain(self, *iterables: Iterable[T]) -> Self:
    """Append one or more iterables to the end of the Stream.

    >>> Stream([1, 2, 3]).chain([4, 5, 6]).collect()
    (1, 2, 3, 4, 5, 6)
    >>> Stream([1, 2, 3]).chain([4, 5, 6], [7, 8, 9]).collect()
    (1, 2, 3, 4, 5, 6, 7, 8, 9)
    >>> Stream("abc").chain("def", "ghi", "jkl").collect("".join)
    'abcdefghijkl'
    """
    sources = (self._data, *iterables)
    self._data = itertools.chain.from_iterable(sources)
    return self

386 

if sys.version_info >= (3, 12) and hasattr(itertools, "batched"):

    def chunk(self, size: int) -> Stream[tuple[T, ...]]:
        """Group the values into tuples of at most size elements.

        Only the last chunk may be shorter than size.

        >>> Stream([1, 2, 3, 4, 5, 6]).chunk(2).collect()
        ((1, 2), (3, 4), (5, 6))
        >>> Stream([1, 2, 3, 4, 5, 6]).chunk(3).collect()
        ((1, 2, 3), (4, 5, 6))
        >>> Stream([1, 2, 3, 4, 5, 6, 7]).chunk(3).collect()
        ((1, 2, 3), (4, 5, 6), (7,))
        """
        # add strict=False if min version is 3.13 (is the default)
        batches = itertools.batched(self._data, size)  # noqa: B911
        return self._finish(Stream(batches))

else:  # pragma: no cover

    def chunk(self, size: int) -> Stream[tuple[T, ...]]:
        """Group the values into tuples of at most size elements.

        Only the last chunk may be shorter than size.

        >>> Stream([1, 2, 3, 4, 5, 6]).chunk(2).collect()
        ((1, 2), (3, 4), (5, 6))
        >>> Stream([1, 2, 3, 4, 5, 6]).chunk(3).collect()
        ((1, 2, 3), (4, 5, 6))
        >>> Stream([1, 2, 3, 4, 5, 6, 7]).chunk(3).collect()
        ((1, 2, 3), (4, 5, 6), (7,))
        """
        chunked = _iteration_utils.Chunked(self._data, size)
        return self._finish(chunked.stream())

425 

426 @typing.overload 

427 def collect(self, /) -> streamable.StreamableSequence[T]: ... # noqa: D102 

428 

429 @typing.overload 

430 def collect( # noqa: D102 

431 self, 

432 fun: Callable[[Iterable[T]], streamable.StreamableSequence[T]], 

433 /, 

434 ) -> streamable.StreamableSequence[T]: ... 

435 

436 @typing.overload 

437 def collect( # noqa: D102 

438 self, 

439 fun: type[collections.Counter[T]], 

440 /, 

441 ) -> collections.Counter[T]: ... 

442 

443 @typing.overload 

444 def collect( # noqa: D102 

445 self, fun: Callable[[Iterable[T]], tuple[T, ...]], / # noqa: W504 

446 ) -> tuple[T, ...]: ... 

447 

448 @typing.overload 

449 def collect( # noqa: D102 

450 self, fun: Callable[[Iterable[T]], list[T]], / # noqa: W504 

451 ) -> list[T]: ... 

452 

453 @typing.overload 

454 def collect( # noqa: D102 

455 self: Stream[SA], fun: Callable[[Iterable[SA]], SA], / # noqa: W504 

456 ) -> SA: ... 

457 

458 @typing.overload 

459 def collect( # noqa: D102 

460 self, fun: Callable[[Iterable[T]], set[T]], / # noqa: W504 

461 ) -> set[T]: ... 

462 

463 @typing.overload 

464 def collect( # noqa: D102 

465 self, fun: Callable[[Iterable[T]], frozenset[T]], / # noqa: W504 

466 ) -> frozenset[T]: ... 

467 

468 @typing.overload 

469 def collect( # noqa: D102 

470 self: Stream[tuple[K, V]], 

471 fun: Callable[[Iterable[tuple[K, V]]], dict[K, V]], 

472 /, 

473 ) -> dict[K, V]: ... 

474 

475 @typing.overload 

476 def collect( # noqa: D102 

477 self: Stream[tuple[K, V]], 

478 fun: Callable[[Iterable[tuple[K, V]]], Mapping[K, V]], 

479 /, 

480 ) -> Mapping[K, V]: ... 

481 

482 @typing.overload 

483 def collect( # noqa: D102 

484 self, fun: Callable[[Iterable[T]], K], / # noqa: W504 

485 ) -> K: ... 

486 

def collect(
    self: Stream[U],
    fun: Callable[[Iterable[U]], object] = streamable.StreamableSequence,
    /,
) -> object:
    """Feed all values to fun and return its result. This finishes the Stream.

    Without an argument the values are collected into a StreamableSequence.

    >>> Stream([1, 2, 3]).collect(list)
    [1, 2, 3]
    >>> Stream([1, 2, 3]).collect(sum)
    6
    >>> Stream([1, 2, 3]).collect(dict.fromkeys)
    {1: None, 2: None, 3: None}
    >>> Stream([(1, 2), (3, 4)]).collect(dict)
    {1: 2, 3: 4}
    """
    collected = fun(self._data)
    return self._finish(collected, close_source=True)

504 

def concurrent_map(
    self, fun: Callable[[T], K], /, max_workers: int | None = None
) -> Stream[K]:
    """Map values concurrently using a ProcessPoolExecutor.

    The executor is used as a context manager, so it is exited before the
    resulting Stream is handed back to the caller.

    See: https://docs.python.org/3/library/concurrent.futures.html

    >>> Stream("123").concurrent_map(int).collect()
    (1, 2, 3)
    """
    pool = concurrent.futures.ProcessPoolExecutor(max_workers=max_workers)
    with pool:
        results = pool.map(fun, self._data)
        return self._finish(Stream(results))

519 

def conditional_map(
    self,
    condition: Callable[[T], object],
    if_true: Callable[[T], U],
    if_false: Callable[[T], V] | None = None,
) -> Stream[U | V]:
    """Map values with if_true or if_false, depending on condition.

    The callables are passed on to _iteration_utils.IfElseMap.

    >>> Stream("1x2x3x").conditional_map(str.isdigit, int).collect()
    (1, 2, 3)
    >>> Stream("1x2x3x").conditional_map(str.isdigit, int, ord).collect()
    (1, 120, 2, 120, 3, 120)
    """
    branching = _iteration_utils.IfElseMap(
        self._data, condition, if_true, if_false
    )
    return self._finish(Stream(branching))

540 

def count(self) -> int:
    """Return the number of elements. This finishes the Stream.

    Equivalent to: Stream(...).map(lambda x: 1).sum()

    >>> Stream([1, 2, 3]).count()
    3
    >>> Stream("abcdef").count()
    6
    """
    total = _iteration_utils.count(self._data)
    return self._finish(total, close_source=True)

554 

def dedup(self, *, key: None | Callable[[T], object] = None) -> Self:
    """Collapse runs of consecutive equal values into their first value.

    With key, values are compared by key(value) instead.
    If the input is sorted this is the same as Stream.distinct().

    >>> Stream([1] * 100).dedup().collect(list)
    [1]
    >>> Stream([1, 2, 3, 1]).dedup().collect()
    (1, 2, 3, 1)
    >>> Stream([1, 1, 2, 2, 2, 2, 3, 1]).dedup().collect()
    (1, 2, 3, 1)
    >>> Stream([]).dedup().collect()
    ()
    >>> Stream("ABC").dedup(key=str.lower).collect("".join)
    'ABC'
    >>> Stream("aAaAbbbcCCAaBbCc").dedup(key=str.lower).collect("".join)
    'abcABC'
    """
    # Inspired by the unique_justseen itertools recipe
    # https://docs.python.org/3/library/itertools.html#itertools-recipes
    runs = itertools.groupby(self._data, key=key)
    # groupby yields (key, group) pairs; keep only the group iterators.
    run_members = map(operator.itemgetter(1), runs)
    # The first member of each group represents the whole run.
    self._data = map(next, run_members)
    return self

583 

def dedup_counting(self) -> Stream[tuple[T, int]]:
    """Pair each run of consecutive equal values with the run's length.

    >>> Stream("abba").dedup_counting().starmap(print).for_each()
    a 1
    b 2
    a 1
    >>> Stream("AaaaBBcccc").dedup_counting().starmap(print).for_each()
    A 1
    a 3
    B 2
    c 4
    """

    def _measure(run: tuple[T, Iterator[T]]) -> tuple[T, int]:
        value, members = run
        return (value, _iteration_utils.count(members))

    runs = itertools.groupby(self)
    return Stream(map(_measure, runs))

602 

@override
def distinct(self, *, use_set: bool = True) -> Self:
    """Keep only the first occurrence of every value.

    Seen values are remembered in a set by default, which requires them
    to be hashable; pass use_set=False to remember them in a list instead.

    >>> from typed_stream import Stream
    >>> Stream([1, 2, 2, 2, 3, 2, 2]).distinct().collect()
    (1, 2, 3)
    >>> Stream([{1}, {2}, {3}, {2}, {2}]).distinct().collect()
    Traceback (most recent call last):
    ...
    TypeError: unhashable type: 'set'
    >>> Stream([{1}, {2}, {3}, {2}, {2}]).distinct(use_set=False).collect()
    ({1}, {2}, {3})
    """
    # pylint: disable=duplicate-code
    seen: set[T] | list[T]
    remember: Callable[[T], None]
    if not use_set:
        seen = []
        remember = seen.append
    else:
        seen = set()
        remember = seen.add

    # Drop values already seen, then record each value that passes.
    unseen = itertools.filterfalse(seen.__contains__, self._data)
    self._data = map(_iteration_utils.Peeker(remember), unseen)
    return self

632 

@override
def drop(self, c: int, /) -> Self:
    """Skip the first c values of the Stream.

    >>> Stream([1, 2, 3, 4, 5]).drop(2).collect()
    (3, 4, 5)
    """
    remaining = itertools.islice(self._data, c, None)
    self._data = remaining
    return self

642 

def drop_while(self, fun: Callable[[T], object], /) -> Self:
    """Skip leading values for as long as fun returns a truthy value.

    See: https://docs.python.org/3/library/itertools.html#itertools.dropwhile

    >>> Stream([1, 2, 3, 4, 1]).drop_while(lambda x: x < 3).collect()
    (3, 4, 1)
    """
    remaining = itertools.dropwhile(fun, self._data)
    self._data = remaining
    return self

653 

def empty(self) -> bool:
    """Tell whether the Stream has no values. This finishes the Stream.

    >>> Stream([1, 2, 3]).empty()
    False
    >>> Stream([]).empty()
    True
    """
    try:
        self.first()
    except exceptions.StreamEmptyError:
        is_empty = True
    else:
        is_empty = False
    return is_empty

667 

def enumerate(
    self, start_index: int = 0, /  # noqa: W504
) -> Stream[_utils.IndexValueTuple[T]]:
    """Pair every value with its index, counting from start_index.

    >>> Stream([1, 2, 3]).enumerate().collect()
    ((0, 1), (1, 2), (2, 3))
    >>> Stream("abc").enumerate().collect()
    ((0, 'a'), (1, 'b'), (2, 'c'))
    >>> Stream("abc").enumerate(100).collect()
    ((100, 'a'), (101, 'b'), (102, 'c'))
    >>> Stream("abc").enumerate().map(lambda el: {el.idx: el.val}).collect()
    ({0: 'a'}, {1: 'b'}, {2: 'c'})
    """
    indexed = _iteration_utils.Enumerator(self._data, start_index)
    return self._finish(Stream(indexed))

685 

686 @typing.overload 

687 def exclude( # noqa: D102 

688 self: Stream[K | Prim], 

689 fun: _utils.InstanceChecker[Prim], 

690 /, # noqa: W504 

691 ) -> Stream[K]: ... 

692 

693 @typing.overload 

694 def exclude( # noqa: D102 

695 self: Stream[K | U], fun: _utils.InstanceChecker[U], / # noqa: W504 

696 ) -> Stream[K]: ... 

697 

698 @typing.overload 

699 def exclude( # noqa: D102 

700 self: Stream[K | None], fun: _utils.NoneChecker, / # noqa: W504 

701 ) -> Stream[K]: ... 

702 

703 # @typing.overload 

704 # def exclude( # noqa: D102 

705 # self: Stream[K | U], fun: TypeGuardingCallable[U, K | U] 

706 # ) -> Stream[K]: 

707 # ... 

708 

709 @typing.overload 

710 def exclude( # noqa: D102 

711 self: Stream[T], fun: Callable[[T], object], / # noqa: W504 

712 ) -> Stream[T]: ... 

713 

@override
def exclude(self, fun: Callable[[T], object], /) -> object:
    """Remove every value for which fun returns a truthy value.

    See: https://docs.python.org/3/library/itertools.html#itertools.filterfalse

    >>> Stream([1, 2, 3, 4, 5]).exclude(lambda x: x % 2).collect()
    (2, 4)
    >>> Stream([1, 2, None, 3]).exclude(lambda x: x is None).collect()
    (1, 2, 3)
    """
    kept = itertools.filterfalse(fun, self._data)
    self._data = kept
    return self

727 

728 @typing.overload 

729 def filter( # noqa: D102 

730 self: Stream[K | None], fun: _utils.NotNoneChecker 

731 ) -> Stream[K]: ... 

732 

733 @typing.overload 

734 def filter(self: Stream[K | None]) -> Stream[K]: ... # noqa: D102 

735 

736 @typing.overload 

737 def filter(self: Stream[T]) -> Stream[T]: ... # noqa: D102 

738 

739 @typing.overload 

740 def filter( # noqa: D102 

741 self, fun: _utils.InstanceChecker[K] 

742 ) -> Stream[K]: # pragma: no cover 

743 ... 

744 

745 @typing.overload 

746 def filter( # noqa: D102 

747 self, fun: _types.TypeGuardingCallable[K, T] 

748 ) -> Stream[K]: # pragma: no cover 

749 ... 

750 

751 @typing.overload 

752 def filter( # noqa: D102 

753 self: Stream[T], fun: Callable[[T], object] 

754 ) -> Stream[T]: ... 

755 

def filter(self, fun: Callable[[T], object] | None = None) -> object:
    """Keep the values accepted by the built-in filter.

    fun is handed to filter() unchanged, including the default None.

    >>> Stream([1, 2, 3, 4, 5]).filter(lambda x: x % 2).collect()
    (1, 3, 5)
    """
    kept = filter(fun, self._data)
    self._data = kept
    return self

764 

def first(self, default: K | _DefaultValueType = _DEFAULT_VALUE) -> T | K:
    """Return the first value. This finishes the Stream.

    If the Stream is empty, default is returned when one was given;
    otherwise StreamEmptyError is raised.

    >>> Stream([1, 2, 3]).first()
    1
    >>> Stream("abc").first()
    'a'
    >>> Stream([]).first()
    Traceback (most recent call last):
    ...
    typed_stream.exceptions.StreamEmptyError
    >>> Stream([]).first(default="default")
    'default'
    """
    try:
        iterator = iter(self._data)
        return next(iterator)
    except StopIteration:
        if isinstance(default, _DefaultValueType):
            raise exceptions.StreamEmptyError() from None
        return default
    finally:
        # Runs on every exit path, so the stream is always finished.
        self._finish(None, close_source=True)

788 

789 @typing.overload 

790 def flat_map( # noqa: D102 

791 self, fun: Callable[[T], Iterable[K]], / # noqa: W504 

792 ) -> Stream[K]: ... 

793 

794 @typing.overload 

795 def flat_map( # noqa: D102 

796 self, 

797 fun: Callable[[T, Unpack[Tvt]], Iterable[K]], 

798 /, 

799 *args: Unpack[Tvt], 

800 ) -> Stream[K]: ... 

801 

def flat_map(
    self,
    fun: Callable[[T, Unpack[Tvt]], Iterable[K]],
    /,
    *args: Unpack[Tvt],
) -> Stream[K]:
    """Map each value to an iterable and flatten the results.

    fun is called as fun(value, *args) for every value and the returned
    iterables are chained together.

    This lazily finishes the current Stream and creates a new one.

    >>> Stream([1, 4, 7]).flat_map(lambda x: [x, x + 1, x + 2]).collect()
    (1, 2, 3, 4, 5, 6, 7, 8, 9)
    >>> Stream(["abc", "def"]).flat_map(str.encode, "ASCII").collect()
    (97, 98, 99, 100, 101, 102)
    """
    # Each extra argument is repeated endlessly so map() passes it along
    # with every value of the stream.
    mapped = map(fun, self._data, *(itertools.repeat(arg) for arg in args))
    # Wrap in _finish() like map() and the other Stream-returning
    # transformations do, so this Stream is actually marked as finished.
    return self._finish(Stream(itertools.chain.from_iterable(mapped)))

822 

def for_each(self, fun: Callable[[T], object] = functions.noop, /) -> None:
    """Call fun with every value, then finish the Stream.

    Without an argument the values are consumed and discarded.

    >>> Stream([1, 2, 3]).for_each(print)
    1
    2
    3
    """
    iterator = iter(self._data)
    while True:
        try:
            item = next(iterator)
        except StopIteration:
            break
        fun(item)
    self._finish(None, close_source=True)

834 

def last(self) -> T:
    """Return the final value of the Stream. This finishes the Stream.

    Raises StreamEmptyError if the stream is empty.

    >>> Stream([1, 2, 3]).last()
    3
    >>> Stream([]).last()
    Traceback (most recent call last):
    ...
    typed_stream.exceptions.StreamEmptyError
    """
    tail = self.tail(1)
    if not tail:
        raise exceptions.StreamEmptyError()
    return tail[-1]

850 

@override
def limit(self, c: int, /) -> Self:
    """Cut the Stream off after at most c values.

    >>> Stream([1, 2, 3, 4, 5]).limit(3).collect()
    (1, 2, 3)
    >>> Stream.from_value(3).limit(1000).collect() == (3,) * 1000
    True
    """
    truncated = itertools.islice(self._data, c)
    self._data = truncated
    return self

862 

863 @typing.overload 

864 def map(self, fun: Callable[[T], K], /) -> Stream[K]: ... # noqa: D102 

865 

866 @typing.overload 

867 def map( # noqa: D102 

868 self, fun: Callable[[T, Unpack[Tvt]], K], /, *args: Unpack[Tvt] 

869 ) -> Stream[K]: ... 

870 

def map(
    self, fun: Callable[[T, Unpack[Tvt]], K], /, *args: Unpack[Tvt]
) -> Stream[K]:
    """Transform every value with fun(value, *args).

    This lazily finishes the current Stream and creates a new one.

    >>> Stream([1, 2, 3]).map(lambda x: x * 3).collect()
    (3, 6, 9)
    >>> Stream([1, 2, 3]).map(operator.mul, 3).collect()
    (3, 6, 9)
    """
    # Each extra argument is repeated endlessly so the built-in map()
    # passes it along with every value.
    repeated_args = (itertools.repeat(arg) for arg in args)
    mapped = map(fun, self._data, *repeated_args)
    return self._finish(Stream(mapped))

888 

889 @typing.overload 

890 def max(self: Stream[SC]) -> SC: ... # noqa: D102 

891 

892 @typing.overload 

893 def max( # noqa: D102 

894 self: Stream[SC], default: K | _DefaultValueType = _DEFAULT_VALUE 

895 ) -> SC | K: ... 

896 

897 @typing.overload 

898 def max( # noqa: D102 

899 self, 

900 default: K | _DefaultValueType = _DEFAULT_VALUE, 

901 *, 

902 key: Callable[[T], SC], 

903 ) -> T | K: ... 

904 

def max(
    self,
    default: object = _DEFAULT_VALUE,
    *,
    key: Callable[[T], SC] | None = None,
) -> object:
    """Return the biggest element of the stream. This finishes the Stream.

    key works like the key argument of the built-in max. If the stream is
    empty, default is returned when one was given; otherwise
    StreamEmptyError is raised.

    >>> Stream([3, 2, 1]).max()
    3
    >>> Stream(["a", "b", "c"]).max()
    'c'
    >>> Stream(["abc", "de", "f"]).max(key=len)
    'abc'
    >>> Stream([]).max(default=0)
    0
    >>> Stream([]).max()
    Traceback (most recent call last):
    ...
    typed_stream.exceptions.StreamEmptyError
    """
    max_ = max(self._data, default=default, key=key)  # type: ignore[type-var,arg-type]
    # Finish before checking for the sentinel, so an empty stream is
    # finished and closed as well instead of being left open.
    self._finish(None, close_source=True)
    if isinstance(max_, _DefaultValueType):
        raise exceptions.StreamEmptyError()
    return max_

930 

931 @typing.overload 

932 def min(self: Stream[SC]) -> SC: ... # noqa: D102 

933 

934 @typing.overload 

935 def min( # noqa: D102 

936 self: Stream[SC], default: K | _DefaultValueType = _DEFAULT_VALUE 

937 ) -> SC | K: ... 

938 

939 @typing.overload 

940 def min( # noqa: D102 

941 self, 

942 default: K | _DefaultValueType = _DEFAULT_VALUE, 

943 *, 

944 key: Callable[[T], SC], 

945 ) -> T | K: ... 

946 

def min(
    self,
    default: object = _DEFAULT_VALUE,
    *,
    key: Callable[[T], SC] | None = None,
) -> object:
    """Return the smallest element of the stream. This finishes the Stream.

    key works like the key argument of the built-in min. If the stream is
    empty, default is returned when one was given; otherwise
    StreamEmptyError is raised.

    >>> Stream([1, 2, 3]).min()
    1
    >>> Stream(["a", "b", "c"]).min()
    'a'
    >>> Stream(["abc", "de", "f"]).min(key=len)
    'f'
    >>> Stream([]).min(default=0)
    0
    >>> Stream([]).min()
    Traceback (most recent call last):
    ...
    typed_stream.exceptions.StreamEmptyError
    """
    min_ = min(self._data, default=default, key=key)  # type: ignore[type-var,arg-type]
    # Finish before checking for the sentinel, so an empty stream is
    # finished and closed as well instead of being left open.
    self._finish(None, close_source=True)
    if isinstance(min_, _DefaultValueType):
        raise exceptions.StreamEmptyError()
    return min_

972 

973 @typing.overload 

974 def nth(self, index: int, /) -> T: ... # noqa: D102 

975 

976 @typing.overload 

977 def nth(self, index: int, /, default: T) -> T: ... # noqa: D102 

978 

979 @typing.overload 

980 def nth(self, index: int, /, default: K) -> T | K: ... # noqa: D102 

981 

def nth(  # noqa: C901
    self,
    index: int,
    /,
    default: K | _DefaultValueType = _DEFAULT_VALUE,
) -> T | K:
    """Return the item at position index.

    Negative indices count from the end of the stream. If there is no
    item at the given index, default is returned when one was given;
    otherwise StreamIndexError is raised.

    Stream(...).nth(0) gets the first element of the stream.

    >>> Stream([1, 2, 3]).nth(0)
    1
    >>> Stream("abc").nth(1)
    'b'
    >>> Stream([]).nth(22)
    Traceback (most recent call last):
    ...
    typed_stream.exceptions.StreamIndexError
    >>> Stream([]).nth(22, default=42)
    42
    """
    found: T | _DefaultValueType = _DEFAULT_VALUE
    if index >= 0:
        try:
            found = self.drop(index).first()
        except exceptions.StreamEmptyError:
            pass  # fewer than index + 1 items; found stays the sentinel
    else:
        wanted = abs(index)
        tail = self.tail(wanted)
        # A shorter tail means the stream has fewer than `wanted` items.
        if len(tail) == wanted:
            found = tail[0]

    if not isinstance(found, _DefaultValueType):
        return found

    if isinstance(default, _DefaultValueType):
        raise exceptions.StreamIndexError()

    return default

1023 

1024 @typing.overload 

1025 def nwise( # noqa: D102 

1026 self, size: typing.Literal[1], / # noqa: W504 

1027 ) -> Stream[tuple[T]]: ... 

1028 

1029 @typing.overload 

1030 def nwise( # noqa: D102 

1031 self, size: typing.Literal[2], / # noqa: W504 

1032 ) -> Stream[tuple[T, T]]: ... # noqa: D102 

1033 

1034 @typing.overload 

1035 def nwise( # noqa: D102 

1036 self, size: int, / # noqa: W504 

1037 ) -> Stream[tuple[T, ...]]: ... 

1038 

def nwise(
    self, size: int, /  # noqa: W504
) -> Stream[tuple[T, ...]] | Stream[tuple[T, T]] | Stream[tuple[T]]:
    """Return a Stream of overlapping tuples of length size.

    This is often called a sliding window.
    For n=2 it behaves like pairwise from itertools.

    The returned Stream will consist of tuples of length n.
    If n is bigger than the count of values in self, it will be empty.

    >>> Stream([1, 2, 3]).nwise(1).collect()
    ((1,), (2,), (3,))
    >>> Stream([1, 2, 3]).nwise(2).collect()
    ((1, 2), (2, 3))
    >>> Stream([1, 2, 3, 4]).nwise(3).collect()
    ((1, 2, 3), (2, 3, 4))
    >>> Stream([1, 2, 3, 4, 5]).nwise(4).collect()
    ((1, 2, 3, 4), (2, 3, 4, 5))
    """
    windows = _iteration_utils.sliding_window(self._data, size)
    return self._finish(Stream(windows))

1062 

@override
def peek(self, fun: Callable[[T], object], /) -> Self:
    """Call fun with every value as it passes, leaving the values as is.

    >>> stream = Stream([1, 2, 3]).peek(print)
    >>> stream.map(str).collect()
    1
    2
    3
    ('1', '2', '3')
    """
    observer = _iteration_utils.Peeker(fun)
    self._data = map(observer, self._data)
    return self

1076 

def reduce(
    self,
    fun: Callable[[T, T], T],
    initial: T | _DefaultValueType = _DEFAULT_VALUE,
) -> T:
    """Fold the values of this stream into one. This finishes the Stream.

    fun is called with the accumulated value and the next value.
    When initial is omitted, the first value of the stream is used as the
    start value; in that case an empty stream raises a StreamEmptyError.

    >>> Stream([1, 2, 3]).reduce(operator.add)
    6
    >>> Stream([1, 2, 3]).reduce(operator.mul)
    6
    >>> Stream([]).reduce(operator.mul)
    Traceback (most recent call last):
    ...
    typed_stream.exceptions.StreamEmptyError
    """
    values = iter(self._data)
    if isinstance(initial, _DefaultValueType):
        # No start value given: seed the fold with the first element.
        try:
            start = next(values)
        except StopIteration:
            raise exceptions.StreamEmptyError() from None
    else:
        start = initial
    result = functools.reduce(fun, values, start)
    return self._finish(result, True)

1103 

def starcollect(self, fun: _types.StarCallable[T, K]) -> K:
    """Pass all values as positional arguments to fun and return its result.

    This finishes the Stream.

    >>> Stream([1, 2, 3]).starcollect(lambda *args: args)
    (1, 2, 3)
    >>> Stream([]).starcollect(lambda *args: args)
    ()
    """
    # Unpack the whole stream into a single call of fun.
    collected = fun(*self._data)
    return self._finish(collected, close_source=True)

1113 

@typing.overload
def starmap(  # noqa: D102
    self: Stream[_utils.IndexValueTuple[K]],
    fun: Callable[[int, K], U],
    /,
) -> Stream[U]: ...

@typing.overload
def starmap(  # noqa: D102
    self: Stream[tuple[Unpack[Tvt]]],
    fun: Callable[[Unpack[Tvt]], U],
    /,
) -> Stream[U]: ...

def starmap(  # noqa: D102
    self: Stream[tuple[Unpack[Tvt]]],
    fun: Callable[[Unpack[Tvt]], U],
    /,
) -> Stream[U]:
    """Map each tuple to a value by unpacking it into the arguments of fun.

    This lazily finishes the current Stream and creates a new one.

    >>> Stream([(1, 2), (3, 4)]).starmap(operator.mul).collect()
    (2, 12)
    >>> Stream([(2, "x"), (3, "y")]).starmap(operator.mul).collect()
    ('xx', 'yyy')
    """
    # itertools.starmap calls fun(*value) for every value, lazily.
    mapped = itertools.starmap(fun, self._data)
    return self._finish(Stream(mapped))

1143 

def sum(self: Stream[SA]) -> SA:
    """Add up all elements of this Stream.

    Any element type that supports addition can be summed.
    The values are reduced without a start value, so summing an empty
    Stream raises a StreamEmptyError.

    Possibly faster alternatives:
    - numbers: stream.collect(sum)
    - strings: stream.collect("".join)
    - lists: stream.flat_map(lambda _: _).collect(list)
    - tuples: stream.flat_map(lambda _: _).collect(tuple)

    >>> Stream([1, 2, 3]).sum()
    6
    >>> Stream(["a", "b", "c"]).sum()
    'abc'
    >>> Stream([(1, 2), (3, 4)]).sum()
    (1, 2, 3, 4)
    >>> Stream(([1], [2], [3])).sum()
    [1, 2, 3]
    """
    total = self.reduce(add)
    return total

1164 

def tail(self, c: int, /) -> streamable.StreamableSequence[T]:
    """Return a Sequence holding the last c items of this Stream.

    >>> Stream([1, 2, 3]).tail(2)
    (2, 3)
    >>> Stream.range(100).tail(10)
    (90, 91, 92, 93, 94, 95, 96, 97, 98, 99)
    """
    # A bounded deque discards older values while the data is consumed,
    # so only the final c items are kept.
    last_items = collections.deque(self._data, maxlen=c)
    return self._finish(
        streamable.StreamableSequence(last_items),
        close_source=True,
    )

1179 

def take_while(self, fun: Callable[[T], object]) -> Self:
    """Keep the leading values for which fun returns a truthy value.

    The Stream ends at the first value for which fun returns a falsy
    value; that value and everything after it are dropped.

    See: https://docs.python.org/3/library/itertools.html#itertools.takewhile

    >>> Stream([1, 2, 3, 4, 1]).take_while(lambda x: x < 4).collect()
    (1, 2, 3)
    >>> Stream([1, 2, 3, -4, -1]).take_while(lambda x: x <= 0).collect()
    ()
    """
    source = self._data
    self._data = itertools.takewhile(fun, source)
    return self