Remove beast::SharedData

Nik Bougalis
2015-10-10 19:34:59 -07:00
parent 0e7c8ce554
commit b7c3b96516
11 changed files with 413 additions and 859 deletions
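
beast::SharedData<State> couples a struct of shared state with an internal lock and only exposes the struct through locked Access proxies; this commit flattens each such State struct into ordinary data members guarded by an explicit std::mutex (or std::recursive_mutex where locked methods call one another). A minimal sketch of the transformation — Widget, count and bump are illustrative names, not from the codebase:

#include <mutex>

// Before: state lives in a nested struct, reached through a
// proxy that holds the lock for its lifetime:
//
//   struct State { int count = 0; };
//   using SharedState = beast::SharedData <State>;
//   SharedState m_state;
//   ...
//   SharedState::Access state (m_state);
//   ++state->count;

// After: plain members plus an explicit mutex, locked with a
// scoped std::lock_guard exactly where needed.
class Widget
{
    std::mutex lock_;
    int count_ = 0;        // formerly State::count

public:
    void bump ()
    {
        std::lock_guard<std::mutex> _(lock_);
        ++count_;
    }
};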

View File

@@ -29,6 +29,7 @@
#include <ripple/protocol/RippleLedgerHash.h>
#include <beast/threads/Thread.h>
#include <beast/cxx14/memory.h> // <memory>
#include <mutex>
#include <thread>
namespace ripple {
@@ -52,38 +53,24 @@ class LedgerCleanerImp
, public beast::Thread
{
public:
struct State
{
State()
: minRange (0)
, maxRange (0)
, checkNodes (false)
, fixTxns (false)
, failures (0)
{
}
// The lowest ledger in the range we're checking.
LedgerIndex minRange;
// The highest ledger in the range we're checking
LedgerIndex maxRange;
// Check all state/transaction nodes
bool checkNodes;
// Rewrite SQL databases
bool fixTxns;
// Number of errors encountered since last success
int failures;
};
using SharedState = beast::SharedData <State>;
Application& app_;
SharedState m_state;
beast::Journal m_journal;
std::mutex lock_;
// The lowest ledger in the range we're checking.
LedgerIndex minRange_ = 0;
// The highest ledger in the range we're checking
LedgerIndex maxRange_ = 0;
// Check all state/transaction nodes
bool checkNodes_ = false;
// Rewrite SQL databases
bool fixTxns_ = false;
// Number of errors encountered since last success
int failures_ = 0;
//--------------------------------------------------------------------------
@@ -133,19 +120,19 @@ public:
void onWrite (beast::PropertyStream::Map& map)
{
SharedState::Access state (m_state);
std::lock_guard<std::mutex> _(lock_);
if (state->maxRange == 0)
if (maxRange_ == 0)
map["status"] = "idle";
else
{
map["status"] = "running";
map["min_ledger"] = state->minRange;
map["max_ledger"] = state->maxRange;
map["check_nodes"] = state->checkNodes ? "true" : "false";
map["fix_txns"] = state->fixTxns ? "true" : "false";
if (state->failures > 0)
map["fail_counts"] = state->failures;
map["min_ledger"] = minRange_;
map["max_ledger"] = maxRange_;
map["check_nodes"] = checkNodes_ ? "true" : "false";
map["fix_txns"] = fixTxns_ ? "true" : "false";
if (failures_ > 0)
map["fail_counts"] = failures_;
}
}
@@ -162,13 +149,13 @@ public:
app_.getLedgerMaster().getFullValidatedRange (minRange, maxRange);
{
SharedState::Access state (m_state);
std::lock_guard<std::mutex> _(lock_);
state->maxRange = maxRange;
state->minRange = minRange;
state->checkNodes = false;
state->fixTxns = false;
state->failures = 0;
maxRange_ = maxRange;
minRange_ = minRange;
checkNodes_ = false;
fixTxns_ = false;
failures_ = 0;
/*
JSON Parameters:
@@ -203,29 +190,29 @@ public:
// Quick way to fix a single ledger
if (params.isMember(jss::ledger))
{
state->maxRange = params[jss::ledger].asUInt();
state->minRange = params[jss::ledger].asUInt();
state->fixTxns = true;
state->checkNodes = true;
maxRange_ = params[jss::ledger].asUInt();
minRange_ = params[jss::ledger].asUInt();
fixTxns_ = true;
checkNodes_ = true;
}
if (params.isMember(jss::max_ledger))
state->maxRange = params[jss::max_ledger].asUInt();
maxRange_ = params[jss::max_ledger].asUInt();
if (params.isMember(jss::min_ledger))
state->minRange = params[jss::min_ledger].asUInt();
minRange_ = params[jss::min_ledger].asUInt();
if (params.isMember(jss::full))
state->fixTxns = state->checkNodes = params[jss::full].asBool();
fixTxns_ = checkNodes_ = params[jss::full].asBool();
if (params.isMember(jss::fix_txns))
state->fixTxns = params[jss::fix_txns].asBool();
fixTxns_ = params[jss::fix_txns].asBool();
if (params.isMember(jss::check_nodes))
state->checkNodes = params[jss::check_nodes].asBool();
checkNodes_ = params[jss::check_nodes].asBool();
if (params.isMember(jss::stop) && params[jss::stop].asBool())
state->minRange = state->maxRange = 0;
minRange_ = maxRange_ = 0;
}
notify();
@@ -411,16 +398,16 @@ public:
}
{
SharedState::Access state (m_state);
if ((state->minRange > state->maxRange) ||
(state->maxRange == 0) || (state->minRange == 0))
std::lock_guard<std::mutex> _(lock_);
if ((minRange_ > maxRange_) ||
(maxRange_ == 0) || (minRange_ == 0))
{
state->minRange = state->maxRange = 0;
minRange_ = maxRange_ = 0;
return;
}
ledgerIndex = state->maxRange;
doNodes = state->checkNodes;
doTxns = state->fixTxns;
ledgerIndex = maxRange_;
doNodes = checkNodes_;
doTxns = fixTxns_;
}
ledgerHash = getHash(ledgerIndex, goodLedger);
@@ -441,8 +428,8 @@ public:
if (fail)
{
{
SharedState::Access state (m_state);
++state->failures;
std::lock_guard<std::mutex> _(lock_);
++failures_;
}
// Wait for acquiring to catch up to us
std::this_thread::sleep_for(std::chrono::seconds(2));
@@ -450,12 +437,12 @@ public:
else
{
{
SharedState::Access state (m_state);
if (ledgerIndex == state->minRange)
++state->minRange;
if (ledgerIndex == state->maxRange)
--state->maxRange;
state->failures = 0;
std::lock_guard<std::mutex> _(lock_);
if (ledgerIndex == minRange_)
++minRange_;
if (ledgerIndex == maxRange_)
--maxRange_;
failures_ = 0;
}
// Reduce I/O pressure and wait for acquiring to catch up to us
std::this_thread::sleep_for(std::chrono::milliseconds(100));

View File

@@ -55,72 +55,46 @@ public:
using Slots = std::map <beast::IP::Endpoint,
std::shared_ptr <SlotImp>>;
using FixedSlots = std::map <beast::IP::Endpoint, Fixed>;
// A set of unique Ripple public keys
using Keys = std::set <RipplePublicKey>;
// A set of non-unique IPAddresses without ports, used
// to filter duplicates when making outgoing connections.
using ConnectedAddresses = std::multiset <beast::IP::Address>;
struct State
{
State (
Store* store,
clock_type& clock,
beast::Journal journal)
: stopping (false)
, counts ()
, livecache (clock, beast::Journal (
journal, Reporting::livecache))
, bootcache (*store, clock, beast::Journal (
journal, Reporting::bootcache))
{
}
// True if we are stopping.
bool stopping;
// The source we are currently fetching.
// This is used to cancel I/O during program exit.
beast::SharedPtr <Source> fetchSource;
// Configuration settings
Config config;
// Slot counts and other aggregate statistics.
Counts counts;
// A list of slots that should always be connected
FixedSlots fixed;
// Livecache of endpoints learned from mtENDPOINTS messages
Livecache <> livecache;
// Bootcache of addresses suitable for gaining initial connections
Bootcache bootcache;
// Holds all slots
Slots slots;
// The addresses (but not port) we are connected to. This includes
// outgoing connection attempts. Note that this set can contain
// duplicates (since the port is not set)
ConnectedAddresses connected_addresses;
// Set of public keys belonging to active peers
Keys keys;
};
using SharedState = beast::SharedData <State>;
beast::Journal m_journal;
SharedState m_state;
clock_type& m_clock;
Store& m_store;
Checker& m_checker;
std::recursive_mutex lock_;
// True if we are stopping.
bool stopping_ = false;
// The source we are currently fetching.
// This is used to cancel I/O during program exit.
beast::SharedPtr <Source> fetchSource_;
// Configuration settings
Config config_;
// Slot counts and other aggregate statistics.
Counts counts_;
// A list of slots that should always be connected
std::map <beast::IP::Endpoint, Fixed> fixed_;
// Livecache of endpoints learned from mtENDPOINTS messages
Livecache <> livecache_;
// Bootcache of addresses suitable for gaining initial connections
Bootcache bootcache_;
// Holds all slots
Slots slots_;
// The addresses (but not port) we are connected to. This includes
// outgoing connection attempts. Note that this set can contain
// duplicates (since the port is not set)
std::multiset <beast::IP::Address> connectedAddresses_;
// Set of public keys belonging to active peers
std::set <RipplePublicKey> keys_;
// A list of dynamic sources to consult as a fallback
std::vector <beast::SharedPtr <Source>> m_sources;
@@ -133,10 +107,13 @@ public:
Logic (clock_type& clock, Store& store,
Checker& checker, beast::Journal journal)
: m_journal (journal, Reporting::logic)
, m_state (&store, std::ref (clock), journal)
, m_clock (clock)
, m_store (store)
, m_checker (checker)
, livecache_ (m_clock,
beast::Journal (journal, Reporting::livecache))
, bootcache_ (store, m_clock,
beast::Journal (journal, Reporting::bootcache))
, m_whenBroadcast (m_clock.now())
, m_squelches (m_clock)
{
@@ -147,9 +124,8 @@ public:
//
void load ()
{
typename SharedState::Access state (m_state);
state->bootcache.load ();
std::lock_guard<std::recursive_mutex> _(lock_);
bootcache_.load ();
}
/** Stop the logic.
@@ -160,10 +136,10 @@ public:
*/
void stop ()
{
typename SharedState::Access state (m_state);
state->stopping = true;
if (state->fetchSource != nullptr)
state->fetchSource->cancel ();
std::lock_guard<std::recursive_mutex> _(lock_);
stopping_ = true;
if (fetchSource_ != nullptr)
fetchSource_->cancel ();
}
//--------------------------------------------------------------------------
@@ -175,16 +151,16 @@ public:
void
config (Config const& c)
{
typename SharedState::Access state (m_state);
state->config = c;
state->counts.onConfig (state->config);
std::lock_guard<std::recursive_mutex> _(lock_);
config_ = c;
counts_.onConfig (config_);
}
Config
config()
{
typename SharedState::Access state (m_state);
return state->config;
std::lock_guard<std::recursive_mutex> _(lock_);
return config_;
}
void
@@ -200,7 +176,7 @@ public:
addFixedPeer (std::string const& name,
std::vector <beast::IP::Endpoint> const& addresses)
{
typename SharedState::Access state (m_state);
std::lock_guard<std::recursive_mutex> _(lock_);
if (addresses.empty ())
{
@@ -217,7 +193,7 @@ public:
remote_address.to_string ());
}
auto result (state->fixed.emplace (std::piecewise_construct,
auto result (fixed_.emplace (std::piecewise_construct,
std::forward_as_tuple (remote_address),
std::make_tuple (std::ref (m_clock))));
@@ -241,9 +217,9 @@ public:
if (ec == boost::asio::error::operation_aborted)
return;
typename SharedState::Access state (m_state);
Slots::iterator const iter (state->slots.find (remoteAddress));
if (iter == state->slots.end())
std::lock_guard<std::recursive_mutex> _(lock_);
auto const iter (slots_.find (remoteAddress));
if (iter == slots_.end())
{
// The slot disconnected before we finished the check
if (m_journal.debug) m_journal.debug << beast::leftw (18) <<
@@ -263,7 +239,7 @@ public:
if (m_journal.error) m_journal.error << beast::leftw (18) <<
"Logic testing " << iter->first << " with error, " <<
ec.message();
state->bootcache.on_failure (checkedAddress);
bootcache_.on_failure (checkedAddress);
return;
}
@@ -282,14 +258,14 @@ public:
"Logic accept" << remote_endpoint <<
" on local " << local_endpoint;
typename SharedState::Access state (m_state);
std::lock_guard<std::recursive_mutex> _(lock_);
// Check for duplicate connection
if (is_public (remote_endpoint))
{
auto const iter = state->connected_addresses.find (
auto const iter = connectedAddresses_.find (
remote_endpoint.address());
if (iter != state->connected_addresses.end())
if (iter != connectedAddresses_.end())
{
if (m_journal.debug) m_journal.debug << beast::leftw (18) <<
"Logic dropping inbound " << remote_endpoint <<
@@ -300,18 +276,18 @@ public:
// Create the slot
SlotImp::ptr const slot (std::make_shared <SlotImp> (local_endpoint,
remote_endpoint, fixed (remote_endpoint.address (), state),
remote_endpoint, fixed (remote_endpoint.address ()),
m_clock));
// Add slot to table
auto const result (state->slots.emplace (
auto const result (slots_.emplace (
slot->remote_endpoint (), slot));
// Remote address must not already exist
assert (result.second);
// Add to the connected address list
state->connected_addresses.emplace (remote_endpoint.address());
connectedAddresses_.emplace (remote_endpoint.address());
// Update counts
state->counts.add (*slot);
counts_.add (*slot);
return result.first->second;
}
@@ -323,11 +299,11 @@ public:
if (m_journal.debug) m_journal.debug << beast::leftw (18) <<
"Logic connect " << remote_endpoint;
typename SharedState::Access state (m_state);
std::lock_guard<std::recursive_mutex> _(lock_);
// Check for duplicate connection
if (state->slots.find (remote_endpoint) !=
state->slots.end ())
if (slots_.find (remote_endpoint) !=
slots_.end ())
{
if (m_journal.debug) m_journal.debug << beast::leftw (18) <<
"Logic dropping " << remote_endpoint <<
@@ -337,20 +313,20 @@ public:
// Create the slot
SlotImp::ptr const slot (std::make_shared <SlotImp> (
remote_endpoint, fixed (remote_endpoint, state), m_clock));
remote_endpoint, fixed (remote_endpoint), m_clock));
// Add slot to table
std::pair <Slots::iterator, bool> result (
state->slots.emplace (slot->remote_endpoint (),
slot));
auto const result =
slots_.emplace (slot->remote_endpoint (),
slot);
// Remote address must not already exist
assert (result.second);
// Add to the connected address list
state->connected_addresses.emplace (remote_endpoint.address());
connectedAddresses_.emplace (remote_endpoint.address());
// Update counts
state->counts.add (*slot);
counts_.add (*slot);
return result.first->second;
}
@@ -363,18 +339,18 @@ public:
"Logic connected" << slot->remote_endpoint () <<
" on local " << local_endpoint;
typename SharedState::Access state (m_state);
std::lock_guard<std::recursive_mutex> _(lock_);
// The object must exist in our table
assert (state->slots.find (slot->remote_endpoint ()) !=
state->slots.end ());
assert (slots_.find (slot->remote_endpoint ()) !=
slots_.end ());
// Assign the local endpoint now that it's known
slot->local_endpoint (local_endpoint);
// Check for self-connect by address
{
auto const iter (state->slots.find (local_endpoint));
if (iter != state->slots.end ())
auto const iter (slots_.find (local_endpoint));
if (iter != slots_.end ())
{
assert (iter->second->local_endpoint ()
== slot->remote_endpoint ());
@@ -386,9 +362,9 @@ public:
}
// Update counts
state->counts.remove (*slot);
counts_.remove (*slot);
slot->state (Slot::connected);
state->counts.add (*slot);
counts_.add (*slot);
return true;
}
@@ -400,56 +376,55 @@ public:
"Logic handshake " << slot->remote_endpoint () <<
" with " << (cluster ? "clustered " : "") << "key " << key;
typename SharedState::Access state (m_state);
std::lock_guard<std::recursive_mutex> _(lock_);
// The object must exist in our table
assert (state->slots.find (slot->remote_endpoint ()) !=
state->slots.end ());
assert (slots_.find (slot->remote_endpoint ()) !=
slots_.end ());
// Must be accepted or connected
assert (slot->state() == Slot::accept ||
slot->state() == Slot::connected);
// Check for duplicate connection by key
if (state->keys.find (key) != state->keys.end())
if (keys_.find (key) != keys_.end())
return Result::duplicate;
// See if we have an open space for this slot
if (! state->counts.can_activate (*slot))
if (! counts_.can_activate (*slot))
{
if (! slot->inbound())
state->bootcache.on_success (
bootcache_.on_success (
slot->remote_endpoint());
return Result::full;
}
// Set key and cluster right before adding to the map
// otherwise we could assert later when erasing the key.
state->counts.remove (*slot);
counts_.remove (*slot);
slot->public_key (key);
slot->cluster (cluster);
state->counts.add (*slot);
counts_.add (*slot);
// Add the public key to the active set
std::pair <Keys::iterator, bool> const result (
state->keys.insert (key));
auto const result = keys_.insert (key);
// Public key must not already exist
assert (result.second);
(void) result.second;
// Change state and update counts
state->counts.remove (*slot);
counts_.remove (*slot);
slot->activate (m_clock.now ());
state->counts.add (*slot);
counts_.add (*slot);
if (! slot->inbound())
state->bootcache.on_success (
bootcache_.on_success (
slot->remote_endpoint());
// Mark fixed slot success
if (slot->fixed() && ! slot->inbound())
{
auto iter (state->fixed.find (slot->remote_endpoint()));
assert (iter != state->fixed.end ());
auto iter (fixed_.find (slot->remote_endpoint()));
assert (iter != fixed_.end ());
iter->second.success (m_clock.now ());
if (m_journal.trace) m_journal.trace << beast::leftw (18) <<
"Logic fixed " << slot->remote_endpoint () << " success";
@@ -465,12 +440,12 @@ public:
std::vector <Endpoint>
redirect (SlotImp::ptr const& slot)
{
typename SharedState::Access state (m_state);
std::lock_guard<std::recursive_mutex> _(lock_);
RedirectHandouts h (slot);
state->livecache.hops.shuffle();
livecache_.hops.shuffle();
handout (&h, (&h)+1,
state->livecache.hops.begin(),
state->livecache.hops.end());
livecache_.hops.begin(),
livecache_.hops.end());
return std::move(h.list());
}
@@ -485,27 +460,33 @@ public:
{
std::vector <beast::IP::Endpoint> const none;
typename SharedState::Access state (m_state);
std::lock_guard<std::recursive_mutex> _(lock_);
// Count how many more outbound attempts to make
//
auto needed (state->counts.attempts_needed ());
auto needed (counts_.attempts_needed ());
if (needed == 0)
return none;
ConnectHandouts h (needed, m_squelches);
// Make sure we don't connect to already-connected entries.
squelch_slots (state);
for (auto const& s : slots_)
{
auto const result (m_squelches.insert (
s.second->remote_endpoint().address()));
if (! result.second)
m_squelches.touch (result.first);
}
// 1. Use Fixed if:
// Fixed active count is below fixed count AND
// ( There are eligible fixed addresses to try OR
// Any outbound attempts are in progress)
//
if (state->counts.fixed_active() < state->fixed.size ())
if (counts_.fixed_active() < fixed_.size ())
{
get_fixed (needed, h.list(), m_squelches, state);
get_fixed (needed, h.list(), m_squelches);
if (! h.list().empty ())
{
@@ -514,11 +495,11 @@ public:
return h.list();
}
if (state->counts.attempts() > 0)
if (counts_.attempts() > 0)
{
if (m_journal.debug) m_journal.debug << beast::leftw (18) <<
"Logic waiting on " <<
state->counts.attempts() << " attempts";
counts_.attempts() << " attempts";
return none;
}
}
@@ -526,8 +507,8 @@ public:
// Only proceed if auto connect is enabled and we
// have less than the desired number of outbound slots
//
if (! state->config.autoConnect ||
state->counts.out_active () >= state->counts.out_max ())
if (! config_.autoConnect ||
counts_.out_active () >= counts_.out_max ())
return none;
// 2. Use Livecache if:
@@ -535,10 +516,10 @@ public:
// Any outbound attempts are in progress
//
{
state->livecache.hops.shuffle();
livecache_.hops.shuffle();
handout (&h, (&h)+1,
state->livecache.hops.rbegin(),
state->livecache.hops.rend());
livecache_.hops.rbegin(),
livecache_.hops.rend());
if (! h.list().empty ())
{
if (m_journal.debug) m_journal.debug << beast::leftw (18) <<
@@ -546,11 +527,11 @@ public:
((h.list().size () > 1) ? "endpoints" : "endpoint");
return h.list();
}
else if (state->counts.attempts() > 0)
else if (counts_.attempts() > 0)
{
if (m_journal.debug) m_journal.debug << beast::leftw (18) <<
"Logic waiting on " <<
state->counts.attempts() << " attempts";
counts_.attempts() << " attempts";
return none;
}
}
@@ -571,8 +552,8 @@ public:
// 4. Use Bootcache if:
// There are any entries we haven't tried lately
//
for (auto iter (state->bootcache.begin());
! h.full() && iter != state->bootcache.end(); ++iter)
for (auto iter (bootcache_.begin());
! h.full() && iter != bootcache_.end(); ++iter)
h.try_insert (*iter);
if (! h.list().empty ())
@@ -592,7 +573,7 @@ public:
{
std::vector<std::pair<Slot::ptr, std::vector<Endpoint>>> result;
typename SharedState::Access state (m_state);
std::lock_guard<std::recursive_mutex> _(lock_);
clock_type::time_point const now = m_clock.now();
if (m_whenBroadcast <= now)
@@ -602,8 +583,8 @@ public:
{
// build list of active slots
std::vector <SlotImp::ptr> slots;
slots.reserve (state->slots.size());
std::for_each (state->slots.cbegin(), state->slots.cend(),
slots.reserve (slots_.size());
std::for_each (slots_.cbegin(), slots_.cend(),
[&slots](Slots::value_type const& value)
{
if (value.second->state() == Slot::active)
@@ -632,23 +613,23 @@ public:
// 2. We have slots
// 3. We haven't failed the firewalled test
//
if (state->config.wantIncoming &&
state->counts.inboundSlots() > 0)
if (config_.wantIncoming &&
counts_.inboundSlots() > 0)
{
Endpoint ep;
ep.hops = 0;
ep.address = beast::IP::Endpoint (
beast::IP::AddressV4 ()).at_port (
state->config.listeningPort);
config_.listeningPort);
for (auto& t : targets)
t.insert (ep);
}
// build sequence of endpoints by hops
state->livecache.hops.shuffle();
livecache_.hops.shuffle();
handout (targets.begin(), targets.end(),
state->livecache.hops.begin(),
state->livecache.hops.end());
livecache_.hops.begin(),
livecache_.hops.end());
// broadcast
for (auto const& t : targets)
@@ -670,30 +651,29 @@ public:
void once_per_second()
{
typename SharedState::Access state (m_state);
std::lock_guard<std::recursive_mutex> _(lock_);
// Expire the Livecache
state->livecache.expire ();
livecache_.expire ();
// Expire the recent cache in each slot
for (auto const& entry : state->slots)
for (auto const& entry : slots_)
entry.second->expire();
// Expire the recent attempts table
beast::expire (m_squelches,
Tuning::recentAttemptDuration);
state->bootcache.periodicActivity ();
bootcache_.periodicActivity ();
}
//--------------------------------------------------------------------------
// Validate and clean up the list that we received from the slot.
void preprocess (SlotImp::ptr const& slot, Endpoints& list,
typename SharedState::Access& state)
void preprocess (SlotImp::ptr const& slot, Endpoints& list)
{
bool neighbor (false);
for (auto iter (list.begin()); iter != list.end();)
for (auto iter = list.begin(); iter != list.end();)
{
Endpoint& ep (*iter);
@@ -767,16 +747,16 @@ public:
" contained " << list.size () <<
((list.size() > 1) ? " entries" : " entry");
typename SharedState::Access state (m_state);
std::lock_guard<std::recursive_mutex> _(lock_);
// The object must exist in our table
assert (state->slots.find (slot->remote_endpoint ()) !=
state->slots.end ());
assert (slots_.find (slot->remote_endpoint ()) !=
slots_.end ());
// Must be handshaked!
assert (slot->state() == Slot::active);
preprocess (slot, list, state);
preprocess (slot, list);
clock_type::time_point const now (m_clock.now());
@@ -827,8 +807,8 @@ public:
// listening test, else we silently drop their message
// since their listening port is misconfigured.
//
state->livecache.insert (ep);
state->bootcache.insert (ep.address);
livecache_.insert (ep);
bootcache_.insert (ep.address);
}
slot->whenAcceptEndpoints = now + Tuning::secondsPerMessage;
@@ -839,52 +819,52 @@ public:
void on_legacy_endpoints (IPAddresses const& list)
{
// Ignoring them also seems a valid choice.
typename SharedState::Access state (m_state);
std::lock_guard<std::recursive_mutex> _(lock_);
for (IPAddresses::const_iterator iter (list.begin());
iter != list.end(); ++iter)
state->bootcache.insert (*iter);
bootcache_.insert (*iter);
}
void remove (SlotImp::ptr const& slot, typename SharedState::Access& state)
void remove (SlotImp::ptr const& slot)
{
Slots::iterator const iter (state->slots.find (
slot->remote_endpoint ()));
auto const iter = slots_.find (
slot->remote_endpoint ());
// The slot must exist in the table
assert (iter != state->slots.end ());
assert (iter != slots_.end ());
// Remove from slot by IP table
state->slots.erase (iter);
slots_.erase (iter);
// Remove the key if present
if (slot->public_key () != boost::none)
{
Keys::iterator const iter (state->keys.find (*slot->public_key()));
auto const iter = keys_.find (*slot->public_key());
// Key must exist
assert (iter != state->keys.end ());
state->keys.erase (iter);
assert (iter != keys_.end ());
keys_.erase (iter);
}
// Remove from connected address table
{
auto const iter (state->connected_addresses.find (
auto const iter (connectedAddresses_.find (
slot->remote_endpoint().address()));
// Address must exist
assert (iter != state->connected_addresses.end ());
state->connected_addresses.erase (iter);
assert (iter != connectedAddresses_.end ());
connectedAddresses_.erase (iter);
}
// Update counts
state->counts.remove (*slot);
counts_.remove (*slot);
}
void on_closed (SlotImp::ptr const& slot)
{
typename SharedState::Access state (m_state);
std::lock_guard<std::recursive_mutex> _(lock_);
remove (slot, state);
remove (slot);
// Mark fixed slot failure
if (slot->fixed() && ! slot->inbound() && slot->state() != Slot::active)
{
auto iter (state->fixed.find (slot->remote_endpoint()));
assert (iter != state->fixed.end ());
auto iter (fixed_.find (slot->remote_endpoint()));
assert (iter != fixed_.end ());
iter->second.failure (m_clock.now ());
if (m_journal.debug) m_journal.debug << beast::leftw (18) <<
"Logic fixed " << slot->remote_endpoint () << " failed";
@@ -900,7 +880,7 @@ public:
case Slot::connect:
case Slot::connected:
state->bootcache.on_failure (slot->remote_endpoint ());
bootcache_.on_failure (slot->remote_endpoint ());
// VFALCO TODO If the address exists in the ephemeral/live
// endpoint livecache then we should mark the failure
// as if it didn't pass the listening test. We should also
@@ -925,9 +905,9 @@ public:
void on_failure (SlotImp::ptr const& slot)
{
typename SharedState::Access state (m_state);
std::lock_guard<std::recursive_mutex> _(lock_);
state->bootcache.on_failure (slot->remote_endpoint ());
bootcache_.on_failure (slot->remote_endpoint ());
}
// Insert a set of redirect IP addresses into the Bootcache
@@ -939,9 +919,10 @@ public:
//--------------------------------------------------------------------------
// Returns `true` if the address matches a fixed slot address
bool fixed (beast::IP::Endpoint const& endpoint, typename SharedState::Access& state) const
// Must have the lock held
bool fixed (beast::IP::Endpoint const& endpoint) const
{
for (auto const& entry : state->fixed)
for (auto const& entry : fixed_)
if (entry.first == endpoint)
return true;
return false;
@@ -949,9 +930,10 @@ public:
// Returns `true` if the address matches a fixed slot address
// Note that this does not use the port information in the IP::Endpoint
bool fixed (beast::IP::Address const& address, typename SharedState::Access& state) const
// Must have the lock held
bool fixed (beast::IP::Address const& address) const
{
for (auto const& entry : state->fixed)
for (auto const& entry : fixed_)
if (entry.first.address () == address)
return true;
return false;
@@ -966,17 +948,16 @@ public:
/** Adds eligible Fixed addresses for outbound attempts. */
template <class Container>
void get_fixed (std::size_t needed, Container& c,
typename ConnectHandouts::Squelches& squelches,
typename SharedState::Access& state)
typename ConnectHandouts::Squelches& squelches)
{
auto const now (m_clock.now());
for (auto iter = state->fixed.begin ();
needed && iter != state->fixed.end (); ++iter)
for (auto iter = fixed_.begin ();
needed && iter != fixed_.end (); ++iter)
{
auto const& address (iter->first.address());
if (iter->second.when() <= now && squelches.find(address) ==
squelches.end() && std::none_of (
state->slots.cbegin(), state->slots.cend(),
slots_.cbegin(), slots_.cend(),
[address](Slots::value_type const& v)
{
return address == v.first.address();
@@ -991,20 +972,6 @@ public:
//--------------------------------------------------------------------------
// Adds slot addresses to the squelched set
void squelch_slots (typename SharedState::Access& state)
{
for (auto const& s : state->slots)
{
auto const result (m_squelches.insert (
s.second->remote_endpoint().address()));
if (! result.second)
m_squelches.touch (result.first);
}
}
//--------------------------------------------------------------------------
void
addStaticSource (beast::SharedPtr <Source> const& source)
{
@@ -1023,25 +990,16 @@ public:
//
//--------------------------------------------------------------------------
// Add one address.
// Returns `true` if the address is new.
//
bool addBootcacheAddress (beast::IP::Endpoint const& address,
typename SharedState::Access& state)
{
return state->bootcache.insert (address);
}
// Add a set of addresses.
// Returns the number of addresses added.
//
int addBootcacheAddresses (IPAddresses const& list)
{
int count (0);
typename SharedState::Access state (m_state);
std::lock_guard<std::recursive_mutex> _(lock_);
for (auto addr : list)
{
if (addBootcacheAddress (addr, state))
if (bootcache_.insert (addr))
++count;
}
return count;
@@ -1054,10 +1012,10 @@ public:
{
{
typename SharedState::Access state (m_state);
if (state->stopping)
std::lock_guard<std::recursive_mutex> _(lock_);
if (stopping_)
return;
state->fetchSource = source;
fetchSource_ = source;
}
// VFALCO NOTE The fetch is synchronous,
@@ -1066,10 +1024,10 @@ public:
source->fetch (results, m_journal);
{
typename SharedState::Access state (m_state);
if (state->stopping)
std::lock_guard<std::recursive_mutex> _(lock_);
if (stopping_)
return;
state->fetchSource = nullptr;
fetchSource_ = nullptr;
}
}
@@ -1135,37 +1093,37 @@ public:
void onWrite (beast::PropertyStream::Map& map)
{
typename SharedState::Access state (m_state);
std::lock_guard<std::recursive_mutex> _(lock_);
// VFALCO NOTE These ugly casts are needed because
// of how std::size_t is declared on some linuxes
//
map ["bootcache"] = std::uint32_t (state->bootcache.size());
map ["fixed"] = std::uint32_t (state->fixed.size());
map ["bootcache"] = std::uint32_t (bootcache_.size());
map ["fixed"] = std::uint32_t (fixed_.size());
{
beast::PropertyStream::Set child ("peers", map);
writeSlots (child, state->slots);
writeSlots (child, slots_);
}
{
beast::PropertyStream::Map child ("counts", map);
state->counts.onWrite (child);
counts_.onWrite (child);
}
{
beast::PropertyStream::Map child ("config", map);
state->config.onWrite (child);
config_.onWrite (child);
}
{
beast::PropertyStream::Map child ("livecache", map);
state->livecache.onWrite (child);
livecache_.onWrite (child);
}
{
beast::PropertyStream::Map child ("bootcache", map);
state->bootcache.onWrite (child);
bootcache_.onWrite (child);
}
}
@@ -1175,14 +1133,9 @@ public:
//
//--------------------------------------------------------------------------
State const& state () const
{
return *typename SharedState::ConstAccess (m_state);
}
Counts const& counts () const
{
return typename SharedState::ConstAccess (m_state)->counts;
return counts_;
}
static std::string stateString (Slot::State state)
@@ -1209,10 +1162,10 @@ void
Logic<Checker>::onRedirects (FwdIter first, FwdIter last,
boost::asio::ip::tcp::endpoint const& remote_address)
{
typename SharedState::Access state (m_state);
std::lock_guard<std::recursive_mutex> _(lock_);
std::size_t n = 0;
for(;first != last && n < Tuning::maxRedirects; ++first, ++n)
state->bootcache.insert(
bootcache_.insert(
beast::IPAddressConversion::from_asio(*first));
if (n > 0)
if (m_journal.trace) m_journal.trace << beast::leftw (18) <<

View File

@@ -29,8 +29,8 @@
#include <ripple/protocol/JsonFields.h>
#include <beast/chrono/abstract_clock.h>
#include <beast/Insight.h>
#include <beast/threads/SharedData.h>
#include <beast/utility/PropertyStream.h>
#include <mutex>
namespace ripple {
namespace Resource {
@@ -43,33 +43,6 @@ private:
using Table = hash_map <Key, Entry, Key::hasher, Key::key_equal>;
using EntryIntrusiveList = beast::List <Entry>;
struct State
{
// Table of all entries
Table table;
// Because the following are intrusive lists, a given Entry may be in
// at most one list at a given instant. The Entry must be removed from
// one list before placing it in another.
// List of all active inbound entries
EntryIntrusiveList inbound;
// List of all active outbound entries
EntryIntrusiveList outbound;
// List of all active admin entries
EntryIntrusiveList admin;
// List of all inactive entries
EntryIntrusiveList inactive;
// All imported gossip data
Imports import_table;
};
using SharedState = beast::SharedData <State>;
struct Stats
{
Stats (beast::insight::Collector::ptr const& collector)
@@ -82,11 +55,34 @@ private:
beast::insight::Meter drop;
};
SharedState m_state;
Stats m_stats;
Stopwatch& m_clock;
beast::Journal m_journal;
std::recursive_mutex lock_;
// Table of all entries
Table table_;
// Because the following are intrusive lists, a given Entry may be in
// at most one list at a given instant. The Entry must be removed from
// one list before placing it in another.
// List of all active inbound entries
EntryIntrusiveList inbound_;
// List of all active outbound entries
EntryIntrusiveList outbound_;
// List of all active admin entries
EntryIntrusiveList admin_;
// List of all inactive entries
EntryIntrusiveList inactive_;
// All imported gossip data
Imports importTable_;
//--------------------------------------------------------------------------
public:
@@ -105,9 +101,8 @@ public:
// Order matters here as well, the import table has to be
// destroyed before the consumer table.
//
SharedState::UnlockedAccess state (m_state);
state->import_table.clear();
state->table.clear();
importTable_.clear();
table_.clear();
}
Consumer newInboundEndpoint (beast::IP::Endpoint const& address)
@@ -115,11 +110,11 @@ public:
Entry* entry (nullptr);
{
SharedState::Access state (m_state);
std::pair <Table::iterator, bool> result (
state->table.emplace (std::piecewise_construct,
std::lock_guard<std::recursive_mutex> _(lock_);
auto result =
table_.emplace (std::piecewise_construct,
std::make_tuple (kindInbound, address.at_port (0)), // Key
std::make_tuple (m_clock.now()))); // Entry
std::make_tuple (m_clock.now())); // Entry
entry = &result.first->second;
entry->key = &result.first->first;
@@ -128,10 +123,10 @@ public:
{
if (! result.second)
{
state->inactive.erase (
state->inactive.iterator_to (*entry));
inactive_.erase (
inactive_.iterator_to (*entry));
}
state->inbound.push_back (*entry);
inbound_.push_back (*entry);
}
}
@@ -146,11 +141,11 @@ public:
Entry* entry (nullptr);
{
SharedState::Access state (m_state);
std::pair <Table::iterator, bool> result (
state->table.emplace (std::piecewise_construct,
std::lock_guard<std::recursive_mutex> _(lock_);
auto result =
table_.emplace (std::piecewise_construct,
std::make_tuple (kindOutbound, address), // Key
std::make_tuple (m_clock.now()))); // Entry
std::make_tuple (m_clock.now())); // Entry
entry = &result.first->second;
entry->key = &result.first->first;
@@ -158,9 +153,9 @@ public:
if (entry->refcount == 1)
{
if (! result.second)
state->inactive.erase (
state->inactive.iterator_to (*entry));
state->outbound.push_back (*entry);
inactive_.erase (
inactive_.iterator_to (*entry));
outbound_.push_back (*entry);
}
}
@@ -175,11 +170,11 @@ public:
Entry* entry (nullptr);
{
SharedState::Access state (m_state);
std::pair <Table::iterator, bool> result (
state->table.emplace (std::piecewise_construct,
std::lock_guard<std::recursive_mutex> _(lock_);
auto result =
table_.emplace (std::piecewise_construct,
std::make_tuple (kindAdmin, name), // Key
std::make_tuple (m_clock.now()))); // Entry
std::make_tuple (m_clock.now())); // Entry
entry = &result.first->second;
entry->key = &result.first->first;
@@ -187,9 +182,9 @@ public:
if (entry->refcount == 1)
{
if (! result.second)
state->inactive.erase (
state->inactive.iterator_to (*entry));
state->admin.push_back (*entry);
inactive_.erase (
inactive_.iterator_to (*entry));
admin_.push_back (*entry);
}
}
@@ -207,11 +202,11 @@ public:
Entry* entry (nullptr);
{
SharedState::Access state (m_state);
std::pair <Table::iterator, bool> result (
state->table.emplace (std::piecewise_construct,
std::lock_guard<std::recursive_mutex> _(lock_);
auto result =
table_.emplace (std::piecewise_construct,
std::make_tuple (kindAdmin, name), // Key
std::make_tuple (m_clock.now()))); // Entry
std::make_tuple (m_clock.now())); // Entry
entry = &result.first->second;
entry->key = &result.first->first;
@@ -219,12 +214,12 @@ public:
if (entry->refcount == 1)
{
if (! result.second)
state->inactive.erase (
state->inactive.iterator_to (*entry));
state->admin.push_back (*entry);
inactive_.erase (
inactive_.iterator_to (*entry));
admin_.push_back (*entry);
}
release (prior, state);
release (prior);
}
return *entry;
@@ -241,9 +236,9 @@ public:
clock_type::time_point const now (m_clock.now());
Json::Value ret (Json::objectValue);
SharedState::Access state (m_state);
std::lock_guard<std::recursive_mutex> _(lock_);
for (auto& inboundEntry : state->inbound)
for (auto& inboundEntry : inbound_)
{
int localBalance = inboundEntry.local_balance.value (now);
if ((localBalance + inboundEntry.remote_balance) >= threshold)
@@ -255,7 +250,7 @@ public:
}
}
for (auto& outboundEntry : state->outbound)
for (auto& outboundEntry : outbound_)
{
int localBalance = outboundEntry.local_balance.value (now);
if ((localBalance + outboundEntry.remote_balance) >= threshold)
@@ -267,7 +262,7 @@ public:
}
}
for (auto& adminEntry : state->admin)
for (auto& adminEntry : admin_)
{
int localBalance = adminEntry.local_balance.value (now);
if ((localBalance + adminEntry.remote_balance) >= threshold)
@@ -288,11 +283,11 @@ public:
clock_type::time_point const now (m_clock.now());
Gossip gossip;
SharedState::Access state (m_state);
std::lock_guard<std::recursive_mutex> _(lock_);
gossip.items.reserve (state->inbound.size());
gossip.items.reserve (inbound_.size());
for (auto& inboundEntry : state->inbound)
for (auto& inboundEntry : inbound_)
{
Gossip::Item item;
item.balance = inboundEntry.local_balance.value (now);
@@ -312,11 +307,11 @@ public:
{
clock_type::rep const elapsed (m_clock.now().time_since_epoch().count());
{
SharedState::Access state (m_state);
std::pair <Imports::iterator, bool> result (
state->import_table.emplace (std::piecewise_construct,
std::lock_guard<std::recursive_mutex> _(lock_);
auto result =
importTable_.emplace (std::piecewise_construct,
std::make_tuple(origin), // Key
std::make_tuple(m_clock.now().time_since_epoch().count()))); // Import
std::make_tuple(m_clock.now().time_since_epoch().count())); // Import
if (result.second)
{
@@ -368,19 +363,19 @@ public:
//
void periodicActivity ()
{
SharedState::Access state (m_state);
std::lock_guard<std::recursive_mutex> _(lock_);
clock_type::rep const elapsed (m_clock.now().time_since_epoch().count());
for (auto iter (state->inactive.begin()); iter != state->inactive.end();)
for (auto iter (inactive_.begin()); iter != inactive_.end();)
{
if (iter->whenExpires <= elapsed)
{
m_journal.debug << "Expired " << *iter;
Table::iterator table_iter (
state->table.find (*iter->key));
auto table_iter =
table_.find (*iter->key);
++iter;
erase (table_iter, state);
erase (table_iter);
}
else
{
@@ -388,8 +383,8 @@ public:
}
}
Imports::iterator iter (state->import_table.begin());
while (iter != state->import_table.end())
auto iter = importTable_.begin();
while (iter != importTable_.end())
{
Import& import (iter->second);
if (iter->second.whenExpires <= elapsed)
@@ -400,7 +395,7 @@ public:
item_iter->consumer.entry().remote_balance -= item_iter->balance;
}
iter = state->import_table.erase (iter);
iter = importTable_.erase (iter);
}
else
++iter;
@@ -421,13 +416,25 @@ public:
return Disposition::ok;
}
void acquire (Entry& entry, SharedState::Access& state)
void erase (Table::iterator iter)
{
std::lock_guard<std::recursive_mutex> _(lock_);
Entry& entry (iter->second);
assert (entry.refcount == 0);
inactive_.erase (
inactive_.iterator_to (entry));
table_.erase (iter);
}
void acquire (Entry& entry)
{
std::lock_guard<std::recursive_mutex> _(lock_);
++entry.refcount;
}
void release (Entry& entry, SharedState::Access& state)
void release (Entry& entry)
{
std::lock_guard<std::recursive_mutex> _(lock_);
if (--entry.refcount == 0)
{
m_journal.debug <<
@@ -436,37 +443,29 @@ public:
switch (entry.key->kind)
{
case kindInbound:
state->inbound.erase (
state->inbound.iterator_to (entry));
inbound_.erase (
inbound_.iterator_to (entry));
break;
case kindOutbound:
state->outbound.erase (
state->outbound.iterator_to (entry));
outbound_.erase (
outbound_.iterator_to (entry));
break;
case kindAdmin:
state->admin.erase (
state->admin.iterator_to (entry));
admin_.erase (
admin_.iterator_to (entry));
break;
default:
bassertfalse;
break;
}
state->inactive.push_back (entry);
inactive_.push_back (entry);
entry.whenExpires = m_clock.now().time_since_epoch().count() + secondsUntilExpiration;
}
}
void erase (Table::iterator iter, SharedState::Access& state)
{
Entry& entry (iter->second);
bassert (entry.refcount == 0);
state->inactive.erase (
state->inactive.iterator_to (entry));
state->table.erase (iter);
}
Disposition charge (Entry& entry, Charge const& fee, SharedState::Access& state)
Disposition charge (Entry& entry, Charge const& fee)
{
std::lock_guard<std::recursive_mutex> _(lock_);
clock_type::time_point const now (m_clock.now());
int const balance (entry.add (fee.cost(), now));
m_journal.trace <<
@@ -474,30 +473,35 @@ public:
return disposition (balance);
}
bool warn (Entry& entry, SharedState::Access& state)
bool warn (Entry& entry)
{
if (entry.admin())
return false;
std::lock_guard<std::recursive_mutex> _(lock_);
bool notify (false);
clock_type::rep const elapsed (m_clock.now().time_since_epoch().count());
if (entry.balance (m_clock.now()) >= warningThreshold &&
elapsed != entry.lastWarningTime)
{
charge (entry, feeWarning, state);
charge (entry, feeWarning);
notify = true;
entry.lastWarningTime = elapsed;
}
if (notify)
m_journal.info <<
"Load warning: " << entry;
if (notify)
++m_stats.warn;
return notify;
}
bool disconnect (Entry& entry, SharedState::Access& state)
bool disconnect (Entry& entry)
{
if (entry.admin())
return false;
std::lock_guard<std::recursive_mutex> _(lock_);
bool drop (false);
clock_type::time_point const now (m_clock.now());
int const balance (entry.balance (now));
@@ -511,60 +515,17 @@ public:
// Adding feeDrop at this point keeps the dropped connection
// from re-connecting for at least a little while after it is
// dropped.
charge (entry, feeDrop, state);
charge (entry, feeDrop);
++m_stats.drop;
drop = true;
}
return drop;
}
int balance (Entry& entry, SharedState::Access& state)
{
return entry.balance (m_clock.now());
}
//--------------------------------------------------------------------------
void acquire (Entry& entry)
{
SharedState::Access state (m_state);
acquire (entry, state);
}
void release (Entry& entry)
{
SharedState::Access state (m_state);
release (entry, state);
}
Disposition charge (Entry& entry, Charge const& fee)
{
SharedState::Access state (m_state);
return charge (entry, fee, state);
}
bool warn (Entry& entry)
{
if (entry.admin())
return false;
SharedState::Access state (m_state);
return warn (entry, state);
}
bool disconnect (Entry& entry)
{
if (entry.admin())
return false;
SharedState::Access state (m_state);
return disconnect (entry, state);
}
int balance (Entry& entry)
{
SharedState::Access state (m_state);
return balance (entry, state);
std::lock_guard<std::recursive_mutex> _(lock_);
return entry.balance (m_clock.now());
}
//--------------------------------------------------------------------------
@@ -590,26 +551,26 @@ public:
{
clock_type::time_point const now (m_clock.now());
SharedState::Access state (m_state);
std::lock_guard<std::recursive_mutex> _(lock_);
{
beast::PropertyStream::Set s ("inbound", map);
writeList (now, s, state->inbound);
writeList (now, s, inbound_);
}
{
beast::PropertyStream::Set s ("outbound", map);
writeList (now, s, state->outbound);
writeList (now, s, outbound_);
}
{
beast::PropertyStream::Set s ("admin", map);
writeList (now, s, state->admin);
writeList (now, s, admin_);
}
{
beast::PropertyStream::Set s ("inactive", map);
writeList (now, s, state->inactive);
writeList (now, s, inactive_);
}
}
};
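
Worth noting: the Resource logic above keeps a std::recursive_mutex rather than the plain std::mutex used in LedgerCleaner, because several locked member functions re-enter the lock on the same thread — warn() and disconnect() call charge(), and periodicActivity() calls erase(), each of which takes the lock again. A plain mutex would deadlock (strictly, invoke undefined behavior) on the second acquisition. A minimal sketch of the pattern, with a hypothetical Meter standing in for the real class:

#include <mutex>

class Meter
{
    std::recursive_mutex lock_;
    int balance_ = 0;

public:
    void charge (int fee)
    {
        // A recursive_mutex may be re-acquired by the thread
        // that already owns it, so this is safe even when the
        // caller holds lock_.
        std::lock_guard<std::recursive_mutex> _(lock_);
        balance_ += fee;
    }

    bool warn ()
    {
        std::lock_guard<std::recursive_mutex> _(lock_);
        if (balance_ < 100)
            return false;
        charge (10);       // re-enters the lock held just above
        return true;
    }
};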

View File

@@ -24,7 +24,6 @@
#include <ripple/server/Handler.h>
#include <ripple/server/Server.h>
#include <beast/intrusive/List.h>
#include <beast/threads/SharedData.h>
#include <beast/threads/Thread.h>
#include <boost/asio.hpp>
#include <boost/intrusive/list.hpp>