Integrated hpfs in-proc sessions. (#134)

This commit is contained in:
Ravin Perera
2020-10-23 10:04:45 +05:30
committed by GitHub
parent 4f6ff4fbef
commit 7a4e91b0dd
12 changed files with 278 additions and 201 deletions

View File

@@ -24,11 +24,16 @@ namespace state_serve
bool is_shutting_down = false;
bool init_success = false;
pid_t hpfs_pid;
std::thread state_serve_thread;
int init()
{
REQUEST_BATCH_TIMEOUT = state_common::get_request_resubmit_timeout() * 0.9;
if (hpfs::start_ro_rw_process(hpfs_pid, conf::ctx.state_serve_dir, true, true, false) == -1)
return -1;
state_serve_thread = std::thread(state_serve_loop);
init_success = true;
return 0;
@@ -40,6 +45,10 @@ namespace state_serve
{
is_shutting_down = true;
state_serve_thread.join();
LOG_DEBUG << "Stopping hpfs state serve process... pid:" << hpfs_pid;
if (hpfs_pid > 0 && util::kill_process(hpfs_pid, true) == 0)
LOG_INFO << "Stopped hpfs state serve process.";
}
}
@@ -66,37 +75,45 @@ namespace state_serve
const uint64_t time_start = util::get_epoch_milliseconds();
const std::string lcl = ledger::ctx.get_lcl();
for (auto &[session_id, request] : state_requests)
if (state_requests.empty())
continue;
if (hpfs::start_fs_session(conf::ctx.state_serve_dir) != -1)
{
if (is_shutting_down)
break;
// If we have spent too much time handling state requests, abandon the entire batch
// because the requester would have stopped waiting for us.
const uint64_t time_now = util::get_epoch_milliseconds();
if ((time_now - time_start) > REQUEST_BATCH_TIMEOUT)
break;
const msg::fbuf::p2pmsg::Content *content = msg::fbuf::p2pmsg::GetContent(request.data());
const p2p::state_request sr = p2pmsg::create_state_request_from_msg(*content->message_as_State_Request_Message());
flatbuffers::FlatBufferBuilder fbuf(1024);
if (state_serve::create_state_response(fbuf, sr, lcl) == 1)
for (auto &[session_id, request] : state_requests)
{
// Find the peer that we should send the state response to.
std::scoped_lock<std::mutex> lock(p2p::ctx.peer_connections_mutex);
const auto peer_itr = p2p::ctx.peer_connections.find(session_id);
if (is_shutting_down)
break;
if (peer_itr != p2p::ctx.peer_connections.end())
// If we have spent too much time handling state requests, abandon the entire batch
// because the requester would have stopped waiting for us.
const uint64_t time_now = util::get_epoch_milliseconds();
if ((time_now - time_start) > REQUEST_BATCH_TIMEOUT)
break;
const msg::fbuf::p2pmsg::Content *content = msg::fbuf::p2pmsg::GetContent(request.data());
const p2p::state_request sr = p2pmsg::create_state_request_from_msg(*content->message_as_State_Request_Message());
flatbuffers::FlatBufferBuilder fbuf(1024);
if (state_serve::create_state_response(fbuf, sr, lcl) == 1)
{
std::string_view msg = std::string_view(
reinterpret_cast<const char *>(fbuf.GetBufferPointer()), fbuf.GetSize());
// Find the peer that we should send the state response to.
std::scoped_lock<std::mutex> lock(p2p::ctx.peer_connections_mutex);
const auto peer_itr = p2p::ctx.peer_connections.find(session_id);
comm::comm_session *session = peer_itr->second;
session->send(msg);
if (peer_itr != p2p::ctx.peer_connections.end())
{
std::string_view msg = std::string_view(
reinterpret_cast<const char *>(fbuf.GetBufferPointer()), fbuf.GetSize());
comm::comm_session *session = peer_itr->second;
session->send(msg);
}
}
}
hpfs::stop_fs_session(conf::ctx.state_serve_dir);
}
state_requests.clear();
@@ -195,14 +212,9 @@ namespace state_serve
int get_data_block(std::vector<uint8_t> &block, const std::string_view vpath,
const uint32_t block_id, const hpfs::h32 expected_hash)
{
pid_t hpfs_pid = 0;
std::string mount_dir;
if (hpfs::start_fs_session(hpfs_pid, mount_dir, "ro", true) == -1)
return -1;
// Check whether the existing block hash matches expected hash.
std::vector<hpfs::h32> block_hashes;
int result = hpfs::get_file_block_hashes(block_hashes, mount_dir, vpath);
int result = hpfs::get_file_block_hashes(block_hashes, conf::ctx.state_serve_dir, vpath);
if (result == 1)
{
if (block_id >= block_hashes.size())
@@ -218,7 +230,7 @@ namespace state_serve
else // Get actual block data.
{
struct stat st;
const std::string file_path = std::string(mount_dir).append(vpath);
const std::string file_path = std::string(conf::ctx.state_serve_dir).append(vpath);
const off_t block_offset = block_id * state_common::BLOCK_SIZE;
const int fd = open(file_path.c_str(), O_RDONLY | O_CLOEXEC);
if (fd == -1)
@@ -253,7 +265,7 @@ namespace state_serve
if (res < read_len)
{
LOG_ERROR << errno << ": Read failed (result:" << res
<< " off:" << block_offset << " len:" << read_len << "). " << file_path;
<< " off:" << block_offset << " len:" << read_len << "). " << file_path;
result = -1;
}
else
@@ -267,8 +279,6 @@ namespace state_serve
}
}
if (util::kill_process(hpfs_pid, true) == -1)
return -1;
return result;
}
@@ -279,14 +289,9 @@ namespace state_serve
int get_data_block_hashes(std::vector<hpfs::h32> &hashes, size_t &file_length,
const std::string_view vpath, const hpfs::h32 expected_hash)
{
pid_t hpfs_pid = 0;
std::string mount_dir;
if (hpfs::start_fs_session(hpfs_pid, mount_dir, "ro", true) == -1)
return -1;
// Check whether the existing file hash matches expected hash.
hpfs::h32 file_hash = hpfs::h32_empty;
int result = hpfs::get_hash(file_hash, mount_dir, vpath);
int result = hpfs::get_hash(file_hash, conf::ctx.state_serve_dir, vpath);
if (result == 1)
{
if (file_hash != expected_hash)
@@ -295,14 +300,14 @@ namespace state_serve
result = 0;
}
// Get the block hashes.
else if (hpfs::get_file_block_hashes(hashes, mount_dir, vpath) < 0)
else if (hpfs::get_file_block_hashes(hashes, conf::ctx.state_serve_dir, vpath) < 0)
{
result = -1;
}
else
{
// Get actual file length.
const std::string file_path = std::string(mount_dir).append(vpath);
const std::string file_path = std::string(conf::ctx.state_serve_dir).append(vpath);
struct stat st;
if (stat(file_path.c_str(), &st) == -1)
{
@@ -314,8 +319,6 @@ namespace state_serve
}
}
if (util::kill_process(hpfs_pid, true) == -1)
return -1;
return result;
}
@@ -326,14 +329,9 @@ namespace state_serve
int get_fs_entry_hashes(std::vector<hpfs::child_hash_node> &hash_nodes,
const std::string_view vpath, const hpfs::h32 expected_hash)
{
pid_t hpfs_pid = 0;
std::string mount_dir;
if (hpfs::start_fs_session(hpfs_pid, mount_dir, "ro", true) == -1)
return -1;
// Check whether the existing dir hash matches expected hash.
hpfs::h32 dir_hash = hpfs::h32_empty;
int result = hpfs::get_hash(dir_hash, mount_dir, vpath);
int result = hpfs::get_hash(dir_hash, conf::ctx.state_serve_dir, vpath);
if (result == 1)
{
if (dir_hash != expected_hash)
@@ -342,7 +340,7 @@ namespace state_serve
result = 0;
}
// Get the children hash nodes.
else if (hpfs::get_dir_children_hashes(hash_nodes, mount_dir, vpath) < 0)
else if (hpfs::get_dir_children_hashes(hash_nodes, conf::ctx.state_serve_dir, vpath) < 0)
{
result = -1;
}
@@ -352,8 +350,6 @@ namespace state_serve
}
}
if (util::kill_process(hpfs_pid, true) == -1)
return -1;
return result;
}
} // namespace state_serve

View File

@@ -34,6 +34,7 @@ namespace state_sync
REQUEST_RESUBMIT_TIMEOUT = state_common::get_request_resubmit_timeout();
ctx.target_state = hpfs::h32_empty;
ctx.state_sync_thread = std::thread(state_syncer_loop);
ctx.hpfs_mount_dir = conf::ctx.state_rw_dir;
init_success = true;
return 0;
}
@@ -88,8 +89,7 @@ namespace state_sync
LOG_INFO << "State sync: Starting sync for target state: " << ctx.target_state;
}
pid_t hpfs_pid = 0;
if (hpfs::start_fs_session(hpfs_pid, ctx.hpfs_mount_dir, "rw", true) != -1)
if (hpfs::start_fs_session(ctx.hpfs_mount_dir) != -1)
{
while (!ctx.is_shutting_down)
{
@@ -119,10 +119,8 @@ namespace state_sync
}
}
}
// Stop hpfs rw session.
LOG_DEBUG << "State sync: Stopping hpfs session... pid:" << hpfs_pid;
util::kill_process(hpfs_pid, true);
hpfs::stop_fs_session(ctx.hpfs_mount_dir);
}
else
{