2021-10-02 00:16:34 +00:00
"use strict" ;
2021-10-09 17:00:54 +00:00
import { EventEmitter , once } from 'events' ;
import net from 'net' ;
2022-01-08 16:59:08 +00:00
import p _retry from 'p-retry' ;
2022-01-09 16:57:07 +00:00
import p _reflect from 'p-reflect' ;
2022-01-09 16:30:42 +00:00
import log from '../io/NamespacedLog.mjs' ; const l = log ( "peerserver" ) ;
2022-01-08 16:59:08 +00:00
2022-01-08 23:47:52 +00:00
import settings from '../../settings.mjs' ;
2021-10-09 17:00:54 +00:00
import Connection from '../transport/Connection.mjs' ;
2021-10-19 01:36:22 +00:00
import ErrorWrapper from '../core/ErrorWrapper.mjs' ;
2021-10-09 17:00:54 +00:00
import Peer from './Peer.mjs' ;
2021-10-19 01:43:55 +00:00
/ * *
* A server that handles connections to many peers .
2021-12-27 18:34:44 +00:00
* Note that when a new peer connects it is NOT asked for a list of peers it is
* aware of . This is something you need to handle yourself !
2021-10-19 01:43:55 +00:00
* @ extends EventEmitter
* /
2021-10-09 17:00:54 +00:00
class PeerServer extends EventEmitter {
2021-10-19 01:36:22 +00:00
constructor ( our _id , secret _join ) {
2021-10-09 17:00:54 +00:00
super ( ) ;
this . our _id = our _id ;
2021-10-19 01:36:22 +00:00
this . secret _join = secret _join ;
2021-10-02 00:16:34 +00:00
2022-01-08 16:59:08 +00:00
// The number of retries when attempting to connect to a peer
this . retries = 5 ;
2021-10-19 01:36:22 +00:00
this . connected _peers = [ ] ;
this . connecting _peers = [ ] ;
2021-10-09 17:00:54 +00:00
}
/ * *
* Starts the PeerServer listening on the given port and bind address .
* @ param { Number } [ port = 5252 ] The port number to listen on .
* @ param { String } [ host = "::" ] The address to bind to .
* @ return { Promise < void > } A Promise that resolves when the server setup is complete .
* /
listen ( port = 5252 , host = "::" ) {
return new Promise ( ( resolve , reject ) => {
2022-01-08 23:47:52 +00:00
this . host = host ;
this . port = port ;
2021-10-09 17:00:54 +00:00
this . server = net . createServer ( async ( client ) => {
await this . handle _client ( client ) ;
} ) ;
this . server . once ( "error" , reject ) ;
this . server . on ( "error" , this . handle _error ) ;
this . server . listen ( {
host ,
port ,
exclusive : false
} , ( ) => {
this . server . off ( "error" , reject ) ;
resolve ( ) ;
} ) ;
} ) ;
}
handle _error ( error ) {
throw error ;
}
async handle _client ( client ) {
2021-10-19 01:36:22 +00:00
const peer = await Peer . Accept ( this , await Connection . Wrap ( this . secret _join , client ) ) ;
2022-01-08 16:59:08 +00:00
this . peer _initialise ( peer ) ;
await once ( peer , "connect" ) ;
}
/ * *
* Initialises a CONNECTED peer and registers it as a valid peer .
* This also includesd attaching the necessary event handlers .
* @ param { Peer } peer The peer in question .
* @ return { void }
* /
peer _initialise ( peer ) {
2021-10-19 01:36:22 +00:00
this . connected _peers . push ( peer ) ;
2021-10-09 17:00:54 +00:00
peer . on ( "message" , this . handle _message . bind ( this , peer ) ) ;
peer . on ( "destroy" , this . handle _destroy . bind ( this , peer ) ) ;
}
async handle _message ( peer , message ) {
this . emit ( "message" , peer , message ) ;
this . emit ( ` message- ${ message . event } ` , peer , message . message ) ;
}
async handle _destroy ( peer ) {
2021-10-19 01:36:22 +00:00
const index = this . connected _peers . indexOf ( peer ) ;
2021-10-09 17:00:54 +00:00
if ( index > - 1 )
2021-10-19 01:36:22 +00:00
this . connected _peers . splice ( index , 1 ) ;
2021-10-09 17:00:54 +00:00
2022-01-08 16:59:08 +00:00
l . log ( ` Peer ${ peer . address } : ${ peer . port } disconnected ` ) ;
2021-10-09 17:00:54 +00:00
this . emit ( "disconnect" , peer . remote _endpoint ) ;
}
/ * *
* Returns a list of all currently known peer addresses .
* @ return { { address : string , port : number } [ ] }
* /
2021-10-19 01:36:22 +00:00
peers ( ) {
return this . connected _peers . map ( ( peer ) => peer . remote _endpoint )
2022-01-09 17:02:26 +00:00
. filter ( el => typeof el . address === "string" && typeof el . port === "number" ) ;
2021-10-09 17:00:54 +00:00
}
2022-01-08 16:59:08 +00:00
/ * *
* Resolves a Peer id to the respective peer instance .
* @ param { string | Peer } peer _id The peer ID to resolve as a string . If a Peer instance is passed instead , this is simply returned unchanged .
* @ return { Peer } The Peer instance associated with the given peer id .
* /
peer _resolve ( peer _id ) {
if ( peer _id instanceof Peer ) return peer _id ;
for ( let peer of this . connected _peers ) {
if ( peer . id === peer _id ) return peer ;
}
}
/ * *
* Resolves a list of peer ids ( and potentially Peer instances ) to a list
* of Peer instances .
* Any Peer instances passed are returned unchanged .
* @ param { ... string | Peer } peers The peer ( s ) to resolve .
* @ return { Peer [ ] } A list of Peer instances .
* /
peers _resolve ( ... peers ) {
return peers . map ( this . peer _resolve ) ;
}
2021-10-19 01:36:22 +00:00
/ * *
* Processes a list of peers .
* New connections are established to any peers in the list to which
* we don ' t already have a connection .
* Note that this function does NOT connect to any other peers known to the
* peers in the list you ' ve specified ! You need to do this manually .
* @ param { ... { address : string , port : number } } new _peers The list of new peers to process .
* @ returns { Promise < Peer [ ] > } A list of new peers to which we have successfully established a connection .
* /
async add _peers ( ... new _peers ) {
2022-01-08 16:59:08 +00:00
return ( await Promise . all ( new _peers . map (
2022-01-09 16:57:07 +00:00
async new _peer => {
let result = await p _reflect ( p _retry ( async ( ) => await this . _ _add _peer (
new _peer . address ,
new _peer . port
) , {
retries : this . retries ,
onFailedAttempt : ( error ) => {
switch ( error instanceof ErrorWrapper ? error . inner _exception . code : error . code ) {
case "ECONNREFUSED" :
l . error ( ` Failed to connect to peer at ${ new _peer . address } : ${ new _peer . port } . ` ) ;
break ;
default :
let msg = ` [attempt ${ error . attemptNumber } / ${ error . retriesLeft + error . attemptNumber } ] Error while connecting to ${ new _peer . address } : ${ new _peer . port } : ${ error } ` ;
if ( settings . cli . verbose ) msg += ` \n ${ error . stack } ` ;
l . error ( msg ) ;
break ;
}
2022-01-08 23:47:52 +00:00
}
2022-01-09 16:57:07 +00:00
} ) ) ;
if ( result . isRejected ) {
l . error ( ` Failed to connect to ${ new _peer . address } : ${ new _peer . port } : ${ result . reason } ` ) ;
return null ;
2022-01-08 23:47:52 +00:00
}
2022-01-09 16:57:07 +00:00
return result . value ;
2022-01-08 17:29:09 +00:00
} )
2022-01-09 16:57:07 +00:00
) ) . filter ( peer => peer instanceof Peer ) ;
2021-10-19 01:36:22 +00:00
}
/ * *
* Connects to a new peer and adds them to the pool of currently connected peers .
* Note : This does NOT automatically connect to all the peers known to the
* peer you ' re connecting to !
* You need to do this manually .
2022-01-08 23:47:52 +00:00
* Note that you should NOT use this function directly . Instead , use
* add _peers ( ) , which supports multiple peers and also automatically
* retries on failure too .
2021-10-19 01:36:22 +00:00
* @ throws { ErrorWrapper } Throws if the connection failed . This could be for a large number of different reasons , from an incorrect join secret from the remote to connection issues .
* @ param { string } address The address to connect to .
* @ param { number } port The port number to connect to .
* @ return { Promise < Peer | null > } A Promise that resolves to the resulting Peer connection , or null if the connection wasn ' t attemped .
* /
2022-01-08 23:47:52 +00:00
async _ _add _peer ( address , port ) {
2022-01-09 16:57:07 +00:00
l . info ( ` Attempting to connect to ${ address } : ${ port } ` ) ;
2022-01-09 00:57:06 +00:00
2022-01-08 16:59:08 +00:00
// If we're already connected, don't bother reconnecting again
2022-01-09 00:57:06 +00:00
if ( this . peers ( ) . some ( el => el . address === address && el . port === port ) ) {
2022-01-09 16:30:42 +00:00
l . log ( ` A connection is already open to ${ address } : ${ port } , skipping ` ) ;
2021-10-19 01:36:22 +00:00
return ;
2022-01-09 00:57:06 +00:00
}
2022-01-09 16:57:07 +00:00
2022-01-08 23:47:52 +00:00
// If we are attempting to connect to ourselves, then don't bother
2022-01-09 16:57:07 +00:00
if ( ( address == "127.0.0.1" || address == "::" || address == "::1" ) && port == this . port ) {
2022-01-09 16:30:42 +00:00
l . log ( ` ${ address } : ${ port } is actually us, skipping ` ) ;
2022-01-08 23:47:52 +00:00
return ;
2022-01-09 00:57:06 +00:00
}
2021-10-19 01:36:22 +00:00
const peer _string = ` peer: ${ address } : ${ port } ` ;
this . connecting _peers . push ( peer _string ) ;
2022-01-08 23:47:52 +00:00
let conn = null ;
2021-10-19 01:36:22 +00:00
try {
conn = await Peer . Initiate ( this , address , port ) ;
2022-01-09 16:57:07 +00:00
this . peer _initialise ( conn ) ;
2021-10-19 01:36:22 +00:00
}
catch ( error ) {
throw new ErrorWrapper ( ` Error: Failed to connect to peer. ` , error ) ;
}
finally {
this . connecting _peers . splice ( this . connecting _peers . indexOf ( peer _string ) ) ;
}
2022-01-08 23:47:52 +00:00
if ( conn === null ) return null ;
2022-01-09 16:30:42 +00:00
l . log ( ` Peer ${ conn . id _short } from ${ address } : ${ port } connected ` )
2021-10-19 01:36:22 +00:00
this . emit ( ` peer ` , conn ) ;
return conn ;
}
2021-10-09 17:00:54 +00:00
2021-10-19 01:43:55 +00:00
/ * *
* Sends a message to 1 or more peers .
* @ param { string | Peer | string [ ] | Peer [ ] } peer _id Either the peer id or the peer itself to which we should send the message . May also be an array of arbitrarily mixed items - in which case the message will be sent to all the specified peers in parallel . The order which peers are messaged is undefined .
* @ param { string } event _name The name of the event to send .
* @ param { Object } msg The message itself to send .
* @ return { Promise } A Promise that resolves ( or potentially rejects ) when the message has been sent .
* /
async send ( peer _id , event _name , msg ) {
2022-01-08 16:59:08 +00:00
if ( ! ( peer _id instanceof Array ) ) peer _id = [ peer _id ] ;
await Promise . all ( this . peers _resolve ( ... peer _id ) . map (
peer => peer . send ( event _name , msg )
) ) ;
2021-10-19 01:43:55 +00:00
}
/ * *
* Sends a message in parallel to all peers to which we have an established
* connection .
* The order which peers are messaged is undefined .
* @ param { string } event _name The name of the event to send .
* @ param { Object } msg The message itself to send .
* @ return { Promise } A Promise that resolves ( or potentially rejects ) when the message has been sent .
* /
async broadcast ( event _name , msg ) {
await this . send ( this . connected _peers , event _name , msg ) ;
}
2021-10-09 17:00:54 +00:00
/ * *
* Shuts the server down .
* This does not disconnect any existing peers !
* @ return { Promise } A Promise that resolves once the server has been shutdown .
* /
shutdown _server ( ) {
return new Promise ( ( resolve , _reject ) => {
this . server . close ( resolve ) ;
} ) ;
}
/ * *
* Stops the PeerServer and gracefully ( if possible ) disconnects all existing peers .
* @ return { Promise } A Promise that resolves once the shutdown process has completed .
* /
async destroy ( ) {
await this . shutdown _server ( ) ;
2021-10-19 01:36:22 +00:00
await Promise . all ( ... this . connected _peers . map ( peer => peer . destroy ( ) ) ) ;
2021-10-02 00:16:34 +00:00
}
}
export default PeerServer ;