diff --git a/src/HashedObject.cpp b/src/HashedObject.cpp index 837a518a6..c1df0b3fa 100644 --- a/src/HashedObject.cpp +++ b/src/HashedObject.cpp @@ -36,7 +36,7 @@ bool HashedObjectStore::store(HashedObjectType type, uint32 index, if (!mCache.canonicalize(hash, object)) { // cLog(lsTRACE) << "Queuing write for " << hash; - boost::recursive_mutex::scoped_lock sl(mWriteMutex); + boost::mutex::scoped_lock sl(mWriteMutex); mWriteSet.push_back(object); if (!mWritePending) { @@ -50,6 +50,13 @@ bool HashedObjectStore::store(HashedObjectType type, uint32 index, return true; } +void HashedObjectStore::waitWrite() +{ + boost::unique_lock sl(mWriteMutex); + while (mWritePending) + mWriteCondition.wait(sl); +} + void HashedObjectStore::bulkWrite() { std::vector< boost::shared_ptr > set; @@ -59,11 +66,12 @@ void HashedObjectStore::bulkWrite() set.reserve(128); { - boost::recursive_mutex::scoped_lock sl(mWriteMutex); + boost::unique_lock sl(mWriteMutex); mWriteSet.swap(set); if (set.empty()) { mWritePending = false; + mWriteCondition.notify_all(); return; } } diff --git a/src/HashedObject.h b/src/HashedObject.h index f403d5042..5178af089 100644 --- a/src/HashedObject.h +++ b/src/HashedObject.h @@ -3,6 +3,9 @@ #include +#include +#include + #include "types.h" #include "uint256.h" #include "ScopedLock.h" @@ -41,7 +44,9 @@ class HashedObjectStore protected: TaggedCache mCache; - boost::recursive_mutex mWriteMutex; + boost::mutex mWriteMutex; + boost::condition_variable mWriteCondition; + std::vector< boost::shared_ptr > mWriteSet; bool mWritePending; @@ -55,6 +60,7 @@ public: HashedObject::pointer retrieve(const uint256& hash); void bulkWrite(); + void waitWrite(); }; #endif diff --git a/src/Ledger.cpp b/src/Ledger.cpp index b21958e58..ceabf6048 100644 --- a/src/Ledger.cpp +++ b/src/Ledger.cpp @@ -363,13 +363,9 @@ void Ledger::saveAcceptedLedger() { ScopedLock sl(theApp->getLedgerDB()->getDBLock()); + if (SQL_EXISTS(theApp->getLedgerDB()->getDB(), boost::str(ledgerExists % mLedgerSeq))) theApp->getLedgerDB()->getDB()->executeSQL(boost::str(deleteLedger % mLedgerSeq)); - theApp->getLedgerDB()->getDB()->executeSQL(boost::str(addLedger % - getHash().GetHex() % mLedgerSeq % mParentHash.GetHex() % - boost::lexical_cast(mTotCoins) % mCloseTime % mParentCloseTime % - mCloseResolution % mCloseFlags % - mAccountHash.GetHex() % mTransHash.GetHex())); // write out dirty nodes int fc; @@ -435,6 +431,13 @@ void Ledger::saveAcceptedLedger() } } db->executeSQL("COMMIT TRANSACTION;"); + + theApp->getHashedObjectStore().waitWrite(); // wait until all nodes are written + theApp->getLedgerDB()->getDB()->executeSQL(boost::str(addLedger % + getHash().GetHex() % mLedgerSeq % mParentHash.GetHex() % + boost::lexical_cast(mTotCoins) % mCloseTime % mParentCloseTime % + mCloseResolution % mCloseFlags % + mAccountHash.GetHex() % mTransHash.GetHex())); } theApp->getOPs().pubLedger(shared_from_this()); diff --git a/src/LedgerConsensus.cpp b/src/LedgerConsensus.cpp index 83dcbefef..6f4d0aac0 100644 --- a/src/LedgerConsensus.cpp +++ b/src/LedgerConsensus.cpp @@ -259,7 +259,7 @@ void LedgerConsensus::checkLCL() uint256 netLgr = mPrevLedgerHash; int netLgrCount = 0; - uint256 favoredLedger = (mState == lcsPRE_CLOSE) ? uint256() : mPrevLedgerHash; // Don't get stuck one ledger behind + uint256 favoredLedger = (mState == lcsPRE_CLOSE) ? uint256() : mPrevLedgerHash; // Don't get stuck one ledger back boost::unordered_map vals = theApp->getValidations().getCurrentValidations(favoredLedger); @@ -287,15 +287,18 @@ void LedgerConsensus::checkLCL() << status << ", " << (mHaveCorrectLCL ? "CorrectLCL" : "IncorrectLCL"); cLog(lsWARNING) << mPrevLedgerHash << " to " << netLgr; -#ifdef DEBUG - BOOST_FOREACH(u256_cvc_pair& it, vals) - cLog(lsDEBUG) << "V: " << it.first << ", " << it.second.first; -#endif + if (sLog(lsDEBUG)) + { + BOOST_FOREACH(u256_cvc_pair& it, vals) + cLog(lsDEBUG) << "V: " << it.first << ", " << it.second.first; + } if (mHaveCorrectLCL) theApp->getOPs().consensusViewChange(); handleLCL(netLgr); } + else if (mPreviousLedger->getHash() != mPrevLedgerHash) + handleLCL(netLgr); } void LedgerConsensus::handleLCL(const uint256& lclHash) @@ -356,6 +359,7 @@ void LedgerConsensus::handleLCL(const uint256& lclHash) cLog(lsINFO) << "Acquired the consensus ledger " << mPrevLedgerHash; mHaveCorrectLCL = true; mAcquiringLedger = LedgerAcquire::pointer(); + theApp->getOPs().clearNeedNetworkLedger(); mCloseResolution = ContinuousLedgerTiming::getNextLedgerTimeResolution( mPreviousLedger->getCloseResolution(), mPreviousLedger->getCloseAgree(), mPreviousLedger->getLedgerSeq() + 1); @@ -810,7 +814,9 @@ void LedgerConsensus::propose() { cLog(lsTRACE) << "We propose: " << mOurPosition->getCurrentHash(); ripple::TMProposeSet prop; + prop.set_currenttxhash(mOurPosition->getCurrentHash().begin(), 256 / 8); + prop.set_previousledger(mOurPosition->getPrevLedger().begin(), 256 / 8); prop.set_proposeseq(mOurPosition->getProposeSeq()); prop.set_closetime(mOurPosition->getCloseTime()); @@ -958,18 +964,12 @@ void LedgerConsensus::Saccept(boost::shared_ptr This, SHAMap::p This->accept(txSet); } -void LedgerConsensus::deferProposal(const LedgerProposal::pointer& proposal, const NewcoinAddress& peerPublic) -{ - std::list& props = mDeferredProposals[peerPublic.getNodeID()]; - if (props.size() >= (mPreviousProposers + 10)) - props.pop_front(); - props.push_back(proposal); -} - void LedgerConsensus::playbackProposals() { + boost::unordered_map >& storedProposals = theApp->getOPs().peekStoredProposals(); for (boost::unordered_map< uint160, std::list >::iterator - it = mDeferredProposals.begin(), end = mDeferredProposals.end(); it != end; ++it) + it = storedProposals.begin(), end = storedProposals.end(); it != end; ++it) { BOOST_FOREACH(const LedgerProposal::pointer& proposal, it->second) { @@ -978,7 +978,7 @@ void LedgerConsensus::playbackProposals() proposal->setPrevLedger(mPrevLedgerHash); if (proposal->checkSign()) { - cLog(lsINFO) << "Applying deferred proposal"; + cLog(lsINFO) << "Applying stored proposal"; peerPosition(proposal); } } diff --git a/src/LedgerConsensus.h b/src/LedgerConsensus.h index ffe8db5d3..a88aee079 100644 --- a/src/LedgerConsensus.h +++ b/src/LedgerConsensus.h @@ -113,9 +113,6 @@ protected: // Close time estimates std::map mCloseTimes; - // deferred proposals (node ID -> proposals from that peer) - boost::unordered_map< uint160, std::list > mDeferredProposals; - // nodes that have bowed out of this consensus process boost::unordered_set mDeadNodes; @@ -180,16 +177,12 @@ public: bool haveConsensus(); bool peerPosition(const LedgerProposal::pointer&); - void deferProposal(const LedgerProposal::pointer& proposal, const NewcoinAddress& peerPublic); bool peerHasSet(Peer::ref peer, const uint256& set, ripple::TxSetStatus status); bool peerGaveNodes(Peer::ref peer, const uint256& setHash, const std::list& nodeIDs, const std::list< std::vector >& nodeData); - void swapDefer(boost::unordered_map< uint160, std::list > &n) - { mDeferredProposals.swap(n); } - // test/debug void simulate(); }; diff --git a/src/NetworkOPs.cpp b/src/NetworkOPs.cpp index 1e6050469..477cca227 100644 --- a/src/NetworkOPs.cpp +++ b/src/NetworkOPs.cpp @@ -646,8 +646,11 @@ int NetworkOPs::beginConsensus(const uint256& networkClosed, Ledger::pointer clo Ledger::pointer prevLedger = mLedgerMaster->getLedgerByHash(closingLedger->getParentHash()); if (!prevLedger) { // this shouldn't happen unless we jump ledgers - cLog(lsWARNING) << "Don't have LCL, going to tracking"; - setMode(omTRACKING); + if (mMode == omFULL) + { + cLog(lsWARNING) << "Don't have LCL, going to tracking"; + setMode(omTRACKING); + } return 3; } assert(prevLedger->getHash() == closingLedger->getParentHash()); @@ -658,7 +661,6 @@ int NetworkOPs::beginConsensus(const uint256& networkClosed, Ledger::pointer clo prevLedger->setImmutable(); mConsensus = boost::make_shared( networkClosed, prevLedger, mLedgerMaster->getCurrentLedger()->getCloseTimeNC()); - mConsensus->swapDefer(mDeferredProposals); cLog(lsDEBUG) << "Initiating consensus engine"; return mConsensus->startup(); @@ -728,7 +730,7 @@ bool NetworkOPs::recvPropose(uint32 proposeSeq, const uint256& proposeHash, cons } if (prevLedger == mConsensus->getLCL()) return mConsensus->peerPosition(proposal); - mConsensus->deferProposal(proposal, nodePublic); + storeProposal(proposal, nodePublic); return false; } @@ -738,7 +740,7 @@ bool NetworkOPs::recvPropose(uint32 proposeSeq, const uint256& proposeHash, cons { // Note that if the LCL is different, the signature check will fail cLog(lsWARNING) << "Ledger proposal fails signature check"; proposal->setSignature(signature); - mConsensus->deferProposal(proposal, nodePublic); + storeProposal(proposal, nodePublic); return false; } return mConsensus->peerPosition(proposal); @@ -785,7 +787,6 @@ void NetworkOPs::endConsensus(bool correctLCL) cLog(lsTRACE) << "Killing obsolete peer status"; it->cycleStatus(); } - mConsensus->swapDefer(mDeferredProposals); mConsensus = boost::shared_ptr(); } @@ -874,8 +875,13 @@ Json::Value NetworkOPs::getServerInfo() default: info["serverState"] = "unknown"; } - if (!theConfig.VALIDATION_SEED.isValid()) info["serverState"] = "none"; - else info["validationPKey"] = NewcoinAddress::createNodePublic(theConfig.VALIDATION_SEED).humanNodePublic(); + if (!theConfig.VALIDATION_SEED.isValid()) + info["serverState"] = "none"; + else + info["validationPKey"] = NewcoinAddress::createNodePublic(theConfig.VALIDATION_SEED).humanNodePublic(); + + if (mNeedNetworkLedger) + info["networkLedger"] = "waiting"; Json::Value lastClose = Json::objectValue; lastClose["proposers"] = theApp->getOPs().getPreviousProposers(); @@ -992,12 +998,12 @@ void NetworkOPs::pubLedger(Ledger::ref lpAccepted) if (bAll) { - pubTransactionAll(lpAccepted, *stTxn, terResult, "closed"); + pubTransactionAll(lpAccepted, *stTxn, terResult, true); } if (bAccounts) { - pubTransactionAccounts(lpAccepted, *stTxn, terResult, "closed"); + pubTransactionAccounts(lpAccepted, *stTxn, terResult, true); } } } @@ -1099,12 +1105,12 @@ void NetworkOPs::pubTransaction(Ledger::ref lpCurrent, const SerializedTransacti if (!mSubTransaction.empty()) { - pubTransactionAll(lpCurrent, stTxn, terResult, "proposed"); + pubTransactionAll(lpCurrent, stTxn, terResult, false); } if (!mSubAccountTransaction.empty()) { - pubTransactionAccounts(lpCurrent, stTxn, terResult, "proposed"); + pubTransactionAccounts(lpCurrent, stTxn, terResult, false); } } @@ -1244,6 +1250,14 @@ uint32 NetworkOPs::acceptLedger() return mLedgerMaster->getCurrentLedger()->getLedgerSeq(); } +void NetworkOPs::storeProposal(const LedgerProposal::pointer& proposal, const NewcoinAddress& peerPublic) +{ + std::list& props = mStoredProposals[peerPublic.getNodeID()]; + if (props.size() >= (mLastCloseProposers + 10)) + props.pop_front(); + props.push_back(proposal); +} + #if 0 void NetworkOPs::subAccountChanges(InfoSub* ispListener, const uint256 uLedgerHash) { diff --git a/src/NetworkOPs.h b/src/NetworkOPs.h index 931364054..cd2e8bede 100644 --- a/src/NetworkOPs.h +++ b/src/NetworkOPs.h @@ -57,7 +57,7 @@ protected: boost::asio::deadline_timer mNetTimer; boost::shared_ptr mConsensus; boost::unordered_map > mDeferredProposals; + std::list > mStoredProposals; LedgerMaster* mLedgerMaster; LedgerAcquire::pointer mAcquiringLedger; @@ -186,6 +186,8 @@ public: void setStateTimer(); void newLCL(int proposers, int convergeTime, const uint256& ledgerHash); void needNetworkLedger() { mNeedNetworkLedger = true; } + void clearNeedNetworkLedger() { mNeedNetworkLedger = false; } + bool isNeedNetworkLedger() { return mNeedNetworkLedger; } void consensusViewChange(); int getPreviousProposers() { return mLastCloseProposers; } int getPreviousConvergeTime() { return mLastCloseConvergeTime; } @@ -193,6 +195,9 @@ public: void setLastCloseTime(uint32 t) { mLastCloseTime = t; } Json::Value getServerInfo(); uint32 acceptLedger(); + boost::unordered_map >& peekStoredProposals() { return mStoredProposals; } + void storeProposal(const LedgerProposal::pointer& proposal, const NewcoinAddress& peerPublic); // client information retrieval functions std::vector< std::pair > diff --git a/src/SerializeProto.h b/src/SerializeProto.h index df7d2f6d5..e3b594db7 100644 --- a/src/SerializeProto.h +++ b/src/SerializeProto.h @@ -25,6 +25,7 @@ // 8-bit integers FIELD(CloseResolution, UINT8, 1) FIELD(TemplateEntryType, UINT8, 2) + FIELD(TransactionResult, UINT8, 3) // 16-bit integers FIELD(LedgerEntryType, UINT16, 1) @@ -92,8 +93,14 @@ FIELD(SendMax, AMOUNT, 9) // currency amount (uncommon) - FIELD(MinimumOffer, AMOUNT, 16) - FIELD(RippleEscrow, AMOUNT, 17) + FIELD(MinimumOffer, AMOUNT, 16) + FIELD(RippleEscrow, AMOUNT, 17) + FIELD(PreviousBalance, AMOUNT, 18) + FIELD(FinalBalance, AMOUNT, 19) + FIELD(PreviousTakerPays, AMOUNT, 20) + FIELD(PreviousTakerGets, AMOUNT, 21) + FIELD(FinalTakerPays, AMOUNT, 22) + FIELD(FinalTakerGets, AMOUNT, 23) // variable length FIELD(PublicKey, VL, 1) @@ -116,6 +123,11 @@ FIELD(Target, ACCOUNT, 7) FIELD(AuthorizedKey, ACCOUNT, 8) + // account (uncommon) + FIELD(PreviousAccount, ACCOUNT, 16) + FIELD(LowID, ACCOUNT, 17) + FIELD(HighID, ACCOUNT, 18) + // path set FIELD(Paths, PATHSET, 1) @@ -125,9 +137,13 @@ // inner object // OBJECT/1 is reserved for end of object FIELD(TemplateEntry, OBJECT, 1) + FIELD(CreatedNode, OBJECT, 2) + FIELD(DeletedNode, OBJECT, 3) + FIELD(ModifiedNode, OBJECT, 4) // array of objects // ARRAY/1 is reserved for end of array + FIELD(TransactionMetaData, ARRAY, 1) FIELD(SigningAccounts, ARRAY, 2) FIELD(TxnSignatures, ARRAY, 3) FIELD(Signatures, ARRAY, 4) diff --git a/src/SerializedObject.cpp b/src/SerializedObject.cpp index 678b1add3..c8d243952 100644 --- a/src/SerializedObject.cpp +++ b/src/SerializedObject.cpp @@ -866,6 +866,11 @@ STArray* STArray::construct(SerializerIterator& sit, SField::ref field) return new STArray(field, value); } +void STArray::sort(bool (*compare)(const STObject&, const STObject&)) +{ + std::sort(value.begin(), value.end(), compare); +} + std::auto_ptr STObject::parseJson(const Json::Value& object, SField::ref inName, int depth) { if (!object.isObject()) diff --git a/src/SerializedObject.h b/src/SerializedObject.h index 7ecd539eb..f2c389f09 100644 --- a/src/SerializedObject.h +++ b/src/SerializedObject.h @@ -152,9 +152,9 @@ class STArray : public SerializedType { public: typedef std::vector vector; - typedef std::vector::iterator iterator; + typedef std::vector::iterator iterator; typedef std::vector::const_iterator const_iterator; - typedef std::vector::reverse_iterator reverse_iterator; + typedef std::vector::reverse_iterator reverse_iterator; typedef std::vector::const_reverse_iterator const_reverse_iterator; typedef std::vector::size_type size_type; @@ -205,6 +205,8 @@ public: virtual Json::Value getJson(int) const; virtual void add(Serializer& s) const; + void sort(bool (*compare)(const STObject& o1, const STObject& o2)); + bool operator==(const STArray &s) { return value == s.value; } bool operator!=(const STArray &s) { return value != s.value; } diff --git a/src/TransactionErr.h b/src/TransactionErr.h index 97a9b1525..652a23a21 100644 --- a/src/TransactionErr.h +++ b/src/TransactionErr.h @@ -108,11 +108,12 @@ enum TER // aka TransactionEngineResult tepPATH_PARTIAL, }; +#define isTelLocal(x) ((x) >= telLOCAL_ERROR && (x) < temMALFORMED) #define isTemMalformed(x) ((x) >= temMALFORMED && (x) < tefFAILURE) #define isTefFailure(x) ((x) >= tefFAILURE && (x) < terRETRY) -#define isTepPartial(x) ((x) >= tepPATH_PARTIAL) -#define isTepSuccess(x) ((x) >= tesSUCCESS) #define isTerRetry(x) ((x) >= terRETRY && (x) < tesSUCCESS) +#define isTepSuccess(x) ((x) >= tesSUCCESS) +#define isTepPartial(x) ((x) >= tepPATH_PARTIAL) bool transResultInfo(TER terCode, std::string& strToken, std::string& strHuman); std::string transToken(TER terCode); diff --git a/src/TransactionMeta.cpp b/src/TransactionMeta.cpp index 3e0cd4035..d1372ebdc 100644 --- a/src/TransactionMeta.cpp +++ b/src/TransactionMeta.cpp @@ -280,7 +280,7 @@ Json::Value TransactionMetaSet::getJson(int v) const { Json::Value ret = Json::objectValue; - ret["transaction_id"] = mTransactionID.GetHex(); + ret["hash"] = mTransactionID.GetHex(); ret["ledger"] = mLedger; Json::Value e = Json::arrayValue; @@ -330,3 +330,4 @@ void TransactionMetaSet::swap(TransactionMetaSet& s) mNodes.swap(s.mNodes); } +// vim:ts=4 diff --git a/src/WSDoor.cpp b/src/WSDoor.cpp index d0c23e2c7..0085026b4 100644 --- a/src/WSDoor.cpp +++ b/src/WSDoor.cpp @@ -17,6 +17,8 @@ #include "../json/reader.h" #include "../json/writer.h" +SETUP_LOG(); + // // This is a light weight, untrusted interface for web clients. // For now we don't provide proof. Later we will. @@ -145,7 +147,7 @@ public: { try { - // Log(lsINFO) << "Ws:: Sending '" << strMessage << "'"; + cLog(lsDEBUG) << "Ws:: Sending '" << strMessage << "'"; cpClient->send(strMessage); } @@ -159,7 +161,7 @@ public: { Json::FastWriter jfwWriter; - // Log(lsINFO) << "Ws:: Object '" << jfwWriter.write(jvObj) << "'"; + cLog(lsDEBUG) << "Ws:: Object '" << jfwWriter.write(jvObj) << "'"; send(cpClient, jfwWriter.write(jvObj)); }