mirror of
https://github.com/EvernodeXRPL/hpcore.git
synced 2026-04-29 15:37:59 +00:00
Improved return codes of hpfs hash access interface. (#121)
This commit is contained in:
@@ -177,37 +177,59 @@ namespace hpfs
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Populates the hash of the specified vpath.
|
||||
* @return 1 on success. 0 if vpath not found. -1 on error.
|
||||
*/
|
||||
int get_hash(h32 &hash, const std::string_view mount_dir, const std::string_view vpath)
|
||||
{
|
||||
const std::string path = std::string(mount_dir).append(vpath).append("::hpfs.hmap.hash");
|
||||
const int fd = open(path.c_str(), O_RDONLY);
|
||||
if (fd == -1)
|
||||
if (fd == -1 && errno == ENOENT)
|
||||
{
|
||||
LOG_ERR << errno << ": Error opening hash file.";
|
||||
LOG_DBG << "Cannot get hash. vpath not found. " << vpath;
|
||||
return 0;
|
||||
}
|
||||
else if (fd == -1)
|
||||
{
|
||||
LOG_ERR << errno << ": Error opening hash file. " << vpath;
|
||||
return -1;
|
||||
}
|
||||
|
||||
const int res = read(fd, &hash, sizeof(h32));
|
||||
close(fd);
|
||||
if (res == -1)
|
||||
{
|
||||
LOG_ERR << errno << ": Error reading hash file.";
|
||||
LOG_ERR << errno << ": Error reading hash file. " << vpath;
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
return 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Populates the list of file block hashes for the specified vpath.
|
||||
* @return 1 on success. 0 if vpath not found. -1 on error.
|
||||
*/
|
||||
int get_file_block_hashes(std::vector<h32> &hashes, const std::string_view mount_dir, const std::string_view vpath)
|
||||
{
|
||||
const std::string path = std::string(mount_dir).append(vpath).append("::hpfs.hmap.children");
|
||||
const int fd = open(path.c_str(), O_RDONLY);
|
||||
if (fd == -1)
|
||||
if (fd == -1 && errno == ENOENT)
|
||||
{
|
||||
LOG_DBG << "Cannot get file block hashes. vpath not found. " << vpath;
|
||||
return 0;
|
||||
}
|
||||
else if (fd == -1)
|
||||
{
|
||||
LOG_DBG << errno << ": Error opening hashmap children. " << vpath;
|
||||
return -1;
|
||||
}
|
||||
|
||||
struct stat st;
|
||||
if (fstat(fd, &st) == -1)
|
||||
{
|
||||
close(fd);
|
||||
LOG_ERR << errno << ": Error reading block hashes length.";
|
||||
LOG_ERR << errno << ": Error reading block hashes length. " << vpath;
|
||||
return -1;
|
||||
}
|
||||
|
||||
@@ -218,19 +240,28 @@ namespace hpfs
|
||||
close(fd);
|
||||
if (res == -1)
|
||||
{
|
||||
LOG_ERR << errno << ": Error reading hash block hashes.";
|
||||
LOG_ERR << errno << ": Error reading block hashes. " << vpath;
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
return 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Populates the list of dir entry hashes for the specified vpath.
|
||||
* @return 1 on success. 0 if vpath not found. -1 on error.
|
||||
*/
|
||||
int get_dir_children_hashes(std::vector<child_hash_node> &hash_nodes, const std::string_view mount_dir, const std::string_view dir_vpath)
|
||||
{
|
||||
const std::string path = std::string(mount_dir).append(dir_vpath).append("::hpfs.hmap.children");
|
||||
const int fd = open(path.c_str(), O_RDONLY);
|
||||
if (fd == -1)
|
||||
if (fd == -1 && errno == ENOENT)
|
||||
{
|
||||
LOG_ERR << errno << ": Error opening hash children nodes.";
|
||||
LOG_DBG << "Cannot get dir children hashes. Dir vpath not found. " << dir_vpath;
|
||||
return 0;
|
||||
}
|
||||
else if (fd == -1)
|
||||
{
|
||||
LOG_ERR << errno << ": Error opening dir hash children nodes. " << dir_vpath;
|
||||
return -1;
|
||||
}
|
||||
|
||||
@@ -238,7 +269,7 @@ namespace hpfs
|
||||
if (fstat(fd, &st) == -1)
|
||||
{
|
||||
close(fd);
|
||||
LOG_ERR << errno << ": Error reading hash children nodes length.";
|
||||
LOG_ERR << errno << ": Error reading hash children nodes length. " << dir_vpath;
|
||||
return -1;
|
||||
}
|
||||
|
||||
@@ -249,10 +280,10 @@ namespace hpfs
|
||||
close(fd);
|
||||
if (res == -1)
|
||||
{
|
||||
LOG_ERR << errno << ": Error reading hash children nodes.";
|
||||
LOG_ERR << errno << ": Error reading hash children nodes. " << dir_vpath;
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
return 1;
|
||||
}
|
||||
|
||||
} // namespace hpfs
|
||||
164
src/sc.cpp
164
src/sc.cpp
@@ -7,9 +7,9 @@
|
||||
namespace sc
|
||||
{
|
||||
/**
|
||||
* Executes the contract process and passes the specified context arguments.
|
||||
* @return 0 on successful process creation. -1 on failure or contract process is already running.
|
||||
*/
|
||||
* Executes the contract process and passes the specified context arguments.
|
||||
* @return 0 on successful process creation. -1 on failure or contract process is already running.
|
||||
*/
|
||||
int execute_contract(execution_context &ctx)
|
||||
{
|
||||
// Start the hpfs rw session before starting the contract process.
|
||||
@@ -123,9 +123,9 @@ namespace sc
|
||||
}
|
||||
|
||||
/**
|
||||
* Blocks the calling thread until the specified process completed exeution (if running).
|
||||
* @return 0 if process exited normally or exit code of process if abnormally exited.
|
||||
*/
|
||||
* Blocks the calling thread until the specified process completed exeution (if running).
|
||||
* @return 0 if process exited normally or exit code of process if abnormally exited.
|
||||
*/
|
||||
int await_process_execution(pid_t pid)
|
||||
{
|
||||
if (pid > 0)
|
||||
@@ -139,8 +139,8 @@ namespace sc
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts the hpfs read/write state filesystem.
|
||||
*/
|
||||
* Starts the hpfs read/write state filesystem.
|
||||
*/
|
||||
int start_hpfs_rw_session(execution_context &ctx)
|
||||
{
|
||||
if (hpfs::start_fs_session(ctx.hpfs_pid, ctx.args.state_dir, ctx.args.readonly ? "ro" : "rw", true) == -1)
|
||||
@@ -151,16 +151,16 @@ namespace sc
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops the hpfs state filesystem.
|
||||
*/
|
||||
* Stops the hpfs state filesystem.
|
||||
*/
|
||||
int stop_hpfs_rw_session(execution_context &ctx)
|
||||
{
|
||||
// Read the root hash if not in readonly mode.
|
||||
if (!ctx.args.readonly && hpfs::get_hash(ctx.args.post_execution_state_hash, ctx.args.state_dir, "/") == -1)
|
||||
if (!ctx.args.readonly && hpfs::get_hash(ctx.args.post_execution_state_hash, ctx.args.state_dir, "/") < 1)
|
||||
return -1;
|
||||
|
||||
LOG_DBG << "Stopping hpfs session... pid:" << ctx.hpfs_pid << (ctx.args.readonly ? " (rdonly)" : "");
|
||||
;
|
||||
|
||||
if (util::kill_process(ctx.hpfs_pid, true) == -1)
|
||||
return -1;
|
||||
|
||||
@@ -169,20 +169,20 @@ namespace sc
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes the contract args (JSON) into the stdin of the contract process.
|
||||
* Args format:
|
||||
* {
|
||||
* "version":"<hp version>",
|
||||
* "pubkey": "<this node's hex public key>",
|
||||
* "ts": <this node's timestamp (unix milliseconds)>,
|
||||
* "readonly": <true|false>,
|
||||
* "lcl": "<this node's last closed ledger seq no. and hash in hex>", (eg: 169-a1d82eb4c9ed005ec2c4f4f82b6f0c2fd7543d66b1a0f6b8e58ae670b3e2bcfb)
|
||||
* "hpfd": [fd0, fd1],
|
||||
* "nplfd":[fd0, fd1],
|
||||
* "usrfd":{ "<pkhex>":[fd0, fd1], ... },
|
||||
* "unl":[ "pkhex", ... ]
|
||||
* }
|
||||
*/
|
||||
* Writes the contract args (JSON) into the stdin of the contract process.
|
||||
* Args format:
|
||||
* {
|
||||
* "version":"<hp version>",
|
||||
* "pubkey": "<this node's hex public key>",
|
||||
* "ts": <this node's timestamp (unix milliseconds)>,
|
||||
* "readonly": <true|false>,
|
||||
* "lcl": "<this node's last closed ledger seq no. and hash in hex>", (eg: 169-a1d82eb4c9ed005ec2c4f4f82b6f0c2fd7543d66b1a0f6b8e58ae670b3e2bcfb)
|
||||
* "hpfd": [fd0, fd1],
|
||||
* "nplfd":[fd0, fd1],
|
||||
* "usrfd":{ "<pkhex>":[fd0, fd1], ... },
|
||||
* "unl":[ "pkhex", ... ]
|
||||
* }
|
||||
*/
|
||||
int write_contract_args(const execution_context &ctx)
|
||||
{
|
||||
// Populate the json string with contract args.
|
||||
@@ -304,8 +304,8 @@ namespace sc
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes any hp input messages to the contract.
|
||||
*/
|
||||
* Writes any hp input messages to the contract.
|
||||
*/
|
||||
int write_contract_hp_inputs(execution_context &ctx)
|
||||
{
|
||||
if (write_iopipe(ctx.hpscfds, ctx.args.hpscbufs.inputs) == -1)
|
||||
@@ -380,11 +380,11 @@ namespace sc
|
||||
}
|
||||
|
||||
/**
|
||||
* Read all HP output messages produced by the contract process and store them in
|
||||
* the buffer for later processing.
|
||||
*
|
||||
* @return 0 if no bytes were read. 1 if bytes were read. -1 on failure.
|
||||
*/
|
||||
* Read all HP output messages produced by the contract process and store them in
|
||||
* the buffer for later processing.
|
||||
*
|
||||
* @return 0 if no bytes were read. 1 if bytes were read. -1 on failure.
|
||||
*/
|
||||
int read_contract_hp_npl_outputs(execution_context &ctx)
|
||||
{
|
||||
const int hpsc_res = read_iopipe(ctx.hpscfds, ctx.args.hpscbufs.output);
|
||||
@@ -405,10 +405,10 @@ namespace sc
|
||||
}
|
||||
|
||||
/**
|
||||
* Common helper function to write json output of fdmap to given ostream.
|
||||
* @param fdmap Any pubkey->fdlist map. (eg. ctx.userfds, ctx.nplfds)
|
||||
* @param os An output stream.
|
||||
*/
|
||||
* Common helper function to write json output of fdmap to given ostream.
|
||||
* @param fdmap Any pubkey->fdlist map. (eg. ctx.userfds, ctx.nplfds)
|
||||
* @param os An output stream.
|
||||
*/
|
||||
void fdmap_json_to_stream(const contract_fdmap_t &fdmap, std::ostringstream &os)
|
||||
{
|
||||
for (auto itr = fdmap.begin(); itr != fdmap.end(); itr++)
|
||||
@@ -432,11 +432,11 @@ namespace sc
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates io pipes for all pubkeys specified in bufmap.
|
||||
* @param fdmap A map which has public key and a vector<int> as fd list for that public key.
|
||||
* @param bufmap A map which has a public key and input/output buffer lists for that public key.
|
||||
* @return 0 on success. -1 on failure.
|
||||
*/
|
||||
* Creates io pipes for all pubkeys specified in bufmap.
|
||||
* @param fdmap A map which has public key and a vector<int> as fd list for that public key.
|
||||
* @param bufmap A map which has a public key and input/output buffer lists for that public key.
|
||||
* @return 0 on success. -1 on failure.
|
||||
*/
|
||||
int create_iopipes_for_fdmap(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap)
|
||||
{
|
||||
for (auto &[pubkey, buflist] : bufmap)
|
||||
@@ -452,14 +452,14 @@ namespace sc
|
||||
}
|
||||
|
||||
/**
|
||||
* Common function to create the pipes and write buffer inputs to the fdmap.
|
||||
* We take mutable parameters since the internal entries in the maps will be
|
||||
* modified (eg. fd close, buffer clear).
|
||||
*
|
||||
* @param fdmap A map which has public key and a vector<int> as fd list for that public key.
|
||||
* @param bufmap A map which has a public key and input/output buffer lists for that public key.
|
||||
* @return 0 on success. -1 on failure.
|
||||
*/
|
||||
* Common function to create the pipes and write buffer inputs to the fdmap.
|
||||
* We take mutable parameters since the internal entries in the maps will be
|
||||
* modified (eg. fd close, buffer clear).
|
||||
*
|
||||
* @param fdmap A map which has public key and a vector<int> as fd list for that public key.
|
||||
* @param bufmap A map which has a public key and input/output buffer lists for that public key.
|
||||
* @return 0 on success. -1 on failure.
|
||||
*/
|
||||
int write_contract_fdmap_inputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap)
|
||||
{
|
||||
// Loop through input buffers for each pubkey.
|
||||
@@ -473,13 +473,13 @@ namespace sc
|
||||
}
|
||||
|
||||
/**
|
||||
* Common function to read all outputs produced by the contract process and store them in
|
||||
* output buffers for later processing.
|
||||
*
|
||||
* @param fdmap A map which has public key and a vector<int> as fd list for that public key.
|
||||
* @param bufmap A map which has a public key and input/output buffer pair for that public key.
|
||||
* @return 0 if no bytes were read. 1 if bytes were read. -1 on failure.
|
||||
*/
|
||||
* Common function to read all outputs produced by the contract process and store them in
|
||||
* output buffers for later processing.
|
||||
*
|
||||
* @param fdmap A map which has public key and a vector<int> as fd list for that public key.
|
||||
* @param bufmap A map which has a public key and input/output buffer pair for that public key.
|
||||
* @return 0 if no bytes were read. 1 if bytes were read. -1 on failure.
|
||||
*/
|
||||
int read_contract_fdmap_outputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap)
|
||||
{
|
||||
bool bytes_read = false;
|
||||
@@ -500,9 +500,9 @@ namespace sc
|
||||
}
|
||||
|
||||
/**
|
||||
* Common function to close any open fds in the map after an error.
|
||||
* @param fdmap Any pubkey->fdlist map. (eg. ctx.userfds, ctx.nplfds)
|
||||
*/
|
||||
* Common function to close any open fds in the map after an error.
|
||||
* @param fdmap Any pubkey->fdlist map. (eg. ctx.userfds, ctx.nplfds)
|
||||
*/
|
||||
void cleanup_fdmap(contract_fdmap_t &fdmap)
|
||||
{
|
||||
for (auto &[pubkey, fds] : fdmap)
|
||||
@@ -512,11 +512,11 @@ namespace sc
|
||||
}
|
||||
|
||||
/**
|
||||
* Common function to create a pair of pipes (Hp->SC, SC->HP).
|
||||
* @param fds Vector to populate fd list.
|
||||
* @param inputbuffer Buffer to write into the HP write fd.
|
||||
* @param create_inpipe Whether to create the input pipe from HP to SC.
|
||||
*/
|
||||
* Common function to create a pair of pipes (Hp->SC, SC->HP).
|
||||
* @param fds Vector to populate fd list.
|
||||
* @param inputbuffer Buffer to write into the HP write fd.
|
||||
* @param create_inpipe Whether to create the input pipe from HP to SC.
|
||||
*/
|
||||
int create_iopipes(std::vector<int> &fds, const bool create_inpipe)
|
||||
{
|
||||
int inpipe[2] = {-1, -1};
|
||||
@@ -546,10 +546,10 @@ namespace sc
|
||||
}
|
||||
|
||||
/**
|
||||
* Common function to write the given input buffer into the write fd from the HP side.
|
||||
* @param fds Vector of fd list.
|
||||
* @param inputs Buffer to write into the HP write fd.
|
||||
*/
|
||||
* Common function to write the given input buffer into the write fd from the HP side.
|
||||
* @param fds Vector of fd list.
|
||||
* @param inputs Buffer to write into the HP write fd.
|
||||
*/
|
||||
int write_iopipe(std::vector<int> &fds, std::list<std::string> &inputs)
|
||||
{
|
||||
// Write the inputs (if any) into the contract and close the writefd.
|
||||
@@ -586,11 +586,11 @@ namespace sc
|
||||
}
|
||||
|
||||
/**
|
||||
* Common function to read buffered output from the pipe and populate the output list.
|
||||
* @param fds Vector representing the pipes fd list.
|
||||
* @param output The buffer to place the read output.
|
||||
* @return -1 on error. Otherwise no. of bytes read.
|
||||
*/
|
||||
* Common function to read buffered output from the pipe and populate the output list.
|
||||
* @param fds Vector representing the pipes fd list.
|
||||
* @param output The buffer to place the read output.
|
||||
* @return -1 on error. Otherwise no. of bytes read.
|
||||
*/
|
||||
int read_iopipe(std::vector<int> &fds, std::string &output)
|
||||
{
|
||||
// Read any available data that have been written by the contract process
|
||||
@@ -642,10 +642,10 @@ namespace sc
|
||||
}
|
||||
|
||||
/**
|
||||
* Common function for closing unused fds based on which process this gets called from.
|
||||
* @param is_hp Specify 'true' when calling from HP process. 'false' from SC process.
|
||||
* @param fds Vector of fds to close.
|
||||
*/
|
||||
* Common function for closing unused fds based on which process this gets called from.
|
||||
* @param is_hp Specify 'true' when calling from HP process. 'false' from SC process.
|
||||
* @param fds Vector of fds to close.
|
||||
*/
|
||||
void close_unused_vectorfds(const bool is_hp, std::vector<int> &fds)
|
||||
{
|
||||
const int fdtypes_to_close[2] = {
|
||||
@@ -665,8 +665,8 @@ namespace sc
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes all fds in a vector fd set.
|
||||
*/
|
||||
* Closes all fds in a vector fd set.
|
||||
*/
|
||||
void cleanup_vectorfds(std::vector<int> &fds)
|
||||
{
|
||||
for (int i = 0; i < fds.size(); i++)
|
||||
@@ -694,8 +694,8 @@ namespace sc
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleanup any running processes for the specified execution context.
|
||||
*/
|
||||
* Cleanup any running processes for the specified execution context.
|
||||
*/
|
||||
void stop(execution_context &ctx)
|
||||
{
|
||||
ctx.should_stop = true;
|
||||
|
||||
@@ -63,26 +63,25 @@ namespace state_serve
|
||||
state_requests.splice(state_requests.end(), p2p::ctx.collected_msgs.state_requests);
|
||||
}
|
||||
|
||||
uint64_t time_start = util::get_epoch_milliseconds();
|
||||
const uint64_t time_start = util::get_epoch_milliseconds();
|
||||
|
||||
for (auto &[session_id, request] : state_requests)
|
||||
{
|
||||
if (is_shutting_down)
|
||||
break;
|
||||
|
||||
// If we have spent too much time handling state requests, abandon the entire batch
|
||||
// because the requester would have stopped waiting for us.
|
||||
const uint64_t time_now = util::get_epoch_milliseconds();
|
||||
if ((time_now - time_start) > REQUEST_BATCH_TIMEOUT)
|
||||
break;
|
||||
|
||||
const msg::fbuf::p2pmsg::Content *content = msg::fbuf::p2pmsg::GetContent(request.data());
|
||||
|
||||
const p2p::state_request sr = p2pmsg::create_state_request_from_msg(*content->message_as_State_Request_Message());
|
||||
flatbuffers::FlatBufferBuilder fbuf(1024);
|
||||
|
||||
uint64_t time_now = util::get_epoch_milliseconds();
|
||||
|
||||
// If we have spent too much time handling state requests, abandon the entire batch
|
||||
// because the requester would have stopped waiting for us.
|
||||
if ((time_now - time_start) > REQUEST_BATCH_TIMEOUT)
|
||||
break;
|
||||
|
||||
if (state_serve::create_state_response(fbuf, sr) == 0)
|
||||
if (state_serve::create_state_response(fbuf, sr) == 1)
|
||||
{
|
||||
// Find the peer that we should send the state response to.
|
||||
std::scoped_lock<std::mutex> lock(p2p::ctx.peer_connections_mutex);
|
||||
@@ -106,10 +105,12 @@ namespace state_serve
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates the reply message for a given state request.
|
||||
* @param fbuf The flatbuffer builder to construct the reply message.
|
||||
* @param sr The state request which should be replied to.
|
||||
*/
|
||||
* Creates the reply message for a given state request.
|
||||
* @param fbuf The flatbuffer builder to construct the reply message.
|
||||
* @param sr The state request which should be replied to.
|
||||
* @return 1 if successful state response was generated. 0 if request is invalid
|
||||
* and no response was generated. -1 on error.
|
||||
*/
|
||||
int create_state_response(flatbuffers::FlatBufferBuilder &fbuf, const p2p::state_request &sr)
|
||||
{
|
||||
LOG_DBG << "Serving state req. path:" << sr.parent_path << " block_id:" << sr.block_id;
|
||||
@@ -120,19 +121,24 @@ namespace state_serve
|
||||
// Vector to hold the block bytes. Normally block size is constant BLOCK_SIZE (4MB), but the
|
||||
// last block of a file may have a smaller size.
|
||||
std::vector<uint8_t> block;
|
||||
if (get_file_block(block, sr.parent_path, sr.block_id, sr.expected_hash) == -1)
|
||||
const int result = get_data_block(block, sr.parent_path, sr.block_id, sr.expected_hash);
|
||||
|
||||
if (result == -1)
|
||||
{
|
||||
LOG_ERR << "Error in getting file block: " << sr.parent_path;
|
||||
return -1;
|
||||
}
|
||||
else if (result == 1)
|
||||
{
|
||||
p2p::block_response resp;
|
||||
resp.path = sr.parent_path;
|
||||
resp.block_id = sr.block_id;
|
||||
resp.hash = sr.expected_hash;
|
||||
resp.data = std::string_view(reinterpret_cast<const char *>(block.data()), block.size());
|
||||
|
||||
p2p::block_response resp;
|
||||
resp.path = sr.parent_path;
|
||||
resp.block_id = sr.block_id;
|
||||
resp.hash = sr.expected_hash;
|
||||
resp.data = std::string_view(reinterpret_cast<const char *>(block.data()), block.size());
|
||||
|
||||
msg::fbuf::p2pmsg::create_msg_from_block_response(fbuf, resp, cons::ctx.lcl);
|
||||
msg::fbuf::p2pmsg::create_msg_from_block_response(fbuf, resp, cons::ctx.lcl);
|
||||
return 1; // Success.
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -141,43 +147,53 @@ namespace state_serve
|
||||
{
|
||||
std::vector<hpfs::h32> block_hashes;
|
||||
std::size_t file_length = 0;
|
||||
if (get_file_block_hashes(block_hashes, file_length, sr.parent_path, sr.expected_hash) == -1)
|
||||
const int result = get_data_block_hashes(block_hashes, file_length, sr.parent_path, sr.expected_hash);
|
||||
|
||||
if (result == -1)
|
||||
{
|
||||
LOG_ERR << "Error in getting block hashes: " << sr.parent_path;
|
||||
return -1;
|
||||
}
|
||||
|
||||
msg::fbuf::p2pmsg::create_msg_from_filehashmap_response(
|
||||
fbuf, sr.parent_path, block_hashes,
|
||||
file_length, sr.expected_hash, cons::ctx.lcl);
|
||||
else if (result == 1)
|
||||
{
|
||||
msg::fbuf::p2pmsg::create_msg_from_filehashmap_response(
|
||||
fbuf, sr.parent_path, block_hashes,
|
||||
file_length, sr.expected_hash, cons::ctx.lcl);
|
||||
return 1; // Success.
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// If the state request is for a directory we need to reply with the
|
||||
// file system entries and their hashes inside that dir.
|
||||
std::vector<hpfs::child_hash_node> child_hash_nodes;
|
||||
if (get_fs_entry_hashes(child_hash_nodes, sr.parent_path, sr.expected_hash) == -1)
|
||||
const int result = get_fs_entry_hashes(child_hash_nodes, sr.parent_path, sr.expected_hash);
|
||||
|
||||
if (result == -1)
|
||||
{
|
||||
LOG_ERR << "Error in getting fs entries: " << sr.parent_path;
|
||||
return -1;
|
||||
}
|
||||
|
||||
msg::fbuf::p2pmsg::create_msg_from_fsentry_response(
|
||||
fbuf, sr.parent_path, child_hash_nodes, sr.expected_hash, cons::ctx.lcl);
|
||||
else if (result == 1)
|
||||
{
|
||||
msg::fbuf::p2pmsg::create_msg_from_fsentry_response(
|
||||
fbuf, sr.parent_path, child_hash_nodes, sr.expected_hash, cons::ctx.lcl);
|
||||
return 1; // Success.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
LOG_DBG << "No state response generated.";
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves the specified data block from a state file if expected hash matches.
|
||||
* @return Number of bytes read on success. -1 on failure.
|
||||
*/
|
||||
int get_file_block(std::vector<uint8_t> &block, const std::string_view vpath,
|
||||
* Retrieves the specified data block from a state file if expected hash matches.
|
||||
* @return 1 if block data was succefully fetched. 0 if vpath or block does not exist. -1 on error.
|
||||
*/
|
||||
int get_data_block(std::vector<uint8_t> &block, const std::string_view vpath,
|
||||
const uint32_t block_id, const hpfs::h32 expected_hash)
|
||||
{
|
||||
int fd = 0;
|
||||
pid_t hpfs_pid = 0;
|
||||
std::string mount_dir;
|
||||
if (hpfs::start_fs_session(hpfs_pid, mount_dir, "ro", true) == -1)
|
||||
@@ -185,80 +201,81 @@ namespace state_serve
|
||||
|
||||
// Check whether the existing block hash matches expected hash.
|
||||
std::vector<hpfs::h32> block_hashes;
|
||||
if (hpfs::get_file_block_hashes(block_hashes, mount_dir, vpath) == -1)
|
||||
goto failure;
|
||||
|
||||
if (block_id >= block_hashes.size())
|
||||
int result = hpfs::get_file_block_hashes(block_hashes, mount_dir, vpath);
|
||||
if (result == 1)
|
||||
{
|
||||
LOG_DBG << "Requested block_id " << block_id << " does not exist.";
|
||||
goto failure;
|
||||
}
|
||||
|
||||
if (block_hashes[block_id] != expected_hash)
|
||||
{
|
||||
LOG_DBG << "Expected hash mismatch.";
|
||||
goto failure;
|
||||
}
|
||||
|
||||
// Get actual block data.
|
||||
{
|
||||
const std::string file_path = std::string(mount_dir).append(vpath);
|
||||
const off_t block_offset = block_id * state_common::BLOCK_SIZE;
|
||||
fd = open(file_path.c_str(), O_RDONLY);
|
||||
if (fd == -1)
|
||||
if (block_id >= block_hashes.size())
|
||||
{
|
||||
LOG_ERR << errno << ": Open failed. " << file_path;
|
||||
goto failure;
|
||||
LOG_DBG << "Requested block_id " << block_id << " does not exist.";
|
||||
result = 0;
|
||||
}
|
||||
|
||||
struct stat st;
|
||||
if (fstat(fd, &st) == -1)
|
||||
else if (block_hashes[block_id] != expected_hash)
|
||||
{
|
||||
LOG_ERR << errno << ": Stat failed. " << file_path;
|
||||
goto failure;
|
||||
LOG_DBG << "Expected hash mismatch.";
|
||||
result = 0;
|
||||
}
|
||||
|
||||
if (!S_ISREG(st.st_mode))
|
||||
else // Get actual block data.
|
||||
{
|
||||
LOG_ERR << "Not a file. " << file_path;
|
||||
goto failure;
|
||||
}
|
||||
struct stat st;
|
||||
const std::string file_path = std::string(mount_dir).append(vpath);
|
||||
const off_t block_offset = block_id * state_common::BLOCK_SIZE;
|
||||
const int fd = open(file_path.c_str(), O_RDONLY);
|
||||
if (fd == -1)
|
||||
{
|
||||
LOG_ERR << errno << ": Open failed. " << file_path;
|
||||
result = -1;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (fstat(fd, &st) == -1)
|
||||
{
|
||||
LOG_ERR << errno << ": Stat failed. " << file_path;
|
||||
result = -1;
|
||||
}
|
||||
else if (!S_ISREG(st.st_mode))
|
||||
{
|
||||
LOG_ERR << "Not a file. " << file_path;
|
||||
result = -1;
|
||||
}
|
||||
else if (block_offset > st.st_size)
|
||||
{
|
||||
LOG_ERR << "Block offset " << block_offset << " larger than file " << st.st_size << " - " << file_path;
|
||||
result = -1;
|
||||
}
|
||||
else
|
||||
{
|
||||
const size_t read_len = MIN(state_common::BLOCK_SIZE, (st.st_size - block_offset));
|
||||
block.resize(read_len);
|
||||
|
||||
if (block_offset > st.st_size)
|
||||
{
|
||||
LOG_ERR << "Block offset " << block_offset << " larger than file " << st.st_size << " - " << file_path;
|
||||
goto failure;
|
||||
}
|
||||
lseek(fd, block_offset, SEEK_SET);
|
||||
const int res = read(fd, block.data(), read_len);
|
||||
if (res < read_len)
|
||||
{
|
||||
LOG_ERR << errno << ": Read failed (result:" << res
|
||||
<< " off:" << block_offset << " len:" << read_len << "). " << file_path;
|
||||
result = -1;
|
||||
}
|
||||
else
|
||||
{
|
||||
result = 1; // Success.
|
||||
}
|
||||
}
|
||||
|
||||
const size_t read_len = MIN(state_common::BLOCK_SIZE, (st.st_size - block_offset));
|
||||
block.resize(read_len);
|
||||
|
||||
lseek(fd, block_offset, SEEK_SET);
|
||||
const int res = read(fd, block.data(), read_len);
|
||||
if (res < read_len)
|
||||
{
|
||||
LOG_ERR << errno << ": Read failed (result:" << res
|
||||
<< " off:" << block_offset << " len:" << read_len << "). " << file_path;
|
||||
goto failure;
|
||||
close(fd);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
goto success;
|
||||
|
||||
failure:
|
||||
if (fd > 0)
|
||||
close(fd);
|
||||
util::kill_process(hpfs_pid, true);
|
||||
return -1;
|
||||
success:
|
||||
if (fd > 0)
|
||||
close(fd);
|
||||
if (util::kill_process(hpfs_pid, true) == -1)
|
||||
return -1;
|
||||
return 0;
|
||||
return result;
|
||||
}
|
||||
|
||||
int get_file_block_hashes(std::vector<hpfs::h32> &hashes, size_t &file_length,
|
||||
/**
|
||||
* Retrieves the specified file block hashes if expected hash matches.
|
||||
* @return 1 if block hashes were successfuly fetched. 0 if vpath does not exist. -1 on error.
|
||||
*/
|
||||
int get_data_block_hashes(std::vector<hpfs::h32> &hashes, size_t &file_length,
|
||||
const std::string_view vpath, const hpfs::h32 expected_hash)
|
||||
{
|
||||
pid_t hpfs_pid = 0;
|
||||
@@ -268,42 +285,43 @@ namespace state_serve
|
||||
|
||||
// Check whether the existing file hash matches expected hash.
|
||||
hpfs::h32 file_hash = hpfs::h32_empty;
|
||||
if (hpfs::get_hash(file_hash, mount_dir, vpath) == -1)
|
||||
goto failure;
|
||||
|
||||
if (file_hash != expected_hash)
|
||||
int result = hpfs::get_hash(file_hash, mount_dir, vpath);
|
||||
if (result == 1)
|
||||
{
|
||||
LOG_DBG << "Expected hash mismatch.";
|
||||
goto failure;
|
||||
}
|
||||
|
||||
// Get the block hashes.
|
||||
if (hpfs::get_file_block_hashes(hashes, mount_dir, vpath) == -1)
|
||||
goto failure;
|
||||
|
||||
// Get actual file length.
|
||||
{
|
||||
const std::string file_path = std::string(mount_dir).append(vpath);
|
||||
struct stat st;
|
||||
if (stat(file_path.c_str(), &st) == -1)
|
||||
if (file_hash != expected_hash)
|
||||
{
|
||||
LOG_ERR << errno << ": Stat failed. " << file_path;
|
||||
goto failure;
|
||||
LOG_DBG << "Expected hash mismatch.";
|
||||
result = 0;
|
||||
}
|
||||
// Get the block hashes.
|
||||
else if (hpfs::get_file_block_hashes(hashes, mount_dir, vpath) < 0)
|
||||
{
|
||||
result = -1;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Get actual file length.
|
||||
const std::string file_path = std::string(mount_dir).append(vpath);
|
||||
struct stat st;
|
||||
if (stat(file_path.c_str(), &st) == -1)
|
||||
{
|
||||
LOG_ERR << errno << ": Stat failed when getting file length. " << file_path;
|
||||
result = -1;
|
||||
}
|
||||
file_length = st.st_size;
|
||||
result = 1; // Success.
|
||||
}
|
||||
file_length = st.st_size;
|
||||
}
|
||||
|
||||
goto success;
|
||||
|
||||
failure:
|
||||
util::kill_process(hpfs_pid, true);
|
||||
return -1;
|
||||
success:
|
||||
if (util::kill_process(hpfs_pid, true) == -1)
|
||||
return -1;
|
||||
return 0;
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves the specified dir entry hashes if expected fir hash matches.
|
||||
* @return 1 if fs entry hashes were successfuly fetched. 0 if vpath does not exist. -1 on error.
|
||||
*/
|
||||
int get_fs_entry_hashes(std::vector<hpfs::child_hash_node> &hash_nodes,
|
||||
const std::string_view vpath, const hpfs::h32 expected_hash)
|
||||
{
|
||||
@@ -314,27 +332,27 @@ namespace state_serve
|
||||
|
||||
// Check whether the existing dir hash matches expected hash.
|
||||
hpfs::h32 dir_hash = hpfs::h32_empty;
|
||||
if (hpfs::get_hash(dir_hash, mount_dir, vpath) == -1)
|
||||
goto failure;
|
||||
|
||||
if (dir_hash != expected_hash)
|
||||
int result = hpfs::get_hash(dir_hash, mount_dir, vpath);
|
||||
if (result == 1)
|
||||
{
|
||||
LOG_DBG << "Expected hash mismatch.";
|
||||
goto failure;
|
||||
if (dir_hash != expected_hash)
|
||||
{
|
||||
LOG_DBG << "Expected hash mismatch.";
|
||||
result = 0;
|
||||
}
|
||||
// Get the children hash nodes.
|
||||
else if (hpfs::get_dir_children_hashes(hash_nodes, mount_dir, vpath) < 0)
|
||||
{
|
||||
result = -1;
|
||||
}
|
||||
else
|
||||
{
|
||||
result = 1; // Success.
|
||||
}
|
||||
}
|
||||
|
||||
// Get the children hash nodes.
|
||||
if (hpfs::get_dir_children_hashes(hash_nodes, mount_dir, vpath) == -1)
|
||||
goto failure;
|
||||
|
||||
goto success;
|
||||
|
||||
failure:
|
||||
util::kill_process(hpfs_pid, true);
|
||||
return -1;
|
||||
success:
|
||||
if (util::kill_process(hpfs_pid, true) == -1)
|
||||
return -1;
|
||||
return 0;
|
||||
return result;
|
||||
}
|
||||
} // namespace state_serve
|
||||
@@ -16,10 +16,10 @@ namespace state_serve
|
||||
|
||||
int create_state_response(flatbuffers::FlatBufferBuilder &fbuf, const p2p::state_request &sr);
|
||||
|
||||
int get_file_block(std::vector<uint8_t> &vec, const std::string_view vpath,
|
||||
int get_data_block(std::vector<uint8_t> &vec, const std::string_view vpath,
|
||||
const uint32_t block_id, const hpfs::h32 expected_hash);
|
||||
|
||||
int get_file_block_hashes(std::vector<hpfs::h32> &hashes, size_t &file_length,
|
||||
int get_data_block_hashes(std::vector<hpfs::h32> &hashes, size_t &file_length,
|
||||
const std::string_view vpath, const hpfs::h32 expected_hash);
|
||||
|
||||
int get_fs_entry_hashes(std::vector<hpfs::child_hash_node> &hash_nodes,
|
||||
|
||||
@@ -14,13 +14,13 @@
|
||||
namespace state_sync
|
||||
{
|
||||
// Idle loop sleep time (milliseconds).
|
||||
constexpr uint16_t IDLE_WAIT = 50;
|
||||
constexpr uint16_t IDLE_WAIT = 20;
|
||||
|
||||
// Max number of requests that can be awaiting response at any given time.
|
||||
constexpr uint16_t MAX_AWAITING_REQUESTS = 4;
|
||||
|
||||
// Request loop sleep time (milliseconds).
|
||||
constexpr uint16_t REQUEST_LOOP_WAIT = 20;
|
||||
constexpr uint16_t REQUEST_LOOP_WAIT = 10;
|
||||
|
||||
constexpr int FILE_PERMS = 0644;
|
||||
|
||||
@@ -187,7 +187,12 @@ namespace state_sync
|
||||
handle_file_block_response(vpath, resp_msg->state_response_as_Block_Response());
|
||||
|
||||
// After handling each response, check whether we have reached target state.
|
||||
hpfs::get_hash(updated_state, ctx.hpfs_mount_dir, "/");
|
||||
if (hpfs::get_hash(updated_state, ctx.hpfs_mount_dir, "/") < 1)
|
||||
{
|
||||
LOG_ERR << "State sync: exiting due to hash check error.";
|
||||
return;
|
||||
}
|
||||
|
||||
LOG_DBG << "State sync: current:" << updated_state << " | target:" << current_target;
|
||||
if (updated_state == current_target)
|
||||
return;
|
||||
|
||||
Reference in New Issue
Block a user