From a966cdff358ff3c657bb91d0ec8309efb01e7694 Mon Sep 17 00:00:00 2001 From: Starbeamrainbowlabs Date: Fri, 8 Jul 2022 19:54:24 +0100 Subject: [PATCH] uniq: bugfix a lot, but it's not working right just yet There's still a bug in the file line deletor --- rainfallwrangler/src/lib/io/StreamHelpers.mjs | 2 +- .../src/lib/record/RecordUniqManager.mjs | 19 ++++++++------ .../record_uniq_worker/delete_duplicates.mjs | 25 +++++++++++++------ .../record_uniq_worker/hash_targets.mjs | 6 ++--- .../src/lib/record/records_read.mjs | 3 ++- .../src/lib/record/records_recompress.mjs | 6 +++-- .../src/subcommands/uniq/uniq.mjs | 10 ++++---- 7 files changed, 44 insertions(+), 27 deletions(-) diff --git a/rainfallwrangler/src/lib/io/StreamHelpers.mjs b/rainfallwrangler/src/lib/io/StreamHelpers.mjs index 69af5e9..a15bc9f 100644 --- a/rainfallwrangler/src/lib/io/StreamHelpers.mjs +++ b/rainfallwrangler/src/lib/io/StreamHelpers.mjs @@ -42,7 +42,7 @@ function write_safe(stream_out, data) { */ function end_safe(stream, chunk = undefined) { return new Promise((resolve, _reject) => { - stream.once("finish", () => { console.log(`end_safe DEBUG finish`); resolve(); }); + stream.once("finish", resolve); if(typeof chunk == "undefined") stream.end(); else stream.end(chunk); }); diff --git a/rainfallwrangler/src/lib/record/RecordUniqManager.mjs b/rainfallwrangler/src/lib/record/RecordUniqManager.mjs index f75334d..2db611e 100644 --- a/rainfallwrangler/src/lib/record/RecordUniqManager.mjs +++ b/rainfallwrangler/src/lib/record/RecordUniqManager.mjs @@ -38,10 +38,13 @@ class RecordUniqManager { } async deduplicate(dirpath_source, dirpath_target) { + await this.init(); const time_start = new Date(); this.hashes.clear(); - const files = (await fs.promises.readdir(dirpath_source)).map(filename => path.join(dirpath_source, filename)); + const files = (await fs.promises.readdir(dirpath_source)) + .filter(filename => filename.endsWith(".jsonl.gz")) + .map(filename => path.join(dirpath_source, filename)); l.log(`STEP [1 / 5]: Hashing files`); await p_map(files, this.#do_single_hash.bind(this), { concurrency: this.worker_count + 10 }); @@ -54,15 +57,17 @@ class RecordUniqManager { l.log(`STEP [ 3 / 5 ]: Assemble deletion lists`); const deletion_lists = this.assemble_deletion_lists(dupes); - l.log(`STEP [ 3 / 5 ]: ${deletion_lists.values().reduce((acc, next) => next.length + acc, 0)} duplicates to be deleted.`); + l.log(`STEP [ 3 / 5 ]: ${[...deletion_lists.values()].reduce((acc, next) => next.length + acc, 0)} duplicates to be deleted.`); l.log(`STEP [ 4 / 5 ]: Delete duplicates`); + l.error(`DEBUG There's a bug here where we somehow pass in the deletion list multiple times?????`); await p_map( deletion_lists.entries(), async (args) => await this.#do_single_delete(...args), { concurrency: this.worker_count + 10 } ); l.log(`STEP [ 4 / 5 ]: Duplicates deleted.`); + throw new Error("Error: NOTE: We need to fix the bug with the duplicate deletion before we can do anything else."); l.log(`STEP [ 5 / 5 ]: Recompress files`); const { recompress_lines, recompress_files } = await records_recompress( @@ -88,7 +93,7 @@ class RecordUniqManager { if(hash === hash_inner) dupes_group.push( { id: id_inner, hash: hash_inner }); } if(dupes_group.length > 1) { - result.push(result); + result.push(dupes_group); } } @@ -101,7 +106,7 @@ class RecordUniqManager { for(const dupe of dupe_group.slice(1)) { // Keep the first one const [ filename, i ] = dupe.id.split(`|`, 2); if(!result.has(filename)) result.set(filename, []); - result.get(filename).push(i); + result.get(filename).push(parseInt(i, 10)); } } return result; @@ -123,10 +128,10 @@ class RecordUniqManager { } } - async #do_single_delete(filename, deletion_list) { - const result = await p_reflect(this.proxy.delete_duplicates(filename, deletion_list)); + async #do_single_delete(filename_source, deletion_list) { + const result = await p_reflect(this.proxy.delete_duplicates(filename_source, deletion_list)); if(result.isRejected) { - l.warn(`Got error from worker when deleting ${deletion_list.length} entries from ${filename}:`, result.reason); + l.warn(`Got error from worker when deleting ${deletion_list.length} entries from ${filename_source}:`, result.reason); return; } } diff --git a/rainfallwrangler/src/lib/record/record_uniq_worker/delete_duplicates.mjs b/rainfallwrangler/src/lib/record/record_uniq_worker/delete_duplicates.mjs index 002dc2e..7007d12 100644 --- a/rainfallwrangler/src/lib/record/record_uniq_worker/delete_duplicates.mjs +++ b/rainfallwrangler/src/lib/record/record_uniq_worker/delete_duplicates.mjs @@ -1,24 +1,33 @@ "use strict"; -import crypto from 'crypto'; +import fs from 'fs'; import records_read from "../records_read.mjs"; import RecordsWriter from '../RecordsWriter.mjs'; -import log from '../io/NamespacedLog.mjs'; const l = log("recorduniq:worker"); +import log from '../../io/NamespacedLog.mjs'; const l = log("recorduniq:worker"); // This could be muxed together rather than use a worker like this in the main thread since it's I/O bound -export default async function(filepath, lines) { - const result = []; +export default async function(filepath_source, lines) { - let i = -1, writer = new RecordsWriter(filepath); - for await(const line of records_read(filename)) { + l.info(`DEBUG lines slated for deletion`, lines); + const filepath_tmp = `${filepath_source}.dupedeleteTMP`; + let i = -1, count_deleted = 0, writer = new RecordsWriter(filepath_tmp); + for await(const line of records_read(filepath_source)) { i++; - if(line === "" || lines.includes(i)) continue; + if(line === "") continue; + if(lines.includes(i)) { + count_deleted++; + continue; + } await writer.write_raw(line); } await writer.close(); - return result; + await fs.promises.rename(filepath_tmp, filepath_source); + + l.log(`Deleted`, count_deleted, `lines out of`, lines.length, `slated`); + + return count_deleted; } diff --git a/rainfallwrangler/src/lib/record/record_uniq_worker/hash_targets.mjs b/rainfallwrangler/src/lib/record/record_uniq_worker/hash_targets.mjs index 0835e84..ce089da 100644 --- a/rainfallwrangler/src/lib/record/record_uniq_worker/hash_targets.mjs +++ b/rainfallwrangler/src/lib/record/record_uniq_worker/hash_targets.mjs @@ -4,13 +4,13 @@ import crypto from 'crypto'; import records_read from "../records_read.mjs"; -import log from '../io/NamespacedLog.mjs'; const l = log("recorduniq:worker"); +import log from '../../io/NamespacedLog.mjs'; const l = log("recorduniq:worker"); -export default async function(filename) { +export default async function(filepath) { const result = []; let i = -1; - for await(const line of records_read(filename)) { + for await(const line of records_read(filepath)) { i++; if(line === "") continue; diff --git a/rainfallwrangler/src/lib/record/records_read.mjs b/rainfallwrangler/src/lib/record/records_read.mjs index 2ec37e1..9fdc3a4 100644 --- a/rainfallwrangler/src/lib/record/records_read.mjs +++ b/rainfallwrangler/src/lib/record/records_read.mjs @@ -1,6 +1,7 @@ "use strict"; import fs from 'fs'; +import { Readable } from 'stream'; import nexline from 'nexline'; import gunzip from 'gunzip-maybe'; @@ -12,7 +13,7 @@ import gunzip from 'gunzip-maybe'; */ function records_read(filename) { return nexline({ - input: fs.createReadStream(filename).pipe(gunzip()) + input: new Readable().wrap(fs.createReadStream(filename).pipe(gunzip())) }); } diff --git a/rainfallwrangler/src/lib/record/records_recompress.mjs b/rainfallwrangler/src/lib/record/records_recompress.mjs index fa28106..b9621f4 100644 --- a/rainfallwrangler/src/lib/record/records_recompress.mjs +++ b/rainfallwrangler/src/lib/record/records_recompress.mjs @@ -2,9 +2,11 @@ import fs from 'fs'; import path from 'path'; +import { Readable } from 'stream'; import nexline from 'nexline'; import pretty_ms from 'pretty-ms'; +import gunzip from 'gunzip-maybe'; import RecordsWriter from './RecordsWriter.mjs'; @@ -12,7 +14,7 @@ async function records_recompress(dirpath_source, dirpath_target, items_per_file const files = (await fs.promises.readdir(dirpath_source)).map(filename => path.join(dirpath_source, filename)); const reader = nexline({ - input: files.map(filepath => fs.createReadStream(filepath).pipe(gunzip())) + input: files.map(filepath => new Readable().wrap(fs.createReadStream(filepath).pipe(gunzip()))) }); @@ -37,7 +39,7 @@ async function records_recompress(dirpath_source, dirpath_target, items_per_file if(new Date() - time_display > 2000) { const elapsed = new Date() - time_start; - process.stdout.write(`${pretty_ms(elapsed, { keepDecimalsOnWholeSeconds: true })} elapsed | ${i_file}/${i_this_file}/${i} files/this file/total | ${(1000 / (elapsed / i)).toFixed(2)} lines/sec | ${this.items_per_file - i_this_file} left for this file\r`) + process.stdout.write(`${pretty_ms(elapsed, { keepDecimalsOnWholeSeconds: true })} elapsed | ${i_file}/${i_this_file}/${i} files/this file/total | ${(1000 / (elapsed / i)).toFixed(2)} lines/sec | ${items_per_file - i_this_file} left for this file\r`) } } await writer.close(); diff --git a/rainfallwrangler/src/subcommands/uniq/uniq.mjs b/rainfallwrangler/src/subcommands/uniq/uniq.mjs index 53ed1ed..53031a2 100644 --- a/rainfallwrangler/src/subcommands/uniq/uniq.mjs +++ b/rainfallwrangler/src/subcommands/uniq/uniq.mjs @@ -7,12 +7,12 @@ import settings from "../../settings.mjs"; import RecordUniqManager from '../../lib/record/RecordUniqManager.mjs'; export default async function() { - if(typeof settings.cli.source !== "string") + if(typeof settings.source !== "string") throw new Error(`Error: No source directory specified (see the --source CLI argument)`); - if(!fs.existsSync(settings.cli.source)) - throw new Error(`Error: The source directory at '${settings.cli.source}' doesn't exist or you haven't got permission to access it.`); + if(!fs.existsSync(settings.source)) + throw new Error(`Error: The source directory at '${settings.source}' doesn't exist or you haven't got permission to access it.`); - const uniq_manager = new RecordUniqManager(settings.cli.count_file); - await uniq_manager.deduplicate(settings.cli.source, settings.cli.target); + const uniq_manager = new RecordUniqManager(settings.count_file); + await uniq_manager.deduplicate(settings.source, settings.target); } \ No newline at end of file