Fix bugs in remainder of rainfallwrangler:uniq :D

This commit is contained in:
Starbeamrainbowlabs 2022-07-22 18:05:03 +01:00
parent 31bd7899b6
commit 82e826fd69
Signed by: sbrl
GPG key ID: 1BE5172E637709C2
5 changed files with 15 additions and 9 deletions

View file

@ -1,7 +1,6 @@
#!/usr/bin/env node
"use strict";
import cli from './cli.mjs';

View file

@ -47,20 +47,22 @@ class RecordUniqManager {
.filter(filename => filename.endsWith(".jsonl.gz"))
.map(filename => path.join(dirpath_source, filename));
l.log(`STEP [1 / 5]: Hashing files`);
l.log(`STEP [1 / 5]: Hashing ${files.length} files`);
const time_start_hash = new Date();
await p_map(files, this.#do_single_hash.bind(this), { concurrency: this.worker_count + 10 });
l.log(`STEP [1 / 5]: ${this.hashes.size} hashes gathered in total.`);
const time_per_hash = (new Date() - time_start_hash) / this.hashes.size;
l.log(`STEP [1 / 5]: ${this.hashes.size} hashes gathered in total, averaging ${(1000/time_per_hash).toFixed(2)} hashes/sec.`);
l.log(`STEP [ 2 / 5 ]: Identify duplicates`);
// l.log(`STEP [ 2 / 5 ]: Identify duplicates`);
const dupes = this.find_duplicates();
this.hashes.clear(); // Save memory
l.log(`STEP [ 2 / 5 ]: ${dupes.length} duplicate groups identified`);
l.log(`STEP [ 3 / 5 ]: Assemble deletion lists`);
// l.log(`STEP [ 3 / 5 ]: Assemble deletion lists`);
const deletion_lists = this.assemble_deletion_lists(dupes);
l.log(`STEP [ 3 / 5 ]: ${[...deletion_lists.values()].reduce((acc, next) => next.length + acc, 0)} duplicates to be deleted.`);
l.log(`STEP [ 4 / 5 ]: Delete duplicates`);
// l.log(`STEP [ 4 / 5 ]: Delete duplicates`);
await p_map(
deletion_lists.entries(),
async (args) => await this.#do_single_delete(...args),
@ -139,6 +141,10 @@ class RecordUniqManager {
this.items_deleted += result.value;
}
close() {
this.pool.terminate();
}
}
export default RecordUniqManager;

View file

@ -8,7 +8,6 @@ import log from '../../io/NamespacedLog.mjs'; const l = log("recorduniq:worker:h
export default async function(filepath) {
const result = [];
let i = -1;
for await(const line of records_read(filepath)) {
i++;

View file

@ -37,9 +37,10 @@ async function records_recompress(dirpath_source, dirpath_target, items_per_file
i++;
i_this_file++;
if(new Date() - time_display > 2000) {
if(new Date() - time_display > 500) {
const elapsed = new Date() - time_start;
process.stdout.write(`${pretty_ms(elapsed, { keepDecimalsOnWholeSeconds: true })} elapsed | ${i_file}/${i_this_file}/${i} files/this file/total | ${(1000 / (elapsed / i)).toFixed(2)} lines/sec | ${items_per_file - i_this_file} left for this file\r`)
process.stdout.write(`${pretty_ms(elapsed, { keepDecimalsOnWholeSeconds: true })} elapsed | ${i_file}/${i_this_file}/${i} files/thisfile/total | ${(1000 / (elapsed / i)).toFixed(2)} lines/sec | ${items_per_file - i_this_file} left for this file \r`);
time_display = new Date();
}
}
await writer.close();

View file

@ -15,4 +15,5 @@ export default async function() {
const uniq_manager = new RecordUniqManager(settings.count_file);
await uniq_manager.deduplicate(settings.source, settings.target);
uniq_manager.close(); // Terminate the workerpool now that we're done
}