From 6e6f47421d4712a975180769957693a64452f7fc Mon Sep 17 00:00:00 2001 From: CJ Cobb <46455409+cjcobb23@users.noreply.github.com> Date: Fri, 7 Jan 2022 14:48:48 -0500 Subject: [PATCH] Postgres fixes (#84) * Postgres fixes * Create partial index for ledger_diffs, to avoid indexing the first ledger's objects * Don't write duplicate keys to successor table * default to 4 markers when syncing cache * remove isFirst from writeLedger interface --- src/backend/BackendInterface.cpp | 13 ++------ src/backend/BackendInterface.h | 4 +-- src/backend/CassandraBackend.cpp | 17 ++++------ src/backend/CassandraBackend.h | 9 ++---- src/backend/Pg.cpp | 2 -- src/backend/PostgresBackend.cpp | 25 +++++++++++++-- src/backend/PostgresBackend.h | 6 ++-- src/etl/ETLSource.cpp | 55 +++++++++++++++++++++----------- src/etl/ReportingETL.cpp | 14 ++++---- unittests/main.cpp | 4 +-- 10 files changed, 84 insertions(+), 65 deletions(-) diff --git a/src/backend/BackendInterface.cpp b/src/backend/BackendInterface.cpp index a8f9b280..8ac4d556 100644 --- a/src/backend/BackendInterface.cpp +++ b/src/backend/BackendInterface.cpp @@ -8,16 +8,8 @@ BackendInterface::finishWrites(uint32_t ledgerSequence) auto commitRes = doFinishWrites(); if (commitRes) { - isFirst_ = false; updateRange(ledgerSequence); } - else - { - // if commitRes is false, we are relinquishing control of ETL. We - // reset isFirst_ to true so that way if we later regain control of - // ETL, we trigger the index repair - isFirst_ = true; - } return commitRes; } void @@ -242,8 +234,9 @@ BackendInterface::fetchLedgerPage( std::vector keys; while (keys.size() < limit) { - ripple::uint256 const& curCursor = - keys.size() ? keys.back() : cursor ? *cursor : firstKey; + ripple::uint256 const& curCursor = keys.size() ? keys.back() + : cursor ? *cursor + : firstKey; auto succ = fetchSuccessorKey(curCursor, ledgerSequence); if (!succ) break; diff --git a/src/backend/BackendInterface.h b/src/backend/BackendInterface.h index cb2f3c67..e32e3515 100644 --- a/src/backend/BackendInterface.h +++ b/src/backend/BackendInterface.h @@ -19,7 +19,6 @@ class DatabaseTimeout : public std::exception class BackendInterface { protected: - bool isFirst_ = true; std::optional range; SimpleCache cache_; @@ -153,8 +152,7 @@ public: virtual void writeLedger( ripple::LedgerInfo const& ledgerInfo, - std::string&& ledgerHeader, - bool isFirst = false) = 0; + std::string&& ledgerHeader) = 0; void writeLedgerObject(std::string&& key, uint32_t seq, std::string&& blob); diff --git a/src/backend/CassandraBackend.cpp b/src/backend/CassandraBackend.cpp index c581cd41..4b654b58 100644 --- a/src/backend/CassandraBackend.cpp +++ b/src/backend/CassandraBackend.cpp @@ -151,7 +151,7 @@ CassandraBackend::doWriteLedgerObject( std::string&& blob) { BOOST_LOG_TRIVIAL(trace) << "Writing ledger object to cassandra"; - if (!isFirst_) + if (range) makeAndExecuteAsyncWrite( this, std::move(std::make_tuple(seq, key)), [this](auto& params) { auto& [sequence, key] = params.data; @@ -201,8 +201,7 @@ CassandraBackend::writeSuccessor( void CassandraBackend::writeLedger( ripple::LedgerInfo const& ledgerInfo, - std::string&& header, - bool isFirst) + std::string&& header) { makeAndExecuteAsyncWrite( this, @@ -225,7 +224,6 @@ CassandraBackend::writeLedger( return statement; }); ledgerSequence_ = ledgerInfo.seq; - isFirstLedger_ = isFirst; } void CassandraBackend::writeAccountTransactions( @@ -462,10 +460,9 @@ CassandraBackend::fetchAccountTransactions( { statement.bindNextIntTuple( cursor->ledgerSequence, cursor->transactionIndex); - BOOST_LOG_TRIVIAL(debug) - << " account = " << ripple::strHex(account) - << " tuple = " << cursor->ledgerSequence << " : " - << cursor->transactionIndex; + BOOST_LOG_TRIVIAL(debug) << " account = " << ripple::strHex(account) + << " tuple = " << cursor->ledgerSequence + << " : " << cursor->transactionIndex; } else { @@ -474,8 +471,8 @@ CassandraBackend::fetchAccountTransactions( statement.bindNextIntTuple(placeHolder, placeHolder); BOOST_LOG_TRIVIAL(debug) - << " account = " << ripple::strHex(account) - << " idx = " << seq << " tuple = " << placeHolder; + << " account = " << ripple::strHex(account) << " idx = " << seq + << " tuple = " << placeHolder; } statement.bindNextUInt(limit); diff --git a/src/backend/CassandraBackend.h b/src/backend/CassandraBackend.h index 74904070..c36d6a95 100644 --- a/src/backend/CassandraBackend.h +++ b/src/backend/CassandraBackend.h @@ -597,7 +597,6 @@ private: boost::json::object config_; mutable uint32_t ledgerSequence_ = 0; - mutable bool isFirstLedger_ = false; public: CassandraBackend(boost::json::object const& config) @@ -648,7 +647,7 @@ public: // wait for all other writes to finish sync(); // write range - if (isFirstLedger_) + if (!range) { CassandraStatement statement{updateLedgerRange_}; statement.bindNextInt(ledgerSequence_); @@ -672,10 +671,8 @@ public: return true; } void - writeLedger( - ripple::LedgerInfo const& ledgerInfo, - std::string&& header, - bool isFirst = false) override; + writeLedger(ripple::LedgerInfo const& ledgerInfo, std::string&& header) + override; std::optional fetchLatestLedgerSequence() const override diff --git a/src/backend/Pg.cpp b/src/backend/Pg.cpp index 7909eebb..fed03f8e 100644 --- a/src/backend/Pg.cpp +++ b/src/backend/Pg.cpp @@ -773,8 +773,6 @@ CREATE TABLE IF NOT EXISTS objects ( CREATE INDEX objects_idx ON objects USING btree(key,ledger_seq); -CREATE INDEX diff ON objects USING hash(ledger_seq); - create table if not exists objects1 partition of objects for values from (0) to (10000000); create table if not exists objects2 partition of objects for values from (10000000) to (20000000); create table if not exists objects3 partition of objects for values from (20000000) to (30000000); diff --git a/src/backend/PostgresBackend.cpp b/src/backend/PostgresBackend.cpp index c69e186e..ffcc13d7 100644 --- a/src/backend/PostgresBackend.cpp +++ b/src/backend/PostgresBackend.cpp @@ -17,8 +17,7 @@ PostgresBackend::PostgresBackend(boost::json::object const& config) void PostgresBackend::writeLedger( ripple::LedgerInfo const& ledgerInfo, - std::string&& ledgerHeader, - bool isFirst) + std::string&& ledgerHeader) { auto cmd = boost::format( R"(INSERT INTO ledgers @@ -35,6 +34,7 @@ PostgresBackend::writeLedger( auto res = writeConnection_(ledgerInsert.data()); abortWrite_ = !res; + inProcessLedger = ledgerInfo.seq; } void @@ -88,9 +88,17 @@ PostgresBackend::writeSuccessor( uint32_t seq, std::string&& successor) { + if (range) + { + if (successors_.count(key) > 0) + return; + successors_.insert(key); + } successorBuffer_ << "\\\\x" << ripple::strHex(key) << '\t' << std::to_string(seq) << '\t' << "\\\\x" << ripple::strHex(successor) << '\n'; + BOOST_LOG_TRIVIAL(trace) + << __func__ << ripple::strHex(key) << " - " << std::to_string(seq); numRowsInSuccessorBuffer_++; if (numRowsInSuccessorBuffer_ % writeInterval_ == 0) { @@ -532,7 +540,7 @@ PostgresBackend::fetchLedgerDiff(uint32_t ledgerSequence) const "WHERE " << "ledger_seq = " << std::to_string(ledgerSequence); auto res = pgQuery(sql.str().data()); - if (size_t numRows = checkResult(res, 4)) + if (size_t numRows = checkResult(res, 2)) { std::vector objects; for (size_t i = 0; i < numRows; ++i) @@ -666,6 +674,17 @@ PostgresBackend::doFinishWrites() std::string successorStr = successorBuffer_.str(); if (successorStr.size()) writeConnection_.bulkInsert("successor", successorStr); + successors_.clear(); + if (!range) + { + std::stringstream indexCreate; + indexCreate + << "CREATE INDEX diff ON objects USING hash(ledger_seq) " + "WHERE NOT " + "ledger_seq = " + << std::to_string(inProcessLedger); + writeConnection_(indexCreate.str().data()); + } } auto res = writeConnection_("COMMIT"); if (!res || res.status() != PGRES_COMMAND_OK) diff --git a/src/backend/PostgresBackend.h b/src/backend/PostgresBackend.h index ba4a06c9..1ba60e96 100644 --- a/src/backend/PostgresBackend.h +++ b/src/backend/PostgresBackend.h @@ -11,7 +11,6 @@ private: mutable std::stringstream objectsBuffer_; mutable size_t numRowsInSuccessorBuffer_ = 0; mutable std::stringstream successorBuffer_; - mutable std::stringstream keysBuffer_; mutable std::stringstream transactionsBuffer_; mutable std::stringstream accountTxBuffer_; std::shared_ptr pgPool_; @@ -19,6 +18,8 @@ private: mutable bool abortWrite_ = false; mutable boost::asio::thread_pool pool_{16}; uint32_t writeInterval_ = 1000000; + uint32_t inProcessLedger = 0; + std::unordered_set successors_; public: PostgresBackend(boost::json::object const& config); @@ -75,8 +76,7 @@ public: void writeLedger( ripple::LedgerInfo const& ledgerInfo, - std::string&& ledgerHeader, - bool isFirst) override; + std::string&& ledgerHeader) override; void doWriteLedgerObject(std::string&& key, uint32_t seq, std::string&& blob) diff --git a/src/etl/ETLSource.cpp b/src/etl/ETLSource.cpp index 0c6c1c3c..864ce15d 100644 --- a/src/etl/ETLSource.cpp +++ b/src/etl/ETLSource.cpp @@ -599,6 +599,11 @@ public: for (int i = 0; i < cur_->ledger_objects().objects_size(); ++i) { auto& obj = *(cur_->mutable_ledger_objects()->mutable_objects(i)); + if (!more) + { + if (((unsigned char)obj.key()[0]) >= nextPrefix_) + continue; + } cacheUpdates.push_back( {*ripple::uint256::fromVoidChecked(obj.key()), {obj.mutable_data()->begin(), obj.mutable_data()->end()}}); @@ -743,6 +748,9 @@ ETLSourceImpl::loadInitialLedger( auto start = std::chrono::system_clock::now(); for (auto& key : edgeKeys) { + BOOST_LOG_TRIVIAL(debug) + << __func__ + << " writing edge key = " << ripple::strHex(key); auto succ = backend_->cache().getSuccessor( *ripple::uint256::fromVoidChecked(key), sequence); if (succ) @@ -761,18 +769,23 @@ ETLSourceImpl::loadInitialLedger( if (isBookDir(cur->key, cur->blob)) { auto base = getBookBase(cur->key); - auto succ = backend_->cache().getSuccessor(base, sequence); - assert(succ); - if (succ->key == cur->key) + // make sure the base is not an actual object + if (!backend_->cache().get(cur->key, sequence)) { - BOOST_LOG_TRIVIAL(debug) - << __func__ << " Writing book successor = " - << ripple::strHex(base) << " - " - << ripple::strHex(cur->key); - backend_->writeSuccessor( - uint256ToString(base), - sequence, - uint256ToString(cur->key)); + auto succ = + backend_->cache().getSuccessor(base, sequence); + assert(succ); + if (succ->key == cur->key) + { + BOOST_LOG_TRIVIAL(debug) + << __func__ << " Writing book successor = " + << ripple::strHex(base) << " - " + << ripple::strHex(cur->key); + backend_->writeSuccessor( + uint256ToString(base), + sequence, + uint256ToString(cur->key)); + } } ++numWrites; } @@ -848,6 +861,10 @@ ETLLoadBalancer::ETLLoadBalancer( downloadRanges_ = std::clamp(downloadRanges_, {1}, {256}); } + else if (backend->fetchLedgerRange()) + { + downloadRanges_ = 4; + } for (auto& entry : config.at("etl_sources").as_array()) { @@ -923,7 +940,9 @@ ETLLoadBalancer::fetchLedger( } std::optional -ETLLoadBalancer::forwardToRippled(boost::json::object const& request, std::string const& clientIp) const +ETLLoadBalancer::forwardToRippled( + boost::json::object const& request, + std::string const& clientIp) const { srand((unsigned)time(0)); auto sourceIdx = rand() % sources_.size(); @@ -942,7 +961,8 @@ ETLLoadBalancer::forwardToRippled(boost::json::object const& request, std::strin template std::optional ETLSourceImpl::forwardToRippled( - boost::json::object const& request, std::string const& clientIp) const + boost::json::object const& request, + std::string const& clientIp) const { BOOST_LOG_TRIVIAL(debug) << "Attempting to forward request to tx. " << "request = " << boost::json::serialize(request); @@ -983,17 +1003,14 @@ ETLSourceImpl::forwardToRippled( // // https://github.com/ripple/rippled/blob/develop/cfg/rippled-example.cfg ws->set_option(websocket::stream_base::decorator( - [&request,&clientIp] (websocket::request_type& req) { + [&request, &clientIp](websocket::request_type& req) { req.set( http::field::user_agent, std::string(BOOST_BEAST_VERSION_STRING) + " websocket-client-coro"); - req.set( - http::field::forwarded, - "for=" + clientIp); + req.set(http::field::forwarded, "for=" + clientIp); })); - BOOST_LOG_TRIVIAL(debug) - << "client ip: " << clientIp; + BOOST_LOG_TRIVIAL(debug) << "client ip: " << clientIp; BOOST_LOG_TRIVIAL(debug) << "Performing websocket handshake"; // Perform the websocket handshake diff --git a/src/etl/ReportingETL.cpp b/src/etl/ReportingETL.cpp index 34264dcd..09f01a49 100644 --- a/src/etl/ReportingETL.cpp +++ b/src/etl/ReportingETL.cpp @@ -11,8 +11,8 @@ #include #include #include -#include #include +#include namespace detail { /// Convenience function for printing out basic ledger info @@ -101,7 +101,7 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence) backend_->startWrites(); BOOST_LOG_TRIVIAL(debug) << __func__ << " started writes"; backend_->writeLedger( - lgrInfo, std::move(*ledgerData->mutable_ledger_header()), true); + lgrInfo, std::move(*ledgerData->mutable_ledger_header())); BOOST_LOG_TRIVIAL(debug) << __func__ << " wrote ledger"; std::vector accountTxData = insertTransactions(lgrInfo, *ledgerData); @@ -864,14 +864,14 @@ ReportingETL::monitorReadOnly() if (!mostRecent) return; uint32_t sequence = *mostRecent; + std::thread t{[this, sequence]() { + BOOST_LOG_TRIVIAL(info) << "Loading cache"; + loadBalancer_->loadInitialLedger(sequence, true); + }}; + t.detach(); while (!stopping_ && networkValidatedLedgers_->waitUntilValidatedByNetwork(sequence)) { - std::thread t{[this, sequence]() { - BOOST_LOG_TRIVIAL(info) << "Loading cache"; - loadBalancer_->loadInitialLedger(sequence, true); - }}; - t.detach(); publishLedger(sequence, {}); ++sequence; } diff --git a/unittests/main.cpp b/unittests/main.cpp index bfe0c125..434dca65 100644 --- a/unittests/main.cpp +++ b/unittests/main.cpp @@ -85,7 +85,7 @@ TEST(BackendTest, Basic) deserializeHeader(ripple::makeSlice(rawHeaderBlob)); backend->startWrites(); - backend->writeLedger(lgrInfo, std::move(rawHeaderBlob), true); + backend->writeLedger(lgrInfo, std::move(rawHeaderBlob)); backend->writeSuccessor( uint256ToString(Backend::firstKey), lgrInfo.seq, @@ -1590,7 +1590,7 @@ TEST(Backend, CacheIntegration) deserializeHeader(ripple::makeSlice(rawHeaderBlob)); backend->startWrites(); - backend->writeLedger(lgrInfo, std::move(rawHeaderBlob), true); + backend->writeLedger(lgrInfo, std::move(rawHeaderBlob)); backend->writeSuccessor( uint256ToString(Backend::firstKey), lgrInfo.seq,