From 44d64c09d02205df73b9d28ae9e0d01436c0a700 Mon Sep 17 00:00:00 2001 From: Starbeamrainbowlabs Date: Tue, 1 Feb 2022 03:05:27 +0000 Subject: [PATCH] Start unwinding the incoming tables from other peers, but it's gonna be a nasty and complicated business --- package-lock.json | 51 ------------------------------------- package.json | 1 - src/lib/SystemQuery.mjs | 19 +++++++++++--- src/lib/agent/Agent.mjs | 3 +++ src/lib/async/ItemQueue.mjs | 48 ++++++++++++++++++++++++++++++++++ 5 files changed, 66 insertions(+), 56 deletions(-) create mode 100644 src/lib/async/ItemQueue.mjs diff --git a/package-lock.json b/package-lock.json index 854fae0..c141749 100644 --- a/package-lock.json +++ b/package-lock.json @@ -13,7 +13,6 @@ "applause-cli": "^1.7.0", "jpake": "^1.0.1", "nexline": "^1.2.2", - "p-queue": "^7.1.0", "p-reflect": "^3.0.0", "p-retry": "^5.0.0", "systeminformation": "^5.9.4", @@ -45,11 +44,6 @@ "resolved": "https://registry.npmjs.org/crypto-js/-/crypto-js-4.1.1.tgz", "integrity": "sha512-o2JlM7ydqd3Qk9CA0L4NL6mTzU2sdx96a+oOfPu8Mkl/PK51vSyoi8/rQ8NknZtk44vq15lmhAj9CIAGwgeWKw==" }, - "node_modules/eventemitter3": { - "version": "4.0.7", - "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-4.0.7.tgz", - "integrity": "sha512-8guHBZCwKnFhYdHr2ysuRWErTwhoN2X8XELRlrRwpmfeY2jjuUN4taQMsULKUVo1K4DvZl+0pgfyoysHxvmvEw==" - }, "node_modules/fs-extra": { "version": "8.1.0", "resolved": "https://registry.npmjs.org/fs-extra/-/fs-extra-8.1.0.tgz", @@ -114,21 +108,6 @@ "resolved": "https://registry.npmjs.org/noble-secp256k1/-/noble-secp256k1-1.2.10.tgz", "integrity": "sha512-PXlDRYoWD5JHm+fKVx8PsiLVsuYLIp+5ZhO76L//H/q2/YcQZAS59z9aXO7lcq4IMOq8a1U18KXgpijnkz+C5A==" }, - "node_modules/p-queue": { - "version": "7.1.0", - "resolved": "https://registry.npmjs.org/p-queue/-/p-queue-7.1.0.tgz", - "integrity": "sha512-V+0vPJbhYkBqknPp0qnaz+dWcj8cNepfXZcsVIVEHPbFQXMPwrzCNIiM4FoxGtwHXtPzVCPHDvqCr1YrOJX2Gw==", - "dependencies": { - "eventemitter3": "^4.0.7", - "p-timeout": "^5.0.0" - }, - "engines": { - "node": ">=12" - }, - "funding": { - "url": "https://github.com/sponsors/sindresorhus" - } - }, "node_modules/p-reflect": { "version": "3.0.0", "resolved": "https://registry.npmjs.org/p-reflect/-/p-reflect-3.0.0.tgz", @@ -155,17 +134,6 @@ "url": "https://github.com/sponsors/sindresorhus" } }, - "node_modules/p-timeout": { - "version": "5.0.0", - "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-5.0.0.tgz", - "integrity": "sha512-z+bU/N7L1SABsqKnQzvAnINgPX7NHdzwUV+gHyJE7VGNDZSr03rhcPODCZSWiiT9k+gf74QPmzcZzqJRvxYZow==", - "engines": { - "node": ">=12" - }, - "funding": { - "url": "https://github.com/sponsors/sindresorhus" - } - }, "node_modules/retry": { "version": "0.13.1", "resolved": "https://registry.npmjs.org/retry/-/retry-0.13.1.tgz", @@ -243,11 +211,6 @@ "resolved": "https://registry.npmjs.org/crypto-js/-/crypto-js-4.1.1.tgz", "integrity": "sha512-o2JlM7ydqd3Qk9CA0L4NL6mTzU2sdx96a+oOfPu8Mkl/PK51vSyoi8/rQ8NknZtk44vq15lmhAj9CIAGwgeWKw==" }, - "eventemitter3": { - "version": "4.0.7", - "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-4.0.7.tgz", - "integrity": "sha512-8guHBZCwKnFhYdHr2ysuRWErTwhoN2X8XELRlrRwpmfeY2jjuUN4taQMsULKUVo1K4DvZl+0pgfyoysHxvmvEw==" - }, "fs-extra": { "version": "8.1.0", "resolved": "https://registry.npmjs.org/fs-extra/-/fs-extra-8.1.0.tgz", @@ -303,15 +266,6 @@ "resolved": "https://registry.npmjs.org/noble-secp256k1/-/noble-secp256k1-1.2.10.tgz", "integrity": "sha512-PXlDRYoWD5JHm+fKVx8PsiLVsuYLIp+5ZhO76L//H/q2/YcQZAS59z9aXO7lcq4IMOq8a1U18KXgpijnkz+C5A==" }, - "p-queue": { - "version": "7.1.0", - "resolved": "https://registry.npmjs.org/p-queue/-/p-queue-7.1.0.tgz", - "integrity": "sha512-V+0vPJbhYkBqknPp0qnaz+dWcj8cNepfXZcsVIVEHPbFQXMPwrzCNIiM4FoxGtwHXtPzVCPHDvqCr1YrOJX2Gw==", - "requires": { - "eventemitter3": "^4.0.7", - "p-timeout": "^5.0.0" - } - }, "p-reflect": { "version": "3.0.0", "resolved": "https://registry.npmjs.org/p-reflect/-/p-reflect-3.0.0.tgz", @@ -326,11 +280,6 @@ "retry": "^0.13.1" } }, - "p-timeout": { - "version": "5.0.0", - "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-5.0.0.tgz", - "integrity": "sha512-z+bU/N7L1SABsqKnQzvAnINgPX7NHdzwUV+gHyJE7VGNDZSr03rhcPODCZSWiiT9k+gf74QPmzcZzqJRvxYZow==" - }, "retry": { "version": "0.13.1", "resolved": "https://registry.npmjs.org/retry/-/retry-0.13.1.tgz", diff --git a/package.json b/package.json index c605106..760d454 100644 --- a/package.json +++ b/package.json @@ -21,7 +21,6 @@ "applause-cli": "^1.7.0", "jpake": "^1.0.1", "nexline": "^1.2.2", - "p-queue": "^7.1.0", "p-reflect": "^3.0.0", "p-retry": "^5.0.0", "systeminformation": "^5.9.4", diff --git a/src/lib/SystemQuery.mjs b/src/lib/SystemQuery.mjs index 75b9e22..f58b77d 100644 --- a/src/lib/SystemQuery.mjs +++ b/src/lib/SystemQuery.mjs @@ -1,11 +1,14 @@ "use strict"; +import { once, EventEmitter } from 'events'; import Agent from './agent/Agent.mjs'; import InfoBroker from './core/InfoBroker.mjs'; -class SystemQuery { +class SystemQuery extends EventEmitter { constructor(config, mode = "agent") { + super(); + // The operating mode. Possible values: agent [default], query_client // TODO: Is this the best way of doing this? Maybe we should have a separate class for this? I'm not sure. this.mode = mode; @@ -43,13 +46,21 @@ class SystemQuery { } + async capture_query_response(event_name, ac) { + return await once(this.agent, `message-${event_name}`, { signal: ac }); + } - async fetch_table(name) { + *fetch_table(name) { // If it isn't valid for us, it ain't gonna be valid for anyone else.... if(!this.info.is_valid_table(name)) return null; - const responses = {}; // peer id → table - const handle_response = async (peer, msg) => { + const queue = new p_queue({ }) + const peers_seen = []; + while(peers_seen.length < this.agent.connected_peers.length) { + + } + + const handle_response = (peer, msg) => { // TODO: Validate response. Is it the right table? Is it even a table? Note that multiple fetch_table calls may be running in parallel, so we should not make too much of a fuss if we get the wrong table by accident. // TODO: It would be seriously cool to have fetch_table() be an async generator that yields pairs of peer ids and tables as they come in. if(no_peers_left_or_hit_timeout) { diff --git a/src/lib/agent/Agent.mjs b/src/lib/agent/Agent.mjs index 6d11589..210a548 100644 --- a/src/lib/agent/Agent.mjs +++ b/src/lib/agent/Agent.mjs @@ -14,6 +14,9 @@ import parse_peer_name from '../parse/peer_name.mjs'; import hostuuid from '../io/hostuuid.mjs'; class Agent extends EventEmitter { + get connected_peers() { return this.server.connected_peers; } + get connecting_peers() { return this.server.connecting_peers; } + constructor(config) { super(); diff --git a/src/lib/async/ItemQueue.mjs b/src/lib/async/ItemQueue.mjs new file mode 100644 index 0000000..913cb69 --- /dev/null +++ b/src/lib/async/ItemQueue.mjs @@ -0,0 +1,48 @@ +"use strict"; + +import { EventEmitter, once } from 'events'; + +class ItemQueue extends EventEmitter { + constructor() { + super(); + this.items = []; + } + + __push_single(item) { + this.items.push(item); + this.emit("push", item); + } + + push(...items) { + + } + + async pop(time_limit = 0) { + if(this.items.length === 0) { + } + + this.pop() + } + + wait_for_item(time_limit_ms = 0) { + const ac = new AbortController(); + let timeout = null; + if(time_limit_ms > 0) { + timeout = setTimeout(() => { + ac.abort(); + }, time_limit_ms); + } + let item = await once(this, "push", { signal: ac.signal }); + if(timeout !== null) + clearTimeout(timeout); + + this.items.splice(this.items.indexOf(item)); + return item; + } + + wait_empty() { + + } +} + +export default ItemQueue;