Remove transaction set acquire logic from consensus object

This creates a new InboundTransactions object that handles transaction sets,
removing this responsibility from the consensus object. The main benefit is
that many inbound transaction operations no longer require the master lock.

Improve logic to decide which peers to query, when to add more peers, and
when to re-query existing peers.
This commit is contained in:
David Schwartz
2015-03-09 11:39:52 -07:00
committed by Tom Ritchford
parent 00596f1436
commit 1fedede771
15 changed files with 575 additions and 321 deletions

View File

@@ -1047,6 +1047,8 @@
</ClCompile> </ClCompile>
<ClInclude Include="..\..\src\beast\beast\utility\Journal.h"> <ClInclude Include="..\..\src\beast\beast\utility\Journal.h">
</ClInclude> </ClInclude>
<ClInclude Include="..\..\src\beast\beast\utility\make_lock.h">
</ClInclude>
<ClInclude Include="..\..\src\beast\beast\utility\maybe_const.h"> <ClInclude Include="..\..\src\beast\beast\utility\maybe_const.h">
</ClInclude> </ClInclude>
<ClInclude Include="..\..\src\beast\beast\utility\meta.h"> <ClInclude Include="..\..\src\beast\beast\utility\meta.h">
@@ -1945,6 +1947,8 @@
<ClCompile Include="..\..\src\ripple\app\tests\Path_test.cpp"> <ClCompile Include="..\..\src\ripple\app\tests\Path_test.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug|x64'">True</ExcludedFromBuild> <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug|x64'">True</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='release|x64'">True</ExcludedFromBuild> <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='release|x64'">True</ExcludedFromBuild>
<AdditionalIncludeDirectories Condition="'$(Configuration)|$(Platform)'=='debug.classic|x64'">..\..\src\soci\src\core;..\..\src\sqlite;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<AdditionalIncludeDirectories Condition="'$(Configuration)|$(Platform)'=='release.classic|x64'">..\..\src\soci\src\core;..\..\src\sqlite;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
</ClCompile> </ClCompile>
<ClCompile Include="..\..\src\ripple\app\transactors\CancelOffer.cpp"> <ClCompile Include="..\..\src\ripple\app\transactors\CancelOffer.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug|x64'">True</ExcludedFromBuild> <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug|x64'">True</ExcludedFromBuild>
@@ -2008,6 +2012,14 @@
</ClCompile> </ClCompile>
<ClInclude Include="..\..\src\ripple\app\transactors\Transactor.h"> <ClInclude Include="..\..\src\ripple\app\transactors\Transactor.h">
</ClInclude> </ClInclude>
<ClCompile Include="..\..\src\ripple\app\tx\InboundTransactions.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug|x64'">True</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='release|x64'">True</ExcludedFromBuild>
<AdditionalIncludeDirectories Condition="'$(Configuration)|$(Platform)'=='debug.classic|x64'">..\..\src\soci\src\core;..\..\src\sqlite;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<AdditionalIncludeDirectories Condition="'$(Configuration)|$(Platform)'=='release.classic|x64'">..\..\src\soci\src\core;..\..\src\sqlite;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
</ClCompile>
<ClInclude Include="..\..\src\ripple\app\tx\InboundTransactions.h">
</ClInclude>
<ClCompile Include="..\..\src\ripple\app\tx\LocalTxs.cpp"> <ClCompile Include="..\..\src\ripple\app\tx\LocalTxs.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug|x64'">True</ExcludedFromBuild> <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug|x64'">True</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='release|x64'">True</ExcludedFromBuild> <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='release|x64'">True</ExcludedFromBuild>

View File

@@ -1707,6 +1707,9 @@
<ClInclude Include="..\..\src\beast\beast\utility\Journal.h"> <ClInclude Include="..\..\src\beast\beast\utility\Journal.h">
<Filter>beast\utility</Filter> <Filter>beast\utility</Filter>
</ClInclude> </ClInclude>
<ClInclude Include="..\..\src\beast\beast\utility\make_lock.h">
<Filter>beast\utility</Filter>
</ClInclude>
<ClInclude Include="..\..\src\beast\beast\utility\maybe_const.h"> <ClInclude Include="..\..\src\beast\beast\utility\maybe_const.h">
<Filter>beast\utility</Filter> <Filter>beast\utility</Filter>
</ClInclude> </ClInclude>
@@ -2571,6 +2574,12 @@
<ClInclude Include="..\..\src\ripple\app\transactors\Transactor.h"> <ClInclude Include="..\..\src\ripple\app\transactors\Transactor.h">
<Filter>ripple\app\transactors</Filter> <Filter>ripple\app\transactors</Filter>
</ClInclude> </ClInclude>
<ClCompile Include="..\..\src\ripple\app\tx\InboundTransactions.cpp">
<Filter>ripple\app\tx</Filter>
</ClCompile>
<ClInclude Include="..\..\src\ripple\app\tx\InboundTransactions.h">
<Filter>ripple\app\tx</Filter>
</ClInclude>
<ClCompile Include="..\..\src\ripple\app\tx\LocalTxs.cpp"> <ClCompile Include="..\..\src\ripple\app\tx\LocalTxs.cpp">
<Filter>ripple\app\tx</Filter> <Filter>ripple\app\tx</Filter>
</ClCompile> </ClCompile>

View File

@@ -31,6 +31,7 @@
#include <ripple/app/misc/NetworkOPs.h> #include <ripple/app/misc/NetworkOPs.h>
#include <ripple/app/misc/Validations.h> #include <ripple/app/misc/Validations.h>
#include <ripple/app/tx/TransactionAcquire.h> #include <ripple/app/tx/TransactionAcquire.h>
#include <ripple/app/tx/InboundTransactions.h>
#include <ripple/basics/CountedObject.h> #include <ripple/basics/CountedObject.h>
#include <ripple/basics/Log.h> #include <ripple/basics/Log.h>
#include <ripple/core/Config.h> #include <ripple/core/Config.h>
@@ -78,7 +79,6 @@ public:
/** /**
The result of applying a transaction to a ledger. The result of applying a transaction to a ledger.
@param clock The clock which will be used to measure time.
@param localtx A set of local transactions to apply. @param localtx A set of local transactions to apply.
@param prevLCLHash The hash of the Last Closed Ledger (LCL). @param prevLCLHash The hash of the Last Closed Ledger (LCL).
@param previousLedger Best guess of what the Last Closed Ledger (LCL) @param previousLedger Best guess of what the Last Closed Ledger (LCL)
@@ -86,11 +86,10 @@ public:
@param closeTime Closing time point of the LCL. @param closeTime Closing time point of the LCL.
@param feeVote Our desired fee levels and voting logic. @param feeVote Our desired fee levels and voting logic.
*/ */
LedgerConsensusImp (clock_type& clock, LocalTxs& localtx, LedgerConsensusImp (LocalTxs& localtx,
LedgerHash const & prevLCLHash, Ledger::ref previousLedger, LedgerHash const & prevLCLHash, Ledger::ref previousLedger,
std::uint32_t closeTime, FeeVote& feeVote) std::uint32_t closeTime, FeeVote& feeVote)
: m_clock (clock) : m_localTX (localtx)
, m_localTX (localtx)
, m_feeVote (feeVote) , m_feeVote (feeVote)
, mState (lcsPRE_CLOSE) , mState (lcsPRE_CLOSE)
, mCloseTime (closeTime) , mCloseTime (closeTime)
@@ -112,6 +111,8 @@ public:
mPreviousMSeconds = getApp().getOPs ().getPreviousConvergeTime (); mPreviousMSeconds = getApp().getOPs ().getPreviousConvergeTime ();
assert (mPreviousMSeconds); assert (mPreviousMSeconds);
getApp().getInboundTransactions().newRound (mPreviousLedger->getLedgerSeq());
// Adapt close time resolution to recent network conditions // Adapt close time resolution to recent network conditions
mCloseResolution = ContinuousLedgerTiming::getNextLedgerTimeResolution ( mCloseResolution = ContinuousLedgerTiming::getNextLedgerTimeResolution (
mPreviousLedger->getCloseResolution (), mPreviousLedger->getCloseResolution (),
@@ -256,16 +257,6 @@ public:
ret["acquired"] = acq; ret["acquired"] = acq;
} }
if (!mAcquiring.empty ())
{
Json::Value acq (Json::arrayValue);
for (auto& at : mAcquiring)
{
acq.append (to_string (at.first));
}
ret["acquiring"] = acq;
}
if (!mDisputes.empty ()) if (!mDisputes.empty ())
{ {
Json::Value dsj (Json::objectValue); Json::Value dsj (Json::objectValue);
@@ -310,64 +301,6 @@ public:
return mPrevLedgerHash; return mPrevLedgerHash;
} }
/**
Get a transaction tree, fetching it from the network if required and
requested. When the transaction acquire engine successfully acquires
a transaction set, it will call back.
@param hash hash of the requested transaction tree.
@param doAcquire true if we should get this from the network if we don't
already have it.
@return Pointer to the transaction tree if we got it, else
nullptr.
*/
std::shared_ptr<SHAMap>
getTransactionTree (uint256 const& hash, bool doAcquire)
{
auto it = mAcquired.find (hash);
if (it != mAcquired.end ())
return it->second;
if (mState == lcsPRE_CLOSE)
{
std::shared_ptr<SHAMap> currentMap
= getApp().getLedgerMaster ().getCurrentLedger ()
->peekTransactionMap ();
if (currentMap->getHash () == hash)
{
WriteLog (lsDEBUG, LedgerConsensus)
<< "Map " << hash << " is our current";
currentMap = currentMap->snapShot (false);
mapCompleteInternal (hash, currentMap, false);
return currentMap;
}
}
if (doAcquire)
{
TransactionAcquire::pointer& acquiring = mAcquiring[hash];
if (!acquiring)
{
if (hash.isZero ())
{
auto empty = std::make_shared<SHAMap> (
SHAMapType::TRANSACTION, getApp().family(),
deprecatedLogs().journal("SHAMap"));
mapCompleteInternal (hash, empty, false);
return empty;
}
acquiring = std::make_shared<TransactionAcquire> (hash, m_clock);
startAcquiring (acquiring);
}
}
return std::shared_ptr<SHAMap> ();
}
/** /**
We have a complete transaction set, typically acquired from the network We have a complete transaction set, typically acquired from the network
@@ -401,7 +334,6 @@ public:
{ {
// this is an invalid/corrupt map // this is an invalid/corrupt map
mAcquired[hash] = map; mAcquired[hash] = map;
mAcquiring.erase (hash);
WriteLog (lsWARNING, LedgerConsensus) WriteLog (lsWARNING, LedgerConsensus)
<< "A trusted node directed us to acquire an invalid TXN map"; << "A trusted node directed us to acquire an invalid TXN map";
return; return;
@@ -416,7 +348,6 @@ public:
{ {
if (it->second) if (it->second)
{ {
mAcquiring.erase (hash);
return; // we already have this map return; // we already have this map
} }
@@ -426,6 +357,15 @@ public:
// We now have a map that we did not have before // We now have a map that we did not have before
if (!acquired)
{
// Put the map where others can get it
getApp().getInboundTransactions().giveSet (hash, map, false);
}
// Inform directly-connected peers that we have this transaction set
sendHaveTxSet (hash, true);
if (mOurPosition && (!mOurPosition->isBowOut ()) if (mOurPosition && (!mOurPosition->isBowOut ())
&& (hash != mOurPosition->getCurrentHash ())) && (hash != mOurPosition->getCurrentHash ()))
{ {
@@ -448,7 +388,6 @@ public:
<< "Not ready to create disputes"; << "Not ready to create disputes";
mAcquired[hash] = map; mAcquired[hash] = map;
mAcquiring.erase (hash);
// Adjust tracking for each peer that takes this position // Adjust tracking for each peer that takes this position
std::vector<NodeID> peers; std::vector<NodeID> peers;
@@ -469,30 +408,6 @@ public:
<< hash << " no peers were proposing it"; << hash << " no peers were proposing it";
} }
// Inform directly-connected peers that we have this transaction set
sendHaveTxSet (hash, true);
}
/**
Determine if we still need to acquire a transaction set from the network.
If a transaction set is popular, we probably have it. If it's unpopular,
we probably don't need it (and the peer that initially made us
retrieve it has probably already changed its position).
@param hash hash of the transaction set.
@return true if we need to acquire it, else false.
*/
bool stillNeedTXSet (uint256 const& hash)
{
if (mAcquired.find (hash) != mAcquired.end ())
return false;
for (auto const& it : mPeerPositions)
{
if (it.second->getCurrentHash () == hash)
return true;
}
return false;
} }
/** /**
@@ -852,6 +767,20 @@ public:
, mPreviousMSeconds, mCurrentMSeconds, forReal, mConsensusFail); , mPreviousMSeconds, mCurrentMSeconds, forReal, mConsensusFail);
} }
std::shared_ptr<SHAMap> getTransactionTree (uint256 const& hash)
{
auto it = mAcquired.find (hash);
if (it != mAcquired.end() && it->second)
return it->second;
auto set = getApp().getInboundTransactions().getSet (hash, true);
if (set)
mAcquired[hash] = set;
return set;
}
/** /**
A server has taken a new position, adjust our tracking A server has taken a new position, adjust our tracking
Called when a peer takes a new postion. Called when a peer takes a new postion.
@@ -910,7 +839,7 @@ public:
currentPosition = newPosition; currentPosition = newPosition;
std::shared_ptr<SHAMap> set std::shared_ptr<SHAMap> set
= getTransactionTree (newPosition->getCurrentHash (), true); = getTransactionTree (newPosition->getCurrentHash ());
if (set) if (set)
{ {
@@ -926,32 +855,6 @@ public:
return true; return true;
} }
/**
A peer has sent us some nodes from a transaction set
@param peer The peer which has sent the nodes
@param setHash The transaction set
@param nodeIDs The nodes in the transaction set
@param nodeData The data
@return The status results of adding the nodes.
*/
SHAMapAddNode peerGaveNodes (Peer::ptr const& peer
, uint256 const& setHash, const std::list<SHAMapNodeID>& nodeIDs
, const std::list< Blob >& nodeData)
{
auto acq (mAcquiring.find (setHash));
if (acq == mAcquiring.end ())
{
WriteLog (lsDEBUG, LedgerConsensus)
<< "Got TX data for set no longer acquiring: " << setHash;
return SHAMapAddNode ();
}
// We must keep the set around during the function
TransactionAcquire::pointer set = acq->second;
return set->takeNodes (nodeIDs, nodeData, peer);
}
bool isOurPubKey (const RippleAddress & k) bool isOurPubKey (const RippleAddress & k)
{ {
return k == mValPublic; return k == mValPublic;
@@ -1197,36 +1100,6 @@ private:
} }
} }
/**
Begin acquiring a transaction set
@param acquire The transaction set to acquire.
*/
void startAcquiring (TransactionAcquire::pointer acquire)
{
// FIXME: Randomize and limit the number
struct build_acquire_list
{
typedef void return_type;
TransactionAcquire::pointer const& acquire;
build_acquire_list (TransactionAcquire::pointer const& acq)
: acquire(acq)
{ }
return_type operator() (Peer::ptr const& peer) const
{
if (peer->hasTxSet (acquire->getHash ()))
acquire->peerHas (peer);
}
};
getApp().overlay ().foreach (build_acquire_list (acquire));
acquire->setTimer ();
}
/** /**
Compare two proposed transaction sets and create disputed Compare two proposed transaction sets and create disputed
transctions structures for any mismatches transctions structures for any mismatches
@@ -1883,7 +1756,6 @@ private:
val->setFieldU32(sfLoadFee, fee); val->setFieldU32(sfLoadFee, fee);
} }
private: private:
clock_type& m_clock;
LocalTxs& m_localTX; LocalTxs& m_localTX;
FeeVote& m_feeVote; FeeVote& m_feeVote;
@@ -1918,7 +1790,6 @@ private:
// Transaction Sets, indexed by hash of transaction tree // Transaction Sets, indexed by hash of transaction tree
hash_map<uint256, std::shared_ptr<SHAMap>> mAcquired; hash_map<uint256, std::shared_ptr<SHAMap>> mAcquired;
hash_map<uint256, TransactionAcquire::pointer> mAcquiring;
// Disputed transactions // Disputed transactions
hash_map<uint256, DisputedTx::pointer> mDisputes; hash_map<uint256, DisputedTx::pointer> mDisputes;
@@ -1938,11 +1809,11 @@ LedgerConsensus::~LedgerConsensus ()
} }
std::shared_ptr <LedgerConsensus> std::shared_ptr <LedgerConsensus>
make_LedgerConsensus (LedgerConsensus::clock_type& clock, LocalTxs& localtx, make_LedgerConsensus (LocalTxs& localtx,
LedgerHash const &prevLCLHash, Ledger::ref previousLedger, LedgerHash const &prevLCLHash, Ledger::ref previousLedger,
std::uint32_t closeTime, FeeVote& feeVote) std::uint32_t closeTime, FeeVote& feeVote)
{ {
return std::make_shared <LedgerConsensusImp> (clock, localtx, return std::make_shared <LedgerConsensusImp> (localtx,
prevLCLHash, previousLedger, closeTime, feeVote); prevLCLHash, previousLedger, closeTime, feeVote);
} }

View File

@@ -28,7 +28,6 @@
#include <ripple/json/json_value.h> #include <ripple/json/json_value.h>
#include <ripple/overlay/Peer.h> #include <ripple/overlay/Peer.h>
#include <ripple/protocol/RippleLedgerHash.h> #include <ripple/protocol/RippleLedgerHash.h>
#include <beast/chrono/abstract_clock.h>
#include <chrono> #include <chrono>
namespace ripple { namespace ripple {
@@ -41,8 +40,6 @@ namespace ripple {
class LedgerConsensus class LedgerConsensus
{ {
public: public:
typedef beast::abstract_clock <std::chrono::steady_clock> clock_type;
virtual ~LedgerConsensus() = 0; virtual ~LedgerConsensus() = 0;
virtual int startup () = 0; virtual int startup () = 0;
@@ -53,14 +50,9 @@ public:
virtual uint256 getLCL () = 0; virtual uint256 getLCL () = 0;
virtual std::shared_ptr<SHAMap> getTransactionTree (uint256 const& hash,
bool doAcquire) = 0;
virtual void mapComplete (uint256 const& hash, virtual void mapComplete (uint256 const& hash,
std::shared_ptr<SHAMap> const& map, bool acquired) = 0; std::shared_ptr<SHAMap> const& map, bool acquired) = 0;
virtual bool stillNeedTXSet (uint256 const& hash) = 0;
virtual void checkLCL () = 0; virtual void checkLCL () = 0;
virtual void handleLCL (uint256 const& lclHash) = 0; virtual void handleLCL (uint256 const& lclHash) = 0;
@@ -77,11 +69,6 @@ public:
virtual bool peerPosition (LedgerProposal::ref) = 0; virtual bool peerPosition (LedgerProposal::ref) = 0;
virtual SHAMapAddNode peerGaveNodes (Peer::ptr const& peer,
uint256 const& setHash,
const std::list<SHAMapNodeID>& nodeIDs,
const std::list< Blob >& nodeData) = 0;
virtual bool isOurPubKey (const RippleAddress & k) = 0; virtual bool isOurPubKey (const RippleAddress & k) = 0;
// test/debug // test/debug
@@ -89,7 +76,7 @@ public:
}; };
std::shared_ptr <LedgerConsensus> std::shared_ptr <LedgerConsensus>
make_LedgerConsensus (LedgerConsensus::clock_type& clock, LocalTxs& localtx, make_LedgerConsensus (LocalTxs& localtx,
LedgerHash const & prevLCLHash, Ledger::ref previousLedger, LedgerHash const & prevLCLHash, Ledger::ref previousLedger,
std::uint32_t closeTime, FeeVote& feeVote); std::uint32_t closeTime, FeeVote& feeVote);

View File

@@ -78,7 +78,7 @@ bool ConsensusTransSetSF::haveNode (const SHAMapNodeID& id, uint256 const& nodeH
if (txn) if (txn)
{ {
// this is a transaction, and we have it // this is a transaction, and we have it
WriteLog (lsDEBUG, TransactionAcquire) << "Node in our acquiring TX set is TXN we have"; WriteLog (lsTRACE, TransactionAcquire) << "Node in our acquiring TX set is TXN we have";
Serializer s; Serializer s;
s.add32 (HashPrefix::transactionID); s.add32 (HashPrefix::transactionID);
txn->getSTransaction ()->add (s, true); txn->getSTransaction ()->add (s, true);

View File

@@ -39,6 +39,7 @@
#include <ripple/app/paths/FindPaths.h> #include <ripple/app/paths/FindPaths.h>
#include <ripple/app/paths/PathRequests.h> #include <ripple/app/paths/PathRequests.h>
#include <ripple/app/peers/UniqueNodeList.h> #include <ripple/app/peers/UniqueNodeList.h>
#include <ripple/app/tx/InboundTransactions.h>
#include <ripple/app/tx/TransactionMaster.h> #include <ripple/app/tx/TransactionMaster.h>
#include <ripple/basics/Log.h> #include <ripple/basics/Log.h>
#include <ripple/basics/LoggedTimings.h> #include <ripple/basics/LoggedTimings.h>
@@ -276,6 +277,7 @@ public:
std::unique_ptr <PathRequests> m_pathRequests; std::unique_ptr <PathRequests> m_pathRequests;
std::unique_ptr <LedgerMaster> m_ledgerMaster; std::unique_ptr <LedgerMaster> m_ledgerMaster;
std::unique_ptr <InboundLedgers> m_inboundLedgers; std::unique_ptr <InboundLedgers> m_inboundLedgers;
std::unique_ptr <InboundTransactions> m_inboundTransactions;
std::unique_ptr <NetworkOPs> m_networkOPs; std::unique_ptr <NetworkOPs> m_networkOPs;
std::unique_ptr <UniqueNodeList> m_deprecatedUNL; std::unique_ptr <UniqueNodeList> m_deprecatedUNL;
std::unique_ptr <ServerHandler> serverHandler_; std::unique_ptr <ServerHandler> serverHandler_;
@@ -380,6 +382,16 @@ public:
, m_inboundLedgers (make_InboundLedgers (get_seconds_clock (), , m_inboundLedgers (make_InboundLedgers (get_seconds_clock (),
*m_jobQueue, m_collectorManager->collector ())) *m_jobQueue, m_collectorManager->collector ()))
, m_inboundTransactions (make_InboundTransactions
( get_seconds_clock ()
, *m_jobQueue
, m_collectorManager->collector ()
, [this](uint256 const& setHash,
std::shared_ptr <SHAMap> const& set)
{
gotTXSet (setHash, set);
}))
, m_networkOPs (make_NetworkOPs (get_seconds_clock (), , m_networkOPs (make_NetworkOPs (get_seconds_clock (),
getConfig ().RUN_STANDALONE, getConfig ().NETWORK_QUORUM, getConfig ().RUN_STANDALONE, getConfig ().NETWORK_QUORUM,
*m_jobQueue, *m_ledgerMaster, *m_jobQueue, *m_jobQueue, *m_ledgerMaster, *m_jobQueue,
@@ -504,6 +516,16 @@ public:
return *m_inboundLedgers; return *m_inboundLedgers;
} }
InboundTransactions& getInboundTransactions ()
{
return *m_inboundTransactions;
}
void gotTXSet (uint256 const& setHash, std::shared_ptr<SHAMap> const& set)
{
m_networkOPs->mapComplete (setHash, set);
}
TransactionMaster& getMasterTransaction () TransactionMaster& getMasterTransaction ()
{ {
return m_txMaster; return m_txMaster;

View File

@@ -49,6 +49,7 @@ class LocalCredentials;
class UniqueNodeList; class UniqueNodeList;
class JobQueue; class JobQueue;
class InboundLedgers; class InboundLedgers;
class InboundTransactions;
class LedgerMaster; class LedgerMaster;
class LoadManager; class LoadManager;
class NetworkOPs; class NetworkOPs;
@@ -106,6 +107,7 @@ public:
virtual Validations& getValidations () = 0; virtual Validations& getValidations () = 0;
virtual NodeStore::Database& getNodeStore () = 0; virtual NodeStore::Database& getNodeStore () = 0;
virtual InboundLedgers& getInboundLedgers () = 0; virtual InboundLedgers& getInboundLedgers () = 0;
virtual InboundTransactions& getInboundTransactions () = 0;
virtual LedgerMaster& getLedgerMaster () = 0; virtual LedgerMaster& getLedgerMaster () = 0;
virtual NetworkOPs& getOPs () = 0; virtual NetworkOPs& getOPs () = 0;
virtual OrderBookDB& getOrderBookDB () = 0; virtual OrderBookDB& getOrderBookDB () = 0;

View File

@@ -279,11 +279,6 @@ public:
std::shared_ptr<protocol::TMProposeSet> set, std::shared_ptr<protocol::TMProposeSet> set,
RippleAddress nodePublic, uint256 checkLedger, bool sigGood); RippleAddress nodePublic, uint256 checkLedger, bool sigGood);
SHAMapAddNode gotTXData (
const std::shared_ptr<Peer>& peer, uint256 const& hash,
const std::list<SHAMapNodeID>& nodeIDs,
const std::list< Blob >& nodeData);
bool recvValidation ( bool recvValidation (
STValidation::ref val, std::string const& source); STValidation::ref val, std::string const& source);
void takePosition (int seq, std::shared_ptr<SHAMap> const& position); void takePosition (int seq, std::shared_ptr<SHAMap> const& position);
@@ -293,7 +288,6 @@ public:
protocol::TxSetStatus status); protocol::TxSetStatus status);
void mapComplete (uint256 const& hash, std::shared_ptr<SHAMap> const& map); void mapComplete (uint256 const& hash, std::shared_ptr<SHAMap> const& map);
bool stillNeedTXSet (uint256 const& hash);
void makeFetchPack ( void makeFetchPack (
Job&, std::weak_ptr<Peer> peer, Job&, std::weak_ptr<Peer> peer,
std::shared_ptr<protocol::TMGetObjectByHash> request, std::shared_ptr<protocol::TMGetObjectByHash> request,
@@ -1550,7 +1544,7 @@ int NetworkOPsImp::beginConsensus (
assert (!mConsensus); assert (!mConsensus);
prevLedger->setImmutable (); prevLedger->setImmutable ();
mConsensus = make_LedgerConsensus (m_clock, *m_localTX, networkClosed, mConsensus = make_LedgerConsensus (*m_localTX, networkClosed,
prevLedger, m_ledgerMaster.getCurrentLedger ()->getCloseTimeNC (), prevLedger, m_ledgerMaster.getCurrentLedger ()->getCloseTimeNC (),
*m_feeVote); *m_feeVote);
@@ -1657,21 +1651,6 @@ void NetworkOPsImp::processTrustedProposal (
} }
} }
// Must be called while holding the master lock
std::shared_ptr<SHAMap>
NetworkOPsImp::getTXMap (uint256 const& hash)
{
auto it = mRecentPositions.find (hash);
if (it != mRecentPositions.end ())
return it->second.second;
if (!haveConsensusObject ())
return std::shared_ptr<SHAMap> ();
return mConsensus->getTransactionTree (hash, false);
}
// Must be called while holding the master lock // Must be called while holding the master lock
void void
NetworkOPsImp::takePosition (int seq, std::shared_ptr<SHAMap> const& position) NetworkOPsImp::takePosition (int seq, std::shared_ptr<SHAMap> const& position)
@@ -1693,33 +1672,12 @@ NetworkOPsImp::takePosition (int seq, std::shared_ptr<SHAMap> const& position)
} }
} }
// Call with the master lock for now
SHAMapAddNode NetworkOPsImp::gotTXData (
const std::shared_ptr<Peer>& peer, uint256 const& hash,
const std::list<SHAMapNodeID>& nodeIDs, const std::list< Blob >& nodeData)
{
if (!mConsensus)
{
m_journal.debug << "Got TX data with no consensus object";
return SHAMapAddNode ();
}
return mConsensus->peerGaveNodes (peer, hash, nodeIDs, nodeData);
}
bool NetworkOPsImp::stillNeedTXSet (uint256 const& hash)
{
if (!mConsensus)
return false;
return mConsensus->stillNeedTXSet (hash);
}
void void
NetworkOPsImp::mapComplete (uint256 const& hash, NetworkOPsImp::mapComplete (uint256 const& hash,
std::shared_ptr<SHAMap> const& map) std::shared_ptr<SHAMap> const& map)
{ {
std::lock_guard<Application::MutexType> lock(getApp().getMasterMutex());
if (haveConsensusObject ()) if (haveConsensusObject ())
mConsensus->mapComplete (hash, map, true); mConsensus->mapComplete (hash, map, true);
} }

View File

@@ -209,23 +209,15 @@ public:
std::shared_ptr<protocol::TMProposeSet> set, RippleAddress nodePublic, std::shared_ptr<protocol::TMProposeSet> set, RippleAddress nodePublic,
uint256 checkLedger, bool sigGood) = 0; uint256 checkLedger, bool sigGood) = 0;
virtual SHAMapAddNode gotTXData (const std::shared_ptr<Peer>& peer,
uint256 const& hash, const std::list<SHAMapNodeID>& nodeIDs,
const std::list< Blob >& nodeData) = 0;
virtual bool recvValidation (STValidation::ref val, virtual bool recvValidation (STValidation::ref val,
std::string const& source) = 0; std::string const& source) = 0;
virtual void takePosition (int seq, virtual void takePosition (int seq,
std::shared_ptr<SHAMap> const& position) = 0; std::shared_ptr<SHAMap> const& position) = 0;
virtual std::shared_ptr<SHAMap> getTXMap (uint256 const& hash) = 0;
virtual void mapComplete (uint256 const& hash, virtual void mapComplete (uint256 const& hash,
std::shared_ptr<SHAMap> const& map) = 0; std::shared_ptr<SHAMap> const& map) = 0;
virtual bool stillNeedTXSet (uint256 const& hash) = 0;
// Fetch packs // Fetch packs
virtual void makeFetchPack (Job&, std::weak_ptr<Peer> peer, virtual void makeFetchPack (Job&, std::weak_ptr<Peer> peer,
std::shared_ptr<protocol::TMGetObjectByHash> request, std::shared_ptr<protocol::TMGetObjectByHash> request,

View File

@@ -0,0 +1,306 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <BeastConfig.h>
#include <ripple/app/ledger/InboundLedgers.h>
#include <ripple/app/tx/InboundTransactions.h>
#include <ripple/app/tx/TransactionAcquire.h>
#include <ripple/app/main/Application.h>
#include <ripple/app/misc/NetworkOPs.h>
#include <ripple/basics/Log.h>
#include <ripple/core/JobQueue.h>
#include <ripple/protocol/RippleLedgerHash.h>
#include <ripple/resource/Fees.h>
#include <beast/cxx14/memory.h> // <memory>
namespace ripple {
enum
{
// Ideal number of peers to start with
startPeers = 2,
// How many rounds to keep a set
setKeepRounds = 3,
};
class InboundTransactionSet
{
// A transaction set we generated, acquired, or are acquiring
public:
std::uint32_t mSeq;
TransactionAcquire::pointer mAcquire;
std::shared_ptr <SHAMap> mSet;
InboundTransactionSet (
std::uint32_t seq,
std::shared_ptr <SHAMap> const& set) :
mSeq (seq), mSet (set)
{ ; }
InboundTransactionSet () : mSeq (0)
{ ; }
};
class InboundTransactionsImp
: public InboundTransactions
, public beast::Stoppable
{
public:
typedef std::pair<uint256, TransactionAcquire::pointer> u256_acq_pair;
InboundTransactionsImp (
clock_type& clock,
Stoppable& parent,
beast::insight::Collector::ptr const& collector,
std::function <void (uint256 const&,
std::shared_ptr <SHAMap> const&)> gotSet)
: Stoppable ("InboundTransactions", parent)
, m_clock (clock)
, m_seq (0)
, m_zeroSet (m_map[uint256()])
, m_gotSet (std::move (gotSet))
{
m_zeroSet.mSet = std::make_shared<SHAMap> (
SHAMapType::TRANSACTION, uint256(),
getApp().family(), deprecatedLogs().journal("SHAMap"));
m_zeroSet.mSet->setUnbacked();
}
TransactionAcquire::pointer getAcquire (uint256 const& hash)
{
{
ScopedLockType sl (mLock);
auto it = m_map.find (hash);
if (it != m_map.end ())
return it->second.mAcquire;
}
return {};
}
std::shared_ptr <SHAMap> getSet (
uint256 const& hash,
bool acquire) override
{
TransactionAcquire::pointer ta;
{
ScopedLockType sl (mLock);
auto it = m_map.find (hash);
if (it != m_map.end ())
{
if (acquire)
{
it->second.mSeq = m_seq;
if (it->second.mAcquire)
{
it->second.mAcquire->stillNeed ();
}
}
return it->second.mSet;
}
if (!acquire || isStopping ())
return std::shared_ptr <SHAMap> ();
ta = std::make_shared <TransactionAcquire> (hash, m_clock);
auto &obj = m_map[hash];
obj.mAcquire = ta;
obj.mSeq = m_seq;
}
ta->init (startPeers);
return {};
}
/** We received a TMLedgerData from a peer.
*/
void gotData (LedgerHash const& hash,
std::shared_ptr<Peer> peer,
std::shared_ptr<protocol::TMLedgerData> packet_ptr)
{
protocol::TMLedgerData& packet = *packet_ptr;
WriteLog (lsTRACE, InboundLedger) <<
"Got data (" << packet.nodes ().size () << ") "
"for acquiring ledger: " << hash;
TransactionAcquire::pointer ta = getAcquire (hash);
if (ta == nullptr)
{
peer->charge (Resource::feeUnwantedData);
return;
}
std::list<SHAMapNodeID> nodeIDs;
std::list< Blob > nodeData;
for (auto const &node : packet.nodes())
{
if (!node.has_nodeid () || !node.has_nodedata () || (
node.nodeid ().size () != 33))
{
peer->charge (Resource::feeInvalidRequest);
return;
}
nodeIDs.emplace_back (node.nodeid ().data (),
static_cast<int>(node.nodeid ().size ()));
nodeData.emplace_back (node.nodedata ().begin (),
node.nodedata ().end ());
}
if (! ta->takeNodes (nodeIDs, nodeData, peer).isUseful ())
peer->charge (Resource::feeUnwantedData);
}
void giveSet (uint256 const& hash,
std::shared_ptr <SHAMap> const& set,
bool fromAcquire) override
{
bool isNew = true;
{
ScopedLockType sl (mLock);
auto& inboundSet = m_map [hash];
if (inboundSet.mSeq < m_seq)
inboundSet.mSeq = m_seq;
if (inboundSet.mSet)
isNew = false;
else
inboundSet.mSet = set;
inboundSet.mAcquire.reset ();
}
if (isNew && fromAcquire)
m_gotSet (hash, set);
}
Json::Value getInfo() override
{
Json::Value ret (Json::objectValue);
Json::Value& sets = (ret["sets"] = Json::arrayValue);
{
ScopedLockType sl (mLock);
ret["seq"] = m_seq;
for (auto const& it : m_map)
{
Json::Value& set = sets [to_string (it.first)];
set["seq"] = it.second.mSeq;
if (it.second.mSet)
set["state"] = "complete";
else if (it.second.mAcquire)
set["state"] = "acquiring";
else
set["state"] = "dead";
}
}
return ret;
}
void newRound (std::uint32_t seq) override
{
ScopedLockType lock (mLock);
// Protect zero set from expiration
m_zeroSet.mSeq = seq;
if (m_seq != seq)
{
m_seq = seq;
auto it = m_map.begin ();
std::uint32_t const minSeq =
(seq < setKeepRounds) ? 0 : (seq - setKeepRounds);
std::uint32_t maxSeq = seq + setKeepRounds;
while (it != m_map.end ())
{
if (it->second.mSeq < minSeq || it->second.mSeq > maxSeq)
it = m_map.erase (it);
else
++it;
}
}
}
void onStop () override
{
ScopedLockType lock (mLock);
m_map.clear ();
stopped();
}
private:
clock_type& m_clock;
typedef hash_map <uint256, InboundTransactionSet> MapType;
typedef RippleRecursiveMutex LockType;
typedef std::unique_lock <LockType> ScopedLockType;
LockType mLock;
MapType m_map;
std::uint32_t m_seq;
// The empty transaction set whose hash is zero
InboundTransactionSet& m_zeroSet;
std::function <void (uint256 const&, std::shared_ptr <SHAMap> const&)> m_gotSet;
};
//------------------------------------------------------------------------------
InboundTransactions::~InboundTransactions() = default;
std::unique_ptr <InboundTransactions>
make_InboundTransactions (
InboundLedgers::clock_type& clock,
beast::Stoppable& parent,
beast::insight::Collector::ptr const& collector,
std::function <void (uint256 const&,
std::shared_ptr <SHAMap> const&)> gotSet)
{
return std::make_unique <InboundTransactionsImp>
(clock, parent, collector, std::move (gotSet));
}
} // ripple

View File

@@ -0,0 +1,84 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012-2015 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_INBOUNDTRANSACTIONS_H
#define RIPPLE_INBOUNDTRANSACTIONS_H
#include <ripple/overlay/Peer.h>
#include <ripple/shamap/SHAMap.h>
#include <beast/chrono/abstract_clock.h>
#include <beast/cxx14/memory.h> // <memory>
#include <beast/threads/Stoppable.h>
namespace ripple {
/** Manages the acquisition and lifetime of transaction sets.
*/
class InboundTransactions
{
public:
typedef beast::abstract_clock <std::chrono::steady_clock> clock_type;
InboundTransactions() = default;
InboundTransactions(InboundTransactions const&) = delete;
InboundTransactions& operator=(InboundTransactions const&) = delete;
virtual ~InboundTransactions() = 0;
/** Retrieves a transaction set by hash
*/
virtual std::shared_ptr <SHAMap> getSet (
uint256 const& setHash,
bool acquire) = 0;
/** Gives data to an inbound transaction set
*/
virtual void gotData (uint256 const& setHash,
std::shared_ptr <Peer>,
std::shared_ptr <protocol::TMLedgerData>) = 0;
/** Gives set to the container
*/
virtual void giveSet (uint256 const& setHash,
std::shared_ptr <SHAMap> const& set,
bool acquired) = 0;
/** Informs the container if a new consensus round
*/
virtual void newRound (std::uint32_t seq) = 0;
virtual Json::Value getInfo() = 0;
virtual void onStop() = 0;
};
std::unique_ptr <InboundTransactions>
make_InboundTransactions (
InboundTransactions::clock_type& clock,
beast::Stoppable& parent,
beast::insight::Collector::ptr const& collector,
std::function
<void (uint256 const&,
std::shared_ptr <SHAMap> const&)> gotSet);
} // ripple
#endif

View File

@@ -23,6 +23,7 @@
#include <ripple/app/main/Application.h> #include <ripple/app/main/Application.h>
#include <ripple/app/misc/NetworkOPs.h> #include <ripple/app/misc/NetworkOPs.h>
#include <ripple/app/tx/TransactionAcquire.h> #include <ripple/app/tx/TransactionAcquire.h>
#include <ripple/app/tx/InboundTransactions.h>
#include <ripple/overlay/Overlay.h> #include <ripple/overlay/Overlay.h>
#include <beast/utility/make_lock.h> #include <beast/utility/make_lock.h>
#include <memory> #include <memory>
@@ -31,9 +32,11 @@ namespace ripple {
enum enum
{ {
// VFALCO NOTE This should be a std::chrono::duration constant. // Timeout interval in milliseconds
// TODO Document this. Is it seconds? Milliseconds? WTF? TX_ACQUIRE_TIMEOUT = 250,
TX_ACQUIRE_TIMEOUT = 250
NORM_TIMEOUTS = 4,
MAX_TIMEOUTS = 20,
}; };
TransactionAcquire::TransactionAcquire (uint256 const& hash, clock_type& clock) TransactionAcquire::TransactionAcquire (uint256 const& hash, clock_type& clock)
@@ -50,21 +53,9 @@ TransactionAcquire::~TransactionAcquire ()
{ {
} }
static void TACompletionHandler (uint256 hash, std::shared_ptr<SHAMap> map)
{
{
std::lock_guard<Application::MutexType> lock(getApp().getMasterMutex());
getApp().getOPs ().mapComplete (hash, map);
getApp().getInboundLedgers ().dropLedger (hash);
}
}
void TransactionAcquire::done () void TransactionAcquire::done ()
{ {
// We hold a PeerSet lock and so cannot acquire the master lock here // We hold a PeerSet lock and so cannot do real work here
std::shared_ptr<SHAMap> map;
if (mFailed) if (mFailed)
{ {
@@ -74,33 +65,28 @@ void TransactionAcquire::done ()
{ {
WriteLog (lsDEBUG, TransactionAcquire) << "Acquired TX set " << mHash; WriteLog (lsDEBUG, TransactionAcquire) << "Acquired TX set " << mHash;
mMap->setImmutable (); mMap->setImmutable ();
map = mMap;
uint256 const& hash (mHash);
std::shared_ptr <SHAMap> const& map (mMap);
getApp().getJobQueue().addJob (jtTXN_DATA, "completeAcquire",
[hash, map](Job&)
{
getApp().getInboundTransactions().giveSet (
hash, map, true);
});
} }
getApp().getJobQueue().addJob (jtTXN_DATA, "completeAcquire", std::bind (&TACompletionHandler, mHash, map));
} }
void TransactionAcquire::onTimer (bool progress, ScopedLockType& psl) void TransactionAcquire::onTimer (bool progress, ScopedLockType& psl)
{ {
bool aggressive = false; bool aggressive = false;
if (getTimeouts () > 10) if (getTimeouts () >= NORM_TIMEOUTS)
{ {
WriteLog (lsWARNING, TransactionAcquire) << "Ten timeouts on TX set " << getHash ();
psl.unlock();
{
auto lock = beast::make_lock(getApp().getMasterMutex());
if (getApp().getOPs ().stillNeedTXSet (mHash))
{
WriteLog (lsWARNING, TransactionAcquire) << "Still need it";
mTimeouts = 0;
aggressive = true; aggressive = true;
}
}
psl.lock();
if (!aggressive) if (getTimeouts () > MAX_TIMEOUTS)
{ {
mFailed = true; mFailed = true;
done (); done ();
@@ -108,30 +94,10 @@ void TransactionAcquire::onTimer (bool progress, ScopedLockType& psl)
} }
} }
if (aggressive || !getPeerCount ()) if (aggressive)
{
// out of peers
WriteLog (lsWARNING, TransactionAcquire) << "Out of peers for TX set " << getHash ();
bool found = false;
Overlay::PeerSequence peerList = getApp().overlay ().getActivePeers ();
for (auto const& peer : peerList)
{
if (peer->hasTxSet (getHash ()))
{
found = true;
peerHas (peer);
}
}
if (!found)
{
for (auto const& peer : peerList)
peerHas (peer);
}
}
else if (!progress)
trigger (Peer::ptr ()); trigger (Peer::ptr ());
addPeers (1);
} }
std::weak_ptr<PeerSet> TransactionAcquire::pmDowncast () std::weak_ptr<PeerSet> TransactionAcquire::pmDowncast ()
@@ -207,6 +173,8 @@ void TransactionAcquire::trigger (Peer::ptr const& peer)
SHAMapAddNode TransactionAcquire::takeNodes (const std::list<SHAMapNodeID>& nodeIDs, SHAMapAddNode TransactionAcquire::takeNodes (const std::list<SHAMapNodeID>& nodeIDs,
const std::list< Blob >& data, Peer::ptr const& peer) const std::list< Blob >& data, Peer::ptr const& peer)
{ {
ScopedLockType sl (mLock);
if (mComplete) if (mComplete)
{ {
WriteLog (lsTRACE, TransactionAcquire) << "TX set complete"; WriteLog (lsTRACE, TransactionAcquire) << "TX set complete";
@@ -262,4 +230,69 @@ SHAMapAddNode TransactionAcquire::takeNodes (const std::list<SHAMapNodeID>& node
} }
} }
void TransactionAcquire::addPeers (int numPeers)
{
std::vector <Peer::ptr> peerVec1, peerVec2;
{
auto peers = getApp().overlay().getActivePeers();
for (auto const& peer : peers)
{
if (peer->hasTxSet (mHash))
peerVec1.push_back (peer);
else
peerVec2.push_back (peer);
}
}
WriteLog (lsDEBUG, TransactionAcquire) << peerVec1.size() << " peers known to have " << mHash;
if (peerVec1.size() != 0)
{
// First try peers known to have the set
std::random_shuffle (peerVec1.begin (), peerVec1.end ());
for (auto const& peer : peerVec1)
{
if (peerHas (peer))
{
if (--numPeers <= 0)
return;
}
}
}
if (peerVec2.size() != 0)
{
// Then try peers not known to have the set
std::random_shuffle (peerVec2.begin (), peerVec2.end ());
for (auto const& peer : peerVec2)
{
if (peerHas (peer))
{
if (--numPeers <= 0)
return;
}
}
}
}
void TransactionAcquire::init (int numPeers)
{
ScopedLockType sl (mLock);
addPeers (numPeers);
setTimer ();
}
void TransactionAcquire::stillNeed ()
{
ScopedLockType sl (mLock);
if (mTimeouts > NORM_TIMEOUTS)
mTimeouts = NORM_TIMEOUTS;
}
} // ripple } // ripple

View File

@@ -49,17 +49,28 @@ public:
SHAMapAddNode takeNodes (const std::list<SHAMapNodeID>& IDs, SHAMapAddNode takeNodes (const std::list<SHAMapNodeID>& IDs,
const std::list< Blob >& data, Peer::ptr const&); const std::list< Blob >& data, Peer::ptr const&);
void init (int startPeers);
void stillNeed ();
private: private:
std::shared_ptr<SHAMap> mMap; std::shared_ptr<SHAMap> mMap;
bool mHaveRoot; bool mHaveRoot;
void onTimer (bool progress, ScopedLockType& peerSetLock); void onTimer (bool progress, ScopedLockType& peerSetLock);
void newPeer (Peer::ptr const& peer) void newPeer (Peer::ptr const& peer)
{ {
trigger (peer); trigger (peer);
} }
void done (); void done ();
// Tries to add the specified number of peers
void addPeers (int num);
void trigger (Peer::ptr const&); void trigger (Peer::ptr const&);
std::weak_ptr<PeerSet> pmDowncast (); std::weak_ptr<PeerSet> pmDowncast ();
}; };

View File

@@ -27,6 +27,7 @@
#include <ripple/app/misc/NetworkOPs.h> #include <ripple/app/misc/NetworkOPs.h>
#include <ripple/app/peers/ClusterNodeStatus.h> #include <ripple/app/peers/ClusterNodeStatus.h>
#include <ripple/app/peers/UniqueNodeList.h> #include <ripple/app/peers/UniqueNodeList.h>
#include <ripple/app/tx/InboundTransactions.h>
#include <ripple/basics/StringUtilities.h> #include <ripple/basics/StringUtilities.h>
#include <ripple/basics/UptimeTimer.h> #include <ripple/basics/UptimeTimer.h>
#include <ripple/core/JobQueue.h> #include <ripple/core/JobQueue.h>
@@ -1613,10 +1614,7 @@ PeerImp::getLedger (std::shared_ptr<protocol::TMGetLedger> const& m)
uint256 txHash; uint256 txHash;
memcpy (txHash.begin (), packet.ledgerhash ().data (), 32); memcpy (txHash.begin (), packet.ledgerhash ().data (), 32);
{ map = getApp().getInboundTransactions().getSet (txHash, false);
std::lock_guard<Application::MutexType> lock(getApp().getMasterMutex());
map = getApp().getOPs ().getTXMap (txHash);
}
if (!map) if (!map)
{ {
@@ -1946,44 +1944,12 @@ PeerImp::getLedger (std::shared_ptr<protocol::TMGetLedger> const& m)
send (oPacket); send (oPacket);
} }
// VFALCO TODO Make this non-static
void void
PeerImp::peerTXData (Job&, uint256 const& hash, PeerImp::peerTXData (Job&, uint256 const& hash,
std::shared_ptr <protocol::TMLedgerData> const& pPacket, std::shared_ptr <protocol::TMLedgerData> const& pPacket,
beast::Journal journal) beast::Journal journal)
{ {
protocol::TMLedgerData& packet = *pPacket; getApp().getInboundTransactions().gotData (hash, shared_from_this(), pPacket);
std::list<SHAMapNodeID> nodeIDs;
std::list< Blob > nodeData;
for (int i = 0; i < packet.nodes ().size (); ++i)
{
const protocol::TMLedgerNode& node = packet.nodes (i);
if (!node.has_nodeid () || !node.has_nodedata () || (
node.nodeid ().size () != 33))
{
journal.warning << "LedgerData request with invalid node ID";
charge (Resource::feeInvalidRequest);
return;
}
nodeIDs.push_back (SHAMapNodeID {node.nodeid ().data (),
static_cast<int>(node.nodeid ().size ())});
nodeData.push_back (Blob (node.nodedata ().begin (),
node.nodedata ().end ()));
}
SHAMapAddNode san;
{
std::lock_guard<Application::MutexType> lock(getApp().getMasterMutex());
san = getApp().getOPs().gotTXData (shared_from_this(),
hash, nodeIDs, nodeData);
}
if (san.isInvalid ())
charge (Resource::feeUnwantedData);
} }
} // ripple } // ripple

View File

@@ -23,4 +23,5 @@
#include <ripple/app/ledger/LedgerHistory.cpp> #include <ripple/app/ledger/LedgerHistory.cpp>
#include <ripple/app/tx/TransactionAcquire.cpp> #include <ripple/app/tx/TransactionAcquire.cpp>
#include <ripple/app/tx/LocalTxs.cpp> #include <ripple/app/tx/LocalTxs.cpp>
#include <ripple/app/tx/InboundTransactions.cpp>
#include <ripple/app/misc/NetworkOPs.cpp> #include <ripple/app/misc/NetworkOPs.cpp>