diff --git a/src/lib/agent/Peer.mjs b/src/lib/agent/Peer.mjs index de5f963..211860e 100644 --- a/src/lib/agent/Peer.mjs +++ b/src/lib/agent/Peer.mjs @@ -26,7 +26,12 @@ class Peer extends EventEmitter { */ this.connection = connection; - // Fetch the remote's id + /** + * A list of other peers known to this peer. + * May or may not actually be up to date. + * @type {{address:string,port:number}[]} + */ + this.known_peers = []; } /////////////////////////////////////////////////////////////////////////// @@ -34,7 +39,7 @@ class Peer extends EventEmitter { /** * Accepts an existing connection as a new Peer. * @param {Connection} connection The Connection to accept. - * @return {Promise} A Promise that resolves once the initial handshake is complete. + * @return {Promise} A Promise that resolves once the initial handshake is complete. */ async __accept(connection) { this.connection = connection; @@ -46,18 +51,18 @@ class Peer extends EventEmitter { /** * Initiates the handshake after opening a new connection. - * @return {Promise} A Promise that resolves after the initial peer handshake is complete. + * @return {Promise} A Promise that resolves after the initial peer handshake is complete. */ async __initiate() { await this.__send_hello(); const [ msg ] = await once(this.connection, "message-hello"); this.__handle_hello(msg); - this.emit("connect"); } __handle_hello(msg) { this.id = msg.id; + this.known_peers = msg.peers; } async __send_hello() { @@ -80,11 +85,14 @@ Peer.Initiate = function(server, address, port) { const conn = await Connection.Create(server.secret_join, address, port); const peer = new Peer(server, conn); await peer.__initiate(); + return peer; } Peer.Accept = function(server, connection) { const peer = new Peer(server); await peer.__accept(connection); + + return peer; } export default Peer; diff --git a/src/lib/agent/PeerServer.mjs b/src/lib/agent/PeerServer.mjs index 403e4a2..ffd1360 100644 --- a/src/lib/agent/PeerServer.mjs +++ b/src/lib/agent/PeerServer.mjs @@ -4,16 +4,19 @@ import { EventEmitter, once } from 'events'; import net from 'net'; import Connection from '../transport/Connection.mjs'; +import ErrorWrapper from '../core/ErrorWrapper.mjs'; import Peer from './Peer.mjs'; class PeerServer extends EventEmitter { - constructor(our_id) { + constructor(our_id, secret_join) { super(); this.our_id = our_id; + this.secret_join = secret_join; - this.peers = []; + this.connected_peers = []; + this.connecting_peers = []; } /** @@ -46,8 +49,8 @@ class PeerServer extends EventEmitter { } async handle_client(client) { - const peer = new Peer(this, await Connection.Wrap(client)); - this.peers.push(peer); + const peer = await Peer.Accept(this, await Connection.Wrap(this.secret_join, client)); + this.connected_peers.push(peer); peer.on("message", this.handle_message.bind(this, peer)); peer.on("destroy", this.handle_destroy.bind(this, peer)); await once(peer, "connect"); @@ -59,9 +62,9 @@ class PeerServer extends EventEmitter { } async handle_destroy(peer) { - const index = this.peers.indexOf(peer); + const index = this.connected_peers.indexOf(peer); if(index > -1) - this.peers.splice(index, 1); + this.connected_peers.splice(index, 1); this.emit("disconnect", peer.remote_endpoint); } @@ -70,11 +73,58 @@ class PeerServer extends EventEmitter { * Returns a list of all currently known peer addresses. * @return {{address:string,port:number}[]} */ - peer() { - return this.peers.map((peer) => peer.remote_endpoint) + peers() { + return this.connected_peers.map((peer) => peer.remote_endpoint) .filter(el => typeof el.addr === "string" && typeof el.port === "number"); } + /** + * Processes a list of peers. + * New connections are established to any peers in the list to which + * we don't already have a connection. + * Note that this function does NOT connect to any other peers known to the + * peers in the list you've specified! You need to do this manually. + * @param {...{address: string, port: number}} new_peers The list of new peers to process. + * @returns {Promise} A list of new peers to which we have successfully established a connection. + */ + async add_peers(...new_peers) { + return (await Promise.all(new_peers.map(async new_peer => this.add_peer( + new_peer.address, + new_peer.port + )))).filter(peer => peer instanceof Peer); + } + + /** + * Connects to a new peer and adds them to the pool of currently connected peers. + * Note: This does NOT automatically connect to all the peers known to the + * peer you're connecting to! + * You need to do this manually. + * @throws {ErrorWrapper} Throws if the connection failed. This could be for a large number of different reasons, from an incorrect join secret from the remote to connection issues. + * @param {string} address The address to connect to. + * @param {number} port The port number to connect to. + * @return {Promise} A Promise that resolves to the resulting Peer connection, or null if the connection wasn't attemped. + */ + async add_peer(address, port) { + if(this.peers().some(el => el.address === address && el.port === port)) + return; + + const peer_string = `peer:${address}:${port}`; + this.connecting_peers.push(peer_string); + let conn; + try { + conn = await Peer.Initiate(this, address, port); + this.connected_peers.push(conn); + } + catch(error) { + throw new ErrorWrapper(`Error: Failed to connect to peer.`, error); + } + finally { + this.connecting_peers.splice(this.connecting_peers.indexOf(peer_string)); + } + + this.emit(`peer`, conn); + return conn; + } /** * Shuts the server down. @@ -93,7 +143,7 @@ class PeerServer extends EventEmitter { */ async destroy() { await this.shutdown_server(); - await Promise.all(...this.peers.map(peer => peer.destroy())); + await Promise.all(...this.connected_peers.map(peer => peer.destroy())); } } diff --git a/src/lib/core/ErrorWrapper.mjs b/src/lib/core/ErrorWrapper.mjs new file mode 100644 index 0000000..5382109 --- /dev/null +++ b/src/lib/core/ErrorWrapper.mjs @@ -0,0 +1,14 @@ +"use strict"; + +class ErrorWrapper extends Error { + constructor(message, inner_exception) { + super(message); + this.inner_exception = inner_exception; + } + + toString() { + return `${super.toString()}\n***Inner Exception ***\n${this.inner_exception}`; + } +} + +export ErrorWrapper;