2021-10-02 14:56:34 +00:00
|
|
|
"use strict";
|
|
|
|
|
2021-10-02 23:34:55 +00:00
|
|
|
import os from 'os';
|
2021-10-03 01:33:54 +00:00
|
|
|
import { EventEmitter, once } from 'events';
|
2021-10-02 23:34:55 +00:00
|
|
|
|
|
|
|
import log from 'log';
|
|
|
|
const l = log.get("framedtransport");
|
|
|
|
|
2021-10-03 01:33:54 +00:00
|
|
|
import PQueue from 'p-queue';
|
2021-10-02 14:56:34 +00:00
|
|
|
|
2021-10-03 01:33:54 +00:00
|
|
|
import settings from '../../settings.mjs';
|
2021-10-02 14:56:34 +00:00
|
|
|
import { write_safe, end_safe } from '../io/StreamHelpers.mjs';
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Manages a TCP socket using a simple binary frame system.
|
|
|
|
* Frames look like this:
|
|
|
|
*
|
|
|
|
* <LENGTH: Uint32 big-endian> <DATA: Uint8Array>
|
|
|
|
*/
|
2021-10-02 23:34:55 +00:00
|
|
|
class FramedTransport extends EventEmitter {
|
2021-10-02 14:56:34 +00:00
|
|
|
constructor(socket) {
|
2021-10-02 23:34:55 +00:00
|
|
|
super();
|
|
|
|
|
2021-10-02 14:56:34 +00:00
|
|
|
this.socket = socket;
|
2021-10-02 23:34:55 +00:00
|
|
|
this.socket.on("data", this.handle_chunk.bind(this));
|
2021-10-02 14:56:34 +00:00
|
|
|
this.buffer = null;
|
|
|
|
|
|
|
|
/** The length of a uint in bytes @type {number} */
|
|
|
|
this.uint_length = 4;
|
|
|
|
|
2021-10-03 01:33:54 +00:00
|
|
|
this.write_queue = new PQueue({
|
|
|
|
concurrency: 1,
|
|
|
|
timeout: 30 * 1000,
|
|
|
|
throwOnTimeout: true
|
|
|
|
});
|
2021-10-02 14:56:34 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Handles a single chunk of incoming data.
|
|
|
|
* May or may not emit a frame event.
|
|
|
|
* @param {Buffer|TypedArray} chunk The incoming chunk of data.
|
|
|
|
* @return {void}
|
|
|
|
*/
|
|
|
|
handle_chunk(chunk) {
|
2021-10-03 11:14:57 +00:00
|
|
|
// l.debug(`CHUNK length`, chunk.length, `buffer length`, this.buffer === null ? 0 : this.buffer.length);
|
2021-10-02 23:34:55 +00:00
|
|
|
if(this.buffer instanceof Buffer) {
|
|
|
|
this.buffer = Buffer.concat([ this.buffer, chunk ]);
|
|
|
|
|
|
|
|
}
|
|
|
|
else
|
|
|
|
this.buffer = chunk;
|
|
|
|
|
2021-10-02 14:56:34 +00:00
|
|
|
let next_frame_length = this.buffer2uint(this.buffer, 0);
|
|
|
|
|
|
|
|
// We have enough data! Emit a frame and then start again.
|
|
|
|
if(this.buffer.length - this.uint_length >= next_frame_length) {
|
2021-10-03 01:33:54 +00:00
|
|
|
const frame = this.buffer.slice(this.uint_length, this.uint_length + next_frame_length);
|
|
|
|
if(frame.length !== next_frame_length)
|
|
|
|
throw new Error(`Error: Actual frame length of ${frame.length} does not match expected value ${next_frame_length}`);
|
|
|
|
// l.debug(`FRAME length`, frame.length, `frame`, frame);
|
|
|
|
this.emit("frame", frame);
|
2021-10-02 14:56:34 +00:00
|
|
|
if(this.buffer.length - (this.uint_length + next_frame_length) > 0)
|
2021-10-02 23:34:55 +00:00
|
|
|
this.buffer = this.buffer.slice(this.uint_length + next_frame_length);
|
2021-10-02 14:56:34 +00:00
|
|
|
else
|
|
|
|
this.buffer = null;
|
2021-10-03 01:33:54 +00:00
|
|
|
// l.info(`FRAME buffer length remaining`, this.buffer === null ? 0 : this.buffer.length);
|
2021-10-02 14:56:34 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Converts the number at a given position in a buffer from Uint32 with
|
|
|
|
* big-endian encoding to a normal number.
|
|
|
|
* @param {Buffer|TypedArray} buffer The source buffer.
|
|
|
|
* @param {number} pos32 The position in the buffer to read from.
|
|
|
|
* @return {number} The parsed number from the buffer.
|
|
|
|
*/
|
|
|
|
buffer2uint(buffer, pos32) {
|
2021-10-02 23:34:55 +00:00
|
|
|
// DataView doesn't work as expected here, because Node.js optimises small buffer to be views of 1 larger buffer.
|
|
|
|
let u8 = new Uint8Array(buffer).slice(pos32, this.uint_length);
|
|
|
|
// Convert from network byte order if necessary
|
|
|
|
if(os.endianness() == "LE") u8.reverse();
|
|
|
|
return new Uint32Array(u8.buffer)[0];
|
2021-10-02 14:56:34 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Converts a positive integer into a Buffer using big-endian encoding.
|
|
|
|
* @param {number} uint The positive integer to convert.
|
|
|
|
* @return {Buffer} A new buffer representing the given number.
|
|
|
|
*/
|
|
|
|
uint2buffer(uint) {
|
2021-10-02 23:34:55 +00:00
|
|
|
// DataView doesn't work as expected here, because Node.js optimises small buffer to be views of 1 larger buffer.
|
2021-10-02 14:56:34 +00:00
|
|
|
let array = new ArrayBuffer(this.uint_length);
|
2021-10-02 23:34:55 +00:00
|
|
|
const u8 = new Uint8Array(array);
|
|
|
|
const u32 = new Uint32Array(array);
|
|
|
|
u32[0] = uint;
|
|
|
|
// Host to network byte order, if appropriate
|
|
|
|
if(os.endianness() == "LE") u8.reverse();
|
2021-10-02 14:56:34 +00:00
|
|
|
return Buffer.from(array);
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Writes a given Buffer or TypedArray to the underlying socket inn a new frame.
|
|
|
|
* @param {Buffer|TypedArray} frame The data to send in a new frame.
|
|
|
|
* @return {Promise} A promise that resolves when writing is complete.
|
|
|
|
*/
|
|
|
|
async write(frame) {
|
2021-10-03 01:33:54 +00:00
|
|
|
// l.info(`SEND length`, frame.length, `frame`, frame);
|
|
|
|
try {
|
|
|
|
await this.write_queue.add(async () => {
|
|
|
|
const bytes = Buffer.concat([ this.uint2buffer(frame.length), frame ]);
|
|
|
|
try { await write_safe(this.socket, bytes); }
|
|
|
|
catch(error) {
|
|
|
|
l.warn(`Error while writing, killing connection`, settings.cli.verbose ? error : error.message);
|
|
|
|
await this.destroy();
|
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
catch(error) {
|
|
|
|
l.warn(`Error: write timeout detected, killing connection`);
|
|
|
|
await this.destroy();
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
return true;
|
2021-10-02 14:56:34 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Gracefulls closes this socket.
|
|
|
|
* @return {Promise} A Promise that resolves when the socket is properly closed.
|
|
|
|
*/
|
|
|
|
async destroy() {
|
2021-10-03 01:33:54 +00:00
|
|
|
this.write_queue.clear();
|
2021-10-02 14:56:34 +00:00
|
|
|
// Calling socket.end() is important as it closes the stream properly
|
|
|
|
await end_safe(this.socket);
|
|
|
|
await this.socket.destroy();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
export default FramedTransport;
|