mirror of
https://github.com/XRPLF/rippled.git
synced 2026-04-29 15:37:57 +00:00
docs and concurrency clean up
This commit is contained in:
117
src/endpoint.hpp
117
src/endpoint.hpp
@@ -129,8 +129,9 @@ public:
|
||||
explicit endpoint(handler_ptr handler)
|
||||
: role_type(endpoint_base::m_io_service),
|
||||
socket_type(endpoint_base::m_io_service),
|
||||
m_state(IDLE),
|
||||
m_handler(handler),
|
||||
m_read_threshold(DEFAULT_READ_THRESHOLD),
|
||||
m_state(IDLE),
|
||||
m_pool(new message::pool<message::data>(1000)),
|
||||
m_pool_control(new message::pool<message::data>(SIZE_MAX))
|
||||
{
|
||||
@@ -144,7 +145,7 @@ public:
|
||||
m_pool->set_callback(NULL);
|
||||
|
||||
// Detach any connections that are still alive at this point
|
||||
boost::lock_guard<boost::recursive_mutex> lock(get_lock());
|
||||
boost::lock_guard<boost::recursive_mutex> lock(m_lock);
|
||||
|
||||
typename std::set<connection_ptr>::iterator it;
|
||||
|
||||
@@ -160,6 +161,10 @@ public:
|
||||
|
||||
/// Returns a reference to the endpoint's access logger.
|
||||
/**
|
||||
* Visibility: public
|
||||
* State: Any
|
||||
* Concurrency: Callable from anywhere
|
||||
*
|
||||
* @return A reference to the endpoint's access logger. See @ref logger
|
||||
* for more details about WebSocket++ logging policy classes.
|
||||
*
|
||||
@@ -188,13 +193,29 @@ public:
|
||||
return m_elog;
|
||||
}
|
||||
|
||||
/// Updates the default handler to be used for future connections
|
||||
/// Get default handler
|
||||
/**
|
||||
* Visibility: public
|
||||
* State: valid always
|
||||
* Concurrency: callable from anywhere
|
||||
*
|
||||
* @return A pointer to the default handler
|
||||
*/
|
||||
handler_ptr get_handler() const {
|
||||
boost::lock_guard<boost::recursive_mutex> lock(m_lock);
|
||||
|
||||
return m_handler;
|
||||
}
|
||||
|
||||
/// Sets the default handler to be used for future connections
|
||||
/**
|
||||
* Does not affect existing connections.
|
||||
*
|
||||
* @param new_handler A shared pointer to the new default handler. Must not
|
||||
* be NULL.
|
||||
*/
|
||||
void set_handler(handler_ptr new_handler) {
|
||||
boost::lock_guard<boost::recursive_mutex> lock(get_lock());
|
||||
boost::lock_guard<boost::recursive_mutex> lock(m_lock);
|
||||
|
||||
if (!new_handler) {
|
||||
elog().at(log::elevel::FATAL)
|
||||
@@ -205,6 +226,46 @@ public:
|
||||
m_handler = new_handler;
|
||||
}
|
||||
|
||||
/// Set endpoint read threshold
|
||||
/**
|
||||
* Sets the default read threshold value that will be passed to new connections.
|
||||
* Changing this value will only affect new connections, not existing ones. The read
|
||||
* threshold represents the largest block of payload bytes that will be processed in
|
||||
* a single async read. Lower values may experience better callback latency at the
|
||||
* expense of additional ASIO context switching overhead. This value also affects the
|
||||
* maximum number of bytes to be buffered before performing utf8 and other streaming
|
||||
* validation.
|
||||
*
|
||||
* Visibility: public
|
||||
* State: valid always
|
||||
* Concurrency: callable from anywhere
|
||||
*
|
||||
* @param val Size of the threshold in bytes
|
||||
*/
|
||||
void set_read_threshold(size_t val) {
|
||||
boost::lock_guard<boost::recursive_mutex> lock(m_lock);
|
||||
|
||||
m_read_threshold = val;
|
||||
}
|
||||
|
||||
/// Get endpoint read threshold
|
||||
/**
|
||||
* Returns the endpoint read threshold. See set_read_threshold for more information
|
||||
* about the read threshold.
|
||||
*
|
||||
* Visibility: public
|
||||
* State: valid always
|
||||
* Concurrency: callable from anywhere
|
||||
*
|
||||
* @return Size of the threshold in bytes
|
||||
* @see set_read_threshold()
|
||||
*/
|
||||
size_t get_read_threshold() const {
|
||||
boost::lock_guard<boost::recursive_mutex> lock(m_lock);
|
||||
|
||||
return m_read_threshold;
|
||||
}
|
||||
|
||||
/// Cleanly closes all websocket connections
|
||||
/**
|
||||
* Sends a close signal to every connection with the specified code and
|
||||
@@ -219,7 +280,7 @@ public:
|
||||
void close_all(close::status::value code = close::status::GOING_AWAY,
|
||||
const std::string& reason = "")
|
||||
{
|
||||
boost::lock_guard<boost::recursive_mutex> lock(get_lock());
|
||||
boost::lock_guard<boost::recursive_mutex> lock(m_lock);
|
||||
|
||||
alog().at(log::alevel::ENDPOINT)
|
||||
<< "Endpoint received signal to close all connections cleanly with code "
|
||||
@@ -247,7 +308,11 @@ public:
|
||||
* If clean is true stop will use code and reason for the close code and
|
||||
* close reason when it closes open connections. The default code is
|
||||
* 1001/Going Away and the default reason is blank.
|
||||
*
|
||||
*
|
||||
* Visibility: public
|
||||
* State: Valid from RUNNING only
|
||||
* Concurrency: Callable from anywhere
|
||||
*
|
||||
* @param clean Whether or not to wait until all connections have been
|
||||
* cleanly closed to stop io_service operations.
|
||||
* @param code The WebSocket close code to send to remote clients as the
|
||||
@@ -259,7 +324,7 @@ public:
|
||||
close::status::value code = close::status::GOING_AWAY,
|
||||
const std::string& reason = "")
|
||||
{
|
||||
boost::lock_guard<boost::recursive_mutex> lock(get_lock());
|
||||
boost::lock_guard<boost::recursive_mutex> lock(m_lock);
|
||||
|
||||
if (clean) {
|
||||
alog().at(log::alevel::ENDPOINT)
|
||||
@@ -287,17 +352,21 @@ protected:
|
||||
* If the endpoint is in a state where it is trying to stop or has already
|
||||
* stopped an empty shared pointer is returned.
|
||||
*
|
||||
* Visibility: protected
|
||||
* State: Always valid, behavior differs based on state
|
||||
* Concurrency: Callable from anywhere
|
||||
*
|
||||
* @return A shared pointer to the newly created connection or an empty
|
||||
* shared pointer if one could not be created.
|
||||
*/
|
||||
connection_ptr create_connection() {
|
||||
boost::lock_guard<boost::recursive_mutex> lock(get_lock());
|
||||
boost::lock_guard<boost::recursive_mutex> lock(m_lock);
|
||||
|
||||
if (m_state == STOPPING || m_state == STOPPED) {
|
||||
return connection_ptr();
|
||||
}
|
||||
|
||||
connection_ptr new_connection(new connection_type(*this,get_handler()));
|
||||
connection_ptr new_connection(new connection_type(*this,m_handler));
|
||||
m_connections.insert(new_connection);
|
||||
|
||||
alog().at(log::alevel::DEVEL) << "Connection created: count is now: "
|
||||
@@ -315,12 +384,18 @@ protected:
|
||||
* the shared pointer the connection will be freed and any state it
|
||||
* contained (close code status, etc) will be lost.
|
||||
*
|
||||
* Visibility: protected
|
||||
* State: Always valid, behavior differs based on state
|
||||
* Concurrency: Callable from anywhere
|
||||
*
|
||||
* @param con A shared pointer to a connection created by this endpoint.
|
||||
*/
|
||||
void remove_connection(connection_ptr con) {
|
||||
boost::lock_guard<boost::recursive_mutex> lock(get_lock());
|
||||
boost::lock_guard<boost::recursive_mutex> lock(m_lock);
|
||||
|
||||
// TODO: is this safe to use?
|
||||
// Detaching signals to the connection that the endpoint is no longer aware of it
|
||||
// and it is no longer safe to assume the endpoint exists.
|
||||
con->detach();
|
||||
|
||||
m_connections.erase(con);
|
||||
@@ -338,11 +413,6 @@ protected:
|
||||
}
|
||||
}
|
||||
|
||||
/// Gets a shared pointer to this endpoint's default connection handler
|
||||
handler_ptr get_handler() {
|
||||
return m_handler;
|
||||
}
|
||||
|
||||
/// Gets a shared pointer to a read/write data message.
|
||||
// TODO: thread safety
|
||||
message::data::ptr get_data_message() {
|
||||
@@ -358,7 +428,7 @@ protected:
|
||||
/// Asks the endpoint to restart this connection's handle_read_frame loop
|
||||
/// when there are avaliable data messages.
|
||||
void wait(connection_ptr con) {
|
||||
boost::lock_guard<boost::recursive_mutex> lock(get_lock());
|
||||
boost::lock_guard<boost::recursive_mutex> lock(m_lock);
|
||||
|
||||
m_read_waiting.push(con);
|
||||
alog().at(log::alevel::DEVEL) << "connection " << con << " is waiting. " << m_read_waiting.size() << log::endl;
|
||||
@@ -367,7 +437,7 @@ protected:
|
||||
/// Message pool callback indicating that there is a free data message
|
||||
/// avaliable. Causes one waiting connection to get restarted.
|
||||
void on_new_message() {
|
||||
boost::lock_guard<boost::recursive_mutex> lock(get_lock());
|
||||
boost::lock_guard<boost::recursive_mutex> lock(m_lock);
|
||||
|
||||
if (!m_read_waiting.empty()) {
|
||||
connection_ptr next = m_read_waiting.front();
|
||||
@@ -380,10 +450,6 @@ protected:
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
boost::recursive_mutex& get_lock() {
|
||||
return m_lock;
|
||||
}
|
||||
private:
|
||||
enum state {
|
||||
IDLE = 0,
|
||||
@@ -392,8 +458,13 @@ private:
|
||||
STOPPED = 3
|
||||
};
|
||||
|
||||
state m_state;
|
||||
// default settings to pass to connections
|
||||
handler_ptr m_handler;
|
||||
size_t m_read_threshold;
|
||||
|
||||
// other stuff
|
||||
state m_state;
|
||||
|
||||
std::set<connection_ptr> m_connections;
|
||||
alogger_type m_alog;
|
||||
elogger_type m_elog;
|
||||
@@ -404,7 +475,7 @@ private:
|
||||
std::queue<connection_ptr> m_read_waiting;
|
||||
|
||||
// concurrency support
|
||||
boost::recursive_mutex m_lock;
|
||||
mutable boost::recursive_mutex m_lock;
|
||||
};
|
||||
|
||||
/// traits class that allows looking up relevant endpoint types by the fully
|
||||
|
||||
Reference in New Issue
Block a user