diff --git a/aimodel/requirements.txt b/aimodel/requirements.txt index 8145e77..707c092 100644 --- a/aimodel/requirements.txt +++ b/aimodel/requirements.txt @@ -1 +1,2 @@ -tensorflow>=2.4 \ No newline at end of file +tensorflow>=2.4 +silence_tensorflow \ No newline at end of file diff --git a/rainfallwrangler/src/lib/python/json2tfrecord.py b/rainfallwrangler/src/lib/python/json2tfrecord.py index 667a98e..1c9619c 100755 --- a/rainfallwrangler/src/lib/python/json2tfrecord.py +++ b/rainfallwrangler/src/lib/python/json2tfrecord.py @@ -6,6 +6,9 @@ import gzip import json import argparse +from silence_tensorflow import silence_tensorflow +if not os.environ.get("NO_SILENCE"): + silence_tensorflow() import tensorflow as tf # TO PARSE: @@ -59,7 +62,7 @@ def convert(filepath_in, filepath_out): ## 3: Print shape definitions (required when parsing) ### if i == 0: - print("SHAPES\t"+json.dumps({ "rainfallradar": rainfall.shape.as_list(), "waterdepth": water.shape.as_list() })) + print("SHAPES\t"+json.dumps({ "rainfallradar": rainfall.shape.as_list(), "waterdepth": water.shape.as_list() }), flush=True) ### ## 4: Serialise tensors @@ -76,7 +79,8 @@ def convert(filepath_in, filepath_out): })) writer.write(record.SerializeToString()) - print(i) + print(f"{i}", flush=True) + def main(): diff --git a/rainfallwrangler/src/lib/python/py_jsonl2tfrecord.mjs b/rainfallwrangler/src/lib/python/py_jsonl2tfrecord.mjs index ed310db..248a1c6 100644 --- a/rainfallwrangler/src/lib/python/py_jsonl2tfrecord.mjs +++ b/rainfallwrangler/src/lib/python/py_jsonl2tfrecord.mjs @@ -1,45 +1,54 @@ "use strict"; import fs from 'fs'; +import path from 'path'; import child_process from 'child_process'; +import { Readable } from 'stream'; -import nexline from nexline; +import nexline from 'nexline'; -import log from './NamespacedLog.mjs'; const l = log("gzipchildprocess"); -import { end_safe } from './StreamHelpers.mjs'; -import { fstat } from 'fs'; +import log from '../io/NamespacedLog.mjs'; const l = log("gzipchildprocess"); +// import { end_safe } from '../io/StreamHelpers.mjs'; +function snore(ms) { + return new Promise((resolve, _reject) => setTimeout(resolve, ms)); +} const __dirname = import.meta.url.slice(7, import.meta.url.lastIndexOf("/")); async function* py_jsonl2tfrecord(filepath_source, filepath_target, filepath_meta=null) { - // get stdin() { return this.child_process.stdin; } - // get stdout() { return this.child_process.stdout; } - // get stderr() { return this.child_process.stderr; } + // get stdin() { return this.converter.stdin; } + // get stdout() { return this.converter.stdout; } + // get stderr() { return this.converter.stderr; } + const env = {}; Object.assign(env, process.env); + if(filepath_meta !== null) env["NO_SILENCE"] = "NO_SILENCE"; - child_process = child_process.spawn( + const converter = child_process.spawn( "python3", [ path.join(__dirname, "json2tfrecord.py"), "--input", filepath_source, "--output", filepath_target ], { // TODO: detect binary - python3 vs python // Pipe stdin + stdout; send error to the parent process - stdio: [ "ignore", "pipe", "inherit" ] + stdio: [ "ignore", "pipe", "inherit" ], + env } ); + // converter.stdout.on("data", (chunk) => console.log(`DEBUG chunk`, chunk)); - const reader = nexline({ input: child_process.stdout }); + const reader = nexline({ input: new Readable().wrap(converter.stdout) }); for await(const line of reader) { - if(line.startsWith("SHAPE") && filepath_meta !== null ) { - await fs.promises.writeFile( - filepath_meta, - line.split(/\t+/)[1] - ); + if(line.startsWith("SHAPES\t")) { + if(filepath_meta !== null) { + await fs.promises.writeFile( + filepath_meta, + line.split(/\t+/)[1] + ); + } continue; } - yield parseInt(line, 10); } } diff --git a/rainfallwrangler/src/lib/record/jsonl_to_tf.mjs b/rainfallwrangler/src/lib/record/jsonl_to_tf.mjs index 2228e3f..1aedd85 100644 --- a/rainfallwrangler/src/lib/record/jsonl_to_tf.mjs +++ b/rainfallwrangler/src/lib/record/jsonl_to_tf.mjs @@ -2,6 +2,7 @@ import fs from 'fs'; import path from 'path'; +import os from 'os'; import p_map from 'p-map'; import pretty_ms from 'pretty-ms'; @@ -9,19 +10,28 @@ import debounce from '../async/debounce.mjs'; import py_jsonl2tfrecord from '../python/py_jsonl2tfrecord.mjs'; import log from '../../lib/io/NamespacedLog.mjs'; const l = log("jsonl2tf"); + +/** + * Converts a directory of .jsonl.gz files to .tfrecord.gz files. + * @param {string} dirpath_source The source directory to read from. + * @param {string} dirpath_target The target directory to write to. + * @return {void} + */ export default async function(dirpath_source, dirpath_target) { - const files = fs.promises.readdir(dirpath_source); + const files = await fs.promises.readdir(dirpath_source); let time_start = new Date(), lines_processed = 0, files_complete = 0; - const update_progress = debounce(() => { + const update_progress_force = () => { process.stdout.write(`${files_complete}/${lines_processed} files/lines complete | ${((new Date() - time_start) / lines_processed).toFixed(3)} lines/sec | ${((files_processed / files.length)*100).toFixed(2)}% complete\r`); - }); + }; + const update_progress = debounce(update_progress_force); - p_map(files, async (filename, i) => { + await p_map(files, async (filename, i) => { const filepath_source = path.join(dirpath_source, filename); - const filepath_dest = path.join(dirpath_target, filename); + const filepath_dest = path.join(dirpath_target, filename.replace(/\.jsonl\.gz$/, ".tfrecord.gz")); const filepath_meta = i === 0 ? path.join(dirpath_target, `metadata.json`) : null; + l.log(`start ${i} | ${filename} | META ${filepath_meta}`); let time_start = new Date(), lines_done = 0; for await (let line_number of py_jsonl2tfrecord(filepath_source, filepath_dest, filepath_meta)) { lines_processed++; @@ -30,5 +40,7 @@ export default async function(dirpath_source, dirpath_target) { } files_complete++; l.log(`converted ${filename}: ${lines_done} lines @ ${pretty_ms((new Date() - time_start) / lines_done)}`); - }); + }, { concurrency: os.cpus().length }); + update_progress_force(); + l.log(`complete: ${lines_processed}/${files_complete} lines/files processed in ${pretty_ms(new Date() - time_start)}`); } diff --git a/rainfallwrangler/src/subcommands/jsonl2tfrecord/jsonl2tfrecord.mjs b/rainfallwrangler/src/subcommands/jsonl2tfrecord/jsonl2tfrecord.mjs new file mode 100644 index 0000000..59f9a94 --- /dev/null +++ b/rainfallwrangler/src/subcommands/jsonl2tfrecord/jsonl2tfrecord.mjs @@ -0,0 +1,21 @@ +"use strict"; + +import fs from 'fs'; + +import settings from "../../settings.mjs"; + +import jsonl_to_tf from '../../lib/record/jsonl_to_tf.mjs'; + +export default async function() { + if(typeof settings.source !== "string") + throw new Error(`Error: No source directory specified (see the --source CLI argument)`); + if(typeof settings.target !== "string") + throw new Error(`Error: No target directory specified (see the --target CLI argument)`); + + if(!fs.existsSync(settings.source)) + throw new Error(`Error: The source directory at '${settings.source}' doesn't exist or you haven't got permission to access it.`); + if(!fs.existsSync(settings.target)) + await fs.promises.mkdir(settings.target); + + await jsonl_to_tf(settings.source, settings.target); +} diff --git a/rainfallwrangler/src/subcommands/jsonl2tfrecord/meta.mjs b/rainfallwrangler/src/subcommands/jsonl2tfrecord/meta.mjs new file mode 100644 index 0000000..950391b --- /dev/null +++ b/rainfallwrangler/src/subcommands/jsonl2tfrecord/meta.mjs @@ -0,0 +1,7 @@ +"use strict"; + +export default function(cli) { + cli.subcommand("jsonl2tfrecord", "Convert a directory of .jsonl.gz files to .tfrecord.gz files.") + .argument("source", "Path to the source directory.", null, "string") + .argument("target", "Path to the target directory.", null, "string"); +}