Start unwinding the incoming tables from other peers, but it's gonna be a nasty and complicated business

This commit is contained in:
Starbeamrainbowlabs 2022-02-01 03:05:27 +00:00
parent 15bdcaf047
commit 44d64c09d0
Signed by: sbrl
GPG key ID: 1BE5172E637709C2
5 changed files with 66 additions and 56 deletions

51
package-lock.json generated
View file

@ -13,7 +13,6 @@
"applause-cli": "^1.7.0",
"jpake": "^1.0.1",
"nexline": "^1.2.2",
"p-queue": "^7.1.0",
"p-reflect": "^3.0.0",
"p-retry": "^5.0.0",
"systeminformation": "^5.9.4",
@ -45,11 +44,6 @@
"resolved": "https://registry.npmjs.org/crypto-js/-/crypto-js-4.1.1.tgz",
"integrity": "sha512-o2JlM7ydqd3Qk9CA0L4NL6mTzU2sdx96a+oOfPu8Mkl/PK51vSyoi8/rQ8NknZtk44vq15lmhAj9CIAGwgeWKw=="
},
"node_modules/eventemitter3": {
"version": "4.0.7",
"resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-4.0.7.tgz",
"integrity": "sha512-8guHBZCwKnFhYdHr2ysuRWErTwhoN2X8XELRlrRwpmfeY2jjuUN4taQMsULKUVo1K4DvZl+0pgfyoysHxvmvEw=="
},
"node_modules/fs-extra": {
"version": "8.1.0",
"resolved": "https://registry.npmjs.org/fs-extra/-/fs-extra-8.1.0.tgz",
@ -114,21 +108,6 @@
"resolved": "https://registry.npmjs.org/noble-secp256k1/-/noble-secp256k1-1.2.10.tgz",
"integrity": "sha512-PXlDRYoWD5JHm+fKVx8PsiLVsuYLIp+5ZhO76L//H/q2/YcQZAS59z9aXO7lcq4IMOq8a1U18KXgpijnkz+C5A=="
},
"node_modules/p-queue": {
"version": "7.1.0",
"resolved": "https://registry.npmjs.org/p-queue/-/p-queue-7.1.0.tgz",
"integrity": "sha512-V+0vPJbhYkBqknPp0qnaz+dWcj8cNepfXZcsVIVEHPbFQXMPwrzCNIiM4FoxGtwHXtPzVCPHDvqCr1YrOJX2Gw==",
"dependencies": {
"eventemitter3": "^4.0.7",
"p-timeout": "^5.0.0"
},
"engines": {
"node": ">=12"
},
"funding": {
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/p-reflect": {
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/p-reflect/-/p-reflect-3.0.0.tgz",
@ -155,17 +134,6 @@
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/p-timeout": {
"version": "5.0.0",
"resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-5.0.0.tgz",
"integrity": "sha512-z+bU/N7L1SABsqKnQzvAnINgPX7NHdzwUV+gHyJE7VGNDZSr03rhcPODCZSWiiT9k+gf74QPmzcZzqJRvxYZow==",
"engines": {
"node": ">=12"
},
"funding": {
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/retry": {
"version": "0.13.1",
"resolved": "https://registry.npmjs.org/retry/-/retry-0.13.1.tgz",
@ -243,11 +211,6 @@
"resolved": "https://registry.npmjs.org/crypto-js/-/crypto-js-4.1.1.tgz",
"integrity": "sha512-o2JlM7ydqd3Qk9CA0L4NL6mTzU2sdx96a+oOfPu8Mkl/PK51vSyoi8/rQ8NknZtk44vq15lmhAj9CIAGwgeWKw=="
},
"eventemitter3": {
"version": "4.0.7",
"resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-4.0.7.tgz",
"integrity": "sha512-8guHBZCwKnFhYdHr2ysuRWErTwhoN2X8XELRlrRwpmfeY2jjuUN4taQMsULKUVo1K4DvZl+0pgfyoysHxvmvEw=="
},
"fs-extra": {
"version": "8.1.0",
"resolved": "https://registry.npmjs.org/fs-extra/-/fs-extra-8.1.0.tgz",
@ -303,15 +266,6 @@
"resolved": "https://registry.npmjs.org/noble-secp256k1/-/noble-secp256k1-1.2.10.tgz",
"integrity": "sha512-PXlDRYoWD5JHm+fKVx8PsiLVsuYLIp+5ZhO76L//H/q2/YcQZAS59z9aXO7lcq4IMOq8a1U18KXgpijnkz+C5A=="
},
"p-queue": {
"version": "7.1.0",
"resolved": "https://registry.npmjs.org/p-queue/-/p-queue-7.1.0.tgz",
"integrity": "sha512-V+0vPJbhYkBqknPp0qnaz+dWcj8cNepfXZcsVIVEHPbFQXMPwrzCNIiM4FoxGtwHXtPzVCPHDvqCr1YrOJX2Gw==",
"requires": {
"eventemitter3": "^4.0.7",
"p-timeout": "^5.0.0"
}
},
"p-reflect": {
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/p-reflect/-/p-reflect-3.0.0.tgz",
@ -326,11 +280,6 @@
"retry": "^0.13.1"
}
},
"p-timeout": {
"version": "5.0.0",
"resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-5.0.0.tgz",
"integrity": "sha512-z+bU/N7L1SABsqKnQzvAnINgPX7NHdzwUV+gHyJE7VGNDZSr03rhcPODCZSWiiT9k+gf74QPmzcZzqJRvxYZow=="
},
"retry": {
"version": "0.13.1",
"resolved": "https://registry.npmjs.org/retry/-/retry-0.13.1.tgz",

View file

@ -21,7 +21,6 @@
"applause-cli": "^1.7.0",
"jpake": "^1.0.1",
"nexline": "^1.2.2",
"p-queue": "^7.1.0",
"p-reflect": "^3.0.0",
"p-retry": "^5.0.0",
"systeminformation": "^5.9.4",

View file

@ -1,11 +1,14 @@
"use strict";
import { once, EventEmitter } from 'events';
import Agent from './agent/Agent.mjs';
import InfoBroker from './core/InfoBroker.mjs';
class SystemQuery {
class SystemQuery extends EventEmitter {
constructor(config, mode = "agent") {
super();
// The operating mode. Possible values: agent [default], query_client
// TODO: Is this the best way of doing this? Maybe we should have a separate class for this? I'm not sure.
this.mode = mode;
@ -43,13 +46,21 @@ class SystemQuery {
}
async capture_query_response(event_name, ac) {
return await once(this.agent, `message-${event_name}`, { signal: ac });
}
async fetch_table(name) {
*fetch_table(name) {
// If it isn't valid for us, it ain't gonna be valid for anyone else....
if(!this.info.is_valid_table(name)) return null;
const responses = {}; // peer id → table
const handle_response = async (peer, msg) => {
const queue = new p_queue({ })
const peers_seen = [];
while(peers_seen.length < this.agent.connected_peers.length) {
}
const handle_response = (peer, msg) => {
// TODO: Validate response. Is it the right table? Is it even a table? Note that multiple fetch_table calls may be running in parallel, so we should not make too much of a fuss if we get the wrong table by accident.
// TODO: It would be seriously cool to have fetch_table() be an async generator that yields pairs of peer ids and tables as they come in.
if(no_peers_left_or_hit_timeout) {

View file

@ -14,6 +14,9 @@ import parse_peer_name from '../parse/peer_name.mjs';
import hostuuid from '../io/hostuuid.mjs';
class Agent extends EventEmitter {
get connected_peers() { return this.server.connected_peers; }
get connecting_peers() { return this.server.connecting_peers; }
constructor(config) {
super();

View file

@ -0,0 +1,48 @@
"use strict";
import { EventEmitter, once } from 'events';
class ItemQueue extends EventEmitter {
constructor() {
super();
this.items = [];
}
__push_single(item) {
this.items.push(item);
this.emit("push", item);
}
push(...items) {
}
async pop(time_limit = 0) {
if(this.items.length === 0) {
}
this.pop()
}
wait_for_item(time_limit_ms = 0) {
const ac = new AbortController();
let timeout = null;
if(time_limit_ms > 0) {
timeout = setTimeout(() => {
ac.abort();
}, time_limit_ms);
}
let item = await once(this, "push", { signal: ac.signal });
if(timeout !== null)
clearTimeout(timeout);
this.items.splice(this.items.indexOf(item));
return item;
}
wait_empty() {
}
}
export default ItemQueue;