checkpoint

This commit is contained in:
CJ Cobb
2021-06-07 20:19:47 -04:00
parent 51ff08b3ae
commit 5f265c5850
8 changed files with 51 additions and 210 deletions

View File

@@ -87,7 +87,6 @@ target_sources(reporting PRIVATE
handlers/LedgerRange.cpp
handlers/Ledger.cpp
handlers/LedgerEntry.cpp
<<<<<<< HEAD
handlers/AccountChannels.cpp
handlers/AccountLines.cpp
handlers/AccountCurrencies.cpp
@@ -95,10 +94,8 @@ target_sources(reporting PRIVATE
handlers/AccountObjects.cpp
handlers/ChannelAuthorize.cpp
handlers/ChannelVerify.cpp
handlers/Subscribe.cpp)
=======
handlers/Subscribe.cpp
handlers/ServerInfo.cpp)
>>>>>>> dev
message(${Boost_LIBRARIES})

View File

@@ -9,8 +9,8 @@ accountFromStringStrict(std::string const& account)
boost::optional<ripple::PublicKey> publicKey = {};
if (blob && ripple::publicKeyType(ripple::makeSlice(*blob)))
{
publicKey = ripple::PublicKey(
ripple::Slice{blob->data(), blob->size()});
publicKey =
ripple::PublicKey(ripple::Slice{blob->data(), blob->size()});
}
else
{
@@ -51,19 +51,17 @@ deserializeTxPlusMeta(Backend::TransactionAndMetadata const& blobs)
return result;
}
std::pair<
std::shared_ptr<ripple::STTx const>,
std::shared_ptr<ripple::TxMeta const>>
deserializeTxPlusMeta(Backend::TransactionAndMetadata const& blobs, std::uint32_t seq)
deserializeTxPlusMeta(
Backend::TransactionAndMetadata const& blobs,
std::uint32_t seq)
{
auto [tx, meta] = deserializeTxPlusMeta(blobs);
std::shared_ptr<ripple::TxMeta> m =
std::make_shared<ripple::TxMeta>(
tx->getTransactionID(),
seq,
*meta);
std::make_shared<ripple::TxMeta>(tx->getTransactionID(), seq, *meta);
return {tx, m};
}
@@ -82,8 +80,7 @@ toJson(ripple::STBase const& obj)
}
boost::json::object
<<<<<<< HEAD
getJson(ripple::TxMeta const& meta)
toJson(ripple::TxMeta const& meta)
{
auto start = std::chrono::system_clock::now();
boost::json::value value = boost::json::parse(
@@ -95,20 +92,8 @@ getJson(ripple::TxMeta const& meta)
return value.as_object();
}
boost::json::value
getJson(Json::Value const& value)
{
boost::json::value boostValue =
boost::json::parse(value.toStyledString());
return boostValue;
}
boost::json::object
getJson(ripple::SLE const& sle)
=======
toJson(ripple::SLE const& sle)
>>>>>>> dev
{
auto start = std::chrono::system_clock::now();
boost::json::value value = boost::json::parse(
@@ -154,7 +139,6 @@ ledgerSequenceFromRequest(
return request.at("ledger_index").as_int64();
}
}
<<<<<<< HEAD
std::optional<ripple::uint256>
traverseOwnedNodes(
@@ -173,8 +157,7 @@ traverseOwnedNodes(
auto start = std::chrono::system_clock::now();
for (;;)
{
auto ownedNode =
backend.fetchLedgerObject(currentIndex.key, sequence);
auto ownedNode = backend.fetchLedgerObject(currentIndex.key, sequence);
if (!ownedNode)
{
@@ -199,15 +182,14 @@ traverseOwnedNodes(
auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(debug) << "Time loading owned directories: "
<< ((end - start).count() / 1000000000.0);
<< ((end - start).count() / 1000000000.0);
start = std::chrono::system_clock::now();
auto objects = backend.fetchLedgerObjects(keys, sequence);
end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(debug) << "Time loading owned entries: "
<< ((end - start).count() / 1000000000.0);
<< ((end - start).count() / 1000000000.0);
for (auto i = 0; i < objects.size(); ++i)
{
@@ -215,7 +197,7 @@ traverseOwnedNodes(
ripple::SLE sle(it, keys[i]);
if (!atOwnedNode(sle))
{
nextCursor = keys[i+1];
nextCursor = keys[i + 1];
break;
}
}
@@ -232,8 +214,8 @@ parseRippleLibSeed(boost::json::value const& value)
if (!value.is_string())
return {};
auto const result =
ripple::decodeBase58Token(value.as_string().c_str(), ripple::TokenType::None);
auto const result = ripple::decodeBase58Token(
value.as_string().c_str(), ripple::TokenType::None);
if (result.size() == 18 &&
static_cast<std::uint8_t>(result[0]) == std::uint8_t(0xE1) &&
@@ -251,10 +233,7 @@ keypairFromRequst(boost::json::object const& request, boost::json::value& error)
// All of the secret types we allow, but only one at a time.
// The array should be constexpr, but that makes Visual Studio unhappy.
static std::string const secretTypes[]{
"passphrase",
"secret",
"seed",
"seed_hex"};
"passphrase", "secret", "seed", "seed_hex"};
// Identify which secret type is in use.
std::string secretType = "";
@@ -276,8 +255,9 @@ keypairFromRequst(boost::json::object const& request, boost::json::value& error)
if (count > 1)
{
error = "Exactly one of the following must be specified: "
" passphrase, secret, seed, or seed_hex";
error =
"Exactly one of the following must be specified: "
" passphrase, secret, seed, or seed_hex";
return {};
}
@@ -319,7 +299,8 @@ keypairFromRequst(boost::json::object const& request, boost::json::value& error)
{
// If the user passed in an Ed25519 seed but *explicitly*
// requested another key type, return an error.
if (keyType.value_or(ripple::KeyType::ed25519) != ripple::KeyType::ed25519)
if (keyType.value_or(ripple::KeyType::ed25519) !=
ripple::KeyType::ed25519)
{
error = "Specified seed is for an Ed25519 wallet.";
return {};
@@ -374,8 +355,8 @@ keypairFromRequst(boost::json::object const& request, boost::json::value& error)
return {};
}
if (keyType != ripple::KeyType::secp256k1
&& keyType != ripple::KeyType::ed25519)
if (keyType != ripple::KeyType::secp256k1 &&
keyType != ripple::KeyType::ed25519)
{
error = "keypairForSignature: invalid key type";
return {};
@@ -406,7 +387,7 @@ getAccountsFromTransaction(boost::json::object const& transaction)
}
return accounts;
=======
}
std::vector<unsigned char>
ledgerInfoToBlob(ripple::LedgerInfo const& info)
{
@@ -422,5 +403,4 @@ ledgerInfoToBlob(ripple::LedgerInfo const& info)
s.add8(info.closeFlags);
s.addBitString(info.hash);
return s.peekData();
>>>>>>> dev
}

View File

@@ -18,7 +18,9 @@ deserializeTxPlusMeta(Backend::TransactionAndMetadata const& blobs);
std::pair<
std::shared_ptr<ripple::STTx const>,
std::shared_ptr<ripple::TxMeta const>>
deserializeTxPlusMeta(Backend::TransactionAndMetadata const& blobs, std::uint32_t seq);
deserializeTxPlusMeta(
Backend::TransactionAndMetadata const& blobs,
std::uint32_t seq);
boost::json::object
toJson(ripple::STBase const& obj);
@@ -40,7 +42,6 @@ ledgerSequenceFromRequest(
boost::json::object const& request,
BackendInterface const& backend);
<<<<<<< HEAD
std::optional<ripple::uint256>
traverseOwnedNodes(
BackendInterface const& backend,
@@ -56,9 +57,8 @@ keypairFromRequst(
std::vector<ripple::AccountID>
getAccountsFromTransaction(boost::json::object const& transaction);
=======
std::vector<unsigned char>
ledgerInfoToBlob(ripple::LedgerInfo const& info);
>>>>>>> dev
#endif

View File

@@ -10,13 +10,9 @@ namespace Backend {
std::unique_ptr<BackendInterface>
make_Backend(boost::json::object const& config)
{
<<<<<<< HEAD
BOOST_LOG_TRIVIAL(info) << __func__ << ": Constructing BackendInterface";
boost::json::object const& dbConfig = config.at("database").as_object();
=======
boost::json::object dbConfig = config.at("database").as_object();
>>>>>>> dev
bool readOnly = false;
if (config.contains("read_only"))
@@ -28,14 +24,10 @@ make_Backend(boost::json::object const& config)
if (boost::iequals(type, "cassandra"))
{
<<<<<<< HEAD
backend =
=======
if (config.contains("online_delete"))
dbConfig.at(type).as_object()["ttl"] =
config.at("online_delete").as_int64() * 4;
auto backend =
>>>>>>> dev
backend =
std::make_unique<CassandraBackend>(dbConfig.at(type).as_object());
}
else if (boost::iequals(type, "postgres"))
@@ -48,12 +40,13 @@ make_Backend(boost::json::object const& config)
throw std::runtime_error("Invalid database type");
backend->open(readOnly);
backend->checkFlagLedgers();
BOOST_LOG_TRIVIAL(info) << __func__
<< ": Constructed BackendInterface Successfully";
BOOST_LOG_TRIVIAL(info)
<< __func__ << ": Constructed BackendInterface Successfully";
return backend;
}
} // namespace Backend
#endif //RIPPLE_REPORTING_BACKEND_FACTORY
#endif // RIPPLE_REPORTING_BACKEND_FACTORY

View File

@@ -1364,29 +1364,11 @@ CassandraBackend::open(bool readOnly)
continue;
query.str("");
<<<<<<< HEAD
query << "CREATE INDEX ON " << tablePrefix << "objects(sequence)";
if (!executeSimpleStatement(query.str()))
continue;
query.str("");
query << "SELECT * FROM " << tablePrefix << "objects WHERE sequence=1"
<< " LIMIT 1";
if (!executeSimpleStatement(query.str()))
continue;
query.str("");
query
<< "CREATE TABLE IF NOT EXISTS " << tablePrefix << "transactions"
<< " ( hash blob PRIMARY KEY, ledger_sequence bigint, transaction "
"blob, metadata blob)";
=======
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "transactions"
<< " ( hash blob PRIMARY KEY, ledger_sequence bigint, "
"transaction "
"blob, metadata blob)"
<< " WITH default_time_to_live = " << std::to_string(ttl);
>>>>>>> dev
if (!executeSimpleStatement(query.str()))
continue;
@@ -1507,15 +1489,6 @@ CassandraBackend::open(bool readOnly)
continue;
query.str("");
<<<<<<< HEAD
query << "INSERT INTO " << tablePrefix << "books"
<< " (book, sequence, quality_key) VALUES (?, ?, (?, ?))";
if (!insertBook2_.prepareStatement(query, session_.get()))
continue;
query.str("");
=======
>>>>>>> dev
query << "SELECT key FROM " << tablePrefix << "keys"
<< " WHERE sequence = ? AND key >= ? ORDER BY key ASC LIMIT ?";
if (!selectKeys_.prepareStatement(query, session_.get()))
@@ -1639,19 +1612,6 @@ CassandraBackend::open(bool readOnly)
setupPreparedStatements = true;
}
<<<<<<< HEAD
if (config_.contains("max_requests_outstanding"))
{
maxRequestsOutstanding = config_["max_requests_outstanding"].as_int64();
}
if (config_.contains("indexer_max_requests_outstanding"))
{
indexerMaxRequestsOutstanding =
config_["indexer_max_requests_outstanding"].as_int64();
}
=======
>>>>>>> dev
work_.emplace(ioContext_);
ioThread_ = std::thread{[this]() { ioContext_.run(); }};
open_ = true;

View File

@@ -253,12 +253,6 @@ public:
", grpc port : " + grpcPort_ + " }";
}
<<<<<<< HEAD
boost::json::value
toJson() const
{
return boost::json::string(toString());
=======
boost::json::object
toJson() const
{
@@ -275,7 +269,6 @@ public:
std::chrono::system_clock::now() - getLastMsgTime())
.count());
return res;
>>>>>>> dev
}
/// Download a ledger in full
@@ -396,7 +389,6 @@ public:
/// to clients).
/// @param in ETLSource in question
/// @return true if messages should be forwarded
<<<<<<< HEAD
bool
shouldPropagateTxnStream(ETLSource* in) const
{
@@ -418,35 +410,11 @@ public:
}
boost::json::value
=======
// bool
// shouldPropagateTxnStream(ETLSource* in) const
// {
// for (auto& src : sources_)
// {
// assert(src);
// // We pick the first ETLSource encountered that is connected
// if (src->isConnected())
// {
// if (src.get() == in)
// return true;
// else
// return false;
// }
// }
//
// // If no sources connected, then this stream has not been
// forwarded. return true;
// }
boost::json::array
>>>>>>> dev
toJson() const
{
boost::json::array ret;
for (auto& src : sources_)
{
<<<<<<< HEAD
ret.push_back(src->toJson());
}
return ret;
@@ -462,23 +430,6 @@ public:
/// @return response received from p2p node
boost::json::object
forwardToP2p(boost::json::object const& request) const;
=======
ret.emplace_back(src->toJson());
}
return ret;
}
//
// /// Randomly select a p2p node to forward a gRPC request to
// /// @return gRPC stub to forward requests to p2p node
// std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>
// getP2pForwardingStub() const;
//
// /// Forward a JSON RPC request to a randomly selected p2p node
// /// @param context context of the request
// /// @return response received from p2p node
// Json::Value
// forwardToP2p(RPC::JsonContext& context) const;
>>>>>>> dev
private:
/// f is a function that takes an ETLSource as an argument and returns a

View File

@@ -174,17 +174,17 @@ ReportingETL::publishLedger(ripple::LedgerInfo const& lgrInfo)
{
auto ledgerRange = backend_->fetchLedgerRange();
auto fees = getFees(lgrInfo.seq);
auto transactions =
backend_->fetchAllTransactionsInLedger(lgrInfo.seq);
auto transactions = backend_->fetchAllTransactionsInLedger(lgrInfo.seq);
if (!fees || !ledgerRange)
{
BOOST_LOG_TRIVIAL(error) << __func__
<< " - could not fetch from database";
BOOST_LOG_TRIVIAL(error)
<< __func__ << " - could not fetch from database";
return;
}
std::string range = std::to_string(ledgerRange->minSequence) + "-" + std::to_string(ledgerRange->maxSequence);
std::string range = std::to_string(ledgerRange->minSequence) + "-" +
std::to_string(ledgerRange->maxSequence);
subscriptions_->pubLedger(lgrInfo, *fees, range, transactions.size());
@@ -309,14 +309,9 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
<< "Deserialized ledger header. " << detail::toString(lgrInfo);
backend_->startWrites();
<<<<<<< HEAD
backend_->writeLedger(
=======
BOOST_LOG_TRIVIAL(debug) << __func__ << " : "
<< "started writes";
flatMapBackend_->writeLedger(
>>>>>>> dev
lgrInfo, std::move(*rawData.mutable_ledger_header()));
backend_->writeLedger(lgrInfo, std::move(*rawData.mutable_ledger_header()));
BOOST_LOG_TRIVIAL(debug) << __func__ << " : "
<< "wrote ledger header";
@@ -351,9 +346,6 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
isDeleted,
std::move(bookDir));
}
<<<<<<< HEAD
backend_->writeAccountTransactions(std::move(accountTxData));
=======
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
<< "wrote objects. num objects = "
@@ -365,10 +357,9 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
<< __func__ << " : "
<< "Inserted all transactions. Number of transactions = "
<< rawData.transactions_list().transactions_size();
flatMapBackend_->writeAccountTransactions(std::move(accountTxData));
backend_->writeAccountTransactions(std::move(accountTxData));
BOOST_LOG_TRIVIAL(debug) << __func__ << " : "
<< "wrote account_tx";
>>>>>>> dev
accumTxns_ += rawData.transactions_list().transactions_size();
bool success = true;
if (accumTxns_ >= txnThreshold_)
@@ -441,10 +432,6 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
BOOST_LOG_TRIVIAL(info) << __func__ << " : "
<< "Populating caches";
<<<<<<< HEAD
backend_->getIndexer().populateCachesAsync(*backend_);
=======
>>>>>>> dev
BOOST_LOG_TRIVIAL(info) << __func__ << " : "
<< "Populated caches";
@@ -579,22 +566,13 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
lastPublishedSequence = lgrInfo.seq;
}
writeConflict = !success;
<<<<<<< HEAD
auto range = backend_->fetchLedgerRangeNoThrow();
=======
>>>>>>> dev
if (onlineDeleteInterval_ && !deleting_ &&
lgrInfo.seq - minSequence > *onlineDeleteInterval_)
{
deleting_ = true;
ioContext_.post([this, &minSequence]() {
BOOST_LOG_TRIVIAL(info) << "Running online delete";
<<<<<<< HEAD
backend_->doOnlineDelete(
range->maxSequence - *onlineDeleteInterval_);
=======
flatMapBackend_->doOnlineDelete(*onlineDeleteInterval_);
>>>>>>> dev
backend_->doOnlineDelete(*onlineDeleteInterval_);
BOOST_LOG_TRIVIAL(info) << "Finished online delete";
auto rng = flatMapBackend_->fetchLedgerRangeNoThrow();
minSequence = rng->minSequence;
@@ -618,10 +596,6 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
<< "Extracted and wrote " << *lastPublishedSequence - startSequence
<< " in " << ((end - begin).count()) / 1000000000.0;
writing_ = false;
<<<<<<< HEAD
backend_->getIndexer().clearCaches();
=======
>>>>>>> dev
BOOST_LOG_TRIVIAL(debug) << __func__ << " : "
<< "Stopping etl pipeline";
@@ -830,10 +804,5 @@ ReportingETL::ReportingETL(
extractorThreads_ = config.at("extractor_threads").as_int64();
if (config.contains("txn_threshold"))
txnThreshold_ = config.at("txn_threshold").as_int64();
<<<<<<< HEAD
=======
flatMapBackend_->open(readOnly_);
flatMapBackend_->checkFlagLedgers();
>>>>>>> dev
}

View File

@@ -27,8 +27,8 @@
#include <boost/beast/websocket.hpp>
#include <reporting/BackendInterface.h>
#include <reporting/ETLSource.h>
#include <reporting/server/SubscriptionManager.h>
#include <reporting/Pg.h>
#include <reporting/server/SubscriptionManager.h>
#include "org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h"
#include <grpcpp/grpcpp.h>
@@ -271,8 +271,6 @@ private:
return numMarkers_;
}
<<<<<<< HEAD
=======
boost::json::object
getInfo()
{
@@ -292,7 +290,6 @@ private:
}
/// start all of the necessary components and begin ETL
>>>>>>> dev
void
run()
{
@@ -306,7 +303,6 @@ private:
doWork();
public:
ReportingETL(
boost::json::object const& config,
boost::asio::io_context& ioc,
@@ -325,12 +321,7 @@ public:
std::shared_ptr<NetworkValidatedLedgers> ledgers)
{
auto etl = std::make_shared<ReportingETL>(
config,
ioc,
backend,
subscriptions,
balancer,
ledgers);
config, ioc, backend, subscriptions, balancer, ledgers);
etl->run();