Minimize differences between my working code and the main repo to avoid conficts:

Consensus startup.
Logic to create consensus structure, change state transitions.
Fix bugs in switchLastClosedLedger
Protocol changes.
This commit is contained in:
JoelKatz
2012-05-19 05:43:41 -07:00
parent de3d88f01b
commit e47fb22087
5 changed files with 75 additions and 59 deletions

View File

@@ -3,11 +3,14 @@
#include <list>
#include <boost/weak_ptr.hpp>
#include <boost/unordered/unordered_map.hpp>
#include "key.h"
#include "Transaction.h"
#include "LedgerAcquire.h"
#include "LedgerProposal.h"
#include "Peer.h"
class LCPosition
{ // A position taken by one of our trusted peers
@@ -63,6 +66,7 @@ class LedgerConsensus
{
protected:
Ledger::pointer mPreviousLedger, mCurrentLedger;
LedgerProposal::pointer mCurrentProposal;
// Convergence tracking, trusted peers indexed by hash of public key
boost::unordered_map<uint256, LCPosition::pointer> mPeerPositions;
@@ -74,9 +78,13 @@ protected:
// Peer sets
boost::unordered_map<uint256, std::vector< boost::weak_ptr<Peer> > > mPeerData;
void startup();
void weHave(const uint256& id, Peer::pointer avoidPeer);
public:
LedgerConsensus(Ledger::pointer previousLedger, Ledger::pointer currentLedger) :
mPreviousLedger(previousLedger), mCurrentLedger(currentLedger) { ; }
mPreviousLedger(previousLedger), mCurrentLedger(currentLedger)
{ startup(); }
Ledger::pointer peekPreviousLedger() { return mPreviousLedger; }
Ledger::pointer peekCurrentLedger() { return mCurrentLedger; }
@@ -90,6 +98,7 @@ public:
LCPosition::pointer getPeerPosition(const uint256& peer);
// high-level functions
void abort();
bool peerPosition(Peer::pointer peer, const Serializer& report);
bool peerHasSet(Peer::pointer peer, const std::vector<uint256>& sets);
bool peerGaveNodes(Peer::pointer peer, const uint256& setHash,

View File

@@ -7,6 +7,7 @@
#include "utils.h"
#include "Application.h"
#include "Transaction.h"
#include "LedgerConsensus.h"
// This is the primary interface into the "client" portion of the program.
@@ -155,7 +156,7 @@ void NetworkOPs::setStateTimer(int sec)
uint64 closedTime = theApp->getMasterLedger().getCurrentLedger()->getCloseTimeNC();
uint64 now = getNetworkTimeNC();
if (mMode == omFULL)
if ((mMode == omFULL) && !mConsensus)
{
if (now >= closedTime) sec = 0;
else if (sec > (closedTime - now)) sec = (closedTime - now);
@@ -289,7 +290,7 @@ void NetworkOPs::checkState(const boost::system::error_code& result)
}
consensus = acq->getLedger();
}
switchLastClosedLedger(consensus, false);
switchLastClosedLedger(consensus);
}
if (mMode == omCONNECTED)
@@ -300,6 +301,7 @@ void NetworkOPs::checkState(const boost::system::error_code& result)
if (mMode == omTRACKING)
{
// check if the ledger is good enough to go to omFULL
// Note: Do not go to omFULL if we don't have the previous ledger
// check if the ledger is bad enough to go to omCONNECTED
}
@@ -309,29 +311,25 @@ void NetworkOPs::checkState(const boost::system::error_code& result)
}
Ledger::pointer currentLedger = theApp->getMasterLedger().getCurrentLedger();
if (getNetworkTimeNC() >= currentLedger->getCloseTimeNC())
{
currentLedger->setClosed();
switchLastClosedLedger(currentLedger, true);
}
if ( (getNetworkTimeNC() >= currentLedger->getCloseTimeNC()) && !mConsensus)
beginConsensus(currentLedger);
setStateTimer(10);
}
void NetworkOPs::switchLastClosedLedger(Ledger::pointer newLedger, bool normal)
{ // set the newledger as our last closed ledger
// FIXME: Correct logic is:
// 1) Mark this ledger closed, schedule it to be saved
// 2) If normal, reprocess transactions
// 3) Open a new subsequent ledger
// 4) Walk back the previous ledger chain from our current ledger and the new last closed ledger
// find a common previous ledger, if possible. Try to insert any transactions in our ledger
// chain into the new open ledger. Broadcast any that make it in.
void NetworkOPs::switchLastClosedLedger(Ledger::pointer newLedger)
{ // set the newledger as our last closed ledger -- this is abnormal code
#ifdef DEBUG
std::cerr << "Switching last closed ledger to " << newLedger->getHash().GetHex() << std::endl;
#endif
if (mConsensus)
{
mConsensus->abort();
mConsensus = boost::shared_ptr<LedgerConsensus>();
}
newLedger->setClosed();
Ledger::pointer openLedger = boost::make_shared<Ledger>(newLedger);
theApp->getMasterLedger().switchLedgers(newLedger, openLedger);
@@ -340,24 +338,40 @@ void NetworkOPs::switchLastClosedLedger(Ledger::pointer newLedger, bool normal)
{ // this ledger has already closed
}
boost::shared_ptr<newcoin::TMStatusChange> s = boost::make_shared<newcoin::TMStatusChange>();
}
// vim:ts=4
s->set_newevent(normal ? newcoin::neACCEPTED_LEDGER : newcoin::neSWITCHED_LEDGER);
s->set_ledgerseq(newLedger->getLedgerSeq());
s->set_networktime(getNetworkTimeNC());
void NetworkOPs::beginConsensus(Ledger::pointer closingLedger)
{
if (mMode != omFULL)
{ // We just close this ledger and start a new one
switchLastClosedLedger(closingLedger);
return;
}
Ledger::pointer prevLedger = theApp->getMasterLedger().getLedgerByHash(closingLedger->getParentHash());
if (!prevLedger)
{ // this shouldn't happen if we jump ledgers
mMode = omTRACKING;
return;
}
uint256 lhash = newLedger->getHash();
s->set_ledgerhash(lhash.begin(), lhash.size());
lhash = newLedger->getParentHash();
s->set_previousledgerhash(lhash.begin(), lhash.size());
// Create a new ledger to be the open ledger
theApp->getMasterLedger().pushLedger(boost::make_shared<Ledger>(closingLedger));
// Create a consensus object to get consensus on this ledger
if (!!mConsensus) mConsensus->abort();
mConsensus = boost::make_shared<LedgerConsensus>(prevLedger, closingLedger);
#ifdef DEBUG
std::cerr << "Broadcasting ledger change" << std::endl;
std::cerr << "Broadcasting ledger close" << std::endl;
#endif
boost::shared_ptr<newcoin::TMStatusChange> s = boost::make_shared<newcoin::TMStatusChange>();
s->set_newevent(newcoin::neCLOSING_LEDGER);
s->set_ledgerseq(closingLedger->getLedgerSeq());
s->set_networktime(getNetworkTimeNC());
uint256 plhash = closingLedger->getParentHash();
s->set_previousledgerhash(plhash.begin(), plhash.size());
PackedMessage::pointer packet =
boost::make_shared<PackedMessage>(PackedMessage::MessagePointer(s), newcoin::mtSTATUS_CHANGE);
theApp->getConnectionPool().relayMessage(NULL, packet);
}
// vim:ts=4

View File

@@ -11,6 +11,7 @@
// Master operational handler, server sequencer, network tracker
class Peer;
class LedgerConsensus;
class NetworkOPs
{
@@ -30,8 +31,9 @@ public:
};
protected:
OperatingMode mMode;
boost::asio::deadline_timer mNetTimer;
OperatingMode mMode;
boost::asio::deadline_timer mNetTimer;
boost::shared_ptr<LedgerConsensus> mConsensus;
public:
NetworkOPs(boost::asio::io_service& io_service);
@@ -70,7 +72,8 @@ public:
// network state machine
void checkState(const boost::system::error_code& result);
void switchLastClosedLedger(Ledger::pointer newLedger, bool normal);
void switchLastClosedLedger(Ledger::pointer newLedger); // Used for the "jump" case
void beginConsensus(Ledger::pointer closingLedger);
void setStateTimer(int seconds);
};

View File

@@ -85,6 +85,8 @@ protected:
void recvGetLedger(newcoin::TMGetLedger& packet);
void recvLedger(newcoin::TMLedgerData& packet);
void recvStatus(newcoin::TMStatusChange& packet);
void recvPropose(newcoin::TMProposeSet& packet);
void recvHaveTxSet(newcoin::TMHaveTransactionSet& packet);
void getSessionCookie(std::string& strDst);

View File

@@ -83,7 +83,7 @@ enum NodeStatus {
}
enum NodeEvent {
neCLOSED_LEDGER = 1; // closing a ledger because its close time has come
neCLOSING_LEDGER = 1; // closing a ledger because its close time has come
neACCEPTED_LEDGER = 2; // accepting a closed ledger, we have finished computing it
neSWITCHED_LEDGER = 3; // switching ledgers due to network consensus
neSHUTTING_DOWN = 4;
@@ -91,7 +91,7 @@ enum NodeEvent {
message TMStatusChange {
optional NodeStatus newStatus = 1;
optional NodeEvent newEvent = 2;
optional NodeEvent newEvent = 2;
optional uint32 ledgerSeq = 3;
optional bytes ledgerHash = 4;
optional bytes previousLedgerHash = 5;
@@ -99,43 +99,31 @@ message TMStatusChange {
}
message TMPeerPosition {
required uint32 ledgerSequence = 1;
required bytes pubKey = 2;
required uint32 sequence = 3;
required bytes transactionHash = 4;
required bytes signature = 5;
// Announce to the network our position on a closing ledger
message TMProposeSet {
required uint32 closingSeq = 1;
required uint32 proposeSeq = 2;
required bytes previousTxHash = 3; // 0 if first proposal, hash we no longer propose
required bytes currentTxHash = 4; // the hash of the ledger we are proposing
required bytes nodePubKey = 5;
required bytes signature = 6; // signature of above fields
repeated bytes addedTransactions = 7; // not required if number is large
repeated bytes removedTransactions = 8; // not required if number is large
}
// Announce to a peer that we have fully acquired a transaction set
message TMHaveTransactionSet {
repeated bytes hashes = 1;
}
message TMProposeLedger {
required uint32 closingSeq = 1;
required uint32 proposeSeq = 2;
required bytes previousLedgerHash = 3; // 0 if first proposal, hash we no longer propose
required bytes currentLedgerHash = 4; // the hash of the ledger we are proposing
required bytes hanko = 5;
repeated bytes addedTransactions = 6;
repeated bytes removedTransactions = 7;
required bytes signature = 8;
}
// Used to propose/validate during ledger close
// Used to sign a final closed ledger after reprocessing
message TMValidation {
required uint32 ledgerIndex = 1;
required bytes ledgerHash = 2;
optional uint64 timestamp = 3; // only in proposed ledgers
optional uint32 confidence = 4; // only in proposed ledgers
required bytes hanko = 5;
required bytes sig = 6;
required bytes validation = 1; // in SerializedValidation form
required bytes signature = 2;
}
message TMGetValidations {
required uint32 ledgerIndex = 1;
repeated bytes hanko = 2;