diff --git a/CMakeLists.txt b/CMakeLists.txt index 5ae9ee3a..cbbfcd71 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -7,6 +7,10 @@ set(CMAKE_RUNTIME_OUTPUT_DIRECTORY build) set(CMAKE_BUILD_TYPE "MinSizeRel" FORCE) add_executable(hpcore + src/util.cpp + src/crypto.cpp + src/conf.cpp + src/hplog.cpp src/sock/socket_client.cpp src/sock/socket_server.cpp src/sock/socket_session.cpp @@ -15,12 +19,8 @@ add_executable(hpcore src/p2p/p2p.cpp src/usr/user_session_handler.cpp src/usr/usr.cpp - src/cons/cons.cpp - src/util.cpp - src/crypto.cpp - src/conf.cpp - src/hplog.cpp src/proc.cpp + src/cons/cons.cpp src/main.cpp ) diff --git a/src/cons/cons.cpp b/src/cons/cons.cpp index fb60d567..fb1a2781 100644 --- a/src/cons/cons.cpp +++ b/src/cons/cons.cpp @@ -1,5 +1,3 @@ -#include -#include #include #include #include @@ -43,8 +41,8 @@ void consensus() ctx.time_now = util::get_epoch_milliseconds(); // Throughout consensus, we move over the incoming proposals collected via the network so far into - // the candidate proposal set (move and append). This is to have a private working set for the consensus and void - // threading conflicts with network incoming proposals list. + // the candidate proposal set (move and append). This is to have a private working set for the consensus and avoid + // threading conflicts with network incoming proposals. { std::lock_guard lock(p2p::collected_msgs.proposals_mutex); ctx.candidate_proposals.splice(ctx.candidate_proposals.end(), p2p::collected_msgs.proposals); @@ -54,20 +52,25 @@ void consensus() { // Stage 0 means begining of a consensus round. - // Remove any useless candidate proposals so we'll have a cleaner proposal set to look at - // when we transition to stage 1. - auto itr = ctx.candidate_proposals.begin(); - while (itr != ctx.candidate_proposals.end()) { - // Remove any proposal from previous round's stage 3. - // Remove any proposal from self (pubkey match). - // todo: check the state of these to ensure we're running consensus ledger - if (itr->stage == 3 || conf::cfg.pubkey == itr->pubkey) - ctx.candidate_proposals.erase(itr++); - else - ++itr; + // Remove any useless candidate proposals so we'll have a cleaner proposal set to look at + // when we transition to stage 1. + auto itr = ctx.candidate_proposals.begin(); + while (itr != ctx.candidate_proposals.end()) + { + // Remove any proposal from previous round's stage 3. + // Remove any proposal from self (pubkey match). + // todo: check the state of these to ensure we're running consensus ledger + if (itr->stage == 3 || conf::cfg.pubkey == itr->pubkey) + ctx.candidate_proposals.erase(itr++); + else + ++itr; + } } + // Transfer connected user data onto consensus candidate data. + populate_candidate_users_and_inputs(); + // In stage 0 we create a novel proposal and broadcast it. const p2p::proposal stg_prop = create_stage0_proposal(); if (broadcast_proposal(stg_prop) != 0) @@ -127,6 +130,25 @@ void consensus() std::this_thread::sleep_for(std::chrono::milliseconds(conf::cfg.roundtime / 4)); } +/** + * Populate connected users and their inputs (if any) into consensus candidate data. + */ +void populate_candidate_users_and_inputs() +{ + // Lock the connected user list until we do this operation. + std::lock_guard lock(usr::users_mutex); + for (auto &[sid, con_user] : usr::users) + { + // Populate the user into candidate user inputs map. + // We do this regardless of whether the user has any inputs or not. + + std::list &inplist = ctx.candidate_users[con_user.pubkey]; + + // Transfer the connected user's inputs (if any) to the candidate user's inputs list. + inplist.splice(inplist.end(), con_user.inputs); + } +} + p2p::proposal create_stage0_proposal() { // The proposal we are going to emit in stage 0. @@ -137,36 +159,36 @@ p2p::proposal create_stage0_proposal() stg_prop.stage = 0; stg_prop.lcl = ctx.lcl; - // Populate the stg_prop with users list (user pubkey list) and their inputs. - { - std::lock_guard lock(usr::users_mutex); - for (auto &[sid, user] : usr::users) - { - // add all the user connections we host - stg_prop.users.emplace(user.pubkey); + // Populate the poposal with users list (user pubkey list) and their inputs. - // and all their pending messages - if (!user.inbuffer.empty()) - { - std::string input; - input.swap(user.inbuffer); - stg_prop.raw_inputs.try_emplace(user.pubkey, std::move(input)); - } + for (auto [pubkey, inputs] : ctx.candidate_users) + { + // Add all the user connections we host. + stg_prop.users.emplace(pubkey); + + // Add all their pending inputs. + if (!inputs.empty()) + { + std::vector inpvec; + for (util::hash_buffer &hashbuf : inputs) + inpvec.push_back(hashbuf); // Copy all hashbufs from candidate inputs into the proposal. + + stg_prop.raw_inputs.emplace(pubkey, std::move(inpvec)); } } // Populate the stg_prop with any contract outputs from previous round's stage 3. - for (auto &[pubkey, bufpair] : ctx.local_userbuf) + for (auto &[pubkey, bufpair] : ctx.useriobufmap) { - if (!bufpair.second.empty()) // bufpair.second is the output buffer. + if (!bufpair.output.empty()) { std::string rawoutput; - rawoutput.swap(bufpair.second); + rawoutput.swap(bufpair.output); - stg_prop.raw_outputs.try_emplace(pubkey, std::move(rawoutput)); + stg_prop.raw_outputs.try_emplace(pubkey, util::hash_buffer(rawoutput, pubkey)); } } - ctx.local_userbuf.clear(); + ctx.useriobufmap.clear(); // todo: set propsal states // todo: generate stg_prop hash and check with ctx.novel_proposal, we are sending same stg_prop again. @@ -188,7 +210,7 @@ p2p::proposal create_stage123_proposal(vote_counter &votes) //todo:check lcl votes and wait for proposals - // Vote for rest of the proposal fields + // Vote for rest of the proposal fields by looking at candidate proposals. for (const p2p::proposal &cp : ctx.candidate_proposals) { // Vote for times. @@ -202,64 +224,52 @@ p2p::proposal create_stage123_proposal(vote_counter &votes) // Vote for user inputs - // Proposals from stage 0 will have raw inputs in them. + // Proposals from stage 0 will have raw inputs (and their hashes) in them. if (!cp.raw_inputs.empty()) { - for (auto &[pubkey, input] : cp.raw_inputs) + for (auto &[pubkey, inputs] : cp.raw_inputs) { - // Hash the pubkey+input. - std::string str_to_hash; - str_to_hash.reserve(pubkey.size() + input.size()); - str_to_hash.append(pubkey); - str_to_hash.append(input); - std::string hash = crypto::sha_512_hash(str_to_hash, "INP", 3); + // Vote for the input hash. + for (util::hash_buffer input : inputs) + { + increment(votes.inputs, input.hash); - // Vote for the hash. - increment(votes.inputs, hash); - - // Remember the actual input along with the hash for future use for apply-ledger. - ctx.possible_inputs.try_emplace( - std::move(hash), - std::make_pair(pubkey, input)); + std::string inputbuffer; + inputbuffer.swap(input.buffer); + // Remember the actual input along with the hash for future use for apply-ledger. + ctx.possible_inputs.try_emplace(input.hash, std::make_pair(pubkey, inputbuffer)); + } } } - // Proposals from stage 1, 2, 3 will have hashed inputs in them. + // Proposals from stage 1, 2, 3 will have only input hashes in them. else if (!cp.hash_inputs.empty()) { for (const std::string &inputhash : cp.hash_inputs) increment(votes.inputs, inputhash); } - // Vote for user outputs + // Vote for contract outputs // Proposals from stage 0 will have raw user outputs in them. if (!cp.raw_outputs.empty()) { - for (auto &[pubkey, output] : cp.raw_outputs) + for (auto [pubkey, output] : cp.raw_outputs) { - // Hash the pubkey+input. - std::string str_to_hash; - str_to_hash.reserve(pubkey.size() + output.size()); - str_to_hash.append(pubkey); - str_to_hash.append(output); - std::string hash = crypto::sha_512_hash(str_to_hash, "OUT", 3); - // Vote for the hash. - increment(votes.outputs, hash); + increment(votes.outputs, output.hash); - // Remember the actual output along with the hash for future use for apply-ledger. - ctx.possible_outputs.try_emplace( - std::move(hash), - std::make_pair(pubkey, output)); + std::string outputbuf; + outputbuf.swap(output.buffer); + + // Remember the actual output along with the hash for future use for apply-ledger and sending back to user. + ctx.possible_outputs.try_emplace(output.hash, std::make_pair(pubkey, outputbuf)); } } // Proposals from stage 1, 2, 3 will have hashed user outputs in them. else if (!cp.hash_outputs.empty()) { for (auto outputhash : cp.hash_outputs) - { increment(votes.outputs, outputhash); - } } // todo: repeat above for state @@ -458,7 +468,6 @@ void apply_ledger(const p2p::proposal &cons_prop) //create input to feed to binary contract run - //todo:remove entries from pending inputs that made their way into a closed ledger for (const std::string &hash : cons_prop.hash_inputs) { auto itr = ctx.possible_inputs.find(hash); @@ -471,7 +480,7 @@ void apply_ledger(const p2p::proposal &cons_prop) } else { - // Prepare ctx.local_userbuf with user inputs to feed to the contract. + // Prepare ctx.useriobufmap with user inputs to feed to the contract. const std::string &pubkey = itr->second.first; std::string rawinput = itr->second.second; @@ -479,21 +488,45 @@ void apply_ledger(const p2p::proposal &cons_prop) std::string inputtofeed; inputtofeed.swap(rawinput); - std::pair bufpair; - bufpair.first = std::move(inputtofeed); - ctx.local_userbuf.try_emplace(pubkey, std::move(bufpair)); + proc::contract_iobuf_pair &bufpair = ctx.useriobufmap[pubkey]; + bufpair.inputs.push_back(std::move(inputtofeed)); } } + ctx.possible_inputs.clear(); run_contract_binary(cons_prop.time); + + // Remove entries from candidate inputs that made their way into a closed ledger + auto cu_itr = ctx.candidate_users.begin(); + while (cu_itr != ctx.candidate_users.end()) + { + // Delete any ledger inputs for this user. + std::list &inputs = cu_itr->second; + auto inp_itr = inputs.begin(); + while (inp_itr != inputs.end()) + { + // Delete the input from the list, if it was part of consensus proposal. + if (cons_prop.hash_inputs.count(inp_itr->hash)) + inputs.erase(inp_itr++); + else + ++inp_itr; + } + + // Delete the user from the list if there are no more unprocessed inputs. + if (cu_itr->second.empty()) + ctx.candidate_users.erase(cu_itr++); + else + ++cu_itr; + } } void run_contract_binary(int64_t time_now) { - std::pair hpscbufpair; - std::unordered_map> nplbufs; + // todo:implement proper data structures to exchange npl and hpsc bufs + proc::contract_bufmap_t nplbufs; + proc::contract_iobuf_pair hpscbufpair; - proc::ContractExecArgs eargs(time_now, ctx.local_userbuf, nplbufs, hpscbufpair); + proc::ContractExecArgs eargs(time_now, ctx.useriobufmap, nplbufs, hpscbufpair); proc::exec_contract(eargs); } diff --git a/src/cons/cons.hpp b/src/cons/cons.hpp index 9893a6a2..14439a7f 100644 --- a/src/cons/cons.hpp +++ b/src/cons/cons.hpp @@ -5,6 +5,7 @@ #include #include #include +#include "../proc.hpp" #include "../p2p/p2p.hpp" namespace cons @@ -23,16 +24,18 @@ static const float STAGE3_THRESHOLD = 0.8; struct consensus_context { std::list candidate_proposals; + std::unordered_map> candidate_users; int8_t stage; int64_t novel_proposal_time; int64_t time_now; std::string lcl; std::string novel_proposal; + std::map> possible_inputs; std::map> possible_outputs; - std::unordered_map> local_userbuf; + std::unordered_map useriobufmap; int32_t next_sleep; }; @@ -57,6 +60,8 @@ float_t get_stage_threshold(int8_t stage); void timewait_stage(bool reset); +void populate_candidate_users_and_inputs(); + p2p::proposal create_stage0_proposal(); p2p::proposal create_stage123_proposal(vote_counter &votes); diff --git a/src/crypto.cpp b/src/crypto.cpp index 11a260b8..cab43aef 100644 --- a/src/crypto.cpp +++ b/src/crypto.cpp @@ -135,20 +135,14 @@ int verify_hex(std::string_view msg, std::string_view sighex, std::string_view p /** * Generate SHA 512 hash for message prepend with prefix before hashing. * - * @param msg message string. - * @param prefix prefix char array. - * @param char_length length of prefix char array. + * @param data String to hash. * @return SHA 512 hash. */ -std::string sha_512_hash(std::string_view msg, const char *prefix, size_t char_length) +std::string sha_512_hash(std::string_view data) { - std::string payload; - payload.reserve(char_length + msg.size()); - payload.append(prefix); - payload.append(msg.data()); unsigned char hashchars[crypto_hash_sha512_BYTES]; - crypto_hash_sha512(hashchars, (unsigned char *)payload.data(), payload.length()); - return std::string((char *)hashchars, crypto_hash_sha512_BYTES); + crypto_hash_sha512(hashchars, (unsigned char *)data.data(), data.length()); + return std::string(reinterpret_cast(hashchars), crypto_hash_sha512_BYTES); } } // namespace crypto \ No newline at end of file diff --git a/src/crypto.hpp b/src/crypto.hpp index a05375d7..c4ba370f 100644 --- a/src/crypto.hpp +++ b/src/crypto.hpp @@ -29,7 +29,7 @@ int verify(std::string_view msg, std::string_view sig, std::string_view pubkey); int verify_hex(std::string_view msg, std::string_view sighex, std::string_view pubkeyhex); -std::string sha_512_hash(std::string_view msg, const char *prefix, size_t char_length); +std::string sha_512_hash(std::string_view data); } // namespace crypto diff --git a/src/p2p/message_content.fbs b/src/p2p/message_content.fbs index 438450c6..c6316372 100644 --- a/src/p2p/message_content.fbs +++ b/src/p2p/message_content.fbs @@ -1,18 +1,28 @@ //IDL file for p2p message content schema. namespace p2p; -table BytesKeyValuePair { //flatbuff equivalent for dictionary/hashmap for - key:[ubyte]; - value:[ubyte]; - } +table BytesKeyValuePair { //A key, value pair of byte[]. + key:[ubyte]; + value:[ubyte]; +} - table ByteArray { //To represent list of byte arrays - array:[ubyte]; - } +table RawInputList { //Pubkey bytes with an array of key value pairs. + pubkey:[ubyte]; + inputs:[BytesKeyValuePair]; +} + +table RawOutput { //Pubkey bytes with a output key value pair. + pubkey:[ubyte]; + output:BytesKeyValuePair; +} + +table ByteArray { //To represent list of byte arrays + array:[ubyte]; +} union Message { Proposal_Message, Npl_Message } //message content type -table Content { +table Content { message:Message; } @@ -23,10 +33,10 @@ table Proposal_Message { //Proposal type message schema time:uint64; lcl:[ubyte]; users: [ByteArray]; - raw_inputs: [BytesKeyValuePair]; //stage 0 inputs - hash_inputs:[ByteArray]; //stage > 0 inputs (hash of stage 0 inputs) - raw_outputs: [BytesKeyValuePair]; //stage 0 outputs - hash_outputs:[ByteArray]; //stage > 0 outputs (hash of stage 0 outputs) + raw_inputs: [RawInputList]; //stage 0 inputs (hash and raw value) + hash_inputs:[ByteArray]; //stage > 0 inputs (hash of stage 0 inputs) + raw_outputs: [RawOutput]; //stage 0 outputs (hash and raw value) + hash_outputs:[ByteArray]; //stage > 0 outputs (hash of stage 0 outputs) state: State; } diff --git a/src/p2p/message_content_generated.h b/src/p2p/message_content_generated.h index d52af826..dd0cfa4c 100644 --- a/src/p2p/message_content_generated.h +++ b/src/p2p/message_content_generated.h @@ -10,6 +10,10 @@ namespace p2p { struct BytesKeyValuePair; +struct RawInputList; + +struct RawOutput; + struct ByteArray; struct Content; @@ -140,6 +144,146 @@ inline flatbuffers::Offset CreateBytesKeyValuePairDirect( value__); } +struct RawInputList FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { + enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { + VT_PUBKEY = 4, + VT_INPUTS = 6 + }; + const flatbuffers::Vector *pubkey() const { + return GetPointer *>(VT_PUBKEY); + } + flatbuffers::Vector *mutable_pubkey() { + return GetPointer *>(VT_PUBKEY); + } + const flatbuffers::Vector> *inputs() const { + return GetPointer> *>(VT_INPUTS); + } + flatbuffers::Vector> *mutable_inputs() { + return GetPointer> *>(VT_INPUTS); + } + bool Verify(flatbuffers::Verifier &verifier) const { + return VerifyTableStart(verifier) && + VerifyOffset(verifier, VT_PUBKEY) && + verifier.VerifyVector(pubkey()) && + VerifyOffset(verifier, VT_INPUTS) && + verifier.VerifyVector(inputs()) && + verifier.VerifyVectorOfTables(inputs()) && + verifier.EndTable(); + } +}; + +struct RawInputListBuilder { + flatbuffers::FlatBufferBuilder &fbb_; + flatbuffers::uoffset_t start_; + void add_pubkey(flatbuffers::Offset> pubkey) { + fbb_.AddOffset(RawInputList::VT_PUBKEY, pubkey); + } + void add_inputs(flatbuffers::Offset>> inputs) { + fbb_.AddOffset(RawInputList::VT_INPUTS, inputs); + } + explicit RawInputListBuilder(flatbuffers::FlatBufferBuilder &_fbb) + : fbb_(_fbb) { + start_ = fbb_.StartTable(); + } + RawInputListBuilder &operator=(const RawInputListBuilder &); + flatbuffers::Offset Finish() { + const auto end = fbb_.EndTable(start_); + auto o = flatbuffers::Offset(end); + return o; + } +}; + +inline flatbuffers::Offset CreateRawInputList( + flatbuffers::FlatBufferBuilder &_fbb, + flatbuffers::Offset> pubkey = 0, + flatbuffers::Offset>> inputs = 0) { + RawInputListBuilder builder_(_fbb); + builder_.add_inputs(inputs); + builder_.add_pubkey(pubkey); + return builder_.Finish(); +} + +inline flatbuffers::Offset CreateRawInputListDirect( + flatbuffers::FlatBufferBuilder &_fbb, + const std::vector *pubkey = nullptr, + const std::vector> *inputs = nullptr) { + auto pubkey__ = pubkey ? _fbb.CreateVector(*pubkey) : 0; + auto inputs__ = inputs ? _fbb.CreateVector>(*inputs) : 0; + return p2p::CreateRawInputList( + _fbb, + pubkey__, + inputs__); +} + +struct RawOutput FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { + enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { + VT_PUBKEY = 4, + VT_OUTPUT = 6 + }; + const flatbuffers::Vector *pubkey() const { + return GetPointer *>(VT_PUBKEY); + } + flatbuffers::Vector *mutable_pubkey() { + return GetPointer *>(VT_PUBKEY); + } + const BytesKeyValuePair *output() const { + return GetPointer(VT_OUTPUT); + } + BytesKeyValuePair *mutable_output() { + return GetPointer(VT_OUTPUT); + } + bool Verify(flatbuffers::Verifier &verifier) const { + return VerifyTableStart(verifier) && + VerifyOffset(verifier, VT_PUBKEY) && + verifier.VerifyVector(pubkey()) && + VerifyOffset(verifier, VT_OUTPUT) && + verifier.VerifyTable(output()) && + verifier.EndTable(); + } +}; + +struct RawOutputBuilder { + flatbuffers::FlatBufferBuilder &fbb_; + flatbuffers::uoffset_t start_; + void add_pubkey(flatbuffers::Offset> pubkey) { + fbb_.AddOffset(RawOutput::VT_PUBKEY, pubkey); + } + void add_output(flatbuffers::Offset output) { + fbb_.AddOffset(RawOutput::VT_OUTPUT, output); + } + explicit RawOutputBuilder(flatbuffers::FlatBufferBuilder &_fbb) + : fbb_(_fbb) { + start_ = fbb_.StartTable(); + } + RawOutputBuilder &operator=(const RawOutputBuilder &); + flatbuffers::Offset Finish() { + const auto end = fbb_.EndTable(start_); + auto o = flatbuffers::Offset(end); + return o; + } +}; + +inline flatbuffers::Offset CreateRawOutput( + flatbuffers::FlatBufferBuilder &_fbb, + flatbuffers::Offset> pubkey = 0, + flatbuffers::Offset output = 0) { + RawOutputBuilder builder_(_fbb); + builder_.add_output(output); + builder_.add_pubkey(pubkey); + return builder_.Finish(); +} + +inline flatbuffers::Offset CreateRawOutputDirect( + flatbuffers::FlatBufferBuilder &_fbb, + const std::vector *pubkey = nullptr, + flatbuffers::Offset output = 0) { + auto pubkey__ = pubkey ? _fbb.CreateVector(*pubkey) : 0; + return p2p::CreateRawOutput( + _fbb, + pubkey__, + output); +} + struct ByteArray FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { VT_ARRAY = 4 @@ -315,11 +459,11 @@ struct Proposal_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { flatbuffers::Vector> *mutable_users() { return GetPointer> *>(VT_USERS); } - const flatbuffers::Vector> *raw_inputs() const { - return GetPointer> *>(VT_RAW_INPUTS); + const flatbuffers::Vector> *raw_inputs() const { + return GetPointer> *>(VT_RAW_INPUTS); } - flatbuffers::Vector> *mutable_raw_inputs() { - return GetPointer> *>(VT_RAW_INPUTS); + flatbuffers::Vector> *mutable_raw_inputs() { + return GetPointer> *>(VT_RAW_INPUTS); } const flatbuffers::Vector> *hash_inputs() const { return GetPointer> *>(VT_HASH_INPUTS); @@ -327,11 +471,11 @@ struct Proposal_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { flatbuffers::Vector> *mutable_hash_inputs() { return GetPointer> *>(VT_HASH_INPUTS); } - const flatbuffers::Vector> *raw_outputs() const { - return GetPointer> *>(VT_RAW_OUTPUTS); + const flatbuffers::Vector> *raw_outputs() const { + return GetPointer> *>(VT_RAW_OUTPUTS); } - flatbuffers::Vector> *mutable_raw_outputs() { - return GetPointer> *>(VT_RAW_OUTPUTS); + flatbuffers::Vector> *mutable_raw_outputs() { + return GetPointer> *>(VT_RAW_OUTPUTS); } const flatbuffers::Vector> *hash_outputs() const { return GetPointer> *>(VT_HASH_OUTPUTS); @@ -396,13 +540,13 @@ struct Proposal_MessageBuilder { void add_users(flatbuffers::Offset>> users) { fbb_.AddOffset(Proposal_Message::VT_USERS, users); } - void add_raw_inputs(flatbuffers::Offset>> raw_inputs) { + void add_raw_inputs(flatbuffers::Offset>> raw_inputs) { fbb_.AddOffset(Proposal_Message::VT_RAW_INPUTS, raw_inputs); } void add_hash_inputs(flatbuffers::Offset>> hash_inputs) { fbb_.AddOffset(Proposal_Message::VT_HASH_INPUTS, hash_inputs); } - void add_raw_outputs(flatbuffers::Offset>> raw_outputs) { + void add_raw_outputs(flatbuffers::Offset>> raw_outputs) { fbb_.AddOffset(Proposal_Message::VT_RAW_OUTPUTS, raw_outputs); } void add_hash_outputs(flatbuffers::Offset>> hash_outputs) { @@ -431,9 +575,9 @@ inline flatbuffers::Offset CreateProposal_Message( uint64_t time = 0, flatbuffers::Offset> lcl = 0, flatbuffers::Offset>> users = 0, - flatbuffers::Offset>> raw_inputs = 0, + flatbuffers::Offset>> raw_inputs = 0, flatbuffers::Offset>> hash_inputs = 0, - flatbuffers::Offset>> raw_outputs = 0, + flatbuffers::Offset>> raw_outputs = 0, flatbuffers::Offset>> hash_outputs = 0, flatbuffers::Offset state = 0) { Proposal_MessageBuilder builder_(_fbb); @@ -459,17 +603,17 @@ inline flatbuffers::Offset CreateProposal_MessageDirect( uint64_t time = 0, const std::vector *lcl = nullptr, const std::vector> *users = nullptr, - const std::vector> *raw_inputs = nullptr, + const std::vector> *raw_inputs = nullptr, const std::vector> *hash_inputs = nullptr, - const std::vector> *raw_outputs = nullptr, + const std::vector> *raw_outputs = nullptr, const std::vector> *hash_outputs = nullptr, flatbuffers::Offset state = 0) { auto pubkey__ = pubkey ? _fbb.CreateVector(*pubkey) : 0; auto lcl__ = lcl ? _fbb.CreateVector(*lcl) : 0; auto users__ = users ? _fbb.CreateVector>(*users) : 0; - auto raw_inputs__ = raw_inputs ? _fbb.CreateVector>(*raw_inputs) : 0; + auto raw_inputs__ = raw_inputs ? _fbb.CreateVector>(*raw_inputs) : 0; auto hash_inputs__ = hash_inputs ? _fbb.CreateVector>(*hash_inputs) : 0; - auto raw_outputs__ = raw_outputs ? _fbb.CreateVector>(*raw_outputs) : 0; + auto raw_outputs__ = raw_outputs ? _fbb.CreateVector>(*raw_outputs) : 0; auto hash_outputs__ = hash_outputs ? _fbb.CreateVector>(*hash_outputs) : 0; return p2p::CreateProposal_Message( _fbb, diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index 1b1f51ed..b93288ab 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -19,9 +19,9 @@ struct proposal int8_t stage; std::string lcl; std::unordered_set users; - std::unordered_map raw_inputs; + std::unordered_map> raw_inputs; std::unordered_set hash_inputs; - std::unordered_map raw_outputs; + std::unordered_map raw_outputs; std::unordered_set hash_outputs; }; diff --git a/src/p2p/peer_message_handler.cpp b/src/p2p/peer_message_handler.cpp index eaf1bc31..77708bcf 100644 --- a/src/p2p/peer_message_handler.cpp +++ b/src/p2p/peer_message_handler.cpp @@ -178,13 +178,13 @@ const proposal create_proposal_from_msg(const Proposal_Message &msg) p.users = flatbuf_bytearrayvector_to_stringlist(msg.users()); if (msg.raw_inputs()) - p.raw_inputs = flatbuf_pairvector_to_stringmap(msg.raw_inputs()); + p.raw_inputs = flatbuf_rawinputs_to_hashbuffermap(msg.raw_inputs()); if (msg.hash_inputs()) p.hash_inputs = flatbuf_bytearrayvector_to_stringlist(msg.hash_inputs()); if (msg.raw_outputs()) - p.raw_outputs = flatbuf_pairvector_to_stringmap(msg.raw_outputs()); + p.raw_outputs = flatbuf_rawoutputs_to_hashbuffermap(msg.raw_outputs()); if (msg.hash_outputs()) p.hash_outputs = flatbuf_bytearrayvector_to_stringlist(msg.hash_outputs()); @@ -214,9 +214,9 @@ void create_msg_from_proposal(flatbuffers::FlatBufferBuilder &container_builder, p.time, sv_to_flatbuff_bytes(builder, p.lcl), stringlist_to_flatbuf_bytearrayvector(builder, p.users), - stringmap_to_flatbuf_bytepairvector(builder, p.raw_inputs), + hashbuffermap_to_flatbuf_rawinputs(builder, p.raw_inputs), stringlist_to_flatbuf_bytearrayvector(builder, p.hash_inputs), - stringmap_to_flatbuf_bytepairvector(builder, p.raw_outputs), + hashbuffermap_to_flatbuf_rawoutputs(builder, p.raw_outputs), stringlist_to_flatbuf_bytearrayvector(builder, p.hash_outputs)); flatbuffers::Offset message = CreateContent(builder, Message_Proposal_Message, proposal.Union()); @@ -300,6 +300,52 @@ flatbuf_pairvector_to_stringmap(const flatbuffers::Vector> +flatbuf_rawinputs_to_hashbuffermap(const flatbuffers::Vector> *fbvec) +{ + std::unordered_map> map; + map.reserve(fbvec->size()); + for (const RawInputList *user : *fbvec) + { + std::vector bufvec; + bufvec.reserve(user->inputs()->size()); + + for (auto input : *user->inputs()) + { + // Create hash_buffer object and manually assign the hash from the input. + util::hash_buffer buf(flatbuff_bytes_to_sv(input->value())); //input->value() is the raw input. + buf.hash = flatbuff_bytes_to_sv(input->key()); //input->key() is the hash of the input. + + bufvec.push_back(buf); + } + + map.emplace(flatbuff_bytes_to_sv(user->pubkey()), std::move(bufvec)); + } + return map; +} + +/** + * Returns a hash buffer map from Flatbuffer proposal raw outputs. + */ +const std::unordered_map +flatbuf_rawoutputs_to_hashbuffermap(const flatbuffers::Vector> *fbvec) +{ + std::unordered_map map; + map.reserve(fbvec->size()); + for (const RawOutput *user : *fbvec) + { + // Create hash_buffer object and manually assign the hash from the output. + util::hash_buffer buf(flatbuff_bytes_to_sv(user->output()->value())); //output->value() is the raw output. + buf.hash = flatbuff_bytes_to_sv(user->output()->key()); //output->key() is the hash of the output. + + map.emplace(flatbuff_bytes_to_sv(user->pubkey()), std::move(buf)); + } + return map; +} + //---Conversion helpers from std data types to flatbuffers data types---// //---These are used in constructing Flatbuffer messages using builders---// @@ -343,4 +389,52 @@ stringmap_to_flatbuf_bytepairvector(flatbuffers::FlatBufferBuilder &builder, con return builder.CreateVector(fbvec); } +/** + * Returns Flatbuffer vector of RawInputs from a given map of hash buffer lists. + */ +const flatbuffers::Offset>> +hashbuffermap_to_flatbuf_rawinputs(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map> &map) +{ + std::vector> fbvec; + fbvec.reserve(map.size()); + for (auto const &[pubkey, bufvec] : map) + { + std::vector> fbinputsvec; + for (const util::hash_buffer &buf : bufvec) + { + fbinputsvec.push_back(CreateBytesKeyValuePair( + builder, + sv_to_flatbuff_bytes(builder, buf.hash), + sv_to_flatbuff_bytes(builder, buf.buffer))); + } + + fbvec.push_back(CreateRawInputList( + builder, + sv_to_flatbuff_bytes(builder, pubkey), + builder.CreateVector(fbinputsvec))); + } + return builder.CreateVector(fbvec); +} + +/** + * Returns Flatbuffer vector of RawOutputs from a given map of hash buffers. + */ +const flatbuffers::Offset>> +hashbuffermap_to_flatbuf_rawoutputs(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map &map) +{ + std::vector> fbvec; + fbvec.reserve(map.size()); + for (auto const &[pubkey, buf] : map) + { + fbvec.push_back(CreateRawOutput( + builder, + sv_to_flatbuff_bytes(builder, pubkey), + CreateBytesKeyValuePair( + builder, + sv_to_flatbuff_bytes(builder, buf.hash), + sv_to_flatbuff_bytes(builder, buf.buffer)))); + } + return builder.CreateVector(fbvec); +} + } // namespace p2p \ No newline at end of file diff --git a/src/p2p/peer_message_handler.hpp b/src/p2p/peer_message_handler.hpp index 848795a4..06e39431 100644 --- a/src/p2p/peer_message_handler.hpp +++ b/src/p2p/peer_message_handler.hpp @@ -36,9 +36,17 @@ std::string_view flatbuff_bytes_to_sv(const uint8_t *data, flatbuffers::uoffset_ std::string_view flatbuff_bytes_to_sv(const flatbuffers::Vector *buffer); -const std::unordered_set flatbuf_bytearrayvector_to_stringlist(const flatbuffers::Vector> *fbvec); +const std::unordered_set +flatbuf_bytearrayvector_to_stringlist(const flatbuffers::Vector> *fbvec); -const std::unordered_map flatbuf_pairvector_to_stringmap(const flatbuffers::Vector> *fbvec); +const std::unordered_map +flatbuf_pairvector_to_stringmap(const flatbuffers::Vector> *fbvec); + +const std::unordered_map> +flatbuf_rawinputs_to_hashbuffermap(const flatbuffers::Vector> *fbvec); + +const std::unordered_map +flatbuf_rawoutputs_to_hashbuffermap(const flatbuffers::Vector> *fbvec); //---Conversion helpers from std data types to flatbuffers data types---// @@ -51,6 +59,12 @@ stringlist_to_flatbuf_bytearrayvector(flatbuffers::FlatBufferBuilder &builder, c const flatbuffers::Offset>> stringmap_to_flatbuf_bytepairvector(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map &map); +const flatbuffers::Offset>> +hashbuffermap_to_flatbuf_rawinputs(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map> &map); + +const flatbuffers::Offset>> +hashbuffermap_to_flatbuf_rawoutputs(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map &map); + } // namespace p2p #endif \ No newline at end of file diff --git a/src/proc.cpp b/src/proc.cpp index f135beff..8d2e5806 100644 --- a/src/proc.cpp +++ b/src/proc.cpp @@ -30,10 +30,10 @@ enum FDTYPE }; // Map of user pipe fds (map key: user public key) -contract_fdmap userfds; +contract_fdmap_t userfds; // Map of NPL pipe fds (map key: user public key) -contract_fdmap nplfds; +contract_fdmap_t nplfds; // Pipe fds for HP <--> messages. std::vector hpscfds; @@ -242,10 +242,9 @@ int write_contract_args(const ContractExecArgs &args) */ int write_contract_hp_inputs(const ContractExecArgs &args) { - if (create_and_write_iopipes(hpscfds, args.hpscbufs.first) != 0) // hpscbufs.first is the input buffer. + if (create_and_write_iopipes(hpscfds, args.hpscbufs.inputs) != 0) { - LOG_ERR << "Error writing HP input to SC (" << args.hpscbufs.first.length() - << " bytes)"; + LOG_ERR << "Error writing HP inputs to SC"; return -1; } return 0; @@ -259,11 +258,11 @@ int write_contract_hp_inputs(const ContractExecArgs &args) */ int read_contract_hp_outputs(const ContractExecArgs &args) { - // Clear the input buffer because we are sure the contract has finished reading from + // Clear the input buffers because we are sure the contract has finished reading from // that mapped memory portion. - args.hpscbufs.first.clear(); //bufpair.first is the input buffer. + args.hpscbufs.inputs.clear(); - if (read_iopipe(hpscfds, args.hpscbufs.second) != 0) // hpscbufs.second is the output buffer. + if (read_iopipe(hpscfds, args.hpscbufs.output) != 0) // hpscbufs.second is the output buffer. return -1; return 0; @@ -274,7 +273,7 @@ int read_contract_hp_outputs(const ContractExecArgs &args) * @param fdmap Any pubkey->fdlist map. (eg. userfds, nplfds) * @param os An output stream. */ -void fdmap_json_to_stream(const contract_fdmap &fdmap, std::ostringstream &os) +void fdmap_json_to_stream(const contract_fdmap_t &fdmap, std::ostringstream &os) { for (auto itr = fdmap.begin(); itr != fdmap.end(); itr++) { @@ -302,16 +301,16 @@ void fdmap_json_to_stream(const contract_fdmap &fdmap, std::ostringstream &os) * modified (eg. fd close, buffer clear). * * @param fdmap A map which has public key and a vector as fd list for that public key. - * @param bufmap A map which has a public key and input/output buffer pair for that public key. + * @param bufmap A map which has a public key and input/output buffer lists for that public key. * @return 0 on success. -1 on failure. */ -int write_contract_fdmap_inputs(contract_fdmap &fdmap, contract_bufmap &bufmap) +int write_contract_fdmap_inputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap) { - // Loop through input buffer for each pubkey. - for (auto &[pubkey, bufpair] : bufmap) + // Loop through input buffers for each pubkey. + for (auto &[pubkey, buflist] : bufmap) { std::vector fds = std::vector(); - if (create_and_write_iopipes(fds, bufpair.first) != 0) // bufpair.first is the input buffer. + if (create_and_write_iopipes(fds, buflist.inputs) != 0) return -1; fdmap.emplace(pubkey, std::move(fds)); @@ -328,18 +327,18 @@ int write_contract_fdmap_inputs(contract_fdmap &fdmap, contract_bufmap &bufmap) * @param bufmap A map which has a public key and input/output buffer pair for that public key. * @return 0 on success. -1 on failure. */ -int read_contract_fdmap_outputs(contract_fdmap &fdmap, contract_bufmap &bufmap) +int read_contract_fdmap_outputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap) { for (auto &[pubkey, bufpair] : bufmap) { // Clear the input buffer because we are sure the contract has finished reading from - // that mapped memory portion. - bufpair.first.clear(); //bufpair.first is the input buffer. + // the inputs' mapped memory portion. + bufpair.inputs.clear(); // Get fds for the pubkey. std::vector &fds = fdmap[pubkey]; - if (read_iopipe(fds, bufpair.second) != 0) // bufpair.second is the output buffer. + if (read_iopipe(fds, bufpair.output) != 0) // bufpair.second is the output buffer. return -1; } @@ -350,7 +349,7 @@ int read_contract_fdmap_outputs(contract_fdmap &fdmap, contract_bufmap &bufmap) * Common function to close any open fds in the map after an error. * @param fdmap Any pubkey->fdlist map. (eg. userfds, nplfds) */ -void cleanup_fdmap(contract_fdmap &fdmap) +void cleanup_fdmap(contract_fdmap_t &fdmap) { for (auto &[pubkey, fds] : fdmap) { @@ -370,7 +369,7 @@ void cleanup_fdmap(contract_fdmap &fdmap) * @param fds Vector to populate fd list. * @param inputbuffer Buffer to write into the HP write fd. */ -int create_and_write_iopipes(std::vector &fds, std::string &inputbuffer) +int create_and_write_iopipes(std::vector &fds, std::list &inputs) { int inpipe[2]; if (pipe(inpipe) != 0) @@ -392,17 +391,17 @@ int create_and_write_iopipes(std::vector &fds, std::string &inputbuffer) fds.push_back(outpipe[0]); //HPREAD fds.push_back(outpipe[1]); //SCWRITE - // Write the input (if any) into the contract and close the writefd. + // Write the inputs (if any) into the contract and close the writefd. int writefd = fds[FDTYPE::HPWRITE]; bool vmsplice_error = false; - if (!inputbuffer.empty()) + for (std::string &input : inputs) { // We use vmsplice to map (zero-copy) the input into the fd. iovec memsegs[1]; - memsegs[0].iov_base = inputbuffer.data(); - memsegs[0].iov_len = inputbuffer.length(); + memsegs[0].iov_base = input.data(); + memsegs[0].iov_len = input.length(); if (vmsplice(writefd, memsegs, 1, 0) == -1) vmsplice_error = true; @@ -421,15 +420,15 @@ int create_and_write_iopipes(std::vector &fds, std::string &inputbuffer) } /** - * Common function to read and close SC output from the pipe and populate a given buffer. + * Common function to read and close SC output from the pipe and populate the output list. * @param fds Vector representing the pipes fd list. - * @param The buffer to place the read output. + * @param output The buffer to place the read output. */ -int read_iopipe(std::vector &fds, std::string &outputbuffer) +int read_iopipe(std::vector &fds, std::string &output) { - // Read any outputs that have been written by the contract process - // from the HP outpipe and store in the outbuffer. - // outbuffer will be read by the consensus process later when it wishes so. + // Read any data that have been written by the contract process + // from the output pipe and store in the output buffer. + // Outputs will be read by the consensus process later when it wishes so. int readfd = fds[FDTYPE::HPREAD]; int bytes_available = 0; @@ -438,12 +437,12 @@ int read_iopipe(std::vector &fds, std::string &outputbuffer) if (bytes_available > 0) { - outputbuffer.resize(bytes_available); // args.hpscbufs.second is the output buffer. + output.resize(bytes_available); // Populate the user output buffer with new data from the pipe. // We use vmsplice to map (zero-copy) the output from the fd into output bbuffer. iovec memsegs[1]; - memsegs[0].iov_base = outputbuffer.data(); + memsegs[0].iov_base = output.data(); memsegs[0].iov_len = bytes_available; if (vmsplice(readfd, memsegs, 1, 0) == -1) diff --git a/src/proc.hpp b/src/proc.hpp index d1a6644d..49d3a5e2 100644 --- a/src/proc.hpp +++ b/src/proc.hpp @@ -5,6 +5,7 @@ #include #include #include +#include #include "usr/usr.hpp" #include "util.hpp" @@ -14,13 +15,26 @@ namespace proc { +/** + * Represents list of inputs to the contract and the accumilated contract output for those inputs. + */ +struct contract_iobuf_pair +{ + // List of inputs to be fed into the contract. + std::list inputs; + + // Output emitted by contract after execution. (Because we are reading output at the end, there's no way to + // get a "list" of outputs. So it's always a one contingous output.) + std::string output; +}; + // Common typedef for a map of pubkey->fdlist. // This is used to keep track of fdlist quadruplet with a public key (eg. user, npl). -typedef std::unordered_map> contract_fdmap; +typedef std::unordered_map> contract_fdmap_t; -// Common typedef for a map of pubkey->buf-pair (input buffer and output buffer). -// This is used to keep track of input/output buffer pair with a public key (eg. user, npl) -typedef std::unordered_map> contract_bufmap; +// Common typedef for a map of pubkey->I/O list pair (input list and output list). +// This is used to keep track of input/output buffers for a given public key (eg. user, npl) +typedef std::unordered_map contract_bufmap_t; /** * Holds information that should be passed into the contract process. @@ -28,25 +42,25 @@ typedef std::unordered_map> con struct ContractExecArgs { // Map of user I/O buffers (map key: user binary public key). - // The value is a pair holding consensus-verified input and contract-generated output. - contract_bufmap &userbufs; + // The value is a pair holding consensus-verified inputs and contract-generated outputs. + contract_bufmap_t &userbufs; // Map of NPL I/O buffers (map key: Peer binary public key). - // The value is a pair holding NPL input and contract-generated output. - contract_bufmap &nplbufs; + // The value is a pair holding NPL inputs and contract-generated outputs. + contract_bufmap_t &nplbufs; // Pair of HP<->SC JSON message buffers (mainly used for control messages). - // Input buffer for HP->SC messages, Output buffer for SC->HP messages. - std::pair &hpscbufs; + // Input buffers for HP->SC messages, Output buffers for SC->HP messages. + contract_iobuf_pair &hpscbufs; // Current HotPocket timestamp. int64_t timestamp; ContractExecArgs( int64_t _timestamp, - contract_bufmap &_userbufs, - contract_bufmap &_nplbufs, - std::pair &_hpscbufs) : + contract_bufmap_t &_userbufs, + contract_bufmap_t &_nplbufs, + contract_iobuf_pair &_hpscbufs) : userbufs(_userbufs), nplbufs(_nplbufs), hpscbufs(_hpscbufs) @@ -69,17 +83,17 @@ int read_contract_hp_outputs(const ContractExecArgs &args); // Common helper functions -void fdmap_json_to_stream(const contract_fdmap &fdmap, std::ostringstream &os); +void fdmap_json_to_stream(const contract_fdmap_t &fdmap, std::ostringstream &os); -int write_contract_fdmap_inputs(contract_fdmap &fdmap, contract_bufmap &bufmap); +int write_contract_fdmap_inputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap); -int read_contract_fdmap_outputs(contract_fdmap &fdmap, contract_bufmap &bufmap); +int read_contract_fdmap_outputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap); -void cleanup_fdmap(contract_fdmap &fdmap); +void cleanup_fdmap(contract_fdmap_t &fdmap); -int create_and_write_iopipes(std::vector &fds, std::string &inputbuffer); +int create_and_write_iopipes(std::vector &fds, std::list &inputs); -int read_iopipe(std::vector &fds, std::string &outputbuffer); +int read_iopipe(std::vector &fds, std::string &output); void close_unused_fds(bool is_hp); diff --git a/src/usr/user_session_handler.cpp b/src/usr/user_session_handler.cpp index 908b4d56..a2208d54 100644 --- a/src/usr/user_session_handler.cpp +++ b/src/usr/user_session_handler.cpp @@ -48,7 +48,7 @@ void user_session_handler::on_connect(sock::socket_sessioninit_uniqueid(); // Create an entry in pending_challenges for later tracking upon challenge response. - usr::pending_challenges[session->uniqueid] = challengehex; + usr::pending_challenges.try_emplace(session->uniqueid, challengehex); user_outbound_message outmsg(std::move(msgstr)); session->send(std::move(outmsg)); @@ -95,7 +95,7 @@ void user_session_handler::on_message( session->flags.reset(util::SESSION_FLAG::USER_CHALLENGE_ISSUED); // Clear challenge-issued flag session->flags.set(util::SESSION_FLAG::USER_AUTHED); // Set the user-authed flag - usr::add_user(session, userpubkey); // Add the user to the global authed user list + usr::add_user(session, userpubkey); // Add the user to the global authed user list usr::pending_challenges.erase(session->uniqueid); // Remove the stored challenge LOG_INFO << "User connection " << session->uniqueid << " authenticated. Public key " @@ -125,10 +125,13 @@ void user_session_handler::on_message( // This is an authed user. usr::connected_user &user = itr->second; - //Append the bytes into connected user input buffer. - user.inbuffer.append(message); + { + std::lock_guard lock(users_mutex); + //Add to the hashed input buffer list. + user.inputs.push_back(util::hash_buffer(message, user.pubkey)); + } - LOG_DBG << "Collected " << user.inbuffer.length() << " bytes from user"; + LOG_DBG << "Collected " << message.length() << " bytes from user"; return; } } diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index aaef8f81..96ab5cb7 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -31,13 +31,13 @@ std::mutex users_mutex; // Mutex for users access race conditions. * This is used for pubkey duplicate checks as well. * Map key: User binary pubkey */ -std::unordered_map sessionids; +std::unordered_map sessionids; /** * Keep track of verification-pending challenges issued to newly connected users. * Map key: User socket session id () */ -std::unordered_map pending_challenges; +std::unordered_map pending_challenges; /** * User session handler instance. This instance's methods will be fired for any user socket activity. @@ -152,7 +152,10 @@ int verify_user_challenge_response(std::string &extracted_pubkeyhex, std::string { // We load response raw bytes into json document. rapidjson::Document d; - d.Parse(response.data()); + + // Because we project the response message directly from the binary socket buffer in a zero-copy manner, the response + // string is not null terminated. 'kParseStopWhenDoneFlag' avoids rapidjson error in this case. + d.Parse(response.data()); if (d.HasParseError()) { LOG_INFO << "Challenge response json parsing failed."; @@ -226,7 +229,7 @@ int add_user(sock::socket_session *session, const std::st } // Populate sessionid map so we can lookup by user pubkey. - sessionids[pubkey] = sessionid; + sessionids.try_emplace(pubkey, sessionid); return 0; } @@ -254,7 +257,7 @@ int remove_user(const std::string &sessionid) std::lock_guard lock(users_mutex); sessionids.erase(user.pubkey); } - + users.erase(itr); return 0; } @@ -268,7 +271,6 @@ void start_listening() auto address = net::ip::make_address(conf::cfg.listenip); sess_opts.max_message_size = conf::cfg.pubmaxsize; - std::make_shared>( ioc, ctx, diff --git a/src/usr/usr.hpp b/src/usr/usr.hpp index e63e7402..946b0dd5 100644 --- a/src/usr/usr.hpp +++ b/src/usr/usr.hpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include "../util.hpp" #include "../sock/socket_session.hpp" @@ -22,10 +23,10 @@ namespace usr struct connected_user { // User binary public key - std::string pubkey; + const std::string pubkey; - // Holds the unprocessed user input collected from websocket. - std::string inbuffer; + // Holds the unprocessed user inputs (and the hashes) collected from websocket. + std::list inputs; // Holds the websocket session of this user. // We don't need to own the session object since the lifetime of user and session are coupled. @@ -35,9 +36,9 @@ struct connected_user * @param _pubkey The public key of the user in binary format. */ connected_user(sock::socket_session *_session, std::string_view _pubkey) + : pubkey(_pubkey) { session = _session; - pubkey = _pubkey; } }; @@ -52,12 +53,12 @@ extern std::mutex users_mutex; // Mutex for users access race conditions. * Keep track of verification-pending challenges issued to newly connected users. * Map key: User socket session id () */ -extern std::unordered_map sessionids; +extern std::unordered_map sessionids; /** * Keep track of verification-pending challenges issued to newly connected users. */ -extern std::unordered_map pending_challenges; +extern std::unordered_map pending_challenges; int init(); diff --git a/src/util.hpp b/src/util.hpp index 0bf87b51..1cafb567 100644 --- a/src/util.hpp +++ b/src/util.hpp @@ -4,6 +4,7 @@ #include #include #include +#include "crypto.hpp" /** * Contains helper functions and data structures used by multiple other subsystems. @@ -46,6 +47,35 @@ int version_compare(const std::string &x, const std::string &y); std::string_view getsv(const rapidjson::Value &v); +/** + * Represents a data buffer which calculates the hash of the buffer. + */ +struct hash_buffer +{ + std::string hash; + std::string buffer; + + hash_buffer(std::string_view data) + { + buffer = data; + } + + hash_buffer(std::string_view data, std::string_view hashprefix) + { + buffer = data; + + std::string timestr = std::to_string(get_epoch_milliseconds()); + + std::string stringtohash; + stringtohash.reserve(hashprefix.length() + buffer.length() + timestr.length()); + stringtohash.append(hashprefix); + stringtohash.append(buffer); + stringtohash.append(timestr); + + hash = crypto::sha_512_hash(stringtohash); + } +}; + } // namespace util #endif \ No newline at end of file