Efficiently save validations.

This commit is contained in:
JoelKatz
2012-07-17 03:10:05 -07:00
parent e89af2f784
commit d83d8080fd
6 changed files with 99 additions and 12 deletions

View File

@@ -52,6 +52,7 @@ void Application::stop()
{
mIOService.stop();
mHashedObjectStore.bulkWrite();
mValidations.flush();
Log(lsINFO) << "Stopped: " << mIOService.stopped();
}

View File

@@ -26,7 +26,7 @@ const char *TxnDBInit[] = {
);",
"CREATE INDEX AcctTxindex ON \
AccountTransactions(Account, LedgerSeq, TransID);",
"CREATE INDEX AcctLgrIndex ON \
"CREATE INDEX AcctLgrIndex ON \
AccountTransactions(LedgerSeq, Account, TransID);",
"END TRANSACTION;"
@@ -46,21 +46,22 @@ const char *LedgerDBInit[] = {
ClosingTime BIGINT UNSIGNED, \
PrevClosingTime BIGINT UNSIGNED, \
CloseTimeRes BIGINT UNSIGNED, \
CloseFlags, BIGINT UNSIGNED, \
CloseFlags BIGINT UNSIGNED, \
AccountSetHash CHARACTER(64), \
TransSetHash CHARACTER(64) \
);",
"CREATE INDEX SeqLedger ON Ledgers(LedgerSeq);",
#if 0
"CREATE TABLE LedgerConfirmations ( \
LedgerSeq BIGINT UNSIGNED, \
"CREATE TABLE LedgerValidations ( \
LedgerHash CHARACTER(64), \
Hanko CHARACTER(35), \
NodePubKey CHARACTER(56), \
Flags BIGINT UNSIGNED, \
CloseTime BIGINT UNSIGNED, \
Signature BLOB \
);",
"CREATE INDEX LedgerConfByHash ON \
LedgerConfirmations(LedgerHash)",
#endif
"CREATE INDEX ValidationByHash ON \
LedgerValidations(LedgerHash);",
"END TRANSACTION;"
};

View File

@@ -55,6 +55,11 @@ uint32 SerializedValidation::getCloseTime() const
return getValueFieldU32(sfCloseTime);
}
uint32 SerializedValidation::getFlags() const
{
return getValueFieldU32(sfFlags);
}
bool SerializedValidation::isValid() const
{
return isValid(getSigningHash());

View File

@@ -26,11 +26,11 @@ public:
uint256 getLedgerHash() const;
uint32 getCloseTime() const;
uint32 getFlags() const;
NewcoinAddress getSignerPublic() const;
bool isValid() const;
bool isFull() const;
bool isTrusted() const { return mTrusted; }
CKey::pointer getSigningKey() const;
uint256 getSigningHash() const;
bool isValid(const uint256&) const;

View File

@@ -30,7 +30,11 @@ bool ValidationCollection::addValidation(SerializedValidation::pointer val)
{
boost::unordered_map<uint160, SerializedValidation::pointer>::iterator it = mCurrentValidations.find(node);
if ((it == mCurrentValidations.end()) || (val->getCloseTime() >= it->second->getCloseTime()))
{
mStaleValidations.push_back(it->second);
mCurrentValidations[node] = val;
condWrite();
}
}
}
@@ -112,17 +116,85 @@ boost::unordered_map<uint256, int> ValidationCollection::getCurrentValidations()
{
boost::mutex::scoped_lock sl(mValidationLock);
boost::unordered_map<uint160, SerializedValidation::pointer>::iterator it = mCurrentValidations.begin();
bool anyNew = false;
while (it != mCurrentValidations.end())
{
if (now > (it->second->getCloseTime() + LEDGER_MAX_INTERVAL))
{
mStaleValidations.push_back(it->second);
it = mCurrentValidations.erase(it);
anyNew = true;
}
else
{
++ret[it->second->getLedgerHash()];
++it;
}
}
if (anyNew)
condWrite();
}
return ret;
}
void ValidationCollection::flush()
{
boost::mutex::scoped_lock sl(mValidationLock);
boost::unordered_map<uint160, SerializedValidation::pointer>::iterator it = mCurrentValidations.begin();
bool anyNew = false;
while (it != mCurrentValidations.end())
{
mStaleValidations.push_back(it->second);
++it;
anyNew = true;
}
mCurrentValidations.clear();
if (anyNew)
condWrite();
while (mWriting)
{
sl.unlock();
boost::this_thread::sleep(boost::posix_time::milliseconds(100));
sl.lock();
}
}
void ValidationCollection::condWrite()
{
if (mWriting)
return;
mWriting = true;
boost::thread(boost::bind(&ValidationCollection::doWrite, this));
thread.detach();
}
void ValidationCollection::doWrite()
{
static boost::format insVal("INSERT INTO LedgerValidations "
"(LedgerHash,NodePubKey,Flags,CloseTime,Signature) VALUES "
"('%s','%s','%u','%u',%s);");
boost::mutex::scoped_lock sl(mValidationLock);
assert(mWriting);
while (!mStaleValidations.empty())
{
std::vector<SerializedValidation::pointer> vector;
mStaleValidations.swap(vector);
sl.unlock();
{
ScopedLock dbl(theApp->getLedgerDB()->getDBLock());
Database *db = theApp->getLedgerDB()->getDB();
db->executeSQL("BEGIN TRANSACTION;");
for (std::vector<SerializedValidation::pointer>::iterator it = vector.begin(); it != vector.end(); ++it)
db->executeSQL(boost::str(insVal % (*it)->getLedgerHash().GetHex()
% (*it)->getSignerPublic().humanNodePublic() % (*it)->getFlags() % (*it)->getCloseTime()
% db->escape(strCopy((*it)->getSignature()))));
db->executeSQL("END TRANSACTION;");
}
sl.lock();
}
mWriting = false;
}

View File

@@ -1,6 +1,8 @@
#ifndef __VALIDATION_COLLECTION__
#define __VALIDATION_COLLECTION__
#include <vector>
#include <boost/unordered_map.hpp>
#include <boost/thread/mutex.hpp>
@@ -12,14 +14,19 @@ typedef boost::unordered_map<uint160, SerializedValidation::pointer> ValidationS
class ValidationCollection
{
protected:
protected:
boost::mutex mValidationLock;
boost::unordered_map<uint256, ValidationSet> mValidations;
boost::unordered_map<uint160, SerializedValidation::pointer> mCurrentValidations;
std::vector<SerializedValidation::pointer> mStaleValidations;
bool mWriting;
void doWrite();
void condWrite();
public:
ValidationCollection() { ; }
ValidationCollection() : mWriting(false) { ; }
bool addValidation(SerializedValidation::pointer);
ValidationSet getValidations(const uint256& ledger);
@@ -27,6 +34,7 @@ public:
int getTrustedValidationCount(const uint256& ledger);
int getCurrentValidationCount(uint32 afterTime);
boost::unordered_map<uint256, int> getCurrentValidations();
void flush();
};
#endif