keys table, new ledger_data algorithm

This commit is contained in:
CJ Cobb
2021-01-20 14:39:32 -05:00
parent e8332b17dc
commit 176f91e15c
5 changed files with 607 additions and 47 deletions

View File

@@ -444,7 +444,9 @@ public:
backend.store( backend.store(
std::move(*obj.mutable_key()), std::move(*obj.mutable_key()),
request_.ledger().sequence(), request_.ledger().sequence(),
std::move(*obj.mutable_data())); std::move(*obj.mutable_data()),
true,
false);
} }
return more ? CallStatus::MORE : CallStatus::DONE; return more ? CallStatus::MORE : CallStatus::DONE;

View File

@@ -36,6 +36,104 @@ flatMapWriteCallback(CassFuture* fut, void* cbData)
delete &requestParams; delete &requestParams;
} }
} }
void
flatMapWriteKeyCallback(CassFuture* fut, void* cbData)
{
CassandraFlatMapBackend::WriteCallbackData& requestParams =
*static_cast<CassandraFlatMapBackend::WriteCallbackData*>(cbData);
CassandraFlatMapBackend const& backend = *requestParams.backend;
auto rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
BOOST_LOG_TRIVIAL(error)
<< "ERROR!!! Cassandra insert error: " << rc << ", "
<< cass_error_desc(rc) << ", retrying ";
// exponential backoff with a max wait of 2^10 ms (about 1 second)
auto wait = std::chrono::milliseconds(
lround(std::pow(2, std::min(10u, requestParams.currentRetries))));
++requestParams.currentRetries;
std::shared_ptr<boost::asio::steady_timer> timer =
std::make_shared<boost::asio::steady_timer>(
backend.ioContext_, std::chrono::steady_clock::now() + wait);
timer->async_wait([timer, &requestParams, &backend](
const boost::system::error_code& error) {
if (requestParams.isDeleted)
backend.writeDeletedKey(requestParams, true);
else
backend.writeKey(requestParams, true);
});
}
else
{
--(backend.numRequestsOutstanding_);
backend.throttleCv_.notify_all();
if (backend.numRequestsOutstanding_ == 0)
backend.syncCv_.notify_all();
delete &requestParams;
}
}
void
flatMapGetCreatedCallback(CassFuture* fut, void* cbData)
{
CassandraFlatMapBackend::WriteCallbackData& requestParams =
*static_cast<CassandraFlatMapBackend::WriteCallbackData*>(cbData);
CassandraFlatMapBackend const& backend = *requestParams.backend;
auto rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
BOOST_LOG_TRIVIAL(error)
<< "ERROR!!! Cassandra insert error: " << rc << ", "
<< cass_error_desc(rc) << ", retrying ";
// exponential backoff with a max wait of 2^10 ms (about 1 second)
auto wait = std::chrono::milliseconds(
lround(std::pow(2, std::min(10u, requestParams.currentRetries))));
++requestParams.currentRetries;
std::shared_ptr<boost::asio::steady_timer> timer =
std::make_shared<boost::asio::steady_timer>(
backend.ioContext_, std::chrono::steady_clock::now() + wait);
timer->async_wait([timer, &requestParams, &backend](
const boost::system::error_code& error) {
backend.writeKey(requestParams, true);
});
}
else
{
auto finish = [&backend]() {
--(backend.numRequestsOutstanding_);
backend.throttleCv_.notify_all();
if (backend.numRequestsOutstanding_ == 0)
backend.syncCv_.notify_all();
};
CassResult const* res = cass_future_get_result(fut);
CassRow const* row = cass_result_first_row(res);
if (!row)
{
cass_result_free(res);
BOOST_LOG_TRIVIAL(error) << "Cassandra fetch get row error : " << rc
<< ", " << cass_error_desc(rc);
finish();
return;
}
cass_int64_t created;
rc = cass_value_get_int64(cass_row_get_column(row, 0), &created);
if (rc != CASS_OK)
{
cass_result_free(res);
BOOST_LOG_TRIVIAL(error)
<< "Cassandra fetch get bytes error : " << rc << ", "
<< cass_error_desc(rc);
finish();
return;
}
cass_result_free(res);
requestParams.createdSequence = created;
backend.writeDeletedKey(requestParams, false);
}
}
void void
flatMapWriteTransactionCallback(CassFuture* fut, void* cbData) flatMapWriteTransactionCallback(CassFuture* fut, void* cbData)
{ {
@@ -145,3 +243,63 @@ flatMapReadCallback(CassFuture* fut, void* cbData)
finish(); finish();
} }
} }
// Process the result of an asynchronous read. Retry on error
// @param fut cassandra future associated with the read
// @param cbData struct that holds the request parameters
void
flatMapReadObjectCallback(CassFuture* fut, void* cbData)
{
CassandraFlatMapBackend::ReadObjectCallbackData& requestParams =
*static_cast<CassandraFlatMapBackend::ReadObjectCallbackData*>(cbData);
CassError rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
BOOST_LOG_TRIVIAL(warning) << "Cassandra fetch error : " << rc << " : "
<< cass_error_desc(rc) << " - retrying";
// Retry right away. The only time the cluster should ever be overloaded
// is when the very first ledger is being written in full (millions of
// writes at once), during which no reads should be occurring. If reads
// are timing out, the code/architecture should be modified to handle
// greater read load, as opposed to just exponential backoff
requestParams.backend.readObject(requestParams);
}
else
{
auto finish = [&requestParams]() {
size_t batchSize = requestParams.batchSize;
if (++(requestParams.numFinished) == batchSize)
requestParams.cv.notify_all();
};
CassResult const* res = cass_future_get_result(fut);
CassRow const* row = cass_result_first_row(res);
if (!row)
{
cass_result_free(res);
BOOST_LOG_TRIVIAL(error) << "Cassandra fetch get row error : " << rc
<< ", " << cass_error_desc(rc);
finish();
return;
}
cass_byte_t const* buf;
std::size_t bufSize;
rc = cass_value_get_bytes(cass_row_get_column(row, 0), &buf, &bufSize);
if (rc != CASS_OK)
{
cass_result_free(res);
BOOST_LOG_TRIVIAL(error)
<< "Cassandra fetch get bytes error : " << rc << ", "
<< cass_error_desc(rc);
finish();
return;
}
std::vector<unsigned char> obj{buf, buf + bufSize};
requestParams.result = std::move(obj);
cass_result_free(res);
finish();
}
}

View File

@@ -35,9 +35,15 @@
void void
flatMapWriteCallback(CassFuture* fut, void* cbData); flatMapWriteCallback(CassFuture* fut, void* cbData);
void void
flatMapWriteKeyCallback(CassFuture* fut, void* cbData);
void
flatMapWriteTransactionCallback(CassFuture* fut, void* cbData); flatMapWriteTransactionCallback(CassFuture* fut, void* cbData);
void void
flatMapReadCallback(CassFuture* fut, void* cbData); flatMapReadCallback(CassFuture* fut, void* cbData);
void
flatMapReadObjectCallback(CassFuture* fut, void* cbData);
void
flatMapGetCreatedCallback(CassFuture* fut, void* cbData);
class CassandraFlatMapBackend class CassandraFlatMapBackend
{ {
private: private:
@@ -82,6 +88,8 @@ private:
const CassPrepared* selectObject_ = nullptr; const CassPrepared* selectObject_ = nullptr;
const CassPrepared* upperBound_ = nullptr; const CassPrepared* upperBound_ = nullptr;
const CassPrepared* getToken_ = nullptr; const CassPrepared* getToken_ = nullptr;
const CassPrepared* insertKey_ = nullptr;
const CassPrepared* getCreated_ = nullptr;
// io_context used for exponential backoff for write retries // io_context used for exponential backoff for write retries
mutable boost::asio::io_context ioContext_; mutable boost::asio::io_context ioContext_;
@@ -425,6 +433,51 @@ public:
continue; continue;
} }
} }
query = {};
query << "CREATE TABLE IF NOT EXISTS " << tableName << "keys"
<< " ( key blob, created bigint, deleted bigint, PRIMARY KEY "
"(key, created)) with clustering order by (created "
"desc) ";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK && rc != CASS_ERROR_SERVER_INVALID_QUERY)
{
std::stringstream ss;
ss << "nodestore: Error creating Cassandra table: " << rc
<< ", " << cass_error_desc(rc) << " - " << query.str();
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
query = {};
query << "SELECT * FROM " << tableName << "keys"
<< " LIMIT 1";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK)
{
if (rc == CASS_ERROR_SERVER_INVALID_QUERY)
{
BOOST_LOG_TRIVIAL(warning)
<< "table not here yet, sleeping 1s to "
"see if table creation propagates";
continue;
}
else
{
std::stringstream ss;
ss << "nodestore: Error checking for table: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
}
setupSessionAndTable = true; setupSessionAndTable = true;
} }
@@ -489,6 +542,56 @@ public:
insertTransaction_ = cass_future_get_prepared(prepare_future); insertTransaction_ = cass_future_get_prepared(prepare_future);
cass_future_free(prepare_future); cass_future_free(prepare_future);
query = {};
query << "INSERT INTO " << tableName << "keys"
<< " (key, created, deleted) VALUES (?, ?, ?)";
prepare_future =
cass_session_prepare(session_.get(), query.str().c_str());
/* Wait for the statement to prepare and get the result */
rc = cass_future_error_code(prepare_future);
if (rc != CASS_OK)
{
/* Handle error */
cass_future_free(prepare_future);
std::stringstream ss;
ss << "nodestore: Error preparing insert : " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
/* Get the prepared object from the future */
insertKey_ = cass_future_get_prepared(prepare_future);
cass_future_free(prepare_future);
query = {};
query << "SELECT created FROM " << tableName << "keys"
<< " WHERE key = ? ORDER BY created desc LIMIT 1";
prepare_future =
cass_session_prepare(session_.get(), query.str().c_str());
/* Wait for the statement to prepare and get the result */
rc = cass_future_error_code(prepare_future);
if (rc != CASS_OK)
{
/* Handle error */
cass_future_free(prepare_future);
std::stringstream ss;
ss << "nodestore: Error preparing insert : " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
/* Get the prepared object from the future */
getCreated_ = cass_future_get_prepared(prepare_future);
cass_future_free(prepare_future);
query = {}; query = {};
query << "SELECT object, sequence FROM " << tableName << "flat" query << "SELECT object, sequence FROM " << tableName << "flat"
<< " WHERE key = ? AND sequence <= ? ORDER BY sequence DESC " << " WHERE key = ? AND sequence <= ? ORDER BY sequence DESC "
@@ -550,9 +653,9 @@ public:
cass_future_free(prepare_future); cass_future_free(prepare_future);
query = {}; query = {};
query << "SELECT key, object FROM " << tableName << "flat " query << "SELECT key FROM " << tableName << "keys "
<< " WHERE TOKEN(key) >= ? and sequence <= ?" << " WHERE TOKEN(key) >= ? and created <= ?"
<< " and object > 0x" << " and deleted > ?"
<< " PER PARTITION LIMIT 1 LIMIT ?" << " PER PARTITION LIMIT 1 LIMIT ?"
<< " ALLOW FILTERING"; << " ALLOW FILTERING";
@@ -636,6 +739,11 @@ public:
cass_prepared_free(insertObject_); cass_prepared_free(insertObject_);
insertObject_ = nullptr; insertObject_ = nullptr;
} }
if (insertKey_)
{
cass_prepared_free(insertKey_);
insertKey_ = nullptr;
}
if (selectTransaction_) if (selectTransaction_)
{ {
cass_prepared_free(selectTransaction_); cass_prepared_free(selectTransaction_);
@@ -656,6 +764,11 @@ public:
cass_prepared_free(getToken_); cass_prepared_free(getToken_);
getToken_ = nullptr; getToken_ = nullptr;
} }
if (getCreated_)
{
cass_prepared_free(getCreated_);
getCreated_ = nullptr;
}
work_.reset(); work_.reset();
ioThread_.join(); ioThread_.join();
} }
@@ -906,8 +1019,17 @@ public:
<< cass_error_desc(rc); << cass_error_desc(rc);
return {}; return {};
} }
rc = cass_statement_bind_int64(statement, 2, seq);
if (rc != CASS_OK)
{
cass_statement_free(statement);
BOOST_LOG_TRIVIAL(error)
<< "Binding Cassandra seq to doUpperBound query: " << rc << ", "
<< cass_error_desc(rc);
return {};
}
rc = cass_statement_bind_int32(statement, 2, limit + 1); rc = cass_statement_bind_int32(statement, 3, limit);
if (rc != CASS_OK) if (rc != CASS_OK)
{ {
cass_statement_free(statement); cass_statement_free(statement);
@@ -936,7 +1058,7 @@ public:
cass_statement_free(statement); cass_statement_free(statement);
cass_future_free(fut); cass_future_free(fut);
std::vector<LedgerObject> result; std::vector<ripple::uint256> keys;
CassIterator* iter = cass_iterator_from_result(res); CassIterator* iter = cass_iterator_from_result(res);
while (cass_iterator_next(iter)) while (cass_iterator_next(iter))
@@ -957,35 +1079,22 @@ public:
ss << ": " << cass_error_desc(rc); ss << ": " << cass_error_desc(rc);
BOOST_LOG_TRIVIAL(warning) << ss.str(); BOOST_LOG_TRIVIAL(warning) << ss.str();
} }
ripple::uint256 resultHash = ripple::uint256::fromVoid(outData); keys.push_back(ripple::uint256::fromVoid(outData));
CassValue const* entry = cass_row_get_column(row, 1);
rc = cass_value_get_bytes(entry, &outData, &outSize);
if (rc != CASS_OK)
{
cass_iterator_free(iter);
std::stringstream ss;
ss << "Cassandra fetch error";
ss << ": " << cass_error_desc(rc);
BOOST_LOG_TRIVIAL(warning) << ss.str();
}
if (outSize > 0)
{
std::vector<unsigned char> resultBlob{
outData, outData + outSize};
result.push_back({resultHash, resultBlob});
}
} }
if (result.size()) if (keys.size())
{ {
auto token = getToken(result[result.size() - 1].key.data()); std::vector<LedgerObject> results;
std::vector<Blob> objs = fetchObjectsBatch(keys, seq);
for (size_t i = 0; i < objs.size(); ++i)
{
results.push_back({keys[i], objs[i]});
}
auto token = getToken(results[results.size() - 1].key.data());
assert(token); assert(token);
return {result, token}; return std::make_pair(results, token);
} }
return {result, {}}; return {{}, {}};
} }
bool bool
@@ -1086,6 +1195,112 @@ public:
cass_future_free(fut); cass_future_free(fut);
} }
struct ReadObjectCallbackData
{
CassandraFlatMapBackend const& backend;
ripple::uint256 const& key;
uint32_t sequence;
Blob& result;
std::condition_variable& cv;
std::atomic_uint32_t& numFinished;
size_t batchSize;
ReadObjectCallbackData(
CassandraFlatMapBackend const& backend,
ripple::uint256 const& key,
uint32_t sequence,
Blob& result,
std::condition_variable& cv,
std::atomic_uint32_t& numFinished,
size_t batchSize)
: backend(backend)
, key(key)
, sequence(sequence)
, result(result)
, cv(cv)
, numFinished(numFinished)
, batchSize(batchSize)
{
}
ReadObjectCallbackData(ReadObjectCallbackData const& other) = default;
};
std::vector<Blob>
fetchObjectsBatch(
std::vector<ripple::uint256> const& keys,
uint32_t sequence) const
{
std::size_t const numKeys = keys.size();
BOOST_LOG_TRIVIAL(trace)
<< "Fetching " << numKeys << " records from Cassandra";
std::atomic_uint32_t numFinished = 0;
std::condition_variable cv;
std::mutex mtx;
std::vector<Blob> results{numKeys};
std::vector<std::shared_ptr<ReadObjectCallbackData>> cbs;
cbs.reserve(numKeys);
for (std::size_t i = 0; i < keys.size(); ++i)
{
cbs.push_back(std::make_shared<ReadObjectCallbackData>(
*this,
keys[i],
sequence,
results[i],
cv,
numFinished,
numKeys));
readObject(*cbs[i]);
}
assert(results.size() == cbs.size());
std::unique_lock<std::mutex> lck(mtx);
cv.wait(
lck, [&numFinished, &numKeys]() { return numFinished == numKeys; });
BOOST_LOG_TRIVIAL(trace)
<< "Fetched " << numKeys << " records from Cassandra";
return results;
}
void
readObject(ReadObjectCallbackData& data) const
{
CassStatement* statement = cass_prepared_bind(selectObject_);
cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
CassError rc = cass_statement_bind_bytes(
statement, 0, static_cast<cass_byte_t const*>(data.key.data()), 32);
if (rc != CASS_OK)
{
size_t batchSize = data.batchSize;
if (++(data.numFinished) == batchSize)
data.cv.notify_all();
cass_statement_free(statement);
BOOST_LOG_TRIVIAL(error) << "Binding Cassandra fetch query: " << rc
<< ", " << cass_error_desc(rc);
return;
}
rc = cass_statement_bind_int64(statement, 1, data.sequence);
if (rc != CASS_OK)
{
size_t batchSize = data.batchSize;
if (++(data.numFinished) == batchSize)
data.cv.notify_all();
cass_statement_free(statement);
BOOST_LOG_TRIVIAL(error) << "Binding Cassandra fetch query: " << rc
<< ", " << cass_error_desc(rc);
return;
}
CassFuture* fut = cass_session_execute(session_.get(), statement);
cass_statement_free(statement);
cass_future_set_callback(
fut, flatMapReadObjectCallback, static_cast<void*>(&data));
cass_future_free(fut);
}
struct WriteCallbackData struct WriteCallbackData
{ {
CassandraFlatMapBackend const* backend; CassandraFlatMapBackend const* backend;
@@ -1094,7 +1309,10 @@ public:
// prematurely if other copies are removed from caches. // prematurely if other copies are removed from caches.
std::string key; std::string key;
uint32_t sequence; uint32_t sequence;
uint32_t createdSequence = 0;
std::string blob; std::string blob;
bool isCreated;
bool isDeleted;
uint32_t currentRetries = 0; uint32_t currentRetries = 0;
@@ -1102,11 +1320,15 @@ public:
CassandraFlatMapBackend const* f, CassandraFlatMapBackend const* f,
std::string&& key, std::string&& key,
uint32_t sequence, uint32_t sequence,
std::string&& blob) std::string&& blob,
bool isCreated,
bool isDeleted)
: backend(f) : backend(f)
, key(std::move(key)) , key(std::move(key))
, sequence(sequence) , sequence(sequence)
, blob(std::move(blob)) , blob(std::move(blob))
, isCreated(isCreated)
, isDeleted(isDeleted)
{ {
} }
}; };
@@ -1127,8 +1349,75 @@ public:
}); });
} }
} }
{
CassStatement* statement = cass_prepared_bind(insertObject_);
cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
const unsigned char* keyData = (unsigned char*)data.key.data();
CassError rc = cass_statement_bind_bytes(
statement,
0,
static_cast<cass_byte_t const*>(keyData),
data.key.size());
if (rc != CASS_OK)
{
cass_statement_free(statement);
std::stringstream ss;
ss << "Binding cassandra insert hash: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str());
}
rc = cass_statement_bind_int64(statement, 1, data.sequence);
if (rc != CASS_OK)
{
cass_statement_free(statement);
std::stringstream ss;
ss << "Binding cassandra insert object: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str());
}
const unsigned char* blobData = (unsigned char*)data.blob.data();
rc = cass_statement_bind_bytes(
statement,
2,
static_cast<cass_byte_t const*>(blobData),
data.blob.size());
if (rc != CASS_OK)
{
cass_statement_free(statement);
std::stringstream ss;
ss << "Binding cassandra insert hash: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str());
}
CassFuture* fut = cass_session_execute(session_.get(), statement);
cass_statement_free(statement);
CassStatement* statement = cass_prepared_bind(insertObject_); cass_future_set_callback(
fut, flatMapWriteCallback, static_cast<void*>(&data));
cass_future_free(fut);
}
}
void
writeDeletedKey(WriteCallbackData& data, bool isRetry) const
{
{
std::unique_lock<std::mutex> lck(throttleMutex_);
if (!isRetry && numRequestsOutstanding_ > maxRequestsOutstanding)
{
BOOST_LOG_TRIVIAL(trace)
<< __func__ << " : "
<< "Max outstanding requests reached. "
<< "Waiting for other requests to finish";
throttleCv_.wait(lck, [this]() {
return numRequestsOutstanding_ < maxRequestsOutstanding;
});
}
}
CassStatement* statement = cass_prepared_bind(insertKey_);
cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
const unsigned char* keyData = (unsigned char*)data.key.data(); const unsigned char* keyData = (unsigned char*)data.key.data();
CassError rc = cass_statement_bind_bytes( CassError rc = cass_statement_bind_bytes(
@@ -1145,27 +1434,22 @@ public:
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str()); throw std::runtime_error(ss.str());
} }
rc = cass_statement_bind_int64(statement, 1, data.sequence); rc = cass_statement_bind_int64(statement, 1, data.createdSequence);
if (rc != CASS_OK) if (rc != CASS_OK)
{ {
cass_statement_free(statement); cass_statement_free(statement);
std::stringstream ss; std::stringstream ss;
ss << "Binding cassandra insert object: " << rc << ", " ss << "binding cassandra insert object: " << rc << ", "
<< cass_error_desc(rc); << cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str()); throw std::runtime_error(ss.str());
} }
const unsigned char* blobData = (unsigned char*)data.blob.data(); rc = cass_statement_bind_int64(statement, 2, data.sequence);
rc = cass_statement_bind_bytes(
statement,
2,
static_cast<cass_byte_t const*>(blobData),
data.blob.size());
if (rc != CASS_OK) if (rc != CASS_OK)
{ {
cass_statement_free(statement); cass_statement_free(statement);
std::stringstream ss; std::stringstream ss;
ss << "Binding cassandra insert hash: " << rc << ", " ss << "binding cassandra insert object: " << rc << ", "
<< cass_error_desc(rc); << cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str()); throw std::runtime_error(ss.str());
@@ -1174,19 +1458,118 @@ public:
cass_statement_free(statement); cass_statement_free(statement);
cass_future_set_callback( cass_future_set_callback(
fut, flatMapWriteCallback, static_cast<void*>(&data)); fut, flatMapWriteKeyCallback, static_cast<void*>(&data));
cass_future_free(fut); cass_future_free(fut);
} }
void void
store(std::string&& key, uint32_t seq, std::string&& blob) const writeKey(WriteCallbackData& data, bool isRetry) const
{
{
std::unique_lock<std::mutex> lck(throttleMutex_);
if (!isRetry && numRequestsOutstanding_ > maxRequestsOutstanding)
{
BOOST_LOG_TRIVIAL(trace)
<< __func__ << " : "
<< "Max outstanding requests reached. "
<< "Waiting for other requests to finish";
throttleCv_.wait(lck, [this]() {
return numRequestsOutstanding_ < maxRequestsOutstanding;
});
}
}
if (data.isCreated)
{
CassStatement* statement = cass_prepared_bind(insertKey_);
cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
const unsigned char* keyData = (unsigned char*)data.key.data();
CassError rc = cass_statement_bind_bytes(
statement,
0,
static_cast<cass_byte_t const*>(keyData),
data.key.size());
if (rc != CASS_OK)
{
cass_statement_free(statement);
std::stringstream ss;
ss << "Binding cassandra insert hash: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str());
}
rc = cass_statement_bind_int64(statement, 1, data.sequence);
if (rc != CASS_OK)
{
cass_statement_free(statement);
std::stringstream ss;
ss << "binding cassandra insert object: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str());
}
rc = cass_statement_bind_int64(statement, 2, INT64_MAX);
if (rc != CASS_OK)
{
cass_statement_free(statement);
std::stringstream ss;
ss << "binding cassandra insert object: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str());
}
CassFuture* fut = cass_session_execute(session_.get(), statement);
cass_statement_free(statement);
cass_future_set_callback(
fut, flatMapWriteKeyCallback, static_cast<void*>(&data));
cass_future_free(fut);
}
else if (data.isDeleted)
{
CassStatement* statement = cass_prepared_bind(getCreated_);
cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
const unsigned char* keyData = (unsigned char*)data.key.data();
CassError rc = cass_statement_bind_bytes(
statement,
0,
static_cast<cass_byte_t const*>(keyData),
data.key.size());
if (rc != CASS_OK)
{
cass_statement_free(statement);
std::stringstream ss;
ss << "Binding cassandra insert hash: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str());
}
CassFuture* fut = cass_session_execute(session_.get(), statement);
cass_statement_free(statement);
cass_future_set_callback(
fut, flatMapGetCreatedCallback, static_cast<void*>(&data));
cass_future_free(fut);
}
}
void
store(
std::string&& key,
uint32_t seq,
std::string&& blob,
bool isCreated = false,
bool isDeleted = false) const
{ {
BOOST_LOG_TRIVIAL(trace) << "Writing ledger object to cassandra"; BOOST_LOG_TRIVIAL(trace) << "Writing ledger object to cassandra";
WriteCallbackData* data = WriteCallbackData* data = new WriteCallbackData(
new WriteCallbackData(this, std::move(key), seq, std::move(blob)); this, std::move(key), seq, std::move(blob), isCreated, isDeleted);
++numRequestsOutstanding_; ++numRequestsOutstanding_;
if (isCreated || isDeleted)
++numRequestsOutstanding_;
write(*data, false); write(*data, false);
if (isCreated || isDeleted)
writeKey(*data, false);
} }
struct WriteTransactionCallbackData struct WriteTransactionCallbackData
@@ -1329,10 +1712,16 @@ public:
friend void friend void
flatMapWriteCallback(CassFuture* fut, void* cbData); flatMapWriteCallback(CassFuture* fut, void* cbData);
friend void friend void
flatMapWriteKeyCallback(CassFuture* fut, void* cbData);
friend void
flatMapWriteTransactionCallback(CassFuture* fut, void* cbData); flatMapWriteTransactionCallback(CassFuture* fut, void* cbData);
friend void friend void
flatMapReadCallback(CassFuture* fut, void* cbData); flatMapReadCallback(CassFuture* fut, void* cbData);
friend void
flatMapReadObjectCallback(CassFuture* fut, void* cbData);
friend void
flatMapGetCreatedCallback(CassFuture* fut, void* cbData);
}; };
#endif #endif

View File

@@ -285,10 +285,21 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
for (auto& obj : *(rawData.mutable_ledger_objects()->mutable_objects())) for (auto& obj : *(rawData.mutable_ledger_objects()->mutable_objects()))
{ {
bool isCreated = false;
bool isDeleted = false;
if (obj.mod_type() == org::xrpl::rpc::v1::RawLedgerObject::CREATED)
isCreated = true;
else if (
obj.mod_type() == org ::xrpl::rpc::v1::RawLedgerObject::DELETED)
isDeleted = true;
assert(not(isCreated and isDeleted));
flatMapBackend_.store( flatMapBackend_.store(
std::move(*obj.mutable_key()), std::move(*obj.mutable_key()),
lgrInfo.seq, lgrInfo.seq,
std::move(*obj.mutable_data())); std::move(*obj.mutable_data()),
isCreated,
isDeleted);
} }
flatMapBackend_.sync(); flatMapBackend_.sync();
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug)

Submodule rippled updated: 2978847d8d...063363ffae