recordify: fix process exiting and imcomplete files issues

• Node.js not exiting at all
 • Node.js exiting on end_safe ing stream.Writable (?????)
 • Incomplete files - "unexpected end of file" errors and invalid JSON
This commit is contained in:
Starbeamrainbowlabs 2022-07-08 18:54:00 +01:00
parent cb922ae8c8
commit 3b2715c6cd
Signed by: sbrl
GPG key ID: 1BE5172E637709C2
6 changed files with 99 additions and 15 deletions

View file

@ -0,0 +1,81 @@
"use strict";
import EventEmitter from 'events';
import child_process from 'child_process';
import log from './NamespacedLog.mjs'; const l = log("gzipchildprocess");
import { end_safe } from './StreamHelpers.mjs';
/**
* Spawns and manages a gzip child process.
* @deprecated Use spawn-stream instead
* @extends EventEmitter
*/
class GzipChildProcess extends EventEmitter {
get stdin() { return this.child_process.stdin; }
get stdout() { return this.child_process.stdout; }
get stderr() { return this.child_process.stderr; }
constructor(auto_start = true) {
super();
this.debug = false;
this.child_process = null;
this.has_ended = false;
if(auto_start)
this.start();
}
start() {
if(this.child_process != null)
throw new Error("Invalid Operation: Can't start the child process, since it's already been started.");
this.child_process = child_process.spawn(
"gzip", [], {
// Pipe stdin + stdout; send error to the parent process
stdio: [ "pipe", "pipe", "inherit" ]
}
);
this.child_process.on("close", () => {
if(this.debug) l.debug("Close event triggered");
this.has_ended = true;
this.emit("close");
});
// FUTURE: Perhaps just throwing the error would be a better choice?
this.child_process.on("error", (error) => {
this.emit("error", error);
});
}
/**
* Returns a Promise that resolves when the gzip process gracefully exits.
* If the gzip child process has already exited, then it resolves immediately.
* Gracefully closes the process by closing stdin.
* @return {Promise}
*/
async close() {
if(this.debug) l.debug("end_gracefully called");
if(this.has_ended) {
if(this.debug) l.debug("It's been ended already - nothing to do");
return;
}
if(!this.stdin.writableFinished) {
if(this.debug) l.debug("Closing stdin");
await end_safe(this.stdin);
if(this.debug) l.debug("stdin closed successfully");
}
if(this.has_ended) {
if(this.debug) l.debug("It's been ended already - nothing to do");
return;
}
if(this.debug) l.debug("Waiting for close event");
await EventEmitter.once(this, "close");
if(this.debug) l.debug("Close event fired, our work is done");
}
}
export default GzipChildProcess;

View file

@ -140,8 +140,6 @@ class RadarReader {
} }
async close() { async close() {
if(this.stream_in !== null) this.stream_in.close();
if(this.stream_extractor !== null) await end_safe(this.stream_extractor);
if(this.reader !== null) this.reader.close(); if(this.reader !== null) this.reader.close();
this.stream_in = null; this.stream_in = null;

View file

@ -3,10 +3,11 @@
import fs from 'fs'; import fs from 'fs';
import path from 'path'; import path from 'path';
import log from '../../lib/io/NamespacedLog.mjs'; const l = log("recordwrangler");
import RecordBuilder from '../record/RecordBuilder.mjs'; import RecordBuilder from '../record/RecordBuilder.mjs';
import RecordsWriter from '../record/RecordsWriter.mjs'; import RecordsWriter from '../record/RecordsWriter.mjs';
import pretty_ms from 'pretty-ms'; import pretty_ms from 'pretty-ms';
import terrain50_analyse_frequencies from 'terrain50/src/static/Terrain50AnalyseFrequencies.mjs';
import { end_safe } from './StreamHelpers.mjs'; import { end_safe } from './StreamHelpers.mjs';
class RecordWrangler { class RecordWrangler {
@ -43,7 +44,10 @@ class RecordWrangler {
const sample_radar = await reader_radar.next(); const sample_radar = await reader_radar.next();
const sample_water = await reader_water.next(); const sample_water = await reader_water.next();
if(sample_radar.done || sample_water.done) break; if(sample_radar.done || sample_water.done) {
l.log(`Done because ${sample_radar.done?"radar":"water"} reader is out of records`);
break;
}
const example_next = this.make_example( const example_next = this.make_example(
sample_radar.value, sample_radar.value,
@ -61,7 +65,7 @@ class RecordWrangler {
} }
await writer.close(); await writer.close();
console.log(`\nComplete! ${i_file}/${i} files/records_total written in ${pretty_ms(new Date() - time_start)}`); console.log(`\nComplete! ${i_file}/${i} files/records_total written in ${pretty_ms(new Date() - time_start)}\n`);
} }
make_example(sample_radar, sample_water) { make_example(sample_radar, sample_water) {

View file

@ -42,7 +42,7 @@ function write_safe(stream_out, data) {
*/ */
function end_safe(stream, chunk = undefined) { function end_safe(stream, chunk = undefined) {
return new Promise((resolve, _reject) => { return new Promise((resolve, _reject) => {
stream.once("finish", resolve); stream.once("finish", () => { console.log(`end_safe DEBUG finish`); resolve(); });
if(typeof chunk == "undefined") stream.end(); if(typeof chunk == "undefined") stream.end();
else stream.end(chunk); else stream.end(chunk);
}); });

View file

@ -2,17 +2,18 @@
import fs from 'fs'; import fs from 'fs';
import ChildProcess from 'duplex-child-process'; import log from '../../lib/io/NamespacedLog.mjs'; const l = log("recordswriter");
import GzipChildProcess from '../io/GzipChildProcess.mjs';
import { write_safe, end_safe } from '../io/StreamHelpers.mjs'; import { write_safe, end_safe } from '../io/StreamHelpers.mjs';
class RecordsWriter { class RecordsWriter {
#stream_out = null; #stream_out = null;
#gzip = ChildProcess.spawn("gzip"); #gzip = new GzipChildProcess();
constructor(filepath) { constructor(filepath) {
this.#stream_out = fs.createWriteStream(filepath); this.#stream_out = fs.createWriteStream(filepath);
this.#gzip.pipe(this.#stream_out); this.#gzip.stdout.pipe(this.#stream_out);
} }
/** /**
@ -22,7 +23,7 @@ class RecordsWriter {
*/ */
async write(sample) { async write(sample) {
const str = JSON.stringify(Object.fromEntries(sample)); const str = JSON.stringify(Object.fromEntries(sample));
await write_safe(this.#gzip, str+"\n"); await write_safe(this.#gzip.stdin, str+"\n");
} }
/** /**
@ -31,7 +32,7 @@ class RecordsWriter {
* @return {Promise} * @return {Promise}
*/ */
async write_raw(line) { async write_raw(line) {
await write_safe(this.#gzip, line+"\n"); await write_safe(this.#gzip.stdin, line+"\n");
} }
/** /**
@ -40,8 +41,8 @@ class RecordsWriter {
* @return {Promise} * @return {Promise}
*/ */
async close() { async close() {
await end_safe(this.#gzip); await this.#gzip.close();
await end_safe(this.#stream_out); // Closing this.#stream_out causes a silent crash O.o 2022-07-08 @sbrl Node.js 18.4.0
} }
} }

View file

@ -7,7 +7,7 @@ import RecordWrangler from '../../lib/io/RecordWrangler.mjs';
import RadarWrangler from '../../lib/RadarWrangler.mjs'; import RadarWrangler from '../../lib/RadarWrangler.mjs';
import Terrain50StreamReader from '../../lib/io/Terrain50StreamReader.mjs'; import Terrain50StreamReader from '../../lib/io/Terrain50StreamReader.mjs';
import log from './NamespacedLog.mjs'; const l = log("recordify"); import log from '../../lib/io/NamespacedLog.mjs'; const l = log("recordify");
export default async function() { export default async function() {
if(typeof settings.water !== "string") if(typeof settings.water !== "string")
@ -29,7 +29,7 @@ export default async function() {
await writer.write(reader_radar.iterate(settings.rainfall), reader_water.iterate(settings.water)); await writer.write(reader_radar.iterate(settings.rainfall), reader_water.iterate(settings.water));
l.log("Closing reader reader") l.log("Closing radar reader")
await reader_radar.close(); await reader_radar.close();
l.log("Closing water depth data reader") l.log("Closing water depth data reader")
await reader_water.close(); await reader_water.close();