Stub subscribe for JSON-RPC.

This commit is contained in:
Arthur Britto
2012-12-29 14:42:38 -08:00
parent 86d7781255
commit 192dae3b74
7 changed files with 203 additions and 54 deletions

View File

@@ -503,6 +503,10 @@ int commandLineRPC(const std::vector<std::string>& vCmd)
RPCParser rpParser; RPCParser rpParser;
Json::Value jvRpcParams(Json::arrayValue); Json::Value jvRpcParams(Json::arrayValue);
if (theConfig.RPC_USER.empty() && theConfig.RPC_PASSWORD.empty())
throw std::runtime_error("You must set rpcpassword=<password> in the configuration file. "
"If the file does not exist, create it with owner-readable-only file permissions.");
if (vCmd.empty()) return 1; // 1 = print usage. if (vCmd.empty()) return 1; // 1 = print usage.
for (int i = 1; i != vCmd.size(); i++) for (int i = 1; i != vCmd.size(); i++)
@@ -529,6 +533,10 @@ int commandLineRPC(const std::vector<std::string>& vCmd)
jvParams.append(jvRequest); jvParams.append(jvRequest);
jvOutput = callRPC( jvOutput = callRPC(
theConfig.RPC_IP,
theConfig.RPC_PORT,
theConfig.RPC_USER,
theConfig.RPC_PASSWORD,
jvRequest.isMember("method") // Allow parser to rewrite method. jvRequest.isMember("method") // Allow parser to rewrite method.
? jvRequest["method"].asString() ? jvRequest["method"].asString()
: vCmd[0], : vCmd[0],
@@ -589,25 +597,21 @@ int commandLineRPC(const std::vector<std::string>& vCmd)
return nRet; return nRet;
} }
Json::Value callRPC(const std::string& strMethod, const Json::Value& params) Json::Value callRPC(const std::string& strIp, const int iPort, const std::string& strUsername, const std::string& strPassword, const std::string& strMethod, const Json::Value& params)
{ {
if (theConfig.RPC_USER.empty() && theConfig.RPC_PASSWORD.empty())
throw std::runtime_error("You must set rpcpassword=<password> in the configuration file. "
"If the file does not exist, create it with owner-readable-only file permissions.");
// Connect to localhost // Connect to localhost
if (!theConfig.QUIET) if (!theConfig.QUIET)
std::cerr << "Connecting to: " << theConfig.RPC_IP << ":" << theConfig.RPC_PORT << std::endl; std::cerr << "Connecting to: " << theConfig.RPC_IP << ":" << theConfig.RPC_PORT << std::endl;
boost::asio::ip::tcp::endpoint boost::asio::ip::tcp::endpoint
endpoint(boost::asio::ip::address::from_string(theConfig.RPC_IP), theConfig.RPC_PORT); endpoint(boost::asio::ip::address::from_string(strIp), iPort);
boost::asio::ip::tcp::iostream stream; boost::asio::ip::tcp::iostream stream;
stream.connect(endpoint); stream.connect(endpoint);
if (stream.fail()) if (stream.fail())
throw std::runtime_error("couldn't connect to server"); throw std::runtime_error("couldn't connect to server");
// HTTP basic authentication // HTTP basic authentication
std::string strUserPass64 = EncodeBase64(theConfig.RPC_USER + ":" + theConfig.RPC_PASSWORD); std::string strUserPass64 = EncodeBase64(strUsername + ":" + strPassword);
std::map<std::string, std::string> mapRequestHeaders; std::map<std::string, std::string> mapRequestHeaders;
mapRequestHeaders["Authorization"] = std::string("Basic ") + strUserPass64; mapRequestHeaders["Authorization"] = std::string("Basic ") + strUserPass64;

View File

@@ -42,7 +42,7 @@ public:
}; };
extern int commandLineRPC(const std::vector<std::string>& vCmd); extern int commandLineRPC(const std::vector<std::string>& vCmd);
extern Json::Value callRPC(const std::string& strMethod, const Json::Value& params); extern Json::Value callRPC(const std::string& strIp, const int iPort, const std::string& strUsername, const std::string& strPassword, const std::string& strMethod, const Json::Value& params);
#endif #endif

View File

@@ -1502,4 +1502,15 @@ bool NetworkOPs::unsubRTTransactions(InfoSub* ispListener)
return !!mSubTransactions.erase(ispListener); return !!mSubTransactions.erase(ispListener);
} }
RPCSub* NetworkOPs::findRpcSub(const std::string& strRpc)
{
return (RPCSub*)(0);
}
RPCSub* NetworkOPs::addRpcSub(const std::string& strRpc, RPCSub* rspEntry)
{
return rspEntry;
}
// vim:ts=4 // vim:ts=4

View File

@@ -22,6 +22,8 @@ class LedgerConsensus;
DEFINE_INSTANCE(InfoSub); DEFINE_INSTANCE(InfoSub);
class RPCSub;
class InfoSub : public IS_INSTANCE(InfoSub) class InfoSub : public IS_INSTANCE(InfoSub)
{ {
public: public:
@@ -34,12 +36,12 @@ protected:
boost::unordered_set<RippleAddress> mSubAccountInfo; boost::unordered_set<RippleAddress> mSubAccountInfo;
boost::unordered_set<RippleAddress> mSubAccountTransaction; boost::unordered_set<RippleAddress> mSubAccountTransaction;
boost::mutex mLock; boost::mutex mLockInfo;
public: public:
void insertSubAccountInfo(RippleAddress addr) void insertSubAccountInfo(RippleAddress addr)
{ {
boost::mutex::scoped_lock sl(mLock); boost::mutex::scoped_lock sl(mLockInfo);
mSubAccountInfo.insert(addr); mSubAccountInfo.insert(addr);
} }
}; };
@@ -68,6 +70,8 @@ protected:
typedef boost::unordered_map<uint160,std::pair<InfoSub*,uint32> > subSubmitMapType; typedef boost::unordered_map<uint160,std::pair<InfoSub*,uint32> > subSubmitMapType;
typedef boost::unordered_map<std::string, InfoSub* > subRpcMapType;
OperatingMode mMode; OperatingMode mMode;
bool mNeedNetworkLedger; bool mNeedNetworkLedger;
boost::posix_time::ptime mConnectTime; boost::posix_time::ptime mConnectTime;
@@ -97,12 +101,13 @@ protected:
subInfoMapType mSubRTAccount; subInfoMapType mSubRTAccount;
subSubmitMapType mSubmitMap; // TODO: probably dump this subSubmitMapType mSubmitMap; // TODO: probably dump this
subRpcMapType mRpcSubMap;
boost::unordered_set<InfoSub*> mSubLedger; // accepted ledgers boost::unordered_set<InfoSub*> mSubLedger; // accepted ledgers
boost::unordered_set<InfoSub*> mSubServer; // when server changes connectivity state boost::unordered_set<InfoSub*> mSubServer; // when server changes connectivity state
boost::unordered_set<InfoSub*> mSubTransactions; // all accepted transactions boost::unordered_set<InfoSub*> mSubTransactions; // all accepted transactions
boost::unordered_set<InfoSub*> mSubRTTransactions; // all proposed and accepted transactions boost::unordered_set<InfoSub*> mSubRTTransactions; // all proposed and accepted transactions
void setMode(OperatingMode); void setMode(OperatingMode);
Json::Value transJson(const SerializedTransaction& stTxn, TER terResult, bool bAccepted, Ledger::ref lpCurrent, const std::string& strType); Json::Value transJson(const SerializedTransaction& stTxn, TER terResult, bool bAccepted, Ledger::ref lpCurrent, const std::string& strType);
@@ -270,6 +275,9 @@ public:
bool subRTTransactions(InfoSub* ispListener); bool subRTTransactions(InfoSub* ispListener);
bool unsubRTTransactions(InfoSub* ispListener); bool unsubRTTransactions(InfoSub* ispListener);
RPCSub* findRpcSub(const std::string& strRpc);
RPCSub* addRpcSub(const std::string& strRpc, RPCSub* rspEntry);
}; };
#endif #endif

View File

@@ -1,5 +1,5 @@
// //
// carries out the RPC // Carries out the RPC.
// //
#include <openssl/md5.h> #include <openssl/md5.h>
@@ -11,6 +11,7 @@
#include "Log.h" #include "Log.h"
#include "NetworkOPs.h" #include "NetworkOPs.h"
#include "RPCHandler.h" #include "RPCHandler.h"
#include "RPCSub.h"
#include "Application.h" #include "Application.h"
#include "AccountItems.h" #include "AccountItems.h"
#include "Wallet.h" #include "Wallet.h"
@@ -22,19 +23,18 @@
#include "InstanceCounter.h" #include "InstanceCounter.h"
#include "Offer.h" #include "Offer.h"
SETUP_LOG(); SETUP_LOG();
RPCHandler::RPCHandler(NetworkOPs* netOps) RPCHandler::RPCHandler(NetworkOPs* netOps)
{ {
mNetOps=netOps; mNetOps = netOps;
mInfoSub=NULL; mInfoSub = NULL;
} }
RPCHandler::RPCHandler(NetworkOPs* netOps, InfoSub* infoSub) RPCHandler::RPCHandler(NetworkOPs* netOps, InfoSub* infoSub)
{ {
mNetOps=netOps; mNetOps = netOps;
mInfoSub=infoSub; mInfoSub = infoSub;
} }
// Look up the master public generator for a regular seed so we may index source accounts ids. // Look up the master public generator for a regular seed so we may index source accounts ids.
@@ -2138,8 +2138,45 @@ rt_accounts
*/ */
Json::Value RPCHandler::doSubscribe(Json::Value jvRequest) Json::Value RPCHandler::doSubscribe(Json::Value jvRequest)
{ {
InfoSub* ispSub;
Json::Value jvResult(Json::objectValue); Json::Value jvResult(Json::objectValue);
if (!mInfoSub && !jvRequest.isMember("url"))
{
// Must be a JSON-RPC call.
return rpcError(rpcINVALID_PARAMS);
}
if (jvRequest.isMember("url"))
{
if (mRole != ADMIN)
return rpcError(rpcNO_PERMISSION);
std::string strUrl = jvRequest["url"].asString();
std::string strUsername = jvRequest.isMember("username") ? jvRequest["username"].asString() : "";
std::string strPassword = jvRequest.isMember("password") ? jvRequest["password"].asString() : "";
RPCSub *rspSub = mNetOps->findRpcSub(strUrl);
if (!rspSub)
{
rspSub = mNetOps->addRpcSub(strUrl, new RPCSub(strUrl, strUsername, strPassword));
}
else
{
if (jvRequest.isMember("username"))
rspSub->setUsername(strUsername);
if (jvRequest.isMember("password"))
rspSub->setPassword(strPassword);
}
ispSub = rspSub;
}
else
{
ispSub = mInfoSub;
}
if (jvRequest.isMember("streams")) if (jvRequest.isMember("streams"))
{ {
for (Json::Value::iterator it = jvRequest["streams"].begin(); it != jvRequest["streams"].end(); it++) for (Json::Value::iterator it = jvRequest["streams"].begin(); it != jvRequest["streams"].end(); it++)
@@ -2148,26 +2185,31 @@ Json::Value RPCHandler::doSubscribe(Json::Value jvRequest)
{ {
std::string streamName=(*it).asString(); std::string streamName=(*it).asString();
if(streamName=="server") if (streamName=="server")
{ {
mNetOps->subServer(mInfoSub, jvResult); mNetOps->subServer(ispSub, jvResult);
} else if(streamName=="ledger")
{
mNetOps->subLedger(mInfoSub, jvResult);
} else if(streamName=="transactions")
{
mNetOps->subTransactions(mInfoSub);
} else if(streamName=="rt_transactions")
{
mNetOps->subRTTransactions(mInfoSub);
} }
else { else if (streamName=="ledger")
{
mNetOps->subLedger(ispSub, jvResult);
}
else if (streamName=="transactions")
{
mNetOps->subTransactions(ispSub);
}
else if (streamName=="rt_transactions")
{
mNetOps->subRTTransactions(ispSub);
}
else
{
jvResult["error"] = str(boost::format("Unknown stream: %s") % streamName); jvResult["error"] = str(boost::format("Unknown stream: %s") % streamName);
} }
} else }
else
{ {
jvResult["error"] = "malformedSteam"; jvResult["error"] = "malformedSteam";
} }
@@ -2181,14 +2223,15 @@ Json::Value RPCHandler::doSubscribe(Json::Value jvRequest)
if (usnaAccoundIds.empty()) if (usnaAccoundIds.empty())
{ {
jvResult["error"] = "malformedAccount"; jvResult["error"] = "malformedAccount";
}else }
else
{ {
BOOST_FOREACH(const RippleAddress& naAccountID, usnaAccoundIds) BOOST_FOREACH(const RippleAddress& naAccountID, usnaAccoundIds)
{ {
mInfoSub->insertSubAccountInfo(naAccountID); ispSub->insertSubAccountInfo(naAccountID);
} }
mNetOps->subAccount(mInfoSub, usnaAccoundIds, true); mNetOps->subAccount(ispSub, usnaAccoundIds, true);
} }
} }
@@ -2199,24 +2242,51 @@ Json::Value RPCHandler::doSubscribe(Json::Value jvRequest)
if (usnaAccoundIds.empty()) if (usnaAccoundIds.empty())
{ {
jvResult["error"] = "malformedAccount"; jvResult["error"] = "malformedAccount";
}else }
else
{ {
BOOST_FOREACH(const RippleAddress& naAccountID, usnaAccoundIds) BOOST_FOREACH(const RippleAddress& naAccountID, usnaAccoundIds)
{ {
mInfoSub->insertSubAccountInfo(naAccountID); ispSub->insertSubAccountInfo(naAccountID);
} }
mNetOps->subAccount(mInfoSub, usnaAccoundIds, false); mNetOps->subAccount(ispSub, usnaAccoundIds, false);
} }
} }
return jvResult; return jvResult;
} }
// This leaks RPCSub objects for JSON-RPC. Shouldn't matter for anyone sane.
Json::Value RPCHandler::doUnsubscribe(Json::Value jvRequest) Json::Value RPCHandler::doUnsubscribe(Json::Value jvRequest)
{ {
InfoSub* ispSub;
Json::Value jvResult(Json::objectValue); Json::Value jvResult(Json::objectValue);
if (!mInfoSub && !jvRequest.isMember("url"))
{
// Must be a JSON-RPC call.
return rpcError(rpcINVALID_PARAMS);
}
if (jvRequest.isMember("url"))
{
if (mRole != ADMIN)
return rpcError(rpcNO_PERMISSION);
std::string strUrl = jvRequest["url"].asString();
RPCSub *rspSub = mNetOps->findRpcSub(strUrl);
if (!rspSub)
return jvResult;
ispSub = rspSub;
}
else
{
ispSub = mInfoSub;
}
if (jvRequest.isMember("streams")) if (jvRequest.isMember("streams"))
{ {
for (Json::Value::iterator it = jvRequest["streams"].begin(); it != jvRequest["streams"].end(); it++) for (Json::Value::iterator it = jvRequest["streams"].begin(); it != jvRequest["streams"].end(); it++)
@@ -2225,23 +2295,28 @@ Json::Value RPCHandler::doUnsubscribe(Json::Value jvRequest)
{ {
std::string streamName=(*it).asString(); std::string streamName=(*it).asString();
if(streamName=="server") if (streamName == "server")
{ {
mNetOps->unsubServer(mInfoSub); mNetOps->unsubServer(ispSub);
}else if(streamName=="ledger") }
else if (streamName == "ledger")
{ {
mNetOps->unsubLedger(mInfoSub); mNetOps->unsubLedger(ispSub);
}else if(streamName=="transactions") }
else if (streamName == "transactions")
{ {
mNetOps->unsubTransactions(mInfoSub); mNetOps->unsubTransactions(ispSub);
}else if(streamName=="rt_transactions") }
else if (streamName == "rt_transactions")
{ {
mNetOps->unsubRTTransactions(mInfoSub); mNetOps->unsubRTTransactions(ispSub);
}else }
else
{ {
jvResult["error"] = str(boost::format("Unknown stream: %s") % streamName); jvResult["error"] = str(boost::format("Unknown stream: %s") % streamName);
} }
}else }
else
{ {
jvResult["error"] = "malformedSteam"; jvResult["error"] = "malformedSteam";
} }
@@ -2255,14 +2330,15 @@ Json::Value RPCHandler::doUnsubscribe(Json::Value jvRequest)
if (usnaAccoundIds.empty()) if (usnaAccoundIds.empty())
{ {
jvResult["error"] = "malformedAccount"; jvResult["error"] = "malformedAccount";
}else }
else
{ {
BOOST_FOREACH(const RippleAddress& naAccountID, usnaAccoundIds) BOOST_FOREACH(const RippleAddress& naAccountID, usnaAccoundIds)
{ {
mInfoSub->insertSubAccountInfo(naAccountID); ispSub->insertSubAccountInfo(naAccountID);
} }
mNetOps->unsubAccount(mInfoSub, usnaAccoundIds,true); mNetOps->unsubAccount(ispSub, usnaAccoundIds,true);
} }
} }
@@ -2273,14 +2349,15 @@ Json::Value RPCHandler::doUnsubscribe(Json::Value jvRequest)
if (usnaAccoundIds.empty()) if (usnaAccoundIds.empty())
{ {
jvResult["error"] = "malformedAccount"; jvResult["error"] = "malformedAccount";
}else }
else
{ {
BOOST_FOREACH(const RippleAddress& naAccountID, usnaAccoundIds) BOOST_FOREACH(const RippleAddress& naAccountID, usnaAccoundIds)
{ {
mInfoSub->insertSubAccountInfo(naAccountID); ispSub->insertSubAccountInfo(naAccountID);
} }
mNetOps->unsubAccount(mInfoSub, usnaAccoundIds,false); mNetOps->unsubAccount(ispSub, usnaAccoundIds,false);
} }
} }

12
src/cpp/ripple/RPCSub.cpp Normal file
View File

@@ -0,0 +1,12 @@
#include "RPCSub.h"
RPCSub::RPCSub(const std::string& strUrl, const std::string& strUsername, const std::string& strPassword)
: mUrl(strUrl), mUsername(strUsername), mPassword(strPassword)
{
}
void RPCSub::send(const Json::Value& jvObj)
{
}

37
src/cpp/ripple/RPCSub.h Normal file
View File

@@ -0,0 +1,37 @@
#ifndef __RPCSUB__
#define __RPCSUB__
#include "NetworkOPs.h"
// Subscription object for JSON-RPC
class RPCSub : public InfoSub
{
std::string mUrl;
std::string mIp;
int mPort;
std::string mUsername;
std::string mPassword;
public:
RPCSub(const std::string& strUrl, const std::string& strUsername, const std::string& strPassword);
// Implement overridden functions from base class:
void send(const Json::Value& jvObj);
void setUsername(const std::string& strUsername)
{
boost::mutex::scoped_lock sl(mLockInfo);
mUsername = strUsername;
}
void setPassword(const std::string& strPassword)
{
boost::mutex::scoped_lock sl(mLockInfo);
mPassword = strPassword;
}
};
#endif
// vim:ts=4