From cb896142f7dfe4134003e504fa8e6cc700531e01 Mon Sep 17 00:00:00 2001 From: intelliot Date: Thu, 7 Sep 2023 18:48:39 +0000 Subject: [PATCH] deploy: 36cb5f90e233f975eb3f80d819b2fbadab0a9387 --- BasicApp_8cpp_source.html | 6 +- CurrentThreadName_8cpp_source.html | 213 -- DatabaseNodeImp_8cpp_source.html | 2 +- DatabaseNodeImp_8h_source.html | 4 +- DatabaseRotatingImp_8cpp_source.html | 2 +- DatabaseRotatingImp_8h_source.html | 2 +- DatabaseShardImp_8cpp_source.html | 8 +- DatabaseShard__test_8cpp_source.html | 2 +- Database_8cpp_source.html | 709 +++-- Database_8h_source.html | 16 +- ETLSource_8cpp_source.html | 1975 +++++++------- ETLSource_8h_source.html | 46 +- GRPCServer_8cpp_source.html | 57 +- GRPCServer_8h_source.html | 4 +- GetCounts_8cpp_source.html | 2 +- InboundLedger_8cpp_source.html | 2 +- Job_8cpp_source.html | 6 +- LedgerCleaner_8cpp_source.html | 6 +- LedgerMaster_8cpp_source.html | 2 +- LoadManager_8cpp_source.html | 8 +- Main_8cpp_source.html | 929 ++++--- NetworkOPs_8cpp_source.html | 2 +- P2pProxy_8cpp_source.html | 4 +- PerfLogImp_8cpp_source.html | 8 +- ReportingETL_8cpp_source.html | 1911 +++++++------ ReportingETL_8h_source.html | 30 +- ResourceManager_8cpp_source.html | 8 +- RocksDBFactory_8cpp_source.html | 10 +- SHAMapStoreImp_8cpp_source.html | 8 +- SHAMapStoreImp_8h_source.html | 2 +- SHAMap_8cpp_source.html | 4 +- SNTPClock_8cpp_source.html | 8 +- TestBase_8h_source.html | 2 +- ....html => ThreadName__test_8cpp_source.html | 154 +- ThreadUtilities_8cpp_source.html | 234 ++ ...rce.html => ThreadUtilities_8h_source.html | 96 +- Tx_8cpp_source.html | 2 +- Workers_8cpp_source.html | 8 +- annotated.html | 282 +- classes.html | 419 ++- classripple_1_1AsyncCallData.html | 26 +- classripple_1_1ETLLoadBalancer.html | 20 +- classripple_1_1ETLSource.html | 30 +- classripple_1_1GRPCServer.html | 4 +- classripple_1_1NodeStore_1_1Database.html | 18 +- ...ipple_1_1NodeStore_1_1DatabaseNodeImp.html | 14 +- ...pple_1_1NodeStore_1_1DatabaseRotating.html | 16 +- ...e_1_1NodeStore_1_1DatabaseRotatingImp.html | 16 +- ...sripple_1_1NodeStore_1_1DatabaseShard.html | 16 +- ...pple_1_1NodeStore_1_1DatabaseShardImp.html | 14 +- classripple_1_1ReportingETL.html | 28 +- ..._1CurrentThreadName__test__coll__graph.map | 4 - ..._1CurrentThreadName__test__coll__graph.md5 | 1 - ..._1CurrentThreadName__test__coll__graph.png | Bin 3812 -> 0 bytes ...urrentThreadName__test__inherit__graph.map | 4 - ...urrentThreadName__test__inherit__graph.md5 | 1 - ...urrentThreadName__test__inherit__graph.png | Bin 3812 -> 0 bytes ...e_1_1test_1_1ThreadName__test-members.html | 10 +- ...assripple_1_1test_1_1ThreadName__test.html | 54 +- ...1test_1_1ThreadName__test__coll__graph.map | 4 + ...1test_1_1ThreadName__test__coll__graph.md5 | 1 + ...1test_1_1ThreadName__test__coll__graph.png | Bin 0 -> 3310 bytes ...st_1_1ThreadName__test__inherit__graph.map | 4 + ...st_1_1ThreadName__test__inherit__graph.md5 | 1 + ...st_1_1ThreadName__test__inherit__graph.png | Bin 0 -> 3310 bytes dir_318145072d948161b6b9385de457c361.html | 2 - dir_354330c8aa73f5a2ebe2c490e5cf38ef.html | 2 + dir_5f536ab1985a8dcd28bcfd618bc58e92.html | 2 + dir_6f921693cf6aba7edd915fbfc33f741e.html | 2 + dir_e5649611bbe5c9ce048f27b7e2080ec6.html | 4 - functions_e.html | 2 +- functions_func_e.html | 2 +- functions_func_r.html | 4 +- functions_func_v.html | 8 +- functions_r.html | 2 +- functions_v.html | 16 +- functions_w.html | 2 +- hierarchy.html | 300 +-- inherit_graph_522.md5 | 2 +- inherit_graph_6.map | 240 +- inherit_graph_6.md5 | 2 +- inherit_graph_6.png | Bin 1941170 -> 1942787 bytes inherit_graph_915.md5 | 2 +- inherits.html | 240 +- namespacebeast.html | 52 - namespacebeast_1_1detail.html | 18 - namespacemembers_b.html | 2 +- namespacemembers_func_b.html | 6 +- namespacemembers_func_g.html | 15 +- namespacemembers_func_s.html | 6 +- namespacemembers_g.html | 17 +- namespacemembers_o.html | 2 +- namespacemembers_s.html | 12 +- namespacemembers_t.html | 23 +- namespacemembers_vars_t.html | 3 - namespaceripple.html | 47 + namespaceripple_1_1detail.html | 2 +- namespaceripple_1_1test.html | 18 +- namespaceripple_1_1this__thread.html | 126 + namespaces.html | 11 +- search/all_10.js | 1240 ++++----- search/all_11.js | 98 +- search/all_12.js | 1343 +++++----- search/all_13.js | 102 +- search/all_14.js | 6 +- search/all_15.js | 14 +- search/all_16.js | 8 +- search/all_17.js | 6 +- search/all_1b.js | 2 +- search/all_2.js | 2 +- search/all_3.js | 67 +- search/all_4.js | 1164 ++++---- search/all_5.js | 602 ++--- search/all_6.js | 994 +++---- search/all_7.js | 1138 ++++---- search/all_8.js | 392 +-- search/all_9.js | 1624 +++++------ search/all_a.js | 228 +- search/all_b.js | 112 +- search/all_c.js | 962 +++---- search/all_d.js | 1974 +++++++------- search/all_e.js | 662 ++--- search/all_f.js | 694 ++--- search/classes_10.js | 268 +- search/classes_11.js | 24 +- search/classes_12.js | 258 +- search/classes_13.js | 674 ++--- search/classes_14.js | 127 +- search/classes_15.js | 12 +- search/classes_16.js | 4 +- search/classes_17.js | 4 +- search/classes_3.js | 5 +- search/classes_4.js | 202 +- search/classes_5.js | 176 +- search/classes_6.js | 156 +- search/classes_7.js | 56 +- search/classes_8.js | 80 +- search/classes_9.js | 410 +-- search/classes_a.js | 46 +- search/classes_b.js | 32 +- search/classes_c.js | 218 +- search/classes_d.js | 274 +- search/classes_e.js | 172 +- search/classes_f.js | 184 +- search/enumvalues_12.js | 2 +- search/enumvalues_5.js | 2 +- search/enumvalues_e.js | 2 +- search/files_0.js | 8 +- search/files_1.js | 4 +- search/files_10.js | 8 +- search/files_2.js | 68 +- search/files_3.js | 2 +- search/files_4.js | 4 +- search/files_5.js | 10 +- search/files_6.js | 14 +- search/files_7.js | 6 +- search/files_8.js | 8 +- search/files_9.js | 4 +- search/files_a.js | 4 +- search/files_b.js | 2 +- search/files_c.js | 8 +- search/files_d.js | 26 +- search/files_e.js | 10 +- search/files_f.js | 6 +- search/functions_0.js | 2 +- search/functions_1.js | 808 +++--- search/functions_10.js | 744 +++--- search/functions_11.js | 50 +- search/functions_12.js | 612 ++--- search/functions_13.js | 1390 +++++----- search/functions_14.js | 2376 ++++++++--------- search/functions_15.js | 242 +- search/functions_16.js | 176 +- search/functions_17.js | 304 +-- search/functions_18.js | 82 +- search/functions_19.js | 2 +- search/functions_1a.js | 10 +- search/functions_1b.js | 720 ++--- search/functions_2.js | 306 +-- search/functions_3.js | 944 +++---- search/functions_4.js | 756 +++--- search/functions_5.js | 330 +-- search/functions_6.js | 544 ++-- search/functions_7.js | 1228 ++++----- search/functions_8.js | 186 +- search/functions_9.js | 872 +++--- search/functions_a.js | 50 +- search/functions_b.js | 34 +- search/functions_c.js | 368 +-- search/functions_d.js | 630 ++--- search/functions_e.js | 324 +-- search/functions_f.js | 436 +-- search/namespaces_3.js | 11 +- search/namespaces_4.js | 24 +- search/variables_0.js | 374 +-- search/variables_1.js | 162 +- search/variables_10.js | 42 +- search/variables_11.js | 330 +-- search/variables_12.js | 968 +++---- search/variables_13.js | 173 +- search/variables_16.js | 2 +- search/variables_2.js | 344 +-- search/variables_3.js | 236 +- search/variables_4.js | 122 +- search/variables_5.js | 372 +-- search/variables_6.js | 58 +- search/variables_7.js | 106 +- search/variables_8.js | 398 +-- search/variables_9.js | 50 +- search/variables_a.js | 26 +- search/variables_b.js | 296 +- search/variables_c.js | 1132 ++++---- search/variables_d.js | 202 +- search/variables_e.js | 100 +- search/variables_f.js | 312 +-- short__read__test_8cpp_source.html | 8 +- 216 files changed, 23089 insertions(+), 22972 deletions(-) delete mode 100644 CurrentThreadName_8cpp_source.html rename beast__CurrentThreadName__test_8cpp_source.html => ThreadName__test_8cpp_source.html (59%) create mode 100644 ThreadUtilities_8cpp_source.html rename CurrentThreadName_8h_source.html => ThreadUtilities_8h_source.html (54%) delete mode 100644 classripple_1_1test_1_1CurrentThreadName__test__coll__graph.map delete mode 100644 classripple_1_1test_1_1CurrentThreadName__test__coll__graph.md5 delete mode 100644 classripple_1_1test_1_1CurrentThreadName__test__coll__graph.png delete mode 100644 classripple_1_1test_1_1CurrentThreadName__test__inherit__graph.map delete mode 100644 classripple_1_1test_1_1CurrentThreadName__test__inherit__graph.md5 delete mode 100644 classripple_1_1test_1_1CurrentThreadName__test__inherit__graph.png rename classripple_1_1test_1_1CurrentThreadName__test-members.html => classripple_1_1test_1_1ThreadName__test-members.html (76%) rename classripple_1_1test_1_1CurrentThreadName__test.html => classripple_1_1test_1_1ThreadName__test.html (63%) create mode 100644 classripple_1_1test_1_1ThreadName__test__coll__graph.map create mode 100644 classripple_1_1test_1_1ThreadName__test__coll__graph.md5 create mode 100644 classripple_1_1test_1_1ThreadName__test__coll__graph.png create mode 100644 classripple_1_1test_1_1ThreadName__test__inherit__graph.map create mode 100644 classripple_1_1test_1_1ThreadName__test__inherit__graph.md5 create mode 100644 classripple_1_1test_1_1ThreadName__test__inherit__graph.png create mode 100644 namespaceripple_1_1this__thread.html diff --git a/BasicApp_8cpp_source.html b/BasicApp_8cpp_source.html index ec315a5fae..b4868ba781 100644 --- a/BasicApp_8cpp_source.html +++ b/BasicApp_8cpp_source.html @@ -89,7 +89,7 @@ $(function() {
18 //==============================================================================
19 
20 #include <ripple/app/main/BasicApp.h>
-
21 #include <ripple/beast/core/CurrentThreadName.h>
+
21 #include <ripple/basics/ThreadUtilities.h>
22 
23 BasicApp::BasicApp(std::size_t numberOfThreads)
24 {
@@ -99,7 +99,7 @@ $(function() {
28  while (numberOfThreads--)
29  {
30  threads_.emplace_back([this, numberOfThreads]() {
-
31  beast::setCurrentThreadName(
+
31  ripple::this_thread::set_name(
32  "io svc #" + std::to_string(numberOfThreads));
33  this->io_service_.run();
34  });
@@ -118,12 +118,12 @@ $(function() {
BasicApp(std::size_t numberOfThreads)
Definition: BasicApp.cpp:23
~BasicApp()
Definition: BasicApp.cpp:38
T reserve(T... args)
+
void set_name(std::string s)
std::vector< std::thread > threads_
Definition: BasicApp.h:33
T emplace(T... args)
T reset(T... args)
boost::asio::io_service io_service_
Definition: BasicApp.h:34
T to_string(T... args)
-
void setCurrentThreadName(std::string_view name)
Changes the name of the caller thread.
T emplace_back(T... args)
diff --git a/CurrentThreadName_8cpp_source.html b/CurrentThreadName_8cpp_source.html deleted file mode 100644 index 2a49c9c46b..0000000000 --- a/CurrentThreadName_8cpp_source.html +++ /dev/null @@ -1,213 +0,0 @@ - - - - - - - -rippled: CurrentThreadName.cpp Source File - - - - - - - - - -
-
- - - - - - -
-
rippled -
-
-
- - - - - - - - -
-
- - -
- -
- - -
-
-
-
CurrentThreadName.cpp
-
-
-
1 //------------------------------------------------------------------------------
-
2 /*
-
3  This file is part of Beast: https://github.com/vinniefalco/Beast
-
4  Copyright 2013, Vinnie Falco <vinnie.falco@gmail.com>
-
5 
-
6  Portions of this file are from JUCE.
-
7  Copyright (c) 2013 - Raw Material Software Ltd.
-
8  Please visit http://www.juce.com
-
9 
-
10  Permission to use, copy, modify, and/or distribute this software for any
-
11  purpose with or without fee is hereby granted, provided that the above
-
12  copyright notice and this permission notice appear in all copies.
-
13 
-
14  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
-
15  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
-
16  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
-
17  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
-
18  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
-
19  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
-
20  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-
21 */
-
22 //==============================================================================
-
23 
-
24 #include <ripple/beast/core/CurrentThreadName.h>
-
25 #include <boost/predef.h>
-
26 
-
27 //------------------------------------------------------------------------------
-
28 
-
29 #if BOOST_OS_WINDOWS
-
30 #include <process.h>
-
31 #include <windows.h>
-
32 
-
33 namespace beast::detail {
-
34 
-
35 inline void
-
36 setCurrentThreadNameImpl(std::string_view name)
-
37 {
-
38 #if DEBUG && BOOST_COMP_MSVC
-
39  // This technique is documented by Microsoft and works for all versions
-
40  // of Windows and Visual Studio provided that the process is being run
-
41  // under the Visual Studio debugger. For more details, see:
-
42  // https://docs.microsoft.com/en-us/visualstudio/debugger/how-to-set-a-thread-name-in-native-code
-
43 
-
44 #pragma pack(push, 8)
-
45  struct THREADNAME_INFO
-
46  {
-
47  DWORD dwType;
-
48  LPCSTR szName;
-
49  DWORD dwThreadID;
-
50  DWORD dwFlags;
-
51  };
-
52 #pragma pack(pop)
-
53 
-
54  THREADNAME_INFO ni;
-
55 
-
56  ni.dwType = 0x1000;
-
57  ni.szName = name.data();
-
58  ni.dwThreadID = GetCurrentThreadId();
-
59  ni.dwFlags = 0;
-
60 
-
61 #pragma warning(push)
-
62 #pragma warning(disable : 6320 6322)
-
63  __try
-
64  {
-
65  RaiseException(
-
66  0x406d1388, 0, sizeof(ni) / sizeof(ULONG_PTR), (ULONG_PTR*)&ni);
-
67  }
-
68  __except (EXCEPTION_CONTINUE_EXECUTION)
-
69  {
-
70  }
-
71 #pragma warning(pop)
-
72 #endif
-
73 }
-
74 
-
75 } // namespace beast::detail
-
76 #endif // BOOST_OS_WINDOWS
-
77 
-
78 #if BOOST_OS_MACOS
-
79 #include <pthread.h>
-
80 
-
81 namespace beast::detail {
-
82 
-
83 inline void
-
84 setCurrentThreadNameImpl(std::string_view name)
-
85 {
-
86  pthread_setname_np(name.data());
-
87 }
-
88 
-
89 } // namespace beast::detail
-
90 #endif // BOOST_OS_MACOS
-
91 
-
92 #if BOOST_OS_LINUX
-
93 #include <pthread.h>
-
94 
-
95 namespace beast::detail {
-
96 
-
97 inline void
-
98 setCurrentThreadNameImpl(std::string_view name)
-
99 {
-
100  pthread_setname_np(pthread_self(), name.data());
-
101 }
-
102 
-
103 } // namespace beast::detail
-
104 #endif // BOOST_OS_LINUX
-
105 
-
106 namespace beast {
-
107 
-
108 namespace detail {
-
109 thread_local std::string threadName;
-
110 } // namespace detail
-
111 
- - -
114 {
-
115  return detail::threadName;
-
116 }
-
117 
-
118 void
- -
120 {
-
121  detail::threadName = name;
-
122  detail::setCurrentThreadNameImpl(name);
-
123 }
-
124 
-
125 } // namespace beast
-
-
STL class.
-
STL class.
-
thread_local std::string threadName
-
-
std::string getCurrentThreadName()
Returns the name of the caller thread.
-
void setCurrentThreadName(std::string_view name)
Changes the name of the caller thread.
-
T data(T... args)
-
Definition: base_uint.h:641
- - - - diff --git a/DatabaseNodeImp_8cpp_source.html b/DatabaseNodeImp_8cpp_source.html index 6b7d249898..100f0277a9 100644 --- a/DatabaseNodeImp_8cpp_source.html +++ b/DatabaseNodeImp_8cpp_source.html @@ -299,7 +299,7 @@ $(function() {
T size(T... args)
NodeObjectType
The types of node objects.
Definition: NodeObject.h:32
static std::shared_ptr< NodeObject > createObject(NodeObjectType type, Blob &&data, uint256 const &hash)
Create an object from fields.
Definition: NodeObject.cpp:37
-
virtual void asyncFetch(uint256 const &hash, std::uint32_t ledgerSeq, std::function< void(std::shared_ptr< NodeObject > const &)> &&callback)
Fetch an object without waiting.
Definition: Database.cpp:198
+
virtual void asyncFetch(uint256 const &hash, std::uint32_t ledgerSeq, std::function< void(std::shared_ptr< NodeObject > const &)> &&callback)
Fetch an object without waiting.
Definition: Database.cpp:197
Stream warn() const
Definition: Journal.h:327
Contains information about a fetch operation.
diff --git a/DatabaseNodeImp_8h_source.html b/DatabaseNodeImp_8h_source.html index cb0b16af87..de977df0af 100644 --- a/DatabaseNodeImp_8h_source.html +++ b/DatabaseNodeImp_8h_source.html @@ -253,7 +253,7 @@ $(function() {
T value_or(T... args)
-
virtual void stop()
Definition: Database.cpp:165
+
virtual void stop()
Definition: Database.cpp:164
bool exists(std::string const &name) const
Returns true if a key with the given name exists.
Contains information about a fetch operation.
std::optional< Backend::Counters< std::uint64_t > > getCounters() const override
Retrieve backend read and write stats.
@@ -261,7 +261,7 @@ $(function() {
void for_each(std::function< void(std::shared_ptr< NodeObject >)> f) override
Visit every object in the database This is usually called during import.
-
void importInternal(Backend &dstBackend, Database &srcDB)
Definition: Database.cpp:213
+
void importInternal(Backend &dstBackend, Database &srcDB)
Definition: Database.cpp:212
std::shared_ptr< TaggedCache< uint256, NodeObject > > cache_
DatabaseNodeImp()=delete
A generic endpoint for log messages.
Definition: Journal.h:58
diff --git a/DatabaseRotatingImp_8cpp_source.html b/DatabaseRotatingImp_8cpp_source.html index 3af5fa6ff6..10fe12fbda 100644 --- a/DatabaseRotatingImp_8cpp_source.html +++ b/DatabaseRotatingImp_8cpp_source.html @@ -307,7 +307,7 @@ $(function() {
@ notFound
std::int32_t getWriteLoad() const override
Retrieve the estimated number of pending write operations.
-
void importInternal(Backend &dstBackend, Database &srcDB)
Definition: Database.cpp:213
+
void importInternal(Backend &dstBackend, Database &srcDB)
Definition: Database.cpp:212
void Rethrow()
Rethrow the exception currently being handled.
Definition: contract.h:48
std::mutex mutex_
bool storeLedger(std::shared_ptr< Ledger const > const &srcLedger) override
Store a ledger from a different database.
diff --git a/DatabaseRotatingImp_8h_source.html b/DatabaseRotatingImp_8h_source.html index e3efb6416c..2f2d4fec1d 100644 --- a/DatabaseRotatingImp_8h_source.html +++ b/DatabaseRotatingImp_8h_source.html @@ -184,7 +184,7 @@ $(function() {
NodeObjectType
The types of node objects.
Definition: NodeObject.h:32
DatabaseRotatingImp()=delete
-
virtual void stop()
Definition: Database.cpp:165
+
virtual void stop()
Definition: Database.cpp:164
Contains information about a fetch operation.
std::string getName() const override
Retrieve the name associated with this backend.
diff --git a/DatabaseShardImp_8cpp_source.html b/DatabaseShardImp_8cpp_source.html index 72072ec804..100a6da425 100644 --- a/DatabaseShardImp_8cpp_source.html +++ b/DatabaseShardImp_8cpp_source.html @@ -2374,7 +2374,7 @@ $(function() {
STL class.
constexpr auto kilobytes(T value) noexcept
Definition: ByteUtilities.h:27
virtual OperatingMode getOperatingMode() const =0
-
virtual void stop()
Definition: Database.cpp:165
+
virtual void stop()
Definition: Database.cpp:164
Contains information about a fetch operation.
std::optional< std::uint32_t > getDatabaseImportSequence() const override
Returns the first ledger sequence of the shard currently being imported from the NodeStore.
@@ -2461,7 +2461,7 @@ $(function() {
std::set< std::uint32_t > preparedIndexes_
bool init() override
Initialize the database.
STL class.
-
bool isStopping() const
Definition: Database.cpp:146
+
bool isStopping() const
Definition: Database.cpp:145
@ rpcINTERNAL
Definition: ErrorCodes.h:130
Definition: Serializer.h:40
std::vector< boost::filesystem::path > historicalPaths_
@@ -2504,14 +2504,14 @@ $(function() {
bool canAdd_
int add32(std::uint32_t i)
Definition: Serializer.cpp:38
T end(T... args)
-
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t ledgerSeq=0, FetchType fetchType=FetchType::synchronous, bool duplicate=false)
Fetch a node object.
Definition: Database.cpp:252
+
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t ledgerSeq=0, FetchType fetchType=FetchType::synchronous, bool duplicate=false)
Fetch a node object.
Definition: Database.cpp:251
Scheduler & scheduler_
Definition: Database.h:302
boost::icl::interval_set< T, std::less, ClosedInterval< T > > RangeSet
A set of closed intervals over the domain T.
Definition: RangeSet.h:70
std::uint32_t earliestLedgerSeq() const noexcept
Definition: Database.h:238
void finalizeShard(std::shared_ptr< Shard > &shard, bool writeSQLite, std::optional< uint256 > const &expectedHash)
T max(T... args)
-
std::uint32_t maxLedgers(std::uint32_t shardIndex) const noexcept
Calculates the maximum ledgers for a given shard index.
Definition: Database.cpp:152
+
std::uint32_t maxLedgers(std::uint32_t shardIndex) const noexcept
Calculates the maximum ledgers for a given shard index.
Definition: Database.cpp:151
DatabaseShardImp()=delete
T make_reverse_iterator(T... args)
STL class.
diff --git a/DatabaseShard__test_8cpp_source.html b/DatabaseShard__test_8cpp_source.html index 536b4cd554..73c2f210f5 100644 --- a/DatabaseShard__test_8cpp_source.html +++ b/DatabaseShard__test_8cpp_source.html @@ -2130,7 +2130,7 @@ $(function() {
int add32(std::uint32_t i)
Definition: Serializer.cpp:38
const resultType A
T end(T... args)
-
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t ledgerSeq=0, FetchType fetchType=FetchType::synchronous, bool duplicate=false)
Fetch a node object.
Definition: Database.cpp:252
+
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t ledgerSeq=0, FetchType fetchType=FetchType::synchronous, bool duplicate=false)
Fetch a node object.
Definition: Database.cpp:251
resultType a() const
boost::icl::interval_set< T, std::less, ClosedInterval< T > > RangeSet
A set of closed intervals over the domain T.
Definition: RangeSet.h:70
void makeLedgerData(test::jtx::Env &env_, std::uint32_t seq)
diff --git a/Database_8cpp_source.html b/Database_8cpp_source.html index 0281d8398f..9f1cd43537 100644 --- a/Database_8cpp_source.html +++ b/Database_8cpp_source.html @@ -89,8 +89,8 @@ $(function() {
18 //==============================================================================
19 
20 #include <ripple/app/ledger/Ledger.h>
-
21 #include <ripple/basics/chrono.h>
-
22 #include <ripple/beast/core/CurrentThreadName.h>
+
21 #include <ripple/basics/ThreadUtilities.h>
+
22 #include <ripple/basics/chrono.h>
23 #include <ripple/json/json_value.h>
24 #include <ripple/nodestore/Database.h>
25 #include <ripple/protocol/HashPrefix.h>
@@ -134,350 +134,349 @@ $(function() {
63  [this](int i) {
64  runningThreads_++;
65 
-
66  beast::setCurrentThreadName(
-
67  "db prefetch #" + std::to_string(i));
-
68 
-
69  decltype(read_) read;
-
70 
-
71  while (true)
-
72  {
-
73  {
-
74  std::unique_lock<std::mutex> lock(readLock_);
-
75 
-
76  if (isStopping())
-
77  break;
-
78 
-
79  if (read_.empty())
-
80  {
-
81  runningThreads_--;
-
82  readCondVar_.wait(lock);
-
83  runningThreads_++;
-
84  }
-
85 
-
86  if (isStopping())
-
87  break;
-
88 
-
89  // extract multiple object at a time to minimize the
-
90  // overhead of acquiring the mutex.
-
91  for (int cnt = 0;
-
92  !read_.empty() && cnt != requestBundle_;
-
93  ++cnt)
-
94  read.insert(read_.extract(read_.begin()));
-
95  }
-
96 
-
97  for (auto it = read.begin(); it != read.end(); ++it)
-
98  {
-
99  assert(!it->second.empty());
-
100 
-
101  auto const& hash = it->first;
-
102  auto const& data = it->second;
-
103  auto const seqn = data[0].first;
-
104 
-
105  auto obj =
-
106  fetchNodeObject(hash, seqn, FetchType::async);
-
107 
-
108  // This could be further optimized: if there are
-
109  // multiple requests for sequence numbers mapping to
-
110  // multiple databases by sorting requests such that all
-
111  // indices mapping to the same database are grouped
-
112  // together and serviced by a single read.
-
113  for (auto const& req : data)
-
114  {
-
115  req.second(
-
116  (seqn == req.first) || isSameDB(req.first, seqn)
-
117  ? obj
-
118  : fetchNodeObject(
-
119  hash, req.first, FetchType::async));
-
120  }
-
121  }
-
122 
-
123  read.clear();
-
124  }
-
125 
-
126  --runningThreads_;
-
127  --readThreads_;
-
128  },
-
129  i);
-
130  t.detach();
-
131  }
-
132 }
-
133 
-
134 Database::~Database()
-
135 {
-
136  // NOTE!
-
137  // Any derived class should call the stop() method in its
-
138  // destructor. Otherwise, occasionally, the derived class may
-
139  // crash during shutdown when its members are accessed by one of
-
140  // these threads after the derived class is destroyed but before
-
141  // this base class is destroyed.
-
142  stop();
-
143 }
-
144 
-
145 bool
-
146 Database::isStopping() const
-
147 {
-
148  return readStopping_.load(std::memory_order_relaxed);
-
149 }
-
150 
-
151 std::uint32_t
-
152 Database::maxLedgers(std::uint32_t shardIndex) const noexcept
-
153 {
-
154  if (shardIndex > earliestShardIndex_)
-
155  return ledgersPerShard_;
-
156 
-
157  if (shardIndex == earliestShardIndex_)
-
158  return lastLedgerSeq(shardIndex) - firstLedgerSeq(shardIndex) + 1;
-
159 
-
160  assert(!"Invalid shard index");
-
161  return 0;
-
162 }
-
163 
-
164 void
-
165 Database::stop()
-
166 {
-
167  {
-
168  std::lock_guard lock(readLock_);
-
169 
-
170  if (!readStopping_.exchange(true, std::memory_order_relaxed))
-
171  {
-
172  JLOG(j_.debug()) << "Clearing read queue because of stop request";
-
173  read_.clear();
-
174  readCondVar_.notify_all();
-
175  }
-
176  }
-
177 
-
178  JLOG(j_.debug()) << "Waiting for stop request to complete...";
-
179 
-
180  using namespace std::chrono;
-
181 
-
182  auto const start = steady_clock::now();
-
183 
-
184  while (readThreads_.load() != 0)
-
185  {
-
186  assert(steady_clock::now() - start < 30s);
-
187  std::this_thread::yield();
-
188  }
-
189 
-
190  JLOG(j_.debug()) << "Stop request completed in "
-
191  << duration_cast<std::chrono::milliseconds>(
-
192  steady_clock::now() - start)
-
193  .count()
-
194  << " millseconds";
-
195 }
-
196 
-
197 void
-
198 Database::asyncFetch(
-
199  uint256 const& hash,
-
200  std::uint32_t ledgerSeq,
-
201  std::function<void(std::shared_ptr<NodeObject> const&)>&& cb)
-
202 {
-
203  std::lock_guard lock(readLock_);
-
204 
-
205  if (!isStopping())
-
206  {
-
207  read_[hash].emplace_back(ledgerSeq, std::move(cb));
-
208  readCondVar_.notify_one();
-
209  }
-
210 }
-
211 
-
212 void
-
213 Database::importInternal(Backend& dstBackend, Database& srcDB)
-
214 {
-
215  Batch batch;
-
216  batch.reserve(batchWritePreallocationSize);
-
217  auto storeBatch = [&, fname = __func__]() {
-
218  try
-
219  {
-
220  dstBackend.storeBatch(batch);
-
221  }
-
222  catch (std::exception const& e)
-
223  {
-
224  JLOG(j_.error()) << "Exception caught in function " << fname
-
225  << ". Error: " << e.what();
-
226  return;
-
227  }
-
228 
-
229  std::uint64_t sz{0};
-
230  for (auto const& nodeObject : batch)
-
231  sz += nodeObject->getData().size();
-
232  storeStats(batch.size(), sz);
-
233  batch.clear();
-
234  };
-
235 
-
236  srcDB.for_each([&](std::shared_ptr<NodeObject> nodeObject) {
-
237  assert(nodeObject);
-
238  if (!nodeObject) // This should never happen
-
239  return;
-
240 
-
241  batch.emplace_back(std::move(nodeObject));
-
242  if (batch.size() >= batchWritePreallocationSize)
-
243  storeBatch();
-
244  });
-
245 
-
246  if (!batch.empty())
-
247  storeBatch();
-
248 }
-
249 
-
250 // Perform a fetch and report the time it took
-
251 std::shared_ptr<NodeObject>
-
252 Database::fetchNodeObject(
-
253  uint256 const& hash,
-
254  std::uint32_t ledgerSeq,
-
255  FetchType fetchType,
-
256  bool duplicate)
-
257 {
-
258  FetchReport fetchReport(fetchType);
-
259 
-
260  using namespace std::chrono;
-
261  auto const begin{steady_clock::now()};
-
262 
-
263  auto nodeObject{fetchNodeObject(hash, ledgerSeq, fetchReport, duplicate)};
-
264  auto dur = steady_clock::now() - begin;
-
265  fetchDurationUs_ += duration_cast<microseconds>(dur).count();
-
266  if (nodeObject)
-
267  {
-
268  ++fetchHitCount_;
-
269  fetchSz_ += nodeObject->getData().size();
-
270  }
-
271  ++fetchTotalCount_;
-
272 
-
273  fetchReport.elapsed = duration_cast<milliseconds>(dur);
-
274  scheduler_.onFetch(fetchReport);
-
275  return nodeObject;
-
276 }
-
277 
-
278 bool
-
279 Database::storeLedger(
-
280  Ledger const& srcLedger,
-
281  std::shared_ptr<Backend> dstBackend)
-
282 {
-
283  auto fail = [&](std::string const& msg) {
-
284  JLOG(j_.error()) << "Source ledger sequence " << srcLedger.info().seq
-
285  << ". " << msg;
-
286  return false;
-
287  };
-
288 
-
289  if (srcLedger.info().hash.isZero())
-
290  return fail("Invalid hash");
-
291  if (srcLedger.info().accountHash.isZero())
-
292  return fail("Invalid account hash");
-
293 
-
294  auto& srcDB = const_cast<Database&>(srcLedger.stateMap().family().db());
-
295  if (&srcDB == this)
-
296  return fail("Source and destination databases are the same");
-
297 
-
298  Batch batch;
-
299  batch.reserve(batchWritePreallocationSize);
-
300  auto storeBatch = [&, fname = __func__]() {
-
301  std::uint64_t sz{0};
-
302  for (auto const& nodeObject : batch)
-
303  sz += nodeObject->getData().size();
-
304 
-
305  try
-
306  {
-
307  dstBackend->storeBatch(batch);
-
308  }
-
309  catch (std::exception const& e)
-
310  {
-
311  fail(
-
312  std::string("Exception caught in function ") + fname +
-
313  ". Error: " + e.what());
-
314  return false;
-
315  }
-
316 
-
317  storeStats(batch.size(), sz);
-
318  batch.clear();
-
319  return true;
-
320  };
-
321 
-
322  // Store ledger header
-
323  {
-
324  Serializer s(sizeof(std::uint32_t) + sizeof(LedgerInfo));
-
325  s.add32(HashPrefix::ledgerMaster);
-
326  addRaw(srcLedger.info(), s);
-
327  auto nObj = NodeObject::createObject(
-
328  hotLEDGER, std::move(s.modData()), srcLedger.info().hash);
-
329  batch.emplace_back(std::move(nObj));
-
330  }
-
331 
-
332  bool error = false;
-
333  auto visit = [&](SHAMapTreeNode& node) {
-
334  if (!isStopping())
-
335  {
-
336  if (auto nodeObject = srcDB.fetchNodeObject(
-
337  node.getHash().as_uint256(), srcLedger.info().seq))
-
338  {
-
339  batch.emplace_back(std::move(nodeObject));
-
340  if (batch.size() < batchWritePreallocationSize || storeBatch())
-
341  return true;
-
342  }
-
343  }
-
344 
-
345  error = true;
-
346  return false;
-
347  };
-
348 
-
349  // Store the state map
-
350  if (srcLedger.stateMap().getHash().isNonZero())
-
351  {
-
352  if (!srcLedger.stateMap().isValid())
-
353  return fail("Invalid state map");
-
354 
-
355  srcLedger.stateMap().snapShot(false)->visitNodes(visit);
-
356  if (error)
-
357  return fail("Failed to store state map");
-
358  }
-
359 
-
360  // Store the transaction map
-
361  if (srcLedger.info().txHash.isNonZero())
-
362  {
-
363  if (!srcLedger.txMap().isValid())
-
364  return fail("Invalid transaction map");
-
365 
-
366  srcLedger.txMap().snapShot(false)->visitNodes(visit);
-
367  if (error)
-
368  return fail("Failed to store transaction map");
-
369  }
-
370 
-
371  if (!batch.empty() && !storeBatch())
-
372  return fail("Failed to store");
-
373 
-
374  return true;
-
375 }
-
376 
-
377 void
-
378 Database::getCountsJson(Json::Value& obj)
-
379 {
-
380  assert(obj.isObject());
-
381 
-
382  {
-
383  std::unique_lock<std::mutex> lock(readLock_);
-
384  obj["read_queue"] = static_cast<Json::UInt>(read_.size());
-
385  }
-
386 
-
387  obj["read_threads_total"] = readThreads_.load();
-
388  obj["read_threads_running"] = runningThreads_.load();
-
389  obj["read_request_bundle"] = requestBundle_;
-
390 
-
391  obj[jss::node_writes] = std::to_string(storeCount_);
-
392  obj[jss::node_reads_total] = std::to_string(fetchTotalCount_);
-
393  obj[jss::node_reads_hit] = std::to_string(fetchHitCount_);
-
394  obj[jss::node_written_bytes] = std::to_string(storeSz_);
-
395  obj[jss::node_read_bytes] = std::to_string(fetchSz_);
-
396  obj[jss::node_reads_duration_us] = std::to_string(fetchDurationUs_);
-
397 
-
398  if (auto c = getCounters())
-
399  {
-
400  obj[jss::node_read_errors] = std::to_string(c->readErrors);
-
401  obj[jss::node_read_retries] = std::to_string(c->readRetries);
-
402  obj[jss::node_write_retries] = std::to_string(c->writeRetries);
-
403  obj[jss::node_writes_delayed] = std::to_string(c->writesDelayed);
-
404  obj[jss::node_writes_duration_us] = std::to_string(c->writeDurationUs);
-
405  }
-
406 }
-
407 
-
408 } // namespace NodeStore
-
409 } // namespace ripple
+
66  this_thread::set_name("prefetch " + std::to_string(i));
+
67 
+
68  decltype(read_) read;
+
69 
+
70  while (true)
+
71  {
+
72  {
+
73  std::unique_lock<std::mutex> lock(readLock_);
+
74 
+
75  if (isStopping())
+
76  break;
+
77 
+
78  if (read_.empty())
+
79  {
+
80  runningThreads_--;
+
81  readCondVar_.wait(lock);
+
82  runningThreads_++;
+
83  }
+
84 
+
85  if (isStopping())
+
86  break;
+
87 
+
88  // extract multiple object at a time to minimize the
+
89  // overhead of acquiring the mutex.
+
90  for (int cnt = 0;
+
91  !read_.empty() && cnt != requestBundle_;
+
92  ++cnt)
+
93  read.insert(read_.extract(read_.begin()));
+
94  }
+
95 
+
96  for (auto it = read.begin(); it != read.end(); ++it)
+
97  {
+
98  assert(!it->second.empty());
+
99 
+
100  auto const& hash = it->first;
+
101  auto const& data = it->second;
+
102  auto const seqn = data[0].first;
+
103 
+
104  auto obj =
+
105  fetchNodeObject(hash, seqn, FetchType::async);
+
106 
+
107  // This could be further optimized: if there are
+
108  // multiple requests for sequence numbers mapping to
+
109  // multiple databases by sorting requests such that all
+
110  // indices mapping to the same database are grouped
+
111  // together and serviced by a single read.
+
112  for (auto const& req : data)
+
113  {
+
114  req.second(
+
115  (seqn == req.first) || isSameDB(req.first, seqn)
+
116  ? obj
+
117  : fetchNodeObject(
+
118  hash, req.first, FetchType::async));
+
119  }
+
120  }
+
121 
+
122  read.clear();
+
123  }
+
124 
+
125  --runningThreads_;
+
126  --readThreads_;
+
127  },
+
128  i);
+
129  t.detach();
+
130  }
+
131 }
+
132 
+
133 Database::~Database()
+
134 {
+
135  // NOTE!
+
136  // Any derived class should call the stop() method in its
+
137  // destructor. Otherwise, occasionally, the derived class may
+
138  // crash during shutdown when its members are accessed by one of
+
139  // these threads after the derived class is destroyed but before
+
140  // this base class is destroyed.
+
141  stop();
+
142 }
+
143 
+
144 bool
+
145 Database::isStopping() const
+
146 {
+
147  return readStopping_.load(std::memory_order_relaxed);
+
148 }
+
149 
+
150 std::uint32_t
+
151 Database::maxLedgers(std::uint32_t shardIndex) const noexcept
+
152 {
+
153  if (shardIndex > earliestShardIndex_)
+
154  return ledgersPerShard_;
+
155 
+
156  if (shardIndex == earliestShardIndex_)
+
157  return lastLedgerSeq(shardIndex) - firstLedgerSeq(shardIndex) + 1;
+
158 
+
159  assert(!"Invalid shard index");
+
160  return 0;
+
161 }
+
162 
+
163 void
+
164 Database::stop()
+
165 {
+
166  {
+
167  std::lock_guard lock(readLock_);
+
168 
+
169  if (!readStopping_.exchange(true, std::memory_order_relaxed))
+
170  {
+
171  JLOG(j_.debug()) << "Clearing read queue because of stop request";
+
172  read_.clear();
+
173  readCondVar_.notify_all();
+
174  }
+
175  }
+
176 
+
177  JLOG(j_.debug()) << "Waiting for stop request to complete...";
+
178 
+
179  using namespace std::chrono;
+
180 
+
181  auto const start = steady_clock::now();
+
182 
+
183  while (readThreads_.load() != 0)
+
184  {
+
185  assert(steady_clock::now() - start < 30s);
+
186  std::this_thread::yield();
+
187  }
+
188 
+
189  JLOG(j_.debug()) << "Stop request completed in "
+
190  << duration_cast<std::chrono::milliseconds>(
+
191  steady_clock::now() - start)
+
192  .count()
+
193  << " millseconds";
+
194 }
+
195 
+
196 void
+
197 Database::asyncFetch(
+
198  uint256 const& hash,
+
199  std::uint32_t ledgerSeq,
+
200  std::function<void(std::shared_ptr<NodeObject> const&)>&& cb)
+
201 {
+
202  std::lock_guard lock(readLock_);
+
203 
+
204  if (!isStopping())
+
205  {
+
206  read_[hash].emplace_back(ledgerSeq, std::move(cb));
+
207  readCondVar_.notify_one();
+
208  }
+
209 }
+
210 
+
211 void
+
212 Database::importInternal(Backend& dstBackend, Database& srcDB)
+
213 {
+
214  Batch batch;
+
215  batch.reserve(batchWritePreallocationSize);
+
216  auto storeBatch = [&, fname = __func__]() {
+
217  try
+
218  {
+
219  dstBackend.storeBatch(batch);
+
220  }
+
221  catch (std::exception const& e)
+
222  {
+
223  JLOG(j_.error()) << "Exception caught in function " << fname
+
224  << ". Error: " << e.what();
+
225  return;
+
226  }
+
227 
+
228  std::uint64_t sz{0};
+
229  for (auto const& nodeObject : batch)
+
230  sz += nodeObject->getData().size();
+
231  storeStats(batch.size(), sz);
+
232  batch.clear();
+
233  };
+
234 
+
235  srcDB.for_each([&](std::shared_ptr<NodeObject> nodeObject) {
+
236  assert(nodeObject);
+
237  if (!nodeObject) // This should never happen
+
238  return;
+
239 
+
240  batch.emplace_back(std::move(nodeObject));
+
241  if (batch.size() >= batchWritePreallocationSize)
+
242  storeBatch();
+
243  });
+
244 
+
245  if (!batch.empty())
+
246  storeBatch();
+
247 }
+
248 
+
249 // Perform a fetch and report the time it took
+
250 std::shared_ptr<NodeObject>
+
251 Database::fetchNodeObject(
+
252  uint256 const& hash,
+
253  std::uint32_t ledgerSeq,
+
254  FetchType fetchType,
+
255  bool duplicate)
+
256 {
+
257  FetchReport fetchReport(fetchType);
+
258 
+
259  using namespace std::chrono;
+
260  auto const begin{steady_clock::now()};
+
261 
+
262  auto nodeObject{fetchNodeObject(hash, ledgerSeq, fetchReport, duplicate)};
+
263  auto dur = steady_clock::now() - begin;
+
264  fetchDurationUs_ += duration_cast<microseconds>(dur).count();
+
265  if (nodeObject)
+
266  {
+
267  ++fetchHitCount_;
+
268  fetchSz_ += nodeObject->getData().size();
+
269  }
+
270  ++fetchTotalCount_;
+
271 
+
272  fetchReport.elapsed = duration_cast<milliseconds>(dur);
+
273  scheduler_.onFetch(fetchReport);
+
274  return nodeObject;
+
275 }
+
276 
+
277 bool
+
278 Database::storeLedger(
+
279  Ledger const& srcLedger,
+
280  std::shared_ptr<Backend> dstBackend)
+
281 {
+
282  auto fail = [&](std::string const& msg) {
+
283  JLOG(j_.error()) << "Source ledger sequence " << srcLedger.info().seq
+
284  << ". " << msg;
+
285  return false;
+
286  };
+
287 
+
288  if (srcLedger.info().hash.isZero())
+
289  return fail("Invalid hash");
+
290  if (srcLedger.info().accountHash.isZero())
+
291  return fail("Invalid account hash");
+
292 
+
293  auto& srcDB = const_cast<Database&>(srcLedger.stateMap().family().db());
+
294  if (&srcDB == this)
+
295  return fail("Source and destination databases are the same");
+
296 
+
297  Batch batch;
+
298  batch.reserve(batchWritePreallocationSize);
+
299  auto storeBatch = [&, fname = __func__]() {
+
300  std::uint64_t sz{0};
+
301  for (auto const& nodeObject : batch)
+
302  sz += nodeObject->getData().size();
+
303 
+
304  try
+
305  {
+
306  dstBackend->storeBatch(batch);
+
307  }
+
308  catch (std::exception const& e)
+
309  {
+
310  fail(
+
311  std::string("Exception caught in function ") + fname +
+
312  ". Error: " + e.what());
+
313  return false;
+
314  }
+
315 
+
316  storeStats(batch.size(), sz);
+
317  batch.clear();
+
318  return true;
+
319  };
+
320 
+
321  // Store ledger header
+
322  {
+
323  Serializer s(sizeof(std::uint32_t) + sizeof(LedgerInfo));
+
324  s.add32(HashPrefix::ledgerMaster);
+
325  addRaw(srcLedger.info(), s);
+
326  auto nObj = NodeObject::createObject(
+
327  hotLEDGER, std::move(s.modData()), srcLedger.info().hash);
+
328  batch.emplace_back(std::move(nObj));
+
329  }
+
330 
+
331  bool error = false;
+
332  auto visit = [&](SHAMapTreeNode& node) {
+
333  if (!isStopping())
+
334  {
+
335  if (auto nodeObject = srcDB.fetchNodeObject(
+
336  node.getHash().as_uint256(), srcLedger.info().seq))
+
337  {
+
338  batch.emplace_back(std::move(nodeObject));
+
339  if (batch.size() < batchWritePreallocationSize || storeBatch())
+
340  return true;
+
341  }
+
342  }
+
343 
+
344  error = true;
+
345  return false;
+
346  };
+
347 
+
348  // Store the state map
+
349  if (srcLedger.stateMap().getHash().isNonZero())
+
350  {
+
351  if (!srcLedger.stateMap().isValid())
+
352  return fail("Invalid state map");
+
353 
+
354  srcLedger.stateMap().snapShot(false)->visitNodes(visit);
+
355  if (error)
+
356  return fail("Failed to store state map");
+
357  }
+
358 
+
359  // Store the transaction map
+
360  if (srcLedger.info().txHash.isNonZero())
+
361  {
+
362  if (!srcLedger.txMap().isValid())
+
363  return fail("Invalid transaction map");
+
364 
+
365  srcLedger.txMap().snapShot(false)->visitNodes(visit);
+
366  if (error)
+
367  return fail("Failed to store transaction map");
+
368  }
+
369 
+
370  if (!batch.empty() && !storeBatch())
+
371  return fail("Failed to store");
+
372 
+
373  return true;
+
374 }
+
375 
+
376 void
+
377 Database::getCountsJson(Json::Value& obj)
+
378 {
+
379  assert(obj.isObject());
+
380 
+
381  {
+
382  std::unique_lock<std::mutex> lock(readLock_);
+
383  obj["read_queue"] = static_cast<Json::UInt>(read_.size());
+
384  }
+
385 
+
386  obj["read_threads_total"] = readThreads_.load();
+
387  obj["read_threads_running"] = runningThreads_.load();
+
388  obj["read_request_bundle"] = requestBundle_;
+
389 
+
390  obj[jss::node_writes] = std::to_string(storeCount_);
+
391  obj[jss::node_reads_total] = std::to_string(fetchTotalCount_);
+
392  obj[jss::node_reads_hit] = std::to_string(fetchHitCount_);
+
393  obj[jss::node_written_bytes] = std::to_string(storeSz_);
+
394  obj[jss::node_read_bytes] = std::to_string(fetchSz_);
+
395  obj[jss::node_reads_duration_us] = std::to_string(fetchDurationUs_);
+
396 
+
397  if (auto c = getCounters())
+
398  {
+
399  obj[jss::node_read_errors] = std::to_string(c->readErrors);
+
400  obj[jss::node_read_retries] = std::to_string(c->readRetries);
+
401  obj[jss::node_write_retries] = std::to_string(c->writeRetries);
+
402  obj[jss::node_writes_delayed] = std::to_string(c->writesDelayed);
+
403  obj[jss::node_writes_duration_us] = std::to_string(c->writeDurationUs);
+
404  }
+
405 }
+
406 
+
407 } // namespace NodeStore
+
408 } // namespace ripple
Holds a collection of configuration values.
Definition: BasicConfig.h:42
@ ledgerMaster
ledger master data for signing
@@ -502,12 +501,13 @@ $(function() {
std::map< uint256, std::vector< std::pair< std::uint32_t, std::function< void(std::shared_ptr< NodeObject > const &)> > > > read_
Definition: Database.h:372
const int requestBundle_
Definition: Database.h:330
static std::shared_ptr< NodeObject > createObject(NodeObjectType type, Blob &&data, uint256 const &hash)
Create an object from fields.
Definition: NodeObject.cpp:37
+
void set_name(std::string s)
std::atomic< std::uint64_t > fetchTotalCount_
Definition: Database.h:359
-
virtual void asyncFetch(uint256 const &hash, std::uint32_t ledgerSeq, std::function< void(std::shared_ptr< NodeObject > const &)> &&callback)
Fetch an object without waiting.
Definition: Database.cpp:198
+
virtual void asyncFetch(uint256 const &hash, std::uint32_t ledgerSeq, std::function< void(std::shared_ptr< NodeObject > const &)> &&callback)
Fetch an object without waiting.
Definition: Database.cpp:197
LedgerIndex seq
Definition: LedgerHeader.h:41
std::atomic< std::uint32_t > fetchSz_
Definition: Database.h:306
STL class.
-
virtual void stop()
Definition: Database.cpp:165
+
virtual void stop()
Definition: Database.cpp:164
Contains information about a fetch operation.
uint256 accountHash
Definition: LedgerHeader.h:51
@@ -526,7 +526,7 @@ $(function() {
std::atomic< std::uint64_t > storeSz_
Definition: Database.h:358
LedgerInfo const & info() const override
Returns information about the ledger.
Definition: Ledger.h:152
-
void importInternal(Backend &dstBackend, Database &srcDB)
Definition: Database.cpp:213
+
void importInternal(Backend &dstBackend, Database &srcDB)
Definition: Database.cpp:212
bool isZero() const
Definition: base_uint.h:532
STL class.
Holds a ledger.
Definition: Ledger.h:76
@@ -543,18 +543,17 @@ $(function() {
T wait(T... args)
Scheduling for asynchronous backend activity.
-
virtual ~Database()
Destroy the node store.
Definition: Database.cpp:134
+
virtual ~Database()
Destroy the node store.
Definition: Database.cpp:133
virtual void for_each(std::function< void(std::shared_ptr< NodeObject >)> f)=0
Visit every object in the database This is usually called during import.
void storeStats(std::uint64_t count, std::uint64_t sz)
Definition: Database.h:333
Information about the notional ledger backing the view.
Definition: LedgerHeader.h:33
FetchType
-
bool isStopping() const
Definition: Database.cpp:146
+
bool isStopping() const
Definition: Database.cpp:145
T notify_one(T... args)
Definition: Serializer.h:40
SHAMap const & txMap() const
Definition: Ledger.h:322
@ async
virtual std::optional< Backend::Counters< std::uint64_t > > getCounters() const
Retrieve backend read and write stats.
Definition: Database.h:401
-
void setCurrentThreadName(std::string_view name)
Changes the name of the caller thread.
T emplace_back(T... args)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
virtual bool storeLedger(std::shared_ptr< Ledger const > const &srcLedger)=0
Store a ledger from a different database.
@@ -563,17 +562,17 @@ $(function() {
const beast::Journal j_
Definition: Database.h:301
STL namespace.
static constexpr std::uint32_t XRP_LEDGER_EARLIEST_SEQ
The XRP ledger network's earliest allowed sequence.
-
void getCountsJson(Json::Value &obj)
Definition: Database.cpp:378
+
void getCountsJson(Json::Value &obj)
Definition: Database.cpp:377
const std::uint32_t earliestLedgerSeq_
Definition: Database.h:322
T empty(T... args)
Stream debug() const
Definition: Journal.h:315
@ hotLEDGER
Definition: NodeObject.h:34
int add32(std::uint32_t i)
Definition: Serializer.cpp:38
-
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t ledgerSeq=0, FetchType fetchType=FetchType::synchronous, bool duplicate=false)
Fetch a node object.
Definition: Database.cpp:252
+
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t ledgerSeq=0, FetchType fetchType=FetchType::synchronous, bool duplicate=false)
Fetch a node object.
Definition: Database.cpp:251
Scheduler & scheduler_
Definition: Database.h:302
std::atomic< int > runningThreads_
Definition: Database.h:376
virtual bool isSameDB(std::uint32_t s1, std::uint32_t s2)=0
-
std::uint32_t maxLedgers(std::uint32_t shardIndex) const noexcept
Calculates the maximum ledgers for a given shard index.
Definition: Database.cpp:152
+
std::uint32_t maxLedgers(std::uint32_t shardIndex) const noexcept
Calculates the maximum ledgers for a given shard index.
Definition: Database.cpp:151
virtual void storeBatch(Batch const &batch)=0
Store a group of objects.
std::chrono::milliseconds elapsed
T notify_all(T... args)
diff --git a/Database_8h_source.html b/Database_8h_source.html index 292b2275ec..af7fe86447 100644 --- a/Database_8h_source.html +++ b/Database_8h_source.html @@ -381,10 +381,10 @@ $(function() {
std::map< uint256, std::vector< std::pair< std::uint32_t, std::function< void(std::shared_ptr< NodeObject > const &)> > > > read_
Definition: Database.h:372
const int requestBundle_
Definition: Database.h:330
std::atomic< std::uint64_t > fetchTotalCount_
Definition: Database.h:359
-
virtual void asyncFetch(uint256 const &hash, std::uint32_t ledgerSeq, std::function< void(std::shared_ptr< NodeObject > const &)> &&callback)
Fetch an object without waiting.
Definition: Database.cpp:198
+
virtual void asyncFetch(uint256 const &hash, std::uint32_t ledgerSeq, std::function< void(std::shared_ptr< NodeObject > const &)> &&callback)
Fetch an object without waiting.
Definition: Database.cpp:197
virtual void sync()=0
std::atomic< std::uint32_t > fetchSz_
Definition: Database.h:306
-
virtual void stop()
Definition: Database.cpp:165
+
virtual void stop()
Definition: Database.cpp:164
Contains information about a fetch operation.
virtual void store(NodeObjectType type, Blob &&data, uint256 const &hash, std::uint32_t ledgerSeq)=0
Store the object.
@@ -400,7 +400,7 @@ $(function() {
std::uint64_t getStoreSize() const
Definition: Database.h:200
std::atomic< std::uint64_t > storeSz_
Definition: Database.h:358
std::uint32_t firstLedgerSeq(std::uint32_t shardIndex) const noexcept
Calculates the first ledger sequence for a given shard index.
Definition: Database.h:257
-
void importInternal(Backend &dstBackend, Database &srcDB)
Definition: Database.cpp:213
+
void importInternal(Backend &dstBackend, Database &srcDB)
Definition: Database.cpp:212
Holds a ledger.
Definition: Ledger.h:76
std::condition_variable readCondVar_
Definition: Database.h:364
@@ -409,12 +409,12 @@ $(function() {
STL class.
Scheduling for asynchronous backend activity.
-
virtual ~Database()
Destroy the node store.
Definition: Database.cpp:134
+
virtual ~Database()
Destroy the node store.
Definition: Database.cpp:133
virtual void for_each(std::function< void(std::shared_ptr< NodeObject >)> f)=0
Visit every object in the database This is usually called during import.
virtual void sweep()=0
Remove expired entries from the positive and negative caches.
void storeStats(std::uint64_t count, std::uint64_t sz)
Definition: Database.h:333
FetchType
-
bool isStopping() const
Definition: Database.cpp:146
+
bool isStopping() const
Definition: Database.cpp:145
void threadEntry()
virtual std::optional< Backend::Counters< std::uint64_t > > getCounters() const
Retrieve backend read and write stats.
Definition: Database.h:401
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
@@ -423,20 +423,20 @@ $(function() {
std::uint32_t seqToShardIndex(std::uint32_t ledgerSeq) const noexcept
Calculates the shard index for a given ledger sequence.
Definition: Database.h:283
std::atomic< std::uint64_t > storeDurationUs_
Definition: Database.h:361
std::uint32_t earliestShardIndex() const noexcept
Definition: Database.h:246
-
void getCountsJson(Json::Value &obj)
Definition: Database.cpp:378
+
void getCountsJson(Json::Value &obj)
Definition: Database.cpp:377
const std::uint32_t earliestLedgerSeq_
Definition: Database.h:322
std::uint32_t getFetchHitCount() const
Definition: Database.h:194
STL class.
const std::uint32_t earliestShardIndex_
Definition: Database.h:325
-
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t ledgerSeq=0, FetchType fetchType=FetchType::synchronous, bool duplicate=false)
Fetch a node object.
Definition: Database.cpp:252
+
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t ledgerSeq=0, FetchType fetchType=FetchType::synchronous, bool duplicate=false)
Fetch a node object.
Definition: Database.cpp:251
Scheduler & scheduler_
Definition: Database.h:302
std::atomic< int > runningThreads_
Definition: Database.h:376
std::uint32_t getFetchTotalCount() const
Definition: Database.h:188
std::uint32_t earliestLedgerSeq() const noexcept
Definition: Database.h:238
virtual bool isSameDB(std::uint32_t s1, std::uint32_t s2)=0
-
std::uint32_t maxLedgers(std::uint32_t shardIndex) const noexcept
Calculates the maximum ledgers for a given shard index.
Definition: Database.cpp:152
+
std::uint32_t maxLedgers(std::uint32_t shardIndex) const noexcept
Calculates the maximum ledgers for a given shard index.
Definition: Database.cpp:151
@ synchronous
virtual void importDatabase(Database &source)=0
Import objects from another database.
std::uint32_t ledgersPerShard() const noexcept
Definition: Database.h:230
diff --git a/ETLSource_8cpp_source.html b/ETLSource_8cpp_source.html index e59273d1d6..1ee4882248 100644 --- a/ETLSource_8cpp_source.html +++ b/ETLSource_8cpp_source.html @@ -90,1001 +90,1000 @@ $(function() {
19 
20 #include <ripple/app/reporting/ETLSource.h>
21 #include <ripple/app/reporting/ReportingETL.h>
-
22 #include <ripple/beast/core/CurrentThreadName.h>
-
23 #include <ripple/json/json_reader.h>
-
24 #include <ripple/json/json_writer.h>
-
25 
-
26 namespace ripple {
-
27 
-
28 // Create ETL source without grpc endpoint
-
29 // Fetch ledger and load initial ledger will fail for this source
-
30 // Primarly used in read-only mode, to monitor when ledgers are validated
-
31 ETLSource::ETLSource(std::string ip, std::string wsPort, ReportingETL& etl)
-
32  : ip_(ip)
-
33  , wsPort_(wsPort)
-
34  , etl_(etl)
-
35  , ioc_(etl.getApplication().getIOService())
-
36  , ws_(std::make_unique<
-
37  boost::beast::websocket::stream<boost::beast::tcp_stream>>(
-
38  boost::asio::make_strand(ioc_)))
-
39  , resolver_(boost::asio::make_strand(ioc_))
-
40  , networkValidatedLedgers_(etl_.getNetworkValidatedLedgers())
-
41  , journal_(etl_.getApplication().journal("ReportingETL::ETLSource"))
-
42  , app_(etl_.getApplication())
-
43  , timer_(ioc_)
-
44 {
-
45 }
-
46 
-
47 ETLSource::ETLSource(
-
48  std::string ip,
-
49  std::string wsPort,
-
50  std::string grpcPort,
-
51  ReportingETL& etl)
-
52  : ip_(ip)
-
53  , wsPort_(wsPort)
-
54  , grpcPort_(grpcPort)
-
55  , etl_(etl)
-
56  , ioc_(etl.getApplication().getIOService())
-
57  , ws_(std::make_unique<
-
58  boost::beast::websocket::stream<boost::beast::tcp_stream>>(
-
59  boost::asio::make_strand(ioc_)))
-
60  , resolver_(boost::asio::make_strand(ioc_))
-
61  , networkValidatedLedgers_(etl_.getNetworkValidatedLedgers())
-
62  , journal_(etl_.getApplication().journal("ReportingETL::ETLSource"))
-
63  , app_(etl_.getApplication())
-
64  , timer_(ioc_)
-
65 {
-
66  std::string connectionString;
-
67  try
-
68  {
-
69  connectionString =
-
70  beast::IP::Endpoint(
-
71  boost::asio::ip::make_address(ip_), std::stoi(grpcPort_))
-
72  .to_string();
-
73 
-
74  JLOG(journal_.info())
-
75  << "Using IP to connect to ETL source: " << connectionString;
-
76  }
-
77  catch (std::exception const&)
-
78  {
-
79  connectionString = "dns:" + ip_ + ":" + grpcPort_;
-
80  JLOG(journal_.info())
-
81  << "Using DNS to connect to ETL source: " << connectionString;
-
82  }
-
83  try
-
84  {
-
85  stub_ = org::xrpl::rpc::v1::XRPLedgerAPIService::NewStub(
-
86  grpc::CreateChannel(
-
87  connectionString, grpc::InsecureChannelCredentials()));
-
88  JLOG(journal_.info()) << "Made stub for remote = " << toString();
-
89  }
-
90  catch (std::exception const& e)
-
91  {
-
92  JLOG(journal_.error()) << "Exception while creating stub = " << e.what()
-
93  << " . Remote = " << toString();
-
94  }
-
95 }
-
96 
-
97 void
-
98 ETLSource::reconnect(boost::beast::error_code ec)
-
99 {
-
100  connected_ = false;
-
101  // These are somewhat normal errors. operation_aborted occurs on shutdown,
-
102  // when the timer is cancelled. connection_refused will occur repeatedly
-
103  // if we cannot connect to the transaction processing process
-
104  if (ec != boost::asio::error::operation_aborted &&
-
105  ec != boost::asio::error::connection_refused)
-
106  {
-
107  JLOG(journal_.error()) << __func__ << " : "
-
108  << "error code = " << ec << " - " << toString();
-
109  }
-
110  else
-
111  {
-
112  JLOG(journal_.warn()) << __func__ << " : "
-
113  << "error code = " << ec << " - " << toString();
-
114  }
-
115 
-
116  if (etl_.isStopping())
-
117  {
-
118  JLOG(journal_.debug()) << __func__ << " : " << toString()
-
119  << " - etl is stopping. aborting reconnect";
-
120  return;
-
121  }
-
122 
-
123  // exponentially increasing timeouts, with a max of 30 seconds
-
124  size_t waitTime = std::min(pow(2, numFailures_), 30.0);
-
125  numFailures_++;
-
126  timer_.expires_after(boost::asio::chrono::seconds(waitTime));
-
127  timer_.async_wait([this, fname = __func__](auto ec) {
-
128  bool startAgain = (ec != boost::asio::error::operation_aborted);
-
129  JLOG(journal_.trace()) << fname << " async_wait : ec = " << ec;
-
130  close(startAgain);
-
131  });
-
132 }
-
133 
-
134 void
-
135 ETLSource::close(bool startAgain)
-
136 {
-
137  timer_.cancel();
-
138  ioc_.post([this, startAgain]() {
-
139  if (closing_)
-
140  return;
-
141 
-
142  if (ws_->is_open())
-
143  {
-
144  // onStop() also calls close(). If the async_close is called twice,
-
145  // an assertion fails. Using closing_ makes sure async_close is only
-
146  // called once
-
147  closing_ = true;
-
148  ws_->async_close(
-
149  boost::beast::websocket::close_code::normal,
-
150  [this, startAgain, fname = __func__](auto ec) {
-
151  if (ec)
-
152  {
-
153  JLOG(journal_.error())
-
154  << fname << " async_close : "
-
155  << "error code = " << ec << " - " << toString();
-
156  }
-
157  closing_ = false;
-
158  if (startAgain)
-
159  start();
-
160  });
-
161  }
-
162  else if (startAgain)
-
163  {
-
164  start();
-
165  }
-
166  });
-
167 }
-
168 
-
169 void
-
170 ETLSource::start()
-
171 {
-
172  JLOG(journal_.trace()) << __func__ << " : " << toString();
-
173 
-
174  auto const host = ip_;
-
175  auto const port = wsPort_;
-
176 
-
177  resolver_.async_resolve(
-
178  host, port, [this](auto ec, auto results) { onResolve(ec, results); });
-
179 }
-
180 
-
181 void
-
182 ETLSource::onResolve(
-
183  boost::beast::error_code ec,
-
184  boost::asio::ip::tcp::resolver::results_type results)
-
185 {
-
186  JLOG(journal_.trace()) << __func__ << " : ec = " << ec << " - "
-
187  << toString();
-
188  if (ec)
-
189  {
-
190  // try again
-
191  reconnect(ec);
-
192  }
-
193  else
-
194  {
-
195  boost::beast::get_lowest_layer(*ws_).expires_after(
-
196  std::chrono::seconds(30));
-
197  boost::beast::get_lowest_layer(*ws_).async_connect(
-
198  results, [this](auto ec, auto ep) { onConnect(ec, ep); });
-
199  }
-
200 }
-
201 
-
202 void
-
203 ETLSource::onConnect(
-
204  boost::beast::error_code ec,
-
205  boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint)
-
206 {
-
207  JLOG(journal_.trace()) << __func__ << " : ec = " << ec << " - "
-
208  << toString();
-
209  if (ec)
-
210  {
-
211  // start over
-
212  reconnect(ec);
-
213  }
-
214  else
-
215  {
-
216  numFailures_ = 0;
-
217  // Turn off timeout on the tcp stream, because websocket stream has it's
-
218  // own timeout system
-
219  boost::beast::get_lowest_layer(*ws_).expires_never();
-
220 
-
221  // Set suggested timeout settings for the websocket
-
222  ws_->set_option(
-
223  boost::beast::websocket::stream_base::timeout::suggested(
-
224  boost::beast::role_type::client));
-
225 
-
226  // Set a decorator to change the User-Agent of the handshake
-
227  ws_->set_option(boost::beast::websocket::stream_base::decorator(
-
228  [](boost::beast::websocket::request_type& req) {
-
229  req.set(
-
230  boost::beast::http::field::user_agent,
-
231  std::string(BOOST_BEAST_VERSION_STRING) +
-
232  " websocket-client-async");
-
233  }));
-
234 
-
235  // Update the host_ string. This will provide the value of the
-
236  // Host HTTP header during the WebSocket handshake.
-
237  // See https://tools.ietf.org/html/rfc7230#section-5.4
-
238  auto host = ip_ + ':' + std::to_string(endpoint.port());
-
239  // Perform the websocket handshake
-
240  ws_->async_handshake(host, "/", [this](auto ec) { onHandshake(ec); });
-
241  }
-
242 }
-
243 
-
244 void
-
245 ETLSource::onHandshake(boost::beast::error_code ec)
-
246 {
-
247  JLOG(journal_.trace()) << __func__ << " : ec = " << ec << " - "
-
248  << toString();
-
249  if (ec)
-
250  {
-
251  // start over
-
252  reconnect(ec);
-
253  }
-
254  else
-
255  {
-
256  Json::Value jv;
-
257  jv["command"] = "subscribe";
-
258 
-
259  jv["streams"] = Json::arrayValue;
-
260  Json::Value ledgerStream("ledger");
-
261  jv["streams"].append(ledgerStream);
-
262  Json::Value txnStream("transactions_proposed");
-
263  jv["streams"].append(txnStream);
-
264  Json::Value validationStream("validations");
-
265  jv["streams"].append(validationStream);
-
266  Json::Value manifestStream("manifests");
-
267  jv["streams"].append(manifestStream);
-
268  Json::FastWriter fastWriter;
-
269 
-
270  JLOG(journal_.trace()) << "Sending subscribe stream message";
-
271  // Send the message
-
272  ws_->async_write(
-
273  boost::asio::buffer(fastWriter.write(jv)),
-
274  [this](auto ec, size_t size) { onWrite(ec, size); });
-
275  }
-
276 }
-
277 
-
278 void
-
279 ETLSource::onWrite(boost::beast::error_code ec, size_t bytesWritten)
-
280 {
-
281  JLOG(journal_.trace()) << __func__ << " : ec = " << ec << " - "
-
282  << toString();
-
283  if (ec)
-
284  {
-
285  // start over
-
286  reconnect(ec);
-
287  }
-
288  else
-
289  {
-
290  ws_->async_read(
-
291  readBuffer_, [this](auto ec, size_t size) { onRead(ec, size); });
-
292  }
-
293 }
-
294 
-
295 void
-
296 ETLSource::onRead(boost::beast::error_code ec, size_t size)
-
297 {
-
298  JLOG(journal_.trace()) << __func__ << " : ec = " << ec << " - "
-
299  << toString();
-
300  // if error or error reading message, start over
-
301  if (ec)
-
302  {
-
303  reconnect(ec);
-
304  }
-
305  else
-
306  {
-
307  handleMessage();
-
308  boost::beast::flat_buffer buffer;
-
309  swap(readBuffer_, buffer);
-
310 
-
311  JLOG(journal_.trace())
-
312  << __func__ << " : calling async_read - " << toString();
-
313  ws_->async_read(
-
314  readBuffer_, [this](auto ec, size_t size) { onRead(ec, size); });
-
315  }
-
316 }
-
317 
-
318 bool
-
319 ETLSource::handleMessage()
-
320 {
-
321  JLOG(journal_.trace()) << __func__ << " : " << toString();
-
322 
-
323  setLastMsgTime();
-
324  connected_ = true;
-
325  try
-
326  {
-
327  Json::Value response;
-
328  Json::Reader reader;
-
329  if (!reader.parse(
-
330  static_cast<char const*>(readBuffer_.data().data()), response))
-
331  {
-
332  JLOG(journal_.error())
-
333  << __func__ << " : "
-
334  << "Error parsing stream message."
-
335  << " Message = " << readBuffer_.data().data();
-
336  return false;
-
337  }
-
338 
-
339  uint32_t ledgerIndex = 0;
-
340  if (response.isMember("result"))
-
341  {
-
342  if (response["result"].isMember(jss::ledger_index))
-
343  {
-
344  ledgerIndex = response["result"][jss::ledger_index].asUInt();
-
345  }
-
346  if (response[jss::result].isMember(jss::validated_ledgers))
-
347  {
-
348  setValidatedRange(
-
349  response[jss::result][jss::validated_ledgers].asString());
-
350  }
-
351  JLOG(journal_.debug())
-
352  << __func__ << " : "
-
353  << "Received a message on ledger "
-
354  << " subscription stream. Message : "
-
355  << response.toStyledString() << " - " << toString();
-
356  }
-
357  else
-
358  {
-
359  if (etl_.getETLLoadBalancer().shouldPropagateStream(this))
-
360  {
-
361  if (response.isMember(jss::transaction))
-
362  {
-
363  etl_.getApplication().getOPs().forwardProposedTransaction(
-
364  response);
-
365  }
-
366  else if (
-
367  response.isMember("type") &&
-
368  response["type"] == "validationReceived")
-
369  {
-
370  etl_.getApplication().getOPs().forwardValidation(response);
-
371  }
-
372  else if (
-
373  response.isMember("type") &&
-
374  response["type"] == "manifestReceived")
-
375  {
-
376  etl_.getApplication().getOPs().forwardManifest(response);
-
377  }
-
378  }
-
379 
-
380  if (response.isMember("type") && response["type"] == "ledgerClosed")
-
381  {
-
382  JLOG(journal_.debug())
-
383  << __func__ << " : "
-
384  << "Received a message on ledger "
-
385  << " subscription stream. Message : "
-
386  << response.toStyledString() << " - " << toString();
-
387  if (response.isMember(jss::ledger_index))
-
388  {
-
389  ledgerIndex = response[jss::ledger_index].asUInt();
-
390  }
-
391  if (response.isMember(jss::validated_ledgers))
-
392  {
-
393  setValidatedRange(
-
394  response[jss::validated_ledgers].asString());
-
395  }
-
396  }
-
397  }
-
398 
-
399  if (ledgerIndex != 0)
-
400  {
-
401  JLOG(journal_.trace())
-
402  << __func__ << " : "
-
403  << "Pushing ledger sequence = " << ledgerIndex << " - "
-
404  << toString();
-
405  networkValidatedLedgers_.push(ledgerIndex);
-
406  }
-
407  return true;
-
408  }
-
409  catch (std::exception const& e)
-
410  {
-
411  JLOG(journal_.error()) << "Exception in handleMessage : " << e.what();
-
412  return false;
-
413  }
-
414 }
-
415 
-
416 class AsyncCallData
-
417 {
-
418  std::unique_ptr<org::xrpl::rpc::v1::GetLedgerDataResponse> cur_;
-
419  std::unique_ptr<org::xrpl::rpc::v1::GetLedgerDataResponse> next_;
-
420 
-
421  org::xrpl::rpc::v1::GetLedgerDataRequest request_;
-
422  std::unique_ptr<grpc::ClientContext> context_;
-
423 
-
424  grpc::Status status_;
-
425 
-
426  unsigned char nextPrefix_;
-
427 
-
428  beast::Journal journal_;
-
429 
-
430 public:
-
431  AsyncCallData(
-
432  uint256& marker,
-
433  std::optional<uint256> nextMarker,
-
434  uint32_t seq,
-
435  beast::Journal& j)
-
436  : journal_(j)
-
437  {
-
438  request_.mutable_ledger()->set_sequence(seq);
-
439  if (marker.isNonZero())
-
440  {
-
441  request_.set_marker(marker.data(), marker.size());
-
442  }
-
443  request_.set_user("ETL");
-
444  nextPrefix_ = 0x00;
-
445  if (nextMarker)
-
446  nextPrefix_ = nextMarker->data()[0];
-
447 
-
448  unsigned char prefix = marker.data()[0];
-
449 
-
450  JLOG(journal_.debug())
-
451  << "Setting up AsyncCallData. marker = " << strHex(marker)
-
452  << " . prefix = " << strHex(std::string(1, prefix))
-
453  << " . nextPrefix_ = " << strHex(std::string(1, nextPrefix_));
-
454 
-
455  assert(nextPrefix_ > prefix || nextPrefix_ == 0x00);
-
456 
-
457  cur_ = std::make_unique<org::xrpl::rpc::v1::GetLedgerDataResponse>();
-
458 
-
459  next_ = std::make_unique<org::xrpl::rpc::v1::GetLedgerDataResponse>();
-
460 
-
461  context_ = std::make_unique<grpc::ClientContext>();
-
462  }
-
463 
-
464  enum class CallStatus { MORE, DONE, ERRORED };
-
465  CallStatus
-
466  process(
-
467  std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>& stub,
-
468  grpc::CompletionQueue& cq,
-
469  ThreadSafeQueue<std::shared_ptr<SLE>>& queue,
-
470  bool abort = false)
-
471  {
-
472  JLOG(journal_.debug()) << "Processing calldata";
-
473  if (abort)
-
474  {
-
475  JLOG(journal_.error()) << "AsyncCallData aborted";
-
476  return CallStatus::ERRORED;
-
477  }
-
478  if (!status_.ok())
-
479  {
-
480  JLOG(journal_.debug()) << "AsyncCallData status_ not ok: "
-
481  << " code = " << status_.error_code()
-
482  << " message = " << status_.error_message();
-
483  return CallStatus::ERRORED;
-
484  }
-
485  if (!next_->is_unlimited())
-
486  {
-
487  JLOG(journal_.warn())
-
488  << "AsyncCallData is_unlimited is false. Make sure "
-
489  "secure_gateway is set correctly at the ETL source";
-
490  assert(false);
-
491  }
-
492 
-
493  std::swap(cur_, next_);
-
494 
-
495  bool more = true;
-
496 
-
497  // if no marker returned, we are done
-
498  if (cur_->marker().size() == 0)
-
499  more = false;
-
500 
-
501  // if returned marker is greater than our end, we are done
-
502  unsigned char prefix = cur_->marker()[0];
-
503  if (nextPrefix_ != 0x00 && prefix >= nextPrefix_)
-
504  more = false;
-
505 
-
506  // if we are not done, make the next async call
-
507  if (more)
-
508  {
-
509  request_.set_marker(std::move(cur_->marker()));
-
510  call(stub, cq);
-
511  }
-
512 
-
513  for (auto& obj : cur_->ledger_objects().objects())
-
514  {
-
515  auto key = uint256::fromVoidChecked(obj.key());
-
516  if (!key)
-
517  throw std::runtime_error("Received malformed object ID");
-
518 
-
519  auto& data = obj.data();
-
520 
-
521  SerialIter it{data.data(), data.size()};
-
522  std::shared_ptr<SLE> sle = std::make_shared<SLE>(it, *key);
-
523 
-
524  queue.push(sle);
-
525  }
-
526 
-
527  return more ? CallStatus::MORE : CallStatus::DONE;
-
528  }
-
529 
-
530  void
-
531  call(
-
532  std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>& stub,
-
533  grpc::CompletionQueue& cq)
-
534  {
-
535  context_ = std::make_unique<grpc::ClientContext>();
-
536 
-
537  std::unique_ptr<grpc::ClientAsyncResponseReader<
-
538  org::xrpl::rpc::v1::GetLedgerDataResponse>>
-
539  rpc(stub->PrepareAsyncGetLedgerData(context_.get(), request_, &cq));
-
540 
-
541  rpc->StartCall();
-
542 
-
543  rpc->Finish(next_.get(), &status_, this);
-
544  }
-
545 
-
546  std::string
-
547  getMarkerPrefix()
-
548  {
-
549  if (next_->marker().size() == 0)
-
550  return "";
-
551  else
-
552  return strHex(std::string{next_->marker().data()[0]});
-
553  }
-
554 };
-
555 
-
556 bool
-
557 ETLSource::loadInitialLedger(
-
558  uint32_t sequence,
-
559  ThreadSafeQueue<std::shared_ptr<SLE>>& writeQueue)
-
560 {
-
561  if (!stub_)
-
562  return false;
-
563 
-
564  grpc::CompletionQueue cq;
-
565 
-
566  void* tag;
-
567 
-
568  bool ok = false;
-
569 
-
570  std::vector<AsyncCallData> calls;
-
571  std::vector<uint256> markers{getMarkers(etl_.getNumMarkers())};
-
572 
-
573  for (size_t i = 0; i < markers.size(); ++i)
-
574  {
-
575  std::optional<uint256> nextMarker;
-
576  if (i + 1 < markers.size())
-
577  nextMarker = markers[i + 1];
-
578  calls.emplace_back(markers[i], nextMarker, sequence, journal_);
-
579  }
-
580 
-
581  JLOG(journal_.debug()) << "Starting data download for ledger " << sequence
-
582  << ". Using source = " << toString();
-
583 
-
584  for (auto& c : calls)
-
585  c.call(stub_, cq);
-
586 
-
587  size_t numFinished = 0;
-
588  bool abort = false;
-
589  while (numFinished < calls.size() && !etl_.isStopping() &&
-
590  cq.Next(&tag, &ok))
-
591  {
-
592  assert(tag);
-
593 
-
594  auto ptr = static_cast<AsyncCallData*>(tag);
-
595 
-
596  if (!ok)
-
597  {
-
598  JLOG(journal_.error()) << "loadInitialLedger - ok is false";
-
599  return false;
-
600  // handle cancelled
-
601  }
-
602  else
-
603  {
-
604  JLOG(journal_.debug())
-
605  << "Marker prefix = " << ptr->getMarkerPrefix();
-
606  auto result = ptr->process(stub_, cq, writeQueue, abort);
-
607  if (result != AsyncCallData::CallStatus::MORE)
-
608  {
-
609  numFinished++;
-
610  JLOG(journal_.debug())
-
611  << "Finished a marker. "
-
612  << "Current number of finished = " << numFinished;
-
613  }
-
614  if (result == AsyncCallData::CallStatus::ERRORED)
-
615  {
-
616  abort = true;
-
617  }
-
618  }
-
619  }
-
620  return !abort;
-
621 }
-
622 
-
623 std::pair<grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse>
-
624 ETLSource::fetchLedger(uint32_t ledgerSequence, bool getObjects)
-
625 {
-
626  org::xrpl::rpc::v1::GetLedgerResponse response;
-
627  if (!stub_)
-
628  return {{grpc::StatusCode::INTERNAL, "No Stub"}, response};
-
629 
-
630  // ledger header with txns and metadata
-
631  org::xrpl::rpc::v1::GetLedgerRequest request;
-
632  grpc::ClientContext context;
-
633  request.mutable_ledger()->set_sequence(ledgerSequence);
-
634  request.set_transactions(true);
-
635  request.set_expand(true);
-
636  request.set_get_objects(getObjects);
-
637  request.set_user("ETL");
-
638  grpc::Status status = stub_->GetLedger(&context, request, &response);
-
639  if (status.ok() && !response.is_unlimited())
-
640  {
-
641  JLOG(journal_.warn()) << "ETLSource::fetchLedger - is_unlimited is "
-
642  "false. Make sure secure_gateway is set "
-
643  "correctly on the ETL source. source = "
-
644  << toString();
-
645  assert(false);
-
646  }
-
647  return {status, std::move(response)};
-
648 }
-
649 
-
650 ETLLoadBalancer::ETLLoadBalancer(ReportingETL& etl)
-
651  : etl_(etl)
-
652  , journal_(etl_.getApplication().journal("ReportingETL::LoadBalancer"))
-
653 {
-
654 }
-
655 
-
656 void
-
657 ETLLoadBalancer::add(
-
658  std::string& host,
-
659  std::string& websocketPort,
-
660  std::string& grpcPort)
-
661 {
-
662  std::unique_ptr<ETLSource> ptr =
-
663  std::make_unique<ETLSource>(host, websocketPort, grpcPort, etl_);
-
664  sources_.push_back(std::move(ptr));
-
665  JLOG(journal_.info()) << __func__ << " : added etl source - "
-
666  << sources_.back()->toString();
-
667 }
-
668 
-
669 void
-
670 ETLLoadBalancer::add(std::string& host, std::string& websocketPort)
-
671 {
-
672  std::unique_ptr<ETLSource> ptr =
-
673  std::make_unique<ETLSource>(host, websocketPort, etl_);
-
674  sources_.push_back(std::move(ptr));
-
675  JLOG(journal_.info()) << __func__ << " : added etl source - "
-
676  << sources_.back()->toString();
-
677 }
-
678 
-
679 void
-
680 ETLLoadBalancer::loadInitialLedger(
-
681  uint32_t sequence,
-
682  ThreadSafeQueue<std::shared_ptr<SLE>>& writeQueue)
-
683 {
-
684  execute(
-
685  [this, &sequence, &writeQueue](auto& source) {
-
686  bool res = source->loadInitialLedger(sequence, writeQueue);
-
687  if (!res)
-
688  {
-
689  JLOG(journal_.error()) << "Failed to download initial ledger. "
-
690  << " Sequence = " << sequence
-
691  << " source = " << source->toString();
-
692  }
-
693  return res;
-
694  },
-
695  sequence);
-
696 }
-
697 
-
698 std::optional<org::xrpl::rpc::v1::GetLedgerResponse>
-
699 ETLLoadBalancer::fetchLedger(uint32_t ledgerSequence, bool getObjects)
-
700 {
-
701  org::xrpl::rpc::v1::GetLedgerResponse response;
-
702  bool success = execute(
-
703  [&response, ledgerSequence, getObjects, this](auto& source) {
-
704  auto [status, data] =
-
705  source->fetchLedger(ledgerSequence, getObjects);
-
706  response = std::move(data);
-
707  if (status.ok() && response.validated())
-
708  {
-
709  JLOG(journal_.info())
-
710  << "Successfully fetched ledger = " << ledgerSequence
-
711  << " from source = " << source->toString();
-
712  return true;
-
713  }
-
714  else
-
715  {
-
716  JLOG(journal_.warn())
-
717  << "Error getting ledger = " << ledgerSequence
-
718  << " Reply : " << response.DebugString()
-
719  << " error_code : " << status.error_code()
-
720  << " error_msg : " << status.error_message()
-
721  << " source = " << source->toString();
-
722  return false;
-
723  }
-
724  },
-
725  ledgerSequence);
-
726  if (success)
-
727  return response;
-
728  else
-
729  return {};
-
730 }
-
731 
-
732 std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>
-
733 ETLLoadBalancer::getP2pForwardingStub() const
-
734 {
-
735  if (sources_.size() == 0)
-
736  return nullptr;
-
737  srand((unsigned)time(0));
-
738  auto sourceIdx = rand() % sources_.size();
-
739  auto numAttempts = 0;
-
740  while (numAttempts < sources_.size())
-
741  {
-
742  auto stub = sources_[sourceIdx]->getP2pForwardingStub();
-
743  if (!stub)
-
744  {
-
745  sourceIdx = (sourceIdx + 1) % sources_.size();
-
746  ++numAttempts;
-
747  continue;
-
748  }
-
749  return stub;
-
750  }
-
751  return nullptr;
-
752 }
-
753 
-
754 Json::Value
-
755 ETLLoadBalancer::forwardToP2p(RPC::JsonContext& context) const
-
756 {
-
757  Json::Value res;
-
758  if (sources_.size() == 0)
-
759  return res;
-
760  srand((unsigned)time(0));
-
761  auto sourceIdx = rand() % sources_.size();
-
762  auto numAttempts = 0;
-
763 
-
764  auto mostRecent = etl_.getNetworkValidatedLedgers().tryGetMostRecent();
-
765  while (numAttempts < sources_.size())
-
766  {
-
767  auto increment = [&]() {
-
768  sourceIdx = (sourceIdx + 1) % sources_.size();
-
769  ++numAttempts;
-
770  };
-
771  auto& src = sources_[sourceIdx];
-
772  if (mostRecent && !src->hasLedger(*mostRecent))
-
773  {
-
774  increment();
-
775  continue;
-
776  }
-
777  res = src->forwardToP2p(context);
-
778  if (!res.isMember("forwarded") || res["forwarded"] != true)
-
779  {
-
780  increment();
-
781  continue;
-
782  }
-
783  return res;
-
784  }
-
785  RPC::Status err = {rpcFAILED_TO_FORWARD};
-
786  err.inject(res);
-
787  return res;
-
788 }
-
789 
-
790 std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>
-
791 ETLSource::getP2pForwardingStub() const
-
792 {
-
793  if (!connected_)
-
794  return nullptr;
-
795  try
-
796  {
-
797  return org::xrpl::rpc::v1::XRPLedgerAPIService::NewStub(
-
798  grpc::CreateChannel(
-
799  beast::IP::Endpoint(
-
800  boost::asio::ip::make_address(ip_), std::stoi(grpcPort_))
-
801  .to_string(),
-
802  grpc::InsecureChannelCredentials()));
-
803  }
-
804  catch (std::exception const&)
-
805  {
-
806  JLOG(journal_.error()) << "Failed to create grpc stub";
-
807  return nullptr;
-
808  }
-
809 }
-
810 
-
811 Json::Value
-
812 ETLSource::forwardToP2p(RPC::JsonContext& context) const
-
813 {
-
814  JLOG(journal_.debug()) << "Attempting to forward request to tx. "
-
815  << "request = " << context.params.toStyledString();
-
816 
-
817  Json::Value response;
-
818  if (!connected_)
-
819  {
-
820  JLOG(journal_.error())
-
821  << "Attempted to proxy but failed to connect to tx";
-
822  return response;
-
823  }
-
824  namespace beast = boost::beast; // from <boost/beast.hpp>
-
825  namespace http = beast::http; // from <boost/beast/http.hpp>
-
826  namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
-
827  namespace net = boost::asio; // from <boost/asio.hpp>
-
828  using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
-
829  Json::Value& request = context.params;
-
830  try
-
831  {
-
832  // The io_context is required for all I/O
-
833  net::io_context ioc;
-
834 
-
835  // These objects perform our I/O
-
836  tcp::resolver resolver{ioc};
-
837 
-
838  JLOG(journal_.debug()) << "Creating websocket";
-
839  auto ws = std::make_unique<websocket::stream<tcp::socket>>(ioc);
-
840 
-
841  // Look up the domain name
-
842  auto const results = resolver.resolve(ip_, wsPort_);
-
843 
-
844  JLOG(journal_.debug()) << "Connecting websocket";
-
845  // Make the connection on the IP address we get from a lookup
-
846  net::connect(ws->next_layer(), results.begin(), results.end());
-
847 
-
848  // Set a decorator to change the User-Agent of the handshake
-
849  // and to tell rippled to charge the client IP for RPC
-
850  // resources. See "secure_gateway" in
-
851  // https://github.com/ripple/rippled/blob/develop/cfg/rippled-example.cfg
-
852  ws->set_option(websocket::stream_base::decorator(
-
853  [&context](websocket::request_type& req) {
-
854  req.set(
-
855  http::field::user_agent,
-
856  std::string(BOOST_BEAST_VERSION_STRING) +
-
857  " websocket-client-coro");
-
858  req.set(
-
859  http::field::forwarded,
-
860  "for=" + context.consumer.to_string());
-
861  }));
-
862  JLOG(journal_.debug()) << "client ip: " << context.consumer.to_string();
-
863 
-
864  JLOG(journal_.debug()) << "Performing websocket handshake";
-
865  // Perform the websocket handshake
-
866  ws->handshake(ip_, "/");
-
867 
-
868  Json::FastWriter fastWriter;
-
869 
-
870  JLOG(journal_.debug()) << "Sending request";
-
871  // Send the message
-
872  ws->write(net::buffer(fastWriter.write(request)));
-
873 
-
874  beast::flat_buffer buffer;
-
875  ws->read(buffer);
-
876 
-
877  Json::Reader reader;
-
878  if (!reader.parse(
-
879  static_cast<char const*>(buffer.data().data()), response))
-
880  {
-
881  JLOG(journal_.error()) << "Error parsing response";
-
882  response[jss::error] = "Error parsing response from tx";
-
883  }
-
884  JLOG(journal_.debug()) << "Successfully forward request";
-
885 
-
886  response["forwarded"] = true;
-
887  return response;
-
888  }
-
889  catch (std::exception const& e)
-
890  {
-
891  JLOG(journal_.error()) << "Encountered exception : " << e.what();
-
892  return response;
-
893  }
-
894 }
-
895 
-
896 template <class Func>
-
897 bool
-
898 ETLLoadBalancer::execute(Func f, uint32_t ledgerSequence)
-
899 {
-
900  srand((unsigned)time(0));
-
901  auto sourceIdx = rand() % sources_.size();
-
902  auto numAttempts = 0;
-
903 
-
904  while (!etl_.isStopping())
-
905  {
-
906  auto& source = sources_[sourceIdx];
-
907 
-
908  JLOG(journal_.debug())
-
909  << __func__ << " : "
-
910  << "Attempting to execute func. ledger sequence = "
-
911  << ledgerSequence << " - source = " << source->toString();
-
912  if (source->hasLedger(ledgerSequence))
-
913  {
-
914  bool res = f(source);
-
915  if (res)
-
916  {
-
917  JLOG(journal_.debug())
-
918  << __func__ << " : "
-
919  << "Successfully executed func at source = "
-
920  << source->toString()
-
921  << " - ledger sequence = " << ledgerSequence;
-
922  break;
-
923  }
-
924  else
-
925  {
-
926  JLOG(journal_.warn())
-
927  << __func__ << " : "
-
928  << "Failed to execute func at source = "
-
929  << source->toString()
-
930  << " - ledger sequence = " << ledgerSequence;
-
931  }
-
932  }
-
933  else
-
934  {
-
935  JLOG(journal_.warn())
-
936  << __func__ << " : "
-
937  << "Ledger not present at source = " << source->toString()
-
938  << " - ledger sequence = " << ledgerSequence;
-
939  }
-
940  sourceIdx = (sourceIdx + 1) % sources_.size();
-
941  numAttempts++;
-
942  if (numAttempts % sources_.size() == 0)
-
943  {
-
944  // If another process loaded the ledger into the database, we can
-
945  // abort trying to fetch the ledger from a transaction processing
-
946  // process
-
947  if (etl_.getApplication().getLedgerMaster().getLedgerBySeq(
-
948  ledgerSequence))
-
949  {
-
950  JLOG(journal_.warn())
-
951  << __func__ << " : "
-
952  << "Error executing function. "
-
953  << " Tried all sources, but ledger was found in db."
-
954  << " Sequence = " << ledgerSequence;
-
955  return false;
-
956  }
-
957  JLOG(journal_.error())
-
958  << __func__ << " : "
-
959  << "Error executing function "
-
960  << " - ledger sequence = " << ledgerSequence
-
961  << " - Tried all sources. Sleeping and trying again";
-
962  std::this_thread::sleep_for(std::chrono::seconds(2));
-
963  }
-
964  }
-
965  return !etl_.isStopping();
-
966 }
-
967 
-
968 void
-
969 ETLLoadBalancer::start()
-
970 {
-
971  for (auto& source : sources_)
-
972  source->start();
-
973 }
-
974 
-
975 void
-
976 ETLLoadBalancer::stop()
-
977 {
-
978  for (auto& source : sources_)
-
979  source->stop();
-
980 }
-
981 
-
982 } // namespace ripple
+
22 #include <ripple/json/json_reader.h>
+
23 #include <ripple/json/json_writer.h>
+
24 
+
25 namespace ripple {
+
26 
+
27 // Create ETL source without grpc endpoint
+
28 // Fetch ledger and load initial ledger will fail for this source
+
29 // Primarly used in read-only mode, to monitor when ledgers are validated
+
30 ETLSource::ETLSource(std::string ip, std::string wsPort, ReportingETL& etl)
+
31  : ip_(ip)
+
32  , wsPort_(wsPort)
+
33  , etl_(etl)
+
34  , ioc_(etl.getApplication().getIOService())
+
35  , ws_(std::make_unique<
+
36  boost::beast::websocket::stream<boost::beast::tcp_stream>>(
+
37  boost::asio::make_strand(ioc_)))
+
38  , resolver_(boost::asio::make_strand(ioc_))
+
39  , networkValidatedLedgers_(etl_.getNetworkValidatedLedgers())
+
40  , journal_(etl_.getApplication().journal("ReportingETL::ETLSource"))
+
41  , app_(etl_.getApplication())
+
42  , timer_(ioc_)
+
43 {
+
44 }
+
45 
+
46 ETLSource::ETLSource(
+
47  std::string ip,
+
48  std::string wsPort,
+
49  std::string grpcPort,
+
50  ReportingETL& etl)
+
51  : ip_(ip)
+
52  , wsPort_(wsPort)
+
53  , grpcPort_(grpcPort)
+
54  , etl_(etl)
+
55  , ioc_(etl.getApplication().getIOService())
+
56  , ws_(std::make_unique<
+
57  boost::beast::websocket::stream<boost::beast::tcp_stream>>(
+
58  boost::asio::make_strand(ioc_)))
+
59  , resolver_(boost::asio::make_strand(ioc_))
+
60  , networkValidatedLedgers_(etl_.getNetworkValidatedLedgers())
+
61  , journal_(etl_.getApplication().journal("ReportingETL::ETLSource"))
+
62  , app_(etl_.getApplication())
+
63  , timer_(ioc_)
+
64 {
+
65  std::string connectionString;
+
66  try
+
67  {
+
68  connectionString =
+
69  beast::IP::Endpoint(
+
70  boost::asio::ip::make_address(ip_), std::stoi(grpcPort_))
+
71  .to_string();
+
72 
+
73  JLOG(journal_.info())
+
74  << "Using IP to connect to ETL source: " << connectionString;
+
75  }
+
76  catch (std::exception const&)
+
77  {
+
78  connectionString = "dns:" + ip_ + ":" + grpcPort_;
+
79  JLOG(journal_.info())
+
80  << "Using DNS to connect to ETL source: " << connectionString;
+
81  }
+
82  try
+
83  {
+
84  stub_ = org::xrpl::rpc::v1::XRPLedgerAPIService::NewStub(
+
85  grpc::CreateChannel(
+
86  connectionString, grpc::InsecureChannelCredentials()));
+
87  JLOG(journal_.info()) << "Made stub for remote = " << toString();
+
88  }
+
89  catch (std::exception const& e)
+
90  {
+
91  JLOG(journal_.error()) << "Exception while creating stub = " << e.what()
+
92  << " . Remote = " << toString();
+
93  }
+
94 }
+
95 
+
96 void
+
97 ETLSource::reconnect(boost::beast::error_code ec)
+
98 {
+
99  connected_ = false;
+
100  // These are somewhat normal errors. operation_aborted occurs on shutdown,
+
101  // when the timer is cancelled. connection_refused will occur repeatedly
+
102  // if we cannot connect to the transaction processing process
+
103  if (ec != boost::asio::error::operation_aborted &&
+
104  ec != boost::asio::error::connection_refused)
+
105  {
+
106  JLOG(journal_.error()) << __func__ << " : "
+
107  << "error code = " << ec << " - " << toString();
+
108  }
+
109  else
+
110  {
+
111  JLOG(journal_.warn()) << __func__ << " : "
+
112  << "error code = " << ec << " - " << toString();
+
113  }
+
114 
+
115  if (etl_.isStopping())
+
116  {
+
117  JLOG(journal_.debug()) << __func__ << " : " << toString()
+
118  << " - etl is stopping. aborting reconnect";
+
119  return;
+
120  }
+
121 
+
122  // exponentially increasing timeouts, with a max of 30 seconds
+
123  size_t waitTime = std::min(pow(2, numFailures_), 30.0);
+
124  numFailures_++;
+
125  timer_.expires_after(boost::asio::chrono::seconds(waitTime));
+
126  timer_.async_wait([this, fname = __func__](auto ec) {
+
127  bool startAgain = (ec != boost::asio::error::operation_aborted);
+
128  JLOG(journal_.trace()) << fname << " async_wait : ec = " << ec;
+
129  close(startAgain);
+
130  });
+
131 }
+
132 
+
133 void
+
134 ETLSource::close(bool startAgain)
+
135 {
+
136  timer_.cancel();
+
137  ioc_.post([this, startAgain]() {
+
138  if (closing_)
+
139  return;
+
140 
+
141  if (ws_->is_open())
+
142  {
+
143  // onStop() also calls close(). If the async_close is called twice,
+
144  // an assertion fails. Using closing_ makes sure async_close is only
+
145  // called once
+
146  closing_ = true;
+
147  ws_->async_close(
+
148  boost::beast::websocket::close_code::normal,
+
149  [this, startAgain, fname = __func__](auto ec) {
+
150  if (ec)
+
151  {
+
152  JLOG(journal_.error())
+
153  << fname << " async_close : "
+
154  << "error code = " << ec << " - " << toString();
+
155  }
+
156  closing_ = false;
+
157  if (startAgain)
+
158  start();
+
159  });
+
160  }
+
161  else if (startAgain)
+
162  {
+
163  start();
+
164  }
+
165  });
+
166 }
+
167 
+
168 void
+
169 ETLSource::start()
+
170 {
+
171  JLOG(journal_.trace()) << __func__ << " : " << toString();
+
172 
+
173  auto const host = ip_;
+
174  auto const port = wsPort_;
+
175 
+
176  resolver_.async_resolve(
+
177  host, port, [this](auto ec, auto results) { onResolve(ec, results); });
+
178 }
+
179 
+
180 void
+
181 ETLSource::onResolve(
+
182  boost::beast::error_code ec,
+
183  boost::asio::ip::tcp::resolver::results_type results)
+
184 {
+
185  JLOG(journal_.trace()) << __func__ << " : ec = " << ec << " - "
+
186  << toString();
+
187  if (ec)
+
188  {
+
189  // try again
+
190  reconnect(ec);
+
191  }
+
192  else
+
193  {
+
194  boost::beast::get_lowest_layer(*ws_).expires_after(
+
195  std::chrono::seconds(30));
+
196  boost::beast::get_lowest_layer(*ws_).async_connect(
+
197  results, [this](auto ec, auto ep) { onConnect(ec, ep); });
+
198  }
+
199 }
+
200 
+
201 void
+
202 ETLSource::onConnect(
+
203  boost::beast::error_code ec,
+
204  boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint)
+
205 {
+
206  JLOG(journal_.trace()) << __func__ << " : ec = " << ec << " - "
+
207  << toString();
+
208  if (ec)
+
209  {
+
210  // start over
+
211  reconnect(ec);
+
212  }
+
213  else
+
214  {
+
215  numFailures_ = 0;
+
216  // Turn off timeout on the tcp stream, because websocket stream has it's
+
217  // own timeout system
+
218  boost::beast::get_lowest_layer(*ws_).expires_never();
+
219 
+
220  // Set suggested timeout settings for the websocket
+
221  ws_->set_option(
+
222  boost::beast::websocket::stream_base::timeout::suggested(
+
223  boost::beast::role_type::client));
+
224 
+
225  // Set a decorator to change the User-Agent of the handshake
+
226  ws_->set_option(boost::beast::websocket::stream_base::decorator(
+
227  [](boost::beast::websocket::request_type& req) {
+
228  req.set(
+
229  boost::beast::http::field::user_agent,
+
230  std::string(BOOST_BEAST_VERSION_STRING) +
+
231  " websocket-client-async");
+
232  }));
+
233 
+
234  // Update the host_ string. This will provide the value of the
+
235  // Host HTTP header during the WebSocket handshake.
+
236  // See https://tools.ietf.org/html/rfc7230#section-5.4
+
237  auto host = ip_ + ':' + std::to_string(endpoint.port());
+
238  // Perform the websocket handshake
+
239  ws_->async_handshake(host, "/", [this](auto ec) { onHandshake(ec); });
+
240  }
+
241 }
+
242 
+
243 void
+
244 ETLSource::onHandshake(boost::beast::error_code ec)
+
245 {
+
246  JLOG(journal_.trace()) << __func__ << " : ec = " << ec << " - "
+
247  << toString();
+
248  if (ec)
+
249  {
+
250  // start over
+
251  reconnect(ec);
+
252  }
+
253  else
+
254  {
+
255  Json::Value jv;
+
256  jv["command"] = "subscribe";
+
257 
+
258  jv["streams"] = Json::arrayValue;
+
259  Json::Value ledgerStream("ledger");
+
260  jv["streams"].append(ledgerStream);
+
261  Json::Value txnStream("transactions_proposed");
+
262  jv["streams"].append(txnStream);
+
263  Json::Value validationStream("validations");
+
264  jv["streams"].append(validationStream);
+
265  Json::Value manifestStream("manifests");
+
266  jv["streams"].append(manifestStream);
+
267  Json::FastWriter fastWriter;
+
268 
+
269  JLOG(journal_.trace()) << "Sending subscribe stream message";
+
270  // Send the message
+
271  ws_->async_write(
+
272  boost::asio::buffer(fastWriter.write(jv)),
+
273  [this](auto ec, size_t size) { onWrite(ec, size); });
+
274  }
+
275 }
+
276 
+
277 void
+
278 ETLSource::onWrite(boost::beast::error_code ec, size_t bytesWritten)
+
279 {
+
280  JLOG(journal_.trace()) << __func__ << " : ec = " << ec << " - "
+
281  << toString();
+
282  if (ec)
+
283  {
+
284  // start over
+
285  reconnect(ec);
+
286  }
+
287  else
+
288  {
+
289  ws_->async_read(
+
290  readBuffer_, [this](auto ec, size_t size) { onRead(ec, size); });
+
291  }
+
292 }
+
293 
+
294 void
+
295 ETLSource::onRead(boost::beast::error_code ec, size_t size)
+
296 {
+
297  JLOG(journal_.trace()) << __func__ << " : ec = " << ec << " - "
+
298  << toString();
+
299  // if error or error reading message, start over
+
300  if (ec)
+
301  {
+
302  reconnect(ec);
+
303  }
+
304  else
+
305  {
+
306  handleMessage();
+
307  boost::beast::flat_buffer buffer;
+
308  swap(readBuffer_, buffer);
+
309 
+
310  JLOG(journal_.trace())
+
311  << __func__ << " : calling async_read - " << toString();
+
312  ws_->async_read(
+
313  readBuffer_, [this](auto ec, size_t size) { onRead(ec, size); });
+
314  }
+
315 }
+
316 
+
317 bool
+
318 ETLSource::handleMessage()
+
319 {
+
320  JLOG(journal_.trace()) << __func__ << " : " << toString();
+
321 
+
322  setLastMsgTime();
+
323  connected_ = true;
+
324  try
+
325  {
+
326  Json::Value response;
+
327  Json::Reader reader;
+
328  if (!reader.parse(
+
329  static_cast<char const*>(readBuffer_.data().data()), response))
+
330  {
+
331  JLOG(journal_.error())
+
332  << __func__ << " : "
+
333  << "Error parsing stream message."
+
334  << " Message = " << readBuffer_.data().data();
+
335  return false;
+
336  }
+
337 
+
338  uint32_t ledgerIndex = 0;
+
339  if (response.isMember("result"))
+
340  {
+
341  if (response["result"].isMember(jss::ledger_index))
+
342  {
+
343  ledgerIndex = response["result"][jss::ledger_index].asUInt();
+
344  }
+
345  if (response[jss::result].isMember(jss::validated_ledgers))
+
346  {
+
347  setValidatedRange(
+
348  response[jss::result][jss::validated_ledgers].asString());
+
349  }
+
350  JLOG(journal_.debug())
+
351  << __func__ << " : "
+
352  << "Received a message on ledger "
+
353  << " subscription stream. Message : "
+
354  << response.toStyledString() << " - " << toString();
+
355  }
+
356  else
+
357  {
+
358  if (etl_.getETLLoadBalancer().shouldPropagateStream(this))
+
359  {
+
360  if (response.isMember(jss::transaction))
+
361  {
+
362  etl_.getApplication().getOPs().forwardProposedTransaction(
+
363  response);
+
364  }
+
365  else if (
+
366  response.isMember("type") &&
+
367  response["type"] == "validationReceived")
+
368  {
+
369  etl_.getApplication().getOPs().forwardValidation(response);
+
370  }
+
371  else if (
+
372  response.isMember("type") &&
+
373  response["type"] == "manifestReceived")
+
374  {
+
375  etl_.getApplication().getOPs().forwardManifest(response);
+
376  }
+
377  }
+
378 
+
379  if (response.isMember("type") && response["type"] == "ledgerClosed")
+
380  {
+
381  JLOG(journal_.debug())
+
382  << __func__ << " : "
+
383  << "Received a message on ledger "
+
384  << " subscription stream. Message : "
+
385  << response.toStyledString() << " - " << toString();
+
386  if (response.isMember(jss::ledger_index))
+
387  {
+
388  ledgerIndex = response[jss::ledger_index].asUInt();
+
389  }
+
390  if (response.isMember(jss::validated_ledgers))
+
391  {
+
392  setValidatedRange(
+
393  response[jss::validated_ledgers].asString());
+
394  }
+
395  }
+
396  }
+
397 
+
398  if (ledgerIndex != 0)
+
399  {
+
400  JLOG(journal_.trace())
+
401  << __func__ << " : "
+
402  << "Pushing ledger sequence = " << ledgerIndex << " - "
+
403  << toString();
+
404  networkValidatedLedgers_.push(ledgerIndex);
+
405  }
+
406  return true;
+
407  }
+
408  catch (std::exception const& e)
+
409  {
+
410  JLOG(journal_.error()) << "Exception in handleMessage : " << e.what();
+
411  return false;
+
412  }
+
413 }
+
414 
+
415 class AsyncCallData
+
416 {
+
417  std::unique_ptr<org::xrpl::rpc::v1::GetLedgerDataResponse> cur_;
+
418  std::unique_ptr<org::xrpl::rpc::v1::GetLedgerDataResponse> next_;
+
419 
+
420  org::xrpl::rpc::v1::GetLedgerDataRequest request_;
+
421  std::unique_ptr<grpc::ClientContext> context_;
+
422 
+
423  grpc::Status status_;
+
424 
+
425  unsigned char nextPrefix_;
+
426 
+
427  beast::Journal journal_;
+
428 
+
429 public:
+
430  AsyncCallData(
+
431  uint256& marker,
+
432  std::optional<uint256> nextMarker,
+
433  uint32_t seq,
+
434  beast::Journal& j)
+
435  : journal_(j)
+
436  {
+
437  request_.mutable_ledger()->set_sequence(seq);
+
438  if (marker.isNonZero())
+
439  {
+
440  request_.set_marker(marker.data(), marker.size());
+
441  }
+
442  request_.set_user("ETL");
+
443  nextPrefix_ = 0x00;
+
444  if (nextMarker)
+
445  nextPrefix_ = nextMarker->data()[0];
+
446 
+
447  unsigned char prefix = marker.data()[0];
+
448 
+
449  JLOG(journal_.debug())
+
450  << "Setting up AsyncCallData. marker = " << strHex(marker)
+
451  << " . prefix = " << strHex(std::string(1, prefix))
+
452  << " . nextPrefix_ = " << strHex(std::string(1, nextPrefix_));
+
453 
+
454  assert(nextPrefix_ > prefix || nextPrefix_ == 0x00);
+
455 
+
456  cur_ = std::make_unique<org::xrpl::rpc::v1::GetLedgerDataResponse>();
+
457 
+
458  next_ = std::make_unique<org::xrpl::rpc::v1::GetLedgerDataResponse>();
+
459 
+
460  context_ = std::make_unique<grpc::ClientContext>();
+
461  }
+
462 
+
463  enum class CallStatus { MORE, DONE, ERRORED };
+
464  CallStatus
+
465  process(
+
466  std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>& stub,
+
467  grpc::CompletionQueue& cq,
+
468  ThreadSafeQueue<std::shared_ptr<SLE>>& queue,
+
469  bool abort = false)
+
470  {
+
471  JLOG(journal_.debug()) << "Processing calldata";
+
472  if (abort)
+
473  {
+
474  JLOG(journal_.error()) << "AsyncCallData aborted";
+
475  return CallStatus::ERRORED;
+
476  }
+
477  if (!status_.ok())
+
478  {
+
479  JLOG(journal_.debug()) << "AsyncCallData status_ not ok: "
+
480  << " code = " << status_.error_code()
+
481  << " message = " << status_.error_message();
+
482  return CallStatus::ERRORED;
+
483  }
+
484  if (!next_->is_unlimited())
+
485  {
+
486  JLOG(journal_.warn())
+
487  << "AsyncCallData is_unlimited is false. Make sure "
+
488  "secure_gateway is set correctly at the ETL source";
+
489  assert(false);
+
490  }
+
491 
+
492  std::swap(cur_, next_);
+
493 
+
494  bool more = true;
+
495 
+
496  // if no marker returned, we are done
+
497  if (cur_->marker().size() == 0)
+
498  more = false;
+
499 
+
500  // if returned marker is greater than our end, we are done
+
501  unsigned char prefix = cur_->marker()[0];
+
502  if (nextPrefix_ != 0x00 && prefix >= nextPrefix_)
+
503  more = false;
+
504 
+
505  // if we are not done, make the next async call
+
506  if (more)
+
507  {
+
508  request_.set_marker(std::move(cur_->marker()));
+
509  call(stub, cq);
+
510  }
+
511 
+
512  for (auto& obj : cur_->ledger_objects().objects())
+
513  {
+
514  auto key = uint256::fromVoidChecked(obj.key());
+
515  if (!key)
+
516  throw std::runtime_error("Received malformed object ID");
+
517 
+
518  auto& data = obj.data();
+
519 
+
520  SerialIter it{data.data(), data.size()};
+
521  std::shared_ptr<SLE> sle = std::make_shared<SLE>(it, *key);
+
522 
+
523  queue.push(sle);
+
524  }
+
525 
+
526  return more ? CallStatus::MORE : CallStatus::DONE;
+
527  }
+
528 
+
529  void
+
530  call(
+
531  std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>& stub,
+
532  grpc::CompletionQueue& cq)
+
533  {
+
534  context_ = std::make_unique<grpc::ClientContext>();
+
535 
+
536  std::unique_ptr<grpc::ClientAsyncResponseReader<
+
537  org::xrpl::rpc::v1::GetLedgerDataResponse>>
+
538  rpc(stub->PrepareAsyncGetLedgerData(context_.get(), request_, &cq));
+
539 
+
540  rpc->StartCall();
+
541 
+
542  rpc->Finish(next_.get(), &status_, this);
+
543  }
+
544 
+
545  std::string
+
546  getMarkerPrefix()
+
547  {
+
548  if (next_->marker().size() == 0)
+
549  return "";
+
550  else
+
551  return strHex(std::string{next_->marker().data()[0]});
+
552  }
+
553 };
+
554 
+
555 bool
+
556 ETLSource::loadInitialLedger(
+
557  uint32_t sequence,
+
558  ThreadSafeQueue<std::shared_ptr<SLE>>& writeQueue)
+
559 {
+
560  if (!stub_)
+
561  return false;
+
562 
+
563  grpc::CompletionQueue cq;
+
564 
+
565  void* tag;
+
566 
+
567  bool ok = false;
+
568 
+
569  std::vector<AsyncCallData> calls;
+
570  std::vector<uint256> markers{getMarkers(etl_.getNumMarkers())};
+
571 
+
572  for (size_t i = 0; i < markers.size(); ++i)
+
573  {
+
574  std::optional<uint256> nextMarker;
+
575  if (i + 1 < markers.size())
+
576  nextMarker = markers[i + 1];
+
577  calls.emplace_back(markers[i], nextMarker, sequence, journal_);
+
578  }
+
579 
+
580  JLOG(journal_.debug()) << "Starting data download for ledger " << sequence
+
581  << ". Using source = " << toString();
+
582 
+
583  for (auto& c : calls)
+
584  c.call(stub_, cq);
+
585 
+
586  size_t numFinished = 0;
+
587  bool abort = false;
+
588  while (numFinished < calls.size() && !etl_.isStopping() &&
+
589  cq.Next(&tag, &ok))
+
590  {
+
591  assert(tag);
+
592 
+
593  auto ptr = static_cast<AsyncCallData*>(tag);
+
594 
+
595  if (!ok)
+
596  {
+
597  JLOG(journal_.error()) << "loadInitialLedger - ok is false";
+
598  return false;
+
599  // handle cancelled
+
600  }
+
601  else
+
602  {
+
603  JLOG(journal_.debug())
+
604  << "Marker prefix = " << ptr->getMarkerPrefix();
+
605  auto result = ptr->process(stub_, cq, writeQueue, abort);
+
606  if (result != AsyncCallData::CallStatus::MORE)
+
607  {
+
608  numFinished++;
+
609  JLOG(journal_.debug())
+
610  << "Finished a marker. "
+
611  << "Current number of finished = " << numFinished;
+
612  }
+
613  if (result == AsyncCallData::CallStatus::ERRORED)
+
614  {
+
615  abort = true;
+
616  }
+
617  }
+
618  }
+
619  return !abort;
+
620 }
+
621 
+
622 std::pair<grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse>
+
623 ETLSource::fetchLedger(uint32_t ledgerSequence, bool getObjects)
+
624 {
+
625  org::xrpl::rpc::v1::GetLedgerResponse response;
+
626  if (!stub_)
+
627  return {{grpc::StatusCode::INTERNAL, "No Stub"}, response};
+
628 
+
629  // ledger header with txns and metadata
+
630  org::xrpl::rpc::v1::GetLedgerRequest request;
+
631  grpc::ClientContext context;
+
632  request.mutable_ledger()->set_sequence(ledgerSequence);
+
633  request.set_transactions(true);
+
634  request.set_expand(true);
+
635  request.set_get_objects(getObjects);
+
636  request.set_user("ETL");
+
637  grpc::Status status = stub_->GetLedger(&context, request, &response);
+
638  if (status.ok() && !response.is_unlimited())
+
639  {
+
640  JLOG(journal_.warn()) << "ETLSource::fetchLedger - is_unlimited is "
+
641  "false. Make sure secure_gateway is set "
+
642  "correctly on the ETL source. source = "
+
643  << toString();
+
644  assert(false);
+
645  }
+
646  return {status, std::move(response)};
+
647 }
+
648 
+
649 ETLLoadBalancer::ETLLoadBalancer(ReportingETL& etl)
+
650  : etl_(etl)
+
651  , journal_(etl_.getApplication().journal("ReportingETL::LoadBalancer"))
+
652 {
+
653 }
+
654 
+
655 void
+
656 ETLLoadBalancer::add(
+
657  std::string& host,
+
658  std::string& websocketPort,
+
659  std::string& grpcPort)
+
660 {
+
661  std::unique_ptr<ETLSource> ptr =
+
662  std::make_unique<ETLSource>(host, websocketPort, grpcPort, etl_);
+
663  sources_.push_back(std::move(ptr));
+
664  JLOG(journal_.info()) << __func__ << " : added etl source - "
+
665  << sources_.back()->toString();
+
666 }
+
667 
+
668 void
+
669 ETLLoadBalancer::add(std::string& host, std::string& websocketPort)
+
670 {
+
671  std::unique_ptr<ETLSource> ptr =
+
672  std::make_unique<ETLSource>(host, websocketPort, etl_);
+
673  sources_.push_back(std::move(ptr));
+
674  JLOG(journal_.info()) << __func__ << " : added etl source - "
+
675  << sources_.back()->toString();
+
676 }
+
677 
+
678 void
+
679 ETLLoadBalancer::loadInitialLedger(
+
680  uint32_t sequence,
+
681  ThreadSafeQueue<std::shared_ptr<SLE>>& writeQueue)
+
682 {
+
683  execute(
+
684  [this, &sequence, &writeQueue](auto& source) {
+
685  bool res = source->loadInitialLedger(sequence, writeQueue);
+
686  if (!res)
+
687  {
+
688  JLOG(journal_.error()) << "Failed to download initial ledger. "
+
689  << " Sequence = " << sequence
+
690  << " source = " << source->toString();
+
691  }
+
692  return res;
+
693  },
+
694  sequence);
+
695 }
+
696 
+
697 std::optional<org::xrpl::rpc::v1::GetLedgerResponse>
+
698 ETLLoadBalancer::fetchLedger(uint32_t ledgerSequence, bool getObjects)
+
699 {
+
700  org::xrpl::rpc::v1::GetLedgerResponse response;
+
701  bool success = execute(
+
702  [&response, ledgerSequence, getObjects, this](auto& source) {
+
703  auto [status, data] =
+
704  source->fetchLedger(ledgerSequence, getObjects);
+
705  response = std::move(data);
+
706  if (status.ok() && response.validated())
+
707  {
+
708  JLOG(journal_.info())
+
709  << "Successfully fetched ledger = " << ledgerSequence
+
710  << " from source = " << source->toString();
+
711  return true;
+
712  }
+
713  else
+
714  {
+
715  JLOG(journal_.warn())
+
716  << "Error getting ledger = " << ledgerSequence
+
717  << " Reply : " << response.DebugString()
+
718  << " error_code : " << status.error_code()
+
719  << " error_msg : " << status.error_message()
+
720  << " source = " << source->toString();
+
721  return false;
+
722  }
+
723  },
+
724  ledgerSequence);
+
725  if (success)
+
726  return response;
+
727  else
+
728  return {};
+
729 }
+
730 
+
731 std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>
+
732 ETLLoadBalancer::getP2pForwardingStub() const
+
733 {
+
734  if (sources_.size() == 0)
+
735  return nullptr;
+
736  srand((unsigned)time(0));
+
737  auto sourceIdx = rand() % sources_.size();
+
738  auto numAttempts = 0;
+
739  while (numAttempts < sources_.size())
+
740  {
+
741  auto stub = sources_[sourceIdx]->getP2pForwardingStub();
+
742  if (!stub)
+
743  {
+
744  sourceIdx = (sourceIdx + 1) % sources_.size();
+
745  ++numAttempts;
+
746  continue;
+
747  }
+
748  return stub;
+
749  }
+
750  return nullptr;
+
751 }
+
752 
+
753 Json::Value
+
754 ETLLoadBalancer::forwardToP2p(RPC::JsonContext& context) const
+
755 {
+
756  Json::Value res;
+
757  if (sources_.size() == 0)
+
758  return res;
+
759  srand((unsigned)time(0));
+
760  auto sourceIdx = rand() % sources_.size();
+
761  auto numAttempts = 0;
+
762 
+
763  auto mostRecent = etl_.getNetworkValidatedLedgers().tryGetMostRecent();
+
764  while (numAttempts < sources_.size())
+
765  {
+
766  auto increment = [&]() {
+
767  sourceIdx = (sourceIdx + 1) % sources_.size();
+
768  ++numAttempts;
+
769  };
+
770  auto& src = sources_[sourceIdx];
+
771  if (mostRecent && !src->hasLedger(*mostRecent))
+
772  {
+
773  increment();
+
774  continue;
+
775  }
+
776  res = src->forwardToP2p(context);
+
777  if (!res.isMember("forwarded") || res["forwarded"] != true)
+
778  {
+
779  increment();
+
780  continue;
+
781  }
+
782  return res;
+
783  }
+
784  RPC::Status err = {rpcFAILED_TO_FORWARD};
+
785  err.inject(res);
+
786  return res;
+
787 }
+
788 
+
789 std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>
+
790 ETLSource::getP2pForwardingStub() const
+
791 {
+
792  if (!connected_)
+
793  return nullptr;
+
794  try
+
795  {
+
796  return org::xrpl::rpc::v1::XRPLedgerAPIService::NewStub(
+
797  grpc::CreateChannel(
+
798  beast::IP::Endpoint(
+
799  boost::asio::ip::make_address(ip_), std::stoi(grpcPort_))
+
800  .to_string(),
+
801  grpc::InsecureChannelCredentials()));
+
802  }
+
803  catch (std::exception const&)
+
804  {
+
805  JLOG(journal_.error()) << "Failed to create grpc stub";
+
806  return nullptr;
+
807  }
+
808 }
+
809 
+
810 Json::Value
+
811 ETLSource::forwardToP2p(RPC::JsonContext& context) const
+
812 {
+
813  JLOG(journal_.debug()) << "Attempting to forward request to tx. "
+
814  << "request = " << context.params.toStyledString();
+
815 
+
816  Json::Value response;
+
817  if (!connected_)
+
818  {
+
819  JLOG(journal_.error())
+
820  << "Attempted to proxy but failed to connect to tx";
+
821  return response;
+
822  }
+
823  namespace beast = boost::beast; // from <boost/beast.hpp>
+
824  namespace http = beast::http; // from <boost/beast/http.hpp>
+
825  namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
+
826  namespace net = boost::asio; // from <boost/asio.hpp>
+
827  using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
+
828  Json::Value& request = context.params;
+
829  try
+
830  {
+
831  // The io_context is required for all I/O
+
832  net::io_context ioc;
+
833 
+
834  // These objects perform our I/O
+
835  tcp::resolver resolver{ioc};
+
836 
+
837  JLOG(journal_.debug()) << "Creating websocket";
+
838  auto ws = std::make_unique<websocket::stream<tcp::socket>>(ioc);
+
839 
+
840  // Look up the domain name
+
841  auto const results = resolver.resolve(ip_, wsPort_);
+
842 
+
843  JLOG(journal_.debug()) << "Connecting websocket";
+
844  // Make the connection on the IP address we get from a lookup
+
845  net::connect(ws->next_layer(), results.begin(), results.end());
+
846 
+
847  // Set a decorator to change the User-Agent of the handshake
+
848  // and to tell rippled to charge the client IP for RPC
+
849  // resources. See "secure_gateway" in
+
850  // https://github.com/ripple/rippled/blob/develop/cfg/rippled-example.cfg
+
851  ws->set_option(websocket::stream_base::decorator(
+
852  [&context](websocket::request_type& req) {
+
853  req.set(
+
854  http::field::user_agent,
+
855  std::string(BOOST_BEAST_VERSION_STRING) +
+
856  " websocket-client-coro");
+
857  req.set(
+
858  http::field::forwarded,
+
859  "for=" + context.consumer.to_string());
+
860  }));
+
861  JLOG(journal_.debug()) << "client ip: " << context.consumer.to_string();
+
862 
+
863  JLOG(journal_.debug()) << "Performing websocket handshake";
+
864  // Perform the websocket handshake
+
865  ws->handshake(ip_, "/");
+
866 
+
867  Json::FastWriter fastWriter;
+
868 
+
869  JLOG(journal_.debug()) << "Sending request";
+
870  // Send the message
+
871  ws->write(net::buffer(fastWriter.write(request)));
+
872 
+
873  beast::flat_buffer buffer;
+
874  ws->read(buffer);
+
875 
+
876  Json::Reader reader;
+
877  if (!reader.parse(
+
878  static_cast<char const*>(buffer.data().data()), response))
+
879  {
+
880  JLOG(journal_.error()) << "Error parsing response";
+
881  response[jss::error] = "Error parsing response from tx";
+
882  }
+
883  JLOG(journal_.debug()) << "Successfully forward request";
+
884 
+
885  response["forwarded"] = true;
+
886  return response;
+
887  }
+
888  catch (std::exception const& e)
+
889  {
+
890  JLOG(journal_.error()) << "Encountered exception : " << e.what();
+
891  return response;
+
892  }
+
893 }
+
894 
+
895 template <class Func>
+
896 bool
+
897 ETLLoadBalancer::execute(Func f, uint32_t ledgerSequence)
+
898 {
+
899  srand((unsigned)time(0));
+
900  auto sourceIdx = rand() % sources_.size();
+
901  auto numAttempts = 0;
+
902 
+
903  while (!etl_.isStopping())
+
904  {
+
905  auto& source = sources_[sourceIdx];
+
906 
+
907  JLOG(journal_.debug())
+
908  << __func__ << " : "
+
909  << "Attempting to execute func. ledger sequence = "
+
910  << ledgerSequence << " - source = " << source->toString();
+
911  if (source->hasLedger(ledgerSequence))
+
912  {
+
913  bool res = f(source);
+
914  if (res)
+
915  {
+
916  JLOG(journal_.debug())
+
917  << __func__ << " : "
+
918  << "Successfully executed func at source = "
+
919  << source->toString()
+
920  << " - ledger sequence = " << ledgerSequence;
+
921  break;
+
922  }
+
923  else
+
924  {
+
925  JLOG(journal_.warn())
+
926  << __func__ << " : "
+
927  << "Failed to execute func at source = "
+
928  << source->toString()
+
929  << " - ledger sequence = " << ledgerSequence;
+
930  }
+
931  }
+
932  else
+
933  {
+
934  JLOG(journal_.warn())
+
935  << __func__ << " : "
+
936  << "Ledger not present at source = " << source->toString()
+
937  << " - ledger sequence = " << ledgerSequence;
+
938  }
+
939  sourceIdx = (sourceIdx + 1) % sources_.size();
+
940  numAttempts++;
+
941  if (numAttempts % sources_.size() == 0)
+
942  {
+
943  // If another process loaded the ledger into the database, we can
+
944  // abort trying to fetch the ledger from a transaction processing
+
945  // process
+
946  if (etl_.getApplication().getLedgerMaster().getLedgerBySeq(
+
947  ledgerSequence))
+
948  {
+
949  JLOG(journal_.warn())
+
950  << __func__ << " : "
+
951  << "Error executing function. "
+
952  << " Tried all sources, but ledger was found in db."
+
953  << " Sequence = " << ledgerSequence;
+
954  return false;
+
955  }
+
956  JLOG(journal_.error())
+
957  << __func__ << " : "
+
958  << "Error executing function "
+
959  << " - ledger sequence = " << ledgerSequence
+
960  << " - Tried all sources. Sleeping and trying again";
+
961  std::this_thread::sleep_for(std::chrono::seconds(2));
+
962  }
+
963  }
+
964  return !etl_.isStopping();
+
965 }
+
966 
+
967 void
+
968 ETLLoadBalancer::start()
+
969 {
+
970  for (auto& source : sources_)
+
971  source->start();
+
972 }
+
973 
+
974 void
+
975 ETLLoadBalancer::stop()
+
976 {
+
977  for (auto& source : sources_)
+
978  source->stop();
+
979 }
+
980 
+
981 } // namespace ripple
-
std::unique_ptr< grpc::ClientContext > context_
Definition: ETLSource.cpp:422
+
std::unique_ptr< grpc::ClientContext > context_
Definition: ETLSource.cpp:421
Definition: Context.h:53
T sleep_for(T... args)
STL class.
STL class.
-
CallStatus
Definition: ETLSource.cpp:464
+
CallStatus
Definition: ETLSource.cpp:463
Generic thread-safe queue with an optional maximum size Note, we can't use a lockfree queue here,...
Definition: ETLHelpers.h:115
STL class.
bool isNonZero() const
Definition: base_uint.h:537
Stream trace() const
Severity stream access functions.
Definition: Journal.h:309
-
Json::Value forwardToP2p(RPC::JsonContext &context) const
Forward a JSON RPC request to a randomly selected p2p node.
Definition: ETLSource.cpp:755
-
void start()
Setup all of the ETL sources and subscribe to the necessary streams.
Definition: ETLSource.cpp:969
+
Json::Value forwardToP2p(RPC::JsonContext &context) const
Forward a JSON RPC request to a randomly selected p2p node.
Definition: ETLSource.cpp:754
+
void start()
Setup all of the ETL sources and subscribe to the necessary streams.
Definition: ETLSource.cpp:968
@ arrayValue
array value (ordered list)
Definition: json_value.h:42
std::string to_string() const
Returns a string representing the endpoint.
Definition: IPEndpoint.cpp:57
STL class.
ReportingETL & etl_
Definition: ETLSource.h:54
-
std::optional< org::xrpl::rpc::v1::GetLedgerResponse > fetchLedger(uint32_t ledgerSequence, bool getObjects)
Fetch data for a specific ledger.
Definition: ETLSource.cpp:699
+
std::optional< org::xrpl::rpc::v1::GetLedgerResponse > fetchLedger(uint32_t ledgerSequence, bool getObjects)
Fetch data for a specific ledger.
Definition: ETLSource.cpp:698
Stream warn() const
Definition: Journal.h:327
T get(T... args)
std::string ip_
Definition: ETLSource.h:48
-
grpc::Status status_
Definition: ETLSource.cpp:424
-
ETLSource(std::string ip, std::string wsPort, ReportingETL &etl)
Create ETL source without gRPC endpoint Fetch ledger and load initial ledger will fail for this sourc...
Definition: ETLSource.cpp:31
-
beast::Journal journal_
Definition: ETLSource.cpp:428
+
grpc::Status status_
Definition: ETLSource.cpp:423
+
ETLSource(std::string ip, std::string wsPort, ReportingETL &etl)
Create ETL source without gRPC endpoint Fetch ledger and load initial ledger will fail for this sourc...
Definition: ETLSource.cpp:30
+
beast::Journal journal_
Definition: ETLSource.cpp:427
Definition: IPAddress.h:103
std::string toStyledString() const
Application & getApplication()
Definition: ReportingETL.h:298
Unserialize a JSON document into a Value.
Definition: json_reader.h:36
-
std::unique_ptr< org::xrpl::rpc::v1::XRPLedgerAPIService::Stub > getP2pForwardingStub() const
Randomly select a p2p node to forward a gRPC request to.
Definition: ETLSource.cpp:733
+
std::unique_ptr< org::xrpl::rpc::v1::XRPLedgerAPIService::Stub > getP2pForwardingStub() const
Randomly select a p2p node to forward a gRPC request to.
Definition: ETLSource.cpp:732
pointer data()
Definition: base_uint.h:122
constexpr static std::size_t size()
Definition: base_uint.h:519
-
std::string getMarkerPrefix()
Definition: ETLSource.cpp:547
+
std::string getMarkerPrefix()
Definition: ETLSource.cpp:546
std::string to_string() const
Return a human readable string uniquely identifying this consumer.
Definition: Consumer.cpp:71
T stoi(T... args)
@@ -1094,19 +1093,19 @@ $(function() {
std::atomic_bool connected_
Definition: ETLSource.h:83
std::unique_ptr< org::xrpl::rpc::v1::XRPLedgerAPIService::Stub > stub_
Definition: ETLSource.h:59
virtual LedgerMaster & getLedgerMaster()=0
-
void stop()
Definition: ETLSource.cpp:976
-
void loadInitialLedger(uint32_t sequence, ThreadSafeQueue< std::shared_ptr< SLE >> &writeQueue)
Load the initial ledger, writing data to the queue.
Definition: ETLSource.cpp:680
-
unsigned char nextPrefix_
Definition: ETLSource.cpp:426
-
void reconnect(boost::beast::error_code ec)
Attempt to reconnect to the ETL source.
Definition: ETLSource.cpp:98
+
void stop()
Definition: ETLSource.cpp:975
+
void loadInitialLedger(uint32_t sequence, ThreadSafeQueue< std::shared_ptr< SLE >> &writeQueue)
Load the initial ledger, writing data to the queue.
Definition: ETLSource.cpp:679
+
unsigned char nextPrefix_
Definition: ETLSource.cpp:425
+
void reconnect(boost::beast::error_code ec)
Attempt to reconnect to the ETL source.
Definition: ETLSource.cpp:97
NetworkValidatedLedgers & getNetworkValidatedLedgers()
Definition: ReportingETL.h:276
-
AsyncCallData(uint256 &marker, std::optional< uint256 > nextMarker, uint32_t seq, beast::Journal &j)
Definition: ETLSource.cpp:431
-
bool execute(Func f, uint32_t ledgerSequence)
f is a function that takes an ETLSource as an argument and returns a bool.
Definition: ETLSource.cpp:898
+
AsyncCallData(uint256 &marker, std::optional< uint256 > nextMarker, uint32_t seq, beast::Journal &j)
Definition: ETLSource.cpp:430
+
bool execute(Func f, uint32_t ledgerSequence)
f is a function that takes an ETLSource as an argument and returns a bool.
Definition: ETLSource.cpp:897
T to_string(T... args)
-
std::unique_ptr< org::xrpl::rpc::v1::GetLedgerDataResponse > cur_
Definition: ETLSource.cpp:418
+
std::unique_ptr< org::xrpl::rpc::v1::GetLedgerDataResponse > cur_
Definition: ETLSource.cpp:417
Definition: Overlay.h:41
Stream error() const
Definition: Journal.h:333
Stream info() const
Definition: Journal.h:321
-
CallStatus process(std::unique_ptr< org::xrpl::rpc::v1::XRPLedgerAPIService::Stub > &stub, grpc::CompletionQueue &cq, ThreadSafeQueue< std::shared_ptr< SLE >> &queue, bool abort=false)
Definition: ETLSource.cpp:466
+
CallStatus process(std::unique_ptr< org::xrpl::rpc::v1::XRPLedgerAPIService::Stub > &stub, grpc::CompletionQueue &cq, ThreadSafeQueue< std::shared_ptr< SLE >> &queue, bool abort=false)
Definition: ETLSource.cpp:465
std::shared_ptr< Ledger const > getLedgerBySeq(std::uint32_t index)
This class is responsible for continuously extracting data from a p2p node, and writing that data to ...
Definition: ReportingETL.h:70
std::string grpcPort_
Definition: ETLSource.h:52
@@ -1114,22 +1113,22 @@ $(function() {
Definition: Serializer.h:311
bool isMember(const char *key) const
Return true if the object has a member named key.
Definition: json_value.cpp:932
A generic endpoint for log messages.
Definition: Journal.h:58
-
Definition: ETLSource.cpp:416
+
Definition: ETLSource.cpp:415
std::optional< uint32_t > tryGetMostRecent() const
Get most recently validated sequence.
Definition: ETLHelpers.h:78
std::string write(const Value &root) override
-
void call(std::unique_ptr< org::xrpl::rpc::v1::XRPLedgerAPIService::Stub > &stub, grpc::CompletionQueue &cq)
Definition: ETLSource.cpp:531
+
void call(std::unique_ptr< org::xrpl::rpc::v1::XRPLedgerAPIService::Stub > &stub, grpc::CompletionQueue &cq)
Definition: ETLSource.cpp:530
void inject(Object &object) const
Apply the Status to a JsonObject.
Definition: Status.h:115
Status represents the results of an operation that might fail.
Definition: Status.h:39
std::vector< std::unique_ptr< ETLSource > > sources_
Definition: ETLSource.h:322
beast::Journal journal_
Definition: ETLSource.h:73
-
std::unique_ptr< org::xrpl::rpc::v1::XRPLedgerAPIService::Stub > getP2pForwardingStub() const
Get grpc stub to forward requests to p2p node.
Definition: ETLSource.cpp:791
+
std::unique_ptr< org::xrpl::rpc::v1::XRPLedgerAPIService::Stub > getP2pForwardingStub() const
Get grpc stub to forward requests to p2p node.
Definition: ETLSource.cpp:790
T swap(T... args)
T min(T... args)
size_t numFailures_
Definition: ETLSource.h:79
boost::asio::io_context & ioc_
Definition: ETLSource.h:57
T emplace_back(T... args)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
-
void start()
Begin sequence of operations to connect to the ETL source and subscribe to ledgers and transactions_p...
Definition: ETLSource.cpp:170
+
void start()
Begin sequence of operations to connect to the ETL source and subscribe to ledgers and transactions_p...
Definition: ETLSource.cpp:169
std::unique_ptr< boost::beast::websocket::stream< boost::beast::tcp_stream > > ws_
Definition: ETLSource.h:62
STL namespace.
std::atomic_bool closing_
Definition: ETLSource.h:81
@@ -1138,26 +1137,26 @@ $(function() {
Outputs a Value in JSON format without formatting (not human friendly).
Definition: json_writer.h:52
bool parse(std::string const &document, Value &root)
Read a Value from a JSON document.
Definition: json_reader.cpp:74
@ rpcFAILED_TO_FORWARD
Definition: ErrorCodes.h:140
-
std::unique_ptr< org::xrpl::rpc::v1::GetLedgerDataResponse > next_
Definition: ETLSource.cpp:419
+
std::unique_ptr< org::xrpl::rpc::v1::GetLedgerDataResponse > next_
Definition: ETLSource.cpp:418
std::string toString() const
Definition: ETLSource.h:222
Stream debug() const
Definition: Journal.h:315
std::string to_string(Manifest const &m)
Format the specified manifest to a string for debugging purposes.
A version-independent IP address and port combination.
Definition: IPEndpoint.h:38
std::string strHex(FwdIt begin, FwdIt end)
Definition: strHex.h:30
-
void close(bool startAgain)
Close the websocket.
Definition: ETLSource.cpp:135
-
org::xrpl::rpc::v1::GetLedgerDataRequest request_
Definition: ETLSource.cpp:421
+
void close(bool startAgain)
Close the websocket.
Definition: ETLSource.cpp:134
+
org::xrpl::rpc::v1::GetLedgerDataRequest request_
Definition: ETLSource.cpp:420
boost::asio::steady_timer timer_
Definition: ETLSource.h:96
Json::Value params
Definition: Context.h:64
bool isStopping() const
Definition: ReportingETL.h:282
T what(T... args)
-
Json::Value forwardToP2p(RPC::JsonContext &context) const
Forward a JSON RPC request to a p2p node.
Definition: ETLSource.cpp:812
+
Json::Value forwardToP2p(RPC::JsonContext &context) const
Forward a JSON RPC request to a p2p node.
Definition: ETLSource.cpp:811
std::vector< uint256 > getMarkers(size_t numMarkers)
Parititions the uint256 keyspace into numMarkers partitions, each of equal size.
Definition: ETLHelpers.h:177
Represents a JSON value.
Definition: json_value.h:145
std::string wsPort_
Definition: ETLSource.h:50
Definition: base_uint.h:641
-
void add(std::string &host, std::string &websocketPort, std::string &grpcPort)
Add an ETL source.
Definition: ETLSource.cpp:657
+
void add(std::string &host, std::string &websocketPort, std::string &grpcPort)
Add an ETL source.
Definition: ETLSource.cpp:656