builds with ETL source

This commit is contained in:
CJ Cobb
2020-12-14 22:49:14 -05:00
parent f107d14bdd
commit 833df2b3a4
4 changed files with 1303 additions and 8 deletions

View File

@@ -9,7 +9,9 @@
project(reporting)
cmake_minimum_required(VERSION 3.17)
FIND_PACKAGE( Boost 1.70 COMPONENTS thread REQUIRED )
set (CMAKE_CXX_STANDARD 17)
FIND_PACKAGE( Boost 1.75 COMPONENTS thread log REQUIRED )
add_executable (reporting
websocket_server_async.cpp
@@ -77,8 +79,9 @@ target_include_directories (grpc_pbufs SYSTEM PUBLIC ${GRPC_GEN_DIR})
target_link_libraries (grpc_pbufs ${_REFLECTION} ${_PROTOBUF_LIBPROTOBUF} ${_GRPC_GRPCPP})
target_sources(reporting PRIVATE reporting/ETLSource.cpp)
INCLUDE_DIRECTORIES(${CMAKE_CURRENT_BINARY_DIR} ${Boost_INCLUDE_DIR})
TARGET_LINK_LIBRARIES(reporting LINK_PUBLIC ${Boost_LIBRARIES} grpc_pbufs)
INCLUDE_DIRECTORIES(${CMAKE_CURRENT_BINARY_DIR} ${Boost_INCLUDE_DIR} ${CMAKE_CURRENT_SOURCE_DIR})
TARGET_LINK_LIBRARIES(reporting LINK_PUBLIC ${Boost_LIBRARIES} ${Boost_LOG_LIBRARY} grpc_pbufs)

View File

@@ -1,5 +0,0 @@
package message;
message Message {
repeated int32 id = 1;
}

893
reporting/ETLSource.cpp Normal file
View File

@@ -0,0 +1,893 @@
#define BOOST_LOG_DYN_LINK 1
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <reporting/ETLSource.h>
#include <boost/log/trivial.hpp>
#include <boost/asio/strand.hpp>
namespace ripple {
// Create ETL source without grpc endpoint
// Fetch ledger and load initial ledger will fail for this source
// Primarly used in read-only mode, to monitor when ledgers are validated
ETLSource::ETLSource(std::string ip, std::string wsPort)
: ip_(ip)
, wsPort_(wsPort)
, ws_(std::make_unique<
boost::beast::websocket::stream<boost::beast::tcp_stream>>(
boost::asio::make_strand(ioc_)))
, resolver_(boost::asio::make_strand(ioc_))
, timer_(ioc_)
{
}
ETLSource::ETLSource(
std::string ip,
std::string wsPort,
std::string grpcPort)
: ip_(ip)
, wsPort_(wsPort)
, grpcPort_(grpcPort)
, ws_(std::make_unique<
boost::beast::websocket::stream<boost::beast::tcp_stream>>(
boost::asio::make_strand(ioc_)))
, resolver_(boost::asio::make_strand(ioc_))
, timer_(ioc_)
{
try
{
boost::asio::ip::tcp::endpoint endpoint{
boost::asio::ip::make_address(ip_), std::stoi(grpcPort_)};
std::stringstream ss;
ss << endpoint;
stub_ = org::xrpl::rpc::v1::XRPLedgerAPIService::NewStub(
grpc::CreateChannel(ss.str(),
grpc::InsecureChannelCredentials()));
BOOST_LOG_TRIVIAL(debug)
<< "Made stub for remote = " << toString();
}
catch (std::exception const& e)
{
BOOST_LOG_TRIVIAL(debug)
<< "Exception while creating stub = " << e.what()
<< " . Remote = " << toString();
}
}
void
ETLSource::reconnect(boost::beast::error_code ec)
{
connected_ = false;
// These are somewhat normal errors. operation_aborted occurs on shutdown,
// when the timer is cancelled. connection_refused will occur repeatedly
// if we cannot connect to the transaction processing process
if (ec != boost::asio::error::operation_aborted &&
ec != boost::asio::error::connection_refused)
{
BOOST_LOG_TRIVIAL(error) << __func__ << " : "
<< "error code = " << ec << " - " << toString();
}
else
{
BOOST_LOG_TRIVIAL(warning) << __func__ << " : "
<< "error code = " << ec << " - " << toString();
}
// exponentially increasing timeouts, with a max of 30 seconds
size_t waitTime = std::min(pow(2, numFailures_), 30.0);
numFailures_++;
timer_.expires_after(boost::asio::chrono::seconds(waitTime));
timer_.async_wait([this](auto ec) {
bool startAgain = (ec != boost::asio::error::operation_aborted);
BOOST_LOG_TRIVIAL(trace) << __func__ << " async_wait : ec = " << ec;
close(startAgain);
});
}
void
ETLSource::close(bool startAgain)
{
timer_.cancel();
ioc_.post([this, startAgain]() {
if (closing_)
return;
if (ws_->is_open())
{
// onStop() also calls close(). If the async_close is called twice,
// an assertion fails. Using closing_ makes sure async_close is only
// called once
closing_ = true;
ws_->async_close(
boost::beast::websocket::close_code::normal,
[this, startAgain](auto ec) {
if (ec)
{
BOOST_LOG_TRIVIAL(error)
<< __func__ << " async_close : "
<< "error code = " << ec << " - " << toString();
}
closing_ = false;
if (startAgain)
start();
});
}
else if (startAgain)
{
start();
}
});
}
void
ETLSource::start()
{
BOOST_LOG_TRIVIAL(trace) << __func__ << " : " << toString();
auto const host = ip_;
auto const port = wsPort_;
resolver_.async_resolve(
host, port, [this](auto ec, auto results) { onResolve(ec, results); });
}
void
ETLSource::onResolve(
boost::beast::error_code ec,
boost::asio::ip::tcp::resolver::results_type results)
{
BOOST_LOG_TRIVIAL(trace) << __func__ << " : ec = " << ec << " - "
<< toString();
if (ec)
{
// try again
reconnect(ec);
}
else
{
boost::beast::get_lowest_layer(*ws_).expires_after(
std::chrono::seconds(30));
boost::beast::get_lowest_layer(*ws_).async_connect(
results, [this](auto ec, auto ep) { onConnect(ec, ep); });
}
}
void
ETLSource::onConnect(
boost::beast::error_code ec,
boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint)
{
BOOST_LOG_TRIVIAL(trace) << __func__ << " : ec = " << ec << " - "
<< toString();
if (ec)
{
// start over
reconnect(ec);
}
else
{
numFailures_ = 0;
// Turn off timeout on the tcp stream, because websocket stream has it's
// own timeout system
boost::beast::get_lowest_layer(*ws_).expires_never();
// Set suggested timeout settings for the websocket
ws_->set_option(
boost::beast::websocket::stream_base::timeout::suggested(
boost::beast::role_type::client));
// Set a decorator to change the User-Agent of the handshake
ws_->set_option(boost::beast::websocket::stream_base::decorator(
[](boost::beast::websocket::request_type& req) {
req.set(
boost::beast::http::field::user_agent,
std::string(BOOST_BEAST_VERSION_STRING) +
" websocket-client-async");
}));
// Update the host_ string. This will provide the value of the
// Host HTTP header during the WebSocket handshake.
// See https://tools.ietf.org/html/rfc7230#section-5.4
auto host = ip_ + ':' + std::to_string(endpoint.port());
// Perform the websocket handshake
ws_->async_handshake(host, "/", [this](auto ec) { onHandshake(ec); });
}
}
void
ETLSource::onHandshake(boost::beast::error_code ec)
{
BOOST_LOG_TRIVIAL(trace) << __func__ << " : ec = " << ec << " - "
<< toString();
if (ec)
{
// start over
reconnect(ec);
}
else
{
/*
Json::Value jv;
jv["command"] = "subscribe";
jv["streams"] = Json::arrayValue;
Json::Value ledgerStream("ledger");
jv["streams"].append(ledgerStream);
Json::Value txnStream("transactions_proposed");
jv["streams"].append(txnStream);
Json::FastWriter fastWriter;
*/
BOOST_LOG_TRIVIAL(trace) << "Sending subscribe stream message";
// Send the message
ws_->async_write(
boost::asio::buffer("foo"),
[this](auto ec, size_t size) { onWrite(ec, size); });
}
}
void
ETLSource::onWrite(boost::beast::error_code ec, size_t bytesWritten)
{
BOOST_LOG_TRIVIAL(trace) << __func__ << " : ec = " << ec << " - "
<< toString();
if (ec)
{
// start over
reconnect(ec);
}
else
{
ws_->async_read(
readBuffer_, [this](auto ec, size_t size) { onRead(ec, size); });
}
}
void
ETLSource::onRead(boost::beast::error_code ec, size_t size)
{
BOOST_LOG_TRIVIAL(trace) << __func__ << " : ec = " << ec << " - "
<< toString();
// if error or error reading message, start over
if (ec)
{
reconnect(ec);
}
else
{
handleMessage();
boost::beast::flat_buffer buffer;
swap(readBuffer_, buffer);
BOOST_LOG_TRIVIAL(trace)
<< __func__ << " : calling async_read - " << toString();
ws_->async_read(
readBuffer_, [this](auto ec, size_t size) { onRead(ec, size); });
}
}
bool
ETLSource::handleMessage()
{
BOOST_LOG_TRIVIAL(trace) << __func__ << " : " << toString();
setLastMsgTime();
connected_ = true;
try
{
/*
Json::Value response;
Json::Reader reader;
if (!reader.parse(
static_cast<char const*>(readBuffer_.data().data()), response))
{
BOOST_LOG_TRIVIAL(error)
<< __func__ << " : "
<< "Error parsing stream message."
<< " Message = " << readBuffer_.data().data();
return false;
}
uint32_t ledgerIndex = 0;
if (response.isMember("result"))
{
if (response["result"].isMember(jss::ledger_index))
{
ledgerIndex = response["result"][jss::ledger_index].asUInt();
}
if (response[jss::result].isMember(jss::validated_ledgers))
{
setValidatedRange(
response[jss::result][jss::validated_ledgers].asString());
}
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
<< "Received a message on ledger "
<< " subscription stream. Message : "
<< response.toStyledString() << " - " << toString();
}
else
{
if (response.isMember(jss::transaction))
{
if (etl_.getETLLoadBalancer().shouldPropagateTxnStream(this))
{
etl_.getApplication().getOPs().forwardProposedTransaction(
response);
}
}
else
{
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
<< "Received a message on ledger "
<< " subscription stream. Message : "
<< response.toStyledString() << " - " << toString();
if (response.isMember(jss::ledger_index))
{
ledgerIndex = response[jss::ledger_index].asUInt();
}
if (response.isMember(jss::validated_ledgers))
{
setValidatedRange(
response[jss::validated_ledgers].asString());
}
}
}
if (ledgerIndex != 0)
{
BOOST_LOG_TRIVIAL(trace)
<< __func__ << " : "
<< "Pushing ledger sequence = " << ledgerIndex << " - "
<< toString();
networkValidatedLedgers_.push(ledgerIndex);
}
*/
return true;
}
catch (std::exception const& e)
{
BOOST_LOG_TRIVIAL(error) << "Exception in handleMessage : " << e.what();
return false;
}
}
class AsyncCallData
{
std::unique_ptr<org::xrpl::rpc::v1::GetLedgerDataResponse> cur_;
std::unique_ptr<org::xrpl::rpc::v1::GetLedgerDataResponse> next_;
org::xrpl::rpc::v1::GetLedgerDataRequest request_;
std::unique_ptr<grpc::ClientContext> context_;
grpc::Status status_;
public:
AsyncCallData(
uint32_t seq)
{
request_.mutable_ledger()->set_sequence(seq);
request_.set_user("ETL");
cur_ = std::make_unique<org::xrpl::rpc::v1::GetLedgerDataResponse>();
next_ = std::make_unique<org::xrpl::rpc::v1::GetLedgerDataResponse>();
context_ = std::make_unique<grpc::ClientContext>();
}
enum class CallStatus { MORE, DONE, ERRORED };
CallStatus
process(
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>& stub,
grpc::CompletionQueue& cq,
bool abort = false)
{
std::cout << "Processing calldata" << std::endl;
if (abort)
{
std::cout << "AsyncCallData aborted";
return CallStatus::ERRORED;
}
if (!status_.ok())
{
BOOST_LOG_TRIVIAL(debug) << "AsyncCallData status_ not ok: "
<< " code = " << status_.error_code()
<< " message = " << status_.error_message();
return CallStatus::ERRORED;
}
if (!next_->is_unlimited())
{
BOOST_LOG_TRIVIAL(warning)
<< "AsyncCallData is_unlimited is false. Make sure "
"secure_gateway is set correctly at the ETL source";
assert(false);
}
std::swap(cur_, next_);
bool more = true;
// if no marker returned, we are done
if (cur_->marker().size() == 0)
more = false;
// if we are not done, make the next async call
if (more)
{
request_.set_marker(std::move(cur_->marker()));
call(stub, cq);
}
for (auto& obj : *(cur_->mutable_ledger_objects()->mutable_objects()))
{
/*
flatMapBackend.store(
std::move(*obj.mutable_key()),
request_.ledger().sequence(),
std::move(*obj.mutable_data()));
*/
}
return more ? CallStatus::MORE : CallStatus::DONE;
}
void
call(
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>& stub,
grpc::CompletionQueue& cq)
{
context_ = std::make_unique<grpc::ClientContext>();
std::unique_ptr<grpc::ClientAsyncResponseReader<
org::xrpl::rpc::v1::GetLedgerDataResponse>>
rpc(stub->PrepareAsyncGetLedgerData(context_.get(), request_, &cq));
rpc->StartCall();
rpc->Finish(next_.get(), &status_, this);
}
std::string
getMarkerPrefix()
{
if (next_->marker().size() == 0)
return "";
else
return std::string{next_->marker().data()[0]};
}
};
bool
ETLSource::loadInitialLedger(uint32_t sequence)
{
if (!stub_)
return false;
grpc::CompletionQueue cq;
void* tag;
bool ok = false;
std::vector<AsyncCallData> calls;
calls.emplace_back(sequence);
BOOST_LOG_TRIVIAL(debug) << "Starting data download for ledger " << sequence
<< ". Using source = " << toString();
for (auto& c : calls)
c.call(stub_, cq);
size_t numFinished = 0;
bool abort = false;
while (numFinished < calls.size() &&
cq.Next(&tag, &ok))
{
assert(tag);
auto ptr = static_cast<AsyncCallData*>(tag);
if (!ok)
{
BOOST_LOG_TRIVIAL(error) << "loadInitialLedger - ok is false";
return false;
// handle cancelled
}
else
{
BOOST_LOG_TRIVIAL(debug)
<< "Marker prefix = " << ptr->getMarkerPrefix();
auto result =
ptr->process(stub_, cq, abort);
if (result != AsyncCallData::CallStatus::MORE)
{
numFinished++;
BOOST_LOG_TRIVIAL(debug)
<< "Finished a marker. "
<< "Current number of finished = " << numFinished;
}
if (result == AsyncCallData::CallStatus::ERRORED)
{
abort = true;
}
}
}
return !abort;
}
std::pair<grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse>
ETLSource::fetchLedger(uint32_t ledgerSequence, bool getObjects)
{
org::xrpl::rpc::v1::GetLedgerResponse response;
if (!stub_)
return {{grpc::StatusCode::INTERNAL, "No Stub"}, response};
// ledger header with txns and metadata
org::xrpl::rpc::v1::GetLedgerRequest request;
grpc::ClientContext context;
request.mutable_ledger()->set_sequence(ledgerSequence);
request.set_transactions(true);
request.set_expand(true);
request.set_get_objects(getObjects);
request.set_user("ETL");
grpc::Status status = stub_->GetLedger(&context, request, &response);
if (status.ok() && !response.is_unlimited())
{
BOOST_LOG_TRIVIAL(warning)
<< "ETLSource::fetchLedger - is_unlimited is "
"false. Make sure secure_gateway is set "
"correctly on the ETL source. source = "
<< toString() << " response = " << response.DebugString()
<< " status = " << status.error_message();
assert(false);
}
return {status, std::move(response)};
}
/*
ETLLoadBalancer::ETLLoadBalancer(ReportingETL& etl)
: etl_(etl)
, journal_(etl_.getApplication().journal("ReportingETL::LoadBalancer"))
{
}
void
ETLLoadBalancer::add(
std::string& host,
std::string& websocketPort,
std::string& grpcPort)
{
std::unique_ptr<ETLSource> ptr =
std::make_unique<ETLSource>(host, websocketPort, grpcPort, etl_);
sources_.push_back(std::move(ptr));
BOOST_LOG_TRIVIAL(info) << __func__ << " : added etl source - "
<< sources_.back()->toString();
}
void
ETLLoadBalancer::add(std::string& host, std::string& websocketPort)
{
std::unique_ptr<ETLSource> ptr =
std::make_unique<ETLSource>(host, websocketPort, etl_);
sources_.push_back(std::move(ptr));
BOOST_LOG_TRIVIAL(info) << __func__ << " : added etl source - "
<< sources_.back()->toString();
}
void
ETLLoadBalancer::loadInitialLedger(uint32_t sequence)
{
execute(
[this, &sequence](auto& source) {
bool res = source->loadInitialLedger(sequence);
if (!res)
{
BOOST_LOG_TRIVIAL(error) << "Failed to download initial ledger. "
<< " Sequence = " << sequence
<< " source = " << source->toString();
}
return res;
},
sequence);
}
std::optional<org::xrpl::rpc::v1::GetLedgerResponse>
ETLLoadBalancer::fetchLedger(uint32_t ledgerSequence, bool getObjects)
{
org::xrpl::rpc::v1::GetLedgerResponse response;
bool success = execute(
[&response, ledgerSequence, getObjects, this](auto& source) {
auto [status, data] =
source->fetchLedger(ledgerSequence, getObjects);
response = std::move(data);
if (status.ok() && response.validated())
{
BOOST_LOG_TRIVIAL(info)
<< "Successfully fetched ledger = " << ledgerSequence
<< " from source = " << source->toString();
return true;
}
else
{
BOOST_LOG_TRIVIAL(warning)
<< "Error getting ledger = " << ledgerSequence
<< " Reply : " << response.DebugString()
<< " error_code : " << status.error_code()
<< " error_msg : " << status.error_message()
<< " source = " << source->toString();
return false;
}
},
ledgerSequence);
if (success)
return response;
else
return {};
}
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>
ETLLoadBalancer::getP2pForwardingStub() const
{
if (sources_.size() == 0)
return nullptr;
srand((unsigned)time(0));
auto sourceIdx = rand() % sources_.size();
auto numAttempts = 0;
while (numAttempts < sources_.size())
{
auto stub = sources_[sourceIdx]->getP2pForwardingStub();
if (!stub)
{
sourceIdx = (sourceIdx + 1) % sources_.size();
++numAttempts;
continue;
}
return stub;
}
return nullptr;
}
Json::Value
ETLLoadBalancer::forwardToP2p(RPC::JsonContext& context) const
{
Json::Value res;
if (sources_.size() == 0)
return res;
srand((unsigned)time(0));
auto sourceIdx = rand() % sources_.size();
auto numAttempts = 0;
while (numAttempts < sources_.size())
{
res = sources_[sourceIdx]->forwardToP2p(context);
if (!res.isMember("forwarded") || res["forwarded"] != true)
{
sourceIdx = (sourceIdx + 1) % sources_.size();
++numAttempts;
continue;
}
return res;
}
RPC::Status err = {rpcFAILED_TO_FORWARD};
err.inject(res);
return res;
}
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>
ETLSource::getP2pForwardingStub() const
{
if (!connected_)
return nullptr;
try
{
return org::xrpl::rpc::v1::XRPLedgerAPIService::NewStub(
grpc::CreateChannel(
beast::IP::Endpoint(
boost::asio::ip::make_address(ip_), std::stoi(grpcPort_))
.to_string(),
grpc::InsecureChannelCredentials()));
}
catch (std::exception const&)
{
BOOST_LOG_TRIVIAL(error) << "Failed to create grpc stub";
return nullptr;
}
}
Json::Value
ETLSource::forwardToP2p(RPC::JsonContext& context) const
{
BOOST_LOG_TRIVIAL(debug) << "Attempting to forward request to tx. "
<< "request = " << context.params.toStyledString();
Json::Value response;
if (!connected_)
{
BOOST_LOG_TRIVIAL(error)
<< "Attempted to proxy but failed to connect to tx";
return response;
}
namespace beast = boost::beast; // from <boost/beast.hpp>
namespace http = beast::http; // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio; // from <boost/asio.hpp>
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
Json::Value& request = context.params;
try
{
// The io_context is required for all I/O
net::io_context ioc;
// These objects perform our I/O
tcp::resolver resolver{ioc};
BOOST_LOG_TRIVIAL(debug) << "Creating websocket";
auto ws = std::make_unique<websocket::stream<tcp::socket>>(ioc);
// Look up the domain name
auto const results = resolver.resolve(ip_, wsPort_);
BOOST_LOG_TRIVIAL(debug) << "Connecting websocket";
// Make the connection on the IP address we get from a lookup
net::connect(ws->next_layer(), results.begin(), results.end());
// Set a decorator to change the User-Agent of the handshake
// and to tell rippled to charge the client IP for RPC
// resources. See "secure_gateway" in
// https://github.com/ripple/rippled/blob/develop/cfg/rippled-example.cfg
ws->set_option(websocket::stream_base::decorator(
[&context](websocket::request_type& req) {
req.set(
http::field::user_agent,
std::string(BOOST_BEAST_VERSION_STRING) +
" websocket-client-coro");
req.set(
http::field::forwarded,
"for=" + context.consumer.to_string());
}));
BOOST_LOG_TRIVIAL(debug) << "client ip: " << context.consumer.to_string();
BOOST_LOG_TRIVIAL(debug) << "Performing websocket handshake";
// Perform the websocket handshake
ws->handshake(ip_, "/");
Json::FastWriter fastWriter;
BOOST_LOG_TRIVIAL(debug) << "Sending request";
// Send the message
ws->write(net::buffer(fastWriter.write(request)));
beast::flat_buffer buffer;
ws->read(buffer);
Json::Reader reader;
if (!reader.parse(
static_cast<char const*>(buffer.data().data()), response))
{
BOOST_LOG_TRIVIAL(error) << "Error parsing response";
response[jss::error] = "Error parsing response from tx";
}
BOOST_LOG_TRIVIAL(debug) << "Successfully forward request";
response["forwarded"] = true;
return response;
}
catch (std::exception const& e)
{
BOOST_LOG_TRIVIAL(error) << "Encountered exception : " << e.what();
return response;
}
}
template <class Func>
bool
ETLLoadBalancer::execute(Func f, uint32_t ledgerSequence)
{
srand((unsigned)time(0));
auto sourceIdx = rand() % sources_.size();
auto numAttempts = 0;
while (!etl_.isStopping())
{
auto& source = sources_[sourceIdx];
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
<< "Attempting to execute func. ledger sequence = "
<< ledgerSequence << " - source = " << source->toString();
if (source->hasLedger(ledgerSequence))
{
bool res = f(source);
if (res)
{
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
<< "Successfully executed func at source = "
<< source->toString()
<< " - ledger sequence = " << ledgerSequence;
break;
}
else
{
BOOST_LOG_TRIVIAL(warning)
<< __func__ << " : "
<< "Failed to execute func at source = "
<< source->toString()
<< " - ledger sequence = " << ledgerSequence;
}
}
else
{
BOOST_LOG_TRIVIAL(warning)
<< __func__ << " : "
<< "Ledger not present at source = " << source->toString()
<< " - ledger sequence = " << ledgerSequence;
}
sourceIdx = (sourceIdx + 1) % sources_.size();
numAttempts++;
if (numAttempts % sources_.size() == 0)
{
// If another process loaded the ledger into the database, we can
// abort trying to fetch the ledger from a transaction processing
// process
if (etl_.getApplication().getLedgerMaster().getLedgerBySeq(
ledgerSequence))
{
BOOST_LOG_TRIVIAL(warning)
<< __func__ << " : "
<< "Error executing function. "
<< " Tried all sources, but ledger was found in db."
<< " Sequence = " << ledgerSequence;
break;
}
BOOST_LOG_TRIVIAL(error)
<< __func__ << " : "
<< "Error executing function "
<< " - ledger sequence = " << ledgerSequence
<< " - Tried all sources. Sleeping and trying again";
std::this_thread::sleep_for(std::chrono::seconds(2));
}
}
return !etl_.isStopping();
}
void
ETLLoadBalancer::start()
{
for (auto& source : sources_)
source->start();
}
void
ETLLoadBalancer::stop()
{
for (auto& source : sources_)
source->stop();
}
*/
} // namespace ripple

404
reporting/ETLSource.h Normal file
View File

@@ -0,0 +1,404 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_APP_REPORTING_ETLSOURCE_H_INCLUDED
#define RIPPLE_APP_REPORTING_ETLSOURCE_H_INCLUDED
#include <boost/algorithm/string.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/core/string.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/asio.hpp>
#include "org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h"
#include <grpcpp/grpcpp.h>
namespace ripple {
class ReportingETL;
/// This class manages a connection to a single ETL source. This is almost
/// always a p2p node, but really could be another reporting node. This class
/// subscribes to the ledgers and transactions_proposed streams of the
/// associated p2p node, and keeps track of which ledgers the p2p node has. This
/// class also has methods for extracting said ledgers. Lastly this class
/// forwards transactions received on the transactions_proposed streams to any
/// subscribers.
class ETLSource
{
std::string ip_;
std::string wsPort_;
std::string grpcPort_;
// a reference to the applications io_service
boost::asio::io_context ioc_;
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub> stub_;
std::unique_ptr<boost::beast::websocket::stream<boost::beast::tcp_stream>>
ws_;
boost::asio::ip::tcp::resolver resolver_;
boost::beast::flat_buffer readBuffer_;
std::vector<std::pair<uint32_t, uint32_t>> validatedLedgers_;
std::string validatedLedgersRaw_;
//NetworkValidatedLedgers& networkValidatedLedgers_;
//beast::Journal journal_;
mutable std::mutex mtx_;
size_t numFailures_ = 0;
std::atomic_bool closing_{false};
std::atomic_bool connected_{false};
// true if this ETL source is forwarding transactions received on the
// transactions_proposed stream. There are usually multiple ETL sources,
// so to avoid forwarding the same transaction multiple times, we only
// forward from one particular ETL source at a time.
std::atomic_bool forwardingStream_{false};
// The last time a message was received on the ledgers stream
std::chrono::system_clock::time_point lastMsgTime_;
mutable std::mutex lastMsgTimeMtx_;
// used for retrying connections
boost::asio::steady_timer timer_;
public:
bool
isConnected() const
{
return connected_;
}
std::chrono::system_clock::time_point
getLastMsgTime() const
{
std::lock_guard lck(lastMsgTimeMtx_);
return lastMsgTime_;
}
void
setLastMsgTime()
{
std::lock_guard lck(lastMsgTimeMtx_);
lastMsgTime_ = std::chrono::system_clock::now();
}
/// Create ETL source without gRPC endpoint
/// Fetch ledger and load initial ledger will fail for this source
/// Primarly used in read-only mode, to monitor when ledgers are validated
ETLSource(std::string ip, std::string wsPort);
/// Create ETL source with gRPC endpoint
ETLSource(
std::string ip,
std::string wsPort,
std::string grpcPort);
/// @param sequence ledger sequence to check for
/// @return true if this source has the desired ledger
bool
hasLedger(uint32_t sequence) const
{
std::lock_guard lck(mtx_);
for (auto& pair : validatedLedgers_)
{
if (sequence >= pair.first && sequence <= pair.second)
{
return true;
}
else if (sequence < pair.first)
{
// validatedLedgers_ is a sorted list of disjoint ranges
// if the sequence comes before this range, the sequence will
// come before all subsequent ranges
return false;
}
}
return false;
}
/// process the validated range received on the ledgers stream. set the
/// appropriate member variable
/// @param range validated range received on ledgers stream
void
setValidatedRange(std::string const& range)
{
std::vector<std::pair<uint32_t, uint32_t>> pairs;
std::vector<std::string> ranges;
boost::split(ranges, range, boost::is_any_of(","));
for (auto& pair : ranges)
{
std::vector<std::string> minAndMax;
boost::split(minAndMax, pair, boost::is_any_of("-"));
if (minAndMax.size() == 1)
{
uint32_t sequence = std::stoll(minAndMax[0]);
pairs.push_back(std::make_pair(sequence, sequence));
}
else
{
assert(minAndMax.size() == 2);
uint32_t min = std::stoll(minAndMax[0]);
uint32_t max = std::stoll(minAndMax[1]);
pairs.push_back(std::make_pair(min, max));
}
}
std::sort(pairs.begin(), pairs.end(), [](auto left, auto right) {
return left.first < right.first;
});
// we only hold the lock here, to avoid blocking while string processing
std::lock_guard lck(mtx_);
validatedLedgers_ = std::move(pairs);
validatedLedgersRaw_ = range;
}
/// @return the validated range of this source
/// @note this is only used by server_info
std::string
getValidatedRange() const
{
std::lock_guard lck(mtx_);
return validatedLedgersRaw_;
}
/// Close the underlying websocket
void
stop()
{
assert(ws_);
close(false);
}
/// Fetch the specified ledger
/// @param ledgerSequence sequence of the ledger to fetch
/// @getObjects whether to get the account state diff between this ledger
/// and the prior one
/// @return the extracted data and the result status
std::pair<grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse>
fetchLedger(uint32_t ledgerSequence, bool getObjects = true);
std::string
toString() const
{
return "{ validated_ledger : " + getValidatedRange() +
" , ip : " + ip_ + " , web socket port : " + wsPort_ +
", grpc port : " + grpcPort_ + " }";
}
/// Download a ledger in full
/// @param ledgerSequence sequence of the ledger to download
/// @param writeQueue queue to push downloaded ledger objects
/// @return true if the download was successful
bool
loadInitialLedger(uint32_t ledgerSequence);
/// Begin sequence of operations to connect to the ETL source and subscribe
/// to ledgers and transactions_proposed
void
start();
/// Attempt to reconnect to the ETL source
void
reconnect(boost::beast::error_code ec);
/// Callback
void
onResolve(
boost::beast::error_code ec,
boost::asio::ip::tcp::resolver::results_type results);
/// Callback
void
onConnect(
boost::beast::error_code ec,
boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint);
/// Callback
void
onHandshake(boost::beast::error_code ec);
/// Callback
void
onWrite(boost::beast::error_code ec, size_t size);
/// Callback
void
onRead(boost::beast::error_code ec, size_t size);
/// Handle the most recently received message
/// @return true if the message was handled successfully. false on error
bool
handleMessage();
/// Close the websocket
/// @param startAgain whether to reconnect
void
close(bool startAgain);
/*
/// Get grpc stub to forward requests to p2p node
/// @return stub to send requests to ETL source
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>
getP2pForwardingStub() const;
*/
};
/*
*
/// This class is used to manage connections to transaction processing processes
/// This class spawns a listener for each etl source, which listens to messages
/// on the ledgers stream (to keep track of which ledgers have been validated by
/// the network, and the range of ledgers each etl source has). This class also
/// allows requests for ledger data to be load balanced across all possible etl
/// sources.
class ETLLoadBalancer
{
private:
ReportingETL& etl_;
beast::Journal journal_;
std::vector<std::unique_ptr<ETLSource>> sources_;
public:
ETLLoadBalancer(ReportingETL& etl);
/// Add an ETL source
/// @param host host or ip of ETL source
/// @param websocketPort port where ETL source accepts websocket connections
/// @param grpcPort port where ETL source accepts gRPC requests
void
add(std::string& host, std::string& websocketPort, std::string& grpcPort);
/// Add an ETL source without gRPC support. This source will send messages
/// on the ledgers and transactions_proposed streams, but will not be able
/// to handle the gRPC requests that are used for ETL
/// @param host host or ip of ETL source
/// @param websocketPort port where ETL source accepts websocket connections
void
add(std::string& host, std::string& websocketPort);
/// Load the initial ledger, writing data to the queue
/// @param sequence sequence of ledger to download
/// @param writeQueue queue to push downloaded data to
void
loadInitialLedger(uint32_t sequence);
/// Fetch data for a specific ledger. This function will continuously try
/// to fetch data for the specified ledger until the fetch succeeds, the
/// ledger is found in the database, or the server is shutting down.
/// @param ledgerSequence sequence of ledger to fetch data for
/// @param getObjects if true, fetch diff between specified ledger and
/// previous
/// @return the extracted data, if extraction was successful. If the ledger
/// was found in the database or the server is shutting down, the optional
/// will be empty
std::optional<org::xrpl::rpc::v1::GetLedgerResponse>
fetchLedger(uint32_t ledgerSequence, bool getObjects);
/// Setup all of the ETL sources and subscribe to the necessary streams
void
start();
void
stop();
/// Determine whether messages received on the transactions_proposed stream
/// should be forwarded to subscribing clients. The server subscribes to
/// transactions_proposed on multiple ETLSources, yet only forwards messages
/// from one source at any given time (to avoid sending duplicate messages
/// to clients).
/// @param in ETLSource in question
/// @return true if messages should be forwarded
bool
shouldPropagateTxnStream(ETLSource* in) const
{
for (auto& src : sources_)
{
assert(src);
// We pick the first ETLSource encountered that is connected
if (src->isConnected())
{
if (src.get() == in)
return true;
else
return false;
}
}
// If no sources connected, then this stream has not been forwarded.
return true;
}
Json::Value
toJson() const
{
Json::Value ret(Json::arrayValue);
for (auto& src : sources_)
{
ret.append(src->toJson());
}
return ret;
}
/// Randomly select a p2p node to forward a gRPC request to
/// @return gRPC stub to forward requests to p2p node
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>
getP2pForwardingStub() const;
/// Forward a JSON RPC request to a randomly selected p2p node
/// @param context context of the request
/// @return response received from p2p node
Json::Value
forwardToP2p(RPC::JsonContext& context) const;
private:
/// f is a function that takes an ETLSource as an argument and returns a
/// bool. Attempt to execute f for one randomly chosen ETLSource that has
/// the specified ledger. If f returns false, another randomly chosen
/// ETLSource is used. The process repeats until f returns true.
/// @param f function to execute. This function takes the ETL source as an
/// argument, and returns a bool.
/// @param ledgerSequence f is executed for each ETLSource that has this
/// ledger
/// @return true if f was eventually executed successfully. false if the
/// ledger was found in the database or the server is shutting down
template <class Func>
bool
execute(Func f, uint32_t ledgerSequence);
};
*/
} // namespace ripple
#endif