Configs to support full history mode. (#252)

* Adding full history mode and max shard configs for primary and blob shards.

* Commenting validation checks for hpfs responses until bug is fixed.

* Comment update.
This commit is contained in:
Savinda Senevirathne
2021-02-19 14:45:25 +05:30
committed by GitHub
parent 6b8d60a404
commit a775b0e419
8 changed files with 109 additions and 46 deletions

View File

@@ -22,6 +22,8 @@ namespace conf
constexpr const char *ROLE_OBSERVER = "observer";
constexpr const char *ROLE_VALIDATOR = "validator";
constexpr const char *HISTORY_FULL = "full";
constexpr const char *HISTORY_CUSTOM = "custom";
constexpr const char *PUBLIC = "public";
constexpr const char *PRIVATE = "private";
@@ -134,8 +136,9 @@ namespace conf
cfg.hp_version = util::HP_VERSION;
cfg.node.role = startup_role = ROLE::VALIDATOR;
cfg.node.full_history = false;
cfg.node.max_shards = 4;
cfg.node.history = HISTORY::CUSTOM;
cfg.node.history_config.max_primary_shards = 1;
cfg.node.history_config.max_blob_shards = 1;
cfg.contract.id = crypto::generate_uuid();
cfg.contract.execute = true;
@@ -324,7 +327,35 @@ namespace conf
}
startup_role = cfg.node.role;
cfg.node.max_shards = node["max_shards"].as<uint64_t>();
if (node["history"] == HISTORY_FULL)
cfg.node.history = HISTORY::FULL;
else if (node["history"] == HISTORY_CUSTOM)
cfg.node.history = HISTORY::CUSTOM;
else
{
std::cerr << "Invalid history mode. 'full' or 'custom' expected.\n";
return -1;
}
cfg.node.history_config.max_primary_shards = node["history_config"]["max_primary_shards"].as<uint64_t>();
cfg.node.history_config.max_blob_shards = node["history_config"]["max_blob_shards"].as<uint64_t>();
// Max shards cannot be zero for primary and blob shards if the history mode is custom.
// In history = full, these configs are not used.
if (cfg.node.history == HISTORY::CUSTOM)
{
if (cfg.node.history_config.max_primary_shards == 0)
{
std::cerr << "'max_primary_shards' cannot be zero in history=custom mode.\n";
return -1;
}
if (cfg.node.history_config.max_blob_shards == 0)
{
std::cerr << "'max_blob_shards' cannot be zero in history=custom mode.\n";
return -1;
}
}
}
catch (const std::exception &e)
{
@@ -484,7 +515,13 @@ namespace conf
node_config.insert_or_assign("private_key", cfg.node.private_key_hex);
// We always save the startup role to config. Not the current role which might get changed dynamically during syncing.
node_config.insert_or_assign("role", cfg.node.role == ROLE::OBSERVER ? ROLE_OBSERVER : ROLE_VALIDATOR);
node_config.insert_or_assign("max_shards", cfg.node.max_shards);
node_config.insert_or_assign("history", cfg.node.history == HISTORY::FULL ? HISTORY_FULL : HISTORY_CUSTOM);
jsoncons::ojson history_config;
history_config.insert_or_assign("max_primary_shards", cfg.node.history_config.max_primary_shards);
history_config.insert_or_assign("max_blob_shards", cfg.node.history_config.max_blob_shards);
node_config.insert_or_assign("history_config", history_config);
d.insert_or_assign("node", node_config);
}

View File

@@ -45,6 +45,20 @@ namespace conf
VALIDATOR = 1 // Consensus participant mode.
};
// History modes of the node.
enum HISTORY
{
FULL, // Full history mode.
CUSTOM
};
// Max number of shards to keep for primary and blob shards.
struct history_configuration
{
uint64_t max_primary_shards; // Maximum number of shards for primary shards.
uint64_t max_blob_shards; // Maximum number of shards for blob shards.
};
// Log severity levels used in Hot Pocket.
enum LOG_SEVERITY
{
@@ -71,10 +85,10 @@ namespace conf
ROLE role = ROLE::OBSERVER; // Configured startup role of the contract (Observer/validator).
bool is_unl = false; // Indicate whether we are a unl node or not.
std::string public_key_hex; // Contract hex public key
std::string private_key_hex; // Contract hex private key
bool full_history = false; // Whether full history mode is on/off.
uint64_t max_shards = 0; // Maximum number of shards to store.
std::string public_key_hex; // Contract hex public key
std::string private_key_hex; // Contract hex private key
HISTORY history; // Node is a full history node if history=full.
history_configuration history_config; // Holds history config values. Only applicable if history=custom.
};
struct appbill_config

View File

@@ -271,12 +271,13 @@ namespace hpfs
std::unordered_map<std::string, p2p::hpfs_fs_hash_entry> peer_fs_entry_map;
msg::fbuf::p2pmsg::flatbuf_hpfsfshashentry_to_hpfsfshashentry(peer_fs_entry_map, fs_resp->entries());
// Commented for now. Need to change the way the hash is calculated once the flatbuffer re-architecture finishes.
// Validate received fs data against the hash.
if (!validate_fs_entry_hash(vpath, hash, peer_fs_entry_map))
{
LOG_INFO << "Hpfs " << name << " sync: Skipping hpfs response due to fs entry hash mismatch.";
continue;
}
// if (!validate_fs_entry_hash(vpath, hash, peer_fs_entry_map))
// {
// LOG_INFO << "Hpfs " << name << " sync: Skipping hpfs response due to fs entry hash mismatch.";
// continue;
// }
handle_fs_entry_response(vpath, peer_fs_entry_map);
}
@@ -288,12 +289,13 @@ namespace hpfs
const util::h32 *peer_hashes = reinterpret_cast<const util::h32 *>(file_resp->hash_map()->data());
const size_t peer_hash_count = file_resp->hash_map()->size() / sizeof(util::h32);
// Commented for now. Need to change the way the hash is calculated once the flatbuffer re-architecture finishes.
// Validate received hashmap against the hash.
if (!validate_file_hashmap_hash(vpath, hash, peer_hashes, peer_hash_count))
{
LOG_INFO << "Hpfs " << name << " sync: Skipping hpfs response due to file hashmap hash mismatch.";
continue;
}
// if (!validate_file_hashmap_hash(vpath, hash, peer_hashes, peer_hash_count))
// {
// LOG_INFO << "Hpfs " << name << " sync: Skipping hpfs response due to file hashmap hash mismatch.";
// continue;
// }
handle_file_hashmap_response(vpath, peer_hashes, peer_hash_count, file_resp->file_length());
}
@@ -305,12 +307,13 @@ namespace hpfs
const uint32_t block_id = block_resp->block_id();
std::string_view buf = msg::fbuf::flatbuff_bytes_to_sv(block_resp->data());
// Commented for now. Need to change the way the hash is calculated once the flatbuffer re-architecture finishes.
// Validate received block data against the hash.
if (!validate_file_block_hash(hash, block_id, buf))
{
LOG_INFO << "Hpfs " << name << " sync: Skipping hpfs response due to file block hash mismatch.";
continue;
}
// if (!validate_file_block_hash(hash, block_id, buf))
// {
// LOG_INFO << "Hpfs " << name << " sync: Skipping hpfs response due to file block hash mismatch.";
// continue;
// }
handle_file_block_response(vpath, block_id, buf);
}

View File

@@ -30,7 +30,8 @@ namespace ledger
*/
int init()
{
if (ledger_fs.init(LEDGER_FS_ID, conf::ctx.ledger_hpfs_dir, conf::ctx.ledger_hpfs_mount_dir, conf::ctx.ledger_hpfs_rw_dir, conf::cfg.node.full_history) == -1)
// Full history status is always set to false since this is ledger fs. Historical checkpoints are not required in ledger fs even in full history mode.
if (ledger_fs.init(LEDGER_FS_ID, conf::ctx.ledger_hpfs_dir, conf::ctx.ledger_hpfs_mount_dir, conf::ctx.ledger_hpfs_rw_dir, false) == -1)
{
LOG_ERROR << "Ledger file system initialization failed.";
return -1;
@@ -165,9 +166,9 @@ namespace ledger
ctx.set_last_primary_shard_id(p2p::sequence_hash{primary_shard_seq_no, last_primary_shard_hash});
//Remove old shards that exceeds max shard range.
if (conf::cfg.node.max_shards > 0 && primary_shard_seq_no >= conf::cfg.node.max_shards)
if (conf::cfg.node.history == conf::HISTORY::CUSTOM && primary_shard_seq_no >= conf::cfg.node.history_config.max_primary_shards)
{
remove_old_shards(primary_shard_seq_no - conf::cfg.node.max_shards + 1, PRIMARY_DIR);
remove_old_shards(primary_shard_seq_no - conf::cfg.node.history_config.max_primary_shards + 1, PRIMARY_DIR);
}
sqlite::close_db(&db);
@@ -252,8 +253,8 @@ namespace ledger
*/
void remove_old_shards(const uint64_t led_shard_no, std::string_view shard_parent_dir)
{
// Remove old shards if full history mode is not enabled.
if (!conf::cfg.node.full_history)
// Remove old shards if this is not a full history node.
if (conf::cfg.node.history == conf::HISTORY::CUSTOM)
{
const std::string shard_dir_path = ledger_fs.physical_path(hpfs::RW_SESSION_NAME, shard_parent_dir);
std::list<std::string> shards = util::fetch_dir_entries(shard_dir_path);
@@ -401,9 +402,9 @@ namespace ledger
ctx.set_last_blob_shard_id(p2p::sequence_hash{last_blob_shard_seq_no, last_shard_hash});
//Remove old shards that exceeds max shard range.
if (last_blob_shard_seq_no >= MAX_BLOB_SHARDS)
if (conf::cfg.node.history == conf::HISTORY::CUSTOM && last_blob_shard_seq_no >= conf::cfg.node.history_config.max_blob_shards)
{
remove_old_shards(last_blob_shard_seq_no - MAX_BLOB_SHARDS + 1, BLOB_DIR);
remove_old_shards(last_blob_shard_seq_no - conf::cfg.node.history_config.max_blob_shards + 1, BLOB_DIR);
}
return 0;
@@ -498,5 +499,4 @@ namespace ledger
}
return 0;
}
} // namespace ledger

View File

@@ -10,10 +10,9 @@
namespace ledger
{
constexpr const char *DATEBASE = "ledger.sqlite";
constexpr uint64_t PRIMARY_SHARD_SIZE = 4096;
constexpr uint64_t PRIMARY_SHARD_SIZE = 262144; // 2^18 ledgers per shard.
constexpr uint64_t BLOB_SHARD_SIZE = 4096;
constexpr int FILE_PERMS = 0644;
constexpr uint64_t MAX_BLOB_SHARDS = 4;
struct ledger_context
{

View File

@@ -57,8 +57,8 @@ namespace ledger
ctx.set_last_primary_shard_id(p2p::sequence_hash{synced_shard_seq_no, synced_target.hash});
}
if (conf::cfg.node.max_shards == 0 || // Sync all shards if this is a full history node.
last_primary_shard_seq_no - synced_shard_seq_no + 1 < conf::cfg.node.max_shards)
if (conf::cfg.node.history == conf::HISTORY::FULL || // Sync all shards if this is a full history node.
last_primary_shard_seq_no - synced_shard_seq_no + 1 < conf::cfg.node.history_config.max_primary_shards)
{
// Check whether the hash of the previous shard matches with the hash in the prev_shard.hash file.
const std::string prev_shard_vpath = std::string(PRIMARY_DIR).append("/").append(std::to_string(--synced_shard_seq_no));
@@ -71,16 +71,16 @@ namespace ledger
const std::string shard_path = std::string(PRIMARY_DIR).append("/").append(std::to_string(synced_shard_seq_no));
set_target_push_back(hpfs::sync_target{sync_name, prev_shard_hash_from_file, shard_path, hpfs::BACKLOG_ITEM_TYPE::DIR});
}
else if (conf::cfg.node.max_shards != 0 && last_primary_shard_seq_no >= conf::cfg.node.max_shards)
else if (conf::cfg.node.history == conf::HISTORY::CUSTOM && last_primary_shard_seq_no >= conf::cfg.node.history_config.max_primary_shards)
{
// When there are no more shards to sync, Remove old shards that exceeds max shard range.
remove_old_shards(last_primary_shard_seq_no - conf::cfg.node.max_shards + 1, PRIMARY_DIR);
remove_old_shards(last_primary_shard_seq_no - conf::cfg.node.history_config.max_primary_shards + 1, PRIMARY_DIR);
}
}
else if (last_primary_shard_seq_no >= conf::cfg.node.max_shards)
else if (conf::cfg.node.history == conf::HISTORY::CUSTOM && last_primary_shard_seq_no >= conf::cfg.node.history_config.max_primary_shards)
{
// When there are no more shards to sync, Remove old shards that exceeds max shard range.
remove_old_shards(last_primary_shard_seq_no - conf::cfg.node.max_shards + 1, PRIMARY_DIR);
remove_old_shards(last_primary_shard_seq_no - conf::cfg.node.history_config.max_primary_shards + 1, PRIMARY_DIR);
}
}
else if (shard_parent_dir == BLOB_DIR)
@@ -94,8 +94,8 @@ namespace ledger
ctx.set_last_blob_shard_id(p2p::sequence_hash{synced_shard_seq_no, synced_target.hash});
}
if (MAX_BLOB_SHARDS == 0 || // Sync all blob shards if this is a full history node.
last_blob_shard_seq_no - synced_shard_seq_no + 1 < MAX_BLOB_SHARDS)
if (conf::cfg.node.history == conf::HISTORY::FULL || // Sync all blob shards if this is a full history node.
last_blob_shard_seq_no - synced_shard_seq_no + 1 < conf::cfg.node.history_config.max_blob_shards)
{
// Check whether the blob hash of the previous blob shard matches with the hash in the prev_shard.hash file.
const std::string prev_shard_vpath = std::string(BLOB_DIR).append("/").append(std::to_string(--synced_shard_seq_no));
@@ -108,16 +108,16 @@ namespace ledger
const std::string shard_path = std::string(BLOB_DIR).append("/").append(std::to_string(synced_shard_seq_no));
set_target_push_back(hpfs::sync_target{sync_name, prev_shard_hash_from_file, shard_path, hpfs::BACKLOG_ITEM_TYPE::DIR});
}
else if (MAX_BLOB_SHARDS != 0 && last_blob_shard_seq_no >= MAX_BLOB_SHARDS)
else if (conf::cfg.node.history == conf::HISTORY::CUSTOM && last_blob_shard_seq_no >= conf::cfg.node.history_config.max_blob_shards)
{
// When there are no more shards to sync, Remove old shards that exceeds max shard range.
remove_old_shards(last_blob_shard_seq_no - MAX_BLOB_SHARDS + 1, BLOB_DIR);
remove_old_shards(last_blob_shard_seq_no - conf::cfg.node.history_config.max_blob_shards + 1, BLOB_DIR);
}
}
else if (last_blob_shard_seq_no >= MAX_BLOB_SHARDS)
else if (conf::cfg.node.history == conf::HISTORY::CUSTOM && last_blob_shard_seq_no >= conf::cfg.node.history_config.max_blob_shards)
{
// When there are no more shards to sync, Remove old shards that exceeds max shard range.
remove_old_shards(last_blob_shard_seq_no - MAX_BLOB_SHARDS + 1, BLOB_DIR);
remove_old_shards(last_blob_shard_seq_no - conf::cfg.node.history_config.max_blob_shards + 1, BLOB_DIR);
}
}
}

View File

@@ -26,7 +26,8 @@ namespace sc
int init()
{
if (contract_fs.init(CONTRACT_FS_ID, conf::ctx.contract_hpfs_dir, conf::ctx.contract_hpfs_mount_dir, conf::ctx.contract_hpfs_rw_dir, conf::cfg.node.full_history) == -1)
if (contract_fs.init(CONTRACT_FS_ID, conf::ctx.contract_hpfs_dir, conf::ctx.contract_hpfs_mount_dir, conf::ctx.contract_hpfs_rw_dir,
conf::cfg.node.history == conf::HISTORY::FULL) == -1)
{
LOG_ERROR << "Contract file system initialization failed.";
return -1;

View File

@@ -83,6 +83,14 @@ do
peers[i]="node${n}:${peerport}"
# Update config.
node_json=$(node -p "JSON.stringify({...require('./tmp.json').node, \
history: 'custom',\
history_config: {\
max_primary_shards: 4,\
max_blob_shards: 4\
}\
}, null, 2)")
contract_json=$(node -p "JSON.stringify({...require('./tmp.json').contract,
id: '3c349abe-4d70-4f50-9fa6-018f1f2530ab', \
bin_path: '$binary', \
@@ -113,6 +121,7 @@ do
}, null, 2)")
node -p "JSON.stringify({...require('./tmp.json'), \
node: ${node_json},\
contract: ${contract_json},\
mesh: ${mesh_json},\
user: ${user_json}, \