diff --git a/modules/ripple_app/node/ripple_NodeStore.cpp b/modules/ripple_app/node/ripple_NodeStore.cpp index 8e56630268..40a256a173 100644 --- a/modules/ripple_app/node/ripple_NodeStore.cpp +++ b/modules/ripple_app/node/ripple_NodeStore.cpp @@ -89,23 +89,6 @@ float NodeStore::getCacheHitRate () return mCache.getHitRate (); } -bool NodeStore::store (NodeObjectType type, uint32 index, Blob const& data, - uint256 const& hash) -{ - if (mLevelDB) - return storeLevelDB (type, index, data, hash); - - return storeSQLite (type, index, data, hash); -} - -NodeObject::pointer NodeStore::retrieve (uint256 const& hash) -{ - if (mLevelDB) - return retrieveLevelDB (hash); - - return retrieveSQLite (hash); -} - void NodeStore::tune (int size, int age) { mCache.setTargetSize (size); @@ -133,87 +116,10 @@ int NodeStore::getWriteLoad () return std::max (mWriteLoad, static_cast (mWriteSet.size ())); } -// low-level retrieve -NodeObject::pointer NodeStore::LLRetrieve (uint256 const& hash, leveldb::DB* db) -{ - std::string sData; - - leveldb::Status st = db->Get (leveldb::ReadOptions (), - leveldb::Slice (reinterpret_cast (hash.begin ()), hash.size ()), &sData); - - if (!st.ok ()) - { - assert (st.IsNotFound ()); - return NodeObject::pointer (); - } - - const unsigned char* bufPtr = reinterpret_cast (&sData[0]); - uint32 index = htonl (*reinterpret_cast (bufPtr)); - int htype = bufPtr[8]; - - return boost::make_shared (static_cast (htype), index, - bufPtr + 9, sData.size () - 9, hash); -} - -// low-level write single -void NodeStore::LLWrite (boost::shared_ptr ptr, leveldb::DB* db) -{ - NodeObject& obj = *ptr; - Blob rawData (9 + obj.getData ().size ()); - unsigned char* bufPtr = &rawData.front (); - - *reinterpret_cast (bufPtr + 0) = ntohl (obj.getIndex ()); - *reinterpret_cast (bufPtr + 4) = ntohl (obj.getIndex ()); - * (bufPtr + 8) = static_cast (obj.getType ()); - memcpy (bufPtr + 9, &obj.getData ().front (), obj.getData ().size ()); - - leveldb::Status st = db->Put (leveldb::WriteOptions (), - leveldb::Slice (reinterpret_cast (obj.getHash ().begin ()), obj.getHash ().size ()), - leveldb::Slice (reinterpret_cast (bufPtr), rawData.size ())); - - if (!st.ok ()) - { - WriteLog (lsFATAL, NodeObject) << "Failed to store hash node"; - assert (false); - } -} - -// low-level write set -void NodeStore::LLWrite (const std::vector< boost::shared_ptr >& set, leveldb::DB* db) -{ - leveldb::WriteBatch batch; - - BOOST_FOREACH (const boost::shared_ptr& it, set) - { - const NodeObject& obj = *it; - Blob rawData (9 + obj.getData ().size ()); - unsigned char* bufPtr = &rawData.front (); - - *reinterpret_cast (bufPtr + 0) = ntohl (obj.getIndex ()); - *reinterpret_cast (bufPtr + 4) = ntohl (obj.getIndex ()); - * (bufPtr + 8) = static_cast (obj.getType ()); - memcpy (bufPtr + 9, &obj.getData ().front (), obj.getData ().size ()); - - batch.Put (leveldb::Slice (reinterpret_cast (obj.getHash ().begin ()), obj.getHash ().size ()), - leveldb::Slice (reinterpret_cast (bufPtr), rawData.size ())); - } - - leveldb::Status st = db->Write (leveldb::WriteOptions (), &batch); - - if (!st.ok ()) - { - WriteLog (lsFATAL, NodeObject) << "Failed to store hash node"; - assert (false); - } -} - -bool NodeStore::storeLevelDB (NodeObjectType type, uint32 index, +bool NodeStore::store (NodeObjectType type, uint32 index, Blob const& data, uint256 const& hash) { // return: false = already in cache, true = added to cache - if (!getApp().getHashNodeLDB ()) - return true; - if (mCache.touch (hash)) return false; @@ -232,7 +138,7 @@ bool NodeStore::storeLevelDB (NodeObjectType type, uint32 index, { mWritePending = true; getApp().getJobQueue ().addJob (jtWRITE, "NodeObject::store", - BIND_TYPE (&NodeStore::bulkWriteLevelDB, this, P_1)); + BIND_TYPE (&NodeStore::bulkWrite, this, P_1)); } } @@ -240,7 +146,7 @@ bool NodeStore::storeLevelDB (NodeObjectType type, uint32 index, return true; } -void NodeStore::bulkWriteLevelDB (Job&) +void NodeStore::bulkWrite (Job&) { assert (mLevelDB); int setSize = 0; @@ -269,23 +175,23 @@ void NodeStore::bulkWriteLevelDB (Job&) setSize = set.size (); } - LLWrite (set, getApp().getHashNodeLDB ()); + m_backend->bulkStore (set); - if (mEphemeralDB) - LLWrite (set, getApp().getEphemeralLDB ()); + if (m_backendFast) + m_backendFast->bulkStore (set); } } -NodeObject::pointer NodeStore::retrieveLevelDB (uint256 const& hash) +NodeObject::pointer NodeStore::retrieve (uint256 const& hash) { NodeObject::pointer obj = mCache.fetch (hash); if (obj || mNegativeCache.isPresent (hash) || !getApp().getHashNodeLDB ()) return obj; - if (mEphemeralDB) + if (m_backendFast) { - obj = LLRetrieve (hash, getApp().getEphemeralLDB ()); + obj = m_backendFast->retrieve (hash); if (obj) { @@ -296,7 +202,7 @@ NodeObject::pointer NodeStore::retrieveLevelDB (uint256 const& hash) { LoadEvent::autoptr event (getApp().getJobQueue ().getLoadEventAP (jtHO_READ, "HOS::retrieve")); - obj = LLRetrieve (hash, getApp().getHashNodeLDB ()); + obj = m_backend->retrieve(hash); if (!obj) { @@ -307,401 +213,9 @@ NodeObject::pointer NodeStore::retrieveLevelDB (uint256 const& hash) mCache.canonicalize (hash, obj); - if (mEphemeralDB) - LLWrite (obj, getApp().getEphemeralLDB ()); + if (m_backendFast) + m_backendFast->store(obj); WriteLog (lsTRACE, NodeObject) << "HOS: " << hash << " fetch: in db"; return obj; } - -bool NodeStore::storeSQLite (NodeObjectType type, uint32 index, - Blob const& data, uint256 const& hash) -{ - // return: false = already in cache, true = added to cache - if (!getApp().getHashNodeDB ()) - { - WriteLog (lsTRACE, NodeObject) << "HOS: no db"; - return true; - } - - if (mCache.touch (hash)) - { - WriteLog (lsTRACE, NodeObject) << "HOS: " << hash << " store: incache"; - return false; - } - - assert (hash == Serializer::getSHA512Half (data)); - - NodeObject::pointer object = boost::make_shared (type, index, data, hash); - - if (!mCache.canonicalize (hash, object)) - { - // WriteLog (lsTRACE, NodeObject) << "Queuing write for " << hash; - boost::mutex::scoped_lock sl (mWriteMutex); - mWriteSet.push_back (object); - - if (!mWritePending) - { - mWritePending = true; - getApp().getJobQueue ().addJob (jtWRITE, "NodeObject::store", - BIND_TYPE (&NodeStore::bulkWriteSQLite, this, P_1)); - } - } - - // else - // WriteLog (lsTRACE, NodeObject) << "HOS: already had " << hash; - mNegativeCache.del (hash); - - return true; -} - -void NodeStore::bulkWriteSQLite (Job&) -{ - assert (!mLevelDB); - int setSize = 0; - - while (1) - { - std::vector< boost::shared_ptr > set; - set.reserve (128); - - { - boost::mutex::scoped_lock sl (mWriteMutex); - mWriteSet.swap (set); - assert (mWriteSet.empty ()); - ++mWriteGeneration; - mWriteCondition.notify_all (); - - if (set.empty ()) - { - mWritePending = false; - mWriteLoad = 0; - return; - } - - mWriteLoad = std::max (setSize, static_cast (mWriteSet.size ())); - setSize = set.size (); - } - // WriteLog (lsTRACE, NodeObject) << "HOS: writing " << set.size(); - -#ifndef NO_SQLITE3_PREPARE - - if (mEphemeralDB) - LLWrite (set, getApp().getEphemeralLDB ()); - - { - Database* db = getApp().getHashNodeDB ()->getDB (); - - - // VFALCO TODO Get rid of the last parameter "aux", which is set to !theConfig.RUN_STANDALONE - // - static SqliteStatement pStB (db->getSqliteDB (), "BEGIN TRANSACTION;", !theConfig.RUN_STANDALONE); - static SqliteStatement pStE (db->getSqliteDB (), "END TRANSACTION;", !theConfig.RUN_STANDALONE); - static SqliteStatement pSt (db->getSqliteDB (), - "INSERT OR IGNORE INTO CommittedObjects " - "(Hash,ObjType,LedgerIndex,Object) VALUES (?, ?, ?, ?);", !theConfig.RUN_STANDALONE); - - pStB.step (); - pStB.reset (); - - BOOST_FOREACH (const boost::shared_ptr& it, set) - { - const char* type; - - switch (it->getType ()) - { - case hotLEDGER: - type = "L"; - break; - - case hotTRANSACTION: - type = "T"; - break; - - case hotACCOUNT_NODE: - type = "A"; - break; - - case hotTRANSACTION_NODE: - type = "N"; - break; - - default: - type = "U"; - } - - pSt.bind (1, it->getHash ().GetHex ()); - pSt.bind (2, type); - pSt.bind (3, it->getIndex ()); - pSt.bindStatic (4, it->getData ()); - int ret = pSt.step (); - - if (!pSt.isDone (ret)) - { - WriteLog (lsFATAL, NodeObject) << "Error saving hashed object " << ret; - assert (false); - } - - pSt.reset (); - } - - pStE.step (); - pStE.reset (); - } - -#else - - static boost::format - fAdd ("INSERT OR IGNORE INTO CommittedObjects " - "(Hash,ObjType,LedgerIndex,Object) VALUES ('%s','%c','%u',%s);"); - - Database* db = getApp().getHashNodeDB ()->getDB (); - { - ScopedLock sl (getApp().getHashNodeDB ()->getDBLock ()); - - db->executeSQL ("BEGIN TRANSACTION;"); - - BOOST_FOREACH (const boost::shared_ptr& it, set) - { - char type; - - switch (it->getType ()) - { - case hotLEDGER: - type = 'L'; - break; - - case hotTRANSACTION: - type = 'T'; - break; - - case hotACCOUNT_NODE: - type = 'A'; - break; - - case hotTRANSACTION_NODE: - type = 'N'; - break; - - default: - type = 'U'; - } - - db->executeSQL (boost::str (boost::format (fAdd) - % it->getHash ().GetHex () % type % it->getIndex () % sqlEscape (it->getData ()))); - } - - db->executeSQL ("END TRANSACTION;"); - } -#endif - - } -} - -NodeObject::pointer NodeStore::retrieveSQLite (uint256 const& hash) -{ - NodeObject::pointer obj = mCache.fetch (hash); - - if (obj) - return obj; - - if (mNegativeCache.isPresent (hash)) - return obj; - - if (mEphemeralDB) - { - obj = LLRetrieve (hash, getApp().getEphemeralLDB ()); - - if (obj) - { - mCache.canonicalize (hash, obj); - return obj; - } - } - - if (!getApp().getHashNodeDB ()) - return obj; - - Blob data; - std::string type; - uint32 index; - -#ifndef NO_SQLITE3_PREPARE - { - ScopedLock sl (getApp().getHashNodeDB ()->getDBLock ()); - static SqliteStatement pSt (getApp().getHashNodeDB ()->getDB ()->getSqliteDB (), - "SELECT ObjType,LedgerIndex,Object FROM CommittedObjects WHERE Hash = ?;"); - LoadEvent::autoptr event (getApp().getJobQueue ().getLoadEventAP (jtDISK, "HOS::retrieve")); - - pSt.bind (1, hash.GetHex ()); - int ret = pSt.step (); - - if (pSt.isDone (ret)) - { - pSt.reset (); - mNegativeCache.add (hash); - WriteLog (lsTRACE, NodeObject) << "HOS: " << hash << " fetch: not in db"; - return obj; - } - - type = pSt.peekString (0); - index = pSt.getUInt32 (1); - pSt.getBlob (2).swap (data); - pSt.reset (); - } - -#else - - std::string sql = "SELECT * FROM CommittedObjects WHERE Hash='"; - sql.append (hash.GetHex ()); - sql.append ("';"); - - - { - ScopedLock sl (getApp().getHashNodeDB ()->getDBLock ()); - Database* db = getApp().getHashNodeDB ()->getDB (); - - if (!db->executeSQL (sql) || !db->startIterRows ()) - { - sl.unlock (); - mNegativeCache.add (hash); - return obj; - } - - db->getStr ("ObjType", type); - index = db->getBigInt ("LedgerIndex"); - - int size = db->getBinary ("Object", NULL, 0); - data.resize (size); - db->getBinary ("Object", & (data.front ()), size); - db->endIterRows (); - } -#endif - -#ifdef PARANOID - assert (Serializer::getSHA512Half (data) == hash); -#endif - - NodeObjectType htype = hotUNKNOWN; - - switch (type[0]) - { - case 'L': - htype = hotLEDGER; - break; - - case 'T': - htype = hotTRANSACTION; - break; - - case 'A': - htype = hotACCOUNT_NODE; - break; - - case 'N': - htype = hotTRANSACTION_NODE; - break; - - default: - assert (false); - WriteLog (lsERROR, NodeObject) << "Invalid hashed object"; - mNegativeCache.add (hash); - return obj; - } - - obj = boost::make_shared (htype, index, data, hash); - mCache.canonicalize (hash, obj); - - if (mEphemeralDB) - LLWrite (obj, getApp().getEphemeralLDB ()); - - WriteLog (lsTRACE, NodeObject) << "HOS: " << hash << " fetch: in db"; - return obj; -} - -int NodeStore::import (const std::string& file) -{ - WriteLog (lsWARNING, NodeObject) << "Hashed object import from \"" << file << "\"."; - UPTR_T importDB (new SqliteDatabase (file.c_str ())); - importDB->connect (); - - leveldb::DB* db = getApp().getHashNodeLDB (); - leveldb::WriteOptions wo; - - int count = 0; - - SQL_FOREACH (importDB, "SELECT * FROM CommittedObjects;") - { - uint256 hash; - std::string hashStr; - importDB->getStr ("Hash", hashStr); - hash.SetHexExact (hashStr); - - if (hash.isZero ()) - { - WriteLog (lsWARNING, NodeObject) << "zero hash found in import table"; - } - else - { - Blob rawData; - int size = importDB->getBinary ("Object", NULL, 0); - rawData.resize (9 + size); - unsigned char* bufPtr = &rawData.front (); - - importDB->getBinary ("Object", bufPtr + 9, size); - - uint32 index = importDB->getBigInt ("LedgerIndex"); - *reinterpret_cast (bufPtr + 0) = ntohl (index); - *reinterpret_cast (bufPtr + 4) = ntohl (index); - - std::string type; - importDB->getStr ("ObjType", type); - NodeObjectType htype = hotUNKNOWN; - - switch (type[0]) - { - case 'L': - htype = hotLEDGER; - break; - - case 'T': - htype = hotTRANSACTION; - break; - - case 'A': - htype = hotACCOUNT_NODE; - break; - - case 'N': - htype = hotTRANSACTION_NODE; - break; - - default: - assert (false); - WriteLog (lsERROR, NodeObject) << "Invalid hashed object"; - } - - * (bufPtr + 8) = static_cast (htype); - - leveldb::Status st = db->Put (wo, - leveldb::Slice (reinterpret_cast (hash.begin ()), hash.size ()), - leveldb::Slice (reinterpret_cast (bufPtr), rawData.size ())); - - if (!st.ok ()) - { - WriteLog (lsFATAL, NodeObject) << "Failed to store hash node"; - assert (false); - } - - ++count; - } - - if ((count % 10000) == 0) - { - WriteLog (lsINFO, NodeObject) << "Import in progress: " << count; - } - } - - WriteLog (lsWARNING, NodeObject) << "Imported " << count << " nodes"; - return count; -} diff --git a/modules/ripple_app/node/ripple_NodeStore.h b/modules/ripple_app/node/ripple_NodeStore.h index 232db52630..d9d9bc2632 100644 --- a/modules/ripple_app/node/ripple_NodeStore.h +++ b/modules/ripple_app/node/ripple_NodeStore.h @@ -86,17 +86,7 @@ public: NodeObject::pointer retrieve (uint256 const& hash); - bool storeSQLite (NodeObjectType type, uint32 index, Blob const& data, - uint256 const& hash); - NodeObject::pointer retrieveSQLite (uint256 const& hash); - void bulkWriteSQLite (Job&); - - bool storeLevelDB (NodeObjectType type, uint32 index, Blob const& data, - uint256 const& hash); - NodeObject::pointer retrieveLevelDB (uint256 const& hash); - void bulkWriteLevelDB (Job&); - - + void bulkWrite (Job&); void waitWrite (); void tune (int size, int age); void sweep (); @@ -114,6 +104,7 @@ private: private: ScopedPointer m_backend; + ScopedPointer m_backendFast; TaggedCache mCache; KeyCache mNegativeCache;