#include <ripple/app/rdb/backend/PostgresDatabase.h>
#include <ripple/app/reporting/ReportingETL.h>

#include <ripple/beast/core/CurrentThreadName.h>
#include <ripple/json/json_reader.h>
#include <ripple/json/json_writer.h>
#include <boost/asio/connect.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
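    // Stream a short human-readable summary of the ledger header into ss for
    // logging.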
    ss << "LedgerInfo { Sequence : " << info.seq
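    // Drain the write queue: insert each downloaded state object into the
    // ledger unless it is already present; a null entry (or stopping_) ends
    // the loop.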
    while (!stopping_ && (sle = writeQueue.pop()))
    {
        if (!ledger->exists(sle->key()))
            ledger->rawInsert(sle);
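// For every transaction in the GetLedgerResponse, deserialize the transaction
// blob and its metadata, insert both into the ledger's transaction map, and
// collect the per-account data needed for the relational store.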
    org::xrpl::rpc::v1::GetLedgerResponse& data)
{
    std::vector<AccountTransactionsData> accountTxData;
    for (auto& txn : data.transactions_list().transactions())
    {
        auto& raw = txn.transaction_blob();
        SerialIter it{raw.data(), raw.size()};
        STTx sttx{it};

        auto txSerializer = std::make_shared<Serializer>(sttx.getSerializer());
        TxMeta txMeta{
            sttx.getTransactionID(), ledger->info().seq, txn.metadata_blob()};
        auto metaSerializer =
            std::make_shared<Serializer>(txMeta.getAsObject().getSerializer());

        JLOG(journal_.trace())
            << "Inserting transaction = " << sttx.getTransactionID();
        uint256 nodestoreHash = ledger->rawTxInsertWithHash(
            sttx.getTransactionID(), txSerializer, metaSerializer);
        accountTxData.emplace_back(txMeta, std::move(nodestoreHash), journal_);
    }
    return accountTxData;
}
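// Initial ledger load: the database must be empty, i.e. there must be no
// validated ledger yet, before the full ledger is downloaded.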
    auto ledger = std::const_pointer_cast<Ledger>(
        app_.getLedgerMaster().getValidatedLedger());

            << "Database is not empty";

            << "Deserialized ledger header. "

    ledger->stateMap().clearSynching();
    ledger->txMap().clearSynching();

#ifdef RIPPLED_REPORTING
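    // A separate writer thread consumes the write queue while the ledger data
    // is downloaded; pushing a null entry marks the end of the queue so the
    // writer can finish.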
    std::thread asyncWriter{[this, &ledger, &writeQueue]() {

    writeQueue.push(null);

#ifdef RIPPLED_REPORTING

        ->writeLedgerAndTransactions(ledger->info(), accountTxData);
    JLOG(journal_.debug()) << "Time to download and store ledger = "
                           << ((end - start).count()) / 1000000000.0;
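// Flush the ledger: write the dirty state-map and tx-map nodes and the
// serialized ledger header to the node store, then sanity-check the resulting
// hashes.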
            << "Flushing ledger. "

    auto& accountHash = ledger->info().accountHash;
    auto& txHash = ledger->info().txHash;
    auto& ledgerHash = ledger->info().hash;

        addRaw(ledger->info(), s);
        << "Flushed " << numFlushed
        << " nodes to nodestore from stateMap";

        << "Flushed " << numTxFlushed
        << " nodes to nodestore from txMap";
        << (end - start).count() / 1000000000.0

            << "Flushed 0 nodes from state map";

    if (numTxFlushed == 0)

            << "Flushed 0 nodes from tx map";
    if (ledger->stateMap().getHash().as_uint256() != accountHash)
    {
            << "State map hash does not match. "
            << "Expected hash = " << strHex(accountHash)
            << " Actual hash = "
            << strHex(ledger->stateMap().getHash().as_uint256());
        Throw<std::runtime_error>("state map hash mismatch");
    if (ledger->txMap().getHash().as_uint256() != txHash)
    {
            << "Tx map hash does not match. "
            << "Expected hash = " << strHex(txHash)
            << " Actual hash = "
            << strHex(ledger->txMap().getHash().as_uint256());
        Throw<std::runtime_error>("tx map hash mismatch");
    if (ledger->info().hash != ledgerHash)
    {
            << "Ledger hash does not match. "
            << "Expected hash = " << strHex(ledgerHash)
            << " Actual hash = " << strHex(ledger->info().hash);
        Throw<std::runtime_error>("ledger hash mismatch");
        << "Successfully flushed ledger! "
        << "Attempting to publish ledger = "

    size_t numAttempts = 0;
                << "Trying to publish. Could not find ledger with sequence = "

            if (numAttempts >= maxAttempts)

                    << "Failed to publish ledger after "
                    << numAttempts << " attempts.";

                    << "Attempting to become ETL writer";
                    << "In strict read-only mode. "
                    << "Skipping publishing this ledger. "
                    << "Beginning fast forward.";
        << "Attempting to fetch ledger with sequence = "

        << "GetLedger reply = " << response->DebugString();

        << "Attempting to fetch ledger with sequence = "

        << "GetLedger reply = " << response->DebugString();
    org::xrpl::rpc::v1::GetLedgerResponse& rawData)

        << "Beginning ledger update";

        << "Deserialized ledger header. "

    next->setLedgerInfo(lgrInfo);

    next->stateMap().clearSynching();
    next->txMap().clearSynching();
        << "Inserted all transactions. Number of transactions = "
        << rawData.transactions_list().transactions_size();

    for (auto& obj : rawData.ledger_objects().objects())
    {
        auto& data = obj.data();
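        // An empty data blob means the object was deleted in this ledger;
        // otherwise the object is replaced if it already exists and inserted
        // if it does not.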
        if (data.size() == 0)
        {
                << "Erasing object = " << *key;
            if (next->exists(*key))
                next->rawErase(*key);
        }
        else if (next->exists(*key))
        {
                << "Replacing object = " << *key;
            next->rawReplace(sle);
        }
        else
        {
                << "Inserting object = " << *key;
            next->rawInsert(sle);
        }
        << "Inserted/modified/deleted all objects. Number of objects = "
        << rawData.ledger_objects().objects_size();
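    // If the response did not include the skip list, recompute it locally.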
    if (!rawData.skiplist_included())
    {
        next->updateSkipList();

            << "tx process is not sending skiplist. This indicates that the tx "
               "process is parsing metadata instead of doing a SHAMap diff. "
               "Make sure tx process is running the same code as reporting to "
               "use SHAMap diff instead of parsing metadata";
        << "Finished ledger update. "

    return {std::move(next), std::move(accountTxData)};
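// The ETL pipeline runs extraction, transformation, and loading concurrently,
// connected by bounded thread-safe queues (transformQueue and loadQueue).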
        << "Starting etl pipeline";

    Throw<std::runtime_error>("runETLPipeline: parent ledger is null");

    constexpr uint32_t maxQueueSize = 1000;

        transformQueue{maxQueueSize};

    uint32_t currentSequence = startSequence;
            auto time = ((end - start).count()) / 1000000000.0;
            auto tps =
                fetchResponse->transactions_list().transactions_size() / time;

                << ". Extract phase tps = " << tps;

            transformQueue.push(std::move(fetchResponse));

        transformQueue.push({});
        loadQueue{maxQueueSize};

    while (!writeConflict)

            transformQueue.pop()};

        auto [next, accountTxData] =

        auto duration = ((end - start).count()) / 1000000000.0;
        &lastPublishedSequence,

        size_t totalTransactions = 0;
        double totalTime = 0;
        while (!writeConflict)

                result{loadQueue.pop()};

            auto& ledger = result->first;
            auto& accountTxData = result->second;
#ifdef RIPPLED_REPORTING
                    ledger->info(), accountTxData))
                writeConflict = true;

            lastPublishedSequence = ledger->info().seq;

            auto kvTime = ((mid - start).count()) / 1000000000.0;
            auto relationalTime = ((end - mid).count()) / 1000000000.0;

            size_t numTxns = accountTxData.size();
            totalTransactions += numTxns;
                << "Load phase of etl : "
                << "Successfully published ledger! Ledger info: "
                << ". txn count = " << numTxns
                << ". key-value write time = " << kvTime
                << ". relational write time = " << relationalTime
                << ". key-value tps = " << numTxns / kvTime
                << ". relational tps = " << numTxns / relationalTime
                << ". total key-value tps = " << totalTransactions / totalTime;
        << "Stopping etl pipeline";

    return lastPublishedSequence;
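// Monitor loop: wait for the network to validate each successive ledger,
// publish it from the local database, and fall back to running ETL when it
// cannot be found.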
    auto ledger = std::const_pointer_cast<Ledger>(
        app_.getLedgerMaster().getValidatedLedger());

            << "Database is empty. Will download a ledger "

            << "ledger sequence specified in config. "
            << "Will begin ETL process starting with ledger "

            << "Waiting for next ledger to be validated by network...";
        if (mostRecentValidated)

                << "Ledger " << *mostRecentValidated
                << " has been validated. "

                << "The wait for the next validated "
                << "ledger has been aborted. "
                << "Exiting monitor loop";
            Throw<std::runtime_error>(
                "start sequence specified but db is already populated");

            << "Database already populated. Picking up from the tip of history";

            << "Failed to load initial ledger. Exiting monitor loop";
    uint32_t nextSequence = ledger->info().seq + 1;

        << "Database is populated. "
        << "Starting monitor loop. sequence = "

            << "Ledger with sequence = " << nextSequence
            << " has been validated by the network. "
            << "Attempting to find in database and publish";
        constexpr size_t timeoutSeconds = 10;

                << "Failed to publish ledger with sequence = " << nextSequence
                << ". Beginning ETL";

                << "Aborting ETL. Falling back to publishing";

            nextSequence = *lastPublished + 1;
    JLOG(journal_.debug()) << "Starting reporting in strict read only mode";

    uint32_t sequence = *mostRecent;
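// Constructor: wire up the journal, the publish strand, and the load balancer,
// then validate build options and the reporting configuration below.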
    , journal_(app.journal("ReportingETL"))
    , publishStrand_(app_.getIOService())
    , loadBalancer_(*this)
#ifndef RIPPLED_REPORTING
    Throw<std::runtime_error>(
        "Config file specifies reporting, but software was not built with "
        "-Dreporting=1. To use reporting, configure CMake with "

    Throw<std::runtime_error>(
        "Reporting requires tx tables. Set use_tx_tables=1 in config "
        "file, under [ledger_tx_tables] section");
    auto& vals = section.values();

        auto optIp = source.get("source_ip");

        auto optWsPort = source.get("source_ws_port");

        auto optGrpcPort = source.get("source_grpc_port");

    auto const optRO = section.get("read_only");

        readOnly_ = (*optRO == "true" || *optRO == "1");
    auto asciiToIntThrows =
        [](auto& dest, std::string const& src, char const* onError) {
            char const* const srcEnd = src.data() + src.size();

            while (ptr != srcEnd &&
                   std::isspace(static_cast<unsigned char>(*ptr)))

                Throw<std::runtime_error>(onError + src);

        "Expected integral START_LEDGER command line argument. Got: ");
    auto const optStartSeq = section.get("start_sequence");

        "Expected integral start_sequence config entry. Got: ");

    auto const optFlushInterval = section.get("flush_interval");
    if (optFlushInterval)

            "Expected integral flush_interval config entry. Got: ");

    auto const optNumMarkers = section.get("num_markers");

            "Expected integral num_markers config entry. Got: ");