fix account_tx iteration

CJ Cobb
2021-07-15 20:12:20 +00:00
12 changed files with 673 additions and 1464 deletions


@@ -40,6 +40,12 @@ make_Backend(boost::json::object const& config)
         throw std::runtime_error("Invalid database type");
     backend->open(readOnly);
+    auto rng = backend->hardFetchLedgerRangeNoThrow();
+    if (rng)
+    {
+        backend->updateRange(rng->minSequence);
+        backend->updateRange(rng->maxSequence);
+    }
     backend->checkFlagLedgers();
     BOOST_LOG_TRIVIAL(info)


@@ -27,7 +27,7 @@ BackendIndexer::doKeysRepair(
     BackendInterface const& backend,
     std::optional<uint32_t> sequence)
 {
-    auto rng = backend.fetchLedgerRangeNoThrow();
+    auto rng = backend.fetchLedgerRange();
     if (!rng)
         return;
@@ -209,11 +209,10 @@ BackendIndexer::finish(uint32_t ledgerSequence, BackendInterface const& backend)
     BOOST_LOG_TRIVIAL(debug)
         << __func__
         << " starting. sequence = " << std::to_string(ledgerSequence);
-    bool isFirst = false;
     auto keyIndex = getKeyIndexOfSeq(ledgerSequence);
     if (isFirst_)
     {
-        auto rng = backend.fetchLedgerRangeNoThrow();
+        auto rng = backend.fetchLedgerRange();
         if (rng && rng->minSequence != ledgerSequence)
             isFirst_ = false;
         else


@@ -55,14 +55,14 @@ BackendInterface::writeLedgerObject(
         std::move(book));
 }
 std::optional<LedgerRange>
-BackendInterface::fetchLedgerRangeNoThrow() const
+BackendInterface::hardFetchLedgerRangeNoThrow() const
 {
     BOOST_LOG_TRIVIAL(warning) << __func__;
     while (true)
     {
         try
         {
-            return fetchLedgerRange();
+            return hardFetchLedgerRange();
         }
         catch (DatabaseTimeout& t)
         {
@@ -205,6 +205,7 @@ BackendInterface::fetchLedgerPage(
     uint32_t adjustedLimit = std::max(limitHint, std::max(limit, (uint32_t)4));
     LedgerPage page;
     page.cursor = cursor;
+    int numCalls = 0;
     do
     {
         adjustedLimit = adjustedLimit >= 8192 ? 8192 : adjustedLimit * 2;
@@ -223,8 +224,7 @@ BackendInterface::fetchLedgerPage(
         page.objects.insert(
             page.objects.end(), partial.objects.begin(), partial.objects.end());
         page.cursor = partial.cursor;
-    } while (page.objects.size() < limit && page.cursor);
+    } while (page.objects.size() < limit && page.cursor && ++numCalls < 10);
     if (incomplete)
     {
         auto rng = fetchLedgerRange();
@@ -278,7 +278,7 @@ BackendInterface::fetchLedgerPage(
 void
 BackendInterface::checkFlagLedgers() const
 {
-    auto rng = fetchLedgerRangeNoThrow();
+    auto rng = hardFetchLedgerRangeNoThrow();
     if (rng)
     {
         bool prevComplete = true;
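Aside: the NoThrow wrapper above simply spins on the throwing fetch until the database stops timing out. A minimal generic sketch of that pattern, assuming a one-second pause between attempts (the pause is not visible in this hunk) and a hypothetical retryForever helper:

    #include <chrono>
    #include <exception>
    #include <thread>

    struct DatabaseTimeout : public std::exception
    {
    };

    // Hypothetical helper: retry f() until it stops throwing DatabaseTimeout.
    template <class F>
    auto
    retryForever(F&& f)
    {
        while (true)
        {
            try
            {
                return f();
            }
            catch (DatabaseTimeout const&)
            {
                // The DB is overloaded; back off briefly and try again
                // instead of surfacing the timeout to the caller.
                std::this_thread::sleep_for(std::chrono::seconds(1));
            }
        }
    }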


@@ -66,6 +66,7 @@ class BackendInterface
 protected:
     mutable BackendIndexer indexer_;
     mutable bool isFirst_ = true;
+    mutable std::optional<LedgerRange> range;
 public:
     BackendInterface(boost::json::object const& config) : indexer_(config)
@@ -97,16 +98,15 @@ public:
     virtual std::optional<uint32_t>
     fetchLatestLedgerSequence() const = 0;
-    virtual std::optional<LedgerRange>
-    fetchLedgerRange() const = 0;
+    std::optional<LedgerRange>
+    fetchLedgerRange() const
+    {
+        return range;
+    }
     std::optional<ripple::Fees>
     fetchFees(std::uint32_t seq) const;
-    // Doesn't throw DatabaseTimeout. Should be used with care.
-    std::optional<LedgerRange>
-    fetchLedgerRangeNoThrow() const;
     // *** transaction methods
     virtual std::optional<TransactionAndMetadata>
     fetchTransaction(ripple::uint256 const& hash) const = 0;
@@ -174,6 +174,20 @@ protected:
     friend std::shared_ptr<BackendInterface>
     make_Backend(boost::json::object const& config);
     friend class ::BackendTest_Basic_Test;
+    virtual std::optional<LedgerRange>
+    hardFetchLedgerRange() const = 0;
+    // Doesn't throw DatabaseTimeout. Should be used with care.
+    std::optional<LedgerRange>
+    hardFetchLedgerRangeNoThrow() const;
+    void
+    updateRange(uint32_t newMax)
+    {
+        if (!range)
+            range = {newMax, newMax};
+        else
+            range->maxSequence = newMax;
+    }
     virtual void
     writeLedger(
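The net effect of this header change: the ledger range is now held in memory and refreshed explicitly (seeded in make_Backend, advanced in publishLedger), so read paths never hit the database just to learn the range; only the hardFetch* variants do. A minimal self-contained sketch of the pattern (the Backend class here is a stand-in, not the real interface):

    #include <cstdint>
    #include <optional>

    struct LedgerRange
    {
        uint32_t minSequence;
        uint32_t maxSequence;
    };

    class Backend
    {
        mutable std::optional<LedgerRange> range;

    public:
        // Cheap and non-throwing: served from the cached value.
        std::optional<LedgerRange>
        fetchLedgerRange() const
        {
            return range;
        }

        // Seeded once at startup and advanced each time a ledger is published.
        void
        updateRange(uint32_t newMax)
        {
            if (!range)
                range = {newMax, newMax};
            else
                range->maxSequence = newMax;
        }
    };

    int main()
    {
        Backend b;
        b.updateRange(5);  // first ledger seen -> {5, 5}
        b.updateRange(6);  // later ledgers only advance the max -> {5, 6}
        return b.fetchLedgerRange()->maxSequence == 6 ? 0 : 1;
    }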

File diff suppressed because it is too large.


@@ -26,37 +26,16 @@
 #include <boost/json.hpp>
 #include <boost/log/trivial.hpp>
 #include <atomic>
+#include <backend/BackendInterface.h>
+#include <backend/DBHelpers.h>
 #include <cassandra.h>
 #include <cstddef>
 #include <iostream>
 #include <memory>
 #include <mutex>
-#include <backend/BackendInterface.h>
-#include <backend/DBHelpers.h>
 namespace Backend {
-void
-flatMapWriteCallback(CassFuture* fut, void* cbData);
-void
-flatMapWriteKeyCallback(CassFuture* fut, void* cbData);
-void
-flatMapWriteTransactionCallback(CassFuture* fut, void* cbData);
-void
-flatMapWriteBookCallback(CassFuture* fut, void* cbData);
-void
-flatMapWriteAccountTxCallback(CassFuture* fut, void* cbData);
-void
-flatMapReadCallback(CassFuture* fut, void* cbData);
-void
-flatMapReadObjectCallback(CassFuture* fut, void* cbData);
-void
-flatMapGetCreatedCallback(CassFuture* fut, void* cbData);
-void
-flatMapWriteLedgerHeaderCallback(CassFuture* fut, void* cbData);
-void
-flatMapWriteLedgerHashCallback(CassFuture* fut, void* cbData);
 class CassandraPreparedStatement
 {
 private:
@@ -129,6 +108,15 @@ public:
         cass_statement_set_consistency(statement_, CASS_CONSISTENCY_QUORUM);
     }
+    CassandraStatement(CassandraStatement&& other)
+    {
+        statement_ = other.statement_;
+        other.statement_ = nullptr;
+        curBindingIndex_ = other.curBindingIndex_;
+        other.curBindingIndex_ = 0;
+    }
+    CassandraStatement(CassandraStatement const& other) = delete;
     CassStatement*
     get() const
     {
@@ -535,65 +523,6 @@ isTimeout(CassError rc)
         return true;
     return false;
 }
-template <class T, class F>
-class CassandraAsyncResult
-{
-    T& requestParams_;
-    CassandraResult result_;
-    bool timedOut_ = false;
-    bool retryOnTimeout_ = false;
-public:
-    CassandraAsyncResult(
-        T& requestParams,
-        CassFuture* fut,
-        F retry,
-        bool retryOnTimeout = false)
-        : requestParams_(requestParams), retryOnTimeout_(retryOnTimeout)
-    {
-        CassError rc = cass_future_error_code(fut);
-        if (rc != CASS_OK)
-        {
-            // TODO - should we ever be retrying requests? These are reads,
-            // and they usually only fail when the db is under heavy load. Seems
-            // best to just return an error to the client and have the client
-            // try again
-            if (isTimeout(rc))
-                timedOut_ = true;
-            if (!timedOut_ || retryOnTimeout_)
-                retry(requestParams_);
-        }
-        else
-        {
-            result_ = std::move(CassandraResult(cass_future_get_result(fut)));
-        }
-    }
-    ~CassandraAsyncResult()
-    {
-        if (result_.isOk() or timedOut_)
-        {
-            BOOST_LOG_TRIVIAL(trace) << "finished a request";
-            size_t batchSize = requestParams_.batchSize;
-            std::unique_lock lk(requestParams_.mtx);
-            if (++(requestParams_.numFinished) == batchSize)
-                requestParams_.cv.notify_all();
-        }
-    }
-    CassandraResult&
-    getResult()
-    {
-        return result_;
-    }
-    bool
-    timedOut()
-    {
-        return timedOut_;
-    }
-};
 class CassandraBackend : public BackendInterface
 {
@@ -635,8 +564,8 @@ private:
     // than making a new statement
     CassandraPreparedStatement insertObject_;
     CassandraPreparedStatement insertTransaction_;
+    CassandraPreparedStatement insertLedgerTransaction_;
     CassandraPreparedStatement selectTransaction_;
-    CassandraPreparedStatement selectAllTransactionsInLedger_;
     CassandraPreparedStatement selectAllTransactionHashesInLedger_;
     CassandraPreparedStatement selectObject_;
     CassandraPreparedStatement selectLedgerPageKeys_;
@@ -645,12 +574,6 @@ private:
     CassandraPreparedStatement getToken_;
     CassandraPreparedStatement insertKey_;
     CassandraPreparedStatement selectKeys_;
-    CassandraPreparedStatement getBook_;
-    CassandraPreparedStatement selectBook_;
-    CassandraPreparedStatement completeBook_;
-    CassandraPreparedStatement insertBook_;
-    CassandraPreparedStatement insertBook2_;
-    CassandraPreparedStatement deleteBook_;
     CassandraPreparedStatement insertAccountTx_;
     CassandraPreparedStatement selectAccountTx_;
     CassandraPreparedStatement insertLedgerHeader_;
@@ -662,7 +585,6 @@ private:
     CassandraPreparedStatement selectLedgerByHash_;
     CassandraPreparedStatement selectLatestLedger_;
     CassandraPreparedStatement selectLedgerRange_;
-    CassandraPreparedStatement selectLedgerDiff_;
     // io_context used for exponential backoff for write retries
     mutable boost::asio::io_context ioContext_;
@@ -700,17 +622,10 @@ public:
     ~CassandraBackend() override
     {
-        BOOST_LOG_TRIVIAL(info) << __func__;
         if (open_)
             close();
     }
-    std::string
-    getName()
-    {
-        return "cassandra";
-    }
     bool
     isOpen()
     {
@@ -734,27 +649,6 @@ public:
         }
         open_ = false;
     }
-    CassandraPreparedStatement const&
-    getInsertKeyPreparedStatement() const
-    {
-        return insertKey_;
-    }
-    CassandraPreparedStatement const&
-    getInsertBookPreparedStatement() const
-    {
-        return insertBook2_;
-    }
-    CassandraPreparedStatement const&
-    getInsertObjectPreparedStatement() const
-    {
-        return insertObject_;
-    }
-    CassandraPreparedStatement const&
-    getSelectLedgerDiffPreparedStatement() const
-    {
-        return selectLedgerDiff_;
-    }
     std::pair<
         std::vector<TransactionAndMetadata>,
@@ -762,82 +656,14 @@
     fetchAccountTransactions(
         ripple::AccountID const& account,
         std::uint32_t limit,
-        std::optional<AccountTransactionsCursor> const& cursor) const override
-    {
-        BOOST_LOG_TRIVIAL(debug) << "Starting doAccountTx";
-        CassandraStatement statement{selectAccountTx_};
-        statement.bindBytes(account);
-        if (cursor)
-            statement.bindIntTuple(
-                cursor->ledgerSequence, cursor->transactionIndex);
-        else
-            statement.bindIntTuple(INT32_MAX, INT32_MAX);
-        statement.bindUInt(limit);
-        CassandraResult result = executeSyncRead(statement);
-        if (!result.hasResult())
-        {
-            BOOST_LOG_TRIVIAL(debug) << __func__ << " - no rows returned";
-            return {{}, {}};
-        }
-        std::vector<ripple::uint256> hashes;
-        size_t numRows = result.numRows();
-        bool returnCursor = numRows == limit;
-        std::optional<AccountTransactionsCursor> retCursor;
-        BOOST_LOG_TRIVIAL(info) << "num_rows = " << std::to_string(numRows);
-        do
-        {
-            hashes.push_back(result.getUInt256());
-            --numRows;
-            if (numRows == 0 && returnCursor)
-            {
-                auto [lgrSeq, txnIdx] = result.getInt64Tuple();
-                retCursor = {(uint32_t)lgrSeq, (uint32_t)txnIdx};
-            }
-        } while (result.nextRow());
-        BOOST_LOG_TRIVIAL(debug)
-            << "doAccountTx - populated hashes. num hashes = " << hashes.size();
-        if (hashes.size())
-        {
-            return {fetchTransactions(hashes), retCursor};
-        }
-        return {{}, {}};
-    }
-    struct WriteLedgerHeaderCallbackData
-    {
-        CassandraBackend const* backend;
-        uint32_t sequence;
-        std::string header;
-        uint32_t currentRetries = 0;
-        std::atomic<int> refs = 1;
-        WriteLedgerHeaderCallbackData(
-            CassandraBackend const* f,
-            uint32_t sequence,
-            std::string&& header)
-            : backend(f), sequence(sequence), header(std::move(header))
-        {
-        }
-    };
-    struct WriteLedgerHashCallbackData
-    {
-        CassandraBackend const* backend;
-        ripple::uint256 hash;
-        uint32_t sequence;
-        uint32_t currentRetries = 0;
-        std::atomic<int> refs = 1;
-        WriteLedgerHashCallbackData(
-            CassandraBackend const* f,
-            ripple::uint256 hash,
-            uint32_t sequence)
-            : backend(f), hash(hash), sequence(sequence)
-        {
-        }
-    };
+        std::optional<AccountTransactionsCursor> const& cursor) const override;
+    std::pair<
+        std::vector<TransactionAndMetadata>,
+        std::optional<AccountTransactionsCursor>>
+    doFetchAccountTransactions(
+        ripple::AccountID const& account,
+        std::uint32_t limit,
+        std::optional<AccountTransactionsCursor> const& cursor) const;
     bool
     doFinishWrites() const override
@@ -872,37 +698,7 @@ public:
     writeLedger(
         ripple::LedgerInfo const& ledgerInfo,
         std::string&& header,
-        bool isFirst = false) const override
-    {
-        WriteLedgerHeaderCallbackData* headerCb =
-            new WriteLedgerHeaderCallbackData(
-                this, ledgerInfo.seq, std::move(header));
-        WriteLedgerHashCallbackData* hashCb = new WriteLedgerHashCallbackData(
-            this, ledgerInfo.hash, ledgerInfo.seq);
-        writeLedgerHeader(*headerCb, false);
-        writeLedgerHash(*hashCb, false);
-        ledgerSequence_ = ledgerInfo.seq;
-        isFirstLedger_ = isFirst;
-    }
-    void
-    writeLedgerHash(WriteLedgerHashCallbackData& cb, bool isRetry) const
-    {
-        CassandraStatement statement{insertLedgerHash_};
-        statement.bindBytes(cb.hash);
-        statement.bindInt(cb.sequence);
-        executeAsyncWrite(
-            statement, flatMapWriteLedgerHashCallback, cb, isRetry);
-    }
-    void
-    writeLedgerHeader(WriteLedgerHeaderCallbackData& cb, bool isRetry) const
-    {
-        CassandraStatement statement{insertLedgerHeader_};
-        statement.bindInt(cb.sequence);
-        statement.bindBytes(cb.header);
-        executeAsyncWrite(
-            statement, flatMapWriteLedgerHeaderCallback, cb, isRetry);
-    }
+        bool isFirst = false) const override;
     std::optional<uint32_t>
     fetchLatestLedgerSequence() const override
@@ -956,7 +752,7 @@ public:
     }
     std::optional<LedgerRange>
-    fetchLedgerRange() const override;
+    hardFetchLedgerRange() const override;
     std::vector<TransactionAndMetadata>
     fetchAllTransactionsInLedger(uint32_t ledgerSequence) const override;
@@ -964,11 +760,8 @@ public:
     std::vector<ripple::uint256>
     fetchAllTransactionHashesInLedger(uint32_t ledgerSequence) const override;
-    // Synchronously fetch the object with key key and store the result in
-    // pno
-    // @param key the key of the object
-    // @param pno object in which to store the result
-    // @return result status of query
+    // Synchronously fetch the object with key key, as of ledger with sequence
+    // sequence
     std::optional<Blob>
     fetchLedgerObject(ripple::uint256 const& key, uint32_t sequence)
         const override
@@ -983,7 +776,10 @@ public:
             BOOST_LOG_TRIVIAL(debug) << __func__ << " - no rows";
             return {};
         }
-        return result.getBytes();
+        auto res = result.getBytes();
+        if (res.size())
+            return res;
+        return {};
     }
     std::optional<int64_t>
@@ -1024,18 +820,6 @@ public:
         std::optional<ripple::uint256> const& cursor,
         std::uint32_t ledgerSequence,
         std::uint32_t limit) const override;
-    std::vector<LedgerObject>
-    fetchLedgerDiff(uint32_t ledgerSequence) const;
-    std::map<uint32_t, std::vector<LedgerObject>>
-    fetchLedgerDiffs(std::vector<uint32_t> const& sequences) const;
-    bool
-    runIndexer(uint32_t ledgerSequence) const;
-    bool
-    isIndexed(uint32_t ledgerSequence) const;
-    std::optional<uint32_t>
-    getNextToIndex() const;
     bool
     writeKeys(
@@ -1043,208 +827,15 @@ public:
         KeyIndex const& index,
         bool isAsync = false) const override;
-    bool
-    canFetchBatch()
-    {
-        return true;
-    }
-    struct ReadCallbackData
-    {
-        CassandraBackend const& backend;
-        ripple::uint256 const& hash;
-        TransactionAndMetadata& result;
-        std::mutex& mtx;
-        std::condition_variable& cv;
-        std::atomic_uint32_t& numFinished;
-        size_t batchSize;
-        ReadCallbackData(
-            CassandraBackend const& backend,
-            ripple::uint256 const& hash,
-            TransactionAndMetadata& result,
-            std::mutex& mtx,
-            std::condition_variable& cv,
-            std::atomic_uint32_t& numFinished,
-            size_t batchSize)
-            : backend(backend)
-            , hash(hash)
-            , result(result)
-            , mtx(mtx)
-            , cv(cv)
-            , numFinished(numFinished)
-            , batchSize(batchSize)
-        {
-        }
-        ReadCallbackData(ReadCallbackData const& other) = default;
-    };
     std::vector<TransactionAndMetadata>
-    fetchTransactions(std::vector<ripple::uint256> const& hashes) const override
-    {
-        std::size_t const numHashes = hashes.size();
-        BOOST_LOG_TRIVIAL(debug)
-            << "Fetching " << numHashes << " transactions from Cassandra";
-        std::atomic_uint32_t numFinished = 0;
-        std::condition_variable cv;
-        std::mutex mtx;
-        std::vector<TransactionAndMetadata> results{numHashes};
-        std::vector<std::shared_ptr<ReadCallbackData>> cbs;
-        cbs.reserve(numHashes);
-        for (std::size_t i = 0; i < hashes.size(); ++i)
-        {
-            cbs.push_back(std::make_shared<ReadCallbackData>(
-                *this, hashes[i], results[i], mtx, cv, numFinished, numHashes));
-            read(*cbs[i]);
-        }
-        assert(results.size() == cbs.size());
-        std::unique_lock<std::mutex> lck(mtx);
-        cv.wait(lck, [&numFinished, &numHashes]() {
-            return numFinished == numHashes;
-        });
-        for (auto const& res : results)
-        {
-            if (res.transaction.size() == 1 && res.transaction[0] == 0)
-                throw DatabaseTimeout();
-        }
-        BOOST_LOG_TRIVIAL(debug)
-            << "Fetched " << numHashes << " transactions from Cassandra";
-        return results;
-    }
-    void
-    read(ReadCallbackData& data) const
-    {
-        CassandraStatement statement{selectTransaction_};
-        statement.bindBytes(data.hash);
-        executeAsyncRead(statement, flatMapReadCallback, data);
-    }
-    struct ReadObjectCallbackData
-    {
-        CassandraBackend const& backend;
-        ripple::uint256 const& key;
-        uint32_t sequence;
-        Blob& result;
-        std::mutex& mtx;
-        std::condition_variable& cv;
-        std::atomic_uint32_t& numFinished;
-        size_t batchSize;
-        ReadObjectCallbackData(
-            CassandraBackend const& backend,
-            ripple::uint256 const& key,
-            uint32_t sequence,
-            Blob& result,
-            std::mutex& mtx,
-            std::condition_variable& cv,
-            std::atomic_uint32_t& numFinished,
-            size_t batchSize)
-            : backend(backend)
-            , key(key)
-            , sequence(sequence)
-            , result(result)
-            , mtx(mtx)
-            , cv(cv)
-            , numFinished(numFinished)
-            , batchSize(batchSize)
-        {
-        }
-        ReadObjectCallbackData(ReadObjectCallbackData const& other) = default;
-    };
-    void
-    readObject(ReadObjectCallbackData& data) const
-    {
-        CassandraStatement statement{selectObject_};
-        statement.bindBytes(data.key);
-        statement.bindInt(data.sequence);
-        executeAsyncRead(statement, flatMapReadObjectCallback, data);
-    }
+    fetchTransactions(
+        std::vector<ripple::uint256> const& hashes) const override;
     std::vector<Blob>
     fetchLedgerObjects(
         std::vector<ripple::uint256> const& keys,
         uint32_t sequence) const override;
-    struct WriteCallbackData
-    {
-        CassandraBackend const* backend;
-        std::string key;
-        uint32_t sequence;
-        uint32_t createdSequence = 0;
-        std::string blob;
-        bool isCreated;
-        bool isDeleted;
-        std::optional<ripple::uint256> book;
-        uint32_t currentRetries = 0;
-        std::atomic<int> refs = 1;
-        WriteCallbackData(
-            CassandraBackend const* f,
-            std::string&& key,
-            uint32_t sequence,
-            std::string&& blob,
-            bool isCreated,
-            bool isDeleted,
-            std::optional<ripple::uint256>&& inBook)
-            : backend(f)
-            , key(std::move(key))
-            , sequence(sequence)
-            , blob(std::move(blob))
-            , isCreated(isCreated)
-            , isDeleted(isDeleted)
-            , book(std::move(inBook))
-        {
-        }
-    };
-    struct WriteAccountTxCallbackData
-    {
-        CassandraBackend const* backend;
-        ripple::AccountID account;
-        uint32_t ledgerSequence;
-        uint32_t transactionIndex;
-        ripple::uint256 txHash;
-        uint32_t currentRetries = 0;
-        std::atomic<int> refs = 1;
-        WriteAccountTxCallbackData(
-            CassandraBackend const* f,
-            ripple::AccountID&& account,
-            uint32_t lgrSeq,
-            uint32_t txIdx,
-            ripple::uint256&& hash)
-            : backend(f)
-            , account(std::move(account))
-            , ledgerSequence(lgrSeq)
-            , transactionIndex(txIdx)
-            , txHash(std::move(hash))
-        {
-        }
-    };
-    void
-    write(WriteCallbackData& data, bool isRetry) const
-    {
-        {
-            CassandraStatement statement{insertObject_};
-            statement.bindBytes(data.key);
-            statement.bindInt(data.sequence);
-            statement.bindBytes(data.blob);
-            executeAsyncWrite(statement, flatMapWriteCallback, data, isRetry);
-        }
-    }
     void
     doWriteLedgerObject(
         std::string&& key,
@@ -1252,112 +843,18 @@ public:
         std::string&& blob,
         bool isCreated,
         bool isDeleted,
-        std::optional<ripple::uint256>&& book) const override
-    {
-        BOOST_LOG_TRIVIAL(trace) << "Writing ledger object to cassandra";
-        bool hasBook = book.has_value();
-        WriteCallbackData* data = new WriteCallbackData(
-            this,
-            std::move(key),
-            seq,
-            std::move(blob),
-            isCreated,
-            isDeleted,
-            std::move(book));
-        write(*data, false);
-    }
+        std::optional<ripple::uint256>&& book) const override;
     void
     writeAccountTransactions(
-        std::vector<AccountTransactionsData>&& data) const override
-    {
-        for (auto& record : data)
-        {
-            for (auto& account : record.accounts)
-            {
-                WriteAccountTxCallbackData* cbData =
-                    new WriteAccountTxCallbackData(
-                        this,
-                        std::move(account),
-                        record.ledgerSequence,
-                        record.transactionIndex,
-                        std::move(record.txHash));
-                writeAccountTx(*cbData, false);
-            }
-        }
-    }
-    void
-    writeAccountTx(WriteAccountTxCallbackData& data, bool isRetry) const
-    {
-        CassandraStatement statement(insertAccountTx_);
-        statement.bindBytes(data.account);
-        statement.bindIntTuple(data.ledgerSequence, data.transactionIndex);
-        statement.bindBytes(data.txHash);
-        executeAsyncWrite(
-            statement, flatMapWriteAccountTxCallback, data, isRetry);
-    }
-    struct WriteTransactionCallbackData
-    {
-        CassandraBackend const* backend;
-        // The shared pointer to the node object must exist until it's
-        // confirmed persisted. Otherwise, it can become deleted
-        // prematurely if other copies are removed from caches.
-        std::string hash;
-        uint32_t sequence;
-        std::string transaction;
-        std::string metadata;
-        uint32_t currentRetries = 0;
-        std::atomic<int> refs = 1;
-        WriteTransactionCallbackData(
-            CassandraBackend const* f,
-            std::string&& hash,
-            uint32_t sequence,
-            std::string&& transaction,
-            std::string&& metadata)
-            : backend(f)
-            , hash(std::move(hash))
-            , sequence(sequence)
-            , transaction(std::move(transaction))
-            , metadata(std::move(metadata))
-        {
-        }
-    };
-    void
-    writeTransaction(WriteTransactionCallbackData& data, bool isRetry) const
-    {
-        CassandraStatement statement{insertTransaction_};
-        statement.bindBytes(data.hash);
-        statement.bindInt(data.sequence);
-        statement.bindBytes(data.transaction);
-        statement.bindBytes(data.metadata);
-        executeAsyncWrite(
-            statement, flatMapWriteTransactionCallback, data, isRetry);
-    }
+        std::vector<AccountTransactionsData>&& data) const override;
     void
     writeTransaction(
         std::string&& hash,
         uint32_t seq,
         std::string&& transaction,
-        std::string&& metadata) const override
-    {
-        BOOST_LOG_TRIVIAL(trace) << "Writing txn to cassandra";
-        WriteTransactionCallbackData* data = new WriteTransactionCallbackData(
-            this,
-            std::move(hash),
-            seq,
-            std::move(transaction),
-            std::move(metadata));
-        writeTransaction(*data, false);
-    }
+        std::string&& metadata) const override;
     void
     startWrites() const override
@@ -1380,28 +877,6 @@ public:
         return ioContext_;
     }
-    friend void
-    flatMapWriteCallback(CassFuture* fut, void* cbData);
-    friend void
-    flatMapWriteKeyCallback(CassFuture* fut, void* cbData);
-    friend void
-    flatMapWriteTransactionCallback(CassFuture* fut, void* cbData);
-    friend void
-    flatMapWriteBookCallback(CassFuture* fut, void* cbData);
-    friend void
-    flatMapWriteAccountTxCallback(CassFuture* fut, void* cbData);
-    friend void
-    flatMapWriteLedgerHeaderCallback(CassFuture* fut, void* cbData);
-    friend void
-    flatMapWriteLedgerHashCallback(CassFuture* fut, void* cbData);
-    friend void
-    flatMapReadCallback(CassFuture* fut, void* cbData);
-    friend void
-    flatMapReadObjectCallback(CassFuture* fut, void* cbData);
-    friend void
-    flatMapGetCreatedCallback(CassFuture* fut, void* cbData);
     inline void
     incremementOutstandingRequestCount() const
     {
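Taken together, these header changes move the heavy account_tx logic out of line and tighten the iteration contract: the backend returns a page of transactions plus an optional cursor, with the cursor present exactly when more pages remain (the new doFetchAccountTransactions presumably carries the raw read so the public entry point can wrap it; its definition is not shown here). A toy sketch of that contract from the caller's side, where fetchPage is a made-up stand-in for fetchAccountTransactions with a stub that serves two pages:

    #include <cstdint>
    #include <iostream>
    #include <optional>
    #include <utility>
    #include <vector>

    struct AccountTransactionsCursor
    {
        uint32_t ledgerSequence;
        uint32_t transactionIndex;
    };

    struct TransactionAndMetadata
    {
        // transaction blob, metadata blob, ledger sequence elided
    };

    using Page = std::pair<
        std::vector<TransactionAndMetadata>,
        std::optional<AccountTransactionsCursor>>;

    // Stub standing in for the real database read (assumption for the sketch).
    Page
    fetchPage(std::optional<AccountTransactionsCursor> const& cursor, uint32_t limit)
    {
        if (!cursor)  // first call: a full page, so hand back a resume point
            return {std::vector<TransactionAndMetadata>(limit),
                    AccountTransactionsCursor{1000, 0}};
        // resumed call: a short page and no cursor, meaning we are done
        return {std::vector<TransactionAndMetadata>(1), std::nullopt};
    }

    int main()
    {
        std::vector<TransactionAndMetadata> all;
        std::optional<AccountTransactionsCursor> cursor;  // empty = start at newest
        do
        {
            auto [page, next] = fetchPage(cursor, 200);
            all.insert(all.end(), page.begin(), page.end());
            cursor = next;  // absent exactly when no rows remain
        } while (cursor);
        std::cout << all.size() << " transactions\n";  // prints 201
    }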


@@ -214,7 +214,7 @@ PostgresBackend::fetchLedgerByHash(ripple::uint256 const& hash) const
 }
 std::optional<LedgerRange>
-PostgresBackend::fetchLedgerRange() const
+PostgresBackend::hardFetchLedgerRange() const
 {
     auto range = PgQuery(pgPool_)("SELECT complete_ledgers()");
     if (!range)
@@ -729,7 +729,7 @@ PostgresBackend::writeKeys(
 bool
 PostgresBackend::doOnlineDelete(uint32_t numLedgersToKeep) const
 {
-    auto rng = fetchLedgerRangeNoThrow();
+    auto rng = fetchLedgerRange();
     if (!rng)
         return false;
     uint32_t minLedger = rng->maxSequence - numLedgersToKeep;


@@ -30,9 +30,6 @@ public:
     std::optional<ripple::LedgerInfo>
     fetchLedgerByHash(ripple::uint256 const& hash) const override;
-    std::optional<LedgerRange>
-    fetchLedgerRange() const override;
     std::optional<Blob>
     fetchLedgerObject(ripple::uint256 const& key, uint32_t sequence)
         const override;
@@ -47,6 +44,9 @@ public:
     std::vector<ripple::uint256>
     fetchAllTransactionHashesInLedger(uint32_t ledgerSequence) const override;
+    std::optional<LedgerRange>
+    hardFetchLedgerRange() const override;
     LedgerPage
     doFetchLedgerPage(
         std::optional<ripple::uint256> const& cursor,


@@ -26,12 +26,11 @@
 #include <boost/asio/ip/tcp.hpp>
 #include <boost/beast/core.hpp>
 #include <boost/beast/websocket.hpp>
+#include <webserver/SubscriptionManager.h>
 #include <cstdlib>
 #include <iostream>
-#include <webserver/SubscriptionManager.h>
 #include <string>
 #include <variant>
-#include <webserver/SubscriptionManager.h>
 namespace detail {
 /// Convenience function for printing out basic ledger info
@@ -90,7 +89,7 @@ std::optional<ripple::LedgerInfo>
 ReportingETL::loadInitialLedger(uint32_t startingSequence)
 {
     // check that database is actually empty
-    auto rng = backend_->fetchLedgerRangeNoThrow();
+    auto rng = backend_->hardFetchLedgerRangeNoThrow();
     if (rng)
     {
         BOOST_LOG_TRIVIAL(fatal) << __func__ << " : "
@@ -115,10 +114,13 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence)
         << "Deserialized ledger header. " << detail::toString(lgrInfo);
     backend_->startWrites();
+    BOOST_LOG_TRIVIAL(debug) << __func__ << " started writes";
     backend_->writeLedger(
         lgrInfo, std::move(*ledgerData->mutable_ledger_header()), true);
+    BOOST_LOG_TRIVIAL(debug) << __func__ << " wrote ledger";
     std::vector<AccountTransactionsData> accountTxData =
         insertTransactions(lgrInfo, *ledgerData);
+    BOOST_LOG_TRIVIAL(debug) << __func__ << " inserted txns";
     auto start = std::chrono::system_clock::now();
@@ -127,6 +129,7 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence)
     // consumes from the queue and inserts the data into the Ledger object.
     // Once the below call returns, all data has been pushed into the queue
     loadBalancer_->loadInitialLedger(startingSequence);
+    BOOST_LOG_TRIVIAL(debug) << __func__ << " loaded initial ledger";
     if (!stopping_)
     {
@@ -142,11 +145,12 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence)
 void
 ReportingETL::publishLedger(ripple::LedgerInfo const& lgrInfo)
 {
-    auto ledgerRange = backend_->fetchLedgerRangeNoThrow();
+    backend_->updateRange(lgrInfo.seq);
+    auto ledgerRange = backend_->fetchLedgerRange();
     std::optional<ripple::Fees> fees;
     std::vector<Backend::TransactionAndMetadata> transactions;
-    for(;;)
+    for (;;)
     {
         try
         {
@@ -189,7 +193,7 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts)
     {
         try
         {
-            auto range = backend_->fetchLedgerRangeNoThrow();
+            auto range = backend_->hardFetchLedgerRangeNoThrow();
             if (!range || range->maxSequence < ledgerSequence)
             {
@@ -395,7 +399,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
         << "Starting etl pipeline";
     writing_ = true;
-    auto rng = backend_->fetchLedgerRangeNoThrow();
+    auto rng = backend_->hardFetchLedgerRangeNoThrow();
     if (!rng || rng->maxSequence != startSequence - 1)
     {
         assert(false);
@@ -497,6 +501,31 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
         beast::setCurrentThreadName("rippled: ReportingETL transform");
         uint32_t currentSequence = startSequence;
+        int counter = 0;
+        std::atomic_int per = 100;
+        auto startTimer = [this, &per]() {
+            auto innerFunc = [this, &per](auto& f) -> void {
+                std::shared_ptr<boost::asio::steady_timer> timer =
+                    std::make_shared<boost::asio::steady_timer>(
+                        ioContext_,
+                        std::chrono::steady_clock::now() +
+                            std::chrono::minutes(5));
+                timer->async_wait(
+                    [timer, f, &per](const boost::system::error_code& error) {
+                        ++per;
+                        BOOST_LOG_TRIVIAL(info)
+                            << "Incremented per to " << std::to_string(per);
+                        if (per > 100)
+                            per = 100;
+                        f(f);
+                    });
+            };
+            innerFunc(innerFunc);
+        };
+        // startTimer();
+        auto begin = std::chrono::system_clock::now();
         while (!writeConflict)
         {
             std::optional<org::xrpl::rpc::v1::GetLedgerResponse> fetchResponse{
@@ -548,11 +577,24 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
                     BOOST_LOG_TRIVIAL(info) << "Running online delete";
                     backend_->doOnlineDelete(*onlineDeleteInterval_);
                     BOOST_LOG_TRIVIAL(info) << "Finished online delete";
-                    auto rng = backend_->fetchLedgerRangeNoThrow();
+                    auto rng = backend_->fetchLedgerRange();
                     minSequence = rng->minSequence;
                     deleting_ = false;
                 });
             }
+            /*
+            if (++counter >= per)
+            {
+                std::chrono::milliseconds sleep =
+                    std::chrono::duration_cast<std::chrono::milliseconds>(
+                        std::chrono::seconds(4) - (end - begin));
+                BOOST_LOG_TRIVIAL(info) << "Sleeping for " << sleep.count()
+                                        << " . per = " << std::to_string(per);
+                std::this_thread::sleep_for(sleep);
+                counter = 0;
+                begin = std::chrono::system_clock::now();
+            }
+            */
         }
     }};


@@ -17,13 +17,12 @@
 */
 //==============================================================================
-#include <handlers/RPCHelpers.h>
-#include <handlers/methods/Transaction.h>
 #include <backend/BackendInterface.h>
 #include <backend/Pg.h>
+#include <handlers/RPCHelpers.h>
+#include <handlers/methods/Transaction.h>
-namespace RPC
-{
+namespace RPC {
 Result
 doAccountTx(Context const& context)
@@ -31,10 +30,10 @@ doAccountTx(Context const& context)
     auto request = context.params;
     boost::json::object response = {};
-    if(!request.contains("account"))
+    if (!request.contains("account"))
         return Status{Error::rpcINVALID_PARAMS, "missingAccount"};
-    if(!request.at("account").is_string())
+    if (!request.at("account").is_string())
         return Status{Error::rpcINVALID_PARAMS, "accountNotString"};
     auto accountID =
@@ -44,9 +43,9 @@ doAccountTx(Context const& context)
         return Status{Error::rpcINVALID_PARAMS, "malformedAccount"};
     bool binary = false;
-    if(request.contains("binary"))
+    if (request.contains("binary"))
     {
-        if(!request.at("binary").is_bool())
+        if (!request.at("binary").is_bool())
             return Status{Error::rpcINVALID_PARAMS, "binaryFlagNotBool"};
         binary = request.at("binary").as_bool();
@@ -64,15 +63,16 @@ doAccountTx(Context const& context)
     std::optional<Backend::AccountTransactionsCursor> cursor;
     cursor = {context.range.maxSequence, 0};
-    if (request.contains("cursor"))
+    if (request.contains("marker"))
     {
-        auto const& obj = request.at("cursor").as_object();
+        auto const& obj = request.at("marker").as_object();
         std::optional<std::uint32_t> transactionIndex = {};
         if (obj.contains("seq"))
         {
             if (!obj.at("seq").is_int64())
-                return Status{Error::rpcINVALID_PARAMS, "transactionIndexNotInt"};
+                return Status{
+                    Error::rpcINVALID_PARAMS, "transactionIndexNotInt"};
             transactionIndex = value_to<std::uint32_t>(obj.at("seq"));
         }
@@ -81,9 +81,9 @@ doAccountTx(Context const& context)
         if (obj.contains("ledger"))
         {
             if (!obj.at("ledger").is_int64())
-                return Status{Error::rpcINVALID_PARAMS, "transactionIndexNotInt"};
+                return Status{Error::rpcINVALID_PARAMS, "ledgerIndexNotInt"};
-            transactionIndex = value_to<std::uint32_t>(obj.at("ledger"));
+            ledgerIndex = value_to<std::uint32_t>(obj.at("ledger"));
         }
         if (!transactionIndex || !ledgerIndex)
@@ -107,7 +107,7 @@ doAccountTx(Context const& context)
     std::uint32_t limit = 200;
     if (request.contains("limit"))
     {
-        if(!request.at("limit").is_int64())
+        if (!request.at("limit").is_int64())
            return Status{Error::rpcINVALID_PARAMS, "limitNotInt"};
        limit = request.at("limit").as_int64();
@@ -117,14 +117,15 @@ doAccountTx(Context const& context)
         response["limit"] = limit;
     }
     boost::json::array txns;
     auto start = std::chrono::system_clock::now();
     auto [blobs, retCursor] =
         context.backend->fetchAccountTransactions(*accountID, limit, cursor);
     auto end = std::chrono::system_clock::now();
-    BOOST_LOG_TRIVIAL(info) << __func__ << " db fetch took " << ((end - start).count() / 1000000000.0) << " num blobs = " << blobs.size();
+    BOOST_LOG_TRIVIAL(info) << __func__ << " db fetch took "
+                            << ((end - start).count() / 1000000000.0)
+                            << " num blobs = " << blobs.size();
     response["account"] = ripple::to_string(*accountID);
     response["ledger_index_min"] = minIndex;
@@ -132,6 +133,7 @@ doAccountTx(Context const& context)
     if (retCursor)
     {
+        BOOST_LOG_TRIVIAL(debug) << "setting json cursor";
         boost::json::object cursorJson;
         cursorJson["ledger"] = retCursor->ledgerSequence;
         cursorJson["seq"] = retCursor->transactionIndex;
@@ -157,7 +159,6 @@ doAccountTx(Context const& context)
             obj["tx"] = toJson(*txn);
             obj["tx"].as_object()["ledger_index"] = txnPlusMeta.ledgerSequence;
             obj["tx"].as_object()["inLedger"] = txnPlusMeta.ledgerSequence;
         }
         else
         {
@@ -174,9 +175,10 @@ doAccountTx(Context const& context)
     response["transactions"] = txns;
     auto end2 = std::chrono::system_clock::now();
-    BOOST_LOG_TRIVIAL(info) << __func__ << " serialization took " << ((end2 - end).count() / 1000000000.0);
+    BOOST_LOG_TRIVIAL(info) << __func__ << " serialization took "
+                            << ((end2 - end).count() / 1000000000.0);
     return response;
 }
 } // namespace RPC
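The user-visible part of this handler change is the rename from "cursor" to "marker" in both the request and the response. A minimal sketch of that round-trip with boost::json (the numbers are made up; the field names match the handler above):

    #include <boost/json.hpp>
    #include <cstdint>
    #include <iostream>

    int main()
    {
        // Response side: when a page is full, the handler surfaces a marker.
        boost::json::object response;
        boost::json::object marker;
        marker["ledger"] = 64000000;  // ledgerSequence of the resume point
        marker["seq"] = 12;           // transactionIndex of the resume point
        response["marker"] = marker;

        // Request side: the client echoes the marker back for the next page.
        auto const& obj = response.at("marker").as_object();
        auto ledgerIndex = boost::json::value_to<std::uint32_t>(obj.at("ledger"));
        auto transactionIndex = boost::json::value_to<std::uint32_t>(obj.at("seq"));
        std::cout << ledgerIndex << ":" << transactionIndex << "\n";
    }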


@@ -37,8 +37,8 @@
 #include <thread>
 #include <handlers/Handlers.h>
-#include <webserver/DOSGuard.h>
 #include <vector>
+#include <webserver/DOSGuard.h>
 namespace http = boost::beast::http;
 namespace net = boost::asio;
@@ -93,9 +93,9 @@ handle_request(
     std::string const& ip)
 {
     auto const httpResponse = [&req](
                                   http::status status,
                                   std::string content_type,
                                   std::string message) {
         http::response<http::string_body> res{status, req.version()};
         res.set(http::field::server, "xrpl-reporting-server-v0.0.0");
         res.set(http::field::content_type, content_type);
@@ -119,8 +119,7 @@ handle_request(
         return send(httpResponse(
             http::status::ok,
             "application/json",
-            boost::json::serialize(
-                RPC::make_error(RPC::Error::rpcSLOW_DOWN))));
+            boost::json::serialize(RPC::make_error(RPC::Error::rpcSLOW_DOWN))));
     try
     {
@@ -156,13 +155,8 @@ handle_request(
                 boost::json::serialize(
                     RPC::make_error(RPC::Error::rpcNOT_READY))));
-        std::optional<RPC::Context> context = RPC::make_HttpContext(
-            request,
-            backend,
-            nullptr,
-            balancer,
-            *range
-        );
+        std::optional<RPC::Context> context =
+            RPC::make_HttpContext(request, backend, nullptr, balancer, *range);
         if (!context)
             return send(httpResponse(
@@ -198,10 +192,8 @@ handle_request(
         if (!dosGuard.add(ip, responseStr.size()))
             result["warning"] = "Too many requests";
-        return send(httpResponse(
-            http::status::ok,
-            "application/json",
-            responseStr));
+        return send(
+            httpResponse(http::status::ok, "application/json", responseStr));
     }
     catch (std::exception const& e)
     {

test.py

@@ -219,7 +219,7 @@ async def account_tx(ip, port, account, binary, minLedger=None, maxLedger=None):
             await ws.send(json.dumps({"command":"account_tx","account":account, "binary":bool(binary),"limit":200,"ledger_index_min":minLedger, "ledger_index_max":maxLedger}))
             res = json.loads(await ws.recv())
-            #print(json.dumps(res,indent=4,sort_keys=True))
+            print(json.dumps(res,indent=4,sort_keys=True))
             return res
     except websockets.exceptions.ConnectionClosedError as e:
         print(e)
@@ -322,6 +322,8 @@ async def account_tx_full(ip, port, account, binary,minLedger=None, maxLedger=No
                 req["ledger_index_min"] = minLedger
                 req["ledger_index_max"] = maxLedger
                 start = datetime.datetime.now().timestamp()
+                print("sending")
+                print(req)
                 await ws.send(json.dumps(req))
                 res = await ws.recv()
@@ -329,7 +331,8 @@ async def account_tx_full(ip, port, account, binary,minLedger=None, maxLedger=No
                 print(end - start)
                 res = json.loads(res)
-                #print(json.dumps(res,indent=4,sort_keys=True))
+                #print(res)
+                print(json.dumps(res,indent=4,sort_keys=True))
                 if "result" in res:
                     print(len(res["result"]["transactions"]))
                 else:
@@ -341,14 +344,14 @@ async def account_tx_full(ip, port, account, binary,minLedger=None, maxLedger=No
                 if "cursor" in res:
                     cursor = {"ledger_sequence":res["cursor"]["ledger_sequence"],"transaction_index":res["cursor"]["transaction_index"]}
                     print(cursor)
+                if "marker" in res:
+                    marker = {"ledger":res["marker"]["ledger"],"seq":res["marker"]["seq"]}
+                    print(marker)
                 elif "result" in res and "marker" in res["result"]:
                     marker={"ledger":res["result"]["marker"]["ledger"],"seq":res["result"]["marker"]["seq"]}
                     print(marker)
                 else:
-                    print(res)
+                    print("no cursor or marker")
+                    break
+                if numCalls > numPages:
+                    print("breaking")
                     break
         return results
     except websockets.exceptions.ConnectionClosedError as e:
@@ -665,7 +668,8 @@ def getHashes(res):
         res = res["result"]["ledger"]
     hashes = []
-    for x in res["transactions"]:
+    for x in res["ledger"]["transactions"]:
+        print(x)
         if "hash" in x:
             hashes.append(x["hash"])
         elif "transaction" in x and "hash" in x["transaction"]:
@@ -699,11 +703,14 @@ async def ledgers(ip, port, minLedger, maxLedger, transactions, expand, maxCalls
                 start = datetime.datetime.now().timestamp()
                 await ws.send(json.dumps({"command":"ledger","ledger_index":int(ledger),"binary":True, "transactions":bool(transactions),"expand":bool(expand)}))
                 res = json.loads(await ws.recv())
-                print(res["header"]["blob"])
                 end = datetime.datetime.now().timestamp()
                 if (end - start) > 0.1:
-                    print("request took more than 100ms")
+                    print("request took more than 100ms : " + str(end - start))
                 numCalls = numCalls + 1
+                if "error" in res:
+                    print(res["error"])
+                else:
+                    print(res["header"]["blob"])
     except websockets.exceptions.ConnectionClosedError as e:
         print(e)
@@ -842,8 +849,13 @@ args = parser.parse_args()
 def run(args):
     asyncio.set_event_loop(asyncio.new_event_loop())
-    if(args.ledger is None):
-        args.ledger = asyncio.get_event_loop().run_until_complete(ledger_range(args.ip, args.port))[1]
+    rng = asyncio.get_event_loop().run_until_complete(ledger_range(args.ip, args.port))
+    if args.ledger is None:
+        args.ledger = rng[1]
+    if args.maxLedger == -1:
+        args.maxLedger = rng[1]
+    if args.minLedger == -1:
+        args.minLedger = rng[0]
     if args.action == "fee":
         asyncio.get_event_loop().run_until_complete(fee(args.ip, args.port))
     elif args.action == "server_info":
@@ -891,6 +903,7 @@ def run(args):
         end = datetime.datetime.now().timestamp()
         num = int(args.numRunners) * int(args.numCalls)
         print("Completed " + str(num) + " in " + str(end - start) + " seconds. Throughput = " + str(num / (end - start)) + " calls per second")
+        print("Latency = " + str((end - start) / int(args.numCalls)) + " seconds")
     elif args.action == "ledger_entries":
         keys = []
         ledger_index = 0
@@ -1012,7 +1025,8 @@ def run(args):
             args.hash = getHashes(asyncio.get_event_loop().run_until_complete(ledger(args.ip,args.port,args.ledger,False,True,False)))[0]
         res = asyncio.get_event_loop().run_until_complete(tx(args.ip,args.port,args.hash,False))
-        args.account = res["transaction"]["Account"]
+        print(res)
+        args.account = res["Account"]
         res = asyncio.get_event_loop().run_until_complete(
             account_tx(args.ip, args.port, args.account, args.binary))
@@ -1031,16 +1045,16 @@ def run(args):
             args.hash = getHashes(asyncio.get_event_loop().run_until_complete(ledger(args.ip,args.port,args.ledger,False,True,False)))[0]
         res = asyncio.get_event_loop().run_until_complete(tx(args.ip,args.port,args.hash,False))
-        args.account = res["transaction"]["Account"]
+        args.account = res["Account"]
         print("starting")
         res = asyncio.get_event_loop().run_until_complete(
-            account_tx_full(args.ip, args.port, args.account, args.binary,None,None,int(args.numPages)))
+            account_tx_full(args.ip, args.port, args.account, args.binary,None,None))
         rng = getMinAndMax(res)
         print(len(res["transactions"]))
         print(args.account)
         txs = set()
         for x in res["transactions"]:
-            txs.add((x["transaction"],x["ledger_sequence"]))
+            txs.add((x["tx_blob"],x["ledger_index"]))
         print(len(txs))
         if args.verify: