#include <ripple/app/rdb/backend/PostgresDatabase.h>
#include <ripple/app/reporting/ReportingETL.h>
#include <ripple/basics/ThreadUtilities.h>
#include <ripple/json/json_reader.h>
#include <ripple/json/json_writer.h>
#include <boost/asio/connect.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
    ss << "LedgerInfo { Sequence : " << info.seq
    while (!stopping_ && (sle = writeQueue.pop()))
    {
        if (!ledger->exists(sle->key()))
            ledger->rawInsert(sle);
    }
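    // Note: writeQueue decouples the download side from this writer thread.
    // State objects are inserted into the ledger as they arrive; a null SLE
    // pushed onto the queue (see writeQueue.push(null) further down) signals
    // that the download is finished and the loop should exit.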
std::vector<AccountTransactionsData>
ReportingETL::insertTransactions(
    std::shared_ptr<Ledger>& ledger,
    org::xrpl::rpc::v1::GetLedgerResponse& data)
{
    std::vector<AccountTransactionsData> accountTxData;
    for (auto& txn : data.transactions_list().transactions())
    {
        auto& raw = txn.transaction_blob();
        SerialIter it{raw.data(), raw.size()};
        STTx sttx{it};
        auto txSerializer = std::make_shared<Serializer>(sttx.getSerializer());
        TxMeta txMeta{
            sttx.getTransactionID(), ledger->info().seq, txn.metadata_blob()};
        auto metaSerializer =
            std::make_shared<Serializer>(txMeta.getAsObject().getSerializer());
        JLOG(journal_.trace())
            << "Inserting transaction = " << sttx.getTransactionID();
        uint256 nodestoreHash = ledger->rawTxInsertWithHash(
            sttx.getTransactionID(), txSerializer, metaSerializer);
        accountTxData.emplace_back(txMeta, std::move(nodestoreHash), journal_);
    }
    return accountTxData;
}
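// insertTransactions does two jobs per transaction: it writes the serialized
// transaction and its metadata into the ledger's transaction map via
// rawTxInsertWithHash, and it accumulates the AccountTransactionsData entries
// that are later written to the relational database.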
    auto ledger = std::const_pointer_cast<Ledger>(
        app_.getLedgerMaster().getValidatedLedger());

        << "Database is not empty";

        << "Deserialized ledger header. "

    ledger->stateMap().clearSynching();
    ledger->txMap().clearSynching();
#ifdef RIPPLED_REPORTING

    // Writer thread that drains writeQueue while the ledger data downloads.
    std::thread asyncWriter{[this, &ledger, &writeQueue]() {

    // A null SLE marks the end of the queue and stops the async writer.
    writeQueue.push(null);

#ifdef RIPPLED_REPORTING
        ->writeLedgerAndTransactions(ledger->info(), accountTxData);
    JLOG(journal_.debug()) << "Time to download and store ledger = "
                           << ((end - start).count()) / 1000000000.0;
        << "Flushing ledger. "

    // Capture the header hashes so they can be checked against the flushed
    // SHAMaps below.
    auto& accountHash = ledger->info().accountHash;
    auto& txHash = ledger->info().txHash;
    auto& ledgerHash = ledger->info().hash;

    ledger->setImmutable(false);

    addRaw(ledger->info(), s);
        << "Flushed " << numFlushed << " nodes to nodestore from stateMap";

        << "Flushed " << numTxFlushed << " nodes to nodestore from txMap";

        << (end - start).count() / 1000000000.0

        << "Flushed 0 nodes from state map";

    if (numTxFlushed == 0)

        << "Flushed 0 nodes from tx map";
    if (ledger->stateMap().getHash().as_uint256() != accountHash)
    {
            << "State map hash does not match. "
            << "Expected hash = " << strHex(accountHash)
            << ". Actual hash = "
            << strHex(ledger->stateMap().getHash().as_uint256());
        Throw<std::runtime_error>("state map hash mismatch");
    }

    if (ledger->txMap().getHash().as_uint256() != txHash)
    {
            << "Tx map hash does not match. "
            << "Expected hash = " << strHex(txHash)
            << ". Actual hash = "
            << strHex(ledger->txMap().getHash().as_uint256());
        Throw<std::runtime_error>("tx map hash mismatch");
    }

    if (ledger->info().hash != ledgerHash)
    {
            << "Ledger hash does not match. "
            << "Expected hash = " << strHex(ledgerHash)
            << ". Actual hash = " << strHex(ledger->info().hash);
        Throw<std::runtime_error>("ledger hash mismatch");
    }
        << "Successfully flushed ledger! "

        << "Attempting to publish ledger = "

    size_t numAttempts = 0;
            << "Trying to publish. Could not find ledger with sequence = "

        if (numAttempts >= maxAttempts)

                << "Failed to publish ledger after " << numAttempts
                << " attempts.";
                << "Attempting to become ETL writer";

                << "In strict read-only mode. "
                << "Skipping publishing this ledger. "
                << "Beginning fast forward.";
        << "Attempting to fetch ledger with sequence = "

        << "GetLedger reply = " << response->DebugString();

        << "Attempting to fetch ledger with sequence = "

        << "GetLedger reply = " << response->DebugString();
    org::xrpl::rpc::v1::GetLedgerResponse& rawData)

        << "Beginning ledger update";

        << "Deserialized ledger header. "

    next->setLedgerInfo(lgrInfo);

    next->stateMap().clearSynching();
    next->txMap().clearSynching();

        << "Inserted all transactions. Number of transactions = "
        << rawData.transactions_list().transactions_size();
    // Apply the ledger object diff: an empty data blob means the object was
    // deleted; otherwise the object is replaced if it already exists in the
    // ledger, and inserted if it does not.
    for (auto& obj : rawData.ledger_objects().objects())
    {
        auto& data = obj.data();

        if (data.size() == 0)
        {
                << "Erasing object = " << *key;
            if (next->exists(*key))
                next->rawErase(*key);
        }
        else if (next->exists(*key))
        {
                << "Replacing object = " << *key;
            next->rawReplace(sle);
        }
        else
        {
                << "Inserting object = " << *key;
            next->rawInsert(sle);
        }
    }

        << "Inserted/modified/deleted all objects. Number of objects = "
        << rawData.ledger_objects().objects_size();
    if (!rawData.skiplist_included())
    {
        next->updateSkipList();
            << "tx process is not sending skiplist. This indicates that the "
               "tx process is parsing metadata instead of doing a SHAMap "
               "diff. Make sure the tx process is running the same code as "
               "reporting to use SHAMap diff instead of parsing metadata";
    }
        << "Finished ledger update. "

    return {std::move(next), std::move(accountTxData)};
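    // The pair returned here feeds the load stage of the pipeline: the ledger
    // goes to the key-value store, and the accountTxData goes to the
    // relational database when built with RIPPLED_REPORTING.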
        << "Starting etl pipeline";

        Throw<std::runtime_error>("runETLPipeline: parent ledger is null");

    constexpr uint32_t maxQueueSize = 1000;

        transformQueue{maxQueueSize};

    uint32_t currentSequence = startSequence;

            auto time = ((end - start).count()) / 1000000000.0;
            auto tps =
                fetchResponse->transactions_list().transactions_size() / time;

                << ". Extract phase tps = " << tps;

            transformQueue.push(std::move(fetchResponse));

        transformQueue.push({});
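        // The empty (default-constructed) response pushed above marks the end
        // of the extract stage, so the transform thread can drain
        // transformQueue and exit cleanly.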
        loadQueue{maxQueueSize};

        while (!writeConflict)

                transformQueue.pop()};

            auto [next, accountTxData] =

            auto duration = ((end - start).count()) / 1000000000.0;
        &lastPublishedSequence,

        size_t totalTransactions = 0;
        double totalTime = 0;
        while (!writeConflict)

                result{loadQueue.pop()};

            auto& ledger = result->first;
            auto& accountTxData = result->second;

#ifdef RIPPLED_REPORTING
                        ledger->info(), accountTxData))
                    writeConflict = true;

            lastPublishedSequence = ledger->info().seq;

            auto kvTime = ((mid - start).count()) / 1000000000.0;
            auto relationalTime = ((end - mid).count()) / 1000000000.0;

            size_t numTxns = accountTxData.size();
            totalTransactions += numTxns;

                << "Load phase of etl : "
                << "Successfully published ledger! Ledger info: "
                << ". txn count = " << numTxns
                << ". key-value write time = " << kvTime
                << ". relational write time = " << relationalTime
                << ". key-value tps = " << numTxns / kvTime
                << ". relational tps = " << numTxns / relationalTime
                << ". total key-value tps = " << totalTransactions / totalTime;
        << "Stopping etl pipeline";

    return lastPublishedSequence;
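    // Returning lastPublishedSequence lets the monitor loop resume publishing
    // from the ledger right after the last one this pipeline run wrote (see
    // nextSequence = *lastPublished + 1 below).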
    auto ledger = std::const_pointer_cast<Ledger>(
        app_.getLedgerMaster().getValidatedLedger());

            << "Database is empty. Will download a ledger "

            << "ledger sequence specified in config. "
            << "Will begin ETL process starting with ledger "

            << "Waiting for next ledger to be validated by network...";
        if (mostRecentValidated)

                << "Ledger " << *mostRecentValidated
                << " has been validated. "

                << "The wait for the next validated "
                << "ledger has been aborted. "
                << "Exiting monitor loop";
            Throw<std::runtime_error>(
                "start sequence specified but db is already populated");

            << "Database already populated. Picking up from the tip of history";

            << "Failed to load initial ledger. Exiting monitor loop";
    uint32_t nextSequence = ledger->info().seq + 1;

        << "Database is populated. "
        << "Starting monitor loop. sequence = "

            << "Ledger with sequence = " << nextSequence
            << " has been validated by the network. "
            << "Attempting to find in database and publish";
        constexpr size_t timeoutSeconds = 10;

                << "Failed to publish ledger with sequence = " << nextSequence
                << ". Beginning ETL";

                << "Aborting ETL. Falling back to publishing";

            nextSequence = *lastPublished + 1;
        JLOG(journal_.debug()) << "Starting reporting in strict read only mode";

        uint32_t sequence = *mostRecent;
    , journal_(app.journal("ReportingETL"))
    , publishStrand_(app_.getIOService())
    , loadBalancer_(*this)
#ifndef RIPPLED_REPORTING
        Throw<std::runtime_error>(
            "Config file specifies reporting, but software was not built with "
            "-Dreporting=1. To use reporting, configure CMake with "

            Throw<std::runtime_error>(
                "Reporting requires tx tables. Set use_tx_tables=1 in config "
                "file, under [ledger_tx_tables] section");
    auto& vals = section.values();

        auto optIp = source.get("source_ip");

        auto optWsPort = source.get("source_ws_port");

        auto optGrpcPort = source.get("source_grpc_port");

    auto const optRO = section.get("read_only");

        readOnly_ = (*optRO == "true" || *optRO == "1");
    auto asciiToIntThrows =
        [](auto& dest, std::string const& src, char const* onError) {
            char const* const srcEnd = src.data() + src.size();

            while (ptr != srcEnd &&
                   std::isspace(static_cast<unsigned char>(*ptr)))

                Throw<std::runtime_error>(onError + src);

            "Expected integral START_LEDGER command line argument. Got: ");
    auto const optStartSeq = section.get("start_sequence");

            "Expected integral start_sequence config entry. Got: ");

    auto const optFlushInterval = section.get("flush_interval");
    if (optFlushInterval)

            "Expected integral flush_interval config entry. Got: ");

    auto const optNumMarkers = section.get("num_markers");

            "Expected integral num_markers config entry. Got: ");