Compare commits

...

4 Commits

Author SHA1 Message Date
Vinnie Falco
7d47304efd Set version to 0.24.0-rc2 2014-05-02 11:40:46 -07:00
JoelKatz
91f84e1512 Pathfinding dispatch improvements
* Simplify decision whether to update a path
* Prevent pathfinding threads from blocking each other
2014-05-02 11:39:36 -07:00
Vinnie Falco
bae05d3a98 Set version to 0.24.0-rc1 2014-05-01 14:49:44 -07:00
JoelKatz
9ffb81b8c2 Validation timing fixes:
* Log whether consensus built a ledger we already had or were acquiring
* Don't trigger an acquire for a validation for a ledger we might be building
* When we finish building a ledger, try to accept that ledger
* If we cannot accept a built ledger, check held validations
* Correctly set isCurrent for untrusted validations
* Add appropriate logging

This fixes a race condition that could cause spurious and expensive
ledger fetches across the network and delayed recognition of
fully-validated ledger.
2014-05-01 14:49:44 -07:00
11 changed files with 232 additions and 79 deletions

View File

@@ -929,7 +929,16 @@ private:
}
newLCL->setAccepted (closeTime, mCloseResolution, closeTimeCorrect);
getApp().getLedgerMaster().storeLedger(newLCL);
if (getApp().getLedgerMaster().storeLedger (newLCL))
WriteLog (lsDEBUG, LedgerConsensus)
<< "Consensus built ledger we already had";
else if (getApp().getInboundLedgers().find (newLCL->getHash()))
WriteLog (lsDEBUG, LedgerConsensus)
<< "Consensus built ledger we were acquiring";
else
WriteLog (lsDEBUG, LedgerConsensus)
<< "Consensus built new ledger";
WriteLog (lsDEBUG, LedgerConsensus)
<< "Report: NewL = " << newLCL->getHash ()
@@ -974,6 +983,9 @@ private:
WriteLog (lsINFO, LedgerConsensus)
<< "CNF newLCL " << newLCLHash;
// See if we can accept a ledger as fully-validated
getApp().getLedgerMaster().consensusBuilt (newLCL);
Ledger::pointer newOL = boost::make_shared<Ledger>
(true, boost::ref (*newLCL));
LedgerMaster::ScopedLockType sl
@@ -1485,6 +1497,9 @@ private:
else
initialSet = initialLedger.peekTransactionMap ()->snapShot (false);
// Tell the ledger master not to acquire the ledger we're probably building
getApp().getLedgerMaster().setBuildingLedger (mPreviousLedger->getLedgerSeq () + 1);
uint256 txSet = initialSet->getHash ();
WriteLog (lsINFO, LedgerConsensus) << "initial position " << txSet;
mapComplete (txSet, initialSet, false);

View File

@@ -39,16 +39,18 @@ LedgerHistory::LedgerHistory ()
{
}
void LedgerHistory::addLedger (Ledger::pointer ledger, bool validated)
bool LedgerHistory::addLedger (Ledger::pointer ledger, bool validated)
{
assert (ledger && ledger->isImmutable ());
assert (ledger->peekAccountStateMap ()->getHash ().isNonZero ());
LedgersByHash::ScopedLockType sl (m_ledgers_by_hash.peekMutex ());
m_ledgers_by_hash.canonicalize (ledger->getHash(), ledger, true);
const bool alreadyHad = m_ledgers_by_hash.canonicalize (ledger->getHash(), ledger, true);
if (validated)
mLedgersByIndex[ledger->getLedgerSeq()] = ledger->getHash();
return alreadyHad;
}
uint256 LedgerHistory::getLedgerHash (std::uint32_t index)

View File

@@ -28,7 +28,7 @@ class LedgerHistory : beast::LeakChecked <LedgerHistory>
public:
LedgerHistory ();
void addLedger (Ledger::pointer ledger, bool validated);
bool addLedger (Ledger::pointer ledger, bool validated);
float getCacheHitRate ()
{

View File

@@ -78,6 +78,7 @@ public:
std::atomic <std::uint32_t> mPubLedgerSeq;
std::atomic <std::uint32_t> mValidLedgerClose;
std::atomic <std::uint32_t> mValidLedgerSeq;
std::atomic <std::uint32_t> mBuildingLedgerSeq;
//--------------------------------------------------------------------------
@@ -97,6 +98,7 @@ public:
, mPubLedgerSeq (0)
, mValidLedgerClose (0)
, mValidLedgerSeq (0)
, mBuildingLedgerSeq (0)
{
}
@@ -271,9 +273,10 @@ public:
return mLedgerHistory.fixIndex (ledgerIndex, ledgerHash);
}
void storeLedger (Ledger::pointer ledger)
bool storeLedger (Ledger::pointer ledger)
{
mLedgerHistory.addLedger (ledger, false);
// Returns true if we already had the ledger
return mLedgerHistory.addLedger (ledger, false);
}
void forceValid (Ledger::pointer ledger)
@@ -328,6 +331,17 @@ public:
mEngine.setLedger (mCurrentLedger.getMutable ());
}
LedgerIndex getBuildingLedger ()
{
// The ledger we are currently building, 0 of none
return mBuildingLedgerSeq.load ();
}
void setBuildingLedger (LedgerIndex i)
{
mBuildingLedgerSeq.store (i);
}
TER doTransaction (SerializedTransaction::ref txn, TransactionEngineParams params, bool& didApply)
{
Ledger::pointer ledger;
@@ -641,11 +655,21 @@ public:
getApp().getInboundLedgers().findCreate(hash, seq, InboundLedger::fcGENERIC);
}
// Check if the specified ledger can become the new last fully-validated ledger
void checkAccept (uint256 const& hash, std::uint32_t seq)
{
if ((seq == 0) && (seq <= mValidLedgerSeq))
if (seq != 0)
{
// Ledger is too old
if (seq <= mValidLedgerSeq)
return;
// Ledger could match the ledger we're already building
if (seq == mBuildingLedgerSeq)
return;
}
Ledger::pointer ledger = mLedgerHistory.getLedgerByHash (hash);
if (!ledger)
@@ -667,16 +691,15 @@ public:
checkAccept (ledger);
}
void checkAccept (Ledger::ref ledger)
/**
* Determines how many validations are needed to fully-validated a ledger
*
* @return Number of validations needed
*/
int getNeededValidations ()
{
if (ledger->getLedgerSeq() <= mValidLedgerSeq)
return;
// Can we advance the last fully-validated ledger? If so, can we publish?
ScopedLockType ml (m_mutex);
if (ledger->getLedgerSeq() <= mValidLedgerSeq)
return;
if (getConfig ().RUN_STANDALONE)
return 0;
int minVal = mMinValidations;
@@ -690,9 +713,21 @@ public:
minVal = val;
}
if (getConfig ().RUN_STANDALONE)
minVal = 0;
return minVal;
}
void checkAccept (Ledger::ref ledger)
{
if (ledger->getLedgerSeq() <= mValidLedgerSeq)
return;
// Can we advance the last fully-validated ledger? If so, can we publish?
ScopedLockType ml (m_mutex);
if (ledger->getLedgerSeq() <= mValidLedgerSeq)
return;
int minVal = getNeededValidations();
int tvc = getApp().getValidations().getTrustedValidationCount(ledger->getHash());
if (tvc < minVal) // nothing we can do
{
@@ -728,6 +763,94 @@ public:
tryAdvance ();
}
/** Report that the consensus process build a particular ledger */
void consensusBuilt (Ledger::ref ledger) override
{
setBuildingLedger (0);
if (ledger->getLedgerSeq() <= mValidLedgerSeq)
{
WriteLog (lsINFO, LedgerConsensus)
<< "Consensus built old ledger: "
<< ledger->getLedgerSeq() << " <= " << mValidLedgerSeq;
return;
}
// See if this ledger can be the new fully-validated ledger
checkAccept (ledger);
if (ledger->getLedgerSeq() <= mValidLedgerSeq)
{
WriteLog (lsDEBUG, LedgerConsensus)
<< "Consensus ledger fully validated";
return;
}
// This ledger cannot be the new fully-validated ledger, but
// maybe we saved up validations for some other ledger that can be
auto const val = getApp().getValidations().getCurrentTrustedValidations();
// Track validation counts with sequence numbers
class valSeq
{
public:
valSeq () : valCount(0), ledgerSeq(0) { ; }
void mergeValidation (LedgerSeq seq)
{
valCount++;
// If we didn't already know the sequence, now we do
if (ledgerSeq == 0)
ledgerSeq = seq;
}
int valCount;
LedgerSeq ledgerSeq;
};
// Count the number of current, trusted validations
ripple::unordered_map <uint256, valSeq> count;
for (auto const& v : val)
{
valSeq& vs = count[v->getLedgerHash()];
vs.mergeValidation (v->getFieldU32 (sfLedgerSequence));
}
int neededValidations = getNeededValidations ();
LedgerSeq maxSeq = mValidLedgerSeq;
uint256 maxLedger = ledger->getHash();
// Of the ledgers with sufficient validations,
// find the one with the highest sequence
for (auto& v : count)
if (v.second.valCount > neededValidations)
{
// If we still don't know the sequence, get it
if (v.second.ledgerSeq == 0)
{
Ledger::pointer l = getLedgerByHash (v.first);
if (l)
v.second.ledgerSeq = l->getLedgerSeq();
}
if (v.second.ledgerSeq > maxSeq)
{
maxSeq = v.second.ledgerSeq;
maxLedger = v.first;
}
}
if (maxSeq > mValidLedgerSeq)
{
WriteLog (lsDEBUG, LedgerConsensus)
<< "Consensus triggered check of ledger";
checkAccept (maxLedger, maxSeq);
}
}
void advanceThread()
{
ScopedLockType sl (m_mutex);

View File

@@ -80,7 +80,7 @@ public:
virtual void pushLedger (Ledger::pointer newLedger) = 0;
virtual void pushLedger (Ledger::pointer newLCL, Ledger::pointer newOL) = 0;
virtual void storeLedger (Ledger::pointer) = 0;
virtual bool storeLedger (Ledger::pointer) = 0;
virtual void forceValid (Ledger::pointer) = 0;
virtual void setFullLedger (Ledger::pointer ledger, bool isSynchronous, bool isCurrent) = 0;
@@ -128,6 +128,10 @@ public:
virtual void checkAccept (Ledger::ref ledger) = 0;
virtual void checkAccept (uint256 const& hash, std::uint32_t seq) = 0;
virtual void consensusBuilt (Ledger::ref ledger) = 0;
virtual LedgerIndex getBuildingLedger () = 0;
virtual void setBuildingLedger (LedgerIndex index) = 0;
virtual void tryAdvance () = 0;
virtual void newPathRequest () = 0;

View File

@@ -74,20 +74,18 @@ private:
RippleAddress signer = val->getSignerPublic ();
bool isCurrent = false;
if (val->isTrusted () || getApp().getUNL ().nodeInUNL (signer))
{
if (!val->isTrusted() && getApp().getUNL().nodeInUNL (signer))
val->setTrusted();
std::uint32_t now = getApp().getOPs().getCloseTimeNC();
std::uint32_t valClose = val->getSignTime();
if ((now > (valClose - LEDGER_EARLY_INTERVAL)) && (now < (valClose + LEDGER_VAL_INTERVAL)))
isCurrent = true;
else
{
WriteLog (lsWARNING, Validations) << "Received stale validation now=" << now << ", close=" << valClose;
}
}
else
if (!val->isTrusted ())
{
WriteLog (lsDEBUG, Validations) << "Node " << signer.humanNodePublic () << " not in UNL st=" << val->getSignTime () <<
", hash=" << val->getLedgerHash () << ", shash=" << val->getSigningHash () << " src=" << source;
@@ -96,15 +94,14 @@ private:
uint256 hash = val->getLedgerHash ();
uint160 node = signer.getNodeID ();
if (val->isTrusted () && isCurrent)
{
ScopedLockType sl (mLock);
if (!findCreateSet (hash)->insert (std::make_pair (node, val)).second)
return false;
if (isCurrent)
{
ripple::unordered_map<uint160, SerializedValidation::pointer>::iterator it = mCurrentValidations.find (node);
auto it = mCurrentValidations.find (node);
if (it == mCurrentValidations.end ())
mCurrentValidations.emplace (node, val);
@@ -120,16 +117,18 @@ private:
else
isCurrent = false;
}
}
WriteLog (lsDEBUG, Validations) << "Val for " << hash << " from " << signer.humanNodePublic ()
<< " added " << (val->isTrusted () ? "trusted/" : "UNtrusted/") << (isCurrent ? "current" : "stale");
if (val->isTrusted () && isCurrent)
{
getApp().getLedgerMaster ().checkAccept (hash, val->getFieldU32 (sfLedgerSequence));
return true;
}
// FIXME: This never forwards untrusted validations
return isCurrent;
return false;
}
void tune (int size, int age)

View File

@@ -29,7 +29,8 @@ const boost::shared_ptr<InfoSub>& subscriber, int id, PathRequests& owner,
, wpSubscriber (subscriber)
, jvStatus (Json::objectValue)
, bValid (false)
, iLastIndex (0)
, mLastIndex (0)
, mInProgress (false)
, iLastLevel (0)
, bLastSuccess (false)
, iIdentifier (id)
@@ -77,38 +78,43 @@ bool PathRequest::isValid ()
bool PathRequest::isNew ()
{
ScopedLockType sl (mIndexLock);
// does this path request still need its first full path
return iLastIndex.load() == 0;
return mLastIndex == 0;
}
bool PathRequest::needsUpdate (bool newOnly, LedgerIndex index)
{
LedgerIndex lastIndex = iLastIndex.load();
for (;;)
{
if (newOnly)
{
// Is this request new
if (lastIndex != 0)
return false;
ScopedLockType sl (mIndexLock);
// This thread marks this request handled
if (iLastIndex.compare_exchange_weak (lastIndex, 1,
std::memory_order_release, std::memory_order_relaxed))
if (mInProgress)
{
// Another thread is handling this
return false;
}
if (newOnly && (mLastIndex != 0))
{
// Only handling new requests, this isn't new
return false;
}
if (mLastIndex >= index)
{
return false;
}
mInProgress = true;
return true;
}
else
{
// Has the request already been handled?
if (lastIndex >= index)
return false;
// This thread marks this request handled
if (iLastIndex.compare_exchange_weak (lastIndex, index,
std::memory_order_release, std::memory_order_relaxed))
return true;
}
}
void PathRequest::updateComplete ()
{
ScopedLockType sl (mIndexLock);
assert (mInProgress);
mInProgress = false;
}
bool PathRequest::isValid (RippleLineCache::ref crCache)

View File

@@ -57,6 +57,7 @@ public:
bool isValid ();
bool isNew ();
bool needsUpdate (bool newOnly, LedgerIndex index);
void updateComplete ();
Json::Value getStatus ();
Json::Value doCreate (const boost::shared_ptr<Ledger>&, const RippleLineCache::pointer&,
@@ -93,7 +94,9 @@ private:
bool bValid;
std::atomic<LedgerIndex> iLastIndex;
LockType mIndexLock;
LedgerIndex mLastIndex;
bool mInProgress;
int iLastLevel;
bool bLastSuccess;

View File

@@ -90,6 +90,7 @@ void PathRequests::updateAll (Ledger::ref inLedger, CancelCallback shouldCancel)
if (!ipSub->getConsumer ().warn ())
{
Json::Value update = pRequest->doUpdate (cache, false);
pRequest->updateComplete ();
update["type"] = "path_find";
ipSub->send (update, false);
remove = false;

View File

@@ -31,7 +31,7 @@ char const* BuildInfo::getRawVersionString ()
//
// The build version number (edit this for each release)
//
"0.23.0"
"0.24.0-rc2"
//
// Must follow the format described here:
//