wrap up refactor

This commit is contained in:
Nathan Nichols
2021-05-19 23:16:02 -05:00
parent fef8f6f223
commit cb045ad581
11 changed files with 258 additions and 240 deletions

View File

@@ -1,30 +1,48 @@
#ifndef RIPPLE_APP_REPORTING_BACKENDFACTORY_H_INCLUDED
#define RIPPLE_APP_REPORTING_BACKENDFACTORY_H_INCLUDED
#include <boost/algorithm/string.hpp>
#include <reporting/BackendInterface.h>
#include <reporting/CassandraBackend.h>
#include <reporting/PostgresBackend.h>
namespace Backend {
std::unique_ptr<BackendInterface>
makeBackend(boost::json::object const& config)
make_Backend(boost::json::object const& config)
{
BOOST_LOG_TRIVIAL(info) << __func__ << ": Constructing BackendInterface";
boost::json::object const& dbConfig = config.at("database").as_object();
bool readOnly = false;
if (config.contains("read_only"))
readOnly = config.at("read_only").as_bool();
auto type = dbConfig.at("type").as_string();
std::unique_ptr<BackendInterface> backend = nullptr;
if (boost::iequals(type, "cassandra"))
{
auto backend =
backend =
std::make_unique<CassandraBackend>(dbConfig.at(type).as_object());
return std::move(backend);
}
else if (boost::iequals(type, "postgres"))
{
auto backend =
backend =
std::make_unique<PostgresBackend>(dbConfig.at(type).as_object());
return std::move(backend);
}
return nullptr;
if (!backend)
throw std::runtime_error("Invalid database type");
backend->open(readOnly);
BOOST_LOG_TRIVIAL(info) << __func__
<< ": Constructed BackendInterface Successfully";
return backend;
}
} // namespace Backend
#endif
#endif //RIPPLE_REPORTING_BACKEND_FACTORY

View File

@@ -1124,6 +1124,8 @@ CassandraBackend::open(bool readOnly)
return;
}
BOOST_LOG_TRIVIAL(info) << "Opening Cassandra Backend";
std::lock_guard<std::mutex> lock(mutex_);
CassCluster* cluster = cass_cluster_new();
if (!cluster)
@@ -1480,7 +1482,6 @@ CassandraBackend::open(bool readOnly)
<< " (book, sequence, quality_key) VALUES (?, ?, (?, ?))";
if (!insertBook2_.prepareStatement(query, session_.get()))
continue;
query.str("");
query.str("");
query << "SELECT key FROM " << tablePrefix << "keys"
@@ -1533,16 +1534,6 @@ CassandraBackend::open(bool readOnly)
if (!selectLedgerPage_.prepareStatement(query, session_.get()))
continue;
/*
query.str("");
query << "SELECT filterempty(key,object) FROM " << tablePrefix <<
"objects "
<< " WHERE TOKEN(key) >= ? and sequence <= ?"
<< " PER PARTITION LIMIT 1 LIMIT ?"
<< " ALLOW FILTERING";
if (!upperBound2_.prepareStatement(query, session_.get()))
continue;
*/
query.str("");
query << "SELECT TOKEN(key) FROM " << tablePrefix << "objects "
<< " WHERE key = ? LIMIT 1";
@@ -1628,42 +1619,6 @@ CassandraBackend::open(bool readOnly)
setupPreparedStatements = true;
}
/*
while (true)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
if (!fetchLatestLedgerSequence())
{
std::stringstream query;
query << "TRUNCATE TABLE " << tablePrefix << "ledger_range";
if (!executeSimpleStatement(query.str()))
continue;
query.str("");
query << "TRUNCATE TABLE " << tablePrefix << "ledgers";
if (!executeSimpleStatement(query.str()))
continue;
query.str("");
query << "TRUNCATE TABLE " << tablePrefix << "ledger_hashes";
if (!executeSimpleStatement(query.str()))
continue;
query.str("");
query << "TRUNCATE TABLE " << tablePrefix << "objects";
if (!executeSimpleStatement(query.str()))
continue;
query.str("");
query << "TRUNCATE TABLE " << tablePrefix << "transactions";
if (!executeSimpleStatement(query.str()))
continue;
query.str("");
query << "TRUNCATE TABLE " << tablePrefix << "account_tx";
if (!executeSimpleStatement(query.str()))
continue;
query.str("");
}
break;
}
*/
if (config_.contains("max_requests_outstanding"))
{
maxRequestsOutstanding = config_["max_requests_outstanding"].as_int64();
@@ -1673,33 +1628,11 @@ CassandraBackend::open(bool readOnly)
indexerMaxRequestsOutstanding =
config_["indexer_max_requests_outstanding"].as_int64();
}
/*
if (config_.contains("run_indexer"))
{
if (config_["run_indexer"].as_bool())
{
if (config_.contains("indexer_shift"))
{
indexerShift_ = config_["indexer_shift"].as_int64();
}
indexer_ = std::thread{[this]() {
auto seq = getNextToIndex();
if (seq)
{
BOOST_LOG_TRIVIAL(info)
<< "Running indexer. Ledger = " << std::to_string(*seq);
runIndexer(*seq);
BOOST_LOG_TRIVIAL(info) << "Ran indexer";
}
}};
}
}
*/
work_.emplace(ioContext_);
ioThread_ = std::thread{[this]() { ioContext_.run(); }};
open_ = true;
BOOST_LOG_TRIVIAL(info) << "Opened database successfully";
BOOST_LOG_TRIVIAL(info) << "Opened CassandraBackend successfully";
} // namespace Backend
} // namespace Backend

View File

@@ -46,7 +46,7 @@ class NetworkValidatedLedgers
public:
static std::shared_ptr<NetworkValidatedLedgers>
makeValidatedLedgers()
make_ValidatedLedgers()
{
return std::make_shared<NetworkValidatedLedgers>();
}

View File

@@ -34,11 +34,11 @@
// Primarly used in read-only mode, to monitor when ledgers are validated
ETLSource::ETLSource(
boost::json::object const& config,
boost::asio::io_context& ioContext,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
std::shared_ptr<NetworkValidatedLedgers> networkValidatedLedgers,
boost::asio::io_context& ioContext)
ETLLoadBalancer& balancer)
: ioc_(ioContext)
, ws_(std::make_unique<
boost::beast::websocket::stream<boost::beast::tcp_stream>>(
@@ -141,28 +141,16 @@ ETLSource::close(bool startAgain)
}
closing_ = false;
if (startAgain)
start();
run();
});
}
else if (startAgain)
{
start();
run();
}
});
}
void
ETLSource::start()
{
BOOST_LOG_TRIVIAL(trace) << __func__ << " : " << toString();
auto const host = ip_;
auto const port = wsPort_;
resolver_.async_resolve(
host, port, [this](auto ec, auto results) { onResolve(ec, results); });
}
void
ETLSource::onResolve(
boost::beast::error_code ec,
@@ -332,7 +320,7 @@ ETLSource::handleMessage()
{
if (response.contains("transaction"))
{
if (balancer_->shouldPropagateTxnStream(this))
if (balancer_.shouldPropagateTxnStream(this))
{
subscriptions_->forwardProposedTransaction(response);
}
@@ -582,14 +570,22 @@ ETLSource::fetchLedger(uint32_t ledgerSequence, bool getObjects)
ETLLoadBalancer::ETLLoadBalancer(
boost::json::array const& config,
boost::asio::io_context& ioContext,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<NetworkValidatedLedgers> nwvl,
boost::asio::io_context& ioContext)
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<NetworkValidatedLedgers> nwvl)
{
for (auto& entry : config)
{
std::unique_ptr<ETLSource> source = std::make_unique<ETLSource>(
entry.as_object(), backend, nwvl, ioContext);
std::unique_ptr<ETLSource> source = ETLSource::make_ETLSource(
entry.as_object(),
ioContext,
backend,
subscriptions,
nwvl,
*this
);
sources_.push_back(std::move(source));
BOOST_LOG_TRIVIAL(info) << __func__ << " : added etl source - "
<< sources_.back()->toString();
@@ -864,16 +860,3 @@ ETLLoadBalancer::execute(Func f, uint32_t ledgerSequence)
return true;
}
void
ETLLoadBalancer::start()
{
for (auto& source : sources_)
source->start();
}
void
ETLLoadBalancer::stop()
{
for (auto& source : sources_)
source->stop();
}

View File

@@ -91,12 +91,51 @@ class ETLSource
boost::asio::steady_timer timer_;
std::shared_ptr<BackendInterface> backend_;
std::shared_ptr<SubscriptionManager> subscriptions_;
ETLLoadBalancer& balancer_;
std::shared_ptr<ETLLoadBalancer> balancer_;
void
run()
{
BOOST_LOG_TRIVIAL(trace) << __func__ << " : " << toString();
auto const host = ip_;
auto const port = wsPort_;
resolver_.async_resolve(
host, port, [this](auto ec, auto results) { onResolve(ec, results); });
}
public:
static std::unique_ptr<ETLSource>
make_ETLSource(
boost::json::object const& config,
boost::asio::io_context& ioContext,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<NetworkValidatedLedgers> networkValidatedLedgers,
ETLLoadBalancer& balancer)
{
std::unique_ptr<ETLSource> src = std::make_unique<ETLSource>(
config,
ioContext,
backend,
subscriptions,
networkValidatedLedgers,
balancer
);
src->run();
return src;
}
~ETLSource()
{
close(false);
}
bool
isConnected() const
{
@@ -122,11 +161,11 @@ public:
/// Primarly used in read-only mode, to monitor when ledgers are validated
ETLSource(
boost::json::object const& config,
boost::asio::io_context& ioContext,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
std::shared_ptr<NetworkValidatedLedgers> networkValidatedLedgers,
boost::asio::io_context& ioContext);
ETLLoadBalancer& balancer);
/// @param sequence ledger sequence to check for
/// @return true if this source has the desired ledger
@@ -199,14 +238,6 @@ public:
return validatedLedgersRaw_;
}
/// Close the underlying websocket
void
stop()
{
assert(ws_);
close(false);
}
/// Fetch the specified ledger
/// @param ledgerSequence sequence of the ledger to fetch
/// @getObjects whether to get the account state diff between this ledger
@@ -236,11 +267,6 @@ public:
bool
loadInitialLedger(uint32_t ledgerSequence);
/// Begin sequence of operations to connect to the ETL source and subscribe
/// to ledgers and transactions_proposed
void
start();
/// Attempt to reconnect to the ETL source
void
reconnect(boost::beast::error_code ec);
@@ -294,30 +320,38 @@ public:
/// the network, and the range of ledgers each etl source has). This class also
/// allows requests for ledger data to be load balanced across all possible etl
/// sources.
class ETLLoadBalancer
class ETLLoadBalancer : std::enable_shared_from_this<ETLLoadBalancer>
{
private:
std::vector<std::unique_ptr<ETLSource>> sources_;
public:
ETLLoadBalancer(
boost::json::array const& config,
boost::asio::io_context& ioContext,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<NetworkValidatedLedgers> nwvl,
boost::asio::io_context& ioContext);
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<NetworkValidatedLedgers> nwvl);
public:
static std::shared_ptr<ETLLoadBalancer>
makeETLLoadBalancer(
make_ETLLoadBalancer(
boost::json::object const& config,
boost::asio::io_context& ioc,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<NetworkValidatedLedgers> validatedLedgers,
boost::asio::io_context& ioc)
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<NetworkValidatedLedgers> validatedLedgers)
{
return std::make_shared<ETLLoadBalancer>(
config.at("etl_sources").as_array(),
ioc,
backend,
validatedLedgers,
ioc);
subscriptions,
validatedLedgers);
}
~ETLLoadBalancer()
{
sources_.clear();
}
/// Load the initial ledger, writing data to the queue
@@ -338,13 +372,6 @@ public:
std::optional<org::xrpl::rpc::v1::GetLedgerResponse>
fetchLedger(uint32_t ledgerSequence, bool getObjects);
/// Setup all of the ETL sources and subscribe to the necessary streams
void
start();
void
stop();
/// Determine whether messages received on the transactions_proposed stream
/// should be forwarded to subscribing clients. The server subscribes to
/// transactions_proposed on multiple ETLSources, yet only forwards messages

View File

@@ -18,7 +18,6 @@
//==============================================================================
#include <ripple/basics/StringUtilities.h>
#include <reporting/BackendFactory.h>
#include <reporting/DBHelpers.h>
#include <reporting/ReportingETL.h>
@@ -757,12 +756,14 @@ ReportingETL::ReportingETL(
boost::asio::io_context& ioc,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer)
std::shared_ptr<ETLLoadBalancer> balancer,
std::shared_ptr<NetworkValidatedLedgers> ledgers)
: publishStrand_(ioc)
, ioContext_(ioc)
, backend_(backend)
, subscriptions_(subscriptions)
, loadBalancer_(balancer)
, networkValidatedLedgers_(ledgers)
{
if (config.contains("start_sequence"))
startSequence_ = config.at("start_sequence").as_int64();
@@ -776,6 +777,5 @@ ReportingETL::ReportingETL(
extractorThreads_ = config.at("extractor_threads").as_int64();
if (config.contains("txn_threshold"))
txnThreshold_ = config.at("txn_threshold").as_int64();
backend_->open(readOnly_);
}

View File

@@ -66,7 +66,6 @@ private:
std::optional<uint32_t> onlineDeleteInterval_;
uint32_t extractorThreads_ = 1;
std::thread worker_;
boost::asio::io_context& ioContext_;
@@ -256,29 +255,60 @@ private:
std::optional<ripple::Fees>
getFees(std::uint32_t seq);
bool
isStopping()
{
return stopping_;
}
/// Get the number of markers to use during the initial ledger download.
/// This is equivelent to the degree of parallelism during the initial
/// ledger download
/// @return the number of markers
uint32_t
getNumMarkers()
{
return numMarkers_;
}
void
run()
{
BOOST_LOG_TRIVIAL(info) << "Starting reporting etl";
stopping_ = false;
doWork();
}
void
doWork();
public:
ReportingETL(
boost::json::object const& config,
boost::asio::io_context& ioc,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer);
public:
std::shared_ptr<ETLLoadBalancer> balancer,
std::shared_ptr<NetworkValidatedLedgers> ledgers);
static std::shared_ptr<ReportingETL>
makeReportingETL(
make_ReportingETL(
boost::json::object const& config,
boost::asio::io_context& ioc,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer)
std::shared_ptr<ETLLoadBalancer> balancer,
std::shared_ptr<NetworkValidatedLedgers> ledgers)
{
auto etl = std::make_shared<ReportingETL>(
config,
ioc,
backend,
subscriptions,
balancer);
balancer,
ledgers);
etl->run();
@@ -296,44 +326,6 @@ public:
BOOST_LOG_TRIVIAL(debug) << "Joined ReportingETL worker thread";
}
bool
isStopping()
{
return stopping_;
}
/// Get the number of markers to use during the initial ledger download.
/// This is equivelent to the degree of parallelism during the initial
/// ledger download
/// @return the number of markers
uint32_t
getNumMarkers()
{
return numMarkers_;
}
/// start all of the necessary components and begin ETL
void
run()
{
BOOST_LOG_TRIVIAL(info) << "Starting reporting etl";
stopping_ = false;
loadBalancer_->start();
doWork();
}
void
onStop()
{
}
private:
void
doWork();
};
#endif

View File

@@ -46,7 +46,7 @@ class SubscriptionManager
public:
static std::shared_ptr<SubscriptionManager>
makeSubscriptionManager()
make_SubscriptionManager()
{
return std::make_shared<SubscriptionManager>();
}

View File

@@ -36,16 +36,39 @@ class listener : public std::enable_shared_from_this<listener>
{
boost::asio::io_context& ioc_;
boost::asio::ip::tcp::acceptor acceptor_;
ReportingETL& etl_;
std::shared_ptr<BackendInterface> backend_;
std::shared_ptr<SubscriptionManager> subscriptions_;
std::shared_ptr<ETLLoadBalancer> balancer_;
public:
static void
make_listener(
boost::asio::io_context& ioc,
boost::asio::ip::tcp::endpoint endpoint,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer)
{
std::make_shared<listener>(
ioc,
endpoint,
backend,
subscriptions,
balancer
)->run();
}
listener(
boost::asio::io_context& ioc,
boost::asio::ip::tcp::endpoint endpoint,
ReportingETL& etl)
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer)
: ioc_(ioc)
, acceptor_(ioc)
, etl_(etl)
, backend_(backend)
, subscriptions_(subscriptions)
, balancer_(balancer)
{
boost::beast::error_code ec;
@@ -82,14 +105,16 @@ public:
}
}
// Start accepting incoming connections
~listener() = default;
private:
void
run()
{
do_accept();
}
private:
void
do_accept()
{
@@ -109,8 +134,7 @@ private:
}
else
{
// Create the session and run it
std::make_shared<session>(std::move(socket), etl_)->run();
session::make_session(std::move(socket), backend_, subscriptions_, balancer_);
}
// Accept another connection

View File

@@ -189,6 +189,35 @@ public:
{
}
static void
make_session(
boost::asio::ip::tcp::socket&& socket,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer)
{
std::make_shared<session>(
std::move(socket),
backend,
subscriptions,
balancer
)->run();
}
~session() = default;
void
send(std::string&& msg)
{
ws_.text(ws_.got_text());
ws_.async_write(
boost::asio::buffer(msg),
boost::beast::bind_front_handler(
&session::on_write, shared_from_this()));
}
private:
// Get on the correct executor
void
run()
@@ -296,16 +325,6 @@ public:
&session::on_write, shared_from_this()));
}
void
send(std::string&& msg)
{
ws_.text(ws_.got_text());
ws_.async_write(
boost::asio::buffer(msg),
boost::beast::bind_front_handler(
&session::on_write, shared_from_this()));
}
void
on_write(boost::beast::error_code ec, std::size_t bytes_transferred)
{

View File

@@ -29,6 +29,7 @@
#include <iostream>
#include <memory>
#include <reporting/ReportingETL.h>
#include <reporting/BackendFactory.h>
#include <reporting/server/session.h>
#include <reporting/server/listener.h>
#include <sstream>
@@ -41,11 +42,9 @@ parse_config(const char* filename)
{
try
{
std::cout << "TRYING" << std::endl;
std::ifstream in(filename, std::ios::in | std::ios::binary);
if (in)
{
std::cout << "GOT IN" << std::endl;
std::stringstream contents;
contents << in.rdbuf();
in.close();
@@ -97,6 +96,17 @@ initLogLevel(int level)
}
}
void
start(boost::asio::io_context& ioc, std::uint32_t numThreads)
{
std::vector<std::thread> v;
v.reserve(numThreads - 1);
for (auto i = numThreads - 1; i > 0; --i)
v.emplace_back([&ioc] { ioc.run(); });
ioc.run();
}
int
main(int argc, char* argv[])
{
@@ -131,35 +141,47 @@ main(int argc, char* argv[])
// The io_context is required for all I/O
boost::asio::io_context ioc{threads};
std::shared_ptr<BackendInterface> backend{BackendInterface::makeBackend(config)};
std::shared_ptr<SubscriptionManager> subscriptions{SubscriptionManager::makeSubscriptionManager()};
std::shared_ptr<NetworkValidatedLedgers> ledgers{NetworkValidatedLedgers::makeValidatedLedgers()};
std::shared_ptr<ETLLoadBalancer> = balancer{ETLLoadBalancer::makeETLLoadBalancer(
std::shared_ptr<BackendInterface> backend{
Backend::make_Backend(*config)
};
std::shared_ptr<SubscriptionManager> subscriptions{
SubscriptionManager::make_SubscriptionManager()
};
std::shared_ptr<NetworkValidatedLedgers> ledgers{
NetworkValidatedLedgers::make_ValidatedLedgers()
};
std::shared_ptr<ETLLoadBalancer> balancer{ETLLoadBalancer::make_ETLLoadBalancer(
*config,
ioc,
backend,
subscriptions,
ledgers
)};
std::shared_ptr<ReportingETL> etl{ReportingETL::makeReportingETL(
std::shared_ptr<ReportingETL> etl{ReportingETL::make_ReportingETL(
*config,
ioc,
backend,
subscriptions,
balancer,
ledgers
)};
// Create and launch a listening port
std::make_shared<listener>(
listener::make_listener(
ioc,
boost::asio::ip::tcp::endpoint{address, port},
etl)
->run();
backend,
subscriptions,
balancer
);
// Run the I/O service on the requested number of threads
std::vector<std::thread> v;
v.reserve(threads - 1);
for (auto i = threads - 1; i > 0; --i)
v.emplace_back([&ioc] { ioc.run(); });
std::cout << "created ETL" << std::endl;
etl.run();
std::cout << "running ETL" << std::endl;
ioc.run();
// Blocks until stopped.
// When stopped, shared_ptrs fall out of scope
// Calls destructors on all resources, and destructs in order
start(ioc, threads);
return EXIT_SUCCESS;
}