Consensus refactor preliminary changes (RIPD-1011):

* Remove extraneous passing of transaction set hashes
* Remove recentPositions_. InboundTXs does the job now
* Move responsibility for sending "have TX set" out of consensus
This commit is contained in:
JoelKatz
2016-08-29 10:50:08 -07:00
committed by Edward Hennis
parent ed02b0717e
commit 97806b42c4
10 changed files with 74 additions and 101 deletions

View File

@@ -78,8 +78,8 @@ make_InboundTransactions (
Stoppable& parent, Stoppable& parent,
beast::insight::Collector::ptr const& collector, beast::insight::Collector::ptr const& collector,
std::function std::function
<void (uint256 const&, <void (std::shared_ptr <SHAMap> const&,
std::shared_ptr <SHAMap> const&)> gotSet); bool)> gotSet);
} // ripple } // ripple

View File

@@ -47,7 +47,7 @@ public:
virtual uint256 getLCL () = 0; virtual uint256 getLCL () = 0;
virtual void gotMap (uint256 const& hash, virtual void gotMap (
std::shared_ptr<SHAMap> const& map) = 0; std::shared_ptr<SHAMap> const& map) = 0;
virtual void timerEntry () = 0; virtual void timerEntry () = 0;

View File

@@ -154,29 +154,6 @@ ConsensusImp::storeProposal (
props.push_back (proposal); props.push_back (proposal);
} }
// Must be called while holding the master lock
void
ConsensusImp::takePosition (int seq, std::shared_ptr<SHAMap> const& position)
{
std::lock_guard <std::mutex> _(lock_);
recentPositions_[position->getHash ().as_uint256()] = std::make_pair (seq, position);
if (recentPositions_.size () > 4)
{
for (auto i = recentPositions_.begin (); i != recentPositions_.end ();)
{
if (i->second.first < (seq - 2))
{
recentPositions_.erase (i);
return;
}
++i;
}
}
}
std::vector <std::shared_ptr <LedgerProposal>> std::vector <std::shared_ptr <LedgerProposal>>
ConsensusImp::getStoredProposals (uint256 const& prevLedger) ConsensusImp::getStoredProposals (uint256 const& prevLedger)
{ {

View File

@@ -94,8 +94,6 @@ public:
NetClock::time_point NetClock::time_point
getLastCloseTime () const; getLastCloseTime () const;
void takePosition (int seq, std::shared_ptr<SHAMap> const& position);
std::vector <std::shared_ptr <LedgerProposal>> std::vector <std::shared_ptr <LedgerProposal>>
getStoredProposals (uint256 const& previousLedger); getStoredProposals (uint256 const& previousLedger);
@@ -122,12 +120,9 @@ private:
// The last close time // The last close time
NetClock::time_point lastCloseTime_; NetClock::time_point lastCloseTime_;
// Recent positions taken
std::map<uint256, std::pair<int, std::shared_ptr<SHAMap>>> recentPositions_;
Consensus::Proposals storedProposals_; Consensus::Proposals storedProposals_;
// lock to protect recentPositions_ and storedProposals_ // lock to protect storedProposals_
std::mutex lock_; std::mutex lock_;
}; };

View File

@@ -70,8 +70,8 @@ public:
clock_type& clock, clock_type& clock,
Stoppable& parent, Stoppable& parent,
beast::insight::Collector::ptr const& collector, beast::insight::Collector::ptr const& collector,
std::function <void (uint256 const&, std::function <void (std::shared_ptr <SHAMap> const&,
std::shared_ptr <SHAMap> const&)> gotSet) bool)> gotSet)
: Stoppable ("InboundTransactions", parent) : Stoppable ("InboundTransactions", parent)
, app_ (app) , app_ (app)
, m_clock (clock) , m_clock (clock)
@@ -202,8 +202,8 @@ public:
} }
if (isNew && fromAcquire) if (isNew)
m_gotSet (hash, set); m_gotSet (set, fromAcquire);
} }
Json::Value getInfo() override Json::Value getInfo() override
@@ -285,7 +285,7 @@ private:
// The empty transaction set whose hash is zero // The empty transaction set whose hash is zero
InboundTransactionSet& m_zeroSet; InboundTransactionSet& m_zeroSet;
std::function <void (uint256 const&, std::shared_ptr <SHAMap> const&)> m_gotSet; std::function <void (std::shared_ptr <SHAMap> const&, bool)> m_gotSet;
}; };
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@@ -298,8 +298,8 @@ make_InboundTransactions (
InboundLedgers::clock_type& clock, InboundLedgers::clock_type& clock,
Stoppable& parent, Stoppable& parent,
beast::insight::Collector::ptr const& collector, beast::insight::Collector::ptr const& collector,
std::function <void (uint256 const&, std::function <void (std::shared_ptr <SHAMap> const&,
std::shared_ptr <SHAMap> const&)> gotSet) bool)> gotSet)
{ {
return std::make_unique <InboundTransactionsImp> return std::make_unique <InboundTransactionsImp>
(app, clock, parent, collector, std::move (gotSet)); (app, clock, parent, collector, std::move (gotSet));

View File

@@ -214,10 +214,11 @@ uint256 LedgerConsensusImp::getLCL ()
// and update our tracking if any validators currently // and update our tracking if any validators currently
// propose it // propose it
void LedgerConsensusImp::mapCompleteInternal ( void LedgerConsensusImp::mapCompleteInternal (
uint256 const& hash,
std::shared_ptr<SHAMap> const& map, std::shared_ptr<SHAMap> const& map,
bool acquired) bool acquired)
{ {
auto hash = map->getHash ().as_uint256();
{ {
auto it = acquired_.find (hash); auto it = acquired_.find (hash);
if (it != acquired_.end ()) if (it != acquired_.end ())
@@ -266,9 +267,6 @@ void LedgerConsensusImp::mapCompleteInternal (
inboundTransactions_.giveSet (hash, map, false); inboundTransactions_.giveSet (hash, map, false);
} }
// Inform directly-connected peers that we have this transaction set
sendHaveTxSet (hash, true);
if (ourPosition_ && (! ourPosition_->isBowOut ()) if (ourPosition_ && (! ourPosition_->isBowOut ())
&& (hash != ourPosition_->getCurrentHash ())) && (hash != ourPosition_->getCurrentHash ()))
{ {
@@ -324,17 +322,17 @@ void LedgerConsensusImp::mapCompleteInternal (
} }
void LedgerConsensusImp::gotMap ( void LedgerConsensusImp::gotMap (
uint256 const& hash,
std::shared_ptr<SHAMap> const& map) std::shared_ptr<SHAMap> const& map)
{ {
std::lock_guard<std::recursive_mutex> _(lock_); std::lock_guard<std::recursive_mutex> _(lock_);
try try
{ {
mapCompleteInternal (hash, map, true); mapCompleteInternal (map, true);
} }
catch (SHAMapMissingNode const& mn) catch (SHAMapMissingNode const& mn)
{ {
// This should never happen
leaveConsensus(); leaveConsensus();
JLOG (j_.error()) << JLOG (j_.error()) <<
"Missing node processing complete map " << mn; "Missing node processing complete map " << mn;
@@ -542,6 +540,7 @@ void LedgerConsensusImp::timerEntry ()
} }
catch (SHAMapMissingNode const& mn) catch (SHAMapMissingNode const& mn)
{ {
// This should never happen
leaveConsensus (); leaveConsensus ();
JLOG (j_.error()) << JLOG (j_.error()) <<
"Missing node during consensus process " << mn; "Missing node during consensus process " << mn;
@@ -782,10 +781,6 @@ void LedgerConsensusImp::simulate (
void LedgerConsensusImp::accept (std::shared_ptr<SHAMap> set) void LedgerConsensusImp::accept (std::shared_ptr<SHAMap> set)
{ {
// put our set where others can get it later
if (set->getHash ().isNonZero ())
consensus_.takePosition (previousLedger_->info().seq, set);
auto closeTime = ourPosition_->getCloseTime(); auto closeTime = ourPosition_->getCloseTime();
bool closeTimeCorrect; bool closeTimeCorrect;
@@ -1218,16 +1213,6 @@ void LedgerConsensusImp::propose ()
app_.overlay().send(prop); app_.overlay().send(prop);
} }
void LedgerConsensusImp::sendHaveTxSet (uint256 const& hash, bool direct)
{
protocol::TMHaveTransactionSet msg;
msg.set_hash (hash.begin (), 256 / 8);
msg.set_status (direct ? protocol::tsHAVE : protocol::tsCAN_GET);
app_.overlay ().foreach (send_always (
std::make_shared <Message> (
msg, protocol::mtHAVE_SET)));
}
void LedgerConsensusImp::statusChange ( void LedgerConsensusImp::statusChange (
protocol::NodeEvent event, ReadView const& ledger) protocol::NodeEvent event, ReadView const& ledger)
{ {
@@ -1266,10 +1251,17 @@ void LedgerConsensusImp::statusChange (
JLOG (j_.trace()) << "send status change to peer"; JLOG (j_.trace()) << "send status change to peer";
} }
void LedgerConsensusImp::takeInitialPosition ( // For the consensus refactor, takeInitialPosition has been split
std::shared_ptr<ReadView const> const& initialLedger) // into two pieces. This piece, makeInitialPosition does the
// non-consensus parts
std::shared_ptr<SHAMap> LedgerConsensusImp::makeInitialPosition ()
{ {
std::shared_ptr<SHAMap> initialSet = std::make_shared <SHAMap> ( // Tell the ledger master not to acquire the ledger we're probably building
ledgerMaster_.setBuildingLedger (previousLedger_->info().seq + 1);
auto initialLedger = app_.openLedger().current();
auto initialSet = std::make_shared <SHAMap> (
SHAMapType::TRANSACTION, app_.family(), SHAMap::version{1}); SHAMapType::TRANSACTION, app_.family(), SHAMap::version{1});
initialSet->setUnbacked (); initialSet->setUnbacked ();
@@ -1311,21 +1303,23 @@ void LedgerConsensusImp::takeInitialPosition (
} }
// Set should be immutable snapshot // Set should be immutable snapshot
initialSet = initialSet->snapShot (false); return initialSet->snapShot (false);
}
// Tell the ledger master not to acquire the ledger we're probably building void LedgerConsensusImp::takeInitialPosition()
ledgerMaster_.setBuildingLedger (previousLedger_->info().seq + 1); {
auto initialSet = makeInitialPosition();
auto txSet = initialSet->getHash ().as_uint256(); mapCompleteInternal (initialSet, false);
JLOG (j_.info()) << "initial position " << txSet;
mapCompleteInternal (txSet, initialSet, false);
ourPosition_ = std::make_shared<LedgerProposal> ( ourPosition_ = std::make_shared<LedgerProposal> (
initialLedger->info().parentHash, txSet, closeTime_); previousLedger_->info().hash,
initialSet->getHash().as_uint256(),
closeTime_);
for (auto& it : disputes_) for (auto& it : disputes_)
{ {
it.second->setOurVote (initialLedger->txExists (it.first)); it.second->setOurVote (initialSet->hasItem (it.first));
} }
// if any peers have taken a contrary position, process disputes // if any peers have taken a contrary position, process disputes
@@ -1550,7 +1544,7 @@ void LedgerConsensusImp::updateOurPositions ()
if (proposing_) if (proposing_)
propose (); propose ();
mapCompleteInternal (newHash, ourPosition, false); mapCompleteInternal (ourPosition, false);
} }
} }
} }
@@ -1598,7 +1592,7 @@ void LedgerConsensusImp::closeLedger ()
consensus_.setLastCloseTime(closeTime_); consensus_.setLastCloseTime(closeTime_);
statusChange (protocol::neCLOSING_LEDGER, *previousLedger_); statusChange (protocol::neCLOSING_LEDGER, *previousLedger_);
ledgerMaster_.applyHeldTransactions (); ledgerMaster_.applyHeldTransactions ();
takeInitialPosition (app_.openLedger().current()); takeInitialPosition ();
} }
void LedgerConsensusImp::checkOurValidation () void LedgerConsensusImp::checkOurValidation ()

View File

@@ -125,12 +125,10 @@ public:
/** /**
We have a complete transaction set, typically acquired from the network We have a complete transaction set, typically acquired from the network
@param hash hash of the transaction set.
@param map the transaction set. @param map the transaction set.
@param acquired true if we have acquired the transaction set. @param acquired true if we have acquired the transaction set.
*/ */
void gotMap ( void gotMap (
uint256 const& hash,
std::shared_ptr<SHAMap> const& map) override; std::shared_ptr<SHAMap> const& map) override;
/** /**
@@ -186,12 +184,10 @@ private:
/** /**
We have a complete transaction set, typically acquired from the network We have a complete transaction set, typically acquired from the network
@param hash hash of the transaction set.
@param map the transaction set. @param map the transaction set.
@param acquired true if we have acquired the transaction set. @param acquired true if we have acquired the transaction set.
*/ */
void mapCompleteInternal ( void mapCompleteInternal (
uint256 const& hash,
std::shared_ptr<SHAMap> const& map, std::shared_ptr<SHAMap> const& map,
bool acquired); bool acquired);
@@ -240,15 +236,6 @@ private:
*/ */
void propose (); void propose ();
/** Let peers know that we a particular transactions set so they
can fetch it from us.
@param hash The ID of the transaction.
@param direct true if we have this transaction set locally, else a
directly connected peer has it.
*/
void sendHaveTxSet (uint256 const& hash, bool direct);
/** Send a node status change message to our directly connected peers /** Send a node status change message to our directly connected peers
@param event The event which caused the status change. This is @param event The event which caused the status change. This is
@@ -257,12 +244,14 @@ private:
*/ */
void statusChange (protocol::NodeEvent event, ReadView const& ledger); void statusChange (protocol::NodeEvent event, ReadView const& ledger);
/** Take an initial position on what we think the consensus should be /** Determine our initial proposed transaction set based on
based on the transactions that made it into our open ledger our open ledger
@param initialLedger The ledger that contains our initial position.
*/ */
void takeInitialPosition (std::shared_ptr<ReadView const> const& initialLedger); std::shared_ptr<SHAMap> makeInitialPosition();
/** Take an initial position on what we think the consensus set should be
*/
void takeInitialPosition ();
/** /**
Called while trying to avalanche towards consensus. Called while trying to avalanche towards consensus.

View File

@@ -456,10 +456,11 @@ public:
( *this, stopwatch() ( *this, stopwatch()
, *m_jobQueue , *m_jobQueue
, m_collectorManager->collector () , m_collectorManager->collector ()
, [this](uint256 const& setHash, , [this](std::shared_ptr <SHAMap> const& set,
std::shared_ptr <SHAMap> const& set) bool fromAcquire)
{ {
gotTXSet (setHash, set); if (set)
gotTXSet (set, fromAcquire);
})) }))
, m_acceptedLedgerCache ("AcceptedLedger", 4, 60, stopwatch(), , m_acceptedLedgerCache ("AcceptedLedger", 4, 60, stopwatch(),
@@ -614,9 +615,10 @@ public:
return m_acceptedLedgerCache; return m_acceptedLedgerCache;
} }
void gotTXSet (uint256 const& setHash, std::shared_ptr<SHAMap> const& set) void gotTXSet (std::shared_ptr<SHAMap> const& set, bool fromAcquire)
{ {
m_networkOPs->mapComplete (setHash, set); if (set)
m_networkOPs->mapComplete (set, fromAcquire);
} }
TransactionMaster& getMasterTransaction () override TransactionMaster& getMasterTransaction () override

View File

@@ -297,7 +297,9 @@ public:
const std::shared_ptr<Peer>& peer, uint256 const& set, const std::shared_ptr<Peer>& peer, uint256 const& set,
protocol::TxSetStatus status); protocol::TxSetStatus status);
void mapComplete (uint256 const& hash, std::shared_ptr<SHAMap> const& map) override; void mapComplete (
std::shared_ptr<SHAMap> const& map,
bool fromAcquire) override;
// Network state machine. // Network state machine.
@@ -1512,10 +1514,24 @@ void NetworkOPsImp::processTrustedProposal (
} }
void void
NetworkOPsImp::mapComplete (uint256 const& hash, NetworkOPsImp::mapComplete (
std::shared_ptr<SHAMap> const& map) std::shared_ptr<SHAMap> const& map, bool fromAcquire)
{ {
mLedgerConsensus->gotMap (hash, map); // We now have an additional transaction set
// either created locally during the consensus process
// or acquired from a peer
// Inform peers we have this set
protocol::TMHaveTransactionSet msg;
msg.set_hash (map->getHash().as_uint256().begin(), 256 / 8);
msg.set_status (protocol::tsHAVE);
app_.overlay().foreach (send_always (
std::make_shared<Message> (
msg, protocol::mtHAVE_SET)));
// We acquired it because consensus asked us to
if (fromAcquire)
mLedgerConsensus->gotMap (map);
} }
void NetworkOPsImp::endConsensus (bool correctLCL) void NetworkOPsImp::endConsensus (bool correctLCL)

View File

@@ -157,8 +157,8 @@ public:
virtual bool recvValidation (STValidation::ref val, virtual bool recvValidation (STValidation::ref val,
std::string const& source) = 0; std::string const& source) = 0;
virtual void mapComplete (uint256 const& hash, virtual void mapComplete (std::shared_ptr<SHAMap> const& map,
std::shared_ptr<SHAMap> const& map) = 0; bool fromAcquire) = 0;
// network state machine // network state machine
virtual bool beginConsensus (uint256 const& netLCL) = 0; virtual bool beginConsensus (uint256 const& netLCL) = 0;