add new uniq subcommand

It deduplicates lines in the recordified files, with the potential to add the ability to filter on a specific property later.
The reasoning for this is as follows:
1. There will naturally be periods of time in which nothing happens, producing many identical records
2. Too many duplicates will interfere with and confuse the contrastive learning algorithm, as each batch will have less variance in its samples

This is especially important because contrastive learning compares every item in each batch with every other item in the batch.
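
To make the second point concrete: contrastive learning scores every pair of distinct items in a batch, so duplicated records turn some of those comparisons into identical-versus-identical pairs that contribute no useful signal. A toy sketch (illustrative values only, not part of this commit) that counts the wasted comparisons:

// Toy illustration: count how many pairwise comparisons in a contrastive
// batch are wasted on duplicate samples.
const batch = ["a", "b", "a", "c", "a", "b"]; // 6 samples, only 3 distinct values

let total = 0, wasted = 0;
for(let i = 0; i < batch.length; i++) {
	for(let j = 0; j < batch.length; j++) {
		if(i === j) continue; // an item is never compared with itself
		total++;
		if(batch[i] === batch[j]) wasted++; // duplicate pair → no contrastive signal
	}
}

console.log(`${wasted}/${total} comparisons carry no useful signal`); // → 8/30 for this toy batch
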
Starbeamrainbowlabs 2022-07-04 19:46:06 +01:00
parent 234e2b7978
commit 1a657bd653
Signed by: sbrl
GPG key ID: 1BE5172E637709C2
12 changed files with 441 additions and 3 deletions

View file

@@ -12,9 +12,13 @@
"@tensorflow/tfjs-node-gpu": "^3.18.0",
"applause-cli": "^1.8.1",
"gunzip-maybe": "^1.4.2",
"nexline": "^1.2.2",
"p-map": "^5.5.0",
"p-reflect": "^3.0.0",
"pretty-ms": "^8.0.0",
"spawn-stream": "^1.0.2",
"terrain50": "^1.10.1"
"terrain50": "^1.10.1",
"workerpool": "^6.2.1"
}
},
"node_modules/@mapbox/node-pre-gyp": {
@@ -367,6 +371,21 @@
"node": ">= 4.0.0"
}
},
"node_modules/aggregate-error": {
"version": "4.0.1",
"resolved": "https://registry.npmjs.org/aggregate-error/-/aggregate-error-4.0.1.tgz",
"integrity": "sha512-0poP0T7el6Vq3rstR8Mn4V/IQrpBLO6POkUSrN7RhyY+GF/InCFShQzsQ39T25gkHhLgSLByyAz+Kjb+c2L98w==",
"dependencies": {
"clean-stack": "^4.0.0",
"indent-string": "^5.0.0"
},
"engines": {
"node": ">=12"
},
"funding": {
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/ansi-regex": {
"version": "2.1.1",
"resolved": "https://registry.npmjs.org/ansi-regex/-/ansi-regex-2.1.1.tgz",
@@ -468,6 +487,20 @@
"resolved": "https://registry.npmjs.org/chownr/-/chownr-1.1.4.tgz",
"integrity": "sha512-jJ0bqzaylmJtVnNgzTeSOs8DPavpbYgEr/b0YL8/2GO3xJEhInFmhKMUnEJQjZumK7KXGFhUy89PrsJWlakBVg=="
},
"node_modules/clean-stack": {
"version": "4.2.0",
"resolved": "https://registry.npmjs.org/clean-stack/-/clean-stack-4.2.0.tgz",
"integrity": "sha512-LYv6XPxoyODi36Dp976riBtSY27VmFo+MKqEU9QCCWyTrdEPDog+RWA7xQWHi6Vbp61j5c4cdzzX1NidnwtUWg==",
"dependencies": {
"escape-string-regexp": "5.0.0"
},
"engines": {
"node": ">=12"
},
"funding": {
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/cliui": {
"version": "7.0.4",
"resolved": "https://registry.npmjs.org/cliui/-/cliui-7.0.4.tgz",
@@ -655,6 +688,17 @@
"node": ">=6"
}
},
"node_modules/escape-string-regexp": {
"version": "5.0.0",
"resolved": "https://registry.npmjs.org/escape-string-regexp/-/escape-string-regexp-5.0.0.tgz",
"integrity": "sha512-/veY75JbMK4j1yjvuUxuVsiS/hr/4iHs9FTT6cgTexxdE0Ly/glccBAkloH/DofkjRbZU3bnoj38mOmhkZ0lHw==",
"engines": {
"node": ">=12"
},
"funding": {
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/form-data": {
"version": "3.0.1",
"resolved": "https://registry.npmjs.org/form-data/-/form-data-3.0.1.tgz",
@@ -798,6 +842,17 @@
"node": ">=0.10.0"
}
},
"node_modules/indent-string": {
"version": "5.0.0",
"resolved": "https://registry.npmjs.org/indent-string/-/indent-string-5.0.0.tgz",
"integrity": "sha512-m6FAo/spmsW2Ab2fU35JTYwtOKa2yAwXSwgjSv1TJzh4Mh7mC3lzAOVLBprb72XsTrgkEIsl7YrFNAiDiRhIGg==",
"engines": {
"node": ">=12"
},
"funding": {
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/inflight": {
"version": "1.0.6",
"resolved": "https://registry.npmjs.org/inflight/-/inflight-1.0.6.tgz",
@@ -1053,6 +1108,31 @@
"wrappy": "1"
}
},
"node_modules/p-map": {
"version": "5.5.0",
"resolved": "https://registry.npmjs.org/p-map/-/p-map-5.5.0.tgz",
"integrity": "sha512-VFqfGDHlx87K66yZrNdI4YGtD70IRyd+zSvgks6mzHPRNkoKy+9EKP4SFC77/vTTQYmRmti7dvqC+m5jBrBAcg==",
"dependencies": {
"aggregate-error": "^4.0.0"
},
"engines": {
"node": ">=12"
},
"funding": {
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/p-reflect": {
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/p-reflect/-/p-reflect-3.0.0.tgz",
"integrity": "sha512-rOgYyrvUxnJdSYKGSK7UnO7RxFSnT/IJYFPiosuQ2/AtRWIryIrv8lecWqJXWbKnMcUjJvxiHDMp80m0Yj4eLA==",
"engines": {
"node": ">=12"
},
"funding": {
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/pako": {
"version": "0.2.9",
"resolved": "https://registry.npmjs.org/pako/-/pako-0.2.9.tgz",
@@ -1382,6 +1462,11 @@
"resolved": "https://registry.npmjs.org/wkt-parser/-/wkt-parser-1.3.2.tgz",
"integrity": "sha512-A26BOOo7sHAagyxG7iuRhnKMO7Q3mEOiOT4oGUmohtN/Li5wameeU4S6f8vWw6NADTVKljBs8bzA8JPQgSEMVQ=="
},
"node_modules/workerpool": {
"version": "6.2.1",
"resolved": "https://registry.npmjs.org/workerpool/-/workerpool-6.2.1.tgz",
"integrity": "sha512-ILEIE97kDZvF9Wb9f6h5aXK4swSlKGUcOEGiIYb2OOu/IrDU9iwj0fD//SsA6E5ibwJxpEvhullJY4Sl4GcpAw=="
},
"node_modules/wrap-ansi": {
"version": "7.0.0",
"resolved": "https://registry.npmjs.org/wrap-ansi/-/wrap-ansi-7.0.0.tgz",
@@ -1809,6 +1894,15 @@
"es6-promisify": "^5.0.0"
}
},
"aggregate-error": {
"version": "4.0.1",
"resolved": "https://registry.npmjs.org/aggregate-error/-/aggregate-error-4.0.1.tgz",
"integrity": "sha512-0poP0T7el6Vq3rstR8Mn4V/IQrpBLO6POkUSrN7RhyY+GF/InCFShQzsQ39T25gkHhLgSLByyAz+Kjb+c2L98w==",
"requires": {
"clean-stack": "^4.0.0",
"indent-string": "^5.0.0"
}
},
"ansi-regex": {
"version": "2.1.1",
"resolved": "https://registry.npmjs.org/ansi-regex/-/ansi-regex-2.1.1.tgz",
@@ -1895,6 +1989,14 @@
"resolved": "https://registry.npmjs.org/chownr/-/chownr-1.1.4.tgz",
"integrity": "sha512-jJ0bqzaylmJtVnNgzTeSOs8DPavpbYgEr/b0YL8/2GO3xJEhInFmhKMUnEJQjZumK7KXGFhUy89PrsJWlakBVg=="
},
"clean-stack": {
"version": "4.2.0",
"resolved": "https://registry.npmjs.org/clean-stack/-/clean-stack-4.2.0.tgz",
"integrity": "sha512-LYv6XPxoyODi36Dp976riBtSY27VmFo+MKqEU9QCCWyTrdEPDog+RWA7xQWHi6Vbp61j5c4cdzzX1NidnwtUWg==",
"requires": {
"escape-string-regexp": "5.0.0"
}
},
"cliui": {
"version": "7.0.4",
"resolved": "https://registry.npmjs.org/cliui/-/cliui-7.0.4.tgz",
@@ -2046,6 +2148,11 @@
"resolved": "https://registry.npmjs.org/escalade/-/escalade-3.1.1.tgz",
"integrity": "sha512-k0er2gUkLf8O0zKJiAhmkTnJlTvINGv7ygDNPbeIsX/TJjGJZHuh9B2UxbsaEkmlEo9MfhrSzmhIlhRlI2GXnw=="
},
"escape-string-regexp": {
"version": "5.0.0",
"resolved": "https://registry.npmjs.org/escape-string-regexp/-/escape-string-regexp-5.0.0.tgz",
"integrity": "sha512-/veY75JbMK4j1yjvuUxuVsiS/hr/4iHs9FTT6cgTexxdE0Ly/glccBAkloH/DofkjRbZU3bnoj38mOmhkZ0lHw=="
},
"form-data": {
"version": "3.0.1",
"resolved": "https://registry.npmjs.org/form-data/-/form-data-3.0.1.tgz",
@@ -2162,6 +2269,11 @@
"safer-buffer": ">= 2.1.2 < 3"
}
},
"indent-string": {
"version": "5.0.0",
"resolved": "https://registry.npmjs.org/indent-string/-/indent-string-5.0.0.tgz",
"integrity": "sha512-m6FAo/spmsW2Ab2fU35JTYwtOKa2yAwXSwgjSv1TJzh4Mh7mC3lzAOVLBprb72XsTrgkEIsl7YrFNAiDiRhIGg=="
},
"inflight": {
"version": "1.0.6",
"resolved": "https://registry.npmjs.org/inflight/-/inflight-1.0.6.tgz",
@@ -2365,6 +2477,19 @@
"wrappy": "1"
}
},
"p-map": {
"version": "5.5.0",
"resolved": "https://registry.npmjs.org/p-map/-/p-map-5.5.0.tgz",
"integrity": "sha512-VFqfGDHlx87K66yZrNdI4YGtD70IRyd+zSvgks6mzHPRNkoKy+9EKP4SFC77/vTTQYmRmti7dvqC+m5jBrBAcg==",
"requires": {
"aggregate-error": "^4.0.0"
}
},
"p-reflect": {
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/p-reflect/-/p-reflect-3.0.0.tgz",
"integrity": "sha512-rOgYyrvUxnJdSYKGSK7UnO7RxFSnT/IJYFPiosuQ2/AtRWIryIrv8lecWqJXWbKnMcUjJvxiHDMp80m0Yj4eLA=="
},
"pako": {
"version": "0.2.9",
"resolved": "https://registry.npmjs.org/pako/-/pako-0.2.9.tgz",
@@ -2637,6 +2762,11 @@
"resolved": "https://registry.npmjs.org/wkt-parser/-/wkt-parser-1.3.2.tgz",
"integrity": "sha512-A26BOOo7sHAagyxG7iuRhnKMO7Q3mEOiOT4oGUmohtN/Li5wameeU4S6f8vWw6NADTVKljBs8bzA8JPQgSEMVQ=="
},
"workerpool": {
"version": "6.2.1",
"resolved": "https://registry.npmjs.org/workerpool/-/workerpool-6.2.1.tgz",
"integrity": "sha512-ILEIE97kDZvF9Wb9f6h5aXK4swSlKGUcOEGiIYb2OOu/IrDU9iwj0fD//SsA6E5ibwJxpEvhullJY4Sl4GcpAw=="
},
"wrap-ansi": {
"version": "7.0.0",
"resolved": "https://registry.npmjs.org/wrap-ansi/-/wrap-ansi-7.0.0.tgz",

View file

@@ -16,8 +16,12 @@
"@tensorflow/tfjs-node-gpu": "^3.18.0",
"applause-cli": "^1.8.1",
"gunzip-maybe": "^1.4.2",
"nexline": "^1.2.2",
"p-map": "^5.5.0",
"p-reflect": "^3.0.0",
"pretty-ms": "^8.0.0",
"spawn-stream": "^1.0.2",
"terrain50": "^1.10.1"
"terrain50": "^1.10.1",
"workerpool": "^6.2.1"
}
}

View file

@@ -0,0 +1,135 @@
"use strict";
import fs from 'fs';
import path from 'path';
import os from 'os';
import workerpool from 'workerpool';
import p_map from 'p-map';
import p_reflect from 'p-reflect';
import pretty_ms from 'pretty-ms';
import log from '../io/NamespacedLog.mjs'; const l = log("recorduniq:manager");
import records_recompress from './records_recompress.mjs';
const __dirname = import.meta.url.slice(7, import.meta.url.lastIndexOf("/")); // strip the file:// prefix and the filename to get this module's directory
class RecordUniqManager {
constructor(items_per_file) {
this.items_per_file = items_per_file;
this.worker_count = os.cpus().length;
this.pool = workerpool.pool(path.join(__dirname, "record_uniq_worker/worker.mjs"), {
maxQueueSize: 100,
maxWorkers: this.worker_count
});
this.hashes = new Map();
this.init_complete = false;
}
async init() {
if(this.init_complete) return;
this.proxy = await this.pool.proxy();
this.init_complete = true;
}
async deduplicate(dirpath_source, dirpath_target) {
const time_start = new Date();
await this.init(); // ensure the worker pool proxy exists before dispatching jobs
this.hashes.clear();
const files = (await fs.promises.readdir(dirpath_source)).map(filename => path.join(dirpath_source, filename));
l.log(`STEP [ 1 / 5 ]: Hashing files`);
await p_map(files, this.#do_single_hash.bind(this), { concurrency: this.worker_count + 10 });
l.log(`STEP [ 1 / 5 ]: ${this.hashes.size} hashes gathered in total.`);
l.log(`STEP [ 2 / 5 ]: Identify duplicates`);
const dupes = this.find_duplicates();
this.hashes.clear(); // Save memory
l.log(`STEP [ 2 / 5 ]: ${dupes.length} duplicate groups identified`);
l.log(`STEP [ 3 / 5 ]: Assemble deletion lists`);
const deletion_lists = this.assemble_deletion_lists(dupes);
l.log(`STEP [ 3 / 5 ]: ${[...deletion_lists.values()].reduce((acc, next) => next.length + acc, 0)} duplicates to be deleted.`);
l.log(`STEP [ 4 / 5 ]: Delete duplicates`);
await p_map(
deletion_lists.entries(),
async (args) => await this.#do_single_delete(...args),
{ concurrency: this.worker_count + 10 }
);
l.log(`STEP [ 4 / 5 ]: Duplicates deleted.`);
l.log(`STEP [ 5 / 5 ]: Recompress files`);
const { recompress_lines, recompress_files } = await records_recompress(
dirpath_source, dirpath_target ?? this.#adjacent_dir(dirpath_source),
this.items_per_file
);
l.log(`STEP [ 5 / 5 ]: Complete with ${recompress_files} files ${recompress_lines} lines at final count.`);
l.log(`Done in ${pretty_ms(new Date() - time_start)}, thank you :D`);
}
#adjacent_dir(dir, tag="deduped") {
const dirname = path.dirname(dir);
const basename = path.basename(dir);
return path.join(dirname, `${basename}-${tag}`);
}
find_duplicates() {
const result = [];
const seen = new Set(); // ids already assigned to a duplicate group, so each group is emitted only once
for(const [ id, hash ] of this.hashes.entries()) {
if(seen.has(id)) continue;
const dupes_group = [ { id, hash } ];
for(const [ id_inner, hash_inner ] of this.hashes.entries()) {
if(id === id_inner || seen.has(id_inner)) continue;
if(hash === hash_inner) dupes_group.push( { id: id_inner, hash: hash_inner });
}
if(dupes_group.length > 1) {
for(const dupe of dupes_group) seen.add(dupe.id);
result.push(dupes_group);
}
}
return result;
}
assemble_deletion_lists(dupe_groups) {
const result = new Map();
for(const dupe_group of dupe_groups) {
for(const dupe of dupe_group.slice(1)) { // Keep the first one
const [ filename, i ] = dupe.id.split(`|`, 2);
if(!result.has(filename)) result.set(filename, []);
result.get(filename).push(parseInt(i, 10)); // the index comes back as a string after splitting the id
}
}
return result;
}
async #do_single_hash(filepath) {
if(filepath.includes("|")) throw new Error(`Filepath contains bar character: ${filepath}`);
const filename = path.basename(filepath);
l.log(`Hashing ${path.basename(filepath)}`);
const result = await p_reflect(this.proxy.hash_targets(filepath));
if(result.isRejected) {
l.warn(`Got error from worker when hashing ${filename}:`, result.reason);
return;
}
for(const { i, hash } of result.value) {
this.hashes.set(`${filepath}|${i}`, hash);
}
}
async #do_single_delete(filename, deletion_list) {
const result = await p_reflect(this.proxy.delete_duplicates(filename, deletion_list));
if(result.isRejected) {
l.warn(`Got error from worker when deleting ${deletion_list.length} entries from ${filename}:`, result.reason);
return;
}
}
}
export default RecordUniqManager;

View file

@@ -15,11 +15,30 @@ class RecordsWriter {
this.#gzip.pipe(this.#stream_out);
}
/**
* Writes a sample to the file, followed by a new line \n character.
* @param {Map} sample The sample to write.
* @return {Promise}
*/
async write(sample) {
const str = JSON.stringify(Object.fromEntries(sample));
await write_safe(this.#gzip, str+"\n");
}
/**
* Writes a raw value to the file, followed by a new line \n character.
* @param {string} line The line to write.
* @return {Promise}
*/
async write_raw(line) {
await write_safe(this.#gzip, line+"\n");
}
/**
* Closes the underlying file gracefully.
* No more may be written to the file after this method is called.
* @return {Promise}
*/
async close() {
await end_safe(this.#gzip);
await end_safe(this.#stream_out);
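
For context, this is how the writer is used by the new code in this commit: structured samples go through write(), pre-serialised lines through write_raw(), and close() ends the gzip stream and the underlying file. A minimal usage sketch; the output path and field names here are made up for illustration:

import RecordsWriter from './src/lib/record/RecordsWriter.mjs'; // import path relative to the repo root (assumption)

const writer = new RecordsWriter("./output/0.jsonl.gz"); // hypothetical output filepath
await writer.write(new Map([ [ "rainfall", [ 0, 0.2 ] ], [ "water", [ 0.1 ] ] ])); // JSON-serialised, then "\n"
await writer.write_raw('{"already":"serialised"}'); // written as-is, then "\n"
await writer.close(); // no more writes are possible after this
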

View file

@@ -0,0 +1,24 @@
"use strict";
import fs from 'fs';
import records_read from "../records_read.mjs";
import RecordsWriter from '../RecordsWriter.mjs';
import log from '../io/NamespacedLog.mjs'; const l = log("recorduniq:worker");
// This could be multiplexed together in the main thread rather than run in a worker, since it's I/O bound
export default async function(filepath, lines) {
let i = -1;
// Write to a temporary file first, since we're still reading from the original file
const filepath_tmp = `${filepath}.dedupe.tmp`;
const writer = new RecordsWriter(filepath_tmp);
for await(const line of records_read(filepath)) {
i++;
if(line === "" || lines.includes(i)) continue; // skip blank lines and lines marked for deletion
await writer.write_raw(line);
}
await writer.close();
await fs.promises.rename(filepath_tmp, filepath); // replace the original once writing is complete
}

View file

@@ -0,0 +1,22 @@
"use strict";
import crypto from 'crypto';
import records_read from "../records_read.mjs";
import log from '../io/NamespacedLog.mjs'; const l = log("recorduniq:worker");
export default async function(filename) {
const result = [];
let i = -1;
for await(const line of records_read(filename)) {
i++;
if(line === "") continue;
// Ref https://stackoverflow.com/a/58307338/1460422
result.push({ i, hash: crypto.createHash("sha256").update(line, "binary").digest("base64") });
}
return result;
}

View file

@@ -0,0 +1,11 @@
"use strict";
import workerpool from 'workerpool';
import hash_targets from './hash_targets.mjs';
import delete_duplicates from './delete_duplicates.mjs';
workerpool.worker({
hash_targets,
delete_duplicates
});

View file

@@ -0,0 +1,19 @@
"use strict";
import fs from 'fs';
import nexline from 'nexline';
import gunzip from 'gunzip-maybe';
/**
* Reads the records from a (potentially gzipped) .jsonl / .jsonl.gz file.
* @param {string} filename The filename to read from.
* @return {AsyncGenerator<string>} An asynchronous generator that iteratively returns the lines in the file.
*/
function records_read(filename) {
return nexline({
input: fs.createReadStream(filename).pipe(gunzip())
});
}
export default records_read;

View file

@@ -0,0 +1,48 @@
"use strict";
import fs from 'fs';
import path from 'path';
import nexline from 'nexline';
import gunzip from 'gunzip-maybe';
import pretty_ms from 'pretty-ms';
import RecordsWriter from './RecordsWriter.mjs';
async function records_recompress(dirpath_source, dirpath_target, items_per_file) {
const files = (await fs.promises.readdir(dirpath_source)).map(filename => path.join(dirpath_source, filename));
const reader = nexline({
input: files.map(filepath => fs.createReadStream(filepath).pipe(gunzip()))
});
if(!fs.existsSync(dirpath_target))
await fs.promises.mkdir(dirpath_target, { recursive: true });
let writer = null, i = 0, i_file = 0, i_this_file;
let time_start = new Date(), time_display = time_start;
for await(const line of reader) {
if(line === "") continue;
if(writer === null || i_this_file >= items_per_file) {
if(writer !== null) await writer.close();
writer = new RecordsWriter(path.join(dirpath_target, `${i_file}.jsonl.gz`));
i_file++; i_this_file = 0;
}
await writer.write_raw(line.trim());
i++;
i_this_file++;
if(new Date() - time_display > 2000) {
const elapsed = new Date() - time_start;
process.stdout.write(`${pretty_ms(elapsed, { keepDecimalsOnWholeSeconds: true })} elapsed | ${i_file}/${i_this_file}/${i} files/this file/total | ${(1000 / (elapsed / i)).toFixed(2)} lines/sec | ${items_per_file - i_this_file} left for this file\r`);
time_display = new Date(); // throttle the progress output to roughly once every 2 seconds
}
}
if(writer !== null) await writer.close(); // writer stays null if the source files contained no lines at all
return { recompress_lines: i, recompress_files: i_file };
}
export default records_recompress;

View file

@@ -4,7 +4,7 @@ export default function(cli) {
cli.subcommand("recordify", "Converts rainfall radar and water depth data to a directory of .jsonl.gz files.")
.argument("water", "Path to the water depths file, formatted as a stream of terrain50 objects. May or may not be gzipped.", null, "string")
.argument("rainfall", "Path to the rainfall radar data, formatted as jsonl. May or may not be gzipped.", null, "string")
.argument("count-file", "The number of records to store in each TFRecord file. See the documentation for the optimal value of this number (default: 4096).", 64*64)
.argument("count-file", "The number of records to store in each record file. See the documentation for the optimal value of this number (default: 4096).", 64*64)
.argument("rainfall-pattern", "The pattern of the number of time steps to average, as a comma-separated list of numbers. Given a point in time, each successive number specified works BACKWARDS from that point. For example, 1,4,10 would be 3 channels: 1 time step on it's own, then average the next 4 time steps, then average the next 10 steps (default: 1,3,3,5,12,24,48).", [1,3,3,5,12,24,48].reverse(), function(value) {
return value.split(",")
.map(el => parseInt(el))

View file

@@ -0,0 +1,8 @@
"use strict";
export default function(cli) {
cli.subcommand("uniq", "Deduplicates the entries in a directory of recordified files.")
.argument("source", "Path to the source directory.", null, "string")
.argument("target", "Path to the target directory.", null, "string")
.argument("count-file", "The number of records to store in each record file. See the documentation for the optimal value of this number (default: 4096).", 64*64);
}

View file

@@ -0,0 +1,18 @@
"use strict";
import fs from 'fs';
import settings from "../../settings.mjs";
import RecordUniqManager from '../../lib/record/RecordUniqManager.mjs';
export default async function() {
if(typeof settings.cli.source !== "string")
throw new Error(`Error: No source directory specified (see the --source CLI argument)`);
if(!fs.existsSync(settings.cli.source))
throw new Error(`Error: The source directory at '${settings.cli.source}' doesn't exist or you haven't got permission to access it.`);
const uniq_manager = new RecordUniqManager(settings.cli.count_file);
await uniq_manager.deduplicate(settings.cli.source, settings.cli.target);
}
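
For reference, the same deduplication flow can also be driven from other Node.js code rather than through the CLI. A minimal sketch using only the constructor and deduplicate() signature introduced in this commit; the paths and the import location are assumptions:

import RecordUniqManager from './src/lib/record/RecordUniqManager.mjs'; // import path relative to the repo root (assumption)

const manager = new RecordUniqManager(64 * 64); // items per output file, matching the CLI default
// Deduplicate ./recordified and recompress the result into ./recordified-deduped.
// Omitting the second argument writes to a "-deduped" directory next to the source instead.
await manager.deduplicate("./recordified", "./recordified-deduped");

Note that the deletion step rewrites the source files in place; only the final recompression step writes into the target directory.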