ledger_data

This commit is contained in:
CJ Cobb
2021-01-11 15:20:49 -05:00
parent 850d7a4a45
commit 4360d4b219
5 changed files with 399 additions and 113 deletions

View File

@@ -63,7 +63,8 @@ target_sources(reporting PRIVATE
handlers/AccountInfo.cpp
handlers/Tx.cpp
handlers/RPCHelpers.cpp
handlers/AccountTx.cpp)
handlers/AccountTx.cpp
handlers/LedgerData.cpp)
message(${Boost_LIBRARIES})

140
handlers/LedgerData.cpp Normal file
View File

@@ -0,0 +1,140 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012-2014 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/protocol/STLedgerEntry.h>
#include <boost/json.hpp>
#include <handlers/RPCHelpers.h>
#include <reporting/ReportingBackend.h>
// Get state nodes from a ledger
// Inputs:
// limit: integer, maximum number of entries
// marker: opaque, resume point
// binary: boolean, format
// type: string // optional, defaults to all ledger node types
// Outputs:
// ledger_hash: chosen ledger's hash
// ledger_index: chosen ledger's index
// state: array of state nodes
// marker: resume point, if any
//
//
// Handle the ledger_data RPC: return one page of state objects from the
// requested ledger.
//
// request fields:
//   ledger_index (required): sequence of the ledger to read; the call
//     throws if the field is absent or non-numeric (boost::json::object::at)
//   marker (optional): int64 resume token returned by a previous call
//   limit (optional): maximum objects per page, defaults to 200
//
// response fields:
//   objects: array of deserialized ledger entries rendered as JSON
//   marker: resume token, present only when more pages remain
//   num_results: number of objects returned in this page
//   db_time: database read time in microseconds
boost::json::object
doLedgerData(
    boost::json::object const& request,
    CassandraFlatMapBackend const& backend)
{
    boost::json::object response;
    uint32_t ledger = request.at("ledger_index").as_int64();
    std::optional<int64_t> marker = request.contains("marker")
        ? request.at("marker").as_int64()
        : std::optional<int64_t>{};
    size_t limit =
        request.contains("limit") ? request.at("limit").as_int64() : 200;
    auto start = std::chrono::system_clock::now();
    auto [results, returnedMarker] =
        backend.doUpperBound(marker, ledger, limit);
    auto end = std::chrono::system_clock::now();
    auto time =
        std::chrono::duration_cast<std::chrono::microseconds>(end - start)
            .count();
    boost::json::array objects;
    objects.reserve(results.size());  // one JSON entry per ledger object
    for (auto const& [key, object] : results)
    {
        // Deserialize the raw object bytes into an STLedgerEntry keyed by
        // its index, then render it as JSON.
        ripple::STLedgerEntry sle{
            ripple::SerialIter{object.data(), object.size()}, key};
        objects.push_back(getJson(sle));
    }
    // Move the array into the response instead of deep-copying it
    // (the original `response["objects"] = objects;` copied every entry).
    response["objects"] = std::move(objects);
    if (returnedMarker)
        response["marker"] = returnedMarker.value();
    response["num_results"] = results.size();
    response["db_time"] = time;
    return response;
}
/*
std::pair<org::xrpl::rpc::v1::GetLedgerDataResponse, grpc::Status>
doLedgerDataGrpc(
RPC::GRPCContext<org::xrpl::rpc::v1::GetLedgerDataRequest>& context)
{
org::xrpl::rpc::v1::GetLedgerDataRequest& request = context.params;
org::xrpl::rpc::v1::GetLedgerDataResponse response;
grpc::Status status = grpc::Status::OK;
std::shared_ptr<ReadView const> ledger;
if (RPC::ledgerFromRequest(ledger, context))
{
grpc::Status errorStatus{
grpc::StatusCode::NOT_FOUND, "ledger not found"};
return {response, errorStatus};
}
ReadView::key_type key = ReadView::key_type();
if (request.marker().size() != 0)
{
key = uint256::fromVoid(request.marker().data());
if (key.size() != request.marker().size())
{
grpc::Status errorStatus{
grpc::StatusCode::INVALID_ARGUMENT, "marker malformed"};
return {response, errorStatus};
}
}
auto e = ledger->sles.end();
ReadView::key_type stopKey = ReadView::key_type();
if (request.end_marker().size() != 0)
{
stopKey = uint256::fromVoid(request.end_marker().data());
if (stopKey.size() != request.end_marker().size())
{
grpc::Status errorStatus{
grpc::StatusCode::INVALID_ARGUMENT, "end marker malformed"};
return {response, errorStatus};
}
e = ledger->sles.upper_bound(stopKey);
}
int maxLimit = RPC::Tuning::pageLength(true);
for (auto i = ledger->sles.upper_bound(key); i != e; ++i)
{
auto sle = ledger->read(keylet::unchecked((*i)->key()));
if (maxLimit-- <= 0)
{
// Stop processing before the current key.
auto k = sle->key();
--k;
response.set_marker(k.data(), k.size());
break;
}
auto stateObject = response.mutable_ledger_objects()->add_objects();
Serializer s;
sle->add(s);
stateObject->set_data(s.peekData().data(), s.getLength());
stateObject->set_key(sle->key().data(), sle->key().size());
}
return {response, status};
}
*/

View File

@@ -80,6 +80,8 @@ private:
const CassPrepared* insertTransaction_ = nullptr;
const CassPrepared* selectTransaction_ = nullptr;
const CassPrepared* selectObject_ = nullptr;
const CassPrepared* upperBound_ = nullptr;
const CassPrepared* getToken_ = nullptr;
// io_context used for exponential backoff for write retries
mutable boost::asio::io_context ioContext_;
@@ -547,10 +549,10 @@ public:
*/
cass_future_free(prepare_future);
/*
query.str("");
query << "SELECT hash, entry FROM " << stateTableName
<< " WHERE TOKEN(hash) >= TOKEN(?) and seq <= ?"
query = {};
query << "SELECT key, object FROM " << tableName << "flat "
<< " WHERE TOKEN(key) >= ? and sequence <= ?"
<< " and object > 0x"
<< " PER PARTITION LIMIT 1 LIMIT ?"
<< " ALLOW FILTERING";
@@ -566,9 +568,9 @@ public:
cass_future_free(prepare_future);
std::stringstream ss;
ss << "nodestore: Error preparing state table select : " << rc
<< ", " << cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
ss << "nodestore: Error preparing upperBound : " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str() << " : " << query.str();
continue;
}
@@ -579,7 +581,30 @@ public:
// object
//
cass_future_free(prepare_future);
*/
query = {};
query << "SELECT TOKEN(key) FROM " << tableName << "flat "
<< " WHERE key = ? LIMIT 1";
prepare_future =
cass_session_prepare(session_.get(), query.str().c_str());
// Wait for the statement to prepare and get the result
rc = cass_future_error_code(prepare_future);
if (rc != CASS_OK)
{
// Handle error
cass_future_free(prepare_future);
std::stringstream ss;
ss << "nodestore: Error preparing getToken : " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
getToken_ = cass_future_get_prepared(prepare_future);
setupPreparedStatements = true;
}
@@ -621,6 +646,16 @@ public:
cass_prepared_free(selectObject_);
selectObject_ = nullptr;
}
if (upperBound_)
{
cass_prepared_free(upperBound_);
upperBound_ = nullptr;
}
if (getToken_)
{
cass_prepared_free(getToken_);
getToken_ = nullptr;
}
work_.reset();
ioThread_.join();
}
@@ -702,6 +737,64 @@ public:
<< " microseconds";
return result;
}
// Look up the Cassandra partition token of the given 32-byte ledger
// object key and return token + 1, i.e. the next resume point for
// paging. Returns an empty optional when the query fails, when no row
// comes back, or when the token is INT64_MAX (nothing lies beyond it,
// and token + 1 would overflow).
// NOTE(review): retries the read forever on error, assuming failures
// are transient -- confirm a retry cap is not needed.
std::optional<int64_t>
getToken(void const* key) const
{
    BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra";
    CassStatement* statement = cass_prepared_bind(getToken_);
    cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
    CassError rc = cass_statement_bind_bytes(
        statement, 0, static_cast<cass_byte_t const*>(key), 32);
    if (rc != CASS_OK)
    {
        cass_statement_free(statement);
        BOOST_LOG_TRIVIAL(error) << "Binding Cassandra fetch query: " << rc
                                 << ", " << cass_error_desc(rc);
        return {};
    }
    CassFuture* fut;
    do
    {
        fut = cass_session_execute(session_.get(), statement);
        rc = cass_future_error_code(fut);
        if (rc != CASS_OK)
        {
            // Free the failed future before retrying; the original code
            // leaked one CassFuture per failed attempt.
            cass_future_free(fut);
            std::stringstream ss;
            ss << "Cassandra fetch error";
            ss << ", retrying";
            ss << ": " << cass_error_desc(rc);
            BOOST_LOG_TRIVIAL(warning) << ss.str();
        }
    } while (rc != CASS_OK);
    CassResult const* res = cass_future_get_result(fut);
    cass_statement_free(statement);
    cass_future_free(fut);
    CassRow const* row = cass_result_first_row(res);
    if (!row)
    {
        BOOST_LOG_TRIVIAL(error) << "Cassandra fetch error: no rows";
        cass_result_free(res);
        return {};
    }
    cass_int64_t token;
    rc = cass_value_get_int64(cass_row_get_column(row, 0), &token);
    if (rc != CASS_OK)
    {
        cass_result_free(res);
        BOOST_LOG_TRIVIAL(error) << "Cassandra fetch result error: " << rc
                                 << ", " << cass_error_desc(rc);
        return {};
    }
    cass_result_free(res);
    if (token == INT64_MAX)
        return {};
    return token + 1;
}
std::optional<
std::pair<std::vector<unsigned char>, std::vector<unsigned char>>>
fetchTransaction(void const* hash) const
@@ -779,26 +872,29 @@ public:
<< " microseconds";
return {{txResult, metaResult}};
}
/*
std::pair<std::vector<std::pair<uint256, std::shared_ptr<Blob>>>,
Status> doUpperBound(uint256 marker, std::uint32_t seq, std::uint32_t limit)
override
// A single ledger state entry as read from the database: its 256-bit
// index and the raw serialized object bytes.
struct LedgerObject
{
ripple::uint256 key;
std::vector<unsigned char> blob;
};
std::pair<std::vector<LedgerObject>, std::optional<int64_t>>
doUpperBound(
std::optional<int64_t> marker,
std::uint32_t seq,
std::uint32_t limit) const
{
CassStatement* statement = cass_prepared_bind(upperBound_);
cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
int64_t markerVal = marker ? marker.value() : INT64_MIN;
CassError rc = cass_statement_bind_bytes(
statement,
0,
static_cast<cass_byte_t const*>(marker.begin()),
keyBytes_);
CassError rc = cass_statement_bind_int64(statement, 0, markerVal);
if (rc != CASS_OK)
{
cass_statement_free(statement);
BOOST_LOG_TRIVIAL(error)
<< "Binding Cassandra hash to doUpperBound query: " << rc
<< ", " << cass_error_desc(rc);
return {{}, backendError};
return {};
}
rc = cass_statement_bind_int64(statement, 1, seq);
@@ -806,10 +902,9 @@ public:
{
cass_statement_free(statement);
BOOST_LOG_TRIVIAL(error)
<< "Binding Cassandra seq to doUpperBound query: " << rc <<
", "
<< "Binding Cassandra seq to doUpperBound query: " << rc << ", "
<< cass_error_desc(rc);
return {{}, backendError};
return {};
}
rc = cass_statement_bind_int32(statement, 2, limit + 1);
@@ -819,7 +914,7 @@ public:
BOOST_LOG_TRIVIAL(error)
<< "Binding Cassandra limit to doUpperBound query: " << rc
<< ", " << cass_error_desc(rc);
return {{}, backendError};
return {};
}
CassFuture* fut;
@@ -832,7 +927,6 @@ public:
std::stringstream ss;
ss << "Cassandra fetch error";
ss << ", retrying";
++counters_.readRetries;
ss << ": " << cass_error_desc(rc);
BOOST_LOG_TRIVIAL(warning) << ss.str();
}
@@ -842,7 +936,7 @@ public:
cass_statement_free(statement);
cass_future_free(fut);
std::vector<std::pair<uint256, std::shared_ptr<Blob>>> result = {};
std::vector<LedgerObject> result;
CassIterator* iter = cass_iterator_from_result(res);
while (cass_iterator_next(iter))
@@ -860,13 +954,10 @@ public:
std::stringstream ss;
ss << "Cassandra fetch error";
ss << ", retrying";
ss << ": " << cass_error_desc(rc);
BOOST_LOG_TRIVIAL(warning) << ss.str();
return {{}, backendError};
}
uint256 resultHash = uint256::fromVoid(outData);
ripple::uint256 resultHash = ripple::uint256::fromVoid(outData);
CassValue const* entry = cass_row_get_column(row, 1);
rc = cass_value_get_bytes(entry, &outData, &outSize);
@@ -876,25 +967,26 @@ public:
std::stringstream ss;
ss << "Cassandra fetch error";
ss << ", retrying";
ss << ": " << cass_error_desc(rc);
BOOST_LOG_TRIVIAL(warning) << ss.str();
return {{}, backendError};
}
nudb::detail::buffer bf;
auto [data, size] = lz4_decompress(outData, outSize, bf);
auto slice = Slice(data, size);
std::shared_ptr<Blob> resultBlob =
std::make_shared<Blob>(slice.begin(), slice.end());
if (outSize > 0)
{
std::vector<unsigned char> resultBlob{
outData, outData + outSize};
result.push_back({resultHash, resultBlob});
}
return {result, Status::ok};
}
*/
if (result.size())
{
auto token = getToken(result[result.size() - 1].key.data());
assert(token);
return {result, token};
}
return {result, {}};
}
bool
canFetchBatch()

51
test.py
View File

@@ -44,19 +44,58 @@ async def tx(ip, port, tx_hash):
await ws.send(json.dumps({"command":"tx","transaction":tx_hash}))
res = json.loads(await ws.recv())
print(res)
except websockets.exceptions.ConnectionClosedError as e:
except websockets.exceptions.ConnectionClosedError as e:
print(e)
async def ledger_data(ip, port, ledger, limit):
    # Fetch a single page of state objects for the given ledger over the
    # reporting server's websocket API and print the raw response.
    address = 'ws://' + str(ip) + ':' + str(port)
    try:
        async with websockets.connect(address) as ws:
            await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"limit":int(limit)}))
            res = json.loads(await ws.recv())
            print(res)
    # Exception class names are case-sensitive: the original lowercase
    # `connectionclosederror` does not exist and would raise AttributeError.
    except websockets.exceptions.ConnectionClosedError as e:
        print(e)
async def ledger_data_full(ip, port, ledger):
    # Page through the entire state tree of the given ledger, following the
    # resume marker returned by each ledger_data response until no marker
    # remains. Prints the first response, then each marker, then "done".
    address = 'ws://' + str(ip) + ':' + str(port)
    try:
        async with websockets.connect(address) as ws:
            marker = None
            while True:
                if marker is None:
                    # First page: no marker field in the request.
                    await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger)}))
                    res = json.loads(await ws.recv())
                    print(res)
                else:
                    await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"marker":marker}))
                    res = json.loads(await ws.recv())
                # The marker may appear at the top level or nested under
                # "result", depending on the server's response shape.
                if "marker" in res:
                    marker = int(res["marker"])
                    print(marker)
                elif "result" in res and "marker" in res["result"]:
                    marker = res["result"]["marker"]
                    print(marker)
                else:
                    print("done")
                    break
    # Exception class names are case-sensitive: the original lowercase
    # `connectionclosederror` does not exist and would raise AttributeError.
    except websockets.exceptions.ConnectionClosedError as e:
        print(e)
parser = argparse.ArgumentParser(description='test script for xrpl-reporting')
parser.add_argument('action', choices=["account_info", "tx", "account_tx"])
parser.add_argument('action', choices=["account_info", "tx", "account_tx", "ledger_data", "ledger_data_full"])
parser.add_argument('--ip', default='127.0.0.1')
parser.add_argument('--port', default='8080')
parser.add_argument('--hash')
parser.add_argument('--account', default="rLC64xxNif3GiY9FQnbaM4kcE6VvDhwRod")
parser.add_argument('--ledger')
parser.add_argument('--limit', default='200')
@@ -73,6 +112,12 @@ def run(args):
elif args.action == "account_tx":
asyncio.get_event_loop().run_until_complete(
account_tx(args.ip, args.port, args.account))
elif args.action == "ledger_data":
asyncio.get_event_loop().run_until_complete(
ledger_data(args.ip, args.port, args.ledger, args.limit))
elif args.action == "ledger_data_full":
asyncio.get_event_loop().run_until_complete(
ledger_data_full(args.ip, args.port, args.ledger))
else:
print("incorrect arguments")

View File

@@ -35,12 +35,13 @@
#include <vector>
//------------------------------------------------------------------------------
enum RPCCommand { tx, account_tx, ledger, account_info };
enum RPCCommand { tx, account_tx, ledger, account_info, ledger_data };
std::unordered_map<std::string, RPCCommand> commandMap{
{"tx", tx},
{"account_tx", account_tx},
{"ledger", ledger},
{"account_info", account_info}};
{"account_info", account_info},
{"ledger_data", ledger_data}};
boost::json::object
doAccountInfo(
@@ -57,6 +58,10 @@ doAccountTx(
boost::json::object const& request,
CassandraFlatMapBackend const& backend,
std::shared_ptr<PgPool>& pgPool);
boost::json::object
doLedgerData(
boost::json::object const& request,
CassandraFlatMapBackend const& backend);
boost::json::object
buildResponse(
@@ -76,6 +81,9 @@ buildResponse(
break;
case ledger:
break;
case ledger_data:
return doLedgerData(request, backend);
break;
case account_info:
return doAccountInfo(request, backend, pgPool);
break;