From 43fed309dcd60760c8f41cb6ec514700421c9825 Mon Sep 17 00:00:00 2001 From: Starbeamrainbowlabs Date: Sun, 9 Jan 2022 17:37:06 +0000 Subject: [PATCH] Disconnect duplicate connections --- src/lib/agent/Peer.mjs | 24 ++++++++++--------- src/lib/agent/PeerServer.mjs | 41 ++++++++++++++++++++++---------- src/lib/transport/Connection.mjs | 2 +- 3 files changed, 43 insertions(+), 24 deletions(-) diff --git a/src/lib/agent/Peer.mjs b/src/lib/agent/Peer.mjs index 53eff85..255a7ab 100644 --- a/src/lib/agent/Peer.mjs +++ b/src/lib/agent/Peer.mjs @@ -39,10 +39,6 @@ class Peer extends EventEmitter { * @type {{address:string,port:number}[]} */ this.known_peers = []; - - this.once("connect", () => { - l.log(`${this.connection.address}:${this.connection.port} connected`); - }); } /////////////////////////////////////////////////////////////////////////// @@ -58,6 +54,7 @@ class Peer extends EventEmitter { if(!this.__handle_hello(msg)) await this.destroy(); + await this.__send_hello(); this.emit("connect"); } @@ -65,13 +62,19 @@ class Peer extends EventEmitter { * Initiates the handshake after opening a new connection. * @return {Promise} A Promise that resolves after the initial peer handshake is complete. */ - async __initiate() { - this.connection.once("message-hello", async (msg) => { - if(!this.__handle_hello(msg)) - await this.destroy(); - this.emit("connect"); + __initiate() { + return new Promise((resolve, reject) => { + this.connection.once("error", reject); + this.connection.once("message-hello", async (msg) => { + if(!this.__handle_hello(msg)) + await this.destroy(); + + this.connection.off("error", reject); + this.emit("connect"); + resolve(); + }); + this.__send_hello(); // Set and forget }); - await this.__send_hello(); } /** @@ -80,7 +83,6 @@ class Peer extends EventEmitter { * @return {boolean} Whether the peer should stay connected or not. */ __handle_hello(msg) { - l.debug(`handling hello`); this.id = msg.id; this.known_peers = msg.peers; diff --git a/src/lib/agent/PeerServer.mjs b/src/lib/agent/PeerServer.mjs index 17bca10..0aeeb66 100644 --- a/src/lib/agent/PeerServer.mjs +++ b/src/lib/agent/PeerServer.mjs @@ -81,6 +81,9 @@ class PeerServer extends EventEmitter { this.connected_peers.push(peer); peer.on("message", this.handle_message.bind(this, peer)); peer.on("destroy", this.handle_destroy.bind(this, peer)); + + l.log(`Peer ${peer.id_short} from ${peer.address}:${peer.port} connected`); + this.emit("peer", peer); } async handle_message(peer, message) { @@ -93,7 +96,7 @@ class PeerServer extends EventEmitter { if(index > -1) this.connected_peers.splice(index, 1); - l.log(`Peer ${peer.address}:${peer.port} disconnected`); + l.log(`Peer ${peer.id_short} from ${peer.address}:${peer.port} disconnected`); this.emit("disconnect", peer.remote_endpoint); } @@ -102,8 +105,11 @@ class PeerServer extends EventEmitter { * @return {{address:string,port:number}[]} */ peers() { - return this.connected_peers.map((peer) => peer.remote_endpoint) - .filter(el => typeof el.address === "string" && typeof el.port === "number"); + return this.connected_peers.map(peer => { return { + address: peer.address, + port: peer.port, + id: peer.id + }}).filter(el => typeof el.address === "string" && typeof el.port === "number"); } /** @@ -199,23 +205,34 @@ class PeerServer extends EventEmitter { const peer_string = `peer:${address}:${port}`; this.connecting_peers.push(peer_string); - let conn = null; + let peer = null; try { - conn = await Peer.Initiate(this, address, port); - this.peer_initialise(conn); + peer = await Peer.Initiate(this, address, port); } catch(error) { throw new ErrorWrapper(`Error: Failed to connect to peer.`, error); } finally { - this.connecting_peers.splice(this.connecting_peers.indexOf(peer_string)); + this.connecting_peers.splice(this.connecting_peers.indexOf(peer_string), 1); } - if(conn === null) return null; + // If the peer didn't connect properly, ignore it + if(peer === null) return null; + // If the ID of the peer is equal to that of one of the existing peers, close the + // connection + if(this.peers().some(existing_peer => existing_peer.id === peer.id)) { + l.info(`Closing duplicate connection to ${peer.id_short}`); + this.remove_peers(peer); + } - l.log(`Peer ${conn.id_short} from ${address}:${port} connected`) - this.emit(`peer`, conn); - return conn; + this.peer_initialise(peer); + return peer; + } + + async remove_peers(...peers) { + // No need to remove the peer from the list of connected peers, as + // there's already a catch-all event handler attached to do that + await Promise.all(peers.map(async peer => await peer.destroy())); } /** @@ -246,7 +263,7 @@ class PeerServer extends EventEmitter { } /** - * Shuts the server down. + * Shuts the server that listens for new peer connections down. * This does not disconnect any existing peers! * @return {Promise} A Promise that resolves once the server has been shutdown. */ diff --git a/src/lib/transport/Connection.mjs b/src/lib/transport/Connection.mjs index 4e95768..70c5253 100644 --- a/src/lib/transport/Connection.mjs +++ b/src/lib/transport/Connection.mjs @@ -110,7 +110,7 @@ class Connection extends EventEmitter { } async destroy() { - l.log(`Killing connection to ${this.address}:${this.port}`, new Error().stack); + l.debug(`Killing connection to ${this.address}:${this.port}`, new Error().stack); if(this.framer instanceof FramedTransport) await this.framer.destroy(); else {