Send output incrementally in ServerHandlerImp.

This commit is contained in:
Tom Ritchford
2014-12-02 15:26:42 -05:00
committed by Vinnie Falco
parent 167f4666e2
commit fc9a23d6d4
6 changed files with 144 additions and 185 deletions

View File

@@ -28,10 +28,12 @@
#include <ripple/overlay/Overlay.h>
#include <ripple/resource/Manager.h>
#include <ripple/resource/Fees.h>
#include <ripple/rpc/Coroutine.h>
#include <beast/crypto/base64.h>
#include <beast/cxx14/algorithm.h> // <algorithm>
#include <beast/http/rfc2616.h>
#include <boost/algorithm/string.hpp>
#include <boost/type_traits.hpp>
#include <boost/optional.hpp>
#include <boost/regex.hpp>
#include <algorithm>
@@ -146,6 +148,36 @@ ServerHandlerImp::onHandoff (HTTP::Session& session,
return Handoff{};
}
static inline
RPC::Output makeOutput (HTTP::Session& session)
{
return [&](boost::string_ref const& b)
{
session.write (b.data(), b.size());
};
}
namespace {
void runCoroutine (RPC::Coroutine coroutine, JobQueue& jobQueue)
{
if (!coroutine)
return;
coroutine();
if (!coroutine)
return;
// Reschedule the job on the job queue.
jobQueue.addJob (
jtCLIENT, "RPC-Coroutine",
[coroutine, &jobQueue] (Job&)
{
runCoroutine (coroutine, jobQueue);
});
}
} // namespace
void
ServerHandlerImp::onRequest (HTTP::Session& session)
{
@@ -153,14 +185,17 @@ ServerHandlerImp::onRequest (HTTP::Session& session)
if (! authorized (session.port(),
build_map(session.request().headers)))
{
session.write (HTTPReply (403, "Forbidden"));
HTTPReply (403, "Forbidden", makeOutput (session));
session.close (true);
return;
}
m_jobQueue.addJob (jtCLIENT, "RPC-Client", std::bind (
&ServerHandlerImp::processSession, this, std::placeholders::_1,
session.detach()));
auto detach = session.detach();
RPC::Coroutine::YieldFunction yieldFunction =
[this, detach] (Yield const& y) { processSession (detach, y); };
RPC::Coroutine coroutine (yieldFunction);
runCoroutine (std::move(coroutine), m_jobQueue);
}
void
@@ -177,36 +212,44 @@ ServerHandlerImp::onStopped (HTTP::Server&)
//------------------------------------------------------------------------------
// ServerHandlerImp will yield after emitting serverOutputChunkSize bytes.
// If this value is 0, it means "yield after each output"
// A negative value means "never yield"
// TODO(tom): negotiate a spot for this in Configs.
const int serverOutputChunkSize = -1;
// Dispatched on the job queue
void
ServerHandlerImp::processSession (Job& job,
std::shared_ptr<HTTP::Session> const& session)
ServerHandlerImp::processSession (
std::shared_ptr<HTTP::Session> const& session, Yield const& yield)
{
session->write (processRequest (session->port(),
to_string(session->body()), session->remoteAddress().at_port(0)));
auto output = makeOutput (*session);
if (serverOutputChunkSize >= 0)
{
output = RPC::chunkedYieldingOutput (
output, yield, serverOutputChunkSize);
}
processRequest (
session->port(),
to_string (session->body()),
session->remoteAddress().at_port (0),
output,
yield);
if (session->request().keep_alive())
{
session->complete();
}
else
{
session->close (true);
}
}
std::string
ServerHandlerImp::createResponse (
int statusCode,
std::string const& description)
{
return HTTPReply (statusCode, description);
}
// VFALCO ARGH! returning a single std::string for the entire response?
std::string
ServerHandlerImp::processRequest (HTTP::Port const& port,
std::string const& request, beast::IP::Endpoint const& remoteIPAddress)
void
ServerHandlerImp::processRequest (
HTTP::Port const& port,
std::string const& request,
beast::IP::Endpoint const& remoteIPAddress,
Output output,
Yield yield)
{
Json::Value jsonRPC;
{
@@ -216,7 +259,8 @@ ServerHandlerImp::processRequest (HTTP::Port const& port,
jsonRPC.isNull () ||
! jsonRPC.isObject ())
{
return createResponse (400, "Unable to parse request");
HTTPReply (400, "Unable to parse request", output);
return;
}
}
@@ -239,25 +283,36 @@ ServerHandlerImp::processRequest (HTTP::Port const& port,
usage = m_resourceManager.newInboundEndpoint(remoteIPAddress);
if (usage.disconnect ())
return createResponse (503, "Server is overloaded");
{
HTTPReply (503, "Server is overloaded", output);
return;
}
// Parse id now so errors from here on will have the id
//
// VFALCO NOTE Except that "id" isn't included in the following errors.
//
Json::Value const id = jsonRPC ["id"];
Json::Value const method = jsonRPC ["method"];
if (method.isNull ())
return createResponse (400, "Null method");
{
HTTPReply (400, "Null method", output);
return;
}
if (! method.isString ())
return createResponse (400, "method is not string");
{
HTTPReply (400, "method is not string", output);
return;
}
std::string strMethod = method.asString ();
if (strMethod.empty())
return createResponse (400, "method is empty");
{
HTTPReply (400, "method is empty", output);
return;
}
// Parse params
Json::Value params = jsonRPC ["params"];
@@ -266,7 +321,10 @@ ServerHandlerImp::processRequest (HTTP::Port const& port,
params = Json::Value (Json::arrayValue);
else if (!params.isArray ())
return HTTPReply (400, "params unparseable");
{
HTTPReply (400, "params unparseable", output);
return;
}
// VFALCO TODO Shouldn't we handle this earlier?
//
@@ -275,25 +333,28 @@ ServerHandlerImp::processRequest (HTTP::Port const& port,
// VFALCO TODO Needs implementing
// FIXME Needs implementing
// XXX This needs rate limiting to prevent brute forcing password.
return HTTPReply (403, "Forbidden");
HTTPReply (403, "Forbidden", output);
return;
}
std::string response;
RPCHandler rpcHandler (m_networkOPs);
Resource::Charge loadType = Resource::feeReferenceRPC;
m_journal.debug << "Query: " << strMethod << params;
Json::Value const result (rpcHandler.doRpcCommand (
strMethod, params, role, loadType));
auto result = rpcHandler.doRpcCommand (
strMethod, params, role, loadType, yield);
m_journal.debug << "Reply: " << result;
usage.charge (loadType);
response = JSONRPCReply (result, Json::Value (), id);
Json::Value reply (Json::objectValue);
reply[jss::result] = std::move (result);
auto response = to_string (reply);
response += '\n';
return createResponse (200, response);
HTTPReply (200, response, output);
}
//------------------------------------------------------------------------------