From 6f181971e367468728c52cac1967ba09c856e4f1 Mon Sep 17 00:00:00 2001 From: Starbeamrainbowlabs Date: Sat, 9 Oct 2021 18:00:54 +0100 Subject: [PATCH] Implement simple peer system, but it's untested. Next step is to exchange peers. --- src/lib/agent/Peer.mjs | 84 ++++++++++++++++++++++++++++ src/lib/agent/PeerServer.mjs | 95 +++++++++++++++++++++++++++++++- src/lib/raft/RaftSubsystem.mjs | 9 +++ src/lib/transport/Connection.mjs | 2 + 4 files changed, 188 insertions(+), 2 deletions(-) create mode 100644 src/lib/agent/Peer.mjs create mode 100644 src/lib/raft/RaftSubsystem.mjs diff --git a/src/lib/agent/Peer.mjs b/src/lib/agent/Peer.mjs new file mode 100644 index 0000000..a30ec6a --- /dev/null +++ b/src/lib/agent/Peer.mjs @@ -0,0 +1,84 @@ +"use strict"; + +import { EventEmitter } from 'events'; +import Connection from '../transport/Connection.mjs'; + +class Peer extends EventEmitter { + get address() { return this.connection.address; } + get port() { return this.connection.port; } + get remote_endpoint() { + return { address: this.address, port: this.port }; + } + + constructor(server, connection) { + super(); + + this.id = null; + + /** + * The parent server this Peer is part of. + * @type {PeerServer} + */ + this.server = server; + /** + * The underlying Connection. + * @type {Connection} + */ + this.connection = connection; + + // Fetch the remote's id + } + + /** + * Accepts an existing connection as a new Peer. + * @param {Connection} connection The Connection to accept. + * @return {Promise} A Promise that resolves once the initial handshake is complete. + */ + __accept(connection) { + return new Promise((resolve, reject) => { + this.connection = connection; + this.connection.on("message-request-id", (msg) => { + this.connection.id = msg.my_id; + this.connection.send("id", { my_id: this.server.our_id }) + .then(resolve, reject); + }); + }); + } + + /** + * Initiates the handshake after opening a new connection. + * @return {Promise} A Promise that resolves after the initial peer handshake is complete. + */ + __initiate() { + return new Promise((resolve, reject) => { + this.connection.send("request-id", { my_id: this.server.our_id }) + .then(() => { + this.connection.once("message-id", (msg) => { + this.id = msg.my_id; + + this.emit("connect"); + resolve(); + }); + }, reject); + }); + } + + async destroy() { + await this.connection.destroy(); + this.emit("destroy"); + this.removeAllListeners(); + } +} + +Peer.Initiate = function(server, address, port) { + const conn = await Connection.Create(server.secret_join, address, port); + const peer = new Peer(server, conn); + await peer.__initiate(); +} + +Peer.Accept = function(server, connection) { + const peer = new Peer(server); + await peer.__accept(connection); +} + +export default Peer; diff --git a/src/lib/agent/PeerServer.mjs b/src/lib/agent/PeerServer.mjs index 0ff1de3..5a4dac7 100644 --- a/src/lib/agent/PeerServer.mjs +++ b/src/lib/agent/PeerServer.mjs @@ -1,8 +1,99 @@ "use strict"; -class PeerServer { - constructor() { +import { EventEmitter, once } from 'events'; +import net from 'net'; + +import Connection from '../transport/Connection.mjs'; + +import Peer from './Peer.mjs'; + +class PeerServer extends EventEmitter { + constructor(our_id) { + super(); + this.our_id = our_id; + + this.peers = []; + } + + /** + * Starts the PeerServer listening on the given port and bind address. + * @param {Number} [port=5252] The port number to listen on. + * @param {String} [host="::"] The address to bind to. + * @return {Promise} A Promise that resolves when the server setup is complete. + */ + listen(port = 5252, host="::") { + return new Promise((resolve, reject) => { + this.server = net.createServer(async (client) => { + await this.handle_client(client); + }); + + this.server.once("error", reject); + this.server.on("error", this.handle_error); + this.server.listen({ + host, + port, + exclusive: false + }, () => { + this.server.off("error", reject); + resolve(); + }); + }); + } + + handle_error(error) { + throw error; + } + + async handle_client(client) { + const peer = new Peer(this, await Connection.Wrap(client)); + this.peers.push(peer); + peer.on("message", this.handle_message.bind(this, peer)); + peer.on("destroy", this.handle_destroy.bind(this, peer)); + await once(peer, "connect"); + } + + async handle_message(peer, message) { + this.emit("message", peer, message); + this.emit(`message-${message.event}`, peer, message.message); + } + + async handle_destroy(peer) { + const index = this.peers.indexOf(peer); + if(index > -1) + this.peers.splice(index, 1); + + this.emit("disconnect", peer.remote_endpoint); + } + + /** + * Returns a list of all currently known peer addresses. + * @return {{address:string,port:number}[]} + */ + peer_addresses() { + return this.peers.map((peer) => peer.remote_endpoint) + .filter(el => typeof el.addr === "string" && typeof el.port === "number"); + } + + + /** + * Shuts the server down. + * This does not disconnect any existing peers! + * @return {Promise} A Promise that resolves once the server has been shutdown. + */ + shutdown_server() { + return new Promise((resolve, _reject) => { + this.server.close(resolve); + }); + } + + /** + * Stops the PeerServer and gracefully (if possible) disconnects all existing peers. + * @return {Promise} A Promise that resolves once the shutdown process has completed. + */ + async destroy() { + await this.shutdown_server(); + await Promise.all(...this.peers.map(peer => peer.destroy())); } } diff --git a/src/lib/raft/RaftSubsystem.mjs b/src/lib/raft/RaftSubsystem.mjs new file mode 100644 index 0000000..8f9c932 --- /dev/null +++ b/src/lib/raft/RaftSubsystem.mjs @@ -0,0 +1,9 @@ +"use strict"; + +class RaftAgent { + constructor() { + + } +} + +export default RaftAgent; diff --git a/src/lib/transport/Connection.mjs b/src/lib/transport/Connection.mjs index c9a45b7..9554639 100644 --- a/src/lib/transport/Connection.mjs +++ b/src/lib/transport/Connection.mjs @@ -124,6 +124,8 @@ class Connection extends EventEmitter { * @type {void} */ this.emit("destroy"); + + this.removeAllListeners(); } async handle_frame(bytes) {