diff --git a/src/lib/SystemQuery.mjs b/src/lib/SystemQuery.mjs index 9cfb6bf..517ca16 100644 --- a/src/lib/SystemQuery.mjs +++ b/src/lib/SystemQuery.mjs @@ -33,7 +33,6 @@ class SystemQuery extends EventEmitter { this.config.net.http.port, this.config.net.http.bind_address ); - await this.agent.init(); /// // 2: Attach listeners @@ -41,6 +40,10 @@ class SystemQuery extends EventEmitter { this.agent.on("message-query", this.handle_query.bind(this)); this.agent.on("message-query-response", this.handle_query_response.bind(this)); + /// + // 3: Start agent + /// + await this.agent.init(); } async handle_query(peer, msg) { @@ -66,7 +69,7 @@ class SystemQuery extends EventEmitter { async *fetch_table(name) { // If it isn't valid for us, it ain't gonna be valid for anyone else.... if(!this.info.is_valid_table(name)) return null; - + l.log(`fetch table ${name} start`) const queue = new ItemQueue(); const handle_response = (peer, msg) => { @@ -82,10 +85,10 @@ class SystemQuery extends EventEmitter { // If it's not the right table, ignore it if(msg.name !== name) return; - queue.push({ peer, table: msg.table }); + queue.enqueue({ peer, table: msg.table }); }; - this.on("message-query-response", handle_response); + this.agent.on("message-query-response", handle_response); // Only *after* we have our listeners in place do we then broadcast the // query. Note that despite not having entered the below while loop yet, @@ -94,17 +97,19 @@ class SystemQuery extends EventEmitter { let peers_seen = []; while(peers_seen.length < this.agent.connected_peers.length) { - let next = await queue.pop(this.config.net.table_config); + l.info(`peers_seen:`, peers_seen, `connected peers:`, this.agent.connected_peers.length); + let next = await queue.dequeue(this.config.net.table_timeout * 1000); if(typeof next === "undefined") // We timed out break; - console.log(`DEBUG`, next); + l.log(`fetch table DEBUG`, next); if(!peers_seen.includes(next.peer.id)) peers_seen.push(next.peer.id); yield next; } - this.off("message-query-response", handle_response); + this.agent.off("message-query-response", handle_response); + l.log(`fetch table END`); // FUTURE: Add a cache here? Note that we also do not listen for query responses unless we've asked for a table. } } diff --git a/src/lib/agent/subsystems/http/ServerSentEventStream.mjs b/src/lib/agent/subsystems/http/ServerSentEventStream.mjs index dd14ddd..00c9aac 100644 --- a/src/lib/agent/subsystems/http/ServerSentEventStream.mjs +++ b/src/lib/agent/subsystems/http/ServerSentEventStream.mjs @@ -1,5 +1,7 @@ "use strict"; +import log from '../../../io/NamespacedLog.mjs'; const l = log("sse"); + class ServerSentEventStream { constructor(response) { this.response = response; @@ -13,6 +15,7 @@ class ServerSentEventStream { } write(event_name, data) { + // l.log(`event #${this.next_id} ${event_name}: ${data}`); this.response.write(`id: ${this.next_id++}\n`); this.response.write(`event: ${event_name}\n`); this.response.write(`data: ${data}\n\n`); diff --git a/src/lib/agent/subsystems/http/routes/table.mjs b/src/lib/agent/subsystems/http/routes/table.mjs index 974e773..b26d75e 100644 --- a/src/lib/agent/subsystems/http/routes/table.mjs +++ b/src/lib/agent/subsystems/http/routes/table.mjs @@ -6,8 +6,11 @@ export default async function(sys, ctx, _next) { let stream = new ServerSentEventStream(ctx.response); let table_name = sys.fetch_table(ctx.params.table_name.replace(/[^0-9a-zA-Z_-]/g, "")); - for await (let tabledef of table_name) { - stream.write_json(`table`, tabledef); + for await (let next of table_name) { + stream.write_json(`table`, { + peer: next.peer.id, + table: next.table + }); } stream.end(); diff --git a/src/lib/async/ItemQueue.mjs b/src/lib/async/ItemQueue.mjs index 77ef600..81f9afc 100644 --- a/src/lib/async/ItemQueue.mjs +++ b/src/lib/async/ItemQueue.mjs @@ -17,7 +17,7 @@ class ItemQueue extends EventEmitter { this.items = []; } - __push_single(item) { + __enqueue_single(item) { this.items.push(item); this.emit("push", item); } @@ -27,9 +27,9 @@ class ItemQueue extends EventEmitter { * @param {any} items The items to add to the queue. * @return {void} */ - push(...items) { + enqueue(...items) { for(let item of items) - this.__push_single(item); + this.__enqueue_single(item); } /** @@ -39,11 +39,11 @@ class ItemQueue extends EventEmitter { * @param {Number} [time_limit_ms=0] The maximum number of milliseconds to wait before timing out. * @return {Promise} A Promise that resolves to either the next item in the queue, or undefined if it timed out. */ - async pop(time_limit_ms = 0) { + async dequeue(time_limit_ms = 0) { if(this.items.length === 0) return await this.wait_for_item(time_limit_ms); - let item = this.items.pop(); + let item = this.items.shift(); this.emit("pop", item); if(this.items.length === 0) this.emit("empty"); @@ -72,7 +72,7 @@ class ItemQueue extends EventEmitter { clearTimeout(timeout); this.items.splice(this.items.indexOf(item)); - return item; + return item[0]; } catch(error) { if(timeout !== null)