// tasklimiter.mjs

// Helper classes for queueing a batch of promises

/** Validate that `val` is an integer >= `min`; `Infinity` is always accepted so limits can be
 * unbounded.
 * @param {string} name label used in the error message
 * @param {number} val value to validate
 * @param {number} [min=0] minimum allowed value
 * @throws {RangeError} if `val` is neither an integer >= `min` nor Infinity
 */
function positive_int(name, val, min=0){
	// throw a proper Error subclass (not a bare string) so callers get a stack trace
	if (!(Number.isInteger(val) && val >= min || val === Infinity))
		throw new RangeError(name+" must be a number >= "+min);
}
/** Count types that can be observed; used to generate the xxxBelow/xxxEmpty listener methods */
const _task_limiter_counts = ["running","queued","blocked","tasks"];

/** Limits the number of Promises that are running, and optionally queued as well.
 *
 * ```js
 * const tl = new TaskLimiter(1,2);
 * 
 * // preferred usage
 * await tl.add(some_function);
 * // you can leave off the await if the queued_limit is Infinity (the default), or to avoid race conditions
 * 
 * // not recommended, but allowed
 * tl.add(already_running_promise);
 * 
 * // more manual control; works as well, but only if the `add` call is sync;
 * // will still need to await tl.add if you want to know when it was queued
 * await tl.tasksBelow(6)
 * tl.add(some_function);
 * // some additional wrappers available for the xxxBelow methods
 * tl.runningEmpty().then(() => {
 * 	process.exit();
 * })
 * ```
 * 
 * After {@link TaskLimiter#add} is called, a task goes through four phases. A phase will be skipped
 * if it can proceed directly to the next:
 * 
 * 1.   The task is blocked until it can be added to the queue. You can have multiple tasks blocked
 *      at the same time for use cases like multithreading or uncoordinated code sections, but in
 *      general you should not add more tasks until the previous has unblocked.
 * 2.	Once the number of queued tasks falls below {@link TaskLimiter#queued_limit}, the task
 * 		is moved from blocked to queued.
 * 3. 	Once the number of running tasks falls below {@link TaskLimiter#running_limit}, the task
 * 		is moved from queued to running. If the task is a function, it is called here
 * 4. 	The task finishes, possibly asynchronously, and is removed from running
 * 
 * How does queueing tasks recursively work? Take `TaskLimiter(1, 0)` for example: if the running
 * task tries to add another task it could get deadlocked. Here's how to handle recursive tasks:
 * 
 * 1.   If the child is running in-place of the parent task, meaning the parent blocks while waiting
 *      for the child to finish, you do not need to use the TaskLimiter for the child. Just run the
 *      child function/promise directly in the parent.
 * 2.   If for whatever reason the child does need to use the TaskLimiter, for example multiple
 *      children running simultaneously, or you want to limit the total parent+child recursion, you
 *      can run into deadlock scenarios. The easiest way to prevent this is to increment
 *      {@link TaskLimiter#running_limit} before doing your child tasks, and then decrement when
 *      done. For tail recursion scenarios, where the parent would immediately exit when the
 *      children have been queued, you can use `await add(task, true)`; if your tail recursion could
 *      run into recursion stack limit issues, this could be a good alternative to #1.
 */
export class TaskLimiter{
	/** Create a new task limiter
	 * @param {number} [running_limit=10] Initial value for {@link TaskLimiter#running_limit} property
	 * @param {number} [queued_limit=Infinity] Initial value for {@link TaskLimiter#queued_limit} property
	 */
	constructor(running_limit=10, queued_limit=Infinity){
		positive_int("running limit", running_limit, 1);
		positive_int("queued limit", queued_limit);
		/** Private storage for {@link TaskLimiter#running_limit}
		 * @private
		 */
		this._running_limit = running_limit;
		/** Private storage for {@link TaskLimiter#queued_limit}
		 * @private
		 */
		this._queued_limit = queued_limit;
		/** Set of promises waiting to be resolved... the async tasks
		 * @type {Set<Promise>}
		 * @private
		 */
		this.tasks_running = new Set();
		/** These tasks are waiting to be run; each entry a function that generates a promise
		 * (async task) or something else (sync task)
		 * @type {Array<function|Promise>}
		 * @private
		 */
		this.tasks_queued = [];
		/** Blocked tasks; if this grows too large it means you are not awaiting promise from `add`.
		 * Each entry a tuple [fn, resolver]; `fn` is moved to `tasks_queued` once resolved
		 * @type {Array.<function[]>}
		 * @private
		 */
		this.tasks_blocked = [];
		/** Whether we are shifting tasks to different queues (_shift_tasks method). It is
		 * incremented/decremented for each recursive shift call
		 * @type {number}
		 * @private
		 */
		this.shifting = 0;
		/** Listeners for when running/queued/blocked/tasks counts go below a threshold:
		 * `{[type]: Map(limit => [[lid, resolver_fn], ...])}`
		 *
		 * I chose this structure since I assume the number of unique `limit` values will be very
		 * small. Better to just loop through all limits than have the cost of maintaining sorted
		 * structure.
		 *
		 * Out of all listeners[type][limit][0] candidates that meet our dirty check criteria, we
		 * resolve in order of lid, so that promises are resolved in the order of creation. Could
		 * use a heap, but think its overkill: with avg 2 candidates per check, faster to just brute
		 * force look for the min.
		 * @type {Object<string, Map<number, Array>>}
		 * @private
		 */
		this.listeners = {};
		for (const t of _task_limiter_counts)
			this.listeners[t] = new Map();
		/** Monotonically increasing listener id; assigned to each registered listener so their
		 * promises can be resolved in creation order
		 * @type {number}
		 * @private
		 */
		this.lid = 0;
	}
	/** Limits the number of async tasks that can be running at the same time. If you
	 *  - add a Promise directly to `add` method
	 *  - dynamically change this value
	 * 
	 * then the running task count could rise above this limit temporarily.
	 * @type {number}
	 */
	get running_limit(){ return this._running_limit; }
	/** Limits the number of tasks that can be queued. {@link TaskLimiter#add} will block until the
	 * queue opens up more slots. If you dynamically change this value, the queued task count could
	 * rise above this limit temporarily.
	 * 
	 * Set this to `Infinity` for unlimited queue, and no blocked tasks. If not `Infinity`, make
	 * sure you await the {@link TaskLimiter#add} result, otherwise an internal `tasks_blocked`
	 * list will be filled. This can be zero to have no queue, blocking until a task can be run.
	 * @type {number}
	 */
	get queued_limit(){ return this._queued_limit; }
	set running_limit(limit){
		if (limit == this._running_limit)
			return
		positive_int("running limit", limit, 1);
		// raising the limit may open slots, so tasks need to be re-shifted
		let more_slots = this._running_limit > limit;
		this._running_limit = limit;
		if (more_slots)
			this._shift_tasks();
	}
	set queued_limit(limit){
		if (limit == this._queued_limit)
			return
		positive_int("queued limit", limit);
		// raising the limit may open slots, so tasks need to be re-shifted
		let more_slots = this._queued_limit > limit;
		this._queued_limit = limit;
		if (more_slots)
			this._shift_tasks();
	}

	/** Number of running tasks
	 * @readonly
	 * @type {number}
	 */
	get running(){ return this.tasks_running.size; }
	/** Number of queued tasks
	 * @readonly
	 * @type {number}
	 */
	get queued(){ return this.tasks_queued.length; }
	/** Number of blocked tasks that have yet to be queued
	 * @readonly
	 * @type {number}
	 */
	get blocked(){ return this.tasks_blocked.length; }
	/** Number of tasks, either running and queued
	 * @readonly
	 * @type {number}
	 */
	get tasks(){ return this.running + this.queued; }

	/** Internal method used to register xxxBelow listeners
	 * @param limit when the count of "type" falls below this limit, the listener is resolved
	 * @param type one of the values from _task_limiter_counts
	 * @returns {Promise} 
	 * @private
	 */
	_add_listener(limit, type){
		if (limit !== null)
			positive_int(type+" limit", limit, 1);
		const listener = new Promise((resolve) => {
			// register the listener
			const t = this.listeners[type];
			// we fire listeners in order they are registered;
			// lid (not uid) is the creation-order id used by _shift_tasks
			let queue = t.get(limit);
			if (!queue){
				queue = [];
				t.set(limit, queue);
			}
			queue.push([++this.lid, resolve]);
		});
		// in case we are not shifting already, do so now;
		// shift_tasks triggers listeners once all tasks have been shifted into their correct lists
		this._shift_tasks();
		return listener;
	}
	/**
	 * Returns a promise which resolves when a task can be run. This is an alias for
	 * `runningBelow(null)` call.
	 * @see {@link TaskLimiter#runningBelow}
	 * @returns {Promise}
	 */
	canRun(){ return this.runningBelow(); }
	/**
	 * Returns a promise which resolves when a task can be queued. This is an alias for
	 * `queuedBelow(null)` call.
	 * @see {@link TaskLimiter#queuedBelow}
	 * @returns {Promise}
	 */
	canQueue(){ return this.queuedBelow(); }
	/**
	 * Returns a promise which resolves when a task can be queued or run. See
	 * {@link TaskLimiter#canRun} and {@link TaskLimiter#canQueue} methods. This is an alias for
	 * `tasksBelow(null)` call.
	 * @see {@link TaskLimiter#tasksBelow}
	 * @returns {Promise}
	 */
	canHandle(){ return this.tasksBelow(); }

	/**
	 * Marks a promise `task` as running
	 * @private
	 */
	_async_task(task){
		this.tasks_running.add(task);
		task.catch((err) => {
			console.error("TaskLimiter async task failed:", err);
			// TODO: figure out how to catch this in outer scopes, instead of just killing process
			process.exit(3);
		}).finally(() => {
			this.tasks_running.delete(task);
			this._shift_tasks();
		});
	}
	/**
	 * Substitutes `null` for the current limit; used in listeners
	 * @private
	 */
	_null_limits(type){
		switch (type){
			case "running":
				return this.running_limit;
			case "queued":
				return this.queued_limit;
			case "tasks":
				return this.running_limit + this.queued_limit;
			case "blocked":
				// "below 1" = no blocked tasks
				return 1;
		}
	}
	/** Shifts tasks from blocked -> queued -> running if possible, respecting the running/queued
	 * limits.
	 * @private
	 */
	_shift_tasks(){
		// avoid recursive calls; this method has an implicit write lock on the tasks_xxx arrays;
		// so only want one method running at a time or it gets complicated
		if (this.shifting++)
			return;
		// we have this outer loop in case the `listeners` add more tasks / modify limits
		outer: while (true){
			// can run more tasks?
			while (this.running < this._running_limit){
				let fn;
				// run a queued task
				if (this.queued)
					fn = this.tasks_queued.shift();
				// unblock, skipping the queue and running directly
				else if (this.blocked){
					const block = this.tasks_blocked.shift();
					fn = block[0];
					// resolver for `add` promise
					block[1]();
				}
				else break;
				// start the task
				const task = fn();
				if (task instanceof Promise)
					this._async_task(task);
			}
			// no more remaining `running` slots; fill in `queued` slots
			while (this.queued < this._queued_limit && this.blocked){
				const [fn, resolve] = this.tasks_blocked.shift();
				this.tasks_queued.push(fn);
				// resolver for `add` promise
				resolve();
			}

			/* Evaluate listeners;

				TODO: This could be more efficient, making use of extra information we know about, like which
					task_xxx lengths stayed the same, decremented, or incremented. Only need to add/remove entries
					of the heap in specific cases. Thinking we could pass a flag with extra information about such
					things to _shift_tasks, or perhaps just a bitset on the class that gets disjuncted
			*/
			// use prev_shifting to track if the listener resolvers do some recursive action that requires further shifting
			let prev_shifting = this.shifting;
			// initialize heap-like structure (we won't actually heapify), for evaluating listeners promises in order;
			// each entry a triplet [type, limit, [[lid, resolver], ...]]
			let heap = [];
			const heap_filter = (t, l) => {
				// substitute null limits with actual values
				const lval = l === null ? this._null_limits(t) : l;
				return this[t] < lval;
			};
			for (const t in this.listeners){
				for (const [l,q] of this.listeners[t]){
					if (heap_filter(t, l))
						heap.push([t, l, q]);
				}
			}
			while (heap.length){
				// heap peek: find the *index* of the entry whose head listener has the smallest
				// lid, so listener promises are resolved in creation order
				let min = 0;
				for (let i=1; i<heap.length; i++)
					if (heap[i][2][0][0] < heap[min][2][0][0])
						min = i;
				const [t, l, q] = heap[min];
				const head = q.shift();
				// resolve listener
				head[1]();
				// no more listeners for this type+limit?
				if (!q.length){
					this.listeners[t].delete(l);
					heap.splice(min, 1);
				}
				/* if prev_shifting different, listener triggered further shifting either
						a) running/queued limit incremented, or
						b) new blocked tasks
					we double check if shifts are needed; note we need to re-shift immediately, so that, for example, we don't trigger
					multiple "canQueue" resolvers when only one can be queued
				*/
				if (prev_shifting != this.shifting && (this.queued < this._queued_limit && this.blocked || this.running < this._running_limit && this.queued))
					continue outer;
				/* trim out heap values that don't meet limit criteria anymore; since we restart with the prev_shifting cases,
					this will only happen when:
						a) running/queued limit decreased, or
						b) running async new tasks
				*/
				for (let i=0; i<heap.length; ){
					const [ti, li] = heap[i];
					if (!heap_filter(ti, li))
						heap.splice(i,1);
					else i++;
				}
			}
			break;
		}
		this.shifting = 0;
	}
	/** Add a task to be queued
	 * @param {Promise | function} task This can be a promise, in which case it is considered
	 *  running already. Otherwise it should be a function representing a task. If the function
	 *  returns a promise, it is considered an async task and will handled accordingly; if it
	 *  returns any other type, it is assumed to be a synchronous task.
	 * @param {boolean} nonblocking If true, for a task that is not yet running this will send it
	 *  directly to the queue, disregarding any {@link TaskLimiter#queued_limit}. You might use this
	 *  if you add tasks in a tail recursive manner, knowing that a running slot will open up and
	 *  bring the queued length back to queued_limit.
	 * @returns {Promise} resolves when the task is queued or run immediately
	 */
	add(task, nonblocking=false){
		// already running; we just add it as running task
		if (task instanceof Promise){
			this._async_task(task);
			return Promise.resolve();
		}
		if (nonblocking){
			this.tasks_queued.push(task);
			this._shift_tasks();
			return Promise.resolve();
		}
		// otherwise it starts out blocked; _shift_tasks handles queueing/running it
		return new Promise((resolve) => {
			this.tasks_blocked.push([task, resolve]);
			this._shift_tasks();
		});
	}
}
// Generate the [type]Below / [type]Empty listener helpers for every count type
for (const kind of _task_limiter_counts){
	// resolves once the `kind` count drops below `limit`
	const below = function(limit=null){
		return this._add_listener(limit, kind);
	};
	// "empty" simply means the count has dropped below one
	const empty = function(){
		return this._add_listener(1, kind);
	};
	TaskLimiter.prototype[kind + "Below"] = below;
	TaskLimiter.prototype[kind + "Empty"] = empty;
}
/** Returns promise that resolves when {@link TaskLimiter#running} falls below `limit`
 * @function 
 * @name runningBelow
 * @memberof TaskLimiter.prototype
 * @param {?number} [limit=null] null checks if it is below {@link TaskLimiter#running_limit}
 * @returns {Promise}
 */
/** Returns promise that resolves when {@link TaskLimiter#queued} falls below `limit`
 * @function
 * @name queuedBelow
 * @memberof TaskLimiter.prototype
 * @param {number?} [limit=null] null checks if it is below {@link TaskLimiter#queued_limit}
 * @returns {Promise}
 */
/** Returns promise that resolves when {@link TaskLimiter#blocked} falls below `limit`
 * @function
 * @name blockedBelow
 * @memberof TaskLimiter.prototype
 * @param {number?} [limit=null] null checks if it is below one (e.g. no blocked tasks)
 * @returns {Promise}
 */
/** Returns promise that resolves when {@link TaskLimiter#tasks} falls below `limit`
 * @function
 * @name tasksBelow
 * @memberof TaskLimiter.prototype
 * @param {number?} [limit=null] null checks if it is below the sum of 
 *  {@link TaskLimiter#running_limit} and {@link TaskLimiter#queued_limit}
 * @returns {Promise}
 */

/** Returns promise that resolves when there are no running tasks
 * @function
 * @name runningEmpty
 * @memberof TaskLimiter.prototype
 * @returns {Promise}
 */
/** Returns promise that resolves when there are no queued tasks
 * @function
 * @name queuedEmpty
 * @memberof TaskLimiter.prototype
 * @returns {Promise}
 */
/** Returns promise that resolves when there are no blocked tasks
 * @function
 * @name blockedEmpty
 * @memberof TaskLimiter.prototype
 * @returns {Promise}
 */
/** Returns promise that resolves when there are no running or queued tasks
 * @function
 * @name tasksEmpty
 * @memberof TaskLimiter.prototype
 * @returns {Promise}
 */

/** Listener called with ordered task results
 * @callback StackListener
 * @param {any} res return value of the `num`-th added task
 * @param {number} num zero-indexed number indicating the order this task was queued
 * @param {...any} args forwarded arguments that were passed when the task was added
 */

/** Used to run many sync/async functions simultaneously, but return results in the order they were
 * added. This utilizes {@link TaskLimiter} internally to limit the number of executed tasks. An
 * example use case might be the need to read/preprocess many files, but needing to aggregate the
 * results in order.
 */
export class TaskStackAsync{
	/** Create a new TaskStackAsync
	 * @param {!StackListener} listener callback receiving each task's result, in task order
	 * @param {number} [running_limit=16] number of tasks that can be running at a time (e.g.
	 * 	{@link TaskLimiter#running_limit})
	 * @param {number} [queued_limit=Infinity] number of tasks that can be queued at a time (e.g.
	 * 	{@link TaskLimiter#queued_limit})
	 */
	constructor(listener, running_limit=16, queued_limit=Infinity){
		/** Out-of-order results buffered until their turn; maps a task's number to the
		 * argument list for {@link TaskStackAsync#listener}
		 * @type {Map<number, any[]>}
		 * @private
		 */
		this.results = new Map();
		/** Total tasks added so far; doubles as the next task's number
		 * @type {number}
		 * @private
		 */
		this.count = 0;
		/** Number of the task whose result should be emitted next
		 * @type {number}
		 * @private
		 */
		this.current = 0;
		/** Listener receiving ordered results
		 * @type {!StackListener}
		 */
		this.listener = listener;
		/** Underlying limiter controlling task concurrency
		 * @type {TaskLimiter}
		 */
		this.limiter = new TaskLimiter(running_limit, queued_limit);
		/** Pending resolvers for {@link TaskStackAsync#empty} promises
		 * @type {function[]}
		 * @private
		 */
		this.done_listeners = [];
	}
	/** Add an ordered task to the stack
	 * @param {Promise | function} task Running promise or function
	 * 	(if promise is already running, the limit config option may not be respected)
	 * @param {...any} args Arguments to pass to the {@link TaskStackAsync#listener} (not the task!)
	 * @returns {Promise} a promise resolving when the underlying {@link TaskLimiter} unblocks
	 */
	async add(task, ...args){
		const num = this.count++;
		// a bare promise is already running, so hook up its completion callback right away
		if (task instanceof Promise)
			return this.limiter.add(this._add_cbk(task, num, args));
		// otherwise defer execution until the limiter decides to run it
		return this.limiter.add(() => this._add_cbk(task(), num, args));
	}
	/** Adds callback for a task's completion (supports both sync or async task)
	 * @param {Promise | any} task the result of running a task; a Promise indicates an async result
	 * @param {number} num task index
	 * @param {any[]} args arguments to be passed to the listener
	 * @returns {Promise | any} `task`, unchanged
	 * @private
	 */
	_add_cbk(task, num, args){
		const deliver = (res) => {
			// buffer this result, then flush every consecutive result starting at `current`
			this.results.set(num, [res, num, ...args]);
			while (this.results.has(this.current)){
				this.listener(...this.results.get(this.current));
				this.results.delete(this.current);
				this.current++;
			}
			// every added task delivered? notify `empty` waiters
			if (this.current == this.count){
				const waiting = this.done_listeners;
				this.done_listeners = [];
				for (const resolve of waiting)
					resolve();
			}
		};
		// async (promise resolving a result) vs sync (the result itself)
		if (task instanceof Promise)
			task.then(deliver);
		else
			deliver(task);
		return task;
	}
	/** Returns promise which resolves when stack is empty
	 * @returns {Promise}
	 */
	empty(){
		if (this.current == this.count)
			return Promise.resolve();
		return new Promise((resolve) => this.done_listeners.push(resolve));
	}
}

/** Used to run many sync/async functions sequentially. Only one task is run at a time, so the
 * results do not need to be buffered and ordered. Also to keep the class simple, there is no queue
 * limit imposed. This is equivalent to either {@link TaskStackAsync} or {@link TaskLimiter} with
 * `running_limit = 1` and `queued_limit = Infinity`. For full control, you can use those
 * alternatives.
 * 
 * Note that while this runs tasks "synchronously" all tasks are converted to async functions, as
 * tasks may not be run immediately, instead queued in the stack.
 */
export class TaskStackSync{
	/** Create a new `TaskStackSync`
	 * @param {?StackListener} [listener=null] listener for task results; optional for this class,
	 * 	since you can be guaranteed tasks are run sequentially
	 */
	constructor(listener=null){
		/** Listener for results
		 * @type {?StackListener}
		 */
		this.listener = listener;
		/** Pending tasks; the task at index zero is the one currently running
		 * @type {function[]}
		 * @private
		 */
		this.queue = [];
		/** Listeners that are called when the stack empties
		 * @type {function[]}
		 * @private
		 */
		this.done_listeners = [];
		/** Count for task unique id's
		 * @type {number}
		 * @private
		 */
		this.count = 0;
	}
	/** Adds another task to be run; tasks execute one at a time, in the order added
	 * @param {function|Promise} task a function returning a sync result or promise, or an
	 * 	already-running promise (for a promise, only result *delivery* is serialized)
	 * @param {...any} args arguments to pass to the {@link TaskStackSync#listener} (not the task!)
	*/
	add(task, ...args){
		const id = this.count++;
		const qtask = async () => {
			let res;
			if (task instanceof Promise)
				res = await task;
			else{
				res = task();
				if (res instanceof Promise)
					res = await res;
			}
			if (this.listener)
				this.listener(res, id, ...args);
			// we are at the head of the queue; remove ourselves
			this.queue.shift();
			// start the next task, or notify `empty` waiters if we were the last
			if (this.queue.length)
				this.queue[0]();
			else{
				for (let l of this.done_listeners)
					l();
				this.done_listeners = [];
			}
		}
		/* Bugfix: the running task must stay in the queue until it finishes. Previously a task
			was only pushed when the queue was non-empty — but since the running task was never
			in the queue, the queue stayed empty forever: every task started immediately and ran
			concurrently, the completion handler shifted (dropping) an unrelated queued task, and
			`empty()` resolved early. Keeping the active task at queue[0] makes the completion
			logic (shift self, then start queue[0]) correct.
		*/
		this.queue.push(qtask);
		if (this.queue.length == 1)
			qtask();
	}
	/** Returns promise which resolves when stack is empty
	 * @returns {Promise}
	 */
	empty(){
		if (!this.queue.length)
			return Promise.resolve();
		return new Promise((resolve) => {
			this.done_listeners.push(resolve);
		});
	}
}