mirror of
https://github.com/EvernodeXRPL/hpcore.git
synced 2026-04-29 15:37:59 +00:00
User inputs round limit. (#240)
This commit is contained in:
@@ -849,8 +849,12 @@ namespace conf
|
||||
jsoncons::ojson appbill;
|
||||
appbill.insert_or_assign("mode", contract.appbill.mode);
|
||||
appbill.insert_or_assign("bin_args", contract.appbill.bin_args);
|
||||
|
||||
jdoc.insert_or_assign("appbill", appbill);
|
||||
|
||||
jsoncons::ojson round_limits;
|
||||
round_limits.insert_or_assign("user_input_bytes", contract.round_limits.user_input_bytes);
|
||||
round_limits.insert_or_assign("user_output_bytes", contract.round_limits.user_output_bytes);
|
||||
jdoc.insert_or_assign("round_limits", round_limits);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -938,6 +942,9 @@ namespace conf
|
||||
return -1;
|
||||
}
|
||||
contract.appbill.bin_args = jdoc["appbill"]["bin_args"].as<std::string>();
|
||||
|
||||
contract.round_limits.user_input_bytes = jdoc["round_limits"]["user_input_bytes"].as<size_t>();
|
||||
contract.round_limits.user_output_bytes = jdoc["round_limits"]["user_output_bytes"].as<size_t>();
|
||||
}
|
||||
catch (const std::exception &e)
|
||||
{
|
||||
|
||||
@@ -84,6 +84,13 @@ namespace conf
|
||||
// Config element which are initialized in memory (This is not directly loaded from the config file)
|
||||
std::vector<std::string> runtime_args; // Appbill execution args used during runtime.
|
||||
};
|
||||
|
||||
struct round_limits_config
|
||||
{
|
||||
size_t user_input_bytes = 0; // Max contract input bytes per user per round.
|
||||
size_t user_output_bytes = 0; // Max contract output bytes per user per round.
|
||||
};
|
||||
|
||||
struct contract_config
|
||||
{
|
||||
std::string id; // Contract guid.
|
||||
@@ -97,6 +104,7 @@ namespace conf
|
||||
bool is_consensus_public = false; // If true, consensus are broadcasted to non-unl nodes as well.
|
||||
bool is_npl_public = false; // If true, npl messages are broadcasted to non-unl nodes as well.
|
||||
appbill_config appbill;
|
||||
round_limits_config round_limits;
|
||||
|
||||
// Config element which are initialized in memory (This is not directly loaded from the config file)
|
||||
std::vector<std::string> runtime_binexec_args; // Contract binary execution args used during runtime.
|
||||
|
||||
@@ -364,8 +364,9 @@ namespace consensus
|
||||
// Construct NUP.
|
||||
for (auto &[sid, user] : usr::ctx.users)
|
||||
{
|
||||
std::list<usr::user_input> user_inputs;
|
||||
std::list<usr::submitted_user_input> user_inputs;
|
||||
user_inputs.splice(user_inputs.end(), user.submitted_inputs);
|
||||
user.collected_input_size = 0; // Reset the collected inputs size counter.
|
||||
|
||||
// We should create an entry for each user pubkey, even if the user has no inputs. This is
|
||||
// because this data map will be used to track connected users as well in addition to inputs.
|
||||
@@ -435,109 +436,81 @@ namespace consensus
|
||||
*/
|
||||
int verify_and_populate_candidate_user_inputs(const uint64_t lcl_seq_no)
|
||||
{
|
||||
// Move over NUPs collected from the network into a local list.
|
||||
std::list<p2p::nonunl_proposal> collected_nups;
|
||||
{
|
||||
std::scoped_lock lock(p2p::ctx.collected_msgs.nonunl_proposals_mutex);
|
||||
collected_nups.splice(collected_nups.end(), p2p::ctx.collected_msgs.nonunl_proposals);
|
||||
}
|
||||
// Maintains users and any input-acceptance responses we should send to them.
|
||||
// Key: user pubkey. Value: List of responses for that user.
|
||||
std::unordered_map<std::string, std::vector<usr::input_status_response>> responses;
|
||||
|
||||
// Prepare merged list of users with each user's inputs grouped under the user.
|
||||
// Maintains merged list of users with each user's inputs grouped under the user.
|
||||
// Key: user pubkey, Value: List of inputs from the user.
|
||||
std::unordered_map<std::string, std::list<usr::user_input>> input_groups;
|
||||
for (p2p::nonunl_proposal &p : collected_nups)
|
||||
std::unordered_map<std::string, std::list<usr::submitted_user_input>> input_groups;
|
||||
|
||||
// Move over NUPs collected from the network input groups (grouped by user).
|
||||
{
|
||||
for (auto &[pubkey, umsgs] : p.user_inputs)
|
||||
std::list<p2p::nonunl_proposal> collected_nups;
|
||||
{
|
||||
// Move any user inputs from each NUP over to the grouped inputs under the user pubkey.
|
||||
std::list<usr::user_input> &input_list = input_groups[pubkey];
|
||||
input_list.splice(input_list.end(), umsgs);
|
||||
std::scoped_lock lock(p2p::ctx.collected_msgs.nonunl_proposals_mutex);
|
||||
collected_nups.splice(collected_nups.end(), p2p::ctx.collected_msgs.nonunl_proposals);
|
||||
}
|
||||
|
||||
for (p2p::nonunl_proposal &p : collected_nups)
|
||||
{
|
||||
for (auto &[pubkey, sbmitted_inputs] : p.user_inputs)
|
||||
{
|
||||
// Move any user inputs from each NUP over to the grouped inputs under the user pubkey.
|
||||
std::list<usr::submitted_user_input> &input_list = input_groups[pubkey];
|
||||
input_list.splice(input_list.end(), sbmitted_inputs);
|
||||
}
|
||||
}
|
||||
}
|
||||
collected_nups.clear();
|
||||
|
||||
// Maintains users and any input-acceptance responses we should send to them.
|
||||
// Key: user pubkey. Value: List of [user-protocol, msg-sig, reject-reason] tuples.
|
||||
std::unordered_map<std::string, std::list<std::tuple<const util::PROTOCOL, const std::string, const char *>>> responses;
|
||||
|
||||
for (const auto &[pubkey, umsgs] : input_groups)
|
||||
for (auto &[pubkey, submitted_inputs] : input_groups)
|
||||
{
|
||||
// Populate user list with this user's pubkey.
|
||||
ctx.candidate_users.emplace(pubkey);
|
||||
|
||||
std::list<usr::extracted_user_input> extracted_inputs;
|
||||
|
||||
for (const usr::submitted_user_input &submitted_input : submitted_inputs)
|
||||
{
|
||||
usr::extracted_user_input extracted = {};
|
||||
const char *reject_reason = usr::extract_submitted_input(pubkey, submitted_input, extracted);
|
||||
|
||||
if (reject_reason == NULL)
|
||||
extracted_inputs.push_back(std::move(extracted));
|
||||
else
|
||||
responses[pubkey].push_back(usr::input_status_response{submitted_input.protocol, submitted_input.sig, reject_reason});
|
||||
}
|
||||
|
||||
// This will sort the inputs in nonce order so the validation will follow the same order on all nodes.
|
||||
extracted_inputs.sort();
|
||||
|
||||
// Keep track of total input length to verify against remaining balance.
|
||||
// We only process inputs in the submitted order that can be satisfied with the remaining account balance.
|
||||
size_t total_input_len = 0;
|
||||
bool appbill_balance_exceeded = false;
|
||||
size_t total_input_size = 0;
|
||||
|
||||
for (const usr::user_input &umsg : umsgs)
|
||||
for (const usr::extracted_user_input &extracted_input : extracted_inputs)
|
||||
{
|
||||
const char *reject_reason = NULL;
|
||||
util::buffer_view stored_input; // Contains pointer to the input data stored in memfd accessed by the contract.
|
||||
std::string hash;
|
||||
|
||||
if (appbill_balance_exceeded)
|
||||
{
|
||||
reject_reason = msg::usrmsg::REASON_APPBILL_BALANCE_EXCEEDED;
|
||||
}
|
||||
else
|
||||
{
|
||||
util::buffer_view input;
|
||||
std::string hash;
|
||||
uint64_t max_lcl_seqno;
|
||||
reject_reason = usr::validate_user_input_submission(pubkey, umsg, lcl_seq_no, total_input_len, hash, input, max_lcl_seqno);
|
||||
// Validate the input against all submission criteria.
|
||||
const char *reject_reason = usr::validate_user_input_submission(pubkey, extracted_input, lcl_seq_no, total_input_size, hash, stored_input);
|
||||
|
||||
if (reject_reason == NULL && !input.is_null())
|
||||
{
|
||||
// No reject reason means we should go ahead and subject the input to consensus.
|
||||
ctx.candidate_user_inputs.try_emplace(
|
||||
hash,
|
||||
candidate_user_input(pubkey, input, max_lcl_seqno));
|
||||
}
|
||||
else if (reject_reason == msg::usrmsg::REASON_APPBILL_BALANCE_EXCEEDED)
|
||||
{
|
||||
// Abandon processing further inputs from this user when we find out
|
||||
// an input cannot be processed with the account balance.
|
||||
appbill_balance_exceeded = true;
|
||||
}
|
||||
if (reject_reason == NULL && !stored_input.is_null())
|
||||
{
|
||||
// No reject reason means we should go ahead and subject the input to consensus.
|
||||
ctx.candidate_user_inputs.try_emplace(
|
||||
hash,
|
||||
candidate_user_input(pubkey, stored_input, extracted_input.max_lcl_seqno));
|
||||
}
|
||||
|
||||
responses[pubkey].push_back(std::tuple<const util::PROTOCOL, const std::string, const char *>(umsg.protocol, umsg.sig, reject_reason));
|
||||
responses[pubkey].push_back(usr::input_status_response{extracted_input.protocol, extracted_input.sig, reject_reason});
|
||||
}
|
||||
}
|
||||
|
||||
input_groups.clear();
|
||||
|
||||
{
|
||||
// Lock the user sessions.
|
||||
std::scoped_lock lock(usr::ctx.users_mutex);
|
||||
|
||||
for (auto &[pubkey, user_responses] : responses)
|
||||
{
|
||||
// Locate this user's socket session.
|
||||
const auto user_itr = usr::ctx.users.find(pubkey);
|
||||
if (user_itr != usr::ctx.users.end())
|
||||
{
|
||||
// Send the request status result if this user is connected to us.
|
||||
for (auto &resp : user_responses)
|
||||
{
|
||||
// resp: 0=protocl, 1=msg sig, 2=reject reason.
|
||||
const char *reject_reason = std::get<2>(resp);
|
||||
|
||||
// We are not sending any status response for 'already submitted' inputs. This is because the user
|
||||
// would have gotten the proper status response during first submission.
|
||||
if (reject_reason != msg::usrmsg::REASON_ALREADY_SUBMITTED)
|
||||
{
|
||||
msg::usrmsg::usrmsg_parser parser(std::get<0>(resp));
|
||||
const std::string &msg_sig = std::get<1>(resp);
|
||||
usr::send_input_status(parser,
|
||||
user_itr->second.session,
|
||||
reject_reason == NULL ? msg::usrmsg::STATUS_ACCEPTED : msg::usrmsg::STATUS_REJECTED,
|
||||
reject_reason == NULL ? "" : reject_reason,
|
||||
msg_sig);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
usr::send_input_status_responses(responses);
|
||||
|
||||
return 0;
|
||||
}
|
||||
@@ -824,10 +797,7 @@ namespace consensus
|
||||
// Taking the raw input string from the buffer_view.
|
||||
std::string input;
|
||||
if (usr::input_store.read_buf(cand_input.input, input) != -1)
|
||||
{
|
||||
usr::raw_user_input raw_input(cand_input.userpubkey, std::move(input));
|
||||
raw_inputs.emplace(hash, std::move(raw_input));
|
||||
}
|
||||
raw_inputs.emplace(hash, usr::raw_user_input{cand_input.userpubkey, std::move(input)});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -95,6 +95,7 @@ namespace consensus
|
||||
std::map<util::h32, uint32_t> state_hash;
|
||||
std::map<util::h32, uint32_t> patch_hash;
|
||||
};
|
||||
|
||||
extern std::atomic<bool> is_patch_update_pending; // Keep track whether the patch file is changed by the SC and is not yet applied to runtime.
|
||||
|
||||
int init();
|
||||
|
||||
@@ -81,8 +81,10 @@ namespace msg::fbuf::ledger
|
||||
map.reserve(fbvec->size());
|
||||
for (auto el : *fbvec)
|
||||
{
|
||||
usr::raw_user_input raw_user_input(flatbuff_bytes_to_sv(el->pubkey()), flatbuff_bytes_to_sv(el->input()));
|
||||
map.emplace(flatbuff_bytes_to_sv(el->hash()), raw_user_input);
|
||||
map.emplace(flatbuff_bytes_to_sv(el->hash()),
|
||||
usr::raw_user_input{
|
||||
std::string(flatbuff_bytes_to_sv(el->pubkey())),
|
||||
std::string(flatbuff_bytes_to_sv(el->input()))});
|
||||
}
|
||||
return map;
|
||||
}
|
||||
|
||||
@@ -730,21 +730,21 @@ namespace msg::fbuf::p2pmsg
|
||||
|
||||
//---Conversion helpers from flatbuffers data types to std data types---//
|
||||
|
||||
const std::unordered_map<std::string, std::list<usr::user_input>>
|
||||
const std::unordered_map<std::string, std::list<usr::submitted_user_input>>
|
||||
flatbuf_user_input_group_to_user_input_map(const flatbuffers::Vector<flatbuffers::Offset<UserInputGroup>> *fbvec)
|
||||
{
|
||||
std::unordered_map<std::string, std::list<usr::user_input>> map;
|
||||
std::unordered_map<std::string, std::list<usr::submitted_user_input>> map;
|
||||
map.reserve(fbvec->size());
|
||||
for (const UserInputGroup *group : *fbvec)
|
||||
{
|
||||
std::list<usr::user_input> user_inputs_list;
|
||||
std::list<usr::submitted_user_input> user_inputs_list;
|
||||
|
||||
for (const auto msg : *group->messages())
|
||||
{
|
||||
user_inputs_list.push_back(usr::user_input(
|
||||
flatbuff_bytes_to_sv(msg->input_container()),
|
||||
flatbuff_bytes_to_sv(msg->signature()),
|
||||
static_cast<util::PROTOCOL>(msg->protocol())));
|
||||
user_inputs_list.push_back(usr::submitted_user_input{
|
||||
std::string(flatbuff_bytes_to_sv(msg->input_container())),
|
||||
std::string(flatbuff_bytes_to_sv(msg->signature())),
|
||||
static_cast<util::PROTOCOL>(msg->protocol())});
|
||||
}
|
||||
|
||||
map.emplace(flatbuff_bytes_to_sv(group->pubkey()), std::move(user_inputs_list));
|
||||
@@ -756,14 +756,14 @@ namespace msg::fbuf::p2pmsg
|
||||
//---These are used in constructing Flatbuffer messages using builders---//
|
||||
|
||||
const flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<UserInputGroup>>>
|
||||
user_input_map_to_flatbuf_user_input_group(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map<std::string, std::list<usr::user_input>> &map)
|
||||
user_input_map_to_flatbuf_user_input_group(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map<std::string, std::list<usr::submitted_user_input>> &map)
|
||||
{
|
||||
std::vector<flatbuffers::Offset<UserInputGroup>> fbvec;
|
||||
fbvec.reserve(map.size());
|
||||
for (const auto &[pubkey, msglist] : map)
|
||||
{
|
||||
std::vector<flatbuffers::Offset<UserInput>> fbmsgsvec;
|
||||
for (const usr::user_input &msg : msglist)
|
||||
for (const usr::submitted_user_input &msg : msglist)
|
||||
{
|
||||
fbmsgsvec.push_back(CreateUserInput(
|
||||
builder,
|
||||
@@ -787,7 +787,7 @@ namespace msg::fbuf::p2pmsg
|
||||
|
||||
for (const HistoryLedgerBlockPair *pair : *fbvec)
|
||||
{
|
||||
std::list<usr::user_input> msglist;
|
||||
std::list<usr::submitted_user_input> msglist;
|
||||
|
||||
p2p::history_ledger_block ledger;
|
||||
|
||||
|
||||
@@ -80,13 +80,13 @@ namespace msg::fbuf::p2pmsg
|
||||
|
||||
//---Conversion helpers from flatbuffers data types to std data types---//
|
||||
|
||||
const std::unordered_map<std::string, std::list<usr::user_input>>
|
||||
const std::unordered_map<std::string, std::list<usr::submitted_user_input>>
|
||||
flatbuf_user_input_group_to_user_input_map(const flatbuffers::Vector<flatbuffers::Offset<UserInputGroup>> *fbvec);
|
||||
|
||||
//---Conversion helpers from std data types to flatbuffers data types---//
|
||||
|
||||
const flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<UserInputGroup>>>
|
||||
user_input_map_to_flatbuf_user_input_group(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map<std::string, std::list<usr::user_input>> &map);
|
||||
user_input_map_to_flatbuf_user_input_group(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map<std::string, std::list<usr::submitted_user_input>> &map);
|
||||
|
||||
const std::map<uint64_t, const p2p::history_ledger_block>
|
||||
flatbuf_historyledgermap_to_historyledgermap(const flatbuffers::Vector<flatbuffers::Offset<HistoryLedgerBlockPair>> *fbvec);
|
||||
|
||||
@@ -58,6 +58,8 @@ namespace msg::usrmsg
|
||||
constexpr const char *REASON_MAX_LEDGER_EXPIRED = "max_ledger_expired";
|
||||
constexpr const char *REASON_NONCE_EXPIRED = "nonce_expired";
|
||||
constexpr const char *REASON_ALREADY_SUBMITTED = "already_submitted";
|
||||
constexpr const char *REASON_NONCE_OVERFLOW = "nonce_overflow";
|
||||
constexpr const char *REASON_ROUND_INPUTS_OVERFLOW = "round_inputs_overflow";
|
||||
|
||||
} // namespace msg::usrmsg
|
||||
|
||||
|
||||
@@ -38,7 +38,7 @@ namespace p2p
|
||||
|
||||
struct nonunl_proposal
|
||||
{
|
||||
std::unordered_map<std::string, std::list<usr::user_input>> user_inputs;
|
||||
std::unordered_map<std::string, std::list<usr::submitted_user_input>> user_inputs;
|
||||
};
|
||||
|
||||
struct history_request
|
||||
|
||||
@@ -8,22 +8,27 @@ namespace usr
|
||||
{
|
||||
|
||||
/**
|
||||
* Represents a signed contract input message a network user has submitted.
|
||||
*/
|
||||
struct user_input
|
||||
* Represents a signed contract input message a network user has submitted.
|
||||
*/
|
||||
struct submitted_user_input
|
||||
{
|
||||
const std::string input_container;
|
||||
const std::string sig;
|
||||
const util::PROTOCOL protocol; // The encoding protocol used for the input container.
|
||||
const util::PROTOCOL protocol; // The message protocol used by the user.
|
||||
};
|
||||
|
||||
user_input(const std::string input_container, const std::string sig, const util::PROTOCOL protocol)
|
||||
: input_container(std::move(input_container)), sig(std::move(sig)), protocol(protocol)
|
||||
{
|
||||
}
|
||||
struct extracted_user_input
|
||||
{
|
||||
std::string input;
|
||||
std::string nonce;
|
||||
uint64_t max_lcl_seqno;
|
||||
std::string sig;
|
||||
util::PROTOCOL protocol; // The message protocol used by the user.
|
||||
|
||||
user_input(std::string_view input_container, std::string_view sig, const util::PROTOCOL protocol)
|
||||
: input_container(input_container), sig(sig), protocol(protocol)
|
||||
// Comparison operator used for sorting user's inputs in nonce order.
|
||||
bool operator<(const extracted_user_input &other)
|
||||
{
|
||||
return nonce < other.nonce;
|
||||
}
|
||||
};
|
||||
|
||||
@@ -31,11 +36,6 @@ namespace usr
|
||||
{
|
||||
const std::string pubkey;
|
||||
const std::string input;
|
||||
|
||||
raw_user_input(std::string_view pubkey, std::string_view input)
|
||||
: pubkey(pubkey), input(input)
|
||||
{
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace usr
|
||||
|
||||
@@ -66,7 +66,7 @@ namespace usr
|
||||
{
|
||||
// This is an authed user.
|
||||
connected_user &user = itr->second;
|
||||
if (handle_user_message(user, message) != 0)
|
||||
if (handle_authed_user_message(user, message) != 0)
|
||||
{
|
||||
session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1);
|
||||
LOG_DEBUG << "Bad message from user " << session.display_name();
|
||||
|
||||
148
src/usr/usr.cpp
148
src/usr/usr.cpp
@@ -28,6 +28,8 @@ namespace usr
|
||||
uint64_t metric_thresholds[5];
|
||||
bool init_success = false;
|
||||
|
||||
constexpr size_t MAX_INPUT_NONCE_SIZE = 128;
|
||||
|
||||
/**
|
||||
* Initializes the usr subsystem. Must be called once during application startup.
|
||||
* @return 0 for successful initialization. -1 for failure.
|
||||
@@ -128,12 +130,12 @@ namespace usr
|
||||
}
|
||||
|
||||
/**
|
||||
* Processes a message sent by a connected user. This will be invoked by web socket on_message handler.
|
||||
* Processes a message sent by a authenticated user. This will be invoked by web socket on_message handler.
|
||||
* @param user The authenticated user who sent the message.
|
||||
* @param message The message sent by user.
|
||||
* @return 0 on successful processing. -1 for failure.
|
||||
*/
|
||||
int handle_user_message(connected_user &user, std::string_view message)
|
||||
int handle_authed_user_message(connected_user &user, std::string_view message)
|
||||
{
|
||||
msg::usrmsg::usrmsg_parser parser(user.protocol);
|
||||
|
||||
@@ -175,14 +177,33 @@ namespace usr
|
||||
uint64_t max_lcl_seqno;
|
||||
if (parser.extract_input_container(input_data, nonce, max_lcl_seqno, input_container) != -1)
|
||||
{
|
||||
// Check for max nonce size.
|
||||
if (nonce.size() > MAX_INPUT_NONCE_SIZE)
|
||||
{
|
||||
send_input_status(parser, user.session, msg::usrmsg::STATUS_REJECTED, msg::usrmsg::REASON_NONCE_OVERFLOW, sig);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Check whether the newly received input is going to cause overflow of round input limit.
|
||||
if (conf::cfg.contract.round_limits.user_input_bytes > 0 &&
|
||||
(user.collected_input_size + input_data.size()) > conf::cfg.contract.round_limits.user_input_bytes)
|
||||
{
|
||||
send_input_status(parser, user.session, msg::usrmsg::STATUS_REJECTED, msg::usrmsg::REASON_ROUND_INPUTS_OVERFLOW, sig);
|
||||
return -1;
|
||||
}
|
||||
|
||||
const int nonce_status = nonce_map.check(user.pubkey, nonce, sig, max_lcl_seqno, true);
|
||||
if (nonce_status == 0)
|
||||
{
|
||||
//Add to the submitted input list.
|
||||
user.submitted_inputs.push_back(user_input(
|
||||
user.submitted_inputs.push_back(submitted_user_input{
|
||||
std::move(input_container),
|
||||
std::move(sig),
|
||||
user.protocol));
|
||||
user.protocol});
|
||||
|
||||
// Increment the collected input size counter. This will be reset whenever collected inputs are moved
|
||||
// to concensus candidate input set.
|
||||
user.collected_input_size += input_data.size();
|
||||
return 0;
|
||||
}
|
||||
else
|
||||
@@ -226,6 +247,39 @@ namespace usr
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends multiple user input responses grouped by user.
|
||||
*/
|
||||
void send_input_status_responses(const std::unordered_map<std::string, std::vector<input_status_response>> &responses)
|
||||
{
|
||||
// Lock the user sessions.
|
||||
std::scoped_lock lock(usr::ctx.users_mutex);
|
||||
|
||||
for (auto &[pubkey, user_responses] : responses)
|
||||
{
|
||||
// Locate this user's socket session.
|
||||
const auto user_itr = usr::ctx.users.find(pubkey);
|
||||
if (user_itr != usr::ctx.users.end())
|
||||
{
|
||||
// Send the request status result if this user is connected to us.
|
||||
for (const input_status_response &resp : user_responses)
|
||||
{
|
||||
// We are not sending any status response for 'already submitted' inputs. This is because the user
|
||||
// would have gotten the proper status response during first submission.
|
||||
if (resp.reject_reason != msg::usrmsg::REASON_ALREADY_SUBMITTED)
|
||||
{
|
||||
msg::usrmsg::usrmsg_parser parser(resp.protocol);
|
||||
send_input_status(parser,
|
||||
user_itr->second.session,
|
||||
resp.reject_reason == NULL ? msg::usrmsg::STATUS_ACCEPTED : msg::usrmsg::STATUS_REJECTED,
|
||||
resp.reject_reason == NULL ? "" : resp.reject_reason,
|
||||
resp.sig);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send the specified contract input status result via the provided session.
|
||||
*/
|
||||
@@ -300,61 +354,77 @@ namespace usr
|
||||
return 0;
|
||||
}
|
||||
|
||||
const char *extract_submitted_input(const std::string &user_pubkey, const usr::submitted_user_input &submitted, usr::extracted_user_input &extracted)
|
||||
{
|
||||
// Verify the signature of the submitted input_container.
|
||||
if (crypto::verify(submitted.input_container, submitted.sig, user_pubkey) == -1)
|
||||
{
|
||||
LOG_DEBUG << "User input bad signature.";
|
||||
return msg::usrmsg::REASON_BAD_SIG;
|
||||
}
|
||||
|
||||
// Extract information from input container.
|
||||
msg::usrmsg::usrmsg_parser parser(submitted.protocol);
|
||||
if (parser.extract_input_container(extracted.input, extracted.nonce, extracted.max_lcl_seqno, submitted.input_container) == -1)
|
||||
{
|
||||
LOG_DEBUG << "User input bad input container format.";
|
||||
return msg::usrmsg::REASON_BAD_MSG_FORMAT;
|
||||
}
|
||||
|
||||
extracted.sig = std::move(submitted.sig);
|
||||
extracted.protocol = submitted.protocol;
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the provided user input message against all the required criteria.
|
||||
* @return The rejection reason if input rejected. NULL if the input can be accepted.
|
||||
*/
|
||||
const char *validate_user_input_submission(const std::string &user_pubkey, const usr::user_input &umsg,
|
||||
const uint64_t lcl_seq_no, size_t &total_input_len,
|
||||
std::string &hash, util::buffer_view &input, uint64_t &max_lcl_seqno)
|
||||
const char *validate_user_input_submission(const std::string &user_pubkey, const usr::extracted_user_input &extracted_input,
|
||||
const uint64_t lcl_seq_no, size_t &total_input_size, std::string &hash, util::buffer_view &input)
|
||||
{
|
||||
// Verify the signature of the input_container.
|
||||
if (crypto::verify(umsg.input_container, umsg.sig, user_pubkey) == -1)
|
||||
{
|
||||
LOG_DEBUG << "User message bad signature.";
|
||||
return msg::usrmsg::REASON_BAD_SIG;
|
||||
}
|
||||
|
||||
std::string nonce;
|
||||
msg::usrmsg::usrmsg_parser parser(umsg.protocol);
|
||||
|
||||
std::string input_data;
|
||||
if (parser.extract_input_container(input_data, nonce, max_lcl_seqno, umsg.input_container) == -1)
|
||||
{
|
||||
LOG_DEBUG << "User message bad input format.";
|
||||
return msg::usrmsg::REASON_BAD_MSG_FORMAT;
|
||||
}
|
||||
|
||||
// Ignore the input if our ledger has passed the input TTL.
|
||||
if (max_lcl_seqno <= lcl_seq_no)
|
||||
if (extracted_input.max_lcl_seqno <= lcl_seq_no)
|
||||
{
|
||||
LOG_DEBUG << "User message bad max ledger seq expired.";
|
||||
LOG_DEBUG << "User input bad max ledger seq expired.";
|
||||
return msg::usrmsg::REASON_MAX_LEDGER_EXPIRED;
|
||||
}
|
||||
|
||||
const int nonce_status = nonce_map.check(user_pubkey, nonce, umsg.sig, max_lcl_seqno);
|
||||
// Check subtotal of inputs extracted so far with the input size limit.
|
||||
const size_t new_total_input_size = total_input_size + extracted_input.input.size();
|
||||
if (conf::cfg.contract.round_limits.user_input_bytes > 0 &&
|
||||
new_total_input_size > conf::cfg.contract.round_limits.user_input_bytes)
|
||||
{
|
||||
LOG_DEBUG << "User input input exceeds round limit.";
|
||||
return msg::usrmsg::REASON_ROUND_INPUTS_OVERFLOW;
|
||||
}
|
||||
|
||||
const int nonce_status = nonce_map.check(user_pubkey, extracted_input.nonce, extracted_input.sig, extracted_input.max_lcl_seqno);
|
||||
if (nonce_status > 0)
|
||||
{
|
||||
LOG_DEBUG << (nonce_status == 1 ? "User message nonce expired." : "User message with same nonce/sig already submitted.");
|
||||
LOG_DEBUG << (nonce_status == 1 ? "User input nonce expired." : "User input with same nonce/sig already submitted.");
|
||||
return (nonce_status == 1 ? msg::usrmsg::REASON_NONCE_EXPIRED : msg::usrmsg::REASON_ALREADY_SUBMITTED);
|
||||
}
|
||||
|
||||
// Keep checking the subtotal of inputs extracted so far with the appbill account balance.
|
||||
total_input_len += input_data.length();
|
||||
if (!verify_appbill_check(user_pubkey, total_input_len))
|
||||
if (!verify_appbill_check(user_pubkey, new_total_input_size))
|
||||
{
|
||||
LOG_DEBUG << "User message app bill balance exceeded.";
|
||||
LOG_DEBUG << "User input app bill balance exceeded.";
|
||||
return msg::usrmsg::REASON_APPBILL_BALANCE_EXCEEDED;
|
||||
}
|
||||
|
||||
// Hash is prefixed with the nonce to support user-defined sort order.
|
||||
hash = std::move(nonce);
|
||||
// Append the hash of the message signature to get the final hash.
|
||||
hash.append(crypto::get_hash(umsg.sig));
|
||||
// Reaching here means the input is successfully validated and we can submit it to consensus.
|
||||
|
||||
// Copy the input data into the input store.
|
||||
std::string_view s();
|
||||
input = input_store.write_buf(input_data.data(), input_data.size());
|
||||
// Hash is used as the globally unqiue 'key' to represent this input for this consensus round.
|
||||
// It is prefixed with the nonce to support user-defined sort order and signature hash is appended
|
||||
// to make it unique among inputs from all users.
|
||||
hash = extracted_input.nonce + crypto::get_hash(extracted_input.sig);
|
||||
|
||||
// Copy the input data into the input store. Contract will read the input from this location.
|
||||
input = input_store.write_buf(extracted_input.input.data(), extracted_input.input.size());
|
||||
|
||||
// Increment the total valid input size so far.
|
||||
total_input_size = new_total_input_size;
|
||||
|
||||
return NULL; // Success. No reject reason.
|
||||
}
|
||||
|
||||
@@ -28,7 +28,10 @@ namespace usr
|
||||
const std::string pubkey;
|
||||
|
||||
// Holds the unprocessed user inputs collected from websocket.
|
||||
std::list<user_input> submitted_inputs;
|
||||
std::list<submitted_user_input> submitted_inputs;
|
||||
|
||||
// Total input bytes collected which are pending to be subjected to consensus.
|
||||
size_t collected_input_size = 0;
|
||||
|
||||
// Holds the websocket session of this user.
|
||||
// We don't need to own the session object since the lifetime of user and session are coupled.
|
||||
@@ -59,6 +62,14 @@ namespace usr
|
||||
|
||||
std::optional<usr::user_comm_server> server;
|
||||
};
|
||||
|
||||
struct input_status_response
|
||||
{
|
||||
const util::PROTOCOL protocol;
|
||||
const std::string sig;
|
||||
const char *reject_reason;
|
||||
};
|
||||
|
||||
extern connected_context ctx;
|
||||
extern util::buffer_store input_store;
|
||||
|
||||
@@ -70,7 +81,9 @@ namespace usr
|
||||
|
||||
int verify_challenge(std::string_view message, usr::user_comm_session &session);
|
||||
|
||||
int handle_user_message(connected_user &user, std::string_view message);
|
||||
int handle_authed_user_message(connected_user &user, std::string_view message);
|
||||
|
||||
void send_input_status_responses(const std::unordered_map<std::string, std::vector<input_status_response>> &responses);
|
||||
|
||||
void send_input_status(const msg::usrmsg::usrmsg_parser &parser, usr::user_comm_session &session,
|
||||
std::string_view status, std::string_view reason, std::string_view input_sig);
|
||||
@@ -79,9 +92,10 @@ namespace usr
|
||||
|
||||
int remove_user(const std::string &pubkey);
|
||||
|
||||
const char *validate_user_input_submission(const std::string &user_pubkey, const usr::user_input &umsg,
|
||||
const uint64_t lcl_seq_no, size_t &total_input_len,
|
||||
std::string &hash, util::buffer_view &input, uint64_t &max_lcl_seqno);
|
||||
const char *extract_submitted_input(const std::string &user_pubkey, const usr::submitted_user_input &submitted, usr::extracted_user_input &extracted);
|
||||
|
||||
const char *validate_user_input_submission(const std::string &user_pubkey, const usr::extracted_user_input &extracted_input,
|
||||
const uint64_t lcl_seq_no, size_t &total_input_size, std::string &hash, util::buffer_view &input);
|
||||
|
||||
bool verify_appbill_check(std::string_view pubkey, const size_t input_len);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user