Contract stdout/err logging. (#233)

This commit is contained in:
Ravin Perera
2021-02-02 20:58:39 +05:30
committed by GitHub
parent d08d2630f6
commit 9bca6700ae
5 changed files with 121 additions and 44 deletions

View File

@@ -109,7 +109,7 @@ namespace conf
if (util::create_dir_tree_recursive(ctx.config_dir) == -1 ||
util::create_dir_tree_recursive(ctx.hist_dir) == -1 ||
util::create_dir_tree_recursive(ctx.full_hist_dir) == -1 ||
util::create_dir_tree_recursive(ctx.log_dir) == -1 ||
util::create_dir_tree_recursive(ctx.contract_log_dir) == -1 ||
util::create_dir_tree_recursive(ctx.hpfs_dir + "/seed" + hpfs::STATE_DIR_PATH) == -1 ||
util::create_dir_tree_recursive(ctx.hpfs_mount_dir) == -1)
{
@@ -134,6 +134,7 @@ namespace conf
cfg.contract.id = crypto::generate_uuid();
cfg.contract.execute = true;
cfg.contract.log_output = false;
cfg.contract.version = "1.0";
//Add self pubkey to the unl.
cfg.contract.unl.emplace(cfg.node.public_key);
@@ -206,6 +207,7 @@ namespace conf
ctx.hpfs_mount_dir = ctx.hpfs_dir + "/mnt";
ctx.hpfs_rw_dir = ctx.hpfs_mount_dir + "/rw";
ctx.log_dir = basedir + "/log";
ctx.contract_log_dir = ctx.log_dir + "/contract";
}
/**
@@ -773,6 +775,7 @@ namespace conf
{
jdoc.insert_or_assign("id", contract.id);
jdoc.insert_or_assign("execute", contract.execute);
jdoc.insert_or_assign("log_output", contract.log_output);
}
jdoc.insert_or_assign("version", contract.version);
@@ -816,6 +819,7 @@ namespace conf
}
contract.execute = jdoc["execute"].as<bool>();
contract.log_output = jdoc["log_output"].as<bool>();
}
contract.version = jdoc["version"].as<std::string>();

View File

@@ -84,6 +84,7 @@ namespace conf
{
std::string id; // Contract guid.
bool execute; // Whether or not to execute the contract on the node.
bool log_output; // Whether to log stdout/err of the contract process.
std::string version; // Contract version string.
std::set<std::string> unl; // Unique node list (list of binary public keys)
std::string bin_path; // Full path to the contract binary
@@ -140,17 +141,18 @@ namespace conf
std::string hpws_exe_path; // hpws executable file path.
std::string hpfs_exe_path; // hpfs executable file path.
std::string contract_dir; // Contract base directory full path.
std::string full_hist_dir; // Contract full history dir full path.
std::string hist_dir; // Contract ledger history dir full path.
std::string hpfs_dir; // hpfs metdata dir (The location of hpfs log file).
std::string hpfs_mount_dir; // hpfs fuse file system mount path.
std::string hpfs_rw_dir; // hpfs read/write fs session path.
std::string log_dir; // Contract log dir full path.
std::string config_dir; // Config dir full path.
std::string config_file; // Full path to the config file.
std::string tls_key_file; // Full path to the tls private key file.
std::string tls_cert_file; // Full path to the tls certificate.
std::string contract_dir; // Contract base directory full path.
std::string full_hist_dir; // Contract full history dir full path.
std::string hist_dir; // Contract ledger history dir full path.
std::string hpfs_dir; // hpfs metdata dir (The location of hpfs log file).
std::string hpfs_mount_dir; // hpfs fuse file system mount path.
std::string hpfs_rw_dir; // hpfs read/write fs session path.
std::string log_dir; // HotPocket log dir full path.
std::string contract_log_dir; // Contract log dir full path.
std::string config_dir; // Config dir full path.
std::string config_file; // Full path to the config file.
std::string tls_key_file; // Full path to the tls private key file.
std::string tls_cert_file; // Full path to the tls certificate.
int config_fd; // Config file file descriptor.
struct flock config_lock; // Config file lock.

View File

@@ -4,8 +4,10 @@
// Enable boost strack trace.
#define BOOST_STACKTRACE_USE_BACKTRACE
#include <blake3.h>
#include <boost/stacktrace.hpp>
#include <chrono>
#include <concurrentqueue.h>
#include <cstdio>
#include <dirent.h>
#include <fcntl.h>
@@ -20,11 +22,15 @@
#include <math.h>
#include <memory>
#include <mutex>
#include <plog/Log.h>
#include <plog/Appenders/ColorConsoleAppender.h>
#include <poll.h>
#include <queue>
#include <readerwriterqueue/readerwriterqueue.h>
#include <set>
#include <shared_mutex>
#include <sodium.h>
#include <sqlite3.h>
#include <sstream>
#include <stdlib.h>
#include <string>
@@ -44,11 +50,5 @@
#include <unordered_set>
#include <variant>
#include <vector>
#include <blake3.h>
#include <concurrentqueue.h>
#include <plog/Log.h>
#include <plog/Appenders/ColorConsoleAppender.h>
#include <sqlite3.h>
#include <queue>
#endif

View File

@@ -12,7 +12,10 @@
namespace sc
{
const uint32_t READ_BUFFER_SIZE = 128 * 1024; // This has to be minimum 128KB to support sequence packets.
constexpr uint32_t READ_BUFFER_SIZE = 128 * 1024; // This has to be minimum 128KB to support sequence packets.
constexpr int FILE_PERMS = 0644;
constexpr const char *STDOUT_LOG = ".stdout.log";
constexpr const char *STDERR_LOG = ".stderr.log";
/**
* Executes the contract process and passes the specified context arguments.
@@ -26,9 +29,9 @@ namespace sc
// Create the IO sockets for users, control channel and npl.
// (Note: User socket will only be used for contract output only. For feeding user inputs we are using a memfd.)
if (create_iosockets_for_fdmap(ctx.userfds, ctx.args.userbufs) == -1 ||
create_iosockets(ctx.controlfds, SOCK_SEQPACKET) == -1 ||
(!ctx.args.readonly && create_iosockets(ctx.nplfds, SOCK_SEQPACKET) == -1))
if (create_iosockets_for_fdmap(ctx.user_fds, ctx.args.userbufs) == -1 ||
create_iosockets(ctx.control_fds, SOCK_SEQPACKET) == -1 ||
(!ctx.args.readonly && create_iosockets(ctx.npl_fds, SOCK_SEQPACKET) == -1))
{
cleanup_fds(ctx);
stop_hpfs_session(ctx);
@@ -92,6 +95,12 @@ namespace sc
const std::string current_dir = hpfs::contract_fs.physical_path(ctx.args.hpfs_session_name, hpfs::STATE_DIR_PATH);
chdir(current_dir.c_str());
if (create_contract_log_files(ctx) == -1)
{
std::cerr << errno << ": Contract process output redirection failed." << (ctx.args.readonly ? " (rdonly)" : "") << "\n";
exit(1);
}
execv(execv_args[0], execv_args);
std::cerr << errno << ": Contract process execv failed." << (ctx.args.readonly ? " (rdonly)" : "") << "\n";
exit(1);
@@ -232,15 +241,15 @@ namespace sc
if (!ctx.args.readonly)
{
os << ",\"lcl\":\"" << ctx.args.lcl
<< "\",\"npl_fd\":" << ctx.nplfds.scfd;
<< "\",\"npl_fd\":" << ctx.npl_fds.scfd;
}
os << ",\"control_fd\":" << ctx.controlfds.scfd;
os << ",\"control_fd\":" << ctx.control_fds.scfd;
os << ",\"user_in_fd\":" << user_inputs_fd
<< ",\"users\":{";
user_json_to_stream(ctx.userfds, ctx.args.userbufs, os);
user_json_to_stream(ctx.user_fds, ctx.args.userbufs, os);
os << "},\"unl\":" << unl::get_json() << "}";
@@ -282,16 +291,16 @@ namespace sc
// Prepare output poll fd list.
// User out fds + control fd + NPL fd (NPL fd not available in readonly mode)
const size_t out_fd_count = ctx.userfds.size() + (ctx.args.readonly ? 1 : 2);
const size_t control_fd_idx = ctx.userfds.size();
const size_t out_fd_count = ctx.user_fds.size() + (ctx.args.readonly ? 1 : 2);
const size_t control_fd_idx = ctx.user_fds.size();
const size_t npl_fd_idx = control_fd_idx + 1;
struct pollfd out_fds[out_fd_count];
auto user_itr = ctx.userfds.begin();
auto user_itr = ctx.user_fds.begin();
for (int i = 0; i < out_fd_count; i++)
{
const int fd = (user_itr != ctx.userfds.end()) ? (user_itr++)->second.hpfd
: (i == control_fd_idx ? ctx.controlfds.hpfd : ctx.nplfds.hpfd);
const int fd = (user_itr != ctx.user_fds.end()) ? (user_itr++)->second.hpfd
: (i == control_fd_idx ? ctx.control_fds.hpfd : ctx.npl_fds.hpfd);
out_fds[i] = {fd, POLLIN, 0};
}
@@ -310,7 +319,7 @@ namespace sc
// Atempt to read messages from contract (regardless of contract terminated or not).
const int control_read_res = read_control_outputs(ctx, out_fds[control_fd_idx]);
const int npl_read_res = ctx.args.readonly ? 0 : read_npl_outputs(ctx, out_fds[npl_fd_idx]);
const int user_read_res = read_contract_fdmap_outputs(ctx.userfds, out_fds, ctx.args.userbufs);
const int user_read_res = read_contract_fdmap_outputs(ctx.user_fds, out_fds, ctx.args.userbufs);
if (ctx.termination_signaled || ctx.contract_pid == 0)
{
@@ -371,7 +380,7 @@ namespace sc
if (ctx.args.control_messages.try_dequeue(control_msg))
{
if (write_iosocket_seq_packet(ctx.controlfds, control_msg) == -1)
if (write_iosocket_seq_packet(ctx.control_fds, control_msg) == -1)
{
LOG_ERROR << "Error writing HP inputs to SC";
return -1;
@@ -392,7 +401,7 @@ namespace sc
* npl inputs are feed into the contract as sequence packets. It first sends the pubkey and then
* the data.
*/
const int writefd = ctx.nplfds.hpfd;
const int writefd = ctx.npl_fds.hpfd;
if (writefd == -1)
return 0;
@@ -620,6 +629,66 @@ namespace sc
return bytes_read ? 1 : 0;
}
/**
* Create contract stdout/err log files. (Called from the contract process)
*/
int create_contract_log_files(execution_context &ctx)
{
if (!conf::cfg.contract.log_output)
return 0;
const time_t epoch = util::get_epoch_milliseconds() / 1000;
std::stringstream now_ss;
now_ss << std::put_time(std::localtime(&epoch), "%Y%m%dT%H%M%S");
const std::string now = now_ss.str();
// For consensus execution, we keep appending logs to the same out/err files.
// For read request executions, independent log files are created based on read request session names.
const std::string prefix = ctx.args.readonly ? (ctx.args.hpfs_session_name + "_" + now) : ctx.args.hpfs_session_name;
const std::string stdout_file = conf::ctx.contract_log_dir + "/" + prefix + STDOUT_LOG;
const std::string stderr_file = conf::ctx.contract_log_dir + "/" + prefix + STDERR_LOG;
const int outfd = open(stdout_file.data(), O_CREAT | O_WRONLY | O_APPEND, FILE_PERMS);
if (outfd == -1)
{
std::cerr << errno << ": Error opening " << stdout_file << "\n";
return -1;
}
const int errfd = open(stderr_file.data(), O_CREAT | O_WRONLY | O_APPEND, FILE_PERMS);
if (errfd == -1)
{
close(outfd);
std::cerr << errno << ": Error opening " << stderr_file << "\n";
return -1;
}
// Because consensus executions append logs to same files, we need to insert a demarkation line
// to mark the start of each execution.
if (!ctx.args.readonly)
{
const std::string header = "Execution lcl " + ctx.args.lcl + " on " + now + "\n";
if (write(outfd, header.data(), header.size()) == -1 ||
write(errfd, header.data(), header.size()) == -1)
{
close(outfd);
close(errfd);
std::cerr << errno << ": Error writing contract execution log header.\n";
return -1;
}
}
// Redirect stdout/err to log files.
if (dup2(outfd, STDOUT_FILENO) == -1 ||
dup2(errfd, STDERR_FILENO) == -1)
{
close(outfd);
close(errfd);
return -1;
}
return 0;
}
/**
* Common function to create a socket (Hp->SC, SC->HP).
* @param fds fd pair to populate.
@@ -694,13 +763,13 @@ namespace sc
{
if (!ctx.args.readonly)
{
close_unused_socket_fds(is_hp, ctx.nplfds);
close_unused_socket_fds(is_hp, ctx.npl_fds);
}
close_unused_socket_fds(is_hp, ctx.controlfds);
close_unused_socket_fds(is_hp, ctx.control_fds);
// Loop through user fds.
for (auto &[pubkey, fds] : ctx.userfds)
for (auto &[pubkey, fds] : ctx.user_fds)
close_unused_socket_fds(is_hp, fds);
}
@@ -741,11 +810,11 @@ namespace sc
void cleanup_fds(execution_context &ctx)
{
cleanup_fd_pair(ctx.controlfds);
cleanup_fd_pair(ctx.nplfds);
for (auto &[pubkey, fds] : ctx.userfds)
cleanup_fd_pair(ctx.control_fds);
cleanup_fd_pair(ctx.npl_fds);
for (auto &[pubkey, fds] : ctx.user_fds)
cleanup_fd_pair(fds);
ctx.userfds.clear();
ctx.user_fds.clear();
}
/**

View File

@@ -100,13 +100,13 @@ namespace sc
contract_execution_args args;
// Map of user socket fds (map key: user public key)
contract_fdmap_t userfds;
contract_fdmap_t user_fds;
// Socket fds for NPL messages.
fd_pair nplfds;
fd_pair npl_fds;
// Socket fds for control messages.
fd_pair controlfds;
fd_pair control_fds;
// Holds the contract process id (if currently executing).
pid_t contract_pid = 0;
@@ -157,6 +157,8 @@ namespace sc
int read_contract_fdmap_outputs(contract_fdmap_t &fdmap, const pollfd *pfds, contract_bufmap_t &bufmap);
int create_contract_log_files(execution_context &ctx);
int create_iosockets(fd_pair &fds, const int socket_type);
int write_iosocket_seq_packet(fd_pair &fds, std::string_view input);
@@ -170,7 +172,7 @@ namespace sc
void cleanup_fds(execution_context &ctx);
void cleanup_fd_pair(fd_pair &fds);
void stop(execution_context &ctx);
void handle_control_msg(execution_context &ctx, std::string_view msg);