diff --git a/reporting/ETLSource.cpp b/reporting/ETLSource.cpp index 214affcc..536ac423 100644 --- a/reporting/ETLSource.cpp +++ b/reporting/ETLSource.cpp @@ -444,7 +444,9 @@ public: backend.store( std::move(*obj.mutable_key()), request_.ledger().sequence(), - std::move(*obj.mutable_data())); + std::move(*obj.mutable_data()), + true, + false); } return more ? CallStatus::MORE : CallStatus::DONE; diff --git a/reporting/ReportingBackend.cpp b/reporting/ReportingBackend.cpp index e4722d5b..b9a4c2da 100644 --- a/reporting/ReportingBackend.cpp +++ b/reporting/ReportingBackend.cpp @@ -36,6 +36,104 @@ flatMapWriteCallback(CassFuture* fut, void* cbData) delete &requestParams; } } + +void +flatMapWriteKeyCallback(CassFuture* fut, void* cbData) +{ + CassandraFlatMapBackend::WriteCallbackData& requestParams = + *static_cast(cbData); + CassandraFlatMapBackend const& backend = *requestParams.backend; + auto rc = cass_future_error_code(fut); + if (rc != CASS_OK) + { + BOOST_LOG_TRIVIAL(error) + << "ERROR!!! Cassandra insert error: " << rc << ", " + << cass_error_desc(rc) << ", retrying "; + // exponential backoff with a max wait of 2^10 ms (about 1 second) + auto wait = std::chrono::milliseconds( + lround(std::pow(2, std::min(10u, requestParams.currentRetries)))); + ++requestParams.currentRetries; + std::shared_ptr timer = + std::make_shared( + backend.ioContext_, std::chrono::steady_clock::now() + wait); + timer->async_wait([timer, &requestParams, &backend]( + const boost::system::error_code& error) { + if (requestParams.isDeleted) + backend.writeDeletedKey(requestParams, true); + else + backend.writeKey(requestParams, true); + }); + } + else + { + --(backend.numRequestsOutstanding_); + + backend.throttleCv_.notify_all(); + if (backend.numRequestsOutstanding_ == 0) + backend.syncCv_.notify_all(); + delete &requestParams; + } +} +void +flatMapGetCreatedCallback(CassFuture* fut, void* cbData) +{ + CassandraFlatMapBackend::WriteCallbackData& requestParams = + *static_cast(cbData); + CassandraFlatMapBackend const& backend = *requestParams.backend; + auto rc = cass_future_error_code(fut); + if (rc != CASS_OK) + { + BOOST_LOG_TRIVIAL(error) + << "ERROR!!! Cassandra insert error: " << rc << ", " + << cass_error_desc(rc) << ", retrying "; + // exponential backoff with a max wait of 2^10 ms (about 1 second) + auto wait = std::chrono::milliseconds( + lround(std::pow(2, std::min(10u, requestParams.currentRetries)))); + ++requestParams.currentRetries; + std::shared_ptr timer = + std::make_shared( + backend.ioContext_, std::chrono::steady_clock::now() + wait); + timer->async_wait([timer, &requestParams, &backend]( + const boost::system::error_code& error) { + backend.writeKey(requestParams, true); + }); + } + else + { + auto finish = [&backend]() { + --(backend.numRequestsOutstanding_); + + backend.throttleCv_.notify_all(); + if (backend.numRequestsOutstanding_ == 0) + backend.syncCv_.notify_all(); + }; + CassResult const* res = cass_future_get_result(fut); + + CassRow const* row = cass_result_first_row(res); + if (!row) + { + cass_result_free(res); + BOOST_LOG_TRIVIAL(error) << "Cassandra fetch get row error : " << rc + << ", " << cass_error_desc(rc); + finish(); + return; + } + cass_int64_t created; + rc = cass_value_get_int64(cass_row_get_column(row, 0), &created); + if (rc != CASS_OK) + { + cass_result_free(res); + BOOST_LOG_TRIVIAL(error) + << "Cassandra fetch get bytes error : " << rc << ", " + << cass_error_desc(rc); + finish(); + return; + } + cass_result_free(res); + requestParams.createdSequence = created; + backend.writeDeletedKey(requestParams, false); + } +} void flatMapWriteTransactionCallback(CassFuture* fut, void* cbData) { @@ -145,3 +243,63 @@ flatMapReadCallback(CassFuture* fut, void* cbData) finish(); } } + +// Process the result of an asynchronous read. Retry on error +// @param fut cassandra future associated with the read +// @param cbData struct that holds the request parameters +void +flatMapReadObjectCallback(CassFuture* fut, void* cbData) +{ + CassandraFlatMapBackend::ReadObjectCallbackData& requestParams = + *static_cast(cbData); + + CassError rc = cass_future_error_code(fut); + + if (rc != CASS_OK) + { + BOOST_LOG_TRIVIAL(warning) << "Cassandra fetch error : " << rc << " : " + << cass_error_desc(rc) << " - retrying"; + // Retry right away. The only time the cluster should ever be overloaded + // is when the very first ledger is being written in full (millions of + // writes at once), during which no reads should be occurring. If reads + // are timing out, the code/architecture should be modified to handle + // greater read load, as opposed to just exponential backoff + requestParams.backend.readObject(requestParams); + } + else + { + auto finish = [&requestParams]() { + size_t batchSize = requestParams.batchSize; + if (++(requestParams.numFinished) == batchSize) + requestParams.cv.notify_all(); + }; + CassResult const* res = cass_future_get_result(fut); + + CassRow const* row = cass_result_first_row(res); + if (!row) + { + cass_result_free(res); + BOOST_LOG_TRIVIAL(error) << "Cassandra fetch get row error : " << rc + << ", " << cass_error_desc(rc); + finish(); + return; + } + cass_byte_t const* buf; + std::size_t bufSize; + rc = cass_value_get_bytes(cass_row_get_column(row, 0), &buf, &bufSize); + if (rc != CASS_OK) + { + cass_result_free(res); + BOOST_LOG_TRIVIAL(error) + << "Cassandra fetch get bytes error : " << rc << ", " + << cass_error_desc(rc); + finish(); + return; + } + + std::vector obj{buf, buf + bufSize}; + requestParams.result = std::move(obj); + cass_result_free(res); + finish(); + } +} diff --git a/reporting/ReportingBackend.h b/reporting/ReportingBackend.h index e687c94f..801fee18 100644 --- a/reporting/ReportingBackend.h +++ b/reporting/ReportingBackend.h @@ -35,9 +35,15 @@ void flatMapWriteCallback(CassFuture* fut, void* cbData); void +flatMapWriteKeyCallback(CassFuture* fut, void* cbData); +void flatMapWriteTransactionCallback(CassFuture* fut, void* cbData); void flatMapReadCallback(CassFuture* fut, void* cbData); +void +flatMapReadObjectCallback(CassFuture* fut, void* cbData); +void +flatMapGetCreatedCallback(CassFuture* fut, void* cbData); class CassandraFlatMapBackend { private: @@ -82,6 +88,8 @@ private: const CassPrepared* selectObject_ = nullptr; const CassPrepared* upperBound_ = nullptr; const CassPrepared* getToken_ = nullptr; + const CassPrepared* insertKey_ = nullptr; + const CassPrepared* getCreated_ = nullptr; // io_context used for exponential backoff for write retries mutable boost::asio::io_context ioContext_; @@ -425,6 +433,51 @@ public: continue; } } + query = {}; + query << "CREATE TABLE IF NOT EXISTS " << tableName << "keys" + << " ( key blob, created bigint, deleted bigint, PRIMARY KEY " + "(key, created)) with clustering order by (created " + "desc) "; + statement = makeStatement(query.str().c_str(), 0); + fut = cass_session_execute(session_.get(), statement); + rc = cass_future_error_code(fut); + cass_future_free(fut); + cass_statement_free(statement); + if (rc != CASS_OK && rc != CASS_ERROR_SERVER_INVALID_QUERY) + { + std::stringstream ss; + ss << "nodestore: Error creating Cassandra table: " << rc + << ", " << cass_error_desc(rc) << " - " << query.str(); + BOOST_LOG_TRIVIAL(error) << ss.str(); + continue; + } + + query = {}; + query << "SELECT * FROM " << tableName << "keys" + << " LIMIT 1"; + statement = makeStatement(query.str().c_str(), 0); + fut = cass_session_execute(session_.get(), statement); + rc = cass_future_error_code(fut); + cass_future_free(fut); + cass_statement_free(statement); + if (rc != CASS_OK) + { + if (rc == CASS_ERROR_SERVER_INVALID_QUERY) + { + BOOST_LOG_TRIVIAL(warning) + << "table not here yet, sleeping 1s to " + "see if table creation propagates"; + continue; + } + else + { + std::stringstream ss; + ss << "nodestore: Error checking for table: " << rc << ", " + << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(error) << ss.str(); + continue; + } + } setupSessionAndTable = true; } @@ -489,6 +542,56 @@ public: insertTransaction_ = cass_future_get_prepared(prepare_future); cass_future_free(prepare_future); + query = {}; + query << "INSERT INTO " << tableName << "keys" + << " (key, created, deleted) VALUES (?, ?, ?)"; + prepare_future = + cass_session_prepare(session_.get(), query.str().c_str()); + + /* Wait for the statement to prepare and get the result */ + rc = cass_future_error_code(prepare_future); + + if (rc != CASS_OK) + { + /* Handle error */ + cass_future_free(prepare_future); + + std::stringstream ss; + ss << "nodestore: Error preparing insert : " << rc << ", " + << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(error) << ss.str(); + continue; + } + + /* Get the prepared object from the future */ + insertKey_ = cass_future_get_prepared(prepare_future); + cass_future_free(prepare_future); + + query = {}; + query << "SELECT created FROM " << tableName << "keys" + << " WHERE key = ? ORDER BY created desc LIMIT 1"; + prepare_future = + cass_session_prepare(session_.get(), query.str().c_str()); + + /* Wait for the statement to prepare and get the result */ + rc = cass_future_error_code(prepare_future); + + if (rc != CASS_OK) + { + /* Handle error */ + cass_future_free(prepare_future); + + std::stringstream ss; + ss << "nodestore: Error preparing insert : " << rc << ", " + << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(error) << ss.str(); + continue; + } + + /* Get the prepared object from the future */ + getCreated_ = cass_future_get_prepared(prepare_future); + cass_future_free(prepare_future); + query = {}; query << "SELECT object, sequence FROM " << tableName << "flat" << " WHERE key = ? AND sequence <= ? ORDER BY sequence DESC " @@ -550,9 +653,9 @@ public: cass_future_free(prepare_future); query = {}; - query << "SELECT key, object FROM " << tableName << "flat " - << " WHERE TOKEN(key) >= ? and sequence <= ?" - << " and object > 0x" + query << "SELECT key FROM " << tableName << "keys " + << " WHERE TOKEN(key) >= ? and created <= ?" + << " and deleted > ?" << " PER PARTITION LIMIT 1 LIMIT ?" << " ALLOW FILTERING"; @@ -636,6 +739,11 @@ public: cass_prepared_free(insertObject_); insertObject_ = nullptr; } + if (insertKey_) + { + cass_prepared_free(insertKey_); + insertKey_ = nullptr; + } if (selectTransaction_) { cass_prepared_free(selectTransaction_); @@ -656,6 +764,11 @@ public: cass_prepared_free(getToken_); getToken_ = nullptr; } + if (getCreated_) + { + cass_prepared_free(getCreated_); + getCreated_ = nullptr; + } work_.reset(); ioThread_.join(); } @@ -906,8 +1019,17 @@ public: << cass_error_desc(rc); return {}; } + rc = cass_statement_bind_int64(statement, 2, seq); + if (rc != CASS_OK) + { + cass_statement_free(statement); + BOOST_LOG_TRIVIAL(error) + << "Binding Cassandra seq to doUpperBound query: " << rc << ", " + << cass_error_desc(rc); + return {}; + } - rc = cass_statement_bind_int32(statement, 2, limit + 1); + rc = cass_statement_bind_int32(statement, 3, limit); if (rc != CASS_OK) { cass_statement_free(statement); @@ -936,7 +1058,7 @@ public: cass_statement_free(statement); cass_future_free(fut); - std::vector result; + std::vector keys; CassIterator* iter = cass_iterator_from_result(res); while (cass_iterator_next(iter)) @@ -957,35 +1079,22 @@ public: ss << ": " << cass_error_desc(rc); BOOST_LOG_TRIVIAL(warning) << ss.str(); } - ripple::uint256 resultHash = ripple::uint256::fromVoid(outData); - - CassValue const* entry = cass_row_get_column(row, 1); - rc = cass_value_get_bytes(entry, &outData, &outSize); - if (rc != CASS_OK) - { - cass_iterator_free(iter); - - std::stringstream ss; - ss << "Cassandra fetch error"; - ss << ": " << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(warning) << ss.str(); - } - if (outSize > 0) - { - std::vector resultBlob{ - outData, outData + outSize}; - - result.push_back({resultHash, resultBlob}); - } + keys.push_back(ripple::uint256::fromVoid(outData)); } - if (result.size()) + if (keys.size()) { - auto token = getToken(result[result.size() - 1].key.data()); + std::vector results; + std::vector objs = fetchObjectsBatch(keys, seq); + for (size_t i = 0; i < objs.size(); ++i) + { + results.push_back({keys[i], objs[i]}); + } + auto token = getToken(results[results.size() - 1].key.data()); assert(token); - return {result, token}; + return std::make_pair(results, token); } - return {result, {}}; + return {{}, {}}; } bool @@ -1086,6 +1195,112 @@ public: cass_future_free(fut); } + struct ReadObjectCallbackData + { + CassandraFlatMapBackend const& backend; + ripple::uint256 const& key; + uint32_t sequence; + Blob& result; + std::condition_variable& cv; + + std::atomic_uint32_t& numFinished; + size_t batchSize; + + ReadObjectCallbackData( + CassandraFlatMapBackend const& backend, + ripple::uint256 const& key, + uint32_t sequence, + Blob& result, + std::condition_variable& cv, + std::atomic_uint32_t& numFinished, + size_t batchSize) + : backend(backend) + , key(key) + , sequence(sequence) + , result(result) + , cv(cv) + , numFinished(numFinished) + , batchSize(batchSize) + { + } + + ReadObjectCallbackData(ReadObjectCallbackData const& other) = default; + }; + std::vector + fetchObjectsBatch( + std::vector const& keys, + uint32_t sequence) const + { + std::size_t const numKeys = keys.size(); + BOOST_LOG_TRIVIAL(trace) + << "Fetching " << numKeys << " records from Cassandra"; + std::atomic_uint32_t numFinished = 0; + std::condition_variable cv; + std::mutex mtx; + std::vector results{numKeys}; + std::vector> cbs; + cbs.reserve(numKeys); + for (std::size_t i = 0; i < keys.size(); ++i) + { + cbs.push_back(std::make_shared( + *this, + keys[i], + sequence, + results[i], + cv, + numFinished, + numKeys)); + readObject(*cbs[i]); + } + assert(results.size() == cbs.size()); + + std::unique_lock lck(mtx); + cv.wait( + lck, [&numFinished, &numKeys]() { return numFinished == numKeys; }); + + BOOST_LOG_TRIVIAL(trace) + << "Fetched " << numKeys << " records from Cassandra"; + return results; + } + void + readObject(ReadObjectCallbackData& data) const + { + CassStatement* statement = cass_prepared_bind(selectObject_); + cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); + CassError rc = cass_statement_bind_bytes( + statement, 0, static_cast(data.key.data()), 32); + if (rc != CASS_OK) + { + size_t batchSize = data.batchSize; + if (++(data.numFinished) == batchSize) + data.cv.notify_all(); + cass_statement_free(statement); + BOOST_LOG_TRIVIAL(error) << "Binding Cassandra fetch query: " << rc + << ", " << cass_error_desc(rc); + return; + } + rc = cass_statement_bind_int64(statement, 1, data.sequence); + + if (rc != CASS_OK) + { + size_t batchSize = data.batchSize; + if (++(data.numFinished) == batchSize) + data.cv.notify_all(); + cass_statement_free(statement); + BOOST_LOG_TRIVIAL(error) << "Binding Cassandra fetch query: " << rc + << ", " << cass_error_desc(rc); + return; + } + + CassFuture* fut = cass_session_execute(session_.get(), statement); + + cass_statement_free(statement); + + cass_future_set_callback( + fut, flatMapReadObjectCallback, static_cast(&data)); + cass_future_free(fut); + } + struct WriteCallbackData { CassandraFlatMapBackend const* backend; @@ -1094,7 +1309,10 @@ public: // prematurely if other copies are removed from caches. std::string key; uint32_t sequence; + uint32_t createdSequence = 0; std::string blob; + bool isCreated; + bool isDeleted; uint32_t currentRetries = 0; @@ -1102,11 +1320,15 @@ public: CassandraFlatMapBackend const* f, std::string&& key, uint32_t sequence, - std::string&& blob) + std::string&& blob, + bool isCreated, + bool isDeleted) : backend(f) , key(std::move(key)) , sequence(sequence) , blob(std::move(blob)) + , isCreated(isCreated) + , isDeleted(isDeleted) { } }; @@ -1127,8 +1349,75 @@ public: }); } } + { + CassStatement* statement = cass_prepared_bind(insertObject_); + cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); + const unsigned char* keyData = (unsigned char*)data.key.data(); + CassError rc = cass_statement_bind_bytes( + statement, + 0, + static_cast(keyData), + data.key.size()); + if (rc != CASS_OK) + { + cass_statement_free(statement); + std::stringstream ss; + ss << "Binding cassandra insert hash: " << rc << ", " + << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); + throw std::runtime_error(ss.str()); + } + rc = cass_statement_bind_int64(statement, 1, data.sequence); + if (rc != CASS_OK) + { + cass_statement_free(statement); + std::stringstream ss; + ss << "Binding cassandra insert object: " << rc << ", " + << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); + throw std::runtime_error(ss.str()); + } + const unsigned char* blobData = (unsigned char*)data.blob.data(); + rc = cass_statement_bind_bytes( + statement, + 2, + static_cast(blobData), + data.blob.size()); + if (rc != CASS_OK) + { + cass_statement_free(statement); + std::stringstream ss; + ss << "Binding cassandra insert hash: " << rc << ", " + << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); + throw std::runtime_error(ss.str()); + } + CassFuture* fut = cass_session_execute(session_.get(), statement); + cass_statement_free(statement); - CassStatement* statement = cass_prepared_bind(insertObject_); + cass_future_set_callback( + fut, flatMapWriteCallback, static_cast(&data)); + cass_future_free(fut); + } + } + + void + writeDeletedKey(WriteCallbackData& data, bool isRetry) const + { + { + std::unique_lock lck(throttleMutex_); + if (!isRetry && numRequestsOutstanding_ > maxRequestsOutstanding) + { + BOOST_LOG_TRIVIAL(trace) + << __func__ << " : " + << "Max outstanding requests reached. " + << "Waiting for other requests to finish"; + throttleCv_.wait(lck, [this]() { + return numRequestsOutstanding_ < maxRequestsOutstanding; + }); + } + } + CassStatement* statement = cass_prepared_bind(insertKey_); cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); const unsigned char* keyData = (unsigned char*)data.key.data(); CassError rc = cass_statement_bind_bytes( @@ -1145,27 +1434,22 @@ public: BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); throw std::runtime_error(ss.str()); } - rc = cass_statement_bind_int64(statement, 1, data.sequence); + rc = cass_statement_bind_int64(statement, 1, data.createdSequence); if (rc != CASS_OK) { cass_statement_free(statement); std::stringstream ss; - ss << "Binding cassandra insert object: " << rc << ", " + ss << "binding cassandra insert object: " << rc << ", " << cass_error_desc(rc); BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); throw std::runtime_error(ss.str()); } - const unsigned char* blobData = (unsigned char*)data.blob.data(); - rc = cass_statement_bind_bytes( - statement, - 2, - static_cast(blobData), - data.blob.size()); + rc = cass_statement_bind_int64(statement, 2, data.sequence); if (rc != CASS_OK) { cass_statement_free(statement); std::stringstream ss; - ss << "Binding cassandra insert hash: " << rc << ", " + ss << "binding cassandra insert object: " << rc << ", " << cass_error_desc(rc); BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); throw std::runtime_error(ss.str()); @@ -1174,19 +1458,118 @@ public: cass_statement_free(statement); cass_future_set_callback( - fut, flatMapWriteCallback, static_cast(&data)); + fut, flatMapWriteKeyCallback, static_cast(&data)); cass_future_free(fut); } void - store(std::string&& key, uint32_t seq, std::string&& blob) const + writeKey(WriteCallbackData& data, bool isRetry) const + { + { + std::unique_lock lck(throttleMutex_); + if (!isRetry && numRequestsOutstanding_ > maxRequestsOutstanding) + { + BOOST_LOG_TRIVIAL(trace) + << __func__ << " : " + << "Max outstanding requests reached. " + << "Waiting for other requests to finish"; + throttleCv_.wait(lck, [this]() { + return numRequestsOutstanding_ < maxRequestsOutstanding; + }); + } + } + if (data.isCreated) + { + CassStatement* statement = cass_prepared_bind(insertKey_); + cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); + const unsigned char* keyData = (unsigned char*)data.key.data(); + CassError rc = cass_statement_bind_bytes( + statement, + 0, + static_cast(keyData), + data.key.size()); + if (rc != CASS_OK) + { + cass_statement_free(statement); + std::stringstream ss; + ss << "Binding cassandra insert hash: " << rc << ", " + << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); + throw std::runtime_error(ss.str()); + } + rc = cass_statement_bind_int64(statement, 1, data.sequence); + if (rc != CASS_OK) + { + cass_statement_free(statement); + std::stringstream ss; + ss << "binding cassandra insert object: " << rc << ", " + << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); + throw std::runtime_error(ss.str()); + } + rc = cass_statement_bind_int64(statement, 2, INT64_MAX); + if (rc != CASS_OK) + { + cass_statement_free(statement); + std::stringstream ss; + ss << "binding cassandra insert object: " << rc << ", " + << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); + throw std::runtime_error(ss.str()); + } + CassFuture* fut = cass_session_execute(session_.get(), statement); + cass_statement_free(statement); + + cass_future_set_callback( + fut, flatMapWriteKeyCallback, static_cast(&data)); + cass_future_free(fut); + } + else if (data.isDeleted) + { + CassStatement* statement = cass_prepared_bind(getCreated_); + cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); + const unsigned char* keyData = (unsigned char*)data.key.data(); + CassError rc = cass_statement_bind_bytes( + statement, + 0, + static_cast(keyData), + data.key.size()); + if (rc != CASS_OK) + { + cass_statement_free(statement); + std::stringstream ss; + ss << "Binding cassandra insert hash: " << rc << ", " + << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); + throw std::runtime_error(ss.str()); + } + CassFuture* fut = cass_session_execute(session_.get(), statement); + cass_statement_free(statement); + + cass_future_set_callback( + fut, flatMapGetCreatedCallback, static_cast(&data)); + cass_future_free(fut); + } + } + + void + store( + std::string&& key, + uint32_t seq, + std::string&& blob, + bool isCreated = false, + bool isDeleted = false) const { BOOST_LOG_TRIVIAL(trace) << "Writing ledger object to cassandra"; - WriteCallbackData* data = - new WriteCallbackData(this, std::move(key), seq, std::move(blob)); + WriteCallbackData* data = new WriteCallbackData( + this, std::move(key), seq, std::move(blob), isCreated, isDeleted); ++numRequestsOutstanding_; + if (isCreated || isDeleted) + ++numRequestsOutstanding_; write(*data, false); + if (isCreated || isDeleted) + writeKey(*data, false); } struct WriteTransactionCallbackData @@ -1329,10 +1712,16 @@ public: friend void flatMapWriteCallback(CassFuture* fut, void* cbData); friend void + flatMapWriteKeyCallback(CassFuture* fut, void* cbData); + friend void flatMapWriteTransactionCallback(CassFuture* fut, void* cbData); friend void flatMapReadCallback(CassFuture* fut, void* cbData); + friend void + flatMapReadObjectCallback(CassFuture* fut, void* cbData); + friend void + flatMapGetCreatedCallback(CassFuture* fut, void* cbData); }; #endif diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp index d6f5055a..7091de84 100644 --- a/reporting/ReportingETL.cpp +++ b/reporting/ReportingETL.cpp @@ -285,10 +285,21 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) for (auto& obj : *(rawData.mutable_ledger_objects()->mutable_objects())) { + bool isCreated = false; + bool isDeleted = false; + if (obj.mod_type() == org::xrpl::rpc::v1::RawLedgerObject::CREATED) + isCreated = true; + else if ( + obj.mod_type() == org ::xrpl::rpc::v1::RawLedgerObject::DELETED) + isDeleted = true; + + assert(not(isCreated and isDeleted)); flatMapBackend_.store( std::move(*obj.mutable_key()), lgrInfo.seq, - std::move(*obj.mutable_data())); + std::move(*obj.mutable_data()), + isCreated, + isDeleted); } flatMapBackend_.sync(); BOOST_LOG_TRIVIAL(debug) diff --git a/rippled b/rippled index 2978847d..063363ff 160000 --- a/rippled +++ b/rippled @@ -1 +1 @@ -Subproject commit 2978847d8d96c6ed693c83d49b8ef2cce794c7ff +Subproject commit 063363ffae64d19e63602d892ceeef576986c1e9