From 135b2e8d1b99bbf98cd828538abb918d8bfadc1c Mon Sep 17 00:00:00 2001 From: Starbeamrainbowlabs Date: Sat, 8 Jan 2022 16:59:08 +0000 Subject: [PATCH] Write a bunch more glue code ....but it's all still untested. I'm getting kinda nervous here --- package-lock.json | 48 +++++++++++++++++ package.json | 1 + src/cli.mjs | 5 +- src/lib/agent/Agent.mjs | 41 ++++++++++++--- src/lib/agent/Peer.mjs | 14 ++++- src/lib/agent/PeerServer.mjs | 63 ++++++++++++++++++++--- src/lib/crypto/hash.mjs | 9 +++- src/lib/io/TomlSettings.mjs | 55 ++++++++++++++++++++ src/lib/parse/peer_name.mjs | 18 +++++++ src/subcommands/agent/agent.mjs | 17 +++++- src/subcommands/agent/config.default.toml | 22 ++++++++ 11 files changed, 274 insertions(+), 19 deletions(-) create mode 100644 src/lib/io/TomlSettings.mjs create mode 100644 src/lib/parse/peer_name.mjs create mode 100644 src/subcommands/agent/config.default.toml diff --git a/package-lock.json b/package-lock.json index d58c8cb..1432e30 100644 --- a/package-lock.json +++ b/package-lock.json @@ -15,6 +15,7 @@ "log-node": "^8.0.1", "nexline": "^1.2.2", "p-queue": "^7.1.0", + "p-retry": "^5.0.0", "systeminformation": "^5.9.4", "tweetnacl": "^1.0.3" } @@ -24,6 +25,11 @@ "resolved": "https://registry.npmjs.org/@types/crypto-js/-/crypto-js-4.0.2.tgz", "integrity": "sha512-sCVniU+h3GcGqxOmng11BRvf9TfN9yIs8KKjB8C8d75W69cpTfZG80gau9yTx5SxF3gvHGbJhdESzzvnjtf3Og==" }, + "node_modules/@types/retry": { + "version": "0.12.1", + "resolved": "https://registry.npmjs.org/@types/retry/-/retry-0.12.1.tgz", + "integrity": "sha512-xoDlM2S4ortawSWORYqsdU+2rxdh4LRW9ytc3zmT37RIKQh6IHyKwwtKhKis9ah8ol07DCkZxPt8BBvPjC6v4g==" + }, "node_modules/ansi-regex": { "version": "2.1.1", "resolved": "https://registry.npmjs.org/ansi-regex/-/ansi-regex-2.1.1.tgz", @@ -361,6 +367,21 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/p-retry": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/p-retry/-/p-retry-5.0.0.tgz", + "integrity": "sha512-swGFiU6Y1Q3rBikAGHpaT0FHSbiO9H04fSsJRKVtWyEQMAe2Sb1uXeBcqE/RlZqt2prlq4W2HA/+MZAt3V2NkQ==", + "dependencies": { + "@types/retry": "^0.12.1", + "retry": "^0.13.1" + }, + "engines": { + "node": "^12.20.0 || ^14.13.1 || >=16.0.0" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/p-timeout": { "version": "5.0.0", "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-5.0.0.tgz", @@ -372,6 +393,14 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/retry": { + "version": "0.13.1", + "resolved": "https://registry.npmjs.org/retry/-/retry-0.13.1.tgz", + "integrity": "sha512-XQBQ3I8W1Cge0Seh+6gjj03LbmRFWuoszgK9ooCpwYIrhhoO80pfq4cUkU5DkknwfOfFteRwlZ56PYOGYyFWdg==", + "engines": { + "node": ">= 4" + } + }, "node_modules/safer-buffer": { "version": "2.1.2", "resolved": "https://registry.npmjs.org/safer-buffer/-/safer-buffer-2.1.2.tgz", @@ -457,6 +486,11 @@ "resolved": "https://registry.npmjs.org/@types/crypto-js/-/crypto-js-4.0.2.tgz", "integrity": "sha512-sCVniU+h3GcGqxOmng11BRvf9TfN9yIs8KKjB8C8d75W69cpTfZG80gau9yTx5SxF3gvHGbJhdESzzvnjtf3Og==" }, + "@types/retry": { + "version": "0.12.1", + "resolved": "https://registry.npmjs.org/@types/retry/-/retry-0.12.1.tgz", + "integrity": "sha512-xoDlM2S4ortawSWORYqsdU+2rxdh4LRW9ytc3zmT37RIKQh6IHyKwwtKhKis9ah8ol07DCkZxPt8BBvPjC6v4g==" + }, "ansi-regex": { "version": "2.1.1", "resolved": "https://registry.npmjs.org/ansi-regex/-/ansi-regex-2.1.1.tgz", @@ -763,11 +797,25 @@ "p-timeout": "^5.0.0" } }, + "p-retry": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/p-retry/-/p-retry-5.0.0.tgz", + "integrity": "sha512-swGFiU6Y1Q3rBikAGHpaT0FHSbiO9H04fSsJRKVtWyEQMAe2Sb1uXeBcqE/RlZqt2prlq4W2HA/+MZAt3V2NkQ==", + "requires": { + "@types/retry": "^0.12.1", + "retry": "^0.13.1" + } + }, "p-timeout": { "version": "5.0.0", "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-5.0.0.tgz", "integrity": "sha512-z+bU/N7L1SABsqKnQzvAnINgPX7NHdzwUV+gHyJE7VGNDZSr03rhcPODCZSWiiT9k+gf74QPmzcZzqJRvxYZow==" }, + "retry": { + "version": "0.13.1", + "resolved": "https://registry.npmjs.org/retry/-/retry-0.13.1.tgz", + "integrity": "sha512-XQBQ3I8W1Cge0Seh+6gjj03LbmRFWuoszgK9ooCpwYIrhhoO80pfq4cUkU5DkknwfOfFteRwlZ56PYOGYyFWdg==" + }, "safer-buffer": { "version": "2.1.2", "resolved": "https://registry.npmjs.org/safer-buffer/-/safer-buffer-2.1.2.tgz", diff --git a/package.json b/package.json index 11a90d4..0ca6492 100644 --- a/package.json +++ b/package.json @@ -23,6 +23,7 @@ "log-node": "^8.0.1", "nexline": "^1.2.2", "p-queue": "^7.1.0", + "p-retry": "^5.0.0", "systeminformation": "^5.9.4", "tweetnacl": "^1.0.3" } diff --git a/src/cli.mjs b/src/cli.mjs index 0b0c947..bab7747 100644 --- a/src/cli.mjs +++ b/src/cli.mjs @@ -3,10 +3,11 @@ import path from 'path'; import fs from 'fs'; -import CliParser from 'applause-cli'; - import l from 'log'; import ln from 'log-node'; ln(); + +import CliParser from 'applause-cli'; + import a from './lib/io/Ansi.mjs'; const __dirname = import.meta.url.slice(7, import.meta.url.lastIndexOf("/")); diff --git a/src/lib/agent/Agent.mjs b/src/lib/agent/Agent.mjs index 8e67a41..26da19c 100644 --- a/src/lib/agent/Agent.mjs +++ b/src/lib/agent/Agent.mjs @@ -1,29 +1,56 @@ "use strict"; +import fs from 'fs'; import os from 'os'; -import make_cert from 'make-cert'; +import log from 'log'; const l = log.get("Agent"); import systeminfo from 'systeminformation'; +import PeerServer from './PeerServer.mjs'; import hash from '../crypto/hash.mjs'; +import parse_peer_name from '../parse/peer_name.mjs'; class Agent { - constructor() { - + constructor(config) { + this.config = config; } - async init(secret_join) { - this.secret_join = secret_join; + async find_secret() { + if(this.config.secret_join_filepath !== "CHANGE_ME" + && fs.existsSync(this.config.secret_join_filepath)) + return await fs.promises.readFile(this.config.secret_join_filepath, "utf-8"); + if(this.config.secret_join !== "CHANGE_ME") + return this.config.secret_join; + + return null; + } + + async init() { /** Our peer id - calculated automatically from the system's uuid */ this.peer_id = hash("sha256", "base64", await systeminfo.system().serial) .replace(/[+/=]/g, ""); this.peer_name = os.hostname(); + this.server = new PeerServer( + this.peer_id, + await this.find_secret() + ); + this.server.retries = this.config.net.peer_retries; + l.log(`Starting peer listener....`); + await this.server.listen( + this.config.net.port, + this.config.net.bind_address + ); + l.log(`Listening on ${this.config.net.bind_address}:${this.config.net.port}`); - // Properties: key, cert - this.cert = make_cert(`${our_id}.systemquery-peer.localhost`); + await this.server.add_peers(...this.config.peers.map( + peer => parse_peer_name(peer) + )); + l.log(`Added ${this.config.peers.length} initial peers`); + if(this.config.peers.length < 1) + l.warn(`No initial peers were specified! It's recommended that you specify at least 1 on every host.`); } } diff --git a/src/lib/agent/Peer.mjs b/src/lib/agent/Peer.mjs index 211860e..96c099c 100644 --- a/src/lib/agent/Peer.mjs +++ b/src/lib/agent/Peer.mjs @@ -1,6 +1,9 @@ "use strict"; import { EventEmitter, once } from 'events'; + +import log from 'log'; const l = log.get("Peer"); + import Connection from '../transport/Connection.mjs'; class Peer extends EventEmitter { @@ -32,6 +35,11 @@ class Peer extends EventEmitter { * @type {{address:string,port:number}[]} */ this.known_peers = []; + + // TODO: Log when disconnected too + this.once("connect", () => { + l.log(`${this.connection.address}:${this.connection.port} connected`); + }); } /////////////////////////////////////////////////////////////////////////// @@ -66,12 +74,16 @@ class Peer extends EventEmitter { } async __send_hello() { - await this.connection.send("hello", { + await this.send("hello", { id: this.server.our_id, peers: this.server.peers() }); } + async send(event_name, msg) { + await this.connection.send(event_name, msg); + } + /////////////////////////////////////////////////////////////////////////// async destroy() { diff --git a/src/lib/agent/PeerServer.mjs b/src/lib/agent/PeerServer.mjs index 85f670f..de6247b 100644 --- a/src/lib/agent/PeerServer.mjs +++ b/src/lib/agent/PeerServer.mjs @@ -3,6 +3,9 @@ import { EventEmitter, once } from 'events'; import net from 'net'; +import p_retry from 'p-retry'; +import log from 'log'; const l = l.get("PeerServer"); + import Connection from '../transport/Connection.mjs'; import ErrorWrapper from '../core/ErrorWrapper.mjs'; @@ -21,6 +24,9 @@ class PeerServer extends EventEmitter { this.our_id = our_id; this.secret_join = secret_join; + // The number of retries when attempting to connect to a peer + this.retries = 5; + this.connected_peers = []; this.connecting_peers = []; } @@ -56,10 +62,20 @@ class PeerServer extends EventEmitter { async handle_client(client) { const peer = await Peer.Accept(this, await Connection.Wrap(this.secret_join, client)); + this.peer_initialise(peer); + await once(peer, "connect"); + } + + /** + * Initialises a CONNECTED peer and registers it as a valid peer. + * This also includesd attaching the necessary event handlers. + * @param {Peer} peer The peer in question. + * @return {void} + */ + peer_initialise(peer) { this.connected_peers.push(peer); peer.on("message", this.handle_message.bind(this, peer)); peer.on("destroy", this.handle_destroy.bind(this, peer)); - await once(peer, "connect"); } async handle_message(peer, message) { @@ -72,6 +88,7 @@ class PeerServer extends EventEmitter { if(index > -1) this.connected_peers.splice(index, 1); + l.log(`Peer ${peer.address}:${peer.port} disconnected`); this.emit("disconnect", peer.remote_endpoint); } @@ -84,6 +101,29 @@ class PeerServer extends EventEmitter { .filter(el => typeof el.addr === "string" && typeof el.port === "number"); } + /** + * Resolves a Peer id to the respective peer instance. + * @param {string|Peer} peer_id The peer ID to resolve as a string. If a Peer instance is passed instead, this is simply returned unchanged. + * @return {Peer} The Peer instance associated with the given peer id. + */ + peer_resolve(peer_id) { + if(peer_id instanceof Peer) return peer_id; + for (let peer of this.connected_peers) { + if(peer.id === peer_id) return peer; + } + } + + /** + * Resolves a list of peer ids (and potentially Peer instances) to a list + * of Peer instances. + * Any Peer instances passed are returned unchanged. + * @param {...string|Peer} peers The peer(s) to resolve. + * @return {Peer[]} A list of Peer instances. + */ + peers_resolve(...peers) { + return peers.map(this.peer_resolve); + } + /** * Processes a list of peers. * New connections are established to any peers in the list to which @@ -94,10 +134,14 @@ class PeerServer extends EventEmitter { * @returns {Promise} A list of new peers to which we have successfully established a connection. */ async add_peers(...new_peers) { - return (await Promise.all(new_peers.map(async new_peer => this.add_peer( - new_peer.address, - new_peer.port - )))).filter(peer => peer instanceof Peer); + // The arrow function here is NOT async because the promise from + // this.add_peer is returned directly to await Promise.all(). + return (await Promise.all(new_peers.map( + new_peer => p_retry(async () => await this.add_peer( + new_peer.address, + new_peer.port + ), { retries: this.retries }) + ))).filter(peer => peer instanceof Peer); } /** @@ -111,6 +155,7 @@ class PeerServer extends EventEmitter { * @return {Promise} A Promise that resolves to the resulting Peer connection, or null if the connection wasn't attemped. */ async add_peer(address, port) { + // If we're already connected, don't bother reconnecting again if(this.peers().some(el => el.address === address && el.port === port)) return; @@ -119,7 +164,7 @@ class PeerServer extends EventEmitter { let conn; try { conn = await Peer.Initiate(this, address, port); - this.connected_peers.push(conn); + peer_initialise(conn); } catch(error) { throw new ErrorWrapper(`Error: Failed to connect to peer.`, error); @@ -140,7 +185,11 @@ class PeerServer extends EventEmitter { * @return {Promise} A Promise that resolves (or potentially rejects) when the message has been sent. */ async send(peer_id, event_name, msg) { - throw new Error(`Not implemented yet`); + if(!(peer_id instanceof Array)) peer_id = [ peer_id ]; + + await Promise.all(this.peers_resolve(...peer_id).map( + peer => peer.send(event_name, msg) + )); } /** diff --git a/src/lib/crypto/hash.mjs b/src/lib/crypto/hash.mjs index b36b998..3fc8def 100644 --- a/src/lib/crypto/hash.mjs +++ b/src/lib/crypto/hash.mjs @@ -2,8 +2,15 @@ import crypto from 'crypto'; -export default function hash(algorithm, encoding, data) { +function hash(algorithm, encoding, data) { let hasher = crypto.createHash(algorithm); hasher.update(data); return hasher.digest(encoding); } + +function sha3hex(data) { + return hash("SHA3-256", "hex", data); +} + +export { hash, sha3hex }; +export default hash; diff --git a/src/lib/io/TomlSettings.mjs b/src/lib/io/TomlSettings.mjs new file mode 100644 index 0000000..83a4c87 --- /dev/null +++ b/src/lib/io/TomlSettings.mjs @@ -0,0 +1,55 @@ +"use strict"; + +import fs from 'fs'; + +import TOML from '@ltd/j-toml'; + +/** + * Reads a pair of TOML configuration files. + * @param {string} file_default The path to the default configuration file. + * @param {string} file_custom The path to the custom configuration file. + * @return {object} The parsed settings object. + */ +function toml_settings_read(file_default, file_custom) { + let obj_default = toml_parse(fs.readFileSync(file_default)), + obj_custom = toml_parse(fs.readFileSync(file_custom)); + + obj_apply_recursive(obj_custom, obj_default); + return obj_default; +} + +/** + * Helper function to parse a given source string of TOML into an object. + * @param {string} source The source string to parse. + * @return {Object} The resulting object. + */ +function toml_parse(source) { + return TOML.parse( + source, // Source string + 1.0, // Specification version + "\n", // Multi line joiner + Number.MAX_SAFE_INTEGER // Use big int + ); +} + +/** + * Merges 2 object trees, overwriting keys in target with those of source. + * Note that target will be mutated! + * @param {object} source The source object to merge. + * @param {object} target The target object to merge. + */ +function obj_apply_recursive(source, target) { + for(let key in source) { + if(typeof source[key] == "object") { + if(typeof target[key] == "undefined") + target[key] = {}; + + obj_apply_recursive(source[key], target[key]); + continue; + } + + target[key] = source[key]; + } +} + +export default toml_settings_read; diff --git a/src/lib/parse/peer_name.mjs b/src/lib/parse/peer_name.mjs new file mode 100644 index 0000000..037117c --- /dev/null +++ b/src/lib/parse/peer_name.mjs @@ -0,0 +1,18 @@ +"use strict"; + +export default function(peer_name) { + let parts = peer_name.split(":", 2); + + if(parts.length < 2) + throw new Error(`Error: The peer name '${peer_name}' doesn't seem to contain a colon.`); + + let address = parts[0].replace(/^\[|\]$/, ""); + let port = parseInt(parts[1]); + if(isNaN(port)) + throw new Error(`Error: Failed to parse port number '${parts[1]}' as a port number.`); + + return { + address, + port + }; +} diff --git a/src/subcommands/agent/agent.mjs b/src/subcommands/agent/agent.mjs index 2b8930d..fc83f72 100644 --- a/src/subcommands/agent/agent.mjs +++ b/src/subcommands/agent/agent.mjs @@ -4,7 +4,22 @@ import fs from 'fs'; import path from 'path'; import settings from '../../settings.mjs'; +import toml_settings_read from '../../lib/io/TomlSettings.mjs'; + +import Agent from '../../lib/agent/Agent.mjs'; + +// HACK: Make sure __dirname is defined when using es6 modules. I forget where I found this - a PR with a source URL would be great :D +const __dirname = import.meta.url.slice(7, import.meta.url.lastIndexOf("/")); export default async function () { - // Do stuff + if(!fs.existsSync(settings.cli.config)) + throw new Error(`Error: The config file at '${settings.cli.config}' doesn't appear to exist, or we don't have permission to access it.`); + + let config = toml_settings_read( + path.join(__dirname, "config.default.toml"), + settings.cli.config + ); + + let agent = new Agent(config); + await agent.init(); } diff --git a/src/subcommands/agent/config.default.toml b/src/subcommands/agent/config.default.toml new file mode 100644 index 0000000..8cb8384 --- /dev/null +++ b/src/subcommands/agent/config.default.toml @@ -0,0 +1,22 @@ +# The join secret. This HAS to be the same across al nodes in the swarm. +secret_join = "CHANGE_ME" +# Alternatively, specify a path to a file to read the secret from instead. +# secret_join_filepath will be tried first, and if it's set to CHANGE_ME or +# fails for some reason, secret_join is tried second. +secret_join_filepath = "CHANGE_ME" + +# A list of 1 or more host:port strings to join initially. +# Note that a complete list of hosts in the swarm is NOT required, as when +# this hosts joins at leaast 1 host in this list it will ask them for the +# location of other members of the swarm. +peers = [ ] + +[net] +# Binds to both IPv6 AND IPv4 automatically thanks to the Linux networking stack +bind_address = "::" +# The port to listen on +port = 5252 + +# The number of retries when connecting to peers. +# Exponential backoff on retries is enforced +peer_retries = 5