From 35598d7933489111bcc0182525a4748fcc4680ce Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Sun, 1 Sep 2013 09:34:03 -0700 Subject: [PATCH] Refactor InfoSub to remove NetworkOPs dependency --- .../ripple_app/main/ripple_Application.cpp | 7 ++- modules/ripple_app/misc/NetworkOPs.h | 22 +++++++- modules/ripple_app/misc/ripple_InfoSub.cpp | 18 +++--- modules/ripple_app/misc/ripple_InfoSub.h | 56 ++++++++++++++++++- modules/ripple_app/rpc/RPCHandler.cpp | 5 +- modules/ripple_app/rpc/RPCSub.cpp | 8 ++- modules/ripple_app/rpc/RPCSub.h | 17 +++--- modules/ripple_app/websocket/WSConnection.h | 11 ++-- modules/ripple_app/websocket/WSDoor.cpp | 13 +++-- modules/ripple_app/websocket/WSDoor.h | 4 +- .../ripple_app/websocket/WSServerHandler.h | 12 ++-- 11 files changed, 125 insertions(+), 48 deletions(-) diff --git a/modules/ripple_app/main/ripple_Application.cpp b/modules/ripple_app/main/ripple_Application.cpp index 2547944ae0..0acb983854 100644 --- a/modules/ripple_app/main/ripple_Application.cpp +++ b/modules/ripple_app/main/ripple_Application.cpp @@ -557,7 +557,8 @@ public: m_mainService, m_peerSSLContext->get ()); #else - WriteLog (lsWARNING, Application) << "Peer PROXY interface: configured but disabled by build configuration."; + WriteLog (lsWARNING, Application) << + "Peer PROXY interface: configured but disabled by build configuration."; #endif } } @@ -583,7 +584,7 @@ public: // if (!getConfig ().WEBSOCKET_IP.empty () && getConfig ().WEBSOCKET_PORT) { - m_wsPrivateDoor = WSDoor::New (getConfig ().WEBSOCKET_IP, + m_wsPrivateDoor = WSDoor::New (getOPs(), getConfig ().WEBSOCKET_IP, getConfig ().WEBSOCKET_PORT, false, m_wsSSLContext->get ()); if (m_wsPrivateDoor == nullptr) @@ -601,7 +602,7 @@ public: // if (!getConfig ().WEBSOCKET_PUBLIC_IP.empty () && getConfig ().WEBSOCKET_PUBLIC_PORT) { - m_wsPublicDoor = WSDoor::New (getConfig ().WEBSOCKET_PUBLIC_IP, + m_wsPublicDoor = WSDoor::New (getOPs(), getConfig ().WEBSOCKET_PUBLIC_IP, getConfig ().WEBSOCKET_PUBLIC_PORT, true, m_wsSSLContext->get ()); if (m_wsPublicDoor == nullptr) diff --git a/modules/ripple_app/misc/NetworkOPs.h b/modules/ripple_app/misc/NetworkOPs.h index 3d66506d50..64ee3bfcb8 100644 --- a/modules/ripple_app/misc/NetworkOPs.h +++ b/modules/ripple_app/misc/NetworkOPs.h @@ -13,9 +13,22 @@ class Peer; class LedgerConsensus; +/** Provides server functionality for clients. + + Clients include backend applications, local commands, and connected + clients. This class acts as a proxy, fulfilling the command with local + data if possible, or asking the network and returning the results if + needed. + + A backend application or local client can trust a local instance of + rippled / NetworkOPs. However, client software connecting to non-local + instances of rippled will need to be hardened to protect against hostile + or unreliable servers. +*/ class NetworkOPs : public DeadlineTimer::Listener - , LeakChecked + , public InfoSub::Source + , public LeakChecked { public: enum Fault @@ -355,9 +368,9 @@ public: void pubLedger (Ledger::ref lpAccepted); void pubProposedTransaction (Ledger::ref lpCurrent, SerializedTransaction::ref stTxn, TER terResult); - + //-------------------------------------------------------------------------- // - // Monitoring: subscriber side + // InfoSub::Source // void subAccount (InfoSub::ref ispListener, const boost::unordered_set& vnaAccountIDs, uint32 uLedgerIndex, bool rt); void unsubAccount (uint64 uListener, const boost::unordered_set& vnaAccountIDs, bool rt); @@ -382,6 +395,9 @@ public: InfoSub::pointer findRpcSub (const std::string& strUrl); InfoSub::pointer addRpcSub (const std::string& strUrl, InfoSub::ref rspEntry); + // + //-------------------------------------------------------------------------- + private: void setHeartbeatTimer (); void setClusterTimer (); diff --git a/modules/ripple_app/misc/ripple_InfoSub.cpp b/modules/ripple_app/misc/ripple_InfoSub.cpp index f31469a037..709218c5de 100644 --- a/modules/ripple_app/misc/ripple_InfoSub.cpp +++ b/modules/ripple_app/misc/ripple_InfoSub.cpp @@ -17,8 +17,9 @@ // VFALCO TODO Figure out how to clean up these globals -InfoSub::InfoSub () - : mLock (this, "InfoSub", __FILE__, __LINE__) +InfoSub::InfoSub (Source& source) + : m_source (source) + , mLock (this, "InfoSub", __FILE__, __LINE__) { static Atomic s_seq_id; mSeq = ++s_seq_id; @@ -26,13 +27,12 @@ InfoSub::InfoSub () InfoSub::~InfoSub () { - NetworkOPs& ops = getApp().getOPs (); - ops.unsubTransactions (mSeq); - ops.unsubRTTransactions (mSeq); - ops.unsubLedger (mSeq); - ops.unsubServer (mSeq); - ops.unsubAccount (mSeq, mSubAccountInfo, true); - ops.unsubAccount (mSeq, mSubAccountInfo, false); + m_source.unsubTransactions (mSeq); + m_source.unsubRTTransactions (mSeq); + m_source.unsubLedger (mSeq); + m_source.unsubServer (mSeq); + m_source.unsubAccount (mSeq, mSubAccountInfo, true); + m_source.unsubAccount (mSeq, mSubAccountInfo, false); } void InfoSub::send (const Json::Value& jvObj, const std::string& sObj, bool broadcast) diff --git a/modules/ripple_app/misc/ripple_InfoSub.h b/modules/ripple_app/misc/ripple_InfoSub.h index a4b1b5eb43..0a6bf1c5c2 100644 --- a/modules/ripple_app/misc/ripple_InfoSub.h +++ b/modules/ripple_app/misc/ripple_InfoSub.h @@ -12,6 +12,8 @@ class PathRequest; +/** Manages a client's subscription to data feeds. +*/ class InfoSub : public CountedObject { @@ -26,7 +28,58 @@ public: typedef const boost::shared_ptr& ref; public: - InfoSub (); + /** Abstracts the source of subscription data. + */ + class Source + { + public: + // VFALCO TODO Rename the 'rt' parameters to something meaningful. + virtual void subAccount (ref ispListener, + const boost::unordered_set& vnaAccountIDs, + uint32 uLedgerIndex, bool rt) = 0; + + virtual void unsubAccount (uint64 uListener, + const boost::unordered_set& vnaAccountIDs, + bool rt) = 0; + + // VFALCO TODO Document the bool return value + virtual bool subLedger (ref ispListener, + Json::Value& jvResult) = 0; + + virtual bool unsubLedger (uint64 uListener) = 0; + + virtual bool subServer (ref ispListener, + Json::Value& jvResult) = 0; + + virtual bool unsubServer (uint64 uListener) = 0; + + virtual bool subBook (ref ispListener, + const uint160& currencyPays, const uint160& currencyGets, + const uint160& issuerPays, const uint160& issuerGets) = 0; + + virtual bool unsubBook (uint64 uListener, + const uint160& currencyPays, const uint160& currencyGets, + const uint160& issuerPays, const uint160& issuerGets) = 0; + + virtual bool subTransactions (ref ispListener) = 0; + + virtual bool unsubTransactions (uint64 uListener) = 0; + + virtual bool subRTTransactions (ref ispListener) = 0; + + virtual bool unsubRTTransactions (uint64 uListener) = 0; + + // VFALCO TODO Remove + // This was added for one particular partner, it + // "pushes" subscription data to a particular URL. + // + virtual pointer findRpcSub (const std::string& strUrl) = 0; + + virtual pointer addRpcSub (const std::string& strUrl, ref rspEntry) = 0; + }; + +public: + explicit InfoSub (Source& source); virtual ~InfoSub (); @@ -53,6 +106,7 @@ protected: LockType mLock; private: + Source& m_source; boost::unordered_set mSubAccountInfo; boost::unordered_set mSubAccountTransaction; boost::shared_ptr mPathRequest; diff --git a/modules/ripple_app/rpc/RPCHandler.cpp b/modules/ripple_app/rpc/RPCHandler.cpp index ce1842f870..d50864a99d 100644 --- a/modules/ripple_app/rpc/RPCHandler.cpp +++ b/modules/ripple_app/rpc/RPCHandler.cpp @@ -3233,8 +3233,9 @@ Json::Value RPCHandler::doSubscribe (Json::Value params, LoadType* loadType, App { WriteLog (lsDEBUG, RPCHandler) << boost::str (boost::format ("doSubscribe: building: %s") % strUrl); - RPCSub::pointer rspSub = boost::make_shared (getApp ().getIOService (), - getApp ().getJobQueue (), strUrl, strUsername, strPassword); + RPCSub::pointer rspSub = boost::make_shared ( + getApp ().getOPs (), getApp ().getIOService (), + getApp ().getJobQueue (), strUrl, strUsername, strPassword); ispSub = mNetOps->addRpcSub (strUrl, boost::dynamic_pointer_cast (rspSub)); } else diff --git a/modules/ripple_app/rpc/RPCSub.cpp b/modules/ripple_app/rpc/RPCSub.cpp index b322af0d92..67309659bf 100644 --- a/modules/ripple_app/rpc/RPCSub.cpp +++ b/modules/ripple_app/rpc/RPCSub.cpp @@ -6,9 +6,11 @@ SETUP_LOG (RPCSub) -RPCSub::RPCSub (boost::asio::io_service& io_service, JobQueue& jobQueue, - const std::string& strUrl, const std::string& strUsername, const std::string& strPassword) - : m_io_service (io_service) +RPCSub::RPCSub (InfoSub::Source& source, boost::asio::io_service& io_service, + JobQueue& jobQueue, const std::string& strUrl, const std::string& strUsername, + const std::string& strPassword) + : InfoSub (source) + , m_io_service (io_service) , m_jobQueue (jobQueue) , mUrl (strUrl) , mSSL (false) diff --git a/modules/ripple_app/rpc/RPCSub.h b/modules/ripple_app/rpc/RPCSub.h index 539999187a..722522dca6 100644 --- a/modules/ripple_app/rpc/RPCSub.h +++ b/modules/ripple_app/rpc/RPCSub.h @@ -4,28 +4,28 @@ */ //============================================================================== -#ifndef __RPCSUB__ -#define __RPCSUB__ +#ifndef RIPPLE_RPCSUB_H_INCLUDED +#define RIPPLE_RPCSUB_H_INCLUDED +// VFALCO TODO replace this macro with a language constant #define RPC_EVENT_QUEUE_MAX 32 // Subscription object for JSON-RPC +// VFALCO TODO Move the implementation into the .cpp +// class RPCSub : public InfoSub - , LeakChecked + , public LeakChecked { public: typedef boost::shared_ptr pointer; typedef const pointer& ref; - RPCSub (boost::asio::io_service& io_service, + RPCSub (InfoSub::Source& source, boost::asio::io_service& io_service, JobQueue& jobQueue, const std::string& strUrl, const std::string& strUsername, const std::string& strPassword); - virtual ~RPCSub () - { - ; - } + virtual ~RPCSub () { } // Implement overridden functions from base class: void send (const Json::Value& jvObj, bool broadcast); @@ -67,4 +67,3 @@ private: }; #endif -// vim:ts=4 diff --git a/modules/ripple_app/websocket/WSConnection.h b/modules/ripple_app/websocket/WSConnection.h index 3d6e7d1d4c..9343ce8adb 100644 --- a/modules/ripple_app/websocket/WSConnection.h +++ b/modules/ripple_app/websocket/WSConnection.h @@ -14,7 +14,6 @@ struct WSConnectionLog; // Helps with naming the lock struct WSConnectionBase { - }; template @@ -39,12 +38,10 @@ public: typedef typename endpoint_type::handler::message_ptr message_ptr; public: - // WSConnection() - // : mHandler((WSServerHandler*)(NULL)), - // mConnection(connection_ptr()) { ; } - - WSConnection (WSServerHandler* wshpHandler, const connection_ptr& cpConnection) - : mRcvQueueLock (static_cast(this), "WSConn", __FILE__, __LINE__) + WSConnection (InfoSub::Source& source, WSServerHandler* wshpHandler, + const connection_ptr& cpConnection) + : InfoSub (source) + , mRcvQueueLock (static_cast(this), "WSConn", __FILE__, __LINE__) , mHandler (wshpHandler), mConnection (cpConnection), mNetwork (getApp().getOPs ()), mRemoteIP (cpConnection->get_socket ().lowest_layer ().remote_endpoint ().address ().to_string ()), mLoadSource (mRemoteIP), mPingTimer (cpConnection->get_io_service ()), mPinged (false), diff --git a/modules/ripple_app/websocket/WSDoor.cpp b/modules/ripple_app/websocket/WSDoor.cpp index ba5e980ce1..39263ed04f 100644 --- a/modules/ripple_app/websocket/WSDoor.cpp +++ b/modules/ripple_app/websocket/WSDoor.cpp @@ -25,9 +25,11 @@ SETUP_LOG (WSDoor) class WSDoorImp : public WSDoor, protected Thread, LeakChecked { public: - WSDoorImp (std::string const& strIp, int iPort, bool bPublic, + WSDoorImp (InfoSub::Source& source, + std::string const& strIp, int iPort, bool bPublic, boost::asio::ssl::context& ssl_context) : Thread ("websocket") + , m_source (source) , m_ssl_context (ssl_context) , m_endpointLock (this, "WSDoor", __FILE__, __LINE__) , mPublic (bPublic) @@ -58,7 +60,7 @@ private: (mPublic ? "Public" : "Private") % mIp % mPort); websocketpp::server_autotls::handler::ptr handler ( - new WSServerHandler ( + new WSServerHandler (m_source, m_ssl_context, mPublic)); { @@ -105,6 +107,7 @@ private: typedef RippleRecursiveMutex LockType; typedef LockType::ScopedLockType ScopedLockType; + InfoSub::Source& m_source; boost::asio::ssl::context& m_ssl_context; LockType m_endpointLock; @@ -116,14 +119,14 @@ private: //------------------------------------------------------------------------------ -WSDoor* WSDoor::New (std::string const& strIp, int iPort, bool bPublic, - boost::asio::ssl::context& ssl_context) +WSDoor* WSDoor::New (InfoSub::Source& source, std::string const& strIp, + int iPort, bool bPublic, boost::asio::ssl::context& ssl_context) { ScopedPointer door; try { - door = new WSDoorImp (strIp, iPort, bPublic, ssl_context); + door = new WSDoorImp (source, strIp, iPort, bPublic, ssl_context); } catch (...) { diff --git a/modules/ripple_app/websocket/WSDoor.h b/modules/ripple_app/websocket/WSDoor.h index 36aeb9002b..7909aa7106 100644 --- a/modules/ripple_app/websocket/WSDoor.h +++ b/modules/ripple_app/websocket/WSDoor.h @@ -11,8 +11,8 @@ class WSDoor { public: - static WSDoor* New (std::string const& strIp, int iPort, bool bPublic, - boost::asio::ssl::context& ssl_context); + static WSDoor* New (InfoSub::Source& source, std::string const& strIp, + int iPort, bool bPublic, boost::asio::ssl::context& ssl_context); }; #endif diff --git a/modules/ripple_app/websocket/WSServerHandler.h b/modules/ripple_app/websocket/WSServerHandler.h index d388b0c528..dc34cfa6fd 100644 --- a/modules/ripple_app/websocket/WSServerHandler.h +++ b/modules/ripple_app/websocket/WSServerHandler.h @@ -41,7 +41,11 @@ public: crTooSlow = 4000, // Client is too slow. }; +private: + InfoSub::Source& m_source; + protected: + // VFALCO TODO Make this private. typedef RippleMutex LockType; typedef LockType::ScopedLockType ScopedLockType; LockType mLock; @@ -50,15 +54,15 @@ private: boost::asio::ssl::context& m_ssl_context; protected: - // For each connection maintain an associated object to track subscriptions. boost::unordered_map > > mMap; bool mPublic; public: - WSServerHandler (boost::asio::ssl::context& ssl_context, bool bPublic) - : mLock (static_cast (this), "WSServerHandler", __FILE__, __LINE__) + WSServerHandler (InfoSub::Source& source, boost::asio::ssl::context& ssl_context, bool bPublic) + : m_source (source) + , mLock (static_cast (this), "WSServerHandler", __FILE__, __LINE__) , m_ssl_context (ssl_context) , mPublic (bPublic) { @@ -168,7 +172,7 @@ public: try { - mMap[cpClient] = boost::make_shared< WSConnection > (this, cpClient); + mMap[cpClient] = boost::make_shared< WSConnection > (m_source, this, cpClient); } catch (...) {