mirror of
				https://github.com/Xahau/Validation-Ledger-Tx-Store-to-xPOP.git
				synced 2025-11-04 04:15:48 +00:00 
			
		
		
		
	Add timeout-reconnecting & prevent multi-quit handler
This commit is contained in:
		@@ -12,6 +12,10 @@ import 'wtfnode'
 | 
			
		||||
import { lastLedger } from '../lib/onLedger.mjs'
 | 
			
		||||
import { txCount } from '../lib/onTransaction.mjs'
 | 
			
		||||
 | 
			
		||||
let _health = {
 | 
			
		||||
  reconnectCount: -1,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
const telemetry = {
 | 
			
		||||
  host: null,
 | 
			
		||||
  proto: null,
 | 
			
		||||
@@ -132,7 +136,8 @@ if (!wss) {
 | 
			
		||||
                uptime: new Date() - startDate,
 | 
			
		||||
                lastLedger: lastLedger ?? null,
 | 
			
		||||
                lastWsPushedLedger: lastWsPushedLedger ?? null,
 | 
			
		||||
                txCount: txCount ?? null,  
 | 
			
		||||
                txCount: txCount ?? null,
 | 
			
		||||
                ..._health,
 | 
			
		||||
              },
 | 
			
		||||
            })
 | 
			
		||||
          } else {
 | 
			
		||||
@@ -150,6 +155,7 @@ if (!wss) {
 | 
			
		||||
          lastLedger: lastLedger ?? null,
 | 
			
		||||
          lastWsPushedLedger: lastWsPushedLedger ?? null,
 | 
			
		||||
          txCount: txCount ?? null,
 | 
			
		||||
          ..._health,
 | 
			
		||||
        })
 | 
			
		||||
      })
 | 
			
		||||
 | 
			
		||||
@@ -232,4 +238,5 @@ const emit = _data => {
 | 
			
		||||
 | 
			
		||||
export {
 | 
			
		||||
  emit,
 | 
			
		||||
  _health,
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										156
									
								
								index.mjs
									
									
									
									
									
								
							
							
						
						
									
										156
									
								
								index.mjs
									
									
									
									
									
								
							@@ -5,84 +5,128 @@ import { onLedger } from './lib/onLedger.mjs'
 | 
			
		||||
import 'dotenv/config'
 | 
			
		||||
import assert from 'assert'
 | 
			
		||||
import wtf from 'wtfnode'
 | 
			
		||||
import './bin/webserver.mjs'
 | 
			
		||||
import { _health } from './bin/webserver.mjs'
 | 
			
		||||
 | 
			
		||||
const noLedgerTimeoutSec = Number(process.env.LEDGERTIMEOUTSEC || 15) || 15
 | 
			
		||||
 | 
			
		||||
let sigintEventHandler = false
 | 
			
		||||
let quitting = false
 | 
			
		||||
 | 
			
		||||
let aliveInterval
 | 
			
		||||
 | 
			
		||||
assert(process.env?.NODES, 'ENV var missing: NODES, containing: a comma separated list of websocket endpoints')
 | 
			
		||||
 | 
			
		||||
await createDirectory('store')
 | 
			
		||||
await createDirectory('store/xpop')
 | 
			
		||||
 | 
			
		||||
const connections = process.env.NODES.split(',').map(h => h.trim())
 | 
			
		||||
  .map(h => new XrplClient(h, {
 | 
			
		||||
    assumeOfflineAfterSeconds: 10,
 | 
			
		||||
    connectAttemptTimeoutSeconds: 10,
 | 
			
		||||
    maxConnectionAttempts: null,
 | 
			
		||||
  }))
 | 
			
		||||
const connections = []
 | 
			
		||||
 | 
			
		||||
connections
 | 
			
		||||
  .map(async c => {
 | 
			
		||||
    
 | 
			
		||||
    const subscribe = async () => {
 | 
			
		||||
      await c.ready()
 | 
			
		||||
      
 | 
			
		||||
      /**
 | 
			
		||||
       * TODO: Auto disconnect if no messages for X
 | 
			
		||||
       * TODO: Generate xPOPs for matching transactions
 | 
			
		||||
       */
 | 
			
		||||
const connect = () => {
 | 
			
		||||
  console.log('<<<<< CONNECTING >>>>>')
 | 
			
		||||
  _health.reconnectCount++;
 | 
			
		||||
 | 
			
		||||
      try {
 | 
			
		||||
        c.send({ command: "subscribe", streams: [ "validations" ] })
 | 
			
		||||
        c.send({ command: "subscribe", streams: [ "ledger" ] })
 | 
			
		||||
        // No transactions, to make it easier for clients transactions are
 | 
			
		||||
        // processed in order (sorted on sequence) and emitted in order
 | 
			
		||||
        // to clients to prevent async tx sequence problems.
 | 
			
		||||
        } catch (e) {
 | 
			
		||||
        console.log(e.message)
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  connections.map(c => {
 | 
			
		||||
    console.log('# # # CLOSING', c.getState()?.server?.uri)
 | 
			
		||||
 | 
			
		||||
    c.on("validation", validation => onValidation({
 | 
			
		||||
      connectionUrl: c.getState()?.server?.uri,
 | 
			
		||||
      networkId: c.getState()?.server?.networkId,
 | 
			
		||||
      validation,
 | 
			
		||||
    }))
 | 
			
		||||
 | 
			
		||||
    c.on("ledger", ledger => onLedger({
 | 
			
		||||
      connectionUrl: c.getState()?.server?.uri,
 | 
			
		||||
      networkId: c.getState()?.server?.networkId,
 | 
			
		||||
      ledger,
 | 
			
		||||
      connection: c,
 | 
			
		||||
    }))
 | 
			
		||||
 | 
			
		||||
    c.on('online', () => subscribe())
 | 
			
		||||
    c.on('state', () => subscribe())
 | 
			
		||||
    // c.on('retry', () => subscribe())
 | 
			
		||||
    // c.on('round', () => subscribe())
 | 
			
		||||
 | 
			
		||||
    c.on('error', e => console.error(e?.message || e))
 | 
			
		||||
    c.removeAllListeners('validation')
 | 
			
		||||
    c.removeAllListeners('ledger')
 | 
			
		||||
    c.removeAllListeners('online')
 | 
			
		||||
    c.removeAllListeners('state')
 | 
			
		||||
    c.removeAllListeners('error')
 | 
			
		||||
    c.close()
 | 
			
		||||
  })
 | 
			
		||||
 | 
			
		||||
  connections.length = 0
 | 
			
		||||
  process.env.NODES.split(',').map(h => h.trim())
 | 
			
		||||
    .map(h => new XrplClient(h, {
 | 
			
		||||
      assumeOfflineAfterSeconds: 10,
 | 
			
		||||
      connectAttemptTimeoutSeconds: 10,
 | 
			
		||||
      maxConnectionAttempts: null,
 | 
			
		||||
    }))
 | 
			
		||||
    .forEach(c => {
 | 
			
		||||
      console.log('* * * CONNECTING', c.getState()?.server?.uri)
 | 
			
		||||
      connections.push(c)
 | 
			
		||||
    })
 | 
			
		||||
 | 
			
		||||
  connections
 | 
			
		||||
    .map(async c => {
 | 
			
		||||
      const subscribe = async () => {
 | 
			
		||||
        // await c.ready()
 | 
			
		||||
        
 | 
			
		||||
        /**
 | 
			
		||||
         * TODO: Auto disconnect if no messages for X
 | 
			
		||||
         * TODO: Generate xPOPs for matching transactions
 | 
			
		||||
         */
 | 
			
		||||
 | 
			
		||||
        try {
 | 
			
		||||
          c.send({ command: "subscribe", streams: [ "validations" ] })
 | 
			
		||||
          c.send({ command: "subscribe", streams: [ "ledger" ] })
 | 
			
		||||
          // No transactions, to make it easier for clients transactions are
 | 
			
		||||
          // processed in order (sorted on sequence) and emitted in order
 | 
			
		||||
          // to clients to prevent async tx sequence problems.
 | 
			
		||||
          } catch (e) {
 | 
			
		||||
          console.log(e.message)
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      c.on("validation", validation => onValidation({
 | 
			
		||||
        connectionUrl: c.getState()?.server?.uri,
 | 
			
		||||
        networkId: c.getState()?.server?.networkId,
 | 
			
		||||
        validation,
 | 
			
		||||
      }))
 | 
			
		||||
 | 
			
		||||
      c.on("ledger", ledger => {
 | 
			
		||||
        clearTimeout(aliveInterval)
 | 
			
		||||
        aliveInterval = setTimeout(() => {
 | 
			
		||||
          console.log('Reconnecting, no recently closed ledger after sec.', noLedgerTimeoutSec)
 | 
			
		||||
          connect()
 | 
			
		||||
        }, noLedgerTimeoutSec * 1000)
 | 
			
		||||
 | 
			
		||||
        return onLedger({
 | 
			
		||||
          connectionUrl: c.getState()?.server?.uri,
 | 
			
		||||
          networkId: c.getState()?.server?.networkId,
 | 
			
		||||
          ledger,
 | 
			
		||||
          connection: c,
 | 
			
		||||
        })
 | 
			
		||||
      })
 | 
			
		||||
 | 
			
		||||
      c.on('online', subscribe)
 | 
			
		||||
      c.on('state', subscribe)
 | 
			
		||||
 | 
			
		||||
      // c.on('retry', () => subscribe())
 | 
			
		||||
      // c.on('round', () => subscribe())
 | 
			
		||||
 | 
			
		||||
      c.on('error', e => console.error(e?.message || e))
 | 
			
		||||
    })
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
// Play nice with Docker etc.
 | 
			
		||||
if (!sigintEventHandler) {
 | 
			
		||||
  sigintEventHandler = true
 | 
			
		||||
 | 
			
		||||
  const quit = () => {
 | 
			
		||||
    connections
 | 
			
		||||
      .map(async c => {
 | 
			
		||||
        console.info('Interrupted', c.getState()?.server?.uri)
 | 
			
		||||
        c.close()
 | 
			
		||||
      })
 | 
			
		||||
    if (!quitting) {
 | 
			
		||||
      quitting = true
 | 
			
		||||
 | 
			
		||||
    if (process.env?.DEBUG) {
 | 
			
		||||
      // Display open handles
 | 
			
		||||
      console.log('-------------------')
 | 
			
		||||
      wtf.dump()
 | 
			
		||||
      console.log('-------------------' + `\n`)
 | 
			
		||||
      console.log('Closing (interrupting) connections', connections.length)
 | 
			
		||||
      connections
 | 
			
		||||
        .map(async c => {
 | 
			
		||||
          console.info('Interrupted', c.getState()?.server?.uri)
 | 
			
		||||
          c.close()
 | 
			
		||||
        })
 | 
			
		||||
 | 
			
		||||
      if (process.env?.DEBUG) {
 | 
			
		||||
        // Display open handles
 | 
			
		||||
        console.log('-------------------')
 | 
			
		||||
        wtf.dump()
 | 
			
		||||
        console.log('-------------------' + `\n`)
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  process.on('SIGINT', quit) // Node
 | 
			
		||||
  process.on('SIGTERM', quit) // Docker    
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Here we go
 | 
			
		||||
connect()
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user