"use strict"; import fs from 'fs'; import os from 'os'; import path from 'path'; import { EventEmitter } from 'events'; import log from '../io/NamespacedLog.mjs'; const l = log("agent"); import settings from '../../settings.mjs'; import PeerServer from './PeerServer.mjs'; import parse_peer_name from '../parse/peer_name.mjs'; import hostuuid from '../io/hostuuid.mjs'; class Agent extends EventEmitter { get local_port() { return this.config.net.port; } get local_bind_address() { return this.config.net.bind_address; } get connected_peers() { return this.server.connected_peers; } get connecting_peers() { return this.server.connecting_peers; } get listening() { return this.server !== null ? this.server.listening : false; } constructor(config) { super(); this.config = config; this.peer_id = null; this.peer_name = null; this.server = null; } async find_secret() { if(this.config.secret_join_filepath !== "CHANGE_ME") { let filepath = path.resolve( path.dirname(settings.cli.config), this.config.secret_join_filepath ); if(fs.existsSync(filepath)) return await fs.promises.readFile(filepath, "utf-8"); } if(this.config.secret_join !== "CHANGE_ME") return this.config.secret_join; throw new Error(`Error: Failed to find join secret.`); } async init() { /// // 1: Create our local environment /// /** Our peer id - calculated automatically from the system's uuid */ this.peer_id = await hostuuid(this.config.net.port); this.peer_name = os.hostname(); /// // 2: Create the peer server /// this.server = new PeerServer( this.peer_id, await this.find_secret() ); this.server.retries = this.config.net.peer_retries; /// // 3: Attach message handling listeners /// this.server.on("message-peerlist-request", this.__send_peer_list.bind(this)); this.server.on("message-peerlist-response", this.__handle_peer_list.bind(this)); this.server.on("message", this.handle_message.bind(this)); this.server.on("destroy", this.handle_destroy.bind(this)); /// // 4: Start listening /// l.log(`Starting peer listener....`); await this.server.listen( this.config.net.port, this.config.net.bind_address ); l.log(`Listening on ${this.config.net.bind_address}:${this.config.net.port}`); /// // 5: Add initial peers to kick things off /// await this.server.add_peers(...this.config.peers.map( peer => parse_peer_name(peer) )); l.log(`Added ${this.server.peers().length} / ${this.config.peers.length} initial peers`); if(this.config.peers.length < 1) l.warn(`No initial peers were specified! It's recommended that you specify at least 1 on every host.`); // Ask for more peers await this.server.broadcast("peerlist-request", {}); } /** * Sends a list of known peers to the given peer. * @param {Peer} peer The peer to send the list to. * @return {Promise} A Promise that resolves when the message has been sent. */ async __send_peer_list(peer, _msg) { l.log(`Sending peer list to ${peer.id_short}`); await peer.send("peerlist-response", { peers: this.server.peers().map(peer_next => peer_next.info) }); } async __handle_peer_list(peer, msg) { l.log(`Received peer list from ${peer.id_short}`); if(!(msg.peers instanceof Array)) { l.warn(`Encountered invalid peer list message from peer ${peer.id_short}.`); return false; } let peerids_connected = this.server.peers().map(el => el.id); let new_peers = await this.server.add_peers(...msg.peers .filter(el => !peerids_connected.includes(el.id) && el.id !== this.peer_id) .map(el => { return { address: el.listening_address, port: el.listening_port }; })); if(new_peers.length > 0) l.log(`Connected to ${new_peers.length} / ${msg.peers.length} peers from peerlist sent by ${peer.id_short}`); else l.log(`No new peers in peerlist sent by ${peer.id_short}`); } handle_message(peer, event_name, msg) { this.emit("message", peer, event_name, msg); this.emit(`message-${event_name}`, peer, msg); } handle_destroy(peer_info) { const index = this.connected_peers.indexOf(peer); if(index > -1) this.connected_peers.splice(index, 1); // Remember, this is the address:port of the connecting port, NOT the server we would connect to to re-establish a connection! l.log(`Peer ${peer_info.id_short} from ${peer_info.remote.address}:${peer_info.remote.port} disconnected`); this.emit("disconnect", peer_info); } /** * Sends a message to 1 or more peers. * @param {string|Peer|string[]|Peer[]} peer_id Either the peer id or the peer itself to which we should send the message. May also be an array of arbitrarily mixed items - in which case the message will be sent to all the specified peers in parallel. The order which peers are messaged is undefined. * @param {string} event_name The name of the event to send. * @param {Object} msg The message itself to send. * @return {Promise} A Promise that resolves (or potentially rejects) when the message has been sent. */ async send(peer_id, event_name, msg) { return await this.server.send(peer_id, event_name, msg); } /** * Sends a message in parallel to all peers to which we have an established * connection. * The order which peers are messaged is undefined. * @param {string} event_name The name of the event to send. * @param {Object} msg The message itself to send. * @return {Promise} A Promise that resolves (or potentially rejects) when the message has been sent. */ async broadcast(event_name, msg) { return await this.server.broadcast(event_name, msg); } } export default Agent;