Use continuation in legacy pathfinding:

Handle legacy (ripple_path_find) requests that don't specify a ledger
the same way regular path_find requests are. This provides a
performance improvement for these requests and reduces the problem
of server busy errors.

Conflicts:
	src/ripple/app/paths/PathRequest.cpp
This commit is contained in:
JoelKatz
2015-06-17 13:27:51 -07:00
committed by Vinnie Falco
parent 3d6e76046c
commit c64ec9cfb9
7 changed files with 146 additions and 19 deletions

View File

@@ -58,6 +58,27 @@ PathRequest::PathRequest (
ptCreated = boost::posix_time::microsec_clock::universal_time ();
}
PathRequest::PathRequest (
std::function <void(void)> const& completion,
int id,
PathRequests& owner,
beast::Journal journal)
: m_journal (journal)
, mOwner (owner)
, fCompletion (completion)
, jvStatus (Json::objectValue)
, bValid (false)
, mLastIndex (0)
, mInProgress (false)
, iLastLevel (0)
, bLastSuccess (false)
, iIdentifier (id)
{
if (m_journal.debug)
m_journal.debug << iIdentifier << " created";
ptCreated = boost::posix_time::microsec_clock::universal_time ();
}
static std::string const get_milli_diff (
boost::posix_time::ptime const& after,
boost::posix_time::ptime
@@ -132,12 +153,23 @@ bool PathRequest::needsUpdate (bool newOnly, LedgerIndex index)
return true;
}
bool PathRequest::hasCompletion ()
{
return bool (fCompletion);
}
void PathRequest::updateComplete ()
{
ScopedLockType sl (mIndexLock);
assert (mInProgress);
mInProgress = false;
if (fCompletion)
{
fCompletion();
fCompletion = std::function<void (void)>();
}
}
bool PathRequest::isValid (RippleLineCache::ref crCache)
@@ -407,6 +439,15 @@ Json::Value PathRequest::doUpdate (RippleLineCache::ref cache, bool fast)
}
}
if (hasCompletion ())
{
// Old ripple_path_find API gives destination_currencies
auto& destCurrencies = (jvStatus[jss::destination_currencies] = Json::arrayValue);
auto usCurrencies = accountDestCurrencies (*raDstAccount, cache, true);
for (auto const& c : usCurrencies)
destCurrencies.append (to_string (c));
}
jvStatus[jss::source_account] = getApp().accountIDCache().toBase58(*raSrcAccount);
jvStatus[jss::destination_account] = getApp().accountIDCache().toBase58(*raDstAccount);
jvStatus[jss::destination_amount] = saDstAmount.getJson (0);
@@ -535,6 +576,13 @@ Json::Value PathRequest::doUpdate (RippleLineCache::ref cache, bool fast)
jvEntry[jss::source_amount] = rc.actualAmountIn.getJson (0);
jvEntry[jss::paths_computed] = spsPaths.getJson (0);
if (hasCompletion ())
{
// Old ripple_path_find API requires this
jvEntry[jss::paths_canonical] = Json::arrayValue;
}
found = true;
jvArray.append (jvEntry);
}

View File

@@ -61,6 +61,12 @@ public:
PathRequests&,
beast::Journal journal);
PathRequest (
std::function <void (void)> const& completion,
int id,
PathRequests&,
beast::Journal journal);
~PathRequest ();
bool isValid ();
@@ -80,6 +86,7 @@ public:
// update jvStatus
Json::Value doUpdate (const std::shared_ptr<RippleLineCache>&, bool fast);
InfoSub::pointer getSubscriber ();
bool hasCompletion ();
private:
bool isValid (RippleLineCache::ref crCache);
@@ -96,6 +103,7 @@ private:
PathRequests& mOwner;
std::weak_ptr<InfoSub> wpSubscriber; // Who this request came from
std::function <void (void)> fCompletion;
Json::Value jvId;
Json::Value jvStatus; // Last result

View File

@@ -105,6 +105,13 @@ void PathRequests::updateAll (Ledger::ref inLedger,
++processed;
}
}
else if (pRequest->hasCompletion ())
{
// One-shot request with completion function
pRequest->doUpdate (cache, false);
pRequest->updateComplete();
++processed;
}
}
}
@@ -168,6 +175,25 @@ void PathRequests::updateAll (Ledger::ref inLedger,
removed << " removed";
}
void PathRequests::insertPathRequest (PathRequest::pointer const& req)
{
ScopedLockType sl (mLock);
// Insert after any older unserviced requests but before any serviced requests
std::vector<PathRequest::wptr>::iterator it = mRequests.begin ();
while (it != mRequests.end ())
{
PathRequest::pointer req = it->lock ();
if (req && !req->isNew ())
break; // This request has been handled, we come before it
// This is a newer request, we come after it
++it;
}
mRequests.insert (it, PathRequest::wptr (req));
}
// Make a new-style path_find request
Json::Value PathRequests::makePathRequest(
std::shared_ptr <InfoSub> const& subscriber,
const std::shared_ptr<Ledger>& inLedger,
@@ -189,27 +215,47 @@ Json::Value PathRequests::makePathRequest(
if (valid)
{
{
ScopedLockType sl (mLock);
// Insert after any older unserviced requests but before any serviced requests
std::vector<PathRequest::wptr>::iterator it = mRequests.begin ();
while (it != mRequests.end ())
{
PathRequest::pointer req = it->lock ();
if (req && !req->isNew ())
break; // This request has been handled, we come before it
// This is a newer request, we come after it
++it;
}
mRequests.insert (it, PathRequest::wptr (req));
}
subscriber->setPathRequest (req);
insertPathRequest (req);
getApp().getLedgerMaster().newPathRequest();
}
return result;
}
// Make an old-style ripple_path_find request
Json::Value PathRequests::makeLegacyPathRequest(
PathRequest::pointer& req,
std::function <void (void)> completion,
const std::shared_ptr<Ledger>& inLedger,
Json::Value const& request)
{
// This assignment must take place before the
// completion function is called
req = std::make_shared<PathRequest> (
completion, ++mLastIdentifier, *this, mJournal);
auto ledger = inLedger;
RippleLineCache::pointer cache;
{
ScopedLockType sl (mLock);
cache = getLineCache (ledger, false);
}
bool valid = false;
Json::Value result = req->doCreate (ledger, cache, request, valid);
if (!valid)
{
req.reset();
}
else
{
insertPathRequest (req);
getApp().getLedgerMaster().newPathRequest();
}
return result;
}
} // ripple

View File

@@ -49,6 +49,12 @@ public:
const std::shared_ptr<Ledger>& ledger,
Json::Value const& request);
Json::Value makeLegacyPathRequest (
PathRequest::pointer& req,
std::function <void (void)> completion,
const std::shared_ptr<Ledger>& inLedger,
Json::Value const& request);
void reportFast (int milliseconds)
{
mFast.notify (static_cast < beast::insight::Event::value_type> (milliseconds));
@@ -60,6 +66,8 @@ public:
}
private:
void insertPathRequest (PathRequest::pointer const&);
beast::Journal mJournal;
beast::insight::Event mFast;

View File

@@ -59,6 +59,25 @@ Json::Value doRipplePathFind (RPC::Context& context)
if (!lpLedger)
return jvResult;
}
else
{
context.loadType = Resource::feeHighBurdenRPC;
lpLedger = context.netOps.getClosedLedger();
PathRequest::pointer request;
context.suspend ([&request, &context, &jvResult, &lpLedger](RPC::Callback const& c)
{
jvResult = getApp().getPathRequests().makeLegacyPathRequest (
request, c, lpLedger, context.params);
if (! request)
c();
});
if (request)
jvResult = request->doStatus (context.params);
return jvResult;
}
if (!context.params.isMember (jss::source_account))
{

View File

@@ -60,7 +60,6 @@ ServerHandlerImp::ServerHandlerImp (Stoppable& parent,
: ServerHandler (parent)
, m_resourceManager (resourceManager)
, m_journal (deprecatedLogs().journal("Server"))
, m_jobQueue (jobQueue)
, m_networkOPs (networkOPs)
, m_server (HTTP::make_Server(
*this, io_service, deprecatedLogs().journal("Server")))

View File

@@ -37,7 +37,6 @@ class ServerHandlerImp
private:
Resource::Manager& m_resourceManager;
beast::Journal m_journal;
JobQueue& m_jobQueue;
NetworkOPs& m_networkOPs;
std::unique_ptr<HTTP::Server> m_server;
RPC::Continuation m_continuation;