From 1a657bd65398eb706489e65f24cd3a985310652a Mon Sep 17 00:00:00 2001 From: Starbeamrainbowlabs Date: Mon, 4 Jul 2022 19:46:06 +0100 Subject: [PATCH] add new uniq subcommand It deduplicates lines in the files, with the potential to add the ability to filter on a specific property later. The reasoning for this is thus: 1. There will naturally be periods of time where nothing happens 2. Too many duplicates will interfere with and confuse the contrastive learning algorithm, as in each batch it will have less variance in samples This is especially important because contrastive learning causes it to compare every item in each batch with every other item in the batch. --- rainfallwrangler/package-lock.json | 132 ++++++++++++++++- rainfallwrangler/package.json | 6 +- .../src/lib/record/RecordUniqManager.mjs | 135 ++++++++++++++++++ .../src/lib/record/RecordsWriter.mjs | 19 +++ .../record_uniq_worker/delete_duplicates.mjs | 24 ++++ .../record_uniq_worker/hash_targets.mjs | 22 +++ .../lib/record/record_uniq_worker/worker.mjs | 11 ++ .../src/lib/record/records_read.mjs | 19 +++ .../src/lib/record/records_recompress.mjs | 48 +++++++ .../src/subcommands/recordify/meta.mjs | 2 +- .../src/subcommands/uniq/meta.mjs | 8 ++ .../src/subcommands/uniq/uniq.mjs | 18 +++ 12 files changed, 441 insertions(+), 3 deletions(-) create mode 100644 rainfallwrangler/src/lib/record/RecordUniqManager.mjs create mode 100644 rainfallwrangler/src/lib/record/record_uniq_worker/delete_duplicates.mjs create mode 100644 rainfallwrangler/src/lib/record/record_uniq_worker/hash_targets.mjs create mode 100644 rainfallwrangler/src/lib/record/record_uniq_worker/worker.mjs create mode 100644 rainfallwrangler/src/lib/record/records_read.mjs create mode 100644 rainfallwrangler/src/lib/record/records_recompress.mjs create mode 100644 rainfallwrangler/src/subcommands/uniq/meta.mjs create mode 100644 rainfallwrangler/src/subcommands/uniq/uniq.mjs diff --git a/rainfallwrangler/package-lock.json
b/rainfallwrangler/package-lock.json index c267806..852b2d0 100644 --- a/rainfallwrangler/package-lock.json +++ b/rainfallwrangler/package-lock.json @@ -12,9 +12,13 @@ "@tensorflow/tfjs-node-gpu": "^3.18.0", "applause-cli": "^1.8.1", "gunzip-maybe": "^1.4.2", + "nexline": "^1.2.2", + "p-map": "^5.5.0", + "p-reflect": "^3.0.0", "pretty-ms": "^8.0.0", "spawn-stream": "^1.0.2", - "terrain50": "^1.10.1" + "terrain50": "^1.10.1", + "workerpool": "^6.2.1" } }, "node_modules/@mapbox/node-pre-gyp": { @@ -367,6 +371,21 @@ "node": ">= 4.0.0" } }, + "node_modules/aggregate-error": { + "version": "4.0.1", + "resolved": "https://registry.npmjs.org/aggregate-error/-/aggregate-error-4.0.1.tgz", + "integrity": "sha512-0poP0T7el6Vq3rstR8Mn4V/IQrpBLO6POkUSrN7RhyY+GF/InCFShQzsQ39T25gkHhLgSLByyAz+Kjb+c2L98w==", + "dependencies": { + "clean-stack": "^4.0.0", + "indent-string": "^5.0.0" + }, + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/ansi-regex": { "version": "2.1.1", "resolved": "https://registry.npmjs.org/ansi-regex/-/ansi-regex-2.1.1.tgz", @@ -468,6 +487,20 @@ "resolved": "https://registry.npmjs.org/chownr/-/chownr-1.1.4.tgz", "integrity": "sha512-jJ0bqzaylmJtVnNgzTeSOs8DPavpbYgEr/b0YL8/2GO3xJEhInFmhKMUnEJQjZumK7KXGFhUy89PrsJWlakBVg==" }, + "node_modules/clean-stack": { + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/clean-stack/-/clean-stack-4.2.0.tgz", + "integrity": "sha512-LYv6XPxoyODi36Dp976riBtSY27VmFo+MKqEU9QCCWyTrdEPDog+RWA7xQWHi6Vbp61j5c4cdzzX1NidnwtUWg==", + "dependencies": { + "escape-string-regexp": "5.0.0" + }, + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/cliui": { "version": "7.0.4", "resolved": "https://registry.npmjs.org/cliui/-/cliui-7.0.4.tgz", @@ -655,6 +688,17 @@ "node": ">=6" } }, + "node_modules/escape-string-regexp": { + "version": "5.0.0", + "resolved": 
"https://registry.npmjs.org/escape-string-regexp/-/escape-string-regexp-5.0.0.tgz", + "integrity": "sha512-/veY75JbMK4j1yjvuUxuVsiS/hr/4iHs9FTT6cgTexxdE0Ly/glccBAkloH/DofkjRbZU3bnoj38mOmhkZ0lHw==", + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/form-data": { "version": "3.0.1", "resolved": "https://registry.npmjs.org/form-data/-/form-data-3.0.1.tgz", @@ -798,6 +842,17 @@ "node": ">=0.10.0" } }, + "node_modules/indent-string": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/indent-string/-/indent-string-5.0.0.tgz", + "integrity": "sha512-m6FAo/spmsW2Ab2fU35JTYwtOKa2yAwXSwgjSv1TJzh4Mh7mC3lzAOVLBprb72XsTrgkEIsl7YrFNAiDiRhIGg==", + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/inflight": { "version": "1.0.6", "resolved": "https://registry.npmjs.org/inflight/-/inflight-1.0.6.tgz", @@ -1053,6 +1108,31 @@ "wrappy": "1" } }, + "node_modules/p-map": { + "version": "5.5.0", + "resolved": "https://registry.npmjs.org/p-map/-/p-map-5.5.0.tgz", + "integrity": "sha512-VFqfGDHlx87K66yZrNdI4YGtD70IRyd+zSvgks6mzHPRNkoKy+9EKP4SFC77/vTTQYmRmti7dvqC+m5jBrBAcg==", + "dependencies": { + "aggregate-error": "^4.0.0" + }, + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, + "node_modules/p-reflect": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/p-reflect/-/p-reflect-3.0.0.tgz", + "integrity": "sha512-rOgYyrvUxnJdSYKGSK7UnO7RxFSnT/IJYFPiosuQ2/AtRWIryIrv8lecWqJXWbKnMcUjJvxiHDMp80m0Yj4eLA==", + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/pako": { "version": "0.2.9", "resolved": "https://registry.npmjs.org/pako/-/pako-0.2.9.tgz", @@ -1382,6 +1462,11 @@ "resolved": "https://registry.npmjs.org/wkt-parser/-/wkt-parser-1.3.2.tgz", "integrity": 
"sha512-A26BOOo7sHAagyxG7iuRhnKMO7Q3mEOiOT4oGUmohtN/Li5wameeU4S6f8vWw6NADTVKljBs8bzA8JPQgSEMVQ==" }, + "node_modules/workerpool": { + "version": "6.2.1", + "resolved": "https://registry.npmjs.org/workerpool/-/workerpool-6.2.1.tgz", + "integrity": "sha512-ILEIE97kDZvF9Wb9f6h5aXK4swSlKGUcOEGiIYb2OOu/IrDU9iwj0fD//SsA6E5ibwJxpEvhullJY4Sl4GcpAw==" + }, "node_modules/wrap-ansi": { "version": "7.0.0", "resolved": "https://registry.npmjs.org/wrap-ansi/-/wrap-ansi-7.0.0.tgz", @@ -1809,6 +1894,15 @@ "es6-promisify": "^5.0.0" } }, + "aggregate-error": { + "version": "4.0.1", + "resolved": "https://registry.npmjs.org/aggregate-error/-/aggregate-error-4.0.1.tgz", + "integrity": "sha512-0poP0T7el6Vq3rstR8Mn4V/IQrpBLO6POkUSrN7RhyY+GF/InCFShQzsQ39T25gkHhLgSLByyAz+Kjb+c2L98w==", + "requires": { + "clean-stack": "^4.0.0", + "indent-string": "^5.0.0" + } + }, "ansi-regex": { "version": "2.1.1", "resolved": "https://registry.npmjs.org/ansi-regex/-/ansi-regex-2.1.1.tgz", @@ -1895,6 +1989,14 @@ "resolved": "https://registry.npmjs.org/chownr/-/chownr-1.1.4.tgz", "integrity": "sha512-jJ0bqzaylmJtVnNgzTeSOs8DPavpbYgEr/b0YL8/2GO3xJEhInFmhKMUnEJQjZumK7KXGFhUy89PrsJWlakBVg==" }, + "clean-stack": { + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/clean-stack/-/clean-stack-4.2.0.tgz", + "integrity": "sha512-LYv6XPxoyODi36Dp976riBtSY27VmFo+MKqEU9QCCWyTrdEPDog+RWA7xQWHi6Vbp61j5c4cdzzX1NidnwtUWg==", + "requires": { + "escape-string-regexp": "5.0.0" + } + }, "cliui": { "version": "7.0.4", "resolved": "https://registry.npmjs.org/cliui/-/cliui-7.0.4.tgz", @@ -2046,6 +2148,11 @@ "resolved": "https://registry.npmjs.org/escalade/-/escalade-3.1.1.tgz", "integrity": "sha512-k0er2gUkLf8O0zKJiAhmkTnJlTvINGv7ygDNPbeIsX/TJjGJZHuh9B2UxbsaEkmlEo9MfhrSzmhIlhRlI2GXnw==" }, + "escape-string-regexp": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/escape-string-regexp/-/escape-string-regexp-5.0.0.tgz", + "integrity": 
"sha512-/veY75JbMK4j1yjvuUxuVsiS/hr/4iHs9FTT6cgTexxdE0Ly/glccBAkloH/DofkjRbZU3bnoj38mOmhkZ0lHw==" + }, "form-data": { "version": "3.0.1", "resolved": "https://registry.npmjs.org/form-data/-/form-data-3.0.1.tgz", @@ -2162,6 +2269,11 @@ "safer-buffer": ">= 2.1.2 < 3" } }, + "indent-string": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/indent-string/-/indent-string-5.0.0.tgz", + "integrity": "sha512-m6FAo/spmsW2Ab2fU35JTYwtOKa2yAwXSwgjSv1TJzh4Mh7mC3lzAOVLBprb72XsTrgkEIsl7YrFNAiDiRhIGg==" + }, "inflight": { "version": "1.0.6", "resolved": "https://registry.npmjs.org/inflight/-/inflight-1.0.6.tgz", @@ -2365,6 +2477,19 @@ "wrappy": "1" } }, + "p-map": { + "version": "5.5.0", + "resolved": "https://registry.npmjs.org/p-map/-/p-map-5.5.0.tgz", + "integrity": "sha512-VFqfGDHlx87K66yZrNdI4YGtD70IRyd+zSvgks6mzHPRNkoKy+9EKP4SFC77/vTTQYmRmti7dvqC+m5jBrBAcg==", + "requires": { + "aggregate-error": "^4.0.0" + } + }, + "p-reflect": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/p-reflect/-/p-reflect-3.0.0.tgz", + "integrity": "sha512-rOgYyrvUxnJdSYKGSK7UnO7RxFSnT/IJYFPiosuQ2/AtRWIryIrv8lecWqJXWbKnMcUjJvxiHDMp80m0Yj4eLA==" + }, "pako": { "version": "0.2.9", "resolved": "https://registry.npmjs.org/pako/-/pako-0.2.9.tgz", @@ -2637,6 +2762,11 @@ "resolved": "https://registry.npmjs.org/wkt-parser/-/wkt-parser-1.3.2.tgz", "integrity": "sha512-A26BOOo7sHAagyxG7iuRhnKMO7Q3mEOiOT4oGUmohtN/Li5wameeU4S6f8vWw6NADTVKljBs8bzA8JPQgSEMVQ==" }, + "workerpool": { + "version": "6.2.1", + "resolved": "https://registry.npmjs.org/workerpool/-/workerpool-6.2.1.tgz", + "integrity": "sha512-ILEIE97kDZvF9Wb9f6h5aXK4swSlKGUcOEGiIYb2OOu/IrDU9iwj0fD//SsA6E5ibwJxpEvhullJY4Sl4GcpAw==" + }, "wrap-ansi": { "version": "7.0.0", "resolved": "https://registry.npmjs.org/wrap-ansi/-/wrap-ansi-7.0.0.tgz", diff --git a/rainfallwrangler/package.json b/rainfallwrangler/package.json index eff478e..0a5964a 100644 --- a/rainfallwrangler/package.json +++ b/rainfallwrangler/package.json 
"use strict";

import fs from 'fs';
import path from 'path';
import os from 'os';
import { fileURLToPath } from 'url';

import workerpool from 'workerpool';
import p_map from 'p-map';
import p_reflect from 'p-reflect';
import pretty_ms from 'pretty-ms';

import log from '../io/NamespacedLog.mjs'; const l = log("recorduniq:manager");
import records_recompress from './records_recompress.mjs';

// fileURLToPath handles Windows drive letters and percent-encoded characters,
// unlike slicing the file: URL by hand.
const __dirname = path.dirname(fileURLToPath(import.meta.url));

/**
 * Deduplicates the lines of the (gzipped) .jsonl record files in a directory.
 * Hashing and deletion are delegated to a pool of worker processes; a final
 * recompression pass rebalances the output to items_per_file lines per file.
 */
class RecordUniqManager {
	/**
	 * @param {number} items_per_file The number of lines to pack into each output file when recompressing.
	 */
	constructor(items_per_file) {
		this.items_per_file = items_per_file;
		
		this.worker_count = os.cpus().length;
		
		// Map of "filepath|line_number" → sha256 hash of that line
		this.hashes = new Map();
		this.init_complete = false;
	}
	
	/**
	 * Spins up the worker pool. Idempotent: does nothing if already initialised.
	 * @return {Promise}
	 */
	async init() {
		if(this.init_complete) return;
		
		this.pool = workerpool.pool(path.join(__dirname, "record_uniq_worker/worker.mjs"), {
			maxQueueSize: 100,
			maxWorkers: this.worker_count
		});
		this.proxy = await this.pool.proxy();
		
		this.init_complete = true;
	}
	
	/**
	 * Deduplicates the record files in dirpath_source, writing the result to
	 * dirpath_target (default: a sibling directory named "<source>-deduped").
	 * @param {string} dirpath_source Path to the directory of files to deduplicate.
	 * @param {string?} dirpath_target Optional path to the output directory.
	 * @return {Promise}
	 */
	async deduplicate(dirpath_source, dirpath_target) {
		const time_start = new Date();
		await this.init(); // was never called anywhere → this.proxy was always undefined
		this.hashes.clear();
		
		const files = (await fs.promises.readdir(dirpath_source)).map(filename => path.join(dirpath_source, filename));
		
		l.log(`STEP [1 / 5]: Hashing files`);
		await p_map(files, this.#do_single_hash.bind(this), { concurrency: this.worker_count + 10 });
		l.log(`STEP [1 / 5]: ${this.hashes.size} hashes gathered in total.`);
		
		l.log(`STEP [ 2 / 5 ]: Identify duplicates`);
		const dupes = this.find_duplicates();
		this.hashes.clear(); // Save memory
		l.log(`STEP [ 2 / 5 ]: ${dupes.length} duplicate groups identified`);
		
		l.log(`STEP [ 3 / 5 ]: Assemble deletion lists`);
		const deletion_lists = this.assemble_deletion_lists(dupes);
		// Map.values() returns an iterator, which has no .reduce() — spread it first
		const deletion_count = [...deletion_lists.values()].reduce((acc, next) => next.length + acc, 0);
		l.log(`STEP [ 3 / 5 ]: ${deletion_count} duplicates to be deleted.`);
		
		l.log(`STEP [ 4 / 5 ]: Delete duplicates`);
		await p_map(
			deletion_lists.entries(),
			async (args) => await this.#do_single_delete(...args),
			{ concurrency: this.worker_count + 10 }
		);
		l.log(`STEP [ 4 / 5 ]: Duplicates deleted.`);
		
		l.log(`STEP [ 5 / 5 ]: Recompress files`);
		const { recompress_lines, recompress_files } = await records_recompress(
			dirpath_source, dirpath_target ?? this.#adjacent_dir(dirpath_source),
			this.items_per_file
		);
		l.log(`STEP [ 5 / 5 ]: Complete with ${recompress_files} files ${recompress_lines} lines at final count.`);
		
		// Shut the pool down so the process can exit cleanly; a subsequent
		// call to deduplicate() recreates it via init().
		await this.pool.terminate();
		this.init_complete = false;
		
		l.log(`Done in ${pretty_ms(new Date() - time_start)}, thank you :D`);
	}
	
	/**
	 * Returns a sibling path of dir with "-tag" appended to its basename.
	 * @param {string} dir The directory to derive the new path from.
	 * @param {string} [tag="deduped"] The suffix to append.
	 * @return {string}
	 */
	#adjacent_dir(dir, tag="deduped") {
		// was `${basename}-${tag}` with the parameter named `target` → ReferenceError
		const dirname = path.dirname(dir);
		const basename = path.basename(dir);
		return path.join(dirname, `${basename}-${tag}`);
	}
	
	/**
	 * Groups the gathered hashes into lists of identical records.
	 * @return {{id:string,hash:string}[][]} One array per duplicate group; every group has at least 2 members.
	 */
	find_duplicates() {
		// Bucket ids by hash: O(n) instead of the previous O(n²) pairwise scan,
		// which also pushed the result array into itself (result.push(result))
		// and would have emitted each group once per member.
		const groups = new Map();
		for(const [ id, hash ] of this.hashes.entries()) {
			if(!groups.has(hash)) groups.set(hash, []);
			groups.get(hash).push({ id, hash });
		}
		
		const result = [];
		for(const group of groups.values())
			if(group.length > 1) result.push(group);
		
		return result;
	}
	
	/**
	 * Converts duplicate groups into a Map of filepath → line numbers to delete.
	 * The first member of each group is kept; the rest are slated for deletion.
	 * @param {{id:string,hash:string}[][]} dupe_groups The groups from find_duplicates().
	 * @return {Map<string,number[]>}
	 */
	assemble_deletion_lists(dupe_groups) {
		const result = new Map();
		for(const dupe_group of dupe_groups) {
			for(const dupe of dupe_group.slice(1)) { // Keep the first one
				const [ filename, i ] = dupe.id.split(`|`, 2);
				if(!result.has(filename)) result.set(filename, []);
				// Parse to a number: the worker compares against its numeric line counter
				result.get(filename).push(parseInt(i, 10));
			}
		}
		return result;
	}
	
	/**
	 * Hashes every line of a single file in a worker, folding the results into this.hashes.
	 * @param {string} filepath Path to the file to hash.
	 * @return {Promise}
	 */
	async #do_single_hash(filepath) {
		// The bar character is the id separator, so it must not appear in paths
		if(filepath.includes("|")) throw new Error(`Filepath contains bar character: ${filepath}`);
		const filename = path.basename(filepath);
		
		l.log(`Hashing ${filename}`);
		const result = await p_reflect(this.proxy.hash_targets(filepath));
		if(result.isRejected) {
			// was the literal text "$(unknown)" — a broken interpolation
			l.warn(`Got error from worker when hashing ${filename}:`, result.reason);
			return;
		}
		
		for(const { i, hash } of result.value) {
			this.hashes.set(`${filepath}|${i}`, hash);
		}
	}
	
	/**
	 * Asks a worker to delete the given line numbers from a single file.
	 * @param {string} filename Path to the file to rewrite.
	 * @param {number[]} deletion_list The line numbers to remove.
	 * @return {Promise}
	 */
	async #do_single_delete(filename, deletion_list) {
		const result = await p_reflect(this.proxy.delete_duplicates(filename, deletion_list));
		if(result.isRejected) {
			// was the literal text "$(unknown)" — a broken interpolation
			l.warn(`Got error from worker when deleting ${deletion_list.length} entries from ${path.basename(filename)}:`, result.reason);
			return;
		}
	}
}

export default RecordUniqManager;
"use strict";

import fs from 'fs';

import records_read from "../records_read.mjs";
import RecordsWriter from '../RecordsWriter.mjs';

// was '../io/NamespacedLog.mjs', which resolves to record/io/ — the io/
// directory is a sibling of record/, one level further up
import log from '../../io/NamespacedLog.mjs'; const l = log("recorduniq:worker");

// This could be muxed together rather than use a worker like this in the main thread since it's I/O bound
/**
 * Deletes the given line numbers from the records file at filepath.
 * Streams the surviving lines to a temporary file and then renames it over
 * the original — the previous implementation opened a writer on the very
 * file it was still reading (truncating it mid-read), and read from the
 * undefined name `filename` instead of the `filepath` parameter.
 * @param {string} filepath Path to the (maybe-gzipped) .jsonl file to rewrite.
 * @param {number[]} lines The 0-based line numbers to delete.
 * @return {Promise<number>} The number of lines actually deleted.
 */
export default async function(filepath, lines) {
	// Set for O(1) membership tests; coerce to numbers in case the line
	// numbers arrived as strings (they are produced by a String.split upstream)
	const to_delete = new Set(lines.map(n => parseInt(n, 10)));
	
	const filepath_tmp = `${filepath}.dedupe.tmp`;
	const writer = new RecordsWriter(filepath_tmp);
	
	let i = -1, deleted = 0;
	for await(const line of records_read(filepath)) {
		i++; // empty lines still count, to stay in sync with hash_targets' numbering
		if(line === "") continue;
		if(to_delete.has(i)) { deleted++; continue; }
		
		await writer.write_raw(line);
	}
	await writer.close();
	
	// Replace the original only once the rewrite completed successfully
	await fs.promises.rename(filepath_tmp, filepath);
	
	return deleted;
}
"use strict";

import workerpool from 'workerpool';

import hash_targets from './hash_targets.mjs';
import delete_duplicates from './delete_duplicates.mjs';

// Worker-process entry point: registers the functions that
// RecordUniqManager's workerpool proxy may invoke in this worker.
workerpool.worker({
	hash_targets,
	delete_duplicates
});
"use strict";

import fs from 'fs';
import path from 'path';

import nexline from 'nexline';
import gunzip from 'gunzip-maybe'; // was missing → gunzip() below threw a ReferenceError
import pretty_ms from 'pretty-ms';

import RecordsWriter from './RecordsWriter.mjs';

/**
 * Recompresses the (maybe-gzipped) .jsonl files in dirpath_source into
 * dirpath_target, packing items_per_file lines into each output file.
 * @param {string} dirpath_source Path to the directory to read from.
 * @param {string} dirpath_target Path to the directory to write to (created if needed).
 * @param {number} items_per_file The number of lines per output file.
 * @return {Promise<{recompress_lines: number, recompress_files: number}>} Totals for lines and files written.
 */
async function records_recompress(dirpath_source, dirpath_target, items_per_file) {
	const files = (await fs.promises.readdir(dirpath_source)).map(filename => path.join(dirpath_source, filename));
	
	// nexline yields the concatenated, decompressed lines of all source files in order
	const reader = nexline({
		input: files.map(filepath => fs.createReadStream(filepath).pipe(gunzip()))
	});
	
	if(!fs.existsSync(dirpath_target))
		await fs.promises.mkdir(dirpath_target, { recursive: true });
	
	let writer = null, i = 0, i_file = 0, i_this_file = 0;
	const time_start = new Date();
	let time_display = time_start;
	for await(const line of reader) {
		if(line === "") continue;
		
		// Roll over to a fresh output file when the current one is full
		if(writer === null || i_this_file >= items_per_file) {
			if(writer !== null) await writer.close();
			writer = new RecordsWriter(path.join(dirpath_target, `${i_file}.jsonl.gz`));
			i_file++; i_this_file = 0;
		}
		
		await writer.write_raw(line.trim());
		
		i++;
		i_this_file++;
		
		// Throttled progress line, at most every 2 seconds
		if(new Date() - time_display > 2000) {
			time_display = new Date(); // was never updated → printed for every line after the first 2s
			const elapsed = new Date() - time_start;
			// was this.items_per_file — `this` is undefined in an ES module function
			process.stdout.write(`${pretty_ms(elapsed, { keepDecimalsOnWholeSeconds: true })} elapsed | ${i_file}/${i_this_file}/${i} files/this file/total | ${(1000 / (elapsed / i)).toFixed(2)} lines/sec | ${items_per_file - i_this_file} left for this file\r`)
		}
	}
	if(writer !== null) await writer.close(); // guard: the source may have contained no lines at all
	
	return { recompress_lines: i, recompress_files: i_file };
}

export default records_recompress;
at end of file diff --git a/rainfallwrangler/src/subcommands/recordify/meta.mjs b/rainfallwrangler/src/subcommands/recordify/meta.mjs index eea97db..8d428ed 100644 --- a/rainfallwrangler/src/subcommands/recordify/meta.mjs +++ b/rainfallwrangler/src/subcommands/recordify/meta.mjs @@ -4,7 +4,7 @@ export default function(cli) { cli.subcommand("recordify", "Converts rainfall radar and water depth data to a directory of .jsonl.gz files.") .argument("water", "Path to the water depths file, formatted as a stream of terrain50 objects. May or may not be gzipped.", null, "string") .argument("rainfall", "Path to the rainfall radar data, formatted as jsonl. May or may not be gzipped.", null, "string") - .argument("count-file", "The number of records to store in each TFRecord file. See the documentation for the optimal value of this number (default: 4096).", 64*64) + .argument("count-file", "The number of records to store in each record file. See the documentation for the optimal value of this number (default: 4096).", 64*64) .argument("rainfall-pattern", "The pattern of the number of time steps to average, as a comma-separated list of numbers. Given a point in time, each successive number specified works BACKWARDS from that point. 
"use strict";

import fs from 'fs';

import settings from "../../settings.mjs";

import RecordUniqManager from '../../lib/record/RecordUniqManager.mjs';

/**
 * CLI entry point for the uniq subcommand.
 * Validates the source directory, then hands off to RecordUniqManager to
 * deduplicate it into the target directory.
 * @return {Promise}
 */
export default async function() {
	const { source, target, count_file } = settings.cli;
	
	if(typeof source !== "string")
		throw new Error(`Error: No source directory specified (see the --source CLI argument)`);
	if(!fs.existsSync(source))
		throw new Error(`Error: The source directory at '${source}' doesn't exist or you haven't got permission to access it.`);
	
	const uniq_manager = new RecordUniqManager(count_file);
	await uniq_manager.deduplicate(source, target);
}