Fix bugs. It works!!!
Now we need to tidy up the logging output.....
This commit is contained in:
parent
0eed69917d
commit
49ca7f0a57
9 changed files with 137 additions and 46 deletions
51
package-lock.json
generated
51
package-lock.json
generated
|
@ -14,6 +14,7 @@
|
|||
"log": "^6.2.0",
|
||||
"log-node": "^8.0.1",
|
||||
"nexline": "^1.2.2",
|
||||
"p-queue": "^7.1.0",
|
||||
"systeminformation": "^5.9.4",
|
||||
"tweetnacl": "^1.0.3"
|
||||
}
|
||||
|
@ -169,6 +170,11 @@
|
|||
"es5-ext": "~0.10.14"
|
||||
}
|
||||
},
|
||||
"node_modules/eventemitter3": {
|
||||
"version": "4.0.7",
|
||||
"resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-4.0.7.tgz",
|
||||
"integrity": "sha512-8guHBZCwKnFhYdHr2ysuRWErTwhoN2X8XELRlrRwpmfeY2jjuUN4taQMsULKUVo1K4DvZl+0pgfyoysHxvmvEw=="
|
||||
},
|
||||
"node_modules/ext": {
|
||||
"version": "1.6.0",
|
||||
"resolved": "https://registry.npmjs.org/ext/-/ext-1.6.0.tgz",
|
||||
|
@ -340,6 +346,32 @@
|
|||
"resolved": "https://registry.npmjs.org/noble-secp256k1/-/noble-secp256k1-1.2.10.tgz",
|
||||
"integrity": "sha512-PXlDRYoWD5JHm+fKVx8PsiLVsuYLIp+5ZhO76L//H/q2/YcQZAS59z9aXO7lcq4IMOq8a1U18KXgpijnkz+C5A=="
|
||||
},
|
||||
"node_modules/p-queue": {
|
||||
"version": "7.1.0",
|
||||
"resolved": "https://registry.npmjs.org/p-queue/-/p-queue-7.1.0.tgz",
|
||||
"integrity": "sha512-V+0vPJbhYkBqknPp0qnaz+dWcj8cNepfXZcsVIVEHPbFQXMPwrzCNIiM4FoxGtwHXtPzVCPHDvqCr1YrOJX2Gw==",
|
||||
"dependencies": {
|
||||
"eventemitter3": "^4.0.7",
|
||||
"p-timeout": "^5.0.0"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">=12"
|
||||
},
|
||||
"funding": {
|
||||
"url": "https://github.com/sponsors/sindresorhus"
|
||||
}
|
||||
},
|
||||
"node_modules/p-timeout": {
|
||||
"version": "5.0.0",
|
||||
"resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-5.0.0.tgz",
|
||||
"integrity": "sha512-z+bU/N7L1SABsqKnQzvAnINgPX7NHdzwUV+gHyJE7VGNDZSr03rhcPODCZSWiiT9k+gf74QPmzcZzqJRvxYZow==",
|
||||
"engines": {
|
||||
"node": ">=12"
|
||||
},
|
||||
"funding": {
|
||||
"url": "https://github.com/sponsors/sindresorhus"
|
||||
}
|
||||
},
|
||||
"node_modules/safer-buffer": {
|
||||
"version": "2.1.2",
|
||||
"resolved": "https://registry.npmjs.org/safer-buffer/-/safer-buffer-2.1.2.tgz",
|
||||
|
@ -566,6 +598,11 @@
|
|||
"es5-ext": "~0.10.14"
|
||||
}
|
||||
},
|
||||
"eventemitter3": {
|
||||
"version": "4.0.7",
|
||||
"resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-4.0.7.tgz",
|
||||
"integrity": "sha512-8guHBZCwKnFhYdHr2ysuRWErTwhoN2X8XELRlrRwpmfeY2jjuUN4taQMsULKUVo1K4DvZl+0pgfyoysHxvmvEw=="
|
||||
},
|
||||
"ext": {
|
||||
"version": "1.6.0",
|
||||
"resolved": "https://registry.npmjs.org/ext/-/ext-1.6.0.tgz",
|
||||
|
@ -717,6 +754,20 @@
|
|||
"resolved": "https://registry.npmjs.org/noble-secp256k1/-/noble-secp256k1-1.2.10.tgz",
|
||||
"integrity": "sha512-PXlDRYoWD5JHm+fKVx8PsiLVsuYLIp+5ZhO76L//H/q2/YcQZAS59z9aXO7lcq4IMOq8a1U18KXgpijnkz+C5A=="
|
||||
},
|
||||
"p-queue": {
|
||||
"version": "7.1.0",
|
||||
"resolved": "https://registry.npmjs.org/p-queue/-/p-queue-7.1.0.tgz",
|
||||
"integrity": "sha512-V+0vPJbhYkBqknPp0qnaz+dWcj8cNepfXZcsVIVEHPbFQXMPwrzCNIiM4FoxGtwHXtPzVCPHDvqCr1YrOJX2Gw==",
|
||||
"requires": {
|
||||
"eventemitter3": "^4.0.7",
|
||||
"p-timeout": "^5.0.0"
|
||||
}
|
||||
},
|
||||
"p-timeout": {
|
||||
"version": "5.0.0",
|
||||
"resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-5.0.0.tgz",
|
||||
"integrity": "sha512-z+bU/N7L1SABsqKnQzvAnINgPX7NHdzwUV+gHyJE7VGNDZSr03rhcPODCZSWiiT9k+gf74QPmzcZzqJRvxYZow=="
|
||||
},
|
||||
"safer-buffer": {
|
||||
"version": "2.1.2",
|
||||
"resolved": "https://registry.npmjs.org/safer-buffer/-/safer-buffer-2.1.2.tgz",
|
||||
|
|
|
@ -22,6 +22,7 @@
|
|||
"log": "^6.2.0",
|
||||
"log-node": "^8.0.1",
|
||||
"nexline": "^1.2.2",
|
||||
"p-queue": "^7.1.0",
|
||||
"systeminformation": "^5.9.4",
|
||||
"tweetnacl": "^1.0.3"
|
||||
}
|
||||
|
|
|
@ -43,8 +43,10 @@ function encrypt_bytes(key_bytes, data_bytes) {
|
|||
|
||||
const concat_bytes = Buffer.concat([nonce, cipher_bytes]);
|
||||
|
||||
key_bytes.fill(0);
|
||||
nonce.fill(0);
|
||||
// We don't need to zero out the key after we're done in this instance,
|
||||
// since we only have a reference to the actual key
|
||||
// key_bytes.fill(0);
|
||||
// nonce.fill(0);
|
||||
|
||||
return Buffer.from(concat_bytes);
|
||||
}
|
||||
|
@ -73,15 +75,14 @@ function decrypt(key, cipher_text) {
|
|||
function decrypt_bytes(key_bytes, cipher_text_bytes) {
|
||||
if(!is_uint8array(key_bytes)) throw new Error(`Error: Expected key_bytes to be of type Uint8Array, but got ${typeof key_bytes}`);
|
||||
if(!is_uint8array(cipher_text_bytes)) throw new Error(`Error: Expected cipher_text_bytes to be of type Uint8Array, but got ${typeof cipher_text_bytes}`);
|
||||
const nonce = cipher_text_bytes.slice(0, secretbox.nonceLength);
|
||||
const cipher_bytes = cipher_text_bytes.slice(secretbox.nonceLength);
|
||||
const nonce = new Uint8Array(cipher_text_bytes.slice(0, secretbox.nonceLength));
|
||||
const cipher_bytes = new Uint8Array(cipher_text_bytes.slice(secretbox.nonceLength));
|
||||
|
||||
const data_bytes = secretbox.open(cipher_bytes, nonce, key_bytes);
|
||||
// Failed to decrypt message. Could be because the nonce, key, or ciphertext is invalid
|
||||
// Ref https://github.com/dchest/tweetnacl-js/blob/master/test/04-secretbox.quick.js
|
||||
// Ref https://github.com/dchest/tweetnacl-js/wiki/Examples#secretbox
|
||||
if(!data_bytes) return null;
|
||||
|
||||
return Buffer.from(data_bytes);
|
||||
}
|
||||
|
||||
|
|
|
@ -22,7 +22,7 @@
|
|||
* @param {string|Buffer|Uint8Array} data The data to write.
|
||||
* @return {Promise} A promise that resolves when writing is complete.
|
||||
*/
|
||||
function write_safe(stream_out, data) {
|
||||
function write_safe(stream_out, data, timeout = null) {
|
||||
return new Promise(function (resolve, reject) {
|
||||
// console.log(`Beginning write`);
|
||||
// Handle errors
|
||||
|
@ -42,13 +42,22 @@ function write_safe(stream_out, data) {
|
|||
resolve();
|
||||
}
|
||||
else {
|
||||
// We need to wait for the drain event before continuing
|
||||
// console.log(`Waiting for drain event`);
|
||||
stream_out.once("drain", () => {
|
||||
let handler_drain = () => {
|
||||
stream_out.off("error", handler_error);
|
||||
if(timeout !== null) clearTimeout(id);
|
||||
// console.log(`Drain event received, handler detached, resolving`);
|
||||
resolve();
|
||||
});
|
||||
};
|
||||
let id;
|
||||
if(timeout !== null)
|
||||
id = setTimeout(() => {
|
||||
stream_out.off("drain", handler_drain);
|
||||
handler_error(new Error(`Error: Timeout reached waiting for drain event`));
|
||||
}, timeout);
|
||||
|
||||
// We need to wait for the drain event before continuing
|
||||
// console.log(`Waiting for drain event`);
|
||||
stream_out.once("drain", handler_drain);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -19,7 +19,15 @@ import { encrypt_bytes, decrypt_bytes } from '../crypto/secretbox.mjs';
|
|||
* @param {net.Socket?} socket Optional. A pre-existing socket to take over and manage.
|
||||
*/
|
||||
class Connection extends EventEmitter {
|
||||
constructor(secret_join, socket) {
|
||||
/**
|
||||
* Whether this socket is actually connected or not.
|
||||
* @return {bool}
|
||||
*/
|
||||
get connected() {
|
||||
return this.socket === null ? true : this.socket.destroyed;
|
||||
}
|
||||
|
||||
constructor(secret_join, socket = null) {
|
||||
super();
|
||||
|
||||
if(typeof secret_join !== "string")
|
||||
|
@ -96,8 +104,8 @@ class Connection extends EventEmitter {
|
|||
|
||||
async handle_frame(bytes) {
|
||||
try {
|
||||
l.info(`FRAME length`, bytes.length);
|
||||
let decrypted = decrypt_bytes(this.session_key, new Uint8Array(bytes));
|
||||
l.info(`FRAME length`, bytes.length, `frame`, bytes);
|
||||
let decrypted = decrypt_bytes(this.session_key, bytes);
|
||||
if(decrypted === null) {
|
||||
l.warn(`Decryption of message failed`);
|
||||
return;
|
||||
|
@ -113,7 +121,7 @@ class Connection extends EventEmitter {
|
|||
async handle_message(msg_text) {
|
||||
const msg = JSON.parse(msg_text);
|
||||
|
||||
l.log(`MESSAGE:${msg.event} content`, msg.message);
|
||||
l.info(`RECEIVE:${msg.event}`, msg.message);
|
||||
|
||||
if(msg.event == "rekey" && !this.rekey_in_progress) {
|
||||
// Set and forget here
|
||||
|
@ -126,6 +134,8 @@ class Connection extends EventEmitter {
|
|||
async send(event, message) {
|
||||
if(typeof event !== "string") throw new Error(`Error: Expected string for event name, but got value of type ${typeof event}.`);
|
||||
|
||||
l.info(`SEND event`, event, `message`, message);
|
||||
|
||||
// Rekey at semi-regular intervals, but only if we're not already in the process of doing so
|
||||
if(new Date() - this.rekey_last > this.rekey_interval && !this.rekey_in_progress)
|
||||
await this.rekey();
|
||||
|
@ -138,7 +148,7 @@ class Connection extends EventEmitter {
|
|||
Buffer.from(payload, "utf-8")
|
||||
);
|
||||
|
||||
await this.framer.write(payload);
|
||||
return await this.framer.write(payload);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,12 +1,14 @@
|
|||
"use strict";
|
||||
|
||||
import os from 'os';
|
||||
import { EventEmitter, once } from 'events';
|
||||
|
||||
import log from 'log';
|
||||
const l = log.get("framedtransport");
|
||||
|
||||
import { EventEmitter, once } from 'events';
|
||||
import PQueue from 'p-queue';
|
||||
|
||||
import settings from '../../settings.mjs';
|
||||
import { write_safe, end_safe } from '../io/StreamHelpers.mjs';
|
||||
|
||||
/**
|
||||
|
@ -26,7 +28,11 @@ class FramedTransport extends EventEmitter {
|
|||
/** The length of a uint in bytes @type {number} */
|
||||
this.uint_length = 4;
|
||||
|
||||
this.writing = false;
|
||||
this.write_queue = new PQueue({
|
||||
concurrency: 1,
|
||||
timeout: 30 * 1000,
|
||||
throwOnTimeout: true
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -44,21 +50,20 @@ class FramedTransport extends EventEmitter {
|
|||
else
|
||||
this.buffer = chunk;
|
||||
|
||||
l.debug(`CHUNK buffer`, this.buffer);
|
||||
|
||||
let next_frame_length = this.buffer2uint(this.buffer, 0);
|
||||
|
||||
l.debug(`CHUNK total length`, this.buffer.length, `next_frame_length`, next_frame_length);
|
||||
|
||||
|
||||
// We have enough data! Emit a frame and then start again.
|
||||
if(this.buffer.length - this.uint_length >= next_frame_length) {
|
||||
this.emit("frame", this.buffer.slice(this.uint_length, next_frame_length));
|
||||
const frame = this.buffer.slice(this.uint_length, this.uint_length + next_frame_length);
|
||||
if(frame.length !== next_frame_length)
|
||||
throw new Error(`Error: Actual frame length of ${frame.length} does not match expected value ${next_frame_length}`);
|
||||
// l.debug(`FRAME length`, frame.length, `frame`, frame);
|
||||
this.emit("frame", frame);
|
||||
if(this.buffer.length - (this.uint_length + next_frame_length) > 0)
|
||||
this.buffer = this.buffer.slice(this.uint_length + next_frame_length);
|
||||
else
|
||||
this.buffer = null;
|
||||
l.info(`FRAME length`, next_frame_length, `length_remaining`, this.buffer === null ? 0 : this.buffer.length);
|
||||
// l.info(`FRAME buffer length remaining`, this.buffer === null ? 0 : this.buffer.length);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -70,12 +75,10 @@ class FramedTransport extends EventEmitter {
|
|||
* @return {number} The parsed number from the buffer.
|
||||
*/
|
||||
buffer2uint(buffer, pos32) {
|
||||
l.debug(`BUFFER2UINT buffer`, buffer, `pos32`, pos32);
|
||||
// DataView doesn't work as expected here, because Node.js optimises small buffer to be views of 1 larger buffer.
|
||||
let u8 = new Uint8Array(buffer).slice(pos32, this.uint_length);
|
||||
// Convert from network byte order if necessary
|
||||
if(os.endianness() == "LE") u8.reverse();
|
||||
l.debug(`BUFFER2UINT u8`, u8, `u32`, new Uint32Array(u8.buffer));
|
||||
return new Uint32Array(u8.buffer)[0];
|
||||
}
|
||||
|
||||
|
@ -101,14 +104,23 @@ class FramedTransport extends EventEmitter {
|
|||
* @return {Promise} A promise that resolves when writing is complete.
|
||||
*/
|
||||
async write(frame) {
|
||||
if(this.writing) await once(this, "write-end");
|
||||
this.emit("write-start");
|
||||
this.writing = true;
|
||||
l.info(`SEND length`, frame.length);
|
||||
await write_safe(this.socket, this.uint2buffer(frame.length));
|
||||
await write_safe(this.socket, frame);
|
||||
this.writing = false;
|
||||
this.emit("write-end")
|
||||
// l.info(`SEND length`, frame.length, `frame`, frame);
|
||||
try {
|
||||
await this.write_queue.add(async () => {
|
||||
const bytes = Buffer.concat([ this.uint2buffer(frame.length), frame ]);
|
||||
try { await write_safe(this.socket, bytes); }
|
||||
catch(error) {
|
||||
l.warn(`Error while writing, killing connection`, settings.cli.verbose ? error : error.message);
|
||||
await this.destroy();
|
||||
}
|
||||
});
|
||||
}
|
||||
catch(error) {
|
||||
l.warn(`Error: write timeout detected, killing connection`);
|
||||
await this.destroy();
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -116,6 +128,7 @@ class FramedTransport extends EventEmitter {
|
|||
* @return {Promise} A Promise that resolves when the socket is properly closed.
|
||||
*/
|
||||
async destroy() {
|
||||
this.write_queue.clear();
|
||||
// Calling socket.end() is important as it closes the stream properly
|
||||
await end_safe(this.socket);
|
||||
await this.socket.destroy();
|
||||
|
|
|
@ -2,7 +2,8 @@
|
|||
|
||||
import { once } from 'events';
|
||||
|
||||
import l from 'log';
|
||||
import log from 'log';
|
||||
const l = log.get("rekey");
|
||||
import jpake from 'jpake';
|
||||
|
||||
export default async function rekey(connection, secret_join) {
|
||||
|
@ -14,7 +15,8 @@ export default async function rekey(connection, secret_join) {
|
|||
|
||||
// 2: Round 2
|
||||
|
||||
const their_round1 = (await once(connection, "message-rekey"))[1];
|
||||
const their_round1 = (await once(connection, "message-rekey"))[0];
|
||||
l.debug(`THEIR_ROUND1`, their_round1);
|
||||
|
||||
if(typeof their_round1 !== "object"
|
||||
|| their_round1.round !== 1
|
||||
|
@ -30,10 +32,11 @@ export default async function rekey(connection, secret_join) {
|
|||
connection.send("rekey", { round: 2, content: our_round2 });
|
||||
|
||||
// 3: Compute new shared key
|
||||
const their_round2 = (await once(connection, "message-rekey"))[1];
|
||||
const their_round2 = (await once(connection, "message-rekey"))[0];
|
||||
l.debug(`THEIR_ROUND2`, their_round2);
|
||||
|
||||
if(typeof their_round2 !== "object"
|
||||
|| their_round2.round !== 1
|
||||
|| their_round2.round !== 2
|
||||
|| typeof their_round2.content !== "string")
|
||||
throw new Error(`Error: Received invalid round 2 from peer`);
|
||||
|
||||
|
|
|
@ -7,17 +7,18 @@ import l from 'log';
|
|||
import settings from '../../settings.mjs';
|
||||
import sleep from '../../lib/async/sleep.mjs';
|
||||
import Connection from '../../lib/transport/Connection.mjs';
|
||||
import { encrypt, decrypt } from '../../lib/crypto/secretbox.mjs';
|
||||
import { encrypt_bytes, decrypt_bytes } from '../../lib/crypto/secretbox.mjs';
|
||||
|
||||
export default async function() {
|
||||
const test_key = "H7xKSxvJFoZoNjCKAfxn4E3qUzY3Y/4bjY+qIzxg+78=";
|
||||
|
||||
const test_key_bytes = Buffer.from(test_key, "base64");
|
||||
const test_data = "hello, world";
|
||||
l.notice(`TEST_DATA`, test_data);
|
||||
const encrypted = encrypt(test_key, test_data);
|
||||
const encrypted = encrypt_bytes(test_key_bytes, Buffer.from(test_data, "utf-8"));
|
||||
l.notice(`ENCRYPTED`, encrypted);
|
||||
const decrypted = decrypt(test_key, encrypted);
|
||||
const decrypted = decrypt_bytes(test_key_bytes, encrypted);
|
||||
l.notice(`DECRYPTED`, decrypted);
|
||||
l.notice(`DECRYPTED_TEXT`, decrypted.toString("utf-8"));
|
||||
|
||||
const socket = await Connection.Create(test_key, "::1", settings.cli.port);
|
||||
|
||||
|
@ -28,7 +29,6 @@ export default async function() {
|
|||
|
||||
for(let i = 0; i < 100; i++) {
|
||||
await sleep(1000);
|
||||
l.notice(`>>> hello world ${i}`);
|
||||
socket.send(`test-client`, `hello world ${i}\n`);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,12 +13,15 @@ export default async function() {
|
|||
const server = net.createServer(async (client) => {
|
||||
l.notice("client connected");
|
||||
const connection = await Connection.Wrap(test_key, client);
|
||||
connection.write("hello\n");
|
||||
connection.send("test-server", "hello\n");
|
||||
connection.on("message", (event, msg) => {
|
||||
l.notice(`<<< ${event}: ${JSON.stringify(msg)}`);
|
||||
});
|
||||
setInterval(async () => {
|
||||
connection.send("test-server", (new Date()).toString());
|
||||
let id = setInterval(async () => {
|
||||
const rt = await connection.send("test-server", (new Date()).toString());
|
||||
l.info(`RETURNVAL`, rt);
|
||||
if(!rt)
|
||||
clearTimeout(id);
|
||||
}, 1000);
|
||||
});
|
||||
|
||||
|
|
Loading…
Reference in a new issue