181 lines
5.6 KiB
JavaScript
181 lines
5.6 KiB
JavaScript
"use strict";
|
|
|
|
import fs from 'fs';
|
|
import path from 'path';
|
|
import { once, EventEmitter } from 'events';
|
|
|
|
import log from './io/NamespacedLog.mjs'; const l = log("systemquery");
|
|
import ItemQueue from './async/ItemQueue.mjs';
|
|
import current_git_commit from './io/current_git_commit.mjs';
|
|
|
|
import Agent from './agent/Agent.mjs';
|
|
import InfoBroker from './core/InfoBroker.mjs';
|
|
import HttpSubsystem from './agent/subsystems/http/HttpSubsystem.mjs';
|
|
|
|
const __dirname = import.meta.url.slice(7, import.meta.url.lastIndexOf("/"));
|
|
|
|
class SystemQuery extends EventEmitter {
|
|
/**
|
|
* Returns an object representing our local information that looks and
|
|
* even quacks a bit like a real Peer object, but isn't.
|
|
* @return {Object}
|
|
*/
|
|
get peer_local() {
|
|
return {
|
|
id: this.agent.peer_id,
|
|
id_short: (this.agent.peer_id || "").substring(0, 7),
|
|
name: this.agent.peer_name,
|
|
address: this.agent.local_bind_address,
|
|
port: this.agent.local_port,
|
|
self: true
|
|
}
|
|
}
|
|
|
|
get listening() { return this.agent !== null ? this.agent.listening : false; }
|
|
|
|
|
|
// TODO: Handle duplicate connections better by both skipping counting them here, and also implementing a more robust reaper for killing duplicate connections that always kills the newest/oldest connection to avoid issues.
|
|
|
|
|
|
constructor(config, mode = "agent") {
|
|
super();
|
|
|
|
// The operating mode. Possible values: agent [default], query_client
|
|
// TODO: Is this the best way of doing this? Maybe we should have a separate class for this? I'm not sure.
|
|
this.mode = mode;
|
|
this.config = config;
|
|
this.info = new InfoBroker();
|
|
|
|
this.pkg = null;
|
|
this.version = null;
|
|
this.commit = null;
|
|
|
|
this.agent = null;
|
|
this.http = new HttpSubsystem(this);
|
|
}
|
|
|
|
/**
|
|
* Initialises the SystemQuery agent.
|
|
* @return {Promise} A Promise that resolves when initialisation is complete.
|
|
*/
|
|
async init() {
|
|
///
|
|
// 0: Preamble
|
|
///
|
|
this.pkg = JSON.parse(await fs.promises.readFile(path.join(__dirname, "../../package.json")));
|
|
this.version = this.pkg.version;
|
|
this.commit = await current_git_commit(path.join(__dirname, "../../.git"));
|
|
|
|
///
|
|
// 1: Create agent
|
|
///
|
|
this.agent = new Agent(this.config);
|
|
this.http.init(
|
|
this.config.net.http.port,
|
|
this.config.net.http.bind_address
|
|
);
|
|
|
|
///
|
|
// 2: Attach listeners
|
|
///
|
|
this.agent.on("message-query", this.handle_query.bind(this));
|
|
this.agent.on("message-query-response", this.handle_query_response.bind(this));
|
|
|
|
///
|
|
// 3: Start agent
|
|
///
|
|
await this.agent.init();
|
|
}
|
|
|
|
async handle_query(peer, msg) {
|
|
// 1: Validate input
|
|
if(typeof msg.name !== "string"
|
|
|| !this.info.is_valid_table(msg.name)) return;
|
|
|
|
// 2: Fetch system info
|
|
let table = await this.info.fetch_table(msg.name);
|
|
if(table === null) return;
|
|
|
|
// 3: Return to requester
|
|
await peer.send("query-response", { name: msg.name, table });
|
|
}
|
|
async handle_query_response(peer, msg) {
|
|
l.log(`query-response from ${peer.id_short}`, msg);
|
|
}
|
|
|
|
async capture_query_response(event_name, ac) {
|
|
return await once(this.agent, `message-${event_name}`, { signal: ac });
|
|
}
|
|
|
|
/**
|
|
* Fetches the table with the given name from all peers in the swarm
|
|
* (including this local instance).
|
|
* While this function iteratively yields tables as they are received, it
|
|
* will wait until all peers respond before finally ending.
|
|
* If 1 of more peers do not respond, then if no answers are received
|
|
* within the time limit specified in the config object (supplied at
|
|
* startup) this function will return.
|
|
* @param {string} name The name of the table to fetch.
|
|
* @return {AsyncGenerator<Object>} An asynchronous generator that yields the tables as they are received.
|
|
*/
|
|
async *fetch_table(name) {
|
|
// If it isn't valid for us, it ain't gonna be valid for anyone else....
|
|
if(!this.info.is_valid_table(name)) return null;
|
|
|
|
const queue = new ItemQueue();
|
|
|
|
const handle_response = (peer, msg) => {
|
|
// Note that multiple fetch_table calls may be running in parallel, so we should not make too much of a fuss if we get the wrong table by accident.
|
|
if(typeof msg !== "object"
|
|
|| typeof msg.table !== "object"
|
|
|| typeof msg.name !== "string") {
|
|
l.debug(`Discarding invalid table from peer ${peer.id_short}`);
|
|
return;
|
|
}
|
|
|
|
// If it's not the right table, ignore it
|
|
if(msg.name !== name) return;
|
|
|
|
queue.enqueue({ peer, table: msg.table });
|
|
};
|
|
|
|
this.agent.on("message-query-response", handle_response);
|
|
|
|
// Only *after* we have our listeners in place do we then broadcast the
|
|
// query. Note that despite not having entered the below while loop yet,
|
|
// we do not drop any messages due to the use of the ItemQueue.
|
|
this.agent.broadcast(`query`, { name });
|
|
|
|
|
|
// Yield our result first
|
|
let table_ours = await this.info.fetch_table(name);
|
|
yield {
|
|
peer: this.peer_local,
|
|
table: table_ours
|
|
};
|
|
|
|
let peers_seen = [];
|
|
while(peers_seen.length < this.agent.connected_peers.length) {
|
|
l.info(`peers_seen:`, peers_seen, `connected peers:`, this.agent.connected_peers.map(peer => peer.id));
|
|
let next = await queue.dequeue(this.config.net.table_timeout * 1000);
|
|
if(typeof next === "undefined") // We timed out
|
|
break;
|
|
l.log(`fetch table DEBUG`, next);
|
|
if(!peers_seen.includes(next.peer.id))
|
|
peers_seen.push(next.peer.id);
|
|
yield next;
|
|
}
|
|
|
|
this.agent.off("message-query-response", handle_response);
|
|
|
|
// FUTURE: Add a cache here? Note that we also do not listen for query responses unless we've asked for a table.
|
|
}
|
|
}
|
|
|
|
SystemQuery.Create = async function(config) {
|
|
let result = new SystemQuery(config);
|
|
await result.init();
|
|
return result;
|
|
}
|
|
|
|
export default SystemQuery;
|