diff --git a/rainfallwrangler/package-lock.json b/rainfallwrangler/package-lock.json index c267806..852b2d0 100644 --- a/rainfallwrangler/package-lock.json +++ b/rainfallwrangler/package-lock.json @@ -12,9 +12,13 @@ "@tensorflow/tfjs-node-gpu": "^3.18.0", "applause-cli": "^1.8.1", "gunzip-maybe": "^1.4.2", + "nexline": "^1.2.2", + "p-map": "^5.5.0", + "p-reflect": "^3.0.0", "pretty-ms": "^8.0.0", "spawn-stream": "^1.0.2", - "terrain50": "^1.10.1" + "terrain50": "^1.10.1", + "workerpool": "^6.2.1" } }, "node_modules/@mapbox/node-pre-gyp": { @@ -367,6 +371,21 @@ "node": ">= 4.0.0" } }, + "node_modules/aggregate-error": { + "version": "4.0.1", + "resolved": "https://registry.npmjs.org/aggregate-error/-/aggregate-error-4.0.1.tgz", + "integrity": "sha512-0poP0T7el6Vq3rstR8Mn4V/IQrpBLO6POkUSrN7RhyY+GF/InCFShQzsQ39T25gkHhLgSLByyAz+Kjb+c2L98w==", + "dependencies": { + "clean-stack": "^4.0.0", + "indent-string": "^5.0.0" + }, + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/ansi-regex": { "version": "2.1.1", "resolved": "https://registry.npmjs.org/ansi-regex/-/ansi-regex-2.1.1.tgz", @@ -468,6 +487,20 @@ "resolved": "https://registry.npmjs.org/chownr/-/chownr-1.1.4.tgz", "integrity": "sha512-jJ0bqzaylmJtVnNgzTeSOs8DPavpbYgEr/b0YL8/2GO3xJEhInFmhKMUnEJQjZumK7KXGFhUy89PrsJWlakBVg==" }, + "node_modules/clean-stack": { + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/clean-stack/-/clean-stack-4.2.0.tgz", + "integrity": "sha512-LYv6XPxoyODi36Dp976riBtSY27VmFo+MKqEU9QCCWyTrdEPDog+RWA7xQWHi6Vbp61j5c4cdzzX1NidnwtUWg==", + "dependencies": { + "escape-string-regexp": "5.0.0" + }, + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/cliui": { "version": "7.0.4", "resolved": "https://registry.npmjs.org/cliui/-/cliui-7.0.4.tgz", @@ -655,6 +688,17 @@ "node": ">=6" } }, + "node_modules/escape-string-regexp": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/escape-string-regexp/-/escape-string-regexp-5.0.0.tgz", + "integrity": "sha512-/veY75JbMK4j1yjvuUxuVsiS/hr/4iHs9FTT6cgTexxdE0Ly/glccBAkloH/DofkjRbZU3bnoj38mOmhkZ0lHw==", + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/form-data": { "version": "3.0.1", "resolved": "https://registry.npmjs.org/form-data/-/form-data-3.0.1.tgz", @@ -798,6 +842,17 @@ "node": ">=0.10.0" } }, + "node_modules/indent-string": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/indent-string/-/indent-string-5.0.0.tgz", + "integrity": "sha512-m6FAo/spmsW2Ab2fU35JTYwtOKa2yAwXSwgjSv1TJzh4Mh7mC3lzAOVLBprb72XsTrgkEIsl7YrFNAiDiRhIGg==", + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/inflight": { "version": "1.0.6", "resolved": "https://registry.npmjs.org/inflight/-/inflight-1.0.6.tgz", @@ -1053,6 +1108,31 @@ "wrappy": "1" } }, + "node_modules/p-map": { + "version": "5.5.0", + "resolved": "https://registry.npmjs.org/p-map/-/p-map-5.5.0.tgz", + "integrity": "sha512-VFqfGDHlx87K66yZrNdI4YGtD70IRyd+zSvgks6mzHPRNkoKy+9EKP4SFC77/vTTQYmRmti7dvqC+m5jBrBAcg==", + "dependencies": { + "aggregate-error": "^4.0.0" + }, + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, + "node_modules/p-reflect": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/p-reflect/-/p-reflect-3.0.0.tgz", + 
"integrity": "sha512-rOgYyrvUxnJdSYKGSK7UnO7RxFSnT/IJYFPiosuQ2/AtRWIryIrv8lecWqJXWbKnMcUjJvxiHDMp80m0Yj4eLA==", + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/pako": { "version": "0.2.9", "resolved": "https://registry.npmjs.org/pako/-/pako-0.2.9.tgz", @@ -1382,6 +1462,11 @@ "resolved": "https://registry.npmjs.org/wkt-parser/-/wkt-parser-1.3.2.tgz", "integrity": "sha512-A26BOOo7sHAagyxG7iuRhnKMO7Q3mEOiOT4oGUmohtN/Li5wameeU4S6f8vWw6NADTVKljBs8bzA8JPQgSEMVQ==" }, + "node_modules/workerpool": { + "version": "6.2.1", + "resolved": "https://registry.npmjs.org/workerpool/-/workerpool-6.2.1.tgz", + "integrity": "sha512-ILEIE97kDZvF9Wb9f6h5aXK4swSlKGUcOEGiIYb2OOu/IrDU9iwj0fD//SsA6E5ibwJxpEvhullJY4Sl4GcpAw==" + }, "node_modules/wrap-ansi": { "version": "7.0.0", "resolved": "https://registry.npmjs.org/wrap-ansi/-/wrap-ansi-7.0.0.tgz", @@ -1809,6 +1894,15 @@ "es6-promisify": "^5.0.0" } }, + "aggregate-error": { + "version": "4.0.1", + "resolved": "https://registry.npmjs.org/aggregate-error/-/aggregate-error-4.0.1.tgz", + "integrity": "sha512-0poP0T7el6Vq3rstR8Mn4V/IQrpBLO6POkUSrN7RhyY+GF/InCFShQzsQ39T25gkHhLgSLByyAz+Kjb+c2L98w==", + "requires": { + "clean-stack": "^4.0.0", + "indent-string": "^5.0.0" + } + }, "ansi-regex": { "version": "2.1.1", "resolved": "https://registry.npmjs.org/ansi-regex/-/ansi-regex-2.1.1.tgz", @@ -1895,6 +1989,14 @@ "resolved": "https://registry.npmjs.org/chownr/-/chownr-1.1.4.tgz", "integrity": "sha512-jJ0bqzaylmJtVnNgzTeSOs8DPavpbYgEr/b0YL8/2GO3xJEhInFmhKMUnEJQjZumK7KXGFhUy89PrsJWlakBVg==" }, + "clean-stack": { + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/clean-stack/-/clean-stack-4.2.0.tgz", + "integrity": "sha512-LYv6XPxoyODi36Dp976riBtSY27VmFo+MKqEU9QCCWyTrdEPDog+RWA7xQWHi6Vbp61j5c4cdzzX1NidnwtUWg==", + "requires": { + "escape-string-regexp": "5.0.0" + } + }, "cliui": { "version": "7.0.4", "resolved": "https://registry.npmjs.org/cliui/-/cliui-7.0.4.tgz", @@ -2046,6 +2148,11 @@ "resolved": "https://registry.npmjs.org/escalade/-/escalade-3.1.1.tgz", "integrity": "sha512-k0er2gUkLf8O0zKJiAhmkTnJlTvINGv7ygDNPbeIsX/TJjGJZHuh9B2UxbsaEkmlEo9MfhrSzmhIlhRlI2GXnw==" }, + "escape-string-regexp": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/escape-string-regexp/-/escape-string-regexp-5.0.0.tgz", + "integrity": "sha512-/veY75JbMK4j1yjvuUxuVsiS/hr/4iHs9FTT6cgTexxdE0Ly/glccBAkloH/DofkjRbZU3bnoj38mOmhkZ0lHw==" + }, "form-data": { "version": "3.0.1", "resolved": "https://registry.npmjs.org/form-data/-/form-data-3.0.1.tgz", @@ -2162,6 +2269,11 @@ "safer-buffer": ">= 2.1.2 < 3" } }, + "indent-string": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/indent-string/-/indent-string-5.0.0.tgz", + "integrity": "sha512-m6FAo/spmsW2Ab2fU35JTYwtOKa2yAwXSwgjSv1TJzh4Mh7mC3lzAOVLBprb72XsTrgkEIsl7YrFNAiDiRhIGg==" + }, "inflight": { "version": "1.0.6", "resolved": "https://registry.npmjs.org/inflight/-/inflight-1.0.6.tgz", @@ -2365,6 +2477,19 @@ "wrappy": "1" } }, + "p-map": { + "version": "5.5.0", + "resolved": "https://registry.npmjs.org/p-map/-/p-map-5.5.0.tgz", + "integrity": "sha512-VFqfGDHlx87K66yZrNdI4YGtD70IRyd+zSvgks6mzHPRNkoKy+9EKP4SFC77/vTTQYmRmti7dvqC+m5jBrBAcg==", + "requires": { + "aggregate-error": "^4.0.0" + } + }, + "p-reflect": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/p-reflect/-/p-reflect-3.0.0.tgz", + "integrity": "sha512-rOgYyrvUxnJdSYKGSK7UnO7RxFSnT/IJYFPiosuQ2/AtRWIryIrv8lecWqJXWbKnMcUjJvxiHDMp80m0Yj4eLA==" + }, "pako": 
{ "version": "0.2.9", "resolved": "https://registry.npmjs.org/pako/-/pako-0.2.9.tgz", @@ -2637,6 +2762,11 @@ "resolved": "https://registry.npmjs.org/wkt-parser/-/wkt-parser-1.3.2.tgz", "integrity": "sha512-A26BOOo7sHAagyxG7iuRhnKMO7Q3mEOiOT4oGUmohtN/Li5wameeU4S6f8vWw6NADTVKljBs8bzA8JPQgSEMVQ==" }, + "workerpool": { + "version": "6.2.1", + "resolved": "https://registry.npmjs.org/workerpool/-/workerpool-6.2.1.tgz", + "integrity": "sha512-ILEIE97kDZvF9Wb9f6h5aXK4swSlKGUcOEGiIYb2OOu/IrDU9iwj0fD//SsA6E5ibwJxpEvhullJY4Sl4GcpAw==" + }, "wrap-ansi": { "version": "7.0.0", "resolved": "https://registry.npmjs.org/wrap-ansi/-/wrap-ansi-7.0.0.tgz", diff --git a/rainfallwrangler/package.json b/rainfallwrangler/package.json index eff478e..0a5964a 100644 --- a/rainfallwrangler/package.json +++ b/rainfallwrangler/package.json @@ -16,8 +16,12 @@ "@tensorflow/tfjs-node-gpu": "^3.18.0", "applause-cli": "^1.8.1", "gunzip-maybe": "^1.4.2", + "nexline": "^1.2.2", + "p-map": "^5.5.0", + "p-reflect": "^3.0.0", "pretty-ms": "^8.0.0", "spawn-stream": "^1.0.2", - "terrain50": "^1.10.1" + "terrain50": "^1.10.1", + "workerpool": "^6.2.1" } } diff --git a/rainfallwrangler/src/lib/record/RecordUniqManager.mjs b/rainfallwrangler/src/lib/record/RecordUniqManager.mjs new file mode 100644 index 0000000..f75334d --- /dev/null +++ b/rainfallwrangler/src/lib/record/RecordUniqManager.mjs @@ -0,0 +1,135 @@ +"use strict"; + +import fs from 'fs'; +import path from 'path'; +import os from 'os'; + +import workerpool from 'workerpool'; +import p_map from 'p-map'; +import p_reflect from 'p-reflect'; +import pretty_ms from 'pretty-ms'; + +import log from '../io/NamespacedLog.mjs'; const l = log("recorduniq:manager"); +import records_recompress from './records_recompress.mjs'; + + +const __dirname = import.meta.url.slice(7, import.meta.url.lastIndexOf("/")); + +class RecordUniqManager { + constructor(items_per_file) { + this.items_per_file = items_per_file; + + this.worker_count = os.cpus().length; + this.pool = workerpool.pool(path.join(__dirname, "record_uniq_worker/worker.mjs"), { + maxQueueSize: 100, + maxWorkers: this.worker_count + }); + + this.hashes = new Map(); + this.init_complete = false; + } + + async init() { + if(this.init_complete) return; + + this.proxy = await this.pool.proxy(); + + this.init_complete = true; + } + + async deduplicate(dirpath_source, dirpath_target) { + const time_start = new Date(); + this.hashes.clear(); + + const files = (await fs.promises.readdir(dirpath_source)).map(filename => path.join(dirpath_source, filename)); + + l.log(`STEP [1 / 5]: Hashing files`); + await p_map(files, this.#do_single_hash.bind(this), { concurrency: this.worker_count + 10 }); + l.log(`STEP [1 / 5]: ${this.hashes.size} hashes gathered in total.`); + + l.log(`STEP [ 2 / 5 ]: Identify duplicates`); + const dupes = this.find_duplicates(); + this.hashes.clear(); // Save memory + l.log(`STEP [ 2 / 5 ]: ${dupes.length} duplicate groups identified`); + + l.log(`STEP [ 3 / 5 ]: Assemble deletion lists`); + const deletion_lists = this.assemble_deletion_lists(dupes); + l.log(`STEP [ 3 / 5 ]: ${deletion_lists.values().reduce((acc, next) => next.length + acc, 0)} duplicates to be deleted.`); + + l.log(`STEP [ 4 / 5 ]: Delete duplicates`); + await p_map( + deletion_lists.entries(), + async (args) => await this.#do_single_delete(...args), + { concurrency: this.worker_count + 10 } + ); + l.log(`STEP [ 4 / 5 ]: Duplicates deleted.`); + + l.log(`STEP [ 5 / 5 ]: Recompress files`); + const { recompress_lines, recompress_files } = await 
records_recompress( + dirpath_source, dirpath_target ?? this.#adjacent_dir(dirpath_source), + this.items_per_file + ); + l.log(`STEP [ 5 / 5 ]: Complete with ${recompress_files} files ${recompress_lines} lines at final count.`); + l.log(`Done in ${pretty_ms(new Date() - time_start)}, thank you :D`); + } + + #adjacent_dir(dir, tag="deduped") { + const dirname = path.dirname(dir); + const basename = path.basename(dir); + return path.join(dirname, `${basename}-${tag}`); + } + + find_duplicates() { + const result = []; + const seen = new Set(); // Don't put the same record into more than one duplicate group + for(const [ id, hash ] of this.hashes.entries()) { + if(seen.has(id)) continue; + const dupes_group = [ { id, hash } ]; + for(const [ id_inner, hash_inner ] of this.hashes.entries()) { + if(id === id_inner || seen.has(id_inner)) continue; + if(hash === hash_inner) { + dupes_group.push( { id: id_inner, hash: hash_inner }); + seen.add(id_inner); + } + } + if(dupes_group.length > 1) { + seen.add(id); + result.push(dupes_group); + } + } + + return result; + } + + assemble_deletion_lists(dupe_groups) { + const result = new Map(); + for(const dupe_group of dupe_groups) { + for(const dupe of dupe_group.slice(1)) { // Keep the first one + const [ filename, i ] = dupe.id.split(`|`, 2); + if(!result.has(filename)) result.set(filename, []); + result.get(filename).push(parseInt(i, 10)); // The split yields a string, but the worker compares line numbers + } + } + return result; + } + + async #do_single_hash(filepath) { + if(filepath.includes("|")) throw new Error(`Filepath contains bar character: ${filepath}`); + const filename = path.basename(filepath); + + l.log(`Hashing ${filename}`); + const result = await p_reflect(this.proxy.hash_targets(filepath)); + if(result.isRejected) { + l.warn(`Got error from worker when hashing ${filename}:`, result.reason); + return; + } + + for(const { i, hash } of result.value) { + this.hashes.set(`${filepath}|${i}`, hash); + } + } + + async #do_single_delete(filename, deletion_list) { + const result = await p_reflect(this.proxy.delete_duplicates(filename, deletion_list)); + if(result.isRejected) { + l.warn(`Got error from worker when deleting ${deletion_list.length} entries from ${filename}:`, result.reason); + return; + } + } +} + +export default RecordUniqManager; \ No newline at end of file diff --git a/rainfallwrangler/src/lib/record/RecordsWriter.mjs b/rainfallwrangler/src/lib/record/RecordsWriter.mjs index 74c003b..6e2b311 100644 --- a/rainfallwrangler/src/lib/record/RecordsWriter.mjs +++ b/rainfallwrangler/src/lib/record/RecordsWriter.mjs @@ -15,11 +15,30 @@ class RecordsWriter { this.#gzip.pipe(this.#stream_out); } + /** + * Writes a sample to the file, followed by a new line \n character. + * @param {Map} sample The sample to write. + * @return {Promise} + */ async write(sample) { const str = JSON.stringify(Object.fromEntries(sample)); await write_safe(this.#gzip, str+"\n"); } + /** + * Writes a raw value to the file, followed by a new line \n character. + * @param {string} line The line to write. + * @return {Promise} + */ + async write_raw(line) { + await write_safe(this.#gzip, line+"\n"); + } + + /** + * Closes the underlying file gracefully. + * Nothing more may be written to the file after this method is called.
+ * @return {Promise} + */ async close() { await end_safe(this.#gzip); await end_safe(this.#stream_out); diff --git a/rainfallwrangler/src/lib/record/record_uniq_worker/delete_duplicates.mjs b/rainfallwrangler/src/lib/record/record_uniq_worker/delete_duplicates.mjs new file mode 100644 index 0000000..002dc2e --- /dev/null +++ b/rainfallwrangler/src/lib/record/record_uniq_worker/delete_duplicates.mjs @@ -0,0 +1,24 @@ +"use strict"; + +import fs from 'fs'; + +import records_read from "../records_read.mjs"; +import RecordsWriter from '../RecordsWriter.mjs'; + +import log from '../../io/NamespacedLog.mjs'; const l = log("recorduniq:worker"); + +// This is I/O bound, so it could be muxed together in the main thread rather than run in a worker like this +export default async function(filepath, lines) { + const result = []; + + // Write to a temporary file and swap it into place afterwards, so we don't truncate the very file we're still reading from + const filepath_tmp = `${filepath}.tmp`; + let i = -1, writer = new RecordsWriter(filepath_tmp); + for await(const line of records_read(filepath)) { + i++; + if(line === "" || lines.includes(i)) continue; + + await writer.write_raw(line); + } + await writer.close(); + await fs.promises.rename(filepath_tmp, filepath); + + return result; +} diff --git a/rainfallwrangler/src/lib/record/record_uniq_worker/hash_targets.mjs b/rainfallwrangler/src/lib/record/record_uniq_worker/hash_targets.mjs new file mode 100644 index 0000000..0835e84 --- /dev/null +++ b/rainfallwrangler/src/lib/record/record_uniq_worker/hash_targets.mjs @@ -0,0 +1,22 @@ +"use strict"; + +import crypto from 'crypto'; + +import records_read from "../records_read.mjs"; + +import log from '../../io/NamespacedLog.mjs'; const l = log("recorduniq:worker"); + +export default async function(filename) { + const result = []; + + let i = -1; + for await(const line of records_read(filename)) { + i++; + if(line === "") continue; + + // Ref https://stackoverflow.com/a/58307338/1460422 + result.push({ i, hash: crypto.createHash("sha256").update(line, "binary").digest("base64") }); + } + + return result; +} \ No newline at end of file diff --git a/rainfallwrangler/src/lib/record/record_uniq_worker/worker.mjs b/rainfallwrangler/src/lib/record/record_uniq_worker/worker.mjs new file mode 100644 index 0000000..98e8ef5 --- /dev/null +++ b/rainfallwrangler/src/lib/record/record_uniq_worker/worker.mjs @@ -0,0 +1,11 @@ +"use strict"; + +import workerpool from 'workerpool'; + +import hash_targets from './hash_targets.mjs'; +import delete_duplicates from './delete_duplicates.mjs'; + +workerpool.worker({ + hash_targets, + delete_duplicates +}); diff --git a/rainfallwrangler/src/lib/record/records_read.mjs b/rainfallwrangler/src/lib/record/records_read.mjs new file mode 100644 index 0000000..2ec37e1 --- /dev/null +++ b/rainfallwrangler/src/lib/record/records_read.mjs @@ -0,0 +1,19 @@ +"use strict"; + +import fs from 'fs'; + +import nexline from 'nexline'; +import gunzip from 'gunzip-maybe'; + +/** + * Reads the records from a (potentially gzipped) .jsonl / .jsonl.gz file. + * @param {string} filename The filename to read from. + * @return {AsyncGenerator} An asynchronous generator that iteratively returns the lines in the file.
+ */ +function records_read(filename) { + return nexline({ + input: fs.createReadStream(filename).pipe(gunzip()) + }); +} + +export default records_read; \ No newline at end of file diff --git a/rainfallwrangler/src/lib/record/records_recompress.mjs b/rainfallwrangler/src/lib/record/records_recompress.mjs new file mode 100644 index 0000000..fa28106 --- /dev/null +++ b/rainfallwrangler/src/lib/record/records_recompress.mjs @@ -0,0 +1,48 @@ +"use strict"; + +import fs from 'fs'; +import path from 'path'; + +import nexline from 'nexline'; +import gunzip from 'gunzip-maybe'; +import pretty_ms from 'pretty-ms'; + +import RecordsWriter from './RecordsWriter.mjs'; + +async function records_recompress(dirpath_source, dirpath_target, items_per_file) { + const files = (await fs.promises.readdir(dirpath_source)).map(filename => path.join(dirpath_source, filename)); + + const reader = nexline({ + input: files.map(filepath => fs.createReadStream(filepath).pipe(gunzip())) + }); + + + if(!fs.existsSync(dirpath_target)) + await fs.promises.mkdir(dirpath_target, { recursive: true }); + + let writer = null, i = 0, i_file = 0, i_this_file = 0; + let time_start = new Date(), time_display = time_start; + for await(const line of reader) { + if(line === "") continue; + + if(writer === null || i_this_file >= items_per_file) { + if(writer !== null) await writer.close(); + writer = new RecordsWriter(path.join(dirpath_target, `${i_file}.jsonl.gz`)); + i_file++; i_this_file = 0; + } + + await writer.write_raw(line.trim()); + + i++; + i_this_file++; + + if(new Date() - time_display > 2000) { + time_display = new Date(); + const elapsed = time_display - time_start; + process.stdout.write(`${pretty_ms(elapsed, { keepDecimalsOnWholeSeconds: true })} elapsed | ${i_file}/${i_this_file}/${i} files/this file/total | ${(1000 / (elapsed / i)).toFixed(2)} lines/sec | ${items_per_file - i_this_file} left for this file\r`); + } + } + if(writer !== null) await writer.close(); + + return { recompress_lines: i, recompress_files: i_file }; +} + +export default records_recompress; \ No newline at end of file diff --git a/rainfallwrangler/src/subcommands/recordify/meta.mjs b/rainfallwrangler/src/subcommands/recordify/meta.mjs index eea97db..8d428ed 100644 --- a/rainfallwrangler/src/subcommands/recordify/meta.mjs +++ b/rainfallwrangler/src/subcommands/recordify/meta.mjs @@ -4,7 +4,7 @@ export default function(cli) { cli.subcommand("recordify", "Converts rainfall radar and water depth data to a directory of .jsonl.gz files.") .argument("water", "Path to the water depths file, formatted as a stream of terrain50 objects. May or may not be gzipped.", null, "string") .argument("rainfall", "Path to the rainfall radar data, formatted as jsonl. May or may not be gzipped.", null, "string") - .argument("count-file", "The number of records to store in each TFRecord file. See the documentation for the optimal value of this number (default: 4096).", 64*64) + .argument("count-file", "The number of records to store in each record file. See the documentation for the optimal value of this number (default: 4096).", 64*64) .argument("rainfall-pattern", "The pattern of the number of time steps to average, as a comma-separated list of numbers. Given a point in time, each successive number specified works BACKWARDS from that point.
For example, 1,4,10 would be 3 channels: 1 time step on it's own, then average the next 4 time steps, then average the next 10 steps (default: 1,3,3,5,12,24,48).", [1,3,3,5,12,24,48].reverse(), function(value) { return value.split(",") .map(el => parseInt(el)) diff --git a/rainfallwrangler/src/subcommands/uniq/meta.mjs b/rainfallwrangler/src/subcommands/uniq/meta.mjs new file mode 100644 index 0000000..3468203 --- /dev/null +++ b/rainfallwrangler/src/subcommands/uniq/meta.mjs @@ -0,0 +1,8 @@ +"use strict"; + +export default function(cli) { + cli.subcommand("uniq", "Deduplicates the entries in a directory of recordified files.") + .argument("source", "Path to the source directory.", null, "string") + .argument("target", "Path to the target directory.", null, "string") + .argument("count-file", "The number of records to store in each record file. See the documentation for the optimal value of this number (default: 4096).", 64*64); +} diff --git a/rainfallwrangler/src/subcommands/uniq/uniq.mjs b/rainfallwrangler/src/subcommands/uniq/uniq.mjs new file mode 100644 index 0000000..53ed1ed --- /dev/null +++ b/rainfallwrangler/src/subcommands/uniq/uniq.mjs @@ -0,0 +1,18 @@ +"use strict"; + +import fs from 'fs'; + +import settings from "../../settings.mjs"; + +import RecordUniqManager from '../../lib/record/RecordUniqManager.mjs'; + +export default async function() { + if(typeof settings.cli.source !== "string") + throw new Error(`Error: No source directory specified (see the --source CLI argument)`); + if(!fs.existsSync(settings.cli.source)) + throw new Error(`Error: The source directory at '${settings.cli.source}' doesn't exist or you haven't got permission to access it.`); + + + const uniq_manager = new RecordUniqManager(settings.cli.count_file); + await uniq_manager.deduplicate(settings.cli.source, settings.cli.target); +} \ No newline at end of file
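
Usage sketch (illustrative, not part of the diff above): a minimal example of driving the new RecordUniqManager directly from Node.js. The import path, the directory paths, and the 4096 records-per-file figure are assumptions for illustration; omitting the second argument to deduplicate() makes it fall back to a sibling "-deduped" directory next to the source.

"use strict";

import RecordUniqManager from './src/lib/record/RecordUniqManager.mjs'; // path assumed relative to the rainfallwrangler root

// 4096 matches the documented count-file default (64*64)
const uniq_manager = new RecordUniqManager(4096);

// Hashes every record in the source directory via the worker pool, deletes
// duplicate lines, then recompresses the output into evenly sized .jsonl.gz files.
await uniq_manager.deduplicate(
	"/path/to/records",        // hypothetical source directory of .jsonl.gz files
	"/path/to/records-deduped" // hypothetical target; omit to write alongside the source
);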