Queue transactions and process them syncronously (#257)

This commit is contained in:
Chalith Desaman
2023-05-04 13:40:23 +05:30
committed by GitHub
parent ad24546f01
commit 65a0b2dd7e
4 changed files with 154 additions and 65 deletions

View File

@@ -26,7 +26,7 @@ appenv = {
ACQUIRE_LEASE_TIMEOUT_THRESHOLD: 0.8,
ACQUIRE_LEASE_WAIT_TIMEOUT_THRESHOLD: 0.4,
ORPHAN_PRUNE_SCHEDULER_INTERVAL_HOURS: 4,
EXPIRE_INSTANCES_SCHEDULER_INTERVAL_SECONDS: 2,
SASHIMONO_SCHEDULER_INTERVAL_SECONDS: 2,
SASHI_CLI_PATH: appenv.IS_DEV_MODE ? "../build/sashi" : "/usr/bin/sashi",
MB_VERSION: '0.6.2',
TOS_HASH: '757A0237B44D8B2BBB04AE2BAD5813858E0AECD2F0B217075E27E0630BA74314' // This is the sha256 hash of TOS text.

View File

@@ -23,6 +23,10 @@ class MessageBoard {
#instanceExpirationQueue = [];
#graceTimeoutRef = null;
#lastHaltedTime = null;
#concurrencyQueue = {
processing: false,
queue: []
};
constructor(configPath, secretConfigPath, dbPath, sashiCliPath, sashiDbPath, sashiConfigPath) {
this.configPath = configPath;
@@ -91,15 +95,19 @@ class MessageBoard {
// Catch up missed transactions based on the previously updated "last_watched_ledger" record (checkpoint).
await this.#catchupMissedLeases().catch(console.error);
this.db.close();
// Load the sashimono config.
const sashiConfig = ConfigHelper.readSashiConfig(this.sashiConfigPath);
this.activeInstanceCount = this.expiryList.length;
console.log(`Active instance count: ${this.activeInstanceCount}`);
// Update the registry with the active instance count.
await this.hostClient.updateRegInfo(this.activeInstanceCount, this.cfg.version, sashiConfig.system.max_instance_count, null, null,
sashiConfig.system.max_cpu_us, Math.floor((sashiConfig.system.max_mem_kbytes + sashiConfig.system.max_swap_kbytes) / 1000),
Math.floor(sashiConfig.system.max_storage_kbytes / 1000));
this.db.close();
await this.#queueAction(async () => {
// Update the registry with the active instance count.
await this.hostClient.updateRegInfo(this.activeInstanceCount, this.cfg.version, sashiConfig.system.max_instance_count, null, null,
sashiConfig.system.max_cpu_us, Math.floor((sashiConfig.system.max_mem_kbytes + sashiConfig.system.max_swap_kbytes) / 1000),
Math.floor(sashiConfig.system.max_storage_kbytes / 1000));
});
this.xrplApi.on(evernode.XrplApiEvents.LEDGER, async (e) => {
this.lastValidatedLedgerIndex = e.ledger_index;
@@ -110,11 +118,13 @@ class MessageBoard {
this.hostClient.on(evernode.HostEvents.ExtendLease, r => this.handleExtendLease(r));
const checkAndRequestRebate = async () => {
// Send rebate request at startup if there's any pending rebates.
if (hostInfo?.registrationFee > hostRegFee) {
console.log(`Requesting rebate...`);
await this.hostClient.requestRebate();
}
await this.#queueAction(async () => {
// Send rebate request at startup if there's any pending rebates.
if (hostInfo?.registrationFee > hostRegFee) {
console.log(`Requesting rebate...`);
await this.hostClient.requestRebate();
}
});
}
let hostRegFee = this.hostClient.config.hostRegFee;
@@ -141,6 +151,58 @@ class MessageBoard {
}
// Try to acquire the lease update lock.
async #acquireConcurrencyQueue() {
await new Promise(async resolve => {
while (this.#concurrencyQueue.processing) {
await new Promise(resolveSleep => {
setTimeout(resolveSleep, 1000);
})
}
resolve();
});
this.#concurrencyQueue.processing = true;
}
// Release the lease update lock.
async #releaseConcurrencyQueue() {
this.#concurrencyQueue.processing = false;
}
async #queueAction(action) {
await this.#acquireConcurrencyQueue();
this.#concurrencyQueue.queue.push({
callback: action,
attempts: 0
});
await this.#releaseConcurrencyQueue();
}
async #processConcurrencyQueue() {
await this.#acquireConcurrencyQueue();
let toKeep = [];
for (let action of this.#concurrencyQueue.queue) {
try {
await action.callback();
}
catch (e) {
if (action.attempts < 5) {
action.attempts++;
toKeep.push(action);
}
else {
console.error(e);
}
}
}
this.#concurrencyQueue.queue = toKeep;
await this.#releaseConcurrencyQueue();
}
// Check for xrpl halts
#checkLedgersForHalt() {
const currentTime = evernode.UtilHelpers.getCurrentUnixTime('milli');
@@ -236,7 +298,10 @@ class MessageBoard {
if (voteArr && voteArr.length) {
for (const vote of voteArr) {
try {
await this.hostClient.heartbeat(vote);
await this.#queueAction(async () => {
await this.hostClient.heartbeat(vote);
this.lastHeartbeatMoment = await this.hostClient.getMoment();
});
heartbeatSent = true;
}
catch (e) {
@@ -258,8 +323,10 @@ class MessageBoard {
return;
try {
await this.hostClient.heartbeat();
this.lastHeartbeatMoment = currentMoment;
await this.#queueAction(async () => {
await this.hostClient.heartbeat();
this.lastHeartbeatMoment = await this.hostClient.getMoment();
});
}
catch (err) {
if (err.code === 'tecHOOK_REJECTED')
@@ -298,7 +365,9 @@ class MessageBoard {
// Remove from the queue
this.#instanceExpirationQueue = this.#instanceExpirationQueue.filter(i => i.containerName != lease.containerName);
await this.hostClient.updateRegInfo(this.activeInstanceCount);
await this.#queueAction(async () => {
await this.hostClient.updateRegInfo(this.activeInstanceCount);
});
console.log(`Destroyed ${lease.containerName}`);
}
@@ -364,11 +433,12 @@ class MessageBoard {
}
#startSashimonoClockScheduler() {
const timeout = appenv.EXPIRE_INSTANCES_SCHEDULER_INTERVAL_SECONDS * 1000; // Seconds to millisecs.
const timeout = appenv.SASHIMONO_SCHEDULER_INTERVAL_SECONDS * 1000; // Seconds to millisecs.
const scheduler = async () => {
this.#checkLedgersForHalt();
await this.#expireInstances();
await this.#processConcurrencyQueue();
setTimeout(async () => {
await scheduler();
}, timeout);
@@ -409,9 +479,7 @@ class MessageBoard {
await new Promise(async resolve => {
while (this.#leaseUpdateLock) {
await new Promise(resolveSleep => {
setTimeout(() => {
resolveSleep();
}, 1000);
setTimeout(resolveSleep, 1000);
})
}
resolve();
@@ -464,8 +532,10 @@ class MessageBoard {
const uriInfo = evernode.UtilHelpers.decodeLeaseTokenUri(uriToken.URI);
await this.recreateLeaseOffer(instance.name, lease.tenant_xrp_address, uriInfo.leaseIndex);
console.log(`Refunding tenant ${lease.tenant_xrp_address}...`);
await this.hostClient.refundTenant(lease.tx_hash, lease.tenant_xrp_address, uriInfo.leaseAmount.toString());
await this.#queueAction(async () => {
console.log(`Refunding tenant ${lease.tenant_xrp_address}...`);
await this.hostClient.refundTenant(lease.tx_hash, lease.tenant_xrp_address, uriInfo.leaseAmount.toString());
});
}
// Remove the lease record.
@@ -509,11 +579,13 @@ class MessageBoard {
const uriInfo = evernode.UtilHelpers.decodeLeaseTokenUri(uriToken.URI);
await this.recreateLeaseOffer(lease.container_name, lease.tenant_xrp_address, uriInfo.leaseIndex);
// If lease is in ACQUIRING status acquire response is not received by the tenant and lease is not in expiry list.
if (lease.status === LeaseStatus.ACQUIRING) {
console.log(`Refunding tenant ${lease.tenant_xrp_address}...`);
await this.hostClient.refundTenant(lease.tx_hash, lease.tenant_xrp_address, uriInfo.leaseAmount.toString());
}
await this.#queueAction(async () => {
// If lease is in ACQUIRING status acquire response is not received by the tenant and lease is not in expiry list.
if (lease.status === LeaseStatus.ACQUIRING) {
console.log(`Refunding tenant ${lease.tenant_xrp_address}...`);
await this.hostClient.refundTenant(lease.tx_hash, lease.tenant_xrp_address, uriInfo.leaseAmount.toString());
}
});
}
}
}
@@ -522,11 +594,13 @@ class MessageBoard {
}
}
// If active instance count is updated, Send the update registration transaction.
if (this.activeInstanceCount !== activeInstanceCount) {
this.activeInstanceCount = activeInstanceCount;
await this.hostClient.updateRegInfo(this.activeInstanceCount);
}
await this.#queueAction(async () => {
// If active instance count is updated, Send the update registration transaction.
if (this.activeInstanceCount !== activeInstanceCount) {
this.activeInstanceCount = activeInstanceCount;
await this.hostClient.updateRegInfo(this.activeInstanceCount);
}
});
}
async #catchupMissedLeases() {
@@ -546,14 +620,15 @@ class MessageBoard {
const transactions = transactionHistory.map((record) => {
const transaction = record.tx;
transaction.Memos = evernode.TransactionHelper.deserializeMemos(transaction.Memos);
transaction.HookParameters = evernode.TransactionHelper.deserializeHookParams(transaction.HookParameters);
return transaction;
});
loop1:
for (const trx of transactions) {
try {
const memoTypes = trx.Memos.map(m => m.type);
if (memoTypes.includes(evernode.EventTypes.ACQUIRE_LEASE) || memoTypes.includes(evernode.EventTypes.EXTEND_LEASE)) {
const paramValues = trx.HookParameters.map(p => p.value);
if (paramValues.includes(evernode.EventTypes.ACQUIRE_LEASE) || paramValues.includes(evernode.EventTypes.EXTEND_LEASE)) {
// Update last watched ledger sequence number.
await this.updateLastIndexRecord(trx.ledger_index);
@@ -579,7 +654,7 @@ class MessageBoard {
trx.Destination = this.cfg.xrpl.address;
// Handle Acquires.
if (memoTypes.includes(evernode.EventTypes.ACQUIRE_LEASE)) {
if (paramValues.includes(evernode.EventTypes.ACQUIRE_LEASE)) {
// Find and bind the bought lease offer (If the trx. is an ACQUIRE, it should be an URITokenBuy trx)
const offer = (await hostAccount.getURITokens({ ledger_index: trx.ledger_index - 1 }))?.find(o => o.index === trx?.URITokenID && o.Amount);
@@ -598,12 +673,14 @@ class MessageBoard {
// Have to recreate the URIToken Offer for the lease as previous one was not utilized.
await this.recreateLeaseOffer(eventInfo.data.uriTokenId, eventInfo.data.tenant, uriInfo.leaseIndex);
console.log(`Refunding tenant ${eventInfo.data.tenant} for acquire...`);
await this.hostClient.refundTenant(trx.hash, eventInfo.data.tenant, uriInfo.leaseAmount.toString());
await this.#queueAction(async () => {
console.log(`Refunding tenant ${eventInfo.data.tenant} for acquire...`);
await this.hostClient.refundTenant(trx.hash, eventInfo.data.tenant, uriInfo.leaseAmount.toString());
});
}
}
} else if (memoTypes.includes(evernode.EventTypes.EXTEND_LEASE)) { // Handle Extensions.
} else if (paramValues.includes(evernode.EventTypes.EXTEND_LEASE)) { // Handle Extensions.
const eventInfo = await this.hostClient.extractEvernodeEvent(trx);
@@ -613,10 +690,11 @@ class MessageBoard {
const tenantXrplAcc = new evernode.XrplAccount(eventInfo.data.tenant);
const uriToken = (await tenantXrplAcc.getURITokens()).find(n => evernode.EvernodeHelpers.isValidURI(n.URI, evernode.EvernodeConstants.LEASE_TOKEN_PREFIX_HEX) && n.index === eventInfo.data.uriTokenId);
if (uriToken) {
// The refund for the extension, if tenant still own the URIToken.
console.log(`Refunding tenant ${eventInfo.data.tenant} for extend...`);
await this.hostClient.refundTenant(trx.hash, eventInfo.data.tenant, eventInfo.data.payment.toString());
await this.#queueAction(async () => {
// The refund for the extension, if tenant still own the URIToken.
console.log(`Refunding tenant ${eventInfo.data.tenant} for extend...`);
await this.hostClient.refundTenant(trx.hash, eventInfo.data.tenant, eventInfo.data.payment.toString());
});
} else {
console.log(`No such URIToken (${eventInfo.data.uriTokenId}) was found.`);
}
@@ -648,12 +726,14 @@ class MessageBoard {
}
async recreateLeaseOffer(uriTokenId, tenantAddress, leaseIndex) {
// Burn the URIToken and recreate the offer.
await this.hostClient.expireLease(uriTokenId, tenantAddress).catch(console.error);
// We refresh the config here, So if the purchaserTargetPrice is updated by the purchaser service, the new value will be taken.
await this.hostClient.refreshConfig();
const leaseAmount = this.cfg.xrpl.leaseAmount ? this.cfg.xrpl.leaseAmount : parseFloat(this.hostClient.config.purchaserTargetPrice);
await this.hostClient.offerLease(leaseIndex, leaseAmount, appenv.TOS_HASH).catch(console.error);
await this.#queueAction(async () => {
// Burn the URIToken and recreate the offer.
await this.hostClient.expireLease(uriTokenId, tenantAddress).catch(console.error);
// We refresh the config here, So if the purchaserTargetPrice is updated by the purchaser service, the new value will be taken.
await this.hostClient.refreshConfig();
const leaseAmount = this.cfg.xrpl.leaseAmount ? this.cfg.xrpl.leaseAmount : parseFloat(this.hostClient.config.purchaserTargetPrice);
await this.hostClient.offerLease(leaseIndex, leaseAmount, appenv.TOS_HASH).catch(console.error);
});
}
async handleAcquireLease(r) {
@@ -720,7 +800,7 @@ class MessageBoard {
// Number of validated ledgers passed while the instance is created.
diff = evernode.UtilHelpers.getCurrentUnixTime() - startingValidatedTime;
// Give-up the acquiringing porocess if the instance creation itself takes more than 80% of allowed window(in seconds).
// Give-up the acquiring process if the instance creation itself takes more than 80% of allowed window(in seconds).
threshold = this.hostClient.config.leaseAcquireWindow * appenv.ACQUIRE_LEASE_TIMEOUT_THRESHOLD;
if (diff > threshold) {
console.error(`Instance creation timeout. Took: ${diff} seconds. Threshold: ${threshold} seconds`);
@@ -744,10 +824,12 @@ class MessageBoard {
// Update the active instance count.
this.activeInstanceCount++;
await this.hostClient.updateRegInfo(this.activeInstanceCount);
await this.#queueAction(async () => {
await this.hostClient.updateRegInfo(this.activeInstanceCount);
// Send the acquire response with created instance info.
await this.hostClient.acquireSuccess(acquireRefId, tenantAddress, createRes);
// Send the acquire response with created instance info.
await this.hostClient.acquireSuccess(acquireRefId, tenantAddress, createRes);
});
}
}
}
@@ -766,8 +848,10 @@ class MessageBoard {
if (leaseIndex >= 0)
await this.recreateLeaseOffer(uriTokenId, tenantAddress, leaseIndex).catch(console.error);
// Send error transaction with received leaseAmount.
await this.hostClient.acquireError(acquireRefId, tenantAddress, leaseAmount, e.content || 'invalid_acquire_lease').catch(console.error);
await this.#queueAction(async () => {
// Send error transaction with received leaseAmount.
await this.hostClient.acquireError(acquireRefId, tenantAddress, leaseAmount, e.content || 'invalid_acquire_lease').catch(console.error);
});
}
finally {
await this.#releaseLeaseUpdateLock();
@@ -843,14 +927,19 @@ class MessageBoard {
if (!expiryItemFound)
throw "No matching expiration record was found for the instance";
// Send the extend success response
await this.hostClient.extendSuccess(extendRefId, tenantAddress, expiryTimeStamp);
const expiryMoment = await this.hostClient.getMoment(expiryTimeStamp)
await this.#queueAction(async () => {
// Send the extend success response
await this.hostClient.extendSuccess(extendRefId, tenantAddress, expiryMoment);
});
}
catch (e) {
console.error(e);
// Send the extend error response
await this.hostClient.extendError(extendRefId, tenantAddress, e.content || 'invalid_extend_lease', amount);
await this.#queueAction(async () => {
// Send the extend error response
await this.hostClient.extendError(extendRefId, tenantAddress, e.content || 'invalid_extend_lease', amount);
});
} finally {
this.db.close();
}

View File

@@ -6,7 +6,7 @@
"": {
"name": "mb-xrpl",
"dependencies": {
"evernode-js-client": "0.6.1",
"evernode-js-client": "0.6.2",
"sqlite3": "5.0.2"
},
"devDependencies": {
@@ -937,9 +937,9 @@
}
},
"node_modules/evernode-js-client": {
"version": "0.6.1",
"resolved": "https://registry.npmjs.org/evernode-js-client/-/evernode-js-client-0.6.1.tgz",
"integrity": "sha512-IFUDY8Z0psmeTnjHXTpbWl7oMIEp7bX7uidjx87rzhlwJueC3XDSOb8Wdz0eEtVaQGrzdbNuFLfPDtTrTCLxbw==",
"version": "0.6.2",
"resolved": "https://registry.npmjs.org/evernode-js-client/-/evernode-js-client-0.6.2.tgz",
"integrity": "sha512-0Io4AFWghk0R6FC7U7AIIDmpW0GIqZ3BaEaBOWhDEyB+6tnO6ZNix89xEUB6zFuvQvGU8M2s7p5c/a7pIK5SmA==",
"hasInstallScript": true,
"dependencies": {
"elliptic": "6.5.4",
@@ -3932,9 +3932,9 @@
"dev": true
},
"evernode-js-client": {
"version": "0.6.1",
"resolved": "https://registry.npmjs.org/evernode-js-client/-/evernode-js-client-0.6.1.tgz",
"integrity": "sha512-IFUDY8Z0psmeTnjHXTpbWl7oMIEp7bX7uidjx87rzhlwJueC3XDSOb8Wdz0eEtVaQGrzdbNuFLfPDtTrTCLxbw==",
"version": "0.6.2",
"resolved": "https://registry.npmjs.org/evernode-js-client/-/evernode-js-client-0.6.2.tgz",
"integrity": "sha512-0Io4AFWghk0R6FC7U7AIIDmpW0GIqZ3BaEaBOWhDEyB+6tnO6ZNix89xEUB6zFuvQvGU8M2s7p5c/a7pIK5SmA==",
"requires": {
"elliptic": "6.5.4",
"libsodium-wrappers": "0.7.10",

View File

@@ -5,7 +5,7 @@
"build": "npm run lint && ncc build app.js --minify -o dist"
},
"dependencies": {
"evernode-js-client": "0.6.1",
"evernode-js-client": "0.6.2",
"sqlite3": "5.0.2"
},
"devDependencies": {