Read requests synchronous replies. (#361)

This commit is contained in:
Ravin Perera
2022-02-26 07:48:02 +05:30
committed by GitHub
parent d5f0c1e664
commit 5c349dfa95
14 changed files with 136 additions and 66 deletions

65
.vscode/settings.json vendored Normal file
View File

@@ -0,0 +1,65 @@
{
"files.associations": {
"cctype": "cpp",
"clocale": "cpp",
"cmath": "cpp",
"cstdarg": "cpp",
"cstddef": "cpp",
"cstdio": "cpp",
"cstdlib": "cpp",
"cstring": "cpp",
"ctime": "cpp",
"cwchar": "cpp",
"cwctype": "cpp",
"array": "cpp",
"atomic": "cpp",
"bit": "cpp",
"*.tcc": "cpp",
"bitset": "cpp",
"chrono": "cpp",
"cinttypes": "cpp",
"condition_variable": "cpp",
"cstdint": "cpp",
"deque": "cpp",
"list": "cpp",
"map": "cpp",
"set": "cpp",
"unordered_map": "cpp",
"unordered_set": "cpp",
"vector": "cpp",
"exception": "cpp",
"algorithm": "cpp",
"functional": "cpp",
"iterator": "cpp",
"memory": "cpp",
"memory_resource": "cpp",
"numeric": "cpp",
"optional": "cpp",
"random": "cpp",
"ratio": "cpp",
"string": "cpp",
"string_view": "cpp",
"system_error": "cpp",
"tuple": "cpp",
"type_traits": "cpp",
"utility": "cpp",
"fstream": "cpp",
"initializer_list": "cpp",
"iomanip": "cpp",
"iosfwd": "cpp",
"iostream": "cpp",
"istream": "cpp",
"limits": "cpp",
"mutex": "cpp",
"new": "cpp",
"ostream": "cpp",
"shared_mutex": "cpp",
"sstream": "cpp",
"stdexcept": "cpp",
"streambuf": "cpp",
"thread": "cpp",
"typeinfo": "cpp",
"valarray": "cpp",
"variant": "cpp"
}
}

View File

@@ -23,18 +23,18 @@ int main(int argc, char **argv)
const void *input_mmap = hp_init_user_input_mmap();
// Iterate through all users.
for (int u = 0; u < ctx->users.count; u++)
for (size_t u = 0; u < ctx->users.count; u++)
{
const struct hp_user *user = &ctx->users.list[u];
// Iterate through all inputs from this user.
for (int i = 0; i < user->inputs.count; i++)
for (size_t i = 0; i < user->inputs.count; i++)
{
const struct hp_user_input input = user->inputs.list[i];
// Instead of mmap, we can also read the inputs from 'ctx->users.in_fd' using file I/O.
// However, using mmap is recommended because user inputs already reside in memory.
const void *buf = input_mmap + input.offset;
const uint8_t *buf = (uint8_t *)input_mmap + input.offset;
process_user_message(user, buf, input.size);
}

View File

@@ -30,37 +30,14 @@
}
},
"hotpocket-js-client": {
"version": "0.5.1",
"resolved": "https://registry.npmjs.org/hotpocket-js-client/-/hotpocket-js-client-0.5.1.tgz",
"integrity": "sha512-l11Fh6+Qia3gzk/latGf1ePoemIp/XO6L73mvo3b3XEyEF93Ew6+DxQzCsCYmXWEsSp5WgYsma5NOPvVMypDEw==",
"version": "0.5.2",
"resolved": "https://registry.npmjs.org/hotpocket-js-client/-/hotpocket-js-client-0.5.2.tgz",
"integrity": "sha512-WjixO8nmpAvrPdPVPT5LbMVNw4MBSD8iLjVWW4YvKVPy9hxZqFzORFZz6qD+Asx+6KQMncMsCxkXJGJ2QxKCOA==",
"requires": {
"blake3": "2.1.4",
"bson": "4.5.3",
"libsodium-wrappers": "0.7.9",
"ws": "8.2.3"
},
"dependencies": {
"bson": {
"version": "4.5.3",
"resolved": "https://registry.npmjs.org/bson/-/bson-4.5.3.tgz",
"integrity": "sha512-qVX7LX79Mtj7B3NPLzCfBiCP6RAsjiV8N63DjlaVVpZW+PFoDTxQ4SeDbSpcqgE6mXksM5CAwZnXxxxn/XwC0g==",
"requires": {
"buffer": "^5.6.0"
}
},
"libsodium-wrappers": {
"version": "0.7.9",
"resolved": "https://registry.npmjs.org/libsodium-wrappers/-/libsodium-wrappers-0.7.9.tgz",
"integrity": "sha512-9HaAeBGk1nKTRFRHkt7nzxqCvnkWTjn1pdjKgcUnZxj0FyOP4CnhgFhMdrFfgNsukijBGyBLpP2m2uKT1vuWhQ==",
"requires": {
"libsodium": "^0.7.0"
}
},
"ws": {
"version": "8.2.3",
"resolved": "https://registry.npmjs.org/ws/-/ws-8.2.3.tgz",
"integrity": "sha512-wBuoj1BDpC6ZQ1B7DWQBYVLphPWkm8i9Y0/3YdHjHKHiohOJ1ws+3OccDWtH+PoC9DZD5WOTrJvNbWvjS6JWaA=="
}
}
},
"ieee754": {
@@ -69,9 +46,22 @@
"integrity": "sha512-dcyqhDvX1C46lXZcVqCpK+FtMRQVdIMN6/Df5js2zouUsqG7I6sFxitIC+7KYK29KdXOLHdu9zL4sFnoVQnqaA=="
},
"libsodium": {
"version": "0.7.6",
"resolved": "https://registry.npmjs.org/libsodium/-/libsodium-0.7.6.tgz",
"integrity": "sha512-hPb/04sEuLcTRdWDtd+xH3RXBihpmbPCsKW/Jtf4PsvdyKh+D6z2D2gvp/5BfoxseP+0FCOg66kE+0oGUE/loQ=="
"version": "0.7.9",
"resolved": "https://registry.npmjs.org/libsodium/-/libsodium-0.7.9.tgz",
"integrity": "sha512-gfeADtR4D/CM0oRUviKBViMGXZDgnFdMKMzHsvBdqLBHd9ySi6EtYnmuhHVDDYgYpAO8eU8hEY+F8vIUAPh08A=="
},
"libsodium-wrappers": {
"version": "0.7.9",
"resolved": "https://registry.npmjs.org/libsodium-wrappers/-/libsodium-wrappers-0.7.9.tgz",
"integrity": "sha512-9HaAeBGk1nKTRFRHkt7nzxqCvnkWTjn1pdjKgcUnZxj0FyOP4CnhgFhMdrFfgNsukijBGyBLpP2m2uKT1vuWhQ==",
"requires": {
"libsodium": "^0.7.0"
}
},
"ws": {
"version": "8.2.3",
"resolved": "https://registry.npmjs.org/ws/-/ws-8.2.3.tgz",
"integrity": "sha512-wBuoj1BDpC6ZQ1B7DWQBYVLphPWkm8i9Y0/3YdHjHKHiohOJ1ws+3OccDWtH+PoC9DZD5WOTrJvNbWvjS6JWaA=="
}
}
}

View File

@@ -1,6 +1,6 @@
{
"dependencies": {
"hotpocket-js-client": "0.5.1",
"hotpocket-js-client": "0.5.2",
"bson": "4.5.3"
}
}

View File

@@ -60,12 +60,6 @@ async function main() {
});
})
// This will get fired when contract sends a read response.
hpc.on(HotPocket.events.contractReadResponse, (o) => {
const outputLog = o.length <= 512 ? o : `[Big output (${o.length / 1024} KB)]`;
console.log("Read response>> " + outputLog);
})
// This will get fired when the unl public key list changes.
hpc.on(HotPocket.events.unlChange, (unl) => {
console.log("New unl received:");
@@ -114,7 +108,7 @@ async function main() {
if (inp.length > 0) {
if (inp.startsWith("read ")) {
hpc.sendContractReadRequest(inp.substr(5));
hpc.submitContractReadRequest(inp.substr(5)).then(reply => console.log(reply));
}
else if (inp.startsWith("ledger ")) {
hpc.getLedgerBySeqNo(parseInt(inp.substr(7)), true, true)

View File

@@ -167,16 +167,19 @@ namespace msg::usrmsg::bson
* Message format:
* {
* "type": "contract_read_response",
* "reply_for": "<corresponding request id>",
* "content": <contract output>
* }
* @param content The contract binary output content to be put in the message.
*/
void create_contract_read_response_container(std::vector<uint8_t> &msg, std::string_view content)
void create_contract_read_response_container(std::vector<uint8_t> &msg, std::string_view reply_for, std::string_view content)
{
jsoncons::bson::bson_bytes_encoder encoder(msg);
encoder.begin_object();
encoder.key(msg::usrmsg::FLD_TYPE);
encoder.string_value(msg::usrmsg::MSGTYPE_CONTRACT_READ_RESPONSE);
encoder.key(msg::usrmsg::FLD_REPLY_FOR);
encoder.string_value(reply_for);
encoder.key(msg::usrmsg::FLD_CONTENT);
encoder.byte_string_value(content);
encoder.end_object();
@@ -469,18 +472,26 @@ namespace msg::usrmsg::bson
* Accepted signed input container format:
* {
* "type": "contract_read_request",
* "id": "<any string>",
* "content": <binary buffer>
* }
* @return 0 on successful extraction. -1 for failure.
*/
int extract_read_request(std::string &extracted_content, const jsoncons::ojson &d)
int extract_read_request(std::string &extracted_id, std::string &extracted_content, const jsoncons::ojson &d)
{
if (!d.contains(msg::usrmsg::FLD_ID) || !d[msg::usrmsg::FLD_ID].is<std::string>())
{
LOG_DEBUG << "Read request 'id' field missing or invalid.";
return -1;
}
if (!d.contains(msg::usrmsg::FLD_CONTENT) || !d[msg::usrmsg::FLD_CONTENT].is_byte_string_view())
{
LOG_DEBUG << "Read request 'content' field missing or invalid.";
return -1;
}
extracted_id = d[msg::usrmsg::FLD_ID].as<std::string>();
const jsoncons::byte_string_view &bsv = d[msg::usrmsg::FLD_CONTENT].as_byte_string_view();
extracted_content = std::string_view(reinterpret_cast<const char *>(bsv.data()), bsv.size());
return 0;

View File

@@ -17,7 +17,7 @@ namespace msg::usrmsg::bson
void create_contract_input_status(std::vector<uint8_t> &msg, std::string_view status, std::string_view reason,
std::string_view input_hash, const uint64_t ledger_seq_no, const util::h32 &ledger_hash);
void create_contract_read_response_container(std::vector<uint8_t> &msg, std::string_view content);
void create_contract_read_response_container(std::vector<uint8_t> &msg, std::string_view reply_for, std::string_view content);
void create_contract_output_container(std::vector<uint8_t> &msg, std::string_view hash, const ::std::vector<std::string> &outputs,
const util::merkle_hash_node &hash_root, const std::vector<std::pair<std::string, std::string>> &unl_sig,
@@ -41,7 +41,7 @@ namespace msg::usrmsg::bson
int extract_type(std::string &extracted_type, const jsoncons::ojson &d);
int extract_read_request(std::string &extracted_content, const jsoncons::ojson &d);
int extract_read_request(std::string &extracted_id, std::string &extracted_content, const jsoncons::ojson &d);
int extract_signed_input_container(std::string &extracted_input_container, std::string &extracted_sig,
const jsoncons::ojson &d);

View File

@@ -331,11 +331,12 @@ namespace msg::usrmsg::json
* Message format:
* {
* "type": "contract_read_response",
* "reply_for": "<corresponding request id>",
* "content": "<response string>"
* }
* @param content The contract binary output content to be put in the message.
*/
void create_contract_read_response_container(std::vector<uint8_t> &msg, std::string_view content)
void create_contract_read_response_container(std::vector<uint8_t> &msg, std::string_view reply_for, std::string_view content)
{
msg.reserve(content.size() + 256);
msg += "{\"";
@@ -343,6 +344,10 @@ namespace msg::usrmsg::json
msg += SEP_COLON;
msg += msg::usrmsg::MSGTYPE_CONTRACT_READ_RESPONSE;
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_REPLY_FOR;
msg += SEP_COLON;
msg += reply_for;
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_CONTENT;
msg += SEP_COLON_NOQUOTE;
@@ -843,24 +848,26 @@ namespace msg::usrmsg::json
* Accepted signed input container format:
* {
* "type": "contract_read_request",
* "id": "<any string>",
* "content": "<any string>"
* }
* @return 0 on successful extraction. -1 for failure.
*/
int extract_read_request(std::string &extracted_content, const jsoncons::json &d)
int extract_read_request(std::string &extracted_id, std::string &extracted_content, const jsoncons::json &d)
{
if (!d.contains(msg::usrmsg::FLD_CONTENT))
if (!d.contains(msg::usrmsg::FLD_ID) || !d[msg::usrmsg::FLD_ID].is<std::string>())
{
LOG_DEBUG << "Read request required fields missing.";
LOG_DEBUG << "Read request 'id' field missing or invalid.";
return -1;
}
if (!d[msg::usrmsg::FLD_CONTENT].is<std::string>())
if (!d.contains(msg::usrmsg::FLD_CONTENT) || !d[msg::usrmsg::FLD_CONTENT].is<std::string>())
{
LOG_DEBUG << "Read request invalid field values.";
LOG_DEBUG << "Read request 'content' field missing or invalid.";
return -1;
}
extracted_id = d[msg::usrmsg::FLD_ID].as<std::string>();
extracted_content = d[msg::usrmsg::FLD_CONTENT].as<std::string>();
return 0;
}

View File

@@ -21,7 +21,7 @@ namespace msg::usrmsg::json
void create_contract_input_status(std::vector<uint8_t> &msg, std::string_view status, std::string_view reason,
std::string_view input_hash, const uint64_t ledger_seq_no, const util::h32 &ledger_hash);
void create_contract_read_response_container(std::vector<uint8_t> &msg, std::string_view content);
void create_contract_read_response_container(std::vector<uint8_t> &msg, std::string_view reply_for, std::string_view content);
void create_contract_output_container(std::vector<uint8_t> &msg, std::string_view hash, const ::std::vector<std::string> &outputs,
const util::merkle_hash_node &hash_root, const std::vector<std::pair<std::string, std::string>> &unl_sig,
@@ -45,7 +45,7 @@ namespace msg::usrmsg::json
int extract_type(std::string &extracted_type, const jsoncons::json &d);
int extract_read_request(std::string &extracted_content, const jsoncons::json &d);
int extract_read_request(std::string &extracted_id, std::string &extracted_content, const jsoncons::json &d);
int extract_signed_input_container(std::string &extracted_input_container, std::string &extracted_sig,
const jsoncons::json &d);

View File

@@ -38,12 +38,12 @@ namespace msg::usrmsg
busrmsg::create_contract_input_status(msg, status, reason, input_hash, ledger_seq_no, ledger_hash);
}
void usrmsg_parser::create_contract_read_response_container(std::vector<uint8_t> &msg, std::string_view content) const
void usrmsg_parser::create_contract_read_response_container(std::vector<uint8_t> &msg, std::string_view reply_for, std::string_view content) const
{
if (protocol == util::PROTOCOL::JSON)
jusrmsg::create_contract_read_response_container(msg, content);
jusrmsg::create_contract_read_response_container(msg, reply_for, content);
else
busrmsg::create_contract_read_response_container(msg, content);
busrmsg::create_contract_read_response_container(msg, reply_for, content);
}
void usrmsg_parser::create_contract_output_container(std::vector<uint8_t> &msg, std::string_view hash, const ::std::vector<std::string> &outputs,
@@ -113,12 +113,12 @@ namespace msg::usrmsg
return busrmsg::extract_type(extracted_type, bdoc);
}
int usrmsg_parser::extract_read_request(std::string &extracted_content) const
int usrmsg_parser::extract_read_request(std::string &extracted_id, std::string &extracted_content) const
{
if (protocol == util::PROTOCOL::JSON)
return jusrmsg::extract_read_request(extracted_content, jdoc);
return jusrmsg::extract_read_request(extracted_id, extracted_content, jdoc);
else
return busrmsg::extract_read_request(extracted_content, bdoc);
return busrmsg::extract_read_request(extracted_id, extracted_content, bdoc);
}
int usrmsg_parser::extract_signed_input_container(std::string &extracted_input_container, std::string &extracted_sig) const

View File

@@ -26,7 +26,7 @@ namespace msg::usrmsg
void create_contract_input_status(std::vector<uint8_t> &msg, std::string_view status, std::string_view reason,
std::string_view input_hash, const uint64_t ledger_seq_no, const util::h32 &ledger_hash) const;
void create_contract_read_response_container(std::vector<uint8_t> &msg, std::string_view content) const;
void create_contract_read_response_container(std::vector<uint8_t> &msg, std::string_view reply_for, std::string_view content) const;
void create_contract_output_container(std::vector<uint8_t> &msg, std::string_view hash, const ::std::vector<std::string> &outputs,
const util::merkle_hash_node &hash_root, const std::vector<std::pair<std::string, std::string>> &unl_sig,
@@ -47,7 +47,7 @@ namespace msg::usrmsg
int extract_type(std::string &extracted_type) const;
int extract_read_request(std::string &extracted_content) const;
int extract_read_request(std::string &extracted_id, std::string &extracted_content) const;
int extract_signed_input_container(std::string &extracted_input_container, std::string &extracted_sig) const;

View File

@@ -157,7 +157,7 @@ namespace read_req
for (sc::contract_output &output : user_buf_itr->second.outputs)
{
std::vector<uint8_t> msg;
parser.create_contract_read_response_container(msg, output.message);
parser.create_contract_read_response_container(msg, read_request.id, output.message);
user.session.send(msg);
output.message.clear();
}
@@ -193,12 +193,14 @@ namespace read_req
/**
* Add new read request from users to the read request queue for processing.
* @param pubkey Public key of the user.
* @param id Message id (used to associate replies).
* @param content Message content.
* @return 0 on successful addition and -1 on queue overflow
*/
int populate_read_req_queue(const std::string &pubkey, const std::string &content)
int populate_read_req_queue(const std::string &pubkey, const std::string &id, const std::string &content)
{
user_read_req read_request;
read_request.id = id;
read_request.content = read_req_store.write_buf(content.data(), content.size());
read_request.pubkey = pubkey;

View File

@@ -9,6 +9,7 @@ namespace read_req
struct user_read_req
{
std::string pubkey;
std::string id;
util::buffer_view content;
};
@@ -20,7 +21,7 @@ namespace read_req
void read_request_processor();
int populate_read_req_queue(const std::string &pubkey, const std::string &content);
int populate_read_req_queue(const std::string &pubkey, const std::string &id, const std::string &content);
void initialize_execution_context(const user_read_req &read_request, const pthread_t thread_id, sc::execution_context &contract_ctx);

View File

@@ -148,10 +148,10 @@ namespace usr
if (!conf::cfg.contract.execute || conf::cfg.user.concurrent_read_requests == 0)
return 0;
std::string content;
if (parser.extract_read_request(content) != -1)
std::string id, content;
if (parser.extract_read_request(id, content) != -1)
{
if (read_req::populate_read_req_queue(user.pubkey, std::move(content)) == -1)
if (read_req::populate_read_req_queue(user.pubkey, std::move(id), std::move(content)) == -1)
{
LOG_WARNING << "Failed to enqueue read request.";
}