A few small bugfixes and some exra logging to track down a sync bug that Jed reported.

This commit is contained in:
JoelKatz
2012-10-03 22:23:32 -07:00
parent fca0f6fffc
commit 39cb1899d0
8 changed files with 62 additions and 28 deletions

View File

@@ -227,8 +227,9 @@ void ConnectionPool::policyHandler(const boost::system::error_code& ecResult)
// YYY: Should probably do this in the background.
// YYY: Might end up sending to disconnected peer?
void ConnectionPool::relayMessage(Peer* fromPeer, const PackedMessage::pointer& msg)
int ConnectionPool::relayMessage(Peer* fromPeer, const PackedMessage::pointer& msg)
{
int sentTo = 0;
boost::mutex::scoped_lock sl(mPeerLock);
BOOST_FOREACH(naPeer pair, mConnectedMap)
@@ -237,8 +238,13 @@ void ConnectionPool::relayMessage(Peer* fromPeer, const PackedMessage::pointer&
if (!peer)
std::cerr << "CP::RM null peer in list" << std::endl;
else if ((!fromPeer || !(peer.get() == fromPeer)) && peer->isConnected())
{
++sentTo;
peer->sendPacket(msg);
}
}
return sentTo;
}
// Schedule a connection via scanning.

View File

@@ -58,7 +58,7 @@ public:
void start();
// Send message to network.
void relayMessage(Peer* fromPeer, const PackedMessage::pointer& msg);
int relayMessage(Peer* fromPeer, const PackedMessage::pointer& msg);
// Manual connection request.
// Queue for immediate scanning.

View File

@@ -267,7 +267,8 @@ void PeerSet::sendRequest(const newcoin::TMGetLedger& tmGL, Peer::ref peer)
void PeerSet::sendRequest(const newcoin::TMGetLedger& tmGL)
{
boost::recursive_mutex::scoped_lock sl(mLock);
if (mPeers.empty()) return;
if (mPeers.empty())
return;
PackedMessage::pointer packet = boost::make_shared<PackedMessage>(tmGL, newcoin::mtGET_LEDGER);

View File

@@ -50,9 +50,13 @@ boost::weak_ptr<PeerSet> TransactionAcquire::pmDowncast()
void TransactionAcquire::trigger(Peer::ref peer, bool timer)
{
if (mComplete || mFailed)
{
Log(lsINFO) << "complete or failed";
return;
}
if (!mHaveRoot)
{
Log(lsINFO) << "have no root";
newcoin::TMGetLedger tmGL;
tmGL.set_ledgerhash(mHash.begin(), mHash.size());
tmGL.set_itype(newcoin::liTS_CANDIDATE);

View File

@@ -25,7 +25,7 @@
NetworkOPs::NetworkOPs(boost::asio::io_service& io_service, LedgerMaster* pLedgerMaster) :
mMode(omDISCONNECTED),mNetTimer(io_service), mLedgerMaster(pLedgerMaster), mCloseTimeOffset(0),
mLastCloseProposers(0), mLastCloseConvergeTime(LEDGER_IDLE_INTERVAL), mLastValidationTime(0)
mLastCloseProposers(0), mLastCloseConvergeTime(1000 * LEDGER_IDLE_INTERVAL), mLastValidationTime(0)
{
}
@@ -114,7 +114,18 @@ Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans,
}
TER r = mLedgerMaster->doTransaction(*trans->getSTransaction(), tapOPEN_LEDGER);
if (r == tefFAILURE) throw Fault(IO_ERROR);
#ifdef DEBUG
if (r != tesSUCCESS)
{
std::string token, human;
if (transResultInfo(r, token, human))
Log(lsINFO) << "TransactionResult: " << token << ": " << human;
}
#endif
if (r == tefFAILURE)
throw Fault(IO_ERROR);
if (r == terPRE_SEQ)
{ // transaction should be held
@@ -150,7 +161,8 @@ Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans,
tx.set_receivetimestamp(getNetworkTimeNC());
PackedMessage::pointer packet = boost::make_shared<PackedMessage>(tx, newcoin::mtTRANSACTION);
theApp->getConnectionPool().relayMessage(source, packet);
int sentTo = theApp->getConnectionPool().relayMessage(source, packet);
Log(lsINFO) << "Transaction relayed to " << sentTo << " node(s)";
return trans;
}
@@ -629,6 +641,24 @@ int NetworkOPs::beginConsensus(const uint256& networkClosed, Ledger::pointer clo
return mConsensus->startup();
}
bool NetworkOPs::haveConsensusObject()
{
if (mConsensus)
return true;
if (mMode != omFULL)
return false;
uint256 networkClosed;
std::vector<Peer::pointer> peerList = theApp->getConnectionPool().getPeerVector();
bool ledgerChange = checkLastClosedLedger(peerList, networkClosed);
if (!ledgerChange)
{
Log(lsWARNING) << "Beginning consensus due to peer action";
beginConsensus(networkClosed, theApp->getMasterLedger().getCurrentLedger());
}
return mConsensus;
}
// <-- bool: true to relay
bool NetworkOPs::recvPropose(uint32 proposeSeq, const uint256& proposeHash, const uint256& prevLedger,
uint32 closeTime, const std::string& pubKey, const std::string& signature, const NewcoinAddress& nodePublic)
@@ -651,18 +681,7 @@ bool NetworkOPs::recvPropose(uint32 proposeSeq, const uint256& proposeHash, cons
NewcoinAddress naPeerPublic = NewcoinAddress::createNodePublic(strCopy(pubKey));
if ((!mConsensus) && (mMode == omFULL))
{
uint256 networkClosed;
std::vector<Peer::pointer> peerList = theApp->getConnectionPool().getPeerVector();
bool ledgerChange = checkLastClosedLedger(peerList, networkClosed);
if (!ledgerChange)
{
Log(lsWARNING) << "Beginning consensus due to proposal from peer";
beginConsensus(networkClosed, theApp->getMasterLedger().getCurrentLedger());
}
}
if (!mConsensus)
if (!haveConsensusObject())
{
Log(lsINFO) << "Received proposal outside consensus window";
return mMode != omFULL;
@@ -704,7 +723,7 @@ bool NetworkOPs::recvPropose(uint32 proposeSeq, const uint256& proposeHash, cons
SHAMap::pointer NetworkOPs::getTXMap(const uint256& hash)
{
if (!mConsensus)
if (!haveConsensusObject())
return SHAMap::pointer();
return mConsensus->getTransactionTree(hash, false);
}
@@ -712,21 +731,24 @@ SHAMap::pointer NetworkOPs::getTXMap(const uint256& hash)
bool NetworkOPs::gotTXData(const boost::shared_ptr<Peer>& peer, const uint256& hash,
const std::list<SHAMapNode>& nodeIDs, const std::list< std::vector<unsigned char> >& nodeData)
{
if (!mConsensus)
if (!haveConsensusObject())
return false;
return mConsensus->peerGaveNodes(peer, hash, nodeIDs, nodeData);
}
bool NetworkOPs::hasTXSet(const boost::shared_ptr<Peer>& peer, const uint256& set, newcoin::TxSetStatus status)
{
if (!mConsensus)
if (!haveConsensusObject())
{
Log(lsINFO) << "Peer has TX set, not during consensus";
return false;
}
return mConsensus->peerHasSet(peer, set, status);
}
void NetworkOPs::mapComplete(const uint256& hash, SHAMap::ref map)
{
if (mConsensus)
if (!haveConsensusObject())
mConsensus->mapComplete(hash, map, true);
}
@@ -832,6 +854,11 @@ Json::Value NetworkOPs::getServerInfo()
if (!theConfig.VALIDATION_SEED.isValid()) info["serverState"] = "none";
else info["validationPKey"] = NewcoinAddress::createNodePublic(theConfig.VALIDATION_SEED).humanNodePublic();
Json::Value lastClose = Json::objectValue;
lastClose["proposers"] = theApp->getOPs().getPreviousProposers();
lastClose["convergeTime"] = theApp->getOPs().getPreviousConvergeTime();
info["lastClose"] = lastClose;
if (mConsensus)
info["consensus"] = mConsensus->getJson();

View File

@@ -84,6 +84,7 @@ protected:
Json::Value transJson(const SerializedTransaction& stTxn, TER terResult, const std::string& strStatus, int iSeq, const std::string& strType);
void pubTransactionAll(Ledger::ref lpCurrent, const SerializedTransaction& stTxn, TER terResult, const char* pState);
void pubTransactionAccounts(Ledger::ref lpCurrent, const SerializedTransaction& stTxn, TER terResult, const char* pState);
bool haveConsensusObject();
Json::Value pubBootstrapAccountInfo(Ledger::ref lpAccepted, const NewcoinAddress& naAccountID);

View File

@@ -937,7 +937,6 @@ void Peer::recvGetLedger(newcoin::TMGetLedger& packet)
if (packet.itype() == newcoin::liTS_CANDIDATE)
{ // Request is for a transaction candidate set
Log(lsINFO) << "Received request for TX candidate set data " << getIP();
Ledger::pointer ledger;
if ((!packet.has_ledgerhash() || packet.ledgerhash().size() != 32))
{
punishPeer(PP_INVALID_REQUEST);
@@ -948,7 +947,7 @@ void Peer::recvGetLedger(newcoin::TMGetLedger& packet)
map = theApp->getOPs().getTXMap(txHash);
if (!map)
{
Log(lsERROR) << "We do not hav the map our peer wants";
Log(lsERROR) << "We do not have the map our peer wants";
punishPeer(PP_INVALID_REQUEST);
return;
}

View File

@@ -345,11 +345,7 @@ TER TransactionEngine::applyTransaction(const SerializedTransaction& txn, Transa
terResult = terPRE_SEQ;
}
else if (mLedger->hasTransaction(txID))
{
Log(lsWARNING) << "applyTransaction: duplicate sequence number";
terResult = tefALREADY;
}
else
{
Log(lsWARNING) << "applyTransaction: past sequence number";