Starbeamrainbowlabs
16f23e6b77
In the future, this will make it much easier to refactor into a library. The plan is to allow Agent to have subsystems, but only those which are related to managing peer-to-peer message and not application-specific stuff. For example, wemight have a MeshRouting subsystem that listens for messages and conditionally forwards them based on the content of the message.
160 lines
5.1 KiB
JavaScript
160 lines
5.1 KiB
JavaScript
"use strict";
|
|
|
|
import fs from 'fs';
|
|
import os from 'os';
|
|
import path from 'path';
|
|
import { EventEmitter } from 'events';
|
|
|
|
import log from '../io/NamespacedLog.mjs'; const l = log("agent");
|
|
|
|
import settings from '../../settings.mjs';
|
|
|
|
import PeerServer from './PeerServer.mjs';
|
|
import parse_peer_name from '../parse/peer_name.mjs';
|
|
import hostuuid from '../io/hostuuid.mjs';
|
|
|
|
class Agent extends EventEmitter {
|
|
constructor(config) {
|
|
super();
|
|
|
|
this.config = config;
|
|
}
|
|
|
|
async find_secret() {
|
|
if(this.config.secret_join_filepath !== "CHANGE_ME") {
|
|
let filepath = path.resolve(
|
|
path.dirname(settings.cli.config),
|
|
this.config.secret_join_filepath
|
|
);
|
|
if(fs.existsSync(filepath))
|
|
return await fs.promises.readFile(filepath, "utf-8");
|
|
}
|
|
|
|
if(this.config.secret_join !== "CHANGE_ME")
|
|
return this.config.secret_join;
|
|
|
|
throw new Error(`Error: Failed to find join secret.`);
|
|
}
|
|
|
|
async init() {
|
|
///
|
|
// 1: Create our local environment
|
|
///
|
|
/** Our peer id - calculated automatically from the system's uuid */
|
|
this.peer_id = await hostuuid(this.config.net.port);
|
|
this.peer_name = os.hostname();
|
|
|
|
|
|
///
|
|
// 2: Create the peer server
|
|
///
|
|
this.server = new PeerServer(
|
|
this.peer_id,
|
|
await this.find_secret()
|
|
);
|
|
this.server.retries = this.config.net.peer_retries;
|
|
|
|
|
|
///
|
|
// 3: Attach message handling listeners
|
|
///
|
|
this.server.on("message-peerlist-request", this.__send_peer_list.bind(this));
|
|
this.server.on("message-peerlist-response", this.__handle_peer_list.bind(this));
|
|
|
|
this.server.on("message", this.handle_message.bind(this));
|
|
this.server.on("destroy", this.handle_destroy.bind(this));
|
|
|
|
///
|
|
// 4: Start listening
|
|
///
|
|
l.log(`Starting peer listener....`);
|
|
await this.server.listen(
|
|
this.config.net.port,
|
|
this.config.net.bind_address
|
|
);
|
|
l.log(`Listening on ${this.config.net.bind_address}:${this.config.net.port}`);
|
|
|
|
|
|
///
|
|
// 5: Add initial peers to kick things off
|
|
///
|
|
await this.server.add_peers(...this.config.peers.map(
|
|
peer => parse_peer_name(peer)
|
|
));
|
|
|
|
l.log(`Added ${this.server.peers().length} / ${this.config.peers.length} initial peers`);
|
|
if(this.config.peers.length < 1)
|
|
l.warn(`No initial peers were specified! It's recommended that you specify at least 1 on every host.`);
|
|
|
|
// Ask for more peers
|
|
await this.server.broadcast("peerlist-request", {});
|
|
}
|
|
|
|
/**
|
|
* Sends a list of known peers to the given peer.
|
|
* @param {Peer} peer The peer to send the list to.
|
|
* @return {Promise} A Promise that resolves whent he message has been sent.
|
|
*/
|
|
async __send_peer_list(peer, _msg) {
|
|
l.log(`Sending peer list to ${peer.id_short}`);
|
|
await peer.send("peerlist-response", {
|
|
peers: this.server.peers()
|
|
});
|
|
}
|
|
|
|
async __handle_peer_list(peer, msg) {
|
|
l.log(`Received peer list from ${peer.id_short}`);
|
|
if(!(msg.peers instanceof Array)) {
|
|
l.warn(`Encountered invalid peer list message from peer ${peer.id_short}.`);
|
|
return false;
|
|
}
|
|
let peerids_connected = this.server.peers().map(el => el.id);
|
|
let new_peers = await this.server.add_peers(...msg.peers
|
|
.filter(el => !peerids_connected.includes(el.id) && el.id !== this.peer_id)
|
|
.map(el => { return { address: el.listening_address, port: el.listening_port }; }));
|
|
if(new_peers.length > 0)
|
|
l.log(`Connected to ${new_peers.length} / ${msg.peers.length} peers from peerlist sent by ${peer.id_short}`);
|
|
else
|
|
l.log(`No new peers in peerlist sent by ${peer.id_short}`);
|
|
}
|
|
|
|
handle_message(peer, event_name, msg) {
|
|
this.emit("message", peer, event_name, msg);
|
|
this.emit(`message-${event_name}`, peer, msg);
|
|
}
|
|
|
|
handle_destroy(peer_info) {
|
|
const index = this.connected_peers.indexOf(peer);
|
|
if(index > -1)
|
|
this.connected_peers.splice(index, 1);
|
|
|
|
// Remember, this is the address:port of the connecting port, NOT the server we would connect to to re-establish a connection!
|
|
l.log(`Peer ${peer_info.id_short} from ${peer_info.remote.address}:${peer_info.remote.port} disconnected`);
|
|
this.emit("disconnect", peer_info);
|
|
}
|
|
|
|
/**
|
|
* Sends a message to 1 or more peers.
|
|
* @param {string|Peer|string[]|Peer[]} peer_id Either the peer id or the peer itself to which we should send the message. May also be an array of arbitrarily mixed items - in which case the message will be sent to all the specified peers in parallel. The order which peers are messaged is undefined.
|
|
* @param {string} event_name The name of the event to send.
|
|
* @param {Object} msg The message itself to send.
|
|
* @return {Promise} A Promise that resolves (or potentially rejects) when the message has been sent.
|
|
*/
|
|
async send(peer_id, event_name, msg) {
|
|
return await this.server.send(peer_id, event_name, msg);
|
|
}
|
|
|
|
/**
|
|
* Sends a message in parallel to all peers to which we have an established
|
|
* connection.
|
|
* The order which peers are messaged is undefined.
|
|
* @param {string} event_name The name of the event to send.
|
|
* @param {Object} msg The message itself to send.
|
|
* @return {Promise} A Promise that resolves (or potentially rejects) when the message has been sent.
|
|
*/
|
|
async broadcast(event_name, msg) {
|
|
return await this.server.broadcast(event_name, msg);
|
|
}
|
|
}
|
|
|
|
export default Agent;
|