ignore unsolicited acks

This commit is contained in:
Peter Thorson
2011-12-14 20:17:08 -06:00
parent 10b28259f5
commit 57fb71d61d
2 changed files with 40 additions and 5 deletions

View File

@@ -169,6 +169,21 @@ public:
}
}
std::string hash = msg->get_payload().substr(start+2,32);
std::map<std::string,struct msg>::iterator it = m_msgs.find(hash);
if (it == m_msgs.end()) {
std::cout << "ack for message we didn't send" << std::endl;
continue;
}
struct msg& m(m_msgs[hash]);
m.acked += count;
if (m.acked == m.sent) {
m.time = get_ms(m.time_sent);
}
start = end+1;
end = msg->get_payload().find(",",start);
@@ -179,13 +194,23 @@ public:
// get the last value
if (end-start < 38) {
// error, not the input we were expecting
return;
} else {
count = atol(msg->get_payload().substr(start+36,end-4).c_str());
if (count == 0) {
// error parsing number
return;
}
}
std::string hash = msg->get_payload().substr(start+2,32);
std::map<std::string,struct msg>::iterator it = m_msgs.find(hash);
if (it == m_msgs.end()) {
std::cout << "ack for message we didn't send" << std::endl;
return;
}
struct msg& m(m_msgs[msg->get_payload().substr(start+2,32)]);
m.acked += count;
@@ -194,8 +219,8 @@ public:
m.time = get_ms(m.time_sent);
}
m_ack_stats[msg->get_payload().substr(start+2,32)] = count;
m_messages_acked += count;
//m_ack_stats[msg->get_payload().substr(start+2,32)] = count;
// m_messages_acked += count;
} else {
std::string hash = websocketpp::md5_hash_hex(msg->get_payload());
struct msg& new_msg(m_msgs[hash]);

View File

@@ -41,11 +41,15 @@ typedef websocketpp::endpoint<websocketpp::role::client,websocketpp::socket::pla
typedef plain_endpoint_type::handler_ptr plain_handler_ptr;
typedef plain_endpoint_type::connection_ptr connection_ptr;
class echo_client_handler : public plain_endpoint_type::handler {
class stress_client_handler : public plain_endpoint_type::handler {
public:
typedef echo_client_handler type;
typedef stress_client_handler type;
typedef plain_endpoint_type::connection_ptr connection_ptr;
stress_client_handler(int num_connections) : m_connections(num_connections) {
}
void on_open(connection_ptr connection) {
if (!m_timer) {
m_timer.reset(new boost::asio::deadline_timer(connection->get_io_service(),boost::posix_time::seconds(0)));
@@ -56,6 +60,11 @@ public:
void on_message(connection_ptr connection, websocketpp::message::data_ptr msg) {
m_msg_stats[websocketpp::md5_hash_hex(msg->get_payload())]++;
if (m_msg_stats[websocketpp::md5_hash_hex(msg->get_payload())] == m_connections) {
send_stats_update(connection);
}
connection->recycle(msg);
}
@@ -102,6 +111,7 @@ private:
m_msg_stats.clear();
}
int m_connections;
std::map<std::string,size_t> m_msg_stats;
boost::shared_ptr<boost::asio::deadline_timer> m_timer;
};
@@ -158,7 +168,7 @@ int main(int argc, char* argv[]) {
}
try {
plain_handler_ptr handler(new echo_client_handler());
plain_handler_ptr handler(new stress_client_handler(num_connections));
plain_endpoint_type endpoint(handler);
endpoint.alog().unset_level(websocketpp::log::alevel::ALL);