mirror of
https://github.com/sbrl/research-rainfallradar
synced 2024-11-04 17:13:02 +00:00
it works
.....I think
This commit is contained in:
parent
1297f41105
commit
5b2d71f41f
6 changed files with 50 additions and 15 deletions
24
package-lock.json
generated
Normal file
24
package-lock.json
generated
Normal file
|
@ -0,0 +1,24 @@
|
|||
{
|
||||
"name": "PhD-Rainfall-Radar",
|
||||
"lockfileVersion": 2,
|
||||
"requires": true,
|
||||
"packages": {
|
||||
"": {
|
||||
"dependencies": {
|
||||
"duplex-child-process": "^1.0.1"
|
||||
}
|
||||
},
|
||||
"node_modules/duplex-child-process": {
|
||||
"version": "1.0.1",
|
||||
"resolved": "https://registry.npmjs.org/duplex-child-process/-/duplex-child-process-1.0.1.tgz",
|
||||
"integrity": "sha512-tWbt4tyioDjyK5nh+qicbdvBvNjSXsTUF5zKUwSauuKPg1mokjwn/HezwfvWhh6hXoLdgetY+ZlzU/sMwUMJkg=="
|
||||
}
|
||||
},
|
||||
"dependencies": {
|
||||
"duplex-child-process": {
|
||||
"version": "1.0.1",
|
||||
"resolved": "https://registry.npmjs.org/duplex-child-process/-/duplex-child-process-1.0.1.tgz",
|
||||
"integrity": "sha512-tWbt4tyioDjyK5nh+qicbdvBvNjSXsTUF5zKUwSauuKPg1mokjwn/HezwfvWhh6hXoLdgetY+ZlzU/sMwUMJkg=="
|
||||
}
|
||||
}
|
||||
}
|
5
package.json
Normal file
5
package.json
Normal file
|
@ -0,0 +1,5 @@
|
|||
{
|
||||
"dependencies": {
|
||||
"duplex-child-process": "^1.0.1"
|
||||
}
|
||||
}
|
|
@ -27,7 +27,6 @@ class RadarWrangler {
|
|||
|
||||
const result = this.make_sample(timestep_buffer);
|
||||
if(result == null) continue;
|
||||
|
||||
yield result;
|
||||
|
||||
timestep_buffer.shift();
|
||||
|
@ -50,7 +49,7 @@ class RadarWrangler {
|
|||
offset += channel_timestep_count;
|
||||
}
|
||||
|
||||
return grouped_timesteps;
|
||||
return Promise.all(grouped_timesteps);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -6,6 +6,7 @@ import path from 'path';
|
|||
import RecordBuilder from '../record/RecordBuilder.mjs';
|
||||
import RecordsWriter from '../record/RecordsWriter.mjs';
|
||||
import pretty_ms from 'pretty-ms';
|
||||
import terrain50_analyse_frequencies from 'terrain50/src/static/Terrain50AnalyseFrequencies.mjs';
|
||||
|
||||
class RecordWrangler {
|
||||
#builder = new RecordBuilder();
|
||||
|
@ -14,6 +15,8 @@ class RecordWrangler {
|
|||
this.dirpath = dirpath;
|
||||
this.count_per_file = count_per_file;
|
||||
|
||||
this.display_interval = 2 * 1000;
|
||||
|
||||
if(!fs.existsSync(this.dirpath))
|
||||
fs.mkdirSync(this.dirpath);
|
||||
}
|
||||
|
@ -22,24 +25,23 @@ class RecordWrangler {
|
|||
// TODO: Shuffle stuff about in the *Python* data pipeline
|
||||
|
||||
let writer = null;
|
||||
let i = -1, i_file = 0, count_this_file = 0, time_start = new Date();
|
||||
let i = 0, i_file = 0, count_this_file = 0, time_start = new Date(), time_display = time_start;
|
||||
while(true) {
|
||||
i++;
|
||||
|
||||
console.log(`RecordWriter step ${i}`);
|
||||
|
||||
// Start writing to a new file when necessary
|
||||
if(writer == null || count_this_file > this.count_per_file) {
|
||||
if(writer !== null) await writer.close();
|
||||
const filepath_next = path.join(this.dirpath, `${i_file}.jsonl.gz`);
|
||||
writer = new RecordsWriter(filepath_next);
|
||||
console.log(`RecordWriter NEW FILE ${filepath_next}`);
|
||||
i_file++;
|
||||
count_this_file = 0;
|
||||
}
|
||||
|
||||
count_this_file++;
|
||||
|
||||
const sample_radar = await reader_radar.next();
|
||||
const sample_water = await reader_water.next();
|
||||
|
||||
if(sample_radar.done || sample_water.done) break;
|
||||
|
||||
const example_next = this.make_example(
|
||||
|
@ -49,14 +51,19 @@ class RecordWrangler {
|
|||
|
||||
await writer.write(example_next);
|
||||
|
||||
process.stderr.write(`Elapsed: ${pretty_ms(new Date() - time_start)}, Written ${count_this_file}/${i_file}/${i} examples/files/total\r`);
|
||||
const time_now = new Date();
|
||||
if(time_now - time_display > this.display_interval) {
|
||||
const elapsed = new Date() - time_start;
|
||||
process.stderr.write(`Elapsed: ${pretty_ms(elapsed, { keepDecimalsOnWholeSeconds: true })}, Written ${count_this_file}/${i_file}/${i} examples/files/total | ${(1000 / (elapsed / i)).toFixed(2)} batches/sec | ${this.count_per_file - count_this_file} left for this file\r`);
|
||||
time_display = time_now;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
make_example(sample_radar, sample_water) {
|
||||
this.#builder.add("rainfallradar", sample_radar);
|
||||
this.#builder.add("waterdepth", sample_water.flat);
|
||||
this.#builder.add("waterdepth", sample_water);
|
||||
return this.#builder.release();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,7 +12,6 @@ export default async function array2d_pool(channels, operator="max") {
|
|||
const tensor = tf.tensor(channels);
|
||||
return tf.max(tensor, 0, false);
|
||||
});
|
||||
|
||||
const result_array = await result_tensor.array();
|
||||
result_tensor.dispose();
|
||||
|
||||
|
|
|
@ -2,21 +2,22 @@
|
|||
|
||||
import fs from 'fs';
|
||||
|
||||
import SpawnStream from 'spawn-stream';
|
||||
import ChildProcess from 'duplex-child-process';
|
||||
|
||||
import { write_safe, end_safe } from '../io/StreamHelpers.mjs';
|
||||
|
||||
class RecordsWriter {
|
||||
#stream_out = fs.createWriteStream(filepath);
|
||||
#gzip = SpawnStream("gzip");
|
||||
#stream_out = null;
|
||||
#gzip = ChildProcess.spawn("gzip");
|
||||
|
||||
constructor(filepath) {
|
||||
this.#stream_out = fs.createWriteStream(filepath);
|
||||
this.#gzip.pipe(this.#stream_out);
|
||||
}
|
||||
|
||||
async write(sample) {
|
||||
console.log(sample);
|
||||
await write_safe(this.#gzip, JSON.stringify(sample));
|
||||
const str = JSON.stringify(Object.fromEntries(sample));
|
||||
await write_safe(this.#gzip, str);
|
||||
}
|
||||
|
||||
async close() {
|
||||
|
|
Loading…
Reference in a new issue