Add RPCService, call the Manager from RPCServerHandler

This commit is contained in:
Vinnie Falco
2013-09-21 16:57:36 -07:00
parent be1cede458
commit e59293ec92
15 changed files with 474 additions and 46 deletions

View File

@@ -22,6 +22,12 @@
<ClCompile Include="..\..\build\proto\ripple.pb.cc" />
<ClCompile Include="..\..\src\ripple\beast\ripple_beast.cpp" />
<ClCompile Include="..\..\src\ripple\beast\ripple_beastc.c" />
<ClCompile Include="..\..\src\ripple\frame\api\RPCService.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|x64'">true</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\ripple\frame\api\Service.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
@@ -1493,6 +1499,7 @@
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\src\ripple\frame\api\RPCService.h" />
<ClInclude Include="..\..\src\ripple\frame\api\Service.h" />
<ClInclude Include="..\..\src\ripple\frame\ripple_frame.h" />
<ClInclude Include="..\..\src\ripple\json\api\json_config.h" />

View File

@@ -984,6 +984,9 @@
<ClCompile Include="..\..\src\ripple_app\node\SqliteFactory.cpp">
<Filter>[2] Old Ripple\ripple_app\node</Filter>
</ClCompile>
<ClCompile Include="..\..\src\ripple\frame\api\RPCService.cpp">
<Filter>[1] Ripple\frame\api</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\src\ripple_basics\containers\KeyCache.h">
@@ -1944,6 +1947,9 @@
<ClInclude Include="..\..\src\ripple_app\node\SqliteFactory.h">
<Filter>[2] Old Ripple\ripple_app\node</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\frame\api\RPCService.h">
<Filter>[1] Ripple\frame\api</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<CustomBuild Include="..\..\src\ripple_data\protocol\ripple.proto">

View File

@@ -169,9 +169,15 @@
// This is only here temporarily. Use it to turn off the sending of
// "ANNOUNCE" messages if you suspect that you're having problems
// because of it.
#ifndef RIPPLE_USE_MT_ANNOUNCE
#define RIPPLE_USE_MT_ANNOUNCE 0
#endif
// Here temporarily
// Controls whether or not the new RPCService::Manager logic will be
// used to invoke RPC commands before they pass to the original code.
#ifndef RIPPLE_USE_RPC_SERVICE_MANAGER
#define RIPPLE_USE_RPC_SERVICE_MANAGER 0
#endif
#endif

View File

@@ -0,0 +1,96 @@
//------------------------------------------------------------------------------
/*
Copyright (c) 2011-2013, OpenCoin, Inc.
*/
//==============================================================================
namespace ripple
{
class RPCService::ManagerImp : public RPCService::Manager
{
public:
// The type of map we use to look up by function name.
//
typedef boost::unordered_map <std::string, Handler> MapType;
//--------------------------------------------------------------------------
explicit ManagerImp (Journal journal)
: m_journal (journal)
{
}
~ManagerImp()
{
}
void add (RPCService& service)
{
Handlers const& handlers (service.m_handlers);
SharedState::Access state (m_state);
for (Handlers::const_iterator iter (handlers.begin());
iter != handlers.end(); ++iter)
{
Handler const& handler (*iter);
std::pair <MapType::const_iterator, bool> result (
state->table.emplace (handler.method(), handler));
if (!result.second)
m_journal.error << "duplicate method '" << handler.method() << "'";
}
}
std::pair <bool, Json::Value> call (
std::string const& method, Json::Value const& args)
{
Handler const* handler (find (method));
if (! handler)
return std::make_pair (false, Json::Value());
return std::make_pair (true, (*handler)(args));
}
Handler const* find (std::string const& method)
{
Handler const* handler (nullptr);
// Peform lookup on the method to retrieve handler
SharedState::Access state (m_state);
MapType::iterator iter (state->table.find (method));
if (iter != state->table.end())
handler = &iter->second;
else
m_journal.debug << "method '" << method << "' not found.";
return handler;
}
private:
struct State
{
MapType table;
};
typedef SharedData <State> SharedState;
Journal m_journal;
SharedState m_state;
};
//------------------------------------------------------------------------------
RPCService::Manager* RPCService::Manager::New (Journal journal)
{
return new RPCService::ManagerImp (journal);
}
//------------------------------------------------------------------------------
RPCService::RPCService ()
{
}
RPCService::~RPCService ()
{
}
}

View File

@@ -0,0 +1,188 @@
//------------------------------------------------------------------------------
/*
Copyright (c) 2011-2013, OpenCoin, Inc.
*/
//==============================================================================
#ifndef RIPPLE_FRAME_RPCSERVICE_H_INCLUDED
#define RIPPLE_FRAME_RPCSERVICE_H_INCLUDED
#include "../../../beast/beast/utility/Journal.h"
namespace ripple
{
using namespace beast;
/** Interface for abstacting RPC commands processing. */
class RPCService : public Uncopyable
{
public:
//--------------------------------------------------------------------------
/** An invokable handler for a particular method. */
class Handler
{
public:
/** Create a handler with the specified method and function. */
template <typename Function> // allocator
Handler (std::string const& method_, Function function)
: m_method (method_)
, m_function (function)
{
}
Handler (Handler const& other)
: m_method (other.m_method)
, m_function (other.m_function)
{
}
Handler& operator= (Handler const& other)
{
m_method = other.m_method;
m_function = other.m_function;
return *this;
}
/** Returns the method called when this handler is invoked. */
std::string const& method() const
{
return m_method;
}
/** Synchronously invoke the method on the associated service.
Thread safety:
Determined by the owner.
*/
Json::Value operator() (Json::Value const& args) const
{
return m_function (args);
}
private:
std::string m_method;
SharedFunction <Json::Value (Json::Value const&)> m_function;
};
//--------------------------------------------------------------------------
/** Manages a collection of RPCService interface objects. */
class Manager
{
public:
static Manager* New (Journal journal);
virtual ~Manager() { }
/** Add a service.
The list of commands that the service handles is enumerated and
added to the manager's dispatch table.
Thread safety:
Safe to call from any thread.
May only be called once for a given service.
*/
virtual void add (RPCService& service) = 0;
/** Add a subclass of RPCService and return the original pointer.
This is provided as a convenient so that RPCService objects may
be added from ctor-initializer lists.
*/
template <class Derived>
Derived* add (Derived* derived)
{
add (*(static_cast <RPCService*>(derived)));
return derived;
}
/** Execute an RPC command synchronously.
On return, if result.first == `true` then result.second will
have the Json return value from the call of the handler.
*/
virtual std::pair <bool, Json::Value> call (
std::string const& method, Json::Value const& args) = 0;
/** Execute an RPC command asynchronously.
If the method exists, the dispatcher is invoked to provide the
context for calling the handler with the argument list and this
function returns `true` immediately. The dispatcher calls the
CompletionHandler when the operation is complete. If the method
does not exist, `false` is returned.
Copies of the Dispatcher and CompletionHandler are made as needed.
CompletionHandler must be compatible with this signature:
void (Json::Value const&)
Dispatcher is a functor compatible with this signature:
void (Handler const& handler,
Json::Value const& args,
CompletionHandler completionHandler);
Thread safety:
Safe to call from any thread.
@return `true` if a handler was found.
*/
template <class CompletionHandler, class Dispatcher>
bool call_async (std::string const& method,
Json::Value const& args,
CompletionHandler completionHandler,
Dispatcher dispatcher)
{
Handler const* handler (find (method));
if (! handler)
return false;
dispatcher (*handler, args, completionHandler);
return true;
}
/** Returns the Handler for the specified method, or nullptr.
Thread safety:
Safe to call from any threads.
*/
virtual Handler const* find (std::string const& method) = 0;
};
//--------------------------------------------------------------------------
public:
typedef std::vector <Handler> Handlers;
/** Create the service.
Derived classes will usually call add() repeatedly from their
constructor to fill in the list of handlers prior to Manager::add.
*/
RPCService ();
virtual ~RPCService ();
/** Returns the handlers associated with this service. */
Handlers const& handlers() const
{
return m_handlers;
}
/** Add a handler for the specified method.
Adding a handler after the service is already associated with a
Manager results in undefined behavior.
Thread safety:
May not be called concurrently.
*/
template <typename Function>
void addRPCHandler (std::string const& method, Function function)
{
m_handlers.push_back (Handler (method, function));
}
private:
class ManagerImp;
Handlers m_handlers;
};
//------------------------------------------------------------------------------
}
#endif

View File

@@ -8,6 +8,10 @@
#include "beast/modules/beast_core/beast_core.h"
#include "beast/modules/beast_core/system/BeforeBoost.h" // must come first
#include <boost/unordered_map.hpp>
#include "ripple_frame.h"
#include "api/RPCService.cpp"
#include "api/Service.cpp"

View File

@@ -9,6 +9,9 @@
#include "beast/modules/beast_core/beast_core.h"
#include "../json/ripple_json.h"
#include "api/RPCService.h"
#include "api/Service.h"
#endif

View File

@@ -17,7 +17,7 @@ namespace Validators
All operations are performed asynchronously on an internal thread.
*/
class Manager : public Uncopyable
class Manager : public RPCService
{
public:
/** Create a new Manager object.

View File

@@ -30,6 +30,11 @@ public:
//m_map.reserve (expectedSize);
}
MapType const& map() const
{
return m_map;
}
std::size_t size () const noexcept
{
return m_map.size ();

View File

@@ -33,8 +33,6 @@ enum
class Logic
{
public:
//----------------------------------------------------------------------
// Information associated with each distinguishable validator
//
struct ValidatorInfo
@@ -50,6 +48,20 @@ public:
typedef boost::unordered_map <
PublicKey, ValidatorInfo, PublicKey::HashFunction> MapType;
struct State
{
MapType map;
SourcesType sources;
};
typedef SharedData <State> SharedState;
Store& m_store;
Journal m_journal;
bool m_rebuildChosenList;
ChosenList::Ptr m_chosenList;
SharedState m_state;
//----------------------------------------------------------------------
Logic (Store& store, Journal journal = Journal ())
@@ -77,9 +89,10 @@ public:
Source::Result result (object->fetch (cancelCallback, m_journal));
SharedState::Access state (m_state);
if (result.success)
{
merge (result.list);
merge (result.list, state);
}
else
{
@@ -92,23 +105,22 @@ public:
void add (Source* source)
{
m_journal.info << "Add Source, " << source->name();
SourceDesc& desc (*m_sources.emplace_back ());
SharedState::Access state (m_state);
SourceDesc& desc (*state->sources.emplace_back ());
desc.source = source;
m_store.insert (desc);
}
// Add each entry in the list to the map, incrementing the
// reference count if it already exists, and updating fields.
//
void merge (Array <Source::Info> const& list)
void merge (Array <Source::Info> const& list, SharedState::Access& state)
{
for (std::size_t i = 0; i < list.size (); ++i)
{
Source::Info const& info (list.getReference (i));
std::pair <MapType::iterator, bool> result (
m_map.emplace (info.publicKey, ValidatorInfo ()));
state->map.emplace (info.publicKey, ValidatorInfo ()));
ValidatorInfo& validatorInfo (result.first->second);
++validatorInfo.refCount;
if (result.second)
@@ -122,18 +134,18 @@ public:
// Decrement the reference count of each item in the list
// in the map.
//
void remove (Array <Source::Info> const& list)
void remove (Array <Source::Info> const& list, SharedState::Access& state)
{
for (std::size_t i = 0; i < list.size (); ++i)
{
Source::Info const& info (list.getReference (i));
MapType::iterator iter (m_map.find (info.publicKey));
bassert (iter != m_map.end ());
MapType::iterator iter (state->map.find (info.publicKey));
bassert (iter != state->map.end ());
ValidatorInfo& validatorInfo (iter->second);
if (--validatorInfo.refCount == 0)
{
// Last reference removed
m_map.erase (iter);
state->map.erase (iter);
dirtyChosen ();
}
}
@@ -147,10 +159,11 @@ public:
/** Rebuild the Chosen List. */
void buildChosen ()
{
ChosenList::Ptr list (new ChosenList (m_map.size ()));
SharedState::ConstAccess state (m_state);
ChosenList::Ptr list (new ChosenList (state->map.size ()));
for (MapType::iterator iter = m_map.begin ();
iter != m_map.end (); ++iter)
for (MapType::const_iterator iter = state->map.begin ();
iter != state->map.end (); ++iter)
{
ChosenList::Info info;
list->insert (iter->first, info);
@@ -208,14 +221,16 @@ public:
if (result.success)
{
SharedState::Access state (m_state);
// Add the new source info to the map
merge (result.list);
merge (result.list, state);
// Swap lists
desc.result.swapWith (result);
// Remove the old source info from the map
remove (result.list);
remove (result.list, state);
// See if we need to rebuild
checkChosen ();
@@ -239,10 +254,10 @@ public:
}
/** Expire a source's list of validators. */
void expire (SourceDesc& desc)
void expire (SourceDesc& desc, SharedState::Access& state)
{
// Decrement reference count on each validator
remove (desc.result.list);
remove (desc.result.list, state);
m_store.update (desc);
}
@@ -254,8 +269,10 @@ public:
{
bool interrupted (false);
Time const currentTime (Time::getCurrentTime ());
for (SourcesType::iterator iter = m_sources.begin ();
iter != m_sources.end (); ++iter)
SharedState::Access state (m_state);
for (SourcesType::iterator iter = state->sources.begin ();
iter != state->sources.end (); ++iter)
{
SourceDesc& desc (*iter);
@@ -276,13 +293,54 @@ public:
if (desc.expirationTime.isNotNull () &&
desc.expirationTime <= currentTime)
{
expire (desc);
expire (desc, state);
}
}
return interrupted;
}
//----------------------------------------------------------------------
//
// RPC Handlers
//
// Return the current ChosenList as JSON
Json::Value rpcPrint (Json::Value const& args)
{
Json::Value result;
ChosenList::Ptr list (m_chosenList);
if (! list.empty())
{
Json::Value entries (result["chosen_list"]);
std::size_t i (1);
for (ChosenList::MapType::const_iterator iter (list->map().begin());
iter != list->map().end(); ++iter)
{
ChosenList::MapType::key_type const& key (iter->first);
ChosenList::MapType::mapped_type const& value (iter->second);
entries[i] = i;
++i;
}
}
else
{
result ["chosen_list"] = "empty";
}
return result;
}
// Returns the list of sources
Json::Value rpcSources (Json::Value const& arg)
{
Json::Value result;
Json::Value sources (result ["validators_sources"]);
return result;
}
//----------------------------------------------------------------------
//
// Ripple interface
@@ -293,8 +351,8 @@ public:
void receiveValidation (ReceivedValidation const& rv)
{
#if 0
MapType::iterator iter (m_map.find (rv.signerPublicKeyHash));
if (iter != m_map.end ())
MapType::iterator iter (state->map.find (rv.signerPublicKeyHash));
if (iter != state->map.end ())
{
// Exists
//ValidatorInfo& validatorInfo (iter->value ());
@@ -302,7 +360,7 @@ public:
else
{
// New
//ValidatorInfo& validatorInfo (m_map.insert (rv.signerPublicKeyHash));
//ValidatorInfo& validatorInfo (state->map.insert (rv.signerPublicKeyHash));
}
#endif
}
@@ -317,14 +375,6 @@ public:
//
//
//----------------------------------------------------------------------
private:
Store& m_store;
Journal m_journal;
SourcesType m_sources;
MapType m_map;
bool m_rebuildChosenList;
ChosenList::Ptr m_chosenList;
};
}

View File

@@ -108,6 +108,7 @@ public:
, m_checkTimer (this)
, m_checkSources (true) // true to cause a full scan on start
{
addRPCHandlers();
m_thread.start (this);
}
@@ -126,6 +127,41 @@ public:
m_thread.stop (false);
}
//--------------------------------------------------------------------------
//
// RPCService
//
Json::Value rpcPrint (Json::Value const& args)
{
return m_logic.rpcPrint (args);
}
Json::Value rpcRebuild (Json::Value const& args)
{
m_thread.call (&Logic::dirtyChosen, &m_logic);
Json::Value result;
result ["chosen_list"] = "rebuilding";
return result;
}
Json::Value rpcSources (Json::Value const& args)
{
return m_logic.rpcSources(args);
}
void addRPCHandlers()
{
addRPCHandler ("validators_print", beast::bind (
&ManagerImp::rpcPrint, this, beast::_1));
addRPCHandler ("validators_rebuild", beast::bind (
&ManagerImp::rpcRebuild, this, beast::_1));
addRPCHandler ("validators_sources", beast::bind (
&ManagerImp::rpcSources, this, beast::_1));
}
//--------------------------------------------------------------------------
void addStrings (String name, std::vector <std::string> const& strings)
@@ -267,9 +303,9 @@ private:
//------------------------------------------------------------------------------
Manager* Manager::New (Service& parent, Journal journal)
Validators::Manager* Validators::Manager::New (Service& parent, Journal journal)
{
return new ManagerImp (parent, journal);
return new Validators::ManagerImp (parent, journal);
}
}

View File

@@ -22,6 +22,8 @@ class JobQueueLog;
template <> char const* LogPartition::getPartitionName <JobQueueLog> () { return "JobQueue"; }
class NetworkOPsLog;
template <> char const* LogPartition::getPartitionName <NetworkOPsLog> () { return "NetworkOPs"; }
class RPCServiceManagerLog;
template <> char const* LogPartition::getPartitionName <RPCServiceManagerLog> () { return "RPCServiceManager"; }
//
//------------------------------------------------------------------------------
@@ -51,6 +53,9 @@ public:
, m_journal (LogJournal::get <ApplicationLog> ())
, m_tempNodeCache ("NodeCache", 16384, 90)
, m_sleCache ("LedgerEntryCache", 4096, 120)
, m_rpcServiceManager (RPCService::Manager::New (
LogJournal::get <RPCServiceManagerLog> ()))
// The JobQueue has to come pretty early since
// almost everything is a Service child of the JobQueue.
@@ -90,7 +95,9 @@ public:
, m_txQueue (TxQueue::New ())
, m_validators (Validators::Manager::New (*this, LogJournal::get <ValidatorsLog> ()))
, m_validators (m_rpcServiceManager->add (
Validators::Manager::New (*this, LogJournal::get <ValidatorsLog> ())
))
, mFeatures (IFeatures::New (2 * 7 * 24 * 60 * 60, 200)) // two weeks, 200/256
@@ -159,6 +166,11 @@ public:
//--------------------------------------------------------------------------
RPCService::Manager& getRPCServiceManager()
{
return *m_rpcServiceManager;
}
LocalCredentials& getLocalCredentials ()
{
return m_localCredentials ;
@@ -774,6 +786,8 @@ private:
SLECache m_sleCache;
LocalCredentials m_localCredentials;
TransactionMaster m_txMaster;
ScopedPointer <RPCService::Manager> m_rpcServiceManager;
// These are Service-related
ScopedPointer <JobQueue> m_jobQueue;

View File

@@ -57,7 +57,7 @@ public:
public:
struct State
{
// Stuff in here is accessed concurrently and requires a WriteAccess
// Stuff in here is accessed concurrently and requires a Access
};
typedef SharedData <State> SharedState;
@@ -76,9 +76,9 @@ public:
virtual boost::asio::io_service& getIOService () = 0;
virtual RPCService::Manager& getRPCServiceManager() = 0;
virtual NodeCache& getTempNodeCache () = 0;
virtual SLECache& getSLECache () = 0;
virtual Validators::Manager& getValidators () = 0;
virtual IFeatures& getFeatureTable () = 0;
virtual IFeeVote& getFeeVote () = 0;

View File

@@ -3697,7 +3697,16 @@ Json::Value RPCHandler::doRpcCommand (const std::string& strMethod, Json::Value
// Provide the JSON-RPC method as the field "command" in the request.
params["command"] = strMethod;
Json::Value jvResult = doCommand (params, iRole, loadType);
Json::Value jvResult;
#if RIPPLE_USE_RPC_SERVICE_MANAGER
std::pair <bool, Json::Value> result (getApp().
getRPCServiceManager().call (strMethod, params));
if (result.first)
jvResult = result.second;
else
#endif
jvResult = doCommand (params, iRole, loadType);
// Always report "status". On an error report the request as received.
if (jvResult.isMember ("error"))

View File

@@ -803,18 +803,19 @@ public:
// Convert a rpc method and params to a request.
// <-- { method: xyz, params: [... ] } or { error: ..., ... }
Json::Value parseCommand (std::string strMethod, Json::Value jvParams)
Json::Value parseCommand (std::string strMethod, Json::Value jvParams, bool allowAnyCommand)
{
WriteLog (lsTRACE, RPCParser) << "RPC method:" << strMethod;
WriteLog (lsTRACE, RPCParser) << "RPC params:" << jvParams;
static struct
struct Command
{
const char* pCommand;
parseFuncPtr pfpFunc;
int iMinParams;
int iMaxParams;
} commandsA[] =
};
static Command commandsA[] =
{
// Request-response methods
// - Returns an error, or the request.
@@ -897,7 +898,10 @@ public:
if (i < 0)
{
return rpcError (rpcUNKNOWN_COMMAND);
if (!allowAnyCommand)
return rpcError (rpcUNKNOWN_COMMAND);
return parseAsIs (jvParams);
}
else if ((commandsA[i].iMinParams >= 0 && jvParams.size () < commandsA[i].iMinParams)
|| (commandsA[i].iMaxParams >= 0 && jvParams.size () > commandsA[i].iMaxParams))
@@ -1004,7 +1008,7 @@ int RPCCall::fromCommandLine (const std::vector<std::string>& vCmd)
jvRpc["method"] = vCmd[0];
jvRpc["params"] = jvRpcParams;
jvRequest = rpParser.parseCommand (vCmd[0], jvRpcParams);
jvRequest = rpParser.parseCommand (vCmd[0], jvRpcParams, true);
WriteLog (lsTRACE, RPCParser) << "RPC Request: " << jvRequest << std::endl;