diff --git a/Builds/VisualStudio2013/RippleD.vcxproj b/Builds/VisualStudio2013/RippleD.vcxproj index e29fcb22e8..438586dd3b 100644 --- a/Builds/VisualStudio2013/RippleD.vcxproj +++ b/Builds/VisualStudio2013/RippleD.vcxproj @@ -2499,18 +2499,9 @@ - - True - - True - - - - - True @@ -2521,6 +2512,8 @@ + + True diff --git a/Builds/VisualStudio2013/RippleD.vcxproj.filters b/Builds/VisualStudio2013/RippleD.vcxproj.filters index 0c505c2356..9b636aa708 100644 --- a/Builds/VisualStudio2013/RippleD.vcxproj.filters +++ b/Builds/VisualStudio2013/RippleD.vcxproj.filters @@ -3522,21 +3522,9 @@ ripple\nodestore - - ripple\overlay\impl - ripple\overlay\impl - - ripple\overlay\impl - - - ripple\overlay\impl - - - ripple\overlay\impl - ripple\overlay\impl @@ -3549,6 +3537,9 @@ ripple\overlay\impl + + ripple\overlay\impl + ripple\overlay\impl diff --git a/src/ripple/overlay/Message.h b/src/ripple/overlay/Message.h index a4491a7c3a..32f4929cc7 100644 --- a/src/ripple/overlay/Message.h +++ b/src/ripple/overlay/Message.h @@ -21,8 +21,13 @@ #define RIPPLE_OVERLAY_MESSAGE_H_INCLUDED #include "ripple.pb.h" - +#include +#include +#include +#include +#include #include +#include // namespace ripple { @@ -62,13 +67,84 @@ public: bool operator == (Message const& other) const; /** Calculate the length of a packed message. */ + /** @{ */ static unsigned getLength (std::vector const& buf); + template + static + std::enable_if_t::value, std::size_t> + size (FwdIter first, FwdIter last) + { + if (std::distance(first, last) < + Message::kHeaderBytes) + return 0; + std::size_t n; + n = std::size_t{*first++} << 24; + n += std::size_t{*first++} << 16; + n += std::size_t{*first++} << 8; + n += std::size_t{*first}; + return n; + } + + template + static + std::size_t + size (BufferSequence const& buffers) + { + return size(buffers_begin(buffers), + buffers_end(buffers)); + } + /** @} */ + /** Determine the type of a packed message. */ + /** @{ */ static int getType (std::vector const& buf); + template + static + std::enable_if_t::value, int> + type (FwdIter first, FwdIter last) + { + if (std::distance(first, last) < + Message::kHeaderBytes) + return 0; + return (int{*std::next(first, 4)} << 8) | + *std::next(first, 5); + } + + template + static + int + type (BufferSequence const& buffers) + { + return type(buffers_begin(buffers), + buffers_end(buffers)); + } + + /** @} */ + private: - // Encodes the size and type into a header at the beginning of buf + template + static + boost::asio::buffers_iterator + buffers_begin (BufferSequence const& buffers) + { + return boost::asio::buffers_iterator< + BufferSequence, Value>::begin (buffers); + } + + template + static + boost::asio::buffers_iterator + buffers_end (BufferSequence const& buffers) + { + return boost::asio::buffers_iterator< + BufferSequence, Value>::end (buffers); + } + + // Encodes the size and type into a header at the beginning of buf // void encodeHeader (unsigned size, int type); diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 681100e1a5..9cf3be0c90 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -62,7 +62,6 @@ PeerImp::PeerImp (id_t id, endpoint_type remote_endpoint, , usage_(consumer) , slot_ (slot) , http_message_(std::move(request)) - , message_stream_(*this) { } @@ -87,7 +86,6 @@ PeerImp::PeerImp (id_t id, beast::IP::Endpoint remoteAddress, , m_inbound (false) , state_ (State::connecting) , slot_ (slot) - , message_stream_(*this) { } @@ -911,10 +909,20 @@ PeerImp::onReadMessage (error_code ec, std::size_t bytes_transferred) } read_buffer_.commit (bytes_transferred); - ec = message_stream_.write (read_buffer_.data()); - read_buffer_.consume (read_buffer_.size()); - if(ec) - return fail("onReadMessage", ec); + + while (read_buffer_.size() > 0) + { + std::size_t bytes_consumed; + std::tie(bytes_consumed, ec) = invokeProtocolMessage( + read_buffer_.data(), *this); + if (ec) + return fail("onReadMessage", ec); + if (! stream_.next_layer().is_open()) + return; + if (bytes_consumed == 0) + break; + read_buffer_.consume (bytes_consumed); + } if(gracefulClose_) return; // Timeout on writes only @@ -966,12 +974,12 @@ PeerImp::onWriteMessage (error_code ec, std::size_t bytes_transferred) //------------------------------------------------------------------------------ // -// abstract_protocol_handler +// ProtocolHandler // //------------------------------------------------------------------------------ PeerImp::error_code -PeerImp::on_message_unknown (std::uint16_t type) +PeerImp::onMessageUnknown (std::uint16_t type) { error_code ec; // TODO @@ -979,7 +987,7 @@ PeerImp::on_message_unknown (std::uint16_t type) } PeerImp::error_code -PeerImp::on_message_begin (std::uint16_t type, +PeerImp::onMessageBegin (std::uint16_t type, std::shared_ptr <::google::protobuf::Message> const& m) { error_code ec; @@ -1000,25 +1008,22 @@ PeerImp::on_message_begin (std::uint16_t type, if (! ec) { load_event_ = getApp().getJobQueue ().getLoadEventAP ( - jtPEER, protocol_message_name(type)); + jtPEER, protocolMessageName(type)); } return ec; } void -PeerImp::on_message_end (std::uint16_t, +PeerImp::onMessageEnd (std::uint16_t, std::shared_ptr <::google::protobuf::Message> const&) { load_event_.reset(); } -PeerImp::error_code -PeerImp::on_message (std::shared_ptr const& m) +void +PeerImp::onMessage (std::shared_ptr const& m) { - error_code ec; - timer_.cancel(ec); - std::uint32_t const ourTime (getApp().getOPs ().getNetworkTimeNC ()); std::uint32_t const minTime (ourTime - clockToleranceDeltaSeconds); std::uint32_t const maxTime (ourTime + clockToleranceDeltaSeconds); @@ -1113,8 +1118,7 @@ PeerImp::on_message (std::shared_ptr const& m) } } - sendGetPeers(); - return ec; + return sendGetPeers(); } if (result == PeerFinder::Result::full) @@ -1122,44 +1126,35 @@ PeerImp::on_message (std::shared_ptr const& m) // TODO Provide correct HTTP response auto const redirects = overlay_.peerFinder().redirect (slot_); sendEndpoints (redirects.begin(), redirects.end()); - gracefulClose(); - return ec; + return gracefulClose(); } - else + else if (result == PeerFinder::Result::duplicate) { - // VFALCO TODO Duplicate connection - //fail("Hello: Duplicate public key"); + return fail("Duplicate public key"); } } - ec = invalid_argument_error(); - return ec; + fail("TMHello invalid"); } -PeerImp::error_code -PeerImp::on_message (std::shared_ptr const& m) +void +PeerImp::onMessage (std::shared_ptr const& m) { - error_code ec; if (m->type () == protocol::TMPing::ptPING) { m->set_type (protocol::TMPing::ptPONG); send (std::make_shared (*m, protocol::mtPING)); } - return ec; } -PeerImp::error_code -PeerImp::on_message (std::shared_ptr const& m) +void +PeerImp::onMessage (std::shared_ptr const& m) { - error_code ec; if (m->has_response ()) { // this is an answer to a proof of work we requested if (m->response ().size () != (256 / 8)) - { - charge (Resource::feeInvalidRequest); - return ec; - } + return charge (Resource::feeInvalidRequest); uint256 response; memcpy (response.begin (), m->response ().data (), 256 / 8); @@ -1169,20 +1164,16 @@ PeerImp::on_message (std::shared_ptr const& m) m->token (), response); if (r == powOK) - { // credit peer // WRITEME - return ec; - } + return; // return error message // WRITEME if (r != powTOOEASY) - { charge (Resource::feeBadProofOfWork); - } - return ec; + return; } if (m->has_result ()) @@ -1200,10 +1191,7 @@ PeerImp::on_message (std::shared_ptr const& m) if ((m->challenge ().size () != (256 / 8)) || ( m->target ().size () != (256 / 8))) - { - charge (Resource::feeInvalidRequest); - return ec; - } + return charge (Resource::feeInvalidRequest); memcpy (challenge.begin (), m->challenge ().data (), 256 / 8); memcpy (target.begin (), m->target ().data (), 256 / 8); @@ -1211,10 +1199,7 @@ PeerImp::on_message (std::shared_ptr const& m) m->token (), m->iterations (), challenge, target); if (!pow->isValid ()) - { - charge (Resource::feeInvalidRequest); - return ec; - } + return charge (Resource::feeInvalidRequest); #if 0 // Until proof of work is completed, don't do it getApp().getJobQueue ().addJob ( @@ -1224,24 +1209,18 @@ PeerImp::on_message (std::shared_ptr const& m) std::weak_ptr (shared_from_this ()), pow)); #endif - return ec; + return; } p_journal_.info << "Bad proof of work"; - - return ec; } -PeerImp::error_code -PeerImp::on_message (std::shared_ptr const& m) +void +PeerImp::onMessage (std::shared_ptr const& m) { - error_code ec; // VFALCO NOTE I think we should drop the peer immediately if (! cluster()) - { - charge (Resource::feeUnwantedData); - return ec; - } + return charge (Resource::feeUnwantedData); for (int i = 0; i < m->clusternodes().size(); ++i) { @@ -1276,21 +1255,17 @@ PeerImp::on_message (std::shared_ptr const& m) } getApp().getFeeTrack().setClusterFee(getApp().getUNL().getClusterFee()); - return ec; } -PeerImp::error_code -PeerImp::on_message (std::shared_ptr const& m) +void +PeerImp::onMessage (std::shared_ptr const& m) { - error_code ec; // VFALCO TODO This message is now obsolete due to PeerFinder - return ec; } -PeerImp::error_code -PeerImp::on_message (std::shared_ptr const& m) +void +PeerImp::onMessage (std::shared_ptr const& m) { - error_code ec; // VFALCO TODO This message is now obsolete due to PeerFinder std::vector list; list.reserve (m->nodes().size()); @@ -1309,13 +1284,11 @@ PeerImp::on_message (std::shared_ptr const& m) if (! list.empty()) overlay_.peerFinder().on_legacy_endpoints (list); - return ec; } -PeerImp::error_code -PeerImp::on_message (std::shared_ptr const& m) +void +PeerImp::onMessage (std::shared_ptr const& m) { - error_code ec; std::vector endpoints; endpoints.reserve (m->endpoints().size()); @@ -1353,13 +1326,11 @@ PeerImp::on_message (std::shared_ptr const& m) if (! endpoints.empty()) overlay_.peerFinder().on_endpoints (slot_, endpoints); - return ec; } -PeerImp::error_code -PeerImp::on_message (std::shared_ptr const& m) +void +PeerImp::onMessage (std::shared_ptr const& m) { - error_code ec; Serializer s (m->rawtransaction ()); try @@ -1378,11 +1349,11 @@ PeerImp::on_message (std::shared_ptr const& m) if (flags & SF_BAD) { charge (Resource::feeInvalidSignature); - return ec; + return; } if (!(flags & SF_RETRY)) - return ec; + return; } p_journal_.debug << @@ -1420,34 +1391,29 @@ PeerImp::on_message (std::shared_ptr const& m) p_journal_.warning << "Transaction invalid: " << s.getHex(); } - return ec; } -PeerImp::error_code -PeerImp::on_message (std::shared_ptr const& m) +void +PeerImp::onMessage (std::shared_ptr const& m) { - error_code ec; getApp().getJobQueue().addJob (jtPACK, "recvGetLedger", std::bind( beast::weak_fn(&PeerImp::getLedger, shared_from_this()), m)); - return ec; } -PeerImp::error_code -PeerImp::on_message (std::shared_ptr const& m) +void +PeerImp::onMessage (std::shared_ptr const& m) { - error_code ec; protocol::TMLedgerData& packet = *m; if (m->nodes ().size () <= 0) { p_journal_.warning << "Ledger/TXset data with no nodes"; - return ec; + return; } if (m->has_requestcookie ()) { Peer::ptr target = overlay_.findPeerByShortID (m->requestcookie ()); - if (target) { m->clear_requestcookie (); @@ -1459,8 +1425,7 @@ PeerImp::on_message (std::shared_ptr const& m) p_journal_.info << "Unable to route TX/ledger data reply"; charge (Resource::feeUnwantedData); } - - return ec; + return; } uint256 hash; @@ -1469,7 +1434,7 @@ PeerImp::on_message (std::shared_ptr const& m) { p_journal_.warning << "TX candidate reply with invalid hash size"; charge (Resource::feeInvalidRequest); - return ec; + return; } memcpy (hash.begin (), m->ledgerhash ().data (), 32); @@ -1477,12 +1442,10 @@ PeerImp::on_message (std::shared_ptr const& m) if (m->type () == protocol::liTS_CANDIDATE) { // got data for a candidate transaction set - getApp().getJobQueue().addJob(jtTXN_DATA, "recvPeerData", std::bind( beast::weak_fn(&PeerImp::peerTXData, shared_from_this()), std::placeholders::_1, hash, m, p_journal_)); - - return ec; + return; } if (!getApp().getInboundLedgers ().gotLedgerData ( @@ -1491,18 +1454,16 @@ PeerImp::on_message (std::shared_ptr const& m) p_journal_.trace << "Got data for unwanted ledger"; charge (Resource::feeUnwantedData); } - return ec; } -PeerImp::error_code -PeerImp::on_message (std::shared_ptr const& m) +void +PeerImp::onMessage (std::shared_ptr const& m) { - error_code ec; protocol::TMProposeSet& set = *m; // VFALCO Magic numbers are bad if ((set.closetime() + 180) < getApp().getOPs().getCloseTimeNC()) - return ec; + return; // VFALCO Magic numbers are bad // Roll this into a validation function @@ -1516,14 +1477,14 @@ PeerImp::on_message (std::shared_ptr const& m) { p_journal_.warning << "Proposal: malformed"; charge (Resource::feeInvalidSignature); - return ec; + return; } if (set.has_previousledger () && (set.previousledger ().size () != 32)) { p_journal_.warning << "Proposal: malformed"; charge (Resource::feeInvalidRequest); - return ec; + return; } uint256 proposeHash, prevLedger; @@ -1541,7 +1502,7 @@ PeerImp::on_message (std::shared_ptr const& m) suppression, id_)) { p_journal_.trace << "Proposal: duplicate"; - return ec; + return; } RippleAddress signerPublic = RippleAddress::createNodePublic ( @@ -1550,14 +1511,14 @@ PeerImp::on_message (std::shared_ptr const& m) if (signerPublic == getConfig ().VALIDATION_PUB) { p_journal_.trace << "Proposal: self"; - return ec; + return; } bool isTrusted = getApp().getUNL ().nodeInUNL (signerPublic); if (!isTrusted && getApp().getFeeTrack ().isLoadedLocal ()) { p_journal_.debug << "Proposal: Dropping UNTRUSTED (load)"; - return ec; + return; } p_journal_.trace << @@ -1579,13 +1540,11 @@ PeerImp::on_message (std::shared_ptr const& m) "recvPropose->checkPropose", std::bind(beast::weak_fn( &PeerImp::checkPropose, shared_from_this()), std::placeholders::_1, m, proposal, consensusLCL)); - return ec; } -PeerImp::error_code -PeerImp::on_message (std::shared_ptr const& m) +void +PeerImp::onMessage (std::shared_ptr const& m) { - error_code ec; p_journal_.trace << "Status: Change"; if (!m->has_networktime ()) @@ -1610,7 +1569,7 @@ PeerImp::on_message (std::shared_ptr const& m) } previousLedgerHash_.zero (); - return ec; + return; } if (m->has_ledgerhash () && (m->ledgerhash ().size () == (256 / 8))) @@ -1640,25 +1599,24 @@ PeerImp::on_message (std::shared_ptr const& m) minLedger_ = m->firstseq (); maxLedger_ = m->lastseq (); + // VFALCO Is this workaround still needed? // Work around some servers that report sequences incorrectly if (minLedger_ == 0) maxLedger_ = 0; if (maxLedger_ == 0) minLedger_ = 0; } - return ec; } -PeerImp::error_code -PeerImp::on_message (std::shared_ptr const& m) +void +PeerImp::onMessage (std::shared_ptr const& m) { - error_code ec; uint256 hashes; if (m->hash ().size () != (256 / 8)) { charge (Resource::feeInvalidRequest); - return ec; + return; } uint256 hash; @@ -1678,11 +1636,10 @@ PeerImp::on_message (std::shared_ptr const& m) shared_from_this (), hash, m->status ())) charge (Resource::feeUnwantedData); } - return ec; } -PeerImp::error_code -PeerImp::on_message (std::shared_ptr const& m) +void +PeerImp::onMessage (std::shared_ptr const& m) { error_code ec; std::uint32_t closeTime = getApp().getOPs().getCloseTimeNC(); @@ -1691,7 +1648,7 @@ PeerImp::on_message (std::shared_ptr const& m) { p_journal_.warning << "Validation: Too small"; charge (Resource::feeInvalidRequest); - return ec; + return; } try @@ -1705,14 +1662,14 @@ PeerImp::on_message (std::shared_ptr const& m) { p_journal_.trace << "Validation: Too old"; charge (Resource::feeUnwantedData); - return ec; + return; } if (! getApp().getHashRouter ().addSuppressionPeer ( s.getSHA512Half(), id_)) { p_journal_.trace << "Validation: duplicate"; - return ec; + return; } bool isTrusted = getApp().getUNL ().nodeInUNL (val->getSignerPublic ()); @@ -1742,14 +1699,11 @@ PeerImp::on_message (std::shared_ptr const& m) "Validation: Unknown exception"; charge (Resource::feeInvalidRequest); } - - return ec; } -PeerImp::error_code -PeerImp::on_message (std::shared_ptr const& m) +void +PeerImp::onMessage (std::shared_ptr const& m) { - error_code ec; protocol::TMGetObjectByHash& packet = *m; if (packet.query ()) @@ -1758,7 +1712,7 @@ PeerImp::on_message (std::shared_ptr const& m) if (packet.type () == protocol::TMGetObjectByHash::otFETCH_PACK) { doFetchPack (m); - return ec; + return; } protocol::TMGetObjectByHash reply; @@ -1862,7 +1816,6 @@ PeerImp::on_message (std::shared_ptr const& m) if (packet.type () == protocol::TMGetObjectByHash::otFETCH_PACK) getApp().getOPs ().gotFetchPack (progress, pLSeq); } - return ec; } //-------------------------------------------------------------------------- diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index 851a4cdb0b..2d9389007b 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ b/src/ripple/overlay/impl/PeerImp.h @@ -22,8 +22,7 @@ #include #include -#include -#include +#include #include #include #include @@ -44,14 +43,10 @@ namespace ripple { -class PeerImp; - class PeerImp : public Peer , public std::enable_shared_from_this , public OverlayImpl::Child - , private beast::LeakChecked - , private abstract_protocol_handler { public: /** Type of connection. @@ -109,7 +104,7 @@ private: boost::asio::basic_waitable_timer< std::chrono::steady_clock> timer_; - Type type_ = Type::legacy; + //Type type_ = Type::legacy; // Updated at each stage of the connection process to reflect // the current conditions as closely as possible. @@ -152,7 +147,6 @@ private: beast::http::message http_message_; boost::optional http_parser_; beast::http::body http_body_; - message_stream message_stream_; beast::asio::streambuf write_buffer_; std::queue send_queue_; bool gracefulClose_ = false; @@ -371,9 +365,10 @@ private: void onWriteMessage (error_code ec, std::size_t bytes_transferred); +public: //-------------------------------------------------------------------------- // - // abstract_protocol_handler + // ProtocolStream // //-------------------------------------------------------------------------- @@ -386,32 +381,31 @@ private: } error_code - on_message_unknown (std::uint16_t type) override; + onMessageUnknown (std::uint16_t type); error_code - on_message_begin (std::uint16_t type, - std::shared_ptr <::google::protobuf::Message> const& m) override; + onMessageBegin (std::uint16_t type, + std::shared_ptr <::google::protobuf::Message> const& m); void - on_message_end (std::uint16_t type, - std::shared_ptr <::google::protobuf::Message> const& m) override; + onMessageEnd (std::uint16_t type, + std::shared_ptr <::google::protobuf::Message> const& m); - // message handlers - error_code on_message (std::shared_ptr const& m) override; - error_code on_message (std::shared_ptr const& m) override; - error_code on_message (std::shared_ptr const& m) override; - error_code on_message (std::shared_ptr const& m) override; - error_code on_message (std::shared_ptr const& m) override; - error_code on_message (std::shared_ptr const& m) override; - error_code on_message (std::shared_ptr const& m) override; - error_code on_message (std::shared_ptr const& m) override; - error_code on_message (std::shared_ptr const& m) override; - error_code on_message (std::shared_ptr const& m) override; - error_code on_message (std::shared_ptr const& m) override; - error_code on_message (std::shared_ptr const& m) override; - error_code on_message (std::shared_ptr const& m) override; - error_code on_message (std::shared_ptr const& m) override; - error_code on_message (std::shared_ptr const& m) override; + void onMessage (std::shared_ptr const& m); + void onMessage (std::shared_ptr const& m); + void onMessage (std::shared_ptr const& m); + void onMessage (std::shared_ptr const& m); + void onMessage (std::shared_ptr const& m); + void onMessage (std::shared_ptr const& m); + void onMessage (std::shared_ptr const& m); + void onMessage (std::shared_ptr const& m); + void onMessage (std::shared_ptr const& m); + void onMessage (std::shared_ptr const& m); + void onMessage (std::shared_ptr const& m); + void onMessage (std::shared_ptr const& m); + void onMessage (std::shared_ptr const& m); + void onMessage (std::shared_ptr const& m); + void onMessage (std::shared_ptr const& m); //-------------------------------------------------------------------------- @@ -498,7 +492,6 @@ PeerImp::PeerImp (id_t id, endpoint_type remote_endpoint, , m_inbound (true) , state_ (State::connected) , slot_ (slot) - , message_stream_(*this) { read_buffer_.commit(boost::asio::buffer_copy(read_buffer_.prepare( boost::asio::buffer_size(buffer)), buffer)); diff --git a/src/ripple/overlay/impl/ProtocolMessage.h b/src/ripple/overlay/impl/ProtocolMessage.h new file mode 100644 index 0000000000..e627434e95 --- /dev/null +++ b/src/ripple/overlay/impl/ProtocolMessage.h @@ -0,0 +1,240 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_OVERLAY_PROTOCOLMESSAGE_H_INCLUDED +#define RIPPLE_OVERLAY_PROTOCOLMESSAGE_H_INCLUDED + +#include "ripple.pb.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include // + +namespace ripple { + +/** Implements ZeroCopyInputStream around a buffer sequence. + @tparam Buffers A type meeting the requirements of ConstBufferSequence. +*/ +template +class ZeroCopyInputStream + : public ::google::protobuf::io::ZeroCopyInputStream +{ +private: + using iterator = typename Buffers::const_iterator; + using const_buffer = boost::asio::const_buffer; + + std::int64_t count_ = 0; + iterator last_; + iterator first_; // Where pos_ comes from + const_buffer pos_; // What Next() will return + +public: + ZeroCopyInputStream (Buffers const& buffers); + + bool + Next (const void** data, int* size) override; + + void + BackUp (int count) override; + + bool + Skip (int count) override; + + std::int64_t + ByteCount() const override + { + return count_; + } +}; + +//------------------------------------------------------------------------------ + +template +ZeroCopyInputStream::ZeroCopyInputStream (Buffers const& buffers) + : last_ (buffers.end()) + , first_ (buffers.begin()) + , pos_ ((first_ != last_) ? + *first_ : const_buffer(nullptr, 0)) +{ +} + +template +bool +ZeroCopyInputStream::Next (const void** data, int* size) +{ + *data = boost::asio::buffer_cast(pos_); + *size = boost::asio::buffer_size(pos_); + if (first_ == last_) + return false; + count_ += *size; + pos_ = (++first_ != last_) ? *first_ : + const_buffer(nullptr, 0); + return true; +} + +template +void +ZeroCopyInputStream::BackUp (int count) +{ + --first_; + pos_ = *first_ + + (boost::asio::buffer_size(*first_) - count); + count_ -= count; +} + +template +bool +ZeroCopyInputStream::Skip (int count) +{ + if (first_ == last_) + return false; + while (count > 0) + { + auto const size = + boost::asio::buffer_size(pos_); + if (count < size) + { + pos_ = pos_ + count; + count_ += count; + return true; + } + count_ += size; + if (++first_ == last_) + return false; + count -= size; + pos_ = *first_; + } + return true; +} + +//------------------------------------------------------------------------------ + +namespace detail { + +template +std::enable_if_t::value, + boost::system::error_code> +invoke (int type, Buffers const& buffers, + Handler& handler) +{ + ZeroCopyInputStream stream(buffers); + stream.Skip(Message::kHeaderBytes); + auto const m (std::make_shared()); + if (! m->ParseFromZeroCopyStream(&stream)) + return boost::system::errc::make_error_code( + boost::system::errc::invalid_argument); + auto ec = handler.onMessageBegin (type, m); + if (! ec) + { + handler.onMessage (m); + handler.onMessageEnd (type, m); + } + return ec; +} + +} + +/** Calls the handler for up to one protocol message in the passed buffers. + + If there is insufficient data to produce a complete protocol + message, zero is returned for the number of bytes consumed. + + @return The number of bytes consumed, or the error code if any. +*/ +template +std::pair +invokeProtocolMessage (Buffers const& buffers, Handler& handler) +{ + std::pair result = { 0, {} }; + boost::system::error_code& ec = result.second; + + auto const type = Message::type(buffers); + if (type == 0) + return result; + auto const size = Message::kHeaderBytes + Message::size(buffers); + if (boost::asio::buffer_size(buffers) < size) + return result; + + switch (type) + { + case protocol::mtHELLO: ec = detail::invoke (type, buffers, handler); break; + case protocol::mtPING: ec = detail::invoke (type, buffers, handler); break; + case protocol::mtPROOFOFWORK: ec = detail::invoke (type, buffers, handler); break; + case protocol::mtCLUSTER: ec = detail::invoke (type, buffers, handler); break; + case protocol::mtGET_PEERS: ec = detail::invoke (type, buffers, handler); break; + case protocol::mtPEERS: ec = detail::invoke (type, buffers, handler); break; + case protocol::mtENDPOINTS: ec = detail::invoke (type, buffers, handler); break; + case protocol::mtTRANSACTION: ec = detail::invoke (type, buffers, handler); break; + case protocol::mtGET_LEDGER: ec = detail::invoke (type, buffers, handler); break; + case protocol::mtLEDGER_DATA: ec = detail::invoke (type, buffers, handler); break; + case protocol::mtPROPOSE_LEDGER:ec = detail::invoke (type, buffers, handler); break; + case protocol::mtSTATUS_CHANGE: ec = detail::invoke (type, buffers, handler); break; + case protocol::mtHAVE_SET: ec = detail::invoke (type, buffers, handler); break; + case protocol::mtVALIDATION: ec = detail::invoke (type, buffers, handler); break; + case protocol::mtGET_OBJECTS: ec = detail::invoke (type, buffers, handler); break; + default: + ec = handler.onMessageUnknown (type); + break; + } + if (! ec) + result.first = size; + + return result; +} + +//------------------------------------------------------------------------------ + +/** Returns the name of a protocol message given its type. */ +inline +std::string +protocolMessageName (int type) +{ + switch (type) + { + case protocol::mtHELLO: return "hello"; + case protocol::mtPING: return "ping"; + case protocol::mtPROOFOFWORK: return "proof_of_work"; + case protocol::mtCLUSTER: return "cluster"; + case protocol::mtGET_PEERS: return "get_peers"; + case protocol::mtPEERS: return "peers"; + case protocol::mtENDPOINTS: return "endpoints"; + case protocol::mtTRANSACTION: return "tx"; + case protocol::mtGET_LEDGER: return "get_ledger"; + case protocol::mtLEDGER_DATA: return "ledger_data"; + case protocol::mtPROPOSE_LEDGER: return "propose"; + case protocol::mtSTATUS_CHANGE: return "status"; + case protocol::mtHAVE_SET: return "have_set"; + case protocol::mtVALIDATION: return "validation"; + case protocol::mtGET_OBJECTS: return "get_objects"; + default: + break; + }; + return "unknown"; +} + +} // ripple + +#endif diff --git a/src/ripple/overlay/impl/abstract_protocol_handler.h b/src/ripple/overlay/impl/abstract_protocol_handler.h deleted file mode 100644 index a58412a60e..0000000000 --- a/src/ripple/overlay/impl/abstract_protocol_handler.h +++ /dev/null @@ -1,67 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2012, 2013 Ripple Labs Inc. - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#ifndef RIPPLE_OVERLAY_ABSTRACT_PROTOCOL_HANDLER_H_INCLUDED -#define RIPPLE_OVERLAY_ABSTRACT_PROTOCOL_HANDLER_H_INCLUDED - -#include "ripple.pb.h" -#include -#include - -namespace ripple { - -/** Handles protocol messages. */ -class abstract_protocol_handler -{ -protected: - typedef boost::system::error_code error_code; - -public: - // Called for messages of unknown type - virtual error_code on_message_unknown (std::uint16_t type) = 0; - - // Called before a specific message handler is invoked - virtual error_code on_message_begin (std::uint16_t type, - std::shared_ptr <::google::protobuf::Message> const& m) = 0; - - // Called after a specific message handler is invoked, - // if on_message_begin did not return an error. - virtual void on_message_end (std::uint16_t type, - std::shared_ptr <::google::protobuf::Message> const& m) = 0; - - virtual error_code on_message (std::shared_ptr const& m) { return error_code(); } - virtual error_code on_message (std::shared_ptr const& m) { return error_code(); } - virtual error_code on_message (std::shared_ptr const& m) { return error_code(); } - virtual error_code on_message (std::shared_ptr const& m) { return error_code(); } - virtual error_code on_message (std::shared_ptr const& m) { return error_code(); } - virtual error_code on_message (std::shared_ptr const& m) { return error_code(); } - virtual error_code on_message (std::shared_ptr const& m) { return error_code(); } - virtual error_code on_message (std::shared_ptr const& m) { return error_code(); } - virtual error_code on_message (std::shared_ptr const& m) { return error_code(); } - virtual error_code on_message (std::shared_ptr const& m) { return error_code(); } - virtual error_code on_message (std::shared_ptr const& m) { return error_code(); } - virtual error_code on_message (std::shared_ptr const& m) { return error_code(); } - virtual error_code on_message (std::shared_ptr const& m) { return error_code(); } - virtual error_code on_message (std::shared_ptr const& m) { return error_code(); } - virtual error_code on_message (std::shared_ptr const& m) { return error_code(); } -}; - -} // ripple - -#endif diff --git a/src/ripple/overlay/impl/message_name.cpp b/src/ripple/overlay/impl/message_name.cpp deleted file mode 100644 index 15990e0bf8..0000000000 --- a/src/ripple/overlay/impl/message_name.cpp +++ /dev/null @@ -1,48 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2012, 2013 Ripple Labs Inc. - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -namespace ripple { - -char const* -protocol_message_name (int type) -{ - switch (type) - { - case protocol::mtHELLO: return "hello"; - case protocol::mtPING: return "ping"; - case protocol::mtPROOFOFWORK: return "proof_of_work"; - case protocol::mtCLUSTER: return "cluster"; - case protocol::mtGET_PEERS: return "get_peers"; - case protocol::mtPEERS: return "peers"; - case protocol::mtENDPOINTS: return "endpoints"; - case protocol::mtTRANSACTION: return "tx"; - case protocol::mtGET_LEDGER: return "get_ledger"; - case protocol::mtLEDGER_DATA: return "ledger_data"; - case protocol::mtPROPOSE_LEDGER: return "propose"; - case protocol::mtSTATUS_CHANGE: return "status"; - case protocol::mtHAVE_SET: return "have_set"; - case protocol::mtVALIDATION: return "validation"; - case protocol::mtGET_OBJECTS: return "get_objects"; - default: - break; - }; - return "uknown"; -} - -} diff --git a/src/ripple/overlay/impl/message_name.h b/src/ripple/overlay/impl/message_name.h deleted file mode 100644 index cd0fe297d7..0000000000 --- a/src/ripple/overlay/impl/message_name.h +++ /dev/null @@ -1,30 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2012, 2013 Ripple Labs Inc. - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#ifndef RIPPLE_OVERLAY_MESSAGE_NAME_H_INCLUDED -#define RIPPLE_OVERLAY_MESSAGE_NAME_H_INCLUDED - -namespace ripple { - -char const* -protocol_message_name (int type); - -} - -#endif diff --git a/src/ripple/overlay/impl/message_stream.h b/src/ripple/overlay/impl/message_stream.h deleted file mode 100644 index 5d4ec4a979..0000000000 --- a/src/ripple/overlay/impl/message_stream.h +++ /dev/null @@ -1,171 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2012, 2013 Ripple Labs Inc. - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#ifndef RIPPLE_OVERLAY_MESSAGE_STREAM_H_INCLUDED -#define RIPPLE_OVERLAY_MESSAGE_STREAM_H_INCLUDED - -#include -#include -#include -#include -#include -#include -#include -#include - -namespace ripple { - -/** Turns a stream of bytes into protocol messages and invokes the handler. */ -class message_stream -{ -private: - abstract_protocol_handler& handler_; - std::size_t header_bytes_; - std::size_t body_bytes_; - std::uint32_t length_; - std::uint16_t type_; - std::vector header_; // VFALCO TODO Use std::array - std::vector body_; - - static - boost::system::error_code - parse_error() - { - return boost::system::errc::make_error_code ( - boost::system::errc::invalid_argument); - } - - template - boost::system::error_code - invoke() - { - boost::system::error_code ec; - std::shared_ptr m (std::make_shared ()); - bool const parsed (m->ParseFromArray (body_.data(), length_)); - if (! parsed) - return parse_error(); - ec = handler_.on_message_begin (type_, m); - if (! ec) - { - ec = handler_.on_message (m); - handler_.on_message_end (type_, m); - } - return ec; - } - -public: - message_stream (abstract_protocol_handler& handler) - : handler_(handler) - , header_bytes_(0) - , body_bytes_(0) - { - header_.resize (Message::kHeaderBytes); - } - - /** Push a single buffer through. - The handler is called for each complete protocol message contained - in the buffer. - */ - template - boost::system::error_code - write_one (ConstBuffer const& cb) - { - using namespace boost::asio; - boost::system::error_code ec; - const_buffer buffer (cb); - std::size_t remain (buffer_size(buffer)); - while (remain) - { - if (header_bytes_ < header_.size()) - { - std::size_t const n (buffer_copy (mutable_buffer (header_.data() + - header_bytes_, header_.size() - header_bytes_), buffer)); - header_bytes_ += n; - buffer = buffer + n; - remain = remain - n; - if (header_bytes_ >= header_.size()) - { - assert (header_bytes_ == header_.size()); - length_ = Message::getLength (header_); - type_ = Message::getType (header_); - body_.resize (length_); - } - } - - if (header_bytes_ >= header_.size()) - { - std::size_t const n (buffer_copy (mutable_buffer (body_.data() + - body_bytes_, body_.size() - body_bytes_), buffer)); - body_bytes_ += n; - buffer = buffer + n; - remain = remain - n; - if (body_bytes_ >= length_) - { - assert (body_bytes_ == length_); - switch (type_) - { - case protocol::mtHELLO: ec = invoke (); break; - case protocol::mtPING: ec = invoke (); break; - case protocol::mtPROOFOFWORK: ec = invoke (); break; - case protocol::mtCLUSTER: ec = invoke (); break; - case protocol::mtGET_PEERS: ec = invoke (); break; - case protocol::mtPEERS: ec = invoke (); break; - case protocol::mtENDPOINTS: ec = invoke (); break; - case protocol::mtTRANSACTION: ec = invoke (); break; - case protocol::mtGET_LEDGER: ec = invoke (); break; - case protocol::mtLEDGER_DATA: ec = invoke (); break; - case protocol::mtPROPOSE_LEDGER: ec = invoke (); break; - case protocol::mtSTATUS_CHANGE: ec = invoke (); break; - case protocol::mtHAVE_SET: ec = invoke (); break; - case protocol::mtVALIDATION: ec = invoke (); break; - case protocol::mtGET_OBJECTS: ec = invoke (); break; - default: - ec = handler_.on_message_unknown(type_); - break; - } - header_bytes_ = 0; - body_bytes_ = 0; - } - } - } - return ec; - } - - /** Push a set of buffers through. - The handler is called for each complete protocol message contained - in the buffers. - */ - template - boost::system::error_code - write (ConstBufferSequence const& buffers) - { - boost::system::error_code ec; - for (auto const& buffer : buffers) - { - ec = write_one(buffer); - if (ec) - break; - } - return ec; - } -}; - -} // ripple - -#endif diff --git a/src/ripple/unity/overlay.cpp b/src/ripple/unity/overlay.cpp index 147cf6d036..c069589ec1 100644 --- a/src/ripple/unity/overlay.cpp +++ b/src/ripple/unity/overlay.cpp @@ -24,7 +24,6 @@ #include #include -#include #include #include #include