From 65a0b2dd7e2ae3f708cccfc3b8a1dacaa331a81b Mon Sep 17 00:00:00 2001 From: Chalith Desaman Date: Thu, 4 May 2023 13:40:23 +0530 Subject: [PATCH] Queue transactions and process them syncronously (#257) --- mb-xrpl/lib/appenv.js | 2 +- mb-xrpl/lib/message-board.js | 201 +++++++++++++++++++++++++---------- mb-xrpl/package-lock.json | 14 +-- mb-xrpl/package.json | 2 +- 4 files changed, 154 insertions(+), 65 deletions(-) diff --git a/mb-xrpl/lib/appenv.js b/mb-xrpl/lib/appenv.js index 014f61c..0d9b30d 100644 --- a/mb-xrpl/lib/appenv.js +++ b/mb-xrpl/lib/appenv.js @@ -26,7 +26,7 @@ appenv = { ACQUIRE_LEASE_TIMEOUT_THRESHOLD: 0.8, ACQUIRE_LEASE_WAIT_TIMEOUT_THRESHOLD: 0.4, ORPHAN_PRUNE_SCHEDULER_INTERVAL_HOURS: 4, - EXPIRE_INSTANCES_SCHEDULER_INTERVAL_SECONDS: 2, + SASHIMONO_SCHEDULER_INTERVAL_SECONDS: 2, SASHI_CLI_PATH: appenv.IS_DEV_MODE ? "../build/sashi" : "/usr/bin/sashi", MB_VERSION: '0.6.2', TOS_HASH: '757A0237B44D8B2BBB04AE2BAD5813858E0AECD2F0B217075E27E0630BA74314' // This is the sha256 hash of TOS text. diff --git a/mb-xrpl/lib/message-board.js b/mb-xrpl/lib/message-board.js index df3fc8c..6f2166c 100644 --- a/mb-xrpl/lib/message-board.js +++ b/mb-xrpl/lib/message-board.js @@ -23,6 +23,10 @@ class MessageBoard { #instanceExpirationQueue = []; #graceTimeoutRef = null; #lastHaltedTime = null; + #concurrencyQueue = { + processing: false, + queue: [] + }; constructor(configPath, secretConfigPath, dbPath, sashiCliPath, sashiDbPath, sashiConfigPath) { this.configPath = configPath; @@ -91,15 +95,19 @@ class MessageBoard { // Catch up missed transactions based on the previously updated "last_watched_ledger" record (checkpoint). await this.#catchupMissedLeases().catch(console.error); + this.db.close(); + // Load the sashimono config. const sashiConfig = ConfigHelper.readSashiConfig(this.sashiConfigPath); this.activeInstanceCount = this.expiryList.length; console.log(`Active instance count: ${this.activeInstanceCount}`); - // Update the registry with the active instance count. - await this.hostClient.updateRegInfo(this.activeInstanceCount, this.cfg.version, sashiConfig.system.max_instance_count, null, null, - sashiConfig.system.max_cpu_us, Math.floor((sashiConfig.system.max_mem_kbytes + sashiConfig.system.max_swap_kbytes) / 1000), - Math.floor(sashiConfig.system.max_storage_kbytes / 1000)); - this.db.close(); + + await this.#queueAction(async () => { + // Update the registry with the active instance count. + await this.hostClient.updateRegInfo(this.activeInstanceCount, this.cfg.version, sashiConfig.system.max_instance_count, null, null, + sashiConfig.system.max_cpu_us, Math.floor((sashiConfig.system.max_mem_kbytes + sashiConfig.system.max_swap_kbytes) / 1000), + Math.floor(sashiConfig.system.max_storage_kbytes / 1000)); + }); this.xrplApi.on(evernode.XrplApiEvents.LEDGER, async (e) => { this.lastValidatedLedgerIndex = e.ledger_index; @@ -110,11 +118,13 @@ class MessageBoard { this.hostClient.on(evernode.HostEvents.ExtendLease, r => this.handleExtendLease(r)); const checkAndRequestRebate = async () => { - // Send rebate request at startup if there's any pending rebates. - if (hostInfo?.registrationFee > hostRegFee) { - console.log(`Requesting rebate...`); - await this.hostClient.requestRebate(); - } + await this.#queueAction(async () => { + // Send rebate request at startup if there's any pending rebates. + if (hostInfo?.registrationFee > hostRegFee) { + console.log(`Requesting rebate...`); + await this.hostClient.requestRebate(); + } + }); } let hostRegFee = this.hostClient.config.hostRegFee; @@ -141,6 +151,58 @@ class MessageBoard { } + // Try to acquire the lease update lock. + async #acquireConcurrencyQueue() { + await new Promise(async resolve => { + while (this.#concurrencyQueue.processing) { + await new Promise(resolveSleep => { + setTimeout(resolveSleep, 1000); + }) + } + resolve(); + }); + this.#concurrencyQueue.processing = true; + } + + // Release the lease update lock. + async #releaseConcurrencyQueue() { + this.#concurrencyQueue.processing = false; + } + + async #queueAction(action) { + await this.#acquireConcurrencyQueue(); + + this.#concurrencyQueue.queue.push({ + callback: action, + attempts: 0 + }); + + await this.#releaseConcurrencyQueue(); + } + + async #processConcurrencyQueue() { + await this.#acquireConcurrencyQueue(); + + let toKeep = []; + for (let action of this.#concurrencyQueue.queue) { + try { + await action.callback(); + } + catch (e) { + if (action.attempts < 5) { + action.attempts++; + toKeep.push(action); + } + else { + console.error(e); + } + } + } + this.#concurrencyQueue.queue = toKeep; + + await this.#releaseConcurrencyQueue(); + } + // Check for xrpl halts #checkLedgersForHalt() { const currentTime = evernode.UtilHelpers.getCurrentUnixTime('milli'); @@ -236,7 +298,10 @@ class MessageBoard { if (voteArr && voteArr.length) { for (const vote of voteArr) { try { - await this.hostClient.heartbeat(vote); + await this.#queueAction(async () => { + await this.hostClient.heartbeat(vote); + this.lastHeartbeatMoment = await this.hostClient.getMoment(); + }); heartbeatSent = true; } catch (e) { @@ -258,8 +323,10 @@ class MessageBoard { return; try { - await this.hostClient.heartbeat(); - this.lastHeartbeatMoment = currentMoment; + await this.#queueAction(async () => { + await this.hostClient.heartbeat(); + this.lastHeartbeatMoment = await this.hostClient.getMoment(); + }); } catch (err) { if (err.code === 'tecHOOK_REJECTED') @@ -298,7 +365,9 @@ class MessageBoard { // Remove from the queue this.#instanceExpirationQueue = this.#instanceExpirationQueue.filter(i => i.containerName != lease.containerName); - await this.hostClient.updateRegInfo(this.activeInstanceCount); + await this.#queueAction(async () => { + await this.hostClient.updateRegInfo(this.activeInstanceCount); + }); console.log(`Destroyed ${lease.containerName}`); } @@ -364,11 +433,12 @@ class MessageBoard { } #startSashimonoClockScheduler() { - const timeout = appenv.EXPIRE_INSTANCES_SCHEDULER_INTERVAL_SECONDS * 1000; // Seconds to millisecs. + const timeout = appenv.SASHIMONO_SCHEDULER_INTERVAL_SECONDS * 1000; // Seconds to millisecs. const scheduler = async () => { this.#checkLedgersForHalt(); await this.#expireInstances(); + await this.#processConcurrencyQueue(); setTimeout(async () => { await scheduler(); }, timeout); @@ -409,9 +479,7 @@ class MessageBoard { await new Promise(async resolve => { while (this.#leaseUpdateLock) { await new Promise(resolveSleep => { - setTimeout(() => { - resolveSleep(); - }, 1000); + setTimeout(resolveSleep, 1000); }) } resolve(); @@ -464,8 +532,10 @@ class MessageBoard { const uriInfo = evernode.UtilHelpers.decodeLeaseTokenUri(uriToken.URI); await this.recreateLeaseOffer(instance.name, lease.tenant_xrp_address, uriInfo.leaseIndex); - console.log(`Refunding tenant ${lease.tenant_xrp_address}...`); - await this.hostClient.refundTenant(lease.tx_hash, lease.tenant_xrp_address, uriInfo.leaseAmount.toString()); + await this.#queueAction(async () => { + console.log(`Refunding tenant ${lease.tenant_xrp_address}...`); + await this.hostClient.refundTenant(lease.tx_hash, lease.tenant_xrp_address, uriInfo.leaseAmount.toString()); + }); } // Remove the lease record. @@ -509,11 +579,13 @@ class MessageBoard { const uriInfo = evernode.UtilHelpers.decodeLeaseTokenUri(uriToken.URI); await this.recreateLeaseOffer(lease.container_name, lease.tenant_xrp_address, uriInfo.leaseIndex); - // If lease is in ACQUIRING status acquire response is not received by the tenant and lease is not in expiry list. - if (lease.status === LeaseStatus.ACQUIRING) { - console.log(`Refunding tenant ${lease.tenant_xrp_address}...`); - await this.hostClient.refundTenant(lease.tx_hash, lease.tenant_xrp_address, uriInfo.leaseAmount.toString()); - } + await this.#queueAction(async () => { + // If lease is in ACQUIRING status acquire response is not received by the tenant and lease is not in expiry list. + if (lease.status === LeaseStatus.ACQUIRING) { + console.log(`Refunding tenant ${lease.tenant_xrp_address}...`); + await this.hostClient.refundTenant(lease.tx_hash, lease.tenant_xrp_address, uriInfo.leaseAmount.toString()); + } + }); } } } @@ -522,11 +594,13 @@ class MessageBoard { } } - // If active instance count is updated, Send the update registration transaction. - if (this.activeInstanceCount !== activeInstanceCount) { - this.activeInstanceCount = activeInstanceCount; - await this.hostClient.updateRegInfo(this.activeInstanceCount); - } + await this.#queueAction(async () => { + // If active instance count is updated, Send the update registration transaction. + if (this.activeInstanceCount !== activeInstanceCount) { + this.activeInstanceCount = activeInstanceCount; + await this.hostClient.updateRegInfo(this.activeInstanceCount); + } + }); } async #catchupMissedLeases() { @@ -546,14 +620,15 @@ class MessageBoard { const transactions = transactionHistory.map((record) => { const transaction = record.tx; transaction.Memos = evernode.TransactionHelper.deserializeMemos(transaction.Memos); + transaction.HookParameters = evernode.TransactionHelper.deserializeHookParams(transaction.HookParameters); return transaction; }); loop1: for (const trx of transactions) { try { - const memoTypes = trx.Memos.map(m => m.type); - if (memoTypes.includes(evernode.EventTypes.ACQUIRE_LEASE) || memoTypes.includes(evernode.EventTypes.EXTEND_LEASE)) { + const paramValues = trx.HookParameters.map(p => p.value); + if (paramValues.includes(evernode.EventTypes.ACQUIRE_LEASE) || paramValues.includes(evernode.EventTypes.EXTEND_LEASE)) { // Update last watched ledger sequence number. await this.updateLastIndexRecord(trx.ledger_index); @@ -579,7 +654,7 @@ class MessageBoard { trx.Destination = this.cfg.xrpl.address; // Handle Acquires. - if (memoTypes.includes(evernode.EventTypes.ACQUIRE_LEASE)) { + if (paramValues.includes(evernode.EventTypes.ACQUIRE_LEASE)) { // Find and bind the bought lease offer (If the trx. is an ACQUIRE, it should be an URITokenBuy trx) const offer = (await hostAccount.getURITokens({ ledger_index: trx.ledger_index - 1 }))?.find(o => o.index === trx?.URITokenID && o.Amount); @@ -598,12 +673,14 @@ class MessageBoard { // Have to recreate the URIToken Offer for the lease as previous one was not utilized. await this.recreateLeaseOffer(eventInfo.data.uriTokenId, eventInfo.data.tenant, uriInfo.leaseIndex); - console.log(`Refunding tenant ${eventInfo.data.tenant} for acquire...`); - await this.hostClient.refundTenant(trx.hash, eventInfo.data.tenant, uriInfo.leaseAmount.toString()); + await this.#queueAction(async () => { + console.log(`Refunding tenant ${eventInfo.data.tenant} for acquire...`); + await this.hostClient.refundTenant(trx.hash, eventInfo.data.tenant, uriInfo.leaseAmount.toString()); + }); } } - } else if (memoTypes.includes(evernode.EventTypes.EXTEND_LEASE)) { // Handle Extensions. + } else if (paramValues.includes(evernode.EventTypes.EXTEND_LEASE)) { // Handle Extensions. const eventInfo = await this.hostClient.extractEvernodeEvent(trx); @@ -613,10 +690,11 @@ class MessageBoard { const tenantXrplAcc = new evernode.XrplAccount(eventInfo.data.tenant); const uriToken = (await tenantXrplAcc.getURITokens()).find(n => evernode.EvernodeHelpers.isValidURI(n.URI, evernode.EvernodeConstants.LEASE_TOKEN_PREFIX_HEX) && n.index === eventInfo.data.uriTokenId); if (uriToken) { - // The refund for the extension, if tenant still own the URIToken. - console.log(`Refunding tenant ${eventInfo.data.tenant} for extend...`); - await this.hostClient.refundTenant(trx.hash, eventInfo.data.tenant, eventInfo.data.payment.toString()); - + await this.#queueAction(async () => { + // The refund for the extension, if tenant still own the URIToken. + console.log(`Refunding tenant ${eventInfo.data.tenant} for extend...`); + await this.hostClient.refundTenant(trx.hash, eventInfo.data.tenant, eventInfo.data.payment.toString()); + }); } else { console.log(`No such URIToken (${eventInfo.data.uriTokenId}) was found.`); } @@ -648,12 +726,14 @@ class MessageBoard { } async recreateLeaseOffer(uriTokenId, tenantAddress, leaseIndex) { - // Burn the URIToken and recreate the offer. - await this.hostClient.expireLease(uriTokenId, tenantAddress).catch(console.error); - // We refresh the config here, So if the purchaserTargetPrice is updated by the purchaser service, the new value will be taken. - await this.hostClient.refreshConfig(); - const leaseAmount = this.cfg.xrpl.leaseAmount ? this.cfg.xrpl.leaseAmount : parseFloat(this.hostClient.config.purchaserTargetPrice); - await this.hostClient.offerLease(leaseIndex, leaseAmount, appenv.TOS_HASH).catch(console.error); + await this.#queueAction(async () => { + // Burn the URIToken and recreate the offer. + await this.hostClient.expireLease(uriTokenId, tenantAddress).catch(console.error); + // We refresh the config here, So if the purchaserTargetPrice is updated by the purchaser service, the new value will be taken. + await this.hostClient.refreshConfig(); + const leaseAmount = this.cfg.xrpl.leaseAmount ? this.cfg.xrpl.leaseAmount : parseFloat(this.hostClient.config.purchaserTargetPrice); + await this.hostClient.offerLease(leaseIndex, leaseAmount, appenv.TOS_HASH).catch(console.error); + }); } async handleAcquireLease(r) { @@ -720,7 +800,7 @@ class MessageBoard { // Number of validated ledgers passed while the instance is created. diff = evernode.UtilHelpers.getCurrentUnixTime() - startingValidatedTime; - // Give-up the acquiringing porocess if the instance creation itself takes more than 80% of allowed window(in seconds). + // Give-up the acquiring process if the instance creation itself takes more than 80% of allowed window(in seconds). threshold = this.hostClient.config.leaseAcquireWindow * appenv.ACQUIRE_LEASE_TIMEOUT_THRESHOLD; if (diff > threshold) { console.error(`Instance creation timeout. Took: ${diff} seconds. Threshold: ${threshold} seconds`); @@ -744,10 +824,12 @@ class MessageBoard { // Update the active instance count. this.activeInstanceCount++; - await this.hostClient.updateRegInfo(this.activeInstanceCount); + await this.#queueAction(async () => { + await this.hostClient.updateRegInfo(this.activeInstanceCount); - // Send the acquire response with created instance info. - await this.hostClient.acquireSuccess(acquireRefId, tenantAddress, createRes); + // Send the acquire response with created instance info. + await this.hostClient.acquireSuccess(acquireRefId, tenantAddress, createRes); + }); } } } @@ -766,8 +848,10 @@ class MessageBoard { if (leaseIndex >= 0) await this.recreateLeaseOffer(uriTokenId, tenantAddress, leaseIndex).catch(console.error); - // Send error transaction with received leaseAmount. - await this.hostClient.acquireError(acquireRefId, tenantAddress, leaseAmount, e.content || 'invalid_acquire_lease').catch(console.error); + await this.#queueAction(async () => { + // Send error transaction with received leaseAmount. + await this.hostClient.acquireError(acquireRefId, tenantAddress, leaseAmount, e.content || 'invalid_acquire_lease').catch(console.error); + }); } finally { await this.#releaseLeaseUpdateLock(); @@ -843,14 +927,19 @@ class MessageBoard { if (!expiryItemFound) throw "No matching expiration record was found for the instance"; - // Send the extend success response - await this.hostClient.extendSuccess(extendRefId, tenantAddress, expiryTimeStamp); + const expiryMoment = await this.hostClient.getMoment(expiryTimeStamp) + await this.#queueAction(async () => { + // Send the extend success response + await this.hostClient.extendSuccess(extendRefId, tenantAddress, expiryMoment); + }); } catch (e) { console.error(e); - // Send the extend error response - await this.hostClient.extendError(extendRefId, tenantAddress, e.content || 'invalid_extend_lease', amount); + await this.#queueAction(async () => { + // Send the extend error response + await this.hostClient.extendError(extendRefId, tenantAddress, e.content || 'invalid_extend_lease', amount); + }); } finally { this.db.close(); } diff --git a/mb-xrpl/package-lock.json b/mb-xrpl/package-lock.json index 696702c..039116b 100644 --- a/mb-xrpl/package-lock.json +++ b/mb-xrpl/package-lock.json @@ -6,7 +6,7 @@ "": { "name": "mb-xrpl", "dependencies": { - "evernode-js-client": "0.6.1", + "evernode-js-client": "0.6.2", "sqlite3": "5.0.2" }, "devDependencies": { @@ -937,9 +937,9 @@ } }, "node_modules/evernode-js-client": { - "version": "0.6.1", - "resolved": "https://registry.npmjs.org/evernode-js-client/-/evernode-js-client-0.6.1.tgz", - "integrity": "sha512-IFUDY8Z0psmeTnjHXTpbWl7oMIEp7bX7uidjx87rzhlwJueC3XDSOb8Wdz0eEtVaQGrzdbNuFLfPDtTrTCLxbw==", + "version": "0.6.2", + "resolved": "https://registry.npmjs.org/evernode-js-client/-/evernode-js-client-0.6.2.tgz", + "integrity": "sha512-0Io4AFWghk0R6FC7U7AIIDmpW0GIqZ3BaEaBOWhDEyB+6tnO6ZNix89xEUB6zFuvQvGU8M2s7p5c/a7pIK5SmA==", "hasInstallScript": true, "dependencies": { "elliptic": "6.5.4", @@ -3932,9 +3932,9 @@ "dev": true }, "evernode-js-client": { - "version": "0.6.1", - "resolved": "https://registry.npmjs.org/evernode-js-client/-/evernode-js-client-0.6.1.tgz", - "integrity": "sha512-IFUDY8Z0psmeTnjHXTpbWl7oMIEp7bX7uidjx87rzhlwJueC3XDSOb8Wdz0eEtVaQGrzdbNuFLfPDtTrTCLxbw==", + "version": "0.6.2", + "resolved": "https://registry.npmjs.org/evernode-js-client/-/evernode-js-client-0.6.2.tgz", + "integrity": "sha512-0Io4AFWghk0R6FC7U7AIIDmpW0GIqZ3BaEaBOWhDEyB+6tnO6ZNix89xEUB6zFuvQvGU8M2s7p5c/a7pIK5SmA==", "requires": { "elliptic": "6.5.4", "libsodium-wrappers": "0.7.10", diff --git a/mb-xrpl/package.json b/mb-xrpl/package.json index 337f446..3ad0bf5 100644 --- a/mb-xrpl/package.json +++ b/mb-xrpl/package.json @@ -5,7 +5,7 @@ "build": "npm run lint && ncc build app.js --minify -o dist" }, "dependencies": { - "evernode-js-client": "0.6.1", + "evernode-js-client": "0.6.2", "sqlite3": "5.0.2" }, "devDependencies": {