diff --git a/src/Application.cpp b/src/Application.cpp index f85992d93b..e500b39eeb 100644 --- a/src/Application.cpp +++ b/src/Application.cpp @@ -51,6 +51,7 @@ extern int TxnDBCount, LedgerDBCount, WalletDBCount, HashNodeDBCount, NetNodeDBC void Application::stop() { mIOService.stop(); + mHashedObjectStore.bulkWrite(); Log(lsINFO) << "Stopped: " << mIOService.stopped(); } diff --git a/src/HashedObject.cpp b/src/HashedObject.cpp index 63ff9f633f..03d885b71e 100644 --- a/src/HashedObject.cpp +++ b/src/HashedObject.cpp @@ -26,46 +26,79 @@ void HashedObject::setHash() mHash = Serializer::getSHA512Half(mData); } -// FIXME: Stores should be added to a queue that's serviced by an auxilliary thread or from an -// auxilliary thread pool. These should be tied into a cache, since you need one to handle -// an immedate read back (before the write completes) +HashedObjectStore::HashedObjectStore(int cacheSize, int cacheAge) : + mCache(cacheSize, cacheAge), mWritePending(false) +{ + mWriteSet.reserve(128); +} + bool HashedObjectStore::store(HashedObjectType type, uint32 index, const std::vector& data, const uint256& hash) -{ +{ // return: false=already in cache, true = added to cache if (!theApp->getHashNodeDB()) return true; + if (mCache.touch(hash)) return false; + HashedObject::pointer object = boost::make_shared(type, index, data); object->setHash(); if (object->getHash() != hash) throw std::runtime_error("Object added to store doesn't have valid hash"); - std::string sql = "INSERT INTO CommittedObjects (Hash,ObjType,LedgerIndex,Object) VALUES ('"; - sql.append(hash.GetHex()); - switch(type) + boost::recursive_mutex::scoped_lock sl(mWriteMutex); + mWriteSet.push_back(object); + if (mWriteSet.size() == 64) { - case LEDGER: sql.append("','L','"); break; - case TRANSACTION: sql.append("','T','"); break; - case ACCOUNT_NODE: sql.append("','A','"); break; - case TRANSACTION_NODE: sql.append("','N','"); break; - default: sql.append("','U','"); break; + boost::recursive_mutex::scoped_lock sl(mWriteMutex); + if (!mWritePending) + { + mWritePending = true; + boost::thread t(boost::bind(&HashedObjectStore::bulkWrite, this)); + t.detach(); + } } - sql.append(boost::lexical_cast(index)); - sql.append("',"); - std::string obj; - theApp->getHashNodeDB()->getDB()->escape(&(data.front()), data.size(), obj); - sql.append(obj); - sql.append(");"); + return true; +} - std::string exists = - boost::str(boost::format("SELECT ObjType FROM CommittedObjects WHERE Hash = '%s';") % hash.GetHex()); +void HashedObjectStore::bulkWrite() +{ + std::vector< boost::shared_ptr > set; + set.reserve(128); + + { + boost::recursive_mutex::scoped_lock sl(mWriteMutex); + mWriteSet.swap(set); + mWritePending = false; + } + + boost::format fExists("SELECT ObjType FROM CommittedObjects WHERE Hash = '%s';"); + boost::format fAdd("INSERT INTO ComittedObject (Hash,ObjType,LedgerIndex,Object) VALUES ('%s','%c','%u','%s');"); - ScopedLock sl(theApp->getHashNodeDB()->getDBLock()); - if (mCache.canonicalize(hash, object)) - return false; Database* db = theApp->getHashNodeDB()->getDB(); - if (SQL_EXISTS(db, exists)) - return false; - return db->executeSQL(sql); + ScopedLock sl = theApp->getHashNodeDB()->getDBLock(); + + db->executeSQL("BEGIN TRANSACTION;"); + + for (std::vector< boost::shared_ptr >::iterator it = set.begin(), end = set.end(); it != end; ++it) + { + HashedObject& obj = **it; + if (!SQL_EXISTS(db, boost::str(fExists % obj.getHash().GetHex()))) + { + char type; + switch(obj.getType()) + { + case LEDGER: type = 'L'; break; + case TRANSACTION: type = 'T'; break; + case ACCOUNT_NODE: type = 'A'; break; + case TRANSACTION_NODE: type = 'N'; break; + default: type = 'U'; + } + std::string rawData; + db->escape(&(obj.getData().front()), obj.getData().size(), rawData); + db->executeSQL(boost::str(fAdd % obj.getHash().GetHex() % type % obj.getIndex() % rawData )); + } + } + + db->executeSQL("END TRANSACTION;"); } HashedObject::pointer HashedObjectStore::retrieve(const uint256& hash) @@ -74,7 +107,11 @@ HashedObject::pointer HashedObjectStore::retrieve(const uint256& hash) { ScopedLock sl(theApp->getHashNodeDB()->getDBLock()); obj = mCache.fetch(hash); - if (obj) return obj; + if (obj) + { + Log(lsTRACE) << "HOS: " << hash.GetHex() << " fetch: incache"; + return obj; + } } if (!theApp || !theApp->getHashNodeDB()) return HashedObject::pointer(); @@ -90,7 +127,10 @@ HashedObject::pointer HashedObjectStore::retrieve(const uint256& hash) Database* db = theApp->getHashNodeDB()->getDB(); if (!db->executeSQL(sql) || !db->startIterRows()) + { + Log(lsTRACE) << "HOS: " << hash.GetHex() << " fetch: not in db"; return HashedObject::pointer(); + } std::string type; db->getStr("ObjType", type); @@ -122,19 +162,8 @@ HashedObject::pointer HashedObjectStore::retrieve(const uint256& hash) #ifdef DEBUG assert(obj->checkHash()); #endif + Log(lsTRACE) << "HOS: " << hash.GetHex() << " fetch: in db"; return obj; } -ScopedLock HashedObjectStore::beginBulk() -{ - ScopedLock sl(theApp->getHashNodeDB()->getDBLock()); - theApp->getHashNodeDB()->getDB()->executeSQL("BEGIN TRANSACTION;"); - return sl; -} - -void HashedObjectStore::endBulk() -{ - theApp->getHashNodeDB()->getDB()->executeSQL("END TRANSACTION;"); -} - // vim:ts=4 diff --git a/src/HashedObject.h b/src/HashedObject.h index 0f66c1509e..514a026670 100644 --- a/src/HashedObject.h +++ b/src/HashedObject.h @@ -34,8 +34,10 @@ public: bool checkFixHash(); void setHash(); - const std::vector& getData() { return mData; } - const uint256& getHash() { return mHash; } + const std::vector& getData() { return mData; } + const uint256& getHash() { return mHash; } + HashedObjectType getType() { return mType; } + uint32 getIndex() { return mLedgerIndex; } }; class HashedObjectStore @@ -43,33 +45,20 @@ class HashedObjectStore protected: TaggedCache mCache; + boost::recursive_mutex mWriteMutex; + std::vector< boost::shared_ptr > mWriteSet; + bool mWritePending; + public: - HashedObjectStore(int cacheSize, int cacheAge) : mCache(cacheSize, cacheAge) { ; } + HashedObjectStore(int cacheSize, int cacheAge); bool store(HashedObjectType type, uint32 index, const std::vector& data, const uint256& hash); HashedObject::pointer retrieve(const uint256& hash); - ScopedLock beginBulk(); - void endBulk(); -}; - -class HashedObjectBulkWriter -{ -protected: - HashedObjectStore& mStore; - ScopedLock sl; - -public: - HashedObjectBulkWriter(HashedObjectStore& ostore) : mStore(ostore), sl(mStore.beginBulk()) { ; } - ~HashedObjectBulkWriter() { mStore.endBulk(); } - - bool store(HashedObjectType type, uint32 index, const std::vector& data, - const uint256& hash) { return mStore.store(type, index, data, hash); } - - HashedObject::pointer retrieve(const uint256& hash) { return mStore.retrieve(hash); } + void bulkWrite(); }; #endif diff --git a/src/SHAMap.cpp b/src/SHAMap.cpp index e3961f5901..ea592d60a5 100644 --- a/src/SHAMap.cpp +++ b/src/SHAMap.cpp @@ -642,14 +642,13 @@ int SHAMap::flushDirty(int maxNodes, HashedObjectType t, uint32 seq) if(mDirtyNodes) { - HashedObjectBulkWriter bw(theApp->getHashedObjectStore()); boost::unordered_map& dirtyNodes = *mDirtyNodes; boost::unordered_map::iterator it = dirtyNodes.begin(); while (it != dirtyNodes.end()) { s.erase(); it->second->addRaw(s); - bw.store(t, seq, s.peekData(), s.getSHA512Half()); + theApp->getHashedObjectStore().store(t, seq, s.peekData(), s.getSHA512Half()); if (flushed++ >= maxNodes) return flushed; it = dirtyNodes.erase(it);