properly close all teh streams

This commit is contained in:
Starbeamrainbowlabs 2022-07-08 16:51:17 +01:00
parent 1a657bd653
commit b9a018f9a9
Signed by: sbrl
GPG key ID: 1BE5172E637709C2
4 changed files with 47 additions and 9 deletions

View file

@ -51,6 +51,10 @@ class RadarWrangler {
return Promise.all(grouped_timesteps); return Promise.all(grouped_timesteps);
} }
async close() {
await this.reader.close();
}
} }
export default RadarWrangler; export default RadarWrangler;

View file

@ -9,6 +9,7 @@ import gunzip from 'gunzip-maybe';
import log from './NamespacedLog.mjs'; const l = log("reader:radar"); import log from './NamespacedLog.mjs'; const l = log("reader:radar");
import interpolate from '../manip/array2d_interpolate.mjs'; import interpolate from '../manip/array2d_interpolate.mjs';
import transpose from '../manip/array2d_transpose.mjs'; import transpose from '../manip/array2d_transpose.mjs';
import { end_safe } from './StreamHelpers.mjs';
/** /**
* Reads data in order from a directory of .jsonl.gz files. * Reads data in order from a directory of .jsonl.gz files.
@ -23,6 +24,10 @@ class RadarReader {
this.do_interpolate = do_interpolate; this.do_interpolate = do_interpolate;
// this.writer_interp_stats = []; // this.writer_interp_stats = [];
this.reader = null;
this.stream_in = null;
this.stream_extractor = null;
} }
/** /**
@ -36,18 +41,17 @@ class RadarReader {
if(!fs.existsSync(filename)) if(!fs.existsSync(filename))
throw new Error(`RadarReader/Error: Can't read from '${filename}' as it doesn't exist.`); throw new Error(`RadarReader/Error: Can't read from '${filename}' as it doesn't exist.`);
let read = fs.createReadStream(filename), this.stream_in = fs.createReadStream(filename),
extractor = gunzip(); this.stream_extractor = this.stream_in.pipe(gunzip());
read.pipe(extractor);
let reader = nexline({ this.reader = nexline({
input: new Readable().wrap(extractor) // Wrap the stream so that nexline likes it input: new Readable().wrap(this.stream_extractor) // Wrap the stream so that nexline likes it
}); });
let i = -1; let i = -1;
let prev = null let prev = null
while(true) { while(true) {
let next_line = await reader.next(); let next_line = await this.reader.next();
if(next_line == null) if(next_line == null)
break; break;
@ -134,6 +138,16 @@ class RadarReader {
); );
} while(b.timestamp - next_timestamp >= this.time_step_interval * 1000 * this.stride); } while(b.timestamp - next_timestamp >= this.time_step_interval * 1000 * this.stride);
} }
async close() {
if(this.stream_in !== null) this.stream_in.close();
if(this.stream_extractor !== null) await end_safe(this.stream_extractor);
if(this.reader !== null) this.reader.close();
this.stream_in = null;
this.stream_extractor = null;
this.reader = null;
}
} }
export default RadarReader; export default RadarReader;

View file

@ -5,24 +5,29 @@ import { Readable } from 'stream';
import fs from 'fs'; import fs from 'fs';
import path from 'path'; import path from 'path';
import Terrain50 from 'terrain50';
import gunzip from 'gunzip-maybe'; import gunzip from 'gunzip-maybe';
import Terrain50 from 'terrain50';
import log from './NamespacedLog.mjs'; const l = log("reader:terrain50stream"); import log from './NamespacedLog.mjs'; const l = log("reader:terrain50stream");
import array2d_classify_convert_bin from '../manip/array2d_classify_convert_bin.mjs'; import array2d_classify_convert_bin from '../manip/array2d_classify_convert_bin.mjs';
import { end_safe }
class Terrain50StreamReader { class Terrain50StreamReader {
constructor(threshold = 0.1, tolerant = false) { constructor(threshold = 0.1, tolerant = false) {
this.threshold = threshold; this.threshold = threshold;
this.tolerant = tolerant; this.tolerant = tolerant;
this.stream_in = null;
this.stream_extractor = null;
} }
async *iterate(filepath) { async *iterate(filepath) {
const reader = fs.createReadStream(filepath); this.stream_in = fs.createReadStream(filepath);
this.stream_extractor = gunzip();
const stream = Terrain50.ParseStream( const stream = Terrain50.ParseStream(
new Readable().wrap(reader.pipe(gunzip())), new Readable().wrap(this.stream_in.pipe(this.stream_extractor)),
this.tolerant ? /\s+/ : " " this.tolerant ? /\s+/ : " "
); );
@ -46,6 +51,14 @@ class Terrain50StreamReader {
yield values_bin; yield values_bin;
} }
} }
async close() {
if(this.stream_in !== null) this.stream_in.close();
if(this.stream_extractor !== null) await end_safe(this.stream_extractor);
this.stream_in = null
this.stream_extractor = null;
}
} }
export default Terrain50StreamReader; export default Terrain50StreamReader;

View file

@ -7,6 +7,7 @@ import RecordWrangler from '../../lib/io/RecordWrangler.mjs';
import RadarWrangler from '../../lib/RadarWrangler.mjs'; import RadarWrangler from '../../lib/RadarWrangler.mjs';
import Terrain50StreamReader from '../../lib/io/Terrain50StreamReader.mjs'; import Terrain50StreamReader from '../../lib/io/Terrain50StreamReader.mjs';
import log from './NamespacedLog.mjs'; const l = log("recordify");
export default async function() { export default async function() {
if(typeof settings.water !== "string") if(typeof settings.water !== "string")
@ -27,4 +28,10 @@ export default async function() {
const reader_water = new Terrain50StreamReader(); const reader_water = new Terrain50StreamReader();
await writer.write(reader_radar.iterate(settings.rainfall), reader_water.iterate(settings.water)); await writer.write(reader_radar.iterate(settings.rainfall), reader_water.iterate(settings.water));
l.log("Closing reader reader")
await reader_radar.close();
l.log("Closing water depth data reader")
await reader_water.close();
l.log(`All streams closed`);
} }