partial fix for joining the network

This commit is contained in:
jed
2012-06-21 10:20:11 -07:00
parent 04d891bfd9
commit c62cbeadf0
4 changed files with 74 additions and 53 deletions

View File

@@ -291,11 +291,11 @@ Peer::pointer ConnectionPool::peerConnect(const std::string& strIp, int iPort)
if (ppResult)
{
Log(lsINFO) << "Pool: Connecting: " << ADDRESS_SHARED(ppResult) << ": " << strIp << " " << iPort;
//Log(lsINFO) << "Pool: Connecting: " << ADDRESS_SHARED(ppResult) << ": " << strIp << " " << iPort;
}
else
{
Log(lsINFO) << "Pool: Already connected: " << strIp << " " << iPort;
//Log(lsINFO) << "Pool: Already connected: " << strIp << " " << iPort;
}
return ppResult;
@@ -352,7 +352,7 @@ bool ConnectionPool::peerConnected(Peer::pointer peer, const NewcoinAddress& naP
if (itCm == mConnectedMap.end())
{
// New connection.
Log(lsINFO) << "Pool: Connected: new: " << ADDRESS_SHARED(peer) << ": " << naPeer.humanNodePublic() << " " << strIP << " " << iPort;
//Log(lsINFO) << "Pool: Connected: new: " << ADDRESS_SHARED(peer) << ": " << naPeer.humanNodePublic() << " " << strIP << " " << iPort;
mConnectedMap[naPeer] = peer;
bNew = true;
@@ -365,7 +365,7 @@ bool ConnectionPool::peerConnected(Peer::pointer peer, const NewcoinAddress& naP
if (itCm->second->getIP().empty())
{
// Old peer did not know it's IP.
Log(lsINFO) << "Pool: Connected: redundant: outbound: " << ADDRESS_SHARED(peer) << " discovered: " << ADDRESS_SHARED(itCm->second) << ": " << strIP << " " << iPort;
//Log(lsINFO) << "Pool: Connected: redundant: outbound: " << ADDRESS_SHARED(peer) << " discovered: " << ADDRESS_SHARED(itCm->second) << ": " << strIP << " " << iPort;
itCm->second->setIpPort(strIP, iPort);
@@ -375,14 +375,14 @@ bool ConnectionPool::peerConnected(Peer::pointer peer, const NewcoinAddress& naP
else
{
// Old peer knew its IP. Do nothing.
Log(lsINFO) << "Pool: Connected: redundant: outbound: rediscovered: " << ADDRESS_SHARED(peer) << " " << strIP << " " << iPort;
//Log(lsINFO) << "Pool: Connected: redundant: outbound: rediscovered: " << ADDRESS_SHARED(peer) << " " << strIP << " " << iPort;
nothing();
}
}
else
{
Log(lsINFO) << "Pool: Connected: redundant: inbound: " << ADDRESS_SHARED(peer) << " " << strIP << " " << iPort;
//Log(lsINFO) << "Pool: Connected: redundant: inbound: " << ADDRESS_SHARED(peer) << " " << strIP << " " << iPort;
nothing();
}
@@ -419,12 +419,12 @@ void ConnectionPool::peerDisconnected(Peer::pointer peer, const NewcoinAddress&
// Found it. Delete it.
mConnectedMap.erase(itCm);
Log(lsINFO) << "Pool: disconnected: " << naPeer.humanNodePublic() << " " << peer->getIP() << " " << peer->getPort();
//Log(lsINFO) << "Pool: disconnected: " << naPeer.humanNodePublic() << " " << peer->getIP() << " " << peer->getPort();
}
}
else
{
Log(lsINFO) << "Pool: disconnected: anonymous: " << peer->getIP() << " " << peer->getPort();
//Log(lsINFO) << "Pool: disconnected: anonymous: " << peer->getIP() << " " << peer->getPort();
}
}
@@ -450,8 +450,8 @@ bool ConnectionPool::peerScanSet(const std::string& strIp, int iPort)
boost::posix_time::ptime tpNow = boost::posix_time::second_clock::universal_time();
boost::posix_time::ptime tpNext = tpNow + boost::posix_time::seconds(iInterval);
Log(lsINFO) << str(boost::format("Pool: Scan: schedule create: %s %s (next %s, delay=%d)")
% mScanIp % mScanPort % tpNext % (tpNext-tpNow).total_seconds());
//Log(lsINFO) << str(boost::format("Pool: Scan: schedule create: %s %s (next %s, delay=%d)")
// % mScanIp % mScanPort % tpNext % (tpNext-tpNow).total_seconds());
db->executeSQL(str(boost::format("UPDATE PeerIps SET ScanNext=%d,ScanInterval=%d WHERE IpPort=%s;")
% iToSeconds(tpNext)
@@ -466,13 +466,13 @@ bool ConnectionPool::peerScanSet(const std::string& strIp, int iPort)
boost::posix_time::ptime tpNow = boost::posix_time::second_clock::universal_time();
boost::posix_time::ptime tpNext = ptFromSeconds(db->getInt("ScanNext"));
Log(lsINFO) << str(boost::format("Pool: Scan: schedule exists: %s %s (next %s, delay=%d)")
% mScanIp % mScanPort % tpNext % (tpNext-tpNow).total_seconds());
//Log(lsINFO) << str(boost::format("Pool: Scan: schedule exists: %s %s (next %s, delay=%d)")
// % mScanIp % mScanPort % tpNext % (tpNext-tpNow).total_seconds());
}
}
else
{
Log(lsWARNING) << "Pool: Scan: peer wasn't in PeerIps: " << strIp << " " << iPort;
//Log(lsWARNING) << "Pool: Scan: peer wasn't in PeerIps: " << strIp << " " << iPort;
}
return bScanDirty;
@@ -487,7 +487,7 @@ void ConnectionPool::peerClosed(Peer::pointer peer, const std::string& strIp, in
// If the connection was our scan, we are no longer scanning.
if (mScanning && mScanning == peer)
{
Log(lsINFO) << "Pool: Scan: scan fail: " << strIp << " " << iPort;
//Log(lsINFO) << "Pool: Scan: scan fail: " << strIp << " " << iPort;
mScanning = Peer::pointer(); // No longer scanning.
bScanRefresh = true; // Look for more to scan.
@@ -510,7 +510,7 @@ void ConnectionPool::peerClosed(Peer::pointer peer, const std::string& strIp, in
else if (mIpMap[ipPeer] == peer)
{
// We were the identified connection.
Log(lsINFO) << "Pool: Closed: identified: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort;
//Log(lsINFO) << "Pool: Closed: identified: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort;
// Delete our entry.
mIpMap.erase(itIp);
@@ -519,8 +519,8 @@ void ConnectionPool::peerClosed(Peer::pointer peer, const std::string& strIp, in
}
else
{
// Found it. But, we were redundent.
Log(lsINFO) << "Pool: Closed: redundant: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort;
// Found it. But, we were redundant.
//Log(lsINFO) << "Pool: Closed: redundant: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort;
}
}

View File

@@ -6,7 +6,9 @@
#include <boost/bind.hpp>
#include "Application.h"
#include "Log.h"
#define LA_DEBUG
#define LEDGER_ACQUIRE_TIMEOUT 2
PeerSet::PeerSet(const uint256& hash, int interval) : mHash(hash), mTimerInterval(interval),
@@ -69,8 +71,8 @@ void PeerSet::TimerEntry(boost::weak_ptr<PeerSet> wptr, const boost::system::err
LedgerAcquire::LedgerAcquire(const uint256& hash) : PeerSet(hash, LEDGER_ACQUIRE_TIMEOUT),
mFilter(&theApp->getNodeCache()), mHaveBase(false), mHaveState(false), mHaveTransactions(false)
{
#ifdef DEBUG
std::cerr << "Acquiring ledger " << mHash.GetHex() << std::endl;
#ifdef LA_DEBUG
Log(lsTRACE) << "Acquiring ledger " << mHash.GetHex();
#endif
}
@@ -81,8 +83,8 @@ boost::weak_ptr<PeerSet> LedgerAcquire::pmDowncast()
void LedgerAcquire::done()
{
#ifdef DEBUG
std::cerr << "Done acquiring ledger " << mHash.GetHex() << std::endl;
#ifdef LA_DEBUG
Log(lsTRACE) << "Done acquiring ledger " << mHash.GetHex();
#endif
std::vector< boost::function<void (LedgerAcquire::pointer)> > triggers;
@@ -106,16 +108,17 @@ void LedgerAcquire::addOnComplete(boost::function<void (LedgerAcquire::pointer)>
void LedgerAcquire::trigger(Peer::pointer peer)
{
#ifdef LA_DEBUG
std::cerr << "Trigger acquiring ledger " << mHash.GetHex() << std::endl;
std::cerr << "complete=" << mComplete << " failed=" << mFailed << std::endl;
std::cerr << "base=" << mHaveBase << " tx=" << mHaveTransactions << " as=" << mHaveState << std::endl;
if(peer) Log(lsTRACE) << "Trigger acquiring ledger " << mHash.GetHex() << " from " << peer->getIP();
else Log(lsTRACE) << "Trigger acquiring ledger " << mHash.GetHex();
Log(lsTRACE) << "complete=" << mComplete << " failed=" << mFailed;
Log(lsTRACE) << "base=" << mHaveBase << " tx=" << mHaveTransactions << " as=" << mHaveState;
#endif
if (mComplete || mFailed)
return;
if (!mHaveBase)
{
#ifdef LA_DEBUG
std::cerr << "need base" << std::endl;
Log(lsTRACE) << "need base";
#endif
newcoin::TMGetLedger tmGL;
tmGL.set_ledgerhash(mHash.begin(), mHash.size());
@@ -131,7 +134,7 @@ void LedgerAcquire::trigger(Peer::pointer peer)
if (mHaveBase && !mHaveTransactions)
{
#ifdef LA_DEBUG
std::cerr << "need tx" << std::endl;
Log(lsTRACE) << "need tx";
#endif
assert(mLedger);
if (mLedger->peekTransactionMap()->getHash().isZero())
@@ -183,7 +186,7 @@ void LedgerAcquire::trigger(Peer::pointer peer)
if (mHaveBase && !mHaveState)
{
#ifdef LA_DEBUG
std::cerr << "need as" << std::endl;
Log(lsTRACE) << "need as";
#endif
assert(mLedger);
if (mLedger->peekAccountStateMap()->getHash().isZero())
@@ -268,15 +271,15 @@ void PeerSet::sendRequest(const newcoin::TMGetLedger& tmGL)
bool LedgerAcquire::takeBase(const std::string& data, Peer::pointer peer)
{ // Return value: true=normal, false=bad data
#ifdef LA_DEBUG
std::cerr << "got base acquiring ledger " << mHash.GetHex() << std::endl;
Log(lsTRACE) << "got base acquiring ledger " << mHash.GetHex();
#endif
boost::recursive_mutex::scoped_lock sl(mLock);
if (mHaveBase) return true;
mLedger = boost::make_shared<Ledger>(data);
if (mLedger->getHash() != mHash)
{
std::cerr << "Acquire hash mismatch" << std::endl;
std::cerr << mLedger->getHash().GetHex() << "!=" << mHash.GetHex() << std::endl;
Log(lsWARNING) << "Acquire hash mismatch";
Log(lsWARNING) << mLedger->getHash().GetHex() << "!=" << mHash.GetHex();
mLedger = Ledger::pointer();
return false;
}
@@ -319,7 +322,7 @@ bool LedgerAcquire::takeAsNode(const std::list<SHAMapNode>& nodeIDs,
const std::list< std::vector<unsigned char> >& data, Peer::pointer peer)
{
#ifdef LA_DEBUG
std::cerr << "got ASdata acquiring ledger " << mHash.GetHex() << std::endl;
Log(lsTRACE) << "got ASdata acquiring ledger " << mHash.GetHex();
#endif
if (!mHaveBase) return false;
std::list<SHAMapNode>::const_iterator nodeIDit = nodeIDs.begin();
@@ -379,7 +382,7 @@ void LedgerAcquireMaster::dropLedger(const uint256& hash)
bool LedgerAcquireMaster::gotLedgerData(newcoin::TMLedgerData& packet, Peer::pointer peer)
{
#ifdef LA_DEBUG
std::cerr << "got data for acquiring ledger ";
Log(lsTRACE) << "got data for acquiring ledger ";
#endif
uint256 hash;
if (packet.ledgerhash().size() != 32)
@@ -389,7 +392,7 @@ bool LedgerAcquireMaster::gotLedgerData(newcoin::TMLedgerData& packet, Peer::poi
}
memcpy(hash.begin(), packet.ledgerhash().data(), 32);
#ifdef LA_DEBUG
std::cerr << hash.GetHex() << std::endl;
Log(lsTRACE) << hash.GetHex();
#endif
LedgerAcquire::pointer ledger = find(hash);

View File

@@ -360,6 +360,7 @@ bool NetworkOPs::checkLastClosedLedger(const std::vector<Peer::pointer>& peerLis
// FIXME: We may have a ledger with many recent validations but that no directly-connected
// node is using. THis is kind of fundamental.
Log(lsTRACE) << "NetworkOPs::checkLastClosedLedger";
boost::unordered_map<uint256, ValidationCount> ledgers;
@@ -391,7 +392,7 @@ bool NetworkOPs::checkLastClosedLedger(const std::vector<Peer::pointer>& peerLis
vc.highNode = (*it)->getNodePublic();
++vc.nodesUsing;
}
else Log(lsTRACE) << "Connected peer announces no LCL";
else Log(lsTRACE) << "Connected peer announces no LCL " << (*it)->getIP();
}
}
@@ -432,13 +433,26 @@ bool NetworkOPs::checkLastClosedLedger(const std::vector<Peer::pointer>& peerLis
}
if (!acq->isComplete())
{ // add more peers
//JED: seems like you need to do something here so it knows in beginConsensus that it isn't on the the right ledger
// switch to an empty ledger so we won't keep going
//Ledger::pointer emptyLedger(new Ledger());
//switchLastClosedLedger(emptyLedger);
// JED: just ask everyone
std::vector<Peer::pointer> peers=theApp->getConnectionPool().getPeerVector();
for(int n=0; n<peers.size(); n++)
{
if(peers[n]->isConnected()) acq->peerHas(peers[n]);
}
/* JED this wasn't adding any peers. This is also an optimization so we can do this later
// FIXME: A peer may not have a ledger just because it accepts it as the network's consensus
for (std::vector<Peer::pointer>::const_iterator it = peerList.begin(), end = peerList.end();
it != end; ++it)
{
if ((*it)->getClosedLedgerHash() == closedLedger)
acq->peerHas(*it);
}
}*/
return true;
}
consensus = acq->getLedger();
@@ -504,6 +518,8 @@ int NetworkOPs::beginConsensus(Ledger::pointer closingLedger)
bool NetworkOPs::recvPropose(uint32 proposeSeq, const uint256& proposeHash,
const std::string& pubKey, const std::string& signature)
{
// JED: does mConsensus need to be locked?
// XXX Validate key.
// XXX Take a vuc for pubkey.
NewcoinAddress naPeerPublic = NewcoinAddress::createNodePublic(strCopy(pubKey));
@@ -520,11 +536,8 @@ bool NetworkOPs::recvPropose(uint32 proposeSeq, const uint256& proposeHash,
return false;
}
boost::shared_ptr<LedgerConsensus> consensus = mConsensus;
if (!consensus) return false;
LedgerProposal::pointer proposal =
boost::make_shared<LedgerProposal>(consensus->getLCL(), proposeSeq, proposeHash, naPeerPublic);
boost::make_shared<LedgerProposal>(mConsensus->getLCL(), proposeSeq, proposeHash, naPeerPublic);
if (!proposal->checkSign(signature))
{ // Note that if the LCL is different, the signature check will fail
Log(lsWARNING) << "Ledger proposal fails signature check";
@@ -540,7 +553,8 @@ bool NetworkOPs::recvPropose(uint32 proposeSeq, const uint256& proposeHash,
return true;
}
return consensus->peerPosition(proposal);
return mConsensus->peerPosition(proposal);
}
SHAMap::pointer NetworkOPs::getTXMap(const uint256& hash)
@@ -626,6 +640,8 @@ std::vector< std::pair<uint32, uint256> >
bool NetworkOPs::recvValidation(SerializedValidation::pointer val)
{
Log(lsINFO) << "recvValidation " << val->getLedgerHash().GetHex();
return theApp->getValidations().addValidation(val);
}

View File

@@ -77,11 +77,12 @@ void Peer::detach(const char *rsn)
if (!mDetaching)
{
mDetaching = true; // Race is ok.
/*
Log(lsDEBUG) << "Peer: Detach: "
<< ADDRESS(this) << "> "
<< rsn << ": "
<< (mNodePublic.isValid() ? mNodePublic.humanNodePublic() : "-") << " " << getIP() << " " << getPort();
*/
mSendQ.clear();
@@ -92,7 +93,7 @@ void Peer::detach(const char *rsn)
{
theApp->getConnectionPool().peerDisconnected(shared_from_this(), mNodePublic);
mNodePublic.clear(); // Be idompotent.
mNodePublic.clear(); // Be idempotent.
}
if (!mIpPort.first.empty())
@@ -101,13 +102,14 @@ void Peer::detach(const char *rsn)
// Might need to scan. Inform connection closed.
theApp->getConnectionPool().peerClosed(shared_from_this(), mIpPort.first, mIpPort.second);
mIpPort.first.clear(); // Be idompotent.
mIpPort.first.clear(); // Be idempotent.
}
/*
Log(lsDEBUG) << "Peer: Detach: "
<< ADDRESS(this) << "< "
<< rsn << ": "
<< (mNodePublic.isValid() ? mNodePublic.humanNodePublic() : "-") << " " << getIP() << " " << getPort();
*/
}
}
@@ -129,7 +131,7 @@ void Peer::handleVerifyTimer(const boost::system::error_code& ecResult)
}
else
{
Log(lsINFO) << "Peer: Verify: Peer failed to verify in time.";
//Log(lsINFO) << "Peer: Verify: Peer failed to verify in time.";
detach("hvt");
}
@@ -787,7 +789,7 @@ void Peer::recvGetPeers(newcoin::TMGetPeers& packet)
addr->set_ipv4(inet_addr(strIP.c_str()));
addr->set_ipv4port(iPort);
Log(lsINFO) << "Peer: Teaching: " << ADDRESS(this) << ": " << n << ": " << strIP << " " << iPort;
//Log(lsINFO) << "Peer: Teaching: " << ADDRESS(this) << ": " << n << ": " << strIP << " " << iPort;
}
PackedMessage::pointer message = boost::make_shared<PackedMessage>(peers, newcoin::mtPEERS);
@@ -809,7 +811,7 @@ void Peer::recvPeers(newcoin::TMPeers& packet)
if (strIP != "0.0.0.0" && strIP != "127.0.0.1")
{
Log(lsINFO) << "Peer: Learning: " << ADDRESS(this) << ": " << i << ": " << strIP << " " << iPort;
//Log(lsINFO) << "Peer: Learning: " << ADDRESS(this) << ": " << i << ": " << strIP << " " << iPort;
theApp->getConnectionPool().savePeer(strIP, iPort, UniqueNodeList::vsTold);
}
@@ -850,14 +852,14 @@ void Peer::recvAccount(newcoin::TMAccount& packet)
void Peer::recvStatus(newcoin::TMStatusChange& packet)
{
Log(lsTRACE) << "Received status change from peer";
Log(lsTRACE) << "Received status change from peer" << getIP();
if (!packet.has_networktime())
packet.set_networktime(theApp->getOPs().getNetworkTimeNC());
mLastStatus = packet;
if (packet.newevent() == newcoin::neLOST_SYNC)
{
Log(lsTRACE) << "peer has lost sync";
Log(lsTRACE) << "peer has lost sync" << getIP();
mPreviousLedgerHash.zero();
mClosedLedgerHash.zero();
return;
@@ -866,11 +868,11 @@ void Peer::recvStatus(newcoin::TMStatusChange& packet)
{ // a peer has changed ledgers
memcpy(mClosedLedgerHash.begin(), packet.ledgerhash().data(), 256 / 8);
mClosedLedgerTime = ptFromSeconds(packet.networktime());
Log(lsTRACE) << "peer LCL is " << mClosedLedgerHash.GetHex();
Log(lsTRACE) << "peer LCL is " << mClosedLedgerHash.GetHex() << " " << getIP();
}
else
{
Log(lsTRACE) << "peer has no ledger hash";
Log(lsTRACE) << "peer has no ledger hash" << getIP();
mClosedLedgerHash.zero();
}
@@ -889,7 +891,7 @@ void Peer::recvGetLedger(newcoin::TMGetLedger& packet)
if (packet.itype() == newcoin::liTS_CANDIDATE)
{ // Request is for a transaction candidate set
Log(lsINFO) << "Received request for TX candidate set data";
Log(lsINFO) << "Received request for TX candidate set data " << getIP();
Ledger::pointer ledger;
if ((!packet.has_ledgerhash() || packet.ledgerhash().size() != 32))
{
@@ -912,7 +914,7 @@ void Peer::recvGetLedger(newcoin::TMGetLedger& packet)
}
else
{ // Figure out what ledger they want
Log(lsINFO) << "Received request for ledger data";
Log(lsINFO) << "Received request for ledger data " << getIP();
Ledger::pointer ledger;
if (packet.has_ledgerhash())
{