Improve unexpected socket disconnect handling

This commit is contained in:
Starbeamrainbowlabs 2021-10-03 12:24:32 +01:00
parent b48f50e819
commit 7d3161433a
Signed by: sbrl
GPG key ID: 1BE5172E637709C2
4 changed files with 27 additions and 5 deletions

View file

@ -24,7 +24,7 @@ class Connection extends EventEmitter {
* @return {bool} * @return {bool}
*/ */
get connected() { get connected() {
return this.socket === null ? true : this.socket.destroyed; return this.framer == null ? false : this.framer.connected;
} }
constructor(secret_join, socket = null) { constructor(secret_join, socket = null) {
@ -72,6 +72,9 @@ class Connection extends EventEmitter {
this.framer.on("frame", this.handle_frame.bind(this)); this.framer.on("frame", this.handle_frame.bind(this));
await this.rekey(); await this.rekey();
// We can await .init() or .connect() - this is just another optiom
this.emit(`connect`);
} }
async rekey() { async rekey() {

View file

@ -18,6 +18,14 @@ import { write_safe, end_safe } from '../io/StreamHelpers.mjs';
* <LENGTH: Uint32 big-endian> <DATA: Uint8Array> * <LENGTH: Uint32 big-endian> <DATA: Uint8Array>
*/ */
class FramedTransport extends EventEmitter { class FramedTransport extends EventEmitter {
/**
* Whether this socket is actually connected or not.
* @return {bool}
*/
get connected() {
return this.socket === null ? false : !this.socket.destroyed;
}
constructor(socket) { constructor(socket) {
super(); super();
@ -28,6 +36,7 @@ class FramedTransport extends EventEmitter {
/** The length of a uint in bytes @type {number} */ /** The length of a uint in bytes @type {number} */
this.uint_length = 4; this.uint_length = 4;
this.write_safe_timeout = 28 * 1000;
this.write_queue = new PQueue({ this.write_queue = new PQueue({
concurrency: 1, concurrency: 1,
timeout: 30 * 1000, timeout: 30 * 1000,
@ -104,11 +113,19 @@ class FramedTransport extends EventEmitter {
* @return {Promise} A promise that resolves when writing is complete. * @return {Promise} A promise that resolves when writing is complete.
*/ */
async write(frame) { async write(frame) {
if(this.write_queue == null || !this.connected) {
l.warn(`Error: Can't write to an unconnected socket.`);
return false;
}
// l.info(`SEND length`, frame.length, `frame`, frame); // l.info(`SEND length`, frame.length, `frame`, frame);
try { try {
await this.write_queue.add(async () => { await this.write_queue.add(async () => {
const bytes = Buffer.concat([ this.uint2buffer(frame.length), frame ]); const bytes = Buffer.concat([ this.uint2buffer(frame.length), frame ]);
try { await write_safe(this.socket, bytes); } try { await write_safe(
this.socket, bytes,
this.write_safe_timeout
); }
catch(error) { catch(error) {
l.warn(`Error while writing, killing connection`, settings.cli.verbose ? error : error.message); l.warn(`Error while writing, killing connection`, settings.cli.verbose ? error : error.message);
await this.destroy(); await this.destroy();
@ -128,10 +145,12 @@ class FramedTransport extends EventEmitter {
* @return {Promise} A Promise that resolves when the socket is properly closed. * @return {Promise} A Promise that resolves when the socket is properly closed.
*/ */
async destroy() { async destroy() {
this.write_queue.clear(); this.write_queue.pause();
// Calling socket.end() is important as it closes the stream properly // Calling socket.end() is important as it closes the stream properly
await end_safe(this.socket); await end_safe(this.socket);
await this.socket.destroy(); await this.socket.destroy();
this.write_queue.clear();
this.write_queue = null;
} }
} }

View file

@ -30,6 +30,7 @@ export default async function() {
for(let i = 0; i < 100; i++) { for(let i = 0; i < 100; i++) {
await sleep(1000); await sleep(1000);
socket.send(`test-client`, `hello world ${i}\n`); if(!(await socket.send(`test-client`, `hello world ${i}\n`)))
break;
} }
} }

View file

@ -20,7 +20,6 @@ export default async function() {
}); });
let id = setInterval(async () => { let id = setInterval(async () => {
const rt = await connection.send("test-server", (new Date()).toString()); const rt = await connection.send("test-server", (new Date()).toString());
l.info(`RETURNVAL`, rt);
if(!rt) if(!rt)
clearTimeout(id); clearTimeout(id);
}, 1000); }, 1000);