Coverage for tests/__main__.py: 95%

464 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-02-12 21:24 +0000

1# Licensed under the EUPL-1.2 or later. 

2# You may obtain a copy of the licence in all the official languages of the 

3# European Union at https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12 

4# pylint: disable=too-many-lines 

5 

6"""The tests for the Stream.""" 

7 

8from __future__ import annotations 

9 

10import doctest 

11import importlib 

12import operator 

13import sys 

14import traceback 

15import types 

16from collections import Counter 

17from collections.abc import Callable 

18from functools import partial 

19from numbers import Number, Real 

20from operator import add 

21from pathlib import Path 

22from types import EllipsisType 

23from typing import Any 

24 

25from typed_stream import ( 

26 BinaryFileStream, 

27 FileStream, 

28 Stream, 

29 StreamableSequence, 

30 StreamEmptyError, 

31 StreamFinishedError, 

32 StreamIndexError, 

33 _impl, 

34) 

35from typed_stream._impl._iteration_utils import ( 

36 IterWithCleanUp, 

37 Peeker, 

38 sliding_window, 

39) 

40from typed_stream._impl._lazy_file_iterators import ( 

41 LazyFileIteratorRemovingEndsBytes, 

42) 

43from typed_stream._impl._typing import assert_type 

44from typed_stream._impl._utils import IndexValueTuple 

45from typed_stream._impl.functions import method_partial 

46from typed_stream.functions import is_even, is_odd 

47 

48from .test_functions import ( 

49 is_bool, 

50 is_complex, 

51 is_float, 

52 is_int, 

53 is_none, 

54 is_not_none, 

55 is_number, 

56 is_real_number, 

57 is_str, 

58 noop, 

59) 

60 

# Sanity check on the package layout: every name listed in _impl.__all__ is
# looked up on _impl and must report "typed_stream" from its _module() call.
for export in Stream(_impl.__all__).map(partial(getattr, _impl)):  # noqa: F405
    # pylint: disable-next=protected-access
    assert export._module() == "typed_stream", f"{export!r}"  # nosec: B101
    # Unbind the loop variable on every pass so that no `export` name remains
    # defined at module level once the loop has finished.
    del export

65 

66 

67def testmod( 

68 mod: types.ModuleType, 

69 optionflags: int = doctest.DONT_ACCEPT_TRUE_FOR_1, 

70) -> tuple[int, int]: 

71 """Test modules. 

72 

73 Like doctest.testmod, but smaller. 

74 """ 

75 runner = doctest.DocTestRunner(optionflags=optionflags) 

76 

77 for test in doctest.DocTestFinder().find(mod, mod.__name__, mod): 

78 runner.run(test) 

79 

80 return runner.failures, runner.tries 

81 

82 

def run_doc_tests() -> None:  # noqa: C901
    """Execute the doctests of every module below the typed_stream package.

    All ``*.py`` files under the sibling ``typed_stream`` directory are turned
    into dotted module names, imported, and passed to ``testmod``. A line per
    module that has doctests and a final summary are written to stderr.

    Raises:
        SystemExit: With exit status 1 when any doctest failed, except for a
            single failure on RustPython, which is only reported.
    """
    package_dir = Path(__file__).resolve().parent.parent / "typed_stream"

    # NOTE(review): a top-level ``__init__.py`` has no leading slash, so the
    # first removesuffix looks like a no-op for it and the name would become
    # ``__init__`` rather than the empty string handled below - confirm.
    module_names = (
        Stream(package_dir.rglob("*.py"))
        .map(lambda path: path.relative_to(package_dir).as_posix())
        .map(str.removesuffix, "/__init__.py")
        .map(str.removesuffix, ".py")
        .map(str.replace, "/", ".")
    )
    # Names with the most dots come first; ties are broken by longer names.
    ordered_names = sorted(
        module_names,
        key=lambda dotted: (dotted.count("."), len(dotted)),
        reverse=True,
    )

    total_failed = 0
    total_run = 0
    for relative_name in ordered_names:
        if relative_name:
            qualified_name = f"typed_stream.{relative_name}"
        else:
            qualified_name = "typed_stream"
        failed, executed = testmod(importlib.import_module(qualified_name))
        total_failed += failed
        total_run += executed
        if not executed:
            # Modules without doctests get no line of their own.
            continue
        print(
            f"{qualified_name}: {executed - failed} / {executed} doctests successful",
            file=sys.stderr,
        )

    print(
        f"SUMMARY: {total_run - total_failed} / {total_run} doctests successful",
        file=sys.stderr,
    )

    if not total_failed:
        return
    if total_failed == 1 and sys.implementation.name == "rustpython":
        print("ignoring 1 failure", file=sys.stderr)
        return
    sys.exit(1)

114 

115 

116run_doc_tests() 

117 

118 

def typed_nwise_stuff() -> None:
    """Exercise the Stream.nwise overloads for window sizes 1, 2 and 3.

    Each result is bound to an explicitly annotated variable so a type checker
    can verify the overload, then the windows are joined and compared.
    """
    singles: Stream[tuple[str]] = Stream("abc").nwise(1)
    joined_singles = singles.map("".join).collect("".join)
    assert joined_singles == "abc"

    pairs: Stream[tuple[str, str]] = Stream("abc").nwise(2)
    joined_pairs = pairs.map("".join).collect("".join)
    assert joined_pairs == "abbc"

    triples: Stream[tuple[str, ...]] = Stream("abcd").nwise(3)
    joined_triples = triples.map("".join).collect("".join)
    assert joined_triples == "abcbcd"

129 

130 

131typed_nwise_stuff() 

132 

133 

134# pylint: disable=unnecessary-lambda, protected-access 

135 

136 

def assert_raises(exc: type[BaseException], fun: Callable[[], object]) -> None:
    """Check that calling ``fun`` raises ``exc``.

    Args:
        exc: The exception type that ``fun`` is expected to raise.
        fun: A zero-argument callable to invoke.

    Raises:
        AssertionError: If ``fun`` returns normally instead of raising.
        BaseException: Any other exception raised by ``fun`` is reported on
            stderr and then propagated unchanged.
    """
    try:
        result = fun()
    except exc:
        # The expected exception arrived; nothing more to check.
        return None
    except BaseException:  # pragma: no cover
        print(f"{fun} did not raise {exc}", file=sys.stderr)
        raise
    else:
        # Raised outside the try body, so it is never swallowed by the
        # ``except exc`` clause even when exc is AssertionError.
        message = f"{fun} did not raise {exc} and instead returned {result}"
        raise AssertionError(message)

149 

150 

151assert_raises(AssertionError, lambda: assert_raises(Exception, lambda: None)) 

152assert_raises(TypeError, lambda: hash(Stream(...))) 

153assert_raises(TypeError, lambda: hash(Stream([0, 1]))) 

154 

155assert_raises(ValueError, lambda: sliding_window([], -1)) 

156assert_raises(ValueError, lambda: sliding_window((), 0)) 

157assert_raises(ValueError, lambda: Stream([]).nwise(0)) 

158assert_raises(ValueError, lambda: Stream(()).nwise(-1)) 

159 

160assert_raises(ValueError, Stream("1a2b3c4d5").map(int).sum) 

161assert_raises(ValueError, Stream("1a2b3c4d5").map(int).catch(TypeError).sum) 

162assert_raises(ValueError, lambda: Stream(()).catch(StopIteration)) 

163assert_raises(ValueError, lambda: Stream(()).catch(StopIteration, TypeError)) 

164assert_raises(ValueError, lambda: Stream(()).catch(ValueError, StopIteration)) 

165assert ( 

166 assert_type( 

167 Stream("1a2b3c4d5e6f7g8h9").map(int).catch(ValueError).sum(), int 

168 ) 

169 == 45 

170) 

171assert ( 

172 assert_type( 

173 Stream("1a2b3c4d5e6f7g8h9").map(int).catch(Exception).sum(), int 

174 ) 

175 == 45 

176) 

177 

178assert assert_type(Stream("abbccc122333").collect(Counter), Counter[str]) == { 

179 "a": 1, 

180 "b": 2, 

181 "c": 3, 

182 "1": 1, 

183 "2": 2, 

184 "3": 3, 

185} 

186 

187assert Stream.range(10).conditional_map( 

188 is_even, lambda x: x, lambda x: -x 

189).collect() == (0, -1, 2, -3, 4, -5, 6, -7, 8, -9) 

190assert assert_type( 

191 Stream.range(10) 

192 .conditional_map(is_even, lambda x: x * 2, lambda x: -x) 

193 .collect(), 

194 StreamableSequence[int], 

195) == (0, -1, 4, -3, 8, -5, 12, -7, 16, -9) 

196assert assert_type( 

197 Stream.range(10).conditional_map(is_odd, lambda x: -x, lambda x: x * 2), 

198 Stream[int], 

199).collect() == (0, -1, 4, -3, 8, -5, 12, -7, 16, -9) 

200assert ( 

201 assert_type( 

202 Stream.range(10) 

203 .conditional_map(is_even, lambda _: ..., lambda x: None) 

204 .collect(), 

205 StreamableSequence[None | EllipsisType], 

206 ) 

207 == (..., None) * 5 

208) 

209assert assert_type( 

210 Stream.range(10).conditional_map(is_odd, lambda x: -x), Stream[int] 

211).collect() == ( 

212 -1, 

213 -3, 

214 -5, 

215 -7, 

216 -9, 

217) 

218assert ( 

219 assert_type( 

220 Stream.range(10).conditional_map(is_even, lambda _: ...).collect(), 

221 StreamableSequence[EllipsisType], 

222 ) 

223 == (...,) * 5 

224) 

225 

226 

227def raise_exceptions(number: int) -> int: 

228 """Raise different exceptions.""" 

229 if number == 1: 

230 raise ValueError("1") 

231 if number == 3: 

232 raise TypeError("3") 

233 if number == 5: 

234 raise ZeroDivisionError("5") 

235 return number 

236 

237 

238assert assert_type( 

239 Stream.range(6) 

240 .map(raise_exceptions) 

241 .catch(ValueError, TypeError, ZeroDivisionError, default=lambda: -1) 

242 .collect(), 

243 StreamableSequence[int], 

244) == (0, -1, 2, -1, 4, -1) 

245 

246assert assert_type( 

247 Stream.range(6) 

248 .map(raise_exceptions) 

249 .catch(ValueError, TypeError, ZeroDivisionError) 

250 .collect(), 

251 StreamableSequence[int], 

252) == (0, 2, 4) 

253 

254assert assert_type( 

255 Stream.range(6) 

256 .map(raise_exceptions) 

257 .map(str) 

258 .catch( 

259 ValueError, 

260 TypeError, 

261 ZeroDivisionError, 

262 default=lambda _: f"E{_.args[0]}", 

263 ) 

264 .collect(), 

265 StreamableSequence[str], 

266) == ("0", "E1", "2", "E3", "4", "E5") 

267 

268assert ( 

269 assert_type( 

270 Stream.range(20) 

271 .map(raise_exceptions) 

272 .catch(ValueError, TypeError, ZeroDivisionError) 

273 .sum(), 

274 int, 

275 ) 

276 == sum(range(20)) - 1 - 3 - 5 

277) 

278assert ( 

279 assert_type( 

280 Stream.range(20) 

281 .map(raise_exceptions) 

282 .catch(ValueError, TypeError, ZeroDivisionError, default=lambda: 0) 

283 .sum(), 

284 int, 

285 ) 

286 == sum(range(20)) - 1 - 3 - 5 

287) 

288assert ( 

289 assert_type( 

290 Stream.range(20) 

291 .map(raise_exceptions) 

292 .catch(ValueError, TypeError, ZeroDivisionError, default=lambda _: 0) 

293 .sum(), 

294 int, 

295 ) 

296 == sum(range(20)) - 1 - 3 - 5 

297) 

298 

299 

# catch() with both a handler and a default: the handler receives each caught
# exception, and the default supplies the replacement value.
errors: list[ValueError] = []
# The digits 1..9 sum to 45; each of the 8 letters fails int() and is replaced
# by the zero-argument default 100, giving 45 + 8 * 100 == 845.
assert (
    assert_type(
        Stream("1a2b3c4d5e6f7g8h9")
        .map(int)
        .catch(ValueError, handler=errors.append, default=lambda: 100)
        .sum(),
        int,
    )
    == 845
)
# The handler was called once per failing element.
assert (
    assert_type(Stream(errors).map(type).collect(list), list[type])
    == [ValueError] * 8
)
errors = []
# Here the default takes the caught exception as its argument; the expected
# total 8045 == 45 + 8 * 1000 corresponds to len(_.args) being 1 each time.
assert (
    assert_type(
        Stream("1x2x3x4x5x6x7x8x9")
        .map(int)
        .catch(
            ValueError,
            handler=errors.append,
            default=lambda _: len(_.args) * 1000,
        )
        .sum(),
        int,
    )
    == 8045
)
# Capture a reference ValueError from int("x") to compare the collected
# errors against.
value_error: ValueError
try:
    int("x")
except ValueError as _:
    value_error = _
else:
    raise AssertionError("x is int")
assert Stream(errors).map(type).collect(list) == [ValueError] * 8
# After de-duplication, the args of all collected errors must equal the args
# of the reference error.
assert (
    Stream(errors).flat_map(lambda _: _.args).distinct().collect()
    == value_error.args
)

342 

343optional_float_list: list[float | None] = list( 

344 Stream.range(3) 

345 .map((5).__truediv__) 

346 .catch(ZeroDivisionError, default=lambda: None) 

347) 

348assert optional_float_list == [None, 5.0, 2.5] 

349 

350assert ( 

351 Stream.range(69).collect() 

352 == Stream.range(69).nwise(1).sum() 

353 == Stream.range(69).map(lambda x: (x,)).sum() 

354 == Stream.range(69).nwise(1).flat_map(lambda x: x).collect(tuple) 

355 == Stream.range(69).map(lambda x: (x,)).flat_map(lambda x: x).collect(tuple) 

356) 

357assert ( 

358 Stream.range(69).collect(list) 

359 == Stream.range(69).map(lambda x: [x]).sum() 

360 == Stream.range(69).map(lambda x: [x]).flat_map(lambda x: x).collect(list) 

361) 

362assert Stream.range(10).enumerate(-10).sum() == Stream.range(10).enumerate( 

363 -10 

364).flat_map(lambda x: x).collect(tuple) 

365 

366assert " ".join(Stream("ABCDEFG").nwise(1).map("".join)) == "A B C D E F G" 

367assert Stream("ABCDEFG").nwise(1).collect() == ( 

368 ("A",), 

369 ("B",), 

370 ("C",), 

371 ("D",), 

372 ("E",), 

373 ("F",), 

374 ("G",), 

375) 

376 

377assert " ".join(Stream("ABCDEFG").nwise(2).map("".join)) == "AB BC CD DE EF FG" 

378assert Stream("ABCDEFG").nwise(2).collect() == ( 

379 ("A", "B"), 

380 ("B", "C"), 

381 ("C", "D"), 

382 ("D", "E"), 

383 ("E", "F"), 

384 ("F", "G"), 

385) 

386assert ( 

387 " ".join(Stream("ABCDEFG").nwise(3).map("".join)) == "ABC BCD CDE DEF EFG" 

388) 

389assert Stream("ABCDEFG").nwise(3).collect() == ( 

390 ("A", "B", "C"), 

391 ("B", "C", "D"), 

392 ("C", "D", "E"), 

393 ("D", "E", "F"), 

394 ("E", "F", "G"), 

395) 

396assert ( 

397 " ".join(Stream("ABCDEFG").nwise(4).map("".join)) == "ABCD BCDE CDEF DEFG" 

398) 

399assert Stream("ABCDEFG").nwise(4).collect() == ( 

400 ("A", "B", "C", "D"), 

401 ("B", "C", "D", "E"), 

402 ("C", "D", "E", "F"), 

403 ("D", "E", "F", "G"), 

404) 

405assert " ".join(Stream("ABCDEFG").nwise(5).map("".join)) == "ABCDE BCDEF CDEFG" 

406assert Stream("ABCDEFG").nwise(5).collect() == ( 

407 ("A", "B", "C", "D", "E"), 

408 ("B", "C", "D", "E", "F"), 

409 ("C", "D", "E", "F", "G"), 

410) 

411assert " ".join(Stream("ABCDEFG").nwise(6).map("".join)) == "ABCDEF BCDEFG" 

412assert Stream("ABCDEFG").nwise(6).collect() == ( 

413 ("A", "B", "C", "D", "E", "F"), 

414 ("B", "C", "D", "E", "F", "G"), 

415) 

416assert " ".join(Stream("ABCDEFG").nwise(7).map("".join)) == "ABCDEFG" 

417assert Stream("ABCDEFG").nwise(7).collect() == ( 

418 ("A", "B", "C", "D", "E", "F", "G"), 

419) 

420assert Stream("").nwise(1).collect() == () 

421assert Stream("A").nwise(2).collect() == () 

422assert Stream("AB").nwise(3).collect() == () 

423assert Stream("ABC").nwise(4).collect() == () 

424assert Stream("ABCDEFG").nwise(8).collect() == () 

425 

426assert 0 in Stream.counting() 

427assert 1 in Stream([1]) 

428assert 2 not in Stream.range(1, 100, 2) 

429assert 3 not in Stream.range(3) 

430 

431assert "0" in Stream.counting().map(str) 

432assert "1" in Stream([1]).map(str) 

433assert "2" not in Stream.range(1, 100, 2).map(str) 

434assert "3" not in Stream.range(3).map(str) 

435 

436assert assert_type("".join(reversed(Stream("abc"))), str) == "cba" 

437assert tuple(reversed(Stream([1, 2, 3, 4]))) == (4, 3, 2, 1) 

438 

439assert assert_type(Stream([1, 2, 3]).collect(tuple), tuple[int, ...]) == ( 

440 1, 

441 2, 

442 3, 

443) 

444assert assert_type(Stream([1, 2, 3]).collect(set), set[int]) == {1, 2, 3} 

445mapping: dict[int, str] = ( 

446 Stream([1, 2, 3]).map(lambda x: (x, str(x))).collect(dict) 

447) 

448assert mapping == {1: "1", 2: "2", 3: "3"} 

449 

450assert assert_type(Stream([1, 2, 3]).sum(), int) == 6 

451assert assert_type(Stream([1, 2, 3]).max(), int) == 3 

452assert assert_type(Stream([1, 2, 3]).min(), int) == 1 

453assert assert_type(Stream([1, 2, 3]).min(default=0), int) == 1 

454 

455assert assert_type(Stream(["1", "2", "3"]).max(), str) == "3" 

456assert assert_type(Stream(["1", "2", "3"]).min(), str) == "1" 

457assert assert_type(Stream(["1", "2", "3"]).min(default="a"), str) == "1" 

458 

459_stream1 = Stream.from_value(69).enumerate().nwise(3).catch() 

460assert isinstance( 

461 _stream2 := eval( # pylint: disable=eval-used 

462 repr(_stream1), 

463 { 

464 "typed_stream": __import__("typed_stream"), 

465 "repeat": __import__("itertools").repeat, 

466 }, 

467 ), 

468 Stream, 

469) 

470assert _stream1.limit(1000).collect() == _stream2.limit(1000).collect() 

471assert ( 

472 repr(iter(Stream.from_value(69))) 

473 == "typed_stream._impl._iteration_utils.IterWithCleanUp" 

474 + "(<bound method StreamABC.close of typed_stream.Stream" 

475 + "(repeat(69))>,repeat(69))" 

476) 

# The repr of a Peeker wrapping print must spell out the fully qualified
# class name followed by the repr of the wrapped function.
try:
    assert (
        repr(Peeker(print))
        == "typed_stream._impl._iteration_utils.Peeker(<built-in function print>)"
    )
except AssertionError:  # pragma: no cover
    # On RustPython a mismatch is only printed; on every other
    # implementation it is a real failure.
    if sys.implementation.name == "rustpython":
        traceback.print_exc()
    else:
        raise
else:  # pragma: no cover
    # A passing comparison on RustPython is itself treated as an error,
    # presumably so that the special case above gets noticed and removed.
    if sys.implementation.name == "rustpython":
        raise AssertionError("Doesn't fail anymore on RustPython")

490 

491 

492assert str(Stream(...)) == "typed_stream.Stream(...)" 

493assert repr(Stream(...)) == "typed_stream.Stream(...)" 

494assert repr(FileStream(...)) == "typed_stream.FileStream(...)" 

495 

496assert_raises(StreamFinishedError, lambda: Stream(...)._data) 

497assert_raises(StreamFinishedError, lambda: BinaryFileStream(...)._data) 

498assert_raises(StreamFinishedError, lambda: FileStream(...)._data) 

499 

500assert assert_type( 

501 Stream.range(5).map(str).enumerate(), Stream[IndexValueTuple[str]] 

502).collect(tuple) == ( 

503 (0, "0"), 

504 (1, "1"), 

505 (2, "2"), 

506 (3, "3"), 

507 (4, "4"), 

508) 

509 

510 

511assert assert_type( 

512 Stream.range(5).map(str).enumerate().map(lambda x: x.idx).collect(list), 

513 list[int], 

514) == list(range(5)) 

515enumeration_stream = assert_type( 

516 Stream.range(5).map(str).enumerate(), Stream[IndexValueTuple[str]] 

517) 

518assert assert_type( 

519 enumeration_stream.map(lambda x: x.val).collect(list), list[str] 

520) == list(map(str, range(5))) 

521assert assert_type( 

522 Stream.range(5).map(str).enumerate().map(lambda x: x.val).collect(list), 

523 list[str], 

524) == list(map(str, range(5))) 

525assert assert_type( 

526 Stream.range(5).map(str).enumerate().collect(dict), dict[int, str] 

527) == {0: "0", 1: "1", 2: "2", 3: "3", 4: "4"} 

528 

529assert assert_type( 

530 list(Stream.range(999).enumerate().starmap(int.__eq__).distinct()), 

531 list[bool], 

532) == [True] 

533assert assert_type( 

534 list(Stream.range(999).enumerate().starmap(operator.eq).distinct()), 

535 list[Any], 

536) == [True] 

537assert ( 

538 assert_type(Stream.range(10).nwise(1).starmap(str).sum(), str) 

539 == "0123456789" 

540) 

541assert ( 

542 assert_type(Stream.range(6).nwise(2).starmap(int.__mul__).sum(), int) == 40 

543) 

544assert ( 

545 assert_type(Stream.range(6).nwise(2).starmap(operator.mul).sum(), Any) == 40 

546) 

547 

548STRING = "pjwa nsvoidnvifbp s,cpvmodo nngfibogfmjv in" 

549assert ( 

550 assert_type(Stream(STRING).distinct().collect("".join), str) 

551 == "pjwa nsvoidfb,cmg" 

552) 

553assert ( 

554 assert_type(Stream(STRING).distinct(use_set=False).sum(), str) 

555 == "pjwa nsvoidfb,cmg" 

556) 

557 

558 

def create_int_stream() -> Stream[int]:
    """Build a fresh stream of the numbers 0..9999, each raised to the power 2.

    A new stream is created on every call.
    """
    numbers = Stream.range(10_000)
    # The extra argument 2 is forwarded to operator.pow after each element.
    return numbers.map(operator.pow, 2)

562 

563 

564assert ( 

565 333283335000 

566 == assert_type(sum(create_int_stream()), int) 

567 == assert_type(create_int_stream().reduce(lambda x, y: x + y), int) 

568 == assert_type(create_int_stream().reduce(int.__add__), int) 

569 == assert_type(create_int_stream().reduce(add, 0), int) 

570 == assert_type(create_int_stream().reduce(add), int) 

571 == assert_type(create_int_stream().sum(), int) 

572 == assert_type(create_int_stream().collect(lambda x: sum(x)), int) 

573 == assert_type(create_int_stream().collect(sum), int) 

574) 

575 

576assert assert_type(Stream([1, 2, 3, -1]).max(), int) == 3 

577assert assert_type( 

578 Stream([{0, 1}, {2}, {3, 4, 5}, {6}]).max(key=len), set[int] 

579) == {3, 4, 5} 

580assert assert_type(Stream([1, 2, -1, 3]).min(), int) == -1 

581assert assert_type( 

582 Stream([{0, 1}, {2}, {3, 4, 5}, {6}]).min(key=len), set[int] 

583) == {2} 

584 

585assert assert_type(Stream([1, 2, 3]).max(default=None), None | int) == 3 

586assert not assert_type(Stream([1]).limit(0).max(default=None), None | int) 

587assert assert_type(Stream([1, 2, 3]).first(default=None), None | int) == 1 

588assert not assert_type(Stream([1]).limit(0).first(default=None), None | int) 

589assert assert_type(Stream([1, 2, 3]).min(default=None), None | int) == 1 

590assert not assert_type(Stream([1]).limit(0).min(default=None), None | int) 

591 

592assert assert_type(Stream([1, 2, 3]).max(default="None"), str | int) == 3 

593assert ( 

594 assert_type(Stream([1]).limit(0).max(default="None"), str | int) == "None" 

595) 

596assert assert_type(Stream([1, 2, 3]).first(default="None"), str | int) == 1 

597assert ( 

598 assert_type(Stream([1]).limit(0).first(default="None"), str | int) == "None" 

599) 

600assert assert_type(Stream([1, 2, 3]).min(default="None"), str | int) == 1 

601assert ( 

602 assert_type(Stream([1]).limit(0).min(default="None"), str | int) == "None" 

603) 

604 

605assert Stream((1,)).drop(100).reduce(add, 1) == 1 

606assert Stream("").reduce(add, "x") == "x" 

607assert_raises(StreamEmptyError, Stream("").sum) 

608 

609assert tuple(Stream([1, 2, 2, 2, 3]).distinct()) == (1, 2, 3) 

610assert tuple(Stream([1, 2, 1, 1, 2, 1, 2, 3, 3, 3, 2, 2, 1]).distinct()) == ( 

611 1, 

612 2, 

613 3, 

614) 

615 

616assert Stream([1, 4, 7]).flat_map(lambda x: [x, x + 1, x + 2]).collect( 

617 list 

618) == [1, 2, 3, 4, 5, 6, 7, 8, 9] 

619assert Stream([1, 2, 3, 4, 5]).limit(3).collect(list) == [1, 2, 3] 

620assert not Stream([]).limit(3).collect(list) 

621assert Stream([1]).limit(3).collect(list) == [1] 

622assert Stream([1, 2, 3, 4, 5]).count() == 5 

623assert Stream([1, 4, 5]).last() == 5 

624assert Stream([1, 4, 5]).first() == 1 

625assert Stream([True, True, True]).all() 

626assert Stream([]).empty() 

627assert not Stream([1]).empty() 

628 

629assert Stream([1, 2, 3]).chain([4, 5, 6]).collect(tuple) == (1, 2, 3, 4, 5, 6) 

630assert Stream.range(25).chunk(5).map(lambda x: list(x)).collect(tuple) == ( 

631 [0, 1, 2, 3, 4], 

632 [5, 6, 7, 8, 9], 

633 [10, 11, 12, 13, 14], 

634 [15, 16, 17, 18, 19], 

635 [20, 21, 22, 23, 24], 

636) 

637assert_raises(ValueError, lambda: Stream(()).chunk(0)) 

638 

639int_list: list[int] = Stream([None, 1, 2, 3, 4, 0, 23]).filter().collect(list) 

640assert int_list == [1, 2, 3, 4, 23] 

641int_list = Stream([None, 1, 2, 3, None]).filter(is_not_none).collect(list) 

642assert int_list == [1, 2, 3] 

643int_list = Stream([None, 1, 2, 3, None]).exclude(is_none).collect(list) 

644assert int_list == [1, 2, 3] 

645int_list = [] 

646Stream([None, 1, 2, 3, None]).exclude(is_none).for_each(int_list.append) 

647assert int_list == [1, 2, 3] 

648 

649assert len(Stream.from_value("x").limit(1000).tail(10)) == 10 

650 

651assert not Stream("").count() 

652assert Stream.range(10_000).chunk(100).count() == 100 

653assert list(Stream.range(10_000).chunk(100).map(len).distinct()) == [100] 

654 

655assert Stream.counting().take_while((100).__gt__).count() == 100 

656assert list(Stream.counting().take_while((5).__gt__)) == [0, 1, 2, 3, 4] 

657assert list(Stream.range(10).drop_while((5).__gt__)) == [5, 6, 7, 8, 9] 

658assert Stream.range(10).tail(5) == (5, 6, 7, 8, 9) 

659 

660 

661str_stream: Stream[str] = Stream([None, "1", "2", "3", 4, 0, 23]).filter(is_str) 

662assert str_stream.collect(list) == ["1", "2", "3"] 

663 

664assert ( 

665 repr(FileStream("file.txt", "Utf-8", False)) 

666 == "typed_stream.FileStream('file.txt','Utf-8',False)" 

667) 

668assert ( 

669 repr(FileStream("file.txt", "Utf-8", True)) 

670 == "typed_stream.FileStream('file.txt','Utf-8',True)" 

671) 

672 

673INPUT_TXT = Path(__file__).parent / "input.txt" 

674 

675assert ( 

676 FileStream(INPUT_TXT) 

677 .filter(lambda string: string and not string.startswith("#")) 

678 .map(int) 

679 .sum() 

680 == 7 

681) 

682assert ( 

683 FileStream(INPUT_TXT) 

684 .filter() 

685 .exclude(method_partial(str.startswith, "#")) 

686 .map(int) 

687 .sum() 

688 == 7 

689) 

690assert FileStream(INPUT_TXT).map(int).catch(ValueError).sum() == 7 

691 

692assert FileStream(INPUT_TXT, keep_line_ends=True).map( 

693 lambda x: x[-1] 

694).distinct().collect(tuple) == ("\n",) 

695 

696fs = FileStream(INPUT_TXT) 

697assert fs.chain(" ").last() == " " 

698assert fs._file_iterator is None 

699fs.close() # closing twice shouldn't fail 

700assert fs._file_iterator is None 

701 

702fs = FileStream(INPUT_TXT, keep_line_ends=True) 

703for line in fs: 

704 assert line.endswith("\n") 

705 

706assert fs._file_iterator is None 

707fs = FileStream(INPUT_TXT) 

708for line in fs: 

709 assert not line.endswith("\n") 

710 

711assert fs._file_iterator is None 

712 

713fs = FileStream(INPUT_TXT) 

714assert fs.map(lambda _: ...).limit(1).collect(list) == [...] 

715assert fs._file_iterator is None 

716 

717fs = FileStream(INPUT_TXT) 

718assert ( 

719 fs.limit(10).map(repr).map(len).peek(lambda _: ...).map((1).__add__).count() 

720 == 10 

721) 

722assert fs._file_iterator is None 

723 

724with FileStream(INPUT_TXT) as fs: 

725 assert isinstance(next(iter(fs)), str) 

726assert fs._file_iterator is None 

727 

728fs = FileStream(INPUT_TXT) 

729assert fs.take_while(len).count() == 4 

730assert fs._file_iterator is None 

731 

732 

733assert BinaryFileStream(INPUT_TXT, keep_line_ends=True).map( 

734 lambda x: x[-1] 

735).distinct().collect(tuple) == (b"\n"[0],) 

736 

737bfs: BinaryFileStream = BinaryFileStream(INPUT_TXT) 

738assert bfs.chain([b" "]).last() == b" " 

739assert bfs._file_iterator is None 

740 

741bfs = BinaryFileStream(INPUT_TXT) 

742assert list(bfs.map(lambda _: ...).limit(1)) == [...] 

743assert bfs._file_iterator is None 

744 

745bfs = BinaryFileStream(INPUT_TXT) 

746assert ( 

747 bfs.limit(10) 

748 .map(repr) 

749 .map(len) 

750 .peek(lambda _: ...) 

751 .map((1).__add__) 

752 .count() 

753 == 10 

754) 

755assert bfs._file_iterator is None 

756 

757bfs = BinaryFileStream(INPUT_TXT) 

758assert bfs.take_while(len).count() == 4 

759assert bfs._file_iterator is None 

760 

761bfs = BinaryFileStream(INPUT_TXT) 

762first = bfs.first() 

763assert bfs._file_iterator is None 

764 

765with BinaryFileStream(INPUT_TXT) as bfs: 

766 assert first == next(iter(bfs)) 

767assert bfs._file_iterator is None 

768 

769with LazyFileIteratorRemovingEndsBytes(INPUT_TXT) as lfireb: 

770 assert first == next(lfireb) 

771assert not lfireb._file_object 

772lfireb.close() 

773assert not lfireb._file_object 

774lfireb = LazyFileIteratorRemovingEndsBytes(INPUT_TXT) 

775assert next(lfireb) == first 

776assert lfireb._file_object 

777lfireb.close() 

778assert lfireb._file_object is None 

779 

780bfs = BinaryFileStream(INPUT_TXT) # type: ignore[unreachable] 

781fs = FileStream(INPUT_TXT) 

782assert tuple(bfs) == tuple(fs.map(str.encode, "UTF-8").collect(tuple)) 

783assert bfs._file_iterator is None 

784assert fs._file_iterator is None 

785 

786int_list_begin: list[int] = [] 

787int_list_end: list[int] = [] 

788int_stream: Stream[int] = ( 

789 Stream.range(10_000) 

790 .peek(int_list_begin.append) 

791 .limit(1000) 

792 .drop_while(lambda x: x < 500) 

793 .exclude(lambda x: x % 2) 

794 .flat_map(range) 

795 .filter(lambda x: x % 2) 

796 .map(operator.pow, 2) 

797 .peek(int_list_end.append) 

798) 

799assert not int_list_begin # the above code did nothing 

800assert not int_list_end 

801assert list(int_stream) == int_list_end 

802assert int_list_end # list(int_stream) consumed the stream 

803assert len(int_list_begin) == 1000 

804 

805assert repr(int_stream) == "typed_stream.Stream(...)" 

806 

807assert Stream(["abc", "def", "ghijk"]).flat_map(str.encode, "ASCII").map( 

808 operator.sub, 97 

809).collect(tuple) == (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) 

810 

811stream = ( 

812 Stream.range(10000000000000000000000000000000000000000000000000000000000000) 

813 .peek(noop) 

814 .map(str) 

815 .flat_map(str.encode, "ASCII") 

816 .map(operator.sub, ord("0")) 

817 .limit(100) 

818 .map(operator.mul, 10) 

819 .drop_while((10).__gt__) 

820 .take_while((90).__gt__) 

821 .filter(operator.truth) 

822 .exclude(operator.not_) 

823) 

824 

# Subscripting and nth() on a 10_000-element range stream must agree with
# plain range indexing, for indices 0..99 and for -1..-99.
for i in range(100):
    assert Stream.range(10_000)[i] == i
    assert Stream.range(10_000).nth(i) == i
    if not i:
        # -0 is the same index as 0, so there is no negative case for i == 0.
        continue
    assert Stream.range(10_000)[-i] == 10_000 - i
    assert Stream.range(10_000).nth(-i) == 10_000 - i

832 

833 

# Walk over every attribute of a Stream built from `...` and check that each
# callable one, apart from the names excluded below, raises
# StreamFinishedError when it is called.
for name in dir(Stream(...)):  # noqa: C901  # pylint: disable=too-complex
    # Names that are skipped by this check.
    if name in {
        "__class__",
        "__class_getitem__",
        "__del__",
        "__delattr__",
        "__dir__",
        "__enter__",
        "__eq__",
        "__exit__",
        "__format__",
        "__ge__",
        "__getattribute__",
        "__getstate__",
        "__gt__",
        "__init__",
        "__init_subclass__",
        "__le__",
        "__lt__",
        "__ne__",
        "__new__",
        "__reduce__",
        "__reduce_ex__",
        "__repr__",
        "__setattr__",
        "__sizeof__",
        "__str__",
        "__subclasshook__",
        "_close_source",
        "_data",
        "_finish",
        "_full_class_name",
        "_get_args",
        "_module",
        "_StreamABC__data",
        "close",
        "collect_and_await_all",
        "collect_async_to_awaited_tasks",
        "counting",
        "from_value",
        "range",
    }:
        continue
    # On RustPython, attributes that are missing or None are reported and
    # skipped instead of being checked.
    if (
        sys.implementation.name == "rustpython"
        and getattr(Stream(...), name, None) is None
    ):
        print(f"ignoring Stream.{name}")
        continue
    # A new Stream(...) is created for each lookup; only callables are tested.
    if isinstance(method := getattr(Stream(...), name), Callable):
        # Pick placeholder arguments that fit the arity of each method.
        args: tuple[Any, ...]
        if name == "chain":
            args = ([],)
        elif name in {
            "chunk",
            "drop",
            "limit",
            "nth",
            "nwise",
            "tail",
            "__contains__",
            "__getitem__",
        }:
            args = (2,)
        elif name in {
            "concurrent_map",
            "drop_while",
            "exclude",
            "flat_map",
            "map",
            "peek",
            "reduce",
            "starcollect",
            "starmap",
            "take_while",
        }:
            args = (lambda: ...,)
        elif name == "catch":
            args = (Exception,)
        elif name == "conditional_map":
            args = (lambda _: ..., lambda _: ..., lambda _: ...)
        else:
            args = ()
        try:
            assert_raises(StreamFinishedError, partial(method, *args))
        except NotImplementedError:
            # Tolerated (and printed) on RustPython only; fatal elsewhere.
            if sys.implementation.name == "rustpython":
                traceback.print_exc()
            else:
                raise

924 

925 

926assert_raises(StreamIndexError, lambda: Stream.range(10)[10]) 

927assert_raises(StreamIndexError, lambda: Stream.range(10)[-11]) 

928assert_raises(StreamIndexError, lambda: Stream(())[-1]) 

929assert_raises(StreamIndexError, lambda: Stream(())[0]) 

930assert_raises(StreamIndexError, lambda: Stream(())[1]) 

931assert_raises(StreamIndexError, lambda: Stream(()).nth(-1)) 

932assert_raises(StreamIndexError, lambda: Stream(()).nth(0)) 

933assert_raises(StreamIndexError, lambda: Stream(()).nth(1)) 

934 

935assert_raises(StreamEmptyError, lambda: Stream(()).first()) 

936assert_raises(StreamEmptyError, lambda: Stream([]).first()) 

937assert_raises(StreamEmptyError, lambda: Stream(()).last()) 

938assert_raises(StreamEmptyError, lambda: Stream([]).last()) 

939 

940 

941assert Stream.range(100).nth(1_000, default=None) is None 

942assert Stream.range(100).nth(-1_000, default=None) is None 

943assert Stream(()).nth(1, default=None) is None 

944assert Stream(()).nth(-1, default=None) is None 

945 

946assert Stream.range(100)[:10] == tuple(range(10)) 

947assert Stream.range(100)[90:] == tuple(range(90, 100)) 

948assert Stream.range(1000)[90:100] == tuple(range(90, 100)) 

949assert Stream.range(1000)[90:100:2] == tuple(range(90, 100, 2)) 

950 

951assert Stream.range(1000)[20:44:5] == tuple(range(20, 44, 5)) 

952 

953assert list(Stream.range(10)) == list(range(10)) 

954assert list(Stream.range(stop=10)) == list(range(10)) 

955assert list(Stream.range(0, 20)) == list(range(0, 20)) 

956assert list(Stream.range(0, stop=20)) == list(range(0, 20)) 

957assert list(Stream.range(start=0, stop=20)) == list(range(0, 20)) 

958assert list(Stream.range(0, 20, 3)) == list(range(0, 20, 3)) 

959assert list(Stream.range(0, 20, step=3)) == list(range(0, 20, 3)) 

960assert list(Stream.range(0, stop=20, step=3)) == list(range(0, 20, 3)) 

961assert list(Stream.range(start=0, stop=20, step=3)) == list(range(0, 20, 3)) 

962 

963 

964# ints_and_strs: list[int | str] = [1, "2", 3, "4"] 

965# str_list: list[str] = list(Stream(ints_and_strs).exclude(is_int)) 

966# assert str_list == ["2", "4"] 

967# int_list: list[int] = list(Stream(ints_and_strs).exclude(is_str)) 

968# assert int_list == [1, 3] 

969 

970 

# One sample value per type that the predicate helpers tell apart.
# Note: True is a bool, and bool is a subclass of int, so True also shows up
# in the int / number / real-number results below.
source: list[str | int | float | complex | bool | None] = [
    None,
    True,
    "2",
    3,
    4.2,
    5j,
]
# Each check does two things at once: assert_type is handed the collected
# list together with the element type expected after filtering, and the value
# it returns is compared against the expected runtime content.
assert assert_type(Stream(source).filter(is_str).collect(list), list[str]) == [
    "2"
]
assert assert_type(Stream(source).filter(is_int).collect(list), list[int]) == [
    True,
    3,
]
assert assert_type(
    Stream(source).filter(is_float).collect(list), list[float]
) == [4.2]
assert assert_type(
    Stream(source).filter(is_complex).collect(list), list[complex]
) == [5j]
assert assert_type(
    Stream(source).filter(is_bool).collect(list), list[bool]
) == [True]
assert assert_type(
    Stream(source).filter(is_number).collect(list), list[Number]
) == [True, 3, 4.2, 5j]
assert assert_type(
    Stream(source).filter(is_real_number).collect(list), list[Real]
) == [True, 3, 4.2]
assert assert_type(
    Stream(source).filter(is_none).collect(list), list[None]
) == [None]
# Filtering out None leaves every other member of the union.
assert assert_type(
    Stream(source).filter(is_not_none).collect(list),
    list[str | int | float | complex | bool],
) == [True, "2", 3, 4.2, 5j]

1008 

1009 

# exclude() is the complement of filter(): the expected element type is the
# source union minus the excluded member, and the expected content is the
# source list minus the values the predicate matches.
assert assert_type(
    Stream(source).exclude(is_str).collect(list),
    list[int | float | complex | bool | None],
) == [None, True, 3, 4.2, 5j]
# bool stays in the expected static type here, although True is removed at
# runtime (it is an int as well).
assert assert_type(
    Stream(source).exclude(is_int).collect(list),
    list[str | float | complex | bool | None],
) == [None, "2", 4.2, 5j]
assert assert_type(
    Stream(source).exclude(is_float).collect(list),
    list[str | int | complex | bool | None],
) == [None, True, "2", 3, 5j]
assert assert_type(
    Stream(source).exclude(is_complex).collect(list),
    list[str | int | float | bool | None],
) == [None, True, "2", 3, 4.2]
# Excluding bool keeps the plain int 3 (and int in the expected type).
assert assert_type(
    Stream(source).exclude(is_bool).collect(list),
    list[str | int | float | complex | None],
) == [None, "2", 3, 4.2, 5j]
assert assert_type(
    Stream(source).exclude(is_number).collect(list), list[str | None]
) == [None, "2"]
# complex is a number but not a real number, so 5j survives this one.
assert assert_type(
    Stream(source).exclude(is_real_number).collect(list),
    list[str | None | complex],
) == [None, "2", 5j]
assert assert_type(
    Stream(source).exclude(is_none).collect(list),
    list[str | int | float | complex | bool],
) == [True, "2", 3, 4.2, 5j]
assert assert_type(
    Stream(source).exclude(is_not_none).collect(list), list[None]
) == [None]

1044 

# An empty stream counts to a falsy value.
assert not Stream(()).count()
# drop(n) skips the first n elements; collect() without an argument is
# compared against a tuple.
assert Stream(range(100)).drop(10).count() == 90
assert Stream(range(100)).drop(10).collect() == tuple(range(10, 100))
assert Stream("abc").drop(2).collect() == ("c",)
# Slicing a Stream has to agree with slicing the equivalent tuple, including
# negative bounds and negative steps.  The middle operand spells out the
# expected value, the right operand is the builtin reference behaviour.
assert Stream("abc")[::] == ("a", "b", "c") == tuple("abc")[::]
assert Stream("abc")[-2::] == ("b", "c") == tuple("abc")[-2::]
assert Stream("abc")[::-1] == ("c", "b", "a") == tuple("abc")[::-1]
assert Stream("abc")[::2] == ("a", "c") == tuple("abc")[::2]
assert Stream("abc")[:2:] == ("a", "b") == tuple("abc")[:2:]
assert Stream("abc")[:-1:] == ("a", "b") == tuple("abc")[:-1:]
# Negative step with start <= stop yields nothing.
assert Stream("abc")[-1:-1:-1] == () == tuple("abc")[-1:-1:-1]
assert Stream("abc")[-2:-1:-1] == () == tuple("abc")[-2:-1:-1]
assert Stream("abc")[-1:-2:-1] == ("c",) == tuple("abc")[-1:-2:-1]
assert Stream("abc")[-1:-3:-1] == ("c", "b") == tuple("abc")[-1:-3:-1]
# Negative start only: the last n elements.
assert Stream("abc")[-1:] == ("c",) == tuple("abc")[-1:]
assert Stream("abc")[-2:] == ("b", "c") == tuple("abc")[-2:]
assert Stream("abc")[-3:] == ("a", "b", "c") == tuple("abc")[-3:]
# Explicit None for stop and step behaves like omitting them.
assert Stream("abc")[-2:None:None] == ("b", "c") == tuple("abc")[-2:None:None]

1063 

1064 

# Repetition keeps the StreamableSequence type, whichever side the int is on.
assert isinstance(StreamableSequence() * 4, StreamableSequence)
assert isinstance(4 * StreamableSequence(), StreamableSequence)
# With a plain tuple on the left, the result is only required to be a tuple.
assert isinstance(() + StreamableSequence(), tuple)
# With a StreamableSequence on the left, concatenation keeps the type.
assert isinstance(StreamableSequence() + (), StreamableSequence)
assert isinstance(
    StreamableSequence() + StreamableSequence(), StreamableSequence
)
# Slicing keeps the type, indexing returns the element itself.
assert isinstance(StreamableSequence()[::], StreamableSequence)
assert StreamableSequence("a")[0] == "a"

# Concatenation results compare equal to the corresponding plain tuples, in
# both operand orders.
assert StreamableSequence("ab") + ("c", "d") == ("a", "b", "c", "d")
assert ("c", "d") + StreamableSequence("ab") == ("c", "d", "a", "b")

1077 

# last() is called first; using the same stream object again afterwards is
# expected to fail with StreamFinishedError.
# NOTE(review): assert_raises is defined elsewhere in this file; presumably
# it calls the given callable and checks that the given exception is raised.
str_stream = Stream("abc")
assert str_stream.last() == "c"
assert_raises(StreamFinishedError, str_stream.collect)
# Asking an empty stream for its first element is expected to fail with
# StreamEmptyError.
str_stream = Stream(())
assert_raises(StreamEmptyError, str_stream.first)

1083 

# concurrent_map has to produce the same result as the sequential map.
# A NotImplementedError is tolerated on RustPython only: there the traceback
# is printed and the run continues; on any other implementation it propagates.
try:
    assert (
        Stream("abc").map(str.upper).sum()
        == Stream("abc").concurrent_map(str.upper).sum()
        == "ABC"
    )
except NotImplementedError:
    if sys.implementation.name != "rustpython":
        raise
    traceback.print_exc()

# Summing an empty stream is expected to fail with StreamEmptyError.
assert_raises(StreamEmptyError, lambda: Stream(()).sum())

1097 

# Receives every value that passes the peek() stage of the pipeline below.
int_list = []
# One long chain exercising most intermediate operations; assert_type is
# handed the final result together with the expected type int.
assert (
    assert_type(
        Stream.counting(-100)
        .drop(100)
        .drop_while((1000).__gt__)  # bound method: evaluates 1000 > value
        .take_while((100_000).__gt__)  # bound method: evaluates 100_000 > value
        .filter(is_odd)
        .map(operator.pow, 3)
        .peek(int_list.append)
        .enumerate()
        .flat_map(operator.mul, 2)
        .exclude(is_even)
        .limit(10_000)
        .distinct()
        .chunk(30)
        .map(sum)
        .sum(),
        int,
    )
    == 432028881523605
)
# peek() recorded 3334 values in total (len - 1 == 3333).
# NOTE(review): presumably limit(10_000) stops the upstream stages from being
# pulled any further, which is why so few values reach peek() -- confirm.
assert sum(int_list) == 432028878744716
assert len(int_list) - 1 == 3333

1122 

# Same pipeline as the sequential variant, but the chunks are summed with
# concurrent_map; result and peek() side effects have to be identical.
# A NotImplementedError is tolerated on RustPython only, where the traceback
# is printed instead of aborting the run.
try:  # pylint: disable=too-many-try-statements
    # Start from an empty recorder so the totals below refer to this run only.
    int_list.clear()
    assert (
        assert_type(
            Stream.counting(-100)
            .drop(100)
            .drop_while((1000).__gt__)
            .take_while((100_000).__gt__)
            .filter(is_odd)
            .map(operator.pow, 3)
            .peek(int_list.append)
            .enumerate()
            .flat_map(operator.mul, 2)
            .exclude(is_even)
            .limit(10_000)
            .distinct()
            .chunk(30)
            .concurrent_map(sum)
            .sum(),
            int,
        )
        == 432028881523605
    )
    assert sum(int_list) == 432028878744716
    assert len(int_list) - 1 == 3333
except NotImplementedError:
    if sys.implementation.name != "rustpython":
        raise
    traceback.print_exc()

1153 

# starcollect passes the elements as separate positional arguments.
assert Stream("abc").starcollect(lambda *args: args) == ("a", "b", "c")

# IterWithCleanUp: the clean-up callback must not run while merely iterating,
# must have run once after the first ``with`` block is left, and must not run
# again when the same object is used as a context manager a second time.
# A TypeError is tolerated on RustPython only (traceback printed, run goes on).
try:  # pylint: disable=too-many-try-statements
    int_list = []
    it_w_cl: IterWithCleanUp[int] = IterWithCleanUp(
        Stream.counting(1), lambda: int_list.append(1)
    )
    # Plain iteration: no clean-up yet.
    assert next(it_w_cl) == 1
    assert not int_list
    with it_w_cl as _it:
        assert next(_it) == 2
        assert not int_list
    # The callback has been invoked exactly once by now.
    assert int_list == [1]

    with it_w_cl as _it:
        # Nothing truthy is yielded any more; the None default comes back.
        assert not next(_it, None)
        assert int_list == [1]

    # Leaving the second block did not invoke the callback again.
    assert int_list == [1]
except TypeError:
    if sys.implementation.name != "rustpython":
        raise
    traceback.print_exc()