mirror of
https://github.com/XRPLF/rippled.git
synced 2025-11-19 18:45:52 +00:00
- Drop duplicate outgoing TMGetLedger messages per peer
- Allow a retry after 30s in case of peer or network congestion.
- Addresses RIPD-1870
- (Changes levelization. That is not desirable, and will need to be fixed.)
- Drop duplicate incoming TMGetLedger messages per peer
- Allow a retry after 15s in case of peer or network congestion.
- The requestCookie is ignored when computing the hash, thus increasing
the chances of detecting duplicate messages.
- With duplicate messages, keep track of the different requestCookies
(or lack of cookie). When work is finally done for a given request,
send the response to all the peers that are waiting on the request,
sending one message per peer, including all the cookies and
a "directResponse" flag indicating the data is intended for the
sender, too.
- Addresses RIPD-1871
- Drop duplicate incoming TMLedgerData messages
- Addresses RIPD-1869
- Improve logging related to ledger acquisition
- Class "CanProcess" to keep track of processing of distinct items
---------
Co-authored-by: Valentin Balaschenko <13349202+vlntb@users.noreply.github.com>
135 lines
4.5 KiB
C++
135 lines
4.5 KiB
C++
//------------------------------------------------------------------------------
|
|
/*
|
|
This file is part of rippled: https://github.com/ripple/rippled
|
|
Copyright (c) 2024 Ripple Labs Inc.
|
|
|
|
Permission to use, copy, modify, and/or distribute this software for any
|
|
purpose with or without fee is hereby granted, provided that the above
|
|
copyright notice and this permission notice appear in all copies.
|
|
|
|
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
|
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
|
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
|
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
|
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
|
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
|
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
|
*/
|
|
//==============================================================================
|
|
|
|
#ifndef RIPPLE_BASICS_CANPROCESS_H_INCLUDED
|
|
#define RIPPLE_BASICS_CANPROCESS_H_INCLUDED
|
|
|
|
#include <functional>
|
|
#include <mutex>
|
|
#include <set>
|
|
|
|
/** RAII class to check if an Item is already being processed on another thread,
|
|
* as indicated by its presence in a Collection.
|
|
*
|
|
* If the Item is not in the Collection, it will be added under lock in the
|
|
* ctor, and removed under lock in the dtor. The object will be considered
|
|
* "usable" and evaluate to `true`.
|
|
*
|
|
* If the Item is in the Collection, no changes will be made to the collection,
|
|
* and the CanProcess object will be considered "unusable".
|
|
*
|
|
* It's up to the caller to decide what "usable" and "unusable" mean. (e.g.
|
|
* Process or skip a block of code, or set a flag.)
|
|
*
|
|
* The current use is to avoid lock contention that would be involved in
|
|
* processing something associated with the Item.
|
|
*
|
|
* Examples:
|
|
*
|
|
* void IncomingLedgers::acquireAsync(LedgerHash const& hash, ...)
|
|
* {
|
|
* if (CanProcess check{acquiresMutex_, pendingAcquires_, hash})
|
|
* {
|
|
* acquire(hash, ...);
|
|
* }
|
|
* }
|
|
*
|
|
* bool
|
|
* NetworkOPsImp::recvValidation(
|
|
* std::shared_ptr<STValidation> const& val,
|
|
* std::string const& source)
|
|
* {
|
|
* CanProcess check(
|
|
* validationsMutex_, pendingValidations_, val->getLedgerHash());
|
|
* BypassAccept bypassAccept =
|
|
* check ? BypassAccept::no : BypassAccept::yes;
|
|
* handleNewValidation(app_, val, source, bypassAccept, m_journal);
|
|
* }
|
|
*
|
|
*/
|
|
class CanProcess
{
public:
    /** Try to claim `item` by inserting it into `collection` under `mtx`.
     *
     * If the insert succeeds, this object is "usable" (converts to `true`)
     * and the item is erased again, under lock, when this object is
     * destroyed. If the item was already present, nothing is modified and
     * this object is "unusable" (converts to `false`).
     *
     * @param mtx Mutex guarding `collection`. Held by reference; it must
     *     outlive this object.
     * @param collection Container supporting `insert(item)` returning a
     *     pair whose `second` reports whether insertion happened, and
     *     `erase(...)`. Held by reference; it must outlive this object.
     * @param item Value to insert. It may be a temporary: the cleanup
     *     keeps its own copy (or an iterator) rather than a reference.
     */
    template <class Mutex, class Collection, class Item>
    CanProcess(Mutex& mtx, Collection& collection, Item const& item)
        : cleanup_(insert(mtx, collection, item))
    {
    }

    // A copy would share the same cleanup action and run it a second time,
    // erasing an item that may since have been claimed by someone else (or,
    // for std::set, erasing through an already-invalidated iterator).
    CanProcess(CanProcess const&) = delete;
    CanProcess&
    operator=(CanProcess const&) = delete;

    /** Release the item, if this object claimed it. */
    ~CanProcess()
    {
        if (cleanup_)
            cleanup_();
    }

    /** @return `true` if this object inserted the item ("usable"). */
    explicit
    operator bool() const
    {
        return static_cast<bool>(cleanup_);
    }

private:
    /** Insert `item` under lock and build the matching cleanup action.
     *
     * @tparam useIterator If `true`, the cleanup erases through the
     *     iterator returned by `insert`; otherwise it erases by value.
     * @return An empty function if `item` was already present, otherwise
     *     a callable that re-locks `mtx` and erases the item.
     */
    template <bool useIterator, class Mutex, class Collection, class Item>
    static std::function<void()>
    doInsert(Mutex& mtx, Collection& collection, Item const& item)
    {
        std::unique_lock<Mutex> lock(mtx);
        // TODO: Use structured binding once LLVM 16 is the minimum supported
        // version. See also: https://github.com/llvm/llvm-project/issues/48582
        // https://github.com/llvm/llvm-project/commit/127bf44385424891eb04cff8e52d3f157fc2cb7c
        auto const insertResult = collection.insert(item);
        if (!insertResult.second)
            return {};

        if constexpr (useIterator)
        {
            return [&mtx, &collection, it = insertResult.first]() {
                std::unique_lock<Mutex> lock(mtx);
                collection.erase(it);
            };
        }
        else
        {
            // Capture `item` by value: the caller's argument may be a
            // temporary that is gone by the time the destructor runs.
            return [&mtx, &collection, item]() {
                std::unique_lock<Mutex> lock(mtx);
                collection.erase(item);
            };
        }
    }

    // Generic insert() function doesn't use iterators because they may get
    // invalidated
    template <class Mutex, class Collection, class Item>
    static std::function<void()>
    insert(Mutex& mtx, Collection& collection, Item const& item)
    {
        return doInsert<false>(mtx, collection, item);
    }

    // Specialize insert() for std::set, which does not invalidate iterators for
    // insert and erase
    template <class Mutex, class Item>
    static std::function<void()>
    insert(Mutex& mtx, std::set<Item>& collection, Item const& item)
    {
        return doInsert<true>(mtx, collection, item);
    }

    // If set, then the item is "usable"
    std::function<void()> cleanup_;
};
|
|
|
|
#endif
|