Add LevelDB support for hash nodes. We really need to cut over, the reduction in

I/O and CPU use is dramatic.
This commit is contained in:
JoelKatz
2013-04-25 17:32:12 -07:00
parent 45fc2f0667
commit 35ad8d4c6f
5 changed files with 130 additions and 6 deletions

View File

@@ -11,6 +11,7 @@ OSX = bool(platform.mac_ver()[0])
FreeBSD = bool('FreeBSD' == platform.system()) FreeBSD = bool('FreeBSD' == platform.system())
Linux = bool('Linux' == platform.system()) Linux = bool('Linux' == platform.system())
Ubuntu = bool(Linux and 'Ubuntu' == platform.linux_distribution()[0]) Ubuntu = bool(Linux and 'Ubuntu' == platform.linux_distribution()[0])
LevelDB = bool(0)
if OSX or Ubuntu: if OSX or Ubuntu:
CTAGS = '/usr/bin/ctags' CTAGS = '/usr/bin/ctags'
@@ -113,6 +114,11 @@ if OSX:
env.Append(LINKFLAGS = ['-L/usr/local/opt/openssl/lib']) env.Append(LINKFLAGS = ['-L/usr/local/opt/openssl/lib'])
env.Append(CXXFLAGS = ['-I/usr/local/opt/openssl/include']) env.Append(CXXFLAGS = ['-I/usr/local/opt/openssl/include'])
if LevelDB:
env.Append(CXXFLAGS = [ '-Ileveldb/include', '-DUSE_LEVELDB'])
env.Append(LINKFLAGS = [ '-Lleveldb/lib' ])
env.Append(LIBS = [ '-lleveldb'])
DB_SRCS = glob.glob('src/cpp/database/*.c') + glob.glob('src/cpp/database/*.cpp') DB_SRCS = glob.glob('src/cpp/database/*.c') + glob.glob('src/cpp/database/*.cpp')
JSON_SRCS = glob.glob('src/cpp/json/*.cpp') JSON_SRCS = glob.glob('src/cpp/json/*.cpp')

View File

@@ -54,7 +54,7 @@ Application::Application() :
mFeeVote(10, 50 * SYSTEM_CURRENCY_PARTS, 12.5 * SYSTEM_CURRENCY_PARTS), mFeeVote(10, 50 * SYSTEM_CURRENCY_PARTS, 12.5 * SYSTEM_CURRENCY_PARTS),
mRpcDB(NULL), mTxnDB(NULL), mLedgerDB(NULL), mWalletDB(NULL), mRpcDB(NULL), mTxnDB(NULL), mLedgerDB(NULL), mWalletDB(NULL),
mHashNodeDB(NULL), mNetNodeDB(NULL), mPathFindDB(NULL), mNetNodeDB(NULL), mPathFindDB(NULL), mHashNodeDB(NULL),
mConnectionPool(mIOService), mPeerDoor(NULL), mRPCDoor(NULL), mWSPublicDoor(NULL), mWSPrivateDoor(NULL), mConnectionPool(mIOService), mPeerDoor(NULL), mRPCDoor(NULL), mWSPublicDoor(NULL), mWSPrivateDoor(NULL),
mSweepTimer(mAuxService), mShutdown(false) mSweepTimer(mAuxService), mShutdown(false)
{ {
@@ -74,7 +74,7 @@ void Application::stop()
StopSustain(); StopSustain();
mShutdown = true; mShutdown = true;
mIOService.stop(); mIOService.stop();
mHashedObjectStore.bulkWrite(); mHashedObjectStore.waitWrite();
mValidations.flush(); mValidations.flush();
mAuxService.stop(); mAuxService.stop();
mJobQueue.shutdown(); mJobQueue.shutdown();
@@ -150,10 +150,23 @@ void Application::setup()
t1.join(); t2.join(); t3.join(); t1.join(); t2.join(); t3.join();
boost::thread t4(boost::bind(&InitDB, &mWalletDB, "wallet.db", WalletDBInit, WalletDBCount)); boost::thread t4(boost::bind(&InitDB, &mWalletDB, "wallet.db", WalletDBInit, WalletDBCount));
boost::thread t5(boost::bind(&InitDB, &mHashNodeDB, "hashnode.db", HashNodeDBInit, HashNodeDBCount));
boost::thread t6(boost::bind(&InitDB, &mNetNodeDB, "netnode.db", NetNodeDBInit, NetNodeDBCount)); boost::thread t6(boost::bind(&InitDB, &mNetNodeDB, "netnode.db", NetNodeDBInit, NetNodeDBCount));
boost::thread t7(boost::bind(&InitDB, &mPathFindDB, "pathfind.db", PathFindDBInit, PathFindDBCount)); boost::thread t7(boost::bind(&InitDB, &mPathFindDB, "pathfind.db", PathFindDBInit, PathFindDBCount));
t4.join(); t5.join(); t6.join(); t7.join(); t4.join(); t6.join(); t7.join();
#ifdef USE_LEVELDB
leveldb::Options options;
options.create_if_missing = true;
leveldb::Status status = leveldb::DB::Open(options, (theConfig.DATA_DIR / "hashnode.ldb").string(), &mHashNodeDB);
if (!status.ok() || !mHashNodeDB)
{
cLog(lsFATAL) << "Unable to open/create hash node db";
exit(3);
}
#else
boost::thread t5(boost::bind(&InitDB, &mHashNodeDB, "hashnode.db", HashNodeDBInit, HashNodeDBCount));
t5.join();
#endif
mTxnDB->getDB()->setupCheckpointing(&mJobQueue); mTxnDB->getDB()->setupCheckpointing(&mJobQueue);
mLedgerDB->getDB()->setupCheckpointing(&mJobQueue); mLedgerDB->getDB()->setupCheckpointing(&mJobQueue);
@@ -205,8 +218,11 @@ void Application::setup()
mLedgerMaster.tune(theConfig.getSize(siLedgerSize), theConfig.getSize(siLedgerAge)); mLedgerMaster.tune(theConfig.getSize(siLedgerSize), theConfig.getSize(siLedgerAge));
mLedgerMaster.setMinValidations(theConfig.VALIDATION_QUORUM); mLedgerMaster.setMinValidations(theConfig.VALIDATION_QUORUM);
#ifndef USE_LEVELDB
theApp->getHashNodeDB()->getDB()->executeSQL(boost::str(boost::format("PRAGMA cache_size=-%d;") % theApp->getHashNodeDB()->getDB()->executeSQL(boost::str(boost::format("PRAGMA cache_size=-%d;") %
(theConfig.getSize(siHashNodeDBCache) * 1024))); (theConfig.getSize(siHashNodeDBCache) * 1024)));
#endif
theApp->getLedgerDB()->getDB()->executeSQL(boost::str(boost::format("PRAGMA cache_size=-%d;") % theApp->getLedgerDB()->getDB()->executeSQL(boost::str(boost::format("PRAGMA cache_size=-%d;") %
(theConfig.getSize(siTxnDBCache) * 1024))); (theConfig.getSize(siTxnDBCache) * 1024)));
theApp->getTxnDB()->getDB()->executeSQL(boost::str(boost::format("PRAGMA cache_size=-%d;") % theApp->getTxnDB()->getDB()->executeSQL(boost::str(boost::format("PRAGMA cache_size=-%d;") %

View File

@@ -1,6 +1,10 @@
#ifndef __APPLICATION__ #ifndef __APPLICATION__
#define __APPLICATION__ #define __APPLICATION__
#ifdef USE_LEVELDB
#include "leveldb/db.h"
#endif
#include <boost/asio.hpp> #include <boost/asio.hpp>
#include "../database/database.h" #include "../database/database.h"
@@ -74,7 +78,13 @@ class Application
OrderBookDB mOrderBookDB; OrderBookDB mOrderBookDB;
FeeVote mFeeVote; FeeVote mFeeVote;
DatabaseCon *mRpcDB, *mTxnDB, *mLedgerDB, *mWalletDB, *mHashNodeDB, *mNetNodeDB, *mPathFindDB; DatabaseCon *mRpcDB, *mTxnDB, *mLedgerDB, *mWalletDB, *mNetNodeDB, *mPathFindDB;
#ifdef USE_LEVELDB
leveldb::DB *mHashNodeDB;
#else
DatabaseCon *mHashNodeDB;
#endif
ConnectionPool mConnectionPool; ConnectionPool mConnectionPool;
PeerDoor* mPeerDoor; PeerDoor* mPeerDoor;
@@ -140,10 +150,15 @@ public:
DatabaseCon* getTxnDB() { return mTxnDB; } DatabaseCon* getTxnDB() { return mTxnDB; }
DatabaseCon* getLedgerDB() { return mLedgerDB; } DatabaseCon* getLedgerDB() { return mLedgerDB; }
DatabaseCon* getWalletDB() { return mWalletDB; } DatabaseCon* getWalletDB() { return mWalletDB; }
DatabaseCon* getHashNodeDB() { return mHashNodeDB; }
DatabaseCon* getNetNodeDB() { return mNetNodeDB; } DatabaseCon* getNetNodeDB() { return mNetNodeDB; }
DatabaseCon* getPathFindDB() { return mPathFindDB; } DatabaseCon* getPathFindDB() { return mPathFindDB; }
#ifdef USE_LEVELDB
leveldb::DB* getHashNodeDB() { return mHashNodeDB; }
#else
DatabaseCon* getHashNodeDB() { return mHashNodeDB; }
#endif
uint256 getNonce256() { return mNonce256; } uint256 getNonce256() { return mNonce256; }
std::size_t getNonceST() { return mNonceST; } std::size_t getNonceST() { return mNonceST; }

View File

@@ -1,6 +1,10 @@
#include "HashedObject.h" #include "HashedObject.h"
#ifdef USE_LEVELDB
#include "leveldb/db.h"
#endif
#include <boost/lexical_cast.hpp> #include <boost/lexical_cast.hpp>
#include <boost/foreach.hpp> #include <boost/foreach.hpp>
@@ -26,6 +30,83 @@ void HashedObjectStore::tune(int size, int age)
mCache.setTargetAge(age); mCache.setTargetAge(age);
} }
#ifdef USE_LEVELDB
bool HashedObjectStore::store(HashedObjectType type, uint32 index,
const std::vector<unsigned char>& data, const uint256& hash)
{ // return: false = already in cache, true = added to cache
if (!theApp->getHashNodeDB())
{
cLog(lsTRACE) << "HOS: no db";
return true;
}
if (mCache.touch(hash))
{
cLog(lsTRACE) << "HOS: " << hash << " store: incache";
return false;
}
assert(hash == Serializer::getSHA512Half(data));
HashedObject::pointer object = boost::make_shared<HashedObject>(type, index, data, hash);
if (!mCache.canonicalize(hash, object))
{
Serializer s(1 + (32 / 8) + data.size());
s.add8(static_cast<unsigned char>(type));
s.add32(index);
s.addRaw(data);
leveldb::Status st = theApp->getHashNodeDB()->Put(leveldb::WriteOptions(), hash.GetHex(),
leveldb::Slice(reinterpret_cast<const char *>(s.getDataPtr()), s.getLength()));
if (!st.ok())
{
cLog(lsFATAL) << "Failed to store hash node";
assert(false);
}
}
mNegativeCache.del(hash);
return true;
}
void HashedObjectStore::waitWrite()
{
}
HashedObject::pointer HashedObjectStore::retrieve(const uint256& hash)
{
HashedObject::pointer obj = mCache.fetch(hash);
if (obj)
return obj;
if (mNegativeCache.isPresent(hash))
return obj;
if (!theApp || !theApp->getHashNodeDB())
return obj;
std::string sData;
leveldb::Status st = theApp->getHashNodeDB()->Get(leveldb::ReadOptions(), hash.GetHex(), &sData);
if (!st.ok())
{
mNegativeCache.add(hash);
return obj;
}
Serializer s(sData);
int htype;
uint32 index;
std::vector<unsigned char> data;
s.get8(htype, 0);
s.get32(index, 1);
s.getRaw(data, 5, s.getLength() - 5);
obj = boost::make_shared<HashedObject>(static_cast<HashedObjectType>(htype), index, data, hash);
mCache.canonicalize(hash, obj);
cLog(lsTRACE) << "HOS: " << hash << " fetch: in db";
return obj;
}
#else
bool HashedObjectStore::store(HashedObjectType type, uint32 index, bool HashedObjectStore::store(HashedObjectType type, uint32 index,
const std::vector<unsigned char>& data, const uint256& hash) const std::vector<unsigned char>& data, const uint256& hash)
@@ -330,4 +411,6 @@ int HashedObjectStore::import(const std::string& file)
return countYes; return countYes;
} }
#endif
// vim:ts=4 // vim:ts=4

View File

@@ -2111,9 +2111,13 @@ Json::Value RPCHandler::doGetCounts(Json::Value jvRequest, int& cost, ScopedLock
dbKB = theApp->getLedgerDB()->getDB()->getKBUsedDB(); dbKB = theApp->getLedgerDB()->getDB()->getKBUsedDB();
if (dbKB > 0) if (dbKB > 0)
ret["dbKBLedger"] = dbKB; ret["dbKBLedger"] = dbKB;
#ifndef USE_LEVELDB
dbKB = theApp->getHashNodeDB()->getDB()->getKBUsedDB(); dbKB = theApp->getHashNodeDB()->getDB()->getKBUsedDB();
if (dbKB > 0) if (dbKB > 0)
ret["dbKBHashNode"] = dbKB; ret["dbKBHashNode"] = dbKB;
#endif
dbKB = theApp->getTxnDB()->getDB()->getKBUsedDB(); dbKB = theApp->getTxnDB()->getDB()->getKBUsedDB();
if (dbKB > 0) if (dbKB > 0)
ret["dbKBTransaction"] = dbKB; ret["dbKBTransaction"] = dbKB;