diff --git a/rainfallwrangler/src/lib/RadarWrangler.mjs b/rainfallwrangler/src/lib/RadarWrangler.mjs index 60299c4..d3f756d 100644 --- a/rainfallwrangler/src/lib/RadarWrangler.mjs +++ b/rainfallwrangler/src/lib/RadarWrangler.mjs @@ -46,10 +46,11 @@ class RadarWrangler { for(let i = offset; i < channel_timestep_count+offset; i++) { acc.push(timestep_buffer[i]); } - grouped_timesteps.push(array2d_pool(acc, this.pooling_operator)); offset += channel_timestep_count; } + + return grouped_timesteps; } } diff --git a/rainfallwrangler/src/lib/io/TFRecordWriter.mjs b/rainfallwrangler/src/lib/io/TFRecordWriter.mjs index f46bc5e..d2c4a7f 100644 --- a/rainfallwrangler/src/lib/io/TFRecordWriter.mjs +++ b/rainfallwrangler/src/lib/io/TFRecordWriter.mjs @@ -36,7 +36,9 @@ class TFRecordWriter { } const sample_radar = await reader_radar.next(); + console.log(`SAMPLE_RADAR`); const sample_water = await reader_water.next(); + console.log(`SAMPLE_WATER`); if(sample_radar.done || sample_water.done) break; @@ -53,8 +55,11 @@ class TFRecordWriter { } make_example(sample_radar, sample_water) { - this.#builder.setFloats("rainfallradar", sample_radar.flat()); + console.log(`SAMPLE WATER ${sample_water.flat().length} RAINFALL ${sample_radar.flat().length}`); + const sample_radar_flat1 = sample_radar.flat(); + this.#builder.setFloats("rainfallradar", sample_radar_flat1.flat()); this.#builder.setInteger("rainfallradar_width", sample_radar[0].length); + this.#builder.setInteger("rainfallradar_channelsize", sample_radar_flat1[0].length); this.#builder.setFloats("waterdepth", sample_water.flat()); this.#builder.setInteger("waterdepth_width", sample_water[0].length); diff --git a/rainfallwrangler/src/lib/io/Terrain50StreamReader.mjs b/rainfallwrangler/src/lib/io/Terrain50StreamReader.mjs index 788771e..22b1aae 100644 --- a/rainfallwrangler/src/lib/io/Terrain50StreamReader.mjs +++ b/rainfallwrangler/src/lib/io/Terrain50StreamReader.mjs @@ -1,13 +1,14 @@ "use strict"; -import { pipeline } from 'stream'; import util from 'util'; +import { Readable } from 'stream'; import fs from 'fs'; import path from 'path'; +import Terrain50 from 'terrain50'; +import gunzip from 'gunzip-maybe'; import log from './NamespacedLog.mjs'; const l = log("reader:terrain50stream"); - import array2d_classify_convert_bin from '../manip/array2d_classify_convert_bin.mjs'; class Terrain50StreamReader { @@ -18,10 +19,11 @@ class Terrain50StreamReader { } async *iterate(filepath) { - const stream = Terrain50.ParseStream(pipeline( - fs.createReadStream(filepath), - gunzip() - ), this.tolerant ? /\s+/ : " "); + const reader = fs.createReadStream(filepath); + const extractor = gunzip(); + reader.pipe(extractor); + + const stream = Terrain50.ParseStream(new Readable(extractor), this.tolerant ? /\s+/ : " "); let i = -1; for await (const next of stream) { diff --git a/rainfallwrangler/src/lib/manip/array2d_pool.mjs b/rainfallwrangler/src/lib/manip/array2d_pool.mjs index 224ed6a..6e6bb0d 100644 --- a/rainfallwrangler/src/lib/manip/array2d_pool.mjs +++ b/rainfallwrangler/src/lib/manip/array2d_pool.mjs @@ -3,15 +3,17 @@ // import tf from '@tensorflow/tfjs-node'; import tf from '@tensorflow/tfjs-node-gpu'; -export default async function array2d_pool(channels, operator) { +export default async function array2d_pool(channels, operator="max") { + if(operator !== "max") + throw new Error(`Error: Unknown operator '${operator}. At present only the 'max' operator is supported.`); + // This is rather a hack to save time. Tensorflow.js is not needed here, but may result in increased speed. It may be worth rolling this out to the rest of the codebase, thinking about it. While Tensorflow.js hasmany bugs, this only extends to the machine learning / loss functions / models etc and not the const result_tensor = tf.tidy(() => { const tensor = tf.tensor(channels); - console.log(`DEFAULT array2d_pool tensor shape:`, tensor); return tf.max(tensor, 0, false); }); - const result_array = await result.array(); + const result_array = await result_tensor.array(); result_tensor.dispose(); return result_array; diff --git a/rainfallwrangler/src/subcommands/tfrecordify/meta.mjs b/rainfallwrangler/src/subcommands/tfrecordify/meta.mjs index 832237a..6ffbae4 100644 --- a/rainfallwrangler/src/subcommands/tfrecordify/meta.mjs +++ b/rainfallwrangler/src/subcommands/tfrecordify/meta.mjs @@ -5,7 +5,7 @@ export default function(cli) { .argument("water", "Path to the water depths file, formatted as a stream of terrain50 objects. May or may not be gzipped.", null, "string") .argument("rainfall", "Path to the rainfall radar data, formatted as jsonl. May or may not be gzipped.", null, "string") .argument("count-file", "The number of records to store in each TFRecord file. See the documentation for the optimal value of this number (default: 4096).", 64*64) - .argument("rainfall-pattern", "The pattern of the number of time steps to average, as a comma-separated list of numbers. Given a point in time, each successive number specified works BACKWARDS from that point. For example, 1,4,10 would be 3 channels: 1 time step on it's own, then average the next 4 time steps, then average the next 10 steps (default: 1,3,3,5,12,24,48).", [1,3,3,5,12,24,48], function(value) { + .argument("rainfall-pattern", "The pattern of the number of time steps to average, as a comma-separated list of numbers. Given a point in time, each successive number specified works BACKWARDS from that point. For example, 1,4,10 would be 3 channels: 1 time step on it's own, then average the next 4 time steps, then average the next 10 steps (default: 1,3,3,5,12,24,48).", [1,3,3,5,12,24,48].reverse(), function(value) { return value.split(",") .map(el => parseInt(el)) .reverse();