Decongest the master lock:

* Reduce scope of lock in ledger accept
* Remove duplicate tracking of transaction sets
* Need master lock to secure ledger sequencing
This commit is contained in:
David Schwartz
2015-03-06 13:22:20 -08:00
committed by Nik Bougalis
parent e44e75fa6b
commit 60a7abcef6
6 changed files with 192 additions and 277 deletions

View File

@@ -925,36 +925,6 @@ public:
return true;
}
/**
A peer has informed us that it can give us a transaction set
@param peer The peer we can get it from.
@param hashSet The transaction set we can get.
@param status Says whether or not the peer has the transaction set
locally.
@return true if we have or acquire the transaction set.
*/
bool peerHasSet (Peer::ptr const& peer, uint256 const& hashSet
, protocol::TxSetStatus status)
{
if (status != protocol::tsHAVE) // Indirect requests for future support
return true;
std::vector< std::weak_ptr<Peer> >& set = mPeerData[hashSet];
for (std::weak_ptr<Peer>& iit : set)
if (iit.lock () == peer)
return false;
set.push_back (peer);
auto acq (mAcquiring.find (hashSet));
if (acq != mAcquiring.end ())
getApp().getJobQueue().addJob(jtTXN_DATA, "peerHasTxnData",
std::bind(&TransactionAcquire::peerHasVoid, acq->second, peer));
return true;
}
/**
A peer has sent us some nodes from a transaction set
@@ -1017,211 +987,214 @@ private:
assert (set->getHash () == mOurPosition->getCurrentHash ());
// these are now obsolete
getApp().getOPs ().peekStoredProposals ().clear ();
}
std::uint32_t closeTime = roundCloseTime (mOurPosition->getCloseTime ());
bool closeTimeCorrect = true;
std::uint32_t closeTime = roundCloseTime (mOurPosition->getCloseTime ());
bool closeTimeCorrect = true;
if (closeTime == 0)
if (closeTime == 0)
{
// we agreed to disagree
closeTimeCorrect = false;
closeTime = mPreviousLedger->getCloseTimeNC () + 1;
}
WriteLog (lsDEBUG, LedgerConsensus)
<< "Report: Prop=" << (mProposing ? "yes" : "no")
<< " val=" << (mValidating ? "yes" : "no")
<< " corLCL=" << (mHaveCorrectLCL ? "yes" : "no")
<< " fail=" << (mConsensusFail ? "yes" : "no");
WriteLog (lsDEBUG, LedgerConsensus)
<< "Report: Prev = " << mPrevLedgerHash
<< ":" << mPreviousLedger->getLedgerSeq ();
WriteLog (lsDEBUG, LedgerConsensus)
<< "Report: TxSt = " << set->getHash ()
<< ", close " << closeTime << (closeTimeCorrect ? "" : "X");
// Put failed transactions into a deterministic order
CanonicalTXSet retriableTransactions (set->getHash ());
// Build the new last closed ledger
Ledger::pointer newLCL
= std::make_shared<Ledger> (false
, *mPreviousLedger);
// Set up to write SHAMap changes to our database,
// perform updates, extract changes
WriteLog (lsDEBUG, LedgerConsensus)
<< "Applying consensus set transactions to the"
<< " last closed ledger";
applyTransactions (set, newLCL, newLCL, retriableTransactions, false);
newLCL->updateSkipList ();
newLCL->setClosed ();
int asf = newLCL->peekAccountStateMap ()->flushDirty (
hotACCOUNT_NODE, newLCL->getLedgerSeq());
int tmf = newLCL->peekTransactionMap ()->flushDirty (
hotTRANSACTION_NODE, newLCL->getLedgerSeq());
WriteLog (lsDEBUG, LedgerConsensus) << "Flushed " << asf << " account and " <<
tmf << "transaction nodes";
// Accept ledger
newLCL->setAccepted (closeTime, mCloseResolution, closeTimeCorrect);
// And stash the ledger in the ledger master
if (getApp().getLedgerMaster().storeLedger (newLCL))
WriteLog (lsDEBUG, LedgerConsensus)
<< "Consensus built ledger we already had";
else if (getApp().getInboundLedgers().find (newLCL->getHash()))
WriteLog (lsDEBUG, LedgerConsensus)
<< "Consensus built ledger we were acquiring";
else
WriteLog (lsDEBUG, LedgerConsensus)
<< "Consensus built new ledger";
WriteLog (lsDEBUG, LedgerConsensus)
<< "Report: NewL = " << newLCL->getHash ()
<< ":" << newLCL->getLedgerSeq ();
uint256 newLCLHash = newLCL->getHash ();
// Tell directly connected peers that we have a new LCL
statusChange (protocol::neACCEPTED_LEDGER, *newLCL);
if (mValidating && !mConsensusFail)
{
// Build validation
uint256 signingHash;
STValidation::pointer v =
std::make_shared<STValidation>
(newLCLHash, getApp().getOPs ().getValidationTimeNC ()
, mValPublic, mProposing);
v->setFieldU32 (sfLedgerSequence, newLCL->getLedgerSeq ());
addLoad(v); // Our network load
if (((newLCL->getLedgerSeq () + 1) % 256) == 0)
// next ledger is flag ledger
{
// we agreed to disagree
closeTimeCorrect = false;
closeTime = mPreviousLedger->getCloseTimeNC () + 1;
// Suggest fee changes and new features
m_feeVote.doValidation (newLCL, *v);
getApp().getAmendmentTable ().doValidation (newLCL, *v);
}
WriteLog (lsDEBUG, LedgerConsensus)
<< "Report: Prop=" << (mProposing ? "yes" : "no")
<< " val=" << (mValidating ? "yes" : "no")
<< " corLCL=" << (mHaveCorrectLCL ? "yes" : "no")
<< " fail=" << (mConsensusFail ? "yes" : "no");
WriteLog (lsDEBUG, LedgerConsensus)
<< "Report: Prev = " << mPrevLedgerHash
<< ":" << mPreviousLedger->getLedgerSeq ();
WriteLog (lsDEBUG, LedgerConsensus)
<< "Report: TxSt = " << set->getHash ()
<< ", close " << closeTime << (closeTimeCorrect ? "" : "X");
v->sign (signingHash, mValPrivate);
v->setTrusted ();
// suppress it if we receive it - FIXME: wrong suppression
getApp().getHashRouter ().addSuppression (signingHash);
getApp().getValidations ().addValidation (v, "local");
getApp().getOPs ().setLastValidation (v);
Blob validation = v->getSigned ();
protocol::TMValidation val;
val.set_validation (&validation[0], validation.size ());
// Send signed validation to all of our directly connected peers
getApp ().overlay ().foreach (send_always (
std::make_shared <Message> (
val, protocol::mtVALIDATION)));
WriteLog (lsINFO, LedgerConsensus)
<< "CNF Val " << newLCLHash;
}
else
WriteLog (lsINFO, LedgerConsensus)
<< "CNF newLCL " << newLCLHash;
// Put failed transactions into a deterministic order
CanonicalTXSet retriableTransactions (set->getHash ());
// See if we can accept a ledger as fully-validated
getApp().getLedgerMaster().consensusBuilt (newLCL);
// Build the new last closed ledger
Ledger::pointer newLCL
= std::make_shared<Ledger> (false
, *mPreviousLedger);
// Build new open ledger
Ledger::pointer newOL = std::make_shared<Ledger>
(true, *newLCL);
// Set up to write SHAMap changes to our database,
// perform updates, extract changes
WriteLog (lsDEBUG, LedgerConsensus)
<< "Applying consensus set transactions to the"
<< " last closed ledger";
applyTransactions (set, newLCL, newLCL, retriableTransactions, false);
newLCL->updateSkipList ();
newLCL->setClosed ();
int asf = newLCL->peekAccountStateMap ()->flushDirty (
hotACCOUNT_NODE, newLCL->getLedgerSeq());
int tmf = newLCL->peekTransactionMap ()->flushDirty (
hotTRANSACTION_NODE, newLCL->getLedgerSeq());
WriteLog (lsDEBUG, LedgerConsensus) << "Flushed " << asf << " account and " <<
tmf << "transaction nodes";
// Accept ledger
newLCL->setAccepted (closeTime, mCloseResolution, closeTimeCorrect);
// And stash the ledger in the ledger master
if (getApp().getLedgerMaster().storeLedger (newLCL))
WriteLog (lsDEBUG, LedgerConsensus)
<< "Consensus built ledger we already had";
else if (getApp().getInboundLedgers().find (newLCL->getHash()))
WriteLog (lsDEBUG, LedgerConsensus)
<< "Consensus built ledger we were acquiring";
else
WriteLog (lsDEBUG, LedgerConsensus)
<< "Consensus built new ledger";
WriteLog (lsDEBUG, LedgerConsensus)
<< "Report: NewL = " << newLCL->getHash ()
<< ":" << newLCL->getLedgerSeq ();
uint256 newLCLHash = newLCL->getHash ();
// Tell directly connected peers that we have a new LCL
statusChange (protocol::neACCEPTED_LEDGER, *newLCL);
if (mValidating && !mConsensusFail)
// Apply disputed transactions that didn't get in
TransactionEngine engine (newOL);
bool anyDisputes = false;
for (auto& it : mDisputes)
{
if (!it.second->getOurVote ())
{
// Build validation
uint256 signingHash;
STValidation::pointer v =
std::make_shared<STValidation>
(newLCLHash, getApp().getOPs ().getValidationTimeNC ()
, mValPublic, mProposing);
v->setFieldU32 (sfLedgerSequence, newLCL->getLedgerSeq ());
addLoad(v); // Our network load
if (((newLCL->getLedgerSeq () + 1) % 256) == 0)
// next ledger is flag ledger
// we voted NO
try
{
// Suggest fee changes and new features
m_feeVote.doValidation (newLCL, *v);
getApp().getAmendmentTable ().doValidation (newLCL, *v);
WriteLog (lsDEBUG, LedgerConsensus)
<< "Test applying disputed transaction that did"
<< " not get in";
SerialIter sit (it.second->peekTransaction ());
STTx::pointer txn
= std::make_shared<STTx>(sit);
retriableTransactions.push_back (txn);
anyDisputes = true;
}
catch (...)
{
WriteLog (lsDEBUG, LedgerConsensus)
<< "Failed to apply transaction we voted NO on";
}
v->sign (signingHash, mValPrivate);
v->setTrusted ();
// suppress it if we receive it - FIXME: wrong suppression
getApp().getHashRouter ().addSuppression (signingHash);
getApp().getValidations ().addValidation (v, "local");
getApp().getOPs ().setLastValidation (v);
Blob validation = v->getSigned ();
protocol::TMValidation val;
val.set_validation (&validation[0], validation.size ());
// Send signed validation to all of our directly connected peers
getApp ().overlay ().foreach (send_always (
std::make_shared <Message> (
val, protocol::mtVALIDATION)));
WriteLog (lsINFO, LedgerConsensus)
<< "CNF Val " << newLCLHash;
}
else
WriteLog (lsINFO, LedgerConsensus)
<< "CNF newLCL " << newLCLHash;
}
// See if we can accept a ledger as fully-validated
getApp().getLedgerMaster().consensusBuilt (newLCL);
if (anyDisputes)
{
applyTransactions (std::shared_ptr<SHAMap>(),
newOL, newLCL, retriableTransactions, true);
}
{
Application::ScopedLockType lock
(getApp ().getMasterLock ());
// Build new open ledger
Ledger::pointer newOL = std::make_shared<Ledger>
(true, *newLCL);
LedgerMaster::ScopedLockType sl
(getApp().getLedgerMaster ().peekMutex ());
// Apply disputed transactions that didn't get in
TransactionEngine engine (newOL);
bool anyDisputes = false;
for (auto& it : mDisputes)
// Apply transactions from the old open ledger
Ledger::pointer oldOL = getApp().getLedgerMaster().getCurrentLedger();
if (oldOL->peekTransactionMap()->getHash().isNonZero ())
{
if (!it.second->getOurVote ())
{
// we voted NO
try
{
WriteLog (lsDEBUG, LedgerConsensus)
<< "Test applying disputed transaction that did"
<< " not get in";
SerialIter sit (it.second->peekTransaction ());
STTx::pointer txn
= std::make_shared<STTx>(sit);
retriableTransactions.push_back (txn);
anyDisputes = true;
}
catch (...)
{
WriteLog (lsDEBUG, LedgerConsensus)
<< "Failed to apply transaction we voted NO on";
}
}
}
if (anyDisputes)
{
applyTransactions (std::shared_ptr<SHAMap>(),
WriteLog (lsDEBUG, LedgerConsensus)
<< "Applying transactions from current open ledger";
applyTransactions (oldOL->peekTransactionMap (),
newOL, newLCL, retriableTransactions, true);
}
{
// Apply transactions from the old open ledger
Ledger::pointer oldOL = getApp().getLedgerMaster().getCurrentLedger();
if (oldOL->peekTransactionMap()->getHash().isNonZero ())
{
WriteLog (lsDEBUG, LedgerConsensus)
<< "Applying transactions from current open ledger";
applyTransactions (oldOL->peekTransactionMap (),
newOL, newLCL, retriableTransactions, true);
}
}
{
// Apply local transactions
TransactionEngine engine (newOL);
m_localTX.apply (engine);
}
// Apply local transactions
TransactionEngine engine (newOL);
m_localTX.apply (engine);
// We have a new Last Closed Ledger and new Open Ledger
getApp().getLedgerMaster ().pushLedger (newLCL, newOL);
mNewLedgerHash = newLCL->getHash ();
mState = lcsACCEPTED;
sl.unlock ();
}
if (mValidating)
mNewLedgerHash = newLCL->getHash ();
mState = lcsACCEPTED;
if (mValidating)
{
// see how close our close time is to other node's
// close time reports, and update our clock.
WriteLog (lsINFO, LedgerConsensus)
<< "We closed at "
<< beast::lexicalCastThrow <std::string> (mCloseTime);
std::uint64_t closeTotal = mCloseTime;
int closeCount = 1;
for (auto it = mCloseTimes.begin ()
, end = mCloseTimes.end (); it != end; ++it)
{
// see how close our close time is to other node's
// close time reports, and update our clock.
// FIXME: Use median, not average
WriteLog (lsINFO, LedgerConsensus)
<< "We closed at "
<< beast::lexicalCastThrow <std::string> (mCloseTime);
std::uint64_t closeTotal = mCloseTime;
int closeCount = 1;
for (auto it = mCloseTimes.begin ()
, end = mCloseTimes.end (); it != end; ++it)
{
// FIXME: Use median, not average
WriteLog (lsINFO, LedgerConsensus)
<< beast::lexicalCastThrow <std::string> (it->second)
<< " time votes for "
<< beast::lexicalCastThrow <std::string> (it->first);
closeCount += it->second;
closeTotal += static_cast<std::uint64_t>
(it->first) * static_cast<std::uint64_t> (it->second);
}
closeTotal += (closeCount / 2);
closeTotal /= closeCount;
int offset = static_cast<int> (closeTotal)
- static_cast<int> (mCloseTime);
WriteLog (lsINFO, LedgerConsensus)
<< "Our close offset is estimated at "
<< offset << " (" << closeCount << ")";
getApp().getOPs ().closeTimeOffset (offset);
<< beast::lexicalCastThrow <std::string> (it->second)
<< " time votes for "
<< beast::lexicalCastThrow <std::string> (it->first);
closeCount += it->second;
closeTotal += static_cast<std::uint64_t>
(it->first) * static_cast<std::uint64_t> (it->second);
}
closeTotal += (closeCount / 2);
closeTotal /= closeCount;
int offset = static_cast<int> (closeTotal)
- static_cast<int> (mCloseTime);
WriteLog (lsINFO, LedgerConsensus)
<< "Our close offset is estimated at "
<< offset << " (" << closeCount << ")";
getApp().getOPs ().closeTimeOffset (offset);
}
}
@@ -1232,31 +1205,7 @@ private:
*/
void startAcquiring (TransactionAcquire::pointer acquire)
{
auto it = mPeerData.find (acquire->getHash ());
if (it != mPeerData.end ())
{
// Add any peers we already know have his transaction set
std::vector< std::weak_ptr<Peer> >& peerList = it->second;
std::vector< std::weak_ptr<Peer> >::iterator pit
= peerList.begin ();
while (pit != peerList.end ())
{
Peer::ptr pr = pit->lock ();
if (!pr)
{
pit = peerList.erase (pit);
}
else
{
acquire->peerHas (pr);
++pit;
}
}
}
// FIXME: Randomize and limit the number
struct build_acquire_list
{
typedef void return_type;
@@ -1972,9 +1921,6 @@ private:
hash_map<uint256, std::shared_ptr<SHAMap>> mAcquired;
hash_map<uint256, TransactionAcquire::pointer> mAcquiring;
// Peer sets
hash_map<uint256, std::vector< std::weak_ptr<Peer> > > mPeerData;
// Disputed transactions
hash_map<uint256, DisputedTx::pointer> mDisputes;
hash_set<uint256> mCompares;

View File

@@ -77,9 +77,6 @@ public:
virtual bool peerPosition (LedgerProposal::ref) = 0;
virtual bool peerHasSet (Peer::ptr const& peer, uint256 const& set,
protocol::TxSetStatus status) = 0;
virtual SHAMapAddNode peerGaveNodes (Peer::ptr const& peer,
uint256 const& setHash,
const std::list<SHAMapNodeID>& nodeIDs,

View File

@@ -1709,19 +1709,6 @@ SHAMapAddNode NetworkOPsImp::gotTXData (
return mConsensus->peerGaveNodes (peer, hash, nodeIDs, nodeData);
}
bool NetworkOPsImp::hasTXSet (
const std::shared_ptr<Peer>& peer, uint256 const& set,
protocol::TxSetStatus status)
{
if (mConsensus == nullptr)
{
m_journal.info << "Peer has TX set, not during consensus";
return false;
}
return mConsensus->peerHasSet (peer, set, status);
}
bool NetworkOPsImp::stillNeedTXSet (uint256 const& hash)
{
if (!mConsensus)

View File

@@ -222,9 +222,6 @@ public:
virtual std::shared_ptr<SHAMap> getTXMap (uint256 const& hash) = 0;
virtual bool hasTXSet (const std::shared_ptr<Peer>& peer,
uint256 const& set, protocol::TxSetStatus status) = 0;
virtual void mapComplete (uint256 const& hash,
std::shared_ptr<SHAMap> const& map) = 0;

View File

@@ -1146,14 +1146,20 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMHaveTransactionSet> const& m)
memcpy (hash.begin (), m->hash ().data (), 32);
if (m->status () == protocol::tsHAVE)
addTxSet (hash);
{
Application::ScopedLockType lock (getApp ().getMasterLock ());
std::lock_guard<std::mutex> sl(recentLock_);
if (!getApp().getOPs ().hasTXSet (
shared_from_this (), hash, m->status ()))
if (std::find (recentTxSets_.begin (),
recentTxSets_.end (), hash) != recentTxSets_.end ())
{
fee_ = Resource::feeUnwantedData;
return;
}
if (recentTxSets_.size () == 128)
recentTxSets_.pop_front ();
recentTxSets_.push_back (hash);
}
}
@@ -1387,21 +1393,6 @@ PeerImp::addLedger (uint256 const& hash)
recentLedgers_.push_back (hash);
}
void
PeerImp::addTxSet (uint256 const& hash)
{
std::lock_guard<std::mutex> sl(recentLock_);
if (std::find (recentTxSets_.begin (),
recentTxSets_.end (), hash) != recentTxSets_.end ())
return;
if (recentTxSets_.size () == 128)
recentTxSets_.pop_front ();
recentTxSets_.push_back (hash);
}
void
PeerImp::doFetchPack (const std::shared_ptr<protocol::TMGetObjectByHash>& packet)
{

View File

@@ -396,9 +396,6 @@ private:
void
addLedger (uint256 const& hash);
void
addTxSet (uint256 const& hash);
void
doFetchPack (const std::shared_ptr<protocol::TMGetObjectByHash>& packet);