diff --git a/src/lib/transport/Connection.mjs b/src/lib/transport/Connection.mjs index c9ecf2e..3a73c40 100644 --- a/src/lib/transport/Connection.mjs +++ b/src/lib/transport/Connection.mjs @@ -24,7 +24,7 @@ class Connection extends EventEmitter { * @return {bool} */ get connected() { - return this.socket === null ? true : this.socket.destroyed; + return this.framer == null ? false : this.framer.connected; } constructor(secret_join, socket = null) { @@ -72,6 +72,9 @@ class Connection extends EventEmitter { this.framer.on("frame", this.handle_frame.bind(this)); await this.rekey(); + + // We can await .init() or .connect() - this is just another optiom + this.emit(`connect`); } async rekey() { diff --git a/src/lib/transport/FramedTransport.mjs b/src/lib/transport/FramedTransport.mjs index 1d18b3b..5eb2a4f 100644 --- a/src/lib/transport/FramedTransport.mjs +++ b/src/lib/transport/FramedTransport.mjs @@ -18,6 +18,14 @@ import { write_safe, end_safe } from '../io/StreamHelpers.mjs'; * */ class FramedTransport extends EventEmitter { + /** + * Whether this socket is actually connected or not. + * @return {bool} + */ + get connected() { + return this.socket === null ? false : !this.socket.destroyed; + } + constructor(socket) { super(); @@ -28,6 +36,7 @@ class FramedTransport extends EventEmitter { /** The length of a uint in bytes @type {number} */ this.uint_length = 4; + this.write_safe_timeout = 28 * 1000; this.write_queue = new PQueue({ concurrency: 1, timeout: 30 * 1000, @@ -104,11 +113,19 @@ class FramedTransport extends EventEmitter { * @return {Promise} A promise that resolves when writing is complete. */ async write(frame) { + if(this.write_queue == null || !this.connected) { + l.warn(`Error: Can't write to an unconnected socket.`); + return false; + } + // l.info(`SEND length`, frame.length, `frame`, frame); try { await this.write_queue.add(async () => { const bytes = Buffer.concat([ this.uint2buffer(frame.length), frame ]); - try { await write_safe(this.socket, bytes); } + try { await write_safe( + this.socket, bytes, + this.write_safe_timeout + ); } catch(error) { l.warn(`Error while writing, killing connection`, settings.cli.verbose ? error : error.message); await this.destroy(); @@ -128,10 +145,12 @@ class FramedTransport extends EventEmitter { * @return {Promise} A Promise that resolves when the socket is properly closed. */ async destroy() { - this.write_queue.clear(); + this.write_queue.pause(); // Calling socket.end() is important as it closes the stream properly await end_safe(this.socket); await this.socket.destroy(); + this.write_queue.clear(); + this.write_queue = null; } } diff --git a/src/subcommands/test-client/test-client.mjs b/src/subcommands/test-client/test-client.mjs index ca77570..44972d6 100644 --- a/src/subcommands/test-client/test-client.mjs +++ b/src/subcommands/test-client/test-client.mjs @@ -30,6 +30,7 @@ export default async function() { for(let i = 0; i < 100; i++) { await sleep(1000); - socket.send(`test-client`, `hello world ${i}\n`); + if(!(await socket.send(`test-client`, `hello world ${i}\n`))) + break; } } diff --git a/src/subcommands/test-server/test-server.mjs b/src/subcommands/test-server/test-server.mjs index 4804ad5..5c439a5 100644 --- a/src/subcommands/test-server/test-server.mjs +++ b/src/subcommands/test-server/test-server.mjs @@ -20,7 +20,6 @@ export default async function() { }); let id = setInterval(async () => { const rt = await connection.send("test-server", (new Date()).toString()); - l.info(`RETURNVAL`, rt); if(!rt) clearTimeout(id); }, 1000);