From 7c3d732a0864b16f18db8bee7bb6576ad4fae6d5 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Thu, 7 Mar 2013 18:07:34 -0800 Subject: [PATCH] Name load source. Hook up load sources through WSConnection->RPCHandler. --- src/cpp/ripple/Application.cpp | 2 +- src/cpp/ripple/Application.h | 2 -- src/cpp/ripple/LoadManager.h | 18 ++++++++++++------ src/cpp/ripple/Peer.cpp | 3 +++ src/cpp/ripple/RPCHandler.cpp | 5 +++-- src/cpp/ripple/RPCHandler.h | 5 +++-- src/cpp/ripple/RPCServer.cpp | 8 ++++---- src/cpp/ripple/RPCServer.h | 4 +++- src/cpp/ripple/WSConnection.h | 9 ++++++--- src/cpp/ripple/main.cpp | 3 ++- 10 files changed, 37 insertions(+), 22 deletions(-) diff --git a/src/cpp/ripple/Application.cpp b/src/cpp/ripple/Application.cpp index cc417257c..9cbbdc766 100644 --- a/src/cpp/ripple/Application.cpp +++ b/src/cpp/ripple/Application.cpp @@ -45,7 +45,7 @@ DatabaseCon::~DatabaseCon() Application::Application() : mIOWork(mIOService), mAuxWork(mAuxService), mUNL(mIOService), mNetOps(mIOService, &mLedgerMaster), mTempNodeCache("NodeCache", 16384, 90), mHashedObjectStore(16384, 300), mSLECache("LedgerEntryCache", 4096, 120), - mSNTPClient(mAuxService), mRPCHandler(&mNetOps), mFeeTrack(), + mSNTPClient(mAuxService), mFeeTrack(), mRpcDB(NULL), mTxnDB(NULL), mLedgerDB(NULL), mWalletDB(NULL), mHashNodeDB(NULL), mNetNodeDB(NULL), mConnectionPool(mIOService), mPeerDoor(NULL), mRPCDoor(NULL), mWSPublicDoor(NULL), mWSPrivateDoor(NULL), mSweepTimer(mAuxService), mShutdown(false) diff --git a/src/cpp/ripple/Application.h b/src/cpp/ripple/Application.h index cafa5b7c7..48e58d97a 100644 --- a/src/cpp/ripple/Application.h +++ b/src/cpp/ripple/Application.h @@ -64,7 +64,6 @@ class Application SLECache mSLECache; SNTPClient mSNTPClient; JobQueue mJobQueue; - RPCHandler mRPCHandler; ProofOfWorkGenerator mPOWGen; LoadManager mLoadMgr; LoadFeeTrack mFeeTrack; @@ -114,7 +113,6 @@ public: ValidationCollection& getValidations() { return mValidations; } JobQueue& getJobQueue() { return mJobQueue; } SuppressionTable& getSuppression() { return mSuppressions; } - RPCHandler& getRPCHandler() { return mRPCHandler; } boost::recursive_mutex& getMasterLock() { return mMasterLock; } ProofOfWorkGenerator& getPowGen() { return mPOWGen; } LoadManager& getLoadManager() { return mLoadMgr; } diff --git a/src/cpp/ripple/LoadManager.h b/src/cpp/ripple/LoadManager.h index 55796ea02..8899bf575 100644 --- a/src/cpp/ripple/LoadManager.h +++ b/src/cpp/ripple/LoadManager.h @@ -61,14 +61,20 @@ public: static const int lsfOutbound = 2; // outbound connection protected: - int mBalance; - int mFlags; - int mLastUpdate; - int mLastWarning; + std::string mName; + int mBalance; + int mFlags; + int mLastUpdate; + int mLastWarning; public: - LoadSource() : mBalance(0), mFlags(0), mLastWarning(0) - { mLastUpdate = upTime(); } + LoadSource(bool admin) : mBalance(0), mFlags(admin ? lsfPrivileged : 0), mLastUpdate(upTime()), mLastWarning(0) + { ; } + LoadSource(const std::string& name) : mName(name), mBalance(0), mFlags(0), mLastUpdate(upTime()), mLastWarning(0) + { ; } + + void rename(const std::string& name) + { mName = name; } bool isPrivileged() const { return (mFlags & lsfPrivileged) != 0; } void setPrivileged() { mFlags |= lsfPrivileged; } diff --git a/src/cpp/ripple/Peer.cpp b/src/cpp/ripple/Peer.cpp index 62bc160cf..991afb52d 100644 --- a/src/cpp/ripple/Peer.cpp +++ b/src/cpp/ripple/Peer.cpp @@ -34,6 +34,8 @@ Peer::Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx, mActive(2), mCluster(false), mPeerId(peerID), + mPrivate(false), + mLoad(""), mSocketSsl(io_service, ctx), mActivityTimer(io_service) { @@ -77,6 +79,7 @@ void Peer::handleWrite(const boost::system::error_code& error, size_t bytes_tran void Peer::setIpPort(const std::string& strIP, int iPort) { mIpPort = make_pair(strIP, iPort); + mLoad.rename(strIP); cLog(lsDEBUG) << "Peer: Set: " << ADDRESS(this) << "> " diff --git a/src/cpp/ripple/RPCHandler.cpp b/src/cpp/ripple/RPCHandler.cpp index 9c8eb6c47..67ebec782 100644 --- a/src/cpp/ripple/RPCHandler.cpp +++ b/src/cpp/ripple/RPCHandler.cpp @@ -63,12 +63,13 @@ int iAdminGet(const Json::Value& jvRequest, const std::string& strRemoteIp) return iRole; } -RPCHandler::RPCHandler(NetworkOPs* netOps) +RPCHandler::RPCHandler(NetworkOPs* netOps, LoadSource &ls) : mLoadSource(ls) { mNetOps = netOps; } -RPCHandler::RPCHandler(NetworkOPs* netOps, InfoSub::pointer infoSub) : mInfoSub(infoSub) +RPCHandler::RPCHandler(NetworkOPs* netOps, InfoSub::pointer infoSub, LoadSource& ls) + : mInfoSub(infoSub), mLoadSource(ls) { mNetOps = netOps; } diff --git a/src/cpp/ripple/RPCHandler.h b/src/cpp/ripple/RPCHandler.h index 48fe696ce..9c8c3e288 100644 --- a/src/cpp/ripple/RPCHandler.h +++ b/src/cpp/ripple/RPCHandler.h @@ -18,6 +18,7 @@ class RPCHandler NetworkOPs* mNetOps; InfoSub::pointer mInfoSub; int mRole; + LoadSource& mLoadSource; typedef Json::Value (RPCHandler::*doFuncPtr)(Json::Value params); enum { @@ -113,8 +114,8 @@ public: enum { GUEST, USER, ADMIN, FORBID }; - RPCHandler(NetworkOPs* netOps); - RPCHandler(NetworkOPs* netOps, InfoSub::pointer infoSub); + RPCHandler(NetworkOPs* netOps, LoadSource&); + RPCHandler(NetworkOPs* netOps, InfoSub::pointer infoSub, LoadSource&); Json::Value doCommand(const Json::Value& jvRequest, int role); Json::Value doRpcCommand(const std::string& strCommand, Json::Value& jvParams, int iRole); diff --git a/src/cpp/ripple/RPCServer.cpp b/src/cpp/ripple/RPCServer.cpp index cd7474261..33185c3b7 100644 --- a/src/cpp/ripple/RPCServer.cpp +++ b/src/cpp/ripple/RPCServer.cpp @@ -24,7 +24,7 @@ SETUP_LOG(); #endif RPCServer::RPCServer(boost::asio::io_service& io_service , NetworkOPs* nopNetwork) - : mNetOps(nopNetwork), mSocket(io_service) + : mNetOps(nopNetwork), mLoadSource("rpc"), mSocket(io_service) { mRole = RPCHandler::GUEST; } @@ -51,7 +51,7 @@ void RPCServer::handle_read_req(const boost::system::error_code& e) if (!HTTPAuthorized(mHTTPRequest.peekHeaders())) mReplyStr = HTTPReply(403, "Forbidden"); else - mReplyStr = handleRequest(req); + mReplyStr = handleRequest(req, mLoadSource); boost::asio::async_write(mSocket, boost::asio::buffer(mReplyStr), boost::bind(&RPCServer::handle_write, shared_from_this(), boost::asio::placeholders::error)); @@ -110,7 +110,7 @@ void RPCServer::handle_read_line(const boost::system::error_code& e) } } -std::string RPCServer::handleRequest(const std::string& requestStr) +std::string RPCServer::handleRequest(const std::string& requestStr, LoadSource& ls) { cLog(lsTRACE) << "handleRequest " << requestStr; @@ -154,7 +154,7 @@ std::string RPCServer::handleRequest(const std::string& requestStr) return HTTPReply(403, "Forbidden"); } - RPCHandler mRPCHandler(mNetOps); + RPCHandler mRPCHandler(mNetOps, mLoadSource); cLog(lsTRACE) << valParams; Json::Value result = mRPCHandler.doRpcCommand(strMethod, valParams, mRole); diff --git a/src/cpp/ripple/RPCServer.h b/src/cpp/ripple/RPCServer.h index c96774354..bfc6a4c0e 100644 --- a/src/cpp/ripple/RPCServer.h +++ b/src/cpp/ripple/RPCServer.h @@ -13,6 +13,7 @@ #include "NetworkOPs.h" #include "SerializedLedger.h" #include "RPCHandler.h" +#include "LoadManager.h" class RPCServer : public boost::enable_shared_from_this { @@ -23,6 +24,7 @@ public: private: NetworkOPs* mNetOps; + LoadSource mLoadSource; boost::asio::ip::tcp::socket mSocket; @@ -44,7 +46,7 @@ private: void handle_read_line(const boost::system::error_code& ec); void handle_read_req(const boost::system::error_code& ec); - std::string handleRequest(const std::string& requestStr); + std::string handleRequest(const std::string& requestStr, LoadSource& ls); public: static pointer create(boost::asio::io_service& io_service, NetworkOPs* mNetOps) diff --git a/src/cpp/ripple/WSConnection.h b/src/cpp/ripple/WSConnection.h index 399c6a827..7ce02fba4 100644 --- a/src/cpp/ripple/WSConnection.h +++ b/src/cpp/ripple/WSConnection.h @@ -14,6 +14,7 @@ #include "CallRPC.h" #include "InstanceCounter.h" #include "Log.h" +#include "LoadManager.h" #include "RPCErr.h" DEFINE_INSTANCE(WebSocketConnection); @@ -45,6 +46,7 @@ protected: weak_connection_ptr mConnection; NetworkOPs& mNetwork; std::string mRemoteIP; + LoadSource mLoadSource; boost::asio::deadline_timer mPingTimer; bool mPinged; @@ -56,9 +58,9 @@ public: WSConnection(WSServerHandler* wshpHandler, const connection_ptr& cpConnection) : mHandler(wshpHandler), mConnection(cpConnection), mNetwork(theApp->getOPs()), - mPingTimer(cpConnection->get_io_service()), mPinged(false) + mRemoteIP(cpConnection->get_socket().lowest_layer().remote_endpoint().address().to_string()), + mLoadSource(mRemoteIP), mPingTimer(cpConnection->get_io_service()), mPinged(false) { - mRemoteIP = cpConnection->get_socket().lowest_layer().remote_endpoint().address().to_string(); cLog(lsDEBUG) << "Websocket connection from " << mRemoteIP; setPingTimer(); } @@ -103,7 +105,8 @@ public: return jvResult; } - RPCHandler mRPCHandler(&mNetwork, boost::shared_polymorphic_downcast(this->shared_from_this())); + RPCHandler mRPCHandler(&mNetwork, + boost::shared_polymorphic_downcast(this->shared_from_this()), mLoadSource); Json::Value jvResult(Json::objectValue); int iRole = mHandler->getPublic() diff --git a/src/cpp/ripple/main.cpp b/src/cpp/ripple/main.cpp index 88dc35a33..1c114aa07 100644 --- a/src/cpp/ripple/main.cpp +++ b/src/cpp/ripple/main.cpp @@ -43,7 +43,8 @@ void startServer() if (!theConfig.QUIET) std::cerr << "Startup RPC: " << jvCommand << std::endl; - RPCHandler rhHandler(&theApp->getOPs()); + LoadSource ls(true); + RPCHandler rhHandler(&theApp->getOPs(), ls); Json::Value jvResult = rhHandler.doCommand(jvCommand, RPCHandler::ADMIN);