Refactor InfoSub to remove NetworkOPs dependency

This commit is contained in:
Vinnie Falco
2013-09-01 09:34:03 -07:00
parent 01fda4c30e
commit 35598d7933
11 changed files with 125 additions and 48 deletions

View File

@@ -557,7 +557,8 @@ public:
m_mainService, m_mainService,
m_peerSSLContext->get ()); m_peerSSLContext->get ());
#else #else
WriteLog (lsWARNING, Application) << "Peer PROXY interface: configured but disabled by build configuration."; WriteLog (lsWARNING, Application) <<
"Peer PROXY interface: configured but disabled by build configuration.";
#endif #endif
} }
} }
@@ -583,7 +584,7 @@ public:
// //
if (!getConfig ().WEBSOCKET_IP.empty () && getConfig ().WEBSOCKET_PORT) if (!getConfig ().WEBSOCKET_IP.empty () && getConfig ().WEBSOCKET_PORT)
{ {
m_wsPrivateDoor = WSDoor::New (getConfig ().WEBSOCKET_IP, m_wsPrivateDoor = WSDoor::New (getOPs(), getConfig ().WEBSOCKET_IP,
getConfig ().WEBSOCKET_PORT, false, m_wsSSLContext->get ()); getConfig ().WEBSOCKET_PORT, false, m_wsSSLContext->get ());
if (m_wsPrivateDoor == nullptr) if (m_wsPrivateDoor == nullptr)
@@ -601,7 +602,7 @@ public:
// //
if (!getConfig ().WEBSOCKET_PUBLIC_IP.empty () && getConfig ().WEBSOCKET_PUBLIC_PORT) if (!getConfig ().WEBSOCKET_PUBLIC_IP.empty () && getConfig ().WEBSOCKET_PUBLIC_PORT)
{ {
m_wsPublicDoor = WSDoor::New (getConfig ().WEBSOCKET_PUBLIC_IP, m_wsPublicDoor = WSDoor::New (getOPs(), getConfig ().WEBSOCKET_PUBLIC_IP,
getConfig ().WEBSOCKET_PUBLIC_PORT, true, m_wsSSLContext->get ()); getConfig ().WEBSOCKET_PUBLIC_PORT, true, m_wsSSLContext->get ());
if (m_wsPublicDoor == nullptr) if (m_wsPublicDoor == nullptr)

View File

@@ -13,9 +13,22 @@
class Peer; class Peer;
class LedgerConsensus; class LedgerConsensus;
/** Provides server functionality for clients.
Clients include backend applications, local commands, and connected
clients. This class acts as a proxy, fulfilling the command with local
data if possible, or asking the network and returning the results if
needed.
A backend application or local client can trust a local instance of
rippled / NetworkOPs. However, client software connecting to non-local
instances of rippled will need to be hardened to protect against hostile
or unreliable servers.
*/
class NetworkOPs class NetworkOPs
: public DeadlineTimer::Listener : public DeadlineTimer::Listener
, LeakChecked <NetworkOPs> , public InfoSub::Source
, public LeakChecked <NetworkOPs>
{ {
public: public:
enum Fault enum Fault
@@ -355,9 +368,9 @@ public:
void pubLedger (Ledger::ref lpAccepted); void pubLedger (Ledger::ref lpAccepted);
void pubProposedTransaction (Ledger::ref lpCurrent, SerializedTransaction::ref stTxn, TER terResult); void pubProposedTransaction (Ledger::ref lpCurrent, SerializedTransaction::ref stTxn, TER terResult);
//--------------------------------------------------------------------------
// //
// Monitoring: subscriber side // InfoSub::Source
// //
void subAccount (InfoSub::ref ispListener, const boost::unordered_set<RippleAddress>& vnaAccountIDs, uint32 uLedgerIndex, bool rt); void subAccount (InfoSub::ref ispListener, const boost::unordered_set<RippleAddress>& vnaAccountIDs, uint32 uLedgerIndex, bool rt);
void unsubAccount (uint64 uListener, const boost::unordered_set<RippleAddress>& vnaAccountIDs, bool rt); void unsubAccount (uint64 uListener, const boost::unordered_set<RippleAddress>& vnaAccountIDs, bool rt);
@@ -382,6 +395,9 @@ public:
InfoSub::pointer findRpcSub (const std::string& strUrl); InfoSub::pointer findRpcSub (const std::string& strUrl);
InfoSub::pointer addRpcSub (const std::string& strUrl, InfoSub::ref rspEntry); InfoSub::pointer addRpcSub (const std::string& strUrl, InfoSub::ref rspEntry);
//
//--------------------------------------------------------------------------
private: private:
void setHeartbeatTimer (); void setHeartbeatTimer ();
void setClusterTimer (); void setClusterTimer ();

View File

@@ -17,8 +17,9 @@
// VFALCO TODO Figure out how to clean up these globals // VFALCO TODO Figure out how to clean up these globals
InfoSub::InfoSub () InfoSub::InfoSub (Source& source)
: mLock (this, "InfoSub", __FILE__, __LINE__) : m_source (source)
, mLock (this, "InfoSub", __FILE__, __LINE__)
{ {
static Atomic <int> s_seq_id; static Atomic <int> s_seq_id;
mSeq = ++s_seq_id; mSeq = ++s_seq_id;
@@ -26,13 +27,12 @@ InfoSub::InfoSub ()
InfoSub::~InfoSub () InfoSub::~InfoSub ()
{ {
NetworkOPs& ops = getApp().getOPs (); m_source.unsubTransactions (mSeq);
ops.unsubTransactions (mSeq); m_source.unsubRTTransactions (mSeq);
ops.unsubRTTransactions (mSeq); m_source.unsubLedger (mSeq);
ops.unsubLedger (mSeq); m_source.unsubServer (mSeq);
ops.unsubServer (mSeq); m_source.unsubAccount (mSeq, mSubAccountInfo, true);
ops.unsubAccount (mSeq, mSubAccountInfo, true); m_source.unsubAccount (mSeq, mSubAccountInfo, false);
ops.unsubAccount (mSeq, mSubAccountInfo, false);
} }
void InfoSub::send (const Json::Value& jvObj, const std::string& sObj, bool broadcast) void InfoSub::send (const Json::Value& jvObj, const std::string& sObj, bool broadcast)

View File

@@ -12,6 +12,8 @@
class PathRequest; class PathRequest;
/** Manages a client's subscription to data feeds.
*/
class InfoSub class InfoSub
: public CountedObject <InfoSub> : public CountedObject <InfoSub>
{ {
@@ -26,7 +28,58 @@ public:
typedef const boost::shared_ptr<InfoSub>& ref; typedef const boost::shared_ptr<InfoSub>& ref;
public: public:
InfoSub (); /** Abstracts the source of subscription data.
*/
class Source
{
public:
// VFALCO TODO Rename the 'rt' parameters to something meaningful.
virtual void subAccount (ref ispListener,
const boost::unordered_set<RippleAddress>& vnaAccountIDs,
uint32 uLedgerIndex, bool rt) = 0;
virtual void unsubAccount (uint64 uListener,
const boost::unordered_set<RippleAddress>& vnaAccountIDs,
bool rt) = 0;
// VFALCO TODO Document the bool return value
virtual bool subLedger (ref ispListener,
Json::Value& jvResult) = 0;
virtual bool unsubLedger (uint64 uListener) = 0;
virtual bool subServer (ref ispListener,
Json::Value& jvResult) = 0;
virtual bool unsubServer (uint64 uListener) = 0;
virtual bool subBook (ref ispListener,
const uint160& currencyPays, const uint160& currencyGets,
const uint160& issuerPays, const uint160& issuerGets) = 0;
virtual bool unsubBook (uint64 uListener,
const uint160& currencyPays, const uint160& currencyGets,
const uint160& issuerPays, const uint160& issuerGets) = 0;
virtual bool subTransactions (ref ispListener) = 0;
virtual bool unsubTransactions (uint64 uListener) = 0;
virtual bool subRTTransactions (ref ispListener) = 0;
virtual bool unsubRTTransactions (uint64 uListener) = 0;
// VFALCO TODO Remove
// This was added for one particular partner, it
// "pushes" subscription data to a particular URL.
//
virtual pointer findRpcSub (const std::string& strUrl) = 0;
virtual pointer addRpcSub (const std::string& strUrl, ref rspEntry) = 0;
};
public:
explicit InfoSub (Source& source);
virtual ~InfoSub (); virtual ~InfoSub ();
@@ -53,6 +106,7 @@ protected:
LockType mLock; LockType mLock;
private: private:
Source& m_source;
boost::unordered_set <RippleAddress> mSubAccountInfo; boost::unordered_set <RippleAddress> mSubAccountInfo;
boost::unordered_set <RippleAddress> mSubAccountTransaction; boost::unordered_set <RippleAddress> mSubAccountTransaction;
boost::shared_ptr <PathRequest> mPathRequest; boost::shared_ptr <PathRequest> mPathRequest;

View File

@@ -3233,8 +3233,9 @@ Json::Value RPCHandler::doSubscribe (Json::Value params, LoadType* loadType, App
{ {
WriteLog (lsDEBUG, RPCHandler) << boost::str (boost::format ("doSubscribe: building: %s") % strUrl); WriteLog (lsDEBUG, RPCHandler) << boost::str (boost::format ("doSubscribe: building: %s") % strUrl);
RPCSub::pointer rspSub = boost::make_shared<RPCSub> (getApp ().getIOService (), RPCSub::pointer rspSub = boost::make_shared <RPCSub> (
getApp ().getJobQueue (), strUrl, strUsername, strPassword); getApp ().getOPs (), getApp ().getIOService (),
getApp ().getJobQueue (), strUrl, strUsername, strPassword);
ispSub = mNetOps->addRpcSub (strUrl, boost::dynamic_pointer_cast<InfoSub> (rspSub)); ispSub = mNetOps->addRpcSub (strUrl, boost::dynamic_pointer_cast<InfoSub> (rspSub));
} }
else else

View File

@@ -6,9 +6,11 @@
SETUP_LOG (RPCSub) SETUP_LOG (RPCSub)
RPCSub::RPCSub (boost::asio::io_service& io_service, JobQueue& jobQueue, RPCSub::RPCSub (InfoSub::Source& source, boost::asio::io_service& io_service,
const std::string& strUrl, const std::string& strUsername, const std::string& strPassword) JobQueue& jobQueue, const std::string& strUrl, const std::string& strUsername,
: m_io_service (io_service) const std::string& strPassword)
: InfoSub (source)
, m_io_service (io_service)
, m_jobQueue (jobQueue) , m_jobQueue (jobQueue)
, mUrl (strUrl) , mUrl (strUrl)
, mSSL (false) , mSSL (false)

View File

@@ -4,28 +4,28 @@
*/ */
//============================================================================== //==============================================================================
#ifndef __RPCSUB__ #ifndef RIPPLE_RPCSUB_H_INCLUDED
#define __RPCSUB__ #define RIPPLE_RPCSUB_H_INCLUDED
// VFALCO TODO replace this macro with a language constant
#define RPC_EVENT_QUEUE_MAX 32 #define RPC_EVENT_QUEUE_MAX 32
// Subscription object for JSON-RPC // Subscription object for JSON-RPC
// VFALCO TODO Move the implementation into the .cpp
//
class RPCSub class RPCSub
: public InfoSub : public InfoSub
, LeakChecked <RPCSub> , public LeakChecked <RPCSub>
{ {
public: public:
typedef boost::shared_ptr<RPCSub> pointer; typedef boost::shared_ptr<RPCSub> pointer;
typedef const pointer& ref; typedef const pointer& ref;
RPCSub (boost::asio::io_service& io_service, RPCSub (InfoSub::Source& source, boost::asio::io_service& io_service,
JobQueue& jobQueue, const std::string& strUrl, JobQueue& jobQueue, const std::string& strUrl,
const std::string& strUsername, const std::string& strPassword); const std::string& strUsername, const std::string& strPassword);
virtual ~RPCSub () virtual ~RPCSub () { }
{
;
}
// Implement overridden functions from base class: // Implement overridden functions from base class:
void send (const Json::Value& jvObj, bool broadcast); void send (const Json::Value& jvObj, bool broadcast);
@@ -67,4 +67,3 @@ private:
}; };
#endif #endif
// vim:ts=4

View File

@@ -14,7 +14,6 @@ struct WSConnectionLog;
// Helps with naming the lock // Helps with naming the lock
struct WSConnectionBase struct WSConnectionBase
{ {
}; };
template <typename endpoint_type> template <typename endpoint_type>
@@ -39,12 +38,10 @@ public:
typedef typename endpoint_type::handler::message_ptr message_ptr; typedef typename endpoint_type::handler::message_ptr message_ptr;
public: public:
// WSConnection() WSConnection (InfoSub::Source& source, WSServerHandler<endpoint_type>* wshpHandler,
// : mHandler((WSServerHandler<websocketpp::WSDOOR_SERVER>*)(NULL)), const connection_ptr& cpConnection)
// mConnection(connection_ptr()) { ; } : InfoSub (source)
, mRcvQueueLock (static_cast<WSConnectionBase const*>(this), "WSConn", __FILE__, __LINE__)
WSConnection (WSServerHandler<endpoint_type>* wshpHandler, const connection_ptr& cpConnection)
: mRcvQueueLock (static_cast<WSConnectionBase const*>(this), "WSConn", __FILE__, __LINE__)
, mHandler (wshpHandler), mConnection (cpConnection), mNetwork (getApp().getOPs ()), , mHandler (wshpHandler), mConnection (cpConnection), mNetwork (getApp().getOPs ()),
mRemoteIP (cpConnection->get_socket ().lowest_layer ().remote_endpoint ().address ().to_string ()), mRemoteIP (cpConnection->get_socket ().lowest_layer ().remote_endpoint ().address ().to_string ()),
mLoadSource (mRemoteIP), mPingTimer (cpConnection->get_io_service ()), mPinged (false), mLoadSource (mRemoteIP), mPingTimer (cpConnection->get_io_service ()), mPinged (false),

View File

@@ -25,9 +25,11 @@ SETUP_LOG (WSDoor)
class WSDoorImp : public WSDoor, protected Thread, LeakChecked <WSDoorImp> class WSDoorImp : public WSDoor, protected Thread, LeakChecked <WSDoorImp>
{ {
public: public:
WSDoorImp (std::string const& strIp, int iPort, bool bPublic, WSDoorImp (InfoSub::Source& source,
std::string const& strIp, int iPort, bool bPublic,
boost::asio::ssl::context& ssl_context) boost::asio::ssl::context& ssl_context)
: Thread ("websocket") : Thread ("websocket")
, m_source (source)
, m_ssl_context (ssl_context) , m_ssl_context (ssl_context)
, m_endpointLock (this, "WSDoor", __FILE__, __LINE__) , m_endpointLock (this, "WSDoor", __FILE__, __LINE__)
, mPublic (bPublic) , mPublic (bPublic)
@@ -58,7 +60,7 @@ private:
(mPublic ? "Public" : "Private") % mIp % mPort); (mPublic ? "Public" : "Private") % mIp % mPort);
websocketpp::server_autotls::handler::ptr handler ( websocketpp::server_autotls::handler::ptr handler (
new WSServerHandler <websocketpp::server_autotls> ( new WSServerHandler <websocketpp::server_autotls> (m_source,
m_ssl_context, mPublic)); m_ssl_context, mPublic));
{ {
@@ -105,6 +107,7 @@ private:
typedef RippleRecursiveMutex LockType; typedef RippleRecursiveMutex LockType;
typedef LockType::ScopedLockType ScopedLockType; typedef LockType::ScopedLockType ScopedLockType;
InfoSub::Source& m_source;
boost::asio::ssl::context& m_ssl_context; boost::asio::ssl::context& m_ssl_context;
LockType m_endpointLock; LockType m_endpointLock;
@@ -116,14 +119,14 @@ private:
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
WSDoor* WSDoor::New (std::string const& strIp, int iPort, bool bPublic, WSDoor* WSDoor::New (InfoSub::Source& source, std::string const& strIp,
boost::asio::ssl::context& ssl_context) int iPort, bool bPublic, boost::asio::ssl::context& ssl_context)
{ {
ScopedPointer <WSDoor> door; ScopedPointer <WSDoor> door;
try try
{ {
door = new WSDoorImp (strIp, iPort, bPublic, ssl_context); door = new WSDoorImp (source, strIp, iPort, bPublic, ssl_context);
} }
catch (...) catch (...)
{ {

View File

@@ -11,8 +11,8 @@
class WSDoor class WSDoor
{ {
public: public:
static WSDoor* New (std::string const& strIp, int iPort, bool bPublic, static WSDoor* New (InfoSub::Source& source, std::string const& strIp,
boost::asio::ssl::context& ssl_context); int iPort, bool bPublic, boost::asio::ssl::context& ssl_context);
}; };
#endif #endif

View File

@@ -41,7 +41,11 @@ public:
crTooSlow = 4000, // Client is too slow. crTooSlow = 4000, // Client is too slow.
}; };
private:
InfoSub::Source& m_source;
protected: protected:
// VFALCO TODO Make this private.
typedef RippleMutex LockType; typedef RippleMutex LockType;
typedef LockType::ScopedLockType ScopedLockType; typedef LockType::ScopedLockType ScopedLockType;
LockType mLock; LockType mLock;
@@ -50,15 +54,15 @@ private:
boost::asio::ssl::context& m_ssl_context; boost::asio::ssl::context& m_ssl_context;
protected: protected:
// For each connection maintain an associated object to track subscriptions. // For each connection maintain an associated object to track subscriptions.
boost::unordered_map <connection_ptr, boost::unordered_map <connection_ptr,
boost::shared_ptr <WSConnection <endpoint_type> > > mMap; boost::shared_ptr <WSConnection <endpoint_type> > > mMap;
bool mPublic; bool mPublic;
public: public:
WSServerHandler (boost::asio::ssl::context& ssl_context, bool bPublic) WSServerHandler (InfoSub::Source& source, boost::asio::ssl::context& ssl_context, bool bPublic)
: mLock (static_cast <WSServerHandlerBase*> (this), "WSServerHandler", __FILE__, __LINE__) : m_source (source)
, mLock (static_cast <WSServerHandlerBase*> (this), "WSServerHandler", __FILE__, __LINE__)
, m_ssl_context (ssl_context) , m_ssl_context (ssl_context)
, mPublic (bPublic) , mPublic (bPublic)
{ {
@@ -168,7 +172,7 @@ public:
try try
{ {
mMap[cpClient] = boost::make_shared< WSConnection<endpoint_type> > (this, cpClient); mMap[cpClient] = boost::make_shared< WSConnection<endpoint_type> > (m_source, this, cpClient);
} }
catch (...) catch (...)
{ {