From 2d49cacd2961efe429bbd8ed24f1af95dabb6d85 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Sat, 9 Jun 2012 22:01:31 -0700 Subject: [PATCH] Move all SQL operations on ledger close into the ledger code so we can accept ledgers without having to participate in the consensus process. We'll need this when we implement "catch up". Move AcctTx into the same connection as Txn so they can be part of a single transaction. Dispatch ledger accept synchronization functions into a detached thread so it doesn't stall our I/O engine. --- src/Application.cpp | 9 +++------ src/Application.h | 3 +-- src/DBInit.cpp | 9 ++------- src/Ledger.cpp | 36 ++++++++++++++++++++++++++++++++++++ src/LedgerConsensus.cpp | 34 ---------------------------------- src/LedgerHistory.cpp | 4 +++- src/NetworkOPs.cpp | 4 ++-- src/SerializedTransaction.h | 5 +++++ 8 files changed, 52 insertions(+), 52 deletions(-) diff --git a/src/Application.cpp b/src/Application.cpp index 97e0f47d8c..2f05c39440 100644 --- a/src/Application.cpp +++ b/src/Application.cpp @@ -35,16 +35,15 @@ DatabaseCon::~DatabaseCon() Application::Application() : mUNL(mIOService), mNetOps(mIOService, &mMasterLedger), mNodeCache(16384, 600), - mTxnDB(NULL), mAcctTxnDB(NULL), mLedgerDB(NULL), mWalletDB(NULL), mHashNodeDB(NULL), mNetNodeDB(NULL), + mTxnDB(NULL), mLedgerDB(NULL), mWalletDB(NULL), mHashNodeDB(NULL), mNetNodeDB(NULL), mConnectionPool(mIOService), mPeerDoor(NULL), mRPCDoor(NULL) { RAND_bytes(mNonce256.begin(), mNonce256.size()); RAND_bytes(reinterpret_cast(&mNonceST), sizeof(mNonceST)); } -extern const char *AcctTxnDBInit[], *TxnDBInit[], *LedgerDBInit[], *WalletDBInit[], - *HashNodeDBInit[], *NetNodeDBInit[]; -extern int TxnDBCount, AcctTxnDBCount, LedgerDBCount, WalletDBCount, HashNodeDBCount, NetNodeDBCount; +extern const char *TxnDBInit[], *LedgerDBInit[], *WalletDBInit[], *HashNodeDBInit[], *NetNodeDBInit[]; +extern int TxnDBCount, LedgerDBCount, WalletDBCount, HashNodeDBCount, NetNodeDBCount; void Application::stop() { @@ -61,7 +60,6 @@ void Application::run() // Construct databases. // mTxnDB = new DatabaseCon("transaction.db", TxnDBInit, TxnDBCount); - mAcctTxnDB = new DatabaseCon("accttx.db", AcctTxnDBInit, AcctTxnDBCount); mLedgerDB = new DatabaseCon("ledger.db", LedgerDBInit, LedgerDBCount); mWalletDB = new DatabaseCon("wallet.db", WalletDBInit, WalletDBCount); mHashNodeDB = new DatabaseCon("hashnode.db", HashNodeDBInit, HashNodeDBCount); @@ -140,7 +138,6 @@ void Application::run() Application::~Application() { delete mTxnDB; - delete mAcctTxnDB; delete mLedgerDB; delete mWalletDB; delete mHashNodeDB; diff --git a/src/Application.h b/src/Application.h index 546d0aad51..1771aa63e4 100644 --- a/src/Application.h +++ b/src/Application.h @@ -45,7 +45,7 @@ class Application NetworkOPs mNetOps; NodeCache mNodeCache; - DatabaseCon *mTxnDB, *mAcctTxnDB, *mLedgerDB, *mWalletDB, *mHashNodeDB, *mNetNodeDB; + DatabaseCon *mTxnDB, *mLedgerDB, *mWalletDB, *mHashNodeDB, *mNetNodeDB; ConnectionPool mConnectionPool; PeerDoor* mPeerDoor; @@ -76,7 +76,6 @@ public: NodeCache& getNodeCache() { return mNodeCache; } DatabaseCon* getTxnDB() { return mTxnDB; } - DatabaseCon* getAcctTxnDB() { return mAcctTxnDB; } DatabaseCon* getLedgerDB() { return mLedgerDB; } DatabaseCon* getWalletDB() { return mWalletDB; } DatabaseCon* getHashNodeDB() { return mHashNodeDB; } diff --git a/src/DBInit.cpp b/src/DBInit.cpp index a2b5be277b..66a87f8af3 100644 --- a/src/DBInit.cpp +++ b/src/DBInit.cpp @@ -14,12 +14,7 @@ const char *TxnDBInit[] = { "CREATE TABLE PubKeys ( \ ID CHARACTER(35) PRIMARY KEY, \ PubKey BLOB \ - );" -}; - -int TxnDBCount = sizeof(TxnDBInit) / sizeof(const char *); - -const char *AcctTxnDBInit[] = { + );", "CREATE TABLE AccountTransactions ( \ TransID CHARACTER(64), \ Account CHARACTER(64), \ @@ -29,7 +24,7 @@ const char *AcctTxnDBInit[] = { AccountTransactions(Account, LedgerSeq, TransID);" }; -int AcctTxnDBCount = sizeof(AcctTxnDBInit) / sizeof(const char *); +int TxnDBCount = sizeof(TxnDBInit) / sizeof(const char *); // Ledger database holds ledgers and ledger confirmations const char *LedgerDBInit[] = { diff --git a/src/Ledger.cpp b/src/Ledger.cpp index a77b0e649c..278fa7c221 100644 --- a/src/Ledger.cpp +++ b/src/Ledger.cpp @@ -16,6 +16,7 @@ #include "Wallet.h" #include "BinaryFormats.h" #include "LedgerTiming.h" +#include "Log.h" Ledger::Ledger(const NewcoinAddress& masterID, uint64 startAmount) : mTotCoins(startAmount), mCloseTime(0), mLedgerSeq(0), mLedgerInterval(LEDGER_INTERVAL), mClosed(false), mValidHash(false), @@ -261,6 +262,41 @@ void Ledger::saveAcceptedLedger(Ledger::pointer ledger) while(ledger->mAccountStateMap->flushDirty(64, ACCOUNT_NODE, ledger->mLedgerSeq)) { ; } + SHAMap& txSet = *ledger->peekAccountStateMap(); + Database *db = theApp->getTxnDB()->getDB(); + ScopedLock dbLock = theApp->getTxnDB()->getDBLock(); + db->executeSQL("BEGIN TRANSACTION;"); + for (SHAMapItem::pointer item = txSet.peekFirstItem(); !!item; item = txSet.peekNextItem(item->getTag())) + { + SerializerIterator sit(item->peekSerializer()); + SerializedTransaction txn(sit); + std::vector accts = txn.getAffectedAccounts(); + + std::string sql = "INSERT INTO AccountTransactions (TransID, Account, LedgerSeq) VALUES "; + bool first = true; + for (std::vector::iterator it = accts.begin(), end = accts.end(); it != end; ++it) + { + if (!first) + sql += ", ('"; + else + { + sql += "('"; + first = false; + } + sql += txn.getTransactionID().GetHex(); + sql += "','"; + sql += it->humanAccountID(); + sql += "',"; + sql += boost::lexical_cast(ledger->getLedgerSeq()); + sql += ")"; + } + sql += ";"; + Log(lsTRACE) << "ActTx: " << sql; + db->executeSQL(sql); + db->executeSQL(txn.getSQLInsertHeader() + txn.getSQL(ledger->getLedgerSeq(), TXN_SQL_VALIDATED) + ";"); + // FIXME: If above updates no rows, modify seq/status + } + db->executeSQL("COMMIT TRANSACTION;"); } Ledger::pointer Ledger::getSQL(const std::string& sql) diff --git a/src/LedgerConsensus.cpp b/src/LedgerConsensus.cpp index fa2ecb1896..0fcc1ee132 100644 --- a/src/LedgerConsensus.cpp +++ b/src/LedgerConsensus.cpp @@ -727,40 +727,6 @@ void LedgerConsensus::accept(SHAMap::pointer set) theApp->getConnectionPool().relayMessage(NULL, boost::make_shared(val, newcoin::mtVALIDATION)); Log(lsINFO) << "Validation sent " << newLCL->getHash().GetHex(); statusChange(newcoin::neACCEPTED_LEDGER, newOL); - - // Insert the transactions in set into the AcctTxn database - Database *db = theApp->getAcctTxnDB()->getDB(); - ScopedLock dbLock = theApp->getAcctTxnDB()->getDBLock(); - db->executeSQL("BEGIN TRANSACTION;"); - for (SHAMapItem::pointer item = set->peekFirstItem(); !!item; item = set->peekNextItem(item->getTag())) - { - SerializerIterator sit(item->peekSerializer()); - SerializedTransaction txn(sit); - std::vector accts = txn.getAffectedAccounts(); - - std::string sql = "INSERT INTO AccountTransactions (TransID, Account, LedgerSeq) VALUES "; - bool first = true; - for (std::vector::iterator it = accts.begin(), end = accts.end(); it != end; ++it) - { - if (!first) - sql += ", ('"; - else - { - sql += "('"; - first = false; - } - sql += txn.getTransactionID().GetHex(); - sql += "','"; - sql += it->humanAccountID(); - sql += "',"; - sql += boost::lexical_cast(newLedgerSeq); - sql += ")"; - } - sql += ";"; - Log(lsTRACE) << "ActTx: " << sql; - db->executeSQL(sql); - } - db->executeSQL("COMMIT TRANSACTION;"); } void LedgerConsensus::endConsensus() diff --git a/src/LedgerHistory.cpp b/src/LedgerHistory.cpp index 3ea397d83a..ad357139ab 100644 --- a/src/LedgerHistory.cpp +++ b/src/LedgerHistory.cpp @@ -2,6 +2,7 @@ #include #include +#include #include "LedgerHistory.h" #include "Config.h" @@ -32,7 +33,8 @@ void LedgerHistory::addAcceptedLedger(Ledger::pointer ledger) boost::recursive_mutex::scoped_lock sl(mLedgersByHash.peekMutex()); mLedgersByHash.canonicalize(h, ledger); mLedgersByIndex.insert(std::make_pair(ledger->getLedgerSeq(), ledger)); - theApp->getIOService().post(boost::bind(&Ledger::saveAcceptedLedger, ledger)); + boost::thread thread(boost::bind(&Ledger::saveAcceptedLedger, ledger)); + thread.detach(); } Ledger::pointer LedgerHistory::getLedgerBySeq(uint32 index) diff --git a/src/NetworkOPs.cpp b/src/NetworkOPs.cpp index 0efcc2b30b..db14bbf94c 100644 --- a/src/NetworkOPs.cpp +++ b/src/NetworkOPs.cpp @@ -542,8 +542,8 @@ std::vector< std::pair > " WHERE Account = '%s' AND LedgerSeq <= '%d' AND LedgerSeq >= '%d' ORDER BY LedgerSeq LIMIT 1000") % account.humanAccountID() % maxLedger % minLedger); - Database *db = theApp->getAcctTxnDB()->getDB(); - ScopedLock dbLock = theApp->getAcctTxnDB()->getDBLock(); + Database *db = theApp->getTxnDB()->getDB(); + ScopedLock dbLock = theApp->getTxnDB()->getDBLock(); SQL_FOREACH(db, sql) { diff --git a/src/SerializedTransaction.h b/src/SerializedTransaction.h index 07b1dca83b..ddb78a6997 100644 --- a/src/SerializedTransaction.h +++ b/src/SerializedTransaction.h @@ -10,6 +10,11 @@ #include "TransactionFormats.h" #include "NewcoinAddress.h" +#define TXN_SQL_NEW 'N' +#define TXN_SQL_CONFLICT 'C' +#define TXN_SQL_HELD 'H' +#define TXN_SQL_VALIDATED 'V' + class SerializedTransaction : public SerializedType { public: