fix transaction on db restart bug

This commit is contained in:
CJ Cobb
2021-08-19 10:28:14 -04:00
parent ac5bbc9ea1
commit 79ac95ae8e
7 changed files with 171 additions and 188 deletions

View File

@@ -44,10 +44,7 @@ BackendInterface::writeLedgerObject(
assert(key.size() == sizeof(rippled::uint256)); assert(key.size() == sizeof(rippled::uint256));
ripple::uint256 key256 = ripple::uint256::fromVoid(key.data()); ripple::uint256 key256 = ripple::uint256::fromVoid(key.data());
indexer_.addKey(std::move(key256)); indexer_.addKey(std::move(key256));
doWriteLedgerObject( doWriteLedgerObject(std::move(key), seq, std::move(blob));
std::move(key),
seq,
std::move(blob));
} }
std::optional<LedgerRange> std::optional<LedgerRange>
BackendInterface::hardFetchLedgerRangeNoThrow() const BackendInterface::hardFetchLedgerRangeNoThrow() const
@@ -196,6 +193,7 @@ BackendInterface::fetchLedgerPage(
{ {
assert(limit != 0); assert(limit != 0);
bool incomplete = !isLedgerIndexed(ledgerSequence); bool incomplete = !isLedgerIndexed(ledgerSequence);
BOOST_LOG_TRIVIAL(debug) << __func__ << " incomplete = " << incomplete;
// really low limits almost always miss // really low limits almost always miss
uint32_t adjustedLimit = std::max(limitHint, std::max(limit, (uint32_t)4)); uint32_t adjustedLimit = std::max(limitHint, std::max(limit, (uint32_t)4));
LedgerPage page; LedgerPage page;
@@ -208,10 +206,14 @@ BackendInterface::fetchLedgerPage(
auto partial = auto partial =
doFetchLedgerPage(page.cursor, ledgerSequence, adjustedLimit); doFetchLedgerPage(page.cursor, ledgerSequence, adjustedLimit);
auto end = std::chrono::system_clock::now(); auto end = std::chrono::system_clock::now();
std::string pageCursorStr =
page.cursor ? ripple::strHex(*page.cursor) : "";
std::string partialCursorStr =
partial.cursor ? ripple::strHex(*partial.cursor) : "";
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug)
<< __func__ << " " << std::to_string(ledgerSequence) << " " << __func__ << " " << std::to_string(ledgerSequence) << " "
<< std::to_string(adjustedLimit) << " " << std::to_string(adjustedLimit) << " " << pageCursorStr << " - "
<< ripple::strHex(*page.cursor) << " - time = " << partialCursorStr << " - time = "
<< std::to_string( << std::to_string(
std::chrono::duration_cast<std::chrono::milliseconds>( std::chrono::duration_cast<std::chrono::milliseconds>(
end - start) end - start)
@@ -262,11 +264,6 @@ BackendInterface::fetchLedgerPage(
}); });
page.warning = "Data may be incomplete"; page.warning = "Data may be incomplete";
} }
if (page.objects.size() >= limit)
{
page.objects.resize(limit);
page.cursor = page.objects.back().key;
}
return page; return page;
} }

View File

@@ -158,9 +158,9 @@ CassandraBackend::doWriteLedgerObject(
auto& [key, sequence, blob] = params.data; auto& [key, sequence, blob] = params.data;
CassandraStatement statement{insertObject_}; CassandraStatement statement{insertObject_};
statement.bindBytes(key); statement.bindNextBytes(key);
statement.bindInt(sequence); statement.bindNextInt(sequence);
statement.bindBytes(blob); statement.bindNextBytes(blob);
return statement; return statement;
}); });
} }
@@ -176,8 +176,8 @@ CassandraBackend::writeLedger(
[this](auto& params) { [this](auto& params) {
auto& [sequence, header] = params.data; auto& [sequence, header] = params.data;
CassandraStatement statement{insertLedgerHeader_}; CassandraStatement statement{insertLedgerHeader_};
statement.bindInt(sequence); statement.bindNextInt(sequence);
statement.bindBytes(header); statement.bindNextBytes(header);
return statement; return statement;
}); });
makeAndExecuteAsyncWrite( makeAndExecuteAsyncWrite(
@@ -186,8 +186,8 @@ CassandraBackend::writeLedger(
[this](auto& params) { [this](auto& params) {
auto& [hash, sequence] = params.data; auto& [hash, sequence] = params.data;
CassandraStatement statement{insertLedgerHash_}; CassandraStatement statement{insertLedgerHash_};
statement.bindBytes(hash); statement.bindNextBytes(hash);
statement.bindInt(sequence); statement.bindNextInt(sequence);
return statement; return statement;
}); });
ledgerSequence_ = ledgerInfo.seq; ledgerSequence_ = ledgerInfo.seq;
@@ -211,12 +211,12 @@ CassandraBackend::writeAccountTransactions(
[this](auto& params) { [this](auto& params) {
CassandraStatement statement(insertAccountTx_); CassandraStatement statement(insertAccountTx_);
auto& [account, lgrSeq, txnIdx, hash] = params.data; auto& [account, lgrSeq, txnIdx, hash] = params.data;
statement.bindBytes(account); statement.bindNextBytes(account);
uint32_t index = lgrSeq >> 20 << 20; uint32_t index = lgrSeq >> 20 << 20;
statement.bindUInt(index); statement.bindNextUInt(index);
statement.bindIntTuple(lgrSeq, txnIdx); statement.bindNextIntTuple(lgrSeq, txnIdx);
statement.bindBytes(hash); statement.bindNextBytes(hash);
return statement; return statement;
}); });
} }
@@ -235,8 +235,8 @@ CassandraBackend::writeTransaction(
makeAndExecuteAsyncWrite( makeAndExecuteAsyncWrite(
this, std::move(std::make_pair(seq, hash)), [this](auto& params) { this, std::move(std::make_pair(seq, hash)), [this](auto& params) {
CassandraStatement statement{insertLedgerTransaction_}; CassandraStatement statement{insertLedgerTransaction_};
statement.bindInt(params.data.first); statement.bindNextInt(params.data.first);
statement.bindBytes(params.data.second); statement.bindNextBytes(params.data.second);
return statement; return statement;
}); });
makeAndExecuteAsyncWrite( makeAndExecuteAsyncWrite(
@@ -246,10 +246,10 @@ CassandraBackend::writeTransaction(
[this](auto& params) { [this](auto& params) {
CassandraStatement statement{insertTransaction_}; CassandraStatement statement{insertTransaction_};
auto& [hash, sequence, transaction, metadata] = params.data; auto& [hash, sequence, transaction, metadata] = params.data;
statement.bindBytes(hash); statement.bindNextBytes(hash);
statement.bindInt(sequence); statement.bindNextInt(sequence);
statement.bindBytes(transaction); statement.bindNextBytes(transaction);
statement.bindBytes(metadata); statement.bindNextBytes(metadata);
return statement; return statement;
}); });
} }
@@ -290,7 +290,7 @@ struct ReadCallbackData
std::atomic_int& numOutstanding; std::atomic_int& numOutstanding;
std::mutex& mtx; std::mutex& mtx;
std::condition_variable& cv; std::condition_variable& cv;
bool errored = false; std::atomic_bool errored = false;
ReadCallbackData( ReadCallbackData(
std::atomic_int& numOutstanding, std::atomic_int& numOutstanding,
std::mutex& m, std::mutex& m,
@@ -321,7 +321,7 @@ struct ReadCallbackData
void void
processAsyncRead(CassFuture* fut, void* cbData) processAsyncRead(CassFuture* fut, void* cbData)
{ {
ReadCallbackData cb = *static_cast<ReadCallbackData*>(cbData); ReadCallbackData& cb = *static_cast<ReadCallbackData*>(cbData);
cb.finish(fut); cb.finish(fut);
} }
std::vector<TransactionAndMetadata> std::vector<TransactionAndMetadata>
@@ -339,7 +339,7 @@ CassandraBackend::fetchTransactions(
for (std::size_t i = 0; i < hashes.size(); ++i) for (std::size_t i = 0; i < hashes.size(); ++i)
{ {
CassandraStatement statement{selectTransaction_}; CassandraStatement statement{selectTransaction_};
statement.bindBytes(hashes[i]); statement.bindNextBytes(hashes[i]);
cbs.push_back(std::make_shared<ReadCallbackData>( cbs.push_back(std::make_shared<ReadCallbackData>(
numOutstanding, mtx, cv, [i, &results](auto& result) { numOutstanding, mtx, cv, [i, &results](auto& result) {
if (result.hasResult()) if (result.hasResult())
@@ -373,7 +373,7 @@ CassandraBackend::fetchAllTransactionHashesInLedger(
uint32_t ledgerSequence) const uint32_t ledgerSequence) const
{ {
CassandraStatement statement{selectAllTransactionHashesInLedger_}; CassandraStatement statement{selectAllTransactionHashesInLedger_};
statement.bindInt(ledgerSequence); statement.bindNextInt(ledgerSequence);
auto start = std::chrono::system_clock::now(); auto start = std::chrono::system_clock::now();
CassandraResult result = executeSyncRead(statement); CassandraResult result = executeSyncRead(statement);
auto end = std::chrono::system_clock::now(); auto end = std::chrono::system_clock::now();
@@ -424,11 +424,11 @@ CassandraBackend::fetchAccountTransactions(
else else
return CassandraStatement{selectAccountTx_}; return CassandraStatement{selectAccountTx_};
}(); }();
statement.bindBytes(account); statement.bindNextBytes(account);
if (cursor) if (cursor)
{ {
statement.bindUInt(cursor->ledgerSequence >> 20 << 20); statement.bindNextUInt(cursor->ledgerSequence >> 20 << 20);
statement.bindIntTuple( statement.bindNextIntTuple(
cursor->ledgerSequence, cursor->transactionIndex); cursor->ledgerSequence, cursor->transactionIndex);
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug)
<< " account = " << ripple::strHex(account) << " account = " << ripple::strHex(account)
@@ -439,16 +439,16 @@ CassandraBackend::fetchAccountTransactions(
else else
{ {
int seq = forward ? rng->minSequence : rng->maxSequence; int seq = forward ? rng->minSequence : rng->maxSequence;
statement.bindUInt(seq >> 20 << 20); statement.bindNextUInt(seq >> 20 << 20);
int placeHolder = forward ? 0 : INT32_MAX; int placeHolder = forward ? 0 : INT32_MAX;
statement.bindIntTuple(placeHolder, placeHolder); statement.bindNextIntTuple(placeHolder, placeHolder);
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug)
<< " account = " << ripple::strHex(account) << " account = " << ripple::strHex(account)
<< " idx = " << seq << " tuple = " << placeHolder; << " idx = " << seq << " tuple = " << placeHolder;
} }
uint32_t adjustedLimit = limit - hashes.size(); uint32_t adjustedLimit = limit - hashes.size();
statement.bindUInt(adjustedLimit); statement.bindNextUInt(adjustedLimit);
CassandraResult result = executeSyncRead(statement); CassandraResult result = executeSyncRead(statement);
if (!result.hasResult()) if (!result.hasResult())
{ {
@@ -487,8 +487,8 @@ CassandraBackend::fetchAccountTransactions(
cursor->transactionIndex = forward ? 0 : INT32_MAX; cursor->transactionIndex = forward ? 0 : INT32_MAX;
BOOST_LOG_TRIVIAL(debug) << __func__ << " walking back"; BOOST_LOG_TRIVIAL(debug) << __func__ << " walking back";
CassandraStatement statement{selectObject_}; CassandraStatement statement{selectObject_};
statement.bindBytes(keylet.key); statement.bindNextBytes(keylet.key);
statement.bindInt(seq); statement.bindNextInt(seq);
CassandraResult result = executeSyncRead(statement); CassandraResult result = executeSyncRead(statement);
if (!result) if (!result)
{ {
@@ -529,15 +529,15 @@ CassandraBackend::doFetchLedgerPage(
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug)
<< __func__ << " - Cursor = " << ripple::strHex(*cursor); << __func__ << " - Cursor = " << ripple::strHex(*cursor);
CassandraStatement statement{selectKeys_}; CassandraStatement statement{selectKeys_};
statement.bindInt(index->keyIndex); statement.bindNextInt(index->keyIndex);
if (!cursor) if (!cursor)
{ {
ripple::uint256 zero; ripple::uint256 zero;
cursor = zero; cursor = zero;
} }
statement.bindBytes(cursor->data(), 1); statement.bindNextBytes(cursor->data(), 1);
statement.bindBytes(*cursor); statement.bindNextBytes(*cursor);
statement.bindUInt(limit + 1); statement.bindNextUInt(limit + 1);
CassandraResult result = executeSyncRead(statement); CassandraResult result = executeSyncRead(statement);
if (!!result) if (!!result)
{ {
@@ -613,8 +613,8 @@ CassandraBackend::fetchLedgerObjects(
results[i] = result.getBytes(); results[i] = result.getBytes();
})); }));
CassandraStatement statement{selectObject_}; CassandraStatement statement{selectObject_};
statement.bindBytes(keys[i]); statement.bindNextBytes(keys[i]);
statement.bindInt(sequence); statement.bindNextInt(sequence);
executeAsyncRead(statement, processAsyncRead, *cbs[i]); executeAsyncRead(statement, processAsyncRead, *cbs[i]);
} }
assert(results.size() == cbs.size()); assert(results.size() == cbs.size());
@@ -641,9 +641,9 @@ CassandraBackend::writeKeys(
auto bind = [this](auto& params) { auto bind = [this](auto& params) {
auto& [lgrSeq, key] = params.data; auto& [lgrSeq, key] = params.data;
CassandraStatement statement{insertKey_}; CassandraStatement statement{insertKey_};
statement.bindInt(lgrSeq); statement.bindNextInt(lgrSeq);
statement.bindBytes(key.data(), 1); statement.bindNextBytes(key.data(), 1);
statement.bindBytes(key); statement.bindNextBytes(key);
return statement; return statement;
}; };
std::atomic_int numOutstanding = 0; std::atomic_int numOutstanding = 0;
@@ -706,9 +706,9 @@ CassandraBackend::doOnlineDelete(uint32_t numLedgersToKeep) const
auto bind = [this](auto& params) { auto bind = [this](auto& params) {
auto& [key, seq, obj] = params.data; auto& [key, seq, obj] = params.data;
CassandraStatement statement{insertObject_}; CassandraStatement statement{insertObject_};
statement.bindBytes(key); statement.bindNextBytes(key);
statement.bindInt(seq); statement.bindNextInt(seq);
statement.bindBytes(obj); statement.bindNextBytes(obj);
return statement; return statement;
}; };
std::condition_variable cv; std::condition_variable cv;
@@ -770,7 +770,7 @@ CassandraBackend::doOnlineDelete(uint32_t numLedgersToKeep) const
std::unique_lock<std::mutex> lck(mtx); std::unique_lock<std::mutex> lck(mtx);
cv.wait(lck, [&numOutstanding]() { return numOutstanding == 0; }); cv.wait(lck, [&numOutstanding]() { return numOutstanding == 0; });
CassandraStatement statement{deleteLedgerRange_}; CassandraStatement statement{deleteLedgerRange_};
statement.bindInt(minLedger); statement.bindNextInt(minLedger);
executeSyncWrite(statement); executeSyncWrite(statement);
// update ledger_range // update ledger_range
return true; return true;

View File

@@ -124,11 +124,11 @@ public:
} }
void void
bindBoolean(bool val) bindNextBoolean(bool val)
{ {
if (!statement_) if (!statement_)
throw std::runtime_error( throw std::runtime_error(
"CassandraStatement::bindBoolean - statement_ is null"); "CassandraStatement::bindNextBoolean - statement_ is null");
CassError rc = cass_statement_bind_bool( CassError rc = cass_statement_bind_bool(
statement_, 1, static_cast<cass_bool_t>(val)); statement_, 1, static_cast<cass_bool_t>(val));
if (rc != CASS_OK) if (rc != CASS_OK)
@@ -143,45 +143,45 @@ public:
} }
void void
bindBytes(const char* data, uint32_t size) bindNextBytes(const char* data, uint32_t size)
{ {
bindBytes((unsigned char*)data, size); bindNextBytes((unsigned char*)data, size);
} }
void void
bindBytes(ripple::uint256 const& data) bindNextBytes(ripple::uint256 const& data)
{ {
bindBytes(data.data(), data.size()); bindNextBytes(data.data(), data.size());
} }
void void
bindBytes(std::vector<unsigned char> const& data) bindNextBytes(std::vector<unsigned char> const& data)
{ {
bindBytes(data.data(), data.size()); bindNextBytes(data.data(), data.size());
} }
void void
bindBytes(ripple::AccountID const& data) bindNextBytes(ripple::AccountID const& data)
{ {
bindBytes(data.data(), data.size()); bindNextBytes(data.data(), data.size());
} }
void void
bindBytes(std::string const& data) bindNextBytes(std::string const& data)
{ {
bindBytes(data.data(), data.size()); bindNextBytes(data.data(), data.size());
} }
void void
bindBytes(void const* key, uint32_t size) bindNextBytes(void const* key, uint32_t size)
{ {
bindBytes(static_cast<const unsigned char*>(key), size); bindNextBytes(static_cast<const unsigned char*>(key), size);
} }
void void
bindBytes(const unsigned char* data, uint32_t size) bindNextBytes(const unsigned char* data, uint32_t size)
{ {
if (!statement_) if (!statement_)
throw std::runtime_error( throw std::runtime_error(
"CassandraStatement::bindBytes - statement_ is null"); "CassandraStatement::bindNextBytes - statement_ is null");
CassError rc = cass_statement_bind_bytes( CassError rc = cass_statement_bind_bytes(
statement_, statement_,
curBindingIndex_, curBindingIndex_,
@@ -199,11 +199,11 @@ public:
} }
void void
bindUInt(uint32_t value) bindNextUInt(uint32_t value)
{ {
if (!statement_) if (!statement_)
throw std::runtime_error( throw std::runtime_error(
"CassandraStatement::bindUInt - statement_ is null"); "CassandraStatement::bindNextUInt - statement_ is null");
BOOST_LOG_TRIVIAL(trace) BOOST_LOG_TRIVIAL(trace)
<< std::to_string(curBindingIndex_) << " " << std::to_string(value); << std::to_string(curBindingIndex_) << " " << std::to_string(value);
CassError rc = CassError rc =
@@ -220,17 +220,17 @@ public:
} }
void void
bindInt(uint32_t value) bindNextInt(uint32_t value)
{ {
bindInt((int64_t)value); bindNextInt((int64_t)value);
} }
void void
bindInt(int64_t value) bindNextInt(int64_t value)
{ {
if (!statement_) if (!statement_)
throw std::runtime_error( throw std::runtime_error(
"CassandraStatement::bindInt - statement_ is null"); "CassandraStatement::bindNextInt - statement_ is null");
CassError rc = CassError rc =
cass_statement_bind_int64(statement_, curBindingIndex_, value); cass_statement_bind_int64(statement_, curBindingIndex_, value);
if (rc != CASS_OK) if (rc != CASS_OK)
@@ -245,7 +245,7 @@ public:
} }
void void
bindIntTuple(uint32_t first, uint32_t second) bindNextIntTuple(uint32_t first, uint32_t second)
{ {
CassTuple* tuple = cass_tuple_new(2); CassTuple* tuple = cass_tuple_new(2);
CassError rc = cass_tuple_set_int64(tuple, 0, first); CassError rc = cass_tuple_set_int64(tuple, 0, first);
@@ -667,15 +667,15 @@ public:
if (isFirstLedger_) if (isFirstLedger_)
{ {
CassandraStatement statement{updateLedgerRange_}; CassandraStatement statement{updateLedgerRange_};
statement.bindInt(ledgerSequence_); statement.bindNextInt(ledgerSequence_);
statement.bindBoolean(false); statement.bindNextBoolean(false);
statement.bindInt(ledgerSequence_); statement.bindNextInt(ledgerSequence_);
executeSyncWrite(statement); executeSyncWrite(statement);
} }
CassandraStatement statement{updateLedgerRange_}; CassandraStatement statement{updateLedgerRange_};
statement.bindInt(ledgerSequence_); statement.bindNextInt(ledgerSequence_);
statement.bindBoolean(true); statement.bindNextBoolean(true);
statement.bindInt(ledgerSequence_ - 1); statement.bindNextInt(ledgerSequence_ - 1);
if (!executeSyncUpdate(statement)) if (!executeSyncUpdate(statement))
{ {
BOOST_LOG_TRIVIAL(warning) BOOST_LOG_TRIVIAL(warning)
@@ -713,7 +713,7 @@ public:
{ {
BOOST_LOG_TRIVIAL(trace) << __func__; BOOST_LOG_TRIVIAL(trace) << __func__;
CassandraStatement statement{selectLedgerBySeq_}; CassandraStatement statement{selectLedgerBySeq_};
statement.bindInt(sequence); statement.bindNextInt(sequence);
CassandraResult result = executeSyncRead(statement); CassandraResult result = executeSyncRead(statement);
if (!result) if (!result)
@@ -730,7 +730,7 @@ public:
{ {
CassandraStatement statement{selectLedgerByHash_}; CassandraStatement statement{selectLedgerByHash_};
statement.bindBytes(hash); statement.bindNextBytes(hash);
CassandraResult result = executeSyncRead(statement); CassandraResult result = executeSyncRead(statement);
if (!result.hasResult()) if (!result.hasResult())
@@ -761,8 +761,8 @@ public:
{ {
BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra"; BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra";
CassandraStatement statement{selectObject_}; CassandraStatement statement{selectObject_};
statement.bindBytes(key); statement.bindNextBytes(key);
statement.bindInt(sequence); statement.bindNextInt(sequence);
CassandraResult result = executeSyncRead(statement); CassandraResult result = executeSyncRead(statement);
if (!result) if (!result)
{ {
@@ -780,7 +780,7 @@ public:
{ {
BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra"; BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra";
CassandraStatement statement{getToken_}; CassandraStatement statement{getToken_};
statement.bindBytes(key, 32); statement.bindNextBytes(key, 32);
CassandraResult result = executeSyncRead(statement); CassandraResult result = executeSyncRead(statement);
if (!result) if (!result)
{ {
@@ -799,7 +799,7 @@ public:
{ {
BOOST_LOG_TRIVIAL(trace) << __func__; BOOST_LOG_TRIVIAL(trace) << __func__;
CassandraStatement statement{selectTransaction_}; CassandraStatement statement{selectTransaction_};
statement.bindBytes(hash); statement.bindNextBytes(hash);
CassandraResult result = executeSyncRead(statement); CassandraResult result = executeSyncRead(statement);
if (!result) if (!result)
{ {
@@ -830,10 +830,8 @@ public:
uint32_t sequence) const override; uint32_t sequence) const override;
void void
doWriteLedgerObject( doWriteLedgerObject(std::string&& key, uint32_t seq, std::string&& blob)
std::string&& key, const override;
uint32_t seq,
std::string&& blob) const override;
void void
writeAccountTransactions( writeAccountTransactions(

View File

@@ -145,12 +145,14 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence)
void void
ReportingETL::publishLedger(ripple::LedgerInfo const& lgrInfo) ReportingETL::publishLedger(ripple::LedgerInfo const& lgrInfo)
{ {
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " - Publishing ledger " << std::to_string(lgrInfo.seq);
backend_->updateRange(lgrInfo.seq); backend_->updateRange(lgrInfo.seq);
auto ledgerRange = backend_->fetchLedgerRange(); auto ledgerRange = backend_->fetchLedgerRange();
std::optional<ripple::Fees> fees; std::optional<ripple::Fees> fees;
std::vector<Backend::TransactionAndMetadata> transactions; std::vector<Backend::TransactionAndMetadata> transactions;
for (;;) while (true)
{ {
try try
{ {
@@ -180,6 +182,8 @@ ReportingETL::publishLedger(ripple::LedgerInfo const& lgrInfo)
subscriptions_->pubTransaction(txAndMeta, lgrInfo.seq); subscriptions_->pubTransaction(txAndMeta, lgrInfo.seq);
setLastPublish(); setLastPublish();
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " - Published ledger " << std::to_string(lgrInfo.seq);
} }
bool bool
@@ -301,11 +305,10 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
} }
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : " << __func__ << " : "
<< "wrote objects. num objects = " << "Inserted/modified/deleted all objects. Number of objects = "
<< std::to_string(rawData.ledger_objects().objects_size()); << rawData.ledger_objects().objects_size();
std::vector<AccountTransactionsData> accountTxData{ std::vector<AccountTransactionsData> accountTxData{
insertTransactions(lgrInfo, rawData)}; insertTransactions(lgrInfo, rawData)};
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : " << __func__ << " : "
<< "Inserted all transactions. Number of transactions = " << "Inserted all transactions. Number of transactions = "
@@ -313,28 +316,13 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
backend_->writeAccountTransactions(std::move(accountTxData)); backend_->writeAccountTransactions(std::move(accountTxData));
BOOST_LOG_TRIVIAL(debug) << __func__ << " : " BOOST_LOG_TRIVIAL(debug) << __func__ << " : "
<< "wrote account_tx"; << "wrote account_tx";
accumTxns_ += rawData.transactions_list().transactions_size(); auto start = std::chrono::system_clock::now();
bool success = true; bool success = backend_->finishWrites(lgrInfo.seq);
if (accumTxns_ >= txnThreshold_) auto end = std::chrono::system_clock::now();
{
auto start = std::chrono::system_clock::now();
success = backend_->finishWrites(lgrInfo.seq);
auto end = std::chrono::system_clock::now();
auto duration = ((end - start).count()) / 1000000000.0; auto duration = ((end - start).count()) / 1000000000.0;
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " Accumulated " << std::to_string(accumTxns_)
<< " transactions. Wrote in " << std::to_string(duration)
<< " transactions per second = "
<< std::to_string(accumTxns_ / duration);
accumTxns_ = 0;
}
else
BOOST_LOG_TRIVIAL(debug) << __func__ << " skipping commit";
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : " << __func__ << " finished writes. took " << std::to_string(duration);
<< "Inserted/modified/deleted all objects. Number of objects = "
<< rawData.ledger_objects().objects_size();
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : " << __func__ << " : "
@@ -477,29 +465,6 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
beast::setCurrentThreadName("rippled: ReportingETL transform"); beast::setCurrentThreadName("rippled: ReportingETL transform");
uint32_t currentSequence = startSequence; uint32_t currentSequence = startSequence;
int counter = 0;
std::atomic_int per = 100;
auto startTimer = [this, &per]() {
auto innerFunc = [this, &per](auto& f) -> void {
std::shared_ptr<boost::asio::steady_timer> timer =
std::make_shared<boost::asio::steady_timer>(
ioContext_,
std::chrono::steady_clock::now() +
std::chrono::minutes(5));
timer->async_wait(
[timer, f, &per](const boost::system::error_code& error) {
++per;
BOOST_LOG_TRIVIAL(info)
<< "Incremented per to " << std::to_string(per);
if (per > 100)
per = 100;
f(f);
});
};
innerFunc(innerFunc);
};
// startTimer();
auto begin = std::chrono::system_clock::now(); auto begin = std::chrono::system_clock::now();
while (!writeConflict) while (!writeConflict)
@@ -527,7 +492,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
if (success) if (success)
BOOST_LOG_TRIVIAL(info) BOOST_LOG_TRIVIAL(info)
<< "Load phase of etl : " << "Load phase of etl : "
<< "Successfully published ledger! Ledger info: " << "Successfully wrote ledger! Ledger info: "
<< detail::toString(lgrInfo) << ". txn count = " << numTxns << detail::toString(lgrInfo) << ". txn count = " << numTxns
<< ". object count = " << numObjects << ". object count = " << numObjects
<< ". load time = " << duration << ". load time = " << duration
@@ -558,19 +523,6 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
deleting_ = false; deleting_ = false;
}); });
} }
/*
if (++counter >= per)
{
std::chrono::milliseconds sleep =
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::seconds(4) - (end - begin));
BOOST_LOG_TRIVIAL(info) << "Sleeping for " << sleep.count()
<< " . per = " << std::to_string(per);
std::this_thread::sleep_for(sleep);
counter = 0;
begin = std::chrono::system_clock::now();
}
*/
} }
}}; }};

View File

@@ -84,21 +84,44 @@ std::pair<
std::shared_ptr<ripple::STObject const>> std::shared_ptr<ripple::STObject const>>
deserializeTxPlusMeta(Backend::TransactionAndMetadata const& blobs) deserializeTxPlusMeta(Backend::TransactionAndMetadata const& blobs)
{ {
std::pair< try
std::shared_ptr<ripple::STTx const>,
std::shared_ptr<ripple::STObject const>>
result;
{ {
ripple::SerialIter s{ std::pair<
blobs.transaction.data(), blobs.transaction.size()}; std::shared_ptr<ripple::STTx const>,
result.first = std::make_shared<ripple::STTx const>(s); std::shared_ptr<ripple::STObject const>>
result;
{
ripple::SerialIter s{
blobs.transaction.data(), blobs.transaction.size()};
result.first = std::make_shared<ripple::STTx const>(s);
}
{
ripple::SerialIter s{blobs.metadata.data(), blobs.metadata.size()};
result.second =
std::make_shared<ripple::STObject const>(s, ripple::sfMetadata);
}
return result;
} }
catch (std::exception const& e)
{ {
ripple::SerialIter s{blobs.metadata.data(), blobs.metadata.size()}; std::stringstream txn;
result.second = std::stringstream meta;
std::make_shared<ripple::STObject const>(s, ripple::sfMetadata); std::copy(
blobs.transaction.begin(),
blobs.transaction.end(),
std::ostream_iterator<unsigned char>(txn));
std::copy(
blobs.metadata.begin(),
blobs.metadata.end(),
std::ostream_iterator<unsigned char>(meta));
BOOST_LOG_TRIVIAL(error)
<< __func__
<< " Failed to deserialize transaction. txn = " << txn.str()
<< " - meta = " << meta.str()
<< " txn length = " << std::to_string(blobs.transaction.size())
<< " meta length = " << std::to_string(blobs.metadata.size());
throw e;
} }
return result;
} }
std::pair< std::pair<

View File

@@ -21,8 +21,8 @@
#include <ripple/protocol/STLedgerEntry.h> #include <ripple/protocol/STLedgerEntry.h>
#include <boost/json.hpp> #include <boost/json.hpp>
#include <rpc/RPCHelpers.h>
#include <backend/BackendInterface.h> #include <backend/BackendInterface.h>
#include <rpc/RPCHelpers.h>
// Get state nodes from a ledger // Get state nodes from a ledger
// Inputs: // Inputs:
// limit: integer, maximum number of entries // limit: integer, maximum number of entries
@@ -37,8 +37,7 @@
// //
// //
namespace RPC namespace RPC {
{
Result Result
doLedgerData(Context const& context) doLedgerData(Context const& context)
@@ -47,34 +46,34 @@ doLedgerData(Context const& context)
boost::json::object response = {}; boost::json::object response = {};
bool binary = false; bool binary = false;
if(request.contains("binary")) if (request.contains("binary"))
{ {
if(!request.at("binary").is_bool()) if (!request.at("binary").is_bool())
return Status{Error::rpcINVALID_PARAMS, "binaryFlagNotBool"}; return Status{Error::rpcINVALID_PARAMS, "binaryFlagNotBool"};
binary = request.at("binary").as_bool(); binary = request.at("binary").as_bool();
} }
std::size_t limit = binary ? 2048 : 256; std::size_t limit = binary ? 2048 : 256;
if(request.contains("limit")) if (request.contains("limit"))
{ {
if(!request.at("limit").is_int64()) if (!request.at("limit").is_int64())
return Status{Error::rpcINVALID_PARAMS, "limitNotInteger"}; return Status{Error::rpcINVALID_PARAMS, "limitNotInteger"};
limit = value_to<int>(request.at("limit")); limit = value_to<int>(request.at("limit"));
} }
std::optional<ripple::uint256> cursor; std::optional<ripple::uint256> cursor;
if(request.contains("cursor")) if (request.contains("marker"))
{ {
if(!request.at("cursor").is_string()) if (!request.at("marker").is_string())
return Status{Error::rpcINVALID_PARAMS, "cursorNotString"}; return Status{Error::rpcINVALID_PARAMS, "markerNotString"};
BOOST_LOG_TRIVIAL(debug) << __func__ << " : parsing cursor"; BOOST_LOG_TRIVIAL(debug) << __func__ << " : parsing marker";
cursor = ripple::uint256{}; cursor = ripple::uint256{};
if(!cursor->parseHex(request.at("cursor").as_string().c_str())) if (!cursor->parseHex(request.at("marker").as_string().c_str()))
return Status{Error::rpcINVALID_PARAMS, "cursorMalformed"}; return Status{Error::rpcINVALID_PARAMS, "markerMalformed"};
} }
auto v = ledgerInfoFromRequest(context); auto v = ledgerInfoFromRequest(context);
@@ -94,7 +93,7 @@ doLedgerData(Context const& context)
.count(); .count();
boost::json::object header; boost::json::object header;
if(!cursor) if (!cursor)
{ {
if (binary) if (binary)
{ {
@@ -106,8 +105,10 @@ doLedgerData(Context const& context)
header["account_hash"] = ripple::strHex(lgrInfo.accountHash); header["account_hash"] = ripple::strHex(lgrInfo.accountHash);
header["close_flags"] = lgrInfo.closeFlags; header["close_flags"] = lgrInfo.closeFlags;
header["close_time"] = lgrInfo.closeTime.time_since_epoch().count(); header["close_time"] = lgrInfo.closeTime.time_since_epoch().count();
header["close_time_human"] = ripple::to_string(lgrInfo.closeTime);; header["close_time_human"] = ripple::to_string(lgrInfo.closeTime);
header["close_time_resolution"] = lgrInfo.closeTimeResolution.count(); ;
header["close_time_resolution"] =
lgrInfo.closeTimeResolution.count();
header["closed"] = true; header["closed"] = true;
header["hash"] = ripple::strHex(lgrInfo.hash); header["hash"] = ripple::strHex(lgrInfo.hash);
header["ledger_hash"] = ripple::strHex(lgrInfo.hash); header["ledger_hash"] = ripple::strHex(lgrInfo.hash);
@@ -123,7 +124,7 @@ doLedgerData(Context const& context)
response["ledger"] = header; response["ledger"] = header;
} }
} }
response["ledger_hash"] = ripple::strHex(lgrInfo.hash); response["ledger_hash"] = ripple::strHex(lgrInfo.hash);
response["ledger_index"] = lgrInfo.seq; response["ledger_index"] = lgrInfo.seq;
@@ -131,8 +132,12 @@ doLedgerData(Context const& context)
std::vector<Backend::LedgerObject>& results = page.objects; std::vector<Backend::LedgerObject>& results = page.objects;
std::optional<ripple::uint256> const& returnedCursor = page.cursor; std::optional<ripple::uint256> const& returnedCursor = page.cursor;
if(returnedCursor) if (returnedCursor)
{
response["marker"] = ripple::strHex(*returnedCursor); response["marker"] = ripple::strHex(*returnedCursor);
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " cursor = " << ripple::strHex(*returnedCursor);
}
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug)
<< __func__ << " number of results = " << results.size(); << __func__ << " number of results = " << results.size();
@@ -152,7 +157,6 @@ doLedgerData(Context const& context)
} }
response["state"] = objects; response["state"] = objects;
if (cursor && page.warning) if (cursor && page.warning)
{ {
response["warning"] = response["warning"] =
@@ -167,4 +171,4 @@ doLedgerData(Context const& context)
return response; return response;
} }
} // namespace RPC } // namespace RPC

11
test.py
View File

@@ -810,7 +810,7 @@ async def perf(ip, port):
parser = argparse.ArgumentParser(description='test script for xrpl-reporting') parser = argparse.ArgumentParser(description='test script for xrpl-reporting')
parser.add_argument('action', choices=["account_info", "tx", "txs","account_tx", "account_tx_full","ledger_data", "ledger_data_full", "book_offers","ledger","ledger_range","ledger_entry", "ledgers", "ledger_entries","account_txs","account_infos","account_txs_full","book_offerses","ledger_diff","perf","fee","server_info"]) parser.add_argument('action', choices=["account_info", "tx", "txs","account_tx", "account_tx_full","ledger_data", "ledger_data_full", "book_offers","ledger","ledger_range","ledger_entry", "ledgers", "ledger_entries","account_txs","account_infos","account_txs_full","book_offerses","ledger_diff","perf","fee","server_info", "gaps"])
parser.add_argument('--ip', default='127.0.0.1') parser.add_argument('--ip', default='127.0.0.1')
parser.add_argument('--port', default='8080') parser.add_argument('--port', default='8080')
@@ -864,6 +864,15 @@ def run(args):
elif args.action == "perf": elif args.action == "perf":
asyncio.get_event_loop().run_until_complete( asyncio.get_event_loop().run_until_complete(
perf(args.ip,args.port)) perf(args.ip,args.port))
elif args.action == "gaps":
missing = []
for x in range(rng[0],rng[1]):
res = asyncio.get_event_loop().run_until_complete(
ledger(args.ip, args.port, x, True, False, False))
if "error" in res:
print("missing " + str(x))
missing.append(x)
print(missing)
elif args.action == "account_info": elif args.action == "account_info":
res1 = asyncio.get_event_loop().run_until_complete( res1 = asyncio.get_event_loop().run_until_complete(
account_info(args.ip, args.port, args.account, args.ledger, args.binary)) account_info(args.ip, args.port, args.account, args.ledger, args.binary))