Ledger query infrastructure. (#275)

* Added json ledger query param parser.

* Added initial query response creation.

* Updated client lib.

* Implemented get ledger by seq no.

* Added ledger query execution wrappers.

* Included log record info.

* Fixed empty output hash issue.

* Added bson support.

* Added db file existance check.

* Added requesy/reply tracking for queries in client lib.

* Improved multi connection usage in client lib.

* Added genesis ledger query support.

* Updated naming convention of query result fields.

* Comments.

* Used sqlite bind() for query param.

* Used binary hashes in ledger sqlite db.

* Missing const.
This commit is contained in:
Ravin Perera
2021-03-29 11:20:15 +05:30
committed by GitHub
parent 44fa3134ea
commit 948113398c
21 changed files with 829 additions and 188 deletions

View File

@@ -62,6 +62,7 @@ add_executable(hpcore
src/usr/usr.cpp
src/usr/read_req.cpp
src/ledger/sqlite.cpp
src/ledger/ledger_query.cpp
src/ledger/ledger_mount.cpp
src/ledger/ledger_sync.cpp
src/ledger/ledger_serve.cpp

View File

@@ -244,6 +244,46 @@
}
}
/**
* Executes the provided func on all connections and returns the collected results.
* @param func The function taking a HP Connection as a parameter. This will get executed on all connections.
* @param maxConnections Maximum no. of connections to use. Uses all available connections if null.
*/
const getMultiConnectionResult = async (func, maxConnections) => {
if (status == 2)
return await Promise.resolve();
if (maxConnections == null)
maxConnections = requiredConnectionCount;
const connections = nodes.filter(n => n.connection && n.connection.isConnected()).map(n => n.connection).slice(0, maxConnections);
const results = await Promise.all(connections.map(c => func(c)));
// If we are expecting only 1 connection, then return null or single result.
// Otherwise return the array of results.
if (maxConnections == 1 && results.length <= 1)
return results.length == 0 ? null : results[0];
else
return results;
}
/**
* Executes the provided func on all connections.
* @param func The function taking a HP Connection as a parameter. This will get executed on all connections.
* @param maxConnections Maximum no. of connections to use. Uses all available connections if null.
*/
const executeMultiConnectionFunc = (func, maxConnections) => {
if (status == 2)
return Promise.resolve();
if (maxConnections == null)
maxConnections = requiredConnectionCount;
const connections = nodes.filter(n => n.connection && n.connection.isConnected()).map(n => n.connection).slice(0, maxConnections);
return Promise.all(connections.map(c => func(c)));
}
this.connect = () => {
if (status > 0)
@@ -281,22 +321,21 @@
emitter.clear(event);
}
this.sendContractInput = async (input, nonce = null, maxLclOffset = null) => {
if (status == 2)
return;
return await Promise.all(
nodes.filter(n => n.connection && n.connection.isConnected())
.map(n => n.connection.sendContractInput(input, nonce, maxLclOffset)));
this.sendContractInput = (input, nonce = null, maxLclOffset = null) => {
// We always only submit contract input to a single node (even if we are connected to multiple nodes).
return getMultiConnectionResult(con => con.sendContractInput(input, nonce, maxLclOffset), 1);
}
this.sendContractReadRequest = (request) => {
if (status == 2)
return;
return executeMultiConnectionFunc(con => con.sendContractReadRequest(request));
}
nodes.filter(n => n.connection && n.connection.isConnected()).forEach(n => {
n.connection.sendContractReadRequest(request);
});
this.getStatus = () => {
return getMultiConnectionResult(con => con.getStatus());
}
this.getLedgerBySeqNo = (seqNo, includeRawInputs, includeRawOutputs) => {
return getMultiConnectionResult(con => con.getLedgerBySeqNo(seqNo, includeRawInputs, includeRawOutputs));
}
}
@@ -318,6 +357,7 @@
let closeResolver = null;
let statResponseResolvers = [];
let contractInputResolvers = {};
let ledgerQueryResolvers = {}; // Message resolvers that uses request/reply associations.
// Calcualtes the blake3 hash of all array items.
const getHash = (arr) => {
@@ -539,6 +579,27 @@
validateAndEmitUnlChange(unl);
}
}
else if (m.type == "ledger_query_result") {
const resolver = ledgerQueryResolvers[m.reply_for];
if (resolver) {
const results = m.results.map(r => {
return {
seqNo: r.seq_no,
timestamp: r.timestamp,
hash: r.hash,
prevHash: r.prev_hash,
stateHash: r.state_hash,
configHash: r.config_hash,
userHash: r.user_hash,
inputHash: r.input_hash,
outputHash: r.output_hash
}
});
if (resolver.type == "seq_no")
resolver.resolver(results.length > 0 ? results[0] : null) // Return as a single object rather than an array.
delete ledgerQueryResolvers[m.reply_for];
}
}
else {
liblog(1, "Received unrecognized contract message: type:" + m.type);
return false;
@@ -682,7 +743,7 @@
this.sendContractInput = async (input, nonce = null, maxLclOffset = null) => {
if (connectionStatus != 2)
return null;
return Promise.resolve(null);
if (!maxLclOffset)
maxLclOffset = 10;
@@ -711,10 +772,27 @@
this.sendContractReadRequest = (request) => {
if (connectionStatus != 2)
return;
return Promise.resolve();
const msg = msgHelper.createReadRequest(request);
wsSend(msgHelper.serializeObject(msg));
return Promise.resolve();
}
this.getLedgerBySeqNo = (seqNo, includeRawInputs, includeRawOutputs) => {
if (connectionStatus != 2)
return Promise.resolve(null);
const msg = msgHelper.createLedgerQuery("seq_no", { "seq_no": seqNo }, includeRawInputs, includeRawOutputs);
const p = new Promise(resolve => {
ledgerQueryResolvers[msg.id] = {
type: "seq_no",
resolver: resolve
};
})
wsSend(msgHelper.serializeObject(msg));
return p;
}
}
@@ -815,6 +893,21 @@
this.createStatusRequest = () => {
return { type: "stat" };
}
this.createLedgerQuery = (filterBy, params, includeRawInputs, includeRawOutputs) => {
const includes = [];
if (includeRawInputs) includes.push("raw_inputs");
if (includeRawOutputs) includes.push("raw_outputs");
return {
type: "ledger_query",
id: "query_" + filterBy + "_" + (new Date()).getTime().toString(),
filter_by: filterBy,
params: params,
include: includes
}
}
}
function hexToUint8Array(hexString) {

View File

@@ -657,23 +657,20 @@ namespace consensus
}
}
if (!p.output_hash.empty())
if (ctx.stage < 3)
{
if (ctx.stage < 3)
// If the elected hash is our output hash, then place our output signature in the proposal.
// We only do this if we are at stage 1 or 2.
if (p.output_hash == ctx.user_outputs_hashtree.root_hash())
p.output_sig = ctx.user_outputs_our_sig;
}
else
{
// If this is the stage 3 proposal, collect the UNL output signatures matching the elected output hash.
for (const auto &[pubkey, cp] : ctx.candidate_proposals)
{
// If the elected hash is our output hash, then place our output signature in the proposal.
// We only do this if we are at stage 1 or 2.
if (p.output_hash == ctx.user_outputs_hashtree.root_hash())
p.output_sig = ctx.user_outputs_our_sig;
}
else
{
// If this is the stage 3 proposal, collect the UNL output signatures matching the elected output hash.
for (const auto &[pubkey, cp] : ctx.candidate_proposals)
{
if (cp.output_hash == p.output_hash)
ctx.user_outputs_unl_sig.emplace_back(cp.pubkey, cp.output_sig);
}
if (cp.output_hash == p.output_hash)
ctx.user_outputs_unl_sig.emplace_back(cp.pubkey, cp.output_sig);
}
}

View File

@@ -5,6 +5,7 @@
#include "../util/util.hpp"
#include "../msg/fbuf/ledger_helpers.hpp"
#include "../msg/fbuf/common_helpers.hpp"
#include "ledger_common.hpp"
#include "ledger_serve.hpp"
#define LEDGER_CREATE_ERROR \
@@ -18,6 +19,7 @@
namespace ledger
{
ledger_context ctx;
ledger_record genesis;
constexpr uint32_t LEDGER_FS_ID = 1;
ledger::ledger_mount ledger_fs; // Global ledger file system instance.
ledger::ledger_sync ledger_sync_worker; // Global ledger file system sync instance.
@@ -25,11 +27,28 @@ namespace ledger
std::shared_mutex primary_index_file_mutex;
constexpr int FILE_PERMS = 0644;
/**
* Perform ledger related initializations.
*/
int init()
{
// Setup the static genesis ledger fields.
{
const std::string empty_hash = std::string(util::h32_empty.to_string_view());
genesis.seq_no = 0;
genesis.timestamp = 0;
genesis.ledger_hash = empty_hash;
genesis.prev_ledger_hash = empty_hash;
genesis.data_hash = empty_hash;
genesis.state_hash = empty_hash;
genesis.config_hash = empty_hash;
genesis.user_hash = empty_hash;
genesis.input_hash = empty_hash;
genesis.output_hash = empty_hash;
}
// Full history status is always set to false since this is ledger fs. Historical checkpoints are not required in ledger fs even in full history mode.
if (ledger_fs.init(LEDGER_FS_ID, conf::ctx.ledger_hpfs_dir, conf::ctx.ledger_hpfs_mount_dir, conf::ctx.ledger_hpfs_rw_dir, false) == -1)
{
@@ -115,20 +134,19 @@ namespace ledger
// Ledger hash is the combined hash of previous ledger hash and the new data hash.
const std::string ledger_hash = crypto::get_hash(prev_ledger_hash, data_hash);
const std::string ledger_hash_hex = util::to_hex(ledger_hash);
// Construct ledger struct.
// Hashes are stored as hex string;
const sqlite::ledger ledger(
// Construct ledger struct with binary hashes.
const ledger_record ledger{
seq_no,
proposal.time,
ledger_hash_hex,
util::to_hex(prev_ledger_hash),
util::to_hex(data_hash),
util::to_hex(proposal.state_hash.to_string_view()),
util::to_hex(proposal.patch_hash.to_string_view()),
util::to_hex(user_hash),
util::to_hex(input_hash),
util::to_hex(proposal.output_hash)); // Merkle root output hash.
ledger_hash,
prev_ledger_hash,
data_hash,
std::string(proposal.state_hash.to_string_view()),
std::string(proposal.patch_hash.to_string_view()),
user_hash,
input_hash,
proposal.output_hash}; // Merkle root output hash.
if (sqlite::insert_ledger_row(db, ledger) == -1)
{
@@ -200,7 +218,7 @@ namespace ledger
}
// Creating ledger database and open a database connection.
if (sqlite::open_db(shard_path + "/" + DATEBASE, db) == -1)
if (sqlite::open_db(shard_path + "/" + DATABASE, db) == -1)
{
LOG_ERROR << errno << ": Error openning the shard database, shard: " << std::to_string(shard_seq_no);
return -1;
@@ -270,7 +288,7 @@ namespace ledger
return -1;
}
}
else if (sqlite::open_db(shard_path + "/" + DATEBASE, db) == -1)
else if (sqlite::open_db(shard_path + "/" + DATABASE, db) == -1)
{
LOG_ERROR << errno << ": Error openning the shard database, shard: " << std::to_string(shard_seq_no);
return -1;
@@ -558,19 +576,25 @@ namespace ledger
return 0;
}
if (sqlite::open_db(shard_path + "/" + DATEBASE, &db) == -1)
if (sqlite::open_db(shard_path + "/" + DATABASE, &db) == -1)
{
LOG_ERROR << errno << ": Error openning the shard database, shard: " << last_primary_shard_id.seq_no;
return -1;
}
const sqlite::ledger last_ledger = sqlite::get_last_ledger(db);
ledger_record last_ledger;
if (sqlite::get_last_ledger(db, last_ledger) == -1)
{
sqlite::close_db(&db);
return -1;
}
sqlite::close_db(&db);
// Update new lcl information.
p2p::sequence_hash lcl_id;
lcl_id.seq_no = last_ledger.seq_no;
lcl_id.hash = util::to_bin(last_ledger.ledger_hash_hex);
lcl_id.hash = last_ledger.ledger_hash;
ctx.set_lcl_id(lcl_id);
return 0;

View File

@@ -9,10 +9,6 @@
namespace ledger
{
constexpr const char *DATEBASE = "ledger.sqlite";
constexpr uint64_t PRIMARY_SHARD_SIZE = 262144; // 2^18 ledgers per shard.
constexpr uint64_t BLOB_SHARD_SIZE = 4096;
constexpr int FILE_PERMS = 0644;
struct ledger_context
{

View File

@@ -0,0 +1,34 @@
#ifndef _HP_LEDGER_LEDGER_COMMON_
#define _HP_LEDGER_LEDGER_COMMON_
#include "../pchheader.hpp"
namespace ledger
{
constexpr const char *DATABASE = "ledger.sqlite";
constexpr uint64_t PRIMARY_SHARD_SIZE = 262144; // 2^18 ledgers per shard.
constexpr uint64_t BLOB_SHARD_SIZE = 4096;
/**
* Struct to hold ledger fields read.
* All the hashes are stored as 32 byte binary data.
*/
struct ledger_record
{
uint64_t seq_no;
uint64_t timestamp;
std::string ledger_hash;
std::string prev_ledger_hash;
std::string data_hash;
std::string state_hash;
std::string config_hash;
std::string user_hash;
std::string input_hash;
std::string output_hash;
};
// Holds the global genesis ledger.
extern ledger_record genesis;
}
#endif

View File

@@ -0,0 +1,82 @@
#include "ledger_query.hpp"
#include "ledger_common.hpp"
#include "ledger.hpp"
#include "sqlite.hpp"
namespace ledger::query
{
constexpr const char *ERROR_EXEC_FAILURE = "exec_failure";
/**
* Executes the specified ledger query and returns the result.
* @param user_pubkey Binary pubkey of the user executing the query.
* @param q The query information.
* @returns The query result.
*/
const query_result execute(std::string_view user_pubkey, const query_request &q)
{
query_result res = ERROR_EXEC_FAILURE;
// Query the ledger with a ledger fs readonly session.
// Allocate unique readonly session name prefixed with user pubkey.
// There will always only be one query execution per user because each user session
// processes messages sequentially.
const std::string fs_sess_name = "lqr_" + util::to_hex(user_pubkey);
if (ledger::ledger_fs.start_ro_session(fs_sess_name, false) == -1)
return res;
std::vector<query_result_record> records;
if (q.index() == 0) // Filter by seq no.
{
ledger_record ledger;
const int seq_no_res = get_ledger_by_seq_no(ledger, std::get<seq_no_query>(q), fs_sess_name);
if (seq_no_res != -1)
{
if (seq_no_res == 1) // Ledger found.
records.push_back(query_result_record{std::move(ledger)});
res = std::move(records);
}
}
ledger::ledger_fs.stop_ro_session(fs_sess_name);
return res;
}
/**
* Get the ledger record by seq no.
* @param ledger Ledger structure to populate (if match found)).
* @param q The seq no query information.
* @param fs_sess_name The ledger hosting fs session name.
* @returns 1 if ledger found. 0 if ledger not found. -1 on failure.
*/
int get_ledger_by_seq_no(ledger_record &ledger, const seq_no_query &q, const std::string &fs_sess_name)
{
// If seq no. is 0, return GENESIS ledger.
if (q.seq_no == 0)
{
ledger = ledger::genesis;
return 1;
}
// Construct shard path based on provided ledger seq no.
const uint64_t shard_seq_no = (q.seq_no - 1) / ledger::PRIMARY_SHARD_SIZE;
const std::string db_vpath = std::string(ledger::PRIMARY_DIR).append("/").append(std::to_string(shard_seq_no)).append("/").append(ledger::DATABASE);
const std::string dbpath = ledger::ledger_fs.physical_path(fs_sess_name, db_vpath);
if (!util::is_file_exists(dbpath))
return 0; // Not found.
query_result_record result;
sqlite3 *db = NULL;
if (sqlite::open_db(dbpath, &db) == -1)
return -1;
const int sql_res = sqlite::get_ledger_by_seq_no(db, q.seq_no, ledger);
sqlite::close_db(&db);
return sql_res;
}
}

View File

@@ -0,0 +1,34 @@
#ifndef _HP_LEDGER_LEDGER_QUERY_
#define _HP_LEDGER_LEDGER_QUERY_
#include "../pchheader.hpp"
#include "ledger_common.hpp"
namespace ledger::query
{
/**
* Represents a ledger query request to filter by seq no.
*/
struct seq_no_query
{
uint64_t seq_no = 0;
bool raw_inputs = false;
bool raw_outputs = false;
};
struct query_result_record
{
ledger::ledger_record ledger;
// TODO:
// RawInputs field.
// RawOutputs field.
};
typedef std::variant<seq_no_query> query_request;
typedef std::variant<const char *, std::vector<query_result_record>> query_result;
const query_result execute(std::string_view user_pubkey, const query_request &q);
int get_ledger_by_seq_no(ledger_record &ledger, const seq_no_query &q, const std::string &fs_sess_name);
}
#endif

View File

@@ -1,12 +1,13 @@
#include "sqlite.hpp"
#include "../util/h32.hpp"
#include "ledger_common.hpp"
namespace ledger::sqlite
{
constexpr const char *LEDGER_TABLE = "ledger";
constexpr const char *LEDGER_COLUMNS = "seq_no, time, ledger_hash, prev_ledger_hash, data_hash, state_hash, patch_hash, user_hash, input_hash, output_hash";
constexpr const char *HP_VERSION_TABLE = "hp";
constexpr const char *HP_VERSION_COLUMN = "hp_version";
constexpr const char *COLUMN_DATA_TYPES[]{"INT", "TEXT"};
constexpr const char *COLUMN_DATA_TYPES[]{"INT", "TEXT", "BLOB"};
constexpr const char *CREATE_TABLE = "CREATE TABLE IF NOT EXISTS ";
constexpr const char *INSERT_INTO = "INSERT INTO ";
constexpr const char *PRIMARY_KEY = "PRIMARY KEY";
@@ -15,8 +16,16 @@ namespace ledger::sqlite
constexpr const char *SELECT_ALL = "SELECT * FROM ";
constexpr const char *SQLITE_MASTER = "sqlite_master";
constexpr const char *WHERE = " WHERE ";
constexpr const char *ORDER_BY = " ORDER BY ";
constexpr const char *AND = " AND ";
constexpr const char *SELECT_LAST_LEDGER = "SELECT * FROM ledger ORDER BY seq_no DESC LIMIT 1";
constexpr const char *SELECT_LEDGER_BY_SEQ_NO = "SELECT * FROM ledger WHERE seq_no=? LIMIT 1";
constexpr const char *INSERT_INTO_LEDGER = "INSERT INTO ledger("
"seq_no, time, ledger_hash, prev_ledger_hash, data_hash,"
"state_hash, patch_hash, user_hash, input_hash, output_hash"
") VALUES(?,?,?,?,?,?,?,?,?,?)";
#define BIND_H32_BLOB(idx, field) (sqlite3_bind_blob(stmt, idx, field.data(), sizeof(util::h32), SQLITE_STATIC) == SQLITE_OK)
#define GET_H32_BLOB(idx) std::string((char *)sqlite3_column_blob(stmt, idx), sizeof(util::h32))
/**
* Opens a connection to a given databse and give the db pointer.
@@ -26,10 +35,11 @@ namespace ledger::sqlite
*/
int open_db(std::string_view db_name, sqlite3 **db)
{
if (sqlite3_open(db_name.data(), db) != SQLITE_OK)
int ret;
if ((ret = sqlite3_open(db_name.data(), db)) != SQLITE_OK)
{
*db = NULL;
LOG_ERROR << "Can't open database: " << sqlite3_errmsg(*db);
LOG_ERROR << "Can't open database: " << ret << ", " << sqlite3_errmsg(*db);
return -1;
}
return 0;
@@ -97,14 +107,14 @@ namespace ledger::sqlite
}
/**
* Insert values to a table.
* Inserts mulitple rows to a table.
* @param db Pointer to the db.
* @param table_name Table name to be populated.
* @param column_names_string Comma seperated string of colums (eg: "col_1,col_2,...").
* @param value_strings Vector of comma seperated values (wrap in single quotes for TEXT type) (eg: ["r1val1,'r1val2',...", "r2val1,'r2val2',..."]).
* @returns returns 0 on success, or -1 on error.
*/
int insert_values(sqlite3 *db, std::string_view table_name, std::string_view column_names_string, const std::vector<std::string> &value_strings)
int insert_rows(sqlite3 *db, std::string_view table_name, std::string_view column_names_string, const std::vector<std::string> &value_strings)
{
std::string sql;
@@ -130,14 +140,14 @@ namespace ledger::sqlite
}
/**
* Insert a value row to a table.
* Inserts a row to a table.
* @param db Pointer to the db.
* @param table_name Table name to be populated.
* @param column_names_string Comma seperated string of colums (eg: "col_1,col_2,...").
* @param value_string comma seperated values as per column order (wrap in single quotes for TEXT type) (eg: "r1val1,'r1val2',...").
* @returns returns 0 on success, or -1 on error.
*/
int insert_value(sqlite3 *db, std::string_view table_name, std::string_view column_names_string, std::string_view value_string)
int insert_row(sqlite3 *db, std::string_view table_name, std::string_view column_names_string, std::string_view value_string)
{
std::string sql;
// Reserving the space for the query before construction.
@@ -217,17 +227,17 @@ namespace ledger::sqlite
*/
int create_ledger_table(sqlite3 *db)
{
std::vector<table_column_info> column_info{
const std::vector<table_column_info> column_info{
table_column_info("seq_no", COLUMN_DATA_TYPE::INT, true),
table_column_info("time", COLUMN_DATA_TYPE::INT),
table_column_info("ledger_hash", COLUMN_DATA_TYPE::TEXT),
table_column_info("prev_ledger_hash", COLUMN_DATA_TYPE::TEXT),
table_column_info("data_hash", COLUMN_DATA_TYPE::TEXT),
table_column_info("state_hash", COLUMN_DATA_TYPE::TEXT),
table_column_info("patch_hash", COLUMN_DATA_TYPE::TEXT),
table_column_info("user_hash", COLUMN_DATA_TYPE::TEXT),
table_column_info("input_hash", COLUMN_DATA_TYPE::TEXT),
table_column_info("output_hash", COLUMN_DATA_TYPE::TEXT)};
table_column_info("ledger_hash", COLUMN_DATA_TYPE::BLOB),
table_column_info("prev_ledger_hash", COLUMN_DATA_TYPE::BLOB),
table_column_info("data_hash", COLUMN_DATA_TYPE::BLOB),
table_column_info("state_hash", COLUMN_DATA_TYPE::BLOB),
table_column_info("patch_hash", COLUMN_DATA_TYPE::BLOB),
table_column_info("user_hash", COLUMN_DATA_TYPE::BLOB),
table_column_info("input_hash", COLUMN_DATA_TYPE::BLOB),
table_column_info("output_hash", COLUMN_DATA_TYPE::BLOB)};
if (create_table(db, LEDGER_TABLE, column_info) == -1)
return -1;
@@ -244,7 +254,6 @@ namespace ledger::sqlite
*/
int create_hp_version_table_and_update(sqlite3 *db, std::string_view version)
{
const std::vector<table_column_info> column_info{
table_column_info(HP_VERSION_COLUMN, COLUMN_DATA_TYPE::TEXT)};
@@ -252,7 +261,7 @@ namespace ledger::sqlite
return -1;
const std::string value_string = "\"" + std::string(version) + "\"";
if (insert_value(db, HP_VERSION_TABLE, HP_VERSION_COLUMN, value_string) == -1)
if (insert_row(db, HP_VERSION_TABLE, HP_VERSION_COLUMN, value_string) == -1)
return -1;
return 0;
@@ -264,23 +273,28 @@ namespace ledger::sqlite
* @param ledger Ledger struct to be inserted.
* @returns returns 0 on success, or -1 on error.
*/
int insert_ledger_row(sqlite3 *db, const ledger &ledger)
int insert_ledger_row(sqlite3 *db, const ledger::ledger_record &ledger)
{
std::string value_string = std::to_string(ledger.seq_no) + "," +
std::to_string(ledger.time) + "," +
"'" + ledger.ledger_hash_hex + "'," +
"'" + ledger.prev_ledger_hash_hex + "'," +
"'" + ledger.data_hash_hex + "'," +
"'" + ledger.state_hash_hex + "'," +
"'" + ledger.patch_hash_hex + "'," +
"'" + ledger.user_hash_hex + "'," +
"'" + ledger.input_hash_hex + "'," +
"'" + ledger.output_hash_hex + "'";
sqlite3_stmt *stmt;
if (sqlite3_prepare_v2(db, INSERT_INTO_LEDGER, -1, &stmt, 0) == SQLITE_OK && stmt != NULL &&
sqlite3_bind_int64(stmt, 1, ledger.seq_no) == SQLITE_OK &&
sqlite3_bind_int64(stmt, 2, ledger.timestamp) == SQLITE_OK &&
BIND_H32_BLOB(3, ledger.ledger_hash) &&
BIND_H32_BLOB(4, ledger.prev_ledger_hash) &&
BIND_H32_BLOB(5, ledger.data_hash) &&
BIND_H32_BLOB(6, ledger.state_hash) &&
BIND_H32_BLOB(7, ledger.config_hash) &&
BIND_H32_BLOB(8, ledger.user_hash) &&
BIND_H32_BLOB(9, ledger.input_hash) &&
BIND_H32_BLOB(10, ledger.output_hash) &&
sqlite3_step(stmt) == SQLITE_DONE)
{
sqlite3_finalize(stmt);
return 0;
}
if (insert_value(db, LEDGER_TABLE, LEDGER_COLUMNS, value_string) == -1)
return -1;
return 0;
sqlite3_finalize(stmt);
return -1;
}
/**
@@ -296,36 +310,71 @@ namespace ledger::sqlite
/**
* Get the last ledger record of the given db.
* @param db Pointer to the db.
* @returns returns the last ledger as a struct.
* @param ledger Ledger structure to populate.
* @returns 0 on success. -1 on failure.
*/
ledger get_last_ledger(sqlite3 *db)
int get_last_ledger(sqlite3 *db, ledger::ledger_record &ledger)
{
std::string sql;
sql.append(SELECT_ALL);
sql.append(LEDGER_TABLE);
sql.append(ORDER_BY);
sql.append("seq_no DESC LIMIT 1");
sqlite3_stmt *stmt;
sqlite::ledger ledger;
if (sqlite3_prepare_v2(db, sql.data(), -1, &stmt, 0) == SQLITE_OK &&
stmt != NULL && sqlite3_step(stmt) == SQLITE_ROW)
if (sqlite3_prepare_v2(db, SELECT_LAST_LEDGER, -1, &stmt, 0) == SQLITE_OK && stmt != NULL &&
sqlite3_step(stmt) == SQLITE_ROW)
{
ledger.seq_no = sqlite3_column_int64(stmt, 0);
ledger.time = sqlite3_column_int64(stmt, 1);
ledger.ledger_hash_hex = std::string((char *)sqlite3_column_text(stmt, 2));
ledger.prev_ledger_hash_hex = std::string((char *)sqlite3_column_text(stmt, 3));
ledger.data_hash_hex = std::string((char *)sqlite3_column_text(stmt, 4));
ledger.state_hash_hex = std::string((char *)sqlite3_column_text(stmt, 5));
ledger.patch_hash_hex = std::string((char *)sqlite3_column_text(stmt, 6));
ledger.user_hash_hex = std::string((char *)sqlite3_column_text(stmt, 7));
ledger.input_hash_hex = std::string((char *)sqlite3_column_text(stmt, 8));
ledger.output_hash_hex = std::string((char *)sqlite3_column_text(stmt, 9));
populate_ledger_from_sql_record(ledger, stmt);
sqlite3_finalize(stmt);
return 0;
}
// Finalize and distroys the statement.
LOG_ERROR << "Error when querying last ledger from db.";
sqlite3_finalize(stmt);
return ledger;
return -1;
}
/**
* Get the ledger record by seq no.
* @param db Pointer to the db.
* @param seq_no Ledger sequence no. to search for.
* @param ledger Ledger structure to populate.
* @returns 1 if ledger found. 0 if ledger not found. -1 on failure.
*/
int get_ledger_by_seq_no(sqlite3 *db, const uint64_t seq_no, ledger::ledger_record &ledger)
{
sqlite3_stmt *stmt;
if (sqlite3_prepare_v2(db, SELECT_LEDGER_BY_SEQ_NO, -1, &stmt, 0) == SQLITE_OK && stmt != NULL &&
sqlite3_bind_int64(stmt, 1, seq_no) == SQLITE_OK)
{
const int result = sqlite3_step(stmt);
if (result == SQLITE_ROW)
{
populate_ledger_from_sql_record(ledger, stmt);
sqlite3_finalize(stmt);
return 1; // Ledger found.
}
else if (result == SQLITE_DONE)
{
sqlite3_finalize(stmt);
return 0; // Not found.
}
}
LOG_ERROR << "Error when querying ledger by seq no. from db.";
sqlite3_finalize(stmt);
return -1;
}
void populate_ledger_from_sql_record(ledger::ledger_record &ledger, sqlite3_stmt *stmt)
{
ledger.seq_no = sqlite3_column_int64(stmt, 0);
ledger.timestamp = sqlite3_column_int64(stmt, 1);
ledger.ledger_hash = GET_H32_BLOB(2);
ledger.prev_ledger_hash = GET_H32_BLOB(3);
ledger.data_hash = GET_H32_BLOB(4);
ledger.state_hash = GET_H32_BLOB(5);
ledger.config_hash = GET_H32_BLOB(6);
ledger.user_hash = GET_H32_BLOB(7);
ledger.input_hash = GET_H32_BLOB(8);
ledger.output_hash = GET_H32_BLOB(9);
}
} // namespace ledger::sqlite

View File

@@ -2,6 +2,7 @@
#define _LEDGER_SQLITE_
#include "../pchheader.hpp"
#include "ledger_common.hpp"
namespace ledger::sqlite
{
@@ -12,7 +13,8 @@ namespace ledger::sqlite
enum COLUMN_DATA_TYPE
{
INT,
TEXT
TEXT,
BLOB
};
/**
@@ -37,50 +39,6 @@ namespace ledger::sqlite
}
};
/**
* Struct for ledger feilds.
* All the hashes are stored as hex strings.
*/
struct ledger
{
uint64_t seq_no;
uint64_t time;
std::string ledger_hash_hex;
std::string prev_ledger_hash_hex;
std::string data_hash_hex;
std::string state_hash_hex;
std::string patch_hash_hex;
std::string user_hash_hex;
std::string input_hash_hex;
std::string output_hash_hex;
ledger(){};
ledger(
const uint64_t seq_no,
const uint64_t time,
std::string_view ledger_hash_hex,
std::string_view prev_ledger_hash_hex,
std::string_view data_hash_hex,
std::string_view state_hash_hex,
std::string_view patch_hash_hex,
std::string_view user_hash_hex,
std::string_view input_hash_hex,
std::string_view output_hash_hex)
: seq_no(seq_no),
time(time),
ledger_hash_hex(ledger_hash_hex),
prev_ledger_hash_hex(prev_ledger_hash_hex),
data_hash_hex(data_hash_hex),
state_hash_hex(state_hash_hex),
patch_hash_hex(patch_hash_hex),
user_hash_hex(user_hash_hex),
input_hash_hex(input_hash_hex),
output_hash_hex(output_hash_hex)
{
}
};
// Generic methods.
int open_db(std::string_view db_name, sqlite3 **db);
@@ -88,9 +46,9 @@ namespace ledger::sqlite
int create_table(sqlite3 *db, std::string_view table_name, const std::vector<table_column_info> &column_info);
int insert_values(sqlite3 *db, std::string_view table_name, std::string_view column_names_string, const std::vector<std::string> &value_strings);
int insert_rows(sqlite3 *db, std::string_view table_name, std::string_view column_names_string, const std::vector<std::string> &value_strings);
int insert_value(sqlite3 *db, std::string_view table_name, std::string_view column_names_string, std::string_view value_string);
int insert_row(sqlite3 *db, std::string_view table_name, std::string_view column_names_string, std::string_view value_string);
bool is_table_exists(sqlite3 *db, std::string_view table_name);
@@ -101,11 +59,15 @@ namespace ledger::sqlite
int create_hp_version_table_and_update(sqlite3 *db, std::string_view version);
int insert_ledger_row(sqlite3 *db, const ledger &ledger);
int insert_ledger_row(sqlite3 *db, const ledger::ledger_record &ledger);
bool is_ledger_table_exist(sqlite3 *db);
ledger get_last_ledger(sqlite3 *db);
int get_last_ledger(sqlite3 *db, ledger::ledger_record &ledger);
int get_ledger_by_seq_no(sqlite3 *db, const uint64_t seq_no, ledger::ledger_record &ledger);
void populate_ledger_from_sql_record(ledger::ledger_record &ledger, sqlite3_stmt *stmt);
} // namespace ledger::sqlite

View File

@@ -3,6 +3,7 @@
#include "../../pchheader.hpp"
#include "../../util/util.hpp"
#include "../../hplog.hpp"
#include "../../ledger/ledger_query.hpp"
#include "../usrmsg_common.hpp"
#include "usrmsg_bson.hpp"
@@ -10,7 +11,7 @@ namespace msg::usrmsg::bson
{
/**
* Constructs a status response message.
* @param msg String reference to copy the generated bson message into.
* @param msg Buffer to construct the generated bson message into.
* Message format:
* {
* "type": "stat_response",
@@ -19,7 +20,7 @@ namespace msg::usrmsg::bson
* }
*/
constexpr const size_t MAX_KNOWN_PEERS_INFO = 10;
void create_status_response(std::vector<uint8_t> &msg, const uint64_t lcl_seq_no, std::string_view lcl_hash)
{
jsoncons::bson::bson_bytes_encoder encoder(msg);
@@ -37,10 +38,10 @@ namespace msg::usrmsg::bson
encoder.key(msg::usrmsg::FLD_CONTARCT_EXECUTION_ENABLED);
encoder.bool_value(conf::cfg.contract.execute);
encoder.key(msg::usrmsg::FLD_READ_REQUESTS_ENABLED);
encoder.bool_value(conf::cfg.user.concurrent_read_reqeuests != 0);
encoder.bool_value(conf::cfg.user.concurrent_read_reqeuests != 0);
encoder.key(msg::usrmsg::FLD_IS_FULL_HISTORY_NODE);
encoder.bool_value(conf::cfg.node.history == conf::HISTORY::FULL);
encoder.key(msg::usrmsg::FLD_CURRENT_UNL);
encoder.begin_array();
for (std::string_view unl : conf::cfg.contract.unl)
@@ -67,7 +68,7 @@ namespace msg::usrmsg::bson
/**
* Constructs a contract input status message.
* @param msg String reference to copy the generated bson message into.
* @param msg Buffer to construct the generated bson message into.
* Message format:
* {
* "type": "contract_input_status",
@@ -97,7 +98,7 @@ namespace msg::usrmsg::bson
/**
* Constructs a contract read response message.
* @param msg String reference to copy the generated bson message into.
* @param msg Buffer to construct the generated bson message into.
* Message format:
* {
* "type": "contract_read_response",
@@ -119,7 +120,7 @@ namespace msg::usrmsg::bson
/**
* Constructs a contract output container message.
* @param msg String reference to copy the generated bson message into.
* @param msg Buffer to construct the generated bson message into.
* Message format:
* {
* "type": "contract_output",
@@ -170,7 +171,7 @@ namespace msg::usrmsg::bson
/**
* Constructs unl list container message.
* @param msg String reference to copy the generated bson message string into.
* @param msg Buffer to construct the generated bson message string into.
* Message format:
* {
* "type": "unl_change",
@@ -188,7 +189,43 @@ namespace msg::usrmsg::bson
encoder.begin_array();
for (std::string_view unl : unl_list)
encoder.byte_string_value(unl);
encoder.end_array();
encoder.end_array();
encoder.end_object();
encoder.flush();
}
/**
* Constructs a ledger query response.
* @param msg Buffer to construct the generated json message string into.
* Message format:
* {
* "type": "ledger_query_result",
* "reply_for": "<original query id>",
* "error": "error_code" or NULL,
* "results": [{}...]
* }
* @param reply_for Original query id to associate the response with.
* @param result Query results to be sent in the response.
*/
void create_ledger_query_response(std::vector<uint8_t> &msg, std::string_view reply_for,
const ledger::query::query_result &result)
{
jsoncons::bson::bson_bytes_encoder encoder(msg);
encoder.begin_object();
encoder.key(msg::usrmsg::FLD_TYPE);
encoder.string_value(msg::usrmsg::MSGTYPE_LEDGER_QUERY_RESULT);
encoder.key(msg::usrmsg::FLD_REPLY_FOR);
encoder.string_value(reply_for);
encoder.key(msg::usrmsg::FLD_ERROR);
if (result.index() == 1)
encoder.null_value();
else
encoder.string_value(std::get<const char *>(result));
encoder.key(msg::usrmsg::FLD_RESULTS);
encoder.begin_array();
populate_query_results(encoder, std::get<std::vector<ledger::query::query_result_record>>(result));
encoder.end_array();
encoder.end_object();
encoder.flush();
}
@@ -338,6 +375,79 @@ namespace msg::usrmsg::bson
return 0;
}
/**
* Extract query information from a ledger query request.
* @param extracted_query Extracted query criteria.
* @param extracted_id The query id.
* @param d The bson document holding the query.
* Accepted query message format:
* {
* "type": "ledger_query",
* "id": "<query id>",
* "filter_by": "<filter by>",
* "params": {...}, // Params supported by the specified filter.
* "include": ["raw_inputs", "raw_outputs"]
* }
* @return 0 on successful extraction. -1 for failure.
*/
int extract_ledger_query(ledger::query::query_request &extracted_query, std::string &extracted_id, const jsoncons::ojson &d)
{
if (!d.contains(msg::usrmsg::FLD_ID) || !d.contains(msg::usrmsg::FLD_FILTER_BY) ||
!d.contains(msg::usrmsg::FLD_PARAMS) || !d.contains(msg::usrmsg::FLD_INCLUDE))
{
LOG_DEBUG << "Ledger query required fields missing.";
return -1;
}
if (!d[msg::usrmsg::FLD_ID].is<std::string>() || !d[msg::usrmsg::FLD_FILTER_BY].is<std::string>() ||
!d[msg::usrmsg::FLD_PARAMS].is_object() || !d[msg::usrmsg::FLD_INCLUDE].is_array())
{
LOG_DEBUG << "Ledger query invalid field values.";
return -1;
}
const std::string id = d[msg::usrmsg::FLD_ID].as<std::string>();
if (id.empty())
{
LOG_DEBUG << "Ledger query invalid id.";
return -1;
}
extracted_id = std::move(id);
// Detect includes.
bool raw_inputs = false;
bool raw_outputs = false;
for (auto &val : d[msg::usrmsg::FLD_INCLUDE].array_range())
{
if (val == msg::usrmsg::QUERY_INCLUDE_RAW_INPUTS)
raw_inputs = true;
else if (val == msg::usrmsg::QUERY_INCLUDE_RAW_OUTPUTS)
raw_outputs = false;
}
auto &params_field = d[msg::usrmsg::FLD_PARAMS];
if (d[msg::usrmsg::FLD_FILTER_BY] == msg::usrmsg::QUERY_FILTER_BY_SEQ_NO)
{
if (!params_field.contains(msg::usrmsg::FLD_SEQ_NO) || !params_field[msg::usrmsg::FLD_SEQ_NO].is<uint64_t>())
{
LOG_DEBUG << "Ledger query seq no filter invalid params.";
return -1;
}
extracted_query = ledger::query::seq_no_query{
params_field[msg::usrmsg::FLD_SEQ_NO].as<uint64_t>(),
raw_inputs,
raw_outputs};
return 0;
}
else
{
LOG_DEBUG << "Ledger query invalid filter-by criteria.";
return -1;
}
}
void populate_output_hash_array(jsoncons::bson::bson_bytes_encoder &encoder, const util::merkle_hash_node &node)
{
if (node.children.empty())
@@ -354,4 +464,31 @@ namespace msg::usrmsg::bson
}
}
void populate_query_results(jsoncons::bson::bson_bytes_encoder &encoder, const std::vector<ledger::query::query_result_record> &results)
{
for (const ledger::query::query_result_record &r : results)
{
encoder.begin_object();
encoder.key(msg::usrmsg::FLD_SEQ_NO);
encoder.uint64_value(r.ledger.seq_no);
encoder.key(msg::usrmsg::FLD_TIMESTAMP);
encoder.uint64_value(r.ledger.timestamp);
encoder.key(msg::usrmsg::FLD_HASH);
encoder.byte_string_value(r.ledger.ledger_hash);
encoder.key(msg::usrmsg::FLD_PREV_HASH);
encoder.byte_string_value(r.ledger.prev_ledger_hash);
encoder.key(msg::usrmsg::FLD_STATE_HASH);
encoder.byte_string_value(r.ledger.state_hash);
encoder.key(msg::usrmsg::FLD_CONFIG_HASH);
encoder.byte_string_value(r.ledger.config_hash);
encoder.key(msg::usrmsg::FLD_USER_HASH);
encoder.byte_string_value(r.ledger.user_hash);
encoder.key(msg::usrmsg::FLD_INPUT_HASH);
encoder.byte_string_value(r.ledger.input_hash);
encoder.key(msg::usrmsg::FLD_OUTPUT_HASH);
encoder.byte_string_value(r.ledger.output_hash);
encoder.end_object();
}
}
} // namespace msg::usrmsg::bson

View File

@@ -3,6 +3,7 @@
#include "../../pchheader.hpp"
#include "../../util/merkle_hash_tree.hpp"
#include "../../ledger/ledger_query.hpp"
namespace msg::usrmsg::bson
{
@@ -20,6 +21,9 @@ namespace msg::usrmsg::bson
void create_unl_list_container(std::vector<uint8_t> &msg, const ::std::set<std::string> &unl_list);
void create_ledger_query_response(std::vector<uint8_t> &msg, std::string_view reply_for,
const ledger::query::query_result &result);
int verify_user_handshake_response(std::string &extracted_pubkeyhex, std::string &extracted_protocol,
std::string_view response, std::string_view original_challenge);
@@ -35,8 +39,12 @@ namespace msg::usrmsg::bson
int extract_input_container(std::string &input, std::string &nonce,
uint64_t &max_lcl_seq_no, std::string_view contentbson);
int extract_ledger_query(ledger::query::query_request &extracted_query, std::string &extracted_id, const jsoncons::ojson &d);
void populate_output_hash_array(jsoncons::bson::bson_bytes_encoder &encoder, const util::merkle_hash_node &node);
void populate_query_results(jsoncons::bson::bson_bytes_encoder &encoder, const std::vector<ledger::query::query_result_record> &results);
} // namespace msg::usrmsg::bson
#endif

View File

@@ -5,6 +5,7 @@
#include "../../crypto.hpp"
#include "../../hplog.hpp"
#include "../../conf.hpp"
#include "../../ledger/ledger_query.hpp"
#include "../usrmsg_common.hpp"
#include "usrmsg_json.hpp"
@@ -33,7 +34,7 @@ namespace msg::usrmsg::json
* initial user challenge handshake. This gets called when a user establishes
* a web socket connection to HP.
*
* @param msg String reference to copy the generated json message string into.
* @param msg Buffer to construct the generated json message string into.
* Message format:
* {
* "hp_version": "<hp protocol version>",
@@ -42,7 +43,7 @@ namespace msg::usrmsg::json
* "contract_version": "<contract version string>",
* "challenge": "<challenge string>"
* }
* @param challenge_bytes String reference to copy the generated challenge bytes into.
* @param challenge_bytes Buffer to construct the generated challenge bytes into.
*/
void create_user_challenge(std::vector<uint8_t> &msg, std::string &challenge)
{
@@ -83,7 +84,7 @@ namespace msg::usrmsg::json
* Constructs server challenge response message json. This gets sent when we receive
* a challenge from the user.
*
* @param msg String reference to copy the generated json message string into.
* @param msg Buffer to construct the generated json message string into.
* Message format:
* {
* "type": "server_challenge_response",
@@ -130,7 +131,7 @@ namespace msg::usrmsg::json
/**
* Constructs a status response message.
* @param msg String reference to copy the generated json message string into.
* @param msg Buffer to construct the generated json message string into.
* Message format:
* {
* "type": "stat_response",
@@ -162,15 +163,15 @@ namespace msg::usrmsg::json
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_ROUND_TIME;
msg += SEP_COLON_NOQUOTE;
msg += std::to_string(conf::cfg.contract.roundtime);
msg += std::to_string(conf::cfg.contract.roundtime);
msg += SEP_COMMA_NOQUOTE;
msg += msg::usrmsg::FLD_CONTARCT_EXECUTION_ENABLED;
msg += SEP_COLON_NOQUOTE;
msg += conf::cfg.contract.execute ? "true" : "false";
msg += conf::cfg.contract.execute ? "true" : "false";
msg += SEP_COMMA_NOQUOTE;
msg += msg::usrmsg::FLD_READ_REQUESTS_ENABLED;
msg += SEP_COLON_NOQUOTE;
msg += conf::cfg.user.concurrent_read_reqeuests != 0 ? "true" : "false";
msg += conf::cfg.user.concurrent_read_reqeuests != 0 ? "true" : "false";
msg += SEP_COMMA_NOQUOTE;
msg += msg::usrmsg::FLD_IS_FULL_HISTORY_NODE;
msg += SEP_COLON_NOQUOTE;
@@ -216,7 +217,7 @@ namespace msg::usrmsg::json
/**
* Constructs a contract input status message.
* @param msg String reference to copy the generated json message string into.
* @param msg Buffer to construct the generated json message string into.
* Message format:
* {
* "type": "contract_input_status",
@@ -252,7 +253,7 @@ namespace msg::usrmsg::json
/**
* Constructs a contract read response message.
* @param msg String reference to copy the generated json message string into.
* @param msg Buffer to construct the generated json message string into.
* Message format:
* {
* "type": "contract_read_response",
@@ -293,7 +294,7 @@ namespace msg::usrmsg::json
/**
* Constructs a contract output container message.
* @param msg String reference to copy the generated json message string into.
* @param msg Buffer to construct the generated json message string into.
* Message format:
* {
* "type": "contract_output",
@@ -377,7 +378,7 @@ namespace msg::usrmsg::json
/**
* Constructs unl list container message.
* @param msg String reference to copy the generated json message string into.
* @param msg Buffer to construct the generated json message string into.
* Message format:
* {
* "type": "unl_change",
@@ -410,6 +411,50 @@ namespace msg::usrmsg::json
msg += "]}";
}
/**
* Constructs a ledger query response.
* @param msg Buffer to construct the generated json message string into.
* Message format:
* {
* "type": "ledger_query_result",
* "reply_for": "<original query id>",
* "error": "error_code" or NULL,
* "results": [{}...]
* }
* @param reply_for Original query id to associate the response with.
* @param result Query results to be sent in the response.
*/
void create_ledger_query_response(std::vector<uint8_t> &msg, std::string_view reply_for,
const ledger::query::query_result &result)
{
msg.reserve(1024);
msg += "{\"";
msg += msg::usrmsg::FLD_TYPE;
msg += SEP_COLON;
msg += msg::usrmsg::MSGTYPE_LEDGER_QUERY_RESULT;
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_REPLY_FOR;
msg += SEP_COLON;
msg += reply_for;
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_ERROR;
if (result.index() == 1)
{
msg += "\":null,\"";
}
else
{
msg += SEP_COLON;
msg += std::get<const char *>(result);
msg += SEP_COMMA;
}
msg += msg::usrmsg::FLD_RESULTS;
msg += "\":[";
if (result.index() == 1)
populate_query_results(msg, std::get<std::vector<ledger::query::query_result_record>>(result));
msg += "]}";
}
/**
* Verifies the user handshake response with the original challenge issued to the user
* and the user public key contained in the response.
@@ -671,6 +716,79 @@ namespace msg::usrmsg::json
return 0;
}
/**
* Extract query information from a ledger query request.
* @param extracted_query Extracted query criteria.
* @param extracted_id The query id.
* @param d The json document holding the query.
* Accepted query message format:
* {
* "type": "ledger_query",
* "id": "<query id>",
* "filter_by": "<filter by>",
* "params": {...}, // Params supported by the specified filter.
* "include": ["raw_inputs", "raw_outputs"]
* }
* @return 0 on successful extraction. -1 for failure.
*/
int extract_ledger_query(ledger::query::query_request &extracted_query, std::string &extracted_id, const jsoncons::json &d)
{
if (!d.contains(msg::usrmsg::FLD_ID) || !d.contains(msg::usrmsg::FLD_FILTER_BY) ||
!d.contains(msg::usrmsg::FLD_PARAMS) || !d.contains(msg::usrmsg::FLD_INCLUDE))
{
LOG_DEBUG << "Ledger query required fields missing.";
return -1;
}
if (!d[msg::usrmsg::FLD_ID].is<std::string>() || !d[msg::usrmsg::FLD_FILTER_BY].is<std::string>() ||
!d[msg::usrmsg::FLD_PARAMS].is_object() || !d[msg::usrmsg::FLD_INCLUDE].is_array())
{
LOG_DEBUG << "Ledger query invalid field values.";
return -1;
}
const std::string id = d[msg::usrmsg::FLD_ID].as<std::string>();
if (id.empty())
{
LOG_DEBUG << "Ledger query invalid id.";
return -1;
}
extracted_id = std::move(id);
// Detect includes.
bool raw_inputs = false;
bool raw_outputs = false;
for (auto &val : d[msg::usrmsg::FLD_INCLUDE].array_range())
{
if (val == msg::usrmsg::QUERY_INCLUDE_RAW_INPUTS)
raw_inputs = true;
else if (val == msg::usrmsg::QUERY_INCLUDE_RAW_OUTPUTS)
raw_outputs = false;
}
auto &params_field = d[msg::usrmsg::FLD_PARAMS];
if (d[msg::usrmsg::FLD_FILTER_BY] == msg::usrmsg::QUERY_FILTER_BY_SEQ_NO)
{
if (!params_field.contains(msg::usrmsg::FLD_SEQ_NO) || !params_field[msg::usrmsg::FLD_SEQ_NO].is<uint64_t>())
{
LOG_DEBUG << "Ledger query seq no filter invalid params.";
return -1;
}
extracted_query = ledger::query::seq_no_query{
params_field[msg::usrmsg::FLD_SEQ_NO].as<uint64_t>(),
raw_inputs,
raw_outputs};
return 0;
}
else
{
LOG_DEBUG << "Ledger query invalid filter-by criteria.";
return -1;
}
}
bool is_json_string(std::string_view content)
{
if (content.empty())
@@ -720,4 +838,48 @@ namespace msg::usrmsg::json
}
}
void populate_query_results(std::vector<uint8_t> &msg, const std::vector<ledger::query::query_result_record> &results)
{
for (const ledger::query::query_result_record &r : results)
{
msg += "{\"";
msg += msg::usrmsg::FLD_SEQ_NO;
msg += SEP_COLON_NOQUOTE;
msg += std::to_string(r.ledger.seq_no);
msg += SEP_COMMA_NOQUOTE;
msg += msg::usrmsg::FLD_TIMESTAMP;
msg += SEP_COLON_NOQUOTE;
msg += std::to_string(r.ledger.timestamp);
msg += SEP_COMMA_NOQUOTE;
msg += msg::usrmsg::FLD_HASH;
msg += SEP_COLON;
msg += util::to_hex(r.ledger.ledger_hash);
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_PREV_HASH;
msg += SEP_COLON;
msg += util::to_hex(r.ledger.prev_ledger_hash);
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_STATE_HASH;
msg += SEP_COLON;
msg += util::to_hex(r.ledger.state_hash);
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_CONFIG_HASH;
msg += SEP_COLON;
msg += util::to_hex(r.ledger.config_hash);
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_USER_HASH;
msg += SEP_COLON;
msg += util::to_hex(r.ledger.user_hash);
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_INPUT_HASH;
msg += SEP_COLON;
msg += util::to_hex(r.ledger.input_hash);
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_OUTPUT_HASH;
msg += SEP_COLON;
msg += util::to_hex(r.ledger.output_hash);
msg += "\"}";
}
}
} // namespace msg::usrmsg::json

View File

@@ -3,6 +3,7 @@
#include "../../pchheader.hpp"
#include "../../util/merkle_hash_tree.hpp"
#include "../../ledger/ledger_query.hpp"
namespace msg::usrmsg::json
{
@@ -24,6 +25,9 @@ namespace msg::usrmsg::json
void create_unl_list_container(std::vector<uint8_t> &msg, const ::std::set<std::string> &unl_list);
void create_ledger_query_response(std::vector<uint8_t> &msg, std::string_view reply_for,
const ledger::query::query_result &result);
int verify_user_challenge(std::string &extracted_pubkeyhex, std::string &extracted_protocol, std::string &extracted_server_challenge,
std::string_view response, std::string_view original_challenge);
@@ -39,10 +43,14 @@ namespace msg::usrmsg::json
int extract_input_container(std::string &input, std::string &nonce,
uint64_t &max_lcl_seq_no, std::string_view contentjson);
int extract_ledger_query(ledger::query::query_request &extracted_query, std::string &extracted_id, const jsoncons::json &d);
bool is_json_string(std::string_view content);
void populate_output_hash_array(std::vector<uint8_t> &msg, const util::merkle_hash_node &node);
void populate_query_results(std::vector<uint8_t> &msg, const std::vector<ledger::query::query_result_record> &results);
} // namespace msg::usrmsg::json
#endif

View File

@@ -38,6 +38,22 @@ namespace msg::usrmsg
constexpr const char *FLD_IS_FULL_HISTORY_NODE = "is_full_history_node";
constexpr const char *FLD_CURRENT_UNL = "current_unl";
constexpr const char *FLD_PEERS = "peers";
constexpr const char *FLD_ID = "id";
constexpr const char *FLD_REPLY_FOR = "reply_for";
constexpr const char *FLD_FILTER_BY = "filter_by";
constexpr const char *FLD_INCLUDE = "include";
constexpr const char *FLD_PARAMS = "params";
constexpr const char *FLD_SEQ_NO = "seq_no";
constexpr const char *FLD_ERROR = "error";
constexpr const char *FLD_RESULTS = "results";
constexpr const char *FLD_TIMESTAMP = "timestamp";
constexpr const char *FLD_HASH = "hash";
constexpr const char *FLD_PREV_HASH = "prev_hash";
constexpr const char *FLD_STATE_HASH = "state_hash";
constexpr const char *FLD_CONFIG_HASH = "config_hash";
constexpr const char *FLD_USER_HASH = "user_hash";
constexpr const char *FLD_INPUT_HASH = "input_hash";
constexpr const char *FLD_OUTPUT_HASH = "output_hash";
// Message types
constexpr const char *MSGTYPE_USER_CHALLENGE = "user_challenge";
@@ -51,6 +67,8 @@ namespace msg::usrmsg
constexpr const char *MSGTYPE_STAT = "stat";
constexpr const char *MSGTYPE_STAT_RESPONSE = "stat_response";
constexpr const char *MSGTYPE_UNL_CHANGE = "unl_change";
constexpr const char *MSGTYPE_LEDGER_QUERY = "ledger_query";
constexpr const char *MSGTYPE_LEDGER_QUERY_RESULT = "ledger_query_result";
constexpr const char *MSGTYPE_UNKNOWN = "unknown";
// Values
@@ -66,6 +84,9 @@ namespace msg::usrmsg
constexpr const char *REASON_ALREADY_SUBMITTED = "already_submitted";
constexpr const char *REASON_NONCE_OVERFLOW = "nonce_overflow";
constexpr const char *REASON_ROUND_INPUTS_OVERFLOW = "round_inputs_overflow";
constexpr const char *QUERY_FILTER_BY_SEQ_NO = "seq_no";
constexpr const char *QUERY_INCLUDE_RAW_INPUTS = "raw_inputs";
constexpr const char *QUERY_INCLUDE_RAW_OUTPUTS = "raw_outputs";
} // namespace msg::usrmsg

View File

@@ -56,6 +56,15 @@ namespace msg::usrmsg
busrmsg::create_unl_list_container(msg, unl_list);
}
void usrmsg_parser::create_ledger_query_response(std::vector<uint8_t> &msg, std::string_view reply_for,
const ledger::query::query_result &result) const
{
if (protocol == util::PROTOCOL::JSON)
jusrmsg::create_ledger_query_response(msg, reply_for, result);
else
busrmsg::create_ledger_query_response(msg, reply_for, result);
}
int usrmsg_parser::parse(std::string_view message)
{
if (protocol == util::PROTOCOL::JSON)
@@ -97,4 +106,12 @@ namespace msg::usrmsg
return busrmsg::extract_input_container(input, nonce, max_lcl_seq_no, encoded_content);
}
int usrmsg_parser::extract_ledger_query(ledger::query::query_request &extracted_query, std::string &extracted_id) const
{
if (protocol == util::PROTOCOL::JSON)
return jusrmsg::extract_ledger_query(extracted_query, extracted_id, jdoc);
else
return busrmsg::extract_ledger_query(extracted_query, extracted_id, bdoc);
}
} // namespace msg::usrmsg

View File

@@ -1,9 +1,10 @@
#ifndef _HP_MSG_USRMSG_PARSER_
#define _HP_MSG_USRMSG_PARSER_
#include "../pchheader.hpp"
#include "../util/util.hpp"
#include "../util/merkle_hash_tree.hpp"
#include "../pchheader.hpp"
#include "../ledger/ledger_query.hpp"
namespace msg::usrmsg
{
@@ -32,6 +33,9 @@ namespace msg::usrmsg
void create_unl_list_container(std::vector<uint8_t> &msg, const ::std::set<std::string> &unl_list) const;
void create_ledger_query_response(std::vector<uint8_t> &msg, std::string_view reply_for,
const ledger::query::query_result &result) const;
int parse(std::string_view message);
int extract_type(std::string &extracted_type) const;
@@ -42,6 +46,8 @@ namespace msg::usrmsg
int extract_input_container(std::string &input, std::string &nonce,
uint64_t &max_lcl_seq_no, std::string_view encoded_content) const;
int extract_ledger_query(ledger::query::query_request &extracted_query, std::string &extracted_id) const;
};
} // namespace msg::usrmsg

View File

@@ -4,6 +4,7 @@
// Enable boost strack trace.
#define BOOST_STACKTRACE_USE_BACKTRACE
#include <bitset>
#include <blake3.h>
#include <boost/stacktrace.hpp>
#include <chrono>

View File

@@ -234,10 +234,24 @@ namespace usr
}
else if (msg_type == msg::usrmsg::MSGTYPE_STAT)
{
std::vector<uint8_t> msg;
std::vector<uint8_t> resp;
const p2p::sequence_hash lcl_id = ledger::ctx.get_lcl_id();
parser.create_status_response(msg, lcl_id.seq_no, lcl_id.hash.to_string_view());
user.session.send(msg);
parser.create_status_response(resp, lcl_id.seq_no, lcl_id.hash.to_string_view());
user.session.send(resp);
return 0;
}
else if (msg_type == msg::usrmsg::MSGTYPE_LEDGER_QUERY)
{
ledger::query::query_request req;
std::string id;
if (parser.extract_ledger_query(req, id) == -1)
return -1;
const ledger::query::query_result result = ledger::query::execute(user.pubkey, req);
std::vector<uint8_t> resp;
parser.create_ledger_query_response(resp, id, result);
user.session.send(resp);
return 0;
}
else

View File

@@ -7,6 +7,7 @@ namespace util
{
merkle_hash_tree::merkle_hash_tree(const size_t block_size) : block_size(block_size)
{
root.hash.resize(BLAKE3_OUT_LEN);
}
void merkle_hash_tree::create_groups(std::list<merkle_hash_node> &nodes)
@@ -76,14 +77,9 @@ namespace util
return new_root;
}
bool merkle_hash_tree::empty()
{
return root.hash.empty();
}
void merkle_hash_tree::clear()
{
root.hash.clear();
root.hash = std::string(BLAKE3_OUT_LEN, 0);
root.children.clear();
}

View File

@@ -26,7 +26,6 @@ namespace util
void populate(const std::vector<std::string_view> &hashes);
const std::string root_hash();
const merkle_hash_node collapse(std::string_view retain_hash);
bool empty();
void clear();
};
} // namespace util