Merge branch 'ledger_data_indexer' into master_new

CJ Cobb
2021-04-28 20:12:28 +00:00
11 changed files with 1370 additions and 221 deletions

View File

@@ -58,6 +58,7 @@ target_sources(reporting PRIVATE
     reporting/ETLSource.cpp
     reporting/CassandraBackend.cpp
     reporting/PostgresBackend.cpp
+    reporting/BackendIndexer.cpp
     reporting/Pg.cpp
     reporting/DBHelpers.cpp
     reporting/ReportingETL.cpp

View File

@@ -99,7 +99,7 @@ doLedgerData(
response["num_results"] = results.size(); response["num_results"] = results.size();
response["db_time"] = time; response["db_time"] = time;
response["time_per_result"] = time / results.size(); response["time_per_result"] = time / (results.size() ? results.size() : 1);
return response; return response;
} }
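
Note: the new ternary guards time_per_result against a division by zero when a request returns no results.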

View File

@@ -0,0 +1,76 @@
+#include <reporting/BackendInterface.h>
+namespace Backend {
+BackendIndexer::BackendIndexer(boost::json::object const& config)
+    : keyShift_(config.at("keyshift").as_int64())
+    , bookShift_(config.at("bookshift").as_int64())
+{
+    work_.emplace(ioc_);
+    ioThread_ = std::thread{[this]() { ioc_.run(); }};
+};
+BackendIndexer::~BackendIndexer()
+{
+    std::unique_lock lck(mutex_);
+    work_.reset();
+    ioThread_.join();
+}
+
+void
+BackendIndexer::addKey(ripple::uint256 const& key)
+{
+    keys.insert(key);
+}
+void
+BackendIndexer::deleteKey(ripple::uint256 const& key)
+{
+    keys.erase(key);
+}
+
+void
+BackendIndexer::addBookOffer(
+    ripple::uint256 const& book,
+    ripple::uint256 const& offerKey)
+{
+    booksToOffers[book].insert(offerKey);
+}
+void
+BackendIndexer::deleteBookOffer(
+    ripple::uint256 const& book,
+    ripple::uint256 const& offerKey)
+{
+    booksToOffers[book].erase(offerKey);
+    booksToDeletedOffers[book].insert(offerKey);
+}
+
+void
+BackendIndexer::finish(uint32_t ledgerSequence, BackendInterface const& backend)
+{
+    if (ledgerSequence >> keyShift_ << keyShift_ == ledgerSequence)
+    {
+        std::unordered_set<ripple::uint256> keysCopy = keys;
+        boost::asio::post(ioc_, [=, &backend]() {
+            BOOST_LOG_TRIVIAL(info) << "Indexer - writing keys. Ledger = "
+                                    << std::to_string(ledgerSequence);
+            backend.writeKeys(keysCopy, ledgerSequence);
+            BOOST_LOG_TRIVIAL(info) << "Indexer - wrote keys. Ledger = "
+                                    << std::to_string(ledgerSequence);
+        });
+    }
+    if (ledgerSequence >> bookShift_ << bookShift_ == ledgerSequence)
+    {
+        std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>>
+            booksToOffersCopy = booksToOffers;
+        std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>>
+            booksToDeletedOffersCopy = booksToDeletedOffers;
+        boost::asio::post(ioc_, [=, &backend]() {
+            BOOST_LOG_TRIVIAL(info) << "Indexer - writing books. Ledger = "
+                                    << std::to_string(ledgerSequence);
+            backend.writeBooks(booksToOffersCopy, ledgerSequence);
+            backend.writeBooks(booksToDeletedOffersCopy, ledgerSequence);
+            BOOST_LOG_TRIVIAL(info) << "Indexer - wrote books. Ledger = "
+                                    << std::to_string(ledgerSequence);
+        });
+        booksToDeletedOffers = {};
+    }
+}
+}  // namespace Backend
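
Note: the gating in finish() uses a shift round-trip rather than a modulo. Right-shifting by keyShift_ and shifting back clears the low bits, so the comparison succeeds exactly when ledgerSequence is a multiple of 2^keyShift_ (65,536 with the default shift of 16). A minimal standalone sketch of that test (illustrative, not part of the commit):

#include <cassert>
#include <cstdint>

// True when the low `shift` bits of seq are zero, i.e. seq is a multiple of
// 2^shift. Equivalent to seq % (1u << shift) == 0, matching the guards in
// BackendIndexer::finish().
bool
shouldFlush(uint32_t seq, uint32_t shift)
{
    return (seq >> shift << shift) == seq;
}

int
main()
{
    assert(shouldFlush(0, 16));
    assert(shouldFlush(1u << 16, 16));         // 65536 triggers a flush
    assert(!shouldFlush((1u << 16) + 1, 16));  // 65537 does not
    assert(shouldFlush(3u << 16, 16));         // any multiple of 65536 does
}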

View File

@@ -1,7 +1,19 @@
 #ifndef RIPPLE_APP_REPORTING_BACKENDINTERFACE_H_INCLUDED
 #define RIPPLE_APP_REPORTING_BACKENDINTERFACE_H_INCLUDED
 #include <ripple/ledger/ReadView.h>
+#include <boost/asio.hpp>
 #include <reporting/DBHelpers.h>
+namespace std {
+template <>
+struct hash<ripple::uint256>
+{
+    std::size_t
+    operator()(const ripple::uint256& k) const noexcept
+    {
+        return boost::hash_range(k.begin(), k.end());
+    }
+};
+}  // namespace std
 namespace Backend {
 using Blob = std::vector<unsigned char>;
 struct LedgerObject
@@ -42,11 +54,51 @@ class DatabaseTimeout : public std::exception
         return "Database read timed out. Please retry the request";
     }
 };
+class BackendInterface;
+class BackendIndexer
+{
+    boost::asio::io_context ioc_;
+    std::mutex mutex_;
+    std::optional<boost::asio::io_context::work> work_;
+    std::thread ioThread_;
+    uint32_t keyShift_ = 16;
+    uint32_t bookShift_ = 16;
+    std::unordered_set<ripple::uint256> keys;
+    std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>>
+        booksToOffers;
+    std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>>
+        booksToDeletedOffers;
+
+public:
+    BackendIndexer(boost::json::object const& config);
+    ~BackendIndexer();
+
+    void
+    addKey(ripple::uint256 const& key);
+    void
+    deleteKey(ripple::uint256 const& key);
+    void
+    addBookOffer(ripple::uint256 const& book, ripple::uint256 const& offerKey);
+    void
+    deleteBookOffer(
+        ripple::uint256 const& book,
+        ripple::uint256 const& offerKey);
+    void
+    finish(uint32_t ledgerSequence, BackendInterface const& backend);
+};
+
 class BackendInterface
 {
+private:
+    mutable BackendIndexer indexer_;
+
 public:
     // read methods
+    BackendInterface(boost::json::object const& config) : indexer_(config)
+    {
+    }
+
     virtual std::optional<uint32_t>
     fetchLatestLedgerSequence() const = 0;
@@ -107,8 +159,37 @@ public:
         std::string&& ledgerHeader,
         bool isFirst = false) const = 0;

-    virtual void
+    void
     writeLedgerObject(
+        std::string&& key,
+        uint32_t seq,
+        std::string&& blob,
+        bool isCreated,
+        bool isDeleted,
+        std::optional<ripple::uint256>&& book) const
+    {
+        ripple::uint256 key256 = ripple::uint256::fromVoid(key.data());
+        if (isCreated)
+            indexer_.addKey(key256);
+        if (isDeleted)
+            indexer_.deleteKey(key256);
+        if (book)
+        {
+            if (isCreated)
+                indexer_.addBookOffer(*book, key256);
+            if (isDeleted)
+                indexer_.deleteBookOffer(*book, key256);
+        }
+        doWriteLedgerObject(
+            std::move(key),
+            seq,
+            std::move(blob),
+            isCreated,
+            isDeleted,
+            std::move(book));
+    }
+
+    virtual void
+    doWriteLedgerObject(
         std::string&& key,
         uint32_t seq,
         std::string&& blob,

@@ -141,11 +222,27 @@ public:
     virtual void
     startWrites() const = 0;

+    bool
+    finishWrites(uint32_t ledgerSequence) const
+    {
+        indexer_.finish(ledgerSequence, *this);
+        return doFinishWrites();
+    }
     virtual bool
-    finishWrites() const = 0;
+    doFinishWrites() const = 0;
     virtual bool
     doOnlineDelete(uint32_t minLedgerToKeep) const = 0;
+    virtual bool
+    writeKeys(
+        std::unordered_set<ripple::uint256> const& keys,
+        uint32_t ledgerSequence) const = 0;
+    virtual bool
+    writeBooks(
+        std::unordered_map<
+            ripple::uint256,
+            std::unordered_set<ripple::uint256>> const& books,
+        uint32_t ledgerSequence) const = 0;
     virtual ~BackendInterface()
     {
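
Note: taken together, these changes are a template-method split. writeLedgerObject() and finishWrites() are now non-virtual entry points that feed the indexer before delegating to the renamed doWriteLedgerObject() and doFinishWrites() hooks. A self-contained miniature of the pattern, with hypothetical MiniIndexer/MiniBackend/NullBackend names standing in for the project's classes:

#include <cstdint>
#include <iostream>

// Stand-in for BackendIndexer: just records the flush.
class MiniIndexer
{
public:
    void
    finish(uint32_t seq)
    {
        std::cout << "index flush @ " << seq << "\n";
    }
};

class MiniBackend
{
    mutable MiniIndexer indexer_;

public:
    // Non-virtual entry point: indexer bookkeeping first, then the hook,
    // mirroring BackendInterface::finishWrites() above.
    bool
    finishWrites(uint32_t ledgerSequence) const
    {
        indexer_.finish(ledgerSequence);
        return doFinishWrites();
    }
    virtual bool
    doFinishWrites() const = 0;
    virtual ~MiniBackend() = default;
};

class NullBackend : public MiniBackend
{
public:
    bool
    doFinishWrites() const override
    {
        return true;  // commit nothing; a real backend flushes its store here
    }
};

int
main()
{
    NullBackend b;
    b.finishWrites(65536);  // prints the flush, then delegates to the hook
}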

File diff suppressed because it is too large

View File

@@ -108,7 +108,7 @@ public:
     ~CassandraPreparedStatement()
     {
-        BOOST_LOG_TRIVIAL(info) << __func__;
+        BOOST_LOG_TRIVIAL(trace) << __func__;
         if (prepared_)
         {
             cass_prepared_free(prepared_);

@@ -211,7 +211,7 @@ public:
         if (!statement_)
             throw std::runtime_error(
                 "CassandraStatement::bindUInt - statement_ is null");
-        BOOST_LOG_TRIVIAL(info)
+        BOOST_LOG_TRIVIAL(trace)
             << std::to_string(curBindingIndex_) << " " << std::to_string(value);
         CassError rc =
             cass_statement_bind_int32(statement_, curBindingIndex_, value);

@@ -334,6 +334,12 @@ public:
         }
     }

+    bool
+    isOk()
+    {
+        return result_ != nullptr;
+    }
     bool
     hasResult()
     {
@@ -502,10 +508,15 @@ class CassandraAsyncResult
     T& requestParams_;
     CassandraResult result_;
     bool timedOut_ = false;
+    bool retryOnTimeout_ = false;

 public:
-    CassandraAsyncResult(T& requestParams, CassFuture* fut, F retry)
-        : requestParams_(requestParams)
+    CassandraAsyncResult(
+        T& requestParams,
+        CassFuture* fut,
+        F retry,
+        bool retryOnTimeout = false)
+        : requestParams_(requestParams), retryOnTimeout_(retryOnTimeout)
     {
         CassError rc = cass_future_error_code(fut);
         if (rc != CASS_OK)

@@ -516,7 +527,7 @@ public:
             // try again
             if (isTimeout(rc))
                 timedOut_ = true;
-            else
+            if (!timedOut_ || retryOnTimeout_)
                 retry(requestParams_);
         }
         else

@@ -527,7 +538,7 @@ public:
     ~CassandraAsyncResult()
     {
-        if (!!result_ or timedOut_)
+        if (result_.isOk() or timedOut_)
         {
             BOOST_LOG_TRIVIAL(trace) << "finished a request";
             size_t batchSize = requestParams_.batchSize;
@@ -598,9 +609,11 @@ private:
     CassandraPreparedStatement upperBound2_;
     CassandraPreparedStatement getToken_;
     CassandraPreparedStatement insertKey_;
-    CassandraPreparedStatement getCreated_;
+    CassandraPreparedStatement selectKeys_;
     CassandraPreparedStatement getBook_;
+    CassandraPreparedStatement selectBook_;
     CassandraPreparedStatement insertBook_;
+    CassandraPreparedStatement insertBook2_;
     CassandraPreparedStatement deleteBook_;
     CassandraPreparedStatement insertAccountTx_;
     CassandraPreparedStatement selectAccountTx_;

@@ -611,12 +624,16 @@ private:
     CassandraPreparedStatement selectLedgerBySeq_;
     CassandraPreparedStatement selectLatestLedger_;
     CassandraPreparedStatement selectLedgerRange_;
+    CassandraPreparedStatement selectLedgerDiff_;
     // io_context used for exponential backoff for write retries
     mutable boost::asio::io_context ioContext_;
     std::optional<boost::asio::io_context::work> work_;
     std::thread ioThread_;
+    std::thread indexer_;
+    uint32_t indexerShift_ = 16;
     // maximum number of concurrent in flight requests. New requests will wait
     // for earlier requests to finish if this limit is exceeded
     uint32_t maxRequestsOutstanding = 10000;

@@ -638,7 +655,8 @@ private:
     mutable bool isFirstLedger_ = false;

 public:
-    CassandraBackend(boost::json::object const& config) : config_(config)
+    CassandraBackend(boost::json::object const& config)
+        : BackendInterface(config), config_(config)
     {
     }
@@ -675,9 +693,27 @@ public:
             std::lock_guard<std::mutex> lock(mutex_);
             work_.reset();
             ioThread_.join();
+            if (indexer_.joinable())
+                indexer_.join();
         }
         open_ = false;
     }

+    CassandraPreparedStatement const&
+    getInsertKeyPreparedStatement() const
+    {
+        return insertKey_;
+    }
+    CassandraPreparedStatement const&
+    getInsertBookPreparedStatement() const
+    {
+        return insertBook2_;
+    }
+    CassandraPreparedStatement const&
+    getSelectLedgerDiffPreparedStatement() const
+    {
+        return selectLedgerDiff_;
+    }
+
     std::pair<
         std::vector<TransactionAndMetadata>,

@@ -763,7 +799,7 @@ public:
     };

     bool
-    finishWrites() const override
+    doFinishWrites() const override
     {
         // wait for all other writes to finish
         sync();
@@ -917,124 +953,48 @@ public:
     fetchLedgerPage2(
         std::optional<ripple::uint256> const& cursor,
         std::uint32_t ledgerSequence,
-        std::uint32_t limit) const
-    {
-        BOOST_LOG_TRIVIAL(trace) << __func__;
-        CassandraStatement statement{selectLedgerPageKeys_};
-        int64_t intCursor = INT64_MIN;
-        if (cursor)
-        {
-            auto token = getToken(cursor->data());
-            if (token)
-                intCursor = *token;
-        }
-        statement.bindInt(intCursor);
-        statement.bindInt(ledgerSequence);
-        statement.bindInt(ledgerSequence);
-        statement.bindUInt(limit);
-        CassandraResult result = executeSyncRead(statement);
-        BOOST_LOG_TRIVIAL(debug) << __func__ << " - got keys";
-        std::vector<ripple::uint256> keys;
-        do
-        {
-            keys.push_back(result.getUInt256());
-        } while (result.nextRow());
-        BOOST_LOG_TRIVIAL(debug)
-            << __func__ << " - populated keys. num keys = " << keys.size();
-        if (keys.size())
-        {
-            std::vector<LedgerObject> results;
-            std::vector<Blob> objs = fetchLedgerObjects(keys, ledgerSequence);
-            for (size_t i = 0; i < objs.size(); ++i)
-            {
-                results.push_back({keys[i], objs[i]});
-            }
-            return {results, keys[keys.size() - 1]};
-        }
-        return {{}, {}};
-    }
+        std::uint32_t limit) const;

     LedgerPage
     fetchLedgerPage(
         std::optional<ripple::uint256> const& cursor,
         std::uint32_t ledgerSequence,
-        std::uint32_t limit) const override
-    {
-        BOOST_LOG_TRIVIAL(trace) << __func__;
-        std::optional<ripple::uint256> currentCursor = cursor;
-        std::vector<LedgerObject> objects;
-        uint32_t curLimit = limit;
-        while (objects.size() < limit)
-        {
-            CassandraStatement statement{selectLedgerPage_};
-            int64_t intCursor = INT64_MIN;
-            if (currentCursor)
-            {
-                auto token = getToken(currentCursor->data());
-                if (token)
-                    intCursor = *token;
-            }
-            BOOST_LOG_TRIVIAL(debug)
-                << __func__ << " - cursor = " << std::to_string(intCursor)
-                << " , sequence = " << std::to_string(ledgerSequence)
-                << ", - limit = " << std::to_string(limit);
-            statement.bindInt(intCursor);
-            statement.bindInt(ledgerSequence);
-            statement.bindUInt(curLimit);
-            CassandraResult result = executeSyncRead(statement);
-            if (!!result)
-            {
-                BOOST_LOG_TRIVIAL(debug)
-                    << __func__ << " - got keys - size = " << result.numRows();
-                size_t prevSize = objects.size();
-                do
-                {
-                    std::vector<unsigned char> object = result.getBytes();
-                    if (object.size())
-                    {
-                        objects.push_back(
-                            {result.getUInt256(), std::move(object)});
-                    }
-                } while (result.nextRow());
-                size_t prevBatchSize = objects.size() - prevSize;
-                BOOST_LOG_TRIVIAL(debug)
-                    << __func__
-                    << " - added to objects. size = " << objects.size();
-                if (result.numRows() < curLimit)
-                {
-                    currentCursor = {};
-                    break;
-                }
-                if (objects.size() < limit)
-                {
-                    curLimit = 2048;
-                }
-                assert(objects.size());
-                currentCursor = objects[objects.size() - 1].key;
-            }
-        }
-        if (objects.size())
-            return {objects, currentCursor};
-        return {{}, {}};
-    }
+        std::uint32_t limit) const override;

+    std::vector<LedgerObject>
+    fetchLedgerDiff(uint32_t ledgerSequence) const;
+    std::map<uint32_t, std::vector<LedgerObject>>
+    fetchLedgerDiffs(std::vector<uint32_t> const& sequences) const;
+
+    bool
+    runIndexer(uint32_t ledgerSequence) const;
+    bool
+    isIndexed(uint32_t ledgerSequence) const;
+
+    std::optional<uint32_t>
+    getNextToIndex() const;
+
+    bool
+    writeKeys(
+        std::unordered_set<ripple::uint256> const& keys,
+        uint32_t ledgerSequence) const;
+    bool
+    writeBooks(
+        std::unordered_map<
+            ripple::uint256,
+            std::unordered_set<ripple::uint256>> const& books,
+        uint32_t ledgerSequence) const override;

     std::pair<std::vector<LedgerObject>, std::optional<ripple::uint256>>
     fetchBookOffers(
         ripple::uint256 const& book,
         uint32_t sequence,
         std::uint32_t limit,
-        std::optional<ripple::uint256> const& cursor) const override
+        std::optional<ripple::uint256> const& cursor) const override;
+    std::pair<std::vector<LedgerObject>, std::optional<ripple::uint256>>
+    fetchBookOffers2(
+        ripple::uint256 const& book,
+        uint32_t sequence,
+        std::uint32_t limit,
+        std::optional<ripple::uint256> const& cursor) const
     {
         CassandraStatement statement{getBook_};
         statement.bindBytes(book);

@@ -1135,7 +1095,7 @@ public:
         });
         for (auto const& res : results)
         {
-            if (res.transaction.size() == 0)
+            if (res.transaction.size() == 1 && res.transaction[0] == 0)
                 throw DatabaseTimeout();
         }
@@ -1183,47 +1143,7 @@ public:
         ReadObjectCallbackData(ReadObjectCallbackData const& other) = default;
     };

-    std::vector<Blob>
-    fetchLedgerObjects(
-        std::vector<ripple::uint256> const& keys,
-        uint32_t sequence) const override
-    {
-        std::size_t const numKeys = keys.size();
-        BOOST_LOG_TRIVIAL(trace)
-            << "Fetching " << numKeys << " records from Cassandra";
-        std::atomic_uint32_t numFinished = 0;
-        std::condition_variable cv;
-        std::mutex mtx;
-        std::vector<Blob> results{numKeys};
-        std::vector<std::shared_ptr<ReadObjectCallbackData>> cbs;
-        cbs.reserve(numKeys);
-        for (std::size_t i = 0; i < keys.size(); ++i)
-        {
-            cbs.push_back(std::make_shared<ReadObjectCallbackData>(
-                *this,
-                keys[i],
-                sequence,
-                results[i],
-                cv,
-                numFinished,
-                numKeys));
-            readObject(*cbs[i]);
-        }
-        assert(results.size() == cbs.size());
-        std::unique_lock<std::mutex> lck(mtx);
-        cv.wait(
-            lck, [&numFinished, &numKeys]() { return numFinished == numKeys; });
-        for (auto const& res : results)
-        {
-            if (res.size() == 0)
-                throw DatabaseTimeout();
-        }
-        BOOST_LOG_TRIVIAL(trace)
-            << "Fetched " << numKeys << " records from Cassandra";
-        return results;
-    }
     void
     readObject(ReadObjectCallbackData& data) const
     {

@@ -1233,6 +1153,10 @@ public:
         executeAsyncRead(statement, flatMapReadObjectCallback, data);
     }

+    std::vector<Blob>
+    fetchLedgerObjects(
+        std::vector<ripple::uint256> const& keys,
+        uint32_t sequence) const override;

     struct WriteCallbackData
     {

@@ -1346,7 +1270,7 @@ public:
         executeAsyncWrite(statement, flatMapWriteBookCallback, data, isRetry);
     }
     void
-    writeLedgerObject(
+    doWriteLedgerObject(
         std::string&& key,
         uint32_t seq,
         std::string&& blob,

View File

@@ -46,8 +46,9 @@ struct AccountTransactionsData
     }
 };

+template <class T>
 inline bool
-isOffer(std::string const& object)
+isOffer(T const& object)
 {
     short offer_bytes = (object[1] << 8) | object[2];
     return offer_bytes == 0x006f;
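
Note: the magic number works because the first field serialized in a ledger object is its LedgerEntryType, a 0x11 field header followed by the type as a big-endian uint16, and 0x006f ('o') is ltOFFER. A quick standalone sanity check of the template (the blobs are truncated, illustrative prefixes, not full ledger entries; the template is copied inline so the snippet compiles on its own):

#include <cassert>
#include <string>

// Copy of the template above, for a self-contained check.
template <class T>
inline bool
isOffer(T const& object)
{
    short offer_bytes = (object[1] << 8) | object[2];
    return offer_bytes == 0x006f;
}

int
main()
{
    // 0x11 = LedgerEntryType field header, then the type as big-endian uint16.
    std::string offer{'\x11', '\x00', '\x6f'};    // 0x006f = ltOFFER
    std::string account{'\x11', '\x00', '\x61'};  // 0x0061 = ltACCOUNT_ROOT
    assert(isOffer(offer));
    assert(!isOffer(account));
}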

View File

@@ -4,7 +4,9 @@
 namespace Backend {
 PostgresBackend::PostgresBackend(boost::json::object const& config)
-    : pgPool_(make_PgPool(config)), writeConnection_(pgPool_)
+    : BackendInterface(config)
+    , pgPool_(make_PgPool(config))
+    , writeConnection_(pgPool_)
 {
     if (config.contains("write_interval"))
     {

@@ -55,7 +57,7 @@ PostgresBackend::writeAccountTransactions(
     }
 }
 void
-PostgresBackend::writeLedgerObject(
+PostgresBackend::doWriteLedgerObject(
     std::string&& key,
     uint32_t seq,
     std::string&& blob,

@@ -651,7 +653,7 @@ PostgresBackend::startWrites() const
 }

 bool
-PostgresBackend::finishWrites() const
+PostgresBackend::doFinishWrites() const
 {
     if (!abortWrite_)
     {
@@ -682,6 +684,68 @@ PostgresBackend::finishWrites() const
     return !abortWrite_;
 }
+bool
+PostgresBackend::writeKeys(
+    std::unordered_set<ripple::uint256> const& keys,
+    uint32_t ledgerSequence) const
+{
+    PgQuery pgQuery(pgPool_);
+    std::stringstream keysBuffer;
+    size_t numRows = 0;
+    for (auto& key : keys)
+    {
+        keysBuffer << std::to_string(ledgerSequence) << '\t' << "\\\\x"
+                   << ripple::strHex(key) << '\n';
+        numRows++;
+        // If the buffer gets too large, the insert fails. Not sure why. So we
+        // insert after 1 million records
+        if (numRows == 1000000)
+        {
+            pgQuery.bulkInsert("keys", keysBuffer.str());
+            keysBuffer = {};
+            numRows = 0;
+        }
+    }
+    if (numRows > 0)
+    {
+        pgQuery.bulkInsert("keys", keysBuffer.str());
+    }
+    return true;
+}
+bool
+PostgresBackend::writeBooks(
+    std::unordered_map<
+        ripple::uint256,
+        std::unordered_set<ripple::uint256>> const& books,
+    uint32_t ledgerSequence) const
+{
+    PgQuery pgQuery(pgPool_);
+    std::stringstream booksBuffer;
+    size_t numRows = 0;
+    for (auto& book : books)
+    {
+        for (auto& offer : book.second)
+        {
+            booksBuffer << "\\\\x" << ripple::strHex(book.first) << '\t'
+                        << std::to_string(ledgerSequence) << '\t' << "\\\\x"
+                        << ripple::strHex(offer) << '\n';
+            numRows++;
+            // If the buffer gets too large, the insert fails. Not sure why. So
+            // we insert after 1 million records
+            if (numRows == 1000000)
+            {
+                pgQuery.bulkInsert("books", booksBuffer.str());
+                booksBuffer = {};
+                numRows = 0;
+            }
+        }
+    }
+    if (numRows > 0)
+    {
+        pgQuery.bulkInsert("books", booksBuffer.str());
+    }
+    return true;
+}
 bool
 PostgresBackend::doOnlineDelete(uint32_t minLedgerToKeep) const
 {
     uint32_t limit = 2048;
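
Note: both functions were declared bool but originally fell off the end without returning; the return true lines above fix that. The buffers they build are Postgres COPY text payloads: one row per line, tab-separated columns, bytea values written as \x-prefixed hex (the C++ literal doubles the backslash because COPY unescapes one level). A standalone sketch of one keys row, with a local hex() standing in for ripple::strHex and assuming the keys(ledger_seq, key) layout implied by the writer:

#include <cstdint>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

// Stand-in for ripple::strHex: uppercase hex encoding of a byte string.
std::string
hex(std::vector<uint8_t> const& bytes)
{
    static char const* digits = "0123456789ABCDEF";
    std::string out;
    for (auto b : bytes)
    {
        out += digits[b >> 4];
        out += digits[b & 0xf];
    }
    return out;
}

int
main()
{
    uint32_t ledgerSequence = 63116314;          // hypothetical sequence
    std::vector<uint8_t> key{0x00, 0xA1, 0xB2};  // truncated key, for brevity
    std::stringstream row;
    row << ledgerSequence << '\t' << "\\\\x" << hex(key) << '\n';
    std::cout << row.str();  // prints: 63116314<TAB>\\x00A1B2
}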

View File

@@ -81,7 +81,7 @@ public:
         bool isFirst) const override;

     void
-    writeLedgerObject(
+    doWriteLedgerObject(
         std::string&& key,
         uint32_t seq,
         std::string&& blob,
@@ -110,10 +110,20 @@ public:
     startWrites() const override;

     bool
-    finishWrites() const override;
+    doFinishWrites() const override;
     bool
     doOnlineDelete(uint32_t minLedgerToKeep) const override;
+    bool
+    writeKeys(
+        std::unordered_set<ripple::uint256> const& keys,
+        uint32_t ledgerSequence) const override;
+    bool
+    writeBooks(
+        std::unordered_map<
+            ripple::uint256,
+            std::unordered_set<ripple::uint256>> const& books,
+        uint32_t ledgerSequence) const override;
 };
 }  // namespace Backend
 #endif

View File

@@ -131,7 +131,7 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence)
     {
         flatMapBackend_->writeAccountTransactions(std::move(accountTxData));
     }
-    flatMapBackend_->finishWrites();
+    flatMapBackend_->finishWrites(startingSequence);
     auto end = std::chrono::system_clock::now();
     BOOST_LOG_TRIVIAL(debug) << "Time to download and store ledger = "
                              << ((end - start).count()) / 1000000000.0;
@@ -153,29 +153,31 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts)
<< "Attempting to publish ledger = " << ledgerSequence; << "Attempting to publish ledger = " << ledgerSequence;
size_t numAttempts = 0; size_t numAttempts = 0;
while (!stopping_) while (!stopping_)
{
try
{ {
auto range = flatMapBackend_->fetchLedgerRange(); auto range = flatMapBackend_->fetchLedgerRange();
if (!range || range->maxSequence < ledgerSequence) if (!range || range->maxSequence < ledgerSequence)
{ {
BOOST_LOG_TRIVIAL(warning) BOOST_LOG_TRIVIAL(debug) << __func__ << " : "
<< __func__ << " : " << "Trying to publish. Could not find "
<< "Trying to publish. Could not find ledger with sequence = " "ledger with sequence = "
<< ledgerSequence; << ledgerSequence;
// We try maxAttempts times to publish the ledger, waiting one // We try maxAttempts times to publish the ledger, waiting one
// second in between each attempt. // second in between each attempt.
// If the ledger is not present in the database after maxAttempts, // If the ledger is not present in the database after
// we attempt to take over as the writer. If the takeover fails, // maxAttempts, we attempt to take over as the writer. If the
// doContinuousETL will return, and this node will go back to // takeover fails, doContinuousETL will return, and this node
// publishing. // will go back to publishing. If the node is in strict read
// If the node is in strict read only mode, we simply // only mode, we simply skip publishing this ledger and return
// skip publishing this ledger and return false indicating the // false indicating the publish failed
// publish failed
if (numAttempts >= maxAttempts) if (numAttempts >= maxAttempts)
{ {
BOOST_LOG_TRIVIAL(error) << __func__ << " : " BOOST_LOG_TRIVIAL(debug)
<< "Failed to publish ledger after " << __func__ << " : "
<< numAttempts << " attempts."; << "Failed to publish ledger after " << numAttempts
<< " attempts.";
if (!readOnly_) if (!readOnly_)
{ {
BOOST_LOG_TRIVIAL(info) BOOST_LOG_TRIVIAL(info)
@@ -183,21 +185,14 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts)
<< "Attempting to become ETL writer"; << "Attempting to become ETL writer";
return false; return false;
} }
else
{
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
<< "In strict read-only mode. "
<< "Skipping publishing this ledger. "
<< "Beginning fast forward.";
return false;
} }
}
else
{
std::this_thread::sleep_for(std::chrono::seconds(1)); std::this_thread::sleep_for(std::chrono::seconds(1));
++numAttempts; ++numAttempts;
continue;
} }
}
catch (Backend::DatabaseTimeout const& e)
{
continue; continue;
} }
@@ -682,7 +677,7 @@ ReportingETL::monitorReadOnly()
     while (!stopping_ &&
            networkValidatedLedgers_.waitUntilValidatedByNetwork(sequence))
     {
-        success = publishLedger(sequence, success ? 30 : 1);
+        publishLedger(sequence, 30);
         ++sequence;
     }
 }
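
Note: with the unused success flag gone, monitorReadOnly() no longer shrinks the attempt budget after a failed publish; every ledger now gets the full 30 attempts.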

test.py
View File

@@ -262,11 +262,12 @@ async def ledger_entries(ip, port,ledger):
     except websockets.exceptions.connectionclosederror as e:
         print(e)

-async def ledger_data(ip, port, ledger, limit, binary):
+async def ledger_data(ip, port, ledger, limit, binary, cursor):
     address = 'ws://' + str(ip) + ':' + str(port)
     try:
         async with websockets.connect(address) as ws:
-            await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"binary":bool(binary),"limit":int(limit)}))
+            await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"binary":bool(binary),"limit":int(limit),"cursor":cursor}))
+            await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"binary":bool(binary),"cursor":cursor}))
             res = json.loads(await ws.recv())
             objects = []
             blobs = []
@@ -357,6 +358,7 @@ def compare_offer(aldous, p2p):
 def compare_book_offers(aldous, p2p):
     p2pOffers = {}
     for x in p2p:
+        matched = False
         for y in aldous:
             if y["index"] == x["index"]:
                 if not compare_offer(y,x):
@@ -364,6 +366,12 @@ def compare_book_offers(aldous, p2p):
                     print(y)
                     print(x)
                     return False
+                else:
+                    matched = True
+        if not matched:
+            print("offer not found")
+            print(x)
+            return False
     print("offers match!")
     return True
@@ -464,6 +472,7 @@ async def ledger(ip, port, ledger, binary, transactions, expand):
await ws.send(json.dumps({"command":"ledger","ledger_index":int(ledger),"binary":bool(binary), "transactions":bool(transactions),"expand":bool(expand)})) await ws.send(json.dumps({"command":"ledger","ledger_index":int(ledger),"binary":bool(binary), "transactions":bool(transactions),"expand":bool(expand)}))
res = json.loads(await ws.recv()) res = json.loads(await ws.recv())
print(json.dumps(res,indent=4,sort_keys=True)) print(json.dumps(res,indent=4,sort_keys=True))
print(bool(binary))
return res return res
except websockets.exceptions.connectionclosederror as e: except websockets.exceptions.connectionclosederror as e:
@@ -543,6 +552,7 @@ parser.add_argument('--numPages',default=3)
 parser.add_argument('--base')
 parser.add_argument('--desired')
 parser.add_argument('--includeBlobs',default=False)
+parser.add_argument('--cursor',default='0000000000000000000000000000000000000000000000000000000000000000')
@@ -640,7 +650,7 @@ def run(args):
         print(compareAccountTx(res,res2))
     elif args.action == "ledger_data":
         res = asyncio.get_event_loop().run_until_complete(
-            ledger_data(args.ip, args.port, args.ledger, args.limit, args.binary))
+            ledger_data(args.ip, args.port, args.ledger, args.limit, args.binary, args.cursor))
         if args.verify:
             writeLedgerData(res,args.filename)
     elif args.action == "ledger_data_full":