20 #include <ripple/app/reporting/ETLSource.h>
21 #include <ripple/app/reporting/ReportingETL.h>
22 #include <ripple/beast/core/CurrentThreadName.h>
23 #include <ripple/json/json_reader.h>
24 #include <ripple/json/json_writer.h>
35 , ioc_(etl.getApplication().getIOService())
36 , ws_(
std::make_unique<
38 boost::asio::make_strand(ioc_)))
39 , resolver_(
boost::asio::make_strand(ioc_))
40 , networkValidatedLedgers_(etl_.getNetworkValidatedLedgers())
41 , journal_(etl_.getApplication().journal(
"ReportingETL::ETLSource"))
42 , app_(etl_.getApplication())
56 , ioc_(etl.getApplication().getIOService())
57 , ws_(
std::make_unique<
59 boost::asio::make_strand(ioc_)))
60 , resolver_(
boost::asio::make_strand(ioc_))
61 , networkValidatedLedgers_(etl_.getNetworkValidatedLedgers())
62 , journal_(etl_.getApplication().journal(
"ReportingETL::ETLSource"))
63 , app_(etl_.getApplication())
75 <<
"Using IP to connect to ETL source: " << connectionString;
81 <<
"Using DNS to connect to ETL source: " << connectionString;
85 stub_ = org::xrpl::rpc::v1::XRPLedgerAPIService::NewStub(
87 connectionString, grpc::InsecureChannelCredentials()));
104 if (ec != boost::asio::error::operation_aborted &&
105 ec != boost::asio::error::connection_refused)
108 <<
"error code = " << ec <<
" - " <<
toString();
113 <<
"error code = " << ec <<
" - " <<
toString();
119 <<
" - etl is stopping. aborting reconnect";
126 timer_.expires_after(boost::asio::chrono::seconds(waitTime));
127 timer_.async_wait([
this, fname = __func__](
auto ec) {
128 bool startAgain = (ec != boost::asio::error::operation_aborted);
129 JLOG(
journal_.
trace()) << fname <<
" async_wait : ec = " << ec;
138 ioc_.post([
this, startAgain]() {
149 boost::beast::websocket::close_code::normal,
150 [this, startAgain, fname = __func__](auto ec) {
153 JLOG(journal_.error())
154 << fname <<
" async_close : "
155 <<
"error code = " << ec <<
" - " << toString();
172 JLOG(journal_.trace()) << __func__ <<
" : " << toString();
174 auto const host = ip_;
175 auto const port = wsPort_;
177 resolver_.async_resolve(
178 host, port, [
this](
auto ec,
auto results) { onResolve(ec, results); });
182 ETLSource::onResolve(
183 boost::beast::error_code ec,
184 boost::asio::ip::tcp::resolver::results_type results)
186 JLOG(journal_.trace()) << __func__ <<
" : ec = " << ec <<
" - "
195 boost::beast::get_lowest_layer(*ws_).expires_after(
197 boost::beast::get_lowest_layer(*ws_).async_connect(
198 results, [
this](
auto ec,
auto ep) { onConnect(ec, ep); });
203 ETLSource::onConnect(
204 boost::beast::error_code ec,
205 boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint)
207 JLOG(journal_.trace()) << __func__ <<
" : ec = " << ec <<
" - "
219 boost::beast::get_lowest_layer(*ws_).expires_never();
223 boost::beast::websocket::stream_base::timeout::suggested(
224 boost::beast::role_type::client));
227 ws_->set_option(boost::beast::websocket::stream_base::decorator(
228 [](boost::beast::websocket::request_type& req) {
230 boost::beast::http::field::user_agent,
232 " websocket-client-async");
240 ws_->async_handshake(host,
"/", [
this](
auto ec) { onHandshake(ec); });
245 ETLSource::onHandshake(boost::beast::error_code ec)
247 JLOG(journal_.trace()) << __func__ <<
" : ec = " << ec <<
" - "
257 jv[
"command"] =
"subscribe";
261 jv[
"streams"].
append(ledgerStream);
263 jv[
"streams"].
append(txnStream);
266 JLOG(journal_.trace()) <<
"Sending subscribe stream message";
269 boost::asio::buffer(fastWriter.
write(jv)),
270 [
this](
auto ec,
size_t size) { onWrite(ec, size); });
275 ETLSource::onWrite(boost::beast::error_code ec,
size_t bytesWritten)
277 JLOG(journal_.trace()) << __func__ <<
" : ec = " << ec <<
" - "
287 readBuffer_, [
this](
auto ec,
size_t size) { onRead(ec, size); });
292 ETLSource::onRead(boost::beast::error_code ec,
size_t size)
294 JLOG(journal_.trace()) << __func__ <<
" : ec = " << ec <<
" - "
304 boost::beast::flat_buffer buffer;
305 swap(readBuffer_, buffer);
307 JLOG(journal_.trace())
308 << __func__ <<
" : calling async_read - " << toString();
310 readBuffer_, [
this](
auto ec,
size_t size) { onRead(ec, size); });
315 ETLSource::handleMessage()
317 JLOG(journal_.trace()) << __func__ <<
" : " << toString();
326 static_cast<char const*
>(readBuffer_.data().data()), response))
328 JLOG(journal_.error())
330 <<
"Error parsing stream message."
331 <<
" Message = " << readBuffer_.data().data();
335 uint32_t ledgerIndex = 0;
338 if (response[
"result"].isMember(jss::ledger_index))
340 ledgerIndex = response[
"result"][jss::ledger_index].
asUInt();
342 if (response[jss::result].isMember(jss::validated_ledgers))
345 response[jss::result][jss::validated_ledgers].asString());
347 JLOG(journal_.debug())
349 <<
"Received a message on ledger "
350 <<
" subscription stream. Message : "
355 if (response.
isMember(jss::transaction))
357 if (etl_.getETLLoadBalancer().shouldPropagateTxnStream(
this))
359 etl_.getApplication().getOPs().forwardProposedTransaction(
365 JLOG(journal_.debug())
367 <<
"Received a message on ledger "
368 <<
" subscription stream. Message : "
370 if (response.
isMember(jss::ledger_index))
372 ledgerIndex = response[jss::ledger_index].
asUInt();
374 if (response.
isMember(jss::validated_ledgers))
377 response[jss::validated_ledgers].asString());
382 if (ledgerIndex != 0)
384 JLOG(journal_.trace())
386 <<
"Pushing ledger sequence = " << ledgerIndex <<
" - "
388 networkValidatedLedgers_.push(ledgerIndex);
394 JLOG(journal_.error()) <<
"Exception in handleMessage : " << e.
what();
421 request_.mutable_ledger()->set_sequence(seq);
424 request_.set_marker(marker.
data(), marker.
size());
426 request_.set_user(
"ETL");
429 nextPrefix_ = nextMarker->data()[0];
431 unsigned char prefix = marker.
data()[0];
433 JLOG(journal_.
debug())
434 <<
"Setting up AsyncCallData. marker = " <<
strHex(marker)
438 assert(nextPrefix_ > prefix || nextPrefix_ == 0x00);
440 cur_ = std::make_unique<org::xrpl::rpc::v1::GetLedgerDataResponse>();
442 next_ = std::make_unique<org::xrpl::rpc::v1::GetLedgerDataResponse>();
444 context_ = std::make_unique<grpc::ClientContext>();
451 grpc::CompletionQueue& cq,
455 JLOG(journal_.
debug()) <<
"Processing calldata";
458 JLOG(journal_.
error()) <<
"AsyncCallData aborted";
459 return CallStatus::ERRORED;
463 JLOG(journal_.
debug()) <<
"AsyncCallData status_ not ok: "
464 <<
" code = " << status_.error_code()
465 <<
" message = " << status_.error_message();
466 return CallStatus::ERRORED;
468 if (!next_->is_unlimited())
470 JLOG(journal_.
warn())
471 <<
"AsyncCallData is_unlimited is false. Make sure "
472 "secure_gateway is set correctly at the ETL source";
481 if (cur_->marker().size() == 0)
485 unsigned char prefix = cur_->marker()[0];
486 if (nextPrefix_ != 0x00 && prefix >= nextPrefix_)
492 request_.set_marker(std::move(cur_->marker()));
496 for (
auto& obj : cur_->ledger_objects().objects())
498 auto key = uint256::fromVoid(obj.key().data());
499 auto& data = obj.data();
507 return more ? CallStatus::MORE : CallStatus::DONE;
513 grpc::CompletionQueue& cq)
515 context_ = std::make_unique<grpc::ClientContext>();
518 org::xrpl::rpc::v1::GetLedgerDataResponse>>
519 rpc(stub->PrepareAsyncGetLedgerData(context_.
get(), request_, &cq));
523 rpc->Finish(next_.
get(), &status_,
this);
529 if (next_->marker().size() == 0)
537 ETLSource::loadInitialLedger(
544 grpc::CompletionQueue cq;
553 for (
size_t i = 0; i < markers.size(); ++i)
556 if (i + 1 < markers.size())
557 nextMarker = markers[i + 1];
558 calls.
emplace_back(markers[i], nextMarker, sequence, journal_);
561 JLOG(journal_.debug()) <<
"Starting data download for ledger " << sequence
562 <<
". Using source = " << toString();
564 for (
auto& c : calls)
567 size_t numFinished = 0;
569 while (numFinished < calls.size() && !etl_.isStopping() &&
578 JLOG(journal_.error()) <<
"loadInitialLedger - ok is false";
584 JLOG(journal_.debug())
585 <<
"Marker prefix = " << ptr->getMarkerPrefix();
586 auto result = ptr->process(stub_, cq, writeQueue, abort);
587 if (result != AsyncCallData::CallStatus::MORE)
590 JLOG(journal_.debug())
591 <<
"Finished a marker. "
592 <<
"Current number of finished = " << numFinished;
594 if (result == AsyncCallData::CallStatus::ERRORED)
604 ETLSource::fetchLedger(uint32_t ledgerSequence,
bool getObjects)
606 org::xrpl::rpc::v1::GetLedgerResponse response;
608 return {{grpc::StatusCode::INTERNAL,
"No Stub"}, response};
611 org::xrpl::rpc::v1::GetLedgerRequest request;
612 grpc::ClientContext context;
613 request.mutable_ledger()->set_sequence(ledgerSequence);
614 request.set_transactions(
true);
615 request.set_expand(
true);
616 request.set_get_objects(getObjects);
617 request.set_user(
"ETL");
618 grpc::Status status = stub_->GetLedger(&context, request, &response);
619 if (status.ok() && !response.is_unlimited())
621 JLOG(journal_.warn()) <<
"ETLSource::fetchLedger - is_unlimited is "
622 "false. Make sure secure_gateway is set "
623 "correctly on the ETL source. source = "
627 return {status, std::move(response)};
632 , journal_(etl_.getApplication().journal(
"ReportingETL::LoadBalancer"))
643 std::make_unique<ETLSource>(host, websocketPort, grpcPort,
etl_);
645 JLOG(
journal_.
info()) << __func__ <<
" : added etl source - "
653 std::make_unique<ETLSource>(host, websocketPort,
etl_);
655 JLOG(
journal_.
info()) << __func__ <<
" : added etl source - "
665 [
this, &sequence, &writeQueue](
auto& source) {
666 bool res = source->loadInitialLedger(sequence, writeQueue);
669 JLOG(
journal_.
error()) <<
"Failed to download initial ledger. "
670 <<
" Sequence = " << sequence
671 <<
" source = " << source->toString();
681 org::xrpl::rpc::v1::GetLedgerResponse response;
683 [&response, ledgerSequence, getObjects,
this](
auto& source) {
684 auto [status, data] =
685 source->fetchLedger(ledgerSequence, getObjects);
686 response = std::move(data);
687 if (status.ok() && response.validated())
690 <<
"Successfully fetched ledger = " << ledgerSequence
691 <<
" from source = " << source->toString();
697 <<
"Error getting ledger = " << ledgerSequence
698 <<
" Reply : " << response.DebugString()
699 <<
" error_code : " << status.error_code()
700 <<
" error_msg : " << status.error_message()
701 <<
" source = " << source->toString();
717 srand((
unsigned)time(0));
718 auto sourceIdx = rand() %
sources_.size();
719 auto numAttempts = 0;
720 while (numAttempts <
sources_.size())
722 auto stub =
sources_[sourceIdx]->getP2pForwardingStub();
725 sourceIdx = (sourceIdx + 1) %
sources_.size();
740 srand((
unsigned)time(0));
741 auto sourceIdx = rand() %
sources_.size();
742 auto numAttempts = 0;
743 while (numAttempts <
sources_.size())
745 res =
sources_[sourceIdx]->forwardToP2p(context);
746 if (!res.
isMember(
"forwarded") || res[
"forwarded"] !=
true)
748 sourceIdx = (sourceIdx + 1) %
sources_.size();
766 return org::xrpl::rpc::v1::XRPLedgerAPIService::NewStub(
771 grpc::InsecureChannelCredentials()));
783 JLOG(
journal_.
debug()) <<
"Attempting to forward request to tx. "
790 <<
"Attempted to proxy but failed to connect to tx";
793 namespace beast = boost::beast;
794 namespace http = beast::http;
795 namespace websocket = beast::websocket;
797 using tcp = boost::asio::ip::tcp;
805 tcp::resolver resolver{ioc};
808 auto ws = std::make_unique<websocket::stream<tcp::socket>>(ioc);
811 auto const results = resolver.resolve(
ip_,
wsPort_);
815 net::connect(ws->next_layer(), results.begin(), results.end());
821 ws->set_option(websocket::stream_base::decorator(
822 [&context](websocket::request_type& req) {
824 http::field::user_agent,
826 " websocket-client-coro");
828 http::field::forwarded,
835 ws->handshake(
ip_,
"/");
841 ws->write(net::buffer(fastWriter.
write(request)));
843 beast::flat_buffer buffer;
848 static_cast<char const*
>(buffer.data().data()), response))
851 response[jss::error] =
"Error parsing response from tx";
855 response[
"forwarded"] =
true;
865 template <
class Func>
869 srand((
unsigned)time(0));
870 auto sourceIdx = rand() %
sources_.size();
871 auto numAttempts = 0;
879 <<
"Attempting to execute func. ledger sequence = "
880 << ledgerSequence <<
" - source = " << source->toString();
881 if (source->hasLedger(ledgerSequence))
883 bool res = f(source);
888 <<
"Successfully executed func at source = "
889 << source->toString()
890 <<
" - ledger sequence = " << ledgerSequence;
897 <<
"Failed to execute func at source = "
898 << source->toString()
899 <<
" - ledger sequence = " << ledgerSequence;
906 <<
"Ledger not present at source = " << source->toString()
907 <<
" - ledger sequence = " << ledgerSequence;
909 sourceIdx = (sourceIdx + 1) %
sources_.size();
911 if (numAttempts %
sources_.size() == 0)
921 <<
"Error executing function. "
922 <<
" Tried all sources, but ledger was found in db."
923 <<
" Sequence = " << ledgerSequence;
928 <<
"Error executing function "
929 <<
" - ledger sequence = " << ledgerSequence
930 <<
" - Tried all sources. Sleeping and trying again";