rippled
ReportingETL.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2020 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/app/rdb/backend/PostgresDatabase.h>
21 #include <ripple/app/reporting/ReportingETL.h>
22 
23 #include <ripple/beast/core/CurrentThreadName.h>
24 #include <ripple/json/json_reader.h>
25 #include <ripple/json/json_writer.h>
26 #include <boost/asio/connect.hpp>
27 #include <boost/asio/ip/tcp.hpp>
28 #include <boost/beast/core.hpp>
29 #include <boost/beast/websocket.hpp>
30 #include <cctype>
31 #include <charconv>
32 #include <cstdlib>
33 #include <iostream>
34 #include <string>
35 #include <variant>
36 
37 namespace ripple {
38 
39 namespace detail {
// Formats the seq, hash, txHash, accountHash and parentHash fields of a
// LedgerInfo into a single human-readable string (hashes are rendered with
// strHex). Used by the log statements in this file.
// NOTE(review): the lines carrying the return type and the declaration of
// `ss` (embedded lines 40-41 and 44) are absent from this listing —
// presumably a std::string return and a std::stringstream; confirm against
// the upstream file.
42 toString(LedgerInfo const& info)
43 {
45  ss << "LedgerInfo { Sequence : " << info.seq
46  << " Hash : " << strHex(info.hash) << " TxHash : " << strHex(info.txHash)
47  << " AccountHash : " << strHex(info.accountHash)
48  << " ParentHash : " << strHex(info.parentHash) << " }";
49  return ss.str();
50 }
51 } // namespace detail
52 
// Drains `writeQueue`, inserting every popped object into `ledger` unless an
// object with the same key already exists there.
// The loop ends when `stopping_` becomes true or when pop() yields a value
// that converts to false (a null entry marks the end of the queue).
// NOTE(review): the signature (embedded lines 54-56) and the declaration of
// `sle` (embedded line 58) are absent from this listing. The caller invokes
// this as consumeLedgerData(ledger, writeQueue); `sle` is presumably a
// std::shared_ptr<SLE> — confirm against the upstream file.
53 void
57 {
59  size_t num = 0;
60  while (!stopping_ && (sle = writeQueue.pop()))
61  {
62  assert(sle);
63  if (!ledger->exists(sle->key()))
64  ledger->rawInsert(sle);
65 
// Periodically flush dirty state-map nodes when a non-zero flush interval
// is configured. `num` starts at 0, so the first object popped also
// triggers a flush.
66  if (flushInterval_ != 0 && (num % flushInterval_) == 0)
67  {
68  JLOG(journal_.debug()) << "Flushing! key = " << strHex(sle->key());
69  ledger->stateMap().flushDirty(hotACCOUNT_NODE);
70  }
71  ++num;
72  }
73 }
74 
// Inserts every transaction contained in `data` into `ledger` and collects
// one accountTxData entry per transaction, which is returned to the caller.
// NOTE(review): the head of the signature (embedded lines 75-77) and the
// declaration of `accountTxData` (embedded line 80) are absent from this
// listing. The Doxygen index on this page gives the signature as
// std::vector<AccountTransactionsData> insertTransactions(
//     std::shared_ptr<Ledger>& ledger, GetLedgerResponse& data).
78  org::xrpl::rpc::v1::GetLedgerResponse& data)
79 {
81  for (auto& txn : data.transactions_list().transactions())
82  {
83  auto& raw = txn.transaction_blob();
84 
// Deserialize the transaction from its raw blob.
85  SerialIter it{raw.data(), raw.size()};
86  STTx sttx{it};
87 
88  auto txSerializer = std::make_shared<Serializer>(sttx.getSerializer());
89 
// Build the metadata from the transaction ID, this ledger's sequence and
// the metadata blob that accompanied the transaction.
90  TxMeta txMeta{
91  sttx.getTransactionID(), ledger->info().seq, txn.metadata_blob()};
92 
93  auto metaSerializer =
94  std::make_shared<Serializer>(txMeta.getAsObject().getSerializer());
95 
96  JLOG(journal_.trace())
97  << __func__ << " : "
98  << "Inserting transaction = " << sttx.getTransactionID();
// The hash returned by the insert is stored alongside the metadata.
99  uint256 nodestoreHash = ledger->rawTxInsertWithHash(
100  sttx.getTransactionID(), txSerializer, metaSerializer);
101  accountTxData.emplace_back(txMeta, std::move(nodestoreHash), journal_);
102  }
103  return accountTxData;
104 }
105 
// Downloads the ledger with sequence `startingSequence` in full and stores it.
// Steps visible here: verify no ledger is already present, fetch the header
// and transactions, build a Ledger from the header, stream the state objects
// through a queue into the Ledger on a helper thread, then (unless stopping)
// flush the ledger and, in reporting builds, write it to Postgres.
// Returns the populated ledger, or an empty pointer if the database was not
// empty or the fetch returned nothing.
// NOTE(review): several lines are absent from this listing (embedded lines
// 106, 111, 123, 141, 145, 147, 159): the return type, the argument of the
// const_pointer_cast, and the declarations of `ledgerData`, `accountTxData`,
// `writeQueue`, `start` and `null`. Confirm against the upstream file before
// changing this function.
107 ReportingETL::loadInitialLedger(uint32_t startingSequence)
108 {
109  // check that database is actually empty
110  auto ledger = std::const_pointer_cast<Ledger>(
112  if (ledger)
113  {
114  JLOG(journal_.fatal()) << __func__ << " : "
115  << "Database is not empty";
116  assert(false);
117  return {};
118  }
119 
120  // fetch the ledger from the network. This function will not return until
121  // either the fetch is successful, or the server is being shutdown. This
122  // only fetches the ledger header and the transactions+metadata
124  fetchLedgerData(startingSequence)};
125  if (!ledgerData)
126  return {};
127 
128  LedgerInfo lgrInfo =
129  deserializeHeader(makeSlice(ledgerData->ledger_header()), true);
130 
131  JLOG(journal_.debug()) << __func__ << " : "
132  << "Deserialized ledger header. "
133  << detail::toString(lgrInfo);
134 
135  ledger =
136  std::make_shared<Ledger>(lgrInfo, app_.config(), app_.getNodeFamily());
137  ledger->stateMap().clearSynching();
138  ledger->txMap().clearSynching();
139 
140 #ifdef RIPPLED_REPORTING
142  insertTransactions(ledger, *ledgerData);
143 #endif
144 
146 
// The writer thread captures `ledger` and `writeQueue` by reference; it is
// joined below before either goes out of scope.
148  std::thread asyncWriter{[this, &ledger, &writeQueue]() {
149  consumeLedgerData(ledger, writeQueue);
150  }};
151 
152  // download the full account state map. This function downloads full ledger
153  // data and pushes the downloaded data into the writeQueue. asyncWriter
154  // consumes from the queue and inserts the data into the Ledger object.
155  // Once the below call returns, all data has been pushed into the queue
156  loadBalancer_.loadInitialLedger(startingSequence, writeQueue);
157 
158  // null is used to represent the end of the queue
160  writeQueue.push(null);
161  // wait for the writer to finish
162  asyncWriter.join();
163 
// Nothing is flushed or written to the relational database when shutting
// down.
164  if (!stopping_)
165  {
166  flushLedger(ledger);
167  if (app_.config().reporting())
168  {
169 #ifdef RIPPLED_REPORTING
// NOTE(review): the dynamic_cast result is dereferenced without a null
// check — presumably the relational database is always a PostgresDatabase
// in reporting mode; confirm.
170  dynamic_cast<PostgresDatabase*>(&app_.getRelationalDatabase())
171  ->writeLedgerAndTransactions(ledger->info(), accountTxData);
172 #endif
173  }
174  }
175  auto end = std::chrono::system_clock::now();
// NOTE(review): the division by 1e9 assumes the clock's tick period is one
// nanosecond — confirm for the target platforms.
176  JLOG(journal_.debug()) << "Time to download and store ledger = "
177  << ((end - start).count()) / 1000000000.0;
178  return ledger;
179 }
180 
// Writes the ledger's dirty state-map and tx-map nodes and its serialized
// header to the node store, syncs the node store, logs timing and counts,
// and then checks the state-map, tx-map and ledger hashes. A mismatch is
// logged at fatal level and reported by throwing std::runtime_error.
// NOTE(review): the signature (embedded line 182), the declaration of
// `start` (embedded line 193) and two lines of the header-store block
// (embedded lines 201 and 203) are absent from this listing. The Doxygen
// index on this page gives the signature as
// void flushLedger(std::shared_ptr<Ledger>& ledger).
181 void
183 {
184  JLOG(journal_.debug()) << __func__ << " : "
185  << "Flushing ledger. "
186  << detail::toString(ledger->info());
187  // These are recomputed in setImmutable
// NOTE(review): these are references (auto&) into ledger->info(), not
// copies, so the comparisons further down read whatever those fields hold
// at that point — confirm this is the intended "expected" value.
188  auto& accountHash = ledger->info().accountHash;
189  auto& txHash = ledger->info().txHash;
190  auto& ledgerHash = ledger->info().hash;
191 
192  ledger->setImmutable(app_.config(), false);
194 
195  auto numFlushed = ledger->stateMap().flushDirty(hotACCOUNT_NODE);
196 
197  auto numTxFlushed = ledger->txMap().flushDirty(hotTRANSACTION_NODE);
198 
// Serialize the ledger header and hand it to the node store.
// NOTE(review): the start of the store call is not visible here.
199  {
200  Serializer s(128);
202  addRaw(ledger->info(), s);
204  hotLEDGER,
205  std::move(s.modData()),
206  ledger->info().hash,
207  ledger->info().seq);
208  }
209 
210  app_.getNodeStore().sync();
211 
212  auto end = std::chrono::system_clock::now();
213 
214  JLOG(journal_.debug()) << __func__ << " : "
215  << "Flushed " << numFlushed
216  << " nodes to nodestore from stateMap";
217  JLOG(journal_.debug()) << __func__ << " : "
218  << "Flushed " << numTxFlushed
219  << " nodes to nodestore from txMap";
220 
221  JLOG(journal_.debug()) << __func__ << " : "
222  << "Flush took "
223  << (end - start).count() / 1000000000.0
224  << " seconds";
225 
// Flushing no state nodes is treated as a programming error (assert);
// flushing no transaction nodes only produces a warning.
226  if (numFlushed == 0)
227  {
228  JLOG(journal_.fatal()) << __func__ << " : "
229  << "Flushed 0 nodes from state map";
230  assert(false);
231  }
232  if (numTxFlushed == 0)
233  {
234  JLOG(journal_.warn()) << __func__ << " : "
235  << "Flushed 0 nodes from tx map";
236  }
237 
238  // Make sure calculated hashes are correct
// NOTE(review): in the three messages below there is no separator between
// the expected hash and the "Actual hash = " label, so the two run together
// in the log output.
239  if (ledger->stateMap().getHash().as_uint256() != accountHash)
240  {
241  JLOG(journal_.fatal())
242  << __func__ << " : "
243  << "State map hash does not match. "
244  << "Expected hash = " << strHex(accountHash) << "Actual hash = "
245  << strHex(ledger->stateMap().getHash().as_uint256());
246  Throw<std::runtime_error>("state map hash mismatch");
247  }
248 
249  if (ledger->txMap().getHash().as_uint256() != txHash)
250  {
251  JLOG(journal_.fatal())
252  << __func__ << " : "
253  << "Tx map hash does not match. "
254  << "Expected hash = " << strHex(txHash) << "Actual hash = "
255  << strHex(ledger->txMap().getHash().as_uint256());
256  Throw<std::runtime_error>("tx map hash mismatch");
257  }
258 
259  if (ledger->info().hash != ledgerHash)
260  {
261  JLOG(journal_.fatal())
262  << __func__ << " : "
263  << "Ledger hash does not match. "
264  << "Expected hash = " << strHex(ledgerHash)
265  << "Actual hash = " << strHex(ledger->info().hash);
266  Throw<std::runtime_error>("ledger hash mismatch");
267  }
268 
269  JLOG(journal_.info()) << __func__ << " : "
270  << "Successfully flushed ledger! "
271  << detail::toString(ledger->info());
272 }
273 
// Publishes an already-loaded ledger through NetworkOPs::pubLedger and then
// records the publish via setLastPublish().
// NOTE(review): the signature (embedded line 275) is absent from this
// listing; callers in this file pass a std::shared_ptr<Ledger> — presumably
// this is the ledger-pointer overload of ReportingETL::publishLedger.
274 void
276 {
277  app_.getOPs().pubLedger(ledger);
278 
279  setLastPublish();
280 }
281 
// Looks up the ledger with sequence `ledgerSequence` in the database and, once
// found, posts a task on publishStrand_ that publishes it.
// Returns true if the publish task was posted. Returns false if the ledger
// was still missing after `maxAttempts` retries, or if `stopping_` became
// true while waiting.
282 bool
283 ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts)
284 {
285  JLOG(journal_.info()) << __func__ << " : "
286  << "Attempting to publish ledger = "
287  << ledgerSequence;
288  size_t numAttempts = 0;
289  while (!stopping_)
290  {
291  auto ledger = app_.getLedgerMaster().getLedgerBySeq(ledgerSequence);
292 
293  if (!ledger)
294  {
295  JLOG(journal_.warn())
296  << __func__ << " : "
297  << "Trying to publish. Could not find ledger with sequence = "
298  << ledgerSequence;
299  // We try maxAttempts times to publish the ledger, waiting one
300  // second in between each attempt.
301  // If the ledger is not present in the database after maxAttempts,
302  // we attempt to take over as the writer. If the takeover fails,
303  // doContinuousETL will return, and this node will go back to
304  // publishing.
305  // If the node is in strict read only mode, we simply
306  // skip publishing this ledger and return false indicating the
307  // publish failed
308  if (numAttempts >= maxAttempts)
309  {
310  JLOG(journal_.error()) << __func__ << " : "
311  << "Failed to publish ledger after "
312  << numAttempts << " attempts.";
// Both branches return false; they differ only in what is logged. The
// caller decides what to do next.
313  if (!readOnly_)
314  {
315  JLOG(journal_.info()) << __func__ << " : "
316  << "Attempting to become ETL writer";
317  return false;
318  }
319  else
320  {
321  JLOG(journal_.debug())
322  << __func__ << " : "
323  << "In strict read-only mode. "
324  << "Skipping publishing this ledger. "
325  << "Beginning fast forward.";
326  return false;
327  }
328  }
329  else
330  {
// NOTE(review): the statement that waits between attempts (embedded line
// 331) is absent from this listing — presumably a one-second sleep, per the
// comment above; confirm against the upstream file.
332  ++numAttempts;
333  }
334  continue;
335  }
336 
// __func__ is captured as `fname` because inside the lambda it would name
// the lambda's call operator instead of this function.
337  publishStrand_.post([this, ledger, fname = __func__]() {
338  app_.getOPs().pubLedger(ledger);
339  setLastPublish();
340  JLOG(journal_.info())
341  << fname << " : "
342  << "Published ledger. " << detail::toString(ledger->info());
343  });
344  return true;
345  }
346  return false;
347 }
348 
// Fetches the ledger with sequence `idx` through the load balancer, passing
// `false` as the second fetchLedger argument (named getObjects in the Doxygen
// index on this page), and returns the response.
// NOTE(review): the signature (embedded lines 349-350) and the declaration of
// `response` (embedded line 356) are absent from this listing. The Doxygen
// index gives the signature as
// std::optional<org::xrpl::rpc::v1::GetLedgerResponse>
// fetchLedgerData(uint32_t sequence).
// NOTE(review): `response` is dereferenced for the trace log before any
// emptiness check is visible — confirm this is safe when the fetch is
// aborted.
351 {
352  JLOG(journal_.debug()) << __func__ << " : "
353  << "Attempting to fetch ledger with sequence = "
354  << idx;
355 
357  loadBalancer_.fetchLedger(idx, false);
358  JLOG(journal_.trace()) << __func__ << " : "
359  << "GetLedger reply = " << response->DebugString();
360  return response;
361 }
362 
// Fetches the ledger with sequence `idx` through the load balancer, passing
// `true` as the second fetchLedger argument (named getObjects in the Doxygen
// index on this page), and returns the response.
// NOTE(review): the signature (embedded lines 363-364) and the declaration of
// `response` (embedded line 370) are absent from this listing — presumably
// this is ReportingETL::fetchLedgerDataAndDiff, which runETLPipeline calls;
// confirm against the upstream file.
// NOTE(review): `response` is dereferenced for the trace log before any
// emptiness check is visible — confirm this is safe when the fetch is
// aborted.
365 {
366  JLOG(journal_.debug()) << __func__ << " : "
367  << "Attempting to fetch ledger with sequence = "
368  << idx;
369 
371  loadBalancer_.fetchLedger(idx, true);
372  JLOG(journal_.trace()) << __func__ << " : "
373  << "GetLedger reply = " << response->DebugString();
374  return response;
375 }
376 
// Applies the data in `rawData` to `next`: sets the ledger header, inserts
// all transactions, then erases, replaces or inserts each ledger object.
// Returns `next` together with the account-transaction data produced by
// insertTransactions.
// NOTE(review): the head of the signature (embedded lines 377-379) and the
// declaration of `accountTxData` (embedded line 397) are absent from this
// listing — presumably this is ReportingETL::buildNextLedger, which
// runETLPipeline calls as buildNextLedger(parent, *fetchResponse); confirm.
380  org::xrpl::rpc::v1::GetLedgerResponse& rawData)
381 {
382  JLOG(journal_.info()) << __func__ << " : "
383  << "Beginning ledger update";
384 
385  LedgerInfo lgrInfo =
386  deserializeHeader(makeSlice(rawData.ledger_header()), true);
387 
388  JLOG(journal_.debug()) << __func__ << " : "
389  << "Deserialized ledger header. "
390  << detail::toString(lgrInfo);
391 
392  next->setLedgerInfo(lgrInfo);
393 
394  next->stateMap().clearSynching();
395  next->txMap().clearSynching();
396 
398  insertTransactions(next, rawData)};
399 
400  JLOG(journal_.debug())
401  << __func__ << " : "
402  << "Inserted all transactions. Number of transactions = "
403  << rawData.transactions_list().transactions_size();
404 
405  for (auto& obj : rawData.ledger_objects().objects())
406  {
// fromVoidChecked yields an empty optional when the key cannot be
// converted; that case is reported by throwing.
407  auto key = uint256::fromVoidChecked(obj.key());
408  if (!key)
409  throw std::runtime_error("Recevied malformed object ID");
410 
411  auto& data = obj.data();
412 
413  // indicates object was deleted
414  if (data.size() == 0)
415  {
416  JLOG(journal_.trace()) << __func__ << " : "
417  << "Erasing object = " << *key;
418  if (next->exists(*key))
419  next->rawErase(*key);
420  }
421  else
422  {
423  SerialIter it{data.data(), data.size()};
424  std::shared_ptr<SLE> sle = std::make_shared<SLE>(it, *key);
425 
426  if (next->exists(*key))
427  {
428  JLOG(journal_.trace()) << __func__ << " : "
429  << "Replacing object = " << *key;
430  next->rawReplace(sle);
431  }
432  else
433  {
434  JLOG(journal_.trace()) << __func__ << " : "
435  << "Inserting object = " << *key;
436  next->rawInsert(sle);
437  }
438  }
439  }
440  JLOG(journal_.debug())
441  << __func__ << " : "
442  << "Inserted/modified/deleted all objects. Number of objects = "
443  << rawData.ledger_objects().objects_size();
444 
// When the response does not include the skiplist, it is updated locally
// and a warning is logged.
445  if (!rawData.skiplist_included())
446  {
447  next->updateSkipList();
448  JLOG(journal_.warn())
449  << __func__ << " : "
450  << "tx process is not sending skiplist. This indicates that the tx "
451  "process is parsing metadata instead of doing a SHAMap diff. "
452  "Make sure tx process is running the same code as reporting to "
453  "use SHAMap diff instead of parsing metadata";
454  }
455 
456  JLOG(journal_.debug()) << __func__ << " : "
457  << "Finished ledger update. "
458  << detail::toString(next->info());
459  return {std::move(next), std::move(accountTxData)};
460 }
461 
462 // Database must be populated when this starts
// Runs the extract/transform/load pipeline starting at `startSequence` and
// returns the sequence of the last ledger published by the load thread, or
// an empty optional if none was published. Sets writing_ to true for the
// duration of the call. Throws std::runtime_error if the ledger with
// sequence startSequence - 1 cannot be found.
// NOTE(review): several lines are absent from this listing (including
// embedded lines 463, 503, 519, 523-524, 553-555, 568, 579, 606-608, 620,
// 630): the return type, the element types of both queues, the head of the
// extract loop condition, the declarations of `start`, `fetchResponse` and
// `result`, and part of the Postgres write call. Confirm against the
// upstream file before changing this function.
464 ReportingETL::runETLPipeline(uint32_t startSequence)
465 {
466  /*
467  * Behold, mortals! This function spawns three separate threads, which talk
468  * to each other via 2 different thread safe queues and 1 atomic variable.
469  * All threads and queues are function local. This function returns when all
470  * of the threads exit. There are two termination conditions: the first is
471  * if the load thread encounters a write conflict. In this case, the load
472  * thread sets writeConflict, an atomic bool, to true, which signals the
473  * other threads to stop. The second termination condition is when the
474  * entire server is shutting down, which is detected in one of three ways:
475  * 1. isStopping() returns true if the server is shutting down
476  * 2. networkValidatedLedgers_.waitUntilValidatedByNetwork returns
477  * false, signaling the wait was aborted.
478  * 3. fetchLedgerDataAndDiff returns an empty optional, signaling the fetch
479  * was aborted.
480  * In all cases, the extract thread detects this condition,
481  * and pushes an empty optional onto the transform queue. The transform
482  * thread, upon popping an empty optional, pushes an empty optional onto the
483  * load queue, and then returns. The load thread, upon popping an empty
484  * optional, returns.
485  */
486 
487  JLOG(journal_.debug()) << __func__ << " : "
488  << "Starting etl pipeline";
489  writing_ = true;
490 
491  std::shared_ptr<Ledger> parent = std::const_pointer_cast<Ledger>(
492  app_.getLedgerMaster().getLedgerBySeq(startSequence - 1));
493  if (!parent)
494  {
495  assert(false);
496  Throw<std::runtime_error>("runETLPipeline: parent ledger is null");
497  }
498 
499  std::atomic_bool writeConflict = false;
500  std::optional<uint32_t> lastPublishedSequence;
501  constexpr uint32_t maxQueueSize = 1000;
502 
504  transformQueue{maxQueueSize};
505 
// Extract thread: fetches consecutive ledgers starting at startSequence and
// pushes each response onto transformQueue.
506  std::thread extracter{[this,
507  &startSequence,
508  &writeConflict,
509  &transformQueue]() {
510  beast::setCurrentThreadName("rippled: ReportingETL extract");
511  uint32_t currentSequence = startSequence;
512 
513  // there are two stopping conditions here.
514  // First, if there is a write conflict in the load thread, the ETL
515  // mechanism should stop.
516  // The other stopping condition is if the entire server is shutting
517  // down. This can be detected in a variety of ways. See the comment
518  // at the top of the function
520  currentSequence) &&
521  !writeConflict && !isStopping())
522  {
525  fetchLedgerDataAndDiff(currentSequence)};
526  // if the fetch is unsuccessful, stop. fetchLedger only returns
527  // false if the server is shutting down, or if the ledger was
528  // found in the database (which means another process already
529  // wrote the ledger that this process was trying to extract;
530  // this is a form of a write conflict). Otherwise,
531  // fetchLedgerDataAndDiff will keep trying to fetch the
532  // specified ledger until successful
533  if (!fetchResponse)
534  {
535  break;
536  }
537  auto end = std::chrono::system_clock::now();
538 
539  auto time = ((end - start).count()) / 1000000000.0;
540  auto tps =
541  fetchResponse->transactions_list().transactions_size() / time;
542 
543  JLOG(journal_.debug()) << "Extract phase time = " << time
544  << " . Extract phase tps = " << tps;
545 
546  transformQueue.push(std::move(fetchResponse));
547  ++currentSequence;
548  }
549  // empty optional tells the transformer to shut down
550  transformQueue.push({});
551  }};
552 
556  loadQueue{maxQueueSize};
// Transform thread: turns each fetched response into the next ledger,
// building on a copy of the previous one, and pushes the result onto
// loadQueue.
557  std::thread transformer{[this,
558  &parent,
559  &writeConflict,
560  &loadQueue,
561  &transformQueue]() {
562  beast::setCurrentThreadName("rippled: ReportingETL transform");
563 
564  assert(parent);
565  parent = std::make_shared<Ledger>(*parent, NetClock::time_point{});
566  while (!writeConflict)
567  {
569  transformQueue.pop()};
570  // if fetchResponse is an empty optional, the extracter thread has
571  // stopped and the transformer should stop as well
572  if (!fetchResponse)
573  {
574  break;
575  }
// While stopping, keep popping without processing until the empty
// optional arrives.
576  if (isStopping())
577  continue;
578 
580  auto [next, accountTxData] =
581  buildNextLedger(parent, *fetchResponse);
582  auto end = std::chrono::system_clock::now();
583 
584  auto duration = ((end - start).count()) / 1000000000.0;
585  JLOG(journal_.debug()) << "transform time = " << duration;
586  // The below line needs to execute before pushing to the queue, in
587  // order to prevent this thread and the loader thread from accessing
588  // the same SHAMap concurrently
589  parent = std::make_shared<Ledger>(*next, NetClock::time_point{});
590  loadQueue.push(
591  std::make_pair(std::move(next), std::move(accountTxData)));
592  }
593  // empty optional tells the loader to shutdown
594  loadQueue.push({});
595  }};
596 
// Load thread: flushes each ledger to the key-value store, writes it to the
// relational database in reporting builds, and publishes it unless a write
// conflict was detected.
597  std::thread loader{[this,
598  &lastPublishedSequence,
599  &loadQueue,
600  &writeConflict]() {
601  beast::setCurrentThreadName("rippled: ReportingETL load");
602  size_t totalTransactions = 0;
603  double totalTime = 0;
604  while (!writeConflict)
605  {
609  result{loadQueue.pop()};
610  // if result is an empty optional, the transformer thread has
611  // stopped and the loader should stop as well
612  if (!result)
613  break;
614  if (isStopping())
615  continue;
616 
617  auto& ledger = result->first;
618  auto& accountTxData = result->second;
619 
621  // write to the key-value store
622  flushLedger(ledger);
623 
624  auto mid = std::chrono::system_clock::now();
625  // write to RDBMS
626  // if there is a write conflict, some other process has already
627  // written this ledger and has taken over as the ETL writer
628 #ifdef RIPPLED_REPORTING
629  if (!dynamic_cast<PostgresDatabase*>(&app_.getRelationalDatabase())
631  ledger->info(), accountTxData))
632  writeConflict = true;
633 #endif
634  auto end = std::chrono::system_clock::now();
635 
636  if (!writeConflict)
637  {
638  publishLedger(ledger);
639  lastPublishedSequence = ledger->info().seq;
640  }
641  // print some performance numbers
642  auto kvTime = ((mid - start).count()) / 1000000000.0;
643  auto relationalTime = ((end - mid).count()) / 1000000000.0;
644 
645  size_t numTxns = accountTxData.size();
646  totalTime += kvTime;
647  totalTransactions += numTxns;
// NOTE(review): this message is logged even when writeConflict was just
// set, in which case the ledger was not published — confirm whether the
// "Successfully published" wording is intended on that path.
648  JLOG(journal_.info())
649  << "Load phase of etl : "
650  << "Successfully published ledger! Ledger info: "
651  << detail::toString(ledger->info())
652  << ". txn count = " << numTxns
653  << ". key-value write time = " << kvTime
654  << ". relational write time = " << relationalTime
655  << ". key-value tps = " << numTxns / kvTime
656  << ". relational tps = " << numTxns / relationalTime
657  << ". total key-value tps = " << totalTransactions / totalTime;
658  }
659  }};
660 
661  // wait for all of the threads to stop
662  loader.join();
663  extracter.join();
664  transformer.join();
665  writing_ = false;
666 
667  JLOG(journal_.debug()) << __func__ << " : "
668  << "Stopping etl pipeline";
669 
670  return lastPublishedSequence;
671 }
672 
673 // main loop. The software begins monitoring the ledgers that are validated
674 // by the network. The member networkValidatedLedgers_ keeps track of the
675 // sequences of ledgers validated by the network. Whenever a ledger is validated
676 // by the network, the software looks for that ledger in the database. Once the
677 // ledger is found in the database, the software publishes that ledger to the
678 // ledgers stream. If a network validated ledger is not found in the database
679 // after a certain amount of time, then the software attempts to take over
680 // responsibility of the ETL process, where it writes new ledgers to the
681 // database. The software will relinquish control of the ETL process if it
682 // detects that another process has taken over ETL.
// Main loop for a node that is allowed to write. Obtains a starting ledger
// (downloading one if the database is empty), publishes it, and then, for
// each subsequent sequence, tries to publish the ledger from the database;
// if that fails it runs the ETL pipeline itself and resumes from the last
// sequence the pipeline published.
// Throws std::runtime_error if a start sequence is configured but a ledger
// is already present.
// NOTE(review): several lines are absent from this listing (embedded lines
// 684, 687, 700, 708, 756): the signature, the argument of the
// const_pointer_cast, the statement executed when startSequence_ is set, the
// initializer of mostRecentValidated, and the rest of the while condition.
// Presumably this is ReportingETL::monitor(); confirm against the upstream
// file.
683 void
685 {
686  auto ledger = std::const_pointer_cast<Ledger>(
688  if (!ledger)
689  {
690  JLOG(journal_.info()) << __func__ << " : "
691  << "Database is empty. Will download a ledger "
692  "from the network.";
693  if (startSequence_)
694  {
695  JLOG(journal_.info())
696  << __func__ << " : "
697  << "ledger sequence specified in config. "
698  << "Will begin ETL process starting with ledger "
699  << *startSequence_;
701  }
702  else
703  {
704  JLOG(journal_.info())
705  << __func__ << " : "
706  << "Waiting for next ledger to be validated by network...";
707  std::optional<uint32_t> mostRecentValidated =
709  if (mostRecentValidated)
710  {
711  JLOG(journal_.info()) << __func__ << " : "
712  << "Ledger " << *mostRecentValidated
713  << " has been validated. "
714  << "Downloading...";
715  ledger = loadInitialLedger(*mostRecentValidated);
716  }
717  else
718  {
719  JLOG(journal_.info()) << __func__ << " : "
720  << "The wait for the next validated "
721  << "ledger has been aborted. "
722  << "Exiting monitor loop";
723  return;
724  }
725  }
726  }
727  else
728  {
729  if (startSequence_)
730  {
731  Throw<std::runtime_error>(
732  "start sequence specified but db is already populated");
733  }
734  JLOG(journal_.info())
735  << __func__ << " : "
736  << "Database already populated. Picking up from the tip of history";
737  }
// loadInitialLedger returns an empty pointer on failure, so `ledger` can
// still be null here.
738  if (!ledger)
739  {
740  JLOG(journal_.error())
741  << __func__ << " : "
742  << "Failed to load initial ledger. Exiting monitor loop";
743  return;
744  }
745  else
746  {
747  publishLedger(ledger);
748  }
749  uint32_t nextSequence = ledger->info().seq + 1;
750 
751  JLOG(journal_.debug()) << __func__ << " : "
752  << "Database is populated. "
753  << "Starting monitor loop. sequence = "
754  << nextSequence;
755  while (!stopping_ &&
757  {
758  JLOG(journal_.info()) << __func__ << " : "
759  << "Ledger with sequence = " << nextSequence
760  << " has been validated by the network. "
761  << "Attempting to find in database and publish";
762  // Attempt to take over responsibility of ETL writer after 10 failed
763  // attempts to publish the ledger. publishLedger() fails if the
764  // ledger that has been validated by the network is not found in the
765  // database after the specified number of attempts. publishLedger()
766  // waits one second between each attempt to read the ledger from the
767  // database
768  //
769  // In strict read-only mode, when the software fails to find a
770  // ledger in the database that has been validated by the network,
771  // the software will only try to publish subsequent ledgers once,
772  // until one of those ledgers is found in the database. Once the
773  // software successfully publishes a ledger, the software will fall
774  // back to the normal behavior of trying several times to publish
775  // the ledger that has been validated by the network. In this
776  // manner, a reporting processing running in read-only mode does not
777  // need to restart if the database is wiped.
// Despite its name, timeoutSeconds is passed to publishLedger as the
// maximum number of attempts.
778  constexpr size_t timeoutSeconds = 10;
779  bool success = publishLedger(nextSequence, timeoutSeconds);
780  if (!success)
781  {
782  JLOG(journal_.warn())
783  << __func__ << " : "
784  << "Failed to publish ledger with sequence = " << nextSequence
785  << " . Beginning ETL";
786  // runETLPipeline returns the most recent sequence published, or
787  // an empty optional if no sequence was published
788  std::optional<uint32_t> lastPublished =
789  runETLPipeline(nextSequence);
790  JLOG(journal_.info()) << __func__ << " : "
791  << "Aborting ETL. Falling back to publishing";
792  // if no ledger was published, don't increment nextSequence
793  if (lastPublished)
794  nextSequence = *lastPublished + 1;
795  }
796  else
797  {
798  ++nextSequence;
799  }
800  }
801 }
802 
// Publishing loop for strict read-only mode. Starting from an initial
// sequence, repeatedly calls publishLedger and advances to the next sequence
// whether or not the publish succeeded. After a success the next call is
// allowed 30 attempts; after a failure only 1.
// NOTE(review): the signature (embedded line 804), the initializer of
// `mostRecent` (embedded line 808) and the rest of the while condition
// (embedded line 814) are absent from this listing. Presumably this is
// ReportingETL::monitorReadOnly(); confirm against the upstream file.
803 void
805 {
806  JLOG(journal_.debug()) << "Starting reporting in strict read only mode";
807  std::optional<uint32_t> mostRecent =
// Nothing to publish if no starting sequence could be obtained.
809  if (!mostRecent)
810  return;
811  uint32_t sequence = *mostRecent;
812  bool success = true;
813  while (!stopping_ &&
815  {
816  success = publishLedger(sequence, success ? 30 : 1);
817  ++sequence;
818  }
819 }
820 
// Starts the worker thread, which runs monitorReadOnly() when readOnly_ is
// set and monitor() otherwise. The thread handle is stored in worker_.
// NOTE(review): the signature (embedded line 822) is absent from this
// listing — presumably ReportingETL::doWork(); confirm against the upstream
// file.
821 void
823 {
// The lambda captures `this`, so this object must outlive the worker
// thread.
824  worker_ = std::thread([this]() {
825  beast::setCurrentThreadName("rippled: ReportingETL worker");
826  if (readOnly_)
827  monitorReadOnly();
828  else
829  monitor();
830  });
831 }
832 
// Constructor. Initializes the members and, when the config has a
// "reporting" section, reads it: registers the ETL sources with the load
// balancer, determines read-only mode, and parses the optional start
// sequence, flush interval and marker count.
// Throws std::runtime_error if reporting is configured but the binary was
// built without RIPPLED_REPORTING, if tx tables are disabled, or if an
// integer setting cannot be parsed.
// NOTE(review): several lines are absent from this listing (embedded lines
// 833, 885, 895, 922-923, 935, 944): the signature, the assignment that
// follows the "--reportingReadOnly" comment, and the destination/source
// arguments of three asciiToIntThrows calls. Confirm against the upstream
// file before changing this function.
834  : app_(app)
835  , journal_(app.journal("ReportingETL"))
836  , publishStrand_(app_.getIOService())
837  , loadBalancer_(*this)
838 {
839  // if present, get endpoint from config
840  if (app_.config().exists("reporting"))
841  {
842 #ifndef RIPPLED_REPORTING
843  Throw<std::runtime_error>(
844  "Config file specifies reporting, but software was not built with "
845  "-Dreporting=1. To use reporting, configure CMake with "
846  "-Dreporting=1");
847 #endif
848  if (!app_.config().useTxTables())
849  Throw<std::runtime_error>(
850  "Reporting requires tx tables. Set use_tx_tables=1 in config "
851  "file, under [ledger_tx_tables] section");
852  Section section = app_.config().section("reporting");
853 
854  JLOG(journal_.debug()) << "Parsing config info";
855 
// Each value in the "reporting" section is used as the name of another
// config section describing one ETL source. Sources lacking an IP or a
// websocket port are skipped.
856  auto& vals = section.values();
857  for (auto& v : vals)
858  {
859  JLOG(journal_.debug()) << "val is " << v;
860  Section source = app_.config().section(v);
861 
862  auto optIp = source.get("source_ip");
863  if (!optIp)
864  continue;
865 
866  auto optWsPort = source.get("source_ws_port");
867  if (!optWsPort)
868  continue;
869 
870  auto optGrpcPort = source.get("source_grpc_port");
871  if (!optGrpcPort)
872  {
873  // add source without grpc port
874  // used in read-only mode to detect when new ledgers have
875  // been validated. Used for publishing
876  if (app_.config().reportingReadOnly())
877  loadBalancer_.add(*optIp, *optWsPort);
878  continue;
879  }
880 
881  loadBalancer_.add(*optIp, *optWsPort, *optGrpcPort);
882  }
883 
884  // this is true iff --reportingReadOnly was passed via command line
886 
887  // if --reportingReadOnly was not passed via command line, check config
888  // file. Command line takes precedence
889  if (!readOnly_)
890  {
891  auto const optRO = section.get("read_only");
892  if (optRO)
893  {
// Only the exact strings "true" and "1" enable read-only mode.
894  readOnly_ = (*optRO == "true" || *optRO == "1");
896  }
897  }
898 
899  // lambda throws a useful message if string to integer conversion fails
900  auto asciiToIntThrows =
901  [](auto& dest, std::string const& src, char const* onError) {
902  char const* const srcEnd = src.data() + src.size();
903  auto [ptr, err] = std::from_chars(src.data(), srcEnd, dest);
904 
905  if (err == std::errc())
906  // skip whitespace at end of string
907  while (ptr != srcEnd &&
908  std::isspace(static_cast<unsigned char>(*ptr)))
909  ++ptr;
910 
911  // throw if
912  // o conversion error or
913  // o entire string is not consumed
914  if (err != std::errc() || ptr != srcEnd)
915  Throw<std::runtime_error>(onError + src);
916  };
917 
918  // handle command line arguments
919  if (app_.config().START_UP == Config::StartUpType::FRESH && !readOnly_)
920  {
921  asciiToIntThrows(
924  "Expected integral START_LEDGER command line argument. Got: ");
925  }
926  // if not passed via command line, check config for start sequence
927  if (!startSequence_)
928  {
929  auto const optStartSeq = section.get("start_sequence");
930  if (optStartSeq)
931  {
932  // set a value so we can dereference
933  startSequence_ = 0;
934  asciiToIntThrows(
936  *optStartSeq,
937  "Expected integral start_sequence config entry. Got: ");
938  }
939  }
940 
941  auto const optFlushInterval = section.get("flush_interval");
942  if (optFlushInterval)
943  asciiToIntThrows(
945  *optFlushInterval,
946  "Expected integral flush_interval config entry. Got: ");
947 
948  auto const optNumMarkers = section.get("num_markers");
949  if (optNumMarkers)
950  asciiToIntThrows(
951  numMarkers_,
952  *optNumMarkers,
953  "Expected integral num_markers config entry. Got: ");
954  }
955 }
956 
957 } // namespace ripple
ripple::NetworkOPs::pubLedger
virtual void pubLedger(std::shared_ptr< ReadView const > const &lpAccepted)=0
beast::Journal::fatal
Stream fatal() const
Definition: Journal.h:339
ripple::ReportingETL::flushInterval_
size_t flushInterval_
Used to determine when to write to the database during the initial ledger download.
Definition: ReportingETL.h:115
ripple::Section
Holds a collection of configuration values.
Definition: BasicConfig.h:42
ripple::Application
Definition: Application.h:115
ripple::Application::getNodeFamily
virtual Family & getNodeFamily()=0
ripple::HashPrefix::ledgerMaster
@ ledgerMaster
ledger master data for signing
std::this_thread::sleep_for
T sleep_for(T... args)
ripple::makeSlice
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
Definition: Slice.h:241
ripple::ReportingETL::fetchLedgerData
std::optional< org::xrpl::rpc::v1::GetLedgerResponse > fetchLedgerData(uint32_t sequence)
Extract data for a particular ledger from an ETL source.
Definition: ReportingETL.cpp:350
ripple::ReportingETL::loadInitialLedger
std::shared_ptr< Ledger > loadInitialLedger(uint32_t sequence)
Download a ledger with specified sequence in full, via GetLedgerData, and write the data to the datab...
Definition: ReportingETL.cpp:107
ripple::ReportingETL::startSequence_
std::optional< uint32_t > startSequence_
Ledger sequence to start ETL from.
Definition: ReportingETL.h:139
std::string
STL class.
std::shared_ptr< Ledger >
ripple::LedgerInfo::parentHash
uint256 parentHash
Definition: ReadView.h:104
ripple::ThreadSafeQueue
Generic thread-safe queue with an optional maximum size Note, we can't use a lockfree queue here,...
Definition: ETLHelpers.h:115
beast::Journal::trace
Stream trace() const
Severity stream access functions.
Definition: Journal.h:309
ripple::Serializer::modData
Blob & modData()
Definition: Serializer.h:178
std::pair
ripple::ReportingETL::setLastPublish
void setLastPublish()
Definition: ReportingETL.h:155
ripple::LedgerInfo::hash
uint256 hash
Definition: ReadView.h:101
ripple::ReportingETL::insertTransactions
std::vector< AccountTransactionsData > insertTransactions(std::shared_ptr< Ledger > &ledger, org::xrpl::rpc::v1::GetLedgerResponse &data)
Insert all of the extracted transactions into the ledger.
Definition: ReportingETL.cpp:76
ripple::ThreadSafeQueue::push
void push(T const &elt)
Definition: ETLHelpers.h:136
ripple::addRaw
void addRaw(LedgerInfo const &info, Serializer &s, bool includeHash)
Definition: View.cpp:162
charconv
ripple::ReportingETL::flushLedger
void flushLedger(std::shared_ptr< Ledger > &ledger)
Write all new data to the key-value store.
Definition: ReportingETL.cpp:182
ripple::hotACCOUNT_NODE
@ hotACCOUNT_NODE
Definition: NodeObject.h:35
std::vector
STL class.
ripple::ReportingETL::loadBalancer_
ETLLoadBalancer loadBalancer_
Mechanism for communicating with ETL sources.
Definition: ReportingETL.h:96
std::chrono::seconds
ripple::ETLLoadBalancer::fetchLedger
std::optional< org::xrpl::rpc::v1::GetLedgerResponse > fetchLedger(uint32_t ledgerSequence, bool getObjects)
Fetch data for a specific ledger.
Definition: ETLSource.cpp:699
ripple::NetworkValidatedLedgers::waitUntilValidatedByNetwork
bool waitUntilValidatedByNetwork(uint32_t sequence)
Waits for the sequence to be validated by the network.
Definition: ETLHelpers.h:89
ripple::NodeStore::Database::sync
virtual void sync()=0
std::stringstream
STL class.
beast::Journal::warn
Stream warn() const
Definition: Journal.h:327
ripple::NodeStore::Database::store
virtual void store(NodeObjectType type, Blob &&data, uint256 const &hash, std::uint32_t ledgerSeq)=0
Store the object.
ripple::LedgerInfo::seq
LedgerIndex seq
Definition: ReadView.h:93
ripple::hotTRANSACTION_NODE
@ hotTRANSACTION_NODE
Definition: NodeObject.h:36
ripple::base_uint< 256 >::fromVoidChecked
static std::optional< base_uint > fromVoidChecked(T const &from)
Definition: base_uint.h:318
iostream
ripple::LedgerInfo::txHash
uint256 txHash
Definition: ReadView.h:102
ripple::NetworkValidatedLedgers::getMostRecent
std::optional< uint32_t > getMostRecent() const
Get most recently validated sequence.
Definition: ETLHelpers.h:67
ripple::Application::getOPs
virtual NetworkOPs & getOPs()=0
ripple::Section::values
std::vector< std::string > const & values() const
Returns all the values in the section.
Definition: BasicConfig.h:77
ripple::ReportingETL::networkValidatedLedgers_
NetworkValidatedLedgers networkValidatedLedgers_
Mechanism for detecting when the network has validated a new ledger.
Definition: ReportingETL.h:100
ripple::TxMeta
Definition: TxMeta.h:32
ripple::base_uint< 256 >
ripple::ReportingETL::publishLedger
bool publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts=10)
Attempt to read the specified ledger from the database, and then publish that ledger to the ledgers s...
Definition: ReportingETL.cpp:283
ripple::ReportingETL::journal_
beast::Journal journal_
Definition: ReportingETL.h:75
ripple::Config::reporting
bool reporting() const
Definition: Config.h:317
std::thread
STL class.
ripple::Application::getLedgerMaster
virtual LedgerMaster & getLedgerMaster()=0
ripple::ReportingETL::writing_
std::atomic_bool writing_
Whether the process is writing to the database. Used by server_info.
Definition: ReportingETL.h:134
ripple::ReportingETL::readOnly_
bool readOnly_
Whether the process is in strict read-only mode.
Definition: ReportingETL.h:131
ripple::ReportingETL::numMarkers_
size_t numMarkers_
This variable controls the number of GetLedgerData calls that will be executed in parallel during the...
Definition: ReportingETL.h:126
ripple::ETLLoadBalancer::loadInitialLedger
void loadInitialLedger(uint32_t sequence, ThreadSafeQueue< std::shared_ptr< SLE >> &writeQueue)
Load the initial ledger, writing data to the queue.
Definition: ETLSource.cpp:680
ripple::Application::config
virtual Config & config()=0
ripple::Application::getRelationalDatabase
virtual RelationalDatabase & getRelationalDatabase()=0
ripple::Config::useTxTables
bool useTxTables() const
Definition: Config.h:323
ripple::ReportingETL::monitorReadOnly
void monitorReadOnly()
Monitor the database for newly written ledgers.
Definition: ReportingETL.cpp:804
ripple::detail::toString
std::string toString(LedgerInfo const &info)
Convenience function for printing out basic ledger info.
Definition: ReportingETL.cpp:42
beast::Journal::error
Stream error() const
Definition: Journal.h:333
beast::Journal::info
Stream info() const
Definition: Journal.h:321
std::chrono::time_point
ripple::ReportingETL::publishStrand_
boost::asio::io_context::strand publishStrand_
Strand to ensure that ledgers are published in order.
Definition: ReportingETL.h:91
ripple::STTx
Definition: STTx.h:45
ripple::LedgerMaster::getLedgerBySeq
std::shared_ptr< Ledger const > getLedgerBySeq(std::uint32_t index)
Definition: LedgerMaster.cpp:1814
std::errc
std::runtime_error
STL class.
ripple::SerialIter
Definition: Serializer.h:310
ripple::Config::START_UP
StartUpType START_UP
Definition: Config.h:134
std::atomic_bool
std::from_chars
T from_chars(T... args)
ripple::ReportingETL::fetchLedgerDataAndDiff
std::optional< org::xrpl::rpc::v1::GetLedgerResponse > fetchLedgerDataAndDiff(uint32_t sequence)
Extract data for a particular ledger from an ETL source.
Definition: ReportingETL.cpp:364
ripple::Serializer
Definition: Serializer.h:39
ripple::Config::setReportingReadOnly
void setReportingReadOnly(bool b)
Definition: Config.h:335
beast::setCurrentThreadName
void setCurrentThreadName(std::string_view name)
Changes the name of the caller thread.
Definition: CurrentThreadName.cpp:119
std::vector::emplace_back
T emplace_back(T... args)
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::ReportingETL::stopping_
std::atomic_bool stopping_
Whether the software is stopping.
Definition: ReportingETL.h:103
ripple::Application::getNodeStore
virtual NodeStore::Database & getNodeStore()=0
ripple::deserializeHeader
LedgerInfo deserializeHeader(Slice data, bool hasHash)
Deserialize a ledger header from a byte array.
Definition: InboundLedger.cpp:272
cstdlib
ripple::PostgresDatabase::writeLedgerAndTransactions
virtual bool writeLedgerAndTransactions(LedgerInfo const &info, std::vector< AccountTransactionsData > const &accountTxData)=0
writeLedgerAndTransactions: Writes new ledger and transaction data into the database.
ripple::PostgresDatabase
Definition: PostgresDatabase.h:27
ripple::LedgerMaster::getValidatedLedger
std::shared_ptr< Ledger const > getValidatedLedger()
Definition: LedgerMaster.cpp:1660
ripple::Section::get
std::optional< T > get(std::string const &name) const
Definition: BasicConfig.h:138
cctype
ripple::ReportingETL::app_
Application & app_
Definition: ReportingETL.h:73
std::optional
std::stringstream::str
T str(T... args)
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
ripple::ReportingETL::start
void start()
Start all of the necessary components and begin ETL.
Definition: ReportingETL.h:326
ripple::ReportingETL::worker_
std::thread worker_
Definition: ReportingETL.h:77
ripple::hotLEDGER
@ hotLEDGER
Definition: NodeObject.h:34
std::make_pair
T make_pair(T... args)
ripple::Serializer::add32
int add32(std::uint32_t i)
Definition: Serializer.cpp:38
ripple::LedgerInfo
Information about the notional ledger backing the view.
Definition: ReadView.h:85
ripple::strHex
std::string strHex(FwdIt begin, FwdIt end)
Definition: strHex.h:30
ripple::Config::reportingReadOnly
bool reportingReadOnly() const
Definition: Config.h:329
ripple::ReportingETL::buildNextLedger
std::pair< std::shared_ptr< Ledger >, std::vector< AccountTransactionsData > > buildNextLedger(std::shared_ptr< Ledger > &parent, org::xrpl::rpc::v1::GetLedgerResponse &rawData)
Build the next ledger using the previous ledger and the extracted data.
Definition: ReportingETL.cpp:378
ripple::ReportingETL::monitor
void monitor()
Monitor the network for newly validated ledgers.
Definition: ReportingETL.cpp:684
ripple::ReportingETL::ReportingETL
ReportingETL(Application &app)
Definition: ReportingETL.cpp:833
ripple::Config::START_LEDGER
std::string START_LEDGER
Definition: Config.h:138
ripple::ReportingETL::isStopping
bool isStopping() const
Definition: ReportingETL.h:282
ripple::ReportingETL::consumeLedgerData
void consumeLedgerData(std::shared_ptr< Ledger > &ledger, ThreadSafeQueue< std::shared_ptr< SLE >> &writeQueue)
Consume data from a queue and insert that data into the ledger. This function will continue to pull fr...
Definition: ReportingETL.cpp:54
ripple::ReportingETL::doWork
void doWork()
Definition: ReportingETL.cpp:822
ripple::LedgerInfo::accountHash
uint256 accountHash
Definition: ReadView.h:103
ripple::ReportingETL::runETLPipeline
std::optional< uint32_t > runETLPipeline(uint32_t startSequence)
Run ETL.
Definition: ReportingETL.cpp:464
ripple::BasicConfig::exists
bool exists(std::string const &name) const
Returns true if a section with the given name exists.
Definition: BasicConfig.cpp:121
ripple::BasicConfig::section
Section & section(std::string const &name)
Returns the section with the given name.
Definition: BasicConfig.cpp:127
variant
string
ripple::ETLLoadBalancer::add
void add(std::string &host, std::string &websocketPort, std::string &grpcPort)
Add an ETL source.
Definition: ETLSource.cpp:657
std::chrono::system_clock::now
T now(T... args)