diff --git a/src/cpp/ripple/CallRPC.h b/src/cpp/ripple/CallRPC.h index f17b5b576e..9b9ca83967 100644 --- a/src/cpp/ripple/CallRPC.h +++ b/src/cpp/ripple/CallRPC.h @@ -1,7 +1,6 @@ #ifndef __CALLRPC__ #define __CALLRPC__ - #include #include "../json/value.h" diff --git a/src/cpp/ripple/NetworkOPs.cpp b/src/cpp/ripple/NetworkOPs.cpp index 007bcd6b8c..327d165280 100644 --- a/src/cpp/ripple/NetworkOPs.cpp +++ b/src/cpp/ripple/NetworkOPs.cpp @@ -1502,13 +1502,32 @@ bool NetworkOPs::unsubRTTransactions(InfoSub* ispListener) return !!mSubTransactions.erase(ispListener); } -RPCSub* NetworkOPs::findRpcSub(const std::string& strRpc) +RPCSub* NetworkOPs::findRpcSub(const std::string& strUrl) { - return (RPCSub*)(0); + RPCSub* rspResult; + boost::recursive_mutex::scoped_lock sl(mMonitorLock); + + subRpcMapType::iterator it; + + it = mRpcSubMap.find(strUrl); + if (it == mRpcSubMap.end()) + { + rspResult = (RPCSub*)(0); + } + else + { + rspResult = it->second; + } + + return rspResult; } -RPCSub* NetworkOPs::addRpcSub(const std::string& strRpc, RPCSub* rspEntry) +RPCSub* NetworkOPs::addRpcSub(const std::string& strUrl, RPCSub* rspEntry) { + boost::recursive_mutex::scoped_lock sl(mMonitorLock); + + mRpcSubMap.insert(std::make_pair(strUrl, rspEntry)); + return rspEntry; } diff --git a/src/cpp/ripple/NetworkOPs.h b/src/cpp/ripple/NetworkOPs.h index dfc1ea374d..180a06ff8b 100644 --- a/src/cpp/ripple/NetworkOPs.h +++ b/src/cpp/ripple/NetworkOPs.h @@ -70,7 +70,7 @@ protected: typedef boost::unordered_map > subSubmitMapType; - typedef boost::unordered_map subRpcMapType; + typedef boost::unordered_map subRpcMapType; OperatingMode mMode; bool mNeedNetworkLedger; @@ -276,8 +276,8 @@ public: bool subRTTransactions(InfoSub* ispListener); bool unsubRTTransactions(InfoSub* ispListener); - RPCSub* findRpcSub(const std::string& strRpc); - RPCSub* addRpcSub(const std::string& strRpc, RPCSub* rspEntry); + RPCSub* findRpcSub(const std::string& strUrl); + RPCSub* addRpcSub(const std::string& strUrl, RPCSub* rspEntry); }; #endif diff --git a/src/cpp/ripple/RPCHandler.cpp b/src/cpp/ripple/RPCHandler.cpp index 884d03f49e..eda9b97b47 100644 --- a/src/cpp/ripple/RPCHandler.cpp +++ b/src/cpp/ripple/RPCHandler.cpp @@ -2257,7 +2257,7 @@ Json::Value RPCHandler::doSubscribe(Json::Value jvRequest) return jvResult; } -// This leaks RPCSub objects for JSON-RPC. Shouldn't matter for anyone sane. +// FIXME: This leaks RPCSub objects for JSON-RPC. Shouldn't matter for anyone sane. Json::Value RPCHandler::doUnsubscribe(Json::Value jvRequest) { InfoSub* ispSub; @@ -2416,62 +2416,61 @@ Json::Value RPCHandler::doCommand(Json::Value& jvRequest, int iRole) const char* pCommand; doFuncPtr dfpFunc; bool bAdminRequired; - bool bEvented; unsigned int iOptions; } commandsA[] = { // Request-response methods - { "accept_ledger", &RPCHandler::doAcceptLedger, true, false, optCurrent }, - { "account_info", &RPCHandler::doAccountInfo, false, false, optCurrent }, - { "account_lines", &RPCHandler::doAccountLines, false, false, optCurrent }, - { "account_offers", &RPCHandler::doAccountOffers, false, false, optCurrent }, - { "account_tx", &RPCHandler::doAccountTransactions, false, false, optNetwork }, - { "connect", &RPCHandler::doConnect, true, false, optNone }, - { "get_counts", &RPCHandler::doGetCounts, true, false, optNone }, - { "ledger", &RPCHandler::doLedger, false, false, optNetwork }, - { "ledger_accept", &RPCHandler::doLedgerAccept, true, false, optCurrent }, - { "ledger_closed", &RPCHandler::doLedgerClosed, false, false, optClosed }, - { "ledger_current", &RPCHandler::doLedgerCurrent, false, false, optCurrent }, - { "ledger_entry", &RPCHandler::doLedgerEntry, false, false, optCurrent }, - { "ledger_header", &RPCHandler::doLedgerHeader, false, false, optCurrent }, - { "log_level", &RPCHandler::doLogLevel, true, false, optNone }, - { "logrotate", &RPCHandler::doLogRotate, true, false, optNone }, -// { "nickname_info", &RPCHandler::doNicknameInfo, false, false, optCurrent }, - { "owner_info", &RPCHandler::doOwnerInfo, false, false, optCurrent }, - { "peers", &RPCHandler::doPeers, true, false, optNone }, -// { "profile", &RPCHandler::doProfile, false, false, optCurrent }, - { "random", &RPCHandler::doRandom, false, false, optNone }, - { "ripple_path_find", &RPCHandler::doRipplePathFind, false, false, optCurrent }, - { "submit", &RPCHandler::doSubmit, false, false, optCurrent }, - { "server_info", &RPCHandler::doServerInfo, true, false, optNone }, - { "stop", &RPCHandler::doStop, true, false, optNone }, - { "transaction_entry", &RPCHandler::doTransactionEntry, false, false, optCurrent }, - { "tx", &RPCHandler::doTx, false, false, optNetwork }, - { "tx_history", &RPCHandler::doTxHistory, false, false, optNone }, + { "accept_ledger", &RPCHandler::doAcceptLedger, true, optCurrent }, + { "account_info", &RPCHandler::doAccountInfo, false, optCurrent }, + { "account_lines", &RPCHandler::doAccountLines, false, optCurrent }, + { "account_offers", &RPCHandler::doAccountOffers, false, optCurrent }, + { "account_tx", &RPCHandler::doAccountTransactions, false, optNetwork }, + { "connect", &RPCHandler::doConnect, true, optNone }, + { "get_counts", &RPCHandler::doGetCounts, true, optNone }, + { "ledger", &RPCHandler::doLedger, false, optNetwork }, + { "ledger_accept", &RPCHandler::doLedgerAccept, true, optCurrent }, + { "ledger_closed", &RPCHandler::doLedgerClosed, false, optClosed }, + { "ledger_current", &RPCHandler::doLedgerCurrent, false, optCurrent }, + { "ledger_entry", &RPCHandler::doLedgerEntry, false, optCurrent }, + { "ledger_header", &RPCHandler::doLedgerHeader, false, optCurrent }, + { "log_level", &RPCHandler::doLogLevel, true, optNone }, + { "logrotate", &RPCHandler::doLogRotate, true, optNone }, +// { "nickname_info", &RPCHandler::doNicknameInfo, false, optCurrent }, + { "owner_info", &RPCHandler::doOwnerInfo, false, optCurrent }, + { "peers", &RPCHandler::doPeers, true, optNone }, +// { "profile", &RPCHandler::doProfile, false, optCurrent }, + { "random", &RPCHandler::doRandom, false, optNone }, + { "ripple_path_find", &RPCHandler::doRipplePathFind, false, optCurrent }, + { "submit", &RPCHandler::doSubmit, false, optCurrent }, + { "server_info", &RPCHandler::doServerInfo, true, optNone }, + { "stop", &RPCHandler::doStop, true, optNone }, + { "transaction_entry", &RPCHandler::doTransactionEntry, false, optCurrent }, + { "tx", &RPCHandler::doTx, false, optNetwork }, + { "tx_history", &RPCHandler::doTxHistory, false, optNone }, - { "unl_add", &RPCHandler::doUnlAdd, true, false, optNone }, - { "unl_delete", &RPCHandler::doUnlDelete, true, false, optNone }, - { "unl_list", &RPCHandler::doUnlList, true, false, optNone }, - { "unl_load", &RPCHandler::doUnlLoad, true, false, optNone }, - { "unl_network", &RPCHandler::doUnlNetwork, true, false, optNone }, - { "unl_reset", &RPCHandler::doUnlReset, true, false, optNone }, - { "unl_score", &RPCHandler::doUnlScore, true, false, optNone }, + { "unl_add", &RPCHandler::doUnlAdd, true, optNone }, + { "unl_delete", &RPCHandler::doUnlDelete, true, optNone }, + { "unl_list", &RPCHandler::doUnlList, true, optNone }, + { "unl_load", &RPCHandler::doUnlLoad, true, optNone }, + { "unl_network", &RPCHandler::doUnlNetwork, true, optNone }, + { "unl_reset", &RPCHandler::doUnlReset, true, optNone }, + { "unl_score", &RPCHandler::doUnlScore, true, optNone }, - { "validation_create", &RPCHandler::doValidationCreate, false, false, optNone }, - { "validation_seed", &RPCHandler::doValidationSeed, false, false, optNone }, + { "validation_create", &RPCHandler::doValidationCreate, false, optNone }, + { "validation_seed", &RPCHandler::doValidationSeed, false, optNone }, - { "wallet_accounts", &RPCHandler::doWalletAccounts, false, false, optCurrent }, - { "wallet_propose", &RPCHandler::doWalletPropose, false, false, optNone }, - { "wallet_seed", &RPCHandler::doWalletSeed, false, false, optNone }, + { "wallet_accounts", &RPCHandler::doWalletAccounts, false, optCurrent }, + { "wallet_propose", &RPCHandler::doWalletPropose, false, optNone }, + { "wallet_seed", &RPCHandler::doWalletSeed, false, optNone }, // XXX Unnecessary commands which should be removed. - { "login", &RPCHandler::doLogin, true, false, optNone }, - { "data_delete", &RPCHandler::doDataDelete, true, false, optNone }, - { "data_fetch", &RPCHandler::doDataFetch, true, false, optNone }, - { "data_store", &RPCHandler::doDataStore, true, false, optNone }, + { "login", &RPCHandler::doLogin, true, optNone }, + { "data_delete", &RPCHandler::doDataDelete, true, optNone }, + { "data_fetch", &RPCHandler::doDataFetch, true, optNone }, + { "data_store", &RPCHandler::doDataStore, true, optNone }, // Evented methods - { "subscribe", &RPCHandler::doSubscribe, false, true, optNone }, - { "unsubscribe", &RPCHandler::doUnsubscribe, false, true, optNone }, + { "subscribe", &RPCHandler::doSubscribe, false, optNone }, + { "unsubscribe", &RPCHandler::doUnsubscribe, false, optNone }, }; int i = NUMBER(commandsA); @@ -2487,10 +2486,6 @@ Json::Value RPCHandler::doCommand(Json::Value& jvRequest, int iRole) { return rpcError(rpcNO_PERMISSION); } - else if (commandsA[i].bEvented && mInfoSub == NULL) - { - return rpcError(rpcNO_EVENTS); - } else if (commandsA[i].iOptions & optNetwork && mNetOps->getOperatingMode() != NetworkOPs::omTRACKING && mNetOps->getOperatingMode() != NetworkOPs::omFULL) diff --git a/src/cpp/ripple/RPCSub.cpp b/src/cpp/ripple/RPCSub.cpp index 633feaaa77..a0759cca46 100644 --- a/src/cpp/ripple/RPCSub.cpp +++ b/src/cpp/ripple/RPCSub.cpp @@ -1,12 +1,73 @@ +#include + #include "RPCSub.h" +#include "CallRPC.h" + RPCSub::RPCSub(const std::string& strUrl, const std::string& strUsername, const std::string& strPassword) : mUrl(strUrl), mUsername(strUsername), mPassword(strPassword) { + mId = 1; +} +void RPCSub::sendThread() +{ + Json::Value jvEvent; + bool bSend; + + do + { + { + // Obtain the lock to manipulate the queue and change sending. + boost::mutex::scoped_lock sl(mLockInfo); + + if (mDeque.empty()) + { + mSending = false; + bSend = false; + } + else + { + std::pair pEvent = mDeque.front(); + + mDeque.pop_front(); + + jvEvent = pEvent.second; + jvEvent["id"] = pEvent.first; + + bSend = true; + } + } + + // Send outside of the lock. + if (bSend) + { + // Drop result. + (void) callRPC(mIp, mPort, mUsername, mPassword, "event", jvEvent); + + sendThread(); + } + } while (bSend); } void RPCSub::send(const Json::Value& jvObj) { + boost::mutex::scoped_lock sl(mLockInfo); + if (RPC_EVENT_QUEUE_MAX == mDeque.size()) + { + // Drop the previous event. + + mDeque.pop_back(); + } + + mDeque.push_back(std::make_pair(mId++, jvObj)); + + if (!mSending) + { + // Start a sending thread. + mSending = true; + + boost::thread(boost::bind(&RPCSub::sendThread, this)).detach(); + } } diff --git a/src/cpp/ripple/RPCSub.h b/src/cpp/ripple/RPCSub.h index 7971ee1211..2e4921ec14 100644 --- a/src/cpp/ripple/RPCSub.h +++ b/src/cpp/ripple/RPCSub.h @@ -1,20 +1,37 @@ #ifndef __RPCSUB__ #define __RPCSUB__ +#include + +#include "../json/value.h" + #include "NetworkOPs.h" +#define RPC_EVENT_QUEUE_MAX 32 + // Subscription object for JSON-RPC class RPCSub : public InfoSub { - std::string mUrl; - std::string mIp; - int mPort; - std::string mUsername; - std::string mPassword; + std::string mUrl; + std::string mIp; + int mPort; + std::string mUsername; + std::string mPassword; + + int mId; // Next id to allocate. + + bool mSending; // Sending threead is active. + + std::deque > mDeque; + +protected: + void sendThread(); public: RPCSub(const std::string& strUrl, const std::string& strUsername, const std::string& strPassword); + virtual ~RPCSub() { ; } + // Implement overridden functions from base class: void send(const Json::Value& jvObj);