tx handler

This commit is contained in:
CJ Cobb
2020-12-23 13:18:39 -05:00
parent 251c6f6c49
commit 943cac57ea
10 changed files with 314 additions and 37 deletions

View File

@@ -60,7 +60,9 @@ target_sources(reporting PRIVATE
reporting/Pg.cpp
reporting/DBHelpers.cpp
reporting/ReportingETL.cpp
handlers/AccountInfo.cpp)
handlers/AccountInfo.cpp
handlers/Tx.cpp
handlers/RPCHelpers.cpp)
message(${Boost_LIBRARIES})

68
handlers/RPCHelpers.cpp Normal file
View File

@@ -0,0 +1,68 @@
#include <handlers/RPCHelpers.h>
std::optional<ripple::AccountID>
accountFromStringStrict(std::string const& account)
{
boost::optional<ripple::AccountID> result;
auto const publicKey = ripple::parseBase58<ripple::PublicKey>(
ripple::TokenType::AccountPublic, account);
if (publicKey)
result = ripple::calcAccountID(*publicKey);
else
result = ripple::parseBase58<ripple::AccountID>(account);
if (result)
return result.value();
else
return {};
}
std::pair<
std::shared_ptr<ripple::STTx const>,
std::shared_ptr<ripple::STObject const>>
deserializeTxPlusMeta(
std::pair<std::vector<unsigned char>, std::vector<unsigned char>> const&
blobs)
{
std::pair<
std::shared_ptr<ripple::STTx const>,
std::shared_ptr<ripple::STObject const>>
result;
{
ripple::SerialIter s{blobs.first.data(), blobs.first.size()};
result.first = std::make_shared<ripple::STTx const>(s);
}
{
ripple::SerialIter s{blobs.second.data(), blobs.second.size()};
result.second =
std::make_shared<ripple::STObject const>(s, ripple::sfMetadata);
}
return result;
}
boost::json::object
getJson(ripple::STBase const& obj)
{
auto start = std::chrono::system_clock::now();
boost::json::value value = boost::json::parse(
obj.getJson(ripple::JsonOptions::none).toStyledString());
auto end = std::chrono::system_clock::now();
value.as_object()["deserialization_time_microseconds"] =
std::chrono::duration_cast<std::chrono::microseconds>(end - start)
.count();
return value.as_object();
}
boost::json::object
getJson(ripple::SLE const& sle)
{
auto start = std::chrono::system_clock::now();
boost::json::value value = boost::json::parse(
sle.getJson(ripple::JsonOptions::none).toStyledString());
auto end = std::chrono::system_clock::now();
value.as_object()["deserialization_time_microseconds"] =
std::chrono::duration_cast<std::chrono::microseconds>(end - start)
.count();
return value.as_object();
}

View File

@@ -3,36 +3,22 @@
#define XRPL_REPORTING_RPCHELPERS_H_INCLUDED
#include <ripple/protocol/STLedgerEntry.h>
#include <ripple/protocol/STTx.h>
#include <boost/json.hpp>
std::optional<ripple::AccountID>
accountFromStringStrict(std::string const& account)
{
boost::optional<ripple::AccountID> result;
accountFromStringStrict(std::string const& account);
auto const publicKey = ripple::parseBase58<ripple::PublicKey>(
ripple::TokenType::AccountPublic, account);
std::pair<
std::shared_ptr<ripple::STTx const>,
std::shared_ptr<ripple::STObject const>>
deserializeTxPlusMeta(
std::pair<std::vector<unsigned char>, std::vector<unsigned char>> const&
blobs);
if (publicKey)
result = ripple::calcAccountID(*publicKey);
else
result = ripple::parseBase58<ripple::AccountID>(account);
if (result)
return result.value();
else
return {};
}
boost::json::object
getJson(ripple::SLE const& sle)
{
auto start = std::chrono::system_clock::now();
boost::json::value value = boost::json::parse(
sle.getJson(ripple::JsonOptions::none).toStyledString());
auto end = std::chrono::system_clock::now();
value.as_object()["deserialization_time_microseconds"] =
std::chrono::duration_cast<std::chrono::microseconds>(end - start)
.count();
return value.as_object();
}
getJson(ripple::STBase const& obj);
boost::json::object
getJson(ripple::SLE const& sle);
#endif

69
handlers/Tx.cpp Normal file
View File

@@ -0,0 +1,69 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012-2014 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <handlers/RPCHelpers.h>
#include <reporting/Pg.h>
#include <reporting/ReportingBackend.h>
// {
// transaction: <hex>
// }
boost::json::object
doTx(
boost::json::object const& request,
CassandraFlatMapBackend const& backend,
std::shared_ptr<PgPool>& postgres)
{
boost::json::object response;
if (!request.contains("transaction"))
{
response["error"] = "Please specify a transaction hash";
return response;
}
ripple::uint256 hash;
if (!hash.parseHex(request.at("transaction").as_string().c_str()))
{
response["error"] = "Error parsing transaction hash";
return response;
}
auto range = getLedgerRange(postgres);
if (!range)
{
response["error"] = "Database is empty";
return response;
}
auto dbResponse = backend.fetchTransaction(hash.data());
if (!dbResponse)
{
response["error"] = "Transaction not found in Cassandra";
response["ledger_range"] = std::to_string(range->lower()) + " - " +
std::to_string(range->upper());
return response;
}
auto [sttx, meta] = deserializeTxPlusMeta(dbResponse.value());
response["transaction"] = getJson(*sttx);
response["meta"] = getJson(*meta);
return response;
}

View File

@@ -1534,3 +1534,37 @@ getLedger(
return info;
}
std::optional<LedgerRange>
getLedgerRange(std::shared_ptr<PgPool>& pgPool)
{
auto range = PgQuery(pgPool)("SELECT complete_ledgers()");
if (!range)
return {};
std::string res{range.c_str()};
try
{
size_t minVal = 0;
size_t maxVal = 0;
if (res == "empty" || res == "error" || res.empty())
return {};
else if (size_t delim = res.find('-'); delim != std::string::npos)
{
minVal = std::stol(res.substr(0, delim));
maxVal = std::stol(res.substr(delim + 1));
}
else
{
minVal = maxVal = std::stol(res);
}
return LedgerRange{minVal, maxVal};
}
catch (std::exception&)
{
BOOST_LOG_TRIVIAL(error)
<< __func__ << " : "
<< "Error parsing result of getCompleteLedgers()";
}
return {};
}

View File

@@ -22,6 +22,7 @@
#include <ripple/basics/chrono.h>
#include <ripple/ledger/ReadView.h>
#include <boost/icl/closed_interval.hpp>
#include <boost/json.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/log/trivial.hpp>
@@ -523,4 +524,8 @@ getLedger(
std::variant<std::monostate, ripple::uint256, uint32_t> const& whichLedger,
std::shared_ptr<PgPool>& pgPool);
using LedgerRange = boost::icl::closed_interval<uint32_t>;
std::optional<LedgerRange>
getLedgerRange(std::shared_ptr<PgPool>& pgPool);
#endif // RIPPLE_CORE_PG_H_INCLUDED

View File

@@ -700,6 +700,83 @@ public:
<< " microseconds";
return result;
}
std::optional<
std::pair<std::vector<unsigned char>, std::vector<unsigned char>>>
fetchTransaction(void const* hash) const
{
BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra";
auto start = std::chrono::system_clock::now();
CassStatement* statement = cass_prepared_bind(selectTransaction_);
cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
CassError rc = cass_statement_bind_bytes(
statement, 0, static_cast<cass_byte_t const*>(hash), 32);
if (rc != CASS_OK)
{
cass_statement_free(statement);
BOOST_LOG_TRIVIAL(error) << "Binding Cassandra fetch query: " << rc
<< ", " << cass_error_desc(rc);
return {};
}
CassFuture* fut;
do
{
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
std::stringstream ss;
ss << "Cassandra fetch error";
ss << ", retrying";
ss << ": " << cass_error_desc(rc);
BOOST_LOG_TRIVIAL(warning) << ss.str();
}
} while (rc != CASS_OK);
CassResult const* res = cass_future_get_result(fut);
cass_statement_free(statement);
cass_future_free(fut);
CassRow const* row = cass_result_first_row(res);
if (!row)
{
BOOST_LOG_TRIVIAL(error) << "Cassandra fetch error: no rows";
cass_result_free(res);
return {};
}
cass_byte_t const* txBuf;
std::size_t txBufSize;
rc = cass_value_get_bytes(
cass_row_get_column(row, 0), &txBuf, &txBufSize);
if (rc != CASS_OK)
{
cass_result_free(res);
BOOST_LOG_TRIVIAL(error) << "Cassandra fetch result error: " << rc
<< ", " << cass_error_desc(rc);
return {};
}
std::vector<unsigned char> txResult{txBuf, txBuf + txBufSize};
cass_byte_t const* metaBuf;
std::size_t metaBufSize;
rc = cass_value_get_bytes(
cass_row_get_column(row, 0), &metaBuf, &metaBufSize);
if (rc != CASS_OK)
{
cass_result_free(res);
BOOST_LOG_TRIVIAL(error) << "Cassandra fetch result error: " << rc
<< ", " << cass_error_desc(rc);
return {};
}
std::vector<unsigned char> metaResult{metaBuf, metaBuf + metaBufSize};
cass_result_free(res);
auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(debug)
<< "Fetched from cassandra in "
<< std::chrono::duration_cast<std::chrono::microseconds>(
end - start)
.count()
<< " microseconds";
return {{txResult, metaResult}};
}
/*
std::pair<std::vector<std::pair<uint256, std::shared_ptr<Blob>>>,
Status> doUpperBound(uint256 marker, std::uint32_t seq, std::uint32_t limit)

View File

@@ -326,6 +326,12 @@ public:
return flatMapBackend_;
}
std::shared_ptr<PgPool>&
getPgPool()
{
return pgPool_;
}
private:
void
doWork();

16
test.py
View File

@@ -22,13 +22,24 @@ async def account_info(ip, port):
except websockets.exceptions.ConnectionClosedError as e:
print(e)
async def tx(ip, port, tx_hash):
address = 'ws://' + str(ip) + ':' + str(port)
try:
async with websockets.connect(address) as ws:
await ws.send(json.dumps({"command":"tx","transaction":tx_hash}))
res = json.loads(await ws.recv())
print(res)
except websockets.exceptions.ConnectionClosedError as e:
print(e)
parser = argparse.ArgumentParser(description='test script for xrpl-reporting')
parser.add_argument('action', choices=["account_info"])
parser.add_argument('action', choices=["account_info", "tx"])
parser.add_argument('--ip', default='127.0.0.1')
parser.add_argument('--port', default='8080')
parser.add_argument('--hash')
@@ -39,6 +50,9 @@ def run(args):
if args.action == "account_info":
asyncio.get_event_loop().run_until_complete(
account_info(args.ip, args.port))
if args.action == "tx":
asyncio.get_event_loop().run_until_complete(
tx(args.ip, args.port, args.hash))
else:
print("incorrect arguments")

View File

@@ -47,15 +47,23 @@ doAccountInfo(
boost::json::object const& request,
CassandraFlatMapBackend const& backend);
boost::json::object
doTx(
boost::json::object const& request,
CassandraFlatMapBackend const& backend,
std::shared_ptr<PgPool>& pgPool);
boost::json::object
buildResponse(
boost::json::object const& request,
CassandraFlatMapBackend const& backend)
CassandraFlatMapBackend const& backend,
std::shared_ptr<PgPool>& pgPool)
{
std::string command = request.at("command").as_string().c_str();
boost::json::object response;
switch (commandMap[command])
{
case tx:
return doTx(request, backend, pgPool);
break;
case account_tx:
break;
@@ -81,14 +89,17 @@ class session : public std::enable_shared_from_this<session>
{
boost::beast::websocket::stream<boost::beast::tcp_stream> ws_;
boost::beast::flat_buffer buffer_;
std::string response_;
CassandraFlatMapBackend const& backend_;
std::shared_ptr<PgPool>& pgPool_;
public:
// Take ownership of the socket
explicit session(
boost::asio::ip::tcp::socket&& socket,
CassandraFlatMapBackend const& backend)
: ws_(std::move(socket)), backend_(backend)
CassandraFlatMapBackend const& backend,
std::shared_ptr<PgPool>& pgPool)
: ws_(std::move(socket)), backend_(backend), pgPool_(pgPool)
{
}
@@ -164,13 +175,14 @@ public:
boost::json::value raw = boost::json::parse(msg);
// BOOST_LOG_TRIVIAL(debug) << __func__ << " parsed";
boost::json::object request = raw.as_object();
auto response = buildResponse(request, backend_);
auto response = buildResponse(request, backend_, pgPool_);
BOOST_LOG_TRIVIAL(debug) << __func__ << response;
response_ = boost::json::serialize(response);
// Echo the message
ws_.text(ws_.got_text());
ws_.async_write(
boost::asio::buffer(boost::json::serialize(response)),
boost::asio::buffer(response_),
boost::beast::bind_front_handler(
&session::on_write, shared_from_this()));
}
@@ -199,13 +211,15 @@ class listener : public std::enable_shared_from_this<listener>
boost::asio::io_context& ioc_;
boost::asio::ip::tcp::acceptor acceptor_;
CassandraFlatMapBackend const& backend_;
std::shared_ptr<PgPool>& pgPool_;
public:
listener(
boost::asio::io_context& ioc,
boost::asio::ip::tcp::endpoint endpoint,
CassandraFlatMapBackend const& backend)
: ioc_(ioc), acceptor_(ioc), backend_(backend)
CassandraFlatMapBackend const& backend,
std::shared_ptr<PgPool>& pgPool)
: ioc_(ioc), acceptor_(ioc), backend_(backend), pgPool_(pgPool)
{
boost::beast::error_code ec;
@@ -270,7 +284,8 @@ private:
else
{
// Create the session and run it
std::make_shared<session>(std::move(socket), backend_)->run();
std::make_shared<session>(std::move(socket), backend_, pgPool_)
->run();
}
// Accept another connection
@@ -376,7 +391,8 @@ main(int argc, char* argv[])
std::make_shared<listener>(
ioc,
boost::asio::ip::tcp::endpoint{address, port},
etl.getFlatMapBackend())
etl.getFlatMapBackend(),
etl.getPgPool())
->run();
// Run the I/O service on the requested number of threads