Fix a bug that causes outbound notifications to URLs to fail.

This commit is contained in:
JoelKatz
2013-02-11 18:00:22 -08:00
parent 341d198ffe
commit 0d03c1fb4a

View File

@@ -8,27 +8,27 @@
SETUP_LOG();
RPCSub::RPCSub(const std::string& strUrl, const std::string& strUsername, const std::string& strPassword)
: mUrl(strUrl), mSSL(false), mUsername(strUsername), mPassword(strPassword)
: mUrl(strUrl), mSSL(false), mUsername(strUsername), mPassword(strPassword), mSending(false)
{
std::string strScheme;
if (!parseUrl(strUrl, strScheme, mIp, mPort, mPath))
{
throw std::runtime_error("Failed to parse url.");
throw std::runtime_error("Failed to parse url.");
}
else if (strScheme == "https")
{
mSSL = true;
mSSL = true;
}
else if (strScheme != "http")
{
throw std::runtime_error("Only http and https is supported.");
throw std::runtime_error("Only http and https is supported.");
}
mSeq = 1;
if (mPort < 0)
mPort = mSSL ? 443 : 80;
mPort = mSSL ? 443 : 80;
}
// XXX Could probably create a bunch of send jobs in a single get of the lock.
@@ -39,49 +39,49 @@ void RPCSub::sendThread()
do
{
{
// Obtain the lock to manipulate the queue and change sending.
boost::mutex::scoped_lock sl(mLockInfo);
{
// Obtain the lock to manipulate the queue and change sending.
boost::mutex::scoped_lock sl(mLockInfo);
if (mDeque.empty())
{
mSending = false;
bSend = false;
}
else
{
std::pair<int, Json::Value> pEvent = mDeque.front();
if (mDeque.empty())
{
mSending = false;
bSend = false;
}
else
{
std::pair<int, Json::Value> pEvent = mDeque.front();
mDeque.pop_front();
mDeque.pop_front();
jvEvent = pEvent.second;
jvEvent["seq"] = pEvent.first;
jvEvent = pEvent.second;
jvEvent["seq"] = pEvent.first;
bSend = true;
}
}
bSend = true;
}
}
// Send outside of the lock.
if (bSend)
{
// XXX Might not need this in a try.
try
{
cLog(lsINFO) << boost::str(boost::format("callRPC calling: %s") % mIp);
// Send outside of the lock.
if (bSend)
{
// XXX Might not need this in a try.
try
{
cLog(lsINFO) << boost::str(boost::format("callRPC calling: %s") % mIp);
callRPC(
theApp->getIOService(),
mIp, mPort,
mUsername, mPassword,
mPath, "event",
jvEvent,
mSSL);
}
catch (const std::exception& e)
{
cLog(lsINFO) << boost::str(boost::format("callRPC exception: %s") % e.what());
}
}
callRPC(
theApp->getIOService(),
mIp, mPort,
mUsername, mPassword,
mPath, "event",
jvEvent,
mSSL);
}
catch (const std::exception& e)
{
cLog(lsINFO) << boost::str(boost::format("callRPC exception: %s") % e.what());
}
}
} while (bSend);
}
@@ -92,9 +92,8 @@ void RPCSub::send(const Json::Value& jvObj)
if (RPC_EVENT_QUEUE_MAX == mDeque.size())
{
// Drop the previous event.
cLog(lsWARNING) << boost::str(boost::format("callRPC drop"));
mDeque.pop_back();
cLog(lsWARNING) << boost::str(boost::format("callRPC drop"));
mDeque.pop_back();
}
cLog(lsINFO) << boost::str(boost::format("callRPC push: %s") % jvObj);
@@ -103,10 +102,10 @@ void RPCSub::send(const Json::Value& jvObj)
if (!mSending)
{
// Start a sending thread.
mSending = true;
// Start a sending thread.
mSending = true;
cLog(lsINFO) << boost::str(boost::format("callRPC start"));
boost::thread(boost::bind(&RPCSub::sendThread, this)).detach();
cLog(lsINFO) << boost::str(boost::format("callRPC start"));
boost::thread(boost::bind(&RPCSub::sendThread, this)).detach();
}
}