Fill out ItemQueue implementation
This commit is contained in:
parent
44d64c09d0
commit
3bdbad150c
1 changed files with 84 additions and 12 deletions
|
@ -2,7 +2,15 @@
|
|||
|
||||
import { EventEmitter, once } from 'events';
|
||||
|
||||
import ErrorWrapper from '../core/ErrorWrapper.mjs';
|
||||
|
||||
|
||||
/**
|
||||
* Implements a simple asynchronous item queue.
|
||||
* @extends EventEmitter
|
||||
*/
|
||||
class ItemQueue extends EventEmitter {
|
||||
|
||||
constructor() {
|
||||
super();
|
||||
this.items = [];
|
||||
|
@ -13,18 +21,42 @@ class ItemQueue extends EventEmitter {
|
|||
this.emit("push", item);
|
||||
}
|
||||
|
||||
/**
|
||||
* Pushes 1 more more items into the queue.
|
||||
* @param {any} items The items to add to the queue.
|
||||
* @return {void}
|
||||
*/
|
||||
push(...items) {
|
||||
|
||||
for(let item of items)
|
||||
this.__push_single(item);
|
||||
}
|
||||
|
||||
async pop(time_limit = 0) {
|
||||
if(this.items.length === 0) {
|
||||
/**
|
||||
* Removes an item from the queue.
|
||||
* If no items exist in the queue at the moment, at more time_limit_ms
|
||||
* milliseconds will elapse before this method returns undefined.
|
||||
* @param {Number} [time_limit_ms=0] The maximum number of milliseconds to wait before timing out.
|
||||
* @return {Promise} A Promise that resolves to either the next item in the queue, or undefined if it timed out.
|
||||
*/
|
||||
async pop(time_limit_ms = 0) {
|
||||
if(this.items.length === 0)
|
||||
return await this.wait_for_item(time_limit_ms);
|
||||
|
||||
let item = this.items.pop();
|
||||
this.emit("pop", item);
|
||||
if(this.items.length === 0)
|
||||
this.emit("empty");
|
||||
|
||||
return item;
|
||||
}
|
||||
|
||||
this.pop()
|
||||
}
|
||||
|
||||
wait_for_item(time_limit_ms = 0) {
|
||||
/**
|
||||
* Waits at most time_limit_ms for the next item to be pushed, before popping and returning it.
|
||||
* You probably do not want to call this method directly - try the .pop() method instead.
|
||||
* @param {Number} [time_limit_ms=0] The maximum time to wait for the item to come in before timing out.
|
||||
* @return {Promise} A Promise that resolves to the next item, or undefined if we timed out waiting.
|
||||
*/
|
||||
async wait_for_item(time_limit_ms = 0) {
|
||||
const ac = new AbortController();
|
||||
let timeout = null;
|
||||
if(time_limit_ms > 0) {
|
||||
|
@ -32,6 +64,8 @@ class ItemQueue extends EventEmitter {
|
|||
ac.abort();
|
||||
}, time_limit_ms);
|
||||
}
|
||||
|
||||
try {
|
||||
let item = await once(this, "push", { signal: ac.signal });
|
||||
if(timeout !== null)
|
||||
clearTimeout(timeout);
|
||||
|
@ -39,9 +73,47 @@ class ItemQueue extends EventEmitter {
|
|||
this.items.splice(this.items.indexOf(item));
|
||||
return item;
|
||||
}
|
||||
catch(error) {
|
||||
if(timeout !== null)
|
||||
clearTimeout(timeout);
|
||||
|
||||
wait_empty() {
|
||||
if(error.name !== "AbortError")
|
||||
throw new ErrorWrapper(`Error: An unknown error occurred while waiting for the next item.`, error);
|
||||
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Waits for the queue to be empty.
|
||||
* @param {Number} [time_limit_ms=0] The maximum number of milliseconds to wait for the queue to be empty before timing out.
|
||||
* @return {Promise} A Promise that resolves to a bool when the queue is empty (true) or we time out (false).
|
||||
*/
|
||||
async wait_empty(time_limit_ms = 0) {
|
||||
const ac = new AbortController();
|
||||
let timeout = null;
|
||||
if(time_limit_ms > 0) {
|
||||
timeout = setTimeout(() => {
|
||||
ac.abort();
|
||||
}, time_limit_ms);
|
||||
}
|
||||
|
||||
try {
|
||||
await once(this, "empty", { signal: ac.signal });
|
||||
if(timeout !== null)
|
||||
clearTimeout(timeout);
|
||||
|
||||
return true;
|
||||
}
|
||||
catch(error) {
|
||||
if(timeout !== null)
|
||||
clearTimeout(timeout);
|
||||
|
||||
if(error.name !== "AbortError")
|
||||
throw new ErrorWrapper(`Error: An unknown error occurred while waiting for the next item.`, error);
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue