mirror of
https://github.com/XRPLF/rippled.git
synced 2025-12-06 17:27:55 +00:00
Make the hashed object write code not block on an SQL write. Make bulk updates efficient.
This commit is contained in:
@@ -51,6 +51,7 @@ extern int TxnDBCount, LedgerDBCount, WalletDBCount, HashNodeDBCount, NetNodeDBC
|
||||
void Application::stop()
|
||||
{
|
||||
mIOService.stop();
|
||||
mHashedObjectStore.bulkWrite();
|
||||
|
||||
Log(lsINFO) << "Stopped: " << mIOService.stopped();
|
||||
}
|
||||
|
||||
@@ -26,46 +26,79 @@ void HashedObject::setHash()
|
||||
mHash = Serializer::getSHA512Half(mData);
|
||||
}
|
||||
|
||||
// FIXME: Stores should be added to a queue that's serviced by an auxilliary thread or from an
|
||||
// auxilliary thread pool. These should be tied into a cache, since you need one to handle
|
||||
// an immedate read back (before the write completes)
|
||||
HashedObjectStore::HashedObjectStore(int cacheSize, int cacheAge) :
|
||||
mCache(cacheSize, cacheAge), mWritePending(false)
|
||||
{
|
||||
mWriteSet.reserve(128);
|
||||
}
|
||||
|
||||
|
||||
bool HashedObjectStore::store(HashedObjectType type, uint32 index,
|
||||
const std::vector<unsigned char>& data, const uint256& hash)
|
||||
{
|
||||
{ // return: false=already in cache, true = added to cache
|
||||
if (!theApp->getHashNodeDB()) return true;
|
||||
if (mCache.touch(hash)) return false;
|
||||
|
||||
HashedObject::pointer object = boost::make_shared<HashedObject>(type, index, data);
|
||||
object->setHash();
|
||||
if (object->getHash() != hash)
|
||||
throw std::runtime_error("Object added to store doesn't have valid hash");
|
||||
|
||||
std::string sql = "INSERT INTO CommittedObjects (Hash,ObjType,LedgerIndex,Object) VALUES ('";
|
||||
sql.append(hash.GetHex());
|
||||
switch(type)
|
||||
boost::recursive_mutex::scoped_lock sl(mWriteMutex);
|
||||
mWriteSet.push_back(object);
|
||||
if (mWriteSet.size() == 64)
|
||||
{
|
||||
case LEDGER: sql.append("','L','"); break;
|
||||
case TRANSACTION: sql.append("','T','"); break;
|
||||
case ACCOUNT_NODE: sql.append("','A','"); break;
|
||||
case TRANSACTION_NODE: sql.append("','N','"); break;
|
||||
default: sql.append("','U','"); break;
|
||||
boost::recursive_mutex::scoped_lock sl(mWriteMutex);
|
||||
if (!mWritePending)
|
||||
{
|
||||
mWritePending = true;
|
||||
boost::thread t(boost::bind(&HashedObjectStore::bulkWrite, this));
|
||||
t.detach();
|
||||
}
|
||||
}
|
||||
sql.append(boost::lexical_cast<std::string>(index));
|
||||
sql.append("',");
|
||||
std::string obj;
|
||||
theApp->getHashNodeDB()->getDB()->escape(&(data.front()), data.size(), obj);
|
||||
sql.append(obj);
|
||||
sql.append(");");
|
||||
return true;
|
||||
}
|
||||
|
||||
std::string exists =
|
||||
boost::str(boost::format("SELECT ObjType FROM CommittedObjects WHERE Hash = '%s';") % hash.GetHex());
|
||||
void HashedObjectStore::bulkWrite()
|
||||
{
|
||||
std::vector< boost::shared_ptr<HashedObject> > set;
|
||||
set.reserve(128);
|
||||
|
||||
{
|
||||
boost::recursive_mutex::scoped_lock sl(mWriteMutex);
|
||||
mWriteSet.swap(set);
|
||||
mWritePending = false;
|
||||
}
|
||||
|
||||
boost::format fExists("SELECT ObjType FROM CommittedObjects WHERE Hash = '%s';");
|
||||
boost::format fAdd("INSERT INTO ComittedObject (Hash,ObjType,LedgerIndex,Object) VALUES ('%s','%c','%u','%s');");
|
||||
|
||||
ScopedLock sl(theApp->getHashNodeDB()->getDBLock());
|
||||
if (mCache.canonicalize(hash, object))
|
||||
return false;
|
||||
Database* db = theApp->getHashNodeDB()->getDB();
|
||||
if (SQL_EXISTS(db, exists))
|
||||
return false;
|
||||
return db->executeSQL(sql);
|
||||
ScopedLock sl = theApp->getHashNodeDB()->getDBLock();
|
||||
|
||||
db->executeSQL("BEGIN TRANSACTION;");
|
||||
|
||||
for (std::vector< boost::shared_ptr<HashedObject> >::iterator it = set.begin(), end = set.end(); it != end; ++it)
|
||||
{
|
||||
HashedObject& obj = **it;
|
||||
if (!SQL_EXISTS(db, boost::str(fExists % obj.getHash().GetHex())))
|
||||
{
|
||||
char type;
|
||||
switch(obj.getType())
|
||||
{
|
||||
case LEDGER: type = 'L'; break;
|
||||
case TRANSACTION: type = 'T'; break;
|
||||
case ACCOUNT_NODE: type = 'A'; break;
|
||||
case TRANSACTION_NODE: type = 'N'; break;
|
||||
default: type = 'U';
|
||||
}
|
||||
std::string rawData;
|
||||
db->escape(&(obj.getData().front()), obj.getData().size(), rawData);
|
||||
db->executeSQL(boost::str(fAdd % obj.getHash().GetHex() % type % obj.getIndex() % rawData ));
|
||||
}
|
||||
}
|
||||
|
||||
db->executeSQL("END TRANSACTION;");
|
||||
}
|
||||
|
||||
HashedObject::pointer HashedObjectStore::retrieve(const uint256& hash)
|
||||
@@ -74,7 +107,11 @@ HashedObject::pointer HashedObjectStore::retrieve(const uint256& hash)
|
||||
{
|
||||
ScopedLock sl(theApp->getHashNodeDB()->getDBLock());
|
||||
obj = mCache.fetch(hash);
|
||||
if (obj) return obj;
|
||||
if (obj)
|
||||
{
|
||||
Log(lsTRACE) << "HOS: " << hash.GetHex() << " fetch: incache";
|
||||
return obj;
|
||||
}
|
||||
}
|
||||
|
||||
if (!theApp || !theApp->getHashNodeDB()) return HashedObject::pointer();
|
||||
@@ -90,7 +127,10 @@ HashedObject::pointer HashedObjectStore::retrieve(const uint256& hash)
|
||||
Database* db = theApp->getHashNodeDB()->getDB();
|
||||
|
||||
if (!db->executeSQL(sql) || !db->startIterRows())
|
||||
{
|
||||
Log(lsTRACE) << "HOS: " << hash.GetHex() << " fetch: not in db";
|
||||
return HashedObject::pointer();
|
||||
}
|
||||
|
||||
std::string type;
|
||||
db->getStr("ObjType", type);
|
||||
@@ -122,19 +162,8 @@ HashedObject::pointer HashedObjectStore::retrieve(const uint256& hash)
|
||||
#ifdef DEBUG
|
||||
assert(obj->checkHash());
|
||||
#endif
|
||||
Log(lsTRACE) << "HOS: " << hash.GetHex() << " fetch: in db";
|
||||
return obj;
|
||||
}
|
||||
|
||||
ScopedLock HashedObjectStore::beginBulk()
|
||||
{
|
||||
ScopedLock sl(theApp->getHashNodeDB()->getDBLock());
|
||||
theApp->getHashNodeDB()->getDB()->executeSQL("BEGIN TRANSACTION;");
|
||||
return sl;
|
||||
}
|
||||
|
||||
void HashedObjectStore::endBulk()
|
||||
{
|
||||
theApp->getHashNodeDB()->getDB()->executeSQL("END TRANSACTION;");
|
||||
}
|
||||
|
||||
// vim:ts=4
|
||||
|
||||
@@ -34,8 +34,10 @@ public:
|
||||
bool checkFixHash();
|
||||
void setHash();
|
||||
|
||||
const std::vector<unsigned char>& getData() { return mData; }
|
||||
const uint256& getHash() { return mHash; }
|
||||
const std::vector<unsigned char>& getData() { return mData; }
|
||||
const uint256& getHash() { return mHash; }
|
||||
HashedObjectType getType() { return mType; }
|
||||
uint32 getIndex() { return mLedgerIndex; }
|
||||
};
|
||||
|
||||
class HashedObjectStore
|
||||
@@ -43,33 +45,20 @@ class HashedObjectStore
|
||||
protected:
|
||||
TaggedCache<uint256, HashedObject> mCache;
|
||||
|
||||
boost::recursive_mutex mWriteMutex;
|
||||
std::vector< boost::shared_ptr<HashedObject> > mWriteSet;
|
||||
bool mWritePending;
|
||||
|
||||
public:
|
||||
|
||||
HashedObjectStore(int cacheSize, int cacheAge) : mCache(cacheSize, cacheAge) { ; }
|
||||
HashedObjectStore(int cacheSize, int cacheAge);
|
||||
|
||||
bool store(HashedObjectType type, uint32 index, const std::vector<unsigned char>& data,
|
||||
const uint256& hash);
|
||||
|
||||
HashedObject::pointer retrieve(const uint256& hash);
|
||||
|
||||
ScopedLock beginBulk();
|
||||
void endBulk();
|
||||
};
|
||||
|
||||
class HashedObjectBulkWriter
|
||||
{
|
||||
protected:
|
||||
HashedObjectStore& mStore;
|
||||
ScopedLock sl;
|
||||
|
||||
public:
|
||||
HashedObjectBulkWriter(HashedObjectStore& ostore) : mStore(ostore), sl(mStore.beginBulk()) { ; }
|
||||
~HashedObjectBulkWriter() { mStore.endBulk(); }
|
||||
|
||||
bool store(HashedObjectType type, uint32 index, const std::vector<unsigned char>& data,
|
||||
const uint256& hash) { return mStore.store(type, index, data, hash); }
|
||||
|
||||
HashedObject::pointer retrieve(const uint256& hash) { return mStore.retrieve(hash); }
|
||||
void bulkWrite();
|
||||
};
|
||||
|
||||
#endif
|
||||
|
||||
@@ -642,14 +642,13 @@ int SHAMap::flushDirty(int maxNodes, HashedObjectType t, uint32 seq)
|
||||
|
||||
if(mDirtyNodes)
|
||||
{
|
||||
HashedObjectBulkWriter bw(theApp->getHashedObjectStore());
|
||||
boost::unordered_map<SHAMapNode, SHAMapTreeNode::pointer>& dirtyNodes = *mDirtyNodes;
|
||||
boost::unordered_map<SHAMapNode, SHAMapTreeNode::pointer>::iterator it = dirtyNodes.begin();
|
||||
while (it != dirtyNodes.end())
|
||||
{
|
||||
s.erase();
|
||||
it->second->addRaw(s);
|
||||
bw.store(t, seq, s.peekData(), s.getSHA512Half());
|
||||
theApp->getHashedObjectStore().store(t, seq, s.peekData(), s.getSHA512Half());
|
||||
if (flushed++ >= maxNodes)
|
||||
return flushed;
|
||||
it = dirtyNodes.erase(it);
|
||||
|
||||
Reference in New Issue
Block a user