bug fixes in both backends. add ledger_range rpc. improve test scripts

This commit is contained in:
CJ Cobb
2021-03-05 16:46:55 -05:00
parent 7a6dfe5967
commit e3a121e571
13 changed files with 179 additions and 51 deletions

View File

@@ -204,6 +204,27 @@ public:
curBindingIndex_++;
}
void
bindUInt(uint32_t value)
{
if (!statement_)
throw std::runtime_error(
"CassandraStatement::bindUInt - statement_ is null");
BOOST_LOG_TRIVIAL(info)
<< std::to_string(curBindingIndex_) << " " << std::to_string(value);
CassError rc =
cass_statement_bind_int32(statement_, curBindingIndex_, value);
if (rc != CASS_OK)
{
std::stringstream ss;
ss << "Error binding uint to statement: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str());
}
curBindingIndex_++;
}
void
bindInt(uint32_t value)
{
@@ -278,11 +299,27 @@ class CassandraResult
size_t curGetIndex_ = 0;
public:
CassandraResult()
CassandraResult() : result_(nullptr), row_(nullptr), iter_(nullptr)
{
}
CassandraResult&
operator=(CassandraResult&& other)
{
result_ = other.result_;
row_ = other.row_;
iter_ = other.iter_;
curGetIndex_ = other.curGetIndex_;
other.result_ = nullptr;
other.row_ = nullptr;
other.iter_ = nullptr;
other.curGetIndex_ = 0;
return *this;
}
CassandraResult(CassandraResult const& other) = delete;
CassandraResult&
operator=(CassandraResult const& other) = delete;
CassandraResult(CassResult const* result) : result_(result)
{
@@ -351,7 +388,7 @@ public:
getUInt256()
{
if (!row_)
throw std::runtime_error("CassandraResult::getBytes - no result");
throw std::runtime_error("CassandraResult::uint256 - no result");
cass_byte_t const* buf;
std::size_t bufSize;
CassError rc = cass_value_get_bytes(
@@ -359,7 +396,7 @@ public:
if (rc != CASS_OK)
{
std::stringstream msg;
msg << "CassandraResult::getBytes - error getting value: " << rc
msg << "CassandraResult::getuint256 - error getting value: " << rc
<< ", " << cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << msg.str();
throw std::runtime_error(msg.str());
@@ -420,7 +457,8 @@ public:
~CassandraResult()
{
cass_result_free(result_);
if (result_ != nullptr)
cass_result_free(result_);
}
};
template <class T, class F>
@@ -440,7 +478,7 @@ public:
}
else
{
result_ = CassandraResult(cass_future_get_result(fut));
result_ = std::move(CassandraResult(cass_future_get_result(fut)));
}
}
@@ -531,7 +569,7 @@ private:
// maximum number of concurrent in flight requests. New requests will wait
// for earlier requests to finish if this limit is exceeded
uint32_t maxRequestsOutstanding = 10000000;
uint32_t maxRequestsOutstanding = 10000;
mutable std::atomic_uint32_t numRequestsOutstanding_ = 0;
// mutex and condition_variable to limit the number of concurrent in flight
@@ -606,7 +644,7 @@ public:
cursor->ledgerSequence, cursor->transactionIndex);
else
statement.bindIntTuple(INT32_MAX, INT32_MAX);
statement.bindInt(limit);
statement.bindUInt(limit);
CassandraResult result = executeSyncRead(statement);
if (!result.hasResult())
{
@@ -618,6 +656,8 @@ public:
size_t numRows = result.numRows();
bool returnCursor = numRows == limit;
std::optional<AccountTransactionsCursor> retCursor;
BOOST_LOG_TRIVIAL(info) << "num_rows = " << std::to_string(numRows);
do
{
hashes.push_back(result.getUInt256());
@@ -706,6 +746,8 @@ public:
++numRequestsOutstanding_;
writeLedgerHeader(*headerCb, false);
writeLedgerHash(*hashCb, false);
ledgerSequence_ = ledgerInfo.seq;
isFirstLedger_ = isFirst;
}
void
writeLedgerHash(WriteLedgerHashCallbackData& cb, bool isRetry) const
@@ -820,10 +862,10 @@ public:
return {{result.getBytes(), result.getBytes()}};
}
LedgerPage
fetchLedgerPage(
fetchLedgerPage2(
std::optional<ripple::uint256> const& cursor,
std::uint32_t ledgerSequence,
std::uint32_t limit) const override
std::uint32_t limit) const
{
BOOST_LOG_TRIVIAL(trace) << __func__;
CassandraStatement statement{selectLedgerPageKeys_};
@@ -839,7 +881,7 @@ public:
statement.bindInt(intCursor);
statement.bindInt(ledgerSequence);
statement.bindInt(ledgerSequence);
statement.bindInt(limit);
statement.bindUInt(limit);
CassandraResult result = executeSyncRead(statement);
@@ -867,56 +909,80 @@ public:
return {{}, {}};
}
LedgerPage
fetchLedgerPage2(
std::optional<ripple::uint256> cursor,
fetchLedgerPage(
std::optional<ripple::uint256> const& cursor,
std::uint32_t ledgerSequence,
std::uint32_t limit) const
std::uint32_t limit) const override
{
BOOST_LOG_TRIVIAL(trace) << __func__;
std::optional<ripple::uint256> currentCursor = cursor;
std::vector<LedgerObject> objects;
uint32_t curLimit = limit;
while (objects.size() < limit)
{
CassandraStatement statement{selectLedgerPage_};
int64_t intCursor = INT64_MIN;
if (cursor)
if (currentCursor)
{
auto token = getToken(cursor->data());
auto token = getToken(currentCursor->data());
if (token)
intCursor = *token;
}
BOOST_LOG_TRIVIAL(trace)
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " - cursor = " << std::to_string(intCursor)
<< " , sequence = " << std::to_string(ledgerSequence)
<< ", - limit = " << std::to_string(limit);
statement.bindInt(intCursor);
statement.bindInt(ledgerSequence);
statement.bindInt(limit);
statement.bindUInt(curLimit);
CassandraResult result = executeSyncRead(statement);
BOOST_LOG_TRIVIAL(debug) << __func__ << " - got keys";
if (!!result)
{
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " - got keys - size = " << result.numRows();
size_t prevSize = objects.size();
do
{
objects.push_back({result.getUInt256(), result.getBytes()});
ripple::uint256 key = result.getUInt256();
std::vector<unsigned char> object = result.getBytes();
if (object.size())
{
objects.push_back({std::move(key), std::move(object)});
}
} while (result.nextRow());
size_t prevBatchSize = objects.size() - prevSize;
BOOST_LOG_TRIVIAL(debug)
<< __func__
<< " - added to objects. size = " << objects.size();
if (result.numRows() < curLimit)
{
currentCursor = {};
break;
}
if (objects.size() < limit)
{
double sparsity = limit / objects.size();
limit = (limit - objects.size()) * sparsity;
BOOST_LOG_TRIVIAL(debug)
<< __func__
<< " cur limit = " << std::to_string(curLimit)
<< " , numRows = " << std::to_string(prevBatchSize);
double sparsity =
(double)(curLimit + 1) / (double)(prevBatchSize + 1);
curLimit = (limit - objects.size()) * sparsity;
BOOST_LOG_TRIVIAL(debug)
<< __func__
<< " - sparsity = " << std::to_string(sparsity)
<< " , limit = " << std::to_string(limit);
<< " , curLimit = " << std::to_string(curLimit);
}
assert(objects.size());
cursor = objects[objects.size() - 1].key;
currentCursor = objects[objects.size() - 1].key;
}
}
if (objects.size())
return {objects, cursor};
return {objects, currentCursor};
return {{}, {}};
}
@@ -939,11 +1005,13 @@ public:
ripple::uint256 zero = {};
statement.bindBytes(zero);
}
statement.bindInt(limit);
statement.bindUInt(limit);
CassandraResult result = executeSyncRead(statement);
BOOST_LOG_TRIVIAL(debug) << __func__ << " - got keys";
std::vector<ripple::uint256> keys;
if (!result)
return {{}, {}};
do
{
keys.push_back(result.getUInt256());
@@ -1003,8 +1071,8 @@ public:
fetchTransactions(std::vector<ripple::uint256> const& hashes) const override
{
std::size_t const numHashes = hashes.size();
BOOST_LOG_TRIVIAL(trace)
<< "Fetching " << numHashes << " records from Cassandra";
BOOST_LOG_TRIVIAL(debug)
<< "Fetching " << numHashes << " transactions from Cassandra";
std::atomic_uint32_t numFinished = 0;
std::condition_variable cv;
std::mutex mtx;
@@ -1024,8 +1092,8 @@ public:
return numFinished == numHashes;
});
BOOST_LOG_TRIVIAL(trace)
<< "Fetched " << numHashes << " records from Cassandra";
BOOST_LOG_TRIVIAL(debug)
<< "Fetched " << numHashes << " transactions from Cassandra";
return results;
}
@@ -1278,6 +1346,7 @@ public:
statement.bindBytes(account);
statement.bindIntTuple(
data.data.ledgerSequence, data.data.transactionIndex);
statement.bindBytes(data.data.txHash);
executeAsyncWrite(
statement, flatMapWriteAccountTxCallback, data, isRetry);
@@ -1525,6 +1594,10 @@ public:
ss << ": " << cass_error_desc(rc);
BOOST_LOG_TRIVIAL(warning) << ss.str();
}
if (rc == CASS_ERROR_SERVER_INVALID_QUERY)
{
throw std::runtime_error("invalid query");
}
} while (rc != CASS_OK);
CassResult const* res = cass_future_get_result(fut);