From 1091f9c3c28ff731f919b23cbe99d294797cbf2b Mon Sep 17 00:00:00 2001 From: Starbeamrainbowlabs Date: Sun, 9 Jan 2022 20:28:58 +0000 Subject: [PATCH] Exchange peerlists --- src/lib/agent/Agent.mjs | 53 +++++++++++++++++++++++++++++++++++ src/lib/agent/Peer.mjs | 13 +++------ src/lib/agent/PeerServer.mjs | 6 ++-- src/lib/core/ErrorWrapper.mjs | 4 +++ 4 files changed, 65 insertions(+), 11 deletions(-) diff --git a/src/lib/agent/Agent.mjs b/src/lib/agent/Agent.mjs index 5694e79..3373ce8 100644 --- a/src/lib/agent/Agent.mjs +++ b/src/lib/agent/Agent.mjs @@ -34,16 +34,34 @@ class Agent { } async init() { + /// + // 1: Create our local environment + /// /** Our peer id - calculated automatically from the system's uuid */ this.peer_id = await hostuuid(this.config.net.port); this.peer_name = os.hostname(); + + /// + // 2: Create the peer server + /// this.server = new PeerServer( this.peer_id, await this.find_secret() ); this.server.retries = this.config.net.peer_retries; + + /// + // 3: Attach message handling listeners + /// + this.server.on("message-peerlist-request", this.__send_peer_list.bind(this)); + this.server.on("message-peerlist-response", this.__handle_peer_list.bind(this)); + + + /// + // 4: Start listening + /// l.log(`Starting peer listener....`); await this.server.listen( this.config.net.port, @@ -51,6 +69,10 @@ class Agent { ); l.log(`Listening on ${this.config.net.bind_address}:${this.config.net.port}`); + + /// + // 5: Add initial peers to kick things off + /// await this.server.add_peers(...this.config.peers.map( peer => parse_peer_name(peer) )); @@ -58,6 +80,37 @@ class Agent { l.log(`Added ${this.server.peers().length} / ${this.config.peers.length} initial peers`); if(this.config.peers.length < 1) l.warn(`No initial peers were specified! It's recommended that you specify at least 1 on every host.`); + + // Ask for more peers + await this.server.broadcast("peerlist-request", {}); + } + + /** + * Sends a list of known peers to the given peer. + * @param {Peer} peer The peer to send the list to. + * @return {Promise} A Promise that resolves whent he message has been sent. + */ + async __send_peer_list(peer, _msg) { + l.log(`Sending peer list to ${peer.id_short}`); + await peer.send("peerlist-response", { + peers: this.server.peers() + }); + } + + async __handle_peer_list(peer, msg) { + l.log(`Received peer list from ${peer.id_short}`); + if(!(msg.peers instanceof Array)) { + l.warn(`Encountered invalid peer list message from peer ${peer.id_short}.`); + return false; + } + let peerids_connected = this.server.peers().map(el => el.id); + let new_peers = await this.server.add_peers(...msg.peers + .filter(el => !peerids_connected.includes(el.id) && el.id !== this.peer_id) + .map(el => { return { address: el.listening_address, port: el.listening_port }; })); + if(new_peers.length > 0) + l.log(`Connected to ${new_peers.length} / ${msg.peers.length} peers from peerlist sent by ${peer.id_short}`); + else + l.log(`No new peers in peerlist sent by ${peer.id_short}`); } } diff --git a/src/lib/agent/Peer.mjs b/src/lib/agent/Peer.mjs index 67ed86b..474f0f4 100644 --- a/src/lib/agent/Peer.mjs +++ b/src/lib/agent/Peer.mjs @@ -33,13 +33,6 @@ class Peer extends EventEmitter { */ this.connection = connection; - /** - * A list of other peers known to this peer. - * May or may not actually be up to date. - * @type {{address:string,port:number}[]} - */ - this.known_peers = []; - // If a message with the event name "end" is received, close our side // of the connection this.once(`message-end`, this.destroy); @@ -88,7 +81,8 @@ class Peer extends EventEmitter { */ __handle_hello(msg) { this.id = msg.id; - this.known_peers = msg.peers; + this.listening_address = msg.listening_address; + this.listening_port = msg.listening_port; if(this.id === this.server.our_id) { l.warn(`Our id (${this.server.our_id}) is equal to that of the remote (${this.id}), killing connection`); @@ -111,7 +105,8 @@ class Peer extends EventEmitter { async __send_hello() { await this.send("hello", { id: this.server.our_id, - peers: this.server.peers() + listening_address: this.server.host, + listening_port: this.server.port }); } diff --git a/src/lib/agent/PeerServer.mjs b/src/lib/agent/PeerServer.mjs index f56e002..41e9cbc 100644 --- a/src/lib/agent/PeerServer.mjs +++ b/src/lib/agent/PeerServer.mjs @@ -108,7 +108,9 @@ class PeerServer extends EventEmitter { return this.connected_peers.map(peer => { return { address: peer.address, port: peer.port, - id: peer.id + id: peer.id, + listening_address: peer.listening_address, + listening_port: peer.listening_port }}).filter(el => typeof el.address === "string" && typeof el.port === "number"); } @@ -153,7 +155,7 @@ class PeerServer extends EventEmitter { ), { retries: this.retries, onFailedAttempt: (error) => { - switch(error instanceof ErrorWrapper ? error.inner_exception.code : error.code) { + switch(error.code) { case "ECONNREFUSED": l.error(`Failed to connect to peer at ${new_peer.address}:${new_peer.port}.`); break; diff --git a/src/lib/core/ErrorWrapper.mjs b/src/lib/core/ErrorWrapper.mjs index 27fb432..5f8128e 100644 --- a/src/lib/core/ErrorWrapper.mjs +++ b/src/lib/core/ErrorWrapper.mjs @@ -1,6 +1,10 @@ "use strict"; class ErrorWrapper extends Error { + get code() { + return this.inner_exception.code; + } + constructor(message, inner_exception) { super(message); this.inner_exception = inner_exception;