SystemQuery: start filling out fetch_table properly, but it's still unfinished
...hey, at least we know what we're doing nwo and have a plan!
This commit is contained in:
parent
b1485d5410
commit
a736acc1b7
3 changed files with 41 additions and 12 deletions
|
@ -5,6 +5,9 @@ import { once, EventEmitter } from 'events';
|
||||||
import Agent from './agent/Agent.mjs';
|
import Agent from './agent/Agent.mjs';
|
||||||
import InfoBroker from './core/InfoBroker.mjs';
|
import InfoBroker from './core/InfoBroker.mjs';
|
||||||
|
|
||||||
|
import ItemQueue from './async/ItemQueue.mjs';
|
||||||
|
|
||||||
|
|
||||||
class SystemQuery extends EventEmitter {
|
class SystemQuery extends EventEmitter {
|
||||||
constructor(config, mode = "agent") {
|
constructor(config, mode = "agent") {
|
||||||
super();
|
super();
|
||||||
|
@ -32,15 +35,15 @@ class SystemQuery extends EventEmitter {
|
||||||
|
|
||||||
async handle_query(peer, msg) {
|
async handle_query(peer, msg) {
|
||||||
// 1: Validate input
|
// 1: Validate input
|
||||||
if(typeof msg.table !== "string"
|
if(typeof msg.name !== "string"
|
||||||
|| !this.info.is_valid_table(msg.table)) return;
|
|| !this.info.is_valid_table(msg.name)) return;
|
||||||
|
|
||||||
// 2: Fetch system info
|
// 2: Fetch system info
|
||||||
let table = await this.info.fetch_table(msg.table);
|
let table = await this.info.fetch_table(msg.name);
|
||||||
if(table === null) return;
|
if(table === null) return;
|
||||||
|
|
||||||
// 3: Return to requester
|
// 3: Return to requester
|
||||||
await peer.send("query-response", { result: table });
|
await peer.send("query-response", { name: msg.name, table });
|
||||||
}
|
}
|
||||||
async handle_query_response(peer, msg) {
|
async handle_query_response(peer, msg) {
|
||||||
|
|
||||||
|
@ -54,22 +57,43 @@ class SystemQuery extends EventEmitter {
|
||||||
// If it isn't valid for us, it ain't gonna be valid for anyone else....
|
// If it isn't valid for us, it ain't gonna be valid for anyone else....
|
||||||
if(!this.info.is_valid_table(name)) return null;
|
if(!this.info.is_valid_table(name)) return null;
|
||||||
|
|
||||||
const queue = new p_queue({ })
|
const queue = new ItemQueue();
|
||||||
const peers_seen = [];
|
|
||||||
while(peers_seen.length < this.agent.connected_peers.length) {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
const handle_response = (peer, msg) => {
|
const handle_response = (peer, msg) => {
|
||||||
// TODO: Validate response. Is it the right table? Is it even a table? Note that multiple fetch_table calls may be running in parallel, so we should not make too much of a fuss if we get the wrong table by accident.
|
// TODO: Validate response. Is it the right table? Is it even a table? Note that multiple fetch_table calls may be running in parallel, so we should not make too much of a fuss if we get the wrong table by accident.
|
||||||
// TODO: It would be seriously cool to have fetch_table() be an async generator that yields pairs of peer ids and tables as they come in.
|
// TODO: It would be seriously cool to have fetch_table() be an async generator that yields pairs of peer ids and tables as they come in.
|
||||||
if(no_peers_left_or_hit_timeout) {
|
if(typeof msg !== "object"
|
||||||
this.off("message-query-response", handle_response);
|
|| typeof msg.table !== "object"
|
||||||
}
|
|| typeof msg.name !== "string") {
|
||||||
|
l.debug(`Discarding invalid table from peer ${peer.id_short}`);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If it's not the right table, ignore it
|
||||||
|
if(msg.name !== name) return;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
queue.push({ peer, table: msg.table });
|
||||||
};
|
};
|
||||||
|
|
||||||
this.on("message-query-response", handle_response);
|
this.on("message-query-response", handle_response);
|
||||||
|
|
||||||
|
// Only *after* we have our listeners in place do we then broadcast the
|
||||||
|
// query. Note that despite not having entered the below while loop yet,
|
||||||
|
// we do not drop any messages due to the use of the ItemQueue.
|
||||||
|
this.agent.broadcast(`query`, { name });
|
||||||
|
|
||||||
|
while(peers_seen.length < this.agent.connected_peers.length) {
|
||||||
|
let next = queue.pop(this.config.net.table_config);
|
||||||
|
if(typeof next === "undefined") // We timed out
|
||||||
|
break;
|
||||||
|
|
||||||
|
yield next;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.off("message-query-response", handle_response);
|
||||||
|
|
||||||
// FUTURE: Add a cache here? Note that we also do not listen for query responses unless we've asked for a table.
|
// FUTURE: Add a cache here? Note that we also do not listen for query responses unless we've asked for a table.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,6 +13,7 @@ class ItemQueue extends EventEmitter {
|
||||||
|
|
||||||
constructor() {
|
constructor() {
|
||||||
super();
|
super();
|
||||||
|
|
||||||
this.items = [];
|
this.items = [];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,3 +20,7 @@ port = 5252
|
||||||
# The number of retries when connecting to peers.
|
# The number of retries when connecting to peers.
|
||||||
# Exponential backoff on retries is enforced
|
# Exponential backoff on retries is enforced
|
||||||
peer_retries = 5
|
peer_retries = 5
|
||||||
|
|
||||||
|
# Wait this number of seconds before giving up on receiving a table from peers.
|
||||||
|
# Decimal values are allowed, and will be rounded to the nearest millisecond.
|
||||||
|
table_timeout = 5
|
||||||
|
|
Loading…
Reference in a new issue