Debug connectiono system

This commit is contained in:
Starbeamrainbowlabs 2022-01-09 16:57:07 +00:00
parent 53b0d661a9
commit 63b698573f
Signed by: sbrl
GPG key ID: 1BE5172E637709C2
4 changed files with 53 additions and 27 deletions

17
package-lock.json generated
View file

@ -14,6 +14,7 @@
"jpake": "^1.0.1", "jpake": "^1.0.1",
"nexline": "^1.2.2", "nexline": "^1.2.2",
"p-queue": "^7.1.0", "p-queue": "^7.1.0",
"p-reflect": "^3.0.0",
"p-retry": "^5.0.0", "p-retry": "^5.0.0",
"systeminformation": "^5.9.4", "systeminformation": "^5.9.4",
"tweetnacl": "^1.0.3" "tweetnacl": "^1.0.3"
@ -128,6 +129,17 @@
"url": "https://github.com/sponsors/sindresorhus" "url": "https://github.com/sponsors/sindresorhus"
} }
}, },
"node_modules/p-reflect": {
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/p-reflect/-/p-reflect-3.0.0.tgz",
"integrity": "sha512-rOgYyrvUxnJdSYKGSK7UnO7RxFSnT/IJYFPiosuQ2/AtRWIryIrv8lecWqJXWbKnMcUjJvxiHDMp80m0Yj4eLA==",
"engines": {
"node": ">=12"
},
"funding": {
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/p-retry": { "node_modules/p-retry": {
"version": "5.0.0", "version": "5.0.0",
"resolved": "https://registry.npmjs.org/p-retry/-/p-retry-5.0.0.tgz", "resolved": "https://registry.npmjs.org/p-retry/-/p-retry-5.0.0.tgz",
@ -300,6 +312,11 @@
"p-timeout": "^5.0.0" "p-timeout": "^5.0.0"
} }
}, },
"p-reflect": {
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/p-reflect/-/p-reflect-3.0.0.tgz",
"integrity": "sha512-rOgYyrvUxnJdSYKGSK7UnO7RxFSnT/IJYFPiosuQ2/AtRWIryIrv8lecWqJXWbKnMcUjJvxiHDMp80m0Yj4eLA=="
},
"p-retry": { "p-retry": {
"version": "5.0.0", "version": "5.0.0",
"resolved": "https://registry.npmjs.org/p-retry/-/p-retry-5.0.0.tgz", "resolved": "https://registry.npmjs.org/p-retry/-/p-retry-5.0.0.tgz",

View file

@ -22,6 +22,7 @@
"jpake": "^1.0.1", "jpake": "^1.0.1",
"nexline": "^1.2.2", "nexline": "^1.2.2",
"p-queue": "^7.1.0", "p-queue": "^7.1.0",
"p-reflect": "^3.0.0",
"p-retry": "^5.0.0", "p-retry": "^5.0.0",
"systeminformation": "^5.9.4", "systeminformation": "^5.9.4",
"tweetnacl": "^1.0.3" "tweetnacl": "^1.0.3"

View file

@ -66,11 +66,12 @@ class Peer extends EventEmitter {
* @return {Promise<HelloMsg>} A Promise that resolves after the initial peer handshake is complete. * @return {Promise<HelloMsg>} A Promise that resolves after the initial peer handshake is complete.
*/ */
async __initiate() { async __initiate() {
this.connection.once("message-hello", async (msg) => {
if(!this.__handle_hello(msg))
await this.destroy();
this.emit("connect");
});
await this.__send_hello(); await this.__send_hello();
const [ msg ] = await once(this.connection, "message-hello");
if(!this.__handle_hello(msg))
await this.destroy();
this.emit("connect");
} }
/** /**
@ -79,6 +80,7 @@ class Peer extends EventEmitter {
* @return {boolean} Whether the peer should stay connected or not. * @return {boolean} Whether the peer should stay connected or not.
*/ */
__handle_hello(msg) { __handle_hello(msg) {
l.debug(`handling hello`);
this.id = msg.id; this.id = msg.id;
this.known_peers = msg.peers; this.known_peers = msg.peers;

View file

@ -4,6 +4,7 @@ import { EventEmitter, once } from 'events';
import net from 'net'; import net from 'net';
import p_retry from 'p-retry'; import p_retry from 'p-retry';
import p_reflect from 'p-reflect';
import log from '../io/NamespacedLog.mjs'; const l = log("peerserver"); import log from '../io/NamespacedLog.mjs'; const l = log("peerserver");
@ -138,29 +139,34 @@ class PeerServer extends EventEmitter {
* @returns {Promise<Peer[]>} A list of new peers to which we have successfully established a connection. * @returns {Promise<Peer[]>} A list of new peers to which we have successfully established a connection.
*/ */
async add_peers(...new_peers) { async add_peers(...new_peers) {
// The arrow function here is NOT async because the promise from
// this.add_peer is returned directly to await Promise.all().
return (await Promise.all(new_peers.map( return (await Promise.all(new_peers.map(
new_peer => p_retry(async () => await this.__add_peer( async new_peer => {
new_peer.address, let result = await p_reflect(p_retry(async () => await this.__add_peer(
new_peer.port new_peer.address,
), { new_peer.port
retries: this.retries, ), {
onFailedAttempt: (error) => { retries: this.retries,
switch(error instanceof ErrorWrapper ? error.inner_exception.code : error.code) { onFailedAttempt: (error) => {
case "ECONNREFUSED": switch(error instanceof ErrorWrapper ? error.inner_exception.code : error.code) {
l.error(`Failed to connect to peer at ${new_peer.address}:${new_peer.port}.`); case "ECONNREFUSED":
break; l.error(`Failed to connect to peer at ${new_peer.address}:${new_peer.port}.`);
break;
default: default:
let msg = `[attempt ${error.attemptNumber} / ${error.retriesLeft+error.attemptNumber}] Error while connecting to ${new_peer.address}:${new_peer.port}: ${error}`; let msg = `[attempt ${error.attemptNumber} / ${error.retriesLeft+error.attemptNumber}] Error while connecting to ${new_peer.address}:${new_peer.port}: ${error}`;
if(settings.cli.verbose) msg += `\n${error.stack}`; if(settings.cli.verbose) msg += `\n${error.stack}`;
l.error(msg); l.error(msg);
break; break;
}
} }
}));
if(result.isRejected) {
l.error(`Failed to connect to ${new_peer.address}:${new_peer.port}: ${result.reason}`);
return null;
} }
return result.value;
}) })
))).filter(peer => peer instanceof Peer); )).filter(peer => peer instanceof Peer);
} }
/** /**
@ -177,15 +183,16 @@ class PeerServer extends EventEmitter {
* @return {Promise<Peer|null>} A Promise that resolves to the resulting Peer connection, or null if the connection wasn't attemped. * @return {Promise<Peer|null>} A Promise that resolves to the resulting Peer connection, or null if the connection wasn't attemped.
*/ */
async __add_peer(address, port) { async __add_peer(address, port) {
l.log(`Attempting to connect to ${address}:${port}`); l.info(`Attempting to connect to ${address}:${port}`);
// If we're already connected, don't bother reconnecting again // If we're already connected, don't bother reconnecting again
if(this.peers().some(el => el.address === address && el.port === port)) { if(this.peers().some(el => el.address === address && el.port === port)) {
l.log(`A connection is already open to ${address}:${port}, skipping`); l.log(`A connection is already open to ${address}:${port}, skipping`);
return; return;
} }
// If we are attempting to connect to ourselves, then don't bother // If we are attempting to connect to ourselves, then don't bother
if(address == this.host && port == this.port) { if((address == "127.0.0.1" || address == "::" || address == "::1") && port == this.port) {
l.log(`${address}:${port} is actually us, skipping`); l.log(`${address}:${port} is actually us, skipping`);
return; return;
} }
@ -195,8 +202,7 @@ class PeerServer extends EventEmitter {
let conn = null; let conn = null;
try { try {
conn = await Peer.Initiate(this, address, port); conn = await Peer.Initiate(this, address, port);
peer_initialise(conn); this.peer_initialise(conn);
console.log(`[DEBUG] conn before`, conn);
} }
catch(error) { catch(error) {
throw new ErrorWrapper(`Error: Failed to connect to peer.`, error); throw new ErrorWrapper(`Error: Failed to connect to peer.`, error);