the best ledger_data implementation yet

This commit is contained in:
CJ Cobb
2021-03-25 16:56:54 -04:00
parent 2af6d72d7e
commit 168283f0aa
5 changed files with 491 additions and 191 deletions

View File

@@ -177,6 +177,8 @@ flatMapReadCallback(CassFuture* fut, void* cbData)
*static_cast<CassandraBackend::ReadCallbackData*>(cbData);
auto func = [](auto& params) { params.backend.read(params); };
CassandraAsyncResult asyncResult{requestParams, fut, func};
if (asyncResult.timedOut())
requestParams.result.transaction = {0};
CassandraResult& result = asyncResult.getResult();
if (!!result)
@@ -196,6 +198,8 @@ flatMapReadObjectCallback(CassFuture* fut, void* cbData)
*static_cast<CassandraBackend::ReadObjectCallbackData*>(cbData);
auto func = [](auto& params) { params.backend.readObject(params); };
CassandraAsyncResult asyncResult{requestParams, fut, func};
if (asyncResult.timedOut())
requestParams.result = {0};
CassandraResult& result = asyncResult.getResult();
if (!!result)
@@ -265,6 +269,413 @@ CassandraBackend::fetchAllTransactionHashesInLedger(
} while (result.nextRow());
return hashes;
}
// Page through the ledger-state objects for `ledgerSequence`, starting
// after `cursor` (token order), accumulating up to `limit` objects.
// Returns the objects plus a cursor to resume from, or an empty page
// when nothing was read.
LedgerPage
CassandraBackend::fetchLedgerPage2(
    std::optional<ripple::uint256> const& cursor,
    std::uint32_t ledgerSequence,
    std::uint32_t limit) const
{
    BOOST_LOG_TRIVIAL(trace) << __func__;
    std::optional<ripple::uint256> currentCursor = cursor;
    std::vector<LedgerObject> objects;
    uint32_t curLimit = limit;
    while (objects.size() < limit)
    {
        CassandraStatement statement{selectLedgerPage_};

        // Translate the opaque cursor key into the Cassandra partition
        // token to resume from; INT64_MIN starts from the beginning.
        int64_t intCursor = INT64_MIN;
        if (currentCursor)
        {
            auto token = getToken(currentCursor->data());
            if (token)
                intCursor = *token;
        }
        BOOST_LOG_TRIVIAL(debug)
            << __func__ << " - cursor = " << std::to_string(intCursor)
            << " , sequence = " << std::to_string(ledgerSequence)
            << ", - limit = " << std::to_string(limit);

        statement.bindInt(intCursor);
        statement.bindInt(ledgerSequence);
        statement.bindUInt(curLimit);

        CassandraResult result = executeSyncRead(statement);
        if (!result)
        {
            // BUGFIX: the read failed/returned nothing. The original code
            // looped again with an unchanged cursor, spinning forever.
            // Return whatever has been accumulated so far instead.
            break;
        }

        BOOST_LOG_TRIVIAL(debug)
            << __func__ << " - got keys - size = " << result.numRows();
        do
        {
            std::vector<unsigned char> object = result.getBytes();
            // Skip deleted objects (empty blobs).
            if (object.size())
            {
                objects.push_back({result.getUInt256(), std::move(object)});
            }
        } while (result.nextRow());
        BOOST_LOG_TRIVIAL(debug)
            << __func__ << " - added to objects. size = " << objects.size();

        // Fewer rows than requested means we exhausted the table: no
        // cursor to hand back.
        if (result.numRows() < curLimit)
        {
            currentCursor = {};
            break;
        }
        // Deleted objects were filtered out above, so we may still be
        // short; fetch subsequent batches at a fixed larger size.
        if (objects.size() < limit)
        {
            curLimit = 2048;
        }
        assert(objects.size());
        currentCursor = objects[objects.size() - 1].key;
    }
    if (objects.size())
        return {objects, currentCursor};
    return {{}, {}};
}
std::vector<LedgerObject>
CassandraBackend::fetchLedgerDiff(uint32_t ledgerSequence) const
{
CassandraStatement statement{selectLedgerDiff_};
statement.bindInt(ledgerSequence);
CassandraResult result = executeSyncRead(statement);
if (!result)
return {};
std::vector<LedgerObject> objects;
do
{
objects.push_back({result.getUInt256(), result.getBytes()});
} while (result.nextRow());
return objects;
}
// Page through the ledger state as of `ledgerSequence`.
//
// Two-phase algorithm: first walk the flat keys index written at the
// next indexed boundary at or above `ledgerSequence` ("base ledger");
// then walk backwards through per-ledger diffs to pick up objects that
// were deleted between `ledgerSequence` and that boundary. The phase is
// encoded in the cursor: a cursor whose first 28 bytes are zero holds a
// 32-bit ledger sequence (diff phase) in its last 4 bytes; any other
// non-zero cursor is a real object key (base phase).
LedgerPage
CassandraBackend::fetchLedgerPage(
    std::optional<ripple::uint256> const& cursor,
    std::uint32_t ledgerSequence,
    std::uint32_t limit) const
{
    LedgerPage page;
    bool cursorIsInt = false;
    if (cursor && !cursor->isZero())
    {
        bool foundNonZero = false;
        for (size_t i = 0; i < 28 && !foundNonZero; ++i)
        {
            if (cursor->data()[i] != 0)
                foundNonZero = true;
        }
        cursorIsInt = !foundNonZero;
    }
    if (cursor)
        BOOST_LOG_TRIVIAL(debug)
            << __func__ << " - Cursor = " << ripple::strHex(*cursor)
            << " : cursorIsInt = " << std::to_string(cursorIsInt);
    if (!cursor || !cursorIsInt)
    {
        // Phase 1: base ledger. Round ledgerSequence up to the next
        // indexed boundary and read keys from that index.
        BOOST_LOG_TRIVIAL(info) << __func__ << " Using base ledger";
        CassandraStatement statement{selectKeys_};
        uint32_t upper = (ledgerSequence >> indexerShift_) << indexerShift_;
        if (upper != ledgerSequence)
            upper += (1 << indexerShift_);
        BOOST_LOG_TRIVIAL(debug)
            << __func__ << " upper is " << std::to_string(upper);
        statement.bindInt(upper);
        if (cursor)
            statement.bindBytes(*cursor);
        else
        {
            // No cursor: start from the smallest possible key.
            ripple::uint256 zero;
            statement.bindBytes(zero);
        }
        statement.bindUInt(limit);
        CassandraResult result = executeSyncRead(statement);
        BOOST_LOG_TRIVIAL(info) << __func__ << " Using base ledger. Got keys";
        if (!!result)
        {
            BOOST_LOG_TRIVIAL(debug)
                << __func__ << " - got keys - size = " << result.numRows();
            std::vector<ripple::uint256> keys;
            do
            {
                keys.push_back(result.getUInt256());
            } while (result.nextRow());
            BOOST_LOG_TRIVIAL(info)
                << __func__ << " Using base ledger. Read keys";
            auto objects = fetchLedgerObjects(keys, ledgerSequence);
            BOOST_LOG_TRIVIAL(info)
                << __func__ << " Using base ledger. Got objects";
            if (objects.size() != keys.size())
                throw std::runtime_error(
                    "Mismatch in size of objects and keys");
            if (keys.size() == limit)
                page.cursor = keys[keys.size() - 1];
            else if (ledgerSequence < upper)
                // Base index exhausted: switch to the diff phase,
                // starting from the ledger just below the boundary.
                page.cursor = upper - 1;
            // BUGFIX: guard on page.cursor, not the inbound cursor.
            // page.cursor can be unset here (keys.size() < limit and
            // ledgerSequence >= upper), and dereferencing an empty
            // optional is undefined behavior.
            if (page.cursor)
                BOOST_LOG_TRIVIAL(info)
                    << __func__ << " Cursor = " << ripple::strHex(*page.cursor);
            for (size_t i = 0; i < objects.size(); ++i)
            {
                auto& obj = objects[i];
                auto& key = keys[i];
                // Objects deleted as of ledgerSequence come back empty;
                // they are simply omitted from the page.
                if (obj.size())
                {
                    page.objects.push_back({std::move(key), std::move(obj)});
                }
            }
            return page;
        }
    }
    else
    {
        // Phase 2: diffs. Decode the big-endian ledger sequence from the
        // last 4 bytes of the cursor.
        uint32_t curSequence = 0;
        for (size_t i = 28; i < 32; ++i)
        {
            uint32_t digit = cursor->data()[i];
            digit = digit << (8 * (31 - i));
            curSequence += digit;
        }
        BOOST_LOG_TRIVIAL(info)
            << __func__ << " Using ledger diffs. Sequence = " << curSequence
            << " size_of uint32_t " << std::to_string(sizeof(uint32_t))
            << " cursor = " << ripple::strHex(*cursor);
        auto diff = fetchLedgerDiff(curSequence);
        BOOST_LOG_TRIVIAL(info) << __func__ << " diff size = " << diff.size();
        // Only keys deleted at curSequence are interesting: they are
        // absent from the base index but may still have existed (with
        // data) at ledgerSequence.
        std::vector<ripple::uint256> deletedKeys;
        for (auto& obj : diff)
        {
            if (obj.blob.size() == 0)
                deletedKeys.push_back(std::move(obj.key));
        }
        auto objects = fetchLedgerObjects(deletedKeys, ledgerSequence);
        if (objects.size() != deletedKeys.size())
            throw std::runtime_error("Mismatch in size of objects and keys");
        BOOST_LOG_TRIVIAL(info)
            << __func__ << " deleted keys size = " << deletedKeys.size();
        for (size_t i = 0; i < objects.size(); ++i)
        {
            auto& obj = objects[i];
            auto& key = deletedKeys[i];
            if (obj.size())
            {
                page.objects.push_back({std::move(key), std::move(obj)});
            }
        }
        // Continue walking diffs downward until we reach ledgerSequence.
        if (curSequence - 1 >= ledgerSequence)
            page.cursor = curSequence - 1;
        return page;
    }
    return {{}, {}};
}
// Fetch the serialized objects for `keys` as viewed at `sequence`.
// All reads are issued asynchronously up front; this call blocks until
// every read has completed. Results come back in the same order as
// `keys`. Throws DatabaseTimeout if any read timed out (the async read
// callback marks a timeout by storing the single byte {0} -- see
// flatMapReadObjectCallback).
std::vector<Blob>
CassandraBackend::fetchLedgerObjects(
    std::vector<ripple::uint256> const& keys,
    uint32_t sequence) const
{
    std::size_t const numKeys = keys.size();
    BOOST_LOG_TRIVIAL(trace)
        << "Fetching " << numKeys << " records from Cassandra";
    std::atomic_uint32_t numFinished = 0;
    std::condition_variable cv;
    std::mutex mtx;
    // Pre-sized so each callback writes its slot without reallocation.
    std::vector<Blob> results{numKeys};
    // shared_ptrs keep each callback context alive until its read
    // completes (retries may outlive this loop iteration).
    std::vector<std::shared_ptr<ReadObjectCallbackData>> cbs;
    cbs.reserve(numKeys);
    for (std::size_t i = 0; i < keys.size(); ++i)
    {
        cbs.push_back(std::make_shared<ReadObjectCallbackData>(
            *this, keys[i], sequence, results[i], cv, numFinished, numKeys));
        readObject(*cbs[i]);
    }
    assert(results.size() == cbs.size());
    // NOTE(review): the callbacks receive cv and numFinished but not
    // mtx, so they presumably notify without holding this lock --
    // confirm the wait below cannot miss a wakeup between checking the
    // predicate and sleeping.
    std::unique_lock<std::mutex> lck(mtx);
    cv.wait(lck, [&numFinished, &numKeys]() { return numFinished == numKeys; });
    for (auto const& res : results)
    {
        // {0} is the timeout sentinel written by the read callback.
        if (res.size() == 1 && res[0] == 0)
            throw DatabaseTimeout();
    }
    BOOST_LOG_TRIVIAL(trace)
        << "Fetched " << numKeys << " records from Cassandra";
    return results;
}
// Context for one asynchronous (sequence, key) insert issued by
// writeKeys(). Holds the data to write plus references back to the
// submitting thread's synchronization state so writeKeyCallback() can
// report completion.
struct WriteKeyCallbackData
{
    CassandraBackend const& backend;
    ripple::uint256 key;
    uint32_t ledgerSequence;
    std::condition_variable& cv;
    std::atomic_uint32_t& numRemaining;
    std::mutex& mtx;
    uint32_t currentRetries = 0;  // drives exponential backoff on failure
    WriteKeyCallbackData(
        CassandraBackend const& backend,
        ripple::uint256&& key,
        uint32_t ledgerSequence,
        std::condition_variable& cv,
        std::mutex& mtx,
        std::atomic_uint32_t& numRemaining)
        : backend(backend)
        , key(std::move(key))
        , ledgerSequence(ledgerSequence)
        , cv(cv)
        // BUGFIX: initializer order now matches declaration order
        // (numRemaining is declared before mtx); the original order
        // triggered -Wreorder. Harmless for references, but misleading.
        , numRemaining(numRemaining)
        , mtx(mtx)
    {
    }
};
void
writeKeyCallback(CassFuture* fut, void* cbData);

// Kick off one asynchronous insert of (ledger_sequence, key) into the
// keys table. Completion (and any retry) is handled by writeKeyCallback.
void
writeKey(WriteKeyCallbackData& cb)
{
    CassandraStatement stmt{cb.backend.getInsertKeyPreparedStatement()};
    stmt.bindInt(cb.ledgerSequence);
    stmt.bindBytes(cb.key);
    // isRetry == true: skip the numOutstanding increment so these
    // writes are throttled by the caller, not by the backend.
    cb.backend.executeAsyncWrite(stmt, writeKeyCallback, cb, true);
}
// Completion callback for a key insert started by writeKey(). On error,
// reschedules the same write after an exponential backoff; on success,
// decrements the submitter's remaining-work counter and wakes it.
void
writeKeyCallback(CassFuture* fut, void* cbData)
{
    WriteKeyCallbackData& requestParams =
        *static_cast<WriteKeyCallbackData*>(cbData);
    CassandraBackend const& backend = requestParams.backend;
    auto rc = cass_future_error_code(fut);
    if (rc != CASS_OK)
    {
        BOOST_LOG_TRIVIAL(error)
            << "ERROR!!! Cassandra insert key error: " << rc << ", "
            << cass_error_desc(rc) << ", retrying ";
        // exponential backoff with a max wait of 2^10 ms (about 1 second)
        auto wait = std::chrono::milliseconds(
            lround(std::pow(2, std::min(10u, requestParams.currentRetries))));
        ++requestParams.currentRetries;
        // The timer keeps itself alive by capturing its own shared_ptr.
        // requestParams is captured by reference; the submitter
        // (writeKeys) holds each context in a shared_ptr until all
        // writes finish, so it outlives the retry.
        std::shared_ptr<boost::asio::steady_timer> timer =
            std::make_shared<boost::asio::steady_timer>(
                backend.getIOContext(),
                std::chrono::steady_clock::now() + wait);
        timer->async_wait(
            [timer, &requestParams](const boost::system::error_code& error) {
                writeKey(requestParams);
            });
    }
    else
    {
        BOOST_LOG_TRIVIAL(trace) << __func__ << "Finished a write request";
        {
            // Hold the lock while notifying so the waiting thread cannot
            // miss the update between its predicate check and its sleep.
            std::lock_guard lck(requestParams.mtx);
            --requestParams.numRemaining;
            requestParams.cv.notify_one();
        }
    }
}
// Build the flat keys index for `ledgerSequence`: read every key that
// exists in that ledger (via fetchLedgerPage) and write one
// (sequence, key) row per key. Returns false if the index already
// exists, true once fully written. Writes are issued asynchronously
// with a concurrency cap of maxRequestsOutstanding / 2.
bool
CassandraBackend::writeKeys(uint32_t ledgerSequence) const
{
    // Probe for any existing row at this sequence; if one exists the
    // ledger was already indexed and there is nothing to do.
    CassandraStatement statement{selectKeys_};
    statement.bindInt(ledgerSequence);
    ripple::uint256 zero;
    statement.bindBytes(zero);
    statement.bindUInt(1);
    CassandraResult result = executeSyncRead(statement);
    if (!!result)
    {
        BOOST_LOG_TRIVIAL(info) << "Ledger " << std::to_string(ledgerSequence)
                                << " already indexed. Returning";
        return false;
    }
    auto start = std::chrono::system_clock::now();
    constexpr uint32_t limit = 2048;
    std::vector<ripple::uint256> keys;
    std::optional<ripple::uint256> cursor;
    // Page through the full ledger state, collecting every key. On a
    // database timeout, back off briefly and retry the same page.
    while (true)
    {
        try
        {
            auto [objects, curCursor] =
                fetchLedgerPage(cursor, ledgerSequence, limit);
            cursor = curCursor;
            for (auto& obj : objects)
            {
                keys.push_back(std::move(obj.key));
                if (keys.size() % 100000 == 0)
                    BOOST_LOG_TRIVIAL(info)
                        << __func__ << " Fetched "
                        << std::to_string(keys.size()) << "keys";
            }
            if (!cursor)
                break;
        }
        catch (DatabaseTimeout const& e)
        {
            BOOST_LOG_TRIVIAL(warning)
                << __func__ << " Database timeout fetching keys";
            std::this_thread::sleep_for(std::chrono::seconds(2));
        }
    }
    auto mid = std::chrono::system_clock::now();
    BOOST_LOG_TRIVIAL(info)
        << __func__ << "Fetched all keys from ledger "
        << std::to_string(ledgerSequence) << " . num keys = " << keys.size()
        << " . Took " << (mid - start).count() / 1000000000.0;
    std::atomic_uint32_t numRemaining = keys.size();
    std::condition_variable cv;
    std::mutex mtx;
    // shared_ptrs keep each write context alive across async retries.
    std::vector<std::shared_ptr<WriteKeyCallbackData>> cbs;
    cbs.reserve(keys.size());
    uint32_t concurrentLimit = maxRequestsOutstanding / 2;
    for (std::size_t i = 0; i < keys.size(); ++i)
    {
        cbs.push_back(std::make_shared<WriteKeyCallbackData>(
            *this, std::move(keys[i]), ledgerSequence, cv, mtx, numRemaining));
        writeKey(*cbs[i]);
        BOOST_LOG_TRIVIAL(trace) << __func__ << "Submitted a write request";
        std::unique_lock<std::mutex> lck(mtx);
        BOOST_LOG_TRIVIAL(trace) << __func__ << "Got the mutex";
        // Throttle: block until the number of in-flight writes drops
        // below concurrentLimit before submitting the next one.
        cv.wait(lck, [&numRemaining, i, concurrentLimit, &keys]() {
            BOOST_LOG_TRIVIAL(trace)
                << std::to_string(i) << " " << std::to_string(numRemaining)
                << " " << std::to_string(keys.size()) << " "
                << std::to_string(concurrentLimit);
            // keys.size() - i is number submitted. keys.size() -
            // numRemaining is number completed Difference is num
            // outstanding
            return (i + 1 - (keys.size() - numRemaining)) < concurrentLimit;
        });
        if (i % 100000 == 0)
            BOOST_LOG_TRIVIAL(info)
                << __func__ << " Submitted " << std::to_string(i)
                << " write requests. Completed "
                << (keys.size() - numRemaining);
    }
    // Wait for the tail of outstanding writes to drain completely.
    std::unique_lock<std::mutex> lck(mtx);
    cv.wait(lck, [&numRemaining]() { return numRemaining == 0; });
    auto end = std::chrono::system_clock::now();
    BOOST_LOG_TRIVIAL(info)
        << __func__ << "Wrote all keys from ledger "
        << std::to_string(ledgerSequence) << " . num keys = " << keys.size()
        << " . Took " << (end - mid).count() / 1000000000.0
        << ". Entire operation took " << (end - start).count() / 1000000000.0;
    return true;
}
bool
CassandraBackend::doOnlineDelete(uint32_t minLedgerToKeep) const
{
@@ -442,8 +853,7 @@ CassandraBackend::open()
std::string tablePrefix = getString("table_prefix");
if (tablePrefix.empty())
{
throw std::runtime_error(
"nodestore: Missing table name in Cassandra config");
BOOST_LOG_TRIVIAL(warning) << "Table prefix is empty";
}
cass_cluster_set_connect_timeout(cluster, 10000);
@@ -512,10 +922,10 @@ CassandraBackend::open()
continue;
query = {};
query
<< "CREATE TABLE IF NOT EXISTS " << tablePrefix << "transactions"
<< " ( hash blob PRIMARY KEY, ledger_sequence bigint, transaction "
"blob, metadata blob)";
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "transactions"
<< " ( hash blob PRIMARY KEY, ledger_sequence bigint, "
"transaction "
"blob, metadata blob)";
if (!executeSimpleStatement(query.str()))
continue;
@@ -539,9 +949,8 @@ CassandraBackend::open()
query = {};
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "keys"
<< " ( key blob, created bigint, deleted bigint, PRIMARY KEY "
"(key, created)) with clustering order by (created "
"desc) ";
<< " ( sequence bigint, key blob, PRIMARY KEY "
"(sequence, key))";
if (!executeSimpleStatement(query.str()))
continue;
@@ -629,16 +1038,16 @@ CassandraBackend::open()
continue;
query = {};
query
<< "INSERT INTO " << tablePrefix << "transactions"
<< " (hash, ledger_sequence, transaction, metadata) VALUES (?, ?, "
"?, ?)";
query << "INSERT INTO " << tablePrefix << "transactions"
<< " (hash, ledger_sequence, transaction, metadata) VALUES "
"(?, ?, "
"?, ?)";
if (!insertTransaction_.prepareStatement(query, session_.get()))
continue;
query = {};
query << "INSERT INTO " << tablePrefix << "keys"
<< " (key, created, deleted) VALUES (?, ?, ?)";
<< " (sequence, key) VALUES (?, ?)";
if (!insertKey_.prepareStatement(query, session_.get()))
continue;
@@ -654,9 +1063,9 @@ CassandraBackend::open()
continue;
query = {};
query << "SELECT created FROM " << tablePrefix << "keys"
<< " WHERE key = ? ORDER BY created desc LIMIT 1";
if (!getCreated_.prepareStatement(query, session_.get()))
query << "SELECT key FROM " << tablePrefix << "keys"
<< " WHERE sequence = ? AND key > ? ORDER BY key ASC LIMIT ?";
if (!selectKeys_.prepareStatement(query, session_.get()))
continue;
query = {};
@@ -689,9 +1098,8 @@ CassandraBackend::open()
continue;
query = {};
query << "SELECT key FROM " << tablePrefix << "keys "
<< " WHERE TOKEN(key) >= ? and created <= ?"
<< " and deleted > ?"
query << "SELECT key FROM " << tablePrefix << "objects "
<< " WHERE TOKEN(key) >= ? and sequence <= ? "
<< " PER PARTITION LIMIT 1 LIMIT ?"
<< " ALLOW FILTERING";
if (!selectLedgerPageKeys_.prepareStatement(query, session_.get()))
@@ -757,9 +1165,9 @@ CassandraBackend::open()
continue;
query = {};
query
<< " update " << tablePrefix << "ledger_range"
<< " set sequence = ? where is_latest = ? if sequence in (?,null)";
query << " update " << tablePrefix << "ledger_range"
<< " set sequence = ? where is_latest = ? if sequence in "
"(?,null)";
if (!updateLedgerRange_.prepareStatement(query, session_.get()))
continue;
@@ -781,6 +1189,11 @@ CassandraBackend::open()
<< " is_latest IN (true, false)";
if (!selectLedgerRange_.prepareStatement(query, session_.get()))
continue;
query = {};
query << " SELECT key,object FROM " << tablePrefix
<< "objects WHERE sequence = ?";
if (!selectLedgerDiff_.prepareStatement(query, session_.get()))
continue;
setupPreparedStatements = true;
}
@@ -826,6 +1239,23 @@ CassandraBackend::open()
{
maxRequestsOutstanding = config_["max_requests_outstanding"].as_int64();
}
if (config_.contains("run_indexer"))
{
if (config_["run_indexer"].as_bool())
indexer_ = std::thread{[this]() {
auto seq = fetchLatestLedgerSequence();
if (seq)
{
auto base = (*seq >> indexerShift_) << indexerShift_;
BOOST_LOG_TRIVIAL(info)
<< "Running indexer. Ledger = " << std::to_string(base)
<< " latest = " << std::to_string(*seq);
writeKeys(base);
BOOST_LOG_TRIVIAL(info) << "Ran indexer";
}
}};
}
work_.emplace(ioContext_);
ioThread_ = std::thread{[this]() { ioContext_.run(); }};
open_ = true;