Disconnect duplicate connections

This commit is contained in:
Starbeamrainbowlabs 2022-01-09 17:37:06 +00:00
parent e4c01ae828
commit 43fed309dc
Signed by: sbrl
GPG key ID: 1BE5172E637709C2
3 changed files with 43 additions and 24 deletions

View file

@ -39,10 +39,6 @@ class Peer extends EventEmitter {
* @type {{address:string,port:number}[]} * @type {{address:string,port:number}[]}
*/ */
this.known_peers = []; this.known_peers = [];
this.once("connect", () => {
l.log(`${this.connection.address}:${this.connection.port} connected`);
});
} }
/////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////
@ -58,6 +54,7 @@ class Peer extends EventEmitter {
if(!this.__handle_hello(msg)) if(!this.__handle_hello(msg))
await this.destroy(); await this.destroy();
await this.__send_hello();
this.emit("connect"); this.emit("connect");
} }
@ -65,13 +62,19 @@ class Peer extends EventEmitter {
* Initiates the handshake after opening a new connection. * Initiates the handshake after opening a new connection.
* @return {Promise<HelloMsg>} A Promise that resolves after the initial peer handshake is complete. * @return {Promise<HelloMsg>} A Promise that resolves after the initial peer handshake is complete.
*/ */
async __initiate() { __initiate() {
this.connection.once("message-hello", async (msg) => { return new Promise((resolve, reject) => {
if(!this.__handle_hello(msg)) this.connection.once("error", reject);
await this.destroy(); this.connection.once("message-hello", async (msg) => {
this.emit("connect"); if(!this.__handle_hello(msg))
await this.destroy();
this.connection.off("error", reject);
this.emit("connect");
resolve();
});
this.__send_hello(); // Set and forget
}); });
await this.__send_hello();
} }
/** /**
@ -80,7 +83,6 @@ class Peer extends EventEmitter {
* @return {boolean} Whether the peer should stay connected or not. * @return {boolean} Whether the peer should stay connected or not.
*/ */
__handle_hello(msg) { __handle_hello(msg) {
l.debug(`handling hello`);
this.id = msg.id; this.id = msg.id;
this.known_peers = msg.peers; this.known_peers = msg.peers;

View file

@ -81,6 +81,9 @@ class PeerServer extends EventEmitter {
this.connected_peers.push(peer); this.connected_peers.push(peer);
peer.on("message", this.handle_message.bind(this, peer)); peer.on("message", this.handle_message.bind(this, peer));
peer.on("destroy", this.handle_destroy.bind(this, peer)); peer.on("destroy", this.handle_destroy.bind(this, peer));
l.log(`Peer ${peer.id_short} from ${peer.address}:${peer.port} connected`);
this.emit("peer", peer);
} }
async handle_message(peer, message) { async handle_message(peer, message) {
@ -93,7 +96,7 @@ class PeerServer extends EventEmitter {
if(index > -1) if(index > -1)
this.connected_peers.splice(index, 1); this.connected_peers.splice(index, 1);
l.log(`Peer ${peer.address}:${peer.port} disconnected`); l.log(`Peer ${peer.id_short} from ${peer.address}:${peer.port} disconnected`);
this.emit("disconnect", peer.remote_endpoint); this.emit("disconnect", peer.remote_endpoint);
} }
@ -102,8 +105,11 @@ class PeerServer extends EventEmitter {
* @return {{address:string,port:number}[]} * @return {{address:string,port:number}[]}
*/ */
peers() { peers() {
return this.connected_peers.map((peer) => peer.remote_endpoint) return this.connected_peers.map(peer => { return {
.filter(el => typeof el.address === "string" && typeof el.port === "number"); address: peer.address,
port: peer.port,
id: peer.id
}}).filter(el => typeof el.address === "string" && typeof el.port === "number");
} }
/** /**
@ -199,23 +205,34 @@ class PeerServer extends EventEmitter {
const peer_string = `peer:${address}:${port}`; const peer_string = `peer:${address}:${port}`;
this.connecting_peers.push(peer_string); this.connecting_peers.push(peer_string);
let conn = null; let peer = null;
try { try {
conn = await Peer.Initiate(this, address, port); peer = await Peer.Initiate(this, address, port);
this.peer_initialise(conn);
} }
catch(error) { catch(error) {
throw new ErrorWrapper(`Error: Failed to connect to peer.`, error); throw new ErrorWrapper(`Error: Failed to connect to peer.`, error);
} }
finally { finally {
this.connecting_peers.splice(this.connecting_peers.indexOf(peer_string)); this.connecting_peers.splice(this.connecting_peers.indexOf(peer_string), 1);
} }
if(conn === null) return null; // If the peer didn't connect properly, ignore it
if(peer === null) return null;
// If the ID of the peer is equal to that of one of the existing peers, close the
// connection
if(this.peers().some(existing_peer => existing_peer.id === peer.id)) {
l.info(`Closing duplicate connection to ${peer.id_short}`);
this.remove_peers(peer);
}
l.log(`Peer ${conn.id_short} from ${address}:${port} connected`) this.peer_initialise(peer);
this.emit(`peer`, conn); return peer;
return conn; }
async remove_peers(...peers) {
// No need to remove the peer from the list of connected peers, as
// there's already a catch-all event handler attached to do that
await Promise.all(peers.map(async peer => await peer.destroy()));
} }
/** /**
@ -246,7 +263,7 @@ class PeerServer extends EventEmitter {
} }
/** /**
* Shuts the server down. * Shuts the server that listens for new peer connections down.
* This does not disconnect any existing peers! * This does not disconnect any existing peers!
* @return {Promise} A Promise that resolves once the server has been shutdown. * @return {Promise} A Promise that resolves once the server has been shutdown.
*/ */

View file

@ -110,7 +110,7 @@ class Connection extends EventEmitter {
} }
async destroy() { async destroy() {
l.log(`Killing connection to ${this.address}:${this.port}`, new Error().stack); l.debug(`Killing connection to ${this.address}:${this.port}`, new Error().stack);
if(this.framer instanceof FramedTransport) if(this.framer instanceof FramedTransport)
await this.framer.destroy(); await this.framer.destroy();
else { else {