mirror of
https://github.com/sbrl/research-rainfallradar
synced 2024-11-24 18:23:01 +00:00
Create (untested) JS interface to Python jsonl→tfrecord converter
also test Python .jsonl.gz → .tfrecord.gz
This commit is contained in:
parent
a02c3436ab
commit
9399d1d8f5
5 changed files with 116 additions and 0 deletions
14
rainfallwrangler/package-lock.json
generated
14
rainfallwrangler/package-lock.json
generated
|
@ -13,6 +13,7 @@
|
||||||
"applause-cli": "^1.8.1",
|
"applause-cli": "^1.8.1",
|
||||||
"gunzip-maybe": "^1.4.2",
|
"gunzip-maybe": "^1.4.2",
|
||||||
"nexline": "^1.2.2",
|
"nexline": "^1.2.2",
|
||||||
|
"p-debounce": "^4.0.0",
|
||||||
"p-map": "^5.5.0",
|
"p-map": "^5.5.0",
|
||||||
"p-reflect": "^3.0.0",
|
"p-reflect": "^3.0.0",
|
||||||
"pretty-ms": "^8.0.0",
|
"pretty-ms": "^8.0.0",
|
||||||
|
@ -1108,6 +1109,14 @@
|
||||||
"wrappy": "1"
|
"wrappy": "1"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"node_modules/p-debounce": {
|
||||||
|
"version": "4.0.0",
|
||||||
|
"resolved": "https://registry.npmjs.org/p-debounce/-/p-debounce-4.0.0.tgz",
|
||||||
|
"integrity": "sha512-4Ispi9I9qYGO4lueiLDhe4q4iK5ERK8reLsuzH6BPaXn53EGaua8H66PXIFGrW897hwjXp+pVLrm/DLxN0RF0A==",
|
||||||
|
"engines": {
|
||||||
|
"node": ">=12"
|
||||||
|
}
|
||||||
|
},
|
||||||
"node_modules/p-map": {
|
"node_modules/p-map": {
|
||||||
"version": "5.5.0",
|
"version": "5.5.0",
|
||||||
"resolved": "https://registry.npmjs.org/p-map/-/p-map-5.5.0.tgz",
|
"resolved": "https://registry.npmjs.org/p-map/-/p-map-5.5.0.tgz",
|
||||||
|
@ -2477,6 +2486,11 @@
|
||||||
"wrappy": "1"
|
"wrappy": "1"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"p-debounce": {
|
||||||
|
"version": "4.0.0",
|
||||||
|
"resolved": "https://registry.npmjs.org/p-debounce/-/p-debounce-4.0.0.tgz",
|
||||||
|
"integrity": "sha512-4Ispi9I9qYGO4lueiLDhe4q4iK5ERK8reLsuzH6BPaXn53EGaua8H66PXIFGrW897hwjXp+pVLrm/DLxN0RF0A=="
|
||||||
|
},
|
||||||
"p-map": {
|
"p-map": {
|
||||||
"version": "5.5.0",
|
"version": "5.5.0",
|
||||||
"resolved": "https://registry.npmjs.org/p-map/-/p-map-5.5.0.tgz",
|
"resolved": "https://registry.npmjs.org/p-map/-/p-map-5.5.0.tgz",
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
"applause-cli": "^1.8.1",
|
"applause-cli": "^1.8.1",
|
||||||
"gunzip-maybe": "^1.4.2",
|
"gunzip-maybe": "^1.4.2",
|
||||||
"nexline": "^1.2.2",
|
"nexline": "^1.2.2",
|
||||||
|
"p-debounce": "^4.0.0",
|
||||||
"p-map": "^5.5.0",
|
"p-map": "^5.5.0",
|
||||||
"p-reflect": "^3.0.0",
|
"p-reflect": "^3.0.0",
|
||||||
"pretty-ms": "^8.0.0",
|
"pretty-ms": "^8.0.0",
|
||||||
|
|
20
rainfallwrangler/src/lib/async/debounce.mjs
Normal file
20
rainfallwrangler/src/lib/async/debounce.mjs
Normal file
|
@ -0,0 +1,20 @@
|
||||||
|
"use strict";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a function that debounces the given function.
|
||||||
|
* The function will only be called every INTERVAL number of milliseconds - additional
|
||||||
|
* calls return immediately without calling the provided function.
|
||||||
|
* Useful e.g. debouncing the scroll event in a browser.
|
||||||
|
* Note that p-debounce throttles the function, whereas this debounce implementation CANCELS additional calls to it.
|
||||||
|
* @param {function} fn The function to debounce.
|
||||||
|
* @param {number} interval The interval - in milliseconds - that the function should be called at.
|
||||||
|
* @return {function} A debounced wrapper function around the specified function.
|
||||||
|
*/
|
||||||
|
export default function(fn, interval) {
|
||||||
|
const time_last = 0;
|
||||||
|
return (...args) => {
|
||||||
|
const now = new Date();
|
||||||
|
if(now - time_last > interval)
|
||||||
|
fn(...args);
|
||||||
|
}
|
||||||
|
}
|
47
rainfallwrangler/src/lib/python/py_jsonl2tfrecord.mjs
Normal file
47
rainfallwrangler/src/lib/python/py_jsonl2tfrecord.mjs
Normal file
|
@ -0,0 +1,47 @@
|
||||||
|
"use strict";
|
||||||
|
|
||||||
|
import fs from 'fs';
|
||||||
|
import child_process from 'child_process';
|
||||||
|
|
||||||
|
import nexline from nexline;
|
||||||
|
|
||||||
|
import log from './NamespacedLog.mjs'; const l = log("gzipchildprocess");
|
||||||
|
import { end_safe } from './StreamHelpers.mjs';
|
||||||
|
import { fstat } from 'fs';
|
||||||
|
|
||||||
|
|
||||||
|
const __dirname = import.meta.url.slice(7, import.meta.url.lastIndexOf("/"));
|
||||||
|
|
||||||
|
async function* py_jsonl2tfrecord(filepath_source, filepath_target, filepath_meta=null) {
|
||||||
|
// get stdin() { return this.child_process.stdin; }
|
||||||
|
// get stdout() { return this.child_process.stdout; }
|
||||||
|
// get stderr() { return this.child_process.stderr; }
|
||||||
|
|
||||||
|
|
||||||
|
child_process = child_process.spawn(
|
||||||
|
"python3", [
|
||||||
|
path.join(__dirname, "json2tfrecord.py"),
|
||||||
|
"--input", filepath_source,
|
||||||
|
"--output", filepath_target
|
||||||
|
], { // TODO: detect binary - python3 vs python
|
||||||
|
// Pipe stdin + stdout; send error to the parent process
|
||||||
|
stdio: [ "ignore", "pipe", "inherit" ]
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
const reader = nexline({ input: child_process.stdout });
|
||||||
|
|
||||||
|
for await(const line of reader) {
|
||||||
|
if(line.startsWith("SHAPE") && filepath_meta !== null ) {
|
||||||
|
await fs.promises.writeFile(
|
||||||
|
filepath_meta,
|
||||||
|
line.split(/\t+/)[1]
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
yield parseInt(line, 10);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export default py_jsonl2tfrecord;
|
34
rainfallwrangler/src/lib/record/jsonl_to_tf.mjs
Normal file
34
rainfallwrangler/src/lib/record/jsonl_to_tf.mjs
Normal file
|
@ -0,0 +1,34 @@
|
||||||
|
"use strict";
|
||||||
|
|
||||||
|
import fs from 'fs';
|
||||||
|
import path from 'path';
|
||||||
|
import p_map from 'p-map';
|
||||||
|
import pretty_ms from 'pretty-ms';
|
||||||
|
|
||||||
|
import debounce from '../async/debounce.mjs';
|
||||||
|
import py_jsonl2tfrecord from '../python/py_jsonl2tfrecord.mjs';
|
||||||
|
import log from '../../lib/io/NamespacedLog.mjs'; const l = log("jsonl2tf");
|
||||||
|
|
||||||
|
export default async function(dirpath_source, dirpath_target) {
|
||||||
|
const files = fs.promises.readdir(dirpath_source);
|
||||||
|
|
||||||
|
let time_start = new Date(), lines_processed = 0, files_complete = 0;
|
||||||
|
|
||||||
|
const update_progress = debounce(() => {
|
||||||
|
process.stdout.write(`${files_complete}/${lines_processed} files/lines complete | ${((new Date() - time_start) / lines_processed).toFixed(3)} lines/sec | ${((files_processed / files.length)*100).toFixed(2)}% complete\r`);
|
||||||
|
});
|
||||||
|
|
||||||
|
p_map(files, async (filename, i) => {
|
||||||
|
const filepath_source = path.join(dirpath_source, filename);
|
||||||
|
const filepath_dest = path.join(dirpath_target, filename);
|
||||||
|
const filepath_meta = i === 0 ? path.join(dirpath_target, `metadata.json`) : null;
|
||||||
|
let time_start = new Date(), lines_done = 0;
|
||||||
|
for await (let line_number of py_jsonl2tfrecord(filepath_source, filepath_dest, filepath_meta)) {
|
||||||
|
lines_processed++;
|
||||||
|
lines_done = line_number;
|
||||||
|
update_progress();
|
||||||
|
}
|
||||||
|
files_complete++;
|
||||||
|
l.log(`converted ${filename}: ${lines_done} lines @ ${pretty_ms((new Date() - time_start) / lines_done)}`);
|
||||||
|
});
|
||||||
|
}
|
Loading…
Reference in a new issue