Slight refactor of the boundary between the node db and its backend(s).

This commit is contained in:
JoelKatz
2013-07-13 15:15:09 -07:00
parent 14374bdb3a
commit 2ce293ec85
5 changed files with 87 additions and 115 deletions

View File

@@ -33,14 +33,6 @@ public:
return mName;
}
bool store (NodeObject::ref obj)
{
Blob blob (toBlob (obj));
return mDB->Put (leveldb::WriteOptions (),
leveldb::Slice (reinterpret_cast<char const*>(obj->getHash ().begin ()), 256 / 8),
leveldb::Slice (reinterpret_cast<char const*>(&blob.front ()), blob.size ())).ok ();
}
bool bulkStore (const std::vector< NodeObject::pointer >& objs)
{
leveldb::WriteBatch batch;

View File

@@ -61,35 +61,6 @@ public:
return m_name;
}
bool store (NodeObject::ref obj)
{
MDB_txn *txn = nullptr;
int rc = 0;
rc = mdb_txn_begin(m_env, NULL, 0, &txn);
if (rc == 0)
{
MDB_val key, data;
Blob blob (toBlob (obj));
key.mv_size = (256 / 8);
key.mv_data = const_cast<unsigned char *>(obj->getHash().begin());
data.mv_size = blob.size();
data.mv_data = &blob.front();
rc = mdb_put(txn, m_dbi, &key, &data, 0);
}
if (rc == 0)
rc = mdb_txn_commit(txn);
else if (txn)
mdb_txn_abort(txn);
return rc == 0;
}
bool bulkStore (std::vector <NodeObject::pointer> const& objs)
{
MDB_txn *txn = nullptr;

View File

@@ -10,14 +10,9 @@ NodeStore::NodeStore (String backendParameters, String fastBackendParameters, in
: m_backend (createBackend (backendParameters))
, mCache ("NodeStore", cacheSize, cacheAge)
, mNegativeCache ("HashedObjectNegativeCache", 0, 120)
, mWriteGeneration (0)
, mWriteLoad (0)
, mWritePending (false)
{
if (fastBackendParameters.isNotEmpty ())
m_fastBackend = createBackend (fastBackendParameters);
mWriteSet.reserve (128);
}
void NodeStore::addBackendFactory (BackendFactory& factory)
@@ -44,17 +39,14 @@ void NodeStore::sweep ()
void NodeStore::waitWrite ()
{
boost::mutex::scoped_lock sl (mWriteMutex);
int gen = mWriteGeneration;
while (mWritePending && (mWriteGeneration == gen))
mWriteCondition.wait (sl);
m_backend->waitWrite ();
if (m_fastBackend)
m_fastBackend->waitWrite ();
}
int NodeStore::getWriteLoad ()
{
boost::mutex::scoped_lock sl (mWriteMutex);
return std::max (mWriteLoad, static_cast<int> (mWriteSet.size ()));
return m_backend->getWriteLoad ();
}
bool NodeStore::store (NodeObjectType type, uint32 index,
@@ -72,56 +64,15 @@ bool NodeStore::store (NodeObjectType type, uint32 index,
if (!mCache.canonicalize (hash, object))
{
boost::mutex::scoped_lock sl (mWriteMutex);
mWriteSet.push_back (object);
if (!mWritePending)
{
mWritePending = true;
getApp().getJobQueue ().addJob (jtWRITE, "NodeObject::store",
BIND_TYPE (&NodeStore::bulkWrite, this, P_1));
}
m_backend->store (object);
if (m_fastBackend)
m_fastBackend->store (object);
}
mNegativeCache.del (hash);
return true;
}
void NodeStore::bulkWrite (Job&)
{
int setSize = 0;
while (1)
{
std::vector< boost::shared_ptr<NodeObject> > set;
set.reserve (128);
{
boost::mutex::scoped_lock sl (mWriteMutex);
mWriteSet.swap (set);
assert (mWriteSet.empty ());
++mWriteGeneration;
mWriteCondition.notify_all ();
if (set.empty ())
{
mWritePending = false;
mWriteLoad = 0;
return;
}
mWriteLoad = std::max (setSize, static_cast<int> (mWriteSet.size ()));
setSize = set.size ();
}
m_backend->bulkStore (set);
if (m_fastBackend)
m_fastBackend->bulkStore (set);
}
}
NodeObject::pointer NodeStore::retrieve (uint256 const& hash)
{
NodeObject::pointer obj = mCache.fetch (hash);
@@ -232,3 +183,65 @@ NodeStore::Backend* NodeStore::createBackend (String const& parameters)
return backend;
}
bool NodeStore::Backend::store (NodeObject::ref object)
{
boost::mutex::scoped_lock sl (mWriteMutex);
mWriteSet.push_back (object);
if (!mWritePending)
{
mWritePending = true;
getApp().getJobQueue ().addJob (jtWRITE, "NodeObject::store",
BIND_TYPE (&NodeStore::Backend::bulkWrite, this, P_1));
}
return true;
}
void NodeStore::Backend::bulkWrite (Job &)
{
int setSize = 0;
while (1)
{
std::vector< boost::shared_ptr<NodeObject> > set;
set.reserve (128);
{
boost::mutex::scoped_lock sl (mWriteMutex);
mWriteSet.swap (set);
assert (mWriteSet.empty ());
++mWriteGeneration;
mWriteCondition.notify_all ();
if (set.empty ())
{
mWritePending = false;
mWriteLoad = 0;
return;
}
mWriteLoad = std::max (setSize, static_cast<int> (mWriteSet.size ()));
setSize = set.size ();
}
bulkStore (set);
}
}
void NodeStore::Backend::waitWrite ()
{
boost::mutex::scoped_lock sl (mWriteMutex);
int gen = mWriteGeneration;
while (mWritePending && (mWriteGeneration == gen))
mWriteCondition.wait (sl);
}
int NodeStore::Backend::getWriteLoad ()
{
boost::mutex::scoped_lock sl (mWriteMutex);
return std::max (mWriteLoad, static_cast<int> (mWriteSet.size ()));
}

View File

@@ -23,13 +23,18 @@ public:
//
typedef boost::shared_ptr <Backend> pointer;
Backend () : mWriteGeneration(0), mWriteLoad(0), mWritePending(false)
{
mWriteSet.reserve(128);
}
virtual ~Backend () { }
virtual std::string getDataBaseName() = 0;
// Store/retrieve a single object
// These functions must be thread safe
virtual bool store (NodeObject::ref) = 0;
virtual bool store (NodeObject::ref);
virtual NodeObject::pointer retrieve (uint256 const &hash) = 0;
// Store a group of objects
@@ -39,6 +44,18 @@ public:
// Visit every object in the database
// This function will only be called during an import operation
virtual void visitAll (FUNCTION_TYPE <void (NodeObject::pointer)>) = 0;
virtual void bulkWrite (Job &);
virtual void waitWrite ();
virtual int getWriteLoad ();
protected:
boost::mutex mWriteMutex;
boost::condition_variable mWriteCondition;
int mWriteGeneration;
int mWriteLoad;
bool mWritePending;
std::vector <boost::shared_ptr<NodeObject> > mWriteSet;
};
public:
@@ -90,7 +107,6 @@ public:
NodeObject::pointer retrieve (uint256 const& hash);
void bulkWrite (Job&);
void waitWrite ();
void tune (int size, int age);
void sweep ();
@@ -111,14 +127,6 @@ private:
TaggedCache<uint256, NodeObject, UptimeTimerAdapter> mCache;
KeyCache <uint256, UptimeTimerAdapter> mNegativeCache;
boost::mutex mWriteMutex;
boost::condition_variable mWriteCondition;
int mWriteGeneration;
int mWriteLoad;
std::vector< boost::shared_ptr<NodeObject> > mWriteSet;
bool mWritePending;
};
#endif

View File

@@ -24,18 +24,6 @@ public:
return mName;
}
bool store(NodeObject::ref object)
{
ScopedLock sl(mDb->getDBLock());
static SqliteStatement pSt(mDb->getDB()->getSqliteDB(),
"INSERT OR IGNORE INTO CommittedObjects "
"(Hash,ObjType,LedgerIndex,Object) VALUES (?, ?, ?, ?);");
bind(pSt, object);
pSt.step();
pSt.reset();
return true;
}
bool bulkStore(const std::vector< NodeObject::pointer >& objects)
{
ScopedLock sl(mDb->getDBLock());