From 6602aff53dcfcce4ed6809a2e928ae1e49161581 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Fri, 27 Sep 2013 13:44:01 -0700 Subject: [PATCH] Update Validators for ServiceQueue --- Builds/VisualStudio2012/RippleD.vcxproj | 1 - .../VisualStudio2012/RippleD.vcxproj.filters | 3 - src/ripple/validators/api/Source.h | 11 +- src/ripple/validators/api/Types.h | 6 - src/ripple/validators/impl/CancelCallbacks.h | 69 ---------- src/ripple/validators/impl/Logic.h | 83 ++++++------ src/ripple/validators/impl/Manager.cpp | 121 ++++++++---------- src/ripple/validators/impl/SourceFile.cpp | 2 +- src/ripple/validators/impl/SourceStrings.cpp | 2 +- src/ripple/validators/impl/SourceURL.cpp | 2 +- src/ripple/validators/impl/Tests.cpp | 5 +- src/ripple/validators/ripple_validators.cpp | 1 - src/ripple_app/ledger/LedgerMaster.cpp | 2 - src/ripple_app/main/Application.cpp | 2 - src/ripple_app/peers/Peer.cpp | 2 - 15 files changed, 101 insertions(+), 211 deletions(-) delete mode 100644 src/ripple/validators/impl/CancelCallbacks.h diff --git a/Builds/VisualStudio2012/RippleD.vcxproj b/Builds/VisualStudio2012/RippleD.vcxproj index 2801a29a44..19b8557d78 100644 --- a/Builds/VisualStudio2012/RippleD.vcxproj +++ b/Builds/VisualStudio2012/RippleD.vcxproj @@ -1636,7 +1636,6 @@ - diff --git a/Builds/VisualStudio2012/RippleD.vcxproj.filters b/Builds/VisualStudio2012/RippleD.vcxproj.filters index 139a15303f..bf1eaa7442 100644 --- a/Builds/VisualStudio2012/RippleD.vcxproj.filters +++ b/Builds/VisualStudio2012/RippleD.vcxproj.filters @@ -1830,9 +1830,6 @@ [1] Ripple\validators\impl - - [1] Ripple\validators\impl - [2] Old Ripple\ripple_app diff --git a/src/ripple/validators/api/Source.h b/src/ripple/validators/api/Source.h index 476151e454..8fdaebae4c 100644 --- a/src/ripple/validators/api/Source.h +++ b/src/ripple/validators/api/Source.h @@ -52,9 +52,6 @@ public: virtual String createParam () = 0; /** Fetch the most recent list from the Source. - If possible, the Source should periodically poll the - CancelCallback, and abort the operation if shouldCancel - returns `true`. This call will block. */ struct Result @@ -67,8 +64,12 @@ public: Time expirationTime; Array list; }; - - virtual Result fetch (CancelCallback& callback, Journal journal) = 0; + + /** Cancel any pending fetch. + The default implementation does nothing. + */ + virtual void cancel () { } + virtual Result fetch (Journal journal) = 0; }; } diff --git a/src/ripple/validators/api/Types.h b/src/ripple/validators/api/Types.h index 180f32f875..f801c5cf69 100644 --- a/src/ripple/validators/api/Types.h +++ b/src/ripple/validators/api/Types.h @@ -31,12 +31,6 @@ struct ReceivedValidation RipplePublicKeyHash publicKeyHash; }; -/** Callback used to optionally cancel long running fetch operations. */ -struct CancelCallback -{ - virtual bool shouldCancel () = 0; -}; - } } diff --git a/src/ripple/validators/impl/CancelCallbacks.h b/src/ripple/validators/impl/CancelCallbacks.h deleted file mode 100644 index a360e38406..0000000000 --- a/src/ripple/validators/impl/CancelCallbacks.h +++ /dev/null @@ -1,69 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2012, 2013 Ripple Labs Inc. - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - - -#ifndef RIPPLE_VALIDATORS_CANCELCALLBACKS_H_INCLUDED -#define RIPPLE_VALIDATORS_CANCELCALLBACKS_H_INCLUDED - -namespace ripple { -namespace Validators { - -// Dummy CancelCallback that does nothing -// -class NoOpCancelCallback : public CancelCallback -{ -public: - bool shouldCancel () - { - return false; - } - -}; - -//------------------------------------------------------------------------------ - -// CancelCallback attached to ThreadWithCallQueue -// -class ThreadCancelCallback - : public CancelCallback - , public Uncopyable -{ -public: - explicit ThreadCancelCallback (ThreadWithCallQueue& thread) - : m_thread (thread) - , m_interrupted (false) - { - } - - bool shouldCancel () - { - if (m_interrupted) - return true; - return m_interrupted = m_thread.interruptionPoint (); - } - -private: - ThreadWithCallQueue& m_thread; - bool m_interrupted; -}; - -} -} - -#endif diff --git a/src/ripple/validators/impl/Logic.h b/src/ripple/validators/impl/Logic.h index d078beff0c..8ceb33fe4e 100644 --- a/src/ripple/validators/impl/Logic.h +++ b/src/ripple/validators/impl/Logic.h @@ -268,9 +268,7 @@ public: ScopedPointer object (source); - NoOpCancelCallback cancelCallback; - - Source::Result result (object->fetch (cancelCallback, m_journal)); + Source::Result result (object->fetch (m_journal)); SharedState::Access state (m_state); if (result.success) @@ -390,49 +388,46 @@ public: // /** Perform a fetch on the source. */ - void fetch (SourceDesc& desc, CancelCallback& callback) + void fetch (SourceDesc& desc) { m_journal.info << "fetch ('" << desc.source->name() << "')"; - Source::Result result (desc.source->fetch (callback, m_journal)); + Source::Result result (desc.source->fetch (m_journal)); - if (! callback.shouldCancel ()) + // Reset fetch timer for the source. + desc.whenToFetch = Time::getCurrentTime () + + RelativeTime (secondsBetweenFetches); + + if (result.success) { - // Reset fetch timer for the source. - desc.whenToFetch = Time::getCurrentTime () + - RelativeTime (secondsBetweenFetches); + SharedState::Access state (m_state); - if (result.success) - { - SharedState::Access state (m_state); + // Add the new source info to the map + merge (result.list, state); - // Add the new source info to the map - merge (result.list, state); + // Swap lists + desc.result.swapWith (result); - // Swap lists - desc.result.swapWith (result); + // Remove the old source info from the map + remove (result.list, state); - // Remove the old source info from the map - remove (result.list, state); + // See if we need to rebuild + checkChosen (); - // See if we need to rebuild - checkChosen (); + // Reset failure status + desc.numberOfFailures = 0; + desc.status = SourceDesc::statusFetched; - // Reset failure status - desc.numberOfFailures = 0; - desc.status = SourceDesc::statusFetched; + // Update the source's list in the store + m_store.update (desc, true); + } + else + { + ++desc.numberOfFailures; + desc.status = SourceDesc::statusFailed; - // Update the source's list in the store - m_store.update (desc, true); - } - else - { - ++desc.numberOfFailures; - desc.status = SourceDesc::statusFailed; - - // Record the failure in the Store - m_store.update (desc); - } + // Record the failure in the Store + m_store.update (desc); } } @@ -445,17 +440,17 @@ public: m_store.update (desc); } - /** Check each Source to see if it needs processing. - @return `true` if an interruption occurred. + /** Process up to one source that needs fetching. + @return The number of sources that were fetched. */ - bool check (CancelCallback& callback) + std::size_t fetch_one () { - bool interrupted (false); + std::size_t n (0); Time const currentTime (Time::getCurrentTime ()); SharedState::Access state (m_state); for (SourcesType::iterator iter = state->sources.begin (); - iter != state->sources.end (); ++iter) + (n == 0) && iter != state->sources.end (); ++iter) { SourceDesc& desc (*iter); @@ -463,12 +458,8 @@ public: // if (desc.whenToFetch <= currentTime) { - fetch (desc, callback); - if (callback.shouldCancel ()) - { - interrupted = true; - break; - } + fetch (desc); + ++n; } // See if we need to expire @@ -480,7 +471,7 @@ public: } } - return interrupted; + return n; } //---------------------------------------------------------------------- diff --git a/src/ripple/validators/impl/Manager.cpp b/src/ripple/validators/impl/Manager.cpp index 51626a2f7b..c356303332 100644 --- a/src/ripple/validators/impl/Manager.cpp +++ b/src/ripple/validators/impl/Manager.cpp @@ -17,7 +17,6 @@ */ //============================================================================== - /* Information to track: @@ -104,27 +103,39 @@ namespace Validators { class ManagerImp : public Manager , public Stoppable - , public ThreadWithCallQueue::EntryPoints + , public Thread , public DeadlineTimer::Listener , public LeakChecked { public: + StoreSqdb m_store; + Logic m_logic; + Journal m_journal; + DeadlineTimer m_checkTimer; + ServiceQueue m_queue; + + // True if we should call check on idle. + // This gets set to false once we make it through the whole list. + // + bool m_checkSources; + ManagerImp (Stoppable& parent, Journal journal) : Stoppable ("Validators::Manager", parent) + , Thread ("Validators") , m_store (journal) , m_logic (m_store, journal) , m_journal (journal) - , m_thread ("Validators") , m_checkTimer (this) - , m_checkSources (true) // true to cause a full scan on start + , m_checkSources (true) { addRPCHandlers(); - m_thread.start (this); + + startThread(); } ~ManagerImp () { - m_thread.stop (true); + stopThread (); } //-------------------------------------------------------------------------- @@ -134,7 +145,7 @@ public: void onStop () { - m_thread.stop (false); + m_queue.dispatch (bind (&Thread::signalThreadShouldExit, this)); } //-------------------------------------------------------------------------- @@ -149,7 +160,7 @@ public: Json::Value rpcRebuild (Json::Value const& args) { - m_thread.call (&Logic::buildChosen, &m_logic); + m_queue.dispatch (bind (&Logic::buildChosen, &m_logic)); Json::Value result; result ["chosen_list"] = "rebuilding"; return result; @@ -203,60 +214,40 @@ public: void addSource (Source* source) { -#if RIPPLE_USE_NEW_VALIDATORS - bassert (! isStopping()); - //m_thread.call (&Logic::add, &m_logic, source); -#else - delete source; -#endif + m_queue.dispatch (bind (&Logic::add, &m_logic, source)); } void addStaticSource (Source* source) { -#if RIPPLE_USE_NEW_VALIDATORS - bassert (! isStopping()); - //m_thread.call (&Logic::addStatic, &m_logic, source); -#else - delete source; -#endif + m_queue.dispatch (bind (&Logic::addStatic, &m_logic, source)); } + // VFALCO NOTE we should just do this on the callers thread? + // void receiveValidation (ReceivedValidation const& rv) { #if RIPPLE_USE_NEW_VALIDATORS if (! isStopping()) - m_thread.call (&Logic::receiveValidation, &m_logic, rv); + m_queue.dispatch (bind ( + &Logic::receiveValidation, &m_logic, rv)); #endif } + // VFALCO NOTE we should just do this on the callers thread? + // void ledgerClosed (RippleLedgerHash const& ledgerHash) { #if RIPPLE_USE_NEW_VALIDATORS if (! isStopping()) - m_thread.call (&Logic::ledgerClosed, &m_logic, ledgerHash); + m_queue.dispatch (bind ( + &Logic::ledgerClosed, &m_logic, ledgerHash)); #endif } //-------------------------------------------------------------------------- - void onDeadlineTimer (DeadlineTimer& timer) + void init () { -#if RIPPLE_USE_NEW_VALIDATORS - if (timer == m_checkTimer) - { - m_checkSources = true; - - // This will kick us back into threadIdle - m_thread.interrupt(); - } -#endif - } - - //-------------------------------------------------------------------------- - - void threadInit () - { -#if RIPPLE_USE_NEW_VALIDATORS File const file (File::getSpecialLocation ( File::userDocumentsDirectory).getChildFile ("validators.sqlite")); @@ -271,55 +262,49 @@ public: if (! error) { m_logic.load (); - - // This flag needs to be on, to force a full check of all - // sources on startup. Once we finish the check we will - // set the deadine timer. - // - bassert (m_checkSources); } -#endif } - void threadExit () + void onDeadlineTimer (DeadlineTimer& timer) { - // must come last - stopped (); + if (timer == m_checkTimer) + { + m_queue.dispatch (bind (&ManagerImp::setCheckSources, this)); + } } - bool threadIdle () + void setCheckSources () { - bool interrupted = false; + m_checkSources = true; + } -#if RIPPLE_USE_NEW_VALIDATORS + void checkSources () + { if (m_checkSources) { - ThreadCancelCallback cancelCallback (m_thread); - interrupted = m_logic.check (cancelCallback); - if (! interrupted) + if (m_logic.fetch_one () == 0) { // Made it through the list without interruption! // Clear the flag and set the deadline timer again. + // m_checkSources = false; m_checkTimer.setExpiration (checkEverySeconds); } } -#endif - - return interrupted; } -private: - StoreSqdb m_store; - Logic m_logic; - Journal m_journal; - ThreadWithCallQueue m_thread; - DeadlineTimer m_checkTimer; + void run () + { + init (); - // True if we should call check on idle. - // This gets set to false once we make it through the whole - // list without interruption. - bool m_checkSources; + while (! this->threadShouldExit()) + { + checkSources (); + m_queue.run_one(); + } + + stopped(); + } }; //------------------------------------------------------------------------------ diff --git a/src/ripple/validators/impl/SourceFile.cpp b/src/ripple/validators/impl/SourceFile.cpp index bcbfbdacba..e0ddf5ff00 100644 --- a/src/ripple/validators/impl/SourceFile.cpp +++ b/src/ripple/validators/impl/SourceFile.cpp @@ -48,7 +48,7 @@ public: return m_file.getFullPathName (); } - Result fetch (CancelCallback&, Journal journal) + Result fetch (Journal journal) { Result result; diff --git a/src/ripple/validators/impl/SourceStrings.cpp b/src/ripple/validators/impl/SourceStrings.cpp index f009ee13fd..21827d3bb3 100644 --- a/src/ripple/validators/impl/SourceStrings.cpp +++ b/src/ripple/validators/impl/SourceStrings.cpp @@ -50,7 +50,7 @@ public: return String::empty; } - Result fetch (CancelCallback&, Journal journal) + Result fetch (Journal journal) { Result result; diff --git a/src/ripple/validators/impl/SourceURL.cpp b/src/ripple/validators/impl/SourceURL.cpp index 789c345fe6..52e1422584 100644 --- a/src/ripple/validators/impl/SourceURL.cpp +++ b/src/ripple/validators/impl/SourceURL.cpp @@ -48,7 +48,7 @@ public: return m_url.full(); } - Result fetch (CancelCallback&, Journal journal) + Result fetch (Journal journal) { Result result; diff --git a/src/ripple/validators/impl/Tests.cpp b/src/ripple/validators/impl/Tests.cpp index b29ae7323a..7c29354499 100644 --- a/src/ripple/validators/impl/Tests.cpp +++ b/src/ripple/validators/impl/Tests.cpp @@ -122,7 +122,7 @@ public: return String::empty; } - Result fetch (CancelCallback& cancel, Journal) + Result fetch (Journal) { Result result; @@ -207,8 +207,7 @@ public: addSources (logic); - NoOpCancelCallback cancelCallback; - logic.check (cancelCallback); + logic.fetch_one (); ChosenList::Ptr list (logic.getChosen ()); diff --git a/src/ripple/validators/ripple_validators.cpp b/src/ripple/validators/ripple_validators.cpp index ca368fb714..2bae7bba5e 100644 --- a/src/ripple/validators/ripple_validators.cpp +++ b/src/ripple/validators/ripple_validators.cpp @@ -42,7 +42,6 @@ namespace ripple { using namespace beast; } -# include "impl/CancelCallbacks.h" # include "impl/ChosenList.h" # include "impl/SourceFile.h" # include "impl/SourceStrings.h" diff --git a/src/ripple_app/ledger/LedgerMaster.cpp b/src/ripple_app/ledger/LedgerMaster.cpp index 143bb75d9e..aff7c8725a 100644 --- a/src/ripple_app/ledger/LedgerMaster.cpp +++ b/src/ripple_app/ledger/LedgerMaster.cpp @@ -522,12 +522,10 @@ void LedgerMaster::setFullLedger (Ledger::pointer ledger, bool isSynchronous, bo //-------------------------------------------------------------------------- // -#if RIPPLE_USE_NEW_VALIDATORS { if (isCurrent) getApp ().getValidators ().ledgerClosed (ledger->getHash()); } -#endif // //-------------------------------------------------------------------------- } diff --git a/src/ripple_app/main/Application.cpp b/src/ripple_app/main/Application.cpp index dbf3f2399e..c93d884047 100644 --- a/src/ripple_app/main/Application.cpp +++ b/src/ripple_app/main/Application.cpp @@ -166,7 +166,6 @@ public: // Initialize the Validators object with Config information. void initValidatorsConfig () { -#if RIPPLE_USE_NEW_VALIDATORS { std::vector const& strings (getConfig().validators); if (! strings.empty ()) @@ -182,7 +181,6 @@ public: { m_validators->addFile (getConfig().getValidatorsFile()); } -#endif } //-------------------------------------------------------------------------- diff --git a/src/ripple_app/peers/Peer.cpp b/src/ripple_app/peers/Peer.cpp index 0e3216130d..c4717b17fc 100644 --- a/src/ripple_app/peers/Peer.cpp +++ b/src/ripple_app/peers/Peer.cpp @@ -1460,14 +1460,12 @@ static void checkValidation (Job&, SerializedValidation::pointer val, bool isTru //---------------------------------------------------------------------- // { -#if RIPPLE_USE_NEW_VALIDATORS SerializedValidation const& sv (*val); Validators::ReceivedValidation rv; rv.ledgerHash = sv.getLedgerHash (); rv.publicKey = sv.getSignerPublic(); rv.publicKeyHash = sv.getSignerPublic(); getApp ().getValidators ().receiveValidation (rv); -#endif } // //----------------------------------------------------------------------