Remove unused websocket files (RIPD-1293)

This commit is contained in:
Mike Ellery
2017-01-06 13:51:04 -08:00
committed by Nik Bougalis
parent 7536c53a48
commit 0d577d9349
9 changed files with 6 additions and 1096 deletions

View File

@@ -2126,6 +2126,8 @@
</ClInclude>
<ClInclude Include="..\..\src\ripple\ledger\View.h">
</ClInclude>
<ClInclude Include="..\..\src\ripple\net\AutoSocket.h">
</ClInclude>
<ClInclude Include="..\..\src\ripple\net\HTTPClient.h">
</ClInclude>
<ClCompile Include="..\..\src\ripple\net\impl\HTTPClient.cpp">
@@ -3262,10 +3264,6 @@
<AdditionalIncludeDirectories Condition="'$(Configuration)|$(Platform)'=='debug|x64'">..\..\src\soci\include\private;..\..\src\soci\src\core;..\..\src\sqlite;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<AdditionalIncludeDirectories Condition="'$(Configuration)|$(Platform)'=='release|x64'">..\..\src\soci\include\private;..\..\src\soci\src\core;..\..\src\sqlite;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
</ClCompile>
<ClInclude Include="..\..\src\ripple\websocket\AutoSocket.h">
</ClInclude>
<ClInclude Include="..\..\src\ripple\websocket\MakeServer.h">
</ClInclude>
<ClCompile Include="..\..\src\rocksdb2\db\builder.cc">
<ExcludedFromBuild>True</ExcludedFromBuild>
</ClCompile>

View File

@@ -298,9 +298,6 @@
<Filter Include="ripple\unity">
<UniqueIdentifier>{5DB3CD0B-B361-B301-9562-697CA8A52B68}</UniqueIdentifier>
</Filter>
<Filter Include="ripple\websocket">
<UniqueIdentifier>{44780F86-42D3-2F2B-0846-5AEE2CA6D7FE}</UniqueIdentifier>
</Filter>
<Filter Include="rocksdb2">
<UniqueIdentifier>{15B4B65A-0F03-7BA9-38CD-42A5712392CB}</UniqueIdentifier>
</Filter>
@@ -2712,6 +2709,9 @@
<ClInclude Include="..\..\src\ripple\ledger\View.h">
<Filter>ripple\ledger</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\net\AutoSocket.h">
<Filter>ripple\net</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\net\HTTPClient.h">
<Filter>ripple\net</Filter>
</ClInclude>
@@ -3804,12 +3804,6 @@
<ClCompile Include="..\..\src\ripple\unity\soci_ripple.cpp">
<Filter>ripple\unity</Filter>
</ClCompile>
<ClInclude Include="..\..\src\ripple\websocket\AutoSocket.h">
<Filter>ripple\websocket</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\websocket\MakeServer.h">
<Filter>ripple\websocket</Filter>
</ClInclude>
<ClCompile Include="..\..\src\rocksdb2\db\builder.cc">
<Filter>rocksdb2\db</Filter>
</ClCompile>

View File

@@ -73,7 +73,6 @@
#include <ripple/rpc/Context.h>
#include <ripple/rpc/RPCHandler.h>
#include <ripple/shamap/Family.h>
#include <ripple/websocket/MakeServer.h>
#include <ripple/crypto/csprng.h>
#include <ripple/beast/asio/io_latency_probe.h>
#include <ripple/beast/core/LexicalCast.h>

View File

@@ -22,7 +22,7 @@
#include <ripple/basics/Log.h>
#include <ripple/basics/StringUtilities.h>
#include <ripple/net/HTTPClient.h>
#include <ripple/websocket/AutoSocket.h>
#include <ripple/net/AutoSocket.h>
#include <beast/core/placeholders.hpp>
#include <ripple/beast/core/LexicalCast.h>
#include <boost/asio.hpp>

View File

@@ -1,415 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_APP_WEBSOCKET_WSCONNECTION_H_INCLUDED
#define RIPPLE_APP_WEBSOCKET_WSCONNECTION_H_INCLUDED
#include <ripple/app/main/Application.h>
#include <ripple/app/misc/NetworkOPs.h>
#include <ripple/basics/CountedObject.h>
#include <ripple/basics/Log.h>
#include <ripple/core/JobQueue.h>
#include <ripple/json/to_string.h>
#include <ripple/net/InfoSub.h>
#include <ripple/net/RPCErr.h>
#include <ripple/protocol/ErrorCodes.h>
#include <ripple/protocol/JsonFields.h>
#include <ripple/resource/Fees.h>
#include <ripple/resource/ResourceManager.h>
#include <ripple/rpc/RPCHandler.h>
#include <ripple/server/Port.h>
#include <ripple/rpc/Role.h>
#include <ripple/json/to_string.h>
#include <ripple/rpc/RPCHandler.h>
#include <ripple/rpc/Role.h>
#include <boost/asio.hpp>
#include <beast/core/placeholders.hpp>
#include <memory>
namespace ripple {
namespace websocket {
template <class WebSocket>
class HandlerImpl;
/** A Ripple WebSocket connection handler.
*/
template <class WebSocket>
class ConnectionImpl
: public std::enable_shared_from_this <ConnectionImpl <WebSocket> >
, public InfoSub
, public CountedObject <ConnectionImpl <WebSocket>>
{
public:
static char const* getCountedObjectName () { return "ConnectionImpl"; }
using message_ptr = typename WebSocket::MessagePtr;
using connection = typename WebSocket::Connection;
using connection_ptr = typename WebSocket::ConnectionPtr;
using weak_connection_ptr = typename WebSocket::ConnectionWeakPtr;
using handler_type = HandlerImpl <WebSocket>;
ConnectionImpl (
Application& app,
Resource::Manager& resourceManager,
InfoSub::Source& source,
handler_type& handler,
connection_ptr const& cpConnection,
beast::IP::Endpoint const& remoteAddress,
boost::asio::io_service& io_service,
std::pair<std::string, std::string> identity);
void preDestroy ();
static void destroy (std::shared_ptr <ConnectionImpl <WebSocket> >)
{
// Just discards the reference
}
void send (Json::Value const& jvObj, bool broadcast);
void disconnect ();
static void handle_disconnect(weak_connection_ptr c);
bool onPingTimer (std::string&);
void pingTimer (typename WebSocket::ErrorCode const& e);
void onPong (std::string const&);
void rcvMessage (message_ptr const&, bool& msgRejected, bool& runQueue);
boost::optional <std::string> getMessage ();
bool checkMessage ();
Json::Value invokeCommand (Json::Value const& jvRequest,
std::shared_ptr<JobQueue::Coro> coro);
// Generically implemented per version.
void setPingTimer ();
private:
Application& app_;
Port const& m_port;
Resource::Manager& m_resourceManager;
beast::IP::Endpoint const m_remoteAddress;
std::string const m_forwardedFor;
std::string const m_user;
std::mutex m_receiveQueueMutex;
std::deque <std::string> m_receiveQueue;
NetworkOPs& m_netOPs;
boost::asio::io_service& m_io_service;
boost::asio::basic_waitable_timer<std::chrono::system_clock> m_pingTimer;
bool m_sentPing = false;
bool m_receiveQueueRunning = false;
bool m_isDead = false;
handler_type& m_handler;
weak_connection_ptr m_connection;
std::chrono::seconds pingFreq_;
beast::Journal j_;
};
template <class WebSocket>
ConnectionImpl <WebSocket>::ConnectionImpl (
Application& app,
Resource::Manager& resourceManager,
InfoSub::Source& source,
handler_type& handler,
connection_ptr const& cpConnection,
beast::IP::Endpoint const& remoteAddress,
boost::asio::io_service& io_service,
std::pair<std::string, std::string> identity)
: InfoSub (source, requestInboundEndpoint (
resourceManager, remoteAddress, handler.port(), identity.second))
, app_(app)
, m_port (handler.port ())
, m_resourceManager (resourceManager)
, m_remoteAddress (remoteAddress)
, m_forwardedFor (isIdentified (m_port, m_remoteAddress.address(),
identity.second) ? identity.first : std::string())
, m_user (isIdentified (m_port, m_remoteAddress.address(),
identity.second) ? identity.second : std::string())
, m_netOPs (app_.getOPs ())
, m_io_service (io_service)
, m_pingTimer (io_service)
, m_handler (handler)
, m_connection (cpConnection)
, pingFreq_ (app.config ().WEBSOCKET_PING_FREQ)
, j_ (app.journal ("ConnectionImpl"))
{
// VFALCO Disabled since it might cause hangs
pingFreq_ = std::chrono::seconds{0};
if (! m_forwardedFor.empty() || ! m_user.empty())
{
JLOG(j_.debug()) << "connect secure_gateway X-Forwarded-For: " <<
m_forwardedFor << ", X-User: " << m_user;
}
}
template <class WebSocket>
void ConnectionImpl <WebSocket>::onPong (std::string const&)
{
m_sentPing = false;
}
template <class WebSocket>
void ConnectionImpl <WebSocket>::rcvMessage (
message_ptr const& msg, bool& msgRejected, bool& runQueue)
{
JLOG(j_.debug()) <<
"WebSocket: received " << msg->get_payload();
ScopedLockType sl (m_receiveQueueMutex);
if (m_isDead)
{
msgRejected = false;
runQueue = false;
return;
}
if ((m_receiveQueue.size () >= 1000) ||
(msg->get_payload().size() > 1000000) ||
! WebSocket::isTextMessage (*msg))
{
msgRejected = true;
runQueue = false;
}
else
{
msgRejected = false;
m_receiveQueue.push_back (msg->get_payload ());
if (m_receiveQueueRunning)
runQueue = false;
else
{
runQueue = true;
m_receiveQueueRunning = true;
}
}
}
template <class WebSocket>
bool ConnectionImpl <WebSocket>::checkMessage ()
{
ScopedLockType sl (m_receiveQueueMutex);
assert (m_receiveQueueRunning);
if (m_isDead || m_receiveQueue.empty ())
{
m_receiveQueueRunning = false;
return false;
}
return true;
}
template <class WebSocket>
boost::optional <std::string>
ConnectionImpl <WebSocket>::getMessage ()
{
ScopedLockType sl (m_receiveQueueMutex);
if (m_isDead || m_receiveQueue.empty ())
{
m_receiveQueueRunning = false;
return boost::none;
}
boost::optional <std::string> ret (std::move (m_receiveQueue.front ()));
m_receiveQueue.pop_front ();
return ret;
}
template <class WebSocket>
Json::Value ConnectionImpl <WebSocket>::invokeCommand (
Json::Value const& jvRequest, std::shared_ptr<JobQueue::Coro> coro)
{
if (getConsumer().disconnect ())
{
disconnect ();
return rpcError (rpcSLOW_DOWN);
}
// Requests without "command" are invalid.
//
if (!jvRequest.isMember (jss::command))
{
Json::Value jvResult (Json::objectValue);
jvResult[jss::type] = jss::response;
jvResult[jss::status] = jss::error;
jvResult[jss::error] = jss::missingCommand;
jvResult[jss::request] = jvRequest;
if (jvRequest.isMember (jss::id))
{
jvResult[jss::id] = jvRequest[jss::id];
}
getConsumer().charge (Resource::feeInvalidRPC);
return jvResult;
}
Resource::Charge loadType = Resource::feeReferenceRPC;
Json::Value jvResult (Json::objectValue);
auto required = RPC::roleRequired (jvRequest[jss::command].asString());
auto role = requestRole (required, m_port, jvRequest, m_remoteAddress,
m_user);
if (Role::FORBID == role)
{
jvResult[jss::result] = rpcError (rpcFORBIDDEN);
}
else
{
RPC::Context context {app_.journal ("RPCHandler"), jvRequest,
app_, loadType, m_netOPs, app_.getLedgerMaster(), getConsumer(),
role, coro, this->shared_from_this(),
{m_user, m_forwardedFor}};
RPC::doCommand (context, jvResult[jss::result]);
}
getConsumer().charge (loadType);
if (getConsumer().warn ())
{
jvResult[jss::warning] = jss::load;
}
// Currently we will simply unwrap errors returned by the RPC
// API, in the future maybe we can make the responses
// consistent.
//
// Regularize result. This is duplicate code.
if (jvResult[jss::result].isMember (jss::error))
{
jvResult = jvResult[jss::result];
jvResult[jss::status] = jss::error;
jvResult[jss::request] = jvRequest;
}
else
{
jvResult[jss::status] = jss::success;
// For testing resource limits on this connection.
if (jvRequest[jss::command].asString() == "ping")
{
if (getConsumer().isUnlimited())
jvResult[jss::unlimited] = true;
}
}
if (jvRequest.isMember (jss::id))
{
jvResult[jss::id] = jvRequest[jss::id];
}
jvResult[jss::type] = jss::response;
return jvResult;
}
template <class WebSocket>
void ConnectionImpl <WebSocket>::preDestroy ()
{
if (! m_forwardedFor.empty() || ! m_user.empty())
{
JLOG(j_.debug()) << "disconnect secure_gateway X-Forwarded-For: " <<
m_forwardedFor << ", X-User: " << m_user;
}
// sever connection
this->m_pingTimer.cancel ();
m_connection.reset ();
{
ScopedLockType sl (this->m_receiveQueueMutex);
this->m_isDead = true;
}
}
// Implement overridden functions from base class:
template <class WebSocket>
void ConnectionImpl <WebSocket>::send (Json::Value const& jvObj, bool broadcast)
{
JLOG (j_.debug()) <<
"WebSocket: sending " << to_string (jvObj);
connection_ptr ptr = m_connection.lock ();
if (ptr)
m_handler.send (ptr, jvObj, broadcast);
}
template <class WebSocket>
void ConnectionImpl <WebSocket>::disconnect ()
{
JLOG (j_.debug()) <<
"WebSocket: disconnecting";
connection_ptr ptr = m_connection.lock ();
if (ptr)
this->m_io_service.dispatch (
WebSocket::getStrand (*ptr).wrap (
std::bind(&ConnectionImpl <WebSocket>::handle_disconnect,
m_connection)));
}
// static
template <class WebSocket>
void ConnectionImpl <WebSocket>::handle_disconnect(weak_connection_ptr c)
{
connection_ptr ptr = c.lock ();
if (ptr)
WebSocket::handleDisconnect (*ptr);
}
template <class WebSocket>
bool ConnectionImpl <WebSocket>::onPingTimer (std::string&)
{
if (this->m_sentPing)
return true; // causes connection to close
this->m_sentPing = true;
setPingTimer ();
return false; // causes ping to be sent
}
//--------------------------------------------------------------------------
template <class WebSocket>
void ConnectionImpl <WebSocket>::pingTimer (
typename WebSocket::ErrorCode const& e)
{
if (!e)
{
if (auto ptr = this->m_connection.lock ())
this->m_handler.pingTimer (ptr);
}
}
} // websocket
} // ripple
#endif

View File

@@ -1,471 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_APP_WEBSOCKET_WSSERVERHANDLER_H_INCLUDED
#define RIPPLE_APP_WEBSOCKET_WSSERVERHANDLER_H_INCLUDED
#include <ripple/app/main/Application.h>
#include <ripple/app/main/CollectorManager.h>
#include <ripple/core/JobQueue.h>
#include <ripple/protocol/JsonFields.h>
#include <ripple/server/Port.h>
#include <ripple/json/json_reader.h>
#include <ripple/websocket/Connection.h>
#include <boost/unordered_map.hpp>
#include <memory>
namespace ripple {
namespace websocket {
// CAUTION: on_* functions are called by the websocket code while holding a lock
// A single instance of this object is made.
// This instance dispatches all events. There is no per connection persistence.
/** Make a beast endpoint from a boost::asio endpoint. */
inline
beast::IP::Endpoint makeBeastEndpoint (boost::asio::ip::tcp::endpoint const& e)
{
return beast::IP::from_asio (e);
}
/** Make a beast endpoint from itself. */
inline
beast::IP::Endpoint makeBeastEndpoint (beast::IP::Endpoint const& e)
{
return e;
}
template <class WebSocket>
class HandlerImpl
: public WebSocket::Handler
{
public:
using connection_ptr = typename WebSocket::ConnectionPtr;
using message_ptr = typename WebSocket::MessagePtr;
using wsc_ptr = std::shared_ptr <ConnectionImpl <WebSocket> >;
// Private reasons to close.
enum
{
crTooSlow = 4000, // Client is too slow.
};
private:
Application& app_;
beast::insight::Counter rpc_requests_;
beast::insight::Event rpc_size_;
beast::insight::Event rpc_time_;
ServerDescription desc_;
beast::Journal j_;
protected:
// VFALCO TODO Make this private.
std::mutex mLock;
// For each connection maintain an associated object to track subscriptions.
using MapType = boost::unordered_map<connection_ptr, wsc_ptr>;
MapType mMap;
public:
HandlerImpl (ServerDescription const& desc)
: app_ (desc.app)
, desc_ (desc)
, j_ (app_.journal ("HandlerLog"))
{
auto const& group (desc_.collectorManager.group ("rpc"));
rpc_requests_ = group->make_counter ("requests");
rpc_size_ = group->make_event ("size");
rpc_time_ = group->make_event ("time");
}
HandlerImpl(HandlerImpl const&) = delete;
HandlerImpl& operator= (HandlerImpl const&) = delete;
Port const&
port() const
{
return desc_.port;
}
void send (connection_ptr const& cpClient, message_ptr const& mpMessage)
{
try
{
cpClient->send (
mpMessage->get_payload (), mpMessage->get_opcode ());
}
catch (std::exception const&)
{
WebSocket::closeTooSlowClient (*cpClient, crTooSlow);
}
}
void send (connection_ptr const& cpClient, std::string const& strMessage,
bool broadcast)
{
try
{
auto jm = broadcast ? j_.trace() : j_.debug();
JLOG (jm)
<< "Ws:: Sending '" << strMessage << "'";
cpClient->send (strMessage);
}
catch (std::exception const&)
{
WebSocket::closeTooSlowClient (*cpClient, crTooSlow);
}
}
void send (connection_ptr const& cpClient, Json::Value const& jvObj,
bool broadcast)
{
send (cpClient, to_string (jvObj), broadcast);
}
void pingTimer (connection_ptr const& cpClient)
{
wsc_ptr ptr;
{
ScopedLockType sl (mLock);
auto it = mMap.find (cpClient);
if (it == mMap.end ())
return;
ptr = it->second;
}
std::string data ("ping");
if (ptr->onPingTimer (data))
{
cpClient->terminate ({});
try
{
JLOG (j_.debug()) <<
"Ws:: ping_out(" <<
// TODO(tom): re-enable this logging.
// cpClient->get_socket ().remote_endpoint ().to_string ()
")";
}
catch (std::exception const&)
{
}
}
else
cpClient->ping (data);
}
void on_send_empty (connection_ptr cpClient) override
{
wsc_ptr ptr;
{
ScopedLockType sl (mLock);
auto it = mMap.find (cpClient);
if (it == mMap.end ())
return;
ptr = it->second;
}
ptr->onSendEmpty ();
}
void on_open (connection_ptr cpClient) override
{
ScopedLockType sl (mLock);
try
{
auto remoteEndpoint = cpClient->get_socket ().remote_endpoint ();
auto connection = std::make_shared <ConnectionImpl <WebSocket> > (
desc_.app,
desc_.resourceManager,
desc_.source,
*this,
cpClient,
makeBeastEndpoint (remoteEndpoint),
WebSocket::getStrand (*cpClient).get_io_service (),
cpClient->get_identity());
connection->setPingTimer ();
auto result = mMap.emplace (cpClient, std::move (connection));
assert (result.second);
(void) result.second;
JLOG (j_.debug()) <<
"Ws:: on_open(" << remoteEndpoint << ")";
}
catch (std::exception const&)
{
}
}
void on_pong (connection_ptr cpClient, std::string data) override
{
wsc_ptr ptr;
{
ScopedLockType sl (mLock);
auto it = mMap.find (cpClient);
if (it == mMap.end ())
return;
ptr = it->second;
}
try
{
JLOG (j_.debug()) <<
"Ws:: on_pong(" << cpClient->get_socket ().remote_endpoint() << ")";
}
catch (std::exception const&)
{
}
ptr->onPong (data);
}
void on_close (connection_ptr cpClient) override
{
doClose (cpClient, "on_close");
}
void on_fail (connection_ptr cpClient) override
{
doClose (cpClient, "on_fail");
}
void doClose (connection_ptr const& cpClient, char const* reason)
{
// we cannot destroy the connection while holding the map lock or we
// deadlock with pubLedger
wsc_ptr ptr;
{
ScopedLockType sl (mLock);
auto it = mMap.find (cpClient);
if (it == mMap.end ())
{
try
{
JLOG (j_.debug()) <<
"Ws:: " << reason << "(" <<
cpClient->get_socket ().remote_endpoint() <<
") not found";
}
catch (std::exception const&)
{
}
return;
}
ptr = std::move (it->second);
// prevent the ConnectionImpl from being destroyed until we release
// the lock
mMap.erase (it);
}
ptr->preDestroy (); // Must be done before we return
try
{
JLOG (j_.debug()) <<
"Ws:: " << reason << "(" <<
cpClient->get_socket ().remote_endpoint () << ") found";
}
catch (std::exception const&)
{
}
// Must be done without holding the websocket send lock
app_.getJobQueue ().addJob (
jtCLIENT,
"WSClient::destroy",
[p = std::move(ptr)] (Job&)
{
ConnectionImpl <WebSocket>::destroy(std::move (p));
});
}
void message_job(std::string const& name, connection_ptr const& cpClient)
{
app_.getJobQueue().postCoro(jtCLIENT, "WSClient",
[this, cpClient]
(auto const& c)
{
this->do_messages(c, cpClient);
});
}
void on_message (connection_ptr cpClient, message_ptr mpMessage) override
{
wsc_ptr ptr;
{
ScopedLockType sl (mLock);
auto it = mMap.find (cpClient);
if (it == mMap.end ())
return;
ptr = it->second;
}
bool bRejected, bRunQ;
ptr->rcvMessage (mpMessage, bRejected, bRunQ);
if (bRejected)
{
try
{
JLOG (j_.debug()) <<
"Ws:: Rejected(" <<
cpClient->get_socket().remote_endpoint() <<
") '" << mpMessage->get_payload () << "'";
}
catch (std::exception const&)
{
}
}
if (bRunQ)
message_job("command", cpClient);
}
void do_messages (std::shared_ptr<JobQueue::Coro> const& c,
connection_ptr const& cpClient)
{
wsc_ptr ptr;
{
ScopedLockType sl (mLock);
auto it = mMap.find (cpClient);
if (it == mMap.end ())
return;
ptr = it->second;
}
// This loop prevents a single thread from handling more
// than 3 operations for the same client, otherwise a client
// can monopolize resources.
//
for (int i = 0; i < 3; ++i)
{
if(app_.getJobQueue().isStopping())
return;
boost::optional <std::string> msg = ptr->getMessage ();
if (! msg)
return;
do_message(c, cpClient, ptr, *msg);
}
if (ptr->checkMessage ())
message_job("more", cpClient);
}
void
do_message (std::shared_ptr<JobQueue::Coro> const& c,
const connection_ptr cpClient, wsc_ptr conn,
const std::string& message)
{
Json::Value jvRequest;
Json::Reader jrReader;
try
{
JLOG (j_.debug())
<< "Ws:: Receiving("
<< cpClient->get_socket ().remote_endpoint ()
<< ") '" << message << "'";
}
catch (std::exception const&)
{
}
if (!jrReader.parse (message, jvRequest) ||
! jvRequest || !jvRequest.isObject ())
{
Json::Value jvResult (Json::objectValue);
jvResult[jss::type] = jss::error;
jvResult[jss::error] = "jsonInvalid"; // Received invalid json.
jvResult[jss::value] = message;
send (cpClient, jvResult, false);
}
else
{
using namespace std::chrono;
auto const start = high_resolution_clock::now();
auto buffer = to_string(conn->invokeCommand(jvRequest, c));
rpc_time_.notify (
static_cast <beast::insight::Event::value_type> (
duration_cast <milliseconds> (
high_resolution_clock::now () - start)));
++rpc_requests_;
rpc_size_.notify (
static_cast <beast::insight::Event::value_type>
(buffer.size()));
send (cpClient, buffer, false);
}
}
boost::asio::ssl::context&
get_ssl_context () override
{
return *port().context;
}
bool plain_only() override
{
return port().protocol.count("wss") == 0;
}
bool secure_only() override
{
return port().protocol.count("ws") == 0;
}
// Respond to http requests.
bool http (connection_ptr cpClient) override
{
std::string reason;
if (! app_.serverOkay (reason))
{
cpClient->set_body (
"<HTML><BODY>Server cannot accept clients: " +
reason + "</BODY></HTML>");
return false;
}
cpClient->set_body (
"<!DOCTYPE html><html><head><title>" + systemName () +
" Test page for rippled</title></head>" + "<body><h1>" +
systemName () + " Test</h1><p>This page shows rippled http(s) "
"connectivity is working.</p></body></html>");
return true;
}
};
} // websocket
} // ripple
#endif

View File

@@ -1,52 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLED_RIPPLE_WEBSOCKET_MAKESERVER_H
#define RIPPLED_RIPPLE_WEBSOCKET_MAKESERVER_H
#include <ripple/app/main/Application.h>
#include <ripple/app/main/CollectorManager.h>
#include <ripple/net/InfoSub.h>
#include <ripple/server/Port.h>
namespace beast { class Stoppable; }
namespace ripple {
class BasicConfig;
namespace Resource { class Manager; }
namespace websocket {
struct ServerDescription
{
Application& app;
Port port;
Resource::Manager& resourceManager;
InfoSub::Source& source;
beast::Journal& journal;
BasicConfig const& config;
CollectorManager& collectorManager;
};
} // websocket
} // ripple
#endif

View File

@@ -1,143 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_WEBSOCKET_SERVER_H_INCLUDED
#define RIPPLE_WEBSOCKET_SERVER_H_INCLUDED
#include <ripple/basics/Log.h>
#include <ripple/core/ThreadEntry.h>
#include <ripple/beast/core/Thread.h>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
namespace ripple {
namespace websocket {
template <class WebSocket>
class Server : public Stoppable
{
private:
ServerDescription desc_;
std::recursive_mutex endpointMutex_; // TODO: why is this recursive?
std::thread thread_;
beast::Journal j_;
typename WebSocket::EndpointPtr endpoint_;
public:
Server (ServerDescription const& desc)
: Stoppable (WebSocket::versionName(), desc.source)
, desc_(desc)
, j_ (desc.app.journal ("WebSocket"))
{
}
~Server ()
{
assert (!thread_.joinable());
if (thread_.joinable())
LogicError ("WebSocket::Server::onStop not called.");
}
private:
void run ()
{
threadEntry (
this, &Server::runImpl, "Server<WebSocket>::run()");
}
void runImpl ()
{
beast::Thread::setCurrentThreadName ("WebSocket");
JLOG (j_.warn())
<< "Websocket: listening on " << desc_.port;
try
{
listen();
}
catch (std::exception const& e)
{
JLOG (j_.warn()) <<
"Websocket: failed to listen on " <<
desc_.port << ": " << e.what();
}
{
std::lock_guard<std::recursive_mutex> lock (endpointMutex_);
endpoint_.reset();
}
JLOG (j_.warn())
<< "Websocket: finished listening on " << desc_.port;
stopped ();
JLOG (j_.warn())
<< "Websocket: stopped on " << desc_.port;
}
void onStart () override
{
JLOG (j_.warn())
<< "Websocket: creating endpoint " << desc_.port;
{
auto handler = WebSocket::makeHandler (desc_);
std::lock_guard<std::recursive_mutex> lock (endpointMutex_);
endpoint_ = WebSocket::makeEndpoint (std::move (handler));
}
thread_ = std::thread {&Server<WebSocket>::run, this};
auto ep = [&]
{
std::lock_guard<std::recursive_mutex> lock (endpointMutex_);
return endpoint_;
}();
if (ep)
ep->wait_for_listen();
}
void onStop () override
{
JLOG (j_.warn())
<< "Websocket: onStop " << desc_.port;
auto endpoint = [&]
{
std::lock_guard<std::recursive_mutex> lock (endpointMutex_);
return endpoint_;
}();
if (endpoint)
endpoint->stop ();
thread_.join();
}
void listen();
};
} // websocket
} // ripple
#endif