mirror of
https://github.com/Xahau/xahaud.git
synced 2025-12-06 17:27:52 +00:00
Add support for JSON-RPC subscriptions.
This commit is contained in:
@@ -1,7 +1,6 @@
|
||||
#ifndef __CALLRPC__
|
||||
#define __CALLRPC__
|
||||
|
||||
|
||||
#include <string>
|
||||
|
||||
#include "../json/value.h"
|
||||
|
||||
@@ -1502,13 +1502,32 @@ bool NetworkOPs::unsubRTTransactions(InfoSub* ispListener)
|
||||
return !!mSubTransactions.erase(ispListener);
|
||||
}
|
||||
|
||||
RPCSub* NetworkOPs::findRpcSub(const std::string& strRpc)
|
||||
RPCSub* NetworkOPs::findRpcSub(const std::string& strUrl)
|
||||
{
|
||||
return (RPCSub*)(0);
|
||||
RPCSub* rspResult;
|
||||
boost::recursive_mutex::scoped_lock sl(mMonitorLock);
|
||||
|
||||
subRpcMapType::iterator it;
|
||||
|
||||
it = mRpcSubMap.find(strUrl);
|
||||
if (it == mRpcSubMap.end())
|
||||
{
|
||||
rspResult = (RPCSub*)(0);
|
||||
}
|
||||
else
|
||||
{
|
||||
rspResult = it->second;
|
||||
}
|
||||
|
||||
return rspResult;
|
||||
}
|
||||
|
||||
RPCSub* NetworkOPs::addRpcSub(const std::string& strRpc, RPCSub* rspEntry)
|
||||
RPCSub* NetworkOPs::addRpcSub(const std::string& strUrl, RPCSub* rspEntry)
|
||||
{
|
||||
boost::recursive_mutex::scoped_lock sl(mMonitorLock);
|
||||
|
||||
mRpcSubMap.insert(std::make_pair(strUrl, rspEntry));
|
||||
|
||||
return rspEntry;
|
||||
}
|
||||
|
||||
|
||||
@@ -70,7 +70,7 @@ protected:
|
||||
|
||||
typedef boost::unordered_map<uint160,std::pair<InfoSub*,uint32> > subSubmitMapType;
|
||||
|
||||
typedef boost::unordered_map<std::string, InfoSub* > subRpcMapType;
|
||||
typedef boost::unordered_map<std::string, RPCSub* > subRpcMapType;
|
||||
|
||||
OperatingMode mMode;
|
||||
bool mNeedNetworkLedger;
|
||||
@@ -276,8 +276,8 @@ public:
|
||||
bool subRTTransactions(InfoSub* ispListener);
|
||||
bool unsubRTTransactions(InfoSub* ispListener);
|
||||
|
||||
RPCSub* findRpcSub(const std::string& strRpc);
|
||||
RPCSub* addRpcSub(const std::string& strRpc, RPCSub* rspEntry);
|
||||
RPCSub* findRpcSub(const std::string& strUrl);
|
||||
RPCSub* addRpcSub(const std::string& strUrl, RPCSub* rspEntry);
|
||||
};
|
||||
|
||||
#endif
|
||||
|
||||
@@ -2257,7 +2257,7 @@ Json::Value RPCHandler::doSubscribe(Json::Value jvRequest)
|
||||
return jvResult;
|
||||
}
|
||||
|
||||
// This leaks RPCSub objects for JSON-RPC. Shouldn't matter for anyone sane.
|
||||
// FIXME: This leaks RPCSub objects for JSON-RPC. Shouldn't matter for anyone sane.
|
||||
Json::Value RPCHandler::doUnsubscribe(Json::Value jvRequest)
|
||||
{
|
||||
InfoSub* ispSub;
|
||||
@@ -2416,62 +2416,61 @@ Json::Value RPCHandler::doCommand(Json::Value& jvRequest, int iRole)
|
||||
const char* pCommand;
|
||||
doFuncPtr dfpFunc;
|
||||
bool bAdminRequired;
|
||||
bool bEvented;
|
||||
unsigned int iOptions;
|
||||
} commandsA[] = {
|
||||
// Request-response methods
|
||||
{ "accept_ledger", &RPCHandler::doAcceptLedger, true, false, optCurrent },
|
||||
{ "account_info", &RPCHandler::doAccountInfo, false, false, optCurrent },
|
||||
{ "account_lines", &RPCHandler::doAccountLines, false, false, optCurrent },
|
||||
{ "account_offers", &RPCHandler::doAccountOffers, false, false, optCurrent },
|
||||
{ "account_tx", &RPCHandler::doAccountTransactions, false, false, optNetwork },
|
||||
{ "connect", &RPCHandler::doConnect, true, false, optNone },
|
||||
{ "get_counts", &RPCHandler::doGetCounts, true, false, optNone },
|
||||
{ "ledger", &RPCHandler::doLedger, false, false, optNetwork },
|
||||
{ "ledger_accept", &RPCHandler::doLedgerAccept, true, false, optCurrent },
|
||||
{ "ledger_closed", &RPCHandler::doLedgerClosed, false, false, optClosed },
|
||||
{ "ledger_current", &RPCHandler::doLedgerCurrent, false, false, optCurrent },
|
||||
{ "ledger_entry", &RPCHandler::doLedgerEntry, false, false, optCurrent },
|
||||
{ "ledger_header", &RPCHandler::doLedgerHeader, false, false, optCurrent },
|
||||
{ "log_level", &RPCHandler::doLogLevel, true, false, optNone },
|
||||
{ "logrotate", &RPCHandler::doLogRotate, true, false, optNone },
|
||||
// { "nickname_info", &RPCHandler::doNicknameInfo, false, false, optCurrent },
|
||||
{ "owner_info", &RPCHandler::doOwnerInfo, false, false, optCurrent },
|
||||
{ "peers", &RPCHandler::doPeers, true, false, optNone },
|
||||
// { "profile", &RPCHandler::doProfile, false, false, optCurrent },
|
||||
{ "random", &RPCHandler::doRandom, false, false, optNone },
|
||||
{ "ripple_path_find", &RPCHandler::doRipplePathFind, false, false, optCurrent },
|
||||
{ "submit", &RPCHandler::doSubmit, false, false, optCurrent },
|
||||
{ "server_info", &RPCHandler::doServerInfo, true, false, optNone },
|
||||
{ "stop", &RPCHandler::doStop, true, false, optNone },
|
||||
{ "transaction_entry", &RPCHandler::doTransactionEntry, false, false, optCurrent },
|
||||
{ "tx", &RPCHandler::doTx, false, false, optNetwork },
|
||||
{ "tx_history", &RPCHandler::doTxHistory, false, false, optNone },
|
||||
{ "accept_ledger", &RPCHandler::doAcceptLedger, true, optCurrent },
|
||||
{ "account_info", &RPCHandler::doAccountInfo, false, optCurrent },
|
||||
{ "account_lines", &RPCHandler::doAccountLines, false, optCurrent },
|
||||
{ "account_offers", &RPCHandler::doAccountOffers, false, optCurrent },
|
||||
{ "account_tx", &RPCHandler::doAccountTransactions, false, optNetwork },
|
||||
{ "connect", &RPCHandler::doConnect, true, optNone },
|
||||
{ "get_counts", &RPCHandler::doGetCounts, true, optNone },
|
||||
{ "ledger", &RPCHandler::doLedger, false, optNetwork },
|
||||
{ "ledger_accept", &RPCHandler::doLedgerAccept, true, optCurrent },
|
||||
{ "ledger_closed", &RPCHandler::doLedgerClosed, false, optClosed },
|
||||
{ "ledger_current", &RPCHandler::doLedgerCurrent, false, optCurrent },
|
||||
{ "ledger_entry", &RPCHandler::doLedgerEntry, false, optCurrent },
|
||||
{ "ledger_header", &RPCHandler::doLedgerHeader, false, optCurrent },
|
||||
{ "log_level", &RPCHandler::doLogLevel, true, optNone },
|
||||
{ "logrotate", &RPCHandler::doLogRotate, true, optNone },
|
||||
// { "nickname_info", &RPCHandler::doNicknameInfo, false, optCurrent },
|
||||
{ "owner_info", &RPCHandler::doOwnerInfo, false, optCurrent },
|
||||
{ "peers", &RPCHandler::doPeers, true, optNone },
|
||||
// { "profile", &RPCHandler::doProfile, false, optCurrent },
|
||||
{ "random", &RPCHandler::doRandom, false, optNone },
|
||||
{ "ripple_path_find", &RPCHandler::doRipplePathFind, false, optCurrent },
|
||||
{ "submit", &RPCHandler::doSubmit, false, optCurrent },
|
||||
{ "server_info", &RPCHandler::doServerInfo, true, optNone },
|
||||
{ "stop", &RPCHandler::doStop, true, optNone },
|
||||
{ "transaction_entry", &RPCHandler::doTransactionEntry, false, optCurrent },
|
||||
{ "tx", &RPCHandler::doTx, false, optNetwork },
|
||||
{ "tx_history", &RPCHandler::doTxHistory, false, optNone },
|
||||
|
||||
{ "unl_add", &RPCHandler::doUnlAdd, true, false, optNone },
|
||||
{ "unl_delete", &RPCHandler::doUnlDelete, true, false, optNone },
|
||||
{ "unl_list", &RPCHandler::doUnlList, true, false, optNone },
|
||||
{ "unl_load", &RPCHandler::doUnlLoad, true, false, optNone },
|
||||
{ "unl_network", &RPCHandler::doUnlNetwork, true, false, optNone },
|
||||
{ "unl_reset", &RPCHandler::doUnlReset, true, false, optNone },
|
||||
{ "unl_score", &RPCHandler::doUnlScore, true, false, optNone },
|
||||
{ "unl_add", &RPCHandler::doUnlAdd, true, optNone },
|
||||
{ "unl_delete", &RPCHandler::doUnlDelete, true, optNone },
|
||||
{ "unl_list", &RPCHandler::doUnlList, true, optNone },
|
||||
{ "unl_load", &RPCHandler::doUnlLoad, true, optNone },
|
||||
{ "unl_network", &RPCHandler::doUnlNetwork, true, optNone },
|
||||
{ "unl_reset", &RPCHandler::doUnlReset, true, optNone },
|
||||
{ "unl_score", &RPCHandler::doUnlScore, true, optNone },
|
||||
|
||||
{ "validation_create", &RPCHandler::doValidationCreate, false, false, optNone },
|
||||
{ "validation_seed", &RPCHandler::doValidationSeed, false, false, optNone },
|
||||
{ "validation_create", &RPCHandler::doValidationCreate, false, optNone },
|
||||
{ "validation_seed", &RPCHandler::doValidationSeed, false, optNone },
|
||||
|
||||
{ "wallet_accounts", &RPCHandler::doWalletAccounts, false, false, optCurrent },
|
||||
{ "wallet_propose", &RPCHandler::doWalletPropose, false, false, optNone },
|
||||
{ "wallet_seed", &RPCHandler::doWalletSeed, false, false, optNone },
|
||||
{ "wallet_accounts", &RPCHandler::doWalletAccounts, false, optCurrent },
|
||||
{ "wallet_propose", &RPCHandler::doWalletPropose, false, optNone },
|
||||
{ "wallet_seed", &RPCHandler::doWalletSeed, false, optNone },
|
||||
|
||||
// XXX Unnecessary commands which should be removed.
|
||||
{ "login", &RPCHandler::doLogin, true, false, optNone },
|
||||
{ "data_delete", &RPCHandler::doDataDelete, true, false, optNone },
|
||||
{ "data_fetch", &RPCHandler::doDataFetch, true, false, optNone },
|
||||
{ "data_store", &RPCHandler::doDataStore, true, false, optNone },
|
||||
{ "login", &RPCHandler::doLogin, true, optNone },
|
||||
{ "data_delete", &RPCHandler::doDataDelete, true, optNone },
|
||||
{ "data_fetch", &RPCHandler::doDataFetch, true, optNone },
|
||||
{ "data_store", &RPCHandler::doDataStore, true, optNone },
|
||||
|
||||
// Evented methods
|
||||
{ "subscribe", &RPCHandler::doSubscribe, false, true, optNone },
|
||||
{ "unsubscribe", &RPCHandler::doUnsubscribe, false, true, optNone },
|
||||
{ "subscribe", &RPCHandler::doSubscribe, false, optNone },
|
||||
{ "unsubscribe", &RPCHandler::doUnsubscribe, false, optNone },
|
||||
};
|
||||
|
||||
int i = NUMBER(commandsA);
|
||||
@@ -2487,10 +2486,6 @@ Json::Value RPCHandler::doCommand(Json::Value& jvRequest, int iRole)
|
||||
{
|
||||
return rpcError(rpcNO_PERMISSION);
|
||||
}
|
||||
else if (commandsA[i].bEvented && mInfoSub == NULL)
|
||||
{
|
||||
return rpcError(rpcNO_EVENTS);
|
||||
}
|
||||
else if (commandsA[i].iOptions & optNetwork
|
||||
&& mNetOps->getOperatingMode() != NetworkOPs::omTRACKING
|
||||
&& mNetOps->getOperatingMode() != NetworkOPs::omFULL)
|
||||
|
||||
@@ -1,12 +1,73 @@
|
||||
#include <boost/thread.hpp>
|
||||
|
||||
#include "RPCSub.h"
|
||||
|
||||
#include "CallRPC.h"
|
||||
|
||||
RPCSub::RPCSub(const std::string& strUrl, const std::string& strUsername, const std::string& strPassword)
|
||||
: mUrl(strUrl), mUsername(strUsername), mPassword(strPassword)
|
||||
{
|
||||
mId = 1;
|
||||
}
|
||||
|
||||
void RPCSub::sendThread()
|
||||
{
|
||||
Json::Value jvEvent;
|
||||
bool bSend;
|
||||
|
||||
do
|
||||
{
|
||||
{
|
||||
// Obtain the lock to manipulate the queue and change sending.
|
||||
boost::mutex::scoped_lock sl(mLockInfo);
|
||||
|
||||
if (mDeque.empty())
|
||||
{
|
||||
mSending = false;
|
||||
bSend = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
std::pair<int, Json::Value> pEvent = mDeque.front();
|
||||
|
||||
mDeque.pop_front();
|
||||
|
||||
jvEvent = pEvent.second;
|
||||
jvEvent["id"] = pEvent.first;
|
||||
|
||||
bSend = true;
|
||||
}
|
||||
}
|
||||
|
||||
// Send outside of the lock.
|
||||
if (bSend)
|
||||
{
|
||||
// Drop result.
|
||||
(void) callRPC(mIp, mPort, mUsername, mPassword, "event", jvEvent);
|
||||
|
||||
sendThread();
|
||||
}
|
||||
} while (bSend);
|
||||
}
|
||||
|
||||
void RPCSub::send(const Json::Value& jvObj)
|
||||
{
|
||||
boost::mutex::scoped_lock sl(mLockInfo);
|
||||
|
||||
if (RPC_EVENT_QUEUE_MAX == mDeque.size())
|
||||
{
|
||||
// Drop the previous event.
|
||||
|
||||
mDeque.pop_back();
|
||||
}
|
||||
|
||||
mDeque.push_back(std::make_pair(mId++, jvObj));
|
||||
|
||||
if (!mSending)
|
||||
{
|
||||
// Start a sending thread.
|
||||
mSending = true;
|
||||
|
||||
boost::thread(boost::bind(&RPCSub::sendThread, this)).detach();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,20 +1,37 @@
|
||||
#ifndef __RPCSUB__
|
||||
#define __RPCSUB__
|
||||
|
||||
#include <deque>
|
||||
|
||||
#include "../json/value.h"
|
||||
|
||||
#include "NetworkOPs.h"
|
||||
|
||||
#define RPC_EVENT_QUEUE_MAX 32
|
||||
|
||||
// Subscription object for JSON-RPC
|
||||
class RPCSub : public InfoSub
|
||||
{
|
||||
std::string mUrl;
|
||||
std::string mIp;
|
||||
int mPort;
|
||||
std::string mUsername;
|
||||
std::string mPassword;
|
||||
std::string mUrl;
|
||||
std::string mIp;
|
||||
int mPort;
|
||||
std::string mUsername;
|
||||
std::string mPassword;
|
||||
|
||||
int mId; // Next id to allocate.
|
||||
|
||||
bool mSending; // Sending threead is active.
|
||||
|
||||
std::deque<std::pair<int, Json::Value> > mDeque;
|
||||
|
||||
protected:
|
||||
void sendThread();
|
||||
|
||||
public:
|
||||
RPCSub(const std::string& strUrl, const std::string& strUsername, const std::string& strPassword);
|
||||
|
||||
virtual ~RPCSub() { ; }
|
||||
|
||||
// Implement overridden functions from base class:
|
||||
void send(const Json::Value& jvObj);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user