mirror of
https://github.com/XRPLF/rippled.git
synced 2025-12-06 17:27:55 +00:00
Get WS ledger_subscribe and ledger_unsubscribe working.
This commit is contained in:
@@ -318,6 +318,8 @@ void Ledger::saveAcceptedLedger(Ledger::pointer ledger)
|
||||
}
|
||||
}
|
||||
db->executeSQL("COMMIT TRANSACTION;");
|
||||
|
||||
theApp->getOPs().pubLedger(ledger);
|
||||
}
|
||||
|
||||
Ledger::pointer Ledger::getSQL(const std::string& sql)
|
||||
|
||||
@@ -695,15 +695,34 @@ void NetworkOPs::pubAccountInfo(const NewcoinAddress& naAccountID, const Json::V
|
||||
}
|
||||
}
|
||||
|
||||
void NetworkOPs::pubLedger(const Ledger::pointer& lpAccepted)
|
||||
{
|
||||
if (!mSubLedger.empty())
|
||||
{
|
||||
Json::Value jvObj(Json::objectValue);
|
||||
|
||||
jvObj["type"] = "ledgerAccepted";
|
||||
jvObj["seq"] = lpAccepted->getLedgerSeq();
|
||||
jvObj["hash"] = lpAccepted->getHash().ToString();
|
||||
jvObj["time"] = Json::Value::UInt(lpAccepted->getCloseTimeNC());
|
||||
|
||||
boost::interprocess::sharable_lock<boost::interprocess::interprocess_upgradable_mutex> sl(mMonitorLock);
|
||||
BOOST_FOREACH(InfoSub* ispListener, mSubLedger)
|
||||
{
|
||||
ispListener->send(jvObj);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Monitoring
|
||||
//
|
||||
|
||||
void NetworkOPs::subAccountInfo(InfoSub* ispListener, const std::vector<NewcoinAddress>& vnaAccountIDs)
|
||||
void NetworkOPs::subAccountInfo(InfoSub* ispListener, const boost::unordered_set<NewcoinAddress>& vnaAccountIDs)
|
||||
{
|
||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_upgradable_mutex> sl(mMonitorLock);
|
||||
|
||||
BOOST_FOREACH(NewcoinAddress naAccountID, vnaAccountIDs)
|
||||
BOOST_FOREACH(const NewcoinAddress& naAccountID, vnaAccountIDs)
|
||||
{
|
||||
subInfoMapType::iterator simIterator = mSubAccountInfo.find(naAccountID);
|
||||
if (simIterator == mSubAccountInfo.end())
|
||||
@@ -722,11 +741,11 @@ void NetworkOPs::subAccountInfo(InfoSub* ispListener, const std::vector<NewcoinA
|
||||
}
|
||||
}
|
||||
|
||||
void NetworkOPs::unsubAccountInfo(InfoSub* ispListener, const std::vector<NewcoinAddress>& vnaAccountIDs)
|
||||
void NetworkOPs::unsubAccountInfo(InfoSub* ispListener, const boost::unordered_set<NewcoinAddress>& vnaAccountIDs)
|
||||
{
|
||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_upgradable_mutex> sl(mMonitorLock);
|
||||
|
||||
BOOST_FOREACH(NewcoinAddress naAccountID, vnaAccountIDs)
|
||||
BOOST_FOREACH(const NewcoinAddress& naAccountID, vnaAccountIDs)
|
||||
{
|
||||
subInfoMapType::iterator simIterator = mSubAccountInfo.find(naAccountID);
|
||||
if (simIterator == mSubAccountInfo.end())
|
||||
@@ -748,7 +767,7 @@ void NetworkOPs::unsubAccountInfo(InfoSub* ispListener, const std::vector<Newcoi
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#if 0
|
||||
void NetworkOPs::subAccountChanges(InfoSub* ispListener, const uint256 uLedgerHash)
|
||||
{
|
||||
}
|
||||
@@ -756,6 +775,18 @@ void NetworkOPs::subAccountChanges(InfoSub* ispListener, const uint256 uLedgerHa
|
||||
void NetworkOPs::unsubAccountChanges(InfoSub* ispListener)
|
||||
{
|
||||
}
|
||||
#endif
|
||||
|
||||
// <-- bool: true=added, false=already there
|
||||
bool NetworkOPs::subLedger(InfoSub* ispListener)
|
||||
{
|
||||
return mSubLedger.insert(ispListener).second;
|
||||
}
|
||||
|
||||
// <-- bool: true=erased, false=was not there
|
||||
bool NetworkOPs::unsubLedger(InfoSub* ispListener)
|
||||
{
|
||||
return !!mSubLedger.erase(ispListener);
|
||||
}
|
||||
|
||||
// vim:ts=4
|
||||
|
||||
@@ -21,6 +21,9 @@ class LedgerConsensus;
|
||||
class InfoSub
|
||||
{
|
||||
public:
|
||||
|
||||
virtual ~InfoSub() { ; }
|
||||
|
||||
virtual void send(const Json::Value& jvObj) = 0;
|
||||
};
|
||||
|
||||
@@ -55,6 +58,7 @@ protected:
|
||||
|
||||
boost::interprocess::interprocess_upgradable_mutex mMonitorLock;
|
||||
subInfoMapType mSubAccountInfo;
|
||||
boost::unordered_set<InfoSub*> mSubLedger;
|
||||
|
||||
public:
|
||||
NetworkOPs(boost::asio::io_service& io_service, LedgerMaster* pLedgerMaster);
|
||||
@@ -65,8 +69,8 @@ public:
|
||||
uint32 getCurrentLedgerID();
|
||||
OperatingMode getOperatingMode() { return mMode; }
|
||||
inline bool available() {
|
||||
// XXX don't consider network available till have a closed ledger.
|
||||
return omDISCONNECTED != mMode;
|
||||
// XXX Later this can be relaxed to omCONNECTED
|
||||
return mMode >= omTRACKING;
|
||||
}
|
||||
|
||||
uint256 getClosedLedger()
|
||||
@@ -154,17 +158,21 @@ public:
|
||||
//
|
||||
|
||||
void pubAccountInfo(const NewcoinAddress& naAccountID, const Json::Value& jvObj);
|
||||
void pubLedger(const Ledger::pointer& lpAccepted);
|
||||
|
||||
//
|
||||
// Monitoring: subscriber side
|
||||
//
|
||||
|
||||
// --> vnaAddress: empty = all
|
||||
void subAccountInfo(InfoSub* ispListener, const std::vector<NewcoinAddress>& vnaAccountIDs);
|
||||
void unsubAccountInfo(InfoSub* ispListener, const std::vector<NewcoinAddress>& vnaAccountIDs);
|
||||
void subAccountInfo(InfoSub* ispListener, const boost::unordered_set<NewcoinAddress>& vnaAccountIDs);
|
||||
void unsubAccountInfo(InfoSub* ispListener, const boost::unordered_set<NewcoinAddress>& vnaAccountIDs);
|
||||
|
||||
void subAccountChanges(InfoSub* ispListener, const uint256 uLedgerHash);
|
||||
void unsubAccountChanges(InfoSub* ispListener);
|
||||
// void subAccountChanges(InfoSub* ispListener, const uint256 uLedgerHash);
|
||||
// void unsubAccountChanges(InfoSub* ispListener);
|
||||
|
||||
bool subLedger(InfoSub* ispListener);
|
||||
bool unsubLedger(InfoSub* ispListener);
|
||||
};
|
||||
|
||||
#endif
|
||||
|
||||
219
src/WSDoor.cpp
219
src/WSDoor.cpp
@@ -5,6 +5,7 @@
|
||||
#include "Config.h"
|
||||
#include "Log.h"
|
||||
#include "NetworkOPs.h"
|
||||
#include "NetworkOPs.h"
|
||||
#include "utils.h"
|
||||
|
||||
#include <iostream>
|
||||
@@ -12,6 +13,7 @@
|
||||
#include <boost/bind.hpp>
|
||||
#include <boost/foreach.hpp>
|
||||
#include <boost/mem_fn.hpp>
|
||||
#include <boost/unordered_set.hpp>
|
||||
|
||||
#include "../json/reader.h"
|
||||
#include "../json/writer.h"
|
||||
@@ -47,6 +49,11 @@ public:
|
||||
typedef typename websocketpp::WSDOOR_SERVER::handler::message_ptr message_ptr;
|
||||
|
||||
protected:
|
||||
typedef void (WSConnection::*doFuncPtr)(Json::Value& jvResult, const Json::Value &jvRequest);
|
||||
|
||||
boost::mutex mLock;
|
||||
boost::unordered_set<NewcoinAddress> mSubAccountInfo;
|
||||
|
||||
WSServerHandler<websocketpp::WSDOOR_SERVER>* mHandler;
|
||||
connection_ptr mConnection;
|
||||
|
||||
@@ -58,14 +65,20 @@ public:
|
||||
WSConnection(WSServerHandler<websocketpp::WSDOOR_SERVER>* wshpHandler, connection_ptr cpConnection)
|
||||
: mHandler(wshpHandler), mConnection(cpConnection) { ; }
|
||||
|
||||
~WSConnection()
|
||||
{
|
||||
// XXX Unsubscribe.
|
||||
nothing();
|
||||
}
|
||||
virtual ~WSConnection();
|
||||
|
||||
// Implement overriden functions from base class:
|
||||
void send(const Json::Value& jvObj);
|
||||
|
||||
// Utilities
|
||||
Json::Value invokeCommand(const Json::Value& jvRequest);
|
||||
boost::unordered_set<NewcoinAddress> parseAccountIds(const Json::Value& jvArray);
|
||||
|
||||
// Commands
|
||||
void doAccountInfoSubscribe(Json::Value& jvResult, const Json::Value& jvRequest);
|
||||
void doAccountInfoUnsubscribe(Json::Value& jvResult, const Json::Value& jvRequest);
|
||||
void doLedgerSubcribe(Json::Value& jvResult, const Json::Value& jvRequest);
|
||||
void doLedgerUnsubscribe(Json::Value& jvResult, const Json::Value& jvRequest);
|
||||
};
|
||||
|
||||
|
||||
@@ -88,7 +101,8 @@ private:
|
||||
|
||||
protected:
|
||||
boost::mutex mMapLock;
|
||||
boost::unordered_map<connection_ptr, WSConnection> mMap;
|
||||
// For each connection maintain an assoicated object to track subscriptions.
|
||||
boost::unordered_map<connection_ptr, boost::shared_ptr<WSConnection> > mMap;
|
||||
|
||||
public:
|
||||
WSServerHandler(boost::shared_ptr<boost::asio::ssl::context> spCtx) : mCtx(spCtx) {}
|
||||
@@ -130,7 +144,6 @@ public:
|
||||
|
||||
Log(lsINFO) << "Ws:: Object '" << jfwWriter.write(jvObj) << "'";
|
||||
|
||||
|
||||
send(cpClient, jfwWriter.write(jvObj));
|
||||
}
|
||||
|
||||
@@ -138,7 +151,7 @@ public:
|
||||
{
|
||||
boost::mutex::scoped_lock sl(mMapLock);
|
||||
|
||||
mMap[cpClient] = WSConnection(this, cpClient);
|
||||
mMap[cpClient] = boost::make_shared<WSConnection>(this, cpClient);
|
||||
}
|
||||
|
||||
void on_close(connection_ptr cpClient)
|
||||
@@ -174,12 +187,7 @@ public:
|
||||
}
|
||||
else
|
||||
{
|
||||
Json::Value jvResult(Json::objectValue);
|
||||
|
||||
jvResult["type"] = "success";
|
||||
jvResult["value"] = mpMessage->get_payload();
|
||||
|
||||
send(cpClient, jvResult);
|
||||
send(cpClient, mMap[cpClient]->invokeCommand(jvRequest));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -250,9 +258,192 @@ void WSDoor::stop()
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// WSConnection
|
||||
//
|
||||
|
||||
WSConnection::~WSConnection()
|
||||
{
|
||||
theApp->getOPs().unsubLedger(this);
|
||||
theApp->getOPs().unsubAccountInfo(this, mSubAccountInfo);
|
||||
}
|
||||
|
||||
void WSConnection::send(const Json::Value& jvObj)
|
||||
{
|
||||
mHandler->send(mConnection, jvObj);
|
||||
}
|
||||
|
||||
//
|
||||
// Utilities
|
||||
//
|
||||
|
||||
Json::Value WSConnection::invokeCommand(const Json::Value& jvRequest)
|
||||
{
|
||||
static struct {
|
||||
const char* pCommand;
|
||||
doFuncPtr dfpFunc;
|
||||
} commandsA[] = {
|
||||
{ "account_info_subscribe", &WSConnection::doAccountInfoSubscribe },
|
||||
{ "account_info_unsubscribe", &WSConnection::doAccountInfoUnsubscribe },
|
||||
{ "ledger_subscribe", &WSConnection::doLedgerSubcribe },
|
||||
{ "ledger_unsubscribe", &WSConnection::doLedgerUnsubscribe },
|
||||
};
|
||||
|
||||
|
||||
if (!jvRequest.isMember("command"))
|
||||
{
|
||||
Json::Value jvResult(Json::objectValue);
|
||||
|
||||
jvResult["type"] = "response";
|
||||
jvResult["result"] = "error";
|
||||
jvResult["error"] = "missingCommand";
|
||||
jvResult["command"] = jvRequest;
|
||||
|
||||
return jvResult;
|
||||
}
|
||||
|
||||
std::string strCommand = jvRequest["command"].asString();
|
||||
|
||||
int i = NUMBER(commandsA);
|
||||
|
||||
while (i-- && strCommand != commandsA[i].pCommand)
|
||||
;
|
||||
|
||||
Json::Value jvResult(Json::objectValue);
|
||||
|
||||
jvResult["type"] = "response";
|
||||
|
||||
if (i < 0)
|
||||
{
|
||||
jvResult["error"] = "unknownCommand"; // Unknown command.
|
||||
}
|
||||
else
|
||||
{
|
||||
(this->*(commandsA[i].dfpFunc))(jvResult, jvRequest);
|
||||
}
|
||||
|
||||
if (jvRequest.isMember("id"))
|
||||
{
|
||||
jvResult["id"] = jvRequest["id"];
|
||||
}
|
||||
|
||||
if (jvResult.isMember("error"))
|
||||
{
|
||||
jvResult["result"] = "error";
|
||||
jvResult["request"] = jvRequest;
|
||||
}
|
||||
else
|
||||
{
|
||||
jvResult["result"] = "success";
|
||||
}
|
||||
|
||||
return jvResult;
|
||||
}
|
||||
|
||||
boost::unordered_set<NewcoinAddress> WSConnection::parseAccountIds(const Json::Value& jvArray)
|
||||
{
|
||||
boost::unordered_set<NewcoinAddress> usnaResult;
|
||||
|
||||
for (Json::Value::const_iterator it = jvArray.begin(); it != jvArray.end(); it++)
|
||||
{
|
||||
NewcoinAddress naString;
|
||||
|
||||
if (!(*it).isString() || !naString.setAccountID((*it).asString()))
|
||||
{
|
||||
usnaResult.clear();
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
(void) usnaResult.insert(naString);
|
||||
}
|
||||
}
|
||||
|
||||
return usnaResult;
|
||||
}
|
||||
|
||||
//
|
||||
// Commands
|
||||
//
|
||||
|
||||
void WSConnection::doAccountInfoSubscribe(Json::Value& jvResult, const Json::Value& jvRequest)
|
||||
{
|
||||
if (!jvRequest.isMember("accounts"))
|
||||
{
|
||||
jvResult["error"] = "missingField";
|
||||
}
|
||||
else if (jvResult["accounts"].empty())
|
||||
{
|
||||
jvResult["error"] = "emptySet";
|
||||
}
|
||||
else
|
||||
{
|
||||
boost::unordered_set<NewcoinAddress> usnaAccoundIds = parseAccountIds(jvRequest["accounts"]);
|
||||
|
||||
if (usnaAccoundIds.empty())
|
||||
{
|
||||
jvResult["error"] = "malformedAccount";
|
||||
}
|
||||
else
|
||||
{
|
||||
boost::mutex::scoped_lock sl(mLock);
|
||||
|
||||
BOOST_FOREACH(const NewcoinAddress& naAccountID, usnaAccoundIds)
|
||||
{
|
||||
mSubAccountInfo.insert(naAccountID);
|
||||
}
|
||||
|
||||
theApp->getOPs().subAccountInfo(this, usnaAccoundIds);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void WSConnection::doAccountInfoUnsubscribe(Json::Value& jvResult, const Json::Value& jvRequest)
|
||||
{
|
||||
if (!jvRequest.isMember("accounts"))
|
||||
{
|
||||
jvResult["error"] = "missingField";
|
||||
}
|
||||
else if (jvResult["accounts"].empty())
|
||||
{
|
||||
jvResult["error"] = "emptySet";
|
||||
}
|
||||
else
|
||||
{
|
||||
boost::unordered_set<NewcoinAddress> usnaAccoundIds = parseAccountIds(jvRequest["accounts"]);
|
||||
|
||||
if (usnaAccoundIds.empty())
|
||||
{
|
||||
jvResult["error"] = "malformedAccount";
|
||||
}
|
||||
else
|
||||
{
|
||||
boost::mutex::scoped_lock sl(mLock);
|
||||
|
||||
BOOST_FOREACH(const NewcoinAddress& naAccountID, usnaAccoundIds)
|
||||
{
|
||||
mSubAccountInfo.erase(naAccountID);
|
||||
}
|
||||
|
||||
theApp->getOPs().unsubAccountInfo(this, usnaAccoundIds);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void WSConnection::doLedgerSubcribe(Json::Value& jvResult, const Json::Value& jvRequest)
|
||||
{
|
||||
if (!theApp->getOPs().subLedger(this))
|
||||
{
|
||||
jvResult["error"] = "ledgerSubscribed";
|
||||
}
|
||||
}
|
||||
|
||||
void WSConnection::doLedgerUnsubscribe(Json::Value& jvResult, const Json::Value& jvRequest)
|
||||
{
|
||||
if (!theApp->getOPs().unsubLedger(this))
|
||||
{
|
||||
jvResult["error"] = "ledgerNotSubscribed";
|
||||
}
|
||||
}
|
||||
|
||||
// vim:ts=4
|
||||
|
||||
Reference in New Issue
Block a user