mirror of
https://github.com/XRPLF/clio.git
synced 2025-11-20 11:45:53 +00:00
@@ -1,31 +0,0 @@
|
|||||||
# Build libpq (and its support libs) from source via ExternalProject and
# expose them to the `clio` target as imported static libraries.
include(ExternalProject)

set(POSTGRES_INSTALL_DIR ${CMAKE_BINARY_DIR}/postgres)
# Static archives produced by the postgres build that clio links against.
set(POSTGRES_LIBS pq pgcommon pgport)

ExternalProject_Add(postgres
    GIT_REPOSITORY https://github.com/postgres/postgres.git
    GIT_TAG REL_14_1
    GIT_SHALLOW 1
    LOG_CONFIGURE 1
    LOG_BUILD 1
    CONFIGURE_COMMAND ./configure --prefix ${POSTGRES_INSTALL_DIR} --without-readline --verbose
    # MAKELEVEL must be unset so the nested make does not think it is a sub-make
    # of the outer build. NOTE(review): -j32 is hardcoded — consider a cache var.
    BUILD_COMMAND ${CMAKE_COMMAND} -E env --unset=MAKELEVEL make VERBOSE=${CMAKE_VERBOSE_MAKEFILE} -j32
    BUILD_IN_SOURCE 1
    INSTALL_COMMAND ${CMAKE_COMMAND} -E env make -s --no-print-directory install
    UPDATE_COMMAND ""
    # Declare the archives so Ninja knows these files are produced by this step.
    # (Fixed: each path previously ended in a stray extra `}`.)
    BUILD_BYPRODUCTS
        ${POSTGRES_INSTALL_DIR}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}pq${CMAKE_STATIC_LIBRARY_SUFFIX}
        ${POSTGRES_INSTALL_DIR}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}pgcommon${CMAKE_STATIC_LIBRARY_SUFFIX}
        ${POSTGRES_INSTALL_DIR}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}pgport${CMAKE_STATIC_LIBRARY_SUFFIX}
)

ExternalProject_Get_Property(postgres BINARY_DIR)

foreach(_lib ${POSTGRES_LIBS})
    add_library(${_lib} STATIC IMPORTED GLOBAL)
    # Ordering only: ensures the external build runs before anything links ${_lib}.
    add_dependencies(${_lib} postgres)
    # Fixed: IMPORTED_LOCATION previously hardcoded ".a" instead of
    # ${CMAKE_STATIC_LIBRARY_SUFFIX}; both properties set in one call.
    set_target_properties(${_lib} PROPERTIES
        IMPORTED_LOCATION
            ${POSTGRES_INSTALL_DIR}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}${_lib}${CMAKE_STATIC_LIBRARY_SUFFIX}
        INTERFACE_INCLUDE_DIRECTORIES ${POSTGRES_INSTALL_DIR}/include)
    # Link the imported target (carries location + include dirs) rather than
    # the raw archive path, which bypassed the targets configured above.
    target_link_libraries(clio PUBLIC ${_lib})
endforeach()

add_dependencies(clio postgres)
target_include_directories(clio PUBLIC ${POSTGRES_INSTALL_DIR}/include)
|
|
||||||
@@ -45,7 +45,6 @@ include(CMake/ClioVersion.cmake)
|
|||||||
include(CMake/deps/rippled.cmake)
|
include(CMake/deps/rippled.cmake)
|
||||||
include(CMake/deps/Boost.cmake)
|
include(CMake/deps/Boost.cmake)
|
||||||
include(CMake/deps/cassandra.cmake)
|
include(CMake/deps/cassandra.cmake)
|
||||||
include(CMake/deps/Postgres.cmake)
|
|
||||||
|
|
||||||
target_sources(clio PRIVATE
|
target_sources(clio PRIVATE
|
||||||
## Main
|
## Main
|
||||||
@@ -53,8 +52,6 @@ target_sources(clio PRIVATE
|
|||||||
## Backend
|
## Backend
|
||||||
src/backend/BackendInterface.cpp
|
src/backend/BackendInterface.cpp
|
||||||
src/backend/CassandraBackend.cpp
|
src/backend/CassandraBackend.cpp
|
||||||
src/backend/Pg.cpp
|
|
||||||
src/backend/PostgresBackend.cpp
|
|
||||||
src/backend/SimpleCache.cpp
|
src/backend/SimpleCache.cpp
|
||||||
## ETL
|
## ETL
|
||||||
src/etl/ETLSource.cpp
|
src/etl/ETLSource.cpp
|
||||||
|
|||||||
121
REVIEW.md
121
REVIEW.md
@@ -1,121 +0,0 @@
|
|||||||
# How to review clio
|
|
||||||
Clio is a massive project, and thus I don't expect the code to be reviewed the
|
|
||||||
way a normal PR would. So I put this guide together to help reviewers look at
|
|
||||||
the relevant pieces of code without getting lost in the weeds.
|
|
||||||
|
|
||||||
One thing reviewers should keep in mind is that most of clio is designed to be
|
|
||||||
lightweight and simple. We try not to introduce any unnecessary complexity and
|
|
||||||
keep the code as simple and straightforward as possible. Sometimes complexity is
|
|
||||||
unavoidable, but simplicity is the goal.
|
|
||||||
|
|
||||||
## Order of review
|
|
||||||
The code is organized into 4 main components, each with their own folder. The
|
|
||||||
code in each folder is as self contained as possible. A good way to approach
|
|
||||||
the review would be to review one folder at a time.
|
|
||||||
|
|
||||||
### backend
|
|
||||||
The code in the backend folder is the heart of the project, and reviewers should
|
|
||||||
start here. This is the most complex part of the code, as well as the most
|
|
||||||
performance sensitive. clio does not keep any data in memory, so performance
|
|
||||||
generally depends on the data model and the way we talk to the database.
|
|
||||||
|
|
||||||
Reviewers should start with the README in this folder to get a high level idea
|
|
||||||
of the data model and to review the data model itself. Then, reviewers should
|
|
||||||
dive into the implementation. The table schemas and queries for Cassandra are
|
|
||||||
defined in `CassandraBackend::open()`. The table schemas for Postgres are defined
|
|
||||||
in Pg.cpp. The queries for Postgres are defined in each of the functions of `PostgresBackend`.
|
|
||||||
A good way to approach the implementation would be to look at the table schemas,
|
|
||||||
and then go through the functions declared in `BackendInterface`. Reviewers could
|
|
||||||
also branch out to the rest of the code by looking at where these functions are
|
|
||||||
called from.
|
|
||||||
|
|
||||||
### webserver
|
|
||||||
The code in the webserver folder implements the web server for handling RPC requests.
|
|
||||||
This code was mostly copied and pasted from boost beast example code, so I would
|
|
||||||
really appreciate review here.
|
|
||||||
|
|
||||||
### rpc
|
|
||||||
The rpc folder contains all of the handlers and any helper functions they need.
|
|
||||||
This code is not too complicated, so reviewers don't need to dwell long here.
|
|
||||||
|
|
||||||
### etl
|
|
||||||
The etl folder contains all of the code for extracting data from rippled. This
|
|
||||||
code is complex and important, but most of this code was just copied from rippled
|
|
||||||
reporting mode, and thus has already been reviewed and is being used in prod.
|
|
||||||
|
|
||||||
## Design decisions that should be reviewed
|
|
||||||
|
|
||||||
### Data model
|
|
||||||
Reviewers should review the general data model. The data model itself is described
|
|
||||||
at a high level in the README in the backend folder. The table schemas and queries
|
|
||||||
for Cassandra are defined in the `open()` function of `CassandraBackend`. The table
|
|
||||||
schemas for Postgres are defined in Pg.cpp.
|
|
||||||
|
|
||||||
Particular attention should be paid to the keys table, and the problem that solves
|
|
||||||
(successor/upper bound). I originally was going to have a special table for book_offers,
|
|
||||||
but then I decided that we could use the keys table itself for that and save space.
|
|
||||||
This makes book_offers somewhat slow compared to rippled, though still very usable.
|
|
||||||
|
|
||||||
### Large rows
|
|
||||||
I did some tricks with Cassandra to deal with very large rows in the keys and account_tx
|
|
||||||
tables. For each of these, the partition key (the first component of the primary
|
|
||||||
key) is a compound key. This is meant to break large rows into smaller rows. This
|
|
||||||
is done to avoid hotspots. Data is sharded in Cassandra, and if some rows get very
|
|
||||||
large, some nodes can have a lot more data than others.
|
|
||||||
|
|
||||||
For account_tx, this has performance implications when iterating very far back
|
|
||||||
in time. Refer to the `fetchAccountTransactions()` function in `CassandraBackend`.
|
|
||||||
|
|
||||||
It is unclear if this needs to be done for other tables.
|
|
||||||
|
|
||||||
### Postgres table partitioning
|
|
||||||
Originally, Postgres exhibited performance problems when the dataset approached 1
|
|
||||||
TB. This was solved by table partitioning.
|
|
||||||
|
|
||||||
### Threading
|
|
||||||
I used asio for multithreading. There are a lot of different io_contexts lying
|
|
||||||
around the code. This needs to be cleaned up a bit. Most of these are really
|
|
||||||
just ways to submit an async job to a single thread. I don't think it makes
|
|
||||||
sense to have one io_context for the whole application, but some of the threading
|
|
||||||
is a bit opaque and could be cleaned up.
|
|
||||||
|
|
||||||
### Boost Json
|
|
||||||
I used boost json for serializing data to json.
|
|
||||||
|
|
||||||
### No cache
|
|
||||||
As of now, there is no cache. I am not sure if a cache is even worth it. A
|
|
||||||
transaction cache would not be hard, but a cache for ledger data will be hard.
|
|
||||||
While a cache would improve performance, it would increase memory usage. clio
|
|
||||||
is designed to be lightweight. Also, I've reached thousands of requests per
|
|
||||||
second with a single clio node, so I'm not sure performance is even an issue.
|
|
||||||
|
|
||||||
## Things I'm less than happy about
|
|
||||||
|
|
||||||
#### BackendIndexer
|
|
||||||
This is a particularly hairy piece of code that handles writing to the keys table.
|
|
||||||
I am not too happy with this code. Parts of it need to execute in real time as
|
|
||||||
part of ETL, and other parts are allowed to run in the background. There is also
|
|
||||||
code that detects if a previous background job failed to complete before the
|
|
||||||
server shutdown, and thus tries to rerun that job. The code feels tacked on, and
|
|
||||||
I would like it to be more cleanly integrated with the rest of the code.
|
|
||||||
|
|
||||||
#### Shifting
|
|
||||||
There is some bit shifting going on with the keys table and the account_tx table.
|
|
||||||
The keys table is written to every 2^20 ledgers. Maybe it would be better to just
|
|
||||||
write every 1 million ledgers.
|
|
||||||
|
|
||||||
#### performance of book_offers
|
|
||||||
book_offers is a bit slow. It could be sped up in a variety of ways. One is to
|
|
||||||
keep a separate book_offers table. However, this is not straightforward and will
|
|
||||||
use more space. Another is to keep a cache of book_offers for the most recent ledger
|
|
||||||
(or few ledgers). I am not sure if this is worth it.
|
|
||||||
|
|
||||||
#### account_tx in Cassandra
|
|
||||||
After the fix to deal with large rows, account_tx can be slow at times when using
|
|
||||||
Cassandra. Specifically, if there are large gaps in time where the account was
|
|
||||||
not affected by any transactions, the code will be reading empty records. I would
|
|
||||||
like to sidestep this issue if possible.
|
|
||||||
|
|
||||||
#### Implementation of fetchLedgerPage
|
|
||||||
`fetchLedgerPage()` is rather complex. Part of this seems unavoidable, since this
|
|
||||||
code is dealing with the keys table.
|
|
||||||
@@ -4,7 +4,6 @@
|
|||||||
#include <boost/algorithm/string.hpp>
|
#include <boost/algorithm/string.hpp>
|
||||||
#include <backend/BackendInterface.h>
|
#include <backend/BackendInterface.h>
|
||||||
#include <backend/CassandraBackend.h>
|
#include <backend/CassandraBackend.h>
|
||||||
#include <backend/PostgresBackend.h>
|
|
||||||
|
|
||||||
namespace Backend {
|
namespace Backend {
|
||||||
std::shared_ptr<BackendInterface>
|
std::shared_ptr<BackendInterface>
|
||||||
@@ -30,19 +29,6 @@ make_Backend(boost::asio::io_context& ioc, boost::json::object const& config)
|
|||||||
backend = std::make_shared<CassandraBackend>(
|
backend = std::make_shared<CassandraBackend>(
|
||||||
ioc, dbConfig.at(type).as_object());
|
ioc, dbConfig.at(type).as_object());
|
||||||
}
|
}
|
||||||
else if (boost::iequals(type, "postgres"))
|
|
||||||
{
|
|
||||||
if (dbConfig.contains("experimental") &&
|
|
||||||
dbConfig.at("experimental").is_bool() &&
|
|
||||||
dbConfig.at("experimental").as_bool())
|
|
||||||
backend = std::make_shared<PostgresBackend>(
|
|
||||||
ioc, dbConfig.at(type).as_object());
|
|
||||||
else
|
|
||||||
BOOST_LOG_TRIVIAL(fatal)
|
|
||||||
<< "Postgres support is experimental at this time. "
|
|
||||||
<< "If you would really like to use Postgres, add "
|
|
||||||
"\"experimental\":true to your database config";
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!backend)
|
if (!backend)
|
||||||
throw std::runtime_error("Invalid database type");
|
throw std::runtime_error("Invalid database type");
|
||||||
|
|||||||
@@ -1,10 +1,16 @@
|
|||||||
#ifndef RIPPLE_APP_REPORTING_BACKENDINTERFACE_H_INCLUDED
|
#ifndef RIPPLE_APP_REPORTING_BACKENDINTERFACE_H_INCLUDED
|
||||||
#define RIPPLE_APP_REPORTING_BACKENDINTERFACE_H_INCLUDED
|
#define RIPPLE_APP_REPORTING_BACKENDINTERFACE_H_INCLUDED
|
||||||
|
|
||||||
|
#include <boost/asio/spawn.hpp>
|
||||||
|
#include <boost/json.hpp>
|
||||||
|
#include <boost/log/trivial.hpp>
|
||||||
|
|
||||||
#include <ripple/ledger/ReadView.h>
|
#include <ripple/ledger/ReadView.h>
|
||||||
#include <boost/asio.hpp>
|
|
||||||
#include <backend/DBHelpers.h>
|
#include <backend/DBHelpers.h>
|
||||||
#include <backend/SimpleCache.h>
|
#include <backend/SimpleCache.h>
|
||||||
#include <backend/Types.h>
|
#include <backend/Types.h>
|
||||||
|
|
||||||
#include <thread>
|
#include <thread>
|
||||||
#include <type_traits>
|
#include <type_traits>
|
||||||
|
|
||||||
|
|||||||
@@ -2,11 +2,14 @@
|
|||||||
#define CLIO_BACKEND_DBHELPERS_H_INCLUDED
|
#define CLIO_BACKEND_DBHELPERS_H_INCLUDED
|
||||||
|
|
||||||
#include <ripple/basics/Log.h>
|
#include <ripple/basics/Log.h>
|
||||||
|
#include <ripple/basics/StringUtilities.h>
|
||||||
|
#include <ripple/ledger/ReadView.h>
|
||||||
#include <ripple/protocol/SField.h>
|
#include <ripple/protocol/SField.h>
|
||||||
#include <ripple/protocol/STAccount.h>
|
#include <ripple/protocol/STAccount.h>
|
||||||
#include <ripple/protocol/TxMeta.h>
|
#include <ripple/protocol/TxMeta.h>
|
||||||
|
|
||||||
#include <boost/container/flat_set.hpp>
|
#include <boost/container/flat_set.hpp>
|
||||||
#include <backend/Pg.h>
|
|
||||||
#include <backend/Types.h>
|
#include <backend/Types.h>
|
||||||
|
|
||||||
/// Struct used to keep track of what to write to
|
/// Struct used to keep track of what to write to
|
||||||
|
|||||||
1788
src/backend/Pg.cpp
1788
src/backend/Pg.cpp
File diff suppressed because it is too large
Load Diff
564
src/backend/Pg.h
564
src/backend/Pg.h
@@ -1,564 +0,0 @@
|
|||||||
#ifndef RIPPLE_CORE_PG_H_INCLUDED
|
|
||||||
#define RIPPLE_CORE_PG_H_INCLUDED
|
|
||||||
|
|
||||||
#include <ripple/basics/StringUtilities.h>
|
|
||||||
#include <ripple/basics/chrono.h>
|
|
||||||
#include <ripple/ledger/ReadView.h>
|
|
||||||
#include <boost/asio/io_context.hpp>
|
|
||||||
#include <boost/asio/ip/tcp.hpp>
|
|
||||||
#include <boost/asio/spawn.hpp>
|
|
||||||
#include <boost/icl/closed_interval.hpp>
|
|
||||||
#include <boost/json.hpp>
|
|
||||||
#include <boost/lexical_cast.hpp>
|
|
||||||
#include <boost/log/trivial.hpp>
|
|
||||||
#include <atomic>
|
|
||||||
#include <chrono>
|
|
||||||
#include <condition_variable>
|
|
||||||
#include <functional>
|
|
||||||
#include <libpq-fe.h>
|
|
||||||
#include <memory>
|
|
||||||
#include <mutex>
|
|
||||||
#include <optional>
|
|
||||||
#include <string>
|
|
||||||
#include <string_view>
|
|
||||||
#include <utility>
|
|
||||||
#include <vector>
|
|
||||||
|
|
||||||
// These postgres structs must be freed only by the postgres API.
|
|
||||||
using pg_result_type = std::unique_ptr<PGresult, void (*)(PGresult*)>;
|
|
||||||
using pg_connection_type = std::unique_ptr<PGconn, void (*)(PGconn*)>;
|
|
||||||
using asio_socket_type = std::unique_ptr<
|
|
||||||
boost::asio::ip::tcp::socket,
|
|
||||||
void (*)(boost::asio::ip::tcp::socket*)>;
|
|
||||||
|
|
||||||
/** first: command
|
|
||||||
* second: parameter values
|
|
||||||
*
|
|
||||||
* The 2nd member takes an optional string to
|
|
||||||
* distinguish between NULL parameters and empty strings. An empty
|
|
||||||
* item corresponds to a NULL parameter.
|
|
||||||
*
|
|
||||||
* Postgres reads each parameter as a c-string, regardless of actual type.
|
|
||||||
* Binary types (bytea) need to be converted to hex and prepended with
|
|
||||||
* \x ("\\x").
|
|
||||||
*/
|
|
||||||
using pg_params =
|
|
||||||
std::pair<char const*, std::vector<std::optional<std::string>>>;
|
|
||||||
|
|
||||||
/** Parameter values for pg API. */
|
|
||||||
using pg_formatted_params = std::vector<char const*>;
|
|
||||||
|
|
||||||
/** Parameters for managing postgres connections. */
|
|
||||||
struct PgConfig
|
|
||||||
{
|
|
||||||
/** Maximum connections allowed to db. */
|
|
||||||
std::size_t max_connections{1000};
|
|
||||||
/** Close idle connections past this duration. */
|
|
||||||
std::chrono::seconds timeout{600};
|
|
||||||
|
|
||||||
/** Index of DB connection parameter names. */
|
|
||||||
std::vector<char const*> keywordsIdx;
|
|
||||||
/** DB connection parameter names. */
|
|
||||||
std::vector<std::string> keywords;
|
|
||||||
/** Index of DB connection parameter values. */
|
|
||||||
std::vector<char const*> valuesIdx;
|
|
||||||
/** DB connection parameter values. */
|
|
||||||
std::vector<std::string> values;
|
|
||||||
};
|
|
||||||
|
|
||||||
//-----------------------------------------------------------------------------
|
|
||||||
|
|
||||||
/** Class that operates on postgres query results.
|
|
||||||
*
|
|
||||||
* The functions that return results do not check first whether the
|
|
||||||
* expected results are actually there. Therefore, the caller first needs
|
|
||||||
* to check whether or not a valid response was returned using the operator
|
|
||||||
* bool() overload. If number of tuples or fields are unknown, then check
|
|
||||||
* those. Each result field should be checked for null before attempting
|
|
||||||
* to return results. Finally, the caller must know the type of the field
|
|
||||||
* before calling the corresponding function to return a field. Postgres
|
|
||||||
* internally stores each result field as null-terminated strings.
|
|
||||||
*/
|
|
||||||
class PgResult
|
|
||||||
{
|
|
||||||
// The result object must be freed using the libpq API PQclear() call.
|
|
||||||
pg_result_type result_{nullptr, [](PGresult* result) { PQclear(result); }};
|
|
||||||
std::optional<std::pair<ExecStatusType, std::string>> error_;
|
|
||||||
|
|
||||||
public:
|
|
||||||
/** Constructor for when the process is stopping.
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
PgResult()
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Constructor for successful query results.
|
|
||||||
*
|
|
||||||
* @param result Query result.
|
|
||||||
*/
|
|
||||||
explicit PgResult(pg_result_type&& result) : result_(std::move(result))
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Constructor for failed query results.
|
|
||||||
*
|
|
||||||
* @param result Query result that contains error information.
|
|
||||||
* @param conn Postgres connection that contains error information.
|
|
||||||
*/
|
|
||||||
PgResult(PGresult* result, PGconn* conn)
|
|
||||||
: error_({PQresultStatus(result), PQerrorMessage(conn)})
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Return field as a null-terminated string pointer.
|
|
||||||
*
|
|
||||||
* Note that this function does not guarantee that the result struct
|
|
||||||
* exists, or that the row and fields exist, or that the field is
|
|
||||||
* not null.
|
|
||||||
*
|
|
||||||
* @param ntuple Row number.
|
|
||||||
* @param nfield Field number.
|
|
||||||
* @return Field contents.
|
|
||||||
*/
|
|
||||||
char const*
|
|
||||||
c_str(int ntuple = 0, int nfield = 0) const
|
|
||||||
{
|
|
||||||
return PQgetvalue(result_.get(), ntuple, nfield);
|
|
||||||
}
|
|
||||||
|
|
||||||
std::vector<unsigned char>
|
|
||||||
asUnHexedBlob(int ntuple = 0, int nfield = 0) const
|
|
||||||
{
|
|
||||||
std::string_view view{c_str(ntuple, nfield) + 2};
|
|
||||||
auto res = ripple::strUnHex(view.size(), view.cbegin(), view.cend());
|
|
||||||
if (res)
|
|
||||||
return *res;
|
|
||||||
return {};
|
|
||||||
}
|
|
||||||
|
|
||||||
ripple::uint256
|
|
||||||
asUInt256(int ntuple = 0, int nfield = 0) const
|
|
||||||
{
|
|
||||||
ripple::uint256 val;
|
|
||||||
if (!val.parseHex(c_str(ntuple, nfield) + 2))
|
|
||||||
throw std::runtime_error("Pg - failed to parse hex into uint256");
|
|
||||||
return val;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Return field as equivalent to Postgres' INT type (32 bit signed).
|
|
||||||
*
|
|
||||||
* Note that this function does not guarantee that the result struct
|
|
||||||
* exists, or that the row and fields exist, or that the field is
|
|
||||||
* not null, or that the type is that requested.
|
|
||||||
|
|
||||||
* @param ntuple Row number.
|
|
||||||
* @param nfield Field number.
|
|
||||||
* @return Field contents.
|
|
||||||
*/
|
|
||||||
std::int32_t
|
|
||||||
asInt(int ntuple = 0, int nfield = 0) const
|
|
||||||
{
|
|
||||||
return boost::lexical_cast<std::int32_t>(
|
|
||||||
PQgetvalue(result_.get(), ntuple, nfield));
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Return field as equivalent to Postgres' BIGINT type (64 bit signed).
|
|
||||||
*
|
|
||||||
* Note that this function does not guarantee that the result struct
|
|
||||||
* exists, or that the row and fields exist, or that the field is
|
|
||||||
* not null, or that the type is that requested.
|
|
||||||
|
|
||||||
* @param ntuple Row number.
|
|
||||||
* @param nfield Field number.
|
|
||||||
* @return Field contents.
|
|
||||||
*/
|
|
||||||
std::int64_t
|
|
||||||
asBigInt(int ntuple = 0, int nfield = 0) const
|
|
||||||
{
|
|
||||||
return boost::lexical_cast<std::int64_t>(
|
|
||||||
PQgetvalue(result_.get(), ntuple, nfield));
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Returns whether the field is NULL or not.
|
|
||||||
*
|
|
||||||
* Note that this function does not guarantee that the result struct
|
|
||||||
* exists, or that the row and fields exist.
|
|
||||||
*
|
|
||||||
* @param ntuple Row number.
|
|
||||||
* @param nfield Field number.
|
|
||||||
* @return Whether field is NULL.
|
|
||||||
*/
|
|
||||||
bool
|
|
||||||
isNull(int ntuple = 0, int nfield = 0) const
|
|
||||||
{
|
|
||||||
return PQgetisnull(result_.get(), ntuple, nfield);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Check whether a valid response occurred.
|
|
||||||
*
|
|
||||||
* @return Whether or not the query returned a valid response.
|
|
||||||
*/
|
|
||||||
operator bool() const
|
|
||||||
{
|
|
||||||
return result_ != nullptr;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Message describing the query results suitable for diagnostics.
|
|
||||||
*
|
|
||||||
* If error, then the postgres error type and message are returned.
|
|
||||||
* Otherwise, "ok"
|
|
||||||
*
|
|
||||||
* @return Query result message.
|
|
||||||
*/
|
|
||||||
std::string
|
|
||||||
msg() const;
|
|
||||||
|
|
||||||
/** Get number of rows in result.
|
|
||||||
*
|
|
||||||
* Note that this function does not guarantee that the result struct
|
|
||||||
* exists.
|
|
||||||
*
|
|
||||||
* @return Number of result rows.
|
|
||||||
*/
|
|
||||||
int
|
|
||||||
ntuples() const
|
|
||||||
{
|
|
||||||
return PQntuples(result_.get());
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Get number of fields in result.
|
|
||||||
*
|
|
||||||
* Note that this function does not guarantee that the result struct
|
|
||||||
* exists.
|
|
||||||
*
|
|
||||||
* @return Number of result fields.
|
|
||||||
*/
|
|
||||||
int
|
|
||||||
nfields() const
|
|
||||||
{
|
|
||||||
return PQnfields(result_.get());
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Return result status of the command.
|
|
||||||
*
|
|
||||||
* Note that this function does not guarantee that the result struct
|
|
||||||
* exists.
|
|
||||||
*
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
ExecStatusType
|
|
||||||
status() const
|
|
||||||
{
|
|
||||||
return PQresultStatus(result_.get());
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
/* Class that contains and operates upon a postgres connection. */
|
|
||||||
class Pg
|
|
||||||
{
|
|
||||||
friend class PgPool;
|
|
||||||
friend class PgQuery;
|
|
||||||
|
|
||||||
PgConfig const& config_;
|
|
||||||
boost::asio::io_context::strand strand_;
|
|
||||||
|
|
||||||
asio_socket_type socket_{nullptr, [](boost::asio::ip::tcp::socket*) {}};
|
|
||||||
|
|
||||||
// The connection object must be freed using the libpq API PQfinish() call.
|
|
||||||
pg_connection_type conn_{nullptr, [](PGconn* conn) { PQfinish(conn); }};
|
|
||||||
|
|
||||||
inline asio_socket_type
|
|
||||||
getSocket(boost::asio::yield_context& strand);
|
|
||||||
|
|
||||||
inline PgResult
|
|
||||||
waitForStatus(boost::asio::yield_context& yield, ExecStatusType expected);
|
|
||||||
|
|
||||||
inline void
|
|
||||||
flush(boost::asio::yield_context& yield);
|
|
||||||
|
|
||||||
/** Clear results from the connection.
|
|
||||||
*
|
|
||||||
* Results from previous commands must be cleared before new commands
|
|
||||||
* can be processed. This function should be called on connections
|
|
||||||
* that weren't processed completely before being reused, such as
|
|
||||||
* when being checked-in.
|
|
||||||
*
|
|
||||||
* @return whether or not connection still exists.
|
|
||||||
*/
|
|
||||||
bool
|
|
||||||
clear();
|
|
||||||
|
|
||||||
/** Connect to postgres.
|
|
||||||
*
|
|
||||||
* Idempotently connects to postgres by first checking whether an
|
|
||||||
* existing connection is already present. If connection is not present
|
|
||||||
* or in an errored state, reconnects to the database.
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
connect(boost::asio::yield_context& yield);
|
|
||||||
|
|
||||||
/** Disconnect from postgres. */
|
|
||||||
void
|
|
||||||
disconnect()
|
|
||||||
{
|
|
||||||
conn_.reset();
|
|
||||||
socket_.reset();
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Execute postgres query.
|
|
||||||
*
|
|
||||||
* If parameters are included, then the command should contain only a
|
|
||||||
* single SQL statement. If no parameters, then multiple SQL statements
|
|
||||||
* delimited by semi-colons can be processed. The response is from
|
|
||||||
* the last command executed.
|
|
||||||
*
|
|
||||||
* @param command postgres API command string.
|
|
||||||
* @param nParams postgres API number of parameters.
|
|
||||||
* @param values postgres API array of parameter.
|
|
||||||
* @return Query result object.
|
|
||||||
*/
|
|
||||||
PgResult
|
|
||||||
query(
|
|
||||||
char const* command,
|
|
||||||
std::size_t const nParams,
|
|
||||||
char const* const* values,
|
|
||||||
boost::asio::yield_context& yield);
|
|
||||||
|
|
||||||
/** Execute postgres query with no parameters.
|
|
||||||
*
|
|
||||||
* @param command Query string.
|
|
||||||
* @return Query result object;
|
|
||||||
*/
|
|
||||||
PgResult
|
|
||||||
query(char const* command, boost::asio::yield_context& yield)
|
|
||||||
{
|
|
||||||
return query(command, 0, nullptr, yield);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Execute postgres query with parameters.
|
|
||||||
*
|
|
||||||
* @param dbParams Database command and parameter values.
|
|
||||||
* @return Query result object.
|
|
||||||
*/
|
|
||||||
PgResult
|
|
||||||
query(pg_params const& dbParams, boost::asio::yield_context& yield);
|
|
||||||
|
|
||||||
/** Insert multiple records into a table using Postgres' bulk COPY.
|
|
||||||
*
|
|
||||||
* Throws upon error.
|
|
||||||
*
|
|
||||||
* @param table Name of table for import.
|
|
||||||
* @param records Records in the COPY IN format.
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
bulkInsert(
|
|
||||||
char const* table,
|
|
||||||
std::string const& records,
|
|
||||||
boost::asio::yield_context& yield);
|
|
||||||
|
|
||||||
public:
|
|
||||||
/** Constructor for Pg class.
|
|
||||||
*
|
|
||||||
* @param config Config parameters.
|
|
||||||
* @param j Logger object.
|
|
||||||
*/
|
|
||||||
Pg(PgConfig const& config, boost::asio::io_context& ctx)
|
|
||||||
: config_(config), strand_(ctx)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
//-----------------------------------------------------------------------------
|
|
||||||
|
|
||||||
/** Database connection pool.
|
|
||||||
*
|
|
||||||
* Allow re-use of postgres connections. Postgres connections are created
|
|
||||||
* as needed until configurable limit is reached. After use, each connection
|
|
||||||
* is placed in a container ordered by time of use. Each request for
|
|
||||||
* a connection grabs the most recently used connection from the container.
|
|
||||||
* If none are available, a new connection is used (up to configured limit).
|
|
||||||
* Idle connections are destroyed periodically after configurable
|
|
||||||
* timeout duration.
|
|
||||||
*
|
|
||||||
* This should be stored as a shared pointer so PgQuery objects can safely
|
|
||||||
* outlive it.
|
|
||||||
*/
|
|
||||||
class PgPool
|
|
||||||
{
|
|
||||||
friend class PgQuery;
|
|
||||||
|
|
||||||
using clock_type = std::chrono::steady_clock;
|
|
||||||
|
|
||||||
boost::asio::io_context& ioc_;
|
|
||||||
PgConfig config_;
|
|
||||||
std::mutex mutex_;
|
|
||||||
std::condition_variable cond_;
|
|
||||||
std::size_t connections_{};
|
|
||||||
bool stop_{false};
|
|
||||||
|
|
||||||
/** Idle database connections ordered by timestamp to allow timing out. */
|
|
||||||
std::multimap<std::chrono::time_point<clock_type>, std::unique_ptr<Pg>>
|
|
||||||
idle_;
|
|
||||||
|
|
||||||
/** Get a postgres connection object.
|
|
||||||
*
|
|
||||||
* Return the most recent idle connection in the pool, if available.
|
|
||||||
* Otherwise, return a new connection unless we're at the threshold.
|
|
||||||
* If so, then wait until a connection becomes available.
|
|
||||||
*
|
|
||||||
* @return Postgres object.
|
|
||||||
*/
|
|
||||||
std::unique_ptr<Pg>
|
|
||||||
checkout();
|
|
||||||
|
|
||||||
/** Return a postgres object to the pool for reuse.
|
|
||||||
*
|
|
||||||
* If connection is healthy, place in pool for reuse. After calling this,
|
|
||||||
* the container no longer have a connection unless checkout() is called.
|
|
||||||
*
|
|
||||||
* @param pg Pg object.
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
checkin(std::unique_ptr<Pg>& pg);
|
|
||||||
|
|
||||||
public:
|
|
||||||
/** Connection pool constructor.
|
|
||||||
*
|
|
||||||
* @param pgConfig Postgres config.
|
|
||||||
* @param j Logger object.
|
|
||||||
* @param parent Stoppable parent.
|
|
||||||
*/
|
|
||||||
PgPool(boost::asio::io_context& ioc, boost::json::object const& config);
|
|
||||||
|
|
||||||
~PgPool()
|
|
||||||
{
|
|
||||||
onStop();
|
|
||||||
}
|
|
||||||
|
|
||||||
PgConfig&
|
|
||||||
config()
|
|
||||||
{
|
|
||||||
return config_;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Initiate idle connection timer.
|
|
||||||
*
|
|
||||||
* The PgPool object needs to be fully constructed to support asynchronous
|
|
||||||
* operations.
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
setup();
|
|
||||||
|
|
||||||
/** Prepare for process shutdown. (Stoppable) */
|
|
||||||
void
|
|
||||||
onStop();
|
|
||||||
};
|
|
||||||
|
|
||||||
//-----------------------------------------------------------------------------
|
|
||||||
|
|
||||||
/** Class to query postgres.
|
|
||||||
*
|
|
||||||
* This class should be used by functions outside of this
|
|
||||||
* compilation unit for querying postgres. It automatically acquires and
|
|
||||||
* relinquishes a database connection to handle each query.
|
|
||||||
*/
|
|
||||||
class PgQuery
|
|
||||||
{
|
|
||||||
private:
|
|
||||||
std::shared_ptr<PgPool> pool_;
|
|
||||||
std::unique_ptr<Pg> pg_;
|
|
||||||
|
|
||||||
public:
|
|
||||||
PgQuery() = delete;
|
|
||||||
|
|
||||||
PgQuery(std::shared_ptr<PgPool> const& pool)
|
|
||||||
: pool_(pool), pg_(pool->checkout())
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
~PgQuery()
|
|
||||||
{
|
|
||||||
pool_->checkin(pg_);
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO. add sendQuery and getResult, for sending the query and getting the
|
|
||||||
// result asynchronously. This could be useful for sending a bunch of
|
|
||||||
// requests concurrently
|
|
||||||
|
|
||||||
/** Execute postgres query with parameters.
|
|
||||||
*
|
|
||||||
* @param dbParams Database command with parameters.
|
|
||||||
* @return Result of query, including errors.
|
|
||||||
*/
|
|
||||||
PgResult
|
|
||||||
operator()(pg_params const& dbParams, boost::asio::yield_context& yield)
|
|
||||||
{
|
|
||||||
if (!pg_) // It means we're stopping. Return empty result.
|
|
||||||
return PgResult();
|
|
||||||
return pg_->query(dbParams, yield);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Execute postgres query with only command statement.
|
|
||||||
*
|
|
||||||
* @param command Command statement.
|
|
||||||
* @return Result of query, including errors.
|
|
||||||
*/
|
|
||||||
PgResult
|
|
||||||
operator()(char const* command, boost::asio::yield_context& yield)
|
|
||||||
{
|
|
||||||
return operator()(pg_params{command, {}}, yield);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Insert multiple records into a table using Postgres' bulk COPY.
|
|
||||||
*
|
|
||||||
* Throws upon error.
|
|
||||||
*
|
|
||||||
* @param table Name of table for import.
|
|
||||||
* @param records Records in the COPY IN format.
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
bulkInsert(
|
|
||||||
char const* table,
|
|
||||||
std::string const& records,
|
|
||||||
boost::asio::yield_context& yield)
|
|
||||||
{
|
|
||||||
pg_->bulkInsert(table, records, yield);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
//-----------------------------------------------------------------------------
|
|
||||||
|
|
||||||
/** Create Postgres connection pool manager.
|
|
||||||
*
|
|
||||||
* @param pgConfig Configuration for Postgres.
|
|
||||||
* @param j Logger object.
|
|
||||||
* @param parent Stoppable parent object.
|
|
||||||
* @return Postgres connection pool manager
|
|
||||||
*/
|
|
||||||
std::shared_ptr<PgPool>
|
|
||||||
make_PgPool(boost::asio::io_context& ioc, boost::json::object const& pgConfig);
|
|
||||||
|
|
||||||
/** Initialize the Postgres schema.
|
|
||||||
*
|
|
||||||
* This function ensures that the database is running the latest version
|
|
||||||
* of the schema.
|
|
||||||
*
|
|
||||||
* @param pool Postgres connection pool manager.
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
initSchema(std::shared_ptr<PgPool> const& pool);
|
|
||||||
void
|
|
||||||
initAccountTx(std::shared_ptr<PgPool> const& pool);
|
|
||||||
|
|
||||||
// Load the ledger info for the specified ledger/s from the database
|
|
||||||
// @param whichLedger specifies the ledger to load via ledger sequence, ledger
|
|
||||||
// hash or std::monostate (which loads the most recent)
|
|
||||||
// @return vector of LedgerInfos
|
|
||||||
std::optional<ripple::LedgerInfo>
|
|
||||||
getLedger(
|
|
||||||
std::variant<std::monostate, ripple::uint256, std::uint32_t> const&
|
|
||||||
whichLedger,
|
|
||||||
std::shared_ptr<PgPool>& pgPool);
|
|
||||||
|
|
||||||
#endif // RIPPLE_CORE_PG_H_INCLUDED
|
|
||||||
@@ -1,895 +0,0 @@
|
|||||||
#include <boost/asio.hpp>
|
|
||||||
#include <boost/format.hpp>
|
|
||||||
#include <backend/PostgresBackend.h>
|
|
||||||
#include <thread>
|
|
||||||
|
|
||||||
namespace Backend {
|
|
||||||
|
|
||||||
// Type alias for async completion handlers
|
|
||||||
using completion_token = boost::asio::yield_context;
|
|
||||||
using function_type = void(boost::system::error_code);
|
|
||||||
using result_type = boost::asio::async_result<completion_token, function_type>;
|
|
||||||
using handler_type = typename result_type::completion_handler_type;
|
|
||||||
|
|
||||||
struct HandlerWrapper
|
|
||||||
{
|
|
||||||
handler_type handler;
|
|
||||||
|
|
||||||
HandlerWrapper(handler_type&& handler_) : handler(std::move(handler_))
|
|
||||||
{
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
PostgresBackend::PostgresBackend(
|
|
||||||
boost::asio::io_context& ioc,
|
|
||||||
boost::json::object const& config)
|
|
||||||
: BackendInterface(config)
|
|
||||||
, pgPool_(make_PgPool(ioc, config))
|
|
||||||
, writeConnection_(pgPool_)
|
|
||||||
{
|
|
||||||
if (config.contains("write_interval"))
|
|
||||||
{
|
|
||||||
writeInterval_ = config.at("write_interval").as_int64();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
void
|
|
||||||
PostgresBackend::writeLedger(
|
|
||||||
ripple::LedgerInfo const& ledgerInfo,
|
|
||||||
std::string&& ledgerHeader)
|
|
||||||
{
|
|
||||||
synchronous([&](boost::asio::yield_context yield) {
|
|
||||||
auto cmd = boost::format(
|
|
||||||
R"(INSERT INTO ledgers
|
|
||||||
VALUES (%u,'\x%s', '\x%s',%u,%u,%u,%u,%u,'\x%s','\x%s'))");
|
|
||||||
|
|
||||||
auto ledgerInsert = boost::str(
|
|
||||||
cmd % ledgerInfo.seq % ripple::strHex(ledgerInfo.hash) %
|
|
||||||
ripple::strHex(ledgerInfo.parentHash) % ledgerInfo.drops.drops() %
|
|
||||||
ledgerInfo.closeTime.time_since_epoch().count() %
|
|
||||||
ledgerInfo.parentCloseTime.time_since_epoch().count() %
|
|
||||||
ledgerInfo.closeTimeResolution.count() % ledgerInfo.closeFlags %
|
|
||||||
ripple::strHex(ledgerInfo.accountHash) %
|
|
||||||
ripple::strHex(ledgerInfo.txHash));
|
|
||||||
|
|
||||||
auto res = writeConnection_(ledgerInsert.data(), yield);
|
|
||||||
abortWrite_ = !res;
|
|
||||||
inProcessLedger = ledgerInfo.seq;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
void
|
|
||||||
PostgresBackend::writeAccountTransactions(
|
|
||||||
std::vector<AccountTransactionsData>&& data)
|
|
||||||
{
|
|
||||||
if (abortWrite_)
|
|
||||||
return;
|
|
||||||
PgQuery pg(pgPool_);
|
|
||||||
for (auto const& record : data)
|
|
||||||
{
|
|
||||||
for (auto const& a : record.accounts)
|
|
||||||
{
|
|
||||||
std::string acct = ripple::strHex(a);
|
|
||||||
accountTxBuffer_ << "\\\\x" << acct << '\t'
|
|
||||||
<< std::to_string(record.ledgerSequence) << '\t'
|
|
||||||
<< std::to_string(record.transactionIndex) << '\t'
|
|
||||||
<< "\\\\x" << ripple::strHex(record.txHash)
|
|
||||||
<< '\n';
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void
|
|
||||||
PostgresBackend::writeNFTTransactions(std::vector<NFTTransactionsData>&& data)
|
|
||||||
{
|
|
||||||
throw std::runtime_error("Not implemented");
|
|
||||||
}
|
|
||||||
|
|
||||||
void
|
|
||||||
PostgresBackend::doWriteLedgerObject(
|
|
||||||
std::string&& key,
|
|
||||||
std::uint32_t const seq,
|
|
||||||
std::string&& blob)
|
|
||||||
{
|
|
||||||
synchronous([&](boost::asio::yield_context yield) {
|
|
||||||
if (abortWrite_)
|
|
||||||
return;
|
|
||||||
objectsBuffer_ << "\\\\x" << ripple::strHex(key) << '\t'
|
|
||||||
<< std::to_string(seq) << '\t' << "\\\\x"
|
|
||||||
<< ripple::strHex(blob) << '\n';
|
|
||||||
numRowsInObjectsBuffer_++;
|
|
||||||
// If the buffer gets too large, the insert fails. Not sure why. So we
|
|
||||||
// insert after 1 million records
|
|
||||||
if (numRowsInObjectsBuffer_ % writeInterval_ == 0)
|
|
||||||
{
|
|
||||||
BOOST_LOG_TRIVIAL(info)
|
|
||||||
<< __func__ << " Flushing large buffer. num objects = "
|
|
||||||
<< numRowsInObjectsBuffer_;
|
|
||||||
writeConnection_.bulkInsert("objects", objectsBuffer_.str(), yield);
|
|
||||||
BOOST_LOG_TRIVIAL(info) << __func__ << " Flushed large buffer";
|
|
||||||
objectsBuffer_.str("");
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
void
|
|
||||||
PostgresBackend::writeSuccessor(
|
|
||||||
std::string&& key,
|
|
||||||
std::uint32_t const seq,
|
|
||||||
std::string&& successor)
|
|
||||||
{
|
|
||||||
synchronous([&](boost::asio::yield_context yield) {
|
|
||||||
if (range)
|
|
||||||
{
|
|
||||||
if (successors_.count(key) > 0)
|
|
||||||
return;
|
|
||||||
successors_.insert(key);
|
|
||||||
}
|
|
||||||
successorBuffer_ << "\\\\x" << ripple::strHex(key) << '\t'
|
|
||||||
<< std::to_string(seq) << '\t' << "\\\\x"
|
|
||||||
<< ripple::strHex(successor) << '\n';
|
|
||||||
BOOST_LOG_TRIVIAL(trace)
|
|
||||||
<< __func__ << ripple::strHex(key) << " - " << std::to_string(seq);
|
|
||||||
numRowsInSuccessorBuffer_++;
|
|
||||||
if (numRowsInSuccessorBuffer_ % writeInterval_ == 0)
|
|
||||||
{
|
|
||||||
BOOST_LOG_TRIVIAL(info)
|
|
||||||
<< __func__ << " Flushing large buffer. num successors = "
|
|
||||||
<< numRowsInSuccessorBuffer_;
|
|
||||||
writeConnection_.bulkInsert(
|
|
||||||
"successor", successorBuffer_.str(), yield);
|
|
||||||
BOOST_LOG_TRIVIAL(info) << __func__ << " Flushed large buffer";
|
|
||||||
successorBuffer_.str("");
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
void
|
|
||||||
PostgresBackend::writeTransaction(
|
|
||||||
std::string&& hash,
|
|
||||||
std::uint32_t const seq,
|
|
||||||
std::uint32_t const date,
|
|
||||||
std::string&& transaction,
|
|
||||||
std::string&& metadata)
|
|
||||||
{
|
|
||||||
if (abortWrite_)
|
|
||||||
return;
|
|
||||||
transactionsBuffer_ << "\\\\x" << ripple::strHex(hash) << '\t'
|
|
||||||
<< std::to_string(seq) << '\t' << std::to_string(date)
|
|
||||||
<< '\t' << "\\\\x" << ripple::strHex(transaction)
|
|
||||||
<< '\t' << "\\\\x" << ripple::strHex(metadata) << '\n';
|
|
||||||
}
|
|
||||||
|
|
||||||
void
|
|
||||||
PostgresBackend::writeNFTs(std::vector<NFTsData>&& data)
|
|
||||||
{
|
|
||||||
throw std::runtime_error("Not implemented");
|
|
||||||
}
|
|
||||||
|
|
||||||
std::uint32_t
|
|
||||||
checkResult(PgResult const& res, std::uint32_t const numFieldsExpected)
|
|
||||||
{
|
|
||||||
if (!res)
|
|
||||||
{
|
|
||||||
auto msg = res.msg();
|
|
||||||
BOOST_LOG_TRIVIAL(error) << __func__ << " - " << msg;
|
|
||||||
if (msg.find("statement timeout"))
|
|
||||||
throw DatabaseTimeout();
|
|
||||||
assert(false);
|
|
||||||
throw DatabaseTimeout();
|
|
||||||
}
|
|
||||||
if (res.status() != PGRES_TUPLES_OK)
|
|
||||||
{
|
|
||||||
std::stringstream msg;
|
|
||||||
msg << " : Postgres response should have been "
|
|
||||||
"PGRES_TUPLES_OK but instead was "
|
|
||||||
<< res.status() << " - msg = " << res.msg();
|
|
||||||
BOOST_LOG_TRIVIAL(error) << __func__ << " - " << msg.str();
|
|
||||||
assert(false);
|
|
||||||
throw DatabaseTimeout();
|
|
||||||
}
|
|
||||||
|
|
||||||
BOOST_LOG_TRIVIAL(trace)
|
|
||||||
<< __func__ << " Postgres result msg : " << res.msg();
|
|
||||||
if (res.isNull() || res.ntuples() == 0)
|
|
||||||
{
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
else if (res.ntuples() > 0)
|
|
||||||
{
|
|
||||||
if (res.nfields() != numFieldsExpected)
|
|
||||||
{
|
|
||||||
std::stringstream msg;
|
|
||||||
msg << "Wrong number of fields in Postgres "
|
|
||||||
"response. Expected "
|
|
||||||
<< numFieldsExpected << ", but got " << res.nfields();
|
|
||||||
throw std::runtime_error(msg.str());
|
|
||||||
assert(false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return res.ntuples();
|
|
||||||
}
|
|
||||||
|
|
||||||
ripple::LedgerInfo
|
|
||||||
parseLedgerInfo(PgResult const& res)
|
|
||||||
{
|
|
||||||
std::int64_t ledgerSeq = res.asBigInt(0, 0);
|
|
||||||
ripple::uint256 hash = res.asUInt256(0, 1);
|
|
||||||
ripple::uint256 prevHash = res.asUInt256(0, 2);
|
|
||||||
std::int64_t totalCoins = res.asBigInt(0, 3);
|
|
||||||
std::int64_t closeTime = res.asBigInt(0, 4);
|
|
||||||
std::int64_t parentCloseTime = res.asBigInt(0, 5);
|
|
||||||
std::int64_t closeTimeRes = res.asBigInt(0, 6);
|
|
||||||
std::int64_t closeFlags = res.asBigInt(0, 7);
|
|
||||||
ripple::uint256 accountHash = res.asUInt256(0, 8);
|
|
||||||
ripple::uint256 txHash = res.asUInt256(0, 9);
|
|
||||||
|
|
||||||
using time_point = ripple::NetClock::time_point;
|
|
||||||
using duration = ripple::NetClock::duration;
|
|
||||||
|
|
||||||
ripple::LedgerInfo info;
|
|
||||||
info.seq = ledgerSeq;
|
|
||||||
info.hash = hash;
|
|
||||||
info.parentHash = prevHash;
|
|
||||||
info.drops = totalCoins;
|
|
||||||
info.closeTime = time_point{duration{closeTime}};
|
|
||||||
info.parentCloseTime = time_point{duration{parentCloseTime}};
|
|
||||||
info.closeFlags = closeFlags;
|
|
||||||
info.closeTimeResolution = duration{closeTimeRes};
|
|
||||||
info.accountHash = accountHash;
|
|
||||||
info.txHash = txHash;
|
|
||||||
info.validated = true;
|
|
||||||
return info;
|
|
||||||
}
|
|
||||||
std::optional<std::uint32_t>
|
|
||||||
PostgresBackend::fetchLatestLedgerSequence(
|
|
||||||
boost::asio::yield_context& yield) const
|
|
||||||
{
|
|
||||||
PgQuery pgQuery(pgPool_);
|
|
||||||
pgQuery(set_timeout, yield);
|
|
||||||
|
|
||||||
auto const query =
|
|
||||||
"SELECT ledger_seq FROM ledgers ORDER BY ledger_seq DESC LIMIT 1";
|
|
||||||
|
|
||||||
if (auto res = pgQuery(query, yield); checkResult(res, 1))
|
|
||||||
return res.asBigInt(0, 0);
|
|
||||||
|
|
||||||
return {};
|
|
||||||
}
|
|
||||||
|
|
||||||
std::optional<ripple::LedgerInfo>
|
|
||||||
PostgresBackend::fetchLedgerBySequence(
|
|
||||||
std::uint32_t const sequence,
|
|
||||||
boost::asio::yield_context& yield) const
|
|
||||||
{
|
|
||||||
PgQuery pgQuery(pgPool_);
|
|
||||||
pgQuery(set_timeout, yield);
|
|
||||||
|
|
||||||
std::stringstream sql;
|
|
||||||
sql << "SELECT * FROM ledgers WHERE ledger_seq = "
|
|
||||||
<< std::to_string(sequence);
|
|
||||||
|
|
||||||
if (auto res = pgQuery(sql.str().data(), yield); checkResult(res, 10))
|
|
||||||
return parseLedgerInfo(res);
|
|
||||||
|
|
||||||
return {};
|
|
||||||
}
|
|
||||||
|
|
||||||
std::optional<ripple::LedgerInfo>
|
|
||||||
PostgresBackend::fetchLedgerByHash(
|
|
||||||
ripple::uint256 const& hash,
|
|
||||||
boost::asio::yield_context& yield) const
|
|
||||||
{
|
|
||||||
PgQuery pgQuery(pgPool_);
|
|
||||||
pgQuery(set_timeout, yield);
|
|
||||||
|
|
||||||
std::stringstream sql;
|
|
||||||
sql << "SELECT * FROM ledgers WHERE ledger_hash = \'\\x"
|
|
||||||
<< ripple::to_string(hash) << "\'";
|
|
||||||
|
|
||||||
if (auto res = pgQuery(sql.str().data(), yield); checkResult(res, 10))
|
|
||||||
return parseLedgerInfo(res);
|
|
||||||
|
|
||||||
return {};
|
|
||||||
}
|
|
||||||
|
|
||||||
std::optional<LedgerRange>
|
|
||||||
PostgresBackend::hardFetchLedgerRange(boost::asio::yield_context& yield) const
|
|
||||||
{
|
|
||||||
auto range = PgQuery(pgPool_)("SELECT complete_ledgers()", yield);
|
|
||||||
if (!range)
|
|
||||||
return {};
|
|
||||||
|
|
||||||
std::string res{range.c_str()};
|
|
||||||
BOOST_LOG_TRIVIAL(debug) << "range is = " << res;
|
|
||||||
try
|
|
||||||
{
|
|
||||||
size_t minVal = 0;
|
|
||||||
size_t maxVal = 0;
|
|
||||||
if (res == "empty" || res == "error" || res.empty())
|
|
||||||
return {};
|
|
||||||
else if (size_t delim = res.find('-'); delim != std::string::npos)
|
|
||||||
{
|
|
||||||
minVal = std::stol(res.substr(0, delim));
|
|
||||||
maxVal = std::stol(res.substr(delim + 1));
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
minVal = maxVal = std::stol(res);
|
|
||||||
}
|
|
||||||
return LedgerRange{minVal, maxVal};
|
|
||||||
}
|
|
||||||
catch (std::exception&)
|
|
||||||
{
|
|
||||||
BOOST_LOG_TRIVIAL(error)
|
|
||||||
<< __func__ << " : "
|
|
||||||
<< "Error parsing result of getCompleteLedgers()";
|
|
||||||
}
|
|
||||||
return {};
|
|
||||||
}
|
|
||||||
|
|
||||||
std::optional<Blob>
|
|
||||||
PostgresBackend::doFetchLedgerObject(
|
|
||||||
ripple::uint256 const& key,
|
|
||||||
std::uint32_t const sequence,
|
|
||||||
boost::asio::yield_context& yield) const
|
|
||||||
{
|
|
||||||
PgQuery pgQuery(pgPool_);
|
|
||||||
pgQuery(set_timeout, yield);
|
|
||||||
|
|
||||||
std::stringstream sql;
|
|
||||||
sql << "SELECT object FROM objects WHERE key = "
|
|
||||||
<< "\'\\x" << ripple::strHex(key) << "\'"
|
|
||||||
<< " AND ledger_seq <= " << std::to_string(sequence)
|
|
||||||
<< " ORDER BY ledger_seq DESC LIMIT 1";
|
|
||||||
|
|
||||||
if (auto res = pgQuery(sql.str().data(), yield); checkResult(res, 1))
|
|
||||||
{
|
|
||||||
auto blob = res.asUnHexedBlob(0, 0);
|
|
||||||
if (blob.size())
|
|
||||||
return blob;
|
|
||||||
}
|
|
||||||
|
|
||||||
return {};
|
|
||||||
}
|
|
||||||
|
|
||||||
// returns a transaction, metadata pair
|
|
||||||
std::optional<TransactionAndMetadata>
|
|
||||||
PostgresBackend::fetchTransaction(
|
|
||||||
ripple::uint256 const& hash,
|
|
||||||
boost::asio::yield_context& yield) const
|
|
||||||
{
|
|
||||||
PgQuery pgQuery(pgPool_);
|
|
||||||
pgQuery(set_timeout, yield);
|
|
||||||
|
|
||||||
std::stringstream sql;
|
|
||||||
sql << "SELECT transaction,metadata,ledger_seq,date FROM transactions "
|
|
||||||
"WHERE hash = "
|
|
||||||
<< "\'\\x" << ripple::strHex(hash) << "\'";
|
|
||||||
|
|
||||||
if (auto res = pgQuery(sql.str().data(), yield); checkResult(res, 4))
|
|
||||||
{
|
|
||||||
return {
|
|
||||||
{res.asUnHexedBlob(0, 0),
|
|
||||||
res.asUnHexedBlob(0, 1),
|
|
||||||
res.asBigInt(0, 2),
|
|
||||||
res.asBigInt(0, 3)}};
|
|
||||||
}
|
|
||||||
|
|
||||||
return {};
|
|
||||||
}
|
|
||||||
std::vector<TransactionAndMetadata>
|
|
||||||
PostgresBackend::fetchAllTransactionsInLedger(
|
|
||||||
std::uint32_t const ledgerSequence,
|
|
||||||
boost::asio::yield_context& yield) const
|
|
||||||
{
|
|
||||||
PgQuery pgQuery(pgPool_);
|
|
||||||
pgQuery(set_timeout, yield);
|
|
||||||
|
|
||||||
std::stringstream sql;
|
|
||||||
sql << "SELECT transaction, metadata, ledger_seq,date FROM transactions "
|
|
||||||
"WHERE "
|
|
||||||
<< "ledger_seq = " << std::to_string(ledgerSequence);
|
|
||||||
|
|
||||||
auto res = pgQuery(sql.str().data(), yield);
|
|
||||||
if (size_t numRows = checkResult(res, 4))
|
|
||||||
{
|
|
||||||
std::vector<TransactionAndMetadata> txns;
|
|
||||||
for (size_t i = 0; i < numRows; ++i)
|
|
||||||
{
|
|
||||||
txns.push_back(
|
|
||||||
{res.asUnHexedBlob(i, 0),
|
|
||||||
res.asUnHexedBlob(i, 1),
|
|
||||||
res.asBigInt(i, 2),
|
|
||||||
res.asBigInt(i, 3)});
|
|
||||||
}
|
|
||||||
return txns;
|
|
||||||
}
|
|
||||||
return {};
|
|
||||||
}
|
|
||||||
std::vector<ripple::uint256>
|
|
||||||
PostgresBackend::fetchAllTransactionHashesInLedger(
|
|
||||||
std::uint32_t const ledgerSequence,
|
|
||||||
boost::asio::yield_context& yield) const
|
|
||||||
{
|
|
||||||
PgQuery pgQuery(pgPool_);
|
|
||||||
pgQuery(set_timeout, yield);
|
|
||||||
|
|
||||||
std::stringstream sql;
|
|
||||||
sql << "SELECT hash FROM transactions WHERE "
|
|
||||||
<< "ledger_seq = " << std::to_string(ledgerSequence);
|
|
||||||
|
|
||||||
auto res = pgQuery(sql.str().data(), yield);
|
|
||||||
if (size_t numRows = checkResult(res, 1))
|
|
||||||
{
|
|
||||||
std::vector<ripple::uint256> hashes;
|
|
||||||
for (size_t i = 0; i < numRows; ++i)
|
|
||||||
{
|
|
||||||
hashes.push_back(res.asUInt256(i, 0));
|
|
||||||
}
|
|
||||||
return hashes;
|
|
||||||
}
|
|
||||||
|
|
||||||
return {};
|
|
||||||
}
|
|
||||||
|
|
||||||
std::optional<NFT>
|
|
||||||
PostgresBackend::fetchNFT(
|
|
||||||
ripple::uint256 const& tokenID,
|
|
||||||
std::uint32_t const ledgerSequence,
|
|
||||||
boost::asio::yield_context& yield) const
|
|
||||||
{
|
|
||||||
throw std::runtime_error("Not implemented");
|
|
||||||
}
|
|
||||||
|
|
||||||
std::optional<ripple::uint256>
|
|
||||||
PostgresBackend::doFetchSuccessorKey(
|
|
||||||
ripple::uint256 key,
|
|
||||||
std::uint32_t const ledgerSequence,
|
|
||||||
boost::asio::yield_context& yield) const
|
|
||||||
{
|
|
||||||
PgQuery pgQuery(pgPool_);
|
|
||||||
pgQuery(set_timeout, yield);
|
|
||||||
|
|
||||||
std::stringstream sql;
|
|
||||||
sql << "SELECT next FROM successor WHERE key = "
|
|
||||||
<< "\'\\x" << ripple::strHex(key) << "\'"
|
|
||||||
<< " AND ledger_seq <= " << std::to_string(ledgerSequence)
|
|
||||||
<< " ORDER BY ledger_seq DESC LIMIT 1";
|
|
||||||
|
|
||||||
if (auto res = pgQuery(sql.str().data(), yield); checkResult(res, 1))
|
|
||||||
{
|
|
||||||
auto next = res.asUInt256(0, 0);
|
|
||||||
if (next == lastKey)
|
|
||||||
return {};
|
|
||||||
return next;
|
|
||||||
}
|
|
||||||
|
|
||||||
return {};
|
|
||||||
}
|
|
||||||
|
|
||||||
std::vector<TransactionAndMetadata>
|
|
||||||
PostgresBackend::fetchTransactions(
|
|
||||||
std::vector<ripple::uint256> const& hashes,
|
|
||||||
boost::asio::yield_context& yield) const
|
|
||||||
{
|
|
||||||
if (!hashes.size())
|
|
||||||
return {};
|
|
||||||
|
|
||||||
std::vector<TransactionAndMetadata> results;
|
|
||||||
results.resize(hashes.size());
|
|
||||||
|
|
||||||
handler_type handler(std::forward<decltype(yield)>(yield));
|
|
||||||
result_type result(handler);
|
|
||||||
|
|
||||||
auto hw = new HandlerWrapper(std::move(handler));
|
|
||||||
|
|
||||||
auto start = std::chrono::system_clock::now();
|
|
||||||
|
|
||||||
std::atomic_uint numRemaining = hashes.size();
|
|
||||||
std::atomic_bool errored = false;
|
|
||||||
|
|
||||||
for (size_t i = 0; i < hashes.size(); ++i)
|
|
||||||
{
|
|
||||||
auto const& hash = hashes[i];
|
|
||||||
boost::asio::spawn(
|
|
||||||
get_associated_executor(yield),
|
|
||||||
[this, &hash, &results, hw, &numRemaining, &errored, i](
|
|
||||||
boost::asio::yield_context yield) {
|
|
||||||
BOOST_LOG_TRIVIAL(trace) << __func__ << " getting txn = " << i;
|
|
||||||
|
|
||||||
PgQuery pgQuery(pgPool_);
|
|
||||||
|
|
||||||
std::stringstream sql;
|
|
||||||
sql << "SELECT transaction,metadata,ledger_seq,date FROM "
|
|
||||||
"transactions "
|
|
||||||
"WHERE HASH = \'\\x"
|
|
||||||
<< ripple::strHex(hash) << "\'";
|
|
||||||
|
|
||||||
try
|
|
||||||
{
|
|
||||||
if (auto const res = pgQuery(sql.str().data(), yield);
|
|
||||||
checkResult(res, 4))
|
|
||||||
{
|
|
||||||
results[i] = {
|
|
||||||
res.asUnHexedBlob(0, 0),
|
|
||||||
res.asUnHexedBlob(0, 1),
|
|
||||||
res.asBigInt(0, 2),
|
|
||||||
res.asBigInt(0, 3)};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (DatabaseTimeout const&)
|
|
||||||
{
|
|
||||||
errored = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (--numRemaining == 0)
|
|
||||||
{
|
|
||||||
handler_type h(std::move(hw->handler));
|
|
||||||
h(boost::system::error_code{});
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// Yields the worker to the io_context until handler is called.
|
|
||||||
result.get();
|
|
||||||
|
|
||||||
delete hw;
|
|
||||||
|
|
||||||
auto end = std::chrono::system_clock::now();
|
|
||||||
auto duration =
|
|
||||||
std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
|
|
||||||
|
|
||||||
BOOST_LOG_TRIVIAL(info)
|
|
||||||
<< __func__ << " fetched " << std::to_string(hashes.size())
|
|
||||||
<< " transactions asynchronously. took "
|
|
||||||
<< std::to_string(duration.count());
|
|
||||||
if (errored)
|
|
||||||
{
|
|
||||||
BOOST_LOG_TRIVIAL(error) << __func__ << " Database fetch timed out";
|
|
||||||
throw DatabaseTimeout();
|
|
||||||
}
|
|
||||||
|
|
||||||
return results;
|
|
||||||
}
|
|
||||||
|
|
||||||
std::vector<Blob>
|
|
||||||
PostgresBackend::doFetchLedgerObjects(
|
|
||||||
std::vector<ripple::uint256> const& keys,
|
|
||||||
std::uint32_t const sequence,
|
|
||||||
boost::asio::yield_context& yield) const
|
|
||||||
{
|
|
||||||
if (!keys.size())
|
|
||||||
return {};
|
|
||||||
|
|
||||||
PgQuery pgQuery(pgPool_);
|
|
||||||
pgQuery(set_timeout, yield);
|
|
||||||
|
|
||||||
std::vector<Blob> results;
|
|
||||||
results.resize(keys.size());
|
|
||||||
|
|
||||||
handler_type handler(std::forward<decltype(yield)>(yield));
|
|
||||||
result_type result(handler);
|
|
||||||
|
|
||||||
auto hw = new HandlerWrapper(std::move(handler));
|
|
||||||
|
|
||||||
std::atomic_uint numRemaining = keys.size();
|
|
||||||
std::atomic_bool errored = false;
|
|
||||||
auto start = std::chrono::system_clock::now();
|
|
||||||
for (size_t i = 0; i < keys.size(); ++i)
|
|
||||||
{
|
|
||||||
auto const& key = keys[i];
|
|
||||||
boost::asio::spawn(
|
|
||||||
boost::asio::get_associated_executor(yield),
|
|
||||||
[this, &key, &results, &numRemaining, &errored, hw, i, sequence](
|
|
||||||
boost::asio::yield_context yield) {
|
|
||||||
PgQuery pgQuery(pgPool_);
|
|
||||||
|
|
||||||
std::stringstream sql;
|
|
||||||
sql << "SELECT object FROM "
|
|
||||||
"objects "
|
|
||||||
"WHERE key = \'\\x"
|
|
||||||
<< ripple::strHex(key) << "\'"
|
|
||||||
<< " AND ledger_seq <= " << std::to_string(sequence)
|
|
||||||
<< " ORDER BY ledger_seq DESC LIMIT 1";
|
|
||||||
|
|
||||||
try
|
|
||||||
{
|
|
||||||
if (auto const res = pgQuery(sql.str().data(), yield);
|
|
||||||
checkResult(res, 1))
|
|
||||||
results[i] = res.asUnHexedBlob();
|
|
||||||
}
|
|
||||||
catch (DatabaseTimeout const& ex)
|
|
||||||
{
|
|
||||||
errored = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (--numRemaining == 0)
|
|
||||||
{
|
|
||||||
handler_type h(std::move(hw->handler));
|
|
||||||
h(boost::system::error_code{});
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// Yields the worker to the io_context until handler is called.
|
|
||||||
result.get();
|
|
||||||
|
|
||||||
delete hw;
|
|
||||||
|
|
||||||
auto end = std::chrono::system_clock::now();
|
|
||||||
auto duration =
|
|
||||||
std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
|
|
||||||
|
|
||||||
BOOST_LOG_TRIVIAL(info)
|
|
||||||
<< __func__ << " fetched " << std::to_string(keys.size())
|
|
||||||
<< " objects asynchronously. ms = " << std::to_string(duration.count());
|
|
||||||
if (errored)
|
|
||||||
{
|
|
||||||
BOOST_LOG_TRIVIAL(error) << __func__ << " Database fetch timed out";
|
|
||||||
throw DatabaseTimeout();
|
|
||||||
}
|
|
||||||
|
|
||||||
return results;
|
|
||||||
}
|
|
||||||
|
|
||||||
std::vector<LedgerObject>
|
|
||||||
PostgresBackend::fetchLedgerDiff(
|
|
||||||
std::uint32_t const ledgerSequence,
|
|
||||||
boost::asio::yield_context& yield) const
|
|
||||||
{
|
|
||||||
PgQuery pgQuery(pgPool_);
|
|
||||||
pgQuery(set_timeout, yield);
|
|
||||||
|
|
||||||
std::stringstream sql;
|
|
||||||
sql << "SELECT key,object FROM objects "
|
|
||||||
"WHERE "
|
|
||||||
<< "ledger_seq = " << std::to_string(ledgerSequence);
|
|
||||||
|
|
||||||
auto res = pgQuery(sql.str().data(), yield);
|
|
||||||
if (size_t numRows = checkResult(res, 2))
|
|
||||||
{
|
|
||||||
std::vector<LedgerObject> objects;
|
|
||||||
for (size_t i = 0; i < numRows; ++i)
|
|
||||||
{
|
|
||||||
objects.push_back({res.asUInt256(i, 0), res.asUnHexedBlob(i, 1)});
|
|
||||||
}
|
|
||||||
return objects;
|
|
||||||
}
|
|
||||||
|
|
||||||
return {};
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO this implementation and fetchAccountTransactions should be
|
|
||||||
// generalized
|
|
||||||
TransactionsAndCursor
|
|
||||||
PostgresBackend::fetchNFTTransactions(
|
|
||||||
ripple::uint256 const& tokenID,
|
|
||||||
std::uint32_t const limit,
|
|
||||||
bool forward,
|
|
||||||
std::optional<TransactionsCursor> const& cursor,
|
|
||||||
boost::asio::yield_context& yield) const
|
|
||||||
{
|
|
||||||
throw std::runtime_error("Not implemented");
|
|
||||||
}
|
|
||||||
|
|
||||||
TransactionsAndCursor
|
|
||||||
PostgresBackend::fetchAccountTransactions(
|
|
||||||
ripple::AccountID const& account,
|
|
||||||
std::uint32_t const limit,
|
|
||||||
bool forward,
|
|
||||||
std::optional<TransactionsCursor> const& cursor,
|
|
||||||
boost::asio::yield_context& yield) const
|
|
||||||
{
|
|
||||||
PgQuery pgQuery(pgPool_);
|
|
||||||
pgQuery(set_timeout, yield);
|
|
||||||
pg_params dbParams;
|
|
||||||
|
|
||||||
char const*& command = dbParams.first;
|
|
||||||
std::vector<std::optional<std::string>>& values = dbParams.second;
|
|
||||||
command =
|
|
||||||
"SELECT account_tx($1::bytea, $2::bigint, $3::bool, "
|
|
||||||
"$4::bigint, $5::bigint)";
|
|
||||||
values.resize(5);
|
|
||||||
values[0] = "\\x" + strHex(account);
|
|
||||||
|
|
||||||
values[1] = std::to_string(limit);
|
|
||||||
|
|
||||||
values[2] = std::to_string(forward);
|
|
||||||
|
|
||||||
if (cursor)
|
|
||||||
{
|
|
||||||
values[3] = std::to_string(cursor->ledgerSequence);
|
|
||||||
values[4] = std::to_string(cursor->transactionIndex);
|
|
||||||
}
|
|
||||||
for (size_t i = 0; i < values.size(); ++i)
|
|
||||||
{
|
|
||||||
BOOST_LOG_TRIVIAL(debug) << "value " << std::to_string(i) << " = "
|
|
||||||
<< (values[i] ? values[i].value() : "null");
|
|
||||||
}
|
|
||||||
|
|
||||||
auto start = std::chrono::system_clock::now();
|
|
||||||
auto res = pgQuery(dbParams, yield);
|
|
||||||
auto end = std::chrono::system_clock::now();
|
|
||||||
|
|
||||||
auto duration = ((end - start).count()) / 1000000000.0;
|
|
||||||
BOOST_LOG_TRIVIAL(info)
|
|
||||||
<< __func__ << " : executed stored_procedure in "
|
|
||||||
<< std::to_string(duration)
|
|
||||||
<< " num records = " << std::to_string(checkResult(res, 1));
|
|
||||||
|
|
||||||
checkResult(res, 1);
|
|
||||||
|
|
||||||
char const* resultStr = res.c_str();
|
|
||||||
BOOST_LOG_TRIVIAL(debug) << __func__ << " : "
|
|
||||||
<< "postgres result = " << resultStr
|
|
||||||
<< " : account = " << strHex(account);
|
|
||||||
|
|
||||||
boost::json::value raw = boost::json::parse(resultStr);
|
|
||||||
boost::json::object responseObj = raw.as_object();
|
|
||||||
BOOST_LOG_TRIVIAL(debug) << " parsed = " << responseObj;
|
|
||||||
if (responseObj.contains("transactions"))
|
|
||||||
{
|
|
||||||
auto txns = responseObj.at("transactions").as_array();
|
|
||||||
std::vector<ripple::uint256> hashes;
|
|
||||||
for (auto& hashHex : txns)
|
|
||||||
{
|
|
||||||
ripple::uint256 hash;
|
|
||||||
if (hash.parseHex(hashHex.at("hash").as_string().c_str() + 2))
|
|
||||||
hashes.push_back(hash);
|
|
||||||
}
|
|
||||||
if (responseObj.contains("cursor"))
|
|
||||||
{
|
|
||||||
return {
|
|
||||||
fetchTransactions(hashes, yield),
|
|
||||||
{{responseObj.at("cursor").at("ledger_sequence").as_int64(),
|
|
||||||
responseObj.at("cursor")
|
|
||||||
.at("transaction_index")
|
|
||||||
.as_int64()}}};
|
|
||||||
}
|
|
||||||
return {fetchTransactions(hashes, yield), {}};
|
|
||||||
}
|
|
||||||
return {{}, {}};
|
|
||||||
} // namespace Backend
|
|
||||||
|
|
||||||
void
|
|
||||||
PostgresBackend::open(bool readOnly)
|
|
||||||
{
|
|
||||||
initSchema(pgPool_);
|
|
||||||
initAccountTx(pgPool_);
|
|
||||||
}
|
|
||||||
|
|
||||||
void
|
|
||||||
PostgresBackend::close()
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
void
|
|
||||||
PostgresBackend::startWrites() const
|
|
||||||
{
|
|
||||||
synchronous([&](boost::asio::yield_context yield) {
|
|
||||||
numRowsInObjectsBuffer_ = 0;
|
|
||||||
abortWrite_ = false;
|
|
||||||
auto res = writeConnection_("BEGIN", yield);
|
|
||||||
if (!res || res.status() != PGRES_COMMAND_OK)
|
|
||||||
{
|
|
||||||
std::stringstream msg;
|
|
||||||
msg << "Postgres error creating transaction: " << res.msg();
|
|
||||||
throw std::runtime_error(msg.str());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
bool
|
|
||||||
PostgresBackend::doFinishWrites()
|
|
||||||
{
|
|
||||||
synchronous([&](boost::asio::yield_context yield) {
|
|
||||||
if (!abortWrite_)
|
|
||||||
{
|
|
||||||
std::string txStr = transactionsBuffer_.str();
|
|
||||||
writeConnection_.bulkInsert("transactions", txStr, yield);
|
|
||||||
writeConnection_.bulkInsert(
|
|
||||||
"account_transactions", accountTxBuffer_.str(), yield);
|
|
||||||
std::string objectsStr = objectsBuffer_.str();
|
|
||||||
if (objectsStr.size())
|
|
||||||
writeConnection_.bulkInsert("objects", objectsStr, yield);
|
|
||||||
BOOST_LOG_TRIVIAL(debug)
|
|
||||||
<< __func__ << " objects size = " << objectsStr.size()
|
|
||||||
<< " txns size = " << txStr.size();
|
|
||||||
std::string successorStr = successorBuffer_.str();
|
|
||||||
if (successorStr.size())
|
|
||||||
writeConnection_.bulkInsert("successor", successorStr, yield);
|
|
||||||
if (!range)
|
|
||||||
{
|
|
||||||
std::stringstream indexCreate;
|
|
||||||
indexCreate
|
|
||||||
<< "CREATE INDEX diff ON objects USING hash(ledger_seq) "
|
|
||||||
"WHERE NOT "
|
|
||||||
"ledger_seq = "
|
|
||||||
<< std::to_string(inProcessLedger);
|
|
||||||
writeConnection_(indexCreate.str().data(), yield);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
auto res = writeConnection_("COMMIT", yield);
|
|
||||||
if (!res || res.status() != PGRES_COMMAND_OK)
|
|
||||||
{
|
|
||||||
std::stringstream msg;
|
|
||||||
msg << "Postgres error committing transaction: " << res.msg();
|
|
||||||
throw std::runtime_error(msg.str());
|
|
||||||
}
|
|
||||||
transactionsBuffer_.str("");
|
|
||||||
transactionsBuffer_.clear();
|
|
||||||
objectsBuffer_.str("");
|
|
||||||
objectsBuffer_.clear();
|
|
||||||
successorBuffer_.str("");
|
|
||||||
successorBuffer_.clear();
|
|
||||||
successors_.clear();
|
|
||||||
accountTxBuffer_.str("");
|
|
||||||
accountTxBuffer_.clear();
|
|
||||||
numRowsInObjectsBuffer_ = 0;
|
|
||||||
});
|
|
||||||
|
|
||||||
return !abortWrite_;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool
|
|
||||||
PostgresBackend::doOnlineDelete(
|
|
||||||
std::uint32_t const numLedgersToKeep,
|
|
||||||
boost::asio::yield_context& yield) const
|
|
||||||
{
|
|
||||||
auto rng = fetchLedgerRange();
|
|
||||||
if (!rng)
|
|
||||||
return false;
|
|
||||||
std::uint32_t minLedger = rng->maxSequence - numLedgersToKeep;
|
|
||||||
if (minLedger <= rng->minSequence)
|
|
||||||
return false;
|
|
||||||
PgQuery pgQuery(pgPool_);
|
|
||||||
pgQuery("SET statement_timeout TO 0", yield);
|
|
||||||
std::optional<ripple::uint256> cursor;
|
|
||||||
while (true)
|
|
||||||
{
|
|
||||||
auto [objects, curCursor] = retryOnTimeout([&]() {
|
|
||||||
return fetchLedgerPage(cursor, minLedger, 256, false, yield);
|
|
||||||
});
|
|
||||||
BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page";
|
|
||||||
std::stringstream objectsBuffer;
|
|
||||||
|
|
||||||
for (auto& obj : objects)
|
|
||||||
{
|
|
||||||
objectsBuffer << "\\\\x" << ripple::strHex(obj.key) << '\t'
|
|
||||||
<< std::to_string(minLedger) << '\t' << "\\\\x"
|
|
||||||
<< ripple::strHex(obj.blob) << '\n';
|
|
||||||
}
|
|
||||||
pgQuery.bulkInsert("objects", objectsBuffer.str(), yield);
|
|
||||||
cursor = curCursor;
|
|
||||||
if (!cursor)
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
BOOST_LOG_TRIVIAL(info) << __func__ << " finished inserting into objects";
|
|
||||||
{
|
|
||||||
std::stringstream sql;
|
|
||||||
sql << "DELETE FROM ledgers WHERE ledger_seq < "
|
|
||||||
<< std::to_string(minLedger);
|
|
||||||
auto res = pgQuery(sql.str().data(), yield);
|
|
||||||
if (res.msg() != "ok")
|
|
||||||
throw std::runtime_error("Error deleting from ledgers table");
|
|
||||||
}
|
|
||||||
{
|
|
||||||
std::stringstream sql;
|
|
||||||
sql << "DELETE FROM keys WHERE ledger_seq < "
|
|
||||||
<< std::to_string(minLedger);
|
|
||||||
auto res = pgQuery(sql.str().data(), yield);
|
|
||||||
if (res.msg() != "ok")
|
|
||||||
throw std::runtime_error("Error deleting from keys table");
|
|
||||||
}
|
|
||||||
{
|
|
||||||
std::stringstream sql;
|
|
||||||
sql << "DELETE FROM books WHERE ledger_seq < "
|
|
||||||
<< std::to_string(minLedger);
|
|
||||||
auto res = pgQuery(sql.str().data(), yield);
|
|
||||||
if (res.msg() != "ok")
|
|
||||||
throw std::runtime_error("Error deleting from books table");
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
} // namespace Backend
|
|
||||||
@@ -1,171 +0,0 @@
|
|||||||
#ifndef RIPPLE_APP_REPORTING_POSTGRESBACKEND_H_INCLUDED
|
|
||||||
#define RIPPLE_APP_REPORTING_POSTGRESBACKEND_H_INCLUDED
|
|
||||||
#include <boost/json.hpp>
|
|
||||||
#include <backend/BackendInterface.h>
|
|
||||||
|
|
||||||
namespace Backend {
|
|
||||||
class PostgresBackend : public BackendInterface
|
|
||||||
{
|
|
||||||
private:
|
|
||||||
mutable size_t numRowsInObjectsBuffer_ = 0;
|
|
||||||
mutable std::stringstream objectsBuffer_;
|
|
||||||
mutable size_t numRowsInSuccessorBuffer_ = 0;
|
|
||||||
mutable std::stringstream successorBuffer_;
|
|
||||||
mutable std::stringstream transactionsBuffer_;
|
|
||||||
mutable std::stringstream accountTxBuffer_;
|
|
||||||
std::shared_ptr<PgPool> pgPool_;
|
|
||||||
mutable PgQuery writeConnection_;
|
|
||||||
mutable bool abortWrite_ = false;
|
|
||||||
std::uint32_t writeInterval_ = 1000000;
|
|
||||||
std::uint32_t inProcessLedger = 0;
|
|
||||||
mutable std::unordered_set<std::string> successors_;
|
|
||||||
|
|
||||||
const char* const set_timeout = "SET statement_timeout TO 10000";
|
|
||||||
|
|
||||||
public:
|
|
||||||
PostgresBackend(
|
|
||||||
boost::asio::io_context& ioc,
|
|
||||||
boost::json::object const& config);
|
|
||||||
|
|
||||||
std::optional<std::uint32_t>
|
|
||||||
fetchLatestLedgerSequence(boost::asio::yield_context& yield) const override;
|
|
||||||
|
|
||||||
std::optional<ripple::LedgerInfo>
|
|
||||||
fetchLedgerBySequence(
|
|
||||||
std::uint32_t const sequence,
|
|
||||||
boost::asio::yield_context& yield) const override;
|
|
||||||
|
|
||||||
std::optional<ripple::LedgerInfo>
|
|
||||||
fetchLedgerByHash(
|
|
||||||
ripple::uint256 const& hash,
|
|
||||||
boost::asio::yield_context& yield) const override;
|
|
||||||
|
|
||||||
std::optional<Blob>
|
|
||||||
doFetchLedgerObject(
|
|
||||||
ripple::uint256 const& key,
|
|
||||||
std::uint32_t const sequence,
|
|
||||||
boost::asio::yield_context& yield) const override;
|
|
||||||
|
|
||||||
// returns a transaction, metadata pair
|
|
||||||
std::optional<TransactionAndMetadata>
|
|
||||||
fetchTransaction(
|
|
||||||
ripple::uint256 const& hash,
|
|
||||||
boost::asio::yield_context& yield) const override;
|
|
||||||
|
|
||||||
std::vector<TransactionAndMetadata>
|
|
||||||
fetchAllTransactionsInLedger(
|
|
||||||
std::uint32_t const ledgerSequence,
|
|
||||||
boost::asio::yield_context& yield) const override;
|
|
||||||
|
|
||||||
std::vector<ripple::uint256>
|
|
||||||
fetchAllTransactionHashesInLedger(
|
|
||||||
std::uint32_t const ledgerSequence,
|
|
||||||
boost::asio::yield_context& yield) const override;
|
|
||||||
|
|
||||||
std::optional<NFT>
|
|
||||||
fetchNFT(
|
|
||||||
ripple::uint256 const& tokenID,
|
|
||||||
std::uint32_t const ledgerSequence,
|
|
||||||
boost::asio::yield_context& yield) const override;
|
|
||||||
|
|
||||||
TransactionsAndCursor
|
|
||||||
fetchNFTTransactions(
|
|
||||||
ripple::uint256 const& tokenID,
|
|
||||||
std::uint32_t const limit,
|
|
||||||
bool const forward,
|
|
||||||
std::optional<TransactionsCursor> const& cursorIn,
|
|
||||||
boost::asio::yield_context& yield) const override;
|
|
||||||
|
|
||||||
std::vector<LedgerObject>
|
|
||||||
fetchLedgerDiff(
|
|
||||||
std::uint32_t const ledgerSequence,
|
|
||||||
boost::asio::yield_context& yield) const override;
|
|
||||||
|
|
||||||
std::optional<LedgerRange>
|
|
||||||
hardFetchLedgerRange(boost::asio::yield_context& yield) const override;
|
|
||||||
|
|
||||||
std::optional<ripple::uint256>
|
|
||||||
doFetchSuccessorKey(
|
|
||||||
ripple::uint256 key,
|
|
||||||
std::uint32_t const ledgerSequence,
|
|
||||||
boost::asio::yield_context& yield) const override;
|
|
||||||
|
|
||||||
std::vector<TransactionAndMetadata>
|
|
||||||
fetchTransactions(
|
|
||||||
std::vector<ripple::uint256> const& hashes,
|
|
||||||
boost::asio::yield_context& yield) const override;
|
|
||||||
|
|
||||||
std::vector<Blob>
|
|
||||||
doFetchLedgerObjects(
|
|
||||||
std::vector<ripple::uint256> const& keys,
|
|
||||||
std::uint32_t const sequence,
|
|
||||||
boost::asio::yield_context& yield) const override;
|
|
||||||
|
|
||||||
TransactionsAndCursor
|
|
||||||
fetchAccountTransactions(
|
|
||||||
ripple::AccountID const& account,
|
|
||||||
std::uint32_t const limit,
|
|
||||||
bool forward,
|
|
||||||
std::optional<TransactionsCursor> const& cursor,
|
|
||||||
boost::asio::yield_context& yield) const override;
|
|
||||||
|
|
||||||
void
|
|
||||||
writeLedger(
|
|
||||||
ripple::LedgerInfo const& ledgerInfo,
|
|
||||||
std::string&& ledgerHeader) override;
|
|
||||||
|
|
||||||
void
|
|
||||||
doWriteLedgerObject(
|
|
||||||
std::string&& key,
|
|
||||||
std::uint32_t const seq,
|
|
||||||
std::string&& blob) override;
|
|
||||||
|
|
||||||
void
|
|
||||||
writeSuccessor(
|
|
||||||
std::string&& key,
|
|
||||||
std::uint32_t const seq,
|
|
||||||
std::string&& successor) override;
|
|
||||||
|
|
||||||
void
|
|
||||||
writeTransaction(
|
|
||||||
std::string&& hash,
|
|
||||||
std::uint32_t const seq,
|
|
||||||
std::uint32_t const date,
|
|
||||||
std::string&& transaction,
|
|
||||||
std::string&& metadata) override;
|
|
||||||
|
|
||||||
void
|
|
||||||
writeNFTs(std::vector<NFTsData>&& data) override;
|
|
||||||
|
|
||||||
void
|
|
||||||
writeAccountTransactions(
|
|
||||||
std::vector<AccountTransactionsData>&& data) override;
|
|
||||||
|
|
||||||
void
|
|
||||||
writeNFTTransactions(std::vector<NFTTransactionsData>&& data) override;
|
|
||||||
|
|
||||||
void
|
|
||||||
open(bool readOnly) override;
|
|
||||||
|
|
||||||
void
|
|
||||||
close() override;
|
|
||||||
|
|
||||||
void
|
|
||||||
startWrites() const override;
|
|
||||||
|
|
||||||
bool
|
|
||||||
doFinishWrites() override;
|
|
||||||
|
|
||||||
bool
|
|
||||||
isTooBusy() const override
|
|
||||||
{
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool
|
|
||||||
doOnlineDelete(
|
|
||||||
std::uint32_t const numLedgersToKeep,
|
|
||||||
boost::asio::yield_context& yield) const override;
|
|
||||||
};
|
|
||||||
} // namespace Backend
|
|
||||||
#endif
|
|
||||||
@@ -1,6 +1,6 @@
|
|||||||
# Clio Backend
|
# Clio Backend
|
||||||
## Background
|
## Background
|
||||||
The backend of Clio is responsible for handling the proper reading and writing of past ledger data from and to a given database. As of right now, Cassandra is the only supported database that is production-ready. However, support for more databases like PostgreSQL and DynamoDB may be added in future versions. Support for database types can be easily extended by creating new implementations which implements the virtual methods of `BackendInterface.h`. Then, use the Factory Object Design Pattern to simply add logic statements to `BackendFactory.h` that return the new database interface for a specific `type` in Clio's configuration file.
|
The backend of Clio is responsible for handling the proper reading and writing of past ledger data from and to a given database. As of right now, Cassandra and ScyllaDB are the only supported databases that are production-ready. Support for database types can be easily extended by creating new implementations which implements the virtual methods of `BackendInterface.h`. Then, use the Factory Object Design Pattern to simply add logic statements to `BackendFactory.h` that return the new database interface for a specific `type` in Clio's configuration file.
|
||||||
|
|
||||||
## Data Model
|
## Data Model
|
||||||
The data model used by Clio to read and write ledger data is different from what Rippled uses. Rippled uses a novel data structure named [*SHAMap*](https://github.com/ripple/rippled/blob/master/src/ripple/shamap/README.md), which is a combination of a Merkle Tree and a Radix Trie. In a SHAMap, ledger objects are stored in the root vertices of the tree. Thus, looking up a record located at the leaf node of the SHAMap executes a tree search, where the path from the root node to the leaf node is the key of the record. Rippled nodes can also generate a proof-tree by forming a subtree with all the path nodes and their neighbors, which can then be used to prove the existnce of the leaf node data to other Rippled nodes. In short, the main purpose of the SHAMap data structure is to facilitate the fast validation of data integrity between different decentralized Rippled nodes.
|
The data model used by Clio to read and write ledger data is different from what Rippled uses. Rippled uses a novel data structure named [*SHAMap*](https://github.com/ripple/rippled/blob/master/src/ripple/shamap/README.md), which is a combination of a Merkle Tree and a Radix Trie. In a SHAMap, ledger objects are stored in the root vertices of the tree. Thus, looking up a record located at the leaf node of the SHAMap executes a tree search, where the path from the root node to the leaf node is the key of the record. Rippled nodes can also generate a proof-tree by forming a subtree with all the path nodes and their neighbors, which can then be used to prove the existnce of the leaf node data to other Rippled nodes. In short, the main purpose of the SHAMap data structure is to facilitate the fast validation of data integrity between different decentralized Rippled nodes.
|
||||||
|
|||||||
@@ -22,7 +22,7 @@ read-only mode. In read-only mode, the server does not perform ETL and simply
|
|||||||
publishes new ledgers as they are written to the database.
|
publishes new ledgers as they are written to the database.
|
||||||
If the database is not updated within a certain time period
|
If the database is not updated within a certain time period
|
||||||
(currently hard coded at 20 seconds), clio will begin the ETL
|
(currently hard coded at 20 seconds), clio will begin the ETL
|
||||||
process and start writing to the database. Postgres will report an error when
|
process and start writing to the database. The database will report an error when
|
||||||
trying to write a record with a key that already exists. ETL uses this error to
|
trying to write a record with a key that already exists. ETL uses this error to
|
||||||
determine that another process is writing to the database, and subsequently
|
determine that another process is writing to the database, and subsequently
|
||||||
falls back to a soft read-only mode. clio can also operate in strict
|
falls back to a soft read-only mode. clio can also operate in strict
|
||||||
|
|||||||
@@ -262,7 +262,7 @@ private:
|
|||||||
/// following parent
|
/// following parent
|
||||||
/// @param parent the previous ledger
|
/// @param parent the previous ledger
|
||||||
/// @param rawData data extracted from an ETL source
|
/// @param rawData data extracted from an ETL source
|
||||||
/// @return the newly built ledger and data to write to Postgres
|
/// @return the newly built ledger and data to write to the database
|
||||||
std::pair<ripple::LedgerInfo, bool>
|
std::pair<ripple::LedgerInfo, bool>
|
||||||
buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData);
|
buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData);
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,11 @@
|
|||||||
#include <boost/algorithm/string.hpp>
|
#include <boost/algorithm/string.hpp>
|
||||||
|
#include <boost/format.hpp>
|
||||||
|
|
||||||
|
#include <ripple/basics/StringUtilities.h>
|
||||||
|
|
||||||
#include <backend/BackendInterface.h>
|
#include <backend/BackendInterface.h>
|
||||||
#include <rpc/RPCHelpers.h>
|
#include <rpc/RPCHelpers.h>
|
||||||
|
|
||||||
namespace RPC {
|
namespace RPC {
|
||||||
|
|
||||||
std::optional<bool>
|
std::optional<bool>
|
||||||
|
|||||||
@@ -8,7 +8,7 @@
|
|||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
#include <backend/BackendInterface.h>
|
#include <backend/BackendInterface.h>
|
||||||
#include <backend/DBHelpers.h>
|
#include <backend/DBHelpers.h>
|
||||||
#include <backend/Pg.h>
|
|
||||||
#include <rpc/RPCHelpers.h>
|
#include <rpc/RPCHelpers.h>
|
||||||
|
|
||||||
namespace RPC {
|
namespace RPC {
|
||||||
|
|||||||
@@ -10,7 +10,6 @@
|
|||||||
|
|
||||||
#include <backend/BackendInterface.h>
|
#include <backend/BackendInterface.h>
|
||||||
#include <backend/DBHelpers.h>
|
#include <backend/DBHelpers.h>
|
||||||
#include <backend/Pg.h>
|
|
||||||
|
|
||||||
namespace RPC {
|
namespace RPC {
|
||||||
|
|
||||||
|
|||||||
@@ -1,5 +1,4 @@
|
|||||||
#include <backend/BackendInterface.h>
|
#include <backend/BackendInterface.h>
|
||||||
#include <backend/Pg.h>
|
|
||||||
#include <rpc/RPCHelpers.h>
|
#include <rpc/RPCHelpers.h>
|
||||||
|
|
||||||
namespace RPC {
|
namespace RPC {
|
||||||
|
|||||||
@@ -3,12 +3,6 @@ The correctness of new implementations can be verified via running unit tests. B
|
|||||||
## Requirements
|
## Requirements
|
||||||
### 1. Cassandra cluster
|
### 1. Cassandra cluster
|
||||||
Have access to a **local (127.0.0.1)** Cassandra cluster, opened at port **9042**. Please ensure that the cluster is successfully running before running Unit Tests.
|
Have access to a **local (127.0.0.1)** Cassandra cluster, opened at port **9042**. Please ensure that the cluster is successfully running before running Unit Tests.
|
||||||
### 2. Postgres server
|
|
||||||
Have access to a **local (127.0.0.1)** Postgres server, opened at port **5432**. The server must also have a super user named **postgres** with password set to **postgres**. In addition, modify *postgresql.conf* with the following field values:
|
|
||||||
```
|
|
||||||
max_connections = 1000
|
|
||||||
shared_buffers = 1280MB
|
|
||||||
```
|
|
||||||
## Running
|
## Running
|
||||||
To run the unit tests, first build Clio as normal, then execute `./clio_tests` to run the unit tests.
|
To run the unit tests, first build Clio as normal, then execute `./clio_tests` to run the unit tests.
|
||||||
|
|
||||||
|
|||||||
@@ -37,20 +37,7 @@ TEST(BackendTest, Basic)
|
|||||||
{"max_requests_outstanding", 1000},
|
{"max_requests_outstanding", 1000},
|
||||||
{"indexer_key_shift", 2},
|
{"indexer_key_shift", 2},
|
||||||
{"threads", 8}}}}}};
|
{"threads", 8}}}}}};
|
||||||
boost::json::object postgresConfig{
|
std::vector<boost::json::object> configs = {cassandraConfig};
|
||||||
{"database",
|
|
||||||
{{"type", "postgres"},
|
|
||||||
{"experimental", true},
|
|
||||||
{"postgres",
|
|
||||||
{{"contact_point", "127.0.0.1"},
|
|
||||||
{"username", "postgres"},
|
|
||||||
{"database", keyspace.c_str()},
|
|
||||||
{"password", "postgres"},
|
|
||||||
{"indexer_key_shift", 2},
|
|
||||||
{"max_connections", 100},
|
|
||||||
{"threads", 8}}}}}};
|
|
||||||
std::vector<boost::json::object> configs = {
|
|
||||||
cassandraConfig, postgresConfig};
|
|
||||||
for (auto& config : configs)
|
for (auto& config : configs)
|
||||||
{
|
{
|
||||||
std::cout << keyspace << std::endl;
|
std::cout << keyspace << std::endl;
|
||||||
@@ -1853,20 +1840,7 @@ TEST(Backend, cacheIntegration)
|
|||||||
{"max_requests_outstanding", 1000},
|
{"max_requests_outstanding", 1000},
|
||||||
{"indexer_key_shift", 2},
|
{"indexer_key_shift", 2},
|
||||||
{"threads", 8}}}}}};
|
{"threads", 8}}}}}};
|
||||||
boost::json::object postgresConfig{
|
std::vector<boost::json::object> configs = {cassandraConfig};
|
||||||
{"database",
|
|
||||||
{{"type", "postgres"},
|
|
||||||
{"experimental", true},
|
|
||||||
{"postgres",
|
|
||||||
{{"contact_point", "127.0.0.1"},
|
|
||||||
{"username", "postgres"},
|
|
||||||
{"database", keyspace.c_str()},
|
|
||||||
{"password", "postgres"},
|
|
||||||
{"indexer_key_shift", 2},
|
|
||||||
{"max_connections", 100},
|
|
||||||
{"threads", 8}}}}}};
|
|
||||||
std::vector<boost::json::object> configs = {
|
|
||||||
cassandraConfig, postgresConfig};
|
|
||||||
for (auto& config : configs)
|
for (auto& config : configs)
|
||||||
{
|
{
|
||||||
std::cout << keyspace << std::endl;
|
std::cout << keyspace << std::endl;
|
||||||
|
|||||||
Reference in New Issue
Block a user