Beta, add webserver (socket, event, dirlisting), tx ordering, etc.

This commit is contained in:
Wietse Wind
2023-10-06 00:58:05 +02:00
parent 986705326a
commit f58015933b
11 changed files with 1144 additions and 63 deletions

View File

@@ -12,7 +12,10 @@ Based on the work by @RichardAH: https://github.com/RichardAH/xpop-generator
This tool creates a folder structure in the `./store` directory, where it creates sub-directories like this:
> `store / {networkid} / {ledgerno(0, -6)} / {ledgerno(-6, -3)} / {ledgerno(-3)} /`
> `store / {networkid} / {ledgerpath///} /`
The `ledgerpath` is the ledger index chunked from right to left into sections of three digits, ensuring there
are at most 1,000 subfolders per level. This allows for easy directory listing & cleanup.
So e.g. for NetworkId `21338` and ledger index `82906790`, the path would be:
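For reference, a minimal sketch of that chunking rule (the project ships its own implementation in `lib/ledgerIndexToFolders.mjs`; this snippet is only an illustration):
```javascript
// Chunk a ledger index right-to-left into groups of three digits,
// e.g. 82906790 -> '82/906/790' (so at most 1000 entries per directory level).
const ledgerIndexToFolders = ledgerIndex => {
  const digits = String(ledgerIndex)
  const chunks = []
  for (let end = digits.length; end > 0; end -= 3) {
    chunks.unshift(digits.slice(Math.max(0, end - 3), end))
  }
  return chunks.join('/')
}
```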
@@ -37,3 +40,30 @@ Every folder will contain the following files:
`npm run dev` to launch (verbose)
`npm run xpopgen` to launch, less verbose
## Webserver
This script also runs a webserver if the env. vars are provided for the TCP port & URL prefix where the app will run:
```bash
EVENT_SOCKET_PORT="3000"
URL_PREFIX="https://4849bf891e06.ngrok.app"
```
#### WebSocket
You can listen for xPOP publish events (live, so you don't have to poll).
By default you will receive all xPOP events. To filter on a specific address, provide
the r-address in the URL path. To also receive the xPOP blob, add `/blob` to the URL path.
E.g. `/blob/rwietsevLFg8XSmG3bEZzFein1g8RBqWDZ` would listen for xPOPs for account `rwietsevLFg8XSmG3bEZzFein1g8RBqWDZ`
and serve the (hex encoded) xPOP in the `xpop.blob` property.
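A minimal Node.js client sketch (host and port are placeholders based on the env. example above; the fields read here match the payload emitted by `lib/onTransaction.mjs`):
```javascript
import { WebSocket } from 'ws'

// Subscribe to xPOP events for one account, including the hex encoded blob.
const ws = new WebSocket('ws://localhost:3000/blob/rwietsevLFg8XSmG3bEZzFein1g8RBqWDZ')

ws.on('message', raw => {
  const event = JSON.parse(raw)
  console.log('xPOP for', event.account, 'tx', event.origin.tx)
  console.log('blob (hex):', event.xpop?.blob)
})
```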
#### HTTP File Browser
On the HTTP port a file listing is also provided & xPOPs can be downloaded at `/xpop/{tx hash}`.
Original source files to reconstruct the xPOP locally can be downloaded at `/{networkid}/`.
This file browser is for development and test purposes only; for production, put a static webserver
in front of this application & reverse proxy only the WebSocket (HTTP Upgrade) traffic to this server.

bin/webserver.mjs Normal file
View File

@@ -0,0 +1,92 @@
import { WebSocket } from 'ws'
import morgan from 'morgan'
import express from 'express'
import expressWs from 'express-ws'
import serveIndex from 'serve-index'
import 'dotenv/config'
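// Optional web service: serves the ./store tree over HTTP (static files plus a directory
// listing per network) and accepts WebSocket clients that receive xPOP events via emit().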
let wss // WebSocket Server
if (!wss) {
if (process.env?.EVENT_SOCKET_PORT && process.env?.URL_PREFIX) {
const port = Number(process.env.EVENT_SOCKET_PORT)
try {
const app = express()
app.enable('trust proxy')
app.use(morgan('combined', { }))
wss = expressWs(app)
// app.use(function middleware (req, res, next) {
// return next()
// })
app.use('/', express.static('./store/'))
app.use('/:networkId([0-9]{1,})', (req, res, next) => {
return serveIndex('./store/' + req.params.networkId + '/', { icons: false })(req, res, next)
})
// app.get('/', function route (req, res, next){
// console.log('get route', req.testing)
// res.end()
// })
app.ws('*', function wsclient (ws, req) {
Object.assign(ws, { req, })
ws.on('message', function wsmsg (msg) {
// Ignore
// console.log(msg)
})
})
app.listen(port)
console.log('Started Event Socket Service at TCP port', port)
} catch (e) {
console.log('Cannot start Webserver & Event Socket Service at port', port, e)
}
} else {
console.log('Not starting Webserver & Event Socket Service, EVENT_SOCKET_PORT and/or URL_PREFIX unset')
}
}
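// Broadcast an xPOP event to every connected WebSocket client, honouring the per-client
// account filter and /blob flag each client encoded in its request URL.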
const emit = _data => {
if (wss) {
wss.getWss().clients.forEach(function each (client) {
const data = Object.assign({}, { ..._data })
// Needed to prevent shared object manipulation with multiple clients
data.xpop = Object.assign({}, _data.xpop)
if (client.readyState === WebSocket.OPEN) {
// console.log(client)
// console.log(client?._xpopAccount)
// console.log(client?._xpopBlob)
let account = ''
const accountAddress = client.req.url.match(/r[a-zA-Z0-9]{18,}/)
const blob = !!client.req.url.match(/\/blob/i)
if (accountAddress) {
account = accountAddress[0]
}
if (!blob && data?.xpop?.blob) {
data.xpop.blob = undefined
}
if (account === '' || data.account === account) {
client.send(JSON.stringify(data), { binary: false })
}
}
})
return wss.getWss().clients.size // clients is a Set, so use .size
}
return false
}
export {
emit,
}

View File

@@ -2,13 +2,14 @@ import { XrplClient } from 'xrpl-client'
import { createDirectory } from './lib/createDirectory.mjs'
import { onValidation } from './lib/onValidation.mjs'
import { onLedger } from './lib/onLedger.mjs'
import { onTransaction } from './lib/onTransaction.mjs'
import 'dotenv/config'
import assert from 'assert'
import './bin/webserver.mjs'
assert(process.env?.NODES, 'ENV var missing: NODES, containing: a comma separated list of websocket endpoints')
await createDirectory('store')
await createDirectory('store/xpop')
process.env.NODES.split(',').map(h => h.trim())
.map(h => new XrplClient(h)).map(async c => {
@@ -22,8 +23,9 @@ process.env.NODES.split(',').map(h => h.trim())
c.send({ command: "subscribe", streams: [
"validations",
"ledger",
"transactions",
"transactions_proposed"
// No transactions, to make it easier for clients transactions are
// processed in order (sorted on sequence) and emitted in order
// to clients to prevent async tx sequence problems.
] })
c.on("validation", validation => onValidation({
@@ -38,11 +40,4 @@ process.env.NODES.split(',').map(h => h.trim())
ledger,
connection: c,
}))
c.on("transaction", transaction => onTransaction({
connectionUrl: c.getState()?.server?.uri,
networkId: c.getState()?.server?.networkId,
transaction,
connection: c,
}))
})

View File

@@ -0,0 +1,91 @@
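// Per-ledger readiness bookkeeping: a ledger is considered ready once its binary
// transactions, ledger info and validator list (vl) have been stored; pending entries
// are resolved to `false` and cleaned up after ~20 seconds.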
const ledgers = {}
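// Deferred-style helper: exposes the promise's resolve function to the caller, plus a
// `meta.resolved` flag so isLedgerReady() can check completion synchronously.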
const externalResolvablePromise = () => {
let _resolve
const meta = {
resolved: false,
}
const promise = new Promise(resolve => {
_resolve = (r) => {
meta.resolved = true
return resolve(r)
}
})
return { promise, resolve: _resolve, meta, }
}
/**
*
* @param {number} ledger - Ledger Index
* @param {(ledger_binary_transactions|ledger_info|vl|validation)} readyElement -
*/
const ledgerReady = async (ledger, readyElement) => {
// console.log('LedgerReady', ledger, readyElement)
const ledgerIndexString = String(ledger)
if (!ledgers?.[ledgerIndexString]) {
const ledger_binary_transactions = externalResolvablePromise()
const ledger_info = externalResolvablePromise()
const vl = externalResolvablePromise()
const ready = Promise.all([
ledger_binary_transactions.promise,
ledger_info.promise,
vl.promise,
])
Object.assign(ledgers, {
[ledgerIndexString]: {
ledger_binary_transactions,
ledger_info,
vl,
validation: 0,
ready,
}
})
// Set timeout to clean up
setTimeout(() => {
// console.log('Cleaning up', ledgerIndexString)
if (ledgers?.[ledgerIndexString]) {
ledgers?.[ledgerIndexString]?.ledger_binary_transactions?.resolve(false)
ledgers?.[ledgerIndexString]?.ledger_info?.resolve(false)
ledgers?.[ledgerIndexString]?.vl?.resolve(false)
}
// Force GC
setTimeout(() => {
if (ledgers?.[ledgerIndexString]) delete ledgers?.[ledgerIndexString]
}, 50)
}, 20_000)
}
if (
readyElement === 'ledger_binary_transactions'
|| readyElement === 'ledger_info'
|| readyElement === 'vl'
) {
ledgers[ledgerIndexString][readyElement].resolve(new Date() / 1000)
}
if (readyElement === 'validation') {
ledgers[ledgerIndexString][readyElement]++
}
}
const waitForLedgerReady = ledgerIndex => {
return ledgers?.[String(ledgerIndex)]?.ready
}
const isLedgerReady = ledgerIndex => {
return ledgers?.[String(ledgerIndex)]?.ledger_binary_transactions.meta.resolved
&& ledgers?.[String(ledgerIndex)]?.ledger_info.meta.resolved
&& ledgers?.[String(ledgerIndex)]?.vl.meta.resolved
}
export {
ledgerReady,
isLedgerReady,
waitForLedgerReady,
}

View File

@@ -2,6 +2,8 @@ import { writeFile } from 'fs'
import { ledgerIndexToFolders } from './ledgerIndexToFolders.mjs'
import { computeBinaryTransactionHash } from './computeBinaryTransactionHash.mjs'
import { dirExists } from './dirExists.mjs'
import { ledgerReady, waitForLedgerReady } from './events/ledgerReady.mjs'
import { onTransaction } from './onTransaction.mjs'
import 'dotenv/config'
const obtainedHumanReadableLedgers = []
@@ -18,7 +20,7 @@ const onLedger = async ({
const storeDir = new URL('../' + relativeStoreDir, import.meta.url).pathname
if (await dirExists(storeDir)) {
;[
const ledgerData = [
...(
obtainedBinaryTxLedgers.indexOf(ledger.ledger_index) < 0
? [
@@ -37,6 +39,9 @@ const onLedger = async ({
connection.send({
command: 'ledger',
ledger_index: ledger.ledger_index,
transactions: true,
expand: true,
binary: false,
})
]
: []),
@@ -71,6 +76,8 @@ const onLedger = async ({
writeFile(storeDir + '/ledger_binary_transactions.json', Buffer.from(JSON.stringify(results.ledger), 'utf8'), err => {
if (err) {
console.log('Error writing file @ ' + storeDir)
} else {
ledgerReady(results.ledger_index, 'ledger_binary_transactions')
}
})
}
@@ -87,16 +94,54 @@ const onLedger = async ({
console.log('Obtained ledger (JSON object)', relativeStoreDir, results.ledger_index, 'Hash', results.ledger.ledger_hash)
writeFile(storeDir + '/ledger_info.json', Buffer.from(JSON.stringify(results.ledger), 'utf8'), err => {
writeFile(storeDir + '/ledger_info.json', Buffer.from(JSON.stringify({ ...results.ledger, transactions: undefined, }), 'utf8'), err => {
if (err) {
console.log('Error writing file @ ' + storeDir)
} else {
ledgerReady(ledger.ledger_index, 'ledger_info')
}
})
}
}
}))
}
return results.ledger
}))
/**
* Deal with transactions & fire events
*/
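// Sort this ledger's transactions by account Sequence and feed them to onTransaction
// one by one through a promise chain, so downstream clients receive them in order.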
waitForLedgerReady(ledger.ledger_index).then(async () => {
if (ledgerData.length > 0) {
const [binary, json] = await Promise.all(ledgerData)
const sequentiallyMappedLedgerTxEvents = (json?.transactions || []).map(tx => {
return {
validated: true,
ledger_index: ledger.ledger_index,
transaction: tx,
}
})
.sort((a, b) => a.transaction.Sequence - b.transaction.Sequence)
.reduce((promiseChain, current) => {
return promiseChain.then(() => {
// console.log(' » Tx events: Processing', current.transaction.Sequence)
return onTransaction({
networkId,
transaction: current,
connection,
})
}).then(() => {
// console.log(' » Tx events: Done ', current.transaction.Sequence)
})
}, Promise.resolve())
sequentiallyMappedLedgerTxEvents.then(() => {
// console.log(' « « « « All transactions in ledger processed', ledger.ledger_index)
});
}
})
}
}
}
}

View File

@@ -1,9 +1,12 @@
import { writeFile } from 'fs'
import { dirExists } from './dirExists.mjs'
import { ledgerIndexToFolders } from './ledgerIndexToFolders.mjs'
import { generateV1 as xpop } from '../xpop/generateV1.mjs'
import { xpopGenerate } from './xpopGenerate.mjs'
import { waitForLedgerReady } from './events/ledgerReady.mjs'
import { emit } from '../bin/webserver.mjs'
import 'dotenv/config'
const xpopBinaryDir = new URL('../store/xpop', import.meta.url).pathname
const lastSeenTransactions = []
const fields = (process.env?.FIELDSREQUIRED || '')
@@ -21,52 +24,94 @@ const onTransaction = async ({
networkId,
transaction,
}) => {
if (transaction?.validated) {
const { transaction: tx } = transaction
if (transaction?.validated) {
const { transaction: tx } = transaction
if (tx.hash && lastSeenTransactions.indexOf(tx.hash) < 0) {
lastSeenTransactions.unshift(tx.hash)
lastSeenTransactions.length = 3000
const validTx = hasRequiredFields(tx)
if (!process.env?.NOELIGIBLEFULLTXLOG) {
console.log('TX', tx.hash, validTx)
}
if (validTx && transaction?.ledger_index) {
const relativeStorDir = 'store/' + networkId + '/' + ledgerIndexToFolders(transaction.ledger_index)
const storeDir = new URL('../' + relativeStorDir, import.meta.url).pathname
console.log('xPOP eligible', relativeStorDir, process.env?.NOELIGIBLEFULLTXLOG ? tx.hash : tx)
if (await dirExists(storeDir)) {
const wroteTxFile = await new Promise(resolve => {
writeFile(storeDir + '/tx_' + tx.hash + '.json', Buffer.from(JSON.stringify(transaction), 'utf8'), err => {
if (err) {
console.log('Error writing file @ ' + storeDir)
resolve(false)
}
resolve(true)
})
})
if (tx.hash && lastSeenTransactions.indexOf(tx.hash) < 0) {
lastSeenTransactions.unshift(tx.hash)
lastSeenTransactions.length = 3000
const validTx = hasRequiredFields(tx)
if (!process.env?.NOELIGIBLEFULLTXLOG) {
console.log('TX', tx.hash, validTx)
}
if (validTx && transaction?.ledger_index) {
const relativeStorDir = 'store/' + networkId + '/' + ledgerIndexToFolders(transaction.ledger_index)
const storeDir = new URL('../' + relativeStorDir, import.meta.url).pathname
console.log('xPOP eligible', relativeStorDir, process.env?.NOELIGIBLEFULLTXLOG ? tx.hash : tx)
if (await dirExists(storeDir)) {
writeFile(storeDir + '/tx_' + tx.hash + '.json', Buffer.from(JSON.stringify(transaction), 'utf8'), err => {
if (err) {
console.log('Error writing file @ ' + storeDir)
} else {
if (wroteTxFile) {
await waitForLedgerReady(transaction.ledger_index)
/**
* TX all ready, written to filesystem, ...
* This is where we start a slight delay to give the `onLedger`
* routine some time to fetch & store and then we'll try to
* generate an xPOP.
*/
setTimeout(async () => {
await xpop({
ledgerIndex: transaction.ledger_index,
networkId,
txHash: tx.hash,
const xpopBinary = await xpopGenerate({
ledgerIndex: transaction.ledger_index,
networkId,
txHash: tx.hash,
})
if (await dirExists(xpopBinaryDir)) {
const xpopWritten = await new Promise(resolve => {
writeFile(xpopBinaryDir + '/' + tx.hash, Buffer.from(xpopBinary, 'utf8'), err => {
if (err) {
console.log('Error writing binary XPOP', err)
resolve(false)
} else {
console.log('Wrote binary xPOP: ' + xpopBinaryDir + '/' + tx.hash)
resolve(true)
}
})
})
}, 500)
// ^^ To check: is this enough? If e.g. retrieving the ledger info
// would take longer this may not be enough. Best solution:
// make this await the ledger fetching calls.
// Dirty: extend to e.g. 2000.
if (xpopWritten) {
console.log(' ### EMIT XPOP READY FOR', tx?.Account, Number(tx.Sequence), tx.hash)
return await emit({
account: tx?.Account,
sequence: tx.Sequence,
origin: {
tx: tx.hash,
networkId: networkId,
ledgerIndex: transaction.ledger_index,
burn: tx?.Fee,
},
destination: {
networkId: tx?.OperationLimit,
},
...(
process.env?.URL_PREFIX
? {
xpop: {
binary: `${process.env.URL_PREFIX}/xpop/${tx.hash}`,
source: `${process.env.URL_PREFIX}/${networkId}/${ledgerIndexToFolders(transaction.ledger_index)}/`,
blob: xpopBinary,
}
}
: {}
)
})
}
}
}
})
}
}
}
}
}
}
export {

View File

@@ -3,6 +3,7 @@ import { createDirectory } from './createDirectory.mjs'
import 'dotenv/config'
import { unlData } from './unlData.mjs'
import { ledgerIndexToFolders } from './ledgerIndexToFolders.mjs'
import { ledgerReady } from './events/ledgerReady.mjs'
const lastSeenValidations = []
let lastCreatedLedgerDir
@@ -34,6 +35,8 @@ const onValidation = async ({
const relativeStorDir = 'store/' + networkId + '/' + ledgerIndexToFolders(validation.ledger_index)
const storeDir = new URL('../' + relativeStorDir, import.meta.url).pathname
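// Count this validation towards the ledger's readiness bookkeeping (see events/ledgerReady.mjs).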
ledgerReady(validation.ledger_index, 'validation')
if (lastCreatedLedgerDir !== validation.ledger_index) {
await createDirectory(relativeStorDir)
lastCreatedLedgerDir = validation.ledger_index
@@ -41,6 +44,8 @@ const onValidation = async ({
writeFile(storeDir + '/vl.json', Buffer.from(JSON.stringify(unlData.data), 'utf8'), err => {
if (err) {
console.log('Error writing file @ ' + storeDir)
} else {
ledgerReady(validation.ledger_index, 'vl')
}
})
}

View File

@@ -1,5 +1,5 @@
import assert from 'assert'
import { xpop } from './xpopV1.mjs'
import { xpop } from './xpop/v1.mjs'
import { writeFile, readFile, readdir } from 'fs'
import { ledgerIndexToFolders } from '../lib/ledgerIndexToFolders.mjs'
import { dirExists } from '../lib/dirExists.mjs'
@@ -20,7 +20,7 @@ const catjson = async file => {
return JSON.parse(buffer.toString())
}
const generateV1 = async ({
const xpopGenerate = async ({
ledgerIndex,
networkId,
txHash
@@ -83,5 +83,5 @@ const generateV1 = async ({
}
export {
generateV1,
xpopGenerate,
}

package-lock.json generated

File diff suppressed because it is too large

View File

@@ -5,7 +5,7 @@
"main": "index.mjs",
"scripts": {
"dev": "nodemon .",
"xpopdev": "source .env && nodemon --max-old-space-size=50 .",
"xpopdev": "source .env && nodemon --max-old-space-size=40 .",
"serve": "serve ./store/"
},
"author": "Wietse Wind <w@xrpl-labs.com>",
@@ -20,12 +20,20 @@
"dotenv": "^16.3.1",
"ed25519": "^0.0.5",
"elliptic": "^6.5.4",
"express-ws": "^5.0.2",
"morgan": "^1.10.0",
"node-fetch": "^3.3.2",
"ripple-address-codec": "^4.3.0",
"ripple-binary-codec": "^1.10.0",
"serve-index": "^1.9.1",
"ws": "^8.14.2",
"xrpl-client": "^2.2.0"
},
"devDependencies": {
"serve": "^14.2.1"
},
"optionalDependencies": {
"bufferutil": "^4.0.7",
"utf-8-validate": "^6.0.3"
}
}