diff --git a/rainfallwrangler/src/index.mjs b/rainfallwrangler/src/index.mjs index 832b3ce..51e0e6c 100755 --- a/rainfallwrangler/src/index.mjs +++ b/rainfallwrangler/src/index.mjs @@ -1,7 +1,6 @@ #!/usr/bin/env node "use strict"; - import cli from './cli.mjs'; diff --git a/rainfallwrangler/src/lib/record/RecordUniqManager.mjs b/rainfallwrangler/src/lib/record/RecordUniqManager.mjs index bc40ff9..60f76f0 100644 --- a/rainfallwrangler/src/lib/record/RecordUniqManager.mjs +++ b/rainfallwrangler/src/lib/record/RecordUniqManager.mjs @@ -47,20 +47,22 @@ class RecordUniqManager { .filter(filename => filename.endsWith(".jsonl.gz")) .map(filename => path.join(dirpath_source, filename)); - l.log(`STEP [1 / 5]: Hashing files`); + l.log(`STEP [1 / 5]: Hashing ${files.length} files`); + const time_start_hash = new Date(); await p_map(files, this.#do_single_hash.bind(this), { concurrency: this.worker_count + 10 }); - l.log(`STEP [1 / 5]: ${this.hashes.size} hashes gathered in total.`); + const time_per_hash = (new Date() - time_start_hash) / this.hashes.size; + l.log(`STEP [1 / 5]: ${this.hashes.size} hashes gathered in total, averaging ${(1000/time_per_hash).toFixed(2)} hashes/sec.`); - l.log(`STEP [ 2 / 5 ]: Identify duplicates`); + // l.log(`STEP [ 2 / 5 ]: Identify duplicates`); const dupes = this.find_duplicates(); this.hashes.clear(); // Save memory l.log(`STEP [ 2 / 5 ]: ${dupes.length} duplicate groups identified`); - l.log(`STEP [ 3 / 5 ]: Assemble deletion lists`); + // l.log(`STEP [ 3 / 5 ]: Assemble deletion lists`); const deletion_lists = this.assemble_deletion_lists(dupes); l.log(`STEP [ 3 / 5 ]: ${[...deletion_lists.values()].reduce((acc, next) => next.length + acc, 0)} duplicates to be deleted.`); - l.log(`STEP [ 4 / 5 ]: Delete duplicates`); + // l.log(`STEP [ 4 / 5 ]: Delete duplicates`); await p_map( deletion_lists.entries(), async (args) => await this.#do_single_delete(...args), @@ -139,6 +141,10 @@ class RecordUniqManager { this.items_deleted += result.value; } + + close() { + this.pool.terminate(); + } } export default RecordUniqManager; diff --git a/rainfallwrangler/src/lib/record/record_uniq_worker/hash_targets.mjs b/rainfallwrangler/src/lib/record/record_uniq_worker/hash_targets.mjs index 5f0fdf4..1d6f598 100644 --- a/rainfallwrangler/src/lib/record/record_uniq_worker/hash_targets.mjs +++ b/rainfallwrangler/src/lib/record/record_uniq_worker/hash_targets.mjs @@ -8,7 +8,6 @@ import log from '../../io/NamespacedLog.mjs'; const l = log("recorduniq:worker:h export default async function(filepath) { const result = []; - let i = -1; for await(const line of records_read(filepath)) { i++; diff --git a/rainfallwrangler/src/lib/record/records_recompress.mjs b/rainfallwrangler/src/lib/record/records_recompress.mjs index b9621f4..2a9f5e4 100644 --- a/rainfallwrangler/src/lib/record/records_recompress.mjs +++ b/rainfallwrangler/src/lib/record/records_recompress.mjs @@ -37,9 +37,10 @@ async function records_recompress(dirpath_source, dirpath_target, items_per_file i++; i_this_file++; - if(new Date() - time_display > 2000) { + if(new Date() - time_display > 500) { const elapsed = new Date() - time_start; - process.stdout.write(`${pretty_ms(elapsed, { keepDecimalsOnWholeSeconds: true })} elapsed | ${i_file}/${i_this_file}/${i} files/this file/total | ${(1000 / (elapsed / i)).toFixed(2)} lines/sec | ${items_per_file - i_this_file} left for this file\r`) + process.stdout.write(`${pretty_ms(elapsed, { keepDecimalsOnWholeSeconds: true })} elapsed | ${i_file}/${i_this_file}/${i} files/thisfile/total | ${(1000 / (elapsed / i)).toFixed(2)} lines/sec | ${items_per_file - i_this_file} left for this file \r`); + time_display = new Date(); } } await writer.close(); diff --git a/rainfallwrangler/src/subcommands/uniq/uniq.mjs b/rainfallwrangler/src/subcommands/uniq/uniq.mjs index 53031a2..27cbac7 100644 --- a/rainfallwrangler/src/subcommands/uniq/uniq.mjs +++ b/rainfallwrangler/src/subcommands/uniq/uniq.mjs @@ -15,4 +15,5 @@ export default async function() { const uniq_manager = new RecordUniqManager(settings.count_file); await uniq_manager.deduplicate(settings.source, settings.target); + uniq_manager.close(); // Terminate the workerpool nowt hat we're done } \ No newline at end of file