diff --git a/src/Ledger.cpp b/src/Ledger.cpp index 1ab8fb3d5c..3838ce4104 100644 --- a/src/Ledger.cpp +++ b/src/Ledger.cpp @@ -318,6 +318,8 @@ void Ledger::saveAcceptedLedger(Ledger::pointer ledger) } } db->executeSQL("COMMIT TRANSACTION;"); + + theApp->getOPs().pubLedger(ledger); } Ledger::pointer Ledger::getSQL(const std::string& sql) diff --git a/src/NetworkOPs.cpp b/src/NetworkOPs.cpp index cecc6bb5cb..8a055f203f 100644 --- a/src/NetworkOPs.cpp +++ b/src/NetworkOPs.cpp @@ -695,15 +695,34 @@ void NetworkOPs::pubAccountInfo(const NewcoinAddress& naAccountID, const Json::V } } +void NetworkOPs::pubLedger(const Ledger::pointer& lpAccepted) +{ + if (!mSubLedger.empty()) + { + Json::Value jvObj(Json::objectValue); + + jvObj["type"] = "ledgerAccepted"; + jvObj["seq"] = lpAccepted->getLedgerSeq(); + jvObj["hash"] = lpAccepted->getHash().ToString(); + jvObj["time"] = Json::Value::UInt(lpAccepted->getCloseTimeNC()); + + boost::interprocess::sharable_lock sl(mMonitorLock); + BOOST_FOREACH(InfoSub* ispListener, mSubLedger) + { + ispListener->send(jvObj); + } + } +} + // // Monitoring // -void NetworkOPs::subAccountInfo(InfoSub* ispListener, const std::vector& vnaAccountIDs) +void NetworkOPs::subAccountInfo(InfoSub* ispListener, const boost::unordered_set& vnaAccountIDs) { boost::interprocess::scoped_lock sl(mMonitorLock); - BOOST_FOREACH(NewcoinAddress naAccountID, vnaAccountIDs) + BOOST_FOREACH(const NewcoinAddress& naAccountID, vnaAccountIDs) { subInfoMapType::iterator simIterator = mSubAccountInfo.find(naAccountID); if (simIterator == mSubAccountInfo.end()) @@ -722,11 +741,11 @@ void NetworkOPs::subAccountInfo(InfoSub* ispListener, const std::vector& vnaAccountIDs) +void NetworkOPs::unsubAccountInfo(InfoSub* ispListener, const boost::unordered_set& vnaAccountIDs) { boost::interprocess::scoped_lock sl(mMonitorLock); - BOOST_FOREACH(NewcoinAddress naAccountID, vnaAccountIDs) + BOOST_FOREACH(const NewcoinAddress& naAccountID, vnaAccountIDs) { subInfoMapType::iterator simIterator = mSubAccountInfo.find(naAccountID); if (simIterator == mSubAccountInfo.end()) @@ -748,7 +767,7 @@ void NetworkOPs::unsubAccountInfo(InfoSub* ispListener, const std::vector mSubLedger; public: NetworkOPs(boost::asio::io_service& io_service, LedgerMaster* pLedgerMaster); @@ -65,8 +69,8 @@ public: uint32 getCurrentLedgerID(); OperatingMode getOperatingMode() { return mMode; } inline bool available() { - // XXX don't consider network available till have a closed ledger. - return omDISCONNECTED != mMode; + // XXX Later this can be relaxed to omCONNECTED + return mMode >= omTRACKING; } uint256 getClosedLedger() @@ -154,17 +158,21 @@ public: // void pubAccountInfo(const NewcoinAddress& naAccountID, const Json::Value& jvObj); + void pubLedger(const Ledger::pointer& lpAccepted); // // Monitoring: subscriber side // // --> vnaAddress: empty = all - void subAccountInfo(InfoSub* ispListener, const std::vector& vnaAccountIDs); - void unsubAccountInfo(InfoSub* ispListener, const std::vector& vnaAccountIDs); + void subAccountInfo(InfoSub* ispListener, const boost::unordered_set& vnaAccountIDs); + void unsubAccountInfo(InfoSub* ispListener, const boost::unordered_set& vnaAccountIDs); - void subAccountChanges(InfoSub* ispListener, const uint256 uLedgerHash); - void unsubAccountChanges(InfoSub* ispListener); + // void subAccountChanges(InfoSub* ispListener, const uint256 uLedgerHash); + // void unsubAccountChanges(InfoSub* ispListener); + + bool subLedger(InfoSub* ispListener); + bool unsubLedger(InfoSub* ispListener); }; #endif diff --git a/src/WSDoor.cpp b/src/WSDoor.cpp index 01b395b3fe..3b38380baf 100644 --- a/src/WSDoor.cpp +++ b/src/WSDoor.cpp @@ -5,6 +5,7 @@ #include "Config.h" #include "Log.h" #include "NetworkOPs.h" +#include "NetworkOPs.h" #include "utils.h" #include @@ -12,6 +13,7 @@ #include #include #include +#include #include "../json/reader.h" #include "../json/writer.h" @@ -47,6 +49,11 @@ public: typedef typename websocketpp::WSDOOR_SERVER::handler::message_ptr message_ptr; protected: + typedef void (WSConnection::*doFuncPtr)(Json::Value& jvResult, const Json::Value &jvRequest); + + boost::mutex mLock; + boost::unordered_set mSubAccountInfo; + WSServerHandler* mHandler; connection_ptr mConnection; @@ -58,14 +65,20 @@ public: WSConnection(WSServerHandler* wshpHandler, connection_ptr cpConnection) : mHandler(wshpHandler), mConnection(cpConnection) { ; } - ~WSConnection() - { - // XXX Unsubscribe. - nothing(); - } + virtual ~WSConnection(); // Implement overriden functions from base class: void send(const Json::Value& jvObj); + + // Utilities + Json::Value invokeCommand(const Json::Value& jvRequest); + boost::unordered_set parseAccountIds(const Json::Value& jvArray); + + // Commands + void doAccountInfoSubscribe(Json::Value& jvResult, const Json::Value& jvRequest); + void doAccountInfoUnsubscribe(Json::Value& jvResult, const Json::Value& jvRequest); + void doLedgerSubcribe(Json::Value& jvResult, const Json::Value& jvRequest); + void doLedgerUnsubscribe(Json::Value& jvResult, const Json::Value& jvRequest); }; @@ -88,7 +101,8 @@ private: protected: boost::mutex mMapLock; - boost::unordered_map mMap; + // For each connection maintain an assoicated object to track subscriptions. + boost::unordered_map > mMap; public: WSServerHandler(boost::shared_ptr spCtx) : mCtx(spCtx) {} @@ -130,7 +144,6 @@ public: Log(lsINFO) << "Ws:: Object '" << jfwWriter.write(jvObj) << "'"; - send(cpClient, jfwWriter.write(jvObj)); } @@ -138,7 +151,7 @@ public: { boost::mutex::scoped_lock sl(mMapLock); - mMap[cpClient] = WSConnection(this, cpClient); + mMap[cpClient] = boost::make_shared(this, cpClient); } void on_close(connection_ptr cpClient) @@ -174,12 +187,7 @@ public: } else { - Json::Value jvResult(Json::objectValue); - - jvResult["type"] = "success"; - jvResult["value"] = mpMessage->get_payload(); - - send(cpClient, jvResult); + send(cpClient, mMap[cpClient]->invokeCommand(jvRequest)); } } @@ -250,9 +258,192 @@ void WSDoor::stop() } } +// +// WSConnection +// + +WSConnection::~WSConnection() +{ + theApp->getOPs().unsubLedger(this); + theApp->getOPs().unsubAccountInfo(this, mSubAccountInfo); +} + void WSConnection::send(const Json::Value& jvObj) { mHandler->send(mConnection, jvObj); } +// +// Utilities +// + +Json::Value WSConnection::invokeCommand(const Json::Value& jvRequest) +{ + static struct { + const char* pCommand; + doFuncPtr dfpFunc; + } commandsA[] = { + { "account_info_subscribe", &WSConnection::doAccountInfoSubscribe }, + { "account_info_unsubscribe", &WSConnection::doAccountInfoUnsubscribe }, + { "ledger_subscribe", &WSConnection::doLedgerSubcribe }, + { "ledger_unsubscribe", &WSConnection::doLedgerUnsubscribe }, + }; + + + if (!jvRequest.isMember("command")) + { + Json::Value jvResult(Json::objectValue); + + jvResult["type"] = "response"; + jvResult["result"] = "error"; + jvResult["error"] = "missingCommand"; + jvResult["command"] = jvRequest; + + return jvResult; + } + + std::string strCommand = jvRequest["command"].asString(); + + int i = NUMBER(commandsA); + + while (i-- && strCommand != commandsA[i].pCommand) + ; + + Json::Value jvResult(Json::objectValue); + + jvResult["type"] = "response"; + + if (i < 0) + { + jvResult["error"] = "unknownCommand"; // Unknown command. + } + else + { + (this->*(commandsA[i].dfpFunc))(jvResult, jvRequest); + } + + if (jvRequest.isMember("id")) + { + jvResult["id"] = jvRequest["id"]; + } + + if (jvResult.isMember("error")) + { + jvResult["result"] = "error"; + jvResult["request"] = jvRequest; + } + else + { + jvResult["result"] = "success"; + } + + return jvResult; +} + +boost::unordered_set WSConnection::parseAccountIds(const Json::Value& jvArray) +{ + boost::unordered_set usnaResult; + + for (Json::Value::const_iterator it = jvArray.begin(); it != jvArray.end(); it++) + { + NewcoinAddress naString; + + if (!(*it).isString() || !naString.setAccountID((*it).asString())) + { + usnaResult.clear(); + break; + } + else + { + (void) usnaResult.insert(naString); + } + } + + return usnaResult; +} + +// +// Commands +// + +void WSConnection::doAccountInfoSubscribe(Json::Value& jvResult, const Json::Value& jvRequest) +{ + if (!jvRequest.isMember("accounts")) + { + jvResult["error"] = "missingField"; + } + else if (jvResult["accounts"].empty()) + { + jvResult["error"] = "emptySet"; + } + else + { + boost::unordered_set usnaAccoundIds = parseAccountIds(jvRequest["accounts"]); + + if (usnaAccoundIds.empty()) + { + jvResult["error"] = "malformedAccount"; + } + else + { + boost::mutex::scoped_lock sl(mLock); + + BOOST_FOREACH(const NewcoinAddress& naAccountID, usnaAccoundIds) + { + mSubAccountInfo.insert(naAccountID); + } + + theApp->getOPs().subAccountInfo(this, usnaAccoundIds); + } + } +} + +void WSConnection::doAccountInfoUnsubscribe(Json::Value& jvResult, const Json::Value& jvRequest) +{ + if (!jvRequest.isMember("accounts")) + { + jvResult["error"] = "missingField"; + } + else if (jvResult["accounts"].empty()) + { + jvResult["error"] = "emptySet"; + } + else + { + boost::unordered_set usnaAccoundIds = parseAccountIds(jvRequest["accounts"]); + + if (usnaAccoundIds.empty()) + { + jvResult["error"] = "malformedAccount"; + } + else + { + boost::mutex::scoped_lock sl(mLock); + + BOOST_FOREACH(const NewcoinAddress& naAccountID, usnaAccoundIds) + { + mSubAccountInfo.erase(naAccountID); + } + + theApp->getOPs().unsubAccountInfo(this, usnaAccoundIds); + } + } +} + +void WSConnection::doLedgerSubcribe(Json::Value& jvResult, const Json::Value& jvRequest) +{ + if (!theApp->getOPs().subLedger(this)) + { + jvResult["error"] = "ledgerSubscribed"; + } +} + +void WSConnection::doLedgerUnsubscribe(Json::Value& jvResult, const Json::Value& jvRequest) +{ + if (!theApp->getOPs().unsubLedger(this)) + { + jvResult["error"] = "ledgerNotSubscribed"; + } +} + // vim:ts=4