Rearrange sources

This commit is contained in:
Pretty Printer
2025-06-17 19:16:40 +09:00
committed by tequ
parent 6b5a7ec905
commit 6c1bc9052d
1035 changed files with 0 additions and 0 deletions

View File

@@ -0,0 +1,58 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_SERVER_HANDOFF_H_INCLUDED
#define RIPPLE_SERVER_HANDOFF_H_INCLUDED
#include <ripple/server/Writer.h>
#include <boost/beast/http/dynamic_body.hpp>
#include <boost/beast/http/message.hpp>
#include <memory>
namespace ripple {
using http_request_type =
boost::beast::http::request<boost::beast::http::dynamic_body>;
using http_response_type =
boost::beast::http::response<boost::beast::http::dynamic_body>;
/** Used to indicate the result of a server connection handoff. */
struct Handoff
{
// When `true`, the Session will close the socket. The
// Handler may optionally take socket ownership using std::move
bool moved = false;
// If response is set, this determines the keep alive
bool keep_alive = false;
// When set, this will be sent back
std::shared_ptr<Writer> response;
bool
handled() const
{
return moved || response;
}
};
} // namespace ripple
#endif

136
include/xrpl/server/Port.h Normal file
View File

@@ -0,0 +1,136 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_SERVER_PORT_H_INCLUDED
#define RIPPLE_SERVER_PORT_H_INCLUDED
#include <ripple/basics/BasicConfig.h>
#include <ripple/beast/net/IPEndpoint.h>
#include <boost/asio/ip/address.hpp>
#include <boost/asio/ip/network_v4.hpp>
#include <boost/asio/ip/network_v6.hpp>
#include <boost/beast/core/string.hpp>
#include <boost/beast/websocket/option.hpp>
#include <cstdint>
#include <memory>
#include <optional>
#include <set>
#include <string>
#include <variant>
namespace boost {
namespace asio {
namespace ssl {
class context;
}
} // namespace asio
} // namespace boost
namespace ripple {
/** Configuration information for a Server listening port. */
struct Port
{
explicit Port() = default;
std::string name;
boost::asio::ip::address ip;
std::uint16_t port = 0;
std::set<std::string, boost::beast::iless> protocol;
std::vector<boost::asio::ip::network_v4> admin_nets_v4;
std::vector<boost::asio::ip::network_v6> admin_nets_v6;
std::vector<boost::asio::ip::network_v4> secure_gateway_nets_v4;
std::vector<boost::asio::ip::network_v6> secure_gateway_nets_v6;
std::string user;
std::string password;
std::string admin_user;
std::string admin_password;
std::string ssl_key;
std::string ssl_cert;
std::string ssl_chain;
std::string ssl_ciphers;
boost::beast::websocket::permessage_deflate pmd_options;
std::shared_ptr<boost::asio::ssl::context> context;
// How many incoming connections are allowed on this
// port in the range [0, 65535] where 0 means unlimited.
int limit = 0;
// Websocket disconnects if send queue exceeds this limit
std::uint16_t ws_queue_limit;
// Returns `true` if any websocket protocols are specified
bool
websockets() const;
// Returns `true` if any secure protocols are specified
bool
secure() const;
// Returns a string containing the list of protocols
std::string
protocols() const;
bool
has_udp() const
{
return protocol.count("udp") > 0;
}
// Maximum UDP packet size (default 64KB)
std::size_t udp_packet_size = 65536;
};
std::ostream&
operator<<(std::ostream& os, Port const& p);
//------------------------------------------------------------------------------
struct ParsedPort
{
explicit ParsedPort() = default;
std::string name;
std::set<std::string, boost::beast::iless> protocol;
std::string user;
std::string password;
std::string admin_user;
std::string admin_password;
std::string ssl_key;
std::string ssl_cert;
std::string ssl_chain;
std::string ssl_ciphers;
boost::beast::websocket::permessage_deflate pmd_options;
int limit = 0;
std::uint16_t ws_queue_limit;
std::optional<boost::asio::ip::address> ip;
std::optional<std::uint16_t> port;
std::vector<boost::asio::ip::network_v4> admin_nets_v4;
std::vector<boost::asio::ip::network_v6> admin_nets_v6;
std::vector<boost::asio::ip::network_v4> secure_gateway_nets_v4;
std::vector<boost::asio::ip::network_v6> secure_gateway_nets_v6;
};
void
parse_Port(ParsedPort& port, Section const& section, std::ostream& log);
} // namespace ripple
#endif

View File

@@ -0,0 +1,44 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_SERVER_SERVER_H_INCLUDED
#define RIPPLE_SERVER_SERVER_H_INCLUDED
#include <ripple/beast/utility/Journal.h>
#include <ripple/beast/utility/PropertyStream.h>
#include <ripple/server/Port.h>
#include <ripple/server/impl/ServerImpl.h>
#include <boost/asio/io_service.hpp>
namespace ripple {
/** Create the HTTP server using the specified handler. */
template <class Handler>
std::unique_ptr<Server>
make_Server(
Handler& handler,
boost::asio::io_service& io_service,
beast::Journal journal)
{
return std::make_unique<ServerImpl<Handler>>(handler, io_service, journal);
}
} // namespace ripple
#endif

View File

@@ -0,0 +1,135 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_SERVER_SESSION_H_INCLUDED
#define RIPPLE_SERVER_SESSION_H_INCLUDED
#include <ripple/beast/net/IPEndpoint.h>
#include <ripple/beast/utility/Journal.h>
#include <ripple/server/WSSession.h>
#include <ripple/server/Writer.h>
#include <boost/beast/http/message.hpp>
#include <functional>
#include <memory>
#include <ostream>
#include <vector>
namespace ripple {
/** Persistent state information for a connection session.
These values are preserved between calls for efficiency.
Some fields are input parameters, some are output parameters,
and all only become defined during specific callbacks.
*/
class Session
{
public:
Session() = default;
Session(Session const&) = delete;
Session&
operator=(Session const&) = delete;
virtual ~Session() = default;
/** A user-definable pointer.
The initial value is always zero.
Changes to the value are persisted between calls.
*/
void* tag = nullptr;
/** Returns the Journal to use for logging. */
virtual beast::Journal
journal() = 0;
/** Returns the Port settings for this connection. */
virtual Port const&
port() = 0;
/** Returns the remote address of the connection. */
virtual beast::IP::Endpoint
remoteAddress() = 0;
/** Returns the current HTTP request. */
virtual http_request_type&
request() = 0;
/** Send a copy of data asynchronously. */
/** @{ */
void
write(std::string const& s)
{
if (!s.empty())
write(&s[0], std::distance(s.begin(), s.end()));
}
template <typename BufferSequence>
void
write(BufferSequence const& buffers)
{
for (typename BufferSequence::const_iterator iter(buffers.begin());
iter != buffers.end();
++iter)
{
typename BufferSequence::value_type const& buffer(*iter);
write(
boost::asio::buffer_cast<void const*>(buffer),
boost::asio::buffer_size(buffer));
}
}
virtual void
write(void const* buffer, std::size_t bytes) = 0;
virtual void
write(std::shared_ptr<Writer> const& writer, bool keep_alive) = 0;
/** @} */
/** Detach the session.
This holds the session open so that the response can be sent
asynchronously. Calls to io_service::run made by the server
will not return until all detached sessions are closed.
*/
virtual std::shared_ptr<Session>
detach() = 0;
/** Indicate that the response is complete.
The handler should call this when it has completed writing
the response. If Keep-Alive is indicated on the connection,
this will trigger a read for the next request; else, the
connection will be closed when all remaining data has been sent.
*/
virtual void
complete() = 0;
/** Close the session.
This will be performed asynchronously. The session will be
closed gracefully after all pending writes have completed.
@param graceful `true` to wait until all data has finished sending.
*/
virtual void
close(bool graceful) = 0;
/** Convert the connection to WebSocket. */
virtual std::shared_ptr<WSSession>
websocketUpgrade() = 0;
};
} // namespace ripple
#endif

View File

@@ -0,0 +1,77 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_SERVER_SIMPLEWRITER_H_INCLUDED
#define RIPPLE_SERVER_SIMPLEWRITER_H_INCLUDED
#include <ripple/server/Writer.h>
#include <boost/beast/core/multi_buffer.hpp>
#include <boost/beast/core/ostream.hpp>
#include <boost/beast/http/message.hpp>
#include <boost/beast/http/write.hpp>
#include <utility>
namespace ripple {
/// Deprecated: Writer that serializes a HTTP/1 message
class SimpleWriter : public Writer
{
boost::beast::multi_buffer sb_;
public:
template <bool isRequest, class Body, class Fields>
explicit SimpleWriter(
boost::beast::http::message<isRequest, Body, Fields> const& msg)
{
boost::beast::ostream(sb_) << msg;
}
bool
complete() override
{
return sb_.size() == 0;
}
void
consume(std::size_t bytes) override
{
sb_.consume(bytes);
}
bool
prepare(std::size_t bytes, std::function<void(void)>) override
{
return true;
}
std::vector<boost::asio::const_buffer>
data() override
{
auto const& buf = sb_.data();
std::vector<boost::asio::const_buffer> result;
result.reserve(std::distance(buf.begin(), buf.end()));
for (auto const b : buf)
result.push_back(b);
return result;
}
};
} // namespace ripple
#endif

View File

@@ -0,0 +1,284 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_SERVER_UDPDOOR_H_INCLUDED
#define RIPPLE_SERVER_UDPDOOR_H_INCLUDED
#include <ripple/basics/Log.h>
#include <ripple/basics/contract.h>
#include <ripple/server/impl/PlainHTTPPeer.h>
#include <ripple/server/impl/SSLHTTPPeer.h>
#include <ripple/server/impl/io_list.h>
#include <boost/asio/basic_waitable_timer.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/beast/core/detect_ssl.hpp>
#include <boost/beast/core/multi_buffer.hpp>
#include <boost/beast/core/tcp_stream.hpp>
#include <boost/container/flat_map.hpp>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
namespace ripple {
template <class Handler>
class UDPDoor : public io_list::work,
public std::enable_shared_from_this<UDPDoor<Handler>>
{
private:
using error_code = boost::system::error_code;
using endpoint_type = boost::asio::ip::tcp::endpoint;
using udp_socket = boost::asio::ip::udp::socket;
beast::Journal const j_;
Port const& port_;
Handler& handler_;
boost::asio::io_context& ioc_;
boost::asio::strand<boost::asio::io_context::executor_type> strand_;
udp_socket socket_;
std::vector<char> recv_buffer_;
endpoint_type local_endpoint_; // Store TCP-style endpoint
public:
UDPDoor(
Handler& handler,
boost::asio::io_context& io_context,
Port const& port,
beast::Journal j)
: j_(j)
, port_(port)
, handler_(handler)
, ioc_(io_context)
, strand_(io_context.get_executor())
, socket_(io_context)
, recv_buffer_(port.udp_packet_size)
, local_endpoint_(port.ip, port.port) // Store as TCP endpoint
{
error_code ec;
// Create UDP endpoint from port configuration
auto const addr = port_.ip.to_v4();
boost::asio::ip::udp::endpoint udp_endpoint(addr, port_.port);
socket_.open(boost::asio::ip::udp::v4(), ec);
if (ec)
{
JLOG(j_.error()) << "UDP socket open failed: " << ec.message();
return;
}
// Set socket options
socket_.set_option(boost::asio::socket_base::reuse_address(true), ec);
if (ec)
{
JLOG(j_.error())
<< "UDP set reuse_address failed: " << ec.message();
return;
}
socket_.bind(udp_endpoint, ec);
if (ec)
{
JLOG(j_.error()) << "UDP socket bind failed: " << ec.message();
return;
}
JLOG(j_.info()) << "UDP-RPC listening on " << udp_endpoint;
}
endpoint_type
get_endpoint() const
{
return local_endpoint_;
}
void
run()
{
if (!socket_.is_open())
return;
do_receive();
}
void
close() override
{
error_code ec;
socket_.close(ec);
}
private:
void
do_receive()
{
if (!socket_.is_open())
return;
socket_.async_receive_from(
boost::asio::buffer(recv_buffer_),
sender_endpoint_,
boost::asio::bind_executor(
strand_,
std::bind(
&UDPDoor::on_receive,
this->shared_from_this(),
std::placeholders::_1,
std::placeholders::_2)));
}
void
on_receive(error_code ec, std::size_t bytes_transferred)
{
if (ec)
{
if (ec != boost::asio::error::operation_aborted)
{
JLOG(j_.error()) << "UDP receive failed: " << ec.message();
do_receive();
}
return;
}
// Convert UDP endpoint to TCP endpoint for compatibility
endpoint_type tcp_endpoint(
sender_endpoint_.address(), sender_endpoint_.port());
// Handle the received UDP message
handler_.onUDPMessage(
std::string(recv_buffer_.data(), bytes_transferred),
tcp_endpoint,
[this, tcp_endpoint](std::string const& response) {
do_send(response, tcp_endpoint);
});
do_receive();
}
void
do_send(std::string const& response, endpoint_type const& tcp_endpoint)
{
if (!socket_.is_open())
{
std::cout << "UDP SOCKET NOT OPEN WHEN SENDING\n\n";
return;
}
const size_t HEADER_SIZE = 16;
const size_t MAX_DATAGRAM_SIZE =
65487; // Allow for ipv6 header 40 bytes + 8 bytes of udp header
const size_t MAX_PAYLOAD_SIZE = MAX_DATAGRAM_SIZE - HEADER_SIZE;
// Convert TCP endpoint back to UDP for sending
boost::asio::ip::udp::endpoint udp_endpoint(
tcp_endpoint.address(), tcp_endpoint.port());
// If message fits in single datagram, send normally
if (response.length() <= MAX_DATAGRAM_SIZE)
{
socket_.async_send_to(
boost::asio::buffer(response),
udp_endpoint,
boost::asio::bind_executor(
strand_,
[this, self = this->shared_from_this()](
error_code ec, std::size_t bytes_transferred) {
if (ec && ec != boost::asio::error::operation_aborted)
{
JLOG(j_.error())
<< "UDP send failed: " << ec.message();
}
}));
return;
}
// Calculate number of packets needed
const size_t payload_size = MAX_PAYLOAD_SIZE;
const uint16_t total_packets =
(response.length() + payload_size - 1) / payload_size;
// Get current timestamp in microseconds
auto now = std::chrono::system_clock::now();
auto micros = std::chrono::duration_cast<std::chrono::microseconds>(
now.time_since_epoch())
.count();
uint64_t timestamp = static_cast<uint64_t>(micros);
// Send fragmented packets
for (uint16_t packet_num = 0; packet_num < total_packets; packet_num++)
{
std::string fragment;
fragment.reserve(MAX_DATAGRAM_SIZE);
// Add header - 4 bytes of zeros
fragment.push_back(0);
fragment.push_back(0);
fragment.push_back(0);
fragment.push_back(0);
// Add packet number (little endian)
fragment.push_back(packet_num & 0xFF);
fragment.push_back((packet_num >> 8) & 0xFF);
// Add total packets (little endian)
fragment.push_back(total_packets & 0xFF);
fragment.push_back((total_packets >> 8) & 0xFF);
// Add timestamp (8 bytes, little endian)
fragment.push_back(timestamp & 0xFF);
fragment.push_back((timestamp >> 8) & 0xFF);
fragment.push_back((timestamp >> 16) & 0xFF);
fragment.push_back((timestamp >> 24) & 0xFF);
fragment.push_back((timestamp >> 32) & 0xFF);
fragment.push_back((timestamp >> 40) & 0xFF);
fragment.push_back((timestamp >> 48) & 0xFF);
fragment.push_back((timestamp >> 56) & 0xFF);
// Calculate payload slice
size_t start = packet_num * payload_size;
size_t length = std::min(payload_size, response.length() - start);
fragment.append(response.substr(start, length));
socket_.async_send_to(
boost::asio::buffer(fragment),
udp_endpoint,
boost::asio::bind_executor(
strand_,
[this, self = this->shared_from_this()](
error_code ec, std::size_t bytes_transferred) {
if (ec && ec != boost::asio::error::operation_aborted)
{
JLOG(j_.error())
<< "UDP send failed: " << ec.message();
}
}));
}
}
boost::asio::ip::udp::endpoint sender_endpoint_;
};
} // namespace ripple
#endif

View File

@@ -0,0 +1,149 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_SERVER_WSSESSION_H_INCLUDED
#define RIPPLE_SERVER_WSSESSION_H_INCLUDED
#include <ripple/server/Handoff.h>
#include <ripple/server/Port.h>
#include <ripple/server/Writer.h>
#include <boost/asio/buffer.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/beast/core/buffers_prefix.hpp>
#include <boost/beast/websocket/rfc6455.hpp>
#include <boost/logic/tribool.hpp>
#include <algorithm>
#include <functional>
#include <memory>
#include <utility>
#include <vector>
namespace ripple {
class WSMsg
{
public:
WSMsg() = default;
WSMsg(WSMsg const&) = delete;
WSMsg&
operator=(WSMsg const&) = delete;
virtual ~WSMsg() = default;
/** Retrieve message data.
Returns a tribool indicating whether or not
data is available, and a ConstBufferSequence
representing the data.
tribool values:
maybe: Data is not ready yet
false: Data is available
true: Data is available, and
it is the last chunk of bytes.
Derived classes that do not know when the data
ends (for example, when returning the output of a
paged database query) may return `true` and an
empty vector.
*/
virtual std::pair<boost::tribool, std::vector<boost::asio::const_buffer>>
prepare(std::size_t bytes, std::function<void(void)> resume) = 0;
};
template <class Streambuf>
class StreambufWSMsg : public WSMsg
{
Streambuf sb_;
std::size_t n_ = 0;
public:
StreambufWSMsg(Streambuf&& sb) : sb_(std::move(sb))
{
}
std::pair<boost::tribool, std::vector<boost::asio::const_buffer>>
prepare(std::size_t bytes, std::function<void(void)>) override
{
if (sb_.size() == 0)
return {true, {}};
sb_.consume(n_);
boost::tribool done;
if (bytes < sb_.size())
{
n_ = bytes;
done = false;
}
else
{
n_ = sb_.size();
done = true;
}
auto const pb = boost::beast::buffers_prefix(n_, sb_.data());
std::vector<boost::asio::const_buffer> vb(
std::distance(pb.begin(), pb.end()));
std::copy(pb.begin(), pb.end(), std::back_inserter(vb));
return {done, vb};
}
};
struct WSSession
{
std::shared_ptr<void> appDefined;
virtual ~WSSession() = default;
WSSession() = default;
WSSession(WSSession const&) = delete;
WSSession&
operator=(WSSession const&) = delete;
virtual void
run() = 0;
virtual Port const&
port() const = 0;
virtual http_request_type const&
request() const = 0;
virtual boost::asio::ip::tcp::endpoint const&
remote_endpoint() const = 0;
/** Send a WebSockets message. */
virtual void
send(std::shared_ptr<WSMsg> w) = 0;
virtual void
close() = 0;
virtual void
close(boost::beast::websocket::close_reason const& reason) = 0;
/** Indicate that the response is complete.
The handler should call this when it has completed writing
the response.
*/
virtual void
complete() = 0;
};
} // namespace ripple
#endif

View File

@@ -0,0 +1,60 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_SERVER_WRITER_H_INCLUDED
#define RIPPLE_SERVER_WRITER_H_INCLUDED
#include <boost/asio/buffer.hpp>
#include <functional>
#include <vector>
namespace ripple {
class Writer
{
public:
virtual ~Writer() = default;
/** Returns `true` if there is no more data to pull. */
virtual bool
complete() = 0;
/** Removes bytes from the input sequence.
Can be called with 0.
*/
virtual void
consume(std::size_t bytes) = 0;
/** Add data to the input sequence.
@param bytes A hint to the number of bytes desired.
@param resume A functor to later resume execution.
@return `true` if the writer is ready to provide more data.
*/
virtual bool
prepare(std::size_t bytes, std::function<void(void)> resume) = 0;
/** Returns a ConstBufferSequence representing the input sequence. */
virtual std::vector<boost::asio::const_buffer>
data() = 0;
};
} // namespace ripple
#endif

View File

@@ -0,0 +1,532 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright(c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_SERVER_BASEHTTPPEER_H_INCLUDED
#define RIPPLE_SERVER_BASEHTTPPEER_H_INCLUDED
#include <ripple/basics/Log.h>
#include <ripple/beast/net/IPAddressConversion.h>
#include <ripple/server/Session.h>
#include <ripple/server/impl/io_list.h>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <boost/asio/streambuf.hpp>
#include <boost/beast/core/stream_traits.hpp>
#include <boost/beast/http/dynamic_body.hpp>
#include <boost/beast/http/message.hpp>
#include <boost/beast/http/parser.hpp>
#include <boost/beast/http/read.hpp>
#include <atomic>
#include <cassert>
#include <chrono>
#include <functional>
#include <memory>
#include <mutex>
#include <type_traits>
#include <vector>
namespace ripple {
/** Represents an active connection. */
template <class Handler, class Impl>
class BaseHTTPPeer : public io_list::work, public Session
{
protected:
using clock_type = std::chrono::system_clock;
using error_code = boost::system::error_code;
using endpoint_type = boost::asio::ip::tcp::endpoint;
using yield_context = boost::asio::yield_context;
enum {
// Size of our read/write buffer
bufferSize = 4 * 1024,
// Max seconds without completing a message
timeoutSeconds = 30,
timeoutSecondsLocal = 3 // used for localhost clients
};
struct buffer
{
buffer(void const* ptr, std::size_t len)
: data(new char[len]), bytes(len), used(0)
{
memcpy(data.get(), ptr, len);
}
std::unique_ptr<char[]> data;
std::size_t bytes;
std::size_t used;
};
Port const& port_;
Handler& handler_;
boost::asio::executor_work_guard<boost::asio::executor> work_;
boost::asio::strand<boost::asio::executor> strand_;
endpoint_type remote_address_;
beast::Journal const journal_;
std::string id_;
std::size_t nid_;
boost::asio::streambuf read_buf_;
http_request_type message_;
std::vector<buffer> wq_;
std::vector<buffer> wq2_;
std::mutex mutex_;
bool graceful_ = false;
bool complete_ = false;
boost::system::error_code ec_;
int request_count_ = 0;
std::size_t bytes_in_ = 0;
std::size_t bytes_out_ = 0;
//--------------------------------------------------------------------------
public:
template <class ConstBufferSequence>
BaseHTTPPeer(
Port const& port,
Handler& handler,
boost::asio::executor const& executor,
beast::Journal journal,
endpoint_type remote_address,
ConstBufferSequence const& buffers);
virtual ~BaseHTTPPeer();
Session&
session()
{
return *this;
}
void
close() override;
protected:
Impl&
impl()
{
return *static_cast<Impl*>(this);
}
void
fail(error_code ec, char const* what);
void
start_timer();
void
cancel_timer();
void
on_timer();
void
do_read(yield_context do_yield);
void
on_write(error_code const& ec, std::size_t bytes_transferred);
void
do_writer(
std::shared_ptr<Writer> const& writer,
bool keep_alive,
yield_context do_yield);
virtual void
do_request() = 0;
virtual void
do_close() = 0;
// Session
beast::Journal
journal() override
{
return journal_;
}
Port const&
port() override
{
return port_;
}
beast::IP::Endpoint
remoteAddress() override
{
return beast::IPAddressConversion::from_asio(remote_address_);
}
http_request_type&
request() override
{
return message_;
}
void
write(void const* buffer, std::size_t bytes) override;
void
write(std::shared_ptr<Writer> const& writer, bool keep_alive) override;
std::shared_ptr<Session>
detach() override;
void
complete() override;
void
close(bool graceful) override;
};
//------------------------------------------------------------------------------
template <class Handler, class Impl>
template <class ConstBufferSequence>
BaseHTTPPeer<Handler, Impl>::BaseHTTPPeer(
Port const& port,
Handler& handler,
boost::asio::executor const& executor,
beast::Journal journal,
endpoint_type remote_address,
ConstBufferSequence const& buffers)
: port_(port)
, handler_(handler)
, work_(executor)
, strand_(executor)
, remote_address_(remote_address)
, journal_(journal)
{
read_buf_.commit(boost::asio::buffer_copy(
read_buf_.prepare(boost::asio::buffer_size(buffers)), buffers));
static std::atomic<int> sid;
nid_ = ++sid;
id_ = std::string("#") + std::to_string(nid_) + " ";
JLOG(journal_.trace()) << id_ << "accept: " << remote_address_.address();
}
template <class Handler, class Impl>
BaseHTTPPeer<Handler, Impl>::~BaseHTTPPeer()
{
handler_.onClose(session(), ec_);
JLOG(journal_.trace()) << id_ << "destroyed: " << request_count_
<< ((request_count_ == 1) ? " request"
: " requests");
}
template <class Handler, class Impl>
void
BaseHTTPPeer<Handler, Impl>::close()
{
if (!strand_.running_in_this_thread())
return post(
strand_,
std::bind(
(void (BaseHTTPPeer::*)(void)) & BaseHTTPPeer::close,
impl().shared_from_this()));
boost::beast::get_lowest_layer(impl().stream_).close();
}
//------------------------------------------------------------------------------
template <class Handler, class Impl>
void
BaseHTTPPeer<Handler, Impl>::fail(error_code ec, char const* what)
{
if (!ec_ && ec != boost::asio::error::operation_aborted)
{
ec_ = ec;
JLOG(journal_.trace())
<< id_ << std::string(what) << ": " << ec.message();
boost::beast::get_lowest_layer(impl().stream_).close();
}
}
template <class Handler, class Impl>
void
BaseHTTPPeer<Handler, Impl>::start_timer()
{
boost::beast::get_lowest_layer(impl().stream_)
.expires_after(std::chrono::seconds(
remote_address_.address().is_loopback() ? timeoutSecondsLocal
: timeoutSeconds));
}
// Convenience for discarding the error code
template <class Handler, class Impl>
void
BaseHTTPPeer<Handler, Impl>::cancel_timer()
{
boost::beast::get_lowest_layer(impl().stream_).expires_never();
}
// Called when session times out
template <class Handler, class Impl>
void
BaseHTTPPeer<Handler, Impl>::on_timer()
{
auto ec =
boost::system::errc::make_error_code(boost::system::errc::timed_out);
fail(ec, "timer");
}
//------------------------------------------------------------------------------
template <class Handler, class Impl>
void
BaseHTTPPeer<Handler, Impl>::do_read(yield_context do_yield)
{
complete_ = false;
error_code ec;
start_timer();
boost::beast::http::async_read(
impl().stream_, read_buf_, message_, do_yield[ec]);
cancel_timer();
if (ec == boost::beast::http::error::end_of_stream)
return do_close();
if (ec == boost::beast::error::timeout)
return on_timer();
if (ec)
return fail(ec, "http::read");
do_request();
}
// Send everything in the write queue.
// The write queue must not be empty upon entry.
template <class Handler, class Impl>
void
BaseHTTPPeer<Handler, Impl>::on_write(
error_code const& ec,
std::size_t bytes_transferred)
{
cancel_timer();
if (ec == boost::beast::error::timeout)
return on_timer();
if (ec)
return fail(ec, "write");
bytes_out_ += bytes_transferred;
{
std::lock_guard lock(mutex_);
wq2_.clear();
wq2_.reserve(wq_.size());
std::swap(wq2_, wq_);
}
if (!wq2_.empty())
{
std::vector<boost::asio::const_buffer> v;
v.reserve(wq2_.size());
for (auto const& b : wq2_)
v.emplace_back(b.data.get(), b.bytes);
start_timer();
return boost::asio::async_write(
impl().stream_,
v,
bind_executor(
strand_,
std::bind(
&BaseHTTPPeer::on_write,
impl().shared_from_this(),
std::placeholders::_1,
std::placeholders::_2)));
}
if (!complete_)
return;
if (graceful_)
return do_close();
boost::asio::spawn(
strand_,
std::bind(
&BaseHTTPPeer<Handler, Impl>::do_read,
impl().shared_from_this(),
std::placeholders::_1));
}
template <class Handler, class Impl>
void
BaseHTTPPeer<Handler, Impl>::do_writer(
std::shared_ptr<Writer> const& writer,
bool keep_alive,
yield_context do_yield)
{
std::function<void(void)> resume;
{
auto const p = impl().shared_from_this();
resume = std::function<void(void)>([this, p, writer, keep_alive]() {
boost::asio::spawn(
strand_,
std::bind(
&BaseHTTPPeer<Handler, Impl>::do_writer,
p,
writer,
keep_alive,
std::placeholders::_1));
});
}
for (;;)
{
if (!writer->prepare(bufferSize, resume))
return;
error_code ec;
auto const bytes_transferred = boost::asio::async_write(
impl().stream_,
writer->data(),
boost::asio::transfer_at_least(1),
do_yield[ec]);
if (ec)
return fail(ec, "writer");
writer->consume(bytes_transferred);
if (writer->complete())
break;
}
if (!keep_alive)
return do_close();
boost::asio::spawn(
strand_,
std::bind(
&BaseHTTPPeer<Handler, Impl>::do_read,
impl().shared_from_this(),
std::placeholders::_1));
}
//------------------------------------------------------------------------------
// Send a copy of the data.
template <class Handler, class Impl>
void
BaseHTTPPeer<Handler, Impl>::write(void const* buf, std::size_t bytes)
{
if (bytes == 0)
return;
if ([&] {
std::lock_guard lock(mutex_);
wq_.emplace_back(buf, bytes);
return wq_.size() == 1 && wq2_.size() == 0;
}())
{
if (!strand_.running_in_this_thread())
return post(
strand_,
std::bind(
&BaseHTTPPeer::on_write,
impl().shared_from_this(),
error_code{},
0));
else
return on_write(error_code{}, 0);
}
}
template <class Handler, class Impl>
void
BaseHTTPPeer<Handler, Impl>::write(
std::shared_ptr<Writer> const& writer,
bool keep_alive)
{
boost::asio::spawn(bind_executor(
strand_,
std::bind(
&BaseHTTPPeer<Handler, Impl>::do_writer,
impl().shared_from_this(),
writer,
keep_alive,
std::placeholders::_1)));
}
// DEPRECATED
// Make the Session asynchronous
template <class Handler, class Impl>
std::shared_ptr<Session>
BaseHTTPPeer<Handler, Impl>::detach()
{
return impl().shared_from_this();
}
// DEPRECATED
// Called to indicate the response has been written(but not sent)
template <class Handler, class Impl>
void
BaseHTTPPeer<Handler, Impl>::complete()
{
if (!strand_.running_in_this_thread())
return post(
strand_,
std::bind(
&BaseHTTPPeer<Handler, Impl>::complete,
impl().shared_from_this()));
message_ = {};
complete_ = true;
{
std::lock_guard lock(mutex_);
if (!wq_.empty() && !wq2_.empty())
return;
}
// keep-alive
boost::asio::spawn(bind_executor(
strand_,
std::bind(
&BaseHTTPPeer<Handler, Impl>::do_read,
impl().shared_from_this(),
std::placeholders::_1)));
}
// DEPRECATED
// Called from the Handler to close the session.
template <class Handler, class Impl>
void
BaseHTTPPeer<Handler, Impl>::close(bool graceful)
{
if (!strand_.running_in_this_thread())
return post(
strand_,
std::bind(
(void (BaseHTTPPeer::*)(bool)) &
BaseHTTPPeer<Handler, Impl>::close,
impl().shared_from_this(),
graceful));
complete_ = true;
if (graceful)
{
graceful_ = true;
{
std::lock_guard lock(mutex_);
if (!wq_.empty() || !wq2_.empty())
return;
}
return do_close();
}
boost::beast::get_lowest_layer(impl().stream_).close();
}
} // namespace ripple
#endif

View File

@@ -0,0 +1,110 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_SERVER_BASEPEER_H_INCLUDED
#define RIPPLE_SERVER_BASEPEER_H_INCLUDED
#include <ripple/beast/utility/WrappedSink.h>
#include <ripple/server/Port.h>
#include <ripple/server/impl/LowestLayer.h>
#include <ripple/server/impl/io_list.h>
#include <boost/asio.hpp>
#include <atomic>
#include <cassert>
#include <functional>
#include <string>
namespace ripple {
// Common part of all peers
template <class Handler, class Impl>
class BasePeer : public io_list::work
{
protected:
using clock_type = std::chrono::system_clock;
using error_code = boost::system::error_code;
using endpoint_type = boost::asio::ip::tcp::endpoint;
using waitable_timer = boost::asio::basic_waitable_timer<clock_type>;
Port const& port_;
Handler& handler_;
endpoint_type remote_address_;
beast::WrappedSink sink_;
beast::Journal const j_;
boost::asio::executor_work_guard<boost::asio::executor> work_;
boost::asio::strand<boost::asio::executor> strand_;
public:
BasePeer(
Port const& port,
Handler& handler,
boost::asio::executor const& executor,
endpoint_type remote_address,
beast::Journal journal);
void
close() override;
private:
Impl&
impl()
{
return *static_cast<Impl*>(this);
}
};
//------------------------------------------------------------------------------
template <class Handler, class Impl>
BasePeer<Handler, Impl>::BasePeer(
Port const& port,
Handler& handler,
boost::asio::executor const& executor,
endpoint_type remote_address,
beast::Journal journal)
: port_(port)
, handler_(handler)
, remote_address_(remote_address)
, sink_(
journal.sink(),
[] {
static std::atomic<unsigned> id{0};
return "##" + std::to_string(++id) + " ";
}())
, j_(sink_)
, work_(executor)
, strand_(executor)
{
}
template <class Handler, class Impl>
void
BasePeer<Handler, Impl>::close()
{
if (!strand_.running_in_this_thread())
return post(
strand_, std::bind(&BasePeer::close, impl().shared_from_this()));
error_code ec;
ripple::get_lowest_layer(impl().ws_).socket().close(ec);
}
} // namespace ripple
#endif

View File

@@ -0,0 +1,524 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright(c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_SERVER_BASEWSPEER_H_INCLUDED
#define RIPPLE_SERVER_BASEWSPEER_H_INCLUDED
#include <ripple/basics/safe_cast.h>
#include <ripple/beast/utility/rngfill.h>
#include <ripple/crypto/csprng.h>
#include <ripple/protocol/BuildInfo.h>
#include <ripple/server/impl/BasePeer.h>
#include <ripple/server/impl/LowestLayer.h>
#include <boost/beast/core/multi_buffer.hpp>
#include <boost/beast/http/message.hpp>
#include <boost/beast/websocket.hpp>
#include <cassert>
#include <functional>
namespace ripple {
/** Represents an active WebSocket connection. */
template <class Handler, class Impl>
class BaseWSPeer : public BasePeer<Handler, Impl>, public WSSession
{
protected:
using clock_type = std::chrono::system_clock;
using error_code = boost::system::error_code;
using endpoint_type = boost::asio::ip::tcp::endpoint;
using waitable_timer = boost::asio::basic_waitable_timer<clock_type>;
using BasePeer<Handler, Impl>::strand_;
private:
friend class BasePeer<Handler, Impl>;
http_request_type request_;
boost::beast::multi_buffer rb_;
boost::beast::multi_buffer wb_;
std::list<std::shared_ptr<WSMsg>> wq_;
/// The socket has been closed, or will close after the next write
/// finishes. Do not do any more writes, and don't try to close
/// again.
bool do_close_ = false;
boost::beast::websocket::close_reason cr_;
waitable_timer timer_;
bool close_on_timer_ = false;
bool ping_active_ = false;
boost::beast::websocket::ping_data payload_;
error_code ec_;
std::function<
void(boost::beast::websocket::frame_type, boost::beast::string_view)>
control_callback_;
public:
template <class Body, class Headers>
BaseWSPeer(
Port const& port,
Handler& handler,
boost::asio::executor const& executor,
waitable_timer timer,
endpoint_type remote_address,
boost::beast::http::request<Body, Headers>&& request,
beast::Journal journal);
void
run() override;
//
// WSSession
//
Port const&
port() const override
{
return this->port_;
}
http_request_type const&
request() const override
{
return this->request_;
}
boost::asio::ip::tcp::endpoint const&
remote_endpoint() const override
{
return this->remote_address_;
}
void
send(std::shared_ptr<WSMsg> w) override;
void
close() override;
void
close(boost::beast::websocket::close_reason const& reason) override;
void
complete() override;
protected:
Impl&
impl()
{
return *static_cast<Impl*>(this);
}
void
on_ws_handshake(error_code const& ec);
void
do_write();
void
on_write(error_code const& ec);
void
on_write_fin(error_code const& ec);
void
do_read();
void
on_read(error_code const& ec);
void
on_close(error_code const& ec);
void
start_timer();
void
cancel_timer();
void
on_ping(error_code const& ec);
void
on_ping_pong(
boost::beast::websocket::frame_type kind,
boost::beast::string_view payload);
void
on_timer(error_code ec);
template <class String>
void
fail(error_code ec, String const& what);
};
//------------------------------------------------------------------------------
template <class Handler, class Impl>
template <class Body, class Headers>
BaseWSPeer<Handler, Impl>::BaseWSPeer(
Port const& port,
Handler& handler,
boost::asio::executor const& executor,
waitable_timer timer,
endpoint_type remote_address,
boost::beast::http::request<Body, Headers>&& request,
beast::Journal journal)
: BasePeer<Handler, Impl>(port, handler, executor, remote_address, journal)
, request_(std::move(request))
, timer_(std::move(timer))
, payload_("12345678") // ensures size is 8 bytes
{
}
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::run()
{
if (!strand_.running_in_this_thread())
return post(
strand_, std::bind(&BaseWSPeer::run, impl().shared_from_this()));
impl().ws_.set_option(port().pmd_options);
// Must manage the control callback memory outside of the `control_callback`
// function
control_callback_ = std::bind(
&BaseWSPeer::on_ping_pong,
this,
std::placeholders::_1,
std::placeholders::_2);
impl().ws_.control_callback(control_callback_);
start_timer();
close_on_timer_ = true;
impl().ws_.set_option(
boost::beast::websocket::stream_base::decorator([](auto& res) {
res.set(
boost::beast::http::field::server,
BuildInfo::getFullVersionString());
}));
impl().ws_.async_accept(
request_,
bind_executor(
strand_,
std::bind(
&BaseWSPeer::on_ws_handshake,
impl().shared_from_this(),
std::placeholders::_1)));
}
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::send(std::shared_ptr<WSMsg> w)
{
if (!strand_.running_in_this_thread())
return post(
strand_,
std::bind(
&BaseWSPeer::send, impl().shared_from_this(), std::move(w)));
if (do_close_)
return;
if (wq_.size() > port().ws_queue_limit)
{
cr_.code = safe_cast<decltype(cr_.code)>(
boost::beast::websocket::close_code::policy_error);
cr_.reason = "Policy error: client is too slow.";
JLOG(this->j_.info()) << cr_.reason;
wq_.erase(std::next(wq_.begin()), wq_.end());
close(cr_);
return;
}
wq_.emplace_back(std::move(w));
if (wq_.size() == 1)
on_write({});
}
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::close()
{
close(boost::beast::websocket::close_reason{});
}
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::close(
boost::beast::websocket::close_reason const& reason)
{
if (!strand_.running_in_this_thread())
return post(strand_, [self = impl().shared_from_this(), reason] {
self->close(reason);
});
if (do_close_)
return;
do_close_ = true;
if (wq_.empty())
{
impl().ws_.async_close(
reason,
bind_executor(
strand_,
[self = impl().shared_from_this()](
boost::beast::error_code const& ec) {
self->on_close(ec);
}));
}
else
{
cr_ = reason;
}
}
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::complete()
{
if (!strand_.running_in_this_thread())
return post(
strand_,
std::bind(&BaseWSPeer::complete, impl().shared_from_this()));
do_read();
}
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::on_ws_handshake(error_code const& ec)
{
if (ec)
return fail(ec, "on_ws_handshake");
close_on_timer_ = false;
do_read();
}
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::do_write()
{
if (!strand_.running_in_this_thread())
return post(
strand_,
std::bind(&BaseWSPeer::do_write, impl().shared_from_this()));
on_write({});
}
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::on_write(error_code const& ec)
{
if (ec)
return fail(ec, "write");
auto& w = *wq_.front();
auto const result = w.prepare(
65536, std::bind(&BaseWSPeer::do_write, impl().shared_from_this()));
if (boost::indeterminate(result.first))
return;
start_timer();
if (!result.first)
impl().ws_.async_write_some(
static_cast<bool>(result.first),
result.second,
bind_executor(
strand_,
std::bind(
&BaseWSPeer::on_write,
impl().shared_from_this(),
std::placeholders::_1)));
else
impl().ws_.async_write_some(
static_cast<bool>(result.first),
result.second,
bind_executor(
strand_,
std::bind(
&BaseWSPeer::on_write_fin,
impl().shared_from_this(),
std::placeholders::_1)));
}
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::on_write_fin(error_code const& ec)
{
if (ec)
return fail(ec, "write_fin");
wq_.pop_front();
if (do_close_)
{
impl().ws_.async_close(
cr_,
bind_executor(
strand_,
std::bind(
&BaseWSPeer::on_close,
impl().shared_from_this(),
std::placeholders::_1)));
}
else if (!wq_.empty())
on_write({});
}
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::do_read()
{
if (!strand_.running_in_this_thread())
return post(
strand_,
std::bind(&BaseWSPeer::do_read, impl().shared_from_this()));
impl().ws_.async_read(
rb_,
bind_executor(
strand_,
std::bind(
&BaseWSPeer::on_read,
impl().shared_from_this(),
std::placeholders::_1)));
}
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::on_read(error_code const& ec)
{
if (ec == boost::beast::websocket::error::closed)
return on_close({});
if (ec)
return fail(ec, "read");
auto const& data = rb_.data();
std::vector<boost::asio::const_buffer> b;
b.reserve(std::distance(data.begin(), data.end()));
std::copy(data.begin(), data.end(), std::back_inserter(b));
this->handler_.onWSMessage(impl().shared_from_this(), b);
rb_.consume(rb_.size());
}
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::on_close(error_code const& ec)
{
cancel_timer();
}
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::start_timer()
{
// Max seconds without completing a message
static constexpr std::chrono::seconds timeout{30};
static constexpr std::chrono::seconds timeoutLocal{3};
error_code ec;
timer_.expires_from_now(
remote_endpoint().address().is_loopback() ? timeoutLocal : timeout, ec);
if (ec)
return fail(ec, "start_timer");
timer_.async_wait(bind_executor(
strand_,
std::bind(
&BaseWSPeer<Handler, Impl>::on_timer,
impl().shared_from_this(),
std::placeholders::_1)));
}
// Convenience for discarding the error code
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::cancel_timer()
{
error_code ec;
timer_.cancel(ec);
}
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::on_ping(error_code const& ec)
{
if (ec == boost::asio::error::operation_aborted)
return;
ping_active_ = false;
if (!ec)
return;
fail(ec, "on_ping");
}
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::on_ping_pong(
boost::beast::websocket::frame_type kind,
boost::beast::string_view payload)
{
if (kind == boost::beast::websocket::frame_type::pong)
{
boost::beast::string_view p(payload_.begin());
if (payload == p)
{
close_on_timer_ = false;
JLOG(this->j_.trace()) << "got matching pong";
}
else
{
JLOG(this->j_.trace()) << "got pong";
}
}
}
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::on_timer(error_code ec)
{
if (ec == boost::asio::error::operation_aborted)
return;
if (!ec)
{
if (!close_on_timer_ || !ping_active_)
{
start_timer();
close_on_timer_ = true;
ping_active_ = true;
// cryptographic is probably overkill..
beast::rngfill(payload_.begin(), payload_.size(), crypto_prng());
impl().ws_.async_ping(
payload_,
bind_executor(
strand_,
std::bind(
&BaseWSPeer::on_ping,
impl().shared_from_this(),
std::placeholders::_1)));
JLOG(this->j_.trace()) << "sent ping";
return;
}
ec = boost::system::errc::make_error_code(
boost::system::errc::timed_out);
}
fail(ec, "timer");
}
template <class Handler, class Impl>
template <class String>
void
BaseWSPeer<Handler, Impl>::fail(error_code ec, String const& what)
{
assert(strand_.running_in_this_thread());
cancel_timer();
if (!ec_ && ec != boost::asio::error::operation_aborted)
{
ec_ = ec;
JLOG(this->j_.trace()) << what << ": " << ec.message();
ripple::get_lowest_layer(impl().ws_).socket().close(ec);
}
}
} // namespace ripple
#endif

View File

@@ -0,0 +1,412 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_SERVER_DOOR_H_INCLUDED
#define RIPPLE_SERVER_DOOR_H_INCLUDED
#include <ripple/basics/Log.h>
#include <ripple/basics/contract.h>
#include <ripple/server/impl/PlainHTTPPeer.h>
#include <ripple/server/impl/SSLHTTPPeer.h>
#include <ripple/server/impl/io_list.h>
#include <boost/asio/basic_waitable_timer.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/beast/core/detect_ssl.hpp>
#include <boost/beast/core/multi_buffer.hpp>
#include <boost/beast/core/tcp_stream.hpp>
#include <boost/container/flat_map.hpp>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
namespace ripple {
/** A listening socket. */
template <class Handler>
class Door : public io_list::work,
public std::enable_shared_from_this<Door<Handler>>
{
private:
using clock_type = std::chrono::steady_clock;
using timer_type = boost::asio::basic_waitable_timer<clock_type>;
using error_code = boost::system::error_code;
using yield_context = boost::asio::yield_context;
using protocol_type = boost::asio::ip::tcp;
using acceptor_type = protocol_type::acceptor;
using endpoint_type = protocol_type::endpoint;
using socket_type = boost::asio::ip::tcp::socket;
using stream_type = boost::beast::tcp_stream;
// Detects SSL on a socket
class Detector : public io_list::work,
public std::enable_shared_from_this<Detector>
{
private:
Port const& port_;
Handler& handler_;
boost::asio::io_context& ioc_;
stream_type stream_;
socket_type& socket_;
endpoint_type remote_address_;
boost::asio::io_context::strand strand_;
beast::Journal const j_;
public:
Detector(
Port const& port,
Handler& handler,
boost::asio::io_context& ioc,
stream_type&& stream,
endpoint_type remote_address,
beast::Journal j);
void
run();
void
close() override;
private:
void
do_detect(yield_context yield);
};
beast::Journal const j_;
Port const& port_;
Handler& handler_;
boost::asio::io_context& ioc_;
acceptor_type acceptor_;
boost::asio::io_context::strand strand_;
bool ssl_;
bool plain_;
void
reOpen();
public:
Door(
Handler& handler,
boost::asio::io_context& io_context,
Port const& port,
beast::Journal j);
// Work-around because we can't call shared_from_this in ctor
void
run();
/** Close the Door listening socket and connections.
The listening socket is closed, and all open connections
belonging to the Door are closed.
Thread Safety:
May be called concurrently
*/
void
close() override;
endpoint_type
get_endpoint() const
{
return acceptor_.local_endpoint();
}
private:
template <class ConstBufferSequence>
void
create(
bool ssl,
ConstBufferSequence const& buffers,
stream_type&& stream,
endpoint_type remote_address);
void
do_accept(yield_context yield);
};
template <class Handler>
Door<Handler>::Detector::Detector(
Port const& port,
Handler& handler,
boost::asio::io_context& ioc,
stream_type&& stream,
endpoint_type remote_address,
beast::Journal j)
: port_(port)
, handler_(handler)
, ioc_(ioc)
, stream_(std::move(stream))
, socket_(stream_.socket())
, remote_address_(remote_address)
, strand_(ioc_)
, j_(j)
{
}
template <class Handler>
void
Door<Handler>::Detector::run()
{
boost::asio::spawn(
strand_,
std::bind(
&Detector::do_detect,
this->shared_from_this(),
std::placeholders::_1));
}
template <class Handler>
void
Door<Handler>::Detector::close()
{
stream_.close();
}
template <class Handler>
void
Door<Handler>::Detector::do_detect(boost::asio::yield_context do_yield)
{
boost::beast::multi_buffer buf(16);
stream_.expires_after(std::chrono::seconds(15));
boost::system::error_code ec;
bool const ssl = async_detect_ssl(stream_, buf, do_yield[ec]);
stream_.expires_never();
if (!ec)
{
if (ssl)
{
if (auto sp = ios().template emplace<SSLHTTPPeer<Handler>>(
port_,
handler_,
ioc_,
j_,
remote_address_,
buf.data(),
std::move(stream_)))
sp->run();
return;
}
if (auto sp = ios().template emplace<PlainHTTPPeer<Handler>>(
port_,
handler_,
ioc_,
j_,
remote_address_,
buf.data(),
std::move(stream_)))
sp->run();
return;
}
if (ec != boost::asio::error::operation_aborted)
{
JLOG(j_.trace()) << "Error detecting ssl: " << ec.message() << " from "
<< remote_address_;
}
}
//------------------------------------------------------------------------------
template <class Handler>
void
Door<Handler>::reOpen()
{
error_code ec;
if (acceptor_.is_open())
{
acceptor_.close(ec);
if (ec)
{
std::stringstream ss;
ss << "Can't close acceptor: " << port_.name << ", "
<< ec.message();
JLOG(j_.error()) << ss.str();
Throw<std::runtime_error>(ss.str());
}
}
endpoint_type const local_address = endpoint_type(port_.ip, port_.port);
acceptor_.open(local_address.protocol(), ec);
if (ec)
{
JLOG(j_.error()) << "Open port '" << port_.name
<< "' failed:" << ec.message();
Throw<std::exception>();
}
acceptor_.set_option(
boost::asio::ip::tcp::acceptor::reuse_address(true), ec);
if (ec)
{
JLOG(j_.error()) << "Option for port '" << port_.name
<< "' failed:" << ec.message();
Throw<std::exception>();
}
acceptor_.bind(local_address, ec);
if (ec)
{
JLOG(j_.error()) << "Bind port '" << port_.name
<< "' failed:" << ec.message();
Throw<std::exception>();
}
acceptor_.listen(boost::asio::socket_base::max_connections, ec);
if (ec)
{
JLOG(j_.error()) << "Listen on port '" << port_.name
<< "' failed:" << ec.message();
Throw<std::exception>();
}
JLOG(j_.info()) << "Opened " << port_;
}
template <class Handler>
Door<Handler>::Door(
Handler& handler,
boost::asio::io_context& io_context,
Port const& port,
beast::Journal j)
: j_(j)
, port_(port)
, handler_(handler)
, ioc_(io_context)
, acceptor_(io_context)
, strand_(io_context)
, ssl_(
port_.protocol.count("https") > 0 ||
port_.protocol.count("wss") > 0 || port_.protocol.count("wss2") > 0 ||
port_.protocol.count("peer") > 0)
, plain_(
port_.protocol.count("http") > 0 || port_.protocol.count("ws") > 0 ||
port_.protocol.count("ws2"))
{
reOpen();
}
template <class Handler>
void
Door<Handler>::run()
{
boost::asio::spawn(
strand_,
std::bind(
&Door<Handler>::do_accept,
this->shared_from_this(),
std::placeholders::_1));
}
template <class Handler>
void
Door<Handler>::close()
{
if (!strand_.running_in_this_thread())
return strand_.post(
std::bind(&Door<Handler>::close, this->shared_from_this()));
error_code ec;
acceptor_.close(ec);
}
//------------------------------------------------------------------------------
template <class Handler>
template <class ConstBufferSequence>
void
Door<Handler>::create(
bool ssl,
ConstBufferSequence const& buffers,
stream_type&& stream,
endpoint_type remote_address)
{
if (ssl)
{
if (auto sp = ios().template emplace<SSLHTTPPeer<Handler>>(
port_,
handler_,
ioc_,
j_,
remote_address,
buffers,
std::move(stream)))
sp->run();
return;
}
if (auto sp = ios().template emplace<PlainHTTPPeer<Handler>>(
port_,
handler_,
ioc_,
j_,
remote_address,
buffers,
std::move(stream)))
sp->run();
}
template <class Handler>
void
Door<Handler>::do_accept(boost::asio::yield_context do_yield)
{
while (acceptor_.is_open())
{
error_code ec;
endpoint_type remote_address;
stream_type stream(ioc_);
socket_type& socket = stream.socket();
acceptor_.async_accept(socket, remote_address, do_yield[ec]);
if (ec)
{
if (ec == boost::asio::error::operation_aborted)
break;
JLOG(j_.error()) << "accept: " << ec.message();
if (ec == boost::asio::error::no_descriptors)
{
JLOG(j_.info()) << "re-opening acceptor";
reOpen();
}
continue;
}
if (ssl_ && plain_)
{
if (auto sp = ios().template emplace<Detector>(
port_,
handler_,
ioc_,
std::move(stream),
remote_address,
j_))
sp->run();
}
else if (ssl_ || plain_)
{
create(
ssl_,
boost::asio::null_buffers{},
std::move(stream),
remote_address);
}
}
}
} // namespace ripple
#endif

View File

@@ -0,0 +1,37 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_SERVER_JSONRPCUTIL_H_INCLUDED
#define RIPPLE_SERVER_JSONRPCUTIL_H_INCLUDED
#include <ripple/json/Output.h>
#include <ripple/json/json_value.h>
namespace ripple {
void
HTTPReply(
int nStatus,
std::string const& strMsg,
Json::Output const&,
beast::Journal j);
} // namespace ripple
#endif

View File

@@ -0,0 +1,46 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2019 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_SERVER_LOWESTLAYER_H_INCLUDED
#define RIPPLE_SERVER_LOWESTLAYER_H_INCLUDED
#if BOOST_VERSION >= 107000
#include <boost/beast/core/stream_traits.hpp>
#else
#include <boost/beast/core/type_traits.hpp>
#endif
namespace ripple {
// Before boost 1.70, get_lowest_layer required an explicit templat parameter
template <class T>
decltype(auto)
get_lowest_layer(T& t) noexcept
{
#if BOOST_VERSION >= 107000
return boost::beast::get_lowest_layer(t);
#else
return t.lowest_layer();
#endif
}
} // namespace ripple
#endif

View File

@@ -0,0 +1,176 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_SERVER_PLAINHTTPPEER_H_INCLUDED
#define RIPPLE_SERVER_PLAINHTTPPEER_H_INCLUDED
#include <ripple/beast/rfc2616.h>
#include <ripple/server/impl/BaseHTTPPeer.h>
#include <ripple/server/impl/PlainWSPeer.h>
#include <boost/beast/core/tcp_stream.hpp>
#include <memory>
namespace ripple {
template <class Handler>
class PlainHTTPPeer
: public BaseHTTPPeer<Handler, PlainHTTPPeer<Handler>>,
public std::enable_shared_from_this<PlainHTTPPeer<Handler>>
{
private:
friend class BaseHTTPPeer<Handler, PlainHTTPPeer>;
using socket_type = boost::asio::ip::tcp::socket;
using stream_type = boost::beast::tcp_stream;
using endpoint_type = boost::asio::ip::tcp::endpoint;
stream_type stream_;
socket_type& socket_;
public:
template <class ConstBufferSequence>
PlainHTTPPeer(
Port const& port,
Handler& handler,
boost::asio::io_context& ioc,
beast::Journal journal,
endpoint_type remote_address,
ConstBufferSequence const& buffers,
stream_type&& stream);
void
run();
std::shared_ptr<WSSession>
websocketUpgrade() override;
private:
void
do_request() override;
void
do_close() override;
};
//------------------------------------------------------------------------------
template <class Handler>
template <class ConstBufferSequence>
PlainHTTPPeer<Handler>::PlainHTTPPeer(
Port const& port,
Handler& handler,
boost::asio::io_context& ioc,
beast::Journal journal,
endpoint_type remote_endpoint,
ConstBufferSequence const& buffers,
stream_type&& stream)
: BaseHTTPPeer<Handler, PlainHTTPPeer>(
port,
handler,
ioc.get_executor(),
journal,
remote_endpoint,
buffers)
, stream_(std::move(stream))
, socket_(stream_.socket())
{
// Set TCP_NODELAY on loopback interfaces,
// otherwise Nagle's algorithm makes Env
// tests run slower on Linux systems.
//
if (remote_endpoint.address().is_loopback())
socket_.set_option(boost::asio::ip::tcp::no_delay{true});
}
template <class Handler>
void
PlainHTTPPeer<Handler>::run()
{
if (!this->handler_.onAccept(this->session(), this->remote_address_))
{
boost::asio::spawn(
this->strand_,
std::bind(&PlainHTTPPeer::do_close, this->shared_from_this()));
return;
}
if (!socket_.is_open())
return;
boost::asio::spawn(
this->strand_,
std::bind(
&PlainHTTPPeer::do_read,
this->shared_from_this(),
std::placeholders::_1));
}
template <class Handler>
std::shared_ptr<WSSession>
PlainHTTPPeer<Handler>::websocketUpgrade()
{
auto ws = this->ios().template emplace<PlainWSPeer<Handler>>(
this->port_,
this->handler_,
this->remote_address_,
std::move(this->message_),
std::move(stream_),
this->journal_);
return ws;
}
template <class Handler>
void
PlainHTTPPeer<Handler>::do_request()
{
++this->request_count_;
auto const what = this->handler_.onHandoff(
this->session(), std::move(this->message_), this->remote_address_);
if (what.moved)
return;
boost::system::error_code ec;
if (what.response)
{
// half-close on Connection: close
if (!what.keep_alive)
socket_.shutdown(socket_type::shutdown_receive, ec);
if (ec)
return this->fail(ec, "request");
return this->write(what.response, what.keep_alive);
}
// Perform half-close when Connection: close and not SSL
if (!beast::rfc2616::is_keep_alive(this->message_))
socket_.shutdown(socket_type::shutdown_receive, ec);
if (ec)
return this->fail(ec, "request");
// legacy
this->handler_.onRequest(this->session());
}
template <class Handler>
void
PlainHTTPPeer<Handler>::do_close()
{
boost::system::error_code ec;
socket_.shutdown(socket_type::shutdown_send, ec);
}
} // namespace ripple
#endif

View File

@@ -0,0 +1,80 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_SERVER_PLAINWSPEER_H_INCLUDED
#define RIPPLE_SERVER_PLAINWSPEER_H_INCLUDED
#include <ripple/server/impl/BaseWSPeer.h>
#include <boost/beast/core/tcp_stream.hpp>
#include <memory>
namespace ripple {
template <class Handler>
class PlainWSPeer : public BaseWSPeer<Handler, PlainWSPeer<Handler>>,
public std::enable_shared_from_this<PlainWSPeer<Handler>>
{
friend class BasePeer<Handler, PlainWSPeer>;
friend class BaseWSPeer<Handler, PlainWSPeer>;
using clock_type = std::chrono::system_clock;
using error_code = boost::system::error_code;
using endpoint_type = boost::asio::ip::tcp::endpoint;
using waitable_timer = boost::asio::basic_waitable_timer<clock_type>;
using socket_type = boost::beast::tcp_stream;
boost::beast::websocket::stream<socket_type> ws_;
public:
template <class Body, class Headers>
PlainWSPeer(
Port const& port,
Handler& handler,
endpoint_type remote_address,
boost::beast::http::request<Body, Headers>&& request,
socket_type&& socket,
beast::Journal journal);
};
//------------------------------------------------------------------------------
template <class Handler>
template <class Body, class Headers>
PlainWSPeer<Handler>::PlainWSPeer(
Port const& port,
Handler& handler,
endpoint_type remote_address,
boost::beast::http::request<Body, Headers>&& request,
socket_type&& socket,
beast::Journal journal)
: BaseWSPeer<Handler, PlainWSPeer>(
port,
handler,
socket.get_executor(),
waitable_timer{socket.get_executor()},
remote_address,
std::move(request),
journal)
, ws_(std::move(socket))
{
}
} // namespace ripple
#endif

View File

@@ -0,0 +1,226 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_SERVER_SSLHTTPPEER_H_INCLUDED
#define RIPPLE_SERVER_SSLHTTPPEER_H_INCLUDED
#include <ripple/server/impl/BaseHTTPPeer.h>
#include <ripple/server/impl/SSLWSPeer.h>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl/context.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <boost/beast/core/tcp_stream.hpp>
#include <boost/beast/ssl/ssl_stream.hpp>
#include <memory>
namespace ripple {
template <class Handler>
class SSLHTTPPeer : public BaseHTTPPeer<Handler, SSLHTTPPeer<Handler>>,
public std::enable_shared_from_this<SSLHTTPPeer<Handler>>
{
private:
friend class BaseHTTPPeer<Handler, SSLHTTPPeer>;
using socket_type = boost::asio::ip::tcp::socket;
using middle_type = boost::beast::tcp_stream;
using stream_type = boost::beast::ssl_stream<middle_type>;
using endpoint_type = boost::asio::ip::tcp::endpoint;
using yield_context = boost::asio::yield_context;
using error_code = boost::system::error_code;
std::unique_ptr<stream_type> stream_ptr_;
stream_type& stream_;
socket_type& socket_;
public:
template <class ConstBufferSequence>
SSLHTTPPeer(
Port const& port,
Handler& handler,
boost::asio::io_context& ioc,
beast::Journal journal,
endpoint_type remote_address,
ConstBufferSequence const& buffers,
middle_type&& stream);
void
run();
std::shared_ptr<WSSession>
websocketUpgrade() override;
private:
void
do_handshake(yield_context do_yield);
void
do_request() override;
void
do_close() override;
void
on_shutdown(error_code ec);
};
//------------------------------------------------------------------------------
template <class Handler>
template <class ConstBufferSequence>
SSLHTTPPeer<Handler>::SSLHTTPPeer(
Port const& port,
Handler& handler,
boost::asio::io_context& ioc,
beast::Journal journal,
endpoint_type remote_address,
ConstBufferSequence const& buffers,
middle_type&& stream)
: BaseHTTPPeer<Handler, SSLHTTPPeer>(
port,
handler,
ioc.get_executor(),
journal,
remote_address,
buffers)
, stream_ptr_(std::make_unique<stream_type>(
middle_type(std::move(stream)),
*port.context))
, stream_(*stream_ptr_)
, socket_(stream_.next_layer().socket())
{
}
// Called when the acceptor accepts our socket.
template <class Handler>
void
SSLHTTPPeer<Handler>::run()
{
if (!this->handler_.onAccept(this->session(), this->remote_address_))
{
boost::asio::spawn(
this->strand_,
std::bind(&SSLHTTPPeer::do_close, this->shared_from_this()));
return;
}
if (!socket_.is_open())
return;
boost::asio::spawn(
this->strand_,
std::bind(
&SSLHTTPPeer::do_handshake,
this->shared_from_this(),
std::placeholders::_1));
}
template <class Handler>
std::shared_ptr<WSSession>
SSLHTTPPeer<Handler>::websocketUpgrade()
{
auto ws = this->ios().template emplace<SSLWSPeer<Handler>>(
this->port_,
this->handler_,
this->remote_address_,
std::move(this->message_),
std::move(this->stream_ptr_),
this->journal_);
return ws;
}
template <class Handler>
void
SSLHTTPPeer<Handler>::do_handshake(yield_context do_yield)
{
boost::system::error_code ec;
stream_.set_verify_mode(boost::asio::ssl::verify_none);
this->start_timer();
this->read_buf_.consume(stream_.async_handshake(
stream_type::server, this->read_buf_.data(), do_yield[ec]));
this->cancel_timer();
if (ec == boost::beast::error::timeout)
return this->on_timer();
if (ec)
return this->fail(ec, "handshake");
bool const http = this->port().protocol.count("peer") > 0 ||
this->port().protocol.count("wss") > 0 ||
this->port().protocol.count("wss2") > 0 ||
this->port().protocol.count("https") > 0;
if (http)
{
boost::asio::spawn(
this->strand_,
std::bind(
&SSLHTTPPeer::do_read,
this->shared_from_this(),
std::placeholders::_1));
return;
}
// `this` will be destroyed
}
template <class Handler>
void
SSLHTTPPeer<Handler>::do_request()
{
++this->request_count_;
auto const what = this->handler_.onHandoff(
this->session(),
std::move(stream_ptr_),
std::move(this->message_),
this->remote_address_);
if (what.moved)
return;
if (what.response)
return this->write(what.response, what.keep_alive);
// legacy
this->handler_.onRequest(this->session());
}
template <class Handler>
void
SSLHTTPPeer<Handler>::do_close()
{
this->start_timer();
stream_.async_shutdown(bind_executor(
this->strand_,
std::bind(
&SSLHTTPPeer::on_shutdown,
this->shared_from_this(),
std::placeholders::_1)));
}
template <class Handler>
void
SSLHTTPPeer<Handler>::on_shutdown(error_code ec)
{
this->cancel_timer();
if (ec == boost::asio::error::operation_aborted)
return;
if (ec)
{
JLOG(this->journal_.debug()) << "on_shutdown: " << ec.message();
}
// Close socket now in case this->destructor is delayed
stream_.next_layer().close();
}
} // namespace ripple
#endif

View File

@@ -0,0 +1,89 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_SERVER_SSLWSPEER_H_INCLUDED
#define RIPPLE_SERVER_SSLWSPEER_H_INCLUDED
#include <ripple/server/WSSession.h>
#include <ripple/server/impl/BaseHTTPPeer.h>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl/context.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <boost/beast/core/tcp_stream.hpp>
#include <boost/beast/ssl/ssl_stream.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <memory>
namespace ripple {
template <class Handler>
class SSLWSPeer : public BaseWSPeer<Handler, SSLWSPeer<Handler>>,
public std::enable_shared_from_this<SSLWSPeer<Handler>>
{
friend class BasePeer<Handler, SSLWSPeer>;
friend class BaseWSPeer<Handler, SSLWSPeer>;
using clock_type = std::chrono::system_clock;
using error_code = boost::system::error_code;
using endpoint_type = boost::asio::ip::tcp::endpoint;
using socket_type = boost::beast::tcp_stream;
using stream_type = boost::beast::ssl_stream<socket_type>;
using waitable_timer = boost::asio::basic_waitable_timer<clock_type>;
std::unique_ptr<stream_type> stream_ptr_;
boost::beast::websocket::stream<stream_type&> ws_;
public:
template <class Body, class Headers>
SSLWSPeer(
Port const& port,
Handler& handler,
endpoint_type remote_endpoint,
boost::beast::http::request<Body, Headers>&& request,
std::unique_ptr<stream_type>&& stream_ptr,
beast::Journal journal);
};
//------------------------------------------------------------------------------
template <class Handler>
template <class Body, class Headers>
SSLWSPeer<Handler>::SSLWSPeer(
Port const& port,
Handler& handler,
endpoint_type remote_endpoint,
boost::beast::http::request<Body, Headers>&& request,
std::unique_ptr<stream_type>&& stream_ptr,
beast::Journal journal)
: BaseWSPeer<Handler, SSLWSPeer>(
port,
handler,
stream_ptr->get_executor(),
waitable_timer{stream_ptr->get_executor()},
remote_endpoint,
std::move(request),
journal)
, stream_ptr_(std::move(stream_ptr))
, ws_(*stream_ptr_)
{
}
} // namespace ripple
#endif

View File

@@ -0,0 +1,218 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_SERVER_SERVERIMPL_H_INCLUDED
#define RIPPLE_SERVER_SERVERIMPL_H_INCLUDED
#include <ripple/basics/chrono.h>
#include <ripple/beast/core/List.h>
#include <ripple/server/Server.h>
#include <ripple/server/impl/Door.h>
#include <ripple/server/impl/UDPDoor.h>
#include <ripple/server/impl/io_list.h>
#include <boost/asio.hpp>
#include <array>
#include <chrono>
#include <mutex>
#include <optional>
namespace ripple {
using Endpoints = std::vector<boost::asio::ip::tcp::endpoint>;
/** A multi-protocol server.
This server maintains multiple configured listening ports,
with each listening port allows for multiple protocols including
HTTP, HTTP/S, WebSocket, Secure WebSocket, and the Peer protocol.
*/
class Server
{
public:
/** Destroy the server.
The server is closed if it is not already closed. This call
blocks until the server has stopped.
*/
virtual ~Server() = default;
/** Returns the Journal associated with the server. */
virtual beast::Journal
journal() = 0;
/** Set the listening port settings.
This may only be called once.
*/
virtual Endpoints
ports(std::vector<Port> const& v) = 0;
/** Close the server.
The close is performed asynchronously. The handler will be notified
when the server has stopped. The server is considered stopped when
there are no pending I/O completion handlers and all connections
have closed.
Thread safety:
Safe to call concurrently from any thread.
*/
virtual void
close() = 0;
};
template <class Handler>
class ServerImpl : public Server
{
private:
using clock_type = std::chrono::system_clock;
enum { historySize = 100 };
Handler& handler_;
beast::Journal const j_;
boost::asio::io_service& io_service_;
boost::asio::io_service::strand strand_;
std::optional<boost::asio::io_service::work> work_;
std::mutex m_;
std::vector<Port> ports_;
std::vector<std::weak_ptr<Door<Handler>>> list_;
int high_ = 0;
std::array<std::size_t, 64> hist_;
io_list ios_;
public:
ServerImpl(
Handler& handler,
boost::asio::io_service& io_service,
beast::Journal journal);
~ServerImpl();
beast::Journal
journal() override
{
return j_;
}
Endpoints
ports(std::vector<Port> const& ports) override;
void
close() override;
io_list&
ios()
{
return ios_;
}
boost::asio::io_service&
get_io_service()
{
return io_service_;
}
bool
closed();
private:
static int
ceil_log2(unsigned long long x);
};
template <class Handler>
ServerImpl<Handler>::ServerImpl(
Handler& handler,
boost::asio::io_service& io_service,
beast::Journal journal)
: handler_(handler)
, j_(journal)
, io_service_(io_service)
, strand_(io_service_)
, work_(io_service_)
{
}
template <class Handler>
ServerImpl<Handler>::~ServerImpl()
{
// Handler::onStopped will not be called
work_ = std::nullopt;
ios_.close();
ios_.join();
}
template <class Handler>
Endpoints
ServerImpl<Handler>::ports(std::vector<Port> const& ports)
{
if (closed())
Throw<std::logic_error>("ports() on closed Server");
ports_.reserve(ports.size());
Endpoints eps;
eps.reserve(ports.size());
for (auto const& port : ports)
{
ports_.push_back(port);
if (port.has_udp())
{
// UDP-RPC door
if (auto sp = ios_.emplace<UDPDoor<Handler>>(
handler_, io_service_, ports_.back(), j_))
{
eps.push_back(sp->get_endpoint());
sp->run();
}
}
else
{
// Standard TCP door
if (auto sp = ios_.emplace<Door<Handler>>(
handler_, io_service_, ports_.back(), j_))
{
list_.push_back(sp);
eps.push_back(sp->get_endpoint());
sp->run();
}
}
}
return eps;
}
template <class Handler>
void
ServerImpl<Handler>::close()
{
ios_.close([&] {
work_ = std::nullopt;
handler_.onStopped(*this);
});
}
template <class Handler>
bool
ServerImpl<Handler>::closed()
{
return ios_.closed();
}
} // namespace ripple
#endif

View File

@@ -0,0 +1,267 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_SERVER_IO_LIST_H_INCLUDED
#define RIPPLE_SERVER_IO_LIST_H_INCLUDED
#include <boost/container/flat_map.hpp>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <type_traits>
#include <utility>
namespace ripple {
/** Manages a set of objects performing asynchronous I/O. */
class io_list final
{
public:
class work
{
template <class = void>
void
destroy();
friend class io_list;
io_list* ios_ = nullptr;
public:
virtual ~work()
{
destroy();
}
/** Return the io_list associated with the work.
Requirements:
The call to io_list::emplace to
create the work has already returned.
*/
io_list&
ios()
{
return *ios_;
}
virtual void
close() = 0;
};
private:
template <class = void>
void
destroy();
std::mutex m_;
std::size_t n_ = 0;
bool closed_ = false;
std::condition_variable cv_;
boost::container::flat_map<work*, std::weak_ptr<work>> map_;
std::function<void(void)> f_;
public:
io_list() = default;
/** Destroy the list.
Effects:
Closes the io_list if it was not previously
closed. No finisher is invoked in this case.
Blocks until all work is destroyed.
*/
~io_list()
{
destroy();
}
/** Return `true` if the list is closed.
Thread Safety:
Undefined result if called concurrently
with close().
*/
bool
closed() const
{
return closed_;
}
/** Create associated work if not closed.
Requirements:
`std::is_base_of_v<work, T> == true`
Thread Safety:
May be called concurrently.
Effects:
Atomically creates, inserts, and returns new
work T, or returns nullptr if the io_list is
closed,
If the call succeeds and returns a new object,
it is guaranteed that a subsequent call to close
will invoke work::close on the object.
*/
template <class T, class... Args>
std::shared_ptr<T>
emplace(Args&&... args);
/** Cancel active I/O.
Thread Safety:
May not be called concurrently.
Effects:
Associated work is closed.
Finisher if provided, will be called when
all associated work is destroyed. The finisher
may be called from a foreign thread, or within
the call to this function.
Only the first call to close will set the
finisher.
No effect after the first call.
*/
template <class Finisher>
void
close(Finisher&& f);
void
close()
{
close([] {});
}
/** Block until the io_list stops.
Effects:
The caller is blocked until the io_list is
closed and all associated work is destroyed.
Thread safety:
May be called concurrently.
Preconditions:
No call to io_service::run on any io_service
used by work objects associated with this io_list
exists in the caller's call stack.
*/
template <class = void>
void
join();
};
//------------------------------------------------------------------------------
template <class>
void
io_list::work::destroy()
{
if (!ios_)
return;
std::function<void(void)> f;
{
std::lock_guard lock(ios_->m_);
ios_->map_.erase(this);
if (--ios_->n_ == 0 && ios_->closed_)
{
std::swap(f, ios_->f_);
ios_->cv_.notify_all();
}
}
if (f)
f();
}
template <class>
void
io_list::destroy()
{
close();
join();
}
template <class T, class... Args>
std::shared_ptr<T>
io_list::emplace(Args&&... args)
{
static_assert(
std::is_base_of<work, T>::value, "T must derive from io_list::work");
if (closed_)
return nullptr;
auto sp = std::make_shared<T>(std::forward<Args>(args)...);
decltype(sp) dead;
std::lock_guard lock(m_);
if (!closed_)
{
++n_;
sp->work::ios_ = this;
map_.emplace(sp.get(), sp);
}
else
{
std::swap(sp, dead);
}
return sp;
}
template <class Finisher>
void
io_list::close(Finisher&& f)
{
std::unique_lock<std::mutex> lock(m_);
if (closed_)
return;
closed_ = true;
auto map = std::move(map_);
if (!map.empty())
{
f_ = std::forward<Finisher>(f);
lock.unlock();
for (auto const& p : map)
if (auto sp = p.second.lock())
sp->close();
}
else
{
lock.unlock();
f();
}
}
template <class>
void
io_list::join()
{
std::unique_lock<std::mutex> lock(m_);
cv_.wait(lock, [&] { return closed_ && n_ == 0; });
}
} // namespace ripple
#endif