From ce303814d683c5385ea419790fadd73af10f5a62 Mon Sep 17 00:00:00 2001
From: Starbeamrainbowlabs
Date: Fri, 22 Jul 2022 17:06:02 +0100
Subject: [PATCH] Bugfix: don't make 1 group for each duplicate....

Previously find_duplicates() emitted one group for every record that had a
duplicate rather than one group per unique hash, so the same deletion list
was handed to the delete workers multiple times. Track the hashes already
grouped, count the deletions the workers report back, and drop the DEBUG
logging and the bail-out throw now that the bug is fixed.
---
 .../src/lib/record/RecordUniqManager.mjs       | 15 ++++++++++-----
 .../record_uniq_worker/delete_duplicates.mjs   |  6 ++----
 .../record/record_uniq_worker/hash_targets.mjs |  4 +++-
 3 files changed, 15 insertions(+), 10 deletions(-)

diff --git a/rainfallwrangler/src/lib/record/RecordUniqManager.mjs b/rainfallwrangler/src/lib/record/RecordUniqManager.mjs
index 2db611e..0a5cce0 100644
--- a/rainfallwrangler/src/lib/record/RecordUniqManager.mjs
+++ b/rainfallwrangler/src/lib/record/RecordUniqManager.mjs
@@ -41,6 +41,7 @@ class RecordUniqManager {
 		await this.init();
 		const time_start = new Date();
 		this.hashes.clear();
+		this.items_deleted = 0;
 		
 		const files = (await fs.promises.readdir(dirpath_source))
 			.filter(filename => filename.endsWith(".jsonl.gz"))
@@ -57,17 +58,16 @@
 		
 		l.log(`STEP [ 3 / 5 ]: Assemble deletion lists`);
 		const deletion_lists = this.assemble_deletion_lists(dupes);
+		console.log(deletion_lists);
 		l.log(`STEP [ 3 / 5 ]: ${[...deletion_lists.values()].reduce((acc, next) => next.length + acc, 0)} duplicates to be deleted.`);
 		
 		l.log(`STEP [ 4 / 5 ]: Delete duplicates`);
-		l.error(`DEBUG There's a bug here where we somehow pass in the deletion list multiple times?????`);
 		await p_map(
 			deletion_lists.entries(),
 			async (args) => await this.#do_single_delete(...args),
 			{ concurrency: this.worker_count + 10 }
 		);
-		l.log(`STEP [ 4 / 5 ]: Duplicates deleted.`);
-		throw new Error("Error: NOTE: We need to fix the bug with the duplicate deletion before we can do anything else.");
+		l.log(`STEP [ 4 / 5 ]: ${this.items_deleted} duplicates deleted.`);
 		
 		l.log(`STEP [ 5 / 5 ]: Recompress files`);
 		const { recompress_lines, recompress_files } = await records_recompress(
@@ -86,12 +86,15 @@
 	
 	find_duplicates() {
 		const result = [];
+		const hashes_seen = [];
 		for(const [ id, hash ] of this.hashes.entries()) {
+			if(hashes_seen.includes(hash)) continue;
 			const dupes_group = [ { id, hash } ];
 			for(const [ id_inner, hash_inner ] of this.hashes.entries()) {
 				if(id === id_inner) continue;
 				if(hash === hash_inner) dupes_group.push( { id: id_inner, hash: hash_inner });
 			}
+			hashes_seen.push(hash);
 			if(dupes_group.length > 1) {
 				result.push(dupes_group);
 			}
@@ -116,7 +119,7 @@
 		if(filepath.includes("|")) throw new Error(`Filepath contains bar character: ${filepath}`);
 		
 		const filename = path.basename(filepath);
-		l.log(`Hashing ${path.basename(filepath)}`);
+		// l.log(`Hashing ${path.basename(filepath)}`);
 		const result = await p_reflect(this.proxy.hash_targets(filepath));
 		if(result.isRejected) {
 			l.warn(`Got error from worker when hashing ${filename}:`, result.reason);
@@ -132,8 +135,10 @@
 		const result = await p_reflect(this.proxy.delete_duplicates(filename_source, deletion_list));
 		if(result.isRejected) {
 			l.warn(`Got error from worker when deleting ${deletion_list.length} entries from ${filename_source}:`, result.reason);
-			return;
+			return null;
 		}
+		
+		this.items_deleted += result.value;
 	}
 }
diff --git a/rainfallwrangler/src/lib/record/record_uniq_worker/delete_duplicates.mjs b/rainfallwrangler/src/lib/record/record_uniq_worker/delete_duplicates.mjs
index 7007d12..4daf498 100644
--- a/rainfallwrangler/src/lib/record/record_uniq_worker/delete_duplicates.mjs
+++ b/rainfallwrangler/src/lib/record/record_uniq_worker/delete_duplicates.mjs
@@ -5,12 +5,10 @@ import fs from 'fs';
 import records_read from "../records_read.mjs";
 import RecordsWriter from '../RecordsWriter.mjs';
 
-import log from '../../io/NamespacedLog.mjs'; const l = log("recorduniq:worker");
+import log from '../../io/NamespacedLog.mjs'; const l = log("recorduniq:worker:delete");
 
 // This could be muxed together rather than use a worker like this in the main thread since it's I/O bound
 export default async function(filepath_source, lines) {
-	
-	l.info(`DEBUG lines slated for deletion`, lines);
 	const filepath_tmp = `${filepath_source}.dupedeleteTMP`;
 	let i = -1, count_deleted = 0, writer = new RecordsWriter(filepath_tmp);
 	for await(const line of records_read(filepath_source)) {
@@ -27,7 +25,7 @@ export default async function(filepath_source, lines) {
 	
 	await fs.promises.rename(filepath_tmp, filepath_source);
 	
-	l.log(`Deleted`, count_deleted, `lines out of`, lines.length, `slated`);
+	l.log(`${filepath_source}: deleted`, count_deleted, `lines out of`, lines.length, `slated`);
 	
 	return count_deleted;
 }
diff --git a/rainfallwrangler/src/lib/record/record_uniq_worker/hash_targets.mjs b/rainfallwrangler/src/lib/record/record_uniq_worker/hash_targets.mjs
index ce089da..a00db4a 100644
--- a/rainfallwrangler/src/lib/record/record_uniq_worker/hash_targets.mjs
+++ b/rainfallwrangler/src/lib/record/record_uniq_worker/hash_targets.mjs
@@ -4,7 +4,7 @@ import crypto from 'crypto';
 
 import records_read from "../records_read.mjs";
 
-import log from '../../io/NamespacedLog.mjs'; const l = log("recorduniq:worker");
+import log from '../../io/NamespacedLog.mjs'; const l = log("recorduniq:worker:hash");
 
 export default async function(filepath) {
 	const result = [];
@@ -18,5 +18,7 @@ export default async function(filepath) {
 		result.push({ i, hash: crypto.createHash("sha256").update(line, "binary").digest("base64") });
 	}
 	
+	l.log(`${filepath}: Hashed ${i+1} lines`);
+	
 	return result;
 }
\ No newline at end of file
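
Note on find_duplicates(): the fix keeps the O(n²) pairwise scan over
this.hashes and guards it with a hashes_seen list, which itself costs a
linear includes() per record. A single pass that groups ids by hash gives
the same one-group-per-unique-hash behaviour. A minimal sketch, not part
of the patch above (function name hypothetical):

function find_duplicates_grouped(hashes) {
	// hashes: a Map of id → hash, like this.hashes in RecordUniqManager
	const groups = new Map(); // hash → [ { id, hash }, ... ]
	for(const [ id, hash ] of hashes.entries()) {
		if(!groups.has(hash)) groups.set(hash, []);
		groups.get(hash).push({ id, hash });
	}
	// Only hashes seen on more than 1 record form a duplicate group
	return [...groups.values()].filter(group => group.length > 1);
}

This drops the repeated inner this.hashes.entries() loop, which matters as
the number of hashed records grows.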
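
The manager now also depends on delete_duplicates() resolving to
count_deleted, which #do_single_delete() sums into this.items_deleted for
the STEP 4 log line. Summing from concurrent p_map callbacks is safe here
because the increments all run on the main thread's event loop; only the
file I/O happens in the workers. For reference, a rough stand-in for the
worker's delete-then-rename pattern, assuming plain-text lines rather than
the gzipped JSONL that records_read() / RecordsWriter actually stream
(function name hypothetical):

import fs from 'fs';
import readline from 'readline';

async function delete_lines(filepath, line_indexes) {
	const skip = new Set(line_indexes);
	const filepath_tmp = `${filepath}.dupedeleteTMP`;
	const writer = fs.createWriteStream(filepath_tmp);
	let i = -1, count_deleted = 0;
	for await(const line of readline.createInterface({ input: fs.createReadStream(filepath) })) {
		i++;
		if(skip.has(i)) { count_deleted++; continue; } // slated for deletion
		writer.write(`${line}\n`);
	}
	await new Promise(resolve => writer.end(resolve));
	// Swap the completed copy over the original in a single rename
	await fs.promises.rename(filepath_tmp, filepath);
	return count_deleted;
}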