diff --git a/mb-xrpl/lib/appenv.js b/mb-xrpl/lib/appenv.js index 59e50ed..3881c5e 100644 --- a/mb-xrpl/lib/appenv.js +++ b/mb-xrpl/lib/appenv.js @@ -24,7 +24,7 @@ appenv = { ACQUIRE_LEASE_WAIT_TIMEOUT_THRESHOLD: 0.4, ORPHAN_PRUNE_SCHEDULER_INTERVAL_HOURS: 4, SASHI_CLI_PATH: appenv.IS_DEV_MODE ? "../build/sashi" : "/usr/bin/sashi", - MB_VERSION: '0.5.8', + MB_VERSION: '0.5.9', TOS_HASH: '757A0237B44D8B2BBB04AE2BAD5813858E0AECD2F0B217075E27E0630BA74314' // This is the sha256 hash of TOS text. } Object.freeze(appenv); diff --git a/mb-xrpl/lib/message-board.js b/mb-xrpl/lib/message-board.js index 2679bc8..62e5a17 100644 --- a/mb-xrpl/lib/message-board.js +++ b/mb-xrpl/lib/message-board.js @@ -15,6 +15,8 @@ const LeaseStatus = { } class MessageBoard { + #leaseUpdateLock = false; // This locking mechanism is temporary, can be removed when acquire queue is implemented + constructor(configPath, secretConfigPath, dbPath, sashiCliPath, sashiDbPath) { this.configPath = configPath; this.secretConfigPath = secretConfigPath; @@ -109,6 +111,7 @@ class MessageBoard { this.expiryList = this.expiryList.filter(x => x.expiryLedger >= e.ledger_index); this.db.open(); + await this.#acquireLeaseUpdateLock(); for (const x of expired) { try { console.log(`Moments exceeded (current ledger:${e.ledger_index}, expiry ledger:${x.expiryLedger}). Destroying ${x.containerName}`); @@ -138,6 +141,7 @@ class MessageBoard { console.error(e); } } + await this.#releaseLeaseUpdateLock(); this.db.close(); } }); @@ -182,7 +186,10 @@ class MessageBoard { const scheduler = async () => { console.log(`Starting the scheduled prune job...`); - await this.#pruneOrphanLeases().catch(console.error); + await this.#acquireLeaseUpdateLock(); + await this.#pruneOrphanLeases().catch(console.error).finally(async () => { + await this.#releaseLeaseUpdateLock(); + }); console.log(`Stopped the scheduled prune job.`); setTimeout(async () => { await scheduler(); @@ -194,6 +201,26 @@ class MessageBoard { }, timeout); } + // Try to acquire the lease update lock. + async #acquireLeaseUpdateLock() { + await new Promise(async resolve => { + while (this.#leaseUpdateLock) { + await new Promise(resolveSleep => { + setTimeout(() => { + resolveSleep(); + }, 1000); + }) + } + resolve(); + }); + this.#leaseUpdateLock = true; + } + + // Release the lease update lock. + async #releaseLeaseUpdateLock() { + this.#leaseUpdateLock = false; + } + async #pruneOrphanLeases() { // Note: If this is soft deletion we need to handle the destroyed status and replace deleteLeaseRecord with changing the status. @@ -330,6 +357,7 @@ class MessageBoard { this.db.open(); try { + await this.#acquireLeaseUpdateLock(); if (r.host !== this.cfg.xrpl.address) throw "Invalid host in the lease aquire."; @@ -426,6 +454,7 @@ class MessageBoard { await this.hostClient.acquireError(acquireRefId, tenantAddress, leaseAmount, e.content || 'invalid_acquire_lease').catch(console.error); } finally { + await this.#releaseLeaseUpdateLock(); this.db.close(); } } diff --git a/src/hp_manager.cpp b/src/hp_manager.cpp index 7acdbe1..f354bda 100644 --- a/src/hp_manager.cpp +++ b/src/hp_manager.cpp @@ -701,11 +701,44 @@ namespace hp if (config.contract.execute.has_value()) d["contract"]["execute"] = config.contract.execute.value(); - if (config.contract.roundtime.has_value()) - d["contract"]["roundtime"] = config.contract.roundtime.value(); + if (config.contract.environment.has_value()) + d["contract"]["environment"] = config.contract.environment.value(); - if (config.contract.log.enable.has_value()) - d["contract"]["log"]["enable"] = config.contract.log.enable.value(); + if (config.contract.max_input_ledger_offset.has_value()) + d["contract"]["max_input_ledger_offset"] = config.contract.max_input_ledger_offset.value(); + + if (config.contract.consensus.mode.has_value()) + d["contract"]["consensus"]["mode"] = config.contract.consensus.mode.value(); + + if (config.contract.consensus.roundtime.has_value()) + d["contract"]["consensus"]["roundtime"] = config.contract.consensus.roundtime.value(); + + if (config.contract.consensus.stage_slice.has_value()) + d["contract"]["consensus"]["stage_slice"] = config.contract.consensus.stage_slice.value(); + + if (config.contract.consensus.threshold.has_value()) + d["contract"]["consensus"]["threshold"] = config.contract.consensus.threshold.value(); + + if (config.contract.npl.mode.has_value()) + d["contract"]["npl"]["mode"] = config.contract.npl.mode.value(); + + if (config.contract.round_limits.user_input_bytes.has_value()) + d["contract"]["round_limits"]["user_input_bytes"] = config.contract.round_limits.user_input_bytes.value(); + + if (config.contract.round_limits.user_output_bytes.has_value()) + d["contract"]["round_limits"]["user_output_bytes"] = config.contract.round_limits.user_output_bytes.value(); + + if (config.contract.round_limits.npl_output_bytes.has_value()) + d["contract"]["round_limits"]["npl_output_bytes"] = config.contract.round_limits.npl_output_bytes.value(); + + if (config.contract.round_limits.proc_cpu_seconds.has_value()) + d["contract"]["round_limits"]["proc_cpu_seconds"] = config.contract.round_limits.proc_cpu_seconds.value(); + + if (config.contract.round_limits.proc_mem_bytes.has_value()) + d["contract"]["round_limits"]["proc_mem_bytes"] = config.contract.round_limits.proc_mem_bytes.value(); + + if (config.contract.round_limits.proc_ofd_count.has_value()) + d["contract"]["round_limits"]["proc_ofd_count"] = config.contract.round_limits.proc_ofd_count.value(); if (config.contract.log.max_mbytes_per_file.has_value()) d["contract"]["log"]["max_mbytes_per_file"] = config.contract.log.max_mbytes_per_file.value(); diff --git a/src/msg/json/msg_json.cpp b/src/msg/json/msg_json.cpp index 72fd7d2..85cfb46 100644 --- a/src/msg/json/msg_json.cpp +++ b/src/msg/json/msg_json.cpp @@ -9,8 +9,8 @@ namespace msg::json constexpr const char *SEP_COMMA_NOQUOTE = ",\""; constexpr const char *SEP_COLON_NOQUOTE = "\":"; constexpr const char *DOUBLE_QUOTE = "\""; - constexpr uint16_t MOMENT_SIZE = 1190; // XRP ledgers per Moment. - constexpr uint16_t INSTANCE_INFO_SIZE = 488; // Size of a single instance info + constexpr uint16_t MOMENT_SIZE = 1190; // XRP ledgers per Moment. + constexpr uint16_t INSTANCE_INFO_SIZE = 488; // Size of a single instance info /** * Parses a json message sent by the message board. * @param d Jsoncons document to which the parsed json should be loaded. @@ -296,9 +296,60 @@ namespace msg::json if (contract.contains(msg::FLD_EXECUTE)) msg.config.contract.execute = contract[msg::FLD_EXECUTE].as(); + if (contract.contains(msg::FLD_ENVIRONMENT)) + msg.config.contract.environment = contract[msg::FLD_ENVIRONMENT].as(); + + if (contract.contains(msg::FLD_MAX_INP_LEDGER_OFFSET)) + msg.config.contract.max_input_ledger_offset = contract[msg::FLD_MAX_INP_LEDGER_OFFSET].as(); + if (contract.contains(msg::FLD_ROUNDTIME)) msg.config.contract.roundtime = contract[msg::FLD_ROUNDTIME].as(); + if (contract.contains(msg::FLD_CONSENSUS)) + { + const jsoncons::json &consensus = contract[msg::FLD_CONSENSUS]; + if (consensus.contains(msg::FLD_MODE)) + msg.config.contract.consensus.mode = consensus[msg::FLD_MODE].as(); + + if (consensus.contains(msg::FLD_ROUNDTIME)) + msg.config.contract.consensus.roundtime = consensus[msg::FLD_ROUNDTIME].as(); + + if (consensus.contains(msg::FLD_STAGE_SLICE)) + msg.config.contract.consensus.stage_slice = consensus[msg::FLD_STAGE_SLICE].as(); + + if (consensus.contains(msg::FLD_THRESHOLD)) + msg.config.contract.consensus.threshold = consensus[msg::FLD_THRESHOLD].as(); + } + + if (contract.contains(msg::FLD_NPL)) + { + const jsoncons::json &npl = contract[msg::FLD_NPL]; + if (npl.contains(msg::FLD_MODE)) + msg.config.contract.npl.mode = npl[msg::FLD_MODE].as(); + } + + if (contract.contains(msg::FLD_ROUND_LIMITS)) + { + const jsoncons::json &round_limits = contract[msg::FLD_ROUND_LIMITS]; + if (round_limits.contains(msg::FLD_USER_INP_BYTES)) + msg.config.contract.round_limits.user_input_bytes = round_limits[msg::FLD_USER_INP_BYTES].as(); + + if (round_limits.contains(msg::FLD_USER_OUTP_BYTES)) + msg.config.contract.round_limits.user_output_bytes = round_limits[msg::FLD_USER_OUTP_BYTES].as(); + + if (round_limits.contains(msg::FLD_NPL_OUTP_BYTES)) + msg.config.contract.round_limits.npl_output_bytes = round_limits[msg::FLD_NPL_OUTP_BYTES].as(); + + if (round_limits.contains(msg::FLD_PROC_CPU_SECS)) + msg.config.contract.round_limits.proc_cpu_seconds = round_limits[msg::FLD_PROC_CPU_SECS].as(); + + if (round_limits.contains(msg::FLD_PROC_MEM_BYTES)) + msg.config.contract.round_limits.proc_mem_bytes = round_limits[msg::FLD_PROC_MEM_BYTES].as(); + + if (round_limits.contains(msg::FLD_PROC_OFD_COUNT)) + msg.config.contract.round_limits.proc_ofd_count = round_limits[msg::FLD_PROC_OFD_COUNT].as(); + } + if (contract.contains(msg::FLD_LOG)) { const jsoncons::json &log = contract[msg::FLD_LOG]; diff --git a/src/msg/msg_common.hpp b/src/msg/msg_common.hpp index 7ad8908..c56222d 100644 --- a/src/msg/msg_common.hpp +++ b/src/msg/msg_common.hpp @@ -35,12 +35,40 @@ namespace msg std::optional max_file_count; }; + struct consensus_config + { + std::optional mode; + std::optional roundtime; + std::optional stage_slice; + std::optional threshold; + }; + + struct npl_config + { + std::optional mode; + }; + + struct round_limits_config + { + std::optional user_input_bytes; + std::optional user_output_bytes; + std::optional npl_output_bytes; + std::optional proc_cpu_seconds; + std::optional proc_mem_bytes; + std::optional proc_ofd_count; + }; + struct contract_config { - std::optional roundtime; + std::optional roundtime; std::set unl; std::optional execute; + std::optional environment; + std::optional max_input_ledger_offset; c_log_config log; + consensus_config consensus; + npl_config npl; + round_limits_config round_limits; }; struct peer_discovery_config @@ -147,7 +175,21 @@ namespace msg constexpr const char *FLD_MESH = "mesh"; constexpr const char *FLD_USER = "user"; constexpr const char *FLD_EXECUTE = "execute"; + constexpr const char *FLD_ENVIRONMENT = "environment"; + constexpr const char *FLD_MAX_INP_LEDGER_OFFSET = "max_input_ledger_offset"; + constexpr const char *FLD_CONSENSUS = "consensus"; + constexpr const char *FLD_NPL = "npl"; + constexpr const char *FLD_MODE = "mode"; constexpr const char *FLD_ROUNDTIME = "roundtime"; + constexpr const char *FLD_STAGE_SLICE = "stage_slice"; + constexpr const char *FLD_THRESHOLD = "threshold"; + constexpr const char *FLD_ROUND_LIMITS = "round_limits"; + constexpr const char *FLD_USER_INP_BYTES = "user_input_bytes"; + constexpr const char *FLD_USER_OUTP_BYTES = "user_output_bytes"; + constexpr const char *FLD_NPL_OUTP_BYTES = "npl_output_bytes"; + constexpr const char *FLD_PROC_CPU_SECS = "proc_cpu_seconds"; + constexpr const char *FLD_PROC_MEM_BYTES = "proc_mem_bytes"; + constexpr const char *FLD_PROC_OFD_COUNT = "proc_ofd_count"; constexpr const char *FLD_LOG = "log"; constexpr const char *FLD_LOG_LEVEL = "log_level"; constexpr const char *FLD_ENABLE = "enable"; diff --git a/src/version.hpp b/src/version.hpp index 779b9d7..7ef5e10 100644 --- a/src/version.hpp +++ b/src/version.hpp @@ -6,7 +6,7 @@ namespace version { // Sashimono agent version. Written to new configs. - constexpr const char *AGENT_VERSION = "0.5.8"; + constexpr const char *AGENT_VERSION = "0.5.9"; // Minimum compatible config version (this will be used to validate configs). constexpr const char *MIN_CONFIG_VERSION = "0.5.0"; diff --git a/test/evernode-cluster/index.js b/test/evernode-cluster/index.js index 27a50f2..e7e6f9d 100644 --- a/test/evernode-cluster/index.js +++ b/test/evernode-cluster/index.js @@ -76,7 +76,7 @@ class ClusterManager { const contractIdx = this.#config.contracts.findIndex(c => c.name === this.#config.selected); const contract = this.#config.contracts[contractIdx]; const totalEvers = ((contract.target_nodes_count - contract.cluster.length) * EVR_PER_MOMENT) + - ((contract.target_nodes_count - contract.cluster.filter(c => c.extended)) * (contract.target_moments_count - 1) * EVR_PER_MOMENT); + ((contract.target_nodes_count - contract.cluster.filter(c => c.extended).length) * (contract.target_moments_count - 1) * EVR_PER_MOMENT); return totalEvers + Math.ceil(totalEvers * 25 / 100); } @@ -258,7 +258,9 @@ class ClusterManager { })); const err = res.find(e => e?.error); if (err) { - throw (err.error || err); + // throw (err.error || err); + // Do not terminate the execution if there're extend errors, Because there can be extend timeouts as well. + console.error(res.filter(e => e?.error)) } }