Implement FramedTransport to increase efficiency of transmitted encrypted messages
This commit is contained in:
parent
b3da7fea17
commit
c43aeb6c3b
2 changed files with 106 additions and 12 deletions
|
@ -4,10 +4,10 @@ import net from 'net';
|
|||
import { EventEmitter, once } from 'events';
|
||||
|
||||
import l from 'log';
|
||||
import nexline from 'nexline';
|
||||
|
||||
import settings from '../../settings.mjs';
|
||||
import rekey from './rekey.mjs';
|
||||
import FramedTransport from './FramedTransport.mjs';
|
||||
import { write_safe } from '../io/StreamHelpers.mjs';
|
||||
import { encrypt_bytes, decrypt_bytes } from '../crypto/secretbox.mjs';
|
||||
|
||||
|
@ -38,9 +38,8 @@ class Connection extends EventEmitter {
|
|||
|
||||
this.session_key = await rekey(this.socket, this.session_key);
|
||||
|
||||
this.reader = nexline({
|
||||
input: this.socket
|
||||
});
|
||||
this.framer = new FramedTransport(this.socket);
|
||||
this.framer.on("frame", this.handle_frame);
|
||||
|
||||
this.read_task = read_loop();
|
||||
}
|
||||
|
@ -50,11 +49,11 @@ class Connection extends EventEmitter {
|
|||
this.emit("destroy");
|
||||
}
|
||||
|
||||
async read_loop() {
|
||||
async handle_frame(bytes) {
|
||||
try {
|
||||
for await (let line of this.reader) {
|
||||
handle_message(line);
|
||||
}
|
||||
let decrypted = decrypt_bytes(this.session_key, bytes);
|
||||
if(decrypted === null) return;
|
||||
await handle_message(decrypted.toString("utf-8"));
|
||||
}
|
||||
catch(error) {
|
||||
l.warn(`Warning: Killing connection to ${this.address}:${this.port} after error: ${settings.cli.verbose ? error : error.message}`);
|
||||
|
@ -64,9 +63,9 @@ class Connection extends EventEmitter {
|
|||
}
|
||||
}
|
||||
|
||||
handle_message(text) {
|
||||
let message = JSON.parse(text);
|
||||
|
||||
async handle_message(msg_text) {
|
||||
const msg = JSON.parse(msg_text);
|
||||
this.emit("message", msg);
|
||||
}
|
||||
|
||||
async send(message) {
|
||||
|
@ -76,7 +75,7 @@ class Connection extends EventEmitter {
|
|||
let payload = JSON.stringify(message);
|
||||
payload = encrypt_bytes(this.session_key, payload);
|
||||
|
||||
await write_safe(this.socket, payload);
|
||||
await this., payload);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
95
src/lib/transport/FramedTransport.mjs
Normal file
95
src/lib/transport/FramedTransport.mjs
Normal file
|
@ -0,0 +1,95 @@
|
|||
"use strict";
|
||||
|
||||
import { EventEmitter, once } from 'events';
|
||||
|
||||
import { write_safe, end_safe } from '../io/StreamHelpers.mjs';
|
||||
|
||||
/**
|
||||
* Manages a TCP socket using a simple binary frame system.
|
||||
* Frames look like this:
|
||||
*
|
||||
* <LENGTH: Uint32 big-endian> <DATA: Uint8Array>
|
||||
*/
|
||||
class FramedTransport {
|
||||
constructor(socket) {
|
||||
this.socket = socket;
|
||||
this.socket.on("data", this.handle_chunk);
|
||||
this.buffer = null;
|
||||
|
||||
/** The length of a uint in bytes @type {number} */
|
||||
this.uint_length = 4;
|
||||
|
||||
this.writing = false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles a single chunk of incoming data.
|
||||
* May or may not emit a frame event.
|
||||
* @param {Buffer|TypedArray} chunk The incoming chunk of data.
|
||||
* @return {void}
|
||||
*/
|
||||
handle_chunk(chunk) {
|
||||
if(this.buffer instanceof Buffer)
|
||||
this.buffer = Buffer.concat(this.buffer, chunk);
|
||||
|
||||
let next_frame_length = this.buffer2uint(this.buffer, 0);
|
||||
|
||||
// We have enough data! Emit a frame and then start again.
|
||||
if(this.buffer.length - this.uint_length >= next_frame_length) {
|
||||
this.emit("frame", Buffer.slice(this.uint_length, next_frame_length));
|
||||
if(this.buffer.length - (this.uint_length + next_frame_length) > 0)
|
||||
this.buffer = Buffer.slice(this,uint_length + next_frame_length);
|
||||
else
|
||||
this.buffer = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts the number at a given position in a buffer from Uint32 with
|
||||
* big-endian encoding to a normal number.
|
||||
* @param {Buffer|TypedArray} buffer The source buffer.
|
||||
* @param {number} pos32 The position in the buffer to read from.
|
||||
* @return {number} The parsed number from the buffer.
|
||||
*/
|
||||
buffer2uint(buffer, pos32) {
|
||||
return new DataView(buffer).getUint32(pos32, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a positive integer into a Buffer using big-endian encoding.
|
||||
* @param {number} uint The positive integer to convert.
|
||||
* @return {Buffer} A new buffer representing the given number.
|
||||
*/
|
||||
uint2buffer(uint) {
|
||||
let array = new ArrayBuffer(this.uint_length);
|
||||
new DataView(array).setUint32(0, uint, false);
|
||||
return Buffer.from(array);
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes a given Buffer or TypedArray to the underlying socket inn a new frame.
|
||||
* @param {Buffer|TypedArray} frame The data to send in a new frame.
|
||||
* @return {Promise} A promise that resolves when writing is complete.
|
||||
*/
|
||||
async write(frame) {
|
||||
if(this.writing) await once(this, "write-end");
|
||||
this.emit("write-start");
|
||||
this.writing = true;
|
||||
await write_safe(this.socket, this.uint2buffer(frame.length));
|
||||
await write_safe(this.socket, frame);
|
||||
this.writing = false;
|
||||
this.emit("write-end")
|
||||
}
|
||||
|
||||
/**
|
||||
* Gracefulls closes this socket.
|
||||
* @return {Promise} A Promise that resolves when the socket is properly closed.
|
||||
*/
|
||||
async destroy() {
|
||||
// Calling socket.end() is important as it closes the stream properly
|
||||
await end_safe(this.socket);
|
||||
await this.socket.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
export default FramedTransport;
|
Loading…
Reference in a new issue