From 5b2d71f41f7970d0d32967c08692bf792c4f0f52 Mon Sep 17 00:00:00 2001 From: Starbeamrainbowlabs Date: Fri, 1 Jul 2022 19:08:36 +0100 Subject: [PATCH] it works .....I think --- package-lock.json | 24 +++++++++++++++++++ package.json | 5 ++++ rainfallwrangler/src/lib/RadarWrangler.mjs | 3 +-- .../src/lib/io/RecordWrangler.mjs | 21 ++++++++++------ .../src/lib/manip/array2d_pool.mjs | 1 - .../src/lib/record/RecordsWriter.mjs | 11 +++++---- 6 files changed, 50 insertions(+), 15 deletions(-) create mode 100644 package-lock.json create mode 100644 package.json diff --git a/package-lock.json b/package-lock.json new file mode 100644 index 0000000..163d77d --- /dev/null +++ b/package-lock.json @@ -0,0 +1,24 @@ +{ + "name": "PhD-Rainfall-Radar", + "lockfileVersion": 2, + "requires": true, + "packages": { + "": { + "dependencies": { + "duplex-child-process": "^1.0.1" + } + }, + "node_modules/duplex-child-process": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/duplex-child-process/-/duplex-child-process-1.0.1.tgz", + "integrity": "sha512-tWbt4tyioDjyK5nh+qicbdvBvNjSXsTUF5zKUwSauuKPg1mokjwn/HezwfvWhh6hXoLdgetY+ZlzU/sMwUMJkg==" + } + }, + "dependencies": { + "duplex-child-process": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/duplex-child-process/-/duplex-child-process-1.0.1.tgz", + "integrity": "sha512-tWbt4tyioDjyK5nh+qicbdvBvNjSXsTUF5zKUwSauuKPg1mokjwn/HezwfvWhh6hXoLdgetY+ZlzU/sMwUMJkg==" + } + } +} diff --git a/package.json b/package.json new file mode 100644 index 0000000..f05de93 --- /dev/null +++ b/package.json @@ -0,0 +1,5 @@ +{ + "dependencies": { + "duplex-child-process": "^1.0.1" + } +} diff --git a/rainfallwrangler/src/lib/RadarWrangler.mjs b/rainfallwrangler/src/lib/RadarWrangler.mjs index d3f756d..5a4c08c 100644 --- a/rainfallwrangler/src/lib/RadarWrangler.mjs +++ b/rainfallwrangler/src/lib/RadarWrangler.mjs @@ -27,7 +27,6 @@ class RadarWrangler { const result = this.make_sample(timestep_buffer); if(result == null) continue; - yield result; timestep_buffer.shift(); @@ -50,7 +49,7 @@ class RadarWrangler { offset += channel_timestep_count; } - return grouped_timesteps; + return Promise.all(grouped_timesteps); } } diff --git a/rainfallwrangler/src/lib/io/RecordWrangler.mjs b/rainfallwrangler/src/lib/io/RecordWrangler.mjs index 0f9a8db..7918c27 100644 --- a/rainfallwrangler/src/lib/io/RecordWrangler.mjs +++ b/rainfallwrangler/src/lib/io/RecordWrangler.mjs @@ -6,6 +6,7 @@ import path from 'path'; import RecordBuilder from '../record/RecordBuilder.mjs'; import RecordsWriter from '../record/RecordsWriter.mjs'; import pretty_ms from 'pretty-ms'; +import terrain50_analyse_frequencies from 'terrain50/src/static/Terrain50AnalyseFrequencies.mjs'; class RecordWrangler { #builder = new RecordBuilder(); @@ -14,6 +15,8 @@ class RecordWrangler { this.dirpath = dirpath; this.count_per_file = count_per_file; + this.display_interval = 2 * 1000; + if(!fs.existsSync(this.dirpath)) fs.mkdirSync(this.dirpath); } @@ -22,24 +25,23 @@ class RecordWrangler { // TODO: Shuffle stuff about in the *Python* data pipeline let writer = null; - let i = -1, i_file = 0, count_this_file = 0, time_start = new Date(); + let i = 0, i_file = 0, count_this_file = 0, time_start = new Date(), time_display = time_start; while(true) { i++; - console.log(`RecordWriter step ${i}`); - // Start writing to a new file when necessary if(writer == null || count_this_file > this.count_per_file) { if(writer !== null) await writer.close(); const filepath_next = path.join(this.dirpath, `${i_file}.jsonl.gz`); writer = new RecordsWriter(filepath_next); - console.log(`RecordWriter NEW FILE ${filepath_next}`); i_file++; + count_this_file = 0; } + count_this_file++; + const sample_radar = await reader_radar.next(); const sample_water = await reader_water.next(); - if(sample_radar.done || sample_water.done) break; const example_next = this.make_example( @@ -49,14 +51,19 @@ class RecordWrangler { await writer.write(example_next); - process.stderr.write(`Elapsed: ${pretty_ms(new Date() - time_start)}, Written ${count_this_file}/${i_file}/${i} examples/files/total\r`); + const time_now = new Date(); + if(time_now - time_display > this.display_interval) { + const elapsed = new Date() - time_start; + process.stderr.write(`Elapsed: ${pretty_ms(elapsed, { keepDecimalsOnWholeSeconds: true })}, Written ${count_this_file}/${i_file}/${i} examples/files/total | ${(1000 / (elapsed / i)).toFixed(2)} batches/sec | ${this.count_per_file - count_this_file} left for this file\r`); + time_display = time_now; + } } } make_example(sample_radar, sample_water) { this.#builder.add("rainfallradar", sample_radar); - this.#builder.add("waterdepth", sample_water.flat); + this.#builder.add("waterdepth", sample_water); return this.#builder.release(); } } diff --git a/rainfallwrangler/src/lib/manip/array2d_pool.mjs b/rainfallwrangler/src/lib/manip/array2d_pool.mjs index 6e6bb0d..4b5f094 100644 --- a/rainfallwrangler/src/lib/manip/array2d_pool.mjs +++ b/rainfallwrangler/src/lib/manip/array2d_pool.mjs @@ -12,7 +12,6 @@ export default async function array2d_pool(channels, operator="max") { const tensor = tf.tensor(channels); return tf.max(tensor, 0, false); }); - const result_array = await result_tensor.array(); result_tensor.dispose(); diff --git a/rainfallwrangler/src/lib/record/RecordsWriter.mjs b/rainfallwrangler/src/lib/record/RecordsWriter.mjs index 16e0801..cc5c6b1 100644 --- a/rainfallwrangler/src/lib/record/RecordsWriter.mjs +++ b/rainfallwrangler/src/lib/record/RecordsWriter.mjs @@ -2,21 +2,22 @@ import fs from 'fs'; -import SpawnStream from 'spawn-stream'; +import ChildProcess from 'duplex-child-process'; import { write_safe, end_safe } from '../io/StreamHelpers.mjs'; class RecordsWriter { - #stream_out = fs.createWriteStream(filepath); - #gzip = SpawnStream("gzip"); + #stream_out = null; + #gzip = ChildProcess.spawn("gzip"); constructor(filepath) { + this.#stream_out = fs.createWriteStream(filepath); this.#gzip.pipe(this.#stream_out); } async write(sample) { - console.log(sample); - await write_safe(this.#gzip, JSON.stringify(sample)); + const str = JSON.stringify(Object.fromEntries(sample)); + await write_safe(this.#gzip, str); } async close() {