Persistence for Validators

This commit is contained in:
Vinnie Falco
2013-09-12 21:38:29 -07:00
parent 9b40bc6835
commit c631cc5f92
18 changed files with 971 additions and 239 deletions

View File

@@ -63,6 +63,12 @@
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|x64'">true</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\ripple\validators\impl\StoreSqdb.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|x64'">true</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\ripple\validators\impl\Tests.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
@@ -1427,10 +1433,12 @@
<ClInclude Include="..\..\src\ripple\validators\impl\CancelCallbacks.h" />
<ClInclude Include="..\..\src\ripple\validators\impl\ChosenList.h" />
<ClInclude Include="..\..\src\ripple\validators\impl\Logic.h" />
<ClInclude Include="..\..\src\ripple\validators\impl\SourceDesc.h" />
<ClInclude Include="..\..\src\ripple\validators\impl\SourceFile.h" />
<ClInclude Include="..\..\src\ripple\validators\impl\SourceStrings.h" />
<ClInclude Include="..\..\src\ripple\validators\impl\SourceURL.h" />
<ClInclude Include="..\..\src\ripple\validators\impl\Store.h" />
<ClInclude Include="..\..\src\ripple\validators\impl\StoreSqdb.h" />
<ClInclude Include="..\..\src\ripple\validators\impl\Utilities.h" />
<ClInclude Include="..\..\src\ripple\validators\ripple_validators.h" />
<ClInclude Include="..\..\src\ripple_app\consensus\ripple_DisputedTx.h" />

View File

@@ -921,6 +921,9 @@
<ClCompile Include="..\..\src\ripple\validators\impl\Tests.cpp">
<Filter>[2] Ripple %28New%29\validators\impl</Filter>
</ClCompile>
<ClCompile Include="..\..\src\ripple\validators\impl\StoreSqdb.cpp">
<Filter>[2] Ripple %28New%29\validators\impl</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\src\ripple_app\ripple_app.h">
@@ -1806,6 +1809,12 @@
<ClInclude Include="..\..\src\ripple\validators\impl\ChosenList.h">
<Filter>[2] Ripple %28New%29\validators\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\validators\impl\SourceDesc.h">
<Filter>[2] Ripple %28New%29\validators\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\validators\impl\StoreSqdb.h">
<Filter>[2] Ripple %28New%29\validators\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\validators\impl\CancelCallbacks.h">
<Filter>[2] Ripple %28New%29\validators\impl</Filter>
</ClInclude>

View File

@@ -18,7 +18,10 @@ public:
struct Info
{
/** The unique key for this validator. */
PublicKey key;
PublicKey publicKey;
/** Optional human readable comment describing the validator. */
String label;
};
/** Destroy the Source.
@@ -30,10 +33,9 @@ public:
virtual String name () = 0;
struct CancelCallback
{
virtual bool shouldCancel () = 0;
};
virtual String uniqueID () = 0;
virtual String createParam () = 0;
/** Fetch the most recent list from the Source.
If possible, the Source should periodically poll the

View File

@@ -19,6 +19,12 @@ struct ReceivedValidation
PublicKeyHash signerPublicKeyHash;
};
/** Callback used to optionally cancel long running fetch operations. */
struct CancelCallback
{
virtual bool shouldCancel () = 0;
};
}
#endif

View File

@@ -12,7 +12,7 @@ namespace Validators
// Dummy CancelCallback that does nothing
//
class NoOpCancelCallback : public Source::CancelCallback
class NoOpCancelCallback : public CancelCallback
{
public:
bool shouldCancel ()
@@ -27,7 +27,7 @@ public:
// CancelCallback attached to ThreadWithCallQueue
//
class ThreadCancelCallback
: public Source::CancelCallback
: public CancelCallback
, public Uncopyable
{
public:

View File

@@ -25,52 +25,14 @@ enum
,expectedNumberOfResults = 1000
};
//------------------------------------------------------------------------------
// Encapsulates the logic for creating the chosen validators.
// This is a separate class to facilitate the unit tests.
//
class Logic
{
public:
// Information associated with each Source
//
struct SourceDesc
{
enum
{
keysPreallocationSize = 1000
};
enum Status
{
statusNone,
statusFetched,
statusFailed
};
ScopedPointer <Source> source;
Status status;
Time whenToFetch;
int numberOfFailures;
// The result of the last fetch
Source::Result result;
//------------------------------------------------------------------
SourceDesc () noexcept
: status (statusNone)
, whenToFetch (Time::getCurrentTime ())
, numberOfFailures (0)
{
}
~SourceDesc ()
{
}
};
typedef DynamicList <SourceDesc> SourcesType;
//----------------------------------------------------------------------
// Information associated with each distinguishable validator
@@ -85,20 +47,27 @@ public:
int refCount;
};
typedef boost::unordered_map <PublicKey, ValidatorInfo, PublicKey::HashFunction> MapType;
typedef boost::unordered_map <
PublicKey, ValidatorInfo, PublicKey::HashFunction> MapType;
//----------------------------------------------------------------------
explicit Logic (Journal journal = Journal ())
: m_journal (journal)
, m_chosenListNeedsUpdate (false)
Logic (Store& store, Journal journal = Journal ())
: m_store (store)
, m_journal (journal)
, m_rebuildChosenList (false)
{
}
void load ()
{
// load data from m_store
}
// Add a one-time static source.
// Fetch is called right away, this call blocks.
//
void addStaticSource (Source* source)
void addStatic (Source* source)
{
m_journal.info() << "Add static Source, " << source->name();
@@ -110,7 +79,7 @@ public:
if (result.success)
{
addSourceInfo (result.list);
merge (result.list);
}
else
{
@@ -120,14 +89,205 @@ public:
// Add a live source to the list of sources.
//
void addSource (Source* source)
void add (Source* source)
{
m_journal.info() << "Add Source, " << source->name();
SourceDesc& desc (*m_sources.emplace_back ());
desc.source = source;
m_store.insert (desc);
}
// Add each entry in the list to the map, incrementing the
// reference count if it already exists, and updating fields.
//
void merge (Array <Source::Info> const& list)
{
for (std::size_t i = 0; i < list.size (); ++i)
{
Source::Info const& info (list.getReference (i));
std::pair <MapType::iterator, bool> result (
m_map.emplace (info.publicKey, ValidatorInfo ()));
ValidatorInfo& validatorInfo (result.first->second);
++validatorInfo.refCount;
if (result.second)
{
// This is a new one
dirtyChosen ();
}
}
}
// Decrement the reference count of each item in the list
// in the map.
//
void remove (Array <Source::Info> const& list)
{
for (std::size_t i = 0; i < list.size (); ++i)
{
Source::Info const& info (list.getReference (i));
MapType::iterator iter (m_map.find (info.publicKey));
bassert (iter != m_map.end ());
ValidatorInfo& validatorInfo (iter->second);
if (--validatorInfo.refCount == 0)
{
// Last reference removed
m_map.erase (iter);
dirtyChosen ();
}
}
}
//----------------------------------------------------------------------
//
// Chosen
//
/** Rebuild the Chosen List. */
void buildChosen ()
{
ChosenList::Ptr list (new ChosenList (m_map.size ()));
for (MapType::iterator iter = m_map.begin ();
iter != m_map.end (); ++iter)
{
ChosenList::Info info;
list->insert (iter->first, info);
}
// This is thread safe
m_chosenList = list;
m_journal.debug() <<
"Rebuilt chosen list with " <<
String::fromNumber (m_chosenList->size()) << " entries";
}
/** Mark the Chosen List for a rebuild. */
void dirtyChosen ()
{
m_rebuildChosenList = true;
}
/** Rebuild the Chosen List if necessary. */
void checkChosen ()
{
if (m_rebuildChosenList)
{
buildChosen ();
m_rebuildChosenList = false;
}
}
/** Returns the current Chosen list.
This can be called from any thread at any time.
*/
ChosenList::Ptr getChosen ()
{
return m_chosenList;
}
//----------------------------------------------------------------------
//
// Fetching
//
/** Perform a fetch on the source. */
void fetch (SourceDesc& desc, CancelCallback& callback)
{
m_journal.info() << "fetch ('" << desc.source->name() << "')";
Source::Result result (desc.source->fetch (callback, m_journal));
if (! callback.shouldCancel ())
{
// Reset fetch timer for the source.
desc.whenToFetch = Time::getCurrentTime () +
RelativeTime (secondsBetweenFetches);
if (result.success)
{
// Add the new source info to the map
merge (result.list);
// Swap lists
desc.result.swapWith (result);
// Remove the old source info from the map
remove (result.list);
// See if we need to rebuild
checkChosen ();
// Reset failure status
desc.numberOfFailures = 0;
desc.status = SourceDesc::statusFetched;
// Update the source's list in the store
m_store.update (desc, true);
}
else
{
++desc.numberOfFailures;
desc.status = SourceDesc::statusFailed;
// Record the failure in the Store
m_store.update (desc);
}
}
}
/** Expire a source's list of validators. */
void expire (SourceDesc& desc)
{
// Decrement reference count on each validator
remove (desc.result.list);
m_store.update (desc);
}
/** Check each Source to see if it needs processing.
@return `true` if an interruption occurred.
*/
bool check (CancelCallback& callback)
{
bool interrupted (false);
Time const currentTime (Time::getCurrentTime ());
for (SourcesType::iterator iter = m_sources.begin ();
iter != m_sources.end (); ++iter)
{
SourceDesc& desc (*iter);
// See if we should fetch
//
if (desc.whenToFetch <= currentTime)
{
fetch (desc, callback);
if (callback.shouldCancel ())
{
interrupted = true;
break;
}
}
// See if we need to expire
//
if (desc.expirationTime.isNotNull () &&
desc.expirationTime <= currentTime)
{
expire (desc);
}
}
return interrupted;
}
//----------------------------------------------------------------------
//
// Ripple interface
//
// Called when we receive a validation from a peer.
//
void receiveValidation (ReceivedValidation const& rv)
@@ -147,158 +307,9 @@ public:
#endif
}
// Add each entry in the list to the map, incrementing the
// reference count if it already exists, and updating fields.
// Returns `true` if the public key hash is contained in the Chosen List.
//
void addSourceInfo (Array <Source::Info> const& list)
{
for (std::size_t i = 0; i < list.size (); ++i)
{
Source::Info const& info (list.getReference (i));
std::pair <MapType::iterator, bool> result (
m_map.emplace (info.key, ValidatorInfo ()));
ValidatorInfo& validatorInfo (result.first->second);
++validatorInfo.refCount;
if (result.second)
{
// This is a new one
markDirtyChosenList ();
}
}
}
// Decrement the reference count of each item in the list
// in the map
//
void removeSourceInfo (Array <Source::Info> const& list)
{
for (std::size_t i = 0; i < list.size (); ++i)
{
Source::Info const& info (list.getReference (i));
MapType::iterator iter (m_map.find (info.key));
bassert (iter != m_map.end ());
ValidatorInfo& validatorInfo (iter->second);
if (--validatorInfo.refCount == 0)
{
// Last reference removed
m_map.erase (iter);
markDirtyChosenList ();
}
}
}
// Fetch one source
//
void fetchSource (SourceDesc& desc, Source::CancelCallback& callback)
{
m_journal.info() << "Fetching Source, " << desc.source->name();
Source::Result result (desc.source->fetch (callback, m_journal));
if (! callback.shouldCancel ())
{
// Reset fetch timer for the source.
desc.whenToFetch = Time::getCurrentTime () +
RelativeTime (secondsBetweenFetches);
if (result.success)
{
// Add the new source info to the map
addSourceInfo (result.list);
// Swap lists
desc.result.swapWith (result);
// Remove the old source info from the map
removeSourceInfo (result.list);
// See if we need to rebuild
checkDirtyChosenList ();
// Reset failure status
desc.numberOfFailures = 0;
desc.status = SourceDesc::statusFetched;
}
else
{
++desc.numberOfFailures;
desc.status = SourceDesc::statusFailed;
}
}
}
// Check each source to see if it needs fetching.
//
void checkSources (Source::CancelCallback& callback)
{
m_journal.info() << "Checking Sources";
Time const currentTime (Time::getCurrentTime ());
for (SourcesType::iterator iter = m_sources.begin ();
! callback.shouldCancel () && iter != m_sources.end (); ++iter)
{
SourceDesc& desc (*iter);
if (desc.whenToFetch <= currentTime)
fetchSource (desc, callback);
}
}
// Signal that the Chosen List needs to be rebuilt.
//
void markDirtyChosenList ()
{
m_chosenListNeedsUpdate = true;
}
// Check the dirty state of the Chosen List, and rebuild it
// if necessary.
//
void checkDirtyChosenList ()
{
if (m_chosenListNeedsUpdate)
{
buildChosenList ();
m_chosenListNeedsUpdate = false;
}
}
// Rebuilds the Chosen List
//
void buildChosenList ()
{
ChosenList::Ptr list (new ChosenList (m_map.size ()));
for (MapType::iterator iter = m_map.begin ();
iter != m_map.end (); ++iter)
{
ChosenList::Info info;
list->insert (iter->first, info);
}
// This is thread safe
m_chosenList = list;
m_journal.debug() <<
"Rebuilt chosen list with " <<
String::fromNumber (m_chosenList->size()) << " entries";
}
// Get a reference to the chosen list.
// This is safe to call from any thread at any time.
//
ChosenList::Ptr getChosenList ()
{
return m_chosenList;
}
//----------------------------------------------------------------------
//
// Ripple interface
//
// These routines are modeled after UniqueNodeList
bool isTrustedPublicKeyHash (
PublicKeyHash const& publicKeyHash)
bool isTrustedPublicKeyHash (PublicKeyHash const& publicKeyHash)
{
return m_chosenList->containsPublicKeyHash (publicKeyHash);
}
@@ -308,10 +319,11 @@ public:
//----------------------------------------------------------------------
private:
Store& m_store;
Journal m_journal;
SourcesType m_sources;
MapType m_map;
bool m_chosenListNeedsUpdate;
bool m_rebuildChosenList;
ChosenList::Ptr m_chosenList;
};

View File

@@ -4,6 +4,10 @@
*/
//==============================================================================
#ifndef RIPPLE_VALIDATORS_DISABLE_MANAGER
#define RIPPLE_VALIDATORS_DISABLE_MANAGER 1
#endif
/*
Information to track:
@@ -95,10 +99,12 @@ class ManagerImp
{
public:
explicit ManagerImp (Journal journal)
: m_logic (journal)
: m_store (journal)
, m_logic (m_store, journal)
, m_journal (journal)
, m_thread ("Validators")
, m_checkTimer (this)
, m_checkSources (true) // true to cause a full scan on start
{
m_thread.start (this);
}
@@ -136,40 +142,71 @@ public:
void addSource (Source* source)
{
m_thread.call (&Logic::addSource, &m_logic, source);
#if RIPPLE_VALIDATORS_DISABLE_MANAGER
delete source;
#else
m_thread.call (&Logic::add, &m_logic, source);
#endif
}
void addStaticSource (Source* source)
{
m_thread.call (&Logic::addStaticSource, &m_logic, source);
#if RIPPLE_VALIDATORS_DISABLE_MANAGER
delete source;
#else
m_thread.call (&Logic::addStatic, &m_logic, source);
#endif
}
void receiveValidation (ReceivedValidation const& rv)
{
#if ! RIPPLE_VALIDATORS_DISABLE_MANAGER
m_thread.call (&Logic::receiveValidation, &m_logic, rv);
#endif
}
//--------------------------------------------------------------------------
// This intermediate function is used to provide the CancelCallback
void checkSources ()
{
ThreadCancelCallback cancelCallback (m_thread);
m_logic.checkSources (cancelCallback);
}
void onDeadlineTimer (DeadlineTimer& timer)
{
#if ! RIPPLE_VALIDATORS_DISABLE_MANAGER
if (timer == m_checkTimer)
m_thread.call (&ManagerImp::checkSources, this);
{
m_checkSources = true;
// This will kick us back into threadIdle
m_thread.interrupt();
}
#endif
}
//--------------------------------------------------------------------------
void threadInit ()
{
m_checkTimer.setRecurringExpiration (checkEverySeconds);
#if ! RIPPLE_VALIDATORS_DISABLE_MANAGER
File const file (File::getSpecialLocation (
File::userDocumentsDirectory).getChildFile ("validators.sqlite"));
Error error (m_store.open (file));
if (error)
{
m_journal.fatal() <<
"Failed to open '" << file.getFullPathName() << "'";
}
if (! error)
{
m_logic.load ();
// This flag needs to be on, to force a full check of all
// sources on startup. Once we finish the check we will
// set the deadine timer.
//
bassert (m_checkSources);
}
#endif
}
void threadExit ()
@@ -180,14 +217,35 @@ public:
{
bool interrupted = false;
#if ! RIPPLE_VALIDATORS_DISABLE_MANAGER
if (m_checkSources)
{
ThreadCancelCallback cancelCallback (m_thread);
interrupted = m_logic.check (cancelCallback);
if (! interrupted)
{
// Made it through the list without interruption!
// Clear the flag and set the deadline timer again.
m_checkSources = false;
m_checkTimer.setExpiration (checkEverySeconds);
}
}
#endif
return interrupted;
}
private:
StoreSqdb m_store;
Logic m_logic;
Journal m_journal;
ThreadWithCallQueue m_thread;
DeadlineTimer m_checkTimer;
// True if we should call check on idle.
// This gets set to false once we make it through the whole
// list without interruption.
bool m_checkSources;
};
//------------------------------------------------------------------------------

View File

@@ -0,0 +1,57 @@
//------------------------------------------------------------------------------
/*
Copyright (c) 2011-2013, OpenCoin, Inc.
*/
//==============================================================================
#ifndef RIPPLE_VALIDATORS_SOURCEDESC_H_INCLUDED
#define RIPPLE_VALIDATORS_SOURCEDESC_H_INCLUDED
namespace Validators
{
/** Additional state information associated with a Source. */
struct SourceDesc
{
enum Status
{
statusNone,
statusFetched,
statusFailed
};
ScopedPointer <Source> source;
Status status;
Time whenToFetch;
int numberOfFailures;
// The result of the last fetch
Source::Result result;
//------------------------------------------------------------------
/** The time of the last successful fetch. */
Time lastFetchTime;
/** When to expire this source's list of cached results (if any) */
Time expirationTime;
//------------------------------------------------------------------
SourceDesc () noexcept
: status (statusNone)
, whenToFetch (Time::getCurrentTime ())
, numberOfFailures (0)
{
}
~SourceDesc ()
{
}
};
typedef DynamicList <SourceDesc> SourcesType;
}
#endif

View File

@@ -24,6 +24,16 @@ public:
return "File :'" + m_file.getFullPathName () + "'";
}
String uniqueID ()
{
return "File," + m_file.getFullPathName ();
}
String createParam ()
{
return m_file.getFullPathName ();
}
Result fetch (CancelCallback&, Journal journal)
{
Result result;

View File

@@ -26,6 +26,16 @@ public:
return m_name;
}
String uniqueID ()
{
return String::empty;
}
String createParam ()
{
return String::empty;
}
Result fetch (CancelCallback&, Journal journal)
{
Result result;

View File

@@ -24,6 +24,16 @@ public:
return "URL: '" + m_url.full() + "'";
}
String uniqueID ()
{
return "URL," + m_url.full();
}
String createParam ()
{
return m_url.full();
}
Result fetch (CancelCallback&, Journal journal)
{
Result result;

View File

@@ -4,19 +4,29 @@
*/
//==============================================================================
#ifndef RIPPLE_VALIDATORS_VALIDATORSSTORE_H_INCLUDED
#define RIPPLE_VALIDATORS_VALIDATORSSTORE_H_INCLUDED
#ifndef RIPPLE_VALIDATORS_STORE_H_INCLUDED
#define RIPPLE_VALIDATORS_STORE_H_INCLUDED
namespace Validators
{
/** Database persistence for Validators. */
/** Abstract persistence for Validators data. */
class Store
{
public:
virtual ~Store () { }
/** Insert a new SourceDesc to the Store.
The caller's SourceDesc will have any available persistent
information filled in from the Store.
*/
virtual void insert (SourceDesc& desc) = 0;
/** Update the SourceDesc fixed fields. */
virtual void update (SourceDesc& desc, bool updateFetchResults = false) = 0;
protected:
Store () { }
};
}

View File

@@ -0,0 +1,345 @@
//------------------------------------------------------------------------------
/*
Copyright (c) 2011-2013, OpenCoin, Inc.
*/
//==============================================================================
namespace Validators
{
StoreSqdb::StoreSqdb (Journal journal)
: m_journal (journal)
{
}
StoreSqdb::~StoreSqdb ()
{
}
Error StoreSqdb::open (File const& file)
{
Error error (m_session.open (file.getFullPathName ()));
if (!error)
error = init ();
return error;
}
//--------------------------------------------------------------------------
void StoreSqdb::insert (SourceDesc& desc)
{
sqdb::transaction tr (m_session);
bool const found (select (desc));
if (found)
{
selectList (desc);
}
else
{
Error error;
String const sourceID (desc.source->uniqueID().toStdString());
String const createParam (desc.source->createParam().toStdString());
String const lastFetchTime (Utilities::timeToString (desc.lastFetchTime));
String const expirationTime (Utilities::timeToString (desc.expirationTime));
sqdb::statement st = (m_session.prepare <<
"INSERT INTO ValidatorsSource ( "
" sourceID, "
" createParam, "
" lastFetchTime, "
" expirationTime "
") VALUES ( "
" ?, ?, ?, ? "
"); "
,sqdb::use (sourceID)
,sqdb::use (createParam)
,sqdb::use (lastFetchTime)
,sqdb::use (expirationTime)
);
st.execute_and_fetch (error);
if (! error)
{
error = tr.commit ();
}
if (error)
{
tr.rollback ();
report (error, __FILE__, __LINE__);
}
}
}
//--------------------------------------------------------------------------
void StoreSqdb::update (SourceDesc& desc, bool updateFetchResults)
{
Error error;
String const sourceID (desc.source->uniqueID());
String const lastFetchTime (Utilities::timeToString (desc.lastFetchTime));
String const expirationTime (Utilities::timeToString (desc.expirationTime));
sqdb::transaction tr (m_session);
m_session.once (error) <<
"UPDATE ValidatorsSource SET "
" lastFetchTime = ?, "
" expirationTime = ? "
"WHERE "
" sourceID = ? "
,sqdb::use (lastFetchTime)
,sqdb::use (expirationTime)
,sqdb::use (sourceID)
;
if (! error && updateFetchResults)
{
// Delete the previous data set
{
sqdb::statement st = (m_session.prepare <<
"DELETE FROM ValidatorsSourceInfo WHERE "
" sourceID = ?; "
,sqdb::use (sourceID)
);
st.execute_and_fetch (error);
}
// Insert the new data set
if (! error)
{
std::string publicKey;
String label;
sqdb::statement st = (m_session.prepare <<
"INSERT INTO ValidatorsSourceInfo ( "
" sourceID, "
" publicKey, "
" label "
") VALUES ( "
" ?, ?, ? "
");"
,sqdb::use (sourceID)
,sqdb::use (publicKey)
,sqdb::use (label)
);
Array <Source::Info>& list (desc.result.list);
for (std::size_t i = 0; ! error && i < list.size(); ++i)
{
Source::Info& info (list.getReference(i));
publicKey = Utilities::publicKeyToString (info.publicKey);
label = list[i].label;
st.execute_and_fetch (error);
}
}
}
if (! error)
{
error = tr.commit ();
}
if (error)
{
report (error, __FILE__, __LINE__);
}
}
//--------------------------------------------------------------------------
void StoreSqdb::report (Error const& error, char const* fileName, int lineNumber)
{
if (error)
{
m_journal.error() <<
"Failure: '"<< error.getReasonText() << "' " <<
" at " << Debug::getSourceLocation (fileName, lineNumber);
}
}
//--------------------------------------------------------------------------
/** Reads the fixed information into the SourceDesc if it exists.
Returns `true` if the record was found.
*/
bool StoreSqdb::select (SourceDesc& desc)
{
bool found (false);
Error error;
String const sourceID (desc.source->uniqueID());
String lastFetchTime;
String expirationTime;
sqdb::statement st = (m_session.prepare <<
"SELECT "
" lastFetchTime, "
" expirationTime "
"FROM ValidatorsSource WHERE "
" sourceID = ? "
,sqdb::into (lastFetchTime)
,sqdb::into (expirationTime)
,sqdb::use (sourceID)
);
if (st.execute_and_fetch (error))
{
found = true;
desc.lastFetchTime = Utilities::stringToTime (lastFetchTime);
desc.expirationTime = Utilities::stringToTime (expirationTime);
}
if (error)
{
report (error, __FILE__, __LINE__);
}
return found;
}
//--------------------------------------------------------------------------
/** Reads the variable information into the SourceDesc.
This should only be called when the sourceID was already found.
*/
void StoreSqdb::selectList (SourceDesc& desc)
{
Error error;
String const sourceID (desc.source->uniqueID());
// Get the count
std::size_t count;
if (! error)
{
m_session.once (error) <<
"SELECT "
" COUNT(*) "
"FROM ValidatorsSourceInfo WHERE "
" sourceID = ? "
,sqdb::into (count)
,sqdb::use (sourceID)
;
}
if (error)
{
report (error, __FILE__, __LINE__);
return;
}
// Precondition: the list must be empty.
bassert (desc.result.list.size() == 0);
// Pre-allocate some storage
desc.result.list.ensureStorageAllocated (count);
// Prepare the select
{
std::string publicKey;
std::string label;
sqdb::statement st = (m_session.prepare <<
"SELECT "
" publicKey, "
" label "
"FROM ValidatorsSourceInfo WHERE "
" sourceID = ? "
,sqdb::into (publicKey)
,sqdb::into (label)
,sqdb::use (sourceID)
);
// Add all the records to the list
if (st.execute_and_fetch (error))
{
do
{
Source::Info info;
info.publicKey = Utilities::stringToPublicKey (publicKey);
info.label = label;
desc.result.list.add (info);
}
while (st.fetch (error));
}
}
if (error)
{
report (error, __FILE__, __LINE__);
}
}
//--------------------------------------------------------------------------
Error StoreSqdb::init ()
{
Error error;
sqdb::transaction tr (m_session);
if (! error)
{
m_session.once (error) <<
"PRAGMA encoding=\"UTF-8\"";
}
if (! error)
{
m_session.once (error) <<
"CREATE TABLE IF NOT EXISTS ValidatorsSource ( "
" id INTEGER PRIMARY KEY AUTOINCREMENT, "
" sourceID TEXT UNIQUE, "
" createParam TEXT NOT NULL, "
" lastFetchTime TEXT NOT NULL, "
" expirationTime TEXT NOT NULL "
");"
;
}
if (! error)
{
m_session.once (error) <<
"CREATE TABLE IF NOT EXISTS ValidatorsSourceInfo ( "
" id INTEGER PRIMARY KEY AUTOINCREMENT, "
" sourceID TEXT NOT NULL, "
" publicKey TEXT NOT NULL, "
" label TEXT NOT NULL "
");"
;
}
if (! error)
{
m_session.once (error) <<
"CREATE INDEX IF NOT EXISTS "
" ValidatorsSourceInfoIndex ON ValidatorsSourceInfo "
" ( "
" sourceID "
" ); "
;
}
if (! error)
{
error = tr.commit();
}
if (error)
{
tr.rollback ();
report (error, __FILE__, __LINE__);
}
return error;
}
}

View File

@@ -0,0 +1,41 @@
//------------------------------------------------------------------------------
/*
Copyright (c) 2011-2013, OpenCoin, Inc.
*/
//==============================================================================
#ifndef RIPPLE_VALIDATORS_STORESQDB_H_INCLUDED
#define RIPPLE_VALIDATORS_STORESQDB_H_INCLUDED
namespace Validators
{
/** Database persistence for Validators using SQLite */
class StoreSqdb : public Store
{
public:
explicit StoreSqdb (Journal journal = Journal());
~StoreSqdb ();
Error open (File const& file);
void insert (SourceDesc& desc);
void update (SourceDesc& desc, bool updateFetchResults);
private:
void report (Error const& error, char const* fileName, int lineNumber);
bool select (SourceDesc& desc);
void selectList (SourceDesc& desc);
Error init ();
Journal m_journal;
sqdb::session m_session;
};
}
#endif

View File

@@ -12,7 +12,8 @@ class Tests : public UnitTest
public:
enum
{
numberOfTestValidators = 1000
numberOfTestValidators = 1000,
numberofTestSources = 50
};
//--------------------------------------------------------------------------
@@ -92,7 +93,19 @@ public:
String name ()
{
return "Test";
return uniqueID ();
}
String uniqueID ()
{
return String ("Test,") + m_name + "," +
String::fromNumber (m_start) + "," +
String::fromNumber (m_end);
}
String createParam ()
{
return String::empty;
}
Result fetch (CancelCallback& cancel, Journal)
@@ -106,7 +119,8 @@ public:
for (uint32 i = m_start ; i < m_end; ++i)
{
Info info;
info.key = Validators::PublicKey::createFromInteger (i);
info.publicKey = Validators::PublicKey::createFromInteger (i);
info.label = String::fromNumber (i);
result.list.add (info);
}
@@ -120,36 +134,81 @@ public:
//--------------------------------------------------------------------------
class TestStore : public Store
{
public:
TestStore ()
{
}
~TestStore ()
{
}
void insertSourceDesc (SourceDesc& desc)
{
}
void updateSourceDesc (SourceDesc& desc)
{
}
void updateSourceDescInfo (SourceDesc& desc)
{
}
};
//--------------------------------------------------------------------------
void addSources (Logic& logic)
{
#if 0
logic.addSource (new TestSource ("source 1", 0, 1000));
logic.addSource (new TestSource ("source 2", 200, 1500));
logic.addSource (new TestSource ("source 3", 500, 2000));
logic.addSource (new TestSource ("source 4", 750, 2200));
logic.addSource (new TestSource ("source 5", 1500, 3200));
#else
logic.addSource (new TestSource ("source 1", 0, 1));
#endif
for (int i = 1; i <= numberofTestSources; ++i)
{
String const name (String::fromNumber (i));
uint32 const start = random().nextInt (numberOfTestValidators);
uint32 const end = start + random().nextInt (numberOfTestValidators);
logic.add (new TestSource (name, start, end));
}
}
void testLogic ()
{
beginTestCase ("logic");
Logic logic;
//TestStore store;
StoreSqdb storage;
File const file (
File::getSpecialLocation (
File::userDocumentsDirectory).getChildFile (
"validators-test.sqlite"));
// Can't call this 'error' because of ADL and Journal::error
Error err (storage.open (file));
unexpected (err, err.what());
Logic logic (storage, Journal ());
logic.load ();
addSources (logic);
NoOpCancelCallback cancelCallback;
logic.checkSources (cancelCallback);
logic.check (cancelCallback);
ChosenList::Ptr list (logic.getChosenList ());
ChosenList::Ptr list (logic.getChosen ());
pass ();
}
void runTest ()
{
// We need to use the same seed so we create the
// same IDs for the set of TestSource objects.
//
int64 const seedValue = 10;
random().setSeed (seedValue);
testLogic ();
}

View File

@@ -170,4 +170,85 @@ void Utilities::parseResultLine (
}
}
//--------------------------------------------------------------------------
String Utilities::itos (int i, int fieldSize)
{
return String::fromNumber (i).paddedLeft (beast_wchar('0'), fieldSize);
}
String Utilities::timeToString (Time const& t)
{
if (t.isNotNull ())
{
return
itos (t.getYear(), 4) + "-" +
itos (t.getMonth(), 2) + "-" +
itos (t.getDayOfMonth (), 2) + " " +
itos (t.getHours () , 2) + ":" +
itos (t.getMinutes (), 2) + ":" +
itos (t.getSeconds(), 2);
}
return String::empty;
}
int Utilities::stoi (String& s, int fieldSize, int minValue, int maxValue, beast_wchar delimiter)
{
int const needed (fieldSize + ((delimiter != 0) ? 1 : 0));
String const v (s.substring (0, needed));
s = s.substring (v.length ());
if (s.length() == needed)
{
int const v (s.getIntValue());
if (s.startsWith (itos (v, fieldSize)) &&
v >= minValue && v <= maxValue &&
(delimiter == 0 || s.endsWithChar (delimiter)))
{
return v;
}
}
return -1; // fail
}
Time Utilities::stringToTime (String s)
{
if (s.isNotEmpty ())
{
int const year (stoi (s, 4, 1970, 9999, '-'));
int const mon (stoi (s, 2, 0, 11, '-'));
int const day (stoi (s, 2, 1, 31, ' '));
int const hour (stoi (s, 2, 0, 23, ':'));
int const min (stoi (s, 2, 0, 59, ':'));
int const sec (stoi (s, 2, 0, 59, 0 ));
if (year != -1 &&
mon != -1 &&
day != -1 &&
hour != -1 &&
min != -1 &&
sec != -1)
{
// local time
return Time (year, mon, day, hour, min, sec, 0, true);
}
}
return Time (0);
}
std::string Utilities::publicKeyToString (PublicKey const& publicKey)
{
std::string s (PublicKey::sizeInBytes, ' ');
std::copy (publicKey.cbegin(), publicKey.cend(), s.begin());
return s;
}
PublicKey Utilities::stringToPublicKey (std::string const& s)
{
bassert (s.size() == PublicKey::sizeInBytes);
return PublicKey (&s.front());
}
//------------------------------------------------------------------------------
}

View File

@@ -46,7 +46,18 @@ public:
std::string const& line,
Journal journal = Journal());
private:
// helpers
static String itos (int i, int fieldSize = 0);
static int stoi (String& s, int fieldSize, int minValue, int maxValue, beast_wchar delimiter);
// conversion betwen Time and String
static String timeToString (Time const& t);
static Time stringToTime (String s);
// conversion between PublicKey and String
static std::string publicKeyToString (PublicKey const& publicKey);
static PublicKey stringToPublicKey (std::string const& s);
struct Helpers;
/** Parse a string into a Source::Info.

View File

@@ -24,8 +24,10 @@ namespace ripple
# include "impl/SourceFile.h"
# include "impl/SourceStrings.h"
# include "impl/SourceURL.h"
# include "impl/Store.h"
# include "impl/Utilities.h"
# include "impl/SourceDesc.h"
# include "impl/Store.h"
# include "impl/StoreSqdb.h"
#include "impl/Logic.h"
#include "impl/Manager.cpp"
@@ -33,6 +35,7 @@ namespace ripple
#include "impl/SourceFile.cpp"
#include "impl/SourceStrings.cpp"
#include "impl/SourceURL.cpp"
#include "impl/StoreSqdb.cpp"
#include "impl/Tests.cpp"
#include "impl/Utilities.cpp"