From 3cef677c31a80f50fc93a60ce18722db534f1366 Mon Sep 17 00:00:00 2001 From: Peter Thorson Date: Mon, 19 Mar 2012 16:20:27 -0600 Subject: [PATCH] docs and concurrency clean up --- src/endpoint.hpp | 117 +++++++++++++++++++++++++++++++++++++---------- 1 file changed, 94 insertions(+), 23 deletions(-) diff --git a/src/endpoint.hpp b/src/endpoint.hpp index 4021e0f917..985fb2fbe4 100644 --- a/src/endpoint.hpp +++ b/src/endpoint.hpp @@ -129,8 +129,9 @@ public: explicit endpoint(handler_ptr handler) : role_type(endpoint_base::m_io_service), socket_type(endpoint_base::m_io_service), - m_state(IDLE), m_handler(handler), + m_read_threshold(DEFAULT_READ_THRESHOLD), + m_state(IDLE), m_pool(new message::pool(1000)), m_pool_control(new message::pool(SIZE_MAX)) { @@ -144,7 +145,7 @@ public: m_pool->set_callback(NULL); // Detach any connections that are still alive at this point - boost::lock_guard lock(get_lock()); + boost::lock_guard lock(m_lock); typename std::set::iterator it; @@ -160,6 +161,10 @@ public: /// Returns a reference to the endpoint's access logger. /** + * Visibility: public + * State: Any + * Concurrency: Callable from anywhere + * * @return A reference to the endpoint's access logger. See @ref logger * for more details about WebSocket++ logging policy classes. * @@ -188,13 +193,29 @@ public: return m_elog; } - /// Updates the default handler to be used for future connections + /// Get default handler /** + * Visibility: public + * State: valid always + * Concurrency: callable from anywhere + * + * @return A pointer to the default handler + */ + handler_ptr get_handler() const { + boost::lock_guard lock(m_lock); + + return m_handler; + } + + /// Sets the default handler to be used for future connections + /** + * Does not affect existing connections. + * * @param new_handler A shared pointer to the new default handler. Must not * be NULL. */ void set_handler(handler_ptr new_handler) { - boost::lock_guard lock(get_lock()); + boost::lock_guard lock(m_lock); if (!new_handler) { elog().at(log::elevel::FATAL) @@ -205,6 +226,46 @@ public: m_handler = new_handler; } + /// Set endpoint read threshold + /** + * Sets the default read threshold value that will be passed to new connections. + * Changing this value will only affect new connections, not existing ones. The read + * threshold represents the largest block of payload bytes that will be processed in + * a single async read. Lower values may experience better callback latency at the + * expense of additional ASIO context switching overhead. This value also affects the + * maximum number of bytes to be buffered before performing utf8 and other streaming + * validation. + * + * Visibility: public + * State: valid always + * Concurrency: callable from anywhere + * + * @param val Size of the threshold in bytes + */ + void set_read_threshold(size_t val) { + boost::lock_guard lock(m_lock); + + m_read_threshold = val; + } + + /// Get endpoint read threshold + /** + * Returns the endpoint read threshold. See set_read_threshold for more information + * about the read threshold. + * + * Visibility: public + * State: valid always + * Concurrency: callable from anywhere + * + * @return Size of the threshold in bytes + * @see set_read_threshold() + */ + size_t get_read_threshold() const { + boost::lock_guard lock(m_lock); + + return m_read_threshold; + } + /// Cleanly closes all websocket connections /** * Sends a close signal to every connection with the specified code and @@ -219,7 +280,7 @@ public: void close_all(close::status::value code = close::status::GOING_AWAY, const std::string& reason = "") { - boost::lock_guard lock(get_lock()); + boost::lock_guard lock(m_lock); alog().at(log::alevel::ENDPOINT) << "Endpoint received signal to close all connections cleanly with code " @@ -247,7 +308,11 @@ public: * If clean is true stop will use code and reason for the close code and * close reason when it closes open connections. The default code is * 1001/Going Away and the default reason is blank. - * + * + * Visibility: public + * State: Valid from RUNNING only + * Concurrency: Callable from anywhere + * * @param clean Whether or not to wait until all connections have been * cleanly closed to stop io_service operations. * @param code The WebSocket close code to send to remote clients as the @@ -259,7 +324,7 @@ public: close::status::value code = close::status::GOING_AWAY, const std::string& reason = "") { - boost::lock_guard lock(get_lock()); + boost::lock_guard lock(m_lock); if (clean) { alog().at(log::alevel::ENDPOINT) @@ -287,17 +352,21 @@ protected: * If the endpoint is in a state where it is trying to stop or has already * stopped an empty shared pointer is returned. * + * Visibility: protected + * State: Always valid, behavior differs based on state + * Concurrency: Callable from anywhere + * * @return A shared pointer to the newly created connection or an empty * shared pointer if one could not be created. */ connection_ptr create_connection() { - boost::lock_guard lock(get_lock()); + boost::lock_guard lock(m_lock); if (m_state == STOPPING || m_state == STOPPED) { return connection_ptr(); } - connection_ptr new_connection(new connection_type(*this,get_handler())); + connection_ptr new_connection(new connection_type(*this,m_handler)); m_connections.insert(new_connection); alog().at(log::alevel::DEVEL) << "Connection created: count is now: " @@ -315,12 +384,18 @@ protected: * the shared pointer the connection will be freed and any state it * contained (close code status, etc) will be lost. * + * Visibility: protected + * State: Always valid, behavior differs based on state + * Concurrency: Callable from anywhere + * * @param con A shared pointer to a connection created by this endpoint. */ void remove_connection(connection_ptr con) { - boost::lock_guard lock(get_lock()); + boost::lock_guard lock(m_lock); // TODO: is this safe to use? + // Detaching signals to the connection that the endpoint is no longer aware of it + // and it is no longer safe to assume the endpoint exists. con->detach(); m_connections.erase(con); @@ -338,11 +413,6 @@ protected: } } - /// Gets a shared pointer to this endpoint's default connection handler - handler_ptr get_handler() { - return m_handler; - } - /// Gets a shared pointer to a read/write data message. // TODO: thread safety message::data::ptr get_data_message() { @@ -358,7 +428,7 @@ protected: /// Asks the endpoint to restart this connection's handle_read_frame loop /// when there are avaliable data messages. void wait(connection_ptr con) { - boost::lock_guard lock(get_lock()); + boost::lock_guard lock(m_lock); m_read_waiting.push(con); alog().at(log::alevel::DEVEL) << "connection " << con << " is waiting. " << m_read_waiting.size() << log::endl; @@ -367,7 +437,7 @@ protected: /// Message pool callback indicating that there is a free data message /// avaliable. Causes one waiting connection to get restarted. void on_new_message() { - boost::lock_guard lock(get_lock()); + boost::lock_guard lock(m_lock); if (!m_read_waiting.empty()) { connection_ptr next = m_read_waiting.front(); @@ -380,10 +450,6 @@ protected: } } - - boost::recursive_mutex& get_lock() { - return m_lock; - } private: enum state { IDLE = 0, @@ -392,8 +458,13 @@ private: STOPPED = 3 }; - state m_state; + // default settings to pass to connections handler_ptr m_handler; + size_t m_read_threshold; + + // other stuff + state m_state; + std::set m_connections; alogger_type m_alog; elogger_type m_elog; @@ -404,7 +475,7 @@ private: std::queue m_read_waiting; // concurrency support - boost::recursive_mutex m_lock; + mutable boost::recursive_mutex m_lock; }; /// traits class that allows looking up relevant endpoint types by the fully