"use strict"; import os from 'os'; import { EventEmitter, once } from 'events'; import log from '../io/NamespacedLog.mjs'; const l = log("framedtransport"); import PQueue from 'p-queue'; import settings from '../../settings.mjs'; import { write_safe, end_safe } from '../io/StreamHelpers.mjs'; /** * Manages a TCP socket using a simple binary frame system. * Frames look like this: * * */ class FramedTransport extends EventEmitter { /** * Whether this socket is actually connected or not. * @return {bool} */ get connected() { return this.socket === null ? false : !this.socket.destroyed; } constructor(socket) { super(); this.socket = socket; this.socket.on("data", this.handle_chunk.bind(this)); this.buffer = null; /** The length of a uint in bytes @type {number} */ this.uint_length = 4; this.write_safe_timeout = 28 * 1000; this.write_queue = new PQueue({ concurrency: 1, timeout: 30 * 1000, throwOnTimeout: true }); } /** * Handles a single chunk of incoming data. * May or may not emit a frame event. * @param {Buffer|TypedArray} chunk The incoming chunk of data. * @return {void} */ handle_chunk(chunk) { // l.debug(`CHUNK length`, chunk.length, `buffer length`, this.buffer === null ? 0 : this.buffer.length); if(this.buffer instanceof Buffer) { this.buffer = Buffer.concat([ this.buffer, chunk ]); } else this.buffer = chunk; let next_frame_length = this.buffer2uint(this.buffer, 0); // We have enough data! Emit a frame and then start again. if(this.buffer.length - this.uint_length >= next_frame_length) { const frame = this.buffer.slice(this.uint_length, this.uint_length + next_frame_length); if(frame.length !== next_frame_length) throw new Error(`Error: Actual frame length of ${frame.length} does not match expected value ${next_frame_length}`); // l.debug(`FRAME length`, frame.length, `frame`, frame); this.emit("frame", frame); if(this.buffer.length - (this.uint_length + next_frame_length) > 0) this.buffer = this.buffer.slice(this.uint_length + next_frame_length); else this.buffer = null; // l.info(`FRAME buffer length remaining`, this.buffer === null ? 0 : this.buffer.length); } } /** * Converts the number at a given position in a buffer from Uint32 with * big-endian encoding to a normal number. * @param {Buffer|TypedArray} buffer The source buffer. * @param {number} pos32 The position in the buffer to read from. * @return {number} The parsed number from the buffer. */ buffer2uint(buffer, pos32) { // DataView doesn't work as expected here, because Node.js optimises small buffer to be views of 1 larger buffer. let u8 = new Uint8Array(buffer).slice(pos32, this.uint_length); // Convert from network byte order if necessary if(os.endianness() == "LE") u8.reverse(); return new Uint32Array(u8.buffer)[0]; } /** * Converts a positive integer into a Buffer using big-endian encoding. * @param {number} uint The positive integer to convert. * @return {Buffer} A new buffer representing the given number. */ uint2buffer(uint) { // DataView doesn't work as expected here, because Node.js optimises small buffer to be views of 1 larger buffer. let array = new ArrayBuffer(this.uint_length); const u8 = new Uint8Array(array); const u32 = new Uint32Array(array); u32[0] = uint; // Host to network byte order, if appropriate if(os.endianness() == "LE") u8.reverse(); return Buffer.from(array); } /** * Writes a given Buffer or TypedArray to the underlying socket inn a new frame. * @param {Buffer|TypedArray} frame The data to send in a new frame. * @return {Promise} A promise that resolves when writing is complete. */ async write(frame) { if(this.write_queue == null || !this.connected) { l.warn(`Error: Can't write to an unconnected socket.`); return false; } // l.info(`SEND length`, frame.length, `frame`, frame); try { await this.write_queue.add(async () => { const bytes = Buffer.concat([ this.uint2buffer(frame.length), frame ]); try { await write_safe( this.socket, bytes, this.write_safe_timeout ); } catch(error) { l.warn(`Error while writing, killing connection`, settings.cli.verbose ? error : error.message); await this.destroy(); } }); } catch(error) { l.warn(`Error: write timeout detected, killing connection`); await this.destroy(); return false; } return true; } /** * Gracefulls closes this socket. * @return {Promise} A Promise that resolves when the socket is properly closed. */ async destroy() { this.write_queue.pause(); // Calling socket.end() is important as it closes the stream properly await end_safe(this.socket); await this.socket.destroy(); this.write_queue.clear(); this.write_queue = null; } } export default FramedTransport;