uniq: bugfix a lot, but it's not working right just yet

There's still a bug in the file line deletor
This commit is contained in:
Starbeamrainbowlabs 2022-07-08 19:54:24 +01:00
parent 3b2715c6cd
commit 38a0bd0942
Signed by: sbrl
GPG key ID: 1BE5172E637709C2
7 changed files with 44 additions and 27 deletions

View file

@ -42,7 +42,7 @@ function write_safe(stream_out, data) {
*/
function end_safe(stream, chunk = undefined) {
return new Promise((resolve, _reject) => {
stream.once("finish", () => { console.log(`end_safe DEBUG finish`); resolve(); });
stream.once("finish", resolve);
if(typeof chunk == "undefined") stream.end();
else stream.end(chunk);
});

View file

@ -38,10 +38,13 @@ class RecordUniqManager {
}
async deduplicate(dirpath_source, dirpath_target) {
await this.init();
const time_start = new Date();
this.hashes.clear();
const files = (await fs.promises.readdir(dirpath_source)).map(filename => path.join(dirpath_source, filename));
const files = (await fs.promises.readdir(dirpath_source))
.filter(filename => filename.endsWith(".jsonl.gz"))
.map(filename => path.join(dirpath_source, filename));
l.log(`STEP [1 / 5]: Hashing files`);
await p_map(files, this.#do_single_hash.bind(this), { concurrency: this.worker_count + 10 });
@ -54,15 +57,17 @@ class RecordUniqManager {
l.log(`STEP [ 3 / 5 ]: Assemble deletion lists`);
const deletion_lists = this.assemble_deletion_lists(dupes);
l.log(`STEP [ 3 / 5 ]: ${deletion_lists.values().reduce((acc, next) => next.length + acc, 0)} duplicates to be deleted.`);
l.log(`STEP [ 3 / 5 ]: ${[...deletion_lists.values()].reduce((acc, next) => next.length + acc, 0)} duplicates to be deleted.`);
l.log(`STEP [ 4 / 5 ]: Delete duplicates`);
l.error(`DEBUG There's a bug here where we somehow pass in the deletion list multiple times?????`);
await p_map(
deletion_lists.entries(),
async (args) => await this.#do_single_delete(...args),
{ concurrency: this.worker_count + 10 }
);
l.log(`STEP [ 4 / 5 ]: Duplicates deleted.`);
throw new Error("Error: NOTE: We need to fix the bug with the duplicate deletion before we can do anything else.");
l.log(`STEP [ 5 / 5 ]: Recompress files`);
const { recompress_lines, recompress_files } = await records_recompress(
@ -88,7 +93,7 @@ class RecordUniqManager {
if(hash === hash_inner) dupes_group.push( { id: id_inner, hash: hash_inner });
}
if(dupes_group.length > 1) {
result.push(result);
result.push(dupes_group);
}
}
@ -101,7 +106,7 @@ class RecordUniqManager {
for(const dupe of dupe_group.slice(1)) { // Keep the first one
const [ filename, i ] = dupe.id.split(`|`, 2);
if(!result.has(filename)) result.set(filename, []);
result.get(filename).push(i);
result.get(filename).push(parseInt(i, 10));
}
}
return result;
@ -123,10 +128,10 @@ class RecordUniqManager {
}
}
async #do_single_delete(filename, deletion_list) {
const result = await p_reflect(this.proxy.delete_duplicates(filename, deletion_list));
async #do_single_delete(filename_source, deletion_list) {
const result = await p_reflect(this.proxy.delete_duplicates(filename_source, deletion_list));
if(result.isRejected) {
l.warn(`Got error from worker when deleting ${deletion_list.length} entries from ${filename}:`, result.reason);
l.warn(`Got error from worker when deleting ${deletion_list.length} entries from ${filename_source}:`, result.reason);
return;
}
}

View file

@ -1,24 +1,33 @@
"use strict";
import crypto from 'crypto';
import fs from 'fs';
import records_read from "../records_read.mjs";
import RecordsWriter from '../RecordsWriter.mjs';
import log from '../io/NamespacedLog.mjs'; const l = log("recorduniq:worker");
import log from '../../io/NamespacedLog.mjs'; const l = log("recorduniq:worker");
// This could be muxed together rather than use a worker like this in the main thread since it's I/O bound
export default async function(filepath, lines) {
const result = [];
export default async function(filepath_source, lines) {
let i = -1, writer = new RecordsWriter(filepath);
for await(const line of records_read(filename)) {
l.info(`DEBUG lines slated for deletion`, lines);
const filepath_tmp = `${filepath_source}.dupedeleteTMP`;
let i = -1, count_deleted = 0, writer = new RecordsWriter(filepath_tmp);
for await(const line of records_read(filepath_source)) {
i++;
if(line === "" || lines.includes(i)) continue;
if(line === "") continue;
if(lines.includes(i)) {
count_deleted++;
continue;
}
await writer.write_raw(line);
}
await writer.close();
return result;
await fs.promises.rename(filepath_tmp, filepath_source);
l.log(`Deleted`, count_deleted, `lines out of`, lines.length, `slated`);
return count_deleted;
}

View file

@ -4,13 +4,13 @@ import crypto from 'crypto';
import records_read from "../records_read.mjs";
import log from '../io/NamespacedLog.mjs'; const l = log("recorduniq:worker");
import log from '../../io/NamespacedLog.mjs'; const l = log("recorduniq:worker");
export default async function(filename) {
export default async function(filepath) {
const result = [];
let i = -1;
for await(const line of records_read(filename)) {
for await(const line of records_read(filepath)) {
i++;
if(line === "") continue;

View file

@ -1,6 +1,7 @@
"use strict";
import fs from 'fs';
import { Readable } from 'stream';
import nexline from 'nexline';
import gunzip from 'gunzip-maybe';
@ -12,7 +13,7 @@ import gunzip from 'gunzip-maybe';
*/
function records_read(filename) {
return nexline({
input: fs.createReadStream(filename).pipe(gunzip())
input: new Readable().wrap(fs.createReadStream(filename).pipe(gunzip()))
});
}

View file

@ -2,9 +2,11 @@
import fs from 'fs';
import path from 'path';
import { Readable } from 'stream';
import nexline from 'nexline';
import pretty_ms from 'pretty-ms';
import gunzip from 'gunzip-maybe';
import RecordsWriter from './RecordsWriter.mjs';
@ -12,7 +14,7 @@ async function records_recompress(dirpath_source, dirpath_target, items_per_file
const files = (await fs.promises.readdir(dirpath_source)).map(filename => path.join(dirpath_source, filename));
const reader = nexline({
input: files.map(filepath => fs.createReadStream(filepath).pipe(gunzip()))
input: files.map(filepath => new Readable().wrap(fs.createReadStream(filepath).pipe(gunzip())))
});
@ -37,7 +39,7 @@ async function records_recompress(dirpath_source, dirpath_target, items_per_file
if(new Date() - time_display > 2000) {
const elapsed = new Date() - time_start;
process.stdout.write(`${pretty_ms(elapsed, { keepDecimalsOnWholeSeconds: true })} elapsed | ${i_file}/${i_this_file}/${i} files/this file/total | ${(1000 / (elapsed / i)).toFixed(2)} lines/sec | ${this.items_per_file - i_this_file} left for this file\r`)
process.stdout.write(`${pretty_ms(elapsed, { keepDecimalsOnWholeSeconds: true })} elapsed | ${i_file}/${i_this_file}/${i} files/this file/total | ${(1000 / (elapsed / i)).toFixed(2)} lines/sec | ${items_per_file - i_this_file} left for this file\r`)
}
}
await writer.close();

View file

@ -7,12 +7,12 @@ import settings from "../../settings.mjs";
import RecordUniqManager from '../../lib/record/RecordUniqManager.mjs';
export default async function() {
if(typeof settings.cli.source !== "string")
if(typeof settings.source !== "string")
throw new Error(`Error: No source directory specified (see the --source CLI argument)`);
if(!fs.existsSync(settings.cli.source))
throw new Error(`Error: The source directory at '${settings.cli.source}' doesn't exist or you haven't got permission to access it.`);
if(!fs.existsSync(settings.source))
throw new Error(`Error: The source directory at '${settings.source}' doesn't exist or you haven't got permission to access it.`);
const uniq_manager = new RecordUniqManager(settings.cli.count_file);
await uniq_manager.deduplicate(settings.cli.source, settings.cli.target);
const uniq_manager = new RecordUniqManager(settings.count_file);
await uniq_manager.deduplicate(settings.source, settings.target);
}