Refactor NodeStore

This commit is contained in:
Vinnie Falco
2013-07-20 07:17:47 -07:00
parent 1975d81153
commit db26c37156
37 changed files with 2075 additions and 1018 deletions

View File

@@ -4,24 +4,38 @@
*/
//==============================================================================
class LevelDBBackendFactory::Backend : public NodeStore::Backend
class LevelDBBackendFactory::Backend
: public NodeStore::Backend
, public NodeStore::BatchWriter::Callback
, LeakChecked <LevelDBBackendFactory::Backend>
{
public:
Backend (int keyBytes, StringPairArray const& keyValues)
typedef RecycledObjectPool <std::string> StringPool;
//--------------------------------------------------------------------------
Backend (int keyBytes,
StringPairArray const& keyValues,
NodeStore::Scheduler& scheduler)
: m_keyBytes (keyBytes)
, m_name(keyValues ["path"].toStdString ())
, m_db(NULL)
, m_scheduler (scheduler)
, m_batch (*this, scheduler)
, m_name (keyValues ["path"].toStdString ())
{
if (m_name.empty())
throw std::runtime_error ("Missing path in LevelDB backend");
Throw (std::runtime_error ("Missing path in LevelDB backend"));
leveldb::Options options;
options.create_if_missing = true;
if (keyValues["cache_mb"].isEmpty())
{
options.block_cache = leveldb::NewLRUCache (theConfig.getSize (siHashNodeDBCache) * 1024 * 1024);
}
else
{
options.block_cache = leveldb::NewLRUCache (keyValues["cache_mb"].getIntValue() * 1024L * 1024L);
}
if (keyValues["filter_bits"].isEmpty())
{
@@ -29,39 +43,38 @@ public:
options.filter_policy = leveldb::NewBloomFilterPolicy (10);
}
else if (keyValues["filter_bits"].getIntValue() != 0)
{
options.filter_policy = leveldb::NewBloomFilterPolicy (keyValues["filter_bits"].getIntValue());
}
if (!keyValues["open_files"].isEmpty())
if (! keyValues["open_files"].isEmpty())
{
options.max_open_files = keyValues["open_files"].getIntValue();
}
leveldb::Status status = leveldb::DB::Open (options, m_name, &m_db);
if (!status.ok () || !m_db)
throw (std::runtime_error (std::string("Unable to open/create leveldb: ") + status.ToString()));
leveldb::DB* db = nullptr;
leveldb::Status status = leveldb::DB::Open (options, m_name, &db);
if (!status.ok () || !db)
Throw (std::runtime_error (std::string("Unable to open/create leveldb: ") + status.ToString()));
m_db = db;
}
~Backend ()
{
delete m_db;
}
std::string getDataBaseName()
std::string getName()
{
return m_name;
}
//--------------------------------------------------------------------------
struct StdString
Status fetch (void const* key, NodeObject::Ptr* pObject)
{
std::string blob;
};
pObject->reset ();
typedef RecycledObjectPool <StdString> StdStringPool;
//--------------------------------------------------------------------------
Status get (void const* key, GetCallback* callback)
{
Status status (ok);
leveldb::ReadOptions const options;
@@ -71,22 +84,24 @@ public:
// These are reused std::string objects,
// required for leveldb's funky interface.
//
StdStringPool::ScopedItem item (m_stringPool);
std::string& blob = item.getObject ().blob;
StringPool::ScopedItem item (m_stringPool);
std::string& string = item.getObject ();
leveldb::Status getStatus = m_db->Get (options, slice, &blob);
leveldb::Status getStatus = m_db->Get (options, slice, &string);
if (getStatus.ok ())
{
void* const buffer = callback->getStorageForValue (blob.size ());
NodeStore::DecodedBlob decoded (key, string.data (), string.size ());
if (buffer != nullptr)
if (decoded.wasOk ())
{
memcpy (buffer, blob.data (), blob.size ());
*pObject = decoded.createObject ();
}
else
{
Throw (std::bad_alloc ());
// Decoding failed, probably corrupted!
//
status = dataCorrupt;
}
}
else
@@ -109,83 +124,90 @@ public:
return status;
}
//--------------------------------------------------------------------------
bool bulkStore (const std::vector< NodeObject::pointer >& objs)
void store (NodeObject::ref object)
{
leveldb::WriteBatch batch;
BOOST_FOREACH (NodeObject::ref obj, objs)
{
Blob blob (toBlob (obj));
batch.Put (
leveldb::Slice (reinterpret_cast<char const*>(obj->getHash ().begin ()), m_keyBytes),
leveldb::Slice (reinterpret_cast<char const*>(&blob.front ()), blob.size ()));
}
return m_db->Write (leveldb::WriteOptions (), &batch).ok ();
m_batch.store (object);
}
NodeObject::pointer retrieve (uint256 const& hash)
void storeBatch (NodeStore::Batch const& batch)
{
std::string sData;
if (!m_db->Get (leveldb::ReadOptions (),
leveldb::Slice (reinterpret_cast<char const*>(hash.begin ()), m_keyBytes), &sData).ok ())
leveldb::WriteBatch wb;
{
return NodeObject::pointer();
NodeStore::EncodedBlob::Pool::ScopedItem item (m_blobPool);
BOOST_FOREACH (NodeObject::ref object, batch)
{
item.getObject ().prepare (object);
wb.Put (
leveldb::Slice (reinterpret_cast <char const*> (item.getObject ().getKey ()),
m_keyBytes),
leveldb::Slice (reinterpret_cast <char const*> (item.getObject ().getData ()),
item.getObject ().getSize ()));
}
}
return fromBinary(hash, &sData[0], sData.size ());
leveldb::WriteOptions const options;
m_db->Write (options, &wb).ok ();
}
void visitAll (FUNCTION_TYPE<void (NodeObject::pointer)> func)
void visitAll (VisitCallback& callback)
{
leveldb::Iterator* it = m_db->NewIterator (leveldb::ReadOptions ());
leveldb::ReadOptions const options;
ScopedPointer <leveldb::Iterator> it (m_db->NewIterator (options));
for (it->SeekToFirst (); it->Valid (); it->Next ())
{
if (it->key ().size () == m_keyBytes)
{
uint256 hash;
memcpy(hash.begin(), it->key ().data(), m_keyBytes);
func (fromBinary (hash, it->value ().data (), it->value ().size ()));
NodeStore::DecodedBlob decoded (it->key ().data (),
it->value ().data (),
it->value ().size ());
if (decoded.wasOk ())
{
NodeObject::Ptr object (decoded.createObject ());
callback.visitObject (object);
}
else
{
// Uh oh, corrupted data!
WriteLog (lsFATAL, NodeObject) << "Corrupt NodeObject #" << uint256 (it->key ().data ());
}
}
else
{
// VFALCO NOTE What does it mean to find an
// incorrectly sized key? Corruption?
WriteLog (lsFATAL, NodeObject) << "Bad key size = " << it->key ().size ();
}
}
}
Blob toBlob(NodeObject::ref obj)
int getWriteLoad ()
{
Blob rawData (9 + obj->getData ().size ());
unsigned char* bufPtr = &rawData.front();
*reinterpret_cast<uint32*> (bufPtr + 0) = ntohl (obj->getIndex ());
*reinterpret_cast<uint32*> (bufPtr + 4) = ntohl (obj->getIndex ());
* (bufPtr + 8) = static_cast<unsigned char> (obj->getType ());
memcpy (bufPtr + 9, &obj->getData ().front (), obj->getData ().size ());
return rawData;
return m_batch.getWriteLoad ();
}
NodeObject::pointer fromBinary(uint256 const& hash,
char const* data, int size)
//--------------------------------------------------------------------------
void writeBatch (NodeStore::Batch const& batch)
{
if (size < 9)
throw std::runtime_error ("undersized object");
uint32 index = htonl (*reinterpret_cast<const uint32*> (data));
int htype = data[8];
return boost::make_shared<NodeObject> (static_cast<NodeObjectType> (htype), index,
data + 9, size - 9, hash);
storeBatch (batch);
}
private:
size_t const m_keyBytes;
StdStringPool m_stringPool;
NodeStore::Scheduler& m_scheduler;
NodeStore::BatchWriter m_batch;
StringPool m_stringPool;
NodeStore::EncodedBlob::Pool m_blobPool;
std::string m_name;
leveldb::DB* m_db;
ScopedPointer <leveldb::DB> m_db;
};
//------------------------------------------------------------------------------
@@ -210,9 +232,12 @@ String LevelDBBackendFactory::getName () const
return "LevelDB";
}
NodeStore::Backend* LevelDBBackendFactory::createInstance (size_t keyBytes, StringPairArray const& keyValues)
NodeStore::Backend* LevelDBBackendFactory::createInstance (
size_t keyBytes,
StringPairArray const& keyValues,
NodeStore::Scheduler& scheduler)
{
return new LevelDBBackendFactory::Backend (keyBytes, keyValues);
return new LevelDBBackendFactory::Backend (keyBytes, keyValues, scheduler);
}
//------------------------------------------------------------------------------