Fixes for rpc subscriptions.

This commit is contained in:
Arthur Britto
2012-12-30 17:01:34 -08:00
parent 5934f77b60
commit 6ba31accb2
2 changed files with 31 additions and 5 deletions

View File

@@ -4,10 +4,29 @@
#include "CallRPC.h" #include "CallRPC.h"
SETUP_LOG();
RPCSub::RPCSub(const std::string& strUrl, const std::string& strUsername, const std::string& strPassword) RPCSub::RPCSub(const std::string& strUrl, const std::string& strUsername, const std::string& strPassword)
: mUrl(strUrl), mUsername(strUsername), mPassword(strPassword) : mUrl(strUrl), mUsername(strUsername), mPassword(strPassword)
{ {
mId = 1; std::string strScheme;
std::string strPath;
if (!parseUrl(strUrl, strScheme, mIp, mPort, strPath))
{
throw std::runtime_error("Failed to parse url.");
}
else if (strScheme != "http")
{
throw std::runtime_error("Only http is supported.");
}
else if (!strPath.empty())
{
// XXX FIXME: support path
throw std::runtime_error("Only empty path is supported.");
}
mSeq = 1;
} }
void RPCSub::sendThread() void RPCSub::sendThread()
@@ -33,7 +52,7 @@ void RPCSub::sendThread()
mDeque.pop_front(); mDeque.pop_front();
jvEvent = pEvent.second; jvEvent = pEvent.second;
jvEvent["id"] = pEvent.first; jvEvent["seq"] = pEvent.first;
bSend = true; bSend = true;
} }
@@ -43,7 +62,14 @@ void RPCSub::sendThread()
if (bSend) if (bSend)
{ {
// Drop result. // Drop result.
(void) callRPC(mIp, mPort, mUsername, mPassword, "event", jvEvent); try
{
(void) callRPC(mIp, mPort, mUsername, mPassword, "event", jvEvent);
}
catch (const std::exception& e)
{
cLog(lsDEBUG) << boost::str(boost::format("callRPC exception: %s") % e.what());
}
sendThread(); sendThread();
} }
@@ -61,7 +87,7 @@ void RPCSub::send(const Json::Value& jvObj)
mDeque.pop_back(); mDeque.pop_back();
} }
mDeque.push_back(std::make_pair(mId++, jvObj)); mDeque.push_back(std::make_pair(mSeq++, jvObj));
if (!mSending) if (!mSending)
{ {

View File

@@ -18,7 +18,7 @@ class RPCSub : public InfoSub
std::string mUsername; std::string mUsername;
std::string mPassword; std::string mPassword;
int mId; // Next id to allocate. int mSeq; // Next id to allocate.
bool mSending; // Sending threead is active. bool mSending; // Sending threead is active.