diff --git a/CMake/deps/Postgres.cmake b/CMake/deps/Postgres.cmake
deleted file mode 100644
index c37f9934..00000000
--- a/CMake/deps/Postgres.cmake
+++ /dev/null
@@ -1,31 +0,0 @@
-set(POSTGRES_INSTALL_DIR ${CMAKE_BINARY_DIR}/postgres)
-set(POSTGRES_LIBS pq pgcommon pgport)
-ExternalProject_Add(postgres
-    GIT_REPOSITORY https://github.com/postgres/postgres.git
-    GIT_TAG REL_14_1
-    GIT_SHALLOW 1
-    LOG_CONFIGURE 1
-    LOG_BUILD 1
-    CONFIGURE_COMMAND ./configure --prefix ${POSTGRES_INSTALL_DIR} --without-readline --verbose
-    BUILD_COMMAND ${CMAKE_COMMAND} -E env --unset=MAKELEVEL make VERBOSE=${CMAKE_VERBOSE_MAKEFILE} -j32
-    BUILD_IN_SOURCE 1
-    INSTALL_COMMAND ${CMAKE_COMMAND} -E env make -s --no-print-directory install
-    UPDATE_COMMAND ""
-    BUILD_BYPRODUCTS
-        ${POSTGRES_INSTALL_DIR}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}pq${CMAKE_STATIC_LIBRARY_SUFFIX}}
-        ${POSTGRES_INSTALL_DIR}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}pgcommon${CMAKE_STATIC_LIBRARY_SUFFIX}}
-        ${POSTGRES_INSTALL_DIR}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}pgport${CMAKE_STATIC_LIBRARY_SUFFIX}}
-    )
-ExternalProject_Get_Property (postgres BINARY_DIR)
-
-foreach(_lib ${POSTGRES_LIBS})
-    add_library(${_lib} STATIC IMPORTED GLOBAL)
-    add_dependencies(${_lib} postgres)
-    set_target_properties(${_lib} PROPERTIES
-        IMPORTED_LOCATION ${POSTGRES_INSTALL_DIR}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}${_lib}.a)
-    set_target_properties(${_lib} PROPERTIES
-        INTERFACE_INCLUDE_DIRECTORIES ${POSTGRES_INSTALL_DIR}/include)
-    target_link_libraries(clio PUBLIC ${POSTGRES_INSTALL_DIR}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}${_lib}${CMAKE_STATIC_LIBRARY_SUFFIX})
-endforeach()
-add_dependencies(clio postgres)
-target_include_directories(clio PUBLIC ${POSTGRES_INSTALL_DIR}/include)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index aa531bfa..363482b4 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -45,7 +45,6 @@ include(CMake/ClioVersion.cmake)
 include(CMake/deps/rippled.cmake)
 include(CMake/deps/Boost.cmake)
 include(CMake/deps/cassandra.cmake)
-include(CMake/deps/Postgres.cmake)
 target_sources(clio PRIVATE
     ## Main
@@ -53,8 +52,6 @@ target_sources(clio PRIVATE
     ## Backend
     src/backend/BackendInterface.cpp
     src/backend/CassandraBackend.cpp
-    src/backend/Pg.cpp
-    src/backend/PostgresBackend.cpp
     src/backend/SimpleCache.cpp
     ## ETL
     src/etl/ETLSource.cpp
diff --git a/REVIEW.md b/REVIEW.md
deleted file mode 100644
index 1258dca8..00000000
--- a/REVIEW.md
+++ /dev/null
@@ -1,121 +0,0 @@
-# How to review clio
-Clio is a massive project, and thus I don't expect the code to be reviewed the
-way a normal PR would. So I put this guide together to help reviewers look at
-the relevant pieces of code without getting lost in the weeds.
-
-One thing reviewers should keep in mind is that most of clio is designed to be
-lightweight and simple. We try not to introduce any unnecessary complexity and
-keep the code as simple and straightforward as possible. Sometimes complexity is
-unavoidable, but simplicity is the goal.
-
-## Order of review
-The code is organized into 4 main components, each with its own folder. The
-code in each folder is as self-contained as possible. A good way to approach
-the review would be to review one folder at a time.
-
-### backend
-The code in the backend folder is the heart of the project, and reviewers should
-start here. This is the most complex part of the code, as well as the most
-performance sensitive. clio does not keep any data in memory, so performance
-generally depends on the data model and the way we talk to the database.
-
-Reviewers should start with the README in this folder to get a high level idea
-of the data model and to review the data model itself. Then, reviewers should
-dive into the implementation. The table schemas and queries for Cassandra are
-defined in `CassandraBackend::open()`. The table schemas for Postgres are defined
-in Pg.cpp. The queries for Postgres are defined in each of the functions of `PostgresBackend`.
-A good way to approach the implementation would be to look at the table schemas,
-and then go through the functions declared in `BackendInterface`. Reviewers could
-also branch out to the rest of the code by looking at where these functions are
-called from.
-
-### webserver
-The code in the webserver folder implements the web server for handling RPC requests.
-This code was mostly copied and pasted from boost beast example code, so I would
-really appreciate review here.
-
-### rpc
-The rpc folder contains all of the handlers and any helper functions they need.
-This code is not too complicated, so reviewers don't need to dwell long here.
-
-### etl
-The etl folder contains all of the code for extracting data from rippled. This
-code is complex and important, but most of it was copied from rippled's
-reporting mode, and thus has already been reviewed and is being used in prod.
-
-## Design decisions that should be reviewed
-
-### Data model
-Reviewers should review the general data model. The data model itself is described
-at a high level in the README in the backend folder. The table schemas and queries
-for Cassandra are defined in the `open()` function of `CassandraBackend`. The table
-schemas for Postgres are defined in Pg.cpp.
-
-Particular attention should be paid to the keys table and the problem it solves
-(successor/upper bound). I originally was going to have a special table for book_offers,
-but then I decided that we could use the keys table itself for that and save space.
-This makes book_offers somewhat slow compared to rippled, though still very usable.
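To make the successor/upper-bound problem concrete, here is a minimal C++ sketch with illustrative names (not clio's actual types): conceptually, the keys table materializes the sorted set of object keys as of a ledger, so that "the first key after k" can be answered by the database rather than from memory.

```cpp
#include <array>
#include <cstdint>
#include <optional>
#include <set>

using Key = std::array<std::uint8_t, 32>;  // 256-bit ledger object key

// Hypothetical sketch: with the keys of a ledger in sorted order, the
// successor of `key` is the first key strictly greater than it. This is
// the lookup that book_offers and ledger paging need.
std::optional<Key>
successor(std::set<Key> const& keysInLedger, Key const& key)
{
    auto it = keysInLedger.upper_bound(key);
    if (it == keysInLedger.end())
        return std::nullopt;
    return *it;
}
```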
-### Large rows
-I did some tricks with Cassandra to deal with very large rows in the keys and account_tx
-tables. For each of these, the partition key (the first component of the primary
-key) is a compound key. This breaks large rows into smaller ones to avoid
-hotspots: data is sharded in Cassandra, and if some rows get very large, some
-nodes end up with a lot more data than others.
-
-For account_tx, this has performance implications when iterating very far back
-in time. Refer to the `fetchAccountTransactions()` function in `CassandraBackend`.
-
-It is unclear if this needs to be done for other tables.
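A rough sketch of the compound-key idea (hypothetical names and bucket interval; the real schemas live in `CassandraBackend::open()`):

```cpp
#include <cstdint>
#include <string>
#include <utility>

// Hypothetical illustration of a compound partition key. Partitioning
// account_tx on (account, bucket) instead of (account) alone bounds the
// size of each physical row, so a busy account's history cannot pile up
// on a single Cassandra node.
constexpr std::uint32_t BUCKET_SHIFT = 20;  // assumed 2^20-ledger buckets

std::pair<std::string, std::uint32_t>
partitionKey(std::string account, std::uint32_t ledgerSeq)
{
    return {std::move(account), ledgerSeq >> BUCKET_SHIFT};
}

// Paging backwards across many empty buckets is what makes
// fetchAccountTransactions() slow over sparse stretches of history.
```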
-### Postgres table partitioning
-Originally, Postgres exhibited performance problems when the dataset approached
-1 TB. This was solved by table partitioning.
-
-### Threading
-I used asio for multithreading. There are a lot of different io_contexts lying
-around the code; most of these are really just ways to submit an async job to a
-single thread. I don't think it makes sense to have one io_context for the whole
-application, but some of the threading is a bit opaque and could be cleaned up.
-
-### Boost Json
-I used boost json for serializing data to json.
-
-### No cache
-As of now, there is no cache. I am not sure if a cache is even worth it. A
-transaction cache would not be hard, but a cache for ledger data would be hard.
-While a cache would improve performance, it would increase memory usage, and clio
-is designed to be lightweight. Also, I've reached thousands of requests per
-second with a single clio node, so I'm not sure performance is even an issue.
-
-## Things I'm less than happy about
-
-#### BackendIndexer
-This is a particularly hairy piece of code that handles writing to the keys table.
-I am not too happy with this code. Parts of it need to execute in real time as
-part of ETL, and other parts are allowed to run in the background. There is also
-code that detects if a previous background job failed to complete before the
-server shut down, and thus tries to rerun that job. The code feels tacked on, and
-I would like it to be more cleanly integrated with the rest of the code.
-
-#### Shifting
-There is some bit shifting going on with the keys table and the account_tx table.
-The keys table is written to every 2^20 ledgers. Maybe it would be better to just
-write every 1 million ledgers.
-
-#### performance of book_offers
-book_offers is a bit slow. It could be sped up in a variety of ways. One is to
-keep a separate book_offers table. However, this is not straightforward and will
-use more space. Another is to keep a cache of book_offers for the most recent
-ledger (or few ledgers). I am not sure if this is worth it.
-
-#### account_tx in Cassandra
-After the fix to deal with large rows, account_tx can be slow at times when using
-Cassandra. Specifically, if there are large gaps in time where the account was
-not affected by any transactions, the code will be reading empty records. I would
-like to sidestep this issue if possible.
-
-#### Implementation of fetchLedgerPage
-`fetchLedgerPage()` is rather complex. Part of this seems unavoidable, since this
-code is dealing with the keys table.
diff --git a/src/backend/BackendFactory.h b/src/backend/BackendFactory.h
index 283de78a..ad4464e6 100644
--- a/src/backend/BackendFactory.h
+++ b/src/backend/BackendFactory.h
@@ -4,7 +4,6 @@
 #include
 #include
 #include
-#include
 namespace Backend {
 std::shared_ptr<BackendInterface>
 make_Backend(boost::asio::io_context& ioc, boost::json::object const& config)
@@ -30,19 +29,6 @@
         backend = std::make_shared<CassandraBackend>(
             ioc, dbConfig.at(type).as_object());
     }
-    else if (boost::iequals(type, "postgres"))
-    {
-        if (dbConfig.contains("experimental") &&
-            dbConfig.at("experimental").is_bool() &&
-            dbConfig.at("experimental").as_bool())
-            backend = std::make_shared<PostgresBackend>(
-                ioc, dbConfig.at(type).as_object());
-        else
-            BOOST_LOG_TRIVIAL(fatal)
-                << "Postgres support is experimental at this time. 
" - << "If you would really like to use Postgres, add " - "\"experimental\":true to your database config"; - } if (!backend) throw std::runtime_error("Invalid database type"); diff --git a/src/backend/BackendInterface.h b/src/backend/BackendInterface.h index aed93d9e..6ba15e36 100644 --- a/src/backend/BackendInterface.h +++ b/src/backend/BackendInterface.h @@ -1,10 +1,16 @@ #ifndef RIPPLE_APP_REPORTING_BACKENDINTERFACE_H_INCLUDED #define RIPPLE_APP_REPORTING_BACKENDINTERFACE_H_INCLUDED + +#include +#include +#include + #include -#include + #include #include #include + #include #include diff --git a/src/backend/DBHelpers.h b/src/backend/DBHelpers.h index fdebea61..11ba2489 100644 --- a/src/backend/DBHelpers.h +++ b/src/backend/DBHelpers.h @@ -2,11 +2,14 @@ #define CLIO_BACKEND_DBHELPERS_H_INCLUDED #include +#include +#include #include #include #include + #include -#include + #include /// Struct used to keep track of what to write to diff --git a/src/backend/Pg.cpp b/src/backend/Pg.cpp deleted file mode 100644 index 3ce80f45..00000000 --- a/src/backend/Pg.cpp +++ /dev/null @@ -1,1788 +0,0 @@ -// Need raw socket manipulation to determine if postgres socket IPv4 or 6. -#if defined(_WIN32) -#include -#include -#else -#include -#include -#include -#include -#endif - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -static void -noticeReceiver(void* arg, PGresult const* res) -{ - BOOST_LOG_TRIVIAL(trace) << "server message: " << PQresultErrorMessage(res); -} - -//----------------------------------------------------------------------------- - -std::string -PgResult::msg() const -{ - if (error_.has_value()) - { - std::stringstream ss; - ss << error_->first << ": " << error_->second; - return ss.str(); - } - if (result_) - return "ok"; - - // Must be stopping. - return "stopping"; -} - -//----------------------------------------------------------------------------- - -/* - Connecting described in: - https://www.postgresql.org/docs/10/libpq-connect.html - */ -void -Pg::connect(boost::asio::yield_context& yield) -{ - std::function poller; - if (conn_) - { - if (PQstatus(conn_.get()) == CONNECTION_OK) - return; - /* Try resetting connection, or disconnect and retry if that fails. - PQfinish() is synchronous so first try to asynchronously reset. */ - if (PQresetStart(conn_.get())) - poller = PQresetPoll; - else - disconnect(); - } - - if (!conn_) - { - conn_.reset(PQconnectStartParams( - reinterpret_cast(&config_.keywordsIdx[0]), - reinterpret_cast(&config_.valuesIdx[0]), - 0)); - poller = PQconnectPoll; - } - - if (!conn_) - throw std::runtime_error("No db connection object"); - - if (PQstatus(conn_.get()) == CONNECTION_BAD) - { - std::stringstream ss; - ss << "DB connection status " << PQstatus(conn_.get()) << ": " - << PQerrorMessage(conn_.get()); - throw std::runtime_error(ss.str()); - } - - PQsetNoticeReceiver(conn_.get(), noticeReceiver, nullptr); - - try - { - socket_ = getSocket(yield); - - /* Asynchronously connecting entails several messages between - * client and server. 
*/ - PostgresPollingStatusType poll = PGRES_POLLING_WRITING; - while (poll != PGRES_POLLING_OK) - { - switch (poll) - { - case PGRES_POLLING_FAILED: { - std::stringstream ss; - ss << "DB connection failed"; - char* err = PQerrorMessage(conn_.get()); - if (err) - ss << ":" << err; - else - ss << '.'; - throw std::runtime_error(ss.str()); - } - - case PGRES_POLLING_READING: - socket_->async_wait( - boost::asio::ip::tcp::socket::wait_read, yield); - break; - - case PGRES_POLLING_WRITING: - socket_->async_wait( - boost::asio::ip::tcp::socket::wait_write, yield); - break; - - default: { - assert(false); - std::stringstream ss; - ss << "unknown DB polling status: " << poll; - throw std::runtime_error(ss.str()); - } - } - poll = poller(conn_.get()); - } - } - catch (std::exception const& e) - { - BOOST_LOG_TRIVIAL(error) << __func__ << " " - << "error, polling connection" - << "error = " << e.what(); - // Sever connection upon any error. - disconnect(); - std::stringstream ss; - ss << "polling connection error: " << e.what(); - throw std::runtime_error(ss.str()); - } - - /* Enable asynchronous writes. */ - if (PQsetnonblocking(conn_.get(), 1) == -1) - { - std::stringstream ss; - char* err = PQerrorMessage(conn_.get()); - if (err) - ss << "Error setting connection to non-blocking: " << err; - else - ss << "Unknown error setting connection to non-blocking"; - throw std::runtime_error(ss.str()); - } - - if (PQstatus(conn_.get()) != CONNECTION_OK) - { - std::stringstream ss; - ss << "bad connection" << std::to_string(PQstatus(conn_.get())); - char* err = PQerrorMessage(conn_.get()); - if (err) - ss << ": " << err; - else - ss << '.'; - throw std::runtime_error(ss.str()); - } -} - -inline void -Pg::flush(boost::asio::yield_context& yield) -{ - // non-blocking connection requires manually flushing write. - int flushed; - do - { - flushed = PQflush(conn_.get()); - if (flushed == 1) - { - socket_->async_wait( - boost::asio::ip::tcp::socket::wait_write, yield); - } - else if (flushed == -1) - { - std::stringstream ss; - ss << "error flushing query " << PQerrorMessage(conn_.get()); - throw std::runtime_error(ss.str()); - } - } while (flushed); -} - -inline PgResult -Pg::waitForStatus(boost::asio::yield_context& yield, ExecStatusType expected) -{ - PgResult ret; - while (true) - { - if (PQisBusy(conn_.get())) - { - socket_->async_wait(boost::asio::ip::tcp::socket::wait_read, yield); - } - - if (!PQconsumeInput(conn_.get())) - { - std::stringstream ss; - ss << "query consume input error: " << PQerrorMessage(conn_.get()); - throw std::runtime_error(ss.str()); - } - - if (PQisBusy(conn_.get())) - continue; - - pg_result_type res{PQgetResult(conn_.get()), [](PGresult* result) { - PQclear(result); - }}; - - if (!res) - break; - - auto status = PQresultStatus(res.get()); - ret = PgResult(std::move(res)); - - if (status == expected) - break; - } - - return ret; -} - -inline asio_socket_type -Pg::getSocket(boost::asio::yield_context& yield) -{ - asio_socket_type s{ - new boost::asio::ip::tcp::socket( - boost::asio::get_associated_executor(yield), - boost::asio::ip::tcp::v4(), - PQsocket(conn_.get())), - [](boost::asio::ip::tcp::socket* socket) { - socket->cancel(); - socket->release(); - delete socket; - }}; - - return s; -} - -PgResult -Pg::query( - char const* command, - std::size_t const nParams, - char const* const* values, - boost::asio::yield_context& yield) -{ - pg_result_type ret{nullptr, [](PGresult* result) { PQclear(result); }}; - // Connect then submit query. 
- try - { - connect(yield); - - int sent; - if (nParams) - { - // PQexecParams can process only a single command. - sent = PQsendQueryParams( - conn_.get(), - command, - nParams, - nullptr, - values, - nullptr, - nullptr, - 0); - } - else - { - // PQexec can process multiple commands separated by - // semi-colons. Returns the response from the last - // command processed. - sent = PQsendQuery(conn_.get(), command); - } - - if (!sent) - { - std::stringstream ss; - ss << "Can't send query: " << PQerrorMessage(conn_.get()); - throw std::runtime_error(ss.str()); - } - - flush(yield); - - /* Only read response if query was submitted successfully. - Only a single response is expected, but the API requires - responses to be read until nullptr is returned. - It is possible for pending reads on the connection to interfere - with the current query. For simplicity, this implementation - only flushes pending writes and assumes there are no pending reads. - To avoid this, all pending reads from each query must be consumed, - and all connections with any type of error be severed. */ - while (true) - { - if (PQisBusy(conn_.get())) - socket_->async_wait( - boost::asio::ip::tcp::socket::wait_read, yield); - - if (!PQconsumeInput(conn_.get())) - { - std::stringstream ss; - ss << "query consume input error: " - << PQerrorMessage(conn_.get()); - throw std::runtime_error(ss.str()); - } - - if (PQisBusy(conn_.get())) - continue; - - pg_result_type res{PQgetResult(conn_.get()), [](PGresult* result) { - PQclear(result); - }}; - - if (!res) - break; - - ret.reset(res.release()); - - // ret is never null in these cases, so need to break. - bool copyStatus = false; - switch (PQresultStatus(ret.get())) - { - case PGRES_COPY_IN: - case PGRES_COPY_OUT: - case PGRES_COPY_BOTH: - copyStatus = true; - break; - default:; - } - if (copyStatus) - break; - } - } - catch (std::exception const& e) - { - BOOST_LOG_TRIVIAL(error) << __func__ << " " - << "error, severing connection " - << "error = " << e.what(); - // Sever connection upon any error. - disconnect(); - std::stringstream ss; - ss << "query error: " << e.what(); - throw std::runtime_error(ss.str()); - } - - if (!ret) - throw std::runtime_error("no result structure returned"); - - // Ensure proper query execution. - switch (PQresultStatus(ret.get())) - { - case PGRES_TUPLES_OK: - case PGRES_COMMAND_OK: - case PGRES_COPY_IN: - case PGRES_COPY_OUT: - case PGRES_COPY_BOTH: - break; - default: { - std::stringstream ss; - ss << "bad query result: " << PQresStatus(PQresultStatus(ret.get())) - << " error message: " << PQerrorMessage(conn_.get()) - << ", number of tuples: " << PQntuples(ret.get()) - << ", number of fields: " << PQnfields(ret.get()); - BOOST_LOG_TRIVIAL(error) << ss.str(); - - PgResult retRes(ret.get(), conn_.get()); - disconnect(); - - return retRes; - } - } - - return PgResult(std::move(ret)); -} - -static pg_formatted_params -formatParams(pg_params const& dbParams) -{ - std::vector> const& values = dbParams.second; - /* Convert vector to C-style array of C-strings for postgres API. - std::nullopt is a proxy for NULL since an empty std::string is - 0 length but not NULL. 
*/ - std::vector valuesIdx; - valuesIdx.reserve(values.size()); - std::stringstream ss; - bool first = true; - for (auto const& value : values) - { - if (value) - { - valuesIdx.push_back(value->c_str()); - ss << value->c_str(); - } - else - { - valuesIdx.push_back(nullptr); - ss << "(null)"; - } - if (first) - first = false; - else - ss << ','; - } - - BOOST_LOG_TRIVIAL(trace) - << "query: " << dbParams.first << ". params: " << ss.str(); - return valuesIdx; -} - -PgResult -Pg::query(pg_params const& dbParams, boost::asio::yield_context& yield) -{ - char const* const& command = dbParams.first; - auto const formattedParams = formatParams(dbParams); - return query( - command, - formattedParams.size(), - formattedParams.size() - ? reinterpret_cast(&formattedParams[0]) - : nullptr, - yield); -} - -void -Pg::bulkInsert( - char const* table, - std::string const& records, - boost::asio::yield_context& yield) -{ - // https://www.postgresql.org/docs/12/libpq-copy.html#LIBPQ-COPY-SEND - assert(conn_.get()); - auto copyCmd = boost::format(R"(COPY %s FROM stdin)"); - auto formattedCmd = boost::str(copyCmd % table); - auto res = query(formattedCmd.c_str(), yield); - if (!res || res.status() != PGRES_COPY_IN) - { - std::stringstream ss; - ss << "bulkInsert to " << table - << ". Postgres insert error: " << res.msg(); - if (res) - ss << ". Query status not PGRES_COPY_IN: " << res.status(); - BOOST_LOG_TRIVIAL(error) << __func__ << " " << records; - throw std::runtime_error(ss.str()); - } - - try - { - while (true) - { - std::int32_t const putCopy = - PQputCopyData(conn_.get(), records.c_str(), records.size()); - - if (putCopy == -1) - { - std::stringstream ss; - ss << "bulkInsert to " << table - << ". PQputCopyData error: " << PQerrorMessage(conn_.get()); - disconnect(); - BOOST_LOG_TRIVIAL(error) << __func__ << " " << records; - throw std::runtime_error(ss.str()); - } - - else if (putCopy == 0) - // If the value is zero, wait for write-ready and try again. - socket_->async_wait( - boost::asio::ip::tcp::socket::wait_write, yield); - else - break; - } - - flush(yield); - auto copyRes = waitForStatus(yield, PGRES_COPY_IN); - if (!copyRes || copyRes.status() != PGRES_COPY_IN) - { - std::stringstream ss; - ss << "bulkInsert to " << table - << ". Postgres insert error: " << copyRes.msg(); - if (res) - ss << ". CopyPut status not PGRES_COPY_IN: " - << copyRes.status(); - BOOST_LOG_TRIVIAL(error) << __func__ << " " << records; - throw std::runtime_error(ss.str()); - } - - std::int32_t copyEnd; - do - { - copyEnd = PQputCopyEnd(conn_.get(), nullptr); - - if (copyEnd == -1) - { - std::stringstream ss; - ss << "bulkInsert to " << table - << ". PQputCopyEnd error: " << PQerrorMessage(conn_.get()); - disconnect(); - BOOST_LOG_TRIVIAL(error) << __func__ << " " << records; - throw std::runtime_error(ss.str()); - } - - // If the value is zero, wait for write-ready and try again. - if (copyEnd == 0) - socket_->async_wait( - boost::asio::ip::tcp::socket::wait_write, yield); - } while (copyEnd == 0); - - flush(yield); - auto endRes = waitForStatus(yield, PGRES_COMMAND_OK); - - if (!endRes || endRes.status() != PGRES_COMMAND_OK) - { - std::stringstream ss; - ss << "bulkInsert to " << table - << ". Postgres insert error: " << endRes.msg(); - if (res) - ss << ". 
CopyEnd status not PGRES_COMMAND_OK: " - << endRes.status(); - BOOST_LOG_TRIVIAL(error) << __func__ << " " << records; - throw std::runtime_error(ss.str()); - } - - pg_result_type finalRes{PQgetResult(conn_.get()), [](PGresult* result) { - PQclear(result); - }}; - - if (finalRes) - { - std::stringstream ss; - ss << "bulkInsert to " << table - << ". Postgres insert error: " << res.msg(); - if (res) - ss << ". Query status not NULL: " << res.status(); - BOOST_LOG_TRIVIAL(error) << __func__ << " " << records; - throw std::runtime_error(ss.str()); - } - } - catch (std::exception const& e) - { - BOOST_LOG_TRIVIAL(error) << __func__ << " " - << "error, bulk insertion" - << "error = " << e.what(); - // Sever connection upon any error. - disconnect(); - std::stringstream ss; - ss << "query error: " << e.what(); - throw std::runtime_error(ss.str()); - } -} - -bool -Pg::clear() -{ - if (!conn_) - return false; - - // The result object must be freed using the libpq API PQclear() call. - pg_result_type res{nullptr, [](PGresult* result) { PQclear(result); }}; - - // Consume results until no more, or until the connection is severed. - do - { - res.reset(PQgetResult(conn_.get())); - if (!res) - break; - - // Pending bulk copy operations may leave the connection in such a - // state that it must be disconnected. - switch (PQresultStatus(res.get())) - { - case PGRES_COPY_IN: - if (PQputCopyEnd(conn_.get(), nullptr) != -1) - break; - [[fallthrough]]; // avoids compiler warning - case PGRES_COPY_OUT: - case PGRES_COPY_BOTH: - conn_.reset(); - default:; - } - } while (res && conn_); - - try - { - socket_->cancel(); - } - catch (std::exception const& e) - { - } - - return conn_ != nullptr; -} - -//----------------------------------------------------------------------------- - -PgPool::PgPool(boost::asio::io_context& ioc, boost::json::object const& config) - : ioc_(ioc) -{ - // Make sure that boost::asio initializes the SSL library. - { - static boost::asio::ssl::detail::openssl_init initSsl; - } - // Don't have postgres client initialize SSL. - PQinitOpenSSL(0, 0); - - /* - Connect to postgres to create low level connection parameters - with optional caching of network address info for subsequent connections. - See https://www.postgresql.org/docs/10/libpq-connect.html - - For bounds checking of postgres connection data received from - the network: the largest size for any connection field in - PG source code is 64 bytes as of 5/2019. There are 29 fields. - */ - constexpr std::size_t maxFieldSize = 1024; - constexpr std::size_t maxFields = 1000; - std::string conninfo = "postgres://"; - auto getFieldAsString = [&config](auto field) { - if (!config.contains(field)) - throw std::runtime_error( - field + std::string{" missing from postgres config"}); - if (!config.at(field).is_string()) - throw std::runtime_error( - field + std::string{" in postgres config is not a string"}); - return std::string{config.at(field).as_string().c_str()}; - }; - conninfo += getFieldAsString("username"); - conninfo += ":"; - conninfo += getFieldAsString("password"); - conninfo += "@"; - conninfo += getFieldAsString("contact_point"); - conninfo += "/"; - conninfo += getFieldAsString("database"); - - // The connection object must be freed using the libpq API PQfinish() call. 
- pg_connection_type conn( - PQconnectdb(conninfo.c_str()), [](PGconn* conn) { PQfinish(conn); }); - if (!conn) - throw std::runtime_error("Can't create DB connection."); - if (PQstatus(conn.get()) != CONNECTION_OK) - { - std::stringstream ss; - ss << "Initial DB connection failed: " << PQerrorMessage(conn.get()); - throw std::runtime_error(ss.str()); - } - - int const sockfd = PQsocket(conn.get()); - if (sockfd == -1) - throw std::runtime_error("No DB socket is open."); - struct sockaddr_storage addr; - socklen_t len = sizeof(addr); - if (getpeername(sockfd, reinterpret_cast(&addr), &len) == - -1) - { - throw std::system_error( - errno, std::generic_category(), "Can't get server address info."); - } - - // Set "port" and "hostaddr" if we're caching it. - bool const remember_ip = config.contains("remember_ip") - ? config.at("remember_ip").as_bool() - : true; - - if (remember_ip) - { - config_.keywords.push_back("port"); - config_.keywords.push_back("hostaddr"); - std::string port; - std::string hostaddr; - - if (addr.ss_family == AF_INET) - { - hostaddr.assign(INET_ADDRSTRLEN, '\0'); - struct sockaddr_in const& ainfo = - reinterpret_cast(addr); - port = std::to_string(ntohs(ainfo.sin_port)); - if (!inet_ntop( - AF_INET, &ainfo.sin_addr, &hostaddr[0], hostaddr.size())) - { - throw std::system_error( - errno, - std::generic_category(), - "Can't get IPv4 address string."); - } - } - else if (addr.ss_family == AF_INET6) - { - hostaddr.assign(INET6_ADDRSTRLEN, '\0'); - struct sockaddr_in6 const& ainfo = - reinterpret_cast(addr); - port = std::to_string(ntohs(ainfo.sin6_port)); - if (!inet_ntop( - AF_INET6, &ainfo.sin6_addr, &hostaddr[0], hostaddr.size())) - { - throw std::system_error( - errno, - std::generic_category(), - "Can't get IPv6 address string."); - } - } - - config_.values.push_back(port.c_str()); - config_.values.push_back(hostaddr.c_str()); - } - std::unique_ptr connOptions( - PQconninfo(conn.get()), - [](PQconninfoOption* opts) { PQconninfoFree(opts); }); - if (!connOptions) - throw std::runtime_error("Can't get DB connection options."); - - std::size_t nfields = 0; - for (PQconninfoOption* option = connOptions.get(); - option->keyword != nullptr; - ++option) - { - if (++nfields > maxFields) - { - std::stringstream ss; - ss << "DB returned connection options with > " << maxFields - << " fields."; - throw std::runtime_error(ss.str()); - } - - if (!option->val || - (remember_ip && - (!strcmp(option->keyword, "hostaddr") || - !strcmp(option->keyword, "port")))) - { - continue; - } - - if (strlen(option->keyword) > maxFieldSize || - strlen(option->val) > maxFieldSize) - { - std::stringstream ss; - ss << "DB returned a connection option name or value with\n"; - ss << "excessive size (>" << maxFieldSize << " bytes).\n"; - ss << "option (possibly truncated): " - << std::string_view( - option->keyword, - std::min(strlen(option->keyword), maxFieldSize)) - << '\n'; - ss << " value (possibly truncated): " - << std::string_view( - option->val, std::min(strlen(option->val), maxFieldSize)); - throw std::runtime_error(ss.str()); - } - config_.keywords.push_back(option->keyword); - config_.values.push_back(option->val); - } - - config_.keywordsIdx.reserve(config_.keywords.size() + 1); - config_.valuesIdx.reserve(config_.values.size() + 1); - for (std::size_t n = 0; n < config_.keywords.size(); ++n) - { - config_.keywordsIdx.push_back(config_.keywords[n].c_str()); - config_.valuesIdx.push_back(config_.values[n].c_str()); - } - config_.keywordsIdx.push_back(nullptr); - 
config_.valuesIdx.push_back(nullptr);
-
-    if (config.contains("max_connections"))
-        config_.max_connections = config.at("max_connections").as_int64();
-    if (config.contains("timeout"))
-        config_.timeout =
-            std::chrono::seconds(config.at("timeout").as_uint64());
-}
-
-void
-PgPool::setup()
-{
-    {
-        std::stringstream ss;
-        ss << "max_connections: " << config_.max_connections << ", "
-           << "timeout: " << config_.timeout.count() << ", "
-           << "connection params: ";
-        bool first = true;
-        for (std::size_t i = 0; i < config_.keywords.size(); ++i)
-        {
-            if (first)
-                first = false;
-            else
-                ss << ", ";
-            ss << config_.keywords[i] << ": "
-               << (config_.keywords[i] == "password" ? "*" : config_.values[i]);
-        }
-        BOOST_LOG_TRIVIAL(debug) << ss.str();
-    }
-}
-
-void
-PgPool::onStop()
-{
-    std::lock_guard lock(mutex_);
-    stop_ = true;
-    cond_.notify_all();
-    idle_.clear();
-    BOOST_LOG_TRIVIAL(info) << "stopped";
-}
-
-std::unique_ptr<Pg>
-PgPool::checkout()
-{
-    std::unique_ptr<Pg> ret;
-    std::unique_lock lock(mutex_);
-    do
-    {
-        if (stop_)
-            return {};
-
-        // If there is a connection in the pool, return the most recent.
-        if (idle_.size())
-        {
-            auto entry = idle_.rbegin();
-            ret = std::move(entry->second);
-            idle_.erase(std::next(entry).base());
-        }
-        // Otherwise, return a new connection unless over threshold.
-        else if (connections_ < config_.max_connections)
-        {
-            ++connections_;
-            ret = std::make_unique<Pg>(config_, ioc_);
-        }
-        // Otherwise, wait until a connection becomes available or we stop.
-        else
-        {
-            BOOST_LOG_TRIVIAL(error) << "No database connections available.";
-            cond_.wait(lock);
-        }
-    } while (!ret && !stop_);
-    lock.unlock();
-
-    return ret;
-}
-
-void
-PgPool::checkin(std::unique_ptr<Pg>& pg)
-{
-    if (pg)
-    {
-        std::lock_guard lock(mutex_);
-        if (!stop_ && pg->clear())
-        {
-            pg->clear();
-            idle_.emplace(clock_type::now(), std::move(pg));
-        }
-        else
-        {
-            --connections_;
-            pg.reset();
-        }
-    }
-
-    cond_.notify_all();
-}
-
-//-----------------------------------------------------------------------------
-
-std::shared_ptr<PgPool>
-make_PgPool(boost::asio::io_context& ioc, boost::json::object const& config)
-{
-    try
-    {
-        auto ret = std::make_shared<PgPool>(ioc, config);
-        ret->setup();
-        return ret;
-    }
-    catch (std::runtime_error& e)
-    {
-        boost::json::object configCopy = config;
-        configCopy["database"] = "postgres";
-        auto ret = std::make_shared<PgPool>(ioc, configCopy);
-        ret->setup();
-
-        Backend::synchronous([&](boost::asio::yield_context& yield) {
-            PgQuery pgQuery(ret);
-            std::string query = "CREATE DATABASE " +
-                std::string{config.at("database").as_string().c_str()};
-            pgQuery(query.c_str(), yield);
-        });
-
-        ret = std::make_shared<PgPool>(ioc, config);
-
-        ret->setup();
-        return ret;
-    }
-}
-
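For orientation, a hedged sketch of how the pool pieces above fit together; the config values are placeholders, and the calls mirror ones that appear elsewhere in this file (PgQuery checks a connection out of the pool, runs the query on a coroutine yield context, and checks the connection back in):

```cpp
// Illustrative only: make_PgPool() falls back to creating the configured
// database if the first connection attempt fails, then retries.
boost::asio::io_context ioc;
boost::json::object config;
config["username"] = "clio";            // assumed credentials
config["password"] = "secret";
config["contact_point"] = "127.0.0.1";
config["database"] = "clio";

auto pool = make_PgPool(ioc, config);

Backend::synchronous([&pool](boost::asio::yield_context yield) {
    PgQuery pgQuery(pool);
    auto res = pgQuery("SELECT complete_ledgers()", yield);
    if (res)
        BOOST_LOG_TRIVIAL(info) << "complete ledgers: " << res.c_str(0, 0);
});
```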
-//-----------------------------------------------------------------------------
-
-/** Postgres Schema Management
- *
- * The postgres schema has several properties to facilitate
- * consistent deployments, including upgrades. It is not recommended to
- * upgrade the schema concurrently.
- *
- * Initial deployment should be against a completely fresh database. The
- * postgres user must have the CREATE TABLE privilege.
- *
- * With postgres configured, the first step is to apply the version_query
- * schema and consume the results. This script returns the currently
- * installed schema version, if configured, or 0 if not. It is idempotent.
- *
- * If the version installed on the database is equal to the
- * LATEST_SCHEMA_VERSION, then no action should take place.
- *
- * If the version on the database is 0, then the entire latest schema
- * should be deployed with the applySchema() function.
- * Each version that is developed is fully
- * represented in the full_schemata array, with each version equal to the
- * text in the array's index position. For example, index position 1
- * contains the full schema version 1. Position 2 contains schema version 2.
- * Index 0 should never be referenced and its value is only a placeholder.
- * If a fresh installation is aborted, then subsequent fresh installations
- * should install the same version previously attempted, even if there
- * exists a newer version. The initSchema() function performs this task.
- * Therefore, previous schema versions should remain in the array
- * without modification as new versions are developed and placed after them.
- * Once the schema is successfully deployed, applySchema() persists the
- * schema version to the database.
- *
- * If the current version of the database is greater than 0, then it means
- * that a previous schema version is already present. In this case, the database
- * schema needs to be updated incrementally for each subsequent version.
- * Again, applySchema() is used to upgrade the schema. Schema upgrades are
- * in the upgrade_schemata array. Each entry by index position represents
- * the database schema version from which the upgrade begins. Each upgrade
- * sets the database to the next version. Schema upgrades can only safely
- * happen from one version to the next. To upgrade several versions of schema,
- * upgrade incrementally for each version that separates the current from the
- * latest. For example, to upgrade from version 5 to version 6 of the schema,
- * use upgrade_schemata[5]. To upgrade from version 1 to version 4, use
- * upgrade_schemata[1], upgrade_schemata[2], and upgrade_schemata[3] in
- * sequence.
- *
- * To upgrade the schema past version 1, the following variables must be
- * updated:
- * 1) LATEST_SCHEMA_VERSION must be set to the new version.
- * 2) A new entry must be placed at the end of the full_schemata array. This
- *    entry should have the entire schema so that fresh installations can
- *    be performed with it. The index position must be equal to the
- *    LATEST_SCHEMA_VERSION.
- * 3) A new entry must be placed at the end of the upgrade_schemata array.
- *    This entry should only contain commands to upgrade the schema from
- *    the immediately previous version to the new version.
- *
- * It is up to the developer to ensure that all schema commands are idempotent.
- * This protects against 2 things:
- * 1) Resuming schema installation after a problem.
- * 2) Concurrent schema updates from multiple processes.
- *
- * There are several things that must be considered when upgrading existing
- * schemata to avoid stability and performance problems. Some examples and
- * suggestions follow.
- * - Schema changes such as creating new columns and indices can consume
- *   a lot of time. Therefore, before such changes, a separate script should
- *   be executed by the user to perform the schema upgrade prior to restarting
- *   rippled.
- * - Stored functions cannot be dropped while being accessed. Also,
- *   dropping stored functions can be ambiguous if multiple functions with
- *   the same name but different signatures exist. Further, stored function
- *   behavior from one schema version to the other would likely be handled
- *   differently by rippled.
In this case, it is likely that the functions - * themselves should be versioned such as by appending a number to the - * end of the name (abcf becomes abcf_2, abcf_3, etc.) - * - * Essentially, each schema upgrade will have its own factors to impact - * service availability and function. - */ - -#define LATEST_SCHEMA_VERSION 1 - -char const* version_query = R"( -CREATE TABLE IF NOT EXISTS version (version int NOT NULL, - fresh_pending int NOT NULL); - --- Version 0 means that no schema has been fully deployed. -DO $$ -BEGIN - IF NOT EXISTS (SELECT 1 FROM version) THEN - INSERT INTO version VALUES (0, 0); -END IF; -END $$; - --- Function to set the schema version. _in_pending should only be set to --- non-zero prior to an attempt to initialize the schema from scratch. --- After successful initialization, this should set to 0. --- _in_version should be set to the version of schema that has been applied --- once successful application has occurred. -CREATE OR REPLACE FUNCTION set_schema_version ( - _in_version int, - _in_pending int -) RETURNS void AS $$ -DECLARE - _current_version int; -BEGIN - IF _in_version IS NULL OR _in_pending IS NULL THEN RETURN; END IF; - IF EXISTS (SELECT 1 FROM version) THEN DELETE FROM version; END IF; - INSERT INTO version VALUES (_in_version, _in_pending); - RETURN; -END; -$$ LANGUAGE plpgsql; - --- PQexec() returns the output of the last statement in its response. -SELECT * FROM version; -)"; - -std::array full_schemata = { - // version 0: - "There is no such thing as schema version 0." - - // version 1: - , - R"( --- Table to store ledger headers. -CREATE TABLE IF NOT EXISTS ledgers ( - ledger_seq bigint PRIMARY KEY, - ledger_hash bytea NOT NULL, - prev_hash bytea NOT NULL, - total_coins bigint NOT NULL, - closing_time bigint NOT NULL, - prev_closing_time bigint NOT NULL, - close_time_res bigint NOT NULL, - close_flags bigint NOT NULL, - account_set_hash bytea NOT NULL, - trans_set_hash bytea NOT NULL -); - - -CREATE TABLE IF NOT EXISTS objects ( - key bytea NOT NULL, - ledger_seq bigint NOT NULL, - object bytea -) PARTITION BY RANGE (ledger_seq); - -CREATE INDEX objects_idx ON objects USING btree(key,ledger_seq); - - - --- Index for lookups by ledger hash. -CREATE INDEX IF NOT EXISTS ledgers_ledger_hash_idx ON ledgers - USING hash (ledger_hash); - --- Transactions table. Deletes from the ledger table --- cascade here based on ledger_seq. -CREATE TABLE IF NOT EXISTS transactions ( - hash bytea NOT NULL, - ledger_seq bigint NOT NULL REFERENCES ledgers ON DELETE CASCADE, - date bigint, - transaction bytea NOT NULL, - metadata bytea NOT NULL -) PARTITION BY RANGE(ledger_seq); - -create index if not exists tx_by_hash on transactions using hash (hash); -create index if not exists tx_by_lgr_seq on transactions using hash (ledger_seq); - --- Table that maps accounts to transactions affecting them. Deletes from the --- ledger table cascade here based on ledger_seq. -CREATE TABLE IF NOT EXISTS account_transactions ( - account bytea NOT NULL, - ledger_seq bigint NOT NULL REFERENCES ledgers ON DELETE CASCADE, - transaction_index bigint NOT NULL, - hash bytea NOT NULL, - PRIMARY KEY (account, ledger_seq, transaction_index, hash) -) PARTITION BY RANGE (ledger_seq); - -CREATE TABLE IF NOT EXISTS successor ( - key bytea NOT NULL, - ledger_seq bigint NOT NULL, - next bytea NOT NULL, - PRIMARY KEY(key, ledger_seq) -) PARTITION BY RANGE(ledger_seq); - - - --- Avoid inadvertent administrative tampering with committed data. 
-CREATE OR REPLACE RULE ledgers_update_protect AS ON UPDATE TO - ledgers DO INSTEAD NOTHING; -CREATE OR REPLACE RULE transactions_update_protect AS ON UPDATE TO - transactions DO INSTEAD NOTHING; -CREATE OR REPLACE RULE account_transactions_update_protect AS ON UPDATE TO - account_transactions DO INSTEAD NOTHING; -CREATE OR REPLACE RULE objects_update_protect AS ON UPDATE TO - objects DO INSTEAD NOTHING; - - --- Return the earliest ledger sequence intended for range operations --- that protect the bottom of the range from deletion. Return NULL if empty. -CREATE OR REPLACE FUNCTION min_ledger () RETURNS bigint AS $$ -DECLARE - _min_seq bigint := (SELECT ledger_seq from min_seq); -BEGIN - IF _min_seq IS NULL THEN - RETURN (SELECT ledger_seq FROM ledgers ORDER BY ledger_seq ASC LIMIT 1); - ELSE - RETURN _min_seq; - END IF; -END; -$$ LANGUAGE plpgsql; - --- Return the latest ledger sequence in the database, or NULL if empty. -CREATE OR REPLACE FUNCTION max_ledger () RETURNS bigint AS $$ -BEGIN - RETURN (SELECT ledger_seq FROM ledgers ORDER BY ledger_seq DESC LIMIT 1); -END; -$$ LANGUAGE plpgsql; - - --- Trigger prior to insert on ledgers table. Validates length of hash fields. --- Verifies ancestry based on ledger_hash & prev_hash as follows: --- 1) If ledgers is empty, allows insert. --- 2) For each new row, check for previous and later ledgers by a single --- sequence. For each that exist, confirm ancestry based on hashes. --- 3) Disallow inserts with no prior or next ledger by sequence if any --- ledgers currently exist. This disallows gaps to be introduced by --- way of inserting. -CREATE OR REPLACE FUNCTION insert_ancestry() RETURNS TRIGGER AS $$ -DECLARE - _parent bytea; - _child bytea; - _partition text; - _lowerBound bigint; - _upperBound bigint; - _interval bigint; -BEGIN - IF length(NEW.ledger_hash) != 32 OR length(NEW.prev_hash) != 32 THEN - RAISE 'ledger_hash and prev_hash must each be 32 bytes: %', NEW; - END IF; - - _interval = 1000000; - _lowerBound = (NEW.ledger_seq / _interval); - _partition= _lowerBound; - _lowerBound = _lowerBound * _interval; - _upperBound = _lowerBound + _interval; - - - EXECUTE format('create table if not exists public.objects_part_%s partition of public.objects for values from (%s) to (%s)',_partition,_lowerBound,_upperBound); - EXECUTE format('create table if not exists public.successor_part_%s partition of public.successor for values from (%s) to (%s)',_partition,_lowerBound,_upperBound); - EXECUTE format('create table if not exists public.transactions_part_%s partition of public.transactions for values from (%s) to (%s)',_partition,_lowerBound,_upperBound); - EXECUTE format('create table if not exists public.account_transactions_part_%s partition of public.account_transactions for values from (%s) to (%s)',_partition,_lowerBound,_upperBound); - - IF (SELECT ledger_hash - FROM public.ledgers - ORDER BY ledger_seq DESC - LIMIT 1) = NEW.prev_hash THEN RETURN NEW; END IF; - - IF NOT EXISTS (SELECT 1 FROM public.ledgers) THEN RETURN NEW; END IF; - - _parent := (SELECT ledger_hash - FROM public.ledgers - WHERE ledger_seq = NEW.ledger_seq - 1); - _child := (SELECT prev_hash - FROM public.ledgers - WHERE ledger_seq = NEW.ledger_seq + 1); - IF _parent IS NULL AND _child IS NULL THEN - RAISE 'Ledger Ancestry error: orphan.'; - END IF; - IF _parent != NEW.prev_hash THEN - RAISE 'Ledger Ancestry error: bad parent.'; - END IF; - IF _child != NEW.ledger_hash THEN - RAISE 'Ledger Ancestry error: bad child.'; - END IF; - - - RETURN NEW; -END; -$$ LANGUAGE plpgsql; 
-CREATE TRIGGER verify_ancestry BEFORE INSERT OR UPDATE on ledgers - FOR EACH ROW EXECUTE PROCEDURE insert_ancestry(); - --- Trigger function prior to delete on ledgers table. Disallow gaps from --- forming. Do not allow deletions if both the previous and next ledgers --- are present. In other words, only allow either the least or greatest --- to be deleted. -CREATE OR REPLACE FUNCTION delete_ancestry () RETURNS TRIGGER AS $$ -BEGIN - IF EXISTS (SELECT 1 - FROM ledgers - WHERE ledger_seq = OLD.ledger_seq + 1) - AND EXISTS (SELECT 1 - FROM ledgers - WHERE ledger_seq = OLD.ledger_seq - 1) THEN - RAISE 'Ledger Ancestry error: Can only delete the least or greatest ' - 'ledger.'; - END IF; - RETURN OLD; -END; -$$ LANGUAGE plpgsql; - --- Track the minimum sequence that should be used for ranged queries --- with protection against deletion during the query. This should --- be updated before calling online_delete() to not block deleting that --- range. -CREATE TABLE IF NOT EXISTS min_seq ( - ledger_seq bigint NOT NULL -); - --- Set the minimum sequence for use in ranged queries with protection --- against deletion greater than or equal to the input parameter. This --- should be called prior to online_delete() with the same parameter --- value so that online_delete() is not blocked by range queries --- that are protected against concurrent deletion of the ledger at --- the bottom of the range. This function needs to be called from a --- separate transaction from that which executes online_delete(). -CREATE OR REPLACE FUNCTION prepare_delete ( - _in_last_rotated bigint -) RETURNS void AS $$ -BEGIN - IF EXISTS (SELECT 1 FROM min_seq) THEN - DELETE FROM min_seq; - END IF; - INSERT INTO min_seq VALUES (_in_last_rotated + 1); -END; -$$ LANGUAGE plpgsql; - --- Function to delete old data. All data belonging to ledgers prior to and --- equal to the _in_seq parameter will be deleted. This should be --- called with the input parameter equivalent to the value of lastRotated --- in rippled's online_delete routine. -CREATE OR REPLACE FUNCTION online_delete ( - _in_seq bigint -) RETURNS void AS $$ -BEGIN - DELETE FROM LEDGERS WHERE ledger_seq <= _in_seq; -END; -$$ LANGUAGE plpgsql; - --- Function to delete data from the top of the ledger range. Delete --- everything greater than the input parameter. --- It doesn't do a normal range delete because of the trigger protecting --- deletions causing gaps. Instead, it walks back from the greatest ledger. -CREATE OR REPLACE FUNCTION delete_above ( - _in_seq bigint -) RETURNS void AS $$ -DECLARE - _max_seq bigint := max_ledger(); - _i bigint := _max_seq; -BEGIN - IF _max_seq IS NULL THEN RETURN; END IF; - LOOP - IF _i <= _in_seq THEN RETURN; END IF; - EXECUTE 'DELETE FROM ledgers WHERE ledger_seq = $1' USING _i; - _i := _i - 1; - END LOOP; -END; -$$ LANGUAGE plpgsql; - --- Verify correct ancestry of ledgers in database: --- Table to persist last-confirmed latest ledger with proper ancestry. -CREATE TABLE IF NOT EXISTS ancestry_verified ( - ledger_seq bigint NOT NULL -); - --- Function to verify ancestry of ledgers based on ledger_hash and prev_hash. --- Upon failure, returns ledger sequence failing ancestry check. --- Otherwise, returns NULL. --- _in_full: If TRUE, verify entire table. Else verify starting from --- value in ancestry_verfied table. If no value, then start --- from lowest ledger. --- _in_persist: If TRUE, persist the latest ledger with correct ancestry. 
--- If an exception was raised because of failure, persist --- the latest ledger prior to that which failed. --- _in_min: If set and _in_full is not true, the starting ledger from which --- to verify. --- _in_max: If set and _in_full is not true, the latest ledger to verify. -CREATE OR REPLACE FUNCTION check_ancestry ( - _in_full bool = FALSE, - _in_persist bool = TRUE, - _in_min bigint = NULL, - _in_max bigint = NULL -) RETURNS bigint AS $$ -DECLARE - _min bigint; - _max bigint; - _last_verified bigint; - _parent ledgers; - _current ledgers; - _cursor refcursor; -BEGIN - IF _in_full IS TRUE AND - (_in_min IS NOT NULL) OR (_in_max IS NOT NULL) THEN - RAISE 'Cannot specify manual range and do full check.'; - END IF; - - IF _in_min IS NOT NULL THEN - _min := _in_min; - ELSIF _in_full IS NOT TRUE THEN - _last_verified := (SELECT ledger_seq FROM ancestry_verified); - IF _last_verified IS NULL THEN - _min := min_ledger(); - ELSE - _min := _last_verified + 1; - END IF; - ELSE - _min := min_ledger(); - END IF; - EXECUTE 'SELECT * FROM ledgers WHERE ledger_seq = $1' - INTO _parent USING _min - 1; - IF _last_verified IS NOT NULL AND _parent IS NULL THEN - RAISE 'Verified ledger % doesn''t exist.', _last_verified; - END IF; - - IF _in_max IS NOT NULL THEN - _max := _in_max; - ELSE - _max := max_ledger(); - END IF; - - OPEN _cursor FOR EXECUTE 'SELECT * - FROM ledgers - WHERE ledger_seq BETWEEN $1 AND $2 - ORDER BY ledger_seq ASC' - USING _min, _max; - LOOP - FETCH _cursor INTO _current; - IF _current IS NULL THEN EXIT; END IF; - IF _parent IS NOT NULL THEN - IF _current.prev_hash != _parent.ledger_hash THEN - CLOSE _cursor; - RETURN _current.ledger_seq; - RAISE 'Ledger ancestry failure current, parent:% %', - _current, _parent; - END IF; - END IF; - _parent := _current; - END LOOP; - CLOSE _cursor; - - IF _in_persist IS TRUE AND _parent IS NOT NULL THEN - DELETE FROM ancestry_verified; - INSERT INTO ancestry_verified VALUES (_parent.ledger_seq); - END IF; - - RETURN NULL; -END; -$$ LANGUAGE plpgsql; - --- Return number of whole seconds since the latest ledger was inserted, based --- on ledger close time (not wall clock) of the insert. --- Note that ledgers.closing_time is number of seconds since the XRP --- epoch, which is 01/01/2000 00:00:00. This in turn is 946684800 seconds --- after the UNIX epoch. This conforms to the "age" field in the --- server_info RPC call. -CREATE OR REPLACE FUNCTION age () RETURNS bigint AS $$ -BEGIN - RETURN (EXTRACT(EPOCH FROM (now())) - - (946684800 + (SELECT closing_time - FROM ledgers - ORDER BY ledger_seq DESC - LIMIT 1)))::bigint; -END; -$$ LANGUAGE plpgsql; - --- Return range of ledgers, or empty if none. This conforms to the --- "complete_ledgers" field of the server_info RPC call. Note --- that ledger gaps are prevented for reporting mode so the range --- is simply the set between the least and greatest ledgers. -CREATE OR REPLACE FUNCTION complete_ledgers () RETURNS text AS $$ -DECLARE - _min bigint := min_ledger(); - _max bigint := max_ledger(); -BEGIN - IF _min IS NULL THEN RETURN 'empty'; END IF; - IF _min = _max THEN RETURN _min; END IF; - RETURN _min || '-' || _max; -END; -$$ LANGUAGE plpgsql; - -)" - - // version 2: - // , R"(Full idempotent text of schema version 2)" - - // version 3: - // , R"(Full idempotent text of schema version 3)" - - // version 4: - // , R"(Full idempotent text of schema version 4)" - - // ... 
- - // version n: - // , R"(Full idempotent text of schema version n)" -}; - -static constexpr char const* accountTxSchema = R"( - --- account_tx() RPC helper. From the rippled reporting process, only the --- parameters without defaults are required. For the parameters with --- defaults, validation should be done by rippled, such as: --- _in_account_id should be a valid xrp base58 address. --- _in_forward either true or false according to the published api --- _in_limit should be validated and not simply passed through from --- client. --- --- For _in_ledger_index_min and _in_ledger_index_max, if passed in the --- request, verify that their type is int and pass through as is. --- For _ledger_hash, verify and convert from hex length 32 bytes and --- prepend with \x (\\x C++). --- --- For _in_ledger_index, if the input type is integer, then pass through --- as is. If the type is string and contents = validated, then do not --- set _in_ledger_index. Instead set _in_invalidated to TRUE. --- --- There is no need for rippled to do any type of lookup on max/min --- ledger range, lookup of hash, or the like. This functions does those --- things, including error responses if bad input. Only the above must --- be done to set the correct search range. --- --- If a marker is present in the request, verify the members 'ledger' --- and 'seq' are integers and they correspond to _in_marker_seq --- _in_marker_index. --- To reiterate: --- JSON input field 'ledger' corresponds to _in_marker_seq --- JSON input field 'seq' corresponds to _in_marker_index -CREATE OR REPLACE FUNCTION account_tx( - _in_account_id bytea, - _in_limit bigint, - _in_forward bool, - _in_marker_seq bigint DEFAULT NULL::bigint, - _in_marker_index bigint DEFAULT NULL::bigint) -RETURNS jsonb -AS $$ -DECLARE - _min bigint; - _max bigint; - _marker bool; - _between_min bigint; - _between_max bigint; - _sql text; - _cursor refcursor; - _result jsonb; - _record record; - _tally bigint := 0; - _ret_marker jsonb; - _transactions jsonb[] := '{}'; - _sort_order text := (SELECT CASE WHEN _in_forward IS TRUE THEN - 'ASC' ELSE 'DESC' END); -BEGIN - _min := min_ledger(); - _max := max_ledger(); - IF _in_marker_seq IS NOT NULL OR _in_marker_index IS NOT NULL THEN - _marker := TRUE; - IF _in_marker_seq IS NULL OR _in_marker_index IS NULL THEN - -- The rippled implementation returns no transaction results - -- if either of these values are missing. 
- _between_min := 0; - _between_max := 0; - ELSE - _between_min := _min; - _between_max := _in_marker_seq; - END IF; - ELSE - _marker := FALSE; - _between_min := _min; - _between_max := _max; - END IF; - - - _sql := format('SELECT hash, ledger_seq, transaction_index FROM account_transactions WHERE account = $1 - AND ledger_seq BETWEEN $2 AND $3 ORDER BY ledger_seq %s, transaction_index %s',_sort_order,_sort_order); - - OPEN _cursor FOR EXECUTE _sql USING _in_account_id, _between_min, _between_max; - LOOP - FETCH _cursor INTO _record; - IF _record IS NULL THEN EXIT; END IF; - IF _marker IS TRUE THEN - IF _in_marker_seq = _record.ledger_seq THEN - IF _in_forward IS TRUE THEN - IF _in_marker_index > _record.transaction_index THEN - CONTINUE; - END IF; - ELSE - IF _in_marker_index < _record.transaction_index THEN - CONTINUE; - END IF; - END IF; - END IF; - _marker := FALSE; - END IF; - _tally := _tally + 1; - IF _tally > _in_limit THEN - _ret_marker := jsonb_build_object( - 'ledger_sequence', _record.ledger_seq, - 'transaction_index', _record.transaction_index); - EXIT; - END IF; - - -- Is the transaction index in the tx object? - _transactions := _transactions || jsonb_build_object('hash',_record.hash); - END LOOP; - CLOSE _cursor; - - _result := jsonb_build_object('ledger_index_min', _min, - 'ledger_index_max', _max, - 'transactions', _transactions); - IF _ret_marker IS NOT NULL THEN - _result := _result || jsonb_build_object('cursor', _ret_marker); - END IF; - RETURN _result; -END; -$$ LANGUAGE plpgsql; - -)"; -std::array upgrade_schemata = { - // upgrade from version 0: - "There is no upgrade path from version 0. Instead, install " - "from full_schemata." - // upgrade from version 1 to 2: - //, R"(Text to idempotently upgrade from version 1 to 2)" - // upgrade from version 2 to 3: - //, R"(Text to idempotently upgrade from version 2 to 3)" - // upgrade from version 3 to 4: - //, R"(Text to idempotently upgrade from version 3 to 4)" - // ... - // upgrade from version n-1 to n: - //, R"(Text to idempotently upgrade from version n-1 to n)" -}; - -/** Apply schema to postgres. - * - * The schema text should contain idempotent SQL & plpgSQL statements. - * Once completed, the version of the schema will be persisted. - * - * Throws upon error. - * - * @param pool Postgres connection pool manager. - * @param schema SQL commands separated by semi-colon. - * @param currentVersion The current version of the schema on the database. - * @param schemaVersion The version that will be in place once the schema - * has been applied. - */ -void -applySchema( - std::shared_ptr const& pool, - char const* schema, - std::uint32_t const currentVersion, - std::uint32_t const schemaVersion) -{ - if (currentVersion != 0 && schemaVersion != currentVersion + 1) - { - assert(false); - std::stringstream ss; - ss << "Schema upgrade versions past initial deployment must increase " - "monotonically. 
Versions: current, target: " - << currentVersion << ", " << schemaVersion; - throw std::runtime_error(ss.str()); - } - - PgResult res; - Backend::synchronous([&](boost::asio::yield_context yield) { - res = PgQuery(pool)(schema, yield); - }); - - if (!res) - { - std::stringstream ss; - ss << "Error applying schema from version " << currentVersion << "to " - << schemaVersion << ": " << res.msg(); - throw std::runtime_error(ss.str()); - } - - auto cmd = boost::format(R"(SELECT set_schema_version(%u, 0))"); - Backend::synchronous([&](boost::asio::yield_context yield) { - res = PgQuery(pool)(boost::str(cmd % schemaVersion).c_str(), yield); - }); - - if (!res) - { - std::stringstream ss; - ss << "Error setting schema version from " << currentVersion << " to " - << schemaVersion << ": " << res.msg(); - throw std::runtime_error(ss.str()); - } -} - -void -initAccountTx(std::shared_ptr const& pool) -{ - PgResult res; - Backend::synchronous([&](boost::asio::yield_context yield) { - res = PgQuery(pool)(accountTxSchema, yield); - }); - - if (!res) - { - std::stringstream ss; - ss << "Error initializing account_tx stored procedure"; - throw std::runtime_error(ss.str()); - } -} - -void -initSchema(std::shared_ptr const& pool) -{ - // Figure out what schema version, if any, is already installed. - PgResult res; - Backend::synchronous([&](boost::asio::yield_context yield) { - res = PgQuery(pool)(version_query, yield); - }); - - if (!res) - { - std::stringstream ss; - ss << "Error getting database schema version: " << res.msg(); - throw std::runtime_error(ss.str()); - } - std::uint32_t currentSchemaVersion = res.asInt(); - std::uint32_t const pendingSchemaVersion = res.asInt(0, 1); - - // Nothing to do if we are on the latest schema; - if (currentSchemaVersion == LATEST_SCHEMA_VERSION) - return; - - if (currentSchemaVersion == 0) - { - // If a fresh install has not been completed, then re-attempt - // the install of the same schema version. - std::uint32_t const freshVersion = - pendingSchemaVersion ? pendingSchemaVersion : LATEST_SCHEMA_VERSION; - // Persist that we are attempting a fresh install to the latest version. - // This protects against corruption in an aborted install that is - // followed by a fresh installation attempt with a new schema. - auto cmd = boost::format(R"(SELECT set_schema_version(0, %u))"); - Backend::synchronous([&](boost::asio::yield_context yield) { - res = PgQuery(pool)(boost::str(cmd % freshVersion).c_str(), yield); - }); - - if (!res) - { - std::stringstream ss; - ss << "Error setting schema version from " << currentSchemaVersion - << " to " << freshVersion << ": " << res.msg(); - throw std::runtime_error(ss.str()); - } - - // Install the full latest schema. - applySchema( - pool, - full_schemata[freshVersion], - currentSchemaVersion, - freshVersion); - currentSchemaVersion = freshVersion; - } - - // Incrementally upgrade one version at a time until latest. 
-    // Incrementally upgrade one version at a time until latest.
-    for (; currentSchemaVersion < LATEST_SCHEMA_VERSION; ++currentSchemaVersion)
-    {
-        applySchema(
-            pool,
-            upgrade_schemata[currentSchemaVersion],
-            currentSchemaVersion,
-            currentSchemaVersion + 1);
-    }
-}
-
-// Load the ledger info for the specified ledger from the database
-// @param whichLedger specifies the ledger to load via ledger sequence, ledger
-//        hash, or std::monostate (which loads the most recent)
-// @return LedgerInfo
-std::optional<ripple::LedgerInfo>
-getLedger(
-    boost::asio::yield_context yield,
-    std::variant<std::monostate, ripple::uint256, std::uint32_t> const&
-        whichLedger,
-    std::shared_ptr<PgPool>& pgPool)
-{
-    std::stringstream sql;
-    sql << "SELECT ledger_hash, prev_hash, account_set_hash, trans_set_hash, "
-           "total_coins, closing_time, prev_closing_time, close_time_res, "
-           "close_flags, ledger_seq FROM ledgers ";
-
-    if (auto ledgerSeq = std::get_if<std::uint32_t>(&whichLedger))
-    {
-        sql << "WHERE ledger_seq = " + std::to_string(*ledgerSeq);
-    }
-    else if (auto ledgerHash = std::get_if<ripple::uint256>(&whichLedger))
-    {
-        sql << ("WHERE ledger_hash = \'\\x" + strHex(*ledgerHash) + "\'");
-    }
-    else
-    {
-        sql << ("ORDER BY ledger_seq desc LIMIT 1");
-    }
-    sql << ";";
-
-    BOOST_LOG_TRIVIAL(trace) << __func__ << " : sql = " << sql.str();
-
-    auto res = PgQuery(pgPool)(sql.str().data(), yield);
-    if (!res)
-    {
-        BOOST_LOG_TRIVIAL(error)
-            << __func__ << " : Postgres response is null - sql = " << sql.str();
-        assert(false);
-        return {};
-    }
-    else if (res.status() != PGRES_TUPLES_OK)
-    {
-        BOOST_LOG_TRIVIAL(error) << __func__
-                                 << " : Postgres response should have been "
-                                    "PGRES_TUPLES_OK but instead was "
-                                 << res.status() << " - msg = " << res.msg()
-                                 << " - sql = " << sql.str();
-        assert(false);
-        return {};
-    }
-
-    BOOST_LOG_TRIVIAL(trace)
-        << __func__ << " Postgres result msg : " << res.msg();
-
-    if (res.isNull() || res.ntuples() == 0)
-    {
-        BOOST_LOG_TRIVIAL(debug)
-            << __func__ << " : Ledger not found. sql = " << sql.str();
-        return {};
-    }
-    else if (res.ntuples() > 0)
-    {
-        if (res.nfields() != 10)
-        {
-            BOOST_LOG_TRIVIAL(error)
-                << __func__
-                << " : Wrong number of fields in Postgres "
-                   "response. Expected 10, but got "
-                << res.nfields() << " . 
sql = " << sql.str(); - assert(false); - return {}; - } - } - - char const* hash = res.c_str(0, 0); - char const* prevHash = res.c_str(0, 1); - char const* accountHash = res.c_str(0, 2); - char const* txHash = res.c_str(0, 3); - std::int64_t totalCoins = res.asBigInt(0, 4); - std::int64_t closeTime = res.asBigInt(0, 5); - std::int64_t parentCloseTime = res.asBigInt(0, 6); - std::int64_t closeTimeRes = res.asBigInt(0, 7); - std::int64_t closeFlags = res.asBigInt(0, 8); - std::int64_t ledgerSeq = res.asBigInt(0, 9); - - BOOST_LOG_TRIVIAL(trace) - << __func__ << " - Postgres response = " << hash << " , " << prevHash - << " , " << accountHash << " , " << txHash << " , " << totalCoins - << ", " << closeTime << ", " << parentCloseTime << ", " << closeTimeRes - << ", " << closeFlags << ", " << ledgerSeq << " - sql = " << sql.str(); - BOOST_LOG_TRIVIAL(debug) - << __func__ - << " - Successfully fetched ledger with sequence = " << ledgerSeq - << " from Postgres"; - - using time_point = ripple::NetClock::time_point; - using duration = ripple::NetClock::duration; - - ripple::LedgerInfo info; - if (!info.parentHash.parseHex(prevHash + 2)) - assert(false); - if (!info.txHash.parseHex(txHash + 2)) - assert(false); - if (!info.accountHash.parseHex(accountHash + 2)) - assert(false); - info.drops = totalCoins; - info.closeTime = time_point{duration{closeTime}}; - info.parentCloseTime = time_point{duration{parentCloseTime}}; - info.closeFlags = closeFlags; - info.closeTimeResolution = duration{closeTimeRes}; - info.seq = ledgerSeq; - if (!info.hash.parseHex(hash + 2)) - assert(false); - info.validated = true; - - return info; -} diff --git a/src/backend/Pg.h b/src/backend/Pg.h deleted file mode 100644 index 361fb822..00000000 --- a/src/backend/Pg.h +++ /dev/null @@ -1,564 +0,0 @@ -#ifndef RIPPLE_CORE_PG_H_INCLUDED -#define RIPPLE_CORE_PG_H_INCLUDED - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -// These postgres structs must be freed only by the postgres API. -using pg_result_type = std::unique_ptr; -using pg_connection_type = std::unique_ptr; -using asio_socket_type = std::unique_ptr< - boost::asio::ip::tcp::socket, - void (*)(boost::asio::ip::tcp::socket*)>; - -/** first: command - * second: parameter values - * - * The 2nd member takes an optional string to - * distinguish between NULL parameters and empty strings. An empty - * item corresponds to a NULL parameter. - * - * Postgres reads each parameter as a c-string, regardless of actual type. - * Binary types (bytea) need to be converted to hex and prepended with - * \x ("\\x"). - */ -using pg_params = - std::pair>>; - -/** Parameter values for pg API. */ -using pg_formatted_params = std::vector; - -/** Parameters for managing postgres connections. */ -struct PgConfig -{ - /** Maximum connections allowed to db. */ - std::size_t max_connections{1000}; - /** Close idle connections past this duration. */ - std::chrono::seconds timeout{600}; - - /** Index of DB connection parameter names. */ - std::vector keywordsIdx; - /** DB connection parameter names. */ - std::vector keywords; - /** Index of DB connection parameter values. */ - std::vector valuesIdx; - /** DB connection parameter values. */ - std::vector values; -}; - -//----------------------------------------------------------------------------- - -/** Class that operates on postgres query results. 
- * - * The functions that return results do not check first whether the - * expected results are actually there. Therefore, the caller first needs - * to check whether or not a valid response was returned using the operator - * bool() overload. If number of tuples or fields are unknown, then check - * those. Each result field should be checked for null before attempting - * to return results. Finally, the caller must know the type of the field - * before calling the corresponding function to return a field. Postgres - * internally stores each result field as null-terminated strings. - */ -class PgResult -{ - // The result object must be freed using the libpq API PQclear() call. - pg_result_type result_{nullptr, [](PGresult* result) { PQclear(result); }}; - std::optional> error_; - -public: - /** Constructor for when the process is stopping. - * - */ - PgResult() - { - } - - /** Constructor for successful query results. - * - * @param result Query result. - */ - explicit PgResult(pg_result_type&& result) : result_(std::move(result)) - { - } - - /** Constructor for failed query results. - * - * @param result Query result that contains error information. - * @param conn Postgres connection that contains error information. - */ - PgResult(PGresult* result, PGconn* conn) - : error_({PQresultStatus(result), PQerrorMessage(conn)}) - { - } - - /** Return field as a null-terminated string pointer. - * - * Note that this function does not guarantee that the result struct - * exists, or that the row and fields exist, or that the field is - * not null. - * - * @param ntuple Row number. - * @param nfield Field number. - * @return Field contents. - */ - char const* - c_str(int ntuple = 0, int nfield = 0) const - { - return PQgetvalue(result_.get(), ntuple, nfield); - } - - std::vector - asUnHexedBlob(int ntuple = 0, int nfield = 0) const - { - std::string_view view{c_str(ntuple, nfield) + 2}; - auto res = ripple::strUnHex(view.size(), view.cbegin(), view.cend()); - if (res) - return *res; - return {}; - } - - ripple::uint256 - asUInt256(int ntuple = 0, int nfield = 0) const - { - ripple::uint256 val; - if (!val.parseHex(c_str(ntuple, nfield) + 2)) - throw std::runtime_error("Pg - failed to parse hex into uint256"); - return val; - } - - /** Return field as equivalent to Postgres' INT type (32 bit signed). - * - * Note that this function does not guarantee that the result struct - * exists, or that the row and fields exist, or that the field is - * not null, or that the type is that requested. - - * @param ntuple Row number. - * @param nfield Field number. - * @return Field contents. - */ - std::int32_t - asInt(int ntuple = 0, int nfield = 0) const - { - return boost::lexical_cast( - PQgetvalue(result_.get(), ntuple, nfield)); - } - - /** Return field as equivalent to Postgres' BIGINT type (64 bit signed). - * - * Note that this function does not guarantee that the result struct - * exists, or that the row and fields exist, or that the field is - * not null, or that the type is that requested. - - * @param ntuple Row number. - * @param nfield Field number. - * @return Field contents. - */ - std::int64_t - asBigInt(int ntuple = 0, int nfield = 0) const - { - return boost::lexical_cast( - PQgetvalue(result_.get(), ntuple, nfield)); - } - - /** Returns whether the field is NULL or not. - * - * Note that this function does not guarantee that the result struct - * exists, or that the row and fields exist. - * - * @param ntuple Row number. - * @param nfield Field number. - * @return Whether field is NULL. 
- */ - bool - isNull(int ntuple = 0, int nfield = 0) const - { - return PQgetisnull(result_.get(), ntuple, nfield); - } - - /** Check whether a valid response occurred. - * - * @return Whether or not the query returned a valid response. - */ - operator bool() const - { - return result_ != nullptr; - } - - /** Message describing the query results suitable for diagnostics. - * - * If error, then the postgres error type and message are returned. - * Otherwise, "ok" - * - * @return Query result message. - */ - std::string - msg() const; - - /** Get number of rows in result. - * - * Note that this function does not guarantee that the result struct - * exists. - * - * @return Number of result rows. - */ - int - ntuples() const - { - return PQntuples(result_.get()); - } - - /** Get number of fields in result. - * - * Note that this function does not guarantee that the result struct - * exists. - * - * @return Number of result fields. - */ - int - nfields() const - { - return PQnfields(result_.get()); - } - - /** Return result status of the command. - * - * Note that this function does not guarantee that the result struct - * exists. - * - * @return - */ - ExecStatusType - status() const - { - return PQresultStatus(result_.get()); - } -}; - -/* Class that contains and operates upon a postgres connection. */ -class Pg -{ - friend class PgPool; - friend class PgQuery; - - PgConfig const& config_; - boost::asio::io_context::strand strand_; - - asio_socket_type socket_{nullptr, [](boost::asio::ip::tcp::socket*) {}}; - - // The connection object must be freed using the libpq API PQfinish() call. - pg_connection_type conn_{nullptr, [](PGconn* conn) { PQfinish(conn); }}; - - inline asio_socket_type - getSocket(boost::asio::yield_context& strand); - - inline PgResult - waitForStatus(boost::asio::yield_context& yield, ExecStatusType expected); - - inline void - flush(boost::asio::yield_context& yield); - - /** Clear results from the connection. - * - * Results from previous commands must be cleared before new commands - * can be processed. This function should be called on connections - * that weren't processed completely before being reused, such as - * when being checked-in. - * - * @return whether or not connection still exists. - */ - bool - clear(); - - /** Connect to postgres. - * - * Idempotently connects to postgres by first checking whether an - * existing connection is already present. If connection is not present - * or in an errored state, reconnects to the database. - */ - void - connect(boost::asio::yield_context& yield); - - /** Disconnect from postgres. */ - void - disconnect() - { - conn_.reset(); - socket_.reset(); - } - - /** Execute postgres query. - * - * If parameters are included, then the command should contain only a - * single SQL statement. If no parameters, then multiple SQL statements - * delimited by semi-colons can be processed. The response is from - * the last command executed. - * - * @param command postgres API command string. - * @param nParams postgres API number of parameters. - * @param values postgres API array of parameter. - * @return Query result object. - */ - PgResult - query( - char const* command, - std::size_t const nParams, - char const* const* values, - boost::asio::yield_context& yield); - - /** Execute postgres query with no parameters. - * - * @param command Query string. 
- * @return Query result object; - */ - PgResult - query(char const* command, boost::asio::yield_context& yield) - { - return query(command, 0, nullptr, yield); - } - - /** Execute postgres query with parameters. - * - * @param dbParams Database command and parameter values. - * @return Query result object. - */ - PgResult - query(pg_params const& dbParams, boost::asio::yield_context& yield); - - /** Insert multiple records into a table using Postgres' bulk COPY. - * - * Throws upon error. - * - * @param table Name of table for import. - * @param records Records in the COPY IN format. - */ - void - bulkInsert( - char const* table, - std::string const& records, - boost::asio::yield_context& yield); - -public: - /** Constructor for Pg class. - * - * @param config Config parameters. - * @param j Logger object. - */ - Pg(PgConfig const& config, boost::asio::io_context& ctx) - : config_(config), strand_(ctx) - { - } -}; - -//----------------------------------------------------------------------------- - -/** Database connection pool. - * - * Allow re-use of postgres connections. Postgres connections are created - * as needed until configurable limit is reached. After use, each connection - * is placed in a container ordered by time of use. Each request for - * a connection grabs the most recently used connection from the container. - * If none are available, a new connection is used (up to configured limit). - * Idle connections are destroyed periodically after configurable - * timeout duration. - * - * This should be stored as a shared pointer so PgQuery objects can safely - * outlive it. - */ -class PgPool -{ - friend class PgQuery; - - using clock_type = std::chrono::steady_clock; - - boost::asio::io_context& ioc_; - PgConfig config_; - std::mutex mutex_; - std::condition_variable cond_; - std::size_t connections_{}; - bool stop_{false}; - - /** Idle database connections ordered by timestamp to allow timing out. */ - std::multimap, std::unique_ptr> - idle_; - - /** Get a postgres connection object. - * - * Return the most recent idle connection in the pool, if available. - * Otherwise, return a new connection unless we're at the threshold. - * If so, then wait until a connection becomes available. - * - * @return Postgres object. - */ - std::unique_ptr - checkout(); - - /** Return a postgres object to the pool for reuse. - * - * If connection is healthy, place in pool for reuse. After calling this, - * the container no longer have a connection unless checkout() is called. - * - * @param pg Pg object. - */ - void - checkin(std::unique_ptr& pg); - -public: - /** Connection pool constructor. - * - * @param pgConfig Postgres config. - * @param j Logger object. - * @param parent Stoppable parent. - */ - PgPool(boost::asio::io_context& ioc, boost::json::object const& config); - - ~PgPool() - { - onStop(); - } - - PgConfig& - config() - { - return config_; - } - - /** Initiate idle connection timer. - * - * The PgPool object needs to be fully constructed to support asynchronous - * operations. - */ - void - setup(); - - /** Prepare for process shutdown. (Stoppable) */ - void - onStop(); -}; - -//----------------------------------------------------------------------------- - -/** Class to query postgres. - * - * This class should be used by functions outside of this - * compilation unit for querying postgres. It automatically acquires and - * relinquishes a database connection to handle each query. 
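- *
- * A typical use from a coroutine, with names taken from this header:
- *
- *     PgQuery pgQuery(pool);                  // checks a connection out
- *     auto res = pgQuery("SELECT 1", yield);  // runs on that connection
- *     if (res && res.status() == PGRES_TUPLES_OK)
- *         std::cout << res.asInt() << '\n';   // first row, first field
- *
- * The connection is checked back in when pgQuery goes out of scope.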
- */ -class PgQuery -{ -private: - std::shared_ptr pool_; - std::unique_ptr pg_; - -public: - PgQuery() = delete; - - PgQuery(std::shared_ptr const& pool) - : pool_(pool), pg_(pool->checkout()) - { - } - - ~PgQuery() - { - pool_->checkin(pg_); - } - - // TODO. add sendQuery and getResult, for sending the query and getting the - // result asynchronously. This could be useful for sending a bunch of - // requests concurrently - - /** Execute postgres query with parameters. - * - * @param dbParams Database command with parameters. - * @return Result of query, including errors. - */ - PgResult - operator()(pg_params const& dbParams, boost::asio::yield_context& yield) - { - if (!pg_) // It means we're stopping. Return empty result. - return PgResult(); - return pg_->query(dbParams, yield); - } - - /** Execute postgres query with only command statement. - * - * @param command Command statement. - * @return Result of query, including errors. - */ - PgResult - operator()(char const* command, boost::asio::yield_context& yield) - { - return operator()(pg_params{command, {}}, yield); - } - - /** Insert multiple records into a table using Postgres' bulk COPY. - * - * Throws upon error. - * - * @param table Name of table for import. - * @param records Records in the COPY IN format. - */ - void - bulkInsert( - char const* table, - std::string const& records, - boost::asio::yield_context& yield) - { - pg_->bulkInsert(table, records, yield); - } -}; - -//----------------------------------------------------------------------------- - -/** Create Postgres connection pool manager. - * - * @param pgConfig Configuration for Postgres. - * @param j Logger object. - * @param parent Stoppable parent object. - * @return Postgres connection pool manager - */ -std::shared_ptr -make_PgPool(boost::asio::io_context& ioc, boost::json::object const& pgConfig); - -/** Initialize the Postgres schema. - * - * This function ensures that the database is running the latest version - * of the schema. - * - * @param pool Postgres connection pool manager. 
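- *
- * Typically called once at startup, before any queries are issued;
- * PostgresBackend::open() calls initSchema() followed by initAccountTx().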
- */ -void -initSchema(std::shared_ptr const& pool); -void -initAccountTx(std::shared_ptr const& pool); - -// Load the ledger info for the specified ledger/s from the database -// @param whichLedger specifies the ledger to load via ledger sequence, ledger -// hash or std::monostate (which loads the most recent) -// @return vector of LedgerInfos -std::optional -getLedger( - std::variant const& - whichLedger, - std::shared_ptr& pgPool); - -#endif // RIPPLE_CORE_PG_H_INCLUDED diff --git a/src/backend/PostgresBackend.cpp b/src/backend/PostgresBackend.cpp deleted file mode 100644 index be67e4c3..00000000 --- a/src/backend/PostgresBackend.cpp +++ /dev/null @@ -1,895 +0,0 @@ -#include -#include -#include -#include - -namespace Backend { - -// Type alias for async completion handlers -using completion_token = boost::asio::yield_context; -using function_type = void(boost::system::error_code); -using result_type = boost::asio::async_result; -using handler_type = typename result_type::completion_handler_type; - -struct HandlerWrapper -{ - handler_type handler; - - HandlerWrapper(handler_type&& handler_) : handler(std::move(handler_)) - { - } -}; - -PostgresBackend::PostgresBackend( - boost::asio::io_context& ioc, - boost::json::object const& config) - : BackendInterface(config) - , pgPool_(make_PgPool(ioc, config)) - , writeConnection_(pgPool_) -{ - if (config.contains("write_interval")) - { - writeInterval_ = config.at("write_interval").as_int64(); - } -} -void -PostgresBackend::writeLedger( - ripple::LedgerInfo const& ledgerInfo, - std::string&& ledgerHeader) -{ - synchronous([&](boost::asio::yield_context yield) { - auto cmd = boost::format( - R"(INSERT INTO ledgers - VALUES (%u,'\x%s', '\x%s',%u,%u,%u,%u,%u,'\x%s','\x%s'))"); - - auto ledgerInsert = boost::str( - cmd % ledgerInfo.seq % ripple::strHex(ledgerInfo.hash) % - ripple::strHex(ledgerInfo.parentHash) % ledgerInfo.drops.drops() % - ledgerInfo.closeTime.time_since_epoch().count() % - ledgerInfo.parentCloseTime.time_since_epoch().count() % - ledgerInfo.closeTimeResolution.count() % ledgerInfo.closeFlags % - ripple::strHex(ledgerInfo.accountHash) % - ripple::strHex(ledgerInfo.txHash)); - - auto res = writeConnection_(ledgerInsert.data(), yield); - abortWrite_ = !res; - inProcessLedger = ledgerInfo.seq; - }); -} - -void -PostgresBackend::writeAccountTransactions( - std::vector&& data) -{ - if (abortWrite_) - return; - PgQuery pg(pgPool_); - for (auto const& record : data) - { - for (auto const& a : record.accounts) - { - std::string acct = ripple::strHex(a); - accountTxBuffer_ << "\\\\x" << acct << '\t' - << std::to_string(record.ledgerSequence) << '\t' - << std::to_string(record.transactionIndex) << '\t' - << "\\\\x" << ripple::strHex(record.txHash) - << '\n'; - } - } -} - -void -PostgresBackend::writeNFTTransactions(std::vector&& data) -{ - throw std::runtime_error("Not implemented"); -} - -void -PostgresBackend::doWriteLedgerObject( - std::string&& key, - std::uint32_t const seq, - std::string&& blob) -{ - synchronous([&](boost::asio::yield_context yield) { - if (abortWrite_) - return; - objectsBuffer_ << "\\\\x" << ripple::strHex(key) << '\t' - << std::to_string(seq) << '\t' << "\\\\x" - << ripple::strHex(blob) << '\n'; - numRowsInObjectsBuffer_++; - // If the buffer gets too large, the insert fails. Not sure why. So we - // insert after 1 million records - if (numRowsInObjectsBuffer_ % writeInterval_ == 0) - { - BOOST_LOG_TRIVIAL(info) - << __func__ << " Flushing large buffer. 
num objects = "
-                << numRowsInObjectsBuffer_;
-            writeConnection_.bulkInsert("objects", objectsBuffer_.str(), yield);
-            BOOST_LOG_TRIVIAL(info) << __func__ << " Flushed large buffer";
-            objectsBuffer_.str("");
-        }
-    });
-}
-
-void
-PostgresBackend::writeSuccessor(
-    std::string&& key,
-    std::uint32_t const seq,
-    std::string&& successor)
-{
-    synchronous([&](boost::asio::yield_context yield) {
-        if (range)
-        {
-            if (successors_.count(key) > 0)
-                return;
-            successors_.insert(key);
-        }
-        successorBuffer_ << "\\\\x" << ripple::strHex(key) << '\t'
-                         << std::to_string(seq) << '\t' << "\\\\x"
-                         << ripple::strHex(successor) << '\n';
-        BOOST_LOG_TRIVIAL(trace)
-            << __func__ << ripple::strHex(key) << " - " << std::to_string(seq);
-        numRowsInSuccessorBuffer_++;
-        if (numRowsInSuccessorBuffer_ % writeInterval_ == 0)
-        {
-            BOOST_LOG_TRIVIAL(info)
-                << __func__ << " Flushing large buffer. num successors = "
-                << numRowsInSuccessorBuffer_;
-            writeConnection_.bulkInsert(
-                "successor", successorBuffer_.str(), yield);
-            BOOST_LOG_TRIVIAL(info) << __func__ << " Flushed large buffer";
-            successorBuffer_.str("");
-        }
-    });
-}
-
-void
-PostgresBackend::writeTransaction(
-    std::string&& hash,
-    std::uint32_t const seq,
-    std::uint32_t const date,
-    std::string&& transaction,
-    std::string&& metadata)
-{
-    if (abortWrite_)
-        return;
-    transactionsBuffer_ << "\\\\x" << ripple::strHex(hash) << '\t'
-                        << std::to_string(seq) << '\t' << std::to_string(date)
-                        << '\t' << "\\\\x" << ripple::strHex(transaction)
-                        << '\t' << "\\\\x" << ripple::strHex(metadata) << '\n';
-}
-
-void
-PostgresBackend::writeNFTs(std::vector<NFTsData>&& data)
-{
-    throw std::runtime_error("Not implemented");
-}
-
-std::uint32_t
-checkResult(PgResult const& res, std::uint32_t const numFieldsExpected)
-{
-    if (!res)
-    {
-        auto msg = res.msg();
-        BOOST_LOG_TRIVIAL(error) << __func__ << " - " << msg;
-        if (msg.find("statement timeout") != std::string::npos)
-            throw DatabaseTimeout();
-        assert(false);
-        throw DatabaseTimeout();
-    }
-    if (res.status() != PGRES_TUPLES_OK)
-    {
-        std::stringstream msg;
-        msg << " : Postgres response should have been "
-               "PGRES_TUPLES_OK but instead was "
-            << res.status() << " - msg = " << res.msg();
-        BOOST_LOG_TRIVIAL(error) << __func__ << " - " << msg.str();
-        assert(false);
-        throw DatabaseTimeout();
-    }
-
-    BOOST_LOG_TRIVIAL(trace)
-        << __func__ << " Postgres result msg : " << res.msg();
-    if (res.isNull() || res.ntuples() == 0)
-    {
-        return 0;
-    }
-    else if (res.ntuples() > 0)
-    {
-        if (res.nfields() != numFieldsExpected)
-        {
-            std::stringstream msg;
-            msg << "Wrong number of fields in Postgres "
-                   "response. Expected "
-                << numFieldsExpected << ", but got " << res.nfields();
-            assert(false);
-            throw std::runtime_error(msg.str());
-        }
-    }
-    return res.ntuples();
-}
-
-ripple::LedgerInfo
-parseLedgerInfo(PgResult const& res)
-{
-    std::int64_t ledgerSeq = res.asBigInt(0, 0);
-    ripple::uint256 hash = res.asUInt256(0, 1);
-    ripple::uint256 prevHash = res.asUInt256(0, 2);
-    std::int64_t totalCoins = res.asBigInt(0, 3);
-    std::int64_t closeTime = res.asBigInt(0, 4);
-    std::int64_t parentCloseTime = res.asBigInt(0, 5);
-    std::int64_t closeTimeRes = res.asBigInt(0, 6);
-    std::int64_t closeFlags = res.asBigInt(0, 7);
-    ripple::uint256 accountHash = res.asUInt256(0, 8);
-    ripple::uint256 txHash = res.asUInt256(0, 9);
-
-    using time_point = ripple::NetClock::time_point;
-    using duration = ripple::NetClock::duration;
-
-    ripple::LedgerInfo info;
-    info.seq = ledgerSeq;
-    info.hash = hash;
-    info.parentHash = prevHash;
-    info.drops = totalCoins;
-    info.closeTime = time_point{duration{closeTime}};
-    info.parentCloseTime = time_point{duration{parentCloseTime}};
-    info.closeFlags = closeFlags;
-    info.closeTimeResolution = duration{closeTimeRes};
-    info.accountHash = accountHash;
-    info.txHash = txHash;
-    info.validated = true;
-    return info;
-}
-std::optional<std::uint32_t>
-PostgresBackend::fetchLatestLedgerSequence(
-    boost::asio::yield_context& yield) const
-{
-    PgQuery pgQuery(pgPool_);
-    pgQuery(set_timeout, yield);
-
-    auto const query =
-        "SELECT ledger_seq FROM ledgers ORDER BY ledger_seq DESC LIMIT 1";
-
-    if (auto res = pgQuery(query, yield); checkResult(res, 1))
-        return res.asBigInt(0, 0);
-
-    return {};
-}
-
-std::optional<ripple::LedgerInfo>
-PostgresBackend::fetchLedgerBySequence(
-    std::uint32_t const sequence,
-    boost::asio::yield_context& yield) const
-{
-    PgQuery pgQuery(pgPool_);
-    pgQuery(set_timeout, yield);
-
-    std::stringstream sql;
-    sql << "SELECT * FROM ledgers WHERE ledger_seq = "
-        << std::to_string(sequence);
-
-    if (auto res = pgQuery(sql.str().data(), yield); checkResult(res, 10))
-        return parseLedgerInfo(res);
-
-    return {};
-}
-
-std::optional<ripple::LedgerInfo>
-PostgresBackend::fetchLedgerByHash(
-    ripple::uint256 const& hash,
-    boost::asio::yield_context& yield) const
-{
-    PgQuery pgQuery(pgPool_);
-    pgQuery(set_timeout, yield);
-
-    std::stringstream sql;
-    sql << "SELECT * FROM ledgers WHERE ledger_hash = \'\\x"
-        << ripple::to_string(hash) << "\'";
-
-    if (auto res = pgQuery(sql.str().data(), yield); checkResult(res, 10))
-        return parseLedgerInfo(res);
-
-    return {};
-}
-
-std::optional<LedgerRange>
-PostgresBackend::hardFetchLedgerRange(boost::asio::yield_context& yield) const
-{
-    auto range = PgQuery(pgPool_)("SELECT complete_ledgers()", yield);
-    if (!range)
-        return {};
-
-    std::string res{range.c_str()};
-    BOOST_LOG_TRIVIAL(debug) << "range is = " << res;
-    try
-    {
-        size_t minVal = 0;
-        size_t maxVal = 0;
-        if (res == "empty" || res == "error" || res.empty())
-            return {};
-        else if (size_t delim = res.find('-'); delim != std::string::npos)
-        {
-            minVal = std::stol(res.substr(0, delim));
-            maxVal = std::stol(res.substr(delim + 1));
-        }
-        else
-        {
-            minVal = maxVal = std::stol(res);
-        }
-        return LedgerRange{minVal, maxVal};
-    }
-    catch (std::exception&)
-    {
-        BOOST_LOG_TRIVIAL(error)
-            << __func__ << " : "
-            << "Error parsing result of complete_ledgers()";
-    }
-    return {};
-}
-
-std::optional<Blob>
-PostgresBackend::doFetchLedgerObject(
-    ripple::uint256 const& key,
-    std::uint32_t const sequence,
-    boost::asio::yield_context& yield) const
-{
-    PgQuery pgQuery(pgPool_);
-    pgQuery(set_timeout, yield);
-
-    std::stringstream sql;
-    sql << 
"SELECT object FROM objects WHERE key = " - << "\'\\x" << ripple::strHex(key) << "\'" - << " AND ledger_seq <= " << std::to_string(sequence) - << " ORDER BY ledger_seq DESC LIMIT 1"; - - if (auto res = pgQuery(sql.str().data(), yield); checkResult(res, 1)) - { - auto blob = res.asUnHexedBlob(0, 0); - if (blob.size()) - return blob; - } - - return {}; -} - -// returns a transaction, metadata pair -std::optional -PostgresBackend::fetchTransaction( - ripple::uint256 const& hash, - boost::asio::yield_context& yield) const -{ - PgQuery pgQuery(pgPool_); - pgQuery(set_timeout, yield); - - std::stringstream sql; - sql << "SELECT transaction,metadata,ledger_seq,date FROM transactions " - "WHERE hash = " - << "\'\\x" << ripple::strHex(hash) << "\'"; - - if (auto res = pgQuery(sql.str().data(), yield); checkResult(res, 4)) - { - return { - {res.asUnHexedBlob(0, 0), - res.asUnHexedBlob(0, 1), - res.asBigInt(0, 2), - res.asBigInt(0, 3)}}; - } - - return {}; -} -std::vector -PostgresBackend::fetchAllTransactionsInLedger( - std::uint32_t const ledgerSequence, - boost::asio::yield_context& yield) const -{ - PgQuery pgQuery(pgPool_); - pgQuery(set_timeout, yield); - - std::stringstream sql; - sql << "SELECT transaction, metadata, ledger_seq,date FROM transactions " - "WHERE " - << "ledger_seq = " << std::to_string(ledgerSequence); - - auto res = pgQuery(sql.str().data(), yield); - if (size_t numRows = checkResult(res, 4)) - { - std::vector txns; - for (size_t i = 0; i < numRows; ++i) - { - txns.push_back( - {res.asUnHexedBlob(i, 0), - res.asUnHexedBlob(i, 1), - res.asBigInt(i, 2), - res.asBigInt(i, 3)}); - } - return txns; - } - return {}; -} -std::vector -PostgresBackend::fetchAllTransactionHashesInLedger( - std::uint32_t const ledgerSequence, - boost::asio::yield_context& yield) const -{ - PgQuery pgQuery(pgPool_); - pgQuery(set_timeout, yield); - - std::stringstream sql; - sql << "SELECT hash FROM transactions WHERE " - << "ledger_seq = " << std::to_string(ledgerSequence); - - auto res = pgQuery(sql.str().data(), yield); - if (size_t numRows = checkResult(res, 1)) - { - std::vector hashes; - for (size_t i = 0; i < numRows; ++i) - { - hashes.push_back(res.asUInt256(i, 0)); - } - return hashes; - } - - return {}; -} - -std::optional -PostgresBackend::fetchNFT( - ripple::uint256 const& tokenID, - std::uint32_t const ledgerSequence, - boost::asio::yield_context& yield) const -{ - throw std::runtime_error("Not implemented"); -} - -std::optional -PostgresBackend::doFetchSuccessorKey( - ripple::uint256 key, - std::uint32_t const ledgerSequence, - boost::asio::yield_context& yield) const -{ - PgQuery pgQuery(pgPool_); - pgQuery(set_timeout, yield); - - std::stringstream sql; - sql << "SELECT next FROM successor WHERE key = " - << "\'\\x" << ripple::strHex(key) << "\'" - << " AND ledger_seq <= " << std::to_string(ledgerSequence) - << " ORDER BY ledger_seq DESC LIMIT 1"; - - if (auto res = pgQuery(sql.str().data(), yield); checkResult(res, 1)) - { - auto next = res.asUInt256(0, 0); - if (next == lastKey) - return {}; - return next; - } - - return {}; -} - -std::vector -PostgresBackend::fetchTransactions( - std::vector const& hashes, - boost::asio::yield_context& yield) const -{ - if (!hashes.size()) - return {}; - - std::vector results; - results.resize(hashes.size()); - - handler_type handler(std::forward(yield)); - result_type result(handler); - - auto hw = new HandlerWrapper(std::move(handler)); - - auto start = std::chrono::system_clock::now(); - - std::atomic_uint numRemaining = hashes.size(); - 
std::atomic_bool errored = false; - - for (size_t i = 0; i < hashes.size(); ++i) - { - auto const& hash = hashes[i]; - boost::asio::spawn( - get_associated_executor(yield), - [this, &hash, &results, hw, &numRemaining, &errored, i]( - boost::asio::yield_context yield) { - BOOST_LOG_TRIVIAL(trace) << __func__ << " getting txn = " << i; - - PgQuery pgQuery(pgPool_); - - std::stringstream sql; - sql << "SELECT transaction,metadata,ledger_seq,date FROM " - "transactions " - "WHERE HASH = \'\\x" - << ripple::strHex(hash) << "\'"; - - try - { - if (auto const res = pgQuery(sql.str().data(), yield); - checkResult(res, 4)) - { - results[i] = { - res.asUnHexedBlob(0, 0), - res.asUnHexedBlob(0, 1), - res.asBigInt(0, 2), - res.asBigInt(0, 3)}; - } - } - catch (DatabaseTimeout const&) - { - errored = true; - } - - if (--numRemaining == 0) - { - handler_type h(std::move(hw->handler)); - h(boost::system::error_code{}); - } - }); - } - - // Yields the worker to the io_context until handler is called. - result.get(); - - delete hw; - - auto end = std::chrono::system_clock::now(); - auto duration = - std::chrono::duration_cast(end - start); - - BOOST_LOG_TRIVIAL(info) - << __func__ << " fetched " << std::to_string(hashes.size()) - << " transactions asynchronously. took " - << std::to_string(duration.count()); - if (errored) - { - BOOST_LOG_TRIVIAL(error) << __func__ << " Database fetch timed out"; - throw DatabaseTimeout(); - } - - return results; -} - -std::vector -PostgresBackend::doFetchLedgerObjects( - std::vector const& keys, - std::uint32_t const sequence, - boost::asio::yield_context& yield) const -{ - if (!keys.size()) - return {}; - - PgQuery pgQuery(pgPool_); - pgQuery(set_timeout, yield); - - std::vector results; - results.resize(keys.size()); - - handler_type handler(std::forward(yield)); - result_type result(handler); - - auto hw = new HandlerWrapper(std::move(handler)); - - std::atomic_uint numRemaining = keys.size(); - std::atomic_bool errored = false; - auto start = std::chrono::system_clock::now(); - for (size_t i = 0; i < keys.size(); ++i) - { - auto const& key = keys[i]; - boost::asio::spawn( - boost::asio::get_associated_executor(yield), - [this, &key, &results, &numRemaining, &errored, hw, i, sequence]( - boost::asio::yield_context yield) { - PgQuery pgQuery(pgPool_); - - std::stringstream sql; - sql << "SELECT object FROM " - "objects " - "WHERE key = \'\\x" - << ripple::strHex(key) << "\'" - << " AND ledger_seq <= " << std::to_string(sequence) - << " ORDER BY ledger_seq DESC LIMIT 1"; - - try - { - if (auto const res = pgQuery(sql.str().data(), yield); - checkResult(res, 1)) - results[i] = res.asUnHexedBlob(); - } - catch (DatabaseTimeout const& ex) - { - errored = true; - } - - if (--numRemaining == 0) - { - handler_type h(std::move(hw->handler)); - h(boost::system::error_code{}); - } - }); - } - - // Yields the worker to the io_context until handler is called. - result.get(); - - delete hw; - - auto end = std::chrono::system_clock::now(); - auto duration = - std::chrono::duration_cast(end - start); - - BOOST_LOG_TRIVIAL(info) - << __func__ << " fetched " << std::to_string(keys.size()) - << " objects asynchronously. 
ms = " << std::to_string(duration.count()); - if (errored) - { - BOOST_LOG_TRIVIAL(error) << __func__ << " Database fetch timed out"; - throw DatabaseTimeout(); - } - - return results; -} - -std::vector -PostgresBackend::fetchLedgerDiff( - std::uint32_t const ledgerSequence, - boost::asio::yield_context& yield) const -{ - PgQuery pgQuery(pgPool_); - pgQuery(set_timeout, yield); - - std::stringstream sql; - sql << "SELECT key,object FROM objects " - "WHERE " - << "ledger_seq = " << std::to_string(ledgerSequence); - - auto res = pgQuery(sql.str().data(), yield); - if (size_t numRows = checkResult(res, 2)) - { - std::vector objects; - for (size_t i = 0; i < numRows; ++i) - { - objects.push_back({res.asUInt256(i, 0), res.asUnHexedBlob(i, 1)}); - } - return objects; - } - - return {}; -} - -// TODO this implementation and fetchAccountTransactions should be -// generalized -TransactionsAndCursor -PostgresBackend::fetchNFTTransactions( - ripple::uint256 const& tokenID, - std::uint32_t const limit, - bool forward, - std::optional const& cursor, - boost::asio::yield_context& yield) const -{ - throw std::runtime_error("Not implemented"); -} - -TransactionsAndCursor -PostgresBackend::fetchAccountTransactions( - ripple::AccountID const& account, - std::uint32_t const limit, - bool forward, - std::optional const& cursor, - boost::asio::yield_context& yield) const -{ - PgQuery pgQuery(pgPool_); - pgQuery(set_timeout, yield); - pg_params dbParams; - - char const*& command = dbParams.first; - std::vector>& values = dbParams.second; - command = - "SELECT account_tx($1::bytea, $2::bigint, $3::bool, " - "$4::bigint, $5::bigint)"; - values.resize(5); - values[0] = "\\x" + strHex(account); - - values[1] = std::to_string(limit); - - values[2] = std::to_string(forward); - - if (cursor) - { - values[3] = std::to_string(cursor->ledgerSequence); - values[4] = std::to_string(cursor->transactionIndex); - } - for (size_t i = 0; i < values.size(); ++i) - { - BOOST_LOG_TRIVIAL(debug) << "value " << std::to_string(i) << " = " - << (values[i] ? 
values[i].value() : "null"); - } - - auto start = std::chrono::system_clock::now(); - auto res = pgQuery(dbParams, yield); - auto end = std::chrono::system_clock::now(); - - auto duration = ((end - start).count()) / 1000000000.0; - BOOST_LOG_TRIVIAL(info) - << __func__ << " : executed stored_procedure in " - << std::to_string(duration) - << " num records = " << std::to_string(checkResult(res, 1)); - - checkResult(res, 1); - - char const* resultStr = res.c_str(); - BOOST_LOG_TRIVIAL(debug) << __func__ << " : " - << "postgres result = " << resultStr - << " : account = " << strHex(account); - - boost::json::value raw = boost::json::parse(resultStr); - boost::json::object responseObj = raw.as_object(); - BOOST_LOG_TRIVIAL(debug) << " parsed = " << responseObj; - if (responseObj.contains("transactions")) - { - auto txns = responseObj.at("transactions").as_array(); - std::vector hashes; - for (auto& hashHex : txns) - { - ripple::uint256 hash; - if (hash.parseHex(hashHex.at("hash").as_string().c_str() + 2)) - hashes.push_back(hash); - } - if (responseObj.contains("cursor")) - { - return { - fetchTransactions(hashes, yield), - {{responseObj.at("cursor").at("ledger_sequence").as_int64(), - responseObj.at("cursor") - .at("transaction_index") - .as_int64()}}}; - } - return {fetchTransactions(hashes, yield), {}}; - } - return {{}, {}}; -} // namespace Backend - -void -PostgresBackend::open(bool readOnly) -{ - initSchema(pgPool_); - initAccountTx(pgPool_); -} - -void -PostgresBackend::close() -{ -} - -void -PostgresBackend::startWrites() const -{ - synchronous([&](boost::asio::yield_context yield) { - numRowsInObjectsBuffer_ = 0; - abortWrite_ = false; - auto res = writeConnection_("BEGIN", yield); - if (!res || res.status() != PGRES_COMMAND_OK) - { - std::stringstream msg; - msg << "Postgres error creating transaction: " << res.msg(); - throw std::runtime_error(msg.str()); - } - }); -} - -bool -PostgresBackend::doFinishWrites() -{ - synchronous([&](boost::asio::yield_context yield) { - if (!abortWrite_) - { - std::string txStr = transactionsBuffer_.str(); - writeConnection_.bulkInsert("transactions", txStr, yield); - writeConnection_.bulkInsert( - "account_transactions", accountTxBuffer_.str(), yield); - std::string objectsStr = objectsBuffer_.str(); - if (objectsStr.size()) - writeConnection_.bulkInsert("objects", objectsStr, yield); - BOOST_LOG_TRIVIAL(debug) - << __func__ << " objects size = " << objectsStr.size() - << " txns size = " << txStr.size(); - std::string successorStr = successorBuffer_.str(); - if (successorStr.size()) - writeConnection_.bulkInsert("successor", successorStr, yield); - if (!range) - { - std::stringstream indexCreate; - indexCreate - << "CREATE INDEX diff ON objects USING hash(ledger_seq) " - "WHERE NOT " - "ledger_seq = " - << std::to_string(inProcessLedger); - writeConnection_(indexCreate.str().data(), yield); - } - } - auto res = writeConnection_("COMMIT", yield); - if (!res || res.status() != PGRES_COMMAND_OK) - { - std::stringstream msg; - msg << "Postgres error committing transaction: " << res.msg(); - throw std::runtime_error(msg.str()); - } - transactionsBuffer_.str(""); - transactionsBuffer_.clear(); - objectsBuffer_.str(""); - objectsBuffer_.clear(); - successorBuffer_.str(""); - successorBuffer_.clear(); - successors_.clear(); - accountTxBuffer_.str(""); - accountTxBuffer_.clear(); - numRowsInObjectsBuffer_ = 0; - }); - - return !abortWrite_; -} - -bool -PostgresBackend::doOnlineDelete( - std::uint32_t const numLedgersToKeep, - boost::asio::yield_context& yield) 
const -{ - auto rng = fetchLedgerRange(); - if (!rng) - return false; - std::uint32_t minLedger = rng->maxSequence - numLedgersToKeep; - if (minLedger <= rng->minSequence) - return false; - PgQuery pgQuery(pgPool_); - pgQuery("SET statement_timeout TO 0", yield); - std::optional cursor; - while (true) - { - auto [objects, curCursor] = retryOnTimeout([&]() { - return fetchLedgerPage(cursor, minLedger, 256, false, yield); - }); - BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page"; - std::stringstream objectsBuffer; - - for (auto& obj : objects) - { - objectsBuffer << "\\\\x" << ripple::strHex(obj.key) << '\t' - << std::to_string(minLedger) << '\t' << "\\\\x" - << ripple::strHex(obj.blob) << '\n'; - } - pgQuery.bulkInsert("objects", objectsBuffer.str(), yield); - cursor = curCursor; - if (!cursor) - break; - } - BOOST_LOG_TRIVIAL(info) << __func__ << " finished inserting into objects"; - { - std::stringstream sql; - sql << "DELETE FROM ledgers WHERE ledger_seq < " - << std::to_string(minLedger); - auto res = pgQuery(sql.str().data(), yield); - if (res.msg() != "ok") - throw std::runtime_error("Error deleting from ledgers table"); - } - { - std::stringstream sql; - sql << "DELETE FROM keys WHERE ledger_seq < " - << std::to_string(minLedger); - auto res = pgQuery(sql.str().data(), yield); - if (res.msg() != "ok") - throw std::runtime_error("Error deleting from keys table"); - } - { - std::stringstream sql; - sql << "DELETE FROM books WHERE ledger_seq < " - << std::to_string(minLedger); - auto res = pgQuery(sql.str().data(), yield); - if (res.msg() != "ok") - throw std::runtime_error("Error deleting from books table"); - } - return true; -} - -} // namespace Backend diff --git a/src/backend/PostgresBackend.h b/src/backend/PostgresBackend.h deleted file mode 100644 index c14449e3..00000000 --- a/src/backend/PostgresBackend.h +++ /dev/null @@ -1,171 +0,0 @@ -#ifndef RIPPLE_APP_REPORTING_POSTGRESBACKEND_H_INCLUDED -#define RIPPLE_APP_REPORTING_POSTGRESBACKEND_H_INCLUDED -#include -#include - -namespace Backend { -class PostgresBackend : public BackendInterface -{ -private: - mutable size_t numRowsInObjectsBuffer_ = 0; - mutable std::stringstream objectsBuffer_; - mutable size_t numRowsInSuccessorBuffer_ = 0; - mutable std::stringstream successorBuffer_; - mutable std::stringstream transactionsBuffer_; - mutable std::stringstream accountTxBuffer_; - std::shared_ptr pgPool_; - mutable PgQuery writeConnection_; - mutable bool abortWrite_ = false; - std::uint32_t writeInterval_ = 1000000; - std::uint32_t inProcessLedger = 0; - mutable std::unordered_set successors_; - - const char* const set_timeout = "SET statement_timeout TO 10000"; - -public: - PostgresBackend( - boost::asio::io_context& ioc, - boost::json::object const& config); - - std::optional - fetchLatestLedgerSequence(boost::asio::yield_context& yield) const override; - - std::optional - fetchLedgerBySequence( - std::uint32_t const sequence, - boost::asio::yield_context& yield) const override; - - std::optional - fetchLedgerByHash( - ripple::uint256 const& hash, - boost::asio::yield_context& yield) const override; - - std::optional - doFetchLedgerObject( - ripple::uint256 const& key, - std::uint32_t const sequence, - boost::asio::yield_context& yield) const override; - - // returns a transaction, metadata pair - std::optional - fetchTransaction( - ripple::uint256 const& hash, - boost::asio::yield_context& yield) const override; - - std::vector - fetchAllTransactionsInLedger( - std::uint32_t const ledgerSequence, - 
boost::asio::yield_context& yield) const override; - - std::vector - fetchAllTransactionHashesInLedger( - std::uint32_t const ledgerSequence, - boost::asio::yield_context& yield) const override; - - std::optional - fetchNFT( - ripple::uint256 const& tokenID, - std::uint32_t const ledgerSequence, - boost::asio::yield_context& yield) const override; - - TransactionsAndCursor - fetchNFTTransactions( - ripple::uint256 const& tokenID, - std::uint32_t const limit, - bool const forward, - std::optional const& cursorIn, - boost::asio::yield_context& yield) const override; - - std::vector - fetchLedgerDiff( - std::uint32_t const ledgerSequence, - boost::asio::yield_context& yield) const override; - - std::optional - hardFetchLedgerRange(boost::asio::yield_context& yield) const override; - - std::optional - doFetchSuccessorKey( - ripple::uint256 key, - std::uint32_t const ledgerSequence, - boost::asio::yield_context& yield) const override; - - std::vector - fetchTransactions( - std::vector const& hashes, - boost::asio::yield_context& yield) const override; - - std::vector - doFetchLedgerObjects( - std::vector const& keys, - std::uint32_t const sequence, - boost::asio::yield_context& yield) const override; - - TransactionsAndCursor - fetchAccountTransactions( - ripple::AccountID const& account, - std::uint32_t const limit, - bool forward, - std::optional const& cursor, - boost::asio::yield_context& yield) const override; - - void - writeLedger( - ripple::LedgerInfo const& ledgerInfo, - std::string&& ledgerHeader) override; - - void - doWriteLedgerObject( - std::string&& key, - std::uint32_t const seq, - std::string&& blob) override; - - void - writeSuccessor( - std::string&& key, - std::uint32_t const seq, - std::string&& successor) override; - - void - writeTransaction( - std::string&& hash, - std::uint32_t const seq, - std::uint32_t const date, - std::string&& transaction, - std::string&& metadata) override; - - void - writeNFTs(std::vector&& data) override; - - void - writeAccountTransactions( - std::vector&& data) override; - - void - writeNFTTransactions(std::vector&& data) override; - - void - open(bool readOnly) override; - - void - close() override; - - void - startWrites() const override; - - bool - doFinishWrites() override; - - bool - isTooBusy() const override - { - return false; - } - - bool - doOnlineDelete( - std::uint32_t const numLedgersToKeep, - boost::asio::yield_context& yield) const override; -}; -} // namespace Backend -#endif diff --git a/src/backend/README.md b/src/backend/README.md index 06918dcf..da05db1d 100644 --- a/src/backend/README.md +++ b/src/backend/README.md @@ -1,6 +1,6 @@ # Clio Backend ## Background -The backend of Clio is responsible for handling the proper reading and writing of past ledger data from and to a given database. As of right now, Cassandra is the only supported database that is production-ready. However, support for more databases like PostgreSQL and DynamoDB may be added in future versions. Support for database types can be easily extended by creating new implementations which implements the virtual methods of `BackendInterface.h`. Then, use the Factory Object Design Pattern to simply add logic statements to `BackendFactory.h` that return the new database interface for a specific `type` in Clio's configuration file. +The backend of Clio is responsible for handling the proper reading and writing of past ledger data from and to a given database. As of right now, Cassandra and ScyllaDB are the only supported databases that are production-ready. 
Support for database types can be easily extended by creating new implementations that implement the virtual methods of `BackendInterface.h`. Then, use the Factory Object Design Pattern to add logic statements to `BackendFactory.h` that return the new database interface for a specific `type` in Clio's configuration file.
 
 ## Data Model
 The data model used by Clio to read and write ledger data is different from what Rippled uses. Rippled uses a novel data structure named [*SHAMap*](https://github.com/ripple/rippled/blob/master/src/ripple/shamap/README.md), which is a combination of a Merkle Tree and a Radix Trie. In a SHAMap, ledger objects are stored in the leaf vertices of the tree. Thus, looking up a record located at a leaf node of the SHAMap executes a tree search, where the path from the root node to the leaf node is the key of the record. Rippled nodes can also generate a proof-tree by forming a subtree with all the path nodes and their neighbors, which can then be used to prove the existence of the leaf node data to other Rippled nodes. In short, the main purpose of the SHAMap data structure is to facilitate the fast validation of data integrity between different decentralized Rippled nodes.
@@ -129,4 +129,4 @@ In each new ledger version with sequence `n`, a ledger object `v` can either be
 2. If `v` is...
    1. Being **created**, add two new records of `seq=n` with one being `e` pointing to `v`, and `v` pointing to `w` (Linked List insertion operation).
    2. Being **modified**, do nothing.
-   3. Being **deleted**, add a record of `seq=n` with `e` pointing to `v`'s `next` value (Linked List deletion operation).
\ No newline at end of file
+   3. Being **deleted**, add a record of `seq=n` with `e` pointing to `v`'s `next` value (Linked List deletion operation).
diff --git a/src/etl/README.md b/src/etl/README.md
index fe314ebc..b52c28d5 100644
--- a/src/etl/README.md
+++ b/src/etl/README.md
@@ -22,7 +22,7 @@ read-only mode. In read-only mode, the server does not perform ETL and simply
 publishes new ledgers as they are written to the database. If the database is
 not updated within a certain time period (currently hard coded at 20 seconds),
 clio will begin the ETL
-process and start writing to the database. Postgres will report an error when
+process and start writing to the database. The database will report an error when
 trying to write a record with a key that already exists. ETL uses this error to
 determine that another process is writing to the database, and subsequently
 falls back to a soft read-only mode.
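The takeover logic described above amounts to an insert-if-not-exists on the ledger key. A minimal sketch of that detection, assuming a hypothetical `KeyValueStore` in place of the real database layer:

```cpp
#include <cstdint>
#include <map>
#include <string>
#include <utility>

// Stand-in for the database: insertion fails when the key already exists,
// mimicking the duplicate-key error described above.
struct KeyValueStore
{
    std::map<std::uint32_t, std::string> ledgers;

    bool
    insertIfAbsent(std::uint32_t seq, std::string header)
    {
        return ledgers.emplace(seq, std::move(header)).second;
    }
};

// Attempt to become the writer for ledger `seq`. A duplicate-key failure
// means another process is already writing, so the caller falls back to
// (soft) read-only mode and just publishes ledgers as they appear.
bool
tryBecomeWriter(KeyValueStore& db, std::uint32_t seq, std::string header)
{
    return db.insertIfAbsent(seq, std::move(header));
}
```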
clio can also operate in strict diff --git a/src/etl/ReportingETL.h b/src/etl/ReportingETL.h index 1fb6eedf..c66b452e 100644 --- a/src/etl/ReportingETL.h +++ b/src/etl/ReportingETL.h @@ -262,7 +262,7 @@ private: /// following parent /// @param parent the previous ledger /// @param rawData data extracted from an ETL source - /// @return the newly built ledger and data to write to Postgres + /// @return the newly built ledger and data to write to the database std::pair buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData); diff --git a/src/rpc/RPCHelpers.cpp b/src/rpc/RPCHelpers.cpp index bfe29b86..2c753a77 100644 --- a/src/rpc/RPCHelpers.cpp +++ b/src/rpc/RPCHelpers.cpp @@ -1,6 +1,11 @@ #include +#include + +#include + #include #include + namespace RPC { std::optional diff --git a/src/rpc/handlers/AccountChannels.cpp b/src/rpc/handlers/AccountChannels.cpp index e4359d51..a9f377e8 100644 --- a/src/rpc/handlers/AccountChannels.cpp +++ b/src/rpc/handlers/AccountChannels.cpp @@ -8,7 +8,7 @@ #include #include #include -#include + #include namespace RPC { diff --git a/src/rpc/handlers/BookOffers.cpp b/src/rpc/handlers/BookOffers.cpp index 3e433ae9..0666106c 100644 --- a/src/rpc/handlers/BookOffers.cpp +++ b/src/rpc/handlers/BookOffers.cpp @@ -10,7 +10,6 @@ #include #include -#include namespace RPC { diff --git a/src/rpc/handlers/Tx.cpp b/src/rpc/handlers/Tx.cpp index 03de231b..0549495d 100644 --- a/src/rpc/handlers/Tx.cpp +++ b/src/rpc/handlers/Tx.cpp @@ -1,5 +1,4 @@ #include -#include #include namespace RPC { diff --git a/unittests/README.md b/unittests/README.md index 4a0a4282..e67495dc 100644 --- a/unittests/README.md +++ b/unittests/README.md @@ -3,12 +3,6 @@ The correctness of new implementations can be verified via running unit tests. B ## Requirements ### 1. Cassandra cluster Have access to a **local (127.0.0.1)** Cassandra cluster, opened at port **9042**. Please ensure that the cluster is successfully running before running Unit Tests. -### 2. Postgres server -Have access to a **local (127.0.0.1)** Postgres server, opened at port **5432**. The server must also have a super user named **postgres** with password set to **postgres**. In addition, modify *postgresql.conf* with the following field values: -``` -max_connections = 1000 -shared_buffers = 1280MB -``` ## Running To run the unit tests, first build Clio as normal, then execute `./clio_tests` to run the unit tests. 
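For reference, a complete (hypothetical) test written against the `TEST(module_name, test_name)` convention shown in the next hunk would look like:

```cpp
#include <gtest/gtest.h>

// Hypothetical example following the TEST(module_name, test_name) pattern.
TEST(ExampleModule, HandlesBasicArithmetic)
{
    EXPECT_EQ(2 + 2, 4);
}
```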
@@ -28,4 +22,4 @@ TEST(module_name, test_name) { // Test code goes here } -``` \ No newline at end of file +``` diff --git a/unittests/main.cpp b/unittests/main.cpp index 5c69550a..130f3f7a 100644 --- a/unittests/main.cpp +++ b/unittests/main.cpp @@ -37,20 +37,7 @@ TEST(BackendTest, Basic) {"max_requests_outstanding", 1000}, {"indexer_key_shift", 2}, {"threads", 8}}}}}}; - boost::json::object postgresConfig{ - {"database", - {{"type", "postgres"}, - {"experimental", true}, - {"postgres", - {{"contact_point", "127.0.0.1"}, - {"username", "postgres"}, - {"database", keyspace.c_str()}, - {"password", "postgres"}, - {"indexer_key_shift", 2}, - {"max_connections", 100}, - {"threads", 8}}}}}}; - std::vector configs = { - cassandraConfig, postgresConfig}; + std::vector configs = {cassandraConfig}; for (auto& config : configs) { std::cout << keyspace << std::endl; @@ -1853,20 +1840,7 @@ TEST(Backend, cacheIntegration) {"max_requests_outstanding", 1000}, {"indexer_key_shift", 2}, {"threads", 8}}}}}}; - boost::json::object postgresConfig{ - {"database", - {{"type", "postgres"}, - {"experimental", true}, - {"postgres", - {{"contact_point", "127.0.0.1"}, - {"username", "postgres"}, - {"database", keyspace.c_str()}, - {"password", "postgres"}, - {"indexer_key_shift", 2}, - {"max_connections", 100}, - {"threads", 8}}}}}}; - std::vector configs = { - cassandraConfig, postgresConfig}; + std::vector configs = {cassandraConfig}; for (auto& config : configs) { std::cout << keyspace << std::endl;