New ResourceManager for managing server load.

* Track abusive endpoints
* Gossip across cluster.
* Use resource manager's gossip support to share load reporting across a cluster
* Swtich from legacy fees to new Resource::Charge fees.
* Connect RPC to the new resource manager.
* Set load levels where needed in RPC/websocket commands.
* Disconnect abusive peer endpoints.
* Don't start conversations with abusive peer endpoints.
* Move Resource::Consumer to InfoSub and remove LoadSource
* Remove port from inbound Consumer keys
* Add details in getJson
* Fix doAccountCurrencies for the new resource manager.
This commit is contained in:
David Schwartz
2013-10-29 17:35:47 -07:00
committed by Vinnie Falco
parent a05f33f6a7
commit 58f07a573f
48 changed files with 665 additions and 759 deletions

View File

@@ -900,7 +900,8 @@ public:
{
// TODO: read timeout timer?
socket_type::get_socket().async_read(
async_read(
socket_type::get_socket(),
m_buf,
boost::asio::transfer_at_least(std::min(
m_read_threshold,
@@ -1209,7 +1210,7 @@ public:
//m_endpoint.alog().at(log::alevel::DEVEL) << "write header: " << zsutil::to_hex(m_write_queue.front()->get_header()) << log::endl;
socket_type::get_socket().async_write(
async_write(socket_type::get_socket(),
m_write_buf,
m_strand.wrap(boost::bind(
&type::handle_write,

View File

@@ -29,7 +29,7 @@
#define WEBSOCKETPP_ENDPOINT_HPP
#include "connection.hpp"
#include "sockets/autotls.hpp" // should this be here?
#include "sockets/multitls.hpp" // should this be here?
#include "logger/logger.hpp"
#include <boost/asio.hpp>
@@ -74,7 +74,7 @@ protected:
*/
template <
template <class> class role,
template <class> class socket = socket::autotls,
template <class> class socket = socket::multitls,
template <class> class logger = log::logger>
class endpoint
: public endpoint_base,

View File

@@ -410,7 +410,7 @@ void server<endpoint>::start_accept() {
}
m_acceptor.async_accept(
con->get_raw_socket(),
con->get_native_socket (),
boost::bind(
&type::handle_accept,
this,
@@ -427,7 +427,7 @@ template <class endpoint>
void server<endpoint>::handle_accept(connection_ptr con,
const boost::system::error_code& error)
{
bool delay = false;
bool delay = false;
boost::lock_guard<boost::recursive_mutex> lock(m_endpoint.m_lock);
@@ -554,18 +554,19 @@ void server<endpoint>::connection<connection_type>::async_init() {
static boost::arg<1> pl1;
static boost::arg<2> pl2;
boost::shared_ptr<std::string> stringPtr = boost::make_shared<std::string>();
m_connection.get_socket().async_read_until(
m_connection.buffer(),
boost::bind(&match_header, stringPtr, pl1, pl2),
m_connection.get_strand().wrap(boost::bind(
&type::handle_read_request,
m_connection.shared_from_this(),
stringPtr,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred
))
);
boost::shared_ptr<std::string> stringPtr = boost::make_shared<std::string>();
async_read_until(
m_connection.get_socket(),
m_connection.buffer(),
boost::bind(&match_header, stringPtr, pl1, pl2),
m_connection.get_strand().wrap(boost::bind(
&type::handle_read_request,
m_connection.shared_from_this(),
stringPtr,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred
))
);
}
/// processes the response from an async read for an HTTP header
@@ -598,13 +599,14 @@ void server<endpoint>::connection<connection_type>::handle_read_request(
std::string reply =
"<?xml version=\"1.0\"?><cross-domain-policy>"
"<allow-access-from domain=\"*\" to-ports=\"";
reply += beast::lexicalCastThrow <std::string>(m_connection.get_raw_socket().local_endpoint().port());
reply += "\"/></cross-domain-policy>";
reply.append("\0", 1);
reply += beast::lexicalCastThrow <std::string>(m_connection.get_native_socket().local_endpoint().port());
reply += "\"/></cross-domain-policy>";
reply.append("\0", 1);
m_version = -1;
shared_const_buffer buffer(reply);
m_connection.get_socket().async_write(
async_write(
m_connection.get_socket(),
shared_const_buffer(reply),
boost::bind(
&type::handle_write_response,
@@ -862,7 +864,8 @@ void server<endpoint>::connection<connection_type>::write_response() {
m_endpoint.m_alog->at(log::alevel::DEBUG_HANDSHAKE) << raw << log::endl;
m_connection.get_socket().async_write(
async_write(
m_connection.get_socket(),
//boost::asio::buffer(raw),
buffer,
boost::bind(
@@ -928,8 +931,8 @@ void server<endpoint>::connection<connection_type>::log_open_result() {
version << "v" << m_version << " ";
std::string remote;
boost::system::error_code ec;
boost::asio::ip::tcp::endpoint ep = m_connection.get_raw_socket().remote_endpoint(ec);
boost::system::error_code ec; // FIXME: proxy
boost::asio::ip::tcp::endpoint ep = m_connection.get_native_socket().remote_endpoint(ec);
if (ec) {
m_endpoint.m_elog->at(log::elevel::WARN)
<< "Error getting remote endpoint. code: " << ec << log::endl;

View File

@@ -0,0 +1,173 @@
/*
* Copyright (c) 2011, Peter Thorson. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of the WebSocket++ Project nor the
* names of its contributors may be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL PETER THORSON BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef WEBSOCKETPP_SOCKET_MULTITLS_HPP
#define WEBSOCKETPP_SOCKET_MULTITLS_HPP
namespace websocketpp {
namespace socket {
template <typename endpoint_type>
class multitls {
public:
typedef multitls<endpoint_type> type;
typedef ripple::MultiSocket multitls_socket;
typedef boost::asio::ip::tcp::socket native_socket_t;
typedef boost::shared_ptr<multitls_socket> multitls_socket_ptr;
// should be private friended
boost::asio::io_service& get_io_service() {
return m_io_service;
}
static void handle_shutdown(multitls_socket_ptr, const boost::system::error_code&) {
}
void set_secure_only() {
m_secure_only = true;
}
void set_plain_only() {
m_plain_only = true;
}
// should be private friended?
multitls_socket::handshake_type get_handshake_type() {
if (static_cast< endpoint_type* >(this)->is_server()) {
return boost::asio::ssl::stream_base::server;
} else {
return boost::asio::ssl::stream_base::client;
}
}
class handler_interface {
public:
virtual ~handler_interface() {}
virtual void on_tcp_init() {};
virtual boost::asio::ssl::context& get_ssl_context () = 0;
virtual bool get_proxy() = 0;
};
// Connection specific details
template <typename connection_type>
class connection {
public:
multitls_socket& get_socket() {
return *m_socket_ptr;
}
native_socket_t& get_native_socket() {
return m_socket_ptr->next_layer<native_socket_t> ();
}
bool is_secure() {
return m_socket_ptr->ssl_handle() != NULL;
}
protected:
connection(multitls<endpoint_type>& e)
: m_endpoint(e)
, m_connection(static_cast< connection_type& >(*this)) {}
void init() {
boost::asio::ssl::context& ssl_context (
m_connection.get_handler()->get_ssl_context());
int flags = multitls_socket::Flag::server_role |
(m_endpoint.m_secure_only ? multitls_socket::Flag::ssl_required : 0) |
(m_endpoint.m_plain_only ? 0 : multitls_socket::Flag::ssl);
if (m_connection.get_handler()->get_proxy ())
flags |= multitls_socket::Flag::proxy;
m_socket_ptr = multitls_socket_ptr (multitls_socket::New (
m_endpoint.get_io_service(), ssl_context, flags ) );
}
void async_init(boost::function<void(const boost::system::error_code&)> callback)
{
m_connection.get_handler()->on_tcp_init();
// wait for TLS handshake
// TODO: configurable value
m_connection.register_timeout(5000,
fail::status::TIMEOUT_TLS,
"Timeout on TLS handshake");
m_socket_ptr->async_handshake(
m_endpoint.get_handshake_type(),
boost::bind(
&connection<connection_type>::handle_init,
this,
callback,
boost::asio::placeholders::error
)
);
}
void handle_init(socket_init_callback callback,const boost::system::error_code& error) {
m_connection.cancel_timeout();
callback(error);
}
// note, this function for some reason shouldn't/doesn't need to be
// called for plain HTTP connections. not sure why.
bool shutdown() {
boost::system::error_code ignored_ec;
m_socket_ptr->async_shutdown( // Don't block on connection shutdown DJS
boost::bind(
&multitls<endpoint_type>::handle_shutdown,
m_socket_ptr,
boost::asio::placeholders::error
)
);
if (ignored_ec) {
return false;
} else {
return true;
}
}
private:
boost::shared_ptr<boost::asio::ssl::context> m_context_ptr;
multitls_socket_ptr m_socket_ptr;
multitls<endpoint_type>& m_endpoint;
connection_type& m_connection;
};
protected:
multitls (boost::asio::io_service& m) : m_io_service(m), m_secure_only(false), m_plain_only(false) {}
private:
boost::asio::io_service& m_io_service;
bool m_secure_only;
bool m_plain_only;
};
} // namespace socket
} // namespace websocketpp
#endif // WEBSOCKETPP_SOCKET_MULTITLS_HPP

View File

@@ -41,9 +41,9 @@ namespace websocketpp {
typedef websocketpp::endpoint<websocketpp::role::server,
websocketpp::socket::tls> server_tls;
#endif
#ifdef WEBSOCKETPP_SOCKET_AUTOTLS_HPP
#ifdef WEBSOCKETPP_SOCKET_MULTITLS_HPP
typedef websocketpp::endpoint<websocketpp::role::server,
websocketpp::socket::autotls> server_autotls;
websocketpp::socket::multitls> server_multitls;
#endif
#endif