systemquery/src/lib/transport/FramedTransport.mjs

126 lines
4 KiB
JavaScript

"use strict";
import os from 'os';
import log from 'log';
const l = log.get("framedtransport");
import { EventEmitter, once } from 'events';
import { write_safe, end_safe } from '../io/StreamHelpers.mjs';
/**
* Manages a TCP socket using a simple binary frame system.
* Frames look like this:
*
* <LENGTH: Uint32 big-endian> <DATA: Uint8Array>
*/
class FramedTransport extends EventEmitter {
constructor(socket) {
super();
this.socket = socket;
this.socket.on("data", this.handle_chunk.bind(this));
this.buffer = null;
/** The length of a uint in bytes @type {number} */
this.uint_length = 4;
this.writing = false;
}
/**
* Handles a single chunk of incoming data.
* May or may not emit a frame event.
* @param {Buffer|TypedArray} chunk The incoming chunk of data.
* @return {void}
*/
handle_chunk(chunk) {
l.debug(`CHUNK length`, chunk.length, `buffer length`, this.buffer === null ? 0 : this.buffer.length);
if(this.buffer instanceof Buffer) {
this.buffer = Buffer.concat([ this.buffer, chunk ]);
}
else
this.buffer = chunk;
l.debug(`CHUNK buffer`, this.buffer);
let next_frame_length = this.buffer2uint(this.buffer, 0);
l.debug(`CHUNK total length`, this.buffer.length, `next_frame_length`, next_frame_length);
// We have enough data! Emit a frame and then start again.
if(this.buffer.length - this.uint_length >= next_frame_length) {
this.emit("frame", this.buffer.slice(this.uint_length, next_frame_length));
if(this.buffer.length - (this.uint_length + next_frame_length) > 0)
this.buffer = this.buffer.slice(this.uint_length + next_frame_length);
else
this.buffer = null;
l.info(`FRAME length`, next_frame_length, `length_remaining`, this.buffer === null ? 0 : this.buffer.length);
}
}
/**
* Converts the number at a given position in a buffer from Uint32 with
* big-endian encoding to a normal number.
* @param {Buffer|TypedArray} buffer The source buffer.
* @param {number} pos32 The position in the buffer to read from.
* @return {number} The parsed number from the buffer.
*/
buffer2uint(buffer, pos32) {
l.debug(`BUFFER2UINT buffer`, buffer, `pos32`, pos32);
// DataView doesn't work as expected here, because Node.js optimises small buffer to be views of 1 larger buffer.
let u8 = new Uint8Array(buffer).slice(pos32, this.uint_length);
// Convert from network byte order if necessary
if(os.endianness() == "LE") u8.reverse();
l.debug(`BUFFER2UINT u8`, u8, `u32`, new Uint32Array(u8.buffer));
return new Uint32Array(u8.buffer)[0];
}
/**
* Converts a positive integer into a Buffer using big-endian encoding.
* @param {number} uint The positive integer to convert.
* @return {Buffer} A new buffer representing the given number.
*/
uint2buffer(uint) {
// DataView doesn't work as expected here, because Node.js optimises small buffer to be views of 1 larger buffer.
let array = new ArrayBuffer(this.uint_length);
const u8 = new Uint8Array(array);
const u32 = new Uint32Array(array);
u32[0] = uint;
// Host to network byte order, if appropriate
if(os.endianness() == "LE") u8.reverse();
return Buffer.from(array);
}
/**
* Writes a given Buffer or TypedArray to the underlying socket inn a new frame.
* @param {Buffer|TypedArray} frame The data to send in a new frame.
* @return {Promise} A promise that resolves when writing is complete.
*/
async write(frame) {
if(this.writing) await once(this, "write-end");
this.emit("write-start");
this.writing = true;
l.info(`SEND length`, frame.length);
await write_safe(this.socket, this.uint2buffer(frame.length));
await write_safe(this.socket, frame);
this.writing = false;
this.emit("write-end")
}
/**
* Gracefulls closes this socket.
* @return {Promise} A Promise that resolves when the socket is properly closed.
*/
async destroy() {
// Calling socket.end() is important as it closes the stream properly
await end_safe(this.socket);
await this.socket.destroy();
}
}
export default FramedTransport;