diff --git a/src/cpp/ripple/Application.cpp b/src/cpp/ripple/Application.cpp index 031c26136e..ea7f1b66af 100644 --- a/src/cpp/ripple/Application.cpp +++ b/src/cpp/ripple/Application.cpp @@ -180,8 +180,6 @@ void Application::setup() options.filter_policy = leveldb::NewBloomFilterPolicy(10); if (theConfig.LDB_IMPORT) options.write_buffer_size = 32 << 20; - options.write_buffer_size = std::max(options.write_buffer_size, - static_cast<size_t>(theConfig.getSize(siLDBWriteSize) << 20)); leveldb::Status status = leveldb::DB::Open(options, (theConfig.DATA_DIR / "hashnode").string(), &mHashNodeLDB); if (!status.ok() || !mHashNodeLDB) { diff --git a/src/cpp/ripple/Config.cpp b/src/cpp/ripple/Config.cpp index 15115dc42a..891bf45fce 100644 --- a/src/cpp/ripple/Config.cpp +++ b/src/cpp/ripple/Config.cpp @@ -516,7 +516,6 @@ int Config::getSize(SizedItemName item) { siHashNodeDBCache, { 24, 48, 64, 128, 256 } }, { siTxnDBCache, { 4, 12, 48, 96, 192 } }, { siLgrDBCache, { 4, 8, 32, 64, 128 } }, - { siLDBWriteSize, { 8, 16, 32, 64, 128 } } }; for (int i = 0; i < (sizeof(sizeTable) / sizeof(SizedItem)); ++i) diff --git a/src/cpp/ripple/Config.h b/src/cpp/ripple/Config.h index d8b884c978..bbb3a6521f 100644 --- a/src/cpp/ripple/Config.h +++ b/src/cpp/ripple/Config.h @@ -68,7 +68,6 @@ enum SizedItemName siHashNodeDBCache, siTxnDBCache, siLgrDBCache, - siLDBWriteSize }; struct SizedItem diff --git a/src/cpp/ripple/HashedObject.cpp b/src/cpp/ripple/HashedObject.cpp index c3cf13958b..20bff54f86 100644 --- a/src/cpp/ripple/HashedObject.cpp +++ b/src/cpp/ripple/HashedObject.cpp @@ -1,8 +1,8 @@ - #include "HashedObject.h" #ifdef USE_LEVELDB #include "leveldb/db.h" +#include "leveldb/write_batch.h" #endif #include @@ -68,34 +68,72 @@ bool HashedObjectStore::storeLevelDB(HashedObjectType type, uint32 index, HashedObject::pointer object = boost::make_shared<HashedObject>(type, index, data, hash); if (!mCache.canonicalize(hash, object)) { - LoadEvent::autoptr event(theApp->getJobQueue().getLoadEventAP(jtHO_WRITE, 
"HOS::store")); - - std::vector rawData(9 + data.size()); - unsigned char* bufPtr = &rawData.front(); - - *reinterpret_cast(bufPtr + 0) = ntohl(index); - *reinterpret_cast(bufPtr + 4) = ntohl(index); - *(bufPtr + 8) = static_cast(type); - memcpy(bufPtr + 9, &data.front(), data.size()); - - leveldb::Status st = theApp->getHashNodeLDB()->Put(leveldb::WriteOptions(), - leveldb::Slice(reinterpret_cast(hash.begin()), hash.size()), - leveldb::Slice(reinterpret_cast(bufPtr), 9 + data.size())); - if (!st.ok()) + boost::mutex::scoped_lock sl(mWriteMutex); + mWriteSet.push_back(object); + if (!mWritePending) { - cLog(lsFATAL) << "Failed to store hash node"; - assert(false); + mWritePending = true; + theApp->getJobQueue().addJob(jtWRITE, "HashedObject::store", + BIND_TYPE(&HashedObjectStore::bulkWriteLevelDB, this)); } } - else - cLog(lsDEBUG) << "HOS: store race"; + mNegativeCache.del(hash); return true; } +void HashedObjectStore::bulkWriteLevelDB() +{ + assert(mLevelDB); + while (1) + { + std::vector< boost::shared_ptr > set; + set.reserve(128); + + { + boost::mutex::scoped_lock sl(mWriteMutex); + mWriteSet.swap(set); + assert(mWriteSet.empty()); + ++mWriteGeneration; + mWriteCondition.notify_all(); + if (set.empty()) + { + mWritePending = false; + return; + } + } + + { + leveldb::WriteBatch batch; + + BOOST_FOREACH(const boost::shared_ptr& it, set) + { + const HashedObject& obj = *it; + std::vector rawData(9 + obj.mData.size()); + unsigned char* bufPtr = &rawData.front(); + + *reinterpret_cast(bufPtr + 0) = ntohl(obj.mLedgerIndex); + *reinterpret_cast(bufPtr + 4) = ntohl(obj.mLedgerIndex); + *(bufPtr + 8) = static_cast(obj.mType); + memcpy(bufPtr + 9, &obj.mData.front(), obj.mData.size()); + + batch.Put(leveldb::Slice(reinterpret_cast(obj.mHash.begin()), obj.mHash.size()), + leveldb::Slice(reinterpret_cast(bufPtr), rawData.size())); + } + + leveldb::Status st = theApp->getHashNodeLDB()->Write(leveldb::WriteOptions(), &batch); + if (!st.ok()) + { + cLog(lsFATAL) << "Failed 
to store hash node"; + assert(false); + } + } + } +} + HashedObject::pointer HashedObjectStore::retrieveLevelDB(const uint256& hash) { HashedObject::pointer obj = mCache.fetch(hash); - if (obj) + if (obj || mNegativeCache.isPresent(hash)) return obj; if (!theApp || !theApp->getHashNodeLDB()) @@ -153,7 +191,7 @@ bool HashedObjectStore::storeSQLite(HashedObjectType type, uint32 index, { mWritePending = true; theApp->getJobQueue().addJob(jtWRITE, "HashedObject::store", - BIND_TYPE(&HashedObjectStore::bulkWrite, this)); + BIND_TYPE(&HashedObjectStore::bulkWriteSQLite, this)); } } // else @@ -172,7 +210,7 @@ void HashedObjectStore::waitWrite() mWriteCondition.wait(sl); } -void HashedObjectStore::bulkWrite() +void HashedObjectStore::bulkWriteSQLite() { assert(!mLevelDB); while (1) diff --git a/src/cpp/ripple/HashedObject.h b/src/cpp/ripple/HashedObject.h index e66f1f680c..014977ef5b 100644 --- a/src/cpp/ripple/HashedObject.h +++ b/src/cpp/ripple/HashedObject.h @@ -88,14 +88,16 @@ public: bool storeSQLite(HashedObjectType type, uint32 index, const std::vector<unsigned char>& data, const uint256& hash); HashedObject::pointer retrieveSQLite(const uint256& hash); + void bulkWriteSQLite(); #ifdef USE_LEVELDB bool storeLevelDB(HashedObjectType type, uint32 index, const std::vector<unsigned char>& data, const uint256& hash); HashedObject::pointer retrieveLevelDB(const uint256& hash); + void bulkWriteLevelDB(); #endif - void bulkWrite(); + void waitWrite(); void tune(int size, int age); void sweep() { mCache.sweep(); mNegativeCache.sweep(); }