#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace xrpl { static void fixConfigPorts(Config& config, Endpoints const& endpoints); // VFALCO TODO Move the function definitions into the class declaration class ApplicationImp : public Application, public BasicApp { private: class io_latency_sampler { private: beast::insight::Event m_event; beast::Journal m_journal; beast::io_latency_probe m_probe; std::atomic lastSample_; public: io_latency_sampler( beast::insight::Event ev, beast::Journal journal, std::chrono::milliseconds interval, boost::asio::io_context& ios) : m_event(std::move(ev)), m_journal(journal), m_probe(interval, ios) { } void start() { m_probe.sample(std::ref(*this)); } template void operator()(Duration const& elapsed) { using namespace std::chrono; auto const lastSample = ceil(elapsed); lastSample_ = lastSample; if (lastSample >= 10ms) m_event.notify(lastSample); if (lastSample >= 500ms) { JLOG(m_journal.warn()) << "io_context latency = " << lastSample.count(); } } [[nodiscard]] std::chrono::milliseconds get() const { return lastSample_.load(); } void cancel() { m_probe.cancel(); } void cancel_async() { m_probe.cancel_async(); } }; public: std::unique_ptr config_; std::unique_ptr logs_; std::unique_ptr timeKeeper_; std::uint64_t const instanceCookie_; beast::Journal m_journal; std::unique_ptr perfLog_; std::unique_ptr telemetry_; Application::MutexType m_masterMutex; // Required by the SHAMapStore TransactionMaster m_txMaster; std::unique_ptr m_collectorManager; std::unique_ptr m_jobQueue; NodeStoreScheduler m_nodeStoreScheduler; std::unique_ptr m_shaMapStore; PendingSaves pendingSaves_; std::optional openLedger_; NodeCache m_tempNodeCache; CachedSLEs cachedSLEs_; std::unique_ptr networkIDService_; std::optional> nodeIdentity_; ValidatorKeys const validatorKeys_; std::unique_ptr m_resourceManager; std::unique_ptr m_nodeStore; NodeFamily nodeFamily_; std::unique_ptr m_orderBookDB; std::unique_ptr m_pathRequestManager; std::unique_ptr m_ledgerMaster; std::unique_ptr ledgerCleaner_; std::unique_ptr m_inboundLedgers; std::unique_ptr m_inboundTransactions; std::unique_ptr m_ledgerReplayer; TaggedCache m_acceptedLedgerCache; std::unique_ptr m_networkOPs; std::unique_ptr cluster_; std::unique_ptr peerReservations_; std::unique_ptr validatorManifests_; std::unique_ptr publisherManifests_; std::unique_ptr validators_; std::unique_ptr validatorSites_; std::unique_ptr serverHandler_; std::unique_ptr m_amendmentTable; std::unique_ptr mFeeTrack; std::unique_ptr hashRouter_; RCLValidations mValidations; std::unique_ptr m_loadManager; std::unique_ptr txQ_; ClosureCounter waitHandlerCounter_; boost::asio::steady_timer sweepTimer_; boost::asio::steady_timer entropyTimer_; std::optional relationalDatabase_; std::unique_ptr mWalletDB; std::unique_ptr overlay_; std::optional trapTxID_; boost::asio::signal_set m_signals; std::atomic_flag isTimeToStop; std::atomic checkSigs_; std::unique_ptr m_resolver; io_latency_sampler m_io_latency_sampler; std::unique_ptr grpcServer_; //-------------------------------------------------------------------------- static std::size_t numberOfThreads(Config const& config) { #if XRPL_SINGLE_IO_SERVICE_THREAD return 1; #else if (config.IO_WORKERS > 0) return config.IO_WORKERS; auto const cores = std::thread::hardware_concurrency(); // Use a single thread when running on under-provisioned systems // or if we are configured to use minimal resources. if ((cores == 1) || ((config.NODE_SIZE == 0) && (cores == 2))) return 1; // Otherwise, prefer six threads. return 6; #endif } //-------------------------------------------------------------------------- ApplicationImp( std::unique_ptr config, std::unique_ptr logs, std::unique_ptr timeKeeper) : BasicApp(numberOfThreads(*config)) , config_(std::move(config)) , logs_(std::move(logs)) , timeKeeper_(std::move(timeKeeper)) , instanceCookie_( 1 + rand_int(crypto_prng(), std::numeric_limits::max() - 1)) , m_journal(logs_->journal("Application")) // PerfLog must be started before any other threads are launched. , perfLog_( perf::make_PerfLog( perf::setup_PerfLog(config_->section("perf"), config_->CONFIG_DIR), *this, logs_->journal("PerfLog"), [this] { signalStop("PerfLog"); })) , telemetry_( telemetry::make_Telemetry( telemetry::setup_Telemetry( config_->section("telemetry"), "", // Updated later via setServiceInstanceId() BuildInfo::getVersionString(), config_->NETWORK_ID), logs_->journal("Telemetry"))) , m_txMaster(*this) , m_collectorManager( make_CollectorManager(config_->section(SECTION_INSIGHT), logs_->journal("Collector"))) , m_jobQueue( std::make_unique( [](std::unique_ptr const& config) { if (config->standalone() && !config->FORCE_MULTI_THREAD) return 1; if (config->WORKERS) return config->WORKERS; auto count = static_cast(std::thread::hardware_concurrency()); // Be more aggressive about the number of threads to use // for the job queue if the server is configured as // "large" or "huge" if there are enough cores. if (config->NODE_SIZE >= 4 && count >= 16) { count = 6 + std::min(count, 8); } else if (config->NODE_SIZE >= 3 && count >= 8) { count = 4 + std::min(count, 6); } else { count = 2 + std::min(count, 4); } return count; }(config_), m_collectorManager->group("jobq"), logs_->journal("JobQueue"), *logs_, *perfLog_)) , m_nodeStoreScheduler(*m_jobQueue) , m_shaMapStore( make_SHAMapStore(*this, m_nodeStoreScheduler, logs_->journal("SHAMapStore"))) , m_tempNodeCache( "NodeCache", 16384, std::chrono::seconds{90}, stopwatch(), logs_->journal("TaggedCache")) , cachedSLEs_( "Cached SLEs", 0, std::chrono::minutes(1), stopwatch(), logs_->journal("CachedSLEs")) , networkIDService_(std::make_unique(config_->NETWORK_ID)) , validatorKeys_(*config_, m_journal) , m_resourceManager( Resource::make_Manager(m_collectorManager->collector(), logs_->journal("Resource"))) , m_nodeStore(m_shaMapStore->makeNodeStore( config_->PREFETCH_WORKERS > 0 ? config_->PREFETCH_WORKERS : 4)) , nodeFamily_(*this, *m_collectorManager) , m_orderBookDB(make_OrderBookDB( *this, {.pathSearchMax = config_->PATH_SEARCH_MAX, .standalone = config_->standalone()})) , m_pathRequestManager( std::make_unique( *this, logs_->journal("PathRequest"), m_collectorManager->collector())) , m_ledgerMaster( std::make_unique( *this, stopwatch(), m_collectorManager->collector(), logs_->journal("LedgerMaster"))) , ledgerCleaner_(make_LedgerCleaner(*this, logs_->journal("LedgerCleaner"))) // VFALCO NOTE must come before NetworkOPs to prevent a crash due // to dependencies in the destructor. // , m_inboundLedgers(make_InboundLedgers(*this, stopwatch(), m_collectorManager->collector())) , m_inboundTransactions(make_InboundTransactions( *this, m_collectorManager->collector(), [this](std::shared_ptr const& set, bool fromAcquire) { gotTXSet(set, fromAcquire); })) , m_ledgerReplayer( std::make_unique( *this, *m_inboundLedgers, make_PeerSetBuilder(*this))) , m_acceptedLedgerCache( "AcceptedLedger", 4, std::chrono::minutes{1}, stopwatch(), logs_->journal("TaggedCache")) , m_networkOPs(make_NetworkOPs( *this, stopwatch(), config_->standalone(), config_->NETWORK_QUORUM, config_->START_VALID, *m_jobQueue, *m_ledgerMaster, validatorKeys_, get_io_context(), logs_->journal("NetworkOPs"), m_collectorManager->collector())) , cluster_(std::make_unique(logs_->journal("Overlay"))) , peerReservations_( std::make_unique(logs_->journal("PeerReservationTable"))) , validatorManifests_(std::make_unique(logs_->journal("ManifestCache"))) , publisherManifests_(std::make_unique(logs_->journal("ManifestCache"))) , validators_( std::make_unique( *validatorManifests_, *publisherManifests_, *timeKeeper_, config_->legacy("database_path"), logs_->journal("ValidatorList"), config_->VALIDATION_QUORUM)) , validatorSites_(std::make_unique(*this)) , serverHandler_(make_ServerHandler( *this, get_io_context(), *m_jobQueue, *m_networkOPs, *m_resourceManager, *m_collectorManager)) , mFeeTrack(std::make_unique(logs_->journal("LoadManager"))) , hashRouter_(std::make_unique(setup_HashRouter(*config_), stopwatch())) , mValidations(ValidationParms(), stopwatch(), *this, logs_->journal("Validations")) , m_loadManager(make_LoadManager(*this, logs_->journal("LoadManager"))) , txQ_(std::make_unique(setup_TxQ(*config_), logs_->journal("TxQ"))) , sweepTimer_(get_io_context()) , entropyTimer_(get_io_context()) , m_signals(get_io_context()) , checkSigs_(true) , m_resolver(ResolverAsio::New(get_io_context(), logs_->journal("Resolver"))) , m_io_latency_sampler( m_collectorManager->collector()->make_event("ios_latency"), logs_->journal("Application"), std::chrono::milliseconds(100), get_io_context()) , grpcServer_(std::make_unique(*this)) { initAccountIdCache(config_->getValueFor(SizedItem::accountIdCacheSize)); add(m_resourceManager.get()); // // VFALCO - READ THIS! // // Do not start threads, open sockets, or do any sort of "real work" // inside the constructor. Put it in start instead. Or if you must, // put it in setup (but everything in setup should be moved to start // anyway. // // The reason is that the unit tests require an Application object to // be created. But we don't actually start all the threads, sockets, // and services when running the unit tests. Therefore anything which // needs to be stopped will not get stopped correctly if it is // started in this constructor. // add(ledgerCleaner_.get()); } //-------------------------------------------------------------------------- bool setup(boost::program_options::variables_map const& cmdline) override; void start(bool withTimers) override; void run() override; void signalStop(std::string msg) override; bool checkSigs() const override; void checkSigs(bool) override; bool isStopping() const override; int fdRequired() const override; //-------------------------------------------------------------------------- std::uint64_t instanceID() const override { return instanceCookie_; } Logs& getLogs() override { return *logs_; } Config& config() override { return *config_; } CollectorManager& getCollectorManager() override { return *m_collectorManager; } Family& getNodeFamily() override { return nodeFamily_; } TimeKeeper& getTimeKeeper() override { return *timeKeeper_; } JobQueue& getJobQueue() override { return *m_jobQueue; } std::pair const& nodeIdentity() override { if (nodeIdentity_) return *nodeIdentity_; LogicError("Accessing Application::nodeIdentity() before it is initialized."); } std::optional getValidationPublicKey() const override { if (!validatorKeys_.keys) return {}; return validatorKeys_.keys->publicKey; } NetworkOPs& getOPs() override { return *m_networkOPs; } ServerHandler& getServerHandler() override { XRPL_ASSERT( serverHandler_, "xrpl::ApplicationImp::getServerHandler : non-null server " "handle"); return *serverHandler_; } boost::asio::io_context& getIOContext() override { return get_io_context(); } std::chrono::milliseconds getIOLatency() override { return m_io_latency_sampler.get(); } LedgerMaster& getLedgerMaster() override { return *m_ledgerMaster; } LedgerCleaner& getLedgerCleaner() override { return *ledgerCleaner_; } LedgerReplayer& getLedgerReplayer() override { return *m_ledgerReplayer; } InboundLedgers& getInboundLedgers() override { return *m_inboundLedgers; } InboundTransactions& getInboundTransactions() override { return *m_inboundTransactions; } TaggedCache& getAcceptedLedgerCache() override { return m_acceptedLedgerCache; } void gotTXSet(std::shared_ptr const& set, bool fromAcquire) const { if (set) m_networkOPs->mapComplete(set, fromAcquire); } TransactionMaster& getMasterTransaction() override { return m_txMaster; } perf::PerfLog& getPerfLog() override { return *perfLog_; } telemetry::Telemetry& getTelemetry() override { return *telemetry_; } NodeCache& getTempNodeCache() override { return m_tempNodeCache; } NodeStore::Database& getNodeStore() override { return *m_nodeStore; } Application::MutexType& getMasterMutex() override { return m_masterMutex; } LoadManager& getLoadManager() override { return *m_loadManager; } Resource::Manager& getResourceManager() override { return *m_resourceManager; } OrderBookDB& getOrderBookDB() override { return *m_orderBookDB; } PathRequestManager& getPathRequestManager() override { return *m_pathRequestManager; } CachedSLEs& getCachedSLEs() override { return cachedSLEs_; } NetworkIDService& getNetworkIDService() override { return *networkIDService_; } AmendmentTable& getAmendmentTable() override { return *m_amendmentTable; } LoadFeeTrack& getFeeTrack() override { return *mFeeTrack; } HashRouter& getHashRouter() override { return *hashRouter_; } RCLValidations& getValidations() override { return mValidations; } ValidatorList& getValidators() override { return *validators_; } ValidatorSite& getValidatorSites() override { return *validatorSites_; } ManifestCache& getValidatorManifests() override { return *validatorManifests_; } ManifestCache& getPublisherManifests() override { return *publisherManifests_; } Cluster& getCluster() override { return *cluster_; } PeerReservationTable& getPeerReservations() override { return *peerReservations_; } SHAMapStore& getSHAMapStore() override { return *m_shaMapStore; } PendingSaves& getPendingSaves() override { return pendingSaves_; } OpenLedger& getOpenLedger() override { return *openLedger_; // NOLINT(bugprone-unchecked-optional-access) emplaced during // initialization before any caller } OpenLedger const& getOpenLedger() const override { return *openLedger_; // NOLINT(bugprone-unchecked-optional-access) emplaced during // initialization before any caller } Overlay& getOverlay() override { XRPL_ASSERT(overlay_, "xrpl::ApplicationImp::overlay : non-null overlay"); return *overlay_; // NOLINT(bugprone-unchecked-optional-access) assert above } TxQ& getTxQ() override { XRPL_ASSERT(txQ_, "xrpl::ApplicationImp::getTxQ : non-null transaction queue"); return *txQ_; // NOLINT(bugprone-unchecked-optional-access) assert above } RelationalDatabase& getRelationalDatabase() override { XRPL_ASSERT( relationalDatabase_, "xrpl::ApplicationImp::getRelationalDatabase : non-null relational database"); return *relationalDatabase_; // NOLINT(bugprone-unchecked-optional-access) assert above } DatabaseCon& getWalletDB() override { XRPL_ASSERT(mWalletDB, "xrpl::ApplicationImp::getWalletDB : non-null wallet database"); return *mWalletDB; } bool serverOkay(std::string& reason) override; beast::Journal getJournal(std::string const& name) override; //-------------------------------------------------------------------------- bool initRelationalDatabase() { XRPL_ASSERT( mWalletDB.get() == nullptr, "xrpl::ApplicationImp::initRelationalDatabase : null wallet " "database"); try { relationalDatabase_.emplace(setup_RelationalDatabase(*this, *config_, *m_jobQueue)); // wallet database auto setup = setup_DatabaseCon(*config_, m_journal); setup.useGlobalPragma = false; mWalletDB = makeWalletDB(setup, m_journal); } catch (std::exception const& e) { JLOG(m_journal.fatal()) << "Failed to initialize SQL databases: " << e.what(); return false; } return true; } bool initNodeStore() const { if (config_->doImport) { auto j = logs_->journal("NodeObject"); NodeStore::DummyScheduler dummyScheduler; std::unique_ptr source = NodeStore::Manager::instance().make_Database( megabytes(config_->getValueFor(SizedItem::burstSize, std::nullopt)), dummyScheduler, 0, config_->section(ConfigSection::importNodeDatabase()), j); JLOG(j.warn()) << "Starting node import from '" << source->getName() << "' to '" << m_nodeStore->getName() << "'."; using namespace std::chrono; auto const start = steady_clock::now(); m_nodeStore->importDatabase(*source); auto const elapsed = duration_cast(steady_clock::now() - start); JLOG(j.warn()) << "Node import from '" << source->getName() << "' took " << elapsed.count() << " seconds."; } return true; } //-------------------------------------------------------------------------- // // PropertyStream // void onWrite(beast::PropertyStream::Map& stream) override { } //-------------------------------------------------------------------------- void setSweepTimer() { // Only start the timer if waitHandlerCounter_ is not yet joined. if (auto optionalCountedHandler = waitHandlerCounter_.wrap([this](boost::system::error_code const& e) { if (e.value() == boost::system::errc::success) { m_jobQueue->addJob(jtSWEEP, "sweep", [this]() { doSweep(); }); } // Recover as best we can if an unexpected error occurs. if (e.value() != boost::system::errc::success && e.value() != boost::asio::error::operation_aborted) { // Try again later and hope for the best. JLOG(m_journal.error()) << "Sweep timer got error '" << e.message() << "'. Restarting timer."; setSweepTimer(); } })) { using namespace std::chrono; sweepTimer_.expires_after( seconds{config_->SWEEP_INTERVAL.value_or( config_->getValueFor(SizedItem::sweepInterval))}); sweepTimer_.async_wait(std::move(*optionalCountedHandler)); } } void setEntropyTimer() { // Only start the timer if waitHandlerCounter_ is not yet joined. if (auto optionalCountedHandler = waitHandlerCounter_.wrap([this](boost::system::error_code const& e) { if (e.value() == boost::system::errc::success) { crypto_prng().mix_entropy(); setEntropyTimer(); } // Recover as best we can if an unexpected error occurs. if (e.value() != boost::system::errc::success && e.value() != boost::asio::error::operation_aborted) { // Try again later and hope for the best. JLOG(m_journal.error()) << "Entropy timer got error '" << e.message() << "'. Restarting timer."; setEntropyTimer(); } })) { using namespace std::chrono_literals; entropyTimer_.expires_after(5min); entropyTimer_.async_wait(std::move(*optionalCountedHandler)); } } void doSweep() { XRPL_ASSERT( relationalDatabase_, "xrpl::ApplicationImp::doSweep : non-null relational database"); // NOLINTNEXTLINE(bugprone-unchecked-optional-access) assert above if (!config_->standalone() && !relationalDatabase_->transactionDbHasSpace(*config_)) { signalStop("Out of transaction DB space"); } // VFALCO NOTE Does the order of calls matter? // VFALCO TODO fix the dependency inversion using an observer, // have listeners register for "onSweep ()" notification. { std::shared_ptr const fullBelowCache = nodeFamily_.getFullBelowCache(); std::shared_ptr const treeNodeCache = nodeFamily_.getTreeNodeCache(); std::size_t const oldFullBelowSize = fullBelowCache->size(); std::size_t const oldTreeNodeSize = treeNodeCache->size(); nodeFamily_.sweep(); JLOG(m_journal.debug()) << "NodeFamily::FullBelowCache sweep. Size before: " << oldFullBelowSize << "; size after: " << fullBelowCache->size(); JLOG(m_journal.debug()) << "NodeFamily::TreeNodeCache sweep. Size before: " << oldTreeNodeSize << "; size after: " << treeNodeCache->size(); } { TaggedCache const& masterTxCache = getMasterTransaction().getCache(); std::size_t const oldMasterTxSize = masterTxCache.size(); getMasterTransaction().sweep(); JLOG(m_journal.debug()) << "MasterTransaction sweep. Size before: " << oldMasterTxSize << "; size after: " << masterTxCache.size(); } { std::size_t const oldLedgerMasterCacheSize = getLedgerMaster().getFetchPackCacheSize(); getLedgerMaster().sweep(); JLOG(m_journal.debug()) << "LedgerMaster sweep. Size before: " << oldLedgerMasterCacheSize << "; size after: " << getLedgerMaster().getFetchPackCacheSize(); } { // NodeCache == TaggedCache std::size_t const oldTempNodeCacheSize = getTempNodeCache().size(); getTempNodeCache().sweep(); JLOG(m_journal.debug()) << "TempNodeCache sweep. Size before: " << oldTempNodeCacheSize << "; size after: " << getTempNodeCache().size(); } { std::size_t const oldCurrentCacheSize = getValidations().sizeOfCurrentCache(); std::size_t const oldSizeSeqEnforcesSize = getValidations().sizeOfSeqEnforcersCache(); std::size_t const oldByLedgerSize = getValidations().sizeOfByLedgerCache(); std::size_t const oldBySequenceSize = getValidations().sizeOfBySequenceCache(); getValidations().expire(m_journal); JLOG(m_journal.debug()) << "Validations Current expire. Size before: " << oldCurrentCacheSize << "; size after: " << getValidations().sizeOfCurrentCache(); JLOG(m_journal.debug()) << "Validations SeqEnforcer expire. Size before: " << oldSizeSeqEnforcesSize << "; size after: " << getValidations().sizeOfSeqEnforcersCache(); JLOG(m_journal.debug()) << "Validations ByLedger expire. Size before: " << oldByLedgerSize << "; size after: " << getValidations().sizeOfByLedgerCache(); JLOG(m_journal.debug()) << "Validations BySequence expire. Size before: " << oldBySequenceSize << "; size after: " << getValidations().sizeOfBySequenceCache(); } { std::size_t const oldInboundLedgersSize = getInboundLedgers().cacheSize(); getInboundLedgers().sweep(); JLOG(m_journal.debug()) << "InboundLedgers sweep. Size before: " << oldInboundLedgersSize << "; size after: " << getInboundLedgers().cacheSize(); } { size_t const oldTasksSize = getLedgerReplayer().tasksSize(); size_t const oldDeltasSize = getLedgerReplayer().deltasSize(); size_t const oldSkipListsSize = getLedgerReplayer().skipListsSize(); getLedgerReplayer().sweep(); JLOG(m_journal.debug()) << "LedgerReplayer tasks sweep. Size before: " << oldTasksSize << "; size after: " << getLedgerReplayer().tasksSize(); JLOG(m_journal.debug()) << "LedgerReplayer deltas sweep. Size before: " << oldDeltasSize << "; size after: " << getLedgerReplayer().deltasSize(); JLOG(m_journal.debug()) << "LedgerReplayer skipLists sweep. Size before: " << oldSkipListsSize << "; size after: " << getLedgerReplayer().skipListsSize(); } { std::size_t const oldAcceptedLedgerSize = m_acceptedLedgerCache.size(); m_acceptedLedgerCache.sweep(); JLOG(m_journal.debug()) << "AcceptedLedgerCache sweep. Size before: " << oldAcceptedLedgerSize << "; size after: " << m_acceptedLedgerCache.size(); } { std::size_t const oldCachedSLEsSize = cachedSLEs_.size(); cachedSLEs_.sweep(); JLOG(m_journal.debug()) << "CachedSLEs sweep. Size before: " << oldCachedSLEsSize << "; size after: " << cachedSLEs_.size(); } mallocTrim("doSweep", m_journal); // Set timer to do another sweep later. setSweepTimer(); } LedgerIndex getMaxDisallowedLedger() override { return maxDisallowedLedger_; } std::optional const& getTrapTxID() const override { return trapTxID_; } size_t getNumberOfThreads() const override { return get_number_of_threads(); } private: // For a newly-started validator, this is the greatest persisted ledger // and new validations must be greater than this. std::atomic maxDisallowedLedger_{0}; void startGenesisLedger(); std::shared_ptr getLastFullLedger(); std::shared_ptr loadLedgerFromFile(std::string const& ledgerID); bool loadOldLedger( std::string const& ledgerID, bool replay, bool isFilename, std::optional trapTxID); void setMaxDisallowedLedger(); Application& getApp() override { return *this; } }; //------------------------------------------------------------------------------ // TODO Break this up into smaller, more digestible initialization segments. bool ApplicationImp::setup(boost::program_options::variables_map const& cmdline) { // We want to intercept CTRL-C and the standard termination signal SIGTERM // and terminate the process. This handler will NEVER be invoked twice. // // Note that async_wait is "one-shot": for each call, the handler will be // invoked exactly once, either when one of the registered signals in the // signal set occurs or the signal set is cancelled. Subsequent signals are // effectively ignored (technically, they are queued up, waiting for a call // to async_wait). m_signals.add(SIGINT); m_signals.add(SIGTERM); m_signals.async_wait([this](boost::system::error_code const& ec, int signum) { // Indicates the signal handler has been aborted; do nothing if (ec == boost::asio::error::operation_aborted) return; JLOG(m_journal.info()) << "Received signal " << signum; if (signum == SIGTERM || signum == SIGINT) signalStop("Signal: " + to_string(signum)); }); auto debug_log = config_->getDebugLogFile(); if (!debug_log.empty()) { // Let debug messages go to the file but only WARNING or higher to // regular output (unless verbose) if (!logs_->open(debug_log)) std::cerr << "Can't open log file " << debug_log << '\n'; using namespace beast::severities; if (logs_->threshold() > kDebug) logs_->threshold(kDebug); } JLOG(m_journal.info()) << "Process starting: " << BuildInfo::getFullVersionString() << ", Instance Cookie: " << instanceCookie_; if (numberOfThreads(*config_) < 2) { JLOG(m_journal.warn()) << "Limited to a single I/O service thread by " "system configuration."; } // Optionally turn off logging to console. logs_->silent(config_->silent()); if (!initRelationalDatabase() || !initNodeStore()) return false; if (!peerReservations_->load(getWalletDB())) { JLOG(m_journal.fatal()) << "Cannot find peer reservations!"; return false; } if (validatorKeys_.keys) setMaxDisallowedLedger(); // Configure the amendments the server supports { auto const supported = []() { auto const& amendments = detail::supportedAmendments(); std::vector supported; supported.reserve(amendments.size()); for (auto const& [a, vote] : amendments) { auto const f = xrpl::getRegisteredFeature(a); XRPL_ASSERT(f, "xrpl::ApplicationImp::setup : registered feature"); if (f) supported.emplace_back(a, *f, vote); } return supported; }(); Section const& downVoted = config_->section(SECTION_VETO_AMENDMENTS); Section const& upVoted = config_->section(SECTION_AMENDMENTS); m_amendmentTable = make_AmendmentTable( *this, config().AMENDMENT_MAJORITY_TIME, supported, upVoted, downVoted, logs_->journal("Amendments")); } Pathfinder::initPathTable(); auto const startUp = config_->START_UP; JLOG(m_journal.debug()) << "startUp: " << startUp; if (startUp == StartUpType::Fresh) { JLOG(m_journal.info()) << "Starting new Ledger"; startGenesisLedger(); } else if ( startUp == StartUpType::Load || startUp == StartUpType::LoadFile || startUp == StartUpType::Replay) { JLOG(m_journal.info()) << "Loading specified Ledger"; if (!loadOldLedger( config_->START_LEDGER, startUp == StartUpType::Replay, startUp == StartUpType::LoadFile, config_->TRAP_TX_HASH)) { JLOG(m_journal.error()) << "The specified ledger could not be loaded."; if (config_->FAST_LOAD) { // Fall back to syncing from the network, such as // when there's no existing data. startGenesisLedger(); } else { return false; } } } else if (startUp == StartUpType::Network) { // This should probably become the default once we have a stable // network. if (!config_->standalone()) m_networkOPs->setNeedNetworkLedger(); startGenesisLedger(); } else { startGenesisLedger(); } if (auto const& forcedRange = config().FORCED_LEDGER_RANGE_PRESENT) { m_ledgerMaster->setLedgerRangePresent(forcedRange->first, forcedRange->second); } m_orderBookDB->setup(getLedgerMaster().getCurrentLedger()); nodeIdentity_ = getNodeIdentity(*this, cmdline); // Now that the node identity is known, inject it into the telemetry // resource attributes — but only if the user didn't already set a // custom service_instance_id in [telemetry]. The Telemetry object // was constructed with an empty serviceInstanceId because // nodeIdentity_ is not available in the member initializer list. if (!config_->section("telemetry").exists("service_instance_id")) telemetry_->setServiceInstanceId(toBase58(TokenType::NodePublic, nodeIdentity_->first)); if (!cluster_->load(config().section(SECTION_CLUSTER_NODES))) { JLOG(m_journal.fatal()) << "Invalid entry in cluster configuration."; return false; } { if (validatorKeys_.configInvalid()) return false; if (!validatorManifests_->load( getWalletDB(), "ValidatorManifests", validatorKeys_.manifest, config().section(SECTION_VALIDATOR_KEY_REVOCATION).values())) { JLOG(m_journal.fatal()) << "Invalid configured validator manifest."; return false; } publisherManifests_->load(getWalletDB(), "PublisherManifests"); // It is possible to have a valid ValidatorKeys object without // setting the signingKey or masterKey. This occurs if the // configuration file does not have either // SECTION_VALIDATOR_TOKEN or SECTION_VALIDATION_SEED section. // masterKey for the configuration-file specified validator keys std::optional localSigningKey; if (validatorKeys_.keys) localSigningKey = validatorKeys_.keys->publicKey; // Setup trusted validators if (!validators_->load( localSigningKey, config().section(SECTION_VALIDATORS).values(), config().section(SECTION_VALIDATOR_LIST_KEYS).values(), config().VALIDATOR_LIST_THRESHOLD)) { JLOG(m_journal.fatal()) << "Invalid entry in validator configuration."; return false; } } if (!validatorSites_->load(config().section(SECTION_VALIDATOR_LIST_SITES).values())) { JLOG(m_journal.fatal()) << "Invalid entry in [" << SECTION_VALIDATOR_LIST_SITES << "]"; return false; } // Tell the AmendmentTable who the trusted validators are. m_amendmentTable->trustChanged(validators_->getQuorumKeys().second); //---------------------------------------------------------------------- // // Server // //---------------------------------------------------------------------- // VFALCO NOTE Unfortunately, in stand-alone mode some code still // foolishly calls overlay(). When this is fixed we can // move the instantiation inside a conditional: // // if (!config_.standalone()) overlay_ = make_Overlay( *this, setup_Overlay(*config_), *serverHandler_, *m_resourceManager, *m_resolver, get_io_context(), *config_, m_collectorManager->collector()); add(*overlay_); // add to PropertyStream // start first consensus round if (!m_networkOPs->beginConsensus(m_ledgerMaster->getClosedLedger()->header().hash, {})) { JLOG(m_journal.fatal()) << "Unable to start consensus"; return false; } { try { beast::logstream logStream{m_journal.error()}; auto setup = setup_ServerHandler(*config_, logStream); setup.makeContexts(); serverHandler_->setup(setup, m_journal); fixConfigPorts(*config_, serverHandler_->endpoints()); } catch (std::exception const& e) { if (auto stream = m_journal.fatal()) { stream << "Unable to setup server handler"; if (std::strlen(e.what()) > 0) stream << ": " << e.what(); } return false; } } // Begin connecting to network. if (!config_->standalone()) { // Should this message be here, conceptually? In theory this sort // of message, if displayed, should be displayed from PeerFinder. if (config_->PEER_PRIVATE && config_->IPS_FIXED.empty()) { JLOG(m_journal.warn()) << "No outbound peer connections will be made"; } // VFALCO NOTE the state timer resets the deadlock detector. // m_networkOPs->setStateTimer(); } else { JLOG(m_journal.warn()) << "Running in standalone mode"; m_networkOPs->setStandAlone(); } if (config_->canSign()) { JLOG(m_journal.warn()) << "*** The server is configured to allow the " "'sign' and 'sign_for'"; JLOG(m_journal.warn()) << "*** commands. These commands have security " "implications and have"; JLOG(m_journal.warn()) << "*** been deprecated. They will be removed " "in a future release of"; JLOG(m_journal.warn()) << "*** xrpld."; JLOG(m_journal.warn()) << "*** If you do not use them to sign " "transactions please edit your"; JLOG(m_journal.warn()) << "*** configuration file and remove the [enable_signing] stanza."; JLOG(m_journal.warn()) << "*** If you do use them to sign transactions " "please migrate to a"; JLOG(m_journal.warn()) << "*** standalone signing solution as soon as possible."; } // // Execute start up rpc commands. // for (auto const& cmd : config_->section(SECTION_RPC_STARTUP).lines()) { Json::Reader jrReader; Json::Value jvCommand; if (!jrReader.parse(cmd, jvCommand)) { JLOG(m_journal.fatal()) << "Couldn't parse entry in [" << SECTION_RPC_STARTUP << "]: '" << cmd; } if (!config_->quiet()) { JLOG(m_journal.fatal()) << "Startup RPC: " << jvCommand << std::endl; } Resource::Charge loadType = Resource::feeReferenceRPC; Resource::Consumer c; RPC::JsonContext context{ {.j = getJournal("RPCHandler"), .app = *this, .loadType = loadType, .netOps = getOPs(), .ledgerMaster = getLedgerMaster(), .consumer = c, .role = Role::ADMIN, .coro = {}, .infoSub = {}, .apiVersion = RPC::apiMaximumSupportedVersion}, jvCommand}; Json::Value jvResult; RPC::doCommand(context, jvResult); if (!config_->quiet()) { JLOG(m_journal.fatal()) << "Result: " << jvResult << std::endl; } } validatorSites_->start(); return true; } void ApplicationImp::start(bool withTimers) { JLOG(m_journal.info()) << "Application starting. Version is " << BuildInfo::getVersionString(); if (withTimers) { setSweepTimer(); setEntropyTimer(); } m_io_latency_sampler.start(); m_resolver->start(); m_loadManager->start(); m_shaMapStore->start(); if (overlay_) overlay_->start(); if (grpcServer_->start()) fixConfigPorts(*config_, {{SECTION_PORT_GRPC, grpcServer_->getEndpoint()}}); ledgerCleaner_->start(); perfLog_->start(); telemetry_->start(); } void ApplicationImp::run() { if (!config_->standalone()) { // VFALCO NOTE This seems unnecessary. If we properly refactor the load // manager then the stall detector can just always be // "armed" // getLoadManager().activateStallDetector(); } isTimeToStop.wait(false, std::memory_order_relaxed); JLOG(m_journal.debug()) << "Application stopping"; m_io_latency_sampler.cancel_async(); // VFALCO Enormous hack, we have to force the probe to cancel // before we stop the io_context queue or else it never // unblocks in its destructor. The fix is to make all // io_objects gracefully handle exit so that we can // naturally return from io_context::run() instead of // forcing a call to io_context::stop() m_io_latency_sampler.cancel(); m_resolver->stop_async(); // NIKB This is a hack - we need to wait for the resolver to // stop. before we stop the io_server_queue or weird // things will happen. m_resolver->stop(); { try { sweepTimer_.cancel(); } catch (boost::system::system_error const& e) { JLOG(m_journal.error()) << "Application: sweepTimer cancel error: " << e.what(); } try { entropyTimer_.cancel(); } catch (boost::system::system_error const& e) { JLOG(m_journal.error()) << "Application: entropyTimer cancel error: " << e.what(); } } // Make sure that any waitHandlers pending in our timers are done // before we declare ourselves stopped. using namespace std::chrono_literals; waitHandlerCounter_.join("Application", 1s, m_journal); mValidations.flush(); validatorSites_->stop(); // TODO Store manifests in manifests.sqlite instead of wallet.db validatorManifests_->save(getWalletDB(), "ValidatorManifests", [this](PublicKey const& pubKey) { return getValidators().listed(pubKey); }); publisherManifests_->save(getWalletDB(), "PublisherManifests", [this](PublicKey const& pubKey) { return getValidators().trustedPublisher(pubKey); }); // The order of these stop calls is delicate. // Re-ordering them risks undefined behavior. m_loadManager->stop(); m_shaMapStore->stop(); m_jobQueue->stop(); if (overlay_) overlay_->stop(); grpcServer_->stop(); m_networkOPs->stop(); serverHandler_->stop(); m_ledgerReplayer->stop(); m_inboundTransactions->stop(); m_inboundLedgers->stop(); ledgerCleaner_->stop(); m_nodeStore->stop(); perfLog_->stop(); telemetry_->stop(); JLOG(m_journal.info()) << "Done."; } void ApplicationImp::signalStop(std::string msg) { if (!isTimeToStop.test_and_set(std::memory_order_acquire)) { if (msg.empty()) { JLOG(m_journal.warn()) << "Server stopping"; } else JLOG(m_journal.warn()) << "Server stopping: " << msg; isTimeToStop.notify_all(); } } bool ApplicationImp::checkSigs() const { return checkSigs_; } void ApplicationImp::checkSigs(bool check) { checkSigs_ = check; } bool ApplicationImp::isStopping() const { return isTimeToStop.test(std::memory_order_relaxed); } int ApplicationImp::fdRequired() const { // Standard handles, config file, misc I/O etc: int needed = 128; // 2x the configured peer limit for peer connections: if (overlay_) needed += 2 * overlay_->limit(); // the number of fds needed by the backend (internally // doubled if online delete is enabled). needed += std::max(5, m_shaMapStore->fdRequired()); // One fd per incoming connection a port can accept, or // if no limit is set, assume it'll handle 256 clients. for (auto const& p : serverHandler_->setup().ports) needed += std::max(256, p.limit); // The minimum number of file descriptors we need is 1024: return std::max(1024, needed); } //------------------------------------------------------------------------------ void ApplicationImp::startGenesisLedger() { std::vector const initialAmendments = (config_->START_UP == StartUpType::Fresh) ? m_amendmentTable->getDesired() : std::vector{}; std::shared_ptr const genesis = std::make_shared( create_genesis, Rules{config_->features}, config_->FEES.toFees(), initialAmendments, nodeFamily_); m_ledgerMaster->storeLedger(genesis); auto const next = std::make_shared(*genesis, getTimeKeeper().closeTime()); next->updateSkipList(); XRPL_ASSERT( next->header().seq < XRP_LEDGER_EARLIEST_FEES || next->read(keylet::fees()), "xrpl::ApplicationImp::startGenesisLedger : valid ledger fees"); next->setImmutable(); openLedger_.emplace(next, cachedSLEs_, logs_->journal("OpenLedger")); m_ledgerMaster->storeLedger(next); m_ledgerMaster->switchLCL(next); } std::shared_ptr ApplicationImp::getLastFullLedger() { auto j = getJournal("Ledger"); try { auto const [ledger, seq, hash] = getLatestLedger(Rules{config_->features}, config_->FEES.toFees(), *this); if (!ledger) return ledger; XRPL_ASSERT( ledger->header().seq < XRP_LEDGER_EARLIEST_FEES || ledger->read(keylet::fees()), "xrpl::ApplicationImp::getLastFullLedger : valid ledger fees"); ledger->setImmutable(); if (getLedgerMaster().haveLedger(seq)) ledger->setValidated(); if (ledger->header().hash == hash) { JLOG(j.trace()) << "Loaded ledger: " << hash; return ledger; } if (auto stream = j.error()) { stream << "Failed on ledger"; Json::Value p; addJson(p, {*ledger, nullptr, LedgerFill::full}); stream << p; } return {}; } catch (SHAMapMissingNode const& mn) { JLOG(j.warn()) << "Ledger in database: " << mn.what(); return {}; } } std::shared_ptr ApplicationImp::loadLedgerFromFile(std::string const& name) { try { std::ifstream ledgerFile(name, std::ios::in); if (!ledgerFile) { JLOG(m_journal.fatal()) << "Unable to open file '" << name << "'"; return nullptr; } Json::Reader reader; Json::Value jLedger; if (!reader.parse(ledgerFile, jLedger)) { JLOG(m_journal.fatal()) << "Unable to parse ledger JSON"; return nullptr; } std::reference_wrapper ledger(jLedger); // accept a wrapped ledger if (ledger.get().isMember("result")) ledger = ledger.get()["result"]; if (ledger.get().isMember("ledger")) ledger = ledger.get()["ledger"]; std::uint32_t seq = 1; auto closeTime = getTimeKeeper().closeTime(); using namespace std::chrono_literals; auto closeTimeResolution = 30s; bool closeTimeEstimated = false; std::uint64_t totalDrops = 0; if (ledger.get().isMember("accountState")) { if (ledger.get().isMember(jss::ledger_index)) { seq = ledger.get()[jss::ledger_index].asUInt(); } if (ledger.get().isMember("close_time")) { using tp = NetClock::time_point; using d = tp::duration; closeTime = tp{d{ledger.get()["close_time"].asUInt()}}; } if (ledger.get().isMember("close_time_resolution")) { using namespace std::chrono; closeTimeResolution = seconds{ledger.get()["close_time_resolution"].asUInt()}; } if (ledger.get().isMember("close_time_estimated")) { closeTimeEstimated = ledger.get()["close_time_estimated"].asBool(); } if (ledger.get().isMember("total_coins")) { totalDrops = beast::lexicalCastThrow(ledger.get()["total_coins"].asString()); } ledger = ledger.get()["accountState"]; } if (!ledger.get().isArrayOrNull()) { JLOG(m_journal.fatal()) << "State nodes must be an array"; return nullptr; } auto loadLedger = std::make_shared( seq, closeTime, Rules{config_->features}, config_->FEES.toFees(), nodeFamily_); loadLedger->setTotalDrops(totalDrops); for (Json::UInt index = 0; index < ledger.get().size(); ++index) { Json::Value& entry = ledger.get()[index]; if (!entry.isObjectOrNull()) { JLOG(m_journal.fatal()) << "Invalid entry in ledger"; return nullptr; } uint256 uIndex; if (!uIndex.parseHex(entry[jss::index].asString())) { JLOG(m_journal.fatal()) << "Invalid entry in ledger"; return nullptr; } entry.removeMember(jss::index); STParsedJSONObject stp("sle", ledger.get()[index]); if (!stp.object || uIndex.isZero()) { JLOG(m_journal.fatal()) << "Invalid entry in ledger"; return nullptr; } // VFALCO TODO This is the only place that // constructor is used, try to remove it STLedgerEntry const sle(*stp.object, uIndex); if (!loadLedger->addSLE(sle)) { JLOG(m_journal.fatal()) << "Couldn't add serialized ledger: " << uIndex; return nullptr; } } loadLedger->stateMap().flushDirty(hotACCOUNT_NODE); XRPL_ASSERT( loadLedger->header().seq < XRP_LEDGER_EARLIEST_FEES || loadLedger->read(keylet::fees()), "xrpl::ApplicationImp::loadLedgerFromFile : valid ledger fees"); loadLedger->setAccepted(closeTime, closeTimeResolution, !closeTimeEstimated); return loadLedger; } catch (std::exception const& x) { JLOG(m_journal.fatal()) << "Ledger contains invalid data: " << x.what(); return nullptr; } } bool ApplicationImp::loadOldLedger( std::string const& ledgerID, bool replay, bool isFileName, std::optional trapTxID) { try { std::shared_ptr loadLedger, replayLedger; if (isFileName) { if (!ledgerID.empty()) loadLedger = loadLedgerFromFile(ledgerID); } else if (ledgerID.length() == 64) { uint256 hash; if (hash.parseHex(ledgerID)) { loadLedger = loadByHash(hash, Rules{config_->features}, config_->FEES.toFees(), *this); if (!loadLedger) { // Try to build the ledger from the back end auto il = std::make_shared( *this, hash, 0, InboundLedger::Reason::GENERIC, stopwatch(), make_DummyPeerSet(*this)); if (il->checkLocal()) loadLedger = il->getLedger(); } } } else if (ledgerID.empty() || boost::iequals(ledgerID, "latest")) { loadLedger = getLastFullLedger(); } else { // assume by sequence std::uint32_t index = 0; if (beast::lexicalCastChecked(index, ledgerID)) { loadLedger = loadByIndex(index, Rules{config_->features}, config_->FEES.toFees(), *this); } } if (!loadLedger) return false; if (replay) { // Replay a ledger close with same prior ledger and transactions // this ledger holds the transactions we want to replay replayLedger = loadLedger; JLOG(m_journal.info()) << "Loading parent ledger"; loadLedger = loadByHash( replayLedger->header().parentHash, Rules{config_->features}, config_->FEES.toFees(), *this); if (!loadLedger) { JLOG(m_journal.info()) << "Loading parent ledger from node store"; // Try to build the ledger from the back end auto il = std::make_shared( *this, replayLedger->header().parentHash, 0, InboundLedger::Reason::GENERIC, stopwatch(), make_DummyPeerSet(*this)); if (il->checkLocal()) loadLedger = il->getLedger(); if (!loadLedger) { // LCOV_EXCL_START JLOG(m_journal.fatal()) << "Replay ledger missing/damaged"; UNREACHABLE( "xrpl::ApplicationImp::loadOldLedger : replay ledger " "missing/damaged"); return false; // LCOV_EXCL_STOP } } } using namespace std::chrono_literals; using namespace date; static constexpr NetClock::time_point ledgerWarnTimePoint{ sys_days{January / 1 / 2018} - sys_days{January / 1 / 2000}}; if (loadLedger->header().closeTime < ledgerWarnTimePoint) { JLOG(m_journal.fatal()) << "\n\n*** WARNING ***\n" "You are replaying a ledger from before " << to_string(ledgerWarnTimePoint) << " UTC.\n" "This replay will not handle your ledger as it was " "originally " "handled.\nConsider running an earlier version of xrpld " "to " "get the older rules.\n*** CONTINUING ***\n"; } JLOG(m_journal.info()) << "Loading ledger " << loadLedger->header().hash << " seq:" << loadLedger->header().seq; if (loadLedger->header().accountHash.isZero()) { // LCOV_EXCL_START JLOG(m_journal.fatal()) << "Ledger is empty."; UNREACHABLE("xrpl::ApplicationImp::loadOldLedger : ledger is empty"); return false; // LCOV_EXCL_STOP } if (!loadLedger->walkLedger(getJournal("Ledger"), true)) { // LCOV_EXCL_START JLOG(m_journal.fatal()) << "Ledger is missing nodes."; UNREACHABLE( "xrpl::ApplicationImp::loadOldLedger : ledger is missing " "nodes"); return false; // LCOV_EXCL_STOP } if (!loadLedger->isSensible()) { // LCOV_EXCL_START Json::Value j = getJson({*loadLedger, {}}); j[jss::accountTreeHash] = to_string(loadLedger->header().accountHash); j[jss::transTreeHash] = to_string(loadLedger->header().txHash); JLOG(m_journal.fatal()) << "Ledger is not sensible: " << j; UNREACHABLE( "xrpl::ApplicationImp::loadOldLedger : ledger is not " "sensible"); return false; // LCOV_EXCL_STOP } m_ledgerMaster->setLedgerRangePresent(loadLedger->header().seq, loadLedger->header().seq); m_ledgerMaster->switchLCL(loadLedger); loadLedger->setValidated(); m_ledgerMaster->setFullLedger(loadLedger, true, false); openLedger_.emplace(loadLedger, cachedSLEs_, logs_->journal("OpenLedger")); if (replay) { // inject transaction(s) from the replayLedger into our open ledger // and build replay structure auto replayData = std::make_unique(loadLedger, replayLedger); for (auto const& [_, tx] : replayData->orderedTxns()) { (void)_; auto txID = tx->getTransactionID(); if (trapTxID == txID) { trapTxID_ = txID; JLOG(m_journal.debug()) << "Trap transaction set: " << txID; } auto s = std::make_shared(); tx->add(*s); forceValidity(getHashRouter(), txID, Validity::SigGoodOnly); // emplaced during initialization before any caller // NOLINTNEXTLINE(bugprone-unchecked-optional-access) openLedger_->modify([&txID, &s](OpenView& view, beast::Journal j) { view.rawTxInsert(txID, std::move(s), nullptr); return true; }); } m_ledgerMaster->takeReplay(std::move(replayData)); if (trapTxID && !trapTxID_) { JLOG(m_journal.fatal()) << "Ledger " << replayLedger->header().seq << " does not contain the transaction hash " << *trapTxID; return false; } } } catch (SHAMapMissingNode const& mn) { JLOG(m_journal.fatal()) << "While loading specified ledger: " << mn.what(); return false; } catch (boost::bad_lexical_cast&) { JLOG(m_journal.fatal()) << "Ledger specified '" << ledgerID << "' is not valid"; return false; } return true; } bool ApplicationImp::serverOkay(std::string& reason) { if (!config().ELB_SUPPORT) return true; if (isStopping()) { reason = "Server is shutting down"; return false; } if (getOPs().isNeedNetworkLedger()) { reason = "Not synchronized with network yet"; return false; } if (getOPs().isAmendmentBlocked()) { reason = "Server version too old"; return false; } if (getOPs().isUNLBlocked()) { reason = "No valid validator list available"; return false; } if (getOPs().getOperatingMode() < OperatingMode::SYNCING) { reason = "Not synchronized with network"; return false; } if (!getLedgerMaster().isCaughtUp(reason)) return false; if (getFeeTrack().isLoadedLocal()) { reason = "Too much load"; return false; } return true; } beast::Journal ApplicationImp::getJournal(std::string const& name) { return logs_->journal(name); } void ApplicationImp::setMaxDisallowedLedger() { auto seq = getRelationalDatabase().getMaxLedgerSeq(); if (seq) maxDisallowedLedger_ = *seq; JLOG(m_journal.trace()) << "Max persisted ledger is " << maxDisallowedLedger_; } //------------------------------------------------------------------------------ Application::Application() : beast::PropertyStream::Source("app") { } //------------------------------------------------------------------------------ std::unique_ptr make_Application( std::unique_ptr config, std::unique_ptr logs, std::unique_ptr timeKeeper) { return std::make_unique( std::move(config), std::move(logs), std::move(timeKeeper)); } void fixConfigPorts(Config& config, Endpoints const& endpoints) { for (auto const& [name, ep] : endpoints) { if (!config.exists(name)) continue; auto& section = config[name]; auto const optPort = section.get("port"); if (optPort) { std::uint16_t const port = beast::lexicalCast(*optPort); if (port == 0u) section.set("port", std::to_string(ep.port())); } } } } // namespace xrpl