Implement the ephemeral cache.

This commit is contained in:
JoelKatz
2013-06-05 10:25:03 -07:00
parent cc0b9ad01c
commit b7920f40b7
6 changed files with 123 additions and 39 deletions

View File

@@ -54,7 +54,7 @@ Application::Application() :
mFeatureTable(2 * 7 * 24 * 60 * 60, 200), // two weeks, 200/256 mFeatureTable(2 * 7 * 24 * 60 * 60, 200), // two weeks, 200/256
mRpcDB(NULL), mTxnDB(NULL), mLedgerDB(NULL), mWalletDB(NULL), mRpcDB(NULL), mTxnDB(NULL), mLedgerDB(NULL), mWalletDB(NULL),
mNetNodeDB(NULL), mPathFindDB(NULL), mHashNodeDB(NULL), mHashNodeLDB(NULL), mNetNodeDB(NULL), mPathFindDB(NULL), mHashNodeDB(NULL), mHashNodeLDB(NULL), mEphemeralLDB(NULL),
mConnectionPool(mIOService), mPeerDoor(NULL), mRPCDoor(NULL), mWSPublicDoor(NULL), mWSPrivateDoor(NULL), mConnectionPool(mIOService), mPeerDoor(NULL), mRPCDoor(NULL), mWSPublicDoor(NULL), mWSPrivateDoor(NULL),
mSweepTimer(mAuxService), mShutdown(false) mSweepTimer(mAuxService), mShutdown(false)
{ {
@@ -82,6 +82,9 @@ void Application::stop()
delete mHashNodeLDB; delete mHashNodeLDB;
mHashNodeLDB = NULL; mHashNodeLDB = NULL;
delete mEphemeralLDB;
mEphemeralLDB = NULL;
WriteLog (lsINFO, Application) << "Stopped: " << mIOService.stopped(); WriteLog (lsINFO, Application) << "Stopped: " << mIOService.stopped();
Instance::shutdown(); Instance::shutdown();
} }
@@ -176,6 +179,18 @@ void Application::setup()
StopSustain(); StopSustain();
exit(3); exit(3);
} }
if (!theConfig.LDB_EPHEMERAL.empty())
{
leveldb::Status status = leveldb::DB::Open(options, theConfig.LDB_EPHEMERAL, &mEphemeralLDB);
if (!status.ok() || !mEphemeralLDB)
{
WriteLog(lsFATAL, Application) << "Unable to open/create epehemeral db: "
<< theConfig.LDB_EPHEMERAL << " " << status.ToString();
StopSustain();
exit(3);
}
}
} }
else else
{ {
@@ -404,6 +419,7 @@ Application::~Application()
delete mNetNodeDB; delete mNetNodeDB;
delete mPathFindDB; delete mPathFindDB;
delete mHashNodeLDB; delete mHashNodeLDB;
delete mEphemeralLDB;
} }
void Application::startNewLedger() void Application::startNewLedger()

View File

@@ -82,6 +82,7 @@ class Application
DatabaseCon *mRpcDB, *mTxnDB, *mLedgerDB, *mWalletDB, *mNetNodeDB, *mPathFindDB, *mHashNodeDB; DatabaseCon *mRpcDB, *mTxnDB, *mLedgerDB, *mWalletDB, *mNetNodeDB, *mPathFindDB, *mHashNodeDB;
leveldb::DB *mHashNodeLDB; leveldb::DB *mHashNodeLDB;
leveldb::DB *mEphemeralLDB;
ConnectionPool mConnectionPool; ConnectionPool mConnectionPool;
PeerDoor* mPeerDoor; PeerDoor* mPeerDoor;
@@ -153,6 +154,7 @@ public:
DatabaseCon* getHashNodeDB() { return mHashNodeDB; } DatabaseCon* getHashNodeDB() { return mHashNodeDB; }
leveldb::DB* getHashNodeLDB() { return mHashNodeLDB; } leveldb::DB* getHashNodeLDB() { return mHashNodeLDB; }
leveldb::DB* getEphemeralLDB() { return mEphemeralLDB; }
uint256 getNonce256() { return mNonce256; } uint256 getNonce256() { return mNonce256; }
std::size_t getNonceST() { return mNonceST; } std::size_t getNonceST() { return mNonceST; }

View File

@@ -24,6 +24,7 @@
#define SECTION_FEE_ACCOUNT_RESERVE "fee_account_reserve" #define SECTION_FEE_ACCOUNT_RESERVE "fee_account_reserve"
#define SECTION_FEE_OWNER_RESERVE "fee_owner_reserve" #define SECTION_FEE_OWNER_RESERVE "fee_owner_reserve"
#define SECTION_NODE_DB "node_db" #define SECTION_NODE_DB "node_db"
#define SECTION_LDB_EPHEMERAL "ephemeral_db"
#define SECTION_LEDGER_HISTORY "ledger_history" #define SECTION_LEDGER_HISTORY "ledger_history"
#define SECTION_IPS "ips" #define SECTION_IPS "ips"
#define SECTION_NETWORK_QUORUM "network_quorum" #define SECTION_NETWORK_QUORUM "network_quorum"
@@ -357,6 +358,7 @@ void Config::load()
(void) sectionSingleB(secConfig, SECTION_RPC_PASSWORD, RPC_PASSWORD); (void) sectionSingleB(secConfig, SECTION_RPC_PASSWORD, RPC_PASSWORD);
(void) sectionSingleB(secConfig, SECTION_RPC_USER, RPC_USER); (void) sectionSingleB(secConfig, SECTION_RPC_USER, RPC_USER);
(void) sectionSingleB(secConfig, SECTION_NODE_DB, NODE_DB); (void) sectionSingleB(secConfig, SECTION_NODE_DB, NODE_DB);
(void) sectionSingleB(secConfig, SECTION_LDB_EPHEMERAL, LDB_EPHEMERAL);
if (sectionSingleB(secConfig, SECTION_RPC_PORT, strTemp)) if (sectionSingleB(secConfig, SECTION_RPC_PORT, strTemp))
RPC_PORT = boost::lexical_cast<int>(strTemp); RPC_PORT = boost::lexical_cast<int>(strTemp);

View File

@@ -85,6 +85,7 @@ public:
boost::filesystem::path DEBUG_LOGFILE; boost::filesystem::path DEBUG_LOGFILE;
boost::filesystem::path VALIDATORS_FILE; // As specifed in rippled.cfg. boost::filesystem::path VALIDATORS_FILE; // As specifed in rippled.cfg.
std::string NODE_DB; // Database to use for nodes std::string NODE_DB; // Database to use for nodes
std::string LDB_EPHEMERAL; // Database for temporary storage
bool LDB_IMPORT; // Import into LevelDB bool LDB_IMPORT; // Import into LevelDB
bool ELB_SUPPORT; // Support Amazon ELB bool ELB_SUPPORT; // Support Amazon ELB

View File

@@ -16,7 +16,7 @@ DECLARE_INSTANCE(HashedObject);
HashedObjectStore::HashedObjectStore(int cacheSize, int cacheAge) : HashedObjectStore::HashedObjectStore(int cacheSize, int cacheAge) :
mCache("HashedObjectStore", cacheSize, cacheAge), mNegativeCache("HashedObjectNegativeCache", 0, 120), mCache("HashedObjectStore", cacheSize, cacheAge), mNegativeCache("HashedObjectNegativeCache", 0, 120),
mWriteGeneration(0), mWriteLoad(0), mWritePending(false), mLevelDB(false) mWriteGeneration(0), mWriteLoad(0), mWritePending(false), mLevelDB(false), mEphemeralDB(false)
{ {
mWriteSet.reserve(128); mWriteSet.reserve(128);
@@ -29,6 +29,8 @@ HashedObjectStore::HashedObjectStore(int cacheSize, int cacheAge) :
WriteLog (lsFATAL, HashedObject) << "Incorrect database selection"; WriteLog (lsFATAL, HashedObject) << "Incorrect database selection";
assert(false); assert(false);
} }
if (!theConfig.LDB_EPHEMERAL.empty())
mEphemeralDB = true;
} }
void HashedObjectStore::tune(int size, int age) void HashedObjectStore::tune(int size, int age)
@@ -51,6 +53,74 @@ int HashedObjectStore::getWriteLoad()
return std::max(mWriteLoad, static_cast<int>(mWriteSet.size())); return std::max(mWriteLoad, static_cast<int>(mWriteSet.size()));
} }
static HashedObject::pointer LLRetrieve(const uint256& hash, leveldb::DB* db)
{ // low-level retrieve
std::string sData;
leveldb::Status st = db->Get(leveldb::ReadOptions(),
leveldb::Slice(reinterpret_cast<const char *>(hash.begin()), hash.size()), &sData);
if (!st.ok())
{
assert(st.IsNotFound());
return HashedObject::pointer();
}
const unsigned char* bufPtr = reinterpret_cast<const unsigned char*>(&sData[0]);
uint32 index = htonl(*reinterpret_cast<const uint32*>(bufPtr));
int htype = bufPtr[8];
return boost::make_shared<HashedObject>(static_cast<HashedObjectType>(htype), index,
bufPtr + 9, sData.size() - 9, hash);
}
static void LLWrite(boost::shared_ptr<HashedObject> ptr, leveldb::DB* db)
{ // low-level write single
HashedObject& obj = *ptr;
std::vector<unsigned char> rawData(9 + obj.mData.size());
unsigned char* bufPtr = &rawData.front();
*reinterpret_cast<uint32*>(bufPtr + 0) = ntohl(obj.mLedgerIndex);
*reinterpret_cast<uint32*>(bufPtr + 4) = ntohl(obj.mLedgerIndex);
*(bufPtr + 8) = static_cast<unsigned char>(obj.mType);
memcpy(bufPtr + 9, &obj.mData.front(), obj.mData.size());
leveldb::Status st = db->Put(leveldb::WriteOptions(),
leveldb::Slice(reinterpret_cast<const char *>(obj.mHash.begin()), obj.mHash.size()),
leveldb::Slice(reinterpret_cast<const char *>(bufPtr), rawData.size()));
if (!st.ok())
{
WriteLog (lsFATAL, HashedObject) << "Failed to store hash node";
assert(false);
}
}
static void LLWrite(const std::vector< boost::shared_ptr<HashedObject> >& set, leveldb::DB* db)
{ // low-level write set
leveldb::WriteBatch batch;
BOOST_FOREACH(const boost::shared_ptr<HashedObject>& it, set)
{
const HashedObject& obj = *it;
std::vector<unsigned char> rawData(9 + obj.mData.size());
unsigned char* bufPtr = &rawData.front();
*reinterpret_cast<uint32*>(bufPtr + 0) = ntohl(obj.mLedgerIndex);
*reinterpret_cast<uint32*>(bufPtr + 4) = ntohl(obj.mLedgerIndex);
*(bufPtr + 8) = static_cast<unsigned char>(obj.mType);
memcpy(bufPtr + 9, &obj.mData.front(), obj.mData.size());
batch.Put(leveldb::Slice(reinterpret_cast<const char *>(obj.mHash.begin()), obj.mHash.size()),
leveldb::Slice(reinterpret_cast<const char *>(bufPtr), rawData.size()));
}
leveldb::Status st = db->Write(leveldb::WriteOptions(), &batch);
if (!st.ok())
{
WriteLog (lsFATAL, HashedObject) << "Failed to store hash node";
assert(false);
}
}
bool HashedObjectStore::storeLevelDB(HashedObjectType type, uint32 index, bool HashedObjectStore::storeLevelDB(HashedObjectType type, uint32 index,
const std::vector<unsigned char>& data, const uint256& hash) const std::vector<unsigned char>& data, const uint256& hash)
{ // return: false = already in cache, true = added to cache { // return: false = already in cache, true = added to cache
@@ -106,31 +176,9 @@ void HashedObjectStore::bulkWriteLevelDB(Job &)
setSize = set.size(); setSize = set.size();
} }
{ LLWrite(set, theApp->getHashNodeLDB());
leveldb::WriteBatch batch; if (mEphemeralDB)
LLWrite(set, theApp->getEphemeralLDB());
BOOST_FOREACH(const boost::shared_ptr<HashedObject>& it, set)
{
const HashedObject& obj = *it;
std::vector<unsigned char> rawData(9 + obj.mData.size());
unsigned char* bufPtr = &rawData.front();
*reinterpret_cast<uint32*>(bufPtr + 0) = ntohl(obj.mLedgerIndex);
*reinterpret_cast<uint32*>(bufPtr + 4) = ntohl(obj.mLedgerIndex);
*(bufPtr + 8) = static_cast<unsigned char>(obj.mType);
memcpy(bufPtr + 9, &obj.mData.front(), obj.mData.size());
batch.Put(leveldb::Slice(reinterpret_cast<const char *>(obj.mHash.begin()), obj.mHash.size()),
leveldb::Slice(reinterpret_cast<const char *>(bufPtr), rawData.size()));
}
leveldb::Status st = theApp->getHashNodeLDB()->Write(leveldb::WriteOptions(), &batch);
if (!st.ok())
{
WriteLog (lsFATAL, HashedObject) << "Failed to store hash node";
assert(false);
}
}
} }
} }
@@ -140,27 +188,28 @@ HashedObject::pointer HashedObjectStore::retrieveLevelDB(const uint256& hash)
if (obj || mNegativeCache.isPresent(hash) || !theApp || !theApp->getHashNodeLDB()) if (obj || mNegativeCache.isPresent(hash) || !theApp || !theApp->getHashNodeLDB())
return obj; return obj;
std::string sData; if (mEphemeralDB)
{
obj = LLRetrieve(hash, theApp->getEphemeralLDB());
if (obj)
return obj;
}
{ {
LoadEvent::autoptr event(theApp->getJobQueue().getLoadEventAP(jtHO_READ, "HOS::retrieve")); LoadEvent::autoptr event(theApp->getJobQueue().getLoadEventAP(jtHO_READ, "HOS::retrieve"));
leveldb::Status st = theApp->getHashNodeLDB()->Get(leveldb::ReadOptions(), obj = LLRetrieve(hash, theApp->getHashNodeLDB());
leveldb::Slice(reinterpret_cast<const char *>(hash.begin()), hash.size()), &sData); if (!obj)
if (!st.ok())
{ {
assert(st.IsNotFound()); mNegativeCache.add(hash);
return obj; return obj;
} }
} }
const unsigned char* bufPtr = reinterpret_cast<const unsigned char*>(&sData[0]);
uint32 index = htonl(*reinterpret_cast<const uint32*>(bufPtr));
int htype = bufPtr[8];
obj = boost::make_shared<HashedObject>(static_cast<HashedObjectType>(htype), index,
bufPtr + 9, sData.size() - 9, hash);
mCache.canonicalize(hash, obj); mCache.canonicalize(hash, obj);
if (mEphemeralDB)
LLWrite(obj, theApp->getEphemeralLDB());
WriteLog (lsTRACE, HashedObject) << "HOS: " << hash << " fetch: in db"; WriteLog (lsTRACE, HashedObject) << "HOS: " << hash << " fetch: in db";
return obj; return obj;
} }
@@ -196,6 +245,7 @@ bool HashedObjectStore::storeSQLite(HashedObjectType type, uint32 index,
// else // else
// WriteLog (lsTRACE, HashedObject) << "HOS: already had " << hash; // WriteLog (lsTRACE, HashedObject) << "HOS: already had " << hash;
mNegativeCache.del(hash); mNegativeCache.del(hash);
return true; return true;
} }
@@ -223,6 +273,9 @@ void HashedObjectStore::bulkWriteSQLite(Job&)
#ifndef NO_SQLITE3_PREPARE #ifndef NO_SQLITE3_PREPARE
if (mEphemeralDB)
LLWrite(set, theApp->getEphemeralLDB());
{ {
Database* db = theApp->getHashNodeDB()->getDB(); Database* db = theApp->getHashNodeDB()->getDB();
static SqliteStatement pStB(db->getSqliteDB(), "BEGIN TRANSACTION;", !theConfig.RUN_STANDALONE); static SqliteStatement pStB(db->getSqliteDB(), "BEGIN TRANSACTION;", !theConfig.RUN_STANDALONE);
@@ -308,6 +361,13 @@ HashedObject::pointer HashedObjectStore::retrieveSQLite(const uint256& hash)
if (mNegativeCache.isPresent(hash)) if (mNegativeCache.isPresent(hash))
return obj; return obj;
if (mEphemeralDB)
{
obj = LLRetrieve(hash, theApp->getEphemeralLDB());
if (obj)
return obj;
}
if (!theApp || !theApp->getHashNodeDB()) if (!theApp || !theApp->getHashNodeDB())
return obj; return obj;
@@ -387,6 +447,9 @@ HashedObject::pointer HashedObjectStore::retrieveSQLite(const uint256& hash)
obj = boost::make_shared<HashedObject>(htype, index, data, hash); obj = boost::make_shared<HashedObject>(htype, index, data, hash);
mCache.canonicalize(hash, obj); mCache.canonicalize(hash, obj);
if (mEphemeralDB)
LLWrite(obj, theApp->getEphemeralLDB());
WriteLog (lsTRACE, HashedObject) << "HOS: " << hash << " fetch: in db"; WriteLog (lsTRACE, HashedObject) << "HOS: " << hash << " fetch: in db";
return obj; return obj;
} }

View File

@@ -69,7 +69,7 @@ protected:
std::vector< boost::shared_ptr<HashedObject> > mWriteSet; std::vector< boost::shared_ptr<HashedObject> > mWriteSet;
bool mWritePending; bool mWritePending;
bool mLevelDB; bool mLevelDB, mEphemeralDB;
public: public: