mirror of
https://github.com/XRPLF/clio.git
synced 2025-11-20 19:56:00 +00:00
204
src/etl/Source.h
204
src/etl/Source.h
@@ -60,8 +60,7 @@ class ProbingSource;
|
||||
* Note: Since sources below are implemented via CRTP, it sort of makes no sense to have a virtual base class.
|
||||
* We should consider using a vector of ProbingSources instead of vector of unique ptrs to this virtual base.
|
||||
*/
|
||||
class Source
|
||||
{
|
||||
class Source {
|
||||
public:
|
||||
/** @return true if source is connected; false otherwise */
|
||||
virtual bool
|
||||
@@ -133,7 +132,8 @@ public:
|
||||
forwardToRippled(
|
||||
boost::json::object const& request,
|
||||
std::optional<std::string> const& forwardToRippledclientIp,
|
||||
boost::asio::yield_context yield) const = 0;
|
||||
boost::asio::yield_context yield
|
||||
) const = 0;
|
||||
|
||||
/**
|
||||
* @return A token that uniquely identifies this source instance.
|
||||
@@ -166,14 +166,14 @@ private:
|
||||
requestFromRippled(
|
||||
boost::json::object const& request,
|
||||
std::optional<std::string> const& clientIp,
|
||||
boost::asio::yield_context yield) const = 0;
|
||||
boost::asio::yield_context yield
|
||||
) const = 0;
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Hooks for source events such as connects and disconnects.
|
||||
*/
|
||||
struct SourceHooks
|
||||
{
|
||||
struct SourceHooks {
|
||||
enum class Action { STOP, PROCEED };
|
||||
|
||||
std::function<Action(boost::beast::error_code)> onConnected;
|
||||
@@ -186,8 +186,7 @@ struct SourceHooks
|
||||
* @tparam Derived The derived class for CRTP
|
||||
*/
|
||||
template <class Derived>
|
||||
class SourceImpl : public Source
|
||||
{
|
||||
class SourceImpl : public Source {
|
||||
std::string wsPort_;
|
||||
std::string grpcPort_;
|
||||
|
||||
@@ -248,7 +247,8 @@ public:
|
||||
std::shared_ptr<feed::SubscriptionManager> subscriptions,
|
||||
std::shared_ptr<NetworkValidatedLedgers> validatedLedgers,
|
||||
LoadBalancer& balancer,
|
||||
SourceHooks hooks)
|
||||
SourceHooks hooks
|
||||
)
|
||||
: networkValidatedLedgers_(std::move(validatedLedgers))
|
||||
, backend_(std::move(backend))
|
||||
, subscriptions_(std::move(subscriptions))
|
||||
@@ -265,22 +265,19 @@ public:
|
||||
ip_ = config.valueOr<std::string>("ip", {});
|
||||
wsPort_ = config.valueOr<std::string>("ws_port", {});
|
||||
|
||||
if (auto value = config.maybeValue<std::string>("grpc_port"); value)
|
||||
{
|
||||
if (auto value = config.maybeValue<std::string>("grpc_port"); value) {
|
||||
grpcPort_ = *value;
|
||||
try
|
||||
{
|
||||
try {
|
||||
boost::asio::ip::tcp::endpoint const endpoint{boost::asio::ip::make_address(ip_), std::stoi(grpcPort_)};
|
||||
std::stringstream ss;
|
||||
ss << endpoint;
|
||||
grpc::ChannelArguments chArgs;
|
||||
chArgs.SetMaxReceiveMessageSize(-1);
|
||||
stub_ = org::xrpl::rpc::v1::XRPLedgerAPIService::NewStub(
|
||||
grpc::CreateCustomChannel(ss.str(), grpc::InsecureChannelCredentials(), chArgs));
|
||||
grpc::CreateCustomChannel(ss.str(), grpc::InsecureChannelCredentials(), chArgs)
|
||||
);
|
||||
LOG(log_.debug()) << "Made stub for remote = " << toString();
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
} catch (std::exception const& e) {
|
||||
LOG(log_.debug()) << "Exception while creating stub = " << e.what() << " . Remote = " << toString();
|
||||
}
|
||||
}
|
||||
@@ -307,7 +304,8 @@ public:
|
||||
requestFromRippled(
|
||||
boost::json::object const& request,
|
||||
std::optional<std::string> const& clientIp,
|
||||
boost::asio::yield_context yield) const override
|
||||
boost::asio::yield_context yield
|
||||
) const override
|
||||
{
|
||||
LOG(log_.trace()) << "Attempting to forward request to tx. Request = " << boost::json::serialize(request);
|
||||
|
||||
@@ -319,8 +317,7 @@ public:
|
||||
namespace net = boost::asio;
|
||||
using tcp = boost::asio::ip::tcp;
|
||||
|
||||
try
|
||||
{
|
||||
try {
|
||||
auto executor = boost::asio::get_associated_executor(yield);
|
||||
beast::error_code ec;
|
||||
tcp::resolver resolver{executor};
|
||||
@@ -336,8 +333,8 @@ public:
|
||||
if (ec)
|
||||
return {};
|
||||
|
||||
// if client ip is know, change the User-Agent of the handshake and to tell rippled to charge the client IP
|
||||
// for RPC resources. See "secure_gateway" in
|
||||
// if client ip is know, change the User-Agent of the handshake and to tell rippled to charge the client
|
||||
// IP for RPC resources. See "secure_gateway" in
|
||||
// https://github.com/ripple/rippled/blob/develop/cfg/rippled-example.cfg
|
||||
|
||||
// TODO: user-agent can be clio-[version]
|
||||
@@ -364,8 +361,7 @@ public:
|
||||
auto end = begin + buffer.data().size();
|
||||
auto parsed = boost::json::parse(std::string(begin, end));
|
||||
|
||||
if (!parsed.is_object())
|
||||
{
|
||||
if (!parsed.is_object()) {
|
||||
LOG(log_.error()) << "Error parsing response: " << std::string{begin, end};
|
||||
return {};
|
||||
}
|
||||
@@ -374,9 +370,7 @@ public:
|
||||
response["forwarded"] = true;
|
||||
|
||||
return response;
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
} catch (std::exception const& e) {
|
||||
LOG(log_.error()) << "Encountered exception : " << e.what();
|
||||
return {};
|
||||
}
|
||||
@@ -386,14 +380,11 @@ public:
|
||||
hasLedger(uint32_t sequence) const override
|
||||
{
|
||||
std::lock_guard const lck(mtx_);
|
||||
for (auto& pair : validatedLedgers_)
|
||||
{
|
||||
if (sequence >= pair.first && sequence <= pair.second)
|
||||
{
|
||||
for (auto& pair : validatedLedgers_) {
|
||||
if (sequence >= pair.first && sequence <= pair.second) {
|
||||
return true;
|
||||
}
|
||||
if (sequence < pair.first)
|
||||
{
|
||||
if (sequence < pair.first) {
|
||||
// validatedLedgers_ is a sorted list of disjoint ranges
|
||||
// if the sequence comes before this range, the sequence will
|
||||
// come before all subsequent ranges
|
||||
@@ -423,11 +414,10 @@ public:
|
||||
|
||||
grpc::Status const status = stub_->GetLedger(&context, request, &response);
|
||||
|
||||
if (status.ok() && !response.is_unlimited())
|
||||
{
|
||||
log_.warn()
|
||||
<< "is_unlimited is false. Make sure secure_gateway is set correctly on the ETL source. source = "
|
||||
<< toString() << "; status = " << status.error_message();
|
||||
if (status.ok() && !response.is_unlimited()) {
|
||||
log_.warn(
|
||||
) << "is_unlimited is false. Make sure secure_gateway is set correctly on the ETL source. source = "
|
||||
<< toString() << "; status = " << status.error_message();
|
||||
}
|
||||
|
||||
return {status, std::move(response)};
|
||||
@@ -452,11 +442,11 @@ public:
|
||||
res["grpc_port"] = grpcPort_;
|
||||
|
||||
auto last = getLastMsgTime();
|
||||
if (last.time_since_epoch().count() != 0)
|
||||
{
|
||||
if (last.time_since_epoch().count() != 0) {
|
||||
res["last_msg_age_seconds"] = std::to_string(
|
||||
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - getLastMsgTime())
|
||||
.count());
|
||||
.count()
|
||||
);
|
||||
}
|
||||
|
||||
return res;
|
||||
@@ -474,8 +464,7 @@ public:
|
||||
std::vector<etl::detail::AsyncCallData> calls;
|
||||
auto markers = getMarkers(numMarkers);
|
||||
|
||||
for (size_t i = 0; i < markers.size(); ++i)
|
||||
{
|
||||
for (size_t i = 0; i < markers.size(); ++i) {
|
||||
std::optional<ripple::uint256> nextMarker;
|
||||
|
||||
if (i + 1 < markers.size())
|
||||
@@ -495,13 +484,11 @@ public:
|
||||
size_t progress = incr;
|
||||
std::vector<std::string> edgeKeys;
|
||||
|
||||
while (numFinished < calls.size() && cq.Next(&tag, &ok))
|
||||
{
|
||||
while (numFinished < calls.size() && cq.Next(&tag, &ok)) {
|
||||
assert(tag);
|
||||
auto ptr = static_cast<etl::detail::AsyncCallData*>(tag);
|
||||
|
||||
if (!ok)
|
||||
{
|
||||
if (!ok) {
|
||||
LOG(log_.error()) << "loadInitialLedger - ok is false";
|
||||
return {{}, false}; // handle cancelled
|
||||
}
|
||||
@@ -509,8 +496,7 @@ public:
|
||||
LOG(log_.trace()) << "Marker prefix = " << ptr->getMarkerPrefix();
|
||||
|
||||
auto result = ptr->process(stub_, cq, *backend_, abort, cacheOnly);
|
||||
if (result != etl::detail::AsyncCallData::CallStatus::MORE)
|
||||
{
|
||||
if (result != etl::detail::AsyncCallData::CallStatus::MORE) {
|
||||
++numFinished;
|
||||
LOG(log_.debug()) << "Finished a marker. "
|
||||
<< "Current number of finished = " << numFinished;
|
||||
@@ -524,8 +510,7 @@ public:
|
||||
if (result == etl::detail::AsyncCallData::CallStatus::ERRORED)
|
||||
abort = true;
|
||||
|
||||
if (backend_->cache().size() > progress)
|
||||
{
|
||||
if (backend_->cache().size() > progress) {
|
||||
LOG(log_.info()) << "Downloaded " << backend_->cache().size() << " records from rippled";
|
||||
progress += incr;
|
||||
}
|
||||
@@ -539,10 +524,10 @@ public:
|
||||
forwardToRippled(
|
||||
boost::json::object const& request,
|
||||
std::optional<std::string> const& clientIp,
|
||||
boost::asio::yield_context yield) const override
|
||||
boost::asio::yield_context yield
|
||||
) const override
|
||||
{
|
||||
if (auto resp = forwardCache_.get(request); resp)
|
||||
{
|
||||
if (auto resp = forwardCache_.get(request); resp) {
|
||||
LOG(log_.debug()) << "request hit forwardCache";
|
||||
return resp;
|
||||
}
|
||||
@@ -573,13 +558,10 @@ public:
|
||||
void
|
||||
onResolve(boost::beast::error_code ec, boost::asio::ip::tcp::resolver::results_type results)
|
||||
{
|
||||
if (ec)
|
||||
{
|
||||
if (ec) {
|
||||
// try again
|
||||
reconnect(ec);
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
static constexpr std::size_t LOWEST_LAYER_TIMEOUT_SECONDS = 30;
|
||||
boost::beast::get_lowest_layer(derived().ws())
|
||||
.expires_after(std::chrono::seconds(LOWEST_LAYER_TIMEOUT_SECONDS));
|
||||
@@ -600,13 +582,10 @@ public:
|
||||
if (auto action = hooks_.onConnected(ec); action == SourceHooks::Action::STOP)
|
||||
return;
|
||||
|
||||
if (ec)
|
||||
{
|
||||
if (ec) {
|
||||
// start over
|
||||
reconnect(ec);
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
boost::json::object const jv{
|
||||
{"command", "subscribe"},
|
||||
{"streams", {"ledger", "manifests", "validations", "transactions_proposed"}},
|
||||
@@ -617,10 +596,11 @@ public:
|
||||
derived().ws().set_option(
|
||||
boost::beast::websocket::stream_base::decorator([](boost::beast::websocket::request_type& req) {
|
||||
req.set(
|
||||
boost::beast::http::field::user_agent,
|
||||
std::string(BOOST_BEAST_VERSION_STRING) + " clio-client");
|
||||
boost::beast::http::field::user_agent, std::string(BOOST_BEAST_VERSION_STRING) + " clio-client"
|
||||
);
|
||||
req.set("X-User", "coro-client");
|
||||
}));
|
||||
})
|
||||
);
|
||||
|
||||
// Send subscription message
|
||||
derived().ws().async_write(boost::asio::buffer(s), [this](auto ec, size_t size) { onWrite(ec, size); });
|
||||
@@ -636,12 +616,9 @@ public:
|
||||
void
|
||||
onWrite(boost::beast::error_code ec, [[maybe_unused]] size_t size)
|
||||
{
|
||||
if (ec)
|
||||
{
|
||||
if (ec) {
|
||||
reconnect(ec);
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
derived().ws().async_read(readBuffer_, [this](auto ec, size_t size) { onRead(ec, size); });
|
||||
}
|
||||
}
|
||||
@@ -655,12 +632,9 @@ public:
|
||||
void
|
||||
onRead(boost::beast::error_code ec, size_t size)
|
||||
{
|
||||
if (ec)
|
||||
{
|
||||
if (ec) {
|
||||
reconnect(ec);
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
handleMessage(size);
|
||||
derived().ws().async_read(readBuffer_, [this](auto ec, size_t size) { onRead(ec, size); });
|
||||
}
|
||||
@@ -677,8 +651,7 @@ public:
|
||||
{
|
||||
setLastMsgTime();
|
||||
|
||||
try
|
||||
{
|
||||
try {
|
||||
auto const msg = boost::beast::buffers_to_string(readBuffer_.data());
|
||||
readBuffer_.consume(size);
|
||||
|
||||
@@ -686,65 +659,48 @@ public:
|
||||
auto const response = raw.as_object();
|
||||
uint32_t ledgerIndex = 0;
|
||||
|
||||
if (response.contains("result"))
|
||||
{
|
||||
if (response.contains("result")) {
|
||||
auto const& result = response.at("result").as_object();
|
||||
if (result.contains("ledger_index"))
|
||||
ledgerIndex = result.at("ledger_index").as_int64();
|
||||
|
||||
if (result.contains("validated_ledgers"))
|
||||
{
|
||||
if (result.contains("validated_ledgers")) {
|
||||
auto const& validatedLedgers = result.at("validated_ledgers").as_string();
|
||||
setValidatedRange({validatedLedgers.data(), validatedLedgers.size()});
|
||||
}
|
||||
|
||||
LOG(log_.info()) << "Received a message on ledger "
|
||||
<< " subscription stream. Message : " << response << " - " << toString();
|
||||
}
|
||||
else if (response.contains("type") && response.at("type") == "ledgerClosed")
|
||||
{
|
||||
} else if (response.contains("type") && response.at("type") == "ledgerClosed") {
|
||||
LOG(log_.info()) << "Received a message on ledger "
|
||||
<< " subscription stream. Message : " << response << " - " << toString();
|
||||
if (response.contains("ledger_index"))
|
||||
{
|
||||
if (response.contains("ledger_index")) {
|
||||
ledgerIndex = response.at("ledger_index").as_int64();
|
||||
}
|
||||
if (response.contains("validated_ledgers"))
|
||||
{
|
||||
if (response.contains("validated_ledgers")) {
|
||||
auto const& validatedLedgers = response.at("validated_ledgers").as_string();
|
||||
setValidatedRange({validatedLedgers.data(), validatedLedgers.size()});
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (balancer_.shouldPropagateTxnStream(this))
|
||||
{
|
||||
if (response.contains("transaction"))
|
||||
{
|
||||
} else {
|
||||
if (balancer_.shouldPropagateTxnStream(this)) {
|
||||
if (response.contains("transaction")) {
|
||||
forwardCache_.freshen();
|
||||
subscriptions_->forwardProposedTransaction(response);
|
||||
}
|
||||
else if (response.contains("type") && response.at("type") == "validationReceived")
|
||||
{
|
||||
} else if (response.contains("type") && response.at("type") == "validationReceived") {
|
||||
subscriptions_->forwardValidation(response);
|
||||
}
|
||||
else if (response.contains("type") && response.at("type") == "manifestReceived")
|
||||
{
|
||||
} else if (response.contains("type") && response.at("type") == "manifestReceived") {
|
||||
subscriptions_->forwardManifest(response);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (ledgerIndex != 0)
|
||||
{
|
||||
if (ledgerIndex != 0) {
|
||||
LOG(log_.trace()) << "Pushing ledger sequence = " << ledgerIndex << " - " << toString();
|
||||
networkValidatedLedgers_->push(ledgerIndex);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
} catch (std::exception const& e) {
|
||||
LOG(log_.error()) << "Exception in handleMessage : " << e.what();
|
||||
return false;
|
||||
}
|
||||
@@ -780,8 +736,7 @@ protected:
|
||||
// when the timer is cancelled. connection_refused will occur repeatedly
|
||||
std::string err = ec.message();
|
||||
// if we cannot connect to the transaction processing process
|
||||
if (ec.category() == boost::asio::error::get_ssl_category())
|
||||
{
|
||||
if (ec.category() == boost::asio::error::get_ssl_category()) {
|
||||
err = std::string(" (") + boost::lexical_cast<std::string>(ERR_GET_LIB(ec.value())) + "," +
|
||||
boost::lexical_cast<std::string>(ERR_GET_REASON(ec.value())) + ") ";
|
||||
|
||||
@@ -793,12 +748,9 @@ protected:
|
||||
LOG(log_.error()) << err;
|
||||
}
|
||||
|
||||
if (ec != boost::asio::error::operation_aborted && ec != boost::asio::error::connection_refused)
|
||||
{
|
||||
if (ec != boost::asio::error::operation_aborted && ec != boost::asio::error::connection_refused) {
|
||||
LOG(log_.error()) << "error code = " << ec << " - " << toString();
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
LOG(log_.warn()) << "error code = " << ec << " - " << toString();
|
||||
}
|
||||
|
||||
@@ -833,19 +785,15 @@ private:
|
||||
std::vector<std::pair<uint32_t, uint32_t>> pairs;
|
||||
std::vector<std::string> ranges;
|
||||
boost::split(ranges, range, boost::is_any_of(","));
|
||||
for (auto& pair : ranges)
|
||||
{
|
||||
for (auto& pair : ranges) {
|
||||
std::vector<std::string> minAndMax;
|
||||
|
||||
boost::split(minAndMax, pair, boost::is_any_of("-"));
|
||||
|
||||
if (minAndMax.size() == 1)
|
||||
{
|
||||
if (minAndMax.size() == 1) {
|
||||
uint32_t const sequence = std::stoll(minAndMax[0]);
|
||||
pairs.emplace_back(sequence, sequence);
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
assert(minAndMax.size() == 2);
|
||||
uint32_t const min = std::stoll(minAndMax[0]);
|
||||
uint32_t const max = std::stoll(minAndMax[1]);
|
||||
@@ -871,8 +819,7 @@ private:
|
||||
/**
|
||||
* @brief Implementation of a source that uses a regular, non-secure websocket connection.
|
||||
*/
|
||||
class PlainSource : public SourceImpl<PlainSource>
|
||||
{
|
||||
class PlainSource : public SourceImpl<PlainSource> {
|
||||
using StreamType = boost::beast::websocket::stream<boost::beast::tcp_stream>;
|
||||
std::unique_ptr<StreamType> ws_;
|
||||
|
||||
@@ -895,7 +842,8 @@ public:
|
||||
std::shared_ptr<feed::SubscriptionManager> subscriptions,
|
||||
std::shared_ptr<NetworkValidatedLedgers> validatedLedgers,
|
||||
LoadBalancer& balancer,
|
||||
SourceHooks hooks)
|
||||
SourceHooks hooks
|
||||
)
|
||||
: SourceImpl(config, ioc, backend, subscriptions, validatedLedgers, balancer, std::move(hooks))
|
||||
, ws_(std::make_unique<StreamType>(strand_))
|
||||
{
|
||||
@@ -929,8 +877,7 @@ public:
|
||||
/**
|
||||
* @brief Implementation of a source that uses a secure websocket connection.
|
||||
*/
|
||||
class SslSource : public SourceImpl<SslSource>
|
||||
{
|
||||
class SslSource : public SourceImpl<SslSource> {
|
||||
using StreamType = boost::beast::websocket::stream<boost::beast::ssl_stream<boost::beast::tcp_stream>>;
|
||||
std::optional<std::reference_wrapper<boost::asio::ssl::context>> sslCtx_;
|
||||
std::unique_ptr<StreamType> ws_;
|
||||
@@ -956,7 +903,8 @@ public:
|
||||
std::shared_ptr<feed::SubscriptionManager> subscriptions,
|
||||
std::shared_ptr<NetworkValidatedLedgers> validatedLedgers,
|
||||
LoadBalancer& balancer,
|
||||
SourceHooks hooks)
|
||||
SourceHooks hooks
|
||||
)
|
||||
: SourceImpl(config, ioc, backend, subscriptions, validatedLedgers, balancer, std::move(hooks))
|
||||
, sslCtx_(sslCtx)
|
||||
, ws_(std::make_unique<StreamType>(strand_, *sslCtx_))
|
||||
|
||||
Reference in New Issue
Block a user