diff --git a/Builds/VisualStudio2013/RippleD.vcxproj b/Builds/VisualStudio2013/RippleD.vcxproj
index f011ae8dd8..e082dba07f 100644
--- a/Builds/VisualStudio2013/RippleD.vcxproj
+++ b/Builds/VisualStudio2013/RippleD.vcxproj
@@ -1762,6 +1762,8 @@
+
+
True
diff --git a/Builds/VisualStudio2013/RippleD.vcxproj.filters b/Builds/VisualStudio2013/RippleD.vcxproj.filters
index ae9220566c..42aefb3fc4 100644
--- a/Builds/VisualStudio2013/RippleD.vcxproj.filters
+++ b/Builds/VisualStudio2013/RippleD.vcxproj.filters
@@ -2628,6 +2628,9 @@
ripple\app\ledger
+
+ ripple\app\ledger
+
ripple\app\ledger
diff --git a/src/ripple/server/impl/JSONRPCUtil.cpp b/src/ripple/server/impl/JSONRPCUtil.cpp
index 51800e7240..a730aafc2e 100644
--- a/src/ripple/server/impl/JSONRPCUtil.cpp
+++ b/src/ripple/server/impl/JSONRPCUtil.cpp
@@ -29,16 +29,6 @@ namespace ripple {
unsigned int const gMaxHTTPHeaderSize = 0x02000000;
-Json::Value JSONRPCError (int code, std::string const& message)
-{
- Json::Value error (Json::objectValue);
-
- error[jss::code] = Json::Value (code);
- error[jss::message] = Json::Value (message);
-
- return error;
-}
-
std::string getHTTPHeaderTimestamp ()
{
// CHECKME This is probably called often enough that optimizing it makes
@@ -54,160 +44,74 @@ std::string getHTTPHeaderTimestamp ()
return std::string (buffer);
}
-std::string HTTPReply (int nStatus, std::string const& strMsg)
+void HTTPReply (int nStatus, std::string const& content, RPC::Output output)
{
if (ShouldLog (lsTRACE, RPC))
{
- WriteLog (lsTRACE, RPC) << "HTTP Reply " << nStatus << " " << strMsg;
+ WriteLog (lsTRACE, RPC) << "HTTP Reply " << nStatus << " " << content;
}
- std::string ret;
-
if (nStatus == 401)
{
- ret.reserve (512);
-
- ret.append ("HTTP/1.0 401 Authorization Required\r\n");
- ret.append (getHTTPHeaderTimestamp ());
+ output ("HTTP/1.0 401 Authorization Required\r\n");
+ output (getHTTPHeaderTimestamp ());
// CHECKME this returns a different version than the replies below. Is
// this by design or an accident or should it be using
// BuildInfo::getFullVersionString () as well?
- ret.append ("Server: " + systemName () + "-json-rpc/v1");
- ret.append ("\r\n");
+ output ("Server: " + systemName () + "-json-rpc/v1");
+ output ("\r\n");
// Be careful in modifying this! If you change the contents you MUST
// update the Content-Length header as well to indicate the correct
// size of the data.
- ret.append ("WWW-Authenticate: Basic realm=\"jsonrpc\"\r\n"
+ output ("WWW-Authenticate: Basic realm=\"jsonrpc\"\r\n"
"Content-Type: text/html\r\n"
"Content-Length: 296\r\n"
"\r\n"
- "\r\n"
+ "\r\n"
"\r\n"
"
\r\n"
"Error\r\n"
- "\r\n"
+ "\r\n"
"\r\n"
"401 Unauthorized.
\r\n");
- return ret;
+ return;
}
- ret.reserve(256 + strMsg.length());
-
switch (nStatus)
{
- case 200: ret.append ("HTTP/1.1 200 OK\r\n"); break;
- case 400: ret.append ("HTTP/1.1 400 Bad Request\r\n"); break;
- case 403: ret.append ("HTTP/1.1 403 Forbidden\r\n"); break;
- case 404: ret.append ("HTTP/1.1 404 Not Found\r\n"); break;
- case 500: ret.append ("HTTP/1.1 500 Internal Server Error\r\n"); break;
+ case 200: output ("HTTP/1.1 200 OK\r\n"); break;
+ case 400: output ("HTTP/1.1 400 Bad Request\r\n"); break;
+ case 403: output ("HTTP/1.1 403 Forbidden\r\n"); break;
+ case 404: output ("HTTP/1.1 404 Not Found\r\n"); break;
+ case 500: output ("HTTP/1.1 500 Internal Server Error\r\n"); break;
}
- ret.append (getHTTPHeaderTimestamp ());
+ output (getHTTPHeaderTimestamp ());
- ret.append ("Connection: Keep-Alive\r\n");
+ output ("Connection: Keep-Alive\r\n"
+ "Content-Length: ");
// VFALCO TODO Determine if/when this header should be added
//if (getConfig ().RPC_ALLOW_REMOTE)
- // ret.append ("Access-Control-Allow-Origin: *\r\n");
+ // output ("Access-Control-Allow-Origin: *\r\n");
- ret.append ("Content-Length: ");
- ret.append (std::to_string(strMsg.size () + 2));
- ret.append ("\r\n");
+ output (std::to_string(content.size () + 2));
+ output ("\r\n"
+ "Content-Type: application/json; charset=UTF-8\r\n");
- ret.append ("Content-Type: application/json; charset=UTF-8\r\n");
-
- ret.append ("Server: " + systemName () + "-json-rpc/");
- ret.append (BuildInfo::getFullVersionString ());
- ret.append ("\r\n");
-
- ret.append ("\r\n");
- ret.append (strMsg);
- ret.append ("\r\n");
-
- return ret;
-}
-
-int ReadHTTPStatus (std::basic_istream& stream)
-{
- std::string str;
- getline (stream, str);
- std::vector vWords;
- boost::split (vWords, str, boost::is_any_of (" "));
-
- if (vWords.size () < 2)
- return 500;
-
- return atoi (vWords[1].c_str ());
-}
-
-int ReadHTTPHeader (std::basic_istream& stream, std::map& mapHeadersRet)
-{
- int nLen = 0;
-
- for (;;)
- {
- std::string str;
- std::getline (stream, str);
-
- if (str.empty () || str == "\r")
- break;
-
- std::string::size_type nColon = str.find (":");
-
- if (nColon != std::string::npos)
- {
- std::string strHeader = str.substr (0, nColon);
- boost::trim (strHeader);
- boost::to_lower (strHeader);
- std::string strValue = str.substr (nColon + 1);
- boost::trim (strValue);
- mapHeadersRet[strHeader] = strValue;
-
- if (strHeader == "content-length")
- nLen = atoi (strValue.c_str ());
- }
- }
-
- return nLen;
-}
-
-int ReadHTTP (std::basic_istream& stream, std::map& mapHeadersRet,
- std::string& strMessageRet)
-{
- mapHeadersRet.clear ();
- strMessageRet = "";
-
- // Read status
- int nStatus = ReadHTTPStatus (stream);
-
- // Read header
- int nLen = ReadHTTPHeader (stream, mapHeadersRet);
-
- if (nLen < 0 || nLen > gMaxHTTPHeaderSize)
- return 500;
-
- // Read message
- if (nLen > 0)
- {
- std::vector vch (nLen);
- stream.read (&vch[0], nLen);
- strMessageRet = std::string (vch.begin (), vch.end ());
- }
-
- return nStatus;
-}
-
-std::string JSONRPCReply (Json::Value const& result, Json::Value const& error, Json::Value const& id)
-{
- Json::Value reply (Json::objectValue);
- reply[jss::result] = result;
- //reply["error"]=error;
- //reply["id"]=id;
- return to_string (reply) + "\n";
+ output ("Server: " + systemName () + "-json-rpc/");
+ output (BuildInfo::getFullVersionString ());
+ output ("\r\n"
+ "\r\n");
+ output (content);
+ output ("\r\n");
}
} // ripple
diff --git a/src/ripple/server/impl/JSONRPCUtil.h b/src/ripple/server/impl/JSONRPCUtil.h
index 3ef5ad458d..8f167b5aff 100644
--- a/src/ripple/server/impl/JSONRPCUtil.h
+++ b/src/ripple/server/impl/JSONRPCUtil.h
@@ -21,22 +21,11 @@
#define RIPPLE_SERVER_JSONRPCUTIL_H_INCLUDED
#include
+#include
namespace ripple {
-// VFALCO These functions are all deprecated they are inefficient and have poor signatures.
-
-extern std::string JSONRPCReply (Json::Value const& result, Json::Value const& error, Json::Value const& id);
-
-extern Json::Value JSONRPCError (int code, std::string const& message);
-
-// VFALCO This needs to be rewritten to use beast::http::message
-extern std::string HTTPReply (int nStatus, std::string const& strMsg);
-
-// VFALCO NOTE This one looks like it does some sort of stream i/o
-extern int ReadHTTP (std::basic_istream& stream,
- std::map& mapHeadersRet,
- std::string& strMessageRet);
+void HTTPReply (int nStatus, std::string const& strMsg, RPC::Output);
} // ripple
diff --git a/src/ripple/server/impl/ServerHandlerImp.cpp b/src/ripple/server/impl/ServerHandlerImp.cpp
index 1bf021db99..5024934e26 100644
--- a/src/ripple/server/impl/ServerHandlerImp.cpp
+++ b/src/ripple/server/impl/ServerHandlerImp.cpp
@@ -28,10 +28,12 @@
#include
#include
#include
+#include
#include
#include //
#include
#include
+#include
#include
#include
#include
@@ -146,6 +148,36 @@ ServerHandlerImp::onHandoff (HTTP::Session& session,
return Handoff{};
}
+static inline
+RPC::Output makeOutput (HTTP::Session& session)
+{
+ return [&](boost::string_ref const& b)
+ {
+ session.write (b.data(), b.size());
+ };
+}
+
+namespace {
+
+void runCoroutine (RPC::Coroutine coroutine, JobQueue& jobQueue)
+{
+ if (!coroutine)
+ return;
+ coroutine();
+ if (!coroutine)
+ return;
+
+ // Reschedule the job on the job queue.
+ jobQueue.addJob (
+ jtCLIENT, "RPC-Coroutine",
+ [coroutine, &jobQueue] (Job&)
+ {
+ runCoroutine (coroutine, jobQueue);
+ });
+}
+
+} // namespace
+
void
ServerHandlerImp::onRequest (HTTP::Session& session)
{
@@ -153,14 +185,17 @@ ServerHandlerImp::onRequest (HTTP::Session& session)
if (! authorized (session.port(),
build_map(session.request().headers)))
{
- session.write (HTTPReply (403, "Forbidden"));
+ HTTPReply (403, "Forbidden", makeOutput (session));
session.close (true);
return;
}
- m_jobQueue.addJob (jtCLIENT, "RPC-Client", std::bind (
- &ServerHandlerImp::processSession, this, std::placeholders::_1,
- session.detach()));
+ auto detach = session.detach();
+
+ RPC::Coroutine::YieldFunction yieldFunction =
+ [this, detach] (Yield const& y) { processSession (detach, y); };
+ RPC::Coroutine coroutine (yieldFunction);
+ runCoroutine (std::move(coroutine), m_jobQueue);
}
void
@@ -177,36 +212,44 @@ ServerHandlerImp::onStopped (HTTP::Server&)
//------------------------------------------------------------------------------
+// ServerHandlerImp will yield after emitting serverOutputChunkSize bytes.
+// If this value is 0, it means "yield after each output"
+// A negative value means "never yield"
+// TODO(tom): negotiate a spot for this in Configs.
+const int serverOutputChunkSize = -1;
+
// Dispatched on the job queue
void
-ServerHandlerImp::processSession (Job& job,
- std::shared_ptr const& session)
+ServerHandlerImp::processSession (
+ std::shared_ptr const& session, Yield const& yield)
{
- session->write (processRequest (session->port(),
- to_string(session->body()), session->remoteAddress().at_port(0)));
+ auto output = makeOutput (*session);
+ if (serverOutputChunkSize >= 0)
+ {
+ output = RPC::chunkedYieldingOutput (
+ output, yield, serverOutputChunkSize);
+ }
+
+ processRequest (
+ session->port(),
+ to_string (session->body()),
+ session->remoteAddress().at_port (0),
+ output,
+ yield);
if (session->request().keep_alive())
- {
session->complete();
- }
else
- {
session->close (true);
- }
}
-std::string
-ServerHandlerImp::createResponse (
- int statusCode,
- std::string const& description)
-{
- return HTTPReply (statusCode, description);
-}
-
-// VFALCO ARGH! returning a single std::string for the entire response?
-std::string
-ServerHandlerImp::processRequest (HTTP::Port const& port,
- std::string const& request, beast::IP::Endpoint const& remoteIPAddress)
+void
+ServerHandlerImp::processRequest (
+ HTTP::Port const& port,
+ std::string const& request,
+ beast::IP::Endpoint const& remoteIPAddress,
+ Output output,
+ Yield yield)
{
Json::Value jsonRPC;
{
@@ -216,7 +259,8 @@ ServerHandlerImp::processRequest (HTTP::Port const& port,
jsonRPC.isNull () ||
! jsonRPC.isObject ())
{
- return createResponse (400, "Unable to parse request");
+ HTTPReply (400, "Unable to parse request", output);
+ return;
}
}
@@ -239,25 +283,36 @@ ServerHandlerImp::processRequest (HTTP::Port const& port,
usage = m_resourceManager.newInboundEndpoint(remoteIPAddress);
if (usage.disconnect ())
- return createResponse (503, "Server is overloaded");
+ {
+ HTTPReply (503, "Server is overloaded", output);
+ return;
+ }
// Parse id now so errors from here on will have the id
//
// VFALCO NOTE Except that "id" isn't included in the following errors.
//
Json::Value const id = jsonRPC ["id"];
-
Json::Value const method = jsonRPC ["method"];
if (method.isNull ())
- return createResponse (400, "Null method");
+ {
+ HTTPReply (400, "Null method", output);
+ return;
+ }
if (! method.isString ())
- return createResponse (400, "method is not string");
+ {
+ HTTPReply (400, "method is not string", output);
+ return;
+ }
std::string strMethod = method.asString ();
if (strMethod.empty())
- return createResponse (400, "method is empty");
+ {
+ HTTPReply (400, "method is empty", output);
+ return;
+ }
// Parse params
Json::Value params = jsonRPC ["params"];
@@ -266,7 +321,10 @@ ServerHandlerImp::processRequest (HTTP::Port const& port,
params = Json::Value (Json::arrayValue);
else if (!params.isArray ())
- return HTTPReply (400, "params unparseable");
+ {
+ HTTPReply (400, "params unparseable", output);
+ return;
+ }
// VFALCO TODO Shouldn't we handle this earlier?
//
@@ -275,25 +333,28 @@ ServerHandlerImp::processRequest (HTTP::Port const& port,
// VFALCO TODO Needs implementing
// FIXME Needs implementing
// XXX This needs rate limiting to prevent brute forcing password.
- return HTTPReply (403, "Forbidden");
+ HTTPReply (403, "Forbidden", output);
+ return;
}
- std::string response;
RPCHandler rpcHandler (m_networkOPs);
Resource::Charge loadType = Resource::feeReferenceRPC;
m_journal.debug << "Query: " << strMethod << params;
- Json::Value const result (rpcHandler.doRpcCommand (
- strMethod, params, role, loadType));
+ auto result = rpcHandler.doRpcCommand (
+ strMethod, params, role, loadType, yield);
m_journal.debug << "Reply: " << result;
usage.charge (loadType);
- response = JSONRPCReply (result, Json::Value (), id);
+ Json::Value reply (Json::objectValue);
+ reply[jss::result] = std::move (result);
+ auto response = to_string (reply);
+ response += '\n';
- return createResponse (200, response);
+ HTTPReply (200, response, output);
}
//------------------------------------------------------------------------------
diff --git a/src/ripple/server/impl/ServerHandlerImp.h b/src/ripple/server/impl/ServerHandlerImp.h
index 8f69241119..ccca56c1b8 100644
--- a/src/ripple/server/impl/ServerHandlerImp.h
+++ b/src/ripple/server/impl/ServerHandlerImp.h
@@ -23,6 +23,7 @@
#include
#include
#include
+#include
#include
namespace ripple {
@@ -49,6 +50,9 @@ public:
~ServerHandlerImp();
private:
+ using Output = RPC::Output;
+ using Yield = RPC::Yield;
+
void
setup (Setup const& setup, beast::Journal journal) override;
@@ -104,15 +108,11 @@ private:
//--------------------------------------------------------------------------
void
- processSession (Job& job,
- std::shared_ptr const& session);
+ processSession (std::shared_ptr const&, Yield const&);
- std::string
- createResponse (int statusCode, std::string const& description);
-
- std::string
+ void
processRequest (HTTP::Port const& port, std::string const& request,
- beast::IP::Endpoint const& remoteIPAddress);
+ beast::IP::Endpoint const& remoteIPAddress, Output, Yield);
//
// PropertyStream