From f320617117491eb90385091ee44cd5f497ebc95c Mon Sep 17 00:00:00 2001 From: Starbeamrainbowlabs Date: Sat, 12 Feb 2022 16:27:50 +0000 Subject: [PATCH] Implement initial http table_fetch endpoint, but it's not working right --- src/lib/SystemQuery.mjs | 19 ++++++++++++++----- .../agent/subsystems/http/HttpSubsystem.mjs | 18 ++++++++++++------ .../subsystems/http/ServerSentEventStream.mjs | 4 ++++ src/lib/agent/subsystems/http/routes.mjs | 4 ++-- .../agent/subsystems/http/routes/table.mjs | 11 ++++++++++- 5 files changed, 42 insertions(+), 14 deletions(-) diff --git a/src/lib/SystemQuery.mjs b/src/lib/SystemQuery.mjs index e5f8fd0..9cfb6bf 100644 --- a/src/lib/SystemQuery.mjs +++ b/src/lib/SystemQuery.mjs @@ -6,6 +6,7 @@ import log from './io/NamespacedLog.mjs'; const l = log("systemquery"); import Agent from './agent/Agent.mjs'; import InfoBroker from './core/InfoBroker.mjs'; +import HttpSubsystem from './agent/subsystems/http/HttpSubsystem.mjs'; import ItemQueue from './async/ItemQueue.mjs'; @@ -19,6 +20,8 @@ class SystemQuery extends EventEmitter { this.mode = mode; this.config = config; this.info = new InfoBroker(); + + this.http = new HttpSubsystem(this); } async init() { @@ -26,6 +29,10 @@ class SystemQuery extends EventEmitter { // 1: Create agent /// this.agent = new Agent(this.config); + this.http.init( + this.config.net.http.port, + this.config.net.http.bind_address + ); await this.agent.init(); /// @@ -33,6 +40,7 @@ class SystemQuery extends EventEmitter { /// this.agent.on("message-query", this.handle_query.bind(this)); this.agent.on("message-query-response", this.handle_query_response.bind(this)); + } async handle_query(peer, msg) { @@ -55,7 +63,7 @@ class SystemQuery extends EventEmitter { return await once(this.agent, `message-${event_name}`, { signal: ac }); } - *fetch_table(name) { + async *fetch_table(name) { // If it isn't valid for us, it ain't gonna be valid for anyone else.... if(!this.info.is_valid_table(name)) return null; @@ -74,8 +82,6 @@ class SystemQuery extends EventEmitter { // If it's not the right table, ignore it if(msg.name !== name) return; - - queue.push({ peer, table: msg.table }); }; @@ -86,11 +92,14 @@ class SystemQuery extends EventEmitter { // we do not drop any messages due to the use of the ItemQueue. this.agent.broadcast(`query`, { name }); + let peers_seen = []; while(peers_seen.length < this.agent.connected_peers.length) { - let next = queue.pop(this.config.net.table_config); + let next = await queue.pop(this.config.net.table_config); if(typeof next === "undefined") // We timed out break; - + console.log(`DEBUG`, next); + if(!peers_seen.includes(next.peer.id)) + peers_seen.push(next.peer.id); yield next; } diff --git a/src/lib/agent/subsystems/http/HttpSubsystem.mjs b/src/lib/agent/subsystems/http/HttpSubsystem.mjs index 77eb5d5..b05621b 100644 --- a/src/lib/agent/subsystems/http/HttpSubsystem.mjs +++ b/src/lib/agent/subsystems/http/HttpSubsystem.mjs @@ -2,16 +2,22 @@ import http from 'http'; +import log from '../../../io/NamespacedLog.mjs'; const l = log("http"); +import make_router from './routes.mjs'; + class HttpSubsystem { - constructor(agent) { - this.agent = agent; + constructor(sys) { + this.sys = sys; + this.router = make_router(this.sys); } - init(port, address = `127.0.0.1`) { - this.http = http.createServer((req, res) => { - + init(port, address = `::1`) { + this.http = http.createServer(async (req, res) => { + await this.router.handle(req, res); }); - this. + + const address_pretty = address.indexOf(`:`) > -1 ? `[${address}]` : address; + this.http.listen(port, address, () => l.log(`Listening on http://${address_pretty}:${port}`)); } } diff --git a/src/lib/agent/subsystems/http/ServerSentEventStream.mjs b/src/lib/agent/subsystems/http/ServerSentEventStream.mjs index 7e14efd..dd14ddd 100644 --- a/src/lib/agent/subsystems/http/ServerSentEventStream.mjs +++ b/src/lib/agent/subsystems/http/ServerSentEventStream.mjs @@ -21,6 +21,10 @@ class ServerSentEventStream { write_json(event_name, data_obj) { return this.write(event_name, JSON.stringify(data_obj)); } + + end() { + this.response.end(); + } } export default ServerSentEventStream; diff --git a/src/lib/agent/subsystems/http/routes.mjs b/src/lib/agent/subsystems/http/routes.mjs index abc4a3c..27aa598 100644 --- a/src/lib/agent/subsystems/http/routes.mjs +++ b/src/lib/agent/subsystems/http/routes.mjs @@ -4,10 +4,10 @@ import ServerRouter from 'powahroot/Server.mjs'; import route_table from './routes/table.mjs'; -export default function(agent) { +export default function(sys) { const router = new ServerRouter(); - router.on(`/api/table/:table_name`, route_table.bind(this, agent)); + router.get(`/api/table/:table_name`, route_table.bind(this, sys)); return router; } diff --git a/src/lib/agent/subsystems/http/routes/table.mjs b/src/lib/agent/subsystems/http/routes/table.mjs index b86f07c..974e773 100644 --- a/src/lib/agent/subsystems/http/routes/table.mjs +++ b/src/lib/agent/subsystems/http/routes/table.mjs @@ -1,5 +1,14 @@ "use strict"; -export default function(agent, context, next) { +import ServerSentEventStream from '../ServerSentEventStream.mjs'; + +export default async function(sys, ctx, _next) { + let stream = new ServerSentEventStream(ctx.response); + let table_name = sys.fetch_table(ctx.params.table_name.replace(/[^0-9a-zA-Z_-]/g, "")); + for await (let tabledef of table_name) { + stream.write_json(`table`, tabledef); + } + + stream.end(); }