mirror of
https://github.com/XRPLF/rippled.git
synced 2025-12-06 17:27:55 +00:00
Performance fixes: decongest master lock
This commit is contained in:
committed by
Vinnie Falco
parent
2906899811
commit
572aae320d
@@ -702,7 +702,12 @@ private:
|
||||
if (ec)
|
||||
{
|
||||
m_journal.info << "ReadBody: " << ec.message ();
|
||||
|
||||
{
|
||||
Application::ScopedLockType lock (getApp ().getMasterLock (), __FILE__, __LINE__);
|
||||
detach ("hrb", true);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -889,7 +894,7 @@ private:
|
||||
|
||||
if (msg.ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes],
|
||||
msgLen))
|
||||
recvGetPeers (msg, lock);
|
||||
recvGetPeers (msg);
|
||||
else
|
||||
m_journal.warning << "parse error: " << type;
|
||||
}
|
||||
@@ -967,7 +972,7 @@ private:
|
||||
|
||||
if (msg.ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes],
|
||||
msgLen))
|
||||
recvTransaction (msg, lock);
|
||||
recvTransaction (msg);
|
||||
else
|
||||
m_journal.warning << "parse error: " << type;
|
||||
}
|
||||
@@ -989,7 +994,8 @@ private:
|
||||
case protocol::mtPROPOSE_LEDGER:
|
||||
{
|
||||
event->reName ("Peer::propose");
|
||||
boost::shared_ptr<protocol::TMProposeSet> msg = boost::make_shared<protocol::TMProposeSet> ();
|
||||
boost::shared_ptr<protocol::TMProposeSet> msg (
|
||||
boost::make_shared<protocol::TMProposeSet> ());
|
||||
|
||||
if (msg->ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes],
|
||||
msgLen))
|
||||
@@ -1002,11 +1008,12 @@ private:
|
||||
case protocol::mtGET_LEDGER:
|
||||
{
|
||||
event->reName ("Peer::getledger");
|
||||
protocol::TMGetLedger msg;
|
||||
boost::shared_ptr<protocol::TMGetLedger> msg (
|
||||
boost::make_shared<protocol::TMGetLedger> ());
|
||||
|
||||
if (msg.ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes],
|
||||
if (msg->ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes],
|
||||
msgLen))
|
||||
recvGetLedger (msg, lock);
|
||||
recvGetLedger (msg);
|
||||
else
|
||||
m_journal.warning << "parse error: " << type;
|
||||
}
|
||||
@@ -1015,11 +1022,12 @@ private:
|
||||
case protocol::mtLEDGER_DATA:
|
||||
{
|
||||
event->reName ("Peer::ledgerdata");
|
||||
boost::shared_ptr<protocol::TMLedgerData> msg = boost::make_shared<protocol::TMLedgerData> ();
|
||||
boost::shared_ptr<protocol::TMLedgerData> msg (
|
||||
boost::make_shared<protocol::TMLedgerData> ());
|
||||
|
||||
if (msg->ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes],
|
||||
msgLen))
|
||||
recvLedger (msg, lock);
|
||||
recvLedger (msg);
|
||||
else
|
||||
m_journal.warning << "parse error: " << type;
|
||||
}
|
||||
@@ -1041,11 +1049,12 @@ private:
|
||||
case protocol::mtVALIDATION:
|
||||
{
|
||||
event->reName ("Peer::validation");
|
||||
boost::shared_ptr<protocol::TMValidation> msg = boost::make_shared<protocol::TMValidation> ();
|
||||
boost::shared_ptr<protocol::TMValidation> msg (
|
||||
boost::make_shared<protocol::TMValidation> ());
|
||||
|
||||
if (msg->ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes],
|
||||
msgLen))
|
||||
recvValidation (msg, lock);
|
||||
recvValidation (msg);
|
||||
else
|
||||
m_journal.warning << "parse error: " << type;
|
||||
}
|
||||
@@ -1073,7 +1082,7 @@ private:
|
||||
|
||||
if (msg->ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes],
|
||||
msgLen))
|
||||
recvGetObjectByHash (msg, lock);
|
||||
recvGetObjectByHash (msg);
|
||||
else
|
||||
m_journal.warning << "parse error: " << type;
|
||||
}
|
||||
@@ -1451,9 +1460,8 @@ private:
|
||||
}
|
||||
|
||||
|
||||
void recvTransaction (protocol::TMTransaction& packet, Application::ScopedLockType& masterLockHolder)
|
||||
void recvTransaction (protocol::TMTransaction& packet)
|
||||
{
|
||||
masterLockHolder.unlock ();
|
||||
Transaction::pointer tx;
|
||||
|
||||
#ifndef TRUST_NETWORK
|
||||
@@ -1511,10 +1519,9 @@ private:
|
||||
#endif
|
||||
}
|
||||
|
||||
void recvValidation (const boost::shared_ptr<protocol::TMValidation>& packet, Application::ScopedLockType& masterLockHolder)
|
||||
void recvValidation (const boost::shared_ptr<protocol::TMValidation>& packet)
|
||||
{
|
||||
uint32 closeTime = getApp().getOPs().getCloseTimeNC();
|
||||
masterLockHolder.unlock ();
|
||||
|
||||
if (packet->validation ().size () < 50)
|
||||
{
|
||||
@@ -1584,10 +1591,8 @@ private:
|
||||
|
||||
// Return a list of your favorite people
|
||||
// TODO: filter out all the LAN peers
|
||||
void recvGetPeers (protocol::TMGetPeers& packet, Application::ScopedLockType& masterLockHolder)
|
||||
void recvGetPeers (protocol::TMGetPeers& packet)
|
||||
{
|
||||
masterLockHolder.unlock ();
|
||||
|
||||
#if 0
|
||||
protocol::TMPeers peers;
|
||||
|
||||
@@ -1663,11 +1668,8 @@ private:
|
||||
m_peerFinder.onPeerEndpoints (m_remoteAddress, endpoints);
|
||||
}
|
||||
|
||||
void recvGetObjectByHash (const boost::shared_ptr<protocol::TMGetObjectByHash>& ptr,
|
||||
Application::ScopedLockType& masterLockHolder)
|
||||
void recvGetObjectByHash (const boost::shared_ptr<protocol::TMGetObjectByHash>& ptr)
|
||||
{
|
||||
masterLockHolder.unlock();
|
||||
|
||||
protocol::TMGetObjectByHash& packet = *ptr;
|
||||
|
||||
if (packet.query ())
|
||||
@@ -1804,303 +1806,19 @@ private:
|
||||
{
|
||||
}
|
||||
|
||||
void recvGetLedger (protocol::TMGetLedger& packet,
|
||||
Application::ScopedLockType& masterLockHolder)
|
||||
void recvGetLedger (boost::shared_ptr<protocol::TMGetLedger> const& packet)
|
||||
{
|
||||
SHAMap::pointer map;
|
||||
protocol::TMLedgerData reply;
|
||||
bool fatLeaves = true, fatRoot = false;
|
||||
|
||||
if (packet.has_requestcookie ())
|
||||
reply.set_requestcookie (packet.requestcookie ());
|
||||
|
||||
std::string logMe;
|
||||
|
||||
if (packet.itype () == protocol::liTS_CANDIDATE)
|
||||
{
|
||||
// Request is for a transaction candidate set
|
||||
m_journal.trace << "Received request for TX candidate set data " <<
|
||||
to_string (this);
|
||||
|
||||
if ((!packet.has_ledgerhash () || packet.ledgerhash ().size () != 32))
|
||||
{
|
||||
charge (Resource::feeInvalidRequest);
|
||||
m_journal.warning << "invalid request for TX candidate set data";
|
||||
return;
|
||||
}
|
||||
|
||||
uint256 txHash;
|
||||
memcpy (txHash.begin (), packet.ledgerhash ().data (), 32);
|
||||
map = getApp().getOPs ().getTXMap (txHash);
|
||||
masterLockHolder.unlock();
|
||||
|
||||
if (!map)
|
||||
{
|
||||
if (packet.has_querytype () && !packet.has_requestcookie ())
|
||||
{
|
||||
m_journal.debug << "Trying to route TX set request";
|
||||
|
||||
Peers::PeerSequence usablePeers (m_peers.foreach (
|
||||
get_usable_peers (txHash, this)));
|
||||
|
||||
if (usablePeers.empty ())
|
||||
{
|
||||
m_journal.info << "Unable to route TX set request";
|
||||
return;
|
||||
}
|
||||
|
||||
Peer::ref selectedPeer = usablePeers[rand () % usablePeers.size ()];
|
||||
packet.set_requestcookie (getShortId ());
|
||||
selectedPeer->sendPacket (
|
||||
boost::make_shared<PackedMessage> (packet, protocol::mtGET_LEDGER),
|
||||
false);
|
||||
return;
|
||||
}
|
||||
|
||||
m_journal.error << "We do not have the map our peer wants " <<
|
||||
to_string (this);
|
||||
charge (Resource::feeInvalidRequest);
|
||||
return;
|
||||
}
|
||||
|
||||
reply.set_ledgerseq (0);
|
||||
reply.set_ledgerhash (txHash.begin (), txHash.size ());
|
||||
reply.set_type (protocol::liTS_CANDIDATE);
|
||||
fatLeaves = false; // We'll already have most transactions
|
||||
fatRoot = true; // Save a pass
|
||||
}
|
||||
else
|
||||
{
|
||||
masterLockHolder.unlock ();
|
||||
|
||||
if (getApp().getFeeTrack().isLoadedLocal() && !m_clusterNode)
|
||||
{
|
||||
m_journal.debug << "Too busy to fetch ledger data";
|
||||
return;
|
||||
}
|
||||
|
||||
// Figure out what ledger they want
|
||||
m_journal.trace << "Received request for ledger data " <<
|
||||
to_string (this);
|
||||
Ledger::pointer ledger;
|
||||
|
||||
if (packet.has_ledgerhash ())
|
||||
{
|
||||
uint256 ledgerhash;
|
||||
|
||||
if (packet.ledgerhash ().size () != 32)
|
||||
{
|
||||
charge (Resource::feeInvalidRequest);
|
||||
m_journal.warning << "Invalid request";
|
||||
return;
|
||||
}
|
||||
|
||||
memcpy (ledgerhash.begin (), packet.ledgerhash ().data (), 32);
|
||||
logMe += "LedgerHash:";
|
||||
logMe += ledgerhash.GetHex ();
|
||||
ledger = getApp().getLedgerMaster ().getLedgerByHash (ledgerhash);
|
||||
|
||||
CondLog (!ledger, lsTRACE, Peer) << "Don't have ledger " << ledgerhash;
|
||||
|
||||
if (!ledger && (packet.has_querytype () && !packet.has_requestcookie ()))
|
||||
{
|
||||
uint32 seq = 0;
|
||||
|
||||
if (packet.has_ledgerseq ())
|
||||
seq = packet.ledgerseq ();
|
||||
|
||||
Peers::PeerSequence peerList = m_peers.getActivePeers ();
|
||||
Peers::PeerSequence usablePeers;
|
||||
BOOST_FOREACH (Peer::ref peer, peerList)
|
||||
{
|
||||
if (peer->hasLedger (ledgerhash, seq) && (peer.get () != this))
|
||||
usablePeers.push_back (peer);
|
||||
}
|
||||
|
||||
if (usablePeers.empty ())
|
||||
{
|
||||
m_journal.trace << "Unable to route ledger request";
|
||||
return;
|
||||
}
|
||||
|
||||
Peer::ref selectedPeer = usablePeers[rand () % usablePeers.size ()];
|
||||
packet.set_requestcookie (getShortId ());
|
||||
selectedPeer->sendPacket (boost::make_shared<PackedMessage> (packet, protocol::mtGET_LEDGER), false);
|
||||
m_journal.debug << "Ledger request routed";
|
||||
return;
|
||||
}
|
||||
}
|
||||
else if (packet.has_ledgerseq ())
|
||||
{
|
||||
ledger = getApp().getLedgerMaster ().getLedgerBySeq (packet.ledgerseq ());
|
||||
CondLog (!ledger, lsDEBUG, Peer) << "Don't have ledger " << packet.ledgerseq ();
|
||||
}
|
||||
else if (packet.has_ltype () && (packet.ltype () == protocol::ltCURRENT))
|
||||
ledger = getApp().getLedgerMaster ().getCurrentLedger ();
|
||||
else if (packet.has_ltype () && (packet.ltype () == protocol::ltCLOSED) )
|
||||
{
|
||||
ledger = getApp().getLedgerMaster ().getClosedLedger ();
|
||||
|
||||
if (ledger && !ledger->isClosed ())
|
||||
ledger = getApp().getLedgerMaster ().getLedgerBySeq (ledger->getLedgerSeq () - 1);
|
||||
}
|
||||
else
|
||||
{
|
||||
charge (Resource::feeInvalidRequest);
|
||||
m_journal.warning << "Can't figure out what ledger they want";
|
||||
return;
|
||||
}
|
||||
|
||||
if ((!ledger) || (packet.has_ledgerseq () && (packet.ledgerseq () != ledger->getLedgerSeq ())))
|
||||
{
|
||||
charge (Resource::feeInvalidRequest);
|
||||
|
||||
if (ShouldLog (lsWARNING, Peer))
|
||||
{
|
||||
if (ledger)
|
||||
Log (lsWARNING) << "Ledger has wrong sequence";
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
// Fill out the reply
|
||||
uint256 lHash = ledger->getHash ();
|
||||
reply.set_ledgerhash (lHash.begin (), lHash.size ());
|
||||
reply.set_ledgerseq (ledger->getLedgerSeq ());
|
||||
reply.set_type (packet.itype ());
|
||||
|
||||
if (packet.itype () == protocol::liBASE)
|
||||
{
|
||||
// they want the ledger base data
|
||||
m_journal.trace << "They want ledger base data";
|
||||
Serializer nData (128);
|
||||
ledger->addRaw (nData);
|
||||
reply.add_nodes ()->set_nodedata (nData.getDataPtr (), nData.getLength ());
|
||||
|
||||
SHAMap::pointer map = ledger->peekAccountStateMap ();
|
||||
|
||||
if (map && map->getHash ().isNonZero ())
|
||||
{
|
||||
// return account state root node if possible
|
||||
Serializer rootNode (768);
|
||||
|
||||
if (map->getRootNode (rootNode, snfWIRE))
|
||||
{
|
||||
reply.add_nodes ()->set_nodedata (rootNode.getDataPtr (), rootNode.getLength ());
|
||||
|
||||
if (ledger->getTransHash ().isNonZero ())
|
||||
{
|
||||
map = ledger->peekTransactionMap ();
|
||||
|
||||
if (map && map->getHash ().isNonZero ())
|
||||
{
|
||||
rootNode.erase ();
|
||||
|
||||
if (map->getRootNode (rootNode, snfWIRE))
|
||||
reply.add_nodes ()->set_nodedata (rootNode.getDataPtr (), rootNode.getLength ());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
PackedMessage::pointer oPacket = boost::make_shared<PackedMessage> (reply, protocol::mtLEDGER_DATA);
|
||||
sendPacket (oPacket, true);
|
||||
return;
|
||||
}
|
||||
|
||||
if (packet.itype () == protocol::liTX_NODE)
|
||||
{
|
||||
map = ledger->peekTransactionMap ();
|
||||
logMe += " TX:";
|
||||
logMe += map->getHash ().GetHex ();
|
||||
}
|
||||
else if (packet.itype () == protocol::liAS_NODE)
|
||||
{
|
||||
map = ledger->peekAccountStateMap ();
|
||||
logMe += " AS:";
|
||||
logMe += map->getHash ().GetHex ();
|
||||
}
|
||||
}
|
||||
|
||||
if ((!map) || (packet.nodeids_size () == 0))
|
||||
{
|
||||
m_journal.warning << "Can't find map or empty request";
|
||||
charge (Resource::feeInvalidRequest);
|
||||
return;
|
||||
}
|
||||
|
||||
m_journal.trace << "Request: " << logMe;
|
||||
|
||||
for (int i = 0; i < packet.nodeids ().size (); ++i)
|
||||
{
|
||||
SHAMapNode mn (packet.nodeids (i).data (), packet.nodeids (i).size ());
|
||||
|
||||
if (!mn.isValid ())
|
||||
{
|
||||
m_journal.warning << "Request for invalid node: " << logMe;
|
||||
charge (Resource::feeInvalidRequest);
|
||||
return;
|
||||
}
|
||||
|
||||
std::vector<SHAMapNode> nodeIDs;
|
||||
std::list< Blob > rawNodes;
|
||||
|
||||
try
|
||||
{
|
||||
if (map->getNodeFat (mn, nodeIDs, rawNodes, fatRoot, fatLeaves))
|
||||
{
|
||||
assert (nodeIDs.size () == rawNodes.size ());
|
||||
m_journal.trace << "getNodeFat got " << rawNodes.size () << " nodes";
|
||||
std::vector<SHAMapNode>::iterator nodeIDIterator;
|
||||
std::list< Blob >::iterator rawNodeIterator;
|
||||
|
||||
for (nodeIDIterator = nodeIDs.begin (), rawNodeIterator = rawNodes.begin ();
|
||||
nodeIDIterator != nodeIDs.end (); ++nodeIDIterator, ++rawNodeIterator)
|
||||
{
|
||||
Serializer nID (33);
|
||||
nodeIDIterator->addIDRaw (nID);
|
||||
protocol::TMLedgerNode* node = reply.add_nodes ();
|
||||
node->set_nodeid (nID.getDataPtr (), nID.getLength ());
|
||||
node->set_nodedata (&rawNodeIterator->front (), rawNodeIterator->size ());
|
||||
}
|
||||
}
|
||||
else
|
||||
m_journal.warning << "getNodeFat returns false";
|
||||
}
|
||||
catch (std::exception&)
|
||||
{
|
||||
std::string info;
|
||||
|
||||
if (packet.itype () == protocol::liTS_CANDIDATE)
|
||||
info = "TS candidate";
|
||||
else if (packet.itype () == protocol::liBASE)
|
||||
info = "Ledger base";
|
||||
else if (packet.itype () == protocol::liTX_NODE)
|
||||
info = "TX node";
|
||||
else if (packet.itype () == protocol::liAS_NODE)
|
||||
info = "AS node";
|
||||
|
||||
if (!packet.has_ledgerhash ())
|
||||
info += ", no hash specified";
|
||||
|
||||
m_journal.warning << "getNodeFat( " << mn << ") throws exception: " << info;
|
||||
}
|
||||
}
|
||||
|
||||
PackedMessage::pointer oPacket = boost::make_shared<PackedMessage> (reply, protocol::mtLEDGER_DATA);
|
||||
sendPacket (oPacket, true);
|
||||
getApp().getJobQueue().addJob (jtPACK, "recvGetLedger",
|
||||
std::bind (&sGetLedger, boost::weak_ptr<Peer> (shared_from_this ()), packet));
|
||||
}
|
||||
|
||||
void recvLedger (const boost::shared_ptr<protocol::TMLedgerData>& packet_ptr, Application::ScopedLockType& masterLockHolder)
|
||||
void recvLedger (boost::shared_ptr<protocol::TMLedgerData> const& packet_ptr)
|
||||
{
|
||||
masterLockHolder.unlock ();
|
||||
protocol::TMLedgerData& packet = *packet_ptr;
|
||||
|
||||
if (packet.nodes ().size () <= 0)
|
||||
{
|
||||
m_journal.warning << "Ledger/TXset data with no nodes";
|
||||
charge (Resource::feeInvalidRequest);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -2297,7 +2015,13 @@ private:
|
||||
m_journal.trace << "Received " << (isTrusted ? "trusted" : "UNTRUSTED") <<
|
||||
" proposal from " << m_shortId;
|
||||
|
||||
uint256 consensusLCL = getApp().getOPs ().getConsensusLCL ();
|
||||
uint256 consensusLCL;
|
||||
|
||||
{
|
||||
Application::ScopedLockType lock (getApp ().getMasterLock (), __FILE__, __LINE__);
|
||||
consensusLCL = getApp().getOPs ().getConsensusLCL ();
|
||||
}
|
||||
|
||||
LedgerProposal::pointer proposal = boost::make_shared<LedgerProposal> (
|
||||
prevLedger.isNonZero () ? prevLedger : consensusLCL,
|
||||
set.proposeseq (), proposeHash, set.closetime (), signerPublic, suppression);
|
||||
@@ -2328,9 +2052,11 @@ private:
|
||||
if (packet.status () == protocol::tsHAVE)
|
||||
addTxSet (hash);
|
||||
|
||||
if (!getApp().getOPs ().hasTXSet (shared_from_this (), hash, packet.status ()))
|
||||
{
|
||||
charge (Resource::feeUnwantedData);
|
||||
Application::ScopedLockType lock (getApp ().getMasterLock (), __FILE__, __LINE__);
|
||||
|
||||
if (!getApp().getOPs ().hasTXSet (shared_from_this (), hash, packet.status ()))
|
||||
charge (Resource::feeUnwantedData);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2422,15 +2148,315 @@ private:
|
||||
|
||||
m_recentLedgers.push_back (hash);
|
||||
}
|
||||
|
||||
void getLedger (protocol::TMGetLedger& packet)
|
||||
{
|
||||
SHAMap::pointer map;
|
||||
protocol::TMLedgerData reply;
|
||||
bool fatLeaves = true, fatRoot = false;
|
||||
|
||||
if (packet.has_requestcookie ())
|
||||
reply.set_requestcookie (packet.requestcookie ());
|
||||
|
||||
std::string logMe;
|
||||
|
||||
if (packet.itype () == protocol::liTS_CANDIDATE)
|
||||
{
|
||||
// Request is for a transaction candidate set
|
||||
m_journal.trace << "Received request for TX candidate set data "
|
||||
<< to_string (this);
|
||||
|
||||
if ((!packet.has_ledgerhash () || packet.ledgerhash ().size () != 32))
|
||||
{
|
||||
charge (Resource::feeInvalidRequest);
|
||||
m_journal.warning << "invalid request for TX candidate set data";
|
||||
return;
|
||||
}
|
||||
|
||||
uint256 txHash;
|
||||
memcpy (txHash.begin (), packet.ledgerhash ().data (), 32);
|
||||
|
||||
{
|
||||
Application::ScopedLockType lock (getApp ().getMasterLock (), __FILE__, __LINE__);
|
||||
map = getApp().getOPs ().getTXMap (txHash);
|
||||
}
|
||||
|
||||
if (!map)
|
||||
{
|
||||
if (packet.has_querytype () && !packet.has_requestcookie ())
|
||||
{
|
||||
m_journal.debug << "Trying to route TX set request";
|
||||
|
||||
Peers::PeerSequence usablePeers (m_peers.foreach (
|
||||
get_usable_peers (txHash, this)));
|
||||
|
||||
if (usablePeers.empty ())
|
||||
{
|
||||
m_journal.info << "Unable to route TX set request";
|
||||
return;
|
||||
}
|
||||
|
||||
Peer::ref selectedPeer = usablePeers[rand () % usablePeers.size ()];
|
||||
packet.set_requestcookie (getShortId ());
|
||||
selectedPeer->sendPacket (
|
||||
boost::make_shared<PackedMessage> (packet, protocol::mtGET_LEDGER),
|
||||
false);
|
||||
return;
|
||||
}
|
||||
|
||||
m_journal.error << "We do not have the map our peer wants "
|
||||
<< to_string (this);
|
||||
|
||||
charge (Resource::feeInvalidRequest);
|
||||
return;
|
||||
}
|
||||
|
||||
reply.set_ledgerseq (0);
|
||||
reply.set_ledgerhash (txHash.begin (), txHash.size ());
|
||||
reply.set_type (protocol::liTS_CANDIDATE);
|
||||
fatLeaves = false; // We'll already have most transactions
|
||||
fatRoot = true; // Save a pass
|
||||
}
|
||||
else
|
||||
{
|
||||
if (getApp().getFeeTrack().isLoadedLocal() && !m_clusterNode)
|
||||
{
|
||||
m_journal.debug << "Too busy to fetch ledger data";
|
||||
return;
|
||||
}
|
||||
|
||||
// Figure out what ledger they want
|
||||
m_journal.trace << "Received request for ledger data "
|
||||
<< to_string (this);
|
||||
Ledger::pointer ledger;
|
||||
|
||||
if (packet.has_ledgerhash ())
|
||||
{
|
||||
uint256 ledgerhash;
|
||||
|
||||
if (packet.ledgerhash ().size () != 32)
|
||||
{
|
||||
charge (Resource::feeInvalidRequest);
|
||||
m_journal.warning << "Invalid request";
|
||||
return;
|
||||
}
|
||||
|
||||
memcpy (ledgerhash.begin (), packet.ledgerhash ().data (), 32);
|
||||
logMe += "LedgerHash:";
|
||||
logMe += ledgerhash.GetHex ();
|
||||
ledger = getApp().getLedgerMaster ().getLedgerByHash (ledgerhash);
|
||||
|
||||
if (m_journal.trace)
|
||||
m_journal.trace << "Don't have ledger " << ledgerhash;
|
||||
|
||||
if (!ledger && (packet.has_querytype () && !packet.has_requestcookie ()))
|
||||
{
|
||||
uint32 seq = 0;
|
||||
|
||||
if (packet.has_ledgerseq ())
|
||||
seq = packet.ledgerseq ();
|
||||
|
||||
Peers::PeerSequence peerList = m_peers.getActivePeers ();
|
||||
Peers::PeerSequence usablePeers;
|
||||
BOOST_FOREACH (Peer::ref peer, peerList)
|
||||
{
|
||||
if (peer->hasLedger (ledgerhash, seq) && (peer.get () != this))
|
||||
usablePeers.push_back (peer);
|
||||
}
|
||||
|
||||
if (usablePeers.empty ())
|
||||
{
|
||||
m_journal.trace << "Unable to route ledger request";
|
||||
return;
|
||||
}
|
||||
|
||||
Peer::ref selectedPeer = usablePeers[rand () % usablePeers.size ()];
|
||||
packet.set_requestcookie (getShortId ());
|
||||
selectedPeer->sendPacket (
|
||||
boost::make_shared<PackedMessage> (packet, protocol::mtGET_LEDGER), false);
|
||||
m_journal.debug << "Ledger request routed";
|
||||
return;
|
||||
}
|
||||
}
|
||||
else if (packet.has_ledgerseq ())
|
||||
{
|
||||
ledger = getApp().getLedgerMaster ().getLedgerBySeq (packet.ledgerseq ());
|
||||
if (m_journal.debug)
|
||||
m_journal.debug << "Don't have ledger " << packet.ledgerseq ();
|
||||
}
|
||||
else if (packet.has_ltype () && (packet.ltype () == protocol::ltCURRENT))
|
||||
{
|
||||
ledger = getApp().getLedgerMaster ().getCurrentLedger ();
|
||||
}
|
||||
else if (packet.has_ltype () && (packet.ltype () == protocol::ltCLOSED) )
|
||||
{
|
||||
ledger = getApp().getLedgerMaster ().getClosedLedger ();
|
||||
|
||||
if (ledger && !ledger->isClosed ())
|
||||
ledger = getApp().getLedgerMaster ().getLedgerBySeq (ledger->getLedgerSeq () - 1);
|
||||
}
|
||||
else
|
||||
{
|
||||
charge (Resource::feeInvalidRequest);
|
||||
m_journal.warning << "Can't figure out what ledger they want";
|
||||
return;
|
||||
}
|
||||
|
||||
if ((!ledger) || (packet.has_ledgerseq () && (packet.ledgerseq () != ledger->getLedgerSeq ())))
|
||||
{
|
||||
charge (Resource::feeInvalidRequest);
|
||||
|
||||
if (m_journal.warning && ledger)
|
||||
m_journal.warning << "Ledger has wrong sequence";
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
// Fill out the reply
|
||||
uint256 lHash = ledger->getHash ();
|
||||
reply.set_ledgerhash (lHash.begin (), lHash.size ());
|
||||
reply.set_ledgerseq (ledger->getLedgerSeq ());
|
||||
reply.set_type (packet.itype ());
|
||||
|
||||
if (packet.itype () == protocol::liBASE)
|
||||
{
|
||||
// they want the ledger base data
|
||||
m_journal.trace << "They want ledger base data";
|
||||
Serializer nData (128);
|
||||
ledger->addRaw (nData);
|
||||
reply.add_nodes ()->set_nodedata (nData.getDataPtr (), nData.getLength ());
|
||||
|
||||
SHAMap::pointer map = ledger->peekAccountStateMap ();
|
||||
|
||||
if (map && map->getHash ().isNonZero ())
|
||||
{
|
||||
// return account state root node if possible
|
||||
Serializer rootNode (768);
|
||||
|
||||
if (map->getRootNode (rootNode, snfWIRE))
|
||||
{
|
||||
reply.add_nodes ()->set_nodedata (rootNode.getDataPtr (), rootNode.getLength ());
|
||||
|
||||
if (ledger->getTransHash ().isNonZero ())
|
||||
{
|
||||
map = ledger->peekTransactionMap ();
|
||||
|
||||
if (map && map->getHash ().isNonZero ())
|
||||
{
|
||||
rootNode.erase ();
|
||||
|
||||
if (map->getRootNode (rootNode, snfWIRE))
|
||||
reply.add_nodes ()->set_nodedata (rootNode.getDataPtr (), rootNode.getLength ());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
PackedMessage::pointer oPacket = boost::make_shared<PackedMessage> (reply, protocol::mtLEDGER_DATA);
|
||||
sendPacket (oPacket, false);
|
||||
return;
|
||||
}
|
||||
|
||||
if (packet.itype () == protocol::liTX_NODE)
|
||||
{
|
||||
map = ledger->peekTransactionMap ();
|
||||
logMe += " TX:";
|
||||
logMe += map->getHash ().GetHex ();
|
||||
}
|
||||
else if (packet.itype () == protocol::liAS_NODE)
|
||||
{
|
||||
map = ledger->peekAccountStateMap ();
|
||||
logMe += " AS:";
|
||||
logMe += map->getHash ().GetHex ();
|
||||
}
|
||||
}
|
||||
|
||||
if (!map || (packet.nodeids_size () == 0))
|
||||
{
|
||||
m_journal.warning << "Can't find map or empty request";
|
||||
charge (Resource::feeInvalidRequest);
|
||||
return;
|
||||
}
|
||||
|
||||
m_journal.trace << "Request: " << logMe;
|
||||
|
||||
for (int i = 0; i < packet.nodeids ().size (); ++i)
|
||||
{
|
||||
SHAMapNode mn (packet.nodeids (i).data (), packet.nodeids (i).size ());
|
||||
|
||||
if (!mn.isValid ())
|
||||
{
|
||||
m_journal.warning << "Request for invalid node: " << logMe;
|
||||
charge (Resource::feeInvalidRequest);
|
||||
return;
|
||||
}
|
||||
|
||||
std::vector<SHAMapNode> nodeIDs;
|
||||
std::list< Blob > rawNodes;
|
||||
|
||||
try
|
||||
{
|
||||
if (map->getNodeFat (mn, nodeIDs, rawNodes, fatRoot, fatLeaves))
|
||||
{
|
||||
assert (nodeIDs.size () == rawNodes.size ());
|
||||
m_journal.trace << "getNodeFat got " << rawNodes.size () << " nodes";
|
||||
std::vector<SHAMapNode>::iterator nodeIDIterator;
|
||||
std::list< Blob >::iterator rawNodeIterator;
|
||||
|
||||
for (nodeIDIterator = nodeIDs.begin (), rawNodeIterator = rawNodes.begin ();
|
||||
nodeIDIterator != nodeIDs.end (); ++nodeIDIterator, ++rawNodeIterator)
|
||||
{
|
||||
Serializer nID (33);
|
||||
nodeIDIterator->addIDRaw (nID);
|
||||
protocol::TMLedgerNode* node = reply.add_nodes ();
|
||||
node->set_nodeid (nID.getDataPtr (), nID.getLength ());
|
||||
node->set_nodedata (&rawNodeIterator->front (), rawNodeIterator->size ());
|
||||
}
|
||||
}
|
||||
else
|
||||
m_journal.warning << "getNodeFat returns false";
|
||||
}
|
||||
catch (std::exception&)
|
||||
{
|
||||
std::string info;
|
||||
|
||||
if (packet.itype () == protocol::liTS_CANDIDATE)
|
||||
info = "TS candidate";
|
||||
else if (packet.itype () == protocol::liBASE)
|
||||
info = "Ledger base";
|
||||
else if (packet.itype () == protocol::liTX_NODE)
|
||||
info = "TX node";
|
||||
else if (packet.itype () == protocol::liAS_NODE)
|
||||
info = "AS node";
|
||||
|
||||
if (!packet.has_ledgerhash ())
|
||||
info += ", no hash specified";
|
||||
|
||||
m_journal.warning << "getNodeFat( " << mn << ") throws exception: " << info;
|
||||
}
|
||||
}
|
||||
|
||||
PackedMessage::pointer oPacket = boost::make_shared<PackedMessage> (reply, protocol::mtLEDGER_DATA);
|
||||
sendPacket (oPacket, false);
|
||||
}
|
||||
|
||||
// This is dispatched by the job queue
|
||||
static void sGetLedger (boost::weak_ptr<Peer> wPeer,
|
||||
boost::shared_ptr<protocol::TMGetLedger> packet)
|
||||
{
|
||||
boost::shared_ptr<Peer> peer = wPeer.lock ();
|
||||
|
||||
if (peer)
|
||||
peer->getLedger (*packet);
|
||||
}
|
||||
|
||||
void addTxSet (uint256 const& hash)
|
||||
{
|
||||
boost::mutex::scoped_lock sl(m_recentLock);
|
||||
BOOST_FOREACH (uint256 const & set, m_recentTxSets)
|
||||
|
||||
if (set == hash)
|
||||
return;
|
||||
|
||||
if(std::find (m_recentTxSets.begin (), m_recentTxSets.end (), hash) != m_recentTxSets.end ())
|
||||
return;
|
||||
|
||||
if (m_recentTxSets.size () == 128)
|
||||
m_recentTxSets.pop_front ();
|
||||
|
||||
|
||||
@@ -132,6 +132,8 @@ public:
|
||||
|
||||
virtual bool hasLedger (uint256 const& hash, uint32 seq) const = 0;
|
||||
|
||||
virtual void getLedger (protocol::TMGetLedger &) = 0;
|
||||
|
||||
virtual void ledgerRange (uint32& minSeq, uint32& maxSeq) const = 0;
|
||||
|
||||
virtual bool hasTxSet (uint256 const& hash) const = 0;
|
||||
|
||||
@@ -299,9 +299,10 @@ public:
|
||||
tme.mutable_ipv4()->set_ipv4port (ep.address.port());
|
||||
|
||||
tme.set_hops (ep.hops);
|
||||
tme.set_version (1);
|
||||
}
|
||||
|
||||
tm.set_version (1);
|
||||
|
||||
PackedMessage::pointer msg (
|
||||
boost::make_shared <PackedMessage> (
|
||||
tm, protocol::mtENDPOINTS));
|
||||
|
||||
@@ -43,7 +43,7 @@ void parseAddresses (OutputSequence& out, InputIterator first, InputIterator las
|
||||
{
|
||||
while (first != last)
|
||||
{
|
||||
auto str (*first);
|
||||
auto const str (*first);
|
||||
++first;
|
||||
{
|
||||
IPAddress const addr (IPAddress::from_string (str));
|
||||
@@ -684,11 +684,11 @@ Config::Role Config::getAdminRole (Json::Value const& params, beast::IPAddress c
|
||||
{
|
||||
Config::Role role (Config::FORBID);
|
||||
|
||||
bool bPasswordSupplied =
|
||||
bool const bPasswordSupplied =
|
||||
params.isMember ("admin_user") ||
|
||||
params.isMember ("admin_password");
|
||||
|
||||
bool bPasswordRequired =
|
||||
bool const bPasswordRequired =
|
||||
! this->RPC_ADMIN_USER.empty () ||
|
||||
! this->RPC_ADMIN_PASSWORD.empty ();
|
||||
|
||||
@@ -722,7 +722,7 @@ Config::Role Config::getAdminRole (Json::Value const& params, beast::IPAddress c
|
||||
IPAddress const remote_addr (remoteIp.at_port (0));
|
||||
bool bAdminIP = false;
|
||||
|
||||
for (auto& allow_addr : RPC_ADMIN_ALLOW)
|
||||
for (auto const& allow_addr : RPC_ADMIN_ALLOW)
|
||||
{
|
||||
if (allow_addr == remote_addr)
|
||||
{
|
||||
|
||||
@@ -14,7 +14,10 @@ enum MessageType
|
||||
mtCONTACT = 11;
|
||||
mtGET_PEERS = 12;
|
||||
mtPEERS = 13;
|
||||
mtENDPOINTS = 14;
|
||||
// This field was previous used for PeerFinder ENDPOINTS messages. Since the
|
||||
// structure's required fields changed, the message number was changed.
|
||||
mtUNUSED_FIELD = 14;
|
||||
mtENDPOINTS = 15;
|
||||
|
||||
// operations for 'small' nodes
|
||||
mtSEARCH_TRANSACTION = 20;
|
||||
@@ -226,7 +229,7 @@ message TMGetPeers
|
||||
required uint32 doWeNeedThis = 1; // yes since you are asserting that the packet size isn't 0 in PackedMessage
|
||||
}
|
||||
|
||||
message TMIPv4EndPoint
|
||||
message TMIPv4Endpoint
|
||||
{
|
||||
required uint32 ipv4 = 1;
|
||||
|
||||
@@ -238,24 +241,24 @@ message TMIPv4EndPoint
|
||||
|
||||
message TMPeers
|
||||
{
|
||||
repeated TMIPv4EndPoint nodes = 1;
|
||||
repeated TMIPv4Endpoint nodes = 1;
|
||||
}
|
||||
|
||||
// An Endpoint describes a network peer that can accept incoming connections
|
||||
message TMEndpoint
|
||||
{
|
||||
required TMIPv4EndPoint ipv4 = 1;
|
||||
required TMIPv4Endpoint ipv4 = 1;
|
||||
required uint32 hops = 2;
|
||||
|
||||
// This field is used to allow the TMEndpoint message format to be modified
|
||||
// as necessary in the future.
|
||||
required uint32 version = 3;
|
||||
}
|
||||
|
||||
// An array of Endpoint messages
|
||||
message TMEndpoints
|
||||
{
|
||||
repeated TMEndpoint endpoints = 1;
|
||||
// This field is used to allow the TMEndpoints message format to be
|
||||
// modified as necessary in the future.
|
||||
required uint32 version = 1;
|
||||
|
||||
repeated TMEndpoint endpoints = 2;
|
||||
};
|
||||
|
||||
message TMSearchTransaction
|
||||
|
||||
Reference in New Issue
Block a user