mirror of
https://github.com/XRPLF/clio.git
synced 2025-11-30 08:35:52 +00:00
small fixes in CassandraBackend
This commit is contained in:
@@ -36,7 +36,7 @@ processAsyncWriteResponse(T& requestParams, CassFuture* fut, F func)
|
|||||||
std::make_shared<boost::asio::steady_timer>(
|
std::make_shared<boost::asio::steady_timer>(
|
||||||
backend.getIOContext(),
|
backend.getIOContext(),
|
||||||
std::chrono::steady_clock::now() + wait);
|
std::chrono::steady_clock::now() + wait);
|
||||||
timer->async_wait([timer, &requestParams, &func](
|
timer->async_wait([timer, &requestParams, func](
|
||||||
const boost::system::error_code& error) {
|
const boost::system::error_code& error) {
|
||||||
func(requestParams, true);
|
func(requestParams, true);
|
||||||
});
|
});
|
||||||
@@ -66,16 +66,6 @@ flatMapWriteCallback(CassFuture* fut, void* cbData)
|
|||||||
processAsyncWriteResponse(requestParams, fut, func);
|
processAsyncWriteResponse(requestParams, fut, func);
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
|
||||||
flatMapWriteBookCallback(CassFuture* fut, void* cbData)
|
|
||||||
{
|
|
||||||
CassandraBackend::WriteCallbackData& requestParams =
|
|
||||||
*static_cast<CassandraBackend::WriteCallbackData*>(cbData);
|
|
||||||
auto func = [](auto& params, bool retry) {
|
|
||||||
params.backend->writeBook(params, retry);
|
|
||||||
};
|
|
||||||
processAsyncWriteResponse(requestParams, fut, func);
|
|
||||||
}
|
|
||||||
/*
|
/*
|
||||||
|
|
||||||
void
|
void
|
||||||
@@ -564,7 +554,7 @@ struct WriteBookCallbackData
|
|||||||
ripple::uint256 offerKey;
|
ripple::uint256 offerKey;
|
||||||
uint32_t ledgerSequence;
|
uint32_t ledgerSequence;
|
||||||
std::condition_variable& cv;
|
std::condition_variable& cv;
|
||||||
std::atomic_uint32_t& numRemaining;
|
std::atomic_uint32_t& numOutstanding;
|
||||||
std::mutex& mtx;
|
std::mutex& mtx;
|
||||||
uint32_t currentRetries = 0;
|
uint32_t currentRetries = 0;
|
||||||
WriteBookCallbackData(
|
WriteBookCallbackData(
|
||||||
@@ -574,14 +564,14 @@ struct WriteBookCallbackData
|
|||||||
uint32_t ledgerSequence,
|
uint32_t ledgerSequence,
|
||||||
std::condition_variable& cv,
|
std::condition_variable& cv,
|
||||||
std::mutex& mtx,
|
std::mutex& mtx,
|
||||||
std::atomic_uint32_t& numRemaining)
|
std::atomic_uint32_t& numOutstanding)
|
||||||
: backend(backend)
|
: backend(backend)
|
||||||
, book(book)
|
, book(book)
|
||||||
, offerKey(offerKey)
|
, offerKey(offerKey)
|
||||||
, ledgerSequence(ledgerSequence)
|
, ledgerSequence(ledgerSequence)
|
||||||
, cv(cv)
|
, cv(cv)
|
||||||
, mtx(mtx)
|
, mtx(mtx)
|
||||||
, numRemaining(numRemaining)
|
, numOutstanding(numOutstanding)
|
||||||
|
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
@@ -589,7 +579,7 @@ struct WriteBookCallbackData
|
|||||||
void
|
void
|
||||||
writeBookCallback(CassFuture* fut, void* cbData);
|
writeBookCallback(CassFuture* fut, void* cbData);
|
||||||
void
|
void
|
||||||
writeBook2(WriteBookCallbackData& cb)
|
writeBook(WriteBookCallbackData& cb)
|
||||||
{
|
{
|
||||||
CassandraStatement statement{cb.backend.getInsertBookPreparedStatement()};
|
CassandraStatement statement{cb.backend.getInsertBookPreparedStatement()};
|
||||||
statement.bindBytes(cb.book);
|
statement.bindBytes(cb.book);
|
||||||
@@ -622,7 +612,7 @@ writeBookCallback(CassFuture* fut, void* cbData)
|
|||||||
std::chrono::steady_clock::now() + wait);
|
std::chrono::steady_clock::now() + wait);
|
||||||
timer->async_wait(
|
timer->async_wait(
|
||||||
[timer, &requestParams](const boost::system::error_code& error) {
|
[timer, &requestParams](const boost::system::error_code& error) {
|
||||||
writeBook2(requestParams);
|
writeBook(requestParams);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@@ -630,7 +620,7 @@ writeBookCallback(CassFuture* fut, void* cbData)
|
|||||||
BOOST_LOG_TRIVIAL(trace) << __func__ << " Successfully inserted a book";
|
BOOST_LOG_TRIVIAL(trace) << __func__ << " Successfully inserted a book";
|
||||||
{
|
{
|
||||||
std::lock_guard lck(requestParams.mtx);
|
std::lock_guard lck(requestParams.mtx);
|
||||||
--requestParams.numRemaining;
|
--requestParams.numOutstanding;
|
||||||
requestParams.cv.notify_one();
|
requestParams.cv.notify_one();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -789,7 +779,7 @@ CassandraBackend::writeBooks(
|
|||||||
cv,
|
cv,
|
||||||
mtx,
|
mtx,
|
||||||
numOutstanding));
|
numOutstanding));
|
||||||
writeBook2(*cbs.back());
|
writeBook(*cbs.back());
|
||||||
BOOST_LOG_TRIVIAL(trace) << __func__ << "Submitted a write request";
|
BOOST_LOG_TRIVIAL(trace) << __func__ << "Submitted a write request";
|
||||||
std::unique_lock<std::mutex> lck(mtx);
|
std::unique_lock<std::mutex> lck(mtx);
|
||||||
BOOST_LOG_TRIVIAL(trace) << __func__ << "Got the mutex";
|
BOOST_LOG_TRIVIAL(trace) << __func__ << "Got the mutex";
|
||||||
|
|||||||
@@ -1220,54 +1220,6 @@ public:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
void
|
|
||||||
writeDeletedKey(WriteCallbackData& data, bool isRetry) const
|
|
||||||
{
|
|
||||||
CassandraStatement statement{insertKey_};
|
|
||||||
statement.bindBytes(data.key);
|
|
||||||
statement.bindInt(data.createdSequence);
|
|
||||||
statement.bindInt(data.sequence);
|
|
||||||
executeAsyncWrite(statement, flatMapWriteKeyCallback, data, isRetry);
|
|
||||||
}
|
|
||||||
|
|
||||||
void
|
|
||||||
writeKey(WriteCallbackData& data, bool isRetry) const
|
|
||||||
{
|
|
||||||
if (data.isCreated)
|
|
||||||
{
|
|
||||||
CassandraStatement statement{insertKey_};
|
|
||||||
statement.bindBytes(data.key);
|
|
||||||
statement.bindInt(data.sequence);
|
|
||||||
statement.bindInt(INT64_MAX);
|
|
||||||
|
|
||||||
executeAsyncWrite(
|
|
||||||
statement, flatMapWriteKeyCallback, data, isRetry);
|
|
||||||
}
|
|
||||||
else if (data.isDeleted)
|
|
||||||
{
|
|
||||||
CassandraStatement statement{getCreated_};
|
|
||||||
|
|
||||||
executeAsyncWrite(
|
|
||||||
statement, flatMapGetCreatedCallback, data, isRetry);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
void
|
|
||||||
writeBook(WriteCallbackData& data, bool isRetry) const
|
|
||||||
{
|
|
||||||
assert(data.isCreated or data.isDeleted);
|
|
||||||
assert(data.book);
|
|
||||||
CassandraStatement statement{
|
|
||||||
(data.isCreated ? insertBook_ : deleteBook_)};
|
|
||||||
statement.bindBytes(*data.book);
|
|
||||||
statement.bindBytes(data.key);
|
|
||||||
statement.bindInt(data.sequence);
|
|
||||||
if (data.isCreated)
|
|
||||||
statement.bindInt(INT64_MAX);
|
|
||||||
executeAsyncWrite(statement, flatMapWriteBookCallback, data, isRetry);
|
|
||||||
}
|
|
||||||
void
|
void
|
||||||
doWriteLedgerObject(
|
doWriteLedgerObject(
|
||||||
std::string&& key,
|
std::string&& key,
|
||||||
|
|||||||
Reference in New Issue
Block a user