Wrap all peer socket accesses in a strand. This is needed to prevent a fatal

race condition in composed intermediary functions.

A few ledger acquire cleanups.
This commit is contained in:
JoelKatz
2013-03-12 16:53:30 -07:00
parent c85fc7ef64
commit 2248d92a33
5 changed files with 87 additions and 95 deletions

View File

@@ -241,7 +241,7 @@ int ConnectionPool::relayMessage(Peer* fromPeer, const PackedMessage::pointer& m
if ((!fromPeer || !(peer.get() == fromPeer)) && peer->isConnected())
{
++sentTo;
peer->sendPacket(msg);
peer->sendPacket(msg, false);
}
}
@@ -254,7 +254,7 @@ void ConnectionPool::relayMessageBut(const std::set<uint64>& fromPeers, const Pa
BOOST_FOREACH(Peer::ref peer, peerVector)
{
if (peer->isConnected() && (fromPeers.count(peer->getPeerId()) == 0))
peer->sendPacket(msg);
peer->sendPacket(msg, false);
}
}
@@ -265,7 +265,7 @@ void ConnectionPool::relayMessageTo(const std::set<uint64>& fromPeers, const Pac
BOOST_FOREACH(Peer::ref peer, peerVector)
{
if (peer->isConnected() && (fromPeers.count(peer->getPeerId()) != 0))
peer->sendPacket(msg);
peer->sendPacket(msg, false);
}
}

View File

@@ -514,6 +514,8 @@ void Ledger::saveAcceptedLedger(Job&, bool fromConsensus)
if (theApp->getJobQueue().getJobCountTotal(jtPUBOLDLEDGER) < 2)
theApp->getLedgerMaster().resumeAcquiring();
else
cLog(lsDEBUG) << "no resume, too many pending ledger saves";
}
#ifndef NO_SQLITE3_PREPARE

View File

@@ -183,7 +183,7 @@ void LedgerAcquire::onTimer(bool progress)
{
if (getTimeouts() > LEDGER_TIMEOUT_COUNT)
{
cLog(lsWARNING) << "Six timeouts for ledger " << mHash;
cLog(lsWARNING) << "Too many timeouts for ledger " << mHash;
setFailed();
done();
return;
@@ -351,7 +351,7 @@ void LedgerAcquire::trigger(Peer::ref peer)
if (iPeer)
{
mByHash = false;
iPeer->sendPacket(packet);
iPeer->sendPacket(packet, false);
}
}
}
@@ -485,7 +485,7 @@ void PeerSet::sendRequest(const ripple::TMGetLedger& tmGL, Peer::ref peer)
if (!peer)
sendRequest(tmGL);
else
peer->sendPacket(boost::make_shared<PackedMessage>(tmGL, ripple::mtGET_LEDGER));
peer->sendPacket(boost::make_shared<PackedMessage>(tmGL, ripple::mtGET_LEDGER), false);
}
void PeerSet::sendRequest(const ripple::TMGetLedger& tmGL)
@@ -499,7 +499,7 @@ void PeerSet::sendRequest(const ripple::TMGetLedger& tmGL)
{
Peer::pointer peer = theApp->getConnectionPool().getPeerById(it->first);
if (peer)
peer->sendPacket(packet);
peer->sendPacket(packet, false);
}
}

View File

@@ -37,20 +37,19 @@ Peer::Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx,
mPrivate(false),
mLoad(""),
mSocketSsl(io_service, ctx),
mActivityTimer(io_service)
mActivityTimer(io_service),
mIOStrand(io_service)
{
cLog(lsDEBUG) << "CREATING PEER: " << ADDRESS(this);
}
void Peer::handleWrite(const boost::system::error_code& error, size_t bytes_transferred)
{
{ // Call on IO strand
#ifdef DEBUG
// if (!error)
// std::cerr << "Peer::handleWrite bytes: "<< bytes_transferred << std::endl;
#endif
boost::recursive_mutex::scoped_lock sl(ioMutex);
mSendingPacket.reset();
if (mDetaching)
@@ -62,7 +61,7 @@ void Peer::handleWrite(const boost::system::error_code& error, size_t bytes_tran
{
cLog(lsINFO) << "Peer: Write: Error: " << ADDRESS(this) << ": bytes=" << bytes_transferred << ": " << error.category().name() << ": " << error.message() << ": " << error;
detach("hw");
detach("hw", true);
}
else if (!mSendQ.empty())
{
@@ -86,10 +85,13 @@ void Peer::setIpPort(const std::string& strIP, int iPort)
<< (mNodePublic.isValid() ? mNodePublic.humanNodePublic() : "-") << " " << getIP() << " " << getPort();
}
void Peer::detach(const char *rsn)
void Peer::detach(const char *rsn, bool onIOStrand)
{
boost::recursive_mutex::scoped_lock sl(ioMutex);
if (!onIOStrand)
{
mIOStrand.post(boost::bind(&Peer::detach, shared_from_this(), rsn, true));
return;
}
if (!mDetaching)
{
mDetaching = true; // Race is ok.
@@ -103,7 +105,8 @@ void Peer::detach(const char *rsn)
mSendQ.clear();
(void) mActivityTimer.cancel();
mSocketSsl.async_shutdown(boost::bind(&Peer::handleShutdown, shared_from_this(), boost::asio::placeholders::error));
mSocketSsl.async_shutdown(mIOStrand.wrap(boost::bind(&Peer::handleShutdown, shared_from_this(),
boost::asio::placeholders::error)));
if (mNodePublic.isValid())
{
@@ -130,13 +133,13 @@ void Peer::detach(const char *rsn)
}
void Peer::handlePingTimer(const boost::system::error_code& ecResult)
{
{ // called on IO strand
if (ecResult || mDetaching)
return;
if (mActive == 1)
{ // ping out
detach("pto");
detach("pto", true);
return;
}
@@ -145,14 +148,14 @@ void Peer::handlePingTimer(const boost::system::error_code& ecResult)
mActive = 1;
ripple::TMPing packet;
packet.set_type(ripple::TMPing::ptPING);
sendPacket(boost::make_shared<PackedMessage>(packet, ripple::mtPING));
sendPacket(boost::make_shared<PackedMessage>(packet, ripple::mtPING), true);
}
else // active->idle
mActive = 0;
mActivityTimer.expires_from_now(boost::posix_time::seconds(NODE_IDLE_SECONDS));
mActivityTimer.async_wait(boost::bind(&Peer::handlePingTimer, shared_from_this(),
boost::asio::placeholders::error));
mActivityTimer.async_wait(mIOStrand.wrap(boost::bind(&Peer::handlePingTimer, shared_from_this(),
boost::asio::placeholders::error)));
}
@@ -173,7 +176,7 @@ void Peer::handleVerifyTimer(const boost::system::error_code& ecResult)
{
//cLog(lsINFO) << "Peer: Verify: Peer failed to verify in time.";
detach("hvt");
detach("hvt", true);
}
}
@@ -198,19 +201,19 @@ void Peer::connect(const std::string& strIp, int iPort)
if (err || itrEndpoint == boost::asio::ip::tcp::resolver::iterator())
{
cLog(lsWARNING) << "Peer: Connect: Bad IP: " << strIp;
detach("c");
detach("c", false);
return;
}
else
{
mActivityTimer.expires_from_now(boost::posix_time::seconds(NODE_VERIFY_SECONDS), err);
mActivityTimer.async_wait(boost::bind(&Peer::handleVerifyTimer, shared_from_this(),
boost::asio::placeholders::error));
mActivityTimer.async_wait(mIOStrand.wrap(boost::bind(&Peer::handleVerifyTimer, shared_from_this(),
boost::asio::placeholders::error)));
if (err)
{
cLog(lsWARNING) << "Peer: Connect: Failed to set timer.";
detach("c2");
detach("c2", false);
return;
}
}
@@ -219,15 +222,14 @@ void Peer::connect(const std::string& strIp, int iPort)
{
cLog(lsINFO) << "Peer: Connect: Outbound: " << ADDRESS(this) << ": " << mIpPort.first << " " << mIpPort.second;
boost::recursive_mutex::scoped_lock sl(ioMutex);
boost::asio::async_connect(
getSocket(),
itrEndpoint,
boost::bind(
mIOStrand.wrap(boost::bind(
&Peer::handleConnect,
shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::iterator));
boost::asio::placeholders::iterator)));
}
}
@@ -240,7 +242,7 @@ void Peer::handleStart(const boost::system::error_code& error)
if (error)
{
cLog(lsINFO) << "Peer: Handshake: Error: " << error.category().name() << ": " << error.message() << ": " << error;
detach("hs");
detach("hs", true);
}
else
{
@@ -255,18 +257,16 @@ void Peer::handleConnect(const boost::system::error_code& error, boost::asio::ip
if (error)
{
cLog(lsINFO) << "Peer: Connect: Error: " << error.category().name() << ": " << error.message() << ": " << error;
detach("hc");
detach("hc", true);
}
else
{
cLog(lsINFO) << "Connect peer: success.";
boost::recursive_mutex::scoped_lock sl(ioMutex);
mSocketSsl.set_verify_mode(boost::asio::ssl::verify_none);
mSocketSsl.async_handshake(boost::asio::ssl::stream<boost::asio::ip::tcp::socket>::client,
boost::bind(&Peer::handleStart, shared_from_this(), boost::asio::placeholders::error));
mIOStrand.wrap(boost::bind(&Peer::handleStart, shared_from_this(), boost::asio::placeholders::error)));
}
}
@@ -284,8 +284,6 @@ void Peer::connected(const boost::system::error_code& error)
if (iPort == SYSTEM_PEER_PORT) //TODO: Why are you doing this?
iPort = -1;
boost::recursive_mutex::scoped_lock sl(ioMutex);
if (!error)
{
// Not redundant ip and port, handshake, and start.
@@ -296,35 +294,38 @@ void Peer::connected(const boost::system::error_code& error)
mSocketSsl.set_verify_mode(boost::asio::ssl::verify_none);
mSocketSsl.async_handshake(boost::asio::ssl::stream<boost::asio::ip::tcp::socket>::server,
boost::bind(&Peer::handleStart, shared_from_this(), boost::asio::placeholders::error));
mIOStrand.wrap(boost::bind(&Peer::handleStart, shared_from_this(), boost::asio::placeholders::error)));
}
else if (!mDetaching)
{
cLog(lsINFO) << "Peer: Inbound: Error: " << ADDRESS(this) << ": " << strIp << " " << iPort << " : " << error.category().name() << ": " << error.message() << ": " << error;
detach("ctd");
detach("ctd", false);
}
}
void Peer::sendPacketForce(const PackedMessage::pointer& packet)
{ // must hold I/O mutex
{ // must be on IO strand
if (!mDetaching)
{
mSendingPacket = packet;
boost::asio::async_write(mSocketSsl, boost::asio::buffer(packet->getBuffer()),
boost::bind(&Peer::handleWrite, shared_from_this(),
mIOStrand.wrap(boost::bind(&Peer::handleWrite, shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
boost::asio::placeholders::bytes_transferred)));
}
}
void Peer::sendPacket(const PackedMessage::pointer& packet)
void Peer::sendPacket(const PackedMessage::pointer& packet, bool onStrand)
{
boost::recursive_mutex::scoped_lock sl(ioMutex);
if (packet)
{
if (!onStrand)
{
mIOStrand.post(boost::bind(&Peer::sendPacket, shared_from_this(), packet, true));
return;
}
if (mSendingPacket)
{
mSendQ.push_back(packet);
@@ -338,15 +339,13 @@ void Peer::sendPacket(const PackedMessage::pointer& packet)
void Peer::startReadHeader()
{
boost::recursive_mutex::scoped_lock sl(ioMutex);
if (!mDetaching)
{
mReadbuf.clear();
mReadbuf.resize(HEADER_SIZE);
boost::asio::async_read(mSocketSsl, boost::asio::buffer(mReadbuf),
boost::bind(&Peer::handleReadHeader, shared_from_this(), boost::asio::placeholders::error));
boost::asio::async_read(mSocketSsl, boost::asio::buffer(mReadbuf), mIOStrand.wrap(
boost::bind(&Peer::handleReadHeader, shared_from_this(), boost::asio::placeholders::error)));
}
}
@@ -356,21 +355,17 @@ void Peer::startReadBody(unsigned msg_len)
// bytes. Expand it to fit in the body as well, and start async
// read into the body.
boost::recursive_mutex::scoped_lock sl(ioMutex);
if (!mDetaching)
{
mReadbuf.resize(HEADER_SIZE + msg_len);
boost::asio::async_read(mSocketSsl, boost::asio::buffer(&mReadbuf[HEADER_SIZE], msg_len),
boost::bind(&Peer::handleReadBody, shared_from_this(), boost::asio::placeholders::error));
mIOStrand.wrap(boost::bind(&Peer::handleReadBody, shared_from_this(), boost::asio::placeholders::error)));
}
}
void Peer::handleReadHeader(const boost::system::error_code& error)
{
boost::recursive_mutex::scoped_lock sl(ioMutex);
if (mDetaching)
{
// Drop data or error if detaching.
@@ -382,7 +377,7 @@ void Peer::handleReadHeader(const boost::system::error_code& error)
// WRITEME: Compare to maximum message length, abort if too large
if ((msg_len > (32 * 1024 * 1024)) || (msg_len == 0))
{
detach("hrh");
detach("hrh", true);
return;
}
startReadBody(msg_len);
@@ -390,26 +385,22 @@ void Peer::handleReadHeader(const boost::system::error_code& error)
else
{
cLog(lsINFO) << "Peer: Header: Error: " << ADDRESS(this) << ": " << error.category().name() << ": " << error.message() << ": " << error;
detach("hrh2");
detach("hrh2", true);
}
}
void Peer::handleReadBody(const boost::system::error_code& error)
{
if (mDetaching)
{
boost::recursive_mutex::scoped_lock sl(ioMutex);
if (mDetaching)
{
return;
}
else if (error)
{
cLog(lsINFO) << "Peer: Body: Error: " << ADDRESS(this) << ": " << error.category().name() << ": " << error.message() << ": " << error;
boost::recursive_mutex::scoped_lock sl(theApp->getMasterLock());
detach("hrb");
return;
}
return;
}
else if (error)
{
cLog(lsINFO) << "Peer: Body: Error: " << ADDRESS(this) << ": " << error.category().name() << ": " << error.message() << ": " << error;
boost::recursive_mutex::scoped_lock sl(theApp->getMasterLock());
detach("hrb", true);
return;
}
processReadBuffer();
@@ -433,7 +424,7 @@ void Peer::processReadBuffer()
if (mHelloed == (type == ripple::mtHELLO))
{
cLog(lsWARNING) << "Wrong message type: " << type;
detach("prb1");
detach("prb1", true);
}
else
{
@@ -684,8 +675,8 @@ void Peer::recvHello(ripple::TMHello& packet)
(void) mActivityTimer.cancel();
mActivityTimer.expires_from_now(boost::posix_time::seconds(NODE_IDLE_SECONDS));
mActivityTimer.async_wait(boost::bind(&Peer::handlePingTimer, shared_from_this(),
boost::asio::placeholders::error));
mActivityTimer.async_wait(mIOStrand.wrap(boost::bind(&Peer::handlePingTimer, shared_from_this(),
boost::asio::placeholders::error)));
uint32 ourTime = theApp->getOPs().getNetworkTimeNC();
uint32 minTime = ourTime - 20;
@@ -807,7 +798,7 @@ void Peer::recvHello(ripple::TMHello& packet)
if (bDetach)
{
mNodePublic.clear();
detach("recvh");
detach("recvh", true);
}
else
{
@@ -939,8 +930,9 @@ static void checkPropose(Job& job, boost::shared_ptr<ripple::TMProposeSet> packe
if (isTrusted)
{
theApp->getIOService().post(boost::bind(&NetworkOPs::processTrustedProposal, &theApp->getOPs(),
proposal, packet, nodePublic, prevLedger, sigGood));
theApp->getJobQueue().addJob(jtPROPOSAL_t, "trustedProposal",
boost::bind(&NetworkOPs::processTrustedProposal, &theApp->getOPs(),
proposal, packet, nodePublic, prevLedger, sigGood));
}
else if (sigGood && (prevLedger == consensusLCL))
{ // relay untrusted proposal
@@ -1139,7 +1131,7 @@ void Peer::recvGetPeers(ripple::TMGetPeers& packet)
}
PackedMessage::pointer message = boost::make_shared<PackedMessage>(peers, ripple::mtPEERS);
sendPacket(message);
sendPacket(message, true);
}
}
@@ -1200,7 +1192,7 @@ void Peer::recvGetObjectByHash(ripple::TMGetObjectByHash& packet)
}
cLog(lsTRACE) << "GetObjByHash had " << reply.objects_size() << " of " << packet.objects_size()
<< " for " << getIP();
sendPacket(boost::make_shared<PackedMessage>(reply, ripple::mtGET_OBJECTS));
sendPacket(boost::make_shared<PackedMessage>(reply, ripple::mtGET_OBJECTS), true);
}
else
{ // this is a reply
@@ -1248,7 +1240,7 @@ void Peer::recvPing(ripple::TMPing& packet)
if (packet.type() == ripple::TMPing::ptPING)
{
packet.set_type(ripple::TMPing::ptPONG);
sendPacket(boost::make_shared<PackedMessage>(packet, ripple::mtPING));
sendPacket(boost::make_shared<PackedMessage>(packet, ripple::mtPING), true);
}
else if (packet.type() == ripple::TMPing::ptPONG)
{
@@ -1419,7 +1411,7 @@ void Peer::recvGetLedger(ripple::TMGetLedger& packet)
}
Peer::ref selectedPeer = usablePeers[rand() % usablePeers.size()];
packet.set_requestcookie(getPeerId());
selectedPeer->sendPacket(boost::make_shared<PackedMessage>(packet, ripple::mtGET_LEDGER));
selectedPeer->sendPacket(boost::make_shared<PackedMessage>(packet, ripple::mtGET_LEDGER), false);
return;
}
cLog(lsERROR) << "We do not have the map our peer wants";
@@ -1466,7 +1458,7 @@ void Peer::recvGetLedger(ripple::TMGetLedger& packet)
}
Peer::ref selectedPeer = usablePeers[rand() % usablePeers.size()];
packet.set_requestcookie(getPeerId());
selectedPeer->sendPacket(boost::make_shared<PackedMessage>(packet, ripple::mtGET_LEDGER));
selectedPeer->sendPacket(boost::make_shared<PackedMessage>(packet, ripple::mtGET_LEDGER), false);
cLog(lsDEBUG) << "Ledger request routed";
return;
}
@@ -1536,7 +1528,7 @@ void Peer::recvGetLedger(ripple::TMGetLedger& packet)
}
PackedMessage::pointer oPacket = boost::make_shared<PackedMessage>(reply, ripple::mtLEDGER_DATA);
sendPacket(oPacket);
sendPacket(oPacket, true);
return;
}
@@ -1611,7 +1603,7 @@ void Peer::recvGetLedger(ripple::TMGetLedger& packet)
}
}
PackedMessage::pointer oPacket = boost::make_shared<PackedMessage>(reply, ripple::mtLEDGER_DATA);
sendPacket(oPacket);
sendPacket(oPacket, true);
}
void Peer::recvLedger(ripple::TMLedgerData& packet)
@@ -1629,7 +1621,7 @@ void Peer::recvLedger(ripple::TMLedgerData& packet)
if (target)
{
packet.clear_requestcookie();
target->sendPacket(boost::make_shared<PackedMessage>(packet, ripple::mtLEDGER_DATA));
target->sendPacket(boost::make_shared<PackedMessage>(packet, ripple::mtLEDGER_DATA), true);
}
else
{
@@ -1717,8 +1709,6 @@ void Peer::addTxSet(const uint256& hash)
// (both sides get the same information, neither side controls it)
void Peer::getSessionCookie(std::string& strDst)
{
boost::recursive_mutex::scoped_lock sl(ioMutex);
SSL* ssl = mSocketSsl.native_handle();
if (!ssl) throw std::runtime_error("No underlying connection");
@@ -1776,7 +1766,7 @@ void Peer::sendHello()
}
PackedMessage::pointer packet = boost::make_shared<PackedMessage>(h, ripple::mtHELLO);
sendPacket(packet);
sendPacket(packet, true);
}
void Peer::sendGetPeers()
@@ -1788,7 +1778,7 @@ void Peer::sendGetPeers()
PackedMessage::pointer packet = boost::make_shared<PackedMessage>(getPeers, ripple::mtGET_PEERS);
sendPacket(packet);
sendPacket(packet, true);
}
void Peer::punishPeer(LoadType l)
@@ -1817,7 +1807,7 @@ void Peer::doProofOfWork(Job&, boost::weak_ptr<Peer> peer, ProofOfWork::pointer
ripple::TMProofWork reply;
reply.set_token(pow->getToken());
reply.set_response(solution.begin(), solution.size());
pptr->sendPacket(boost::make_shared<PackedMessage>(reply, ripple::mtPROOFOFWORK));
pptr->sendPacket(boost::make_shared<PackedMessage>(reply, ripple::mtPROOFOFWORK), false);
}
else
{

View File

@@ -62,12 +62,12 @@ private:
protected:
boost::recursive_mutex ioMutex;
std::vector<uint8_t> mReadbuf;
std::list<PackedMessage::pointer> mSendQ;
PackedMessage::pointer mSendingPacket;
ripple::TMStatusChange mLastStatus;
ripple::TMHello mHello;
boost::asio::io_service::strand mIOStrand;
std::vector<uint8_t> mReadbuf;
std::list<PackedMessage::pointer> mSendQ;
PackedMessage::pointer mSendingPacket;
ripple::TMStatusChange mLastStatus;
ripple::TMHello mHello;
Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx, uint64 peerId, bool inbound);
@@ -133,11 +133,11 @@ public:
void connect(const std::string& strIp, int iPort);
void connected(const boost::system::error_code& error);
void detach(const char *);
void detach(const char *, bool onIOStrand);
bool samePeer(Peer::ref p) { return samePeer(*p); }
bool samePeer(const Peer& p) { return this == &p; }
void sendPacket(const PackedMessage::pointer& packet);
void sendPacket(const PackedMessage::pointer& packet, bool onStrand);
void sendLedgerProposal(Ledger::ref ledger);
void sendFullLedger(Ledger::ref ledger);
void sendGetFullLedger(uint256& hash);