mirror of
https://github.com/EvernodeXRPL/hpcore.git
synced 2026-04-29 15:37:59 +00:00
Optimized user pipes life time. (#26)
Optimized user-contract I/O pipes.
This commit is contained in:
@@ -1,8 +1,6 @@
|
||||
#include <cstdio>
|
||||
#include <iostream>
|
||||
#include <unistd.h>
|
||||
#include <sys/ioctl.h>
|
||||
#include <sys/types.h>
|
||||
#include <rapidjson/document.h>
|
||||
#include <rapidjson/stringbuffer.h>
|
||||
#include <rapidjson/writer.h>
|
||||
@@ -23,7 +21,7 @@ namespace usr
|
||||
* Global user list. (Exposed to other sub systems)
|
||||
* Map key: User socket session id (<ip:port>)
|
||||
*/
|
||||
std::map<std::string, util::contract_user, std::less<>> users;
|
||||
std::map<std::string, usr::contract_user, std::less<>> users;
|
||||
|
||||
/**
|
||||
* Keep track of verification-pending challenges issued to newly connected users.
|
||||
@@ -48,10 +46,10 @@ std::thread listener_thread;
|
||||
|
||||
// Challenge response fields.
|
||||
// These fields are used on challenge response validation.
|
||||
static const char* CHALLENGE_RESP_TYPE = "type";
|
||||
static const char* CHALLENGE_RESP_CHALLENGE = "challenge";
|
||||
static const char* CHALLENGE_RESP_SIG = "sig";
|
||||
static const char* CHALLENGE_RESP_PUBKEY = "pubkey";
|
||||
static const char *CHALLENGE_RESP_TYPE = "type";
|
||||
static const char *CHALLENGE_RESP_CHALLENGE = "challenge";
|
||||
static const char *CHALLENGE_RESP_SIG = "sig";
|
||||
static const char *CHALLENGE_RESP_PUBKEY = "pubkey";
|
||||
|
||||
// Message type for the user challenge.
|
||||
static const char *CHALLENGE_MSGTYPE = "public_challenge";
|
||||
@@ -168,9 +166,9 @@ int verify_user_challenge_response(std::string &extracted_pubkeyb64, std::string
|
||||
|
||||
// Verify the challenge signature. We do this last due to signature verification cost.
|
||||
if (crypto::verify_b64(
|
||||
original_challenge,
|
||||
util::getsv(d[CHALLENGE_RESP_SIG]),
|
||||
util::getsv(d[CHALLENGE_RESP_PUBKEY])) != 0)
|
||||
original_challenge,
|
||||
util::getsv(d[CHALLENGE_RESP_SIG]),
|
||||
util::getsv(d[CHALLENGE_RESP_PUBKEY])) != 0)
|
||||
{
|
||||
std::cerr << "User challenge response signature verification failed.\n";
|
||||
return -1;
|
||||
@@ -195,30 +193,7 @@ int add_user(std::string_view sessionid, std::string_view pubkeyb64)
|
||||
return -1;
|
||||
}
|
||||
|
||||
//Establish the I/O pipes for [User <--> SC] channel.
|
||||
|
||||
//inpipe: User will write input to this and contract will read user-input from this.
|
||||
int inpipe[2];
|
||||
if (pipe(inpipe) != 0)
|
||||
{
|
||||
std::cerr << "User in pipe creation failed. sessionid:" << sessionid << std::endl;
|
||||
return -1;
|
||||
}
|
||||
|
||||
//outpipe: Contract will write output for the user to this and user will read from this.
|
||||
int outpipe[2];
|
||||
if (pipe(outpipe) != 0)
|
||||
{
|
||||
std::cerr << "User out pipe creation failed. sessionid:" << sessionid << std::endl;
|
||||
|
||||
//We need to close 'inpipe' in case outpipe failed.
|
||||
close(inpipe[0]);
|
||||
close(inpipe[1]);
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
users.emplace(sessionid, util::contract_user(pubkeyb64, inpipe, outpipe));
|
||||
users.emplace(sessionid, usr::contract_user(pubkeyb64));
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -238,52 +213,19 @@ int remove_user(std::string_view sessionid)
|
||||
return -1;
|
||||
}
|
||||
|
||||
const util::contract_user &user = itr->second;
|
||||
usr::contract_user &user = itr->second;
|
||||
|
||||
//Close the User <--> SC I/O pipes.
|
||||
close(user.inpipe[0]);
|
||||
close(user.inpipe[1]);
|
||||
close(user.outpipe[0]);
|
||||
close(user.outpipe[1]);
|
||||
|
||||
users.erase(itr);
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read all per-user outputs produced by the contract process and store them in
|
||||
* the user buffer for later processing.
|
||||
*
|
||||
* @return 0 on success. -1 on failure.
|
||||
*/
|
||||
int read_contract_user_outputs()
|
||||
{
|
||||
//Read any outputs that has been written by the contract process
|
||||
//from all the user outpipes and store in the outbuffer of each user.
|
||||
//User outbuffer will be read by the consensus process later when it wishes so.
|
||||
|
||||
//Future optmization: Read and populate user buffers parallely.
|
||||
//Currently this is sequential for simplicity which will not scale well
|
||||
//when there are large number of users connected to the same HP node.
|
||||
|
||||
for (auto &[sid, user] : users)
|
||||
// Close any open fds for this user.
|
||||
for (int i = 0; i < 4; i++)
|
||||
{
|
||||
int fdout = user.outpipe[0];
|
||||
int bytes_available = 0;
|
||||
ioctl(fdout, FIONREAD, &bytes_available);
|
||||
|
||||
if (bytes_available > 0)
|
||||
if (user.fds[i] > 0)
|
||||
{
|
||||
char data[bytes_available];
|
||||
read(fdout, data, bytes_available);
|
||||
|
||||
//Populate the user output buffer with new data
|
||||
user.outbuffer = std::string(data, bytes_available);
|
||||
|
||||
std::cout << "Read " + std::to_string(bytes_available) << " bytes into user output buffer. user:" + user.pubkeyb64 << std::endl;
|
||||
close(user.fds[i]);
|
||||
user.fds[i] = 0;
|
||||
}
|
||||
}
|
||||
|
||||
users.erase(itr);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user