rippled
Loading...
Searching...
No Matches
LoadManager.cpp
1#include <xrpld/app/main/Application.h>
2#include <xrpld/app/main/LoadManager.h>
3#include <xrpld/app/misc/LoadFeeTrack.h>
4#include <xrpld/app/misc/NetworkOPs.h>
5
6#include <xrpl/beast/core/CurrentThreadName.h>
7#include <xrpl/json/to_string.h>
8
9#include <memory>
10#include <mutex>
11#include <thread>
12
13namespace xrpl {
14
16 : app_(app), journal_(journal), lastHeartbeat_(), armed_(false)
17{
18}
19
21{
22 try
23 {
24 stop();
25 }
26 catch (std::exception const& ex)
27 {
28 // Swallow the exception in a destructor.
29 JLOG(journal_.warn()) << "std::exception in ~LoadManager. " << ex.what();
30 }
31}
32
33//------------------------------------------------------------------------------
34
35void
42
43void
50
51//------------------------------------------------------------------------------
52
53void
55{
56 JLOG(journal_.debug()) << "Starting";
57 XRPL_ASSERT(!thread_.joinable(), "xrpl::LoadManager::start : thread not joinable");
58
60}
61
62void
64{
65 {
67 stop_ = true;
68 // There is at most one thread waiting on this condition.
70 }
71 if (thread_.joinable())
72 {
73 JLOG(journal_.debug()) << "Stopping";
74 thread_.join();
75 }
76}
77
78//------------------------------------------------------------------------------
79
80void
82{
83 beast::setCurrentThreadName("LoadManager");
84
85 using namespace std::chrono_literals;
86 using clock_type = std::chrono::steady_clock;
87
88 auto t = clock_type::now();
89
90 while (true)
91 {
92 t += 1s;
93
95 if (cv_.wait_until(sl, t, [this] { return stop_; }))
96 break;
97
98 // Copy out shared data under a lock. Use copies outside lock.
99 auto const lastHeartbeat = lastHeartbeat_;
100 auto const armed = armed_;
101 sl.unlock();
102
103 // Measure the amount of time we have been stalled, in seconds.
104 using namespace std::chrono;
105 auto const timeSpentStalled = duration_cast<seconds>(steady_clock::now() - lastHeartbeat);
106
107 constexpr auto reportingIntervalSeconds = 10s;
108 constexpr auto stallFatalLogMessageTimeLimit = 90s;
109 constexpr auto stallLogicErrorTimeLimit = 600s;
110
111 if (armed && (timeSpentStalled >= reportingIntervalSeconds))
112 {
113 // Report the stalled condition every reportingIntervalSeconds
114 if ((timeSpentStalled % reportingIntervalSeconds) == 0s)
115 {
116 if (timeSpentStalled < stallFatalLogMessageTimeLimit)
117 {
118 JLOG(journal_.warn()) << "Server stalled for " << timeSpentStalled.count() << " seconds.";
119
121 {
122 JLOG(journal_.warn()) << "JobQueue: " << app_.getJobQueue().getJson(0);
123 }
124 }
125 else
126 {
127 JLOG(journal_.fatal()) << "Server stalled for " << timeSpentStalled.count() << " seconds.";
128 JLOG(journal_.fatal()) << "JobQueue: " << app_.getJobQueue().getJson(0);
129 }
130 }
131
132 // If we go over the stallLogicErrorTimeLimit spent stalled, it
133 // means that the stall resolution code has failed, which qualifies
134 // as a LogicError
135 if (timeSpentStalled >= stallLogicErrorTimeLimit)
136 {
137 JLOG(journal_.fatal()) << "LogicError: Fatal server stall detected. Stalled time: "
138 << timeSpentStalled.count() << "s";
139 JLOG(journal_.fatal()) << "JobQueue: " << app_.getJobQueue().getJson(0);
140 LogicError("Fatal server stall detected");
141 }
142 }
143 }
144
145 bool change = false;
147 {
148 JLOG(journal_.info()) << "Raising local fee (JQ overload): " << app_.getJobQueue().getJson(0);
149 change = app_.getFeeTrack().raiseLocalFee();
150 }
151 else
152 {
153 change = app_.getFeeTrack().lowerLocalFee();
154 }
155
156 if (change)
157 {
158 // VFALCO TODO replace this with a Listener / observer and
159 // subscribe in NetworkOPs or Application.
161 }
162}
163
164//------------------------------------------------------------------------------
165
168{
169 return std::unique_ptr<LoadManager>{new LoadManager{app, journal}};
170}
171
172} // namespace xrpl
A generic endpoint for log messages.
Definition Journal.h:41
Stream fatal() const
Definition Journal.h:325
Stream debug() const
Definition Journal.h:301
Stream info() const
Definition Journal.h:307
Stream warn() const
Definition Journal.h:313
virtual LoadFeeTrack & getFeeTrack()=0
virtual JobQueue & getJobQueue()=0
virtual NetworkOPs & getOPs()=0
bool isOverloaded()
Definition JobQueue.cpp:166
Json::Value getJson(int c=0)
Definition JobQueue.cpp:172
Manages load sources.
Definition LoadManager.h:27
std::chrono::steady_clock::time_point lastHeartbeat_
Definition LoadManager.h:87
std::condition_variable cv_
Definition LoadManager.h:83
beast::Journal const journal_
Definition LoadManager.h:79
std::thread thread_
Definition LoadManager.h:81
~LoadManager()
Destroy the manager.
std::mutex mutex_
Definition LoadManager.h:82
void heartbeat()
Reset the stall detection timer.
void activateStallDetector()
Turn on stall detection.
LoadManager()=delete
Application & app_
Definition LoadManager.h:78
virtual void reportFeeChange()=0
T join(T... args)
T joinable(T... args)
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
void LogicError(std::string const &how) noexcept
Called when faulty logic causes a broken invariant.
std::unique_ptr< LoadManager > make_LoadManager(Application &app, beast::Journal journal)
T unlock(T... args)
T what(T... args)