From ec701270503f737841d4192fe1c815a86bbc0f18 Mon Sep 17 00:00:00 2001
From: Alex Kremer
Date: Tue, 15 Aug 2023 14:36:11 +0100
Subject: [PATCH] Add LOG macro to prevent unnecessary evaluations (#823)

Fixes #824
---
 src/data/BackendFactory.h | 4 +-
 src/data/BackendInterface.cpp | 52 ++++-----
 src/data/BackendInterface.h | 2 +-
 src/data/CassandraBackend.h | 120 ++++++++++----------
 src/data/cassandra/Schema.h | 4 +-
 src/data/cassandra/impl/Cluster.cpp | 32 +++---
 src/data/cassandra/impl/ExecutionStrategy.h | 18 +--
 src/data/cassandra/impl/RetryPolicy.h | 4 +-
 src/etl/ETLService.cpp | 44 +++----
 src/etl/ETLService.h | 6 +-
 src/etl/LoadBalancer.cpp | 37 +++---
 src/etl/ProbingSource.cpp | 6 +-
 src/etl/Source.cpp | 10 +-
 src/etl/Source.h | 50 ++++----
 src/etl/impl/AsyncData.h | 24 ++--
 src/etl/impl/CacheLoader.h | 67 +++++------
 src/etl/impl/ExtractionDataPipe.h | 2 +-
 src/etl/impl/Extractor.h | 6 +-
 src/etl/impl/ForwardCache.cpp | 2 +-
 src/etl/impl/LedgerFetcher.h | 8 +-
 src/etl/impl/LedgerLoader.h | 28 ++---
 src/etl/impl/LedgerPublisher.h | 18 +--
 src/etl/impl/Transformer.h | 75 ++++++------
 src/feed/SubscriptionManager.h | 2 +-
 src/main/Main.cpp | 8 +-
 src/rpc/RPCEngine.h | 10 +-
 src/rpc/RPCHelpers.cpp | 14 +--
 src/rpc/RPCHelpers.h | 6 +-
 src/rpc/WorkQueue.h | 7 +-
 src/rpc/common/impl/APIVersionParser.h | 10 +-
 src/rpc/handlers/AccountTx.cpp | 4 +-
 src/rpc/handlers/LedgerData.cpp | 8 +-
 src/rpc/handlers/NFTHistory.cpp | 4 +-
 src/util/config/Config.cpp | 3 +-
 src/util/log/Logger.cpp | 2 +-
 src/util/log/Logger.h | 21 +++-
 src/web/Context.h | 2 +-
 src/web/DOSGuard.h | 8 +-
 src/web/RPCServerHandler.h | 22 ++--
 src/web/Server.h | 6 +-
 src/web/impl/HttpBase.h | 8 +-
 src/web/impl/WsBase.h | 10 +-
 unittests/LoggerTests.cpp | 17 +++
 unittests/etl/CacheLoaderTests.cpp | 1 -
 44 files changed, 418 insertions(+), 374 deletions(-)

diff --git a/src/data/BackendFactory.h b/src/data/BackendFactory.h
index 4aa28bc7..6301ae77 100644
--- a/src/data/BackendFactory.h
+++ b/src/data/BackendFactory.h
@@ -39,7 +39,7 @@ std::shared_ptr
 make_Backend(boost::asio::io_context& ioc, util::Config const& config)
 {
     static util::Logger log{"Backend"};
-    log.info() << "Constructing BackendInterface";
+    LOG(log.info()) << "Constructing BackendInterface";
 
     auto const readOnly = config.valueOr("read_only", false);
 
@@ -63,7 +63,7 @@ make_Backend(boost::asio::io_context& ioc, util::Config const& config)
         backend->updateRange(rng->maxSequence);
     }
 
-    log.info() << "Constructed BackendInterface Successfully";
+    LOG(log.info()) << "Constructed BackendInterface Successfully";
     return backend;
 }
 } // namespace data

diff --git a/src/data/BackendInterface.cpp b/src/data/BackendInterface.cpp
index 0b59183c..9a594717 100644
--- a/src/data/BackendInterface.cpp
+++ b/src/data/BackendInterface.cpp
@@ -32,11 +32,11 @@ namespace data {
 bool
 BackendInterface::finishWrites(std::uint32_t const ledgerSequence)
 {
-    gLog.debug() << "Want finish writes for " << ledgerSequence;
+    LOG(gLog.debug()) << "Want finish writes for " << ledgerSequence;
     auto commitRes = doFinishWrites();
     if (commitRes)
     {
-        gLog.debug() << "Successfully commited. Updating range now to " << ledgerSequence;
+        LOG(gLog.debug()) << "Successfully committed. 
Updating range now to " << ledgerSequence; updateRange(ledgerSequence); } return commitRes; @@ -64,17 +64,17 @@ BackendInterface::fetchLedgerObject( auto obj = cache_.get(key, sequence); if (obj) { - gLog.trace() << "Cache hit - " << ripple::strHex(key); + LOG(gLog.trace()) << "Cache hit - " << ripple::strHex(key); return *obj; } else { - gLog.trace() << "Cache miss - " << ripple::strHex(key); + LOG(gLog.trace()) << "Cache miss - " << ripple::strHex(key); auto dbObj = doFetchLedgerObject(key, sequence, yield); if (!dbObj) - gLog.trace() << "Missed cache and missed in db"; + LOG(gLog.trace()) << "Missed cache and missed in db"; else - gLog.trace() << "Missed cache but found in db"; + LOG(gLog.trace()) << "Missed cache but found in db"; return dbObj; } } @@ -96,7 +96,7 @@ BackendInterface::fetchLedgerObjects( else misses.push_back(keys[i]); } - gLog.trace() << "Cache hits = " << keys.size() - misses.size() << " - cache misses = " << misses.size(); + LOG(gLog.trace()) << "Cache hits = " << keys.size() - misses.size() << " - cache misses = " << misses.size(); if (misses.size()) { @@ -122,9 +122,9 @@ BackendInterface::fetchSuccessorKey( { auto succ = cache_.getSuccessor(key, ledgerSequence); if (succ) - gLog.trace() << "Cache hit - " << ripple::strHex(key); + LOG(gLog.trace()) << "Cache hit - " << ripple::strHex(key); else - gLog.trace() << "Cache miss - " << ripple::strHex(key); + LOG(gLog.trace()) << "Cache miss - " << ripple::strHex(key); return succ ? succ->key : doFetchSuccessorKey(key, ledgerSequence, yield); } @@ -174,7 +174,7 @@ BackendInterface::fetchBookOffers( succMillis += getMillis(mid2 - mid1); if (!offerDir || offerDir->key >= bookEnd) { - gLog.trace() << "offerDir.has_value() " << offerDir.has_value() << " breaking"; + LOG(gLog.trace()) << "offerDir.has_value() " << offerDir.has_value() << " breaking"; break; } uTipIndex = offerDir->key; @@ -187,7 +187,7 @@ BackendInterface::fetchBookOffers( auto next = sle.getFieldU64(ripple::sfIndexNext); if (!next) { - gLog.trace() << "Next is empty. breaking"; + LOG(gLog.trace()) << "Next is empty. breaking"; break; } auto nextKey = ripple::keylet::page(uTipIndex, next); @@ -203,21 +203,21 @@ BackendInterface::fetchBookOffers( auto objs = fetchLedgerObjects(keys, ledgerSequence, yield); for (size_t i = 0; i < keys.size() && i < limit; ++i) { - gLog.trace() << "Key = " << ripple::strHex(keys[i]) << " blob = " << ripple::strHex(objs[i]) - << " ledgerSequence = " << ledgerSequence; + LOG(gLog.trace()) << "Key = " << ripple::strHex(keys[i]) << " blob = " << ripple::strHex(objs[i]) + << " ledgerSequence = " << ledgerSequence; assert(objs[i].size()); page.offers.push_back({keys[i], objs[i]}); } auto end = std::chrono::system_clock::now(); - gLog.debug() << "Fetching " << std::to_string(keys.size()) << " offers took " - << std::to_string(getMillis(mid - begin)) << " milliseconds. Fetching next dir took " - << std::to_string(succMillis) << " milliseonds. Fetched next dir " << std::to_string(numSucc) - << " times" - << " Fetching next page of dir took " << std::to_string(pageMillis) << " milliseconds" - << ". num pages = " << std::to_string(numPages) << ". Fetching all objects took " - << std::to_string(getMillis(end - mid)) - << " milliseconds. total time = " << std::to_string(getMillis(end - begin)) << " milliseconds" - << " book = " << ripple::strHex(book); + LOG(gLog.debug()) << "Fetching " << std::to_string(keys.size()) << " offers took " + << std::to_string(getMillis(mid - begin)) << " milliseconds. 
Fetching next dir took "
+                      << std::to_string(succMillis) << " milliseconds. Fetched next dir " << std::to_string(numSucc)
+                      << " times"
+                      << " Fetching next page of dir took " << std::to_string(pageMillis) << " milliseconds"
+                      << ". num pages = " << std::to_string(numPages) << ". Fetching all objects took "
+                      << std::to_string(getMillis(end - mid))
+                      << " milliseconds. total time = " << std::to_string(getMillis(end - begin)) << " milliseconds"
+                      << " book = " << ripple::strHex(book);
 
     return page;
 }
@@ -276,14 +276,14 @@ BackendInterface::fetchLedgerPage(
             page.objects.push_back({std::move(keys[i]), std::move(objects[i])});
         else if (!outOfOrder)
         {
-            gLog.error() << "Deleted or non-existent object in successor table. key = " << ripple::strHex(keys[i])
-                         << " - seq = " << ledgerSequence;
+            LOG(gLog.error()) << "Deleted or non-existent object in successor table. key = " << ripple::strHex(keys[i])
+                              << " - seq = " << ledgerSequence;
             std::stringstream msg;
             for (size_t j = 0; j < objects.size(); ++j)
             {
                 msg << " - " << ripple::strHex(keys[j]);
             }
-            gLog.error() << msg.str();
+            LOG(gLog.error()) << msg.str();
         }
     }
     if (keys.size() && !reachedEnd)
@@ -302,7 +302,7 @@ BackendInterface::fetchFees(std::uint32_t const seq, boost::asio::yield_context
 
     if (!bytes)
     {
-        gLog.error() << "Could not find fees";
+        LOG(gLog.error()) << "Could not find fees";
         return {};
     }
 
diff --git a/src/data/BackendInterface.h b/src/data/BackendInterface.h
index ac218a3a..1a25f6d5 100644
--- a/src/data/BackendInterface.h
+++ b/src/data/BackendInterface.h
@@ -70,7 +70,7 @@ retryOnTimeout(FnType func, size_t waitMs = 500)
         }
         catch (DatabaseTimeout const&)
         {
-            log.error() << "Database request timed out. Sleeping and retrying ... ";
+            LOG(log.error()) << "Database request timed out. Sleeping and retrying ... ";
             std::this_thread::sleep_for(std::chrono::milliseconds(waitMs));
         }
     }

diff --git a/src/data/CassandraBackend.h b/src/data/CassandraBackend.h
index 6e8ffa49..067e8a4c 100644
--- a/src/data/CassandraBackend.h
+++ b/src/data/CassandraBackend.h
@@ -93,11 +93,11 @@ public:
         }
         catch (std::runtime_error const& ex)
        {
-            log_.error() << "Failed to prepare the statements: " << ex.what() << "; readOnly: " << readOnly;
+            LOG(log_.error()) << "Failed to prepare the statements: " << ex.what() << "; readOnly: " << readOnly;
             throw;
         }
 
-        log_.info() << "Created (revamped) CassandraBackend";
+        LOG(log_.info()) << "Created (revamped) CassandraBackend";
     }
 
     TransactionsAndCursor
@@ -123,8 +123,8 @@ public:
         if (cursor)
         {
             statement.bindAt(1, cursor->asTuple());
-            log_.debug() << "account = " << ripple::strHex(account) << " tuple = " << cursor->ledgerSequence
-                         << cursor->transactionIndex;
+            LOG(log_.debug()) << "account = " << ripple::strHex(account) << " tuple = " << cursor->ledgerSequence
+                              << cursor->transactionIndex;
         }
         else
         {
-            auto const placeHolder = forward ? 
0u : std::numeric_limits::max(); statement.bindAt(1, std::make_tuple(placeHolder, placeHolder)); - log_.debug() << "account = " << ripple::strHex(account) << " idx = " << seq << " tuple = " << placeHolder; + LOG(log_.debug()) << "account = " << ripple::strHex(account) << " idx = " << seq + << " tuple = " << placeHolder; } // FIXME: Limit is a hack to support uint32_t properly for the time @@ -143,20 +144,20 @@ public: auto const& results = res.value(); if (not results.hasRows()) { - log_.debug() << "No rows returned"; + LOG(log_.debug()) << "No rows returned"; return {}; } std::vector hashes = {}; auto numRows = results.numRows(); - log_.info() << "num_rows = " << numRows; + LOG(log_.info()) << "num_rows = " << numRows; for (auto [hash, data] : extract>(results)) { hashes.push_back(hash); if (--numRows == 0) { - log_.debug() << "Setting cursor"; + LOG(log_.debug()) << "Setting cursor"; cursor = data; // forward queries by ledger/tx sequence `>=` @@ -167,11 +168,11 @@ public: } auto const txns = fetchTransactions(hashes, yield); - log_.debug() << "Txns = " << txns.size(); + LOG(log_.debug()) << "Txns = " << txns.size(); if (txns.size() == limit) { - log_.debug() << "Returning cursor"; + LOG(log_.debug()) << "Returning cursor"; return {txns, cursor}; } @@ -191,11 +192,11 @@ public: if (not executeSyncUpdate(schema_->updateLedgerRange.bind(ledgerSequence_, true, ledgerSequence_ - 1))) { - log_.warn() << "Update failed for ledger " << ledgerSequence_; + LOG(log_.warn()) << "Update failed for ledger " << ledgerSequence_; return false; } - log_.info() << "Committed ledger " << ledgerSequence_; + LOG(log_.info()) << "Committed ledger " << ledgerSequence_; return true; } @@ -219,15 +220,15 @@ public: if (auto const maybeValue = result.template get(); maybeValue) return maybeValue; - log_.error() << "Could not fetch latest ledger - no rows"; + LOG(log_.error()) << "Could not fetch latest ledger - no rows"; return std::nullopt; } - log_.error() << "Could not fetch latest ledger - no result"; + LOG(log_.error()) << "Could not fetch latest ledger - no result"; } else { - log_.error() << "Could not fetch latest ledger: " << res.error(); + LOG(log_.error()) << "Could not fetch latest ledger: " << res.error(); } return std::nullopt; @@ -246,15 +247,15 @@ public: return util::deserializeHeader(ripple::makeSlice(*maybeValue)); } - log_.error() << "Could not fetch ledger by sequence - no rows"; + LOG(log_.error()) << "Could not fetch ledger by sequence - no rows"; return std::nullopt; } - log_.error() << "Could not fetch ledger by sequence - no result"; + LOG(log_.error()) << "Could not fetch ledger by sequence - no result"; } else { - log_.error() << "Could not fetch ledger by sequence: " << res.error(); + LOG(log_.error()) << "Could not fetch ledger by sequence: " << res.error(); } return std::nullopt; @@ -270,15 +271,15 @@ public: if (auto const maybeValue = result.template get(); maybeValue) return fetchLedgerBySequence(*maybeValue, yield); - log_.error() << "Could not fetch ledger by hash - no rows"; + LOG(log_.error()) << "Could not fetch ledger by hash - no rows"; return std::nullopt; } - log_.error() << "Could not fetch ledger by hash - no result"; + LOG(log_.error()) << "Could not fetch ledger by hash - no result"; } else { - log_.error() << "Could not fetch ledger by hash: " << res.error(); + LOG(log_.error()) << "Could not fetch ledger by hash: " << res.error(); } return std::nullopt; @@ -292,7 +293,7 @@ public: auto const& results = res.value(); if (not results.hasRows()) { - log_.debug() << 
"Could not fetch ledger range - no rows"; + LOG(log_.debug()) << "Could not fetch ledger range - no rows"; return std::nullopt; } @@ -314,12 +315,13 @@ public: if (range.minSequence > range.maxSequence) std::swap(range.minSequence, range.maxSequence); - log_.debug() << "After hardFetchLedgerRange range is " << range.minSequence << ":" << range.maxSequence; + LOG(log_.debug()) << "After hardFetchLedgerRange range is " << range.minSequence << ":" + << range.maxSequence; return range; } else { - log_.error() << "Could not fetch ledger range: " << res.error(); + LOG(log_.error()) << "Could not fetch ledger range: " << res.error(); } return std::nullopt; @@ -341,15 +343,15 @@ public: if (not res) { - log_.error() << "Could not fetch all transaction hashes: " << res.error(); + LOG(log_.error()) << "Could not fetch all transaction hashes: " << res.error(); return {}; } auto const& result = res.value(); if (not result.hasRows()) { - log_.error() << "Could not fetch all transaction hashes - no rows; ledger = " - << std::to_string(ledgerSequence); + LOG(log_.error()) << "Could not fetch all transaction hashes - no rows; ledger = " + << std::to_string(ledgerSequence); return {}; } @@ -358,8 +360,9 @@ public: hashes.push_back(std::move(hash)); auto end = std::chrono::system_clock::now(); - log_.debug() << "Fetched " << hashes.size() << " transaction hashes from Cassandra in " - << std::chrono::duration_cast(end - start).count() << " milliseconds"; + LOG(log_.debug()) << "Fetched " << hashes.size() << " transaction hashes from Cassandra in " + << std::chrono::duration_cast(end - start).count() + << " milliseconds"; return hashes; } @@ -398,7 +401,7 @@ public: return result; } - log_.error() << "Could not fetch NFT - no rows"; + LOG(log_.error()) << "Could not fetch NFT - no rows"; return std::nullopt; } @@ -425,8 +428,8 @@ public: if (cursor) { statement.bindAt(1, cursor->asTuple()); - log_.debug() << "token_id = " << ripple::strHex(tokenID) << " tuple = " << cursor->ledgerSequence - << cursor->transactionIndex; + LOG(log_.debug()) << "token_id = " << ripple::strHex(tokenID) << " tuple = " << cursor->ledgerSequence + << cursor->transactionIndex; } else { @@ -434,7 +437,8 @@ public: auto const placeHolder = forward ? 
0 : std::numeric_limits::max(); statement.bindAt(1, std::make_tuple(placeHolder, placeHolder)); - log_.debug() << "token_id = " << ripple::strHex(tokenID) << " idx = " << seq << " tuple = " << placeHolder; + LOG(log_.debug()) << "token_id = " << ripple::strHex(tokenID) << " idx = " << seq + << " tuple = " << placeHolder; } statement.bindAt(2, Limit{limit}); @@ -443,20 +447,20 @@ public: auto const& results = res.value(); if (not results.hasRows()) { - log_.debug() << "No rows returned"; + LOG(log_.debug()) << "No rows returned"; return {}; } std::vector hashes = {}; auto numRows = results.numRows(); - log_.info() << "num_rows = " << numRows; + LOG(log_.info()) << "num_rows = " << numRows; for (auto [hash, data] : extract>(results)) { hashes.push_back(hash); if (--numRows == 0) { - log_.debug() << "Setting cursor"; + LOG(log_.debug()) << "Setting cursor"; cursor = data; // forward queries by ledger/tx sequence `>=` @@ -467,11 +471,11 @@ public: } auto const txns = fetchTransactions(hashes, yield); - log_.debug() << "NFT Txns = " << txns.size(); + LOG(log_.debug()) << "NFT Txns = " << txns.size(); if (txns.size() == limit) { - log_.debug() << "Returning cursor"; + LOG(log_.debug()) << "Returning cursor"; return {txns, cursor}; } @@ -482,7 +486,7 @@ public: doFetchLedgerObject(ripple::uint256 const& key, std::uint32_t const sequence, boost::asio::yield_context yield) const override { - log_.debug() << "Fetching ledger object for seq " << sequence << ", key = " << ripple::to_string(key); + LOG(log_.debug()) << "Fetching ledger object for seq " << sequence << ", key = " << ripple::to_string(key); if (auto const res = executor_.read(yield, schema_->selectObject, key, sequence); res) { if (auto const result = res->template get(); result) @@ -492,12 +496,12 @@ public: } else { - log_.debug() << "Could not fetch ledger object - no rows"; + LOG(log_.debug()) << "Could not fetch ledger object - no rows"; } } else { - log_.error() << "Could not fetch ledger object: " << res.error(); + LOG(log_.error()) << "Could not fetch ledger object: " << res.error(); } return std::nullopt; @@ -515,12 +519,12 @@ public: } else { - log_.debug() << "Could not fetch transaction - no rows"; + LOG(log_.debug()) << "Could not fetch transaction - no rows"; } } else { - log_.error() << "Could not fetch transaction: " << res.error(); + LOG(log_.error()) << "Could not fetch transaction: " << res.error(); } return std::nullopt; @@ -540,12 +544,12 @@ public: } else { - log_.debug() << "Could not fetch successor - no rows"; + LOG(log_.debug()) << "Could not fetch successor - no rows"; } } else { - log_.error() << "Could not fetch successor: " << res.error(); + LOG(log_.error()) << "Could not fetch successor: " << res.error(); } return std::nullopt; @@ -585,7 +589,8 @@ public: }); assert(numHashes == results.size()); - log_.debug() << "Fetched " << numHashes << " transactions from Cassandra in " << timeDiff << " milliseconds"; + LOG(log_.debug()) << "Fetched " << numHashes << " transactions from Cassandra in " << timeDiff + << " milliseconds"; return results; } @@ -599,7 +604,7 @@ public: return {}; auto const numKeys = keys.size(); - log_.trace() << "Fetching " << numKeys << " objects"; + LOG(log_.trace()) << "Fetching " << numKeys << " objects"; std::vector results; results.reserve(numKeys); @@ -622,7 +627,7 @@ public: return {}; }); - log_.trace() << "Fetched " << numKeys << " objects"; + LOG(log_.trace()) << "Fetched " << numKeys << " objects"; return results; } @@ -633,14 +638,14 @@ public: auto const res = 
executor_.read(yield, schema_->selectDiff, ledgerSequence); if (not res) { - log_.error() << "Could not fetch ledger diff: " << res.error() << "; ledger = " << ledgerSequence; + LOG(log_.error()) << "Could not fetch ledger diff: " << res.error() << "; ledger = " << ledgerSequence; return {}; } auto const& results = res.value(); if (not results) { - log_.error() << "Could not fetch ledger diff - no rows; ledger = " << ledgerSequence; + LOG(log_.error()) << "Could not fetch ledger diff - no rows; ledger = " << ledgerSequence; return {}; } @@ -655,7 +660,8 @@ public: if (keys.empty()) return {}; - log_.debug() << "Fetched " << keys.size() << " diff hashes from Cassandra in " << timeDiff << " milliseconds"; + LOG(log_.debug()) << "Fetched " << keys.size() << " diff hashes from Cassandra in " << timeDiff + << " milliseconds"; auto const objs = fetchLedgerObjects(keys, ledgerSequence, yield); std::vector results; @@ -676,7 +682,7 @@ public: void doWriteLedgerObject(std::string&& key, std::uint32_t const seq, std::string&& blob) override { - log_.trace() << " Writing ledger object " << key.size() << ":" << seq << " [" << blob.size() << " bytes]"; + LOG(log_.trace()) << " Writing ledger object " << key.size() << ":" << seq << " [" << blob.size() << " bytes]"; if (range) executor_.write(schema_->insertDiff, seq, key); @@ -687,8 +693,8 @@ public: void writeSuccessor(std::string&& key, std::uint32_t const seq, std::string&& successor) override { - log_.trace() << "Writing successor. key = " << key.size() << " bytes. " - << " seq = " << std::to_string(seq) << " successor = " << successor.size() << " bytes."; + LOG(log_.trace()) << "Writing successor. key = " << key.size() << " bytes. " + << " seq = " << std::to_string(seq) << " successor = " << successor.size() << " bytes."; assert(key.size() != 0); assert(successor.size() != 0); @@ -740,7 +746,7 @@ public: std::string&& transaction, std::string&& metadata) override { - log_.trace() << "Writing txn to cassandra"; + LOG(log_.trace()) << "Writing txn to cassandra"; executor_.write(schema_->insertLedgerTransaction, seq, hash); executor_.write( @@ -798,13 +804,13 @@ private: auto maybeSuccess = res->template get(); if (not maybeSuccess) { - log_.error() << "executeSyncUpdate - error getting result - no row"; + LOG(log_.error()) << "executeSyncUpdate - error getting result - no row"; return false; } if (not maybeSuccess.value()) { - log_.warn() << "Update failed. Checking if DB state is what we expect"; + LOG(log_.warn()) << "Update failed. Checking if DB state is what we expect"; // error may indicate that another writer wrote something. // in this case let's just compare the current state of things diff --git a/src/data/cassandra/Schema.h b/src/data/cassandra/Schema.h index 9c2f7e06..7cd8ff95 100644 --- a/src/data/cassandra/Schema.h +++ b/src/data/cassandra/Schema.h @@ -641,9 +641,9 @@ public: void prepareStatements(Handle const& handle) { - log_.info() << "Preparing cassandra statements"; + LOG(log_.info()) << "Preparing cassandra statements"; statements_ = std::make_unique(settingsProvider_, handle); - log_.info() << "Finished preparing statements"; + LOG(log_.info()) << "Finished preparing statements"; } /** diff --git a/src/data/cassandra/impl/Cluster.cpp b/src/data/cassandra/impl/Cluster.cpp index 70400904..a3dff2a2 100644 --- a/src/data/cassandra/impl/Cluster.cpp +++ b/src/data/cassandra/impl/Cluster.cpp @@ -143,18 +143,18 @@ Cluster::Cluster(Settings const& settings) : ManagedObject{cass_cluster_new(), c return maybeValue ? 
to_string(*maybeValue) : "default"; }; - log_.info() << "Threads: " << settings.threads; - log_.info() << "Max concurrent requests per host: " << settings.maxConcurrentRequestsThreshold; - log_.info() << "Max connections per host: " << settings.maxConnectionsPerHost; - log_.info() << "Core connections per host: " << settings.coreConnectionsPerHost; - log_.info() << "IO queue size: " << queueSize; - log_.info() << "Event queue size: " << valueOrDefault(settings.queueSizeEvent); - log_.info() << "Write bytes high watermark: " << valueOrDefault(settings.writeBytesHighWatermark); - log_.info() << "Write bytes low watermark: " << valueOrDefault(settings.writeBytesLowWatermark); - log_.info() << "Pending requests high watermark: " << valueOrDefault(settings.pendingRequestsHighWatermark); - log_.info() << "Pending requests low watermark: " << valueOrDefault(settings.pendingRequestsLowWatermark); - log_.info() << "Max requests per flush: " << valueOrDefault(settings.maxRequestsPerFlush); - log_.info() << "Max concurrent creation: " << valueOrDefault(settings.maxConcurrentCreation); + LOG(log_.info()) << "Threads: " << settings.threads; + LOG(log_.info()) << "Max concurrent requests per host: " << settings.maxConcurrentRequestsThreshold; + LOG(log_.info()) << "Max connections per host: " << settings.maxConnectionsPerHost; + LOG(log_.info()) << "Core connections per host: " << settings.coreConnectionsPerHost; + LOG(log_.info()) << "IO queue size: " << queueSize; + LOG(log_.info()) << "Event queue size: " << valueOrDefault(settings.queueSizeEvent); + LOG(log_.info()) << "Write bytes high watermark: " << valueOrDefault(settings.writeBytesHighWatermark); + LOG(log_.info()) << "Write bytes low watermark: " << valueOrDefault(settings.writeBytesLowWatermark); + LOG(log_.info()) << "Pending requests high watermark: " << valueOrDefault(settings.pendingRequestsHighWatermark); + LOG(log_.info()) << "Pending requests low watermark: " << valueOrDefault(settings.pendingRequestsLowWatermark); + LOG(log_.info()) << "Max requests per flush: " << valueOrDefault(settings.maxRequestsPerFlush); + LOG(log_.info()) << "Max concurrent creation: " << valueOrDefault(settings.maxConcurrentCreation); } void @@ -178,7 +178,7 @@ Cluster::setupContactPoints(Settings::ContactPoints const& points) }; { - log_.debug() << "Attempt connection using contact points: " << points.contactPoints; + LOG(log_.debug()) << "Attempt connection using contact points: " << points.contactPoints; auto const rc = cass_cluster_set_contact_points(*this, points.contactPoints.data()); throwErrorIfNeeded(rc, "contact_points", points.contactPoints); } @@ -193,7 +193,7 @@ Cluster::setupContactPoints(Settings::ContactPoints const& points) void Cluster::setupSecureBundle(Settings::SecureConnectionBundle const& bundle) { - log_.debug() << "Attempt connection using secure bundle"; + LOG(log_.debug()) << "Attempt connection using secure bundle"; if (auto const rc = cass_cluster_set_cloud_secure_connection_bundle(*this, bundle.bundle.data()); rc != CASS_OK) { throw std::runtime_error("Failed to connect using secure connection bundle " + bundle.bundle); @@ -206,7 +206,7 @@ Cluster::setupCertificate(Settings const& settings) if (not settings.certificate) return; - log_.debug() << "Configure SSL context"; + LOG(log_.debug()) << "Configure SSL context"; SslContext context = SslContext(*settings.certificate); cass_cluster_set_ssl(*this, context); } @@ -217,7 +217,7 @@ Cluster::setupCredentials(Settings const& settings) if (not settings.username || not 
settings.password) return; - log_.debug() << "Set credentials; username: " << settings.username.value(); + LOG(log_.debug()) << "Set credentials; username: " << settings.username.value(); cass_cluster_set_credentials(*this, settings.username.value().c_str(), settings.password.value().c_str()); } diff --git a/src/data/cassandra/impl/ExecutionStrategy.h b/src/data/cassandra/impl/ExecutionStrategy.h index 8a52efef..ce3fbffc 100644 --- a/src/data/cassandra/impl/ExecutionStrategy.h +++ b/src/data/cassandra/impl/ExecutionStrategy.h @@ -89,8 +89,8 @@ public: , handle_{std::cref(handle)} , thread_{[this]() { ioc_.run(); }} { - log_.info() << "Max write requests outstanding is " << maxWriteRequestsOutstanding_ - << "; Max read requests outstanding is " << maxReadRequestsOutstanding_; + LOG(log_.info()) << "Max write requests outstanding is " << maxWriteRequestsOutstanding_ + << "; Max read requests outstanding is " << maxReadRequestsOutstanding_; } ~DefaultExecutionStrategy() @@ -106,10 +106,10 @@ public: void sync() { - log_.debug() << "Waiting to sync all writes..."; + LOG(log_.debug()) << "Waiting to sync all writes..."; std::unique_lock lck(syncMutex_); syncCv_.wait(lck, [this]() { return finishedAllWriteRequests(); }); - log_.debug() << "Sync done."; + LOG(log_.debug()) << "Sync done."; } /** @@ -137,7 +137,7 @@ public: } else { - log_.warn() << "Cassandra sync write error, retrying: " << res.error(); + LOG(log_.warn()) << "Cassandra sync write error, retrying: " << res.error(); std::this_thread::sleep_for(std::chrono::milliseconds(5)); } } @@ -262,7 +262,7 @@ public: } else { - log_.error() << "Failed batch read in coroutine: " << res.error(); + LOG(log_.error()) << "Failed batch read in coroutine: " << res.error(); throwErrorIfNeeded(res.error()); } } @@ -311,7 +311,7 @@ public: } else { - log_.error() << "Failed read in coroutine: " << res.error(); + LOG(log_.error()) << "Failed read in coroutine: " << res.error(); throwErrorIfNeeded(res.error()); } } @@ -400,8 +400,8 @@ private: std::unique_lock lck(throttleMutex_); if (!canAddWriteRequest()) { - log_.trace() << "Max outstanding requests reached. " - << "Waiting for other requests to finish"; + LOG(log_.trace()) << "Max outstanding requests reached. 
" + << "Waiting for other requests to finish"; throttleCv_.wait(lck, [this]() { return canAddWriteRequest(); }); } } diff --git a/src/data/cassandra/impl/RetryPolicy.h b/src/data/cassandra/impl/RetryPolicy.h index ee85dfad..5db4a5d8 100644 --- a/src/data/cassandra/impl/RetryPolicy.h +++ b/src/data/cassandra/impl/RetryPolicy.h @@ -59,8 +59,8 @@ public: shouldRetry([[maybe_unused]] CassandraError err) { auto const delay = calculateDelay(attempt_); - log_.error() << "Cassandra write error: " << err << ", current retries " << attempt_ << ", retrying in " - << delay.count() << " milliseconds"; + LOG(log_.error()) << "Cassandra write error: " << err << ", current retries " << attempt_ << ", retrying in " + << delay.count() << " milliseconds"; return true; // keep retrying forever } diff --git a/src/etl/ETLService.cpp b/src/etl/ETLService.cpp index c3f1189d..94008ffe 100644 --- a/src/etl/ETLService.cpp +++ b/src/etl/ETLService.cpp @@ -29,7 +29,7 @@ ETLService::runETLPipeline(uint32_t startSequence, uint32_t numExtractors) if (finishSequence_ && startSequence > *finishSequence_) return {}; - log_.debug() << "Starting etl pipeline"; + LOG(log_.debug()) << "Starting etl pipeline"; state_.isWriting = true; auto rng = backend_->hardFetchLedgerRangeNoThrow(); @@ -57,12 +57,12 @@ ETLService::runETLPipeline(uint32_t startSequence, uint32_t numExtractors) auto const end = std::chrono::system_clock::now(); auto const lastPublishedSeq = ledgerPublisher_.getLastPublishedSequence(); - log_.debug() << "Extracted and wrote " << lastPublishedSeq.value_or(startSequence) - startSequence << " in " - << ((end - begin).count()) / 1000000000.0; + LOG(log_.debug()) << "Extracted and wrote " << lastPublishedSeq.value_or(startSequence) - startSequence << " in " + << ((end - begin).count()) / 1000000000.0; state_.isWriting = false; - log_.debug() << "Stopping etl pipeline"; + LOG(log_.debug()) << "Stopping etl pipeline"; return lastPublishedSeq; } @@ -80,30 +80,30 @@ ETLService::monitor() auto rng = backend_->hardFetchLedgerRangeNoThrow(); if (!rng) { - log_.info() << "Database is empty. Will download a ledger from the network."; + LOG(log_.info()) << "Database is empty. Will download a ledger from the network."; std::optional ledger; try { if (startSequence_) { - log_.info() << "ledger sequence specified in config. " - << "Will begin ETL process starting with ledger " << *startSequence_; + LOG(log_.info()) << "ledger sequence specified in config. " + << "Will begin ETL process starting with ledger " << *startSequence_; ledger = ledgerLoader_.loadInitialLedger(*startSequence_); } else { - log_.info() << "Waiting for next ledger to be validated by network..."; + LOG(log_.info()) << "Waiting for next ledger to be validated by network..."; std::optional mostRecentValidated = networkValidatedLedgers_->getMostRecent(); if (mostRecentValidated) { - log_.info() << "Ledger " << *mostRecentValidated << " has been validated. Downloading..."; + LOG(log_.info()) << "Ledger " << *mostRecentValidated << " has been validated. Downloading..."; ledger = ledgerLoader_.loadInitialLedger(*mostRecentValidated); } else { - log_.info() << "The wait for the next validated ledger has been aborted. Exiting monitor loop"; + LOG(log_.info()) << "The wait for the next validated ledger has been aborted. Exiting monitor loop"; return; } } @@ -124,24 +124,24 @@ ETLService::monitor() } else { - log_.error() << "Failed to load initial ledger. Exiting monitor loop"; + LOG(log_.error()) << "Failed to load initial ledger. 
Exiting monitor loop"; return; } } else { if (startSequence_) - log_.warn() << "start sequence specified but db is already populated"; + LOG(log_.warn()) << "start sequence specified but db is already populated"; - log_.info() << "Database already populated. Picking up from the tip of history"; + LOG(log_.info()) << "Database already populated. Picking up from the tip of history"; cacheLoader_.load(rng->maxSequence); } assert(rng); uint32_t nextSequence = rng->maxSequence + 1; - log_.debug() << "Database is populated. " - << "Starting monitor loop. sequence = " << nextSequence; + LOG(log_.debug()) << "Database is populated. " + << "Starting monitor loop. sequence = " << nextSequence; while (true) { @@ -152,8 +152,8 @@ ETLService::monitor() } else if (networkValidatedLedgers_->waitUntilValidatedByNetwork(nextSequence, 1000)) { - log_.info() << "Ledger with sequence = " << nextSequence << " has been validated by the network. " - << "Attempting to find in database and publish"; + LOG(log_.info()) << "Ledger with sequence = " << nextSequence << " has been validated by the network. " + << "Attempting to find in database and publish"; // Attempt to take over responsibility of ETL writer after 10 failed // attempts to publish the ledger. publishLedger() fails if the @@ -166,11 +166,11 @@ ETLService::monitor() if (!success) { - log_.warn() << "Failed to publish ledger with sequence = " << nextSequence << " . Beginning ETL"; + LOG(log_.warn()) << "Failed to publish ledger with sequence = " << nextSequence << " . Beginning ETL"; // returns the most recent sequence published empty optional if no sequence was published std::optional lastPublished = runETLPipeline(nextSequence, extractorThreads_); - log_.info() << "Aborting ETL. Falling back to publishing"; + LOG(log_.info()) << "Aborting ETL. 
Falling back to publishing"; // if no ledger was published, don't increment nextSequence if (lastPublished) @@ -187,7 +187,7 @@ ETLService::monitor() void ETLService::monitorReadOnly() { - log_.debug() << "Starting reporting in strict read only mode"; + LOG(log_.debug()) << "Starting reporting in strict read only mode"; auto rng = backend_->hardFetchLedgerRangeNoThrow(); uint32_t latestSequence; @@ -226,7 +226,7 @@ ETLService::monitorReadOnly() void ETLService::run() { - log_.info() << "Starting reporting etl"; + LOG(log_.info()) << "Starting reporting etl"; state_.isStopping = false; doWork(); @@ -266,4 +266,4 @@ ETLService::ETLService( extractorThreads_ = config.valueOr("extractor_threads", extractorThreads_); txnThreshold_ = config.valueOr("txn_threshold", txnThreshold_); } -} // namespace etl \ No newline at end of file +} // namespace etl diff --git a/src/etl/ETLService.h b/src/etl/ETLService.h index 3cf73291..40ded4f3 100644 --- a/src/etl/ETLService.h +++ b/src/etl/ETLService.h @@ -150,8 +150,8 @@ public: */ ~ETLService() { - log_.info() << "onStop called"; - log_.debug() << "Stopping Reporting ETL"; + LOG(log_.info()) << "onStop called"; + LOG(log_.debug()) << "Stopping Reporting ETL"; state_.isStopping = true; cacheLoader_.stop(); @@ -159,7 +159,7 @@ public: if (worker_.joinable()) worker_.join(); - log_.debug() << "Joined ETLService worker thread"; + LOG(log_.debug()) << "Joined ETLService worker thread"; } /** diff --git a/src/etl/LoadBalancer.cpp b/src/etl/LoadBalancer.cpp index 70c1d26f..691d64f6 100644 --- a/src/etl/LoadBalancer.cpp +++ b/src/etl/LoadBalancer.cpp @@ -83,7 +83,7 @@ LoadBalancer::LoadBalancer( std::unique_ptr source = make_Source(entry, ioc, backend, subscriptions, validatedLedgers, *this); sources_.push_back(std::move(source)); - log_.info() << "Added etl source - " << sources_.back()->toString(); + LOG(log_.info()) << "Added etl source - " << sources_.back()->toString(); } } @@ -101,8 +101,8 @@ LoadBalancer::loadInitialLedger(uint32_t sequence, bool cacheOnly) auto [data, res] = source->loadInitialLedger(sequence, downloadRanges_, cacheOnly); if (!res) - log_.error() << "Failed to download initial ledger." - << " Sequence = " << sequence << " source = " << source->toString(); + LOG(log_.error()) << "Failed to download initial ledger." + << " Sequence = " << sequence << " source = " << source->toString(); else response = std::move(data); @@ -122,15 +122,15 @@ LoadBalancer::fetchLedger(uint32_t ledgerSequence, bool getObjects, bool getObje response = std::move(data); if (status.ok() && response.validated()) { - log.info() << "Successfully fetched ledger = " << ledgerSequence - << " from source = " << source->toString(); + LOG(log.info()) << "Successfully fetched ledger = " << ledgerSequence + << " from source = " << source->toString(); return true; } else { - log.warn() << "Could not fetch ledger " << ledgerSequence << ", Reply: " << response.DebugString() - << ", error_code: " << status.error_code() << ", error_msg: " << status.error_message() - << ", source = " << source->toString(); + LOG(log.warn()) << "Could not fetch ledger " << ledgerSequence << ", Reply: " << response.DebugString() + << ", error_code: " << status.error_code() << ", error_msg: " << status.error_message() + << ", source = " << source->toString(); return false; } }, @@ -201,8 +201,8 @@ LoadBalancer::execute(Func f, uint32_t ledgerSequence) { auto& source = sources_[sourceIdx]; - log_.debug() << "Attempting to execute func. 
ledger sequence = " << ledgerSequence - << " - source = " << source->toString(); + LOG(log_.debug()) << "Attempting to execute func. ledger sequence = " << ledgerSequence + << " - source = " << source->toString(); // Originally, it was (source->hasLedger(ledgerSequence) || true) /* Sometimes rippled has ledger but doesn't actually know. However, but this does NOT happen in the normal case and is safe to remove @@ -212,27 +212,28 @@ LoadBalancer::execute(Func f, uint32_t ledgerSequence) bool res = f(source); if (res) { - log_.debug() << "Successfully executed func at source = " << source->toString() - << " - ledger sequence = " << ledgerSequence; + LOG(log_.debug()) << "Successfully executed func at source = " << source->toString() + << " - ledger sequence = " << ledgerSequence; break; } else { - log_.warn() << "Failed to execute func at source = " << source->toString() - << " - ledger sequence = " << ledgerSequence; + LOG(log_.warn()) << "Failed to execute func at source = " << source->toString() + << " - ledger sequence = " << ledgerSequence; } } else { - log_.warn() << "Ledger not present at source = " << source->toString() - << " - ledger sequence = " << ledgerSequence; + LOG(log_.warn()) << "Ledger not present at source = " << source->toString() + << " - ledger sequence = " << ledgerSequence; } sourceIdx = (sourceIdx + 1) % sources_.size(); numAttempts++; if (numAttempts % sources_.size() == 0) { - log_.info() << "Ledger sequence " << ledgerSequence << " is not yet available from any configured sources. " - << "Sleeping and trying again"; + LOG(log_.info()) << "Ledger sequence " << ledgerSequence + << " is not yet available from any configured sources. " + << "Sleeping and trying again"; std::this_thread::sleep_for(std::chrono::seconds(2)); } } diff --git a/src/etl/ProbingSource.cpp b/src/etl/ProbingSource.cpp index ccb878c2..923f241b 100644 --- a/src/etl/ProbingSource.cpp +++ b/src/etl/ProbingSource.cpp @@ -155,7 +155,7 @@ ProbingSource::make_SSLHooks() noexcept { plainSrc_->pause(); currentSrc_ = sslSrc_; - log_.info() << "Selected WSS as the main source: " << currentSrc_->toString(); + LOG(log_.info()) << "Selected WSS as the main source: " << currentSrc_->toString(); } return SourceHooks::Action::PROCEED; }, @@ -184,7 +184,7 @@ ProbingSource::make_PlainHooks() noexcept { sslSrc_->pause(); currentSrc_ = plainSrc_; - log_.info() << "Selected Plain WS as the main source: " << currentSrc_->toString(); + LOG(log_.info()) << "Selected Plain WS as the main source: " << currentSrc_->toString(); } return SourceHooks::Action::PROCEED; }, @@ -199,4 +199,4 @@ ProbingSource::make_PlainHooks() noexcept return SourceHooks::Action::STOP; }}; }; -} // namespace etl \ No newline at end of file +} // namespace etl diff --git a/src/etl/Source.cpp b/src/etl/Source.cpp index 77fab7f1..08531dfd 100644 --- a/src/etl/Source.cpp +++ b/src/etl/Source.cpp @@ -59,8 +59,8 @@ PlainSource::close(bool startAgain) derived().ws().async_close(boost::beast::websocket::close_code::normal, [this, startAgain](auto ec) { if (ec) { - log_.error() << " async_close : " - << "error code = " << ec << " - " << toString(); + LOG(log_.error()) << " async_close : " + << "error code = " << ec << " - " << toString(); } closing_ = false; if (startAgain) @@ -94,8 +94,8 @@ SslSource::close(bool startAgain) derived().ws().async_close(boost::beast::websocket::close_code::normal, [this, startAgain](auto ec) { if (ec) { - log_.error() << " async_close : " - << "error code = " << ec << " - " << toString(); + LOG(log_.error()) << " 
async_close : " + << "error code = " << ec << " - " << toString(); } closing_ = false; if (startAgain) @@ -193,4 +193,4 @@ SslSource::onSslHandshake( ws().async_handshake(host, "/", [this](auto ec) { onHandshake(ec); }); } } -} // namespace etl \ No newline at end of file +} // namespace etl diff --git a/src/etl/Source.h b/src/etl/Source.h index 70d607ac..823248cf 100644 --- a/src/etl/Source.h +++ b/src/etl/Source.h @@ -273,11 +273,11 @@ public: chArgs.SetMaxReceiveMessageSize(-1); stub_ = org::xrpl::rpc::v1::XRPLedgerAPIService::NewStub( grpc::CreateCustomChannel(ss.str(), grpc::InsecureChannelCredentials(), chArgs)); - log_.debug() << "Made stub for remote = " << toString(); + LOG(log_.debug()) << "Made stub for remote = " << toString(); } catch (std::exception const& e) { - log_.debug() << "Exception while creating stub = " << e.what() << " . Remote = " << toString(); + LOG(log_.debug()) << "Exception while creating stub = " << e.what() << " . Remote = " << toString(); } } } @@ -305,13 +305,13 @@ public: std::string const& clientIp, boost::asio::yield_context yield) const override { - log_.trace() << "Attempting to forward request to tx. " - << "request = " << boost::json::serialize(request); + LOG(log_.trace()) << "Attempting to forward request to tx. " + << "request = " << boost::json::serialize(request); boost::json::object response; if (!isConnected()) { - log_.error() << "Attempted to proxy but failed to connect to tx"; + LOG(log_.error()) << "Attempted to proxy but failed to connect to tx"; return {}; } @@ -365,7 +365,7 @@ public: if (!parsed.is_object()) { - log_.error() << "Error parsing response: " << std::string{begin, end}; + LOG(log_.error()) << "Error parsing response: " << std::string{begin, end}; return {}; } @@ -376,7 +376,7 @@ public: } catch (std::exception const& e) { - log_.error() << "Encountered exception : " << e.what(); + LOG(log_.error()) << "Encountered exception : " << e.what(); return {}; } } @@ -481,7 +481,7 @@ public: calls.emplace_back(sequence, markers[i], nextMarker); } - log_.debug() << "Starting data download for ledger " << sequence << ". Using source = " << toString(); + LOG(log_.debug()) << "Starting data download for ledger " << sequence << ". Using source = " << toString(); for (auto& c : calls) c.call(stub_, cq); @@ -499,19 +499,19 @@ public: if (!ok) { - log_.error() << "loadInitialLedger - ok is false"; + LOG(log_.error()) << "loadInitialLedger - ok is false"; return {{}, false}; // handle cancelled } else { - log_.trace() << "Marker prefix = " << ptr->getMarkerPrefix(); + LOG(log_.trace()) << "Marker prefix = " << ptr->getMarkerPrefix(); auto result = ptr->process(stub_, cq, *backend_, abort, cacheOnly); if (result != etl::detail::AsyncCallData::CallStatus::MORE) { ++numFinished; - log_.debug() << "Finished a marker. " - << "Current number of finished = " << numFinished; + LOG(log_.debug()) << "Finished a marker. " + << "Current number of finished = " << numFinished; std::string lastKey = ptr->getLastKey(); @@ -524,13 +524,13 @@ public: if (backend_->cache().size() > progress) { - log_.info() << "Downloaded " << backend_->cache().size() << " records from rippled"; + LOG(log_.info()) << "Downloaded " << backend_->cache().size() << " records from rippled"; progress += incr; } } } - log_.info() << "Finished loadInitialLedger. cache size = " << backend_->cache().size(); + LOG(log_.info()) << "Finished loadInitialLedger. 
cache size = " << backend_->cache().size(); return {std::move(edgeKeys), !abort}; } @@ -540,7 +540,7 @@ public: { if (auto resp = forwardCache_.get(request); resp) { - log_.debug() << "request hit forwardCache"; + LOG(log_.debug()) << "request hit forwardCache"; return resp; } @@ -607,7 +607,7 @@ public: {"streams", {"ledger", "manifests", "validations", "transactions_proposed"}}, }; std::string s = boost::json::serialize(jv); - log_.trace() << "Sending subscribe stream message"; + LOG(log_.trace()) << "Sending subscribe stream message"; derived().ws().set_option( boost::beast::websocket::stream_base::decorator([](boost::beast::websocket::request_type& req) { @@ -689,13 +689,13 @@ public: setValidatedRange({validatedLedgers.data(), validatedLedgers.size()}); } - log_.info() << "Received a message on ledger " - << " subscription stream. Message : " << response << " - " << toString(); + LOG(log_.info()) << "Received a message on ledger " + << " subscription stream. Message : " << response << " - " << toString(); } else if (response.contains("type") && response.at("type") == "ledgerClosed") { - log_.info() << "Received a message on ledger " - << " subscription stream. Message : " << response << " - " << toString(); + LOG(log_.info()) << "Received a message on ledger " + << " subscription stream. Message : " << response << " - " << toString(); if (response.contains("ledger_index")) { ledgerIndex = response.at("ledger_index").as_int64(); @@ -728,7 +728,7 @@ public: if (ledgerIndex != 0) { - log_.trace() << "Pushing ledger sequence = " << ledgerIndex << " - " << toString(); + LOG(log_.trace()) << "Pushing ledger sequence = " << ledgerIndex << " - " << toString(); networkValidatedLedgers_->push(ledgerIndex); } @@ -736,7 +736,7 @@ public: } catch (std::exception const& e) { - log_.error() << "Exception in handleMessage : " << e.what(); + LOG(log_.error()) << "Exception in handleMessage : " << e.what(); return false; } } @@ -780,16 +780,16 @@ protected: ::ERR_error_string_n(ec.value(), buf, sizeof(buf)); err += buf; - log_.error() << err; + LOG(log_.error()) << err; } if (ec != boost::asio::error::operation_aborted && ec != boost::asio::error::connection_refused) { - log_.error() << "error code = " << ec << " - " << toString(); + LOG(log_.error()) << "error code = " << ec << " - " << toString(); } else { - log_.warn() << "error code = " << ec << " - " << toString(); + LOG(log_.warn()) << "error code = " << ec << " - " << toString(); } // exponentially increasing timeouts, with a max of 30 seconds diff --git a/src/etl/impl/AsyncData.h b/src/etl/impl/AsyncData.h index 70e0c995..7d915d1f 100644 --- a/src/etl/impl/AsyncData.h +++ b/src/etl/impl/AsyncData.h @@ -57,9 +57,9 @@ public: unsigned char prefix = marker.data()[0]; - log_.debug() << "Setting up AsyncCallData. marker = " << ripple::strHex(marker) - << " . prefix = " << ripple::strHex(std::string(1, prefix)) - << " . nextPrefix_ = " << ripple::strHex(std::string(1, nextPrefix_)); + LOG(log_.debug()) << "Setting up AsyncCallData. marker = " << ripple::strHex(marker) + << " . prefix = " << ripple::strHex(std::string(1, prefix)) + << " . nextPrefix_ = " << ripple::strHex(std::string(1, nextPrefix_)); assert(nextPrefix_ > prefix || nextPrefix_ == 0x00); @@ -78,23 +78,23 @@ public: bool abort, bool cacheOnly = false) { - log_.trace() << "Processing response. " - << "Marker prefix = " << getMarkerPrefix(); + LOG(log_.trace()) << "Processing response. 
" + << "Marker prefix = " << getMarkerPrefix(); if (abort) { - log_.error() << "AsyncCallData aborted"; + LOG(log_.error()) << "AsyncCallData aborted"; return CallStatus::ERRORED; } if (!status_.ok()) { - log_.error() << "AsyncCallData status_ not ok: " - << " code = " << status_.error_code() << " message = " << status_.error_message(); + LOG(log_.error()) << "AsyncCallData status_ not ok: " + << " code = " << status_.error_code() << " message = " << status_.error_message(); return CallStatus::ERRORED; } if (!next_->is_unlimited()) { - log_.warn() << "AsyncCallData is_unlimited is false. Make sure " - "secure_gateway is set correctly at the ETL source"; + LOG(log_.warn()) << "AsyncCallData is_unlimited is false. Make sure " + "secure_gateway is set correctly at the ETL source"; } std::swap(cur_, next_); @@ -118,7 +118,7 @@ public: } auto const numObjects = cur_->ledger_objects().objects_size(); - log_.debug() << "Writing " << numObjects << " objects"; + LOG(log_.debug()) << "Writing " << numObjects << " objects"; std::vector cacheUpdates; cacheUpdates.reserve(numObjects); @@ -145,7 +145,7 @@ public: } } backend.cache().update(cacheUpdates, request_.ledger().sequence(), cacheOnly); - log_.debug() << "Wrote " << numObjects << " objects. Got more: " << (more ? "YES" : "NO"); + LOG(log_.debug()) << "Wrote " << numObjects << " objects. Got more: " << (more ? "YES" : "NO"); return more ? CallStatus::MORE : CallStatus::DONE; } diff --git a/src/etl/impl/CacheLoader.h b/src/etl/impl/CacheLoader.h index 4b4b3363..943b9ca1 100644 --- a/src/etl/impl/CacheLoader.h +++ b/src/etl/impl/CacheLoader.h @@ -132,7 +132,7 @@ public: if (cacheLoadStyle_ == LoadStyle::NOT_AT_ALL) { cache_.get().setDisabled(); - log_.warn() << "Cache is disabled. Not loading"; + LOG(log_.warn()) << "Cache is disabled. Not loading"; return; } @@ -165,10 +165,10 @@ public: // If loading synchronously, poll cache until full while (cacheLoadStyle_ == LoadStyle::SYNC && not cache_.get().isFull()) { - log_.debug() << "Cache not full. Cache size = " << cache_.get().size() << ". Sleeping ..."; + LOG(log_.debug()) << "Cache not full. Cache size = " << cache_.get().size() << ". Sleeping ..."; std::this_thread::sleep_for(std::chrono::seconds(10)); if (cache_.get().isFull()) - log_.info() << "Cache is full. Cache size = " << cache_.get().size(); + LOG(log_.info()) << "Cache is full. Cache size = " << cache_.get().size(); } } @@ -186,7 +186,7 @@ private: std::string const& port, boost::asio::yield_context yield) { - log_.info() << "Loading cache from peer. ip = " << ip << " . port = " << port; + LOG(log_.info()) << "Loading cache from peer. ip = " << ip << " . 
port = " << port; namespace beast = boost::beast; // from namespace http = beast::http; // from namespace websocket = beast::websocket; // from @@ -198,7 +198,7 @@ private: // These objects perform our I/O tcp::resolver resolver{ioContext_.get()}; - log_.trace() << "Creating websocket"; + LOG(log_.trace()) << "Creating websocket"; auto ws = std::make_unique>(ioContext_.get()); // Look up the domain name @@ -206,13 +206,13 @@ private: if (ec) return {}; - log_.trace() << "Connecting websocket"; + LOG(log_.trace()) << "Connecting websocket"; // Make the connection on the IP address we get from a lookup ws->next_layer().async_connect(results, yield[ec]); if (ec) return false; - log_.trace() << "Performing websocket handshake"; + LOG(log_.trace()) << "Performing websocket handshake"; // Perform the websocket handshake ws->async_handshake(ip, "/", yield[ec]); if (ec) @@ -220,7 +220,7 @@ private: std::optional marker; - log_.trace() << "Sending request"; + LOG(log_.trace()) << "Sending request"; auto getRequest = [&](auto marker) { boost::json::object request = { {"command", "ledger_data"}, @@ -242,7 +242,7 @@ private: ws->async_write(net::buffer(boost::json::serialize(getRequest(marker))), yield[ec]); if (ec) { - log_.error() << "error writing = " << ec.message(); + LOG(log_.error()) << "error writing = " << ec.message(); return false; } @@ -250,7 +250,7 @@ private: ws->async_read(buffer, yield[ec]); if (ec) { - log_.error() << "error reading = " << ec.message(); + LOG(log_.error()) << "error reading = " << ec.message(); return false; } @@ -259,27 +259,28 @@ private: if (!parsed.is_object()) { - log_.error() << "Error parsing response: " << raw; + LOG(log_.error()) << "Error parsing response: " << raw; return false; } - log_.trace() << "Successfully parsed response " << parsed; + LOG(log_.trace()) << "Successfully parsed response " << parsed; if (auto const& response = parsed.as_object(); response.contains("error")) { - log_.error() << "Response contains error: " << response; + LOG(log_.error()) << "Response contains error: " << response; auto const& err = response.at("error"); if (err.is_string() && err.as_string() == "lgrNotFound") { ++numAttempts; if (numAttempts >= 5) { - log_.error() << " ledger not found at peer after 5 attempts. " - "peer = " - << ip << " ledger = " << ledgerIndex - << ". Check your config and the health of the peer"; + LOG(log_.error()) << " ledger not found at peer after 5 attempts. " + "peer = " + << ip << " ledger = " << ledgerIndex + << ". Check your config and the health of the peer"; return false; } - log_.warn() << "Ledger not found. ledger = " << ledgerIndex << ". Sleeping and trying again"; + LOG(log_.warn()) << "Ledger not found. ledger = " << ledgerIndex + << ". Sleeping and trying again"; std::this_thread::sleep_for(std::chrono::seconds(1)); continue; } @@ -290,7 +291,7 @@ private: if (!response.contains("cache_full") || !response.at("cache_full").as_bool()) { - log_.error() << "cache not full for clio node. ip = " << ip; + LOG(log_.error()) << "cache not full for clio node. 
ip = " << ip; return false; } if (response.contains("marker")) @@ -310,7 +311,7 @@ private: if (!stateObject.key.parseHex(obj.at("index").as_string().c_str())) { - log_.error() << "failed to parse object id"; + LOG(log_.error()) << "failed to parse object id"; return false; } boost::algorithm::unhex(obj.at("data").as_string().c_str(), std::back_inserter(stateObject.blob)); @@ -319,17 +320,17 @@ private: cache_.get().update(objects, ledgerIndex, true); if (marker) - log_.debug() << "At marker " << *marker; + LOG(log_.debug()) << "At marker " << *marker; } while (marker || !started); - log_.info() << "Finished downloading ledger from clio node. ip = " << ip; + LOG(log_.info()) << "Finished downloading ledger from clio node. ip = " << ip; cache_.get().setFull(); return true; } catch (std::exception const& e) { - log_.error() << "Encountered exception : " << e.what() << " - ip = " << ip; + LOG(log_.error()) << "Encountered exception : " << e.what() << " - ip = " << ip; return false; } } @@ -366,8 +367,8 @@ private: if (c) cursorStr << ripple::strHex(*c) << ", "; - log_.info() << "Loading cache. num cursors = " << cursors.size() - 1; - log_.trace() << "cursors = " << cursorStr.str(); + LOG(log_.info()) << "Loading cache. num cursors = " << cursors.size() - 1; + LOG(log_.trace()) << "cursors = " << cursorStr.str(); thread_ = std::thread{[this, seq, cursors]() { auto startTime = std::chrono::system_clock::now(); @@ -388,7 +389,7 @@ private: std::optional cursor = start; std::string cursorStr = cursor.has_value() ? ripple::strHex(cursor.value()) : ripple::strHex(data::firstKey); - log_.debug() << "Starting a cursor: " << cursorStr << " markers = " << *markers; + LOG(log_.debug()) << "Starting a cursor: " << cursorStr << " markers = " << *markers; while (not stopping_) { @@ -401,9 +402,9 @@ private: if (!res.cursor || (end && *(res.cursor) > *end)) break; - log_.trace() << "Loading cache. cache size = " << cache_.get().size() - << " - cursor = " << ripple::strHex(res.cursor.value()) - << " start = " << cursorStr << " markers = " << *markers; + LOG(log_.trace()) << "Loading cache. cache size = " << cache_.get().size() + << " - cursor = " << ripple::strHex(res.cursor.value()) + << " start = " << cursorStr << " markers = " << *markers; cursor = std::move(res.cursor); } @@ -416,14 +417,14 @@ private: auto endTime = std::chrono::system_clock::now(); auto duration = std::chrono::duration_cast(endTime - startTime); - log_.info() << "Finished loading cache. cache size = " << cache_.get().size() << ". Took " - << duration.count() << " seconds"; + LOG(log_.info()) << "Finished loading cache. cache size = " << cache_.get().size() + << ". Took " << duration.count() << " seconds"; cache_.get().setFull(); } else { - log_.info() << "Finished a cursor. num remaining = " << *numRemaining - << " start = " << cursorStr << " markers = " << *markers; + LOG(log_.info()) << "Finished a cursor. 
num remaining = " << *numRemaining + << " start = " << cursorStr << " markers = " << *markers; } }); } diff --git a/src/etl/impl/ExtractionDataPipe.h b/src/etl/impl/ExtractionDataPipe.h index 65060579..da1cd13b 100644 --- a/src/etl/impl/ExtractionDataPipe.h +++ b/src/etl/impl/ExtractionDataPipe.h @@ -126,7 +126,7 @@ private: std::shared_ptr getQueue(uint32_t sequence) { - log_.debug() << "Grabbing extraction queue for " << sequence << "; start was " << startSequence_; + LOG(log_.debug()) << "Grabbing extraction queue for " << sequence << "; start was " << startSequence_; return queues_[(sequence - startSequence_) % stride_]; } }; diff --git a/src/etl/impl/Extractor.h b/src/etl/impl/Extractor.h index e3b4a9e4..8879af51 100644 --- a/src/etl/impl/Extractor.h +++ b/src/etl/impl/Extractor.h @@ -103,9 +103,9 @@ private: // TODO: extract this part into a strategy perhaps auto const tps = fetchResponse->transactions_list().transactions_size() / time; - log_.info() << "Extract phase time = " << time << "; Extract phase tps = " << tps - << "; Avg extract time = " << totalTime / (currentSequence - startSequence_ + 1) - << "; seq = " << currentSequence; + LOG(log_.info()) << "Extract phase time = " << time << "; Extract phase tps = " << tps + << "; Avg extract time = " << totalTime / (currentSequence - startSequence_ + 1) + << "; seq = " << currentSequence; pipe_.get().push(currentSequence, std::move(fetchResponse)); currentSequence += pipe_.get().getStride(); diff --git a/src/etl/impl/ForwardCache.cpp b/src/etl/impl/ForwardCache.cpp index cc67fc19..217476ae 100644 --- a/src/etl/impl/ForwardCache.cpp +++ b/src/etl/impl/ForwardCache.cpp @@ -29,7 +29,7 @@ namespace etl::detail { void ForwardCache::freshen() { - log_.trace() << "Freshening ForwardCache"; + LOG(log_.trace()) << "Freshening ForwardCache"; auto numOutstanding = std::make_shared(latestForwarded_.size()); diff --git a/src/etl/impl/LedgerFetcher.h b/src/etl/impl/LedgerFetcher.h index 0b0524ea..ebcf6680 100644 --- a/src/etl/impl/LedgerFetcher.h +++ b/src/etl/impl/LedgerFetcher.h @@ -66,11 +66,11 @@ public: OptionalGetLedgerResponseType fetchData(uint32_t sequence) { - log_.debug() << "Attempting to fetch ledger with sequence = " << sequence; + LOG(log_.debug()) << "Attempting to fetch ledger with sequence = " << sequence; auto response = loadBalancer_->fetchLedger(sequence, false, false); if (response) - log_.trace() << "GetLedger reply = " << response->DebugString(); + LOG(log_.trace()) << "GetLedger reply = " << response->DebugString(); return response; } @@ -87,12 +87,12 @@ public: OptionalGetLedgerResponseType fetchDataAndDiff(uint32_t sequence) { - log_.debug() << "Attempting to fetch ledger with sequence = " << sequence; + LOG(log_.debug()) << "Attempting to fetch ledger with sequence = " << sequence; auto response = loadBalancer_->fetchLedger( sequence, true, !backend_->cache().isFull() || backend_->cache().latestLedgerSequence() >= sequence); if (response) - log_.trace() << "GetLedger reply = " << response->DebugString(); + LOG(log_.trace()) << "GetLedger reply = " << response->DebugString(); return response; } diff --git a/src/etl/impl/LedgerLoader.h b/src/etl/impl/LedgerLoader.h index ddbaa4b7..5c79e361 100644 --- a/src/etl/impl/LedgerLoader.h +++ b/src/etl/impl/LedgerLoader.h @@ -98,7 +98,7 @@ public: ripple::SerialIter it{raw->data(), raw->size()}; ripple::STTx sttx{it}; - log_.trace() << "Inserting transaction = " << sttx.getTransactionID(); + LOG(log_.trace()) << "Inserting transaction = " << sttx.getTransactionID(); 
ripple::TxMeta txMeta{sttx.getTransactionID(), ledger.seq, txn.metadata_blob()}; @@ -149,7 +149,7 @@ public: auto rng = backend_->hardFetchLedgerRangeNoThrow(); if (rng) { - log_.fatal() << "Database is not empty"; + LOG(log_.fatal()) << "Database is not empty"; assert(false); return {}; } @@ -162,18 +162,18 @@ public: ripple::LedgerHeader lgrInfo = ::util::deserializeHeader(ripple::makeSlice(ledgerData->ledger_header())); - log_.debug() << "Deserialized ledger header. " << ::util::toString(lgrInfo); + LOG(log_.debug()) << "Deserialized ledger header. " << ::util::toString(lgrInfo); auto timeDiff = ::util::timed>([this, sequence, &lgrInfo, &ledgerData]() { backend_->startWrites(); - log_.debug() << "Started writes"; + LOG(log_.debug()) << "Started writes"; backend_->writeLedger(lgrInfo, std::move(*ledgerData->mutable_ledger_header())); - log_.debug() << "Wrote ledger"; + LOG(log_.debug()) << "Wrote ledger"; FormattedTransactionsData insertTxResult = insertTransactions(lgrInfo, *ledgerData); - log_.debug() << "Inserted txns"; + LOG(log_.debug()) << "Inserted txns"; // download the full account state map. This function downloads full // ledger data and pushes the downloaded data into the writeQueue. @@ -191,7 +191,7 @@ public: ::util::timed([this, edgeKeys = &edgeKeys, sequence, &numWrites]() { for (auto& key : *edgeKeys) { - log_.debug() << "Writing edge key = " << ripple::strHex(key); + LOG(log_.debug()) << "Writing edge key = " << ripple::strHex(key); auto succ = backend_->cache().getSuccessor(*ripple::uint256::fromVoidChecked(key), sequence); if (succ) @@ -215,8 +215,8 @@ public: assert(succ); if (succ->key == cur->key) { - log_.debug() << "Writing book successor = " << ripple::strHex(base) << " - " - << ripple::strHex(cur->key); + LOG(log_.debug()) << "Writing book successor = " << ripple::strHex(base) + << " - " << ripple::strHex(cur->key); backend_->writeSuccessor( uint256ToString(base), sequence, uint256ToString(cur->key)); @@ -228,18 +228,18 @@ public: prev = std::move(cur->key); if (numWrites % 100000 == 0 && numWrites != 0) - log_.info() << "Wrote " << numWrites << " book successors"; + LOG(log_.info()) << "Wrote " << numWrites << " book successors"; } backend_->writeSuccessor(uint256ToString(prev), sequence, uint256ToString(data::lastKey)); ++numWrites; }); - log_.info() << "Looping through cache and submitting all writes took " << seconds - << " seconds. numWrites = " << std::to_string(numWrites); + LOG(log_.info()) << "Looping through cache and submitting all writes took " << seconds + << " seconds. 
numWrites = " << std::to_string(numWrites); } - log_.debug() << "Loaded initial ledger"; + LOG(log_.debug()) << "Loaded initial ledger"; if (not state_.get().isStopping) { @@ -251,7 +251,7 @@ public: backend_->finishWrites(sequence); }); - log_.debug() << "Time to download and store ledger = " << timeDiff; + LOG(log_.debug()) << "Time to download and store ledger = " << timeDiff; return lgrInfo; } }; diff --git a/src/etl/impl/LedgerPublisher.h b/src/etl/impl/LedgerPublisher.h index 221efab3..59cc1f7b 100644 --- a/src/etl/impl/LedgerPublisher.h +++ b/src/etl/impl/LedgerPublisher.h @@ -89,7 +89,7 @@ public: bool publish(uint32_t ledgerSequence, std::optional maxAttempts) { - log_.info() << "Attempting to publish ledger = " << ledgerSequence; + LOG(log_.info()) << "Attempting to publish ledger = " << ledgerSequence; size_t numAttempts = 0; while (not state_.get().isStopping) { @@ -97,14 +97,14 @@ public: if (!range || range->maxSequence < ledgerSequence) { - log_.debug() << "Trying to publish. Could not find " - "ledger with sequence = " - << ledgerSequence; + LOG(log_.debug()) << "Trying to publish. Could not find " + "ledger with sequence = " + << ledgerSequence; // We try maxAttempts times to publish the ledger, waiting one second in between each attempt. if (maxAttempts && numAttempts >= maxAttempts) { - log_.debug() << "Failed to publish ledger after " << numAttempts << " attempts."; + LOG(log_.debug()) << "Failed to publish ledger after " << numAttempts << " attempts."; return false; } std::this_thread::sleep_for(std::chrono::seconds(1)); @@ -136,11 +136,11 @@ public: publish(ripple::LedgerHeader const& lgrInfo) { boost::asio::post(publishStrand_, [this, lgrInfo = lgrInfo]() { - log_.info() << "Publishing ledger " << std::to_string(lgrInfo.seq); + LOG(log_.info()) << "Publishing ledger " << std::to_string(lgrInfo.seq); if (!state_.get().isWriting) { - log_.info() << "Updating cache"; + LOG(log_.info()) << "Updating cache"; std::vector diff = data::synchronousAndRetryOnTimeout( [&](auto yield) { return backend_->fetchLedgerDiff(lgrInfo.seq, yield); }); @@ -177,10 +177,10 @@ public: subscriptions_->pubBookChanges(lgrInfo, transactions); setLastPublishTime(); - log_.info() << "Published ledger " << std::to_string(lgrInfo.seq); + LOG(log_.info()) << "Published ledger " << std::to_string(lgrInfo.seq); } else - log_.info() << "Skipping publishing ledger " << std::to_string(lgrInfo.seq); + LOG(log_.info()) << "Skipping publishing ledger " << std::to_string(lgrInfo.seq); }); // we track latest publish-requested seq, not necessarily already published diff --git a/src/etl/impl/Transformer.h b/src/etl/impl/Transformer.h index f11cde80..9b7f795e 100644 --- a/src/etl/impl/Transformer.h +++ b/src/etl/impl/Transformer.h @@ -137,18 +137,18 @@ private: auto const end = std::chrono::system_clock::now(); auto const duration = ((end - start).count()) / 1000000000.0; - log_.info() << "Load phase of etl : " - << "Successfully wrote ledger! Ledger info: " << util::toString(lgrInfo) - << ". txn count = " << numTxns << ". object count = " << numObjects - << ". load time = " << duration << ". load txns per second = " << numTxns / duration - << ". load objs per second = " << numObjects / duration; + LOG(log_.info()) << "Load phase of etl : " + << "Successfully wrote ledger! Ledger info: " << util::toString(lgrInfo) + << ". txn count = " << numTxns << ". object count = " << numObjects + << ". load time = " << duration << ". load txns per second = " << numTxns / duration + << ". 
load objs per second = " << numObjects / duration; // success is false if the ledger was already written publisher_.get().publish(lgrInfo); } else { - log_.error() << "Error writing ledger. " << util::toString(lgrInfo); + LOG(log_.error()) << "Error writing ledger. " << util::toString(lgrInfo); } setWriteConflict(not success); @@ -165,10 +165,10 @@ private: std::pair buildNextLedger(GetLedgerResponseType& rawData) { - log_.debug() << "Beginning ledger update"; + LOG(log_.debug()) << "Beginning ledger update"; ripple::LedgerHeader lgrInfo = ::util::deserializeHeader(ripple::makeSlice(rawData.ledger_header())); - log_.debug() << "Deserialized ledger header. " << ::util::toString(lgrInfo); + LOG(log_.debug()) << "Deserialized ledger header. " << ::util::toString(lgrInfo); backend_->startWrites(); backend_->writeLedger(lgrInfo, std::move(*rawData.mutable_ledger_header())); @@ -178,8 +178,8 @@ private: { updateCache(lgrInfo, rawData); - log_.debug() << "Inserted/modified/deleted all objects. Number of objects = " - << rawData.ledger_objects().objects_size(); + LOG(log_.debug()) << "Inserted/modified/deleted all objects. Number of objects = " + << rawData.ledger_objects().objects_size(); insertTxResultOp.emplace(loader_.get().insertTransactions(lgrInfo, rawData)); } @@ -193,8 +193,8 @@ private: return {ripple::LedgerHeader{}, false}; } - log_.debug() << "Inserted all transactions. Number of transactions = " - << rawData.transactions_list().transactions_size(); + LOG(log_.debug()) << "Inserted all transactions. Number of transactions = " + << rawData.transactions_list().transactions_size(); backend_->writeAccountTransactions(std::move(insertTxResultOp->accountTxData)); backend_->writeNFTs(std::move(insertTxResultOp->nfTokensData)); @@ -203,8 +203,8 @@ private: auto [success, duration] = ::util::timed>([&]() { return backend_->finishWrites(lgrInfo.seq); }); - log_.debug() << "Finished writes. Total time: " << std::to_string(duration); - log_.debug() << "Finished ledger update: " << ::util::toString(lgrInfo); + LOG(log_.debug()) << "Finished writes. Total time: " << std::to_string(duration); + LOG(log_.debug()) << "Finished ledger update: " << ::util::toString(lgrInfo); return {lgrInfo, success}; } @@ -231,11 +231,11 @@ private: assert(key); cacheUpdates.push_back({*key, {obj.mutable_data()->begin(), obj.mutable_data()->end()}}); - log_.debug() << "key = " << ripple::strHex(*key) << " - mod type = " << obj.mod_type(); + LOG(log_.debug()) << "key = " << ripple::strHex(*key) << " - mod type = " << obj.mod_type(); if (obj.mod_type() != RawLedgerObjectType::MODIFIED && !rawData.object_neighbors_included()) { - log_.debug() << "object neighbors not included. using cache"; + LOG(log_.debug()) << "object neighbors not included. using cache"; if (!backend_->cache().isFull() || backend_->cache().latestLedgerSequence() != lgrInfo.seq - 1) throw std::runtime_error("Cache is not full, but object neighbors were not included"); @@ -257,7 +257,7 @@ private: if (checkBookBase) { - log_.debug() << "Is book dir. Key = " << ripple::strHex(*key); + LOG(log_.debug()) << "Is book dir. Key = " << ripple::strHex(*key); auto const bookBase = getBookBase(*key); auto const oldFirstDir = backend_->cache().getSuccessor(bookBase, lgrInfo.seq - 1); @@ -266,9 +266,10 @@ private: // We deleted the first directory, or we added a directory prior to the old first directory if ((isDeleted && key == oldFirstDir->key) || (!isDeleted && key < oldFirstDir->key)) { - log_.debug() << "Need to recalculate book base successor. 
base = " << ripple::strHex(bookBase) - << " - key = " << ripple::strHex(*key) << " - isDeleted = " << isDeleted - << " - seq = " << lgrInfo.seq; + LOG(log_.debug()) + << "Need to recalculate book base successor. base = " << ripple::strHex(bookBase) + << " - key = " << ripple::strHex(*key) << " - isDeleted = " << isDeleted + << " - seq = " << lgrInfo.seq; bookSuccessorsToCalculate.insert(bookBase); } } @@ -285,7 +286,7 @@ private: // rippled didn't send successor information, so use our cache if (!rawData.object_neighbors_included()) { - log_.debug() << "object neighbors not included. using cache"; + LOG(log_.debug()) << "object neighbors not included. using cache"; if (!backend_->cache().isFull() || backend_->cache().latestLedgerSequence() != lgrInfo.seq) throw std::runtime_error("Cache is not full, but object neighbors were not included"); @@ -304,8 +305,8 @@ private: if (obj.blob.size() == 0) { - log_.debug() << "writing successor for deleted object " << ripple::strHex(obj.key) << " - " - << ripple::strHex(lb->key) << " - " << ripple::strHex(ub->key); + LOG(log_.debug()) << "writing successor for deleted object " << ripple::strHex(obj.key) << " - " + << ripple::strHex(lb->key) << " - " << ripple::strHex(ub->key); backend_->writeSuccessor(uint256ToString(lb->key), lgrInfo.seq, uint256ToString(ub->key)); } @@ -314,8 +315,8 @@ private: backend_->writeSuccessor(uint256ToString(lb->key), lgrInfo.seq, uint256ToString(obj.key)); backend_->writeSuccessor(uint256ToString(obj.key), lgrInfo.seq, uint256ToString(ub->key)); - log_.debug() << "writing successor for new object " << ripple::strHex(lb->key) << " - " - << ripple::strHex(obj.key) << " - " << ripple::strHex(ub->key); + LOG(log_.debug()) << "writing successor for new object " << ripple::strHex(lb->key) << " - " + << ripple::strHex(obj.key) << " - " << ripple::strHex(ub->key); } } @@ -326,15 +327,15 @@ private: { backend_->writeSuccessor(uint256ToString(base), lgrInfo.seq, uint256ToString(succ->key)); - log_.debug() << "Updating book successor " << ripple::strHex(base) << " - " - << ripple::strHex(succ->key); + LOG(log_.debug()) << "Updating book successor " << ripple::strHex(base) << " - " + << ripple::strHex(succ->key); } else { backend_->writeSuccessor(uint256ToString(base), lgrInfo.seq, uint256ToString(data::lastKey)); - log_.debug() << "Updating book successor " << ripple::strHex(base) << " - " - << ripple::strHex(data::lastKey); + LOG(log_.debug()) << "Updating book successor " << ripple::strHex(base) << " - " + << ripple::strHex(data::lastKey); } } } @@ -352,15 +353,15 @@ private: // Write successor info, if included from rippled if (rawData.object_neighbors_included()) { - log_.debug() << "object neighbors included"; + LOG(log_.debug()) << "object neighbors included"; for (auto& obj : *(rawData.mutable_book_successors())) { auto firstBook = std::move(*obj.mutable_first_book()); if (!firstBook.size()) firstBook = uint256ToString(data::lastKey); - log_.debug() << "writing book successor " << ripple::strHex(obj.book_base()) << " - " - << ripple::strHex(firstBook); + LOG(log_.debug()) << "writing book successor " << ripple::strHex(obj.book_base()) << " - " + << ripple::strHex(firstBook); backend_->writeSuccessor(std::move(*obj.mutable_book_base()), lgrInfo.seq, std::move(firstBook)); } @@ -378,22 +379,22 @@ private: if (obj.mod_type() == RawLedgerObjectType::DELETED) { - log_.debug() << "Modifying successors for deleted object " << ripple::strHex(obj.key()) << " - " - << ripple::strHex(*predPtr) << " - " << ripple::strHex(*succPtr); + 
LOG(log_.debug()) << "Modifying successors for deleted object " << ripple::strHex(obj.key()) + << " - " << ripple::strHex(*predPtr) << " - " << ripple::strHex(*succPtr); backend_->writeSuccessor(std::move(*predPtr), lgrInfo.seq, std::move(*succPtr)); } else { - log_.debug() << "adding successor for new object " << ripple::strHex(obj.key()) << " - " - << ripple::strHex(*predPtr) << " - " << ripple::strHex(*succPtr); + LOG(log_.debug()) << "adding successor for new object " << ripple::strHex(obj.key()) << " - " + << ripple::strHex(*predPtr) << " - " << ripple::strHex(*succPtr); backend_->writeSuccessor(std::move(*predPtr), lgrInfo.seq, std::string{obj.key()}); backend_->writeSuccessor(std::string{obj.key()}, lgrInfo.seq, std::move(*succPtr)); } } else - log_.debug() << "object modified " << ripple::strHex(obj.key()); + LOG(log_.debug()) << "object modified " << ripple::strHex(obj.key()); } } } diff --git a/src/feed/SubscriptionManager.h b/src/feed/SubscriptionManager.h index 76e37b0e..cc255597 100644 --- a/src/feed/SubscriptionManager.h +++ b/src/feed/SubscriptionManager.h @@ -320,7 +320,7 @@ public: // We will eventually want to clamp this to be the number of strands, // since adding more threads than we have strands won't see any // performance benefits - log_.info() << "Starting subscription manager with " << numThreads << " workers"; + LOG(log_.info()) << "Starting subscription manager with " << numThreads << " workers"; workers_.reserve(numThreads); for (auto i = numThreads; i > 0; --i) diff --git a/src/main/Main.cpp b/src/main/Main.cpp index dd55b4f1..667df246 100644 --- a/src/main/Main.cpp +++ b/src/main/Main.cpp @@ -164,15 +164,15 @@ try } LogService::init(config); - LogService::info() << "Clio version: " << Build::getClioFullVersionString(); + LOG(LogService::info()) << "Clio version: " << Build::getClioFullVersionString(); auto const threads = config.valueOr("io_threads", 2); if (threads <= 0) { - LogService::fatal() << "io_threads is less than 1"; + LOG(LogService::fatal()) << "io_threads is less than 1"; return EXIT_FAILURE; } - LogService::info() << "Number of io threads = " << threads; + LOG(LogService::info()) << "Number of io threads = " << threads; // IO context to handle all incoming requests, as well as other things. // This is not the only io context in the application. @@ -224,5 +224,5 @@ try } catch (std::exception const& e) { - LogService::fatal() << "Exit on exception: " << e.what(); + LOG(LogService::fatal()) << "Exit on exception: " << e.what(); } diff --git a/src/rpc/RPCEngine.h b/src/rpc/RPCEngine.h index 1c8a7f3e..2e7c96a1 100644 --- a/src/rpc/RPCEngine.h +++ b/src/rpc/RPCEngine.h @@ -129,7 +129,7 @@ public: if (backend_->isTooBusy()) { - log_.error() << "Database is too busy. Rejecting request"; + LOG(log_.error()) << "Database is too busy. Rejecting request"; notifyTooBusy(); // TODO: should we add ctx.method if we have it? 
return Status{RippledError::rpcTOO_BUSY}; } @@ -143,13 +143,13 @@ public: try { - perfLog_.debug() << ctx.tag() << " start executing rpc `" << ctx.method << '`'; + LOG(perfLog_.debug()) << ctx.tag() << " start executing rpc `" << ctx.method << '`'; auto const isAdmin = adminVerifier_.isAdmin(ctx.clientIp); auto const context = Context{ctx.yield, ctx.session, isAdmin, ctx.clientIp, ctx.apiVersion}; auto const v = (*method).process(ctx.params, context); - perfLog_.debug() << ctx.tag() << " finish executing rpc `" << ctx.method << '`'; + LOG(perfLog_.debug()) << ctx.tag() << " finish executing rpc `" << ctx.method << '`'; if (v) return v->as_object(); @@ -161,14 +161,14 @@ public: } catch (data::DatabaseTimeout const& t) { - log_.error() << "Database timeout"; + LOG(log_.error()) << "Database timeout"; notifyTooBusy(); return Status{RippledError::rpcTOO_BUSY}; } catch (std::exception const& ex) { - log_.error() << ctx.tag() << "Caught exception: " << ex.what(); + LOG(log_.error()) << ctx.tag() << "Caught exception: " << ex.what(); notifyInternalError(); return Status{RippledError::rpcINTERNAL}; diff --git a/src/rpc/RPCHelpers.cpp b/src/rpc/RPCHelpers.cpp index d03530ba..efb5749c 100644 --- a/src/rpc/RPCHelpers.cpp +++ b/src/rpc/RPCHelpers.cpp @@ -163,9 +163,9 @@ deserializeTxPlusMeta(data::TransactionAndMetadata const& blobs) std::stringstream meta; std::copy(blobs.transaction.begin(), blobs.transaction.end(), std::ostream_iterator(txn)); std::copy(blobs.metadata.begin(), blobs.metadata.end(), std::ostream_iterator(meta)); - gLog.error() << "Failed to deserialize transaction. txn = " << txn.str() << " - meta = " << meta.str() - << " txn length = " << std::to_string(blobs.transaction.size()) - << " meta length = " << std::to_string(blobs.metadata.size()); + LOG(gLog.error()) << "Failed to deserialize transaction. 
txn = " << txn.str() << " - meta = " << meta.str() + << " txn length = " << std::to_string(blobs.transaction.size()) + << " meta length = " << std::to_string(blobs.metadata.size()); throw e; } } @@ -654,12 +654,12 @@ traverseOwnedNodes( } auto end = std::chrono::system_clock::now(); - gLog.debug() << "Time loading owned directories: " - << std::chrono::duration_cast(end - start).count() << " milliseconds"; + LOG(gLog.debug()) << "Time loading owned directories: " + << std::chrono::duration_cast(end - start).count() << " milliseconds"; auto [objects, timeDiff] = util::timed([&]() { return backend.fetchLedgerObjects(keys, sequence, yield); }); - gLog.debug() << "Time loading owned entries: " << timeDiff << " milliseconds"; + LOG(gLog.debug()) << "Time loading owned entries: " << timeDiff << " milliseconds"; for (auto i = 0u; i < objects.size(); ++i) { @@ -1133,7 +1133,7 @@ postProcessOrderBook( } catch (std::exception const& e) { - gLog.error() << "caught exception: " << e.what(); + LOG(gLog.error()) << "caught exception: " << e.what(); } } return jsonOffers; diff --git a/src/rpc/RPCHelpers.h b/src/rpc/RPCHelpers.h index 1b94b7f6..26e9f0f1 100644 --- a/src/rpc/RPCHelpers.h +++ b/src/rpc/RPCHelpers.h @@ -240,11 +240,11 @@ logDuration(web::Context const& ctx, T const& dur) serialize(util::removeSecret(ctx.params))); if (seconds > 10) - log.error() << ctx.tag() << msg; + LOG(log.error()) << ctx.tag() << msg; else if (seconds > 1) - log.warn() << ctx.tag() << msg; + LOG(log.warn()) << ctx.tag() << msg; else - log.info() << ctx.tag() << msg; + LOG(log.info()) << ctx.tag() << msg; } } // namespace rpc diff --git a/src/rpc/WorkQueue.h b/src/rpc/WorkQueue.h index 7cfe4e7b..a0387167 100644 --- a/src/rpc/WorkQueue.h +++ b/src/rpc/WorkQueue.h @@ -72,7 +72,7 @@ public: auto const numThreads = config.valueOr("workers", std::thread::hardware_concurrency()); auto const maxQueueSize = serverConfig.valueOr("max_queue_size", 0); // 0 is no limit - log.info() << "Number of workers = " << numThreads << ". Max queue size = " << maxQueueSize; + LOG(log.info()) << "Number of workers = " << numThreads << ". Max queue size = " << maxQueueSize; return WorkQueue{numThreads, maxQueueSize}; } @@ -92,7 +92,8 @@ public: { if (curSize_ >= maxSize_ && !isWhiteListed) { - log_.warn() << "Queue is full. rejecting job. current size = " << curSize_ << "; max size = " << maxSize_; + LOG(log_.warn()) << "Queue is full. rejecting job. 
current size = " << curSize_ + << "; max size = " << maxSize_; return false; } @@ -108,7 +109,7 @@ public: ++queued_; durationUs_ += wait; - log_.info() << "WorkQueue wait time = " << wait << " queue size = " << curSize_; + LOG(log_.info()) << "WorkQueue wait time = " << wait << " queue size = " << curSize_; func(yield); --curSize_; diff --git a/src/rpc/common/impl/APIVersionParser.h b/src/rpc/common/impl/APIVersionParser.h index e788a9e4..17364bea 100644 --- a/src/rpc/common/impl/APIVersionParser.h +++ b/src/rpc/common/impl/APIVersionParser.h @@ -51,9 +51,9 @@ public: auto checkRange = [this](uint32_t version, std::string label) { if (std::clamp(version, API_VERSION_MIN, API_VERSION_MAX) != version) { - log_.error() << "API version settings issue detected: " << label << " version with value " << version - << " is outside of supported range " << API_VERSION_MIN << "-" << API_VERSION_MAX - << "; Falling back to hardcoded values."; + LOG(log_.error()) << "API version settings issue detected: " << label << " version with value " + << version << " is outside of supported range " << API_VERSION_MIN << "-" + << API_VERSION_MAX << "; Falling back to hardcoded values."; defaultVersion_ = API_VERSION_DEFAULT; minVersion_ = API_VERSION_MIN; @@ -66,8 +66,8 @@ public: checkRange(maxVersion, "maximum"); #endif - log_.info() << "API version settings: [min = " << minVersion_ << "; max = " << maxVersion_ - << "; default = " << defaultVersion_ << "]"; + LOG(log_.info()) << "API version settings: [min = " << minVersion_ << "; max = " << maxVersion_ + << "; default = " << defaultVersion_ << "]"; } ProductionAPIVersionParser(util::Config const& config); diff --git a/src/rpc/handlers/AccountTx.cpp b/src/rpc/handlers/AccountTx.cpp index 4fab8e37..3f4abc9c 100644 --- a/src/rpc/handlers/AccountTx.cpp +++ b/src/rpc/handlers/AccountTx.cpp @@ -87,7 +87,7 @@ AccountTxHandler::process(AccountTxHandler::Input input, Context const& ctx) con return sharedPtrBackend_->fetchAccountTransactions(*accountID, limit, input.forward, cursor, ctx.yield); }); - log_.info() << "db fetch took " << timeDiff << " milliseconds - num blobs = " << txnsAndCursor.txns.size(); + LOG(log_.info()) << "db fetch took " << timeDiff << " milliseconds - num blobs = " << txnsAndCursor.txns.size(); auto const [blobs, retCursor] = txnsAndCursor; Output response; @@ -106,7 +106,7 @@ AccountTxHandler::process(AccountTxHandler::Input input, Context const& ctx) con } else if (txnPlusMeta.ledgerSequence > maxIndex && !input.forward) { - log_.debug() << "Skipping over transactions from incomplete ledger"; + LOG(log_.debug()) << "Skipping over transactions from incomplete ledger"; continue; } diff --git a/src/rpc/handlers/LedgerData.cpp b/src/rpc/handlers/LedgerData.cpp index f4463fb5..c9ff8504 100644 --- a/src/rpc/handlers/LedgerData.cpp +++ b/src/rpc/handlers/LedgerData.cpp @@ -150,8 +150,8 @@ LedgerDataHandler::process(Input input, Context const& ctx) const } auto const end = std::chrono::system_clock::now(); - log_.debug() << "Number of results = " << results.size() << " fetched in " - << std::chrono::duration_cast(end - start).count() << " microseconds"; + LOG(log_.debug()) << "Number of results = " << results.size() << " fetched in " + << std::chrono::duration_cast(end - start).count() << " microseconds"; output.states.reserve(results.size()); @@ -180,8 +180,8 @@ LedgerDataHandler::process(Input input, Context const& ctx) const output.cacheFull = sharedPtrBackend_->cache().isFull(); auto const end2 = std::chrono::system_clock::now(); - log_.debug() << 
"Number of results = " << results.size() << " serialized in " - << std::chrono::duration_cast(end2 - end).count() << " microseconds"; + LOG(log_.debug()) << "Number of results = " << results.size() << " serialized in " + << std::chrono::duration_cast(end2 - end).count() << " microseconds"; return output; } diff --git a/src/rpc/handlers/NFTHistory.cpp b/src/rpc/handlers/NFTHistory.cpp index 6f07c2b8..2d0b4515 100644 --- a/src/rpc/handlers/NFTHistory.cpp +++ b/src/rpc/handlers/NFTHistory.cpp @@ -86,7 +86,7 @@ NFTHistoryHandler::process(NFTHistoryHandler::Input input, Context const& ctx) c auto const [txnsAndCursor, timeDiff] = util::timed( [&]() { return sharedPtrBackend_->fetchNFTTransactions(tokenID, limit, input.forward, cursor, ctx.yield); }); - log_.info() << "db fetch took " << timeDiff << " milliseconds - num blobs = " << txnsAndCursor.txns.size(); + LOG(log_.info()) << "db fetch took " << timeDiff << " milliseconds - num blobs = " << txnsAndCursor.txns.size(); Output response; auto const [blobs, retCursor] = txnsAndCursor; @@ -105,7 +105,7 @@ NFTHistoryHandler::process(NFTHistoryHandler::Input input, Context const& ctx) c } else if (txnPlusMeta.ledgerSequence > maxIndex && !input.forward) { - log_.debug() << "Skipping over transactions from incomplete ledger"; + LOG(log_.debug()) << "Skipping over transactions from incomplete ledger"; continue; } diff --git a/src/util/config/Config.cpp b/src/util/config/Config.cpp index 80285913..19f21009 100644 --- a/src/util/config/Config.cpp +++ b/src/util/config/Config.cpp @@ -183,7 +183,8 @@ ConfigReader::open(std::filesystem::path path) } catch (std::exception const& e) { - util::LogService::error() << "Could not read configuration file from '" << path.string() << "': " << e.what(); + LOG(util::LogService::error()) << "Could not read configuration file from '" << path.string() + << "': " << e.what(); } return Config{}; diff --git a/src/util/log/Logger.cpp b/src/util/log/Logger.cpp index 80bf2b98..1d114476 100644 --- a/src/util/log/Logger.cpp +++ b/src/util/log/Logger.cpp @@ -139,7 +139,7 @@ LogService::init(util::Config const& config) } core->set_filter(min_severity); - LogService::info() << "Default log level = " << defaultSeverity; + LOG(LogService::info()) << "Default log level = " << defaultSeverity; } Logger::Pump diff --git a/src/util/log/Logger.h b/src/util/log/Logger.h index 805ef7d3..dcb4d296 100644 --- a/src/util/log/Logger.h +++ b/src/util/log/Logger.h @@ -88,6 +88,16 @@ using SourceLocationType = SourceLocation; #define CURRENT_SRC_LOCATION SourceLocationType(__builtin_FILE(), __builtin_LINE()) #endif +/** + * @brief Skips evaluation of expensive argument lists if the given logger is disabled for the required severity level. + */ +#define LOG(x) \ + if (!x) \ + { \ + } \ + else \ + x + /** * @brief Custom severity levels for @ref util::Logger. */ @@ -158,8 +168,7 @@ class Logger final operator=(Pump&&) = delete; /** - * @brief Perfectly forwards any incoming data into the underlying - * boost::log pump if the pump is available. nop otherwise. + * @brief Perfectly forwards any incoming data into the underlying boost::log pump if the pump is available. 
* * @tparam T Type of data to pump * @param data The data to pump @@ -174,6 +183,14 @@ class Logger final return *this; } + /** + * @return true if logger is enabled; false otherwise + */ + operator bool() const + { + return pump_.has_value(); + } + private: [[nodiscard]] std::string pretty_path(SourceLocationType const& loc, size_t max_depth = 3) const; diff --git a/src/web/Context.h b/src/web/Context.h index 9506c1d5..592e2373 100644 --- a/src/web/Context.h +++ b/src/web/Context.h @@ -76,7 +76,7 @@ struct Context : util::Taggable , clientIp(clientIp) { static util::Logger perfLog{"Performance"}; - perfLog.debug() << tag() << "new Context created"; + LOG(perfLog.debug()) << tag() << "new Context created"; } Context(Context&&) = default; diff --git a/src/web/DOSGuard.h b/src/web/DOSGuard.h index 0f7811c3..60360096 100644 --- a/src/web/DOSGuard.h +++ b/src/web/DOSGuard.h @@ -129,8 +129,8 @@ public: auto [transferedByte, requests] = ipState_.at(ip); if (transferedByte > maxFetches_ || requests > maxRequestCount_) { - log_.warn() << "Dosguard: Client surpassed the rate limit. ip = " << ip - << " Transfered Byte: " << transferedByte << "; Requests: " << requests; + LOG(log_.warn()) << "Dosguard: Client surpassed the rate limit. ip = " << ip + << " Transfered Byte: " << transferedByte << "; Requests: " << requests; return false; } } @@ -139,8 +139,8 @@ public: { if (it->second > maxConnCount_) { - log_.warn() << "Dosguard: Client surpassed the rate limit. ip = " << ip - << " Concurrent connection: " << it->second; + LOG(log_.warn()) << "Dosguard: Client surpassed the rate limit. ip = " << ip + << " Concurrent connection: " << it->second; return false; } } diff --git a/src/web/RPCServerHandler.h b/src/web/RPCServerHandler.h index 6d734a5c..0572ac15 100644 --- a/src/web/RPCServerHandler.h +++ b/src/web/RPCServerHandler.h @@ -87,7 +87,7 @@ public: try { auto req = boost::json::parse(request).as_object(); - perfLog_.debug() << connection->tag() << "Adding to work queue"; + LOG(perfLog_.debug()) << connection->tag() << "Adding to work queue"; if (not connection->upgraded and not req.contains("params")) req["params"] = boost::json::array({boost::json::object{}}); @@ -116,7 +116,7 @@ public: } catch (std::exception const& ex) { - perfLog_.error() << connection->tag() << "Caught exception: " << ex.what(); + LOG(perfLog_.error()) << connection->tag() << "Caught exception: " << ex.what(); rpcEngine_->notifyInternalError(); throw; } @@ -144,9 +144,9 @@ private: boost::json::object&& request, std::shared_ptr const& connection) { - log_.info() << connection->tag() << (connection->upgraded ? "ws" : "http") - << " received request from work queue: " << util::removeSecret(request) - << " ip = " << connection->clientIp; + LOG(log_.info()) << connection->tag() << (connection->upgraded ? "ws" : "http") + << " received request from work queue: " << util::removeSecret(request) + << " ip = " << connection->clientIp; try { @@ -181,8 +181,8 @@ private: if (!context) { auto const err = context.error(); - perfLog_.warn() << connection->tag() << "Could not create Web context: " << err; - log_.warn() << connection->tag() << "Could not create Web context: " << err; + LOG(perfLog_.warn()) << connection->tag() << "Could not create Web context: " << err; + LOG(log_.warn()) << connection->tag() << "Could not create Web context: " << err; // we count all those as BadSyntax - as the WS path would. // Although over HTTP these will yield a 400 status with a plain text response (for most). 
@@ -202,8 +202,8 @@ private: response = web::detail::ErrorHelper(connection, request).composeError(*status); auto const responseStr = boost::json::serialize(response); - perfLog_.debug() << context->tag() << "Encountered error: " << responseStr; - log_.debug() << context->tag() << "Encountered error: " << responseStr; + LOG(perfLog_.debug()) << context->tag() << "Encountered error: " << responseStr; + LOG(log_.debug()) << context->tag() << "Encountered error: " << responseStr; } else { @@ -260,8 +260,8 @@ private: { // note: while we are catching this in buildResponse too, this is here to make sure // that any other code that may throw is outside of buildResponse is also worked around. - perfLog_.error() << connection->tag() << "Caught exception: " << ex.what(); - log_.error() << connection->tag() << "Caught exception: " << ex.what(); + LOG(perfLog_.error()) << connection->tag() << "Caught exception: " << ex.what(); + LOG(log_.error()) << connection->tag() << "Caught exception: " << ex.what(); rpcEngine_->notifyInternalError(); return web::detail::ErrorHelper(connection, request).sendInternalError(); diff --git a/src/web/Server.h b/src/web/Server.h index e18b37cf..ff12c6a6 100644 --- a/src/web/Server.h +++ b/src/web/Server.h @@ -95,7 +95,7 @@ public: if (ec == boost::asio::ssl::error::stream_truncated) return; - log_.info() << "Detector failed (" << message << "): " << ec.message(); + LOG(log_.info()) << "Detector failed (" << message << "): " << ec.message(); } /** @brief Initiate the detection. */ @@ -205,7 +205,7 @@ public: acceptor_.bind(endpoint, ec); if (ec) { - log_.error() << "Failed to bind to endpoint: " << endpoint << ". message: " << ec.message(); + LOG(log_.error()) << "Failed to bind to endpoint: " << endpoint << ". message: " << ec.message(); throw std::runtime_error( fmt::format("Failed to bind to endpoint: {}:{}", endpoint.address().to_string(), endpoint.port())); } @@ -213,7 +213,7 @@ public: acceptor_.listen(boost::asio::socket_base::max_listen_connections, ec); if (ec) { - log_.error() << "Failed to listen at endpoint: " << endpoint << ". message: " << ec.message(); + LOG(log_.error()) << "Failed to listen at endpoint: " << endpoint << ". 
message: " << ec.message(); throw std::runtime_error( fmt::format("Failed to listen at endpoint: {}:{}", endpoint.address().to_string(), endpoint.port())); } diff --git a/src/web/impl/HttpBase.h b/src/web/impl/HttpBase.h index 1a4e1eca..3809058a 100644 --- a/src/web/impl/HttpBase.h +++ b/src/web/impl/HttpBase.h @@ -121,7 +121,7 @@ protected: if (!ec_ && ec != boost::asio::error::operation_aborted) { ec_ = ec; - perfLog_.info() << tag() << ": " << what << ": " << ec.message(); + LOG(perfLog_.info()) << tag() << ": " << what << ": " << ec.message(); boost::beast::get_lowest_layer(derived().stream()).socket().close(ec); } } @@ -139,13 +139,13 @@ public: , dosGuard_(dosGuard) , handler_(handler) { - perfLog_.debug() << tag() << "http session created"; + LOG(perfLog_.debug()) << tag() << "http session created"; dosGuard_.get().increment(ip); } virtual ~HttpBase() { - perfLog_.debug() << tag() << "http session closed"; + LOG(perfLog_.debug()) << tag() << "http session closed"; if (not upgraded) dosGuard_.get().decrement(this->clientIp); } @@ -204,7 +204,7 @@ public: boost::json::serialize(rpc::makeError(rpc::RippledError::rpcSLOW_DOWN)))); } - log_.info() << tag() << "Received request from ip = " << clientIp << " - posting to WorkQueue"; + LOG(log_.info()) << tag() << "Received request from ip = " << clientIp << " - posting to WorkQueue"; try { diff --git a/src/web/impl/WsBase.h b/src/web/impl/WsBase.h index 58aaf027..a1e1ad9c 100644 --- a/src/web/impl/WsBase.h +++ b/src/web/impl/WsBase.h @@ -63,7 +63,7 @@ protected: if (!ec_ && ec != boost::asio::error::operation_aborted) { ec_ = ec; - perfLog_.info() << tag() << ": " << what << ": " << ec.message(); + LOG(perfLog_.info()) << tag() << ": " << what << ": " << ec.message(); boost::beast::get_lowest_layer(derived().ws()).socket().close(ec); (*handler_)(ec, derived().shared_from_this()); } @@ -79,12 +79,12 @@ public: : ConnectionBase(tagFactory, ip), buffer_(std::move(buffer)), dosGuard_(dosGuard), handler_(handler) { upgraded = true; - perfLog_.debug() << tag() << "session created"; + LOG(perfLog_.debug()) << tag() << "session created"; } virtual ~WsBase() { - perfLog_.debug() << tag() << "session closed"; + LOG(perfLog_.debug()) << tag() << "session closed"; dosGuard_.get().decrement(clientIp); } @@ -193,7 +193,7 @@ public: if (ec) return wsFail(ec, "accept"); - perfLog_.info() << tag() << "accepting new connection"; + LOG(perfLog_.info()) << tag() << "accepting new connection"; doRead(); } @@ -218,7 +218,7 @@ public: if (ec) return wsFail(ec, "read"); - perfLog_.info() << tag() << "Received request from ip = " << this->clientIp; + LOG(perfLog_.info()) << tag() << "Received request from ip = " << this->clientIp; auto sendError = [this](auto error, std::string&& requestStr) { auto e = rpc::makeError(error); diff --git a/unittests/LoggerTests.cpp b/unittests/LoggerTests.cpp index 19030f52..cb900694 100644 --- a/unittests/LoggerTests.cpp +++ b/unittests/LoggerTests.cpp @@ -57,6 +57,23 @@ TEST_F(LoggerTest, Filtering) checkEqual("Trace:TRC Trace line logged for 'Trace' component"); } +TEST_F(LoggerTest, LOGMacro) +{ + Logger log{"General"}; + + auto computeCalled = false; + auto compute = [&computeCalled]() { + computeCalled = true; + return "computed"; + }; + + LOG(log.trace()) << compute(); + EXPECT_FALSE(computeCalled); + + log.trace() << compute(); + EXPECT_TRUE(computeCalled); +} + TEST_F(NoLoggerTest, Basic) { Logger log{"Trace"}; diff --git a/unittests/etl/CacheLoaderTests.cpp b/unittests/etl/CacheLoaderTests.cpp index 050a367c..e37800ce 
100644 --- a/unittests/etl/CacheLoaderTests.cpp +++ b/unittests/etl/CacheLoaderTests.cpp @@ -127,7 +127,6 @@ TEST_F(CacheLoaderTest, FromCache) .WillByDefault(Return(std::vector{keysSize - 1, Blob{'s'}})); EXPECT_CALL(*rawBackendPtr, doFetchLedgerObjects).Times(loops); - EXPECT_CALL(cache, size).Times(AtLeast(1)); EXPECT_CALL(cache, update).Times(loops); EXPECT_CALL(cache, isFull).Times(1);
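-- 
Notes on the LOG macro introduced in src/util/log/Logger.h above.

The macro works because Logger::Pump now converts to bool: true only when the
underlying boost::log pump was created, i.e. the requested severity passed the
filter. LOG(x) expands to an if/else around that test, so the streamed operands
on the right-hand side are never evaluated when the level is filtered out,
which is the point of the change. An illustrative expansion:

    // LOG(log.trace()) << ripple::strHex(key);
    // expands to:
    if (!log.trace())
    {
        // trace is disabled: the whole statement is a no-op and
        // ripple::strHex(key) is never called
    }
    else
        log.trace() << ripple::strHex(key);  // trace is enabled: operands are
                                             // evaluated and forwarded as before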
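A common reason for the empty-brace if/else shape, rather than a plain
"if (x) x", is that it keeps a statement-like macro safe under an unbraced
if/else at the call site: with the plain form, the caller's else would silently
bind to the macro's if. Hypothetical caller code (maybeForward and
handleLocally are illustrative names, not part of this patch):

    if (maybeForward)
        LOG(log_.warn()) << "request not forwarded";
    else
        handleLocally();  // with the "if (!x) {} else x" form this else still
                          // binds to "if (maybeForward)", as intended; a bare
                          // "if (x) x" macro would capture it instead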
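The practical effect is easiest to see at a hot call site, taking one of the
lines changed in src/etl/impl/LedgerFetcher.h above: response->DebugString()
serializes the entire protobuf reply, and as an operand of the stream
expression it was evaluated even when trace logging was filtered out. Wrapped
in LOG, the serialization is skipped unless trace is enabled:

    // before: DebugString() always runs; the result may then be discarded
    log_.trace() << "GetLedger reply = " << response->DebugString();

    // after: DebugString() runs only when the trace level is enabled
    LOG(log_.trace()) << "GetLedger reply = " << response->DebugString();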