Fix Linux/gcc compilation (#795)

Fixes #803
This commit is contained in:
Alex Kremer
2023-08-02 13:44:03 +01:00
committed by GitHub
parent 98d0a963dc
commit 24f69acd9e
81 changed files with 1259 additions and 1282 deletions

View File

@@ -22,6 +22,8 @@
#include <backend/BackendInterface.h>
#include <config/Config.h>
#include <etl/ETLHelpers.h>
#include <etl/LoadBalancer.h>
#include <etl/impl/AsyncData.h>
#include <etl/impl/ForwardCache.h>
#include <log/Logger.h>
#include <subscriptions/SubscriptionManager.h>
@@ -37,8 +39,6 @@
#include <boost/uuid/uuid_generators.hpp>
#include <grpcpp/grpcpp.h>
class LoadBalancer;
class Source;
class ProbingSource;
class SubscriptionManager;
@@ -82,7 +82,7 @@ public:
loadInitialLedger(uint32_t sequence, std::uint32_t numMarkers, bool cacheOnly = false) = 0;
virtual std::optional<boost::json::object>
forwardToRippled(boost::json::object const& request, std::string const& clientIp, boost::asio::yield_context& yield)
forwardToRippled(boost::json::object const& request, std::string const& clientIp, boost::asio::yield_context yield)
const = 0;
virtual boost::uuids::uuid
@@ -107,7 +107,7 @@ private:
requestFromRippled(
boost::json::object const& request,
std::string const& clientIp,
boost::asio::yield_context& yield) const = 0;
boost::asio::yield_context yield) const = 0;
};
/**
@@ -130,10 +130,6 @@ class SourceImpl : public Source
std::string wsPort_;
std::string grpcPort_;
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub> stub_;
boost::asio::ip::tcp::resolver resolver_;
boost::beast::flat_buffer readBuffer_;
std::vector<std::pair<uint32_t, uint32_t>> validatedLedgers_;
std::string validatedLedgersRaw_{"N/A"};
std::shared_ptr<NetworkValidatedLedgers> networkValidatedLedgers_;
@@ -160,8 +156,12 @@ protected:
std::string ip_;
size_t numFailures_ = 0;
boost::asio::io_context& ioc_;
boost::asio::strand<boost::asio::io_context::executor_type> strand_;
boost::asio::steady_timer timer_;
boost::asio::ip::tcp::resolver resolver_;
boost::beast::flat_buffer readBuffer_;
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub> stub_;
std::atomic_bool closing_{false};
std::atomic_bool paused_{false};
@@ -183,14 +183,14 @@ public:
std::shared_ptr<NetworkValidatedLedgers> networkValidatedLedgers,
LoadBalancer& balancer,
SourceHooks hooks)
: resolver_(boost::asio::make_strand(ioContext))
, networkValidatedLedgers_(networkValidatedLedgers)
: networkValidatedLedgers_(networkValidatedLedgers)
, backend_(backend)
, subscriptions_(subscriptions)
, balancer_(balancer)
, forwardCache_(config, ioContext, *this)
, ioc_(ioContext)
, timer_(boost::asio::make_strand(ioContext))
, strand_(boost::asio::make_strand(ioContext))
, timer_(strand_)
, resolver_(strand_)
, hooks_(hooks)
{
static boost::uuids::random_generator uuidGenerator;
@@ -255,7 +255,83 @@ public:
requestFromRippled(
boost::json::object const& request,
std::string const& clientIp,
boost::asio::yield_context& yield) const override;
boost::asio::yield_context yield) const override
{
log_.trace() << "Attempting to forward request to tx. "
<< "request = " << boost::json::serialize(request);
boost::json::object response;
if (!connected_)
{
log_.error() << "Attempted to proxy but failed to connect to tx";
return {};
}
namespace beast = boost::beast;
namespace http = beast::http;
namespace websocket = beast::websocket;
namespace net = boost::asio;
using tcp = boost::asio::ip::tcp;
try
{
auto executor = boost::asio::get_associated_executor(yield);
boost::beast::error_code ec;
tcp::resolver resolver{executor};
auto ws = std::make_unique<websocket::stream<beast::tcp_stream>>(executor);
auto const results = resolver.async_resolve(ip_, wsPort_, yield[ec]);
if (ec)
return {};
ws->next_layer().expires_after(std::chrono::seconds(3));
ws->next_layer().async_connect(results, yield[ec]);
if (ec)
return {};
// Set a decorator to change the User-Agent of the handshake and to tell rippled to charge the client IP for
// RPC resources. See "secure_gateway" in
// https://github.com/ripple/rippled/blob/develop/cfg/rippled-example.cfg
ws->set_option(websocket::stream_base::decorator([&clientIp](websocket::request_type& req) {
req.set(http::field::user_agent, std::string(BOOST_BEAST_VERSION_STRING) + " websocket-client-coro");
req.set(http::field::forwarded, "for=" + clientIp);
}));
ws->async_handshake(ip_, "/", yield[ec]);
if (ec)
return {};
ws->async_write(net::buffer(boost::json::serialize(request)), yield[ec]);
if (ec)
return {};
beast::flat_buffer buffer;
ws->async_read(buffer, yield[ec]);
if (ec)
return {};
auto begin = static_cast<char const*>(buffer.data().data());
auto end = begin + buffer.data().size();
auto parsed = boost::json::parse(std::string(begin, end));
if (!parsed.is_object())
{
log_.error() << "Error parsing response: " << std::string{begin, end};
return {};
}
response = parsed.as_object();
response["forwarded"] = true;
return response;
}
catch (std::exception const& e)
{
log_.error() << "Encountered exception : " << e.what();
return {};
}
}
/**
* @param sequence ledger sequence to check for
@@ -339,7 +415,34 @@ public:
* @return the extracted data and the result status
*/
std::pair<grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse>
fetchLedger(uint32_t ledgerSequence, bool getObjects = true, bool getObjectNeighbors = false) override;
fetchLedger(uint32_t ledgerSequence, bool getObjects = true, bool getObjectNeighbors = false) override
{
org::xrpl::rpc::v1::GetLedgerResponse response;
if (!stub_)
return {{grpc::StatusCode::INTERNAL, "No Stub"}, response};
// Ledger header with txns and metadata
org::xrpl::rpc::v1::GetLedgerRequest request;
grpc::ClientContext context;
request.mutable_ledger()->set_sequence(ledgerSequence);
request.set_transactions(true);
request.set_expand(true);
request.set_get_objects(getObjects);
request.set_get_object_neighbors(getObjectNeighbors);
request.set_user("ETL");
grpc::Status status = stub_->GetLedger(&context, request, &response);
if (status.ok() && !response.is_unlimited())
{
log_.warn()
<< "is_unlimited is false. Make sure secure_gateway is set correctly on the ETL source. source = "
<< toString() << "; status = " << status.error_message();
}
return {status, std::move(response)};
}
/**
* @brief Produces a human-readable string with info about the source
@@ -383,13 +486,130 @@ public:
* @return true if the download was successful
*/
std::pair<std::vector<std::string>, bool>
loadInitialLedger(std::uint32_t ledgerSequence, std::uint32_t numMarkers, bool cacheOnly = false) override;
loadInitialLedger(std::uint32_t ledgerSequence, std::uint32_t numMarkers, bool cacheOnly = false) override
{
if (!stub_)
return {{}, false};
grpc::CompletionQueue cq;
void* tag;
bool ok = false;
std::vector<clio::detail::AsyncCallData> calls;
auto markers = getMarkers(numMarkers);
for (size_t i = 0; i < markers.size(); ++i)
{
std::optional<ripple::uint256> nextMarker;
if (i + 1 < markers.size())
nextMarker = markers[i + 1];
calls.emplace_back(ledgerSequence, markers[i], nextMarker);
}
log_.debug() << "Starting data download for ledger " << ledgerSequence << ". Using source = " << toString();
for (auto& c : calls)
c.call(stub_, cq);
size_t numFinished = 0;
bool abort = false;
size_t incr = 500000;
size_t progress = incr;
std::vector<std::string> edgeKeys;
while (numFinished < calls.size() && cq.Next(&tag, &ok))
{
assert(tag);
auto ptr = static_cast<clio::detail::AsyncCallData*>(tag);
if (!ok)
{
log_.error() << "loadInitialLedger - ok is false";
return {{}, false}; // handle cancelled
}
else
{
log_.trace() << "Marker prefix = " << ptr->getMarkerPrefix();
auto result = ptr->process(stub_, cq, *backend_, abort, cacheOnly);
if (result != clio::detail::AsyncCallData::CallStatus::MORE)
{
++numFinished;
log_.debug() << "Finished a marker. "
<< "Current number of finished = " << numFinished;
std::string lastKey = ptr->getLastKey();
if (lastKey.size())
edgeKeys.push_back(ptr->getLastKey());
}
if (result == clio::detail::AsyncCallData::CallStatus::ERRORED)
abort = true;
if (backend_->cache().size() > progress)
{
log_.info() << "Downloaded " << backend_->cache().size() << " records from rippled";
progress += incr;
}
}
}
log_.info() << "Finished loadInitialLedger. cache size = " << backend_->cache().size();
return {std::move(edgeKeys), !abort};
}
/**
* @brief Attempt to reconnect to the ETL source
*/
void
reconnect(boost::beast::error_code ec);
reconnect(boost::beast::error_code ec)
{
if (paused_)
return;
if (connected_)
hooks_.onDisconnected(ec);
connected_ = false;
readBuffer_ = {};
// These are somewhat normal errors. operation_aborted occurs on shutdown,
// when the timer is cancelled. connection_refused will occur repeatedly
std::string err = ec.message();
// if we cannot connect to the transaction processing process
if (ec.category() == boost::asio::error::get_ssl_category())
{
err = std::string(" (") + boost::lexical_cast<std::string>(ERR_GET_LIB(ec.value())) + "," +
boost::lexical_cast<std::string>(ERR_GET_REASON(ec.value())) + ") ";
// ERR_PACK /* crypto/err/err.h */
char buf[128];
::ERR_error_string_n(ec.value(), buf, sizeof(buf));
err += buf;
log_.error() << err;
}
if (ec != boost::asio::error::operation_aborted && ec != boost::asio::error::connection_refused)
{
log_.error() << "error code = " << ec << " - " << toString();
}
else
{
log_.warn() << "error code = " << ec << " - " << toString();
}
// exponentially increasing timeouts, with a max of 30 seconds
size_t waitTime = std::min(pow(2, numFailures_), 30.0);
numFailures_++;
timer_.expires_after(boost::asio::chrono::seconds(waitTime));
timer_.async_wait([this](auto ec) {
bool startAgain = (ec != boost::asio::error::operation_aborted);
derived().close(startAgain);
});
}
/**
* @brief Pause the source effectively stopping it from trying to reconnect
@@ -415,46 +635,186 @@ public:
* @brief Callback for resolving the server host
*/
void
onResolve(boost::beast::error_code ec, boost::asio::ip::tcp::resolver::results_type results);
/**
* @brief Callback for connection to the server
*/
virtual void
onConnect(boost::beast::error_code ec, boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint) = 0;
onResolve(boost::beast::error_code ec, boost::asio::ip::tcp::resolver::results_type results)
{
if (ec)
{
// try again
reconnect(ec);
}
else
{
boost::beast::get_lowest_layer(derived().ws()).expires_after(std::chrono::seconds(30));
boost::beast::get_lowest_layer(derived().ws()).async_connect(results, [this](auto ec, auto ep) {
derived().onConnect(ec, ep);
});
}
}
/**
* @brief Callback for handshake with the server
*/
void
onHandshake(boost::beast::error_code ec);
onHandshake(boost::beast::error_code ec)
{
if (auto action = hooks_.onConnected(ec); action == SourceHooks::Action::STOP)
return;
if (ec)
{
// start over
reconnect(ec);
}
else
{
boost::json::object jv{
{"command", "subscribe"},
{"streams", {"ledger", "manifests", "validations", "transactions_proposed"}},
};
std::string s = boost::json::serialize(jv);
log_.trace() << "Sending subscribe stream message";
derived().ws().set_option(
boost::beast::websocket::stream_base::decorator([](boost::beast::websocket::request_type& req) {
req.set(
boost::beast::http::field::user_agent,
std::string(BOOST_BEAST_VERSION_STRING) + " clio-client");
req.set("X-User", "coro-client");
}));
// Send subscription message
derived().ws().async_write(boost::asio::buffer(s), [this](auto ec, size_t size) { onWrite(ec, size); });
}
}
/**
* @brief Callback for writing data
*/
void
onWrite(boost::beast::error_code ec, size_t size);
onWrite(boost::beast::error_code ec, size_t size)
{
if (ec)
reconnect(ec);
else
derived().ws().async_read(readBuffer_, [this](auto ec, size_t size) { onRead(ec, size); });
}
/**
* @brief Callback for data available to read
*/
void
onRead(boost::beast::error_code ec, size_t size);
onRead(boost::beast::error_code ec, size_t size)
{
if (ec)
{
reconnect(ec);
}
else
{
handleMessage(size);
derived().ws().async_read(readBuffer_, [this](auto ec, size_t size) { onRead(ec, size); });
}
}
/**
* @brief Handle the most recently received message
* @return true if the message was handled successfully. false on error
*/
bool
handleMessage(size_t size);
handleMessage(size_t size)
{
setLastMsgTime();
connected_ = true;
try
{
auto const msg = boost::beast::buffers_to_string(readBuffer_.data());
readBuffer_.consume(size);
auto const raw = boost::json::parse(msg);
auto const response = raw.as_object();
uint32_t ledgerIndex = 0;
if (response.contains("result"))
{
auto const& result = response.at("result").as_object();
if (result.contains("ledger_index"))
ledgerIndex = result.at("ledger_index").as_int64();
if (result.contains("validated_ledgers"))
{
auto const& validatedLedgers = result.at("validated_ledgers").as_string();
setValidatedRange({validatedLedgers.data(), validatedLedgers.size()});
}
log_.info() << "Received a message on ledger "
<< " subscription stream. Message : " << response << " - " << toString();
}
else if (response.contains("type") && response.at("type") == "ledgerClosed")
{
log_.info() << "Received a message on ledger "
<< " subscription stream. Message : " << response << " - " << toString();
if (response.contains("ledger_index"))
{
ledgerIndex = response.at("ledger_index").as_int64();
}
if (response.contains("validated_ledgers"))
{
auto const& validatedLedgers = response.at("validated_ledgers").as_string();
setValidatedRange({validatedLedgers.data(), validatedLedgers.size()});
}
}
else
{
if (balancer_.shouldPropagateTxnStream(this))
{
if (response.contains("transaction"))
{
forwardCache_.freshen();
subscriptions_->forwardProposedTransaction(response);
}
else if (response.contains("type") && response.at("type") == "validationReceived")
{
subscriptions_->forwardValidation(response);
}
else if (response.contains("type") && response.at("type") == "manifestReceived")
{
subscriptions_->forwardManifest(response);
}
}
}
if (ledgerIndex != 0)
{
log_.trace() << "Pushing ledger sequence = " << ledgerIndex << " - " << toString();
networkValidatedLedgers_->push(ledgerIndex);
}
return true;
}
catch (std::exception const& e)
{
log_.error() << "Exception in handleMessage : " << e.what();
return false;
}
}
/**
* @brief Forward a request to rippled
* @return response wrapped in an optional on success; nullopt otherwise
*/
std::optional<boost::json::object>
forwardToRippled(boost::json::object const& request, std::string const& clientIp, boost::asio::yield_context& yield)
const override;
forwardToRippled(boost::json::object const& request, std::string const& clientIp, boost::asio::yield_context yield)
const override
{
if (auto resp = forwardCache_.get(request); resp)
{
log_.debug() << "request hit forwardCache";
return resp;
}
return requestFromRippled(request, clientIp, yield);
}
protected:
Derived&
@@ -466,18 +826,14 @@ protected:
void
run() override
{
log_.trace() << toString();
auto const host = ip_;
auto const port = wsPort_;
resolver_.async_resolve(host, port, [this](auto ec, auto results) { onResolve(ec, results); });
resolver_.async_resolve(ip_, wsPort_, [this](auto ec, auto results) { onResolve(ec, results); });
}
};
class PlainSource : public SourceImpl<PlainSource>
{
std::unique_ptr<boost::beast::websocket::stream<boost::beast::tcp_stream>> ws_;
using StreamType = boost::beast::websocket::stream<boost::beast::tcp_stream>;
std::unique_ptr<StreamType> ws_;
public:
PlainSource(
@@ -489,8 +845,7 @@ public:
LoadBalancer& balancer,
SourceHooks hooks)
: SourceImpl(config, ioc, backend, subscriptions, nwvl, balancer, std::move(hooks))
, ws_(std::make_unique<boost::beast::websocket::stream<boost::beast::tcp_stream>>(
boost::asio::make_strand(ioc)))
, ws_(std::make_unique<StreamType>(strand_))
{
}
@@ -498,8 +853,7 @@ public:
* @brief Callback for connection to the server
*/
void
onConnect(boost::beast::error_code ec, boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint)
override;
onConnect(boost::beast::error_code ec, boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint);
/**
* @brief Close the websocket
@@ -517,9 +871,9 @@ public:
class SslSource : public SourceImpl<SslSource>
{
using StreamType = boost::beast::websocket::stream<boost::beast::ssl_stream<boost::beast::tcp_stream>>;
std::optional<std::reference_wrapper<boost::asio::ssl::context>> sslCtx_;
std::unique_ptr<boost::beast::websocket::stream<boost::beast::ssl_stream<boost::beast::tcp_stream>>> ws_;
std::unique_ptr<StreamType> ws_;
public:
SslSource(
@@ -533,9 +887,7 @@ public:
SourceHooks hooks)
: SourceImpl(config, ioc, backend, subscriptions, nwvl, balancer, std::move(hooks))
, sslCtx_(sslCtx)
, ws_(std::make_unique<boost::beast::websocket::stream<boost::beast::ssl_stream<boost::beast::tcp_stream>>>(
boost::asio::make_strand(ioc_),
*sslCtx_))
, ws_(std::make_unique<StreamType>(strand_, *sslCtx_))
{
}
@@ -543,8 +895,7 @@ public:
* @brief Callback for connection to the server
*/
void
onConnect(boost::beast::error_code ec, boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint)
override;
onConnect(boost::beast::error_code ec, boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint);
/**
* @brief Callback for SSL handshake completion
@@ -559,7 +910,7 @@ public:
void
close(bool startAgain);
boost::beast::websocket::stream<boost::beast::ssl_stream<boost::beast::tcp_stream>>&
StreamType&
ws()
{
return *ws_;