From 63b698573f94310ab43f00a9377a13b645d7b076 Mon Sep 17 00:00:00 2001 From: Starbeamrainbowlabs Date: Sun, 9 Jan 2022 16:57:07 +0000 Subject: [PATCH] Debug connectiono system --- package-lock.json | 17 ++++++++++++ package.json | 1 + src/lib/agent/Peer.mjs | 10 ++++--- src/lib/agent/PeerServer.mjs | 52 ++++++++++++++++++++---------------- 4 files changed, 53 insertions(+), 27 deletions(-) diff --git a/package-lock.json b/package-lock.json index 1005b18..854fae0 100644 --- a/package-lock.json +++ b/package-lock.json @@ -14,6 +14,7 @@ "jpake": "^1.0.1", "nexline": "^1.2.2", "p-queue": "^7.1.0", + "p-reflect": "^3.0.0", "p-retry": "^5.0.0", "systeminformation": "^5.9.4", "tweetnacl": "^1.0.3" @@ -128,6 +129,17 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/p-reflect": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/p-reflect/-/p-reflect-3.0.0.tgz", + "integrity": "sha512-rOgYyrvUxnJdSYKGSK7UnO7RxFSnT/IJYFPiosuQ2/AtRWIryIrv8lecWqJXWbKnMcUjJvxiHDMp80m0Yj4eLA==", + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/p-retry": { "version": "5.0.0", "resolved": "https://registry.npmjs.org/p-retry/-/p-retry-5.0.0.tgz", @@ -300,6 +312,11 @@ "p-timeout": "^5.0.0" } }, + "p-reflect": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/p-reflect/-/p-reflect-3.0.0.tgz", + "integrity": "sha512-rOgYyrvUxnJdSYKGSK7UnO7RxFSnT/IJYFPiosuQ2/AtRWIryIrv8lecWqJXWbKnMcUjJvxiHDMp80m0Yj4eLA==" + }, "p-retry": { "version": "5.0.0", "resolved": "https://registry.npmjs.org/p-retry/-/p-retry-5.0.0.tgz", diff --git a/package.json b/package.json index fa30454..c605106 100644 --- a/package.json +++ b/package.json @@ -22,6 +22,7 @@ "jpake": "^1.0.1", "nexline": "^1.2.2", "p-queue": "^7.1.0", + "p-reflect": "^3.0.0", "p-retry": "^5.0.0", "systeminformation": "^5.9.4", "tweetnacl": "^1.0.3" diff --git a/src/lib/agent/Peer.mjs b/src/lib/agent/Peer.mjs index 6237177..53eff85 100644 --- a/src/lib/agent/Peer.mjs +++ b/src/lib/agent/Peer.mjs @@ -66,11 +66,12 @@ class Peer extends EventEmitter { * @return {Promise} A Promise that resolves after the initial peer handshake is complete. */ async __initiate() { + this.connection.once("message-hello", async (msg) => { + if(!this.__handle_hello(msg)) + await this.destroy(); + this.emit("connect"); + }); await this.__send_hello(); - const [ msg ] = await once(this.connection, "message-hello"); - if(!this.__handle_hello(msg)) - await this.destroy(); - this.emit("connect"); } /** @@ -79,6 +80,7 @@ class Peer extends EventEmitter { * @return {boolean} Whether the peer should stay connected or not. */ __handle_hello(msg) { + l.debug(`handling hello`); this.id = msg.id; this.known_peers = msg.peers; diff --git a/src/lib/agent/PeerServer.mjs b/src/lib/agent/PeerServer.mjs index 6ed0088..c65109c 100644 --- a/src/lib/agent/PeerServer.mjs +++ b/src/lib/agent/PeerServer.mjs @@ -4,6 +4,7 @@ import { EventEmitter, once } from 'events'; import net from 'net'; import p_retry from 'p-retry'; +import p_reflect from 'p-reflect'; import log from '../io/NamespacedLog.mjs'; const l = log("peerserver"); @@ -138,29 +139,34 @@ class PeerServer extends EventEmitter { * @returns {Promise} A list of new peers to which we have successfully established a connection. */ async add_peers(...new_peers) { - // The arrow function here is NOT async because the promise from - // this.add_peer is returned directly to await Promise.all(). return (await Promise.all(new_peers.map( - new_peer => p_retry(async () => await this.__add_peer( - new_peer.address, - new_peer.port - ), { - retries: this.retries, - onFailedAttempt: (error) => { - switch(error instanceof ErrorWrapper ? error.inner_exception.code : error.code) { - case "ECONNREFUSED": - l.error(`Failed to connect to peer at ${new_peer.address}:${new_peer.port}.`); - break; - - default: - let msg = `[attempt ${error.attemptNumber} / ${error.retriesLeft+error.attemptNumber}] Error while connecting to ${new_peer.address}:${new_peer.port}: ${error}`; - if(settings.cli.verbose) msg += `\n${error.stack}`; - l.error(msg); - break; + async new_peer => { + let result = await p_reflect(p_retry(async () => await this.__add_peer( + new_peer.address, + new_peer.port + ), { + retries: this.retries, + onFailedAttempt: (error) => { + switch(error instanceof ErrorWrapper ? error.inner_exception.code : error.code) { + case "ECONNREFUSED": + l.error(`Failed to connect to peer at ${new_peer.address}:${new_peer.port}.`); + break; + + default: + let msg = `[attempt ${error.attemptNumber} / ${error.retriesLeft+error.attemptNumber}] Error while connecting to ${new_peer.address}:${new_peer.port}: ${error}`; + if(settings.cli.verbose) msg += `\n${error.stack}`; + l.error(msg); + break; + } } + })); + if(result.isRejected) { + l.error(`Failed to connect to ${new_peer.address}:${new_peer.port}: ${result.reason}`); + return null; } + return result.value; }) - ))).filter(peer => peer instanceof Peer); + )).filter(peer => peer instanceof Peer); } /** @@ -177,15 +183,16 @@ class PeerServer extends EventEmitter { * @return {Promise} A Promise that resolves to the resulting Peer connection, or null if the connection wasn't attemped. */ async __add_peer(address, port) { - l.log(`Attempting to connect to ${address}:${port}`); + l.info(`Attempting to connect to ${address}:${port}`); // If we're already connected, don't bother reconnecting again if(this.peers().some(el => el.address === address && el.port === port)) { l.log(`A connection is already open to ${address}:${port}, skipping`); return; } + // If we are attempting to connect to ourselves, then don't bother - if(address == this.host && port == this.port) { + if((address == "127.0.0.1" || address == "::" || address == "::1") && port == this.port) { l.log(`${address}:${port} is actually us, skipping`); return; } @@ -195,8 +202,7 @@ class PeerServer extends EventEmitter { let conn = null; try { conn = await Peer.Initiate(this, address, port); - peer_initialise(conn); - console.log(`[DEBUG] conn before`, conn); + this.peer_initialise(conn); } catch(error) { throw new ErrorWrapper(`Error: Failed to connect to peer.`, error);