Add state data cache and successor table. Remove keys table

* Adds a cache for the most recently validated ledger state
* Replaces the keys table with successor table
* Adds support for ledger diffs in the database
* Removes BackendIndexer
This commit is contained in:
CJ Cobb
2021-11-19 16:10:50 -05:00
parent e930ff04df
commit c7e31aff56
35 changed files with 2801 additions and 1241 deletions

View File

@@ -69,10 +69,11 @@ target_sources(clio PRIVATE
## Backend ## Backend
src/backend/CassandraBackend.cpp src/backend/CassandraBackend.cpp
src/backend/PostgresBackend.cpp src/backend/PostgresBackend.cpp
src/backend/BackendIndexer.cpp
src/backend/BackendInterface.cpp src/backend/BackendInterface.cpp
src/backend/Pg.cpp src/backend/Pg.cpp
src/backend/DBHelpers.cpp src/backend/DBHelpers.cpp
src/backend/SimpleCache.cpp
src/backend/LayeredCache.cpp
## ETL ## ETL
src/etl/ETLSource.cpp src/etl/ETLSource.cpp
src/etl/ReportingETL.cpp src/etl/ReportingETL.cpp

View File

@@ -46,7 +46,6 @@ make_Backend(boost::json::object const& config)
backend->updateRange(rng->minSequence); backend->updateRange(rng->minSequence);
backend->updateRange(rng->maxSequence); backend->updateRange(rng->maxSequence);
} }
backend->checkFlagLedgers();
BOOST_LOG_TRIVIAL(info) BOOST_LOG_TRIVIAL(info)
<< __func__ << ": Constructed BackendInterface Successfully"; << __func__ << ": Constructed BackendInterface Successfully";

View File

@@ -1,241 +0,0 @@
#include <backend/BackendIndexer.h>
#include <backend/BackendInterface.h>
namespace Backend {
BackendIndexer::BackendIndexer(boost::json::object const& config)
: strand_(ioc_)
{
if (config.contains("indexer_key_shift"))
keyShift_ = config.at("indexer_key_shift").as_int64();
work_.emplace(ioc_);
ioThread_ = std::thread{[this]() { ioc_.run(); }};
};
BackendIndexer::~BackendIndexer()
{
work_.reset();
ioThread_.join();
}
void
BackendIndexer::addKey(ripple::uint256&& key)
{
keys.insert(std::move(key));
}
void
BackendIndexer::doKeysRepair(
BackendInterface const& backend,
std::optional<uint32_t> sequence)
{
auto rng = backend.fetchLedgerRange();
if (!rng)
return;
if (!sequence)
sequence = rng->maxSequence;
if (sequence < rng->minSequence)
sequence = rng->minSequence;
BOOST_LOG_TRIVIAL(info)
<< __func__ << " sequence = " << std::to_string(*sequence);
std::optional<ripple::uint256> cursor;
while (true)
{
try
{
if (backend.isLedgerIndexed(*sequence))
{
BOOST_LOG_TRIVIAL(info)
<< __func__ << " - " << std::to_string(*sequence)
<< " flag ledger already written. returning";
return;
}
else
{
BOOST_LOG_TRIVIAL(info)
<< __func__ << " - " << std::to_string(*sequence)
<< " flag ledger not written. recursing..";
uint32_t lower = (*sequence - 1) >> keyShift_ << keyShift_;
doKeysRepair(backend, lower);
BOOST_LOG_TRIVIAL(info)
<< __func__ << " - "
<< " sequence = " << std::to_string(*sequence)
<< " lower = " << std::to_string(lower)
<< " finished recursing. submitting repair ";
writeKeyFlagLedger(lower, backend);
return;
}
}
catch (DatabaseTimeout const& e)
{
BOOST_LOG_TRIVIAL(warning)
<< __func__ << " Database timeout fetching keys";
std::this_thread::sleep_for(std::chrono::seconds(2));
}
}
BOOST_LOG_TRIVIAL(info)
<< __func__ << " finished. sequence = " << std::to_string(*sequence);
}
void
BackendIndexer::doKeysRepairAsync(
BackendInterface const& backend,
std::optional<uint32_t> sequence)
{
boost::asio::post(strand_, [this, sequence, &backend]() {
doKeysRepair(backend, sequence);
});
}
void
BackendIndexer::writeKeyFlagLedger(
uint32_t ledgerSequence,
BackendInterface const& backend)
{
auto nextFlag = getKeyIndexOfSeq(ledgerSequence + 1);
uint32_t lower = ledgerSequence >> keyShift_ << keyShift_;
BOOST_LOG_TRIVIAL(info)
<< "writeKeyFlagLedger - "
<< "next flag = " << std::to_string(nextFlag.keyIndex)
<< "lower = " << std::to_string(lower)
<< "ledgerSequence = " << std::to_string(ledgerSequence) << " starting";
ripple::uint256 zero = {};
std::optional<ripple::uint256> cursor;
size_t numKeys = 0;
auto begin = std::chrono::system_clock::now();
while (true)
{
try
{
{
BOOST_LOG_TRIVIAL(info)
<< "writeKeyFlagLedger - checking for complete...";
if (backend.isLedgerIndexed(nextFlag.keyIndex))
{
BOOST_LOG_TRIVIAL(warning)
<< "writeKeyFlagLedger - "
<< "flag ledger already written. flag = "
<< std::to_string(nextFlag.keyIndex)
<< " , ledger sequence = "
<< std::to_string(ledgerSequence);
return;
}
BOOST_LOG_TRIVIAL(info)
<< "writeKeyFlagLedger - is not complete";
}
indexing_ = nextFlag.keyIndex;
auto start = std::chrono::system_clock::now();
auto [objects, curCursor, warning] =
backend.fetchLedgerPage(cursor, lower, 2048);
auto mid = std::chrono::system_clock::now();
// no cursor means this is the first page
if (!cursor)
{
if (warning)
{
BOOST_LOG_TRIVIAL(error)
<< "writeKeyFlagLedger - "
<< " prev flag ledger not written "
<< std::to_string(nextFlag.keyIndex) << " : "
<< std::to_string(ledgerSequence);
assert(false);
throw std::runtime_error("Missing prev flag");
}
}
cursor = curCursor;
std::unordered_set<ripple::uint256> keys;
for (auto& obj : objects)
{
keys.insert(obj.key);
}
backend.writeKeys(keys, nextFlag, true);
auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(debug)
<< "writeKeyFlagLedger - " << std::to_string(nextFlag.keyIndex)
<< " fetched a page "
<< " cursor = "
<< (cursor.has_value() ? ripple::strHex(*cursor)
: std::string{})
<< " num keys = " << std::to_string(numKeys) << " fetch time = "
<< std::chrono::duration_cast<std::chrono::milliseconds>(
mid - start)
.count()
<< " write time = "
<< std::chrono::duration_cast<std::chrono::milliseconds>(
end - mid)
.count();
if (!cursor)
break;
}
catch (DatabaseTimeout const& e)
{
BOOST_LOG_TRIVIAL(warning)
<< __func__ << " Database timeout fetching keys";
std::this_thread::sleep_for(std::chrono::seconds(2));
}
}
backend.writeKeys({zero}, nextFlag, true);
auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(info)
<< "writeKeyFlagLedger - " << std::to_string(nextFlag.keyIndex)
<< " finished. "
<< " num keys = " << std::to_string(numKeys) << " total time = "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end - begin)
.count();
indexing_ = 0;
}
void
BackendIndexer::writeKeyFlagLedgerAsync(
uint32_t ledgerSequence,
BackendInterface const& backend)
{
BOOST_LOG_TRIVIAL(info)
<< __func__
<< " starting. sequence = " << std::to_string(ledgerSequence);
boost::asio::post(strand_, [this, ledgerSequence, &backend]() {
writeKeyFlagLedger(ledgerSequence, backend);
});
BOOST_LOG_TRIVIAL(info)
<< __func__
<< " finished. sequence = " << std::to_string(ledgerSequence);
}
void
BackendIndexer::finish(uint32_t ledgerSequence, BackendInterface const& backend)
{
BOOST_LOG_TRIVIAL(debug)
<< __func__
<< " starting. sequence = " << std::to_string(ledgerSequence);
auto keyIndex = getKeyIndexOfSeq(ledgerSequence);
if (isFirst_)
{
auto rng = backend.fetchLedgerRange();
if (rng && rng->minSequence != ledgerSequence)
isFirst_ = false;
else
{
keyIndex = KeyIndex{ledgerSequence};
}
}
backend.writeKeys(keys, keyIndex);
if (isFirst_)
{
// write completion record
ripple::uint256 zero = {};
backend.writeKeys({zero}, keyIndex);
// write next flag sychronously
keyIndex = getKeyIndexOfSeq(ledgerSequence + 1);
backend.writeKeys(keys, keyIndex);
backend.writeKeys({zero}, keyIndex);
}
isFirst_ = false;
keys = {};
BOOST_LOG_TRIVIAL(debug)
<< __func__
<< " finished. sequence = " << std::to_string(ledgerSequence);
}
} // namespace Backend

View File

@@ -1,104 +0,0 @@
#ifndef CLIO_BACKEND_INDEXER_H_INCLUDED
#define CLIO_BACKEND_INDEXER_H_INCLUDED
#include <ripple/basics/base_uint.h>
#include <boost/asio.hpp>
#include <boost/json.hpp>
#include <mutex>
#include <optional>
#include <thread>
namespace std {
template <>
struct hash<ripple::uint256>
{
std::size_t
operator()(const ripple::uint256& k) const noexcept
{
return boost::hash_range(k.begin(), k.end());
}
};
} // namespace std
namespace Backend {
// The below two structs exist to prevent developers from accidentally mixing up
// the two indexes.
struct BookIndex
{
uint32_t bookIndex;
explicit BookIndex(uint32_t v) : bookIndex(v){};
};
struct KeyIndex
{
uint32_t keyIndex;
explicit KeyIndex(uint32_t v) : keyIndex(v){};
};
class BackendInterface;
class BackendIndexer
{
boost::asio::io_context ioc_;
boost::asio::io_context::strand strand_;
std::mutex mutex_;
std::optional<boost::asio::io_context::work> work_;
std::thread ioThread_;
std::atomic_uint32_t indexing_ = 0;
uint32_t keyShift_ = 20;
std::unordered_set<ripple::uint256> keys;
mutable bool isFirst_ = true;
void
doKeysRepair(
BackendInterface const& backend,
std::optional<uint32_t> sequence);
void
writeKeyFlagLedger(
uint32_t ledgerSequence,
BackendInterface const& backend);
public:
BackendIndexer(boost::json::object const& config);
~BackendIndexer();
void
addKey(ripple::uint256&& key);
void
finish(uint32_t ledgerSequence, BackendInterface const& backend);
void
writeKeyFlagLedgerAsync(
uint32_t ledgerSequence,
BackendInterface const& backend);
void
doKeysRepairAsync(
BackendInterface const& backend,
std::optional<uint32_t> sequence);
uint32_t
getKeyShift()
{
return keyShift_;
}
std::optional<uint32_t>
getCurrentlyIndexing()
{
uint32_t cur = indexing_.load();
if (cur != 0)
return cur;
return {};
}
KeyIndex
getKeyIndexOfSeq(uint32_t seq) const
{
if (isKeyFlagLedger(seq))
return KeyIndex{seq};
auto incr = (1 << keyShift_);
KeyIndex index{(seq >> keyShift_ << keyShift_) + incr};
assert(isKeyFlagLedger(index.keyIndex));
return index;
}
bool
isKeyFlagLedger(uint32_t ledgerSequence) const
{
return (ledgerSequence % (1 << keyShift_)) == 0;
}
};
} // namespace Backend
#endif

View File

@@ -5,14 +5,9 @@ namespace Backend {
bool bool
BackendInterface::finishWrites(uint32_t ledgerSequence) BackendInterface::finishWrites(uint32_t ledgerSequence)
{ {
indexer_.finish(ledgerSequence, *this);
auto commitRes = doFinishWrites(); auto commitRes = doFinishWrites();
if (commitRes) if (commitRes)
{ {
if (isFirst_)
indexer_.doKeysRepairAsync(*this, ledgerSequence);
if (indexer_.isKeyFlagLedger(ledgerSequence))
indexer_.writeKeyFlagLedgerAsync(ledgerSequence, *this);
isFirst_ = false; isFirst_ = false;
updateRange(ledgerSequence); updateRange(ledgerSequence);
} }
@@ -25,28 +20,17 @@ BackendInterface::finishWrites(uint32_t ledgerSequence)
} }
return commitRes; return commitRes;
} }
bool
BackendInterface::isLedgerIndexed(std::uint32_t ledgerSequence) const
{
auto keyIndex = getKeyIndexOfSeq(ledgerSequence);
if (keyIndex)
{
auto page = doFetchLedgerPage({}, ledgerSequence, 1);
return !page.warning.has_value();
}
return false;
}
void void
BackendInterface::writeLedgerObject( BackendInterface::writeLedgerObject(
std::string&& key, std::string&& key,
uint32_t seq, uint32_t seq,
std::string&& blob) const std::string&& blob)
{ {
assert(key.size() == sizeof(ripple::uint256)); assert(key.size() == sizeof(ripple::uint256));
ripple::uint256 key256 = ripple::uint256::fromVoid(key.data()); ripple::uint256 key256 = ripple::uint256::fromVoid(key.data());
indexer_.addKey(std::move(key256));
doWriteLedgerObject(std::move(key), seq, std::move(blob)); doWriteLedgerObject(std::move(key), seq, std::move(blob));
} }
std::optional<LedgerRange> std::optional<LedgerRange>
BackendInterface::hardFetchLedgerRangeNoThrow() const BackendInterface::hardFetchLedgerRangeNoThrow() const
{ {
@@ -63,17 +47,96 @@ BackendInterface::hardFetchLedgerRangeNoThrow() const
} }
} }
} }
std::optional<KeyIndex> // *** state data methods
BackendInterface::getKeyIndexOfSeq(uint32_t seq) const std::optional<Blob>
BackendInterface::fetchLedgerObject(
ripple::uint256 const& key,
uint32_t sequence) const
{ {
if (indexer_.isKeyFlagLedger(seq)) auto obj = cache_.get(key, sequence);
return KeyIndex{seq}; if (obj)
auto rng = fetchLedgerRange(); {
if (!rng) BOOST_LOG_TRIVIAL(debug)
return {}; << __func__ << " - cache hit - " << ripple::strHex(key);
if (rng->minSequence == seq) return *obj;
return KeyIndex{seq}; }
return indexer_.getKeyIndexOfSeq(seq); else
{
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " - cache miss - " << ripple::strHex(key);
auto dbObj = doFetchLedgerObject(key, sequence);
if (!dbObj)
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " - missed cache and missed in db";
else
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " - missed cache but found in db";
return dbObj;
}
}
std::vector<Blob>
BackendInterface::fetchLedgerObjects(
std::vector<ripple::uint256> const& keys,
uint32_t sequence) const
{
std::vector<Blob> results;
results.resize(keys.size());
std::vector<ripple::uint256> misses;
for (size_t i = 0; i < keys.size(); ++i)
{
auto obj = cache_.get(keys[i], sequence);
if (obj)
results[i] = *obj;
else
misses.push_back(keys[i]);
}
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " - cache hits = " << keys.size() - misses.size()
<< " - cache misses = " << misses.size();
if (misses.size())
{
auto objs = doFetchLedgerObjects(misses, sequence);
for (size_t i = 0, j = 0; i < results.size(); ++i)
{
if (results[i].size() == 0)
{
results[i] = objs[j];
++j;
}
}
}
return results;
}
// Fetches the successor to key/index
std::optional<ripple::uint256>
BackendInterface::fetchSuccessorKey(
ripple::uint256 key,
uint32_t ledgerSequence) const
{
auto succ = cache_.getSuccessor(key, ledgerSequence);
if (succ)
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " - cache hit - " << ripple::strHex(key);
else
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " - cache miss - " << ripple::strHex(key);
return succ ? succ->key : doFetchSuccessorKey(key, ledgerSequence);
}
std::optional<LedgerObject>
BackendInterface::fetchSuccessorObject(
ripple::uint256 key,
uint32_t ledgerSequence) const
{
auto succ = fetchSuccessorKey(key, ledgerSequence);
if (succ)
{
auto obj = fetchLedgerObject(*succ, ledgerSequence);
assert(obj);
return {{*succ, *obj}};
}
return {};
} }
BookOffersPage BookOffersPage
BackendInterface::fetchBookOffers( BackendInterface::fetchBookOffers(
@@ -82,8 +145,8 @@ BackendInterface::fetchBookOffers(
std::uint32_t limit, std::uint32_t limit,
std::optional<ripple::uint256> const& cursor) const std::optional<ripple::uint256> const& cursor) const
{ {
// TODO try to speed this up. This can take a few seconds. The goal is to // TODO try to speed this up. This can take a few seconds. The goal is
// get it down to a few hundred milliseconds. // to get it down to a few hundred milliseconds.
BookOffersPage page; BookOffersPage page;
const ripple::uint256 bookEnd = ripple::getQualityNext(book); const ripple::uint256 bookEnd = ripple::getQualityNext(book);
ripple::uint256 uTipIndex = book; ripple::uint256 uTipIndex = book;
@@ -101,7 +164,7 @@ BackendInterface::fetchBookOffers(
while (keys.size() < limit) while (keys.size() < limit)
{ {
auto mid1 = std::chrono::system_clock::now(); auto mid1 = std::chrono::system_clock::now();
auto offerDir = fetchSuccessor(uTipIndex, ledgerSequence); auto offerDir = fetchSuccessorObject(uTipIndex, ledgerSequence);
auto mid2 = std::chrono::system_clock::now(); auto mid2 = std::chrono::system_clock::now();
numSucc++; numSucc++;
succMillis += getMillis(mid2 - mid1); succMillis += getMillis(mid2 - mid1);
@@ -141,9 +204,10 @@ BackendInterface::fetchBookOffers(
auto objs = fetchLedgerObjects(keys, ledgerSequence); auto objs = fetchLedgerObjects(keys, ledgerSequence);
for (size_t i = 0; i < keys.size() && i < limit; ++i) for (size_t i = 0; i < keys.size() && i < limit; ++i)
{ {
BOOST_LOG_TRIVIAL(trace) BOOST_LOG_TRIVIAL(debug)
<< __func__ << " key = " << ripple::strHex(keys[i]) << __func__ << " key = " << ripple::strHex(keys[i])
<< " blob = " << ripple::strHex(objs[i]); << " blob = " << ripple::strHex(objs[i])
<< " ledgerSequence = " << ledgerSequence;
assert(objs[i].size()); assert(objs[i].size());
page.offers.push_back({keys[i], objs[i]}); page.offers.push_back({keys[i], objs[i]});
} }
@@ -166,22 +230,6 @@ BackendInterface::fetchBookOffers(
return page; return page;
} }
std::optional<LedgerObject>
BackendInterface::fetchSuccessor(ripple::uint256 key, uint32_t ledgerSequence)
const
{
auto start = std::chrono::system_clock::now();
auto page = fetchLedgerPage({++key}, ledgerSequence, 1, 512);
auto end = std::chrono::system_clock::now();
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(end - start)
.count();
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " took " << std::to_string(ms) << " milliseconds";
if (page.objects.size())
return page.objects[0];
return {};
}
LedgerPage LedgerPage
BackendInterface::fetchLedgerPage( BackendInterface::fetchLedgerPage(
std::optional<ripple::uint256> const& cursor, std::optional<ripple::uint256> const& cursor,
@@ -189,160 +237,30 @@ BackendInterface::fetchLedgerPage(
std::uint32_t limit, std::uint32_t limit,
std::uint32_t limitHint) const std::uint32_t limitHint) const
{ {
assert(limit != 0);
bool incomplete = !isLedgerIndexed(ledgerSequence);
BOOST_LOG_TRIVIAL(debug) << __func__ << " incomplete = " << incomplete;
// really low limits almost always miss
uint32_t adjustedLimit = std::max(limitHint, std::max(limit, (uint32_t)4));
LedgerPage page; LedgerPage page;
page.cursor = cursor;
long totalTime = 0; std::vector<ripple::uint256> keys;
long maxTime = 5000; while (keys.size() < limit)
bool timedOut = false;
do
{ {
if (totalTime >= maxTime) ripple::uint256 const& curCursor =
{ keys.size() ? keys.back() : cursor ? *cursor : firstKey;
timedOut = true; auto succ = fetchSuccessorKey(curCursor, ledgerSequence);
if (!succ)
break; break;
} keys.push_back(std::move(*succ));
adjustedLimit = adjustedLimit >= 8192 ? 8192 : adjustedLimit * 2;
auto start = std::chrono::system_clock::now();
auto partial =
doFetchLedgerPage(page.cursor, ledgerSequence, adjustedLimit);
auto end = std::chrono::system_clock::now();
std::string pageCursorStr =
page.cursor ? ripple::strHex(*page.cursor) : "";
std::string partialCursorStr =
partial.cursor ? ripple::strHex(*partial.cursor) : "";
auto thisTime =
std::chrono::duration_cast<std::chrono::milliseconds>(end - start)
.count();
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " " << std::to_string(ledgerSequence) << " "
<< std::to_string(adjustedLimit) << " " << pageCursorStr << " - "
<< partialCursorStr << " - time = " << std::to_string(thisTime);
totalTime += thisTime;
page.objects.insert(
page.objects.end(), partial.objects.begin(), partial.objects.end());
page.cursor = partial.cursor;
} while (page.objects.size() < limit && page.cursor);
if (incomplete)
{
auto rng = fetchLedgerRange();
if (!rng)
return page;
if (rng->minSequence == ledgerSequence)
{
BOOST_LOG_TRIVIAL(fatal)
<< __func__
<< " Database is populated but first flag ledger is "
"incomplete. This should never happen";
assert(false);
throw std::runtime_error("Missing base flag ledger");
}
uint32_t lowerSequence = (ledgerSequence - 1) >> indexer_.getKeyShift()
<< indexer_.getKeyShift();
if (lowerSequence < rng->minSequence)
lowerSequence = rng->minSequence;
BOOST_LOG_TRIVIAL(debug)
<< __func__
<< " recursing. ledgerSequence = " << std::to_string(ledgerSequence)
<< " , lowerSequence = " << std::to_string(lowerSequence);
auto lowerPage = fetchLedgerPage(cursor, lowerSequence, limit);
std::vector<ripple::uint256> keys;
std::transform(
std::move_iterator(lowerPage.objects.begin()),
std::move_iterator(lowerPage.objects.end()),
std::back_inserter(keys),
[](auto&& elt) { return std::move(elt.key); });
size_t upperPageSize = page.objects.size();
auto objs = fetchLedgerObjects(keys, ledgerSequence);
for (size_t i = 0; i < keys.size(); ++i)
{
auto& obj = objs[i];
auto& key = keys[i];
if (obj.size())
page.objects.push_back({std::move(key), std::move(obj)});
}
std::sort(page.objects.begin(), page.objects.end(), [](auto a, auto b) {
return a.key < b.key;
});
if (page.objects.size() > limit)
page.objects.resize(limit);
if (timedOut)
{
if (page.cursor && lowerPage.cursor)
page.cursor =
std::min(page.cursor.value(), lowerPage.cursor.value());
else if (lowerPage.cursor)
page.cursor = lowerPage.cursor;
}
else if (page.objects.size() && page.objects.size() >= limit)
page.cursor = page.objects.back().key;
} }
auto objects = fetchLedgerObjects(keys, ledgerSequence);
for (size_t i = 0; i < objects.size(); ++i)
{
assert(objects[i].size());
page.objects.push_back({std::move(keys[i]), std::move(objects[i])});
}
if (page.objects.size() >= limit)
page.cursor = page.objects.back().key;
return page; return page;
} }
void
BackendInterface::checkFlagLedgers() const
{
auto rng = hardFetchLedgerRangeNoThrow();
if (rng)
{
bool prevComplete = true;
uint32_t cur = rng->minSequence;
size_t numIncomplete = 0;
while (cur <= rng->maxSequence + 1)
{
auto keyIndex = getKeyIndexOfSeq(cur);
assert(keyIndex.has_value());
cur = keyIndex->keyIndex;
if (!isLedgerIndexed(cur))
{
BOOST_LOG_TRIVIAL(warning)
<< __func__ << " - flag ledger "
<< std::to_string(keyIndex->keyIndex) << " is incomplete";
++numIncomplete;
prevComplete = false;
}
else
{
if (!prevComplete)
{
BOOST_LOG_TRIVIAL(fatal)
<< __func__ << " - flag ledger "
<< std::to_string(keyIndex->keyIndex)
<< " is incomplete but the next is complete. This "
"should never happen";
assert(false);
throw std::runtime_error("missing prev flag ledger");
}
prevComplete = true;
BOOST_LOG_TRIVIAL(info)
<< __func__ << " - flag ledger "
<< std::to_string(keyIndex->keyIndex) << " is complete";
}
cur = cur + 1;
}
if (numIncomplete > 1)
{
BOOST_LOG_TRIVIAL(warning)
<< __func__ << " " << std::to_string(numIncomplete)
<< " incomplete flag ledgers. "
"This can happen, but is unlikely. Check indexer_key_shift "
"in config";
}
else
{
BOOST_LOG_TRIVIAL(info)
<< __func__ << " number of incomplete flag ledgers = "
<< std::to_string(numIncomplete);
}
}
}
std::optional<ripple::Fees> std::optional<ripple::Fees>
BackendInterface::fetchFees(std::uint32_t seq) const BackendInterface::fetchFees(std::uint32_t seq) const
{ {

View File

@@ -2,67 +2,11 @@
#define RIPPLE_APP_REPORTING_BACKENDINTERFACE_H_INCLUDED #define RIPPLE_APP_REPORTING_BACKENDINTERFACE_H_INCLUDED
#include <ripple/ledger/ReadView.h> #include <ripple/ledger/ReadView.h>
#include <boost/asio.hpp> #include <boost/asio.hpp>
#include <backend/BackendIndexer.h>
#include <backend/DBHelpers.h> #include <backend/DBHelpers.h>
class ReportingETL; #include <backend/SimpleCache.h>
class AsyncCallData; #include <backend/Types.h>
class BackendTest_Basic_Test;
namespace Backend { namespace Backend {
// *** return types
using Blob = std::vector<unsigned char>;
struct LedgerObject
{
ripple::uint256 key;
Blob blob;
};
struct LedgerPage
{
std::vector<LedgerObject> objects;
std::optional<ripple::uint256> cursor;
std::optional<std::string> warning;
};
struct BookOffersPage
{
std::vector<LedgerObject> offers;
std::optional<ripple::uint256> cursor;
std::optional<std::string> warning;
};
struct TransactionAndMetadata
{
Blob transaction;
Blob metadata;
uint32_t ledgerSequence;
uint32_t date;
bool
operator==(const TransactionAndMetadata& other) const
{
return transaction == other.transaction && metadata == other.metadata &&
ledgerSequence == other.ledgerSequence && date == other.date;
}
};
struct AccountTransactionsCursor
{
uint32_t ledgerSequence;
uint32_t transactionIndex;
};
struct AccountTransactions
{
std::vector<TransactionAndMetadata> txns;
std::optional<AccountTransactionsCursor> cursor;
};
struct LedgerRange
{
uint32_t minSequence;
uint32_t maxSequence;
};
class DatabaseTimeout : public std::exception class DatabaseTimeout : public std::exception
{ {
const char* const char*
@@ -75,30 +19,37 @@ class DatabaseTimeout : public std::exception
class BackendInterface class BackendInterface
{ {
protected: protected:
mutable BackendIndexer indexer_; bool isFirst_ = true;
mutable bool isFirst_ = true; std::optional<LedgerRange> range;
mutable std::optional<LedgerRange> range; SimpleCache cache_;
public: public:
BackendInterface(boost::json::object const& config) : indexer_(config) BackendInterface(boost::json::object const& config)
{ {
} }
virtual ~BackendInterface() virtual ~BackendInterface()
{ {
} }
BackendIndexer&
getIndexer() const
{
return indexer_;
}
// *** public read methods *** // *** public read methods ***
// All of these reads methods can throw DatabaseTimeout. When writing code // All of these reads methods can throw DatabaseTimeout. When writing code
// in an RPC handler, this exception does not need to be caught: when an RPC // in an RPC handler, this exception does not need to be caught: when an RPC
// results in a timeout, an error is returned to the client // results in a timeout, an error is returned to the client
public: public:
// *** ledger methods // *** ledger methods
//
SimpleCache const&
cache() const
{
return cache_;
}
SimpleCache&
cache()
{
return cache_;
}
virtual std::optional<ripple::LedgerInfo> virtual std::optional<ripple::LedgerInfo>
fetchLedgerBySequence(uint32_t sequence) const = 0; fetchLedgerBySequence(uint32_t sequence) const = 0;
@@ -139,15 +90,25 @@ public:
fetchAllTransactionHashesInLedger(uint32_t ledgerSequence) const = 0; fetchAllTransactionHashesInLedger(uint32_t ledgerSequence) const = 0;
// *** state data methods // *** state data methods
std::optional<Blob>
fetchLedgerObject(ripple::uint256 const& key, uint32_t sequence) const;
virtual std::optional<Blob> std::vector<Blob>
fetchLedgerObject(ripple::uint256 const& key, uint32_t sequence) const = 0;
virtual std::vector<Blob>
fetchLedgerObjects( fetchLedgerObjects(
std::vector<ripple::uint256> const& keys,
uint32_t sequence) const;
virtual std::optional<Blob>
doFetchLedgerObject(ripple::uint256 const& key, uint32_t sequence)
const = 0;
virtual std::vector<Blob>
doFetchLedgerObjects(
std::vector<ripple::uint256> const& keys, std::vector<ripple::uint256> const& keys,
uint32_t sequence) const = 0; uint32_t sequence) const = 0;
virtual std::vector<LedgerObject>
fetchLedgerDiff(uint32_t ledgerSequence) const = 0;
// Fetches a page of ledger objects, ordered by key/index. // Fetches a page of ledger objects, ordered by key/index.
// Used by ledger_data // Used by ledger_data
LedgerPage LedgerPage
@@ -157,10 +118,15 @@ public:
std::uint32_t limit, std::uint32_t limit,
std::uint32_t limitHint = 0) const; std::uint32_t limitHint = 0) const;
// Fetches the successor to key/index. key need not actually be a valid // Fetches the successor to key/index
// key/index.
std::optional<LedgerObject> std::optional<LedgerObject>
fetchSuccessor(ripple::uint256 key, uint32_t ledgerSequence) const; fetchSuccessorObject(ripple::uint256 key, uint32_t ledgerSequence) const;
std::optional<ripple::uint256>
fetchSuccessorKey(ripple::uint256 key, uint32_t ledgerSequence) const;
// Fetches the successor to key/index
virtual std::optional<ripple::uint256>
doFetchSuccessorKey(ripple::uint256 key, uint32_t ledgerSequence) const = 0;
BookOffersPage BookOffersPage
fetchBookOffers( fetchBookOffers(
@@ -169,21 +135,6 @@ public:
std::uint32_t limit, std::uint32_t limit,
std::optional<ripple::uint256> const& cursor = {}) const; std::optional<ripple::uint256> const& cursor = {}) const;
// Methods related to the indexer
bool
isLedgerIndexed(std::uint32_t ledgerSequence) const;
std::optional<KeyIndex>
getKeyIndexOfSeq(uint32_t seq) const;
// *** protected write methods
protected:
friend class ::ReportingETL;
friend class BackendIndexer;
friend class ::AsyncCallData;
friend std::shared_ptr<BackendInterface>
make_Backend(boost::json::object const& config);
friend class ::BackendTest_Basic_Test;
virtual std::optional<LedgerRange> virtual std::optional<LedgerRange>
hardFetchLedgerRange() const = 0; hardFetchLedgerRange() const = 0;
// Doesn't throw DatabaseTimeout. Should be used with care. // Doesn't throw DatabaseTimeout. Should be used with care.
@@ -203,11 +154,10 @@ protected:
writeLedger( writeLedger(
ripple::LedgerInfo const& ledgerInfo, ripple::LedgerInfo const& ledgerInfo,
std::string&& ledgerHeader, std::string&& ledgerHeader,
bool isFirst = false) const = 0; bool isFirst = false) = 0;
void void
writeLedgerObject(std::string&& key, uint32_t seq, std::string&& blob) writeLedgerObject(std::string&& key, uint32_t seq, std::string&& blob);
const;
virtual void virtual void
writeTransaction( writeTransaction(
@@ -215,24 +165,20 @@ protected:
uint32_t seq, uint32_t seq,
uint32_t date, uint32_t date,
std::string&& transaction, std::string&& transaction,
std::string&& metadata) const = 0; std::string&& metadata) = 0;
virtual void virtual void
writeAccountTransactions( writeAccountTransactions(std::vector<AccountTransactionsData>&& data) = 0;
std::vector<AccountTransactionsData>&& data) const = 0;
// TODO: this function, or something similar, could be called internally by
// writeLedgerObject
virtual bool
writeKeys(
std::unordered_set<ripple::uint256> const& keys,
KeyIndex const& index,
bool isAsync = false) const = 0;
virtual void
writeSuccessor(
std::string&& key,
uint32_t seq,
std::string&& successor) = 0;
// Tell the database we are about to begin writing data for a particular // Tell the database we are about to begin writing data for a particular
// ledger. // ledger.
virtual void virtual void
startWrites() const = 0; startWrites() = 0;
// Tell the database we have finished writing all data for a particular // Tell the database we have finished writing all data for a particular
// ledger // ledger
@@ -254,21 +200,14 @@ protected:
// *** private helper methods // *** private helper methods
private: private:
virtual LedgerPage
doFetchLedgerPage(
std::optional<ripple::uint256> const& cursor,
std::uint32_t ledgerSequence,
std::uint32_t limit) const = 0;
virtual void virtual void
doWriteLedgerObject(std::string&& key, uint32_t seq, std::string&& blob) doWriteLedgerObject(
const = 0; std::string&& key,
uint32_t seq,
std::string&& blob) = 0;
virtual bool virtual bool
doFinishWrites() const = 0; doFinishWrites() = 0;
void
checkFlagLedgers() const;
}; };
} // namespace Backend } // namespace Backend

View File

@@ -148,9 +148,19 @@ void
CassandraBackend::doWriteLedgerObject( CassandraBackend::doWriteLedgerObject(
std::string&& key, std::string&& key,
uint32_t seq, uint32_t seq,
std::string&& blob) const std::string&& blob)
{ {
BOOST_LOG_TRIVIAL(trace) << "Writing ledger object to cassandra"; BOOST_LOG_TRIVIAL(trace) << "Writing ledger object to cassandra";
if (!isFirst_)
makeAndExecuteAsyncWrite(
this, std::move(std::make_tuple(seq, key)), [this](auto& params) {
auto& [sequence, key] = params.data;
CassandraStatement statement{insertDiff_};
statement.bindNextInt(sequence);
statement.bindNextBytes(key);
return statement;
});
makeAndExecuteAsyncWrite( makeAndExecuteAsyncWrite(
this, this,
std::move(std::make_tuple(std::move(key), seq, std::move(blob))), std::move(std::make_tuple(std::move(key), seq, std::move(blob))),
@@ -165,10 +175,34 @@ CassandraBackend::doWriteLedgerObject(
}); });
} }
void void
CassandraBackend::writeSuccessor(
std::string&& key,
uint32_t seq,
std::string&& successor)
{
BOOST_LOG_TRIVIAL(trace)
<< "Writing successor. key = " << key
<< " seq = " << std::to_string(seq) << " successor = " << successor;
assert(key.size());
assert(successor.size());
makeAndExecuteAsyncWrite(
this,
std::move(std::make_tuple(std::move(key), seq, std::move(successor))),
[this](auto& params) {
auto& [key, sequence, successor] = params.data;
CassandraStatement statement{insertSuccessor_};
statement.bindNextBytes(key);
statement.bindNextInt(sequence);
statement.bindNextBytes(successor);
return statement;
});
}
void
CassandraBackend::writeLedger( CassandraBackend::writeLedger(
ripple::LedgerInfo const& ledgerInfo, ripple::LedgerInfo const& ledgerInfo,
std::string&& header, std::string&& header,
bool isFirst) const bool isFirst)
{ {
makeAndExecuteAsyncWrite( makeAndExecuteAsyncWrite(
this, this,
@@ -195,7 +229,7 @@ CassandraBackend::writeLedger(
} }
void void
CassandraBackend::writeAccountTransactions( CassandraBackend::writeAccountTransactions(
std::vector<AccountTransactionsData>&& data) const std::vector<AccountTransactionsData>&& data)
{ {
for (auto& record : data) for (auto& record : data)
{ {
@@ -228,7 +262,7 @@ CassandraBackend::writeTransaction(
uint32_t seq, uint32_t seq,
uint32_t date, uint32_t date,
std::string&& transaction, std::string&& transaction,
std::string&& metadata) const std::string&& metadata)
{ {
BOOST_LOG_TRIVIAL(trace) << "Writing txn to cassandra"; BOOST_LOG_TRIVIAL(trace) << "Writing txn to cassandra";
std::string hashCpy = hash; std::string hashCpy = hash;
@@ -517,89 +551,49 @@ CassandraBackend::fetchAccountTransactions(
} }
return {txns, {}}; return {txns, {}};
} }
std::optional<ripple::uint256>
LedgerPage CassandraBackend::doFetchSuccessorKey(
CassandraBackend::doFetchLedgerPage( ripple::uint256 key,
std::optional<ripple::uint256> const& cursorIn, uint32_t ledgerSequence) const
std::uint32_t ledgerSequence,
std::uint32_t limit) const
{ {
std::optional<ripple::uint256> cursor = cursorIn; BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra";
auto index = getKeyIndexOfSeq(ledgerSequence); CassandraStatement statement{selectSuccessor_};
if (!index) statement.bindNextBytes(key);
return {}; statement.bindNextInt(ledgerSequence);
LedgerPage page;
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " ledgerSequence = " << std::to_string(ledgerSequence)
<< " index = " << std::to_string(index->keyIndex);
if (cursor)
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " - Cursor = " << ripple::strHex(*cursor);
CassandraStatement statement{selectKeys_};
statement.bindNextInt(index->keyIndex);
if (!cursor)
{
ripple::uint256 zero;
cursor = zero;
}
statement.bindNextBytes(cursor->data(), 1);
statement.bindNextBytes(*cursor);
statement.bindNextUInt(limit + 1);
CassandraResult result = executeSyncRead(statement); CassandraResult result = executeSyncRead(statement);
if (!!result) if (!result)
{ {
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug) << __func__ << " - no rows";
<< __func__ << " - got keys - size = " << result.numRows(); return {};
std::vector<ripple::uint256> keys;
do
{
keys.push_back(result.getUInt256());
} while (result.nextRow());
if (keys.size() && keys.size() >= limit)
{
page.cursor = keys.back();
++(*page.cursor);
}
else if (cursor->data()[0] != 0xFF)
{
ripple::uint256 zero;
zero.data()[0] = cursor->data()[0] + 1;
page.cursor = zero;
}
auto objects = fetchLedgerObjects(keys, ledgerSequence);
if (objects.size() != keys.size())
throw std::runtime_error("Mismatch in size of objects and keys");
if (cursor)
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " Cursor = " << ripple::strHex(*page.cursor);
for (size_t i = 0; i < objects.size(); ++i)
{
auto& obj = objects[i];
auto& key = keys[i];
if (obj.size())
{
page.objects.push_back({std::move(key), std::move(obj)});
}
}
if (!cursorIn && (!keys.size() || !keys[0].isZero()))
{
page.warning = "Data may be incomplete";
}
return page;
} }
if (!cursor) auto next = result.getUInt256();
return {{}, {}, "Data may be incomplete"}; if (next == lastKey)
return {};
return next;
}
std::optional<Blob>
CassandraBackend::doFetchLedgerObject(
ripple::uint256 const& key,
uint32_t sequence) const
{
BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra";
CassandraStatement statement{selectObject_};
statement.bindNextBytes(key);
statement.bindNextInt(sequence);
CassandraResult result = executeSyncRead(statement);
if (!result)
{
BOOST_LOG_TRIVIAL(debug) << __func__ << " - no rows";
return {};
}
auto res = result.getBytes();
if (res.size())
return res;
return {}; return {};
} }
std::vector<Blob> std::vector<Blob>
CassandraBackend::fetchLedgerObjects( CassandraBackend::doFetchLedgerObjects(
std::vector<ripple::uint256> const& keys, std::vector<ripple::uint256> const& keys,
uint32_t sequence) const uint32_t sequence) const
{ {
@@ -638,63 +632,42 @@ CassandraBackend::fetchLedgerObjects(
<< "Fetched " << numKeys << " records from Cassandra"; << "Fetched " << numKeys << " records from Cassandra";
return results; return results;
} }
std::vector<LedgerObject>
bool CassandraBackend::fetchLedgerDiff(uint32_t ledgerSequence) const
CassandraBackend::writeKeys(
std::unordered_set<ripple::uint256> const& keys,
KeyIndex const& index,
bool isAsync) const
{ {
auto bind = [this](auto& params) { CassandraStatement statement{selectDiff_};
auto& [lgrSeq, key] = params.data; statement.bindNextInt(ledgerSequence);
CassandraStatement statement{insertKey_}; auto start = std::chrono::system_clock::now();
statement.bindNextInt(lgrSeq); CassandraResult result = executeSyncRead(statement);
statement.bindNextBytes(key.data(), 1); auto end = std::chrono::system_clock::now();
statement.bindNextBytes(key); if (!result)
return statement;
};
std::atomic_int numOutstanding = 0;
std::condition_variable cv;
std::mutex mtx;
std::vector<std::shared_ptr<BulkWriteCallbackData<
std::pair<uint32_t, ripple::uint256>,
typename std::remove_reference<decltype(bind)>::type>>>
cbs;
cbs.reserve(keys.size());
uint32_t concurrentLimit =
isAsync ? indexerMaxRequestsOutstanding : maxRequestsOutstanding;
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " Ledger = " << std::to_string(index.keyIndex)
<< " . num keys = " << std::to_string(keys.size())
<< " . concurrentLimit = "
<< std::to_string(indexerMaxRequestsOutstanding);
uint32_t numSubmitted = 0;
for (auto& key : keys)
{ {
cbs.push_back(makeAndExecuteBulkAsyncWrite( BOOST_LOG_TRIVIAL(error)
this, << __func__
std::make_pair(index.keyIndex, std::move(key)), << " - no rows . ledger = " << std::to_string(ledgerSequence);
bind, return {};
numOutstanding,
mtx,
cv));
++numOutstanding;
++numSubmitted;
std::unique_lock<std::mutex> lck(mtx);
cv.wait(lck, [&numOutstanding, concurrentLimit, &keys]() {
// keys.size() - i is number submitted. keys.size() -
// numRemaining is number completed Difference is num
// outstanding
return numOutstanding < concurrentLimit;
});
if (numSubmitted % 100000 == 0)
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " Submitted " << std::to_string(numSubmitted);
} }
std::vector<ripple::uint256> keys;
std::unique_lock<std::mutex> lck(mtx); do
cv.wait(lck, [&numOutstanding]() { return numOutstanding == 0; }); {
return true; keys.push_back(result.getUInt256());
} while (result.nextRow());
BOOST_LOG_TRIVIAL(debug)
<< "Fetched " << keys.size() << " diff hashes from Cassandra in "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end - start)
.count()
<< " milliseconds";
auto objs = fetchLedgerObjects(keys, ledgerSequence);
std::vector<LedgerObject> results;
std::transform(
keys.begin(),
keys.end(),
objs.begin(),
std::back_inserter(results),
[](auto const& k, auto const& o) {
return LedgerObject{k, o};
});
return results;
} }
bool bool
@@ -971,17 +944,8 @@ CassandraBackend::open(bool readOnly)
cass_cluster_set_connect_timeout(cluster, 10000); cass_cluster_set_connect_timeout(cluster, 10000);
int ttl = getInt("ttl") ? *getInt("ttl") * 2 : 0; int ttl = getInt("ttl") ? *getInt("ttl") * 2 : 0;
int keysTtl = (ttl != 0 ? pow(2, indexer_.getKeyShift()) * 4 * 2 : 0);
int incr = keysTtl;
while (keysTtl < ttl)
{
keysTtl += incr;
}
int booksTtl = 0;
BOOST_LOG_TRIVIAL(info) BOOST_LOG_TRIVIAL(info)
<< __func__ << " setting ttl to " << std::to_string(ttl) << __func__ << " setting ttl to " << std::to_string(ttl);
<< " , books ttl to " << std::to_string(booksTtl) << " , keys ttl to "
<< std::to_string(keysTtl);
auto executeSimpleStatement = [this](std::string const& query) { auto executeSimpleStatement = [this](std::string const& query) {
CassStatement* statement = makeStatement(query.c_str(), 0); CassStatement* statement = makeStatement(query.c_str(), 0);
@@ -1093,16 +1057,28 @@ CassandraBackend::open(bool readOnly)
continue; continue;
query.str(""); query.str("");
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "keys" query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "successor"
<< " ( sequence bigint, first_byte blob, key blob, PRIMARY KEY " << " (key blob, seq bigint, next blob, PRIMARY KEY (key, seq)) "
"((sequence,first_byte), key))"
" WITH default_time_to_live = " " WITH default_time_to_live = "
<< std::to_string(keysTtl); << std::to_string(ttl);
if (!executeSimpleStatement(query.str())) if (!executeSimpleStatement(query.str()))
continue; continue;
query.str(""); query.str("");
query << "SELECT * FROM " << tablePrefix << "keys" query << "SELECT * FROM " << tablePrefix << "successor"
<< " LIMIT 1";
if (!executeSimpleStatement(query.str()))
continue;
query.str("");
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "diff"
<< " (seq bigint, key blob, PRIMARY KEY (seq, key)) "
" WITH default_time_to_live = "
<< std::to_string(ttl);
if (!executeSimpleStatement(query.str()))
continue;
query.str("");
query << "SELECT * FROM " << tablePrefix << "diff"
<< " LIMIT 1"; << " LIMIT 1";
if (!executeSimpleStatement(query.str())) if (!executeSimpleStatement(query.str()))
continue; continue;
@@ -1191,16 +1167,27 @@ CassandraBackend::open(bool readOnly)
continue; continue;
query.str(""); query.str("");
query << "INSERT INTO " << tablePrefix << "keys" query << "INSERT INTO " << tablePrefix << "successor"
<< " (sequence,first_byte, key) VALUES (?, ?, ?)"; << " (key,seq,next) VALUES (?, ?, ?)";
if (!insertKey_.prepareStatement(query, session_.get())) if (!insertSuccessor_.prepareStatement(query, session_.get()))
continue; continue;
query.str(""); query.str("");
query << "SELECT key FROM " << tablePrefix << "keys" query << "INSERT INTO " << tablePrefix << "diff"
<< " WHERE sequence = ? AND first_byte = ? AND key >= ? ORDER BY " << " (seq,key) VALUES (?, ?)";
"key ASC LIMIT ?"; if (!insertDiff_.prepareStatement(query, session_.get()))
if (!selectKeys_.prepareStatement(query, session_.get())) continue;
query.str("");
query << "SELECT next FROM " << tablePrefix << "successor"
<< " WHERE key = ? AND seq <= ? ORDER BY seq DESC LIMIT 1";
if (!selectSuccessor_.prepareStatement(query, session_.get()))
continue;
query.str("");
query << "SELECT key FROM " << tablePrefix << "diff"
<< " WHERE seq = ?";
if (!selectDiff_.prepareStatement(query, session_.get()))
continue; continue;
query.str(""); query.str("");

View File

@@ -553,8 +553,10 @@ private:
CassandraPreparedStatement selectLedgerPage_; CassandraPreparedStatement selectLedgerPage_;
CassandraPreparedStatement upperBound2_; CassandraPreparedStatement upperBound2_;
CassandraPreparedStatement getToken_; CassandraPreparedStatement getToken_;
CassandraPreparedStatement insertKey_; CassandraPreparedStatement insertSuccessor_;
CassandraPreparedStatement selectKeys_; CassandraPreparedStatement selectSuccessor_;
CassandraPreparedStatement insertDiff_;
CassandraPreparedStatement selectDiff_;
CassandraPreparedStatement insertAccountTx_; CassandraPreparedStatement insertAccountTx_;
CassandraPreparedStatement selectAccountTx_; CassandraPreparedStatement selectAccountTx_;
CassandraPreparedStatement selectAccountTxForward_; CassandraPreparedStatement selectAccountTxForward_;
@@ -640,7 +642,7 @@ public:
std::optional<AccountTransactionsCursor> const& cursor) const override; std::optional<AccountTransactionsCursor> const& cursor) const override;
bool bool
doFinishWrites() const override doFinishWrites() override
{ {
// wait for all other writes to finish // wait for all other writes to finish
sync(); sync();
@@ -672,7 +674,7 @@ public:
writeLedger( writeLedger(
ripple::LedgerInfo const& ledgerInfo, ripple::LedgerInfo const& ledgerInfo,
std::string&& header, std::string&& header,
bool isFirst = false) const override; bool isFirst = false) override;
std::optional<uint32_t> std::optional<uint32_t>
fetchLatestLedgerSequence() const override fetchLatestLedgerSequence() const override
@@ -737,24 +739,8 @@ public:
// Synchronously fetch the object with key key, as of ledger with sequence // Synchronously fetch the object with key key, as of ledger with sequence
// sequence // sequence
std::optional<Blob> std::optional<Blob>
fetchLedgerObject(ripple::uint256 const& key, uint32_t sequence) doFetchLedgerObject(ripple::uint256 const& key, uint32_t sequence)
const override const override;
{
BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra";
CassandraStatement statement{selectObject_};
statement.bindNextBytes(key);
statement.bindNextInt(sequence);
CassandraResult result = executeSyncRead(statement);
if (!result)
{
BOOST_LOG_TRIVIAL(debug) << __func__ << " - no rows";
return {};
}
auto res = result.getBytes();
if (res.size())
return res;
return {};
}
std::optional<int64_t> std::optional<int64_t>
getToken(void const* key) const getToken(void const* key) const
@@ -793,34 +779,33 @@ public:
result.getUInt32(), result.getUInt32(),
result.getUInt32()}}; result.getUInt32()}};
} }
LedgerPage std::optional<ripple::uint256>
doFetchLedgerPage( doFetchSuccessorKey(ripple::uint256 key, uint32_t ledgerSequence)
std::optional<ripple::uint256> const& cursor, const override;
std::uint32_t ledgerSequence,
std::uint32_t limit) const override;
bool
writeKeys(
std::unordered_set<ripple::uint256> const& keys,
KeyIndex const& index,
bool isAsync = false) const override;
std::vector<TransactionAndMetadata> std::vector<TransactionAndMetadata>
fetchTransactions( fetchTransactions(
std::vector<ripple::uint256> const& hashes) const override; std::vector<ripple::uint256> const& hashes) const override;
std::vector<Blob> std::vector<Blob>
fetchLedgerObjects( doFetchLedgerObjects(
std::vector<ripple::uint256> const& keys, std::vector<ripple::uint256> const& keys,
uint32_t sequence) const override; uint32_t sequence) const override;
std::vector<LedgerObject>
fetchLedgerDiff(uint32_t ledgerSequence) const override;
void void
doWriteLedgerObject(std::string&& key, uint32_t seq, std::string&& blob) doWriteLedgerObject(std::string&& key, uint32_t seq, std::string&& blob)
const override; override;
void
writeSuccessor(std::string&& key, uint32_t seq, std::string&& successor)
override;
void void
writeAccountTransactions( writeAccountTransactions(
std::vector<AccountTransactionsData>&& data) const override; std::vector<AccountTransactionsData>&& data) override;
void void
writeTransaction( writeTransaction(
@@ -828,10 +813,10 @@ public:
uint32_t seq, uint32_t seq,
uint32_t date, uint32_t date,
std::string&& transaction, std::string&& transaction,
std::string&& metadata) const override; std::string&& metadata) override;
void void
startWrites() const override startWrites() override
{ {
} }

View File

@@ -1,10 +1,13 @@
#ifndef RIPPLE_APP_REPORTING_DBHELPERS_H_INCLUDED #ifndef CLIO_BACKEND_DBHELPERS_H_INCLUDED
#define RIPPLE_APP_REPORTING_DBHELPERS_H_INCLUDED #define CLIO_BACKEND_DBHELPERS_H_INCLUDED
#include <ripple/basics/Log.h> #include <ripple/basics/Log.h>
#include <ripple/protocol/SField.h>
#include <ripple/protocol/STAccount.h>
#include <ripple/protocol/TxMeta.h> #include <ripple/protocol/TxMeta.h>
#include <boost/container/flat_set.hpp> #include <boost/container/flat_set.hpp>
#include <backend/Pg.h> #include <backend/Pg.h>
#include <backend/Types.h>
/// Struct used to keep track of what to write to transactions and /// Struct used to keep track of what to write to transactions and
/// account_transactions tables in Postgres /// account_transactions tables in Postgres
@@ -48,7 +51,24 @@ isOfferHex(T const& object)
} }
return false; return false;
} }
template <class T>
inline bool
isDirNode(T const& object)
{
short spaceKey = (object.data()[1] << 8) | object.data()[2];
return spaceKey == 0x0064;
}
template <class T, class R>
inline bool
isBookDir(T const& key, R const& object)
{
if (!isDirNode(object))
return false;
ripple::STLedgerEntry const sle{
ripple::SerialIter{object.data(), object.size()}, key};
return !sle[~ripple::sfOwner].has_value();
}
template <class T> template <class T>
inline ripple::uint256 inline ripple::uint256
getBook(T const& offer) getBook(T const& offer)
@@ -59,6 +79,19 @@ getBook(T const& offer)
return book; return book;
} }
template <class T>
inline ripple::uint256
getBookBase(T const& key)
{
assert(key.size() == ripple::uint256::size());
ripple::uint256 ret;
for (size_t i = 0; i < 24; ++i)
{
ret.data()[i] = key.data()[i];
}
return ret;
}
inline ripple::LedgerInfo inline ripple::LedgerInfo
deserializeHeader(ripple::Slice data) deserializeHeader(ripple::Slice data)
{ {
@@ -82,4 +115,9 @@ deserializeHeader(ripple::Slice data)
return info; return info;
} }
inline std::string
uint256ToString(ripple::uint256 const& uint)
{
return {reinterpret_cast<const char*>(uint.data()), uint.size()};
}
#endif #endif

View File

@@ -0,0 +1,110 @@
#include <backend/LayeredCache.h>
namespace Backend {
void
LayeredCache::insert(
ripple::uint256 const& key,
Blob const& value,
uint32_t seq)
{
auto entry = map_[key];
// stale insert, do nothing
if (seq <= entry.recent.seq)
return;
entry.old = entry.recent;
entry.recent = {seq, value};
if (value.empty())
pendingDeletes_.push_back(key);
if (!entry.old.blob.empty())
pendingSweeps_.push_back(key);
}
std::optional<Blob>
LayeredCache::select(CacheEntry const& entry, uint32_t seq) const
{
if (seq < entry.old.seq)
return {};
if (seq < entry.recent.seq && !entry.old.blob.empty())
return entry.old.blob;
if (!entry.recent.blob.empty())
return entry.recent.blob;
return {};
}
void
LayeredCache::update(std::vector<LedgerObject> const& blobs, uint32_t seq)
{
std::unique_lock lck{mtx_};
if (seq > mostRecentSequence_)
mostRecentSequence_ = seq;
for (auto const& k : pendingSweeps_)
{
auto e = map_[k];
e.old = {};
}
for (auto const& k : pendingDeletes_)
{
map_.erase(k);
}
for (auto const& b : blobs)
{
insert(b.key, b.blob, seq);
}
}
std::optional<LedgerObject>
LayeredCache::getSuccessor(ripple::uint256 const& key, uint32_t seq) const
{
ripple::uint256 curKey = key;
while (true)
{
std::shared_lock lck{mtx_};
if (seq < mostRecentSequence_ - 1)
return {};
auto e = map_.upper_bound(curKey);
if (e == map_.end())
return {};
auto const& entry = e->second;
auto blob = select(entry, seq);
if (!blob)
{
curKey = e->first;
continue;
}
else
return {{e->first, *blob}};
}
}
std::optional<LedgerObject>
LayeredCache::getPredecessor(ripple::uint256 const& key, uint32_t seq) const
{
ripple::uint256 curKey = key;
std::shared_lock lck{mtx_};
while (true)
{
if (seq < mostRecentSequence_ - 1)
return {};
auto e = map_.lower_bound(curKey);
--e;
if (e == map_.begin())
return {};
auto const& entry = e->second;
auto blob = select(entry, seq);
if (!blob)
{
curKey = e->first;
continue;
}
else
return {{e->first, *blob}};
}
}
std::optional<Blob>
LayeredCache::get(ripple::uint256 const& key, uint32_t seq) const
{
std::shared_lock lck{mtx_};
auto e = map_.find(key);
if (e == map_.end())
return {};
auto const& entry = e->second;
return select(entry, seq);
}
} // namespace Backend

View File

@@ -0,0 +1,73 @@
#ifndef CLIO_LAYEREDCACHE_H_INCLUDED
#define CLIO_LAYEREDCACHE_H_INCLUDED
#include <ripple/basics/base_uint.h>
#include <backend/Types.h>
#include <map>
#include <mutex>
#include <shared_mutex>
#include <utility>
#include <vector>
namespace Backend {
class LayeredCache
{
struct SeqBlobPair
{
uint32_t seq;
Blob blob;
};
struct CacheEntry
{
SeqBlobPair recent;
SeqBlobPair old;
};
std::map<ripple::uint256, CacheEntry> map_;
std::vector<ripple::uint256> pendingDeletes_;
std::vector<ripple::uint256> pendingSweeps_;
mutable std::shared_mutex mtx_;
uint32_t mostRecentSequence_;
void
insert(ripple::uint256 const& key, Blob const& value, uint32_t seq);
/*
void
insert(ripple::uint256 const& key, Blob const& value, uint32_t seq)
{
map_.emplace(key,{{seq,value,{}});
}
void
update(ripple::uint256 const& key, Blob const& value, uint32_t seq)
{
auto& entry = map_.find(key);
entry.old = entry.recent;
entry.recent = {seq, value};
pendingSweeps_.push_back(key);
}
void
erase(ripple::uint256 const& key, uint32_t seq)
{
update(key, {}, seq);
pendingDeletes_.push_back(key);
}
*/
std::optional<Blob>
select(CacheEntry const& entry, uint32_t seq) const;
public:
void
update(std::vector<LedgerObject> const& blobs, uint32_t seq);
std::optional<Blob>
get(ripple::uint256 const& key, uint32_t seq) const;
std::optional<LedgerObject>
getSuccessor(ripple::uint256 const& key, uint32_t seq) const;
std::optional<LedgerObject>
getPredecessor(ripple::uint256 const& key, uint32_t seq) const;
};
} // namespace Backend
#endif

View File

@@ -773,6 +773,8 @@ CREATE TABLE IF NOT EXISTS objects (
CREATE INDEX objects_idx ON objects USING btree(key,ledger_seq); CREATE INDEX objects_idx ON objects USING btree(key,ledger_seq);
CREATE INDEX diff ON objects USING hash(ledger_seq);
create table if not exists objects1 partition of objects for values from (0) to (10000000); create table if not exists objects1 partition of objects for values from (0) to (10000000);
create table if not exists objects2 partition of objects for values from (10000000) to (20000000); create table if not exists objects2 partition of objects for values from (10000000) to (20000000);
create table if not exists objects3 partition of objects for values from (20000000) to (30000000); create table if not exists objects3 partition of objects for values from (20000000) to (30000000);
@@ -824,10 +826,11 @@ create table if not exists account_transactions6 partition of account_transactio
create table if not exists account_transactions7 partition of account_transactions for values from (60000000) to (70000000); create table if not exists account_transactions7 partition of account_transactions for values from (60000000) to (70000000);
CREATE TABLE IF NOT EXISTS keys ( CREATE TABLE IF NOT EXISTS successor (
ledger_seq bigint NOT NULL, key bytea NOT NULL,
key bytea NOT NULL, ledger_seq bigint NOT NULL,
PRIMARY KEY(ledger_seq, key) next bytea NOT NULL,
PRIMARY KEY(key, ledger_seq)
); );

View File

@@ -17,7 +17,7 @@ void
PostgresBackend::writeLedger( PostgresBackend::writeLedger(
ripple::LedgerInfo const& ledgerInfo, ripple::LedgerInfo const& ledgerInfo,
std::string&& ledgerHeader, std::string&& ledgerHeader,
bool isFirst) const bool isFirst)
{ {
auto cmd = boost::format( auto cmd = boost::format(
R"(INSERT INTO ledgers R"(INSERT INTO ledgers
@@ -38,7 +38,7 @@ PostgresBackend::writeLedger(
void void
PostgresBackend::writeAccountTransactions( PostgresBackend::writeAccountTransactions(
std::vector<AccountTransactionsData>&& data) const std::vector<AccountTransactionsData>&& data)
{ {
if (abortWrite_) if (abortWrite_)
return; return;
@@ -60,7 +60,7 @@ void
PostgresBackend::doWriteLedgerObject( PostgresBackend::doWriteLedgerObject(
std::string&& key, std::string&& key,
uint32_t seq, uint32_t seq,
std::string&& blob) const std::string&& blob)
{ {
if (abortWrite_) if (abortWrite_)
return; return;
@@ -81,13 +81,34 @@ PostgresBackend::doWriteLedgerObject(
} }
} }
void
PostgresBackend::writeSuccessor(
std::string&& key,
uint32_t seq,
std::string&& successor)
{
successorBuffer_ << "\\\\x" << ripple::strHex(key) << '\t'
<< std::to_string(seq) << '\t' << "\\\\x"
<< ripple::strHex(successor) << '\n';
numRowsInSuccessorBuffer_++;
if (numRowsInSuccessorBuffer_ % writeInterval_ == 0)
{
BOOST_LOG_TRIVIAL(info)
<< __func__ << " Flushing large buffer. num successors = "
<< numRowsInSuccessorBuffer_;
writeConnection_.bulkInsert("successor", successorBuffer_.str());
BOOST_LOG_TRIVIAL(info) << __func__ << " Flushed large buffer";
successorBuffer_.str("");
}
}
void void
PostgresBackend::writeTransaction( PostgresBackend::writeTransaction(
std::string&& hash, std::string&& hash,
uint32_t seq, uint32_t seq,
uint32_t date, uint32_t date,
std::string&& transaction, std::string&& transaction,
std::string&& metadata) const std::string&& metadata)
{ {
if (abortWrite_) if (abortWrite_)
return; return;
@@ -247,7 +268,7 @@ PostgresBackend::hardFetchLedgerRange() const
} }
std::optional<Blob> std::optional<Blob>
PostgresBackend::fetchLedgerObject( PostgresBackend::doFetchLedgerObject(
ripple::uint256 const& key, ripple::uint256 const& key,
uint32_t sequence) const uint32_t sequence) const
{ {
@@ -337,56 +358,27 @@ PostgresBackend::fetchAllTransactionHashesInLedger(
} }
return {}; return {};
} }
std::optional<ripple::uint256>
LedgerPage PostgresBackend::doFetchSuccessorKey(
PostgresBackend::doFetchLedgerPage( ripple::uint256 key,
std::optional<ripple::uint256> const& cursor, uint32_t ledgerSequence) const
std::uint32_t ledgerSequence,
std::uint32_t limit) const
{ {
auto index = getKeyIndexOfSeq(ledgerSequence);
if (!index)
return {};
PgQuery pgQuery(pgPool_); PgQuery pgQuery(pgPool_);
pgQuery("SET statement_timeout TO 10000"); pgQuery("SET statement_timeout TO 10000");
std::stringstream sql; std::stringstream sql;
sql << "SELECT key FROM keys WHERE ledger_seq = " sql << "SELECT next FROM successor WHERE key = "
<< std::to_string(index->keyIndex); << "\'\\x" << ripple::strHex(key) << "\'"
if (cursor) << " AND ledger_seq <= " << std::to_string(ledgerSequence)
sql << " AND key >= \'\\x" << ripple::strHex(*cursor) << "\'"; << " ORDER BY ledger_seq DESC LIMIT 1";
sql << " ORDER BY key ASC LIMIT " << std::to_string(limit);
BOOST_LOG_TRIVIAL(debug) << __func__ << sql.str();
auto res = pgQuery(sql.str().data()); auto res = pgQuery(sql.str().data());
BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched keys"; if (checkResult(res, 1))
std::optional<ripple::uint256> returnCursor;
if (size_t numRows = checkResult(res, 1))
{ {
std::vector<ripple::uint256> keys; auto next = res.asUInt256(0, 0);
for (size_t i = 0; i < numRows; ++i) if (next == lastKey)
{ return {};
keys.push_back({res.asUInt256(i, 0)}); return next;
}
if (numRows >= limit)
{
returnCursor = keys.back();
++(*returnCursor);
}
auto objs = fetchLedgerObjects(keys, ledgerSequence);
std::vector<LedgerObject> results;
for (size_t i = 0; i < objs.size(); ++i)
{
if (objs[i].size())
{
results.push_back({keys[i], objs[i]});
}
}
if (!cursor && !keys[0].isZero())
return {results, returnCursor, "Data may be incomplete"};
return {results, returnCursor};
} }
if (!cursor)
return {{}, {}, "Data may be incomplete"};
return {}; return {};
} }
@@ -481,7 +473,7 @@ PostgresBackend::fetchTransactions(
} }
std::vector<Blob> std::vector<Blob>
PostgresBackend::fetchLedgerObjects( PostgresBackend::doFetchLedgerObjects(
std::vector<ripple::uint256> const& keys, std::vector<ripple::uint256> const& keys,
uint32_t sequence) const uint32_t sequence) const
{ {
@@ -529,6 +521,27 @@ PostgresBackend::fetchLedgerObjects(
<< " objects with threadpool. took " << std::to_string(duration); << " objects with threadpool. took " << std::to_string(duration);
return results; return results;
} }
std::vector<LedgerObject>
PostgresBackend::fetchLedgerDiff(uint32_t ledgerSequence) const
{
PgQuery pgQuery(pgPool_);
pgQuery("SET statement_timeout TO 10000");
std::stringstream sql;
sql << "SELECT key,object FROM objects "
"WHERE "
<< "ledger_seq = " << std::to_string(ledgerSequence);
auto res = pgQuery(sql.str().data());
if (size_t numRows = checkResult(res, 4))
{
std::vector<LedgerObject> objects;
for (size_t i = 0; i < numRows; ++i)
{
objects.push_back({res.asUInt256(i, 0), res.asUnHexedBlob(i, 1)});
}
return objects;
}
return {};
}
AccountTransactions AccountTransactions
PostgresBackend::fetchAccountTransactions( PostgresBackend::fetchAccountTransactions(
@@ -621,7 +634,7 @@ PostgresBackend::close()
} }
void void
PostgresBackend::startWrites() const PostgresBackend::startWrites()
{ {
numRowsInObjectsBuffer_ = 0; numRowsInObjectsBuffer_ = 0;
abortWrite_ = false; abortWrite_ = false;
@@ -635,7 +648,7 @@ PostgresBackend::startWrites() const
} }
bool bool
PostgresBackend::doFinishWrites() const PostgresBackend::doFinishWrites()
{ {
if (!abortWrite_) if (!abortWrite_)
{ {
@@ -649,6 +662,9 @@ PostgresBackend::doFinishWrites() const
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug)
<< __func__ << " objects size = " << objectsStr.size() << __func__ << " objects size = " << objectsStr.size()
<< " txns size = " << txStr.size(); << " txns size = " << txStr.size();
std::string successorStr = successorBuffer_.str();
if (successorStr.size())
writeConnection_.bulkInsert("successor", successorStr);
} }
auto res = writeConnection_("COMMIT"); auto res = writeConnection_("COMMIT");
if (!res || res.status() != PGRES_COMMAND_OK) if (!res || res.status() != PGRES_COMMAND_OK)
@@ -661,76 +677,14 @@ PostgresBackend::doFinishWrites() const
transactionsBuffer_.clear(); transactionsBuffer_.clear();
objectsBuffer_.str(""); objectsBuffer_.str("");
objectsBuffer_.clear(); objectsBuffer_.clear();
successorBuffer_.str("");
successorBuffer_.clear();
accountTxBuffer_.str(""); accountTxBuffer_.str("");
accountTxBuffer_.clear(); accountTxBuffer_.clear();
numRowsInObjectsBuffer_ = 0; numRowsInObjectsBuffer_ = 0;
return !abortWrite_; return !abortWrite_;
} }
bool
PostgresBackend::writeKeys(
std::unordered_set<ripple::uint256> const& keys,
KeyIndex const& index,
bool isAsync) const
{
if (abortWrite_)
return false;
PgQuery pgQuery(pgPool_);
PgQuery& conn = isAsync ? pgQuery : writeConnection_;
std::stringstream sql;
size_t numRows = 0;
for (auto& key : keys)
{
numRows++;
sql << "INSERT INTO keys (ledger_seq, key) VALUES ("
<< std::to_string(index.keyIndex) << ", \'\\x"
<< ripple::strHex(key) << "\') ON CONFLICT DO NOTHING; ";
if (numRows > 10000)
{
conn(sql.str().c_str());
sql.str("");
sql.clear();
numRows = 0;
}
}
if (numRows > 0)
conn(sql.str().c_str());
return true;
/*
BOOST_LOG_TRIVIAL(debug) << __func__;
std::condition_variable cv;
std::mutex mtx;
std::atomic_uint numRemaining = keys.size();
auto start = std::chrono::system_clock::now();
for (auto& key : keys)
{
boost::asio::post(
pool_, [this, key, &numRemaining, &cv, &mtx, &index]() {
PgQuery pgQuery(pgPool_);
std::stringstream sql;
sql << "INSERT INTO keys (ledger_seq, key) VALUES ("
<< std::to_string(index.keyIndex) << ", \'\\x"
<< ripple::strHex(key) << "\') ON CONFLICT DO NOTHING";
auto res = pgQuery(sql.str().data());
if (--numRemaining == 0)
{
std::unique_lock lck(mtx);
cv.notify_one();
}
});
}
std::unique_lock lck(mtx);
cv.wait(lck, [&numRemaining]() { return numRemaining == 0; });
auto end = std::chrono::system_clock::now();
auto duration =
std::chrono::duration_cast<std::chrono::milliseconds>(end - start)
.count();
BOOST_LOG_TRIVIAL(info)
<< __func__ << " wrote " << std::to_string(keys.size())
<< " keys with threadpool. took " << std::to_string(duration);
*/
return true;
}
bool bool
PostgresBackend::doOnlineDelete(uint32_t numLedgersToKeep) const PostgresBackend::doOnlineDelete(uint32_t numLedgersToKeep) const
{ {

View File

@@ -9,6 +9,8 @@ class PostgresBackend : public BackendInterface
private: private:
mutable size_t numRowsInObjectsBuffer_ = 0; mutable size_t numRowsInObjectsBuffer_ = 0;
mutable std::stringstream objectsBuffer_; mutable std::stringstream objectsBuffer_;
mutable size_t numRowsInSuccessorBuffer_ = 0;
mutable std::stringstream successorBuffer_;
mutable std::stringstream keysBuffer_; mutable std::stringstream keysBuffer_;
mutable std::stringstream transactionsBuffer_; mutable std::stringstream transactionsBuffer_;
mutable std::stringstream accountTxBuffer_; mutable std::stringstream accountTxBuffer_;
@@ -31,7 +33,7 @@ public:
fetchLedgerByHash(ripple::uint256 const& hash) const override; fetchLedgerByHash(ripple::uint256 const& hash) const override;
std::optional<Blob> std::optional<Blob>
fetchLedgerObject(ripple::uint256 const& key, uint32_t sequence) doFetchLedgerObject(ripple::uint256 const& key, uint32_t sequence)
const override; const override;
// returns a transaction, metadata pair // returns a transaction, metadata pair
@@ -44,21 +46,22 @@ public:
std::vector<ripple::uint256> std::vector<ripple::uint256>
fetchAllTransactionHashesInLedger(uint32_t ledgerSequence) const override; fetchAllTransactionHashesInLedger(uint32_t ledgerSequence) const override;
std::vector<LedgerObject>
fetchLedgerDiff(uint32_t ledgerSequence) const override;
std::optional<LedgerRange> std::optional<LedgerRange>
hardFetchLedgerRange() const override; hardFetchLedgerRange() const override;
LedgerPage std::optional<ripple::uint256>
doFetchLedgerPage( doFetchSuccessorKey(ripple::uint256 key, uint32_t ledgerSequence)
std::optional<ripple::uint256> const& cursor, const override;
std::uint32_t ledgerSequence,
std::uint32_t limit) const override;
std::vector<TransactionAndMetadata> std::vector<TransactionAndMetadata>
fetchTransactions( fetchTransactions(
std::vector<ripple::uint256> const& hashes) const override; std::vector<ripple::uint256> const& hashes) const override;
std::vector<Blob> std::vector<Blob>
fetchLedgerObjects( doFetchLedgerObjects(
std::vector<ripple::uint256> const& keys, std::vector<ripple::uint256> const& keys,
uint32_t sequence) const override; uint32_t sequence) const override;
@@ -73,11 +76,15 @@ public:
writeLedger( writeLedger(
ripple::LedgerInfo const& ledgerInfo, ripple::LedgerInfo const& ledgerInfo,
std::string&& ledgerHeader, std::string&& ledgerHeader,
bool isFirst) const override; bool isFirst) override;
void void
doWriteLedgerObject(std::string&& key, uint32_t seq, std::string&& blob) doWriteLedgerObject(std::string&& key, uint32_t seq, std::string&& blob)
const override; override;
void
writeSuccessor(std::string&& key, uint32_t seq, std::string&& successor)
override;
void void
writeTransaction( writeTransaction(
@@ -85,11 +92,11 @@ public:
uint32_t seq, uint32_t seq,
uint32_t date, uint32_t date,
std::string&& transaction, std::string&& transaction,
std::string&& metadata) const override; std::string&& metadata) override;
void void
writeAccountTransactions( writeAccountTransactions(
std::vector<AccountTransactionsData>&& data) const override; std::vector<AccountTransactionsData>&& data) override;
void void
open(bool readOnly) override; open(bool readOnly) override;
@@ -98,18 +105,13 @@ public:
close() override; close() override;
void void
startWrites() const override; startWrites() override;
bool bool
doFinishWrites() const override; doFinishWrites() override;
bool bool
doOnlineDelete(uint32_t numLedgersToKeep) const override; doOnlineDelete(uint32_t numLedgersToKeep) const override;
bool
writeKeys(
std::unordered_set<ripple::uint256> const& keys,
KeyIndex const& index,
bool isAsync = false) const override;
}; };
} // namespace Backend } // namespace Backend
#endif #endif

View File

@@ -0,0 +1,96 @@
#include <backend/SimpleCache.h>
namespace Backend {
void
SimpleCache::update(
std::vector<LedgerObject> const& objs,
uint32_t seq,
bool isBackground)
{
std::unique_lock lck{mtx_};
if (seq > latestSeq_)
{
assert(seq == latestSeq_ + 1 || latestSeq_ == 0);
latestSeq_ = seq;
}
for (auto const& obj : objs)
{
if (obj.blob.size())
{
if (isBackground && deletes_.count(obj.key))
continue;
auto& e = map_[obj.key];
if (seq > e.seq)
{
e = {seq, obj.blob};
}
}
else
{
map_.erase(obj.key);
if (!full_ && !isBackground)
deletes_.insert(obj.key);
}
}
}
std::optional<LedgerObject>
SimpleCache::getSuccessor(ripple::uint256 const& key, uint32_t seq) const
{
if (!full_)
return {};
std::shared_lock{mtx_};
if (seq != latestSeq_)
return {};
auto e = map_.upper_bound(key);
if (e == map_.end())
return {};
return {{e->first, e->second.blob}};
}
std::optional<LedgerObject>
SimpleCache::getPredecessor(ripple::uint256 const& key, uint32_t seq) const
{
if (!full_)
return {};
std::shared_lock lck{mtx_};
if (seq != latestSeq_)
return {};
auto e = map_.lower_bound(key);
if (e == map_.begin())
return {};
--e;
return {{e->first, e->second.blob}};
}
std::optional<Blob>
SimpleCache::get(ripple::uint256 const& key, uint32_t seq) const
{
if (seq > latestSeq_)
return {};
std::shared_lock lck{mtx_};
auto e = map_.find(key);
if (e == map_.end())
return {};
if (seq < e->second.seq)
return {};
return {e->second.blob};
}
void
SimpleCache::setFull()
{
full_ = true;
std::unique_lock lck{mtx_};
deletes_.clear();
}
bool
SimpleCache::isFull()
{
return full_;
}
size_t
SimpleCache::size()
{
std::shared_lock lck{mtx_};
return map_.size();
}
} // namespace Backend

60
src/backend/SimpleCache.h Normal file
View File

@@ -0,0 +1,60 @@
#ifndef CLIO_SIMPLECACHE_H_INCLUDED
#define CLIO_SIMPLECACHE_H_INCLUDED
#include <ripple/basics/base_uint.h>
#include <ripple/basics/hardened_hash.h>
#include <backend/Types.h>
#include <map>
#include <mutex>
#include <shared_mutex>
#include <utility>
#include <vector>
namespace Backend {
class SimpleCache
{
struct CacheEntry
{
uint32_t seq = 0;
Blob blob;
};
std::map<ripple::uint256, CacheEntry> map_;
mutable std::shared_mutex mtx_;
uint32_t latestSeq_ = 0;
std::atomic_bool full_ = false;
// temporary set to prevent background thread from writing already deleted
// data. not used when cache is full
std::unordered_set<ripple::uint256, ripple::hardened_hash<>> deletes_;
public:
// Update the cache with new ledger objects
// set isBackground to true when writing old data from a background thread
void
update(
std::vector<LedgerObject> const& blobs,
uint32_t seq,
bool isBackground = false);
std::optional<Blob>
get(ripple::uint256 const& key, uint32_t seq) const;
// always returns empty optional if isFull() is false
std::optional<LedgerObject>
getSuccessor(ripple::uint256 const& key, uint32_t seq) const;
// always returns empty optional if isFull() is false
std::optional<LedgerObject>
getPredecessor(ripple::uint256 const& key, uint32_t seq) const;
void
setFull();
// whether the cache has all data for the most recent ledger
bool
isFull();
size_t
size();
};
} // namespace Backend
#endif

75
src/backend/Types.h Normal file
View File

@@ -0,0 +1,75 @@
#ifndef CLIO_TYPES_H_INCLUDED
#define CLIO_TYPES_H_INCLUDED
#include <ripple/basics/base_uint.h>
#include <optional>
#include <string>
#include <vector>
namespace Backend {
// *** return types
using Blob = std::vector<unsigned char>;
struct LedgerObject
{
ripple::uint256 key;
Blob blob;
bool
operator==(const LedgerObject& other) const
{
return key == other.key && blob == other.blob;
}
};
struct LedgerPage
{
std::vector<LedgerObject> objects;
std::optional<ripple::uint256> cursor;
std::optional<std::string> warning;
};
struct BookOffersPage
{
std::vector<LedgerObject> offers;
std::optional<ripple::uint256> cursor;
std::optional<std::string> warning;
};
struct TransactionAndMetadata
{
Blob transaction;
Blob metadata;
uint32_t ledgerSequence;
uint32_t date;
bool
operator==(const TransactionAndMetadata& other) const
{
return transaction == other.transaction && metadata == other.metadata &&
ledgerSequence == other.ledgerSequence && date == other.date;
}
};
struct AccountTransactionsCursor
{
uint32_t ledgerSequence;
uint32_t transactionIndex;
};
struct AccountTransactions
{
std::vector<TransactionAndMetadata> txns;
std::optional<AccountTransactionsCursor> cursor;
};
struct LedgerRange
{
uint32_t minSequence;
uint32_t maxSequence;
};
constexpr ripple::uint256 firstKey{
"0000000000000000000000000000000000000000000000000000000000000000"};
constexpr ripple::uint256 lastKey{
"FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"};
constexpr ripple::uint256 hi192{
"0000000000000000000000000000000000000000000000001111111111111111"};
} // namespace Backend
#endif

View File

@@ -6,6 +6,7 @@
#include <boost/json.hpp> #include <boost/json.hpp>
#include <boost/json/src.hpp> #include <boost/json/src.hpp>
#include <boost/log/trivial.hpp> #include <boost/log/trivial.hpp>
#include <backend/DBHelpers.h>
#include <etl/ETLSource.h> #include <etl/ETLSource.h>
#include <etl/ReportingETL.h> #include <etl/ReportingETL.h>
@@ -28,7 +29,6 @@ ETLSourceImpl<Derived>::ETLSourceImpl(
, subscriptions_(subscriptions) , subscriptions_(subscriptions)
, balancer_(balancer) , balancer_(balancer)
{ {
if (config.contains("ip")) if (config.contains("ip"))
{ {
auto ipJs = config.at("ip").as_string(); auto ipJs = config.at("ip").as_string();
@@ -72,12 +72,12 @@ ETLSourceImpl<Derived>::reconnect(boost::beast::error_code ec)
// when the timer is cancelled. connection_refused will occur repeatedly // when the timer is cancelled. connection_refused will occur repeatedly
std::string err = ec.message(); std::string err = ec.message();
// if we cannot connect to the transaction processing process // if we cannot connect to the transaction processing process
if (ec.category() == boost::asio::error::get_ssl_category()) { if (ec.category() == boost::asio::error::get_ssl_category())
err = std::string(" (") {
+boost::lexical_cast<std::string>(ERR_GET_LIB(ec.value()))+"," err = std::string(" (") +
+boost::lexical_cast<std::string>(ERR_GET_REASON(ec.value()))+") " boost::lexical_cast<std::string>(ERR_GET_LIB(ec.value())) + "," +
; boost::lexical_cast<std::string>(ERR_GET_REASON(ec.value())) + ") ";
//ERR_PACK /* crypto/err/err.h */ // ERR_PACK /* crypto/err/err.h */
char buf[128]; char buf[128];
::ERR_error_string_n(ec.value(), buf, sizeof(buf)); ::ERR_error_string_n(ec.value(), buf, sizeof(buf));
err += buf; err += buf;
@@ -173,20 +173,18 @@ SslETLSource::close(bool startAgain)
{ {
ws_ = std::make_unique<boost::beast::websocket::stream< ws_ = std::make_unique<boost::beast::websocket::stream<
boost::beast::ssl_stream< boost::beast::ssl_stream<
boost::beast::tcp_stream>>>( boost::beast::tcp_stream>>>(
boost::asio::make_strand(ioc_), *sslCtx_); boost::asio::make_strand(ioc_), *sslCtx_);
run(); run();
} }
}); });
} }
else if (startAgain) else if (startAgain)
{ {
ws_ = std::make_unique<boost::beast::websocket::stream< ws_ = std::make_unique<boost::beast::websocket::stream<
boost::beast::ssl_stream< boost::beast::ssl_stream<boost::beast::tcp_stream>>>(
boost::beast::tcp_stream>>>( boost::asio::make_strand(ioc_), *sslCtx_);
boost::asio::make_strand(ioc_), *sslCtx_);
run(); run();
} }
@@ -208,10 +206,12 @@ ETLSourceImpl<Derived>::onResolve(
} }
else else
{ {
boost::beast::get_lowest_layer(derived().ws()).expires_after( boost::beast::get_lowest_layer(derived().ws())
std::chrono::seconds(30)); .expires_after(std::chrono::seconds(30));
boost::beast::get_lowest_layer(derived().ws()).async_connect( boost::beast::get_lowest_layer(derived().ws())
results, [this](auto ec, auto ep) { derived().onConnect(ec, ep); }); .async_connect(results, [this](auto ec, auto ep) {
derived().onConnect(ec, ep);
});
} }
} }
@@ -240,20 +240,22 @@ PlainETLSource::onConnect(
boost::beast::role_type::client)); boost::beast::role_type::client));
// Set a decorator to change the User-Agent of the handshake // Set a decorator to change the User-Agent of the handshake
derived().ws().set_option(boost::beast::websocket::stream_base::decorator( derived().ws().set_option(
[](boost::beast::websocket::request_type& req) { boost::beast::websocket::stream_base::decorator(
req.set( [](boost::beast::websocket::request_type& req) {
boost::beast::http::field::user_agent, req.set(
std::string(BOOST_BEAST_VERSION_STRING) + boost::beast::http::field::user_agent,
" websocket-client-async"); std::string(BOOST_BEAST_VERSION_STRING) +
})); " websocket-client-async");
}));
// Update the host_ string. This will provide the value of the // Update the host_ string. This will provide the value of the
// Host HTTP header during the WebSocket handshake. // Host HTTP header during the WebSocket handshake.
// See https://tools.ietf.org/html/rfc7230#section-5.4 // See https://tools.ietf.org/html/rfc7230#section-5.4
auto host = ip_ + ':' + std::to_string(endpoint.port()); auto host = ip_ + ':' + std::to_string(endpoint.port());
// Perform the websocket handshake // Perform the websocket handshake
derived().ws().async_handshake(host, "/", [this](auto ec) { onHandshake(ec); }); derived().ws().async_handshake(
host, "/", [this](auto ec) { onHandshake(ec); });
} }
} }
@@ -282,13 +284,14 @@ SslETLSource::onConnect(
boost::beast::role_type::client)); boost::beast::role_type::client));
// Set a decorator to change the User-Agent of the handshake // Set a decorator to change the User-Agent of the handshake
derived().ws().set_option(boost::beast::websocket::stream_base::decorator( derived().ws().set_option(
[](boost::beast::websocket::request_type& req) { boost::beast::websocket::stream_base::decorator(
req.set( [](boost::beast::websocket::request_type& req) {
boost::beast::http::field::user_agent, req.set(
std::string(BOOST_BEAST_VERSION_STRING) + boost::beast::http::field::user_agent,
" websocket-client-async"); std::string(BOOST_BEAST_VERSION_STRING) +
})); " websocket-client-async");
}));
// Update the host_ string. This will provide the value of the // Update the host_ string. This will provide the value of the
// Host HTTP header during the WebSocket handshake. // Host HTTP header during the WebSocket handshake.
@@ -334,20 +337,22 @@ ETLSourceImpl<Derived>::onHandshake(boost::beast::error_code ec)
{ {
boost::json::object jv{ boost::json::object jv{
{"command", "subscribe"}, {"command", "subscribe"},
{"streams", {"ledger", "manifests", "validations", "transactions_proposed"} {"streams",
}}; {"ledger", "manifests", "validations", "transactions_proposed"}}};
std::string s = boost::json::serialize(jv); std::string s = boost::json::serialize(jv);
BOOST_LOG_TRIVIAL(trace) << "Sending subscribe stream message"; BOOST_LOG_TRIVIAL(trace) << "Sending subscribe stream message";
// Send the message // Send the message
derived().ws().async_write(boost::asio::buffer(s), [this](auto ec, size_t size) { derived().ws().async_write(
onWrite(ec, size); boost::asio::buffer(s),
}); [this](auto ec, size_t size) { onWrite(ec, size); });
} }
} }
template <class Derived> template <class Derived>
void void
ETLSourceImpl<Derived>::onWrite(boost::beast::error_code ec, size_t bytesWritten) ETLSourceImpl<Derived>::onWrite(
boost::beast::error_code ec,
size_t bytesWritten)
{ {
BOOST_LOG_TRIVIAL(trace) BOOST_LOG_TRIVIAL(trace)
<< __func__ << " : ec = " << ec << " - " << toString(); << __func__ << " : ec = " << ec << " - " << toString();
@@ -382,7 +387,7 @@ ETLSourceImpl<Derived>::onRead(boost::beast::error_code ec, size_t size)
BOOST_LOG_TRIVIAL(trace) BOOST_LOG_TRIVIAL(trace)
<< __func__ << " : calling async_read - " << toString(); << __func__ << " : calling async_read - " << toString();
derived().ws().async_read( derived().ws().async_read(
readBuffer_, [this](auto ec, size_t size) { onRead(ec, size); }); readBuffer_, [this](auto ec, size_t size) { onRead(ec, size); });
} }
} }
@@ -417,7 +422,7 @@ ETLSourceImpl<Derived>::handleMessage()
{ {
boost::json::string const& validatedLedgers = boost::json::string const& validatedLedgers =
result["validated_ledgers"].as_string(); result["validated_ledgers"].as_string();
setValidatedRange( setValidatedRange(
{validatedLedgers.c_str(), validatedLedgers.size()}); {validatedLedgers.c_str(), validatedLedgers.size()});
} }
@@ -428,8 +433,7 @@ ETLSourceImpl<Derived>::handleMessage()
<< toString(); << toString();
} }
else if ( else if (
response.contains("type") && response.contains("type") && response["type"] == "ledgerClosed")
response["type"] == "ledgerClosed")
{ {
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : " << __func__ << " : "
@@ -500,13 +504,15 @@ class AsyncCallData
std::unique_ptr<grpc::ClientContext> context_; std::unique_ptr<grpc::ClientContext> context_;
grpc::Status status_; grpc::Status status_;
unsigned char nextPrefix_; unsigned char nextPrefix_;
std::string lastKey_;
public: public:
AsyncCallData( AsyncCallData(
uint32_t seq, uint32_t seq,
ripple::uint256& marker, ripple::uint256 const& marker,
std::optional<ripple::uint256> nextMarker) std::optional<ripple::uint256> const& nextMarker)
{ {
request_.mutable_ledger()->set_sequence(seq); request_.mutable_ledger()->set_sequence(seq);
if (marker.isNonZero()) if (marker.isNonZero())
@@ -523,7 +529,8 @@ public:
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug)
<< "Setting up AsyncCallData. marker = " << ripple::strHex(marker) << "Setting up AsyncCallData. marker = " << ripple::strHex(marker)
<< " . prefix = " << ripple::strHex(std::string(1, prefix)) << " . prefix = " << ripple::strHex(std::string(1, prefix))
<< " . nextPrefix_ = " << ripple::strHex(std::string(1, nextPrefix_)); << " . nextPrefix_ = "
<< ripple::strHex(std::string(1, nextPrefix_));
assert(nextPrefix_ > prefix || nextPrefix_ == 0x00); assert(nextPrefix_ > prefix || nextPrefix_ == 0x00);
@@ -539,10 +546,12 @@ public:
process( process(
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>& stub, std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>& stub,
grpc::CompletionQueue& cq, grpc::CompletionQueue& cq,
BackendInterface const& backend, BackendInterface& backend,
bool abort = false) bool abort,
bool cacheOnly = false)
{ {
BOOST_LOG_TRIVIAL(debug) << "Processing calldata"; BOOST_LOG_TRIVIAL(trace) << "Processing response. "
<< "Marker prefix = " << getMarkerPrefix();
if (abort) if (abort)
{ {
BOOST_LOG_TRIVIAL(error) << "AsyncCallData aborted"; BOOST_LOG_TRIVIAL(error) << "AsyncCallData aborted";
@@ -550,9 +559,10 @@ public:
} }
if (!status_.ok()) if (!status_.ok())
{ {
BOOST_LOG_TRIVIAL(debug) << "AsyncCallData status_ not ok: " BOOST_LOG_TRIVIAL(debug)
<< " code = " << status_.error_code() << "AsyncCallData status_ not ok: "
<< " message = " << status_.error_message(); << " code = " << status_.error_code()
<< " message = " << status_.error_message();
return CallStatus::ERRORED; return CallStatus::ERRORED;
} }
if (!next_->is_unlimited()) if (!next_->is_unlimited())
@@ -560,7 +570,6 @@ public:
BOOST_LOG_TRIVIAL(warning) BOOST_LOG_TRIVIAL(warning)
<< "AsyncCallData is_unlimited is false. Make sure " << "AsyncCallData is_unlimited is false. Make sure "
"secure_gateway is set correctly at the ETL source"; "secure_gateway is set correctly at the ETL source";
assert(false);
} }
std::swap(cur_, next_); std::swap(cur_, next_);
@@ -583,13 +592,32 @@ public:
call(stub, cq); call(stub, cq);
} }
for (auto& obj : *(cur_->mutable_ledger_objects()->mutable_objects())) BOOST_LOG_TRIVIAL(trace) << "Writing objects";
std::vector<Backend::LedgerObject> cacheUpdates;
cacheUpdates.reserve(cur_->ledger_objects().objects_size());
for (int i = 0; i < cur_->ledger_objects().objects_size(); ++i)
{ {
backend.writeLedgerObject( auto& obj = *(cur_->mutable_ledger_objects()->mutable_objects(i));
std::move(*obj.mutable_key()), cacheUpdates.push_back(
request_.ledger().sequence(), {*ripple::uint256::fromVoidChecked(obj.key()),
std::move(*obj.mutable_data())); {obj.mutable_data()->begin(), obj.mutable_data()->end()}});
if (!cacheOnly)
{
if (lastKey_.size())
backend.writeSuccessor(
std::move(lastKey_),
request_.ledger().sequence(),
std::string{obj.key()});
lastKey_ = obj.key();
backend.writeLedgerObject(
std::move(*obj.mutable_key()),
request_.ledger().sequence(),
std::move(*obj.mutable_data()));
}
} }
backend.cache().update(
cacheUpdates, request_.ledger().sequence(), cacheOnly);
BOOST_LOG_TRIVIAL(trace) << "Wrote objects";
return more ? CallStatus::MORE : CallStatus::DONE; return more ? CallStatus::MORE : CallStatus::DONE;
} }
@@ -618,13 +646,20 @@ public:
else else
return ripple::strHex(std::string{next_->marker().data()[0]}); return ripple::strHex(std::string{next_->marker().data()[0]});
} }
std::string
getLastKey()
{
return lastKey_;
}
}; };
template <class Derived> template <class Derived>
bool bool
ETLSourceImpl<Derived>::loadInitialLedger( ETLSourceImpl<Derived>::loadInitialLedger(
uint32_t sequence, uint32_t sequence,
uint32_t numMarkers) uint32_t numMarkers,
bool cacheOnly)
{ {
if (!stub_) if (!stub_)
return false; return false;
@@ -647,15 +682,17 @@ ETLSourceImpl<Derived>::loadInitialLedger(
} }
BOOST_LOG_TRIVIAL(debug) << "Starting data download for ledger " << sequence BOOST_LOG_TRIVIAL(debug) << "Starting data download for ledger " << sequence
<< ". Using source = " << toString(); << ". Using source = " << toString();
for (auto& c : calls) for (auto& c : calls)
c.call(stub_, cq); c.call(stub_, cq);
size_t numFinished = 0; size_t numFinished = 0;
bool abort = false; bool abort = false;
while (numFinished < calls.size() && size_t incr = 500000;
cq.Next(&tag, &ok)) size_t progress = incr;
std::vector<std::string> edgeKeys;
while (numFinished < calls.size() && cq.Next(&tag, &ok))
{ {
assert(tag); assert(tag);
@@ -669,29 +706,105 @@ ETLSourceImpl<Derived>::loadInitialLedger(
} }
else else
{ {
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(trace)
<< "Marker prefix = " << ptr->getMarkerPrefix(); << "Marker prefix = " << ptr->getMarkerPrefix();
auto result = ptr->process(stub_, cq, *backend_, abort); auto result = ptr->process(stub_, cq, *backend_, abort, cacheOnly);
if (result != AsyncCallData::CallStatus::MORE) if (result != AsyncCallData::CallStatus::MORE)
{ {
numFinished++; numFinished++;
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug)
<< "Finished a marker. " << "Finished a marker. "
<< "Current number of finished = " << numFinished; << "Current number of finished = " << numFinished;
edgeKeys.push_back(ptr->getLastKey());
} }
if (result == AsyncCallData::CallStatus::ERRORED) if (result == AsyncCallData::CallStatus::ERRORED)
{ {
abort = true; abort = true;
} }
if (backend_->cache().size() > progress)
{
BOOST_LOG_TRIVIAL(info)
<< "Downloaded " << backend_->cache().size()
<< " records from rippled";
progress += incr;
}
}
}
BOOST_LOG_TRIVIAL(info)
<< __func__ << " - finished loadInitialLedger. cache size = "
<< backend_->cache().size();
size_t numWrites = 0;
if (!abort)
{
backend_->cache().setFull();
if (!cacheOnly)
{
auto start = std::chrono::system_clock::now();
for (auto& key : edgeKeys)
{
auto succ = backend_->cache().getSuccessor(
*ripple::uint256::fromVoidChecked(key), sequence);
if (succ)
backend_->writeSuccessor(
std::move(key), sequence, uint256ToString(succ->key));
}
ripple::uint256 prev = Backend::firstKey;
while (auto cur = backend_->cache().getSuccessor(prev, sequence))
{
assert(cur);
if (prev == Backend::firstKey)
backend_->writeSuccessor(
uint256ToString(prev),
sequence,
uint256ToString(cur->key));
if (isBookDir(cur->key, cur->blob))
{
auto base = getBookBase(cur->key);
auto succ = backend_->cache().getSuccessor(base, sequence);
assert(succ);
if (succ->key == cur->key)
{
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " Writing book successor = "
<< ripple::strHex(base) << " - "
<< ripple::strHex(cur->key);
backend_->writeSuccessor(
uint256ToString(base),
sequence,
uint256ToString(cur->key));
}
++numWrites;
}
prev = std::move(cur->key);
if (numWrites % 100000 == 0 && numWrites != 0)
BOOST_LOG_TRIVIAL(info) << __func__ << " Wrote "
<< numWrites << " book successors";
}
backend_->writeSuccessor(
uint256ToString(prev),
sequence,
uint256ToString(Backend::lastKey));
++numWrites;
auto end = std::chrono::system_clock::now();
auto seconds =
std::chrono::duration_cast<std::chrono::seconds>(end - start)
.count();
BOOST_LOG_TRIVIAL(info)
<< __func__
<< " - Looping through cache and submitting all writes took "
<< seconds
<< " seconds. numWrites = " << std::to_string(numWrites);
} }
} }
return !abort; return !abort;
} }
template <class Derived> template <class Derived>
std::pair<grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse> std::pair<grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse>
ETLSourceImpl<Derived>::fetchLedger(uint32_t ledgerSequence, bool getObjects) ETLSourceImpl<Derived>::fetchLedger(
uint32_t ledgerSequence,
bool getObjects,
bool getObjectNeighbors)
{ {
org::xrpl::rpc::v1::GetLedgerResponse response; org::xrpl::rpc::v1::GetLedgerResponse response;
if (!stub_) if (!stub_)
@@ -704,6 +817,7 @@ ETLSourceImpl<Derived>::fetchLedger(uint32_t ledgerSequence, bool getObjects)
request.set_transactions(true); request.set_transactions(true);
request.set_expand(true); request.set_expand(true);
request.set_get_objects(getObjects); request.set_get_objects(getObjects);
request.set_get_object_neighbors(getObjectNeighbors);
request.set_user("ETL"); request.set_user("ETL");
grpc::Status status = stub_->GetLedger(&context, request, &response); grpc::Status status = stub_->GetLedger(&context, request, &response);
if (status.ok() && !response.is_unlimited()) if (status.ok() && !response.is_unlimited())
@@ -713,8 +827,9 @@ ETLSourceImpl<Derived>::fetchLedger(uint32_t ledgerSequence, bool getObjects)
"false. Make sure secure_gateway is set " "false. Make sure secure_gateway is set "
"correctly on the ETL source. source = " "correctly on the ETL source. source = "
<< toString() << " status = " << status.error_message(); << toString() << " status = " << status.error_message();
assert(false);
} }
// BOOST_LOG_TRIVIAL(debug)
// << __func__ << " Message size = " << response.ByteSizeLong();
return {status, std::move(response)}; return {status, std::move(response)};
} }
@@ -726,8 +841,7 @@ ETLLoadBalancer::ETLLoadBalancer(
std::shared_ptr<SubscriptionManager> subscriptions, std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<NetworkValidatedLedgers> nwvl) std::shared_ptr<NetworkValidatedLedgers> nwvl)
{ {
if (config.contains("num_markers") && if (config.contains("num_markers") && config.at("num_markers").is_int64())
config.at("num_markers").is_int64())
{ {
downloadRanges_ = config.at("num_markers").as_int64(); downloadRanges_ = config.at("num_markers").as_int64();
@@ -752,12 +866,12 @@ ETLLoadBalancer::ETLLoadBalancer(
} }
void void
ETLLoadBalancer::loadInitialLedger(uint32_t sequence) ETLLoadBalancer::loadInitialLedger(uint32_t sequence, bool cacheOnly)
{ {
execute( execute(
[this, &sequence](auto& source) { [this, &sequence, cacheOnly](auto& source) {
bool res = bool res =
source->loadInitialLedger(sequence, downloadRanges_); source->loadInitialLedger(sequence, downloadRanges_, cacheOnly);
if (!res) if (!res)
{ {
BOOST_LOG_TRIVIAL(error) << "Failed to download initial ledger." BOOST_LOG_TRIVIAL(error) << "Failed to download initial ledger."
@@ -770,13 +884,17 @@ ETLLoadBalancer::loadInitialLedger(uint32_t sequence)
} }
std::optional<org::xrpl::rpc::v1::GetLedgerResponse> std::optional<org::xrpl::rpc::v1::GetLedgerResponse>
ETLLoadBalancer::fetchLedger(uint32_t ledgerSequence, bool getObjects) ETLLoadBalancer::fetchLedger(
uint32_t ledgerSequence,
bool getObjects,
bool getObjectNeighbors)
{ {
org::xrpl::rpc::v1::GetLedgerResponse response; org::xrpl::rpc::v1::GetLedgerResponse response;
bool success = execute( bool success = execute(
[&response, ledgerSequence, getObjects, this](auto& source) { [&response, ledgerSequence, getObjects, getObjectNeighbors, this](
auto [status, data] = auto& source) {
source->fetchLedger(ledgerSequence, getObjects); auto [status, data] = source->fetchLedger(
ledgerSequence, getObjects, getObjectNeighbors);
response = std::move(data); response = std::move(data);
if (status.ok() && (response.validated() || true)) if (status.ok() && (response.validated() || true))
{ {
@@ -866,7 +984,8 @@ ETLSourceImpl<Derived>::getRippledForwardingStub() const
template <class Derived> template <class Derived>
std::optional<boost::json::object> std::optional<boost::json::object>
ETLSourceImpl<Derived>::forwardToRippled(boost::json::object const& request) const ETLSourceImpl<Derived>::forwardToRippled(
boost::json::object const& request) const
{ {
BOOST_LOG_TRIVIAL(debug) << "Attempting to forward request to tx. " BOOST_LOG_TRIVIAL(debug) << "Attempting to forward request to tx. "
<< "request = " << boost::json::serialize(request); << "request = " << boost::json::serialize(request);

View File

@@ -25,7 +25,6 @@ class SubscriptionManager;
/// class forwards transactions received on the transactions_proposed streams to /// class forwards transactions received on the transactions_proposed streams to
/// any subscribers. /// any subscribers.
class ETLSource class ETLSource
{ {
public: public:
@@ -45,12 +44,16 @@ public:
hasLedger(uint32_t sequence) const = 0; hasLedger(uint32_t sequence) const = 0;
virtual std::pair<grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse> virtual std::pair<grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse>
fetchLedger(uint32_t ledgerSequence, bool getObjects = true) = 0; fetchLedger(
uint32_t ledgerSequence,
bool getObjects = true,
bool getObjectNeighbors = false) = 0;
virtual bool virtual bool
loadInitialLedger( loadInitialLedger(
uint32_t sequence, uint32_t sequence,
std::uint32_t numMarkers) = 0; std::uint32_t numMarkers,
bool cacheOnly = false) = 0;
virtual std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub> virtual std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>
getRippledForwardingStub() const = 0; getRippledForwardingStub() const = 0;
@@ -58,8 +61,7 @@ public:
virtual std::optional<boost::json::object> virtual std::optional<boost::json::object>
forwardToRippled(boost::json::object const& request) const = 0; forwardToRippled(boost::json::object const& request) const = 0;
virtual virtual ~ETLSource()
~ETLSource()
{ {
} }
}; };
@@ -135,7 +137,6 @@ protected:
} }
public: public:
~ETLSourceImpl() ~ETLSourceImpl()
{ {
close(false); close(false);
@@ -249,7 +250,10 @@ public:
/// and the prior one /// and the prior one
/// @return the extracted data and the result status /// @return the extracted data and the result status
std::pair<grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse> std::pair<grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse>
fetchLedger(uint32_t ledgerSequence, bool getObjects = true) override; fetchLedger(
uint32_t ledgerSequence,
bool getObjects = true,
bool getObjectNeighbors = false) override;
std::string std::string
toString() const override toString() const override
@@ -284,7 +288,8 @@ public:
bool bool
loadInitialLedger( loadInitialLedger(
std::uint32_t ledgerSequence, std::uint32_t ledgerSequence,
std::uint32_t numMarkers) override; std::uint32_t numMarkers,
bool cacheOnly = false) override;
/// Attempt to reconnect to the ETL source /// Attempt to reconnect to the ETL source
void void
@@ -300,7 +305,8 @@ public:
virtual void virtual void
onConnect( onConnect(
boost::beast::error_code ec, boost::beast::error_code ec,
boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint) = 0; boost::asio::ip::tcp::resolver::results_type::endpoint_type
endpoint) = 0;
/// Callback /// Callback
void void
@@ -328,7 +334,6 @@ public:
forwardToRippled(boost::json::object const& request) const override; forwardToRippled(boost::json::object const& request) const override;
}; };
class PlainETLSource : public ETLSourceImpl<PlainETLSource> class PlainETLSource : public ETLSourceImpl<PlainETLSource>
{ {
std::unique_ptr<boost::beast::websocket::stream<boost::beast::tcp_stream>> std::unique_ptr<boost::beast::websocket::stream<boost::beast::tcp_stream>>
@@ -344,15 +349,16 @@ public:
ETLLoadBalancer& balancer) ETLLoadBalancer& balancer)
: ETLSourceImpl(config, ioc, backend, subscriptions, nwvl, balancer) : ETLSourceImpl(config, ioc, backend, subscriptions, nwvl, balancer)
, ws_(std::make_unique< , ws_(std::make_unique<
boost::beast::websocket::stream<boost::beast::tcp_stream>>( boost::beast::websocket::stream<boost::beast::tcp_stream>>(
boost::asio::make_strand(ioc))) boost::asio::make_strand(ioc)))
{ {
} }
void void
onConnect( onConnect(
boost::beast::error_code ec, boost::beast::error_code ec,
boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint) override; boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint)
override;
/// Close the websocket /// Close the websocket
/// @param startAgain whether to reconnect /// @param startAgain whether to reconnect
@@ -370,8 +376,9 @@ class SslETLSource : public ETLSourceImpl<SslETLSource>
{ {
std::optional<std::reference_wrapper<boost::asio::ssl::context>> sslCtx_; std::optional<std::reference_wrapper<boost::asio::ssl::context>> sslCtx_;
std::unique_ptr<boost::beast::websocket::stream<boost::beast::ssl_stream< std::unique_ptr<boost::beast::websocket::stream<
boost::beast::tcp_stream>>> ws_; boost::beast::ssl_stream<boost::beast::tcp_stream>>>
ws_;
public: public:
SslETLSource( SslETLSource(
@@ -384,17 +391,18 @@ public:
ETLLoadBalancer& balancer) ETLLoadBalancer& balancer)
: ETLSourceImpl(config, ioc, backend, subscriptions, nwvl, balancer) : ETLSourceImpl(config, ioc, backend, subscriptions, nwvl, balancer)
, sslCtx_(sslCtx) , sslCtx_(sslCtx)
, ws_(std::make_unique< , ws_(std::make_unique<boost::beast::websocket::stream<
boost::beast::websocket::stream<boost::beast::ssl_stream< boost::beast::ssl_stream<boost::beast::tcp_stream>>>(
boost::beast::tcp_stream>>>( boost::asio::make_strand(ioc_),
boost::asio::make_strand(ioc_), *sslCtx_)) *sslCtx_))
{ {
} }
void void
onConnect( onConnect(
boost::beast::error_code ec, boost::beast::error_code ec,
boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint) override; boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint)
override;
void void
onSslHandshake( onSslHandshake(
@@ -406,54 +414,53 @@ public:
void void
close(bool startAgain); close(bool startAgain);
boost::beast::websocket::stream<boost::beast::ssl_stream<boost::beast::tcp_stream>>& boost::beast::websocket::stream<
boost::beast::ssl_stream<boost::beast::tcp_stream>>&
ws() ws()
{ {
return *ws_; return *ws_;
} }
}; };
namespace ETL {
namespace ETL static std::unique_ptr<ETLSource>
make_ETLSource(
boost::json::object const& config,
boost::asio::io_context& ioContext,
std::optional<std::reference_wrapper<boost::asio::ssl::context>> sslCtx,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<NetworkValidatedLedgers> networkValidatedLedgers,
ETLLoadBalancer& balancer)
{ {
static std::unique_ptr<ETLSource> std::unique_ptr<ETLSource> src = nullptr;
make_ETLSource( if (sslCtx)
boost::json::object const& config,
boost::asio::io_context& ioContext,
std::optional<std::reference_wrapper<boost::asio::ssl::context>> sslCtx,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<NetworkValidatedLedgers> networkValidatedLedgers,
ETLLoadBalancer& balancer)
{ {
std::unique_ptr<ETLSource> src = nullptr; src = std::make_unique<SslETLSource>(
if (sslCtx) config,
{ ioContext,
src = std::make_unique<SslETLSource>( sslCtx,
config, backend,
ioContext, subscriptions,
sslCtx, networkValidatedLedgers,
backend, balancer);
subscriptions,
networkValidatedLedgers,
balancer);
}
else
{
src = std::make_unique<PlainETLSource>(
config,
ioContext,
backend,
subscriptions,
networkValidatedLedgers,
balancer);
}
src->run();
return src;
} }
else
{
src = std::make_unique<PlainETLSource>(
config,
ioContext,
backend,
subscriptions,
networkValidatedLedgers,
balancer);
}
src->run();
return src;
} }
} // namespace ETL
/// This class is used to manage connections to transaction processing processes /// This class is used to manage connections to transaction processing processes
/// This class spawns a listener for each etl source, which listens to messages /// This class spawns a listener for each etl source, which listens to messages
@@ -487,12 +494,7 @@ public:
std::shared_ptr<NetworkValidatedLedgers> validatedLedgers) std::shared_ptr<NetworkValidatedLedgers> validatedLedgers)
{ {
return std::make_shared<ETLLoadBalancer>( return std::make_shared<ETLLoadBalancer>(
config, config, ioc, sslCtx, backend, subscriptions, validatedLedgers);
ioc,
sslCtx,
backend,
subscriptions,
validatedLedgers);
} }
~ETLLoadBalancer() ~ETLLoadBalancer()
@@ -503,7 +505,7 @@ public:
/// Load the initial ledger, writing data to the queue /// Load the initial ledger, writing data to the queue
/// @param sequence sequence of ledger to download /// @param sequence sequence of ledger to download
void void
loadInitialLedger(uint32_t sequence); loadInitialLedger(uint32_t sequence, bool cacheOnly = false);
/// Fetch data for a specific ledger. This function will continuously try /// Fetch data for a specific ledger. This function will continuously try
/// to fetch data for the specified ledger until the fetch succeeds, the /// to fetch data for the specified ledger until the fetch succeeds, the
@@ -515,7 +517,10 @@ public:
/// was found in the database or the server is shutting down, the optional /// was found in the database or the server is shutting down, the optional
/// will be empty /// will be empty
std::optional<org::xrpl::rpc::v1::GetLedgerResponse> std::optional<org::xrpl::rpc::v1::GetLedgerResponse>
fetchLedger(uint32_t ledgerSequence, bool getObjects); fetchLedger(
uint32_t ledgerSequence,
bool getObjects,
bool getObjectNeighbors);
/// Determine whether messages received on the transactions_proposed stream /// Determine whether messages received on the transactions_proposed stream
/// should be forwarded to subscribing clients. The server subscribes to /// should be forwarded to subscribing clients. The server subscribes to

View File

@@ -10,8 +10,8 @@
#include <cstdlib> #include <cstdlib>
#include <iostream> #include <iostream>
#include <string> #include <string>
#include <variant>
#include <subscriptions/SubscriptionManager.h> #include <subscriptions/SubscriptionManager.h>
#include <variant>
namespace detail { namespace detail {
/// Convenience function for printing out basic ledger info /// Convenience function for printing out basic ledger info
@@ -130,6 +130,12 @@ ReportingETL::publishLedger(ripple::LedgerInfo const& lgrInfo)
{ {
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug)
<< __func__ << " - Publishing ledger " << std::to_string(lgrInfo.seq); << __func__ << " - Publishing ledger " << std::to_string(lgrInfo.seq);
if (!writing_)
{
BOOST_LOG_TRIVIAL(debug) << __func__ << " - Updating cache";
auto diff = backend_->fetchLedgerDiff(lgrInfo.seq);
backend_->cache().update(diff, lgrInfo.seq);
}
backend_->updateRange(lgrInfo.seq); backend_->updateRange(lgrInfo.seq);
auto ledgerRange = backend_->fetchLedgerRange(); auto ledgerRange = backend_->fetchLedgerRange();
@@ -229,7 +235,7 @@ ReportingETL::fetchLedgerData(uint32_t idx)
<< "Attempting to fetch ledger with sequence = " << idx; << "Attempting to fetch ledger with sequence = " << idx;
std::optional<org::xrpl::rpc::v1::GetLedgerResponse> response = std::optional<org::xrpl::rpc::v1::GetLedgerResponse> response =
loadBalancer_->fetchLedger(idx, false); loadBalancer_->fetchLedger(idx, false, false);
BOOST_LOG_TRIVIAL(trace) << __func__ << " : " BOOST_LOG_TRIVIAL(trace) << __func__ << " : "
<< "GetLedger reply = " << response->DebugString(); << "GetLedger reply = " << response->DebugString();
return response; return response;
@@ -243,7 +249,7 @@ ReportingETL::fetchLedgerDataAndDiff(uint32_t idx)
<< "Attempting to fetch ledger with sequence = " << idx; << "Attempting to fetch ledger with sequence = " << idx;
std::optional<org::xrpl::rpc::v1::GetLedgerResponse> response = std::optional<org::xrpl::rpc::v1::GetLedgerResponse> response =
loadBalancer_->fetchLedger(idx, true); loadBalancer_->fetchLedger(idx, true, !backend_->cache().isFull());
BOOST_LOG_TRIVIAL(trace) << __func__ << " : " BOOST_LOG_TRIVIAL(trace) << __func__ << " : "
<< "GetLedger reply = " << response->DebugString(); << "GetLedger reply = " << response->DebugString();
return response; return response;
@@ -269,13 +275,205 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
BOOST_LOG_TRIVIAL(debug) << __func__ << " : " BOOST_LOG_TRIVIAL(debug) << __func__ << " : "
<< "wrote ledger header"; << "wrote ledger header";
// Write successor info, if included from rippled
if (rawData.object_neighbors_included())
{
BOOST_LOG_TRIVIAL(debug) << __func__ << " object neighbors included";
for (auto& obj : *(rawData.mutable_book_successors()))
{
auto firstBook = std::move(*obj.mutable_first_book());
if (!firstBook.size())
firstBook = uint256ToString(Backend::lastKey);
backend_->writeSuccessor(
std::move(*obj.mutable_book_base()),
lgrInfo.seq,
std::move(firstBook));
BOOST_LOG_TRIVIAL(debug) << __func__ << " writing book successor "
<< ripple::strHex(obj.book_base()) << " - "
<< ripple::strHex(firstBook);
}
for (auto& obj : *(rawData.mutable_ledger_objects()->mutable_objects()))
{
if (obj.mod_type() != org::xrpl::rpc::v1::RawLedgerObject::MODIFIED)
{
std::string* predPtr = obj.mutable_predecessor();
if (!predPtr->size())
*predPtr = uint256ToString(Backend::firstKey);
std::string* succPtr = obj.mutable_successor();
if (!succPtr->size())
*succPtr = uint256ToString(Backend::lastKey);
if (obj.mod_type() ==
org::xrpl::rpc::v1::RawLedgerObject::DELETED)
{
BOOST_LOG_TRIVIAL(debug)
<< __func__
<< " modifying successors for deleted object "
<< ripple::strHex(obj.key()) << " - "
<< ripple::strHex(*predPtr) << " - "
<< ripple::strHex(*succPtr);
backend_->writeSuccessor(
std::move(*predPtr), lgrInfo.seq, std::move(*succPtr));
}
else
{
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " adding successor for new object "
<< ripple::strHex(obj.key()) << " - "
<< ripple::strHex(*predPtr) << " - "
<< ripple::strHex(*succPtr);
backend_->writeSuccessor(
std::move(*predPtr),
lgrInfo.seq,
std::string{obj.key()});
backend_->writeSuccessor(
std::string{obj.key()},
lgrInfo.seq,
std::move(*succPtr));
}
}
else
BOOST_LOG_TRIVIAL(debug) << __func__ << " object modified "
<< ripple::strHex(obj.key());
}
}
std::vector<Backend::LedgerObject> cacheUpdates;
cacheUpdates.reserve(rawData.ledger_objects().objects_size());
// TODO change these to unordered_set
std::set<ripple::uint256> bookSuccessorsToCalculate;
std::set<ripple::uint256> modified;
for (auto& obj : *(rawData.mutable_ledger_objects()->mutable_objects())) for (auto& obj : *(rawData.mutable_ledger_objects()->mutable_objects()))
{ {
auto key = ripple::uint256::fromVoidChecked(obj.key());
assert(key);
cacheUpdates.push_back(
{*key, {obj.mutable_data()->begin(), obj.mutable_data()->end()}});
if (obj.mod_type() != org::xrpl::rpc::v1::RawLedgerObject::MODIFIED &&
!rawData.object_neighbors_included())
{
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " object neighbors not included. using cache";
assert(backend_->cache().isFull());
auto blob = obj.mutable_data();
bool checkBookBase = false;
bool isDeleted = blob->size() == 0;
if (isDeleted)
{
auto old = backend_->cache().get(*key, lgrInfo.seq - 1);
assert(old);
checkBookBase = isBookDir(*key, *old);
}
else
checkBookBase = isBookDir(*key, *blob);
if (checkBookBase)
{
BOOST_LOG_TRIVIAL(debug)
<< __func__
<< " is book dir. key = " << ripple::strHex(*key);
auto bookBase = getBookBase(*key);
auto oldFirstDir =
backend_->cache().getSuccessor(bookBase, lgrInfo.seq - 1);
assert(oldFirstDir);
// We deleted the first directory, or we added a directory prior
// to the old first directory
if ((isDeleted && key == oldFirstDir->key) ||
(!isDeleted && key < oldFirstDir->key))
{
BOOST_LOG_TRIVIAL(debug)
<< __func__
<< " Need to recalculate book base successor. base = "
<< ripple::strHex(bookBase)
<< " - key = " << ripple::strHex(*key)
<< " - isDeleted = " << isDeleted
<< " - seq = " << lgrInfo.seq;
bookSuccessorsToCalculate.insert(bookBase);
}
}
}
if (obj.mod_type() == org::xrpl::rpc::v1::RawLedgerObject::MODIFIED)
modified.insert(*key);
backend_->writeLedgerObject( backend_->writeLedgerObject(
std::move(*obj.mutable_key()), std::move(*obj.mutable_key()),
lgrInfo.seq, lgrInfo.seq,
std::move(*obj.mutable_data())); std::move(*obj.mutable_data()));
} }
backend_->cache().update(cacheUpdates, lgrInfo.seq);
// rippled didn't send successor information, so use our cache
if (!rawData.object_neighbors_included() || backend_->cache().isFull())
{
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " object neighbors not included. using cache";
assert(backend_->cache().isFull());
size_t idx = 0;
for (auto const& obj : cacheUpdates)
{
if (modified.count(obj.key))
continue;
auto lb = backend_->cache().getPredecessor(obj.key, lgrInfo.seq);
if (!lb)
lb = {Backend::firstKey, {}};
auto ub = backend_->cache().getSuccessor(obj.key, lgrInfo.seq);
if (!ub)
ub = {Backend::lastKey, {}};
if (obj.blob.size() == 0)
{
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " writing successor for deleted object "
<< ripple::strHex(obj.key) << " - "
<< ripple::strHex(lb->key) << " - "
<< ripple::strHex(ub->key);
backend_->writeSuccessor(
uint256ToString(lb->key),
lgrInfo.seq,
uint256ToString(ub->key));
}
else
{
backend_->writeSuccessor(
uint256ToString(lb->key),
lgrInfo.seq,
uint256ToString(obj.key));
backend_->writeSuccessor(
uint256ToString(obj.key),
lgrInfo.seq,
uint256ToString(ub->key));
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " writing successor for new object "
<< ripple::strHex(lb->key) << " - "
<< ripple::strHex(obj.key) << " - "
<< ripple::strHex(ub->key);
}
}
for (auto const& base : bookSuccessorsToCalculate)
{
auto succ = backend_->cache().getSuccessor(base, lgrInfo.seq);
if (succ)
{
backend_->writeSuccessor(
uint256ToString(base),
lgrInfo.seq,
uint256ToString(succ->key));
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " Updating book successor "
<< ripple::strHex(base) << " - "
<< ripple::strHex(succ->key);
}
else
{
backend_->writeSuccessor(
uint256ToString(base),
lgrInfo.seq,
uint256ToString(Backend::lastKey));
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " Updating book successor "
<< ripple::strHex(base) << " - "
<< ripple::strHex(Backend::lastKey);
}
}
}
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : " << __func__ << " : "
<< "Inserted/modified/deleted all objects. Number of objects = " << "Inserted/modified/deleted all objects. Number of objects = "
@@ -343,11 +541,6 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
throw std::runtime_error("runETLPipeline: parent ledger is null"); throw std::runtime_error("runETLPipeline: parent ledger is null");
} }
std::atomic<uint32_t> minSequence = rng->minSequence; std::atomic<uint32_t> minSequence = rng->minSequence;
BOOST_LOG_TRIVIAL(info) << __func__ << " : "
<< "Populating caches";
BOOST_LOG_TRIVIAL(info) << __func__ << " : "
<< "Populated caches";
std::atomic_bool writeConflict = false; std::atomic_bool writeConflict = false;
std::optional<uint32_t> lastPublishedSequence; std::optional<uint32_t> lastPublishedSequence;
@@ -483,6 +676,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
lastPublishedSequence = lgrInfo.seq; lastPublishedSequence = lgrInfo.seq;
} }
writeConflict = !success; writeConflict = !success;
// TODO move online delete logic to an admin RPC call
if (onlineDeleteInterval_ && !deleting_ && if (onlineDeleteInterval_ && !deleting_ &&
lgrInfo.seq - minSequence > *onlineDeleteInterval_) lgrInfo.seq - minSequence > *onlineDeleteInterval_)
{ {
@@ -611,6 +805,15 @@ ReportingETL::monitor()
<< "Ledger with sequence = " << nextSequence << "Ledger with sequence = " << nextSequence
<< " has been validated by the network. " << " has been validated by the network. "
<< "Attempting to find in database and publish"; << "Attempting to find in database and publish";
if (!backend_->cache().isFull())
{
std::thread t{[this, latestSequence]() {
BOOST_LOG_TRIVIAL(info) << "Loading cache";
loadBalancer_->loadInitialLedger(*latestSequence, true);
backend_->cache().setFull();
}};
t.detach();
}
// Attempt to take over responsibility of ETL writer after 2 failed // Attempt to take over responsibility of ETL writer after 2 failed
// attempts to publish the ledger. publishLedger() fails if the // attempts to publish the ledger. publishLedger() fails if the
// ledger that has been validated by the network is not found in the // ledger that has been validated by the network is not found in the
@@ -647,9 +850,7 @@ ReportingETL::monitor()
nextSequence = *lastPublished + 1; nextSequence = *lastPublished + 1;
} }
else else
{
++nextSequence; ++nextSequence;
}
} }
} }
@@ -665,6 +866,11 @@ ReportingETL::monitorReadOnly()
while (!stopping_ && while (!stopping_ &&
networkValidatedLedgers_->waitUntilValidatedByNetwork(sequence)) networkValidatedLedgers_->waitUntilValidatedByNetwork(sequence))
{ {
std::thread t{[this, sequence]() {
BOOST_LOG_TRIVIAL(info) << "Loading cache";
loadBalancer_->loadInitialLedger(sequence, true);
}};
t.detach();
publishLedger(sequence, {}); publishLedger(sequence, {});
++sequence; ++sequence;
} }

View File

@@ -53,13 +53,12 @@ parse_config(const char* filename)
std::optional<ssl::context> std::optional<ssl::context>
parse_certs(boost::json::object const& config) parse_certs(boost::json::object const& config)
{ {
if (!config.contains("ssl_cert_file") || !config.contains("ssl_key_file")) if (!config.contains("ssl_cert_file") || !config.contains("ssl_key_file"))
return {}; return {};
auto certFilename = config.at("ssl_cert_file").as_string().c_str(); auto certFilename = config.at("ssl_cert_file").as_string().c_str();
auto keyFilename = config.at("ssl_key_file").as_string().c_str(); auto keyFilename = config.at("ssl_key_file").as_string().c_str();
std::ifstream readCert(certFilename, std::ios::in | std::ios::binary); std::ifstream readCert(certFilename, std::ios::in | std::ios::binary);
if (!readCert) if (!readCert)
return {}; return {};
@@ -104,7 +103,8 @@ initLogging(boost::json::object const& config)
{ {
boost::log::add_file_log( boost::log::add_file_log(
config.at("log_file").as_string().c_str(), config.at("log_file").as_string().c_str(),
boost::log::keywords::format = format); boost::log::keywords::format = format,
boost::log::keywords::open_mode = std::ios_base::app);
} }
auto const logLevel = config.contains("log_level") auto const logLevel = config.contains("log_level")
? config.at("log_level").as_string() ? config.at("log_level").as_string()
@@ -168,7 +168,7 @@ main(int argc, char* argv[])
std::cerr << "Couldnt parse config. Exiting..." << std::endl; std::cerr << "Couldnt parse config. Exiting..." << std::endl;
return EXIT_FAILURE; return EXIT_FAILURE;
} }
initLogging(*config); initLogging(*config);
auto ctx = parse_certs(*config); auto ctx = parse_certs(*config);

View File

@@ -6,7 +6,7 @@ namespace RPC {
std::optional<Context> std::optional<Context>
make_WsContext( make_WsContext(
boost::json::object const& request, boost::json::object const& request,
std::shared_ptr<BackendInterface> const& backend, std::shared_ptr<BackendInterface const> const& backend,
std::shared_ptr<SubscriptionManager> const& subscriptions, std::shared_ptr<SubscriptionManager> const& subscriptions,
std::shared_ptr<ETLLoadBalancer> const& balancer, std::shared_ptr<ETLLoadBalancer> const& balancer,
std::shared_ptr<WsBase> const& session, std::shared_ptr<WsBase> const& session,
@@ -24,7 +24,7 @@ make_WsContext(
std::optional<Context> std::optional<Context>
make_HttpContext( make_HttpContext(
boost::json::object const& request, boost::json::object const& request,
std::shared_ptr<BackendInterface> const& backend, std::shared_ptr<BackendInterface const> const& backend,
std::shared_ptr<SubscriptionManager> const& subscriptions, std::shared_ptr<SubscriptionManager> const& subscriptions,
std::shared_ptr<ETLLoadBalancer> const& balancer, std::shared_ptr<ETLLoadBalancer> const& balancer,
Backend::LedgerRange const& range) Backend::LedgerRange const& range)

View File

@@ -29,7 +29,7 @@ struct Context
std::string method; std::string method;
std::uint32_t version; std::uint32_t version;
boost::json::object const& params; boost::json::object const& params;
std::shared_ptr<BackendInterface> const& backend; std::shared_ptr<BackendInterface const> const& backend;
std::shared_ptr<ETLLoadBalancer> const& balancer; std::shared_ptr<ETLLoadBalancer> const& balancer;
// this needs to be an actual shared_ptr, not a reference. The above // this needs to be an actual shared_ptr, not a reference. The above
// references refer to shared_ptr members of WsBase, but WsBase contains // references refer to shared_ptr members of WsBase, but WsBase contains
@@ -42,7 +42,7 @@ struct Context
std::string const& command_, std::string const& command_,
std::uint32_t version_, std::uint32_t version_,
boost::json::object const& params_, boost::json::object const& params_,
std::shared_ptr<BackendInterface> const& backend_, std::shared_ptr<BackendInterface const> const& backend_,
std::shared_ptr<SubscriptionManager> const& subscriptions_, std::shared_ptr<SubscriptionManager> const& subscriptions_,
std::shared_ptr<ETLLoadBalancer> const& balancer_, std::shared_ptr<ETLLoadBalancer> const& balancer_,
std::shared_ptr<WsBase> const& session_, std::shared_ptr<WsBase> const& session_,
@@ -129,7 +129,7 @@ make_error(Error err);
std::optional<Context> std::optional<Context>
make_WsContext( make_WsContext(
boost::json::object const& request, boost::json::object const& request,
std::shared_ptr<BackendInterface> const& backend, std::shared_ptr<BackendInterface const> const& backend,
std::shared_ptr<SubscriptionManager> const& subscriptions, std::shared_ptr<SubscriptionManager> const& subscriptions,
std::shared_ptr<ETLLoadBalancer> const& balancer, std::shared_ptr<ETLLoadBalancer> const& balancer,
std::shared_ptr<WsBase> const& session, std::shared_ptr<WsBase> const& session,
@@ -138,7 +138,7 @@ make_WsContext(
std::optional<Context> std::optional<Context>
make_HttpContext( make_HttpContext(
boost::json::object const& request, boost::json::object const& request,
std::shared_ptr<BackendInterface> const& backend, std::shared_ptr<BackendInterface const> const& backend,
std::shared_ptr<SubscriptionManager> const& subscriptions, std::shared_ptr<SubscriptionManager> const& subscriptions,
std::shared_ptr<ETLLoadBalancer> const& balancer, std::shared_ptr<ETLLoadBalancer> const& balancer,
Backend::LedgerRange const& range); Backend::LedgerRange const& range);

View File

@@ -61,12 +61,12 @@ doAccountObjects(Context const& context)
} }
ripple::uint256 cursor; ripple::uint256 cursor;
if (request.contains("cursor")) if (request.contains("marker"))
{ {
if (!request.at("cursor").is_string()) if (!request.at("marker").is_string())
return Status{Error::rpcINVALID_PARAMS, "cursorNotString"}; return Status{Error::rpcINVALID_PARAMS, "markerNotString"};
if (!cursor.parseHex(request.at("cursor").as_string().c_str())) if (!cursor.parseHex(request.at("marker").as_string().c_str()))
return Status{Error::rpcINVALID_PARAMS, "malformedCursor"}; return Status{Error::rpcINVALID_PARAMS, "malformedCursor"};
} }

View File

@@ -208,7 +208,7 @@ unsubscribeToAccountsProposed(
std::variant<Status, std::pair<std::vector<ripple::Book>, boost::json::array>> std::variant<Status, std::pair<std::vector<ripple::Book>, boost::json::array>>
validateAndGetBooks( validateAndGetBooks(
boost::json::object const& request, boost::json::object const& request,
std::shared_ptr<Backend::BackendInterface> const& backend) std::shared_ptr<Backend::BackendInterface const> const& backend)
{ {
if (!request.at("books").is_array()) if (!request.at("books").is_array())
return Status{Error::rpcINVALID_PARAMS, "booksNotArray"}; return Status{Error::rpcINVALID_PARAMS, "booksNotArray"};

View File

@@ -16,9 +16,8 @@ public:
Subscription(Subscription&) = delete; Subscription(Subscription&) = delete;
Subscription(Subscription&&) = delete; Subscription(Subscription&&) = delete;
explicit explicit Subscription(boost::asio::io_context& ioc) : strand_(ioc)
Subscription(boost::asio::io_context& ioc) : strand_(ioc) {
{
} }
~Subscription() = default; ~Subscription() = default;
@@ -46,27 +45,20 @@ public:
SubscriptionMap(SubscriptionMap&) = delete; SubscriptionMap(SubscriptionMap&) = delete;
SubscriptionMap(SubscriptionMap&&) = delete; SubscriptionMap(SubscriptionMap&&) = delete;
explicit explicit SubscriptionMap(boost::asio::io_context& ioc) : strand_(ioc)
SubscriptionMap(boost::asio::io_context& ioc) : strand_(ioc)
{ {
} }
~SubscriptionMap() = default; ~SubscriptionMap() = default;
void void
subscribe( subscribe(std::shared_ptr<WsBase> const& session, Key const& key);
std::shared_ptr<WsBase> const& session,
Key const& key);
void void
unsubscribe( unsubscribe(std::shared_ptr<WsBase> const& session, Key const& key);
std::shared_ptr<WsBase> const& session,
Key const& key);
void void
publish( publish(std::string const& message, Key const& key);
std::string const& message,
Key const& key);
}; };
class SubscriptionManager class SubscriptionManager
@@ -80,18 +72,18 @@ class SubscriptionManager
Subscription txProposedSubscribers_; Subscription txProposedSubscribers_;
Subscription manifestSubscribers_; Subscription manifestSubscribers_;
Subscription validationsSubscribers_; Subscription validationsSubscribers_;
SubscriptionMap<ripple::AccountID> accountSubscribers_; SubscriptionMap<ripple::AccountID> accountSubscribers_;
SubscriptionMap<ripple::AccountID> accountProposedSubscribers_; SubscriptionMap<ripple::AccountID> accountProposedSubscribers_;
SubscriptionMap<ripple::Book> bookSubscribers_; SubscriptionMap<ripple::Book> bookSubscribers_;
std::shared_ptr<Backend::BackendInterface> backend_; std::shared_ptr<Backend::BackendInterface const> backend_;
public: public:
static std::shared_ptr<SubscriptionManager> static std::shared_ptr<SubscriptionManager>
make_SubscriptionManager( make_SubscriptionManager(
boost::json::object const& config, boost::json::object const& config,
std::shared_ptr<Backend::BackendInterface> const& b) std::shared_ptr<Backend::BackendInterface const> const& b)
{ {
auto numThreads = 1; auto numThreads = 1;
@@ -106,7 +98,7 @@ public:
SubscriptionManager( SubscriptionManager(
std::uint64_t numThreads, std::uint64_t numThreads,
std::shared_ptr<Backend::BackendInterface> const& b) std::shared_ptr<Backend::BackendInterface const> const& b)
: ledgerSubscribers_(ioc_) : ledgerSubscribers_(ioc_)
, txSubscribers_(ioc_) , txSubscribers_(ioc_)
, txProposedSubscribers_(ioc_) , txProposedSubscribers_(ioc_)
@@ -119,10 +111,11 @@ public:
{ {
work_.emplace(ioc_); work_.emplace(ioc_);
// We will eventually want to clamp this to be the number of strands, since // We will eventually want to clamp this to be the number of strands,
// adding more threads than we have strands won't see any performance benefits // since adding more threads than we have strands won't see any
BOOST_LOG_TRIVIAL(info) // performance benefits
<< "Starting subscription manager with " << numThreads << " workers"; BOOST_LOG_TRIVIAL(info) << "Starting subscription manager with "
<< numThreads << " workers";
workers_.reserve(numThreads); workers_.reserve(numThreads);
for (auto i = numThreads; i > 0; --i) for (auto i = numThreads; i > 0; --i)
@@ -132,7 +125,7 @@ public:
~SubscriptionManager() ~SubscriptionManager()
{ {
work_.reset(); work_.reset();
ioc_.stop(); ioc_.stop();
for (auto& worker : workers_) for (auto& worker : workers_)
worker.join(); worker.join();

View File

@@ -68,7 +68,7 @@ handle_request(
boost::beast::http:: boost::beast::http::
request<Body, boost::beast::http::basic_fields<Allocator>>&& req, request<Body, boost::beast::http::basic_fields<Allocator>>&& req,
Send&& send, Send&& send,
std::shared_ptr<BackendInterface> backend, std::shared_ptr<BackendInterface const> backend,
std::shared_ptr<ETLLoadBalancer> balancer, std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard, DOSGuard& dosGuard,
std::string const& ip) std::string const& ip)
@@ -239,7 +239,7 @@ class HttpBase
http::request<http::string_body> req_; http::request<http::string_body> req_;
std::shared_ptr<void> res_; std::shared_ptr<void> res_;
std::shared_ptr<BackendInterface> backend_; std::shared_ptr<BackendInterface const> backend_;
std::shared_ptr<SubscriptionManager> subscriptions_; std::shared_ptr<SubscriptionManager> subscriptions_;
std::shared_ptr<ETLLoadBalancer> balancer_; std::shared_ptr<ETLLoadBalancer> balancer_;
DOSGuard& dosGuard_; DOSGuard& dosGuard_;
@@ -250,7 +250,7 @@ protected:
public: public:
HttpBase( HttpBase(
std::shared_ptr<BackendInterface> backend, std::shared_ptr<BackendInterface const> backend,
std::shared_ptr<SubscriptionManager> subscriptions, std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer, std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard, DOSGuard& dosGuard,

View File

@@ -18,7 +18,7 @@ public:
// Take ownership of the socket // Take ownership of the socket
explicit HttpSession( explicit HttpSession(
tcp::socket&& socket, tcp::socket&& socket,
std::shared_ptr<BackendInterface> backend, std::shared_ptr<BackendInterface const> backend,
std::shared_ptr<SubscriptionManager> subscriptions, std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer, std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard, DOSGuard& dosGuard,

View File

@@ -23,7 +23,7 @@ class Detector
boost::beast::tcp_stream stream_; boost::beast::tcp_stream stream_;
std::optional<std::reference_wrapper<ssl::context>> ctx_; std::optional<std::reference_wrapper<ssl::context>> ctx_;
std::shared_ptr<BackendInterface> backend_; std::shared_ptr<BackendInterface const> backend_;
std::shared_ptr<SubscriptionManager> subscriptions_; std::shared_ptr<SubscriptionManager> subscriptions_;
std::shared_ptr<ETLLoadBalancer> balancer_; std::shared_ptr<ETLLoadBalancer> balancer_;
DOSGuard& dosGuard_; DOSGuard& dosGuard_;
@@ -33,7 +33,7 @@ public:
Detector( Detector(
tcp::socket&& socket, tcp::socket&& socket,
std::optional<std::reference_wrapper<ssl::context>> ctx, std::optional<std::reference_wrapper<ssl::context>> ctx,
std::shared_ptr<BackendInterface> backend, std::shared_ptr<BackendInterface const> backend,
std::shared_ptr<SubscriptionManager> subscriptions, std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer, std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard) DOSGuard& dosGuard)
@@ -101,7 +101,7 @@ make_websocket_session(
boost::beast::tcp_stream stream, boost::beast::tcp_stream stream,
http::request<http::string_body> req, http::request<http::string_body> req,
boost::beast::flat_buffer buffer, boost::beast::flat_buffer buffer,
std::shared_ptr<BackendInterface> backend, std::shared_ptr<BackendInterface const> backend,
std::shared_ptr<SubscriptionManager> subscriptions, std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer, std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard) DOSGuard& dosGuard)
@@ -122,7 +122,7 @@ make_websocket_session(
boost::beast::ssl_stream<boost::beast::tcp_stream> stream, boost::beast::ssl_stream<boost::beast::tcp_stream> stream,
http::request<http::string_body> req, http::request<http::string_body> req,
boost::beast::flat_buffer buffer, boost::beast::flat_buffer buffer,
std::shared_ptr<BackendInterface> backend, std::shared_ptr<BackendInterface const> backend,
std::shared_ptr<SubscriptionManager> subscriptions, std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer, std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard) DOSGuard& dosGuard)
@@ -146,9 +146,9 @@ class Listener
Listener<PlainSession, SslSession>>::shared_from_this; Listener<PlainSession, SslSession>>::shared_from_this;
net::io_context& ioc_; net::io_context& ioc_;
std::optional<std::reference_wrapper<ssl::context>> ctx_; std::optional<std::reference_wrapper<ssl::context>> ctx_;
tcp::acceptor acceptor_; tcp::acceptor acceptor_;
std::shared_ptr<BackendInterface> backend_; std::shared_ptr<BackendInterface const> backend_;
std::shared_ptr<SubscriptionManager> subscriptions_; std::shared_ptr<SubscriptionManager> subscriptions_;
std::shared_ptr<ETLLoadBalancer> balancer_; std::shared_ptr<ETLLoadBalancer> balancer_;
DOSGuard& dosGuard_; DOSGuard& dosGuard_;
@@ -158,7 +158,7 @@ public:
net::io_context& ioc, net::io_context& ioc,
std::optional<std::reference_wrapper<ssl::context>> ctx, std::optional<std::reference_wrapper<ssl::context>> ctx,
tcp::endpoint endpoint, tcp::endpoint endpoint,
std::shared_ptr<BackendInterface> backend, std::shared_ptr<BackendInterface const> backend,
std::shared_ptr<SubscriptionManager> subscriptions, std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer, std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard) DOSGuard& dosGuard)
@@ -262,7 +262,7 @@ make_HttpServer(
boost::json::object const& config, boost::json::object const& config,
boost::asio::io_context& ioc, boost::asio::io_context& ioc,
std::optional<std::reference_wrapper<ssl::context>> sslCtx, std::optional<std::reference_wrapper<ssl::context>> sslCtx,
std::shared_ptr<BackendInterface> backend, std::shared_ptr<BackendInterface const> backend,
std::shared_ptr<SubscriptionManager> subscriptions, std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer, std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard) DOSGuard& dosGuard)

View File

@@ -31,7 +31,7 @@ public:
// Take ownership of the socket // Take ownership of the socket
explicit PlainWsSession( explicit PlainWsSession(
boost::asio::ip::tcp::socket&& socket, boost::asio::ip::tcp::socket&& socket,
std::shared_ptr<BackendInterface> backend, std::shared_ptr<BackendInterface const> backend,
std::shared_ptr<SubscriptionManager> subscriptions, std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer, std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard, DOSGuard& dosGuard,
@@ -71,7 +71,7 @@ class WsUpgrader : public std::enable_shared_from_this<WsUpgrader>
boost::beast::tcp_stream http_; boost::beast::tcp_stream http_;
boost::optional<http::request_parser<http::string_body>> parser_; boost::optional<http::request_parser<http::string_body>> parser_;
boost::beast::flat_buffer buffer_; boost::beast::flat_buffer buffer_;
std::shared_ptr<BackendInterface> backend_; std::shared_ptr<BackendInterface const> backend_;
std::shared_ptr<SubscriptionManager> subscriptions_; std::shared_ptr<SubscriptionManager> subscriptions_;
std::shared_ptr<ETLLoadBalancer> balancer_; std::shared_ptr<ETLLoadBalancer> balancer_;
DOSGuard& dosGuard_; DOSGuard& dosGuard_;
@@ -80,7 +80,7 @@ class WsUpgrader : public std::enable_shared_from_this<WsUpgrader>
public: public:
WsUpgrader( WsUpgrader(
boost::asio::ip::tcp::socket&& socket, boost::asio::ip::tcp::socket&& socket,
std::shared_ptr<BackendInterface> backend, std::shared_ptr<BackendInterface const> backend,
std::shared_ptr<SubscriptionManager> subscriptions, std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer, std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard, DOSGuard& dosGuard,
@@ -95,7 +95,7 @@ public:
} }
WsUpgrader( WsUpgrader(
boost::beast::tcp_stream&& stream, boost::beast::tcp_stream&& stream,
std::shared_ptr<BackendInterface> backend, std::shared_ptr<BackendInterface const> backend,
std::shared_ptr<SubscriptionManager> subscriptions, std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer, std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard, DOSGuard& dosGuard,

View File

@@ -19,7 +19,7 @@ public:
explicit SslHttpSession( explicit SslHttpSession(
tcp::socket&& socket, tcp::socket&& socket,
ssl::context& ctx, ssl::context& ctx,
std::shared_ptr<BackendInterface> backend, std::shared_ptr<BackendInterface const> backend,
std::shared_ptr<SubscriptionManager> subscriptions, std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer, std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard, DOSGuard& dosGuard,

View File

@@ -29,7 +29,7 @@ public:
// Take ownership of the socket // Take ownership of the socket
explicit SslWsSession( explicit SslWsSession(
boost::beast::ssl_stream<boost::beast::tcp_stream>&& stream, boost::beast::ssl_stream<boost::beast::tcp_stream>&& stream,
std::shared_ptr<BackendInterface> backend, std::shared_ptr<BackendInterface const> backend,
std::shared_ptr<SubscriptionManager> subscriptions, std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer, std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard, DOSGuard& dosGuard,
@@ -62,7 +62,7 @@ class SslWsUpgrader : public std::enable_shared_from_this<SslWsUpgrader>
boost::beast::ssl_stream<boost::beast::tcp_stream> https_; boost::beast::ssl_stream<boost::beast::tcp_stream> https_;
boost::optional<http::request_parser<http::string_body>> parser_; boost::optional<http::request_parser<http::string_body>> parser_;
boost::beast::flat_buffer buffer_; boost::beast::flat_buffer buffer_;
std::shared_ptr<BackendInterface> backend_; std::shared_ptr<BackendInterface const> backend_;
std::shared_ptr<SubscriptionManager> subscriptions_; std::shared_ptr<SubscriptionManager> subscriptions_;
std::shared_ptr<ETLLoadBalancer> balancer_; std::shared_ptr<ETLLoadBalancer> balancer_;
DOSGuard& dosGuard_; DOSGuard& dosGuard_;
@@ -72,7 +72,7 @@ public:
SslWsUpgrader( SslWsUpgrader(
boost::asio::ip::tcp::socket&& socket, boost::asio::ip::tcp::socket&& socket,
ssl::context& ctx, ssl::context& ctx,
std::shared_ptr<BackendInterface> backend, std::shared_ptr<BackendInterface const> backend,
std::shared_ptr<SubscriptionManager> subscriptions, std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer, std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard, DOSGuard& dosGuard,
@@ -87,7 +87,7 @@ public:
} }
SslWsUpgrader( SslWsUpgrader(
boost::beast::ssl_stream<boost::beast::tcp_stream> stream, boost::beast::ssl_stream<boost::beast::tcp_stream> stream,
std::shared_ptr<BackendInterface> backend, std::shared_ptr<BackendInterface const> backend,
std::shared_ptr<SubscriptionManager> subscriptions, std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer, std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard, DOSGuard& dosGuard,

View File

@@ -79,7 +79,7 @@ class WsSession : public WsBase,
boost::beast::flat_buffer buffer_; boost::beast::flat_buffer buffer_;
std::shared_ptr<BackendInterface> backend_; std::shared_ptr<BackendInterface const> backend_;
// has to be a weak ptr because SubscriptionManager maintains collections // has to be a weak ptr because SubscriptionManager maintains collections
// of std::shared_ptr<WsBase> objects. If this were shared, there would be // of std::shared_ptr<WsBase> objects. If this were shared, there would be
// a cyclical dependency that would block destruction // a cyclical dependency that would block destruction
@@ -91,7 +91,7 @@ class WsSession : public WsBase,
public: public:
explicit WsSession( explicit WsSession(
std::shared_ptr<BackendInterface> backend, std::shared_ptr<BackendInterface const> backend,
std::shared_ptr<SubscriptionManager> subscriptions, std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer, std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard, DOSGuard& dosGuard,

View File

@@ -451,9 +451,9 @@ async def ledger_data(ip, port, ledger, limit, binary, cursor):
try: try:
async with websockets.connect(address) as ws: async with websockets.connect(address) as ws:
if limit is not None: if limit is not None:
await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"binary":bool(binary),"limit":int(limit),"cursor":cursor})) await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"binary":bool(binary),"limit":int(limit),"cursor":cursor,"marker":cursor}))
else: else:
await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"binary":bool(binary),"cursor":cursor})) await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"binary":bool(binary),"cursor":cursor,"marker":cursor}))
res = json.loads(await ws.recv()) res = json.loads(await ws.recv())
print(res) print(res)
objects = [] objects = []
@@ -777,7 +777,7 @@ async def ledger_range(ip, port):
if rng == "empty": if rng == "empty":
return (0,0) return (0,0)
idx = rng.find("-") idx = rng.find("-")
return (int(rng[0:idx]),int(rng[idx+1:-1])) return (int(rng[0:idx]),int(rng[idx+1:]))
res = res["result"] res = res["result"]
return (res["ledger_index_min"],res["ledger_index_max"]) return (res["ledger_index_min"],res["ledger_index_max"])

File diff suppressed because it is too large Load Diff