From b9a018f9a90c4e8c422c205e161d70e82b953a4b Mon Sep 17 00:00:00 2001 From: Starbeamrainbowlabs Date: Fri, 8 Jul 2022 16:51:17 +0100 Subject: [PATCH] properly close all teh streams --- rainfallwrangler/src/lib/RadarWrangler.mjs | 4 +++ rainfallwrangler/src/lib/io/RadarReader.mjs | 26 ++++++++++++++----- .../src/lib/io/Terrain50StreamReader.mjs | 19 +++++++++++--- .../src/subcommands/recordify/recordify.mjs | 7 +++++ 4 files changed, 47 insertions(+), 9 deletions(-) diff --git a/rainfallwrangler/src/lib/RadarWrangler.mjs b/rainfallwrangler/src/lib/RadarWrangler.mjs index 5a4c08c..565b48b 100644 --- a/rainfallwrangler/src/lib/RadarWrangler.mjs +++ b/rainfallwrangler/src/lib/RadarWrangler.mjs @@ -51,6 +51,10 @@ class RadarWrangler { return Promise.all(grouped_timesteps); } + + async close() { + await this.reader.close(); + } } export default RadarWrangler; diff --git a/rainfallwrangler/src/lib/io/RadarReader.mjs b/rainfallwrangler/src/lib/io/RadarReader.mjs index 4c63083..de95f12 100644 --- a/rainfallwrangler/src/lib/io/RadarReader.mjs +++ b/rainfallwrangler/src/lib/io/RadarReader.mjs @@ -9,6 +9,7 @@ import gunzip from 'gunzip-maybe'; import log from './NamespacedLog.mjs'; const l = log("reader:radar"); import interpolate from '../manip/array2d_interpolate.mjs'; import transpose from '../manip/array2d_transpose.mjs'; +import { end_safe } from './StreamHelpers.mjs'; /** * Reads data in order from a directory of .jsonl.gz files. @@ -23,6 +24,10 @@ class RadarReader { this.do_interpolate = do_interpolate; // this.writer_interp_stats = []; + + this.reader = null; + this.stream_in = null; + this.stream_extractor = null; } /** @@ -36,18 +41,17 @@ class RadarReader { if(!fs.existsSync(filename)) throw new Error(`RadarReader/Error: Can't read from '${filename}' as it doesn't exist.`); - let read = fs.createReadStream(filename), - extractor = gunzip(); - read.pipe(extractor); + this.stream_in = fs.createReadStream(filename), + this.stream_extractor = this.stream_in.pipe(gunzip()); - let reader = nexline({ - input: new Readable().wrap(extractor) // Wrap the stream so that nexline likes it + this.reader = nexline({ + input: new Readable().wrap(this.stream_extractor) // Wrap the stream so that nexline likes it }); let i = -1; let prev = null while(true) { - let next_line = await reader.next(); + let next_line = await this.reader.next(); if(next_line == null) break; @@ -134,6 +138,16 @@ class RadarReader { ); } while(b.timestamp - next_timestamp >= this.time_step_interval * 1000 * this.stride); } + + async close() { + if(this.stream_in !== null) this.stream_in.close(); + if(this.stream_extractor !== null) await end_safe(this.stream_extractor); + if(this.reader !== null) this.reader.close(); + + this.stream_in = null; + this.stream_extractor = null; + this.reader = null; + } } export default RadarReader; diff --git a/rainfallwrangler/src/lib/io/Terrain50StreamReader.mjs b/rainfallwrangler/src/lib/io/Terrain50StreamReader.mjs index 6a6cad9..aa62317 100644 --- a/rainfallwrangler/src/lib/io/Terrain50StreamReader.mjs +++ b/rainfallwrangler/src/lib/io/Terrain50StreamReader.mjs @@ -5,24 +5,29 @@ import { Readable } from 'stream'; import fs from 'fs'; import path from 'path'; -import Terrain50 from 'terrain50'; import gunzip from 'gunzip-maybe'; +import Terrain50 from 'terrain50'; import log from './NamespacedLog.mjs'; const l = log("reader:terrain50stream"); import array2d_classify_convert_bin from '../manip/array2d_classify_convert_bin.mjs'; +import { end_safe } class Terrain50StreamReader { constructor(threshold = 0.1, tolerant = false) { this.threshold = threshold; this.tolerant = tolerant; + + this.stream_in = null; + this.stream_extractor = null; } async *iterate(filepath) { - const reader = fs.createReadStream(filepath); + this.stream_in = fs.createReadStream(filepath); + this.stream_extractor = gunzip(); const stream = Terrain50.ParseStream( - new Readable().wrap(reader.pipe(gunzip())), + new Readable().wrap(this.stream_in.pipe(this.stream_extractor)), this.tolerant ? /\s+/ : " " ); @@ -46,6 +51,14 @@ class Terrain50StreamReader { yield values_bin; } } + + async close() { + if(this.stream_in !== null) this.stream_in.close(); + if(this.stream_extractor !== null) await end_safe(this.stream_extractor); + + this.stream_in = null + this.stream_extractor = null; + } } export default Terrain50StreamReader; diff --git a/rainfallwrangler/src/subcommands/recordify/recordify.mjs b/rainfallwrangler/src/subcommands/recordify/recordify.mjs index 66c728d..a46b5eb 100644 --- a/rainfallwrangler/src/subcommands/recordify/recordify.mjs +++ b/rainfallwrangler/src/subcommands/recordify/recordify.mjs @@ -7,6 +7,7 @@ import RecordWrangler from '../../lib/io/RecordWrangler.mjs'; import RadarWrangler from '../../lib/RadarWrangler.mjs'; import Terrain50StreamReader from '../../lib/io/Terrain50StreamReader.mjs'; +import log from './NamespacedLog.mjs'; const l = log("recordify"); export default async function() { if(typeof settings.water !== "string") @@ -27,4 +28,10 @@ export default async function() { const reader_water = new Terrain50StreamReader(); await writer.write(reader_radar.iterate(settings.rainfall), reader_water.iterate(settings.water)); + + l.log("Closing reader reader") + await reader_radar.close(); + l.log("Closing water depth data reader") + await reader_water.close(); + l.log(`All streams closed`); }