Add data readers

This commit is contained in:
Starbeamrainbowlabs 2022-05-18 17:04:11 +01:00
parent 5829db062b
commit bf4866bdbc
Signed by: sbrl
GPG key ID: 1BE5172E637709C2
6 changed files with 313 additions and 0 deletions

View file

@ -0,0 +1,140 @@
"use strict";
import fs from 'fs';
import { Readable } from 'stream';
import nexline from 'nexline';
import gunzip from 'gunzip-maybe';
import log from './lib/io/NamespacedLog.mjs'; const l = log("reader:radar");
import interpolate from '../manip/array2d_interpolate.mjs';
import transpose from '../manip/array2d_transpose.mjs';
/**
* Reads data in order from a directory of .jsonl.gz files.
* @param {string} in_stride Return only every X objects. 1 = return every object. Default: 1
* @param {boolean} do_interpolate Whether to interpolate to fill in missing values.
* @param {number} time_step_interval The nominal interval, in seconds, between time steps (default: 300 seconds)
*/
class RadarReader {
constructor(in_stride = 1, do_interpolate = true, time_step_interval = 300) {
this.time_step_interval = time_step_interval;
this.stride = in_stride;
this.do_interpolate = do_interpolate;
this.writer_interp_stats = [];
}
/**
* An async iterator that yields rainfall radar objects in order.
* Note that for a single RadarReader object, this method may be called
* multiple times - potentially in parallel.
* @param {string} filename The filename to read from.
* @return {Generator<Promise>} The async generator.
*/
async *iterate(filename) {
if(!fs.existsSync(filename))
throw new Error(`RadarReader/Error: Can't read from '${filename}' as it doesn't exist.`);
let read = fs.createReadStream(filename),
extractor = gunzip();
read.pipe(extractor);
let reader = nexline({
input: new Readable().wrap(extractor) // Wrap the stream so that nexline likes it
});
let i = -1;
let prev = null
while(true) {
let next_line = await reader.next();
if(next_line == null)
break;
i++;
if(i % this.stride !== 0) continue;
// Ignore empty lines
if(next_line.trim() === "")
continue;
let next = null;
try {
next = JSON.parse(next_line);
} catch(error) {
l.warn(`Encountered invalid JSON object at line ${i}, skipping (error: ${error.message})`);
continue;
}
if(next == null) continue;
// Sort out the timestamp
if(next.timestamp == null) {
if(next.timestamps !== null) next.timestamp = next.timestamps[1];
else {
l.warn(`Encountered JSON object without a timestamp`);
continue;
}
}
next.timestamp = new Date(next.timestamp);
// Transpose the data to correct our earlier mistake
next.data = transpose(next.data) // Correct the orientation of the data array
let tmp = next.size_extract.height;
next.size_extract.height = next.size_extract.width;
next.size_extract.height = tmp;
// Now, the data is in the format data[y][x]
// Interpolate if needed
if(this.do_interpolate && prev !== null && next.timestamp - prev.timestamp > this.time_step_interval * 1000 * 1.5 * this.stride) {
for await(let item of this.interpolate(prev, next)) {
yield item;
}
}
// We've caught up (if required) - continue as normal
yield next;
prev = next;
}
}
/**
* Interpolates between 2 objects.
* @param {Object} a The first object.
* @param {Object} b The second object.
* @return {Generator<Object>}
*/
async *interpolate(a, b) {
let next_timestamp = new Date(a.timestamp); // This clones the existing Date object
// Increment the time interval
next_timestamp.setSeconds(
next_timestamp.getSeconds() + (this.time_step_interval * this.stride)
);
do {
// The percentage of the way through the interpolation we are
let interpolation_percentage = 1 - ((b.timestamp - next_timestamp) / (b.timestamp - a.timestamp));
// Generate a temporary interpolated object
let obj_interpolated = {};
Object.assign(obj_interpolated, a);
obj_interpolated.timestamp = next_timestamp;
obj_interpolated.data = interpolate(
a.data, b.data,
interpolation_percentage
);
this.writer_interp_stats.push(next_timestamp);
yield obj_interpolated;
// Increment the time interval
next_timestamp.setSeconds(
next_timestamp.getSeconds() + (this.time_step_interval * this.stride)
);
} while(b.timestamp - next_timestamp >= this.time_step_interval * 1000 * this.stride);
}
}
export default RadarReader;

View file

@ -0,0 +1,80 @@
"use strict";
import { pipeline } from 'stream';
import util from 'util';
import fs from 'fs';
import path from 'path';
import log from './lib/io/NamespacedLog.mjs'; const l = log("reader:terrain50stream");
import array2d_classify_convert_bin from '../../manip/array2d_classify_convert_bin.mjs';
class Terrain50StreamReader {
/**
* The tital number of timesteps we need in the buffer before we can even consider generating a sample.
* @return {number}
*/
get timesteps_required() {
return this.channel_pattern.reduce((next, total) => total + next, 0);
}
constructor(threshold, channel_pattern, pooling_operator="max", tolerant = false) {
this.threshold = threshold;
this.channel_pattern = channel_pattern;
this.pooling_operator = "max";
this.tolerant = tolerant;
}
async *iterate(filepath) {
const stream = Terrain50.ParseStream(pipeline(
fs.createReadStream(filepath),
gunzip()
), this.tolerant ? /\s+/ : " ");
const timestep_buffer = [];
let i = -1;
for await (const next of stream) {
i++;
// Skip the first few items, because we want to predict the next
// timestep after the rainfall radar data
if(i < this.temporal_depth)
continue;
const values_bin = array2d_classify_convert_bin(
next.data,
this.threshold
);
timestep_buffer.push(values_bin);
// l.debug(`[DEBUG:Terrain50Stream] values_bin`, util.inspect(values_bin).substr(0, 500));
const result = this.make_sample(timestep_buffer);
if(result == null) continue;
// l.debug(`[Terrain50Stream] Yielding tensor of shape`, values_bin.shape);
yield result;
}
yield this.make_sample(timestep_buffer);
}
make_sample(timestep_buffer) {
if(timestep_buffer.length < this.timesteps_required) return null;
const grouped_timesteps = [];
let offset = 0;
for(const channel_timestep_count of this.channel_pattern) {
const acc = [];
for(let i = offset; i < channel_timestep_count+offset; i++) {
acc.push(timestep_buffer[i]);
}
grouped_timesteps.push(array2d_pool(acc, this.pooling_operator));
offset += channel_timestep_count;
}
}
}
export default Terrain50StreamReader;

View file

@ -0,0 +1,23 @@
"use strict";
/**
* Given a 2D array, makes a new 2D array according to the given threshold
* Apparently making this a mutator makes it slower - despite the theorised decreased load on the garbage collector
*
* @param {number[][]} array The 2D array to process.
* @param {{min:Number,max:Number}[]} bounds An array of objects, generated by bounds2classes().
* @return {number[][]} The generated 2D array.
*/
export default function array2d_classify_convert_bin(arr, threshold) {
if(typeof threshold != "number")
throw new Error("Error: Invalid threshold value.");
let result = [];
for(let i = 0; i < arr.length; i++) {
result[i] = [];
for(let j = 0; j < arr[i].length; j++) {
result[i][j] = arr[i][j] >= threshold ? 1 : 0;
}
}
return result;
}

View file

@ -0,0 +1,18 @@
"use strict";
// import tf from '@tensorflow/tfjs-node';
import tf from '@tensorflow/tfjs-node-gpu';
export default async function array2d_pool(channels, operator) {
// This is rather a hack to save time. Tensorflow.js is not needed here, but may result in increased speed. It may be worth rolling this out to the rest of the codebase, thinking about it. While Tensorflow.js hasmany bugs, this only extends to the machine learning / loss functions / models etc and not the
const result_tensor = tf.tidy(() => {
const tensor = tf.tensor(channels);
console.log(`DEFAULT array2d_pool tensor shape:`, tensor);
return tf.max(tensor, 0, false);
});
const result_array = await result.array();
result_tensor.dispose();
return result_array;
}

View file

@ -0,0 +1,33 @@
"use strict";
/**
* Interpolates the values between 2 2D arrays.
* Note that both arrays MUST be exactly the same size!
* Imported from nimrod-data-downloader.
* @param {number[][]} a A 2D array of numbers.
* @param {number[][]} b A 2D array of numbers.
* @param {number} percentage The percentage of the way from a to b to use when interpolating
* @return {number[][]} Another 2D array of numbers between the 2 different input arrays.
*/
function interpolate(a, b, percentage) {
let result = [];
for(let i = 0; i < a.length; i++) {
let row = [];
for(let j = 0; j < a[i].length; j++) {
// If this row in b isn't defined, fill it in
// Not sure why we'd have variable lengths here
if(typeof b[i] == "undefined")
b[i] = Array.apply(null, Array(a[i].length)).map(Number.prototype.valueOf, 0);
/*
* Example:
* d = 10, e = 20, percentage = 0.75
* result = ((e - d) * percentage + d)
*/
row.push(((b[i][j] - a[i][j]) * percentage) + a[i][j]);
}
result.push(row);
}
return result;
}
export default interpolate;

View file

@ -0,0 +1,19 @@
"use strict";
/**
* Transpose a 2D array. In other words, rotate the contents of the array 90°
* *anti*clockwise.
* Note that a normal transpose would rotate 90° clockwise, not *anti*clockwise.
* Adapted from https://stackoverflow.com/a/17428705/1460422 to rotate it anti-clockwise, not clockwise
* Note that even though `.reverse()` is a mutating function, it's ok here because `.map()` creates a new array.
* This was written because in the NimrodRunner we made the embarrasing mistake
* of winding up writing the data the wrong way around.....
* Imported from nimrod-data-downloader.
* @param {any[][]} data The 2D array to transpose.
* @return {any[][]} The transposed array.
*/
function transpose(data) {
return data[0].map((_col, i) => data.map(row => row[i])).reverse();
}
export default transpose;