Fix instance requirement configs when initiating sahsimono instances (#174)

This commit is contained in:
Chalith Desaman
2022-08-22 16:39:35 +05:30
committed by GitHub
parent ba7c48ff31
commit cb0dbc6ead
7 changed files with 169 additions and 12 deletions

View File

@@ -24,7 +24,7 @@ appenv = {
ACQUIRE_LEASE_WAIT_TIMEOUT_THRESHOLD: 0.4,
ORPHAN_PRUNE_SCHEDULER_INTERVAL_HOURS: 4,
SASHI_CLI_PATH: appenv.IS_DEV_MODE ? "../build/sashi" : "/usr/bin/sashi",
MB_VERSION: '0.5.8',
MB_VERSION: '0.5.9',
TOS_HASH: '757A0237B44D8B2BBB04AE2BAD5813858E0AECD2F0B217075E27E0630BA74314' // This is the sha256 hash of TOS text.
}
Object.freeze(appenv);

View File

@@ -15,6 +15,8 @@ const LeaseStatus = {
}
class MessageBoard {
#leaseUpdateLock = false; // This locking mechanism is temporary, can be removed when acquire queue is implemented
constructor(configPath, secretConfigPath, dbPath, sashiCliPath, sashiDbPath) {
this.configPath = configPath;
this.secretConfigPath = secretConfigPath;
@@ -109,6 +111,7 @@ class MessageBoard {
this.expiryList = this.expiryList.filter(x => x.expiryLedger >= e.ledger_index);
this.db.open();
await this.#acquireLeaseUpdateLock();
for (const x of expired) {
try {
console.log(`Moments exceeded (current ledger:${e.ledger_index}, expiry ledger:${x.expiryLedger}). Destroying ${x.containerName}`);
@@ -138,6 +141,7 @@ class MessageBoard {
console.error(e);
}
}
await this.#releaseLeaseUpdateLock();
this.db.close();
}
});
@@ -182,7 +186,10 @@ class MessageBoard {
const scheduler = async () => {
console.log(`Starting the scheduled prune job...`);
await this.#pruneOrphanLeases().catch(console.error);
await this.#acquireLeaseUpdateLock();
await this.#pruneOrphanLeases().catch(console.error).finally(async () => {
await this.#releaseLeaseUpdateLock();
});
console.log(`Stopped the scheduled prune job.`);
setTimeout(async () => {
await scheduler();
@@ -194,6 +201,26 @@ class MessageBoard {
}, timeout);
}
// Try to acquire the lease update lock.
async #acquireLeaseUpdateLock() {
await new Promise(async resolve => {
while (this.#leaseUpdateLock) {
await new Promise(resolveSleep => {
setTimeout(() => {
resolveSleep();
}, 1000);
})
}
resolve();
});
this.#leaseUpdateLock = true;
}
// Release the lease update lock.
async #releaseLeaseUpdateLock() {
this.#leaseUpdateLock = false;
}
async #pruneOrphanLeases() {
// Note: If this is soft deletion we need to handle the destroyed status and replace deleteLeaseRecord with changing the status.
@@ -330,6 +357,7 @@ class MessageBoard {
this.db.open();
try {
await this.#acquireLeaseUpdateLock();
if (r.host !== this.cfg.xrpl.address)
throw "Invalid host in the lease aquire.";
@@ -426,6 +454,7 @@ class MessageBoard {
await this.hostClient.acquireError(acquireRefId, tenantAddress, leaseAmount, e.content || 'invalid_acquire_lease').catch(console.error);
}
finally {
await this.#releaseLeaseUpdateLock();
this.db.close();
}
}

View File

@@ -701,11 +701,44 @@ namespace hp
if (config.contract.execute.has_value())
d["contract"]["execute"] = config.contract.execute.value();
if (config.contract.roundtime.has_value())
d["contract"]["roundtime"] = config.contract.roundtime.value();
if (config.contract.environment.has_value())
d["contract"]["environment"] = config.contract.environment.value();
if (config.contract.log.enable.has_value())
d["contract"]["log"]["enable"] = config.contract.log.enable.value();
if (config.contract.max_input_ledger_offset.has_value())
d["contract"]["max_input_ledger_offset"] = config.contract.max_input_ledger_offset.value();
if (config.contract.consensus.mode.has_value())
d["contract"]["consensus"]["mode"] = config.contract.consensus.mode.value();
if (config.contract.consensus.roundtime.has_value())
d["contract"]["consensus"]["roundtime"] = config.contract.consensus.roundtime.value();
if (config.contract.consensus.stage_slice.has_value())
d["contract"]["consensus"]["stage_slice"] = config.contract.consensus.stage_slice.value();
if (config.contract.consensus.threshold.has_value())
d["contract"]["consensus"]["threshold"] = config.contract.consensus.threshold.value();
if (config.contract.npl.mode.has_value())
d["contract"]["npl"]["mode"] = config.contract.npl.mode.value();
if (config.contract.round_limits.user_input_bytes.has_value())
d["contract"]["round_limits"]["user_input_bytes"] = config.contract.round_limits.user_input_bytes.value();
if (config.contract.round_limits.user_output_bytes.has_value())
d["contract"]["round_limits"]["user_output_bytes"] = config.contract.round_limits.user_output_bytes.value();
if (config.contract.round_limits.npl_output_bytes.has_value())
d["contract"]["round_limits"]["npl_output_bytes"] = config.contract.round_limits.npl_output_bytes.value();
if (config.contract.round_limits.proc_cpu_seconds.has_value())
d["contract"]["round_limits"]["proc_cpu_seconds"] = config.contract.round_limits.proc_cpu_seconds.value();
if (config.contract.round_limits.proc_mem_bytes.has_value())
d["contract"]["round_limits"]["proc_mem_bytes"] = config.contract.round_limits.proc_mem_bytes.value();
if (config.contract.round_limits.proc_ofd_count.has_value())
d["contract"]["round_limits"]["proc_ofd_count"] = config.contract.round_limits.proc_ofd_count.value();
if (config.contract.log.max_mbytes_per_file.has_value())
d["contract"]["log"]["max_mbytes_per_file"] = config.contract.log.max_mbytes_per_file.value();

View File

@@ -9,8 +9,8 @@ namespace msg::json
constexpr const char *SEP_COMMA_NOQUOTE = ",\"";
constexpr const char *SEP_COLON_NOQUOTE = "\":";
constexpr const char *DOUBLE_QUOTE = "\"";
constexpr uint16_t MOMENT_SIZE = 1190; // XRP ledgers per Moment.
constexpr uint16_t INSTANCE_INFO_SIZE = 488; // Size of a single instance info
constexpr uint16_t MOMENT_SIZE = 1190; // XRP ledgers per Moment.
constexpr uint16_t INSTANCE_INFO_SIZE = 488; // Size of a single instance info
/**
* Parses a json message sent by the message board.
* @param d Jsoncons document to which the parsed json should be loaded.
@@ -296,9 +296,60 @@ namespace msg::json
if (contract.contains(msg::FLD_EXECUTE))
msg.config.contract.execute = contract[msg::FLD_EXECUTE].as<bool>();
if (contract.contains(msg::FLD_ENVIRONMENT))
msg.config.contract.environment = contract[msg::FLD_ENVIRONMENT].as<std::string>();
if (contract.contains(msg::FLD_MAX_INP_LEDGER_OFFSET))
msg.config.contract.max_input_ledger_offset = contract[msg::FLD_MAX_INP_LEDGER_OFFSET].as<uint16_t>();
if (contract.contains(msg::FLD_ROUNDTIME))
msg.config.contract.roundtime = contract[msg::FLD_ROUNDTIME].as<uint32_t>();
if (contract.contains(msg::FLD_CONSENSUS))
{
const jsoncons::json &consensus = contract[msg::FLD_CONSENSUS];
if (consensus.contains(msg::FLD_MODE))
msg.config.contract.consensus.mode = consensus[msg::FLD_MODE].as<std::string>();
if (consensus.contains(msg::FLD_ROUNDTIME))
msg.config.contract.consensus.roundtime = consensus[msg::FLD_ROUNDTIME].as<uint32_t>();
if (consensus.contains(msg::FLD_STAGE_SLICE))
msg.config.contract.consensus.stage_slice = consensus[msg::FLD_STAGE_SLICE].as<uint32_t>();
if (consensus.contains(msg::FLD_THRESHOLD))
msg.config.contract.consensus.threshold = consensus[msg::FLD_THRESHOLD].as<uint16_t>();
}
if (contract.contains(msg::FLD_NPL))
{
const jsoncons::json &npl = contract[msg::FLD_NPL];
if (npl.contains(msg::FLD_MODE))
msg.config.contract.npl.mode = npl[msg::FLD_MODE].as<std::string>();
}
if (contract.contains(msg::FLD_ROUND_LIMITS))
{
const jsoncons::json &round_limits = contract[msg::FLD_ROUND_LIMITS];
if (round_limits.contains(msg::FLD_USER_INP_BYTES))
msg.config.contract.round_limits.user_input_bytes = round_limits[msg::FLD_USER_INP_BYTES].as<uint64_t>();
if (round_limits.contains(msg::FLD_USER_OUTP_BYTES))
msg.config.contract.round_limits.user_output_bytes = round_limits[msg::FLD_USER_OUTP_BYTES].as<uint64_t>();
if (round_limits.contains(msg::FLD_NPL_OUTP_BYTES))
msg.config.contract.round_limits.npl_output_bytes = round_limits[msg::FLD_NPL_OUTP_BYTES].as<uint64_t>();
if (round_limits.contains(msg::FLD_PROC_CPU_SECS))
msg.config.contract.round_limits.proc_cpu_seconds = round_limits[msg::FLD_PROC_CPU_SECS].as<uint64_t>();
if (round_limits.contains(msg::FLD_PROC_MEM_BYTES))
msg.config.contract.round_limits.proc_mem_bytes = round_limits[msg::FLD_PROC_MEM_BYTES].as<uint64_t>();
if (round_limits.contains(msg::FLD_PROC_OFD_COUNT))
msg.config.contract.round_limits.proc_ofd_count = round_limits[msg::FLD_PROC_OFD_COUNT].as<uint64_t>();
}
if (contract.contains(msg::FLD_LOG))
{
const jsoncons::json &log = contract[msg::FLD_LOG];

View File

@@ -35,12 +35,40 @@ namespace msg
std::optional<size_t> max_file_count;
};
struct consensus_config
{
std::optional<std::string> mode;
std::optional<uint32_t> roundtime;
std::optional<uint32_t> stage_slice;
std::optional<uint16_t> threshold;
};
struct npl_config
{
std::optional<std::string> mode;
};
struct round_limits_config
{
std::optional<size_t> user_input_bytes;
std::optional<size_t> user_output_bytes;
std::optional<size_t> npl_output_bytes;
std::optional<size_t> proc_cpu_seconds;
std::optional<size_t> proc_mem_bytes;
std::optional<size_t> proc_ofd_count;
};
struct contract_config
{
std::optional<uint32_t> roundtime;
std::optional<uint32_t> roundtime;
std::set<std::string> unl;
std::optional<bool> execute;
std::optional<std::string> environment;
std::optional<uint16_t> max_input_ledger_offset;
c_log_config log;
consensus_config consensus;
npl_config npl;
round_limits_config round_limits;
};
struct peer_discovery_config
@@ -147,7 +175,21 @@ namespace msg
constexpr const char *FLD_MESH = "mesh";
constexpr const char *FLD_USER = "user";
constexpr const char *FLD_EXECUTE = "execute";
constexpr const char *FLD_ENVIRONMENT = "environment";
constexpr const char *FLD_MAX_INP_LEDGER_OFFSET = "max_input_ledger_offset";
constexpr const char *FLD_CONSENSUS = "consensus";
constexpr const char *FLD_NPL = "npl";
constexpr const char *FLD_MODE = "mode";
constexpr const char *FLD_ROUNDTIME = "roundtime";
constexpr const char *FLD_STAGE_SLICE = "stage_slice";
constexpr const char *FLD_THRESHOLD = "threshold";
constexpr const char *FLD_ROUND_LIMITS = "round_limits";
constexpr const char *FLD_USER_INP_BYTES = "user_input_bytes";
constexpr const char *FLD_USER_OUTP_BYTES = "user_output_bytes";
constexpr const char *FLD_NPL_OUTP_BYTES = "npl_output_bytes";
constexpr const char *FLD_PROC_CPU_SECS = "proc_cpu_seconds";
constexpr const char *FLD_PROC_MEM_BYTES = "proc_mem_bytes";
constexpr const char *FLD_PROC_OFD_COUNT = "proc_ofd_count";
constexpr const char *FLD_LOG = "log";
constexpr const char *FLD_LOG_LEVEL = "log_level";
constexpr const char *FLD_ENABLE = "enable";

View File

@@ -6,7 +6,7 @@
namespace version
{
// Sashimono agent version. Written to new configs.
constexpr const char *AGENT_VERSION = "0.5.8";
constexpr const char *AGENT_VERSION = "0.5.9";
// Minimum compatible config version (this will be used to validate configs).
constexpr const char *MIN_CONFIG_VERSION = "0.5.0";

View File

@@ -76,7 +76,7 @@ class ClusterManager {
const contractIdx = this.#config.contracts.findIndex(c => c.name === this.#config.selected);
const contract = this.#config.contracts[contractIdx];
const totalEvers = ((contract.target_nodes_count - contract.cluster.length) * EVR_PER_MOMENT) +
((contract.target_nodes_count - contract.cluster.filter(c => c.extended)) * (contract.target_moments_count - 1) * EVR_PER_MOMENT);
((contract.target_nodes_count - contract.cluster.filter(c => c.extended).length) * (contract.target_moments_count - 1) * EVR_PER_MOMENT);
return totalEvers + Math.ceil(totalEvers * 25 / 100);
}
@@ -258,7 +258,9 @@ class ClusterManager {
}));
const err = res.find(e => e?.error);
if (err) {
throw (err.error || err);
// throw (err.error || err);
// Do not terminate the execution if there're extend errors, Because there can be extend timeouts as well.
console.error(res.filter(e => e?.error))
}
}