diff --git a/rainfallwrangler/package-lock.json b/rainfallwrangler/package-lock.json index 852b2d0..d937ded 100644 --- a/rainfallwrangler/package-lock.json +++ b/rainfallwrangler/package-lock.json @@ -13,6 +13,7 @@ "applause-cli": "^1.8.1", "gunzip-maybe": "^1.4.2", "nexline": "^1.2.2", + "p-debounce": "^4.0.0", "p-map": "^5.5.0", "p-reflect": "^3.0.0", "pretty-ms": "^8.0.0", @@ -1108,6 +1109,14 @@ "wrappy": "1" } }, + "node_modules/p-debounce": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/p-debounce/-/p-debounce-4.0.0.tgz", + "integrity": "sha512-4Ispi9I9qYGO4lueiLDhe4q4iK5ERK8reLsuzH6BPaXn53EGaua8H66PXIFGrW897hwjXp+pVLrm/DLxN0RF0A==", + "engines": { + "node": ">=12" + } + }, "node_modules/p-map": { "version": "5.5.0", "resolved": "https://registry.npmjs.org/p-map/-/p-map-5.5.0.tgz", @@ -2477,6 +2486,11 @@ "wrappy": "1" } }, + "p-debounce": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/p-debounce/-/p-debounce-4.0.0.tgz", + "integrity": "sha512-4Ispi9I9qYGO4lueiLDhe4q4iK5ERK8reLsuzH6BPaXn53EGaua8H66PXIFGrW897hwjXp+pVLrm/DLxN0RF0A==" + }, "p-map": { "version": "5.5.0", "resolved": "https://registry.npmjs.org/p-map/-/p-map-5.5.0.tgz", diff --git a/rainfallwrangler/package.json b/rainfallwrangler/package.json index 0a5964a..dc68942 100644 --- a/rainfallwrangler/package.json +++ b/rainfallwrangler/package.json @@ -17,6 +17,7 @@ "applause-cli": "^1.8.1", "gunzip-maybe": "^1.4.2", "nexline": "^1.2.2", + "p-debounce": "^4.0.0", "p-map": "^5.5.0", "p-reflect": "^3.0.0", "pretty-ms": "^8.0.0", diff --git a/rainfallwrangler/src/lib/async/debounce.mjs b/rainfallwrangler/src/lib/async/debounce.mjs new file mode 100644 index 0000000..a21f08c --- /dev/null +++ b/rainfallwrangler/src/lib/async/debounce.mjs @@ -0,0 +1,20 @@ +"use strict"; + +/** + * Returns a function that debounces the given function. + * The function will only be called every INTERVAL number of milliseconds - additional + * calls return immediately without calling the provided function. + * Useful e.g. debouncing the scroll event in a browser. + * Note that p-debounce throttles the function, whereas this debounce implementation CANCELS additional calls to it. + * @param {function} fn The function to debounce. + * @param {number} interval The interval - in milliseconds - that the function should be called at. + * @return {function} A debounced wrapper function around the specified function. + */ +export default function(fn, interval) { + const time_last = 0; + return (...args) => { + const now = new Date(); + if(now - time_last > interval) + fn(...args); + } +} \ No newline at end of file diff --git a/rainfallwrangler/src/lib/python/py_jsonl2tfrecord.mjs b/rainfallwrangler/src/lib/python/py_jsonl2tfrecord.mjs new file mode 100644 index 0000000..ed310db --- /dev/null +++ b/rainfallwrangler/src/lib/python/py_jsonl2tfrecord.mjs @@ -0,0 +1,47 @@ +"use strict"; + +import fs from 'fs'; +import child_process from 'child_process'; + +import nexline from nexline; + +import log from './NamespacedLog.mjs'; const l = log("gzipchildprocess"); +import { end_safe } from './StreamHelpers.mjs'; +import { fstat } from 'fs'; + + +const __dirname = import.meta.url.slice(7, import.meta.url.lastIndexOf("/")); + +async function* py_jsonl2tfrecord(filepath_source, filepath_target, filepath_meta=null) { + // get stdin() { return this.child_process.stdin; } + // get stdout() { return this.child_process.stdout; } + // get stderr() { return this.child_process.stderr; } + + + child_process = child_process.spawn( + "python3", [ + path.join(__dirname, "json2tfrecord.py"), + "--input", filepath_source, + "--output", filepath_target + ], { // TODO: detect binary - python3 vs python + // Pipe stdin + stdout; send error to the parent process + stdio: [ "ignore", "pipe", "inherit" ] + } + ); + + const reader = nexline({ input: child_process.stdout }); + + for await(const line of reader) { + if(line.startsWith("SHAPE") && filepath_meta !== null ) { + await fs.promises.writeFile( + filepath_meta, + line.split(/\t+/)[1] + ); + continue; + } + + yield parseInt(line, 10); + } +} + +export default py_jsonl2tfrecord; diff --git a/rainfallwrangler/src/lib/record/jsonl_to_tf.mjs b/rainfallwrangler/src/lib/record/jsonl_to_tf.mjs new file mode 100644 index 0000000..2228e3f --- /dev/null +++ b/rainfallwrangler/src/lib/record/jsonl_to_tf.mjs @@ -0,0 +1,34 @@ +"use strict"; + +import fs from 'fs'; +import path from 'path'; +import p_map from 'p-map'; +import pretty_ms from 'pretty-ms'; + +import debounce from '../async/debounce.mjs'; +import py_jsonl2tfrecord from '../python/py_jsonl2tfrecord.mjs'; +import log from '../../lib/io/NamespacedLog.mjs'; const l = log("jsonl2tf"); + +export default async function(dirpath_source, dirpath_target) { + const files = fs.promises.readdir(dirpath_source); + + let time_start = new Date(), lines_processed = 0, files_complete = 0; + + const update_progress = debounce(() => { + process.stdout.write(`${files_complete}/${lines_processed} files/lines complete | ${((new Date() - time_start) / lines_processed).toFixed(3)} lines/sec | ${((files_processed / files.length)*100).toFixed(2)}% complete\r`); + }); + + p_map(files, async (filename, i) => { + const filepath_source = path.join(dirpath_source, filename); + const filepath_dest = path.join(dirpath_target, filename); + const filepath_meta = i === 0 ? path.join(dirpath_target, `metadata.json`) : null; + let time_start = new Date(), lines_done = 0; + for await (let line_number of py_jsonl2tfrecord(filepath_source, filepath_dest, filepath_meta)) { + lines_processed++; + lines_done = line_number; + update_progress(); + } + files_complete++; + l.log(`converted ${filename}: ${lines_done} lines @ ${pretty_ms((new Date() - time_start) / lines_done)}`); + }); +}