From 197a19b996888984018a6bff53058e54b1a50244 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Sat, 9 Jun 2012 19:12:06 -0700 Subject: [PATCH 1/3] Beginning transaction persistance. --- src/DBInit.cpp | 5 +---- src/SerializedTransaction.cpp | 30 ++++++++++++++++++++++++++++++ src/SerializedTransaction.h | 8 ++++++++ src/Transaction.cpp | 1 + 4 files changed, 40 insertions(+), 4 deletions(-) diff --git a/src/DBInit.cpp b/src/DBInit.cpp index 0c4826e943..a2b5be277b 100644 --- a/src/DBInit.cpp +++ b/src/DBInit.cpp @@ -7,10 +7,7 @@ const char *TxnDBInit[] = { TransType CHARACTER(24) \ FromAcct CHARACTER(35), \ FromSeq BIGINT UNSIGNED, \ - OtherAcct CHARACTER(40), \ - Amount BIGINT UNSIGNED, \ - FirstSeen TEXT, \ - CommitSeq BIGINT UNSIGNED, \ + LedgerSeq BIGINT UNSIGNED, \ Status CHARACTER(1), \ RawTxn BLOB \ );", diff --git a/src/SerializedTransaction.cpp b/src/SerializedTransaction.cpp index d5b7aac3e1..a5fce40f25 100644 --- a/src/SerializedTransaction.cpp +++ b/src/SerializedTransaction.cpp @@ -1,5 +1,6 @@ #include "SerializedTransaction.h" +#include "Application.h" #include "Log.h" @@ -297,4 +298,33 @@ Json::Value SerializedTransaction::getJson(int options) const ret["Inner"] = mInnerTxn.getJson(options); return ret; } + +std::string SerializedTransaction::getSQLValueHeader() +{ + return "(TransID, TransType, FromAcct, FromSeq, CommitSeq, Status, RawTxn)"; +} + +std::string SerializedTransaction::getSQLInsertHeader() +{ + return "INSERT INTO Transactions " + getSQLValueHeader() + " VALUES "; +} + +std::string SerializedTransaction::getSQL(uint32 inLedger, char status) const +{ + Serializer s; + add(s); + return getSQL(s, inLedger, status); +} + +std::string SerializedTransaction::getSQL(Serializer rawTxn, uint32 inLedger, char status) const +{ + std::string rTxn; + theApp->getTxnDB()->getDB()->escape( + reinterpret_cast(rawTxn.getDataPtr()), rawTxn.getLength(), rTxn); + return str(boost::format("('%s', '%s', '%s', %d, %d, %c, '%s')") + % getTransactionID().GetHex() % getTransactionType() % getSourceAccount().humanAccountID() + % getSequence() % inLedger % status % rTxn); +} + + // vim:ts=4 diff --git a/src/SerializedTransaction.h b/src/SerializedTransaction.h index 024bbfa548..07b1dca83b 100644 --- a/src/SerializedTransaction.h +++ b/src/SerializedTransaction.h @@ -116,6 +116,14 @@ public: bool sign(const NewcoinAddress& naAccountPrivate); bool checkSign(const NewcoinAddress& naAccountPublic) const; + + // SQL Functions + static std::string getSQLValueHeader(); + static std::string getSQLInsertHeader(); + std::string getSQL(std::string& sql, uint32 inLedger, char status) const; + std::string getSQL(uint32 inLedger, char status) const; + std::string getSQL(Serializer rawTxn, uint32 inLedger, char status) const; + }; #endif diff --git a/src/Transaction.cpp b/src/Transaction.cpp index 328a7aa6fc..a724516a16 100644 --- a/src/Transaction.cpp +++ b/src/Transaction.cpp @@ -510,6 +510,7 @@ void Transaction::saveTransaction(Transaction::pointer txn) bool Transaction::save() const { // This code needs to be fixed to support new-style transactions - FIXME + // This code is going away. It will be handled from SerializedTransaction #if 0 // Identify minimums fields to write for now. // Also maybe write effected accounts for use later. From 16f68296c4d49e6a9d705d7ef1939b93c22ec02d Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Sat, 9 Jun 2012 19:13:09 -0700 Subject: [PATCH 2/3] Avoid warnings. --- src/RPCServer.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/RPCServer.cpp b/src/RPCServer.cpp index a641cf7728..a2f161e693 100644 --- a/src/RPCServer.cpp +++ b/src/RPCServer.cpp @@ -517,7 +517,7 @@ Json::Value RPCServer::doAccountInfo(Json::Value ¶ms) // account_lines || [] Json::Value RPCServer::doAccountLines(Json::Value ¶ms) { - uint256 uClosed = mNetOps->getClosedLedger(); +// uint256 uClosed = mNetOps->getClosedLedger(); uint256 uCurrent = mNetOps->getCurrentLedger(); std::string strIdent = params[0u].asString(); From 2d49cacd2961efe429bbd8ed24f1af95dabb6d85 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Sat, 9 Jun 2012 22:01:31 -0700 Subject: [PATCH 3/3] Move all SQL operations on ledger close into the ledger code so we can accept ledgers without having to participate in the consensus process. We'll need this when we implement "catch up". Move AcctTx into the same connection as Txn so they can be part of a single transaction. Dispatch ledger accept synchronization functions into a detached thread so it doesn't stall our I/O engine. --- src/Application.cpp | 9 +++------ src/Application.h | 3 +-- src/DBInit.cpp | 9 ++------- src/Ledger.cpp | 36 ++++++++++++++++++++++++++++++++++++ src/LedgerConsensus.cpp | 34 ---------------------------------- src/LedgerHistory.cpp | 4 +++- src/NetworkOPs.cpp | 4 ++-- src/SerializedTransaction.h | 5 +++++ 8 files changed, 52 insertions(+), 52 deletions(-) diff --git a/src/Application.cpp b/src/Application.cpp index 97e0f47d8c..2f05c39440 100644 --- a/src/Application.cpp +++ b/src/Application.cpp @@ -35,16 +35,15 @@ DatabaseCon::~DatabaseCon() Application::Application() : mUNL(mIOService), mNetOps(mIOService, &mMasterLedger), mNodeCache(16384, 600), - mTxnDB(NULL), mAcctTxnDB(NULL), mLedgerDB(NULL), mWalletDB(NULL), mHashNodeDB(NULL), mNetNodeDB(NULL), + mTxnDB(NULL), mLedgerDB(NULL), mWalletDB(NULL), mHashNodeDB(NULL), mNetNodeDB(NULL), mConnectionPool(mIOService), mPeerDoor(NULL), mRPCDoor(NULL) { RAND_bytes(mNonce256.begin(), mNonce256.size()); RAND_bytes(reinterpret_cast(&mNonceST), sizeof(mNonceST)); } -extern const char *AcctTxnDBInit[], *TxnDBInit[], *LedgerDBInit[], *WalletDBInit[], - *HashNodeDBInit[], *NetNodeDBInit[]; -extern int TxnDBCount, AcctTxnDBCount, LedgerDBCount, WalletDBCount, HashNodeDBCount, NetNodeDBCount; +extern const char *TxnDBInit[], *LedgerDBInit[], *WalletDBInit[], *HashNodeDBInit[], *NetNodeDBInit[]; +extern int TxnDBCount, LedgerDBCount, WalletDBCount, HashNodeDBCount, NetNodeDBCount; void Application::stop() { @@ -61,7 +60,6 @@ void Application::run() // Construct databases. // mTxnDB = new DatabaseCon("transaction.db", TxnDBInit, TxnDBCount); - mAcctTxnDB = new DatabaseCon("accttx.db", AcctTxnDBInit, AcctTxnDBCount); mLedgerDB = new DatabaseCon("ledger.db", LedgerDBInit, LedgerDBCount); mWalletDB = new DatabaseCon("wallet.db", WalletDBInit, WalletDBCount); mHashNodeDB = new DatabaseCon("hashnode.db", HashNodeDBInit, HashNodeDBCount); @@ -140,7 +138,6 @@ void Application::run() Application::~Application() { delete mTxnDB; - delete mAcctTxnDB; delete mLedgerDB; delete mWalletDB; delete mHashNodeDB; diff --git a/src/Application.h b/src/Application.h index 546d0aad51..1771aa63e4 100644 --- a/src/Application.h +++ b/src/Application.h @@ -45,7 +45,7 @@ class Application NetworkOPs mNetOps; NodeCache mNodeCache; - DatabaseCon *mTxnDB, *mAcctTxnDB, *mLedgerDB, *mWalletDB, *mHashNodeDB, *mNetNodeDB; + DatabaseCon *mTxnDB, *mLedgerDB, *mWalletDB, *mHashNodeDB, *mNetNodeDB; ConnectionPool mConnectionPool; PeerDoor* mPeerDoor; @@ -76,7 +76,6 @@ public: NodeCache& getNodeCache() { return mNodeCache; } DatabaseCon* getTxnDB() { return mTxnDB; } - DatabaseCon* getAcctTxnDB() { return mAcctTxnDB; } DatabaseCon* getLedgerDB() { return mLedgerDB; } DatabaseCon* getWalletDB() { return mWalletDB; } DatabaseCon* getHashNodeDB() { return mHashNodeDB; } diff --git a/src/DBInit.cpp b/src/DBInit.cpp index a2b5be277b..66a87f8af3 100644 --- a/src/DBInit.cpp +++ b/src/DBInit.cpp @@ -14,12 +14,7 @@ const char *TxnDBInit[] = { "CREATE TABLE PubKeys ( \ ID CHARACTER(35) PRIMARY KEY, \ PubKey BLOB \ - );" -}; - -int TxnDBCount = sizeof(TxnDBInit) / sizeof(const char *); - -const char *AcctTxnDBInit[] = { + );", "CREATE TABLE AccountTransactions ( \ TransID CHARACTER(64), \ Account CHARACTER(64), \ @@ -29,7 +24,7 @@ const char *AcctTxnDBInit[] = { AccountTransactions(Account, LedgerSeq, TransID);" }; -int AcctTxnDBCount = sizeof(AcctTxnDBInit) / sizeof(const char *); +int TxnDBCount = sizeof(TxnDBInit) / sizeof(const char *); // Ledger database holds ledgers and ledger confirmations const char *LedgerDBInit[] = { diff --git a/src/Ledger.cpp b/src/Ledger.cpp index a77b0e649c..278fa7c221 100644 --- a/src/Ledger.cpp +++ b/src/Ledger.cpp @@ -16,6 +16,7 @@ #include "Wallet.h" #include "BinaryFormats.h" #include "LedgerTiming.h" +#include "Log.h" Ledger::Ledger(const NewcoinAddress& masterID, uint64 startAmount) : mTotCoins(startAmount), mCloseTime(0), mLedgerSeq(0), mLedgerInterval(LEDGER_INTERVAL), mClosed(false), mValidHash(false), @@ -261,6 +262,41 @@ void Ledger::saveAcceptedLedger(Ledger::pointer ledger) while(ledger->mAccountStateMap->flushDirty(64, ACCOUNT_NODE, ledger->mLedgerSeq)) { ; } + SHAMap& txSet = *ledger->peekAccountStateMap(); + Database *db = theApp->getTxnDB()->getDB(); + ScopedLock dbLock = theApp->getTxnDB()->getDBLock(); + db->executeSQL("BEGIN TRANSACTION;"); + for (SHAMapItem::pointer item = txSet.peekFirstItem(); !!item; item = txSet.peekNextItem(item->getTag())) + { + SerializerIterator sit(item->peekSerializer()); + SerializedTransaction txn(sit); + std::vector accts = txn.getAffectedAccounts(); + + std::string sql = "INSERT INTO AccountTransactions (TransID, Account, LedgerSeq) VALUES "; + bool first = true; + for (std::vector::iterator it = accts.begin(), end = accts.end(); it != end; ++it) + { + if (!first) + sql += ", ('"; + else + { + sql += "('"; + first = false; + } + sql += txn.getTransactionID().GetHex(); + sql += "','"; + sql += it->humanAccountID(); + sql += "',"; + sql += boost::lexical_cast(ledger->getLedgerSeq()); + sql += ")"; + } + sql += ";"; + Log(lsTRACE) << "ActTx: " << sql; + db->executeSQL(sql); + db->executeSQL(txn.getSQLInsertHeader() + txn.getSQL(ledger->getLedgerSeq(), TXN_SQL_VALIDATED) + ";"); + // FIXME: If above updates no rows, modify seq/status + } + db->executeSQL("COMMIT TRANSACTION;"); } Ledger::pointer Ledger::getSQL(const std::string& sql) diff --git a/src/LedgerConsensus.cpp b/src/LedgerConsensus.cpp index fa2ecb1896..0fcc1ee132 100644 --- a/src/LedgerConsensus.cpp +++ b/src/LedgerConsensus.cpp @@ -727,40 +727,6 @@ void LedgerConsensus::accept(SHAMap::pointer set) theApp->getConnectionPool().relayMessage(NULL, boost::make_shared(val, newcoin::mtVALIDATION)); Log(lsINFO) << "Validation sent " << newLCL->getHash().GetHex(); statusChange(newcoin::neACCEPTED_LEDGER, newOL); - - // Insert the transactions in set into the AcctTxn database - Database *db = theApp->getAcctTxnDB()->getDB(); - ScopedLock dbLock = theApp->getAcctTxnDB()->getDBLock(); - db->executeSQL("BEGIN TRANSACTION;"); - for (SHAMapItem::pointer item = set->peekFirstItem(); !!item; item = set->peekNextItem(item->getTag())) - { - SerializerIterator sit(item->peekSerializer()); - SerializedTransaction txn(sit); - std::vector accts = txn.getAffectedAccounts(); - - std::string sql = "INSERT INTO AccountTransactions (TransID, Account, LedgerSeq) VALUES "; - bool first = true; - for (std::vector::iterator it = accts.begin(), end = accts.end(); it != end; ++it) - { - if (!first) - sql += ", ('"; - else - { - sql += "('"; - first = false; - } - sql += txn.getTransactionID().GetHex(); - sql += "','"; - sql += it->humanAccountID(); - sql += "',"; - sql += boost::lexical_cast(newLedgerSeq); - sql += ")"; - } - sql += ";"; - Log(lsTRACE) << "ActTx: " << sql; - db->executeSQL(sql); - } - db->executeSQL("COMMIT TRANSACTION;"); } void LedgerConsensus::endConsensus() diff --git a/src/LedgerHistory.cpp b/src/LedgerHistory.cpp index 3ea397d83a..ad357139ab 100644 --- a/src/LedgerHistory.cpp +++ b/src/LedgerHistory.cpp @@ -2,6 +2,7 @@ #include #include +#include #include "LedgerHistory.h" #include "Config.h" @@ -32,7 +33,8 @@ void LedgerHistory::addAcceptedLedger(Ledger::pointer ledger) boost::recursive_mutex::scoped_lock sl(mLedgersByHash.peekMutex()); mLedgersByHash.canonicalize(h, ledger); mLedgersByIndex.insert(std::make_pair(ledger->getLedgerSeq(), ledger)); - theApp->getIOService().post(boost::bind(&Ledger::saveAcceptedLedger, ledger)); + boost::thread thread(boost::bind(&Ledger::saveAcceptedLedger, ledger)); + thread.detach(); } Ledger::pointer LedgerHistory::getLedgerBySeq(uint32 index) diff --git a/src/NetworkOPs.cpp b/src/NetworkOPs.cpp index 0efcc2b30b..db14bbf94c 100644 --- a/src/NetworkOPs.cpp +++ b/src/NetworkOPs.cpp @@ -542,8 +542,8 @@ std::vector< std::pair > " WHERE Account = '%s' AND LedgerSeq <= '%d' AND LedgerSeq >= '%d' ORDER BY LedgerSeq LIMIT 1000") % account.humanAccountID() % maxLedger % minLedger); - Database *db = theApp->getAcctTxnDB()->getDB(); - ScopedLock dbLock = theApp->getAcctTxnDB()->getDBLock(); + Database *db = theApp->getTxnDB()->getDB(); + ScopedLock dbLock = theApp->getTxnDB()->getDBLock(); SQL_FOREACH(db, sql) { diff --git a/src/SerializedTransaction.h b/src/SerializedTransaction.h index 07b1dca83b..ddb78a6997 100644 --- a/src/SerializedTransaction.h +++ b/src/SerializedTransaction.h @@ -10,6 +10,11 @@ #include "TransactionFormats.h" #include "NewcoinAddress.h" +#define TXN_SQL_NEW 'N' +#define TXN_SQL_CONFLICT 'C' +#define TXN_SQL_HELD 'H' +#define TXN_SQL_VALIDATED 'V' + class SerializedTransaction : public SerializedType { public: