Move all SQL operations on ledger close into the ledger code so we can

accept ledgers without having to participate in the consensus process. We'll
need this when we implement "catch up".

Move AcctTx into the same connection as Txn so they can be part of a single
transaction.

Dispatch ledger accept synchronization functions into a detached thread so it
doesn't stall our I/O engine.
This commit is contained in:
JoelKatz
2012-06-09 22:01:31 -07:00
parent 16f68296c4
commit 2d49cacd29
8 changed files with 52 additions and 52 deletions

View File

@@ -35,16 +35,15 @@ DatabaseCon::~DatabaseCon()
Application::Application() :
mUNL(mIOService),
mNetOps(mIOService, &mMasterLedger), mNodeCache(16384, 600),
mTxnDB(NULL), mAcctTxnDB(NULL), mLedgerDB(NULL), mWalletDB(NULL), mHashNodeDB(NULL), mNetNodeDB(NULL),
mTxnDB(NULL), mLedgerDB(NULL), mWalletDB(NULL), mHashNodeDB(NULL), mNetNodeDB(NULL),
mConnectionPool(mIOService), mPeerDoor(NULL), mRPCDoor(NULL)
{
RAND_bytes(mNonce256.begin(), mNonce256.size());
RAND_bytes(reinterpret_cast<unsigned char *>(&mNonceST), sizeof(mNonceST));
}
extern const char *AcctTxnDBInit[], *TxnDBInit[], *LedgerDBInit[], *WalletDBInit[],
*HashNodeDBInit[], *NetNodeDBInit[];
extern int TxnDBCount, AcctTxnDBCount, LedgerDBCount, WalletDBCount, HashNodeDBCount, NetNodeDBCount;
extern const char *TxnDBInit[], *LedgerDBInit[], *WalletDBInit[], *HashNodeDBInit[], *NetNodeDBInit[];
extern int TxnDBCount, LedgerDBCount, WalletDBCount, HashNodeDBCount, NetNodeDBCount;
void Application::stop()
{
@@ -61,7 +60,6 @@ void Application::run()
// Construct databases.
//
mTxnDB = new DatabaseCon("transaction.db", TxnDBInit, TxnDBCount);
mAcctTxnDB = new DatabaseCon("accttx.db", AcctTxnDBInit, AcctTxnDBCount);
mLedgerDB = new DatabaseCon("ledger.db", LedgerDBInit, LedgerDBCount);
mWalletDB = new DatabaseCon("wallet.db", WalletDBInit, WalletDBCount);
mHashNodeDB = new DatabaseCon("hashnode.db", HashNodeDBInit, HashNodeDBCount);
@@ -140,7 +138,6 @@ void Application::run()
Application::~Application()
{
delete mTxnDB;
delete mAcctTxnDB;
delete mLedgerDB;
delete mWalletDB;
delete mHashNodeDB;

View File

@@ -45,7 +45,7 @@ class Application
NetworkOPs mNetOps;
NodeCache mNodeCache;
DatabaseCon *mTxnDB, *mAcctTxnDB, *mLedgerDB, *mWalletDB, *mHashNodeDB, *mNetNodeDB;
DatabaseCon *mTxnDB, *mLedgerDB, *mWalletDB, *mHashNodeDB, *mNetNodeDB;
ConnectionPool mConnectionPool;
PeerDoor* mPeerDoor;
@@ -76,7 +76,6 @@ public:
NodeCache& getNodeCache() { return mNodeCache; }
DatabaseCon* getTxnDB() { return mTxnDB; }
DatabaseCon* getAcctTxnDB() { return mAcctTxnDB; }
DatabaseCon* getLedgerDB() { return mLedgerDB; }
DatabaseCon* getWalletDB() { return mWalletDB; }
DatabaseCon* getHashNodeDB() { return mHashNodeDB; }

View File

@@ -14,12 +14,7 @@ const char *TxnDBInit[] = {
"CREATE TABLE PubKeys ( \
ID CHARACTER(35) PRIMARY KEY, \
PubKey BLOB \
);"
};
int TxnDBCount = sizeof(TxnDBInit) / sizeof(const char *);
const char *AcctTxnDBInit[] = {
);",
"CREATE TABLE AccountTransactions ( \
TransID CHARACTER(64), \
Account CHARACTER(64), \
@@ -29,7 +24,7 @@ const char *AcctTxnDBInit[] = {
AccountTransactions(Account, LedgerSeq, TransID);"
};
int AcctTxnDBCount = sizeof(AcctTxnDBInit) / sizeof(const char *);
int TxnDBCount = sizeof(TxnDBInit) / sizeof(const char *);
// Ledger database holds ledgers and ledger confirmations
const char *LedgerDBInit[] = {

View File

@@ -16,6 +16,7 @@
#include "Wallet.h"
#include "BinaryFormats.h"
#include "LedgerTiming.h"
#include "Log.h"
Ledger::Ledger(const NewcoinAddress& masterID, uint64 startAmount) : mTotCoins(startAmount),
mCloseTime(0), mLedgerSeq(0), mLedgerInterval(LEDGER_INTERVAL), mClosed(false), mValidHash(false),
@@ -261,6 +262,41 @@ void Ledger::saveAcceptedLedger(Ledger::pointer ledger)
while(ledger->mAccountStateMap->flushDirty(64, ACCOUNT_NODE, ledger->mLedgerSeq))
{ ; }
SHAMap& txSet = *ledger->peekAccountStateMap();
Database *db = theApp->getTxnDB()->getDB();
ScopedLock dbLock = theApp->getTxnDB()->getDBLock();
db->executeSQL("BEGIN TRANSACTION;");
for (SHAMapItem::pointer item = txSet.peekFirstItem(); !!item; item = txSet.peekNextItem(item->getTag()))
{
SerializerIterator sit(item->peekSerializer());
SerializedTransaction txn(sit);
std::vector<NewcoinAddress> accts = txn.getAffectedAccounts();
std::string sql = "INSERT INTO AccountTransactions (TransID, Account, LedgerSeq) VALUES ";
bool first = true;
for (std::vector<NewcoinAddress>::iterator it = accts.begin(), end = accts.end(); it != end; ++it)
{
if (!first)
sql += ", ('";
else
{
sql += "('";
first = false;
}
sql += txn.getTransactionID().GetHex();
sql += "','";
sql += it->humanAccountID();
sql += "',";
sql += boost::lexical_cast<std::string>(ledger->getLedgerSeq());
sql += ")";
}
sql += ";";
Log(lsTRACE) << "ActTx: " << sql;
db->executeSQL(sql);
db->executeSQL(txn.getSQLInsertHeader() + txn.getSQL(ledger->getLedgerSeq(), TXN_SQL_VALIDATED) + ";");
// FIXME: If above updates no rows, modify seq/status
}
db->executeSQL("COMMIT TRANSACTION;");
}
Ledger::pointer Ledger::getSQL(const std::string& sql)

View File

@@ -727,40 +727,6 @@ void LedgerConsensus::accept(SHAMap::pointer set)
theApp->getConnectionPool().relayMessage(NULL, boost::make_shared<PackedMessage>(val, newcoin::mtVALIDATION));
Log(lsINFO) << "Validation sent " << newLCL->getHash().GetHex();
statusChange(newcoin::neACCEPTED_LEDGER, newOL);
// Insert the transactions in set into the AcctTxn database
Database *db = theApp->getAcctTxnDB()->getDB();
ScopedLock dbLock = theApp->getAcctTxnDB()->getDBLock();
db->executeSQL("BEGIN TRANSACTION;");
for (SHAMapItem::pointer item = set->peekFirstItem(); !!item; item = set->peekNextItem(item->getTag()))
{
SerializerIterator sit(item->peekSerializer());
SerializedTransaction txn(sit);
std::vector<NewcoinAddress> accts = txn.getAffectedAccounts();
std::string sql = "INSERT INTO AccountTransactions (TransID, Account, LedgerSeq) VALUES ";
bool first = true;
for (std::vector<NewcoinAddress>::iterator it = accts.begin(), end = accts.end(); it != end; ++it)
{
if (!first)
sql += ", ('";
else
{
sql += "('";
first = false;
}
sql += txn.getTransactionID().GetHex();
sql += "','";
sql += it->humanAccountID();
sql += "',";
sql += boost::lexical_cast<std::string>(newLedgerSeq);
sql += ")";
}
sql += ";";
Log(lsTRACE) << "ActTx: " << sql;
db->executeSQL(sql);
}
db->executeSQL("COMMIT TRANSACTION;");
}
void LedgerConsensus::endConsensus()

View File

@@ -2,6 +2,7 @@
#include <string>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include "LedgerHistory.h"
#include "Config.h"
@@ -32,7 +33,8 @@ void LedgerHistory::addAcceptedLedger(Ledger::pointer ledger)
boost::recursive_mutex::scoped_lock sl(mLedgersByHash.peekMutex());
mLedgersByHash.canonicalize(h, ledger);
mLedgersByIndex.insert(std::make_pair(ledger->getLedgerSeq(), ledger));
theApp->getIOService().post(boost::bind(&Ledger::saveAcceptedLedger, ledger));
boost::thread thread(boost::bind(&Ledger::saveAcceptedLedger, ledger));
thread.detach();
}
Ledger::pointer LedgerHistory::getLedgerBySeq(uint32 index)

View File

@@ -542,8 +542,8 @@ std::vector< std::pair<uint32, uint256> >
" WHERE Account = '%s' AND LedgerSeq <= '%d' AND LedgerSeq >= '%d' ORDER BY LedgerSeq LIMIT 1000")
% account.humanAccountID() % maxLedger % minLedger);
Database *db = theApp->getAcctTxnDB()->getDB();
ScopedLock dbLock = theApp->getAcctTxnDB()->getDBLock();
Database *db = theApp->getTxnDB()->getDB();
ScopedLock dbLock = theApp->getTxnDB()->getDBLock();
SQL_FOREACH(db, sql)
{

View File

@@ -10,6 +10,11 @@
#include "TransactionFormats.h"
#include "NewcoinAddress.h"
#define TXN_SQL_NEW 'N'
#define TXN_SQL_CONFLICT 'C'
#define TXN_SQL_HELD 'H'
#define TXN_SQL_VALIDATED 'V'
class SerializedTransaction : public SerializedType
{
public: