
231 lines
7.2 KiB
Raw Normal View History

"use strict";
2021-10-02 16:34:15 +00:00
import crypto from 'crypto';
import net from 'net';
import { EventEmitter, once } from 'events';
2022-01-09 16:30:42 +00:00
import log from '../io/NamespacedLog.mjs'; const l = log("connection");
import settings from '../../settings.mjs';
import rekey from './rekey.mjs';
import FramedTransport from './FramedTransport.mjs';
import { write_safe } from '../io/StreamHelpers.mjs';
import { encrypt_bytes, decrypt_bytes } from '../crypto/secretbox.mjs';
/**
 * Represents a connection to a single endpoint.
 *
 * Messages are JSON objects of the form `{ event, message, sequence }` that
 * are encrypted with the current session key and handed to a FramedTransport.
 * The session key starts out as the shared join secret and is replaced each
 * time a rekey completes.
 *
 * @param	{string}		secret_join		The shared join secret, encoded as base64
 * @param	{net.Socket?}	socket			Optional. A pre-existing socket to take over and manage.
 * @event	Connection#connect				The initial connection setup is complete.
 * @event	Connection#rekey				The session key has been re-exchanged.
 * @event	Connection#destroy				The connection has been closed
 * @event	Connection#message				A message has been received.
 * @event	Connection#message-EVENTNAME	A message with a given event name has been received
 */
class Connection extends EventEmitter {
	/**
	 * Whether this socket is actually connected or not.
	 * Always false until a framer has been attached by .init().
	 * @return	{boolean}
	 */
	get connected() {
		return this.framer == null ? false : this.framer.connected;
	}
	
	constructor(secret_join, socket = null) {
		super();
		
		if(typeof secret_join !== "string")
			throw new Error(`Error: Expected secret_join to be of type string, but received variable of type ${typeof secret_join}`);
		
		this.socket = socket;
		// Attached by .init(); null means "not connected yet"
		this.framer = null;
		
		this.rekey_last = null;
		this.rekey_interval_base = 30 * 60 * 1000; // 30 minutes
		// Add up to 15 minutes of jitter on top of the base interval
		this.rekey_interval = this.rekey_interval_base + crypto.randomInt(0, 15 * 60 * 1000);
		this.rekey_in_progress = false;
		
		this.sequence_count_receive = 0;
		this.sequence_count_send = 0;
		
		// Until the first rekey completes, the join secret doubles as the session key
		this.session_key = Buffer.from(secret_join, "base64");
	}
	
	/**
	 * Connects to a peer and initialises a secure TCP connection thereto.
	 * @param	{string}		address		The address to connect to.
	 * @param	{number|string}	port		The TCP port to connect to.
	 * @return	{Promise<net.Socket>}		The underlying socket, once the connection setup has finished.
	 */
	async connect(address, port) {
		this.address = address; this.port = port;
		this.socket = new net.Socket();
		// net.Socket#connect() takes the remote end as `host`
		this.socket.connect({ host: address, port });
		this.socket.once("end", () => {
			l.log(`${this.address}:${this.port} disconnected`);
		});
		
		// events.once() rejects if the socket emits "error" before "connect"
		await once(this.socket, "connect");
		
		await this.init();
		
		return this.socket;
	}
	
	/**
	 * Takes over this.socket: records the remote endpoint, attaches a
	 * FramedTransport to it, and performs the initial key exchange.
	 * Emits Connection#connect only if that key exchange succeeded.
	 * @return	{Promise<void>}
	 */
	async init() {
		this.address = this.socket.remoteAddress;
		this.port = this.socket.remotePort;
		
		this.framer = new FramedTransport(this.socket);
		this.framer.on("frame", this.handle_frame.bind(this));
		
		// A failed rekey has already destroyed the connection, so there is
		// nothing to announce in that case.
		if(!(await this.rekey()))
			return;
		
		// We can await .init() or .connect() - this is just another option
		/**
		 * The initial connection setup is complete.
		 * @event	Connection#connect
		 * @type	{void}
		 */
		this.emit("connect");
	}
	
	/**
	 * Exchanges a new session key with the peer.
	 * On failure a warning is logged and the connection is destroyed; the
	 * error is not rethrown.
	 * @return	{Promise<boolean>}	true if the rekey succeeded, false if it failed and the connection was destroyed.
	 */
	async rekey() {
		try {
			this.rekey_in_progress = true;
			this.session_key = await rekey(this, this.session_key);
			this.rekey_interval = this.rekey_interval_base + crypto.randomInt(0, 15 * 60 * 1000);
			this.rekey_last = new Date();
			// Also reset the sequence counters. This will help avoid integer overflow issues - however unlikely they may be
			this.sequence_count_send = 0;
			this.sequence_count_receive = 0;
			
			/**
			 * The session key has been re-exchanged.
			 * This event is fired when the rekeying process
			 * is complete.
			 * @event	Connection#rekey
			 * @type	{void}
			 */
			this.emit("rekey");
			return true;
		}
		catch(error) {
			l.warn(`Error when rekeying connection ${this.address}:${this.port}, killing connection`, settings.cli.verbose ? error : error.message);
			await this.destroy();
			return false;
		}
		finally {
			this.rekey_in_progress = false;
		}
	}
	
	/**
	 * Closes the connection and emits Connection#destroy.
	 * Destroys the framer if one is attached; otherwise closes the raw
	 * socket, if there is one.
	 * @return	{Promise<void>}
	 */
	async destroy() {
		l.debug(`Killing connection to ${this.address}:${this.port}`, new Error().stack);
		if(this.framer instanceof FramedTransport)
			await this.framer.destroy();
		else if(this.socket != null) {
			// No socket exists yet if .connect() was never called
			this.socket.end();
			this.socket.destroy();
		}
		
		/**
		 * The connection has been closed
		 * @event	Connection#destroy
		 * @type	{void}
		 */
		this.emit("destroy");
	}
	
	/**
	 * Handles a single frame from the framer: decrypts it and passes the
	 * plaintext on to .handle_message().
	 * Any error thrown while handling the message kills the connection.
	 * @param	{Buffer}	bytes	The encrypted frame content.
	 * @return	{Promise<void>}
	 */
	async handle_frame(bytes) {
		try {
			//`FRAME length`, bytes.length, `frame`, bytes);
			const decrypted = decrypt_bytes(this.session_key, bytes);
			if(decrypted === null) {
				l.warn(`Decryption of message failed`);
				// Nothing to parse - don't fall through and dereference null
				return;
			}
			
			await this.handle_message(decrypted.toString("utf-8"));
		}
		catch(error) {
			l.warn(`Warning: Killing connection to ${this.address}:${this.port} after error:`, settings.cli.verbose ? error : error.message);
			await this.destroy();
		}
	}
	
	/**
	 * Parses a decrypted message, checks its sequence number, and emits it.
	 * @param	{string}	msg_text	The decrypted message, as a JSON string.
	 * @return	{Promise<void>}
	 * @throws	{SyntaxError}	If msg_text is not valid JSON.
	 */
	async handle_message(msg_text) {
		// If this JSON.parse() call fails, we kill the connection 'cause the
		// catch part of the try..catch in this.handle_frame will trigger.
		// This is very important, because if it didn't kill the connection
		// we would no doubt be open to all sorts of unpleasant attacks
		// given that we wouldn't have a chance to check the sequence number
		// (see below).
		const msg = JSON.parse(msg_text);
		
		l.debug(`RECEIVE ${msg.sequence}:${msg.event}`, msg.message);
		
		if(msg.sequence !== this.sequence_count_receive) {
			l.warn(`Killing connection due to invalid sequence number in received message: expected ${this.sequence_count_receive}, but got ${msg.sequence}.`);
			await this.destroy();
			return;
		}
		// The sender increments its counter once per message, so we do too
		this.sequence_count_receive++;
		
		if(msg.event === "rekey" && !this.rekey_in_progress) {
			// Set and forget here - .rekey() logs and handles its own failures.
			// It is started before the events below are emitted.
			void this.rekey();
		}
		
		/**
		 * A message has been received.
		 * @event	Connection#message
		 * @type	{string,object}	The name of the event, followed by the message content.
		 */
		this.emit("message", msg.event, msg.message);
		/**
		 * A message with a specific event name has been received.
		 * @event	Connection#message-EVENTNAME
		 * @type	{object}	The message content.
		 */
		this.emit(`message-${msg.event}`, msg.message);
	}
	
	/**
	 * Encrypts a message and writes it to the framer.
	 * @param	{string}	event	The name of the event to send.
	 * @param	{any}		message	The message content. Must be serialisable by JSON.stringify().
	 * @return	{Promise}	Resolves with whatever the framer's write() resolves with.
	 * @throws	{Error}		If event is not a string.
	 */
	async send(event, message) {
		if(typeof event !== "string") throw new Error(`Error: Expected string for event name, but got value of type ${typeof event}.`);
		
		l.debug(`SEND event`, event, `message`, message);
		
		// Rekey at semi-regular intervals, but only if we're not already in the process of doing so
		if(new Date() - this.rekey_last > this.rekey_interval && !this.rekey_in_progress)
			await this.rekey();
		
		/*
		TODO: Consider anonymous TLS, with jpake for mutual authentication
		TODO: Consider an OpenSSL-backed API (the original reference is missing from this comment) - which lets us use any openssl ciphers we like - e.g. ChaCha20-Poly1305
		Note here that we do *not* need another manual MAC to make this
		authenticated, as tweetnacl's secretbox uses xsalsa20-poly1305,
		which is an *authenticated* encryption algorithm.
		*/
		
		const plaintext = JSON.stringify({ event, message, sequence: this.sequence_count_send++ });
		const payload = encrypt_bytes(
			this.session_key,
			Buffer.from(plaintext, "utf-8")
		);
		
		return await this.framer.write(payload);
	}
}
/**
 * Wraps a pre-existing socket in a new Connection and runs its setup.
 * @param	{string}		secret_join		The shared join secret, encoded as base64
 * @param	{net.Socket}	socket			The socket to take over and manage.
 * @return	{Promise<Connection>}			The new connection, after .init() has finished.
 */
Connection.Wrap = async function(secret_join, socket) {
	const connection = new Connection(secret_join, socket);
	await connection.init();
	
	return connection;
};
2021-10-02 16:00:24 +00:00
/**
 * Creates a new Connection and connects it to the given endpoint.
 * @param	{string}		secret_join		The shared join secret, encoded as base64
 * @param	{string}		address			The address to connect to.
 * @param	{number|string}	port			The TCP port to connect to.
 * @return	{Promise<Connection>}			The new connection, after .connect() has finished.
 */
Connection.Create = async function(secret_join, address, port) {
	const connection = new Connection(secret_join);
	await connection.connect(address, port);
	return connection;
};
2021-10-02 16:00:24 +00:00
export default Connection;