rippled
ReportingETL.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2020 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/app/rdb/backend/RelationalDBInterfacePostgres.h>
21 #include <ripple/app/reporting/ReportingETL.h>
22 
23 #include <ripple/beast/core/CurrentThreadName.h>
24 #include <ripple/json/json_reader.h>
25 #include <ripple/json/json_writer.h>
26 #include <boost/asio/connect.hpp>
27 #include <boost/asio/ip/tcp.hpp>
28 #include <boost/beast/core.hpp>
29 #include <boost/beast/websocket.hpp>
30 #include <cctype>
31 #include <charconv>
32 #include <cstdlib>
33 #include <iostream>
34 #include <string>
35 #include <variant>
36 
37 namespace ripple {
38 
39 namespace detail {
// Formats the sequence, ledger hash, transaction hash, account hash and
// parent hash of `info` into one line of text for log messages. Each hash is
// passed through strHex before being written.
// NOTE(review): the return-type line and the declaration of `ss` are not
// visible in this listing; `ss` is presumably a std::stringstream -- confirm.
42 toString(LedgerInfo const& info)
43 {
45  ss << "LedgerInfo { Sequence : " << info.seq
46  << " Hash : " << strHex(info.hash) << " TxHash : " << strHex(info.txHash)
47  << " AccountHash : " << strHex(info.accountHash)
48  << " ParentHash : " << strHex(info.parentHash) << " }";
49  return ss.str();
50 }
51 } // namespace detail
52 
// Pops ledger objects from `writeQueue` and inserts each one into `ledger`
// unless an object with the same key already exists there. The loop ends when
// `stopping_` is set or when a popped value tests false.
// NOTE(review): the signature line and the declaration of `sle` are missing
// from this listing; presumably this is ReportingETL::consumeLedgerData, which
// loadInitialLedger runs on a separate thread -- confirm.
53 void
57 {
59  size_t num = 0;
60  while (!stopping_ && (sle = writeQueue.pop()))
61  {
62  assert(sle);
63  if (!ledger->exists(sle->key()))
64  ledger->rawInsert(sle);
65 
 // When flushInterval_ is non-zero, flush the dirty state-map nodes once
 // every flushInterval_ objects. num starts at 0, so the first object
 // popped also triggers a flush.
66  if (flushInterval_ != 0 && (num % flushInterval_) == 0)
67  {
68  JLOG(journal_.debug()) << "Flushing! key = " << strHex(sle->key());
69  ledger->stateMap().flushDirty(hotACCOUNT_NODE);
70  }
71  ++num;
72  }
73 }
74 
// Deserializes every transaction contained in `data`, inserts the transaction
// and its metadata into `ledger`, and appends one entry per transaction to
// `accountTxData`, which is returned to the caller.
// NOTE(review): the return type, the function name, the first parameter and
// the declaration of `accountTxData` are missing from this listing;
// presumably this is ReportingETL::insertTransactions -- confirm.
78  org::xrpl::rpc::v1::GetLedgerResponse& data)
79 {
81  for (auto& txn : data.transactions_list().transactions())
82  {
83  auto& raw = txn.transaction_blob();
84 
 // Rebuild the transaction object from its serialized blob.
85  SerialIter it{raw.data(), raw.size()};
86  STTx sttx{it};
87 
88  auto txSerializer = std::make_shared<Serializer>(sttx.getSerializer());
89 
 // The metadata is built from the transaction ID, the sequence of the
 // ledger being populated, and the metadata blob sent with the
 // transaction.
90  TxMeta txMeta{
91  sttx.getTransactionID(), ledger->info().seq, txn.metadata_blob()};
92 
93  auto metaSerializer =
94  std::make_shared<Serializer>(txMeta.getAsObject().getSerializer());
95 
96  JLOG(journal_.trace())
97  << __func__ << " : "
98  << "Inserting transaction = " << sttx.getTransactionID();
 // The hash returned by the insert is kept alongside the metadata in the
 // accountTxData entry.
99  uint256 nodestoreHash = ledger->rawTxInsertWithHash(
100  sttx.getTransactionID(), txSerializer, metaSerializer);
101  accountTxData.emplace_back(txMeta, std::move(nodestoreHash), journal_);
102  }
103  return accountTxData;
104 }
105 
// Builds the ledger with sequence `startingSequence` from data fetched from
// the network. Returns an empty result if a ledger is already found in the
// database or if fetchLedgerData yields nothing. Otherwise returns the ledger
// object built here; it is flushed (and, in reporting mode, written through
// the relational database interface) only when `stopping_` is not set.
// NOTE(review): several lines of this function are missing from this listing
// (the return type, the database lookup expression, and the declarations of
// `ledgerData`, `accountTxData`, `writeQueue`, `start` and `null`).
107 ReportingETL::loadInitialLedger(uint32_t startingSequence)
108 {
109  // check that database is actually empty
110  auto ledger = std::const_pointer_cast<Ledger>(
112  if (ledger)
113  {
114  JLOG(journal_.fatal()) << __func__ << " : "
115  << "Database is not empty";
116  assert(false);
117  return {};
118  }
119 
120  // fetch the ledger from the network. This function will not return until
121  // either the fetch is successful, or the server is being shutdown. This
122  // only fetches the ledger header and the transactions+metadata
124  fetchLedgerData(startingSequence)};
125  if (!ledgerData)
126  return {};
127 
128  LedgerInfo lgrInfo =
129  deserializeHeader(makeSlice(ledgerData->ledger_header()), true);
130 
131  JLOG(journal_.debug()) << __func__ << " : "
132  << "Deserialized ledger header. "
133  << detail::toString(lgrInfo);
134 
 // Create the ledger object from the deserialized header and clear the
 // synching state of both of its maps before any data is inserted.
135  ledger =
136  std::make_shared<Ledger>(lgrInfo, app_.config(), app_.getNodeFamily());
137  ledger->stateMap().clearSynching();
138  ledger->txMap().clearSynching();
139 
140 #ifdef RIPPLED_REPORTING
142  insertTransactions(ledger, *ledgerData);
143 #endif
144 
146 
 // The writer thread drains writeQueue into `ledger` while the download
 // below fills the queue.
148  std::thread asyncWriter{[this, &ledger, &writeQueue]() {
149  consumeLedgerData(ledger, writeQueue);
150  }};
151 
152  // download the full account state map. This function downloads full ledger
153  // data and pushes the downloaded data into the writeQueue. asyncWriter
154  // consumes from the queue and inserts the data into the Ledger object.
155  // Once the below call returns, all data has been pushed into the queue
156  loadBalancer_.loadInitialLedger(startingSequence, writeQueue);
157 
158  // null is used to represent the end of the queue
160  writeQueue.push(null);
161  // wait for the writer to finish
162  asyncWriter.join();
163 
164  if (!stopping_)
165  {
166  flushLedger(ledger);
167  if (app_.config().reporting())
168  {
169 #ifdef RIPPLED_REPORTING
170  dynamic_cast<RelationalDBInterfacePostgres*>(
172  ->writeLedgerAndTransactions(ledger->info(), accountTxData);
173 #endif
174  }
175  }
176  auto end = std::chrono::system_clock::now();
177  JLOG(journal_.debug()) << "Time to download and store ledger = "
178  << ((end - start).count()) / 1000000000.0;
179  return ledger;
180 }
181 
// Flushes `ledger` to the node store: calls setImmutable, flushes the dirty
// nodes of the state map and the transaction map, serializes the ledger
// header, syncs the node store, logs how many nodes were flushed and how long
// it took, and finally checks the state-map hash, tx-map hash and ledger hash
// against the values captured at the top of the function. Throws
// std::runtime_error on any hash mismatch.
// NOTE(review): the signature line, the initialisation of `start` and the
// first line of the call that consumes the serialized header (presumably a
// node store `store` call) are missing from this listing; presumably this is
// ReportingETL::flushLedger -- confirm.
182 void
184 {
185  JLOG(journal_.debug()) << __func__ << " : "
186  << "Flushing ledger. "
187  << detail::toString(ledger->info());
188  // These are recomputed in setImmutable
 // NOTE(review): the three names below are bound by reference (auto&) to
 // fields of ledger->info(), not copied. If setImmutable really rewrites
 // those fields, the "expected" values used in the checks at the bottom of
 // this function change with them -- confirm whether copies were intended.
189  auto& accountHash = ledger->info().accountHash;
190  auto& txHash = ledger->info().txHash;
191  auto& ledgerHash = ledger->info().hash;
192 
193  ledger->setImmutable(app_.config(), false);
195 
196  auto numFlushed = ledger->stateMap().flushDirty(hotACCOUNT_NODE);
197 
198  auto numTxFlushed = ledger->txMap().flushDirty(hotTRANSACTION_NODE);
199 
200  {
201  Serializer s(128);
203  addRaw(ledger->info(), s);
205  hotLEDGER,
206  std::move(s.modData()),
207  ledger->info().hash,
208  ledger->info().seq);
209  }
210 
211  app_.getNodeStore().sync();
212 
213  auto end = std::chrono::system_clock::now();
214 
215  JLOG(journal_.debug()) << __func__ << " : "
216  << "Flushed " << numFlushed
217  << " nodes to nodestore from stateMap";
218  JLOG(journal_.debug()) << __func__ << " : "
219  << "Flushed " << numTxFlushed
220  << " nodes to nodestore from txMap";
221 
222  JLOG(journal_.debug()) << __func__ << " : "
223  << "Flush took "
224  << (end - start).count() / 1000000000.0
225  << " seconds";
226 
 // Flushing no state-map nodes is logged as fatal and asserts; flushing no
 // tx-map nodes is only a warning.
227  if (numFlushed == 0)
228  {
229  JLOG(journal_.fatal()) << __func__ << " : "
230  << "Flushed 0 nodes from state map";
231  assert(false);
232  }
233  if (numTxFlushed == 0)
234  {
235  JLOG(journal_.warn()) << __func__ << " : "
236  << "Flushed 0 nodes from tx map";
237  }
238 
239  // Make sure calculated hashes are correct
240  if (ledger->stateMap().getHash().as_uint256() != accountHash)
241  {
242  JLOG(journal_.fatal())
243  << __func__ << " : "
244  << "State map hash does not match. "
245  << "Expected hash = " << strHex(accountHash) << "Actual hash = "
246  << strHex(ledger->stateMap().getHash().as_uint256());
247  Throw<std::runtime_error>("state map hash mismatch");
248  }
249 
250  if (ledger->txMap().getHash().as_uint256() != txHash)
251  {
252  JLOG(journal_.fatal())
253  << __func__ << " : "
254  << "Tx map hash does not match. "
255  << "Expected hash = " << strHex(txHash) << "Actual hash = "
256  << strHex(ledger->txMap().getHash().as_uint256());
257  Throw<std::runtime_error>("tx map hash mismatch");
258  }
259 
260  if (ledger->info().hash != ledgerHash)
261  {
262  JLOG(journal_.fatal())
263  << __func__ << " : "
264  << "Ledger hash does not match. "
265  << "Expected hash = " << strHex(ledgerHash)
266  << "Actual hash = " << strHex(ledger->info().hash);
267  Throw<std::runtime_error>("ledger hash mismatch");
268  }
269 
270  JLOG(journal_.info()) << __func__ << " : "
271  << "Successfully flushed ledger! "
272  << detail::toString(ledger->info());
273 }
274 
// Publishes `ledger` through app_.getOPs().pubLedger and then calls
// setLastPublish().
// NOTE(review): the signature line is missing from this listing; presumably
// this is the ReportingETL::publishLedger overload that takes a ledger
// pointer -- confirm.
275 void
277 {
278  app_.getOPs().pubLedger(ledger);
279 
280  setLastPublish();
281 }
282 
// Looks up the ledger with sequence `ledgerSequence` through the ledger
// master and, once found, posts a task on publishStrand_ that publishes it.
// Returns true as soon as that task has been posted. Returns false when the
// ledger is still missing after `maxAttempts` failed lookups, or when
// `stopping_` becomes set before the ledger is found.
// NOTE(review): one line inside the retry branch is missing from this
// listing; the comments below describe a one-second wait between attempts,
// but no wait is visible here -- confirm.
283 bool
284 ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts)
285 {
286  JLOG(journal_.info()) << __func__ << " : "
287  << "Attempting to publish ledger = "
288  << ledgerSequence;
289  size_t numAttempts = 0;
290  while (!stopping_)
291  {
292  auto ledger = app_.getLedgerMaster().getLedgerBySeq(ledgerSequence);
293 
294  if (!ledger)
295  {
296  JLOG(journal_.warn())
297  << __func__ << " : "
298  << "Trying to publish. Could not find ledger with sequence = "
299  << ledgerSequence;
300  // We try maxAttempts times to publish the ledger, waiting one
301  // second in between each attempt.
302  // If the ledger is not present in the database after maxAttempts,
303  // we attempt to take over as the writer. If the takeover fails,
304  // doContinuousETL will return, and this node will go back to
305  // publishing.
306  // If the node is in strict read only mode, we simply
307  // skip publishing this ledger and return false indicating the
308  // publish failed
309  if (numAttempts >= maxAttempts)
310  {
311  JLOG(journal_.error()) << __func__ << " : "
312  << "Failed to publish ledger after "
313  << numAttempts << " attempts.";
 // Both branches return false; they differ only in the message
 // that is logged.
314  if (!readOnly_)
315  {
316  JLOG(journal_.info()) << __func__ << " : "
317  << "Attempting to become ETL writer";
318  return false;
319  }
320  else
321  {
322  JLOG(journal_.debug())
323  << __func__ << " : "
324  << "In strict read-only mode. "
325  << "Skipping publishing this ledger. "
326  << "Beginning fast forward.";
327  return false;
328  }
329  }
330  else
331  {
333  ++numAttempts;
334  }
335  continue;
336  }
337 
 // The ledger was found. Publishing happens inside the posted task, so
 // the "Published ledger" message is logged from that task rather than
 // from this call.
338  publishStrand_.post([this, ledger, fname = __func__]() {
339  app_.getOPs().pubLedger(ledger);
340  setLastPublish();
341  JLOG(journal_.info())
342  << fname << " : "
343  << "Published ledger. " << detail::toString(ledger->info());
344  });
345  return true;
346  }
347  return false;
348 }
349 
// Fetches the ledger with sequence `idx` through
// loadBalancer_.fetchLedger(idx, false), logs the reply at trace level and
// returns it.
// NOTE(review): the signature lines and the declaration of `response` are
// missing from this listing; presumably this is ReportingETL::fetchLedgerData
// and the `false` argument requests the ledger without its objects --
// confirm.
352 {
353  JLOG(journal_.debug()) << __func__ << " : "
354  << "Attempting to fetch ledger with sequence = "
355  << idx;
356 
358  loadBalancer_.fetchLedger(idx, false);
359  JLOG(journal_.trace()) << __func__ << " : "
360  << "GetLedger reply = " << response->DebugString();
361  return response;
362 }
363 
// Fetches the ledger with sequence `idx` through
// loadBalancer_.fetchLedger(idx, true), logs the reply at trace level and
// returns it.
// NOTE(review): the signature lines and the declaration of `response` are
// missing from this listing; presumably this is
// ReportingETL::fetchLedgerDataAndDiff and the `true` argument additionally
// requests the ledger objects -- confirm.
366 {
367  JLOG(journal_.debug()) << __func__ << " : "
368  << "Attempting to fetch ledger with sequence = "
369  << idx;
370 
372  loadBalancer_.fetchLedger(idx, true);
373  JLOG(journal_.trace()) << __func__ << " : "
374  << "GetLedger reply = " << response->DebugString();
375  return response;
376 }
377 
// Applies the contents of `rawData` to the ledger `next`: sets the ledger
// info from the deserialized header, inserts all transactions, and then
// erases, replaces or inserts each ledger object listed in `rawData`. Calls
// next->updateSkipList() when `rawData` does not include the skiplist.
// Returns `next` together with the account transaction data produced by
// insertTransactions. Throws std::runtime_error when an object key fails
// uint256::fromVoidChecked.
// NOTE(review): the return type, the function name, the first parameter and
// the declaration of `accountTxData` are missing from this listing;
// presumably this is ReportingETL::buildNextLedger -- confirm.
381  org::xrpl::rpc::v1::GetLedgerResponse& rawData)
382 {
383  JLOG(journal_.info()) << __func__ << " : "
384  << "Beginning ledger update";
385 
386  LedgerInfo lgrInfo =
387  deserializeHeader(makeSlice(rawData.ledger_header()), true);
388 
389  JLOG(journal_.debug()) << __func__ << " : "
390  << "Deserialized ledger header. "
391  << detail::toString(lgrInfo);
392 
393  next->setLedgerInfo(lgrInfo);
394 
395  next->stateMap().clearSynching();
396  next->txMap().clearSynching();
397 
399  insertTransactions(next, rawData)};
400 
401  JLOG(journal_.debug())
402  << __func__ << " : "
403  << "Inserted all transactions. Number of transactions = "
404  << rawData.transactions_list().transactions_size();
405 
406  for (auto& obj : rawData.ledger_objects().objects())
407  {
408  auto key = uint256::fromVoidChecked(obj.key());
409  if (!key)
410  throw std::runtime_error("Recevied malformed object ID");
411 
412  auto& data = obj.data();
413 
414  // indicates object was deleted
415  if (data.size() == 0)
416  {
417  JLOG(journal_.trace()) << __func__ << " : "
418  << "Erasing object = " << *key;
419  if (next->exists(*key))
420  next->rawErase(*key);
421  }
422  else
423  {
 // Non-empty data: deserialize the object, then replace the existing
 // entry with the same key or insert a new one.
424  SerialIter it{data.data(), data.size()};
425  std::shared_ptr<SLE> sle = std::make_shared<SLE>(it, *key);
426 
427  if (next->exists(*key))
428  {
429  JLOG(journal_.trace()) << __func__ << " : "
430  << "Replacing object = " << *key;
431  next->rawReplace(sle);
432  }
433  else
434  {
435  JLOG(journal_.trace()) << __func__ << " : "
436  << "Inserting object = " << *key;
437  next->rawInsert(sle);
438  }
439  }
440  }
441  JLOG(journal_.debug())
442  << __func__ << " : "
443  << "Inserted/modified/deleted all objects. Number of objects = "
444  << rawData.ledger_objects().objects_size();
445 
446  if (!rawData.skiplist_included())
447  {
448  next->updateSkipList();
449  JLOG(journal_.warn())
450  << __func__ << " : "
451  << "tx process is not sending skiplist. This indicates that the tx "
452  "process is parsing metadata instead of doing a SHAMap diff. "
453  "Make sure tx process is running the same code as reporting to "
454  "use SHAMap diff instead of parsing metadata";
455  }
456 
457  JLOG(journal_.debug()) << __func__ << " : "
458  << "Finished ledger update. "
459  << detail::toString(next->info());
460  return {std::move(next), std::move(accountTxData)};
461 }
462 
463 // Database must be populated when this starts
// Runs the extract / transform / load pipeline starting at `startSequence`.
// `writing_` is true while the pipeline runs and is reset to false after all
// three threads have been joined. Returns `lastPublishedSequence`, which is
// set by the load thread each time it publishes a ledger and stays empty if
// nothing was published. Throws std::runtime_error when the ledger with
// sequence startSequence - 1 cannot be found.
// NOTE(review): several lines are missing from this listing (the return
// type, the queue type declarations, the first line of the extract loop
// condition, the `start` timestamps, and part of the relational database
// write call).
465 ReportingETL::runETLPipeline(uint32_t startSequence)
466 {
467  /*
468  * Behold, mortals! This function spawns three separate threads, which talk
469  * to each other via 2 different thread safe queues and 1 atomic variable.
470  * All threads and queues are function local. This function returns when all
471  * of the threads exit. There are two termination conditions: the first is
472  * if the load thread encounters a write conflict. In this case, the load
473  * thread sets writeConflict, an atomic bool, to true, which signals the
474  * other threads to stop. The second termination condition is when the
475  * entire server is shutting down, which is detected in one of three ways:
476  * 1. isStopping() returns true if the server is shutting down
477  * 2. networkValidatedLedgers_.waitUntilValidatedByNetwork returns
478  * false, signaling the wait was aborted.
479  * 3. fetchLedgerDataAndDiff returns an empty optional, signaling the fetch
480  * was aborted.
481  * In all cases, the extract thread detects this condition,
482  * and pushes an empty optional onto the transform queue. The transform
483  * thread, upon popping an empty optional, pushes an empty optional onto the
484  * load queue, and then returns. The load thread, upon popping an empty
485  * optional, returns.
486  */
487 
488  JLOG(journal_.debug()) << __func__ << " : "
489  << "Starting etl pipeline";
490  writing_ = true;
491 
492  std::shared_ptr<Ledger> parent = std::const_pointer_cast<Ledger>(
493  app_.getLedgerMaster().getLedgerBySeq(startSequence - 1));
494  if (!parent)
495  {
496  assert(false);
497  Throw<std::runtime_error>("runETLPipeline: parent ledger is null");
498  }
499 
500  std::atomic_bool writeConflict = false;
501  std::optional<uint32_t> lastPublishedSequence;
502  constexpr uint32_t maxQueueSize = 1000;
503 
505  transformQueue{maxQueueSize};
506 
 // Extract thread: fetches ledgers one sequence at a time and pushes each
 // response onto transformQueue.
507  std::thread extracter{[this,
508  &startSequence,
509  &writeConflict,
510  &transformQueue]() {
511  beast::setCurrentThreadName("rippled: ReportingETL extract");
512  uint32_t currentSequence = startSequence;
513 
514  // there are two stopping conditions here.
515  // First, if there is a write conflict in the load thread, the ETL
516  // mechanism should stop.
517  // The other stopping condition is if the entire server is shutting
518  // down. This can be detected in a variety of ways. See the comment
519  // at the top of the function
521  currentSequence) &&
522  !writeConflict && !isStopping())
523  {
526  fetchLedgerDataAndDiff(currentSequence)};
527  // if the fetch is unsuccessful, stop. fetchLedger only returns
528  // false if the server is shutting down, or if the ledger was
529  // found in the database (which means another process already
530  // wrote the ledger that this process was trying to extract;
531  // this is a form of a write conflict). Otherwise,
532  // fetchLedgerDataAndDiff will keep trying to fetch the
533  // specified ledger until successful
534  if (!fetchResponse)
535  {
536  break;
537  }
538  auto end = std::chrono::system_clock::now();
539 
540  auto time = ((end - start).count()) / 1000000000.0;
541  auto tps =
542  fetchResponse->transactions_list().transactions_size() / time;
543 
544  JLOG(journal_.debug()) << "Extract phase time = " << time
545  << " . Extract phase tps = " << tps;
546 
547  transformQueue.push(std::move(fetchResponse));
548  ++currentSequence;
549  }
550  // empty optional tells the transformer to shut down
551  transformQueue.push({});
552  }};
553 
557  loadQueue{maxQueueSize};
 // Transform thread: turns each fetched response into the next ledger via
 // buildNextLedger and pushes the result onto loadQueue.
558  std::thread transformer{[this,
559  &parent,
560  &writeConflict,
561  &loadQueue,
562  &transformQueue]() {
563  beast::setCurrentThreadName("rippled: ReportingETL transform");
564 
565  assert(parent);
566  parent = std::make_shared<Ledger>(*parent, NetClock::time_point{});
567  while (!writeConflict)
568  {
570  transformQueue.pop()};
571  // if fetchResponse is an empty optional, the extracter thread has
572  // stopped and the transformer should stop as well
573  if (!fetchResponse)
574  {
575  break;
576  }
 // While stopping, keep popping without doing any work until the
 // empty optional arrives.
577  if (isStopping())
578  continue;
579 
581  auto [next, accountTxData] =
582  buildNextLedger(parent, *fetchResponse);
583  auto end = std::chrono::system_clock::now();
584 
585  auto duration = ((end - start).count()) / 1000000000.0;
586  JLOG(journal_.debug()) << "transform time = " << duration;
587  // The below line needs to execute before pushing to the queue, in
588  // order to prevent this thread and the loader thread from accessing
589  // the same SHAMap concurrently
590  parent = std::make_shared<Ledger>(*next, NetClock::time_point{});
591  loadQueue.push(
592  std::make_pair(std::move(next), std::move(accountTxData)));
593  }
594  // empty optional tells the loader to shutdown
595  loadQueue.push({});
596  }};
597 
 // Load thread: flushes each built ledger, writes it to the relational
 // database when built with RIPPLED_REPORTING, and publishes it unless a
 // write conflict was detected.
598  std::thread loader{
599  [this, &lastPublishedSequence, &loadQueue, &writeConflict]() {
600  beast::setCurrentThreadName("rippled: ReportingETL load");
601  size_t totalTransactions = 0;
602  double totalTime = 0;
603  while (!writeConflict)
604  {
608  result{loadQueue.pop()};
609  // if result is an empty optional, the transformer thread has
610  // stopped and the loader should stop as well
611  if (!result)
612  break;
613  if (isStopping())
614  continue;
615 
616  auto& ledger = result->first;
617  auto& accountTxData = result->second;
618 
620  // write to the key-value store
621  flushLedger(ledger);
622 
623  auto mid = std::chrono::system_clock::now();
624  // write to RDBMS
625  // if there is a write conflict, some other process has already
626  // written this ledger and has taken over as the ETL writer
627 #ifdef RIPPLED_REPORTING
628  if (!dynamic_cast<RelationalDBInterfacePostgres*>(
631  ledger->info(), accountTxData))
632  writeConflict = true;
633 #endif
634  auto end = std::chrono::system_clock::now();
635 
636  if (!writeConflict)
637  {
638  publishLedger(ledger);
639  lastPublishedSequence = ledger->info().seq;
640  }
641  // print some performance numbers
642  auto kvTime = ((mid - start).count()) / 1000000000.0;
643  auto relationalTime = ((end - mid).count()) / 1000000000.0;
644 
645  size_t numTxns = accountTxData.size();
646  totalTime += kvTime;
647  totalTransactions += numTxns;
 // NOTE(review): this message is logged even when writeConflict
 // was just set and the ledger was not published -- confirm the
 // wording is intended.
648  JLOG(journal_.info())
649  << "Load phase of etl : "
650  << "Successfully published ledger! Ledger info: "
651  << detail::toString(ledger->info())
652  << ". txn count = " << numTxns
653  << ". key-value write time = " << kvTime
654  << ". relational write time = " << relationalTime
655  << ". key-value tps = " << numTxns / kvTime
656  << ". relational tps = " << numTxns / relationalTime
657  << ". total key-value tps = "
658  << totalTransactions / totalTime;
659  }
660  }};
661 
662  // wait for all of the threads to stop
663  loader.join();
664  extracter.join();
665  transformer.join();
666  writing_ = false;
667 
668  JLOG(journal_.debug()) << __func__ << " : "
669  << "Stopping etl pipeline";
670 
671  return lastPublishedSequence;
672 }
673 
674 // main loop. The software begins monitoring the ledgers that are validated
675 // by the network. The member networkValidatedLedgers_ keeps track of the
676 // sequences of ledgers validated by the network. Whenever a ledger is validated
677 // by the network, the software looks for that ledger in the database. Once the
678 // ledger is found in the database, the software publishes that ledger to the
679 // ledgers stream. If a network validated ledger is not found in the database
680 // after a certain amount of time, then the software attempts to take over
681 // responsibility of the ETL process, where it writes new ledgers to the
682 // database. The software will relinquish control of the ETL process if it
683 // detects that another process has taken over ETL.
// Monitor loop. If no ledger is found at startup, an initial ledger is loaded
// (from startSequence_ when it is set, otherwise from the most recent ledger
// validated by the network). The starting ledger is published, and every
// following sequence is then published in turn; when publishLedger fails,
// runETLPipeline is started from that sequence. Throws std::runtime_error
// when startSequence_ is set but a ledger was already found at startup.
// NOTE(review): the signature line, the startup database lookup, the load
// performed in the startSequence_ branch, the expression that initialises
// `mostRecentValidated` and the second half of the loop condition are
// missing from this listing; presumably this is ReportingETL::monitor --
// confirm.
684 void
686 {
687  auto ledger = std::const_pointer_cast<Ledger>(
689  if (!ledger)
690  {
691  JLOG(journal_.info()) << __func__ << " : "
692  << "Database is empty. Will download a ledger "
693  "from the network.";
694  if (startSequence_)
695  {
696  JLOG(journal_.info())
697  << __func__ << " : "
698  << "ledger sequence specified in config. "
699  << "Will begin ETL process starting with ledger "
700  << *startSequence_;
702  }
703  else
704  {
705  JLOG(journal_.info())
706  << __func__ << " : "
707  << "Waiting for next ledger to be validated by network...";
708  std::optional<uint32_t> mostRecentValidated =
710  if (mostRecentValidated)
711  {
712  JLOG(journal_.info()) << __func__ << " : "
713  << "Ledger " << *mostRecentValidated
714  << " has been validated. "
715  << "Downloading...";
716  ledger = loadInitialLedger(*mostRecentValidated);
717  }
718  else
719  {
720  JLOG(journal_.info()) << __func__ << " : "
721  << "The wait for the next validated "
722  << "ledger has been aborted. "
723  << "Exiting monitor loop";
724  return;
725  }
726  }
727  }
728  else
729  {
730  if (startSequence_)
731  {
732  Throw<std::runtime_error>(
733  "start sequence specified but db is already populated");
734  }
735  JLOG(journal_.info())
736  << __func__ << " : "
737  << "Database already populated. Picking up from the tip of history";
738  }
 // `ledger` is still empty here if the initial load above did not produce
 // a ledger.
739  if (!ledger)
740  {
741  JLOG(journal_.error())
742  << __func__ << " : "
743  << "Failed to load initial ledger. Exiting monitor loop";
744  return;
745  }
746  else
747  {
748  publishLedger(ledger);
749  }
750  uint32_t nextSequence = ledger->info().seq + 1;
751 
752  JLOG(journal_.debug()) << __func__ << " : "
753  << "Database is populated. "
754  << "Starting monitor loop. sequence = "
755  << nextSequence;
756  while (!stopping_ &&
758  {
759  JLOG(journal_.info()) << __func__ << " : "
760  << "Ledger with sequence = " << nextSequence
761  << " has been validated by the network. "
762  << "Attempting to find in database and publish";
763  // Attempt to take over responsibility of ETL writer after 10 failed
764  // attempts to publish the ledger. publishLedger() fails if the
765  // ledger that has been validated by the network is not found in the
766  // database after the specified number of attempts. publishLedger()
767  // waits one second between each attempt to read the ledger from the
768  // database
769  //
770  // In strict read-only mode, when the software fails to find a
771  // ledger in the database that has been validated by the network,
772  // the software will only try to publish subsequent ledgers once,
773  // until one of those ledgers is found in the database. Once the
774  // software successfully publishes a ledger, the software will fall
775  // back to the normal behavior of trying several times to publish
776  // the ledger that has been validated by the network. In this
777  // manner, a reporting processing running in read-only mode does not
778  // need to restart if the database is wiped.
 // Despite its name, timeoutSeconds is passed to publishLedger as the
 // maximum number of attempts.
779  constexpr size_t timeoutSeconds = 10;
780  bool success = publishLedger(nextSequence, timeoutSeconds);
781  if (!success)
782  {
783  JLOG(journal_.warn())
784  << __func__ << " : "
785  << "Failed to publish ledger with sequence = " << nextSequence
786  << " . Beginning ETL";
787  // runETLPipeline returns the most recent sequence published, or
788  // an empty optional if no sequence was published
789  std::optional<uint32_t> lastPublished =
790  runETLPipeline(nextSequence);
791  JLOG(journal_.info()) << __func__ << " : "
792  << "Aborting ETL. Falling back to publishing";
793  // if no ledger was published, don't increment nextSequence
794  if (lastPublished)
795  nextSequence = *lastPublished + 1;
796  }
797  else
798  {
799  ++nextSequence;
800  }
801  }
802 }
803 
// Publish-only loop for strict read-only mode: starting from `mostRecent`,
// publishes one sequence after another and never starts the ETL pipeline.
// Returns immediately when `mostRecent` is empty.
// NOTE(review): the signature line, the expression that initialises
// `mostRecent` and the second half of the loop condition are missing from
// this listing; presumably this is ReportingETL::monitorReadOnly -- confirm.
804 void
806 {
807  JLOG(journal_.debug()) << "Starting reporting in strict read only mode";
808  std::optional<uint32_t> mostRecent =
810  if (!mostRecent)
811  return;
812  uint32_t sequence = *mostRecent;
813  bool success = true;
814  while (!stopping_ &&
816  {
 // After a failed publish the next sequence gets a single attempt;
 // after a successful one it gets up to 30. The sequence advances
 // either way.
817  success = publishLedger(sequence, success ? 30 : 1);
818  ++sequence;
819  }
820 }
821 
// Starts the worker thread, stored in `worker_`. The thread runs
// monitorReadOnly() when `readOnly_` is set and monitor() otherwise.
// NOTE(review): the signature line is missing from this listing; presumably
// this is ReportingETL::doWork -- confirm.
822 void
824 {
825  worker_ = std::thread([this]() {
826  beast::setCurrentThreadName("rippled: ReportingETL worker");
827  if (readOnly_)
828  monitorReadOnly();
829  else
830  monitor();
831  });
832 }
833 
// Constructor body. When the config contains a "reporting" section, it
// registers the ETL sources listed there with the load balancer and reads the
// read_only, start_sequence, flush_interval and num_markers settings. Throws
// std::runtime_error when reporting is configured but the binary was built
// without RIPPLED_REPORTING, when tx tables are disabled, or when a numeric
// setting cannot be parsed as an integer.
// NOTE(review): the constructor signature line, the assignment that
// initialises `readOnly_` from the command line, and the first arguments of
// three asciiToIntThrows calls are missing from this listing.
835  : app_(app)
836  , journal_(app.journal("ReportingETL"))
837  , publishStrand_(app_.getIOService())
838  , loadBalancer_(*this)
839 {
840  // if present, get endpoint from config
841  if (app_.config().exists("reporting"))
842  {
843 #ifndef RIPPLED_REPORTING
844  Throw<std::runtime_error>(
845  "Config file specifies reporting, but software was not built with "
846  "-Dreporting=1. To use reporting, configure CMake with "
847  "-Dreporting=1");
848 #endif
849  if (!app_.config().useTxTables())
850  Throw<std::runtime_error>(
851  "Reporting requires tx tables. Set use_tx_tables=1 in config "
852  "file, under [ledger_tx_tables] section");
853  Section section = app_.config().section("reporting");
854 
855  JLOG(journal_.debug()) << "Parsing config info";
856 
 // Each value in the reporting section names another config section
 // describing one ETL source. Sources without source_ip or
 // source_ws_port are skipped.
857  auto& vals = section.values();
858  for (auto& v : vals)
859  {
860  JLOG(journal_.debug()) << "val is " << v;
861  Section source = app_.config().section(v);
862 
863  auto optIp = source.get("source_ip");
864  if (!optIp)
865  continue;
866 
867  auto optWsPort = source.get("source_ws_port");
868  if (!optWsPort)
869  continue;
870 
871  auto optGrpcPort = source.get("source_grpc_port");
872  if (!optGrpcPort)
873  {
874  // add source without grpc port
875  // used in read-only mode to detect when new ledgers have
876  // been validated. Used for publishing
877  if (app_.config().reportingReadOnly())
878  loadBalancer_.add(*optIp, *optWsPort);
879  continue;
880  }
881 
882  loadBalancer_.add(*optIp, *optWsPort, *optGrpcPort);
883  }
884 
885  // this is true iff --reportingReadOnly was passed via command line
887 
888  // if --reportingReadOnly was not passed via command line, check config
889  // file. Command line takes precedence
890  if (!readOnly_)
891  {
892  auto const optRO = section.get("read_only");
893  if (optRO)
894  {
895  readOnly_ = (*optRO == "true" || *optRO == "1");
897  }
898  }
899 
900  // lambda throws a useful message if string to integer conversion fails
 // Trailing whitespace after the number is accepted; any other
 // unconsumed character is treated as an error. The exception message
 // is `onError` followed by the offending string.
901  auto asciiToIntThrows =
902  [](auto& dest, std::string const& src, char const* onError) {
903  char const* const srcEnd = src.data() + src.size();
904  auto [ptr, err] = std::from_chars(src.data(), srcEnd, dest);
905 
906  if (err == std::errc())
907  // skip whitespace at end of string
908  while (ptr != srcEnd &&
909  std::isspace(static_cast<unsigned char>(*ptr)))
910  ++ptr;
911 
912  // throw if
913  // o conversion error or
914  // o entire string is not consumed
915  if (err != std::errc() || ptr != srcEnd)
916  Throw<std::runtime_error>(onError + src);
917  };
918 
919  // handle command line arguments
920  if (app_.config().START_UP == Config::StartUpType::FRESH && !readOnly_)
921  {
922  asciiToIntThrows(
925  "Expected integral START_LEDGER command line argument. Got: ");
926  }
927  // if not passed via command line, check config for start sequence
928  if (!startSequence_)
929  {
930  auto const optStartSeq = section.get("start_sequence");
931  if (optStartSeq)
932  asciiToIntThrows(
934  *optStartSeq,
935  "Expected integral start_sequence config entry. Got: ");
936  }
937 
938  auto const optFlushInterval = section.get("flush_interval");
939  if (optFlushInterval)
940  asciiToIntThrows(
942  *optFlushInterval,
943  "Expected integral flush_interval config entry. Got: ");
944 
945  auto const optNumMarkers = section.get("num_markers");
946  if (optNumMarkers)
947  asciiToIntThrows(
948  numMarkers_,
949  *optNumMarkers,
950  "Expected integral num_markers config entry. Got: ");
951  }
952 }
953 
954 } // namespace ripple
ripple::NetworkOPs::pubLedger
virtual void pubLedger(std::shared_ptr< ReadView const > const &lpAccepted)=0
beast::Journal::fatal
Stream fatal() const
Definition: Journal.h:339
ripple::ReportingETL::flushInterval_
size_t flushInterval_
Used to determine when to write to the database during the initial ledger download.
Definition: ReportingETL.h:115
ripple::Section
Holds a collection of configuration values.
Definition: BasicConfig.h:42
ripple::Application
Definition: Application.h:115
ripple::Application::getNodeFamily
virtual Family & getNodeFamily()=0
ripple::HashPrefix::ledgerMaster
@ ledgerMaster
ledger master data for signing
std::this_thread::sleep_for
T sleep_for(T... args)
ripple::makeSlice
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
Definition: Slice.h:240
ripple::ReportingETL::fetchLedgerData
std::optional< org::xrpl::rpc::v1::GetLedgerResponse > fetchLedgerData(uint32_t sequence)
Extract data for a particular ledger from an ETL source.
Definition: ReportingETL.cpp:351
ripple::ReportingETL::loadInitialLedger
std::shared_ptr< Ledger > loadInitialLedger(uint32_t sequence)
Download a ledger with specified sequence in full, via GetLedgerData, and write the data to the datab...
Definition: ReportingETL.cpp:107
ripple::ReportingETL::startSequence_
std::optional< uint32_t > startSequence_
Ledger sequence to start ETL from.
Definition: ReportingETL.h:139
std::string
STL class.
std::shared_ptr< Ledger >
ripple::LedgerInfo::parentHash
uint256 parentHash
Definition: ReadView.h:103
ripple::ThreadSafeQueue
Generic thread-safe queue with an optional maximum size Note, we can't use a lockfree queue here,...
Definition: ETLHelpers.h:105
beast::Journal::trace
Stream trace() const
Severity stream access functions.
Definition: Journal.h:309
ripple::Serializer::modData
Blob & modData()
Definition: Serializer.h:178
std::pair
ripple::ReportingETL::setLastPublish
void setLastPublish()
Definition: ReportingETL.h:155
ripple::LedgerInfo::hash
uint256 hash
Definition: ReadView.h:100
ripple::ReportingETL::insertTransactions
std::vector< AccountTransactionsData > insertTransactions(std::shared_ptr< Ledger > &ledger, org::xrpl::rpc::v1::GetLedgerResponse &data)
Insert all of the extracted transactions into the ledger.
Definition: ReportingETL.cpp:76
ripple::ThreadSafeQueue::push
void push(T const &elt)
Definition: ETLHelpers.h:126
ripple::addRaw
void addRaw(LedgerInfo const &info, Serializer &s, bool includeHash)
Definition: View.cpp:164
charconv
ripple::ReportingETL::flushLedger
void flushLedger(std::shared_ptr< Ledger > &ledger)
Write all new data to the key-value store.
Definition: ReportingETL.cpp:183
ripple::hotACCOUNT_NODE
@ hotACCOUNT_NODE
Definition: NodeObject.h:35
std::vector
STL class.
ripple::Application::getRelationalDBInterface
virtual RelationalDBInterface & getRelationalDBInterface()=0
ripple::ReportingETL::loadBalancer_
ETLLoadBalancer loadBalancer_
Mechanism for communicating with ETL sources.
Definition: ReportingETL.h:96
std::chrono::seconds
ripple::ETLLoadBalancer::fetchLedger
std::optional< org::xrpl::rpc::v1::GetLedgerResponse > fetchLedger(uint32_t ledgerSequence, bool getObjects)
Fetch data for a specific ledger.
Definition: ETLSource.cpp:699
ripple::NetworkValidatedLedgers::waitUntilValidatedByNetwork
bool waitUntilValidatedByNetwork(uint32_t sequence)
Waits for the sequence to be validated by the network.
Definition: ETLHelpers.h:79
ripple::NodeStore::Database::sync
virtual void sync()=0
std::stringstream
STL class.
beast::Journal::warn
Stream warn() const
Definition: Journal.h:327
ripple::NodeStore::Database::store
virtual void store(NodeObjectType type, Blob &&data, uint256 const &hash, std::uint32_t ledgerSeq)=0
Store the object.
ripple::LedgerInfo::seq
LedgerIndex seq
Definition: ReadView.h:92
ripple::hotTRANSACTION_NODE
@ hotTRANSACTION_NODE
Definition: NodeObject.h:36
ripple::base_uint< 256 >::fromVoidChecked
static std::optional< base_uint > fromVoidChecked(T const &from)
Definition: base_uint.h:312
iostream
ripple::LedgerInfo::txHash
uint256 txHash
Definition: ReadView.h:101
ripple::RelationalDBInterfacePostgres
Definition: RelationalDBInterfacePostgres.h:27
ripple::Application::getOPs
virtual NetworkOPs & getOPs()=0
ripple::Section::values
std::vector< std::string > const & values() const
Returns all the values in the section.
Definition: BasicConfig.h:77
ripple::ReportingETL::networkValidatedLedgers_
NetworkValidatedLedgers networkValidatedLedgers_
Mechanism for detecting when the network has validated a new ledger.
Definition: ReportingETL.h:100
ripple::TxMeta
Definition: TxMeta.h:32
ripple::base_uint< 256 >
ripple::writeLedgerAndTransactions
bool writeLedgerAndTransactions(std::shared_ptr< PgPool > const &pgPool, LedgerInfo const &info, std::vector< AccountTransactionsData > const &accountTxData, beast::Journal &j)
writeLedgerAndTransactions: Write new ledger and transaction data to Postgres.
Definition: RelationalDBInterface_postgres.cpp:762
ripple::ReportingETL::publishLedger
bool publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts=10)
Attempt to read the specified ledger from the database, and then publish that ledger to the ledgers s...
Definition: ReportingETL.cpp:284
ripple::ReportingETL::journal_
beast::Journal journal_
Definition: ReportingETL.h:75
ripple::Config::reporting
bool reporting() const
Definition: Config.h:308
std::thread
STL class.
ripple::Application::getLedgerMaster
virtual LedgerMaster & getLedgerMaster()=0
ripple::ReportingETL::writing_
std::atomic_bool writing_
Whether the process is writing to the database. Used by server_info.
Definition: ReportingETL.h:134
ripple::ReportingETL::readOnly_
bool readOnly_
Whether the process is in strict read-only mode.
Definition: ReportingETL.h:131
ripple::ReportingETL::numMarkers_
size_t numMarkers_
This variable controls the number of GetLedgerData calls that will be executed in parallel during the...
Definition: ReportingETL.h:126
ripple::ETLLoadBalancer::loadInitialLedger
void loadInitialLedger(uint32_t sequence, ThreadSafeQueue< std::shared_ptr< SLE >> &writeQueue)
Load the initial ledger, writing data to the queue.
Definition: ETLSource.cpp:680
ripple::Application::config
virtual Config & config()=0
ripple::Config::useTxTables
bool useTxTables() const
Definition: Config.h:314
ripple::ReportingETL::monitorReadOnly
void monitorReadOnly()
Monitor the database for newly written ledgers.
Definition: ReportingETL.cpp:805
ripple::detail::toString
std::string toString(LedgerInfo const &info)
Convenience function for printing out basic ledger info.
Definition: ReportingETL.cpp:42
beast::Journal::error
Stream error() const
Definition: Journal.h:333
beast::Journal::info
Stream info() const
Definition: Journal.h:321
std::chrono::time_point
ripple::ReportingETL::publishStrand_
boost::asio::io_context::strand publishStrand_
Strand to ensure that ledgers are published in order.
Definition: ReportingETL.h:91
ripple::STTx
Definition: STTx.h:43
ripple::LedgerMaster::getLedgerBySeq
std::shared_ptr< Ledger const > getLedgerBySeq(std::uint32_t index)
Definition: LedgerMaster.cpp:1767
std::errc
std::runtime_error
STL class.
ripple::SerialIter
Definition: Serializer.h:310
ripple::Config::START_UP
StartUpType START_UP
Definition: Config.h:133
std::atomic_bool
std::from_chars
T from_chars(T... args)
ripple::ReportingETL::fetchLedgerDataAndDiff
std::optional< org::xrpl::rpc::v1::GetLedgerResponse > fetchLedgerDataAndDiff(uint32_t sequence)
Extract data for a particular ledger from an ETL source.
Definition: ReportingETL.cpp:365
ripple::NetworkValidatedLedgers::getMostRecent
std::optional< uint32_t > getMostRecent()
Get most recently validated sequence.
Definition: ETLHelpers.h:67
ripple::Serializer
Definition: Serializer.h:39
ripple::Config::setReportingReadOnly
void setReportingReadOnly(bool b)
Definition: Config.h:326
beast::setCurrentThreadName
void setCurrentThreadName(std::string_view name)
Changes the name of the caller thread.
Definition: CurrentThreadName.cpp:119
std::vector::emplace_back
T emplace_back(T... args)
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::ReportingETL::stopping_
std::atomic_bool stopping_
Whether the software is stopping.
Definition: ReportingETL.h:103
ripple::Application::getNodeStore
virtual NodeStore::Database & getNodeStore()=0
ripple::deserializeHeader
LedgerInfo deserializeHeader(Slice data, bool hasHash)
Deserialize a ledger header from a byte array.
Definition: InboundLedger.cpp:269
cstdlib
ripple::LedgerMaster::getValidatedLedger
std::shared_ptr< Ledger const > getValidatedLedger()
Definition: LedgerMaster.cpp:1612
ripple::Section::get
std::optional< T > get(std::string const &name) const
Definition: BasicConfig.h:138
cctype
ripple::ReportingETL::app_
Application & app_
Definition: ReportingETL.h:73
std::optional
std::stringstream::str
T str(T... args)
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
ripple::ReportingETL::start
void start()
Start all of the necessary components and begin ETL.
Definition: ReportingETL.h:326
ripple::ReportingETL::worker_
std::thread worker_
Definition: ReportingETL.h:77
ripple::hotLEDGER
@ hotLEDGER
Definition: NodeObject.h:34
std::make_pair
T make_pair(T... args)
ripple::Serializer::add32
int add32(std::uint32_t i)
Definition: Serializer.cpp:38
ripple::LedgerInfo
Information about the notional ledger backing the view.
Definition: ReadView.h:84
ripple::strHex
std::string strHex(FwdIt begin, FwdIt end)
Definition: strHex.h:45
ripple::Config::reportingReadOnly
bool reportingReadOnly() const
Definition: Config.h:320
ripple::ReportingETL::buildNextLedger
std::pair< std::shared_ptr< Ledger >, std::vector< AccountTransactionsData > > buildNextLedger(std::shared_ptr< Ledger > &parent, org::xrpl::rpc::v1::GetLedgerResponse &rawData)
Build the next ledger using the previous ledger and the extracted data.
Definition: ReportingETL.cpp:379
ripple::ReportingETL::monitor
void monitor()
Monitor the network for newly validated ledgers.
Definition: ReportingETL.cpp:685
ripple::ReportingETL::ReportingETL
ReportingETL(Application &app)
Definition: ReportingETL.cpp:834
ripple::Config::START_LEDGER
std::string START_LEDGER
Definition: Config.h:137
ripple::ReportingETL::isStopping
bool isStopping() const
Definition: ReportingETL.h:282
ripple::ReportingETL::consumeLedgerData
void consumeLedgerData(std::shared_ptr< Ledger > &ledger, ThreadSafeQueue< std::shared_ptr< SLE >> &writeQueue)
Consume data from a queue and insert that data into the ledger. This function will continue to pull fr...
Definition: ReportingETL.cpp:54
ripple::ReportingETL::doWork
void doWork()
Definition: ReportingETL.cpp:823
ripple::LedgerInfo::accountHash
uint256 accountHash
Definition: ReadView.h:102
ripple::ReportingETL::runETLPipeline
std::optional< uint32_t > runETLPipeline(uint32_t startSequence)
Run ETL.
Definition: ReportingETL.cpp:465
ripple::RelationalDBInterfacePostgres::writeLedgerAndTransactions
virtual bool writeLedgerAndTransactions(LedgerInfo const &info, std::vector< AccountTransactionsData > const &accountTxData)=0
writeLedgerAndTransactions: Write new ledger and transaction data into database.
ripple::BasicConfig::exists
bool exists(std::string const &name) const
Returns true if a section with the given name exists.
Definition: BasicConfig.cpp:121
ripple::BasicConfig::section
Section & section(std::string const &name)
Returns the section with the given name.
Definition: BasicConfig.cpp:127
variant
string
ripple::ETLLoadBalancer::add
void add(std::string &host, std::string &websocketPort, std::string &grpcPort)
Add an ETL source.
Definition: ETLSource.cpp:657
std::chrono::system_clock::now
T now(T... args)