Validators improvements

This commit is contained in:
Vinnie Falco
2013-09-28 18:03:44 -07:00
parent a6ec4f91b0
commit c82b1b1853
5 changed files with 114 additions and 57 deletions

View File

@@ -223,6 +223,11 @@ public:
struct State struct State
{ {
State ()
{
//sources.reserve (64);
}
MapType map; MapType map;
SourcesType sources; SourcesType sources;
}; };
@@ -266,20 +271,19 @@ public:
// //
void addStatic (Source* source) void addStatic (Source* source)
{ {
m_journal.info << "Add static Source, " << source->name(); m_journal.info << "Addding static source '" << source->name() << "'";
ScopedPointer <Source> object (source); ScopedPointer <Source> object (source);
Source::Result result (object->fetch (m_journal)); Source::Result result (object->fetch (m_journal));
SharedState::Access state (m_state);
if (result.success) if (result.success)
{ {
merge (result.list, state); SharedState::Access state (m_state);
merge (result.list, source, state);
} }
else else
{ {
// VFALCO NOTE Maybe log the error and message? // TODO: Report the error
} }
} }
@@ -287,18 +291,23 @@ public:
// //
void add (Source* source) void add (Source* source)
{ {
m_journal.info << "Add Source, " << source->name(); m_journal.info << "Adding source '" << source->name() << "'";
{
SharedState::Access state (m_state); SharedState::Access state (m_state);
SourceDesc& desc (*state->sources.emplace_back ()); SourceDesc& desc (*state->sources.emplace_back ());
desc.source = source; desc.source = source;
m_store.insert (desc); m_store.insert (desc);
} }
}
// Add each entry in the list to the map, incrementing the // Add each entry in the list to the map, incrementing the
// reference count if it already exists, and updating fields. // reference count if it already exists, and updating fields.
// //
void merge (Array <Source::Info> const& list, SharedState::Access& state) void merge (Array <Source::Info> const& list,
Source* source, SharedState::Access& state)
{ {
std::size_t numAdded (0);
for (std::size_t i = 0; i < list.size (); ++i) for (std::size_t i = 0; i < list.size (); ++i)
{ {
Source::Info const& info (list.getReference (i)); Source::Info const& info (list.getReference (i));
@@ -309,16 +318,22 @@ public:
if (result.second) if (result.second)
{ {
// This is a new one // This is a new one
++numAdded;
dirtyChosen (); dirtyChosen ();
} }
} }
m_journal.info << "Added " << numAdded
<< " trusted validators from '" << source->name() << "'";
} }
// Decrement the reference count of each item in the list // Decrement the reference count of each item in the list
// in the map. // in the map.
// //
void remove (Array <Source::Info> const& list, SharedState::Access& state) void remove (Array <Source::Info> const& list,
Source* source, SharedState::Access& state)
{ {
std::size_t numRemoved (0);
for (std::size_t i = 0; i < list.size (); ++i) for (std::size_t i = 0; i < list.size (); ++i)
{ {
Source::Info const& info (list.getReference (i)); Source::Info const& info (list.getReference (i));
@@ -328,10 +343,14 @@ public:
if (--validatorInfo.refCount == 0) if (--validatorInfo.refCount == 0)
{ {
// Last reference removed // Last reference removed
++numRemoved;
state->map.erase (iter); state->map.erase (iter);
dirtyChosen (); dirtyChosen ();
} }
} }
m_journal.info << "Removed " << numRemoved
<< " trusted validators from '" << source->name() << "'";
} }
//---------------------------------------------------------------------- //----------------------------------------------------------------------
@@ -405,13 +424,13 @@ public:
SharedState::Access state (m_state); SharedState::Access state (m_state);
// Add the new source info to the map // Add the new source info to the map
merge (result.list, state); merge (result.list, desc.source, state);
// Swap lists // Swap lists
desc.result.swapWith (result); desc.result.swapWith (result);
// Remove the old source info from the map // Remove the old source info from the map
remove (result.list, state); remove (result.list, desc.source, state);
// See if we need to rebuild // See if we need to rebuild
checkChosen (); checkChosen ();
@@ -437,7 +456,7 @@ public:
void expire (SourceDesc& desc, SharedState::Access& state) void expire (SourceDesc& desc, SharedState::Access& state)
{ {
// Decrement reference count on each validator // Decrement reference count on each validator
remove (desc.result.list, state); remove (desc.result.list, desc.source, state);
m_store.update (desc); m_store.update (desc);
} }

View File

@@ -140,44 +140,6 @@ public:
stopThread (); stopThread ();
} }
//--------------------------------------------------------------------------
//
// Stoppable
//
void onPrepare (Journal journal)
{
journal.info << "Preparing";
addRPCHandlers();
}
void onStart (Journal journal)
{
journal.info << "Starting";
// Do this late so the sources have a chance to be added.
m_queue.dispatch (bind (&ManagerImp::setCheckSources, this));
startThread();
}
void onStop (Journal journal)
{
journal.info << "Stopping";
if (this->Thread::isThreadRunning())
{
m_journal.debug << "Signaling thread exit";
m_queue.dispatch (bind (&Thread::signalThreadShouldExit, this));
}
else
{
m_journal.debug << "Thread was never started";
stopped();
}
}
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
// //
// RPC::Service // RPC::Service
@@ -232,7 +194,8 @@ public:
void addFile (File const& file) void addFile (File const& file)
{ {
addStaticSource (SourceFile::New (file)); //addStaticSource (SourceFile::New (file));
addSource (SourceFile::New (file));
} }
void addURL (URL const& url) void addURL (URL const& url)
@@ -244,12 +207,20 @@ public:
void addSource (Source* source) void addSource (Source* source)
{ {
#if RIPPLE_USE_NEW_VALIDATORS
m_queue.dispatch (bind (&Logic::add, &m_logic, source)); m_queue.dispatch (bind (&Logic::add, &m_logic, source));
#else
delete source;
#endif
} }
void addStaticSource (Source* source) void addStaticSource (Source* source)
{ {
#if RIPPLE_USE_NEW_VALIDATORS
m_queue.dispatch (bind (&Logic::addStatic, &m_logic, source)); m_queue.dispatch (bind (&Logic::addStatic, &m_logic, source));
#else
delete source;
#endif
} }
// VFALCO NOTE we should just do this on the callers thread? // VFALCO NOTE we should just do this on the callers thread?
@@ -274,6 +245,47 @@ public:
#endif #endif
} }
//--------------------------------------------------------------------------
//
// Stoppable
//
void onPrepare (Journal journal)
{
#if RIPPLE_USE_NEW_VALIDATORS
journal.info << "Preparing";
addRPCHandlers();
#endif
}
void onStart (Journal journal)
{
#if RIPPLE_USE_NEW_VALIDATORS
journal.info << "Starting";
// Do this late so the sources have a chance to be added.
m_queue.dispatch (bind (&ManagerImp::setCheckSources, this));
startThread();
#endif
}
void onStop (Journal journal)
{
journal.info << "Stopping";
if (this->Thread::isThreadRunning())
{
m_journal.debug << "Signaling thread exit";
m_queue.dispatch (bind (&Thread::signalThreadShouldExit, this));
}
else
{
stopped();
}
}
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
void init () void init ()

View File

@@ -52,6 +52,33 @@ public:
{ {
Result result; Result result;
int64 const fileSize (m_file.getSize ());
if (fileSize != 0)
{
if (fileSize < std::numeric_limits<int32>::max())
{
MemoryBlock buffer (fileSize);
RandomAccessFile f;
RandomAccessFile::ByteCount amountRead;
f.open (m_file, RandomAccessFile::readOnly);
f.read (buffer.begin(), fileSize, &amountRead);
if (amountRead == fileSize)
{
}
}
else
{
// too big!
}
}
else
{
// file doesn't exist
}
return result; return result;
} }

View File

@@ -17,7 +17,6 @@
*/ */
//============================================================================== //==============================================================================
#ifndef RIPPLE_VALIDATORS_UTILITIES_H_INCLUDED #ifndef RIPPLE_VALIDATORS_UTILITIES_H_INCLUDED
#define RIPPLE_VALIDATORS_UTILITIES_H_INCLUDED #define RIPPLE_VALIDATORS_UTILITIES_H_INCLUDED