diff --git a/Builds/VisualStudio2012/RippleD.vcxproj b/Builds/VisualStudio2012/RippleD.vcxproj index 596855c2e..fc31aea31 100644 --- a/Builds/VisualStudio2012/RippleD.vcxproj +++ b/Builds/VisualStudio2012/RippleD.vcxproj @@ -22,6 +22,12 @@ + + true + true + true + true + true true @@ -285,6 +291,12 @@ true true + + true + true + true + true + true true @@ -1493,6 +1505,7 @@ + @@ -1560,6 +1573,7 @@ + diff --git a/Builds/VisualStudio2012/RippleD.vcxproj.filters b/Builds/VisualStudio2012/RippleD.vcxproj.filters index 805968654..8f87ff90d 100644 --- a/Builds/VisualStudio2012/RippleD.vcxproj.filters +++ b/Builds/VisualStudio2012/RippleD.vcxproj.filters @@ -984,6 +984,12 @@ [2] Old Ripple\ripple_app\main + + [1] Ripple\frame\api + + + [2] Old Ripple\ripple_app\main + @@ -1944,6 +1950,12 @@ [2] Old Ripple\ripple_app\main + + [1] Ripple\frame\api + + + [2] Old Ripple\ripple_app\main + diff --git a/doc/todo/VFALCO_TODO.txt b/doc/todo/VFALCO_TODO.txt index 8d5e361ad..461d1da01 100644 --- a/doc/todo/VFALCO_TODO.txt +++ b/doc/todo/VFALCO_TODO.txt @@ -3,6 +3,7 @@ RIPPLE TODO -------------------------------------------------------------------------------- Vinnie's List: Changes day to day, descending priority +- Finish RPCAsyncServer, RPCService and RPCService::Manager - Fix and tidy up broken beast classes - Parse Validator line using cribbed code - Parse ContentBodyBuffer from HTTPResponse diff --git a/src/ripple/frame/api/HTTPServer.cpp b/src/ripple/frame/api/HTTPServer.cpp new file mode 100644 index 000000000..7969e1e6e --- /dev/null +++ b/src/ripple/frame/api/HTTPServer.cpp @@ -0,0 +1,1016 @@ +//------------------------------------------------------------------------------ +/* + Copyright (c) 2011-2013, OpenCoin, Inc. +*/ +//============================================================================== + +#include "../ripple_net/ripple_net.h" + +namespace ripple { + +using namespace beast; + +//------------------------------------------------------------------------------ + +HTTPServer::Port::Port () + : port (0) + , security (no_ssl) + , context (nullptr) +{ +} + +HTTPServer::Port::Port (Port const& other) + : port (other.port) + , addr (other.addr) + , security (other.security) + , context (other.context) +{ +} + +HTTPServer::Port& HTTPServer::Port::operator= (Port const& other) +{ + port = other.port; + addr = other.addr; + security = other.security; + context = other.context; + return *this; +} + +HTTPServer::Port::Port ( + uint16 port_, + IPEndpoint const& addr_, + Security security_, + SSLContext* context_) + : port (port_) + , addr (addr_) + , security (security_) + , context (context_) +{ +} + +int compare (HTTPServer::Port const& lhs, HTTPServer::Port const& rhs) +{ + int comp; + + comp = compare (lhs.addr, rhs.addr); + if (comp != 0) + return comp; + + if (lhs.port < rhs.port) + return -1; + else if (lhs.port > rhs.port) + return 1; + + if (lhs.security < rhs.security) + return -1; + else if (lhs.security > rhs.security) + return 1; + + // 'context' does not participate in the comparison + + return 0; +} + +bool operator== (HTTPServer::Port const& lhs, HTTPServer::Port const& rhs) { return compare (lhs, rhs) == 0; } +bool operator!= (HTTPServer::Port const& lhs, HTTPServer::Port const& rhs) { return compare (lhs, rhs) != 0; } +bool operator< (HTTPServer::Port const& lhs, HTTPServer::Port const& rhs) { return compare (lhs, rhs) < 0; } +bool operator<= (HTTPServer::Port const& lhs, HTTPServer::Port const& rhs) { return compare (lhs, rhs) <= 0; } +bool operator> (HTTPServer::Port const& lhs, HTTPServer::Port const& rhs) { return compare (lhs, rhs) > 0; } +bool operator>= (HTTPServer::Port const& lhs, HTTPServer::Port const& rhs) { return compare (lhs, rhs) >= 0; } + +//------------------------------------------------------------------------------ + +HTTPServer::ScopedStream::ScopedStream (Session& session) + : m_session (session) +{ +} + +HTTPServer::ScopedStream::ScopedStream (ScopedStream const& other) + : m_session (other.m_session) +{ +} + +HTTPServer::ScopedStream::ScopedStream (Session& session, + std::ostream& manip (std::ostream&)) + : m_session (session) +{ + m_ostream << manip; +} + +HTTPServer::ScopedStream::~ScopedStream () +{ + if (! m_ostream.str().empty()) + m_session.write (m_ostream.str()); +} + +std::ostream& HTTPServer::ScopedStream::operator<< (std::ostream& manip (std::ostream&)) const +{ + return m_ostream << manip; +} + +std::ostringstream& HTTPServer::ScopedStream::ostream () const +{ + return m_ostream; +} + +//------------------------------------------------------------------------------ + +HTTPServer::Session::Session () + : headersComplete (false) + , tag (nullptr) +{ + content.reserve (1000); + reply.reserve (1000); +} + +HTTPServer::ScopedStream HTTPServer::Session::operator<< ( + std::ostream& manip (std::ostream&)) +{ + return ScopedStream (*this, manip); +} + +//------------------------------------------------------------------------------ + +class HTTPServer::Impl : public Thread +{ +public: + typedef boost::system::error_code error_code; + typedef boost::asio::ip::tcp Protocol; + typedef boost::asio::ip::address address; + typedef Protocol::endpoint endpoint_t; + typedef Protocol::acceptor acceptor; + typedef Protocol::socket socket; + + static std::string to_string (address const& addr) + { + return addr.to_string(); + } + + static std::string to_string (endpoint_t const& endpoint) + { + std::stringstream ss; + ss << to_string (endpoint.address()); + if (endpoint.port() != 0) + ss << ":" << std::dec << endpoint.port(); + return std::string (ss.str()); + } + + static endpoint_t to_asio (Port const& port) + { + if (port.addr.isV4()) + { + IPEndpoint::V4 v4 (port.addr.v4()); + std::string const& s (v4.to_string()); + return endpoint_t (address().from_string (s), port.port); + } + + //IPEndpoint::V6 v6 (ep.v6()); + return endpoint_t (); + } + + static IPEndpoint from_asio (endpoint_t const& endpoint) + { + std::stringstream ss (to_string (endpoint)); + IPEndpoint ep; + ss >> ep; + return ep; + } + + //-------------------------------------------------------------------------- + + // Holds the copy of buffers being sent + typedef SharedArg SharedBuffer; + + class Peer; + + class SessionImp : public Session + { + public: + Peer& m_peer; + bool m_closed; + boost::optional m_work; + + explicit SessionImp (Peer& peer) + : m_peer (peer) + , m_closed (false) + { + } + + ~SessionImp () + { + } + + bool closed() const + { + return m_closed; + } + + void write (void const* buffer, std::size_t bytes) + { + m_peer.write (buffer, bytes); + } + + void close() + { + m_closed = true; + } + + void detach() + { + if (! m_work) + m_work = boost::in_place (boost::ref ( + m_peer.m_impl.get_io_service())); + } + }; + + //-------------------------------------------------------------------------- + + /** Represents an active connection. */ + class Peer + : public SharedObject + , public AsyncObject + , public List ::Node + , public LeakChecked + { + public: + enum + { + // Size of our receive buffer + bufferSize = 8192, + + // Largest HTTP request allowed + maxRequestBytes = 32 * 1024, + + // Max seconds without receiving a byte + dataTimeoutSeconds = 10, + + // Max seconds without completing the request + requestTimeoutSeconds = 30 + + }; + + typedef SharedPtr Ptr; + + Impl& m_impl; + boost::asio::io_service::strand m_strand; + boost::asio::deadline_timer m_data_timer; + boost::asio::deadline_timer m_request_timer; + ScopedPointer m_socket; + MemoryBlock m_buffer; + HTTPParser m_parser; + SessionImp m_session; + int m_writesPending; + bool m_callClose; + + Peer (Impl& impl, Port const& port) + : m_impl (impl) + , m_strand (m_impl.get_io_service()) + , m_data_timer (m_impl.get_io_service()) + , m_request_timer (m_impl.get_io_service()) + , m_buffer (bufferSize) + , m_parser (HTTPParser::typeRequest) + , m_session (*this) + , m_writesPending (0) + , m_callClose (false) + { + int flags; + switch (port.security) + { + default: + bassertfalse; + case Port::no_ssl: flags = MultiSocket::none; break; + case Port::allow_ssl: flags = MultiSocket::server_ssl; break; + case Port::require_ssl: flags = MultiSocket::server_ssl_required; break; + } + + m_socket = MultiSocket::New (m_impl.get_io_service(), port.context->get(), flags); + + m_impl.add (*this); + } + + ~Peer () + { + if (m_callClose) + m_impl.handler().onClose (m_session); + + m_impl.remove (*this); + } + + // Returns the asio socket for the peer. + // + socket& get_socket() + { + return m_socket->this_layer(); + } + + // Return the Session associated with this peer's session. + // + SessionImp& session () + { + return m_session; + } + + // Cancels all pending i/o and timers and sends tcp shutdown. + // + void cancel () + { + error_code ec; + m_data_timer.cancel (ec); + m_request_timer.cancel (ec); + m_socket->cancel (ec); + m_socket->shutdown (socket::shutdown_both); + } + + // Called when I/O completes with an error that is not eof or aborted. + // + void failed (error_code ec) + { + cancel (); + } + + // Called when there are no more completion handlers pending. + // + void asyncHandlersComplete () + { + } + + // Send a copy of the data. + // + void write (void const* buffer, std::size_t bytes) + { + SharedBuffer buf (static_cast (buffer), bytes); + // Make sure this happens on an i/o service thread. + m_impl.get_io_service().dispatch (m_strand.wrap ( + boost::bind (&Peer::handle_write, Ptr (this), + buf, CompletionCounter (this)))); + } + + // Called from an io_service thread to write the shared buffer. + // + void handle_write (SharedBuffer const& buf, CompletionCounter) + { + async_write (buf); + } + + // Send a shared buffer + // + void async_write (SharedBuffer const& buf) + { + bassert (buf.get().size() > 0); + + ++m_writesPending; + + // Send the copy. We pass the SharedArg in the last parameter + // so that a reference is maintained as the handler gets copied. + // When the final completion function returns, the reference + // count will drop to zero and the buffer will be freed. + // + boost::asio::async_write (*m_socket, + boost::asio::const_buffers_1 (&buf->front(), buf->size()), + m_strand.wrap (boost::bind (&Peer::handle_write, + Ptr (this), boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred, + buf, CompletionCounter (this)))); + } + + // Send a copy of the buffer sequence. + // + template + void async_write (BufferSequence const& buffers) + { + // Count the number of buffers + std::size_t const nbuf (std::distance ( + buffers.begin(), buffers.end())); + + // Iterate over each linear vector in the BufferSequence. + for (typename BufferSequence::const_iterator iter (buffers.begin()); + iter != buffers.end(); ++iter) + { + typename BufferSequence::value_type const& buffer (*iter); + + // Put a copy of this section of the buffer sequence into + // a reference counted, shared container. + // + SharedBuffer buf ( + boost::asio::buffer_cast (buffer), + boost::asio::buffer_size (buffer)); + + async_write (buf); + } + } + + // Calls the async_read_some initiating function. + // + void async_read_some () + { + // re-arm the data timer + // (this cancels the previous wait, if any) + // + m_data_timer.expires_from_now ( + boost::posix_time::seconds ( + dataTimeoutSeconds)); + + m_data_timer.async_wait (m_strand.wrap (boost::bind ( + &Peer::handle_data_timer, Ptr(this), + boost::asio::placeholders::error, + CompletionCounter (this)))); + + // issue the read + // + boost::asio::mutable_buffers_1 buf ( + m_buffer.getData (), m_buffer.getSize ()); + + m_socket->async_read_some (buf, m_strand.wrap ( + boost::bind (&Peer::handle_read, Ptr (this), + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred, + CompletionCounter (this)))); + } + + // Sends a copy of the reply in the session if it is not empty. + // Returns `true` if m_session.closed is `true` + // On return, reply.empty() will return `true`. + // + void maybe_send_reply () + { + if (! m_session.reply.empty()) + { + async_write (boost::asio::const_buffers_1 ( + &m_session.reply.front(), m_session.reply.size())); + m_session.reply.clear(); + } + } + + // Called when the acceptor gives us the connection. + // + void handle_accept () + { + m_callClose = true; + + m_request_timer.expires_from_now ( + boost::posix_time::seconds ( + requestTimeoutSeconds)); + + m_request_timer.async_wait (m_strand.wrap (boost::bind ( + &Peer::handle_request_timer, Ptr(this), + boost::asio::placeholders::error, + CompletionCounter (this)))); + + if (m_socket->needs_handshake ()) + { + m_socket->async_handshake (Socket::server, m_strand.wrap ( + boost::bind (&Peer::handle_handshake, Ptr(this), + boost::asio::placeholders::error, + CompletionCounter (this)))); + } + else + { + async_read_some(); + } + } + + // Called when the handshake completes + // + void handle_handshake (error_code ec, CompletionCounter) + { + if (ec == boost::asio::error::operation_aborted) + return; + + if (ec != 0) + { + // fail + return; + } + + async_read_some(); + } + + // Called when the data timer expires + // + void handle_data_timer (error_code ec, CompletionCounter) + { + if (ec == boost::asio::error::operation_aborted) + return; + + if (ec != 0) + { + // fail + return; + } + + // They took too long to send any bytes + cancel(); + } + + // Called when the request timer expires + // + void handle_request_timer (error_code ec, CompletionCounter) + { + if (ec == boost::asio::error::operation_aborted) + return; + + if (ec != 0) + { + // fail + return; + } + + // They took too long to complete the request + cancel(); + } + + // Called when async_write completes. + // + void handle_write (error_code ec, std::size_t bytes_transferred, + SharedBuffer buf, CompletionCounter) + { + if (ec == boost::asio::error::operation_aborted) + return; + + if (ec != 0) + { + failed (ec); + return; + } + + bassert (m_writesPending > 0); + if (--m_writesPending == 0 && m_session.closed()) + { + m_socket->shutdown (socket::shutdown_send); + } + } + + // Called when async_read_some completes. + // + void handle_read (error_code ec, std::size_t bytes_transferred, CompletionCounter) + { + if (ec == boost::asio::error::operation_aborted) + return; + + if (ec != 0 && ec != boost::asio::error::eof) + { + failed (ec); + return; + } + + std::size_t const bytes_parsed (m_parser.process ( + m_buffer.getData(), bytes_transferred)); + + if (m_parser.error() || + bytes_parsed != bytes_transferred) + { + // set ec manually and call failed() + return; + } + + if (ec == boost::asio::error::eof) + { + m_parser.process_eof(); + ec = error_code(); + } + + if (m_parser.error()) + { + // set ec manually and call failed() + return; + } + + if (! m_parser.finished()) + { + // Feed some headers to the callback + if (m_parser.fields().size() > 0) + { + handle_headers (); + if (m_session.closed()) + return; + } + } + + if (m_parser.finished ()) + { + m_data_timer.cancel(); + + // VFALCO NOTE: Should we cancel this one? + m_request_timer.cancel(); + + if (! m_socket->needs_handshake()) + m_socket->shutdown (socket::shutdown_receive); + + handle_request (); + return; + } + + async_read_some(); + } + + // Called when we have some new headers. + // + void handle_headers () + { + m_session.headersComplete = m_parser.headersComplete(); + m_session.headers = HTTPHeaders (m_parser.fields()); + m_impl.handler().onHeaders (m_session); + + maybe_send_reply (); + } + + // Called when we have a complete http request. + // + void handle_request () + { + // This is to guarantee onHeaders is called at least once. + handle_headers(); + + if (m_session.closed()) + return; + + m_session.request = m_parser.request(); + + // Turn the Content-Body into a linear buffer. + ContentBodyBuffer const& body (m_session.request->body ()); + m_session.content.resize (body.size ()); + boost::asio::buffer_copy ( + boost::asio::buffer (&m_session.content.front(), + m_session.content.size()), body.data()); + + // Process the HTTPRequest + m_impl.handler().onRequest (m_session); + + maybe_send_reply (); + } + }; + + //-------------------------------------------------------------------------- + + /** A listening socket. */ + class Door + : public SharedObject + , public AsyncObject + , public List ::Node + , public LeakChecked + { + public: + typedef SharedPtr Ptr; + + Impl& m_impl; + acceptor m_acceptor; + Port m_port; + + Door (Impl& impl, Port const& port) + : m_impl (impl) + , m_acceptor (m_impl.get_io_service(), to_asio (port)) + , m_port (port) + { + m_impl.add (*this); + + error_code ec; + + m_acceptor.set_option (acceptor::reuse_address (true), ec); + if (ec) + { + m_impl.journal().error << + "Error setting acceptor socket option: " << ec.message(); + } + + if (! ec) + { + m_impl.journal().info << "Bound to endpoint " << + to_string (m_acceptor.local_endpoint()); + + async_accept(); + } + else + { + m_impl.journal().error << "Error binding to endpoint " << + to_string (m_acceptor.local_endpoint()) << + ", '" << ec.message() << "'"; + } + } + + ~Door () + { + m_impl.remove (*this); + } + + Port const& port () const + { + return m_port; + } + + void cancel () + { + m_acceptor.cancel(); + } + + void failed (error_code ec) + { + } + + void asyncHandlersComplete () + { + } + + void async_accept () + { + Peer* peer (new Peer (m_impl, m_port)); + m_acceptor.async_accept (peer->get_socket(), boost::bind ( + &Door::handle_accept, Ptr(this), + boost::asio::placeholders::error, + Peer::Ptr (peer), CompletionCounter (this))); + } + + void handle_accept (error_code ec, Peer::Ptr peer, CompletionCounter) + { + if (ec == boost::asio::error::operation_aborted) + return; + + if (ec) + { + m_impl.journal().error << "Accept failed: " << ec.message(); + return; + } + + async_accept(); + + // Save remote address in session + peer->session().remoteAddress = from_asio ( + peer->get_socket().remote_endpoint()).withPort (0); + m_impl.handler().onAccept (peer->session()); + + if (peer->session().closed()) + { + peer->cancel(); + return; + } + + peer->handle_accept(); + } + }; + + //-------------------------------------------------------------------------- + + struct State + { + // Attributes for our listening ports + Ports ports; + + // All allocated Peer objects + List peers; + + // All allocated Door objects + List doors; + }; + + typedef SharedData SharedState; + typedef std::vector Doors; + + HTTPServer& m_server; + Handler& m_handler; + Journal m_journal; + boost::asio::io_service m_io_service; + boost::asio::io_service::strand m_strand; + boost::optional m_work; + WaitableEvent m_stopped; + SharedState m_state; + Doors m_doors; + + //-------------------------------------------------------------------------- + + Impl (HTTPServer& server, Handler& handler, Journal journal) + : Thread ("RPC::HTTPServer") + , m_server (server) + , m_handler (handler) + , m_journal (journal) + , m_strand (m_io_service) + , m_work (boost::in_place (boost::ref (m_io_service))) + , m_stopped (true) + { + startThread (); + } + + ~Impl () + { + stopThread (); + } + + Journal const& journal() const + { + return m_journal; + } + + Ports const& getPorts () const + { + SharedState::UnlockedAccess state (m_state); + return state->ports; + } + + void setPorts (Ports const& ports) + { + SharedState::Access state (m_state); + state->ports = ports; + update(); + } + + bool stopping () const + { + return ! m_work; + } + + void stop (bool wait) + { + if (! stopping()) + { + m_work = boost::none; + update(); + } + + if (wait) + m_stopped.wait(); + } + + //-------------------------------------------------------------------------- + // + // Server + // + + Handler& handler() + { + return m_handler; + } + + boost::asio::io_service& get_io_service() + { + return m_io_service; + } + + // Inserts the peer into our list of peers. We only remove it + // from the list inside the destructor of the Peer object. This + // way, the Peer can never outlive the server. + // + void add (Peer& peer) + { + SharedState::Access state (m_state); + state->peers.push_back (peer); + } + + void add (Door& door) + { + SharedState::Access state (m_state); + state->doors.push_back (door); + } + + // Removes the peer from our list of peers. This is only called from + // the destructor of Peer. Essentially, the item in the list functions + // as a weak_ptr. + // + void remove (Peer& peer) + { + SharedState::Access state (m_state); + state->peers.erase (state->peers.iterator_to (peer)); + } + + void remove (Door& door) + { + SharedState::Access state (m_state); + state->doors.push_back (door); + } + + //-------------------------------------------------------------------------- + // + // Thread + // + + // Updates our Door list based on settings. + // + void handle_update () + { + if (! stopping()) + { + // Make a local copy to shorten the lock + // + Ports ports; + { + SharedState::ConstAccess state (m_state); + ports = state->ports; + } + + std::sort (ports.begin(), ports.end()); + + // Walk the Door list and the Port list simultaneously and + // build a replacement Door vector which we will then swap in. + // + Doors doors; + Doors::iterator door (m_doors.begin()); + for (Ports::const_iterator port (ports.begin()); + port != ports.end(); ++port) + { + int comp; + + while (door != m_doors.end() && + ((comp = compare (*port, (*door)->port())) > 0)) + { + (*door)->cancel(); + ++door; + } + + if (door != m_doors.end()) + { + if (comp < 0) + { + doors.push_back (new Door (*this, *port)); + } + else + { + // old Port and new Port are the same + doors.push_back (*door); + ++door; + } + } + else + { + doors.push_back (new Door (*this, *port)); + } + } + + // Any remaining Door objects are not in the new set, so cancel them. + // + for (;door != m_doors.end();) + (*door)->cancel(); + + m_doors.swap (doors); + } + else + { + // Cancel pending I/O on all doors. + // + for (Doors::iterator iter (m_doors.begin()); + iter != m_doors.end(); ++iter) + { + (*iter)->cancel(); + } + + // Remove our references to the old doors. + // + m_doors.resize (0); + } + } + + // Causes handle_update to run on the io_service + // + void update () + { + m_io_service.post (m_strand.wrap (boost::bind ( + &Impl::handle_update, this))); + } + + // The main i/o processing loop. + // + void run () + { + m_io_service.run (); + + m_stopped.signal(); + m_handler.onStopped (m_server); + } +}; + +//------------------------------------------------------------------------------ + +HTTPServer::HTTPServer (Handler& handler, Journal journal) + : m_impl (new Impl (*this, handler, journal)) +{ +} + +HTTPServer::~HTTPServer () +{ + stop(); +} + +Journal const& HTTPServer::journal () const +{ + return m_impl->journal(); +} + +HTTPServer::Ports const& HTTPServer::getPorts () const +{ + return m_impl->getPorts(); +} + +void HTTPServer::setPorts (Ports const& ports) +{ + m_impl->setPorts (ports); +} + +void HTTPServer::stopAsync () +{ + m_impl->stop(false); +} + +void HTTPServer::stop () +{ + m_impl->stop(true); +} + +//------------------------------------------------------------------------------ + +} diff --git a/src/ripple/frame/api/HTTPServer.h b/src/ripple/frame/api/HTTPServer.h new file mode 100644 index 000000000..77bf7ea8a --- /dev/null +++ b/src/ripple/frame/api/HTTPServer.h @@ -0,0 +1,263 @@ +//------------------------------------------------------------------------------ +/* + Copyright (c) 2011-2013, OpenCoin, Inc. +*/ +//============================================================================== + +#ifndef RIPPLE_FRAME_HTTPSERVER_H_INCLUDED +#define RIPPLE_FRAME_HTTPSERVER_H_INCLUDED + +#include + +namespace ripple { + +using namespace beast; + +/** Multi-threaded, asynchronous HTTP server. */ +class HTTPServer +{ +public: + /** Configuration information for a listening port. */ + struct Port + { + enum Security + { + no_ssl, + allow_ssl, + require_ssl + }; + + Port (); + Port (Port const& other); + Port& operator= (Port const& other); + Port (uint16 port_, IPEndpoint const& addr_, + Security security_, SSLContext* context_); + + uint16 port; + IPEndpoint addr; + Security security; + SSLContext* context; + }; + + //-------------------------------------------------------------------------- + + class Session; + + /** Scoped ostream-based RAII container for building the HTTP response. */ + class ScopedStream + { + public: + explicit ScopedStream (Session& session); + ScopedStream (ScopedStream const& other); + + template + ScopedStream (Session& session, T const& t) + : m_session (session) + { + m_ostream << t; + } + + ScopedStream (Session& session, std::ostream& manip (std::ostream&)); + + ~ScopedStream (); + + std::ostringstream& ostream () const; + + std::ostream& operator<< (std::ostream& manip (std::ostream&)) const; + + template + std::ostream& operator<< (T const& t) const + { + return m_ostream << t; + } + + private: + ScopedStream& operator= (ScopedStream const&); // disallowed + + Session& m_session; + std::ostringstream mutable m_ostream; + }; + + //-------------------------------------------------------------------------- + + /** Persistent state information for a connection session. + These values are preserved between calls for efficiency. + Some fields are input parameters, some are output parameters, + and all only become defined during specific callbacks. + */ + class Session : public Uncopyable + { + public: + Session (); + + /** Input: The Journal the HTTPServer is using. */ + Journal journal; + + /** Input: The remote address of the connection. */ + IPEndpoint remoteAddress; + + /** Input: `true` if all the headers have been received. */ + bool headersComplete; + + /** Input: The currently known set of HTTP headers. */ + HTTPHeaders headers; + + /** Input: The full HTTPRequest when it is known. */ + SharedPtr request; + + /** Input: The Content-Body as a linear buffer if we have the HTTPRequest. */ + std::string content; + + /** Output: The buffer to send back as a reply. + Upon each entry into the callback, reply.size() will be zero. + If reply.size() is zero when the callback returns, no data is + sent. + */ + std::string reply; + + /** A user-definable pointer. + The initial value is always zero. + Changes to the value are persisted between calls. + */ + void* tag; + + + + /** Send a copy of data asynchronously. */ + /** @{ */ + void write (std::string const& s) + { + if (! s.empty()) + write (&s.front(), + std::distance (s.begin(), s.end())); + } + + template + void write (BufferSequence const& buffers) + { + for (typename BufferSequence::const_iterator iter (buffers.begin()); + iter != buffers.end(); ++iter) + { + typename BufferSequence::value_type const& buffer (*iter); + write (boost::asio::buffer_cast (buffer), + boost::asio::buffer_size (buffer)); + } + } + + virtual void write (void const* buffer, std::size_t bytes) = 0; + /** @} */ + + /** Output support using ostream. */ + /** @{ */ + ScopedStream operator<< (std::ostream& manip (std::ostream&)); + + template + ScopedStream operator<< (T const& t) + { + return ScopedStream (*this, t); + } + /** @} */ + + /** Detach the session. + This holds the session open so that the response can be sent + asynchronously. Calls to io_service::run made by the HTTPServer + will not return until all detached sessions are closed. + */ + virtual void detach() = 0; + + /** Close the session. + This will be performed asynchronously. The session will be + closed gracefully after all pending writes have completed. + */ + virtual void close() = 0; + }; + + //-------------------------------------------------------------------------- + + /** Processes all sessions. + Thread safety: + Must be safe to call concurrently from any number of foreign threads. + */ + struct Handler + { + /** Called when the connection is accepted and we know remoteAddress. */ + virtual void onAccept (Session& session) = 0; + + /** Called repeatedly as new HTTP headers are received. + Guaranteed to be called at least once. + */ + virtual void onHeaders (Session& session) = 0; + + /** Called when we have the full Content-Body. */ + virtual void onRequest (Session& session) = 0; + + /** Called when the session ends. + Guaranteed to be called once. + */ + virtual void onClose (Session& session) = 0; + + /** Called when the HTTPServer has finished its stop. */ + virtual void onStopped (HTTPServer& server) = 0; + }; + + //-------------------------------------------------------------------------- + + /** A set of listening ports settings. */ + typedef std::vector Ports; + + /** Create the server using the specified handler. */ + HTTPServer (Handler& handler, Journal journal); + + /** Destroy the server. + This blocks until the server stops. + */ + virtual ~HTTPServer (); + + /** Returns the Journal associated with the server. */ + Journal const& journal () const; + + /** Returns the listening ports settings. + Thread safety: + Safe to call from any thread. + Cannot be called concurrently with setPorts. + */ + Ports const& getPorts () const; + + /** Set the listening ports settings. + These take effect immediately. Any current ports that are not in the + new set will be closed. Established connections will not be disturbed. + Thread safety: + Cannot be called concurrently. + */ + void setPorts (Ports const& ports); + + /** Notify the server to stop, without blocking. + Thread safety: + Safe to call concurrently from any thread. + */ + void stopAsync (); + + /** Notify the server to stop, and block until the stop is complete. + The handler's onStopped method will be called when the stop completes. + Thread safety: + Cannot be called concurrently. + Cannot be called from the thread of execution of any Handler functions. + */ + void stop (); + +private: + class Impl; + ScopedPointer m_impl; +}; + +int compare (HTTPServer::Port const& lhs, HTTPServer::Port const& rhs); +bool operator== (HTTPServer::Port const& lhs, HTTPServer::Port const& rhs); +bool operator!= (HTTPServer::Port const& lhs, HTTPServer::Port const& rhs); +bool operator< (HTTPServer::Port const& lhs, HTTPServer::Port const& rhs); +bool operator<= (HTTPServer::Port const& lhs, HTTPServer::Port const& rhs); +bool operator> (HTTPServer::Port const& lhs, HTTPServer::Port const& rhs); +bool operator>= (HTTPServer::Port const& lhs, HTTPServer::Port const& rhs); + +} + +#endif diff --git a/src/ripple/frame/ripple_frame.cpp b/src/ripple/frame/ripple_frame.cpp index d38b202ef..45cef89f1 100644 --- a/src/ripple/frame/ripple_frame.cpp +++ b/src/ripple/frame/ripple_frame.cpp @@ -6,11 +6,12 @@ #include "BeastConfig.h" -#include "beast/modules/beast_core/beast_core.h" - -#include "beast/modules/beast_core/system/BeforeBoost.h" // must come first -#include - #include "ripple_frame.h" +#include "beast/modules/beast_core/system/BeforeBoost.h" // must come first +#include +#include +#include + +#include "api/HTTPServer.cpp" #include "api/RPCService.cpp" diff --git a/src/ripple/frame/ripple_frame.h b/src/ripple/frame/ripple_frame.h index 772efee51..a26a2909c 100644 --- a/src/ripple/frame/ripple_frame.h +++ b/src/ripple/frame/ripple_frame.h @@ -9,8 +9,13 @@ #include "beast/modules/beast_core/beast_core.h" +// VFALCO NOTE this sucks that we have to include asio in the header +// just for HTTPMessage!! +#include "beast/modules/beast_asio/beast_asio.h" + #include "../json/ripple_json.h" +#include "api/HTTPServer.h" #include "api/RPCService.h" #endif diff --git a/src/ripple_app/main/Application.cpp b/src/ripple_app/main/Application.cpp index 03c54265b..0fb91d6c4 100644 --- a/src/ripple_app/main/Application.cpp +++ b/src/ripple_app/main/Application.cpp @@ -24,6 +24,8 @@ class NetworkOPsLog; template <> char const* LogPartition::getPartitionName () { return "NetworkOPs"; } class RPCServiceManagerLog; template <> char const* LogPartition::getPartitionName () { return "RPCServiceManager"; } +class HTTPServerLog; +template <> char const* LogPartition::getPartitionName () { return "RPCServer"; } // //------------------------------------------------------------------------------ @@ -82,6 +84,9 @@ public: // VFALCO NOTE LocalCredentials starts the deprecated UNL service , m_deprecatedUNL (UniqueNodeList::New (*m_jobQueue)) + , m_rpcHTTPServer (RPCHTTPServer::New (*m_jobQueue, + LogJournal::get (), *m_networkOPs)) + , m_rpcServerHandler (*m_networkOPs) // passive object, not a Service , m_nodeStoreScheduler (*m_jobQueue, *m_jobQueue) @@ -577,6 +582,10 @@ public: // // Allow RPC connections. // +#if RIPPLE_USE_RPC_SERVICE_MANAGER + m_rpcHTTPServer->setup (m_journal); + +#else if (! getConfig ().getRpcIP().empty () && getConfig ().getRpcPort() != 0) { try @@ -596,6 +605,7 @@ public: { m_journal.info << "RPC interface: disabled"; } +#endif // // Begin connecting to network. @@ -797,6 +807,7 @@ private: LedgerMaster m_ledgerMaster; ScopedPointer m_networkOPs; ScopedPointer m_deprecatedUNL; + ScopedPointer m_rpcHTTPServer; RPCServerHandler m_rpcServerHandler; NodeStoreScheduler m_nodeStoreScheduler; ScopedPointer m_nodeStore; diff --git a/src/ripple_app/main/RPCHTTPServer.cpp b/src/ripple_app/main/RPCHTTPServer.cpp new file mode 100644 index 000000000..4718f1607 --- /dev/null +++ b/src/ripple_app/main/RPCHTTPServer.cpp @@ -0,0 +1,137 @@ +//------------------------------------------------------------------------------ +/* + Copyright (c) 2011-2013, OpenCoin, Inc. +*/ +//============================================================================== + +class RPCHTTPServerImp + : public RPCHTTPServer + , public LeakChecked + , public HTTPServer::Handler +{ +public: + NetworkOPs& m_networkOPs; + RPCServerHandler m_deprecatedHandler; + HTTPServer m_server; + ScopedPointer m_context; + + RPCHTTPServerImp (Stoppable& parent, + Journal journal, + NetworkOPs& networkOPs) + : RPCHTTPServer (parent) + , m_networkOPs (networkOPs) + , m_deprecatedHandler (networkOPs) + , m_server (*this, journal) + { + if (getConfig ().RPC_SECURE == 0) + { + m_context = RippleSSLContext::createBare (); + } + else + { + m_context = RippleSSLContext::createAuthenticated ( + getConfig ().RPC_SSL_KEY, + getConfig ().RPC_SSL_CERT, + getConfig ().RPC_SSL_CHAIN); + } + } + + ~RPCHTTPServerImp () + { + m_server.stop(); + } + + void setup (Journal journal) + { + if (! getConfig ().getRpcIP().empty () && + getConfig ().getRpcPort() != 0) + { + IPEndpoint ep (IPEndpoint::from_string (getConfig().getRpcIP())); + if (! ep.empty()) + { + HTTPServer::Port port; + port.addr = ep.withPort(0); + if (getConfig ().getRpcPort() != 0) + port.port = getConfig ().getRpcPort(); + else + port.port = ep.port(); + port.context = m_context; + + HTTPServer::Ports ports; + ports.push_back (port); + m_server.setPorts (ports); + } + } + else + { + journal.info << "RPC interface: disabled"; + } + } + + //-------------------------------------------------------------------------- + // + // Stoppable + // + + void onStop() + { + m_server.stopAsync(); + } + + void onChildrenStopped() + { + } + + //-------------------------------------------------------------------------- + // + // HTTPServer::Handler + // + + void onAccept (HTTPServer::Session& session) + { + // Reject non-loopback connections if RPC_ALLOW_REMOTE is not set + if (! getConfig().RPC_ALLOW_REMOTE && + ! session.remoteAddress.isLoopback()) + { + session.close(); + } + } + + void onHeaders (HTTPServer::Session& session) + { + } + + void onRequest (HTTPServer::Session& session) + { + session.write (m_deprecatedHandler.processRequest ( + session.content, session.remoteAddress.to_string())); + + session.close(); + } + + void onClose (HTTPServer::Session& session) + { + } + + void onStopped (HTTPServer&) + { + stopped(); + } +}; + +//------------------------------------------------------------------------------ + +RPCHTTPServer::RPCHTTPServer (Stoppable& parent) + : Stoppable ("RPCHTTPServer", parent) +{ +} + +//------------------------------------------------------------------------------ + +RPCHTTPServer* RPCHTTPServer::New (Stoppable& parent, + Journal journal, + NetworkOPs& networkOPs) +{ + return new RPCHTTPServerImp (parent, journal, networkOPs); +} + diff --git a/src/ripple_app/main/RPCHTTPServer.h b/src/ripple_app/main/RPCHTTPServer.h new file mode 100644 index 000000000..9898e1e41 --- /dev/null +++ b/src/ripple_app/main/RPCHTTPServer.h @@ -0,0 +1,25 @@ +//------------------------------------------------------------------------------ +/* + Copyright (c) 2011-2013, OpenCoin, Inc. +*/ +//============================================================================== + +#ifndef RIPPLE_APP_RPCHTTPSERVER_H_INCLUDED +#define RIPPLE_APP_RPCHTTPSERVER_H_INCLUDED + +class RPCHTTPServer : public Stoppable +{ +protected: + RPCHTTPServer (Stoppable& parent); + +public: + static RPCHTTPServer* New (Stoppable& parent, + Journal journal, NetworkOPs& networkOPs); + + virtual ~RPCHTTPServer () { } + + /** Opens listening ports based on the Config settings. */ + virtual void setup(Journal journal) = 0; +}; + +#endif diff --git a/src/ripple_app/ripple_app.cpp b/src/ripple_app/ripple_app.cpp index 25ae98f93..6d52b06f3 100644 --- a/src/ripple_app/ripple_app.cpp +++ b/src/ripple_app/ripple_app.cpp @@ -49,6 +49,8 @@ namespace ripple #include "rpc/RPCHandler.cpp" # include "rpc/RPCServerHandler.h" +# include "main/RPCHTTPServer.h" +#include "main/RPCHTTPServer.cpp" #include "rpc/RPCServerHandler.cpp" #include "websocket/WSConnection.h"