20 #include <ripple/app/reporting/ETLSource.h>
21 #include <ripple/app/reporting/ReportingETL.h>
22 #include <ripple/beast/core/CurrentThreadName.h>
23 #include <ripple/json/json_reader.h>
24 #include <ripple/json/json_writer.h>
35 , ioc_(etl.getApplication().getIOService())
36 , ws_(
std::make_unique<
38 boost::asio::make_strand(ioc_)))
39 , resolver_(
boost::asio::make_strand(ioc_))
40 , networkValidatedLedgers_(etl_.getNetworkValidatedLedgers())
41 , journal_(etl_.getApplication().journal(
"ReportingETL::ETLSource"))
42 , app_(etl_.getApplication())
56 , ioc_(etl.getApplication().getIOService())
57 , ws_(
std::make_unique<
59 boost::asio::make_strand(ioc_)))
60 , resolver_(
boost::asio::make_strand(ioc_))
61 , networkValidatedLedgers_(etl_.getNetworkValidatedLedgers())
62 , journal_(etl_.getApplication().journal(
"ReportingETL::ETLSource"))
63 , app_(etl_.getApplication())
68 stub_ = org::xrpl::rpc::v1::XRPLedgerAPIService::NewStub(
73 grpc::InsecureChannelCredentials()));
90 if (ec != boost::asio::error::operation_aborted &&
91 ec != boost::asio::error::connection_refused)
94 <<
"error code = " << ec <<
" - " <<
toString();
99 <<
"error code = " << ec <<
" - " <<
toString();
105 <<
" - etl is stopping. aborting reconnect";
112 timer_.expires_after(boost::asio::chrono::seconds(waitTime));
113 timer_.async_wait([
this](
auto ec) {
114 bool startAgain = (ec != boost::asio::error::operation_aborted);
115 JLOG(
journal_.
trace()) << __func__ <<
" async_wait : ec = " << ec;
124 ioc_.post([
this, startAgain]() {
135 boost::beast::websocket::close_code::normal,
136 [this, startAgain](auto ec) {
139 JLOG(journal_.error())
140 << __func__ <<
" async_close : "
141 <<
"error code = " << ec <<
" - " << toString();
158 JLOG(journal_.trace()) << __func__ <<
" : " << toString();
160 auto const host = ip_;
161 auto const port = wsPort_;
163 resolver_.async_resolve(
164 host, port, [
this](
auto ec,
auto results) { onResolve(ec, results); });
168 ETLSource::onResolve(
169 boost::beast::error_code ec,
170 boost::asio::ip::tcp::resolver::results_type results)
172 JLOG(journal_.trace()) << __func__ <<
" : ec = " << ec <<
" - "
181 boost::beast::get_lowest_layer(*ws_).expires_after(
183 boost::beast::get_lowest_layer(*ws_).async_connect(
184 results, [
this](
auto ec,
auto ep) { onConnect(ec, ep); });
189 ETLSource::onConnect(
190 boost::beast::error_code ec,
191 boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint)
193 JLOG(journal_.trace()) << __func__ <<
" : ec = " << ec <<
" - "
205 boost::beast::get_lowest_layer(*ws_).expires_never();
209 boost::beast::websocket::stream_base::timeout::suggested(
210 boost::beast::role_type::client));
213 ws_->set_option(boost::beast::websocket::stream_base::decorator(
214 [](boost::beast::websocket::request_type& req) {
216 boost::beast::http::field::user_agent,
218 " websocket-client-async");
226 ws_->async_handshake(host,
"/", [
this](
auto ec) { onHandshake(ec); });
231 ETLSource::onHandshake(boost::beast::error_code ec)
233 JLOG(journal_.trace()) << __func__ <<
" : ec = " << ec <<
" - "
243 jv[
"command"] =
"subscribe";
247 jv[
"streams"].
append(ledgerStream);
249 jv[
"streams"].
append(txnStream);
252 JLOG(journal_.trace()) <<
"Sending subscribe stream message";
255 boost::asio::buffer(fastWriter.
write(jv)),
256 [
this](
auto ec,
size_t size) { onWrite(ec, size); });
261 ETLSource::onWrite(boost::beast::error_code ec,
size_t bytesWritten)
263 JLOG(journal_.trace()) << __func__ <<
" : ec = " << ec <<
" - "
273 readBuffer_, [
this](
auto ec,
size_t size) { onRead(ec, size); });
278 ETLSource::onRead(boost::beast::error_code ec,
size_t size)
280 JLOG(journal_.trace()) << __func__ <<
" : ec = " << ec <<
" - "
290 boost::beast::flat_buffer buffer;
291 swap(readBuffer_, buffer);
293 JLOG(journal_.trace())
294 << __func__ <<
" : calling async_read - " << toString();
296 readBuffer_, [
this](
auto ec,
size_t size) { onRead(ec, size); });
301 ETLSource::handleMessage()
303 JLOG(journal_.trace()) << __func__ <<
" : " << toString();
312 static_cast<char const*
>(readBuffer_.data().data()), response))
314 JLOG(journal_.error())
316 <<
"Error parsing stream message."
317 <<
" Message = " << readBuffer_.data().data();
321 uint32_t ledgerIndex = 0;
324 if (response[
"result"].isMember(jss::ledger_index))
326 ledgerIndex = response[
"result"][jss::ledger_index].
asUInt();
328 if (response[jss::result].isMember(jss::validated_ledgers))
331 response[jss::result][jss::validated_ledgers].asString());
333 JLOG(journal_.debug())
335 <<
"Received a message on ledger "
336 <<
" subscription stream. Message : "
341 if (response.
isMember(jss::transaction))
343 if (etl_.getETLLoadBalancer().shouldPropagateTxnStream(
this))
345 etl_.getApplication().getOPs().forwardProposedTransaction(
351 JLOG(journal_.debug())
353 <<
"Received a message on ledger "
354 <<
" subscription stream. Message : "
356 if (response.
isMember(jss::ledger_index))
358 ledgerIndex = response[jss::ledger_index].
asUInt();
360 if (response.
isMember(jss::validated_ledgers))
363 response[jss::validated_ledgers].asString());
368 if (ledgerIndex != 0)
370 JLOG(journal_.trace())
372 <<
"Pushing ledger sequence = " << ledgerIndex <<
" - "
374 networkValidatedLedgers_.push(ledgerIndex);
380 JLOG(journal_.error()) <<
"Exception in handleMessage : " << e.
what();
407 request_.mutable_ledger()->set_sequence(seq);
410 request_.set_marker(marker.
data(), marker.
size());
412 request_.set_user(
"ETL");
415 nextPrefix_ = nextMarker->data()[0];
417 unsigned char prefix = marker.
data()[0];
419 JLOG(journal_.
debug())
420 <<
"Setting up AsyncCallData. marker = " <<
strHex(marker)
424 assert(nextPrefix_ > prefix || nextPrefix_ == 0x00);
426 cur_ = std::make_unique<org::xrpl::rpc::v1::GetLedgerDataResponse>();
428 next_ = std::make_unique<org::xrpl::rpc::v1::GetLedgerDataResponse>();
430 context_ = std::make_unique<grpc::ClientContext>();
437 grpc::CompletionQueue& cq,
441 JLOG(journal_.
debug()) <<
"Processing calldata";
444 JLOG(journal_.
error()) <<
"AsyncCallData aborted";
445 return CallStatus::ERRORED;
449 JLOG(journal_.
debug()) <<
"AsyncCallData status_ not ok: "
450 <<
" code = " << status_.error_code()
451 <<
" message = " << status_.error_message();
452 return CallStatus::ERRORED;
454 if (!next_->is_unlimited())
456 JLOG(journal_.
warn())
457 <<
"AsyncCallData is_unlimited is false. Make sure "
458 "secure_gateway is set correctly at the ETL source";
467 if (cur_->marker().size() == 0)
471 unsigned char prefix = cur_->marker()[0];
472 if (nextPrefix_ != 0x00 && prefix >= nextPrefix_)
478 request_.set_marker(std::move(cur_->marker()));
482 for (
auto& obj : cur_->ledger_objects().objects())
484 auto key = uint256::fromVoid(obj.key().data());
485 auto& data = obj.data();
493 return more ? CallStatus::MORE : CallStatus::DONE;
499 grpc::CompletionQueue& cq)
501 context_ = std::make_unique<grpc::ClientContext>();
504 org::xrpl::rpc::v1::GetLedgerDataResponse>>
505 rpc(stub->PrepareAsyncGetLedgerData(context_.
get(), request_, &cq));
509 rpc->Finish(next_.
get(), &status_,
this);
515 if (next_->marker().size() == 0)
523 ETLSource::loadInitialLedger(
530 grpc::CompletionQueue cq;
539 for (
size_t i = 0; i < markers.size(); ++i)
542 if (i + 1 < markers.size())
543 nextMarker = markers[i + 1];
544 calls.
emplace_back(markers[i], nextMarker, sequence, journal_);
547 JLOG(journal_.debug()) <<
"Starting data download for ledger " << sequence
548 <<
". Using source = " << toString();
550 for (
auto& c : calls)
553 size_t numFinished = 0;
555 while (numFinished < calls.size() && !etl_.isStopping() &&
564 JLOG(journal_.error()) <<
"loadInitialLedger - ok is false";
570 JLOG(journal_.debug())
571 <<
"Marker prefix = " << ptr->getMarkerPrefix();
572 auto result = ptr->process(stub_, cq, writeQueue, abort);
573 if (result != AsyncCallData::CallStatus::MORE)
576 JLOG(journal_.debug())
577 <<
"Finished a marker. "
578 <<
"Current number of finished = " << numFinished;
580 if (result == AsyncCallData::CallStatus::ERRORED)
590 ETLSource::fetchLedger(uint32_t ledgerSequence,
bool getObjects)
592 org::xrpl::rpc::v1::GetLedgerResponse response;
594 return {{grpc::StatusCode::INTERNAL,
"No Stub"}, response};
597 org::xrpl::rpc::v1::GetLedgerRequest request;
598 grpc::ClientContext context;
599 request.mutable_ledger()->set_sequence(ledgerSequence);
600 request.set_transactions(
true);
601 request.set_expand(
true);
602 request.set_get_objects(getObjects);
603 request.set_user(
"ETL");
604 grpc::Status status = stub_->GetLedger(&context, request, &response);
605 if (status.ok() && !response.is_unlimited())
607 JLOG(journal_.warn()) <<
"ETLSource::fetchLedger - is_unlimited is "
608 "false. Make sure secure_gateway is set "
609 "correctly on the ETL source. source = "
613 return {status, std::move(response)};
618 , journal_(etl_.getApplication().journal(
"ReportingETL::LoadBalancer"))
629 std::make_unique<ETLSource>(host, websocketPort, grpcPort,
etl_);
631 JLOG(
journal_.
info()) << __func__ <<
" : added etl source - "
639 std::make_unique<ETLSource>(host, websocketPort,
etl_);
641 JLOG(
journal_.
info()) << __func__ <<
" : added etl source - "
651 [
this, &sequence, &writeQueue](
auto& source) {
652 bool res = source->loadInitialLedger(sequence, writeQueue);
655 JLOG(
journal_.
error()) <<
"Failed to download initial ledger. "
656 <<
" Sequence = " << sequence
657 <<
" source = " << source->toString();
667 org::xrpl::rpc::v1::GetLedgerResponse response;
669 [&response, ledgerSequence, getObjects,
this](
auto& source) {
670 auto [status, data] =
671 source->fetchLedger(ledgerSequence, getObjects);
672 response = std::move(data);
673 if (status.ok() && response.validated())
676 <<
"Successfully fetched ledger = " << ledgerSequence
677 <<
" from source = " << source->toString();
683 <<
"Error getting ledger = " << ledgerSequence
684 <<
" Reply : " << response.DebugString()
685 <<
" error_code : " << status.error_code()
686 <<
" error_msg : " << status.error_message()
687 <<
" source = " << source->toString();
703 srand((
unsigned)time(0));
704 auto sourceIdx = rand() %
sources_.size();
705 auto numAttempts = 0;
706 while (numAttempts <
sources_.size())
708 auto stub =
sources_[sourceIdx]->getP2pForwardingStub();
711 sourceIdx = (sourceIdx + 1) %
sources_.size();
726 srand((
unsigned)time(0));
727 auto sourceIdx = rand() %
sources_.size();
728 auto numAttempts = 0;
729 while (numAttempts <
sources_.size())
731 res =
sources_[sourceIdx]->forwardToP2p(context);
732 if (!res.
isMember(
"forwarded") || res[
"forwarded"] !=
true)
734 sourceIdx = (sourceIdx + 1) %
sources_.size();
752 return org::xrpl::rpc::v1::XRPLedgerAPIService::NewStub(
757 grpc::InsecureChannelCredentials()));
769 JLOG(
journal_.
debug()) <<
"Attempting to forward request to tx. "
776 <<
"Attempted to proxy but failed to connect to tx";
779 namespace beast = boost::beast;
780 namespace http = beast::http;
781 namespace websocket = beast::websocket;
783 using tcp = boost::asio::ip::tcp;
791 tcp::resolver resolver{ioc};
794 auto ws = std::make_unique<websocket::stream<tcp::socket>>(ioc);
797 auto const results = resolver.resolve(
ip_,
wsPort_);
801 net::connect(ws->next_layer(), results.begin(), results.end());
807 ws->set_option(websocket::stream_base::decorator(
808 [&context](websocket::request_type& req) {
810 http::field::user_agent,
812 " websocket-client-coro");
814 http::field::forwarded,
821 ws->handshake(
ip_,
"/");
827 ws->write(net::buffer(fastWriter.
write(request)));
829 beast::flat_buffer buffer;
834 static_cast<char const*
>(buffer.data().data()), response))
837 response[jss::error] =
"Error parsing response from tx";
841 response[
"forwarded"] =
true;
851 template <
class Func>
855 srand((
unsigned)time(0));
856 auto sourceIdx = rand() %
sources_.size();
857 auto numAttempts = 0;
865 <<
"Attempting to execute func. ledger sequence = "
866 << ledgerSequence <<
" - source = " << source->toString();
867 if (source->hasLedger(ledgerSequence))
869 bool res = f(source);
874 <<
"Successfully executed func at source = "
875 << source->toString()
876 <<
" - ledger sequence = " << ledgerSequence;
883 <<
"Failed to execute func at source = "
884 << source->toString()
885 <<
" - ledger sequence = " << ledgerSequence;
892 <<
"Ledger not present at source = " << source->toString()
893 <<
" - ledger sequence = " << ledgerSequence;
895 sourceIdx = (sourceIdx + 1) %
sources_.size();
897 if (numAttempts %
sources_.size() == 0)
907 <<
"Error executing function. "
908 <<
" Tried all sources, but ledger was found in db."
909 <<
" Sequence = " << ledgerSequence;
914 <<
"Error executing function "
915 <<
" - ledger sequence = " << ledgerSequence
916 <<
" - Tried all sources. Sleeping and trying again";