Implement initial data processor, but it's untested.
This commit is contained in:
parent
d0d303bc4b
commit
70da049d49
5 changed files with 96 additions and 5 deletions
|
@ -28,6 +28,17 @@ class ReadingRepo {
|
|||
this.RSSIRepo.add(...reading.rssis);
|
||||
}
|
||||
|
||||
exists(id) {
|
||||
// Note that normally a count wouldn't be appropriate here just to test for existence, but it's ok because it's guaranteed that there will only ever be a single entry with a given id.
|
||||
let statement = this.db.prepare(`SELECT COUNT(id) AS count
|
||||
FROM readings
|
||||
WHERE id = :id`);
|
||||
|
||||
let count = statement.get({ id }).count;
|
||||
|
||||
return count > 0;
|
||||
}
|
||||
|
||||
iterate() {
|
||||
return this.db.prepare(`SELECT * FROM readings`).iterate();
|
||||
}
|
||||
|
|
|
@ -55,16 +55,32 @@ export default async function(c) {
|
|||
case "decode-test":
|
||||
l.log(`${a.fgreen}${a.hicol}Decoding message${a.reset}`);
|
||||
if(process.argv.length < 4) {
|
||||
l.error("No message specified. Specify the message to decode in single quotes directly after 'decode-test'.");
|
||||
l.error("Error: No message specified. Specify the message to decode in single quotes directly after 'decode-test'.");
|
||||
process.exit(1);
|
||||
}
|
||||
l.log("Input: ", process.argv[3]);
|
||||
l.log("Output: ", decode_payload(process.argv[3]));
|
||||
break;
|
||||
|
||||
case "process-data":
|
||||
l.log(`${a.fgreen}${a.hicol}Processing microSD card data${a.reset}`);
|
||||
if(process.argv.length < 4) {
|
||||
l.error("Error: No filename specified. Do so by specifying a filename after the 'process-data' subcommand.");
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
let filepath = process.argv[3];
|
||||
|
||||
let data_processor = c.cradle.DataProcessor;
|
||||
await data_processor.process(filepath);
|
||||
l.log("Processing complete!");
|
||||
|
||||
break;
|
||||
|
||||
default:
|
||||
console.error(`${a.fred}${a.hicol}Error: Subcommand '${extras[0]}' not recognised.${a.reset}`);
|
||||
console.error( `${a.fred}Perhaps you mistyped it, or it hasn't been implemented yet?`);
|
||||
l.error(`Error: Subcommand '${extras[0]}' not recognised.`);
|
||||
l.error(`Perhaps you mistyped it, or it hasn't been implemented yet?`);
|
||||
l.error(`Try invoking this program with --help to see a list of supported subcommands.`);
|
||||
process.exit(1);
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -11,6 +11,8 @@ import Log from '../Helpers/Log.mjs';
|
|||
import TTNAppServer from '../ttn-app-server/TTNAppServer.mjs';
|
||||
import MessageHandler from '../ttn-app-server/MessageHandler.mjs';
|
||||
|
||||
import DataProcessor from '../process-data/DataProcessor.mjs';
|
||||
|
||||
import settings from './settings.mjs';
|
||||
import database_init from '../bootstrap/database_init.mjs';
|
||||
|
||||
|
@ -28,7 +30,8 @@ export default async function setup() {
|
|||
log: a.asClass(Log).singleton(),
|
||||
database: a.asFunction(database_init).singleton(),
|
||||
TTNAppServer: a.asClass(TTNAppServer),
|
||||
MessageHandler: a.asClass(MessageHandler)
|
||||
MessageHandler: a.asClass(MessageHandler),
|
||||
DataProcessor: a.asClass(DataProcessor)
|
||||
});
|
||||
|
||||
let repo_filenames = await fs.readdir(path.join(root_dir, "./Repos.SQLite"));
|
||||
|
|
|
@ -3,7 +3,7 @@ CREATE TABLE IF NOT EXISTS readings (
|
|||
id INTEGER PRIMARY KEY, -- Random unique integer
|
||||
latitude FLOAT NOT NULL, -- Latitude component of GPS co-ordinates of reading
|
||||
longitude FLOAT NOT NULL, -- Longitude component of GPS co-ordinates of reading,
|
||||
data_rate TEXT NOT NULL, -- The id of the data rate code in the data_rates table that describes the data rate at which the message was transmitted
|
||||
data_rate TEXT, -- The id of the data rate code in the data_rates table that describes the data rate at which the message was transmitted
|
||||
code_rate TEXT -- The coding rate at which the message was transmitted. FUTURE: This may need to be an INTEGER field - not sure
|
||||
-- bit_rate INTEGER, -- The bit rate at which the message was transmitted -- We didn't actually recieve this - disparity between API docs & actual message
|
||||
);
|
||||
|
|
61
server/process-data/DataProcessor.mjs
Normal file
61
server/process-data/DataProcessor.mjs
Normal file
|
@ -0,0 +1,61 @@
|
|||
"use strict";
|
||||
|
||||
import fs from 'fs';
|
||||
import readline from 'readline';
|
||||
|
||||
/**
|
||||
* Folds data from the microSD card into the SQLite database.
|
||||
*/
|
||||
class DataProcessor {
|
||||
constructor({ settings, log, ReadingRepo }) {
|
||||
this.settings = settings;
|
||||
this.l = log;
|
||||
this.repo_reading = ReadingRepo;
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a filename, reads it and folds the data into the SQLite database.
|
||||
* @param {string} filename The path tot he file to read in.
|
||||
* @return {Promise} A promise that resolves when the folding is complete.
|
||||
*/
|
||||
process(filename) {
|
||||
return new Promise((resolve, reject) => { // Arrow functions keep the scope of their parent functions
|
||||
if(!fs.existsSync(filename)) {
|
||||
this.l.log(`Error: ${filename} doesn't exist.`);
|
||||
reject();
|
||||
}
|
||||
|
||||
let reader = readline.createInterface({
|
||||
input: fs.createReadStream(filename)
|
||||
});
|
||||
|
||||
reader.on("line", this.process_line);
|
||||
reader.on("close", resolve);
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
process_line(line_text) {
|
||||
let parts = line_text.split(/\s+/),
|
||||
reading = {
|
||||
id: parseInt(parts[0]),
|
||||
latitude: parseFloat(parts[1]),
|
||||
longitude: parseFloat(parts[2]),
|
||||
|
||||
// For inserting into the database
|
||||
data_rate: null,
|
||||
code_rate: null
|
||||
};
|
||||
|
||||
let log_message = `Processing reading with id ${reading.id}`;
|
||||
|
||||
if(!this.repo_reading.exists(reading.id)) {
|
||||
this.repo_reading.add(reading);
|
||||
log_message += " no database entry detected, inserted new record";
|
||||
}
|
||||
|
||||
this.l.log(log_message);
|
||||
}
|
||||
}
|
||||
|
||||
export default DataProcessor;
|
Loading…
Reference in a new issue