From bf4866bdbc0ef18b1cd61ea900c3423905bc65fc Mon Sep 17 00:00:00 2001 From: Starbeamrainbowlabs Date: Wed, 18 May 2022 17:04:11 +0100 Subject: [PATCH] Add data readers --- rainfallwrangler/src/lib/io/RadarReader.mjs | 140 ++++++++++++++++++ .../src/lib/io/Terrain50StreamReader.mjs | 80 ++++++++++ .../manip/array2d_classify_convert_bin.mjs | 23 +++ .../src/lib/manip/array2d_pool.mjs | 18 +++ .../src/lib/manip/array_interpolate.mjs | 33 +++++ .../src/lib/manip/array_transpose.mjs | 19 +++ 6 files changed, 313 insertions(+) create mode 100644 rainfallwrangler/src/lib/io/RadarReader.mjs create mode 100644 rainfallwrangler/src/lib/io/Terrain50StreamReader.mjs create mode 100644 rainfallwrangler/src/lib/manip/array2d_classify_convert_bin.mjs create mode 100644 rainfallwrangler/src/lib/manip/array2d_pool.mjs create mode 100644 rainfallwrangler/src/lib/manip/array_interpolate.mjs create mode 100644 rainfallwrangler/src/lib/manip/array_transpose.mjs diff --git a/rainfallwrangler/src/lib/io/RadarReader.mjs b/rainfallwrangler/src/lib/io/RadarReader.mjs new file mode 100644 index 0000000..a04d9d8 --- /dev/null +++ b/rainfallwrangler/src/lib/io/RadarReader.mjs @@ -0,0 +1,140 @@ +"use strict"; + +import fs from 'fs'; +import { Readable } from 'stream'; + +import nexline from 'nexline'; +import gunzip from 'gunzip-maybe'; + +import log from './lib/io/NamespacedLog.mjs'; const l = log("reader:radar"); +import interpolate from '../manip/array2d_interpolate.mjs'; +import transpose from '../manip/array2d_transpose.mjs'; + +/** + * Reads data in order from a directory of .jsonl.gz files. + * @param {string} in_stride Return only every X objects. 1 = return every object. Default: 1 + * @param {boolean} do_interpolate Whether to interpolate to fill in missing values. + * @param {number} time_step_interval The nominal interval, in seconds, between time steps (default: 300 seconds) + */ +class RadarReader { + constructor(in_stride = 1, do_interpolate = true, time_step_interval = 300) { + this.time_step_interval = time_step_interval; + this.stride = in_stride; + this.do_interpolate = do_interpolate; + + this.writer_interp_stats = []; + } + + + /** + * An async iterator that yields rainfall radar objects in order. + * Note that for a single RadarReader object, this method may be called + * multiple times - potentially in parallel. + * @param {string} filename The filename to read from. + * @return {Generator} The async generator. + */ + async *iterate(filename) { + if(!fs.existsSync(filename)) + throw new Error(`RadarReader/Error: Can't read from '${filename}' as it doesn't exist.`); + + let read = fs.createReadStream(filename), + extractor = gunzip(); + read.pipe(extractor); + + let reader = nexline({ + input: new Readable().wrap(extractor) // Wrap the stream so that nexline likes it + }); + + let i = -1; + let prev = null + while(true) { + let next_line = await reader.next(); + if(next_line == null) + break; + + i++; + if(i % this.stride !== 0) continue; + + // Ignore empty lines + if(next_line.trim() === "") + continue; + + let next = null; + try { + next = JSON.parse(next_line); + } catch(error) { + l.warn(`Encountered invalid JSON object at line ${i}, skipping (error: ${error.message})`); + continue; + } + + if(next == null) continue; + + // Sort out the timestamp + if(next.timestamp == null) { + if(next.timestamps !== null) next.timestamp = next.timestamps[1]; + else { + l.warn(`Encountered JSON object without a timestamp`); + continue; + } + } + next.timestamp = new Date(next.timestamp); + // Transpose the data to correct our earlier mistake + next.data = transpose(next.data) // Correct the orientation of the data array + let tmp = next.size_extract.height; + next.size_extract.height = next.size_extract.width; + next.size_extract.height = tmp; + // Now, the data is in the format data[y][x] + + // Interpolate if needed + if(this.do_interpolate && prev !== null && next.timestamp - prev.timestamp > this.time_step_interval * 1000 * 1.5 * this.stride) { + for await(let item of this.interpolate(prev, next)) { + yield item; + } + } + + // We've caught up (if required) - continue as normal + yield next; + prev = next; + } + } + + /** + * Interpolates between 2 objects. + * @param {Object} a The first object. + * @param {Object} b The second object. + * @return {Generator} + */ + async *interpolate(a, b) { + let next_timestamp = new Date(a.timestamp); // This clones the existing Date object + + // Increment the time interval + next_timestamp.setSeconds( + next_timestamp.getSeconds() + (this.time_step_interval * this.stride) + ); + + do { + // The percentage of the way through the interpolation we are + let interpolation_percentage = 1 - ((b.timestamp - next_timestamp) / (b.timestamp - a.timestamp)); + + // Generate a temporary interpolated object + let obj_interpolated = {}; + Object.assign(obj_interpolated, a); + obj_interpolated.timestamp = next_timestamp; + obj_interpolated.data = interpolate( + a.data, b.data, + interpolation_percentage + ); + + this.writer_interp_stats.push(next_timestamp); + + yield obj_interpolated; + + // Increment the time interval + next_timestamp.setSeconds( + next_timestamp.getSeconds() + (this.time_step_interval * this.stride) + ); + } while(b.timestamp - next_timestamp >= this.time_step_interval * 1000 * this.stride); + } +} + +export default RadarReader; diff --git a/rainfallwrangler/src/lib/io/Terrain50StreamReader.mjs b/rainfallwrangler/src/lib/io/Terrain50StreamReader.mjs new file mode 100644 index 0000000..577d81f --- /dev/null +++ b/rainfallwrangler/src/lib/io/Terrain50StreamReader.mjs @@ -0,0 +1,80 @@ +"use strict"; + +import { pipeline } from 'stream'; +import util from 'util'; +import fs from 'fs'; +import path from 'path'; + + +import log from './lib/io/NamespacedLog.mjs'; const l = log("reader:terrain50stream"); + +import array2d_classify_convert_bin from '../../manip/array2d_classify_convert_bin.mjs'; + +class Terrain50StreamReader { + /** + * The tital number of timesteps we need in the buffer before we can even consider generating a sample. + * @return {number} + */ + get timesteps_required() { + return this.channel_pattern.reduce((next, total) => total + next, 0); + } + + constructor(threshold, channel_pattern, pooling_operator="max", tolerant = false) { + this.threshold = threshold; + this.channel_pattern = channel_pattern; + + this.pooling_operator = "max"; + this.tolerant = tolerant; + } + + async *iterate(filepath) { + const stream = Terrain50.ParseStream(pipeline( + fs.createReadStream(filepath), + gunzip() + ), this.tolerant ? /\s+/ : " "); + + const timestep_buffer = []; + + let i = -1; + for await (const next of stream) { + i++; + // Skip the first few items, because we want to predict the next + // timestep after the rainfall radar data + if(i < this.temporal_depth) + continue; + + const values_bin = array2d_classify_convert_bin( + next.data, + this.threshold + ); + + timestep_buffer.push(values_bin); + // l.debug(`[DEBUG:Terrain50Stream] values_bin`, util.inspect(values_bin).substr(0, 500)); + + const result = this.make_sample(timestep_buffer); + if(result == null) continue; + // l.debug(`[Terrain50Stream] Yielding tensor of shape`, values_bin.shape); + yield result; + } + + yield this.make_sample(timestep_buffer); + } + + make_sample(timestep_buffer) { + if(timestep_buffer.length < this.timesteps_required) return null; + + const grouped_timesteps = []; + let offset = 0; + for(const channel_timestep_count of this.channel_pattern) { + const acc = []; + for(let i = offset; i < channel_timestep_count+offset; i++) { + acc.push(timestep_buffer[i]); + } + + grouped_timesteps.push(array2d_pool(acc, this.pooling_operator)); + offset += channel_timestep_count; + } + } +} + +export default Terrain50StreamReader; diff --git a/rainfallwrangler/src/lib/manip/array2d_classify_convert_bin.mjs b/rainfallwrangler/src/lib/manip/array2d_classify_convert_bin.mjs new file mode 100644 index 0000000..f7928ec --- /dev/null +++ b/rainfallwrangler/src/lib/manip/array2d_classify_convert_bin.mjs @@ -0,0 +1,23 @@ +"use strict"; + +/** + * Given a 2D array, makes a new 2D array according to the given threshold + * Apparently making this a mutator makes it slower - despite the theorised decreased load on the garbage collector + * + * @param {number[][]} array The 2D array to process. + * @param {{min:Number,max:Number}[]} bounds An array of objects, generated by bounds2classes(). + * @return {number[][]} The generated 2D array. + */ +export default function array2d_classify_convert_bin(arr, threshold) { + if(typeof threshold != "number") + throw new Error("Error: Invalid threshold value."); + + let result = []; + for(let i = 0; i < arr.length; i++) { + result[i] = []; + for(let j = 0; j < arr[i].length; j++) { + result[i][j] = arr[i][j] >= threshold ? 1 : 0; + } + } + return result; +} diff --git a/rainfallwrangler/src/lib/manip/array2d_pool.mjs b/rainfallwrangler/src/lib/manip/array2d_pool.mjs new file mode 100644 index 0000000..224ed6a --- /dev/null +++ b/rainfallwrangler/src/lib/manip/array2d_pool.mjs @@ -0,0 +1,18 @@ +"use strict"; + +// import tf from '@tensorflow/tfjs-node'; +import tf from '@tensorflow/tfjs-node-gpu'; + +export default async function array2d_pool(channels, operator) { + // This is rather a hack to save time. Tensorflow.js is not needed here, but may result in increased speed. It may be worth rolling this out to the rest of the codebase, thinking about it. While Tensorflow.js hasmany bugs, this only extends to the machine learning / loss functions / models etc and not the + const result_tensor = tf.tidy(() => { + const tensor = tf.tensor(channels); + console.log(`DEFAULT array2d_pool tensor shape:`, tensor); + return tf.max(tensor, 0, false); + }); + + const result_array = await result.array(); + result_tensor.dispose(); + + return result_array; +} diff --git a/rainfallwrangler/src/lib/manip/array_interpolate.mjs b/rainfallwrangler/src/lib/manip/array_interpolate.mjs new file mode 100644 index 0000000..ba5338c --- /dev/null +++ b/rainfallwrangler/src/lib/manip/array_interpolate.mjs @@ -0,0 +1,33 @@ +"use strict"; + +/** + * Interpolates the values between 2 2D arrays. + * Note that both arrays MUST be exactly the same size! + * Imported from nimrod-data-downloader. + * @param {number[][]} a A 2D array of numbers. + * @param {number[][]} b A 2D array of numbers. + * @param {number} percentage The percentage of the way from a to b to use when interpolating + * @return {number[][]} Another 2D array of numbers between the 2 different input arrays. + */ +function interpolate(a, b, percentage) { + let result = []; + for(let i = 0; i < a.length; i++) { + let row = []; + for(let j = 0; j < a[i].length; j++) { + // If this row in b isn't defined, fill it in + // Not sure why we'd have variable lengths here + if(typeof b[i] == "undefined") + b[i] = Array.apply(null, Array(a[i].length)).map(Number.prototype.valueOf, 0); + /* + * Example: + * d = 10, e = 20, percentage = 0.75 + * result = ((e - d) * percentage + d) + */ + row.push(((b[i][j] - a[i][j]) * percentage) + a[i][j]); + } + result.push(row); + } + return result; +} + +export default interpolate; diff --git a/rainfallwrangler/src/lib/manip/array_transpose.mjs b/rainfallwrangler/src/lib/manip/array_transpose.mjs new file mode 100644 index 0000000..6f1953b --- /dev/null +++ b/rainfallwrangler/src/lib/manip/array_transpose.mjs @@ -0,0 +1,19 @@ +"use strict"; + +/** + * Transpose a 2D array. In other words, rotate the contents of the array 90° + * *anti*clockwise. + * Note that a normal transpose would rotate 90° clockwise, not *anti*clockwise. + * Adapted from https://stackoverflow.com/a/17428705/1460422 to rotate it anti-clockwise, not clockwise + * Note that even though `.reverse()` is a mutating function, it's ok here because `.map()` creates a new array. + * This was written because in the NimrodRunner we made the embarrasing mistake + * of winding up writing the data the wrong way around..... + * Imported from nimrod-data-downloader. + * @param {any[][]} data The 2D array to transpose. + * @return {any[][]} The transposed array. + */ +function transpose(data) { + return data[0].map((_col, i) => data.map(row => row[i])).reverse(); +} + +export default transpose;