Refactor NodeStore

This commit is contained in:
Vinnie Falco
2013-07-17 19:52:13 -07:00
parent 5caaea60c9
commit 8b1592e494
27 changed files with 869 additions and 508 deletions

View File

@@ -95,220 +95,519 @@ int NodeStore::Backend::getWriteLoad ()
// NodeStore
//
Array <NodeStore::BackendFactory*> NodeStore::s_factories;
NodeStore::NodeStore (String backendParameters,
String fastBackendParameters,
int cacheSize,
int cacheAge)
: m_backend (createBackend (backendParameters))
, m_fastBackend (fastBackendParameters.isNotEmpty () ? createBackend (fastBackendParameters)
: nullptr)
, m_cache ("NodeStore", cacheSize, cacheAge)
, m_negativeCache ("NoteStoreNegativeCache", 0, 120)
class NodeStoreImp : public NodeStore
{
}
void NodeStore::addBackendFactory (BackendFactory& factory)
{
s_factories.add (&factory);
}
float NodeStore::getCacheHitRate ()
{
return m_cache.getHitRate ();
}
void NodeStore::tune (int size, int age)
{
m_cache.setTargetSize (size);
m_cache.setTargetAge (age);
}
void NodeStore::sweep ()
{
m_cache.sweep ();
m_negativeCache.sweep ();
}
void NodeStore::waitWrite ()
{
m_backend->waitWrite ();
if (m_fastBackend)
m_fastBackend->waitWrite ();
}
int NodeStore::getWriteLoad ()
{
return m_backend->getWriteLoad ();
}
bool NodeStore::store (NodeObjectType type, uint32 index,
Blob const& data, uint256 const& hash)
{
bool wasStored = false;
bool const keyFoundAndObjectCached = m_cache.refreshIfPresent (hash);
// VFALCO NOTE What happens if the key is found, but the object
// fell out of the cache? We will end up passing it
// to the backend anyway.
//
if (! keyFoundAndObjectCached)
public:
/** Size of a key.
*/
enum
{
keyBytes = 32
};
// VFALCO TODO Rename this to RIPPLE_NODESTORE_VERIFY_HASHES and make
// it be 1 or 0 instead of merely defined or undefined.
//
#ifdef PARANOID
assert (hash == Serializer::getSHA512Half (data));
#endif
/** Parsed key/value blob into NodeObject components.
NodeObject::pointer object = boost::make_shared<NodeObject> (type, index, data, hash);
This will extract the information required to construct
a NodeObject. It also does consistency checking and returns
the result, so it is possible to determine if the data
is corrupted without throwing an exception. Note all forms
of corruption are detected so further analysis will be
needed to eliminate false positives.
// VFALCO NOTE What does it mean to canonicalize an object?
//
if (!m_cache.canonicalize (hash, object))
This is the format in which a NodeObject is stored in the
persistent storage layer.
*/
struct DecodedBlob
{
/** Construct the decoded blob from raw data.
The `success` member will indicate if the operation was succesful.
*/
DecodedBlob (void const* keyParam, void const* value, int valueBytes)
{
m_backend->store (object);
/* Data format:
if (m_fastBackend)
m_fastBackend->store (object);
Bytes
0...3 LedgerIndex 32-bit big endian integer
4...7 Unused? An unused copy of the LedgerIndex
8 char One of NodeObjectType
9...end The body of the object data
*/
success = false;
key = keyParam;
// VFALCO NOTE Ledger indexes should have started at 1
ledgerIndex = LedgerIndex (-1);
objectType = hotUNKNOWN;
objectData = nullptr;
dataBytes = bmax (0, valueBytes - 9);
if (dataBytes > 4)
{
LedgerIndex const* index = static_cast <LedgerIndex const*> (value);
ledgerIndex = ByteOrder::swapIfLittleEndian (*index);
}
// VFALCO NOTE What about bytes 4 through 7 inclusive?
if (dataBytes > 8)
{
unsigned char const* byte = static_cast <unsigned char const*> (value);
objectType = static_cast <NodeObjectType> (byte [8]);
}
if (dataBytes > 9)
{
objectData = static_cast <unsigned char const*> (value) + 9;
switch (objectType)
{
case hotUNKNOWN:
default:
break;
case hotLEDGER:
case hotTRANSACTION:
case hotACCOUNT_NODE:
case hotTRANSACTION_NODE:
success = true;
break;
}
}
}
m_negativeCache.del (hash);
wasStored = true;
}
return wasStored;
}
NodeObject::pointer NodeStore::retrieve (uint256 const& hash)
{
NodeObject::pointer obj = m_cache.fetch (hash);
if (obj || m_negativeCache.isPresent (hash))
return obj;
if (m_fastBackend)
{
obj = retrieve (m_fastBackend, hash);
if (obj)
/** Create a NodeObject from this data.
*/
NodeObject::pointer createObject ()
{
m_cache.canonicalize (hash, obj);
return obj;
NodeObject::pointer object;
if (success)
{
// VFALCO NOTE I dislke these shared pointers from boost
object = boost::make_shared <NodeObject> (
objectType, ledgerIndex, objectData, dataBytes, uint256 (key));
}
return object;
}
bool success;
void const* key;
LedgerIndex ledgerIndex;
NodeObjectType objectType;
unsigned char const* objectData;
int dataBytes;
};
//--------------------------------------------------------------------------
class EncodedBlob
{
HeapBlock <char> data;
};
public:
NodeStoreImp (String backendParameters,
String fastBackendParameters,
int cacheSize,
int cacheAge)
: m_backend (createBackend (backendParameters))
, m_fastBackend (fastBackendParameters.isNotEmpty () ? createBackend (fastBackendParameters)
: nullptr)
, m_cache ("NodeStore", cacheSize, cacheAge)
, m_negativeCache ("NoteStoreNegativeCache", 0, 120)
{
}
~NodeStoreImp ()
{
// m_hooks->onRetrieveBegin ()
// VFALCO TODO Why is this an autoptr? Why can't it just be a plain old object?
// VFALCO NOTE This shouldn't be necessary, the backend can
// just handle it in the destructor.
//
LoadEvent::autoptr event (getApp().getJobQueue ().getLoadEventAP (jtHO_READ, "HOS::retrieve"));
m_backend->waitWrite ();
obj = retrieve (m_backend, hash);
if (m_fastBackend)
m_fastBackend->waitWrite ();
}
float getCacheHitRate ()
{
return m_cache.getHitRate ();
}
void tune (int size, int age)
{
m_cache.setTargetSize (size);
m_cache.setTargetAge (age);
}
void sweep ()
{
m_cache.sweep ();
m_negativeCache.sweep ();
}
int getWriteLoad ()
{
return m_backend->getWriteLoad ();
}
bool store (NodeObjectType type,
uint32 index,
Blob const& data,
uint256 const& hash)
{
bool wasStored = false;
bool const keyFoundAndObjectCached = m_cache.refreshIfPresent (hash);
// VFALCO NOTE What happens if the key is found, but the object
// fell out of the cache? We will end up passing it
// to the backend anyway.
//
if (! keyFoundAndObjectCached)
{
// VFALCO TODO Rename this to RIPPLE_NODESTORE_VERIFY_HASHES and make
// it be 1 or 0 instead of merely defined or undefined.
//
#ifdef PARANOID
assert (hash == Serializer::getSHA512Half (data));
#endif
NodeObject::pointer object = boost::make_shared <NodeObject> (type, index, data, hash);
// VFALCO NOTE What does it mean to canonicalize an object?
//
if (!m_cache.canonicalize (hash, object))
{
m_backend->store (object);
if (m_fastBackend)
m_fastBackend->store (object);
}
m_negativeCache.del (hash);
wasStored = true;
}
return wasStored;
}
//------------------------------------------------------------------------------
NodeObject::pointer retrieve (uint256 const& hash)
{
// See if the object already exists in the cache
//
NodeObject::pointer obj = m_cache.fetch (hash);
if (obj == nullptr)
{
m_negativeCache.add (hash);
// It's not in the cache, see if we can skip checking the db.
//
if (! m_negativeCache.isPresent (hash))
{
// There's still a chance it could be in one of the databases.
// VFALCO TODO Eliminate return from middle of function
bool foundInFastBackend = false;
return obj; // VFALCO NOTE This is nullptr, why return obj?
// Check the fast backend database if we have one
//
if (m_fastBackend != nullptr)
{
obj = retrieveInternal (m_fastBackend, hash);
// If we found the object, avoid storing it again later.
if (obj != nullptr)
foundInFastBackend = true;
}
// Are we still without an object?
//
if (obj == nullptr)
{
// Yes so at last we will try the main database.
//
{
// Monitor this operation's load since it is expensive.
// m_hooks->onRetrieveBegin ()
// VFALCO TODO Why is this an autoptr? Why can't it just be a plain old object?
//
LoadEvent::autoptr event (getApp().getJobQueue ().getLoadEventAP (jtHO_READ, "HOS::retrieve"));
obj = retrieveInternal (m_backend, hash);
// m_hooks->onRetrieveEnd ()
}
// If it's not in the main database, remember that so we
// can skip the lookup for the same object again later.
//
if (obj == nullptr)
m_negativeCache.add (hash);
}
// Did we finally get something?
//
if (obj != nullptr)
{
// Yes it so canonicalize. This solves the problem where
// more than one thread has its own copy of the same object.
//
m_cache.canonicalize (hash, obj);
if (! foundInFastBackend)
{
// If we have a fast back end, store it there for later.
//
if (m_fastBackend != nullptr)
m_fastBackend->store (obj);
// Since this was a 'hard' fetch, we will log it.
//
WriteLog (lsTRACE, NodeObject) << "HOS: " << hash << " fetch: in db";
}
}
}
else
{
// hash is known not to be in the database
}
}
else
{
// found it!
}
return obj;
}
// VFALCO NOTE What does this do?
m_cache.canonicalize (hash, obj);
NodeObject::pointer retrieveInternal (Backend* backend, uint256 const& hash)
{
// VFALCO TODO Make this not allocate and free on each call
//
struct MyGetCallback : Backend::GetCallback
{
void* getStorageForValue (size_t sizeInBytes)
{
bytes = sizeInBytes;
data.malloc (sizeInBytes);
if (m_fastBackend)
m_fastBackend->store(obj);
return &data [0];
}
WriteLog (lsTRACE, NodeObject) << "HOS: " << hash << " fetch: in db";
size_t bytes;
HeapBlock <char> data;
};
return obj;
NodeObject::pointer object;
MyGetCallback cb;
Backend::Status const status = backend->get (hash.begin (), &cb);
if (status == Backend::ok)
{
// Deserialize the payload into its components.
//
DecodedBlob decoded (hash.begin (), cb.data.getData (), cb.bytes);
if (decoded.success)
{
object = decoded.createObject ();
}
else
{
// Houston, we've had a problem. Data is likely corrupt.
// VFALCO TODO Deal with encountering corrupt data!
WriteLog (lsFATAL, NodeObject) << "Corrupt NodeObject #" << hash;
}
}
return object;
}
//------------------------------------------------------------------------------
void importVisitor (
std::vector <NodeObject::pointer>& objects,
NodeObject::pointer object)
{
if (objects.size() >= bulkWriteBatchSize)
{
m_backend->bulkStore (objects);
objects.clear ();
objects.reserve (bulkWriteBatchSize);
}
objects.push_back (object);
}
int import (String sourceBackendParameters)
{
ScopedPointer <NodeStore::Backend> srcBackend (createBackend (sourceBackendParameters));
WriteLog (lsWARNING, NodeObject) <<
"Node import from '" << srcBackend->getDataBaseName() << "' to '"
<< m_backend->getDataBaseName() << "'.";
std::vector <NodeObject::pointer> objects;
objects.reserve (bulkWriteBatchSize);
srcBackend->visitAll (BIND_TYPE (&NodeStoreImp::importVisitor, this, boost::ref (objects), P_1));
if (!objects.empty ())
m_backend->bulkStore (objects);
return 0;
}
NodeStore::Backend* createBackend (String const& parameters)
{
Backend* backend = nullptr;
StringPairArray keyValues = parseKeyValueParameters (parameters, '|');
String const& type = keyValues ["type"];
if (type.isNotEmpty ())
{
BackendFactory* factory = nullptr;
for (int i = 0; i < s_factories.size (); ++i)
{
if (s_factories [i]->getName () == type)
{
factory = s_factories [i];
break;
}
}
if (factory != nullptr)
{
backend = factory->createInstance (keyBytes, keyValues);
}
else
{
throw std::runtime_error ("unkown backend type");
}
}
else
{
throw std::runtime_error ("missing backend type");
}
return backend;
}
static void addBackendFactory (BackendFactory& factory)
{
s_factories.add (&factory);
}
private:
static Array <NodeStore::BackendFactory*> s_factories;
RecycledObjectPool <EncodedBlob> m_blobPool;
// Persistent key/value storage.
ScopedPointer <Backend> m_backend;
// Larger key/value storage, but not necessarily persistent.
ScopedPointer <Backend> m_fastBackend;
// VFALCO NOTE What are these things for? We need comments.
TaggedCache <uint256, NodeObject, UptimeTimerAdapter> m_cache;
KeyCache <uint256, UptimeTimerAdapter> m_negativeCache;
};
Array <NodeStore::BackendFactory*> NodeStoreImp::s_factories;
//------------------------------------------------------------------------------
void NodeStore::addBackendFactory (BackendFactory& factory)
{
NodeStoreImp::addBackendFactory (factory);
}
NodeStore* NodeStore::New (String backendParameters,
String fastBackendParameters,
int cacheSize,
int cacheAge)
{
return new NodeStoreImp (backendParameters,
fastBackendParameters,
cacheSize,
cacheAge);
}
//------------------------------------------------------------------------------
NodeObject::pointer NodeStore::retrieve (Backend* backend, uint256 const& hash)
class NodeStoreTests : public UnitTest
{
return backend->retrieve (hash);
}
void NodeStore::importVisitor (
std::vector <NodeObject::pointer>& objects,
NodeObject::pointer object)
{
if (objects.size() >= bulkWriteBatchSize)
public:
enum
{
m_backend->bulkStore (objects);
maxPayloadBytes = 1000,
objects.clear ();
objects.reserve (bulkWriteBatchSize);
numObjects = 1000
};
NodeStoreTests () : UnitTest ("NodeStore")
{
}
objects.push_back (object);
}
int NodeStore::import (String sourceBackendParameters)
{
ScopedPointer <NodeStore::Backend> srcBackend (createBackend (sourceBackendParameters));
WriteLog (lsWARNING, NodeObject) <<
"Node import from '" << srcBackend->getDataBaseName() << "' to '"
<< m_backend->getDataBaseName() << "'.";
std::vector <NodeObject::pointer> objects;
objects.reserve (bulkWriteBatchSize);
srcBackend->visitAll (BIND_TYPE (&NodeStore::importVisitor, this, boost::ref (objects), P_1));
if (!objects.empty ())
m_backend->bulkStore (objects);
return 0;
}
NodeStore::Backend* NodeStore::createBackend (String const& parameters)
{
Backend* backend = nullptr;
StringPairArray keyValues = parseKeyValueParameters (parameters, '|');
String const& type = keyValues ["type"];
if (type.isNotEmpty ())
// Create a pseudo-random object
static NodeObject* createNodeObject (int index, int64 seedValue, HeapBlock <char>& payloadBuffer)
{
BackendFactory* factory = nullptr;
Random r (seedValue + index);
for (int i = 0; i < s_factories.size (); ++i)
NodeObjectType type;
switch (r.nextInt (4))
{
if (s_factories [i]->getName () == type)
{
factory = s_factories [i];
break;
}
}
case 0: type = hotLEDGER; break;
case 1: type = hotTRANSACTION; break;
case 2: type = hotACCOUNT_NODE; break;
case 3: type = hotTRANSACTION_NODE; break;
default:
type = hotUNKNOWN;
break;
};
if (factory != nullptr)
{
backend = factory->createInstance (keyValues);
}
else
{
throw std::runtime_error ("unkown backend type");
}
}
else
{
throw std::runtime_error ("missing backend type");
LedgerIndex ledgerIndex = 1 + r.nextInt (1024 * 1024);
uint256 hash;
r.nextBlob (hash.begin (), hash.size ());
int payloadBytes = 1 + r.nextInt (maxPayloadBytes);
r.nextBlob (payloadBuffer.getData (), payloadBytes);
return new NodeObject (type, ledgerIndex, payloadBuffer.getData (), payloadBytes, hash);
}
return backend;
}
void runTest ()
{
beginTest ("create");
int64 const seedValue = 50;
HeapBlock <char> payloadBuffer (maxPayloadBytes);
for (int i = 0; i < numObjects; ++i)
{
ScopedPointer <NodeObject> object (createNodeObject (i, seedValue, payloadBuffer));
}
}
};
static NodeStoreTests nodeStoreTests;