Imnplement plumbing, but it's all untested

This commit is contained in:
Starbeamrainbowlabs 2022-05-18 17:47:02 +01:00
parent bf4866bdbc
commit 0fa7ae9d6a
Signed by: sbrl
GPG key ID: 1BE5172E637709C2
4 changed files with 123 additions and 43 deletions

View file

@ -0,0 +1,52 @@
"use strict";
class RadarWrangler {
/**
* The total number of timesteps we need in the buffer before we can even consider generating a sample.
* @return {number}
*/
get timesteps_required() {
return this.channel_pattern.reduce((next, total) => total + next, 0);
}
constructor(channel_pattern, pooling_operator="max", time_step_interval = 300) {
this.channel_pattern = channel_pattern;
this.pooling_operator = pooling_operator;
this.reader = new RadarReader(time_step_interval);
}
async *iterate(filepath) {
const timestep_buffer = [];
for await(const next of this.reader.iterate(filepath)) {
timestep_buffer.push(next.data);
const result = this.make_sample(timestep_buffer);
if(result == null) continue;
yield result;
timestep_buffer.shift();
}
yield this.make_sample(timestep_buffer);
}
make_sample(timestep_buffer) {
if(timestep_buffer.length < this.timesteps_required) return null;
const grouped_timesteps = [];
let offset = 0;
for(const channel_timestep_count of this.channel_pattern) {
const acc = [];
for(let i = offset; i < channel_timestep_count+offset; i++) {
acc.push(timestep_buffer[i]);
}
grouped_timesteps.push(array2d_pool(acc, this.pooling_operator));
offset += channel_timestep_count;
}
}
}
export default RadarWrangler;

View file

@ -17,15 +17,14 @@ import transpose from '../manip/array2d_transpose.mjs';
* @param {number} time_step_interval The nominal interval, in seconds, between time steps (default: 300 seconds) * @param {number} time_step_interval The nominal interval, in seconds, between time steps (default: 300 seconds)
*/ */
class RadarReader { class RadarReader {
constructor(in_stride = 1, do_interpolate = true, time_step_interval = 300) { constructor(time_step_interval = 300, in_stride = 1, do_interpolate = true) {
this.time_step_interval = time_step_interval; this.time_step_interval = time_step_interval;
this.stride = in_stride; this.stride = in_stride;
this.do_interpolate = do_interpolate; this.do_interpolate = do_interpolate;
this.writer_interp_stats = []; // this.writer_interp_stats = [];
} }
/** /**
* An async iterator that yields rainfall radar objects in order. * An async iterator that yields rainfall radar objects in order.
* Note that for a single RadarReader object, this method may be called * Note that for a single RadarReader object, this method may be called
@ -125,7 +124,7 @@ class RadarReader {
interpolation_percentage interpolation_percentage
); );
this.writer_interp_stats.push(next_timestamp); // this.writer_interp_stats.push(next_timestamp);
yield obj_interpolated; yield obj_interpolated;

View file

@ -0,0 +1,62 @@
"use strict";
import fs from 'fs';
import path from 'path';
import tfrecord from 'tfrecord-stream';
class TFRecordWriter {
constructor(dirpath, count_per_file) {
this.dirpath = dirpath;
this.count_per_file = count_per_file;
if(!fs.existsSync(dirpath))
fs.mkdirSync(dirpath);
this.#builder = tfrecord.createBuilder();
}
write(reader_radar, reader_water) {
// TODO: Shuffle stuff about in the *Python* data pipeline
let writer = null;
let i = -1, i_file = 0, count_this_file = 0;
while(true) {
i++;
// Start writing to a new file when necessary
if(writer == null || count_this_file > this.count_per_file) {
if(writer !== null) await writer.close();
const filepath_next = path.join(dirpath, `${i_file}.tfrecord`);
writer = await tfrecord.Writer.createFromStream(
fs.createWriteStream(filepath_next)
);
i_file++;
}
const sample_radar = await reader_radar.next();
const sample_water = await reader_water.next();
if(sample_radar.done || sample_water.done) break;
const example_next = this.make_example(
sample_radar.value,
sample_water.value
);
await writer.writeExample(example_next);
}
}
make_example(sample_radar, sample_water) {
this.#builder.setFloats("rainfallradar", sample_radar.flat());
this.#builder.setInteger("rainfallradar_width", sample_radar[0].length);
this.#builder.setFloats("waterdepth", sample_water.flat());
this.#builder.setInteger("waterdepth_width", sample_water[0].length);
return this.#builder.releaseExample();
}
}
export default TFRecordWriter;

View file

@ -10,20 +10,10 @@ import log from './lib/io/NamespacedLog.mjs'; const l = log("reader:terrain50str
import array2d_classify_convert_bin from '../../manip/array2d_classify_convert_bin.mjs'; import array2d_classify_convert_bin from '../../manip/array2d_classify_convert_bin.mjs';
class Terrain50StreamReader { class Terrain50StreamReader {
/** constructor(threshold, tolerant = false) {
* The tital number of timesteps we need in the buffer before we can even consider generating a sample.
* @return {number}
*/
get timesteps_required() {
return this.channel_pattern.reduce((next, total) => total + next, 0);
}
constructor(threshold, channel_pattern, pooling_operator="max", tolerant = false) {
this.threshold = threshold; this.threshold = threshold;
this.channel_pattern = channel_pattern;
this.pooling_operator = "max";
this.tolerant = tolerant; this.tolerant = tolerant;
} }
@ -33,14 +23,12 @@ class Terrain50StreamReader {
gunzip() gunzip()
), this.tolerant ? /\s+/ : " "); ), this.tolerant ? /\s+/ : " ");
const timestep_buffer = [];
let i = -1; let i = -1;
for await (const next of stream) { for await (const next of stream) {
i++; i++;
// Skip the first few items, because we want to predict the next // Skip the first few items, because we want to predict the water
// timestep after the rainfall radar data // depth after the rainfall radar data
if(i < this.temporal_depth) if(i < this.offset)
continue; continue;
const values_bin = array2d_classify_convert_bin( const values_bin = array2d_classify_convert_bin(
@ -48,31 +36,10 @@ class Terrain50StreamReader {
this.threshold this.threshold
); );
timestep_buffer.push(values_bin);
// l.debug(`[DEBUG:Terrain50Stream] values_bin`, util.inspect(values_bin).substr(0, 500)); // l.debug(`[DEBUG:Terrain50Stream] values_bin`, util.inspect(values_bin).substr(0, 500));
const result = this.make_sample(timestep_buffer);
if(result == null) continue;
// l.debug(`[Terrain50Stream] Yielding tensor of shape`, values_bin.shape); // l.debug(`[Terrain50Stream] Yielding tensor of shape`, values_bin.shape);
yield result; yield values_bin;
}
yield this.make_sample(timestep_buffer);
}
make_sample(timestep_buffer) {
if(timestep_buffer.length < this.timesteps_required) return null;
const grouped_timesteps = [];
let offset = 0;
for(const channel_timestep_count of this.channel_pattern) {
const acc = [];
for(let i = offset; i < channel_timestep_count+offset; i++) {
acc.push(timestep_buffer[i]);
}
grouped_timesteps.push(array2d_pool(acc, this.pooling_operator));
offset += channel_timestep_count;
} }
} }
} }