diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index cfb2845ae0..a39c468ff2 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -38,7 +38,10 @@ PeerImp::PeerImp (std::unique_ptr&& ssl_bundle, OverlayImpl& overlay, Resource::Manager& resourceManager, PeerFinder::Manager& peerFinder, PeerFinder::Slot::ptr const& slot) : Child (overlay) - , journal_ (deprecatedLogs().journal("Peer")) + , sink_(deprecatedLogs().journal("Peer")) + , p_sink_(deprecatedLogs().journal("Protocol")) + , journal_ (sink_) + , p_journal_(p_sink_) , ssl_bundle_(std::move(ssl_bundle)) , socket_ (ssl_bundle_->socket) , stream_ (ssl_bundle_->stream) @@ -53,6 +56,7 @@ PeerImp::PeerImp (std::unique_ptr&& ssl_bundle, , slot_ (slot) , message_stream_(*this) { + setPrefix(); } PeerImp::PeerImp (beast::IP::Endpoint remoteAddress, @@ -61,7 +65,10 @@ PeerImp::PeerImp (beast::IP::Endpoint remoteAddress, PeerFinder::Slot::ptr const& slot, std::shared_ptr const& context) : Child (overlay) - , journal_ (deprecatedLogs().journal("Peer")) + , sink_(deprecatedLogs().journal("Peer")) + , p_sink_(deprecatedLogs().journal("Protocol")) + , journal_ (sink_) + , p_journal_(p_sink_) , ssl_bundle_(std::make_unique( context, io_service)) , socket_ (ssl_bundle_->socket) @@ -77,6 +84,7 @@ PeerImp::PeerImp (beast::IP::Endpoint remoteAddress, , slot_ (slot) , message_stream_(*this) { + setPrefix(); } PeerImp::~PeerImp () @@ -106,7 +114,7 @@ PeerImp::start() void PeerImp::close() { - detach("close", false); + detach("close"); } //------------------------------------------------------------------------------ @@ -223,7 +231,7 @@ PeerImp::json() default: // FIXME: do we really want this? - journal_.warning << + p_journal_.warning << "Unknown status: " << last_status_.newstatus (); } } @@ -300,6 +308,17 @@ PeerImp::hasRange (std::uint32_t uMin, std::uint32_t uMax) //------------------------------------------------------------------------------ +// Puts a string at the beginning of each string logged to the journal +void +PeerImp::setPrefix() +{ + static std::atomic id; + std::stringstream ss; + ss << "[" << std::setfill('0') << std::setw(3) << ++id << "] "; + sink_.prefix(ss.str()); + p_sink_.prefix(ss.str()); +} + /* Completion handlers for client role. Logic steps: 1. Establish outgoing connection @@ -935,8 +954,7 @@ PeerImp::error_code PeerImp::on_message (std::shared_ptr const& m) { error_code ec; - - timer_.cancel (); + timer_.cancel(ec); std::uint32_t const ourTime (getApp().getOPs ().getNetworkTimeNC ()); std::uint32_t const minTime (ourTime - clockToleranceDeltaSeconds); @@ -962,14 +980,12 @@ PeerImp::on_message (std::shared_ptr const& m) if (m->nettime () > maxTime) { journal_.info << - "Hello: Clock for " << *this << - " is off by +" << m->nettime () - ourTime; + "Hello: Clock off by +" << m->nettime () - ourTime; } else if (m->nettime () < minTime) { journal_.info << - "Hello: Clock for " << *this << - " is off by -" << ourTime - m->nettime (); + "Hello: Clock off by -" << ourTime - m->nettime (); } } else if (m->protoversionmin () > to_packed (BuildInfo::getCurrentProtocol())) @@ -1159,13 +1175,13 @@ PeerImp::on_message (std::shared_ptr const& m) jtPROOFWORK, "recvProof->doProof", std::bind (&PeerImp::doProofOfWork, std::placeholders::_1, - std::weak_ptr (shared_from_this ()), pow)); + std::weak_ptr (shared_from_this ()), pow)); #endif return ec; } - journal_.info << "Received in valid proof of work object from peer"; + p_journal_.info << "Bad proof of work"; return ec; } @@ -1322,8 +1338,8 @@ PeerImp::on_message (std::shared_ptr const& m) return ec; } - journal_.debug << - "Got transaction from peer " << *this << ": " << txID; + p_journal_.debug << + "Got tx " << txID; if (clusterNode_) { @@ -1343,21 +1359,21 @@ PeerImp::on_message (std::shared_ptr const& m) } if (getApp().getJobQueue().getJobCount(jtTRANSACTION) > 100) - journal_.info << "Transaction queue is full"; + p_journal_.info << "Transaction queue is full"; else if (getApp().getLedgerMaster().getValidatedLedgerAge() > 240) - journal_.trace << "No new transactions until synchronized"; + p_journal_.trace << "No new transactions until synchronized"; else getApp().getJobQueue ().addJob (jtTRANSACTION, "recvTransaction->checkTransaction", std::bind ( &PeerImp::checkTransaction, std::placeholders::_1, flags, stx, - std::weak_ptr (shared_from_this ()))); + std::weak_ptr (shared_from_this ()))); } catch (...) { - journal_.warning << "Transaction invalid: " << + p_journal_.warning << "Transaction invalid: " << s.getHex(); } return ec; @@ -1381,7 +1397,7 @@ PeerImp::on_message (std::shared_ptr const& m) if (m->nodes ().size () <= 0) { - journal_.warning << "Ledger/TXset data with no nodes"; + p_journal_.warning << "Ledger/TXset data with no nodes"; return ec; } @@ -1397,7 +1413,7 @@ PeerImp::on_message (std::shared_ptr const& m) } else { - journal_.info << "Unable to route TX/ledger data reply"; + p_journal_.info << "Unable to route TX/ledger data reply"; charge (Resource::feeUnwantedData); } @@ -1408,7 +1424,7 @@ PeerImp::on_message (std::shared_ptr const& m) if (m->ledgerhash ().size () != 32) { - journal_.warning << "TX candidate reply with invalid hash size"; + p_journal_.warning << "TX candidate reply with invalid hash size"; charge (Resource::feeInvalidRequest); return ec; } @@ -1421,8 +1437,8 @@ PeerImp::on_message (std::shared_ptr const& m) getApp().getJobQueue().addJob (jtTXN_DATA, "recvPeerData", std::bind (&PeerImp::peerTXData, std::placeholders::_1, - std::weak_ptr (shared_from_this ()), - hash, m, journal_)); + std::weak_ptr (shared_from_this ()), + hash, m, p_journal_)); return ec; } @@ -1430,7 +1446,7 @@ PeerImp::on_message (std::shared_ptr const& m) if (!getApp().getInboundLedgers ().gotLedgerData ( hash, shared_from_this(), m)) { - journal_.trace << "Got data for unwanted ledger"; + p_journal_.trace << "Got data for unwanted ledger"; charge (Resource::feeUnwantedData); } return ec; @@ -1456,14 +1472,14 @@ PeerImp::on_message (std::shared_ptr const& m) (set.signature ().size () > 128) ) { - journal_.warning << "Received proposal is malformed"; + p_journal_.warning << "Proposal: malformed"; charge (Resource::feeInvalidSignature); return ec; } if (set.has_previousledger () && (set.previousledger ().size () != 32)) { - journal_.warning << "Received proposal is malformed"; + p_journal_.warning << "Proposal: malformed"; charge (Resource::feeInvalidRequest); return ec; } @@ -1482,8 +1498,7 @@ PeerImp::on_message (std::shared_ptr const& m) if (! getApp().getHashRouter ().addSuppressionPeer ( suppression, shortId_)) { - journal_.trace << - "Received duplicate proposal from peer " << shortId_; + p_journal_.trace << "Proposal: duplicate"; return ec; } @@ -1492,21 +1507,19 @@ PeerImp::on_message (std::shared_ptr const& m) if (signerPublic == getConfig ().VALIDATION_PUB) { - journal_.trace << - "Received our own proposal from peer " << shortId_; + p_journal_.trace << "Proposal: self"; return ec; } bool isTrusted = getApp().getUNL ().nodeInUNL (signerPublic); if (!isTrusted && getApp().getFeeTrack ().isLoadedLocal ()) { - journal_.debug << "Dropping UNTRUSTED proposal due to load"; + p_journal_.debug << "Proposal: Dropping UNTRUSTED (load)"; return ec; } - journal_.trace << - "Received " << (isTrusted ? "trusted" : "UNTRUSTED") << - " proposal from " << shortId_; + p_journal_.trace << + "Proposal: " << (isTrusted ? "trusted" : "UNTRUSTED"); uint256 consensusLCL; @@ -1524,8 +1537,8 @@ PeerImp::on_message (std::shared_ptr const& m) "recvPropose->checkPropose", std::bind ( &PeerImp::checkPropose, std::placeholders::_1, &overlay_, m, proposal, consensusLCL, publicKey_, - std::weak_ptr (shared_from_this ()), clusterNode_, - journal_)); + std::weak_ptr (shared_from_this ()), clusterNode_, + p_journal_)); return ec; } @@ -1533,8 +1546,7 @@ PeerImp::error_code PeerImp::on_message (std::shared_ptr const& m) { error_code ec; - journal_.trace << "Received status change from peer " << - to_string (this); + p_journal_.trace << "Status: Change"; if (!m->has_networktime ()) m->set_networktime (getApp().getOPs ().getNetworkTimeNC ()); @@ -1553,7 +1565,7 @@ PeerImp::on_message (std::shared_ptr const& m) { if (!closedLedgerHash_.isZero ()) { - journal_.trace << "peer has lost sync " << to_string (this); + p_journal_.trace << "Status: Out of sync"; closedLedgerHash_.zero (); } @@ -1566,12 +1578,11 @@ PeerImp::on_message (std::shared_ptr const& m) // a peer has changed ledgers memcpy (closedLedgerHash_.begin (), m->ledgerhash ().data (), 256 / 8); addLedger (closedLedgerHash_); - journal_.trace << "peer LCL is " << closedLedgerHash_ << - " " << to_string (this); + p_journal_.trace << "LCL is " << closedLedgerHash_; } else { - journal_.trace << "peer has no ledger hash" << to_string (this); + p_journal_.trace << "Status: No ledger"; closedLedgerHash_.zero (); } @@ -1638,7 +1649,7 @@ PeerImp::on_message (std::shared_ptr const& m) if (m->validation ().size () < 50) { - journal_.warning << "Too small validation from peer"; + p_journal_.warning << "Validation: Too small"; charge (Resource::feeInvalidRequest); return ec; } @@ -1652,7 +1663,7 @@ PeerImp::on_message (std::shared_ptr const& m) if (closeTime > (120 + val->getFieldU32(sfSigningTime))) { - journal_.trace << "Validation is more than two minutes old"; + p_journal_.trace << "Validation: Too old"; charge (Resource::feeUnwantedData); return ec; } @@ -1660,7 +1671,7 @@ PeerImp::on_message (std::shared_ptr const& m) if (! getApp().getHashRouter ().addSuppressionPeer ( s.getSHA512Half(), shortId_)) { - journal_.trace << "Validation is duplicate"; + p_journal_.trace << "Validation: duplicate"; return ec; } @@ -1672,19 +1683,25 @@ PeerImp::on_message (std::shared_ptr const& m) "recvValidation->checkValidation", std::bind (&PeerImp::checkValidation, std::placeholders::_1, &overlay_, val, isTrusted, clusterNode_, m, - std::weak_ptr (shared_from_this ()), - journal_)); + std::weak_ptr (shared_from_this ()), + p_journal_)); } else { - journal_.debug << - "Dropping UNTRUSTED validation due to load"; + p_journal_.debug << + "Validation: Dropping UNTRUSTED (load)"; } } + catch (std::exception const& e) + { + p_journal_.warning << + "Validation: Exception, " << e.what(); + charge (Resource::feeInvalidRequest); + } catch (...) { - journal_.warning << - "Exception processing validation"; + p_journal_.warning << + "Validation: Unknown exception"; charge (Resource::feeInvalidRequest); } @@ -1748,9 +1765,9 @@ PeerImp::on_message (std::shared_ptr const& m) } } - journal_.trace << "GetObjByHash had " << reply.objects_size () << - " of " << packet.objects_size () << - " for " << to_string (this); + p_journal_.trace << + "GetObj: " << reply.objects_size () << + " of " << packet.objects_size (); send (std::make_shared (reply, protocol::mtGET_OBJECTS)); } else @@ -1771,16 +1788,16 @@ PeerImp::on_message (std::shared_ptr const& m) if (obj.ledgerseq () != pLSeq) { if ((pLDo && (pLSeq != 0)) && - journal_.active(beast::Journal::Severity::kDebug)) - journal_.debug << - "Received full fetch pack for " << pLSeq; + p_journal_.active(beast::Journal::Severity::kDebug)) + p_journal_.debug << + "GetObj: Full fetch pack for " << pLSeq; pLSeq = obj.ledgerseq (); pLDo = !getApp().getOPs ().haveLedger (pLSeq); if (!pLDo) - journal_.debug << - "Got pack for " << pLSeq << " too late"; + p_journal_.debug << + "GetObj: Late fetch pack for " << pLSeq; else progress = true; } @@ -1801,8 +1818,8 @@ PeerImp::on_message (std::shared_ptr const& m) } if ((pLDo && (pLSeq != 0)) && - journal_.active(beast::Journal::Severity::kDebug)) - journal_.debug << "Received partial fetch pack for " << pLSeq; + p_journal_.active(beast::Journal::Severity::kDebug)) + p_journal_.debug << "GetObj: Partial fetch pack for " << pLSeq; if (packet.type () == protocol::TMGetObjectByHash::otFETCH_PACK) getApp().getOPs ().gotFetchPack (progress, pLSeq); @@ -1828,13 +1845,13 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) if (packet.itype () == protocol::liTS_CANDIDATE) { // Request is for a transaction candidate set - journal_.trace << "Received request for TX candidate set data " - << to_string (this); + p_journal_.trace << + "GetLedger: Tx candidate set"; if ((!packet.has_ledgerhash () || packet.ledgerhash ().size () != 32)) { charge (Resource::feeInvalidRequest); - journal_.warning << "invalid request for TX candidate set data"; + p_journal_.warning << "GetLedger: Tx candidate set invalid"; return; } @@ -1850,7 +1867,8 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) { if (packet.has_querytype () && !packet.has_requestcookie ()) { - journal_.debug << "Trying to route TX set request"; + p_journal_.debug << + "GetLedger: Routing Tx set request"; struct get_usable_peers { @@ -1881,7 +1899,8 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) if (usablePeers.empty ()) { - journal_.info << "Unable to route TX set request"; + p_journal_.info << + "GetLedger: Route TX set failed"; return; } @@ -1893,9 +1912,8 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) return; } - journal_.error << "We do not have the map our peer wants " - << to_string (this); - + p_journal_.error << + "GetLedger: Can't provide map "; charge (Resource::feeInvalidRequest); return; } @@ -1910,13 +1928,14 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) { if (getApp().getFeeTrack().isLoadedLocal() && !clusterNode_) { - journal_.debug << "Too busy to fetch ledger data"; + p_journal_.debug << + "GetLedger: Too busy"; return; } // Figure out what ledger they want - journal_.trace << "Received request for ledger data " - << to_string (this); + p_journal_.trace << + "GetLedger: Received"; Ledger::pointer ledger; if (packet.has_ledgerhash ()) @@ -1926,7 +1945,8 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) if (packet.ledgerhash ().size () != 32) { charge (Resource::feeInvalidRequest); - journal_.warning << "Invalid request"; + p_journal_.warning << + "GetLedger: Invalid request"; return; } @@ -1935,8 +1955,9 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) logMe += to_string (ledgerhash); ledger = getApp().getLedgerMaster ().getLedgerByHash (ledgerhash); - if (!ledger && journal_.trace) - journal_.trace << "Don't have ledger " << ledgerhash; + if (!ledger && p_journal_.trace) + p_journal_.trace << + "GetLedger: Don't have " << ledgerhash; if (!ledger && (packet.has_querytype () && !packet.has_requestcookie ())) @@ -1956,7 +1977,8 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) if (usablePeers.empty ()) { - journal_.trace << "Unable to route ledger request"; + p_journal_.trace << + "GetLedger: Cannot route"; return; } @@ -1965,7 +1987,8 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) packet.set_requestcookie (getShortId ()); selectedPeer->send ( std::make_shared (packet, protocol::mtGET_LEDGER)); - journal_.debug << "Ledger request routed"; + p_journal_.debug << + "GetLedger: Request routed"; return; } } @@ -1974,13 +1997,15 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) if (packet.ledgerseq() < getApp().getLedgerMaster().getEarliestFetch()) { - journal_.debug << "Peer requests early ledger"; + p_journal_.debug << + "GetLedger: Early ledger request"; return; } ledger = getApp().getLedgerMaster ().getLedgerBySeq ( packet.ledgerseq ()); - if (!ledger && journal_.debug) - journal_.debug << "Don't have ledger " << packet.ledgerseq (); + if (!ledger && p_journal_.debug) + p_journal_.debug << + "GetLedger: Don't have " << packet.ledgerseq (); } else if (packet.has_ltype () && (packet.ltype () == protocol::ltCURRENT)) { @@ -1997,7 +2022,8 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) else { charge (Resource::feeInvalidRequest); - journal_.warning << "Can't figure out what ledger they want"; + p_journal_.warning << + "GetLedger: Unknown request"; return; } @@ -2006,8 +2032,9 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) { charge (Resource::feeInvalidRequest); - if (journal_.warning && ledger) - journal_.warning << "Ledger has wrong sequence"; + if (p_journal_.warning && ledger) + p_journal_.warning << + "GetLedger: Invalid sequence"; return; } @@ -2015,7 +2042,8 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) if (!packet.has_ledgerseq() && (ledger->getLedgerSeq() < getApp().getLedgerMaster().getEarliestFetch())) { - journal_.debug << "Peer requests early ledger"; + p_journal_.debug << + "GetLedger: Early ledger request"; return; } @@ -2028,7 +2056,8 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) if (packet.itype () == protocol::liBASE) { // they want the ledger base data - journal_.trace << "They want ledger base data"; + p_journal_.trace << + "GetLedger: Base data"; Serializer nData (128); ledger->addRaw (nData); reply.add_nodes ()->set_nodedata ( @@ -2085,12 +2114,14 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) if (!map || (packet.nodeids_size () == 0)) { - journal_.warning << "Can't find map or empty request"; + p_journal_.warning << + "GetLedger: Can't find map or empty request"; charge (Resource::feeInvalidRequest); return; } - journal_.trace << "Request: " << logMe; + p_journal_.trace << + "GetLeder: " << logMe; for (int i = 0; i < packet.nodeids ().size (); ++i) { @@ -2098,7 +2129,8 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) if (!mn.isValid ()) { - journal_.warning << "Request for invalid node: " << logMe; + p_journal_.warning << + "GetLedger: Invalid node " << logMe; charge (Resource::feeInvalidRequest); return; } @@ -2111,8 +2143,8 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) if (map->getNodeFat (mn, nodeIDs, rawNodes, fatRoot, fatLeaves)) { assert (nodeIDs.size () == rawNodes.size ()); - journal_.trace << - "getNodeFat got " << rawNodes.size () << " nodes"; + p_journal_.trace << + "GetLedger: getNodeFat got " << rawNodes.size () << " nodes"; std::vector::iterator nodeIDIterator; std::list< Blob >::iterator rawNodeIterator; @@ -2130,7 +2162,8 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) } } else - journal_.warning << "getNodeFat returns false"; + p_journal_.warning << + "GetLedger: getNodeFat returns false"; } catch (std::exception&) { @@ -2148,7 +2181,7 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) if (!packet.has_ledgerhash ()) info += ", no hash specified"; - journal_.warning << + p_journal_.warning << "getNodeFat( " << mn << ") throws exception: " << info; } } @@ -2161,12 +2194,12 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) //------------------------------------------------------------------------------ void -PeerImp::detach (const char* rsn, bool graceful) +PeerImp::detach (const char* rsn) { if (! strand_.running_in_this_thread ()) { strand_.post (std::bind (&PeerImp::detach, - shared_from_this (), rsn, graceful)); + shared_from_this (), rsn)); return; } @@ -2183,25 +2216,14 @@ PeerImp::detach (const char* rsn, bool graceful) state_ = stateGracefulClose; - if (clusterNode_ && journal_.active(beast::Journal::Severity::kWarning)) - journal_.warning << "Cluster peer " << name_ << - " detached: " << rsn; + if (clusterNode_ && p_journal_.active(beast::Journal::Severity::kWarning)) + journal_.warning << + name_ << " left cluster: " << rsn; send_queue_.clear (); - - (void) timer_.cancel (); - - if (graceful) - { - stream_.async_shutdown (strand_.wrap (std::bind( - &PeerImp::handleShutdown, shared_from_this(), - beast::asio::placeholders::error))); - } - else - { - error_code ec; - stream_.next_layer().cancel(ec); - } + error_code ec; + timer_.cancel(ec); + stream_.next_layer().close(ec); // VFALCO TODO Stop doing this. if (publicKey_.isValid ()) @@ -2226,7 +2248,7 @@ PeerImp::sendGetPeers () } void -PeerImp::charge (std::weak_ptr & peer, Resource::Charge const& fee) +PeerImp::charge (std::weak_ptr & peer, Resource::Charge const& fee) { Peer::ptr p (peer.lock()); @@ -2401,13 +2423,13 @@ PeerImp::doFetchPack (const std::shared_ptr& packet (getApp().getLedgerMaster().getValidatedLedgerAge() > 40) || (getApp().getJobQueue().getJobCount(jtPACK) > 10)) { - journal_.info << "Too busy to make fetch pack"; + p_journal_.info << "Too busy to make fetch pack"; return; } if (packet->ledgerhash ().size () != 32) { - journal_.warning << "FetchPack hash size malformed"; + p_journal_.warning << "FetchPack hash size malformed"; charge (Resource::feeInvalidRequest); return; } @@ -2417,12 +2439,12 @@ PeerImp::doFetchPack (const std::shared_ptr& packet getApp().getJobQueue ().addJob (jtPACK, "MakeFetchPack", std::bind (&NetworkOPs::makeFetchPack, &getApp().getOPs (), - std::placeholders::_1, std::weak_ptr (shared_from_this ()), + std::placeholders::_1, std::weak_ptr (shared_from_this ()), packet, hash, UptimeTimer::getInstance ().getElapsedSeconds ())); } void -PeerImp::doProofOfWork (Job&, std::weak_ptr peer, +PeerImp::doProofOfWork (Job&, std::weak_ptr peer, ProofOfWork::pointer pow) { if (peer.expired ()) @@ -2432,7 +2454,7 @@ PeerImp::doProofOfWork (Job&, std::weak_ptr peer, if (solution.isZero ()) { - journal_.warning << "Failed to solve proof of work"; + p_journal_.warning << "Failed to solve proof of work"; } else { @@ -2455,7 +2477,7 @@ PeerImp::doProofOfWork (Job&, std::weak_ptr peer, void PeerImp::checkTransaction (Job&, int flags, - SerializedTransaction::pointer stx, std::weak_ptr peer) + SerializedTransaction::pointer stx, std::weak_ptr peer) { // VFALCO TODO Rewrite to not use exceptions try @@ -2497,7 +2519,7 @@ void PeerImp::checkPropose (Job& job, Overlay* pPeers, std::shared_ptr packet, LedgerProposal::pointer proposal, uint256 consensusLCL, - RippleAddress nodePublic, std::weak_ptr peer, + RippleAddress nodePublic, std::weak_ptr peer, bool fromCluster, beast::Journal journal) { bool sigGood = false; @@ -2522,7 +2544,7 @@ PeerImp::checkPropose (Job& job, Overlay* pPeers, { Peer::ptr p = peer.lock (); journal.warning << - "proposal with previous ledger fails sig check: " << *p; + "Proposal with previous ledger fails sig check"; charge (peer, Resource::feeInvalidSignature); return; } @@ -2576,7 +2598,7 @@ void PeerImp::checkValidation (Job&, Overlay* pPeers, SerializedValidation::pointer val, bool isTrusted, bool isCluster, std::shared_ptr packet, - std::weak_ptr peer, beast::Journal journal) + std::weak_ptr peer, beast::Journal journal) { try { @@ -2590,10 +2612,10 @@ PeerImp::checkValidation (Job&, Overlay* pPeers, } std::string source; - Peer::ptr lp = peer.lock (); + PeerImp::ptr lp = peer.lock (); if (lp) - source = to_string(*lp); + source = std::to_string(lp->getShortId()); else source = "unknown"; @@ -2641,10 +2663,10 @@ PeerImp::sGetLedger (std::weak_ptr wPeer, // VFALCO TODO Make this non-static void -PeerImp::peerTXData (Job&, std::weak_ptr wPeer, uint256 const& hash, +PeerImp::peerTXData (Job&, std::weak_ptr wPeer, uint256 const& hash, std::shared_ptr pPacket, beast::Journal journal) { - std::shared_ptr peer = wPeer.lock (); + std::shared_ptr peer = wPeer.lock (); if (!peer) return; diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index 0127e985a9..622d11b07f 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ b/src/ripple/overlay/impl/PeerImp.h @@ -45,20 +45,6 @@ namespace ripple { class PeerImp; -std::string to_string (Peer const& peer); -std::ostream& operator<< (std::ostream& os, Peer const& peer); - -std::string to_string (Peer const* peer); -std::ostream& operator<< (std::ostream& os, Peer const* peer); - -std::string to_string (PeerImp const& peer); -std::ostream& operator<< (std::ostream& os, PeerImp const& peer); - -std::string to_string (PeerImp const* peer); -std::ostream& operator<< (std::ostream& os, PeerImp const* peer); - -//------------------------------------------------------------------------------ - class PeerImp : public Peer , public std::enable_shared_from_this @@ -89,6 +75,66 @@ public: typedef std::shared_ptr ptr; private: + /** Wraps a Journal::Sink to prefix it's output. */ + class WrappedSink : public beast::Journal::Sink + { + private: + std::string prefix_; + beast::Journal::Sink& sink_; + + public: + explicit + WrappedSink (beast::Journal::Sink& sink) + : sink_ (sink) + { + } + + explicit + WrappedSink (beast::Journal const& journal) + : sink_ (journal.sink()) + { + } + + void prefix (std::string const& s) + { + prefix_ = s; + } + + bool + active (beast::Journal::Severity level) const override + { + return sink_.active (level); + } + + bool + console () const override + { + return sink_.console (); + } + + void console (bool output) override + { + sink_.console (output); + } + + beast::Journal::Severity + severity() const + { + return sink_.severity(); + } + + void severity (beast::Journal::Severity level) + { + sink_.severity (level); + } + + void write (beast::Journal::Severity level, std::string const& text) + { + using beast::Journal; + sink_.write (level, prefix_ + text); + } + }; + using clock_type = std::chrono::steady_clock; using error_code= boost::system::error_code ; using yield_context = boost::asio::yield_context; @@ -104,7 +150,10 @@ private: // The length of the smallest valid finished message static const size_t sslMinimumFinishedLength = 12; + WrappedSink sink_; + WrappedSink p_sink_; beast::Journal journal_; + beast::Journal p_journal_; std::unique_ptr ssl_bundle_; socket_type& socket_; stream_type& stream_; @@ -286,6 +335,9 @@ public: hasRange (std::uint32_t uMin, std::uint32_t uMax) override; private: + void + setPrefix(); + // // client role // @@ -413,23 +465,16 @@ private: //-------------------------------------------------------------------------- - /** Disconnect a peer - - The peer transitions from its current state into `stateGracefulClose` - - @param rsn a code indicating why the peer was disconnected - @param onIOStrand true if called on an I/O strand. It if is not, then - a callback will be queued up. - */ + // DEPRECATED Close the socket void - detach (const char* rsn, bool graceful = true); + detach (const char* rsn); void sendGetPeers (); static void - charge (std::weak_ptr & peer, Resource::Charge const& fee); + charge (std::weak_ptr & peer, Resource::Charge const& fee); void sendForce (const Message::pointer& packet); @@ -482,11 +527,11 @@ private: doFetchPack (const std::shared_ptr& packet); void - doProofOfWork (Job&, std::weak_ptr peer, ProofOfWork::pointer pow); + doProofOfWork (Job&, std::weak_ptr peer, ProofOfWork::pointer pow); static void checkTransaction (Job&, int flags, SerializedTransaction::pointer stx, - std::weak_ptr peer); + std::weak_ptr peer); // Called from our JobQueue static @@ -494,7 +539,7 @@ private: checkPropose (Job& job, Overlay* pPeers, std::shared_ptr packet, LedgerProposal::pointer proposal, uint256 consensusLCL, - RippleAddress nodePublic, std::weak_ptr peer, + RippleAddress nodePublic, std::weak_ptr peer, bool fromCluster, beast::Journal journal); static @@ -502,7 +547,7 @@ private: checkValidation (Job&, Overlay* pPeers, SerializedValidation::pointer val, bool isTrusted, bool isCluster, std::shared_ptr packet, - std::weak_ptr peer, beast::Journal journal); + std::weak_ptr peer, beast::Journal journal); static void @@ -512,7 +557,7 @@ private: /** Called when we receive tx set data. */ static void - peerTXData (Job&, std::weak_ptr wPeer, uint256 const& hash, + peerTXData (Job&, std::weak_ptr wPeer, uint256 const& hash, std::shared_ptr pPacket, beast::Journal journal); }; @@ -526,7 +571,10 @@ PeerImp::PeerImp (std::unique_ptr&& ssl_bundle, PeerFinder::Manager& peerFinder, PeerFinder::Slot::ptr const& slot) : Child (overlay) - , journal_ (deprecatedLogs().journal("Peer")) + , sink_(deprecatedLogs().journal("Peer")) + , p_sink_(deprecatedLogs().journal("Protocol")) + , journal_ (sink_) + , p_journal_(p_sink_) , ssl_bundle_(std::move(ssl_bundle)) , socket_ (ssl_bundle_->socket) , stream_ (ssl_bundle_->stream) @@ -541,6 +589,7 @@ PeerImp::PeerImp (std::unique_ptr&& ssl_bundle, , slot_ (slot) , message_stream_(*this) { + setPrefix(); read_buffer_.commit(boost::asio::buffer_copy(read_buffer_.prepare( boost::asio::buffer_size(buffer)), buffer)); } @@ -572,78 +621,6 @@ PeerImp::send_endpoints (FwdIt first, FwdIt last) // DEPRECATED const boost::posix_time::seconds PeerImp::nodeVerifySeconds (15); -//------------------------------------------------------------------------------ - -// to_string should not be used we should just use lexical_cast maybe - -inline -std::string -to_string (PeerImp const& peer) -{ - if (peer.isInCluster()) - return peer.getClusterNodeName(); - - return peer.getRemoteAddress().to_string(); -} - -inline -std::string -to_string (PeerImp const* peer) -{ - return to_string (*peer); -} - -inline -std::ostream& -operator<< (std::ostream& os, PeerImp const& peer) -{ - os << to_string (peer); - - return os; -} - -inline -std::ostream& -operator<< (std::ostream& os, PeerImp const* peer) -{ - os << to_string (peer); - return os; -} - -//------------------------------------------------------------------------------ - -inline -std::string -to_string (Peer const& peer) -{ - if (peer.isInCluster()) - return peer.getClusterNodeName(); - return peer.getRemoteAddress().to_string(); -} - -inline -std::string -to_string (Peer const* peer) -{ - return to_string (*peer); -} - -inline -std::ostream& -operator<< (std::ostream& os, Peer const& peer) -{ - os << to_string (peer); - return os; -} - -inline -std::ostream& -operator<< (std::ostream& os, Peer const* peer) -{ - os << to_string (peer); - return os; -} - } #endif