From a736acc1b76bff8849a524b0a349cc4e1d07ea59 Mon Sep 17 00:00:00 2001 From: Starbeamrainbowlabs Date: Wed, 9 Feb 2022 03:06:52 +0000 Subject: [PATCH] SystemQuery: start filling out fetch_table properly, but it's still unfinished ...hey, at least we know what we're doing nwo and have a plan! --- src/lib/SystemQuery.mjs | 48 +++++++++++++++++------ src/lib/async/ItemQueue.mjs | 1 + src/subcommands/agent/config.default.toml | 4 ++ 3 files changed, 41 insertions(+), 12 deletions(-) diff --git a/src/lib/SystemQuery.mjs b/src/lib/SystemQuery.mjs index f58b77d..a81bde8 100644 --- a/src/lib/SystemQuery.mjs +++ b/src/lib/SystemQuery.mjs @@ -5,6 +5,9 @@ import { once, EventEmitter } from 'events'; import Agent from './agent/Agent.mjs'; import InfoBroker from './core/InfoBroker.mjs'; +import ItemQueue from './async/ItemQueue.mjs'; + + class SystemQuery extends EventEmitter { constructor(config, mode = "agent") { super(); @@ -32,15 +35,15 @@ class SystemQuery extends EventEmitter { async handle_query(peer, msg) { // 1: Validate input - if(typeof msg.table !== "string" - || !this.info.is_valid_table(msg.table)) return; + if(typeof msg.name !== "string" + || !this.info.is_valid_table(msg.name)) return; // 2: Fetch system info - let table = await this.info.fetch_table(msg.table); + let table = await this.info.fetch_table(msg.name); if(table === null) return; // 3: Return to requester - await peer.send("query-response", { result: table }); + await peer.send("query-response", { name: msg.name, table }); } async handle_query_response(peer, msg) { @@ -54,22 +57,43 @@ class SystemQuery extends EventEmitter { // If it isn't valid for us, it ain't gonna be valid for anyone else.... if(!this.info.is_valid_table(name)) return null; - const queue = new p_queue({ }) - const peers_seen = []; - while(peers_seen.length < this.agent.connected_peers.length) { - - } + const queue = new ItemQueue(); const handle_response = (peer, msg) => { // TODO: Validate response. Is it the right table? Is it even a table? Note that multiple fetch_table calls may be running in parallel, so we should not make too much of a fuss if we get the wrong table by accident. // TODO: It would be seriously cool to have fetch_table() be an async generator that yields pairs of peer ids and tables as they come in. - if(no_peers_left_or_hit_timeout) { - this.off("message-query-response", handle_response); - } + if(typeof msg !== "object" + || typeof msg.table !== "object" + || typeof msg.name !== "string") { + l.debug(`Discarding invalid table from peer ${peer.id_short}`); + return; + } + + // If it's not the right table, ignore it + if(msg.name !== name) return; + + + + queue.push({ peer, table: msg.table }); }; this.on("message-query-response", handle_response); + // Only *after* we have our listeners in place do we then broadcast the + // query. Note that despite not having entered the below while loop yet, + // we do not drop any messages due to the use of the ItemQueue. + this.agent.broadcast(`query`, { name }); + + while(peers_seen.length < this.agent.connected_peers.length) { + let next = queue.pop(this.config.net.table_config); + if(typeof next === "undefined") // We timed out + break; + + yield next; + } + + this.off("message-query-response", handle_response); + // FUTURE: Add a cache here? Note that we also do not listen for query responses unless we've asked for a table. } } diff --git a/src/lib/async/ItemQueue.mjs b/src/lib/async/ItemQueue.mjs index 6f6ff54..77ef600 100644 --- a/src/lib/async/ItemQueue.mjs +++ b/src/lib/async/ItemQueue.mjs @@ -13,6 +13,7 @@ class ItemQueue extends EventEmitter { constructor() { super(); + this.items = []; } diff --git a/src/subcommands/agent/config.default.toml b/src/subcommands/agent/config.default.toml index 8cb8384..46ba867 100644 --- a/src/subcommands/agent/config.default.toml +++ b/src/subcommands/agent/config.default.toml @@ -20,3 +20,7 @@ port = 5252 # The number of retries when connecting to peers. # Exponential backoff on retries is enforced peer_retries = 5 + +# Wait this number of seconds before giving up on receiving a table from peers. +# Decimal values are allowed, and will be rounded to the nearest millisecond. +table_timeout = 5