Add new recompress subcommand

also fix typos, CLI definitions
This commit is contained in:
Starbeamrainbowlabs 2022-07-25 17:54:23 +01:00
parent d9b9a4f9fc
commit 3332fa598a
Signed by: sbrl
GPG key ID: 1BE5172E637709C2
6 changed files with 36 additions and 5 deletions

View file

@ -11,7 +11,9 @@ import gunzip from 'gunzip-maybe';
import RecordsWriter from './RecordsWriter.mjs';
async function records_recompress(dirpath_source, dirpath_target, items_per_file) {
const files = (await fs.promises.readdir(dirpath_source)).map(filename => path.join(dirpath_source, filename));
const files = (await fs.promises.readdir(dirpath_source))
.filter(filename => filename.endsWith(`.jsonl.gz`))
.map(filename => path.join(dirpath_source, filename));
const reader = nexline({
input: files.map(filepath => new Readable().wrap(fs.createReadStream(filepath).pipe(gunzip())))
@ -39,7 +41,7 @@ async function records_recompress(dirpath_source, dirpath_target, items_per_file
if(new Date() - time_display > 500) {
const elapsed = new Date() - time_start;
process.stdout.write(`${pretty_ms(elapsed, { keepDecimalsOnWholeSeconds: true })} elapsed | ${i_file}/${i_this_file}/${i} files/thisfile/total | ${(1000 / (elapsed / i)).toFixed(2)} lines/sec | ${items_per_file - i_this_file} left for this file \r`);
process.stderr.write(`${pretty_ms(elapsed, { keepDecimalsOnWholeSeconds: true })} elapsed | ${i_file}/${i_this_file}/${i} files/thisfile/total | ${(1000 / (elapsed / i)).toFixed(2)} lines/sec | ${items_per_file - i_this_file} left for this file \r`);
time_display = new Date();
}
}

View file

@ -0,0 +1,8 @@
"use strict";
export default function(cli) {
cli.subcommand("recompress", "Recompress a source to a target directory with a given number of records per file.")
.argument("source", "Path to the source directory.", null, "string")
.argument("target", "Path to the target directory.", null, "string")
.argument("count-file", "The number of records to store in each record file. See the documentation for the optimal value of this number (default: 4096).", 64*64, "integer");
}

View file

@ -0,0 +1,21 @@
"use strict";
import fs from 'fs';
import settings from "../../settings.mjs";
import records_recompress from '../../lib/record/records_recompress.mjs';
export default async function() {
if(typeof settings.source !== "string")
throw new Error(`Error: No source directory specified (see the --source CLI argument)`);
if(typeof settings.target !== "string")
throw new Error(`Error: No target directory specified (see the --target CLI argument)`);
if(!fs.existsSync(settings.source))
throw new Error(`Error: The source directory at '${settings.source}' doesn't exist or you haven't got permission to access it.`);
if(!fs.existsSync(settings.target))
await fs.promises.mkdir(settings.target);
await records_recompress(settings.source, settings.target);
}

View file

@ -4,7 +4,7 @@ export default function(cli) {
cli.subcommand("recordify", "Converts rainfall radar and water depth data to a directory of .jsonl.gz files.")
.argument("water", "Path to the water depths file, formatted as a stream of terrain50 objects. May or may not be gzipped.", null, "string")
.argument("rainfall", "Path to the rainfall radar data, formatted as jsonl. May or may not be gzipped.", null, "string")
.argument("count-file", "The number of records to store in each record file. See the documentation for the optimal value of this number (default: 4096).", 64*64)
.argument("count-file", "The number of records to store in each record file. See the documentation for the optimal value of this number (default: 4096).", 64*64, "integer")
.argument("rainfall-pattern", "The pattern of the number of time steps to average, as a comma-separated list of numbers. Given a point in time, each successive number specified works BACKWARDS from that point. For example, 1,4,10 would be 3 channels: 1 time step on it's own, then average the next 4 time steps, then average the next 10 steps (default: 1,3,3,5,12,24,48).", [1,3,3,5,12,24,48].reverse(), function(value) {
return value.split(",")
.map(el => parseInt(el))

View file

@ -4,5 +4,5 @@ export default function(cli) {
cli.subcommand("uniq", "Deduplicates the entries in a directory of recordified files.")
.argument("source", "Path to the source directory.", null, "string")
.argument("target", "Path to the target directory.", null, "string")
.argument("count-file", "The number of records to store in each record file. See the documentation for the optimal value of this number (default: 4096).", 64*64);
.argument("count-file", "The number of records to store in each record file. See the documentation for the optimal value of this number (default: 4096).", 64*64, "integer");
}

View file

@ -15,5 +15,5 @@ export default async function() {
const uniq_manager = new RecordUniqManager(settings.count_file);
await uniq_manager.deduplicate(settings.source, settings.target);
uniq_manager.close(); // Terminate the workerpool nowt hat we're done
uniq_manager.close(); // Terminate the workerpool now that we're done
}