mirror of
https://github.com/XRPLF/clio.git
synced 2025-12-06 01:08:01 +00:00
108
src/etl/Source.h
108
src/etl/Source.h
@@ -38,11 +38,11 @@
|
||||
#include <boost/uuid/uuid.hpp>
|
||||
#include <boost/uuid/uuid_generators.hpp>
|
||||
#include <grpcpp/grpcpp.h>
|
||||
#include <utility>
|
||||
|
||||
class ProbingSource;
|
||||
namespace feed {
|
||||
class SubscriptionManager;
|
||||
}
|
||||
} // namespace feed
|
||||
|
||||
// TODO: we use Source so that we can store a vector of Sources
|
||||
// but we also use CRTP for implementation of the common logic - this is a bit strange because CRTP as used here is
|
||||
@@ -51,6 +51,7 @@ class SubscriptionManager;
|
||||
// things into the base class instead.
|
||||
|
||||
namespace etl {
|
||||
class ProbingSource;
|
||||
|
||||
/**
|
||||
* @brief Base class for all ETL sources.
|
||||
@@ -206,7 +207,7 @@ class SourceImpl : public Source
|
||||
LoadBalancer& balancer_;
|
||||
|
||||
etl::detail::ForwardCache forwardCache_;
|
||||
boost::uuids::uuid uuid_;
|
||||
boost::uuids::uuid uuid_{};
|
||||
|
||||
protected:
|
||||
std::string ip_;
|
||||
@@ -245,15 +246,15 @@ public:
|
||||
std::shared_ptr<NetworkValidatedLedgers> validatedLedgers,
|
||||
LoadBalancer& balancer,
|
||||
SourceHooks hooks)
|
||||
: networkValidatedLedgers_(validatedLedgers)
|
||||
, backend_(backend)
|
||||
, subscriptions_(subscriptions)
|
||||
: networkValidatedLedgers_(std::move(validatedLedgers))
|
||||
, backend_(std::move(backend))
|
||||
, subscriptions_(std::move(subscriptions))
|
||||
, balancer_(balancer)
|
||||
, forwardCache_(config, ioc, *this)
|
||||
, strand_(boost::asio::make_strand(ioc))
|
||||
, timer_(strand_)
|
||||
, resolver_(strand_)
|
||||
, hooks_(hooks)
|
||||
, hooks_(std::move(hooks))
|
||||
{
|
||||
static boost::uuids::random_generator uuidGenerator;
|
||||
uuid_ = uuidGenerator();
|
||||
@@ -266,7 +267,7 @@ public:
|
||||
grpcPort_ = *value;
|
||||
try
|
||||
{
|
||||
boost::asio::ip::tcp::endpoint endpoint{boost::asio::ip::make_address(ip_), std::stoi(grpcPort_)};
|
||||
boost::asio::ip::tcp::endpoint const endpoint{boost::asio::ip::make_address(ip_), std::stoi(grpcPort_)};
|
||||
std::stringstream ss;
|
||||
ss << endpoint;
|
||||
grpc::ChannelArguments chArgs;
|
||||
@@ -282,7 +283,7 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
~SourceImpl()
|
||||
~SourceImpl() override
|
||||
{
|
||||
derived().close(false);
|
||||
}
|
||||
@@ -316,7 +317,7 @@ public:
|
||||
}
|
||||
|
||||
namespace beast = boost::beast;
|
||||
namespace http = beast::http;
|
||||
namespace http = boost::beast::http;
|
||||
namespace websocket = beast::websocket;
|
||||
namespace net = boost::asio;
|
||||
using tcp = boost::asio::ip::tcp;
|
||||
@@ -324,7 +325,7 @@ public:
|
||||
try
|
||||
{
|
||||
auto executor = boost::asio::get_associated_executor(yield);
|
||||
boost::beast::error_code ec;
|
||||
beast::error_code ec;
|
||||
tcp::resolver resolver{executor};
|
||||
|
||||
auto ws = std::make_unique<websocket::stream<beast::tcp_stream>>(executor);
|
||||
@@ -384,14 +385,14 @@ public:
|
||||
bool
|
||||
hasLedger(uint32_t sequence) const override
|
||||
{
|
||||
std::lock_guard lck(mtx_);
|
||||
std::lock_guard const lck(mtx_);
|
||||
for (auto& pair : validatedLedgers_)
|
||||
{
|
||||
if (sequence >= pair.first && sequence <= pair.second)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
else if (sequence < pair.first)
|
||||
if (sequence < pair.first)
|
||||
{
|
||||
// validatedLedgers_ is a sorted list of disjoint ranges
|
||||
// if the sequence comes before this range, the sequence will
|
||||
@@ -420,7 +421,7 @@ public:
|
||||
request.set_get_object_neighbors(getObjectNeighbors);
|
||||
request.set_user("ETL");
|
||||
|
||||
grpc::Status status = stub_->GetLedger(&context, request, &response);
|
||||
grpc::Status const status = stub_->GetLedger(&context, request, &response);
|
||||
|
||||
if (status.ok() && !response.is_unlimited())
|
||||
{
|
||||
@@ -452,9 +453,11 @@ public:
|
||||
|
||||
auto last = getLastMsgTime();
|
||||
if (last.time_since_epoch().count() != 0)
|
||||
{
|
||||
res["last_msg_age_seconds"] = std::to_string(
|
||||
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - getLastMsgTime())
|
||||
.count());
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
@@ -466,7 +469,7 @@ public:
|
||||
return {{}, false};
|
||||
|
||||
grpc::CompletionQueue cq;
|
||||
void* tag;
|
||||
void* tag = nullptr;
|
||||
bool ok = false;
|
||||
std::vector<etl::detail::AsyncCallData> calls;
|
||||
auto markers = getMarkers(numMarkers);
|
||||
@@ -488,7 +491,7 @@ public:
|
||||
|
||||
size_t numFinished = 0;
|
||||
bool abort = false;
|
||||
size_t incr = 500000;
|
||||
size_t const incr = 500000;
|
||||
size_t progress = incr;
|
||||
std::vector<std::string> edgeKeys;
|
||||
|
||||
@@ -502,31 +505,29 @@ public:
|
||||
LOG(log_.error()) << "loadInitialLedger - ok is false";
|
||||
return {{}, false}; // handle cancelled
|
||||
}
|
||||
else
|
||||
|
||||
LOG(log_.trace()) << "Marker prefix = " << ptr->getMarkerPrefix();
|
||||
|
||||
auto result = ptr->process(stub_, cq, *backend_, abort, cacheOnly);
|
||||
if (result != etl::detail::AsyncCallData::CallStatus::MORE)
|
||||
{
|
||||
LOG(log_.trace()) << "Marker prefix = " << ptr->getMarkerPrefix();
|
||||
++numFinished;
|
||||
LOG(log_.debug()) << "Finished a marker. "
|
||||
<< "Current number of finished = " << numFinished;
|
||||
|
||||
auto result = ptr->process(stub_, cq, *backend_, abort, cacheOnly);
|
||||
if (result != etl::detail::AsyncCallData::CallStatus::MORE)
|
||||
{
|
||||
++numFinished;
|
||||
LOG(log_.debug()) << "Finished a marker. "
|
||||
<< "Current number of finished = " << numFinished;
|
||||
std::string const lastKey = ptr->getLastKey();
|
||||
|
||||
std::string lastKey = ptr->getLastKey();
|
||||
if (!lastKey.empty())
|
||||
edgeKeys.push_back(ptr->getLastKey());
|
||||
}
|
||||
|
||||
if (lastKey.size())
|
||||
edgeKeys.push_back(ptr->getLastKey());
|
||||
}
|
||||
if (result == etl::detail::AsyncCallData::CallStatus::ERRORED)
|
||||
abort = true;
|
||||
|
||||
if (result == etl::detail::AsyncCallData::CallStatus::ERRORED)
|
||||
abort = true;
|
||||
|
||||
if (backend_->cache().size() > progress)
|
||||
{
|
||||
LOG(log_.info()) << "Downloaded " << backend_->cache().size() << " records from rippled";
|
||||
progress += incr;
|
||||
}
|
||||
if (backend_->cache().size() > progress)
|
||||
{
|
||||
LOG(log_.info()) << "Downloaded " << backend_->cache().size() << " records from rippled";
|
||||
progress += incr;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -577,7 +578,9 @@ public:
|
||||
}
|
||||
else
|
||||
{
|
||||
boost::beast::get_lowest_layer(derived().ws()).expires_after(std::chrono::seconds(30));
|
||||
static constexpr std::size_t LOWEST_LAYER_TIMEOUT_SECONDS = 30;
|
||||
boost::beast::get_lowest_layer(derived().ws())
|
||||
.expires_after(std::chrono::seconds(LOWEST_LAYER_TIMEOUT_SECONDS));
|
||||
boost::beast::get_lowest_layer(derived().ws()).async_connect(results, [this](auto ec, auto ep) {
|
||||
derived().onConnect(ec, ep);
|
||||
});
|
||||
@@ -602,7 +605,7 @@ public:
|
||||
}
|
||||
else
|
||||
{
|
||||
boost::json::object jv{
|
||||
boost::json::object const jv{
|
||||
{"command", "subscribe"},
|
||||
{"streams", {"ledger", "manifests", "validations", "transactions_proposed"}},
|
||||
};
|
||||
@@ -632,9 +635,13 @@ public:
|
||||
onWrite(boost::beast::error_code ec, [[maybe_unused]] size_t size)
|
||||
{
|
||||
if (ec)
|
||||
{
|
||||
reconnect(ec);
|
||||
}
|
||||
else
|
||||
{
|
||||
derived().ws().async_read(readBuffer_, [this](auto ec, size_t size) { onRead(ec, size); });
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -757,6 +764,7 @@ protected:
|
||||
void
|
||||
reconnect(boost::beast::error_code ec)
|
||||
{
|
||||
static constexpr std::size_t BUFFER_SIZE = 128;
|
||||
if (paused_)
|
||||
return;
|
||||
|
||||
@@ -776,7 +784,7 @@ protected:
|
||||
boost::lexical_cast<std::string>(ERR_GET_REASON(ec.value())) + ") ";
|
||||
|
||||
// ERR_PACK /* crypto/err/err.h */
|
||||
char buf[128];
|
||||
char buf[BUFFER_SIZE];
|
||||
::ERR_error_string_n(ec.value(), buf, sizeof(buf));
|
||||
err += buf;
|
||||
|
||||
@@ -793,11 +801,11 @@ protected:
|
||||
}
|
||||
|
||||
// exponentially increasing timeouts, with a max of 30 seconds
|
||||
size_t waitTime = std::min(pow(2, numFailures_), 30.0);
|
||||
size_t const waitTime = std::min(pow(2, numFailures_), 30.0);
|
||||
numFailures_++;
|
||||
timer_.expires_after(boost::asio::chrono::seconds(waitTime));
|
||||
timer_.async_wait([this](auto ec) {
|
||||
bool startAgain = (ec != boost::asio::error::operation_aborted);
|
||||
bool const startAgain = (ec != boost::asio::error::operation_aborted);
|
||||
derived().close(startAgain);
|
||||
});
|
||||
}
|
||||
@@ -806,14 +814,14 @@ private:
|
||||
void
|
||||
setLastMsgTime()
|
||||
{
|
||||
std::lock_guard lck(lastMsgTimeMtx_);
|
||||
std::lock_guard const lck(lastMsgTimeMtx_);
|
||||
lastMsgTime_ = std::chrono::system_clock::now();
|
||||
}
|
||||
|
||||
std::chrono::system_clock::time_point
|
||||
getLastMsgTime() const
|
||||
{
|
||||
std::lock_guard lck(lastMsgTimeMtx_);
|
||||
std::lock_guard const lck(lastMsgTimeMtx_);
|
||||
return lastMsgTime_;
|
||||
}
|
||||
|
||||
@@ -831,21 +839,21 @@ private:
|
||||
|
||||
if (minAndMax.size() == 1)
|
||||
{
|
||||
uint32_t sequence = std::stoll(minAndMax[0]);
|
||||
pairs.push_back(std::make_pair(sequence, sequence));
|
||||
uint32_t const sequence = std::stoll(minAndMax[0]);
|
||||
pairs.emplace_back(sequence, sequence);
|
||||
}
|
||||
else
|
||||
{
|
||||
assert(minAndMax.size() == 2);
|
||||
uint32_t min = std::stoll(minAndMax[0]);
|
||||
uint32_t max = std::stoll(minAndMax[1]);
|
||||
pairs.push_back(std::make_pair(min, max));
|
||||
uint32_t const min = std::stoll(minAndMax[0]);
|
||||
uint32_t const max = std::stoll(minAndMax[1]);
|
||||
pairs.emplace_back(min, max);
|
||||
}
|
||||
}
|
||||
std::sort(pairs.begin(), pairs.end(), [](auto left, auto right) { return left.first < right.first; });
|
||||
|
||||
// we only hold the lock here, to avoid blocking while string processing
|
||||
std::lock_guard lck(mtx_);
|
||||
std::lock_guard const lck(mtx_);
|
||||
validatedLedgers_ = std::move(pairs);
|
||||
validatedLedgersRaw_ = range;
|
||||
}
|
||||
@@ -853,7 +861,7 @@ private:
|
||||
std::string
|
||||
getValidatedRange() const
|
||||
{
|
||||
std::lock_guard lck(mtx_);
|
||||
std::lock_guard const lck(mtx_);
|
||||
return validatedLedgersRaw_;
|
||||
}
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user