diff --git a/src/ripple/rpc/Context.h b/src/ripple/rpc/Context.h index 0d4feeb15..42cafa868 100644 --- a/src/ripple/rpc/Context.h +++ b/src/ripple/rpc/Context.h @@ -42,20 +42,10 @@ struct Context LedgerMaster& ledgerMaster; Role role; InfoSub::pointer infoSub; - Suspend suspend; - Callback yield; + JobQueueSuspender suspend; NodeStore::ScopedMetrics metrics; }; -inline -void suspend(Context const& context, Continuation const& continuation) -{ - if (context.suspend) - context.suspend(continuation); - else - continuation(doNothingCallback); -} - } // RPC } // ripple diff --git a/src/ripple/rpc/Coroutine.h b/src/ripple/rpc/Coroutine.h index 81964f759..1a4d47f17 100644 --- a/src/ripple/rpc/Coroutine.h +++ b/src/ripple/rpc/Coroutine.h @@ -25,27 +25,15 @@ namespace ripple { namespace RPC { -/** SuspendCallback: a function that a Coroutine gives to the coroutine - scheduler so that it gets a callback with a Suspend when it runs. - */ -using SuspendCallback = std::function ; +/** Coroutine is a function that is given to the coroutine scheduler which + later gets called with a Suspend. A Coroutine can't be empty. */ +using Coroutine = std::function ; -/** Runs a function that takes a SuspendCallback as a coroutine. */ -class Coroutine -{ -public: - explicit Coroutine (SuspendCallback const&); - ~Coroutine(); +/** Run as a coroutine. */ +void runOnCoroutine(Coroutine const&); - /** Run the coroutine and guarantee completion. */ - void run (); - -private: - struct Impl; - std::shared_ptr impl_; - - Coroutine (std::shared_ptr const&); -}; +/** Run as coroutine if UseCoroutines::yes, otherwise run immediately. */ +void runOnCoroutine(UseCoroutines, Coroutine const&); } // RPC } // ripple diff --git a/src/ripple/rpc/README.md b/src/ripple/rpc/README.md index 19e625dd3..cece30a3b 100644 --- a/src/ripple/rpc/README.md +++ b/src/ripple/rpc/README.md @@ -20,7 +20,7 @@ namespace: using Callback = std::function ; using Continuation = std::function ; using Suspend = std::function ; - using SuspendCallback = std::function ; + using Coroutine = std::function ; A `Callback` is a generic 0-argument function. A given `Callback` might or might not block. Unless otherwise advised, do not hold locks or any resource that @@ -36,7 +36,7 @@ A `Suspend` is a function belonging to a `Coroutine`. A `Suspend` runs a `Continuation`, passing it a `Callback` that continues execution of the `Coroutine`. -And finally, a `SuspendCallback` is a `std::function` which is given a +And finally, a `Coroutine` is a `std::function` which is given a `Suspend`. This is what the RPC handler gives to the coroutine manager, expecting to get called back with a `Suspend` and to be able to start execution. @@ -47,10 +47,10 @@ straight-forward. 1. The instance of `ServerHandler` receives an RPC request. -2. It creates a `SuspendCallback` and gives it to the coroutine manager. +2. It creates a `Coroutine` and gives it to the coroutine manager. 3. The coroutine manager creates a `Coroutine`, starts it up, and then calls - the `SuspendCallback` with a `Suspend`. + the `Coroutine` with a `Suspend`. 4. Now the RPC response starts to be calculated. diff --git a/src/ripple/rpc/Yield.h b/src/ripple/rpc/Yield.h index 1c700e297..7c61691ba 100644 --- a/src/ripple/rpc/Yield.h +++ b/src/ripple/rpc/Yield.h @@ -28,6 +28,7 @@ namespace ripple { +class BasicConfig; class JobQueue; class Section; @@ -37,19 +38,21 @@ namespace RPC { the RPC yield mechanism works. */ -/** Callback: do something and eventually return. */ +/** Callback: do something and eventually return. Can't be empty. */ using Callback = std::function ; -/** A non-empty callback that does nothing. */ -static -Callback const doNothingCallback ([] () {}); - -/** Continuation: do something, guarantee to eventually call Callback. */ +/** Continuation: do something, guarantee to eventually call Callback. + Can't be empty. */ using Continuation = std::function ; -/** Suspend: suspend execution, pending completion of a Continuation. */ +/** Suspend: suspend execution, pending completion of a Continuation. + Can't be empty. */ using Suspend = std::function ; +/** A non-empty Suspend that immediately calls its callback. */ +extern +Suspend const dontSuspend; + /** Wrap an Output so it yields after approximately `chunkSize` bytes. chunkedYieldingOutput() only yields after a call to output(), so there might @@ -75,11 +78,12 @@ private: Callback const yield_; }; +enum class UseCoroutines {no, yes}; + /** When do we yield when performing a ledger computation? */ struct YieldStrategy { enum class Streaming {no, yes}; - enum class UseCoroutines {no, yes}; /** Is the data streamed, or generated monolithically? */ Streaming streaming = Streaming::no; @@ -88,10 +92,6 @@ struct YieldStrategy never yield. */ UseCoroutines useCoroutines = UseCoroutines::no; - /** How many bytes do we emit before yielding? 0 means "never yield due to - number of bytes sent". */ - std::size_t byteYieldCount = 0; - /** How many accounts do we process before yielding? 0 means "never yield due to number of accounts processed." */ std::size_t accountYieldCount = 0; @@ -101,23 +101,32 @@ struct YieldStrategy std::size_t transactionYieldCount = 0; }; -/** Create a yield strategy from a configuration Section. */ -YieldStrategy makeYieldStrategy (Section const&); +/** Does a BasicConfig require the use of coroutines? */ +UseCoroutines useCoroutines(BasicConfig const&); -/** Return a continuation that runs a Callback on a Job Queue with a given - name and JobType. */ -Continuation callbackOnJobQueue ( - JobQueue&, std::string const& jobName, JobType); +/** Create a yield strategy from a BasicConfig. */ +YieldStrategy makeYieldStrategy(BasicConfig const&); -/** Return a Callback that will suspend and then run a continuation. */ -inline -Callback suspendForContinuation ( - Suspend const& suspend, Continuation const& continuation) +/** JobQueueSuspender is a suspend, with a yield that reschedules the job + on the job queue. */ +struct JobQueueSuspender { - return suspend - ? Callback ([=] () { suspend (continuation); }) - : Callback ([=] () { continuation (doNothingCallback); }); -} + /** Possibly suspend current execution. */ + Suspend const suspend; + + /** Possibly yield and restart on the job queue. */ + Callback const yield; + + /** Create a JobQueueSuspender where yield does nothing and the suspend + immediately executes the continuation. */ + JobQueueSuspender(); + + /** Create a JobQueueSuspender with a Suspend. + + When yield is called, it reschedules the current job on the JobQueue + with the given jobName. */ + JobQueueSuspender(Suspend const&, std::string const& jobName); +}; } // RPC } // ripple diff --git a/src/ripple/rpc/handlers/LedgerHandler.h b/src/ripple/rpc/handlers/LedgerHandler.h index 3561270a6..a282d7e82 100644 --- a/src/ripple/rpc/handlers/LedgerHandler.h +++ b/src/ripple/rpc/handlers/LedgerHandler.h @@ -29,6 +29,7 @@ #include #include #include +#include #include #include @@ -86,15 +87,15 @@ private: template void LedgerHandler::writeResult (Object& value) { + auto& yield = context_.suspend.yield; if (ledger_) { Json::copyFrom (value, result_); - addJson (value, {*ledger_, options_, context_.yield}); + addJson (value, {*ledger_, options_, yield}); } else { auto& master = getApp().getLedgerMaster (); - auto& yield = context_.yield; { auto&& closed = Json::addObject (value, jss::closed); addJson (closed, {*master.getClosedLedger(), 0, yield}); diff --git a/src/ripple/rpc/handlers/RipplePathFind.cpp b/src/ripple/rpc/handlers/RipplePathFind.cpp index 4cda192eb..edb25296b 100644 --- a/src/ripple/rpc/handlers/RipplePathFind.cpp +++ b/src/ripple/rpc/handlers/RipplePathFind.cpp @@ -82,8 +82,7 @@ Json::Value doRipplePathFind (RPC::Context& context) Json::Value jvResult; - if (! context.suspend || - getConfig().RUN_STANDALONE || + if (getConfig().RUN_STANDALONE || context.params.isMember(jss::ledger) || context.params.isMember(jss::ledger_index) || context.params.isMember(jss::ledger_hash)) @@ -105,15 +104,13 @@ Json::Value doRipplePathFind (RPC::Context& context) lpLedger = context.ledgerMaster.getClosedLedger(); PathRequest::pointer request; - suspend(context, - [&request, &context, &jvResult, &lpLedger] - (RPC::Callback const& callback) + context.suspend.suspend( + [&request, &context, &jvResult, &lpLedger] + (RPC::Callback const& callback) { jvResult = getApp().getPathRequests().makeLegacyPathRequest ( request, callback, lpLedger, context.params); - assert(callback); - if (! request && callback) - callback(); + callback(); }); if (request) diff --git a/src/ripple/rpc/impl/Coroutine.cpp b/src/ripple/rpc/impl/Coroutine.cpp index f7d0b1208..a23a84a32 100644 --- a/src/ripple/rpc/impl/Coroutine.cpp +++ b/src/ripple/rpc/impl/Coroutine.cpp @@ -24,60 +24,56 @@ namespace ripple { namespace RPC { +namespace { using CoroutineType = Continuation; -using CoroutinePull = boost::coroutines::coroutine ::pull_type; -using CoroutinePush = boost::coroutines::coroutine ::push_type; +using BoostCoroutine = boost::coroutines::asymmetric_coroutine; +using Pull = BoostCoroutine::pull_type; +using Push = BoostCoroutine::push_type; -struct Coroutine::Impl : public std::enable_shared_from_this +void runOnCoroutineImpl(std::shared_ptr pull) { - Impl (CoroutinePull&& pull_) : pull (std::move (pull_)) + while (*pull) { - } + (*pull)(); - CoroutinePull pull; + if (! *pull) + return; - void run() - { - while (pull) + if (auto continuation = pull->get()) { - pull(); - - if (! pull) - return; - - if (auto continuation = pull.get()) - { - auto that = shared_from_this(); - continuation ([that] () { that->run(); }); - return; - } + continuation ([pull] () { runOnCoroutineImpl(pull); }); + return; } } -}; - -Coroutine::Coroutine (SuspendCallback const& suspendCallback) -{ - CoroutinePull pull ([suspendCallback] (CoroutinePush& push) - { - Suspend suspend = [&push] (CoroutineType const& cbc) { - push (cbc); - }; - suspend ({}); - suspendCallback (suspend); - }); - - impl_ = std::make_shared (std::move (pull)); } -Coroutine::~Coroutine() = default; +} // namespace -void Coroutine::run() +void runOnCoroutine(Coroutine const& coroutine) { - assert (impl_); - if (impl_) - impl_->run(); - impl_.reset(); + auto pullFunction = [coroutine] (Push& push) { + Suspend suspend = [&push] (CoroutineType const& cbc) { + if (push) + push (cbc); + }; + + // Run once doing nothing, to get the other side started. + suspend([] (Callback const& callback) { callback(); }); + + // Now run the coroutine. + coroutine(suspend); + }; + + runOnCoroutineImpl(std::make_shared(pullFunction)); +} + +void runOnCoroutine(UseCoroutines useCoroutines, Coroutine const& coroutine) +{ + if (useCoroutines == UseCoroutines::yes) + runOnCoroutine(coroutine); + else + coroutine(dontSuspend); } } // RPC diff --git a/src/ripple/rpc/impl/Yield.cpp b/src/ripple/rpc/impl/Yield.cpp index 4f4d3a289..7294a354b 100644 --- a/src/ripple/rpc/impl/Yield.cpp +++ b/src/ripple/rpc/impl/Yield.cpp @@ -18,6 +18,7 @@ //============================================================================== #include +#include #include #include #include @@ -25,6 +26,34 @@ namespace ripple { namespace RPC { +static +UseCoroutines defaultUseCoroutines = UseCoroutines::no; + +Suspend const dontSuspend = [] (Continuation const& continuation) +{ + continuation([] () {}); +}; + +namespace { + +void runOnJobQueue(std::string const& name, Callback const& callback) +{ + boost::function cb([callback] (Job&) { callback(); }); + getApp().getJobQueue().addJob(jtCLIENT, name, cb); +}; + +Callback suspendForJobQueue(Suspend const& suspend, std::string const& jobName) +{ + assert(suspend); + return Callback( [suspend, jobName] () { + suspend([jobName] (Callback const& callback) { + runOnJobQueue(jobName, callback); + }); + }); +} + +} // namespace + Json::Output chunkedYieldingOutput ( Json::Output const& output, Callback const& yield, std::size_t chunkSize) { @@ -44,7 +73,6 @@ Json::Output chunkedYieldingOutput ( }; } - CountedYield::CountedYield (std::size_t yieldCount, Callback const& yield) : yieldCount_ (yieldCount), yield_ (yield) { @@ -52,10 +80,7 @@ CountedYield::CountedYield (std::size_t yieldCount, Callback const& yield) void CountedYield::yield() { - if (!yield_) - return; - - if (yieldCount_) + if (yieldCount_ && yield_) { if (++count_ >= yieldCount_) { @@ -65,28 +90,38 @@ void CountedYield::yield() } } -YieldStrategy makeYieldStrategy (Section const& s) +UseCoroutines useCoroutines(BasicConfig const& config) { + if (auto use = config["section"].get("use_coroutines")) + return *use ? UseCoroutines::yes : UseCoroutines::no; + return defaultUseCoroutines; +} + +YieldStrategy makeYieldStrategy (BasicConfig const& config) +{ + auto s = config["section"]; YieldStrategy ys; ys.streaming = get (s, "streaming") ? YieldStrategy::Streaming::yes : YieldStrategy::Streaming::no; - ys.useCoroutines = get (s, "use_coroutines") ? - YieldStrategy::UseCoroutines::yes : - YieldStrategy::UseCoroutines::no; - ys.byteYieldCount = get (s, "byte_yield_count"); + ys.useCoroutines = useCoroutines(config); ys.accountYieldCount = get (s, "account_yield_count"); ys.transactionYieldCount = get (s, "transaction_yield_count"); return ys; } -Continuation callbackOnJobQueue ( - JobQueue& jobQueue, std::string const& name, JobType jobType) +JobQueueSuspender::JobQueueSuspender( + Suspend const& susp, std::string const& jobName) + : suspend(susp ? susp : dontSuspend), + yield(suspendForJobQueue(suspend, jobName)) +{ + // There's a non-empty jobName exactly if there's a non-empty Suspend. + assert(!(susp && jobName.empty())); +} + +JobQueueSuspender::JobQueueSuspender() : JobQueueSuspender({}, {}) { - return Continuation ([name, jobType, &jobQueue] (Callback const& cb) { - jobQueue.addJob (jobType, name, [cb] (Job&) { cb(); }); - }); } } // RPC diff --git a/src/ripple/rpc/tests/Coroutine.test.cpp b/src/ripple/rpc/tests/Coroutine.test.cpp index 4e125bddc..a0344745d 100644 --- a/src/ripple/rpc/tests/Coroutine.test.cpp +++ b/src/ripple/rpc/tests/Coroutine.test.cpp @@ -46,10 +46,9 @@ public: }; Strings result; - SuspendCallback suspendCallback ([&] (Suspend const& suspend) + Coroutine coroutine ([&] (Suspend const& suspend) { - Callback yield = suspendForContinuation ( - suspend, makeContinuation ("*")); + Callback yield ([=] () { suspend (makeContinuation ("*")); }); auto out = chunkedYieldingOutput (output, yield, chunkSize); out ("hello "); result.push_back (buffer); @@ -70,7 +69,7 @@ public: result.push_back (buffer); }); - Coroutine (suspendCallback).run(); + runOnCoroutine(UseCoroutines::yes, coroutine); expectCollectionEquals (result, expected); } diff --git a/src/ripple/server/impl/ServerHandlerImp.cpp b/src/ripple/server/impl/ServerHandlerImp.cpp index 4f7fd925f..7002ddfcd 100644 --- a/src/ripple/server/impl/ServerHandlerImp.cpp +++ b/src/ripple/server/impl/ServerHandlerImp.cpp @@ -63,8 +63,7 @@ ServerHandlerImp::ServerHandlerImp (Stoppable& parent, , m_networkOPs (networkOPs) , m_server (HTTP::make_Server( *this, io_service, deprecatedLogs().journal("Server"))) - , m_continuation (RPC::callbackOnJobQueue ( - jobQueue, "RPC-Coroutine", jtCLIENT)) + , m_jobQueue (jobQueue) { auto const& group (cm.group ("rpc")); rpc_requests_ = group->make_counter ("requests"); @@ -180,24 +179,15 @@ ServerHandlerImp::onRequest (HTTP::Session& session) auto detach = session.detach(); - if (setup_.yieldStrategy.useCoroutines == - RPC::YieldStrategy::UseCoroutines::yes) - { - RPC::SuspendCallback suspend ( + // We can copy `this` because ServerHandlerImp is a long-lasting singleton. + auto job = [this, detach] (Job&) { + RPC::runOnCoroutine( + setup_.yieldStrategy.useCoroutines, [this, detach] (RPC::Suspend const& suspend) { - processSession (detach, suspend); + processSession(detach, suspend); }); - RPC::Coroutine coroutine (suspend); - coroutine.run(); - } - else - { - getApp().getJobQueue().addJob ( - jtCLIENT, "RPC-Client", - [=] (Job&) { - processSession (detach, RPC::Suspend()); - }); - } + }; + m_jobQueue.addJob(jtCLIENT, "RPC-Client", job); } void @@ -240,15 +230,9 @@ ServerHandlerImp::processRequest ( Output&& output, Suspend const& suspend) { - auto yield = RPC::suspendForContinuation (suspend, m_continuation); - // Move off the webserver thread onto the JobQueue. - yield(); assert (getApp().getJobQueue().getJobForThread()); - if (auto count = setup_.yieldStrategy.byteYieldCount) - output = RPC::chunkedYieldingOutput (std::move (output), yield, count); - Json::Value jsonRPC; { Json::Reader reader; @@ -267,7 +251,6 @@ ServerHandlerImp::processRequest ( // VFALCO NOTE Except that "id" isn't included in the following errors. // Json::Value const& id = jsonRPC ["id"]; - Json::Value const& method = jsonRPC ["method"]; if (! method) { @@ -365,9 +348,11 @@ ServerHandlerImp::processRequest ( << "doRpcCommand:" << strMethod << ":" << params; auto const start (std::chrono::high_resolution_clock::now ()); + RPC::Context context { params, loadType, m_networkOPs, getApp().getLedgerMaster(), role, - nullptr, std::move (suspend), std::move (yield)}; + nullptr, {suspend, "RPC-Coroutine"}}; + std::string response; if (setup_.yieldStrategy.streaming == RPC::YieldStrategy::Streaming::yes) @@ -755,11 +740,12 @@ setup_Overlay (ServerHandler::Setup& setup) } ServerHandler::Setup -setup_ServerHandler (BasicConfig const& config, std::ostream& log) +setup_ServerHandler(BasicConfig const& config, std::ostream& log) { ServerHandler::Setup setup; - setup.ports = detail::parse_Ports (config, log); - setup.yieldStrategy = RPC::makeYieldStrategy (config["server"]); + setup.ports = detail::parse_Ports(config, log); + setup.yieldStrategy = RPC::makeYieldStrategy(config); + detail::setup_Client(setup); detail::setup_Overlay(setup); diff --git a/src/ripple/server/impl/ServerHandlerImp.h b/src/ripple/server/impl/ServerHandlerImp.h index 0c969a3c8..8f022870d 100644 --- a/src/ripple/server/impl/ServerHandlerImp.h +++ b/src/ripple/server/impl/ServerHandlerImp.h @@ -39,8 +39,8 @@ private: beast::Journal m_journal; NetworkOPs& m_networkOPs; std::unique_ptr m_server; - RPC::Continuation m_continuation; Setup setup_; + JobQueue& m_jobQueue; beast::insight::Counter rpc_requests_; beast::insight::Event rpc_io_; beast::insight::Event rpc_size_; diff --git a/src/ripple/websocket/Connection.h b/src/ripple/websocket/Connection.h index 5eae785be..aa1afb4f5 100644 --- a/src/ripple/websocket/Connection.h +++ b/src/ripple/websocket/Connection.h @@ -24,7 +24,6 @@ #include #include #include -#include #include #include #include @@ -32,10 +31,12 @@ #include #include #include +#include #include #include #include #include +#include #include #include @@ -94,7 +95,7 @@ public: message_ptr getMessage (); bool checkMessage (); void returnMessage (message_ptr const&); - Json::Value invokeCommand (Json::Value& jvRequest); + Json::Value invokeCommand (Json::Value const& jvRequest, RPC::Suspend const&); // Generically implemented per version. void setPingTimer (); @@ -226,7 +227,8 @@ void ConnectionImpl ::returnMessage (message_ptr const& ptr) } template -Json::Value ConnectionImpl ::invokeCommand (Json::Value& jvRequest) +Json::Value ConnectionImpl ::invokeCommand ( + Json::Value const& jvRequest, RPC::Suspend const& suspend) { if (getConsumer().disconnect ()) { @@ -269,7 +271,8 @@ Json::Value ConnectionImpl ::invokeCommand (Json::Value& jvRequest) { RPC::Context context { jvRequest, loadType, m_netOPs, getApp().getLedgerMaster(), role, - std::dynamic_pointer_cast (this->shared_from_this ())}; + this->shared_from_this (), + {suspend, "WSClient::command"}}; RPC::doCommand (context, jvResult[jss::result]); } diff --git a/src/ripple/websocket/Handler.h b/src/ripple/websocket/Handler.h index 873996a76..688083524 100644 --- a/src/ripple/websocket/Handler.h +++ b/src/ripple/websocket/Handler.h @@ -56,6 +56,7 @@ beast::IP::Endpoint makeBeastEndpoint (beast::IP::Endpoint const& e) return e; } + template class HandlerImpl : public WebSocket::Handler @@ -293,7 +294,14 @@ public: getApp().getJobQueue ().addJob ( jtCLIENT, "WSClient::destroy", - std::bind (&ConnectionImpl ::destroy, ptr)); + [ptr] (Job&) { ConnectionImpl ::destroy(ptr); }); + } + + void message_job(std::string const& name, + connection_ptr const& cpClient) + { + auto msgs = [this, cpClient] (Job& j) { do_messages(j, cpClient); }; + getApp().getJobQueue ().addJob (jtCLIENT, "WSClient::" + name, msgs); } void on_message (connection_ptr cpClient, message_ptr mpMessage) override @@ -327,9 +335,7 @@ public: } if (bRunQ) - getApp().getJobQueue ().addJob (jtCLIENT, "WSClient::command", - std::bind (&HandlerImpl ::do_messages, - this, std::placeholders::_1, cpClient)); + message_job("command", cpClient); } void do_messages (Job& job, connection_ptr const& cpClient) @@ -364,17 +370,14 @@ public: } if (ptr->checkMessage ()) - getApp().getJobQueue ().addJob ( - jtCLIENT, "WSClient::more", - std::bind (&HandlerImpl ::do_messages, this, - std::placeholders::_1, cpClient)); + message_job("more", cpClient); } bool do_message (Job& job, const connection_ptr& cpClient, const wsc_ptr& conn, const message_ptr& mpMessage) { - Json::Value jvRequest; - Json::Reader jrReader; + Json::Value jvRequest; + Json::Reader jrReader; try { @@ -414,19 +417,38 @@ public: { Json::Value& jCmd = jvRequest[jss::command]; if (jCmd.isString()) - job.rename (std::string ("WSClient::") + jCmd.asString()); + job.rename ("WSClient::" + jCmd.asString()); } - auto const start (std::chrono::high_resolution_clock::now ()); - Json::Value const jvObj (conn->invokeCommand (jvRequest)); - std::string const buffer (to_string (jvObj)); + auto const start = std::chrono::high_resolution_clock::now (); + + struct HandlerCoroutineData + { + Json::Value jvRequest; + std::string buffer; + wsc_ptr conn; + }; + + auto data = std::make_shared(); + data->jvRequest = std::move(jvRequest); + data->conn = conn; + + auto coroutine = [data] (RPC::Suspend const& suspend) { + data->buffer = to_string( + data->conn->invokeCommand(data->jvRequest, suspend)); + }; + static auto const disableWebsocketsCoroutines = true; + auto useCoroutines = disableWebsocketsCoroutines ? + RPC::UseCoroutines::no : RPC::useCoroutines(desc_.config); + runOnCoroutine(useCoroutines, coroutine); + rpc_time_.notify (static_cast ( std::chrono::duration_cast ( std::chrono::high_resolution_clock::now () - start))); ++rpc_requests_; rpc_size_.notify (static_cast - (buffer.size ())); - send (cpClient, buffer, false); + (data->buffer.size())); + send (cpClient, data->buffer, false); } return true;