如何限制 Promise 最大并发量

偶尔在 B 站刷到有人分享这个面试题。于是就自己作答以下。

p-limit package

最佳方案当然是使用,battle tested 的 package。https://github.com/sindresorhus/p-limit#readme

这工作中,谁要是自己写,当然是拖出去打死。

import pLimit from 'p-limit';

const limit = pLimit(2);

const input = [
	limit(() => createTask(1, 400)()),
	limit(() => createTask(2, 1000)()),
	limit(() => createTask(3, 1000)()),
    limit(() => createTask(4, 400)()),
    limit(() => createTask(5, 1000)())
];

// Only one promise is run at once
const result = await Promise.all(input);
console.log(result);

function createTask(num, ms) {
    return () => {
        return new Promise((resolve, reject) => {
            setTimeout(() => {
                console.log(`promise ${num}`, Date.now());
                resolve(num);
            }, ms)
        })
    }
}

递归

其实 pLimit 实现也不复杂,为了实现简单,我下面实现就不考虑接口的设计。

这个问题的难点是,怎么知道并发 pool 里面的一个 promise 执行完成,并且在完成后移除,并加入一个新的 promise。

这里我们可以使用递归。

async function pLimitPool(max, tasks) {
    const pending = tasks.splice(0, max).map(task => task());

    function drainPromise(){
        while(pending.length) {
            const p = pending.shift();
            p.then(
               (value)=>{
                if(tasks.length){
                    pending.push(...tasks.splice(0, 1).map(task => task()));
                    drainPromise()
                }
               }
            )
        }
    }
    drainPromise();
}

const task1 = createTask(1, 400);
const task2 = createTask(2, 1000);
const task3 = createTask(3, 1000);
const task4 = createTask(4, 400);
const task5 = createTask(5, 1000);
const task6 = createTask(6, 1000);
const task7 = createTask(7, 1000);
const task8 = createTask(8, 500);

const tasks = [task1, task2, task3, task4, task5, task6, task7, task8];


pLimitPool(2, tasks);
function createTask(num, ms) {
    return () => {
        return new Promise((resolve, reject) => {
            setTimeout(() => {
                console.log(`promise ${num}`, Date.now());
                resolve(num);
            }, ms)
        })
    }
}

Promise Race

如果你不喜欢递归,可以利用 Promise.race()

**Promise.race(iterable) **方法返回一个 promise,一旦迭代器中的某个 promise 解决或拒绝,返回的 promise 就会解决或拒绝。

需要注意的是, race 方法返回的是一个新的 promise, 需要使用 then 保存 原始的 promise。

实现代码,

async function pLimitPool(max, tasks) {
    const result=[];
    const pending = tasks.splice(0, max).map(task => task());
    for (; pending.length;) {
        // 因为 race 返回的是一个新的 promise, 所以在这里保存下 reference, 好用来从 pending 中移除。
        // 这里还需要处理 reject case TODO:
        const {originPromise, value} = await Promise.race(pending.map(task => task.then(
            value => ({
            originPromise: task,
            value
        }))));
        result.push(value);
        pending.splice(pending.indexOf(originPromise), 1);
        if(tasks.length){
            pending.push(...tasks.splice(0, max- pending.length).map(task => task()));
        }
    }

    return result;
}

const task1 = createTask(1, 400);
const task2 = createTask(2, 1000);
const task3 = createTask(3, 1000);
const task4 = createTask(4, 400);
const task5 = createTask(5, 1000);
const task6 = createTask(6, 1000);
const task7 = createTask(7, 1000);
const task8 = createTask(8, 500);

const tasks = [task1, task2, task3, task4, task5, task6, task7, task8];

const result = await pLimitPool(2, tasks);

console.log(result);

function createTask(num, ms) { // 如上}

Async generate + Promise Race

仅仅是用 race 是完全可以工作的,但是可读性和灵活性上不是很好,利用 Async generate 可以是代码变得更加灵活。

async function* pLimitPool(max, tasks) {
    const pending = tasks.splice(0, max).map(task => task());
    for (; pending.length;) {
        // 因为 race 返回的是一个新的 promise, 所以在这里保存下 reference, 好用来从 pending 中移除。
        // 这里还需要处理 reject case TODO:
        const {originPromise, value} = await Promise.race(pending.map(task => task.then(
            value => ({
            originPromise: task,
            value
        }))));
        yield originPromise;
        pending.splice(pending.indexOf(originPromise), 1);
        if(tasks.length){
            pending.push(...tasks.splice(0, max- pending.length).map(task => task()));
        }
    }
}

const task1 = createTask(1, 400);
const task2 = createTask(2, 1000);
const task3 = createTask(3, 1000);
const task4 = createTask(4, 400);
const task5 = createTask(5, 1000);
const task6 = createTask(6, 1000);
const task7 = createTask(7, 1000);
const task8 = createTask(8, 500);

const tasks = [task1, task2, task3, task4, task5, task6, task7, task8];


for await (const result of pLimitPool(2, tasks)){
    console.log(result);
}
function createTask(num, ms) { // 如上}