Update Validators for ServiceQueue

This commit is contained in:
Vinnie Falco
2013-09-27 13:44:01 -07:00
parent 3ec36929f1
commit 6602aff53d
15 changed files with 101 additions and 211 deletions

View File

@@ -1636,7 +1636,6 @@
<ClInclude Include="..\..\src\ripple\validators\api\Types.h" />
<ClInclude Include="..\..\src\ripple\validators\api\Manager.h" />
<ClInclude Include="..\..\src\ripple\validators\impl\AgedHistory.h" />
<ClInclude Include="..\..\src\ripple\validators\impl\CancelCallbacks.h" />
<ClInclude Include="..\..\src\ripple\validators\impl\ChosenList.h" />
<ClInclude Include="..\..\src\ripple\validators\impl\Logic.h" />
<ClInclude Include="..\..\src\ripple\validators\impl\SourceDesc.h" />

View File

@@ -1830,9 +1830,6 @@
<ClInclude Include="..\..\src\ripple\validators\impl\StoreSqdb.h">
<Filter>[1] Ripple\validators\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\validators\impl\CancelCallbacks.h">
<Filter>[1] Ripple\validators\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple_app\ripple_app.h">
<Filter>[2] Old Ripple\ripple_app</Filter>
</ClInclude>

View File

@@ -52,9 +52,6 @@ public:
virtual String createParam () = 0;
/** Fetch the most recent list from the Source.
If possible, the Source should periodically poll the
CancelCallback, and abort the operation if shouldCancel
returns `true`.
This call will block.
*/
struct Result
@@ -67,8 +64,12 @@ public:
Time expirationTime;
Array <Info> list;
};
virtual Result fetch (CancelCallback& callback, Journal journal) = 0;
/** Cancel any pending fetch.
The default implementation does nothing.
*/
virtual void cancel () { }
virtual Result fetch (Journal journal) = 0;
};
}

View File

@@ -31,12 +31,6 @@ struct ReceivedValidation
RipplePublicKeyHash publicKeyHash;
};
/** Callback used to optionally cancel long running fetch operations. */
struct CancelCallback
{
virtual bool shouldCancel () = 0;
};
}
}

View File

@@ -1,69 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_VALIDATORS_CANCELCALLBACKS_H_INCLUDED
#define RIPPLE_VALIDATORS_CANCELCALLBACKS_H_INCLUDED
namespace ripple {
namespace Validators {
// Dummy CancelCallback that does nothing
//
class NoOpCancelCallback : public CancelCallback
{
public:
bool shouldCancel ()
{
return false;
}
};
//------------------------------------------------------------------------------
// CancelCallback attached to ThreadWithCallQueue
//
class ThreadCancelCallback
: public CancelCallback
, public Uncopyable
{
public:
explicit ThreadCancelCallback (ThreadWithCallQueue& thread)
: m_thread (thread)
, m_interrupted (false)
{
}
bool shouldCancel ()
{
if (m_interrupted)
return true;
return m_interrupted = m_thread.interruptionPoint ();
}
private:
ThreadWithCallQueue& m_thread;
bool m_interrupted;
};
}
}
#endif

View File

@@ -268,9 +268,7 @@ public:
ScopedPointer <Source> object (source);
NoOpCancelCallback cancelCallback;
Source::Result result (object->fetch (cancelCallback, m_journal));
Source::Result result (object->fetch (m_journal));
SharedState::Access state (m_state);
if (result.success)
@@ -390,49 +388,46 @@ public:
//
/** Perform a fetch on the source. */
void fetch (SourceDesc& desc, CancelCallback& callback)
void fetch (SourceDesc& desc)
{
m_journal.info << "fetch ('" << desc.source->name() << "')";
Source::Result result (desc.source->fetch (callback, m_journal));
Source::Result result (desc.source->fetch (m_journal));
if (! callback.shouldCancel ())
// Reset fetch timer for the source.
desc.whenToFetch = Time::getCurrentTime () +
RelativeTime (secondsBetweenFetches);
if (result.success)
{
// Reset fetch timer for the source.
desc.whenToFetch = Time::getCurrentTime () +
RelativeTime (secondsBetweenFetches);
SharedState::Access state (m_state);
if (result.success)
{
SharedState::Access state (m_state);
// Add the new source info to the map
merge (result.list, state);
// Add the new source info to the map
merge (result.list, state);
// Swap lists
desc.result.swapWith (result);
// Swap lists
desc.result.swapWith (result);
// Remove the old source info from the map
remove (result.list, state);
// Remove the old source info from the map
remove (result.list, state);
// See if we need to rebuild
checkChosen ();
// See if we need to rebuild
checkChosen ();
// Reset failure status
desc.numberOfFailures = 0;
desc.status = SourceDesc::statusFetched;
// Reset failure status
desc.numberOfFailures = 0;
desc.status = SourceDesc::statusFetched;
// Update the source's list in the store
m_store.update (desc, true);
}
else
{
++desc.numberOfFailures;
desc.status = SourceDesc::statusFailed;
// Update the source's list in the store
m_store.update (desc, true);
}
else
{
++desc.numberOfFailures;
desc.status = SourceDesc::statusFailed;
// Record the failure in the Store
m_store.update (desc);
}
// Record the failure in the Store
m_store.update (desc);
}
}
@@ -445,17 +440,17 @@ public:
m_store.update (desc);
}
/** Check each Source to see if it needs processing.
@return `true` if an interruption occurred.
/** Process up to one source that needs fetching.
@return The number of sources that were fetched.
*/
bool check (CancelCallback& callback)
std::size_t fetch_one ()
{
bool interrupted (false);
std::size_t n (0);
Time const currentTime (Time::getCurrentTime ());
SharedState::Access state (m_state);
for (SourcesType::iterator iter = state->sources.begin ();
iter != state->sources.end (); ++iter)
(n == 0) && iter != state->sources.end (); ++iter)
{
SourceDesc& desc (*iter);
@@ -463,12 +458,8 @@ public:
//
if (desc.whenToFetch <= currentTime)
{
fetch (desc, callback);
if (callback.shouldCancel ())
{
interrupted = true;
break;
}
fetch (desc);
++n;
}
// See if we need to expire
@@ -480,7 +471,7 @@ public:
}
}
return interrupted;
return n;
}
//----------------------------------------------------------------------

View File

@@ -17,7 +17,6 @@
*/
//==============================================================================
/*
Information to track:
@@ -104,27 +103,39 @@ namespace Validators {
class ManagerImp
: public Manager
, public Stoppable
, public ThreadWithCallQueue::EntryPoints
, public Thread
, public DeadlineTimer::Listener
, public LeakChecked <ManagerImp>
{
public:
StoreSqdb m_store;
Logic m_logic;
Journal m_journal;
DeadlineTimer m_checkTimer;
ServiceQueue m_queue;
// True if we should call check on idle.
// This gets set to false once we make it through the whole list.
//
bool m_checkSources;
ManagerImp (Stoppable& parent, Journal journal)
: Stoppable ("Validators::Manager", parent)
, Thread ("Validators")
, m_store (journal)
, m_logic (m_store, journal)
, m_journal (journal)
, m_thread ("Validators")
, m_checkTimer (this)
, m_checkSources (true) // true to cause a full scan on start
, m_checkSources (true)
{
addRPCHandlers();
m_thread.start (this);
startThread();
}
~ManagerImp ()
{
m_thread.stop (true);
stopThread ();
}
//--------------------------------------------------------------------------
@@ -134,7 +145,7 @@ public:
void onStop ()
{
m_thread.stop (false);
m_queue.dispatch (bind (&Thread::signalThreadShouldExit, this));
}
//--------------------------------------------------------------------------
@@ -149,7 +160,7 @@ public:
Json::Value rpcRebuild (Json::Value const& args)
{
m_thread.call (&Logic::buildChosen, &m_logic);
m_queue.dispatch (bind (&Logic::buildChosen, &m_logic));
Json::Value result;
result ["chosen_list"] = "rebuilding";
return result;
@@ -203,60 +214,40 @@ public:
void addSource (Source* source)
{
#if RIPPLE_USE_NEW_VALIDATORS
bassert (! isStopping());
//m_thread.call (&Logic::add, &m_logic, source);
#else
delete source;
#endif
m_queue.dispatch (bind (&Logic::add, &m_logic, source));
}
void addStaticSource (Source* source)
{
#if RIPPLE_USE_NEW_VALIDATORS
bassert (! isStopping());
//m_thread.call (&Logic::addStatic, &m_logic, source);
#else
delete source;
#endif
m_queue.dispatch (bind (&Logic::addStatic, &m_logic, source));
}
// VFALCO NOTE we should just do this on the callers thread?
//
void receiveValidation (ReceivedValidation const& rv)
{
#if RIPPLE_USE_NEW_VALIDATORS
if (! isStopping())
m_thread.call (&Logic::receiveValidation, &m_logic, rv);
m_queue.dispatch (bind (
&Logic::receiveValidation, &m_logic, rv));
#endif
}
// VFALCO NOTE we should just do this on the callers thread?
//
void ledgerClosed (RippleLedgerHash const& ledgerHash)
{
#if RIPPLE_USE_NEW_VALIDATORS
if (! isStopping())
m_thread.call (&Logic::ledgerClosed, &m_logic, ledgerHash);
m_queue.dispatch (bind (
&Logic::ledgerClosed, &m_logic, ledgerHash));
#endif
}
//--------------------------------------------------------------------------
void onDeadlineTimer (DeadlineTimer& timer)
void init ()
{
#if RIPPLE_USE_NEW_VALIDATORS
if (timer == m_checkTimer)
{
m_checkSources = true;
// This will kick us back into threadIdle
m_thread.interrupt();
}
#endif
}
//--------------------------------------------------------------------------
void threadInit ()
{
#if RIPPLE_USE_NEW_VALIDATORS
File const file (File::getSpecialLocation (
File::userDocumentsDirectory).getChildFile ("validators.sqlite"));
@@ -271,55 +262,49 @@ public:
if (! error)
{
m_logic.load ();
// This flag needs to be on, to force a full check of all
// sources on startup. Once we finish the check we will
// set the deadine timer.
//
bassert (m_checkSources);
}
#endif
}
void threadExit ()
void onDeadlineTimer (DeadlineTimer& timer)
{
// must come last
stopped ();
if (timer == m_checkTimer)
{
m_queue.dispatch (bind (&ManagerImp::setCheckSources, this));
}
}
bool threadIdle ()
void setCheckSources ()
{
bool interrupted = false;
m_checkSources = true;
}
#if RIPPLE_USE_NEW_VALIDATORS
void checkSources ()
{
if (m_checkSources)
{
ThreadCancelCallback cancelCallback (m_thread);
interrupted = m_logic.check (cancelCallback);
if (! interrupted)
if (m_logic.fetch_one () == 0)
{
// Made it through the list without interruption!
// Clear the flag and set the deadline timer again.
//
m_checkSources = false;
m_checkTimer.setExpiration (checkEverySeconds);
}
}
#endif
return interrupted;
}
private:
StoreSqdb m_store;
Logic m_logic;
Journal m_journal;
ThreadWithCallQueue m_thread;
DeadlineTimer m_checkTimer;
void run ()
{
init ();
// True if we should call check on idle.
// This gets set to false once we make it through the whole
// list without interruption.
bool m_checkSources;
while (! this->threadShouldExit())
{
checkSources ();
m_queue.run_one();
}
stopped();
}
};
//------------------------------------------------------------------------------

View File

@@ -48,7 +48,7 @@ public:
return m_file.getFullPathName ();
}
Result fetch (CancelCallback&, Journal journal)
Result fetch (Journal journal)
{
Result result;

View File

@@ -50,7 +50,7 @@ public:
return String::empty;
}
Result fetch (CancelCallback&, Journal journal)
Result fetch (Journal journal)
{
Result result;

View File

@@ -48,7 +48,7 @@ public:
return m_url.full();
}
Result fetch (CancelCallback&, Journal journal)
Result fetch (Journal journal)
{
Result result;

View File

@@ -122,7 +122,7 @@ public:
return String::empty;
}
Result fetch (CancelCallback& cancel, Journal)
Result fetch (Journal)
{
Result result;
@@ -207,8 +207,7 @@ public:
addSources (logic);
NoOpCancelCallback cancelCallback;
logic.check (cancelCallback);
logic.fetch_one ();
ChosenList::Ptr list (logic.getChosen ());

View File

@@ -42,7 +42,6 @@ namespace ripple {
using namespace beast;
}
# include "impl/CancelCallbacks.h"
# include "impl/ChosenList.h"
# include "impl/SourceFile.h"
# include "impl/SourceStrings.h"

View File

@@ -522,12 +522,10 @@ void LedgerMaster::setFullLedger (Ledger::pointer ledger, bool isSynchronous, bo
//--------------------------------------------------------------------------
//
#if RIPPLE_USE_NEW_VALIDATORS
{
if (isCurrent)
getApp ().getValidators ().ledgerClosed (ledger->getHash());
}
#endif
//
//--------------------------------------------------------------------------
}

View File

@@ -166,7 +166,6 @@ public:
// Initialize the Validators object with Config information.
void initValidatorsConfig ()
{
#if RIPPLE_USE_NEW_VALIDATORS
{
std::vector <std::string> const& strings (getConfig().validators);
if (! strings.empty ())
@@ -182,7 +181,6 @@ public:
{
m_validators->addFile (getConfig().getValidatorsFile());
}
#endif
}
//--------------------------------------------------------------------------

View File

@@ -1460,14 +1460,12 @@ static void checkValidation (Job&, SerializedValidation::pointer val, bool isTru
//----------------------------------------------------------------------
//
{
#if RIPPLE_USE_NEW_VALIDATORS
SerializedValidation const& sv (*val);
Validators::ReceivedValidation rv;
rv.ledgerHash = sv.getLedgerHash ();
rv.publicKey = sv.getSignerPublic();
rv.publicKeyHash = sv.getSignerPublic();
getApp ().getValidators ().receiveValidation (rv);
#endif
}
//
//----------------------------------------------------------------------