diff --git a/reporting/DBHelpers.cpp b/reporting/DBHelpers.cpp index 90147226..761b0974 100644 --- a/reporting/DBHelpers.cpp +++ b/reporting/DBHelpers.cpp @@ -45,6 +45,44 @@ writeToLedgersDB(ripple::LedgerInfo const& info, PgQuery& pgQuery) return res; } +/* +bool +writeBooks(std::vector const& bookDirData, PgQuery& pg) +{ + BOOST_LOG_TRIVIAL(debug) + << __func__ << " : " + << "Writing " << bookDirData.size() << "books to Postgres"; + + try + { + std::stringstream booksCopyBuffer; + for (auto const& data : bookDirData) + { + std::string directoryIndex = ripple::strHex(data.directoryIndex); + std::string bookIndex = ripple::strHex(data.bookIndex); + auto ledgerSeq = data.ledgerSequence; + + booksCopyBuffer << "\\\\x" << directoryIndex << '\t' + << std::to_string(ledgerSeq) << '\t' << "\\\\x" + << bookIndex << '\n'; + } + + pg.bulkInsert("books", booksCopyBuffer.str()); + + BOOST_LOG_TRIVIAL(info) << __func__ << " : " + << "Successfully inserted books"; + return true; + } + catch (std::exception& e) + { + BOOST_LOG_TRIVIAL(error) + << __func__ << "Caught exception inserting books : " << e.what(); + assert(false); + return false; + } +} +*/ + bool writeToPostgres( ripple::LedgerInfo const& info, @@ -56,8 +94,8 @@ writeToPostgres( try { - // Create a PgQuery object to run multiple commands over the same - // connection in a single transaction block. + // Create a PgQuery object to run multiple commands over the + // same connection in a single transaction block. PgQuery pg(pgPool); auto res = pg("BEGIN"); if (!res || res.status() != PGRES_COMMAND_OK) @@ -67,9 +105,10 @@ writeToPostgres( throw std::runtime_error(msg.str()); } - // Writing to the ledgers db fails if the ledger already exists in the - // db. In this situation, the ETL process has detected there is another - // writer, and falls back to only publishing + // Writing to the ledgers db fails if the ledger already + // exists in the db. In this situation, the ETL process has + // detected there is another writer, and falls back to only + // publishing if (!writeToLedgersDB(info, pg)) { BOOST_LOG_TRIVIAL(warning) diff --git a/reporting/DBHelpers.h b/reporting/DBHelpers.h index ae2ee1b8..3a854c17 100644 --- a/reporting/DBHelpers.h +++ b/reporting/DBHelpers.h @@ -48,6 +48,21 @@ struct AccountTransactionsData } }; +inline bool +isOffer(std::string const& object) +{ + short offer_bytes = (object[1] << 8) | object[2]; + return offer_bytes == 0x006f; +} + +inline ripple::uint256 +getBook(std::string const& offer) +{ + ripple::SerialIter it{offer.data(), offer.size()}; + ripple::SLE sle{it, {}}; + return sle.getFieldH256(ripple::sfBookDirectory); +} + /// Write new ledger and transaction data to Postgres /// @param info Ledger Info to write /// @param accountTxData transaction data to write diff --git a/reporting/ETLSource.cpp b/reporting/ETLSource.cpp index c585de1c..18668c65 100644 --- a/reporting/ETLSource.cpp +++ b/reporting/ETLSource.cpp @@ -19,6 +19,7 @@ */ //============================================================================== +#include #include #include #include @@ -441,12 +442,22 @@ public: for (auto& obj : *(cur_->mutable_ledger_objects()->mutable_objects())) { + std::optional book; + + short offer_bytes = (obj.data()[1] << 8) | obj.data()[2]; + if (offer_bytes == 0x006f) + { + ripple::SerialIter it{obj.data().data(), obj.data().size()}; + ripple::SLE sle{it, {}}; + book = sle.getFieldH256(ripple::sfBookDirectory); + } backend.store( std::move(*obj.mutable_key()), request_.ledger().sequence(), std::move(*obj.mutable_data()), true, - false); + false, + std::move(book)); } return more ? CallStatus::MORE : CallStatus::DONE; diff --git a/reporting/ReportingBackend.cpp b/reporting/ReportingBackend.cpp index e47cda1d..d1082722 100644 --- a/reporting/ReportingBackend.cpp +++ b/reporting/ReportingBackend.cpp @@ -38,6 +38,42 @@ flatMapWriteCallback(CassFuture* fut, void* cbData) delete &requestParams; } } +void +flatMapWriteBookCallback(CassFuture* fut, void* cbData) +{ + CassandraFlatMapBackend::WriteCallbackData& requestParams = + *static_cast(cbData); + CassandraFlatMapBackend const& backend = *requestParams.backend; + auto rc = cass_future_error_code(fut); + if (rc != CASS_OK) + { + BOOST_LOG_TRIVIAL(error) + << "ERROR!!! Cassandra insert error: " << rc << ", " + << cass_error_desc(rc) << ", retrying "; + // exponential backoff with a max wait of 2^10 ms (about 1 second) + auto wait = std::chrono::milliseconds( + lround(std::pow(2, std::min(10u, requestParams.currentRetries)))); + ++requestParams.currentRetries; + std::shared_ptr timer = + std::make_shared( + backend.ioContext_, std::chrono::steady_clock::now() + wait); + timer->async_wait([timer, &requestParams, &backend]( + const boost::system::error_code& error) { + backend.writeBook(requestParams, true); + }); + } + else + { + --(backend.numRequestsOutstanding_); + + backend.throttleCv_.notify_all(); + if (backend.numRequestsOutstanding_ == 0) + backend.syncCv_.notify_all(); + int remaining = --requestParams.refs; + if (remaining == 0) + delete &requestParams; + } +} void flatMapWriteKeyCallback(CassFuture* fut, void* cbData) diff --git a/reporting/ReportingBackend.h b/reporting/ReportingBackend.h index d2d8b422..5e08ff73 100644 --- a/reporting/ReportingBackend.h +++ b/reporting/ReportingBackend.h @@ -39,6 +39,8 @@ flatMapWriteKeyCallback(CassFuture* fut, void* cbData); void flatMapWriteTransactionCallback(CassFuture* fut, void* cbData); void +flatMapWriteBookCallback(CassFuture* fut, void* cbData); +void flatMapReadCallback(CassFuture* fut, void* cbData); void flatMapReadObjectCallback(CassFuture* fut, void* cbData); @@ -90,6 +92,8 @@ private: const CassPrepared* getToken_ = nullptr; const CassPrepared* insertKey_ = nullptr; const CassPrepared* getCreated_ = nullptr; + const CassPrepared* getBook_ = nullptr; + const CassPrepared* insertBook_ = nullptr; // io_context used for exponential backoff for write retries mutable boost::asio::io_context ioContext_; @@ -478,6 +482,51 @@ public: continue; } } + query = {}; + query << "CREATE TABLE IF NOT EXISTS " << tableName << "books" + << " ( book blob, sequence bigint, key blob, deleted_at " + "bigint static, PRIMARY KEY " + "(book, sequence, key))"; + statement = makeStatement(query.str().c_str(), 0); + fut = cass_session_execute(session_.get(), statement); + rc = cass_future_error_code(fut); + cass_future_free(fut); + cass_statement_free(statement); + if (rc != CASS_OK && rc != CASS_ERROR_SERVER_INVALID_QUERY) + { + std::stringstream ss; + ss << "nodestore: Error creating Cassandra table: " << rc + << ", " << cass_error_desc(rc) << " - " << query.str(); + BOOST_LOG_TRIVIAL(error) << ss.str(); + continue; + } + + query = {}; + query << "SELECT * FROM " << tableName << "books" + << " LIMIT 1"; + statement = makeStatement(query.str().c_str(), 0); + fut = cass_session_execute(session_.get(), statement); + rc = cass_future_error_code(fut); + cass_future_free(fut); + cass_statement_free(statement); + if (rc != CASS_OK) + { + if (rc == CASS_ERROR_SERVER_INVALID_QUERY) + { + BOOST_LOG_TRIVIAL(warning) + << "table not here yet, sleeping 1s to " + "see if table creation propagates"; + continue; + } + else + { + std::stringstream ss; + ss << "nodestore: Error checking for table: " << rc << ", " + << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(error) << ss.str(); + continue; + } + } setupSessionAndTable = true; } @@ -567,6 +616,31 @@ public: insertKey_ = cass_future_get_prepared(prepare_future); cass_future_free(prepare_future); + query = {}; + query << "INSERT INTO " << tableName << "books" + << " (book, sequence, key, deleted_at) VALUES (?, ?, ?, ?)"; + prepare_future = + cass_session_prepare(session_.get(), query.str().c_str()); + + /* Wait for the statement to prepare and get the result */ + rc = cass_future_error_code(prepare_future); + + if (rc != CASS_OK) + { + /* Handle error */ + cass_future_free(prepare_future); + + std::stringstream ss; + ss << "nodestore: Error preparing insert : " << rc << ", " + << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(error) << ss.str(); + continue; + } + + /* Get the prepared object from the future */ + insertBook_ = cass_future_get_prepared(prepare_future); + cass_future_free(prepare_future); + query = {}; query << "SELECT created FROM " << tableName << "keys" << " WHERE key = ? ORDER BY created desc LIMIT 1"; @@ -708,6 +782,31 @@ public: getToken_ = cass_future_get_prepared(prepare_future); + query = {}; + query << "SELECT key FROM " << tableName << "books " + << " WHERE book = ? AND sequence <= ? AND deleted_at > ? " + "ALLOW FILTERING"; + + prepare_future = + cass_session_prepare(session_.get(), query.str().c_str()); + + // Wait for the statement to prepare and get the result + rc = cass_future_error_code(prepare_future); + + if (rc != CASS_OK) + { + // Handle error + cass_future_free(prepare_future); + + std::stringstream ss; + ss << "nodestore: Error preparing getToken : " << rc << ", " + << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(error) << ss.str(); + continue; + } + + getBook_ = cass_future_get_prepared(prepare_future); + setupPreparedStatements = true; } @@ -769,6 +868,11 @@ public: cass_prepared_free(getCreated_); getCreated_ = nullptr; } + if (getBook_) + { + cass_prepared_free(getBook_); + getBook_ = nullptr; + } work_.reset(); ioThread_.join(); } @@ -1101,6 +1205,102 @@ public: return {{}, {}}; } + std::vector + doBookOffers(std::vector const& book, uint32_t sequence) + const + { + BOOST_LOG_TRIVIAL(debug) << "Starting doBookOffers"; + CassStatement* statement = cass_prepared_bind(upperBound_); + cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); + CassError rc = cass_statement_bind_bytes( + statement, 0, static_cast(book.data()), 32); + + if (rc != CASS_OK) + { + cass_statement_free(statement); + BOOST_LOG_TRIVIAL(error) + << "Binding Cassandra book to doBookOffers query: " << rc + << ", " << cass_error_desc(rc); + return {}; + } + + rc = cass_statement_bind_int64(statement, 1, sequence); + if (rc != CASS_OK) + { + cass_statement_free(statement); + BOOST_LOG_TRIVIAL(error) + << "Binding Cassandra sequence to doBookOffers query: " << rc + << ", " << cass_error_desc(rc); + return {}; + } + rc = cass_statement_bind_int64(statement, 2, sequence); + if (rc != CASS_OK) + { + cass_statement_free(statement); + BOOST_LOG_TRIVIAL(error) + << "Binding Cassandra deleted_at to doBookOffers query: " << rc + << ", " << cass_error_desc(rc); + return {}; + } + + CassFuture* fut; + do + { + fut = cass_session_execute(session_.get(), statement); + rc = cass_future_error_code(fut); + if (rc != CASS_OK) + { + std::stringstream ss; + ss << "Cassandra fetch error"; + ss << ", retrying"; + ss << ": " << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(warning) << ss.str(); + } + } while (rc != CASS_OK); + + CassResult const* res = cass_future_get_result(fut); + cass_statement_free(statement); + cass_future_free(fut); + + BOOST_LOG_TRIVIAL(debug) << "doUpperBound - got keys"; + std::vector keys; + + CassIterator* iter = cass_iterator_from_result(res); + while (cass_iterator_next(iter)) + { + CassRow const* row = cass_iterator_get_row(iter); + + cass_byte_t const* outData; + std::size_t outSize; + + CassValue const* hash = cass_row_get_column(row, 0); + rc = cass_value_get_bytes(hash, &outData, &outSize); + if (rc != CASS_OK) + { + cass_iterator_free(iter); + + std::stringstream ss; + ss << "Cassandra fetch error"; + ss << ": " << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(warning) << ss.str(); + } + keys.push_back(ripple::uint256::fromVoid(outData)); + } + BOOST_LOG_TRIVIAL(debug) + << "doUpperBound - populated keys. num keys = " << keys.size(); + if (keys.size()) + { + std::vector results; + std::vector objs = fetchObjectsBatch(keys, sequence); + for (size_t i = 0; i < objs.size(); ++i) + { + results.push_back({keys[i], objs[i]}); + } + return results; + } + + return {}; + } bool canFetchBatch() { @@ -1317,6 +1517,7 @@ public: std::string blob; bool isCreated; bool isDeleted; + std::optional book; uint32_t currentRetries = 0; std::atomic refs = 1; @@ -1327,16 +1528,20 @@ public: uint32_t sequence, std::string&& blob, bool isCreated, - bool isDeleted) + bool isDeleted, + std::optional&& book) : backend(f) , key(std::move(key)) , sequence(sequence) , blob(std::move(blob)) , isCreated(isCreated) , isDeleted(isDeleted) + , book(std::move(book)) { if (isCreated or isDeleted) - refs = 2; + ++refs; + if (book) + ++refs; } }; @@ -1559,17 +1764,102 @@ public: } } + void + writeBook(WriteCallbackData& data, bool isRetry) const + { + { + std::unique_lock lck(throttleMutex_); + if (!isRetry && numRequestsOutstanding_ > maxRequestsOutstanding) + { + BOOST_LOG_TRIVIAL(trace) + << __func__ << " : " + << "Max outstanding requests reached. " + << "Waiting for other requests to finish"; + throttleCv_.wait(lck, [this]() { + return numRequestsOutstanding_ < maxRequestsOutstanding; + }); + } + } + CassStatement* statement = cass_prepared_bind(insertBook_); + cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); + const unsigned char* bookData = (unsigned char*)data.book->data(); + CassError rc = cass_statement_bind_bytes( + statement, + 0, + static_cast(bookData), + data.book->size()); + if (rc != CASS_OK) + { + cass_statement_free(statement); + std::stringstream ss; + ss << "Binding cassandra insert hash: " << rc << ", " + << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); + throw std::runtime_error(ss.str()); + } + rc = cass_statement_bind_int64( + statement, 1, (data.isCreated ? data.sequence : 0)); + if (rc != CASS_OK) + { + cass_statement_free(statement); + std::stringstream ss; + ss << "binding cassandra insert object: " << rc << ", " + << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); + throw std::runtime_error(ss.str()); + } + const unsigned char* keyData = (unsigned char*)data.key.data(); + rc = cass_statement_bind_bytes( + statement, + 2, + static_cast(keyData), + data.key.size()); + if (rc != CASS_OK) + { + cass_statement_free(statement); + std::stringstream ss; + ss << "Binding cassandra insert hash: " << rc << ", " + << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); + throw std::runtime_error(ss.str()); + } + rc = cass_statement_bind_int64( + statement, 3, (data.isDeleted ? data.sequence : INT64_MAX)); + if (rc != CASS_OK) + { + cass_statement_free(statement); + std::stringstream ss; + + ss << "binding cassandra insert object: " << rc << ", " + << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); + throw std::runtime_error(ss.str()); + } + CassFuture* fut = cass_session_execute(session_.get(), statement); + cass_statement_free(statement); + + cass_future_set_callback( + fut, flatMapWriteBookCallback, static_cast(&data)); + cass_future_free(fut); + } void store( std::string&& key, uint32_t seq, std::string&& blob, - bool isCreated = false, - bool isDeleted = false) const + bool isCreated, + bool isDeleted, + std::optional&& book) const { BOOST_LOG_TRIVIAL(trace) << "Writing ledger object to cassandra"; WriteCallbackData* data = new WriteCallbackData( - this, std::move(key), seq, std::move(blob), isCreated, isDeleted); + this, + std::move(key), + seq, + std::move(blob), + isCreated, + isDeleted, + std::move(book)); ++numRequestsOutstanding_; if (isCreated || isDeleted) @@ -1577,6 +1867,9 @@ public: write(*data, false); if (isCreated || isDeleted) writeKey(*data, false); + if (book) + writeBook(*data, false); + // handle book } struct WriteTransactionCallbackData @@ -1722,6 +2015,8 @@ public: flatMapWriteKeyCallback(CassFuture* fut, void* cbData); friend void flatMapWriteTransactionCallback(CassFuture* fut, void* cbData); + friend void + flatMapWriteBookCallback(CassFuture* fut, void* cbData); friend void flatMapReadCallback(CassFuture* fut, void* cbData); diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp index 7091de84..9549eadb 100644 --- a/reporting/ReportingETL.cpp +++ b/reporting/ReportingETL.cpp @@ -17,6 +17,7 @@ */ //============================================================================== +#include #include #include @@ -293,13 +294,26 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) obj.mod_type() == org ::xrpl::rpc::v1::RawLedgerObject::DELETED) isDeleted = true; + std::optional bookDir; + if (obj.mod_type() != org::xrpl::rpc::v1::RawLedgerObject::DELETED) + { + if (isOffer(obj.data())) + bookDir = getBook(obj.data()); + } + else if (obj.book_of_deleted_offer().size()) + { + bookDir = + ripple::uint256::fromVoid(obj.book_of_deleted_offer().data()); + } + assert(not(isCreated and isDeleted)); flatMapBackend_.store( std::move(*obj.mutable_key()), lgrInfo.seq, std::move(*obj.mutable_data()), isCreated, - isDeleted); + isDeleted, + std::move(bookDir)); } flatMapBackend_.sync(); BOOST_LOG_TRIVIAL(debug) diff --git a/rippled b/rippled index 063363ff..e8b8ff37 160000 --- a/rippled +++ b/rippled @@ -1 +1 @@ -Subproject commit 063363ffae64d19e63602d892ceeef576986c1e9 +Subproject commit e8b8ff37174f29340a6b4c346716ea7f7351ebbf