systemquery/src/lib/transport/Connection.mjs

137 lines
3.8 KiB
JavaScript
Raw Normal View History

"use strict";
2021-10-02 16:34:15 +00:00
import crypto from 'crypto';
import net from 'net';
import { EventEmitter, once } from 'events';
import l from 'log';
import settings from '../../settings.mjs';
import rekey from './rekey.mjs';
import FramedTransport from './FramedTransport.mjs';
import { write_safe } from '../io/StreamHelpers.mjs';
import { encrypt_bytes, decrypt_bytes } from '../crypto/secretbox.mjs';
/**
 * Represents a connection to a single endpoint.
 *
 * Messages are JSON objects of the form `{ event, message }` that are
 * encrypted with the current session key and exchanged via a FramedTransport
 * wrapped around the underlying socket.
 *
 * Events emitted by this class:
 *  - `rekey`: The session key was successfully replaced.
 *  - `destroy`: The connection was torn down.
 *  - `message` (event, message): Any message was received.
 *  - `message-<event>` (message): A message with the given event name was received.
 */
class Connection extends EventEmitter {
	/**
	 * Creates a new connection object. Call .init() (for an existing socket)
	 * or .connect() (to open a new one) before sending any messages.
	 * @param	{string}		secret_join	The base64-encoded shared secret that is used as the initial session key.
	 * @param	{net.Socket?}	socket		Optional. An existing socket to use.
	 */
	constructor(secret_join, socket) {
		super();
		
		this.socket = socket;
		this.rekey_last = null;
		this.rekey_interval_base = 30 * 60 * 1000; // 30 minutes
		// Add up to 15 minutes of random jitter to the rekeying interval
		this.rekey_interval = this.rekey_interval_base + crypto.randomInt(0, 15 * 60 * 1000);
		this.rekey_in_progress = false;
		this.session_key = Buffer.from(secret_join, "base64");
	}
	
	/**
	 * Connects to a peer and initialises a secure TCP connection thereto.
	 * @param	{string}	address	The address to connect to.
	 * @param	{number}	port	The TCP port to connect to.
	 * @return	{Promise}	A promise that resolves once the socket is connected and .init() has finished.
	 */
	async connect(address, port) {
		this.address = address;
		this.port = port;
		this.socket = new net.Socket();
		// net.Socket expects the remote address under the "host" key
		this.socket.connect({ host: address, port });
		// events.once() also rejects if the socket emits "error" first
		await once(this.socket, "connect");
		
		await this.init();
	}
	
	/**
	 * Prepares the (already connected) socket for use: enables keepalive,
	 * wraps it in a FramedTransport, and performs the initial rekey.
	 * @return	{Promise}
	 */
	async init() {
		this.socket.setKeepAlive(true);
		
		// The framer is created before rekeying because both send() and
		// destroy() need it.
		// NOTE(review): this assumes rekey() communicates via the framed
		// channel - confirm against rekey.mjs.
		this.framer = new FramedTransport(this.socket);
		// Bind the handler, otherwise `this` would be the framer rather than this connection
		this.framer.on("frame", this.handle_frame.bind(this));
		
		await this.rekey();
	}
	
	/**
	 * Replaces the session key with a new one, and picks a new randomised
	 * interval until the next rekey. If rekeying fails, a warning is logged
	 * and the connection is destroyed.
	 * @return	{Promise}
	 */
	async rekey() {
		try {
			this.rekey_in_progress = true;
			this.session_key = await rekey(this, this.session_key);
			this.rekey_interval = this.rekey_interval_base + crypto.randomInt(0, 15 * 60 * 1000);
			this.rekey_last = new Date();
			this.emit("rekey");
		}
		catch(error) {
			l.warn(`Error when rekeying connection ${this.address}:${this.port}: ${settings.cli.verbose ? error : error.message}, killing connection`);
			await this.destroy();
		}
		finally {
			this.rekey_in_progress = false;
		}
	}
	
	/**
	 * Tears down this connection and emits the "destroy" event.
	 * Safe to call before .init() has created the framer.
	 * @return	{Promise}
	 */
	async destroy() {
		if(this.framer)
			await this.framer.destroy();
		else if(this.socket)
			this.socket.destroy(); // No framer yet, so close the raw socket instead
		
		this.emit("destroy");
	}
	
	/**
	 * Handles a single frame from the framer: decrypts it and passes the
	 * resulting text to handle_message(). Frames for which decryption yields
	 * null are ignored. If an error is thrown, a warning is logged and the
	 * connection is destroyed.
	 * @param	{Buffer}	bytes	The encrypted contents of the frame.
	 * @return	{Promise}
	 */
	async handle_frame(bytes) {
		try {
			const decrypted = decrypt_bytes(this.session_key, bytes);
			if(decrypted === null) return;
			await this.handle_message(decrypted.toString("utf-8"));
		}
		catch(error) {
			l.warn(`Warning: Killing connection to ${this.address}:${this.port} after error: ${settings.cli.verbose ? error : error.message}`);
			// Only kill the connection on failure, not after every frame
			await this.destroy();
		}
	}
	
	/**
	 * Parses a decrypted message and emits it as the "message" and
	 * "message-<event>" events. A message with the event name "rekey" also
	 * starts a rekey, unless one is already underway.
	 * @param	{string}	msg_text	The JSON text of the message.
	 * @return	{Promise}
	 * @throws	{SyntaxError}	If msg_text is not valid JSON.
	 */
	async handle_message(msg_text) {
		const msg = JSON.parse(msg_text);
		
		if(msg.event === "rekey") {
			// Set and forget here - rekey() handles its own errors
			if(!this.rekey_in_progress) this.rekey();
		}
		
		this.emit("message", msg.event, msg.message);
		this.emit(`message-${msg.event}`, msg.message);
	}
	
	/**
	 * Encrypts and sends a message to the peer, rekeying first if the rekey
	 * interval has elapsed.
	 * @param	{string}	event	The event name of the message.
	 * @param	{any}		message	The message payload. Must be serialisable with JSON.stringify().
	 * @return	{Promise}
	 * @throws	{Error}	If event is not a string.
	 */
	async send(event, message) {
		if(typeof event !== "string")
			throw new Error(`Error: Expected string for event name, but got value of type ${typeof event}.`);
		
		// Rekey at semi-regular intervals, but only if we're not already in the process of doing so
		if(new Date() - this.rekey_last > this.rekey_interval && !this.rekey_in_progress)
			await this.rekey();
		
		// TODO: Consider anonymous TLS, with jpake for mutual authentication
		// TODO: Consider https://devdocs.io/node/crypto#crypto.createCipheriv() - which lets us use any openssl ciphers we like - e.g. ChaCha20-Poly1305
		let payload = JSON.stringify({ event, message });
		payload = encrypt_bytes(this.session_key, payload);
		
		await this.framer.write(payload);
	}
}
2021-10-02 16:00:24 +00:00
/**
 * Wraps an existing socket in a new Connection and initialises it.
 * @param	{string}		secret_join	The base64-encoded shared secret to use as the initial session key.
 * @param	{net.Socket}	socket		The socket to wrap.
 * @return	{Promise<Connection>}	The initialised connection.
 */
Connection.Wrap = async function(secret_join, socket) {
	// Must not be called `socket`: redeclaring a parameter with const is a SyntaxError
	const connection = new Connection(secret_join, socket);
	await connection.init();
	return connection;
};
/**
 * Creates a new Connection and connects it to the given endpoint.
 * @param	{string}	secret_join	The base64-encoded shared secret to use as the initial session key.
 * @param	{string}	address		The address to connect to.
 * @param	{number}	port		The TCP port to connect to.
 * @return	{Promise<Connection>}	The connected connection. Rejects if connecting fails.
 */
Connection.Create = async function(secret_join, address, port) {
	const connection = new Connection(secret_join);
	// Wait for the connection so that failures reach the caller instead of
	// becoming unhandled rejections
	await connection.connect(address, port);
	return connection;
};
export default Connection;