mirror of
https://github.com/XRPLF/rippled.git
synced 2026-04-29 15:37:57 +00:00
Merge branch 'experimental' of github.com:zaphoyd/websocketpp into experimental
This commit is contained in:
@@ -14,10 +14,10 @@ prgs = []
|
||||
|
||||
# if a C++11 environment is avaliable build using that, otherwise use boost
|
||||
if env_cpp11.has_key('WSPP_CPP11_ENABLED'):
|
||||
ALL_LIBS = boostlibs(['system','thread'],env_cpp11) + [platform_libs] + [polyfill_libs]
|
||||
ALL_LIBS = boostlibs(['system'],env_cpp11) + [platform_libs] + [polyfill_libs]
|
||||
prgs += env_cpp11.Program('broadcast_server', ["broadcast_server.cpp"], LIBS = ALL_LIBS)
|
||||
else:
|
||||
ALL_LIBS = boostlibs(['system','regex'],env) + [platform_libs] + [polyfill_libs]
|
||||
ALL_LIBS = boostlibs(['system','thread','regex'],env) + [platform_libs] + [polyfill_libs]
|
||||
prgs += env.Program('broadcast_server', ["broadcast_server.cpp"], LIBS = ALL_LIBS)
|
||||
|
||||
Return('prgs')
|
||||
|
||||
@@ -4,9 +4,10 @@
|
||||
|
||||
#include <iostream>
|
||||
|
||||
#include <boost/thread.hpp>
|
||||
/*#include <boost/thread.hpp>
|
||||
#include <boost/thread/mutex.hpp>
|
||||
#include <boost/thread/condition_variable.hpp>
|
||||
#include <boost/thread/condition_variable.hpp>*/
|
||||
#include <websocketpp/common/thread.hpp>
|
||||
|
||||
typedef websocketpp::server<websocketpp::config::asio> server;
|
||||
|
||||
@@ -15,6 +16,11 @@ using websocketpp::lib::placeholders::_1;
|
||||
using websocketpp::lib::placeholders::_2;
|
||||
using websocketpp::lib::bind;
|
||||
|
||||
using websocketpp::lib::thread;
|
||||
using websocketpp::lib::mutex;
|
||||
using websocketpp::lib::unique_lock;
|
||||
using websocketpp::lib::condition_variable;
|
||||
|
||||
/* on_open insert connection_hdl into channel
|
||||
* on_close remove connection_hdl from channel
|
||||
* on_message queue send to all channels
|
||||
@@ -67,7 +73,7 @@ public:
|
||||
}
|
||||
|
||||
void on_open(connection_hdl hdl) {
|
||||
boost::unique_lock<boost::mutex> lock(m_action_lock);
|
||||
unique_lock<mutex> lock(m_action_lock);
|
||||
//std::cout << "on_open" << std::endl;
|
||||
m_actions.push(action(SUBSCRIBE,hdl));
|
||||
lock.unlock();
|
||||
@@ -75,7 +81,7 @@ public:
|
||||
}
|
||||
|
||||
void on_close(connection_hdl hdl) {
|
||||
boost::unique_lock<boost::mutex> lock(m_action_lock);
|
||||
unique_lock<mutex> lock(m_action_lock);
|
||||
//std::cout << "on_close" << std::endl;
|
||||
m_actions.push(action(UNSUBSCRIBE,hdl));
|
||||
lock.unlock();
|
||||
@@ -84,7 +90,7 @@ public:
|
||||
|
||||
void on_message(connection_hdl hdl, server::message_ptr msg) {
|
||||
// queue message up for sending by processing thread
|
||||
boost::unique_lock<boost::mutex> lock(m_action_lock);
|
||||
unique_lock<mutex> lock(m_action_lock);
|
||||
//std::cout << "on_message" << std::endl;
|
||||
m_actions.push(action(MESSAGE,msg));
|
||||
lock.unlock();
|
||||
@@ -93,7 +99,7 @@ public:
|
||||
|
||||
void process_messages() {
|
||||
while(1) {
|
||||
boost::unique_lock<boost::mutex> lock(m_action_lock);
|
||||
unique_lock<mutex> lock(m_action_lock);
|
||||
|
||||
while(m_actions.empty()) {
|
||||
m_action_cond.wait(lock);
|
||||
@@ -105,13 +111,13 @@ public:
|
||||
lock.unlock();
|
||||
|
||||
if (a.type == SUBSCRIBE) {
|
||||
boost::unique_lock<boost::mutex> lock(m_connection_lock);
|
||||
unique_lock<mutex> lock(m_connection_lock);
|
||||
m_connections.insert(a.hdl);
|
||||
} else if (a.type == UNSUBSCRIBE) {
|
||||
boost::unique_lock<boost::mutex> lock(m_connection_lock);
|
||||
unique_lock<mutex> lock(m_connection_lock);
|
||||
m_connections.erase(a.hdl);
|
||||
} else if (a.type == MESSAGE) {
|
||||
boost::unique_lock<boost::mutex> lock(m_connection_lock);
|
||||
unique_lock<mutex> lock(m_connection_lock);
|
||||
|
||||
con_list::iterator it;
|
||||
for (it = m_connections.begin(); it != m_connections.end(); ++it) {
|
||||
@@ -129,17 +135,24 @@ private:
|
||||
con_list m_connections;
|
||||
std::queue<action> m_actions;
|
||||
|
||||
boost::mutex m_action_lock;
|
||||
boost::mutex m_connection_lock;
|
||||
boost::condition_variable m_action_cond;
|
||||
mutex m_action_lock;
|
||||
mutex m_connection_lock;
|
||||
condition_variable m_action_cond;
|
||||
};
|
||||
|
||||
int main() {
|
||||
try {
|
||||
broadcast_server server;
|
||||
|
||||
// Start a thread to run the processing loop
|
||||
boost::thread(bind(&broadcast_server::process_messages,&server));
|
||||
thread t(bind(&broadcast_server::process_messages,&server));
|
||||
|
||||
// Run the asio loop with the main thread
|
||||
server.run(9002);
|
||||
|
||||
t.join();
|
||||
|
||||
} catch (std::exception & e) {
|
||||
std::cout << e.what() << std::endl;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,3 +1,8 @@
|
||||
/**
|
||||
* This example is presently used as a scratch space. It may or may not be broken
|
||||
* at any given time.
|
||||
*/
|
||||
|
||||
#include <websocketpp/config/asio_client.hpp>
|
||||
|
||||
#include <websocketpp/client.hpp>
|
||||
@@ -24,7 +29,6 @@ public:
|
||||
typedef std::chrono::duration<int,std::micro> dur_type;
|
||||
|
||||
perftest () {
|
||||
// We expect there to be a lot of errors, so suppress them
|
||||
m_endpoint.set_access_channels(websocketpp::log::alevel::all);
|
||||
m_endpoint.set_error_channels(websocketpp::log::elevel::all);
|
||||
|
||||
|
||||
@@ -100,7 +100,14 @@ void validate_func(server* s, websocketpp::connection_hdl hdl, message_ptr msg)
|
||||
s->send(hdl, msg->get_payload(), msg->get_opcode());
|
||||
}
|
||||
|
||||
|
||||
void http_func(server* s, websocketpp::connection_hdl hdl) {
|
||||
server::connection_ptr con = s->get_con_from_hdl(hdl);
|
||||
|
||||
std::string res = con->get_resource();
|
||||
|
||||
con->set_body(res);
|
||||
con->set_status(websocketpp::http::status_code::ok);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE( connection_extensions ) {
|
||||
connection_setup env(true);
|
||||
@@ -123,6 +130,18 @@ BOOST_AUTO_TEST_CASE( basic_websocket_request ) {
|
||||
BOOST_CHECK(run_server_test(s,input) == output);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE( http_request ) {
|
||||
std::string input = "GET /foo/bar HTTP/1.1\r\nHost: www.example.com\r\nOrigin: http://www.example.com\r\n\r\n";
|
||||
std::string output = "HTTP/1.1 200 OK\r\nContent-Length: 8\r\nServer: ";
|
||||
output+=websocketpp::user_agent;
|
||||
output+="\r\n\r\n/foo/bar";
|
||||
|
||||
server s;
|
||||
s.set_http_handler(bind(&http_func,&s,::_1));
|
||||
|
||||
BOOST_CHECK_EQUAL(run_server_test(s,input), output);
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
BOOST_AUTO_TEST_CASE( user_reject_origin ) {
|
||||
|
||||
@@ -33,24 +33,7 @@ void echo_func(server* s, websocketpp::connection_hdl hdl, message_ptr msg) {
|
||||
|
||||
std::string run_server_test(std::string input) {
|
||||
server test_server;
|
||||
server::connection_ptr con;
|
||||
|
||||
test_server.set_message_handler(bind(&echo_func,&test_server,::_1,::_2));
|
||||
|
||||
std::stringstream output;
|
||||
|
||||
test_server.register_ostream(&output);
|
||||
|
||||
con = test_server.get_connection();
|
||||
|
||||
con->start();
|
||||
|
||||
std::stringstream channel;
|
||||
|
||||
channel << input;
|
||||
channel >> *con;
|
||||
|
||||
return output.str();
|
||||
return run_server_test(test_server,input);
|
||||
}
|
||||
|
||||
std::string run_server_test(server& s, std::string input) {
|
||||
|
||||
@@ -50,10 +50,14 @@ namespace lib {
|
||||
using std::mutex;
|
||||
using std::lock_guard;
|
||||
using std::thread;
|
||||
using std::unique_lock;
|
||||
using std::condition_variable;
|
||||
#else
|
||||
using boost::mutex;
|
||||
using boost::lock_guard;
|
||||
using boost::thread;
|
||||
using boost::unique_lock;
|
||||
using boost::condition_variable;
|
||||
#endif
|
||||
|
||||
} // namespace lib
|
||||
|
||||
@@ -859,7 +859,11 @@ void connection<config>::handle_read_frame(const lib::error_code& ec,
|
||||
}
|
||||
|
||||
if (m_processor->ready()) {
|
||||
//m_alog.write(log::alevel::devel,"consume ended in ready");
|
||||
if (m_alog.static_test(log::alevel::devel)) {
|
||||
std::stringstream s;
|
||||
s << "Complete frame received. Dispatching";
|
||||
m_alog.write(log::alevel::devel,s.str());
|
||||
}
|
||||
|
||||
message_ptr msg = m_processor->get_message();
|
||||
|
||||
@@ -948,7 +952,17 @@ bool connection<config>::process_handshake_request() {
|
||||
if (!processor::is_websocket_handshake(m_request)) {
|
||||
// this is not a websocket handshake. Process as plain HTTP
|
||||
m_alog.write(log::alevel::devel,"HTTP REQUEST");
|
||||
|
||||
|
||||
// extract URI from request
|
||||
try {
|
||||
m_uri = processor::get_uri_from_host(m_request,(transport_con_type::is_secure() ? "https" : "http"));
|
||||
} catch (const websocketpp::uri_exception& e) {
|
||||
m_alog.write(log::alevel::devel,
|
||||
std::string("BAD REQUEST: uri failed to parse: ")+e.what());
|
||||
m_response.set_status(http::status_code::bad_request);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (m_http_handler) {
|
||||
m_http_handler(m_connection_hdl);
|
||||
}
|
||||
|
||||
@@ -291,27 +291,7 @@ public:
|
||||
}
|
||||
|
||||
uri_ptr get_uri(const request_type& request) const {
|
||||
std::string h = request.get_header("Host");
|
||||
|
||||
size_t last_colon = h.rfind(":");
|
||||
size_t last_sbrace = h.rfind("]");
|
||||
|
||||
// no : = hostname with no port
|
||||
// last : before ] = ipv6 literal with no port
|
||||
// : with no ] = hostname with port
|
||||
// : after ] = ipv6 literal with port
|
||||
if (last_colon == std::string::npos ||
|
||||
(last_sbrace != std::string::npos && last_sbrace > last_colon))
|
||||
{
|
||||
return uri_ptr(new uri(base::m_secure,h,request.get_uri()));
|
||||
} else {
|
||||
return uri_ptr(new uri(base::m_secure,
|
||||
h.substr(0,last_colon),
|
||||
h.substr(last_colon+1),
|
||||
request.get_uri()));
|
||||
}
|
||||
|
||||
// TODO: check if get_uri is a full uri
|
||||
return get_uri_from_host(request,(base::m_secure ? "wss" : "ws"));
|
||||
}
|
||||
|
||||
/// Process new websocket connection bytes
|
||||
|
||||
@@ -106,6 +106,29 @@ int get_websocket_version(request_type& r) {
|
||||
return version;
|
||||
}
|
||||
|
||||
template <typename request_type>
|
||||
uri_ptr get_uri_from_host(request_type & request, std::string scheme) {
|
||||
std::string h = request.get_header("Host");
|
||||
|
||||
size_t last_colon = h.rfind(":");
|
||||
size_t last_sbrace = h.rfind("]");
|
||||
|
||||
// no : = hostname with no port
|
||||
// last : before ] = ipv6 literal with no port
|
||||
// : with no ] = hostname with port
|
||||
// : after ] = ipv6 literal with port
|
||||
if (last_colon == std::string::npos ||
|
||||
(last_sbrace != std::string::npos && last_sbrace > last_colon))
|
||||
{
|
||||
return uri_ptr(new uri(scheme, h, request.get_uri()));
|
||||
} else {
|
||||
return uri_ptr(new uri(scheme,
|
||||
h.substr(0,last_colon),
|
||||
h.substr(last_colon+1),
|
||||
request.get_uri()));
|
||||
}
|
||||
}
|
||||
|
||||
// All other functions are WebSocket version dependent. processor is a base
|
||||
// class for version dependent processing functions.
|
||||
|
||||
|
||||
@@ -63,10 +63,11 @@ public:
|
||||
explicit uri(const std::string& uri) {
|
||||
// TODO: should this split resource into path/query?
|
||||
lib::cmatch matches;
|
||||
const lib::regex expression("(http|ws|wss)://([^/:\\[]+|\\[[0-9a-fA-F:.]+\\])(:\\d{1,5})?(/[^#]*)?");
|
||||
const lib::regex expression("(http|https|ws|wss)://([^/:\\[]+|\\[[0-9a-fA-F:.]+\\])(:\\d{1,5})?(/[^#]*)?");
|
||||
|
||||
if (lib::regex_match(uri.c_str(), matches, expression)) {
|
||||
m_secure = (matches[1] == "wss");
|
||||
m_scheme = matches[1];
|
||||
m_secure = (m_scheme == "wss" || m_scheme == "https");
|
||||
m_host = matches[2];
|
||||
|
||||
// strip brackets from IPv6 literal URIs
|
||||
@@ -169,23 +170,48 @@ public:
|
||||
}*/
|
||||
|
||||
uri(bool secure, const std::string& host, uint16_t port, const std::string& resource)
|
||||
: m_host(host)
|
||||
: m_scheme(secure ? "wss" : "ws")
|
||||
, m_host(host)
|
||||
, m_resource(resource == "" ? "/" : resource)
|
||||
, m_port(port)
|
||||
, m_secure(secure) {}
|
||||
|
||||
uri(bool secure, const std::string& host, const std::string& resource)
|
||||
: m_host(host)
|
||||
: m_scheme(secure ? "wss" : "ws")
|
||||
, m_host(host)
|
||||
, m_resource(resource == "" ? "/" : resource)
|
||||
, m_port(secure ? uri_default_secure_port : uri_default_port)
|
||||
, m_secure(secure) {}
|
||||
|
||||
uri(bool secure, const std::string& host, const std::string& port, const std::string& resource)
|
||||
: m_host(host)
|
||||
: m_scheme(secure ? "wss" : "ws")
|
||||
, m_host(host)
|
||||
, m_resource(resource == "" ? "/" : resource)
|
||||
, m_port(get_port_from_string(port))
|
||||
, m_secure(secure) {}
|
||||
|
||||
|
||||
uri(std::string scheme, const std::string& host, uint16_t port, const std::string& resource)
|
||||
: m_scheme(scheme)
|
||||
, m_host(host)
|
||||
, m_resource(resource == "" ? "/" : resource)
|
||||
, m_port(port)
|
||||
, m_secure(scheme == "wss" || scheme == "https") {}
|
||||
|
||||
uri(std::string scheme, const std::string& host, const std::string& resource)
|
||||
: m_scheme(scheme)
|
||||
, m_host(host)
|
||||
, m_resource(resource == "" ? "/" : resource)
|
||||
, m_port((scheme == "wss" || scheme == "https") ? uri_default_secure_port : uri_default_port)
|
||||
, m_secure(scheme == "wss" || scheme == "https") {}
|
||||
|
||||
uri(std::string scheme, const std::string& host, const std::string& port, const std::string& resource)
|
||||
: m_scheme(scheme)
|
||||
, m_host(host)
|
||||
, m_resource(resource == "" ? "/" : resource)
|
||||
, m_port(get_port_from_string(port))
|
||||
, m_secure(scheme == "wss" || scheme == "https") {}
|
||||
|
||||
bool get_secure() const {
|
||||
return m_secure;
|
||||
}
|
||||
@@ -227,7 +253,7 @@ public:
|
||||
std::string str() const {
|
||||
std::stringstream s;
|
||||
|
||||
s << "ws" << (m_secure ? "s" : "") << "://" << m_host;
|
||||
s << m_scheme << "://" << m_host;
|
||||
|
||||
if (m_port != (m_secure ? uri_default_secure_port : uri_default_port)) {
|
||||
s << ":" << m_port;
|
||||
@@ -274,8 +300,7 @@ private:
|
||||
return static_cast<uint16_t>(t_port);
|
||||
}
|
||||
|
||||
|
||||
|
||||
std::string m_scheme;
|
||||
std::string m_host;
|
||||
std::string m_resource;
|
||||
uint16_t m_port;
|
||||
|
||||
Reference in New Issue
Block a user