Update doxygen comments (#818)

Fixes #421
This commit is contained in:
Alex Kremer
2023-08-11 21:32:32 +01:00
committed by GitHub
parent c20b14494a
commit 547cb340bd
206 changed files with 3004 additions and 1937 deletions

View File

@@ -53,47 +53,98 @@ class SubscriptionManager;
namespace etl {
/**
* @brief Base class for all ETL sources
* @brief Base class for all ETL sources.
*
* Note: Since sources below are implemented via CRTP, it sort of makes no sense to have a virtual base class.
* We should consider using a vector of ProbingSources instead of vector of unique ptrs to this virtual base.
*/
class Source
{
public:
/** @return true if source is connected; false otherwise */
virtual bool
isConnected() const = 0;
/** @return JSON representation of the source */
virtual boost::json::object
toJson() const = 0;
/** @brief Runs the source */
virtual void
run() = 0;
/** @brief Request to pause the source (i.e. disconnect and do nothing) */
virtual void
pause() = 0;
/** @brief Reconnect and resume this source */
virtual void
resume() = 0;
/** @return String representation of the source (for debug) */
virtual std::string
toString() const = 0;
/**
* @brief Check if ledger is known by this source.
*
* @param sequence The ledger sequence to check
* @return true if ledger is in the range of this source; false otherwise
*/
virtual bool
hasLedger(uint32_t sequence) const = 0;
/**
* @brief Fetch data for a specific ledger.
*
* This function will continuously try to fetch data for the specified ledger until the fetch succeeds, the ledger
* is found in the database, or the server is shutting down.
*
* @param sequence Sequence of the ledger to fetch
* @param getObjects Whether to get the account state diff between this ledger and the prior one; defaults to true
* @param getObjectNeighbors Whether to request object neighbors; defaults to false
* @return A std::pair of the response status and the response itself
*/
virtual std::pair<grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse>
fetchLedger(uint32_t ledgerSequence, bool getObjects = true, bool getObjectNeighbors = false) = 0;
fetchLedger(uint32_t sequence, bool getObjects = true, bool getObjectNeighbors = false) = 0;
/**
* @brief Download a ledger in full.
*
* @param sequence Sequence of the ledger to download
* @param numMarkers Number of markers to generate for async calls
* @param cacheOnly Only insert into cache, not the DB; defaults to false
* @return A std::pair of the data and a bool indicating whether the download was successfull
*/
virtual std::pair<std::vector<std::string>, bool>
loadInitialLedger(uint32_t sequence, std::uint32_t numMarkers, bool cacheOnly = false) = 0;
/**
* @brief Forward a request to rippled.
*
* @param request The request to forward
* @param clientIp IP of the client forwarding this request
* @param yield The coroutine context
* @return Response wrapped in an optional on success; nullopt otherwise
*/
virtual std::optional<boost::json::object>
forwardToRippled(boost::json::object const& request, std::string const& clientIp, boost::asio::yield_context yield)
const = 0;
/**
* @return A token that uniquely identifies this source instance.
*/
virtual boost::uuids::uuid
token() const = 0;
virtual ~Source() = default;
/**
* @brief Comparison is done via comparing tokens provided by the token() function.
*
* @param other The other source to compare to
* @return true if sources are equal; false otherwise
*/
bool
operator==(Source const& other) const
{
@@ -115,7 +166,7 @@ private:
};
/**
* @brief Hooks for source events such as connects and disconnects
* @brief Hooks for source events such as connects and disconnects.
*/
struct SourceHooks
{
@@ -126,7 +177,9 @@ struct SourceHooks
};
/**
* @brief Base implementation of shared source logic (using CRTP)
* @brief Base implementation of shared source logic.
*
* @tparam Derived The derived class for CRTP
*/
template <class Derived>
class SourceImpl : public Source
@@ -174,25 +227,30 @@ protected:
public:
/**
* @brief Create ETL source without gRPC endpoint
* @brief Create the base portion of ETL source.
*
* Fetch ledger and load initial ledger will fail for this source.
* Primarly used in read-only mode, to monitor when ledgers are validated.
* @param config The configuration to use
* @param ioc The io_context to run on
* @param backend BackendInterface implementation
* @param subscriptions Subscription manager
* @param validatedLedgers The network validated ledgers datastructure
* @param balancer Load balancer to use
* @param hooks Hooks to use for connect/disconnect events
*/
SourceImpl(
util::Config const& config,
boost::asio::io_context& ioContext,
boost::asio::io_context& ioc,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<feed::SubscriptionManager> subscriptions,
std::shared_ptr<NetworkValidatedLedgers> networkValidatedLedgers,
std::shared_ptr<NetworkValidatedLedgers> validatedLedgers,
LoadBalancer& balancer,
SourceHooks hooks)
: networkValidatedLedgers_(networkValidatedLedgers)
: networkValidatedLedgers_(validatedLedgers)
, backend_(backend)
, subscriptions_(subscriptions)
, balancer_(balancer)
, forwardCache_(config, ioContext, *this)
, strand_(boost::asio::make_strand(ioContext))
, forwardCache_(config, ioc, *this)
, strand_(boost::asio::make_strand(ioc))
, timer_(strand_)
, resolver_(strand_)
, hooks_(hooks)
@@ -241,20 +299,6 @@ public:
return uuid_;
}
std::chrono::system_clock::time_point
getLastMsgTime() const
{
std::lock_guard lck(lastMsgTimeMtx_);
return lastMsgTime_;
}
void
setLastMsgTime()
{
std::lock_guard lck(lastMsgTimeMtx_);
lastMsgTime_ = std::chrono::system_clock::now();
}
std::optional<boost::json::object>
requestFromRippled(
boost::json::object const& request,
@@ -337,10 +381,6 @@ public:
}
}
/**
* @param sequence ledger sequence to check for
* @return true if this source has the desired ledger
*/
bool
hasLedger(uint32_t sequence) const override
{
@@ -362,64 +402,8 @@ public:
return false;
}
/**
* @brief Process the validated range received on the ledgers stream. set the appropriate member variable
*
* @param range validated range received on ledgers stream
*/
void
setValidatedRange(std::string const& range)
{
std::vector<std::pair<uint32_t, uint32_t>> pairs;
std::vector<std::string> ranges;
boost::split(ranges, range, boost::is_any_of(","));
for (auto& pair : ranges)
{
std::vector<std::string> minAndMax;
boost::split(minAndMax, pair, boost::is_any_of("-"));
if (minAndMax.size() == 1)
{
uint32_t sequence = std::stoll(minAndMax[0]);
pairs.push_back(std::make_pair(sequence, sequence));
}
else
{
assert(minAndMax.size() == 2);
uint32_t min = std::stoll(minAndMax[0]);
uint32_t max = std::stoll(minAndMax[1]);
pairs.push_back(std::make_pair(min, max));
}
}
std::sort(pairs.begin(), pairs.end(), [](auto left, auto right) { return left.first < right.first; });
// we only hold the lock here, to avoid blocking while string processing
std::lock_guard lck(mtx_);
validatedLedgers_ = std::move(pairs);
validatedLedgersRaw_ = range;
}
/**
* @return the validated range of this source
* @note this is only used by server_info
*/
std::string
getValidatedRange() const
{
std::lock_guard lck(mtx_);
return validatedLedgersRaw_;
}
/**
* @brief Fetch the specified ledger
*
* @param ledgerSequence sequence of the ledger to fetch @getObjects whether to get the account state diff between
* this ledger and the prior one
* @return the extracted data and the result status
*/
std::pair<grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse>
fetchLedger(uint32_t ledgerSequence, bool getObjects = true, bool getObjectNeighbors = false) override
fetchLedger(uint32_t sequence, bool getObjects = true, bool getObjectNeighbors = false) override
{
org::xrpl::rpc::v1::GetLedgerResponse response;
if (!stub_)
@@ -429,7 +413,7 @@ public:
org::xrpl::rpc::v1::GetLedgerRequest request;
grpc::ClientContext context;
request.mutable_ledger()->set_sequence(ledgerSequence);
request.mutable_ledger()->set_sequence(sequence);
request.set_transactions(true);
request.set_expand(true);
request.set_get_objects(getObjects);
@@ -448,9 +432,6 @@ public:
return {status, std::move(response)};
}
/**
* @brief Produces a human-readable string with info about the source
*/
std::string
toString() const override
{
@@ -458,10 +439,6 @@ public:
", grpc port: " + grpcPort_ + "}";
}
/**
* @brief Produces stats for this source in a json object
* @return json object with stats
*/
boost::json::object
toJson() const override
{
@@ -482,15 +459,8 @@ public:
return res;
}
/**
* @brief Download a ledger in full
*
* @param ledgerSequence sequence of the ledger to download
* @param writeQueue queue to push downloaded ledger objects
* @return true if the download was successful
*/
std::pair<std::vector<std::string>, bool>
loadInitialLedger(std::uint32_t ledgerSequence, std::uint32_t numMarkers, bool cacheOnly = false) override
loadInitialLedger(std::uint32_t sequence, std::uint32_t numMarkers, bool cacheOnly = false) override
{
if (!stub_)
return {{}, false};
@@ -508,10 +478,10 @@ public:
if (i + 1 < markers.size())
nextMarker = markers[i + 1];
calls.emplace_back(ledgerSequence, markers[i], nextMarker);
calls.emplace_back(sequence, markers[i], nextMarker);
}
log_.debug() << "Starting data download for ledger " << ledgerSequence << ". Using source = " << toString();
log_.debug() << "Starting data download for ledger " << sequence << ". Using source = " << toString();
for (auto& c : calls)
c.call(stub_, cq);
@@ -564,60 +534,19 @@ public:
return {std::move(edgeKeys), !abort};
}
/**
* @brief Attempt to reconnect to the ETL source
*/
void
reconnect(boost::beast::error_code ec)
std::optional<boost::json::object>
forwardToRippled(boost::json::object const& request, std::string const& clientIp, boost::asio::yield_context yield)
const override
{
if (paused_)
return;
if (isConnected())
hooks_.onDisconnected(ec);
connected_ = false;
readBuffer_ = {};
// These are somewhat normal errors. operation_aborted occurs on shutdown,
// when the timer is cancelled. connection_refused will occur repeatedly
std::string err = ec.message();
// if we cannot connect to the transaction processing process
if (ec.category() == boost::asio::error::get_ssl_category())
if (auto resp = forwardCache_.get(request); resp)
{
err = std::string(" (") + boost::lexical_cast<std::string>(ERR_GET_LIB(ec.value())) + "," +
boost::lexical_cast<std::string>(ERR_GET_REASON(ec.value())) + ") ";
// ERR_PACK /* crypto/err/err.h */
char buf[128];
::ERR_error_string_n(ec.value(), buf, sizeof(buf));
err += buf;
log_.error() << err;
log_.debug() << "request hit forwardCache";
return resp;
}
if (ec != boost::asio::error::operation_aborted && ec != boost::asio::error::connection_refused)
{
log_.error() << "error code = " << ec << " - " << toString();
}
else
{
log_.warn() << "error code = " << ec << " - " << toString();
}
// exponentially increasing timeouts, with a max of 30 seconds
size_t waitTime = std::min(pow(2, numFailures_), 30.0);
numFailures_++;
timer_.expires_after(boost::asio::chrono::seconds(waitTime));
timer_.async_wait([this](auto ec) {
bool startAgain = (ec != boost::asio::error::operation_aborted);
derived().close(startAgain);
});
return requestFromRippled(request, clientIp, yield);
}
/**
* @brief Pause the source effectively stopping it from trying to reconnect
*/
void
pause() override
{
@@ -625,9 +554,6 @@ public:
derived().close(false);
}
/**
* @brief Resume the source allowing it to reconnect again
*/
void
resume() override
{
@@ -636,7 +562,10 @@ public:
}
/**
* @brief Callback for resolving the server host
* @brief Callback for resolving the server host.
*
* @param ec The error code
* @param results Result of the resolve operation
*/
void
onResolve(boost::beast::error_code ec, boost::asio::ip::tcp::resolver::results_type results)
@@ -656,7 +585,9 @@ public:
}
/**
* @brief Callback for handshake with the server
* @brief Callback for handshake with the server.
*
* @param ec The error code
*/
void
onHandshake(boost::beast::error_code ec)
@@ -692,10 +623,13 @@ public:
}
/**
* @brief Callback for writing data
* @brief Callback for writing data.
*
* @param ec The error code
* @param size Amount of bytes written
*/
void
onWrite(boost::beast::error_code ec, size_t size)
onWrite(boost::beast::error_code ec, [[maybe_unused]] size_t size)
{
if (ec)
reconnect(ec);
@@ -704,7 +638,10 @@ public:
}
/**
* @brief Callback for data available to read
* @brief Callback for data available to read.
*
* @param ec The error code
* @param size Amount of bytes read
*/
void
onRead(boost::beast::error_code ec, size_t size)
@@ -721,8 +658,10 @@ public:
}
/**
* @brief Handle the most recently received message
* @return true if the message was handled successfully. false on error
* @brief Handle the most recently received message.
*
* @param size Amount of bytes available in the read buffer
* @return true if the message was handled successfully; false otherwise
*/
bool
handleMessage(size_t size)
@@ -802,23 +741,6 @@ public:
}
}
/**
* @brief Forward a request to rippled
* @return response wrapped in an optional on success; nullopt otherwise
*/
std::optional<boost::json::object>
forwardToRippled(boost::json::object const& request, std::string const& clientIp, boost::asio::yield_context yield)
const override
{
if (auto resp = forwardCache_.get(request); resp)
{
log_.debug() << "request hit forwardCache";
return resp;
}
return requestFromRippled(request, clientIp, yield);
}
protected:
Derived&
derived()
@@ -831,47 +753,172 @@ protected:
{
resolver_.async_resolve(ip_, wsPort_, [this](auto ec, auto results) { onResolve(ec, results); });
}
void
reconnect(boost::beast::error_code ec)
{
if (paused_)
return;
if (isConnected())
hooks_.onDisconnected(ec);
connected_ = false;
readBuffer_ = {};
// These are somewhat normal errors. operation_aborted occurs on shutdown,
// when the timer is cancelled. connection_refused will occur repeatedly
std::string err = ec.message();
// if we cannot connect to the transaction processing process
if (ec.category() == boost::asio::error::get_ssl_category())
{
err = std::string(" (") + boost::lexical_cast<std::string>(ERR_GET_LIB(ec.value())) + "," +
boost::lexical_cast<std::string>(ERR_GET_REASON(ec.value())) + ") ";
// ERR_PACK /* crypto/err/err.h */
char buf[128];
::ERR_error_string_n(ec.value(), buf, sizeof(buf));
err += buf;
log_.error() << err;
}
if (ec != boost::asio::error::operation_aborted && ec != boost::asio::error::connection_refused)
{
log_.error() << "error code = " << ec << " - " << toString();
}
else
{
log_.warn() << "error code = " << ec << " - " << toString();
}
// exponentially increasing timeouts, with a max of 30 seconds
size_t waitTime = std::min(pow(2, numFailures_), 30.0);
numFailures_++;
timer_.expires_after(boost::asio::chrono::seconds(waitTime));
timer_.async_wait([this](auto ec) {
bool startAgain = (ec != boost::asio::error::operation_aborted);
derived().close(startAgain);
});
}
private:
void
setLastMsgTime()
{
std::lock_guard lck(lastMsgTimeMtx_);
lastMsgTime_ = std::chrono::system_clock::now();
}
std::chrono::system_clock::time_point
getLastMsgTime() const
{
std::lock_guard lck(lastMsgTimeMtx_);
return lastMsgTime_;
}
void
setValidatedRange(std::string const& range)
{
std::vector<std::pair<uint32_t, uint32_t>> pairs;
std::vector<std::string> ranges;
boost::split(ranges, range, boost::is_any_of(","));
for (auto& pair : ranges)
{
std::vector<std::string> minAndMax;
boost::split(minAndMax, pair, boost::is_any_of("-"));
if (minAndMax.size() == 1)
{
uint32_t sequence = std::stoll(minAndMax[0]);
pairs.push_back(std::make_pair(sequence, sequence));
}
else
{
assert(minAndMax.size() == 2);
uint32_t min = std::stoll(minAndMax[0]);
uint32_t max = std::stoll(minAndMax[1]);
pairs.push_back(std::make_pair(min, max));
}
}
std::sort(pairs.begin(), pairs.end(), [](auto left, auto right) { return left.first < right.first; });
// we only hold the lock here, to avoid blocking while string processing
std::lock_guard lck(mtx_);
validatedLedgers_ = std::move(pairs);
validatedLedgersRaw_ = range;
}
std::string
getValidatedRange() const
{
std::lock_guard lck(mtx_);
return validatedLedgersRaw_;
}
};
/**
* @brief Implementation of a source that uses a regular, non-secure websocket connection.
*/
class PlainSource : public SourceImpl<PlainSource>
{
using StreamType = boost::beast::websocket::stream<boost::beast::tcp_stream>;
std::unique_ptr<StreamType> ws_;
public:
/**
* @brief Create a non-secure ETL source.
*
* @param config The configuration to use
* @param ioc The io_context to run on
* @param backend BackendInterface implementation
* @param subscriptions Subscription manager
* @param validatedLedgers The network validated ledgers datastructure
* @param balancer Load balancer to use
* @param hooks Hooks to use for connect/disconnect events
*/
PlainSource(
util::Config const& config,
boost::asio::io_context& ioc,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<feed::SubscriptionManager> subscriptions,
std::shared_ptr<NetworkValidatedLedgers> nwvl,
std::shared_ptr<NetworkValidatedLedgers> validatedLedgers,
LoadBalancer& balancer,
SourceHooks hooks)
: SourceImpl(config, ioc, backend, subscriptions, nwvl, balancer, std::move(hooks))
: SourceImpl(config, ioc, backend, subscriptions, validatedLedgers, balancer, std::move(hooks))
, ws_(std::make_unique<StreamType>(strand_))
{
}
/**
* @brief Callback for connection to the server
* @brief Callback for connection to the server.
*
* @param ec The error code
* @param endpoint The resolved endpoint
*/
void
onConnect(boost::beast::error_code ec, boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint);
/**
* @brief Close the websocket
* @param startAgain whether to reconnect
* @brief Close the websocket.
*
* @param startAgain Whether to automatically reconnect
*/
void
close(bool startAgain);
boost::beast::websocket::stream<boost::beast::tcp_stream>&
/** @return The underlying TCP stream */
StreamType&
ws()
{
return *ws_;
}
};
/**
* @brief Implementation of a source that uses a secure websocket connection.
*/
class SslSource : public SourceImpl<SslSource>
{
using StreamType = boost::beast::websocket::stream<boost::beast::ssl_stream<boost::beast::tcp_stream>>;
@@ -879,44 +926,64 @@ class SslSource : public SourceImpl<SslSource>
std::unique_ptr<StreamType> ws_;
public:
/**
* @brief Create a secure ETL source.
*
* @param config The configuration to use
* @param ioc The io_context to run on
* @param sslCtx The SSL context if any
* @param backend BackendInterface implementation
* @param subscriptions Subscription manager
* @param validatedLedgers The network validated ledgers datastructure
* @param balancer Load balancer to use
* @param hooks Hooks to use for connect/disconnect events
*/
SslSource(
util::Config const& config,
boost::asio::io_context& ioc,
std::optional<std::reference_wrapper<boost::asio::ssl::context>> sslCtx,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<feed::SubscriptionManager> subscriptions,
std::shared_ptr<NetworkValidatedLedgers> nwvl,
std::shared_ptr<NetworkValidatedLedgers> validatedLedgers,
LoadBalancer& balancer,
SourceHooks hooks)
: SourceImpl(config, ioc, backend, subscriptions, nwvl, balancer, std::move(hooks))
: SourceImpl(config, ioc, backend, subscriptions, validatedLedgers, balancer, std::move(hooks))
, sslCtx_(sslCtx)
, ws_(std::make_unique<StreamType>(strand_, *sslCtx_))
{
}
/**
* @brief Callback for connection to the server
* @brief Callback for connection to the server.
*
* @param ec The error code
* @param endpoint The resolved endpoint
*/
void
onConnect(boost::beast::error_code ec, boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint);
/**
* @brief Callback for SSL handshake completion
* @brief Callback for SSL handshake completion.
*
* @param ec The error code
* @param endpoint The resolved endpoint
*/
void
onSslHandshake(boost::beast::error_code ec, boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint);
/**
* @brief Close the websocket
* @param startAgain whether to reconnect
* @brief Close the websocket.
*
* @param startAgain Whether to automatically reconnect
*/
void
close(bool startAgain);
/** @return The underlying SSL stream */
StreamType&
ws()
{
return *ws_;
}
};
} // namespace etl
} // namespace etl