20 #include <ripple/app/reporting/ETLSource.h>
21 #include <ripple/app/reporting/ReportingETL.h>
22 #include <ripple/json/json_reader.h>
23 #include <ripple/json/json_writer.h>
34 , ioc_(etl.getApplication().getIOService())
35 , ws_(
std::make_unique<
37 boost::asio::make_strand(ioc_)))
38 , resolver_(
boost::asio::make_strand(ioc_))
39 , networkValidatedLedgers_(etl_.getNetworkValidatedLedgers())
40 , journal_(etl_.getApplication().journal(
"ReportingETL::ETLSource"))
41 , app_(etl_.getApplication())
55 , ioc_(etl.getApplication().getIOService())
56 , ws_(
std::make_unique<
58 boost::asio::make_strand(ioc_)))
59 , resolver_(
boost::asio::make_strand(ioc_))
60 , networkValidatedLedgers_(etl_.getNetworkValidatedLedgers())
61 , journal_(etl_.getApplication().journal(
"ReportingETL::ETLSource"))
62 , app_(etl_.getApplication())
74 <<
"Using IP to connect to ETL source: " << connectionString;
80 <<
"Using DNS to connect to ETL source: " << connectionString;
84 stub_ = org::xrpl::rpc::v1::XRPLedgerAPIService::NewStub(
86 connectionString, grpc::InsecureChannelCredentials()));
103 if (ec != boost::asio::error::operation_aborted &&
104 ec != boost::asio::error::connection_refused)
107 <<
"error code = " << ec <<
" - " <<
toString();
112 <<
"error code = " << ec <<
" - " <<
toString();
118 <<
" - etl is stopping. aborting reconnect";
125 timer_.expires_after(boost::asio::chrono::seconds(waitTime));
126 timer_.async_wait([
this, fname = __func__](
auto ec) {
127 bool startAgain = (ec != boost::asio::error::operation_aborted);
128 JLOG(
journal_.
trace()) << fname <<
" async_wait : ec = " << ec;
137 ioc_.post([
this, startAgain]() {
148 boost::beast::websocket::close_code::normal,
149 [this, startAgain, fname = __func__](auto ec) {
152 JLOG(journal_.error())
153 << fname <<
" async_close : "
154 <<
"error code = " << ec <<
" - " << toString();
171 JLOG(journal_.trace()) << __func__ <<
" : " << toString();
173 auto const host = ip_;
174 auto const port = wsPort_;
176 resolver_.async_resolve(
177 host, port, [
this](
auto ec,
auto results) { onResolve(ec, results); });
181 ETLSource::onResolve(
182 boost::beast::error_code ec,
183 boost::asio::ip::tcp::resolver::results_type results)
185 JLOG(journal_.trace()) << __func__ <<
" : ec = " << ec <<
" - "
194 boost::beast::get_lowest_layer(*ws_).expires_after(
196 boost::beast::get_lowest_layer(*ws_).async_connect(
197 results, [
this](
auto ec,
auto ep) { onConnect(ec, ep); });
202 ETLSource::onConnect(
203 boost::beast::error_code ec,
204 boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint)
206 JLOG(journal_.trace()) << __func__ <<
" : ec = " << ec <<
" - "
218 boost::beast::get_lowest_layer(*ws_).expires_never();
222 boost::beast::websocket::stream_base::timeout::suggested(
223 boost::beast::role_type::client));
226 ws_->set_option(boost::beast::websocket::stream_base::decorator(
227 [](boost::beast::websocket::request_type& req) {
229 boost::beast::http::field::user_agent,
231 " websocket-client-async");
239 ws_->async_handshake(host,
"/", [
this](
auto ec) { onHandshake(ec); });
244 ETLSource::onHandshake(boost::beast::error_code ec)
246 JLOG(journal_.trace()) << __func__ <<
" : ec = " << ec <<
" - "
256 jv[
"command"] =
"subscribe";
260 jv[
"streams"].
append(ledgerStream);
262 jv[
"streams"].
append(txnStream);
264 jv[
"streams"].
append(validationStream);
266 jv[
"streams"].
append(manifestStream);
269 JLOG(journal_.trace()) <<
"Sending subscribe stream message";
272 boost::asio::buffer(fastWriter.
write(jv)),
273 [
this](
auto ec,
size_t size) { onWrite(ec, size); });
278 ETLSource::onWrite(boost::beast::error_code ec,
size_t bytesWritten)
280 JLOG(journal_.trace()) << __func__ <<
" : ec = " << ec <<
" - "
290 readBuffer_, [
this](
auto ec,
size_t size) { onRead(ec, size); });
295 ETLSource::onRead(boost::beast::error_code ec,
size_t size)
297 JLOG(journal_.trace()) << __func__ <<
" : ec = " << ec <<
" - "
307 boost::beast::flat_buffer buffer;
308 swap(readBuffer_, buffer);
310 JLOG(journal_.trace())
311 << __func__ <<
" : calling async_read - " << toString();
313 readBuffer_, [
this](
auto ec,
size_t size) { onRead(ec, size); });
318 ETLSource::handleMessage()
320 JLOG(journal_.trace()) << __func__ <<
" : " << toString();
329 static_cast<char const*
>(readBuffer_.data().data()), response))
331 JLOG(journal_.error())
333 <<
"Error parsing stream message."
334 <<
" Message = " << readBuffer_.data().data();
338 uint32_t ledgerIndex = 0;
341 if (response[
"result"].isMember(jss::ledger_index))
343 ledgerIndex = response[
"result"][jss::ledger_index].
asUInt();
345 if (response[jss::result].isMember(jss::validated_ledgers))
348 response[jss::result][jss::validated_ledgers].asString());
350 JLOG(journal_.debug())
352 <<
"Received a message on ledger "
353 <<
" subscription stream. Message : "
358 if (etl_.getETLLoadBalancer().shouldPropagateStream(
this))
360 if (response.
isMember(jss::transaction))
362 etl_.getApplication().getOPs().forwardProposedTransaction(
367 response[
"type"] ==
"validationReceived")
369 etl_.getApplication().getOPs().forwardValidation(response);
373 response[
"type"] ==
"manifestReceived")
375 etl_.getApplication().getOPs().forwardManifest(response);
379 if (response.
isMember(
"type") && response[
"type"] ==
"ledgerClosed")
381 JLOG(journal_.debug())
383 <<
"Received a message on ledger "
384 <<
" subscription stream. Message : "
386 if (response.
isMember(jss::ledger_index))
388 ledgerIndex = response[jss::ledger_index].
asUInt();
390 if (response.
isMember(jss::validated_ledgers))
393 response[jss::validated_ledgers].asString());
398 if (ledgerIndex != 0)
400 JLOG(journal_.trace())
402 <<
"Pushing ledger sequence = " << ledgerIndex <<
" - "
404 networkValidatedLedgers_.push(ledgerIndex);
410 JLOG(journal_.error()) <<
"Exception in handleMessage : " << e.
what();
437 request_.mutable_ledger()->set_sequence(seq);
440 request_.set_marker(marker.
data(), marker.
size());
442 request_.set_user(
"ETL");
445 nextPrefix_ = nextMarker->data()[0];
447 unsigned char prefix = marker.
data()[0];
449 JLOG(journal_.
debug())
450 <<
"Setting up AsyncCallData. marker = " <<
strHex(marker)
454 assert(nextPrefix_ > prefix || nextPrefix_ == 0x00);
456 cur_ = std::make_unique<org::xrpl::rpc::v1::GetLedgerDataResponse>();
458 next_ = std::make_unique<org::xrpl::rpc::v1::GetLedgerDataResponse>();
460 context_ = std::make_unique<grpc::ClientContext>();
467 grpc::CompletionQueue& cq,
471 JLOG(journal_.
debug()) <<
"Processing calldata";
474 JLOG(journal_.
error()) <<
"AsyncCallData aborted";
475 return CallStatus::ERRORED;
479 JLOG(journal_.
debug()) <<
"AsyncCallData status_ not ok: "
480 <<
" code = " << status_.error_code()
481 <<
" message = " << status_.error_message();
482 return CallStatus::ERRORED;
484 if (!next_->is_unlimited())
486 JLOG(journal_.
warn())
487 <<
"AsyncCallData is_unlimited is false. Make sure "
488 "secure_gateway is set correctly at the ETL source";
497 if (cur_->marker().size() == 0)
501 unsigned char prefix = cur_->marker()[0];
502 if (nextPrefix_ != 0x00 && prefix >= nextPrefix_)
508 request_.set_marker(std::move(cur_->marker()));
512 for (
auto& obj : cur_->ledger_objects().objects())
514 auto key = uint256::fromVoidChecked(obj.key());
518 auto& data = obj.data();
526 return more ? CallStatus::MORE : CallStatus::DONE;
532 grpc::CompletionQueue& cq)
534 context_ = std::make_unique<grpc::ClientContext>();
537 org::xrpl::rpc::v1::GetLedgerDataResponse>>
538 rpc(stub->PrepareAsyncGetLedgerData(context_.
get(), request_, &cq));
542 rpc->Finish(next_.
get(), &status_,
this);
548 if (next_->marker().size() == 0)
556 ETLSource::loadInitialLedger(
563 grpc::CompletionQueue cq;
572 for (
size_t i = 0; i < markers.size(); ++i)
575 if (i + 1 < markers.size())
576 nextMarker = markers[i + 1];
577 calls.
emplace_back(markers[i], nextMarker, sequence, journal_);
580 JLOG(journal_.debug()) <<
"Starting data download for ledger " << sequence
581 <<
". Using source = " << toString();
583 for (
auto& c : calls)
586 size_t numFinished = 0;
588 while (numFinished < calls.size() && !etl_.isStopping() &&
597 JLOG(journal_.error()) <<
"loadInitialLedger - ok is false";
603 JLOG(journal_.debug())
604 <<
"Marker prefix = " << ptr->getMarkerPrefix();
605 auto result = ptr->process(stub_, cq, writeQueue, abort);
606 if (result != AsyncCallData::CallStatus::MORE)
609 JLOG(journal_.debug())
610 <<
"Finished a marker. "
611 <<
"Current number of finished = " << numFinished;
613 if (result == AsyncCallData::CallStatus::ERRORED)
623 ETLSource::fetchLedger(uint32_t ledgerSequence,
bool getObjects)
625 org::xrpl::rpc::v1::GetLedgerResponse response;
627 return {{grpc::StatusCode::INTERNAL,
"No Stub"}, response};
630 org::xrpl::rpc::v1::GetLedgerRequest request;
631 grpc::ClientContext context;
632 request.mutable_ledger()->set_sequence(ledgerSequence);
633 request.set_transactions(
true);
634 request.set_expand(
true);
635 request.set_get_objects(getObjects);
636 request.set_user(
"ETL");
637 grpc::Status status = stub_->GetLedger(&context, request, &response);
638 if (status.ok() && !response.is_unlimited())
640 JLOG(journal_.warn()) <<
"ETLSource::fetchLedger - is_unlimited is "
641 "false. Make sure secure_gateway is set "
642 "correctly on the ETL source. source = "
646 return {status, std::move(response)};
651 , journal_(etl_.getApplication().journal(
"ReportingETL::LoadBalancer"))
662 std::make_unique<ETLSource>(host, websocketPort, grpcPort,
etl_);
664 JLOG(
journal_.
info()) << __func__ <<
" : added etl source - "
672 std::make_unique<ETLSource>(host, websocketPort,
etl_);
674 JLOG(
journal_.
info()) << __func__ <<
" : added etl source - "
684 [
this, &sequence, &writeQueue](
auto& source) {
685 bool res = source->loadInitialLedger(sequence, writeQueue);
688 JLOG(
journal_.
error()) <<
"Failed to download initial ledger. "
689 <<
" Sequence = " << sequence
690 <<
" source = " << source->toString();
700 org::xrpl::rpc::v1::GetLedgerResponse response;
702 [&response, ledgerSequence, getObjects,
this](
auto& source) {
703 auto [status, data] =
704 source->fetchLedger(ledgerSequence, getObjects);
705 response = std::move(data);
706 if (status.ok() && response.validated())
709 <<
"Successfully fetched ledger = " << ledgerSequence
710 <<
" from source = " << source->toString();
716 <<
"Error getting ledger = " << ledgerSequence
717 <<
" Reply : " << response.DebugString()
718 <<
" error_code : " << status.error_code()
719 <<
" error_msg : " << status.error_message()
720 <<
" source = " << source->toString();
736 srand((
unsigned)time(0));
737 auto sourceIdx = rand() %
sources_.size();
738 auto numAttempts = 0;
739 while (numAttempts <
sources_.size())
741 auto stub =
sources_[sourceIdx]->getP2pForwardingStub();
744 sourceIdx = (sourceIdx + 1) %
sources_.size();
759 srand((
unsigned)time(0));
760 auto sourceIdx = rand() %
sources_.size();
761 auto numAttempts = 0;
764 while (numAttempts <
sources_.size())
766 auto increment = [&]() {
767 sourceIdx = (sourceIdx + 1) %
sources_.size();
771 if (mostRecent && !src->hasLedger(*mostRecent))
776 res = src->forwardToP2p(context);
777 if (!res.
isMember(
"forwarded") || res[
"forwarded"] !=
true)
796 return org::xrpl::rpc::v1::XRPLedgerAPIService::NewStub(
801 grpc::InsecureChannelCredentials()));
813 JLOG(
journal_.
debug()) <<
"Attempting to forward request to tx. "
820 <<
"Attempted to proxy but failed to connect to tx";
823 namespace beast = boost::beast;
824 namespace http = beast::http;
825 namespace websocket = beast::websocket;
827 using tcp = boost::asio::ip::tcp;
835 tcp::resolver resolver{ioc};
838 auto ws = std::make_unique<websocket::stream<tcp::socket>>(ioc);
841 auto const results = resolver.resolve(
ip_,
wsPort_);
845 net::connect(ws->next_layer(), results.begin(), results.end());
851 ws->set_option(websocket::stream_base::decorator(
852 [&context](websocket::request_type& req) {
854 http::field::user_agent,
856 " websocket-client-coro");
858 http::field::forwarded,
865 ws->handshake(
ip_,
"/");
871 ws->write(net::buffer(fastWriter.
write(request)));
873 beast::flat_buffer buffer;
878 static_cast<char const*
>(buffer.data().data()), response))
881 response[jss::error] =
"Error parsing response from tx";
885 response[
"forwarded"] =
true;
895 template <
class Func>
899 srand((
unsigned)time(0));
900 auto sourceIdx = rand() %
sources_.size();
901 auto numAttempts = 0;
909 <<
"Attempting to execute func. ledger sequence = "
910 << ledgerSequence <<
" - source = " << source->toString();
911 if (source->hasLedger(ledgerSequence))
913 bool res = f(source);
918 <<
"Successfully executed func at source = "
919 << source->toString()
920 <<
" - ledger sequence = " << ledgerSequence;
927 <<
"Failed to execute func at source = "
928 << source->toString()
929 <<
" - ledger sequence = " << ledgerSequence;
936 <<
"Ledger not present at source = " << source->toString()
937 <<
" - ledger sequence = " << ledgerSequence;
939 sourceIdx = (sourceIdx + 1) %
sources_.size();
941 if (numAttempts %
sources_.size() == 0)
951 <<
"Error executing function. "
952 <<
" Tried all sources, but ledger was found in db."
953 <<
" Sequence = " << ledgerSequence;
958 <<
"Error executing function "
959 <<
" - ledger sequence = " << ledgerSequence
960 <<
" - Tried all sources. Sleeping and trying again";