Bugfix: don't make 1 group for each duplicate....

This commit is contained in:
Starbeamrainbowlabs 2022-07-22 17:06:02 +01:00
parent a966cdff35
commit ce303814d6
Signed by: sbrl
GPG key ID: 1BE5172E637709C2
3 changed files with 15 additions and 10 deletions

View file

@ -41,6 +41,7 @@ class RecordUniqManager {
await this.init(); await this.init();
const time_start = new Date(); const time_start = new Date();
this.hashes.clear(); this.hashes.clear();
this.items_deleted = 0;
const files = (await fs.promises.readdir(dirpath_source)) const files = (await fs.promises.readdir(dirpath_source))
.filter(filename => filename.endsWith(".jsonl.gz")) .filter(filename => filename.endsWith(".jsonl.gz"))
@ -57,17 +58,16 @@ class RecordUniqManager {
l.log(`STEP [ 3 / 5 ]: Assemble deletion lists`); l.log(`STEP [ 3 / 5 ]: Assemble deletion lists`);
const deletion_lists = this.assemble_deletion_lists(dupes); const deletion_lists = this.assemble_deletion_lists(dupes);
console.log(deletion_lists);
l.log(`STEP [ 3 / 5 ]: ${[...deletion_lists.values()].reduce((acc, next) => next.length + acc, 0)} duplicates to be deleted.`); l.log(`STEP [ 3 / 5 ]: ${[...deletion_lists.values()].reduce((acc, next) => next.length + acc, 0)} duplicates to be deleted.`);
l.log(`STEP [ 4 / 5 ]: Delete duplicates`); l.log(`STEP [ 4 / 5 ]: Delete duplicates`);
l.error(`DEBUG There's a bug here where we somehow pass in the deletion list multiple times?????`);
await p_map( await p_map(
deletion_lists.entries(), deletion_lists.entries(),
async (args) => await this.#do_single_delete(...args), async (args) => await this.#do_single_delete(...args),
{ concurrency: this.worker_count + 10 } { concurrency: this.worker_count + 10 }
); );
l.log(`STEP [ 4 / 5 ]: Duplicates deleted.`); l.log(`STEP [ 4 / 5 ]: ${this.items_deleted} duplicates deleted.`);
throw new Error("Error: NOTE: We need to fix the bug with the duplicate deletion before we can do anything else.");
l.log(`STEP [ 5 / 5 ]: Recompress files`); l.log(`STEP [ 5 / 5 ]: Recompress files`);
const { recompress_lines, recompress_files } = await records_recompress( const { recompress_lines, recompress_files } = await records_recompress(
@ -86,12 +86,15 @@ class RecordUniqManager {
find_duplicates() { find_duplicates() {
const result = []; const result = [];
const hashes_seen = [];
for(const [ id, hash ] of this.hashes.entries()) { for(const [ id, hash ] of this.hashes.entries()) {
if(hashes_seen.includes(hash)) continue;
const dupes_group = [ { id, hash } ]; const dupes_group = [ { id, hash } ];
for(const [ id_inner, hash_inner ] of this.hashes.entries()) { for(const [ id_inner, hash_inner ] of this.hashes.entries()) {
if(id === id_inner) continue; if(id === id_inner) continue;
if(hash === hash_inner) dupes_group.push( { id: id_inner, hash: hash_inner }); if(hash === hash_inner) dupes_group.push( { id: id_inner, hash: hash_inner });
} }
hashes_seen.push(hash);
if(dupes_group.length > 1) { if(dupes_group.length > 1) {
result.push(dupes_group); result.push(dupes_group);
} }
@ -116,7 +119,7 @@ class RecordUniqManager {
if(filepath.includes("|")) throw new Error(`Filepath contains bar character: ${filepath}`); if(filepath.includes("|")) throw new Error(`Filepath contains bar character: ${filepath}`);
const filename = path.basename(filepath); const filename = path.basename(filepath);
l.log(`Hashing ${path.basename(filepath)}`); // l.log(`Hashing ${path.basename(filepath)}`);
const result = await p_reflect(this.proxy.hash_targets(filepath)); const result = await p_reflect(this.proxy.hash_targets(filepath));
if(result.isRejected) { if(result.isRejected) {
l.warn(`Got error from worker when hashing ${filename}:`, result.reason); l.warn(`Got error from worker when hashing ${filename}:`, result.reason);
@ -132,8 +135,10 @@ class RecordUniqManager {
const result = await p_reflect(this.proxy.delete_duplicates(filename_source, deletion_list)); const result = await p_reflect(this.proxy.delete_duplicates(filename_source, deletion_list));
if(result.isRejected) { if(result.isRejected) {
l.warn(`Got error from worker when deleting ${deletion_list.length} entries from ${filename_source}:`, result.reason); l.warn(`Got error from worker when deleting ${deletion_list.length} entries from ${filename_source}:`, result.reason);
return; return null;
} }
this.items_deleted += result.value;
} }
} }

View file

@ -5,12 +5,10 @@ import fs from 'fs';
import records_read from "../records_read.mjs"; import records_read from "../records_read.mjs";
import RecordsWriter from '../RecordsWriter.mjs'; import RecordsWriter from '../RecordsWriter.mjs';
import log from '../../io/NamespacedLog.mjs'; const l = log("recorduniq:worker"); import log from '../../io/NamespacedLog.mjs'; const l = log("recorduniq:worker:delete");
// This could be muxed together rather than use a worker like this in the main thread since it's I/O bound // This could be muxed together rather than use a worker like this in the main thread since it's I/O bound
export default async function(filepath_source, lines) { export default async function(filepath_source, lines) {
l.info(`DEBUG lines slated for deletion`, lines);
const filepath_tmp = `${filepath_source}.dupedeleteTMP`; const filepath_tmp = `${filepath_source}.dupedeleteTMP`;
let i = -1, count_deleted = 0, writer = new RecordsWriter(filepath_tmp); let i = -1, count_deleted = 0, writer = new RecordsWriter(filepath_tmp);
for await(const line of records_read(filepath_source)) { for await(const line of records_read(filepath_source)) {
@ -27,7 +25,7 @@ export default async function(filepath_source, lines) {
await fs.promises.rename(filepath_tmp, filepath_source); await fs.promises.rename(filepath_tmp, filepath_source);
l.log(`Deleted`, count_deleted, `lines out of`, lines.length, `slated`); l.log(`${filepath_source}: deleted`, count_deleted, `lines out of`, lines.length, `slated`);
return count_deleted; return count_deleted;
} }

View file

@ -4,7 +4,7 @@ import crypto from 'crypto';
import records_read from "../records_read.mjs"; import records_read from "../records_read.mjs";
import log from '../../io/NamespacedLog.mjs'; const l = log("recorduniq:worker"); import log from '../../io/NamespacedLog.mjs'; const l = log("recorduniq:worker:hash");
export default async function(filepath) { export default async function(filepath) {
const result = []; const result = [];
@ -18,5 +18,7 @@ export default async function(filepath) {
result.push({ i, hash: crypto.createHash("sha256").update(line, "binary").digest("base64") }); result.push({ i, hash: crypto.createHash("sha256").update(line, "binary").digest("base64") });
} }
l.log(`${filepath}: Hashed ${i+1} lines`);
return result; return result;
} }