Configuration for yielding RPC server.

This commit is contained in:
Tom Ritchford
2014-12-10 13:21:04 -05:00
committed by Vinnie Falco
parent a360c481c2
commit fce77c9372
13 changed files with 102 additions and 80 deletions

View File

@@ -559,7 +559,7 @@ public:
WriteLog (lsWARNING, LedgerConsensus) << mPrevLedgerHash WriteLog (lsWARNING, LedgerConsensus) << mPrevLedgerHash
<< " to " << netLgr; << " to " << netLgr;
WriteLog (lsWARNING, LedgerConsensus) WriteLog (lsWARNING, LedgerConsensus)
<< ripple::getJson (*mPreviousLedger, 0); << ripple::getJson (*mPreviousLedger);
if (ShouldLog (lsDEBUG, LedgerConsensus)) if (ShouldLog (lsDEBUG, LedgerConsensus))
{ {

View File

@@ -643,7 +643,7 @@ bool Ledger::saveValidatedLedger (bool current)
if (!getAccountHash ().isNonZero ()) if (!getAccountHash ().isNonZero ())
{ {
WriteLog (lsFATAL, Ledger) << "AH is zero: " WriteLog (lsFATAL, Ledger) << "AH is zero: "
<< getJson (*this, 0); << getJson (*this);
assert (false); assert (false);
} }
@@ -920,7 +920,7 @@ Ledger::pointer Ledger::getSQL (std::string const& sql)
{ {
WriteLog (lsERROR, Ledger) << "Failed on ledger"; WriteLog (lsERROR, Ledger) << "Failed on ledger";
Json::Value p; Json::Value p;
addJson (*ret, p, LEDGER_JSON_FULL); addJson (p, {*ret, LEDGER_JSON_FULL});
WriteLog (lsERROR, Ledger) << p; WriteLog (lsERROR, Ledger) << p;
} }
@@ -1648,7 +1648,7 @@ bool Ledger::assertSane () const
WriteLog (lsFATAL, Ledger) << "ledger is not sane"; WriteLog (lsFATAL, Ledger) << "ledger is not sane";
Json::Value j = getJson (*this, 0); Json::Value j = getJson (*this);
j [jss::accountTreeHash] = to_string (mAccountHash); j [jss::accountTreeHash] = to_string (mAccountHash);
j [jss::transTreeHash] = to_string (mTransHash); j [jss::transTreeHash] = to_string (mTransHash);

View File

@@ -49,10 +49,10 @@ enum LedgerStateParms
lepERROR = 32, // error lepERROR = 32, // error
}; };
#define LEDGER_JSON_DUMP_TXRP 0x10000000 #define LEDGER_JSON_DUMP_TXRP 0x1
#define LEDGER_JSON_DUMP_STATE 0x20000000 #define LEDGER_JSON_DUMP_STATE 0x2
#define LEDGER_JSON_EXPAND 0x40000000 #define LEDGER_JSON_EXPAND 0x4
#define LEDGER_JSON_FULL 0x80000000 #define LEDGER_JSON_FULL 0x8
class SqliteStatement; class SqliteStatement;

View File

@@ -30,35 +30,49 @@
namespace ripple { namespace ripple {
struct LedgerFill
{
LedgerFill (Ledger const& l,
int o = 0,
RPC::Yield const& y = {},
RPC::YieldStrategy const& ys = {})
: ledger (l),
options (o),
yield (y),
yieldStrategy (ys)
{
}
Ledger const& ledger;
int options;
RPC::Yield yield;
RPC::YieldStrategy yieldStrategy;
};
/** Given a Ledger, options, and a generic Object that has Json semantics, /** Given a Ledger, options, and a generic Object that has Json semantics,
fill the Object with a description of the ledger. fill the Object with a description of the ledger.
*/ */
template <class Object> template <class Object>
void fillJson (Ledger const&, Object&, int options, void fillJson (Object&, LedgerFill const&);
RPC::Yield const& yield = {});
/** Add Json to an existing generic Object. */ /** Add Json to an existing generic Object. */
template <class Object> template <class Object>
void addJson (Ledger const&, Object&, int options, void addJson (Object&, LedgerFill const&);
RPC::Yield const& yield = {});
/** Return a new Json::Value representing the ledger with given options.*/ /** Return a new Json::Value representing the ledger with given options.*/
Json::Value getJson (Ledger const&, int options, Json::Value getJson (LedgerFill const&);
RPC::Yield const& yield = {});
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// Implementations. // Implementations.
auto const accountYieldCount = 0;
auto const transactionYieldCount = 0;
template <typename Object> template <typename Object>
void fillJson ( void fillJson (Object& json, LedgerFill const& fill)
Ledger const& ledger, Object& json, int options, RPC::Yield const& yield)
{ {
bool const bFull (options & LEDGER_JSON_FULL); auto const& ledger = fill.ledger;
bool const bExpand (options & LEDGER_JSON_EXPAND);
bool const bFull (fill.options & LEDGER_JSON_FULL);
bool const bExpand (fill.options & LEDGER_JSON_EXPAND);
// DEPRECATED // DEPRECATED
json[jss::seqNum] = to_string (ledger.getLedgerSeq()); json[jss::seqNum] = to_string (ledger.getLedgerSeq());
@@ -100,20 +114,17 @@ void fillJson (
} }
auto &transactionMap = ledger.peekTransactionMap(); auto &transactionMap = ledger.peekTransactionMap();
if (transactionMap && (bFull || options & LEDGER_JSON_DUMP_TXRP)) if (transactionMap && (bFull || fill.options & LEDGER_JSON_DUMP_TXRP))
{ {
auto&& txns = RPC::addArray (json, jss::transactions); auto&& txns = RPC::addArray (json, jss::transactions);
SHAMapTreeNode::TNType type; SHAMapTreeNode::TNType type;
int count = 0; RPC::CountedYield count (
fill.yieldStrategy.transactionYieldCount, fill.yield);
for (auto item = transactionMap->peekFirstItem (type); item; for (auto item = transactionMap->peekFirstItem (type); item;
item = transactionMap->peekNextItem (item->getTag (), type)) item = transactionMap->peekNextItem (item->getTag (), type))
{ {
if (transactionYieldCount && ++count >= transactionYieldCount) count.yield();
{
yield();
count = 0;
}
if (bFull || bExpand) if (bFull || bExpand)
{ {
if (type == SHAMapTreeNode::tnTRANSACTION_NM) if (type == SHAMapTreeNode::tnTRANSACTION_NM)
@@ -148,33 +159,26 @@ void fillJson (
} }
auto& accountStateMap = ledger.peekAccountStateMap(); auto& accountStateMap = ledger.peekAccountStateMap();
if (accountStateMap && (bFull || options & LEDGER_JSON_DUMP_STATE)) if (accountStateMap && (bFull || fill.options & LEDGER_JSON_DUMP_STATE))
{ {
auto&& array = RPC::addArray (json, jss::accountState); auto&& array = RPC::addArray (json, jss::accountState);
auto count = 0; RPC::CountedYield count (
fill.yieldStrategy.accountYieldCount, fill.yield);
if (bFull || bExpand) if (bFull || bExpand)
{ {
ledger.visitStateItems ( ledger.visitStateItems (
[&array, &count, &yield] (SLE::ref sle) [&array, &count, &fill] (SLE::ref sle)
{ {
if (accountYieldCount && ++count >= accountYieldCount) count.yield();
{
yield();
count = 0;
}
array.append (sle->getJson(0)); array.append (sle->getJson(0));
}); });
} }
else else
{ {
accountStateMap->visitLeaves( accountStateMap->visitLeaves(
[&array, &count, &yield] (SHAMapItem::ref smi) [&array, &count, &fill] (SHAMapItem::ref smi)
{ {
if (accountYieldCount && ++count >= accountYieldCount) count.yield();
{
yield();
count = 0;
}
array.append (to_string(smi->getTag ())); array.append (to_string(smi->getTag ()));
}); });
} }
@@ -183,18 +187,17 @@ void fillJson (
/** Add Json to an existing generic Object. */ /** Add Json to an existing generic Object. */
template <class Object> template <class Object>
void addJson ( void addJson (Object& json, LedgerFill const& fill)
Ledger const& ledger, Object& json, int options, RPC::Yield const& yield)
{ {
auto&& object = RPC::addObject (json, jss::ledger); auto&& object = RPC::addObject (json, jss::ledger);
fillJson (ledger, object, options, yield); fillJson (object, fill);
} }
inline inline
Json::Value getJson (Ledger const& ledger, int options, RPC::Yield const& yield) Json::Value getJson (LedgerFill const& fill)
{ {
Json::Value json; Json::Value json;
fillJson (ledger, json, options, yield); fillJson (json, fill);
return json; return json;
} }

View File

@@ -1409,7 +1409,7 @@ bool NetworkOPsImp::checkLastClosedLedger (
} }
m_journal.warning << "We are not running on the consensus ledger"; m_journal.warning << "We are not running on the consensus ledger";
m_journal.info << "Our LCL: " << getJson (*ourClosed, 0); m_journal.info << "Our LCL: " << getJson (*ourClosed);
m_journal.info << "Net LCL " << closedLedger; m_journal.info << "Net LCL " << closedLedger;
if ((mMode == omTRACKING) || (mMode == omFULL)) if ((mMode == omTRACKING) || (mMode == omFULL))

View File

@@ -20,20 +20,22 @@
#ifndef RIPPLE_APP_RPC_HANDLER #ifndef RIPPLE_APP_RPC_HANDLER
#define RIPPLE_APP_RPC_HANDLER #define RIPPLE_APP_RPC_HANDLER
#include <ripple/server/Role.h>
#include <ripple/core/Config.h> #include <ripple/core/Config.h>
#include <ripple/net/InfoSub.h> #include <ripple/net/InfoSub.h>
#include <ripple/rpc/impl/Context.h>
#include <ripple/rpc/Status.h> #include <ripple/rpc/Status.h>
#include <ripple/rpc/impl/Context.h>
namespace ripple { namespace ripple {
namespace RPC { namespace RPC {
struct Context;
struct YieldStrategy;
/** Execute an RPC command and store the results in a Json::Value. */ /** Execute an RPC command and store the results in a Json::Value. */
Status doCommand (RPC::Context&, Json::Value&); Status doCommand (RPC::Context&, Json::Value&, YieldStrategy const& s = {});
/** Execute an RPC command and store the results in an std::string. */ /** Execute an RPC command and store the results in an std::string. */
void executeRPC (RPC::Context&, std::string&); void executeRPC (RPC::Context&, std::string&, YieldStrategy const& s = {});
/** Temporary flag to enable RPCs. */ /** Temporary flag to enable RPCs. */
auto const streamingRPC = false; auto const streamingRPC = false;

View File

@@ -78,7 +78,7 @@ void LedgerHandler::writeResult (Object& value)
if (ledger_) if (ledger_)
{ {
RPC::copyFrom (value, result_); RPC::copyFrom (value, result_);
addJson (*ledger_, value, options_, context_.yield); addJson (value, {*ledger_, options_, context_.yield});
} }
else else
{ {
@@ -86,13 +86,12 @@ void LedgerHandler::writeResult (Object& value)
auto& yield = context_.yield; auto& yield = context_.yield;
{ {
auto&& closed = RPC::addObject (value, jss::closed); auto&& closed = RPC::addObject (value, jss::closed);
addJson (*master.getClosedLedger(), closed, 0, yield); addJson (closed, {*master.getClosedLedger(), 0, yield});
} }
{ {
auto&& open = RPC::addObject (value, jss::open); auto&& open = RPC::addObject (value, jss::open);
addJson (*master.getCurrentLedger(), open, 0, yield); addJson (open, {*master.getCurrentLedger(), 0, yield});
} }
} }
} }

View File

@@ -42,7 +42,7 @@ Json::Value doLedgerHeader (RPC::Context& context)
// This information isn't verified: they should only use it if they trust // This information isn't verified: they should only use it if they trust
// us. // us.
addJson (*lpLedger, jvResult, 0); addJson (jvResult, {*lpLedger, 0});
return jvResult; return jvResult;
} }

View File

@@ -94,7 +94,7 @@ Json::Value doLedgerRequest (RPC::Context& context)
// We already have the ledger they want // We already have the ledger they want
Json::Value jvResult; Json::Value jvResult;
jvResult[jss::ledger_index] = ledger->getLedgerSeq(); jvResult[jss::ledger_index] = ledger->getLedgerSeq();
addJson (*ledger, jvResult, 0); addJson (jvResult, {*ledger, 0});
return jvResult; return jvResult;
} }
else else

View File

@@ -22,6 +22,7 @@
#include <ripple/core/Config.h> #include <ripple/core/Config.h>
#include <ripple/rpc/Yield.h> #include <ripple/rpc/Yield.h>
#include <ripple/server/Role.h>
#include <ripple/server/ServerHandler.h> #include <ripple/server/ServerHandler.h>
namespace ripple { namespace ripple {

View File

@@ -22,11 +22,17 @@
#include <ripple/protocol/JsonFields.h> #include <ripple/protocol/JsonFields.h>
#include <ripple/net/RPCErr.h> #include <ripple/net/RPCErr.h>
#include <ripple/rpc/RPCHandler.h> #include <ripple/rpc/RPCHandler.h>
#include <ripple/rpc/Yield.h>
#include <ripple/rpc/impl/Tuning.h> #include <ripple/rpc/impl/Tuning.h>
#include <ripple/rpc/impl/Context.h> #include <ripple/rpc/impl/Context.h>
#include <ripple/rpc/impl/Handler.h> #include <ripple/rpc/impl/Handler.h>
#include <ripple/rpc/impl/WriteJson.h> #include <ripple/rpc/impl/WriteJson.h>
#include <ripple/server/Role.h>
#include <ripple/core/Config.h>
#include <ripple/net/InfoSub.h>
#include <ripple/rpc/impl/Context.h>
namespace ripple { namespace ripple {
namespace RPC { namespace RPC {
@@ -198,7 +204,8 @@ void getResult (
} // namespace } // namespace
Status doCommand (RPC::Context& context, Json::Value& result) Status doCommand (
RPC::Context& context, Json::Value& result, YieldStrategy const&)
{ {
boost::optional <Handler const&> handler; boost::optional <Handler const&> handler;
if (auto error = fillHandler (context, handler)) if (auto error = fillHandler (context, handler))
@@ -214,7 +221,8 @@ Status doCommand (RPC::Context& context, Json::Value& result)
} }
/** Execute an RPC command and store the results in a string. */ /** Execute an RPC command and store the results in a string. */
void executeRPC (RPC::Context& context, std::string& output) void executeRPC (
RPC::Context& context, std::string& output, YieldStrategy const& strategy)
{ {
boost::optional <Handler const&> handler; boost::optional <Handler const&> handler;
if (auto error = fillHandler (context, handler)) if (auto error = fillHandler (context, handler))
@@ -232,8 +240,7 @@ void executeRPC (RPC::Context& context, std::string& output)
{ {
auto object = Json::Value (Json::objectValue); auto object = Json::Value (Json::objectValue);
getResult (context, method, object, handler->name_); getResult (context, method, object, handler->name_);
if (strategy.streaming == YieldStrategy::Streaming::yes)
if (streamingRPC)
output = jsonAsString (object); output = jsonAsString (object);
else else
output = to_string (object); output = to_string (object);

View File

@@ -23,6 +23,7 @@
#include <ripple/basics/BasicConfig.h> #include <ripple/basics/BasicConfig.h>
#include <ripple/server/Port.h> #include <ripple/server/Port.h>
#include <ripple/overlay/Overlay.h> #include <ripple/overlay/Overlay.h>
#include <ripple/rpc/Yield.h>
#include <beast/utility/Journal.h> #include <beast/utility/Journal.h>
#include <beast/utility/PropertyStream.h> #include <beast/utility/PropertyStream.h>
#include <beast/cxx14/memory.h> // <memory> #include <beast/cxx14/memory.h> // <memory>
@@ -66,6 +67,7 @@ public:
}; };
overlay_t overlay; overlay_t overlay;
RPC::YieldStrategy yieldStrategy;
void void
makeContexts(); makeContexts();

View File

@@ -192,9 +192,19 @@ ServerHandlerImp::onRequest (HTTP::Session& session)
auto detach = session.detach(); auto detach = session.detach();
RPC::Coroutine::YieldFunction yieldFunction = if (setup_.yieldStrategy.useCoroutines ==
[this, detach] (Yield const& y) { processSession (detach, y); }; RPC::YieldStrategy::UseCoroutines::yes)
runCoroutine (RPC::Coroutine (yieldFunction), m_jobQueue); {
RPC::Coroutine::YieldFunction yieldFunction =
[this, detach] (Yield const& y) { processSession (detach, y); };
runCoroutine (RPC::Coroutine (yieldFunction), m_jobQueue);
}
else
{
m_jobQueue.addJob (
jtCLIENT, "RPC-Client",
[=] (Job&) { processSession (detach, RPC::Yield{}); });
}
} }
void void
@@ -211,23 +221,14 @@ ServerHandlerImp::onStopped (HTTP::Server&)
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// ServerHandlerImp will yield after emitting serverOutputChunkSize bytes.
// If this value is 0, it means "yield after each output"
// A negative value means "never yield"
// TODO(tom): negotiate a spot for this in Configs.
const int serverOutputChunkSize = -1;
// Dispatched on the job queue // Dispatched on the job queue
void void
ServerHandlerImp::processSession ( ServerHandlerImp::processSession (
std::shared_ptr<HTTP::Session> const& session, Yield const& yield) std::shared_ptr<HTTP::Session> const& session, Yield const& yield)
{ {
auto output = makeOutput (*session); auto output = makeOutput (*session);
if (serverOutputChunkSize >= 0) if (auto byteYieldCount = setup_.yieldStrategy.byteYieldCount)
{ output = RPC::chunkedYieldingOutput (output, yield, byteYieldCount);
output = RPC::chunkedYieldingOutput (
output, yield, serverOutputChunkSize);
}
processRequest ( processRequest (
session->port(), session->port(),
@@ -313,7 +314,12 @@ ServerHandlerImp::processRequest (
return; return;
} }
// Parse params // Extract request parameters from the request Json as `params`.
//
// If the field "params" is empty, `params` is an empty object.
//
// Otherwise, that field must be an array of length 1 (why?)
// and we take that first entry and validate that it's an object.
Json::Value params = jsonRPC ["params"]; Json::Value params = jsonRPC ["params"];
if (params.isNull () || params.empty()) if (params.isNull () || params.empty())
@@ -357,14 +363,14 @@ ServerHandlerImp::processRequest (
RPC::Context context {params, loadType, m_networkOPs, role, nullptr, yield}; RPC::Context context {params, loadType, m_networkOPs, role, nullptr, yield};
std::string response; std::string response;
if (RPC::streamingRPC) if (setup_.yieldStrategy.streaming == RPC::YieldStrategy::Streaming::yes)
{ {
executeRPC (context, response); executeRPC (context, response, setup_.yieldStrategy);
} }
else else
{ {
Json::Value result; Json::Value result;
RPC::doCommand (context, result); RPC::doCommand (context, result, setup_.yieldStrategy);
// Always report "status". On an error report the request as received. // Always report "status". On an error report the request as received.
if (result.isMember ("error")) if (result.isMember ("error"))
@@ -797,8 +803,10 @@ setup_ServerHandler (BasicConfig const& config, std::ostream& log)
{ {
ServerHandler::Setup setup; ServerHandler::Setup setup;
setup.ports = detail::parse_Ports (config, log); setup.ports = detail::parse_Ports (config, log);
setup.yieldStrategy = RPC::makeYieldStrategy (config["server"]);
detail::setup_Client(setup); detail::setup_Client(setup);
detail::setup_Overlay(setup); detail::setup_Overlay(setup);
return setup; return setup;
} }