systemquery/src/lib/agent/PeerServer.mjs

151 lines
4.5 KiB
JavaScript
Raw Normal View History

"use strict";
import { EventEmitter, once } from 'events';
import net from 'net';
import Connection from '../transport/Connection.mjs';
import ErrorWrapper from '../core/ErrorWrapper.mjs';
import Peer from './Peer.mjs';
/**
 * Accepts inbound peer connections, initiates outbound ones, and keeps track
 * of the pool of peers we are currently connected to.
 *
 * Events emitted by this class:
 *  - "message" (peer, message): for every message received from an accepted peer.
 *  - "message-<event>" (peer, payload): same message, keyed by `message.event`.
 *  - "disconnect" (remote_endpoint): when an accepted peer is destroyed.
 *  - "peer" (peer): when an outbound connection made via add_peer() succeeds.
 * @extends EventEmitter
 */
class PeerServer extends EventEmitter {
	/**
	 * Creates a new PeerServer. Does not start listening - call listen() for that.
	 * @param {*} our_id      Our own identifier (stored as-is; not interpreted in this class).
	 * @param {*} secret_join The join secret handed to Connection.Wrap() for inbound connections.
	 */
	constructor(our_id, secret_join) {
		super();
		
		this.our_id = our_id;
		this.secret_join = secret_join;
		
		/** Peers we currently hold a connection to. */
		this.connected_peers = [];
		/** "peer:<address>:<port>" strings for outbound connections still in progress. */
		this.connecting_peers = [];
	}
	
	/**
	 * Starts the PeerServer listening on the given port and bind address.
	 * @param	{Number}	[port=5252]	The port number to listen on.
	 * @param	{String}	[host="::"]	The address to bind to.
	 * @return	{Promise<void>}	A Promise that resolves when the server setup is complete, or rejects if the server fails to start listening.
	 */
	listen(port = 5252, host="::") {
		return new Promise((resolve, reject) => {
			// NOTE(review): a rejection from handle_client() is not caught here,
			// so a failed handshake surfaces as an unhandled promise rejection.
			this.server = net.createServer(async (client) => {
				await this.handle_client(client);
			});
			
			// While starting up, errors are reported through the returned Promise.
			this.server.once("error", reject);
			this.server.listen({
				host,
				port,
				exclusive: false
			}, () => {
				this.server.off("error", reject);
				// Only install the throwing handler once we're listening.
				// Registering it earlier made a startup failure (e.g. EADDRINUSE)
				// both reject the Promise AND throw an uncaught exception.
				this.server.on("error", this.handle_error);
				resolve();
			});
		});
	}
	
	/**
	 * Handles errors emitted by the underlying server after startup by rethrowing them.
	 * @param	{Error}	error	The error emitted by the server.
	 * @throws	{Error}	Always rethrows the given error.
	 */
	handle_error(error) {
		throw error;
	}
	
	/**
	 * Handles a new inbound connection: wraps the socket, accepts it as a Peer,
	 * registers it in the pool, and waits for it to emit "connect".
	 * @param	{net.Socket}	client	The raw socket of the newly connected client.
	 * @return	{Promise<void>}
	 */
	async handle_client(client) {
		const peer = await Peer.Accept(this, await Connection.Wrap(this.secret_join, client));
		this.connected_peers.push(peer);
		
		peer.on("message", this.handle_message.bind(this, peer));
		peer.on("destroy", this.handle_destroy.bind(this, peer));
		
		await once(peer, "connect");
	}
	
	/**
	 * Re-emits a message received from a peer, both generically ("message")
	 * and under an event-specific name ("message-<event>").
	 * @param	{Peer}		peer	The peer the message came from.
	 * @param	{Object}	message	The message; its `event` and `message` properties are read.
	 */
	async handle_message(peer, message) {
		this.emit("message", peer, message);
		this.emit(`message-${message.event}`, peer, message.message);
	}
	
	/**
	 * Removes a destroyed peer from the pool and announces the disconnection.
	 * @param	{Peer}	peer	The peer that was destroyed.
	 */
	async handle_destroy(peer) {
		const index = this.connected_peers.indexOf(peer);
		if(index > -1)
			this.connected_peers.splice(index, 1);
		
		this.emit("disconnect", peer.remote_endpoint);
	}
	
	/**
	 * Returns a list of all currently known peer addresses.
	 * Peers whose remote endpoint lacks a string address or a numeric port are omitted.
	 * @return	{{address:string,port:number}[]}
	 */
	peers() {
		// Uses `address` (not `addr`) to agree with the documented return shape
		// and with the duplicate check in add_peer().
		return this.connected_peers.map((peer) => peer.remote_endpoint)
			.filter(el => typeof el.address === "string" && typeof el.port === "number");
	}
	
	/**
	 * Processes a list of peers.
	 * New connections are established to any peers in the list to which
	 * we don't already have a connection.
	 * Note that this function does NOT connect to any other peers known to the
	 * peers in the list you've specified! You need to do this manually.
	 * Note also that the connection attempts run in parallel via Promise.all(),
	 * so a single failed attempt rejects the whole call.
	 * @param	{...{address: string, port: number}}	new_peers	The list of new peers to process.
	 * @returns	{Promise<Peer[]>}	A list of new peers to which we have successfully established a connection.
	 */
	async add_peers(...new_peers) {
		const results = await Promise.all(new_peers.map(
			(new_peer) => this.add_peer(new_peer.address, new_peer.port)
		));
		// add_peer() yields null for peers we're already connected to - drop those.
		return results.filter(peer => peer instanceof Peer);
	}
	
	/**
	 * Connects to a new peer and adds them to the pool of currently connected peers.
	 * Note: This does NOT automatically connect to all the peers known to the
	 * peer you're connecting to!
	 * You need to do this manually.
	 * @throws	{ErrorWrapper}	Throws if the connection failed. This could be for a large number of different reasons, from an incorrect join secret from the remote to connection issues.
	 * @param	{string}	address	The address to connect to.
	 * @param	{number}	port	The port number to connect to.
	 * @return	{Promise<Peer|null>}	A Promise that resolves to the resulting Peer connection, or null if the connection wasn't attempted.
	 */
	async add_peer(address, port) {
		// Already connected? Then there's nothing to do.
		if(this.peers().some(el => el.address === address && el.port === port))
			return null;
		
		const peer_string = `peer:${address}:${port}`;
		this.connecting_peers.push(peer_string);
		
		let conn;
		try {
			conn = await Peer.Initiate(this, address, port);
			this.connected_peers.push(conn);
		}
		catch(error) {
			throw new ErrorWrapper(`Error: Failed to connect to peer.`, error);
		}
		finally {
			// Remove exactly our own entry. splice() without a delete count
			// would also discard every entry after it (i.e. other attempts
			// that are still in progress).
			const index = this.connecting_peers.indexOf(peer_string);
			if(index > -1)
				this.connecting_peers.splice(index, 1);
		}
		
		this.emit(`peer`, conn);
		return conn;
	}
	
	/**
	 * Shuts the server down.
	 * This does not disconnect any existing peers!
	 * NOTE(review): the close callback's argument is passed straight to
	 * resolve(), so an error reported by close() resolves (not rejects) the Promise.
	 * @return	{Promise}	A Promise that resolves once the server has been shutdown.
	 */
	shutdown_server() {
		return new Promise((resolve) => {
			this.server.close(resolve);
		});
	}
	
	/**
	 * Stops the PeerServer and gracefully (if possible) disconnects all existing peers.
	 * @return	{Promise}	A Promise that resolves once the shutdown process has completed.
	 */
	async destroy() {
		await this.shutdown_server();
		// Promise.all() takes a single iterable - spreading the promises into
		// separate arguments made it reject with a TypeError.
		// Iterate over a copy, since destroying a peer may remove it from
		// connected_peers (see handle_destroy) while we're still mapping.
		await Promise.all([...this.connected_peers].map(peer => peer.destroy()));
	}
}
export default PeerServer;