Implement initial http table_fetch endpoint, but it's not working right

This commit is contained in:
Starbeamrainbowlabs 2022-02-12 16:27:50 +00:00
parent 3f26696056
commit f320617117
Signed by: sbrl
GPG key ID: 1BE5172E637709C2
5 changed files with 42 additions and 14 deletions

View file

@ -6,6 +6,7 @@ import log from './io/NamespacedLog.mjs'; const l = log("systemquery");
import Agent from './agent/Agent.mjs';
import InfoBroker from './core/InfoBroker.mjs';
import HttpSubsystem from './agent/subsystems/http/HttpSubsystem.mjs';
import ItemQueue from './async/ItemQueue.mjs';
@ -19,6 +20,8 @@ class SystemQuery extends EventEmitter {
this.mode = mode;
this.config = config;
this.info = new InfoBroker();
this.http = new HttpSubsystem(this);
}
async init() {
@ -26,6 +29,10 @@ class SystemQuery extends EventEmitter {
// 1: Create agent
///
this.agent = new Agent(this.config);
this.http.init(
this.config.net.http.port,
this.config.net.http.bind_address
);
await this.agent.init();
///
@ -33,6 +40,7 @@ class SystemQuery extends EventEmitter {
///
this.agent.on("message-query", this.handle_query.bind(this));
this.agent.on("message-query-response", this.handle_query_response.bind(this));
}
async handle_query(peer, msg) {
@ -55,7 +63,7 @@ class SystemQuery extends EventEmitter {
return await once(this.agent, `message-${event_name}`, { signal: ac });
}
*fetch_table(name) {
async *fetch_table(name) {
// If it isn't valid for us, it ain't gonna be valid for anyone else....
if(!this.info.is_valid_table(name)) return null;
@ -74,8 +82,6 @@ class SystemQuery extends EventEmitter {
// If it's not the right table, ignore it
if(msg.name !== name) return;
queue.push({ peer, table: msg.table });
};
@ -86,11 +92,14 @@ class SystemQuery extends EventEmitter {
// we do not drop any messages due to the use of the ItemQueue.
this.agent.broadcast(`query`, { name });
let peers_seen = [];
while(peers_seen.length < this.agent.connected_peers.length) {
let next = queue.pop(this.config.net.table_config);
let next = await queue.pop(this.config.net.table_config);
if(typeof next === "undefined") // We timed out
break;
console.log(`DEBUG`, next);
if(!peers_seen.includes(next.peer.id))
peers_seen.push(next.peer.id);
yield next;
}

View file

@ -2,16 +2,22 @@
import http from 'http';
import log from '../../../io/NamespacedLog.mjs'; const l = log("http");
import make_router from './routes.mjs';
class HttpSubsystem {
constructor(agent) {
this.agent = agent;
constructor(sys) {
this.sys = sys;
this.router = make_router(this.sys);
}
init(port, address = `127.0.0.1`) {
this.http = http.createServer((req, res) => {
init(port, address = `::1`) {
this.http = http.createServer(async (req, res) => {
await this.router.handle(req, res);
});
this.
const address_pretty = address.indexOf(`:`) > -1 ? `[${address}]` : address;
this.http.listen(port, address, () => l.log(`Listening on http://${address_pretty}:${port}`));
}
}

View file

@ -21,6 +21,10 @@ class ServerSentEventStream {
write_json(event_name, data_obj) {
return this.write(event_name, JSON.stringify(data_obj));
}
end() {
this.response.end();
}
}
export default ServerSentEventStream;

View file

@ -4,10 +4,10 @@ import ServerRouter from 'powahroot/Server.mjs';
import route_table from './routes/table.mjs';
export default function(agent) {
export default function(sys) {
const router = new ServerRouter();
router.on(`/api/table/:table_name`, route_table.bind(this, agent));
router.get(`/api/table/:table_name`, route_table.bind(this, sys));
return router;
}

View file

@ -1,5 +1,14 @@
"use strict";
export default function(agent, context, next) {
import ServerSentEventStream from '../ServerSentEventStream.mjs';
export default async function(sys, ctx, _next) {
let stream = new ServerSentEventStream(ctx.response);
let table_name = sys.fetch_table(ctx.params.table_name.replace(/[^0-9a-zA-Z_-]/g, ""));
for await (let tabledef of table_name) {
stream.write_json(`table`, tabledef);
}
stream.end();
}