Manage duplicate connections more effectively.
Thebroadcast function now transparently avoids sending the same message to the same peer over multiple connections at once.
This commit is contained in:
parent
e4a381b915
commit
da33a146e2
4 changed files with 84 additions and 17 deletions
|
@ -12,6 +12,26 @@ import ItemQueue from './async/ItemQueue.mjs';
|
||||||
|
|
||||||
|
|
||||||
class SystemQuery extends EventEmitter {
|
class SystemQuery extends EventEmitter {
|
||||||
|
/**
|
||||||
|
* Returns an object representing our local information that looks and
|
||||||
|
* even quacks a bit like a real Peer object, but isn't.
|
||||||
|
* @return {Object}
|
||||||
|
*/
|
||||||
|
get peer_local() {
|
||||||
|
return {
|
||||||
|
id: this.agent.peer_id,
|
||||||
|
id_short: (this.agent.peer_id || "").substring(0, 7),
|
||||||
|
name: this.agent.peer_name,
|
||||||
|
address: this.agent.local_bind_address,
|
||||||
|
port: this.agent.local_port,
|
||||||
|
self: true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// TODO: Handle duplicate connections better by both skipping counting them here, and also implementing a more robust reaper for killing duplicate connections that always kills the newest/oldest connection to avoid issues.
|
||||||
|
|
||||||
|
|
||||||
constructor(config, mode = "agent") {
|
constructor(config, mode = "agent") {
|
||||||
super();
|
super();
|
||||||
|
|
||||||
|
@ -24,6 +44,10 @@ class SystemQuery extends EventEmitter {
|
||||||
this.http = new HttpSubsystem(this);
|
this.http = new HttpSubsystem(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialises the SystemQuery agent.
|
||||||
|
* @return {Promise} A Promise that resolves when initialisation is complete.
|
||||||
|
*/
|
||||||
async init() {
|
async init() {
|
||||||
///
|
///
|
||||||
// 1: Create agent
|
// 1: Create agent
|
||||||
|
@ -66,6 +90,17 @@ class SystemQuery extends EventEmitter {
|
||||||
return await once(this.agent, `message-${event_name}`, { signal: ac });
|
return await once(this.agent, `message-${event_name}`, { signal: ac });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fetches the table with the given name from all peers in the swarm
|
||||||
|
* (including this local instance).
|
||||||
|
* While this function iteratively yields tables as they are received, it
|
||||||
|
* will wait until all peers respond before finally ending.
|
||||||
|
* If 1 of more peers do not respond, then if no answers are received
|
||||||
|
* within the time limit specified in the config object (supplied at
|
||||||
|
* startup) this function will return.
|
||||||
|
* @param {string} name The name of the table to fetch.
|
||||||
|
* @return {AsyncGenerator<Object>} An asynchronous generator that yields the tables as they are received.
|
||||||
|
*/
|
||||||
async *fetch_table(name) {
|
async *fetch_table(name) {
|
||||||
// If it isn't valid for us, it ain't gonna be valid for anyone else....
|
// If it isn't valid for us, it ain't gonna be valid for anyone else....
|
||||||
if(!this.info.is_valid_table(name)) return null;
|
if(!this.info.is_valid_table(name)) return null;
|
||||||
|
@ -73,8 +108,7 @@ class SystemQuery extends EventEmitter {
|
||||||
const queue = new ItemQueue();
|
const queue = new ItemQueue();
|
||||||
|
|
||||||
const handle_response = (peer, msg) => {
|
const handle_response = (peer, msg) => {
|
||||||
// TODO: Validate response. Is it the right table? Is it even a table? Note that multiple fetch_table calls may be running in parallel, so we should not make too much of a fuss if we get the wrong table by accident.
|
// Note that multiple fetch_table calls may be running in parallel, so we should not make too much of a fuss if we get the wrong table by accident.
|
||||||
// TODO: It would be seriously cool to have fetch_table() be an async generator that yields pairs of peer ids and tables as they come in.
|
|
||||||
if(typeof msg !== "object"
|
if(typeof msg !== "object"
|
||||||
|| typeof msg.table !== "object"
|
|| typeof msg.table !== "object"
|
||||||
|| typeof msg.name !== "string") {
|
|| typeof msg.name !== "string") {
|
||||||
|
@ -96,8 +130,12 @@ class SystemQuery extends EventEmitter {
|
||||||
this.agent.broadcast(`query`, { name });
|
this.agent.broadcast(`query`, { name });
|
||||||
|
|
||||||
|
|
||||||
// TODO: Yield a table value for us too
|
// Yield our result first
|
||||||
// TODO: Handle duplicate connections better by both skipping counting them here, and also implementing a more robust reaper for killing duplicate connections that always kills the newest/oldest connection to avoid issues.
|
let table_ours = await this.info.fetch_table(name);
|
||||||
|
yield {
|
||||||
|
peer: this.peer_local,
|
||||||
|
table: table_ours
|
||||||
|
};
|
||||||
|
|
||||||
let peers_seen = [];
|
let peers_seen = [];
|
||||||
while(peers_seen.length < this.agent.connected_peers.length) {
|
while(peers_seen.length < this.agent.connected_peers.length) {
|
||||||
|
|
|
@ -14,6 +14,9 @@ import parse_peer_name from '../parse/peer_name.mjs';
|
||||||
import hostuuid from '../io/hostuuid.mjs';
|
import hostuuid from '../io/hostuuid.mjs';
|
||||||
|
|
||||||
class Agent extends EventEmitter {
|
class Agent extends EventEmitter {
|
||||||
|
get local_port() { return this.config.net.port; }
|
||||||
|
get local_bind_address() { return this.config.net.bind_address; }
|
||||||
|
|
||||||
get connected_peers() { return this.server.connected_peers; }
|
get connected_peers() { return this.server.connected_peers; }
|
||||||
get connecting_peers() { return this.server.connecting_peers; }
|
get connecting_peers() { return this.server.connecting_peers; }
|
||||||
|
|
||||||
|
@ -96,12 +99,12 @@ class Agent extends EventEmitter {
|
||||||
/**
|
/**
|
||||||
* Sends a list of known peers to the given peer.
|
* Sends a list of known peers to the given peer.
|
||||||
* @param {Peer} peer The peer to send the list to.
|
* @param {Peer} peer The peer to send the list to.
|
||||||
* @return {Promise} A Promise that resolves whent he message has been sent.
|
* @return {Promise} A Promise that resolves when the message has been sent.
|
||||||
*/
|
*/
|
||||||
async __send_peer_list(peer, _msg) {
|
async __send_peer_list(peer, _msg) {
|
||||||
l.log(`Sending peer list to ${peer.id_short}`);
|
l.log(`Sending peer list to ${peer.id_short}`);
|
||||||
await peer.send("peerlist-response", {
|
await peer.send("peerlist-response", {
|
||||||
peers: this.server.peers()
|
peers: this.server.peers().map(peer_next => peer_next.info)
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,6 +17,16 @@ class Peer extends EventEmitter {
|
||||||
return this.id.substring(0, 7);
|
return this.id.substring(0, 7);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
get info() {
|
||||||
|
return {
|
||||||
|
address: this.address,
|
||||||
|
port: this.port,
|
||||||
|
id: this.id,
|
||||||
|
listening_address: this.listening_address,
|
||||||
|
listening_port: this.listening_port
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
constructor(server, connection) {
|
constructor(server, connection) {
|
||||||
super();
|
super();
|
||||||
|
|
||||||
|
|
|
@ -82,7 +82,7 @@ class PeerServer extends EventEmitter {
|
||||||
peer.on("message", this.handle_message.bind(this, peer));
|
peer.on("message", this.handle_message.bind(this, peer));
|
||||||
peer.on("destroy", this.handle_destroy.bind(this, peer));
|
peer.on("destroy", this.handle_destroy.bind(this, peer));
|
||||||
|
|
||||||
l.log(`Peer ${peer.id_short} from ${peer.address}:${peer.port} connected`);
|
l.log(`Peer ${peer.id_short} from ${peer.address}:${peer.port} connected (currently connected: ${this.peers_dedupe().map(peer_next => peer_next.id_short).join(", ")})`);
|
||||||
this.emit("peer", peer);
|
this.emit("peer", peer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -109,13 +109,22 @@ class PeerServer extends EventEmitter {
|
||||||
* @return {{address:string,port:number}[]}
|
* @return {{address:string,port:number}[]}
|
||||||
*/
|
*/
|
||||||
peers() {
|
peers() {
|
||||||
return this.connected_peers.map(peer => { return {
|
return this.connected_peers
|
||||||
address: peer.address,
|
.filter(el => typeof el.address === "string" && typeof el.port === "number");
|
||||||
port: peer.port,
|
}
|
||||||
id: peer.id,
|
|
||||||
listening_address: peer.listening_address,
|
/**
|
||||||
listening_port: peer.listening_port
|
* Like .peers(), but deduplicates peers by their ID.
|
||||||
}}).filter(el => typeof el.address === "string" && typeof el.port === "number");
|
* @return {Object[]} [description]
|
||||||
|
*/
|
||||||
|
peers_dedupe() {
|
||||||
|
let deduped = new Map();
|
||||||
|
|
||||||
|
for(let peer of this.peers()) {
|
||||||
|
deduped.set(peer.id, peer);
|
||||||
|
}
|
||||||
|
|
||||||
|
return Array.from(deduped.values());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -125,6 +134,9 @@ class PeerServer extends EventEmitter {
|
||||||
*/
|
*/
|
||||||
peer_resolve(peer_id) {
|
peer_resolve(peer_id) {
|
||||||
if(peer_id instanceof Peer) return peer_id;
|
if(peer_id instanceof Peer) return peer_id;
|
||||||
|
if(typeof peer_id !== "string")
|
||||||
|
throw new Error(`Expected variable of type string or Peer, got variable of type ${typeof peer_id}`);
|
||||||
|
|
||||||
for (let peer of this.connected_peers) {
|
for (let peer of this.connected_peers) {
|
||||||
if(peer.id === peer_id) return peer;
|
if(peer.id === peer_id) return peer;
|
||||||
}
|
}
|
||||||
|
@ -138,7 +150,7 @@ class PeerServer extends EventEmitter {
|
||||||
* @return {Peer[]} A list of Peer instances.
|
* @return {Peer[]} A list of Peer instances.
|
||||||
*/
|
*/
|
||||||
peers_resolve(...peers) {
|
peers_resolve(...peers) {
|
||||||
return peers.map(this.peer_resolve);
|
return peers.map(peer => this.peer_resolve(peer));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -228,8 +240,11 @@ class PeerServer extends EventEmitter {
|
||||||
// connection
|
// connection
|
||||||
if(this.peers().some(existing_peer => existing_peer.id === peer.id)) {
|
if(this.peers().some(existing_peer => existing_peer.id === peer.id)) {
|
||||||
l.info(`Closing duplicate connection to ${peer.id_short}`);
|
l.info(`Closing duplicate connection to ${peer.id_short}`);
|
||||||
this.remove_peers(peer);
|
await this.remove_peers(peer);
|
||||||
}
|
}
|
||||||
|
if(this.connecting_peers.filter(other_peer => other_peer.id !== null)
|
||||||
|
.some(other_peer => other_peer.id === peer.id))
|
||||||
|
await this.remove_peers(peer);
|
||||||
|
|
||||||
this.peer_initialise(peer);
|
this.peer_initialise(peer);
|
||||||
return peer;
|
return peer;
|
||||||
|
@ -265,7 +280,8 @@ class PeerServer extends EventEmitter {
|
||||||
* @return {Promise} A Promise that resolves (or potentially rejects) when the message has been sent.
|
* @return {Promise} A Promise that resolves (or potentially rejects) when the message has been sent.
|
||||||
*/
|
*/
|
||||||
async broadcast(event_name, msg) {
|
async broadcast(event_name, msg) {
|
||||||
await this.send(this.connected_peers, event_name, msg);
|
let peers = this.peers_dedupe();
|
||||||
|
await this.send(peers, event_name, msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in a new issue