Clean up the writer thread.

This commit is contained in:
JoelKatz
2012-10-14 23:26:32 -07:00
parent ce24ae1ef9
commit 1c9573a972

View File

@@ -21,7 +21,11 @@ bool HashedObjectStore::store(HashedObjectType type, uint32 index,
const std::vector<unsigned char>& data, const uint256& hash)
{ // return: false=already in cache, true = added to cache
assert(hash == Serializer::getSHA512Half(data));
if (!theApp->getHashNodeDB()) return true;
if (!theApp->getHashNodeDB())
{
cLog(lsTRACE) << "HOS: no db";
return true;
}
if (mCache.touch(hash))
{
cLog(lsTRACE) << "HOS: " << hash << " store: incache";
@@ -48,42 +52,49 @@ void HashedObjectStore::bulkWrite()
std::vector< boost::shared_ptr<HashedObject> > set;
set.reserve(128);
do
{
boost::recursive_mutex::scoped_lock sl(mWriteMutex);
mWriteSet.swap(set);
mWritePending = false;
}
cLog(lsINFO) << "HOS: BulkWrite " << set.size();
static boost::format fExists("SELECT ObjType FROM CommittedObjects WHERE Hash = '%s';");
static boost::format
fAdd("INSERT INTO CommittedObjects (Hash,ObjType,LedgerIndex,Object) VALUES ('%s','%c','%u',%s);");
Database* db = theApp->getHashNodeDB()->getDB();
ScopedLock sl = theApp->getHashNodeDB()->getDBLock();
db->executeSQL("BEGIN TRANSACTION;");
BOOST_FOREACH(const boost::shared_ptr<HashedObject>& it, set)
{
if (!SQL_EXISTS(db, boost::str(fExists % it->getHash().GetHex())))
{
char type;
switch(it->getType())
boost::recursive_mutex::scoped_lock sl(mWriteMutex);
mWriteSet.swap(set);
if (set.empty())
{
case hotLEDGER: type= 'L'; break;
case hotTRANSACTION: type = 'T'; break;
case hotACCOUNT_NODE: type = 'A'; break;
case hotTRANSACTION_NODE: type = 'N'; break;
default: type = 'U';
mWritePending = false;
return;
}
std::string rawData;
db->escape(&(it->getData().front()), it->getData().size(), rawData);
db->executeSQL(boost::str(fAdd % it->getHash().GetHex() % type % it->getIndex() % rawData ));
}
}
cLog(lsINFO) << "HOS: BulkWrite " << set.size();
db->executeSQL("END TRANSACTION;");
static boost::format fExists("SELECT ObjType FROM CommittedObjects WHERE Hash = '%s';");
static boost::format
fAdd("INSERT INTO CommittedObjects (Hash,ObjType,LedgerIndex,Object) VALUES ('%s','%c','%u',%s);");
Database* db = theApp->getHashNodeDB()->getDB();
ScopedLock sl = theApp->getHashNodeDB()->getDBLock();
db->executeSQL("BEGIN TRANSACTION;");
BOOST_FOREACH(const boost::shared_ptr<HashedObject>& it, set)
{
if (!SQL_EXISTS(db, boost::str(fExists % it->getHash().GetHex())))
{
char type;
switch(it->getType())
{
case hotLEDGER: type = 'L'; break;
case hotTRANSACTION: type = 'T'; break;
case hotACCOUNT_NODE: type = 'A'; break;
case hotTRANSACTION_NODE: type = 'N'; break;
default: type = 'U';
}
std::string rawData;
db->escape(&(it->getData().front()), it->getData().size(), rawData);
db->executeSQL(boost::str(fAdd % it->getHash().GetHex() % type % it->getIndex() % rawData ));
}
}
db->executeSQL("END TRANSACTION;");
} while(1);
}
HashedObject::pointer HashedObjectStore::retrieve(const uint256& hash)