Files
xahaud/src/cpp/ripple/LedgerConsensus.cpp
JoelKatz 4c6920dd55 Add extra debug to better understand how the txn retry logic is working.
Avoid an extra transaction pass caused by failed transactions counting as changes.
Downgrade some debug messages from INFO to DEBUG.
2013-01-19 14:09:42 -08:00

1398 lines
42 KiB
C++

#include "LedgerConsensus.h"
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/unordered_set.hpp>
#include <boost/foreach.hpp>
#include "../json/writer.h"
#include "Application.h"
#include "NetworkOPs.h"
#include "LedgerTiming.h"
#include "SerializedValidation.h"
#include "Log.h"
#include "SHAMapSync.h"
#define TX_ACQUIRE_TIMEOUT 250
#define TRUST_NETWORK
#define LC_DEBUG
typedef std::map<uint160, LedgerProposal::pointer>::value_type u160_prop_pair;
typedef std::map<uint256, LCTransaction::pointer>::value_type u256_lct_pair;
SETUP_LOG();
DECLARE_INSTANCE(LedgerConsensus);
DECLARE_INSTANCE(TransactionAcquire);
TransactionAcquire::TransactionAcquire(const uint256& hash) : PeerSet(hash, TX_ACQUIRE_TIMEOUT), mHaveRoot(false)
{
mMap = boost::make_shared<SHAMap>(smtTRANSACTION, hash);
}
void TransactionAcquire::done()
{
if (mFailed)
{
cLog(lsWARNING) << "Failed to acquire TX set " << mHash;
theApp->getOPs().mapComplete(mHash, SHAMap::pointer());
}
else
{
cLog(lsINFO) << "Acquired TX set " << mHash;
mMap->setImmutable();
theApp->getOPs().mapComplete(mHash, mMap);
}
theApp->getMasterLedgerAcquire().dropLedger(mHash);
}
void TransactionAcquire::onTimer(bool progress)
{
if (!getPeerCount())
{ // out of peers
cLog(lsWARNING) << "Out of peers for TX set " << getHash();
bool found = false;
std::vector<Peer::pointer> peerList = theApp->getConnectionPool().getPeerVector();
BOOST_FOREACH(Peer::ref peer, peerList)
{
if (peer->hasTxSet(getHash()))
{
found = true;
peerHas(peer);
}
}
if (!found)
{
BOOST_FOREACH(Peer::ref peer, peerList)
peerHas(peer);
}
}
else if (!progress)
trigger(Peer::pointer());
}
boost::weak_ptr<PeerSet> TransactionAcquire::pmDowncast()
{
return boost::shared_polymorphic_downcast<PeerSet>(shared_from_this());
}
void TransactionAcquire::trigger(Peer::ref peer)
{
if (mComplete || mFailed)
{
cLog(lsINFO) << "complete or failed";
return;
}
if (!mHaveRoot)
{
cLog(lsTRACE) << "TransactionAcquire::trigger " << (peer ? "havePeer" : "noPeer") << " no root";
ripple::TMGetLedger tmGL;
tmGL.set_ledgerhash(mHash.begin(), mHash.size());
tmGL.set_itype(ripple::liTS_CANDIDATE);
if (getTimeouts() != 0)
tmGL.set_querytype(ripple::qtINDIRECT);
*(tmGL.add_nodeids()) = SHAMapNode().getRawString();
sendRequest(tmGL, peer);
}
else
{
std::vector<SHAMapNode> nodeIDs;
std::vector<uint256> nodeHashes;
ConsensusTransSetSF sf;
mMap->getMissingNodes(nodeIDs, nodeHashes, 256, &sf);
if (nodeIDs.empty())
{
if (mMap->isValid())
mComplete = true;
else
mFailed = true;
done();
return;
}
ripple::TMGetLedger tmGL;
tmGL.set_ledgerhash(mHash.begin(), mHash.size());
tmGL.set_itype(ripple::liTS_CANDIDATE);
if (getTimeouts() != 0)
tmGL.set_querytype(ripple::qtINDIRECT);
BOOST_FOREACH(SHAMapNode& it, nodeIDs)
*(tmGL.add_nodeids()) = it.getRawString();
sendRequest(tmGL, peer);
}
}
SMAddNode TransactionAcquire::takeNodes(const std::list<SHAMapNode>& nodeIDs,
const std::list< std::vector<unsigned char> >& data, Peer::ref peer)
{
if (mComplete)
{
cLog(lsTRACE) << "TX set complete";
return SMAddNode();
}
if (mFailed)
{
cLog(lsTRACE) << "TX set failed";
return SMAddNode();
}
try
{
if (nodeIDs.empty())
return SMAddNode::invalid();
std::list<SHAMapNode>::const_iterator nodeIDit = nodeIDs.begin();
std::list< std::vector<unsigned char> >::const_iterator nodeDatait = data.begin();
ConsensusTransSetSF sf;
while (nodeIDit != nodeIDs.end())
{
if (nodeIDit->isRoot())
{
if (mHaveRoot)
{
cLog(lsWARNING) << "Got root TXS node, already have it";
return SMAddNode();
}
if (!mMap->addRootNode(getHash(), *nodeDatait, snfWIRE, NULL))
{
cLog(lsWARNING) << "TX acquire got bad root node";
return SMAddNode::invalid();
}
else
mHaveRoot = true;
}
else if (!mMap->addKnownNode(*nodeIDit, *nodeDatait, &sf))
{
cLog(lsWARNING) << "TX acquire got bad non-root node";
return SMAddNode::invalid();
}
++nodeIDit;
++nodeDatait;
}
trigger(peer);
progress();
return SMAddNode::useful();
}
catch (...)
{
cLog(lsERROR) << "Peer sends us junky transaction node data";
return SMAddNode::invalid();
}
}
void LCTransaction::setVote(const uint160& peer, bool votesYes)
{ // Track a peer's yes/no vote on a particular disputed transaction
std::pair<boost::unordered_map<const uint160, bool>::iterator, bool> res =
mVotes.insert(std::pair<const uint160, bool>(peer, votesYes));
if (res.second)
{ // new vote
if (votesYes)
{
cLog(lsTRACE) << "Peer " << peer << " votes YES on " << mTransactionID;
++mYays;
}
else
{
cLog(lsTRACE) << "Peer " << peer << " votes NO on " << mTransactionID;
++mNays;
}
}
else if (votesYes && !res.first->second)
{ // changes vote to yes
cLog(lsDEBUG) << "Peer " << peer << " now votes YES on " << mTransactionID;
--mNays;
++mYays;
res.first->second = true;
}
else if (!votesYes && res.first->second)
{ // changes vote to no
cLog(lsDEBUG) << "Peer " << peer << " now votes NO on " << mTransactionID;
++mNays;
--mYays;
res.first->second = false;
}
}
void LCTransaction::unVote(const uint160& peer)
{ // Remove a peer's vote on this disputed transasction
boost::unordered_map<uint160, bool>::iterator it = mVotes.find(peer);
if (it != mVotes.end())
{
if (it->second)
--mYays;
else
--mNays;
mVotes.erase(it);
}
}
bool LCTransaction::updateVote(int percentTime, bool proposing)
{
if (mOurVote && (mNays == 0))
return false;
if (!mOurVote && (mYays == 0))
return false;
bool newPosition;
int weight;
if (proposing) // give ourselves full weight
{
// This is basically the percentage of nodes voting 'yes' (including us)
weight = (mYays * 100 + (mOurVote ? 100 : 0)) / (mNays + mYays + 1);
// To prevent avalanche stalls, we increase the needed weight slightly over time
if (percentTime < AV_MID_CONSENSUS_TIME) newPosition = weight > AV_INIT_CONSENSUS_PCT;
else if (percentTime < AV_LATE_CONSENSUS_TIME) newPosition = weight > AV_MID_CONSENSUS_PCT;
else newPosition = weight > AV_LATE_CONSENSUS_PCT;
}
else // don't let us outweigh a proposing node, just recognize consensus
{
weight = -1;
newPosition = mYays > mNays;
}
if (newPosition == mOurVote)
{
#ifdef LC_DEBUG
cLog(lsTRACE) << "No change (" << (mOurVote ? "YES" : "NO") << ") : weight "
<< weight << ", percent " << percentTime;
#endif
return false;
}
mOurVote = newPosition;
cLog(lsDEBUG) << "We now vote " << (mOurVote ? "YES" : "NO") << " on " << mTransactionID;
return true;
}
LedgerConsensus::LedgerConsensus(const uint256& prevLCLHash, Ledger::ref previousLedger, uint32 closeTime)
: mState(lcsPRE_CLOSE), mCloseTime(closeTime), mPrevLedgerHash(prevLCLHash), mPreviousLedger(previousLedger),
mValPublic(theConfig.VALIDATION_PUB), mValPrivate(theConfig.VALIDATION_PRIV), mConsensusFail(false),
mCurrentMSeconds(0), mClosePercent(0), mHaveCloseTimeConsensus(false),
mConsensusStartTime(boost::posix_time::microsec_clock::universal_time())
{
cLog(lsDEBUG) << "Creating consensus object";
cLog(lsTRACE) << "LCL:" << previousLedger->getHash() <<", ct=" << closeTime;
mPreviousProposers = theApp->getOPs().getPreviousProposers();
mPreviousMSeconds = theApp->getOPs().getPreviousConvergeTime();
assert(mPreviousMSeconds);
mCloseResolution = ContinuousLedgerTiming::getNextLedgerTimeResolution(
mPreviousLedger->getCloseResolution(), mPreviousLedger->getCloseAgree(), previousLedger->getLedgerSeq() + 1);
if (mValPublic.isValid() && mValPrivate.isValid() && !theApp->getOPs().isNeedNetworkLedger())
{
cLog(lsINFO) << "Entering consensus process, validating";
mValidating = true;
mProposing = theApp->getOPs().getOperatingMode() == NetworkOPs::omFULL;
}
else
{
cLog(lsINFO) << "Entering consensus process, watching";
mProposing = mValidating = false;
}
mHaveCorrectLCL = (mPreviousLedger->getHash() == mPrevLedgerHash);
if (!mHaveCorrectLCL)
{
theApp->getOPs().setProposing(false, false);
handleLCL(mPrevLedgerHash);
if (!mHaveCorrectLCL)
{
// mProposing = mValidating = false;
cLog(lsINFO) << "Entering consensus with: " << previousLedger->getHash();
cLog(lsINFO) << "Correct LCL is: " << prevLCLHash;
}
}
else
theApp->getOPs().setProposing(mProposing, mValidating);
}
void LedgerConsensus::checkOurValidation()
{ // This only covers some cases - Fix for the case where we can't ever acquire the consensus ledger
if (!mHaveCorrectLCL || !mValPublic.isValid() || !mValPrivate.isValid() || theApp->getOPs().isNeedNetworkLedger())
return;
SerializedValidation::pointer lastVal = theApp->getOPs().getLastValidation();
if (lastVal)
{
if (lastVal->getFieldU32(sfLedgerSequence) == mPreviousLedger->getLedgerSeq())
return;
if (lastVal->getLedgerHash() == mPrevLedgerHash)
return;
}
uint256 signingHash;
SerializedValidation::pointer v = boost::make_shared<SerializedValidation>
(mPreviousLedger->getHash(), theApp->getOPs().getValidationTimeNC(), mValPublic, false);
v->setTrusted();
v->sign(signingHash, mValPrivate);
theApp->isNew(signingHash);
theApp->getValidations().addValidation(v);
std::vector<unsigned char> validation = v->getSigned();
ripple::TMValidation val;
val.set_validation(&validation[0], validation.size());
#if 0
theApp->getConnectionPool().relayMessage(NULL,
boost::make_shared<PackedMessage>(val, ripple::mtVALIDATION));
#endif
theApp->getOPs().setLastValidation(v);
cLog(lsWARNING) << "Sending partial validation";
}
void LedgerConsensus::checkLCL()
{
uint256 netLgr = mPrevLedgerHash;
int netLgrCount = 0;
uint256 favoredLedger = (mState == lcsPRE_CLOSE) ? uint256() : mPrevLedgerHash; // Don't get stuck one ledger back
boost::unordered_map<uint256, currentValidationCount> vals =
theApp->getValidations().getCurrentValidations(favoredLedger);
typedef std::map<uint256, currentValidationCount>::value_type u256_cvc_pair;
BOOST_FOREACH(u256_cvc_pair& it, vals)
if (it.second.first > netLgrCount)
{
netLgr = it.first;
netLgrCount = it.second.first;
}
if (netLgr != mPrevLedgerHash)
{ // LCL change
const char *status;
switch (mState)
{
case lcsPRE_CLOSE: status = "PreClose"; break;
case lcsESTABLISH: status = "Establish"; break;
case lcsFINISHED: status = "Finised"; break;
case lcsACCEPTED: status = "Accepted"; break;
default: status = "unknown";
}
cLog(lsWARNING) << "View of consensus changed during consensus (" << netLgrCount << ") status="
<< status << ", " << (mHaveCorrectLCL ? "CorrectLCL" : "IncorrectLCL");
cLog(lsWARNING) << mPrevLedgerHash << " to " << netLgr;
if (sLog(lsDEBUG))
{
BOOST_FOREACH(u256_cvc_pair& it, vals)
cLog(lsDEBUG) << "V: " << it.first << ", " << it.second.first;
}
if (mHaveCorrectLCL)
theApp->getOPs().consensusViewChange();
handleLCL(netLgr);
}
else if (mPreviousLedger->getHash() != mPrevLedgerHash)
handleLCL(netLgr);
}
void LedgerConsensus::handleLCL(const uint256& lclHash)
{
if (mPrevLedgerHash != lclHash)
{ // first time switching to this ledger
mPrevLedgerHash = lclHash;
if (mHaveCorrectLCL && mProposing && mOurPosition)
{
cLog(lsINFO) << "Bowing out of consensus";
mOurPosition->bowOut();
propose();
}
mProposing = false;
// mValidating = false;
mPeerPositions.clear();
mDisputes.clear();
mCloseTimes.clear();
mDeadNodes.clear();
playbackProposals();
}
if (mPreviousLedger->getHash() != mPrevLedgerHash)
{ // we need to switch the ledger we're working from
Ledger::pointer newLCL = theApp->getLedgerMaster().getLedgerByHash(lclHash);
if (newLCL)
mPreviousLedger = newLCL;
else if (!mAcquiringLedger || (mAcquiringLedger->getHash() != mPrevLedgerHash))
{ // need to start acquiring the correct consensus LCL
cLog(lsWARNING) << "Need consensus ledger " << mPrevLedgerHash;
mAcquiringLedger = theApp->getMasterLedgerAcquire().findCreate(mPrevLedgerHash);
mHaveCorrectLCL = false;
return;
}
}
cLog(lsINFO) << "Have the consensus ledger " << mPrevLedgerHash;
mHaveCorrectLCL = true;
#if 0 // FIXME: can trigger early
if (mAcquiringLedger && mAcquiringLedger->isComplete())
theApp->getOPs().clearNeedNetworkLedger();
#endif
mCloseResolution = ContinuousLedgerTiming::getNextLedgerTimeResolution(
mPreviousLedger->getCloseResolution(), mPreviousLedger->getCloseAgree(),
mPreviousLedger->getLedgerSeq() + 1);
}
void LedgerConsensus::takeInitialPosition(Ledger& initialLedger)
{
SHAMap::pointer initialSet = initialLedger.peekTransactionMap()->snapShot(false);
uint256 txSet = initialSet->getHash();
cLog(lsINFO) << "initial position " << txSet;
mapComplete(txSet, initialSet, false);
if (mValidating)
mOurPosition = boost::make_shared<LedgerProposal>
(mValPublic, mValPrivate, initialLedger.getParentHash(), txSet, mCloseTime);
else
mOurPosition = boost::make_shared<LedgerProposal>(initialLedger.getParentHash(), txSet, mCloseTime);
BOOST_FOREACH(u256_lct_pair& it, mDisputes)
{
it.second->setOurVote(initialLedger.hasTransaction(it.first));
}
// if any peers have taken a contrary position, process disputes
boost::unordered_set<uint256> found;
BOOST_FOREACH(u160_prop_pair& it, mPeerPositions)
{
uint256 set = it.second->getCurrentHash();
if (found.insert(set).second)
{
boost::unordered_map<uint256, SHAMap::pointer>::iterator iit = mAcquired.find(set);
if (iit != mAcquired.end())
createDisputes(initialSet, iit->second);
}
}
if (mProposing)
propose();
}
void LedgerConsensus::createDisputes(SHAMap::ref m1, SHAMap::ref m2)
{
SHAMap::SHAMapDiff differences;
m1->compare(m2, differences, 16384);
typedef std::map<uint256, SHAMap::SHAMapDiffItem>::value_type u256_diff_pair;
BOOST_FOREACH (u256_diff_pair& pos, differences)
{ // create disputed transactions (from the ledger that has them)
if (pos.second.first)
{ // transaction is in first map
assert(!pos.second.second);
addDisputedTransaction(pos.first, pos.second.first->peekData());
}
else if (pos.second.second)
{ // transaction is in second map
assert(!pos.second.first);
addDisputedTransaction(pos.first, pos.second.second->peekData());
}
else // No other disagreement over a transaction should be possible
assert(false);
}
}
void LedgerConsensus::mapComplete(const uint256& hash, SHAMap::ref map, bool acquired)
{
tLog(acquired, lsINFO) << "We have acquired TXS " << hash;
if (!map)
{ // this is an invalid/corrupt map
mAcquired[hash] = map;
mAcquiring.erase(hash);
cLog(lsWARNING) << "A trusted node directed us to acquire an invalid TXN map";
return;
}
assert(hash == map->getHash());
if (mAcquired.find(hash) != mAcquired.end())
{
mAcquiring.erase(hash);
return; // we already have this map
}
if (mOurPosition && (!mOurPosition->isBowOut()) && (hash != mOurPosition->getCurrentHash()))
{ // this could create disputed transactions
boost::unordered_map<uint256, SHAMap::pointer>::iterator it2 = mAcquired.find(mOurPosition->getCurrentHash());
if (it2 != mAcquired.end())
{
assert((it2->first == mOurPosition->getCurrentHash()) && it2->second);
createDisputes(it2->second, map);
}
else
assert(false); // We don't have our own position?!
}
mAcquired[hash] = map;
mAcquiring.erase(hash);
// Adjust tracking for each peer that takes this position
std::vector<uint160> peers;
BOOST_FOREACH(u160_prop_pair& it, mPeerPositions)
{
if (it.second->getCurrentHash() == map->getHash())
peers.push_back(it.second->getPeerID());
}
if (!peers.empty())
adjustCount(map, peers);
else tLog(acquired, lsWARNING) << "By the time we got the map " << hash << " no peers were proposing it";
sendHaveTxSet(hash, true);
}
void LedgerConsensus::sendHaveTxSet(const uint256& hash, bool direct)
{
ripple::TMHaveTransactionSet msg;
msg.set_hash(hash.begin(), 256 / 8);
msg.set_status(direct ? ripple::tsHAVE : ripple::tsCAN_GET);
PackedMessage::pointer packet = boost::make_shared<PackedMessage>(msg, ripple::mtHAVE_SET);
theApp->getConnectionPool().relayMessage(NULL, packet);
}
void LedgerConsensus::adjustCount(SHAMap::ref map, const std::vector<uint160>& peers)
{ // Adjust the counts on all disputed transactions based on the set of peers taking this position
BOOST_FOREACH(u256_lct_pair& it, mDisputes)
{
bool setHas = map->hasItem(it.second->getTransactionID());
BOOST_FOREACH(const uint160& pit, peers)
it.second->setVote(pit, setHas);
}
}
void LedgerConsensus::statusChange(ripple::NodeEvent event, Ledger& ledger)
{ // Send a node status change message to our peers
ripple::TMStatusChange s;
if (!mHaveCorrectLCL)
s.set_newevent(ripple::neLOST_SYNC);
else
s.set_newevent(event);
s.set_ledgerseq(ledger.getLedgerSeq());
s.set_networktime(theApp->getOPs().getNetworkTimeNC());
uint256 hash = ledger.getParentHash();
s.set_ledgerhashprevious(hash.begin(), hash.size());
hash = ledger.getHash();
s.set_ledgerhash(hash.begin(), hash.size());
PackedMessage::pointer packet = boost::make_shared<PackedMessage>(s, ripple::mtSTATUS_CHANGE);
theApp->getConnectionPool().relayMessage(NULL, packet);
cLog(lsTRACE) << "send status change to peer";
}
int LedgerConsensus::startup()
{
return 1;
}
void LedgerConsensus::statePreClose()
{ // it is shortly before ledger close time
bool anyTransactions = theApp->getLedgerMaster().getCurrentLedger()->peekTransactionMap()->getHash().isNonZero();
int proposersClosed = mPeerPositions.size();
// This ledger is open. This computes how long since the last ledger closed
int sinceClose;
int idleInterval = 0;
if (mHaveCorrectLCL && mPreviousLedger->getCloseAgree())
{ // we can use consensus timing
sinceClose = 1000 * (theApp->getOPs().getCloseTimeNC() - mPreviousLedger->getCloseTimeNC());
idleInterval = 2 * mPreviousLedger->getCloseResolution();
if (idleInterval < LEDGER_IDLE_INTERVAL)
idleInterval = LEDGER_IDLE_INTERVAL;
}
else
{
sinceClose = 1000 * (theApp->getOPs().getCloseTimeNC() - theApp->getOPs().getLastCloseTime());
idleInterval = LEDGER_IDLE_INTERVAL;
}
if (ContinuousLedgerTiming::shouldClose(anyTransactions, mPreviousProposers, proposersClosed,
mPreviousMSeconds, sinceClose, idleInterval))
{
closeLedger();
}
}
void LedgerConsensus::closeLedger()
{
checkOurValidation();
mState = lcsESTABLISH;
mConsensusStartTime = boost::posix_time::microsec_clock::universal_time();
mCloseTime = theApp->getOPs().getCloseTimeNC();
theApp->getOPs().setLastCloseTime(mCloseTime);
statusChange(ripple::neCLOSING_LEDGER, *mPreviousLedger);
takeInitialPosition(*theApp->getLedgerMaster().closeLedger(true));
}
void LedgerConsensus::stateEstablish()
{ // we are establishing consensus
if (mCurrentMSeconds < LEDGER_MIN_CONSENSUS)
return;
updateOurPositions();
if (!mHaveCloseTimeConsensus)
{
tLog(haveConsensus(false), lsINFO) << "We have TX consensus but not CT consensus";
}
else if (haveConsensus(true))
{
cLog(lsINFO) << "Converge cutoff (" << mPeerPositions.size() << " participants)";
mState = lcsFINISHED;
beginAccept(false);
}
}
void LedgerConsensus::stateFinished()
{ // we are processing the finished ledger
// logic of calculating next ledger advances us out of this state
// nothing to do
}
void LedgerConsensus::stateAccepted()
{ // we have accepted a new ledger
endConsensus();
}
extern volatile bool doShutdown;
void LedgerConsensus::timerEntry()
{
if (doShutdown)
{
cLog(lsFATAL) << "Shutdown requested";
theApp->stop();
}
if ((mState != lcsFINISHED) && (mState != lcsACCEPTED))
checkLCL();
mCurrentMSeconds =
(boost::posix_time::microsec_clock::universal_time() - mConsensusStartTime).total_milliseconds();
mClosePercent = mCurrentMSeconds * 100 / mPreviousMSeconds;
switch (mState)
{
case lcsPRE_CLOSE: statePreClose(); return;
case lcsESTABLISH: stateEstablish(); if (mState != lcsFINISHED) return; fallthru();
case lcsFINISHED: stateFinished(); if (mState != lcsACCEPTED) return; fallthru();
case lcsACCEPTED: stateAccepted(); return;
}
assert(false);
}
void LedgerConsensus::updateOurPositions()
{
boost::posix_time::ptime peerCutoff = boost::posix_time::second_clock::universal_time();
boost::posix_time::ptime ourCutoff = peerCutoff - boost::posix_time::seconds(PROPOSE_INTERVAL);
peerCutoff -= boost::posix_time::seconds(PROPOSE_FRESHNESS);
bool changes = false;
SHAMap::pointer ourPosition;
// std::vector<uint256> addedTx, removedTx;
// Verify freshness of peer positions and compute close times
std::map<uint32, int> closeTimes;
boost::unordered_map<uint160, LedgerProposal::pointer>::iterator
it = mPeerPositions.begin(), end = mPeerPositions.end();
while (it != end)
{
if (it->second->isStale(peerCutoff))
{ // proposal is stale
uint160 peerID = it->second->getPeerID();
cLog(lsWARNING) << "Removing stale proposal from " << peerID;
BOOST_FOREACH(u256_lct_pair& it, mDisputes)
it.second->unVote(peerID);
mPeerPositions.erase(it++);
}
else
{ // proposal is still fresh
++closeTimes[roundCloseTime(it->second->getCloseTime())];
++it;
}
}
BOOST_FOREACH(u256_lct_pair& it, mDisputes)
{
if (it.second->updateVote(mClosePercent, mProposing))
{
if (!changes)
{
ourPosition = mAcquired[mOurPosition->getCurrentHash()]->snapShot(true);
assert(ourPosition);
changes = true;
}
if (it.second->getOurVote()) // now a yes
{
ourPosition->addItem(SHAMapItem(it.first, it.second->peekTransaction()), true, false);
// addedTx.push_back(it.first);
}
else // now a no
{
ourPosition->delItem(it.first);
// removedTx.push_back(it.first);
}
}
}
int neededWeight;
if (mClosePercent < AV_MID_CONSENSUS_TIME)
neededWeight = AV_INIT_CONSENSUS_PCT;
else if (mClosePercent < AV_LATE_CONSENSUS_TIME)
neededWeight = AV_MID_CONSENSUS_PCT;
else neededWeight = AV_LATE_CONSENSUS_PCT;
uint32 closeTime = 0;
mHaveCloseTimeConsensus = false;
int thresh = mPeerPositions.size();
if (thresh == 0)
{ // no other times
mHaveCloseTimeConsensus = true;
closeTime = roundCloseTime(mOurPosition->getCloseTime());
}
else
{
if (mProposing)
{
++closeTimes[roundCloseTime(mOurPosition->getCloseTime())];
++thresh;
}
thresh = ((thresh * neededWeight) + (neededWeight / 2)) / 100;
if (thresh == 0)
thresh = 1;
for (std::map<uint32, int>::iterator it = closeTimes.begin(), end = closeTimes.end(); it != end; ++it)
{
cLog(lsTRACE) << "CCTime: " << it->first << " has " << it->second << ", " << thresh << " required";
if (it->second >= thresh)
{
cLog(lsDEBUG) << "Close time consensus reached: " << it->first;
mHaveCloseTimeConsensus = true;
closeTime = it->first;
thresh = it->second;
}
}
tLog(!mHaveCloseTimeConsensus, lsDEBUG) << "No CT consensus: Proposers:" << mPeerPositions.size()
<< " Proposing:" << (mProposing ? "yes" : "no") << " Thresh:" << thresh << " Pos:" << closeTime;
}
if ((!changes) &&
((closeTime != (roundCloseTime(mOurPosition->getCloseTime()))) ||
(mOurPosition->isStale(ourCutoff))))
{ // close time changed or our position is stale
ourPosition = mAcquired[mOurPosition->getCurrentHash()]->snapShot(true);
assert(ourPosition);
changes = true;
}
if (changes)
{
uint256 newHash = ourPosition->getHash();
cLog(lsINFO) << "Position change: CTime " << closeTime << ", tx " << newHash;
if (mOurPosition->changePosition(newHash, closeTime))
{
if (mProposing)
propose();
mapComplete(newHash, ourPosition, false);
}
}
}
bool LedgerConsensus::haveConsensus(bool forReal)
{ // CHECKME: should possibly count unacquired TX sets as disagreeing
int agree = 0, disagree = 0;
uint256 ourPosition = mOurPosition->getCurrentHash();
BOOST_FOREACH(u160_prop_pair& it, mPeerPositions)
{
if (!it.second->isBowOut())
{
if (it.second->getCurrentHash() == ourPosition)
++agree;
else
++disagree;
}
}
int currentValidations = theApp->getValidations().getNodesAfter(mPrevLedgerHash);
cLog(lsDEBUG) << "Checking for TX consensus: agree=" << agree << ", disagree=" << disagree;
return ContinuousLedgerTiming::haveConsensus(mPreviousProposers, agree + disagree, agree, currentValidations,
mPreviousMSeconds, mCurrentMSeconds, forReal, mConsensusFail);
}
SHAMap::pointer LedgerConsensus::getTransactionTree(const uint256& hash, bool doAcquire)
{
boost::unordered_map<uint256, SHAMap::pointer>::iterator it = mAcquired.find(hash);
if (it == mAcquired.end())
{ // we have not completed acquiring this ledger
if (mState == lcsPRE_CLOSE)
{
SHAMap::pointer currentMap = theApp->getLedgerMaster().getCurrentLedger()->peekTransactionMap();
if (currentMap->getHash() == hash)
{
currentMap = currentMap->snapShot(false);
mapComplete(hash, currentMap, false);
return currentMap;
}
}
if (doAcquire)
{
TransactionAcquire::pointer& acquiring = mAcquiring[hash];
if (!acquiring)
{
if (!hash)
{
SHAMap::pointer empty = boost::make_shared<SHAMap>(smtTRANSACTION);
mapComplete(hash, empty, false);
return empty;
}
acquiring = boost::make_shared<TransactionAcquire>(hash);
startAcquiring(acquiring);
}
}
return SHAMap::pointer();
}
return it->second;
}
void LedgerConsensus::startAcquiring(const TransactionAcquire::pointer& acquire)
{
boost::unordered_map< uint256, std::vector< boost::weak_ptr<Peer> > >::iterator it =
mPeerData.find(acquire->getHash());
if (it != mPeerData.end())
{ // Add any peers we already know have his transaction set
std::vector< boost::weak_ptr<Peer> >& peerList = it->second;
std::vector< boost::weak_ptr<Peer> >::iterator pit = peerList.begin();
while (pit != peerList.end())
{
Peer::pointer pr = pit->lock();
if (!pr)
pit = peerList.erase(pit);
else
{
acquire->peerHas(pr);
++pit;
}
}
}
std::vector<Peer::pointer> peerList = theApp->getConnectionPool().getPeerVector();
BOOST_FOREACH(Peer::ref peer, peerList)
{
if (peer->hasTxSet(acquire->getHash()))
acquire->peerHas(peer);
}
acquire->setTimer();
}
void LedgerConsensus::propose()
{
cLog(lsTRACE) << "We propose: " <<
(mOurPosition->isBowOut() ? std::string("bowOut") : mOurPosition->getCurrentHash().GetHex());
ripple::TMProposeSet prop;
prop.set_currenttxhash(mOurPosition->getCurrentHash().begin(), 256 / 8);
prop.set_previousledger(mOurPosition->getPrevLedger().begin(), 256 / 8);
prop.set_proposeseq(mOurPosition->getProposeSeq());
prop.set_closetime(mOurPosition->getCloseTime());
std::vector<unsigned char> pubKey = mOurPosition->getPubKey();
std::vector<unsigned char> sig = mOurPosition->sign();
prop.set_nodepubkey(&pubKey[0], pubKey.size());
prop.set_signature(&sig[0], sig.size());
theApp->getConnectionPool().relayMessage(NULL,
boost::make_shared<PackedMessage>(prop, ripple::mtPROPOSE_LEDGER));
}
void LedgerConsensus::addDisputedTransaction(const uint256& txID, const std::vector<unsigned char>& tx)
{
cLog(lsDEBUG) << "Transaction " << txID << " is disputed";
boost::unordered_map<uint256, LCTransaction::pointer>::iterator it = mDisputes.find(txID);
if (it != mDisputes.end())
return;
bool ourVote = false;
if (mOurPosition)
{
boost::unordered_map<uint256, SHAMap::pointer>::iterator mit = mAcquired.find(mOurPosition->getCurrentHash());
if (mit != mAcquired.end())
ourVote = mit->second->hasItem(txID);
else
assert(false); // We don't have our own position?
}
LCTransaction::pointer txn = boost::make_shared<LCTransaction>(txID, tx, ourVote);
mDisputes[txID] = txn;
BOOST_FOREACH(u160_prop_pair& pit, mPeerPositions)
{
boost::unordered_map<uint256, SHAMap::pointer>::const_iterator cit =
mAcquired.find(pit.second->getCurrentHash());
if ((cit != mAcquired.end()) && cit->second)
txn->setVote(pit.first, cit->second->hasItem(txID));
}
if (!ourVote && theApp->isNewFlag(txID, SF_RELAYED))
{
ripple::TMTransaction msg;
msg.set_rawtransaction(&(tx.front()), tx.size());
msg.set_status(ripple::tsNEW);
msg.set_receivetimestamp(theApp->getOPs().getNetworkTimeNC());
PackedMessage::pointer packet = boost::make_shared<PackedMessage>(msg, ripple::mtTRANSACTION);
theApp->getConnectionPool().relayMessage(NULL, packet);
}
}
bool LedgerConsensus::peerPosition(const LedgerProposal::pointer& newPosition)
{
uint160 peerID = newPosition->getPeerID();
if (mDeadNodes.find(peerID) != mDeadNodes.end())
{
cLog(lsINFO) << "Position from dead node";
return false;
}
LedgerProposal::pointer& currentPosition = mPeerPositions[peerID];
if (currentPosition)
{
assert(peerID == currentPosition->getPeerID());
if (newPosition->getProposeSeq() <= currentPosition->getProposeSeq())
return false;
}
if (newPosition->getProposeSeq() == 0)
{ // new initial close time estimate
cLog(lsTRACE) << "Peer reports close time as " << newPosition->getCloseTime();
++mCloseTimes[newPosition->getCloseTime()];
}
else if (newPosition->getProposeSeq() == LedgerProposal::seqLeave)
{
BOOST_FOREACH(u256_lct_pair& it, mDisputes)
it.second->unVote(peerID);
mPeerPositions.erase(peerID);
mDeadNodes.insert(peerID);
return true;
}
cLog(lsTRACE) << "Processing peer proposal " << newPosition->getProposeSeq() << "/" << newPosition->getCurrentHash();
currentPosition = newPosition;
SHAMap::pointer set = getTransactionTree(newPosition->getCurrentHash(), true);
if (set)
{
BOOST_FOREACH(u256_lct_pair& it, mDisputes)
it.second->setVote(peerID, set->hasItem(it.first));
}
else
cLog(lsDEBUG) << "Don't have that tx set";
return true;
}
bool LedgerConsensus::peerHasSet(Peer::ref peer, const uint256& hashSet, ripple::TxSetStatus status)
{
if (status != ripple::tsHAVE) // Indirect requests are for future support
return true;
std::vector< boost::weak_ptr<Peer> >& set = mPeerData[hashSet];
BOOST_FOREACH(boost::weak_ptr<Peer>& iit, set)
if (iit.lock() == peer)
return false;
set.push_back(peer);
boost::unordered_map<uint256, TransactionAcquire::pointer>::iterator acq = mAcquiring.find(hashSet);
if (acq != mAcquiring.end())
acq->second->peerHas(peer);
return true;
}
SMAddNode LedgerConsensus::peerGaveNodes(Peer::ref peer, const uint256& setHash,
const std::list<SHAMapNode>& nodeIDs, const std::list< std::vector<unsigned char> >& nodeData)
{
boost::unordered_map<uint256, TransactionAcquire::pointer>::iterator acq = mAcquiring.find(setHash);
if (acq == mAcquiring.end())
{
cLog(lsINFO) << "Got TX data for set not acquiring: " << setHash;
return SMAddNode();
}
TransactionAcquire::pointer set = acq->second; // We must keep the set around during the function
return set->takeNodes(nodeIDs, nodeData, peer);
}
void LedgerConsensus::beginAccept(bool synchronous)
{
SHAMap::pointer consensusSet = mAcquired[mOurPosition->getCurrentHash()];
if (!consensusSet)
{
cLog(lsFATAL) << "We don't have a consensus set";
abort();
return;
}
theApp->getOPs().newLCL(mPeerPositions.size(), mCurrentMSeconds, mNewLedgerHash);
if (synchronous)
accept(consensusSet, LoadEvent::pointer());
else
{
theApp->getIOService().post(boost::bind(&LedgerConsensus::accept, shared_from_this(), consensusSet,
theApp->getJobQueue().getLoadEvent(jtACCEPTLEDGER)));
}
}
void LedgerConsensus::playbackProposals()
{
boost::unordered_map<uint160,
std::list<LedgerProposal::pointer> >& storedProposals = theApp->getOPs().peekStoredProposals();
for (boost::unordered_map< uint160, std::list<LedgerProposal::pointer> >::iterator
it = storedProposals.begin(), end = storedProposals.end(); it != end; ++it)
{
bool relay = false;
BOOST_FOREACH(const LedgerProposal::pointer& proposal, it->second)
{
if (proposal->hasSignature())
{ // we have the signature but don't know the ledger so couldn't verify
proposal->setPrevLedger(mPrevLedgerHash);
if (proposal->checkSign())
{
cLog(lsINFO) << "Applying stored proposal";
relay = peerPosition(proposal);
}
}
else if (proposal->isPrevLedger(mPrevLedgerHash))
relay = peerPosition(proposal);
if (relay)
{
cLog(lsWARNING) << "We should do delayed relay of this proposal, but we cannot";
}
#if 0 // FIXME: We can't do delayed relay because we don't have the signature
std::set<uint64> peers
if (relay && theApp->getSuppression().swapSet(proposal.getSuppress(), set, SF_RELAYED))
{
cLog(lsDEBUG) << "Stored proposal delayed relay";
ripple::TMProposeSet set;
set.set_proposeseq
set.set_currenttxhash(, 256 / 8);
previousledger
closetime
nodepubkey
signature
PackedMessage::pointer message = boost::make_shared<PackedMessage>(set, ripple::mtPROPOSE_LEDGER);
theApp->getConnectionPool().relayMessageBut(peers, message);
}
#endif
}
}
}
#define LCAT_SUCCESS 0
#define LCAT_FAIL 1
#define LCAT_RETRY 2
int LedgerConsensus::applyTransaction(TransactionEngine& engine, SerializedTransaction::ref txn, Ledger::ref ledger,
bool openLedger, bool retryAssured)
{ // Returns false if the transaction has need not be retried.
TransactionEngineParams parms = openLedger ? tapOPEN_LEDGER : tapNONE;
if (retryAssured)
parms = static_cast<TransactionEngineParams>(parms | tapRETRY);
cLog(lsDEBUG) << "TXN " << txn->getTransactionID()
<< (openLedger ? " open" : " closed")
<< (retryAssured ? "/retry" : "/final");
cLog(lsTRACE) << txn->getJson(0);
#ifndef TRUST_NETWORK
try
{
#endif
bool didApply;
TER result = engine.applyTransaction(*txn, parms, didApply);
if (didApply)
{
cLog(lsDEBUG) << "Transaction success: " << transHuman(result);
return LCAT_SUCCESS;
}
if (isTefFailure(result) || isTemMalformed(result))
{ // failure
cLog(lsDEBUG) << "Transaction failure: " << transHuman(result);
return LCAT_FAIL;
}
cLog(lsDEBUG) << "Transaction retry: " << transHuman(result);
assert(!ledger->hasTransaction(txn->getTransactionID()));
return LCAT_RETRY;
#ifndef TRUST_NETWORK
}
catch (...)
{
cLog(lsWARNING) << "Throws";
return false;
}
#endif
}
void LedgerConsensus::applyTransactions(SHAMap::ref set, Ledger::ref applyLedger,
Ledger::ref checkLedger, CanonicalTXSet& failedTransactions, bool openLgr)
{
TransactionEngine engine(applyLedger);
for (SHAMapItem::pointer item = set->peekFirstItem(); !!item; item = set->peekNextItem(item->getTag()))
if (!checkLedger->hasTransaction(item->getTag()))
{
cLog(lsINFO) << "Processing candidate transaction: " << item->getTag();
#ifndef TRUST_NETWORK
try
{
#endif
SerializerIterator sit(item->peekSerializer());
SerializedTransaction::pointer txn = boost::make_shared<SerializedTransaction>(boost::ref(sit));
if (applyTransaction(engine, txn, applyLedger, openLgr, true) != LCAT_FAIL)
failedTransactions.push_back(txn);
#ifndef TRUST_NETWORK
}
catch (...)
{
cLog(lsWARNING) << " Throws";
}
#endif
}
int changes;
bool certainRetry = true;
for (int pass = 0; pass < 8; ++pass)
{
cLog(lsDEBUG) << "Pass: " << pass << " Txns: " << failedTransactions.size()
<< (certainRetry ? " retriable" : " final");
changes = 0;
CanonicalTXSet::iterator it = failedTransactions.begin();
while (it != failedTransactions.end())
{
try
{
switch (applyTransaction(engine, it->second, applyLedger, openLgr, certainRetry))
{
case LCAT_SUCCESS:
it = failedTransactions.erase(it);
++changes;
break;
case LCAT_FAIL:
it = failedTransactions.erase(it);
break;
case LCAT_RETRY:
++it;
}
}
catch (...)
{
cLog(lsWARNING) << "Transaction throws";
it = failedTransactions.erase(it);
}
}
cLog(lsDEBUG) << "Pass: " << pass << " finished " << changes << " changes";
// A non-retry pass made no changes
if (!changes && !certainRetry)
return;
// Stop retriable passes
if ((!changes) || (pass >= 4))
certainRetry = false;
}
}
uint32 LedgerConsensus::roundCloseTime(uint32 closeTime)
{
return closeTime - (closeTime % mCloseResolution);
}
void LedgerConsensus::accept(SHAMap::ref set, LoadEvent::pointer)
{
if (set->getHash().isNonZero()) // put our set where others can get it later
theApp->getOPs().takePosition(mPreviousLedger->getLedgerSeq(), set);
boost::recursive_mutex::scoped_lock masterLock(theApp->getMasterLock());
assert(set->getHash() == mOurPosition->getCurrentHash());
uint32 closeTime = roundCloseTime(mOurPosition->getCloseTime());
bool closeTimeCorrect = true;
if (closeTime == 0)
{ // we agreed to disagree
closeTimeCorrect = false;
closeTime = mPreviousLedger->getCloseTimeNC() + 1;
}
cLog(lsDEBUG) << "Report: Prop=" << (mProposing ? "yes" : "no") << " val=" << (mValidating ? "yes" : "no") <<
" corLCL=" << (mHaveCorrectLCL ? "yes" : "no") << " fail="<< (mConsensusFail ? "yes" : "no");
cLog(lsDEBUG) << "Report: Prev = " << mPrevLedgerHash << ":" << mPreviousLedger->getLedgerSeq();
cLog(lsDEBUG) << "Report: TxSt = " << set->getHash() << ", close " << closeTime << (closeTimeCorrect ? "" : "X");
CanonicalTXSet failedTransactions(set->getHash());
Ledger::pointer newLCL = boost::make_shared<Ledger>(false, boost::ref(*mPreviousLedger));
newLCL->peekTransactionMap()->armDirty();
newLCL->peekAccountStateMap()->armDirty();
cLog(lsDEBUG) << "Applying consensus set transactions to the last closed ledger";
applyTransactions(set, newLCL, newLCL, failedTransactions, false);
newLCL->updateSkipList();
newLCL->setClosed();
boost::shared_ptr<SHAMap::SHADirtyMap> acctNodes = newLCL->peekAccountStateMap()->disarmDirty();
boost::shared_ptr<SHAMap::SHADirtyMap> txnNodes = newLCL->peekTransactionMap()->disarmDirty();
// write out dirty nodes (temporarily done here) Most come before setAccepted
int fc;
while ((fc = SHAMap::flushDirty(*acctNodes, 256, hotACCOUNT_NODE, newLCL->getLedgerSeq())) > 0)
{ cLog(lsTRACE) << "Flushed " << fc << " dirty state nodes"; }
while ((fc = SHAMap::flushDirty(*txnNodes, 256, hotTRANSACTION_NODE, newLCL->getLedgerSeq())) > 0)
{ cLog(lsTRACE) << "Flushed " << fc << " dirty transaction nodes"; }
cLog(lsDEBUG) << "Report: NewL = " << newLCL->getHash() << ":" << newLCL->getLedgerSeq();
newLCL->setAccepted(closeTime, mCloseResolution, closeTimeCorrect);
newLCL->updateHash();
uint256 newLCLHash = newLCL->getHash();
if (sLog(lsTRACE))
{
cLog(lsTRACE) << "newLCL";
Json::Value p;
newLCL->addJson(p, LEDGER_JSON_DUMP_TXRP | LEDGER_JSON_DUMP_STATE);
cLog(lsTRACE) << p;
}
statusChange(ripple::neACCEPTED_LEDGER, *newLCL);
if (mValidating && !mConsensusFail)
{
uint256 signingHash;
SerializedValidation::pointer v = boost::make_shared<SerializedValidation>
(newLCLHash, theApp->getOPs().getValidationTimeNC(), mValPublic, mProposing);
v->setFieldU32(sfLedgerSequence, newLCL->getLedgerSeq());
v->sign(signingHash, mValPrivate);
v->setTrusted();
theApp->isNew(signingHash); // suppress it if we receive it
theApp->getValidations().addValidation(v);
theApp->getOPs().setLastValidation(v);
std::vector<unsigned char> validation = v->getSigned();
ripple::TMValidation val;
val.set_validation(&validation[0], validation.size());
int j = theApp->getConnectionPool().relayMessage(NULL,
boost::make_shared<PackedMessage>(val, ripple::mtVALIDATION));
cLog(lsINFO) << "CNF Val " << newLCLHash << " to " << j << " peers";
}
else
cLog(lsINFO) << "CNF newLCL " << newLCLHash;
Ledger::pointer newOL = boost::make_shared<Ledger>(true, boost::ref(*newLCL));
ScopedLock sl( theApp->getLedgerMaster().getLock());
// Apply disputed transactions that didn't get in
TransactionEngine engine(newOL);
BOOST_FOREACH(u256_lct_pair& it, mDisputes)
{
if (!it.second->getOurVote())
{ // we voted NO
try
{
cLog(lsDEBUG) << "Test applying disputed transaction that did not get in";
SerializerIterator sit(it.second->peekTransaction());
SerializedTransaction::pointer txn = boost::make_shared<SerializedTransaction>(boost::ref(sit));
if (applyTransaction(engine, txn, newOL, true, false))
failedTransactions.push_back(txn);
}
catch (...)
{
cLog(lsDEBUG) << "Failed to apply transaction we voted NO on";
}
}
}
cLog(lsDEBUG) << "Applying transactions from current open ledger";
applyTransactions(theApp->getLedgerMaster().getCurrentLedger()->peekTransactionMap(), newOL, newLCL,
failedTransactions, true);
theApp->getLedgerMaster().pushLedger(newLCL, newOL, !mConsensusFail);
mNewLedgerHash = newLCL->getHash();
mState = lcsACCEPTED;
sl.unlock();
if (mValidating)
{ // see how close our close time is to other node's close time reports
cLog(lsINFO) << "We closed at " << boost::lexical_cast<std::string>(mCloseTime);
uint64 closeTotal = mCloseTime;
int closeCount = 1;
for (std::map<uint32, int>::iterator it = mCloseTimes.begin(), end = mCloseTimes.end(); it != end; ++it)
{ // FIXME: Use median, not average
cLog(lsINFO) << boost::lexical_cast<std::string>(it->second) << " time votes for "
<< boost::lexical_cast<std::string>(it->first);
closeCount += it->second;
closeTotal += static_cast<uint64>(it->first) * static_cast<uint64>(it->second);
}
closeTotal += (closeCount / 2);
closeTotal /= closeCount;
int offset = static_cast<int>(closeTotal) - static_cast<int>(mCloseTime);
cLog(lsINFO) << "Our close offset is estimated at " << offset << " (" << closeCount << ")";
theApp->getOPs().closeTimeOffset(offset);
}
}
void LedgerConsensus::endConsensus()
{
theApp->getOPs().endConsensus(mHaveCorrectLCL);
}
void LedgerConsensus::simulate()
{
cLog(lsINFO) << "Simulating consensus";
closeLedger();
mCurrentMSeconds = 100;
beginAccept(true);
endConsensus();
cLog(lsINFO) << "Simulation complete";
}
Json::Value LedgerConsensus::getJson()
{
Json::Value ret(Json::objectValue);
ret["proposing"] = mProposing;
ret["validating"] = mValidating;
ret["proposers"] = static_cast<int>(mPeerPositions.size());
if (mHaveCorrectLCL)
{
ret["synched"] = true;
ret["ledger_seq"] = mPreviousLedger->getLedgerSeq() + 1;
ret["close_granularity"] = mCloseResolution;
}
else
ret["synched"] = false;
switch (mState)
{
case lcsPRE_CLOSE: ret["state"] = "open"; break;
case lcsESTABLISH: ret["state"] = "consensus"; break;
case lcsFINISHED: ret["state"] = "finished"; break;
case lcsACCEPTED: ret["state"] = "accepted"; break;
}
int v = mDisputes.size();
if (v != 0)
ret["disputes"] = v;
if (mOurPosition)
ret["our_position"] = mOurPosition->getJson();
return ret;
}
// vim:ts=4