diff --git a/rainfallwrangler/src/lib/record/records_recompress.mjs b/rainfallwrangler/src/lib/record/records_recompress.mjs
index 2a9f5e4..9ae2419 100644
--- a/rainfallwrangler/src/lib/record/records_recompress.mjs
+++ b/rainfallwrangler/src/lib/record/records_recompress.mjs
@@ -11,7 +11,9 @@ import gunzip from 'gunzip-maybe';
 import RecordsWriter from './RecordsWriter.mjs';
 
 async function records_recompress(dirpath_source, dirpath_target, items_per_file) {
-	const files = (await fs.promises.readdir(dirpath_source)).map(filename => path.join(dirpath_source, filename));
+	const files = (await fs.promises.readdir(dirpath_source))
+		.filter(filename => filename.endsWith(`.jsonl.gz`))
+		.map(filename => path.join(dirpath_source, filename));
 	
 	const reader = nexline({
 		input: files.map(filepath => new Readable().wrap(fs.createReadStream(filepath).pipe(gunzip())))
@@ -39,7 +41,7 @@ async function records_recompress(dirpath_source, dirpath_target, items_per_file
 		
 		if(new Date() - time_display > 500) {
 			const elapsed = new Date() - time_start;
-			process.stdout.write(`${pretty_ms(elapsed, { keepDecimalsOnWholeSeconds: true })} elapsed | ${i_file}/${i_this_file}/${i} files/thisfile/total | ${(1000 / (elapsed / i)).toFixed(2)} lines/sec | ${items_per_file - i_this_file} left for this file \r`);
+			process.stderr.write(`${pretty_ms(elapsed, { keepDecimalsOnWholeSeconds: true })} elapsed | ${i_file}/${i_this_file}/${i} files/thisfile/total | ${(1000 / (elapsed / i)).toFixed(2)} lines/sec | ${items_per_file - i_this_file} left for this file \r`);
 			time_display = new Date();
 		}
 	}
diff --git a/rainfallwrangler/src/subcommands/recompress/meta.mjs b/rainfallwrangler/src/subcommands/recompress/meta.mjs
new file mode 100644
index 0000000..4a6aca8
--- /dev/null
+++ b/rainfallwrangler/src/subcommands/recompress/meta.mjs
@@ -0,0 +1,8 @@
+"use strict";
+
+export default function(cli) {
+	cli.subcommand("recompress", "Recompress a source to a target directory with a given number of records per file.")
+		.argument("source", "Path to the source directory.", null, "string")
+		.argument("target", "Path to the target directory.", null, "string")
+		.argument("count-file", "The number of records to store in each record file. See the documentation for the optimal value of this number (default: 4096).", 64*64, "integer");
+}
diff --git a/rainfallwrangler/src/subcommands/recompress/recompress.mjs b/rainfallwrangler/src/subcommands/recompress/recompress.mjs
new file mode 100644
index 0000000..34333cb
--- /dev/null
+++ b/rainfallwrangler/src/subcommands/recompress/recompress.mjs
@@ -0,0 +1,21 @@
+"use strict";
+
+import fs from 'fs';
+
+import settings from "../../settings.mjs";
+
+import records_recompress from '../../lib/record/records_recompress.mjs';
+
+export default async function() {
+	if(typeof settings.source !== "string")
+		throw new Error(`Error: No source directory specified (see the --source CLI argument)`);
+	if(typeof settings.target !== "string")
+		throw new Error(`Error: No target directory specified (see the --target CLI argument)`);
+	
+	if(!fs.existsSync(settings.source))
+		throw new Error(`Error: The source directory at '${settings.source}' doesn't exist or you haven't got permission to access it.`);
+	if(!fs.existsSync(settings.target))
+		await fs.promises.mkdir(settings.target);
+	
+	await records_recompress(settings.source, settings.target, settings.count_file);
+}
\ No newline at end of file
diff --git a/rainfallwrangler/src/subcommands/recordify/meta.mjs b/rainfallwrangler/src/subcommands/recordify/meta.mjs
index 8d428ed..ea39200 100644
--- a/rainfallwrangler/src/subcommands/recordify/meta.mjs
+++ b/rainfallwrangler/src/subcommands/recordify/meta.mjs
@@ -4,7 +4,7 @@ export default function(cli) {
 	cli.subcommand("recordify", "Converts rainfall radar and water depth data to a directory of .jsonl.gz files.")
 		.argument("water", "Path to the water depths file, formatted as a stream of terrain50 objects. May or may not be gzipped.", null, "string")
 		.argument("rainfall", "Path to the rainfall radar data, formatted as jsonl. May or may not be gzipped.", null, "string")
-		.argument("count-file", "The number of records to store in each record file. See the documentation for the optimal value of this number (default: 4096).", 64*64)
+		.argument("count-file", "The number of records to store in each record file. See the documentation for the optimal value of this number (default: 4096).", 64*64, "integer")
 		.argument("rainfall-pattern", "The pattern of the number of time steps to average, as a comma-separated list of numbers. Given a point in time, each successive number specified works BACKWARDS from that point. For example, 1,4,10 would be 3 channels: 1 time step on it's own, then average the next 4 time steps, then average the next 10 steps (default: 1,3,3,5,12,24,48).", [1,3,3,5,12,24,48].reverse(), function(value) {
 			return value.split(",")
 				.map(el => parseInt(el))
diff --git a/rainfallwrangler/src/subcommands/uniq/meta.mjs b/rainfallwrangler/src/subcommands/uniq/meta.mjs
index 3468203..00a8c70 100644
--- a/rainfallwrangler/src/subcommands/uniq/meta.mjs
+++ b/rainfallwrangler/src/subcommands/uniq/meta.mjs
@@ -4,5 +4,5 @@ export default function(cli) {
 	cli.subcommand("uniq", "Deduplicates the entries in a directory of recordified files.")
 		.argument("source", "Path to the source directory.", null, "string")
 		.argument("target", "Path to the target directory.", null, "string")
-		.argument("count-file", "The number of records to store in each record file. See the documentation for the optimal value of this number (default: 4096).", 64*64);
+		.argument("count-file", "The number of records to store in each record file. See the documentation for the optimal value of this number (default: 4096).", 64*64, "integer");
 }
diff --git a/rainfallwrangler/src/subcommands/uniq/uniq.mjs b/rainfallwrangler/src/subcommands/uniq/uniq.mjs
index 27cbac7..2b842b7 100644
--- a/rainfallwrangler/src/subcommands/uniq/uniq.mjs
+++ b/rainfallwrangler/src/subcommands/uniq/uniq.mjs
@@ -15,5 +15,5 @@ export default async function() {
 	const uniq_manager = new RecordUniqManager(settings.count_file);
 	await uniq_manager.deduplicate(settings.source, settings.target);
 	
-	uniq_manager.close(); // Terminate the workerpool nowt hat we're done
+	uniq_manager.close(); // Terminate the workerpool now that we're done
 }
\ No newline at end of file
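
For context on the reading pipeline that the first hunk above modifies: `records_recompress` wraps every matching `.jsonl.gz` file in a gunzip stream and hands the whole set to `nexline`, which yields one line (one JSON record) at a time across all files in order. Below is a minimal, self-contained sketch of that pattern, assuming only the `nexline` and `gunzip-maybe` packages already used in the diff; the file paths and the record-handling step are placeholders, not code from the rainfallwrangler repository.

```javascript
"use strict";

import fs from 'fs';
import { Readable } from 'stream';

import nexline from 'nexline';
import gunzip from 'gunzip-maybe';

// Hypothetical input files: placeholders, not paths from the real dataset.
const files = ['records/00000.jsonl.gz', 'records/00001.jsonl.gz'];

// nexline accepts an array of readable streams and reads them in order;
// gunzip-maybe transparently decompresses gzipped input and passes plain text through.
const reader = nexline({
	input: files.map(filepath => new Readable().wrap(
		fs.createReadStream(filepath).pipe(gunzip())
	)),
});

// Read until nexline returns null, which signals the end of the last stream.
let line;
while((line = await reader.next()) !== null) {
	if(line.trim().length === 0) continue; // skip blank lines between records
	const record = JSON.parse(line); // each line is one JSON record (JSONL)
	// ...hand the record to a RecordsWriter or other consumer here...
}
```

Switching the progress line from `process.stdout` to `process.stderr` (as the first hunk does) keeps this per-line status output separate from stdout, presumably so that stdout remains free for piping actual data.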