diff --git a/modules/ripple_app/rpc/RPCHandler.cpp b/modules/ripple_app/rpc/RPCHandler.cpp index 574193205..e55a35c98 100644 --- a/modules/ripple_app/rpc/RPCHandler.cpp +++ b/modules/ripple_app/rpc/RPCHandler.cpp @@ -3233,9 +3233,9 @@ Json::Value RPCHandler::doSubscribe (Json::Value params, LoadType* loadType, App { WriteLog (lsDEBUG, RPCHandler) << boost::str (boost::format ("doSubscribe: building: %s") % strUrl); - RPCSub::pointer rspSub = boost::make_shared ( - getApp ().getOPs (), getApp ().getIOService (), - getApp ().getJobQueue (), strUrl, strUsername, strPassword); + RPCSub::pointer rspSub = RPCSub::New (getApp ().getOPs (), + getApp ().getIOService (), getApp ().getJobQueue (), + strUrl, strUsername, strPassword); ispSub = mNetOps->addRpcSub (strUrl, boost::dynamic_pointer_cast (rspSub)); } else diff --git a/modules/ripple_net/rpc/RPCSub.cpp b/modules/ripple_net/rpc/RPCSub.cpp index 81062eaf9..70e185251 100644 --- a/modules/ripple_net/rpc/RPCSub.cpp +++ b/modules/ripple_net/rpc/RPCSub.cpp @@ -6,124 +6,190 @@ SETUP_LOG (RPCSub) -RPCSub::RPCSub (InfoSub::Source& source, boost::asio::io_service& io_service, - JobQueue& jobQueue, const std::string& strUrl, const std::string& strUsername, - const std::string& strPassword) +// Subscription object for JSON-RPC +class RPCSubImp + : public RPCSub + , public LeakChecked +{ +public: + RPCSubImp (InfoSub::Source& source, boost::asio::io_service& io_service, + JobQueue& jobQueue, const std::string& strUrl, const std::string& strUsername, + const std::string& strPassword) + : RPCSub (source) + , m_io_service (io_service) + , m_jobQueue (jobQueue) + , mUrl (strUrl) + , mSSL (false) + , mUsername (strUsername) + , mPassword (strPassword) + , mSending (false) + { + std::string strScheme; + + if (!parseUrl (strUrl, strScheme, mIp, mPort, mPath)) + { + throw std::runtime_error ("Failed to parse url."); + } + else if (strScheme == "https") + { + mSSL = true; + } + else if (strScheme != "http") + { + throw std::runtime_error ("Only http and https is supported."); + } + + mSeq = 1; + + if (mPort < 0) + mPort = mSSL ? 443 : 80; + + WriteLog (lsINFO, RPCSub) << + "RPCCall::fromNetwork sub: ip=" << mIp << + " port=" << mPort << + " ssl= "<< (mSSL ? "yes" : "no") << + " path='" << mPath << "'"; + } + + ~RPCSubImp () + { + } + + void send (const Json::Value& jvObj, bool broadcast) + { + ScopedLockType sl (mLock, __FILE__, __LINE__); + + if (mDeque.size () >= eventQueueMax) + { + // Drop the previous event. + WriteLog (lsWARNING, RPCSub) << "RPCCall::fromNetwork drop"; + mDeque.pop_back (); + } + + WriteLog (broadcast ? lsDEBUG : lsINFO, RPCSub) << + "RPCCall::fromNetwork push: " << jvObj; + + mDeque.push_back (std::make_pair (mSeq++, jvObj)); + + if (!mSending) + { + // Start a sending thread. + mSending = true; + + WriteLog (lsINFO, RPCSub) << "RPCCall::fromNetwork start"; + + m_jobQueue.addJob ( + jtCLIENT, "RPCSub::sendThread", BIND_TYPE (&RPCSubImp::sendThread, this)); + } + } + + void setUsername (const std::string& strUsername) + { + ScopedLockType sl (mLock, __FILE__, __LINE__); + + mUsername = strUsername; + } + + void setPassword (const std::string& strPassword) + { + ScopedLockType sl (mLock, __FILE__, __LINE__); + + mPassword = strPassword; + } + +private: + // XXX Could probably create a bunch of send jobs in a single get of the lock. + void sendThread () + { + Json::Value jvEvent; + bool bSend; + + do + { + { + // Obtain the lock to manipulate the queue and change sending. + ScopedLockType sl (mLock, __FILE__, __LINE__); + + if (mDeque.empty ()) + { + mSending = false; + bSend = false; + } + else + { + std::pair pEvent = mDeque.front (); + + mDeque.pop_front (); + + jvEvent = pEvent.second; + jvEvent["seq"] = pEvent.first; + + bSend = true; + } + } + + // Send outside of the lock. + if (bSend) + { + // XXX Might not need this in a try. + try + { + WriteLog (lsINFO, RPCSub) << "RPCCall::fromNetwork: " << mIp; + + RPCCall::fromNetwork ( + m_io_service, + mIp, mPort, + mUsername, mPassword, + mPath, "event", + jvEvent, + mSSL); + } + catch (const std::exception& e) + { + WriteLog (lsINFO, RPCSub) << "RPCCall::fromNetwork exception: " << e.what (); + } + } + } + while (bSend); + } + +private: +// VFALCO TODO replace this macro with a language constant + enum + { + eventQueueMax = 32 + }; + + boost::asio::io_service& m_io_service; + JobQueue& m_jobQueue; + + std::string mUrl; + std::string mIp; + int mPort; + bool mSSL; + std::string mUsername; + std::string mPassword; + std::string mPath; + + int mSeq; // Next id to allocate. + + bool mSending; // Sending threead is active. + + std::deque > mDeque; +}; + +//------------------------------------------------------------------------------ + +RPCSub::RPCSub (InfoSub::Source& source) : InfoSub (source) - , m_io_service (io_service) - , m_jobQueue (jobQueue) - , mUrl (strUrl) - , mSSL (false) - , mUsername (strUsername) - , mPassword (strPassword) - , mSending (false) { - std::string strScheme; - - if (!parseUrl (strUrl, strScheme, mIp, mPort, mPath)) - { - throw std::runtime_error ("Failed to parse url."); - } - else if (strScheme == "https") - { - mSSL = true; - } - else if (strScheme != "http") - { - throw std::runtime_error ("Only http and https is supported."); - } - - mSeq = 1; - - if (mPort < 0) - mPort = mSSL ? 443 : 80; - - WriteLog (lsINFO, RPCSub) << - "RPCCall::fromNetwork sub: ip=" << mIp << - " port=" << mPort << - " ssl= "<< (mSSL ? "yes" : "no") << - " path='" << mPath << "'"; } -// XXX Could probably create a bunch of send jobs in a single get of the lock. -void RPCSub::sendThread () +RPCSub::pointer RPCSub::New (InfoSub::Source& source, + boost::asio::io_service& io_service, JobQueue& jobQueue, + const std::string& strUrl, const std::string& strUsername, + const std::string& strPassword) { - Json::Value jvEvent; - bool bSend; - - do - { - { - // Obtain the lock to manipulate the queue and change sending. - ScopedLockType sl (mLock, __FILE__, __LINE__); - - if (mDeque.empty ()) - { - mSending = false; - bSend = false; - } - else - { - std::pair pEvent = mDeque.front (); - - mDeque.pop_front (); - - jvEvent = pEvent.second; - jvEvent["seq"] = pEvent.first; - - bSend = true; - } - } - - // Send outside of the lock. - if (bSend) - { - // XXX Might not need this in a try. - try - { - WriteLog (lsINFO, RPCSub) << "RPCCall::fromNetwork: " << mIp; - - RPCCall::fromNetwork ( - m_io_service, - mIp, mPort, - mUsername, mPassword, - mPath, "event", - jvEvent, - mSSL); - } - catch (const std::exception& e) - { - WriteLog (lsINFO, RPCSub) << "RPCCall::fromNetwork exception: " << e.what (); - } - } - } - while (bSend); -} - -void RPCSub::send (const Json::Value& jvObj, bool broadcast) -{ - ScopedLockType sl (mLock, __FILE__, __LINE__); - - if (mDeque.size () >= eventQueueMax) - { - // Drop the previous event. - WriteLog (lsWARNING, RPCSub) << "RPCCall::fromNetwork drop"; - mDeque.pop_back (); - } - - WriteLog (broadcast ? lsDEBUG : lsINFO, RPCSub) << - "RPCCall::fromNetwork push: " << jvObj; - - mDeque.push_back (std::make_pair (mSeq++, jvObj)); - - if (!mSending) - { - // Start a sending thread. - mSending = true; - - WriteLog (lsINFO, RPCSub) << "RPCCall::fromNetwork start"; - - m_jobQueue.addJob ( - jtCLIENT, "RPCSub::sendThread", BIND_TYPE (&RPCSub::sendThread, this)); - } -} + return boost::make_shared (source, io_service, jobQueue, + strUrl, strUsername, strPassword); +} \ No newline at end of file diff --git a/modules/ripple_net/rpc/RPCSub.h b/modules/ripple_net/rpc/RPCSub.h index 0a811d1f0..7dd543230 100644 --- a/modules/ripple_net/rpc/RPCSub.h +++ b/modules/ripple_net/rpc/RPCSub.h @@ -7,66 +7,24 @@ #ifndef RIPPLE_NET_RPC_RPCSUB_H_INCLUDED #define RIPPLE_NET_RPC_RPCSUB_H_INCLUDED -// Subscription object for JSON-RPC -// VFALCO TODO Move the implementation into the .cpp -// -class RPCSub - : public InfoSub - , public LeakChecked +/** Subscription object for JSON RPC. +*/ +class RPCSub : public InfoSub { public: - typedef boost::shared_ptr pointer; - typedef const pointer& ref; + typedef boost::shared_ptr pointer; + typedef pointer const& ref; - RPCSub (InfoSub::Source& source, boost::asio::io_service& io_service, - JobQueue& jobQueue, const std::string& strUrl, - const std::string& strUsername, const std::string& strPassword); + static pointer New (InfoSub::Source& source, + boost::asio::io_service& io_service, JobQueue& jobQueue, + const std::string& strUrl, const std::string& strUsername, + const std::string& strPassword); - virtual ~RPCSub () { } - - // Implement overridden functions from base class: - void send (const Json::Value& jvObj, bool broadcast); - - void setUsername (const std::string& strUsername) - { - ScopedLockType sl (mLock, __FILE__, __LINE__); - - mUsername = strUsername; - } - - void setPassword (const std::string& strPassword) - { - ScopedLockType sl (mLock, __FILE__, __LINE__); - - mPassword = strPassword; - } + virtual void setUsername (const std::string& strUsername) = 0; + virtual void setPassword (const std::string& strPassword) = 0; protected: - void sendThread (); - -private: -// VFALCO TODO replace this macro with a language constant - enum - { - eventQueueMax = 32 - }; - - boost::asio::io_service& m_io_service; - JobQueue& m_jobQueue; - - std::string mUrl; - std::string mIp; - int mPort; - bool mSSL; - std::string mUsername; - std::string mPassword; - std::string mPath; - - int mSeq; // Next id to allocate. - - bool mSending; // Sending threead is active. - - std::deque > mDeque; + explicit RPCSub (InfoSub::Source& source); }; #endif