Reviewer note: this patch was reflowed from a copy whose line breaks had been collapsed.
Blank lines and indentation inside the RadarReader.mjs and Terrain50StreamReader.mjs hunks
are reconstructed to match the hunk line counts; verify them against the target files
before applying. The two new files were corrected during review (see their inline
comments), so the index hashes recorded for them are stale.

diff --git a/rainfallwrangler/src/lib/RadarWrangler.mjs b/rainfallwrangler/src/lib/RadarWrangler.mjs
new file mode 100644
index 0000000..c0b1cfe
--- /dev/null
+++ b/rainfallwrangler/src/lib/RadarWrangler.mjs
@@ -0,0 +1,73 @@
+"use strict";
+
+// NOTE(review): both import paths are inferred from this repository's layout - confirm before merging.
+import RadarReader from './io/RadarReader.mjs';
+import array2d_pool from './manip/array2d_pool.mjs';
+
+/**
+ * Groups consecutive radar timesteps into pooled channels, yielding one sample per sliding-window position.
+ */
+class RadarWrangler {
+	/**
+	 * The total number of timesteps we need in the buffer before we can even consider generating a sample.
+	 * @return {number}
+	 */
+	get timesteps_required() {
+		return this.channel_pattern.reduce((total, next) => total + next, 0);
+	}
+
+	/**
+	 * @param {number[]} channel_pattern	How many consecutive timesteps are pooled into each output channel.
+	 * @param {string} pooling_operator	Passed through to array2d_pool (default: "max").
+	 * @param {number} time_step_interval	Passed through to RadarReader (default: 300).
+	 */
+	constructor(channel_pattern, pooling_operator="max", time_step_interval = 300) {
+		this.channel_pattern = channel_pattern;
+		this.pooling_operator = pooling_operator;
+
+		this.reader = new RadarReader(time_step_interval);
+	}
+
+	/**
+	 * Reads timesteps from the given file and yields a sample for every full window.
+	 * @param {string} filepath	Passed to RadarReader.iterate().
+	 * @return {AsyncGenerator<Array>} One pooled array per entry of channel_pattern.
+	 */
+	async *iterate(filepath) {
+		const timestep_buffer = [];
+		for await(const next of this.reader.iterate(filepath)) {
+			timestep_buffer.push(next.data);
+
+			const result = this.make_sample(timestep_buffer);
+			if(result == null) continue;
+
+			yield result;
+
+			// Slide the window forward by one timestep
+			timestep_buffer.shift();
+		}
+		// No trailing yield: once the loop ends the buffer is always short of a full window.
+	}
+
+	/**
+	 * Pools the oldest timesteps_required entries of the buffer into channels.
+	 * @param {Array} timestep_buffer	Buffered timesteps, oldest first.
+	 * @return {Array|null} The pooled channels, or null if the buffer is not full enough yet.
+	 */
+	make_sample(timestep_buffer) {
+		if(timestep_buffer.length < this.timesteps_required) return null;
+
+		const grouped_timesteps = [];
+		let offset = 0;
+		for(const channel_timestep_count of this.channel_pattern) {
+			const acc = timestep_buffer.slice(offset, offset + channel_timestep_count);
+
+			grouped_timesteps.push(array2d_pool(acc, this.pooling_operator));
+			offset += channel_timestep_count;
+		}
+
+		return grouped_timesteps;
+	}
+}
+
+export default RadarWrangler;
diff --git a/rainfallwrangler/src/lib/io/RadarReader.mjs b/rainfallwrangler/src/lib/io/RadarReader.mjs
index a04d9d8..2102b2d 100644
--- a/rainfallwrangler/src/lib/io/RadarReader.mjs
+++ b/rainfallwrangler/src/lib/io/RadarReader.mjs
@@ -17,15 +17,14 @@ import transpose from '../manip/array2d_transpose.mjs';
  * @param {number} time_step_interval The nominal interval, in seconds, between time steps (default: 300 seconds)
  */
 class RadarReader {
-	constructor(in_stride = 1, do_interpolate = true, time_step_interval = 300) {
+	constructor(time_step_interval = 300, in_stride = 1, do_interpolate = true) {
 		this.time_step_interval = time_step_interval;
 		this.stride = in_stride;
 		this.do_interpolate = do_interpolate;
 
-		this.writer_interp_stats = [];
+		// this.writer_interp_stats = [];
 	}
 
-
 	/**
 	 * An async iterator that yields rainfall radar objects in order.
 	 * Note that for a single RadarReader object, this method may be called
@@ -125,7 +124,7 @@ class RadarReader {
 						interpolation_percentage
 					);
 
-					this.writer_interp_stats.push(next_timestamp);
+					// this.writer_interp_stats.push(next_timestamp);
 
 					yield obj_interpolated;
 
diff --git a/rainfallwrangler/src/lib/io/TFRecordWriter.mjs b/rainfallwrangler/src/lib/io/TFRecordWriter.mjs
new file mode 100644
index 0000000..932366d
--- /dev/null
+++ b/rainfallwrangler/src/lib/io/TFRecordWriter.mjs
@@ -0,0 +1,89 @@
+"use strict";
+
+import fs from 'fs';
+import path from 'path';
+
+import tfrecord from 'tfrecord-stream';
+
+/**
+ * Writes paired radar / water depth samples to numbered .tfrecord files in a directory.
+ */
+class TFRecordWriter {
+	// Private fields must be declared in the class body before use
+	#builder;
+
+	/**
+	 * @param {string} dirpath	Directory to write to; created if it does not exist.
+	 * @param {number} count_per_file	Maximum number of examples to write to each file.
+	 */
+	constructor(dirpath, count_per_file) {
+		this.dirpath = dirpath;
+		this.count_per_file = count_per_file;
+
+		if(!fs.existsSync(dirpath))
+			fs.mkdirSync(dirpath, { recursive: true });
+
+		this.#builder = tfrecord.createBuilder();
+	}
+
+	/**
+	 * Pulls samples from both iterators in lockstep and writes them out until either is exhausted.
+	 * @param {AsyncIterator} reader_radar	Iterator yielding radar samples.
+	 * @param {AsyncIterator} reader_water	Iterator yielding water depth samples.
+	 * @return {Promise<void>} Resolves once the last file has been closed.
+	 */
+	async write(reader_radar, reader_water) {
+		// TODO: Shuffle stuff about in the *Python* data pipeline
+
+		let writer = null;
+		let i_file = 0, count_this_file = 0;
+		try {
+			while(true) {
+				const sample_radar = await reader_radar.next();
+				const sample_water = await reader_water.next();
+
+				if(sample_radar.done || sample_water.done) break;
+
+				// Start writing to a new file when necessary
+				if(writer === null || count_this_file >= this.count_per_file) {
+					if(writer !== null) await writer.close();
+					const filepath_next = path.join(this.dirpath, `${i_file}.tfrecord`);
+					writer = await tfrecord.Writer.createFromStream(
+						fs.createWriteStream(filepath_next)
+					);
+					i_file++;
+					count_this_file = 0;
+				}
+
+				const example_next = this.make_example(
+					sample_radar.value,
+					sample_water.value
+				);
+
+				await writer.writeExample(example_next);
+				count_this_file++;
+			}
+		}
+		finally {
+			// Close the last file even if a reader or the writer throws
+			if(writer !== null) await writer.close();
+		}
+	}
+
+	/**
+	 * Builds a single example from a pair of samples.
+	 * @param {Array} sample_radar	Assumed to be an array of equal-length rows - TODO confirm.
+	 * @param {Array} sample_water	Assumed to be an array of equal-length rows - TODO confirm.
+	 * @return {*} Whatever the builder's releaseExample() returns.
+	 */
+	make_example(sample_radar, sample_water) {
+		this.#builder.setFloats("rainfallradar", sample_radar.flat());
+		this.#builder.setInteger("rainfallradar_width", sample_radar[0].length);
+		this.#builder.setFloats("waterdepth", sample_water.flat());
+		this.#builder.setInteger("waterdepth_width", sample_water[0].length);
+
+		return this.#builder.releaseExample();
+	}
+}
+
+export default TFRecordWriter;
diff --git a/rainfallwrangler/src/lib/io/Terrain50StreamReader.mjs b/rainfallwrangler/src/lib/io/Terrain50StreamReader.mjs
index 577d81f..16114a8 100644
--- a/rainfallwrangler/src/lib/io/Terrain50StreamReader.mjs
+++ b/rainfallwrangler/src/lib/io/Terrain50StreamReader.mjs
@@ -10,20 +10,10 @@ import log from './lib/io/NamespacedLog.mjs'; const l = log("reader:terrain50str
 
 import array2d_classify_convert_bin from '../../manip/array2d_classify_convert_bin.mjs';
 
-class Terrain50StreamReader {
-	/**
-	 * The tital number of timesteps we need in the buffer before we can even consider generating a sample.
-	 * @return {number}
-	 */
-	get timesteps_required() {
-		return this.channel_pattern.reduce((next, total) => total + next, 0);
-	}
-
-	constructor(threshold, channel_pattern, pooling_operator="max", tolerant = false) {
+class Terrain50StreamReader {
+	constructor(threshold, tolerant = false) {
 		this.threshold = threshold;
 
-		this.channel_pattern = channel_pattern;
-		this.pooling_operator = "max";
 		this.tolerant = tolerant;
 	}
 
@@ -33,14 +23,12 @@ class Terrain50StreamReader {
 				gunzip()
 			), this.tolerant ? /\s+/ : " ");
 
-		const timestep_buffer = [];
-
 		let i = -1;
 		for await (const next of stream) {
 			i++;
-			// Skip the first few items, because we want to predict the next
-			// timestep after the rainfall radar data
-			if(i < this.temporal_depth)
+			// Skip the first few items, because we want to predict the water
+			// depth after the rainfall radar data
+			if(i < this.offset)
 				continue;
 
 			const values_bin = array2d_classify_convert_bin(
@@ -48,31 +36,10 @@ class Terrain50StreamReader {
 				this.threshold
 			);
 
-			timestep_buffer.push(values_bin);
 			// l.debug(`[DEBUG:Terrain50Stream] values_bin`, util.inspect(values_bin).substr(0, 500));
-			const result = this.make_sample(timestep_buffer);
-			if(result == null) continue;
 			// l.debug(`[Terrain50Stream] Yielding tensor of shape`, values_bin.shape);
 
-			yield result;
-		}
-
-		yield this.make_sample(timestep_buffer);
-	}
-
-	make_sample(timestep_buffer) {
-		if(timestep_buffer.length < this.timesteps_required) return null;
-
-		const grouped_timesteps = [];
-		let offset = 0;
-		for(const channel_timestep_count of this.channel_pattern) {
-			const acc = [];
-			for(let i = offset; i < channel_timestep_count+offset; i++) {
-				acc.push(timestep_buffer[i]);
-			}
-
-			grouped_timesteps.push(array2d_pool(acc, this.pooling_operator));
-			offset += channel_timestep_count;
+			yield values_bin;
 		}
 	}
 }