diff --git a/src/lib/transport/Connection.mjs b/src/lib/transport/Connection.mjs index a028ddb..a498e09 100644 --- a/src/lib/transport/Connection.mjs +++ b/src/lib/transport/Connection.mjs @@ -4,10 +4,10 @@ import net from 'net'; import { EventEmitter, once } from 'events'; import l from 'log'; -import nexline from 'nexline'; import settings from '../../settings.mjs'; import rekey from './rekey.mjs'; +import FramedTransport from './FramedTransport.mjs'; import { write_safe } from '../io/StreamHelpers.mjs'; import { encrypt_bytes, decrypt_bytes } from '../crypto/secretbox.mjs'; @@ -38,9 +38,8 @@ class Connection extends EventEmitter { this.session_key = await rekey(this.socket, this.session_key); - this.reader = nexline({ - input: this.socket - }); + this.framer = new FramedTransport(this.socket); + this.framer.on("frame", this.handle_frame); this.read_task = read_loop(); } @@ -50,11 +49,11 @@ class Connection extends EventEmitter { this.emit("destroy"); } - async read_loop() { + async handle_frame(bytes) { try { - for await (let line of this.reader) { - handle_message(line); - } + let decrypted = decrypt_bytes(this.session_key, bytes); + if(decrypted === null) return; + await handle_message(decrypted.toString("utf-8")); } catch(error) { l.warn(`Warning: Killing connection to ${this.address}:${this.port} after error: ${settings.cli.verbose ? error : error.message}`); @@ -64,9 +63,9 @@ class Connection extends EventEmitter { } } - handle_message(text) { - let message = JSON.parse(text); - + async handle_message(msg_text) { + const msg = JSON.parse(msg_text); + this.emit("message", msg); } async send(message) { @@ -76,7 +75,7 @@ class Connection extends EventEmitter { let payload = JSON.stringify(message); payload = encrypt_bytes(this.session_key, payload); - await write_safe(this.socket, payload); + await this., payload); } } diff --git a/src/lib/transport/FramedTransport.mjs b/src/lib/transport/FramedTransport.mjs new file mode 100644 index 0000000..ceb7dcf --- /dev/null +++ b/src/lib/transport/FramedTransport.mjs @@ -0,0 +1,95 @@ +"use strict"; + +import { EventEmitter, once } from 'events'; + +import { write_safe, end_safe } from '../io/StreamHelpers.mjs'; + +/** + * Manages a TCP socket using a simple binary frame system. + * Frames look like this: + * + * + */ +class FramedTransport { + constructor(socket) { + this.socket = socket; + this.socket.on("data", this.handle_chunk); + this.buffer = null; + + /** The length of a uint in bytes @type {number} */ + this.uint_length = 4; + + this.writing = false; + } + + /** + * Handles a single chunk of incoming data. + * May or may not emit a frame event. + * @param {Buffer|TypedArray} chunk The incoming chunk of data. + * @return {void} + */ + handle_chunk(chunk) { + if(this.buffer instanceof Buffer) + this.buffer = Buffer.concat(this.buffer, chunk); + + let next_frame_length = this.buffer2uint(this.buffer, 0); + + // We have enough data! Emit a frame and then start again. + if(this.buffer.length - this.uint_length >= next_frame_length) { + this.emit("frame", Buffer.slice(this.uint_length, next_frame_length)); + if(this.buffer.length - (this.uint_length + next_frame_length) > 0) + this.buffer = Buffer.slice(this,uint_length + next_frame_length); + else + this.buffer = null; + } + } + + /** + * Converts the number at a given position in a buffer from Uint32 with + * big-endian encoding to a normal number. + * @param {Buffer|TypedArray} buffer The source buffer. + * @param {number} pos32 The position in the buffer to read from. + * @return {number} The parsed number from the buffer. + */ + buffer2uint(buffer, pos32) { + return new DataView(buffer).getUint32(pos32, false); + } + + /** + * Converts a positive integer into a Buffer using big-endian encoding. + * @param {number} uint The positive integer to convert. + * @return {Buffer} A new buffer representing the given number. + */ + uint2buffer(uint) { + let array = new ArrayBuffer(this.uint_length); + new DataView(array).setUint32(0, uint, false); + return Buffer.from(array); + } + + /** + * Writes a given Buffer or TypedArray to the underlying socket inn a new frame. + * @param {Buffer|TypedArray} frame The data to send in a new frame. + * @return {Promise} A promise that resolves when writing is complete. + */ + async write(frame) { + if(this.writing) await once(this, "write-end"); + this.emit("write-start"); + this.writing = true; + await write_safe(this.socket, this.uint2buffer(frame.length)); + await write_safe(this.socket, frame); + this.writing = false; + this.emit("write-end") + } + + /** + * Gracefulls closes this socket. + * @return {Promise} A Promise that resolves when the socket is properly closed. + */ + async destroy() { + // Calling socket.end() is important as it closes the stream properly + await end_safe(this.socket); + await this.socket.destroy(); + } +} + +export default FramedTransport;