systemquery/src/lib/transport/Connection.mjs

208 lines
6.4 KiB
JavaScript

"use strict";
import crypto from 'crypto';
import net from 'net';
import { EventEmitter, once } from 'events';
import log from '../io/NamespacedLog.mjs'; const l = log("connection");
import settings from '../../settings.mjs';
import rekey from './rekey.mjs';
import FramedTransport from './FramedTransport.mjs';
import { write_safe } from '../io/StreamHelpers.mjs';
import { encrypt_bytes, decrypt_bytes } from '../crypto/secretbox.mjs';
/**
* Represents a connection to a single endpoint.
* @param {string} secret_join The shared join secret, encoded as base64
* @param {net.Socket?} socket Optional. A pre-existing socket to take over and manage.
*
* @event Connection#connect The initial connection setup is complete.
* @event Connection#rekey The session key has been re-exchanged.
* @event Connection#destroy The connection has been closed
* @event Connection#message A message has been received.
* @event Connection#message-EVENTNAME A message with a given event name has been received
*/
class Connection extends EventEmitter {
/**
* Whether this socket is actually connected or not.
* @return {bool}
*/
get connected() {
return this.framer == null ? false : this.framer.connected;
}
constructor(secret_join, socket = null) {
super();
if(typeof secret_join !== "string")
throw new Error(`Error: Expected secret_join to be of type string, but received variable of type ${typeof secret_join}`);
this.socket = socket;
this.rekey_last = null;
this.rekey_interval_base = 30 * 60 * 1000; // 30 minutes
this.rekey_interval = this.rekey_interval_base + crypto.randomInt(0, 15 * 60 * 1000);
this.rekey_in_progress = false;
this.session_key = Buffer.from(secret_join, "base64");
}
/**
* Connects to a peer and initialises a secure TCP connection thereto.
* @param {string} address The address to connect to.
* @param {string} port The TCP port to connect to.
* @return {net.Socket} A socket setup for secure communication.
*/
async connect(address, port) {
this.address = address; this.port = port;
this.socket = new net.Socket();
this.socket.connect({
address, port
});
this.socket.once("end", () => {
l.log(`${this.address}:${this.port} disconnected`);
});
await once(this.socket, "connect");
await this.init();
}
async init() {
this.address = this.socket.remoteAddress;
this.port = this.socket.remotePort;
this.socket.setKeepAlive(true);
this.framer = new FramedTransport(this.socket);
this.framer.on("frame", this.handle_frame.bind(this));
await this.rekey();
// We can await .init() or .connect() - this is just another optiom
/**
* The initial connection setup is complete.
* @event Connection#connect
* @type {void}
*/
this.emit(`connect`);
}
async rekey() {
try {
this.rekey_in_progress = true;
this.session_key = await rekey(this, this.session_key);
this.rekey_interval = this.rekey_interval_base + crypto.randomInt(0, 15 * 60 * 1000);
this.rekey_last = new Date();
/**
* The session key has been re-exchanged.
* @event Connection#rekey
* @type {void}
*/
this.emit("rekey");
}
catch(error) {
l.warn(`Error when rekeying connection ${this.address}:${this.port}, killing connection`, settings.cli.verbose ? error : error.message);
await this.destroy();
}
finally {
this.rekey_in_progress = false;
}
}
async destroy() {
l.debug(`Killing connection to ${this.address}:${this.port}`, new Error().stack);
if(this.framer instanceof FramedTransport)
await this.framer.destroy();
else {
await this.socket.end();
await this.socket.destroy();
}
/**
* The connection has been closed
* @event Connection#destroy
* @type {void}
*/
this.emit("destroy");
this.removeAllListeners();
}
async handle_frame(bytes) {
try {
// l.info(`FRAME length`, bytes.length, `frame`, bytes);
let decrypted = decrypt_bytes(this.session_key, bytes);
if(decrypted === null) {
l.warn(`Decryption of message failed`);
return;
}
await this.handle_message(decrypted.toString("utf-8"));
}
catch(error) {
l.warn(`Warning: Killing connection to ${this.address}:${this.port} after error:`, settings.cli.verbose ? error : error.message);
this.destroy();
}
}
async handle_message(msg_text) {
const msg = JSON.parse(msg_text);
l.debug(`RECEIVE:${msg.event}`, msg.message);
if(msg.event == "rekey" && !this.rekey_in_progress) {
// Set and forget here
this.rekey();
}
/**
* A message has been received.
* @event Connection#message
* @type {string,object} The name of the event, followed by the message content.
*/
this.emit("message", msg.event, msg.message);
/**
* A message with a specific event name has been received.
* @event Connection#message-EVENTNAME
* @type {object} The message content.
*/
this.emit(`message-${msg.event}`, msg.message);
}
async send(event, message) {
if(typeof event !== "string") throw new Error(`Error: Expected string for event name, but got value of type ${typeof event}.`);
l.debug(`SEND event`, event, `message`, message);
// Rekey at semi-regular intervals, but only if we're not already in the process of doing so
if(new Date() - this.rekey_last > this.rekey_interval && !this.rekey_in_progress)
await this.rekey();
// TODO: Consider anonymous TLS, with jpake for mututal authentication
// TODO: Consider https://devdocs.io/node/crypto#crypto.createCipheriv() - which lets us use any openssl ciphers we like - e.g. ChaCha20-Poly1305
// TODO: We're currently vulnerable to a replay attack. We need to mitigate this somehow - probably by maintaining a sequence number. Instead of sending the sequence number though we should instead compute a MAC that also includes the message length and a bunch of other things etc. Of course, we will also need to make sure we don't fall afoul of mac-then-encrypt, encrypt-then-mac, etc issues...
let payload = JSON.stringify({ event, message });
payload = encrypt_bytes(
this.session_key,
Buffer.from(payload, "utf-8")
);
return await this.framer.write(payload);
}
}
Connection.Wrap = async function(secret_join, socket) {
const socket_wrap = new Connection(secret_join, socket);
await socket_wrap.init();
return socket_wrap;
}
Connection.Create = async function(secret_join, address, port) {
const socket = new Connection(secret_join);
await socket.connect(address, port);
return socket;
}
export default Connection;