It works! Wooo!
...almost. We still need to ask ourselves for a table too haha
This commit is contained in:
parent
f320617117
commit
3819f8c61a
4 changed files with 26 additions and 15 deletions
|
@ -33,7 +33,6 @@ class SystemQuery extends EventEmitter {
|
||||||
this.config.net.http.port,
|
this.config.net.http.port,
|
||||||
this.config.net.http.bind_address
|
this.config.net.http.bind_address
|
||||||
);
|
);
|
||||||
await this.agent.init();
|
|
||||||
|
|
||||||
///
|
///
|
||||||
// 2: Attach listeners
|
// 2: Attach listeners
|
||||||
|
@ -41,6 +40,10 @@ class SystemQuery extends EventEmitter {
|
||||||
this.agent.on("message-query", this.handle_query.bind(this));
|
this.agent.on("message-query", this.handle_query.bind(this));
|
||||||
this.agent.on("message-query-response", this.handle_query_response.bind(this));
|
this.agent.on("message-query-response", this.handle_query_response.bind(this));
|
||||||
|
|
||||||
|
///
|
||||||
|
// 3: Start agent
|
||||||
|
///
|
||||||
|
await this.agent.init();
|
||||||
}
|
}
|
||||||
|
|
||||||
async handle_query(peer, msg) {
|
async handle_query(peer, msg) {
|
||||||
|
@ -66,7 +69,7 @@ class SystemQuery extends EventEmitter {
|
||||||
async *fetch_table(name) {
|
async *fetch_table(name) {
|
||||||
// If it isn't valid for us, it ain't gonna be valid for anyone else....
|
// If it isn't valid for us, it ain't gonna be valid for anyone else....
|
||||||
if(!this.info.is_valid_table(name)) return null;
|
if(!this.info.is_valid_table(name)) return null;
|
||||||
|
l.log(`fetch table ${name} start`)
|
||||||
const queue = new ItemQueue();
|
const queue = new ItemQueue();
|
||||||
|
|
||||||
const handle_response = (peer, msg) => {
|
const handle_response = (peer, msg) => {
|
||||||
|
@ -82,10 +85,10 @@ class SystemQuery extends EventEmitter {
|
||||||
// If it's not the right table, ignore it
|
// If it's not the right table, ignore it
|
||||||
if(msg.name !== name) return;
|
if(msg.name !== name) return;
|
||||||
|
|
||||||
queue.push({ peer, table: msg.table });
|
queue.enqueue({ peer, table: msg.table });
|
||||||
};
|
};
|
||||||
|
|
||||||
this.on("message-query-response", handle_response);
|
this.agent.on("message-query-response", handle_response);
|
||||||
|
|
||||||
// Only *after* we have our listeners in place do we then broadcast the
|
// Only *after* we have our listeners in place do we then broadcast the
|
||||||
// query. Note that despite not having entered the below while loop yet,
|
// query. Note that despite not having entered the below while loop yet,
|
||||||
|
@ -94,17 +97,19 @@ class SystemQuery extends EventEmitter {
|
||||||
|
|
||||||
let peers_seen = [];
|
let peers_seen = [];
|
||||||
while(peers_seen.length < this.agent.connected_peers.length) {
|
while(peers_seen.length < this.agent.connected_peers.length) {
|
||||||
let next = await queue.pop(this.config.net.table_config);
|
l.info(`peers_seen:`, peers_seen, `connected peers:`, this.agent.connected_peers.length);
|
||||||
|
let next = await queue.dequeue(this.config.net.table_timeout * 1000);
|
||||||
if(typeof next === "undefined") // We timed out
|
if(typeof next === "undefined") // We timed out
|
||||||
break;
|
break;
|
||||||
console.log(`DEBUG`, next);
|
l.log(`fetch table DEBUG`, next);
|
||||||
if(!peers_seen.includes(next.peer.id))
|
if(!peers_seen.includes(next.peer.id))
|
||||||
peers_seen.push(next.peer.id);
|
peers_seen.push(next.peer.id);
|
||||||
yield next;
|
yield next;
|
||||||
}
|
}
|
||||||
|
|
||||||
this.off("message-query-response", handle_response);
|
this.agent.off("message-query-response", handle_response);
|
||||||
|
|
||||||
|
l.log(`fetch table END`);
|
||||||
// FUTURE: Add a cache here? Note that we also do not listen for query responses unless we've asked for a table.
|
// FUTURE: Add a cache here? Note that we also do not listen for query responses unless we've asked for a table.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
"use strict";
|
"use strict";
|
||||||
|
|
||||||
|
import log from '../../../io/NamespacedLog.mjs'; const l = log("sse");
|
||||||
|
|
||||||
class ServerSentEventStream {
|
class ServerSentEventStream {
|
||||||
constructor(response) {
|
constructor(response) {
|
||||||
this.response = response;
|
this.response = response;
|
||||||
|
@ -13,6 +15,7 @@ class ServerSentEventStream {
|
||||||
}
|
}
|
||||||
|
|
||||||
write(event_name, data) {
|
write(event_name, data) {
|
||||||
|
// l.log(`event #${this.next_id} ${event_name}: ${data}`);
|
||||||
this.response.write(`id: ${this.next_id++}\n`);
|
this.response.write(`id: ${this.next_id++}\n`);
|
||||||
this.response.write(`event: ${event_name}\n`);
|
this.response.write(`event: ${event_name}\n`);
|
||||||
this.response.write(`data: ${data}\n\n`);
|
this.response.write(`data: ${data}\n\n`);
|
||||||
|
|
|
@ -6,8 +6,11 @@ export default async function(sys, ctx, _next) {
|
||||||
let stream = new ServerSentEventStream(ctx.response);
|
let stream = new ServerSentEventStream(ctx.response);
|
||||||
|
|
||||||
let table_name = sys.fetch_table(ctx.params.table_name.replace(/[^0-9a-zA-Z_-]/g, ""));
|
let table_name = sys.fetch_table(ctx.params.table_name.replace(/[^0-9a-zA-Z_-]/g, ""));
|
||||||
for await (let tabledef of table_name) {
|
for await (let next of table_name) {
|
||||||
stream.write_json(`table`, tabledef);
|
stream.write_json(`table`, {
|
||||||
|
peer: next.peer.id,
|
||||||
|
table: next.table
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
stream.end();
|
stream.end();
|
||||||
|
|
|
@ -17,7 +17,7 @@ class ItemQueue extends EventEmitter {
|
||||||
this.items = [];
|
this.items = [];
|
||||||
}
|
}
|
||||||
|
|
||||||
__push_single(item) {
|
__enqueue_single(item) {
|
||||||
this.items.push(item);
|
this.items.push(item);
|
||||||
this.emit("push", item);
|
this.emit("push", item);
|
||||||
}
|
}
|
||||||
|
@ -27,9 +27,9 @@ class ItemQueue extends EventEmitter {
|
||||||
* @param {any} items The items to add to the queue.
|
* @param {any} items The items to add to the queue.
|
||||||
* @return {void}
|
* @return {void}
|
||||||
*/
|
*/
|
||||||
push(...items) {
|
enqueue(...items) {
|
||||||
for(let item of items)
|
for(let item of items)
|
||||||
this.__push_single(item);
|
this.__enqueue_single(item);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -39,11 +39,11 @@ class ItemQueue extends EventEmitter {
|
||||||
* @param {Number} [time_limit_ms=0] The maximum number of milliseconds to wait before timing out.
|
* @param {Number} [time_limit_ms=0] The maximum number of milliseconds to wait before timing out.
|
||||||
* @return {Promise} A Promise that resolves to either the next item in the queue, or undefined if it timed out.
|
* @return {Promise} A Promise that resolves to either the next item in the queue, or undefined if it timed out.
|
||||||
*/
|
*/
|
||||||
async pop(time_limit_ms = 0) {
|
async dequeue(time_limit_ms = 0) {
|
||||||
if(this.items.length === 0)
|
if(this.items.length === 0)
|
||||||
return await this.wait_for_item(time_limit_ms);
|
return await this.wait_for_item(time_limit_ms);
|
||||||
|
|
||||||
let item = this.items.pop();
|
let item = this.items.shift();
|
||||||
this.emit("pop", item);
|
this.emit("pop", item);
|
||||||
if(this.items.length === 0)
|
if(this.items.length === 0)
|
||||||
this.emit("empty");
|
this.emit("empty");
|
||||||
|
@ -72,7 +72,7 @@ class ItemQueue extends EventEmitter {
|
||||||
clearTimeout(timeout);
|
clearTimeout(timeout);
|
||||||
|
|
||||||
this.items.splice(this.items.indexOf(item));
|
this.items.splice(this.items.indexOf(item));
|
||||||
return item;
|
return item[0];
|
||||||
}
|
}
|
||||||
catch(error) {
|
catch(error) {
|
||||||
if(timeout !== null)
|
if(timeout !== null)
|
||||||
|
|
Loading…
Reference in a new issue