mirror of
https://github.com/XRPLF/rippled.git
synced 2026-04-29 15:37:57 +00:00
cleans up concurrent_handler adds num_workers to welcome
This commit is contained in:
@@ -55,16 +55,17 @@ public:
|
||||
|
||||
typedef boost::shared_ptr<writer> writer_ptr;
|
||||
|
||||
template <typename T>
|
||||
template <typename endpoint_type>
|
||||
class ws_writer : public writer {
|
||||
public:
|
||||
ws_writer(typename T::handler::connection_ptr con) : m_con(con) {}
|
||||
ws_writer(typename endpoint_type::handler::connection_ptr con)
|
||||
: m_con(con) {}
|
||||
|
||||
void write(std::string msg) {
|
||||
m_con->send(msg);
|
||||
}
|
||||
private:
|
||||
typename T::handler::connection_ptr m_con;
|
||||
typename endpoint_type::handler::connection_ptr m_con;
|
||||
};
|
||||
|
||||
// A request encapsulates all of the information necesssary to perform a request
|
||||
@@ -114,71 +115,56 @@ private:
|
||||
boost::condition_variable m_cond;
|
||||
};
|
||||
|
||||
// The WebSocket++ handler in this case reads numbers from connections and packs
|
||||
// connection pointer + number into a request struct and passes it off to the
|
||||
// coordinator.
|
||||
class concurrent_server_handler : public server::handler {
|
||||
/// Handler that reads requests off the wire and dispatches them to a request queue
|
||||
template <typename endpoint_type>
|
||||
class concurrent_handler : public endpoint_type::handler {
|
||||
public:
|
||||
concurrent_server_handler(request_coordinator& c,
|
||||
std::string ident,
|
||||
std::string ua)
|
||||
: m_coordinator(c), m_ident(ident), m_ua(ua) {}
|
||||
typedef typename endpoint_type::handler::connection_ptr connection_ptr;
|
||||
typedef typename endpoint_type::handler::message_ptr message_ptr;
|
||||
|
||||
concurrent_handler(request_coordinator& c,
|
||||
std::string ident,
|
||||
std::string ua,
|
||||
unsigned int num_workers)
|
||||
: m_coordinator(c),
|
||||
m_ident(ident),
|
||||
m_ua(ua),
|
||||
m_num_workers(num_workers) {}
|
||||
|
||||
void on_open(connection_ptr con) {
|
||||
con->send("{\"type\":\"test_welcome\",\"version\":\""+m_ua+"\",\"ident\":\""+m_ident+"\"}");
|
||||
std::stringstream o;
|
||||
|
||||
o << "{"
|
||||
<< "\"type\":\"test_welcome\","
|
||||
<< "\"version\":\"" << m_ua << "\","
|
||||
<< "\"ident\":\"" << m_ident << "\","
|
||||
<< "\"num_workers\":" << m_num_workers
|
||||
<< "}";
|
||||
|
||||
con->send(o.str());
|
||||
}
|
||||
|
||||
void on_message(connection_ptr con,message_ptr msg) {
|
||||
void on_message(connection_ptr con, message_ptr msg) {
|
||||
request r;
|
||||
r.type = PERF_TEST;
|
||||
r.writer = writer_ptr(new ws_writer<server>(con));
|
||||
r.writer = writer_ptr(new ws_writer<endpoint_type>(con));
|
||||
r.req = msg->get_payload();
|
||||
m_coordinator.add_request(r);
|
||||
}
|
||||
|
||||
void on_fail(connection_ptr con) {
|
||||
std::cout << "A connection from a command client failed." << std::endl;
|
||||
std::cout << "A command connection failed." << std::endl;
|
||||
}
|
||||
private:
|
||||
request_coordinator& m_coordinator;
|
||||
std::string m_ident;
|
||||
std::string m_ua;
|
||||
};
|
||||
|
||||
// The WebSocket++ handler in this case reads numbers from connections and packs
|
||||
// connection pointer + number into a request struct and passes it off to the
|
||||
// coordinator.
|
||||
class concurrent_client_handler : public client::handler {
|
||||
public:
|
||||
concurrent_client_handler(request_coordinator& c,
|
||||
std::string ident,
|
||||
std::string ua)
|
||||
: m_coordinator(c), m_ident(ident), m_ua(ua) {}
|
||||
|
||||
void on_open(connection_ptr con) {
|
||||
con->send("{\"type\":\"test_welcome\",\"version\":\""+m_ua+"\",\"ident\":\""+m_ident+"\"}");
|
||||
}
|
||||
|
||||
void on_message(connection_ptr con,message_ptr msg) {
|
||||
request r;
|
||||
r.type = PERF_TEST;
|
||||
r.writer = writer_ptr(new ws_writer<client>(con));
|
||||
r.req = msg->get_payload();
|
||||
m_coordinator.add_request(r);
|
||||
}
|
||||
|
||||
void on_fail(connection_ptr con) {
|
||||
std::cout << "The connection to the commanding server failed." << std::endl;
|
||||
}
|
||||
private:
|
||||
request_coordinator& m_coordinator;
|
||||
std::string m_ident;
|
||||
std::string m_ua;
|
||||
request_coordinator& m_coordinator;
|
||||
std::string m_ident;
|
||||
std::string m_ua;
|
||||
unsigned int m_num_workers;
|
||||
};
|
||||
|
||||
// process_requests is the body function for a processing thread. It loops
|
||||
// forever reading requests, processing them serially, then reading another
|
||||
// request.
|
||||
// request. A request with type END_WORKER will stop the processing loop.
|
||||
void process_requests(request_coordinator* coordinator);
|
||||
|
||||
} // namespace wsperf
|
||||
|
||||
@@ -67,7 +67,14 @@ int start_server(po::variables_map& vm) {
|
||||
std::cout << "bad thread number" << std::endl;
|
||||
return 1;
|
||||
} else {
|
||||
h = server::handler::ptr(new wsperf::concurrent_server_handler(rc,ident,user_agent));
|
||||
h = server::handler::ptr(
|
||||
new wsperf::concurrent_handler<server>(
|
||||
rc,
|
||||
ident,
|
||||
user_agent,
|
||||
num_threads
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
server endpoint(h);
|
||||
@@ -109,7 +116,14 @@ int start_client(po::variables_map& vm) {
|
||||
std::cout << "bad thread number" << std::endl;
|
||||
return 1;
|
||||
} else {
|
||||
h = client::handler::ptr(new wsperf::concurrent_client_handler(rc,ident,user_agent));
|
||||
h = client::handler::ptr(
|
||||
new wsperf::concurrent_handler<client>(
|
||||
rc,
|
||||
ident,
|
||||
user_agent,
|
||||
num_threads
|
||||
)
|
||||
);;
|
||||
}
|
||||
|
||||
client endpoint(h);
|
||||
|
||||
Reference in New Issue
Block a user