diff --git a/Builds/VisualStudio2013/RippleD.vcxproj b/Builds/VisualStudio2013/RippleD.vcxproj
index 38b48abe1..154daf6e2 100644
--- a/Builds/VisualStudio2013/RippleD.vcxproj
+++ b/Builds/VisualStudio2013/RippleD.vcxproj
@@ -1047,6 +1047,8 @@
+
+
@@ -1945,6 +1947,8 @@
True
True
+ ..\..\src\soci\src\core;..\..\src\sqlite;%(AdditionalIncludeDirectories)
+ ..\..\src\soci\src\core;..\..\src\sqlite;%(AdditionalIncludeDirectories)
True
@@ -2008,6 +2012,14 @@
+
+ True
+ True
+ ..\..\src\soci\src\core;..\..\src\sqlite;%(AdditionalIncludeDirectories)
+ ..\..\src\soci\src\core;..\..\src\sqlite;%(AdditionalIncludeDirectories)
+
+
+
True
True
diff --git a/Builds/VisualStudio2013/RippleD.vcxproj.filters b/Builds/VisualStudio2013/RippleD.vcxproj.filters
index dae82c892..c54cf2c14 100644
--- a/Builds/VisualStudio2013/RippleD.vcxproj.filters
+++ b/Builds/VisualStudio2013/RippleD.vcxproj.filters
@@ -1707,6 +1707,9 @@
beast\utility
+
+ beast\utility
+
beast\utility
@@ -2571,6 +2574,12 @@
ripple\app\transactors
+
+ ripple\app\tx
+
+
+ ripple\app\tx
+
ripple\app\tx
diff --git a/src/ripple/app/consensus/LedgerConsensus.cpp b/src/ripple/app/consensus/LedgerConsensus.cpp
index 8c6ae288e..068c8b9c3 100644
--- a/src/ripple/app/consensus/LedgerConsensus.cpp
+++ b/src/ripple/app/consensus/LedgerConsensus.cpp
@@ -31,6 +31,7 @@
#include
#include
#include
+#include
#include
#include
#include
@@ -78,7 +79,6 @@ public:
/**
The result of applying a transaction to a ledger.
- @param clock The clock which will be used to measure time.
@param localtx A set of local transactions to apply.
@param prevLCLHash The hash of the Last Closed Ledger (LCL).
@param previousLedger Best guess of what the Last Closed Ledger (LCL)
@@ -86,11 +86,10 @@ public:
@param closeTime Closing time point of the LCL.
@param feeVote Our desired fee levels and voting logic.
*/
- LedgerConsensusImp (clock_type& clock, LocalTxs& localtx,
+ LedgerConsensusImp (LocalTxs& localtx,
LedgerHash const & prevLCLHash, Ledger::ref previousLedger,
std::uint32_t closeTime, FeeVote& feeVote)
- : m_clock (clock)
- , m_localTX (localtx)
+ : m_localTX (localtx)
, m_feeVote (feeVote)
, mState (lcsPRE_CLOSE)
, mCloseTime (closeTime)
@@ -112,6 +111,8 @@ public:
mPreviousMSeconds = getApp().getOPs ().getPreviousConvergeTime ();
assert (mPreviousMSeconds);
+ getApp().getInboundTransactions().newRound (mPreviousLedger->getLedgerSeq());
+
// Adapt close time resolution to recent network conditions
mCloseResolution = ContinuousLedgerTiming::getNextLedgerTimeResolution (
mPreviousLedger->getCloseResolution (),
@@ -256,16 +257,6 @@ public:
ret["acquired"] = acq;
}
- if (!mAcquiring.empty ())
- {
- Json::Value acq (Json::arrayValue);
- for (auto& at : mAcquiring)
- {
- acq.append (to_string (at.first));
- }
- ret["acquiring"] = acq;
- }
-
if (!mDisputes.empty ())
{
Json::Value dsj (Json::objectValue);
@@ -310,64 +301,6 @@ public:
return mPrevLedgerHash;
}
- /**
- Get a transaction tree, fetching it from the network if required and
- requested. When the transaction acquire engine successfully acquires
- a transaction set, it will call back.
-
- @param hash hash of the requested transaction tree.
- @param doAcquire true if we should get this from the network if we don't
- already have it.
- @return Pointer to the transaction tree if we got it, else
- nullptr.
- */
- std::shared_ptr
- getTransactionTree (uint256 const& hash, bool doAcquire)
- {
- auto it = mAcquired.find (hash);
-
- if (it != mAcquired.end ())
- return it->second;
-
- if (mState == lcsPRE_CLOSE)
- {
- std::shared_ptr currentMap
- = getApp().getLedgerMaster ().getCurrentLedger ()
- ->peekTransactionMap ();
-
- if (currentMap->getHash () == hash)
- {
- WriteLog (lsDEBUG, LedgerConsensus)
- << "Map " << hash << " is our current";
- currentMap = currentMap->snapShot (false);
- mapCompleteInternal (hash, currentMap, false);
- return currentMap;
- }
- }
-
- if (doAcquire)
- {
- TransactionAcquire::pointer& acquiring = mAcquiring[hash];
-
- if (!acquiring)
- {
- if (hash.isZero ())
- {
- auto empty = std::make_shared (
- SHAMapType::TRANSACTION, getApp().family(),
- deprecatedLogs().journal("SHAMap"));
- mapCompleteInternal (hash, empty, false);
- return empty;
- }
-
- acquiring = std::make_shared (hash, m_clock);
- startAcquiring (acquiring);
- }
- }
-
- return std::shared_ptr ();
- }
-
/**
We have a complete transaction set, typically acquired from the network
@@ -401,7 +334,6 @@ public:
{
// this is an invalid/corrupt map
mAcquired[hash] = map;
- mAcquiring.erase (hash);
WriteLog (lsWARNING, LedgerConsensus)
<< "A trusted node directed us to acquire an invalid TXN map";
return;
@@ -416,7 +348,6 @@ public:
{
if (it->second)
{
- mAcquiring.erase (hash);
return; // we already have this map
}
@@ -426,6 +357,15 @@ public:
// We now have a map that we did not have before
+ if (!acquired)
+ {
+ // Put the map where others can get it
+ getApp().getInboundTransactions().giveSet (hash, map, false);
+ }
+
+ // Inform directly-connected peers that we have this transaction set
+ sendHaveTxSet (hash, true);
+
if (mOurPosition && (!mOurPosition->isBowOut ())
&& (hash != mOurPosition->getCurrentHash ()))
{
@@ -448,7 +388,6 @@ public:
<< "Not ready to create disputes";
mAcquired[hash] = map;
- mAcquiring.erase (hash);
// Adjust tracking for each peer that takes this position
std::vector peers;
@@ -469,30 +408,6 @@ public:
<< hash << " no peers were proposing it";
}
- // Inform directly-connected peers that we have this transaction set
- sendHaveTxSet (hash, true);
- }
-
- /**
- Determine if we still need to acquire a transaction set from the network.
- If a transaction set is popular, we probably have it. If it's unpopular,
- we probably don't need it (and the peer that initially made us
- retrieve it has probably already changed its position).
-
- @param hash hash of the transaction set.
- @return true if we need to acquire it, else false.
- */
- bool stillNeedTXSet (uint256 const& hash)
- {
- if (mAcquired.find (hash) != mAcquired.end ())
- return false;
-
- for (auto const& it : mPeerPositions)
- {
- if (it.second->getCurrentHash () == hash)
- return true;
- }
- return false;
}
/**
@@ -852,6 +767,20 @@ public:
, mPreviousMSeconds, mCurrentMSeconds, forReal, mConsensusFail);
}
+ std::shared_ptr getTransactionTree (uint256 const& hash)
+ {
+ auto it = mAcquired.find (hash);
+ if (it != mAcquired.end() && it->second)
+ return it->second;
+
+ auto set = getApp().getInboundTransactions().getSet (hash, true);
+
+ if (set)
+ mAcquired[hash] = set;
+
+ return set;
+ }
+
/**
A server has taken a new position, adjust our tracking
Called when a peer takes a new postion.
@@ -910,7 +839,7 @@ public:
currentPosition = newPosition;
std::shared_ptr set
- = getTransactionTree (newPosition->getCurrentHash (), true);
+ = getTransactionTree (newPosition->getCurrentHash ());
if (set)
{
@@ -926,32 +855,6 @@ public:
return true;
}
- /**
- A peer has sent us some nodes from a transaction set
-
- @param peer The peer which has sent the nodes
- @param setHash The transaction set
- @param nodeIDs The nodes in the transaction set
- @param nodeData The data
- @return The status results of adding the nodes.
- */
- SHAMapAddNode peerGaveNodes (Peer::ptr const& peer
- , uint256 const& setHash, const std::list& nodeIDs
- , const std::list< Blob >& nodeData)
- {
- auto acq (mAcquiring.find (setHash));
-
- if (acq == mAcquiring.end ())
- {
- WriteLog (lsDEBUG, LedgerConsensus)
- << "Got TX data for set no longer acquiring: " << setHash;
- return SHAMapAddNode ();
- }
- // We must keep the set around during the function
- TransactionAcquire::pointer set = acq->second;
- return set->takeNodes (nodeIDs, nodeData, peer);
- }
-
bool isOurPubKey (const RippleAddress & k)
{
return k == mValPublic;
@@ -1197,36 +1100,6 @@ private:
}
}
- /**
- Begin acquiring a transaction set
-
- @param acquire The transaction set to acquire.
- */
- void startAcquiring (TransactionAcquire::pointer acquire)
- {
- // FIXME: Randomize and limit the number
- struct build_acquire_list
- {
- typedef void return_type;
-
- TransactionAcquire::pointer const& acquire;
-
- build_acquire_list (TransactionAcquire::pointer const& acq)
- : acquire(acq)
- { }
-
- return_type operator() (Peer::ptr const& peer) const
- {
- if (peer->hasTxSet (acquire->getHash ()))
- acquire->peerHas (peer);
- }
- };
-
- getApp().overlay ().foreach (build_acquire_list (acquire));
-
- acquire->setTimer ();
- }
-
/**
Compare two proposed transaction sets and create disputed
transctions structures for any mismatches
@@ -1883,7 +1756,6 @@ private:
val->setFieldU32(sfLoadFee, fee);
}
private:
- clock_type& m_clock;
LocalTxs& m_localTX;
FeeVote& m_feeVote;
@@ -1918,7 +1790,6 @@ private:
// Transaction Sets, indexed by hash of transaction tree
hash_map> mAcquired;
- hash_map mAcquiring;
// Disputed transactions
hash_map mDisputes;
@@ -1938,11 +1809,11 @@ LedgerConsensus::~LedgerConsensus ()
}
std::shared_ptr
-make_LedgerConsensus (LedgerConsensus::clock_type& clock, LocalTxs& localtx,
+make_LedgerConsensus (LocalTxs& localtx,
LedgerHash const &prevLCLHash, Ledger::ref previousLedger,
std::uint32_t closeTime, FeeVote& feeVote)
{
- return std::make_shared (clock, localtx,
+ return std::make_shared (localtx,
prevLCLHash, previousLedger, closeTime, feeVote);
}
diff --git a/src/ripple/app/consensus/LedgerConsensus.h b/src/ripple/app/consensus/LedgerConsensus.h
index 825bf6c91..22b8e505d 100644
--- a/src/ripple/app/consensus/LedgerConsensus.h
+++ b/src/ripple/app/consensus/LedgerConsensus.h
@@ -28,7 +28,6 @@
#include
#include
#include
-#include
#include
namespace ripple {
@@ -41,8 +40,6 @@ namespace ripple {
class LedgerConsensus
{
public:
- typedef beast::abstract_clock clock_type;
-
virtual ~LedgerConsensus() = 0;
virtual int startup () = 0;
@@ -53,14 +50,9 @@ public:
virtual uint256 getLCL () = 0;
- virtual std::shared_ptr getTransactionTree (uint256 const& hash,
- bool doAcquire) = 0;
-
virtual void mapComplete (uint256 const& hash,
std::shared_ptr const& map, bool acquired) = 0;
- virtual bool stillNeedTXSet (uint256 const& hash) = 0;
-
virtual void checkLCL () = 0;
virtual void handleLCL (uint256 const& lclHash) = 0;
@@ -77,11 +69,6 @@ public:
virtual bool peerPosition (LedgerProposal::ref) = 0;
- virtual SHAMapAddNode peerGaveNodes (Peer::ptr const& peer,
- uint256 const& setHash,
- const std::list& nodeIDs,
- const std::list< Blob >& nodeData) = 0;
-
virtual bool isOurPubKey (const RippleAddress & k) = 0;
// test/debug
@@ -89,7 +76,7 @@ public:
};
std::shared_ptr
-make_LedgerConsensus (LedgerConsensus::clock_type& clock, LocalTxs& localtx,
+make_LedgerConsensus (LocalTxs& localtx,
LedgerHash const & prevLCLHash, Ledger::ref previousLedger,
std::uint32_t closeTime, FeeVote& feeVote);
diff --git a/src/ripple/app/ledger/ConsensusTransSetSF.cpp b/src/ripple/app/ledger/ConsensusTransSetSF.cpp
index e59d9a9af..bafb277f4 100644
--- a/src/ripple/app/ledger/ConsensusTransSetSF.cpp
+++ b/src/ripple/app/ledger/ConsensusTransSetSF.cpp
@@ -78,7 +78,7 @@ bool ConsensusTransSetSF::haveNode (const SHAMapNodeID& id, uint256 const& nodeH
if (txn)
{
// this is a transaction, and we have it
- WriteLog (lsDEBUG, TransactionAcquire) << "Node in our acquiring TX set is TXN we have";
+ WriteLog (lsTRACE, TransactionAcquire) << "Node in our acquiring TX set is TXN we have";
Serializer s;
s.add32 (HashPrefix::transactionID);
txn->getSTransaction ()->add (s, true);
diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp
index 617322d72..3e70418eb 100644
--- a/src/ripple/app/main/Application.cpp
+++ b/src/ripple/app/main/Application.cpp
@@ -39,6 +39,7 @@
#include
#include
#include
+#include
#include
#include
#include
@@ -276,6 +277,7 @@ public:
std::unique_ptr m_pathRequests;
std::unique_ptr m_ledgerMaster;
std::unique_ptr m_inboundLedgers;
+ std::unique_ptr m_inboundTransactions;
std::unique_ptr m_networkOPs;
std::unique_ptr m_deprecatedUNL;
std::unique_ptr serverHandler_;
@@ -380,6 +382,16 @@ public:
, m_inboundLedgers (make_InboundLedgers (get_seconds_clock (),
*m_jobQueue, m_collectorManager->collector ()))
+ , m_inboundTransactions (make_InboundTransactions
+ ( get_seconds_clock ()
+ , *m_jobQueue
+ , m_collectorManager->collector ()
+ , [this](uint256 const& setHash,
+ std::shared_ptr const& set)
+ {
+ gotTXSet (setHash, set);
+ }))
+
, m_networkOPs (make_NetworkOPs (get_seconds_clock (),
getConfig ().RUN_STANDALONE, getConfig ().NETWORK_QUORUM,
*m_jobQueue, *m_ledgerMaster, *m_jobQueue,
@@ -504,6 +516,16 @@ public:
return *m_inboundLedgers;
}
+ InboundTransactions& getInboundTransactions ()
+ {
+ return *m_inboundTransactions;
+ }
+
+ void gotTXSet (uint256 const& setHash, std::shared_ptr const& set)
+ {
+ m_networkOPs->mapComplete (setHash, set);
+ }
+
TransactionMaster& getMasterTransaction ()
{
return m_txMaster;
diff --git a/src/ripple/app/main/Application.h b/src/ripple/app/main/Application.h
index 0e868d789..b34892bf2 100644
--- a/src/ripple/app/main/Application.h
+++ b/src/ripple/app/main/Application.h
@@ -49,6 +49,7 @@ class LocalCredentials;
class UniqueNodeList;
class JobQueue;
class InboundLedgers;
+class InboundTransactions;
class LedgerMaster;
class LoadManager;
class NetworkOPs;
@@ -106,6 +107,7 @@ public:
virtual Validations& getValidations () = 0;
virtual NodeStore::Database& getNodeStore () = 0;
virtual InboundLedgers& getInboundLedgers () = 0;
+ virtual InboundTransactions& getInboundTransactions () = 0;
virtual LedgerMaster& getLedgerMaster () = 0;
virtual NetworkOPs& getOPs () = 0;
virtual OrderBookDB& getOrderBookDB () = 0;
diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp
index 636d5cc69..4b947b045 100644
--- a/src/ripple/app/misc/NetworkOPs.cpp
+++ b/src/ripple/app/misc/NetworkOPs.cpp
@@ -279,11 +279,6 @@ public:
std::shared_ptr set,
RippleAddress nodePublic, uint256 checkLedger, bool sigGood);
- SHAMapAddNode gotTXData (
- const std::shared_ptr& peer, uint256 const& hash,
- const std::list& nodeIDs,
- const std::list< Blob >& nodeData);
-
bool recvValidation (
STValidation::ref val, std::string const& source);
void takePosition (int seq, std::shared_ptr const& position);
@@ -293,7 +288,6 @@ public:
protocol::TxSetStatus status);
void mapComplete (uint256 const& hash, std::shared_ptr const& map);
- bool stillNeedTXSet (uint256 const& hash);
void makeFetchPack (
Job&, std::weak_ptr peer,
std::shared_ptr request,
@@ -1550,7 +1544,7 @@ int NetworkOPsImp::beginConsensus (
assert (!mConsensus);
prevLedger->setImmutable ();
- mConsensus = make_LedgerConsensus (m_clock, *m_localTX, networkClosed,
+ mConsensus = make_LedgerConsensus (*m_localTX, networkClosed,
prevLedger, m_ledgerMaster.getCurrentLedger ()->getCloseTimeNC (),
*m_feeVote);
@@ -1657,21 +1651,6 @@ void NetworkOPsImp::processTrustedProposal (
}
}
-// Must be called while holding the master lock
-std::shared_ptr
-NetworkOPsImp::getTXMap (uint256 const& hash)
-{
- auto it = mRecentPositions.find (hash);
-
- if (it != mRecentPositions.end ())
- return it->second.second;
-
- if (!haveConsensusObject ())
- return std::shared_ptr ();
-
- return mConsensus->getTransactionTree (hash, false);
-}
-
// Must be called while holding the master lock
void
NetworkOPsImp::takePosition (int seq, std::shared_ptr const& position)
@@ -1693,33 +1672,12 @@ NetworkOPsImp::takePosition (int seq, std::shared_ptr const& position)
}
}
-// Call with the master lock for now
-SHAMapAddNode NetworkOPsImp::gotTXData (
- const std::shared_ptr& peer, uint256 const& hash,
- const std::list& nodeIDs, const std::list< Blob >& nodeData)
-{
-
- if (!mConsensus)
- {
- m_journal.debug << "Got TX data with no consensus object";
- return SHAMapAddNode ();
- }
-
- return mConsensus->peerGaveNodes (peer, hash, nodeIDs, nodeData);
-}
-
-bool NetworkOPsImp::stillNeedTXSet (uint256 const& hash)
-{
- if (!mConsensus)
- return false;
-
- return mConsensus->stillNeedTXSet (hash);
-}
-
void
NetworkOPsImp::mapComplete (uint256 const& hash,
std::shared_ptr const& map)
{
+ std::lock_guard lock(getApp().getMasterMutex());
+
if (haveConsensusObject ())
mConsensus->mapComplete (hash, map, true);
}
diff --git a/src/ripple/app/misc/NetworkOPs.h b/src/ripple/app/misc/NetworkOPs.h
index 7d6645f03..22b9ec377 100644
--- a/src/ripple/app/misc/NetworkOPs.h
+++ b/src/ripple/app/misc/NetworkOPs.h
@@ -209,23 +209,15 @@ public:
std::shared_ptr set, RippleAddress nodePublic,
uint256 checkLedger, bool sigGood) = 0;
- virtual SHAMapAddNode gotTXData (const std::shared_ptr& peer,
- uint256 const& hash, const std::list& nodeIDs,
- const std::list< Blob >& nodeData) = 0;
-
virtual bool recvValidation (STValidation::ref val,
std::string const& source) = 0;
virtual void takePosition (int seq,
std::shared_ptr const& position) = 0;
- virtual std::shared_ptr getTXMap (uint256 const& hash) = 0;
-
virtual void mapComplete (uint256 const& hash,
std::shared_ptr const& map) = 0;
- virtual bool stillNeedTXSet (uint256 const& hash) = 0;
-
// Fetch packs
virtual void makeFetchPack (Job&, std::weak_ptr peer,
std::shared_ptr request,
diff --git a/src/ripple/app/tx/InboundTransactions.cpp b/src/ripple/app/tx/InboundTransactions.cpp
new file mode 100644
index 000000000..cdd5b70a6
--- /dev/null
+++ b/src/ripple/app/tx/InboundTransactions.cpp
@@ -0,0 +1,306 @@
+//------------------------------------------------------------------------------
+/*
+ This file is part of rippled: https://github.com/ripple/rippled
+ Copyright (c) 2012, 2013 Ripple Labs Inc.
+
+ Permission to use, copy, modify, and/or distribute this software for any
+ purpose with or without fee is hereby granted, provided that the above
+ copyright notice and this permission notice appear in all copies.
+
+ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+*/
+//==============================================================================
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include //
+
+namespace ripple {
+
+enum
+{
+ // Ideal number of peers to start with
+ startPeers = 2,
+
+ // How many rounds to keep a set
+ setKeepRounds = 3,
+};
+
+class InboundTransactionSet
+{
+// A transaction set we generated, acquired, or are acquiring
+public:
+ std::uint32_t mSeq;
+ TransactionAcquire::pointer mAcquire;
+ std::shared_ptr mSet;
+
+ InboundTransactionSet (
+ std::uint32_t seq,
+ std::shared_ptr const& set) :
+ mSeq (seq), mSet (set)
+ { ; }
+ InboundTransactionSet () : mSeq (0)
+ { ; }
+};
+
+class InboundTransactionsImp
+ : public InboundTransactions
+ , public beast::Stoppable
+{
+public:
+
+ typedef std::pair u256_acq_pair;
+
+ InboundTransactionsImp (
+ clock_type& clock,
+ Stoppable& parent,
+ beast::insight::Collector::ptr const& collector,
+ std::function const&)> gotSet)
+ : Stoppable ("InboundTransactions", parent)
+ , m_clock (clock)
+ , m_seq (0)
+ , m_zeroSet (m_map[uint256()])
+ , m_gotSet (std::move (gotSet))
+ {
+ m_zeroSet.mSet = std::make_shared (
+ SHAMapType::TRANSACTION, uint256(),
+ getApp().family(), deprecatedLogs().journal("SHAMap"));
+ m_zeroSet.mSet->setUnbacked();
+ }
+
+ TransactionAcquire::pointer getAcquire (uint256 const& hash)
+ {
+ {
+ ScopedLockType sl (mLock);
+
+ auto it = m_map.find (hash);
+
+ if (it != m_map.end ())
+ return it->second.mAcquire;
+ }
+ return {};
+ }
+
+ std::shared_ptr getSet (
+ uint256 const& hash,
+ bool acquire) override
+ {
+ TransactionAcquire::pointer ta;
+
+ {
+ ScopedLockType sl (mLock);
+
+ auto it = m_map.find (hash);
+
+ if (it != m_map.end ())
+ {
+ if (acquire)
+ {
+ it->second.mSeq = m_seq;
+ if (it->second.mAcquire)
+ {
+ it->second.mAcquire->stillNeed ();
+ }
+ }
+ return it->second.mSet;
+ }
+
+ if (!acquire || isStopping ())
+ return std::shared_ptr ();
+
+ ta = std::make_shared (hash, m_clock);
+
+ auto &obj = m_map[hash];
+ obj.mAcquire = ta;
+ obj.mSeq = m_seq;
+ }
+
+
+ ta->init (startPeers);
+
+ return {};
+ }
+
+ /** We received a TMLedgerData from a peer.
+ */
+ void gotData (LedgerHash const& hash,
+ std::shared_ptr peer,
+ std::shared_ptr packet_ptr)
+ {
+ protocol::TMLedgerData& packet = *packet_ptr;
+
+ WriteLog (lsTRACE, InboundLedger) <<
+ "Got data (" << packet.nodes ().size () << ") "
+ "for acquiring ledger: " << hash;
+
+ TransactionAcquire::pointer ta = getAcquire (hash);
+
+ if (ta == nullptr)
+ {
+ peer->charge (Resource::feeUnwantedData);
+ return;
+ }
+
+ std::list nodeIDs;
+ std::list< Blob > nodeData;
+ for (auto const &node : packet.nodes())
+ {
+ if (!node.has_nodeid () || !node.has_nodedata () || (
+ node.nodeid ().size () != 33))
+ {
+ peer->charge (Resource::feeInvalidRequest);
+ return;
+ }
+
+ nodeIDs.emplace_back (node.nodeid ().data (),
+ static_cast(node.nodeid ().size ()));
+ nodeData.emplace_back (node.nodedata ().begin (),
+ node.nodedata ().end ());
+ }
+
+ if (! ta->takeNodes (nodeIDs, nodeData, peer).isUseful ())
+ peer->charge (Resource::feeUnwantedData);
+ }
+
+ void giveSet (uint256 const& hash,
+ std::shared_ptr const& set,
+ bool fromAcquire) override
+ {
+ bool isNew = true;
+
+ {
+ ScopedLockType sl (mLock);
+
+ auto& inboundSet = m_map [hash];
+
+ if (inboundSet.mSeq < m_seq)
+ inboundSet.mSeq = m_seq;
+
+ if (inboundSet.mSet)
+ isNew = false;
+ else
+ inboundSet.mSet = set;
+
+ inboundSet.mAcquire.reset ();
+
+ }
+
+ if (isNew && fromAcquire)
+ m_gotSet (hash, set);
+ }
+
+ Json::Value getInfo() override
+ {
+ Json::Value ret (Json::objectValue);
+
+ Json::Value& sets = (ret["sets"] = Json::arrayValue);
+
+ {
+ ScopedLockType sl (mLock);
+
+ ret["seq"] = m_seq;
+
+ for (auto const& it : m_map)
+ {
+ Json::Value& set = sets [to_string (it.first)];
+ set["seq"] = it.second.mSeq;
+ if (it.second.mSet)
+ set["state"] = "complete";
+ else if (it.second.mAcquire)
+ set["state"] = "acquiring";
+ else
+ set["state"] = "dead";
+ }
+
+ }
+
+ return ret;
+ }
+
+ void newRound (std::uint32_t seq) override
+ {
+ ScopedLockType lock (mLock);
+
+ // Protect zero set from expiration
+ m_zeroSet.mSeq = seq;
+
+ if (m_seq != seq)
+ {
+
+ m_seq = seq;
+
+ auto it = m_map.begin ();
+
+ std::uint32_t const minSeq =
+ (seq < setKeepRounds) ? 0 : (seq - setKeepRounds);
+ std::uint32_t maxSeq = seq + setKeepRounds;
+
+ while (it != m_map.end ())
+ {
+ if (it->second.mSeq < minSeq || it->second.mSeq > maxSeq)
+ it = m_map.erase (it);
+ else
+ ++it;
+ }
+ }
+ }
+
+ void onStop () override
+ {
+ ScopedLockType lock (mLock);
+
+ m_map.clear ();
+
+ stopped();
+ }
+
+private:
+ clock_type& m_clock;
+
+ typedef hash_map MapType;
+
+ typedef RippleRecursiveMutex LockType;
+ typedef std::unique_lock ScopedLockType;
+ LockType mLock;
+
+ MapType m_map;
+ std::uint32_t m_seq;
+
+ // The empty transaction set whose hash is zero
+ InboundTransactionSet& m_zeroSet;
+
+ std::function const&)> m_gotSet;
+};
+
+//------------------------------------------------------------------------------
+
+InboundTransactions::~InboundTransactions() = default;
+
+std::unique_ptr
+make_InboundTransactions (
+ InboundLedgers::clock_type& clock,
+ beast::Stoppable& parent,
+ beast::insight::Collector::ptr const& collector,
+ std::function const&)> gotSet)
+{
+ return std::make_unique
+ (clock, parent, collector, std::move (gotSet));
+}
+
+} // ripple
diff --git a/src/ripple/app/tx/InboundTransactions.h b/src/ripple/app/tx/InboundTransactions.h
new file mode 100644
index 000000000..82a6103a2
--- /dev/null
+++ b/src/ripple/app/tx/InboundTransactions.h
@@ -0,0 +1,84 @@
+//------------------------------------------------------------------------------
+/*
+ This file is part of rippled: https://github.com/ripple/rippled
+ Copyright (c) 2012-2015 Ripple Labs Inc.
+
+ Permission to use, copy, modify, and/or distribute this software for any
+ purpose with or without fee is hereby granted, provided that the above
+ copyright notice and this permission notice appear in all copies.
+
+ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+*/
+//==============================================================================
+
+#ifndef RIPPLE_INBOUNDTRANSACTIONS_H
+#define RIPPLE_INBOUNDTRANSACTIONS_H
+
+#include
+#include
+#include
+#include //
+#include
+
+namespace ripple {
+
+/** Manages the acquisition and lifetime of transaction sets.
+*/
+
+class InboundTransactions
+{
+public:
+ typedef beast::abstract_clock clock_type;
+
+ InboundTransactions() = default;
+ InboundTransactions(InboundTransactions const&) = delete;
+ InboundTransactions& operator=(InboundTransactions const&) = delete;
+
+ virtual ~InboundTransactions() = 0;
+
+ /** Retrieves a transaction set by hash
+ */
+ virtual std::shared_ptr getSet (
+ uint256 const& setHash,
+ bool acquire) = 0;
+
+ /** Gives data to an inbound transaction set
+ */
+ virtual void gotData (uint256 const& setHash,
+ std::shared_ptr ,
+ std::shared_ptr ) = 0;
+
+ /** Gives set to the container
+ */
+ virtual void giveSet (uint256 const& setHash,
+ std::shared_ptr const& set,
+ bool acquired) = 0;
+
+ /** Informs the container if a new consensus round
+ */
+ virtual void newRound (std::uint32_t seq) = 0;
+
+ virtual Json::Value getInfo() = 0;
+
+ virtual void onStop() = 0;
+};
+
+std::unique_ptr
+make_InboundTransactions (
+ InboundTransactions::clock_type& clock,
+ beast::Stoppable& parent,
+ beast::insight::Collector::ptr const& collector,
+ std::function
+ const&)> gotSet);
+
+
+} // ripple
+
+#endif
diff --git a/src/ripple/app/tx/TransactionAcquire.cpp b/src/ripple/app/tx/TransactionAcquire.cpp
index d97f1ddaa..e34eec4c3 100644
--- a/src/ripple/app/tx/TransactionAcquire.cpp
+++ b/src/ripple/app/tx/TransactionAcquire.cpp
@@ -23,6 +23,7 @@
#include
#include
#include
+#include
#include
#include
#include
@@ -31,9 +32,11 @@ namespace ripple {
enum
{
- // VFALCO NOTE This should be a std::chrono::duration constant.
- // TODO Document this. Is it seconds? Milliseconds? WTF?
- TX_ACQUIRE_TIMEOUT = 250
+ // Timeout interval in milliseconds
+ TX_ACQUIRE_TIMEOUT = 250,
+
+ NORM_TIMEOUTS = 4,
+ MAX_TIMEOUTS = 20,
};
TransactionAcquire::TransactionAcquire (uint256 const& hash, clock_type& clock)
@@ -50,21 +53,9 @@ TransactionAcquire::~TransactionAcquire ()
{
}
-static void TACompletionHandler (uint256 hash, std::shared_ptr map)
-{
- {
- std::lock_guard lock(getApp().getMasterMutex());
-
- getApp().getOPs ().mapComplete (hash, map);
-
- getApp().getInboundLedgers ().dropLedger (hash);
- }
-}
-
void TransactionAcquire::done ()
{
- // We hold a PeerSet lock and so cannot acquire the master lock here
- std::shared_ptr map;
+ // We hold a PeerSet lock and so cannot do real work here
if (mFailed)
{
@@ -74,33 +65,28 @@ void TransactionAcquire::done ()
{
WriteLog (lsDEBUG, TransactionAcquire) << "Acquired TX set " << mHash;
mMap->setImmutable ();
- map = mMap;
+
+ uint256 const& hash (mHash);
+ std::shared_ptr const& map (mMap);
+ getApp().getJobQueue().addJob (jtTXN_DATA, "completeAcquire",
+ [hash, map](Job&)
+ {
+ getApp().getInboundTransactions().giveSet (
+ hash, map, true);
+ });
}
- getApp().getJobQueue().addJob (jtTXN_DATA, "completeAcquire", std::bind (&TACompletionHandler, mHash, map));
}
void TransactionAcquire::onTimer (bool progress, ScopedLockType& psl)
{
bool aggressive = false;
- if (getTimeouts () > 10)
+ if (getTimeouts () >= NORM_TIMEOUTS)
{
- WriteLog (lsWARNING, TransactionAcquire) << "Ten timeouts on TX set " << getHash ();
- psl.unlock();
- {
- auto lock = beast::make_lock(getApp().getMasterMutex());
+ aggressive = true;
- if (getApp().getOPs ().stillNeedTXSet (mHash))
- {
- WriteLog (lsWARNING, TransactionAcquire) << "Still need it";
- mTimeouts = 0;
- aggressive = true;
- }
- }
- psl.lock();
-
- if (!aggressive)
+ if (getTimeouts () > MAX_TIMEOUTS)
{
mFailed = true;
done ();
@@ -108,30 +94,10 @@ void TransactionAcquire::onTimer (bool progress, ScopedLockType& psl)
}
}
- if (aggressive || !getPeerCount ())
- {
- // out of peers
- WriteLog (lsWARNING, TransactionAcquire) << "Out of peers for TX set " << getHash ();
-
- bool found = false;
- Overlay::PeerSequence peerList = getApp().overlay ().getActivePeers ();
- for (auto const& peer : peerList)
- {
- if (peer->hasTxSet (getHash ()))
- {
- found = true;
- peerHas (peer);
- }
- }
-
- if (!found)
- {
- for (auto const& peer : peerList)
- peerHas (peer);
- }
- }
- else if (!progress)
+ if (aggressive)
trigger (Peer::ptr ());
+
+ addPeers (1);
}
std::weak_ptr TransactionAcquire::pmDowncast ()
@@ -207,6 +173,8 @@ void TransactionAcquire::trigger (Peer::ptr const& peer)
SHAMapAddNode TransactionAcquire::takeNodes (const std::list& nodeIDs,
const std::list< Blob >& data, Peer::ptr const& peer)
{
+ ScopedLockType sl (mLock);
+
if (mComplete)
{
WriteLog (lsTRACE, TransactionAcquire) << "TX set complete";
@@ -262,4 +230,69 @@ SHAMapAddNode TransactionAcquire::takeNodes (const std::list& node
}
}
+void TransactionAcquire::addPeers (int numPeers)
+{
+ std::vector peerVec1, peerVec2;
+
+ {
+ auto peers = getApp().overlay().getActivePeers();
+ for (auto const& peer : peers)
+ {
+ if (peer->hasTxSet (mHash))
+ peerVec1.push_back (peer);
+ else
+ peerVec2.push_back (peer);
+ }
+ }
+
+ WriteLog (lsDEBUG, TransactionAcquire) << peerVec1.size() << " peers known to have " << mHash;
+
+ if (peerVec1.size() != 0)
+ {
+ // First try peers known to have the set
+ std::random_shuffle (peerVec1.begin (), peerVec1.end ());
+
+ for (auto const& peer : peerVec1)
+ {
+ if (peerHas (peer))
+ {
+ if (--numPeers <= 0)
+ return;
+ }
+ }
+ }
+
+ if (peerVec2.size() != 0)
+ {
+ // Then try peers not known to have the set
+ std::random_shuffle (peerVec2.begin (), peerVec2.end ());
+
+ for (auto const& peer : peerVec2)
+ {
+ if (peerHas (peer))
+ {
+ if (--numPeers <= 0)
+ return;
+ }
+ }
+ }
+}
+
+void TransactionAcquire::init (int numPeers)
+{
+ ScopedLockType sl (mLock);
+
+ addPeers (numPeers);
+
+ setTimer ();
+}
+
+void TransactionAcquire::stillNeed ()
+{
+ ScopedLockType sl (mLock);
+
+ if (mTimeouts > NORM_TIMEOUTS)
+ mTimeouts = NORM_TIMEOUTS;
+}
+
} // ripple
diff --git a/src/ripple/app/tx/TransactionAcquire.h b/src/ripple/app/tx/TransactionAcquire.h
index d5ea8e201..f6f94d119 100644
--- a/src/ripple/app/tx/TransactionAcquire.h
+++ b/src/ripple/app/tx/TransactionAcquire.h
@@ -49,17 +49,28 @@ public:
SHAMapAddNode takeNodes (const std::list& IDs,
const std::list< Blob >& data, Peer::ptr const&);
+ void init (int startPeers);
+
+ void stillNeed ();
+
private:
+
std::shared_ptr mMap;
bool mHaveRoot;
void onTimer (bool progress, ScopedLockType& peerSetLock);
+
+
void newPeer (Peer::ptr const& peer)
{
trigger (peer);
}
void done ();
+
+ // Tries to add the specified number of peers
+ void addPeers (int num);
+
void trigger (Peer::ptr const&);
std::weak_ptr pmDowncast ();
};
diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp
index 718828f66..350951963 100644
--- a/src/ripple/overlay/impl/PeerImp.cpp
+++ b/src/ripple/overlay/impl/PeerImp.cpp
@@ -27,6 +27,7 @@
#include
#include
#include
+#include
#include
#include
#include
@@ -1613,10 +1614,7 @@ PeerImp::getLedger (std::shared_ptr const& m)
uint256 txHash;
memcpy (txHash.begin (), packet.ledgerhash ().data (), 32);
- {
- std::lock_guard lock(getApp().getMasterMutex());
- map = getApp().getOPs ().getTXMap (txHash);
- }
+ map = getApp().getInboundTransactions().getSet (txHash, false);
if (!map)
{
@@ -1946,44 +1944,12 @@ PeerImp::getLedger (std::shared_ptr const& m)
send (oPacket);
}
-// VFALCO TODO Make this non-static
void
PeerImp::peerTXData (Job&, uint256 const& hash,
std::shared_ptr const& pPacket,
beast::Journal journal)
{
- protocol::TMLedgerData& packet = *pPacket;
-
- std::list nodeIDs;
- std::list< Blob > nodeData;
- for (int i = 0; i < packet.nodes ().size (); ++i)
- {
- const protocol::TMLedgerNode& node = packet.nodes (i);
-
- if (!node.has_nodeid () || !node.has_nodedata () || (
- node.nodeid ().size () != 33))
- {
- journal.warning << "LedgerData request with invalid node ID";
- charge (Resource::feeInvalidRequest);
- return;
- }
-
- nodeIDs.push_back (SHAMapNodeID {node.nodeid ().data (),
- static_cast(node.nodeid ().size ())});
- nodeData.push_back (Blob (node.nodedata ().begin (),
- node.nodedata ().end ()));
- }
-
- SHAMapAddNode san;
- {
- std::lock_guard lock(getApp().getMasterMutex());
-
- san = getApp().getOPs().gotTXData (shared_from_this(),
- hash, nodeIDs, nodeData);
- }
-
- if (san.isInvalid ())
- charge (Resource::feeUnwantedData);
+ getApp().getInboundTransactions().gotData (hash, shared_from_this(), pPacket);
}
} // ripple
diff --git a/src/ripple/unity/app7.cpp b/src/ripple/unity/app7.cpp
index 1b49c05a4..9d87bc6b4 100644
--- a/src/ripple/unity/app7.cpp
+++ b/src/ripple/unity/app7.cpp
@@ -23,4 +23,5 @@
#include
#include
#include
+#include
#include