// systemquery/src/lib/agent/PeerServer.mjs
//
// 334 lines
// 11 KiB
// JavaScript

"use strict";
import { EventEmitter, once } from 'events';
import net from 'net';
import p_retry from 'p-retry';
import p_reflect from 'p-reflect';
import log from '../io/NamespacedLog.mjs'; const l = log("peerserver");
import settings from '../../settings.mjs';
import Connection from '../transport/Connection.mjs';
import ErrorWrapper from '../core/ErrorWrapper.mjs';
import Peer from './Peer.mjs';
/**
* A server that handles connections to many peers.
* Note that when a new peer connects it is NOT asked for a list of peers it is
* aware of. This is something you need to handle yourself!
* @extends EventEmitter
*/
class PeerServer extends EventEmitter {
	/**
	 * Whether the underlying TCP server is currently listening.
	 * @return {boolean} false if listen() has not created a server yet.
	 */
	get listening() {
		return this.server instanceof net.Server ? this.server.listening : false;
	}

	/**
	 * Creates a new PeerServer. No sockets are opened until listen() or
	 * add_peers() is called.
	 * @param {*} our_id      Our own id (stored as this.our_id).
	 * @param {*} our_name    Our own name (stored as this.our_name).
	 * @param {*} secret_join The join secret handed to Connection.Wrap() for incoming clients.
	 */
	constructor(our_id, our_name, secret_join) {
		super();
		this.our_id = our_id;
		this.our_name = our_name;
		this.secret_join = secret_join;
		// The number of retries when attempting to connect to a peer
		this.retries = 5;
		this.connected_peers = [];
		this.connecting_peers = [];
	}

	/**
	 * Starts the PeerServer listening on the given port and bind address.
	 * @param {Number} [port=5252] The port number to listen on.
	 * @param {String} [host="::"] The address to bind to.
	 * @return {Promise<void>} A Promise that resolves when the server setup is complete, or rejects if the server emits an error before it starts listening (e.g. the port is already in use).
	 */
	listen(port = 5252, host = "::") {
		return new Promise((resolve, reject) => {
			this.host = host;
			this.port = port;
			this.server = net.createServer((client) => {
				// A failed handshake with one client must not surface as an
				// unhandled rejection: log it and drop that client only.
				this.handle_client(client).catch((error) => {
					l.error(`Error while handling incoming connection from ${client.remoteAddress}:${client.remotePort}: ${error}`);
					if(!client.destroyed) client.destroy();
				});
			});
			// Until we are listening, an error means that startup failed:
			// report it via the returned Promise.
			this.server.once("error", reject);
			this.server.listen({
				host,
				port,
				exclusive: false
			}, () => {
				this.server.off("error", reject);
				// Only now attach the rethrowing handler. If it were attached
				// before listening, a startup error would reject the Promise
				// AND throw out of emit() at the same time.
				this.server.on("error", (error) => this.handle_error(error));
				resolve();
			});
		});
	}

	/**
	 * Handles an error emitted by the server once it is listening.
	 * The error is rethrown from inside the event listener.
	 * @param {Error} error The error emitted by the server.
	 * @throws {Error} Always rethrows the given error.
	 */
	handle_error(error) {
		throw error;
	}

	/**
	 * Handles a single incoming client connection: wraps the raw socket in a
	 * Connection, accepts it as a Peer, and registers that Peer.
	 * @param {net.Socket} client The socket of the newly connected client.
	 * @return {Promise<void>} Resolves once the peer has emitted "connect".
	 */
	async handle_client(client) {
		const peer = await Peer.Accept(this, await Connection.Wrap(this.secret_join, client));
		this.peer_initialise(peer);
		await once(peer, "connect");
	}

	/**
	 * Initialises a CONNECTED peer and registers it as a valid peer.
	 * This also includes attaching the necessary event handlers.
	 * @param {Peer} peer The peer in question.
	 * @return {void}
	 */
	peer_initialise(peer) {
		this.connected_peers.push(peer);
		peer.on("message", this.handle_message.bind(this, peer));
		peer.on("destroy", this.handle_destroy.bind(this, peer));
		l.log(`Peer ${peer.id_short} from ${peer.address}:${peer.port} connected (currently connected: ${this.peers_dedupe().map(peer_next => peer_next.id_short).join(", ")})`);
		this.emit("peer", peer);
	}

	/**
	 * Re-emits a message received from a peer, both as a generic "message"
	 * event and as a specific `message-<event_name>` event.
	 * @param {Peer}   peer       The peer that sent the message.
	 * @param {string} event_name The name of the event the peer sent.
	 * @param {Object} msg        The message payload.
	 * @return {Promise<void>}
	 */
	async handle_message(peer, event_name, msg) {
		this.emit("message", peer, event_name, msg);
		this.emit(`message-${event_name}`, peer, msg);
	}

	/**
	 * Unregisters a peer that has been destroyed and emits "disconnect".
	 * @param {Peer} peer The peer that was destroyed.
	 * @return {Promise<void>}
	 */
	async handle_destroy(peer) {
		const index = this.connected_peers.indexOf(peer);
		if(index > -1)
			this.connected_peers.splice(index, 1);
		this.emit("disconnect", {
			id: peer.id,
			id_short: peer.id_short,
			// Remember, this is the address:port of the connecting port, NOT the server we would connect to to re-establish a connection!
			remote: peer.remote_endpoint
		});
	}

	/**
	 * Returns all currently connected peers that have a usable address
	 * (a string) and port (a number).
	 * @return {Peer[]} A new array; mutating it does not affect the server.
	 */
	peers() {
		return this.connected_peers
			.filter(el => typeof el.address === "string" && typeof el.port === "number");
	}

	/**
	 * Like .peers(), but deduplicates peers by their ID.
	 * If several peers share an ID, the one registered last is kept.
	 * @return {Peer[]} The deduplicated list of peers.
	 */
	peers_dedupe() {
		const deduped = new Map();
		for(const peer of this.peers()) {
			deduped.set(peer.id, peer);
		}
		return Array.from(deduped.values());
	}

	/**
	 * Resolves a Peer id to the respective peer instance.
	 * @param {string|Peer} peer_id The peer ID to resolve as a string. If a Peer instance is passed instead, this is simply returned unchanged.
	 * @return {Peer|undefined} The Peer instance associated with the given peer id, or undefined if no connected peer has that id.
	 * @throws {Error} If peer_id is neither a string nor a Peer instance.
	 */
	peer_resolve(peer_id) {
		if(peer_id instanceof Peer) return peer_id;
		if(typeof peer_id !== "string")
			throw new Error(`Expected variable of type string or Peer, got variable of type ${typeof peer_id}`);
		return this.connected_peers.find(peer => peer.id === peer_id);
	}

	/**
	 * Resolves a list of peer ids (and potentially Peer instances) to a list
	 * of Peer instances.
	 * Any Peer instances passed are returned unchanged.
	 * @param {...string|Peer} peers The peer(s) to resolve.
	 * @return {Array<Peer|undefined>} A list of Peer instances, in the same order; ids that could not be resolved yield undefined.
	 */
	peers_resolve(...peers) {
		return peers.map(peer => this.peer_resolve(peer));
	}

	/**
	 * Processes a list of peers.
	 * New connections are established to any peers in the list to which
	 * we don't already have a connection.
	 * Note that this function does NOT connect to any other peers known to the
	 * peers in the list you've specified! You need to do this manually.
	 * Connection failures are logged and never reject the returned Promise.
	 * @param {...{address: string, port: number}} new_peers The list of new peers to process.
	 * @returns {Promise<Peer[]>} A list of new peers to which we have successfully established a connection.
	 */
	async add_peers(...new_peers) {
		return (await Promise.all(new_peers.map(
			async new_peer => {
				const result = await p_reflect(p_retry(async () => await this.__add_peer(
					new_peer.address,
					new_peer.port
				), {
					retries: this.retries,
					onFailedAttempt: (error) => {
						switch(error.code) {
							case "ECONNREFUSED":
								l.error(`Failed to connect to peer at ${new_peer.address}:${new_peer.port}.`);
								break;
							default: {
								let msg = `[attempt ${error.attemptNumber} / ${error.retriesLeft+error.attemptNumber}] Error while connecting to ${new_peer.address}:${new_peer.port}: ${error}`;
								if(settings.cli.verbose) msg += `\n${error.stack}`;
								l.error(msg);
								break;
							}
						}
					}
				}));
				if(result.isRejected) {
					l.error(`Failed to connect to ${new_peer.address}:${new_peer.port}: ${result.reason}`);
					return null;
				}
				return result.value;
			})
		// Skipped and failed connections yield null, so drop them here
		)).filter(peer => peer instanceof Peer);
	}

	/**
	 * Connects to a new peer and adds them to the pool of currently connected peers.
	 * Note: This does NOT automatically connect to all the peers known to the
	 * peer you're connecting to!
	 * You need to do this manually.
	 * Note that you should NOT use this function directly. Instead, use
	 * add_peers(), which supports multiple peers and also automatically
	 * retries on failure too.
	 * @throws {ErrorWrapper} Throws if the connection failed. This could be for a large number of different reasons, from an incorrect join secret from the remote to connection issues.
	 * @param {string} address The address to connect to.
	 * @param {number} port The port number to connect to.
	 * @return {Promise<Peer|null>} A Promise that resolves to the resulting Peer connection, or null if the connection wasn't attempted or was closed again as a duplicate.
	 */
	async __add_peer(address, port) {
		l.debug(`Attempting to connect to ${address}:${port}`);

		// If we're already connected, don't bother reconnecting again
		if(this.peers().some(el => el.address === address && el.port === port)) {
			l.log(`A connection is already open to ${address}:${port}, skipping`);
			return null;
		}
		// If we are attempting to connect to ourselves, then don't bother.
		// The port comparison is deliberately loose so that a numeric string
		// still matches the number we are listening on.
		if(["127.0.0.1", "::", "::1"].includes(address) && port == this.port) {
			l.log(`${address}:${port} is actually us, skipping`);
			return null;
		}

		const peer_string = `peer:${address}:${port}`;
		this.connecting_peers.push(peer_string);
		let peer = null;
		try {
			peer = await Peer.Initiate(this, address, port);
		}
		catch(error) {
			throw new ErrorWrapper(`Error: Failed to connect to peer.`, error);
		}
		finally {
			// Guard against -1: splice(-1, 1) would remove an unrelated entry
			const index = this.connecting_peers.indexOf(peer_string);
			if(index > -1) this.connecting_peers.splice(index, 1);
		}

		// If the peer didn't connect properly, ignore it
		if(peer === null) return null;

		// If the ID of the peer is equal to that of one of the existing peers,
		// close one of the two connections
		for(const existing_peer of this.peers()) {
			if(existing_peer.id !== peer.id) continue;
			// dedupe() is async and must be awaited: comparing the pending
			// Promise against the peer would never match.
			// If the new peer is the one that was reaped, don't continue
			if(await this.dedupe(peer, existing_peer) === peer)
				return null;
		}

		this.peer_initialise(peer);
		return peer;
	}

	/**
	 * Deduplicates 2 identical peers.
	 * This function ensures that deduplication does *not* result in 2 closed
	 * connections, and that 1 connection to the peer in question remains open.
	 * The peer with the lower conn_id is closed; on a tie peer_a is closed.
	 * @param {Peer} peer_a The first duplicate peer.
	 * @param {Peer} peer_b The second duplicate peer.
	 * @return {Promise<Peer|null>} A Promise that resolves when deduplication is complete. Resolves to the reaped peer, or null if the 2 peers do not share an id.
	 */
	async dedupe(peer_a, peer_b) {
		if(peer_a.id !== peer_b.id) return null;

		const reaped = peer_a.conn_id > peer_b.conn_id ? peer_b : peer_a;
		await this.remove_peers(reaped);
		return reaped;
	}

	/**
	 * Destroys the given peers in parallel.
	 * @param {...Peer} peers The peers to destroy.
	 * @return {Promise<void>} Resolves once every peer.destroy() has settled successfully.
	 */
	async remove_peers(...peers) {
		// No need to remove the peer from the list of connected peers, as
		// there's already a catch-all event handler attached to do that
		await Promise.all(peers.map(async peer => await peer.destroy()));
	}

	/**
	 * Sends a message to 1 or more peers.
	 * @param {string|Peer|string[]|Peer[]} peer_id Either the peer id or the peer itself to which we should send the message. May also be an array of arbitrarily mixed items - in which case the message will be sent to all the specified peers in parallel. The order which peers are messaged is undefined.
	 * @param {string} event_name The name of the event to send.
	 * @param {Object} msg The message itself to send.
	 * @return {Promise} A Promise that resolves (or potentially rejects) when the message has been sent.
	 * @throws {Error} Rejects if one of the given peer ids does not belong to a connected peer.
	 */
	async send(peer_id, event_name, msg) {
		const targets = Array.isArray(peer_id) ? peer_id : [ peer_id ];
		const resolved = targets.map(target => {
			const peer = this.peer_resolve(target);
			// Fail with a meaningful message rather than an opaque
			// "cannot read properties of undefined"
			if(peer == null)
				throw new Error(`Error: Can't send message to unknown peer ${target}`);
			return peer;
		});
		await Promise.all(resolved.map(
			peer => peer.send(event_name, msg)
		));
	}

	/**
	 * Sends a message in parallel to all peers to which we have an established
	 * connection.
	 * The order which peers are messaged is undefined.
	 * @param {string} event_name The name of the event to send.
	 * @param {Object} msg The message itself to send.
	 * @return {Promise} A Promise that resolves (or potentially rejects) when the message has been sent.
	 */
	async broadcast(event_name, msg) {
		await this.send(this.peers_dedupe(), event_name, msg);
	}

	/**
	 * Shuts the server that listens for new peer connections down.
	 * This does not disconnect any existing peers! Note however that a
	 * net.Server only finishes closing once all of its connections have ended,
	 * so the returned Promise stays pending while incoming peers are still
	 * connected.
	 * If listen() was never called, this resolves immediately.
	 * @return {Promise<void>} A Promise that resolves once the server has been shutdown.
	 */
	shutdown_server() {
		return new Promise((resolve, _reject) => {
			if(!this.server) {
				resolve();
				return;
			}
			// A "server not running" error from close() simply means there
			// is nothing left to shut down
			this.server.close(() => resolve());
		});
	}

	/**
	 * Stops the PeerServer and gracefully (if possible) disconnects all existing peers.
	 * @return {Promise<void>} A Promise that resolves once the shutdown process has completed.
	 */
	async destroy() {
		// Stop accepting new connections first, but do NOT await this on its
		// own: the server only finishes closing once its connections have
		// ended, which is what destroying the peers below achieves.
		const server_closed = this.shutdown_server();
		// Iterate over a snapshot, as the "destroy" handlers splice
		// this.connected_peers while we are going through it.
		const peers = [ ...this.connected_peers ];
		await Promise.all([
			server_closed,
			...peers.map(peer => peer.destroy())
		]);
	}
}
export default PeerServer;