From 35ad8d4c6fd942ec57109965d44946566b4c7b83 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Thu, 25 Apr 2013 17:32:12 -0700 Subject: [PATCH] Add LevelDB support for hash nodes. We really need to cut over, the reduction in I/O and CPU use is dramatic. --- SConstruct | 6 +++ src/cpp/ripple/Application.cpp | 24 ++++++++-- src/cpp/ripple/Application.h | 19 +++++++- src/cpp/ripple/HashedObject.cpp | 83 +++++++++++++++++++++++++++++++++ src/cpp/ripple/RPCHandler.cpp | 4 ++ 5 files changed, 130 insertions(+), 6 deletions(-) diff --git a/SConstruct b/SConstruct index 8362a9383b..d9a28b2cd6 100644 --- a/SConstruct +++ b/SConstruct @@ -11,6 +11,7 @@ OSX = bool(platform.mac_ver()[0]) FreeBSD = bool('FreeBSD' == platform.system()) Linux = bool('Linux' == platform.system()) Ubuntu = bool(Linux and 'Ubuntu' == platform.linux_distribution()[0]) +LevelDB = bool(0) if OSX or Ubuntu: CTAGS = '/usr/bin/ctags' @@ -113,6 +114,11 @@ if OSX: env.Append(LINKFLAGS = ['-L/usr/local/opt/openssl/lib']) env.Append(CXXFLAGS = ['-I/usr/local/opt/openssl/include']) +if LevelDB: + env.Append(CXXFLAGS = [ '-Ileveldb/include', '-DUSE_LEVELDB']) + env.Append(LINKFLAGS = [ '-Lleveldb/lib' ]) + env.Append(LIBS = [ '-lleveldb']) + DB_SRCS = glob.glob('src/cpp/database/*.c') + glob.glob('src/cpp/database/*.cpp') JSON_SRCS = glob.glob('src/cpp/json/*.cpp') diff --git a/src/cpp/ripple/Application.cpp b/src/cpp/ripple/Application.cpp index fc8bb4ec52..7646136d27 100644 --- a/src/cpp/ripple/Application.cpp +++ b/src/cpp/ripple/Application.cpp @@ -54,7 +54,7 @@ Application::Application() : mFeeVote(10, 50 * SYSTEM_CURRENCY_PARTS, 12.5 * SYSTEM_CURRENCY_PARTS), mRpcDB(NULL), mTxnDB(NULL), mLedgerDB(NULL), mWalletDB(NULL), - mHashNodeDB(NULL), mNetNodeDB(NULL), mPathFindDB(NULL), + mNetNodeDB(NULL), mPathFindDB(NULL), mHashNodeDB(NULL), mConnectionPool(mIOService), mPeerDoor(NULL), mRPCDoor(NULL), mWSPublicDoor(NULL), mWSPrivateDoor(NULL), mSweepTimer(mAuxService), mShutdown(false) { @@ -74,7 +74,7 @@ void Application::stop() StopSustain(); mShutdown = true; mIOService.stop(); - mHashedObjectStore.bulkWrite(); + mHashedObjectStore.waitWrite(); mValidations.flush(); mAuxService.stop(); mJobQueue.shutdown(); @@ -150,10 +150,23 @@ void Application::setup() t1.join(); t2.join(); t3.join(); boost::thread t4(boost::bind(&InitDB, &mWalletDB, "wallet.db", WalletDBInit, WalletDBCount)); - boost::thread t5(boost::bind(&InitDB, &mHashNodeDB, "hashnode.db", HashNodeDBInit, HashNodeDBCount)); boost::thread t6(boost::bind(&InitDB, &mNetNodeDB, "netnode.db", NetNodeDBInit, NetNodeDBCount)); boost::thread t7(boost::bind(&InitDB, &mPathFindDB, "pathfind.db", PathFindDBInit, PathFindDBCount)); - t4.join(); t5.join(); t6.join(); t7.join(); + t4.join(); t6.join(); t7.join(); + +#ifdef USE_LEVELDB + leveldb::Options options; + options.create_if_missing = true; + leveldb::Status status = leveldb::DB::Open(options, (theConfig.DATA_DIR / "hashnode.ldb").string(), &mHashNodeDB); + if (!status.ok() || !mHashNodeDB) + { + cLog(lsFATAL) << "Unable to open/create hash node db"; + exit(3); + } +#else + boost::thread t5(boost::bind(&InitDB, &mHashNodeDB, "hashnode.db", HashNodeDBInit, HashNodeDBCount)); + t5.join(); +#endif mTxnDB->getDB()->setupCheckpointing(&mJobQueue); mLedgerDB->getDB()->setupCheckpointing(&mJobQueue); @@ -205,8 +218,11 @@ void Application::setup() mLedgerMaster.tune(theConfig.getSize(siLedgerSize), theConfig.getSize(siLedgerAge)); mLedgerMaster.setMinValidations(theConfig.VALIDATION_QUORUM); +#ifndef USE_LEVELDB theApp->getHashNodeDB()->getDB()->executeSQL(boost::str(boost::format("PRAGMA cache_size=-%d;") % (theConfig.getSize(siHashNodeDBCache) * 1024))); +#endif + theApp->getLedgerDB()->getDB()->executeSQL(boost::str(boost::format("PRAGMA cache_size=-%d;") % (theConfig.getSize(siTxnDBCache) * 1024))); theApp->getTxnDB()->getDB()->executeSQL(boost::str(boost::format("PRAGMA cache_size=-%d;") % diff --git a/src/cpp/ripple/Application.h b/src/cpp/ripple/Application.h index 356cce827f..c6cc810e5c 100644 --- a/src/cpp/ripple/Application.h +++ b/src/cpp/ripple/Application.h @@ -1,6 +1,10 @@ #ifndef __APPLICATION__ #define __APPLICATION__ +#ifdef USE_LEVELDB +#include "leveldb/db.h" +#endif + #include #include "../database/database.h" @@ -74,7 +78,13 @@ class Application OrderBookDB mOrderBookDB; FeeVote mFeeVote; - DatabaseCon *mRpcDB, *mTxnDB, *mLedgerDB, *mWalletDB, *mHashNodeDB, *mNetNodeDB, *mPathFindDB; + DatabaseCon *mRpcDB, *mTxnDB, *mLedgerDB, *mWalletDB, *mNetNodeDB, *mPathFindDB; + +#ifdef USE_LEVELDB + leveldb::DB *mHashNodeDB; +#else + DatabaseCon *mHashNodeDB; +#endif ConnectionPool mConnectionPool; PeerDoor* mPeerDoor; @@ -140,10 +150,15 @@ public: DatabaseCon* getTxnDB() { return mTxnDB; } DatabaseCon* getLedgerDB() { return mLedgerDB; } DatabaseCon* getWalletDB() { return mWalletDB; } - DatabaseCon* getHashNodeDB() { return mHashNodeDB; } DatabaseCon* getNetNodeDB() { return mNetNodeDB; } DatabaseCon* getPathFindDB() { return mPathFindDB; } +#ifdef USE_LEVELDB + leveldb::DB* getHashNodeDB() { return mHashNodeDB; } +#else + DatabaseCon* getHashNodeDB() { return mHashNodeDB; } +#endif + uint256 getNonce256() { return mNonce256; } std::size_t getNonceST() { return mNonceST; } diff --git a/src/cpp/ripple/HashedObject.cpp b/src/cpp/ripple/HashedObject.cpp index 8fa64cf331..a459ff8acf 100644 --- a/src/cpp/ripple/HashedObject.cpp +++ b/src/cpp/ripple/HashedObject.cpp @@ -1,6 +1,10 @@ #include "HashedObject.h" +#ifdef USE_LEVELDB +#include "leveldb/db.h" +#endif + #include #include @@ -26,6 +30,83 @@ void HashedObjectStore::tune(int size, int age) mCache.setTargetAge(age); } +#ifdef USE_LEVELDB + +bool HashedObjectStore::store(HashedObjectType type, uint32 index, + const std::vector& data, const uint256& hash) +{ // return: false = already in cache, true = added to cache + if (!theApp->getHashNodeDB()) + { + cLog(lsTRACE) << "HOS: no db"; + return true; + } + if (mCache.touch(hash)) + { + cLog(lsTRACE) << "HOS: " << hash << " store: incache"; + return false; + } + assert(hash == Serializer::getSHA512Half(data)); + + HashedObject::pointer object = boost::make_shared(type, index, data, hash); + if (!mCache.canonicalize(hash, object)) + { + Serializer s(1 + (32 / 8) + data.size()); + s.add8(static_cast(type)); + s.add32(index); + s.addRaw(data); + leveldb::Status st = theApp->getHashNodeDB()->Put(leveldb::WriteOptions(), hash.GetHex(), + leveldb::Slice(reinterpret_cast(s.getDataPtr()), s.getLength())); + if (!st.ok()) + { + cLog(lsFATAL) << "Failed to store hash node"; + assert(false); + } + } + mNegativeCache.del(hash); + return true; +} + +void HashedObjectStore::waitWrite() +{ +} + +HashedObject::pointer HashedObjectStore::retrieve(const uint256& hash) +{ + HashedObject::pointer obj = mCache.fetch(hash); + if (obj) + return obj; + + if (mNegativeCache.isPresent(hash)) + return obj; + + if (!theApp || !theApp->getHashNodeDB()) + return obj; + + std::string sData; + leveldb::Status st = theApp->getHashNodeDB()->Get(leveldb::ReadOptions(), hash.GetHex(), &sData); + if (!st.ok()) + { + mNegativeCache.add(hash); + return obj; + } + + Serializer s(sData); + + int htype; + uint32 index; + std::vector data; + s.get8(htype, 0); + s.get32(index, 1); + s.getRaw(data, 5, s.getLength() - 5); + + obj = boost::make_shared(static_cast(htype), index, data, hash); + mCache.canonicalize(hash, obj); + + cLog(lsTRACE) << "HOS: " << hash << " fetch: in db"; + return obj; +} + +#else bool HashedObjectStore::store(HashedObjectType type, uint32 index, const std::vector& data, const uint256& hash) @@ -330,4 +411,6 @@ int HashedObjectStore::import(const std::string& file) return countYes; } +#endif + // vim:ts=4 diff --git a/src/cpp/ripple/RPCHandler.cpp b/src/cpp/ripple/RPCHandler.cpp index 71be580b89..87a1a83d60 100644 --- a/src/cpp/ripple/RPCHandler.cpp +++ b/src/cpp/ripple/RPCHandler.cpp @@ -2111,9 +2111,13 @@ Json::Value RPCHandler::doGetCounts(Json::Value jvRequest, int& cost, ScopedLock dbKB = theApp->getLedgerDB()->getDB()->getKBUsedDB(); if (dbKB > 0) ret["dbKBLedger"] = dbKB; + +#ifndef USE_LEVELDB dbKB = theApp->getHashNodeDB()->getDB()->getKBUsedDB(); if (dbKB > 0) ret["dbKBHashNode"] = dbKB; +#endif + dbKB = theApp->getTxnDB()->getDB()->getKBUsedDB(); if (dbKB > 0) ret["dbKBTransaction"] = dbKB;