Suppose I have some async iterable objects like this:
// Promisified sleep function
const sleep = ms => new Promise((resolve, reject) => {
  setTimeout(() => resolve(ms), ms);
});
const a = {
  [Symbol.asyncIterator]: async function * () {
    yield 'a';
    await sleep(1000);
    yield 'b';
    await sleep(2000);
    yield 'c';
  },
};
const b = {
  [Symbol.asyncIterator]: async function * () {
    await sleep(6000);
    yield 'i';
    yield 'j';
    await sleep(2000);
    yield 'k';
  },
};
const c = {
  [Symbol.asyncIterator]: async function * () {
    yield 'x';
    await sleep(2000);
    yield 'y';
    await sleep(8000);
    yield 'z';
    await sleep(10000);
    throw new Error('You have gone too far! ');
  },
};
Now, suppose I can concat them like this:
const abcs = async function * () {
  yield * a;
  yield * b;
  yield * c;
};
The (first 9) items yielded will be:
(async () => {
  const limit = 9;
  let i = 0;
  const xs = [];
  for await (const x of abcs()) {
    xs.push(x);
    i++;
    if (i === limit) {
      break;
    }
  }
  console.log(xs);
})().catch(error => console.error(error));
// [ 'a', 'b', 'c', 'i', 'j', 'k', 'x', 'y', 'z' ]
But imagine that I do not care about the order, that a, b and c yield at different speeds, and that I want to yield as quickly as possible.
How can I rewrite this loop so that each x is yielded as soon as possible, ignoring order?
It is also possible that a, b or c are infinite sequences, so the solution must not require all elements to be buffered into an array.
There is no way to write this with a loop statement. async/await code always executes sequentially; to do things concurrently you need to use promise combinators directly. For plain promises, there’s Promise.all; for async iterators there is nothing (yet), so we need to write a combine function ourselves.
Notice that combine does not support passing values into next or cancellation through .throw or .return.
You can call it like any other async generator, in a for await loop.
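The idea described above (racing the pending next() promises of all sources) can be sketched roughly as follows. This is only an illustration under stated assumptions, not the answer's original code: the name combine comes from the text, but its signature (an array of async iterables) and the helper names pending and advance are assumptions made for this sketch.

```javascript
// Sketch only: merge async iterables, yielding each value as soon as it arrives.
// Assumption: `iterables` is an array of objects implementing Symbol.asyncIterator.
async function* combine(iterables) {
  const iterators = iterables.map(iterable => iterable[Symbol.asyncIterator]());
  // index -> promise for that iterator's next result, tagged with its index
  const pending = new Map();
  const advance = index => {
    const tagged = iterators[index].next().then(result => ({ index, result }));
    pending.set(index, tagged);
  };
  iterators.forEach((_, index) => advance(index));

  try {
    while (pending.size > 0) {
      // Whichever source settles first wins this round
      const { index, result } = await Promise.race(pending.values());
      if (result.done) {
        pending.delete(index); // this source is exhausted
      } else {
        advance(index); // request the next value from the same source
        yield result.value;
      }
    }
  } finally {
    // Runs on completion, on break, and on error. Unfinished sources are asked
    // to stop, but not awaited, because a source may be in the middle of a long wait.
    for (const [index, tagged] of pending) {
      tagged.catch(() => {}); // ignore late errors from abandoned sources
      const iterator = iterators[index];
      if (typeof iterator.return === 'function') {
        Promise.resolve(iterator.return()).catch(() => {});
      }
    }
  }
}
```

With the a, b and c from the question, looping over combine([a, b, c]) with for await should deliver values in arrival order rather than source order. Only one promise per source is ever pending, so nothing is buffered beyond the next value of each source, which also makes the sketch usable with infinite sequences.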
If I change abcs to accept the generators to process, I come up with this; see the inline comments:
I hope I understood your question correctly, here’s how I’d approach it:
I tried it with three normal arrays:
And it resulted in [1, 4, 7, 2, 5, 8, 3, 6, 9].
I solved this using async generators. (I wish I had found this question a few days ago; it would have saved me some time.)
I will gladly hear opinions and criticism.
It took me a bunch of time to find, and I got so excited that I put it in an npm package 🙂 https://www.npmjs.com/package/mergen
In case anyone finds it useful, here’s a TypeScript version of the currently accepted answer:
This is a complicated task, so I’m going to break it up into individual parts:
Step 1: logging each value from each async iterable to the console
Before we even think about creating an async iterator, we should first consider the task of simply logging each value from each iterator to the console as it arrives. As with most concurrent tasks in JavaScript, this involves calling multiple async functions and awaiting their results with Promise.all.
CodeSandbox link: https://codesandbox.io/s/tender-ives-4hijy?fontsize=14
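As a rough sketch of this step (not necessarily the code behind the link), each iterable can be consumed by its own async function, with Promise.all waiting for all of them. The log parameter is a hypothetical addition so the output can be captured; it defaults to console.log.

```javascript
// Sketch of Step 1: consume every iterable concurrently and log values as they arrive.
// `log` is a hypothetical parameter (defaults to console.log), added for illustration.
async function merge(iterables, log = console.log) {
  return Promise.all(
    iterables.map(async iterable => {
      // Each async function runs its own sequential loop; the loops interleave
      // because they are all started before any of them is awaited.
      for await (const value of iterable) {
        log(value);
      }
    })
  );
}
```

Each inner async function returns nothing, so the promise returned here fulfills to an array with one undefined per iterable.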
The merge function logs values from each iterator, but is mostly useless; it returns a promise which fulfills to an array of undefined when all iterators finish.
Step 2: replacing the merge function with a merge async generator
The next step is to replace console.log calls with calls to a function which pushes to a parent async iterator. To do this with an async generator, we need a little bit more code, because the only way to “push” a value onto an async generator is with the yield operator, which can’t be used in child function scopes. The solution is to create two queues, a push queue and a pull queue. Next, we define a push function which either hands the value to a pending pull, if there is one, or enqueues it on the push queue to be pulled later. Finally, we have to perpetually yield either values from the push queue if it has values, or promises which enqueue a resolve function to be called by push later.
The code is available at this CodeSandbox link: https://codesandbox.io/s/misty-cookies-du1eg
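The two-queue idea can be sketched like this. It is an illustration written from the description above, not a copy of the linked code; the names pushQueue, push and finishP follow the text, while pullQueue is an assumed name for the pull queue. Like the version described here, it never finishes on its own and does not stop the child iterators, so the consumer has to break out of the loop.

```javascript
// Sketch of Step 2: an async generator fed through a push queue and a pull queue.
async function* merge(iterables) {
  const pushQueue = []; // values that arrived before anyone asked for them
  const pullQueue = []; // resolve functions of pulls still waiting for a value

  function push(value) {
    if (pullQueue.length > 0) {
      pullQueue.shift()(value); // hand the value straight to a waiting pull
    } else {
      pushQueue.push(value); // nobody is waiting: keep it for later
    }
  }

  // Same loops as in Step 1, but pushing instead of logging.
  // The result is deliberately unused at this stage.
  const finishP = Promise.all(
    iterables.map(async iterable => {
      for await (const value of iterable) {
        push(value);
      }
    })
  );

  while (true) {
    if (pushQueue.length > 0) {
      yield pushQueue.shift();
    } else {
      // Yielding a promise from an async generator awaits it first,
      // so the consumer receives the pushed value, not the promise.
      yield new Promise(resolve => pullQueue.push(resolve));
    }
  }
}
```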
This almost works! If you run the code, you’ll notice that xs is printed correctly, but the break statement is not respected: values continue to be pulled from the child iterators, so the error in c is eventually thrown, resulting in an unhandled promise rejection. Also note that we don’t do anything with the result of the Promise.all call. Ideally, when the finishP promise settles, the generator should be returned. We need just a little bit more code to make sure that 1. the child iterators are returned when the parent iterator is returned (with a break statement in a for await loop, for instance), and 2. the parent iterator is returned when all child iterators return.
Step 3: stopping each child iterator when the parent iterator is returned, and the parent iterator when every child has returned.
To make sure each child async iterable is correctly returned when the parent async generator is returned, we can use a finally block to listen for the completion of the parent async generator. And to make sure the parent generator is returned when the child iterators return, we can race yielded promises against the finishP promise.
CodeSandbox link: https://codesandbox.io/s/vigilant-leavitt-h247u
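A sketch of how these two changes might look, under some assumptions of this illustration: the DONE sentinel and the stopped flag are hypothetical names, the child iterators are driven manually so that return() can be called on them, and errors raised by a child after the parent has stopped are silently ignored.

```javascript
// Sketch of Step 3. DONE and `stopped` are hypothetical names used for illustration.
const DONE = Symbol('done');

async function* merge(iterables) {
  const pushQueue = [];
  const pullQueue = [];
  const push = value => {
    if (pullQueue.length > 0) pullQueue.shift()(value);
    else pushQueue.push(value);
  };

  const iterators = iterables.map(iterable => iterable[Symbol.asyncIterator]());
  let stopped = false;

  // Fulfills with DONE once every child is exhausted; rejects if a child throws.
  const finishP = Promise.all(
    iterators.map(async iterator => {
      while (true) {
        const { value, done } = await iterator.next();
        if (done || stopped) return;
        push(value);
      }
    })
  ).then(() => DONE);
  finishP.catch(() => {}); // no unhandled rejection; the race below still sees the error

  try {
    while (true) {
      if (pushQueue.length > 0) {
        yield pushQueue.shift();
        continue;
      }
      // Wait for the next pushed value, or for all children to finish.
      const value = await Promise.race([
        new Promise(resolve => pullQueue.push(resolve)),
        finishP,
      ]);
      if (value === DONE) return;
      yield value;
    }
  } finally {
    // Runs when the parent is returned (for example via break) or finishes.
    stopped = true;
    for (const iterator of iterators) {
      if (typeof iterator.return === 'function') {
        // Not awaited: a child may be in the middle of a long wait.
        Promise.resolve(iterator.return()).catch(() => {});
      }
    }
  }
}
```

Because the push queue is drained before each race, values that arrived just before the last child finished are still delivered before the generator returns.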
There are some things we still need to do before this code is production ready. For instance, values are pulled from the child iterators continuously, without waiting for the parent iterator to pull them. This, combined with the fact that pushQueue is an unbounded array, can cause memory leaks if the parent iterator pulls values at a slower pace than the child iterators produce them.
Additionally, the merge iterator returns undefined as its final value, but you might want the final value to be the final value from the last-completing child iterator.
If you’re looking for a small, focused library which has a merge function like the one above and which covers some more use-cases and edge-cases, check out Repeater.js, which I wrote. It defines the static method Repeater.merge, which does what I described above. It also provides a clean API for turning callback-based APIs into promises and other combinator static methods to combine async iterators in other ways.