From b7c3b96516e6468a26d39fdf80cd0e9a1fbc2df0 Mon Sep 17 00:00:00 2001 From: Nik Bougalis Date: Sat, 10 Oct 2015 19:34:59 -0700 Subject: [PATCH] Remove beast::SharedData --- Builds/VisualStudio2015/RippleD.vcxproj | 2 - .../VisualStudio2015/RippleD.vcxproj.filters | 3 - src/beast/beast/Threads.h | 1 - .../beast/insight/impl/StatsDCollector.cpp | 27 +- src/beast/beast/threads/SharedData.h | 287 ------------ src/beast/beast/utility/PropertyStream.h | 32 +- .../beast/utility/impl/PropertyStream.cpp | 94 ++-- src/ripple/app/ledger/impl/LedgerCleaner.cpp | 125 +++-- src/ripple/peerfinder/impl/Logic.h | 427 ++++++++---------- src/ripple/resource/impl/Logic.h | 273 +++++------ src/ripple/server/impl/ServerImpl.h | 1 - 11 files changed, 413 insertions(+), 859 deletions(-) delete mode 100644 src/beast/beast/threads/SharedData.h diff --git a/Builds/VisualStudio2015/RippleD.vcxproj b/Builds/VisualStudio2015/RippleD.vcxproj index cbd0d1b6c..9ea9e9d80 100644 --- a/Builds/VisualStudio2015/RippleD.vcxproj +++ b/Builds/VisualStudio2015/RippleD.vcxproj @@ -957,8 +957,6 @@ - - diff --git a/Builds/VisualStudio2015/RippleD.vcxproj.filters b/Builds/VisualStudio2015/RippleD.vcxproj.filters index c07a78c60..2c2ef8db1 100644 --- a/Builds/VisualStudio2015/RippleD.vcxproj.filters +++ b/Builds/VisualStudio2015/RippleD.vcxproj.filters @@ -1596,9 +1596,6 @@ beast\threads - - beast\threads - beast\threads diff --git a/src/beast/beast/Threads.h b/src/beast/beast/Threads.h index 73a7ec77b..07a76138d 100644 --- a/src/beast/beast/Threads.h +++ b/src/beast/beast/Threads.h @@ -24,7 +24,6 @@ #include #include #include -#include #include #include #include diff --git a/src/beast/beast/insight/impl/StatsDCollector.cpp b/src/beast/beast/insight/impl/StatsDCollector.cpp index c180f3e66..e16035f19 100644 --- a/src/beast/beast/insight/impl/StatsDCollector.cpp +++ b/src/beast/beast/insight/impl/StatsDCollector.cpp @@ -20,13 +20,13 @@ #include #include #include -#include #include #include #include #include #include #include +#include #include #include #include @@ -192,13 +192,6 @@ private: max_packet_size = 1472 }; - struct StateType - { - List metrics; - }; - - using State = SharedData ; - Journal m_journal; IP::Endpoint m_address; std::string m_prefix; @@ -208,7 +201,8 @@ private: boost::asio::deadline_timer m_timer; boost::asio::ip::udp::socket m_socket; std::deque m_data; - State m_state; + std::recursive_mutex metricsLock_; + List metrics_; // Must come last for order of init std::thread m_thread; @@ -288,14 +282,14 @@ public: void add (StatsDMetricBase& metric) { - State::Access state (m_state); - state->metrics.push_back (metric); + std::lock_guard _(metricsLock_); + metrics_.push_back (metric); } void remove (StatsDMetricBase& metric) { - State::Access state (m_state); - state->metrics.erase (state->metrics.iterator_to (metric)); + std::lock_guard _(metricsLock_); + metrics_.erase (metrics_.iterator_to (metric)); } //-------------------------------------------------------------------------- @@ -425,11 +419,10 @@ public: return; } - State::Access state (m_state); + std::lock_guard _(metricsLock_); - for (List ::iterator iter (state->metrics.begin()); - iter != state->metrics.end(); ++iter) - iter->do_process(); + for (auto& m : metrics_) + m.do_process(); send_buffers (); diff --git a/src/beast/beast/threads/SharedData.h b/src/beast/beast/threads/SharedData.h deleted file mode 100644 index 9c6412987..000000000 --- a/src/beast/beast/threads/SharedData.h +++ /dev/null @@ -1,287 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of Beast: https://github.com/vinniefalco/Beast - Copyright 2013, Vinnie Falco - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#ifndef BEAST_THREADS_SHAREDDATA_H_INCLUDED -#define BEAST_THREADS_SHAREDDATA_H_INCLUDED - -#include -#include - -namespace beast { - -/** Structured, multi-threaded access to a shared state. - - This container combines locking semantics with data access semantics to - create an alternative to the typical synchronization method of first - acquiring a lock and then accessing data members. - - With this container, access to the underlying data is only possible after - first acquiring a lock. The steps of acquiring the lock and obtaining - a const or non-const reference to the data are combined into one - RAII style operation. - - There are three types of access: - - - Access - Provides access to the shared state via a non-const reference or pointer. - Access acquires a unique lock on the mutex associated with the - container. - - - ConstAccess - Provides access to the shared state via a const reference or pointer. - ConstAccess acquires a shared lock on the mutex associated with the - container. - - - ConstUnlockedAccess - Provides access to the shared state via a const reference or pointer. - No locking is performed. It is the callers responsibility to ensure that - the operation is synchronized. This can be useful for diagnostics or - assertions, or for when it is known that no other threads can access - the data. - - - UnlockedAccess - Provides access to the shared state via a reference or pointer. - No locking is performed. It is the callers responsibility to ensure that - the operation is synchronized. This can be useful for diagnostics or - assertions, or for when it is known that no other threads can access - the data. - - Usage example: - - @code - - struct State - { - int value1; - String value2; - }; - - using SharedState = SharedData ; - - SharedState m_state; - - void readExample () - { - SharedState::ConstAccess state (m_state); - - print (state->value1); // read access - print (state->value2); // read access - - state->value1 = 42; // write disallowed: compile error - } - - void writeExample () - { - SharedState::Access state (m_state); - - state->value2 = "Label"; // write access, allowed - } - - @endcode - - Requirements for Value: - Constructible - Destructible - - Requirements for SharedMutexType: - X is SharedMutexType, a is an instance of X: - X a; DefaultConstructible - X::LockGuardType Names a type that implements the LockGuard concept. - X::SharedLockGuardType Names a type that implements the SharedLockGuard concept. - - @tparam Value The type which the container holds. - @tparam SharedMutexType The type of shared mutex to use. -*/ -template > -class SharedData -{ -private: - using LockGuardType = typename SharedMutexType::LockGuardType; - using SharedLockGuardType = typename SharedMutexType::SharedLockGuardType; - -public: - using ValueType = Value; - - class Access; - class ConstAccess; - class UnlockedAccess; - class ConstUnlockedAccess; - - /** Create a shared data container. - Up to 8 parameters can be specified in the constructor. These parameters - are forwarded to the corresponding constructor in Data. If no - constructor in Data matches the parameter list, a compile error is - generated. - */ - /** @{ */ - SharedData () = default; - - template - explicit SharedData (T1 t1) - : m_value (t1) { } - - template - SharedData (T1 t1, T2 t2) - : m_value (t1, t2) { } - - template - SharedData (T1 t1, T2 t2, T3 t3) - : m_value (t1, t2, t3) { } - - template - SharedData (T1 t1, T2 t2, T3 t3, T4 t4) - : m_value (t1, t2, t3, t4) { } - - template - SharedData (T1 t1, T2 t2, T3 t3, T4 t4, T5 t5) - : m_value (t1, t2, t3, t4, t5) { } - - template - SharedData (T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6) - : m_value (t1, t2, t3, t4, t5, t6) { } - - template - SharedData (T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7) : m_value (t1, t2, t3, t4, t5, t6, t7) { } - - template - SharedData (T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8) - : m_value (t1, t2, t3, t4, t5, t6, t7, t8) { } - /** @} */ - - SharedData (SharedData const&) = delete; - SharedData& operator= (SharedData const&) = delete; - -private: - Value m_value; - SharedMutexType m_mutex; -}; - -//------------------------------------------------------------------------------ - -/** Provides non-const access to the contents of a SharedData. - This acquires a unique lock on the underlying mutex. -*/ -template -class SharedData ::Access -{ -public: - explicit Access (SharedData& state) - : m_state (state) - , m_lock (m_state.m_mutex) - { } - - Access (Access const&) = delete; - Access& operator= (Access const&) = delete; - - Data const& get () const { return m_state.m_value; } - Data const& operator* () const { return get (); } - Data const* operator-> () const { return &get (); } - Data& get () { return m_state.m_value; } - Data& operator* () { return get (); } - Data* operator-> () { return &get (); } - -private: - SharedData& m_state; - typename SharedData::LockGuardType m_lock; -}; - -//------------------------------------------------------------------------------ - -/** Provides const access to the contents of a SharedData. - This acquires a shared lock on the underlying mutex. -*/ -template -class SharedData ::ConstAccess -{ -public: - /** Create a ConstAccess from the specified SharedData */ - explicit ConstAccess (SharedData const volatile& state) - : m_state (const_cast (state)) - , m_lock (m_state.m_mutex) - { } - - ConstAccess (ConstAccess const&) = delete; - ConstAccess& operator= (ConstAccess const&) = delete; - - Data const& get () const { return m_state.m_value; } - Data const& operator* () const { return get (); } - Data const* operator-> () const { return &get (); } - -private: - SharedData const& m_state; - typename SharedData::SharedLockGuardType m_lock; -}; - -//------------------------------------------------------------------------------ - -/** Provides const access to the contents of a SharedData. - This acquires a shared lock on the underlying mutex. -*/ -template -class SharedData ::ConstUnlockedAccess -{ -public: - /** Create an UnlockedAccess from the specified SharedData */ - explicit ConstUnlockedAccess (SharedData const volatile& state) - : m_state (const_cast (state)) - { } - - ConstUnlockedAccess (ConstUnlockedAccess const&) = delete; - ConstUnlockedAccess& operator= (ConstUnlockedAccess const&) = delete; - - Data const& get () const { return m_state.m_value; } - Data const& operator* () const { return get (); } - Data const* operator-> () const { return &get (); } - -private: - SharedData const& m_state; -}; - -//------------------------------------------------------------------------------ - -/** Provides access to the contents of a SharedData. - This acquires a shared lock on the underlying mutex. -*/ -template -class SharedData ::UnlockedAccess -{ -public: - /** Create an UnlockedAccess from the specified SharedData */ - explicit UnlockedAccess (SharedData& state) - : m_state (state) - { } - - UnlockedAccess (UnlockedAccess const&) = delete; - UnlockedAccess& operator= (UnlockedAccess const&) = delete; - - Data const& get () const { return m_state.m_value; } - Data const& operator* () const { return get (); } - Data const* operator-> () const { return &get (); } - Data& get () { return m_state.m_value; } - Data& operator* () { return get (); } - Data* operator-> () { return &get (); } - -private: - SharedData& m_state; -}; - -} - -#endif diff --git a/src/beast/beast/utility/PropertyStream.h b/src/beast/beast/utility/PropertyStream.h index cdedf01eb..ce0be6400 100644 --- a/src/beast/beast/utility/PropertyStream.h +++ b/src/beast/beast/utility/PropertyStream.h @@ -21,9 +21,9 @@ #define BEAST_UTILITY_PROPERTYSTREAM_H_INCLUDED #include -#include #include +#include #include #include #include @@ -274,31 +274,11 @@ public: class PropertyStream::Source { private: - struct State - { - explicit State (Source* source) - : item (source) - , parent (nullptr) - { } - - Item item; - Source* parent; - List children; - }; - - using SharedState = SharedData ; - std::string const m_name; - SharedState m_state; - - //-------------------------------------------------------------------------- - - void remove (SharedState::Access& state, - SharedState::Access& childState); - - void removeAll (SharedState::Access& state); - - void write (SharedState::Access& state, PropertyStream& stream); + std::recursive_mutex lock_; + Item item_; + Source* parent_; + List children_; public: explicit Source (std::string const& name); @@ -326,7 +306,7 @@ public: /** Remove a child source from this Source. */ void remove (Source& child); - /** Remove all child sources of this Source. */ + /** Remove all child sources from this Source. */ void removeAll (); /** Write only this Source to the stream. */ diff --git a/src/beast/beast/utility/impl/PropertyStream.cpp b/src/beast/beast/utility/impl/PropertyStream.cpp index de271a06a..641be5ee8 100644 --- a/src/beast/beast/utility/impl/PropertyStream.cpp +++ b/src/beast/beast/utility/impl/PropertyStream.cpp @@ -173,16 +173,17 @@ PropertyStream const& PropertyStream::Set::stream() const PropertyStream::Source::Source (std::string const& name) : m_name (name) - , m_state (this) + , item_ (this) + , parent_ (nullptr) { } PropertyStream::Source::~Source () { - SharedState::Access state (m_state); - if (state->parent != nullptr) - state->parent->remove (*this); - removeAll (state); + std::lock_guard _(lock_); + if (parent_ != nullptr) + parent_->remove (*this); + removeAll (); } std::string const& PropertyStream::Source::name () const @@ -192,37 +193,35 @@ std::string const& PropertyStream::Source::name () const void PropertyStream::Source::add (Source& source) { - SharedState::Access state (m_state); - SharedState::Access childState (source.m_state); - bassert (childState->parent == nullptr); - state->children.push_back (childState->item); - childState->parent = this; + std::lock(lock_, source.lock_); + std::lock_guard lk1(lock_, std::adopt_lock); + std::lock_guard lk2(source.lock_, std::adopt_lock); + + bassert (source.parent_ == nullptr); + children_.push_back (source.item_); + source.parent_ = this; } void PropertyStream::Source::remove (Source& child) { - SharedState::Access state (m_state); - SharedState::Access childState (child.m_state); - remove (state, childState); + std::lock(lock_, child.lock_); + std::lock_guard lk1(lock_, std::adopt_lock); + std::lock_guard lk2(child.lock_, std::adopt_lock); + + bassert (child.parent_ == this); + children_.erase ( + children_.iterator_to ( + child.item_)); + child.parent_ = nullptr; } void PropertyStream::Source::removeAll () { - SharedState::Access state (m_state); - removeAll (state); -} - -//------------------------------------------------------------------------------ - -void PropertyStream::Source::write ( - SharedState::Access& state, PropertyStream &stream) -{ - for (List ::iterator iter (state->children.begin()); - iter != state->children.end(); ++iter) + std::lock_guard _(lock_); + for (auto iter = children_.begin(); iter != children_.end(); ) { - Source& source (iter->source()); - Map map (source.name(), stream); - source.write (stream); + std::lock_guard _cl((*iter)->lock_); + remove (*(*iter)); } } @@ -239,14 +238,10 @@ void PropertyStream::Source::write (PropertyStream& stream) Map map (m_name, stream); onWrite (map); - SharedState::Access state (m_state); + std::lock_guard _(lock_); - for (List ::iterator iter (state->children.begin()); - iter != state->children.end(); ++iter) - { - Source& source (iter->source()); - source.write (stream); - } + for (auto& child : children_) + child.source().write (stream); } void PropertyStream::Source::write (PropertyStream& stream, std::string const& path) @@ -330,8 +325,9 @@ PropertyStream::Source* PropertyStream::Source::find_one_deep (std::string const Source* found = find_one (name); if (found != nullptr) return found; - SharedState::Access state (this->m_state); - for (auto& s : state->children) + + std::lock_guard _(lock_); + for (auto& s : children_) { found = s.source().find_one_deep (name); if (found != nullptr) @@ -360,8 +356,8 @@ PropertyStream::Source* PropertyStream::Source::find_path (std::string path) // If no immediate children match, then return nullptr PropertyStream::Source* PropertyStream::Source::find_one (std::string const& name) { - SharedState::Access state (this->m_state); - for (auto& s : state->children) + std::lock_guard _(lock_); + for (auto& s : children_) { if (s.source().m_name == name) return &s.source(); @@ -373,28 +369,6 @@ void PropertyStream::Source::onWrite (Map&) { } -//------------------------------------------------------------------------------ - -void PropertyStream::Source::remove ( - SharedState::Access& state, SharedState::Access& childState) -{ - bassert (childState->parent == this); - state->children.erase ( - state->children.iterator_to ( - childState->item)); - childState->parent = nullptr; -} - -void PropertyStream::Source::removeAll (SharedState::Access& state) -{ - for (List ::iterator iter (state->children.begin()); - iter != state->children.end();) - { - SharedState::Access childState ((*iter)->m_state); - remove (state, childState); - } -} - //------------------------------------------------------------------------------ // // PropertyStream diff --git a/src/ripple/app/ledger/impl/LedgerCleaner.cpp b/src/ripple/app/ledger/impl/LedgerCleaner.cpp index daf7195df..19c56a294 100644 --- a/src/ripple/app/ledger/impl/LedgerCleaner.cpp +++ b/src/ripple/app/ledger/impl/LedgerCleaner.cpp @@ -29,6 +29,7 @@ #include #include #include // +#include #include namespace ripple { @@ -52,38 +53,24 @@ class LedgerCleanerImp , public beast::Thread { public: - struct State - { - State() - : minRange (0) - , maxRange (0) - , checkNodes (false) - , fixTxns (false) - , failures (0) - { - } - - // The lowest ledger in the range we're checking. - LedgerIndex minRange; - - // The highest ledger in the range we're checking - LedgerIndex maxRange; - - // Check all state/transaction nodes - bool checkNodes; - - // Rewrite SQL databases - bool fixTxns; - - // Number of errors encountered since last success - int failures; - }; - - using SharedState = beast::SharedData ; - Application& app_; - SharedState m_state; beast::Journal m_journal; + std::mutex lock_; + + // The lowest ledger in the range we're checking. + LedgerIndex minRange_ = 0; + + // The highest ledger in the range we're checking + LedgerIndex maxRange_ = 0; + + // Check all state/transaction nodes + bool checkNodes_ = false; + + // Rewrite SQL databases + bool fixTxns_ = false; + + // Number of errors encountered since last success + int failures_ = 0; //-------------------------------------------------------------------------- @@ -133,19 +120,19 @@ public: void onWrite (beast::PropertyStream::Map& map) { - SharedState::Access state (m_state); + std::lock_guard _(lock_); - if (state->maxRange == 0) + if (maxRange_ == 0) map["status"] = "idle"; else { map["status"] = "running"; - map["min_ledger"] = state->minRange; - map["max_ledger"] = state->maxRange; - map["check_nodes"] = state->checkNodes ? "true" : "false"; - map["fix_txns"] = state->fixTxns ? "true" : "false"; - if (state->failures > 0) - map["fail_counts"] = state->failures; + map["min_ledger"] = minRange_; + map["max_ledger"] = maxRange_; + map["check_nodes"] = checkNodes_ ? "true" : "false"; + map["fix_txns"] = fixTxns_ ? "true" : "false"; + if (failures_ > 0) + map["fail_counts"] = failures_; } } @@ -162,13 +149,13 @@ public: app_.getLedgerMaster().getFullValidatedRange (minRange, maxRange); { - SharedState::Access state (m_state); + std::lock_guard _(lock_); - state->maxRange = maxRange; - state->minRange = minRange; - state->checkNodes = false; - state->fixTxns = false; - state->failures = 0; + maxRange_ = maxRange; + minRange_ = minRange; + checkNodes_ = false; + fixTxns_ = false; + failures_ = 0; /* JSON Parameters: @@ -203,29 +190,29 @@ public: // Quick way to fix a single ledger if (params.isMember(jss::ledger)) { - state->maxRange = params[jss::ledger].asUInt(); - state->minRange = params[jss::ledger].asUInt(); - state->fixTxns = true; - state->checkNodes = true; + maxRange_ = params[jss::ledger].asUInt(); + minRange_ = params[jss::ledger].asUInt(); + fixTxns_ = true; + checkNodes_ = true; } if (params.isMember(jss::max_ledger)) - state->maxRange = params[jss::max_ledger].asUInt(); + maxRange_ = params[jss::max_ledger].asUInt(); if (params.isMember(jss::min_ledger)) - state->minRange = params[jss::min_ledger].asUInt(); + minRange_ = params[jss::min_ledger].asUInt(); if (params.isMember(jss::full)) - state->fixTxns = state->checkNodes = params[jss::full].asBool(); + fixTxns_ = checkNodes_ = params[jss::full].asBool(); if (params.isMember(jss::fix_txns)) - state->fixTxns = params[jss::fix_txns].asBool(); + fixTxns_ = params[jss::fix_txns].asBool(); if (params.isMember(jss::check_nodes)) - state->checkNodes = params[jss::check_nodes].asBool(); + checkNodes_ = params[jss::check_nodes].asBool(); if (params.isMember(jss::stop) && params[jss::stop].asBool()) - state->minRange = state->maxRange = 0; + minRange_ = maxRange_ = 0; } notify(); @@ -411,16 +398,16 @@ public: } { - SharedState::Access state (m_state); - if ((state->minRange > state->maxRange) || - (state->maxRange == 0) || (state->minRange == 0)) + std::lock_guard _(lock_); + if ((minRange_ > maxRange_) || + (maxRange_ == 0) || (minRange_ == 0)) { - state->minRange = state->maxRange = 0; + minRange_ = maxRange_ = 0; return; } - ledgerIndex = state->maxRange; - doNodes = state->checkNodes; - doTxns = state->fixTxns; + ledgerIndex = maxRange_; + doNodes = checkNodes_; + doTxns = fixTxns_; } ledgerHash = getHash(ledgerIndex, goodLedger); @@ -441,8 +428,8 @@ public: if (fail) { { - SharedState::Access state (m_state); - ++state->failures; + std::lock_guard _(lock_); + ++failures_; } // Wait for acquiring to catch up to us std::this_thread::sleep_for(std::chrono::seconds(2)); @@ -450,12 +437,12 @@ public: else { { - SharedState::Access state (m_state); - if (ledgerIndex == state->minRange) - ++state->minRange; - if (ledgerIndex == state->maxRange) - --state->maxRange; - state->failures = 0; + std::lock_guard _(lock_); + if (ledgerIndex == minRange_) + ++minRange_; + if (ledgerIndex == maxRange_) + --maxRange_; + failures_ = 0; } // Reduce I/O pressure and wait for acquiring to catch up to us std::this_thread::sleep_for(std::chrono::milliseconds(100)); diff --git a/src/ripple/peerfinder/impl/Logic.h b/src/ripple/peerfinder/impl/Logic.h index 2c0fd77c4..80a2255ee 100644 --- a/src/ripple/peerfinder/impl/Logic.h +++ b/src/ripple/peerfinder/impl/Logic.h @@ -55,72 +55,46 @@ public: using Slots = std::map >; - using FixedSlots = std::map ; - - // A set of unique Ripple public keys - using Keys = std::set ; - - // A set of non-unique IPAddresses without ports, used - // to filter duplicates when making outgoing connections. - using ConnectedAddresses = std::multiset ; - - struct State - { - State ( - Store* store, - clock_type& clock, - beast::Journal journal) - : stopping (false) - , counts () - , livecache (clock, beast::Journal ( - journal, Reporting::livecache)) - , bootcache (*store, clock, beast::Journal ( - journal, Reporting::bootcache)) - { - } - - // True if we are stopping. - bool stopping; - - // The source we are currently fetching. - // This is used to cancel I/O during program exit. - beast::SharedPtr fetchSource; - - // Configuration settings - Config config; - - // Slot counts and other aggregate statistics. - Counts counts; - - // A list of slots that should always be connected - FixedSlots fixed; - - // Live livecache from mtENDPOINTS messages - Livecache <> livecache; - - // LiveCache of addresses suitable for gaining initial connections - Bootcache bootcache; - - // Holds all counts - Slots slots; - - // The addresses (but not port) we are connected to. This includes - // outgoing connection attempts. Note that this set can contain - // duplicates (since the port is not set) - ConnectedAddresses connected_addresses; - - // Set of public keys belonging to active peers - Keys keys; - }; - - using SharedState = beast::SharedData ; - beast::Journal m_journal; - SharedState m_state; clock_type& m_clock; Store& m_store; Checker& m_checker; + std::recursive_mutex lock_; + + // True if we are stopping. + bool stopping_ = false; + + // The source we are currently fetching. + // This is used to cancel I/O during program exit. + beast::SharedPtr fetchSource_; + + // Configuration settings + Config config_; + + // Slot counts and other aggregate statistics. + Counts counts_; + + // A list of slots that should always be connected + std::map fixed_; + + // Live livecache from mtENDPOINTS messages + Livecache <> livecache_; + + // LiveCache of addresses suitable for gaining initial connections + Bootcache bootcache_; + + // Holds all counts + Slots slots_; + + // The addresses (but not port) we are connected to. This includes + // outgoing connection attempts. Note that this set can contain + // duplicates (since the port is not set) + std::multiset connectedAddresses_; + + // Set of public keys belonging to active peers + std::set keys_; + // A list of dynamic sources to consult as a fallback std::vector > m_sources; @@ -133,10 +107,13 @@ public: Logic (clock_type& clock, Store& store, Checker& checker, beast::Journal journal) : m_journal (journal, Reporting::logic) - , m_state (&store, std::ref (clock), journal) , m_clock (clock) , m_store (store) , m_checker (checker) + , livecache_ (m_clock, + beast::Journal (journal, Reporting::livecache)) + , bootcache_ (store, m_clock, + beast::Journal (journal, Reporting::bootcache)) , m_whenBroadcast (m_clock.now()) , m_squelches (m_clock) { @@ -147,9 +124,8 @@ public: // void load () { - typename SharedState::Access state (m_state); - - state->bootcache.load (); + std::lock_guard _(lock_); + bootcache_.load (); } /** Stop the logic. @@ -160,10 +136,10 @@ public: */ void stop () { - typename SharedState::Access state (m_state); - state->stopping = true; - if (state->fetchSource != nullptr) - state->fetchSource->cancel (); + std::lock_guard _(lock_); + stopping_ = true; + if (fetchSource_ != nullptr) + fetchSource_->cancel (); } //-------------------------------------------------------------------------- @@ -175,16 +151,16 @@ public: void config (Config const& c) { - typename SharedState::Access state (m_state); - state->config = c; - state->counts.onConfig (state->config); + std::lock_guard _(lock_); + config_ = c; + counts_.onConfig (config_); } Config config() { - typename SharedState::Access state (m_state); - return state->config; + std::lock_guard _(lock_); + return config_; } void @@ -200,7 +176,7 @@ public: addFixedPeer (std::string const& name, std::vector const& addresses) { - typename SharedState::Access state (m_state); + std::lock_guard _(lock_); if (addresses.empty ()) { @@ -217,7 +193,7 @@ public: remote_address.to_string ()); } - auto result (state->fixed.emplace (std::piecewise_construct, + auto result (fixed_.emplace (std::piecewise_construct, std::forward_as_tuple (remote_address), std::make_tuple (std::ref (m_clock)))); @@ -241,9 +217,9 @@ public: if (ec == boost::asio::error::operation_aborted) return; - typename SharedState::Access state (m_state); - Slots::iterator const iter (state->slots.find (remoteAddress)); - if (iter == state->slots.end()) + std::lock_guard _(lock_); + auto const iter (slots_.find (remoteAddress)); + if (iter == slots_.end()) { // The slot disconnected before we finished the check if (m_journal.debug) m_journal.debug << beast::leftw (18) << @@ -263,7 +239,7 @@ public: if (m_journal.error) m_journal.error << beast::leftw (18) << "Logic testing " << iter->first << " with error, " << ec.message(); - state->bootcache.on_failure (checkedAddress); + bootcache_.on_failure (checkedAddress); return; } @@ -282,14 +258,14 @@ public: "Logic accept" << remote_endpoint << " on local " << local_endpoint; - typename SharedState::Access state (m_state); + std::lock_guard _(lock_); // Check for duplicate connection if (is_public (remote_endpoint)) { - auto const iter = state->connected_addresses.find ( + auto const iter = connectedAddresses_.find ( remote_endpoint.address()); - if (iter != state->connected_addresses.end()) + if (iter != connectedAddresses_.end()) { if (m_journal.debug) m_journal.debug << beast::leftw (18) << "Logic dropping inbound " << remote_endpoint << @@ -300,18 +276,18 @@ public: // Create the slot SlotImp::ptr const slot (std::make_shared (local_endpoint, - remote_endpoint, fixed (remote_endpoint.address (), state), + remote_endpoint, fixed (remote_endpoint.address ()), m_clock)); // Add slot to table - auto const result (state->slots.emplace ( + auto const result (slots_.emplace ( slot->remote_endpoint (), slot)); // Remote address must not already exist assert (result.second); // Add to the connected address list - state->connected_addresses.emplace (remote_endpoint.address()); + connectedAddresses_.emplace (remote_endpoint.address()); // Update counts - state->counts.add (*slot); + counts_.add (*slot); return result.first->second; } @@ -323,11 +299,11 @@ public: if (m_journal.debug) m_journal.debug << beast::leftw (18) << "Logic connect " << remote_endpoint; - typename SharedState::Access state (m_state); + std::lock_guard _(lock_); // Check for duplicate connection - if (state->slots.find (remote_endpoint) != - state->slots.end ()) + if (slots_.find (remote_endpoint) != + slots_.end ()) { if (m_journal.debug) m_journal.debug << beast::leftw (18) << "Logic dropping " << remote_endpoint << @@ -337,20 +313,20 @@ public: // Create the slot SlotImp::ptr const slot (std::make_shared ( - remote_endpoint, fixed (remote_endpoint, state), m_clock)); + remote_endpoint, fixed (remote_endpoint), m_clock)); // Add slot to table - std::pair result ( - state->slots.emplace (slot->remote_endpoint (), - slot)); + auto const result = + slots_.emplace (slot->remote_endpoint (), + slot); // Remote address must not already exist assert (result.second); // Add to the connected address list - state->connected_addresses.emplace (remote_endpoint.address()); + connectedAddresses_.emplace (remote_endpoint.address()); // Update counts - state->counts.add (*slot); + counts_.add (*slot); return result.first->second; } @@ -363,18 +339,18 @@ public: "Logic connected" << slot->remote_endpoint () << " on local " << local_endpoint; - typename SharedState::Access state (m_state); + std::lock_guard _(lock_); // The object must exist in our table - assert (state->slots.find (slot->remote_endpoint ()) != - state->slots.end ()); + assert (slots_.find (slot->remote_endpoint ()) != + slots_.end ()); // Assign the local endpoint now that it's known slot->local_endpoint (local_endpoint); // Check for self-connect by address { - auto const iter (state->slots.find (local_endpoint)); - if (iter != state->slots.end ()) + auto const iter (slots_.find (local_endpoint)); + if (iter != slots_.end ()) { assert (iter->second->local_endpoint () == slot->remote_endpoint ()); @@ -386,9 +362,9 @@ public: } // Update counts - state->counts.remove (*slot); + counts_.remove (*slot); slot->state (Slot::connected); - state->counts.add (*slot); + counts_.add (*slot); return true; } @@ -400,56 +376,55 @@ public: "Logic handshake " << slot->remote_endpoint () << " with " << (cluster ? "clustered " : "") << "key " << key; - typename SharedState::Access state (m_state); + std::lock_guard _(lock_); // The object must exist in our table - assert (state->slots.find (slot->remote_endpoint ()) != - state->slots.end ()); + assert (slots_.find (slot->remote_endpoint ()) != + slots_.end ()); // Must be accepted or connected assert (slot->state() == Slot::accept || slot->state() == Slot::connected); // Check for duplicate connection by key - if (state->keys.find (key) != state->keys.end()) + if (keys_.find (key) != keys_.end()) return Result::duplicate; // See if we have an open space for this slot - if (! state->counts.can_activate (*slot)) + if (! counts_.can_activate (*slot)) { if (! slot->inbound()) - state->bootcache.on_success ( + bootcache_.on_success ( slot->remote_endpoint()); return Result::full; } // Set key and cluster right before adding to the map // otherwise we could assert later when erasing the key. - state->counts.remove (*slot); + counts_.remove (*slot); slot->public_key (key); slot->cluster (cluster); - state->counts.add (*slot); + counts_.add (*slot); // Add the public key to the active set - std::pair const result ( - state->keys.insert (key)); + auto const result = keys_.insert (key); // Public key must not already exist assert (result.second); (void) result.second; // Change state and update counts - state->counts.remove (*slot); + counts_.remove (*slot); slot->activate (m_clock.now ()); - state->counts.add (*slot); + counts_.add (*slot); if (! slot->inbound()) - state->bootcache.on_success ( + bootcache_.on_success ( slot->remote_endpoint()); // Mark fixed slot success if (slot->fixed() && ! slot->inbound()) { - auto iter (state->fixed.find (slot->remote_endpoint())); - assert (iter != state->fixed.end ()); + auto iter (fixed_.find (slot->remote_endpoint())); + assert (iter != fixed_.end ()); iter->second.success (m_clock.now ()); if (m_journal.trace) m_journal.trace << beast::leftw (18) << "Logic fixed " << slot->remote_endpoint () << " success"; @@ -465,12 +440,12 @@ public: std::vector redirect (SlotImp::ptr const& slot) { - typename SharedState::Access state (m_state); + std::lock_guard _(lock_); RedirectHandouts h (slot); - state->livecache.hops.shuffle(); + livecache_.hops.shuffle(); handout (&h, (&h)+1, - state->livecache.hops.begin(), - state->livecache.hops.end()); + livecache_.hops.begin(), + livecache_.hops.end()); return std::move(h.list()); } @@ -485,27 +460,33 @@ public: { std::vector const none; - typename SharedState::Access state (m_state); + std::lock_guard _(lock_); // Count how many more outbound attempts to make // - auto needed (state->counts.attempts_needed ()); + auto needed (counts_.attempts_needed ()); if (needed == 0) return none; ConnectHandouts h (needed, m_squelches); // Make sure we don't connect to already-connected entries. - squelch_slots (state); + for (auto const& s : slots_) + { + auto const result (m_squelches.insert ( + s.second->remote_endpoint().address())); + if (! result.second) + m_squelches.touch (result.first); + } // 1. Use Fixed if: // Fixed active count is below fixed count AND // ( There are eligible fixed addresses to try OR // Any outbound attempts are in progress) // - if (state->counts.fixed_active() < state->fixed.size ()) + if (counts_.fixed_active() < fixed_.size ()) { - get_fixed (needed, h.list(), m_squelches, state); + get_fixed (needed, h.list(), m_squelches); if (! h.list().empty ()) { @@ -514,11 +495,11 @@ public: return h.list(); } - if (state->counts.attempts() > 0) + if (counts_.attempts() > 0) { if (m_journal.debug) m_journal.debug << beast::leftw (18) << "Logic waiting on " << - state->counts.attempts() << " attempts"; + counts_.attempts() << " attempts"; return none; } } @@ -526,8 +507,8 @@ public: // Only proceed if auto connect is enabled and we // have less than the desired number of outbound slots // - if (! state->config.autoConnect || - state->counts.out_active () >= state->counts.out_max ()) + if (! config_.autoConnect || + counts_.out_active () >= counts_.out_max ()) return none; // 2. Use Livecache if: @@ -535,10 +516,10 @@ public: // Any outbound attempts are in progress // { - state->livecache.hops.shuffle(); + livecache_.hops.shuffle(); handout (&h, (&h)+1, - state->livecache.hops.rbegin(), - state->livecache.hops.rend()); + livecache_.hops.rbegin(), + livecache_.hops.rend()); if (! h.list().empty ()) { if (m_journal.debug) m_journal.debug << beast::leftw (18) << @@ -546,11 +527,11 @@ public: ((h.list().size () > 1) ? "endpoints" : "endpoint"); return h.list(); } - else if (state->counts.attempts() > 0) + else if (counts_.attempts() > 0) { if (m_journal.debug) m_journal.debug << beast::leftw (18) << "Logic waiting on " << - state->counts.attempts() << " attempts"; + counts_.attempts() << " attempts"; return none; } } @@ -571,8 +552,8 @@ public: // 4. Use Bootcache if: // There are any entries we haven't tried lately // - for (auto iter (state->bootcache.begin()); - ! h.full() && iter != state->bootcache.end(); ++iter) + for (auto iter (bootcache_.begin()); + ! h.full() && iter != bootcache_.end(); ++iter) h.try_insert (*iter); if (! h.list().empty ()) @@ -592,7 +573,7 @@ public: { std::vector>> result; - typename SharedState::Access state (m_state); + std::lock_guard _(lock_); clock_type::time_point const now = m_clock.now(); if (m_whenBroadcast <= now) @@ -602,8 +583,8 @@ public: { // build list of active slots std::vector slots; - slots.reserve (state->slots.size()); - std::for_each (state->slots.cbegin(), state->slots.cend(), + slots.reserve (slots_.size()); + std::for_each (slots_.cbegin(), slots_.cend(), [&slots](Slots::value_type const& value) { if (value.second->state() == Slot::active) @@ -632,23 +613,23 @@ public: // 2. We have slots // 3. We haven't failed the firewalled test // - if (state->config.wantIncoming && - state->counts.inboundSlots() > 0) + if (config_.wantIncoming && + counts_.inboundSlots() > 0) { Endpoint ep; ep.hops = 0; ep.address = beast::IP::Endpoint ( beast::IP::AddressV4 ()).at_port ( - state->config.listeningPort); + config_.listeningPort); for (auto& t : targets) t.insert (ep); } // build sequence of endpoints by hops - state->livecache.hops.shuffle(); + livecache_.hops.shuffle(); handout (targets.begin(), targets.end(), - state->livecache.hops.begin(), - state->livecache.hops.end()); + livecache_.hops.begin(), + livecache_.hops.end()); // broadcast for (auto const& t : targets) @@ -670,30 +651,29 @@ public: void once_per_second() { - typename SharedState::Access state (m_state); + std::lock_guard _(lock_); // Expire the Livecache - state->livecache.expire (); + livecache_.expire (); // Expire the recent cache in each slot - for (auto const& entry : state->slots) + for (auto const& entry : slots_) entry.second->expire(); // Expire the recent attempts table beast::expire (m_squelches, Tuning::recentAttemptDuration); - state->bootcache.periodicActivity (); + bootcache_.periodicActivity (); } //-------------------------------------------------------------------------- // Validate and clean up the list that we received from the slot. - void preprocess (SlotImp::ptr const& slot, Endpoints& list, - typename SharedState::Access& state) + void preprocess (SlotImp::ptr const& slot, Endpoints& list) { bool neighbor (false); - for (auto iter (list.begin()); iter != list.end();) + for (auto iter = list.begin(); iter != list.end();) { Endpoint& ep (*iter); @@ -767,16 +747,16 @@ public: " contained " << list.size () << ((list.size() > 1) ? " entries" : " entry"); - typename SharedState::Access state (m_state); + std::lock_guard _(lock_); // The object must exist in our table - assert (state->slots.find (slot->remote_endpoint ()) != - state->slots.end ()); + assert (slots_.find (slot->remote_endpoint ()) != + slots_.end ()); // Must be handshaked! assert (slot->state() == Slot::active); - preprocess (slot, list, state); + preprocess (slot, list); clock_type::time_point const now (m_clock.now()); @@ -827,8 +807,8 @@ public: // listening test, else we silently drop their messsage // since their listening port is misconfigured. // - state->livecache.insert (ep); - state->bootcache.insert (ep.address); + livecache_.insert (ep); + bootcache_.insert (ep.address); } slot->whenAcceptEndpoints = now + Tuning::secondsPerMessage; @@ -839,52 +819,52 @@ public: void on_legacy_endpoints (IPAddresses const& list) { // Ignoring them also seems a valid choice. - typename SharedState::Access state (m_state); + std::lock_guard _(lock_); for (IPAddresses::const_iterator iter (list.begin()); iter != list.end(); ++iter) - state->bootcache.insert (*iter); + bootcache_.insert (*iter); } - void remove (SlotImp::ptr const& slot, typename SharedState::Access& state) + void remove (SlotImp::ptr const& slot) { - Slots::iterator const iter (state->slots.find ( - slot->remote_endpoint ())); + auto const iter = slots_.find ( + slot->remote_endpoint ()); // The slot must exist in the table - assert (iter != state->slots.end ()); + assert (iter != slots_.end ()); // Remove from slot by IP table - state->slots.erase (iter); + slots_.erase (iter); // Remove the key if present if (slot->public_key () != boost::none) { - Keys::iterator const iter (state->keys.find (*slot->public_key())); + auto const iter = keys_.find (*slot->public_key()); // Key must exist - assert (iter != state->keys.end ()); - state->keys.erase (iter); + assert (iter != keys_.end ()); + keys_.erase (iter); } // Remove from connected address table { - auto const iter (state->connected_addresses.find ( + auto const iter (connectedAddresses_.find ( slot->remote_endpoint().address())); // Address must exist - assert (iter != state->connected_addresses.end ()); - state->connected_addresses.erase (iter); + assert (iter != connectedAddresses_.end ()); + connectedAddresses_.erase (iter); } // Update counts - state->counts.remove (*slot); + counts_.remove (*slot); } void on_closed (SlotImp::ptr const& slot) { - typename SharedState::Access state (m_state); + std::lock_guard _(lock_); - remove (slot, state); + remove (slot); // Mark fixed slot failure if (slot->fixed() && ! slot->inbound() && slot->state() != Slot::active) { - auto iter (state->fixed.find (slot->remote_endpoint())); - assert (iter != state->fixed.end ()); + auto iter (fixed_.find (slot->remote_endpoint())); + assert (iter != fixed_.end ()); iter->second.failure (m_clock.now ()); if (m_journal.debug) m_journal.debug << beast::leftw (18) << "Logic fixed " << slot->remote_endpoint () << " failed"; @@ -900,7 +880,7 @@ public: case Slot::connect: case Slot::connected: - state->bootcache.on_failure (slot->remote_endpoint ()); + bootcache_.on_failure (slot->remote_endpoint ()); // VFALCO TODO If the address exists in the ephemeral/live // endpoint livecache then we should mark the failure // as if it didn't pass the listening test. We should also @@ -925,9 +905,9 @@ public: void on_failure (SlotImp::ptr const& slot) { - typename SharedState::Access state (m_state); + std::lock_guard _(lock_); - state->bootcache.on_failure (slot->remote_endpoint ()); + bootcache_.on_failure (slot->remote_endpoint ()); } // Insert a set of redirect IP addresses into the Bootcache @@ -939,9 +919,10 @@ public: //-------------------------------------------------------------------------- // Returns `true` if the address matches a fixed slot address - bool fixed (beast::IP::Endpoint const& endpoint, typename SharedState::Access& state) const + // Must have the lock held + bool fixed (beast::IP::Endpoint const& endpoint) const { - for (auto const& entry : state->fixed) + for (auto const& entry : fixed_) if (entry.first == endpoint) return true; return false; @@ -949,9 +930,10 @@ public: // Returns `true` if the address matches a fixed slot address // Note that this does not use the port information in the IP::Endpoint - bool fixed (beast::IP::Address const& address, typename SharedState::Access& state) const + // Must have the lock held + bool fixed (beast::IP::Address const& address) const { - for (auto const& entry : state->fixed) + for (auto const& entry : fixed_) if (entry.first.address () == address) return true; return false; @@ -966,17 +948,16 @@ public: /** Adds eligible Fixed addresses for outbound attempts. */ template void get_fixed (std::size_t needed, Container& c, - typename ConnectHandouts::Squelches& squelches, - typename SharedState::Access& state) + typename ConnectHandouts::Squelches& squelches) { auto const now (m_clock.now()); - for (auto iter = state->fixed.begin (); - needed && iter != state->fixed.end (); ++iter) + for (auto iter = fixed_.begin (); + needed && iter != fixed_.end (); ++iter) { auto const& address (iter->first.address()); if (iter->second.when() <= now && squelches.find(address) == squelches.end() && std::none_of ( - state->slots.cbegin(), state->slots.cend(), + slots_.cbegin(), slots_.cend(), [address](Slots::value_type const& v) { return address == v.first.address(); @@ -991,20 +972,6 @@ public: //-------------------------------------------------------------------------- - // Adds slot addresses to the squelched set - void squelch_slots (typename SharedState::Access& state) - { - for (auto const& s : state->slots) - { - auto const result (m_squelches.insert ( - s.second->remote_endpoint().address())); - if (! result.second) - m_squelches.touch (result.first); - } - } - - //-------------------------------------------------------------------------- - void addStaticSource (beast::SharedPtr const& source) { @@ -1023,25 +990,16 @@ public: // //-------------------------------------------------------------------------- - // Add one address. - // Returns `true` if the address is new. - // - bool addBootcacheAddress (beast::IP::Endpoint const& address, - typename SharedState::Access& state) - { - return state->bootcache.insert (address); - } - // Add a set of addresses. // Returns the number of addresses added. // int addBootcacheAddresses (IPAddresses const& list) { int count (0); - typename SharedState::Access state (m_state); + std::lock_guard _(lock_); for (auto addr : list) { - if (addBootcacheAddress (addr, state)) + if (bootcache_.insert (addr)) ++count; } return count; @@ -1054,10 +1012,10 @@ public: { { - typename SharedState::Access state (m_state); - if (state->stopping) + std::lock_guard _(lock_); + if (stopping_) return; - state->fetchSource = source; + fetchSource_ = source; } // VFALCO NOTE The fetch is synchronous, @@ -1066,10 +1024,10 @@ public: source->fetch (results, m_journal); { - typename SharedState::Access state (m_state); - if (state->stopping) + std::lock_guard _(lock_); + if (stopping_) return; - state->fetchSource = nullptr; + fetchSource_ = nullptr; } } @@ -1135,37 +1093,37 @@ public: void onWrite (beast::PropertyStream::Map& map) { - typename SharedState::Access state (m_state); + std::lock_guard _(lock_); // VFALCO NOTE These ugly casts are needed because // of how std::size_t is declared on some linuxes // - map ["bootcache"] = std::uint32_t (state->bootcache.size()); - map ["fixed"] = std::uint32_t (state->fixed.size()); + map ["bootcache"] = std::uint32_t (bootcache_.size()); + map ["fixed"] = std::uint32_t (fixed_.size()); { beast::PropertyStream::Set child ("peers", map); - writeSlots (child, state->slots); + writeSlots (child, slots_); } { beast::PropertyStream::Map child ("counts", map); - state->counts.onWrite (child); + counts_.onWrite (child); } { beast::PropertyStream::Map child ("config", map); - state->config.onWrite (child); + config_.onWrite (child); } { beast::PropertyStream::Map child ("livecache", map); - state->livecache.onWrite (child); + livecache_.onWrite (child); } { beast::PropertyStream::Map child ("bootcache", map); - state->bootcache.onWrite (child); + bootcache_.onWrite (child); } } @@ -1175,14 +1133,9 @@ public: // //-------------------------------------------------------------------------- - State const& state () const - { - return *typename SharedState::ConstAccess (m_state); - } - Counts const& counts () const { - return typename SharedState::ConstAccess (m_state)->counts; + return counts_; } static std::string stateString (Slot::State state) @@ -1209,10 +1162,10 @@ void Logic::onRedirects (FwdIter first, FwdIter last, boost::asio::ip::tcp::endpoint const& remote_address) { - typename SharedState::Access state (m_state); + std::lock_guard _(lock_); std::size_t n = 0; for(;first != last && n < Tuning::maxRedirects; ++first, ++n) - state->bootcache.insert( + bootcache_.insert( beast::IPAddressConversion::from_asio(*first)); if (n > 0) if (m_journal.trace) m_journal.trace << beast::leftw (18) << diff --git a/src/ripple/resource/impl/Logic.h b/src/ripple/resource/impl/Logic.h index 8a493db21..fa89baa65 100644 --- a/src/ripple/resource/impl/Logic.h +++ b/src/ripple/resource/impl/Logic.h @@ -29,8 +29,8 @@ #include #include #include -#include #include +#include namespace ripple { namespace Resource { @@ -43,33 +43,6 @@ private: using Table = hash_map ; using EntryIntrusiveList = beast::List ; - struct State - { - // Table of all entries - Table table; - - // Because the following are intrusive lists, a given Entry may be in - // at most list at a given instant. The Entry must be removed from - // one list before placing it in another. - - // List of all active inbound entries - EntryIntrusiveList inbound; - - // List of all active outbound entries - EntryIntrusiveList outbound; - - // List of all active admin entries - EntryIntrusiveList admin; - - // List of all inactve entries - EntryIntrusiveList inactive; - - // All imported gossip data - Imports import_table; - }; - - using SharedState = beast::SharedData ; - struct Stats { Stats (beast::insight::Collector::ptr const& collector) @@ -82,11 +55,34 @@ private: beast::insight::Meter drop; }; - SharedState m_state; Stats m_stats; Stopwatch& m_clock; beast::Journal m_journal; + std::recursive_mutex lock_; + + // Table of all entries + Table table_; + + // Because the following are intrusive lists, a given Entry may be in + // at most list at a given instant. The Entry must be removed from + // one list before placing it in another. + + // List of all active inbound entries + EntryIntrusiveList inbound_; + + // List of all active outbound entries + EntryIntrusiveList outbound_; + + // List of all active admin entries + EntryIntrusiveList admin_; + + // List of all inactve entries + EntryIntrusiveList inactive_; + + // All imported gossip data + Imports importTable_; + //-------------------------------------------------------------------------- public: @@ -105,9 +101,8 @@ public: // Order matters here as well, the import table has to be // destroyed before the consumer table. // - SharedState::UnlockedAccess state (m_state); - state->import_table.clear(); - state->table.clear(); + importTable_.clear(); + table_.clear(); } Consumer newInboundEndpoint (beast::IP::Endpoint const& address) @@ -115,11 +110,11 @@ public: Entry* entry (nullptr); { - SharedState::Access state (m_state); - std::pair result ( - state->table.emplace (std::piecewise_construct, + std::lock_guard _(lock_); + auto result = + table_.emplace (std::piecewise_construct, std::make_tuple (kindInbound, address.at_port (0)), // Key - std::make_tuple (m_clock.now()))); // Entry + std::make_tuple (m_clock.now())); // Entry entry = &result.first->second; entry->key = &result.first->first; @@ -128,10 +123,10 @@ public: { if (! result.second) { - state->inactive.erase ( - state->inactive.iterator_to (*entry)); + inactive_.erase ( + inactive_.iterator_to (*entry)); } - state->inbound.push_back (*entry); + inbound_.push_back (*entry); } } @@ -146,11 +141,11 @@ public: Entry* entry (nullptr); { - SharedState::Access state (m_state); - std::pair result ( - state->table.emplace (std::piecewise_construct, + std::lock_guard _(lock_); + auto result = + table_.emplace (std::piecewise_construct, std::make_tuple (kindOutbound, address), // Key - std::make_tuple (m_clock.now()))); // Entry + std::make_tuple (m_clock.now())); // Entry entry = &result.first->second; entry->key = &result.first->first; @@ -158,9 +153,9 @@ public: if (entry->refcount == 1) { if (! result.second) - state->inactive.erase ( - state->inactive.iterator_to (*entry)); - state->outbound.push_back (*entry); + inactive_.erase ( + inactive_.iterator_to (*entry)); + outbound_.push_back (*entry); } } @@ -175,11 +170,11 @@ public: Entry* entry (nullptr); { - SharedState::Access state (m_state); - std::pair result ( - state->table.emplace (std::piecewise_construct, + std::lock_guard _(lock_); + auto result = + table_.emplace (std::piecewise_construct, std::make_tuple (kindAdmin, name), // Key - std::make_tuple (m_clock.now()))); // Entry + std::make_tuple (m_clock.now())); // Entry entry = &result.first->second; entry->key = &result.first->first; @@ -187,9 +182,9 @@ public: if (entry->refcount == 1) { if (! result.second) - state->inactive.erase ( - state->inactive.iterator_to (*entry)); - state->admin.push_back (*entry); + inactive_.erase ( + inactive_.iterator_to (*entry)); + admin_.push_back (*entry); } } @@ -207,11 +202,11 @@ public: Entry* entry (nullptr); { - SharedState::Access state (m_state); - std::pair result ( - state->table.emplace (std::piecewise_construct, + std::lock_guard _(lock_); + auto result = + table_.emplace (std::piecewise_construct, std::make_tuple (kindAdmin, name), // Key - std::make_tuple (m_clock.now()))); // Entry + std::make_tuple (m_clock.now())); // Entry entry = &result.first->second; entry->key = &result.first->first; @@ -219,12 +214,12 @@ public: if (entry->refcount == 1) { if (! result.second) - state->inactive.erase ( - state->inactive.iterator_to (*entry)); - state->admin.push_back (*entry); + inactive_.erase ( + inactive_.iterator_to (*entry)); + admin_.push_back (*entry); } - release (prior, state); + release (prior); } return *entry; @@ -241,9 +236,9 @@ public: clock_type::time_point const now (m_clock.now()); Json::Value ret (Json::objectValue); - SharedState::Access state (m_state); + std::lock_guard _(lock_); - for (auto& inboundEntry : state->inbound) + for (auto& inboundEntry : inbound_) { int localBalance = inboundEntry.local_balance.value (now); if ((localBalance + inboundEntry.remote_balance) >= threshold) @@ -255,7 +250,7 @@ public: } } - for (auto& outboundEntry : state->outbound) + for (auto& outboundEntry : outbound_) { int localBalance = outboundEntry.local_balance.value (now); if ((localBalance + outboundEntry.remote_balance) >= threshold) @@ -267,7 +262,7 @@ public: } } - for (auto& adminEntry : state->admin) + for (auto& adminEntry : admin_) { int localBalance = adminEntry.local_balance.value (now); if ((localBalance + adminEntry.remote_balance) >= threshold) @@ -288,11 +283,11 @@ public: clock_type::time_point const now (m_clock.now()); Gossip gossip; - SharedState::Access state (m_state); + std::lock_guard _(lock_); - gossip.items.reserve (state->inbound.size()); + gossip.items.reserve (inbound_.size()); - for (auto& inboundEntry : state->inbound) + for (auto& inboundEntry : inbound_) { Gossip::Item item; item.balance = inboundEntry.local_balance.value (now); @@ -312,11 +307,11 @@ public: { clock_type::rep const elapsed (m_clock.now().time_since_epoch().count()); { - SharedState::Access state (m_state); - std::pair result ( - state->import_table.emplace (std::piecewise_construct, + std::lock_guard _(lock_); + auto result = + importTable_.emplace (std::piecewise_construct, std::make_tuple(origin), // Key - std::make_tuple(m_clock.now().time_since_epoch().count()))); // Import + std::make_tuple(m_clock.now().time_since_epoch().count())); // Import if (result.second) { @@ -368,19 +363,19 @@ public: // void periodicActivity () { - SharedState::Access state (m_state); + std::lock_guard _(lock_); clock_type::rep const elapsed (m_clock.now().time_since_epoch().count()); - for (auto iter (state->inactive.begin()); iter != state->inactive.end();) + for (auto iter (inactive_.begin()); iter != inactive_.end();) { if (iter->whenExpires <= elapsed) { m_journal.debug << "Expired " << *iter; - Table::iterator table_iter ( - state->table.find (*iter->key)); + auto table_iter = + table_.find (*iter->key); ++iter; - erase (table_iter, state); + erase (table_iter); } else { @@ -388,8 +383,8 @@ public: } } - Imports::iterator iter (state->import_table.begin()); - while (iter != state->import_table.end()) + auto iter = importTable_.begin(); + while (iter != importTable_.end()) { Import& import (iter->second); if (iter->second.whenExpires <= elapsed) @@ -400,7 +395,7 @@ public: item_iter->consumer.entry().remote_balance -= item_iter->balance; } - iter = state->import_table.erase (iter); + iter = importTable_.erase (iter); } else ++iter; @@ -421,13 +416,25 @@ public: return Disposition::ok; } - void acquire (Entry& entry, SharedState::Access& state) + void erase (Table::iterator iter) { + std::lock_guard _(lock_); + Entry& entry (iter->second); + assert (entry.refcount == 0); + inactive_.erase ( + inactive_.iterator_to (entry)); + table_.erase (iter); + } + + void acquire (Entry& entry) + { + std::lock_guard _(lock_); ++entry.refcount; } - void release (Entry& entry, SharedState::Access& state) + void release (Entry& entry) { + std::lock_guard _(lock_); if (--entry.refcount == 0) { m_journal.debug << @@ -436,37 +443,29 @@ public: switch (entry.key->kind) { case kindInbound: - state->inbound.erase ( - state->inbound.iterator_to (entry)); + inbound_.erase ( + inbound_.iterator_to (entry)); break; case kindOutbound: - state->outbound.erase ( - state->outbound.iterator_to (entry)); + outbound_.erase ( + outbound_.iterator_to (entry)); break; case kindAdmin: - state->admin.erase ( - state->admin.iterator_to (entry)); + admin_.erase ( + admin_.iterator_to (entry)); break; default: bassertfalse; break; } - state->inactive.push_back (entry); + inactive_.push_back (entry); entry.whenExpires = m_clock.now().time_since_epoch().count() + secondsUntilExpiration; } } - void erase (Table::iterator iter, SharedState::Access& state) - { - Entry& entry (iter->second); - bassert (entry.refcount == 0); - state->inactive.erase ( - state->inactive.iterator_to (entry)); - state->table.erase (iter); - } - - Disposition charge (Entry& entry, Charge const& fee, SharedState::Access& state) + Disposition charge (Entry& entry, Charge const& fee) { + std::lock_guard _(lock_); clock_type::time_point const now (m_clock.now()); int const balance (entry.add (fee.cost(), now)); m_journal.trace << @@ -474,30 +473,35 @@ public: return disposition (balance); } - bool warn (Entry& entry, SharedState::Access& state) + bool warn (Entry& entry) { + if (entry.admin()) + return false; + + std::lock_guard _(lock_); bool notify (false); clock_type::rep const elapsed (m_clock.now().time_since_epoch().count()); if (entry.balance (m_clock.now()) >= warningThreshold && elapsed != entry.lastWarningTime) { - charge (entry, feeWarning, state); + charge (entry, feeWarning); notify = true; entry.lastWarningTime = elapsed; } - if (notify) m_journal.info << "Load warning: " << entry; - if (notify) ++m_stats.warn; - return notify; } - bool disconnect (Entry& entry, SharedState::Access& state) + bool disconnect (Entry& entry) { + if (entry.admin()) + return false; + + std::lock_guard _(lock_); bool drop (false); clock_type::time_point const now (m_clock.now()); int const balance (entry.balance (now)); @@ -511,60 +515,17 @@ public: // Adding feeDrop at this point keeps the dropped connection // from re-connecting for at least a little while after it is // dropped. - charge (entry, feeDrop, state); + charge (entry, feeDrop); ++m_stats.drop; drop = true; } return drop; } - int balance (Entry& entry, SharedState::Access& state) - { - return entry.balance (m_clock.now()); - } - - //-------------------------------------------------------------------------- - - void acquire (Entry& entry) - { - SharedState::Access state (m_state); - acquire (entry, state); - } - - void release (Entry& entry) - { - SharedState::Access state (m_state); - release (entry, state); - } - - Disposition charge (Entry& entry, Charge const& fee) - { - SharedState::Access state (m_state); - return charge (entry, fee, state); - } - - bool warn (Entry& entry) - { - if (entry.admin()) - return false; - - SharedState::Access state (m_state); - return warn (entry, state); - } - - bool disconnect (Entry& entry) - { - if (entry.admin()) - return false; - - SharedState::Access state (m_state); - return disconnect (entry, state); - } - int balance (Entry& entry) { - SharedState::Access state (m_state); - return balance (entry, state); + std::lock_guard _(lock_); + return entry.balance (m_clock.now()); } //-------------------------------------------------------------------------- @@ -590,26 +551,26 @@ public: { clock_type::time_point const now (m_clock.now()); - SharedState::Access state (m_state); + std::lock_guard _(lock_); { beast::PropertyStream::Set s ("inbound", map); - writeList (now, s, state->inbound); + writeList (now, s, inbound_); } { beast::PropertyStream::Set s ("outbound", map); - writeList (now, s, state->outbound); + writeList (now, s, outbound_); } { beast::PropertyStream::Set s ("admin", map); - writeList (now, s, state->admin); + writeList (now, s, admin_); } { beast::PropertyStream::Set s ("inactive", map); - writeList (now, s, state->inactive); + writeList (now, s, inactive_); } } }; diff --git a/src/ripple/server/impl/ServerImpl.h b/src/ripple/server/impl/ServerImpl.h index 8b4f3fc87..e6c7dbe81 100644 --- a/src/ripple/server/impl/ServerImpl.h +++ b/src/ripple/server/impl/ServerImpl.h @@ -24,7 +24,6 @@ #include #include #include -#include #include #include #include