Commit some ledger close/consensus work.

This commit is contained in:
JoelKatz
2012-05-21 06:05:32 -07:00
parent 4e1f435655
commit 9a4862a5d9
6 changed files with 101 additions and 47 deletions

View File

@@ -5,6 +5,12 @@ void LedgerConsensus::abort()
{
}
void LedgerConsensus::startup()
int LedgerConsensus::startup()
{
return 5;
}
int LedgerConsensus::timerEntry()
{
return 5;
}

View File

@@ -17,14 +17,21 @@ class LCPosition
protected:
uint256 mPubKeyHash;
CKey::pointer mPubKey;
uint256 mCurrentPosition;
uint256 mPreviousPosition, mCurrentPosition;
uint32 mSequence;
public:
typedef boost::shared_ptr<LCPosition> pointer;
LCPosition(CKey::pointer pubKey, const uint256& currentPosition, uint32 seq) :
mPubKey(pubKey), mCurrentPosition(currentPosition), mSequence(seq) { ; }
// for remote positions
LCPosition(uint32 closingSeq, uint32 proposeSeq, const uint256& previousTxHash,
const uint256& currentTxHash, CKey::pointer nodePubKey, const std::string& signature);
// for our initial position
LCPosition(CKey::pointer privKey, uint32 ledgerSeq, const uint256& currentPosition);
// for our subsequent positions
LCPosition(LCPosition::pointer previousPosition, CKey::pointer privKey, const uint256& newPosition);
const uint256& getPubKeyHash() const { return mPubKeyHash; }
const uint256& getCurrentPosition() const { return mCurrentPosition; }
@@ -68,6 +75,8 @@ protected:
Ledger::pointer mPreviousLedger, mCurrentLedger;
LedgerProposal::pointer mCurrentProposal;
LCPosition::pointer mOurPosition;
// Convergence tracking, trusted peers indexed by hash of public key
boost::unordered_map<uint256, LCPosition::pointer> mPeerPositions;
@@ -78,13 +87,13 @@ protected:
// Peer sets
boost::unordered_map<uint256, std::vector< boost::weak_ptr<Peer> > > mPeerData;
void startup();
void weHave(const uint256& id, Peer::pointer avoidPeer);
public:
LedgerConsensus(Ledger::pointer previousLedger, Ledger::pointer currentLedger) :
mPreviousLedger(previousLedger), mCurrentLedger(currentLedger)
{ startup(); }
mPreviousLedger(previousLedger), mCurrentLedger(currentLedger) { ; }
int startup();
Ledger::pointer peekPreviousLedger() { return mPreviousLedger; }
Ledger::pointer peekCurrentLedger() { return mCurrentLedger; }
@@ -103,6 +112,7 @@ public:
bool peerHasSet(Peer::pointer peer, const std::vector<uint256>& sets);
bool peerGaveNodes(Peer::pointer peer, const uint256& setHash,
const std::list<SHAMapNode>& nodeIDs, const std::list< std::vector<unsigned char> >& nodeData);
int timerEntry(void);
};

View File

@@ -310,9 +310,18 @@ void NetworkOPs::checkState(const boost::system::error_code& result)
// check if the ledger is bad enough to go to omTRACKING
}
if (mConsensus)
{
setStateTimer(mConsensus->timerEntry());
return;
}
Ledger::pointer currentLedger = theApp->getMasterLedger().getCurrentLedger();
if ( (getNetworkTimeNC() >= currentLedger->getCloseTimeNC()) && !mConsensus)
beginConsensus(currentLedger);
if (getNetworkTimeNC() >= currentLedger->getCloseTimeNC())
{
setStateTimer(beginConsensus(currentLedger));
return;
}
setStateTimer(10);
}
@@ -321,7 +330,8 @@ void NetworkOPs::switchLastClosedLedger(Ledger::pointer newLedger)
{ // set the newledger as our last closed ledger -- this is abnormal code
#ifdef DEBUG
std::cerr << "Switching last closed ledger to " << newLedger->getHash().GetHex() << std::endl;
assert(false);
std::cerr << "ABNORMAL Switching last closed ledger to " << newLedger->getHash().GetHex() << std::endl;
#endif
if (mConsensus)
@@ -341,18 +351,16 @@ void NetworkOPs::switchLastClosedLedger(Ledger::pointer newLedger)
}
// vim:ts=4
void NetworkOPs::beginConsensus(Ledger::pointer closingLedger)
int NetworkOPs::beginConsensus(Ledger::pointer closingLedger)
{
if (mMode != omFULL)
{ // We just close this ledger and start a new one
switchLastClosedLedger(closingLedger);
return;
}
#ifdef DEBUG
std::cerr << "Ledger close time for ledger " << closingLedger->getLedgerSeq() << std::endl;
#endif
Ledger::pointer prevLedger = theApp->getMasterLedger().getLedgerByHash(closingLedger->getParentHash());
if (!prevLedger)
{ // this shouldn't happen if we jump ledgers
mMode = omTRACKING;
return;
return 3;
}
// Create a new ledger to be the open ledger
@@ -374,4 +382,22 @@ void NetworkOPs::beginConsensus(Ledger::pointer closingLedger)
PackedMessage::pointer packet =
boost::make_shared<PackedMessage>(PackedMessage::MessagePointer(s), newcoin::mtSTATUS_CHANGE);
theApp->getConnectionPool().relayMessage(NULL, packet);
return mConsensus->startup();
}
bool NetworkOPs::proposeLedger(uint32 closingSeq, uint32 proposeSeq,
const uint256& prevHash, const uint256& proposeHash, const std::string& pubKey, const std::string& signature)
{
uint256 nodeID = Serializer::getSHA512Half(pubKey);
// Is this node on our UNL?
// WRITEME
// Are we currently closing?
// Yes: Is it an update?
// WRITEME
return true;
}

View File

@@ -70,10 +70,14 @@ public:
bool getAccountStateNodes(uint32 ledgerSeq, const uint256& myNodeId,
const std::vector<unsigned char>& myNode, std::list<std::vector<unsigned char> >& newNodes);
// ledger proposal/close functions
bool proposeLedger(uint32 closingSeq, uint32 proposeSeq, const uint256& prevHash, const uint256& proposeHash,
const std::string& pubKey, const std::string& signature);
// network state machine
void checkState(const boost::system::error_code& result);
void switchLastClosedLedger(Ledger::pointer newLedger); // Used for the "jump" case
void beginConsensus(Ledger::pointer closingLedger);
int beginConsensus(Ledger::pointer closingLedger);
void setStateTimer(int seconds);
};

View File

@@ -310,7 +310,7 @@ void Peer::handle_read_body(const boost::system::error_code& error)
void Peer::processReadBuffer()
{
int type=PackedMessage::getType(mReadbuf);
int type = PackedMessage::getType(mReadbuf);
#ifdef DEBUG
std::cerr << "PRB(" << type << "), len=" << (mReadbuf.size()-HEADER_SIZE) << std::endl;
#endif
@@ -417,6 +417,15 @@ void Peer::processReadBuffer()
}
break;
case newcoin::mtPROPOSE_LEDGER:
{
boost::shared_ptr<newcoin::TMProposeSet> msg = boost::make_shared<newcoin::TMProposeSet>();
if(msg->ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
recvPropose(msg);
else std::cerr << "parse error: " << type << std::endl;
}
break;
case newcoin::mtGET_LEDGER:
@@ -438,15 +447,6 @@ void Peer::processReadBuffer()
break;
#if 0
case newcoin::mtPROPOSE_LEDGER:
{
newcoin::TM msg;
if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
recv(msg);
else std::cerr << "parse error: " << type << std::endl;
}
break;
case newcoin::mtCLOSE_LEDGER:
{
newcoin::TM msg;
@@ -593,7 +593,7 @@ void Peer::recvTransaction(newcoin::TMTransaction& packet)
tx = theApp->getOPs().processTransaction(tx, this);
if(tx->getStatus()!=INCLUDED)
if(tx->getStatus() != INCLUDED)
{ // transaction wasn't accepted into ledger
#ifdef DEBUG
std::cerr << "Transaction from peer won't go in ledger" << std::endl;
@@ -601,6 +601,25 @@ void Peer::recvTransaction(newcoin::TMTransaction& packet)
}
}
void Peer::recvPropose(boost::shared_ptr<newcoin::TMProposeSet> packet)
{
if ((packet->previoustxhash().size() != 32) || (packet->currenttxhash().size() != 32) ||
(packet->nodepubkey().size() < 28) || (packet->signature().size() < 56))
return;
uint32 closingSeq = packet->closingseq(), proposeSeq = packet->proposeseq();
uint256 previousTxHash, currentTxHash;
memcpy(previousTxHash.begin(), packet->previoustxhash().data(), 32);
memcpy(currentTxHash.begin(), packet->currenttxhash().data(), 32);
if(theApp->getOPs().proposeLedger(closingSeq, proposeSeq, previousTxHash, currentTxHash,
packet->nodepubkey(), packet->signature()))
{ // FIXME: Not all nodes will want proposals
PackedMessage::pointer message = boost::make_shared<PackedMessage>(packet, newcoin::mtPROPOSE_LEDGER);
theApp->getConnectionPool().relayMessage(this, message);
}
}
void Peer::recvValidation(newcoin::TMValidation& packet)
{
}
@@ -654,6 +673,10 @@ void Peer::recvStatus(newcoin::TMStatusChange& packet)
#ifdef DEBUG
std::cerr << "Received status change from peer" << std::endl;
#endif
if (!packet.has_networktime())
packet.set_networktime(theApp->getOPs().getNetworkTimeNC());
mLastStatus = packet;
if (packet.has_ledgerhash() && (packet.ledgerhash().size() == (256 / 8)))
{ // a peer has changed ledgers
if (packet.has_previousledgerhash() && (packet.previousledgerhash().size() == (256 / 8)))
@@ -661,10 +684,7 @@ void Peer::recvStatus(newcoin::TMStatusChange& packet)
else
mPreviousLedgerHash = mClosedLedgerHash;
memcpy(mClosedLedgerHash.begin(), packet.ledgerhash().data(), 256 / 8);
if (packet.has_networktime())
mClosedLedgerTime = ptFromSeconds(packet.networktime());
else
mClosedLedgerTime = theApp->getOPs().getNetworkTimePT();
mClosedLedgerTime = ptFromSeconds(packet.networktime());
#ifdef DEBUG
std::cerr << "peer LCL is " << mClosedLedgerHash.GetHex() << std::endl;
#endif
@@ -869,19 +889,6 @@ PackedMessage::pointer Peer::createFullLedger(Ledger::pointer ledger)
return(PackedMessage::pointer());
}*/
PackedMessage::pointer Peer::createLedgerProposal(Ledger::pointer ledger)
{
uint256& hash=ledger->getHash();
newcoin::ProposeLedger* prop=new newcoin::ProposeLedger();
prop->set_ledgerindex(ledger->getIndex());
prop->set_hash(hash.begin(), hash.GetSerializeSize());
prop->set_numtransactions(ledger->getNumTransactions());
PackedMessage::pointer packet=boost::make_shared<PackedMessage>
(PackedMessage::MessagePointer(prop), newcoin::PROPOSE_LEDGER);
return(packet);
}
PackedMessage::pointer Peer::createValidation(Ledger::pointer ledger)
{
uint256 hash=ledger->getHash();

View File

@@ -53,6 +53,7 @@ protected:
std::vector<uint8_t> mReadbuf;
std::list<PackedMessage::pointer> mSendQ;
PackedMessage::pointer mSendingPacket;
newcoin::TMStatusChange mLastStatus;
Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx);
@@ -85,7 +86,7 @@ protected:
void recvGetLedger(newcoin::TMGetLedger& packet);
void recvLedger(newcoin::TMLedgerData& packet);
void recvStatus(newcoin::TMStatusChange& packet);
void recvPropose(newcoin::TMProposeSet& packet);
void recvPropose(boost::shared_ptr<newcoin::TMProposeSet> packet);
void recvHaveTxSet(newcoin::TMHaveTransactionSet& packet);
void getSessionCookie(std::string& strDst);