Implement simple peer system, but it's untested.
Next step is to exchange peers.
This commit is contained in:
parent
0b82e34273
commit
6f181971e3
4 changed files with 188 additions and 2 deletions
84
src/lib/agent/Peer.mjs
Normal file
84
src/lib/agent/Peer.mjs
Normal file
|
@ -0,0 +1,84 @@
|
||||||
|
"use strict";
|
||||||
|
|
||||||
|
import { EventEmitter } from 'events';
|
||||||
|
import Connection from '../transport/Connection.mjs';
|
||||||
|
|
||||||
|
class Peer extends EventEmitter {
|
||||||
|
get address() { return this.connection.address; }
|
||||||
|
get port() { return this.connection.port; }
|
||||||
|
get remote_endpoint() {
|
||||||
|
return { address: this.address, port: this.port };
|
||||||
|
}
|
||||||
|
|
||||||
|
constructor(server, connection) {
|
||||||
|
super();
|
||||||
|
|
||||||
|
this.id = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The parent server this Peer is part of.
|
||||||
|
* @type {PeerServer}
|
||||||
|
*/
|
||||||
|
this.server = server;
|
||||||
|
/**
|
||||||
|
* The underlying Connection.
|
||||||
|
* @type {Connection}
|
||||||
|
*/
|
||||||
|
this.connection = connection;
|
||||||
|
|
||||||
|
// Fetch the remote's id
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Accepts an existing connection as a new Peer.
|
||||||
|
* @param {Connection} connection The Connection to accept.
|
||||||
|
* @return {Promise} A Promise that resolves once the initial handshake is complete.
|
||||||
|
*/
|
||||||
|
__accept(connection) {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
this.connection = connection;
|
||||||
|
this.connection.on("message-request-id", (msg) => {
|
||||||
|
this.connection.id = msg.my_id;
|
||||||
|
this.connection.send("id", { my_id: this.server.our_id })
|
||||||
|
.then(resolve, reject);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initiates the handshake after opening a new connection.
|
||||||
|
* @return {Promise} A Promise that resolves after the initial peer handshake is complete.
|
||||||
|
*/
|
||||||
|
__initiate() {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
this.connection.send("request-id", { my_id: this.server.our_id })
|
||||||
|
.then(() => {
|
||||||
|
this.connection.once("message-id", (msg) => {
|
||||||
|
this.id = msg.my_id;
|
||||||
|
|
||||||
|
this.emit("connect");
|
||||||
|
resolve();
|
||||||
|
});
|
||||||
|
}, reject);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async destroy() {
|
||||||
|
await this.connection.destroy();
|
||||||
|
this.emit("destroy");
|
||||||
|
this.removeAllListeners();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Peer.Initiate = function(server, address, port) {
|
||||||
|
const conn = await Connection.Create(server.secret_join, address, port);
|
||||||
|
const peer = new Peer(server, conn);
|
||||||
|
await peer.__initiate();
|
||||||
|
}
|
||||||
|
|
||||||
|
Peer.Accept = function(server, connection) {
|
||||||
|
const peer = new Peer(server);
|
||||||
|
await peer.__accept(connection);
|
||||||
|
}
|
||||||
|
|
||||||
|
export default Peer;
|
|
@ -1,8 +1,99 @@
|
||||||
"use strict";
|
"use strict";
|
||||||
|
|
||||||
class PeerServer {
|
import { EventEmitter, once } from 'events';
|
||||||
constructor() {
|
import net from 'net';
|
||||||
|
|
||||||
|
import Connection from '../transport/Connection.mjs';
|
||||||
|
|
||||||
|
import Peer from './Peer.mjs';
|
||||||
|
|
||||||
|
class PeerServer extends EventEmitter {
|
||||||
|
constructor(our_id) {
|
||||||
|
super();
|
||||||
|
|
||||||
|
this.our_id = our_id;
|
||||||
|
|
||||||
|
this.peers = [];
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Starts the PeerServer listening on the given port and bind address.
|
||||||
|
* @param {Number} [port=5252] The port number to listen on.
|
||||||
|
* @param {String} [host="::"] The address to bind to.
|
||||||
|
* @return {Promise<void>} A Promise that resolves when the server setup is complete.
|
||||||
|
*/
|
||||||
|
listen(port = 5252, host="::") {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
this.server = net.createServer(async (client) => {
|
||||||
|
await this.handle_client(client);
|
||||||
|
});
|
||||||
|
|
||||||
|
this.server.once("error", reject);
|
||||||
|
this.server.on("error", this.handle_error);
|
||||||
|
this.server.listen({
|
||||||
|
host,
|
||||||
|
port,
|
||||||
|
exclusive: false
|
||||||
|
}, () => {
|
||||||
|
this.server.off("error", reject);
|
||||||
|
resolve();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
handle_error(error) {
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
|
||||||
|
async handle_client(client) {
|
||||||
|
const peer = new Peer(this, await Connection.Wrap(client));
|
||||||
|
this.peers.push(peer);
|
||||||
|
peer.on("message", this.handle_message.bind(this, peer));
|
||||||
|
peer.on("destroy", this.handle_destroy.bind(this, peer));
|
||||||
|
await once(peer, "connect");
|
||||||
|
}
|
||||||
|
|
||||||
|
async handle_message(peer, message) {
|
||||||
|
this.emit("message", peer, message);
|
||||||
|
this.emit(`message-${message.event}`, peer, message.message);
|
||||||
|
}
|
||||||
|
|
||||||
|
async handle_destroy(peer) {
|
||||||
|
const index = this.peers.indexOf(peer);
|
||||||
|
if(index > -1)
|
||||||
|
this.peers.splice(index, 1);
|
||||||
|
|
||||||
|
this.emit("disconnect", peer.remote_endpoint);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a list of all currently known peer addresses.
|
||||||
|
* @return {{address:string,port:number}[]}
|
||||||
|
*/
|
||||||
|
peer_addresses() {
|
||||||
|
return this.peers.map((peer) => peer.remote_endpoint)
|
||||||
|
.filter(el => typeof el.addr === "string" && typeof el.port === "number");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Shuts the server down.
|
||||||
|
* This does not disconnect any existing peers!
|
||||||
|
* @return {Promise} A Promise that resolves once the server has been shutdown.
|
||||||
|
*/
|
||||||
|
shutdown_server() {
|
||||||
|
return new Promise((resolve, _reject) => {
|
||||||
|
this.server.close(resolve);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stops the PeerServer and gracefully (if possible) disconnects all existing peers.
|
||||||
|
* @return {Promise} A Promise that resolves once the shutdown process has completed.
|
||||||
|
*/
|
||||||
|
async destroy() {
|
||||||
|
await this.shutdown_server();
|
||||||
|
await Promise.all(...this.peers.map(peer => peer.destroy()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
9
src/lib/raft/RaftSubsystem.mjs
Normal file
9
src/lib/raft/RaftSubsystem.mjs
Normal file
|
@ -0,0 +1,9 @@
|
||||||
|
"use strict";
|
||||||
|
|
||||||
|
class RaftAgent {
|
||||||
|
constructor() {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export default RaftAgent;
|
|
@ -124,6 +124,8 @@ class Connection extends EventEmitter {
|
||||||
* @type {void}
|
* @type {void}
|
||||||
*/
|
*/
|
||||||
this.emit("destroy");
|
this.emit("destroy");
|
||||||
|
|
||||||
|
this.removeAllListeners();
|
||||||
}
|
}
|
||||||
|
|
||||||
async handle_frame(bytes) {
|
async handle_frame(bytes) {
|
||||||
|
|
Loading…
Reference in a new issue