mirror of
https://github.com/sbrl/research-rainfallradar
synced 2024-11-21 17:03:00 +00:00
uniq: bugfix a lot, but it's not working right just yet
There's still a bug in the file line deletor
This commit is contained in:
parent
3b2715c6cd
commit
a966cdff35
7 changed files with 44 additions and 27 deletions
|
@ -42,7 +42,7 @@ function write_safe(stream_out, data) {
|
|||
*/
|
||||
function end_safe(stream, chunk = undefined) {
|
||||
return new Promise((resolve, _reject) => {
|
||||
stream.once("finish", () => { console.log(`end_safe DEBUG finish`); resolve(); });
|
||||
stream.once("finish", resolve);
|
||||
if(typeof chunk == "undefined") stream.end();
|
||||
else stream.end(chunk);
|
||||
});
|
||||
|
|
|
@ -38,10 +38,13 @@ class RecordUniqManager {
|
|||
}
|
||||
|
||||
async deduplicate(dirpath_source, dirpath_target) {
|
||||
await this.init();
|
||||
const time_start = new Date();
|
||||
this.hashes.clear();
|
||||
|
||||
const files = (await fs.promises.readdir(dirpath_source)).map(filename => path.join(dirpath_source, filename));
|
||||
const files = (await fs.promises.readdir(dirpath_source))
|
||||
.filter(filename => filename.endsWith(".jsonl.gz"))
|
||||
.map(filename => path.join(dirpath_source, filename));
|
||||
|
||||
l.log(`STEP [1 / 5]: Hashing files`);
|
||||
await p_map(files, this.#do_single_hash.bind(this), { concurrency: this.worker_count + 10 });
|
||||
|
@ -54,15 +57,17 @@ class RecordUniqManager {
|
|||
|
||||
l.log(`STEP [ 3 / 5 ]: Assemble deletion lists`);
|
||||
const deletion_lists = this.assemble_deletion_lists(dupes);
|
||||
l.log(`STEP [ 3 / 5 ]: ${deletion_lists.values().reduce((acc, next) => next.length + acc, 0)} duplicates to be deleted.`);
|
||||
l.log(`STEP [ 3 / 5 ]: ${[...deletion_lists.values()].reduce((acc, next) => next.length + acc, 0)} duplicates to be deleted.`);
|
||||
|
||||
l.log(`STEP [ 4 / 5 ]: Delete duplicates`);
|
||||
l.error(`DEBUG There's a bug here where we somehow pass in the deletion list multiple times?????`);
|
||||
await p_map(
|
||||
deletion_lists.entries(),
|
||||
async (args) => await this.#do_single_delete(...args),
|
||||
{ concurrency: this.worker_count + 10 }
|
||||
);
|
||||
l.log(`STEP [ 4 / 5 ]: Duplicates deleted.`);
|
||||
throw new Error("Error: NOTE: We need to fix the bug with the duplicate deletion before we can do anything else.");
|
||||
|
||||
l.log(`STEP [ 5 / 5 ]: Recompress files`);
|
||||
const { recompress_lines, recompress_files } = await records_recompress(
|
||||
|
@ -88,7 +93,7 @@ class RecordUniqManager {
|
|||
if(hash === hash_inner) dupes_group.push( { id: id_inner, hash: hash_inner });
|
||||
}
|
||||
if(dupes_group.length > 1) {
|
||||
result.push(result);
|
||||
result.push(dupes_group);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -101,7 +106,7 @@ class RecordUniqManager {
|
|||
for(const dupe of dupe_group.slice(1)) { // Keep the first one
|
||||
const [ filename, i ] = dupe.id.split(`|`, 2);
|
||||
if(!result.has(filename)) result.set(filename, []);
|
||||
result.get(filename).push(i);
|
||||
result.get(filename).push(parseInt(i, 10));
|
||||
}
|
||||
}
|
||||
return result;
|
||||
|
@ -123,10 +128,10 @@ class RecordUniqManager {
|
|||
}
|
||||
}
|
||||
|
||||
async #do_single_delete(filename, deletion_list) {
|
||||
const result = await p_reflect(this.proxy.delete_duplicates(filename, deletion_list));
|
||||
async #do_single_delete(filename_source, deletion_list) {
|
||||
const result = await p_reflect(this.proxy.delete_duplicates(filename_source, deletion_list));
|
||||
if(result.isRejected) {
|
||||
l.warn(`Got error from worker when deleting ${deletion_list.length} entries from ${filename}:`, result.reason);
|
||||
l.warn(`Got error from worker when deleting ${deletion_list.length} entries from ${filename_source}:`, result.reason);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,24 +1,33 @@
|
|||
"use strict";
|
||||
|
||||
import crypto from 'crypto';
|
||||
import fs from 'fs';
|
||||
|
||||
import records_read from "../records_read.mjs";
|
||||
import RecordsWriter from '../RecordsWriter.mjs';
|
||||
|
||||
import log from '../io/NamespacedLog.mjs'; const l = log("recorduniq:worker");
|
||||
import log from '../../io/NamespacedLog.mjs'; const l = log("recorduniq:worker");
|
||||
|
||||
// This could be muxed together rather than use a worker like this in the main thread since it's I/O bound
|
||||
export default async function(filepath, lines) {
|
||||
const result = [];
|
||||
export default async function(filepath_source, lines) {
|
||||
|
||||
let i = -1, writer = new RecordsWriter(filepath);
|
||||
for await(const line of records_read(filename)) {
|
||||
l.info(`DEBUG lines slated for deletion`, lines);
|
||||
const filepath_tmp = `${filepath_source}.dupedeleteTMP`;
|
||||
let i = -1, count_deleted = 0, writer = new RecordsWriter(filepath_tmp);
|
||||
for await(const line of records_read(filepath_source)) {
|
||||
i++;
|
||||
if(line === "" || lines.includes(i)) continue;
|
||||
if(line === "") continue;
|
||||
if(lines.includes(i)) {
|
||||
count_deleted++;
|
||||
continue;
|
||||
}
|
||||
|
||||
await writer.write_raw(line);
|
||||
}
|
||||
await writer.close();
|
||||
|
||||
return result;
|
||||
await fs.promises.rename(filepath_tmp, filepath_source);
|
||||
|
||||
l.log(`Deleted`, count_deleted, `lines out of`, lines.length, `slated`);
|
||||
|
||||
return count_deleted;
|
||||
}
|
||||
|
|
|
@ -4,13 +4,13 @@ import crypto from 'crypto';
|
|||
|
||||
import records_read from "../records_read.mjs";
|
||||
|
||||
import log from '../io/NamespacedLog.mjs'; const l = log("recorduniq:worker");
|
||||
import log from '../../io/NamespacedLog.mjs'; const l = log("recorduniq:worker");
|
||||
|
||||
export default async function(filename) {
|
||||
export default async function(filepath) {
|
||||
const result = [];
|
||||
|
||||
let i = -1;
|
||||
for await(const line of records_read(filename)) {
|
||||
for await(const line of records_read(filepath)) {
|
||||
i++;
|
||||
if(line === "") continue;
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
"use strict";
|
||||
|
||||
import fs from 'fs';
|
||||
import { Readable } from 'stream';
|
||||
|
||||
import nexline from 'nexline';
|
||||
import gunzip from 'gunzip-maybe';
|
||||
|
@ -12,7 +13,7 @@ import gunzip from 'gunzip-maybe';
|
|||
*/
|
||||
function records_read(filename) {
|
||||
return nexline({
|
||||
input: fs.createReadStream(filename).pipe(gunzip())
|
||||
input: new Readable().wrap(fs.createReadStream(filename).pipe(gunzip()))
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
@ -2,9 +2,11 @@
|
|||
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
import { Readable } from 'stream';
|
||||
|
||||
import nexline from 'nexline';
|
||||
import pretty_ms from 'pretty-ms';
|
||||
import gunzip from 'gunzip-maybe';
|
||||
|
||||
import RecordsWriter from './RecordsWriter.mjs';
|
||||
|
||||
|
@ -12,7 +14,7 @@ async function records_recompress(dirpath_source, dirpath_target, items_per_file
|
|||
const files = (await fs.promises.readdir(dirpath_source)).map(filename => path.join(dirpath_source, filename));
|
||||
|
||||
const reader = nexline({
|
||||
input: files.map(filepath => fs.createReadStream(filepath).pipe(gunzip()))
|
||||
input: files.map(filepath => new Readable().wrap(fs.createReadStream(filepath).pipe(gunzip())))
|
||||
});
|
||||
|
||||
|
||||
|
@ -37,7 +39,7 @@ async function records_recompress(dirpath_source, dirpath_target, items_per_file
|
|||
|
||||
if(new Date() - time_display > 2000) {
|
||||
const elapsed = new Date() - time_start;
|
||||
process.stdout.write(`${pretty_ms(elapsed, { keepDecimalsOnWholeSeconds: true })} elapsed | ${i_file}/${i_this_file}/${i} files/this file/total | ${(1000 / (elapsed / i)).toFixed(2)} lines/sec | ${this.items_per_file - i_this_file} left for this file\r`)
|
||||
process.stdout.write(`${pretty_ms(elapsed, { keepDecimalsOnWholeSeconds: true })} elapsed | ${i_file}/${i_this_file}/${i} files/this file/total | ${(1000 / (elapsed / i)).toFixed(2)} lines/sec | ${items_per_file - i_this_file} left for this file\r`)
|
||||
}
|
||||
}
|
||||
await writer.close();
|
||||
|
|
|
@ -7,12 +7,12 @@ import settings from "../../settings.mjs";
|
|||
import RecordUniqManager from '../../lib/record/RecordUniqManager.mjs';
|
||||
|
||||
export default async function() {
|
||||
if(typeof settings.cli.source !== "string")
|
||||
if(typeof settings.source !== "string")
|
||||
throw new Error(`Error: No source directory specified (see the --source CLI argument)`);
|
||||
if(!fs.existsSync(settings.cli.source))
|
||||
throw new Error(`Error: The source directory at '${settings.cli.source}' doesn't exist or you haven't got permission to access it.`);
|
||||
if(!fs.existsSync(settings.source))
|
||||
throw new Error(`Error: The source directory at '${settings.source}' doesn't exist or you haven't got permission to access it.`);
|
||||
|
||||
|
||||
const uniq_manager = new RecordUniqManager(settings.cli.count_file);
|
||||
await uniq_manager.deduplicate(settings.cli.source, settings.cli.target);
|
||||
const uniq_manager = new RecordUniqManager(settings.count_file);
|
||||
await uniq_manager.deduplicate(settings.source, settings.target);
|
||||
}
|
Loading…
Reference in a new issue