Merge branch 'main' into beta1

This commit is contained in:
chalith
2022-10-20 11:15:16 +05:30
12 changed files with 212 additions and 208 deletions

2
dependencies/hp.cfg vendored
View File

@@ -23,7 +23,7 @@
"unl": [],
"bin_path": "",
"bin_args": "",
"environment": "",
"environment": {},
"max_input_ledger_offset": 10,
"consensus": {
"mode": "public",

View File

@@ -60,8 +60,8 @@ namespace conf
cfg.version = version::AGENT_VERSION;
cfg.hp.host_address = host_addr.empty() ? "127.0.0.1" : std::string(host_addr);
cfg.hp.init_peer_port = init_peer_port;
cfg.hp.init_user_port = init_user_port;
cfg.hp.init_peer_port = !init_peer_port ? 22861 : init_peer_port;
cfg.hp.init_user_port = !init_user_port ? 26201 : init_user_port;
cfg.system.max_instance_count = !inst_count ? 3 : inst_count;
cfg.system.max_mem_kbytes = !ram_kbytes ? 1048576 : ram_kbytes;

View File

@@ -701,8 +701,8 @@ namespace hp
if (config.contract.execute.has_value())
d["contract"]["execute"] = config.contract.execute.value();
if (config.contract.environment.has_value())
d["contract"]["environment"] = config.contract.environment.value();
if (!config.contract.environment.empty())
d["contract"]["environment"] = config.contract.environment;
if (config.contract.max_input_ledger_offset.has_value())
d["contract"]["max_input_ledger_offset"] = config.contract.max_input_ledger_offset.value();

View File

@@ -132,7 +132,7 @@ int main(int argc, char **argv)
// This will create a new config.
const std::string host_addr = (argc >= 4) ? argv[3] : "";
uint16_t init_peer_port, init_user_port, docker_registry_port = 0;
uint16_t init_peer_port = 0, init_user_port = 0, docker_registry_port = 0;
size_t inst_count = 0, cpu_us = 0, ram_kbytes = 0, swap_kbytes = 0, disk_kbytes = 0;
if (((argc >= 5) && util::stoul(argv[4], init_peer_port) != 0) ||

View File

@@ -297,7 +297,24 @@ namespace msg::json
msg.config.contract.execute = contract[msg::FLD_EXECUTE].as<bool>();
if (contract.contains(msg::FLD_ENVIRONMENT))
msg.config.contract.environment = contract[msg::FLD_ENVIRONMENT].as<std::string>();
{
if (!contract[msg::FLD_ENVIRONMENT].is_object())
{
LOG_ERROR << "Invalid environment variable value.";
return -1;
}
for (const auto &obj : contract[msg::FLD_ENVIRONMENT].object_range())
{
// Environment variable values should be strings.
if (!obj.value().is_string())
{
LOG_ERROR << obj.key() << " environment variable should be a string.";
return -1;
}
msg.config.contract.environment.emplace(obj.key(), obj.value().as<std::string>());
}
}
if (contract.contains(msg::FLD_MAX_INP_LEDGER_OFFSET))
msg.config.contract.max_input_ledger_offset = contract[msg::FLD_MAX_INP_LEDGER_OFFSET].as<uint16_t>();

View File

@@ -63,7 +63,7 @@ namespace msg
std::optional<uint32_t> roundtime;
std::set<std::string> unl;
std::optional<bool> execute;
std::optional<std::string> environment;
std::map<std::string, std::string> environment;
std::optional<uint16_t> max_input_ledger_offset;
c_log_config log;
consensus_config consensus;

View File

@@ -63,18 +63,19 @@ class ContractInstanceManager {
return new Promise(async (resolve, reject) => {
const uploadTimer = setTimeout(() => {
reject("Upload timeout.");
clearTimeout(uploadTimer);
hpc.clear(HotPocket.events.contractOutput);
reject("Upload timeout.");
}, uploadTimeout || UPLOAD_TIMEOUT);
const failure = (e) => {
clearTimeout(uploadTimer);
reject(e);
hpc.clear(HotPocket.events.contractOutput);
reject(e);
}
const success = () => {
console.log("Upload complete");
clearTimeout(uploadTimer);
console.log("Upload complete");
resolve();
}

View File

@@ -79,9 +79,9 @@ class EvernodeService {
}
async extendLease(hostAddress, instanceName, moments) {
const client = this.#tenantClient;
const tenant = this.#tenantClient;
console.log(`Extending lease ${instanceName} of host ${hostAddress} by ${moments} Moments.`);
const result = await client.extendLease(hostAddress, moments, instanceName);
const result = await tenant.extendLease(hostAddress, moments, instanceName);
console.log(`Instance ${instanceName} expiry set to ${result.expiryMoment}`);
return result;
}

View File

@@ -7,6 +7,9 @@ const HotPocket = require('hotpocket-js-client');
const CONFIG_FILE = "config.json";
const EVR_PER_MOMENT = 2;
const MAX_MEMO_PEER_LIMIT = 10;
const FAIL_THRESHOLD = 1;
const DEF_TIMEOUT = 60000;
const CLUSTER_CHUNK_RATIO = 0.2;
async function sleep(ms) {
await new Promise(resolve => {
@@ -19,6 +22,9 @@ async function sleep(ms) {
class ClusterManager {
#config = {};
#evernodeService = null;
#contractIdx;
#instanceCount;
#hosts;
async #readConfig() {
let isConfigExists;
@@ -43,7 +49,6 @@ class ClusterManager {
"config": {},
"target_nodes_count": 1,
"target_moments_count": 1,
"parallel_mode": false,
"cluster": []
}
],
@@ -54,7 +59,8 @@ class ClusterManager {
"tenant_address": "",
"tenant_secret": "",
"primary_host_address": "",
"blacklist_hosts": []
"blacklist_hosts": [],
"preferred_hosts": []
}
};
@@ -87,6 +93,12 @@ class ClusterManager {
const fundAmount = this.#getFundAmount();
await this.#evernodeService.init();
await this.#evernodeService.prepareAccounts(fundAmount);
this.#contractIdx = this.#config.contracts.findIndex(c => c.name === this.#config.selected);
this.#instanceCount = this.#config.contracts[this.#contractIdx]?.cluster?.length || 0;
this.#hosts = (await this.#evernodeService.getHosts()).filter(h =>
!this.#config.accounts.blacklist_hosts.includes(h.address) &&
(!this.#config.accounts.preferred_hosts || !this.#config.accounts.preferred_hosts.length || this.#config.accounts.preferred_hosts.includes(h.address)))
.sort(() => Math.random() - 0.5);
}
async terminate() {
@@ -94,234 +106,201 @@ class ClusterManager {
await this.#writeConfig();
}
async #createNode(ctx, ownerPubKeyHex, weakCluster = false) {
if (!ctx.hosts || !ctx.hosts.length)
throw { exitCode: 1, message: "All the hosts are occupied." };
const primaryHostAddress = this.#config.accounts.primary_host_address;
const contract = this.#config.contracts[ctx.contractIdx];
let randomIndex = 0;
if ((!ctx.existingCount || ctx.existingCount === 0) && ctx.createdInstanceCount === 0 && primaryHostAddress) {
randomIndex = ctx.hosts.findIndex(h => h.address === primaryHostAddress);
if (randomIndex < 0)
throw { exitCode: 1, message: `Host ${primaryHostAddress} not found` };
}
else
randomIndex = Math.floor(Math.random() * ctx.hosts.length);
const host = ctx.hosts[randomIndex];
async #createNode(nodeNumber, hostIndex, contract, ownerPubKeyHex, config) {
const host = this.#hosts[hostIndex];
if (host.activeInstances == host.maxInstances)
throw { exitCode: 0, message: `All the contract slots in ${host.address} are occupied.` };
throw { message: `Choosen host ${host.address} is occupied.`, innerException: `OCCUPIED` };
// Wait until acquire completes.
console.log(`Waiting until ${host.address} is available.`);
while (ctx.hosts[randomIndex].acquiring)
while (this.#hosts[hostIndex].acquiring)
await sleep(1000);
ctx.hosts[randomIndex].acquiring = true;
this.#hosts[hostIndex].acquiring = true;
ctx.createdInstanceCount++;
const nodeNumber = (ctx.existingCount || 0) + ctx.createdInstanceCount;
console.log(`Creating node ${nodeNumber} in ${host.address}`);
let instance;
try {
let config = JSON.parse(JSON.stringify(contract.config));
if (!config.contract) {
config.contract = {}
}
if (contract.cluster.length == 0)
delete config.contract['unl'];
else
config.contract.unl = [contract.cluster[0].pubkey];
if (!config.mesh) {
config.mesh = {}
}
const cluster = [...contract.cluster];
// If cluster length is > MAX_MEMO_PEER_LIMIT pick MAX_MEMO_PEER_LIMIT random peers to limit the memo size.
if (cluster.length > MAX_MEMO_PEER_LIMIT) {
config.mesh.known_peers = cluster.sort(() => Math.random() - 0.5).slice(0, MAX_MEMO_PEER_LIMIT).map(n => `${n.ip}:${n.peer_port}`);
}
else {
config.mesh.known_peers = cluster.map(n => `${n.ip}:${n.peer_port}`);
}
if (weakCluster)
config.mesh.msg_forwarding = true;
instance = await this.#evernodeService.acquireLease(host, contract.contract_id, contract.docker_image, ownerPubKeyHex, config);
const instance = await this.#evernodeService.acquireLease(host, contract.contract_id, contract.docker_image, ownerPubKeyHex, config)
if (!instance)
throw { exitCode: 0, message: 'Error while creating the intance.' };
throw 'INST_CREATE_ERR'
console.log(`Created node ${nodeNumber} in ${host.address}`);
this.#instanceCount++;
this.#config.contracts[this.#contractIdx].cluster.push({ host: host.address, ...instance });
this.#hosts[hostIndex].activeInstances++;
this.#hosts[hostIndex].acquiring = false;
return instance;
}
catch (e) {
if (e.reason === 'TRANSACTION_FAILURE' && e.content?.code === 'tecINSUFFICIENT_FUNDS')
throw { exitCode: 1, message: e };
console.error(`Node ${nodeNumber} creation in ${host.address} failed.`, e);
ctx.createdInstanceCount--;
ctx.hosts[randomIndex].acquiring = false;
throw e;
this.#hosts[hostIndex].acquiring = false;
throw { message: `Error while creating the node ${nodeNumber} in ${host.address}.`, innerException: e };
}
this.#config.contracts[ctx.contractIdx].cluster.push({ host: host.address, ...instance });
ctx.hosts[randomIndex].activeInstances++;
ctx.hosts[randomIndex].acquiring = false;
console.log(`Created node ${nodeNumber} in ${host.address}`);
}
async #createCluster(contractIdx, ownerKeys, targetNodesCount, parallel = false) {
if (contractIdx < 0)
throw `Contract ${this.#config.selected} is invalid.`
async #createPrimaryNode(ownerPubKeyHex) {
const contract = this.#config.contracts[this.#contractIdx];
const primaryHostAddress = this.#config.accounts.primary_host_address;
let ctx = {
hosts: await this.#evernodeService.getHosts(),
createdInstanceCount: 0,
contractIdx: contractIdx,
existingCount: this.#config.contracts[contractIdx].cluster.length
let config = JSON.parse(JSON.stringify(contract.config));
if (config.contract)
delete config.contract['unl'];
if (!config.mesh)
config.mesh = {}
config.mesh.msg_forwarding = true;
let hostIndex = 0;
if ((!this.#instanceCount || this.#instanceCount === 0) && primaryHostAddress) {
hostIndex = this.#hosts.findIndex(h => h.address === primaryHostAddress);
if (hostIndex < 0)
throw { exitCode: 1, message: `Host ${primaryHostAddress} not found` };
}
await this.#createNode(1, hostIndex, contract, ownerPubKeyHex, config);
}
if (this.#config.accounts.blacklist_hosts && this.#config.accounts.blacklist_hosts.length > 0)
ctx.hosts = ctx.hosts.filter(h => !this.#config.accounts.blacklist_hosts.includes(h.address))
async #createNodes(count, ownerPubKeyHex) {
const contract = this.#config.contracts[this.#contractIdx];
const ownerPubKeyHex = Buffer.from(ownerKeys.publicKey).toString('hex');
let config = JSON.parse(JSON.stringify(contract.config));
if (!parallel) {
while (ctx.createdInstanceCount < targetNodesCount) {
try {
await this.#createNode(ctx, ownerPubKeyHex, targetNodesCount > MAX_MEMO_PEER_LIMIT);
}
catch (e) {
if (e.exitCode && e.exitCode === 1)
throw e.message || e;
else
console.error(e.message || e)
}
}
if (!config.contract) {
config.contract = {}
}
config.contract.unl = [contract.cluster[0].pubkey];
if (!config.mesh) {
config.mesh = {}
}
const cluster = [...contract.cluster];
// If cluster length is > MAX_MEMO_PEER_LIMIT pick MAX_MEMO_PEER_LIMIT random peers to limit the memo size.
if (cluster.length > MAX_MEMO_PEER_LIMIT) {
config.mesh.known_peers = cluster.sort(() => Math.random() - 0.5).slice(0, MAX_MEMO_PEER_LIMIT).map(n => `${n.ip}:${n.peer_port}`);
}
else {
const createNodes = async (nodesCount) => {
const res = await Promise.all([...Array(nodesCount).keys()].map(async (v, i) => {
await sleep(1000 * i);
try {
return await this.#createNode(ctx, ownerPubKeyHex, true);
}
catch (e) {
return e
}
}));
const err = res.find(e => e?.exitCode && e?.exitCode === 1);
if (err) {
throw (err.message || err);
}
};
if (this.#config.accounts.primary_host_address && (!ctx.existingCount || ctx.existingCount === 0) && ctx.createdInstanceCount === 0) {
try {
await this.#createNode(ctx, ownerPubKeyHex, true);
}
catch (e) {
throw { message: `Instance creation on primary host ${this.#config.accounts.primary_host_address} failed.`, content: e.message || e };
}
}
while (ctx.createdInstanceCount < targetNodesCount) {
const count = targetNodesCount - ctx.createdInstanceCount;
try {
await createNodes(count > 2 ? Math.ceil(count / 2) : count);
if (targetNodesCount - ctx.createdInstanceCount > 0)
await createNodes(targetNodesCount - ctx.createdInstanceCount);
}
catch (e) {
throw e;
}
}
config.mesh.known_peers = cluster.map(n => `${n.ip}:${n.peer_port}`);
}
config.mesh.msg_forwarding = true;
const promises = [...Array(count).keys()].map(async (v, i) => {
await sleep(1000 * i);
const nodeNumber = this.#instanceCount + i + 1;
let hostIndex = (nodeNumber - 1) % this.#hosts.length;
if (this.#hosts[hostIndex].failcount > FAIL_THRESHOLD) {
hostIndex = Math.floor(Math.random() * this.#hosts.length);
}
await this.#createNode(nodeNumber, hostIndex, contract, ownerPubKeyHex, config).catch(e => {
if (!this.#hosts[hostIndex].failcount)
this.#hosts[hostIndex].failcount = 1;
else
this.#hosts[hostIndex].failcount++;
console.error(e);
});
});
await Promise.all(promises);
}
async #extendCluster(contractIdx) {
const contract = this.#config.contracts[contractIdx];
const res = await Promise.all(contract.cluster.map(async (c, i) => {
if (!c.extended) {
await sleep(1000 * i);
async #createCluster(ownerPubKeyHex) {
const contract = this.#config.contracts[this.#contractIdx];
let targetCount = contract.target_nodes_count - this.#instanceCount;
if (targetCount > 0) {
console.log(`Creating ${targetCount} nodes...`);
// Create primary node.
try {
await this.#createPrimaryNode(ownerPubKeyHex);
targetCount--;
}
catch (e) {
throw { message: 'Error while creating the primary node.', innerException: e };
}
const clusterChunkSize = Math.ceil(targetCount * CLUSTER_CHUNK_RATIO)
while (targetCount > 0) {
try {
const result = await this.#evernodeService.extendLease(c.host, c.name, contract.target_moments_count - 1);
if (!result)
return { error: `Error while extending the intance ${c.name} in ${c.host}.` };
this.#config.contracts[contractIdx].cluster[i].extended = true;
return result;
const curTarget = clusterChunkSize < targetCount ? clusterChunkSize : targetCount;
await this.#createNodes(curTarget, ownerPubKeyHex);
targetCount = contract.target_nodes_count - this.#instanceCount;
}
catch (e) {
return { error: e };
console.error(e);
}
}
}));
const err = res.find(e => e?.error);
if (err) {
// throw (err.error || err);
// Do not terminate the execution if there're extend errors, Because there can be extend timeouts as well.
console.error(res.filter(e => e?.error))
return true;
}
return false;
}
async #extendCluster() {
const contract = this.#config.contracts[this.#contractIdx];
if (contract.target_moments_count > 1 && contract.cluster.findIndex(c => !c.extended) >= 0) {
console.log('Extending the cluster...');
const promises = contract.cluster.map(async (c, i) => {
if (!contract.cluster[i].extended) {
try {
await sleep(2000 * i);
const result = await this.#evernodeService.extendLease(c.host, c.name, contract.target_moments_count - 1);
if (!result)
throw 'INST_EXTEND_ERR';
this.#config.contracts[this.#contractIdx].cluster[i].extended = true;
return result;
}
catch (e) {
console.error({ message: `Error while extending the node ${i + 1} in ${c.host}.`, innerException: e });
}
}
})
await Promise.all(promises);
}
}
async deploy() {
const contractIdx = this.#config.contracts.findIndex(c => c.name === this.#config.selected);
const contract = this.#config.contracts[contractIdx];
let contract = this.#config.contracts[this.#contractIdx];
const ownerKeys = await HotPocket.generateKeys(contract.owner_privatekey);
const ownerPubKeyHex = Buffer.from(ownerKeys.publicKey).toString('hex');
const targetCount = this.#config.contracts[contractIdx].target_nodes_count - (contract?.cluster?.length || 0);
if (targetCount > 0) {
console.log(`Creating ${targetCount} nodes...`);
try {
await this.#createCluster(contractIdx,
ownerKeys,
targetCount,
contract.parallel_mode);
await this.#writeConfig();
}
catch (e) {
await this.#writeConfig();
console.error(e);
console.error(`Cluster create failed.`);
return;
}
console.log('Waiting 15 seconds until nodes are synced...');
await sleep(15000);
}
if (!this.#config.contracts[contractIdx].cluster || !this.#config.contracts[contractIdx].cluster.length) {
console.error(`Contract ${contract.name} cluster is empty.`);
return;
}
if (contract.target_moments_count > 1 && this.#config.contracts[contractIdx].cluster.findIndex(c => !c.extended) >= 0) {
console.log('Extending the cluster...');
try {
await this.#extendCluster(contractIdx);
await this.#writeConfig();
}
catch (e) {
await this.#writeConfig();
console.error(e);
console.error(`Cluster extend failed.`);
return;
try {
if (await this.#createCluster(ownerPubKeyHex)) {
console.log('Waiting 15 seconds until nodes are synced...');
await sleep(15000);
}
}
catch (e) {
await this.#writeConfig();
throw e;
}
const instance = this.#config.contracts[contractIdx].cluster[0];
try {
await this.#extendCluster();
}
catch (e) {
await this.#writeConfig();
throw e;
}
await this.#writeConfig();
contract = this.#config.contracts[this.#contractIdx];
const instance = contract.cluster[0];
const instanceMgr = new ContractInstanceManager(ownerKeys, instance.pubkey, instance.ip, instance.user_port, instance.contractId, contract.bundle_path);
console.log('Deploying the contract...');
try {
await instanceMgr.deployContract({
unl: this.#config.contracts[contractIdx].cluster.map(n => n.pubkey)
}, 60000);
unl: contract.cluster.map(n => n.pubkey)
}, DEF_TIMEOUT);
}
catch (e) {
console.error(`Contract ${contract.name} deployment failed with.`, e);
return;
throw { message: `Contract ${contract.name} deployment failed with.`, innerException: e };
}
console.log('Successfully deployed the contract...');
@@ -330,8 +309,15 @@ class ClusterManager {
async function main() {
const clusterMgr = new ClusterManager();
await clusterMgr.init();
await clusterMgr.deploy();
try {
await clusterMgr.init();
await clusterMgr.deploy();
}
catch (e) {
await clusterMgr.terminate();
throw e;
}
await clusterMgr.terminate();
}

View File

@@ -10,7 +10,7 @@
"license": "ISC",
"dependencies": {
"bson": "4.6.5",
"evernode-js-client": "0.4.50",
"evernode-js-client": "0.5.2",
"hotpocket-js-client": "0.5.4"
}
},
@@ -344,9 +344,9 @@
"integrity": "sha512-MEl9uirslVwqQU369iHNWZXsI8yaZYGg/D65aOgZkeyFJwHYSxilf7rQzXKI7DdDuBPrBXbfk3sl9hJhmd5AUw=="
},
"node_modules/evernode-js-client": {
"version": "0.4.50",
"resolved": "https://registry.npmjs.org/evernode-js-client/-/evernode-js-client-0.4.50.tgz",
"integrity": "sha512-x/KmLMaquALOHGUtbOIRwwi1gPTeK6l0lQinSswkiQ8iRyOM28oR64vM1oHp48vxt8o9iz0ALbwR+4zzXdNzXw==",
"version": "0.5.2",
"resolved": "https://registry.npmjs.org/evernode-js-client/-/evernode-js-client-0.5.2.tgz",
"integrity": "sha512-xehOnUaJBZehN5DzJjCZi025v2EPP54T1EBdJbSgM28vJkWaT9Qymr90Uc09goYk6kmPU3V1zCn7aq/att+hnQ==",
"dependencies": {
"elliptic": "6.5.4",
"ripple-address-codec": "4.2.0",
@@ -1567,9 +1567,9 @@
"integrity": "sha512-MEl9uirslVwqQU369iHNWZXsI8yaZYGg/D65aOgZkeyFJwHYSxilf7rQzXKI7DdDuBPrBXbfk3sl9hJhmd5AUw=="
},
"evernode-js-client": {
"version": "0.4.50",
"resolved": "https://registry.npmjs.org/evernode-js-client/-/evernode-js-client-0.4.50.tgz",
"integrity": "sha512-x/KmLMaquALOHGUtbOIRwwi1gPTeK6l0lQinSswkiQ8iRyOM28oR64vM1oHp48vxt8o9iz0ALbwR+4zzXdNzXw==",
"version": "0.5.2",
"resolved": "https://registry.npmjs.org/evernode-js-client/-/evernode-js-client-0.5.2.tgz",
"integrity": "sha512-xehOnUaJBZehN5DzJjCZi025v2EPP54T1EBdJbSgM28vJkWaT9Qymr90Uc09goYk6kmPU3V1zCn7aq/att+hnQ==",
"requires": {
"elliptic": "6.5.4",
"ripple-address-codec": "4.2.0",

View File

@@ -10,7 +10,7 @@
"license": "ISC",
"dependencies": {
"bson": "4.6.5",
"evernode-js-client": "0.4.50",
"evernode-js-client": "0.5.2",
"hotpocket-js-client": "0.5.4"
}
}