diff --git a/js/events.js b/js/events.js new file mode 100644 index 000000000..7a2098f98 --- /dev/null +++ b/js/events.js @@ -0,0 +1,57 @@ +var EventEmitter = function () { + this._events = {}; +}; + +EventEmitter.prototype.on = function (e, f) { + console.log('on', e, f) + if (e in this._events) { + if (this._events[e].indexOf(f) < 0) { + this._events[e].push(f); + } + } else { + this._events[e] = [f]; + } + return this; +}; + +EventEmitter.prototype.off = function (e, f) { + if (f) { + function eq(x) { return function (y) { return x === y; } } + this._events[e] = this.listeners(e).filter(eq(f)); + } else { + delete this._events[e]; + } +}; + +EventEmitter.prototype.removeListener = function (e, f) { + this.off(e, f); +}; + +EventEmitter.prototype.removeAllListeners = function (e) { + this.off(e); +}; + +EventEmitter.prototype.emit = function (e) { + var args = Array.prototype.slice.call(arguments, 1), + fs = this.listeners(e); + console.log('emit', e, args) + + for (var i = 0; i < fs.length; i++) { + fs[i].apply(e, args); + } +}; + +EventEmitter.prototype.listeners = function (e) { + return this._events[e] || []; +}; + +EventEmitter.prototype.once = function (e, f) { + var that = this; + this.on(e, function g() { + f.apply(e, arguments); + that.off(e, g); + }); + return this; +}; + +exports.EventEmitter = EventEmitter; \ No newline at end of file diff --git a/src/Application.h b/src/Application.h index fca7a5301..da956fae2 100644 --- a/src/Application.h +++ b/src/Application.h @@ -102,6 +102,7 @@ public: bool isNew(const uint256& s) { return mSuppressions.addSuppression(s); } bool isNew(const uint256& s, uint64 p) { return mSuppressions.addSuppressionPeer(s, p); } + bool isNew(const uint256& s, uint64 p, int& f) { return mSuppressions.addSuppressionPeer(s, p, f); } bool isNewFlag(const uint256& s, int f) { return mSuppressions.setFlag(s, f); } bool running() { return mTxnDB != NULL; } bool getSystemTimeOffset(int& offset) { return mSNTPClient.getOffset(offset); } diff --git a/src/CanonicalTXSet.h b/src/CanonicalTXSet.h index c75fb6b1b..fead540db 100644 --- a/src/CanonicalTXSet.h +++ b/src/CanonicalTXSet.h @@ -39,6 +39,13 @@ public: CanonicalTXSet(const uint256& lclHash) : mSetHash(lclHash) { ; } void push_back(SerializedTransaction::ref txn); + + void reset(const uint256& newLCL) + { + mSetHash = newLCL; + mMap.clear(); + } + iterator erase(const iterator& it); iterator begin() { return mMap.begin(); } diff --git a/src/HashedObject.cpp b/src/HashedObject.cpp index 9fc95a825..ee992abdb 100644 --- a/src/HashedObject.cpp +++ b/src/HashedObject.cpp @@ -21,7 +21,6 @@ HashedObjectStore::HashedObjectStore(int cacheSize, int cacheAge) : bool HashedObjectStore::store(HashedObjectType type, uint32 index, const std::vector& data, const uint256& hash) { // return: false = already in cache, true = added to cache - assert(hash == Serializer::getSHA512Half(data)); if (!theApp->getHashNodeDB()) { cLog(lsTRACE) << "HOS: no db"; @@ -32,6 +31,7 @@ bool HashedObjectStore::store(HashedObjectType type, uint32 index, cLog(lsTRACE) << "HOS: " << hash << " store: incache"; return false; } + assert(hash == Serializer::getSHA512Half(data)); HashedObject::pointer object = boost::make_shared(type, index, data, hash); if (!mCache.canonicalize(hash, object)) diff --git a/src/LedgerAcquire.cpp b/src/LedgerAcquire.cpp index f9d9f1bef..50497b3cd 100644 --- a/src/LedgerAcquire.cpp +++ b/src/LedgerAcquire.cpp @@ -170,7 +170,7 @@ void LedgerAcquire::addPeers() if (!found) { BOOST_FOREACH(Peer::ref peer, peerList) - peerHas(peer); + peerHas(peer); } } diff --git a/src/LedgerConsensus.cpp b/src/LedgerConsensus.cpp index fa52fddbb..48939b5cd 100644 --- a/src/LedgerConsensus.cpp +++ b/src/LedgerConsensus.cpp @@ -35,16 +35,43 @@ void TransactionAcquire::done() { if (mFailed) { - cLog(lsWARNING) << "Failed to acquire TXs " << mHash; + cLog(lsWARNING) << "Failed to acquire TX set " << mHash; theApp->getOPs().mapComplete(mHash, SHAMap::pointer()); } else { + cLog(lsINFO) << "Acquired TX set " << mHash; mMap->setImmutable(); theApp->getOPs().mapComplete(mHash, mMap); } } +void TransactionAcquire::onTimer() +{ + if (!getPeerCount()) + { // out of peers + cLog(lsWARNING) << "Out of peers for TX set " << getHash(); + + bool found = false; + std::vector peerList = theApp->getConnectionPool().getPeerVector(); + BOOST_FOREACH(Peer::ref peer, peerList) + { + if (peer->hasTxSet(getHash())) + { + found = true; + peerHas(peer); + } + } + if (!found) + { + BOOST_FOREACH(Peer::ref peer, peerList) + peerHas(peer); + } + } + else + trigger(Peer::pointer(), true); +} + boost::weak_ptr TransactionAcquire::pmDowncast() { return boost::shared_polymorphic_downcast(shared_from_this()); @@ -59,6 +86,7 @@ void TransactionAcquire::trigger(Peer::ref peer, bool timer) } if (!mHaveRoot) { + cLog(lsTRACE) << "TransactionAcquire::trigger " << (peer ? "havePeer" : "noPeer") << " no root"; ripple::TMGetLedger tmGL; tmGL.set_ledgerhash(mHash.begin(), mHash.size()); tmGL.set_itype(ripple::liTS_CANDIDATE); @@ -98,9 +126,15 @@ bool TransactionAcquire::takeNodes(const std::list& nodeIDs, const std::list< std::vector >& data, Peer::ref peer) { if (mComplete) + { + cLog(lsTRACE) << "TX set complete"; return true; + } if (mFailed) + { + cLog(lsTRACE) << "TX set failed"; return false; + } try { std::list::const_iterator nodeIDit = nodeIDs.begin(); @@ -116,12 +150,18 @@ bool TransactionAcquire::takeNodes(const std::list& nodeIDs, return false; } if (!mMap->addRootNode(getHash(), *nodeDatait, snfWIRE, NULL)) + { + cLog(lsWARNING) << "TX acquire got bad root node"; return false; + } else mHaveRoot = true; } else if (!mMap->addKnownNode(*nodeIDit, *nodeDatait, &sf)) + { + cLog(lsWARNING) << "TX acquire got bad non-root node"; return false; + } ++nodeIDit; ++nodeDatait; } @@ -323,7 +363,6 @@ void LedgerConsensus::handleLCL(const uint256& lclHash) mProposing = false; mValidating = false; mPeerPositions.clear(); - mPeerData.clear(); mDisputes.clear(); mCloseTimes.clear(); mDeadNodes.clear(); @@ -537,7 +576,7 @@ void LedgerConsensus::closeLedger() mCloseTime = theApp->getOPs().getCloseTimeNC(); theApp->getOPs().setLastCloseTime(mCloseTime); statusChange(ripple::neCLOSING_LEDGER, *mPreviousLedger); - takeInitialPosition(*theApp->getMasterLedger().closeLedger()); + takeInitialPosition(*theApp->getMasterLedger().closeLedger(true)); } void LedgerConsensus::stateEstablish() @@ -842,7 +881,7 @@ void LedgerConsensus::addDisputedTransaction(const uint256& txID, const std::vec { boost::unordered_map::const_iterator cit = mAcquired.find(pit.second->getCurrentHash()); - if (cit != mAcquired.end() && cit->second) + if ((cit != mAcquired.end()) && cit->second) txn->setVote(pit.first, cit->second->hasItem(txID)); } @@ -927,7 +966,10 @@ bool LedgerConsensus::peerGaveNodes(Peer::ref peer, const uint256& setHash, { boost::unordered_map::iterator acq = mAcquiring.find(setHash); if (acq == mAcquiring.end()) + { + cLog(lsINFO) << "Got TX data for set not acquiring: " << setHash; return false; + } TransactionAcquire::pointer set = acq->second; // We must keep the set around during the function return set->takeNodes(nodeIDs, nodeData, peer); } @@ -956,6 +998,7 @@ void LedgerConsensus::playbackProposals() for (boost::unordered_map< uint160, std::list >::iterator it = storedProposals.begin(), end = storedProposals.end(); it != end; ++it) { + bool relay = false; BOOST_FOREACH(const LedgerProposal::pointer& proposal, it->second) { if (proposal->hasSignature()) @@ -964,11 +1007,33 @@ void LedgerConsensus::playbackProposals() if (proposal->checkSign()) { cLog(lsINFO) << "Applying stored proposal"; - peerPosition(proposal); + relay = peerPosition(proposal); } } else if (proposal->isPrevLedger(mPrevLedgerHash)) - peerPosition(proposal); + relay = peerPosition(proposal); + + if (relay) + { + cLog(lsWARNING) << "We should do delayed relay of this proposal, but we cannot"; + } +#if 0 // FIXME: We can't do delayed relay because we don't have the signature + std::set peers + if (relay && theApp->getSuppression().swapSet(proposal.getSuppress(), set, SF_RELAYED)) + { + cLog(lsDEBUG) << "Stored proposal delayed relay"; + ripple::TMProposeSet set; + set.set_proposeseq + set.set_currenttxhash(, 256 / 8); + previousledger + closetime + nodepubkey + signature + PackedMessage::pointer message = boost::make_shared(set, ripple::mtPROPOSE_LEDGER); + theApp->getConnectionPool().relayMessageBut(peers, message); + } +#endif + } } } diff --git a/src/LedgerConsensus.h b/src/LedgerConsensus.h index 6eae25750..8ac72634a 100644 --- a/src/LedgerConsensus.h +++ b/src/LedgerConsensus.h @@ -30,7 +30,7 @@ protected: SHAMap::pointer mMap; bool mHaveRoot; - void onTimer() { trigger(Peer::pointer(), true); } + void onTimer(); void newPeer(Peer::ref peer) { trigger(peer, false); } void done(); diff --git a/src/LedgerEntrySet.cpp b/src/LedgerEntrySet.cpp index 3224f07eb..fa3872ea9 100644 --- a/src/LedgerEntrySet.cpp +++ b/src/LedgerEntrySet.cpp @@ -447,9 +447,8 @@ void LedgerEntrySet::calcRawMeta(Serializer& s, TER result) it != end; ++it) entryModify(it->second); - cLog(lsTRACE) << "Metadata:" << mSet.getJson(0); - mSet.addRaw(s, result); + cLog(lsTRACE) << "Metadata:" << mSet.getJson(0); } // <-- uNodeDir: For deletion, present to make dirDelete efficient. diff --git a/src/LedgerMaster.cpp b/src/LedgerMaster.cpp index aa46d42e3..6c3a2ebc9 100644 --- a/src/LedgerMaster.cpp +++ b/src/LedgerMaster.cpp @@ -14,10 +14,10 @@ uint32 LedgerMaster::getCurrentLedgerIndex() return mCurrentLedger->getLedgerSeq(); } -bool LedgerMaster::addHeldTransaction(const Transaction::pointer& transaction) +void LedgerMaster::addHeldTransaction(const Transaction::pointer& transaction) { // returns true if transaction was added boost::recursive_mutex::scoped_lock ml(mLock); - return mHeldTransactionsByID.insert(std::make_pair(transaction->getID(), transaction)).second; + mHeldTransactions.push_back(transaction->getSTransaction()); } void LedgerMaster::pushLedger(Ledger::ref newLedger) @@ -79,12 +79,34 @@ void LedgerMaster::storeLedger(Ledger::ref ledger) } -Ledger::pointer LedgerMaster::closeLedger() +Ledger::pointer LedgerMaster::closeLedger(bool recover) { boost::recursive_mutex::scoped_lock sl(mLock); Ledger::pointer closingLedger = mCurrentLedger; + + if (recover) + { + int recovers = 0; + for (CanonicalTXSet::iterator it = mHeldTransactions.begin(), end = mHeldTransactions.end(); it != end; ++it) + { + try + { + TER result = mEngine.applyTransaction(*it->second, tapOPEN_LEDGER); + if (isTepSuccess(result)) + ++recovers; + } + catch (...) + { + cLog(lsWARNING) << "Held transaction throws"; + } + } + tLog(recovers != 0, lsINFO) << "Recovered " << recovers << " held transactions"; + mHeldTransactions.reset(closingLedger->getHash()); + } + mCurrentLedger = boost::make_shared(boost::ref(*closingLedger), true); mEngine.setLedger(mCurrentLedger); + return closingLedger; } diff --git a/src/LedgerMaster.h b/src/LedgerMaster.h index e3270710e..152952b50 100644 --- a/src/LedgerMaster.h +++ b/src/LedgerMaster.h @@ -9,6 +9,7 @@ #include "Transaction.h" #include "TransactionEngine.h" #include "RangeSet.h" +#include "CanonicalTXSet.h" // Tracks the current ledger and any ledgers in the process of closing // Tracks ledger history @@ -25,7 +26,7 @@ class LedgerMaster LedgerHistory mLedgerHistory; - std::map mHeldTransactionsByID; + CanonicalTXSet mHeldTransactions; RangeSet mCompleteLedgers; LedgerAcquire::pointer mMissingLedger; @@ -40,7 +41,7 @@ class LedgerMaster public: - LedgerMaster() : mMissingSeq(0) { ; } + LedgerMaster() : mHeldTransactions(uint256()), mMissingSeq(0) { ; } uint32 getCurrentLedgerIndex(); @@ -64,7 +65,7 @@ public: std::string getCompleteLedgers() { return mCompleteLedgers.toString(); } - Ledger::pointer closeLedger(); + Ledger::pointer closeLedger(bool recoverHeldTransactions); Ledger::pointer getLedgerBySeq(uint32 index) { @@ -90,7 +91,7 @@ public: void setLedgerRangePresent(uint32 minV, uint32 maxV) { mCompleteLedgers.setRange(minV, maxV); } - bool addHeldTransaction(const Transaction::pointer& trans); + void addHeldTransaction(const Transaction::pointer& trans); void sweep(void) { mLedgerHistory.sweep(); } }; diff --git a/src/LedgerProposal.cpp b/src/LedgerProposal.cpp index 75e286e9e..5f5f05873 100644 --- a/src/LedgerProposal.cpp +++ b/src/LedgerProposal.cpp @@ -10,8 +10,9 @@ DECLARE_INSTANCE(LedgerProposal); LedgerProposal::LedgerProposal(const uint256& pLgr, uint32 seq, const uint256& tx, uint32 closeTime, - const RippleAddress& naPeerPublic) : - mPreviousLedger(pLgr), mCurrentHash(tx), mCloseTime(closeTime), mProposeSeq(seq), mPublicKey(naPeerPublic) + const RippleAddress& naPeerPublic, const uint256& suppression) : + mPreviousLedger(pLgr), mCurrentHash(tx), mSuppression(suppression), mCloseTime(closeTime), + mProposeSeq(seq), mPublicKey(naPeerPublic) { // XXX Validate key. // if (!mKey->SetPubKey(pubKey)) @@ -26,7 +27,7 @@ LedgerProposal::LedgerProposal(const RippleAddress& naPub, const RippleAddress& const uint256& prevLgr, const uint256& position, uint32 closeTime) : mPreviousLedger(prevLgr), mCurrentHash(position), mCloseTime(closeTime), mProposeSeq(0), mPublicKey(naPub), mPrivateKey(naPriv) -{ // OPTIMIZEME: This is expensive. We create both the public and private keys separately each time +{ mPeerID = mPublicKey.getNodeID(); mTime = boost::posix_time::second_clock::universal_time(); } diff --git a/src/LedgerProposal.h b/src/LedgerProposal.h index a531de214..b5176c7c9 100644 --- a/src/LedgerProposal.h +++ b/src/LedgerProposal.h @@ -18,7 +18,7 @@ class LedgerProposal : private IS_INSTANCE(LedgerProposal) { protected: - uint256 mPreviousLedger, mCurrentHash; + uint256 mPreviousLedger, mCurrentHash, mSuppression; uint32 mCloseTime, mProposeSeq; uint160 mPeerID; @@ -35,7 +35,7 @@ public: // proposal from peer LedgerProposal(const uint256& prevLgr, uint32 proposeSeq, const uint256& propose, - uint32 closeTime, const RippleAddress& naPeerPublic); + uint32 closeTime, const RippleAddress& naPeerPublic, const uint256& suppress); // our first proposal LedgerProposal(const RippleAddress& pubKey, const RippleAddress& privKey, @@ -52,6 +52,7 @@ public: const uint160& getPeerID() const { return mPeerID; } const uint256& getCurrentHash() const { return mCurrentHash; } const uint256& getPrevLedger() const { return mPreviousLedger; } + const uint256& getSuppression() const { return mSuppression; } uint32 getProposeSeq() const { return mProposeSeq; } uint32 getCloseTime() const { return mCloseTime; } const RippleAddress& peekPublic() const { return mPublicKey; } diff --git a/src/NetworkOPs.cpp b/src/NetworkOPs.cpp index adccade64..21ce90666 100644 --- a/src/NetworkOPs.cpp +++ b/src/NetworkOPs.cpp @@ -115,19 +115,36 @@ Transaction::pointer NetworkOPs::submitTransaction(const Transaction::pointer& t Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans) { Transaction::pointer dbtx = theApp->getMasterTransaction().fetch(trans->getID(), true); - if (dbtx) return dbtx; + if (dbtx) + return dbtx; - if (!trans->checkSign()) - { - cLog(lsINFO) << "Transaction has bad signature"; + int newFlags = theApp->getSuppression().getFlags(trans->getID()); + if ((newFlags & SF_BAD) != 0) + { // cached bad trans->setStatus(INVALID); return trans; } - TER r = mLedgerMaster->doTransaction(*trans->getSTransaction(), tapOPEN_LEDGER); + if ((newFlags & SF_SIGGOOD) == 0) + { // signature not checked + if (!trans->checkSign()) + { + cLog(lsINFO) << "Transaction has bad signature"; + trans->setStatus(INVALID); + theApp->isNewFlag(trans->getID(), SF_BAD); + return trans; + } + theApp->isNewFlag(trans->getID(), SF_SIGGOOD); + } + TER r = mLedgerMaster->doTransaction(*trans->getSTransaction(), tapOPEN_LEDGER | tapNO_CHECK_SIGN); trans->setResult(r); + if (isTemMalformed(r)) // malformed, cache bad + theApp->isNewFlag(trans->getID(), SF_BAD); + else if(isTelLocal(r) || isTerRetry(r)) // can be retried + theApp->isNewFlag(trans->getID(), SF_RETRY); + #ifdef DEBUG if (r != tesSUCCESS) { @@ -139,9 +156,9 @@ Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans) if (r == tefFAILURE) throw Fault(IO_ERROR); - if (r == terPRE_SEQ) + if (isTerRetry(r)) { // transaction should be held - cLog(lsDEBUG) << "Transaction should be held"; + cLog(lsDEBUG) << "Transaction should be held: " << r; trans->setStatus(HELD); theApp->getMasterTransaction().canonicalize(trans, true); mLedgerMaster->addHeldTransaction(trans); @@ -154,12 +171,24 @@ Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans) return trans; } + bool relay = true; + if (r == tesSUCCESS) { - cLog(lsINFO) << "Transaction is now included"; + cLog(lsINFO) << "Transaction is now included in open ledger"; trans->setStatus(INCLUDED); theApp->getMasterTransaction().canonicalize(trans, true); + } + else + { + cLog(lsDEBUG) << "Status other than success " << r; + if (mMode == omFULL) + relay = false; + trans->setStatus(INVALID); + } + if (relay) + { std::set peers; if (theApp->getSuppression().swapSet(trans->getID(), peers, SF_RELAYED)) { @@ -168,32 +197,13 @@ Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans) trans->getSTransaction()->add(s); tx.set_rawtransaction(&s.getData().front(), s.getLength()); tx.set_status(ripple::tsCURRENT); - tx.set_receivetimestamp(getNetworkTimeNC()); + tx.set_receivetimestamp(getNetworkTimeNC()); // FIXME: This should be when we received it PackedMessage::pointer packet = boost::make_shared(tx, ripple::mtTRANSACTION); theApp->getConnectionPool().relayMessageBut(peers, packet); } - - return trans; } - cLog(lsDEBUG) << "Status other than success " << r; - std::set peers; - - if ((mMode != omFULL) && (mMode != omTRACKING) && - theApp->getSuppression().swapSet(trans->getID(), peers, SF_RELAYED)) - { - ripple::TMTransaction tx; - Serializer s; - trans->getSTransaction()->add(s); - tx.set_rawtransaction(&s.getData().front(), s.getLength()); - tx.set_status(ripple::tsCURRENT); - tx.set_receivetimestamp(getNetworkTimeNC()); - PackedMessage::pointer packet = boost::make_shared(tx, ripple::mtTRANSACTION); - theApp->getConnectionPool().relayMessageTo(peers, packet); - } - - trans->setStatus(INVALID); return trans; } @@ -694,60 +704,55 @@ bool NetworkOPs::haveConsensusObject() return mConsensus; } -// <-- bool: true to relay -bool NetworkOPs::recvPropose(const uint256& suppression, uint32 proposeSeq, const uint256& proposeHash, - const uint256& prevLedger, uint32 closeTime, const std::string& signature, - const RippleAddress& nodePublic) +uint256 NetworkOPs::getConsensusLCL() { - // JED: does mConsensus need to be locked? + if (!haveConsensusObject()) + return uint256(); + return mConsensus->getLCL(); +} - // XXX Validate key. - // XXX Take a vuc for pubkey. +void NetworkOPs::processTrustedProposal(LedgerProposal::pointer proposal, + boost::shared_ptr set, RippleAddress nodePublic, uint256 checkLedger, bool sigGood) +{ + bool relay = true; if (!haveConsensusObject()) { cLog(lsINFO) << "Received proposal outside consensus window"; - return mMode != omFULL; + if (mMode == omFULL) + relay = false; } - - if (mConsensus->isOurPubKey(nodePublic)) + else { - cLog(lsTRACE) << "Received our own validation"; - return false; - } + storeProposal(proposal, nodePublic); - // Is this node on our UNL? - if (!theApp->getUNL().nodeInUNL(nodePublic)) - { - cLog(lsINFO) << "Untrusted proposal: " << nodePublic.humanNodePublic() << " " << proposeHash; - return true; - } + uint256 consensusLCL = mConsensus->getLCL(); - if (prevLedger.isNonZero()) - { // proposal includes a previous ledger - LedgerProposal::pointer proposal = - boost::make_shared(prevLedger, proposeSeq, proposeHash, closeTime, nodePublic); - if (!proposal->checkSign(signature)) + if (!set->has_previousledger() && (checkLedger != consensusLCL)) { - cLog(lsWARNING) << "New-style ledger proposal fails signature check"; - return false; + cLog(lsWARNING) << "Have to re-check proposal signature due to consensus view change"; + assert(proposal->hasSignature()); + proposal->setPrevLedger(consensusLCL); + if (proposal->checkSign()) + sigGood = true; + } + + if (sigGood && (consensusLCL == proposal->getPrevLedger())) + { + relay = mConsensus->peerPosition(proposal); + cLog(lsTRACE) << "Proposal processing finished, relay=" << relay; } - if (prevLedger == mConsensus->getLCL()) - return mConsensus->peerPosition(proposal); - storeProposal(proposal, nodePublic); - return false; } - LedgerProposal::pointer proposal = - boost::make_shared(mConsensus->getLCL(), proposeSeq, proposeHash, closeTime, nodePublic); - if (!proposal->checkSign(signature)) - { // Note that if the LCL is different, the signature check will fail - cLog(lsWARNING) << "Ledger proposal fails signature check"; - proposal->setSignature(signature); - storeProposal(proposal, nodePublic); - return false; + if (relay) + { + std::set peers; + theApp->getSuppression().swapSet(proposal->getSuppression(), peers, SF_RELAYED); + PackedMessage::pointer message = boost::make_shared(*set, ripple::mtPROPOSE_LEDGER); + theApp->getConnectionPool().relayMessageBut(peers, message); } - return mConsensus->peerPosition(proposal); + else + cLog(lsINFO) << "Not relaying trusted proposal"; } SHAMap::pointer NetworkOPs::getTXMap(const uint256& hash) @@ -761,7 +766,10 @@ bool NetworkOPs::gotTXData(const boost::shared_ptr& peer, const uint256& h const std::list& nodeIDs, const std::list< std::vector >& nodeData) { if (!haveConsensusObject()) + { + cLog(lsWARNING) << "Got TX data with no consensus object"; return false; + } return mConsensus->peerGaveNodes(peer, hash, nodeIDs, nodeData); } @@ -777,7 +785,7 @@ bool NetworkOPs::hasTXSet(const boost::shared_ptr& peer, const uint256& se void NetworkOPs::mapComplete(const uint256& hash, SHAMap::ref map) { - if (!haveConsensusObject()) + if (haveConsensusObject()) mConsensus->mapComplete(hash, map, true); } diff --git a/src/NetworkOPs.h b/src/NetworkOPs.h index cfbc81831..b14b06e09 100644 --- a/src/NetworkOPs.h +++ b/src/NetworkOPs.h @@ -172,8 +172,8 @@ public: const std::vector& myNode, std::list< std::vector >& newNodes); // ledger proposal/close functions - bool recvPropose(const uint256& suppression, uint32 proposeSeq, const uint256& proposeHash, - const uint256& prevLedger, uint32 closeTime, const std::string& signature, const RippleAddress& nodePublic); + void processTrustedProposal(LedgerProposal::pointer proposal, boost::shared_ptr set, + RippleAddress nodePublic, uint256 checkLedger, bool sigGood); bool gotTXData(const boost::shared_ptr& peer, const uint256& hash, const std::list& nodeIDs, const std::list< std::vector >& nodeData); bool recvValidation(const SerializedValidation::pointer& val); @@ -203,6 +203,7 @@ public: boost::unordered_map >& peekStoredProposals() { return mStoredProposals; } void storeProposal(const LedgerProposal::pointer& proposal, const RippleAddress& peerPublic); + uint256 getConsensusLCL(); // client information retrieval functions std::vector< std::pair > diff --git a/src/Peer.cpp b/src/Peer.cpp index c9c6df5db..5e4b12128 100644 --- a/src/Peer.cpp +++ b/src/Peer.cpp @@ -500,8 +500,8 @@ void Peer::processReadBuffer() case ripple::mtPROPOSE_LEDGER: { - ripple::TMProposeSet msg; - if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + boost::shared_ptr msg = boost::make_shared(); + if (msg->ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvPropose(msg); else cLog(lsWARNING) << "parse error: " << type; @@ -694,6 +694,42 @@ void Peer::recvHello(ripple::TMHello& packet) } } +static void checkTransaction(Job&, int flags, SerializedTransaction::pointer stx, boost::weak_ptr peer) +{ + +#ifndef TRUST_NETWORK + try + { +#endif + Transaction::pointer tx; + + if ((flags & SF_SIGGOOD) != 0) + { + tx = boost::make_shared(stx, true); + if (tx->getStatus() == INVALID) + { + theApp->getSuppression().setFlag(stx->getTransactionID(), SF_BAD); + Peer::punishPeer(peer, PP_BAD_SIGNATURE); + return; + } + else + theApp->getSuppression().setFlag(stx->getTransactionID(), SF_SIGGOOD); + } + else + tx = boost::make_shared(stx, false); + + theApp->getIOService().post(boost::bind(&NetworkOPs::processTransaction, &theApp->getOPs(), tx)); + +#ifndef TRUST_NETWORK + } + catch (...) + { + theApp->getSuppression().setFlags(stx->getTransactionID(), SF_BAD); + punishPeer(peer, PP_INVALID_REQUEST); + } +#endif +} + void Peer::recvTransaction(ripple::TMTransaction& packet) { cLog(lsDEBUG) << "Got transaction from peer"; @@ -708,12 +744,22 @@ void Peer::recvTransaction(ripple::TMTransaction& packet) SerializerIterator sit(s); SerializedTransaction::pointer stx = boost::make_shared(boost::ref(sit)); - if (!theApp->isNew(stx->getTransactionID(), mPeerId)) - return; + int flags; + if (!theApp->isNew(stx->getTransactionID(), mPeerId, flags)) + { // we have seen this transaction recently + if ((flags & SF_BAD) != 0) + { + punishPeer(PP_BAD_SIGNATURE); + return; + } + + if ((flags & SF_RETRY) == 0) + return; + } + + theApp->getJobQueue().addJob(jtTRANSACTION, + boost::bind(&checkTransaction, _1, flags, stx, boost::weak_ptr(shared_from_this()))); - tx = boost::make_shared(stx, true); - if (tx->getStatus() == INVALID) - throw(0); #ifndef TRUST_NETWORK } catch (...) @@ -727,52 +773,122 @@ void Peer::recvTransaction(ripple::TMTransaction& packet) } #endif - tx = theApp->getOPs().processTransaction(tx); - - if(tx->getStatus() != INCLUDED) - { // transaction wasn't accepted into ledger -#ifdef DEBUG - std::cerr << "Transaction from peer won't go in ledger" << std::endl; -#endif - } } -void Peer::recvPropose(ripple::TMProposeSet& packet) +static void checkPropose(Job& job, boost::shared_ptr packet, + LedgerProposal::pointer proposal, uint256 consensusLCL, RippleAddress nodePublic, boost::weak_ptr peer) { - if ((packet.currenttxhash().size() != 32) || (packet.nodepubkey().size() < 28) || - (packet.signature().size() < 56) || (packet.nodepubkey().size() > 128) || (packet.signature().size() > 128)) + bool sigGood = false; + bool isTrusted = (job.getType() == jtPROPOSAL_t); + + cLog(lsTRACE) << "Checking " << (isTrusted ? "trusted" : "UNtrusted") << " proposal"; + + assert(packet); + ripple::TMProposeSet& set = *packet; + + uint256 prevLedger; + if (set.has_previousledger()) + { // proposal includes a previous ledger + cLog(lsTRACE) << "proposal with previous ledger"; + memcpy(prevLedger.begin(), set.previousledger().data(), 256 / 8); + if (!proposal->checkSign(set.signature())) + { + cLog(lsWARNING) << "proposal with previous ledger fails signature check"; + Peer::punishPeer(peer, PP_BAD_SIGNATURE); + return; + } + else + sigGood = true; + } + else + { + if (consensusLCL.isNonZero() && proposal->checkSign(set.signature())) + { + prevLedger = consensusLCL; + sigGood = true; + } + else + { + cLog(lsWARNING) << "Ledger proposal fails signature check"; + proposal->setSignature(set.signature()); + } + } + + if (isTrusted) + { + theApp->getIOService().post(boost::bind(&NetworkOPs::processTrustedProposal, &theApp->getOPs(), + proposal, packet, nodePublic, prevLedger, sigGood)); + } + else if (sigGood && (prevLedger == consensusLCL)) + { // relay untrusted proposal + cLog(lsTRACE) << "relaying untrusted proposal"; + std::set peers; + theApp->getSuppression().swapSet(proposal->getSuppression(), peers, SF_RELAYED); + PackedMessage::pointer message = boost::make_shared(set, ripple::mtPROPOSE_LEDGER); + theApp->getConnectionPool().relayMessageBut(peers, message); + } + else + cLog(lsDEBUG) << "Not relaying untrusted proposal"; +} + +void Peer::recvPropose(const boost::shared_ptr& packet) +{ + assert(packet); + ripple::TMProposeSet& set = *packet; + + if ((set.currenttxhash().size() != 32) || (set.nodepubkey().size() < 28) || + (set.signature().size() < 56) || (set.nodepubkey().size() > 128) || (set.signature().size() > 128)) { cLog(lsWARNING) << "Received proposal is malformed"; + punishPeer(PP_INVALID_REQUEST); return; } - uint256 currentTxHash, prevLedger; - memcpy(currentTxHash.begin(), packet.currenttxhash().data(), 32); + if (set.has_previousledger() && (set.previousledger().size() != 32)) + { + cLog(lsWARNING) << "Received proposal is malformed"; + punishPeer(PP_INVALID_REQUEST); + return; + } - if ((packet.has_previousledger()) && (packet.previousledger().size() == 32)) - memcpy(prevLedger.begin(), packet.previousledger().data(), 32); + uint256 proposeHash, prevLedger; + memcpy(proposeHash.begin(), set.currenttxhash().data(), 32); + if (set.has_previousledger()) + memcpy(prevLedger.begin(), set.previousledger().data(), 32); Serializer s(512); - s.add256(currentTxHash); - s.add256(prevLedger); - s.add32(packet.proposeseq()); - s.add32(packet.closetime()); - s.addVL(packet.nodepubkey()); - s.addVL(packet.signature()); + s.add256(proposeHash); + s.add32(set.proposeseq()); + s.add32(set.closetime()); + s.addVL(set.nodepubkey()); + s.addVL(set.signature()); + if (set.has_previousledger()) + s.add256(prevLedger); uint256 suppression = s.getSHA512Half(); if (!theApp->isNew(suppression, mPeerId)) + { + cLog(lsTRACE) << "Received duplicate proposal from peer " << mPeerId; return; - - RippleAddress nodePublic = RippleAddress::createNodePublic(strCopy(packet.nodepubkey())); -// bool isTrusted = theApp->getUNL().nodeInUNL(nodePublic); - - if(theApp->getOPs().recvPropose(suppression, packet.proposeseq(), currentTxHash, prevLedger, packet.closetime(), - packet.signature(), nodePublic)) - { // FIXME: Not all nodes will want proposals - PackedMessage::pointer message = boost::make_shared(packet, ripple::mtPROPOSE_LEDGER); - theApp->getConnectionPool().relayMessage(this, message); } + + RippleAddress signerPublic = RippleAddress::createNodePublic(strCopy(set.nodepubkey())); + if (signerPublic == theConfig.VALIDATION_PUB) + { + cLog(lsTRACE) << "Received our own proposal from peer " << mPeerId; + return; + } + bool isTrusted = theApp->getUNL().nodeInUNL(signerPublic); + cLog(lsTRACE) << "Received " << (isTrusted ? "trusted" : "UNtrusted") << " proposal from " << mPeerId; + + uint256 consensusLCL = theApp->getOPs().getConsensusLCL(); + LedgerProposal::pointer proposal = boost::make_shared( + prevLedger.isNonZero() ? prevLedger : consensusLCL, + set.proposeseq(), proposeHash, set.closetime(), signerPublic, suppression); + + theApp->getJobQueue().addJob(isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut, + boost::bind(&checkPropose, _1, packet, proposal, consensusLCL, + mNodePublic, boost::weak_ptr(shared_from_this()))); } void Peer::recvHaveTxSet(ripple::TMHaveTransactionSet& packet) @@ -1043,11 +1159,12 @@ void Peer::recvGetLedger(ripple::TMGetLedger& packet) reply.set_requestcookie(packet.requestcookie()); if (packet.itype() == ripple::liTS_CANDIDATE) - { // Request is for a transaction candidate set + { // Request is for a transaction candidate set cLog(lsINFO) << "Received request for TX candidate set data " << getIP(); if ((!packet.has_ledgerhash() || packet.ledgerhash().size() != 32)) { punishPeer(PP_INVALID_REQUEST); + cLog(lsWARNING) << "invalid request"; return; } uint256 txHash; @@ -1171,6 +1288,7 @@ void Peer::recvGetLedger(ripple::TMGetLedger& packet) SHAMapNode mn(packet.nodeids(i).data(), packet.nodeids(i).size()); if(!mn.isValid()) { + cLog(lsWARNING) << "Request for invalid node"; punishPeer(PP_INVALID_REQUEST); return; } @@ -1178,6 +1296,8 @@ void Peer::recvGetLedger(ripple::TMGetLedger& packet) std::list< std::vector > rawNodes; if(map->getNodeFat(mn, nodeIDs, rawNodes, fatRoot, fatLeaves)) { + assert(nodeIDs.size() == rawNodes.size()); + cLog(lsDEBUG) << "getNodeFat got " << rawNodes.size() << " nodes"; std::vector::iterator nodeIDIterator; std::list< std::vector >::iterator rawNodeIterator; for(nodeIDIterator = nodeIDs.begin(), rawNodeIterator = rawNodes.begin(); @@ -1190,6 +1310,8 @@ void Peer::recvGetLedger(ripple::TMGetLedger& packet) node->set_nodedata(&rawNodeIterator->front(), rawNodeIterator->size()); } } + else + cLog(lsWARNING) << "getNodeFat returns false"; } PackedMessage::pointer oPacket = boost::make_shared(reply, ripple::mtLEDGER_DATA); sendPacket(oPacket); @@ -1199,7 +1321,7 @@ void Peer::recvLedger(ripple::TMLedgerData& packet) { if (packet.nodes().size() <= 0) { - cLog(lsWARNING) << "Ledger data with no nodes"; + cLog(lsWARNING) << "Ledger/TXset data with no nodes"; punishPeer(PP_INVALID_REQUEST); return; } @@ -1209,6 +1331,7 @@ void Peer::recvLedger(ripple::TMLedgerData& packet) uint256 hash; if(packet.ledgerhash().size() != 32) { + cLog(lsWARNING) << "TX candidate reply with invalid hash size"; punishPeer(PP_INVALID_REQUEST); return; } @@ -1368,7 +1491,7 @@ Json::Value Peer::getJson() if (mHello.has_protoversion() && (mHello.protoversion() != MAKE_VERSION_INT(PROTO_VERSION_MAJOR, PROTO_VERSION_MINOR))) - ret["protocol"] = boost::lexical_cast(GET_VERSION_MAJOR(mHello.protoversion())) + "." + + ret["protocol"] = boost::lexical_cast(GET_VERSION_MAJOR(mHello.protoversion())) + "." + boost::lexical_cast(GET_VERSION_MINOR(mHello.protoversion())); if (!!mClosedLedgerHash) diff --git a/src/Peer.h b/src/Peer.h index 256319c83..e70d702fb 100644 --- a/src/Peer.h +++ b/src/Peer.h @@ -19,6 +19,7 @@ enum PeerPunish PP_INVALID_REQUEST = 1, // The peer sent a request that makes no sense PP_UNKNOWN_REQUEST = 2, // The peer sent a request that might be garbage PP_UNWANTED_DATA = 3, // The peer sent us data we didn't want/need + PP_BAD_SIGNATURE = 4, // Object had bad signature }; typedef std::pair ipPort; @@ -116,7 +117,7 @@ protected: void recvGetLedger(ripple::TMGetLedger& packet); void recvLedger(ripple::TMLedgerData& packet); void recvStatus(ripple::TMStatusChange& packet); - void recvPropose(ripple::TMProposeSet& packet); + void recvPropose(const boost::shared_ptr& packet); void recvHaveTxSet(ripple::TMHaveTransactionSet& packet); void getSessionCookie(std::string& strDst); diff --git a/src/SHAMap.cpp b/src/SHAMap.cpp index e3de616d2..def66e4f3 100644 --- a/src/SHAMap.cpp +++ b/src/SHAMap.cpp @@ -716,23 +716,22 @@ SHAMapTreeNode::pointer SHAMap::fetchNodeExternal(const SHAMapNode& id, const ui // Log(lsTRACE) << "fetchNodeExternal: missing " << hash; throw SHAMapMissingNode(mType, id, hash); } - assert(Serializer::getSHA512Half(obj->getData()) == hash); try { SHAMapTreeNode::pointer ret = boost::make_shared(id, obj->getData(), mSeq - 1, snfPREFIX); -#ifdef DEBUG if (id != *ret) { Log(lsFATAL) << "id:" << id << ", got:" << *ret; assert(false); + return SHAMapTreeNode::pointer(); } if (ret->getNodeHash() != hash) { Log(lsFATAL) << "Hashes don't match"; assert(false); + return SHAMapTreeNode::pointer(); } -#endif return ret; } catch (...) diff --git a/src/Suppression.cpp b/src/Suppression.cpp index ad96d4b15..5eee0ab7f 100644 --- a/src/Suppression.cpp +++ b/src/Suppression.cpp @@ -58,6 +58,25 @@ bool SuppressionTable::addSuppressionPeer(const uint256& index, uint64 peer) return created; } +bool SuppressionTable::addSuppressionPeer(const uint256& index, uint64 peer, int& flags) +{ + boost::mutex::scoped_lock sl(mSuppressionMutex); + + bool created; + Suppression &s = findCreateEntry(index, created); + s.addPeer(peer); + flags = s.getFlags(); + return created; +} + +int SuppressionTable::getFlags(const uint256& index) +{ + boost::mutex::scoped_lock sl(mSuppressionMutex); + + bool created; + return findCreateEntry(index, created).getFlags(); +} + bool SuppressionTable::addSuppressionFlags(const uint256& index, int flag) { boost::mutex::scoped_lock sl(mSuppressionMutex); diff --git a/src/Suppression.h b/src/Suppression.h index 14b611ede..fe07e522b 100644 --- a/src/Suppression.h +++ b/src/Suppression.h @@ -14,10 +14,11 @@ DEFINE_INSTANCE(Suppression); -#define SF_RELAYED 0x01 -#define SF_SIGBAD 0x02 -#define SF_SIGGOOD 0x04 +#define SF_RELAYED 0x01 // Has already been relayed to other nodes +#define SF_BAD 0x02 // Signature/format is bad +#define SF_SIGGOOD 0x04 // Signature is good #define SF_SAVED 0x08 +#define SF_RETRY 0x10 // Transaction can be retried class Suppression : private IS_INSTANCE(Suppression) { @@ -61,12 +62,15 @@ public: bool addSuppression(const uint256& index); bool addSuppressionPeer(const uint256& index, uint64 peer); + bool addSuppressionPeer(const uint256& index, uint64 peer, int& flags); bool addSuppressionFlags(const uint256& index, int flag); bool setFlag(const uint256& index, int flag); + int getFlags(const uint256& index); Suppression getEntry(const uint256&); bool swapSet(const uint256& index, std::set& peers, int flag); + bool swapSet(const uint256& index, std::set& peers); }; #endif diff --git a/src/TransactionEngine.cpp b/src/TransactionEngine.cpp index 053502939..3c6789ce5 100644 --- a/src/TransactionEngine.cpp +++ b/src/TransactionEngine.cpp @@ -455,7 +455,7 @@ TER TransactionEngine::applyTransaction(const SerializedTransaction& txn, Transa terResult = terRETRY; } - if (tesSUCCESS == terResult || isTepPartial(terResult)) + if ((tesSUCCESS == terResult) || isTepPartial(terResult)) { // Transaction succeeded fully or (retries are not allowed and the transaction succeeded partially). Serializer m; diff --git a/src/TransactionMeta.cpp b/src/TransactionMeta.cpp index f8fdff08d..8866b039f 100644 --- a/src/TransactionMeta.cpp +++ b/src/TransactionMeta.cpp @@ -121,6 +121,7 @@ static bool compare(const STObject& o1, const STObject& o2) STObject TransactionMetaSet::getAsObject() const { STObject metaData(sfTransactionMetaData); + assert(mResult != 255); metaData.setFieldU8(sfTransactionResult, mResult); metaData.addObject(mNodes); return metaData;