diff --git a/package-lock.json b/package-lock.json index 63fde0a..d58c8cb 100644 --- a/package-lock.json +++ b/package-lock.json @@ -14,6 +14,7 @@ "log": "^6.2.0", "log-node": "^8.0.1", "nexline": "^1.2.2", + "p-queue": "^7.1.0", "systeminformation": "^5.9.4", "tweetnacl": "^1.0.3" } @@ -169,6 +170,11 @@ "es5-ext": "~0.10.14" } }, + "node_modules/eventemitter3": { + "version": "4.0.7", + "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-4.0.7.tgz", + "integrity": "sha512-8guHBZCwKnFhYdHr2ysuRWErTwhoN2X8XELRlrRwpmfeY2jjuUN4taQMsULKUVo1K4DvZl+0pgfyoysHxvmvEw==" + }, "node_modules/ext": { "version": "1.6.0", "resolved": "https://registry.npmjs.org/ext/-/ext-1.6.0.tgz", @@ -340,6 +346,32 @@ "resolved": "https://registry.npmjs.org/noble-secp256k1/-/noble-secp256k1-1.2.10.tgz", "integrity": "sha512-PXlDRYoWD5JHm+fKVx8PsiLVsuYLIp+5ZhO76L//H/q2/YcQZAS59z9aXO7lcq4IMOq8a1U18KXgpijnkz+C5A==" }, + "node_modules/p-queue": { + "version": "7.1.0", + "resolved": "https://registry.npmjs.org/p-queue/-/p-queue-7.1.0.tgz", + "integrity": "sha512-V+0vPJbhYkBqknPp0qnaz+dWcj8cNepfXZcsVIVEHPbFQXMPwrzCNIiM4FoxGtwHXtPzVCPHDvqCr1YrOJX2Gw==", + "dependencies": { + "eventemitter3": "^4.0.7", + "p-timeout": "^5.0.0" + }, + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, + "node_modules/p-timeout": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-5.0.0.tgz", + "integrity": "sha512-z+bU/N7L1SABsqKnQzvAnINgPX7NHdzwUV+gHyJE7VGNDZSr03rhcPODCZSWiiT9k+gf74QPmzcZzqJRvxYZow==", + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/safer-buffer": { "version": "2.1.2", "resolved": "https://registry.npmjs.org/safer-buffer/-/safer-buffer-2.1.2.tgz", @@ -566,6 +598,11 @@ "es5-ext": "~0.10.14" } }, + "eventemitter3": { + "version": "4.0.7", + "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-4.0.7.tgz", + "integrity": "sha512-8guHBZCwKnFhYdHr2ysuRWErTwhoN2X8XELRlrRwpmfeY2jjuUN4taQMsULKUVo1K4DvZl+0pgfyoysHxvmvEw==" + }, "ext": { "version": "1.6.0", "resolved": "https://registry.npmjs.org/ext/-/ext-1.6.0.tgz", @@ -717,6 +754,20 @@ "resolved": "https://registry.npmjs.org/noble-secp256k1/-/noble-secp256k1-1.2.10.tgz", "integrity": "sha512-PXlDRYoWD5JHm+fKVx8PsiLVsuYLIp+5ZhO76L//H/q2/YcQZAS59z9aXO7lcq4IMOq8a1U18KXgpijnkz+C5A==" }, + "p-queue": { + "version": "7.1.0", + "resolved": "https://registry.npmjs.org/p-queue/-/p-queue-7.1.0.tgz", + "integrity": "sha512-V+0vPJbhYkBqknPp0qnaz+dWcj8cNepfXZcsVIVEHPbFQXMPwrzCNIiM4FoxGtwHXtPzVCPHDvqCr1YrOJX2Gw==", + "requires": { + "eventemitter3": "^4.0.7", + "p-timeout": "^5.0.0" + } + }, + "p-timeout": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-5.0.0.tgz", + "integrity": "sha512-z+bU/N7L1SABsqKnQzvAnINgPX7NHdzwUV+gHyJE7VGNDZSr03rhcPODCZSWiiT9k+gf74QPmzcZzqJRvxYZow==" + }, "safer-buffer": { "version": "2.1.2", "resolved": "https://registry.npmjs.org/safer-buffer/-/safer-buffer-2.1.2.tgz", diff --git a/package.json b/package.json index bd1875f..11a90d4 100644 --- a/package.json +++ b/package.json @@ -22,6 +22,7 @@ "log": "^6.2.0", "log-node": "^8.0.1", "nexline": "^1.2.2", + "p-queue": "^7.1.0", "systeminformation": "^5.9.4", "tweetnacl": "^1.0.3" } diff --git a/src/lib/crypto/secretbox.mjs b/src/lib/crypto/secretbox.mjs index cf7c961..8021ca8 100644 --- a/src/lib/crypto/secretbox.mjs +++ b/src/lib/crypto/secretbox.mjs @@ -43,8 +43,10 @@ function encrypt_bytes(key_bytes, data_bytes) { const concat_bytes = Buffer.concat([nonce, cipher_bytes]); - key_bytes.fill(0); - nonce.fill(0); + // We don't need to zero out the key after we're done in this instance, + // since we only have a reference to the actual key + // key_bytes.fill(0); + // nonce.fill(0); return Buffer.from(concat_bytes); } @@ -73,15 +75,14 @@ function decrypt(key, cipher_text) { function decrypt_bytes(key_bytes, cipher_text_bytes) { if(!is_uint8array(key_bytes)) throw new Error(`Error: Expected key_bytes to be of type Uint8Array, but got ${typeof key_bytes}`); if(!is_uint8array(cipher_text_bytes)) throw new Error(`Error: Expected cipher_text_bytes to be of type Uint8Array, but got ${typeof cipher_text_bytes}`); - const nonce = cipher_text_bytes.slice(0, secretbox.nonceLength); - const cipher_bytes = cipher_text_bytes.slice(secretbox.nonceLength); + const nonce = new Uint8Array(cipher_text_bytes.slice(0, secretbox.nonceLength)); + const cipher_bytes = new Uint8Array(cipher_text_bytes.slice(secretbox.nonceLength)); const data_bytes = secretbox.open(cipher_bytes, nonce, key_bytes); // Failed to decrypt message. Could be because the nonce, key, or ciphertext is invalid // Ref https://github.com/dchest/tweetnacl-js/blob/master/test/04-secretbox.quick.js // Ref https://github.com/dchest/tweetnacl-js/wiki/Examples#secretbox if(!data_bytes) return null; - return Buffer.from(data_bytes); } diff --git a/src/lib/io/StreamHelpers.mjs b/src/lib/io/StreamHelpers.mjs index dda88b5..7f730ef 100644 --- a/src/lib/io/StreamHelpers.mjs +++ b/src/lib/io/StreamHelpers.mjs @@ -22,7 +22,7 @@ * @param {string|Buffer|Uint8Array} data The data to write. * @return {Promise} A promise that resolves when writing is complete. */ -function write_safe(stream_out, data) { +function write_safe(stream_out, data, timeout = null) { return new Promise(function (resolve, reject) { // console.log(`Beginning write`); // Handle errors @@ -42,13 +42,22 @@ function write_safe(stream_out, data) { resolve(); } else { - // We need to wait for the drain event before continuing - // console.log(`Waiting for drain event`); - stream_out.once("drain", () => { + let handler_drain = () => { stream_out.off("error", handler_error); + if(timeout !== null) clearTimeout(id); // console.log(`Drain event received, handler detached, resolving`); resolve(); - }); + }; + let id; + if(timeout !== null) + id = setTimeout(() => { + stream_out.off("drain", handler_drain); + handler_error(new Error(`Error: Timeout reached waiting for drain event`)); + }, timeout); + + // We need to wait for the drain event before continuing + // console.log(`Waiting for drain event`); + stream_out.once("drain", handler_drain); } }); } diff --git a/src/lib/transport/Connection.mjs b/src/lib/transport/Connection.mjs index 8c269ef..98990ac 100644 --- a/src/lib/transport/Connection.mjs +++ b/src/lib/transport/Connection.mjs @@ -19,7 +19,15 @@ import { encrypt_bytes, decrypt_bytes } from '../crypto/secretbox.mjs'; * @param {net.Socket?} socket Optional. A pre-existing socket to take over and manage. */ class Connection extends EventEmitter { - constructor(secret_join, socket) { + /** + * Whether this socket is actually connected or not. + * @return {bool} + */ + get connected() { + return this.socket === null ? true : this.socket.destroyed; + } + + constructor(secret_join, socket = null) { super(); if(typeof secret_join !== "string") @@ -96,8 +104,8 @@ class Connection extends EventEmitter { async handle_frame(bytes) { try { - l.info(`FRAME length`, bytes.length); - let decrypted = decrypt_bytes(this.session_key, new Uint8Array(bytes)); + l.info(`FRAME length`, bytes.length, `frame`, bytes); + let decrypted = decrypt_bytes(this.session_key, bytes); if(decrypted === null) { l.warn(`Decryption of message failed`); return; @@ -113,7 +121,7 @@ class Connection extends EventEmitter { async handle_message(msg_text) { const msg = JSON.parse(msg_text); - l.log(`MESSAGE:${msg.event} content`, msg.message); + l.info(`RECEIVE:${msg.event}`, msg.message); if(msg.event == "rekey" && !this.rekey_in_progress) { // Set and forget here @@ -126,6 +134,8 @@ class Connection extends EventEmitter { async send(event, message) { if(typeof event !== "string") throw new Error(`Error: Expected string for event name, but got value of type ${typeof event}.`); + l.info(`SEND event`, event, `message`, message); + // Rekey at semi-regular intervals, but only if we're not already in the process of doing so if(new Date() - this.rekey_last > this.rekey_interval && !this.rekey_in_progress) await this.rekey(); @@ -138,7 +148,7 @@ class Connection extends EventEmitter { Buffer.from(payload, "utf-8") ); - await this.framer.write(payload); + return await this.framer.write(payload); } } diff --git a/src/lib/transport/FramedTransport.mjs b/src/lib/transport/FramedTransport.mjs index 2b1560e..c2ecf3c 100644 --- a/src/lib/transport/FramedTransport.mjs +++ b/src/lib/transport/FramedTransport.mjs @@ -1,12 +1,14 @@ "use strict"; import os from 'os'; +import { EventEmitter, once } from 'events'; import log from 'log'; const l = log.get("framedtransport"); -import { EventEmitter, once } from 'events'; +import PQueue from 'p-queue'; +import settings from '../../settings.mjs'; import { write_safe, end_safe } from '../io/StreamHelpers.mjs'; /** @@ -26,7 +28,11 @@ class FramedTransport extends EventEmitter { /** The length of a uint in bytes @type {number} */ this.uint_length = 4; - this.writing = false; + this.write_queue = new PQueue({ + concurrency: 1, + timeout: 30 * 1000, + throwOnTimeout: true + }); } /** @@ -44,21 +50,20 @@ class FramedTransport extends EventEmitter { else this.buffer = chunk; - l.debug(`CHUNK buffer`, this.buffer); - let next_frame_length = this.buffer2uint(this.buffer, 0); - l.debug(`CHUNK total length`, this.buffer.length, `next_frame_length`, next_frame_length); - - // We have enough data! Emit a frame and then start again. if(this.buffer.length - this.uint_length >= next_frame_length) { - this.emit("frame", this.buffer.slice(this.uint_length, next_frame_length)); + const frame = this.buffer.slice(this.uint_length, this.uint_length + next_frame_length); + if(frame.length !== next_frame_length) + throw new Error(`Error: Actual frame length of ${frame.length} does not match expected value ${next_frame_length}`); + // l.debug(`FRAME length`, frame.length, `frame`, frame); + this.emit("frame", frame); if(this.buffer.length - (this.uint_length + next_frame_length) > 0) this.buffer = this.buffer.slice(this.uint_length + next_frame_length); else this.buffer = null; - l.info(`FRAME length`, next_frame_length, `length_remaining`, this.buffer === null ? 0 : this.buffer.length); + // l.info(`FRAME buffer length remaining`, this.buffer === null ? 0 : this.buffer.length); } } @@ -70,12 +75,10 @@ class FramedTransport extends EventEmitter { * @return {number} The parsed number from the buffer. */ buffer2uint(buffer, pos32) { - l.debug(`BUFFER2UINT buffer`, buffer, `pos32`, pos32); // DataView doesn't work as expected here, because Node.js optimises small buffer to be views of 1 larger buffer. let u8 = new Uint8Array(buffer).slice(pos32, this.uint_length); // Convert from network byte order if necessary if(os.endianness() == "LE") u8.reverse(); - l.debug(`BUFFER2UINT u8`, u8, `u32`, new Uint32Array(u8.buffer)); return new Uint32Array(u8.buffer)[0]; } @@ -101,14 +104,23 @@ class FramedTransport extends EventEmitter { * @return {Promise} A promise that resolves when writing is complete. */ async write(frame) { - if(this.writing) await once(this, "write-end"); - this.emit("write-start"); - this.writing = true; - l.info(`SEND length`, frame.length); - await write_safe(this.socket, this.uint2buffer(frame.length)); - await write_safe(this.socket, frame); - this.writing = false; - this.emit("write-end") + // l.info(`SEND length`, frame.length, `frame`, frame); + try { + await this.write_queue.add(async () => { + const bytes = Buffer.concat([ this.uint2buffer(frame.length), frame ]); + try { await write_safe(this.socket, bytes); } + catch(error) { + l.warn(`Error while writing, killing connection`, settings.cli.verbose ? error : error.message); + await this.destroy(); + } + }); + } + catch(error) { + l.warn(`Error: write timeout detected, killing connection`); + await this.destroy(); + return false; + } + return true; } /** @@ -116,6 +128,7 @@ class FramedTransport extends EventEmitter { * @return {Promise} A Promise that resolves when the socket is properly closed. */ async destroy() { + this.write_queue.clear(); // Calling socket.end() is important as it closes the stream properly await end_safe(this.socket); await this.socket.destroy(); diff --git a/src/lib/transport/rekey.mjs b/src/lib/transport/rekey.mjs index cf9a9be..c37e25d 100644 --- a/src/lib/transport/rekey.mjs +++ b/src/lib/transport/rekey.mjs @@ -2,7 +2,8 @@ import { once } from 'events'; -import l from 'log'; +import log from 'log'; +const l = log.get("rekey"); import jpake from 'jpake'; export default async function rekey(connection, secret_join) { @@ -14,7 +15,8 @@ export default async function rekey(connection, secret_join) { // 2: Round 2 - const their_round1 = (await once(connection, "message-rekey"))[1]; + const their_round1 = (await once(connection, "message-rekey"))[0]; + l.debug(`THEIR_ROUND1`, their_round1); if(typeof their_round1 !== "object" || their_round1.round !== 1 @@ -30,10 +32,11 @@ export default async function rekey(connection, secret_join) { connection.send("rekey", { round: 2, content: our_round2 }); // 3: Compute new shared key - const their_round2 = (await once(connection, "message-rekey"))[1]; + const their_round2 = (await once(connection, "message-rekey"))[0]; + l.debug(`THEIR_ROUND2`, their_round2); if(typeof their_round2 !== "object" - || their_round2.round !== 1 + || their_round2.round !== 2 || typeof their_round2.content !== "string") throw new Error(`Error: Received invalid round 2 from peer`); diff --git a/src/subcommands/test-client/test-client.mjs b/src/subcommands/test-client/test-client.mjs index 437d288..cb87816 100644 --- a/src/subcommands/test-client/test-client.mjs +++ b/src/subcommands/test-client/test-client.mjs @@ -7,17 +7,18 @@ import l from 'log'; import settings from '../../settings.mjs'; import sleep from '../../lib/async/sleep.mjs'; import Connection from '../../lib/transport/Connection.mjs'; -import { encrypt, decrypt } from '../../lib/crypto/secretbox.mjs'; +import { encrypt_bytes, decrypt_bytes } from '../../lib/crypto/secretbox.mjs'; export default async function() { const test_key = "H7xKSxvJFoZoNjCKAfxn4E3qUzY3Y/4bjY+qIzxg+78="; - + const test_key_bytes = Buffer.from(test_key, "base64"); const test_data = "hello, world"; l.notice(`TEST_DATA`, test_data); - const encrypted = encrypt(test_key, test_data); + const encrypted = encrypt_bytes(test_key_bytes, Buffer.from(test_data, "utf-8")); l.notice(`ENCRYPTED`, encrypted); - const decrypted = decrypt(test_key, encrypted); + const decrypted = decrypt_bytes(test_key_bytes, encrypted); l.notice(`DECRYPTED`, decrypted); + l.notice(`DECRYPTED_TEXT`, decrypted.toString("utf-8")); const socket = await Connection.Create(test_key, "::1", settings.cli.port); @@ -28,7 +29,6 @@ export default async function() { for(let i = 0; i < 100; i++) { await sleep(1000); - l.notice(`>>> hello world ${i}`); socket.send(`test-client`, `hello world ${i}\n`); } } diff --git a/src/subcommands/test-server/test-server.mjs b/src/subcommands/test-server/test-server.mjs index 8fc34ab..47a2861 100644 --- a/src/subcommands/test-server/test-server.mjs +++ b/src/subcommands/test-server/test-server.mjs @@ -13,12 +13,15 @@ export default async function() { const server = net.createServer(async (client) => { l.notice("client connected"); const connection = await Connection.Wrap(test_key, client); - connection.write("hello\n"); + connection.send("test-server", "hello\n"); connection.on("message", (event, msg) => { l.notice(`<<< ${event}: ${JSON.stringify(msg)}`); }); - setInterval(async () => { - connection.send("test-server", (new Date()).toString()); + let id = setInterval(async () => { + const rt = await connection.send("test-server", (new Date()).toString()); + l.info(`RETURNVAL`, rt); + if(!rt) + clearTimeout(id); }, 1000); });