diff --git a/websocketpp/connection.hpp b/websocketpp/connection.hpp index a3de8459ee..e2ebcd6ab8 100644 --- a/websocketpp/connection.hpp +++ b/websocketpp/connection.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013, Peter Thorson. All rights reserved. + * Copyright (c) 2014, Peter Thorson. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: @@ -1451,9 +1451,9 @@ private: */ std::vector m_send_buffer; - /// a pointer to hold on to the current message being written to keep it + /// a list of pointers to hold on to the messages being written to keep them /// from going out of scope before the write is complete. - message_ptr m_current_msg; + std::vector m_current_msgs; /// True if there is currently an outstanding transport write /** diff --git a/websocketpp/impl/connection_impl.hpp b/websocketpp/impl/connection_impl.hpp index 41260c6192..a2b689762e 100644 --- a/websocketpp/impl/connection_impl.hpp +++ b/websocketpp/impl/connection_impl.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013, Peter Thorson. All rights reserved. + * Copyright (c) 2014, Peter Thorson. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: @@ -1562,43 +1562,77 @@ void connection::write_frame() { return; } - // Get the next message in the queue. This will return an empty - // message if the queue was empty. - m_current_msg = write_pop(); - - if (!m_current_msg) { - return; + // pull off all the messages that are ready to write. + // stop if we get a message marked terminal + message_ptr next_message = write_pop(); + while (next_message) { + m_current_msgs.push_back(next_message); + if (!next_message->get_terminal()) { + next_message = write_pop(); + } else { + next_message = message_ptr(); + } + } + + if (m_current_msgs.empty()) { + // there was nothing to send + return; + } else { + // At this point we own the next messages to be sent and are + // responsible for holding the write flag until they are + // successfully sent or there is some error + m_write_flag = true; } - - // At this point we own the next message to be sent and are - // responsible for holding the write flag until it is successfully - // sent or there is some error - m_write_flag = true; } - std::string const & header = m_current_msg->get_header(); - std::string const & payload = m_current_msg->get_payload(); - - m_send_buffer.push_back(transport::buffer(header.c_str(),header.size())); - m_send_buffer.push_back(transport::buffer(payload.c_str(),payload.size())); + typename std::vector::iterator it; + for (it = m_current_msgs.begin(); it != m_current_msgs.end(); ++it) { + std::string const & header = (*it)->get_header(); + std::string const & payload = (*it)->get_payload(); + m_send_buffer.push_back(transport::buffer(header.c_str(),header.size())); + m_send_buffer.push_back(transport::buffer(payload.c_str(),payload.size())); + } + // Print detailed send stats if those log levels are enabled if (m_alog.static_test(log::alevel::frame_header)) { if (m_alog.dynamic_test(log::alevel::frame_header)) { - std::stringstream s; - s << "Dispatching write with " << header.size() - << " header bytes and " << payload.size() - << " payload bytes" << std::endl; - m_alog.write(log::alevel::frame_header,s.str()); - m_alog.write(log::alevel::frame_header,"Header: "+utility::to_hex(header)); - } - } - if (m_alog.static_test(log::alevel::frame_payload)) { - if (m_alog.dynamic_test(log::alevel::frame_payload)) { - m_alog.write(log::alevel::frame_payload,"Payload: "+utility::to_hex(payload)); - } - } + std::stringstream general,header,payload; + + general << "Dispatching write containing " << m_current_msgs.size() + <<" message(s) containing "; + header << "Header Bytes: \n"; + payload << "Payload Bytes: \n"; + + size_t hbytes = 0; + size_t pbytes = 0; + + for (size_t i = 0; i < m_current_msgs.size(); i++) { + hbytes += m_current_msgs[i]->get_header().size(); + pbytes += m_current_msgs[i]->get_payload().size(); + + header << "[" << i << "] (" + << m_current_msgs[i]->get_header().size() << ") " + << utility::to_hex(m_current_msgs[i]->get_header()) << "\n"; + + if (m_alog.static_test(log::alevel::frame_payload)) { + if (m_alog.dynamic_test(log::alevel::frame_payload)) { + payload << "[" << i << "] (" + << m_current_msgs[i]->get_payload().size() << ") " + << utility::to_hex(m_current_msgs[i]->get_payload()) + << "\n"; + } + } + } + + general << hbytes << " header bytes and " << pbytes << " payload bytes"; + + m_alog.write(log::alevel::frame_header,general.str()); + m_alog.write(log::alevel::frame_header,header.str()); + m_alog.write(log::alevel::frame_payload,payload.str()); + } + } transport_con_type::async_write( m_send_buffer, @@ -1613,10 +1647,11 @@ void connection::handle_write_frame(lib::error_code const & ec) m_alog.write(log::alevel::devel,"connection handle_write_frame"); } - bool terminal = m_current_msg->get_terminal(); + bool terminal = m_current_msgs.back()->get_terminal(); m_send_buffer.clear(); - m_current_msg.reset(); + m_current_msgs.clear(); + // TODO: recycle instead of deleting if (ec) { log_err(log::elevel::fatal,"handle_write_frame",ec);