write glue for .jsonl.gz → .tfrecord.gz converter

This commit is contained in:
Starbeamrainbowlabs 2022-08-08 15:33:59 +01:00
parent f3652edf82
commit 222a6146ec
Signed by: sbrl
GPG key ID: 1BE5172E637709C2
6 changed files with 79 additions and 25 deletions

View file

@ -1 +1,2 @@
tensorflow>=2.4
tensorflow>=2.4
silence_tensorflow

View file

@ -6,6 +6,9 @@ import gzip
import json
import argparse
from silence_tensorflow import silence_tensorflow
if not os.environ.get("NO_SILENCE"):
silence_tensorflow()
import tensorflow as tf
# TO PARSE:
@ -59,7 +62,7 @@ def convert(filepath_in, filepath_out):
## 3: Print shape definitions (required when parsing)
###
if i == 0:
print("SHAPES\t"+json.dumps({ "rainfallradar": rainfall.shape.as_list(), "waterdepth": water.shape.as_list() }))
print("SHAPES\t"+json.dumps({ "rainfallradar": rainfall.shape.as_list(), "waterdepth": water.shape.as_list() }), flush=True)
###
## 4: Serialise tensors
@ -76,7 +79,8 @@ def convert(filepath_in, filepath_out):
}))
writer.write(record.SerializeToString())
print(i)
print(f"{i}", flush=True)
def main():

View file

@ -1,45 +1,54 @@
"use strict";
import fs from 'fs';
import path from 'path';
import child_process from 'child_process';
import { Readable } from 'stream';
import nexline from nexline;
import nexline from 'nexline';
import log from './NamespacedLog.mjs'; const l = log("gzipchildprocess");
import { end_safe } from './StreamHelpers.mjs';
import { fstat } from 'fs';
import log from '../io/NamespacedLog.mjs'; const l = log("gzipchildprocess");
// import { end_safe } from '../io/StreamHelpers.mjs';
/**
 * Waits asynchronously for the given amount of time.
 * @param	{number}	ms	How long to wait for, in milliseconds.
 * @return	{Promise}	A promise that fulfils (with no value) once the timer fires. It never rejects.
 */
function snore(ms) {
	return new Promise((wake_up) => {
		setTimeout(wake_up, ms);
	});
}
// Directory that contains this module, derived from its file: URL.
// The URL is parsed and percent-decoded rather than sliced by hand, so that
// directories whose names contain spaces or other escaped characters
// (e.g. "my%20dir" in the URL) still resolve to a real filesystem path.
const __dirname = path.dirname(decodeURIComponent(new URL(import.meta.url).pathname));
/**
 * Runs the json2tfrecord.py script (located next to this module) as a python3
 * child process and reports its progress as an async generator.
 * The child's stdout is read line by line. A line starting with "SHAPES\t" is
 * treated as metadata: the text after the tab is written to filepath_meta when
 * one is given, and the line is not yielded. Every other line is parsed as a
 * base-10 integer and yielded.
 * NOTE(review): no export statement is visible on this declaration, although
 * another module imports a default export from py_jsonl2tfrecord.mjs - confirm.
 * @param	{string}		filepath_source	Passed to the script as its --input argument.
 * @param	{string}		filepath_target	Passed to the script as its --output argument.
 * @param	{string|null}	filepath_meta	Optional. Where to write the SHAPES payload; null (the default) discards it.
 * @return	{AsyncGenerator<number>}	One integer per non-SHAPES line printed by the script.
 */
async function* py_jsonl2tfrecord(filepath_source, filepath_target, filepath_meta=null) {
// get stdin() { return this.child_process.stdin; }
// get stdout() { return this.child_process.stdout; }
// get stderr() { return this.child_process.stderr; }
// get stdin() { return this.converter.stdin; }
// get stdout() { return this.converter.stdout; }
// get stderr() { return this.converter.stderr; }
// Give the child a copy of our environment so that adding a key below does not modify process.env itself
const env = {}; Object.assign(env, process.env);
// The converter script only calls silence_tensorflow() when NO_SILENCE is unset, so setting it here leaves TensorFlow's own output enabled for the run that also collects metadata
if(filepath_meta !== null) env["NO_SILENCE"] = "NO_SILENCE";
child_process = child_process.spawn(
const converter = child_process.spawn(
"python3", [
path.join(__dirname, "json2tfrecord.py"),
"--input", filepath_source,
"--output", filepath_target
], { // TODO: detect binary - python3 vs python
// stdin is ignored, stdout is piped back to us, and stderr is inherited from the parent process
stdio: [ "ignore", "pipe", "inherit" ]
stdio: [ "ignore", "pipe", "inherit" ],
env
}
);
// converter.stdout.on("data", (chunk) => console.log(`DEBUG chunk`, chunk));
const reader = nexline({ input: child_process.stdout });
const reader = nexline({ input: new Readable().wrap(converter.stdout) });
for await(const line of reader) {
if(line.startsWith("SHAPE") && filepath_meta !== null ) {
await fs.promises.writeFile(
filepath_meta,
line.split(/\t+/)[1]
);
// Metadata line: the payload is whatever follows the tab(s). It is consumed here and never yielded, even when no metadata file was requested.
if(line.startsWith("SHAPES\t")) {
if(filepath_meta !== null) {
await fs.promises.writeFile(
filepath_meta,
line.split(/\t+/)[1]
);
}
continue;
}
// Any other line is expected to be an integer printed by the script
yield parseInt(line, 10);
}
}

View file

@ -2,6 +2,7 @@
import fs from 'fs';
import path from 'path';
import os from 'os';
import p_map from 'p-map';
import pretty_ms from 'pretty-ms';
@ -9,19 +10,28 @@ import debounce from '../async/debounce.mjs';
import py_jsonl2tfrecord from '../python/py_jsonl2tfrecord.mjs';
import log from '../../lib/io/NamespacedLog.mjs'; const l = log("jsonl2tf");
/**
* Converts a directory of .jsonl.gz files to .tfrecord.gz files.
* @param {string} dirpath_source The source directory to read from.
* @param {string} dirpath_target The target directory to write to.
* @return {Promise<void>}
*/
export default async function(dirpath_source, dirpath_target) {
const files = fs.promises.readdir(dirpath_source);
const files = await fs.promises.readdir(dirpath_source);
let time_start = new Date(), lines_processed = 0, files_complete = 0;
const update_progress = debounce(() => {
const update_progress_force = () => {
process.stdout.write(`${files_complete}/${lines_processed} files/lines complete | ${((new Date() - time_start) / lines_processed).toFixed(3)} lines/sec | ${((files_processed / files.length)*100).toFixed(2)}% complete\r`);
});
};
const update_progress = debounce(update_progress_force);
p_map(files, async (filename, i) => {
await p_map(files, async (filename, i) => {
const filepath_source = path.join(dirpath_source, filename);
const filepath_dest = path.join(dirpath_target, filename);
const filepath_dest = path.join(dirpath_target, filename.replace(/\.jsonl\.gz$/, ".tfrecord.gz"));
const filepath_meta = i === 0 ? path.join(dirpath_target, `metadata.json`) : null;
l.log(`start ${i} | ${filename} | META ${filepath_meta}`);
let time_start = new Date(), lines_done = 0;
for await (let line_number of py_jsonl2tfrecord(filepath_source, filepath_dest, filepath_meta)) {
lines_processed++;
@ -30,5 +40,7 @@ export default async function(dirpath_source, dirpath_target) {
}
files_complete++;
l.log(`converted ${filename}: ${lines_done} lines @ ${pretty_ms((new Date() - time_start) / lines_done)}`);
});
}, { concurrency: os.cpus().length });
update_progress_force();
l.log(`complete: ${lines_processed}/${files_complete} lines/files processed in ${pretty_ms(new Date() - time_start)}`);
}

View file

@ -0,0 +1,21 @@
"use strict";
import fs from 'fs';
import settings from "../../settings.mjs";
import jsonl_to_tf from '../../lib/record/jsonl_to_tf.mjs';
/**
 * Validates the source and target directories held in the settings object,
 * makes sure the target directory exists, and then hands both paths to
 * jsonl_to_tf().
 * @throws	{Error}	If settings.source or settings.target is not a string, or if the source directory does not exist / is not accessible.
 * @return	{Promise<void>}	Resolves once jsonl_to_tf() has finished.
 */
export default async function() {
	if(typeof settings.source !== "string")
		throw new Error(`Error: No source directory specified (see the --source CLI argument)`);
	if(typeof settings.target !== "string")
		throw new Error(`Error: No target directory specified (see the --target CLI argument)`);
	
	if(!fs.existsSync(settings.source))
		throw new Error(`Error: The source directory at '${settings.source}' doesn't exist or you haven't got permission to access it.`);
	
	// recursive: true creates any missing parent directories and is a no-op
	// when the directory already exists, so no separate (and racy) existence
	// check is needed beforehand.
	await fs.promises.mkdir(settings.target, { recursive: true });
	
	await jsonl_to_tf(settings.source, settings.target);
}

View file

@ -0,0 +1,7 @@
"use strict";
/**
 * Registers the "jsonl2tfrecord" subcommand, together with its "source" and
 * "target" string arguments, on the given CLI object.
 * @param	{Object}	cli	The CLI object to register on. It must provide subcommand(), whose return value must provide a chainable argument().
 * @return	{void}
 */
export default function(cli) {
	const subcommand = cli.subcommand(
		"jsonl2tfrecord",
		"Convert a directory of .jsonl.gz files to .tfrecord.gz files."
	);
	const after_source = subcommand.argument("source", "Path to the source directory.", null, "string");
	after_source.argument("target", "Path to the target directory.", null, "string");
}