Name load source. Hook up load sources through WSConnection->RPCHandler.

This commit is contained in:
JoelKatz
2013-03-07 18:07:34 -08:00
parent b98471b20a
commit 7c3d732a08
10 changed files with 37 additions and 22 deletions

View File

@@ -45,7 +45,7 @@ DatabaseCon::~DatabaseCon()
Application::Application() : Application::Application() :
mIOWork(mIOService), mAuxWork(mAuxService), mUNL(mIOService), mNetOps(mIOService, &mLedgerMaster), mIOWork(mIOService), mAuxWork(mAuxService), mUNL(mIOService), mNetOps(mIOService, &mLedgerMaster),
mTempNodeCache("NodeCache", 16384, 90), mHashedObjectStore(16384, 300), mSLECache("LedgerEntryCache", 4096, 120), mTempNodeCache("NodeCache", 16384, 90), mHashedObjectStore(16384, 300), mSLECache("LedgerEntryCache", 4096, 120),
mSNTPClient(mAuxService), mRPCHandler(&mNetOps), mFeeTrack(), mSNTPClient(mAuxService), mFeeTrack(),
mRpcDB(NULL), mTxnDB(NULL), mLedgerDB(NULL), mWalletDB(NULL), mHashNodeDB(NULL), mNetNodeDB(NULL), mRpcDB(NULL), mTxnDB(NULL), mLedgerDB(NULL), mWalletDB(NULL), mHashNodeDB(NULL), mNetNodeDB(NULL),
mConnectionPool(mIOService), mPeerDoor(NULL), mRPCDoor(NULL), mWSPublicDoor(NULL), mWSPrivateDoor(NULL), mConnectionPool(mIOService), mPeerDoor(NULL), mRPCDoor(NULL), mWSPublicDoor(NULL), mWSPrivateDoor(NULL),
mSweepTimer(mAuxService), mShutdown(false) mSweepTimer(mAuxService), mShutdown(false)

View File

@@ -64,7 +64,6 @@ class Application
SLECache mSLECache; SLECache mSLECache;
SNTPClient mSNTPClient; SNTPClient mSNTPClient;
JobQueue mJobQueue; JobQueue mJobQueue;
RPCHandler mRPCHandler;
ProofOfWorkGenerator mPOWGen; ProofOfWorkGenerator mPOWGen;
LoadManager mLoadMgr; LoadManager mLoadMgr;
LoadFeeTrack mFeeTrack; LoadFeeTrack mFeeTrack;
@@ -114,7 +113,6 @@ public:
ValidationCollection& getValidations() { return mValidations; } ValidationCollection& getValidations() { return mValidations; }
JobQueue& getJobQueue() { return mJobQueue; } JobQueue& getJobQueue() { return mJobQueue; }
SuppressionTable& getSuppression() { return mSuppressions; } SuppressionTable& getSuppression() { return mSuppressions; }
RPCHandler& getRPCHandler() { return mRPCHandler; }
boost::recursive_mutex& getMasterLock() { return mMasterLock; } boost::recursive_mutex& getMasterLock() { return mMasterLock; }
ProofOfWorkGenerator& getPowGen() { return mPOWGen; } ProofOfWorkGenerator& getPowGen() { return mPOWGen; }
LoadManager& getLoadManager() { return mLoadMgr; } LoadManager& getLoadManager() { return mLoadMgr; }

View File

@@ -61,14 +61,20 @@ public:
static const int lsfOutbound = 2; // outbound connection static const int lsfOutbound = 2; // outbound connection
protected: protected:
std::string mName;
int mBalance; int mBalance;
int mFlags; int mFlags;
int mLastUpdate; int mLastUpdate;
int mLastWarning; int mLastWarning;
public: public:
LoadSource() : mBalance(0), mFlags(0), mLastWarning(0) LoadSource(bool admin) : mBalance(0), mFlags(admin ? lsfPrivileged : 0), mLastUpdate(upTime()), mLastWarning(0)
{ mLastUpdate = upTime(); } { ; }
LoadSource(const std::string& name) : mName(name), mBalance(0), mFlags(0), mLastUpdate(upTime()), mLastWarning(0)
{ ; }
void rename(const std::string& name)
{ mName = name; }
bool isPrivileged() const { return (mFlags & lsfPrivileged) != 0; } bool isPrivileged() const { return (mFlags & lsfPrivileged) != 0; }
void setPrivileged() { mFlags |= lsfPrivileged; } void setPrivileged() { mFlags |= lsfPrivileged; }

View File

@@ -34,6 +34,8 @@ Peer::Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx,
mActive(2), mActive(2),
mCluster(false), mCluster(false),
mPeerId(peerID), mPeerId(peerID),
mPrivate(false),
mLoad(""),
mSocketSsl(io_service, ctx), mSocketSsl(io_service, ctx),
mActivityTimer(io_service) mActivityTimer(io_service)
{ {
@@ -77,6 +79,7 @@ void Peer::handleWrite(const boost::system::error_code& error, size_t bytes_tran
void Peer::setIpPort(const std::string& strIP, int iPort) void Peer::setIpPort(const std::string& strIP, int iPort)
{ {
mIpPort = make_pair(strIP, iPort); mIpPort = make_pair(strIP, iPort);
mLoad.rename(strIP);
cLog(lsDEBUG) << "Peer: Set: " cLog(lsDEBUG) << "Peer: Set: "
<< ADDRESS(this) << "> " << ADDRESS(this) << "> "

View File

@@ -63,12 +63,13 @@ int iAdminGet(const Json::Value& jvRequest, const std::string& strRemoteIp)
return iRole; return iRole;
} }
RPCHandler::RPCHandler(NetworkOPs* netOps) RPCHandler::RPCHandler(NetworkOPs* netOps, LoadSource &ls) : mLoadSource(ls)
{ {
mNetOps = netOps; mNetOps = netOps;
} }
RPCHandler::RPCHandler(NetworkOPs* netOps, InfoSub::pointer infoSub) : mInfoSub(infoSub) RPCHandler::RPCHandler(NetworkOPs* netOps, InfoSub::pointer infoSub, LoadSource& ls)
: mInfoSub(infoSub), mLoadSource(ls)
{ {
mNetOps = netOps; mNetOps = netOps;
} }

View File

@@ -18,6 +18,7 @@ class RPCHandler
NetworkOPs* mNetOps; NetworkOPs* mNetOps;
InfoSub::pointer mInfoSub; InfoSub::pointer mInfoSub;
int mRole; int mRole;
LoadSource& mLoadSource;
typedef Json::Value (RPCHandler::*doFuncPtr)(Json::Value params); typedef Json::Value (RPCHandler::*doFuncPtr)(Json::Value params);
enum { enum {
@@ -113,8 +114,8 @@ public:
enum { GUEST, USER, ADMIN, FORBID }; enum { GUEST, USER, ADMIN, FORBID };
RPCHandler(NetworkOPs* netOps); RPCHandler(NetworkOPs* netOps, LoadSource&);
RPCHandler(NetworkOPs* netOps, InfoSub::pointer infoSub); RPCHandler(NetworkOPs* netOps, InfoSub::pointer infoSub, LoadSource&);
Json::Value doCommand(const Json::Value& jvRequest, int role); Json::Value doCommand(const Json::Value& jvRequest, int role);
Json::Value doRpcCommand(const std::string& strCommand, Json::Value& jvParams, int iRole); Json::Value doRpcCommand(const std::string& strCommand, Json::Value& jvParams, int iRole);

View File

@@ -24,7 +24,7 @@ SETUP_LOG();
#endif #endif
RPCServer::RPCServer(boost::asio::io_service& io_service , NetworkOPs* nopNetwork) RPCServer::RPCServer(boost::asio::io_service& io_service , NetworkOPs* nopNetwork)
: mNetOps(nopNetwork), mSocket(io_service) : mNetOps(nopNetwork), mLoadSource("rpc"), mSocket(io_service)
{ {
mRole = RPCHandler::GUEST; mRole = RPCHandler::GUEST;
} }
@@ -51,7 +51,7 @@ void RPCServer::handle_read_req(const boost::system::error_code& e)
if (!HTTPAuthorized(mHTTPRequest.peekHeaders())) if (!HTTPAuthorized(mHTTPRequest.peekHeaders()))
mReplyStr = HTTPReply(403, "Forbidden"); mReplyStr = HTTPReply(403, "Forbidden");
else else
mReplyStr = handleRequest(req); mReplyStr = handleRequest(req, mLoadSource);
boost::asio::async_write(mSocket, boost::asio::buffer(mReplyStr), boost::asio::async_write(mSocket, boost::asio::buffer(mReplyStr),
boost::bind(&RPCServer::handle_write, shared_from_this(), boost::asio::placeholders::error)); boost::bind(&RPCServer::handle_write, shared_from_this(), boost::asio::placeholders::error));
@@ -110,7 +110,7 @@ void RPCServer::handle_read_line(const boost::system::error_code& e)
} }
} }
std::string RPCServer::handleRequest(const std::string& requestStr) std::string RPCServer::handleRequest(const std::string& requestStr, LoadSource& ls)
{ {
cLog(lsTRACE) << "handleRequest " << requestStr; cLog(lsTRACE) << "handleRequest " << requestStr;
@@ -154,7 +154,7 @@ std::string RPCServer::handleRequest(const std::string& requestStr)
return HTTPReply(403, "Forbidden"); return HTTPReply(403, "Forbidden");
} }
RPCHandler mRPCHandler(mNetOps); RPCHandler mRPCHandler(mNetOps, mLoadSource);
cLog(lsTRACE) << valParams; cLog(lsTRACE) << valParams;
Json::Value result = mRPCHandler.doRpcCommand(strMethod, valParams, mRole); Json::Value result = mRPCHandler.doRpcCommand(strMethod, valParams, mRole);

View File

@@ -13,6 +13,7 @@
#include "NetworkOPs.h" #include "NetworkOPs.h"
#include "SerializedLedger.h" #include "SerializedLedger.h"
#include "RPCHandler.h" #include "RPCHandler.h"
#include "LoadManager.h"
class RPCServer : public boost::enable_shared_from_this<RPCServer> class RPCServer : public boost::enable_shared_from_this<RPCServer>
{ {
@@ -23,6 +24,7 @@ public:
private: private:
NetworkOPs* mNetOps; NetworkOPs* mNetOps;
LoadSource mLoadSource;
boost::asio::ip::tcp::socket mSocket; boost::asio::ip::tcp::socket mSocket;
@@ -44,7 +46,7 @@ private:
void handle_read_line(const boost::system::error_code& ec); void handle_read_line(const boost::system::error_code& ec);
void handle_read_req(const boost::system::error_code& ec); void handle_read_req(const boost::system::error_code& ec);
std::string handleRequest(const std::string& requestStr); std::string handleRequest(const std::string& requestStr, LoadSource& ls);
public: public:
static pointer create(boost::asio::io_service& io_service, NetworkOPs* mNetOps) static pointer create(boost::asio::io_service& io_service, NetworkOPs* mNetOps)

View File

@@ -14,6 +14,7 @@
#include "CallRPC.h" #include "CallRPC.h"
#include "InstanceCounter.h" #include "InstanceCounter.h"
#include "Log.h" #include "Log.h"
#include "LoadManager.h"
#include "RPCErr.h" #include "RPCErr.h"
DEFINE_INSTANCE(WebSocketConnection); DEFINE_INSTANCE(WebSocketConnection);
@@ -45,6 +46,7 @@ protected:
weak_connection_ptr mConnection; weak_connection_ptr mConnection;
NetworkOPs& mNetwork; NetworkOPs& mNetwork;
std::string mRemoteIP; std::string mRemoteIP;
LoadSource mLoadSource;
boost::asio::deadline_timer mPingTimer; boost::asio::deadline_timer mPingTimer;
bool mPinged; bool mPinged;
@@ -56,9 +58,9 @@ public:
WSConnection(WSServerHandler<endpoint_type>* wshpHandler, const connection_ptr& cpConnection) WSConnection(WSServerHandler<endpoint_type>* wshpHandler, const connection_ptr& cpConnection)
: mHandler(wshpHandler), mConnection(cpConnection), mNetwork(theApp->getOPs()), : mHandler(wshpHandler), mConnection(cpConnection), mNetwork(theApp->getOPs()),
mPingTimer(cpConnection->get_io_service()), mPinged(false) mRemoteIP(cpConnection->get_socket().lowest_layer().remote_endpoint().address().to_string()),
mLoadSource(mRemoteIP), mPingTimer(cpConnection->get_io_service()), mPinged(false)
{ {
mRemoteIP = cpConnection->get_socket().lowest_layer().remote_endpoint().address().to_string();
cLog(lsDEBUG) << "Websocket connection from " << mRemoteIP; cLog(lsDEBUG) << "Websocket connection from " << mRemoteIP;
setPingTimer(); setPingTimer();
} }
@@ -103,7 +105,8 @@ public:
return jvResult; return jvResult;
} }
RPCHandler mRPCHandler(&mNetwork, boost::shared_polymorphic_downcast<InfoSub>(this->shared_from_this())); RPCHandler mRPCHandler(&mNetwork,
boost::shared_polymorphic_downcast<InfoSub>(this->shared_from_this()), mLoadSource);
Json::Value jvResult(Json::objectValue); Json::Value jvResult(Json::objectValue);
int iRole = mHandler->getPublic() int iRole = mHandler->getPublic()

View File

@@ -43,7 +43,8 @@ void startServer()
if (!theConfig.QUIET) if (!theConfig.QUIET)
std::cerr << "Startup RPC: " << jvCommand << std::endl; std::cerr << "Startup RPC: " << jvCommand << std::endl;
RPCHandler rhHandler(&theApp->getOPs()); LoadSource ls(true);
RPCHandler rhHandler(&theApp->getOPs(), ls);
Json::Value jvResult = rhHandler.doCommand(jvCommand, RPCHandler::ADMIN); Json::Value jvResult = rhHandler.doCommand(jvCommand, RPCHandler::ADMIN);