Make RPCSub abstract

This commit is contained in:
Vinnie Falco
2013-09-01 06:02:35 -07:00
parent 3077892c3b
commit e09d6a0a9b
3 changed files with 197 additions and 173 deletions

View File

@@ -3233,9 +3233,9 @@ Json::Value RPCHandler::doSubscribe (Json::Value params, LoadType* loadType, App
{
WriteLog (lsDEBUG, RPCHandler) << boost::str (boost::format ("doSubscribe: building: %s") % strUrl);
RPCSub::pointer rspSub = boost::make_shared <RPCSub> (
getApp ().getOPs (), getApp ().getIOService (),
getApp ().getJobQueue (), strUrl, strUsername, strPassword);
RPCSub::pointer rspSub = RPCSub::New (getApp ().getOPs (),
getApp ().getIOService (), getApp ().getJobQueue (),
strUrl, strUsername, strPassword);
ispSub = mNetOps->addRpcSub (strUrl, boost::dynamic_pointer_cast<InfoSub> (rspSub));
}
else

View File

@@ -6,124 +6,190 @@
SETUP_LOG (RPCSub)
RPCSub::RPCSub (InfoSub::Source& source, boost::asio::io_service& io_service,
JobQueue& jobQueue, const std::string& strUrl, const std::string& strUsername,
const std::string& strPassword)
// Subscription object for JSON-RPC
class RPCSubImp
: public RPCSub
, public LeakChecked <RPCSub>
{
public:
RPCSubImp (InfoSub::Source& source, boost::asio::io_service& io_service,
JobQueue& jobQueue, const std::string& strUrl, const std::string& strUsername,
const std::string& strPassword)
: RPCSub (source)
, m_io_service (io_service)
, m_jobQueue (jobQueue)
, mUrl (strUrl)
, mSSL (false)
, mUsername (strUsername)
, mPassword (strPassword)
, mSending (false)
{
std::string strScheme;
if (!parseUrl (strUrl, strScheme, mIp, mPort, mPath))
{
throw std::runtime_error ("Failed to parse url.");
}
else if (strScheme == "https")
{
mSSL = true;
}
else if (strScheme != "http")
{
throw std::runtime_error ("Only http and https is supported.");
}
mSeq = 1;
if (mPort < 0)
mPort = mSSL ? 443 : 80;
WriteLog (lsINFO, RPCSub) <<
"RPCCall::fromNetwork sub: ip=" << mIp <<
" port=" << mPort <<
" ssl= "<< (mSSL ? "yes" : "no") <<
" path='" << mPath << "'";
}
~RPCSubImp ()
{
}
void send (const Json::Value& jvObj, bool broadcast)
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
if (mDeque.size () >= eventQueueMax)
{
// Drop the previous event.
WriteLog (lsWARNING, RPCSub) << "RPCCall::fromNetwork drop";
mDeque.pop_back ();
}
WriteLog (broadcast ? lsDEBUG : lsINFO, RPCSub) <<
"RPCCall::fromNetwork push: " << jvObj;
mDeque.push_back (std::make_pair (mSeq++, jvObj));
if (!mSending)
{
// Start a sending thread.
mSending = true;
WriteLog (lsINFO, RPCSub) << "RPCCall::fromNetwork start";
m_jobQueue.addJob (
jtCLIENT, "RPCSub::sendThread", BIND_TYPE (&RPCSubImp::sendThread, this));
}
}
void setUsername (const std::string& strUsername)
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
mUsername = strUsername;
}
void setPassword (const std::string& strPassword)
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
mPassword = strPassword;
}
private:
// XXX Could probably create a bunch of send jobs in a single get of the lock.
void sendThread ()
{
Json::Value jvEvent;
bool bSend;
do
{
{
// Obtain the lock to manipulate the queue and change sending.
ScopedLockType sl (mLock, __FILE__, __LINE__);
if (mDeque.empty ())
{
mSending = false;
bSend = false;
}
else
{
std::pair<int, Json::Value> pEvent = mDeque.front ();
mDeque.pop_front ();
jvEvent = pEvent.second;
jvEvent["seq"] = pEvent.first;
bSend = true;
}
}
// Send outside of the lock.
if (bSend)
{
// XXX Might not need this in a try.
try
{
WriteLog (lsINFO, RPCSub) << "RPCCall::fromNetwork: " << mIp;
RPCCall::fromNetwork (
m_io_service,
mIp, mPort,
mUsername, mPassword,
mPath, "event",
jvEvent,
mSSL);
}
catch (const std::exception& e)
{
WriteLog (lsINFO, RPCSub) << "RPCCall::fromNetwork exception: " << e.what ();
}
}
}
while (bSend);
}
private:
// VFALCO TODO replace this macro with a language constant
enum
{
eventQueueMax = 32
};
boost::asio::io_service& m_io_service;
JobQueue& m_jobQueue;
std::string mUrl;
std::string mIp;
int mPort;
bool mSSL;
std::string mUsername;
std::string mPassword;
std::string mPath;
int mSeq; // Next id to allocate.
bool mSending; // Sending threead is active.
std::deque<std::pair<int, Json::Value> > mDeque;
};
//------------------------------------------------------------------------------
RPCSub::RPCSub (InfoSub::Source& source)
: InfoSub (source)
, m_io_service (io_service)
, m_jobQueue (jobQueue)
, mUrl (strUrl)
, mSSL (false)
, mUsername (strUsername)
, mPassword (strPassword)
, mSending (false)
{
std::string strScheme;
if (!parseUrl (strUrl, strScheme, mIp, mPort, mPath))
{
throw std::runtime_error ("Failed to parse url.");
}
else if (strScheme == "https")
{
mSSL = true;
}
else if (strScheme != "http")
{
throw std::runtime_error ("Only http and https is supported.");
}
mSeq = 1;
if (mPort < 0)
mPort = mSSL ? 443 : 80;
WriteLog (lsINFO, RPCSub) <<
"RPCCall::fromNetwork sub: ip=" << mIp <<
" port=" << mPort <<
" ssl= "<< (mSSL ? "yes" : "no") <<
" path='" << mPath << "'";
}
// XXX Could probably create a bunch of send jobs in a single get of the lock.
void RPCSub::sendThread ()
RPCSub::pointer RPCSub::New (InfoSub::Source& source,
boost::asio::io_service& io_service, JobQueue& jobQueue,
const std::string& strUrl, const std::string& strUsername,
const std::string& strPassword)
{
Json::Value jvEvent;
bool bSend;
do
{
{
// Obtain the lock to manipulate the queue and change sending.
ScopedLockType sl (mLock, __FILE__, __LINE__);
if (mDeque.empty ())
{
mSending = false;
bSend = false;
}
else
{
std::pair<int, Json::Value> pEvent = mDeque.front ();
mDeque.pop_front ();
jvEvent = pEvent.second;
jvEvent["seq"] = pEvent.first;
bSend = true;
}
}
// Send outside of the lock.
if (bSend)
{
// XXX Might not need this in a try.
try
{
WriteLog (lsINFO, RPCSub) << "RPCCall::fromNetwork: " << mIp;
RPCCall::fromNetwork (
m_io_service,
mIp, mPort,
mUsername, mPassword,
mPath, "event",
jvEvent,
mSSL);
}
catch (const std::exception& e)
{
WriteLog (lsINFO, RPCSub) << "RPCCall::fromNetwork exception: " << e.what ();
}
}
}
while (bSend);
}
void RPCSub::send (const Json::Value& jvObj, bool broadcast)
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
if (mDeque.size () >= eventQueueMax)
{
// Drop the previous event.
WriteLog (lsWARNING, RPCSub) << "RPCCall::fromNetwork drop";
mDeque.pop_back ();
}
WriteLog (broadcast ? lsDEBUG : lsINFO, RPCSub) <<
"RPCCall::fromNetwork push: " << jvObj;
mDeque.push_back (std::make_pair (mSeq++, jvObj));
if (!mSending)
{
// Start a sending thread.
mSending = true;
WriteLog (lsINFO, RPCSub) << "RPCCall::fromNetwork start";
m_jobQueue.addJob (
jtCLIENT, "RPCSub::sendThread", BIND_TYPE (&RPCSub::sendThread, this));
}
}
return boost::make_shared <RPCSubImp> (source, io_service, jobQueue,
strUrl, strUsername, strPassword);
}

View File

@@ -7,66 +7,24 @@
#ifndef RIPPLE_NET_RPC_RPCSUB_H_INCLUDED
#define RIPPLE_NET_RPC_RPCSUB_H_INCLUDED
// Subscription object for JSON-RPC
// VFALCO TODO Move the implementation into the .cpp
//
class RPCSub
: public InfoSub
, public LeakChecked <RPCSub>
/** Subscription object for JSON RPC.
*/
class RPCSub : public InfoSub
{
public:
typedef boost::shared_ptr<RPCSub> pointer;
typedef const pointer& ref;
typedef boost::shared_ptr <RPCSub> pointer;
typedef pointer const& ref;
RPCSub (InfoSub::Source& source, boost::asio::io_service& io_service,
JobQueue& jobQueue, const std::string& strUrl,
const std::string& strUsername, const std::string& strPassword);
static pointer New (InfoSub::Source& source,
boost::asio::io_service& io_service, JobQueue& jobQueue,
const std::string& strUrl, const std::string& strUsername,
const std::string& strPassword);
virtual ~RPCSub () { }
// Implement overridden functions from base class:
void send (const Json::Value& jvObj, bool broadcast);
void setUsername (const std::string& strUsername)
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
mUsername = strUsername;
}
void setPassword (const std::string& strPassword)
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
mPassword = strPassword;
}
virtual void setUsername (const std::string& strUsername) = 0;
virtual void setPassword (const std::string& strPassword) = 0;
protected:
void sendThread ();
private:
// VFALCO TODO replace this macro with a language constant
enum
{
eventQueueMax = 32
};
boost::asio::io_service& m_io_service;
JobQueue& m_jobQueue;
std::string mUrl;
std::string mIp;
int mPort;
bool mSSL;
std::string mUsername;
std::string mPassword;
std::string mPath;
int mSeq; // Next id to allocate.
bool mSending; // Sending threead is active.
std::deque<std::pair<int, Json::Value> > mDeque;
explicit RPCSub (InfoSub::Source& source);
};
#endif