From 16f23e6b77ddae62a5d48e244861dcda3e5808cc Mon Sep 17 00:00:00 2001 From: Starbeamrainbowlabs Date: Sat, 22 Jan 2022 14:22:06 +0000 Subject: [PATCH] Agent: refactor such that it can be used as a base class In the future, this will make it much easier to refactor into a library. The plan is to allow Agent to have subsystems, but only those which are related to managing peer-to-peer message and not application-specific stuff. For example, wemight have a MeshRouting subsystem that listens for messages and conditionally forwards them based on the content of the message. --- src/lib/agent/Agent.mjs | 45 +++++++++++++++++++++++++++++++++++- src/lib/agent/PeerServer.mjs | 8 +++++-- 2 files changed, 50 insertions(+), 3 deletions(-) diff --git a/src/lib/agent/Agent.mjs b/src/lib/agent/Agent.mjs index 3373ce8..6d11589 100644 --- a/src/lib/agent/Agent.mjs +++ b/src/lib/agent/Agent.mjs @@ -3,6 +3,7 @@ import fs from 'fs'; import os from 'os'; import path from 'path'; +import { EventEmitter } from 'events'; import log from '../io/NamespacedLog.mjs'; const l = log("agent"); @@ -12,8 +13,10 @@ import PeerServer from './PeerServer.mjs'; import parse_peer_name from '../parse/peer_name.mjs'; import hostuuid from '../io/hostuuid.mjs'; -class Agent { +class Agent extends EventEmitter { constructor(config) { + super(); + this.config = config; } @@ -58,6 +61,8 @@ class Agent { this.server.on("message-peerlist-request", this.__send_peer_list.bind(this)); this.server.on("message-peerlist-response", this.__handle_peer_list.bind(this)); + this.server.on("message", this.handle_message.bind(this)); + this.server.on("destroy", this.handle_destroy.bind(this)); /// // 4: Start listening @@ -112,6 +117,44 @@ class Agent { else l.log(`No new peers in peerlist sent by ${peer.id_short}`); } + + handle_message(peer, event_name, msg) { + this.emit("message", peer, event_name, msg); + this.emit(`message-${event_name}`, peer, msg); + } + + handle_destroy(peer_info) { + const index = this.connected_peers.indexOf(peer); + if(index > -1) + this.connected_peers.splice(index, 1); + + // Remember, this is the address:port of the connecting port, NOT the server we would connect to to re-establish a connection! + l.log(`Peer ${peer_info.id_short} from ${peer_info.remote.address}:${peer_info.remote.port} disconnected`); + this.emit("disconnect", peer_info); + } + + /** + * Sends a message to 1 or more peers. + * @param {string|Peer|string[]|Peer[]} peer_id Either the peer id or the peer itself to which we should send the message. May also be an array of arbitrarily mixed items - in which case the message will be sent to all the specified peers in parallel. The order which peers are messaged is undefined. + * @param {string} event_name The name of the event to send. + * @param {Object} msg The message itself to send. + * @return {Promise} A Promise that resolves (or potentially rejects) when the message has been sent. + */ + async send(peer_id, event_name, msg) { + return await this.server.send(peer_id, event_name, msg); + } + + /** + * Sends a message in parallel to all peers to which we have an established + * connection. + * The order which peers are messaged is undefined. + * @param {string} event_name The name of the event to send. + * @param {Object} msg The message itself to send. + * @return {Promise} A Promise that resolves (or potentially rejects) when the message has been sent. + */ + async broadcast(event_name, msg) { + return await this.server.broadcast(event_name, msg); + } } export default Agent; diff --git a/src/lib/agent/PeerServer.mjs b/src/lib/agent/PeerServer.mjs index 9d3168f..8b6c3c9 100644 --- a/src/lib/agent/PeerServer.mjs +++ b/src/lib/agent/PeerServer.mjs @@ -96,8 +96,12 @@ class PeerServer extends EventEmitter { if(index > -1) this.connected_peers.splice(index, 1); - l.log(`Peer ${peer.id_short} from ${peer.address}:${peer.port} disconnected`); - this.emit("disconnect", peer.remote_endpoint); + this.emit("disconnect", { + id: peer.id, + id_short: peer.id_short, + // Remember, this is the address:port of the connecting port, NOT the server we would connect to to re-establish a connection! + remote: peer.remote_endpoint + }); } /**