diff --git a/src/lib/async/ItemQueue.mjs b/src/lib/async/ItemQueue.mjs index 913cb69..6f6ff54 100644 --- a/src/lib/async/ItemQueue.mjs +++ b/src/lib/async/ItemQueue.mjs @@ -2,7 +2,15 @@ import { EventEmitter, once } from 'events'; +import ErrorWrapper from '../core/ErrorWrapper.mjs'; + + +/** + * Implements a simple asynchronous item queue. + * @extends EventEmitter + */ class ItemQueue extends EventEmitter { + constructor() { super(); this.items = []; @@ -13,18 +21,42 @@ class ItemQueue extends EventEmitter { this.emit("push", item); } + /** + * Pushes 1 more more items into the queue. + * @param {any} items The items to add to the queue. + * @return {void} + */ push(...items) { - + for(let item of items) + this.__push_single(item); } - async pop(time_limit = 0) { - if(this.items.length === 0) { - } + /** + * Removes an item from the queue. + * If no items exist in the queue at the moment, at more time_limit_ms + * milliseconds will elapse before this method returns undefined. + * @param {Number} [time_limit_ms=0] The maximum number of milliseconds to wait before timing out. + * @return {Promise} A Promise that resolves to either the next item in the queue, or undefined if it timed out. + */ + async pop(time_limit_ms = 0) { + if(this.items.length === 0) + return await this.wait_for_item(time_limit_ms); - this.pop() + let item = this.items.pop(); + this.emit("pop", item); + if(this.items.length === 0) + this.emit("empty"); + + return item; } - wait_for_item(time_limit_ms = 0) { + /** + * Waits at most time_limit_ms for the next item to be pushed, before popping and returning it. + * You probably do not want to call this method directly - try the .pop() method instead. + * @param {Number} [time_limit_ms=0] The maximum time to wait for the item to come in before timing out. + * @return {Promise} A Promise that resolves to the next item, or undefined if we timed out waiting. + */ + async wait_for_item(time_limit_ms = 0) { const ac = new AbortController(); let timeout = null; if(time_limit_ms > 0) { @@ -32,16 +64,56 @@ class ItemQueue extends EventEmitter { ac.abort(); }, time_limit_ms); } - let item = await once(this, "push", { signal: ac.signal }); - if(timeout !== null) - clearTimeout(timeout); - this.items.splice(this.items.indexOf(item)); - return item; + try { + let item = await once(this, "push", { signal: ac.signal }); + if(timeout !== null) + clearTimeout(timeout); + + this.items.splice(this.items.indexOf(item)); + return item; + } + catch(error) { + if(timeout !== null) + clearTimeout(timeout); + + if(error.name !== "AbortError") + throw new ErrorWrapper(`Error: An unknown error occurred while waiting for the next item.`, error); + + return undefined; + } } - wait_empty() { + /** + * Waits for the queue to be empty. + * @param {Number} [time_limit_ms=0] The maximum number of milliseconds to wait for the queue to be empty before timing out. + * @return {Promise} A Promise that resolves to a bool when the queue is empty (true) or we time out (false). + */ + async wait_empty(time_limit_ms = 0) { + const ac = new AbortController(); + let timeout = null; + if(time_limit_ms > 0) { + timeout = setTimeout(() => { + ac.abort(); + }, time_limit_ms); + } + try { + await once(this, "empty", { signal: ac.signal }); + if(timeout !== null) + clearTimeout(timeout); + + return true; + } + catch(error) { + if(timeout !== null) + clearTimeout(timeout); + + if(error.name !== "AbortError") + throw new ErrorWrapper(`Error: An unknown error occurred while waiting for the next item.`, error); + + return false; + } } }