Improvements in state request/response handling. (#97)

This commit is contained in:
Ravin Perera
2020-06-18 15:07:33 +05:30
committed by GitHub
parent b89dbe0a2c
commit 04e230c82e
15 changed files with 534 additions and 431 deletions

View File

@@ -10,259 +10,259 @@
namespace comm
{
constexpr uint32_t INTERVALMS = 60000;
constexpr uint8_t SIZE_HEADER_LEN = 8;
constexpr uint32_t INTERVALMS = 60000;
constexpr uint8_t SIZE_HEADER_LEN = 8;
// Global instances of user and peer session handlers.
usr::user_session_handler user_sess_handler;
p2p::peer_session_handler peer_sess_handler;
// Global instances of user and peer session handlers.
usr::user_session_handler user_sess_handler;
p2p::peer_session_handler peer_sess_handler;
comm_session::comm_session(
std::string_view ip, const int read_fd, const int write_fd, const SESSION_TYPE session_type,
const bool is_binary, const bool is_inbound, const uint64_t (&metric_thresholds)[4])
comm_session::comm_session(
std::string_view ip, const int read_fd, const int write_fd, const SESSION_TYPE session_type,
const bool is_binary, const bool is_inbound, const uint64_t (&metric_thresholds)[4])
: read_fd(read_fd),
write_fd(write_fd),
session_type(session_type),
uniqueid(std::to_string(read_fd).append(":").append(ip)),
is_binary(is_binary),
is_inbound(is_inbound)
{
// Create new session_thresholds and insert it to thresholds vector.
// Have to maintain the SESSION_THRESHOLDS enum order in inserting new thresholds to thresholds vector
// since enum's value is used as index in the vector to update vector values.
thresholds.reserve(sizeof metric_thresholds);
for (size_t i = 0; i < sizeof metric_thresholds; i++)
thresholds.push_back(session_threshold(metric_thresholds[i], INTERVALMS));
}
: read_fd(read_fd),
write_fd(write_fd),
session_type(session_type),
uniqueid(std::to_string(read_fd).append(":").append(ip)),
is_binary(is_binary),
is_inbound(is_inbound)
{
// Create new session_thresholds and insert it to thresholds vector.
// Have to maintain the SESSION_THRESHOLDS enum order in inserting new thresholds to thresholds vector
// since enum's value is used as index in the vector to update vector values.
thresholds.reserve(4);
for (size_t i = 0; i < 4; i++)
thresholds.push_back(session_threshold(metric_thresholds[i], INTERVALMS));
}
int comm_session::on_connect()
{
state = SESSION_STATE::ACTIVE;
int comm_session::on_connect()
{
state = SESSION_STATE::ACTIVE;
if (session_type == SESSION_TYPE::USER)
return user_sess_handler.on_connect(*this);
else
return peer_sess_handler.on_connect(*this);
}
if (session_type == SESSION_TYPE::USER)
return user_sess_handler.on_connect(*this);
else
return peer_sess_handler.on_connect(*this);
}
/**
/**
* Attempts to read message data from the given socket fd and passes the message on to the session.
* @param should_disconnect Whether the client fd must be disconnected.
* @param max_msg_size The allowed max byte length of a message to be read.
*/
int comm_session::attempt_read(const uint64_t max_msg_size)
{
size_t available_bytes = 0;
if (ioctl(read_fd, FIONREAD, &available_bytes) == -1 ||
(max_msg_size > 0 &&
available_bytes > (max_msg_size + (is_binary ? SIZE_HEADER_LEN : 0))))
return -1;
// Try to read a complete message using available bytes.
// If complete message is not available silently return.
if (available_bytes > 0)
int comm_session::attempt_read(const uint64_t max_msg_size)
{
const uint32_t read_len = is_binary ? get_binary_msg_read_len(available_bytes) : available_bytes;
size_t available_bytes = 0;
if (ioctl(read_fd, FIONREAD, &available_bytes) == -1 ||
(max_msg_size > 0 &&
available_bytes > (max_msg_size + (is_binary ? SIZE_HEADER_LEN : 0))))
return -1;
if (read_len == -1)
// Try to read a complete message using available bytes.
// If complete message is not available silently return.
if (available_bytes > 0)
{
const uint32_t read_len = is_binary ? get_binary_msg_read_len(available_bytes) : available_bytes;
if (read_len == -1)
{
return -1;
}
else if (read_len > 0)
{
int res = on_message(std::string_view(read_buffer.data(), read_len));
read_buffer.clear(); // Clear the buffer after read operation.
read_buffer_filled_size = 0;
return res;
}
}
return 0;
}
int comm_session::on_message(std::string_view message)
{
increment_metric(SESSION_THRESHOLDS::MAX_RAWBYTES_PER_MINUTE, message.length());
if (session_type == SESSION_TYPE::USER)
return user_sess_handler.on_message(*this, message);
else
return peer_sess_handler.on_message(*this, message);
}
int comm_session::send(std::string_view message) const
{
if (state == SESSION_STATE::CLOSED)
return -1;
// Prepare the memory segments to map with writev().
iovec memsegs[2];
if (is_binary)
{
// In binary mode, we need to prefix every message with the message size header.
uint8_t header_buf[SIZE_HEADER_LEN] = {0, 0, 0, 0, 0, 0, 0, 0};
uint32_t len = message.length();
// Reserve the first 4 bytes for future (TODO).
header_buf[4] = len >> 24;
header_buf[5] = (len >> 16) & 0xff;
header_buf[6] = (len >> 8) & 0xff;
header_buf[7] = len & 0xff;
memsegs[0].iov_base = header_buf;
memsegs[0].iov_len = SIZE_HEADER_LEN;
memsegs[1].iov_base = (char *)message.data();
memsegs[1].iov_len = message.length();
}
else
{
// In text mode, we need to append every message with '\n'
memsegs[0].iov_base = (char *)message.data();
memsegs[0].iov_len = message.length();
memsegs[1].iov_base = (char *)"\n";
memsegs[1].iov_len = 1;
}
if (writev(write_fd, memsegs, 2) == -1)
{
LOG_ERR << errno << ": Session " << uniqueid << " send writev failed.";
return -1;
}
else if (read_len > 0)
return 0;
}
void comm_session::close(const bool invoke_handler)
{
if (state == SESSION_STATE::CLOSED)
return;
if (invoke_handler)
{
int res = on_message(std::string_view(read_buffer.data(), read_len));
read_buffer.clear(); // Clear the buffer after read operation.
read_buffer_filled_size = 0;
return res;
if (session_type == SESSION_TYPE::USER)
user_sess_handler.on_close(*this);
else
peer_sess_handler.on_close(*this);
}
::close(read_fd);
state = SESSION_STATE::CLOSED;
LOG_DBG << (session_type == SESSION_TYPE::PEER ? "Peer" : "User") << " session closed: "
<< uniqueid << (is_inbound ? "[in]" : "[out]") << (is_self ? "[self]" : "");
}
return 0;
}
int comm_session::on_message(std::string_view message)
{
increment_metric(SESSION_THRESHOLDS::MAX_RAWBYTES_PER_MINUTE, message.length());
if (session_type == SESSION_TYPE::USER)
return user_sess_handler.on_message(*this, message);
else
return peer_sess_handler.on_message(*this, message);
}
int comm_session::send(std::string_view message) const
{
if (state == SESSION_STATE::CLOSED)
return -1;
// Prepare the memory segments to map with writev().
iovec memsegs[2];
if (is_binary)
{
// In binary mode, we need to prefix every message with the message size header.
uint8_t header_buf[SIZE_HEADER_LEN] = {0, 0, 0, 0, 0, 0, 0, 0};
uint32_t len = message.length();
// Reserve the first 4 bytes for future (TODO).
header_buf[4] = len >> 24;
header_buf[5] = (len >> 16) & 0xff;
header_buf[6] = (len >> 8) & 0xff;
header_buf[7] = len & 0xff;
memsegs[0].iov_base = header_buf;
memsegs[0].iov_len = SIZE_HEADER_LEN;
memsegs[1].iov_base = (char *)message.data();
memsegs[1].iov_len = message.length();
}
else
{
// In text mode, we need to append every message with '\n'
memsegs[0].iov_base = (char *)message.data();
memsegs[0].iov_len = message.length();
memsegs[1].iov_base = (char *)"\n";
memsegs[1].iov_len = 1;
}
if (writev(write_fd, memsegs, 2) == -1)
{
LOG_ERR << errno << ": Session " << uniqueid << " send writev failed.";
return -1;
}
return 0;
}
void comm_session::close(const bool invoke_handler)
{
if (state == SESSION_STATE::CLOSED)
return;
if (invoke_handler)
{
if (session_type == SESSION_TYPE::USER)
user_sess_handler.on_close(*this);
else
peer_sess_handler.on_close(*this);
}
::close(read_fd);
state = SESSION_STATE::CLOSED;
LOG_DBG << (session_type == SESSION_TYPE::PEER ? "Peer" : "User") << " session closed: "
<< uniqueid << (is_inbound ? "[in]" : "[out]") << (is_self ? "[self]" : "");
}
/**
/**
* Retrieves the length of the binary message pending to be read. Only relevant for Binary mode.
* @param available_bytes Count of bytes that is available to read from the client socket.
* @return Length of the message if the complete message available to be read. 0 if reading must be skipped. -1 if client must be disconnected.
*/
uint32_t comm_session::get_binary_msg_read_len(const size_t available_bytes)
{
// If we have previously encountered a size header and we are waiting until all message
// bytes are received, we must have the expected message size > 0.
size_t data_bytes = available_bytes;
// If we are not tracking a previous size header, then we must check for a size header.
if (expected_msg_size == 0 && available_bytes >= SIZE_HEADER_LEN)
uint32_t comm_session::get_binary_msg_read_len(const size_t available_bytes)
{
// Read the size header.
uint8_t header_buf[SIZE_HEADER_LEN];
if (read(read_fd, header_buf, SIZE_HEADER_LEN) == -1)
return -1; // Indicates that we should disconnect the client.
// If we have previously encountered a size header and we are waiting until all message
// bytes are received, we must have the expected message size > 0.
data_bytes -= SIZE_HEADER_LEN;
size_t data_bytes = available_bytes;
// We are using last 4 bytes (big endian) in the header for the message size.
uint32_t upcoming_msg_size = (header_buf[4] << 24) + (header_buf[5] << 16) + (header_buf[6] << 8) + header_buf[7];
// Remember the expected msg size until sufficient bytes are available.
expected_msg_size = upcoming_msg_size;
read_buffer.resize(expected_msg_size);
}
if (expected_msg_size > 0 && data_bytes > 0)
{
// Claculate bytes remaining to form complete message.
const size_t remaining_len = expected_msg_size - read_buffer_filled_size;
// We know expected message size, and enough bytes are available to read complete expected message.
if (data_bytes >= remaining_len)
// If we are not tracking a previous size header, then we must check for a size header.
if (expected_msg_size == 0 && available_bytes >= SIZE_HEADER_LEN)
{
// Complete the buffer by reading remaining bytes.
if (read(read_fd, read_buffer.data() + read_buffer_filled_size, remaining_len) == -1)
// Read the size header.
uint8_t header_buf[SIZE_HEADER_LEN];
if (read(read_fd, header_buf, SIZE_HEADER_LEN) == -1)
return -1; // Indicates that we should disconnect the client.
read_buffer_filled_size += remaining_len;
const size_t read_len = expected_msg_size;
expected_msg_size = 0; // reset the expected msg size.
return read_len;
data_bytes -= SIZE_HEADER_LEN;
// We are using last 4 bytes (big endian) in the header for the message size.
uint32_t upcoming_msg_size = (header_buf[4] << 24) + (header_buf[5] << 16) + (header_buf[6] << 8) + header_buf[7];
// Remember the expected msg size until sufficient bytes are available.
expected_msg_size = upcoming_msg_size;
read_buffer.resize(expected_msg_size);
}
else
if (expected_msg_size > 0 && data_bytes > 0)
{
// Collect any available bytes to the buffer.
if (read(read_fd, read_buffer.data() + read_buffer_filled_size, data_bytes) == -1)
return -1; // Indicates that we should disconnect the client.
read_buffer_filled_size += data_bytes;
// Claculate bytes remaining to form complete message.
const size_t remaining_len = expected_msg_size - read_buffer_filled_size;
// We know expected message size, and enough bytes are available to read complete expected message.
if (data_bytes >= remaining_len)
{
// Complete the buffer by reading remaining bytes.
if (read(read_fd, read_buffer.data() + read_buffer_filled_size, remaining_len) == -1)
return -1; // Indicates that we should disconnect the client.
read_buffer_filled_size += remaining_len;
const size_t read_len = expected_msg_size;
expected_msg_size = 0; // reset the expected msg size.
return read_len;
}
else
{
// Collect any available bytes to the buffer.
if (read(read_fd, read_buffer.data() + read_buffer_filled_size, data_bytes) == -1)
return -1; // Indicates that we should disconnect the client.
read_buffer_filled_size += data_bytes;
}
}
// Skip reading
return 0;
}
// Skip reading
return 0;
}
/**
/**
* Set thresholds to the socket session
*/
void comm_session::set_threshold(const SESSION_THRESHOLDS threshold_type, const uint64_t threshold_limit, const uint32_t intervalms)
{
session_threshold &t = thresholds[threshold_type];
t.counter_value = 0;
t.intervalms = intervalms;
t.threshold_limit = threshold_limit;
}
void comm_session::set_threshold(const SESSION_THRESHOLDS threshold_type, const uint64_t threshold_limit, const uint32_t intervalms)
{
session_threshold &t = thresholds[threshold_type];
t.counter_value = 0;
t.intervalms = intervalms;
t.threshold_limit = threshold_limit;
}
/*
/*
* Increment the provided thresholds counter value with the provided amount and validate it against the
* configured threshold limit.
*/
void comm_session::increment_metric(const SESSION_THRESHOLDS threshold_type, const uint64_t amount)
{
session_threshold &t = thresholds[threshold_type];
// Ignore the counter if limit is set as 0.
if (t.threshold_limit == 0)
return;
const uint64_t time_now = util::get_epoch_milliseconds();
t.counter_value += amount;
if (t.timestamp == 0)
void comm_session::increment_metric(const SESSION_THRESHOLDS threshold_type, const uint64_t amount)
{
// Reset counter timestamp.
t.timestamp = time_now;
}
else
{
// Check whether we have exceeded the threshold within the monitering interval.
const uint64_t elapsed_time = time_now - t.timestamp;
if (elapsed_time <= t.intervalms && t.counter_value > t.threshold_limit)
{
this->close();
session_threshold &t = thresholds[threshold_type];
t.timestamp = 0;
t.counter_value = 0;
// Ignore the counter if limit is set as 0.
if (t.threshold_limit == 0)
return;
LOG_INFO << "Session " << this->uniqueid << " threshold exceeded. (type:" << threshold_type << " limit:" << t.threshold_limit << ")";
corebill::report_violation(this->address);
}
else if (elapsed_time > t.intervalms)
const uint64_t time_now = util::get_epoch_milliseconds();
t.counter_value += amount;
if (t.timestamp == 0)
{
// Reset counter timestamp.
t.timestamp = time_now;
t.counter_value = amount;
}
else
{
// Check whether we have exceeded the threshold within the monitering interval.
const uint64_t elapsed_time = time_now - t.timestamp;
if (elapsed_time <= t.intervalms && t.counter_value > t.threshold_limit)
{
this->close();
t.timestamp = 0;
t.counter_value = 0;
LOG_INFO << "Session " << this->uniqueid << " threshold exceeded. (type:" << threshold_type << " limit:" << t.threshold_limit << ")";
corebill::report_violation(this->address);
}
else if (elapsed_time > t.intervalms)
{
t.timestamp = time_now;
t.counter_value = amount;
}
}
}
}
} // namespace comm

View File

@@ -86,21 +86,19 @@ union State_Response{ File_HashMap_Response, Block_Response, Fs_Entry_Response }
table State_Response_Message{
state_response:State_Response;
hash:[ubyte];
path: string;
}
table Fs_Entry_Response{
path: string;
entries: [State_FS_Hash_Entry];
}
table File_HashMap_Response{
path: string;
file_length:uint64;
hash_map:[ubyte];
}
table Block_Response{
path: string;
block_id:uint32;
data: [ubyte];
}

View File

@@ -1277,7 +1277,8 @@ struct State_Response_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Tab
enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE {
VT_STATE_RESPONSE_TYPE = 4,
VT_STATE_RESPONSE = 6,
VT_HASH = 8
VT_HASH = 8,
VT_PATH = 10
};
fbschema::p2pmsg::State_Response state_response_type() const {
return static_cast<fbschema::p2pmsg::State_Response>(GetField<uint8_t>(VT_STATE_RESPONSE_TYPE, 0));
@@ -1304,6 +1305,12 @@ struct State_Response_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Tab
flatbuffers::Vector<uint8_t> *mutable_hash() {
return GetPointer<flatbuffers::Vector<uint8_t> *>(VT_HASH);
}
const flatbuffers::String *path() const {
return GetPointer<const flatbuffers::String *>(VT_PATH);
}
flatbuffers::String *mutable_path() {
return GetPointer<flatbuffers::String *>(VT_PATH);
}
bool Verify(flatbuffers::Verifier &verifier) const {
return VerifyTableStart(verifier) &&
VerifyField<uint8_t>(verifier, VT_STATE_RESPONSE_TYPE) &&
@@ -1311,6 +1318,8 @@ struct State_Response_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Tab
VerifyState_Response(verifier, state_response(), state_response_type()) &&
VerifyOffset(verifier, VT_HASH) &&
verifier.VerifyVector(hash()) &&
VerifyOffset(verifier, VT_PATH) &&
verifier.VerifyString(path()) &&
verifier.EndTable();
}
};
@@ -1340,6 +1349,9 @@ struct State_Response_MessageBuilder {
void add_hash(flatbuffers::Offset<flatbuffers::Vector<uint8_t>> hash) {
fbb_.AddOffset(State_Response_Message::VT_HASH, hash);
}
void add_path(flatbuffers::Offset<flatbuffers::String> path) {
fbb_.AddOffset(State_Response_Message::VT_PATH, path);
}
explicit State_Response_MessageBuilder(flatbuffers::FlatBufferBuilder &_fbb)
: fbb_(_fbb) {
start_ = fbb_.StartTable();
@@ -1355,8 +1367,10 @@ inline flatbuffers::Offset<State_Response_Message> CreateState_Response_Message(
flatbuffers::FlatBufferBuilder &_fbb,
fbschema::p2pmsg::State_Response state_response_type = fbschema::p2pmsg::State_Response_NONE,
flatbuffers::Offset<void> state_response = 0,
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> hash = 0) {
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> hash = 0,
flatbuffers::Offset<flatbuffers::String> path = 0) {
State_Response_MessageBuilder builder_(_fbb);
builder_.add_path(path);
builder_.add_hash(hash);
builder_.add_state_response(state_response);
builder_.add_state_response_type(state_response_type);
@@ -1367,27 +1381,23 @@ inline flatbuffers::Offset<State_Response_Message> CreateState_Response_MessageD
flatbuffers::FlatBufferBuilder &_fbb,
fbschema::p2pmsg::State_Response state_response_type = fbschema::p2pmsg::State_Response_NONE,
flatbuffers::Offset<void> state_response = 0,
const std::vector<uint8_t> *hash = nullptr) {
const std::vector<uint8_t> *hash = nullptr,
const char *path = nullptr) {
auto hash__ = hash ? _fbb.CreateVector<uint8_t>(*hash) : 0;
auto path__ = path ? _fbb.CreateString(path) : 0;
return fbschema::p2pmsg::CreateState_Response_Message(
_fbb,
state_response_type,
state_response,
hash__);
hash__,
path__);
}
struct Fs_Entry_Response FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
typedef Fs_Entry_ResponseBuilder Builder;
enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE {
VT_PATH = 4,
VT_ENTRIES = 6
VT_ENTRIES = 4
};
const flatbuffers::String *path() const {
return GetPointer<const flatbuffers::String *>(VT_PATH);
}
flatbuffers::String *mutable_path() {
return GetPointer<flatbuffers::String *>(VT_PATH);
}
const flatbuffers::Vector<flatbuffers::Offset<fbschema::p2pmsg::State_FS_Hash_Entry>> *entries() const {
return GetPointer<const flatbuffers::Vector<flatbuffers::Offset<fbschema::p2pmsg::State_FS_Hash_Entry>> *>(VT_ENTRIES);
}
@@ -1396,8 +1406,6 @@ struct Fs_Entry_Response FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
}
bool Verify(flatbuffers::Verifier &verifier) const {
return VerifyTableStart(verifier) &&
VerifyOffset(verifier, VT_PATH) &&
verifier.VerifyString(path()) &&
VerifyOffset(verifier, VT_ENTRIES) &&
verifier.VerifyVector(entries()) &&
verifier.VerifyVectorOfTables(entries()) &&
@@ -1409,9 +1417,6 @@ struct Fs_Entry_ResponseBuilder {
typedef Fs_Entry_Response Table;
flatbuffers::FlatBufferBuilder &fbb_;
flatbuffers::uoffset_t start_;
void add_path(flatbuffers::Offset<flatbuffers::String> path) {
fbb_.AddOffset(Fs_Entry_Response::VT_PATH, path);
}
void add_entries(flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<fbschema::p2pmsg::State_FS_Hash_Entry>>> entries) {
fbb_.AddOffset(Fs_Entry_Response::VT_ENTRIES, entries);
}
@@ -1428,39 +1433,27 @@ struct Fs_Entry_ResponseBuilder {
inline flatbuffers::Offset<Fs_Entry_Response> CreateFs_Entry_Response(
flatbuffers::FlatBufferBuilder &_fbb,
flatbuffers::Offset<flatbuffers::String> path = 0,
flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<fbschema::p2pmsg::State_FS_Hash_Entry>>> entries = 0) {
Fs_Entry_ResponseBuilder builder_(_fbb);
builder_.add_entries(entries);
builder_.add_path(path);
return builder_.Finish();
}
inline flatbuffers::Offset<Fs_Entry_Response> CreateFs_Entry_ResponseDirect(
flatbuffers::FlatBufferBuilder &_fbb,
const char *path = nullptr,
const std::vector<flatbuffers::Offset<fbschema::p2pmsg::State_FS_Hash_Entry>> *entries = nullptr) {
auto path__ = path ? _fbb.CreateString(path) : 0;
auto entries__ = entries ? _fbb.CreateVector<flatbuffers::Offset<fbschema::p2pmsg::State_FS_Hash_Entry>>(*entries) : 0;
return fbschema::p2pmsg::CreateFs_Entry_Response(
_fbb,
path__,
entries__);
}
struct File_HashMap_Response FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
typedef File_HashMap_ResponseBuilder Builder;
enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE {
VT_PATH = 4,
VT_FILE_LENGTH = 6,
VT_HASH_MAP = 8
VT_FILE_LENGTH = 4,
VT_HASH_MAP = 6
};
const flatbuffers::String *path() const {
return GetPointer<const flatbuffers::String *>(VT_PATH);
}
flatbuffers::String *mutable_path() {
return GetPointer<flatbuffers::String *>(VT_PATH);
}
uint64_t file_length() const {
return GetField<uint64_t>(VT_FILE_LENGTH, 0);
}
@@ -1475,8 +1468,6 @@ struct File_HashMap_Response FLATBUFFERS_FINAL_CLASS : private flatbuffers::Tabl
}
bool Verify(flatbuffers::Verifier &verifier) const {
return VerifyTableStart(verifier) &&
VerifyOffset(verifier, VT_PATH) &&
verifier.VerifyString(path()) &&
VerifyField<uint64_t>(verifier, VT_FILE_LENGTH) &&
VerifyOffset(verifier, VT_HASH_MAP) &&
verifier.VerifyVector(hash_map()) &&
@@ -1488,9 +1479,6 @@ struct File_HashMap_ResponseBuilder {
typedef File_HashMap_Response Table;
flatbuffers::FlatBufferBuilder &fbb_;
flatbuffers::uoffset_t start_;
void add_path(flatbuffers::Offset<flatbuffers::String> path) {
fbb_.AddOffset(File_HashMap_Response::VT_PATH, path);
}
void add_file_length(uint64_t file_length) {
fbb_.AddElement<uint64_t>(File_HashMap_Response::VT_FILE_LENGTH, file_length, 0);
}
@@ -1510,26 +1498,21 @@ struct File_HashMap_ResponseBuilder {
inline flatbuffers::Offset<File_HashMap_Response> CreateFile_HashMap_Response(
flatbuffers::FlatBufferBuilder &_fbb,
flatbuffers::Offset<flatbuffers::String> path = 0,
uint64_t file_length = 0,
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> hash_map = 0) {
File_HashMap_ResponseBuilder builder_(_fbb);
builder_.add_file_length(file_length);
builder_.add_hash_map(hash_map);
builder_.add_path(path);
return builder_.Finish();
}
inline flatbuffers::Offset<File_HashMap_Response> CreateFile_HashMap_ResponseDirect(
flatbuffers::FlatBufferBuilder &_fbb,
const char *path = nullptr,
uint64_t file_length = 0,
const std::vector<uint8_t> *hash_map = nullptr) {
auto path__ = path ? _fbb.CreateString(path) : 0;
auto hash_map__ = hash_map ? _fbb.CreateVector<uint8_t>(*hash_map) : 0;
return fbschema::p2pmsg::CreateFile_HashMap_Response(
_fbb,
path__,
file_length,
hash_map__);
}
@@ -1537,16 +1520,9 @@ inline flatbuffers::Offset<File_HashMap_Response> CreateFile_HashMap_ResponseDir
struct Block_Response FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
typedef Block_ResponseBuilder Builder;
enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE {
VT_PATH = 4,
VT_BLOCK_ID = 6,
VT_DATA = 8
VT_BLOCK_ID = 4,
VT_DATA = 6
};
const flatbuffers::String *path() const {
return GetPointer<const flatbuffers::String *>(VT_PATH);
}
flatbuffers::String *mutable_path() {
return GetPointer<flatbuffers::String *>(VT_PATH);
}
uint32_t block_id() const {
return GetField<uint32_t>(VT_BLOCK_ID, 0);
}
@@ -1561,8 +1537,6 @@ struct Block_Response FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
}
bool Verify(flatbuffers::Verifier &verifier) const {
return VerifyTableStart(verifier) &&
VerifyOffset(verifier, VT_PATH) &&
verifier.VerifyString(path()) &&
VerifyField<uint32_t>(verifier, VT_BLOCK_ID) &&
VerifyOffset(verifier, VT_DATA) &&
verifier.VerifyVector(data()) &&
@@ -1574,9 +1548,6 @@ struct Block_ResponseBuilder {
typedef Block_Response Table;
flatbuffers::FlatBufferBuilder &fbb_;
flatbuffers::uoffset_t start_;
void add_path(flatbuffers::Offset<flatbuffers::String> path) {
fbb_.AddOffset(Block_Response::VT_PATH, path);
}
void add_block_id(uint32_t block_id) {
fbb_.AddElement<uint32_t>(Block_Response::VT_BLOCK_ID, block_id, 0);
}
@@ -1596,26 +1567,21 @@ struct Block_ResponseBuilder {
inline flatbuffers::Offset<Block_Response> CreateBlock_Response(
flatbuffers::FlatBufferBuilder &_fbb,
flatbuffers::Offset<flatbuffers::String> path = 0,
uint32_t block_id = 0,
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data = 0) {
Block_ResponseBuilder builder_(_fbb);
builder_.add_data(data);
builder_.add_block_id(block_id);
builder_.add_path(path);
return builder_.Finish();
}
inline flatbuffers::Offset<Block_Response> CreateBlock_ResponseDirect(
flatbuffers::FlatBufferBuilder &_fbb,
const char *path = nullptr,
uint32_t block_id = 0,
const std::vector<uint8_t> *data = nullptr) {
auto path__ = path ? _fbb.CreateString(path) : 0;
auto data__ = data ? _fbb.CreateVector<uint8_t>(*data) : 0;
return fbschema::p2pmsg::CreateBlock_Response(
_fbb,
path__,
block_id,
data__);
}

View File

@@ -18,7 +18,7 @@ namespace fbschema::p2pmsg
constexpr size_t PEERCHALLENGE_LEN = 16;
// Max size of messages which are subjected to time (too old) check.
constexpr size_t MAX_SIZE_FOR_TIME_CHECK = 10 * 1024 * 1024; // 10 MB
constexpr size_t MAX_SIZE_FOR_TIME_CHECK = 1 * 1024 * 1024; // 1 MB
/**
* This section contains Flatbuffer message reading/writing helpers.
@@ -480,13 +480,13 @@ namespace fbschema::p2pmsg
const flatbuffers::Offset<Fs_Entry_Response> resp =
CreateFs_Entry_Response(
builder,
sv_to_flatbuff_str(builder, path),
statefshashentry_to_flatbuff_statefshashentry(builder, hash_nodes));
const flatbuffers::Offset<State_Response_Message> st_resp = CreateState_Response_Message(
builder, State_Response_Fs_Entry_Response,
resp.Union(),
hash_to_flatbuff_bytes(builder, expected_hash));
hash_to_flatbuff_bytes(builder, expected_hash),
sv_to_flatbuff_str(builder, path));
flatbuffers::Offset<Content> message = CreateContent(builder, Message_State_Response_Message, st_resp.Union());
builder.Finish(message); // Finished building message content to get serialised content.
@@ -515,7 +515,6 @@ namespace fbschema::p2pmsg
const flatbuffers::Offset<File_HashMap_Response> resp =
CreateFile_HashMap_Response(
builder,
sv_to_flatbuff_str(builder, path),
file_length,
sv_to_flatbuff_bytes(builder, hashmap_sv));
@@ -523,7 +522,8 @@ namespace fbschema::p2pmsg
builder,
State_Response_File_HashMap_Response,
resp.Union(),
hash_to_flatbuff_bytes(builder, expected_hash));
hash_to_flatbuff_bytes(builder, expected_hash),
sv_to_flatbuff_str(builder, path));
flatbuffers::Offset<Content> message = CreateContent(builder, Message_State_Response_Message, st_resp.Union());
builder.Finish(message); // Finished building message content to get serialised content.
@@ -547,7 +547,6 @@ namespace fbschema::p2pmsg
const flatbuffers::Offset<Block_Response> resp =
CreateBlock_Response(
builder,
sv_to_flatbuff_str(builder, block_resp.path),
block_resp.block_id,
sv_to_flatbuff_bytes(builder, block_resp.data));
@@ -555,7 +554,8 @@ namespace fbschema::p2pmsg
builder,
State_Response_Block_Response,
resp.Union(),
hash_to_flatbuff_bytes(builder, block_resp.hash));
hash_to_flatbuff_bytes(builder, block_resp.hash),
sv_to_flatbuff_str(builder, block_resp.path));
flatbuffers::Offset<Content> message = CreateContent(builder, Message_State_Response_Message, st_resp.Union());
builder.Finish(message); // Finished building message content to get serialised content.
@@ -710,7 +710,7 @@ namespace fbschema::p2pmsg
entry.name = flatbuff_str_to_sv(f_hash->name());
entry.is_file = f_hash->is_file();
entry.hash = flatbuff_bytes_to_hash(f_hash->hash());
fs_entries.emplace(entry.name, std::move(entry));
}
}

View File

@@ -309,7 +309,8 @@ int extract_signed_input_container(
return -1;
}
// Verify the signature of the content.
// We do not verify the signature of the content here since we need to let each node
// (including self) to verify that individually after we broadcast the NUP proposal.
const std::string content(d[FLD_CONTENT].GetString(), d[FLD_CONTENT].GetStringLength());

View File

@@ -13,6 +13,7 @@
#include "cons/cons.hpp"
#include "hpfs/hpfs.hpp"
#include "state/state_sync.hpp"
#include "state/state_serve.hpp"
/**
* Parses CLI args and extracts hot pocket command and parameters given.
@@ -68,6 +69,7 @@ void deinit()
cons::deinit();
sc::deinit();
state_sync::deinit();
state_serve::deinit();
usr::deinit();
p2p::deinit();
hpfs::deinit();
@@ -191,7 +193,7 @@ int main(int argc, char **argv)
<< (conf::cfg.startup_mode == conf::OPERATING_MODE::OBSERVER ? "Observer" : "Proposer");
if (hpfs::init() != 0 || p2p::init() != 0 || usr::init() != 0 ||
state_sync::init() != 0 || cons::init() != 0)
state_serve::init() != 0 || state_sync::init() != 0 || cons::init() != 0)
{
deinit();
return -1;

View File

@@ -14,6 +14,7 @@ namespace p2p
// Holds global connected-peers and related objects.
connected_context ctx;
uint64_t metric_thresholds[4];
bool init_success = false;
/**
@@ -22,6 +23,11 @@ namespace p2p
*/
int init()
{
metric_thresholds[0] = conf::cfg.peermaxcpm;
metric_thresholds[1] = conf::cfg.peermaxdupmpm;
metric_thresholds[2] = conf::cfg.peermaxbadsigpm;
metric_thresholds[3] = conf::cfg.peermaxbadmpm;
//Entry point for p2p which will start peer connections to other nodes
if (start_peer_connections() == -1)
return -1;
@@ -41,7 +47,6 @@ namespace p2p
int start_peer_connections()
{
const uint64_t metric_thresholds[] = {conf::cfg.peermaxcpm, conf::cfg.peermaxdupmpm, conf::cfg.peermaxbadsigpm, conf::cfg.peermaxbadmpm};
if (ctx.listener.start(
conf::cfg.peerport, ".sock-peer", comm::SESSION_TYPE::PEER, true, false, metric_thresholds, conf::cfg.peers, conf::cfg.peermaxsize) == -1)
return -1;

View File

@@ -13,132 +13,136 @@
namespace p2p
{
struct proposal
{
std::string pubkey;
uint64_t timestamp;
uint64_t time;
uint8_t stage;
std::string lcl;
hpfs::h32 state;
std::set<std::string> users;
std::set<std::string> hash_inputs;
std::set<std::string> hash_outputs;
};
struct proposal
{
std::string pubkey;
uint64_t timestamp;
uint64_t time;
uint8_t stage;
std::string lcl;
hpfs::h32 state;
std::set<std::string> users;
std::set<std::string> hash_inputs;
std::set<std::string> hash_outputs;
};
struct nonunl_proposal
{
std::unordered_map<std::string, const std::list<usr::user_submitted_message>> user_messages;
};
struct nonunl_proposal
{
std::unordered_map<std::string, const std::list<usr::user_submitted_message>> user_messages;
};
struct history_request
{
std::string minimum_lcl;
std::string required_lcl;
};
struct history_request
{
std::string minimum_lcl;
std::string required_lcl;
};
struct history_ledger
{
std::string state;
std::string lcl;
std::vector<uint8_t> raw_ledger;
};
struct history_ledger
{
std::string state;
std::string lcl;
std::vector<uint8_t> raw_ledger;
};
struct peer_challenge_response
{
std::string challenge;
std::string signature;
std::string pubkey;
};
struct peer_challenge_response
{
std::string challenge;
std::string signature;
std::string pubkey;
};
enum LEDGER_RESPONSE_ERROR
{
NONE = 0,
INVALID_MIN_LEDGER = 1,
REQ_LEDGER_NOT_FOUND = 2
};
enum LEDGER_RESPONSE_ERROR
{
NONE = 0,
INVALID_MIN_LEDGER = 1,
REQ_LEDGER_NOT_FOUND = 2
};
struct history_response
{
std::map<uint64_t, const history_ledger> hist_ledgers;
LEDGER_RESPONSE_ERROR error;
};
struct history_response
{
std::map<uint64_t, const history_ledger> hist_ledgers;
LEDGER_RESPONSE_ERROR error;
};
struct npl_message
{
std::string data;
};
struct npl_message
{
std::string data;
};
// Represents a state request sent to a peer.
struct state_request
{
std::string parent_path; // The requested file or dir path.
bool is_file; // Whether the path is a file or dir.
int32_t block_id; // Block id of the file if we are requesting for file block. Otherwise -1.
hpfs::h32 expected_hash; // The expected hash of the requested result.
};
// Represents a state request sent to a peer.
struct state_request
{
std::string parent_path; // The requested file or dir path.
bool is_file; // Whether the path is a file or dir.
int32_t block_id; // Block id of the file if we are requesting for file block. Otherwise -1.
hpfs::h32 expected_hash; // The expected hash of the requested result.
};
// Represents state file system entry.
struct state_fs_hash_entry
{
std::string name; // Name of the file/dir.
bool is_file; // Whether this is a file or dir.
hpfs::h32 hash; // Hash of the file or dir.
};
// Represents state file system entry.
struct state_fs_hash_entry
{
std::string name; // Name of the file/dir.
bool is_file; // Whether this is a file or dir.
hpfs::h32 hash; // Hash of the file or dir.
};
// Represents a file block data resposne.
struct block_response
{
std::string path; // Path of the file.
uint32_t block_id; // Id of the block where the data belongs to.
std::string_view data; // The block data.
hpfs::h32 hash; // Hash of the bloc data.
};
// Represents a file block data resposne.
struct block_response
{
std::string path; // Path of the file.
uint32_t block_id; // Id of the block where the data belongs to.
std::string_view data; // The block data.
hpfs::h32 hash; // Hash of the bloc data.
};
struct message_collection
{
std::list<proposal> proposals;
std::mutex proposals_mutex; // Mutex for proposals access race conditions.
struct message_collection
{
std::list<proposal> proposals;
std::mutex proposals_mutex; // Mutex for proposals access race conditions.
std::list<nonunl_proposal> nonunl_proposals;
std::mutex nonunl_proposals_mutex; // Mutex for non-unl proposals access race conditions.
std::list<nonunl_proposal> nonunl_proposals;
std::mutex nonunl_proposals_mutex; // Mutex for non-unl proposals access race conditions.
// NPL messages are stored as string list because we are feeding the npl messages as it is (byte array) to the contract.
std::list<std::string> npl_messages;
std::mutex npl_messages_mutex; // Mutex for npl_messages access race conditions.
// NPL messages are stored as string list because we are feeding the npl messages as it is (byte array) to the contract.
std::list<std::string> npl_messages;
std::mutex npl_messages_mutex; // Mutex for npl_messages access race conditions.
std::list<std::string> state_response;
std::mutex state_response_mutex; // Mutex for state response access race conditions.
};
// List of pairs indicating the session pubkey hex and the state requests.
std::list<std::pair<std::string, std::string>> state_requests;
std::mutex state_requests_mutex; // Mutex for state requests access race conditions.
struct connected_context
{
// Holds all the messages until they are processed by consensus.
message_collection collected_msgs;
std::list<std::string> state_responses;
std::mutex state_responses_mutex; // Mutex for state responses access race conditions.
};
// Set of currently connected peer connections mapped by the uniqueid of socket session.
std::unordered_map<std::string, comm::comm_session *> peer_connections;
struct connected_context
{
// Holds all the messages until they are processed by consensus.
message_collection collected_msgs;
std::mutex peer_connections_mutex; // Mutex for peer connections access race conditions.
// Set of currently connected peer connections mapped by the uniqueid of socket session.
std::unordered_map<std::string, comm::comm_session *> peer_connections;
comm::comm_server listener;
};
std::mutex peer_connections_mutex; // Mutex for peer connections access race conditions.
extern connected_context ctx;
comm::comm_server listener;
};
int init();
extern connected_context ctx;
void deinit();
int init();
int start_peer_connections();
void deinit();
int resolve_peer_challenge(comm::comm_session &session, const peer_challenge_response &challenge_resp);
int start_peer_connections();
void broadcast_message(const flatbuffers::FlatBufferBuilder &fbuf, const bool send_to_self);
int resolve_peer_challenge(comm::comm_session &session, const peer_challenge_response &challenge_resp);
void send_message_to_self(const flatbuffers::FlatBufferBuilder &fbuf);
void broadcast_message(const flatbuffers::FlatBufferBuilder &fbuf, const bool send_to_self);
void send_message_to_random_peer(const flatbuffers::FlatBufferBuilder &fbuf);
void send_message_to_self(const flatbuffers::FlatBufferBuilder &fbuf);
void send_message_to_random_peer(const flatbuffers::FlatBufferBuilder &fbuf);
} // namespace p2p

View File

@@ -13,7 +13,6 @@
#include "peer_session_handler.hpp"
#include "../cons/ledger_handler.hpp"
#include "../state/state_sync.hpp"
#include "../state/state_serve.hpp"
#include "../cons/cons.hpp"
namespace p2pmsg = fbschema::p2pmsg;
@@ -145,15 +144,16 @@ namespace p2p
}
else if (content_message_type == p2pmsg::Message_State_Request_Message)
{
const p2p::state_request sr = p2pmsg::create_state_request_from_msg(*content->message_as_State_Request_Message());
flatbuffers::FlatBufferBuilder fbuf(1024);
if (state_serve::create_state_response(fbuf, sr) == 0)
if (p2pmsg::validate_container_trust(container) != 0)
{
std::string_view msg = std::string_view(
reinterpret_cast<const char *>(fbuf.GetBufferPointer()), fbuf.GetSize());
session.send(msg);
LOG_DBG << "State request message rejected due to trust failure. " << session.uniqueid;
return 0;
}
// Insert request with lock.
std::lock_guard<std::mutex> lock(ctx.collected_msgs.state_requests_mutex);
std::string state_request_msg(reinterpret_cast<const char *>(content_ptr), content_size);
ctx.collected_msgs.state_requests.push_back(std::make_pair(session.uniqueid, std::move(state_request_msg)));
}
else if (content_message_type == p2pmsg::Message_State_Response_Message)
{
@@ -166,13 +166,19 @@ namespace p2p
if (state_sync::ctx.is_syncing) // Only accept state responses if state is syncing.
{
// Insert state_response with lock.
std::lock_guard<std::mutex> lock(ctx.collected_msgs.state_response_mutex);
std::lock_guard<std::mutex> lock(ctx.collected_msgs.state_responses_mutex);
std::string response(reinterpret_cast<const char *>(content_ptr), content_size);
ctx.collected_msgs.state_response.push_back(std::move(response));
ctx.collected_msgs.state_responses.push_back(std::move(response));
}
}
else if (content_message_type == p2pmsg::Message_History_Request_Message) //message is a lcl history request message
{
if (p2pmsg::validate_container_trust(container) != 0)
{
LOG_DBG << "History request message rejected due to trust failure. " << session.uniqueid;
return 0;
}
const p2p::history_request hr = p2pmsg::create_history_request_from_msg(*content->message_as_History_Request_Message());
//first check node has the required lcl available. -> if so send lcl history accordingly.
const bool req_lcl_avail = cons::check_required_lcl_availability(hr);

View File

@@ -0,0 +1,17 @@
#ifndef _HP_CONS_STATE_COMMON_
#define _HP_CONS_STATE_COMMON_
#include "../pchheader.hpp"
#include "../conf.hpp"
namespace state_common
{
constexpr size_t BLOCK_SIZE = 4 * 1024 * 1024; // 4MB;
inline uint16_t get_request_resubmit_timeout()
{
return conf::cfg.roundtime;
}
}
#endif

View File

@@ -9,13 +9,101 @@
#include "../cons/cons.hpp"
#include "../hplog.hpp"
#include "state_serve.hpp"
#include "state_common.hpp"
namespace p2pmsg = fbschema::p2pmsg;
/**
* Helper functions for serving state requests from other peers.
*/
namespace state_serve
{
constexpr size_t BLOCK_SIZE = 4 * 1024 * 1024; // 4MB;
constexpr uint16_t LOOP_WAIT = 100; // Milliseconds
uint16_t REQUEST_BATCH_TIMEOUT;
bool is_shutting_down = false;
bool init_success = false;
std::thread state_serve_thread;
int init()
{
REQUEST_BATCH_TIMEOUT = state_common::get_request_resubmit_timeout() * 0.9;
state_serve_thread = std::thread(state_serve_loop);
init_success = true;
return 0;
}
void deinit()
{
if (init_success)
{
is_shutting_down = true;
state_serve_thread.join();
}
}
void state_serve_loop()
{
util::mask_signal();
LOG_INFO << "State server started.";
std::list<std::pair<std::string, std::string>> state_requests;
while (!is_shutting_down)
{
util::sleep(LOOP_WAIT);
{
std::lock_guard<std::mutex> lock(p2p::ctx.collected_msgs.state_requests_mutex);
// Move collected state requests over to local requests list.
if (!p2p::ctx.collected_msgs.state_requests.empty())
state_requests.splice(state_requests.end(), p2p::ctx.collected_msgs.state_requests);
}
uint64_t time_start = util::get_epoch_milliseconds();
for (auto &[session_id, request] : state_requests)
{
if (is_shutting_down)
break;
const fbschema::p2pmsg::Content *content = fbschema::p2pmsg::GetContent(request.data());
const p2p::state_request sr = p2pmsg::create_state_request_from_msg(*content->message_as_State_Request_Message());
flatbuffers::FlatBufferBuilder fbuf(1024);
uint64_t time_now = util::get_epoch_milliseconds();
// If we have spent too much time handling state requests, abandon the entire batch
// because the requester would have stopped waiting for us.
if ((time_now - time_start) > REQUEST_BATCH_TIMEOUT)
break;
if (state_serve::create_state_response(fbuf, sr) == 0)
{
// Find the peer that we should send the state response to.
std::lock_guard<std::mutex> lock(p2p::ctx.peer_connections_mutex);
const auto peer_itr = p2p::ctx.peer_connections.find(session_id);
if (peer_itr != p2p::ctx.peer_connections.end())
{
std::string_view msg = std::string_view(
reinterpret_cast<const char *>(fbuf.GetBufferPointer()), fbuf.GetSize());
const comm::comm_session *session = peer_itr->second;
session->send(msg);
}
}
}
state_requests.clear();
}
LOG_INFO << "State server stopped.";
}
/**
* Creates the reply message for a given state request.
@@ -34,7 +122,7 @@ namespace state_serve
std::vector<uint8_t> block;
if (get_file_block(block, sr.parent_path, sr.block_id, sr.expected_hash) == -1)
{
LOG_ERR << "Error in getting file block.";
LOG_ERR << "Error in getting file block: " << sr.parent_path;
return -1;
}
@@ -55,7 +143,7 @@ namespace state_serve
std::size_t file_length = 0;
if (get_file_block_hashes(block_hashes, file_length, sr.parent_path, sr.expected_hash) == -1)
{
LOG_ERR << "Error in getting block hashes.";
LOG_ERR << "Error in getting block hashes: " << sr.parent_path;
return -1;
}
@@ -70,7 +158,7 @@ namespace state_serve
std::vector<hpfs::child_hash_node> child_hash_nodes;
if (get_fs_entry_hashes(child_hash_nodes, sr.parent_path, sr.expected_hash) == -1)
{
LOG_ERR << "Error in getting fs entries.";
LOG_ERR << "Error in getting fs entries: " << sr.parent_path;
return -1;
}
@@ -115,7 +203,7 @@ namespace state_serve
// Get actual block data.
{
const std::string file_path = std::string(mount_dir).append(vpath);
const off_t block_offset = block_id * BLOCK_SIZE;
const off_t block_offset = block_id * state_common::BLOCK_SIZE;
fd = open(file_path.c_str(), O_RDONLY);
if (fd == -1)
{
@@ -142,7 +230,7 @@ namespace state_serve
goto failure;
}
const size_t read_len = MIN(BLOCK_SIZE, (st.st_size - block_offset));
const size_t read_len = MIN(state_common::BLOCK_SIZE, (st.st_size - block_offset));
block.resize(read_len);
lseek(fd, block_offset, SEEK_SET);

View File

@@ -8,6 +8,12 @@
namespace state_serve
{
int init();
void deinit();
void state_serve_loop();
int create_state_response(flatbuffers::FlatBufferBuilder &fbuf, const p2p::state_request &sr);
int get_file_block(std::vector<uint8_t> &vec, const std::string_view vpath,

View File

@@ -1,4 +1,3 @@
#include "../state/state_sync.hpp"
#include "../fbschema/p2pmsg_helpers.hpp"
#include "../fbschema/p2pmsg_content_generated.h"
#include "../fbschema/common_helpers.hpp"
@@ -9,6 +8,8 @@
#include "../util.hpp"
#include "../hpfs/hpfs.hpp"
#include "../hpfs/h32.hpp"
#include "state_sync.hpp"
#include "state_common.hpp"
namespace state_sync
{
@@ -16,33 +17,35 @@ namespace state_sync
constexpr uint16_t IDLE_WAIT = 50;
// Max number of requests that can be awaiting response at any given time.
constexpr uint16_t MAX_AWAITING_REQUESTS = 1;
constexpr uint16_t MAX_AWAITING_REQUESTS = 4;
// Request loop sleep time (milliseconds).
constexpr uint16_t REQUEST_LOOP_WAIT = 20;
constexpr size_t BLOCK_SIZE = 4 * 1024 * 1024; // 4MB;
constexpr int FILE_PERMS = 0644;
// No. of milliseconds to wait before resubmitting a request.
uint16_t REQUEST_RESUBMIT_TIMEOUT;
sync_context ctx;
bool init_success = false;
int init()
{
REQUEST_RESUBMIT_TIMEOUT = conf::cfg.roundtime / 2;
REQUEST_RESUBMIT_TIMEOUT = state_common::get_request_resubmit_timeout();
ctx.target_state = hpfs::h32_empty;
ctx.state_sync_thread = std::thread(state_syncer_loop);
init_success = true;
return 0;
}
void deinit()
{
ctx.is_syncing = false;
ctx.is_shutting_down = true;
ctx.state_sync_thread.join();
if (init_success)
{
ctx.is_syncing = false;
ctx.is_shutting_down = true;
ctx.state_sync_thread.join();
}
}
/**
@@ -144,11 +147,11 @@ namespace state_sync
util::sleep(REQUEST_LOOP_WAIT);
{
std::lock_guard<std::mutex> lock(p2p::ctx.collected_msgs.state_response_mutex);
std::lock_guard<std::mutex> lock(p2p::ctx.collected_msgs.state_responses_mutex);
// Move collected state responses over to local candidate responses list.
if (!p2p::ctx.collected_msgs.state_response.empty())
ctx.candidate_state_responses.splice(ctx.candidate_state_responses.end(), p2p::ctx.collected_msgs.state_response);
if (!p2p::ctx.collected_msgs.state_responses.empty())
ctx.candidate_state_responses.splice(ctx.candidate_state_responses.end(), p2p::ctx.collected_msgs.state_responses);
}
for (auto &response : ctx.candidate_state_responses)
@@ -159,12 +162,15 @@ namespace state_sync
const fbschema::p2pmsg::Content *content = fbschema::p2pmsg::GetContent(response.data());
const fbschema::p2pmsg::State_Response_Message *resp_msg = content->message_as_State_Response_Message();
// Check whether we are actually waiting for this response's hash. If not, ignore it.
const hpfs::h32 response_hash = fbschema::flatbuff_bytes_to_hash(resp_msg->hash());
const auto pending_resp_itr = ctx.submitted_requests.find(response_hash);
// Check whether we are actually waiting for this response. If not, ignore it.
std::string_view hash = fbschema::flatbuff_bytes_to_sv(resp_msg->hash());
std::string_view vpath = fbschema::flatbuff_str_to_sv(resp_msg->path());
const std::string key = std::string(vpath).append(hash);
const auto pending_resp_itr = ctx.submitted_requests.find(key);
if (pending_resp_itr == ctx.submitted_requests.end())
{
LOG_DBG << "Skipping state response due to hash mismatch. Received:" << response_hash;
LOG_DBG << "Skipping state response due to hash mismatch.";
continue;
}
@@ -175,11 +181,11 @@ namespace state_sync
const fbschema::p2pmsg::State_Response msg_type = resp_msg->state_response_type();
if (msg_type == fbschema::p2pmsg::State_Response_Fs_Entry_Response)
handle_fs_entry_response(resp_msg->state_response_as_Fs_Entry_Response());
handle_fs_entry_response(vpath, resp_msg->state_response_as_Fs_Entry_Response());
else if (msg_type == fbschema::p2pmsg::State_Response_File_HashMap_Response)
handle_file_hashmap_response(resp_msg->state_response_as_File_HashMap_Response());
handle_file_hashmap_response(vpath, resp_msg->state_response_as_File_HashMap_Response());
else if (msg_type == fbschema::p2pmsg::State_Response_Block_Response)
handle_file_block_response(resp_msg->state_response_as_Block_Response());
handle_file_block_response(vpath, resp_msg->state_response_as_Block_Response());
// After handling each response, check whether we have reached target state.
hpfs::get_hash(updated_state, ctx.hpfs_mount_dir, "/");
@@ -269,7 +275,9 @@ namespace state_sync
<< " path:" << request.path << " block_id:" << request.block_id
<< " hash:" << request.expected_hash;
ctx.submitted_requests.try_emplace(request.expected_hash, request);
const std::string key = std::string(request.path)
.append(reinterpret_cast<const char *>(&request.expected_hash), sizeof(hpfs::h32));
ctx.submitted_requests.try_emplace(key, request);
const bool is_file = request.type != BACKLOG_ITEM_TYPE::DIR;
request_state_from_peer(request.path, is_file, request.block_id, request.expected_hash);
@@ -278,10 +286,9 @@ namespace state_sync
/**
* Process dir children response.
*/
int handle_fs_entry_response(const fbschema::p2pmsg::Fs_Entry_Response *fs_entry_resp)
int handle_fs_entry_response(std::string_view parent_vpath, const fbschema::p2pmsg::Fs_Entry_Response *fs_entry_resp)
{
// Get the parent path of the fs entries we have received.
std::string_view parent_vpath = fbschema::flatbuff_str_to_sv(fs_entry_resp->path());
LOG_DBG << "State sync: Processing fs entries response for " << parent_vpath;
// Get fs entries we have received.
@@ -355,10 +362,9 @@ namespace state_sync
/**
* Process file block hash map response.
*/
int handle_file_hashmap_response(const fbschema::p2pmsg::File_HashMap_Response *file_resp)
int handle_file_hashmap_response(std::string_view file_vpath, const fbschema::p2pmsg::File_HashMap_Response *file_resp)
{
// Get the file path of the block hashes we have received.
std::string file_vpath = std::string(fbschema::flatbuff_str_to_sv(file_resp->path()));
LOG_DBG << "State sync: Processing file block hashes response for " << file_vpath;
// File block hashes on our side (file might not exist on our side).
@@ -378,7 +384,7 @@ namespace state_sync
{
// Insert at front to give priority to block requests while preserving block order.
if (block_id >= existing_hash_count || existing_hashes[block_id] != peer_hashes[block_id])
ctx.pending_requests.insert(insert_itr, backlog_item{BACKLOG_ITEM_TYPE::BLOCK, file_vpath, block_id, peer_hashes[block_id]});
ctx.pending_requests.insert(insert_itr, backlog_item{BACKLOG_ITEM_TYPE::BLOCK, std::string(file_vpath), block_id, peer_hashes[block_id]});
}
if (existing_hashes.size() >= peer_hash_count)
@@ -395,10 +401,9 @@ namespace state_sync
/**
* Process file block response.
*/
int handle_file_block_response(const fbschema::p2pmsg::Block_Response *block_msg)
int handle_file_block_response(std::string_view file_vpath, const fbschema::p2pmsg::Block_Response *block_msg)
{
// Get the file path of the block data we have received.
std::string_view file_vpath = fbschema::flatbuff_str_to_sv(block_msg->path());
const uint32_t block_id = block_msg->block_id();
std::string_view buf = fbschema::flatbuff_bytes_to_sv(block_msg->data());
@@ -414,7 +419,7 @@ namespace state_sync
return -1;
}
const off_t offset = block_id * BLOCK_SIZE;
const off_t offset = block_id * state_common::BLOCK_SIZE;
const int res = pwrite(fd, buf.data(), buf.length(), offset);
close(fd);
if (res < buf.length())

View File

@@ -40,8 +40,8 @@ namespace state_sync
// List of pending sync requests to be sent out.
std::list<backlog_item> pending_requests;
// List of submitted requests we are awaiting responses for, keyed by expected response hash.
std::unordered_map<hpfs::h32, backlog_item, hpfs::h32_std_key_hasher> submitted_requests;
// List of submitted requests we are awaiting responses for, keyed by expected response path+hash.
std::unordered_map<std::string, backlog_item> submitted_requests;
std::thread state_sync_thread;
std::mutex target_state_update_lock;
@@ -72,11 +72,11 @@ namespace state_sync
void submit_request(const backlog_item &request);
int handle_fs_entry_response(const fbschema::p2pmsg::Fs_Entry_Response *fs_entry_resp);
int handle_fs_entry_response(std::string_view parent_vpath, const fbschema::p2pmsg::Fs_Entry_Response *fs_entry_resp);
int handle_file_hashmap_response(const fbschema::p2pmsg::File_HashMap_Response *file_resp);
int handle_file_hashmap_response(std::string_view file_vpath, const fbschema::p2pmsg::File_HashMap_Response *file_resp);
int handle_file_block_response(const fbschema::p2pmsg::Block_Response *block_msg);
int handle_file_block_response(std::string_view file_vpath, const fbschema::p2pmsg::Block_Response *block_msg);
} // namespace state_sync

View File

@@ -18,6 +18,7 @@ namespace usr
// Holds global connected-users and related objects.
connected_context ctx;
uint64_t metric_thresholds[4];
bool init_success = false;
/**
@@ -26,6 +27,11 @@ namespace usr
*/
int init()
{
metric_thresholds[0] = conf::cfg.pubmaxcpm;
metric_thresholds[1] = 0;
metric_thresholds[2] = 0;
metric_thresholds[3] = conf::cfg.pubmaxbadmpm;
// Start listening for incoming user connections.
if (start_listening() == -1)
return -1;
@@ -48,7 +54,6 @@ namespace usr
*/
int start_listening()
{
const uint64_t metric_thresholds[] = {conf::cfg.pubmaxcpm, 0, 0, conf::cfg.pubmaxbadmpm};
if (ctx.listener.start(
conf::cfg.pubport, ".sock-user", comm::SESSION_TYPE::USER, true, true, metric_thresholds, std::set<conf::ip_port_pair>(), conf::cfg.pubmaxsize) == -1)
return -1;