rippled
Loading...
Searching...
No Matches
ResourceManager.cpp
1#include <xrpl/basics/chrono.h>
2#include <xrpl/beast/core/CurrentThreadName.h>
3#include <xrpl/beast/insight/Collector.h>
4#include <xrpl/beast/net/IPAddressConversion.h>
5#include <xrpl/beast/net/IPEndpoint.h>
6#include <xrpl/beast/utility/Journal.h>
7#include <xrpl/beast/utility/PropertyStream.h>
8#include <xrpl/json/json_value.h>
9#include <xrpl/resource/Consumer.h>
10#include <xrpl/resource/Gossip.h>
11#include <xrpl/resource/ResourceManager.h>
12#include <xrpl/resource/detail/Logic.h>
13
14#include <boost/asio/ip/address.hpp>
15#include <boost/system/detail/error_code.hpp>
16
17#include <chrono>
18#include <condition_variable>
19#include <memory>
20#include <mutex>
21#include <string>
22#include <string_view>
23#include <thread>
24
25namespace xrpl {
26namespace Resource {
27
// Concrete implementation of Manager. Almost every public method below is a
// thin forwarder to the corresponding method on logic_.
// NOTE(review): this listing is an extraction in which some original lines
// are absent (the embedded line numbers skip, e.g. 30 -> 34). Member
// declarations other than stop_, and several signature lines, are not
// visible here; confirm details against the real source file.
28class ManagerImp : public Manager
29{
30private:
// Stop flag: set to true in the destructor and tested by the loop at the
// bottom of the class to decide when to break out.
34 bool stop_ = false;
37
38public:
// Constructor initializer list (the signature line itself is missing from
// this listing). journal_ is initialized from `journal`; logic_ is built from
// `collector`, the value returned by stopwatch(), and `journal`.
// NOTE(review): original line 42 (constructor body) is absent — presumably
// it starts thread_; confirm.
40 : journal_(journal), logic_(collector, stopwatch(), journal)
41 {
43 }
44
// Default construction, copy construction and copy assignment are disabled.
45 ManagerImp() = delete;
46 ManagerImp(ManagerImp const&) = delete;
48 operator=(ManagerImp const&) = delete;
49
// Destructor: raises stop_ inside an inner scope, then blocks until thread_
// has finished via join().
// NOTE(review): original lines 53 and 55 are absent — presumably a lock
// acquisition and a condition-variable notification; confirm.
50 ~ManagerImp() override
51 {
52 {
54 stop_ = true;
56 }
57 thread_.join();
58 }
59
// Forwards to logic_.newInboundEndpoint(address) and returns its result.
61 newInboundEndpoint(beast::IP::Endpoint const& address) override
62 {
63 return logic_.newInboundEndpoint(address);
64 }
65
// Proxy-aware overload.
//   address      - endpoint of the directly connected peer.
//   proxy        - when false, the call is delegated to the single-argument
//                  overload using `address`.
//   forwardedFor - textual IP address that is parsed with
//                  boost::asio::ip::make_address when `proxy` is true.
// If parsing fails, a warning is written to journal_ and the call falls back
// to the single-argument overload using `address`.
// NOTE(review): original line 80 (the successful-parse return) is absent —
// presumably it creates the endpoint from proxiedIp; confirm.
67 newInboundEndpoint(beast::IP::Endpoint const& address, bool const proxy, std::string_view forwardedFor) override
68 {
69 if (!proxy)
70 return newInboundEndpoint(address);
71
72 boost::system::error_code ec;
// Non-throwing overload: failure is reported through ec.
73 auto const proxiedIp = boost::asio::ip::make_address(forwardedFor, ec);
74 if (ec)
75 {
76 journal_.warn() << "forwarded for (" << forwardedFor << ") from proxy " << address.to_string()
77 << " doesn't convert to IP endpoint: " << ec.message();
78 return newInboundEndpoint(address);
79 }
81 }
82
// Body of the outbound-endpoint method (its signature lines are missing from
// this listing): forwards to logic_.newOutboundEndpoint(address).
85 {
86 return logic_.newOutboundEndpoint(address);
87 }
88
// Body of the unlimited-endpoint method (its signature lines are missing from
// this listing): forwards to logic_.newUnlimitedEndpoint(address).
91 {
92 return logic_.newUnlimitedEndpoint(address);
93 }
94
// Returns the Gossip produced by logic_.exportConsumers().
95 Gossip
96 exportConsumers() override
97 {
98 return logic_.exportConsumers();
99 }
100
// Passes `origin` and `gossip` unchanged to logic_.importConsumers().
101 void
102 importConsumers(std::string const& origin, Gossip const& gossip) override
103 {
104 logic_.importConsumers(origin, gossip);
105 }
106
107 //--------------------------------------------------------------------------
108
// Returns the result of logic_.getJson().
110 getJson() override
111 {
112 return logic_.getJson();
113 }
114
// Returns the result of logic_.getJson(threshold); the meaning of
// `threshold` is defined by logic_ and is not visible here.
116 getJson(int threshold) override
117 {
118 return logic_.getJson(threshold);
119 }
120
121 //--------------------------------------------------------------------------
122
// Body of a void method taking `map` (its signature line is missing from this
// listing): forwards `map` to logic_.onWrite().
123 void
125 {
126 logic_.onWrite(map);
127 }
128
129 //--------------------------------------------------------------------------
130
131private:
// Private loop function (its name line is missing from this listing). It
// names the calling thread "Resource::Mngr" and then loops until stop_ is
// observed to be true.
// NOTE(review): original lines 138-140 inside the loop are absent —
// presumably the periodic work and a timed wait; confirm.
132 void
134 {
135 beast::setCurrentThreadName("Resource::Mngr");
136 for (;;)
137 {
141 if (stop_)
142 break;
143 }
144 }
145};
146
147//------------------------------------------------------------------------------
148
// Manager constructor: initializes the beast::PropertyStream::Source
// sub-object with the string "resource"; the body is empty.
149Manager::Manager() : beast::PropertyStream::Source("resource")
150{
151}
152
// Out-of-line defaulted destructor for Manager.
153Manager::~Manager() = default;
154
155//------------------------------------------------------------------------------
156
159{
// Builds a ManagerImp from `collector` and `journal` and returns it wrapped
// in a std::unique_ptr (the function's signature lines are missing from this
// listing).
160 return std::make_unique<ManagerImp>(collector, journal);
161}
162
163} // namespace Resource
164} // namespace xrpl
Represents a JSON value.
Definition json_value.h:131
A version-independent IP address and port combination.
Definition IPEndpoint.h:19
std::string to_string() const
Returns a string representing the endpoint.
A generic endpoint for log messages.
Definition Journal.h:41
Stream warn() const
Definition Journal.h:313
An endpoint that consumes resources.
Definition Consumer.h:17
Consumer newInboundEndpoint(beast::IP::Endpoint const &address)
Consumer newOutboundEndpoint(beast::IP::Endpoint const &address)
Consumer newUnlimitedEndpoint(beast::IP::Endpoint const &address)
Create endpoint that should not have resource limits applied.
void importConsumers(std::string const &origin, Gossip const &gossip)
void onWrite(beast::PropertyStream::Map &map)
std::condition_variable cond_
void importConsumers(std::string const &origin, Gossip const &gossip) override
Import packaged consumer information.
Consumer newInboundEndpoint(beast::IP::Endpoint const &address) override
Create a new endpoint keyed by inbound IP address or the forwarded IP if proxied.
Consumer newUnlimitedEndpoint(beast::IP::Endpoint const &address) override
Create a new unlimited endpoint keyed by forwarded IP.
Json::Value getJson() override
Extract consumer information for reporting.
Consumer newInboundEndpoint(beast::IP::Endpoint const &address, bool const proxy, std::string_view forwardedFor) override
Gossip exportConsumers() override
Extract packaged consumer information for export.
ManagerImp(ManagerImp const &)=delete
ManagerImp & operator=(ManagerImp const &)=delete
beast::Journal const journal_
void onWrite(beast::PropertyStream::Map &map) override
Subclass override.
Json::Value getJson(int threshold) override
Consumer newOutboundEndpoint(beast::IP::Endpoint const &address) override
Create a new endpoint keyed by outbound IP address and port.
ManagerImp(beast::insight::Collector::ptr const &collector, beast::Journal journal)
Tracks load and resource consumption.
T is_same_v
T join(T... args)
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
std::unique_ptr< Manager > make_Manager(beast::insight::Collector::ptr const &collector, beast::Journal journal)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
Stopwatch & stopwatch()
Returns an instance of a wall clock.
Definition chrono.h:94
std::string_view forwardedFor(http_request_type const &request)
Definition Role.cpp:228
static IP::Endpoint from_asio(boost::asio::ip::address const &address)
Data format for exchanging consumption information across peers.
Definition Gossip.h:13