Check file handle limit on startup (RIPD-442, RIPD-1024):

Calculate the number of file descriptors that are needed during
execution based on the configuration file, with a hard floor
of 1024, adjusting the limit if possible. Refuse to run if enough
fds are not available.

Additionally, allow administrators to limit the number of incoming
connections a configured port will accept. By default no limit is
imposed.
This commit is contained in:
Nik Bougalis
2016-04-08 09:39:09 -07:00
parent 79ca82c078
commit 47eb4da080
25 changed files with 265 additions and 88 deletions

View File

@@ -534,6 +534,7 @@ public:
void signalStop() override; void signalStop() override;
bool checkSigs() const override; bool checkSigs() const override;
void checkSigs(bool) override; void checkSigs(bool) override;
int fdlimit () const override;
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
@@ -1156,14 +1157,12 @@ ApplicationImp::doStart()
void void
ApplicationImp::run() ApplicationImp::run()
{ {
if (!config_->RUN_STANDALONE)
{ {
if (!config_->RUN_STANDALONE) // VFALCO NOTE This seems unnecessary. If we properly refactor the load
{ // manager then the deadlock detector can just always be "armed"
// VFALCO NOTE This seems unnecessary. If we properly refactor the load //
// manager then the deadlock detector can just always be "armed" getLoadManager ().activateDeadlockDetector ();
//
getLoadManager ().activateDeadlockDetector ();
}
} }
m_stop.wait (); m_stop.wait ();
@@ -1201,6 +1200,27 @@ void ApplicationImp::checkSigs(bool check)
checkSigs_ = check; checkSigs_ = check;
} }
int ApplicationImp::fdlimit() const
{
// Standard handles, config file, misc I/O etc:
int needed = 128;
// 1.5 times the configured peer limit for peer connections:
needed += static_cast<int>(0.5 + (1.5 * m_overlay->limit()));
// the number of fds needed by the backend (internally
// doubled if online delete is enabled).
needed += std::max(5, m_shaMapStore->fdlimit());
// One fd per incoming connection a port can accept, or
// if no limit is set, assume it'll handle 256 clients.
for(auto const& p : serverHandler_->setup().ports)
needed += std::max (256, p.limit);
// The minimum number of file descriptors we need is 1024:
return std::max(1024, needed);
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
void void

View File

@@ -151,13 +151,11 @@ public:
virtual bool serverOkay (std::string& reason) = 0; virtual bool serverOkay (std::string& reason) = 0;
virtual beast::Journal journal (std::string const& name) = 0; virtual beast::Journal journal (std::string const& name) = 0;
/** Retrieve the "wallet database"
It looks like this is used to store the unique node list. /* Returns the number of file descriptors the application wants */
*/ virtual int fdlimit () const = 0;
// VFALCO TODO Rename, document this
// NOTE This will be replaced by class Validators /** Retrieve the "wallet database" */
//
virtual DatabaseCon& getWalletDB () = 0; virtual DatabaseCon& getWalletDB () = 0;
}; };

View File

@@ -54,23 +54,6 @@ namespace po = boost::program_options;
namespace ripple { namespace ripple {
void setupServer (Application& app)
{
#ifdef RLIMIT_NOFILE
struct rlimit rl;
if (getrlimit(RLIMIT_NOFILE, &rl) == 0)
{
if (rl.rlim_cur != rl.rlim_max)
{
rl.rlim_cur = rl.rlim_max;
setrlimit(RLIMIT_NOFILE, &rl);
}
}
#endif
app.setup ();
}
boost::filesystem::path boost::filesystem::path
getEntropyFile(Config const& config) getEntropyFile(Config const& config)
{ {
@@ -80,6 +63,47 @@ getEntropyFile(Config const& config)
return boost::filesystem::path (path) / "random.seed"; return boost::filesystem::path (path) / "random.seed";
} }
bool
adjustDescriptorLimit(int needed)
{
#ifdef RLIMIT_NOFILE
// Get the current limit, then adjust it to what we need.
struct rlimit rl;
int available = 0;
if (getrlimit(RLIMIT_NOFILE, &rl) == 0)
{
// If the limit is infnite, then we are good.
if (rl.rlim_cur == RLIM_INFINITY)
available = needed;
else
available = rl.rlim_cur;
if (available < needed)
{
// Ignore the rlim_max, as the process may
// be configured to override it anyways. We
// ask for the number descriptors we need.
rl.rlim_cur = needed;
if (setrlimit(RLIMIT_NOFILE, &rl) == 0)
available = rl.rlim_cur;
}
}
if (needed > available)
{
std::cerr << "Insufficient number of file descriptors:\n";
std::cerr << " Needed: " << needed << '\n';
std::cerr << " Available: " << available << '\n';
return false;
}
#endif
return true;
}
void startServer (Application& app) void startServer (Application& app)
{ {
// //
@@ -420,6 +444,11 @@ int run (int argc, char** argv)
// No arguments. Run server. // No arguments. Run server.
if (!vm.count ("parameters")) if (!vm.count ("parameters"))
{ {
// We want at least 1024 file descriptors. We'll
// tweak this further.
if (!adjustDescriptorLimit(1024))
return -1;
if (HaveSustain() && !vm.count ("fg") && !config->RUN_STANDALONE) if (HaveSustain() && !vm.count ("fg") && !config->RUN_STANDALONE)
{ {
auto const ret = DoSustain (); auto const ret = DoSustain ();
@@ -438,7 +467,16 @@ int run (int argc, char** argv)
std::move(config), std::move(config),
std::move(logs), std::move(logs),
std::move(timeKeeper)); std::move(timeKeeper));
setupServer (*app); app->setup ();
// With our configuration parsed, ensure we have
// enough file descriptors available:
if (!adjustDescriptorLimit(app->fdlimit()))
{
StopSustain();
return -1;
}
startServer (*app); startServer (*app);
return 0; return 0;
} }
@@ -451,8 +489,6 @@ int run (int argc, char** argv)
*logs); *logs);
} }
extern int run (int argc, char** argv);
} // ripple } // ripple
// Must be outside the namespace for obvious reasons // Must be outside the namespace for obvious reasons

View File

@@ -75,6 +75,9 @@ public:
/** Highest ledger that may be deleted. */ /** Highest ledger that may be deleted. */
virtual LedgerIndex getCanDelete() = 0; virtual LedgerIndex getCanDelete() = 0;
/** The number of files that are needed. */
virtual int fdlimit() const = 0;
}; };
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------

View File

@@ -221,6 +221,9 @@ SHAMapStoreImp::makeDatabase (std::string const& name,
makeBackendRotating (state.writableDb)); makeBackendRotating (state.writableDb));
std::shared_ptr <NodeStore::Backend> archiveBackend ( std::shared_ptr <NodeStore::Backend> archiveBackend (
makeBackendRotating (state.archiveDb)); makeBackendRotating (state.archiveDb));
fdlimit_ = writableBackend->fdlimit() + archiveBackend->fdlimit();
std::unique_ptr <NodeStore::DatabaseRotating> dbr = std::unique_ptr <NodeStore::DatabaseRotating> dbr =
makeDatabaseRotating (name, readThreads, writableBackend, makeDatabaseRotating (name, readThreads, writableBackend,
archiveBackend); archiveBackend);
@@ -237,8 +240,9 @@ SHAMapStoreImp::makeDatabase (std::string const& name,
} }
else else
{ {
db = NodeStore::Manager::instance().make_Database (name, scheduler_, nodeStoreJournal_, db = NodeStore::Manager::instance().make_Database (name, scheduler_,
readThreads, setup_.nodeDatabase); nodeStoreJournal_, readThreads, setup_.nodeDatabase);
fdlimit_ = db->fdlimit();
} }
return db; return db;
@@ -269,6 +273,12 @@ SHAMapStoreImp::rendezvous() const
}); });
} }
int
SHAMapStoreImp::fdlimit () const
{
return fdlimit_;
}
bool bool
SHAMapStoreImp::copyNode (std::uint64_t& nodeCount, SHAMapStoreImp::copyNode (std::uint64_t& nodeCount,
SHAMapAbstractNode const& node) SHAMapAbstractNode const& node)

View File

@@ -108,6 +108,7 @@ private:
TreeNodeCache* treeNodeCache_ = nullptr; TreeNodeCache* treeNodeCache_ = nullptr;
DatabaseCon* transactionDb_ = nullptr; DatabaseCon* transactionDb_ = nullptr;
DatabaseCon* ledgerDb_ = nullptr; DatabaseCon* ledgerDb_ = nullptr;
int fdlimit_ = 0;
public: public:
SHAMapStoreImp (Application& app, SHAMapStoreImp (Application& app,
@@ -164,6 +165,7 @@ public:
void onLedgerClosed (std::shared_ptr<Ledger const> const& ledger) override; void onLedgerClosed (std::shared_ptr<Ledger const> const& ledger) override;
void rendezvous() const override; void rendezvous() const override;
int fdlimit() const override;
private: private:
// callback for visitNodes // callback for visitNodes

View File

@@ -105,6 +105,9 @@ public:
/** Perform consistency checks on database .*/ /** Perform consistency checks on database .*/
virtual void verify() = 0; virtual void verify() = 0;
/** Returns the number of file handles the backend expects to need */
virtual int fdlimit() const = 0;
}; };
} }

View File

@@ -147,6 +147,9 @@ public:
virtual std::uint32_t getFetchHitCount () const = 0; virtual std::uint32_t getFetchHitCount () const = 0;
virtual std::uint32_t getStoreSize () const = 0; virtual std::uint32_t getStoreSize () const = 0;
virtual std::uint32_t getFetchSize () const = 0; virtual std::uint32_t getFetchSize () const = 0;
/** Return the number of files needed by our backend */
virtual int fdlimit() const = 0;
}; };
} }

View File

@@ -178,6 +178,12 @@ public:
verify() override verify() override
{ {
} }
int
fdlimit() const override
{
return 0;
}
}; };
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------

View File

@@ -244,6 +244,13 @@ public:
db_.open (dp, kp, lp, db_.open (dp, kp, lp,
arena_alloc_size); arena_alloc_size);
} }
/** Returns the number of file handles the backend expects to need */
int
fdlimit() const override
{
return 3;
}
}; };
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------

View File

@@ -98,6 +98,13 @@ public:
{ {
} }
/** Returns the number of file handles the backend expects to need */
int
fdlimit() const override
{
return 0;
}
private: private:
}; };

View File

@@ -99,6 +99,7 @@ public:
BatchWriter m_batch; BatchWriter m_batch;
std::string m_name; std::string m_name;
std::unique_ptr <rocksdb::DB> m_db; std::unique_ptr <rocksdb::DB> m_db;
int fdlimit_ = 2048;
RocksDBBackend (int keyBytes, Section const& keyValues, RocksDBBackend (int keyBytes, Section const& keyValues,
Scheduler& scheduler, beast::Journal journal, RocksDBEnv* env) Scheduler& scheduler, beast::Journal journal, RocksDBEnv* env)
@@ -122,7 +123,8 @@ public:
if (auto const v = get<int>(keyValues, "filter_bits")) if (auto const v = get<int>(keyValues, "filter_bits"))
table_options.filter_policy.reset (rocksdb::NewBloomFilterPolicy (v)); table_options.filter_policy.reset (rocksdb::NewBloomFilterPolicy (v));
get_if_exists (keyValues, "open_files", options.max_open_files); if (get_if_exists (keyValues, "open_files", options.max_open_files))
fdlimit_ = options.max_open_files;
if (keyValues.exists ("file_size_mb")) if (keyValues.exists ("file_size_mb"))
{ {
@@ -361,6 +363,13 @@ public:
verify() override verify() override
{ {
} }
/** Returns the number of file handles the backend expects to need */
int
fdlimit() const override
{
return fdlimit_;
}
}; };
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------

View File

@@ -95,6 +95,7 @@ public:
size_t const m_keyBytes; size_t const m_keyBytes;
std::string m_name; std::string m_name;
std::unique_ptr <rocksdb::DB> m_db; std::unique_ptr <rocksdb::DB> m_db;
int fdlimit_ = 2048;
RocksDBQuickBackend (int keyBytes, Section const& keyValues, RocksDBQuickBackend (int keyBytes, Section const& keyValues,
Scheduler& scheduler, beast::Journal journal, RocksDBQuickEnv* env) Scheduler& scheduler, beast::Journal journal, RocksDBQuickEnv* env)
@@ -116,7 +117,6 @@ public:
get_if_exists (keyValues, "style", style); get_if_exists (keyValues, "style", style);
get_if_exists (keyValues, "threads", threads); get_if_exists (keyValues, "threads", threads);
// Set options // Set options
rocksdb::Options options; rocksdb::Options options;
options.create_if_missing = true; options.create_if_missing = true;
@@ -160,7 +160,8 @@ public:
// options.memtable_factory.reset( // options.memtable_factory.reset(
// rocksdb::NewHashCuckooRepFactory(options.write_buffer_size)); // rocksdb::NewHashCuckooRepFactory(options.write_buffer_size));
get_if_exists (keyValues, "open_files", options.max_open_files); if (get_if_exists (keyValues, "open_files", options.max_open_files))
fdlimit_ = options.max_open_files;
if (keyValues.exists ("compression") && if (keyValues.exists ("compression") &&
(get<int>(keyValues, "compression") == 0)) (get<int>(keyValues, "compression") == 0))
@@ -363,6 +364,13 @@ public:
verify() override verify() override
{ {
} }
/** Returns the number of file handles the backend expects to need */
int
fdlimit() const override
{
return fdlimit_;
}
}; };
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------

View File

@@ -61,6 +61,8 @@ private:
std::vector <std::thread> m_readThreads; std::vector <std::thread> m_readThreads;
bool m_readShut; bool m_readShut;
uint64_t m_readGen; // current read generation uint64_t m_readGen; // current read generation
int fdlimit_;
public: public:
DatabaseImp (std::string const& name, DatabaseImp (std::string const& name,
Scheduler& scheduler, Scheduler& scheduler,
@@ -83,8 +85,10 @@ public:
, m_fetchSize (0) , m_fetchSize (0)
{ {
for (int i = 0; i < readThreads; ++i) for (int i = 0; i < readThreads; ++i)
m_readThreads.push_back (std::thread (&DatabaseImp::threadEntry, m_readThreads.emplace_back (&DatabaseImp::threadEntry, this);
this));
if (m_backend)
fdlimit_ = m_backend->fdlimit();
} }
~DatabaseImp () ~DatabaseImp ()
@@ -432,6 +436,11 @@ public:
return m_fetchSize; return m_fetchSize;
} }
int fdlimit() const override
{
return fdlimit_;
}
private: private:
std::atomic <std::uint32_t> m_storeCount; std::atomic <std::uint32_t> m_storeCount;
std::atomic <std::uint32_t> m_fetchTotalCount; std::atomic <std::uint32_t> m_fetchTotalCount;

View File

@@ -92,6 +92,11 @@ public:
void void
connect (beast::IP::Endpoint const& address) = 0; connect (beast::IP::Endpoint const& address) = 0;
/** Returns the maximum number of peers we are configured to allow. */
virtual
int
limit () = 0;
/** Returns the number of active peers. /** Returns the number of active peers.
Active peers are only those peers that have completed the Active peers are only those peers that have completed the
handshake and are using the peer protocol. handshake and are using the peer protocol.

View File

@@ -775,6 +775,12 @@ OverlayImpl::size()
return ids_.size (); return ids_.size ();
} }
int
OverlayImpl::limit()
{
return m_peerFinder->config().maxPeers;
}
Json::Value Json::Value
OverlayImpl::crawl() OverlayImpl::crawl()
{ {

View File

@@ -266,6 +266,22 @@ public:
bool isInbound, bool isInbound,
int bytes); int bytes);
/* The number of active peers on the network
Active peers are only those peers that have completed the handshake
and are running the Ripple protocol.
*/
std::size_t
size() override;
int
limit () override;
Json::Value
crawl() override;
Json::Value
json() override;
private: private:
std::shared_ptr<Writer> std::shared_ptr<Writer>
makeRedirectResponse (PeerFinder::Slot::ptr const& slot, makeRedirectResponse (PeerFinder::Slot::ptr const& slot,
@@ -274,20 +290,6 @@ private:
void void
connect (beast::IP::Endpoint const& remote_endpoint) override; connect (beast::IP::Endpoint const& remote_endpoint) override;
/* The number of active peers on the network
Active peers are only those peers that have completed the handshake
and are running the Ripple protocol.
*/
// VFALCO Why private?
std::size_t
size() override;
Json::Value
crawl() override;
Json::Value
json() override;
bool bool
processRequest (beast::http::message const& req, processRequest (beast::http::message const& req,
Handoff& handoff); Handoff& handoff);

View File

@@ -39,10 +39,6 @@ class Server;
*/ */
struct Handler struct Handler
{ {
/** Called when the connection is accepted and we know remoteAddress. */
// DEPRECATED
virtual void onAccept (Session& session) = 0;
/** Called when a connection is accepted. /** Called when a connection is accepted.
@return `true` If we should keep the connection. @return `true` If we should keep the connection.
*/ */

View File

@@ -51,6 +51,10 @@ struct Port
std::string ssl_chain; std::string ssl_chain;
std::shared_ptr<boost::asio::ssl::context> context; std::shared_ptr<boost::asio::ssl::context> context;
// How many incoming connections are allowed on this
// port in the range [0, 65535] where 0 means unlimited.
int limit = 0;
// Returns `true` if any websocket protocols are specified // Returns `true` if any websocket protocols are specified
bool websockets() const; bool websockets() const;
@@ -77,6 +81,7 @@ struct ParsedPort
std::string ssl_key; std::string ssl_key;
std::string ssl_cert; std::string ssl_cert;
std::string ssl_chain; std::string ssl_chain;
int limit = 0;
boost::optional<boost::asio::ip::address> ip; boost::optional<boost::asio::ip::address> ip;
boost::optional<std::uint16_t> port; boost::optional<std::uint16_t> port;

View File

@@ -73,7 +73,14 @@ PlainHTTPPeer::PlainHTTPPeer (Port const& port, Handler& handler,
void void
PlainHTTPPeer::run () PlainHTTPPeer::run ()
{ {
handler_.onAccept (session()); if (!handler_.onAccept (session(), remote_address_))
{
boost::asio::spawn (strand_,
std::bind (&PlainHTTPPeer::do_close,
shared_from_this()));
return;
}
if (! stream_.is_open()) if (! stream_.is_open())
return; return;

View File

@@ -19,6 +19,7 @@
#include <ripple/server/Port.h> #include <ripple/server/Port.h>
#include <beast/http/rfc2616.h> #include <beast/http/rfc2616.h>
#include <beast/module/core/text/LexicalCast.h>
namespace ripple { namespace ripple {
@@ -159,20 +160,22 @@ parse_Port (ParsedPort& port, Section const& section, std::ostream& log)
auto const result = section.find("port"); auto const result = section.find("port");
if (result.second) if (result.second)
{ {
auto const ul = std::stoul(result.first); try
if (ul > std::numeric_limits<std::uint16_t>::max())
{ {
log << "Value '" << result.first port.port =
<< "' for key 'port' is out of range\n"; beast::lexicalCastThrow<std::uint16_t>(result.first);
Throw<std::exception> ();
// Port 0 is not supported
if (*port.port == 0)
Throw<std::exception> ();
} }
if (ul == 0) catch (std::exception const& ex)
{ {
log << log <<
"Value '0' for key 'port' is invalid\n"; "Invalid value '" << result.first << "' for key " <<
Throw<std::exception> (); "'port' in [" << section.name() << "]\n";
Throw();
} }
port.port = static_cast<std::uint16_t>(ul);
} }
} }
@@ -186,6 +189,26 @@ parse_Port (ParsedPort& port, Section const& section, std::ostream& log)
} }
} }
{
auto const lim = get (section, "limit", "unlimited");
if (!beast::ci_equal (lim, "unlimited"))
{
try
{
port.limit = static_cast<int> (
beast::lexicalCastThrow<std::uint16_t>(lim));
}
catch (std::exception const& ex)
{
log <<
"Invalid value '" << lim << "' for key " <<
"'limit' in [" << section.name() << "]\n";
Throw();
}
}
}
populate (section, "admin", log, port.admin_ip, true, {}); populate (section, "admin", log, port.admin_ip, true, {});
populate (section, "secure_gateway", log, port.secure_gateway_ip, false, populate (section, "secure_gateway", log, port.secure_gateway_ip, false,
port.admin_ip.get_value_or({})); port.admin_ip.get_value_or({}));

View File

@@ -78,7 +78,14 @@ SSLHTTPPeer::SSLHTTPPeer (Port const& port, Handler& handler,
void void
SSLHTTPPeer::run() SSLHTTPPeer::run()
{ {
handler_.onAccept (session()); if (!handler_.onAccept (session(), remote_address_))
{
boost::asio::spawn (strand_,
std::bind (&SSLHTTPPeer::do_close,
shared_from_this()));
return;
}
if (! stream_.lowest_layer().is_open()) if (! stream_.lowest_layer().is_open())
return; return;

View File

@@ -94,15 +94,22 @@ ServerHandlerImp::onStop()
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
void
ServerHandlerImp::onAccept (Session& session)
{
}
bool bool
ServerHandlerImp::onAccept (Session& session, ServerHandlerImp::onAccept (Session& session,
boost::asio::ip::tcp::endpoint endpoint) boost::asio::ip::tcp::endpoint endpoint)
{ {
std::lock_guard<std::mutex> l(countlock_);
auto const c = ++count_[session.port()];
if (session.port().limit && c >= session.port().limit)
{
JLOG (m_journal.trace()) <<
session.port().name << " is full; dropping " <<
endpoint;
return false;
}
return true; return true;
} }
@@ -188,6 +195,8 @@ void
ServerHandlerImp::onClose (Session& session, ServerHandlerImp::onClose (Session& session,
boost::system::error_code const&) boost::system::error_code const&)
{ {
std::lock_guard<std::mutex> l(countlock_);
--count_[session.port()];
} }
void void

View File

@@ -28,9 +28,16 @@
#include <ripple/server/Session.h> #include <ripple/server/Session.h>
#include <ripple/rpc/RPCHandler.h> #include <ripple/rpc/RPCHandler.h>
#include <ripple/app/main/CollectorManager.h> #include <ripple/app/main/CollectorManager.h>
#include <map>
#include <mutex>
namespace ripple { namespace ripple {
bool operator< (Port const& lhs, Port const& rhs)
{
return lhs.name < rhs.name;
}
// Private implementation // Private implementation
class ServerHandlerImp class ServerHandlerImp
: public ServerHandler : public ServerHandler
@@ -47,6 +54,8 @@ private:
beast::insight::Counter rpc_requests_; beast::insight::Counter rpc_requests_;
beast::insight::Event rpc_size_; beast::insight::Event rpc_size_;
beast::insight::Event rpc_time_; beast::insight::Event rpc_time_;
std::mutex countlock_;
std::map<std::reference_wrapper<Port const>, int> count_;
public: public:
ServerHandlerImp (Application& app, Stoppable& parent, ServerHandlerImp (Application& app, Stoppable& parent,
@@ -79,9 +88,6 @@ private:
// HTTP::Handler // HTTP::Handler
// //
void
onAccept (Session& session) override;
bool bool
onAccept (Session& session, onAccept (Session& session,
boost::asio::ip::tcp::endpoint endpoint) override; boost::asio::ip::tcp::endpoint endpoint) override;

View File

@@ -97,11 +97,6 @@ public:
struct TestHandler : Handler struct TestHandler : Handler
{ {
void
onAccept (Session& session) override
{
}
bool bool
onAccept (Session& session, onAccept (Session& session,
boost::asio::ip::tcp::endpoint endpoint) override boost::asio::ip::tcp::endpoint endpoint) override
@@ -304,11 +299,6 @@ public:
{ {
struct NullHandler : Handler struct NullHandler : Handler
{ {
void
onAccept (Session& session) override
{
}
bool bool
onAccept (Session& session, onAccept (Session& session,
boost::asio::ip::tcp::endpoint endpoint) override boost::asio::ip::tcp::endpoint endpoint) override