deno.land / std@0.224.0 / async / mux_async_iterator.ts

mux_async_iterator.ts
View Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
// This module is browser compatible.
/**
 * A value produced by one of the multiplexed iterators, paired with the
 * iterator that produced it so that the same iterator can be asked for its
 * next value once this one has been handed to the consumer.
 */
interface TaggedYieldedValue<T> {
  /** The source iterator that produced `value`. */
  iterator: AsyncIterator<T>;
  /** The value obtained from `iterator.next()`. */
  value: T;
}
/**
 * Multiplexes multiple async iterators into a single stream. It currently
 * makes an assumption that the final result (the value returned and not
 * yielded from the iterator) does not matter; if there is any result, it is
 * discarded.
 *
 * Values from different sources are interleaved in the order in which they
 * become available. If a source throws, the values that were already received
 * are yielded first and then the first recorded error is rethrown to the
 * consumer, which ends the iteration.
 *
 * @example
 * ```ts
 * import { MuxAsyncIterator } from "https://deno.land/std@$STD_VERSION/async/mux_async_iterator.ts";
 *
 * async function* gen123(): AsyncIterableIterator<number> {
 *   yield 1;
 *   yield 2;
 *   yield 3;
 * }
 *
 * async function* gen456(): AsyncIterableIterator<number> {
 *   yield 4;
 *   yield 5;
 *   yield 6;
 * }
 *
 * const mux = new MuxAsyncIterator<number>();
 * mux.add(gen123());
 * mux.add(gen456());
 * for await (const value of mux) {
 *   // ...
 * }
 * // ..
 * ```
 */
export class MuxAsyncIterator<T> implements AsyncIterable<T> {
  /** Number of added iterators that have not yet reported `done`. */
  #iteratorCount = 0;
  /** Values received from the sources since the list was last cleared. */
  #yields: Array<TaggedYieldedValue<T>> = [];
  /**
   * Errors raised while advancing a source. Typed as `unknown` because any
   * value can be thrown in JavaScript.
   */
  #throws: unknown[] = [];
  /**
   * Resolved whenever a source settles a `next()` call (value, completion or
   * error). Replaced with a fresh one after each round of `iterate()`.
   */
  #signal = Promise.withResolvers<void>();

  /**
   * Add an async iterable to the stream.
   *
   * The iterable's first value is requested immediately, before the stream
   * is consumed.
   *
   * @param iterable The async iterable whose values are merged into the stream.
   */
  add(iterable: AsyncIterable<T>): void {
    ++this.#iteratorCount;
    // Fire-and-forget: `#callIteratorNext` handles its own errors.
    this.#callIteratorNext(iterable[Symbol.asyncIterator]());
  }

  /**
   * Requests the next value from `iterator` and records the outcome: a value
   * is queued in `#yields`, completion decrements `#iteratorCount`, and an
   * error is queued in `#throws`. In every case the signal is resolved so a
   * sleeping `iterate()` wakes up.
   */
  async #callIteratorNext(iterator: AsyncIterator<T>): Promise<void> {
    try {
      const { value, done } = await iterator.next();
      if (done) {
        // The return value of the source (if any) is intentionally discarded.
        --this.#iteratorCount;
      } else {
        this.#yields.push({ iterator, value });
      }
    } catch (e) {
      this.#throws.push(e);
    }
    this.#signal.resolve();
  }

  /**
   * Returns an async iterator of the stream.
   *
   * The iterator finishes once every added iterable has completed.
   *
   * @returns An async iterator yielding the values of all added iterables.
   * @throws Rethrows the first error raised by any of the added iterables.
   */
  async *iterate(): AsyncIterableIterator<T> {
    while (this.#iteratorCount > 0) {
      // Sleep until any of the wrapped iterators yields.
      await this.#signal.promise;

      // Note that while we're looping over `yields`, new items may be added.
      for (const { iterator, value } of this.#yields) {
        yield value;
        // The value has been consumed; ask the same source for its next one.
        this.#callIteratorNext(iterator);
      }

      if (this.#throws.length > 0) {
        // Throwing ends this generator, so only the first recorded error can
        // ever be surfaced to the consumer.
        throw this.#throws[0];
      }

      // Clear the `yields` list and reset the `signal` promise.
      this.#yields.length = 0;
      this.#signal = Promise.withResolvers<void>();
    }
  }

  /**
   * Implements an async iterator for the stream.
   *
   * @returns The async iterator produced by `iterate()`.
   */
  [Symbol.asyncIterator](): AsyncIterator<T> {
    return this.iterate();
  }
}
std

Version Info

Tagged at
8 months ago