mirror of
https://github.com/XRPLF/clio.git
synced 2025-11-29 08:05:50 +00:00
@@ -39,7 +39,7 @@ std::shared_ptr<BackendInterface>
|
||||
make_Backend(boost::asio::io_context& ioc, util::Config const& config)
|
||||
{
|
||||
static util::Logger log{"Backend"};
|
||||
log.info() << "Constructing BackendInterface";
|
||||
LOG(log.info()) << "Constructing BackendInterface";
|
||||
|
||||
auto const readOnly = config.valueOr("read_only", false);
|
||||
|
||||
@@ -63,7 +63,7 @@ make_Backend(boost::asio::io_context& ioc, util::Config const& config)
|
||||
backend->updateRange(rng->maxSequence);
|
||||
}
|
||||
|
||||
log.info() << "Constructed BackendInterface Successfully";
|
||||
LOG(log.info()) << "Constructed BackendInterface Successfully";
|
||||
return backend;
|
||||
}
|
||||
} // namespace data
|
||||
|
||||
@@ -32,11 +32,11 @@ namespace data {
|
||||
bool
|
||||
BackendInterface::finishWrites(std::uint32_t const ledgerSequence)
|
||||
{
|
||||
gLog.debug() << "Want finish writes for " << ledgerSequence;
|
||||
LOG(gLog.debug()) << "Want finish writes for " << ledgerSequence;
|
||||
auto commitRes = doFinishWrites();
|
||||
if (commitRes)
|
||||
{
|
||||
gLog.debug() << "Successfully commited. Updating range now to " << ledgerSequence;
|
||||
LOG(gLog.debug()) << "Successfully commited. Updating range now to " << ledgerSequence;
|
||||
updateRange(ledgerSequence);
|
||||
}
|
||||
return commitRes;
|
||||
@@ -64,17 +64,17 @@ BackendInterface::fetchLedgerObject(
|
||||
auto obj = cache_.get(key, sequence);
|
||||
if (obj)
|
||||
{
|
||||
gLog.trace() << "Cache hit - " << ripple::strHex(key);
|
||||
LOG(gLog.trace()) << "Cache hit - " << ripple::strHex(key);
|
||||
return *obj;
|
||||
}
|
||||
else
|
||||
{
|
||||
gLog.trace() << "Cache miss - " << ripple::strHex(key);
|
||||
LOG(gLog.trace()) << "Cache miss - " << ripple::strHex(key);
|
||||
auto dbObj = doFetchLedgerObject(key, sequence, yield);
|
||||
if (!dbObj)
|
||||
gLog.trace() << "Missed cache and missed in db";
|
||||
LOG(gLog.trace()) << "Missed cache and missed in db";
|
||||
else
|
||||
gLog.trace() << "Missed cache but found in db";
|
||||
LOG(gLog.trace()) << "Missed cache but found in db";
|
||||
return dbObj;
|
||||
}
|
||||
}
|
||||
@@ -96,7 +96,7 @@ BackendInterface::fetchLedgerObjects(
|
||||
else
|
||||
misses.push_back(keys[i]);
|
||||
}
|
||||
gLog.trace() << "Cache hits = " << keys.size() - misses.size() << " - cache misses = " << misses.size();
|
||||
LOG(gLog.trace()) << "Cache hits = " << keys.size() - misses.size() << " - cache misses = " << misses.size();
|
||||
|
||||
if (misses.size())
|
||||
{
|
||||
@@ -122,9 +122,9 @@ BackendInterface::fetchSuccessorKey(
|
||||
{
|
||||
auto succ = cache_.getSuccessor(key, ledgerSequence);
|
||||
if (succ)
|
||||
gLog.trace() << "Cache hit - " << ripple::strHex(key);
|
||||
LOG(gLog.trace()) << "Cache hit - " << ripple::strHex(key);
|
||||
else
|
||||
gLog.trace() << "Cache miss - " << ripple::strHex(key);
|
||||
LOG(gLog.trace()) << "Cache miss - " << ripple::strHex(key);
|
||||
return succ ? succ->key : doFetchSuccessorKey(key, ledgerSequence, yield);
|
||||
}
|
||||
|
||||
@@ -174,7 +174,7 @@ BackendInterface::fetchBookOffers(
|
||||
succMillis += getMillis(mid2 - mid1);
|
||||
if (!offerDir || offerDir->key >= bookEnd)
|
||||
{
|
||||
gLog.trace() << "offerDir.has_value() " << offerDir.has_value() << " breaking";
|
||||
LOG(gLog.trace()) << "offerDir.has_value() " << offerDir.has_value() << " breaking";
|
||||
break;
|
||||
}
|
||||
uTipIndex = offerDir->key;
|
||||
@@ -187,7 +187,7 @@ BackendInterface::fetchBookOffers(
|
||||
auto next = sle.getFieldU64(ripple::sfIndexNext);
|
||||
if (!next)
|
||||
{
|
||||
gLog.trace() << "Next is empty. breaking";
|
||||
LOG(gLog.trace()) << "Next is empty. breaking";
|
||||
break;
|
||||
}
|
||||
auto nextKey = ripple::keylet::page(uTipIndex, next);
|
||||
@@ -203,13 +203,13 @@ BackendInterface::fetchBookOffers(
|
||||
auto objs = fetchLedgerObjects(keys, ledgerSequence, yield);
|
||||
for (size_t i = 0; i < keys.size() && i < limit; ++i)
|
||||
{
|
||||
gLog.trace() << "Key = " << ripple::strHex(keys[i]) << " blob = " << ripple::strHex(objs[i])
|
||||
LOG(gLog.trace()) << "Key = " << ripple::strHex(keys[i]) << " blob = " << ripple::strHex(objs[i])
|
||||
<< " ledgerSequence = " << ledgerSequence;
|
||||
assert(objs[i].size());
|
||||
page.offers.push_back({keys[i], objs[i]});
|
||||
}
|
||||
auto end = std::chrono::system_clock::now();
|
||||
gLog.debug() << "Fetching " << std::to_string(keys.size()) << " offers took "
|
||||
LOG(gLog.debug()) << "Fetching " << std::to_string(keys.size()) << " offers took "
|
||||
<< std::to_string(getMillis(mid - begin)) << " milliseconds. Fetching next dir took "
|
||||
<< std::to_string(succMillis) << " milliseonds. Fetched next dir " << std::to_string(numSucc)
|
||||
<< " times"
|
||||
@@ -276,14 +276,14 @@ BackendInterface::fetchLedgerPage(
|
||||
page.objects.push_back({std::move(keys[i]), std::move(objects[i])});
|
||||
else if (!outOfOrder)
|
||||
{
|
||||
gLog.error() << "Deleted or non-existent object in successor table. key = " << ripple::strHex(keys[i])
|
||||
LOG(gLog.error()) << "Deleted or non-existent object in successor table. key = " << ripple::strHex(keys[i])
|
||||
<< " - seq = " << ledgerSequence;
|
||||
std::stringstream msg;
|
||||
for (size_t j = 0; j < objects.size(); ++j)
|
||||
{
|
||||
msg << " - " << ripple::strHex(keys[j]);
|
||||
}
|
||||
gLog.error() << msg.str();
|
||||
LOG(gLog.error()) << msg.str();
|
||||
}
|
||||
}
|
||||
if (keys.size() && !reachedEnd)
|
||||
@@ -302,7 +302,7 @@ BackendInterface::fetchFees(std::uint32_t const seq, boost::asio::yield_context
|
||||
|
||||
if (!bytes)
|
||||
{
|
||||
gLog.error() << "Could not find fees";
|
||||
LOG(gLog.error()) << "Could not find fees";
|
||||
return {};
|
||||
}
|
||||
|
||||
|
||||
@@ -70,7 +70,7 @@ retryOnTimeout(FnType func, size_t waitMs = 500)
|
||||
}
|
||||
catch (DatabaseTimeout const&)
|
||||
{
|
||||
log.error() << "Database request timed out. Sleeping and retrying ... ";
|
||||
LOG(log.error()) << "Database request timed out. Sleeping and retrying ... ";
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(waitMs));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -93,11 +93,11 @@ public:
|
||||
}
|
||||
catch (std::runtime_error const& ex)
|
||||
{
|
||||
log_.error() << "Failed to prepare the statements: " << ex.what() << "; readOnly: " << readOnly;
|
||||
LOG(log_.error()) << "Failed to prepare the statements: " << ex.what() << "; readOnly: " << readOnly;
|
||||
throw;
|
||||
}
|
||||
|
||||
log_.info() << "Created (revamped) CassandraBackend";
|
||||
LOG(log_.info()) << "Created (revamped) CassandraBackend";
|
||||
}
|
||||
|
||||
TransactionsAndCursor
|
||||
@@ -123,7 +123,7 @@ public:
|
||||
if (cursor)
|
||||
{
|
||||
statement.bindAt(1, cursor->asTuple());
|
||||
log_.debug() << "account = " << ripple::strHex(account) << " tuple = " << cursor->ledgerSequence
|
||||
LOG(log_.debug()) << "account = " << ripple::strHex(account) << " tuple = " << cursor->ledgerSequence
|
||||
<< cursor->transactionIndex;
|
||||
}
|
||||
else
|
||||
@@ -132,7 +132,8 @@ public:
|
||||
auto const placeHolder = forward ? 0u : std::numeric_limits<std::uint32_t>::max();
|
||||
|
||||
statement.bindAt(1, std::make_tuple(placeHolder, placeHolder));
|
||||
log_.debug() << "account = " << ripple::strHex(account) << " idx = " << seq << " tuple = " << placeHolder;
|
||||
LOG(log_.debug()) << "account = " << ripple::strHex(account) << " idx = " << seq
|
||||
<< " tuple = " << placeHolder;
|
||||
}
|
||||
|
||||
// FIXME: Limit is a hack to support uint32_t properly for the time
|
||||
@@ -143,20 +144,20 @@ public:
|
||||
auto const& results = res.value();
|
||||
if (not results.hasRows())
|
||||
{
|
||||
log_.debug() << "No rows returned";
|
||||
LOG(log_.debug()) << "No rows returned";
|
||||
return {};
|
||||
}
|
||||
|
||||
std::vector<ripple::uint256> hashes = {};
|
||||
auto numRows = results.numRows();
|
||||
log_.info() << "num_rows = " << numRows;
|
||||
LOG(log_.info()) << "num_rows = " << numRows;
|
||||
|
||||
for (auto [hash, data] : extract<ripple::uint256, std::tuple<uint32_t, uint32_t>>(results))
|
||||
{
|
||||
hashes.push_back(hash);
|
||||
if (--numRows == 0)
|
||||
{
|
||||
log_.debug() << "Setting cursor";
|
||||
LOG(log_.debug()) << "Setting cursor";
|
||||
cursor = data;
|
||||
|
||||
// forward queries by ledger/tx sequence `>=`
|
||||
@@ -167,11 +168,11 @@ public:
|
||||
}
|
||||
|
||||
auto const txns = fetchTransactions(hashes, yield);
|
||||
log_.debug() << "Txns = " << txns.size();
|
||||
LOG(log_.debug()) << "Txns = " << txns.size();
|
||||
|
||||
if (txns.size() == limit)
|
||||
{
|
||||
log_.debug() << "Returning cursor";
|
||||
LOG(log_.debug()) << "Returning cursor";
|
||||
return {txns, cursor};
|
||||
}
|
||||
|
||||
@@ -191,11 +192,11 @@ public:
|
||||
|
||||
if (not executeSyncUpdate(schema_->updateLedgerRange.bind(ledgerSequence_, true, ledgerSequence_ - 1)))
|
||||
{
|
||||
log_.warn() << "Update failed for ledger " << ledgerSequence_;
|
||||
LOG(log_.warn()) << "Update failed for ledger " << ledgerSequence_;
|
||||
return false;
|
||||
}
|
||||
|
||||
log_.info() << "Committed ledger " << ledgerSequence_;
|
||||
LOG(log_.info()) << "Committed ledger " << ledgerSequence_;
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -219,15 +220,15 @@ public:
|
||||
if (auto const maybeValue = result.template get<uint32_t>(); maybeValue)
|
||||
return maybeValue;
|
||||
|
||||
log_.error() << "Could not fetch latest ledger - no rows";
|
||||
LOG(log_.error()) << "Could not fetch latest ledger - no rows";
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
log_.error() << "Could not fetch latest ledger - no result";
|
||||
LOG(log_.error()) << "Could not fetch latest ledger - no result";
|
||||
}
|
||||
else
|
||||
{
|
||||
log_.error() << "Could not fetch latest ledger: " << res.error();
|
||||
LOG(log_.error()) << "Could not fetch latest ledger: " << res.error();
|
||||
}
|
||||
|
||||
return std::nullopt;
|
||||
@@ -246,15 +247,15 @@ public:
|
||||
return util::deserializeHeader(ripple::makeSlice(*maybeValue));
|
||||
}
|
||||
|
||||
log_.error() << "Could not fetch ledger by sequence - no rows";
|
||||
LOG(log_.error()) << "Could not fetch ledger by sequence - no rows";
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
log_.error() << "Could not fetch ledger by sequence - no result";
|
||||
LOG(log_.error()) << "Could not fetch ledger by sequence - no result";
|
||||
}
|
||||
else
|
||||
{
|
||||
log_.error() << "Could not fetch ledger by sequence: " << res.error();
|
||||
LOG(log_.error()) << "Could not fetch ledger by sequence: " << res.error();
|
||||
}
|
||||
|
||||
return std::nullopt;
|
||||
@@ -270,15 +271,15 @@ public:
|
||||
if (auto const maybeValue = result.template get<uint32_t>(); maybeValue)
|
||||
return fetchLedgerBySequence(*maybeValue, yield);
|
||||
|
||||
log_.error() << "Could not fetch ledger by hash - no rows";
|
||||
LOG(log_.error()) << "Could not fetch ledger by hash - no rows";
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
log_.error() << "Could not fetch ledger by hash - no result";
|
||||
LOG(log_.error()) << "Could not fetch ledger by hash - no result";
|
||||
}
|
||||
else
|
||||
{
|
||||
log_.error() << "Could not fetch ledger by hash: " << res.error();
|
||||
LOG(log_.error()) << "Could not fetch ledger by hash: " << res.error();
|
||||
}
|
||||
|
||||
return std::nullopt;
|
||||
@@ -292,7 +293,7 @@ public:
|
||||
auto const& results = res.value();
|
||||
if (not results.hasRows())
|
||||
{
|
||||
log_.debug() << "Could not fetch ledger range - no rows";
|
||||
LOG(log_.debug()) << "Could not fetch ledger range - no rows";
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
@@ -314,12 +315,13 @@ public:
|
||||
if (range.minSequence > range.maxSequence)
|
||||
std::swap(range.minSequence, range.maxSequence);
|
||||
|
||||
log_.debug() << "After hardFetchLedgerRange range is " << range.minSequence << ":" << range.maxSequence;
|
||||
LOG(log_.debug()) << "After hardFetchLedgerRange range is " << range.minSequence << ":"
|
||||
<< range.maxSequence;
|
||||
return range;
|
||||
}
|
||||
else
|
||||
{
|
||||
log_.error() << "Could not fetch ledger range: " << res.error();
|
||||
LOG(log_.error()) << "Could not fetch ledger range: " << res.error();
|
||||
}
|
||||
|
||||
return std::nullopt;
|
||||
@@ -341,14 +343,14 @@ public:
|
||||
|
||||
if (not res)
|
||||
{
|
||||
log_.error() << "Could not fetch all transaction hashes: " << res.error();
|
||||
LOG(log_.error()) << "Could not fetch all transaction hashes: " << res.error();
|
||||
return {};
|
||||
}
|
||||
|
||||
auto const& result = res.value();
|
||||
if (not result.hasRows())
|
||||
{
|
||||
log_.error() << "Could not fetch all transaction hashes - no rows; ledger = "
|
||||
LOG(log_.error()) << "Could not fetch all transaction hashes - no rows; ledger = "
|
||||
<< std::to_string(ledgerSequence);
|
||||
return {};
|
||||
}
|
||||
@@ -358,8 +360,9 @@ public:
|
||||
hashes.push_back(std::move(hash));
|
||||
|
||||
auto end = std::chrono::system_clock::now();
|
||||
log_.debug() << "Fetched " << hashes.size() << " transaction hashes from Cassandra in "
|
||||
<< std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() << " milliseconds";
|
||||
LOG(log_.debug()) << "Fetched " << hashes.size() << " transaction hashes from Cassandra in "
|
||||
<< std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
|
||||
<< " milliseconds";
|
||||
|
||||
return hashes;
|
||||
}
|
||||
@@ -398,7 +401,7 @@ public:
|
||||
return result;
|
||||
}
|
||||
|
||||
log_.error() << "Could not fetch NFT - no rows";
|
||||
LOG(log_.error()) << "Could not fetch NFT - no rows";
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
@@ -425,7 +428,7 @@ public:
|
||||
if (cursor)
|
||||
{
|
||||
statement.bindAt(1, cursor->asTuple());
|
||||
log_.debug() << "token_id = " << ripple::strHex(tokenID) << " tuple = " << cursor->ledgerSequence
|
||||
LOG(log_.debug()) << "token_id = " << ripple::strHex(tokenID) << " tuple = " << cursor->ledgerSequence
|
||||
<< cursor->transactionIndex;
|
||||
}
|
||||
else
|
||||
@@ -434,7 +437,8 @@ public:
|
||||
auto const placeHolder = forward ? 0 : std::numeric_limits<std::uint32_t>::max();
|
||||
|
||||
statement.bindAt(1, std::make_tuple(placeHolder, placeHolder));
|
||||
log_.debug() << "token_id = " << ripple::strHex(tokenID) << " idx = " << seq << " tuple = " << placeHolder;
|
||||
LOG(log_.debug()) << "token_id = " << ripple::strHex(tokenID) << " idx = " << seq
|
||||
<< " tuple = " << placeHolder;
|
||||
}
|
||||
|
||||
statement.bindAt(2, Limit{limit});
|
||||
@@ -443,20 +447,20 @@ public:
|
||||
auto const& results = res.value();
|
||||
if (not results.hasRows())
|
||||
{
|
||||
log_.debug() << "No rows returned";
|
||||
LOG(log_.debug()) << "No rows returned";
|
||||
return {};
|
||||
}
|
||||
|
||||
std::vector<ripple::uint256> hashes = {};
|
||||
auto numRows = results.numRows();
|
||||
log_.info() << "num_rows = " << numRows;
|
||||
LOG(log_.info()) << "num_rows = " << numRows;
|
||||
|
||||
for (auto [hash, data] : extract<ripple::uint256, std::tuple<uint32_t, uint32_t>>(results))
|
||||
{
|
||||
hashes.push_back(hash);
|
||||
if (--numRows == 0)
|
||||
{
|
||||
log_.debug() << "Setting cursor";
|
||||
LOG(log_.debug()) << "Setting cursor";
|
||||
cursor = data;
|
||||
|
||||
// forward queries by ledger/tx sequence `>=`
|
||||
@@ -467,11 +471,11 @@ public:
|
||||
}
|
||||
|
||||
auto const txns = fetchTransactions(hashes, yield);
|
||||
log_.debug() << "NFT Txns = " << txns.size();
|
||||
LOG(log_.debug()) << "NFT Txns = " << txns.size();
|
||||
|
||||
if (txns.size() == limit)
|
||||
{
|
||||
log_.debug() << "Returning cursor";
|
||||
LOG(log_.debug()) << "Returning cursor";
|
||||
return {txns, cursor};
|
||||
}
|
||||
|
||||
@@ -482,7 +486,7 @@ public:
|
||||
doFetchLedgerObject(ripple::uint256 const& key, std::uint32_t const sequence, boost::asio::yield_context yield)
|
||||
const override
|
||||
{
|
||||
log_.debug() << "Fetching ledger object for seq " << sequence << ", key = " << ripple::to_string(key);
|
||||
LOG(log_.debug()) << "Fetching ledger object for seq " << sequence << ", key = " << ripple::to_string(key);
|
||||
if (auto const res = executor_.read(yield, schema_->selectObject, key, sequence); res)
|
||||
{
|
||||
if (auto const result = res->template get<Blob>(); result)
|
||||
@@ -492,12 +496,12 @@ public:
|
||||
}
|
||||
else
|
||||
{
|
||||
log_.debug() << "Could not fetch ledger object - no rows";
|
||||
LOG(log_.debug()) << "Could not fetch ledger object - no rows";
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
log_.error() << "Could not fetch ledger object: " << res.error();
|
||||
LOG(log_.error()) << "Could not fetch ledger object: " << res.error();
|
||||
}
|
||||
|
||||
return std::nullopt;
|
||||
@@ -515,12 +519,12 @@ public:
|
||||
}
|
||||
else
|
||||
{
|
||||
log_.debug() << "Could not fetch transaction - no rows";
|
||||
LOG(log_.debug()) << "Could not fetch transaction - no rows";
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
log_.error() << "Could not fetch transaction: " << res.error();
|
||||
LOG(log_.error()) << "Could not fetch transaction: " << res.error();
|
||||
}
|
||||
|
||||
return std::nullopt;
|
||||
@@ -540,12 +544,12 @@ public:
|
||||
}
|
||||
else
|
||||
{
|
||||
log_.debug() << "Could not fetch successor - no rows";
|
||||
LOG(log_.debug()) << "Could not fetch successor - no rows";
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
log_.error() << "Could not fetch successor: " << res.error();
|
||||
LOG(log_.error()) << "Could not fetch successor: " << res.error();
|
||||
}
|
||||
|
||||
return std::nullopt;
|
||||
@@ -585,7 +589,8 @@ public:
|
||||
});
|
||||
|
||||
assert(numHashes == results.size());
|
||||
log_.debug() << "Fetched " << numHashes << " transactions from Cassandra in " << timeDiff << " milliseconds";
|
||||
LOG(log_.debug()) << "Fetched " << numHashes << " transactions from Cassandra in " << timeDiff
|
||||
<< " milliseconds";
|
||||
return results;
|
||||
}
|
||||
|
||||
@@ -599,7 +604,7 @@ public:
|
||||
return {};
|
||||
|
||||
auto const numKeys = keys.size();
|
||||
log_.trace() << "Fetching " << numKeys << " objects";
|
||||
LOG(log_.trace()) << "Fetching " << numKeys << " objects";
|
||||
|
||||
std::vector<Blob> results;
|
||||
results.reserve(numKeys);
|
||||
@@ -622,7 +627,7 @@ public:
|
||||
return {};
|
||||
});
|
||||
|
||||
log_.trace() << "Fetched " << numKeys << " objects";
|
||||
LOG(log_.trace()) << "Fetched " << numKeys << " objects";
|
||||
return results;
|
||||
}
|
||||
|
||||
@@ -633,14 +638,14 @@ public:
|
||||
auto const res = executor_.read(yield, schema_->selectDiff, ledgerSequence);
|
||||
if (not res)
|
||||
{
|
||||
log_.error() << "Could not fetch ledger diff: " << res.error() << "; ledger = " << ledgerSequence;
|
||||
LOG(log_.error()) << "Could not fetch ledger diff: " << res.error() << "; ledger = " << ledgerSequence;
|
||||
return {};
|
||||
}
|
||||
|
||||
auto const& results = res.value();
|
||||
if (not results)
|
||||
{
|
||||
log_.error() << "Could not fetch ledger diff - no rows; ledger = " << ledgerSequence;
|
||||
LOG(log_.error()) << "Could not fetch ledger diff - no rows; ledger = " << ledgerSequence;
|
||||
return {};
|
||||
}
|
||||
|
||||
@@ -655,7 +660,8 @@ public:
|
||||
if (keys.empty())
|
||||
return {};
|
||||
|
||||
log_.debug() << "Fetched " << keys.size() << " diff hashes from Cassandra in " << timeDiff << " milliseconds";
|
||||
LOG(log_.debug()) << "Fetched " << keys.size() << " diff hashes from Cassandra in " << timeDiff
|
||||
<< " milliseconds";
|
||||
|
||||
auto const objs = fetchLedgerObjects(keys, ledgerSequence, yield);
|
||||
std::vector<LedgerObject> results;
|
||||
@@ -676,7 +682,7 @@ public:
|
||||
void
|
||||
doWriteLedgerObject(std::string&& key, std::uint32_t const seq, std::string&& blob) override
|
||||
{
|
||||
log_.trace() << " Writing ledger object " << key.size() << ":" << seq << " [" << blob.size() << " bytes]";
|
||||
LOG(log_.trace()) << " Writing ledger object " << key.size() << ":" << seq << " [" << blob.size() << " bytes]";
|
||||
|
||||
if (range)
|
||||
executor_.write(schema_->insertDiff, seq, key);
|
||||
@@ -687,7 +693,7 @@ public:
|
||||
void
|
||||
writeSuccessor(std::string&& key, std::uint32_t const seq, std::string&& successor) override
|
||||
{
|
||||
log_.trace() << "Writing successor. key = " << key.size() << " bytes. "
|
||||
LOG(log_.trace()) << "Writing successor. key = " << key.size() << " bytes. "
|
||||
<< " seq = " << std::to_string(seq) << " successor = " << successor.size() << " bytes.";
|
||||
assert(key.size() != 0);
|
||||
assert(successor.size() != 0);
|
||||
@@ -740,7 +746,7 @@ public:
|
||||
std::string&& transaction,
|
||||
std::string&& metadata) override
|
||||
{
|
||||
log_.trace() << "Writing txn to cassandra";
|
||||
LOG(log_.trace()) << "Writing txn to cassandra";
|
||||
|
||||
executor_.write(schema_->insertLedgerTransaction, seq, hash);
|
||||
executor_.write(
|
||||
@@ -798,13 +804,13 @@ private:
|
||||
auto maybeSuccess = res->template get<bool>();
|
||||
if (not maybeSuccess)
|
||||
{
|
||||
log_.error() << "executeSyncUpdate - error getting result - no row";
|
||||
LOG(log_.error()) << "executeSyncUpdate - error getting result - no row";
|
||||
return false;
|
||||
}
|
||||
|
||||
if (not maybeSuccess.value())
|
||||
{
|
||||
log_.warn() << "Update failed. Checking if DB state is what we expect";
|
||||
LOG(log_.warn()) << "Update failed. Checking if DB state is what we expect";
|
||||
|
||||
// error may indicate that another writer wrote something.
|
||||
// in this case let's just compare the current state of things
|
||||
|
||||
@@ -641,9 +641,9 @@ public:
|
||||
void
|
||||
prepareStatements(Handle const& handle)
|
||||
{
|
||||
log_.info() << "Preparing cassandra statements";
|
||||
LOG(log_.info()) << "Preparing cassandra statements";
|
||||
statements_ = std::make_unique<Statements>(settingsProvider_, handle);
|
||||
log_.info() << "Finished preparing statements";
|
||||
LOG(log_.info()) << "Finished preparing statements";
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -143,18 +143,18 @@ Cluster::Cluster(Settings const& settings) : ManagedObject{cass_cluster_new(), c
|
||||
return maybeValue ? to_string(*maybeValue) : "default";
|
||||
};
|
||||
|
||||
log_.info() << "Threads: " << settings.threads;
|
||||
log_.info() << "Max concurrent requests per host: " << settings.maxConcurrentRequestsThreshold;
|
||||
log_.info() << "Max connections per host: " << settings.maxConnectionsPerHost;
|
||||
log_.info() << "Core connections per host: " << settings.coreConnectionsPerHost;
|
||||
log_.info() << "IO queue size: " << queueSize;
|
||||
log_.info() << "Event queue size: " << valueOrDefault(settings.queueSizeEvent);
|
||||
log_.info() << "Write bytes high watermark: " << valueOrDefault(settings.writeBytesHighWatermark);
|
||||
log_.info() << "Write bytes low watermark: " << valueOrDefault(settings.writeBytesLowWatermark);
|
||||
log_.info() << "Pending requests high watermark: " << valueOrDefault(settings.pendingRequestsHighWatermark);
|
||||
log_.info() << "Pending requests low watermark: " << valueOrDefault(settings.pendingRequestsLowWatermark);
|
||||
log_.info() << "Max requests per flush: " << valueOrDefault(settings.maxRequestsPerFlush);
|
||||
log_.info() << "Max concurrent creation: " << valueOrDefault(settings.maxConcurrentCreation);
|
||||
LOG(log_.info()) << "Threads: " << settings.threads;
|
||||
LOG(log_.info()) << "Max concurrent requests per host: " << settings.maxConcurrentRequestsThreshold;
|
||||
LOG(log_.info()) << "Max connections per host: " << settings.maxConnectionsPerHost;
|
||||
LOG(log_.info()) << "Core connections per host: " << settings.coreConnectionsPerHost;
|
||||
LOG(log_.info()) << "IO queue size: " << queueSize;
|
||||
LOG(log_.info()) << "Event queue size: " << valueOrDefault(settings.queueSizeEvent);
|
||||
LOG(log_.info()) << "Write bytes high watermark: " << valueOrDefault(settings.writeBytesHighWatermark);
|
||||
LOG(log_.info()) << "Write bytes low watermark: " << valueOrDefault(settings.writeBytesLowWatermark);
|
||||
LOG(log_.info()) << "Pending requests high watermark: " << valueOrDefault(settings.pendingRequestsHighWatermark);
|
||||
LOG(log_.info()) << "Pending requests low watermark: " << valueOrDefault(settings.pendingRequestsLowWatermark);
|
||||
LOG(log_.info()) << "Max requests per flush: " << valueOrDefault(settings.maxRequestsPerFlush);
|
||||
LOG(log_.info()) << "Max concurrent creation: " << valueOrDefault(settings.maxConcurrentCreation);
|
||||
}
|
||||
|
||||
void
|
||||
@@ -178,7 +178,7 @@ Cluster::setupContactPoints(Settings::ContactPoints const& points)
|
||||
};
|
||||
|
||||
{
|
||||
log_.debug() << "Attempt connection using contact points: " << points.contactPoints;
|
||||
LOG(log_.debug()) << "Attempt connection using contact points: " << points.contactPoints;
|
||||
auto const rc = cass_cluster_set_contact_points(*this, points.contactPoints.data());
|
||||
throwErrorIfNeeded(rc, "contact_points", points.contactPoints);
|
||||
}
|
||||
@@ -193,7 +193,7 @@ Cluster::setupContactPoints(Settings::ContactPoints const& points)
|
||||
void
|
||||
Cluster::setupSecureBundle(Settings::SecureConnectionBundle const& bundle)
|
||||
{
|
||||
log_.debug() << "Attempt connection using secure bundle";
|
||||
LOG(log_.debug()) << "Attempt connection using secure bundle";
|
||||
if (auto const rc = cass_cluster_set_cloud_secure_connection_bundle(*this, bundle.bundle.data()); rc != CASS_OK)
|
||||
{
|
||||
throw std::runtime_error("Failed to connect using secure connection bundle " + bundle.bundle);
|
||||
@@ -206,7 +206,7 @@ Cluster::setupCertificate(Settings const& settings)
|
||||
if (not settings.certificate)
|
||||
return;
|
||||
|
||||
log_.debug() << "Configure SSL context";
|
||||
LOG(log_.debug()) << "Configure SSL context";
|
||||
SslContext context = SslContext(*settings.certificate);
|
||||
cass_cluster_set_ssl(*this, context);
|
||||
}
|
||||
@@ -217,7 +217,7 @@ Cluster::setupCredentials(Settings const& settings)
|
||||
if (not settings.username || not settings.password)
|
||||
return;
|
||||
|
||||
log_.debug() << "Set credentials; username: " << settings.username.value();
|
||||
LOG(log_.debug()) << "Set credentials; username: " << settings.username.value();
|
||||
cass_cluster_set_credentials(*this, settings.username.value().c_str(), settings.password.value().c_str());
|
||||
}
|
||||
|
||||
|
||||
@@ -89,7 +89,7 @@ public:
|
||||
, handle_{std::cref(handle)}
|
||||
, thread_{[this]() { ioc_.run(); }}
|
||||
{
|
||||
log_.info() << "Max write requests outstanding is " << maxWriteRequestsOutstanding_
|
||||
LOG(log_.info()) << "Max write requests outstanding is " << maxWriteRequestsOutstanding_
|
||||
<< "; Max read requests outstanding is " << maxReadRequestsOutstanding_;
|
||||
}
|
||||
|
||||
@@ -106,10 +106,10 @@ public:
|
||||
void
|
||||
sync()
|
||||
{
|
||||
log_.debug() << "Waiting to sync all writes...";
|
||||
LOG(log_.debug()) << "Waiting to sync all writes...";
|
||||
std::unique_lock<std::mutex> lck(syncMutex_);
|
||||
syncCv_.wait(lck, [this]() { return finishedAllWriteRequests(); });
|
||||
log_.debug() << "Sync done.";
|
||||
LOG(log_.debug()) << "Sync done.";
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -137,7 +137,7 @@ public:
|
||||
}
|
||||
else
|
||||
{
|
||||
log_.warn() << "Cassandra sync write error, retrying: " << res.error();
|
||||
LOG(log_.warn()) << "Cassandra sync write error, retrying: " << res.error();
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(5));
|
||||
}
|
||||
}
|
||||
@@ -262,7 +262,7 @@ public:
|
||||
}
|
||||
else
|
||||
{
|
||||
log_.error() << "Failed batch read in coroutine: " << res.error();
|
||||
LOG(log_.error()) << "Failed batch read in coroutine: " << res.error();
|
||||
throwErrorIfNeeded(res.error());
|
||||
}
|
||||
}
|
||||
@@ -311,7 +311,7 @@ public:
|
||||
}
|
||||
else
|
||||
{
|
||||
log_.error() << "Failed read in coroutine: " << res.error();
|
||||
LOG(log_.error()) << "Failed read in coroutine: " << res.error();
|
||||
throwErrorIfNeeded(res.error());
|
||||
}
|
||||
}
|
||||
@@ -400,7 +400,7 @@ private:
|
||||
std::unique_lock<std::mutex> lck(throttleMutex_);
|
||||
if (!canAddWriteRequest())
|
||||
{
|
||||
log_.trace() << "Max outstanding requests reached. "
|
||||
LOG(log_.trace()) << "Max outstanding requests reached. "
|
||||
<< "Waiting for other requests to finish";
|
||||
throttleCv_.wait(lck, [this]() { return canAddWriteRequest(); });
|
||||
}
|
||||
|
||||
@@ -59,7 +59,7 @@ public:
|
||||
shouldRetry([[maybe_unused]] CassandraError err)
|
||||
{
|
||||
auto const delay = calculateDelay(attempt_);
|
||||
log_.error() << "Cassandra write error: " << err << ", current retries " << attempt_ << ", retrying in "
|
||||
LOG(log_.error()) << "Cassandra write error: " << err << ", current retries " << attempt_ << ", retrying in "
|
||||
<< delay.count() << " milliseconds";
|
||||
|
||||
return true; // keep retrying forever
|
||||
|
||||
@@ -29,7 +29,7 @@ ETLService::runETLPipeline(uint32_t startSequence, uint32_t numExtractors)
|
||||
if (finishSequence_ && startSequence > *finishSequence_)
|
||||
return {};
|
||||
|
||||
log_.debug() << "Starting etl pipeline";
|
||||
LOG(log_.debug()) << "Starting etl pipeline";
|
||||
state_.isWriting = true;
|
||||
|
||||
auto rng = backend_->hardFetchLedgerRangeNoThrow();
|
||||
@@ -57,12 +57,12 @@ ETLService::runETLPipeline(uint32_t startSequence, uint32_t numExtractors)
|
||||
|
||||
auto const end = std::chrono::system_clock::now();
|
||||
auto const lastPublishedSeq = ledgerPublisher_.getLastPublishedSequence();
|
||||
log_.debug() << "Extracted and wrote " << lastPublishedSeq.value_or(startSequence) - startSequence << " in "
|
||||
LOG(log_.debug()) << "Extracted and wrote " << lastPublishedSeq.value_or(startSequence) - startSequence << " in "
|
||||
<< ((end - begin).count()) / 1000000000.0;
|
||||
|
||||
state_.isWriting = false;
|
||||
|
||||
log_.debug() << "Stopping etl pipeline";
|
||||
LOG(log_.debug()) << "Stopping etl pipeline";
|
||||
return lastPublishedSeq;
|
||||
}
|
||||
|
||||
@@ -80,30 +80,30 @@ ETLService::monitor()
|
||||
auto rng = backend_->hardFetchLedgerRangeNoThrow();
|
||||
if (!rng)
|
||||
{
|
||||
log_.info() << "Database is empty. Will download a ledger from the network.";
|
||||
LOG(log_.info()) << "Database is empty. Will download a ledger from the network.";
|
||||
std::optional<ripple::LedgerHeader> ledger;
|
||||
|
||||
try
|
||||
{
|
||||
if (startSequence_)
|
||||
{
|
||||
log_.info() << "ledger sequence specified in config. "
|
||||
LOG(log_.info()) << "ledger sequence specified in config. "
|
||||
<< "Will begin ETL process starting with ledger " << *startSequence_;
|
||||
ledger = ledgerLoader_.loadInitialLedger(*startSequence_);
|
||||
}
|
||||
else
|
||||
{
|
||||
log_.info() << "Waiting for next ledger to be validated by network...";
|
||||
LOG(log_.info()) << "Waiting for next ledger to be validated by network...";
|
||||
std::optional<uint32_t> mostRecentValidated = networkValidatedLedgers_->getMostRecent();
|
||||
|
||||
if (mostRecentValidated)
|
||||
{
|
||||
log_.info() << "Ledger " << *mostRecentValidated << " has been validated. Downloading...";
|
||||
LOG(log_.info()) << "Ledger " << *mostRecentValidated << " has been validated. Downloading...";
|
||||
ledger = ledgerLoader_.loadInitialLedger(*mostRecentValidated);
|
||||
}
|
||||
else
|
||||
{
|
||||
log_.info() << "The wait for the next validated ledger has been aborted. Exiting monitor loop";
|
||||
LOG(log_.info()) << "The wait for the next validated ledger has been aborted. Exiting monitor loop";
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -124,23 +124,23 @@ ETLService::monitor()
|
||||
}
|
||||
else
|
||||
{
|
||||
log_.error() << "Failed to load initial ledger. Exiting monitor loop";
|
||||
LOG(log_.error()) << "Failed to load initial ledger. Exiting monitor loop";
|
||||
return;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (startSequence_)
|
||||
log_.warn() << "start sequence specified but db is already populated";
|
||||
LOG(log_.warn()) << "start sequence specified but db is already populated";
|
||||
|
||||
log_.info() << "Database already populated. Picking up from the tip of history";
|
||||
LOG(log_.info()) << "Database already populated. Picking up from the tip of history";
|
||||
cacheLoader_.load(rng->maxSequence);
|
||||
}
|
||||
|
||||
assert(rng);
|
||||
uint32_t nextSequence = rng->maxSequence + 1;
|
||||
|
||||
log_.debug() << "Database is populated. "
|
||||
LOG(log_.debug()) << "Database is populated. "
|
||||
<< "Starting monitor loop. sequence = " << nextSequence;
|
||||
|
||||
while (true)
|
||||
@@ -152,7 +152,7 @@ ETLService::monitor()
|
||||
}
|
||||
else if (networkValidatedLedgers_->waitUntilValidatedByNetwork(nextSequence, 1000))
|
||||
{
|
||||
log_.info() << "Ledger with sequence = " << nextSequence << " has been validated by the network. "
|
||||
LOG(log_.info()) << "Ledger with sequence = " << nextSequence << " has been validated by the network. "
|
||||
<< "Attempting to find in database and publish";
|
||||
|
||||
// Attempt to take over responsibility of ETL writer after 10 failed
|
||||
@@ -166,11 +166,11 @@ ETLService::monitor()
|
||||
|
||||
if (!success)
|
||||
{
|
||||
log_.warn() << "Failed to publish ledger with sequence = " << nextSequence << " . Beginning ETL";
|
||||
LOG(log_.warn()) << "Failed to publish ledger with sequence = " << nextSequence << " . Beginning ETL";
|
||||
|
||||
// returns the most recent sequence published empty optional if no sequence was published
|
||||
std::optional<uint32_t> lastPublished = runETLPipeline(nextSequence, extractorThreads_);
|
||||
log_.info() << "Aborting ETL. Falling back to publishing";
|
||||
LOG(log_.info()) << "Aborting ETL. Falling back to publishing";
|
||||
|
||||
// if no ledger was published, don't increment nextSequence
|
||||
if (lastPublished)
|
||||
@@ -187,7 +187,7 @@ ETLService::monitor()
|
||||
void
|
||||
ETLService::monitorReadOnly()
|
||||
{
|
||||
log_.debug() << "Starting reporting in strict read only mode";
|
||||
LOG(log_.debug()) << "Starting reporting in strict read only mode";
|
||||
|
||||
auto rng = backend_->hardFetchLedgerRangeNoThrow();
|
||||
uint32_t latestSequence;
|
||||
@@ -226,7 +226,7 @@ ETLService::monitorReadOnly()
|
||||
void
|
||||
ETLService::run()
|
||||
{
|
||||
log_.info() << "Starting reporting etl";
|
||||
LOG(log_.info()) << "Starting reporting etl";
|
||||
state_.isStopping = false;
|
||||
|
||||
doWork();
|
||||
|
||||
@@ -150,8 +150,8 @@ public:
|
||||
*/
|
||||
~ETLService()
|
||||
{
|
||||
log_.info() << "onStop called";
|
||||
log_.debug() << "Stopping Reporting ETL";
|
||||
LOG(log_.info()) << "onStop called";
|
||||
LOG(log_.debug()) << "Stopping Reporting ETL";
|
||||
|
||||
state_.isStopping = true;
|
||||
cacheLoader_.stop();
|
||||
@@ -159,7 +159,7 @@ public:
|
||||
if (worker_.joinable())
|
||||
worker_.join();
|
||||
|
||||
log_.debug() << "Joined ETLService worker thread";
|
||||
LOG(log_.debug()) << "Joined ETLService worker thread";
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -83,7 +83,7 @@ LoadBalancer::LoadBalancer(
|
||||
std::unique_ptr<Source> source = make_Source(entry, ioc, backend, subscriptions, validatedLedgers, *this);
|
||||
|
||||
sources_.push_back(std::move(source));
|
||||
log_.info() << "Added etl source - " << sources_.back()->toString();
|
||||
LOG(log_.info()) << "Added etl source - " << sources_.back()->toString();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -101,7 +101,7 @@ LoadBalancer::loadInitialLedger(uint32_t sequence, bool cacheOnly)
|
||||
auto [data, res] = source->loadInitialLedger(sequence, downloadRanges_, cacheOnly);
|
||||
|
||||
if (!res)
|
||||
log_.error() << "Failed to download initial ledger."
|
||||
LOG(log_.error()) << "Failed to download initial ledger."
|
||||
<< " Sequence = " << sequence << " source = " << source->toString();
|
||||
else
|
||||
response = std::move(data);
|
||||
@@ -122,13 +122,13 @@ LoadBalancer::fetchLedger(uint32_t ledgerSequence, bool getObjects, bool getObje
|
||||
response = std::move(data);
|
||||
if (status.ok() && response.validated())
|
||||
{
|
||||
log.info() << "Successfully fetched ledger = " << ledgerSequence
|
||||
LOG(log.info()) << "Successfully fetched ledger = " << ledgerSequence
|
||||
<< " from source = " << source->toString();
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
log.warn() << "Could not fetch ledger " << ledgerSequence << ", Reply: " << response.DebugString()
|
||||
LOG(log.warn()) << "Could not fetch ledger " << ledgerSequence << ", Reply: " << response.DebugString()
|
||||
<< ", error_code: " << status.error_code() << ", error_msg: " << status.error_message()
|
||||
<< ", source = " << source->toString();
|
||||
return false;
|
||||
@@ -201,7 +201,7 @@ LoadBalancer::execute(Func f, uint32_t ledgerSequence)
|
||||
{
|
||||
auto& source = sources_[sourceIdx];
|
||||
|
||||
log_.debug() << "Attempting to execute func. ledger sequence = " << ledgerSequence
|
||||
LOG(log_.debug()) << "Attempting to execute func. ledger sequence = " << ledgerSequence
|
||||
<< " - source = " << source->toString();
|
||||
// Originally, it was (source->hasLedger(ledgerSequence) || true)
|
||||
/* Sometimes rippled has ledger but doesn't actually know. However,
|
||||
@@ -212,26 +212,27 @@ LoadBalancer::execute(Func f, uint32_t ledgerSequence)
|
||||
bool res = f(source);
|
||||
if (res)
|
||||
{
|
||||
log_.debug() << "Successfully executed func at source = " << source->toString()
|
||||
LOG(log_.debug()) << "Successfully executed func at source = " << source->toString()
|
||||
<< " - ledger sequence = " << ledgerSequence;
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
log_.warn() << "Failed to execute func at source = " << source->toString()
|
||||
LOG(log_.warn()) << "Failed to execute func at source = " << source->toString()
|
||||
<< " - ledger sequence = " << ledgerSequence;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
log_.warn() << "Ledger not present at source = " << source->toString()
|
||||
LOG(log_.warn()) << "Ledger not present at source = " << source->toString()
|
||||
<< " - ledger sequence = " << ledgerSequence;
|
||||
}
|
||||
sourceIdx = (sourceIdx + 1) % sources_.size();
|
||||
numAttempts++;
|
||||
if (numAttempts % sources_.size() == 0)
|
||||
{
|
||||
log_.info() << "Ledger sequence " << ledgerSequence << " is not yet available from any configured sources. "
|
||||
LOG(log_.info()) << "Ledger sequence " << ledgerSequence
|
||||
<< " is not yet available from any configured sources. "
|
||||
<< "Sleeping and trying again";
|
||||
std::this_thread::sleep_for(std::chrono::seconds(2));
|
||||
}
|
||||
|
||||
@@ -155,7 +155,7 @@ ProbingSource::make_SSLHooks() noexcept
|
||||
{
|
||||
plainSrc_->pause();
|
||||
currentSrc_ = sslSrc_;
|
||||
log_.info() << "Selected WSS as the main source: " << currentSrc_->toString();
|
||||
LOG(log_.info()) << "Selected WSS as the main source: " << currentSrc_->toString();
|
||||
}
|
||||
return SourceHooks::Action::PROCEED;
|
||||
},
|
||||
@@ -184,7 +184,7 @@ ProbingSource::make_PlainHooks() noexcept
|
||||
{
|
||||
sslSrc_->pause();
|
||||
currentSrc_ = plainSrc_;
|
||||
log_.info() << "Selected Plain WS as the main source: " << currentSrc_->toString();
|
||||
LOG(log_.info()) << "Selected Plain WS as the main source: " << currentSrc_->toString();
|
||||
}
|
||||
return SourceHooks::Action::PROCEED;
|
||||
},
|
||||
|
||||
@@ -59,7 +59,7 @@ PlainSource::close(bool startAgain)
|
||||
derived().ws().async_close(boost::beast::websocket::close_code::normal, [this, startAgain](auto ec) {
|
||||
if (ec)
|
||||
{
|
||||
log_.error() << " async_close : "
|
||||
LOG(log_.error()) << " async_close : "
|
||||
<< "error code = " << ec << " - " << toString();
|
||||
}
|
||||
closing_ = false;
|
||||
@@ -94,7 +94,7 @@ SslSource::close(bool startAgain)
|
||||
derived().ws().async_close(boost::beast::websocket::close_code::normal, [this, startAgain](auto ec) {
|
||||
if (ec)
|
||||
{
|
||||
log_.error() << " async_close : "
|
||||
LOG(log_.error()) << " async_close : "
|
||||
<< "error code = " << ec << " - " << toString();
|
||||
}
|
||||
closing_ = false;
|
||||
|
||||
@@ -273,11 +273,11 @@ public:
|
||||
chArgs.SetMaxReceiveMessageSize(-1);
|
||||
stub_ = org::xrpl::rpc::v1::XRPLedgerAPIService::NewStub(
|
||||
grpc::CreateCustomChannel(ss.str(), grpc::InsecureChannelCredentials(), chArgs));
|
||||
log_.debug() << "Made stub for remote = " << toString();
|
||||
LOG(log_.debug()) << "Made stub for remote = " << toString();
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
log_.debug() << "Exception while creating stub = " << e.what() << " . Remote = " << toString();
|
||||
LOG(log_.debug()) << "Exception while creating stub = " << e.what() << " . Remote = " << toString();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -305,13 +305,13 @@ public:
|
||||
std::string const& clientIp,
|
||||
boost::asio::yield_context yield) const override
|
||||
{
|
||||
log_.trace() << "Attempting to forward request to tx. "
|
||||
LOG(log_.trace()) << "Attempting to forward request to tx. "
|
||||
<< "request = " << boost::json::serialize(request);
|
||||
|
||||
boost::json::object response;
|
||||
if (!isConnected())
|
||||
{
|
||||
log_.error() << "Attempted to proxy but failed to connect to tx";
|
||||
LOG(log_.error()) << "Attempted to proxy but failed to connect to tx";
|
||||
return {};
|
||||
}
|
||||
|
||||
@@ -365,7 +365,7 @@ public:
|
||||
|
||||
if (!parsed.is_object())
|
||||
{
|
||||
log_.error() << "Error parsing response: " << std::string{begin, end};
|
||||
LOG(log_.error()) << "Error parsing response: " << std::string{begin, end};
|
||||
return {};
|
||||
}
|
||||
|
||||
@@ -376,7 +376,7 @@ public:
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
log_.error() << "Encountered exception : " << e.what();
|
||||
LOG(log_.error()) << "Encountered exception : " << e.what();
|
||||
return {};
|
||||
}
|
||||
}
|
||||
@@ -481,7 +481,7 @@ public:
|
||||
calls.emplace_back(sequence, markers[i], nextMarker);
|
||||
}
|
||||
|
||||
log_.debug() << "Starting data download for ledger " << sequence << ". Using source = " << toString();
|
||||
LOG(log_.debug()) << "Starting data download for ledger " << sequence << ". Using source = " << toString();
|
||||
|
||||
for (auto& c : calls)
|
||||
c.call(stub_, cq);
|
||||
@@ -499,18 +499,18 @@ public:
|
||||
|
||||
if (!ok)
|
||||
{
|
||||
log_.error() << "loadInitialLedger - ok is false";
|
||||
LOG(log_.error()) << "loadInitialLedger - ok is false";
|
||||
return {{}, false}; // handle cancelled
|
||||
}
|
||||
else
|
||||
{
|
||||
log_.trace() << "Marker prefix = " << ptr->getMarkerPrefix();
|
||||
LOG(log_.trace()) << "Marker prefix = " << ptr->getMarkerPrefix();
|
||||
|
||||
auto result = ptr->process(stub_, cq, *backend_, abort, cacheOnly);
|
||||
if (result != etl::detail::AsyncCallData::CallStatus::MORE)
|
||||
{
|
||||
++numFinished;
|
||||
log_.debug() << "Finished a marker. "
|
||||
LOG(log_.debug()) << "Finished a marker. "
|
||||
<< "Current number of finished = " << numFinished;
|
||||
|
||||
std::string lastKey = ptr->getLastKey();
|
||||
@@ -524,13 +524,13 @@ public:
|
||||
|
||||
if (backend_->cache().size() > progress)
|
||||
{
|
||||
log_.info() << "Downloaded " << backend_->cache().size() << " records from rippled";
|
||||
LOG(log_.info()) << "Downloaded " << backend_->cache().size() << " records from rippled";
|
||||
progress += incr;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log_.info() << "Finished loadInitialLedger. cache size = " << backend_->cache().size();
|
||||
LOG(log_.info()) << "Finished loadInitialLedger. cache size = " << backend_->cache().size();
|
||||
return {std::move(edgeKeys), !abort};
|
||||
}
|
||||
|
||||
@@ -540,7 +540,7 @@ public:
|
||||
{
|
||||
if (auto resp = forwardCache_.get(request); resp)
|
||||
{
|
||||
log_.debug() << "request hit forwardCache";
|
||||
LOG(log_.debug()) << "request hit forwardCache";
|
||||
return resp;
|
||||
}
|
||||
|
||||
@@ -607,7 +607,7 @@ public:
|
||||
{"streams", {"ledger", "manifests", "validations", "transactions_proposed"}},
|
||||
};
|
||||
std::string s = boost::json::serialize(jv);
|
||||
log_.trace() << "Sending subscribe stream message";
|
||||
LOG(log_.trace()) << "Sending subscribe stream message";
|
||||
|
||||
derived().ws().set_option(
|
||||
boost::beast::websocket::stream_base::decorator([](boost::beast::websocket::request_type& req) {
|
||||
@@ -689,12 +689,12 @@ public:
|
||||
setValidatedRange({validatedLedgers.data(), validatedLedgers.size()});
|
||||
}
|
||||
|
||||
log_.info() << "Received a message on ledger "
|
||||
LOG(log_.info()) << "Received a message on ledger "
|
||||
<< " subscription stream. Message : " << response << " - " << toString();
|
||||
}
|
||||
else if (response.contains("type") && response.at("type") == "ledgerClosed")
|
||||
{
|
||||
log_.info() << "Received a message on ledger "
|
||||
LOG(log_.info()) << "Received a message on ledger "
|
||||
<< " subscription stream. Message : " << response << " - " << toString();
|
||||
if (response.contains("ledger_index"))
|
||||
{
|
||||
@@ -728,7 +728,7 @@ public:
|
||||
|
||||
if (ledgerIndex != 0)
|
||||
{
|
||||
log_.trace() << "Pushing ledger sequence = " << ledgerIndex << " - " << toString();
|
||||
LOG(log_.trace()) << "Pushing ledger sequence = " << ledgerIndex << " - " << toString();
|
||||
networkValidatedLedgers_->push(ledgerIndex);
|
||||
}
|
||||
|
||||
@@ -736,7 +736,7 @@ public:
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
log_.error() << "Exception in handleMessage : " << e.what();
|
||||
LOG(log_.error()) << "Exception in handleMessage : " << e.what();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@@ -780,16 +780,16 @@ protected:
|
||||
::ERR_error_string_n(ec.value(), buf, sizeof(buf));
|
||||
err += buf;
|
||||
|
||||
log_.error() << err;
|
||||
LOG(log_.error()) << err;
|
||||
}
|
||||
|
||||
if (ec != boost::asio::error::operation_aborted && ec != boost::asio::error::connection_refused)
|
||||
{
|
||||
log_.error() << "error code = " << ec << " - " << toString();
|
||||
LOG(log_.error()) << "error code = " << ec << " - " << toString();
|
||||
}
|
||||
else
|
||||
{
|
||||
log_.warn() << "error code = " << ec << " - " << toString();
|
||||
LOG(log_.warn()) << "error code = " << ec << " - " << toString();
|
||||
}
|
||||
|
||||
// exponentially increasing timeouts, with a max of 30 seconds
|
||||
|
||||
@@ -57,7 +57,7 @@ public:
|
||||
|
||||
unsigned char prefix = marker.data()[0];
|
||||
|
||||
log_.debug() << "Setting up AsyncCallData. marker = " << ripple::strHex(marker)
|
||||
LOG(log_.debug()) << "Setting up AsyncCallData. marker = " << ripple::strHex(marker)
|
||||
<< " . prefix = " << ripple::strHex(std::string(1, prefix))
|
||||
<< " . nextPrefix_ = " << ripple::strHex(std::string(1, nextPrefix_));
|
||||
|
||||
@@ -78,22 +78,22 @@ public:
|
||||
bool abort,
|
||||
bool cacheOnly = false)
|
||||
{
|
||||
log_.trace() << "Processing response. "
|
||||
LOG(log_.trace()) << "Processing response. "
|
||||
<< "Marker prefix = " << getMarkerPrefix();
|
||||
if (abort)
|
||||
{
|
||||
log_.error() << "AsyncCallData aborted";
|
||||
LOG(log_.error()) << "AsyncCallData aborted";
|
||||
return CallStatus::ERRORED;
|
||||
}
|
||||
if (!status_.ok())
|
||||
{
|
||||
log_.error() << "AsyncCallData status_ not ok: "
|
||||
LOG(log_.error()) << "AsyncCallData status_ not ok: "
|
||||
<< " code = " << status_.error_code() << " message = " << status_.error_message();
|
||||
return CallStatus::ERRORED;
|
||||
}
|
||||
if (!next_->is_unlimited())
|
||||
{
|
||||
log_.warn() << "AsyncCallData is_unlimited is false. Make sure "
|
||||
LOG(log_.warn()) << "AsyncCallData is_unlimited is false. Make sure "
|
||||
"secure_gateway is set correctly at the ETL source";
|
||||
}
|
||||
|
||||
@@ -118,7 +118,7 @@ public:
|
||||
}
|
||||
|
||||
auto const numObjects = cur_->ledger_objects().objects_size();
|
||||
log_.debug() << "Writing " << numObjects << " objects";
|
||||
LOG(log_.debug()) << "Writing " << numObjects << " objects";
|
||||
|
||||
std::vector<data::LedgerObject> cacheUpdates;
|
||||
cacheUpdates.reserve(numObjects);
|
||||
@@ -145,7 +145,7 @@ public:
|
||||
}
|
||||
}
|
||||
backend.cache().update(cacheUpdates, request_.ledger().sequence(), cacheOnly);
|
||||
log_.debug() << "Wrote " << numObjects << " objects. Got more: " << (more ? "YES" : "NO");
|
||||
LOG(log_.debug()) << "Wrote " << numObjects << " objects. Got more: " << (more ? "YES" : "NO");
|
||||
|
||||
return more ? CallStatus::MORE : CallStatus::DONE;
|
||||
}
|
||||
|
||||
@@ -132,7 +132,7 @@ public:
|
||||
if (cacheLoadStyle_ == LoadStyle::NOT_AT_ALL)
|
||||
{
|
||||
cache_.get().setDisabled();
|
||||
log_.warn() << "Cache is disabled. Not loading";
|
||||
LOG(log_.warn()) << "Cache is disabled. Not loading";
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -165,10 +165,10 @@ public:
|
||||
// If loading synchronously, poll cache until full
|
||||
while (cacheLoadStyle_ == LoadStyle::SYNC && not cache_.get().isFull())
|
||||
{
|
||||
log_.debug() << "Cache not full. Cache size = " << cache_.get().size() << ". Sleeping ...";
|
||||
LOG(log_.debug()) << "Cache not full. Cache size = " << cache_.get().size() << ". Sleeping ...";
|
||||
std::this_thread::sleep_for(std::chrono::seconds(10));
|
||||
if (cache_.get().isFull())
|
||||
log_.info() << "Cache is full. Cache size = " << cache_.get().size();
|
||||
LOG(log_.info()) << "Cache is full. Cache size = " << cache_.get().size();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -186,7 +186,7 @@ private:
|
||||
std::string const& port,
|
||||
boost::asio::yield_context yield)
|
||||
{
|
||||
log_.info() << "Loading cache from peer. ip = " << ip << " . port = " << port;
|
||||
LOG(log_.info()) << "Loading cache from peer. ip = " << ip << " . port = " << port;
|
||||
namespace beast = boost::beast; // from <boost/beast.hpp>
|
||||
namespace http = beast::http; // from <boost/beast/http.hpp>
|
||||
namespace websocket = beast::websocket; // from
|
||||
@@ -198,7 +198,7 @@ private:
|
||||
// These objects perform our I/O
|
||||
tcp::resolver resolver{ioContext_.get()};
|
||||
|
||||
log_.trace() << "Creating websocket";
|
||||
LOG(log_.trace()) << "Creating websocket";
|
||||
auto ws = std::make_unique<websocket::stream<beast::tcp_stream>>(ioContext_.get());
|
||||
|
||||
// Look up the domain name
|
||||
@@ -206,13 +206,13 @@ private:
|
||||
if (ec)
|
||||
return {};
|
||||
|
||||
log_.trace() << "Connecting websocket";
|
||||
LOG(log_.trace()) << "Connecting websocket";
|
||||
// Make the connection on the IP address we get from a lookup
|
||||
ws->next_layer().async_connect(results, yield[ec]);
|
||||
if (ec)
|
||||
return false;
|
||||
|
||||
log_.trace() << "Performing websocket handshake";
|
||||
LOG(log_.trace()) << "Performing websocket handshake";
|
||||
// Perform the websocket handshake
|
||||
ws->async_handshake(ip, "/", yield[ec]);
|
||||
if (ec)
|
||||
@@ -220,7 +220,7 @@ private:
|
||||
|
||||
std::optional<boost::json::value> marker;
|
||||
|
||||
log_.trace() << "Sending request";
|
||||
LOG(log_.trace()) << "Sending request";
|
||||
auto getRequest = [&](auto marker) {
|
||||
boost::json::object request = {
|
||||
{"command", "ledger_data"},
|
||||
@@ -242,7 +242,7 @@ private:
|
||||
ws->async_write(net::buffer(boost::json::serialize(getRequest(marker))), yield[ec]);
|
||||
if (ec)
|
||||
{
|
||||
log_.error() << "error writing = " << ec.message();
|
||||
LOG(log_.error()) << "error writing = " << ec.message();
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -250,7 +250,7 @@ private:
|
||||
ws->async_read(buffer, yield[ec]);
|
||||
if (ec)
|
||||
{
|
||||
log_.error() << "error reading = " << ec.message();
|
||||
LOG(log_.error()) << "error reading = " << ec.message();
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -259,27 +259,28 @@ private:
|
||||
|
||||
if (!parsed.is_object())
|
||||
{
|
||||
log_.error() << "Error parsing response: " << raw;
|
||||
LOG(log_.error()) << "Error parsing response: " << raw;
|
||||
return false;
|
||||
}
|
||||
log_.trace() << "Successfully parsed response " << parsed;
|
||||
LOG(log_.trace()) << "Successfully parsed response " << parsed;
|
||||
|
||||
if (auto const& response = parsed.as_object(); response.contains("error"))
|
||||
{
|
||||
log_.error() << "Response contains error: " << response;
|
||||
LOG(log_.error()) << "Response contains error: " << response;
|
||||
auto const& err = response.at("error");
|
||||
if (err.is_string() && err.as_string() == "lgrNotFound")
|
||||
{
|
||||
++numAttempts;
|
||||
if (numAttempts >= 5)
|
||||
{
|
||||
log_.error() << " ledger not found at peer after 5 attempts. "
|
||||
LOG(log_.error()) << " ledger not found at peer after 5 attempts. "
|
||||
"peer = "
|
||||
<< ip << " ledger = " << ledgerIndex
|
||||
<< ". Check your config and the health of the peer";
|
||||
return false;
|
||||
}
|
||||
log_.warn() << "Ledger not found. ledger = " << ledgerIndex << ". Sleeping and trying again";
|
||||
LOG(log_.warn()) << "Ledger not found. ledger = " << ledgerIndex
|
||||
<< ". Sleeping and trying again";
|
||||
std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||
continue;
|
||||
}
|
||||
@@ -290,7 +291,7 @@ private:
|
||||
|
||||
if (!response.contains("cache_full") || !response.at("cache_full").as_bool())
|
||||
{
|
||||
log_.error() << "cache not full for clio node. ip = " << ip;
|
||||
LOG(log_.error()) << "cache not full for clio node. ip = " << ip;
|
||||
return false;
|
||||
}
|
||||
if (response.contains("marker"))
|
||||
@@ -310,7 +311,7 @@ private:
|
||||
|
||||
if (!stateObject.key.parseHex(obj.at("index").as_string().c_str()))
|
||||
{
|
||||
log_.error() << "failed to parse object id";
|
||||
LOG(log_.error()) << "failed to parse object id";
|
||||
return false;
|
||||
}
|
||||
boost::algorithm::unhex(obj.at("data").as_string().c_str(), std::back_inserter(stateObject.blob));
|
||||
@@ -319,17 +320,17 @@ private:
|
||||
cache_.get().update(objects, ledgerIndex, true);
|
||||
|
||||
if (marker)
|
||||
log_.debug() << "At marker " << *marker;
|
||||
LOG(log_.debug()) << "At marker " << *marker;
|
||||
} while (marker || !started);
|
||||
|
||||
log_.info() << "Finished downloading ledger from clio node. ip = " << ip;
|
||||
LOG(log_.info()) << "Finished downloading ledger from clio node. ip = " << ip;
|
||||
|
||||
cache_.get().setFull();
|
||||
return true;
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
log_.error() << "Encountered exception : " << e.what() << " - ip = " << ip;
|
||||
LOG(log_.error()) << "Encountered exception : " << e.what() << " - ip = " << ip;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@@ -366,8 +367,8 @@ private:
|
||||
if (c)
|
||||
cursorStr << ripple::strHex(*c) << ", ";
|
||||
|
||||
log_.info() << "Loading cache. num cursors = " << cursors.size() - 1;
|
||||
log_.trace() << "cursors = " << cursorStr.str();
|
||||
LOG(log_.info()) << "Loading cache. num cursors = " << cursors.size() - 1;
|
||||
LOG(log_.trace()) << "cursors = " << cursorStr.str();
|
||||
|
||||
thread_ = std::thread{[this, seq, cursors]() {
|
||||
auto startTime = std::chrono::system_clock::now();
|
||||
@@ -388,7 +389,7 @@ private:
|
||||
std::optional<ripple::uint256> cursor = start;
|
||||
std::string cursorStr =
|
||||
cursor.has_value() ? ripple::strHex(cursor.value()) : ripple::strHex(data::firstKey);
|
||||
log_.debug() << "Starting a cursor: " << cursorStr << " markers = " << *markers;
|
||||
LOG(log_.debug()) << "Starting a cursor: " << cursorStr << " markers = " << *markers;
|
||||
|
||||
while (not stopping_)
|
||||
{
|
||||
@@ -401,7 +402,7 @@ private:
|
||||
if (!res.cursor || (end && *(res.cursor) > *end))
|
||||
break;
|
||||
|
||||
log_.trace() << "Loading cache. cache size = " << cache_.get().size()
|
||||
LOG(log_.trace()) << "Loading cache. cache size = " << cache_.get().size()
|
||||
<< " - cursor = " << ripple::strHex(res.cursor.value())
|
||||
<< " start = " << cursorStr << " markers = " << *markers;
|
||||
|
||||
@@ -416,13 +417,13 @@ private:
|
||||
auto endTime = std::chrono::system_clock::now();
|
||||
auto duration = std::chrono::duration_cast<std::chrono::seconds>(endTime - startTime);
|
||||
|
||||
log_.info() << "Finished loading cache. cache size = " << cache_.get().size() << ". Took "
|
||||
<< duration.count() << " seconds";
|
||||
LOG(log_.info()) << "Finished loading cache. cache size = " << cache_.get().size()
|
||||
<< ". Took " << duration.count() << " seconds";
|
||||
cache_.get().setFull();
|
||||
}
|
||||
else
|
||||
{
|
||||
log_.info() << "Finished a cursor. num remaining = " << *numRemaining
|
||||
LOG(log_.info()) << "Finished a cursor. num remaining = " << *numRemaining
|
||||
<< " start = " << cursorStr << " markers = " << *markers;
|
||||
}
|
||||
});
|
||||
|
||||
@@ -126,7 +126,7 @@ private:
|
||||
std::shared_ptr<QueueType>
|
||||
getQueue(uint32_t sequence)
|
||||
{
|
||||
log_.debug() << "Grabbing extraction queue for " << sequence << "; start was " << startSequence_;
|
||||
LOG(log_.debug()) << "Grabbing extraction queue for " << sequence << "; start was " << startSequence_;
|
||||
return queues_[(sequence - startSequence_) % stride_];
|
||||
}
|
||||
};
|
||||
|
||||
@@ -103,7 +103,7 @@ private:
|
||||
|
||||
// TODO: extract this part into a strategy perhaps
|
||||
auto const tps = fetchResponse->transactions_list().transactions_size() / time;
|
||||
log_.info() << "Extract phase time = " << time << "; Extract phase tps = " << tps
|
||||
LOG(log_.info()) << "Extract phase time = " << time << "; Extract phase tps = " << tps
|
||||
<< "; Avg extract time = " << totalTime / (currentSequence - startSequence_ + 1)
|
||||
<< "; seq = " << currentSequence;
|
||||
|
||||
|
||||
@@ -29,7 +29,7 @@ namespace etl::detail {
|
||||
void
|
||||
ForwardCache::freshen()
|
||||
{
|
||||
log_.trace() << "Freshening ForwardCache";
|
||||
LOG(log_.trace()) << "Freshening ForwardCache";
|
||||
|
||||
auto numOutstanding = std::make_shared<std::atomic_uint>(latestForwarded_.size());
|
||||
|
||||
|
||||
@@ -66,11 +66,11 @@ public:
|
||||
OptionalGetLedgerResponseType
|
||||
fetchData(uint32_t sequence)
|
||||
{
|
||||
log_.debug() << "Attempting to fetch ledger with sequence = " << sequence;
|
||||
LOG(log_.debug()) << "Attempting to fetch ledger with sequence = " << sequence;
|
||||
|
||||
auto response = loadBalancer_->fetchLedger(sequence, false, false);
|
||||
if (response)
|
||||
log_.trace() << "GetLedger reply = " << response->DebugString();
|
||||
LOG(log_.trace()) << "GetLedger reply = " << response->DebugString();
|
||||
return response;
|
||||
}
|
||||
|
||||
@@ -87,12 +87,12 @@ public:
|
||||
OptionalGetLedgerResponseType
|
||||
fetchDataAndDiff(uint32_t sequence)
|
||||
{
|
||||
log_.debug() << "Attempting to fetch ledger with sequence = " << sequence;
|
||||
LOG(log_.debug()) << "Attempting to fetch ledger with sequence = " << sequence;
|
||||
|
||||
auto response = loadBalancer_->fetchLedger(
|
||||
sequence, true, !backend_->cache().isFull() || backend_->cache().latestLedgerSequence() >= sequence);
|
||||
if (response)
|
||||
log_.trace() << "GetLedger reply = " << response->DebugString();
|
||||
LOG(log_.trace()) << "GetLedger reply = " << response->DebugString();
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
@@ -98,7 +98,7 @@ public:
|
||||
ripple::SerialIter it{raw->data(), raw->size()};
|
||||
ripple::STTx sttx{it};
|
||||
|
||||
log_.trace() << "Inserting transaction = " << sttx.getTransactionID();
|
||||
LOG(log_.trace()) << "Inserting transaction = " << sttx.getTransactionID();
|
||||
|
||||
ripple::TxMeta txMeta{sttx.getTransactionID(), ledger.seq, txn.metadata_blob()};
|
||||
|
||||
@@ -149,7 +149,7 @@ public:
|
||||
auto rng = backend_->hardFetchLedgerRangeNoThrow();
|
||||
if (rng)
|
||||
{
|
||||
log_.fatal() << "Database is not empty";
|
||||
LOG(log_.fatal()) << "Database is not empty";
|
||||
assert(false);
|
||||
return {};
|
||||
}
|
||||
@@ -162,18 +162,18 @@ public:
|
||||
|
||||
ripple::LedgerHeader lgrInfo = ::util::deserializeHeader(ripple::makeSlice(ledgerData->ledger_header()));
|
||||
|
||||
log_.debug() << "Deserialized ledger header. " << ::util::toString(lgrInfo);
|
||||
LOG(log_.debug()) << "Deserialized ledger header. " << ::util::toString(lgrInfo);
|
||||
|
||||
auto timeDiff = ::util::timed<std::chrono::duration<double>>([this, sequence, &lgrInfo, &ledgerData]() {
|
||||
backend_->startWrites();
|
||||
|
||||
log_.debug() << "Started writes";
|
||||
LOG(log_.debug()) << "Started writes";
|
||||
|
||||
backend_->writeLedger(lgrInfo, std::move(*ledgerData->mutable_ledger_header()));
|
||||
|
||||
log_.debug() << "Wrote ledger";
|
||||
LOG(log_.debug()) << "Wrote ledger";
|
||||
FormattedTransactionsData insertTxResult = insertTransactions(lgrInfo, *ledgerData);
|
||||
log_.debug() << "Inserted txns";
|
||||
LOG(log_.debug()) << "Inserted txns";
|
||||
|
||||
// download the full account state map. This function downloads full
|
||||
// ledger data and pushes the downloaded data into the writeQueue.
|
||||
@@ -191,7 +191,7 @@ public:
|
||||
::util::timed<std::chrono::seconds>([this, edgeKeys = &edgeKeys, sequence, &numWrites]() {
|
||||
for (auto& key : *edgeKeys)
|
||||
{
|
||||
log_.debug() << "Writing edge key = " << ripple::strHex(key);
|
||||
LOG(log_.debug()) << "Writing edge key = " << ripple::strHex(key);
|
||||
auto succ =
|
||||
backend_->cache().getSuccessor(*ripple::uint256::fromVoidChecked(key), sequence);
|
||||
if (succ)
|
||||
@@ -215,8 +215,8 @@ public:
|
||||
assert(succ);
|
||||
if (succ->key == cur->key)
|
||||
{
|
||||
log_.debug() << "Writing book successor = " << ripple::strHex(base) << " - "
|
||||
<< ripple::strHex(cur->key);
|
||||
LOG(log_.debug()) << "Writing book successor = " << ripple::strHex(base)
|
||||
<< " - " << ripple::strHex(cur->key);
|
||||
|
||||
backend_->writeSuccessor(
|
||||
uint256ToString(base), sequence, uint256ToString(cur->key));
|
||||
@@ -228,18 +228,18 @@ public:
|
||||
|
||||
prev = std::move(cur->key);
|
||||
if (numWrites % 100000 == 0 && numWrites != 0)
|
||||
log_.info() << "Wrote " << numWrites << " book successors";
|
||||
LOG(log_.info()) << "Wrote " << numWrites << " book successors";
|
||||
}
|
||||
|
||||
backend_->writeSuccessor(uint256ToString(prev), sequence, uint256ToString(data::lastKey));
|
||||
++numWrites;
|
||||
});
|
||||
|
||||
log_.info() << "Looping through cache and submitting all writes took " << seconds
|
||||
LOG(log_.info()) << "Looping through cache and submitting all writes took " << seconds
|
||||
<< " seconds. numWrites = " << std::to_string(numWrites);
|
||||
}
|
||||
|
||||
log_.debug() << "Loaded initial ledger";
|
||||
LOG(log_.debug()) << "Loaded initial ledger";
|
||||
|
||||
if (not state_.get().isStopping)
|
||||
{
|
||||
@@ -251,7 +251,7 @@ public:
|
||||
backend_->finishWrites(sequence);
|
||||
});
|
||||
|
||||
log_.debug() << "Time to download and store ledger = " << timeDiff;
|
||||
LOG(log_.debug()) << "Time to download and store ledger = " << timeDiff;
|
||||
return lgrInfo;
|
||||
}
|
||||
};
|
||||
|
||||
@@ -89,7 +89,7 @@ public:
|
||||
bool
|
||||
publish(uint32_t ledgerSequence, std::optional<uint32_t> maxAttempts)
|
||||
{
|
||||
log_.info() << "Attempting to publish ledger = " << ledgerSequence;
|
||||
LOG(log_.info()) << "Attempting to publish ledger = " << ledgerSequence;
|
||||
size_t numAttempts = 0;
|
||||
while (not state_.get().isStopping)
|
||||
{
|
||||
@@ -97,14 +97,14 @@ public:
|
||||
|
||||
if (!range || range->maxSequence < ledgerSequence)
|
||||
{
|
||||
log_.debug() << "Trying to publish. Could not find "
|
||||
LOG(log_.debug()) << "Trying to publish. Could not find "
|
||||
"ledger with sequence = "
|
||||
<< ledgerSequence;
|
||||
|
||||
// We try maxAttempts times to publish the ledger, waiting one second in between each attempt.
|
||||
if (maxAttempts && numAttempts >= maxAttempts)
|
||||
{
|
||||
log_.debug() << "Failed to publish ledger after " << numAttempts << " attempts.";
|
||||
LOG(log_.debug()) << "Failed to publish ledger after " << numAttempts << " attempts.";
|
||||
return false;
|
||||
}
|
||||
std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||
@@ -136,11 +136,11 @@ public:
|
||||
publish(ripple::LedgerHeader const& lgrInfo)
|
||||
{
|
||||
boost::asio::post(publishStrand_, [this, lgrInfo = lgrInfo]() {
|
||||
log_.info() << "Publishing ledger " << std::to_string(lgrInfo.seq);
|
||||
LOG(log_.info()) << "Publishing ledger " << std::to_string(lgrInfo.seq);
|
||||
|
||||
if (!state_.get().isWriting)
|
||||
{
|
||||
log_.info() << "Updating cache";
|
||||
LOG(log_.info()) << "Updating cache";
|
||||
|
||||
std::vector<data::LedgerObject> diff = data::synchronousAndRetryOnTimeout(
|
||||
[&](auto yield) { return backend_->fetchLedgerDiff(lgrInfo.seq, yield); });
|
||||
@@ -177,10 +177,10 @@ public:
|
||||
subscriptions_->pubBookChanges(lgrInfo, transactions);
|
||||
|
||||
setLastPublishTime();
|
||||
log_.info() << "Published ledger " << std::to_string(lgrInfo.seq);
|
||||
LOG(log_.info()) << "Published ledger " << std::to_string(lgrInfo.seq);
|
||||
}
|
||||
else
|
||||
log_.info() << "Skipping publishing ledger " << std::to_string(lgrInfo.seq);
|
||||
LOG(log_.info()) << "Skipping publishing ledger " << std::to_string(lgrInfo.seq);
|
||||
});
|
||||
|
||||
// we track latest publish-requested seq, not necessarily already published
|
||||
|
||||
@@ -137,7 +137,7 @@ private:
|
||||
auto const end = std::chrono::system_clock::now();
|
||||
auto const duration = ((end - start).count()) / 1000000000.0;
|
||||
|
||||
log_.info() << "Load phase of etl : "
|
||||
LOG(log_.info()) << "Load phase of etl : "
|
||||
<< "Successfully wrote ledger! Ledger info: " << util::toString(lgrInfo)
|
||||
<< ". txn count = " << numTxns << ". object count = " << numObjects
|
||||
<< ". load time = " << duration << ". load txns per second = " << numTxns / duration
|
||||
@@ -148,7 +148,7 @@ private:
|
||||
}
|
||||
else
|
||||
{
|
||||
log_.error() << "Error writing ledger. " << util::toString(lgrInfo);
|
||||
LOG(log_.error()) << "Error writing ledger. " << util::toString(lgrInfo);
|
||||
}
|
||||
|
||||
setWriteConflict(not success);
|
||||
@@ -165,10 +165,10 @@ private:
|
||||
std::pair<ripple::LedgerHeader, bool>
|
||||
buildNextLedger(GetLedgerResponseType& rawData)
|
||||
{
|
||||
log_.debug() << "Beginning ledger update";
|
||||
LOG(log_.debug()) << "Beginning ledger update";
|
||||
ripple::LedgerHeader lgrInfo = ::util::deserializeHeader(ripple::makeSlice(rawData.ledger_header()));
|
||||
|
||||
log_.debug() << "Deserialized ledger header. " << ::util::toString(lgrInfo);
|
||||
LOG(log_.debug()) << "Deserialized ledger header. " << ::util::toString(lgrInfo);
|
||||
backend_->startWrites();
|
||||
backend_->writeLedger(lgrInfo, std::move(*rawData.mutable_ledger_header()));
|
||||
|
||||
@@ -178,7 +178,7 @@ private:
|
||||
{
|
||||
updateCache(lgrInfo, rawData);
|
||||
|
||||
log_.debug() << "Inserted/modified/deleted all objects. Number of objects = "
|
||||
LOG(log_.debug()) << "Inserted/modified/deleted all objects. Number of objects = "
|
||||
<< rawData.ledger_objects().objects_size();
|
||||
|
||||
insertTxResultOp.emplace(loader_.get().insertTransactions(lgrInfo, rawData));
|
||||
@@ -193,7 +193,7 @@ private:
|
||||
return {ripple::LedgerHeader{}, false};
|
||||
}
|
||||
|
||||
log_.debug() << "Inserted all transactions. Number of transactions = "
|
||||
LOG(log_.debug()) << "Inserted all transactions. Number of transactions = "
|
||||
<< rawData.transactions_list().transactions_size();
|
||||
|
||||
backend_->writeAccountTransactions(std::move(insertTxResultOp->accountTxData));
|
||||
@@ -203,8 +203,8 @@ private:
|
||||
auto [success, duration] =
|
||||
::util::timed<std::chrono::duration<double>>([&]() { return backend_->finishWrites(lgrInfo.seq); });
|
||||
|
||||
log_.debug() << "Finished writes. Total time: " << std::to_string(duration);
|
||||
log_.debug() << "Finished ledger update: " << ::util::toString(lgrInfo);
|
||||
LOG(log_.debug()) << "Finished writes. Total time: " << std::to_string(duration);
|
||||
LOG(log_.debug()) << "Finished ledger update: " << ::util::toString(lgrInfo);
|
||||
|
||||
return {lgrInfo, success};
|
||||
}
|
||||
@@ -231,11 +231,11 @@ private:
|
||||
assert(key);
|
||||
|
||||
cacheUpdates.push_back({*key, {obj.mutable_data()->begin(), obj.mutable_data()->end()}});
|
||||
log_.debug() << "key = " << ripple::strHex(*key) << " - mod type = " << obj.mod_type();
|
||||
LOG(log_.debug()) << "key = " << ripple::strHex(*key) << " - mod type = " << obj.mod_type();
|
||||
|
||||
if (obj.mod_type() != RawLedgerObjectType::MODIFIED && !rawData.object_neighbors_included())
|
||||
{
|
||||
log_.debug() << "object neighbors not included. using cache";
|
||||
LOG(log_.debug()) << "object neighbors not included. using cache";
|
||||
|
||||
if (!backend_->cache().isFull() || backend_->cache().latestLedgerSequence() != lgrInfo.seq - 1)
|
||||
throw std::runtime_error("Cache is not full, but object neighbors were not included");
|
||||
@@ -257,7 +257,7 @@ private:
|
||||
|
||||
if (checkBookBase)
|
||||
{
|
||||
log_.debug() << "Is book dir. Key = " << ripple::strHex(*key);
|
||||
LOG(log_.debug()) << "Is book dir. Key = " << ripple::strHex(*key);
|
||||
|
||||
auto const bookBase = getBookBase(*key);
|
||||
auto const oldFirstDir = backend_->cache().getSuccessor(bookBase, lgrInfo.seq - 1);
|
||||
@@ -266,7 +266,8 @@ private:
|
||||
// We deleted the first directory, or we added a directory prior to the old first directory
|
||||
if ((isDeleted && key == oldFirstDir->key) || (!isDeleted && key < oldFirstDir->key))
|
||||
{
|
||||
log_.debug() << "Need to recalculate book base successor. base = " << ripple::strHex(bookBase)
|
||||
LOG(log_.debug())
|
||||
<< "Need to recalculate book base successor. base = " << ripple::strHex(bookBase)
|
||||
<< " - key = " << ripple::strHex(*key) << " - isDeleted = " << isDeleted
|
||||
<< " - seq = " << lgrInfo.seq;
|
||||
bookSuccessorsToCalculate.insert(bookBase);
|
||||
@@ -285,7 +286,7 @@ private:
|
||||
// rippled didn't send successor information, so use our cache
|
||||
if (!rawData.object_neighbors_included())
|
||||
{
|
||||
log_.debug() << "object neighbors not included. using cache";
|
||||
LOG(log_.debug()) << "object neighbors not included. using cache";
|
||||
if (!backend_->cache().isFull() || backend_->cache().latestLedgerSequence() != lgrInfo.seq)
|
||||
throw std::runtime_error("Cache is not full, but object neighbors were not included");
|
||||
|
||||
@@ -304,7 +305,7 @@ private:
|
||||
|
||||
if (obj.blob.size() == 0)
|
||||
{
|
||||
log_.debug() << "writing successor for deleted object " << ripple::strHex(obj.key) << " - "
|
||||
LOG(log_.debug()) << "writing successor for deleted object " << ripple::strHex(obj.key) << " - "
|
||||
<< ripple::strHex(lb->key) << " - " << ripple::strHex(ub->key);
|
||||
|
||||
backend_->writeSuccessor(uint256ToString(lb->key), lgrInfo.seq, uint256ToString(ub->key));
|
||||
@@ -314,7 +315,7 @@ private:
|
||||
backend_->writeSuccessor(uint256ToString(lb->key), lgrInfo.seq, uint256ToString(obj.key));
|
||||
backend_->writeSuccessor(uint256ToString(obj.key), lgrInfo.seq, uint256ToString(ub->key));
|
||||
|
||||
log_.debug() << "writing successor for new object " << ripple::strHex(lb->key) << " - "
|
||||
LOG(log_.debug()) << "writing successor for new object " << ripple::strHex(lb->key) << " - "
|
||||
<< ripple::strHex(obj.key) << " - " << ripple::strHex(ub->key);
|
||||
}
|
||||
}
|
||||
@@ -326,14 +327,14 @@ private:
|
||||
{
|
||||
backend_->writeSuccessor(uint256ToString(base), lgrInfo.seq, uint256ToString(succ->key));
|
||||
|
||||
log_.debug() << "Updating book successor " << ripple::strHex(base) << " - "
|
||||
LOG(log_.debug()) << "Updating book successor " << ripple::strHex(base) << " - "
|
||||
<< ripple::strHex(succ->key);
|
||||
}
|
||||
else
|
||||
{
|
||||
backend_->writeSuccessor(uint256ToString(base), lgrInfo.seq, uint256ToString(data::lastKey));
|
||||
|
||||
log_.debug() << "Updating book successor " << ripple::strHex(base) << " - "
|
||||
LOG(log_.debug()) << "Updating book successor " << ripple::strHex(base) << " - "
|
||||
<< ripple::strHex(data::lastKey);
|
||||
}
|
||||
}
|
||||
@@ -352,14 +353,14 @@ private:
|
||||
// Write successor info, if included from rippled
|
||||
if (rawData.object_neighbors_included())
|
||||
{
|
||||
log_.debug() << "object neighbors included";
|
||||
LOG(log_.debug()) << "object neighbors included";
|
||||
|
||||
for (auto& obj : *(rawData.mutable_book_successors()))
|
||||
{
|
||||
auto firstBook = std::move(*obj.mutable_first_book());
|
||||
if (!firstBook.size())
|
||||
firstBook = uint256ToString(data::lastKey);
|
||||
log_.debug() << "writing book successor " << ripple::strHex(obj.book_base()) << " - "
|
||||
LOG(log_.debug()) << "writing book successor " << ripple::strHex(obj.book_base()) << " - "
|
||||
<< ripple::strHex(firstBook);
|
||||
|
||||
backend_->writeSuccessor(std::move(*obj.mutable_book_base()), lgrInfo.seq, std::move(firstBook));
|
||||
@@ -378,14 +379,14 @@ private:
|
||||
|
||||
if (obj.mod_type() == RawLedgerObjectType::DELETED)
|
||||
{
|
||||
log_.debug() << "Modifying successors for deleted object " << ripple::strHex(obj.key()) << " - "
|
||||
<< ripple::strHex(*predPtr) << " - " << ripple::strHex(*succPtr);
|
||||
LOG(log_.debug()) << "Modifying successors for deleted object " << ripple::strHex(obj.key())
|
||||
<< " - " << ripple::strHex(*predPtr) << " - " << ripple::strHex(*succPtr);
|
||||
|
||||
backend_->writeSuccessor(std::move(*predPtr), lgrInfo.seq, std::move(*succPtr));
|
||||
}
|
||||
else
|
||||
{
|
||||
log_.debug() << "adding successor for new object " << ripple::strHex(obj.key()) << " - "
|
||||
LOG(log_.debug()) << "adding successor for new object " << ripple::strHex(obj.key()) << " - "
|
||||
<< ripple::strHex(*predPtr) << " - " << ripple::strHex(*succPtr);
|
||||
|
||||
backend_->writeSuccessor(std::move(*predPtr), lgrInfo.seq, std::string{obj.key()});
|
||||
@@ -393,7 +394,7 @@ private:
|
||||
}
|
||||
}
|
||||
else
|
||||
log_.debug() << "object modified " << ripple::strHex(obj.key());
|
||||
LOG(log_.debug()) << "object modified " << ripple::strHex(obj.key());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -320,7 +320,7 @@ public:
|
||||
// We will eventually want to clamp this to be the number of strands,
|
||||
// since adding more threads than we have strands won't see any
|
||||
// performance benefits
|
||||
log_.info() << "Starting subscription manager with " << numThreads << " workers";
|
||||
LOG(log_.info()) << "Starting subscription manager with " << numThreads << " workers";
|
||||
|
||||
workers_.reserve(numThreads);
|
||||
for (auto i = numThreads; i > 0; --i)
|
||||
|
||||
@@ -164,15 +164,15 @@ try
|
||||
}
|
||||
|
||||
LogService::init(config);
|
||||
LogService::info() << "Clio version: " << Build::getClioFullVersionString();
|
||||
LOG(LogService::info()) << "Clio version: " << Build::getClioFullVersionString();
|
||||
|
||||
auto const threads = config.valueOr("io_threads", 2);
|
||||
if (threads <= 0)
|
||||
{
|
||||
LogService::fatal() << "io_threads is less than 1";
|
||||
LOG(LogService::fatal()) << "io_threads is less than 1";
|
||||
return EXIT_FAILURE;
|
||||
}
|
||||
LogService::info() << "Number of io threads = " << threads;
|
||||
LOG(LogService::info()) << "Number of io threads = " << threads;
|
||||
|
||||
// IO context to handle all incoming requests, as well as other things.
|
||||
// This is not the only io context in the application.
|
||||
@@ -224,5 +224,5 @@ try
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
LogService::fatal() << "Exit on exception: " << e.what();
|
||||
LOG(LogService::fatal()) << "Exit on exception: " << e.what();
|
||||
}
|
||||
|
||||
@@ -129,7 +129,7 @@ public:
|
||||
|
||||
if (backend_->isTooBusy())
|
||||
{
|
||||
log_.error() << "Database is too busy. Rejecting request";
|
||||
LOG(log_.error()) << "Database is too busy. Rejecting request";
|
||||
notifyTooBusy(); // TODO: should we add ctx.method if we have it?
|
||||
return Status{RippledError::rpcTOO_BUSY};
|
||||
}
|
||||
@@ -143,13 +143,13 @@ public:
|
||||
|
||||
try
|
||||
{
|
||||
perfLog_.debug() << ctx.tag() << " start executing rpc `" << ctx.method << '`';
|
||||
LOG(perfLog_.debug()) << ctx.tag() << " start executing rpc `" << ctx.method << '`';
|
||||
|
||||
auto const isAdmin = adminVerifier_.isAdmin(ctx.clientIp);
|
||||
auto const context = Context{ctx.yield, ctx.session, isAdmin, ctx.clientIp, ctx.apiVersion};
|
||||
auto const v = (*method).process(ctx.params, context);
|
||||
|
||||
perfLog_.debug() << ctx.tag() << " finish executing rpc `" << ctx.method << '`';
|
||||
LOG(perfLog_.debug()) << ctx.tag() << " finish executing rpc `" << ctx.method << '`';
|
||||
|
||||
if (v)
|
||||
return v->as_object();
|
||||
@@ -161,14 +161,14 @@ public:
|
||||
}
|
||||
catch (data::DatabaseTimeout const& t)
|
||||
{
|
||||
log_.error() << "Database timeout";
|
||||
LOG(log_.error()) << "Database timeout";
|
||||
notifyTooBusy();
|
||||
|
||||
return Status{RippledError::rpcTOO_BUSY};
|
||||
}
|
||||
catch (std::exception const& ex)
|
||||
{
|
||||
log_.error() << ctx.tag() << "Caught exception: " << ex.what();
|
||||
LOG(log_.error()) << ctx.tag() << "Caught exception: " << ex.what();
|
||||
notifyInternalError();
|
||||
|
||||
return Status{RippledError::rpcINTERNAL};
|
||||
|
||||
@@ -163,7 +163,7 @@ deserializeTxPlusMeta(data::TransactionAndMetadata const& blobs)
|
||||
std::stringstream meta;
|
||||
std::copy(blobs.transaction.begin(), blobs.transaction.end(), std::ostream_iterator<unsigned char>(txn));
|
||||
std::copy(blobs.metadata.begin(), blobs.metadata.end(), std::ostream_iterator<unsigned char>(meta));
|
||||
gLog.error() << "Failed to deserialize transaction. txn = " << txn.str() << " - meta = " << meta.str()
|
||||
LOG(gLog.error()) << "Failed to deserialize transaction. txn = " << txn.str() << " - meta = " << meta.str()
|
||||
<< " txn length = " << std::to_string(blobs.transaction.size())
|
||||
<< " meta length = " << std::to_string(blobs.metadata.size());
|
||||
throw e;
|
||||
@@ -654,12 +654,12 @@ traverseOwnedNodes(
|
||||
}
|
||||
auto end = std::chrono::system_clock::now();
|
||||
|
||||
gLog.debug() << "Time loading owned directories: "
|
||||
LOG(gLog.debug()) << "Time loading owned directories: "
|
||||
<< std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() << " milliseconds";
|
||||
|
||||
auto [objects, timeDiff] = util::timed([&]() { return backend.fetchLedgerObjects(keys, sequence, yield); });
|
||||
|
||||
gLog.debug() << "Time loading owned entries: " << timeDiff << " milliseconds";
|
||||
LOG(gLog.debug()) << "Time loading owned entries: " << timeDiff << " milliseconds";
|
||||
|
||||
for (auto i = 0u; i < objects.size(); ++i)
|
||||
{
|
||||
@@ -1133,7 +1133,7 @@ postProcessOrderBook(
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
gLog.error() << "caught exception: " << e.what();
|
||||
LOG(gLog.error()) << "caught exception: " << e.what();
|
||||
}
|
||||
}
|
||||
return jsonOffers;
|
||||
|
||||
@@ -240,11 +240,11 @@ logDuration(web::Context const& ctx, T const& dur)
|
||||
serialize(util::removeSecret(ctx.params)));
|
||||
|
||||
if (seconds > 10)
|
||||
log.error() << ctx.tag() << msg;
|
||||
LOG(log.error()) << ctx.tag() << msg;
|
||||
else if (seconds > 1)
|
||||
log.warn() << ctx.tag() << msg;
|
||||
LOG(log.warn()) << ctx.tag() << msg;
|
||||
else
|
||||
log.info() << ctx.tag() << msg;
|
||||
LOG(log.info()) << ctx.tag() << msg;
|
||||
}
|
||||
|
||||
} // namespace rpc
|
||||
|
||||
@@ -72,7 +72,7 @@ public:
|
||||
auto const numThreads = config.valueOr<uint32_t>("workers", std::thread::hardware_concurrency());
|
||||
auto const maxQueueSize = serverConfig.valueOr<uint32_t>("max_queue_size", 0); // 0 is no limit
|
||||
|
||||
log.info() << "Number of workers = " << numThreads << ". Max queue size = " << maxQueueSize;
|
||||
LOG(log.info()) << "Number of workers = " << numThreads << ". Max queue size = " << maxQueueSize;
|
||||
return WorkQueue{numThreads, maxQueueSize};
|
||||
}
|
||||
|
||||
@@ -92,7 +92,8 @@ public:
|
||||
{
|
||||
if (curSize_ >= maxSize_ && !isWhiteListed)
|
||||
{
|
||||
log_.warn() << "Queue is full. rejecting job. current size = " << curSize_ << "; max size = " << maxSize_;
|
||||
LOG(log_.warn()) << "Queue is full. rejecting job. current size = " << curSize_
|
||||
<< "; max size = " << maxSize_;
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -108,7 +109,7 @@ public:
|
||||
|
||||
++queued_;
|
||||
durationUs_ += wait;
|
||||
log_.info() << "WorkQueue wait time = " << wait << " queue size = " << curSize_;
|
||||
LOG(log_.info()) << "WorkQueue wait time = " << wait << " queue size = " << curSize_;
|
||||
|
||||
func(yield);
|
||||
--curSize_;
|
||||
|
||||
@@ -51,9 +51,9 @@ public:
|
||||
auto checkRange = [this](uint32_t version, std::string label) {
|
||||
if (std::clamp(version, API_VERSION_MIN, API_VERSION_MAX) != version)
|
||||
{
|
||||
log_.error() << "API version settings issue detected: " << label << " version with value " << version
|
||||
<< " is outside of supported range " << API_VERSION_MIN << "-" << API_VERSION_MAX
|
||||
<< "; Falling back to hardcoded values.";
|
||||
LOG(log_.error()) << "API version settings issue detected: " << label << " version with value "
|
||||
<< version << " is outside of supported range " << API_VERSION_MIN << "-"
|
||||
<< API_VERSION_MAX << "; Falling back to hardcoded values.";
|
||||
|
||||
defaultVersion_ = API_VERSION_DEFAULT;
|
||||
minVersion_ = API_VERSION_MIN;
|
||||
@@ -66,7 +66,7 @@ public:
|
||||
checkRange(maxVersion, "maximum");
|
||||
#endif
|
||||
|
||||
log_.info() << "API version settings: [min = " << minVersion_ << "; max = " << maxVersion_
|
||||
LOG(log_.info()) << "API version settings: [min = " << minVersion_ << "; max = " << maxVersion_
|
||||
<< "; default = " << defaultVersion_ << "]";
|
||||
}
|
||||
|
||||
|
||||
@@ -87,7 +87,7 @@ AccountTxHandler::process(AccountTxHandler::Input input, Context const& ctx) con
|
||||
return sharedPtrBackend_->fetchAccountTransactions(*accountID, limit, input.forward, cursor, ctx.yield);
|
||||
});
|
||||
|
||||
log_.info() << "db fetch took " << timeDiff << " milliseconds - num blobs = " << txnsAndCursor.txns.size();
|
||||
LOG(log_.info()) << "db fetch took " << timeDiff << " milliseconds - num blobs = " << txnsAndCursor.txns.size();
|
||||
|
||||
auto const [blobs, retCursor] = txnsAndCursor;
|
||||
Output response;
|
||||
@@ -106,7 +106,7 @@ AccountTxHandler::process(AccountTxHandler::Input input, Context const& ctx) con
|
||||
}
|
||||
else if (txnPlusMeta.ledgerSequence > maxIndex && !input.forward)
|
||||
{
|
||||
log_.debug() << "Skipping over transactions from incomplete ledger";
|
||||
LOG(log_.debug()) << "Skipping over transactions from incomplete ledger";
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
@@ -150,7 +150,7 @@ LedgerDataHandler::process(Input input, Context const& ctx) const
|
||||
}
|
||||
|
||||
auto const end = std::chrono::system_clock::now();
|
||||
log_.debug() << "Number of results = " << results.size() << " fetched in "
|
||||
LOG(log_.debug()) << "Number of results = " << results.size() << " fetched in "
|
||||
<< std::chrono::duration_cast<std::chrono::microseconds>(end - start).count() << " microseconds";
|
||||
|
||||
output.states.reserve(results.size());
|
||||
@@ -180,7 +180,7 @@ LedgerDataHandler::process(Input input, Context const& ctx) const
|
||||
output.cacheFull = sharedPtrBackend_->cache().isFull();
|
||||
|
||||
auto const end2 = std::chrono::system_clock::now();
|
||||
log_.debug() << "Number of results = " << results.size() << " serialized in "
|
||||
LOG(log_.debug()) << "Number of results = " << results.size() << " serialized in "
|
||||
<< std::chrono::duration_cast<std::chrono::microseconds>(end2 - end).count() << " microseconds";
|
||||
|
||||
return output;
|
||||
|
||||
@@ -86,7 +86,7 @@ NFTHistoryHandler::process(NFTHistoryHandler::Input input, Context const& ctx) c
|
||||
|
||||
auto const [txnsAndCursor, timeDiff] = util::timed(
|
||||
[&]() { return sharedPtrBackend_->fetchNFTTransactions(tokenID, limit, input.forward, cursor, ctx.yield); });
|
||||
log_.info() << "db fetch took " << timeDiff << " milliseconds - num blobs = " << txnsAndCursor.txns.size();
|
||||
LOG(log_.info()) << "db fetch took " << timeDiff << " milliseconds - num blobs = " << txnsAndCursor.txns.size();
|
||||
|
||||
Output response;
|
||||
auto const [blobs, retCursor] = txnsAndCursor;
|
||||
@@ -105,7 +105,7 @@ NFTHistoryHandler::process(NFTHistoryHandler::Input input, Context const& ctx) c
|
||||
}
|
||||
else if (txnPlusMeta.ledgerSequence > maxIndex && !input.forward)
|
||||
{
|
||||
log_.debug() << "Skipping over transactions from incomplete ledger";
|
||||
LOG(log_.debug()) << "Skipping over transactions from incomplete ledger";
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
@@ -183,7 +183,8 @@ ConfigReader::open(std::filesystem::path path)
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
util::LogService::error() << "Could not read configuration file from '" << path.string() << "': " << e.what();
|
||||
LOG(util::LogService::error()) << "Could not read configuration file from '" << path.string()
|
||||
<< "': " << e.what();
|
||||
}
|
||||
|
||||
return Config{};
|
||||
|
||||
@@ -139,7 +139,7 @@ LogService::init(util::Config const& config)
|
||||
}
|
||||
|
||||
core->set_filter(min_severity);
|
||||
LogService::info() << "Default log level = " << defaultSeverity;
|
||||
LOG(LogService::info()) << "Default log level = " << defaultSeverity;
|
||||
}
|
||||
|
||||
Logger::Pump
|
||||
|
||||
@@ -88,6 +88,16 @@ using SourceLocationType = SourceLocation;
|
||||
#define CURRENT_SRC_LOCATION SourceLocationType(__builtin_FILE(), __builtin_LINE())
|
||||
#endif
|
||||
|
||||
/**
|
||||
* @brief Skips evaluation of expensive argument lists if the given logger is disabled for the required severity level.
|
||||
*/
|
||||
#define LOG(x) \
|
||||
if (!x) \
|
||||
{ \
|
||||
} \
|
||||
else \
|
||||
x
|
||||
|
||||
/**
|
||||
* @brief Custom severity levels for @ref util::Logger.
|
||||
*/
|
||||
@@ -158,8 +168,7 @@ class Logger final
|
||||
operator=(Pump&&) = delete;
|
||||
|
||||
/**
|
||||
* @brief Perfectly forwards any incoming data into the underlying
|
||||
* boost::log pump if the pump is available. nop otherwise.
|
||||
* @brief Perfectly forwards any incoming data into the underlying boost::log pump if the pump is available.
|
||||
*
|
||||
* @tparam T Type of data to pump
|
||||
* @param data The data to pump
|
||||
@@ -174,6 +183,14 @@ class Logger final
|
||||
return *this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if logger is enabled; false otherwise
|
||||
*/
|
||||
operator bool() const
|
||||
{
|
||||
return pump_.has_value();
|
||||
}
|
||||
|
||||
private:
|
||||
[[nodiscard]] std::string
|
||||
pretty_path(SourceLocationType const& loc, size_t max_depth = 3) const;
|
||||
|
||||
@@ -76,7 +76,7 @@ struct Context : util::Taggable
|
||||
, clientIp(clientIp)
|
||||
{
|
||||
static util::Logger perfLog{"Performance"};
|
||||
perfLog.debug() << tag() << "new Context created";
|
||||
LOG(perfLog.debug()) << tag() << "new Context created";
|
||||
}
|
||||
|
||||
Context(Context&&) = default;
|
||||
|
||||
@@ -129,7 +129,7 @@ public:
|
||||
auto [transferedByte, requests] = ipState_.at(ip);
|
||||
if (transferedByte > maxFetches_ || requests > maxRequestCount_)
|
||||
{
|
||||
log_.warn() << "Dosguard: Client surpassed the rate limit. ip = " << ip
|
||||
LOG(log_.warn()) << "Dosguard: Client surpassed the rate limit. ip = " << ip
|
||||
<< " Transfered Byte: " << transferedByte << "; Requests: " << requests;
|
||||
return false;
|
||||
}
|
||||
@@ -139,7 +139,7 @@ public:
|
||||
{
|
||||
if (it->second > maxConnCount_)
|
||||
{
|
||||
log_.warn() << "Dosguard: Client surpassed the rate limit. ip = " << ip
|
||||
LOG(log_.warn()) << "Dosguard: Client surpassed the rate limit. ip = " << ip
|
||||
<< " Concurrent connection: " << it->second;
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -87,7 +87,7 @@ public:
|
||||
try
|
||||
{
|
||||
auto req = boost::json::parse(request).as_object();
|
||||
perfLog_.debug() << connection->tag() << "Adding to work queue";
|
||||
LOG(perfLog_.debug()) << connection->tag() << "Adding to work queue";
|
||||
|
||||
if (not connection->upgraded and not req.contains("params"))
|
||||
req["params"] = boost::json::array({boost::json::object{}});
|
||||
@@ -116,7 +116,7 @@ public:
|
||||
}
|
||||
catch (std::exception const& ex)
|
||||
{
|
||||
perfLog_.error() << connection->tag() << "Caught exception: " << ex.what();
|
||||
LOG(perfLog_.error()) << connection->tag() << "Caught exception: " << ex.what();
|
||||
rpcEngine_->notifyInternalError();
|
||||
throw;
|
||||
}
|
||||
@@ -144,7 +144,7 @@ private:
|
||||
boost::json::object&& request,
|
||||
std::shared_ptr<web::ConnectionBase> const& connection)
|
||||
{
|
||||
log_.info() << connection->tag() << (connection->upgraded ? "ws" : "http")
|
||||
LOG(log_.info()) << connection->tag() << (connection->upgraded ? "ws" : "http")
|
||||
<< " received request from work queue: " << util::removeSecret(request)
|
||||
<< " ip = " << connection->clientIp;
|
||||
|
||||
@@ -181,8 +181,8 @@ private:
|
||||
if (!context)
|
||||
{
|
||||
auto const err = context.error();
|
||||
perfLog_.warn() << connection->tag() << "Could not create Web context: " << err;
|
||||
log_.warn() << connection->tag() << "Could not create Web context: " << err;
|
||||
LOG(perfLog_.warn()) << connection->tag() << "Could not create Web context: " << err;
|
||||
LOG(log_.warn()) << connection->tag() << "Could not create Web context: " << err;
|
||||
|
||||
// we count all those as BadSyntax - as the WS path would.
|
||||
// Although over HTTP these will yield a 400 status with a plain text response (for most).
|
||||
@@ -202,8 +202,8 @@ private:
|
||||
response = web::detail::ErrorHelper(connection, request).composeError(*status);
|
||||
auto const responseStr = boost::json::serialize(response);
|
||||
|
||||
perfLog_.debug() << context->tag() << "Encountered error: " << responseStr;
|
||||
log_.debug() << context->tag() << "Encountered error: " << responseStr;
|
||||
LOG(perfLog_.debug()) << context->tag() << "Encountered error: " << responseStr;
|
||||
LOG(log_.debug()) << context->tag() << "Encountered error: " << responseStr;
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -260,8 +260,8 @@ private:
|
||||
{
|
||||
// note: while we are catching this in buildResponse too, this is here to make sure
|
||||
// that any other code that may throw is outside of buildResponse is also worked around.
|
||||
perfLog_.error() << connection->tag() << "Caught exception: " << ex.what();
|
||||
log_.error() << connection->tag() << "Caught exception: " << ex.what();
|
||||
LOG(perfLog_.error()) << connection->tag() << "Caught exception: " << ex.what();
|
||||
LOG(log_.error()) << connection->tag() << "Caught exception: " << ex.what();
|
||||
|
||||
rpcEngine_->notifyInternalError();
|
||||
return web::detail::ErrorHelper(connection, request).sendInternalError();
|
||||
|
||||
@@ -95,7 +95,7 @@ public:
|
||||
if (ec == boost::asio::ssl::error::stream_truncated)
|
||||
return;
|
||||
|
||||
log_.info() << "Detector failed (" << message << "): " << ec.message();
|
||||
LOG(log_.info()) << "Detector failed (" << message << "): " << ec.message();
|
||||
}
|
||||
|
||||
/** @brief Initiate the detection. */
|
||||
@@ -205,7 +205,7 @@ public:
|
||||
acceptor_.bind(endpoint, ec);
|
||||
if (ec)
|
||||
{
|
||||
log_.error() << "Failed to bind to endpoint: " << endpoint << ". message: " << ec.message();
|
||||
LOG(log_.error()) << "Failed to bind to endpoint: " << endpoint << ". message: " << ec.message();
|
||||
throw std::runtime_error(
|
||||
fmt::format("Failed to bind to endpoint: {}:{}", endpoint.address().to_string(), endpoint.port()));
|
||||
}
|
||||
@@ -213,7 +213,7 @@ public:
|
||||
acceptor_.listen(boost::asio::socket_base::max_listen_connections, ec);
|
||||
if (ec)
|
||||
{
|
||||
log_.error() << "Failed to listen at endpoint: " << endpoint << ". message: " << ec.message();
|
||||
LOG(log_.error()) << "Failed to listen at endpoint: " << endpoint << ". message: " << ec.message();
|
||||
throw std::runtime_error(
|
||||
fmt::format("Failed to listen at endpoint: {}:{}", endpoint.address().to_string(), endpoint.port()));
|
||||
}
|
||||
|
||||
@@ -121,7 +121,7 @@ protected:
|
||||
if (!ec_ && ec != boost::asio::error::operation_aborted)
|
||||
{
|
||||
ec_ = ec;
|
||||
perfLog_.info() << tag() << ": " << what << ": " << ec.message();
|
||||
LOG(perfLog_.info()) << tag() << ": " << what << ": " << ec.message();
|
||||
boost::beast::get_lowest_layer(derived().stream()).socket().close(ec);
|
||||
}
|
||||
}
|
||||
@@ -139,13 +139,13 @@ public:
|
||||
, dosGuard_(dosGuard)
|
||||
, handler_(handler)
|
||||
{
|
||||
perfLog_.debug() << tag() << "http session created";
|
||||
LOG(perfLog_.debug()) << tag() << "http session created";
|
||||
dosGuard_.get().increment(ip);
|
||||
}
|
||||
|
||||
virtual ~HttpBase()
|
||||
{
|
||||
perfLog_.debug() << tag() << "http session closed";
|
||||
LOG(perfLog_.debug()) << tag() << "http session closed";
|
||||
if (not upgraded)
|
||||
dosGuard_.get().decrement(this->clientIp);
|
||||
}
|
||||
@@ -204,7 +204,7 @@ public:
|
||||
boost::json::serialize(rpc::makeError(rpc::RippledError::rpcSLOW_DOWN))));
|
||||
}
|
||||
|
||||
log_.info() << tag() << "Received request from ip = " << clientIp << " - posting to WorkQueue";
|
||||
LOG(log_.info()) << tag() << "Received request from ip = " << clientIp << " - posting to WorkQueue";
|
||||
|
||||
try
|
||||
{
|
||||
|
||||
@@ -63,7 +63,7 @@ protected:
|
||||
if (!ec_ && ec != boost::asio::error::operation_aborted)
|
||||
{
|
||||
ec_ = ec;
|
||||
perfLog_.info() << tag() << ": " << what << ": " << ec.message();
|
||||
LOG(perfLog_.info()) << tag() << ": " << what << ": " << ec.message();
|
||||
boost::beast::get_lowest_layer(derived().ws()).socket().close(ec);
|
||||
(*handler_)(ec, derived().shared_from_this());
|
||||
}
|
||||
@@ -79,12 +79,12 @@ public:
|
||||
: ConnectionBase(tagFactory, ip), buffer_(std::move(buffer)), dosGuard_(dosGuard), handler_(handler)
|
||||
{
|
||||
upgraded = true;
|
||||
perfLog_.debug() << tag() << "session created";
|
||||
LOG(perfLog_.debug()) << tag() << "session created";
|
||||
}
|
||||
|
||||
virtual ~WsBase()
|
||||
{
|
||||
perfLog_.debug() << tag() << "session closed";
|
||||
LOG(perfLog_.debug()) << tag() << "session closed";
|
||||
dosGuard_.get().decrement(clientIp);
|
||||
}
|
||||
|
||||
@@ -193,7 +193,7 @@ public:
|
||||
if (ec)
|
||||
return wsFail(ec, "accept");
|
||||
|
||||
perfLog_.info() << tag() << "accepting new connection";
|
||||
LOG(perfLog_.info()) << tag() << "accepting new connection";
|
||||
|
||||
doRead();
|
||||
}
|
||||
@@ -218,7 +218,7 @@ public:
|
||||
if (ec)
|
||||
return wsFail(ec, "read");
|
||||
|
||||
perfLog_.info() << tag() << "Received request from ip = " << this->clientIp;
|
||||
LOG(perfLog_.info()) << tag() << "Received request from ip = " << this->clientIp;
|
||||
|
||||
auto sendError = [this](auto error, std::string&& requestStr) {
|
||||
auto e = rpc::makeError(error);
|
||||
|
||||
@@ -57,6 +57,23 @@ TEST_F(LoggerTest, Filtering)
|
||||
checkEqual("Trace:TRC Trace line logged for 'Trace' component");
|
||||
}
|
||||
|
||||
TEST_F(LoggerTest, LOGMacro)
|
||||
{
|
||||
Logger log{"General"};
|
||||
|
||||
auto computeCalled = false;
|
||||
auto compute = [&computeCalled]() {
|
||||
computeCalled = true;
|
||||
return "computed";
|
||||
};
|
||||
|
||||
LOG(log.trace()) << compute();
|
||||
EXPECT_FALSE(computeCalled);
|
||||
|
||||
log.trace() << compute();
|
||||
EXPECT_TRUE(computeCalled);
|
||||
}
|
||||
|
||||
TEST_F(NoLoggerTest, Basic)
|
||||
{
|
||||
Logger log{"Trace"};
|
||||
|
||||
@@ -127,7 +127,6 @@ TEST_F(CacheLoaderTest, FromCache)
|
||||
.WillByDefault(Return(std::vector<Blob>{keysSize - 1, Blob{'s'}}));
|
||||
EXPECT_CALL(*rawBackendPtr, doFetchLedgerObjects).Times(loops);
|
||||
|
||||
EXPECT_CALL(cache, size).Times(AtLeast(1));
|
||||
EXPECT_CALL(cache, update).Times(loops);
|
||||
EXPECT_CALL(cache, isFull).Times(1);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user