Cleanup, documentation, rename some things, cmake changes

This commit is contained in:
CJ Cobb
2021-06-23 14:43:29 +00:00
parent 8af7825d7f
commit 056e170a56
55 changed files with 288 additions and 227 deletions

View File

@@ -8,7 +8,7 @@
#
set(CMAKE_VERBOSE_MAKEFILE TRUE)
project(reporting)
project(clio)
cmake_minimum_required(VERSION 3.16)
set (CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread -Wno-narrowing")
@@ -30,22 +30,23 @@ FetchContent_MakeAvailable(googletest)
enable_testing()
include(GoogleTest)
add_executable (reporting_main
server/websocket_server_async.cpp
add_executable (clio_server
src/server/main.cpp
)
add_executable (reporting_tests
add_executable (clio_tests
unittests/main.cpp
)
add_library(reporting backend/BackendInterface.h)
add_library(clio src/backend/BackendInterface.h)
include_directories(src)
list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/deps")
include(ExternalProject)
message(${CMAKE_CURRENT_BINARY_DIR})
message(${CMAKE_MODULE_PATH})
#include(rippled/Builds/CMake/RippledCore.cmake)
add_subdirectory(rippled)
target_link_libraries(reporting PUBLIC xrpl_core grpc_pbufs)
add_dependencies(reporting xrpl_core)
add_dependencies(reporting grpc_pbufs)
target_link_libraries(clio PUBLIC xrpl_core grpc_pbufs)
add_dependencies(clio xrpl_core)
add_dependencies(clio grpc_pbufs)
get_target_property(grpc_includes grpc_pbufs INCLUDE_DIRECTORIES)
#get_target_property(xrpl_core_includes xrpl_core INCLUDE_DIRECTORIES)
# get_target_property(proto_includes protobuf_src INCLUDE_DIRECTORIES)
@@ -66,43 +67,43 @@ include(cassandra)
include(Postgres)
target_sources(reporting PRIVATE
backend/CassandraBackend.cpp
backend/PostgresBackend.cpp
backend/BackendIndexer.cpp
backend/BackendInterface.cpp
backend/Pg.cpp
backend/DBHelpers.cpp
etl/ETLSource.cpp
etl/ReportingETL.cpp
server/Handlers.cpp
server/SubscriptionManager.cpp
handlers/AccountInfo.cpp
handlers/Tx.cpp
handlers/RPCHelpers.cpp
handlers/AccountTx.cpp
handlers/LedgerData.cpp
handlers/BookOffers.cpp
handlers/LedgerRange.cpp
handlers/Ledger.cpp
handlers/LedgerEntry.cpp
handlers/AccountChannels.cpp
handlers/AccountLines.cpp
handlers/AccountCurrencies.cpp
handlers/AccountOffers.cpp
handlers/AccountObjects.cpp
handlers/ChannelAuthorize.cpp
handlers/ChannelVerify.cpp
handlers/Subscribe.cpp
handlers/ServerInfo.cpp)
target_sources(clio PRIVATE
src/backend/CassandraBackend.cpp
src/backend/PostgresBackend.cpp
src/backend/BackendIndexer.cpp
src/backend/BackendInterface.cpp
src/backend/Pg.cpp
src/backend/DBHelpers.cpp
src/etl/ETLSource.cpp
src/etl/ReportingETL.cpp
src/server/Handlers.cpp
src/server/SubscriptionManager.cpp
src/handlers/AccountInfo.cpp
src/handlers/Tx.cpp
src/handlers/RPCHelpers.cpp
src/handlers/AccountTx.cpp
src/handlers/LedgerData.cpp
src/handlers/BookOffers.cpp
src/handlers/LedgerRange.cpp
src/handlers/Ledger.cpp
src/handlers/LedgerEntry.cpp
src/handlers/AccountChannels.cpp
src/handlers/AccountLines.cpp
src/handlers/AccountCurrencies.cpp
src/handlers/AccountOffers.cpp
src/handlers/AccountObjects.cpp
src/handlers/ChannelAuthorize.cpp
src/handlers/ChannelVerify.cpp
src/handlers/Subscribe.cpp
src/handlers/ServerInfo.cpp)
message(${Boost_LIBRARIES})
INCLUDE_DIRECTORIES(${CMAKE_CURRENT_BINARY_DIR} ${Boost_INCLUDE_DIR} ${CMAKE_CURRENT_SOURCE_DIR})
TARGET_LINK_LIBRARIES(reporting PUBLIC ${Boost_LIBRARIES})
TARGET_LINK_LIBRARIES(reporting_main PUBLIC reporting)
TARGET_LINK_LIBRARIES(reporting_tests PUBLIC reporting gtest_main)
TARGET_LINK_LIBRARIES(clio PUBLIC ${Boost_LIBRARIES})
TARGET_LINK_LIBRARIES(clio_server PUBLIC clio)
TARGET_LINK_LIBRARIES(clio_tests PUBLIC clio gtest_main)
gtest_discover_tests(reporting_tests)
gtest_discover_tests(clio_tests)

10
deps/Postgres.cmake vendored
View File

@@ -45,23 +45,23 @@ if(NOT PostgreSQL_FOUND)
file(TO_CMAKE_PATH "${postgres_src_SOURCE_DIR}" postgres_src_SOURCE_DIR)
target_link_libraries(reporting PUBLIC postgres)
target_link_libraries(clio PUBLIC postgres)
else()
message("Found system installed Postgres via find_libary")
target_include_directories(reporting INTERFACE ${libpq-fe})
target_include_directories(clio INTERFACE ${libpq-fe})
target_link_libraries(reporting INTERFACE ${postgres})
target_link_libraries(clio INTERFACE ${postgres})
endif()
else()
message("Found system installed Postgres via find_package")
message("${PostgreSQL_INCLUDE_DIRS}")
include_directories(reporting INTERFACE "${PostgreSQL_INCLUDE_DIRS}")
include_directories(clio INTERFACE "${PostgreSQL_INCLUDE_DIRS}")
target_link_libraries(reporting PUBLIC ${PostgreSQL_LIBRARIES})
target_link_libraries(clio PUBLIC ${PostgreSQL_LIBRARIES})
endif()

View File

@@ -152,13 +152,12 @@ if(NOT cassandra)
target_link_libraries(cassandra INTERFACE OpenSSL::SSL)
file(TO_CMAKE_PATH "${cassandra_src_SOURCE_DIR}" cassandra_src_SOURCE_DIR)
target_link_libraries(reporting PUBLIC cassandra)
target_link_libraries(clio PUBLIC cassandra)
else()
message("Found system installed cassandra cpp driver")
message(${cassandra})
find_path(cassandra_includes NAMES cassandra.h REQUIRED)
target_link_libraries (reporting PUBLIC ${cassandra})
target_include_directories(reporting INTERFACE ${cassandra_includes})
target_link_libraries (clio PUBLIC ${cassandra})
target_include_directories(clio INTERFACE ${cassandra_includes})
endif()

View File

@@ -7,7 +7,7 @@
#include <backend/PostgresBackend.h>
namespace Backend {
std::unique_ptr<BackendInterface>
std::shared_ptr<BackendInterface>
make_Backend(boost::json::object const& config)
{
BOOST_LOG_TRIVIAL(info) << __func__ << ": Constructing BackendInterface";
@@ -20,7 +20,7 @@ make_Backend(boost::json::object const& config)
auto type = dbConfig.at("type").as_string();
std::unique_ptr<BackendInterface> backend = nullptr;
std::shared_ptr<BackendInterface> backend = nullptr;
if (boost::iequals(type, "cassandra"))
{
@@ -28,12 +28,12 @@ make_Backend(boost::json::object const& config)
dbConfig.at(type).as_object()["ttl"] =
config.at("online_delete").as_int64() * 4;
backend =
std::make_unique<CassandraBackend>(dbConfig.at(type).as_object());
std::make_shared<CassandraBackend>(dbConfig.at(type).as_object());
}
else if (boost::iequals(type, "postgres"))
{
backend =
std::make_unique<PostgresBackend>(dbConfig.at(type).as_object());
std::make_shared<PostgresBackend>(dbConfig.at(type).as_object());
}
if (!backend)

View File

@@ -1,3 +1,4 @@
#include <backend/BackendIndexer.h>
#include <backend/BackendInterface.h>
namespace Backend {

View File

@@ -0,0 +1,104 @@
#ifndef CLIO_BACKEND_INDEXER_H_INCLUDED
#define CLIO_BACKEND_INDEXER_H_INCLUDED
#include <ripple/basics/base_uint.h>
#include <boost/asio.hpp>
#include <boost/json.hpp>
#include <mutex>
#include <optional>
#include <thread>
namespace std {
template <>
struct hash<ripple::uint256>
{
std::size_t
operator()(const ripple::uint256& k) const noexcept
{
return boost::hash_range(k.begin(), k.end());
}
};
} // namespace std
namespace Backend {
// The below two structs exist to prevent developers from accidentally mixing up
// the two indexes.
struct BookIndex
{
uint32_t bookIndex;
explicit BookIndex(uint32_t v) : bookIndex(v){};
};
struct KeyIndex
{
uint32_t keyIndex;
explicit KeyIndex(uint32_t v) : keyIndex(v){};
};
class BackendInterface;
class BackendIndexer
{
boost::asio::io_context ioc_;
boost::asio::io_context::strand strand_;
std::mutex mutex_;
std::optional<boost::asio::io_context::work> work_;
std::thread ioThread_;
std::atomic_uint32_t indexing_ = 0;
uint32_t keyShift_ = 20;
std::unordered_set<ripple::uint256> keys;
mutable bool isFirst_ = true;
void
doKeysRepair(
BackendInterface const& backend,
std::optional<uint32_t> sequence);
void
writeKeyFlagLedger(
uint32_t ledgerSequence,
BackendInterface const& backend);
public:
BackendIndexer(boost::json::object const& config);
~BackendIndexer();
void
addKey(ripple::uint256&& key);
void
finish(uint32_t ledgerSequence, BackendInterface const& backend);
void
writeKeyFlagLedgerAsync(
uint32_t ledgerSequence,
BackendInterface const& backend);
void
doKeysRepairAsync(
BackendInterface const& backend,
std::optional<uint32_t> sequence);
uint32_t
getKeyShift()
{
return keyShift_;
}
std::optional<uint32_t>
getCurrentlyIndexing()
{
uint32_t cur = indexing_.load();
if (cur != 0)
return cur;
return {};
}
KeyIndex
getKeyIndexOfSeq(uint32_t seq) const
{
if (isKeyFlagLedger(seq))
return KeyIndex{seq};
auto incr = (1 << keyShift_);
KeyIndex index{(seq >> keyShift_ << keyShift_) + incr};
assert(isKeyFlagLedger(index.keyIndex));
return index;
}
bool
isKeyFlagLedger(uint32_t ledgerSequence) const
{
return (ledgerSequence % (1 << keyShift_)) == 0;
}
};
} // namespace Backend
#endif

View File

@@ -2,20 +2,17 @@
#define RIPPLE_APP_REPORTING_BACKENDINTERFACE_H_INCLUDED
#include <ripple/ledger/ReadView.h>
#include <boost/asio.hpp>
#include <backend/BackendIndexer.h>
#include <backend/DBHelpers.h>
namespace std {
template <>
struct hash<ripple::uint256>
{
std::size_t
operator()(const ripple::uint256& k) const noexcept
{
return boost::hash_range(k.begin(), k.end());
}
};
} // namespace std
class ReportingETL;
class AsyncCallData;
class BackendTest_Basic_Test;
namespace Backend {
// *** return types
using Blob = std::vector<unsigned char>;
struct LedgerObject
{
ripple::uint256 key;
@@ -55,19 +52,6 @@ struct LedgerRange
uint32_t maxSequence;
};
// The below two structs exist to prevent developers from accidentally mixing up
// the two indexes.
struct BookIndex
{
uint32_t bookIndex;
explicit BookIndex(uint32_t v) : bookIndex(v){};
};
struct KeyIndex
{
uint32_t keyIndex;
explicit KeyIndex(uint32_t v) : keyIndex(v){};
};
class DatabaseTimeout : public std::exception
{
const char*
@@ -76,76 +60,6 @@ class DatabaseTimeout : public std::exception
return "Database read timed out. Please retry the request";
}
};
class BackendInterface;
class BackendIndexer
{
boost::asio::io_context ioc_;
boost::asio::io_context::strand strand_;
std::mutex mutex_;
std::optional<boost::asio::io_context::work> work_;
std::thread ioThread_;
std::atomic_uint32_t indexing_ = 0;
uint32_t keyShift_ = 20;
std::unordered_set<ripple::uint256> keys;
mutable bool isFirst_ = true;
void
doKeysRepair(
BackendInterface const& backend,
std::optional<uint32_t> sequence);
void
writeKeyFlagLedger(
uint32_t ledgerSequence,
BackendInterface const& backend);
public:
BackendIndexer(boost::json::object const& config);
~BackendIndexer();
void
addKey(ripple::uint256&& key);
void
finish(uint32_t ledgerSequence, BackendInterface const& backend);
void
writeKeyFlagLedgerAsync(
uint32_t ledgerSequence,
BackendInterface const& backend);
void
doKeysRepairAsync(
BackendInterface const& backend,
std::optional<uint32_t> sequence);
uint32_t
getKeyShift()
{
return keyShift_;
}
std::optional<uint32_t>
getCurrentlyIndexing()
{
uint32_t cur = indexing_.load();
if (cur != 0)
return cur;
return {};
}
KeyIndex
getKeyIndexOfSeq(uint32_t seq) const
{
if (isKeyFlagLedger(seq))
return KeyIndex{seq};
auto incr = (1 << keyShift_);
KeyIndex index{(seq >> keyShift_ << keyShift_) + incr};
assert(isKeyFlagLedger(index.keyIndex));
return index;
}
bool
isKeyFlagLedger(uint32_t ledgerSequence) const
{
return (ledgerSequence % (1 << keyShift_)) == 0;
}
};
class BackendInterface
{
@@ -154,10 +68,12 @@ protected:
mutable bool isFirst_ = true;
public:
// read methods
BackendInterface(boost::json::object const& config) : indexer_(config)
{
}
virtual ~BackendInterface()
{
}
BackendIndexer&
getIndexer() const
@@ -165,75 +81,34 @@ public:
return indexer_;
}
void
checkFlagLedgers() const;
std::optional<KeyIndex>
getKeyIndexOfSeq(uint32_t seq) const;
bool
finishWrites(uint32_t ledgerSequence) const;
virtual std::optional<uint32_t>
fetchLatestLedgerSequence() const = 0;
// *** public read methods ***
// All of these reads methods can throw DatabaseTimeout. When writing code
// in an RPC handler, this exception does not need to be caught: when an RPC
// results in a timeout, an error is returned to the client
public:
// *** ledger methods
virtual std::optional<ripple::LedgerInfo>
fetchLedgerBySequence(uint32_t sequence) const = 0;
virtual std::optional<uint32_t>
fetchLatestLedgerSequence() const = 0;
virtual std::optional<LedgerRange>
fetchLedgerRange() const = 0;
// Doesn't throw DatabaseTimeout. Should be used with care.
std::optional<LedgerRange>
fetchLedgerRangeNoThrow() const;
virtual std::optional<Blob>
fetchLedgerObject(ripple::uint256 const& key, uint32_t sequence) const = 0;
// *** transaction methods
// returns a transaction, metadata pair
virtual std::optional<TransactionAndMetadata>
fetchTransaction(ripple::uint256 const& hash) const = 0;
virtual std::vector<TransactionAndMetadata>
fetchAllTransactionsInLedger(uint32_t ledgerSequence) const = 0;
virtual std::vector<ripple::uint256>
fetchAllTransactionHashesInLedger(uint32_t ledgerSequence) const = 0;
LedgerPage
fetchLedgerPage(
std::optional<ripple::uint256> const& cursor,
std::uint32_t ledgerSequence,
std::uint32_t limit,
std::uint32_t limitHint = 0) const;
bool
isLedgerIndexed(std::uint32_t ledgerSequence) const;
std::optional<LedgerObject>
fetchSuccessor(ripple::uint256 key, uint32_t ledgerSequence) const;
virtual LedgerPage
doFetchLedgerPage(
std::optional<ripple::uint256> const& cursor,
std::uint32_t ledgerSequence,
std::uint32_t limit) const = 0;
// TODO add warning for incomplete data
BookOffersPage
fetchBookOffers(
ripple::uint256 const& book,
uint32_t ledgerSequence,
std::uint32_t limit,
std::optional<ripple::uint256> const& cursor = {}) const;
virtual std::vector<TransactionAndMetadata>
fetchTransactions(std::vector<ripple::uint256> const& hashes) const = 0;
virtual std::vector<Blob>
fetchLedgerObjects(
std::vector<ripple::uint256> const& keys,
uint32_t sequence) const = 0;
virtual std::pair<
std::vector<TransactionAndMetadata>,
std::optional<AccountTransactionsCursor>>
@@ -242,7 +117,58 @@ public:
std::uint32_t limit,
std::optional<AccountTransactionsCursor> const& cursor = {}) const = 0;
// write methods
virtual std::vector<TransactionAndMetadata>
fetchAllTransactionsInLedger(uint32_t ledgerSequence) const = 0;
virtual std::vector<ripple::uint256>
fetchAllTransactionHashesInLedger(uint32_t ledgerSequence) const = 0;
// *** state data methods
virtual std::optional<Blob>
fetchLedgerObject(ripple::uint256 const& key, uint32_t sequence) const = 0;
virtual std::vector<Blob>
fetchLedgerObjects(
std::vector<ripple::uint256> const& keys,
uint32_t sequence) const = 0;
// Fetches a page of ledger objects, ordered by key/index.
// Used by ledger_data
LedgerPage
fetchLedgerPage(
std::optional<ripple::uint256> const& cursor,
std::uint32_t ledgerSequence,
std::uint32_t limit,
std::uint32_t limitHint = 0) const;
// Fetches the successor to key/index. key need not actually be a valid
// key/index.
std::optional<LedgerObject>
fetchSuccessor(ripple::uint256 key, uint32_t ledgerSequence) const;
BookOffersPage
fetchBookOffers(
ripple::uint256 const& book,
uint32_t ledgerSequence,
std::uint32_t limit,
std::optional<ripple::uint256> const& cursor = {}) const;
// Methods related to the indexer
bool
isLedgerIndexed(std::uint32_t ledgerSequence) const;
std::optional<KeyIndex>
getKeyIndexOfSeq(uint32_t seq) const;
// *** protected write methods
protected:
friend class ::ReportingETL;
friend class BackendIndexer;
friend class ::AsyncCallData;
friend std::shared_ptr<BackendInterface>
make_Backend(boost::json::object const& config);
friend class ::BackendTest_Basic_Test;
virtual void
writeLedger(
@@ -259,15 +185,6 @@ public:
bool isDeleted,
std::optional<ripple::uint256>&& book) const;
virtual void
doWriteLedgerObject(
std::string&& key,
uint32_t seq,
std::string&& blob,
bool isCreated,
bool isDeleted,
std::optional<ripple::uint256>&& book) const = 0;
virtual void
writeTransaction(
std::string&& hash,
@@ -279,7 +196,26 @@ public:
writeAccountTransactions(
std::vector<AccountTransactionsData>&& data) const = 0;
// other database methods
// TODO: this function, or something similar, could be called internally by
// writeLedgerObject
virtual bool
writeKeys(
std::unordered_set<ripple::uint256> const& keys,
KeyIndex const& index,
bool isAsync = false) const = 0;
// Tell the database we are about to begin writing data for a particular
// ledger.
virtual void
startWrites() const = 0;
// Tell the database we have finished writing all data for a particular
// ledger
bool
finishWrites(uint32_t ledgerSequence) const;
virtual bool
doOnlineDelete(uint32_t numLedgersToKeep) const = 0;
// Open the database. Set up all of the necessary objects and
// datastructures. After this call completes, the database is ready for
@@ -291,23 +227,28 @@ public:
virtual void
close() = 0;
// *** private helper methods
private:
virtual LedgerPage
doFetchLedgerPage(
std::optional<ripple::uint256> const& cursor,
std::uint32_t ledgerSequence,
std::uint32_t limit) const = 0;
virtual void
startWrites() const = 0;
doWriteLedgerObject(
std::string&& key,
uint32_t seq,
std::string&& blob,
bool isCreated,
bool isDeleted,
std::optional<ripple::uint256>&& book) const = 0;
virtual bool
doFinishWrites() const = 0;
virtual bool
doOnlineDelete(uint32_t numLedgersToKeep) const = 0;
virtual bool
writeKeys(
std::unordered_set<ripple::uint256> const& keys,
KeyIndex const& index,
bool isAsync = false) const = 0;
virtual ~BackendInterface()
{
}
void
checkFlagLedgers() const;
};
} // namespace Backend

View File

@@ -119,7 +119,7 @@ main(int argc, char* argv[])
auto const config = parse_config(argv[1]);
if (!config)
{
std::cerr << "Ccouldnt parse config. Exiting..." << std::endl;
std::cerr << "Couldnt parse config. Exiting..." << std::endl;
return EXIT_FAILURE;
}
initLogLevel(*config);
@@ -133,24 +133,39 @@ main(int argc, char* argv[])
}
BOOST_LOG_TRIVIAL(info) << "Number of workers = " << threads;
// io context to handle all incoming requests, as well as other things
// This is not the only io context in the application
boost::asio::io_context ioc{threads};
// Rate limiter, to prevent abuse
DOSGuard dosGuard{config.value(), ioc};
// Interface to the database
std::shared_ptr<BackendInterface> backend{Backend::make_Backend(*config)};
// Manages clients subscribed to streams
std::shared_ptr<SubscriptionManager> subscriptions{
SubscriptionManager::make_SubscriptionManager()};
// Tracks which ledgers have been validated by the
// network
std::shared_ptr<NetworkValidatedLedgers> ledgers{
NetworkValidatedLedgers::make_ValidatedLedgers()};
// Handles the connection to one or more rippled nodes.
// ETL uses the balancer to extract data.
// The server uses the balancer to forward RPCs to a rippled node.
// The balancer itself publishes to streams (transactions_proposed and
// accounts_proposed)
auto balancer = ETLLoadBalancer::make_ETLLoadBalancer(
*config, ioc, backend, subscriptions, ledgers);
// ETL is responsible for writing and publishing to streams. In read-only
// mode, ETL only publishes
auto etl = ReportingETL::make_ReportingETL(
*config, ioc, backend, subscriptions, balancer, ledgers);
// The server handles incoming RPCs
auto httpServer = Server::make_HttpServer(
*config, ioc, backend, subscriptions, balancer, dosGuard);

View File

@@ -681,7 +681,7 @@ import pathlib
numCalls = 0
async def ledgers(ip, port, minLedger, maxLedger, transactions, expand, maxCalls):
global numCalls
address = 'ws://' + str(ip) + ':' + str(port)
address = 'wss://' + str(ip) + ':' + str(port)
random.seed()
ledger = 0
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
@@ -690,7 +690,7 @@ async def ledgers(ip, port, minLedger, maxLedger, transactions, expand, maxCalls
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
try:
async with websockets.connect(address,max_size=1000000000) as ws:
async with websockets.connect(address,max_size=1000000000,ssl=ssl_context) as ws:
global numCalls
for i in range(0, maxCalls):