diff --git a/handlers/BookOffers.cpp b/handlers/BookOffers.cpp index 4997d8c3..e3e4bbe4 100644 --- a/handlers/BookOffers.cpp +++ b/handlers/BookOffers.cpp @@ -93,6 +93,7 @@ doBookOffers( CassandraFlatMapBackend const& backend, std::shared_ptr& pool) { + std::cout << "enter" << std::endl; boost::json::object response; auto sequence = ledgerSequenceFromRequest(request, pool); @@ -292,7 +293,8 @@ doBookOffers( } std::uint32_t limit = 2048; - if (request.at("limit").kind() == boost::json::kind::int64) + if (request.contains("limit") and + request.at("limit").kind() == boost::json::kind::int64) limit = request.at("limit").as_int64(); ripple::Book book = { diff --git a/reporting/DBHelpers.h b/reporting/DBHelpers.h index 3a854c17..4beabff4 100644 --- a/reporting/DBHelpers.h +++ b/reporting/DBHelpers.h @@ -60,7 +60,12 @@ getBook(std::string const& offer) { ripple::SerialIter it{offer.data(), offer.size()}; ripple::SLE sle{it, {}}; - return sle.getFieldH256(ripple::sfBookDirectory); + ripple::uint256 book = sle.getFieldH256(ripple::sfBookDirectory); + for (size_t i = 0; i < 8; ++i) + { + book.data()[book.size() - 1 - i] = 0x00; + } + return book; } /// Write new ledger and transaction data to Postgres diff --git a/reporting/ETLSource.cpp b/reporting/ETLSource.cpp index f36ae874..55b084d6 100644 --- a/reporting/ETLSource.cpp +++ b/reporting/ETLSource.cpp @@ -452,6 +452,10 @@ public: ripple::SerialIter it{obj.data().data(), obj.data().size()}; ripple::SLE sle{it, {}}; book = sle.getFieldH256(ripple::sfBookDirectory); + for (size_t i = 0; i < 8; ++i) + { + book->data()[book->size() - 1 - i] = 0x00; + } } backend.store( std::move(*obj.mutable_key()), @@ -571,8 +575,7 @@ ETLSource::fetchLedger(uint32_t ledgerSequence, bool getObjects) << "ETLSource::fetchLedger - is_unlimited is " "false. Make sure secure_gateway is set " "correctly on the ETL source. source = " - << toString() << " response = " << response.DebugString() - << " status = " << status.error_message(); + << toString() << " status = " << status.error_message(); assert(false); } return {status, std::move(response)}; diff --git a/reporting/ReportingBackend.cpp b/reporting/ReportingBackend.cpp index d1082722..dea9b238 100644 --- a/reporting/ReportingBackend.cpp +++ b/reporting/ReportingBackend.cpp @@ -321,8 +321,10 @@ flatMapReadObjectCallback(CassFuture* fut, void* cbData) if (!row) { cass_result_free(res); - BOOST_LOG_TRIVIAL(error) << "Cassandra fetch get row error : " << rc - << ", " << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(error) + << "Cassandra fetch get row error : " << rc << ", " + << cass_error_desc(rc) + << " key = " << ripple::strHex(requestParams.key); finish(); return; } diff --git a/reporting/ReportingBackend.h b/reporting/ReportingBackend.h index 60f6a0ce..d0e30a81 100644 --- a/reporting/ReportingBackend.h +++ b/reporting/ReportingBackend.h @@ -94,6 +94,7 @@ private: const CassPrepared* getCreated_ = nullptr; const CassPrepared* getBook_ = nullptr; const CassPrepared* insertBook_ = nullptr; + const CassPrepared* deleteBook_ = nullptr; // io_context used for exponential backoff for write retries mutable boost::asio::io_context ioContext_; @@ -485,8 +486,8 @@ public: query = {}; query << "CREATE TABLE IF NOT EXISTS " << tableName << "books" << " ( book blob, sequence bigint, key blob, deleted_at " - "bigint static, PRIMARY KEY " - "(book, sequence, key))"; + "bigint, PRIMARY KEY " + "(book, key))"; statement = makeStatement(query.str().c_str(), 0); fut = cass_session_execute(session_.get(), statement); rc = cass_future_error_code(fut); @@ -618,7 +619,7 @@ public: query = {}; query << "INSERT INTO " << tableName << "books" - << " (book, sequence, key, deleted_at) VALUES (?, ?, ?, ?)"; + << " (book, key, sequence, deleted_at) VALUES (?, ?, ?, ?)"; prepare_future = cass_session_prepare(session_.get(), query.str().c_str()); @@ -640,6 +641,31 @@ public: /* Get the prepared object from the future */ insertBook_ = cass_future_get_prepared(prepare_future); cass_future_free(prepare_future); + query = {}; + + query << "INSERT INTO " << tableName << "books" + << " (book, key, deleted_at) VALUES (?, ?, ?)"; + prepare_future = + cass_session_prepare(session_.get(), query.str().c_str()); + + /* Wait for the statement to prepare and get the result */ + rc = cass_future_error_code(prepare_future); + + if (rc != CASS_OK) + { + /* Handle error */ + cass_future_free(prepare_future); + + std::stringstream ss; + ss << "nodestore: Error preparing insert : " << rc << ", " + << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(error) << ss.str(); + continue; + } + + /* Get the prepared object from the future */ + deleteBook_ = cass_future_get_prepared(prepare_future); + cass_future_free(prepare_future); query = {}; query << "SELECT created FROM " << tableName << "keys" @@ -873,6 +899,16 @@ public: cass_prepared_free(getBook_); getBook_ = nullptr; } + if (insertBook_) + { + cass_prepared_free(insertBook_); + insertBook_ = nullptr; + } + if (deleteBook_) + { + cass_prepared_free(deleteBook_); + deleteBook_ = nullptr; + } work_.reset(); ioThread_.join(); } @@ -1209,7 +1245,7 @@ public: doBookOffers(ripple::uint256 const& book, uint32_t sequence) const { BOOST_LOG_TRIVIAL(debug) << "Starting doBookOffers"; - CassStatement* statement = cass_prepared_bind(upperBound_); + CassStatement* statement = cass_prepared_bind(getBook_); cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); CassError rc = cass_statement_bind_bytes( statement, 0, static_cast(book.data()), 32); @@ -1284,9 +1320,10 @@ public: BOOST_LOG_TRIVIAL(warning) << ss.str(); } keys.push_back(ripple::uint256::fromVoid(outData)); + std::cout << ripple::strHex(keys.back()) << std::endl; } BOOST_LOG_TRIVIAL(debug) - << "doUpperBound - populated keys. num keys = " << keys.size(); + << "doBookOffers - populated keys. num keys = " << keys.size(); if (keys.size()) { std::vector results; @@ -1766,6 +1803,10 @@ public: void writeBook(WriteCallbackData& data, bool isRetry) const { + assert(data.isCreated or data.isDeleted); + if (!data.isCreated and !data.isDeleted) + throw std::runtime_error( + "writing book that's neither created or deleted"); { std::unique_lock lck(throttleMutex_); if (!isRetry && numRequestsOutstanding_ > maxRequestsOutstanding) @@ -1779,7 +1820,11 @@ public: }); } } - CassStatement* statement = cass_prepared_bind(insertBook_); + CassStatement* statement = nullptr; + if (data.isCreated) + statement = cass_prepared_bind(insertBook_); + else if (data.isDeleted) + statement = cass_prepared_bind(deleteBook_); cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); const unsigned char* bookData = (unsigned char*)data.book->data(); CassError rc = cass_statement_bind_bytes( @@ -1796,21 +1841,10 @@ public: BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); throw std::runtime_error(ss.str()); } - rc = cass_statement_bind_int64( - statement, 1, (data.isCreated ? data.sequence : 0)); - if (rc != CASS_OK) - { - cass_statement_free(statement); - std::stringstream ss; - ss << "binding cassandra insert object: " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); - throw std::runtime_error(ss.str()); - } const unsigned char* keyData = (unsigned char*)data.key.data(); rc = cass_statement_bind_bytes( statement, - 2, + 1, static_cast(keyData), data.key.size()); if (rc != CASS_OK) @@ -1822,18 +1856,30 @@ public: BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); throw std::runtime_error(ss.str()); } - rc = cass_statement_bind_int64( - statement, 3, (data.isDeleted ? data.sequence : INT64_MAX)); + rc = cass_statement_bind_int64(statement, 2, data.sequence); if (rc != CASS_OK) { cass_statement_free(statement); std::stringstream ss; - ss << "binding cassandra insert object: " << rc << ", " << cass_error_desc(rc); BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); throw std::runtime_error(ss.str()); } + if (data.isCreated) + { + rc = cass_statement_bind_int64(statement, 3, INT64_MAX); + if (rc != CASS_OK) + { + cass_statement_free(statement); + std::stringstream ss; + + ss << "binding cassandra insert object: " << rc << ", " + << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); + throw std::runtime_error(ss.str()); + } + } CassFuture* fut = cass_session_execute(session_.get(), statement); cass_statement_free(statement); @@ -1850,7 +1896,7 @@ public: bool isDeleted, std::optional&& book) const { - BOOST_LOG_TRIVIAL(info) << "Writing ledger object to cassandra"; + BOOST_LOG_TRIVIAL(trace) << "Writing ledger object to cassandra"; WriteCallbackData* data = new WriteCallbackData( this, std::move(key), diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp index 9549eadb..b095e68d 100644 --- a/reporting/ReportingETL.cpp +++ b/reporting/ReportingETL.cpp @@ -295,7 +295,7 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) isDeleted = true; std::optional bookDir; - if (obj.mod_type() != org::xrpl::rpc::v1::RawLedgerObject::DELETED) + if (isCreated) { if (isOffer(obj.data())) bookDir = getBook(obj.data()); @@ -304,6 +304,10 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) { bookDir = ripple::uint256::fromVoid(obj.book_of_deleted_offer().data()); + for (size_t i = 0; i < 8; ++i) + { + bookDir->data()[bookDir->size() - 1 - i] = 0x00; + } } assert(not(isCreated and isDeleted)); @@ -335,6 +339,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence) * Behold, mortals! This function spawns three separate threads, which talk * to each other via 2 different thread safe queues and 1 atomic variable. * All threads and queues are function local. This function returns when all + * * of the threads exit. There are two termination conditions: the first is * if the load thread encounters a write conflict. In this case, the load * thread sets writeConflict, an atomic bool, to true, which signals the diff --git a/test.py b/test.py index ea6af7b5..99cbc229 100755 --- a/test.py +++ b/test.py @@ -93,10 +93,10 @@ async def book_offers(ip, port, ledger, pay_currency, pay_issuer, get_currency, try: async with websockets.connect(address) as ws: - taker_gets = json.loads("{\"currency\":" + get_currency+"}") + taker_gets = json.loads("{\"currency\":\"" + get_currency+"\"}") if get_issuer is not None: taker_gets["issuer"] = get_issuer - taker_pays = json.loads("{\"currency\":" + pay_currency + "}") + taker_pays = json.loads("{\"currency\":\"" + pay_currency + "\"}") if pay_issuer is not None: taker_pays["issuer"] = pay_issuer @@ -116,9 +116,9 @@ parser.add_argument('--hash') parser.add_argument('--account', default="rLC64xxNif3GiY9FQnbaM4kcE6VvDhwRod") parser.add_argument('--ledger') parser.add_argument('--limit', default='200') -paresr.add_argument('--taker_pays_issuer') +parser.add_argument('--taker_pays_issuer') parser.add_argument('--taker_pays_currency') -paresr.add_argument('--taker_gets_issuer') +parser.add_argument('--taker_gets_issuer') parser.add_argument('--taker_gets_currency') @@ -145,7 +145,7 @@ def run(args): ledger_data_full(args.ip, args.port, args.ledger)) elif args.action == "book_offers": asyncio.get_event_loop().run_until_complete( - book_offers(args.ip, args.port, args.ledger, args.taker_pays_currency, args.taker_pays_issuer, args.taker_gets_currency, args.taker_gets_issuer); + book_offers(args.ip, args.port, args.ledger, args.taker_pays_currency, args.taker_pays_issuer, args.taker_gets_currency, args.taker_gets_issuer)) else: print("incorrect arguments")