"use strict"; import { EventEmitter, once } from 'events'; import net from 'net'; import Connection from '../transport/Connection.mjs'; import ErrorWrapper from '../core/ErrorWrapper.mjs'; import Peer from './Peer.mjs'; /** * A server that handles connections to many peers. * @extends EventEmitter */ class PeerServer extends EventEmitter { constructor(our_id, secret_join) { super(); this.our_id = our_id; this.secret_join = secret_join; this.connected_peers = []; this.connecting_peers = []; } /** * Starts the PeerServer listening on the given port and bind address. * @param {Number} [port=5252] The port number to listen on. * @param {String} [host="::"] The address to bind to. * @return {Promise} A Promise that resolves when the server setup is complete. */ listen(port = 5252, host="::") { return new Promise((resolve, reject) => { this.server = net.createServer(async (client) => { await this.handle_client(client); }); this.server.once("error", reject); this.server.on("error", this.handle_error); this.server.listen({ host, port, exclusive: false }, () => { this.server.off("error", reject); resolve(); }); }); } handle_error(error) { throw error; } async handle_client(client) { const peer = await Peer.Accept(this, await Connection.Wrap(this.secret_join, client)); this.connected_peers.push(peer); peer.on("message", this.handle_message.bind(this, peer)); peer.on("destroy", this.handle_destroy.bind(this, peer)); await once(peer, "connect"); } async handle_message(peer, message) { this.emit("message", peer, message); this.emit(`message-${message.event}`, peer, message.message); } async handle_destroy(peer) { const index = this.connected_peers.indexOf(peer); if(index > -1) this.connected_peers.splice(index, 1); this.emit("disconnect", peer.remote_endpoint); } /** * Returns a list of all currently known peer addresses. * @return {{address:string,port:number}[]} */ peers() { return this.connected_peers.map((peer) => peer.remote_endpoint) .filter(el => typeof el.addr === "string" && typeof el.port === "number"); } /** * Processes a list of peers. * New connections are established to any peers in the list to which * we don't already have a connection. * Note that this function does NOT connect to any other peers known to the * peers in the list you've specified! You need to do this manually. * @param {...{address: string, port: number}} new_peers The list of new peers to process. * @returns {Promise} A list of new peers to which we have successfully established a connection. */ async add_peers(...new_peers) { return (await Promise.all(new_peers.map(async new_peer => this.add_peer( new_peer.address, new_peer.port )))).filter(peer => peer instanceof Peer); } /** * Connects to a new peer and adds them to the pool of currently connected peers. * Note: This does NOT automatically connect to all the peers known to the * peer you're connecting to! * You need to do this manually. * @throws {ErrorWrapper} Throws if the connection failed. This could be for a large number of different reasons, from an incorrect join secret from the remote to connection issues. * @param {string} address The address to connect to. * @param {number} port The port number to connect to. * @return {Promise} A Promise that resolves to the resulting Peer connection, or null if the connection wasn't attemped. */ async add_peer(address, port) { if(this.peers().some(el => el.address === address && el.port === port)) return; const peer_string = `peer:${address}:${port}`; this.connecting_peers.push(peer_string); let conn; try { conn = await Peer.Initiate(this, address, port); this.connected_peers.push(conn); } catch(error) { throw new ErrorWrapper(`Error: Failed to connect to peer.`, error); } finally { this.connecting_peers.splice(this.connecting_peers.indexOf(peer_string)); } this.emit(`peer`, conn); return conn; } /** * Sends a message to 1 or more peers. * @param {string|Peer|string[]|Peer[]} peer_id Either the peer id or the peer itself to which we should send the message. May also be an array of arbitrarily mixed items - in which case the message will be sent to all the specified peers in parallel. The order which peers are messaged is undefined. * @param {string} event_name The name of the event to send. * @param {Object} msg The message itself to send. * @return {Promise} A Promise that resolves (or potentially rejects) when the message has been sent. */ async send(peer_id, event_name, msg) { throw new Error(`Not implemented yet`); } /** * Sends a message in parallel to all peers to which we have an established * connection. * The order which peers are messaged is undefined. * @param {string} event_name The name of the event to send. * @param {Object} msg The message itself to send. * @return {Promise} A Promise that resolves (or potentially rejects) when the message has been sent. */ async broadcast(event_name, msg) { await this.send(this.connected_peers, event_name, msg); } /** * Shuts the server down. * This does not disconnect any existing peers! * @return {Promise} A Promise that resolves once the server has been shutdown. */ shutdown_server() { return new Promise((resolve, _reject) => { this.server.close(resolve); }); } /** * Stops the PeerServer and gracefully (if possible) disconnects all existing peers. * @return {Promise} A Promise that resolves once the shutdown process has completed. */ async destroy() { await this.shutdown_server(); await Promise.all(...this.connected_peers.map(peer => peer.destroy())); } } export default PeerServer;