diff --git a/Builds/VisualStudio2013/RippleD.vcxproj b/Builds/VisualStudio2013/RippleD.vcxproj index 9213f98a0b..7bf42c2ba5 100644 --- a/Builds/VisualStudio2013/RippleD.vcxproj +++ b/Builds/VisualStudio2013/RippleD.vcxproj @@ -3105,6 +3105,8 @@ + + @@ -3121,6 +3123,9 @@ + + True + diff --git a/Builds/VisualStudio2013/RippleD.vcxproj.filters b/Builds/VisualStudio2013/RippleD.vcxproj.filters index 846d48ee56..8239a22f42 100644 --- a/Builds/VisualStudio2013/RippleD.vcxproj.filters +++ b/Builds/VisualStudio2013/RippleD.vcxproj.filters @@ -463,6 +463,9 @@ {07E4BC73-2B68-D0D1-D922-FEBBB573F503} + + {630E81FA-2122-38EA-81BD-636140BF270C} + {186385AD-A056-FA3A-7E0E-759EB55E9EAB} @@ -4281,6 +4284,9 @@ ripple\overlay\impl + + ripple\overlay\impl + ripple\overlay\impl @@ -4305,6 +4311,9 @@ ripple\overlay + + ripple\overlay\tests + ripple\peerfinder\api diff --git a/src/BeastConfig.h b/src/BeastConfig.h index 1b0a014a6b..bf71a0bd1e 100644 --- a/src/BeastConfig.h +++ b/src/BeastConfig.h @@ -220,11 +220,17 @@ #define RIPPLE_SINGLE_IO_SERVICE_THREAD 0 #endif -/** Config: RIPPLE_STRUCTURED_OVERLAY - Enables Structured Overlay support (unfinished) +/** Config: RIPPLE_STRUCTURED_OVERLAY_CLIENT + RIPPLE_STRUCTURED_OVERLAY_SERVER + Enables Structured Overlay support for the client or server roles. + This feature is currently in development: + https://ripplelabs.atlassian.net/browse/RIPD-157 */ -#ifndef RIPPLE_STRUCTURED_OVERLAY -#define RIPPLE_STRUCTURED_OVERLAY 0 +#ifndef RIPPLE_STRUCTURED_OVERLAY_CLIENT +#define RIPPLE_STRUCTURED_OVERLAY_CLIENT 0 +#endif +#ifndef RIPPLE_STRUCTURED_OVERLAY_SERVER +#define RIPPLE_STRUCTURED_OVERLAY_SERVER 1 #endif /** Config: RIPPLE_ASYNC_RPC_HANDLER diff --git a/src/ripple/overlay/README.md b/src/ripple/overlay/README.md index 779ea46356..9e597d6746 100644 --- a/src/ripple/overlay/README.md +++ b/src/ripple/overlay/README.md @@ -21,6 +21,39 @@ establishes, receives, and maintains connections to peers. Protocol messages are exchanged between peers and serialized using [_Google Protocol Buffers_][protocol_buffers]. +### Structure + +Each connection between peers is identified by its connection type, which +affects the behavior of message routing: + +* Leaf + +* Peer + +## Roles + +Depending on the type of connection desired, the peers will modify their +behavior according to certain roles: + +### Leaf or Superpeer + +A peer in the leaf role does not route messages. In the superpeer role, a +peer accepts incoming connections from other leaves and superpeers up to the +configured slot limit. It also routes messages. For a particular connection, +the choice of leaf or superpeer is mutually exclusive. However, a peer can +operate in both the leaf and superpeer role for different connections. One of +the requirements + +### Client Handler + +While not part of the responsibilities of the Overlay module, a peer +operating in the Client Handler role accepts incoming connections from clients +and services them through the JSON-RPC interface. A peer can operate in either +the leaf or superpeer roles while also operating as a client handler. + + + + ## Handshake To establish a protocol connection, a peer makes an outgoing TLS encrypted @@ -128,6 +161,13 @@ Protocol-Session-Cookie: 71ED064155FFADFA38782C5E0158CB26 This field must be present (TODO) +* _User Defined_ + + The rippled operator may specify additional, optional fields and values + through the configuration. These headers will be transmitted in the + corresponding request or response messages. + + --- [overlay_network]: http://en.wikipedia.org/wiki/Overlay_network diff --git a/src/ripple/overlay/impl/OverlayImpl.cpp b/src/ripple/overlay/impl/OverlayImpl.cpp index 38af596e85..e8c0b6b39c 100644 --- a/src/ripple/overlay/impl/OverlayImpl.cpp +++ b/src/ripple/overlay/impl/OverlayImpl.cpp @@ -291,7 +291,7 @@ OverlayImpl::disconnect (PeerFinder::Slot::ptr const& slot, bool graceful) { if (m_journal.trace) m_journal.trace << "Disconnect " << slot->remote_endpoint () << - (graceful ? "gracefully" : ""); + (graceful ? " gracefully" : ""); std::lock_guard lock (m_mutex); diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index db71087315..0de0fbbe12 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -25,444 +25,282 @@ namespace ripple { //------------------------------------------------------------------------------ -/** A peer has sent us transaction set data */ -// VFALCO TODO Make this non-static -static void peerTXData (Job&, - std::weak_ptr wPeer, - uint256 const& hash, - std::shared_ptr pPacket, - beast::Journal journal) -{ - std::shared_ptr peer = wPeer.lock (); - if (!peer) - return; +// TODO Make these class members or something. - protocol::TMLedgerData& packet = *pPacket; - - std::list nodeIDs; - std::list< Blob > nodeData; - for (int i = 0; i < packet.nodes ().size (); ++i) - { - const protocol::TMLedgerNode& node = packet.nodes (i); - - if (!node.has_nodeid () || !node.has_nodedata () || (node.nodeid ().size () != 33)) - { - journal.warning << "LedgerData request with invalid node ID"; - peer->charge (Resource::feeInvalidRequest); - return; - } - - nodeIDs.push_back (SHAMapNodeID {node.nodeid ().data (), - static_cast(node.nodeid ().size ())}); - nodeData.push_back (Blob (node.nodedata ().begin (), node.nodedata ().end ())); - } - - SHAMapAddNode san; - { - Application::ScopedLockType lock (getApp ().getMasterLock ()); - - san = getApp().getOPs().gotTXData (peer, hash, nodeIDs, nodeData); - } - - if (san.isInvalid ()) - { - peer->charge (Resource::feeUnwantedData); - } -} - -// VFALCO NOTE This function is way too big and cumbersome. -void -PeerImp::getLedger (protocol::TMGetLedger& packet) -{ - SHAMap::pointer map; - protocol::TMLedgerData reply; - bool fatLeaves = true, fatRoot = false; - - if (packet.has_requestcookie ()) - reply.set_requestcookie (packet.requestcookie ()); - - std::string logMe; - - if (packet.itype () == protocol::liTS_CANDIDATE) - { - // Request is for a transaction candidate set - m_journal.trace << "Received request for TX candidate set data " - << to_string (this); - - if ((!packet.has_ledgerhash () || packet.ledgerhash ().size () != 32)) - { - charge (Resource::feeInvalidRequest); - m_journal.warning << "invalid request for TX candidate set data"; - return; - } - - uint256 txHash; - memcpy (txHash.begin (), packet.ledgerhash ().data (), 32); - - { - Application::ScopedLockType lock (getApp ().getMasterLock ()); - map = getApp().getOPs ().getTXMap (txHash); - } - - if (!map) - { - if (packet.has_querytype () && !packet.has_requestcookie ()) - { - m_journal.debug << "Trying to route TX set request"; - - struct get_usable_peers - { - typedef Overlay::PeerSequence return_type; - - Overlay::PeerSequence usablePeers; - uint256 const& txHash; - Peer const* skip; - - get_usable_peers(uint256 const& hash, Peer const* s) - : txHash(hash), skip(s) - { } - - void operator() (Peer::ptr const& peer) - { - if (peer->hasTxSet (txHash) && (peer.get () != skip)) - usablePeers.push_back (peer); - } - - return_type operator() () - { - return usablePeers; - } - }; - - Overlay::PeerSequence usablePeers (m_overlay.foreach ( - get_usable_peers (txHash, this))); - - if (usablePeers.empty ()) - { - m_journal.info << "Unable to route TX set request"; - return; - } - - Peer::ptr const& selectedPeer = usablePeers[rand () % usablePeers.size ()]; - packet.set_requestcookie (getShortId ()); - selectedPeer->send ( - std::make_shared (packet, protocol::mtGET_LEDGER)); - return; - } - - m_journal.error << "We do not have the map our peer wants " - << to_string (this); - - charge (Resource::feeInvalidRequest); - return; - } - - reply.set_ledgerseq (0); - reply.set_ledgerhash (txHash.begin (), txHash.size ()); - reply.set_type (protocol::liTS_CANDIDATE); - fatLeaves = false; // We'll already have most transactions - fatRoot = true; // Save a pass - } - else - { - if (getApp().getFeeTrack().isLoadedLocal() && !m_clusterNode) - { - m_journal.debug << "Too busy to fetch ledger data"; - return; - } - - // Figure out what ledger they want - m_journal.trace << "Received request for ledger data " - << to_string (this); - Ledger::pointer ledger; - - if (packet.has_ledgerhash ()) - { - uint256 ledgerhash; - - if (packet.ledgerhash ().size () != 32) - { - charge (Resource::feeInvalidRequest); - m_journal.warning << "Invalid request"; - return; - } - - memcpy (ledgerhash.begin (), packet.ledgerhash ().data (), 32); - logMe += "LedgerHash:"; - logMe += to_string (ledgerhash); - ledger = getApp().getLedgerMaster ().getLedgerByHash (ledgerhash); - - if (!ledger && m_journal.trace) - m_journal.trace << "Don't have ledger " << ledgerhash; - - if (!ledger && (packet.has_querytype () && !packet.has_requestcookie ())) - { - std::uint32_t seq = 0; - - if (packet.has_ledgerseq ()) - seq = packet.ledgerseq (); - - Overlay::PeerSequence peerList = m_overlay.getActivePeers (); - Overlay::PeerSequence usablePeers; - BOOST_FOREACH (Peer::ptr const& peer, peerList) - { - if (peer->hasLedger (ledgerhash, seq) && (peer.get () != this)) - usablePeers.push_back (peer); - } - - if (usablePeers.empty ()) - { - m_journal.trace << "Unable to route ledger request"; - return; - } - - Peer::ptr const& selectedPeer = usablePeers[rand () % usablePeers.size ()]; - packet.set_requestcookie (getShortId ()); - selectedPeer->send ( - std::make_shared (packet, protocol::mtGET_LEDGER)); - m_journal.debug << "Ledger request routed"; - return; - } - } - else if (packet.has_ledgerseq ()) - { - if (packet.ledgerseq() < getApp().getLedgerMaster().getEarliestFetch()) - { - m_journal.debug << "Peer requests early ledger"; - return; - } - ledger = getApp().getLedgerMaster ().getLedgerBySeq (packet.ledgerseq ()); - if (!ledger && m_journal.debug) - m_journal.debug << "Don't have ledger " << packet.ledgerseq (); - } - else if (packet.has_ltype () && (packet.ltype () == protocol::ltCURRENT)) - { - ledger = getApp().getLedgerMaster ().getCurrentLedger (); - } - else if (packet.has_ltype () && (packet.ltype () == protocol::ltCLOSED) ) - { - ledger = getApp().getLedgerMaster ().getClosedLedger (); - - if (ledger && !ledger->isClosed ()) - ledger = getApp().getLedgerMaster ().getLedgerBySeq (ledger->getLedgerSeq () - 1); - } - else - { - charge (Resource::feeInvalidRequest); - m_journal.warning << "Can't figure out what ledger they want"; - return; - } - - if ((!ledger) || (packet.has_ledgerseq () && (packet.ledgerseq () != ledger->getLedgerSeq ()))) - { - charge (Resource::feeInvalidRequest); - - if (m_journal.warning && ledger) - m_journal.warning << "Ledger has wrong sequence"; - - return; - } - - if (!packet.has_ledgerseq() && (ledger->getLedgerSeq() < getApp().getLedgerMaster().getEarliestFetch())) - { - m_journal.debug << "Peer requests early ledger"; - return; - } - - // Fill out the reply - uint256 lHash = ledger->getHash (); - reply.set_ledgerhash (lHash.begin (), lHash.size ()); - reply.set_ledgerseq (ledger->getLedgerSeq ()); - reply.set_type (packet.itype ()); - - if (packet.itype () == protocol::liBASE) - { - // they want the ledger base data - m_journal.trace << "They want ledger base data"; - Serializer nData (128); - ledger->addRaw (nData); - reply.add_nodes ()->set_nodedata (nData.getDataPtr (), nData.getLength ()); - - SHAMap::pointer map = ledger->peekAccountStateMap (); - - if (map && map->getHash ().isNonZero ()) - { - // return account state root node if possible - Serializer rootNode (768); - - if (map->getRootNode (rootNode, snfWIRE)) - { - reply.add_nodes ()->set_nodedata (rootNode.getDataPtr (), rootNode.getLength ()); - - if (ledger->getTransHash ().isNonZero ()) - { - map = ledger->peekTransactionMap (); - - if (map && map->getHash ().isNonZero ()) - { - rootNode.erase (); - - if (map->getRootNode (rootNode, snfWIRE)) - reply.add_nodes ()->set_nodedata (rootNode.getDataPtr (), rootNode.getLength ()); - } - } - } - } - - Message::pointer oPacket = std::make_shared (reply, protocol::mtLEDGER_DATA); - send (oPacket); - return; - } - - if (packet.itype () == protocol::liTX_NODE) - { - map = ledger->peekTransactionMap (); - logMe += " TX:"; - logMe += to_string (map->getHash ()); - } - else if (packet.itype () == protocol::liAS_NODE) - { - map = ledger->peekAccountStateMap (); - logMe += " AS:"; - logMe += to_string (map->getHash ()); - } - } - - if (!map || (packet.nodeids_size () == 0)) - { - m_journal.warning << "Can't find map or empty request"; - charge (Resource::feeInvalidRequest); - return; - } - - m_journal.trace << "Request: " << logMe; - - for (int i = 0; i < packet.nodeids ().size (); ++i) - { - SHAMapNodeID mn (packet.nodeids (i).data (), packet.nodeids (i).size ()); - - if (!mn.isValid ()) - { - m_journal.warning << "Request for invalid node: " << logMe; - charge (Resource::feeInvalidRequest); - return; - } - - std::vector nodeIDs; - std::list< Blob > rawNodes; - - try - { - if (map->getNodeFat (mn, nodeIDs, rawNodes, fatRoot, fatLeaves)) - { - assert (nodeIDs.size () == rawNodes.size ()); - m_journal.trace << "getNodeFat got " << rawNodes.size () << " nodes"; - std::vector::iterator nodeIDIterator; - std::list< Blob >::iterator rawNodeIterator; - - for (nodeIDIterator = nodeIDs.begin (), rawNodeIterator = rawNodes.begin (); - nodeIDIterator != nodeIDs.end (); ++nodeIDIterator, ++rawNodeIterator) - { - Serializer nID (33); - nodeIDIterator->addIDRaw (nID); - protocol::TMLedgerNode* node = reply.add_nodes (); - node->set_nodeid (nID.getDataPtr (), nID.getLength ()); - node->set_nodedata (&rawNodeIterator->front (), rawNodeIterator->size ()); - } - } - else - m_journal.warning << "getNodeFat returns false"; - } - catch (std::exception&) - { - std::string info; - - if (packet.itype () == protocol::liTS_CANDIDATE) - info = "TS candidate"; - else if (packet.itype () == protocol::liBASE) - info = "Ledger base"; - else if (packet.itype () == protocol::liTX_NODE) - info = "TX node"; - else if (packet.itype () == protocol::liAS_NODE) - info = "AS node"; - - if (!packet.has_ledgerhash ()) - info += ", no hash specified"; - - m_journal.warning << "getNodeFat( " << mn << ") throws exception: " << info; - } - } - - Message::pointer oPacket = std::make_shared (reply, protocol::mtLEDGER_DATA); - send (oPacket); -} - -// This is dispatched by the job queue static void sGetLedger (std::weak_ptr wPeer, - std::shared_ptr packet) -{ - std::shared_ptr peer = wPeer.lock (); + std::shared_ptr packet); - if (peer) - peer->getLedger (*packet); -} +static +void +peerTXData (Job&, std::weak_ptr wPeer, uint256 const& hash, + std::shared_ptr pPacket, + beast::Journal journal); //------------------------------------------------------------------------------ -void -PeerImp::start_read() +//------------------------------------------------------------------------------ + +/* Completion handlers for client role. + Logic steps: + 1. Establish outgoing connection + 2. Perform SSL handshake + 3. Send HTTP request + 4. Receive HTTP response + 5. Enter protocol loop +*/ + +void PeerImp::do_connect () { -#if RIPPLE_STRUCTURED_OVERLAY - std::stringstream ss; + m_journal.info << "Connecting to " << m_remoteAddress; -/* - ss << - "HTTP/1.0 GET\n\n" - "\n\n" - ; -*/ - //basic_message m; - //m.request (beast::http::method_t::http_get, "/peer/leaf"); - //m.method(beast:: - ss << - "GET / HTTP/1.0\r\n" - "User-Agent: Ripple 0.26.0-rc1\r\n" - //"Connection: Upgrade\r\n" - //"Upgrade: Ripple/1.1\r\n" - "X-Connect-As: Leaf\r\n" - "X-Try-IPs: 192.168.0.1:51234,208.239.114.74:51234\r\n" - "Content-Length: 2\r\n" - "\r\n" - "xy" - ; + m_usage = m_resourceManager.newOutboundEndpoint (m_remoteAddress); - std::string const s (ss.str()); - boost::asio::buffer_copy (read_buffer_.prepare(s.size()), - boost::asio::buffer(s)); - read_buffer_.commit(s.size()); + if (m_usage.disconnect ()) + { + detach ("do_connect"); + return; + } + + boost::system::error_code ec; + timer_.expires_from_now (nodeVerifySeconds, ec); + timer_.async_wait (m_strand.wrap (std::bind (&PeerImp::handleVerifyTimer, + shared_from_this (), beast::asio::placeholders::error))); + if (ec) + { + m_journal.error << "Failed to set verify timer."; + detach ("do_connect"); + return; + } + + m_socket->next_layer ().async_connect ( + beast::IPAddressConversion::to_asio_endpoint (m_remoteAddress), + m_strand.wrap (std::bind (&PeerImp::on_connect, + shared_from_this (), beast::asio::placeholders::error))); +} + +void +PeerImp::on_connect (error_code ec) +{ + if (m_detaching || ec == boost::asio::error::operation_aborted) + return; + + NativeSocketType::endpoint_type local_endpoint; + if (! ec) + local_endpoint = m_socket->this_layer < + NativeSocketType> ().local_endpoint (ec); + + if (ec) + { + m_journal.error << + "Connect to " << m_remoteAddress << + " failed: " << ec.message(); + detach ("hc"); + return; + } + + assert (m_state == stateConnecting); + m_state = stateConnected; + + m_peerFinder.on_connected (m_slot, + beast::IPAddressConversion::from_asio (local_endpoint)); + + m_socket->set_verify_mode (boost::asio::ssl::verify_none); + m_socket->async_handshake ( + boost::asio::ssl::stream_base::client, + m_strand.wrap (std::bind (&PeerImp::on_connect_ssl, + std::static_pointer_cast (shared_from_this ()), + beast::asio::placeholders::error))); +} + +beast::http::basic_message +PeerImp::make_request() +{ + assert (! m_inbound); + beast::http::basic_message m; + m.method (beast::http::method_t::http_get); + m.url ("/"); + m.version (1, 1); + m.headers.append ("User-Agent", BuildInfo::getFullVersionString()); + //m.headers.append ("Local-Address", m_socket-> + m.headers.append ("Remote-Address", m_remoteAddress.to_string()); + m.headers.append ("Upgrade", + std::string("Ripple/")+BuildInfo::getCurrentProtocol().toStdString()); + m.headers.append ("Connection", "Upgrade"); + m.headers.append ("Connect-As", "Leaf, Peer"); + m.headers.append ("Accept-Encoding", "identity, snappy"); + //m.headers.append ("X-Try-IPs", "192.168.0.1:51234"); + //m.headers.append ("X-Try-IPs", "208.239.114.74:51234"); + //m.headers.append ("A", "BC"); + //m.headers.append ("Content-Length", "0"); + return m; +} + +void +PeerImp::on_connect_ssl (error_code ec) +{ + if (m_detaching || ec == boost::asio::error::operation_aborted) + return; + + if (ec) + { + m_journal.info << + "on_connect_ssl: " << ec.message(); + detach("on_connect_ssl"); + return; + } + +#if RIPPLE_STRUCTURED_OVERLAY_CLIENT + beast::http::basic_message req (make_request()); + beast::http::xwrite (write_buffer_, req); + on_write_http_request (error_code(), 0); - m_socket->async_read_some (read_buffer_.prepare (Tuning::readBufferBytes), - m_strand.wrap (std::bind (&PeerImp::on_read_detect, - shared_from_this(), beast::asio::placeholders::error, - beast::asio::placeholders::bytes_transferred))); #else - m_socket->async_read_some (read_buffer_.prepare (Tuning::readBufferBytes), - m_strand.wrap (std::bind (&PeerImp::on_read_protocol, - shared_from_this(), beast::asio::placeholders::error, - beast::asio::placeholders::bytes_transferred))); + do_protocol_start(); #endif } +// Called repeatedly with the http request data void -PeerImp::on_read_detect (error_code ec, std::size_t bytes_transferred) +PeerImp::on_write_http_request (error_code ec, std::size_t bytes_transferred) { - if (m_detaching) + if (m_detaching || ec == boost::asio::error::operation_aborted) return; - if (ec == boost::asio::error::operation_aborted) + if (ec) + { + m_journal.info << + "on_write_http_request: " << ec.message(); + detach("on_write_http_request"); + return; + } + + write_buffer_.consume (bytes_transferred); + + if (write_buffer_.size() == 0) + { + // done sending request, now read the response + http_message_ = boost::in_place (); + http_parser_ = boost::in_place (std::ref(*http_message_), false); + on_read_http_response (error_code(), 0); + return; + } + + m_socket->async_write_some (write_buffer_.data(), + m_strand.wrap (std::bind (&PeerImp::on_write_http_request, + shared_from_this(), beast::asio::placeholders::error, + beast::asio::placeholders::bytes_transferred))); +} + +// Called repeatedly with the http response data +void +PeerImp::on_read_http_response (error_code ec, std::size_t bytes_transferred) +{ + if (m_detaching || ec == boost::asio::error::operation_aborted) + return; + + if (! ec) + { + read_buffer_.commit (bytes_transferred); + std::size_t bytes_consumed; + std::tie (ec, bytes_consumed) = http_parser_->write (read_buffer_.data()); + + if (! ec) + { + read_buffer_.consume (bytes_consumed); + if (http_parser_->complete()) + { + // + // TODO Apply response to connection state, then: + // - Go into protocol loop, or + // - Submit a new request (call on_write_http_request), or + // - Close the connection. + // + if (http_message_->status() != 200) + { + m_journal.info << + "HTTP Response: " << http_message_->reason() << + "(" << http_message_->status() << ")"; + detach("on_read_http_response"); + return; + } + do_protocol_start (); + return; + } + } + } + + if (ec) + { + m_journal.info << + "on_read_response: " << ec.message(); + detach("on_read_response"); + return; + } + + m_socket->async_read_some (read_buffer_.prepare (Tuning::readBufferBytes), + m_strand.wrap (std::bind (&PeerImp::on_read_http_response, + shared_from_this(), beast::asio::placeholders::error, + beast::asio::placeholders::bytes_transferred))); +} + +//------------------------------------------------------------------------------ + +/* Completion handlers for server role. + Logic steps: + 1. Perform SSL handshake + 2. Detect HTTP request or protocol TMHello + 3. If HTTP request received, send HTTP response + 4. Enter protocol loop +*/ +void PeerImp::do_accept () +{ + m_journal.info << "Accepted " << m_remoteAddress; + + m_usage = m_resourceManager.newInboundEndpoint (m_remoteAddress); + if (m_usage.disconnect ()) + { + detach ("do_accept"); + return; + } + + m_socket->set_verify_mode (boost::asio::ssl::verify_none); + m_socket->async_handshake (boost::asio::ssl::stream_base::server, + m_strand.wrap (std::bind (&PeerImp::on_accept_ssl, + std::static_pointer_cast (shared_from_this ()), + beast::asio::placeholders::error))); +} + +void +PeerImp::on_accept_ssl (error_code ec) +{ + if (m_detaching || ec == boost::asio::error::operation_aborted) + return; + + if (ec) + { + m_journal.info << + "on_accept_ssl: " << ec.message(); + detach("on_accept_ssl"); + return; + } + +#if RIPPLE_STRUCTURED_OVERLAY_SERVER + on_read_http_detect (error_code(), 0); + +#else + do_protocol_start(); + +#endif +} + +// Called repeatedly with the initial bytes received on the connection +void +PeerImp::on_read_http_detect (error_code ec, std::size_t bytes_transferred) +{ + if (m_detaching || ec == boost::asio::error::operation_aborted) return; if (ec) @@ -475,34 +313,32 @@ PeerImp::on_read_detect (error_code ec, std::size_t bytes_transferred) read_buffer_.commit (bytes_transferred); peer_protocol_detector detector; - auto const is_peer_protocol (detector (read_buffer_.data())); + boost::tribool const is_peer_protocol (detector (read_buffer_.data())); if (is_peer_protocol) { - on_read_protocol (error_code(), 0); + do_protocol_start(); return; } else if (! is_peer_protocol) { http_message_ = boost::in_place (); http_parser_ = boost::in_place (std::ref(*http_message_), true); - on_read_http (error_code(), 0); + on_read_http_request (error_code(), 0); return; } m_socket->async_read_some (read_buffer_.prepare (Tuning::readBufferBytes), - m_strand.wrap (std::bind (&PeerImp::on_read_detect, + m_strand.wrap (std::bind (&PeerImp::on_read_http_detect, shared_from_this(), beast::asio::placeholders::error, beast::asio::placeholders::bytes_transferred))); } +// Called repeatedly with the http request data void -PeerImp::on_read_http (error_code ec, std::size_t bytes_transferred) +PeerImp::on_read_http_request (error_code ec, std::size_t bytes_transferred) { - if (m_detaching) - return; - - if (ec == boost::asio::error::operation_aborted) + if (m_detaching || ec == boost::asio::error::operation_aborted) return; if (! ec) @@ -515,9 +351,35 @@ PeerImp::on_read_http (error_code ec, std::size_t bytes_transferred) read_buffer_.consume (bytes_consumed); if (http_parser_->complete()) { + // // TODO Apply headers to connection state. - // TODO Send response if this is an incoming connection. - on_read_protocol (error_code(), 0); + // + if (http_message_->upgrade()) + { + std::stringstream ss; + ss << + "HTTP/1.1 200 OK\r\n" + "Server: " << BuildInfo::getFullVersionString() << "\r\n" + "Upgrade: Ripple/1.2\r\n" + "Connection: Upgrade\r\n" + "\r\n"; + beast::http::xwrite (write_buffer_, ss.str()); + on_write_http_response(error_code(), 0); + } + else + { + std::stringstream ss; + ss << + "HTTP/1.1 400 Bad Request\r\n" + "Server: " << BuildInfo::getFullVersionString() << "\r\n" + "\r\n" + "" + "400 Bad Request
" + "The server requires an Upgrade request." + ""; + beast::http::xwrite (write_buffer_, ss.str()); + on_write_http_response(error_code(), 0); + } return; } } @@ -526,24 +388,81 @@ PeerImp::on_read_http (error_code ec, std::size_t bytes_transferred) if (ec) { m_journal.info << - "on_read_some: " << ec.message(); - detach("on_read_some"); + "on_read_http_request: " << ec.message(); + detach("on_read_http_request"); return; } m_socket->async_read_some (read_buffer_.prepare (Tuning::readBufferBytes), - m_strand.wrap (std::bind (&PeerImp::on_read_http, + m_strand.wrap (std::bind (&PeerImp::on_read_http_request, shared_from_this(), beast::asio::placeholders::error, beast::asio::placeholders::bytes_transferred))); } +beast::http::basic_message +PeerImp::make_response (beast::http::basic_message const& req) +{ + beast::http::basic_message resp; + // Unimplemented + return resp; +} + +// Called repeatedly to send the bytes in the response +void +PeerImp::on_write_http_response (error_code ec, std::size_t bytes_transferred) +{ + if (m_detaching || ec == boost::asio::error::operation_aborted) + return; + + if (ec) + { + m_journal.info << + "on_write_http_response: " << ec.message(); + detach("on_write_http_response"); + return; + } + + write_buffer_.consume (bytes_transferred); + + if (write_buffer_.size() == 0) + { + do_protocol_start(); + return; + } + + m_socket->async_write_some (write_buffer_.data(), + m_strand.wrap (std::bind (&PeerImp::on_write_http_response, + shared_from_this(), beast::asio::placeholders::error, + beast::asio::placeholders::bytes_transferred))); +} + +//------------------------------------------------------------------------------ + +// Protocol logic + +// We have an encrypted connection to the peer. +// Have it say who it is so we know to avoid redundant connections. +// Establish that it really who we are talking to by having it sign a +// connection detail. Also need to establish no man in the middle attack +// is in progress. +void +PeerImp::do_protocol_start () +{ + if (!sendHello ()) + { + m_journal.error << "Unable to send HELLO to " << m_remoteAddress; + detach ("hello"); + return; + } + + on_read_protocol (error_code(), 0); +} + +// Called repeatedly with protocol message data void PeerImp::on_read_protocol (error_code ec, std::size_t bytes_transferred) { - if (m_detaching) - return; - - if (ec == boost::asio::error::operation_aborted) + if (m_detaching || ec == boost::asio::error::operation_aborted) return; if (! ec) @@ -556,8 +475,8 @@ PeerImp::on_read_protocol (error_code ec, std::size_t bytes_transferred) if (ec) { m_journal.info << - "on_read_some: " << ec.message(); - detach("on_read_some"); + "on_read_protocol: " << ec.message(); + detach("on_read_protocol"); return; } @@ -567,6 +486,16 @@ PeerImp::on_read_protocol (error_code ec, std::size_t bytes_transferred) beast::asio::placeholders::bytes_transferred))); } +// Called repeatedly to send protcol message data +void +PeerImp::on_write_protocol (error_code ec, std::size_t bytes_transferred) +{ + // (this function isn't called yet) + + if (m_detaching || ec == boost::asio::error::operation_aborted) + return; +} + //------------------------------------------------------------------------------ // // abstract_protocol_handler @@ -628,7 +557,7 @@ PeerImp::on_message (std::shared_ptr const& m) bool bDetach (true); - m_timer.cancel (); + timer_.cancel (); std::uint32_t const ourTime (getApp().getOPs ().getNetworkTimeNC ()); std::uint32_t const minTime (ourTime - clockToleranceDeltaSeconds); @@ -1450,4 +1379,388 @@ PeerImp::on_message (std::shared_ptr const& m) //------------------------------------------------------------------------------ +/** A peer has sent us transaction set data */ +// VFALCO TODO Make this non-static +static void peerTXData (Job&, + std::weak_ptr wPeer, + uint256 const& hash, + std::shared_ptr pPacket, + beast::Journal journal) +{ + std::shared_ptr peer = wPeer.lock (); + if (!peer) + return; + + protocol::TMLedgerData& packet = *pPacket; + + std::list nodeIDs; + std::list< Blob > nodeData; + for (int i = 0; i < packet.nodes ().size (); ++i) + { + const protocol::TMLedgerNode& node = packet.nodes (i); + + if (!node.has_nodeid () || !node.has_nodedata () || (node.nodeid ().size () != 33)) + { + journal.warning << "LedgerData request with invalid node ID"; + peer->charge (Resource::feeInvalidRequest); + return; + } + + nodeIDs.push_back (SHAMapNodeID {node.nodeid ().data (), + static_cast(node.nodeid ().size ())}); + nodeData.push_back (Blob (node.nodedata ().begin (), node.nodedata ().end ())); + } + + SHAMapAddNode san; + { + Application::ScopedLockType lock (getApp ().getMasterLock ()); + + san = getApp().getOPs().gotTXData (peer, hash, nodeIDs, nodeData); + } + + if (san.isInvalid ()) + { + peer->charge (Resource::feeUnwantedData); + } +} + +// VFALCO NOTE This function is way too big and cumbersome. +void +PeerImp::getLedger (protocol::TMGetLedger& packet) +{ + SHAMap::pointer map; + protocol::TMLedgerData reply; + bool fatLeaves = true, fatRoot = false; + + if (packet.has_requestcookie ()) + reply.set_requestcookie (packet.requestcookie ()); + + std::string logMe; + + if (packet.itype () == protocol::liTS_CANDIDATE) + { + // Request is for a transaction candidate set + m_journal.trace << "Received request for TX candidate set data " + << to_string (this); + + if ((!packet.has_ledgerhash () || packet.ledgerhash ().size () != 32)) + { + charge (Resource::feeInvalidRequest); + m_journal.warning << "invalid request for TX candidate set data"; + return; + } + + uint256 txHash; + memcpy (txHash.begin (), packet.ledgerhash ().data (), 32); + + { + Application::ScopedLockType lock (getApp ().getMasterLock ()); + map = getApp().getOPs ().getTXMap (txHash); + } + + if (!map) + { + if (packet.has_querytype () && !packet.has_requestcookie ()) + { + m_journal.debug << "Trying to route TX set request"; + + struct get_usable_peers + { + typedef Overlay::PeerSequence return_type; + + Overlay::PeerSequence usablePeers; + uint256 const& txHash; + Peer const* skip; + + get_usable_peers(uint256 const& hash, Peer const* s) + : txHash(hash), skip(s) + { } + + void operator() (Peer::ptr const& peer) + { + if (peer->hasTxSet (txHash) && (peer.get () != skip)) + usablePeers.push_back (peer); + } + + return_type operator() () + { + return usablePeers; + } + }; + + Overlay::PeerSequence usablePeers (m_overlay.foreach ( + get_usable_peers (txHash, this))); + + if (usablePeers.empty ()) + { + m_journal.info << "Unable to route TX set request"; + return; + } + + Peer::ptr const& selectedPeer = usablePeers[rand () % usablePeers.size ()]; + packet.set_requestcookie (getShortId ()); + selectedPeer->send ( + std::make_shared (packet, protocol::mtGET_LEDGER)); + return; + } + + m_journal.error << "We do not have the map our peer wants " + << to_string (this); + + charge (Resource::feeInvalidRequest); + return; + } + + reply.set_ledgerseq (0); + reply.set_ledgerhash (txHash.begin (), txHash.size ()); + reply.set_type (protocol::liTS_CANDIDATE); + fatLeaves = false; // We'll already have most transactions + fatRoot = true; // Save a pass + } + else + { + if (getApp().getFeeTrack().isLoadedLocal() && !m_clusterNode) + { + m_journal.debug << "Too busy to fetch ledger data"; + return; + } + + // Figure out what ledger they want + m_journal.trace << "Received request for ledger data " + << to_string (this); + Ledger::pointer ledger; + + if (packet.has_ledgerhash ()) + { + uint256 ledgerhash; + + if (packet.ledgerhash ().size () != 32) + { + charge (Resource::feeInvalidRequest); + m_journal.warning << "Invalid request"; + return; + } + + memcpy (ledgerhash.begin (), packet.ledgerhash ().data (), 32); + logMe += "LedgerHash:"; + logMe += to_string (ledgerhash); + ledger = getApp().getLedgerMaster ().getLedgerByHash (ledgerhash); + + if (!ledger && m_journal.trace) + m_journal.trace << "Don't have ledger " << ledgerhash; + + if (!ledger && (packet.has_querytype () && !packet.has_requestcookie ())) + { + std::uint32_t seq = 0; + + if (packet.has_ledgerseq ()) + seq = packet.ledgerseq (); + + Overlay::PeerSequence peerList = m_overlay.getActivePeers (); + Overlay::PeerSequence usablePeers; + BOOST_FOREACH (Peer::ptr const& peer, peerList) + { + if (peer->hasLedger (ledgerhash, seq) && (peer.get () != this)) + usablePeers.push_back (peer); + } + + if (usablePeers.empty ()) + { + m_journal.trace << "Unable to route ledger request"; + return; + } + + Peer::ptr const& selectedPeer = usablePeers[rand () % usablePeers.size ()]; + packet.set_requestcookie (getShortId ()); + selectedPeer->send ( + std::make_shared (packet, protocol::mtGET_LEDGER)); + m_journal.debug << "Ledger request routed"; + return; + } + } + else if (packet.has_ledgerseq ()) + { + if (packet.ledgerseq() < getApp().getLedgerMaster().getEarliestFetch()) + { + m_journal.debug << "Peer requests early ledger"; + return; + } + ledger = getApp().getLedgerMaster ().getLedgerBySeq (packet.ledgerseq ()); + if (!ledger && m_journal.debug) + m_journal.debug << "Don't have ledger " << packet.ledgerseq (); + } + else if (packet.has_ltype () && (packet.ltype () == protocol::ltCURRENT)) + { + ledger = getApp().getLedgerMaster ().getCurrentLedger (); + } + else if (packet.has_ltype () && (packet.ltype () == protocol::ltCLOSED) ) + { + ledger = getApp().getLedgerMaster ().getClosedLedger (); + + if (ledger && !ledger->isClosed ()) + ledger = getApp().getLedgerMaster ().getLedgerBySeq (ledger->getLedgerSeq () - 1); + } + else + { + charge (Resource::feeInvalidRequest); + m_journal.warning << "Can't figure out what ledger they want"; + return; + } + + if ((!ledger) || (packet.has_ledgerseq () && (packet.ledgerseq () != ledger->getLedgerSeq ()))) + { + charge (Resource::feeInvalidRequest); + + if (m_journal.warning && ledger) + m_journal.warning << "Ledger has wrong sequence"; + + return; + } + + if (!packet.has_ledgerseq() && (ledger->getLedgerSeq() < getApp().getLedgerMaster().getEarliestFetch())) + { + m_journal.debug << "Peer requests early ledger"; + return; + } + + // Fill out the reply + uint256 lHash = ledger->getHash (); + reply.set_ledgerhash (lHash.begin (), lHash.size ()); + reply.set_ledgerseq (ledger->getLedgerSeq ()); + reply.set_type (packet.itype ()); + + if (packet.itype () == protocol::liBASE) + { + // they want the ledger base data + m_journal.trace << "They want ledger base data"; + Serializer nData (128); + ledger->addRaw (nData); + reply.add_nodes ()->set_nodedata (nData.getDataPtr (), nData.getLength ()); + + SHAMap::pointer map = ledger->peekAccountStateMap (); + + if (map && map->getHash ().isNonZero ()) + { + // return account state root node if possible + Serializer rootNode (768); + + if (map->getRootNode (rootNode, snfWIRE)) + { + reply.add_nodes ()->set_nodedata (rootNode.getDataPtr (), rootNode.getLength ()); + + if (ledger->getTransHash ().isNonZero ()) + { + map = ledger->peekTransactionMap (); + + if (map && map->getHash ().isNonZero ()) + { + rootNode.erase (); + + if (map->getRootNode (rootNode, snfWIRE)) + reply.add_nodes ()->set_nodedata (rootNode.getDataPtr (), rootNode.getLength ()); + } + } + } + } + + Message::pointer oPacket = std::make_shared (reply, protocol::mtLEDGER_DATA); + send (oPacket); + return; + } + + if (packet.itype () == protocol::liTX_NODE) + { + map = ledger->peekTransactionMap (); + logMe += " TX:"; + logMe += to_string (map->getHash ()); + } + else if (packet.itype () == protocol::liAS_NODE) + { + map = ledger->peekAccountStateMap (); + logMe += " AS:"; + logMe += to_string (map->getHash ()); + } + } + + if (!map || (packet.nodeids_size () == 0)) + { + m_journal.warning << "Can't find map or empty request"; + charge (Resource::feeInvalidRequest); + return; + } + + m_journal.trace << "Request: " << logMe; + + for (int i = 0; i < packet.nodeids ().size (); ++i) + { + SHAMapNodeID mn (packet.nodeids (i).data (), packet.nodeids (i).size ()); + + if (!mn.isValid ()) + { + m_journal.warning << "Request for invalid node: " << logMe; + charge (Resource::feeInvalidRequest); + return; + } + + std::vector nodeIDs; + std::list< Blob > rawNodes; + + try + { + if (map->getNodeFat (mn, nodeIDs, rawNodes, fatRoot, fatLeaves)) + { + assert (nodeIDs.size () == rawNodes.size ()); + m_journal.trace << "getNodeFat got " << rawNodes.size () << " nodes"; + std::vector::iterator nodeIDIterator; + std::list< Blob >::iterator rawNodeIterator; + + for (nodeIDIterator = nodeIDs.begin (), rawNodeIterator = rawNodes.begin (); + nodeIDIterator != nodeIDs.end (); ++nodeIDIterator, ++rawNodeIterator) + { + Serializer nID (33); + nodeIDIterator->addIDRaw (nID); + protocol::TMLedgerNode* node = reply.add_nodes (); + node->set_nodeid (nID.getDataPtr (), nID.getLength ()); + node->set_nodedata (&rawNodeIterator->front (), rawNodeIterator->size ()); + } + } + else + m_journal.warning << "getNodeFat returns false"; + } + catch (std::exception&) + { + std::string info; + + if (packet.itype () == protocol::liTS_CANDIDATE) + info = "TS candidate"; + else if (packet.itype () == protocol::liBASE) + info = "Ledger base"; + else if (packet.itype () == protocol::liTX_NODE) + info = "TX node"; + else if (packet.itype () == protocol::liAS_NODE) + info = "AS node"; + + if (!packet.has_ledgerhash ()) + info += ", no hash specified"; + + m_journal.warning << "getNodeFat( " << mn << ") throws exception: " << info; + } + } + + Message::pointer oPacket = std::make_shared (reply, protocol::mtLEDGER_DATA); + send (oPacket); +} + +// This is dispatched by the job queue +static +void +sGetLedger (std::weak_ptr wPeer, + std::shared_ptr packet) +{ + std::shared_ptr peer = wPeer.lock (); + + if (peer) + peer->getLedger (*packet); +} + } // ripple diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index 671a057464..e7f1cee0f9 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ b/src/ripple/overlay/impl/PeerImp.h @@ -23,7 +23,6 @@ #include #include #include -#include #include #include #include @@ -40,7 +39,7 @@ #include #include -#include +#include #include @@ -80,58 +79,6 @@ private: /** The length of the smallest valid finished message */ static const size_t sslMinimumFinishedLength = 12; - //-------------------------------------------------------------------------- - /** We have accepted an inbound connection. - - The connection state transitions from `stateConnect` to `stateConnected` - as `stateConnect`. - */ - void accept () - { - m_journal.info << "Accepted " << m_remoteAddress; - - m_socket->set_verify_mode (boost::asio::ssl::verify_none); - m_socket->async_handshake ( - boost::asio::ssl::stream_base::server, - m_strand.wrap (std::bind ( - &PeerImp::handleStart, - std::static_pointer_cast (shared_from_this ()), - beast::asio::placeholders::error))); - } - - /** Attempt an outbound connection. - - The connection may fail (for a number of reasons) and we do not know - what will happen at this point. - - The connection state does not transition with this function and remains - as `stateConnecting`. - */ - void connect () - { - m_journal.info << "Connecting to " << m_remoteAddress; - - boost::system::error_code err; - - m_timer.expires_from_now (nodeVerifySeconds, err); - - m_timer.async_wait (m_strand.wrap (std::bind ( - &PeerImp::handleVerifyTimer, - shared_from_this (), beast::asio::placeholders::error))); - - if (err) - { - m_journal.error << "Failed to set verify timer."; - detach ("c2"); - return; - } - - m_socket->next_layer ().async_connect ( - beast::IPAddressConversion::to_asio_endpoint (m_remoteAddress), - m_strand.wrap (std::bind (&PeerImp::onConnect, - shared_from_this (), beast::asio::placeholders::error))); - } - public: /** Current state */ enum State @@ -201,7 +148,7 @@ public: std::list m_recentTxSets; mutable std::mutex m_recentLock; - boost::asio::deadline_timer m_timer; + boost::asio::deadline_timer timer_; std::vector m_readBuffer; std::list mSendQ; @@ -217,13 +164,20 @@ public: // True if close was called bool m_was_canceled; + + boost::asio::streambuf read_buffer_; - boost::optional http_message_; - boost::optional http_parser_; + boost::optional http_message_; + boost::optional http_parser_; message_stream message_stream_; + + boost::asio::streambuf write_buffer_; + bool write_pending_; + std::unique_ptr load_event_; //-------------------------------------------------------------------------- + /** New incoming peer from the specified socket */ PeerImp ( NativeSocketType&& socket, @@ -250,10 +204,11 @@ public: , m_clusterNode (false) , m_minLedger (0) , m_maxLedger (0) - , m_timer (m_owned_socket.get_io_service()) + , timer_ (m_owned_socket.get_io_service()) , m_slot (slot) , m_was_canceled (false) , message_stream_(*this) + , write_pending_ (false) { } @@ -288,10 +243,11 @@ public: , m_clusterNode (false) , m_minLedger (0) , m_maxLedger (0) - , m_timer (io_service) + , timer_ (io_service) , m_slot (slot) , m_was_canceled (false) , message_stream_(*this) + , write_pending_ (false) { } @@ -313,22 +269,66 @@ public: void getLedger (protocol::TMGetLedger& packet); +private: // - // i/o + // client role // void - start_read(); + do_connect(); void - on_read_detect (error_code ec, std::size_t bytes_transferred); + on_connect (error_code ec); + + beast::http::basic_message + make_request(); void - on_read_http (error_code ec, std::size_t bytes_transferred); + on_connect_ssl (error_code ec); + + void + on_write_http_request (error_code ec, std::size_t bytes_transferred); + + void + on_read_http_response (error_code ec, std::size_t bytes_transferred); + + // + // server role + // + + void + do_accept(); + + void + on_accept_ssl (error_code ec); + + void + on_read_http_detect (error_code ec, std::size_t bytes_transferred); + + void + on_read_http_request (error_code ec, std::size_t bytes_transferred); + + beast::http::basic_message + make_response (beast::http::basic_message const& req); + + void + on_write_http_response (error_code ec, std::size_t bytes_transferred); + + // + // protocol + // + + void + do_protocol_start(); void on_read_protocol (error_code ec, std::size_t bytes_transferred); + void + on_write_protocol (error_code ec, std::size_t bytes_transferred); + + //-------------------------------------------------------------------------- + //-------------------------------------------------------------------------- // // abstract_protocol_handler @@ -354,6 +354,7 @@ public: on_message_end (std::uint16_t type, std::shared_ptr <::google::protobuf::Message> const& m) override; + // message handlers error_code on_message (std::shared_ptr const& m) override; error_code on_message (std::shared_ptr const& m) override; error_code on_message (std::shared_ptr const& m) override; @@ -372,6 +373,7 @@ public: //-------------------------------------------------------------------------- +public: State state() const { return m_state; @@ -422,7 +424,7 @@ public: mSendQ.clear (); - (void) m_timer.cancel (); + (void) timer_.cancel (); if (graceful) { @@ -450,55 +452,6 @@ public: detach ("stop", graceful); } - /** Outbound connection attempt has completed (not necessarily successfully) - - The connection may fail for a number of reasons. Perhaps we do not have - a route to the remote endpoint, or there is no server listening at that - address. - - If the connection succeeded, we transition to the `stateConnected` state - and move on. - - If the connection failed, we simply disconnect. - - @param ec indicates success or an error code. - */ - void onConnect (boost::system::error_code ec) - { - if (m_detaching) - return; - - NativeSocketType::endpoint_type local_endpoint; - - if (! ec) - local_endpoint = m_socket->this_layer < - NativeSocketType> ().local_endpoint (ec); - - if (ec) - { - // VFALCO NOTE This log statement looks like ass - m_journal.info << - "Connect to " << m_remoteAddress << - " failed: " << ec.message(); - // This should end up calling onPeerClosed() - detach ("hc"); - return; - } - - bassert (m_state == stateConnecting); - m_state = stateConnected; - - m_peerFinder.on_connected (m_slot, - beast::IPAddressConversion::from_asio (local_endpoint)); - - m_socket->set_verify_mode (boost::asio::ssl::verify_none); - m_socket->async_handshake ( - boost::asio::ssl::stream_base::client, - m_strand.wrap (std::bind (&PeerImp::handleStart, - std::static_pointer_cast (shared_from_this ()), - beast::asio::placeholders::error))); - } - /** Indicates that the peer must be activated. A peer is activated after the handshake is completed and if it is not a second connection from a peer that we already have. Once activated @@ -516,9 +469,9 @@ public: void start () { if (m_inbound) - accept (); + do_accept (); else - connect (); + do_connect (); } //-------------------------------------------------------------------------- @@ -773,47 +726,6 @@ private: } } - // We have an encrypted connection to the peer. - // Have it say who it is so we know to avoid redundant connections. - // Establish that it really who we are talking to by having it sign a - // connection detail. Also need to establish no man in the middle attack - // is in progress. - void handleStart (boost::system::error_code const& ec) - { - if (m_detaching) - return; - - if (ec == boost::asio::error::operation_aborted) - return; - - if (ec) - { - m_journal.info << "Handshake: " << ec.message (); - detach ("hs"); - return; - } - - if (m_inbound) - m_usage = m_resourceManager.newInboundEndpoint (m_remoteAddress); - else - m_usage = m_resourceManager.newOutboundEndpoint (m_remoteAddress); - - if (m_usage.disconnect ()) - { - detach ("resource"); - return; - } - - if(!sendHello ()) - { - m_journal.error << "Unable to send HELLO to " << m_remoteAddress; - detach ("hello"); - return; - } - - start_read(); - } - void handleVerifyTimer (boost::system::error_code const& ec) { if (m_detaching) diff --git a/src/ripple/overlay/impl/handshake_analyzer.h b/src/ripple/overlay/impl/handshake_analyzer.h new file mode 100644 index 0000000000..49dd458184 --- /dev/null +++ b/src/ripple/overlay/impl/handshake_analyzer.h @@ -0,0 +1,1318 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_OVERLAY_PEERIMP_H_INCLUDED +#define RIPPLE_OVERLAY_PEERIMP_H_INCLUDED + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// VFALCO This is unfortunate. Comment this out and +// just include what is needed. +#include + +#include +#include +#include + +#include + +namespace ripple { + +typedef boost::asio::ip::tcp::socket NativeSocketType; + +class PeerImp; + +std::string to_string (Peer const& peer); +std::ostream& operator<< (std::ostream& os, Peer const& peer); + +std::string to_string (Peer const* peer); +std::ostream& operator<< (std::ostream& os, Peer const* peer); + +std::string to_string (PeerImp const& peer); +std::ostream& operator<< (std::ostream& os, PeerImp const& peer); + +std::string to_string (PeerImp const* peer); +std::ostream& operator<< (std::ostream& os, PeerImp const* peer); + +//------------------------------------------------------------------------------ + +class PeerImp + : public Peer + , public std::enable_shared_from_this + , private beast::LeakChecked + , private abstract_protocol_handler +{ +private: + /** Time alloted for a peer to send a HELLO message (DEPRECATED) */ + static const boost::posix_time::seconds nodeVerifySeconds; + + /** The clock drift we allow a remote peer to have */ + static const std::uint32_t clockToleranceDeltaSeconds = 20; + + /** The length of the smallest valid finished message */ + static const size_t sslMinimumFinishedLength = 12; + + //-------------------------------------------------------------------------- + /** We have accepted an inbound connection. + + The connection state transitions from `stateConnect` to `stateConnected` + as `stateConnect`. + */ + void accept () + { + m_journal.info << "Accepted " << m_remoteAddress; + + m_socket->set_verify_mode (boost::asio::ssl::verify_none); + m_socket->async_handshake ( + boost::asio::ssl::stream_base::server, + m_strand.wrap (std::bind ( + &PeerImp::handleStart, + std::static_pointer_cast (shared_from_this ()), + beast::asio::placeholders::error))); + } + + /** Attempt an outbound connection. + + The connection may fail (for a number of reasons) and we do not know + what will happen at this point. + + The connection state does not transition with this function and remains + as `stateConnecting`. + */ + void connect () + { + m_journal.info << "Connecting to " << m_remoteAddress; + + boost::system::error_code err; + + m_timer.expires_from_now (nodeVerifySeconds, err); + + m_timer.async_wait (m_strand.wrap (std::bind ( + &PeerImp::handleVerifyTimer, + shared_from_this (), beast::asio::placeholders::error))); + + if (err) + { + m_journal.error << "Failed to set verify timer."; + detach ("c2"); + return; + } + + m_socket->next_layer ().async_connect ( + beast::IPAddressConversion::to_asio_endpoint (m_remoteAddress), + m_strand.wrap (std::bind (&PeerImp::onConnect, + shared_from_this (), beast::asio::placeholders::error))); + } + +public: + /** Current state */ + enum State + { + /** An connection is being established (outbound) */ + stateConnecting + + /** Connection has been successfully established */ + ,stateConnected + + /** Handshake has been received from this peer */ + ,stateHandshaked + + /** Running the Ripple protocol actively */ + ,stateActive + + /** Gracefully closing */ + ,stateGracefulClose + }; + + typedef std::shared_ptr ptr; + + NativeSocketType m_owned_socket; + + beast::Journal m_journal; + + // A unique identifier (up to a restart of rippled) for this particular + // peer instance. A peer that disconnects will, upon reconnection, get a + // new ID. + ShortId m_shortId; + + // Updated at each stage of the connection process to reflect + // the current conditions as closely as possible. This includes + // the case where we learn the true IP via a PROXY handshake. + beast::IP::Endpoint m_remoteAddress; + + // These is up here to prevent warnings about order of initializations + // + Resource::Manager& m_resourceManager; + PeerFinder::Manager& m_peerFinder; + OverlayImpl& m_overlay; + bool m_inbound; + + std::unique_ptr m_socket; + boost::asio::io_service::strand m_strand; + + State m_state; // Current state + bool m_detaching; // True if detaching. + bool m_clusterNode; // True if peer is a node in our cluster + RippleAddress m_nodePublicKey; // Node public key of peer. + std::string m_nodeName; + + // Both sides of the peer calculate this value and verify that it matches + // to detect/prevent man-in-the-middle attacks. + // + uint256 m_secureCookie; + + // The indices of the smallest and largest ledgers this peer has available + // + LedgerIndex m_minLedger; + LedgerIndex m_maxLedger; + + uint256 m_closedLedgerHash; + uint256 m_previousLedgerHash; + + std::list m_recentLedgers; + std::list m_recentTxSets; + mutable std::mutex m_recentLock; + + boost::asio::deadline_timer m_timer; + + std::vector m_readBuffer; + std::list mSendQ; + Message::pointer mSendingPacket; + protocol::TMStatusChange mLastStatus; + protocol::TMHello mHello; + + Resource::Consumer m_usage; + + // The slot assigned to us by PeerFinder + PeerFinder::Slot::ptr m_slot; + + // True if close was called + bool m_was_canceled; + + boost::asio::streambuf read_buffer_; + boost::optional http_message_; + boost::optional http_parser_; + message_stream message_stream_; + std::unique_ptr load_event_; + + //-------------------------------------------------------------------------- + /** New incoming peer from the specified socket */ + PeerImp ( + NativeSocketType&& socket, + beast::IP::Endpoint remoteAddress, + OverlayImpl& overlay, + Resource::Manager& resourceManager, + PeerFinder::Manager& peerFinder, + PeerFinder::Slot::ptr const& slot, + boost::asio::ssl::context& ssl_context, + MultiSocket::Flag flags) + : m_owned_socket (std::move (socket)) + , m_journal (deprecatedLogs().journal("Peer")) + , m_shortId (0) + , m_remoteAddress (remoteAddress) + , m_resourceManager (resourceManager) + , m_peerFinder (peerFinder) + , m_overlay (overlay) + , m_inbound (true) + , m_socket (MultiSocket::New ( + m_owned_socket, ssl_context, flags.asBits ())) + , m_strand (m_owned_socket.get_io_service()) + , m_state (stateConnected) + , m_detaching (false) + , m_clusterNode (false) + , m_minLedger (0) + , m_maxLedger (0) + , m_timer (m_owned_socket.get_io_service()) + , m_slot (slot) + , m_was_canceled (false) + , message_stream_(*this) + { + } + + /** New outgoing peer + @note Construction of outbound peers is a two step process: a second + call is needed (to connect or accept) but we cannot make it from + inside the constructor because you cannot call shared_from_this + from inside constructors. + */ + PeerImp ( + beast::IP::Endpoint remoteAddress, + boost::asio::io_service& io_service, + OverlayImpl& overlay, + Resource::Manager& resourceManager, + PeerFinder::Manager& peerFinder, + PeerFinder::Slot::ptr const& slot, + boost::asio::ssl::context& ssl_context, + MultiSocket::Flag flags) + : m_owned_socket (io_service) + , m_journal (deprecatedLogs().journal("Peer")) + , m_shortId (0) + , m_remoteAddress (remoteAddress) + , m_resourceManager (resourceManager) + , m_peerFinder (peerFinder) + , m_overlay (overlay) + , m_inbound (false) + , m_socket (MultiSocket::New ( + io_service, ssl_context, flags.asBits ())) + , m_strand (io_service) + , m_state (stateConnecting) + , m_detaching (false) + , m_clusterNode (false) + , m_minLedger (0) + , m_maxLedger (0) + , m_timer (io_service) + , m_slot (slot) + , m_was_canceled (false) + , message_stream_(*this) + { + } + + virtual + ~PeerImp () + { + m_overlay.remove (m_slot); + } + + PeerImp (PeerImp const&) = delete; + PeerImp& operator= (PeerImp const&) = delete; + + MultiSocket& getStream () + { + return *m_socket; + } + + static char const* getCountedObjectName () { return "Peer"; } + + void getLedger (protocol::TMGetLedger& packet); + + // + // i/o + // + + void + start_read(); + + void + on_read_detect (error_code ec, std::size_t bytes_transferred); + + void + on_read_http (error_code ec, std::size_t bytes_transferred); + + void + on_read_protocol (error_code ec, std::size_t bytes_transferred); + + //-------------------------------------------------------------------------- + // + // abstract_protocol_handler + // + //-------------------------------------------------------------------------- + + static + error_code + invalid_argument_error() + { + return boost::system::errc::make_error_code ( + boost::system::errc::invalid_argument); + } + + error_code + on_message_unknown (std::uint16_t type) override; + + error_code + on_message_begin (std::uint16_t type, + std::shared_ptr <::google::protobuf::Message> const& m) override; + + void + on_message_end (std::uint16_t type, + std::shared_ptr <::google::protobuf::Message> const& m) override; + + error_code on_message (std::shared_ptr const& m) override; + error_code on_message (std::shared_ptr const& m) override; + error_code on_message (std::shared_ptr const& m) override; + error_code on_message (std::shared_ptr const& m) override; + error_code on_message (std::shared_ptr const& m) override; + error_code on_message (std::shared_ptr const& m) override; + error_code on_message (std::shared_ptr const& m) override; + error_code on_message (std::shared_ptr const& m) override; + error_code on_message (std::shared_ptr const& m) override; + error_code on_message (std::shared_ptr const& m) override; + error_code on_message (std::shared_ptr const& m) override; + error_code on_message (std::shared_ptr const& m) override; + error_code on_message (std::shared_ptr const& m) override; + error_code on_message (std::shared_ptr const& m) override; + error_code on_message (std::shared_ptr const& m) override; + + //-------------------------------------------------------------------------- + + State state() const + { + return m_state; + } + + void state (State new_state) + { + m_state = new_state; + } + + //-------------------------------------------------------------------------- + /** Disconnect a peer + + The peer transitions from its current state into `stateGracefulClose` + + @param rsn a code indicating why the peer was disconnected + @param onIOStrand true if called on an I/O strand. It if is not, then + a callback will be queued up. + */ + void detach (const char* rsn, bool graceful = true) + { + if (! m_strand.running_in_this_thread ()) + { + m_strand.post (std::bind (&PeerImp::detach, + shared_from_this (), rsn, graceful)); + return; + } + + if (!m_detaching) + { + // NIKB TODO No - a race is NOT ok. This needs to be fixed + // to have PeerFinder work reliably. + m_detaching = true; // Race is ok. + + if (m_was_canceled) + m_peerFinder.on_cancel (m_slot); + else + m_peerFinder.on_closed (m_slot); + + if (m_state == stateActive) + m_overlay.onPeerDisconnect (shared_from_this ()); + + m_state = stateGracefulClose; + + if (m_clusterNode && m_journal.active(beast::Journal::Severity::kWarning)) + m_journal.warning << "Cluster peer " << m_nodeName << + " detached: " << rsn; + + mSendQ.clear (); + + (void) m_timer.cancel (); + + if (graceful) + { + m_socket->async_shutdown ( + m_strand.wrap ( std::bind( + &PeerImp::handleShutdown, + std::static_pointer_cast (shared_from_this ()), + beast::asio::placeholders::error))); + } + else + { + m_socket->cancel (); + } + + // VFALCO TODO Stop doing this. + if (m_nodePublicKey.isValid ()) + m_nodePublicKey.clear (); // Be idempotent. + } + } + + /** Close the connection. */ + void close (bool graceful) + { + m_was_canceled = true; + detach ("stop", graceful); + } + + /** Outbound connection attempt has completed (not necessarily successfully) + + The connection may fail for a number of reasons. Perhaps we do not have + a route to the remote endpoint, or there is no server listening at that + address. + + If the connection succeeded, we transition to the `stateConnected` state + and move on. + + If the connection failed, we simply disconnect. + + @param ec indicates success or an error code. + */ + void onConnect (boost::system::error_code ec) + { + if (m_detaching) + return; + + NativeSocketType::endpoint_type local_endpoint; + + if (! ec) + local_endpoint = m_socket->this_layer < + NativeSocketType> ().local_endpoint (ec); + + if (ec) + { + // VFALCO NOTE This log statement looks like ass + m_journal.info << + "Connect to " << m_remoteAddress << + " failed: " << ec.message(); + // This should end up calling onPeerClosed() + detach ("hc"); + return; + } + + bassert (m_state == stateConnecting); + m_state = stateConnected; + + m_peerFinder.on_connected (m_slot, + beast::IPAddressConversion::from_asio (local_endpoint)); + + m_socket->set_verify_mode (boost::asio::ssl::verify_none); + m_socket->async_handshake ( + boost::asio::ssl::stream_base::client, + m_strand.wrap (std::bind (&PeerImp::handleStart, + std::static_pointer_cast (shared_from_this ()), + beast::asio::placeholders::error))); + } + + /** Indicates that the peer must be activated. + A peer is activated after the handshake is completed and if it is not + a second connection from a peer that we already have. Once activated + the peer transitions to `stateActive` and begins operating. + */ + void activate () + { + bassert (m_state == stateHandshaked); + m_state = stateActive; + bassert(m_shortId == 0); + m_shortId = m_overlay.next_id(); + m_overlay.onPeerActivated(shared_from_this ()); + } + + void start () + { + if (m_inbound) + accept (); + else + connect (); + } + + //-------------------------------------------------------------------------- + std::string getClusterNodeName() const + { + return m_nodeName; + } + + //-------------------------------------------------------------------------- + + void + send (Message::pointer const& m) override + { + if (m) + { + if (m_strand.running_in_this_thread()) + { + if (mSendingPacket) + mSendQ.push_back (m); + else + sendForce (m); + } + else + { + m_strand.post (std::bind (&PeerImp::send, shared_from_this(), m)); + } + + } + } + + void sendGetPeers () + { + // Ask peer for known other peers. + protocol::TMGetPeers msg; + + msg.set_doweneedthis (1); + + Message::pointer packet = std::make_shared ( + msg, protocol::mtGET_PEERS); + + send (packet); + } + + void charge (Resource::Charge const& fee) + { + if ((m_usage.charge (fee) == Resource::drop) && m_usage.disconnect ()) + detach ("resource"); + } + + static void charge (std::weak_ptr & peer, Resource::Charge const& fee) + { + Peer::ptr p (peer.lock()); + + if (p != nullptr) + p->charge (fee); + } + + Json::Value json () + { + Json::Value ret (Json::objectValue); + + ret["public_key"] = m_nodePublicKey.ToString (); + ret["address"] = m_remoteAddress.to_string(); + + if (m_inbound) + ret["inbound"] = true; + + if (m_clusterNode) + { + ret["cluster"] = true; + + if (!m_nodeName.empty ()) + ret["name"] = m_nodeName; + } + + if (mHello.has_fullversion ()) + ret["version"] = mHello.fullversion (); + + if (mHello.has_protoversion () && + (mHello.protoversion () != BuildInfo::getCurrentProtocol().toPacked ())) + { + ret["protocol"] = BuildInfo::Protocol (mHello.protoversion ()).toStdString (); + } + + std::uint32_t minSeq, maxSeq; + ledgerRange(minSeq, maxSeq); + + if ((minSeq != 0) || (maxSeq != 0)) + ret["complete_ledgers"] = boost::lexical_cast(minSeq) + " - " + + boost::lexical_cast(maxSeq); + + if (!!m_closedLedgerHash) + ret["ledger"] = to_string (m_closedLedgerHash); + + if (mLastStatus.has_newstatus ()) + { + switch (mLastStatus.newstatus ()) + { + case protocol::nsCONNECTING: + ret["status"] = "connecting"; + break; + + case protocol::nsCONNECTED: + ret["status"] = "connected"; + break; + + case protocol::nsMONITORING: + ret["status"] = "monitoring"; + break; + + case protocol::nsVALIDATING: + ret["status"] = "validating"; + break; + + case protocol::nsSHUTTING: + ret["status"] = "shutting"; + break; + + default: + // FIXME: do we really want this? + m_journal.warning << "Unknown status: " << mLastStatus.newstatus (); + } + } + + return ret; + } + + bool isInCluster () const + { + return m_clusterNode; + } + + uint256 const& getClosedLedgerHash () const + { + return m_closedLedgerHash; + } + + bool hasLedger (uint256 const& hash, std::uint32_t seq) const + { + std::lock_guard sl(m_recentLock); + + if ((seq != 0) && (seq >= m_minLedger) && (seq <= m_maxLedger)) + return true; + + BOOST_FOREACH (uint256 const & ledger, m_recentLedgers) + { + if (ledger == hash) + return true; + } + + return false; + } + + void ledgerRange (std::uint32_t& minSeq, std::uint32_t& maxSeq) const + { + std::lock_guard sl(m_recentLock); + + minSeq = m_minLedger; + maxSeq = m_maxLedger; + } + + bool hasTxSet (uint256 const& hash) const + { + std::lock_guard sl(m_recentLock); + BOOST_FOREACH (uint256 const & set, m_recentTxSets) + + if (set == hash) + return true; + + return false; + } + + Peer::ShortId getShortId () const + { + return m_shortId; + } + + const RippleAddress& getNodePublic () const + { + return m_nodePublicKey; + } + + void cycleStatus () + { + m_previousLedgerHash = m_closedLedgerHash; + m_closedLedgerHash.zero (); + } + + bool supportsVersion (int version) + { + return mHello.has_protoversion () && (mHello.protoversion () >= version); + } + + bool hasRange (std::uint32_t uMin, std::uint32_t uMax) + { + return (uMin >= m_minLedger) && (uMax <= m_maxLedger); + } + + beast::IP::Endpoint getRemoteAddress() const + { + return m_remoteAddress; + } + +private: + void handleShutdown (boost::system::error_code const& ec) + { + if (m_detaching) + return; + + if (ec == boost::asio::error::operation_aborted) + return; + + if (ec) + { + m_journal.info << "Shutdown: " << ec.message (); + detach ("hsd"); + return; + } + } + + void handleWrite (boost::system::error_code const& ec, size_t bytes) + { + if (m_detaching) + return; + + // Call on IO strand + + mSendingPacket.reset (); + + if (ec == boost::asio::error::operation_aborted) + return; + + if (m_detaching) + return; + + if (ec) + { + m_journal.info << "Write: " << ec.message (); + detach ("hw"); + return; + } + + if (!mSendQ.empty ()) + { + Message::pointer packet = mSendQ.front (); + + if (packet) + { + sendForce (packet); + mSendQ.pop_front (); + } + } + } + + // We have an encrypted connection to the peer. + // Have it say who it is so we know to avoid redundant connections. + // Establish that it really who we are talking to by having it sign a + // connection detail. Also need to establish no man in the middle attack + // is in progress. + void handleStart (boost::system::error_code const& ec) + { + if (m_detaching) + return; + + if (ec == boost::asio::error::operation_aborted) + return; + + if (ec) + { + m_journal.info << "Handshake: " << ec.message (); + detach ("hs"); + return; + } + + if (m_inbound) + m_usage = m_resourceManager.newInboundEndpoint (m_remoteAddress); + else + m_usage = m_resourceManager.newOutboundEndpoint (m_remoteAddress); + + if (m_usage.disconnect ()) + { + detach ("resource"); + return; + } + + if(!sendHello ()) + { + m_journal.error << "Unable to send HELLO to " << m_remoteAddress; + detach ("hello"); + return; + } + + start_read(); + } + + void handleVerifyTimer (boost::system::error_code const& ec) + { + if (m_detaching) + return; + + if (ec == boost::asio::error::operation_aborted) + { + // Timer canceled because deadline no longer needed. + } + else if (ec) + { + m_journal.info << "Peer verify timer error"; + } + else + { + // m_journal.info << "Verify: Peer failed to verify in time."; + + detach ("hvt"); + } + } + + void sendForce (const Message::pointer& packet) + { + // must be on IO strand + if (!m_detaching) + { + mSendingPacket = packet; + + boost::asio::async_write (getStream (), + boost::asio::buffer (packet->getBuffer ()), + m_strand.wrap (std::bind ( + &PeerImp::handleWrite, + std::static_pointer_cast (shared_from_this ()), + beast::asio::placeholders::error, + beast::asio::placeholders::bytes_transferred))); + } + } + + /** Hashes the latest finished message from an SSL stream + + @param sslSession the session to get the message from. + @param hash the buffer into which the hash of the retrieved + message will be saved. The buffer MUST be at least + 64 bytes long. + @param getMessage a pointer to the function to call to retrieve the + finished message. This be either: + `SSL_get_finished` or + `SSL_get_peer_finished`. + + @return `true` if successful, `false` otherwise. + + */ + bool hashLatestFinishedMessage (const SSL *sslSession, unsigned char *hash, + size_t (*getFinishedMessage)(const SSL *, void *buf, size_t)) + { + unsigned char buf[1024]; + + // Get our finished message and hash it. + std::memset(hash, 0, 64); + + size_t len = getFinishedMessage (sslSession, buf, sizeof (buf)); + + if(len < sslMinimumFinishedLength) + return false; + + SHA512 (buf, len, hash); + + return true; + } + + /** Generates a secure cookie to protect against man-in-the-middle attacks + + This function should never fail under normal circumstances and regular + server operation. + + A failure prevents the cookie value from being calculated which is an + important component of connection security. If this function fails, a + secure connection cannot be established and the link MUST be dropped. + + @return `true` if the cookie was generated, `false` otherwise. + + @note failure is an exceptional situation - it should never happen and + will almost always indicate an active man-in-the-middle attack is + taking place. + */ + bool calculateSessionCookie () + { + SSL* ssl = m_socket->ssl_handle (); + + if (!ssl) + { + m_journal.error << "Cookie generation: No underlying connection"; + return false; + } + + unsigned char sha1[64]; + unsigned char sha2[64]; + + if (!hashLatestFinishedMessage(ssl, sha1, SSL_get_finished)) + { + m_journal.error << "Cookie generation: local setup not complete"; + return false; + } + + if (!hashLatestFinishedMessage(ssl, sha2, SSL_get_peer_finished)) + { + m_journal.error << "Cookie generation: peer setup not complete"; + return false; + } + + // If both messages hash to the same value (i.e. match) something is + // wrong. This would cause the resulting cookie to be 0. + if (memcmp (sha1, sha2, sizeof (sha1)) == 0) + { + m_journal.error << "Cookie generation: identical finished messages"; + return false; + } + + for (size_t i = 0; i < sizeof (sha1); ++i) + sha1[i] ^= sha2[i]; + + // Finally, derive the actual cookie for the values that we have + // calculated. + m_secureCookie = Serializer::getSHA512Half (sha1, sizeof(sha1)); + + return true; + } + + /** Perform a secure handshake with the peer at the other end. + + If this function returns false then we cannot guarantee that there + is no active man-in-the-middle attack taking place and the link + MUST be disconnected. + + @return true if successful, false otherwise. + */ + bool sendHello () + { + if (!calculateSessionCookie()) + return false; + + Blob vchSig; + getApp().getLocalCredentials ().getNodePrivate ().signNodePrivate (m_secureCookie, vchSig); + + protocol::TMHello h; + + h.set_protoversion (BuildInfo::getCurrentProtocol().toPacked ()); + h.set_protoversionmin (BuildInfo::getMinimumProtocol().toPacked ()); + h.set_fullversion (BuildInfo::getFullVersionString ()); + h.set_nettime (getApp().getOPs ().getNetworkTimeNC ()); + h.set_nodepublic (getApp().getLocalCredentials ().getNodePublic ().humanNodePublic ()); + h.set_nodeproof (&vchSig[0], vchSig.size ()); + h.set_ipv4port (getConfig ().peerListeningPort); + h.set_testnet (false); + + // We always advertise ourselves as private in the HELLO message. This + // suppresses the old peer advertising code and allows PeerFinder to + // take over the functionality. + h.set_nodeprivate (true); + + Ledger::pointer closedLedger = getApp().getLedgerMaster ().getClosedLedger (); + + if (closedLedger && closedLedger->isClosed ()) + { + uint256 hash = closedLedger->getHash (); + h.set_ledgerclosed (hash.begin (), hash.size ()); + hash = closedLedger->getParentHash (); + h.set_ledgerprevious (hash.begin (), hash.size ()); + } + + Message::pointer packet = std::make_shared ( + h, protocol::mtHELLO); + send (packet); + + return true; + } + + void addLedger (uint256 const& hash) + { + std::lock_guard sl(m_recentLock); + BOOST_FOREACH (uint256 const & ledger, m_recentLedgers) + + if (ledger == hash) + return; + + if (m_recentLedgers.size () == 128) + m_recentLedgers.pop_front (); + + m_recentLedgers.push_back (hash); + } + + void addTxSet (uint256 const& hash) + { + std::lock_guard sl(m_recentLock); + + if(std::find (m_recentTxSets.begin (), m_recentTxSets.end (), hash) != m_recentTxSets.end ()) + return; + + if (m_recentTxSets.size () == 128) + m_recentTxSets.pop_front (); + + m_recentTxSets.push_back (hash); + } + + void doFetchPack (const std::shared_ptr& packet) + { + // VFALCO TODO Invert this dependency using an observer and shared state object. + // Don't queue fetch pack jobs if we're under load or we already have + // some queued. + if (getApp().getFeeTrack ().isLoadedLocal () || + (getApp().getLedgerMaster().getValidatedLedgerAge() > 40) || + (getApp().getJobQueue().getJobCount(jtPACK) > 10)) + { + m_journal.info << "Too busy to make fetch pack"; + return; + } + + if (packet->ledgerhash ().size () != 32) + { + m_journal.warning << "FetchPack hash size malformed"; + charge (Resource::feeInvalidRequest); + return; + } + + uint256 hash; + memcpy (hash.begin (), packet->ledgerhash ().data (), 32); + + getApp().getJobQueue ().addJob (jtPACK, "MakeFetchPack", + std::bind (&NetworkOPs::makeFetchPack, &getApp().getOPs (), std::placeholders::_1, + std::weak_ptr (shared_from_this ()), packet, + hash, UptimeTimer::getInstance ().getElapsedSeconds ())); + } + + void doProofOfWork (Job&, std::weak_ptr peer, ProofOfWork::pointer pow) + { + if (peer.expired ()) + return; + + uint256 solution = pow->solve (); + + if (solution.isZero ()) + { + m_journal.warning << "Failed to solve proof of work"; + } + else + { + Peer::ptr pptr (peer.lock ()); + + if (pptr) + { + protocol::TMProofWork reply; + reply.set_token (pow->getToken ()); + reply.set_response (solution.begin (), solution.size ()); + pptr->send (std::make_shared (reply, protocol::mtPROOFOFWORK)); + } + else + { + // WRITEME: Save solved proof of work for new connection + } + } + } + + static void checkTransaction (Job&, int flags, SerializedTransaction::pointer stx, std::weak_ptr peer) + { + #ifndef TRUST_NETWORK + try + { + #endif + + if (stx->isFieldPresent(sfLastLedgerSequence) && + (stx->getFieldU32 (sfLastLedgerSequence) < + getApp().getLedgerMaster().getValidLedgerIndex())) + { // Transaction has expired + getApp().getHashRouter().setFlag(stx->getTransactionID(), SF_BAD); + charge (peer, Resource::feeUnwantedData); + return; + } + + bool const needCheck = !(flags & SF_SIGGOOD); + Transaction::pointer tx = + std::make_shared (stx, needCheck); + + if (tx->getStatus () == INVALID) + { + getApp().getHashRouter ().setFlag (stx->getTransactionID (), SF_BAD); + charge (peer, Resource::feeInvalidSignature); + return; + } + else + getApp().getHashRouter ().setFlag (stx->getTransactionID (), SF_SIGGOOD); + + bool const trusted (flags & SF_TRUSTED); + getApp().getOPs ().processTransaction (tx, trusted, false, false); + + #ifndef TRUST_NETWORK + } + catch (...) + { + getApp().getHashRouter ().setFlag (stx->getTransactionID (), SF_BAD); + charge (peer, Resource::feeInvalidRequest); + } + + #endif + } + + // Called from our JobQueue + static void checkPropose (Job& job, Overlay* pPeers, std::shared_ptr packet, + LedgerProposal::pointer proposal, uint256 consensusLCL, RippleAddress nodePublic, + std::weak_ptr peer, bool fromCluster) + { + bool sigGood = false; + bool isTrusted = (job.getType () == jtPROPOSAL_t); + + WriteLog (lsTRACE, Peer) << "Checking " << + (isTrusted ? "trusted" : "UNTRUSTED") << + " proposal"; + + assert (packet); + protocol::TMProposeSet& set = *packet; + + uint256 prevLedger; + + if (set.has_previousledger ()) + { + // proposal includes a previous ledger + WriteLog(lsTRACE, Peer) << "proposal with previous ledger"; + memcpy (prevLedger.begin (), set.previousledger ().data (), 256 / 8); + + if (!fromCluster && !proposal->checkSign (set.signature ())) + { + Peer::ptr p = peer.lock (); + WriteLog(lsWARNING, Peer) << "proposal with previous ledger fails sig check: " << + *p; + charge (peer, Resource::feeInvalidSignature); + return; + } + else + sigGood = true; + } + else + { + if (consensusLCL.isNonZero () && proposal->checkSign (set.signature ())) + { + prevLedger = consensusLCL; + sigGood = true; + } + else + { + // Could be mismatched prev ledger + WriteLog(lsWARNING, Peer) << "Ledger proposal fails signature check"; + proposal->setSignature (set.signature ()); + } + } + + if (isTrusted) + { + getApp().getOPs ().processTrustedProposal (proposal, packet, nodePublic, prevLedger, sigGood); + } + else if (sigGood && (prevLedger == consensusLCL)) + { + // relay untrusted proposal + WriteLog(lsTRACE, Peer) << "relaying UNTRUSTED proposal"; + std::set peers; + + if (getApp().getHashRouter ().swapSet ( + proposal->getSuppressionID (), peers, SF_RELAYED)) + { + pPeers->foreach (send_if_not ( + std::make_shared (set, protocol::mtPROPOSE_LEDGER), + peer_in_set(peers))); + } + } + else + { + WriteLog(lsDEBUG, Peer) << "Not relaying UNTRUSTED proposal"; + } + } + + static void checkValidation (Job&, Overlay* pPeers, SerializedValidation::pointer val, bool isTrusted, bool isCluster, + std::shared_ptr packet, std::weak_ptr peer) + { + #ifndef TRUST_NETWORK + + try + #endif + { + uint256 signingHash = val->getSigningHash(); + if (!isCluster && !val->isValid (signingHash)) + { + WriteLog(lsWARNING, Peer) << "Validation is invalid"; + charge (peer, Resource::feeInvalidRequest); + return; + } + + std::string source; + Peer::ptr lp = peer.lock (); + + if (lp) + source = to_string(*lp); + else + source = "unknown"; + + std::set peers; + + //---------------------------------------------------------------------- + // + { + SerializedValidation const& sv (*val); + Validators::ReceivedValidation rv; + rv.ledgerHash = sv.getLedgerHash (); + rv.publicKey = sv.getSignerPublic(); + getApp ().getValidators ().receiveValidation (rv); + } + // + //---------------------------------------------------------------------- + + if (getApp().getOPs ().recvValidation (val, source) && + getApp().getHashRouter ().swapSet (signingHash, peers, SF_RELAYED)) + { + pPeers->foreach (send_if_not ( + std::make_shared (*packet, protocol::mtVALIDATION), + peer_in_set(peers))); + } + } + + #ifndef TRUST_NETWORK + catch (...) + { + WriteLog(lsTRACE, Peer) << "Exception processing validation"; + charge (peer, Resource::feeInvalidRequest); + } + #endif + } +}; + +//------------------------------------------------------------------------------ + +const boost::posix_time::seconds PeerImp::nodeVerifySeconds (15); + +//------------------------------------------------------------------------------ + +// to_string should not be used we should just use lexical_cast maybe + +inline std::string to_string (PeerImp const& peer) +{ + if (peer.isInCluster()) + return peer.getClusterNodeName(); + + return peer.getRemoteAddress().to_string(); +} + +inline std::string to_string (PeerImp const* peer) +{ + return to_string (*peer); +} + +inline std::ostream& operator<< (std::ostream& os, PeerImp const& peer) +{ + os << to_string (peer); + + return os; +} + +inline std::ostream& operator<< (std::ostream& os, PeerImp const* peer) +{ + os << to_string (peer); + + return os; +} + +//------------------------------------------------------------------------------ + +inline std::string to_string (Peer const& peer) +{ + if (peer.isInCluster()) + return peer.getClusterNodeName(); + + return peer.getRemoteAddress().to_string(); +} + +inline std::string to_string (Peer const* peer) +{ + return to_string (*peer); +} + +inline std::ostream& operator<< (std::ostream& os, Peer const& peer) +{ + os << to_string (peer); + + return os; +} + +inline std::ostream& operator<< (std::ostream& os, Peer const* peer) +{ + os << to_string (peer); + + return os; +} + +} + +#endif diff --git a/src/ripple/overlay/impl/message_name.cpp b/src/ripple/overlay/impl/message_name.cpp index 13b8b5c727..15990e0bf8 100644 --- a/src/ripple/overlay/impl/message_name.cpp +++ b/src/ripple/overlay/impl/message_name.cpp @@ -17,9 +17,6 @@ */ //============================================================================== -#ifndef RIPPLE_OVERLAY_MESSAGE_NAME_H_INCLUDED -#define RIPPLE_OVERLAY_MESSAGE_NAME_H_INCLUDED - namespace ripple { char const* @@ -49,5 +46,3 @@ protocol_message_name (int type) } } - -#endif diff --git a/src/ripple/overlay/impl/peer_info.h b/src/ripple/overlay/impl/peer_info.h new file mode 100644 index 0000000000..3d3730b022 --- /dev/null +++ b/src/ripple/overlay/impl/peer_info.h @@ -0,0 +1,36 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_OVERLAY_PEER_INFO_H_INCLUDED +#define RIPPLE_OVERLAY_PEER_INFO_H_INCLUDED + +#include + +namespace ripple { + +struct parsed_request +{ + int version_major; + int version_minor; +}; + + +} + +#endif diff --git a/src/ripple/overlay/impl/peer_protocol_detector.h b/src/ripple/overlay/impl/peer_protocol_detector.h index 7976fea3b1..f14e89f7c8 100644 --- a/src/ripple/overlay/impl/peer_protocol_detector.h +++ b/src/ripple/overlay/impl/peer_protocol_detector.h @@ -39,30 +39,35 @@ public: */ template boost::tribool - operator() (ConstBufferSequence const& buffers) - { - std::array data; - auto const n (boost::asio::buffer_copy ( - boost::asio::buffer(data), buffers)); - /* - Protocol messages are framed by a 6 byte header which includes - a big-endian 4-byte length followed by a big-endian 2-byte type. - The type for 'hello' is 1. - */ - if (n>=1 && data[0] != 0) - return false; - if (n>=2 && data[1] != 0) - return false; - if (n>=5 && data[4] != 0) - return false; - if (n>=6 && data[5] != 1) - return false; - if (n>=6) - return true; - return boost::indeterminate; - } + operator() (ConstBufferSequence const& buffers); }; +template +boost::tribool +peer_protocol_detector::operator() ( + ConstBufferSequence const& buffers) +{ + std::array data; + auto const n (boost::asio::buffer_copy ( + boost::asio::buffer(data), buffers)); + /* + Protocol messages are framed by a 6 byte header which includes + a big-endian 4-byte length followed by a big-endian 2-byte type. + The type for 'hello' is 1. + */ + if (n>=1 && data[0] != 0) + return false; + if (n>=2 && data[1] != 0) + return false; + if (n>=5 && data[4] != 0) + return false; + if (n>=6 && data[5] != 1) + return false; + if (n>=6) + return true; + return boost::indeterminate; +} + } // ripple #endif diff --git a/src/ripple/overlay/tests/peer_info.test.cpp b/src/ripple/overlay/tests/peer_info.test.cpp new file mode 100644 index 0000000000..7eb470c917 --- /dev/null +++ b/src/ripple/overlay/tests/peer_info.test.cpp @@ -0,0 +1,68 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include +#include +#include +#include +#include +#include + +namespace ripple { + +class peer_info_test : public beast::unit_test::suite +{ +public: + // Parse a comma delimited list of strings + // Leading and trailing whitespace is removed from each element + static + std::vector + parse_list (std::string const& value) + { + std::vector list; + auto first (value.begin()); + auto last (value.end()); + for(;;) + { + auto const found (std::find (first, last, ',')); + if (found != first) + { + auto p0 (first); + auto p1 (found - 1); + + } + if (found == last) + break; + first = found + 1; + } + + return list; + } + + void + run() + { + expect (beast::http::rfc2616::trim("x") == "x"); + } +}; + +BEAST_DEFINE_TESTSUITE(peer_info,overlay,ripple); + +} // ripple + diff --git a/src/ripple/unity/overlay.cpp b/src/ripple/unity/overlay.cpp index 67d075be68..2436d9f922 100644 --- a/src/ripple/unity/overlay.cpp +++ b/src/ripple/unity/overlay.cpp @@ -25,3 +25,5 @@ #include #include +#include +