Get ledger acquire work off the main thread and out of the master lock.

Prioritize ledger acquire work appripriately. This should help prevent the
server from becoming overly sluggish while acquiring ledgers. Still todo:
Finish all pending receive operations before sending out any data requests.
This commit is contained in:
JoelKatz
2013-03-15 22:34:55 -07:00
parent 7a76cfe7da
commit e392366c63
6 changed files with 45 additions and 30 deletions

View File

@@ -39,6 +39,7 @@ const char* Job::toString(JobType t)
case jtVALIDATION_ut: return "untrustedValidation";
case jtPROOFWORK: return "proofOfWork";
case jtPROPOSAL_ut: return "untrustedProposal";
case jtLEDGER_DATA: return "ledgerData";
case jtCLIENT: return "clientCommand";
case jtTRANSACTION: return "transaction";
case jtPUBLEDGER: return "publishNewLedger";

View File

@@ -25,16 +25,17 @@ enum JobType
jtVALIDATION_ut = 2, // A validation from an untrusted source
jtPROOFWORK = 3, // A proof of work demand from another server
jtPROPOSAL_ut = 4, // A proposal from an untrusted source
jtCLIENT = 5, // A websocket command from the client
jtTRANSACTION = 6, // A transaction received from the network
jtPUBLEDGER = 7, // Publish a fully-accepted ledger
jtWAL = 8, // Write-ahead logging
jtVALIDATION_t = 9, // A validation from a trusted source
jtWRITE = 10, // Write out hashed objects
jtTRANSACTION_l = 11, // A local transaction
jtPROPOSAL_t = 12, // A proposal from a trusted source
jtADMIN = 13, // An administrative operation
jtDEATH = 14, // job of death, used internally
jtLEDGER_DATA = 5, // Received data for a ledger we're acquiring
jtCLIENT = 6, // A websocket command from the client
jtTRANSACTION = 7, // A transaction received from the network
jtPUBLEDGER = 8, // Publish a fully-accepted ledger
jtWAL = 9, // Write-ahead logging
jtVALIDATION_t = 10, // A validation from a trusted source
jtWRITE = 11, // Write out hashed objects
jtTRANSACTION_l = 12, // A local transaction
jtPROPOSAL_t = 13, // A proposal from a trusted source
jtADMIN = 14, // An administrative operation
jtDEATH = 15, // job of death, used internally
// special types not dispatched by the job pool
jtPEER = 24,

View File

@@ -477,6 +477,7 @@ void LedgerAcquire::trigger(Peer::ref peer)
{
cLog(lsDEBUG) << "Done:" << (mComplete ? " complete" : "") << (mFailed ? " failed " : " ")
<< mLedger->getLedgerSeq();
sl.unlock();
done();
}
}
@@ -830,13 +831,19 @@ void LedgerAcquireMaster::dropLedger(const uint256& hash)
mLedgers.erase(hash);
}
SMAddNode LedgerAcquireMaster::gotLedgerData(ripple::TMLedgerData& packet, Peer::ref peer)
void LedgerAcquireMaster::gotLedgerData(Job&, boost::shared_ptr<ripple::TMLedgerData> packet_ptr,
boost::weak_ptr<Peer> wPeer)
{
ripple::TMLedgerData& packet = *packet_ptr;
Peer::pointer peer = wPeer.lock();
if (!peer)
return;
uint256 hash;
if (packet.ledgerhash().size() != 32)
{
std::cerr << "Acquire error" << std::endl;
return SMAddNode::invalid();
peer->punishPeer(LT_InvalidRequest);
return;
}
memcpy(hash.begin(), packet.ledgerhash().data(), 32);
cLog(lsTRACE) << "Got data (" << packet.nodes().size() << ") for acquiring ledger: " << hash;
@@ -845,7 +852,8 @@ SMAddNode LedgerAcquireMaster::gotLedgerData(ripple::TMLedgerData& packet, Peer:
if (!ledger)
{
cLog(lsINFO) << "Got data for ledger we're not acquiring";
return SMAddNode();
peer->punishPeer(LT_InvalidRequest);
return;
}
if (packet.type() == ripple::liBASE)
@@ -853,12 +861,14 @@ SMAddNode LedgerAcquireMaster::gotLedgerData(ripple::TMLedgerData& packet, Peer:
if (packet.nodes_size() < 1)
{
cLog(lsWARNING) << "Got empty base data";
return SMAddNode::invalid();
peer->punishPeer(LT_InvalidRequest);
return;
}
if (!ledger->takeBase(packet.nodes(0).nodedata()))
{
cLog(lsWARNING) << "Got invalid base data";
return SMAddNode::invalid();
peer->punishPeer(LT_InvalidRequest);
return;
}
SMAddNode san = SMAddNode::useful();
if ((packet.nodes().size() > 1) && !ledger->takeAsRootNode(strCopy(packet.nodes(1).nodedata()), san))
@@ -871,7 +881,7 @@ SMAddNode LedgerAcquireMaster::gotLedgerData(ripple::TMLedgerData& packet, Peer:
}
if (!san.isInvalid())
ledger->trigger(peer);
return san;
return;
}
if ((packet.type() == ripple::liTX_NODE) || (packet.type() == ripple::liAS_NODE))
@@ -882,7 +892,8 @@ SMAddNode LedgerAcquireMaster::gotLedgerData(ripple::TMLedgerData& packet, Peer:
if (packet.nodes().size() <= 0)
{
cLog(lsINFO) << "Got response with no nodes";
return SMAddNode::invalid();
peer->punishPeer(LT_InvalidRequest);
return;
}
for (int i = 0; i < packet.nodes().size(); ++i)
{
@@ -890,7 +901,8 @@ SMAddNode LedgerAcquireMaster::gotLedgerData(ripple::TMLedgerData& packet, Peer:
if (!node.has_nodeid() || !node.has_nodedata())
{
cLog(lsWARNING) << "Got bad node";
return SMAddNode::invalid();
peer->punishPeer(LT_InvalidRequest);
return;
}
nodeIDs.push_back(SHAMapNode(node.nodeid().data(), node.nodeid().size()));
@@ -903,11 +915,11 @@ SMAddNode LedgerAcquireMaster::gotLedgerData(ripple::TMLedgerData& packet, Peer:
ledger->takeAsNode(nodeIDs, nodeData, ret);
if (!ret.isInvalid())
ledger->trigger(peer);
return ret;
return;
}
cLog(lsWARNING) << "Not sure what ledger data we got";
return SMAddNode::invalid();
peer->punishPeer(LT_InvalidRequest);
}
void LedgerAcquireMaster::sweep()

View File

@@ -147,7 +147,7 @@ public:
LedgerAcquire::pointer find(const uint256& hash);
bool hasLedger(const uint256& ledgerHash);
void dropLedger(const uint256& ledgerHash);
SMAddNode gotLedgerData(ripple::TMLedgerData& packet, Peer::ref);
void gotLedgerData(Job&, boost::shared_ptr<ripple::TMLedgerData> packet, boost::weak_ptr<Peer> peer);
int getFetchCount(int& timeoutCount);
void logFailure(const uint256& h) { mRecentFailures.add(h); }

View File

@@ -590,8 +590,8 @@ void Peer::processReadBuffer()
case ripple::mtLEDGER_DATA:
{
event->reName("Peer::ledgerdata");
ripple::TMLedgerData msg;
if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
boost::shared_ptr<ripple::TMLedgerData> msg = boost::make_shared<ripple::TMLedgerData>();
if (msg->ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
recvLedger(msg);
else
cLog(lsWARNING) << "parse error: " << type;
@@ -1606,8 +1606,9 @@ void Peer::recvGetLedger(ripple::TMGetLedger& packet)
sendPacket(oPacket, true);
}
void Peer::recvLedger(ripple::TMLedgerData& packet)
void Peer::recvLedger(const boost::shared_ptr<ripple::TMLedgerData>& packet_ptr)
{
ripple::TMLedgerData& packet = *packet_ptr;
if (packet.nodes().size() <= 0)
{
cLog(lsWARNING) << "Ledger/TXset data with no nodes";
@@ -1664,9 +1665,9 @@ void Peer::recvLedger(ripple::TMLedgerData& packet)
return;
}
SMAddNode san = theApp->getMasterLedgerAcquire().gotLedgerData(packet, shared_from_this());
if (san.isInvalid())
punishPeer(LT_UnwantedData);
theApp->getJobQueue().addJob(jtLEDGER_DATA, "gotLedgerData",
boost::bind(&LedgerAcquireMaster::gotLedgerData, &theApp->getMasterLedgerAcquire(),
_1, packet_ptr, boost::weak_ptr<Peer>(shared_from_this())));
}
bool Peer::hasLedger(const uint256& hash) const

View File

@@ -99,7 +99,7 @@ protected:
void recvGetAccount(ripple::TMGetAccount& packet);
void recvAccount(ripple::TMAccount& packet);
void recvGetLedger(ripple::TMGetLedger& packet);
void recvLedger(ripple::TMLedgerData& packet);
void recvLedger(const boost::shared_ptr<ripple::TMLedgerData>& packet);
void recvStatus(ripple::TMStatusChange& packet);
void recvPropose(const boost::shared_ptr<ripple::TMProposeSet>& packet);
void recvHaveTxSet(ripple::TMHaveTransactionSet& packet);