User request status messages (#76)

This commit is contained in:
Ravin Perera
2019-12-20 21:14:53 +05:30
committed by GitHub
parent d82ab8f626
commit e43cbc9326
5 changed files with 185 additions and 24 deletions

View File

@@ -324,14 +324,20 @@ void verify_and_populate_candidate_user_inputs()
{
for (const auto &[pubkey, umsgs] : p.user_messages)
{
// Locate this user's socket session in case we need to send any status messages regarding user inputs.
sock::socket_session<usr::user_outbound_message> *session = usr::get_session_by_pubkey(pubkey);
// Populate user list with this user's pubkey.
ctx.candidate_users.emplace(pubkey);
// Collect valid inputs for this user.
std::unordered_map<std::string, candidate_user_input> valid_inputs;
// Keep track of total input length to verify against remaining balance.
// We only process inputs in the submitted order that can be satisfied with the remaining account balance.
size_t total_input_len = 0;
bool appbill_balance_exceeded = false;
for (const usr::user_submitted_message &umsg : umsgs)
{
const char *reject_reason = NULL;
const std::string sig_hash = crypto::get_hash(umsg.sig);
// Check for duplicate messages using hash of the signature.
@@ -348,32 +354,58 @@ void verify_and_populate_candidate_user_inputs()
// Ignore the input if our ledger has passed the input TTL.
if (maxledgerseqno > ctx.led_seq_no)
{
// Hash is prefixed with the nonce to support user-defined sort order.
std::string hash = std::move(nonce);
// Append the hash of the message signature to get the final hash.
hash.append(sig_hash);
if (!appbill_balance_exceeded)
{
// Hash is prefixed with the nonce to support user-defined sort order.
std::string hash = std::move(nonce);
// Append the hash of the message signature to get the final hash.
hash.append(sig_hash);
valid_inputs.try_emplace(
hash,
candidate_user_input(pubkey, std::move(input), maxledgerseqno));
// Keep checking the subtotal of inputs extracted so far with the appbill account balance.
total_input_len += input.length();
if (verify_appbill_check(pubkey, total_input_len))
{
ctx.candidate_user_inputs.try_emplace(
hash,
candidate_user_input(pubkey, std::move(input), maxledgerseqno));
}
else
{
// Abandon processing further inputs from this user when we find out
// an input cannot be processed with the account balance.
appbill_balance_exceeded = true;
reject_reason = jusrmsg::REASON_APPBILL_BALANCE_EXCEEDED;
}
}
else
{
reject_reason = jusrmsg::REASON_APPBILL_BALANCE_EXCEEDED;
}
}
else
{
LOG_DBG << "User message bad max ledger seq expired.";
reject_reason = jusrmsg::REASON_MAX_LEDGER_EXPIRED;
}
}
else
{
LOG_DBG << "User message bad signature.";
reject_reason = jusrmsg::REASON_BAD_SIG;
}
}
else
{
LOG_DBG << "Duplicate user message.";
reject_reason = jusrmsg::REASON_DUPLICATE_MSG;
}
usr::send_request_status_result(session,
reject_reason == NULL ? jusrmsg::STATUS_ACCEPTED : jusrmsg::STATUS_REJECTED,
reject_reason == NULL ? "" : reject_reason,
jusrmsg::MSGTYPE_CONTRACT_INPUT,
jusrmsg::origin_data_for_contract_input(umsg.sig));
}
// Verify the accumulated input length with app bill before adding to candidate inputs.
size_t total_input_len = 0;
for (const auto &[hash, cand_input] : valid_inputs)
total_input_len += cand_input.input.size();
if (verify_appbill_check(pubkey, total_input_len))
ctx.candidate_user_inputs.merge(valid_inputs);
// TODO: report back to the user if the inputs didn't make it into consensus due to some reason.
}
}
p2p::ctx.collected_msgs.nonunl_proposals.clear();

View File

@@ -29,6 +29,9 @@ constexpr const char *FLD_CONTENT = "content";
constexpr const char *FLD_NONCE = "nonce";
constexpr const char *FLD_LCL = "lcl";
constexpr const char *FLD_LCL_SEQ = "lcl_seqno";
constexpr const char *FLD_STATUS = "status";
constexpr const char *FLD_ORIGIN = "origin";
constexpr const char *FLD_REASON = "reason";
// Length of user random challenge bytes.
const size_t CHALLENGE_LEN = 16;
@@ -86,7 +89,7 @@ void create_user_challenge(std::string &msg, std::string &challengehex)
* Message format:
* {
* "type": "stat_resp",
* "lcl": "<lcl id>"
* "lcl": "<lcl id>",
* "lcl_seqno": <integer>
* }
*/
@@ -108,6 +111,69 @@ void create_status_response(std::string &msg)
.append("}");
}
/**
* Constructs a request result message.
* @param msg String reference to copy the generated json message string into.
* Message format:
* {
* "type": "request_status_result",
* "status": "<accepted|rejected>",
* "reason": "",
* "origin": {
* "type": "<original msg type>",
* ...
* }
* }
* @param is_accepted Whether the original message was accepted or not.
* @param reason Rejected reason. Empty if accepted.
* @param origin_type Original message type which generated this result.
* @param origin_extra_data Extra field data string to be injected into origin.
*/
void create_request_status_result(std::string &msg, std::string_view status, std::string_view reason, std::string_view origin_type, std::string_view origin_extra_data)
{
msg.reserve(128);
msg.append("{\"")
.append(FLD_TYPE)
.append(SEP_COLON)
.append(MSGTYPE_REQUEST_STATUS_RESULT)
.append(SEP_COMMA)
.append(FLD_STATUS)
.append(SEP_COLON)
.append(status)
.append(SEP_COMMA)
.append(FLD_REASON)
.append(SEP_COLON)
.append(reason)
.append(SEP_COMMA)
.append(FLD_ORIGIN)
.append(":{")
.append(FLD_TYPE)
.append(SEP_COMMA)
.append(origin_type)
.append("\"")
.append(origin_extra_data)
.append("}}");
}
/**
* Returns concatenated string for contract input origin data fields to be included in request result.
* @param sig Binary singature of the original contract input.
*/
std::string origin_data_for_contract_input(std::string_view sig)
{
std::string sighex;
util::bin2hex(sighex, reinterpret_cast<const unsigned char *>(sig.data()), sig.length());
std::string extra_data;
extra_data.append(",\"")
.append(FLD_SIG)
.append(SEP_COLON)
.append(sighex)
.append("\"");
return extra_data;
}
/**
* Constructs a contract output container message.
* @param msg String reference to copy the generated json message string into.

View File

@@ -16,11 +16,26 @@ constexpr const char* MSGTYPE_CONTRACT_INPUT = "contract_input";
constexpr const char* MSGTYPE_CONTRACT_OUTPUT = "contract_output";
constexpr const char* MSGTYPE_STAT = "stat";
constexpr const char* MSGTYPE_STAT_RESP = "stat_resp";
constexpr const char* MSGTYPE_REQUEST_STATUS_RESULT = "request_status_result";
constexpr const char* MSGTYPE_UNKNOWN = "unknown";
constexpr const char *STATUS_ACCEPTED = "accepted";
constexpr const char *STATUS_REJECTED = "rejected";
constexpr const char *REASON_BAD_MSG_FORMAT = "bad_msg_format";
constexpr const char *REASON_INVALID_MSG_TYPE = "invalid_msg_type";
constexpr const char *REASON_DUPLICATE_MSG = "dup_msg";
constexpr const char *REASON_BAD_SIG = "bad_sig";
constexpr const char *REASON_APPBILL_BALANCE_EXCEEDED = "appbill_balance_exceeded";
constexpr const char *REASON_MAX_LEDGER_EXPIRED = "max_ledger_expired";
void create_user_challenge(std::string &msg, std::string &challengehex);
void create_status_response(std::string &msg);
void create_request_status_result(std::string &msg, std::string_view status, std::string_view reason, std::string_view origin_type, std::string_view origin_extra_data);
std::string origin_data_for_contract_input(std::string_view sig);
void create_contract_output_container(std::string &msg, std::string_view content);
int verify_user_challenge_response(std::string &extracted_pubkeyhex, std::string_view response, std::string_view original_challenge);

View File

@@ -113,8 +113,12 @@ int verify_challenge(std::string_view message, sock::socket_session<user_outboun
int handle_user_message(connected_user &user, std::string_view message)
{
rapidjson::Document d;
const char *msg_type = jusrmsg::MSGTYPE_UNKNOWN;
if (jusrmsg::parse_user_message(d, message) == 0)
{
const char *msg_type = d[jusrmsg::FLD_TYPE].GetString();
// Message is a contract input message.
if (d[jusrmsg::FLD_TYPE] == jusrmsg::MSGTYPE_CONTRACT_INPUT)
{
@@ -130,23 +134,45 @@ int handle_user_message(connected_user &user, std::string_view message)
std::move(sig)));
return 0;
}
else
{
send_request_status_result(user.session, jusrmsg::STATUS_REJECTED, jusrmsg::REASON_BAD_SIG, msg_type, jusrmsg::origin_data_for_contract_input(sig));
return -1;
}
}
else if (d[jusrmsg::FLD_TYPE] == jusrmsg::MSGTYPE_STAT)
{
std::string msg;
LOG_DBG << msg;
jusrmsg::create_status_response(msg);
user.session->send(user_outbound_message(std::move(msg)));
return 0;
}
else
{
LOG_DBG << "Invalid user message type: " << d[jusrmsg::FLD_TYPE].GetString();
LOG_DBG << "Invalid user message type: " << msg_type;
send_request_status_result(user.session, jusrmsg::STATUS_REJECTED, jusrmsg::REASON_INVALID_MSG_TYPE, msg_type, "");
return -1;
}
}
else
{
// Bad message.
send_request_status_result(user.session, jusrmsg::STATUS_REJECTED, jusrmsg::REASON_BAD_MSG_FORMAT, msg_type, "");
return -1;
}
}
// Bad message.
return -1;
/**
* Send the specified status result via the provided session.
*/
void send_request_status_result(sock::socket_session<user_outbound_message> *session, std::string_view status, std::string_view reason, std::string_view origin_type, std::string_view origin_extra_data)
{
if (session != NULL)
{
std::string msg;
jusrmsg::create_request_status_result(msg, status, reason, origin_type, origin_extra_data);
session->send(usr::user_outbound_message(std::move(msg)));
}
}
/**
@@ -205,6 +231,24 @@ int remove_user(const std::string &sessionid)
return 0;
}
/**
* Finds and returns the socket session for the proided user pubkey.
* @param pubkey User binary pubkey.
* @return Pointer to the socket session. NULL of not found.
*/
sock::socket_session<usr::user_outbound_message> *get_session_by_pubkey(const std::string &pubkey)
{
const auto sessionid_itr = usr::ctx.sessionids.find(pubkey);
if (sessionid_itr != usr::ctx.sessionids.end())
{
const auto user_itr = usr::ctx.users.find(sessionid_itr->second);
if (user_itr != usr::ctx.users.end())
return user_itr->second.session;
}
return NULL;
}
/**
* Starts listening for incoming user websocket connections.
*/

View File

@@ -90,10 +90,14 @@ int verify_challenge(std::string_view message, sock::socket_session<user_outboun
int handle_user_message(connected_user &user, std::string_view message);
void send_request_status_result(sock::socket_session<usr::user_outbound_message> *session, std::string_view status, std::string_view reason, std::string_view origin_type, std::string_view origin_extra_data);
int add_user(sock::socket_session<user_outbound_message> *session, const std::string &pubkey);
int remove_user(const std::string &sessionid);
sock::socket_session<usr::user_outbound_message> *get_session_by_pubkey(const std::string &pubkey);
void start_listening();
} // namespace usr