Overlay improvements and bug fixes:

PeerImp::detach had a default argument graceful=true which did not
correctly close the socket and cause the Overlay to often hang on exit.
The logging for Overlay and Peers has been reworked. All the socket activity
is logged to Peers while protocol activity goes to Protocol. Every log line
is prefixed by a small integer ID unique to the connection.
* Removed graceful PeerImp::detach option
* Peer and Protocol log message handle respective types of logging
* Log messages prefixed with peer unique integer
* Prevent call to timer ancel from throwing an exception
This commit is contained in:
Vinnie Falco
2014-11-08 06:30:33 -08:00
parent f6985586ea
commit e442a2846d
2 changed files with 226 additions and 227 deletions

View File

@@ -38,7 +38,10 @@ PeerImp::PeerImp (std::unique_ptr<beast::asio::ssl_bundle>&& ssl_bundle,
OverlayImpl& overlay, Resource::Manager& resourceManager, OverlayImpl& overlay, Resource::Manager& resourceManager,
PeerFinder::Manager& peerFinder, PeerFinder::Slot::ptr const& slot) PeerFinder::Manager& peerFinder, PeerFinder::Slot::ptr const& slot)
: Child (overlay) : Child (overlay)
, journal_ (deprecatedLogs().journal("Peer")) , sink_(deprecatedLogs().journal("Peer"))
, p_sink_(deprecatedLogs().journal("Protocol"))
, journal_ (sink_)
, p_journal_(p_sink_)
, ssl_bundle_(std::move(ssl_bundle)) , ssl_bundle_(std::move(ssl_bundle))
, socket_ (ssl_bundle_->socket) , socket_ (ssl_bundle_->socket)
, stream_ (ssl_bundle_->stream) , stream_ (ssl_bundle_->stream)
@@ -53,6 +56,7 @@ PeerImp::PeerImp (std::unique_ptr<beast::asio::ssl_bundle>&& ssl_bundle,
, slot_ (slot) , slot_ (slot)
, message_stream_(*this) , message_stream_(*this)
{ {
setPrefix();
} }
PeerImp::PeerImp (beast::IP::Endpoint remoteAddress, PeerImp::PeerImp (beast::IP::Endpoint remoteAddress,
@@ -61,7 +65,10 @@ PeerImp::PeerImp (beast::IP::Endpoint remoteAddress,
PeerFinder::Slot::ptr const& slot, PeerFinder::Slot::ptr const& slot,
std::shared_ptr<boost::asio::ssl::context> const& context) std::shared_ptr<boost::asio::ssl::context> const& context)
: Child (overlay) : Child (overlay)
, journal_ (deprecatedLogs().journal("Peer")) , sink_(deprecatedLogs().journal("Peer"))
, p_sink_(deprecatedLogs().journal("Protocol"))
, journal_ (sink_)
, p_journal_(p_sink_)
, ssl_bundle_(std::make_unique<beast::asio::ssl_bundle>( , ssl_bundle_(std::make_unique<beast::asio::ssl_bundle>(
context, io_service)) context, io_service))
, socket_ (ssl_bundle_->socket) , socket_ (ssl_bundle_->socket)
@@ -77,6 +84,7 @@ PeerImp::PeerImp (beast::IP::Endpoint remoteAddress,
, slot_ (slot) , slot_ (slot)
, message_stream_(*this) , message_stream_(*this)
{ {
setPrefix();
} }
PeerImp::~PeerImp () PeerImp::~PeerImp ()
@@ -106,7 +114,7 @@ PeerImp::start()
void void
PeerImp::close() PeerImp::close()
{ {
detach("close", false); detach("close");
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@@ -223,7 +231,7 @@ PeerImp::json()
default: default:
// FIXME: do we really want this? // FIXME: do we really want this?
journal_.warning << p_journal_.warning <<
"Unknown status: " << last_status_.newstatus (); "Unknown status: " << last_status_.newstatus ();
} }
} }
@@ -300,6 +308,17 @@ PeerImp::hasRange (std::uint32_t uMin, std::uint32_t uMax)
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// Puts a string at the beginning of each string logged to the journal
void
PeerImp::setPrefix()
{
static std::atomic<std::size_t> id;
std::stringstream ss;
ss << "[" << std::setfill('0') << std::setw(3) << ++id << "] ";
sink_.prefix(ss.str());
p_sink_.prefix(ss.str());
}
/* Completion handlers for client role. /* Completion handlers for client role.
Logic steps: Logic steps:
1. Establish outgoing connection 1. Establish outgoing connection
@@ -935,8 +954,7 @@ PeerImp::error_code
PeerImp::on_message (std::shared_ptr <protocol::TMHello> const& m) PeerImp::on_message (std::shared_ptr <protocol::TMHello> const& m)
{ {
error_code ec; error_code ec;
timer_.cancel(ec);
timer_.cancel ();
std::uint32_t const ourTime (getApp().getOPs ().getNetworkTimeNC ()); std::uint32_t const ourTime (getApp().getOPs ().getNetworkTimeNC ());
std::uint32_t const minTime (ourTime - clockToleranceDeltaSeconds); std::uint32_t const minTime (ourTime - clockToleranceDeltaSeconds);
@@ -962,14 +980,12 @@ PeerImp::on_message (std::shared_ptr <protocol::TMHello> const& m)
if (m->nettime () > maxTime) if (m->nettime () > maxTime)
{ {
journal_.info << journal_.info <<
"Hello: Clock for " << *this << "Hello: Clock off by +" << m->nettime () - ourTime;
" is off by +" << m->nettime () - ourTime;
} }
else if (m->nettime () < minTime) else if (m->nettime () < minTime)
{ {
journal_.info << journal_.info <<
"Hello: Clock for " << *this << "Hello: Clock off by -" << ourTime - m->nettime ();
" is off by -" << ourTime - m->nettime ();
} }
} }
else if (m->protoversionmin () > to_packed (BuildInfo::getCurrentProtocol())) else if (m->protoversionmin () > to_packed (BuildInfo::getCurrentProtocol()))
@@ -1159,13 +1175,13 @@ PeerImp::on_message (std::shared_ptr <protocol::TMProofWork> const& m)
jtPROOFWORK, jtPROOFWORK,
"recvProof->doProof", "recvProof->doProof",
std::bind (&PeerImp::doProofOfWork, std::placeholders::_1, std::bind (&PeerImp::doProofOfWork, std::placeholders::_1,
std::weak_ptr <Peer> (shared_from_this ()), pow)); std::weak_ptr <PeerImp> (shared_from_this ()), pow));
#endif #endif
return ec; return ec;
} }
journal_.info << "Received in valid proof of work object from peer"; p_journal_.info << "Bad proof of work";
return ec; return ec;
} }
@@ -1322,8 +1338,8 @@ PeerImp::on_message (std::shared_ptr <protocol::TMTransaction> const& m)
return ec; return ec;
} }
journal_.debug << p_journal_.debug <<
"Got transaction from peer " << *this << ": " << txID; "Got tx " << txID;
if (clusterNode_) if (clusterNode_)
{ {
@@ -1343,21 +1359,21 @@ PeerImp::on_message (std::shared_ptr <protocol::TMTransaction> const& m)
} }
if (getApp().getJobQueue().getJobCount(jtTRANSACTION) > 100) if (getApp().getJobQueue().getJobCount(jtTRANSACTION) > 100)
journal_.info << "Transaction queue is full"; p_journal_.info << "Transaction queue is full";
else if (getApp().getLedgerMaster().getValidatedLedgerAge() > 240) else if (getApp().getLedgerMaster().getValidatedLedgerAge() > 240)
journal_.trace << "No new transactions until synchronized"; p_journal_.trace << "No new transactions until synchronized";
else else
getApp().getJobQueue ().addJob (jtTRANSACTION, getApp().getJobQueue ().addJob (jtTRANSACTION,
"recvTransaction->checkTransaction", "recvTransaction->checkTransaction",
std::bind ( std::bind (
&PeerImp::checkTransaction, std::placeholders::_1, &PeerImp::checkTransaction, std::placeholders::_1,
flags, stx, flags, stx,
std::weak_ptr<Peer> (shared_from_this ()))); std::weak_ptr<PeerImp> (shared_from_this ())));
} }
catch (...) catch (...)
{ {
journal_.warning << "Transaction invalid: " << p_journal_.warning << "Transaction invalid: " <<
s.getHex(); s.getHex();
} }
return ec; return ec;
@@ -1381,7 +1397,7 @@ PeerImp::on_message (std::shared_ptr <protocol::TMLedgerData> const& m)
if (m->nodes ().size () <= 0) if (m->nodes ().size () <= 0)
{ {
journal_.warning << "Ledger/TXset data with no nodes"; p_journal_.warning << "Ledger/TXset data with no nodes";
return ec; return ec;
} }
@@ -1397,7 +1413,7 @@ PeerImp::on_message (std::shared_ptr <protocol::TMLedgerData> const& m)
} }
else else
{ {
journal_.info << "Unable to route TX/ledger data reply"; p_journal_.info << "Unable to route TX/ledger data reply";
charge (Resource::feeUnwantedData); charge (Resource::feeUnwantedData);
} }
@@ -1408,7 +1424,7 @@ PeerImp::on_message (std::shared_ptr <protocol::TMLedgerData> const& m)
if (m->ledgerhash ().size () != 32) if (m->ledgerhash ().size () != 32)
{ {
journal_.warning << "TX candidate reply with invalid hash size"; p_journal_.warning << "TX candidate reply with invalid hash size";
charge (Resource::feeInvalidRequest); charge (Resource::feeInvalidRequest);
return ec; return ec;
} }
@@ -1421,8 +1437,8 @@ PeerImp::on_message (std::shared_ptr <protocol::TMLedgerData> const& m)
getApp().getJobQueue().addJob (jtTXN_DATA, "recvPeerData", getApp().getJobQueue().addJob (jtTXN_DATA, "recvPeerData",
std::bind (&PeerImp::peerTXData, std::placeholders::_1, std::bind (&PeerImp::peerTXData, std::placeholders::_1,
std::weak_ptr<Peer> (shared_from_this ()), std::weak_ptr<PeerImp> (shared_from_this ()),
hash, m, journal_)); hash, m, p_journal_));
return ec; return ec;
} }
@@ -1430,7 +1446,7 @@ PeerImp::on_message (std::shared_ptr <protocol::TMLedgerData> const& m)
if (!getApp().getInboundLedgers ().gotLedgerData ( if (!getApp().getInboundLedgers ().gotLedgerData (
hash, shared_from_this(), m)) hash, shared_from_this(), m))
{ {
journal_.trace << "Got data for unwanted ledger"; p_journal_.trace << "Got data for unwanted ledger";
charge (Resource::feeUnwantedData); charge (Resource::feeUnwantedData);
} }
return ec; return ec;
@@ -1456,14 +1472,14 @@ PeerImp::on_message (std::shared_ptr <protocol::TMProposeSet> const& m)
(set.signature ().size () > 128) (set.signature ().size () > 128)
) )
{ {
journal_.warning << "Received proposal is malformed"; p_journal_.warning << "Proposal: malformed";
charge (Resource::feeInvalidSignature); charge (Resource::feeInvalidSignature);
return ec; return ec;
} }
if (set.has_previousledger () && (set.previousledger ().size () != 32)) if (set.has_previousledger () && (set.previousledger ().size () != 32))
{ {
journal_.warning << "Received proposal is malformed"; p_journal_.warning << "Proposal: malformed";
charge (Resource::feeInvalidRequest); charge (Resource::feeInvalidRequest);
return ec; return ec;
} }
@@ -1482,8 +1498,7 @@ PeerImp::on_message (std::shared_ptr <protocol::TMProposeSet> const& m)
if (! getApp().getHashRouter ().addSuppressionPeer ( if (! getApp().getHashRouter ().addSuppressionPeer (
suppression, shortId_)) suppression, shortId_))
{ {
journal_.trace << p_journal_.trace << "Proposal: duplicate";
"Received duplicate proposal from peer " << shortId_;
return ec; return ec;
} }
@@ -1492,21 +1507,19 @@ PeerImp::on_message (std::shared_ptr <protocol::TMProposeSet> const& m)
if (signerPublic == getConfig ().VALIDATION_PUB) if (signerPublic == getConfig ().VALIDATION_PUB)
{ {
journal_.trace << p_journal_.trace << "Proposal: self";
"Received our own proposal from peer " << shortId_;
return ec; return ec;
} }
bool isTrusted = getApp().getUNL ().nodeInUNL (signerPublic); bool isTrusted = getApp().getUNL ().nodeInUNL (signerPublic);
if (!isTrusted && getApp().getFeeTrack ().isLoadedLocal ()) if (!isTrusted && getApp().getFeeTrack ().isLoadedLocal ())
{ {
journal_.debug << "Dropping UNTRUSTED proposal due to load"; p_journal_.debug << "Proposal: Dropping UNTRUSTED (load)";
return ec; return ec;
} }
journal_.trace << p_journal_.trace <<
"Received " << (isTrusted ? "trusted" : "UNTRUSTED") << "Proposal: " << (isTrusted ? "trusted" : "UNTRUSTED");
" proposal from " << shortId_;
uint256 consensusLCL; uint256 consensusLCL;
@@ -1524,8 +1537,8 @@ PeerImp::on_message (std::shared_ptr <protocol::TMProposeSet> const& m)
"recvPropose->checkPropose", std::bind ( "recvPropose->checkPropose", std::bind (
&PeerImp::checkPropose, std::placeholders::_1, &overlay_, &PeerImp::checkPropose, std::placeholders::_1, &overlay_,
m, proposal, consensusLCL, publicKey_, m, proposal, consensusLCL, publicKey_,
std::weak_ptr<Peer> (shared_from_this ()), clusterNode_, std::weak_ptr<PeerImp> (shared_from_this ()), clusterNode_,
journal_)); p_journal_));
return ec; return ec;
} }
@@ -1533,8 +1546,7 @@ PeerImp::error_code
PeerImp::on_message (std::shared_ptr <protocol::TMStatusChange> const& m) PeerImp::on_message (std::shared_ptr <protocol::TMStatusChange> const& m)
{ {
error_code ec; error_code ec;
journal_.trace << "Received status change from peer " << p_journal_.trace << "Status: Change";
to_string (this);
if (!m->has_networktime ()) if (!m->has_networktime ())
m->set_networktime (getApp().getOPs ().getNetworkTimeNC ()); m->set_networktime (getApp().getOPs ().getNetworkTimeNC ());
@@ -1553,7 +1565,7 @@ PeerImp::on_message (std::shared_ptr <protocol::TMStatusChange> const& m)
{ {
if (!closedLedgerHash_.isZero ()) if (!closedLedgerHash_.isZero ())
{ {
journal_.trace << "peer has lost sync " << to_string (this); p_journal_.trace << "Status: Out of sync";
closedLedgerHash_.zero (); closedLedgerHash_.zero ();
} }
@@ -1566,12 +1578,11 @@ PeerImp::on_message (std::shared_ptr <protocol::TMStatusChange> const& m)
// a peer has changed ledgers // a peer has changed ledgers
memcpy (closedLedgerHash_.begin (), m->ledgerhash ().data (), 256 / 8); memcpy (closedLedgerHash_.begin (), m->ledgerhash ().data (), 256 / 8);
addLedger (closedLedgerHash_); addLedger (closedLedgerHash_);
journal_.trace << "peer LCL is " << closedLedgerHash_ << p_journal_.trace << "LCL is " << closedLedgerHash_;
" " << to_string (this);
} }
else else
{ {
journal_.trace << "peer has no ledger hash" << to_string (this); p_journal_.trace << "Status: No ledger";
closedLedgerHash_.zero (); closedLedgerHash_.zero ();
} }
@@ -1638,7 +1649,7 @@ PeerImp::on_message (std::shared_ptr <protocol::TMValidation> const& m)
if (m->validation ().size () < 50) if (m->validation ().size () < 50)
{ {
journal_.warning << "Too small validation from peer"; p_journal_.warning << "Validation: Too small";
charge (Resource::feeInvalidRequest); charge (Resource::feeInvalidRequest);
return ec; return ec;
} }
@@ -1652,7 +1663,7 @@ PeerImp::on_message (std::shared_ptr <protocol::TMValidation> const& m)
if (closeTime > (120 + val->getFieldU32(sfSigningTime))) if (closeTime > (120 + val->getFieldU32(sfSigningTime)))
{ {
journal_.trace << "Validation is more than two minutes old"; p_journal_.trace << "Validation: Too old";
charge (Resource::feeUnwantedData); charge (Resource::feeUnwantedData);
return ec; return ec;
} }
@@ -1660,7 +1671,7 @@ PeerImp::on_message (std::shared_ptr <protocol::TMValidation> const& m)
if (! getApp().getHashRouter ().addSuppressionPeer ( if (! getApp().getHashRouter ().addSuppressionPeer (
s.getSHA512Half(), shortId_)) s.getSHA512Half(), shortId_))
{ {
journal_.trace << "Validation is duplicate"; p_journal_.trace << "Validation: duplicate";
return ec; return ec;
} }
@@ -1672,19 +1683,25 @@ PeerImp::on_message (std::shared_ptr <protocol::TMValidation> const& m)
"recvValidation->checkValidation", "recvValidation->checkValidation",
std::bind (&PeerImp::checkValidation, std::placeholders::_1, std::bind (&PeerImp::checkValidation, std::placeholders::_1,
&overlay_, val, isTrusted, clusterNode_, m, &overlay_, val, isTrusted, clusterNode_, m,
std::weak_ptr<Peer> (shared_from_this ()), std::weak_ptr<PeerImp> (shared_from_this ()),
journal_)); p_journal_));
} }
else else
{ {
journal_.debug << p_journal_.debug <<
"Dropping UNTRUSTED validation due to load"; "Validation: Dropping UNTRUSTED (load)";
} }
} }
catch (std::exception const& e)
{
p_journal_.warning <<
"Validation: Exception, " << e.what();
charge (Resource::feeInvalidRequest);
}
catch (...) catch (...)
{ {
journal_.warning << p_journal_.warning <<
"Exception processing validation"; "Validation: Unknown exception";
charge (Resource::feeInvalidRequest); charge (Resource::feeInvalidRequest);
} }
@@ -1748,9 +1765,9 @@ PeerImp::on_message (std::shared_ptr <protocol::TMGetObjectByHash> const& m)
} }
} }
journal_.trace << "GetObjByHash had " << reply.objects_size () << p_journal_.trace <<
" of " << packet.objects_size () << "GetObj: " << reply.objects_size () <<
" for " << to_string (this); " of " << packet.objects_size ();
send (std::make_shared<Message> (reply, protocol::mtGET_OBJECTS)); send (std::make_shared<Message> (reply, protocol::mtGET_OBJECTS));
} }
else else
@@ -1771,16 +1788,16 @@ PeerImp::on_message (std::shared_ptr <protocol::TMGetObjectByHash> const& m)
if (obj.ledgerseq () != pLSeq) if (obj.ledgerseq () != pLSeq)
{ {
if ((pLDo && (pLSeq != 0)) && if ((pLDo && (pLSeq != 0)) &&
journal_.active(beast::Journal::Severity::kDebug)) p_journal_.active(beast::Journal::Severity::kDebug))
journal_.debug << p_journal_.debug <<
"Received full fetch pack for " << pLSeq; "GetObj: Full fetch pack for " << pLSeq;
pLSeq = obj.ledgerseq (); pLSeq = obj.ledgerseq ();
pLDo = !getApp().getOPs ().haveLedger (pLSeq); pLDo = !getApp().getOPs ().haveLedger (pLSeq);
if (!pLDo) if (!pLDo)
journal_.debug << p_journal_.debug <<
"Got pack for " << pLSeq << " too late"; "GetObj: Late fetch pack for " << pLSeq;
else else
progress = true; progress = true;
} }
@@ -1801,8 +1818,8 @@ PeerImp::on_message (std::shared_ptr <protocol::TMGetObjectByHash> const& m)
} }
if ((pLDo && (pLSeq != 0)) && if ((pLDo && (pLSeq != 0)) &&
journal_.active(beast::Journal::Severity::kDebug)) p_journal_.active(beast::Journal::Severity::kDebug))
journal_.debug << "Received partial fetch pack for " << pLSeq; p_journal_.debug << "GetObj: Partial fetch pack for " << pLSeq;
if (packet.type () == protocol::TMGetObjectByHash::otFETCH_PACK) if (packet.type () == protocol::TMGetObjectByHash::otFETCH_PACK)
getApp().getOPs ().gotFetchPack (progress, pLSeq); getApp().getOPs ().gotFetchPack (progress, pLSeq);
@@ -1828,13 +1845,13 @@ PeerImp::getLedger (protocol::TMGetLedger& packet)
if (packet.itype () == protocol::liTS_CANDIDATE) if (packet.itype () == protocol::liTS_CANDIDATE)
{ {
// Request is for a transaction candidate set // Request is for a transaction candidate set
journal_.trace << "Received request for TX candidate set data " p_journal_.trace <<
<< to_string (this); "GetLedger: Tx candidate set";
if ((!packet.has_ledgerhash () || packet.ledgerhash ().size () != 32)) if ((!packet.has_ledgerhash () || packet.ledgerhash ().size () != 32))
{ {
charge (Resource::feeInvalidRequest); charge (Resource::feeInvalidRequest);
journal_.warning << "invalid request for TX candidate set data"; p_journal_.warning << "GetLedger: Tx candidate set invalid";
return; return;
} }
@@ -1850,7 +1867,8 @@ PeerImp::getLedger (protocol::TMGetLedger& packet)
{ {
if (packet.has_querytype () && !packet.has_requestcookie ()) if (packet.has_querytype () && !packet.has_requestcookie ())
{ {
journal_.debug << "Trying to route TX set request"; p_journal_.debug <<
"GetLedger: Routing Tx set request";
struct get_usable_peers struct get_usable_peers
{ {
@@ -1881,7 +1899,8 @@ PeerImp::getLedger (protocol::TMGetLedger& packet)
if (usablePeers.empty ()) if (usablePeers.empty ())
{ {
journal_.info << "Unable to route TX set request"; p_journal_.info <<
"GetLedger: Route TX set failed";
return; return;
} }
@@ -1893,9 +1912,8 @@ PeerImp::getLedger (protocol::TMGetLedger& packet)
return; return;
} }
journal_.error << "We do not have the map our peer wants " p_journal_.error <<
<< to_string (this); "GetLedger: Can't provide map ";
charge (Resource::feeInvalidRequest); charge (Resource::feeInvalidRequest);
return; return;
} }
@@ -1910,13 +1928,14 @@ PeerImp::getLedger (protocol::TMGetLedger& packet)
{ {
if (getApp().getFeeTrack().isLoadedLocal() && !clusterNode_) if (getApp().getFeeTrack().isLoadedLocal() && !clusterNode_)
{ {
journal_.debug << "Too busy to fetch ledger data"; p_journal_.debug <<
"GetLedger: Too busy";
return; return;
} }
// Figure out what ledger they want // Figure out what ledger they want
journal_.trace << "Received request for ledger data " p_journal_.trace <<
<< to_string (this); "GetLedger: Received";
Ledger::pointer ledger; Ledger::pointer ledger;
if (packet.has_ledgerhash ()) if (packet.has_ledgerhash ())
@@ -1926,7 +1945,8 @@ PeerImp::getLedger (protocol::TMGetLedger& packet)
if (packet.ledgerhash ().size () != 32) if (packet.ledgerhash ().size () != 32)
{ {
charge (Resource::feeInvalidRequest); charge (Resource::feeInvalidRequest);
journal_.warning << "Invalid request"; p_journal_.warning <<
"GetLedger: Invalid request";
return; return;
} }
@@ -1935,8 +1955,9 @@ PeerImp::getLedger (protocol::TMGetLedger& packet)
logMe += to_string (ledgerhash); logMe += to_string (ledgerhash);
ledger = getApp().getLedgerMaster ().getLedgerByHash (ledgerhash); ledger = getApp().getLedgerMaster ().getLedgerByHash (ledgerhash);
if (!ledger && journal_.trace) if (!ledger && p_journal_.trace)
journal_.trace << "Don't have ledger " << ledgerhash; p_journal_.trace <<
"GetLedger: Don't have " << ledgerhash;
if (!ledger && (packet.has_querytype () && if (!ledger && (packet.has_querytype () &&
!packet.has_requestcookie ())) !packet.has_requestcookie ()))
@@ -1956,7 +1977,8 @@ PeerImp::getLedger (protocol::TMGetLedger& packet)
if (usablePeers.empty ()) if (usablePeers.empty ())
{ {
journal_.trace << "Unable to route ledger request"; p_journal_.trace <<
"GetLedger: Cannot route";
return; return;
} }
@@ -1965,7 +1987,8 @@ PeerImp::getLedger (protocol::TMGetLedger& packet)
packet.set_requestcookie (getShortId ()); packet.set_requestcookie (getShortId ());
selectedPeer->send ( selectedPeer->send (
std::make_shared<Message> (packet, protocol::mtGET_LEDGER)); std::make_shared<Message> (packet, protocol::mtGET_LEDGER));
journal_.debug << "Ledger request routed"; p_journal_.debug <<
"GetLedger: Request routed";
return; return;
} }
} }
@@ -1974,13 +1997,15 @@ PeerImp::getLedger (protocol::TMGetLedger& packet)
if (packet.ledgerseq() < if (packet.ledgerseq() <
getApp().getLedgerMaster().getEarliestFetch()) getApp().getLedgerMaster().getEarliestFetch())
{ {
journal_.debug << "Peer requests early ledger"; p_journal_.debug <<
"GetLedger: Early ledger request";
return; return;
} }
ledger = getApp().getLedgerMaster ().getLedgerBySeq ( ledger = getApp().getLedgerMaster ().getLedgerBySeq (
packet.ledgerseq ()); packet.ledgerseq ());
if (!ledger && journal_.debug) if (!ledger && p_journal_.debug)
journal_.debug << "Don't have ledger " << packet.ledgerseq (); p_journal_.debug <<
"GetLedger: Don't have " << packet.ledgerseq ();
} }
else if (packet.has_ltype () && (packet.ltype () == protocol::ltCURRENT)) else if (packet.has_ltype () && (packet.ltype () == protocol::ltCURRENT))
{ {
@@ -1997,7 +2022,8 @@ PeerImp::getLedger (protocol::TMGetLedger& packet)
else else
{ {
charge (Resource::feeInvalidRequest); charge (Resource::feeInvalidRequest);
journal_.warning << "Can't figure out what ledger they want"; p_journal_.warning <<
"GetLedger: Unknown request";
return; return;
} }
@@ -2006,8 +2032,9 @@ PeerImp::getLedger (protocol::TMGetLedger& packet)
{ {
charge (Resource::feeInvalidRequest); charge (Resource::feeInvalidRequest);
if (journal_.warning && ledger) if (p_journal_.warning && ledger)
journal_.warning << "Ledger has wrong sequence"; p_journal_.warning <<
"GetLedger: Invalid sequence";
return; return;
} }
@@ -2015,7 +2042,8 @@ PeerImp::getLedger (protocol::TMGetLedger& packet)
if (!packet.has_ledgerseq() && (ledger->getLedgerSeq() < if (!packet.has_ledgerseq() && (ledger->getLedgerSeq() <
getApp().getLedgerMaster().getEarliestFetch())) getApp().getLedgerMaster().getEarliestFetch()))
{ {
journal_.debug << "Peer requests early ledger"; p_journal_.debug <<
"GetLedger: Early ledger request";
return; return;
} }
@@ -2028,7 +2056,8 @@ PeerImp::getLedger (protocol::TMGetLedger& packet)
if (packet.itype () == protocol::liBASE) if (packet.itype () == protocol::liBASE)
{ {
// they want the ledger base data // they want the ledger base data
journal_.trace << "They want ledger base data"; p_journal_.trace <<
"GetLedger: Base data";
Serializer nData (128); Serializer nData (128);
ledger->addRaw (nData); ledger->addRaw (nData);
reply.add_nodes ()->set_nodedata ( reply.add_nodes ()->set_nodedata (
@@ -2085,12 +2114,14 @@ PeerImp::getLedger (protocol::TMGetLedger& packet)
if (!map || (packet.nodeids_size () == 0)) if (!map || (packet.nodeids_size () == 0))
{ {
journal_.warning << "Can't find map or empty request"; p_journal_.warning <<
"GetLedger: Can't find map or empty request";
charge (Resource::feeInvalidRequest); charge (Resource::feeInvalidRequest);
return; return;
} }
journal_.trace << "Request: " << logMe; p_journal_.trace <<
"GetLeder: " << logMe;
for (int i = 0; i < packet.nodeids ().size (); ++i) for (int i = 0; i < packet.nodeids ().size (); ++i)
{ {
@@ -2098,7 +2129,8 @@ PeerImp::getLedger (protocol::TMGetLedger& packet)
if (!mn.isValid ()) if (!mn.isValid ())
{ {
journal_.warning << "Request for invalid node: " << logMe; p_journal_.warning <<
"GetLedger: Invalid node " << logMe;
charge (Resource::feeInvalidRequest); charge (Resource::feeInvalidRequest);
return; return;
} }
@@ -2111,8 +2143,8 @@ PeerImp::getLedger (protocol::TMGetLedger& packet)
if (map->getNodeFat (mn, nodeIDs, rawNodes, fatRoot, fatLeaves)) if (map->getNodeFat (mn, nodeIDs, rawNodes, fatRoot, fatLeaves))
{ {
assert (nodeIDs.size () == rawNodes.size ()); assert (nodeIDs.size () == rawNodes.size ());
journal_.trace << p_journal_.trace <<
"getNodeFat got " << rawNodes.size () << " nodes"; "GetLedger: getNodeFat got " << rawNodes.size () << " nodes";
std::vector<SHAMapNodeID>::iterator nodeIDIterator; std::vector<SHAMapNodeID>::iterator nodeIDIterator;
std::list< Blob >::iterator rawNodeIterator; std::list< Blob >::iterator rawNodeIterator;
@@ -2130,7 +2162,8 @@ PeerImp::getLedger (protocol::TMGetLedger& packet)
} }
} }
else else
journal_.warning << "getNodeFat returns false"; p_journal_.warning <<
"GetLedger: getNodeFat returns false";
} }
catch (std::exception&) catch (std::exception&)
{ {
@@ -2148,7 +2181,7 @@ PeerImp::getLedger (protocol::TMGetLedger& packet)
if (!packet.has_ledgerhash ()) if (!packet.has_ledgerhash ())
info += ", no hash specified"; info += ", no hash specified";
journal_.warning << p_journal_.warning <<
"getNodeFat( " << mn << ") throws exception: " << info; "getNodeFat( " << mn << ") throws exception: " << info;
} }
} }
@@ -2161,12 +2194,12 @@ PeerImp::getLedger (protocol::TMGetLedger& packet)
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
void void
PeerImp::detach (const char* rsn, bool graceful) PeerImp::detach (const char* rsn)
{ {
if (! strand_.running_in_this_thread ()) if (! strand_.running_in_this_thread ())
{ {
strand_.post (std::bind (&PeerImp::detach, strand_.post (std::bind (&PeerImp::detach,
shared_from_this (), rsn, graceful)); shared_from_this (), rsn));
return; return;
} }
@@ -2183,25 +2216,14 @@ PeerImp::detach (const char* rsn, bool graceful)
state_ = stateGracefulClose; state_ = stateGracefulClose;
if (clusterNode_ && journal_.active(beast::Journal::Severity::kWarning)) if (clusterNode_ && p_journal_.active(beast::Journal::Severity::kWarning))
journal_.warning << "Cluster peer " << name_ << journal_.warning <<
" detached: " << rsn; name_ << " left cluster: " << rsn;
send_queue_.clear (); send_queue_.clear ();
error_code ec;
(void) timer_.cancel (); timer_.cancel(ec);
stream_.next_layer().close(ec);
if (graceful)
{
stream_.async_shutdown (strand_.wrap (std::bind(
&PeerImp::handleShutdown, shared_from_this(),
beast::asio::placeholders::error)));
}
else
{
error_code ec;
stream_.next_layer().cancel(ec);
}
// VFALCO TODO Stop doing this. // VFALCO TODO Stop doing this.
if (publicKey_.isValid ()) if (publicKey_.isValid ())
@@ -2226,7 +2248,7 @@ PeerImp::sendGetPeers ()
} }
void void
PeerImp::charge (std::weak_ptr <Peer>& peer, Resource::Charge const& fee) PeerImp::charge (std::weak_ptr <PeerImp>& peer, Resource::Charge const& fee)
{ {
Peer::ptr p (peer.lock()); Peer::ptr p (peer.lock());
@@ -2401,13 +2423,13 @@ PeerImp::doFetchPack (const std::shared_ptr<protocol::TMGetObjectByHash>& packet
(getApp().getLedgerMaster().getValidatedLedgerAge() > 40) || (getApp().getLedgerMaster().getValidatedLedgerAge() > 40) ||
(getApp().getJobQueue().getJobCount(jtPACK) > 10)) (getApp().getJobQueue().getJobCount(jtPACK) > 10))
{ {
journal_.info << "Too busy to make fetch pack"; p_journal_.info << "Too busy to make fetch pack";
return; return;
} }
if (packet->ledgerhash ().size () != 32) if (packet->ledgerhash ().size () != 32)
{ {
journal_.warning << "FetchPack hash size malformed"; p_journal_.warning << "FetchPack hash size malformed";
charge (Resource::feeInvalidRequest); charge (Resource::feeInvalidRequest);
return; return;
} }
@@ -2417,12 +2439,12 @@ PeerImp::doFetchPack (const std::shared_ptr<protocol::TMGetObjectByHash>& packet
getApp().getJobQueue ().addJob (jtPACK, "MakeFetchPack", getApp().getJobQueue ().addJob (jtPACK, "MakeFetchPack",
std::bind (&NetworkOPs::makeFetchPack, &getApp().getOPs (), std::bind (&NetworkOPs::makeFetchPack, &getApp().getOPs (),
std::placeholders::_1, std::weak_ptr<Peer> (shared_from_this ()), std::placeholders::_1, std::weak_ptr<PeerImp> (shared_from_this ()),
packet, hash, UptimeTimer::getInstance ().getElapsedSeconds ())); packet, hash, UptimeTimer::getInstance ().getElapsedSeconds ()));
} }
void void
PeerImp::doProofOfWork (Job&, std::weak_ptr <Peer> peer, PeerImp::doProofOfWork (Job&, std::weak_ptr <PeerImp> peer,
ProofOfWork::pointer pow) ProofOfWork::pointer pow)
{ {
if (peer.expired ()) if (peer.expired ())
@@ -2432,7 +2454,7 @@ PeerImp::doProofOfWork (Job&, std::weak_ptr <Peer> peer,
if (solution.isZero ()) if (solution.isZero ())
{ {
journal_.warning << "Failed to solve proof of work"; p_journal_.warning << "Failed to solve proof of work";
} }
else else
{ {
@@ -2455,7 +2477,7 @@ PeerImp::doProofOfWork (Job&, std::weak_ptr <Peer> peer,
void void
PeerImp::checkTransaction (Job&, int flags, PeerImp::checkTransaction (Job&, int flags,
SerializedTransaction::pointer stx, std::weak_ptr<Peer> peer) SerializedTransaction::pointer stx, std::weak_ptr<PeerImp> peer)
{ {
// VFALCO TODO Rewrite to not use exceptions // VFALCO TODO Rewrite to not use exceptions
try try
@@ -2497,7 +2519,7 @@ void
PeerImp::checkPropose (Job& job, Overlay* pPeers, PeerImp::checkPropose (Job& job, Overlay* pPeers,
std::shared_ptr <protocol::TMProposeSet> packet, std::shared_ptr <protocol::TMProposeSet> packet,
LedgerProposal::pointer proposal, uint256 consensusLCL, LedgerProposal::pointer proposal, uint256 consensusLCL,
RippleAddress nodePublic, std::weak_ptr<Peer> peer, RippleAddress nodePublic, std::weak_ptr<PeerImp> peer,
bool fromCluster, beast::Journal journal) bool fromCluster, beast::Journal journal)
{ {
bool sigGood = false; bool sigGood = false;
@@ -2522,7 +2544,7 @@ PeerImp::checkPropose (Job& job, Overlay* pPeers,
{ {
Peer::ptr p = peer.lock (); Peer::ptr p = peer.lock ();
journal.warning << journal.warning <<
"proposal with previous ledger fails sig check: " << *p; "Proposal with previous ledger fails sig check";
charge (peer, Resource::feeInvalidSignature); charge (peer, Resource::feeInvalidSignature);
return; return;
} }
@@ -2576,7 +2598,7 @@ void
PeerImp::checkValidation (Job&, Overlay* pPeers, PeerImp::checkValidation (Job&, Overlay* pPeers,
SerializedValidation::pointer val, bool isTrusted, bool isCluster, SerializedValidation::pointer val, bool isTrusted, bool isCluster,
std::shared_ptr<protocol::TMValidation> packet, std::shared_ptr<protocol::TMValidation> packet,
std::weak_ptr<Peer> peer, beast::Journal journal) std::weak_ptr<PeerImp> peer, beast::Journal journal)
{ {
try try
{ {
@@ -2590,10 +2612,10 @@ PeerImp::checkValidation (Job&, Overlay* pPeers,
} }
std::string source; std::string source;
Peer::ptr lp = peer.lock (); PeerImp::ptr lp = peer.lock ();
if (lp) if (lp)
source = to_string(*lp); source = std::to_string(lp->getShortId());
else else
source = "unknown"; source = "unknown";
@@ -2641,10 +2663,10 @@ PeerImp::sGetLedger (std::weak_ptr<PeerImp> wPeer,
// VFALCO TODO Make this non-static // VFALCO TODO Make this non-static
void void
PeerImp::peerTXData (Job&, std::weak_ptr <Peer> wPeer, uint256 const& hash, PeerImp::peerTXData (Job&, std::weak_ptr <PeerImp> wPeer, uint256 const& hash,
std::shared_ptr <protocol::TMLedgerData> pPacket, beast::Journal journal) std::shared_ptr <protocol::TMLedgerData> pPacket, beast::Journal journal)
{ {
std::shared_ptr <Peer> peer = wPeer.lock (); std::shared_ptr <PeerImp> peer = wPeer.lock ();
if (!peer) if (!peer)
return; return;

View File

@@ -45,20 +45,6 @@ namespace ripple {
class PeerImp; class PeerImp;
std::string to_string (Peer const& peer);
std::ostream& operator<< (std::ostream& os, Peer const& peer);
std::string to_string (Peer const* peer);
std::ostream& operator<< (std::ostream& os, Peer const* peer);
std::string to_string (PeerImp const& peer);
std::ostream& operator<< (std::ostream& os, PeerImp const& peer);
std::string to_string (PeerImp const* peer);
std::ostream& operator<< (std::ostream& os, PeerImp const* peer);
//------------------------------------------------------------------------------
class PeerImp class PeerImp
: public Peer : public Peer
, public std::enable_shared_from_this <PeerImp> , public std::enable_shared_from_this <PeerImp>
@@ -89,6 +75,66 @@ public:
typedef std::shared_ptr <PeerImp> ptr; typedef std::shared_ptr <PeerImp> ptr;
private: private:
/** Wraps a Journal::Sink to prefix it's output. */
class WrappedSink : public beast::Journal::Sink
{
private:
std::string prefix_;
beast::Journal::Sink& sink_;
public:
explicit
WrappedSink (beast::Journal::Sink& sink)
: sink_ (sink)
{
}
explicit
WrappedSink (beast::Journal const& journal)
: sink_ (journal.sink())
{
}
void prefix (std::string const& s)
{
prefix_ = s;
}
bool
active (beast::Journal::Severity level) const override
{
return sink_.active (level);
}
bool
console () const override
{
return sink_.console ();
}
void console (bool output) override
{
sink_.console (output);
}
beast::Journal::Severity
severity() const
{
return sink_.severity();
}
void severity (beast::Journal::Severity level)
{
sink_.severity (level);
}
void write (beast::Journal::Severity level, std::string const& text)
{
using beast::Journal;
sink_.write (level, prefix_ + text);
}
};
using clock_type = std::chrono::steady_clock; using clock_type = std::chrono::steady_clock;
using error_code= boost::system::error_code ; using error_code= boost::system::error_code ;
using yield_context = boost::asio::yield_context; using yield_context = boost::asio::yield_context;
@@ -104,7 +150,10 @@ private:
// The length of the smallest valid finished message // The length of the smallest valid finished message
static const size_t sslMinimumFinishedLength = 12; static const size_t sslMinimumFinishedLength = 12;
WrappedSink sink_;
WrappedSink p_sink_;
beast::Journal journal_; beast::Journal journal_;
beast::Journal p_journal_;
std::unique_ptr<beast::asio::ssl_bundle> ssl_bundle_; std::unique_ptr<beast::asio::ssl_bundle> ssl_bundle_;
socket_type& socket_; socket_type& socket_;
stream_type& stream_; stream_type& stream_;
@@ -286,6 +335,9 @@ public:
hasRange (std::uint32_t uMin, std::uint32_t uMax) override; hasRange (std::uint32_t uMin, std::uint32_t uMax) override;
private: private:
void
setPrefix();
// //
// client role // client role
// //
@@ -413,23 +465,16 @@ private:
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
/** Disconnect a peer // DEPRECATED Close the socket
The peer transitions from its current state into `stateGracefulClose`
@param rsn a code indicating why the peer was disconnected
@param onIOStrand true if called on an I/O strand. It if is not, then
a callback will be queued up.
*/
void void
detach (const char* rsn, bool graceful = true); detach (const char* rsn);
void void
sendGetPeers (); sendGetPeers ();
static static
void void
charge (std::weak_ptr <Peer>& peer, Resource::Charge const& fee); charge (std::weak_ptr <PeerImp>& peer, Resource::Charge const& fee);
void void
sendForce (const Message::pointer& packet); sendForce (const Message::pointer& packet);
@@ -482,11 +527,11 @@ private:
doFetchPack (const std::shared_ptr<protocol::TMGetObjectByHash>& packet); doFetchPack (const std::shared_ptr<protocol::TMGetObjectByHash>& packet);
void void
doProofOfWork (Job&, std::weak_ptr <Peer> peer, ProofOfWork::pointer pow); doProofOfWork (Job&, std::weak_ptr <PeerImp> peer, ProofOfWork::pointer pow);
static static
void checkTransaction (Job&, int flags, SerializedTransaction::pointer stx, void checkTransaction (Job&, int flags, SerializedTransaction::pointer stx,
std::weak_ptr<Peer> peer); std::weak_ptr<PeerImp> peer);
// Called from our JobQueue // Called from our JobQueue
static static
@@ -494,7 +539,7 @@ private:
checkPropose (Job& job, Overlay* pPeers, checkPropose (Job& job, Overlay* pPeers,
std::shared_ptr<protocol::TMProposeSet> packet, std::shared_ptr<protocol::TMProposeSet> packet,
LedgerProposal::pointer proposal, uint256 consensusLCL, LedgerProposal::pointer proposal, uint256 consensusLCL,
RippleAddress nodePublic, std::weak_ptr<Peer> peer, RippleAddress nodePublic, std::weak_ptr<PeerImp> peer,
bool fromCluster, beast::Journal journal); bool fromCluster, beast::Journal journal);
static static
@@ -502,7 +547,7 @@ private:
checkValidation (Job&, Overlay* pPeers, SerializedValidation::pointer val, checkValidation (Job&, Overlay* pPeers, SerializedValidation::pointer val,
bool isTrusted, bool isCluster, bool isTrusted, bool isCluster,
std::shared_ptr<protocol::TMValidation> packet, std::shared_ptr<protocol::TMValidation> packet,
std::weak_ptr<Peer> peer, beast::Journal journal); std::weak_ptr<PeerImp> peer, beast::Journal journal);
static static
void void
@@ -512,7 +557,7 @@ private:
/** Called when we receive tx set data. */ /** Called when we receive tx set data. */
static static
void void
peerTXData (Job&, std::weak_ptr <Peer> wPeer, uint256 const& hash, peerTXData (Job&, std::weak_ptr <PeerImp> wPeer, uint256 const& hash,
std::shared_ptr <protocol::TMLedgerData> pPacket, std::shared_ptr <protocol::TMLedgerData> pPacket,
beast::Journal journal); beast::Journal journal);
}; };
@@ -526,7 +571,10 @@ PeerImp::PeerImp (std::unique_ptr<beast::asio::ssl_bundle>&& ssl_bundle,
PeerFinder::Manager& peerFinder, PeerFinder::Manager& peerFinder,
PeerFinder::Slot::ptr const& slot) PeerFinder::Slot::ptr const& slot)
: Child (overlay) : Child (overlay)
, journal_ (deprecatedLogs().journal("Peer")) , sink_(deprecatedLogs().journal("Peer"))
, p_sink_(deprecatedLogs().journal("Protocol"))
, journal_ (sink_)
, p_journal_(p_sink_)
, ssl_bundle_(std::move(ssl_bundle)) , ssl_bundle_(std::move(ssl_bundle))
, socket_ (ssl_bundle_->socket) , socket_ (ssl_bundle_->socket)
, stream_ (ssl_bundle_->stream) , stream_ (ssl_bundle_->stream)
@@ -541,6 +589,7 @@ PeerImp::PeerImp (std::unique_ptr<beast::asio::ssl_bundle>&& ssl_bundle,
, slot_ (slot) , slot_ (slot)
, message_stream_(*this) , message_stream_(*this)
{ {
setPrefix();
read_buffer_.commit(boost::asio::buffer_copy(read_buffer_.prepare( read_buffer_.commit(boost::asio::buffer_copy(read_buffer_.prepare(
boost::asio::buffer_size(buffer)), buffer)); boost::asio::buffer_size(buffer)), buffer));
} }
@@ -572,78 +621,6 @@ PeerImp::send_endpoints (FwdIt first, FwdIt last)
// DEPRECATED // DEPRECATED
const boost::posix_time::seconds PeerImp::nodeVerifySeconds (15); const boost::posix_time::seconds PeerImp::nodeVerifySeconds (15);
//------------------------------------------------------------------------------
// to_string should not be used we should just use lexical_cast maybe
inline
std::string
to_string (PeerImp const& peer)
{
if (peer.isInCluster())
return peer.getClusterNodeName();
return peer.getRemoteAddress().to_string();
}
inline
std::string
to_string (PeerImp const* peer)
{
return to_string (*peer);
}
inline
std::ostream&
operator<< (std::ostream& os, PeerImp const& peer)
{
os << to_string (peer);
return os;
}
inline
std::ostream&
operator<< (std::ostream& os, PeerImp const* peer)
{
os << to_string (peer);
return os;
}
//------------------------------------------------------------------------------
inline
std::string
to_string (Peer const& peer)
{
if (peer.isInCluster())
return peer.getClusterNodeName();
return peer.getRemoteAddress().to_string();
}
inline
std::string
to_string (Peer const* peer)
{
return to_string (*peer);
}
inline
std::ostream&
operator<< (std::ostream& os, Peer const& peer)
{
os << to_string (peer);
return os;
}
inline
std::ostream&
operator<< (std::ostream& os, Peer const* peer)
{
os << to_string (peer);
return os;
}
} }
#endif #endif