diff --git a/src/ripple/json/json_reader.h b/src/ripple/json/json_reader.h index 42d107af16..c60e482288 100644 --- a/src/ripple/json/json_reader.h +++ b/src/ripple/json/json_reader.h @@ -24,7 +24,7 @@ #include #include - +#include #include namespace Json @@ -64,6 +64,16 @@ public: /// \see Json::operator>>(std::istream&, Json::Value&). bool parse ( std::istream& is, Value& root); + /** \brief Read a Value from a JSON buffer sequence. + * \param root [out] Contains the root value of the document if it was + * successfully parsed. + * \param UTF-8 encoded buffer sequence. + * \return \c true if the buffer was successfully parsed, \c false if an error occurred. + */ + template + bool + parse(Value& root, BufferSequence const& bs); + /** \brief Returns a user friendly string that list errors in the parsed document. * \return Formatted error message with the list of errors with their location in * the parsed document. An empty string is returned if no error occurred @@ -161,6 +171,20 @@ private: Value* lastValue_; }; +template +bool +Reader::parse(Value& root, BufferSequence const& bs) +{ + using namespace boost::asio; + for (auto const& b : bs) + { + auto begin = buffer_cast(b); + if(! parse(begin, begin + buffer_size(b), root)) + return false; + } + return true; +} + /** \brief Read from 'sin' into 'root'. Always keep comments from the input JSON. diff --git a/src/ripple/net/InfoSub.h b/src/ripple/net/InfoSub.h index a78d6a5e5a..93058341a0 100644 --- a/src/ripple/net/InfoSub.h +++ b/src/ripple/net/InfoSub.h @@ -118,6 +118,7 @@ public: }; public: + InfoSub (Source& source); InfoSub (Source& source, Consumer consumer); virtual ~InfoSub (); @@ -156,6 +157,14 @@ private: hash_set normalSubscriptions_; std::shared_ptr mPathRequest; std::uint64_t mSeq; + + static + int + assign_id() + { + static std::atomic id(0); + return ++id; + } }; } // ripple diff --git a/src/ripple/net/impl/InfoSub.cpp b/src/ripple/net/impl/InfoSub.cpp index d01db1d681..8683ee1082 100644 --- a/src/ripple/net/impl/InfoSub.cpp +++ b/src/ripple/net/impl/InfoSub.cpp @@ -43,12 +43,17 @@ InfoSub::Source::Source (char const* name, Stoppable& parent) //------------------------------------------------------------------------------ -InfoSub::InfoSub (Source& source, Consumer consumer) - : m_consumer (consumer) - , m_source (source) +InfoSub::InfoSub(Source& source) + : m_source(source) + , mSeq(assign_id()) +{ +} + +InfoSub::InfoSub(Source& source, Consumer consumer) + : m_consumer(consumer) + , m_source(source) + , mSeq(assign_id()) { - static std::atomic s_seq_id (0); - mSeq = ++s_seq_id; } InfoSub::~InfoSub () diff --git a/src/ripple/server/WSSession.h b/src/ripple/server/WSSession.h index 36d1c6fa78..2ffa9c1aaf 100644 --- a/src/ripple/server/WSSession.h +++ b/src/ripple/server/WSSession.h @@ -111,10 +111,18 @@ struct WSSession { std::shared_ptr appDefined; + virtual + void + run() = 0; + virtual Port const& port() const = 0; + virtual + http_request_type const& + request() const = 0; + virtual boost::asio::ip::tcp::endpoint const& remote_endpoint() const = 0; @@ -127,6 +135,14 @@ struct WSSession virtual void close() = 0; + + /** Indicate that the response is complete. + The handler should call this when it has completed writing + the response. + */ + virtual + void + complete() = 0; }; } // ripple diff --git a/src/ripple/server/impl/BaseHTTPPeer.h b/src/ripple/server/impl/BaseHTTPPeer.h index 8608339289..ede5e9b130 100644 --- a/src/ripple/server/impl/BaseHTTPPeer.h +++ b/src/ripple/server/impl/BaseHTTPPeer.h @@ -67,7 +67,6 @@ protected: // Max seconds without completing a message timeoutSeconds = 30 - }; struct buffer @@ -105,7 +104,6 @@ protected: bool complete_ = false; boost::system::error_code ec_; - clock_type::time_point when_; int request_count_ = 0; std::size_t bytes_in_ = 0; std::size_t bytes_out_ = 0; @@ -233,7 +231,6 @@ BaseHTTPPeer::BaseHTTPPeer (Port const& port, Handler& handler, id_ = std::string("#") + std::to_string(nid_) + " "; JLOG(journal_.trace()) << id_ << "accept: " << remote_address_.address(); - when_ = clock_type::now(); } template diff --git a/src/ripple/server/impl/BaseWSPeer.h b/src/ripple/server/impl/BaseWSPeer.h index 0206b98c7a..8c17f4f2ec 100644 --- a/src/ripple/server/impl/BaseWSPeer.h +++ b/src/ripple/server/impl/BaseWSPeer.h @@ -43,6 +43,12 @@ protected: using BasePeer::fail; using BasePeer::strand_; + enum + { + // Max seconds without completing a message + timeoutSeconds = 30 + }; + private: friend class BasePeer; @@ -52,6 +58,8 @@ private: beast::streambuf wb_; std::list> wq_; bool do_close_ = false; + beast::websocket::close_reason cr_; + waitable_timer timer_; public: template @@ -64,7 +72,7 @@ public: beast::Journal journal); void - run(); + run() override; // // WSSession @@ -76,6 +84,12 @@ public: return this->port_; } + http_request_type const& + request() const override + { + return this->request_; + } + boost::asio::ip::tcp::endpoint const& remote_endpoint() const override { @@ -88,6 +102,9 @@ public: void close() override; + void + complete() override; + protected: struct identity { @@ -138,6 +155,15 @@ protected: virtual void do_close() = 0; + + void + start_timer(); + + void + cancel_timer(); + + void + on_timer(error_code ec); }; //------------------------------------------------------------------------------ @@ -154,6 +180,7 @@ BaseWSPeer::BaseWSPeer( : BasePeer(port, handler, remote_address, io_service, journal) , request_(std::move(request)) + , timer_(io_service) { } @@ -175,11 +202,21 @@ template void BaseWSPeer::send(std::shared_ptr w) { + // Maximum send queue size + static std::size_t constexpr limit = 100; if(! strand_.running_in_this_thread()) return strand_.post(std::bind( &BaseWSPeer::send, impl().shared_from_this(), std::move(w))); - wq_.emplace_back(std::move(w)); + if (wq_.size() >= limit) + { + cr_.code = static_cast(4000); + cr_.reason = "Client is too slow."; + do_close_ = true; + wq_.erase(std::next(wq_.begin()), wq_.end()); + } + else + wq_.emplace_back(std::move(w)); if(wq_.size() == 1) on_write({}); } @@ -199,6 +236,16 @@ BaseWSPeer::close() beast::asio::placeholders::error))); } +template +void +BaseWSPeer::complete() +{ + if (! strand_.running_in_this_thread()) + return strand_.post(std::bind( + &BaseWSPeer::complete, impl().shared_from_this())); + do_read(); +} + template void BaseWSPeer::on_write_sb(error_code const& ec) @@ -222,6 +269,7 @@ template void BaseWSPeer::on_write(error_code const& ec) { + cancel_timer(); if(ec) return fail(ec, "write"); auto& w = *wq_.front(); @@ -231,6 +279,7 @@ BaseWSPeer::on_write(error_code const& ec) impl().shared_from_this())); if(boost::indeterminate(result.first)) return; + start_timer(); if(! result.first) impl().ws_.async_write_frame( result.first, result.second, strand_.wrap(std::bind( @@ -251,7 +300,7 @@ BaseWSPeer::on_write_fin(error_code const& ec) return fail(ec, "write_fin"); wq_.pop_front(); if(do_close_) - impl().ws_.async_close({}, strand_.wrap(std::bind( + impl().ws_.async_close(cr_, strand_.wrap(std::bind( &BaseWSPeer::on_close, impl().shared_from_this(), beast::asio::placeholders::error))); else if(! wq_.empty()) @@ -269,6 +318,7 @@ BaseWSPeer::do_read() impl().ws_.async_read(op_, rb_, strand_.wrap( std::bind(&BaseWSPeer::on_read, impl().shared_from_this(), placeholders::error))); + cancel_timer(); } template @@ -286,7 +336,6 @@ BaseWSPeer::on_read(error_code const& ec) std::back_inserter(b)); this->handler_.onWSMessage(impl().shared_from_this(), b); rb_.consume(rb_.size()); - do_read(); } template @@ -296,6 +345,42 @@ BaseWSPeer::on_close(error_code const& ec) // great } +template +void +BaseWSPeer::start_timer() +{ + // Max seconds without completing a message + static constexpr std::chrono::seconds timeout{30}; + error_code ec; + timer_.expires_from_now (timeout, ec); + if (ec) + return fail (ec, "start_timer"); + timer_.async_wait (strand_.wrap (std::bind ( + &BaseWSPeer::on_timer, impl().shared_from_this(), + beast::asio::placeholders::error))); +} + +// Convenience for discarding the error code +template +void +BaseWSPeer::cancel_timer() +{ + error_code ec; + timer_.cancel(ec); +} + +// Called when session times out +template +void +BaseWSPeer::on_timer (error_code ec) +{ + if (ec == boost::asio::error::operation_aborted) + return; + if (! ec) + ec = boost::system::errc::make_error_code ( + boost::system::errc::timed_out); + fail (ec, "timer"); +} } // ripple #endif diff --git a/src/ripple/server/impl/PlainHTTPPeer.h b/src/ripple/server/impl/PlainHTTPPeer.h index 7120d0ebc9..f3165e4592 100644 --- a/src/ripple/server/impl/PlainHTTPPeer.h +++ b/src/ripple/server/impl/PlainHTTPPeer.h @@ -99,7 +99,6 @@ PlainHTTPPeer::websocketUpgrade() port_, handler_, remote_address_, std::move(message_), std::move(stream_), journal_); - ws->run(); return ws; } diff --git a/src/ripple/server/impl/Port.cpp b/src/ripple/server/impl/Port.cpp index 60403a48b5..ab69bff98d 100644 --- a/src/ripple/server/impl/Port.cpp +++ b/src/ripple/server/impl/Port.cpp @@ -23,6 +23,7 @@ namespace ripple { +// Detects legacy websockets only. bool Port::websockets() const { @@ -33,7 +34,9 @@ bool Port::secure() const { return protocol.count("peer") > 0 || - protocol.count("https") > 0 || protocol.count("wss") > 0; + protocol.count("https") > 0 || + protocol.count("wss") > 0 || + protocol.count("wss2") > 0; } std::string diff --git a/src/ripple/server/impl/SSLHTTPPeer.h b/src/ripple/server/impl/SSLHTTPPeer.h index 3b8ff7cb23..5a1a37be33 100644 --- a/src/ripple/server/impl/SSLHTTPPeer.h +++ b/src/ripple/server/impl/SSLHTTPPeer.h @@ -104,7 +104,6 @@ SSLHTTPPeer::websocketUpgrade() port_, handler_, remote_address_, std::move(message_), std::move(ssl_bundle_), journal_); - ws->run(); return ws; } @@ -118,10 +117,11 @@ SSLHTTPPeer::do_handshake (yield_context yield) stream_type::server, read_buf_.data(), yield[ec])); cancel_timer(); if (ec) - return fail (ec, "handshake"); + return fail(ec, "handshake"); bool const http = port().protocol.count("peer") > 0 || - //|| port().protocol.count("wss") > 0 + //port().protocol.count("wss") > 0 || + port().protocol.count("wss2") > 0 || port().protocol.count("https") > 0; if (http) { diff --git a/src/ripple/server/impl/ServerHandlerImp.cpp b/src/ripple/server/impl/ServerHandlerImp.cpp index 8a1e9f665d..c748c803fc 100644 --- a/src/ripple/server/impl/ServerHandlerImp.cpp +++ b/src/ripple/server/impl/ServerHandlerImp.cpp @@ -124,37 +124,35 @@ ServerHandlerImp::onHandoff (Session& session, boost::asio::ip::tcp::endpoint remote_address) -> Handoff { - if (session.port().protocol.count("wss2") > 0 && - isWebsocketUpgrade (request)) + if(isWebsocketUpgrade(request)) { - // VFALCO TODO - Resource::Consumer usage; - //if (isUnlimited (role)) - // usage = m_resourceManager.newUnlimitedEndpoint ( - // remoteIPAddress.to_string()); - //else - usage = m_resourceManager.newInboundEndpoint( - beast::IP::from_asio(remote_address)); - auto const ws = session.websocketUpgrade(); - ws->appDefined = std::make_shared( - m_networkOPs, usage, ws); Handoff handoff; - handoff.moved = true; - return handoff; + if(session.port().protocol.count("wss2") > 0) + { + auto const ws = session.websocketUpgrade(); + auto is = std::make_shared(m_networkOPs, ws); + is->getConsumer() = requestInboundEndpoint( + m_resourceManager, + beast::IPAddressConversion::from_asio(remote_address), + session.port(), is->user()); + ws->appDefined = std::move(is); + ws->run(); + handoff.moved = true; + return handoff; + } + + if(session.port().protocol.count("wss") > 0) + return handoff; // Pass to websocket } - if (session.port().protocol.count("wss") > 0 && - isWebsocketUpgrade (request)) + + if(session.port().protocol.count("peer") > 0) { - // Pass to websockets - Handoff handoff; - // handoff.moved = true; - return handoff; - } - if (session.port().protocol.count("peer") > 0) - return app_.overlay().onHandoff (std::move(bundle), + return app_.overlay().onHandoff(std::move(bundle), std::move(request), remote_address); - // Pass through to legacy onRequest - return Handoff{}; + } + + // Pass to legacy onRequest + return {}; } auto @@ -164,34 +162,22 @@ ServerHandlerImp::onHandoff (Session& session, boost::asio::ip::tcp::endpoint remote_address) -> Handoff { - if (session.port().protocol.count("ws2") > 0 && + Handoff handoff; + if(session.port().protocol.count("ws2") > 0 && isWebsocketUpgrade (request)) { - // VFALCO TODO - Resource::Consumer usage; - //if (isUnlimited (role)) - // usage = m_resourceManager.newUnlimitedEndpoint ( - // remoteIPAddress.to_string()); - //else - usage = m_resourceManager.newInboundEndpoint( - beast::IP::from_asio(remote_address)); auto const ws = session.websocketUpgrade(); - ws->appDefined = std::make_shared( - m_networkOPs, usage, ws); - Handoff handoff; + auto is = std::make_shared(m_networkOPs, ws); + is->getConsumer() = requestInboundEndpoint( + m_resourceManager, + beast::IPAddressConversion::from_asio(remote_address), + session.port(), is->user()); + ws->appDefined = std::move(is); + ws->run(); handoff.moved = true; - return handoff; } - if (session.port().protocol.count("ws") > 0 && - isWebsocketUpgrade (request)) - { - // Pass to websockets - Handoff handoff; - // handoff.moved = true; - return handoff; - } - // Pass through to legacy onRequest - return Handoff{}; + // Otherwise pass to legacy onRequest or websocket + return handoff; } static inline @@ -219,6 +205,21 @@ build_map(beast::http::headers const& h) return c; } +template +static +std::string +buffers_to_string(ConstBufferSequence const& bs) +{ + using boost::asio::buffer_cast; + using boost::asio::buffer_size; + std::string s; + s.reserve(buffer_size(bs)); + for(auto const& b : bs) + s.append(buffer_cast(b), + buffer_size(b)); + return s; +} + void ServerHandlerImp::onRequest (Session& session) { @@ -252,29 +253,37 @@ ServerHandlerImp::onWSMessage( std::shared_ptr session, std::vector const& buffers) { - // VFALCO This is inefficient, the JSON - // should be parsed from the buffer sequence. - std::string s; - s.reserve(boost::asio::buffer_size(buffers)); - std::copy(boost::asio::buffers_begin(buffers), - boost::asio::buffers_end(buffers), - std::back_inserter(s)); -//m_journal.error << "Recv: " << s; Json::Value jv; - // VFALCO should we parse a coroutine instead? - if(! Json::Reader{}.parse(s, jv)) + auto const size = boost::asio::buffer_size(buffers); + if (size > RPC::Tuning::maxRequestSize || + ! Json::Reader{}.parse(jv, buffers) || + ! jv || + ! jv.isObject()) { - // TODO Send error + Json::Value jvResult(Json::objectValue); + jvResult[jss::type] = jss::error; + jvResult[jss::error] = "jsonInvalid"; + jvResult[jss::value] = buffers_to_string(buffers); + beast::streambuf sb; + Json::stream(jvResult, + [&sb](auto const p, auto const n) + { + sb.commit(boost::asio::buffer_copy( + sb.prepare(n), boost::asio::buffer(p, n))); + }); + session->send(std::make_shared< + StreambufWSMsg>(std::move(sb))); + session->complete(); return; } + m_jobQueue.postCoro(jtCLIENT, "WS-Client", [this, session = std::move(session), - jv = std::move(jv)](auto const& coro) + jv = std::move(jv)](auto const& jc) { auto const jr = - this->processSession(session, coro, jv); + this->processSession(session, jc, jv); beast::streambuf sb; -//m_journal.error << "Send: " << to_string(jr); Json::stream(jr, [&sb](auto const p, auto const n) { @@ -283,6 +292,7 @@ ServerHandlerImp::onWSMessage( }); session->send(std::make_shared< StreambufWSMsg>(std::move(sb))); + session->complete(); }); } @@ -308,58 +318,44 @@ ServerHandlerImp::processSession( std::shared_ptr const& coro, Json::Value const& jv) { - auto is = std::static_pointer_cast (session->appDefined); - /* - if (getConsumer().disconnect ()) + auto is = std::static_pointer_cast (session->appDefined); + if (is->getConsumer().disconnect()) { - disconnect (); - return rpcError (rpcSLOW_DOWN); + session->close(); + return rpcError(rpcSLOW_DOWN); } - */ // Requests without "command" are invalid. - // - if (!jv.isMember (jss::command)) + Json::Value jr(Json::objectValue); + if (! jv.isMember (jss::command)) { - Json::Value jr (Json::objectValue); - - jr[jss::type] = jss::response; - jr[jss::status] = jss::error; - jr[jss::error] = jss::missingCommand; + jr[jss::type] = jss::response; + jr[jss::status] = jss::error; + jr[jss::error] = jss::missingCommand; jr[jss::request] = jv; - if (jv.isMember (jss::id)) - { jr[jss::id] = jv[jss::id]; - } - - /* - getConsumer().charge (Resource::feeInvalidRPC); - */ + is->getConsumer().charge(Resource::feeInvalidRPC); return jr; } Resource::Charge loadType = Resource::feeReferenceRPC; - Json::Value jr (Json::objectValue); - - auto required = RPC::roleRequired (jv[jss::command].asString()); - // VFALCO TODO Get identity/credentials from HTTP headers - std::string const user = ""; - std::string const fwdfor = ""; - auto role = requestRole (required, session->port(), jv, + auto required = RPC::roleRequired(jv[jss::command].asString()); + auto role = requestRole( + required, + session->port(), + jv, beast::IP::from_asio(session->remote_endpoint().address()), - user); - + is->user()); if (Role::FORBID == role) { - jr[jss::result] = rpcError (rpcFORBIDDEN); + jr[jss::result] = rpcError (rpcFORBIDDEN); } else { - // VFALCO TODO InfoSub parameter in context RPC::Context context{ - app_.journal ("RPCHandler"), + app_.journal("RPCHandler"), jv, app_, loadType, @@ -369,28 +365,24 @@ ServerHandlerImp::processSession( role, coro, is, - { user, fwdfor } + {is->user(), is->forwarded_for()} }; - RPC::doCommand (context, jr[jss::result]); + RPC::doCommand(context, jr[jss::result]); } - /* - getConsumer().charge (loadType); - if (getConsumer().warn ()) - { + is->getConsumer().charge(loadType); + if (is->getConsumer().warn()) jr[jss::warning] = jss::load; - } - */ // Currently we will simply unwrap errors returned by the RPC // API, in the future maybe we can make the responses // consistent. // // Regularize result. This is duplicate code. - if (jr[jss::result].isMember (jss::error)) + if (jr[jss::result].isMember(jss::error)) { - jr = jr[jss::result]; - jr[jss::status] = jss::error; + jr = jr[jss::result]; + jr[jss::status] = jss::error; jr[jss::request] = jv; } @@ -399,47 +391,27 @@ ServerHandlerImp::processSession( jr[jss::status] = jss::success; // For testing resource limits on this connection. - if (jv[jss::command].asString() == "ping") - { - /* - if (getConsumer().isUnlimited()) + if (is->getConsumer().isUnlimited() && + jv[jss::command].asString() == "ping") jr[jss::unlimited] = true; - */ - } } - if (jv.isMember (jss::id)) - { + if (jv.isMember(jss::id)) jr[jss::id] = jv[jss::id]; - } - jr[jss::type] = jss::response; - return jr; } -template -static -std::string -buffers_to_string(ConstBufferSequence const& bs) -{ - using boost::asio::buffer_cast; - using boost::asio::buffer_size; - std::string s; - s.reserve(buffer_size(bs)); - for(auto const& b : bs) - s.append(buffer_cast(b), - buffer_size(b)); - return s; -} - // Run as a coroutine. void ServerHandlerImp::processSession (std::shared_ptr const& session, std::shared_ptr jobCoro) { - processRequest (session->port(), buffers_to_string(session->request().body.data()), - session->remoteAddress().at_port (0), makeOutput (*session), jobCoro, + processRequest ( + session->port(), buffers_to_string( + session->request().body.data()), + session->remoteAddress().at_port (0), + makeOutput (*session), jobCoro, [&] { auto const iter = @@ -506,10 +478,10 @@ ServerHandlerImp::processRequest (Port const& port, /* ---------------------------------------------------------------------- */ auto role = Role::FORBID; auto required = RPC::roleRequired(id.asString()); - - if (jsonRPC.isObject() && jsonRPC.isMember("params") && - jsonRPC["params"].isArray() && jsonRPC["params"].size() > 0 && - jsonRPC["params"][Json::UInt(0)].isObject()) + if (jsonRPC.isMember("params") && + jsonRPC["params"].isArray() && + jsonRPC["params"].size() > 0 && + jsonRPC["params"][Json::UInt(0)].isObject()) { role = requestRole(required, port, jsonRPC["params"][Json::UInt(0)], remoteIPAddress, user); diff --git a/src/ripple/server/impl/ServerHandlerImp.h b/src/ripple/server/impl/ServerHandlerImp.h index 255b45da39..1f45d6f1c6 100644 --- a/src/ripple/server/impl/ServerHandlerImp.h +++ b/src/ripple/server/impl/ServerHandlerImp.h @@ -45,13 +45,39 @@ bool operator< (Port const& lhs, Port const& rhs) class WSInfoSub : public InfoSub { std::weak_ptr ws_; + std::string user_; + std::string fwdfor_; public: - WSInfoSub(Source& source, Consumer consumer, + WSInfoSub(Source& source, std::shared_ptr const& ws) - : InfoSub(source, consumer) + : InfoSub(source) , ws_(ws) { + auto const& h = ws->request().headers; + auto it = h.find("X-User"); + if (it != h.end() && + isIdentified( + ws->port(), beast::IPAddressConversion::from_asio( + ws->remote_endpoint()).address(), it->second)) + { + user_ = it->second; + it = h.find("X-Forwarded-For"); + if (it != h.end()) + fwdfor_ = it->second; + } + } + + std::string + user() const + { + return user_; + } + + std::string + forwarded_for() const + { + return fwdfor_; } void