Add WS support for reporting accepted transactions.

This commit is contained in:
Arthur Britto
2012-07-01 00:40:56 -07:00
parent 1804530577
commit 85919c02f3
6 changed files with 100 additions and 32 deletions

View File

@@ -236,11 +236,11 @@ uint256 Ledger::getHash()
void Ledger::saveAcceptedLedger(Ledger::pointer ledger)
{
static boost::format ledgerExists("SELECT LedgerSeq FROM Ledgers where LedgerSeq = '%d';");
static boost::format deleteLedger("DELETE FROM Ledgers WHERE LedgerSeq = '%d';");
static boost::format ledgerExists("SELECT LedgerSeq FROM Ledgers where LedgerSeq = %d;");
static boost::format deleteLedger("DELETE FROM Ledgers WHERE LedgerSeq = %d;");
static boost::format AcctTransExists("SELECT LedgerSeq FROM AccountTransactions WHERE TransId = '%s';");
static boost::format transExists("SELECT Status from Transactions where TransID = '%s';");
static boost::format updateTx("UPDATE Transactions SET LedgerSeq = '%d', Status = '%c' WHERE TransID = '%s';");
static boost::format transExists("SELECT Status FROM Transactions WHERE TransID = '%s';");
static boost::format updateTx("UPDATE Transactions SET LedgerSeq = %d, Status = '%c' WHERE TransID = '%s';");
std::string sql="INSERT INTO Ledgers "
"(LedgerHash,LedgerSeq,PrevHash,TotalCoins,ClosingTime,AccountSetHash,TransSetHash) VALUES ('";
@@ -277,20 +277,12 @@ void Ledger::saveAcceptedLedger(Ledger::pointer ledger)
db->executeSQL("BEGIN TRANSACTION;");
for (SHAMapItem::pointer item = txSet.peekFirstItem(); !!item; item = txSet.peekNextItem(item->getTag()))
{
SerializedTransaction::pointer txn;
Transaction::pointer iTx = theApp->getMasterTransaction().fetch(item->getTag(), false);
if (!iTx)
{
SerializerIterator sit(item->peekSerializer());
txn = boost::make_shared<SerializedTransaction>(boost::ref(sit));
}
else
{
iTx->setStatus(COMMITTED, ledger->mLedgerSeq);
txn = iTx->getSTransaction();
}
SerializedTransaction::pointer txn = theApp->getMasterTransaction().fetch(item, false, ledger->mLedgerSeq);
// Make sure transaction is in AccountTransactions.
if (!SQL_EXISTS(db, boost::str(AcctTransExists % item->getTag().GetHex())))
{
// Transaction not in AccountTransactions
std::vector<NewcoinAddress> accts = txn->getAffectedAccounts();
std::string sql = "INSERT INTO AccountTransactions (TransID, Account, LedgerSeq) VALUES ";
@@ -315,12 +307,21 @@ void Ledger::saveAcceptedLedger(Ledger::pointer ledger)
Log(lsTRACE) << "ActTx: " << sql;
db->executeSQL(sql); // may already be in there
}
if (SQL_EXISTS(db, boost::str(transExists % txn->getTransactionID().GetHex())))
db->executeSQL(boost::str(updateTx % ledger->getLedgerSeq() % TXN_SQL_VALIDATED
{
// In Transactions, update LedgerSeq and Status.
db->executeSQL(boost::str(updateTx
% ledger->getLedgerSeq()
% TXN_SQL_VALIDATED
% txn->getTransactionID().GetHex()));
}
else
{
// Not in Transactions, insert the whole thing..
db->executeSQL(
txn->getSQLInsertHeader() + txn->getSQL(ledger->getLedgerSeq(), TXN_SQL_VALIDATED) + ";");
}
}
db->executeSQL("COMMIT TRANSACTION;");

View File

@@ -765,6 +765,35 @@ void NetworkOPs::pubLedger(const Ledger::pointer& lpAccepted)
}
}
}
{
boost::interprocess::sharable_lock<boost::interprocess::interprocess_upgradable_mutex> sl(mMonitorLock);
bool bAll = !mSubTransaction.empty();
bool bAccounts = !mSubAccountTransaction.empty();
if (bAll || bAccounts)
{
SHAMap& txSet = *lpAccepted->peekTransactionMap();
for (SHAMapItem::pointer item = txSet.peekFirstItem(); !!item; item = txSet.peekNextItem(item->getTag()))
{
SerializedTransaction::pointer stTxn = theApp->getMasterTransaction().fetch(item, false, 0);
// XXX Need to support other results.
// XXX Need to give failures too.
TransactionEngineResult terResult = terSUCCESS;
if (bAll)
{
pubTransactionAll(lpAccepted, *stTxn, terResult, "accepted");
}
if (bAccounts)
{
pubTransactionAccounts(lpAccepted, *stTxn, terResult, "accepted");
}
}
}
}
}
Json::Value NetworkOPs::transJson(const SerializedTransaction& stTxn, TransactionEngineResult terResult, const std::string& strStatus, int iSeq, const std::string& strType)
@@ -794,21 +823,18 @@ Json::Value NetworkOPs::transJson(const SerializedTransaction& stTxn, Transactio
return jvObj;
}
void NetworkOPs::pubTransaction(const Ledger::pointer& lpCurrent, const SerializedTransaction& stTxn, TransactionEngineResult terResult)
void NetworkOPs::pubTransactionAll(const Ledger::pointer& lpCurrent, const SerializedTransaction& stTxn, TransactionEngineResult terResult, const char* pState)
{
Json::Value jvObj = transJson(stTxn, terResult, pState, lpCurrent->getLedgerSeq(), "transaction");
BOOST_FOREACH(InfoSub* ispListener, mSubTransaction)
{
boost::interprocess::sharable_lock<boost::interprocess::interprocess_upgradable_mutex> sl(mMonitorLock);
if (!mSubTransaction.empty())
{
Json::Value jvObj = transJson(stTxn, terResult, "proposed", lpCurrent->getLedgerSeq(), "transaction");
BOOST_FOREACH(InfoSub* ispListener, mSubTransaction)
{
ispListener->send(jvObj);
}
}
ispListener->send(jvObj);
}
}
void NetworkOPs::pubTransactionAccounts(const Ledger::pointer& lpCurrent, const SerializedTransaction& stTxn, TransactionEngineResult terResult, const char* pState)
{
boost::unordered_set<InfoSub*> usisNotify;
{
@@ -833,7 +859,7 @@ void NetworkOPs::pubTransaction(const Ledger::pointer& lpCurrent, const Serializ
if (!usisNotify.empty())
{
Json::Value jvObj = transJson(stTxn, terResult, "proposed", lpCurrent->getLedgerSeq(), "account");
Json::Value jvObj = transJson(stTxn, terResult, pState, lpCurrent->getLedgerSeq(), "account");
BOOST_FOREACH(InfoSub* ispListener, usisNotify)
{
@@ -842,6 +868,21 @@ void NetworkOPs::pubTransaction(const Ledger::pointer& lpCurrent, const Serializ
}
}
void NetworkOPs::pubTransaction(const Ledger::pointer& lpCurrent, const SerializedTransaction& stTxn, TransactionEngineResult terResult)
{
boost::interprocess::sharable_lock<boost::interprocess::interprocess_upgradable_mutex> sl(mMonitorLock);
if (!mSubTransaction.empty())
{
pubTransactionAll(lpCurrent, stTxn, terResult, "proposed");
}
if (!mSubAccountTransaction.empty())
{
pubTransactionAccounts(lpCurrent, stTxn, terResult, "proposed");
}
}
//
// Monitoring
//

View File

@@ -67,6 +67,8 @@ protected:
// subInfoMapType mSubTransactionAccounts;
Json::Value transJson(const SerializedTransaction& stTxn, TransactionEngineResult terResult, const std::string& strStatus, int iSeq, const std::string& strType);
void pubTransactionAll(const Ledger::pointer& lpCurrent, const SerializedTransaction& stTxn, TransactionEngineResult terResult, const char* pState);
void pubTransactionAccounts(const Ledger::pointer& lpCurrent, const SerializedTransaction& stTxn, TransactionEngineResult terResult, const char* pState);
public:
NetworkOPs(boost::asio::io_service& io_service, LedgerMaster* pLedgerMaster);

View File

@@ -27,6 +27,29 @@ Transaction::pointer TransactionMaster::fetch(const uint256& txnID, bool checkDi
if (!txn) return txn;
mCache.canonicalize(txnID, txn);
return txn;
}
SerializedTransaction::pointer TransactionMaster::fetch(const SHAMapItem::pointer& item, bool checkDisk, uint32 uCommitLedger)
{
SerializedTransaction::pointer txn;
Transaction::pointer iTx = theApp->getMasterTransaction().fetch(item->getTag(), false);
if (!iTx)
{
SerializerIterator sit(item->peekSerializer());
txn = boost::make_shared<SerializedTransaction>(boost::ref(sit));
}
else
{
if (uCommitLedger)
iTx->setStatus(COMMITTED, uCommitLedger);
txn = iTx->getSTransaction();
}
return txn;
}

View File

@@ -15,7 +15,8 @@ public:
TransactionMaster();
Transaction::pointer fetch(const uint256&, bool checkDisk);
Transaction::pointer fetch(const uint256&, bool checkDisk);
SerializedTransaction::pointer fetch(const SHAMapItem::pointer& item, bool checkDisk, uint32 uCommitLedger);
// return value: true = we had the transaction already
bool canonicalize(Transaction::pointer& txn, bool maybeNew);

View File

@@ -135,7 +135,7 @@ public:
{
try
{
Log(lsINFO) << "Ws:: Sending '" << strMessage << "'";
// Log(lsINFO) << "Ws:: Sending '" << strMessage << "'";
cpClient->send(strMessage);
}
@@ -149,7 +149,7 @@ public:
{
Json::FastWriter jfwWriter;
Log(lsINFO) << "Ws:: Object '" << jfwWriter.write(jvObj) << "'";
// Log(lsINFO) << "Ws:: Object '" << jfwWriter.write(jvObj) << "'";
send(cpClient, jfwWriter.write(jvObj));
}