rippled
Loading...
Searching...
No Matches
LoadManager.cpp
1#include <xrpld/app/main/Application.h>
2#include <xrpld/app/main/LoadManager.h>
3#include <xrpld/app/misc/LoadFeeTrack.h>
4#include <xrpld/app/misc/NetworkOPs.h>
5
6#include <xrpl/beast/core/CurrentThreadName.h>
7#include <xrpl/json/to_string.h>
8
9#include <memory>
10#include <mutex>
11#include <thread>
12
13namespace ripple {
14
16 : app_(app), journal_(journal), lastHeartbeat_(), armed_(false)
17{
18}
19
21{
22 try
23 {
24 stop();
25 }
26 catch (std::exception const& ex)
27 {
28 // Swallow the exception in a destructor.
29 JLOG(journal_.warn())
30 << "std::exception in ~LoadManager. " << ex.what();
31 }
32}
33
34//------------------------------------------------------------------------------
35
36void
43
44void
51
52//------------------------------------------------------------------------------
53
54void
56{
57 JLOG(journal_.debug()) << "Starting";
58 XRPL_ASSERT(
60 "ripple::LoadManager::start : thread not joinable");
61
63}
64
65void
67{
68 {
70 stop_ = true;
71 // There is at most one thread waiting on this condition.
73 }
74 if (thread_.joinable())
75 {
76 JLOG(journal_.debug()) << "Stopping";
77 thread_.join();
78 }
79}
80
81//------------------------------------------------------------------------------
82
83void
85{
86 beast::setCurrentThreadName("LoadManager");
87
88 using namespace std::chrono_literals;
89 using clock_type = std::chrono::steady_clock;
90
91 auto t = clock_type::now();
92
93 while (true)
94 {
95 t += 1s;
96
98 if (cv_.wait_until(sl, t, [this] { return stop_; }))
99 break;
100
101 // Copy out shared data under a lock. Use copies outside lock.
102 auto const lastHeartbeat = lastHeartbeat_;
103 auto const armed = armed_;
104 sl.unlock();
105
106 // Measure the amount of time we have been stalled, in seconds.
107 using namespace std::chrono;
108 auto const timeSpentStalled =
109 duration_cast<seconds>(steady_clock::now() - lastHeartbeat);
110
111 constexpr auto reportingIntervalSeconds = 10s;
112 constexpr auto stallFatalLogMessageTimeLimit = 90s;
113 constexpr auto stallLogicErrorTimeLimit = 600s;
114
115 if (armed && (timeSpentStalled >= reportingIntervalSeconds))
116 {
117 // Report the stalled condition every reportingIntervalSeconds
118 if ((timeSpentStalled % reportingIntervalSeconds) == 0s)
119 {
120 if (timeSpentStalled < stallFatalLogMessageTimeLimit)
121 {
122 JLOG(journal_.warn())
123 << "Server stalled for " << timeSpentStalled.count()
124 << " seconds.";
125
127 {
128 JLOG(journal_.warn())
129 << "JobQueue: " << app_.getJobQueue().getJson(0);
130 }
131 }
132 else
133 {
134 JLOG(journal_.fatal())
135 << "Server stalled for " << timeSpentStalled.count()
136 << " seconds.";
137 JLOG(journal_.fatal())
138 << "JobQueue: " << app_.getJobQueue().getJson(0);
139 }
140 }
141
142 // If we go over the stallLogicErrorTimeLimit spent stalled, it
143 // means that the stall resolution code has failed, which qualifies
144 // as a LogicError
145 if (timeSpentStalled >= stallLogicErrorTimeLimit)
146 {
147 JLOG(journal_.fatal())
148 << "LogicError: Fatal server stall detected. Stalled time: "
149 << timeSpentStalled.count() << "s";
150 JLOG(journal_.fatal())
151 << "JobQueue: " << app_.getJobQueue().getJson(0);
152 LogicError("Fatal server stall detected");
153 }
154 }
155 }
156
157 bool change = false;
159 {
160 JLOG(journal_.info()) << "Raising local fee (JQ overload): "
161 << app_.getJobQueue().getJson(0);
162 change = app_.getFeeTrack().raiseLocalFee();
163 }
164 else
165 {
166 change = app_.getFeeTrack().lowerLocalFee();
167 }
168
169 if (change)
170 {
171 // VFALCO TODO replace this with a Listener / observer and
172 // subscribe in NetworkOPs or Application.
174 }
175}
176
177//------------------------------------------------------------------------------
178
181{
182 return std::unique_ptr<LoadManager>{new LoadManager{app, journal}};
183}
184
185} // namespace ripple
A generic endpoint for log messages.
Definition Journal.h:41
Stream fatal() const
Definition Journal.h:333
Stream debug() const
Definition Journal.h:309
Stream info() const
Definition Journal.h:315
Stream warn() const
Definition Journal.h:321
virtual LoadFeeTrack & getFeeTrack()=0
virtual JobQueue & getJobQueue()=0
virtual NetworkOPs & getOPs()=0
Json::Value getJson(int c=0)
Definition JobQueue.cpp:195
Manages load sources.
Definition LoadManager.h:27
std::chrono::steady_clock::time_point lastHeartbeat_
Definition LoadManager.h:87
std::condition_variable cv_
Definition LoadManager.h:83
std::thread thread_
Definition LoadManager.h:81
void activateStallDetector()
Turn on stall detection.
beast::Journal const journal_
Definition LoadManager.h:79
Application & app_
Definition LoadManager.h:78
void heartbeat()
Reset the stall detection timer.
~LoadManager()
Destroy the manager.
virtual void reportFeeChange()=0
T join(T... args)
T joinable(T... args)
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
std::unique_ptr< LoadManager > make_LoadManager(Application &app, beast::Journal journal)
void LogicError(std::string const &how) noexcept
Called when faulty logic causes a broken invariant.
T unlock(T... args)
T what(T... args)