rippled
Loading...
Searching...
No Matches
include/xrpl/resource/detail/Logic.h
1//------------------------------------------------------------------------------
2/*
3 This file is part of rippled: https://github.com/ripple/rippled
4 Copyright (c) 2012, 2013 Ripple Labs Inc.
5
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#ifndef RIPPLE_RESOURCE_LOGIC_H_INCLUDED
21#define RIPPLE_RESOURCE_LOGIC_H_INCLUDED
22
23#include <xrpl/basics/Log.h>
24#include <xrpl/basics/UnorderedContainers.h>
25#include <xrpl/basics/chrono.h>
26#include <xrpl/beast/clock/abstract_clock.h>
27#include <xrpl/beast/insight/Insight.h>
28#include <xrpl/beast/utility/PropertyStream.h>
29#include <xrpl/beast/utility/instrumentation.h>
30#include <xrpl/json/json_value.h>
31#include <xrpl/protocol/jss.h>
32#include <xrpl/resource/Fees.h>
33#include <xrpl/resource/Gossip.h>
34#include <xrpl/resource/detail/Import.h>
35
36#include <mutex>
37
38namespace ripple {
39namespace Resource {
40
41class Logic
42{
43private:
48
// Metrics bundle used by Logic. The constructor body below asks the insight
// collector for two meters named "warn" and "drop" and stores them in the
// `warn` and `drop` members.
// NOTE(review): the constructor signature (listing line 51) and the member
// declarations (listing lines 57-58) are elided from this listing.
49 struct Stats
50 {
52 {
53 warn = collector->make_meter("warn");
54 drop = collector->make_meter("drop");
55 }
56
59 };
60
64
66
67 // Table of all entries
69
70 // Because the following are intrusive lists, a given Entry may be in
71 // at most one list at a given instant. The Entry must be removed from
72 // one list before placing it in another.
73
74 // List of all active inbound entries
76
77 // List of all active outbound entries
79
80 // List of all active admin entries
82
83 // List of all inactive entries
85
86 // All imported gossip data
88
89 //--------------------------------------------------------------------------
90public:
// Constructor parameter list and initializers (the opening `Logic(` line is
// elided from this listing). The collector is forwarded to m_stats; the clock
// and journal are stored in m_clock and m_journal. The body is empty.
92 beast::insight::Collector::ptr const& collector,
93 clock_type& clock,
94 beast::Journal journal)
95 : m_stats(collector), m_clock(clock), m_journal(journal)
96 {
97 }
98
100 {
101 // The containers must be emptied explicitly before the Logic is
102 // destroyed, because the destructors of their elements call back into
103 // this class. Order matters as well: the import table has to be
104 // destroyed before the consumer table.
105 // NOTE(review): listing line 106 is elided here; presumably it clears the import table - confirm.
107 table_.clear();
108 }
109
// Body of newInboundEndpoint(beast::IP::Endpoint const& address); the
// signature lines are elided from this listing.
// Finds or creates the table entry for an inbound endpoint, takes a
// reference on it, and returns a Consumer bound to that entry.
112 {
113 Entry* entry(nullptr);
114
115 {
// NOTE(review): listing lines 116 and 118 are elided; presumably a lock and
// the piecewise-construct tag - confirm against the real header.
// The key is (kindInbound, address with its port replaced by 0), so the port
// of `address` does not distinguish entries. A newly created Entry is
// constructed from the current clock reading.
117 auto [resultIt, resultInserted] = table_.emplace(
119 std::make_tuple(kindInbound, address.at_port(0)), // Key
120 std::make_tuple(m_clock.now())); // Entry
121
122 entry = &resultIt->second;
// The entry keeps a pointer back to its own key stored in the table.
123 entry->key = &resultIt->first;
124 ++entry->refcount;
// First reference: the entry is appended to the inbound list.
125 if (entry->refcount == 1)
126 {
// Entry already existed with no references. NOTE(review): the statement in
// this branch (listing line 129) is elided; presumably it unlinks the entry
// from the inactive list - confirm.
127 if (!resultInserted)
128 {
130 }
131 inbound_.push_back(*entry);
132 }
133 }
134
135 JLOG(m_journal.debug()) << "New inbound endpoint " << *entry;
136
137 return Consumer(*this, *entry);
138 }
139
// Body of newOutboundEndpoint(beast::IP::Endpoint const& address); the
// signature lines are elided from this listing.
// Finds or creates the table entry for an outbound endpoint, takes a
// reference on it, and returns a Consumer bound to that entry.
142 {
143 Entry* entry(nullptr);
144
145 {
// NOTE(review): listing lines 146 and 148 are elided; presumably a lock and
// the piecewise-construct tag - confirm against the real header.
// The key is (kindOutbound, address) with the port left unchanged.
147 auto [resultIt, resultInserted] = table_.emplace(
149 std::make_tuple(kindOutbound, address), // Key
150 std::make_tuple(m_clock.now())); // Entry
151
152 entry = &resultIt->second;
// The entry keeps a pointer back to its own key stored in the table.
153 entry->key = &resultIt->first;
154 ++entry->refcount;
// First reference: the entry is appended to the outbound list.
155 if (entry->refcount == 1)
156 {
// NOTE(review): the statement controlled by this `if` (listing line 158) is
// elided, which makes the push_back below look conditional in this listing.
// Presumably the elided statement unlinks the entry from the inactive list
// and the push_back is unconditional, as in the inbound variant - confirm.
157 if (!resultInserted)
159 outbound_.push_back(*entry);
160 }
161 }
162
163 JLOG(m_journal.debug()) << "New outbound endpoint " << *entry;
164
165 return Consumer(*this, *entry);
166 }
167
// Body of newUnlimitedEndpoint(beast::IP::Endpoint const& address); the
// signature lines are elided from this listing.
// Finds or creates the table entry of kind kindUnlimited for the endpoint,
// takes a reference on it, and returns a Consumer bound to that entry.
175 {
176 Entry* entry(nullptr);
177
178 {
// NOTE(review): listing lines 179 and 181 are elided; presumably a lock and
// the piecewise-construct tag - confirm against the real header.
// The key is (kindUnlimited, address with its port replaced by 1), so the
// port of `address` does not distinguish entries.
180 auto [resultIt, resultInserted] = table_.emplace(
182 std::make_tuple(kindUnlimited, address.at_port(1)), // Key
183 std::make_tuple(m_clock.now())); // Entry
184
185 entry = &resultIt->second;
// The entry keeps a pointer back to its own key stored in the table.
186 entry->key = &resultIt->first;
187 ++entry->refcount;
// First reference: the entry is appended to the admin list.
188 if (entry->refcount == 1)
189 {
// NOTE(review): the statement controlled by this `if` (listing line 191) is
// elided, which makes the push_back below look conditional in this listing.
// Presumably the push_back is unconditional, as in the inbound variant -
// confirm.
190 if (!resultInserted)
192 admin_.push_back(*entry);
193 }
194 }
195
196 JLOG(m_journal.debug()) << "New unlimited endpoint " << *entry;
197
198 return Consumer(*this, *entry);
199 }
200
203 {
205 }
206
// Builds a JSON object describing the entries on the inbound, outbound and
// admin lists whose combined balance (local balance evaluated at `now` plus
// remote_balance) is at least `threshold`. Each reported entry is keyed by
// its to_string() and carries the fields jss::local, jss::remote and
// jss::type ("inbound", "outbound" or "admin").
// NOTE(review): the return-type line (208) and listing lines 211, 213 and 214
// are elided; `now` and `ret` are declared there, presumably along with a
// lock - confirm against the real header.
209 getJson(int threshold)
210 {
212
215
216 for (auto& inboundEntry : inbound_)
217 {
218 int localBalance = inboundEntry.local_balance.value(now);
219 if ((localBalance + inboundEntry.remote_balance) >= threshold)
220 {
221 Json::Value& entry =
222 (ret[inboundEntry.to_string()] = Json::objectValue);
223 entry[jss::local] = localBalance;
224 entry[jss::remote] = inboundEntry.remote_balance;
225 entry[jss::type] = "inbound";
226 }
227 }
228 for (auto& outboundEntry : outbound_)
229 {
230 int localBalance = outboundEntry.local_balance.value(now);
231 if ((localBalance + outboundEntry.remote_balance) >= threshold)
232 {
233 Json::Value& entry =
234 (ret[outboundEntry.to_string()] = Json::objectValue);
235 entry[jss::local] = localBalance;
236 entry[jss::remote] = outboundEntry.remote_balance;
237 entry[jss::type] = "outbound";
238 }
239 }
240 for (auto& adminEntry : admin_)
241 {
242 int localBalance = adminEntry.local_balance.value(now);
243 if ((localBalance + adminEntry.remote_balance) >= threshold)
244 {
245 Json::Value& entry =
246 (ret[adminEntry.to_string()] = Json::objectValue);
247 entry[jss::local] = localBalance;
248 entry[jss::remote] = adminEntry.remote_balance;
249 entry[jss::type] = "admin";
250 }
251 }
252
253 return ret;
254 }
255
// Builds a Gossip message from the inbound list: one item (local balance at
// `now` plus the address taken from the entry's key) for every inbound entry
// whose local balance is at least minimumGossipBalance.
// NOTE(review): the function-name line (257) and listing lines 259 and 262
// are elided; `now` is declared there, presumably along with a lock -
// confirm against the real header.
256 Gossip
258 {
260
261 Gossip gossip;
263
// Reserve for the upper bound: at most one item per inbound entry.
264 gossip.items.reserve(inbound_.size());
265
266 for (auto& inboundEntry : inbound_)
267 {
268 Gossip::Item item;
269 item.balance = inboundEntry.local_balance.value(now);
270 if (item.balance >= minimumGossipBalance)
271 {
272 item.address = inboundEntry.key->address;
273 gossip.items.push_back(item);
274 }
275 }
276
277 return gossip;
278 }
279
280 //--------------------------------------------------------------------------
281
// Applies gossip received from `origin`: every gossiped balance is added to
// the remote_balance of the matching inbound entry, and the set of applied
// items is remembered in importTable_ under `origin` so that it can be
// deducted again later. If an import from the same origin already exists, its
// balances are deducted and it is replaced by the new one.
282 void
283 importConsumers(std::string const& origin, Gossip const& gossip)
284 {
// Despite its name, `elapsed` is the current clock reading (a time point).
285 auto const elapsed = m_clock.now();
286 {
// NOTE(review): listing lines 287, 289 and 291 are elided; presumably a lock,
// the piecewise-construct tag and the opening of the second make_tuple -
// confirm against the real header.
288 auto [resultIt, resultInserted] = importTable_.emplace(
290 std::make_tuple(origin), // Key
292 m_clock.now().time_since_epoch().count())); // Import
293
294 if (resultInserted)
295 {
296 // This is a new import: fill the freshly inserted Import in place.
297 Import& next(resultIt->second);
298 next.whenExpires = elapsed + gossipExpirationSeconds;
299 next.items.reserve(gossip.items.size());
300
301 for (auto const& gossipItem : gossip.items)
302 {
303 Import::Item item;
304 item.balance = gossipItem.balance;
305 item.consumer = newInboundEndpoint(gossipItem.address);
306 item.consumer.entry().remote_balance += item.balance;
307 next.items.push_back(item);
308 }
309 }
310 else
311 {
312 // Previous import exists so add the new remote
313 // balances and then deduct the old remote balances.
314
315 Import next;
316 next.whenExpires = elapsed + gossipExpirationSeconds;
317 next.items.reserve(gossip.items.size());
318 for (auto const& gossipItem : gossip.items)
319 {
320 Import::Item item;
321 item.balance = gossipItem.balance;
322 item.consumer = newInboundEndpoint(gossipItem.address);
323 item.consumer.entry().remote_balance += item.balance;
324 next.items.push_back(item);
325 }
326
327 Import& prev(resultIt->second);
328 for (auto& item : prev.items)
329 {
330 item.consumer.entry().remote_balance -= item.balance;
331 }
332
// After the swap the table slot holds the new import; the old one now lives
// in the local `next` and is destroyed when this scope ends.
333 std::swap(next, prev);
334 }
335 }
336 }
337
338 //--------------------------------------------------------------------------
339
340 // Called periodically to expire entries and groom the table: erases
341 // expired inactive entries and removes expired gossip imports.
// NOTE(review): the function-name line (343) and listing line 345 are elided;
// the latter is presumably a lock - confirm against the real header.
342 void
344 {
346
// Despite its name, `elapsed` is the current clock reading (a time point).
347 auto const elapsed = m_clock.now();
348
// Walk the inactive list from the front and stop at the first entry that has
// not expired yet. NOTE(review): the early break assumes the list is in
// expiry order; release() appends entries with now + secondsUntilExpiration,
// which is consistent with that - confirm.
349 for (auto iter(inactive_.begin()); iter != inactive_.end();)
350 {
351 if (iter->whenExpires <= elapsed)
352 {
353 JLOG(m_journal.debug()) << "Expired " << *iter;
354 auto table_iter = table_.find(*iter->key);
// Advance first so `iter` no longer refers to the entry about to be erased.
355 ++iter;
356 erase(table_iter);
357 }
358 else
359 {
360 break;
361 }
362 }
363
// Drop expired imports, first deducting each imported balance from the
// remote_balance of the entry it was added to.
364 auto iter = importTable_.begin();
365 while (iter != importTable_.end())
366 {
367 Import& import(iter->second);
368 if (iter->second.whenExpires <= elapsed)
369 {
370 for (auto item_iter(import.items.begin());
371 item_iter != import.items.end();
372 ++item_iter)
373 {
374 item_iter->consumer.entry().remote_balance -=
375 item_iter->balance;
376 }
377
// erase() returns the iterator to the next element.
378 iter = importTable_.erase(iter);
379 }
380 else
381 ++iter;
382 }
383 }
384
385 //--------------------------------------------------------------------------
386
387 // Returns the disposition for a balance: drop when it is at or above
// dropThreshold, otherwise warn or ok.
// NOTE(review): the signature line (389, `disposition(int balance)`) and the
// condition guarding the warn return (listing line 394) are elided;
// presumably the latter compares against warningThreshold - confirm.
388 static Disposition
390 {
391 if (balance >= dropThreshold)
392 return Disposition::drop;
393
395 return Disposition::warn;
396
397 return Disposition::ok;
398 }
399
// Removes the entry referenced by `iter` from the table. The entry must have
// no outstanding references (asserted below).
// NOTE(review): listing lines 403 and 408 are elided; presumably a lock and
// the removal of the entry from the inactive list - confirm against the real
// header.
400 void
401 erase(Table::iterator iter)
402 {
404 Entry& entry(iter->second);
405 XRPL_ASSERT(
406 entry.refcount == 0,
407 "ripple::Resource::Logic::erase : entry not used");
409 table_.erase(iter);
410 }
411
// Takes an additional reference on `entry` by incrementing its refcount.
// NOTE(review): the signature line (413) and listing line 415 are elided;
// the latter is presumably a lock - confirm against the real header.
412 void
414 {
416 ++entry.refcount;
417 }
418
// Drops one reference on `entry`. When the last reference goes away the entry
// is logged as inactive, appended to the inactive list, and given an expiry
// time of now + secondsUntilExpiration.
// NOTE(review): the signature line (420) and listing line 422 (presumably a
// lock) are elided, as is the single statement of each case below (listing
// lines 430, 433, 436); presumably each unlinks the entry from the active
// list matching its kind - confirm against the real header.
419 void
421 {
423 if (--entry.refcount == 0)
424 {
425 JLOG(m_journal.debug()) << "Inactive " << entry;
426
427 switch (entry.key->kind)
428 {
429 case kindInbound:
431 break;
432 case kindOutbound:
434 break;
435 case kindUnlimited:
437 break;
438 default:
439 // LCOV_EXCL_START
440 UNREACHABLE(
441 "ripple::Resource::Logic::release : invalid entry "
442 "kind");
443 break;
444 // LCOV_EXCL_STOP
445 }
446 inactive_.push_back(entry);
447 entry.whenExpires = m_clock.now() + secondsUntilExpiration;
448 }
449 }
450
// Adds the cost of `fee` to `entry`, logs the charge, and returns the
// disposition for the resulting balance. `context` is optional free text that
// is appended to the log line in parentheses.
// NOTE(review): the return-type line (451, Disposition per the tooltip
// listing) is elided from this listing.
452 charge(Entry& entry, Charge const& fee, std::string context = {})
453 {
// Cost thresholds that select the severity of the log line.
454 static constexpr Charge::value_type feeLogAsWarn = 3000;
455 static constexpr Charge::value_type feeLogAsInfo = 1000;
456 static constexpr Charge::value_type feeLogAsDebug = 100;
457 static_assert(
458 feeLogAsWarn > feeLogAsInfo && feeLogAsInfo > feeLogAsDebug &&
459 feeLogAsDebug > 10);
460
// Picks the journal stream by cost: warn, info, debug, otherwise trace.
461 static auto getStream = [](Resource::Charge::value_type cost,
462 beast::Journal& journal) {
463 if (cost >= feeLogAsWarn)
464 return journal.warn();
465 if (cost >= feeLogAsInfo)
466 return journal.info();
467 if (cost >= feeLogAsDebug)
468 return journal.debug();
469 return journal.trace();
470 };
471
472 if (!context.empty())
473 context = " (" + context + ")";
474
// NOTE(review): listing lines 475 and 476 are elided; `now` is declared
// there, presumably along with a lock - confirm against the real header.
477 int const balance(entry.add(fee.cost(), now));
478 JLOG(getStream(fee.cost(), m_journal))
479 << "Charging " << entry << " for " << fee << context;
480 return disposition(balance);
481 }
482
// Decides whether `entry` should receive a load warning. Unlimited entries
// are never warned. Otherwise, when the balance is at or above
// warningThreshold and the current clock reading differs from the entry's
// lastWarningTime, the entry is charged feeWarning, the warning is logged and
// counted in m_stats.warn, and lastWarningTime is updated.
// Returns true when a warning should be sent.
483 bool
484 warn(Entry& entry)
485 {
486 if (entry.isUnlimited())
487 return false;
488
// NOTE(review): listing line 489 is elided; presumably a lock - confirm.
490 bool notify(false);
// Despite its name, `elapsed` is the current clock reading (a time point).
491 auto const elapsed = m_clock.now();
// NOTE(review): the clock is read a second time here although `elapsed`
// already holds a reading; passing `elapsed` would evaluate both conditions
// at the same instant.
492 if (entry.balance(m_clock.now()) >= warningThreshold &&
493 elapsed != entry.lastWarningTime)
494 {
495 charge(entry, feeWarning);
496 notify = true;
497 entry.lastWarningTime = elapsed;
498 }
499 if (notify)
500 {
501 JLOG(m_journal.info()) << "Load warning: " << entry;
502 ++m_stats.warn;
503 }
504 return notify;
505 }
506
// Decides whether `entry` should be dropped. Unlimited entries are never
// dropped. Otherwise, when the balance at `now` is at or above dropThreshold,
// the drop is logged, the entry is charged feeDrop, m_stats.drop is
// incremented, and true is returned.
// NOTE(review): the signature line (508) and listing lines 513 and 515 are
// elided; `now` is declared there, presumably along with a lock - confirm
// against the real header.
507 bool
509 {
510 if (entry.isUnlimited())
511 return false;
512
514 bool drop(false);
516 int const balance(entry.balance(now));
517 if (balance >= dropThreshold)
518 {
519 JLOG(m_journal.warn())
520 << "Consumer entry " << entry << " dropped with balance "
521 << balance << " at or above drop threshold " << dropThreshold;
522
523 // Adding feeDrop at this point keeps the dropped connection
524 // from re-connecting for at least a little while after it is
525 // dropped.
526 charge(entry, feeDrop);
527 ++m_stats.drop;
528 drop = true;
529 }
530 return drop;
531 }
532
// Returns the balance of `entry` evaluated at the current clock reading.
// NOTE(review): the signature line (534) and listing line 536 are elided;
// the latter is presumably a lock - confirm against the real header.
533 int
535 {
537 return entry.balance(m_clock.now());
538 }
539
540 //--------------------------------------------------------------------------
541
// Writes one property-stream map per entry of `list` into `items`. Each map
// has "name" and "balance" (evaluated at `now`); "count" is written only when
// the refcount is non-zero and "remote_balance" only when it is non-zero.
// NOTE(review): listing lines 543 and 545 are elided; per the tooltip listing
// they are `writeList(` and the `beast::PropertyStream::Set& items`
// parameter.
542 void
544 clock_type::time_point const now,
546 EntryIntrusiveList& list)
547 {
548 for (auto& entry : list)
549 {
550 beast::PropertyStream::Map item(items);
551 if (entry.refcount != 0)
552 item["count"] = entry.refcount;
553 item["name"] = entry.to_string();
554 item["balance"] = entry.balance(now);
555 if (entry.remote_balance != 0)
556 item["remote_balance"] = entry.remote_balance;
557 }
558 }
559
// Writes the four entry lists into `map` as property-stream sets named
// "inbound", "outbound", "admin" and "inactive", using writeList for each.
// Each set lives in its own scope, so it is destroyed before the next one is
// created.
// NOTE(review): the signature line (561, `onWrite(beast::PropertyStream::Map&
// map)` per the tooltip listing) and listing lines 563 and 565 are elided;
// `now` is declared there, presumably along with a lock - confirm against the
// real header.
560 void
562 {
564
566
567 {
568 beast::PropertyStream::Set s("inbound", map);
569 writeList(now, s, inbound_);
570 }
571
572 {
573 beast::PropertyStream::Set s("outbound", map);
574 writeList(now, s, outbound_);
575 }
576
577 {
578 beast::PropertyStream::Set s("admin", map);
579 writeList(now, s, admin_);
580 }
581
582 {
583 beast::PropertyStream::Set s("inactive", map);
584 writeList(now, s, inactive_);
585 }
586 }
587};
588
589} // namespace Resource
590} // namespace ripple
591
592#endif
T begin(T... args)
Represents a JSON value.
Definition json_value.h:149
A version-independent IP address and port combination.
Definition IPEndpoint.h:38
Address const & address() const
Returns the address portion of this endpoint.
Definition IPEndpoint.h:75
Endpoint at_port(Port port) const
Returns a new Endpoint with a different port.
Definition IPEndpoint.h:68
A generic endpoint for log messages.
Definition Journal.h:60
Stream debug() const
Definition Journal.h:328
Stream info() const
Definition Journal.h:334
Stream warn() const
Definition Journal.h:340
iterator iterator_to(T &element) const noexcept
Obtain an iterator from an element.
Definition List.h:561
iterator push_back(T &element) noexcept
Append an element at the end of the list.
Definition List.h:508
iterator begin() noexcept
Obtain an iterator to the beginning of the list.
Definition List.h:366
iterator end() noexcept
Obtain an iterator to the end of the list.
Definition List.h:393
size_type size() const noexcept
Returns the number of elements in the list.
Definition List.h:317
iterator erase(iterator pos) noexcept
Remove an element.
Definition List.h:471
virtual time_point now() const =0
Returns the current time.
A metric for measuring an integral value.
Definition Meter.h:38
A consumption charge.
Definition Charge.h:30
int value_type
The type used to hold a consumption charge.
Definition Charge.h:33
value_type cost() const
Return the cost of the charge in Resource::Manager units.
Definition Charge.cpp:42
An endpoint that consumes resources.
Definition Consumer.h:35
Consumer newInboundEndpoint(beast::IP::Endpoint const &address)
void importConsumers(std::string const &origin, Gossip const &gossip)
Consumer newUnlimitedEndpoint(beast::IP::Endpoint const &address)
Create endpoint that should not have resource limits applied.
void onWrite(beast::PropertyStream::Map &map)
Json::Value getJson(int threshold)
Returns a Json::objectValue.
Logic(beast::insight::Collector::ptr const &collector, clock_type &clock, beast::Journal journal)
Consumer newOutboundEndpoint(beast::IP::Endpoint const &address)
Disposition charge(Entry &entry, Charge const &fee, std::string context={})
void writeList(clock_type::time_point const now, beast::PropertyStream::Set &items, EntryIntrusiveList &list)
static Disposition disposition(int balance)
T clear(T... args)
T emplace(T... args)
T end(T... args)
T erase(T... args)
T find(T... args)
T make_tuple(T... args)
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:45
std::chrono::seconds constexpr gossipExpirationSeconds
Charge const feeDrop
Charge const feeWarning
Disposition
The disposition of a consumer after applying a load charge.
Definition Disposition.h:27
@ ok
No action required.
Definition Disposition.h:29
@ warn
Consumer should be disconnected for excess consumption.
Definition Disposition.h:33
std::chrono::seconds constexpr secondsUntilExpiration
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:25
beast::abstract_clock< std::chrono::steady_clock > Stopwatch
A clock for measuring elapsed time.
Definition chrono.h:112
T piecewise_construct
Describes a single consumer.
Definition Gossip.h:37
beast::IP::Endpoint address
Definition Gossip.h:41
Data format for exchanging consumption information across peers.
Definition Gossip.h:32
std::vector< Item > items
Definition Gossip.h:44
A set of imported consumer data from a gossip origin.
Definition Import.h:31
Stats(beast::insight::Collector::ptr const &collector)
T swap(T... args)