Exchange peerlists

This commit is contained in:
Starbeamrainbowlabs 2022-01-09 20:28:58 +00:00
parent 61b830121e
commit 1091f9c3c2
Signed by: sbrl
GPG key ID: 1BE5172E637709C2
4 changed files with 65 additions and 11 deletions

View file

@ -34,16 +34,34 @@ class Agent {
} }
async init() { async init() {
///
// 1: Create our local environment
///
/** Our peer id - calculated automatically from the system's uuid */ /** Our peer id - calculated automatically from the system's uuid */
this.peer_id = await hostuuid(this.config.net.port); this.peer_id = await hostuuid(this.config.net.port);
this.peer_name = os.hostname(); this.peer_name = os.hostname();
///
// 2: Create the peer server
///
this.server = new PeerServer( this.server = new PeerServer(
this.peer_id, this.peer_id,
await this.find_secret() await this.find_secret()
); );
this.server.retries = this.config.net.peer_retries; this.server.retries = this.config.net.peer_retries;
///
// 3: Attach message handling listeners
///
this.server.on("message-peerlist-request", this.__send_peer_list.bind(this));
this.server.on("message-peerlist-response", this.__handle_peer_list.bind(this));
///
// 4: Start listening
///
l.log(`Starting peer listener....`); l.log(`Starting peer listener....`);
await this.server.listen( await this.server.listen(
this.config.net.port, this.config.net.port,
@ -51,6 +69,10 @@ class Agent {
); );
l.log(`Listening on ${this.config.net.bind_address}:${this.config.net.port}`); l.log(`Listening on ${this.config.net.bind_address}:${this.config.net.port}`);
///
// 5: Add initial peers to kick things off
///
await this.server.add_peers(...this.config.peers.map( await this.server.add_peers(...this.config.peers.map(
peer => parse_peer_name(peer) peer => parse_peer_name(peer)
)); ));
@ -58,6 +80,37 @@ class Agent {
l.log(`Added ${this.server.peers().length} / ${this.config.peers.length} initial peers`); l.log(`Added ${this.server.peers().length} / ${this.config.peers.length} initial peers`);
if(this.config.peers.length < 1) if(this.config.peers.length < 1)
l.warn(`No initial peers were specified! It's recommended that you specify at least 1 on every host.`); l.warn(`No initial peers were specified! It's recommended that you specify at least 1 on every host.`);
// Ask for more peers
await this.server.broadcast("peerlist-request", {});
}
/**
* Sends a list of known peers to the given peer.
* @param {Peer} peer The peer to send the list to.
* @return {Promise} A Promise that resolves whent he message has been sent.
*/
async __send_peer_list(peer, _msg) {
l.log(`Sending peer list to ${peer.id_short}`);
await peer.send("peerlist-response", {
peers: this.server.peers()
});
}
async __handle_peer_list(peer, msg) {
l.log(`Received peer list from ${peer.id_short}`);
if(!(msg.peers instanceof Array)) {
l.warn(`Encountered invalid peer list message from peer ${peer.id_short}.`);
return false;
}
let peerids_connected = this.server.peers().map(el => el.id);
let new_peers = await this.server.add_peers(...msg.peers
.filter(el => !peerids_connected.includes(el.id) && el.id !== this.peer_id)
.map(el => { return { address: el.listening_address, port: el.listening_port }; }));
if(new_peers.length > 0)
l.log(`Connected to ${new_peers.length} / ${msg.peers.length} peers from peerlist sent by ${peer.id_short}`);
else
l.log(`No new peers in peerlist sent by ${peer.id_short}`);
} }
} }

View file

@ -33,13 +33,6 @@ class Peer extends EventEmitter {
*/ */
this.connection = connection; this.connection = connection;
/**
* A list of other peers known to this peer.
* May or may not actually be up to date.
* @type {{address:string,port:number}[]}
*/
this.known_peers = [];
// If a message with the event name "end" is received, close our side // If a message with the event name "end" is received, close our side
// of the connection // of the connection
this.once(`message-end`, this.destroy); this.once(`message-end`, this.destroy);
@ -88,7 +81,8 @@ class Peer extends EventEmitter {
*/ */
__handle_hello(msg) { __handle_hello(msg) {
this.id = msg.id; this.id = msg.id;
this.known_peers = msg.peers; this.listening_address = msg.listening_address;
this.listening_port = msg.listening_port;
if(this.id === this.server.our_id) { if(this.id === this.server.our_id) {
l.warn(`Our id (${this.server.our_id}) is equal to that of the remote (${this.id}), killing connection`); l.warn(`Our id (${this.server.our_id}) is equal to that of the remote (${this.id}), killing connection`);
@ -111,7 +105,8 @@ class Peer extends EventEmitter {
async __send_hello() { async __send_hello() {
await this.send("hello", { await this.send("hello", {
id: this.server.our_id, id: this.server.our_id,
peers: this.server.peers() listening_address: this.server.host,
listening_port: this.server.port
}); });
} }

View file

@ -108,7 +108,9 @@ class PeerServer extends EventEmitter {
return this.connected_peers.map(peer => { return { return this.connected_peers.map(peer => { return {
address: peer.address, address: peer.address,
port: peer.port, port: peer.port,
id: peer.id id: peer.id,
listening_address: peer.listening_address,
listening_port: peer.listening_port
}}).filter(el => typeof el.address === "string" && typeof el.port === "number"); }}).filter(el => typeof el.address === "string" && typeof el.port === "number");
} }
@ -153,7 +155,7 @@ class PeerServer extends EventEmitter {
), { ), {
retries: this.retries, retries: this.retries,
onFailedAttempt: (error) => { onFailedAttempt: (error) => {
switch(error instanceof ErrorWrapper ? error.inner_exception.code : error.code) { switch(error.code) {
case "ECONNREFUSED": case "ECONNREFUSED":
l.error(`Failed to connect to peer at ${new_peer.address}:${new_peer.port}.`); l.error(`Failed to connect to peer at ${new_peer.address}:${new_peer.port}.`);
break; break;

View file

@ -1,6 +1,10 @@
"use strict"; "use strict";
class ErrorWrapper extends Error { class ErrorWrapper extends Error {
get code() {
return this.inner_exception.code;
}
constructor(message, inner_exception) { constructor(message, inner_exception) {
super(message); super(message);
this.inner_exception = inner_exception; this.inner_exception = inner_exception;