2022-01-23 19:51:23 +00:00
"use strict" ;
2022-02-24 01:27:41 +00:00
import fs from 'fs' ;
import path from 'path' ;
2022-02-01 03:05:27 +00:00
import { once , EventEmitter } from 'events' ;
2022-01-23 19:51:23 +00:00
2022-02-12 01:53:12 +00:00
import log from './io/NamespacedLog.mjs' ; const l = log ( "systemquery" ) ;
2022-02-24 01:27:41 +00:00
import ItemQueue from './async/ItemQueue.mjs' ;
import current _git _commit from './io/current_git_commit.mjs' ;
2022-02-10 21:42:53 +00:00
2022-01-23 19:51:23 +00:00
import Agent from './agent/Agent.mjs' ;
2022-01-30 00:51:28 +00:00
import InfoBroker from './core/InfoBroker.mjs' ;
2022-02-12 16:27:50 +00:00
import HttpSubsystem from './agent/subsystems/http/HttpSubsystem.mjs' ;
2022-01-23 19:51:23 +00:00
2022-02-24 01:27:41 +00:00
const _ _dirname = import . meta . url . slice ( 7 , import . meta . url . lastIndexOf ( "/" ) ) ;
2022-02-09 03:06:52 +00:00
2022-02-01 03:05:27 +00:00
class SystemQuery extends EventEmitter {
2022-02-21 21:55:20 +00:00
/ * *
* Returns an object representing our local information that looks and
* even quacks a bit like a real Peer object , but isn ' t .
* @ return { Object }
* /
get peer _local ( ) {
return {
id : this . agent . peer _id ,
id _short : ( this . agent . peer _id || "" ) . substring ( 0 , 7 ) ,
name : this . agent . peer _name ,
address : this . agent . local _bind _address ,
port : this . agent . local _port ,
self : true
}
}
2022-02-24 01:27:14 +00:00
get listening ( ) { return this . agent !== null ? this . agent . listening : false ; }
2022-02-21 21:55:20 +00:00
// TODO: Handle duplicate connections better by both skipping counting them here, and also implementing a more robust reaper for killing duplicate connections that always kills the newest/oldest connection to avoid issues.
2022-01-31 03:06:34 +00:00
constructor ( config , mode = "agent" ) {
2022-02-01 03:05:27 +00:00
super ( ) ;
2022-01-31 03:06:34 +00:00
// The operating mode. Possible values: agent [default], query_client
// TODO: Is this the best way of doing this? Maybe we should have a separate class for this? I'm not sure.
this . mode = mode ;
2022-01-23 19:51:23 +00:00
this . config = config ;
2022-01-30 00:51:28 +00:00
this . info = new InfoBroker ( ) ;
2022-02-12 16:27:50 +00:00
2022-02-24 01:27:41 +00:00
this . pkg = null ;
this . version = null ;
this . commit = null ;
this . agent = null ;
2022-02-12 16:27:50 +00:00
this . http = new HttpSubsystem ( this ) ;
2022-01-23 19:51:23 +00:00
}
2022-02-21 21:55:20 +00:00
/ * *
* Initialises the SystemQuery agent .
* @ return { Promise } A Promise that resolves when initialisation is complete .
* /
2022-01-23 19:51:23 +00:00
async init ( ) {
2022-02-24 01:27:41 +00:00
///
// 0: Preamble
///
this . pkg = JSON . parse ( await fs . promises . readFile ( path . join ( _ _dirname , "../../package.json" ) ) ) ;
this . version = this . pkg . version ;
this . commit = await current _git _commit ( path . join ( _ _dirname , "../../.git" ) ) ;
2022-01-23 19:51:23 +00:00
///
// 1: Create agent
///
this . agent = new Agent ( this . config ) ;
2022-02-12 16:27:50 +00:00
this . http . init (
this . config . net . http . port ,
this . config . net . http . bind _address
) ;
2022-01-23 19:51:23 +00:00
///
// 2: Attach listeners
///
this . agent . on ( "message-query" , this . handle _query . bind ( this ) ) ;
this . agent . on ( "message-query-response" , this . handle _query _response . bind ( this ) ) ;
2022-02-12 16:27:50 +00:00
2022-02-21 03:10:49 +00:00
///
// 3: Start agent
///
await this . agent . init ( ) ;
2022-01-23 19:51:23 +00:00
}
async handle _query ( peer , msg ) {
2022-01-30 00:51:28 +00:00
// 1: Validate input
2022-02-09 03:06:52 +00:00
if ( typeof msg . name !== "string"
|| ! this . info . is _valid _table ( msg . name ) ) return ;
2022-01-23 19:51:23 +00:00
2022-01-30 00:51:28 +00:00
// 2: Fetch system info
2022-02-09 03:06:52 +00:00
let table = await this . info . fetch _table ( msg . name ) ;
2022-01-30 00:51:28 +00:00
if ( table === null ) return ;
// 3: Return to requester
2022-02-09 03:06:52 +00:00
await peer . send ( "query-response" , { name : msg . name , table } ) ;
2022-01-23 19:51:23 +00:00
}
async handle _query _response ( peer , msg ) {
2022-02-10 21:42:53 +00:00
l . log ( ` query-response from ${ peer . id _short } ` , msg ) ;
2022-01-23 19:51:23 +00:00
}
2022-01-31 03:06:34 +00:00
2022-02-01 03:05:27 +00:00
async capture _query _response ( event _name , ac ) {
return await once ( this . agent , ` message- ${ event _name } ` , { signal : ac } ) ;
}
2022-01-31 03:06:34 +00:00
2022-02-21 21:55:20 +00:00
/ * *
* Fetches the table with the given name from all peers in the swarm
* ( including this local instance ) .
* While this function iteratively yields tables as they are received , it
* will wait until all peers respond before finally ending .
* If 1 of more peers do not respond , then if no answers are received
* within the time limit specified in the config object ( supplied at
* startup ) this function will return .
* @ param { string } name The name of the table to fetch .
* @ return { AsyncGenerator < Object > } An asynchronous generator that yields the tables as they are received .
* /
2022-02-12 16:27:50 +00:00
async * fetch _table ( name ) {
2022-01-31 03:06:34 +00:00
// If it isn't valid for us, it ain't gonna be valid for anyone else....
if ( ! this . info . is _valid _table ( name ) ) return null ;
2022-02-21 03:14:41 +00:00
2022-02-09 03:06:52 +00:00
const queue = new ItemQueue ( ) ;
2022-02-01 03:05:27 +00:00
const handle _response = ( peer , msg ) => {
2022-02-21 21:55:20 +00:00
// Note that multiple fetch_table calls may be running in parallel, so we should not make too much of a fuss if we get the wrong table by accident.
2022-02-09 03:06:52 +00:00
if ( typeof msg !== "object"
|| typeof msg . table !== "object"
|| typeof msg . name !== "string" ) {
l . debug ( ` Discarding invalid table from peer ${ peer . id _short } ` ) ;
return ;
}
// If it's not the right table, ignore it
if ( msg . name !== name ) return ;
2022-02-21 03:10:49 +00:00
queue . enqueue ( { peer , table : msg . table } ) ;
2022-01-31 03:06:34 +00:00
} ;
2022-02-21 03:10:49 +00:00
this . agent . on ( "message-query-response" , handle _response ) ;
2022-01-31 03:06:34 +00:00
2022-02-09 03:06:52 +00:00
// Only *after* we have our listeners in place do we then broadcast the
// query. Note that despite not having entered the below while loop yet,
// we do not drop any messages due to the use of the ItemQueue.
this . agent . broadcast ( ` query ` , { name } ) ;
2022-02-21 03:14:41 +00:00
2022-02-21 21:55:20 +00:00
// Yield our result first
let table _ours = await this . info . fetch _table ( name ) ;
yield {
peer : this . peer _local ,
table : table _ours
} ;
2022-02-21 03:14:41 +00:00
2022-02-12 16:27:50 +00:00
let peers _seen = [ ] ;
2022-02-09 03:06:52 +00:00
while ( peers _seen . length < this . agent . connected _peers . length ) {
2022-02-21 03:14:41 +00:00
l . info ( ` peers_seen: ` , peers _seen , ` connected peers: ` , this . agent . connected _peers . map ( peer => peer . id ) ) ;
2022-02-21 03:10:49 +00:00
let next = await queue . dequeue ( this . config . net . table _timeout * 1000 ) ;
2022-02-09 03:06:52 +00:00
if ( typeof next === "undefined" ) // We timed out
break ;
2022-02-21 03:10:49 +00:00
l . log ( ` fetch table DEBUG ` , next ) ;
2022-02-12 16:27:50 +00:00
if ( ! peers _seen . includes ( next . peer . id ) )
peers _seen . push ( next . peer . id ) ;
2022-02-09 03:06:52 +00:00
yield next ;
}
2022-02-21 03:10:49 +00:00
this . agent . off ( "message-query-response" , handle _response ) ;
2022-02-09 03:06:52 +00:00
2022-01-31 03:06:34 +00:00
// FUTURE: Add a cache here? Note that we also do not listen for query responses unless we've asked for a table.
}
2022-01-23 19:51:23 +00:00
}
SystemQuery . Create = async function ( config ) {
let result = new SystemQuery ( config ) ;
await result . init ( ) ;
return result ;
}
export default SystemQuery ;