rebase session to use shared_ptr

This commit is contained in:
Nathan Nichols
2021-05-26 13:47:51 -05:00
parent be9ab23998
commit fef8f6f223
9 changed files with 173 additions and 153 deletions

View File

@@ -45,6 +45,12 @@ class NetworkValidatedLedgers
bool stopping_ = false; bool stopping_ = false;
public: public:
static std::shared_ptr<NetworkValidatedLedgers>
makeValidatedLedgers()
{
return std::make_shared<NetworkValidatedLedgers>();
}
/// Notify the datastructure that idx has been validated by the network /// Notify the datastructure that idx has been validated by the network
/// @param idx sequence validated by network /// @param idx sequence validated by network
void void

View File

@@ -34,19 +34,21 @@
// Primarly used in read-only mode, to monitor when ledgers are validated // Primarly used in read-only mode, to monitor when ledgers are validated
ETLSource::ETLSource( ETLSource::ETLSource(
boost::json::object const& config, boost::json::object const& config,
BackendInterface& backend, std::shared_ptr<BackendInterface> backend,
ReportingETL& etl, std::shared_ptr<SubscriptionManager> subscriptions,
NetworkValidatedLedgers& networkValidatedLedgers, std::shared_ptr<ETLLoadBalancer> balancer,
std::shared_ptr<NetworkValidatedLedgers> networkValidatedLedgers,
boost::asio::io_context& ioContext) boost::asio::io_context& ioContext)
: ioc_(ioContext) : ioc_(ioContext)
, ws_(std::make_unique< , ws_(std::make_unique<
boost::beast::websocket::stream<boost::beast::tcp_stream>>( boost::beast::websocket::stream<boost::beast::tcp_stream>>(
boost::asio::make_strand(ioc_))) boost::asio::make_strand(ioc_)))
, resolver_(boost::asio::make_strand(ioc_)) , resolver_(boost::asio::make_strand(ioc_))
, etl_(etl)
, timer_(ioc_) , timer_(ioc_)
, networkValidatedLedgers_(networkValidatedLedgers) , networkValidatedLedgers_(networkValidatedLedgers)
, backend_(backend) , backend_(backend)
, subscriptions_(subscriptions)
, balancer_(balancer)
{ {
if (config.contains("ip")) if (config.contains("ip"))
{ {
@@ -330,9 +332,9 @@ ETLSource::handleMessage()
{ {
if (response.contains("transaction")) if (response.contains("transaction"))
{ {
if (etl_.getETLLoadBalancer().shouldPropagateTxnStream(this)) if (balancer_->shouldPropagateTxnStream(this))
{ {
etl_.getSubscriptionManager().forwardProposedTransaction(response); subscriptions_->forwardProposedTransaction(response);
} }
} }
else else
@@ -362,7 +364,7 @@ ETLSource::handleMessage()
<< __func__ << " : " << __func__ << " : "
<< "Pushing ledger sequence = " << ledgerIndex << " - " << "Pushing ledger sequence = " << ledgerIndex << " - "
<< toString(); << toString();
networkValidatedLedgers_.push(ledgerIndex); networkValidatedLedgers_->push(ledgerIndex);
} }
return true; return true;
} }
@@ -533,7 +535,7 @@ ETLSource::loadInitialLedger(uint32_t sequence)
{ {
BOOST_LOG_TRIVIAL(info) BOOST_LOG_TRIVIAL(info)
<< "Marker prefix = " << ptr->getMarkerPrefix(); << "Marker prefix = " << ptr->getMarkerPrefix();
auto result = ptr->process(stub_, cq, backend_, abort); auto result = ptr->process(stub_, cq, *backend_, abort);
if (result != AsyncCallData::CallStatus::MORE) if (result != AsyncCallData::CallStatus::MORE)
{ {
numFinished++; numFinished++;
@@ -577,18 +579,17 @@ ETLSource::fetchLedger(uint32_t ledgerSequence, bool getObjects)
} }
return {status, std::move(response)}; return {status, std::move(response)};
} }
ETLLoadBalancer::ETLLoadBalancer( ETLLoadBalancer::ETLLoadBalancer(
boost::json::array const& config, boost::json::array const& config,
BackendInterface& backend, std::shared_ptr<BackendInterface> backend,
ReportingETL& etl, std::shared_ptr<NetworkValidatedLedgers> nwvl,
NetworkValidatedLedgers& nwvl,
boost::asio::io_context& ioContext) boost::asio::io_context& ioContext)
: etl_(etl)
{ {
for (auto& entry : config) for (auto& entry : config)
{ {
std::unique_ptr<ETLSource> source = std::make_unique<ETLSource>( std::unique_ptr<ETLSource> source = std::make_unique<ETLSource>(
entry.as_object(), backend, etl, nwvl, ioContext); entry.as_object(), backend, nwvl, ioContext);
sources_.push_back(std::move(source)); sources_.push_back(std::move(source));
BOOST_LOG_TRIVIAL(info) << __func__ << " : added etl source - " BOOST_LOG_TRIVIAL(info) << __func__ << " : added etl source - "
<< sources_.back()->toString(); << sources_.back()->toString();
@@ -852,18 +853,6 @@ ETLLoadBalancer::execute(Func f, uint32_t ledgerSequence)
numAttempts++; numAttempts++;
if (numAttempts % sources_.size() == 0) if (numAttempts % sources_.size() == 0)
{ {
/*
if (etl_.getApplication().getLedgerMaster().getLedgerBySeq(
ledgerSequence))
{
BOOST_LOG_TRIVIAL(warning)
<< __func__ << " : "
<< "Error executing function. "
<< " Tried all sources, but ledger was found in db."
<< " Sequence = " << ledgerSequence;
break;
}
*/
BOOST_LOG_TRIVIAL(error) BOOST_LOG_TRIVIAL(error)
<< __func__ << " : " << __func__ << " : "
<< "Error executing function " << "Error executing function "

View File

@@ -26,12 +26,14 @@
#include <boost/beast/core/string.hpp> #include <boost/beast/core/string.hpp>
#include <boost/beast/websocket.hpp> #include <boost/beast/websocket.hpp>
#include <reporting/BackendInterface.h> #include <reporting/BackendInterface.h>
#include <reporting/server/SubscriptionManager.h>
#include "org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h" #include "org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h"
#include <grpcpp/grpcpp.h> #include <grpcpp/grpcpp.h>
#include <reporting/ETLHelpers.h> #include <reporting/ETLHelpers.h>
class ReportingETL; class ETLLoadBalancer;
class SubscriptionManager;
/// This class manages a connection to a single ETL source. This is almost /// This class manages a connection to a single ETL source. This is almost
/// always a p2p node, but really could be another reporting node. This class /// always a p2p node, but really could be another reporting node. This class
@@ -63,9 +65,7 @@ class ETLSource
std::string validatedLedgersRaw_; std::string validatedLedgersRaw_;
NetworkValidatedLedgers& networkValidatedLedgers_; std::shared_ptr<NetworkValidatedLedgers> networkValidatedLedgers_;
ReportingETL& etl_;
// beast::Journal journal_; // beast::Journal journal_;
@@ -90,7 +90,11 @@ class ETLSource
// used for retrying connections // used for retrying connections
boost::asio::steady_timer timer_; boost::asio::steady_timer timer_;
BackendInterface& backend_; std::shared_ptr<BackendInterface> backend_;
std::shared_ptr<SubscriptionManager> subscriptions_;
std::shared_ptr<ETLLoadBalancer> balancer_;
public: public:
bool bool
@@ -118,9 +122,10 @@ public:
/// Primarly used in read-only mode, to monitor when ledgers are validated /// Primarly used in read-only mode, to monitor when ledgers are validated
ETLSource( ETLSource(
boost::json::object const& config, boost::json::object const& config,
BackendInterface& backend, std::shared_ptr<BackendInterface> backend,
ReportingETL& etl, std::shared_ptr<SubscriptionManager> subscriptions,
NetworkValidatedLedgers& networkValidatedLedgers, std::shared_ptr<ETLLoadBalancer> balancer,
std::shared_ptr<NetworkValidatedLedgers> networkValidatedLedgers,
boost::asio::io_context& ioContext); boost::asio::io_context& ioContext);
/// @param sequence ledger sequence to check for /// @param sequence ledger sequence to check for
@@ -292,18 +297,29 @@ public:
class ETLLoadBalancer class ETLLoadBalancer
{ {
private: private:
ReportingETL& etl_;
std::vector<std::unique_ptr<ETLSource>> sources_; std::vector<std::unique_ptr<ETLSource>> sources_;
public:
ETLLoadBalancer( ETLLoadBalancer(
boost::json::array const& config, boost::json::array const& config,
BackendInterface& backend, std::shared_ptr<BackendInterface> backend,
ReportingETL& etl, std::shared_ptr<NetworkValidatedLedgers> nwvl,
NetworkValidatedLedgers& nwvl,
boost::asio::io_context& ioContext); boost::asio::io_context& ioContext);
public:
static std::shared_ptr<ETLLoadBalancer>
makeETLLoadBalancer(
boost::json::object const& config,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<NetworkValidatedLedgers> validatedLedgers,
boost::asio::io_context& ioc)
{
return std::make_shared<ETLLoadBalancer>(
config.at("etl_sources").as_array(),
backend,
validatedLedgers,
ioc);
}
/// Load the initial ledger, writing data to the queue /// Load the initial ledger, writing data to the queue
/// @param sequence sequence of ledger to download /// @param sequence sequence of ledger to download
/// @param writeQueue queue to push downloaded data to /// @param writeQueue queue to push downloaded data to
@@ -393,4 +409,5 @@ private:
bool bool
execute(Func f, uint32_t ledgerSequence); execute(Func f, uint32_t ledgerSequence);
}; };
#endif #endif

View File

@@ -76,7 +76,7 @@ ReportingETL::insertTransactions(
auto journal = ripple::debugLog(); auto journal = ripple::debugLog();
accountTxData.emplace_back(txMeta, sttx.getTransactionID(), journal); accountTxData.emplace_back(txMeta, sttx.getTransactionID(), journal);
std::string keyStr{(const char*)sttx.getTransactionID().data(), 32}; std::string keyStr{(const char*)sttx.getTransactionID().data(), 32};
flatMapBackend_->writeTransaction( backend_->writeTransaction(
std::move(keyStr), std::move(keyStr),
ledger.seq, ledger.seq,
std::move(*raw), std::move(*raw),
@@ -89,7 +89,7 @@ std::optional<ripple::LedgerInfo>
ReportingETL::loadInitialLedger(uint32_t startingSequence) ReportingETL::loadInitialLedger(uint32_t startingSequence)
{ {
// check that database is actually empty // check that database is actually empty
auto rng = flatMapBackend_->fetchLedgerRangeNoThrow(); auto rng = backend_->fetchLedgerRangeNoThrow();
if (rng) if (rng)
{ {
BOOST_LOG_TRIVIAL(fatal) << __func__ << " : " BOOST_LOG_TRIVIAL(fatal) << __func__ << " : "
@@ -113,8 +113,8 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence)
<< __func__ << " : " << __func__ << " : "
<< "Deserialized ledger header. " << detail::toString(lgrInfo); << "Deserialized ledger header. " << detail::toString(lgrInfo);
flatMapBackend_->startWrites(); backend_->startWrites();
flatMapBackend_->writeLedger( backend_->writeLedger(
lgrInfo, std::move(*ledgerData->mutable_ledger_header()), true); lgrInfo, std::move(*ledgerData->mutable_ledger_header()), true);
std::vector<AccountTransactionsData> accountTxData = std::vector<AccountTransactionsData> accountTxData =
insertTransactions(lgrInfo, *ledgerData); insertTransactions(lgrInfo, *ledgerData);
@@ -125,13 +125,13 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence)
// data and pushes the downloaded data into the writeQueue. asyncWriter // data and pushes the downloaded data into the writeQueue. asyncWriter
// consumes from the queue and inserts the data into the Ledger object. // consumes from the queue and inserts the data into the Ledger object.
// Once the below call returns, all data has been pushed into the queue // Once the below call returns, all data has been pushed into the queue
loadBalancer_.loadInitialLedger(startingSequence); loadBalancer_->loadInitialLedger(startingSequence);
if (!stopping_) if (!stopping_)
{ {
flatMapBackend_->writeAccountTransactions(std::move(accountTxData)); backend_->writeAccountTransactions(std::move(accountTxData));
} }
flatMapBackend_->finishWrites(startingSequence); backend_->finishWrites(startingSequence);
auto end = std::chrono::system_clock::now(); auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(debug) << "Time to download and store ledger = " BOOST_LOG_TRIVIAL(debug) << "Time to download and store ledger = "
<< ((end - start).count()) / 1000000000.0; << ((end - start).count()) / 1000000000.0;
@@ -144,7 +144,7 @@ ReportingETL::getFees(std::uint32_t seq)
ripple::Fees fees; ripple::Fees fees;
auto key = ripple::keylet::fees().key; auto key = ripple::keylet::fees().key;
auto bytes = flatMapBackend_->fetchLedgerObject(key, seq); auto bytes = backend_->fetchLedgerObject(key, seq);
if (!bytes) if (!bytes)
{ {
@@ -173,10 +173,10 @@ ReportingETL::getFees(std::uint32_t seq)
void void
ReportingETL::publishLedger(ripple::LedgerInfo const& lgrInfo) ReportingETL::publishLedger(ripple::LedgerInfo const& lgrInfo)
{ {
auto ledgerRange = flatMapBackend_->fetchLedgerRange(); auto ledgerRange = backend_->fetchLedgerRange();
auto fees = getFees(lgrInfo.seq); auto fees = getFees(lgrInfo.seq);
auto transactions = auto transactions =
flatMapBackend_->fetchAllTransactionsInLedger(lgrInfo.seq); backend_->fetchAllTransactionsInLedger(lgrInfo.seq);
if (!fees || !ledgerRange) if (!fees || !ledgerRange)
{ {
@@ -206,7 +206,7 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts)
{ {
try try
{ {
auto range = flatMapBackend_->fetchLedgerRangeNoThrow(); auto range = backend_->fetchLedgerRangeNoThrow();
if (!range || range->maxSequence < ledgerSequence) if (!range || range->maxSequence < ledgerSequence)
{ {
@@ -269,7 +269,7 @@ ReportingETL::fetchLedgerData(uint32_t idx)
<< "Attempting to fetch ledger with sequence = " << idx; << "Attempting to fetch ledger with sequence = " << idx;
std::optional<org::xrpl::rpc::v1::GetLedgerResponse> response = std::optional<org::xrpl::rpc::v1::GetLedgerResponse> response =
loadBalancer_.fetchLedger(idx, false); loadBalancer_->fetchLedger(idx, false);
BOOST_LOG_TRIVIAL(trace) << __func__ << " : " BOOST_LOG_TRIVIAL(trace) << __func__ << " : "
<< "GetLedger reply = " << response->DebugString(); << "GetLedger reply = " << response->DebugString();
return response; return response;
@@ -283,7 +283,7 @@ ReportingETL::fetchLedgerDataAndDiff(uint32_t idx)
<< "Attempting to fetch ledger with sequence = " << idx; << "Attempting to fetch ledger with sequence = " << idx;
std::optional<org::xrpl::rpc::v1::GetLedgerResponse> response = std::optional<org::xrpl::rpc::v1::GetLedgerResponse> response =
loadBalancer_.fetchLedger(idx, true); loadBalancer_->fetchLedger(idx, true);
BOOST_LOG_TRIVIAL(trace) << __func__ << " : " BOOST_LOG_TRIVIAL(trace) << __func__ << " : "
<< "GetLedger reply = " << response->DebugString(); << "GetLedger reply = " << response->DebugString();
return response; return response;
@@ -301,9 +301,9 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : " << __func__ << " : "
<< "Deserialized ledger header. " << detail::toString(lgrInfo); << "Deserialized ledger header. " << detail::toString(lgrInfo);
flatMapBackend_->startWrites(); backend_->startWrites();
flatMapBackend_->writeLedger( backend_->writeLedger(
lgrInfo, std::move(*rawData.mutable_ledger_header())); lgrInfo, std::move(*rawData.mutable_ledger_header()));
std::vector<AccountTransactionsData> accountTxData{ std::vector<AccountTransactionsData> accountTxData{
insertTransactions(lgrInfo, rawData)}; insertTransactions(lgrInfo, rawData)};
@@ -336,7 +336,7 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
} }
assert(not(isCreated and isDeleted)); assert(not(isCreated and isDeleted));
flatMapBackend_->writeLedgerObject( backend_->writeLedgerObject(
std::move(*obj.mutable_key()), std::move(*obj.mutable_key()),
lgrInfo.seq, lgrInfo.seq,
std::move(*obj.mutable_data()), std::move(*obj.mutable_data()),
@@ -344,13 +344,13 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
isDeleted, isDeleted,
std::move(bookDir)); std::move(bookDir));
} }
flatMapBackend_->writeAccountTransactions(std::move(accountTxData)); backend_->writeAccountTransactions(std::move(accountTxData));
accumTxns_ += rawData.transactions_list().transactions_size(); accumTxns_ += rawData.transactions_list().transactions_size();
bool success = true; bool success = true;
if (accumTxns_ >= txnThreshold_) if (accumTxns_ >= txnThreshold_)
{ {
auto start = std::chrono::system_clock::now(); auto start = std::chrono::system_clock::now();
success = flatMapBackend_->finishWrites(lgrInfo.seq); success = backend_->finishWrites(lgrInfo.seq);
auto end = std::chrono::system_clock::now(); auto end = std::chrono::system_clock::now();
auto duration = ((end - start).count()) / 1000000000.0; auto duration = ((end - start).count()) / 1000000000.0;
@@ -407,7 +407,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
<< "Starting etl pipeline"; << "Starting etl pipeline";
writing_ = true; writing_ = true;
auto rng = flatMapBackend_->fetchLedgerRangeNoThrow(); auto rng = backend_->fetchLedgerRangeNoThrow();
if (!rng || rng->maxSequence != startSequence - 1) if (!rng || rng->maxSequence != startSequence - 1)
{ {
assert(false); assert(false);
@@ -416,7 +416,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
BOOST_LOG_TRIVIAL(info) << __func__ << " : " BOOST_LOG_TRIVIAL(info) << __func__ << " : "
<< "Populating caches"; << "Populating caches";
flatMapBackend_->getIndexer().populateCachesAsync(*flatMapBackend_); backend_->getIndexer().populateCachesAsync(*backend_);
BOOST_LOG_TRIVIAL(info) << __func__ << " : " BOOST_LOG_TRIVIAL(info) << __func__ << " : "
<< "Populated caches"; << "Populated caches";
@@ -456,7 +456,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
// the entire server is shutting down. This can be detected in a // the entire server is shutting down. This can be detected in a
// variety of ways. See the comment at the top of the function // variety of ways. See the comment at the top of the function
while ((!finishSequence_ || currentSequence <= *finishSequence_) && while ((!finishSequence_ || currentSequence <= *finishSequence_) &&
networkValidatedLedgers_.waitUntilValidatedByNetwork( networkValidatedLedgers_->waitUntilValidatedByNetwork(
currentSequence) && currentSequence) &&
!writeConflict && !isStopping()) !writeConflict && !isStopping())
{ {
@@ -550,7 +550,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
lastPublishedSequence = lgrInfo.seq; lastPublishedSequence = lgrInfo.seq;
} }
writeConflict = !success; writeConflict = !success;
auto range = flatMapBackend_->fetchLedgerRangeNoThrow(); auto range = backend_->fetchLedgerRangeNoThrow();
if (onlineDeleteInterval_ && !deleting_ && if (onlineDeleteInterval_ && !deleting_ &&
range->maxSequence - range->minSequence > range->maxSequence - range->minSequence >
*onlineDeleteInterval_) *onlineDeleteInterval_)
@@ -558,7 +558,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
deleting_ = true; deleting_ = true;
ioContext_.post([this, &range]() { ioContext_.post([this, &range]() {
BOOST_LOG_TRIVIAL(info) << "Running online delete"; BOOST_LOG_TRIVIAL(info) << "Running online delete";
flatMapBackend_->doOnlineDelete( backend_->doOnlineDelete(
range->maxSequence - *onlineDeleteInterval_); range->maxSequence - *onlineDeleteInterval_);
BOOST_LOG_TRIVIAL(info) << "Finished online delete"; BOOST_LOG_TRIVIAL(info) << "Finished online delete";
deleting_ = false; deleting_ = false;
@@ -581,7 +581,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
<< "Extracted and wrote " << *lastPublishedSequence - startSequence << "Extracted and wrote " << *lastPublishedSequence - startSequence
<< " in " << ((end - begin).count()) / 1000000000.0; << " in " << ((end - begin).count()) / 1000000000.0;
writing_ = false; writing_ = false;
flatMapBackend_->getIndexer().clearCaches(); backend_->getIndexer().clearCaches();
BOOST_LOG_TRIVIAL(debug) << __func__ << " : " BOOST_LOG_TRIVIAL(debug) << __func__ << " : "
<< "Stopping etl pipeline"; << "Stopping etl pipeline";
@@ -603,7 +603,7 @@ void
ReportingETL::monitor() ReportingETL::monitor()
{ {
std::optional<uint32_t> latestSequence = std::optional<uint32_t> latestSequence =
flatMapBackend_->fetchLatestLedgerSequence(); backend_->fetchLatestLedgerSequence();
if (!latestSequence) if (!latestSequence)
{ {
BOOST_LOG_TRIVIAL(info) << __func__ << " : " BOOST_LOG_TRIVIAL(info) << __func__ << " : "
@@ -625,7 +625,7 @@ ReportingETL::monitor()
<< __func__ << " : " << __func__ << " : "
<< "Waiting for next ledger to be validated by network..."; << "Waiting for next ledger to be validated by network...";
std::optional<uint32_t> mostRecentValidated = std::optional<uint32_t> mostRecentValidated =
networkValidatedLedgers_.getMostRecent(); networkValidatedLedgers_->getMostRecent();
if (mostRecentValidated) if (mostRecentValidated)
{ {
BOOST_LOG_TRIVIAL(info) << __func__ << " : " BOOST_LOG_TRIVIAL(info) << __func__ << " : "
@@ -674,7 +674,7 @@ ReportingETL::monitor()
<< "Database is populated. " << "Database is populated. "
<< "Starting monitor loop. sequence = " << nextSequence; << "Starting monitor loop. sequence = " << nextSequence;
while (!stopping_ && while (!stopping_ &&
networkValidatedLedgers_.waitUntilValidatedByNetwork(nextSequence)) networkValidatedLedgers_->waitUntilValidatedByNetwork(nextSequence))
{ {
BOOST_LOG_TRIVIAL(info) << __func__ << " : " BOOST_LOG_TRIVIAL(info) << __func__ << " : "
<< "Ledger with sequence = " << nextSequence << "Ledger with sequence = " << nextSequence
@@ -727,13 +727,13 @@ ReportingETL::monitorReadOnly()
{ {
BOOST_LOG_TRIVIAL(debug) << "Starting reporting in strict read only mode"; BOOST_LOG_TRIVIAL(debug) << "Starting reporting in strict read only mode";
std::optional<uint32_t> mostRecent = std::optional<uint32_t> mostRecent =
networkValidatedLedgers_.getMostRecent(); networkValidatedLedgers_->getMostRecent();
if (!mostRecent) if (!mostRecent)
return; return;
uint32_t sequence = *mostRecent; uint32_t sequence = *mostRecent;
bool success = true; bool success = true;
while (!stopping_ && while (!stopping_ &&
networkValidatedLedgers_.waitUntilValidatedByNetwork(sequence)) networkValidatedLedgers_->waitUntilValidatedByNetwork(sequence))
{ {
publishLedger(sequence, 30); publishLedger(sequence, 30);
++sequence; ++sequence;
@@ -754,17 +754,15 @@ ReportingETL::doWork()
ReportingETL::ReportingETL( ReportingETL::ReportingETL(
boost::json::object const& config, boost::json::object const& config,
boost::asio::io_context& ioc) boost::asio::io_context& ioc,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer)
: publishStrand_(ioc) : publishStrand_(ioc)
, ioContext_(ioc) , ioContext_(ioc)
, flatMapBackend_(Backend::makeBackend(config)) , backend_(backend)
, subscriptions_(std::make_unique<SubscriptionManager>()) , subscriptions_(subscriptions)
, loadBalancer_( , loadBalancer_(balancer)
config.at("etl_sources").as_array(),
*flatMapBackend_,
*this,
networkValidatedLedgers_,
ioc)
{ {
if (config.contains("start_sequence")) if (config.contains("start_sequence"))
startSequence_ = config.at("start_sequence").as_int64(); startSequence_ = config.at("start_sequence").as_int64();
@@ -778,6 +776,6 @@ ReportingETL::ReportingETL(
extractorThreads_ = config.at("extractor_threads").as_int64(); extractorThreads_ = config.at("extractor_threads").as_int64();
if (config.contains("txn_threshold")) if (config.contains("txn_threshold"))
txnThreshold_ = config.at("txn_threshold").as_int64(); txnThreshold_ = config.at("txn_threshold").as_int64();
flatMapBackend_->open(readOnly_); backend_->open(readOnly_);
} }

View File

@@ -60,8 +60,9 @@ class SubscriptionManager;
class ReportingETL class ReportingETL
{ {
private: private:
std::unique_ptr<BackendInterface> flatMapBackend_; std::shared_ptr<BackendInterface> backend_;
std::unique_ptr<SubscriptionManager> subscriptions_; std::shared_ptr<SubscriptionManager> subscriptions_;
std::shared_ptr<ETLLoadBalancer> loadBalancer_;
std::optional<uint32_t> onlineDeleteInterval_; std::optional<uint32_t> onlineDeleteInterval_;
uint32_t extractorThreads_ = 1; uint32_t extractorThreads_ = 1;
@@ -86,11 +87,10 @@ private:
/// Mechanism for communicating with ETL sources. ETLLoadBalancer wraps an /// Mechanism for communicating with ETL sources. ETLLoadBalancer wraps an
/// arbitrary number of ETL sources and load balances ETL requests across /// arbitrary number of ETL sources and load balances ETL requests across
/// those sources. /// those sources.
ETLLoadBalancer loadBalancer_;
/// Mechanism for detecting when the network has validated a new ledger. /// Mechanism for detecting when the network has validated a new ledger.
/// This class provides a way to wait for a specific ledger to be validated /// This class provides a way to wait for a specific ledger to be validated
NetworkValidatedLedgers networkValidatedLedgers_; std::shared_ptr<NetworkValidatedLedgers> networkValidatedLedgers_;
/// Whether the software is stopping /// Whether the software is stopping
std::atomic_bool stopping_ = false; std::atomic_bool stopping_ = false;
@@ -256,20 +256,45 @@ private:
std::optional<ripple::Fees> std::optional<ripple::Fees>
getFees(std::uint32_t seq); getFees(std::uint32_t seq);
public:
ReportingETL( ReportingETL(
boost::json::object const& config, boost::json::object const& config,
boost::asio::io_context& ioc); boost::asio::io_context& ioc,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer);
public:
static std::shared_ptr<ReportingETL>
makeReportingETL(
boost::json::object const& config,
boost::asio::io_context& ioc,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer)
{
auto etl = std::make_shared<ReportingETL>(
config,
ioc,
backend,
subscriptions,
balancer);
etl->run();
return etl;
}
~ReportingETL() ~ReportingETL()
{ {
onStop(); BOOST_LOG_TRIVIAL(info) << "onStop called";
} BOOST_LOG_TRIVIAL(debug) << "Stopping Reporting ETL";
stopping_ = true;
NetworkValidatedLedgers& if (worker_.joinable())
getNetworkValidatedLedgers() worker_.join();
{
return networkValidatedLedgers_; BOOST_LOG_TRIVIAL(debug) << "Joined ReportingETL worker thread";
} }
bool bool
@@ -288,22 +313,6 @@ public:
return numMarkers_; return numMarkers_;
} }
/*
Json::Value
getInfo()
{
Json::Value result(Json::objectValue);
result["etl_sources"] = loadBalancer_.toJson();
result["is_writer"] = writing_.load();
auto last = getLastPublish();
if (last.time_since_epoch().count() != 0)
result["last_publish_time"] = to_string(
date::floor<std::chrono::microseconds>(getLastPublish()));
return result;
}
*/
/// start all of the necessary components and begin ETL /// start all of the necessary components and begin ETL
void void
run() run()
@@ -312,43 +321,14 @@ public:
stopping_ = false; stopping_ = false;
loadBalancer_.start(); loadBalancer_->start();
doWork(); doWork();
} }
/// Stop all the necessary components
void void
onStop() onStop()
{ {
BOOST_LOG_TRIVIAL(info) << "onStop called";
BOOST_LOG_TRIVIAL(debug) << "Stopping Reporting ETL";
stopping_ = true;
networkValidatedLedgers_.stop();
loadBalancer_.stop();
BOOST_LOG_TRIVIAL(debug) << "Stopped loadBalancer";
if (worker_.joinable())
worker_.join();
BOOST_LOG_TRIVIAL(debug) << "Joined worker thread";
}
ETLLoadBalancer&
getETLLoadBalancer()
{
return loadBalancer_;
}
BackendInterface&
getFlatMapBackend()
{
return *flatMapBackend_;
}
SubscriptionManager&
getSubscriptionManager()
{
return *subscriptions_;
} }
private: private:

View File

@@ -45,6 +45,12 @@ class SubscriptionManager
public: public:
static std::shared_ptr<SubscriptionManager>
makeSubscriptionManager()
{
return std::make_shared<SubscriptionManager>();
}
void void
subLedger(std::shared_ptr<session>& session); subLedger(std::shared_ptr<session>& session);

View File

@@ -10,8 +10,9 @@ fail(boost::beast::error_code ec, char const* what)
boost::json::object boost::json::object
buildResponse( buildResponse(
boost::json::object const& request, boost::json::object const& request,
BackendInterface const& backend, std::shared_ptr<BackendInterface> backend,
SubscriptionManager& manager, std::shared_ptr<SubscriptionManager> manager,
std::shared_ptr<ETLLoadBalancer> balancer,
std::shared_ptr<session> session) std::shared_ptr<session> session)
{ {
std::string command = request.at("command").as_string().c_str(); std::string command = request.at("command").as_string().c_str();
@@ -19,7 +20,7 @@ buildResponse(
boost::json::object response; boost::json::object response;
if (shouldForwardToP2p(request)) if (shouldForwardToP2p(request))
return etl.getETLLoadBalancer().forwardToP2p(request); return balancer->forwardToP2p(request);
switch (commandMap[command]) switch (commandMap[command])
{ {

View File

@@ -24,12 +24,13 @@
#include <boost/beast/core.hpp> #include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp> #include <boost/beast/websocket.hpp>
#include <reporting/ReportingETL.h> #include <reporting/BackendInterface.h>
#include <reporting/server/SubscriptionManager.h>
#include <iostream> #include <reporting/ETLSource.h>
class session; class session;
class SubscriptionManager; class SubscriptionManager;
class ETLLoadBalancer;
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
enum RPCCommand { enum RPCCommand {
@@ -155,8 +156,9 @@ doUnsubscribe(
boost::json::object boost::json::object
buildResponse( buildResponse(
boost::json::object const& request, boost::json::object const& request,
BackendInterface const& backend, std::shared_ptr<BackendInterface> backend,
SubscriptionManager& manager, std::shared_ptr<SubscriptionManager> manager,
std::shared_ptr<ETLLoadBalancer> balancer,
std::shared_ptr<session> session); std::shared_ptr<session> session);
void void
@@ -168,15 +170,22 @@ class session : public std::enable_shared_from_this<session>
boost::beast::websocket::stream<boost::beast::tcp_stream> ws_; boost::beast::websocket::stream<boost::beast::tcp_stream> ws_;
boost::beast::flat_buffer buffer_; boost::beast::flat_buffer buffer_;
std::string response_; std::string response_;
ReportingETL& etl_;
std::shared_ptr<BackendInterface> backend_;
std::shared_ptr<SubscriptionManager> subscriptions_;
std::shared_ptr<ETLLoadBalancer> balancer_;
public: public:
// Take ownership of the socket // Take ownership of the socket
explicit session( explicit session(
boost::asio::ip::tcp::socket&& socket, boost::asio::ip::tcp::socket&& socket,
ReportingETL& etl) std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer)
: ws_(std::move(socket)) : ws_(std::move(socket))
, etl_(etl) , backend_(backend)
, subscriptions_(subscriptions)
, balancer_(balancer)
{ {
} }
@@ -259,7 +268,9 @@ public:
{ {
response = buildResponse( response = buildResponse(
request, request,
etl_, backend_,
subscriptions_,
balancer_,
shared_from_this()); shared_from_this());
} }
catch (Backend::DatabaseTimeout const& t) catch (Backend::DatabaseTimeout const& t)

View File

@@ -130,7 +130,19 @@ main(int argc, char* argv[])
// The io_context is required for all I/O // The io_context is required for all I/O
boost::asio::io_context ioc{threads}; boost::asio::io_context ioc{threads};
ReportingETL etl{config.value(), ioc};
std::shared_ptr<BackendInterface> backend{BackendInterface::makeBackend(config)};
std::shared_ptr<SubscriptionManager> subscriptions{SubscriptionManager::makeSubscriptionManager()};
std::shared_ptr<NetworkValidatedLedgers> ledgers{NetworkValidatedLedgers::makeValidatedLedgers()};
std::shared_ptr<ETLLoadBalancer> = balancer{ETLLoadBalancer::makeETLLoadBalancer(
)};
std::shared_ptr<ReportingETL> etl{ReportingETL::makeReportingETL(
)};
// Create and launch a listening port // Create and launch a listening port
std::make_shared<listener>( std::make_shared<listener>(