From 5c349dfa9575770b282c5dd0ddb781d362ceab67 Mon Sep 17 00:00:00 2001 From: Ravin Perera <33562092+ravinsp@users.noreply.github.com> Date: Sat, 26 Feb 2022 07:48:02 +0530 Subject: [PATCH] Read requests synchronous replies. (#361) --- .vscode/settings.json | 65 ++++++++++++++++++++++++++++ examples/c_contract/echo_contract.c | 6 +-- examples/js_client/package-lock.json | 48 ++++++++------------ examples/js_client/package.json | 2 +- examples/js_client/text-client.js | 8 +--- src/msg/bson/usrmsg_bson.cpp | 15 ++++++- src/msg/bson/usrmsg_bson.hpp | 4 +- src/msg/json/usrmsg_json.cpp | 19 +++++--- src/msg/json/usrmsg_json.hpp | 4 +- src/msg/usrmsg_parser.cpp | 12 ++--- src/msg/usrmsg_parser.hpp | 4 +- src/usr/read_req.cpp | 6 ++- src/usr/read_req.hpp | 3 +- src/usr/usr.cpp | 6 +-- 14 files changed, 136 insertions(+), 66 deletions(-) create mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..1e6904b6 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,65 @@ +{ + "files.associations": { + "cctype": "cpp", + "clocale": "cpp", + "cmath": "cpp", + "cstdarg": "cpp", + "cstddef": "cpp", + "cstdio": "cpp", + "cstdlib": "cpp", + "cstring": "cpp", + "ctime": "cpp", + "cwchar": "cpp", + "cwctype": "cpp", + "array": "cpp", + "atomic": "cpp", + "bit": "cpp", + "*.tcc": "cpp", + "bitset": "cpp", + "chrono": "cpp", + "cinttypes": "cpp", + "condition_variable": "cpp", + "cstdint": "cpp", + "deque": "cpp", + "list": "cpp", + "map": "cpp", + "set": "cpp", + "unordered_map": "cpp", + "unordered_set": "cpp", + "vector": "cpp", + "exception": "cpp", + "algorithm": "cpp", + "functional": "cpp", + "iterator": "cpp", + "memory": "cpp", + "memory_resource": "cpp", + "numeric": "cpp", + "optional": "cpp", + "random": "cpp", + "ratio": "cpp", + "string": "cpp", + "string_view": "cpp", + "system_error": "cpp", + "tuple": "cpp", + "type_traits": "cpp", + "utility": "cpp", + "fstream": "cpp", + "initializer_list": "cpp", + "iomanip": "cpp", + "iosfwd": "cpp", + "iostream": "cpp", + "istream": "cpp", + "limits": "cpp", + "mutex": "cpp", + "new": "cpp", + "ostream": "cpp", + "shared_mutex": "cpp", + "sstream": "cpp", + "stdexcept": "cpp", + "streambuf": "cpp", + "thread": "cpp", + "typeinfo": "cpp", + "valarray": "cpp", + "variant": "cpp" + } +} \ No newline at end of file diff --git a/examples/c_contract/echo_contract.c b/examples/c_contract/echo_contract.c index 2448c23e..b79a5f49 100644 --- a/examples/c_contract/echo_contract.c +++ b/examples/c_contract/echo_contract.c @@ -23,18 +23,18 @@ int main(int argc, char **argv) const void *input_mmap = hp_init_user_input_mmap(); // Iterate through all users. - for (int u = 0; u < ctx->users.count; u++) + for (size_t u = 0; u < ctx->users.count; u++) { const struct hp_user *user = &ctx->users.list[u]; // Iterate through all inputs from this user. - for (int i = 0; i < user->inputs.count; i++) + for (size_t i = 0; i < user->inputs.count; i++) { const struct hp_user_input input = user->inputs.list[i]; // Instead of mmap, we can also read the inputs from 'ctx->users.in_fd' using file I/O. // However, using mmap is recommended because user inputs already reside in memory. - const void *buf = input_mmap + input.offset; + const uint8_t *buf = (uint8_t *)input_mmap + input.offset; process_user_message(user, buf, input.size); } diff --git a/examples/js_client/package-lock.json b/examples/js_client/package-lock.json index ce553e59..a96de439 100644 --- a/examples/js_client/package-lock.json +++ b/examples/js_client/package-lock.json @@ -30,37 +30,14 @@ } }, "hotpocket-js-client": { - "version": "0.5.1", - "resolved": "https://registry.npmjs.org/hotpocket-js-client/-/hotpocket-js-client-0.5.1.tgz", - "integrity": "sha512-l11Fh6+Qia3gzk/latGf1ePoemIp/XO6L73mvo3b3XEyEF93Ew6+DxQzCsCYmXWEsSp5WgYsma5NOPvVMypDEw==", + "version": "0.5.2", + "resolved": "https://registry.npmjs.org/hotpocket-js-client/-/hotpocket-js-client-0.5.2.tgz", + "integrity": "sha512-WjixO8nmpAvrPdPVPT5LbMVNw4MBSD8iLjVWW4YvKVPy9hxZqFzORFZz6qD+Asx+6KQMncMsCxkXJGJ2QxKCOA==", "requires": { "blake3": "2.1.4", "bson": "4.5.3", "libsodium-wrappers": "0.7.9", "ws": "8.2.3" - }, - "dependencies": { - "bson": { - "version": "4.5.3", - "resolved": "https://registry.npmjs.org/bson/-/bson-4.5.3.tgz", - "integrity": "sha512-qVX7LX79Mtj7B3NPLzCfBiCP6RAsjiV8N63DjlaVVpZW+PFoDTxQ4SeDbSpcqgE6mXksM5CAwZnXxxxn/XwC0g==", - "requires": { - "buffer": "^5.6.0" - } - }, - "libsodium-wrappers": { - "version": "0.7.9", - "resolved": "https://registry.npmjs.org/libsodium-wrappers/-/libsodium-wrappers-0.7.9.tgz", - "integrity": "sha512-9HaAeBGk1nKTRFRHkt7nzxqCvnkWTjn1pdjKgcUnZxj0FyOP4CnhgFhMdrFfgNsukijBGyBLpP2m2uKT1vuWhQ==", - "requires": { - "libsodium": "^0.7.0" - } - }, - "ws": { - "version": "8.2.3", - "resolved": "https://registry.npmjs.org/ws/-/ws-8.2.3.tgz", - "integrity": "sha512-wBuoj1BDpC6ZQ1B7DWQBYVLphPWkm8i9Y0/3YdHjHKHiohOJ1ws+3OccDWtH+PoC9DZD5WOTrJvNbWvjS6JWaA==" - } } }, "ieee754": { @@ -69,9 +46,22 @@ "integrity": "sha512-dcyqhDvX1C46lXZcVqCpK+FtMRQVdIMN6/Df5js2zouUsqG7I6sFxitIC+7KYK29KdXOLHdu9zL4sFnoVQnqaA==" }, "libsodium": { - "version": "0.7.6", - "resolved": "https://registry.npmjs.org/libsodium/-/libsodium-0.7.6.tgz", - "integrity": "sha512-hPb/04sEuLcTRdWDtd+xH3RXBihpmbPCsKW/Jtf4PsvdyKh+D6z2D2gvp/5BfoxseP+0FCOg66kE+0oGUE/loQ==" + "version": "0.7.9", + "resolved": "https://registry.npmjs.org/libsodium/-/libsodium-0.7.9.tgz", + "integrity": "sha512-gfeADtR4D/CM0oRUviKBViMGXZDgnFdMKMzHsvBdqLBHd9ySi6EtYnmuhHVDDYgYpAO8eU8hEY+F8vIUAPh08A==" + }, + "libsodium-wrappers": { + "version": "0.7.9", + "resolved": "https://registry.npmjs.org/libsodium-wrappers/-/libsodium-wrappers-0.7.9.tgz", + "integrity": "sha512-9HaAeBGk1nKTRFRHkt7nzxqCvnkWTjn1pdjKgcUnZxj0FyOP4CnhgFhMdrFfgNsukijBGyBLpP2m2uKT1vuWhQ==", + "requires": { + "libsodium": "^0.7.0" + } + }, + "ws": { + "version": "8.2.3", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.2.3.tgz", + "integrity": "sha512-wBuoj1BDpC6ZQ1B7DWQBYVLphPWkm8i9Y0/3YdHjHKHiohOJ1ws+3OccDWtH+PoC9DZD5WOTrJvNbWvjS6JWaA==" } } } diff --git a/examples/js_client/package.json b/examples/js_client/package.json index 09225110..94a6045d 100644 --- a/examples/js_client/package.json +++ b/examples/js_client/package.json @@ -1,6 +1,6 @@ { "dependencies": { - "hotpocket-js-client": "0.5.1", + "hotpocket-js-client": "0.5.2", "bson": "4.5.3" } } diff --git a/examples/js_client/text-client.js b/examples/js_client/text-client.js index b707e981..1c3a61d4 100644 --- a/examples/js_client/text-client.js +++ b/examples/js_client/text-client.js @@ -60,12 +60,6 @@ async function main() { }); }) - // This will get fired when contract sends a read response. - hpc.on(HotPocket.events.contractReadResponse, (o) => { - const outputLog = o.length <= 512 ? o : `[Big output (${o.length / 1024} KB)]`; - console.log("Read response>> " + outputLog); - }) - // This will get fired when the unl public key list changes. hpc.on(HotPocket.events.unlChange, (unl) => { console.log("New unl received:"); @@ -114,7 +108,7 @@ async function main() { if (inp.length > 0) { if (inp.startsWith("read ")) { - hpc.sendContractReadRequest(inp.substr(5)); + hpc.submitContractReadRequest(inp.substr(5)).then(reply => console.log(reply)); } else if (inp.startsWith("ledger ")) { hpc.getLedgerBySeqNo(parseInt(inp.substr(7)), true, true) diff --git a/src/msg/bson/usrmsg_bson.cpp b/src/msg/bson/usrmsg_bson.cpp index d563194b..2300886c 100644 --- a/src/msg/bson/usrmsg_bson.cpp +++ b/src/msg/bson/usrmsg_bson.cpp @@ -167,16 +167,19 @@ namespace msg::usrmsg::bson * Message format: * { * "type": "contract_read_response", + * "reply_for": "", * "content": * } * @param content The contract binary output content to be put in the message. */ - void create_contract_read_response_container(std::vector &msg, std::string_view content) + void create_contract_read_response_container(std::vector &msg, std::string_view reply_for, std::string_view content) { jsoncons::bson::bson_bytes_encoder encoder(msg); encoder.begin_object(); encoder.key(msg::usrmsg::FLD_TYPE); encoder.string_value(msg::usrmsg::MSGTYPE_CONTRACT_READ_RESPONSE); + encoder.key(msg::usrmsg::FLD_REPLY_FOR); + encoder.string_value(reply_for); encoder.key(msg::usrmsg::FLD_CONTENT); encoder.byte_string_value(content); encoder.end_object(); @@ -469,18 +472,26 @@ namespace msg::usrmsg::bson * Accepted signed input container format: * { * "type": "contract_read_request", + * "id": "", * "content": * } * @return 0 on successful extraction. -1 for failure. */ - int extract_read_request(std::string &extracted_content, const jsoncons::ojson &d) + int extract_read_request(std::string &extracted_id, std::string &extracted_content, const jsoncons::ojson &d) { + if (!d.contains(msg::usrmsg::FLD_ID) || !d[msg::usrmsg::FLD_ID].is()) + { + LOG_DEBUG << "Read request 'id' field missing or invalid."; + return -1; + } + if (!d.contains(msg::usrmsg::FLD_CONTENT) || !d[msg::usrmsg::FLD_CONTENT].is_byte_string_view()) { LOG_DEBUG << "Read request 'content' field missing or invalid."; return -1; } + extracted_id = d[msg::usrmsg::FLD_ID].as(); const jsoncons::byte_string_view &bsv = d[msg::usrmsg::FLD_CONTENT].as_byte_string_view(); extracted_content = std::string_view(reinterpret_cast(bsv.data()), bsv.size()); return 0; diff --git a/src/msg/bson/usrmsg_bson.hpp b/src/msg/bson/usrmsg_bson.hpp index 66317228..e5ac95dd 100644 --- a/src/msg/bson/usrmsg_bson.hpp +++ b/src/msg/bson/usrmsg_bson.hpp @@ -17,7 +17,7 @@ namespace msg::usrmsg::bson void create_contract_input_status(std::vector &msg, std::string_view status, std::string_view reason, std::string_view input_hash, const uint64_t ledger_seq_no, const util::h32 &ledger_hash); - void create_contract_read_response_container(std::vector &msg, std::string_view content); + void create_contract_read_response_container(std::vector &msg, std::string_view reply_for, std::string_view content); void create_contract_output_container(std::vector &msg, std::string_view hash, const ::std::vector &outputs, const util::merkle_hash_node &hash_root, const std::vector> &unl_sig, @@ -41,7 +41,7 @@ namespace msg::usrmsg::bson int extract_type(std::string &extracted_type, const jsoncons::ojson &d); - int extract_read_request(std::string &extracted_content, const jsoncons::ojson &d); + int extract_read_request(std::string &extracted_id, std::string &extracted_content, const jsoncons::ojson &d); int extract_signed_input_container(std::string &extracted_input_container, std::string &extracted_sig, const jsoncons::ojson &d); diff --git a/src/msg/json/usrmsg_json.cpp b/src/msg/json/usrmsg_json.cpp index c0a18829..069854c9 100644 --- a/src/msg/json/usrmsg_json.cpp +++ b/src/msg/json/usrmsg_json.cpp @@ -331,11 +331,12 @@ namespace msg::usrmsg::json * Message format: * { * "type": "contract_read_response", + * "reply_for": "", * "content": "" * } * @param content The contract binary output content to be put in the message. */ - void create_contract_read_response_container(std::vector &msg, std::string_view content) + void create_contract_read_response_container(std::vector &msg, std::string_view reply_for, std::string_view content) { msg.reserve(content.size() + 256); msg += "{\""; @@ -343,6 +344,10 @@ namespace msg::usrmsg::json msg += SEP_COLON; msg += msg::usrmsg::MSGTYPE_CONTRACT_READ_RESPONSE; msg += SEP_COMMA; + msg += msg::usrmsg::FLD_REPLY_FOR; + msg += SEP_COLON; + msg += reply_for; + msg += SEP_COMMA; msg += msg::usrmsg::FLD_CONTENT; msg += SEP_COLON_NOQUOTE; @@ -843,24 +848,26 @@ namespace msg::usrmsg::json * Accepted signed input container format: * { * "type": "contract_read_request", + * "id": "", * "content": "" * } * @return 0 on successful extraction. -1 for failure. */ - int extract_read_request(std::string &extracted_content, const jsoncons::json &d) + int extract_read_request(std::string &extracted_id, std::string &extracted_content, const jsoncons::json &d) { - if (!d.contains(msg::usrmsg::FLD_CONTENT)) + if (!d.contains(msg::usrmsg::FLD_ID) || !d[msg::usrmsg::FLD_ID].is()) { - LOG_DEBUG << "Read request required fields missing."; + LOG_DEBUG << "Read request 'id' field missing or invalid."; return -1; } - if (!d[msg::usrmsg::FLD_CONTENT].is()) + if (!d.contains(msg::usrmsg::FLD_CONTENT) || !d[msg::usrmsg::FLD_CONTENT].is()) { - LOG_DEBUG << "Read request invalid field values."; + LOG_DEBUG << "Read request 'content' field missing or invalid."; return -1; } + extracted_id = d[msg::usrmsg::FLD_ID].as(); extracted_content = d[msg::usrmsg::FLD_CONTENT].as(); return 0; } diff --git a/src/msg/json/usrmsg_json.hpp b/src/msg/json/usrmsg_json.hpp index 06eb2ae5..f959b8be 100644 --- a/src/msg/json/usrmsg_json.hpp +++ b/src/msg/json/usrmsg_json.hpp @@ -21,7 +21,7 @@ namespace msg::usrmsg::json void create_contract_input_status(std::vector &msg, std::string_view status, std::string_view reason, std::string_view input_hash, const uint64_t ledger_seq_no, const util::h32 &ledger_hash); - void create_contract_read_response_container(std::vector &msg, std::string_view content); + void create_contract_read_response_container(std::vector &msg, std::string_view reply_for, std::string_view content); void create_contract_output_container(std::vector &msg, std::string_view hash, const ::std::vector &outputs, const util::merkle_hash_node &hash_root, const std::vector> &unl_sig, @@ -45,7 +45,7 @@ namespace msg::usrmsg::json int extract_type(std::string &extracted_type, const jsoncons::json &d); - int extract_read_request(std::string &extracted_content, const jsoncons::json &d); + int extract_read_request(std::string &extracted_id, std::string &extracted_content, const jsoncons::json &d); int extract_signed_input_container(std::string &extracted_input_container, std::string &extracted_sig, const jsoncons::json &d); diff --git a/src/msg/usrmsg_parser.cpp b/src/msg/usrmsg_parser.cpp index ba85be28..48a0b8ba 100644 --- a/src/msg/usrmsg_parser.cpp +++ b/src/msg/usrmsg_parser.cpp @@ -38,12 +38,12 @@ namespace msg::usrmsg busrmsg::create_contract_input_status(msg, status, reason, input_hash, ledger_seq_no, ledger_hash); } - void usrmsg_parser::create_contract_read_response_container(std::vector &msg, std::string_view content) const + void usrmsg_parser::create_contract_read_response_container(std::vector &msg, std::string_view reply_for, std::string_view content) const { if (protocol == util::PROTOCOL::JSON) - jusrmsg::create_contract_read_response_container(msg, content); + jusrmsg::create_contract_read_response_container(msg, reply_for, content); else - busrmsg::create_contract_read_response_container(msg, content); + busrmsg::create_contract_read_response_container(msg, reply_for, content); } void usrmsg_parser::create_contract_output_container(std::vector &msg, std::string_view hash, const ::std::vector &outputs, @@ -113,12 +113,12 @@ namespace msg::usrmsg return busrmsg::extract_type(extracted_type, bdoc); } - int usrmsg_parser::extract_read_request(std::string &extracted_content) const + int usrmsg_parser::extract_read_request(std::string &extracted_id, std::string &extracted_content) const { if (protocol == util::PROTOCOL::JSON) - return jusrmsg::extract_read_request(extracted_content, jdoc); + return jusrmsg::extract_read_request(extracted_id, extracted_content, jdoc); else - return busrmsg::extract_read_request(extracted_content, bdoc); + return busrmsg::extract_read_request(extracted_id, extracted_content, bdoc); } int usrmsg_parser::extract_signed_input_container(std::string &extracted_input_container, std::string &extracted_sig) const diff --git a/src/msg/usrmsg_parser.hpp b/src/msg/usrmsg_parser.hpp index 0535e4f4..51ca0af9 100644 --- a/src/msg/usrmsg_parser.hpp +++ b/src/msg/usrmsg_parser.hpp @@ -26,7 +26,7 @@ namespace msg::usrmsg void create_contract_input_status(std::vector &msg, std::string_view status, std::string_view reason, std::string_view input_hash, const uint64_t ledger_seq_no, const util::h32 &ledger_hash) const; - void create_contract_read_response_container(std::vector &msg, std::string_view content) const; + void create_contract_read_response_container(std::vector &msg, std::string_view reply_for, std::string_view content) const; void create_contract_output_container(std::vector &msg, std::string_view hash, const ::std::vector &outputs, const util::merkle_hash_node &hash_root, const std::vector> &unl_sig, @@ -47,7 +47,7 @@ namespace msg::usrmsg int extract_type(std::string &extracted_type) const; - int extract_read_request(std::string &extracted_content) const; + int extract_read_request(std::string &extracted_id, std::string &extracted_content) const; int extract_signed_input_container(std::string &extracted_input_container, std::string &extracted_sig) const; diff --git a/src/usr/read_req.cpp b/src/usr/read_req.cpp index 8045f73b..9bd93867 100644 --- a/src/usr/read_req.cpp +++ b/src/usr/read_req.cpp @@ -157,7 +157,7 @@ namespace read_req for (sc::contract_output &output : user_buf_itr->second.outputs) { std::vector msg; - parser.create_contract_read_response_container(msg, output.message); + parser.create_contract_read_response_container(msg, read_request.id, output.message); user.session.send(msg); output.message.clear(); } @@ -193,12 +193,14 @@ namespace read_req /** * Add new read request from users to the read request queue for processing. * @param pubkey Public key of the user. + * @param id Message id (used to associate replies). * @param content Message content. * @return 0 on successful addition and -1 on queue overflow */ - int populate_read_req_queue(const std::string &pubkey, const std::string &content) + int populate_read_req_queue(const std::string &pubkey, const std::string &id, const std::string &content) { user_read_req read_request; + read_request.id = id; read_request.content = read_req_store.write_buf(content.data(), content.size()); read_request.pubkey = pubkey; diff --git a/src/usr/read_req.hpp b/src/usr/read_req.hpp index 9721be46..16032fed 100644 --- a/src/usr/read_req.hpp +++ b/src/usr/read_req.hpp @@ -9,6 +9,7 @@ namespace read_req struct user_read_req { std::string pubkey; + std::string id; util::buffer_view content; }; @@ -20,7 +21,7 @@ namespace read_req void read_request_processor(); - int populate_read_req_queue(const std::string &pubkey, const std::string &content); + int populate_read_req_queue(const std::string &pubkey, const std::string &id, const std::string &content); void initialize_execution_context(const user_read_req &read_request, const pthread_t thread_id, sc::execution_context &contract_ctx); diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index d47fd6ba..91bd384a 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -148,10 +148,10 @@ namespace usr if (!conf::cfg.contract.execute || conf::cfg.user.concurrent_read_requests == 0) return 0; - std::string content; - if (parser.extract_read_request(content) != -1) + std::string id, content; + if (parser.extract_read_request(id, content) != -1) { - if (read_req::populate_read_req_queue(user.pubkey, std::move(content)) == -1) + if (read_req::populate_read_req_queue(user.pubkey, std::move(id), std::move(content)) == -1) { LOG_WARNING << "Failed to enqueue read request."; }