mirror of https://github.com/XRPLF/clio.git
synced 2025-11-27 23:25:53 +00:00

Commit: partial merge needs fixing
@@ -394,7 +394,7 @@ CassandraBackend::fetchLedgerDiff(uint32_t ledgerSequence) const
     return objects;
 }
 LedgerPage
-CassandraBackend::fetchLedgerPage(
+CassandraBackend::doFetchLedgerPage(
     std::optional<ripple::uint256> const& cursor,
     std::uint32_t ledgerSequence,
     std::uint32_t limit) const
@@ -405,12 +405,12 @@ CassandraBackend::fetchLedgerPage(
     LedgerPage page;
     BOOST_LOG_TRIVIAL(debug)
         << __func__ << " ledgerSequence = " << std::to_string(ledgerSequence)
-        << " index = " << std::to_string(*index);
+        << " index = " << std::to_string(index->keyIndex);
     if (cursor)
         BOOST_LOG_TRIVIAL(debug)
             << __func__ << " - Cursor = " << ripple::strHex(*cursor);
     CassandraStatement statement{selectKeys_};
-    statement.bindInt(*index);
+    statement.bindInt(index->keyIndex);
     if (cursor)
         statement.bindBytes(*cursor);
     else
@@ -422,7 +422,7 @@ CassandraBackend::fetchLedgerPage(
     CassandraResult result = executeSyncRead(statement);
     if (!!result)
     {
-        BOOST_LOG_TRIVIAL(trace)
+        BOOST_LOG_TRIVIAL(debug)
             << __func__ << " - got keys - size = " << result.numRows();
         std::vector<ripple::uint256> keys;

@@ -430,17 +430,17 @@ CassandraBackend::fetchLedgerPage(
         {
             keys.push_back(result.getUInt256());
         } while (result.nextRow());
-        if (keys.size() && keys.size() == limit)
+        if (keys.size() && keys.size() >= limit)
         {
             page.cursor = keys.back();
-            keys.pop_back();
+            ++(*page.cursor);
         }
         auto objects = fetchLedgerObjects(keys, ledgerSequence);
         if (objects.size() != keys.size())
             throw std::runtime_error("Mismatch in size of objects and keys");

         if (cursor)
-            BOOST_LOG_TRIVIAL(trace)
+            BOOST_LOG_TRIVIAL(debug)
                 << __func__ << " Cursor = " << ripple::strHex(*page.cursor);

         for (size_t i = 0; i < objects.size(); ++i)
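Note on the paging change above: instead of popping the last key off the page, the new code returns it and advances page.cursor one past it, so the next query's "key >= ?" bound starts at the first unseen key. A minimal sketch of a caller driving this paging, assuming a backend object and a user-supplied process handler (both illustrative, not part of the diff):

    // Page through all objects of a ledger, 256 keys at a time.
    std::optional<ripple::uint256> cursor;
    while (true)
    {
        LedgerPage page = backend.doFetchLedgerPage(cursor, ledgerSequence, 256);
        for (auto const& obj : page.objects)
            process(obj);          // user-supplied handler
        if (!page.cursor)          // no cursor: last page reached
            break;
        cursor = page.cursor;      // already points one past the last key returned
    }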
@@ -494,129 +494,6 @@ CassandraBackend::fetchLedgerObjects(
         << "Fetched " << numKeys << " records from Cassandra";
     return results;
 }
-BookOffersPage
-CassandraBackend::fetchBookOffers(
-    ripple::uint256 const& book,
-    uint32_t ledgerSequence,
-    std::uint32_t limit,
-    std::optional<ripple::uint256> const& cursor) const
-{
-    auto rng = fetchLedgerRange();
-    auto limitTuningFactor = 50;
-
-    if(!rng)
-        return {{},{}};
-
-    auto readBooks =
-        [this, &book, &limit, &limitTuningFactor]
-        (std::uint32_t sequence)
-        -> std::pair<bool, std::vector<std::pair<std::uint64_t, ripple::uint256>>>
-    {
-        CassandraStatement completeQuery{completeBook_};
-        completeQuery.bindInt(sequence);
-        CassandraResult completeResult = executeSyncRead(completeQuery);
-        bool complete = completeResult.hasResult();
-
-        CassandraStatement statement{selectBook_};
-        std::vector<std::pair<std::uint64_t, ripple::uint256>> keys = {};
-
-        statement.bindBytes(book.data(), 24);
-        statement.bindInt(sequence);
-
-        BOOST_LOG_TRIVIAL(info) << __func__ << " upper = " << std::to_string(sequence)
-            << " book = " << ripple::strHex(std::string((char*)book.data(), 24));
-
-        ripple::uint256 zero = beast::zero;
-        statement.bindBytes(zero.data(), 8);
-        statement.bindBytes(zero);
-
-        statement.bindUInt(limit * limitTuningFactor);
-
-        auto start = std::chrono::system_clock::now();
-
-        CassandraResult result = executeSyncRead(statement);
-
-        auto end = std::chrono::system_clock::now();
-        auto duration = ((end - start).count()) / 1000000000.0;
-
-        BOOST_LOG_TRIVIAL(info) << "Book directory fetch took "
-                                << std::to_string(duration) << " seconds.";
-
-        BOOST_LOG_TRIVIAL(debug) << __func__ << " - got keys";
-        if (!result)
-        {
-            return {false, {{}, {}}};
-        }
-
-        do
-        {
-            auto [quality, index] = result.getBytesTuple();
-            std::uint64_t q = 0;
-            memcpy(&q, quality.data(), 8);
-            keys.push_back({q, ripple::uint256::fromVoid(index.data())});
-
-        } while (result.nextRow());
-
-        return {complete, keys};
-    };
-
-    auto upper = indexer_.getBookIndexOfSeq(ledgerSequence);
-    auto [complete, quality_keys] = readBooks(upper);
-
-    BOOST_LOG_TRIVIAL(debug)
-        << __func__ << " - populated keys. num keys = " << quality_keys.size();
-
-    std::optional<std::string> warning = {};
-    if (!complete)
-    {
-        warning = "Data may be incomplete";
-        BOOST_LOG_TRIVIAL(info) << "May be incomplete. Fetching other page";
-
-        auto bookShift = indexer_.getBookShift();
-        std::uint32_t lower = upper - (1 << bookShift);
-        auto originalKeys = std::move(quality_keys);
-        auto [lowerComplete, otherKeys] = readBooks(lower);
-
-        assert(lowerComplete);
-
-        std::vector<std::pair<std::uint64_t, ripple::uint256>> merged_keys;
-        merged_keys.reserve(originalKeys.size() + otherKeys.size());
-        std::merge(originalKeys.begin(), originalKeys.end(),
-                   otherKeys.begin(), otherKeys.end(),
-                   std::back_inserter(merged_keys),
-                   [](auto pair1, auto pair2)
-                   {
-                       return pair1.first < pair2.first;
-                   });
-    }
-
-    std::vector<ripple::uint256> merged(quality_keys.size());
-    std::transform(quality_keys.begin(), quality_keys.end(),
-                   std::back_inserter(merged),
-                   [](auto pair) { return pair.second; });
-
-    auto uniqEnd = std::unique(merged.begin(), merged.end());
-    std::vector<ripple::uint256> keys{merged.begin(), uniqEnd};
-
-    std::cout << keys.size() << std::endl;
-
-    auto start = std::chrono::system_clock::now();
-    std::vector<Blob> objs = fetchLedgerObjects(keys, ledgerSequence);
-    auto end = std::chrono::system_clock::now();
-    auto duration = ((end - start).count()) / 1000000000.0;
-
-    BOOST_LOG_TRIVIAL(info) << "Book object fetch took "
-                            << std::to_string(duration) << " seconds.";
-
-    std::vector<LedgerObject> results;
-    for (size_t i = 0; i < objs.size(); ++i)
-    {
-        if (objs[i].size() != 0)
-            results.push_back({keys[i], objs[i]});
-    }
-
-    return {results, {}, warning};
-}
 struct WriteBookCallbackData
 {
     CassandraBackend const& backend;
@@ -654,7 +531,7 @@ writeBook(WriteBookCallbackData& cb)
     CassandraStatement statement{cb.backend.getInsertBookPreparedStatement()};
     statement.bindBytes(cb.book.data(), 24);
     statement.bindInt(cb.ledgerSequence);
-    statement.bindBytes(cb.book.data()+24, 8);
+    statement.bindBytes(cb.book.data() + 24, 8);
     statement.bindBytes(cb.offerKey);
     // Passing isRetry as true bypasses incrementing numOutstanding
     cb.backend.executeAsyncWrite(statement, writeBookCallback, cb, true);
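The bindings above split the 32-byte book directory key into its two parts: the first 24 bytes identify the order book and the trailing 8 bytes encode the directory's quality, the same 24/8 split the removed fetchBookOffers relied on when it memcpy'd the quality back out. A self-contained sketch of that decomposition (the DirKey alias and qualityOf name are illustrative, not from the diff):

    #include <array>
    #include <cstdint>
    #include <cstring>

    // 32-byte book directory key: 24-byte book base + 8-byte quality suffix.
    using DirKey = std::array<unsigned char, 32>;

    std::uint64_t qualityOf(DirKey const& key)
    {
        std::uint64_t q = 0;
        std::memcpy(&q, key.data() + 24, 8);  // same extraction as the memcpy above
        return q;
    }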
@@ -723,6 +600,87 @@ struct WriteKeyCallbackData
     {
     }
 };
+struct OnlineDeleteCallbackData
+{
+    CassandraBackend const& backend;
+    ripple::uint256 key;
+    uint32_t ledgerSequence;
+    std::vector<unsigned char> object;
+    std::condition_variable& cv;
+    std::atomic_uint32_t& numOutstanding;
+    std::mutex& mtx;
+    uint32_t currentRetries = 0;
+    OnlineDeleteCallbackData(
+        CassandraBackend const& backend,
+        ripple::uint256&& key,
+        uint32_t ledgerSequence,
+        std::vector<unsigned char>&& object,
+        std::condition_variable& cv,
+        std::mutex& mtx,
+        std::atomic_uint32_t& numOutstanding)
+        : backend(backend)
+        , key(std::move(key))
+        , ledgerSequence(ledgerSequence)
+        , object(std::move(object))
+        , cv(cv)
+        , mtx(mtx)
+        , numOutstanding(numOutstanding)
+
+    {
+    }
+};
+void
+onlineDeleteCallback(CassFuture* fut, void* cbData);
+void
+onlineDelete(OnlineDeleteCallbackData& cb)
+{
+    {
+        CassandraStatement statement{
+            cb.backend.getInsertObjectPreparedStatement()};
+        statement.bindBytes(cb.key);
+        statement.bindInt(cb.ledgerSequence);
+        statement.bindBytes(cb.object);
+
+        cb.backend.executeAsyncWrite(statement, onlineDeleteCallback, cb, true);
+    }
+}
+void
+onlineDeleteCallback(CassFuture* fut, void* cbData)
+{
+    OnlineDeleteCallbackData& requestParams =
+        *static_cast<OnlineDeleteCallbackData*>(cbData);
+
+    CassandraBackend const& backend = requestParams.backend;
+    auto rc = cass_future_error_code(fut);
+    if (rc != CASS_OK)
+    {
+        // exponential backoff with a max wait of 2^10 ms (about 1 second)
+        auto wait = std::chrono::milliseconds(
+            lround(std::pow(2, std::min(10u, requestParams.currentRetries))));
+        BOOST_LOG_TRIVIAL(error)
+            << "ERROR!!! Cassandra insert book error: " << rc << ", "
+            << cass_error_desc(rc) << ", retrying in " << wait.count()
+            << " milliseconds";
+        ++requestParams.currentRetries;
+        std::shared_ptr<boost::asio::steady_timer> timer =
+            std::make_shared<boost::asio::steady_timer>(
+                backend.getIOContext(),
+                std::chrono::steady_clock::now() + wait);
+        timer->async_wait(
+            [timer, &requestParams](const boost::system::error_code& error) {
+                onlineDelete(requestParams);
+            });
+    }
+    else
+    {
+        BOOST_LOG_TRIVIAL(trace) << __func__ << " Successfully inserted a book";
+        {
+            std::lock_guard lck(requestParams.mtx);
+            --requestParams.numOutstanding;
+            requestParams.cv.notify_one();
+        }
+    }
+}
 void
 writeKeyCallback(CassFuture* fut, void* cbData);
 void
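The retry path in onlineDeleteCallback above backs off exponentially, capped at 2^10 ms (about one second). The wait computation, isolated into a standalone helper for clarity (the backoff name is illustrative):

    #include <algorithm>
    #include <chrono>
    #include <cmath>

    // Wait 2^retries milliseconds, capped at 2^10, as in onlineDeleteCallback.
    std::chrono::milliseconds backoff(unsigned retries)
    {
        return std::chrono::milliseconds(
            std::lround(std::pow(2, std::min(10u, retries))));
    }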
@@ -775,14 +733,9 @@ writeKeyCallback(CassFuture* fut, void* cbData)
 bool
 CassandraBackend::writeKeys(
     std::unordered_set<ripple::uint256> const& keys,
-    uint32_t ledgerSequence,
+    KeyIndex const& index,
     bool isAsync) const
 {
-    BOOST_LOG_TRIVIAL(info)
-        << __func__ << " Ledger = " << std::to_string(ledgerSequence)
-        << " . num keys = " << std::to_string(keys.size())
-        << " . concurrentLimit = "
-        << std::to_string(indexerMaxRequestsOutstanding);
     std::atomic_uint32_t numRemaining = keys.size();
     std::condition_variable cv;
     std::mutex mtx;
@@ -790,11 +743,16 @@ CassandraBackend::writeKeys(
     cbs.reserve(keys.size());
     uint32_t concurrentLimit =
         isAsync ? indexerMaxRequestsOutstanding : keys.size();
+    BOOST_LOG_TRIVIAL(debug)
+        << __func__ << " Ledger = " << std::to_string(index.keyIndex)
+        << " . num keys = " << std::to_string(keys.size())
+        << " . concurrentLimit = "
+        << std::to_string(indexerMaxRequestsOutstanding);
     uint32_t numSubmitted = 0;
     for (auto& key : keys)
     {
         cbs.push_back(std::make_shared<WriteKeyCallbackData>(
-            *this, key, ledgerSequence, cv, mtx, numRemaining));
+            *this, key, index.keyIndex, cv, mtx, numRemaining));
         writeKey(*cbs.back());
         ++numSubmitted;
         BOOST_LOG_TRIVIAL(trace) << __func__ << "Submitted a write request";
@@ -812,7 +770,7 @@ CassandraBackend::writeKeys(
                 concurrentLimit;
         });
         if (numSubmitted % 100000 == 0)
-            BOOST_LOG_TRIVIAL(info)
+            BOOST_LOG_TRIVIAL(debug)
                 << __func__ << " Submitted " << std::to_string(numSubmitted)
                 << " write requests. Completed "
                 << (keys.size() - numRemaining);
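writeKeys throttles its async writes with the condition-variable pattern visible above: each submission bumps a counter, and the loop blocks until completions (which decrement the counter and notify) bring it back under concurrentLimit. The pattern reduced to a sketch, with submitOne standing in for the async write (names and the limit value are illustrative, not from the diff):

    #include <atomic>
    #include <condition_variable>
    #include <cstdint>
    #include <functional>
    #include <mutex>

    std::mutex mtx;
    std::condition_variable cv;
    std::atomic_uint32_t numOutstanding{0};
    uint32_t const concurrentLimit = 16;  // illustrative value

    void submitThrottled(std::function<void()> const& submitOne)
    {
        ++numOutstanding;
        submitOne();  // completion callback must decrement numOutstanding
                      // under mtx and call cv.notify_one()
        std::unique_lock<std::mutex> lck(mtx);
        cv.wait(lck, [] { return numOutstanding < concurrentLimit; });
    }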
@@ -823,57 +781,6 @@ CassandraBackend::writeKeys(
     return true;
 }

-bool
-CassandraBackend::writeBooks(
-    std::unordered_map<
-        ripple::uint256,
-        std::unordered_set<ripple::uint256>> const& books,
-    uint32_t ledgerSequence,
-    bool isAsync) const
-{
-    BOOST_LOG_TRIVIAL(info)
-        << __func__ << " Ledger = " << std::to_string(ledgerSequence)
-        << " . num books = " << std::to_string(books.size());
-    std::condition_variable cv;
-    std::mutex mtx;
-    std::vector<std::shared_ptr<WriteBookCallbackData>> cbs;
-    uint32_t concurrentLimit =
-        isAsync ? indexerMaxRequestsOutstanding : maxRequestsOutstanding;
-    std::atomic_uint32_t numOutstanding = 0;
-    size_t count = 0;
-    auto start = std::chrono::system_clock::now();
-    for (auto& book : books)
-    {
-        for (auto& offer : book.second)
-        {
-            ++numOutstanding;
-            ++count;
-            cbs.push_back(std::make_shared<WriteBookCallbackData>(
-                *this,
-                book.first,
-                offer,
-                ledgerSequence,
-                cv,
-                mtx,
-                numOutstanding));
-            writeBook(*cbs.back());
-            BOOST_LOG_TRIVIAL(trace) << __func__ << "Submitted a write request";
-            std::unique_lock<std::mutex> lck(mtx);
-            BOOST_LOG_TRIVIAL(trace) << __func__ << "Got the mutex";
-            cv.wait(lck, [&numOutstanding, concurrentLimit]() {
-                return numOutstanding < concurrentLimit;
-            });
-        }
-    }
-    BOOST_LOG_TRIVIAL(info) << __func__
-                            << "Submitted all book writes. Waiting for them to "
-                               "finish. num submitted = "
-                            << std::to_string(count);
-    std::unique_lock<std::mutex> lck(mtx);
-    cv.wait(lck, [&numOutstanding]() { return numOutstanding == 0; });
-    BOOST_LOG_TRIVIAL(info) << __func__ << "Finished writing books";
-    return true;
-}
 bool
 CassandraBackend::isIndexed(uint32_t ledgerSequence) const
 {
@@ -1100,10 +1007,79 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const
     */
 }
 bool
-CassandraBackend::doOnlineDelete(uint32_t minLedgerToKeep) const
+CassandraBackend::doOnlineDelete(uint32_t numLedgersToKeep) const
 {
-    throw std::runtime_error("doOnlineDelete : unimplemented");
-    return false;
+    // calculate TTL
+    // ledgers close roughly every 4 seconds. We double the TTL so that way
+    // there is a window of time to update the database, to prevent unchanging
+    // records from being deleted.
+    auto rng = fetchLedgerRangeNoThrow();
+    if (!rng)
+        return false;
+    uint32_t minLedger = rng->maxSequence - numLedgersToKeep;
+    if (minLedger <= rng->minSequence)
+        return false;
+    std::condition_variable cv;
+    std::mutex mtx;
+    std::vector<std::shared_ptr<OnlineDeleteCallbackData>> cbs;
+    uint32_t concurrentLimit = 10;
+    std::atomic_uint32_t numOutstanding = 0;
+
+    // iterate through latest ledger, updating TTL
+    std::optional<ripple::uint256> cursor;
+    while (true)
+    {
+        try
+        {
+            auto [objects, curCursor, warning] =
+                fetchLedgerPage(cursor, minLedger, 256);
+            if (warning)
+            {
+                BOOST_LOG_TRIVIAL(warning)
+                    << __func__
+                    << " online delete running but flag ledger is not complete";
+                std::this_thread::sleep_for(std::chrono::seconds(10));
+                continue;
+            }
+
+            for (auto& obj : objects)
+            {
+                ++numOutstanding;
+                cbs.push_back(std::make_shared<OnlineDeleteCallbackData>(
+                    *this,
+                    std::move(obj.key),
+                    minLedger,
+                    std::move(obj.blob),
+                    cv,
+                    mtx,
+                    numOutstanding));
+
+                onlineDelete(*cbs.back());
+                std::unique_lock<std::mutex> lck(mtx);
+                BOOST_LOG_TRIVIAL(trace) << __func__ << "Got the mutex";
+                cv.wait(lck, [&numOutstanding, concurrentLimit]() {
+                    return numOutstanding < concurrentLimit;
+                });
+            }
+            BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page";
+            cursor = curCursor;
+            if (!cursor)
+                break;
+        }
+        catch (DatabaseTimeout const& e)
+        {
+            BOOST_LOG_TRIVIAL(warning)
+                << __func__ << " Database timeout fetching keys";
+            std::this_thread::sleep_for(std::chrono::seconds(2));
+        }
+    }
+    std::unique_lock<std::mutex> lck(mtx);
+    cv.wait(lck, [&numOutstanding]() { return numOutstanding == 0; });
+    CassandraStatement statement{deleteLedgerRange_};
+    statement.bindInt(minLedger);
+    executeSyncWrite(statement);
+    // update ledger_range
+    return true;
 }

 void
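A plausible reading of the new doOnlineDelete above: it issues no per-object deletes. It pages through the ledger at minLedger and re-inserts each object's latest version through the insert-object statement, which refreshes those rows against the default_time_to_live values added to the tables later in this diff, while untouched history ages out; deleteLedgerRange_ then moves the range's lower bound. The TTL arithmetic the comment alludes to, with illustrative numbers:

    // Ledgers close roughly every 4 seconds; keeping N ledgers and doubling
    // the window (per the comment above) suggests a TTL on the order of:
    uint32_t secondsPerLedger = 4;          // rough close time
    uint32_t numLedgersToKeep = 1'000'000;  // illustrative
    uint32_t ttlSeconds = 2 * secondsPerLedger * numLedgersToKeep;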
@@ -1117,6 +1093,11 @@ CassandraBackend::open(bool readOnly)
         }
         return {""};
     };
+    auto getInt = [this](std::string const& field) -> std::optional<int> {
+        if (config_.contains(field) && config_.at(field).is_int64())
+            return config_[field].as_int64();
+        return {};
+    };
     if (open_)
     {
         assert(false);
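The getInt helper added above centralizes the contains/is_int64 checks against the JSON config. Its typical use, taken from later hunks in this same diff:

    auto port = getInt("port");            // std::optional<int>
    if (port)
        rc = cass_cluster_set_port(cluster, *port);
    int rf = getInt("replication_factor") ? *getInt("replication_factor") : 3;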
@@ -1169,14 +1150,14 @@ CassandraBackend::open(bool readOnly)
         throw std::runtime_error(ss.str());
     }

-    int port = config_.contains("port") ? config_["port"].as_int64() : 0;
+    auto port = getInt("port");
     if (port)
     {
-        rc = cass_cluster_set_port(cluster, port);
+        rc = cass_cluster_set_port(cluster, *port);
         if (rc != CASS_OK)
         {
             std::stringstream ss;
-            ss << "nodestore: Error setting Cassandra port: " << port
+            ss << "nodestore: Error setting Cassandra port: " << *port
                << ", result: " << rc << ", " << cass_error_desc(rc);

             throw std::runtime_error(ss.str());
@@ -1204,9 +1185,8 @@ CassandraBackend::open(bool readOnly)
         cass_cluster_set_credentials(
             cluster, username.c_str(), getString("password").c_str());
     }
-    int threads = config_.contains("threads")
-        ? config_["threads"].as_int64()
-        : std::thread::hardware_concurrency();
+    int threads = getInt("threads") ? *getInt("threads")
+                                    : std::thread::hardware_concurrency();

     rc = cass_cluster_set_num_threads_io(cluster, threads);
     if (rc != CASS_OK)
@@ -1216,6 +1196,8 @@ CassandraBackend::open(bool readOnly)
            << ", result: " << rc << ", " << cass_error_desc(rc);
         throw std::runtime_error(ss.str());
     }
+    if (getInt("max_requests_outstanding"))
+        maxRequestsOutstanding = *getInt("max_requests_outstanding");

     cass_cluster_set_request_timeout(cluster, 10000);

@@ -1272,10 +1254,13 @@ CassandraBackend::open(bool readOnly)
     std::string keyspace = getString("keyspace");
     if (keyspace.empty())
     {
-        throw std::runtime_error(
-            "nodestore: Missing keyspace in Cassandra config");
+        BOOST_LOG_TRIVIAL(warning)
+            << "No keyspace specified. Using keyspace oceand";
+        keyspace = "oceand";
     }

+    int rf = getInt("replication_factor") ? *getInt("replication_factor") : 3;
+
     std::string tablePrefix = getString("table_prefix");
     if (tablePrefix.empty())
     {
@@ -1284,6 +1269,19 @@ CassandraBackend::open(bool readOnly)

     cass_cluster_set_connect_timeout(cluster, 10000);

+    int ttl = getInt("ttl") ? *getInt("ttl") * 2 : 0;
+    int keysTtl = (ttl != 0 ? pow(2, indexer_.getKeyShift()) * 4 * 2 : 0);
+    int incr = keysTtl;
+    while (keysTtl < ttl)
+    {
+        keysTtl += incr;
+    }
+    int booksTtl = 0;
+    BOOST_LOG_TRIVIAL(info)
+        << __func__ << " setting ttl to " << std::to_string(ttl)
+        << " , books ttl to " << std::to_string(booksTtl) << " , keys ttl to "
+        << std::to_string(keysTtl);
+
     auto executeSimpleStatement = [this](std::string const& query) {
         CassStatement* statement = makeStatement(query.c_str(), 0);
         CassFuture* fut = cass_session_execute(session_.get(), statement);
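The keysTtl loop above rounds the keys-table TTL up to a whole multiple of its base period, pow(2, keyShift) * 4 * 2 seconds, so that it covers the (doubled) object TTL, presumably because a keys row must outlive the objects written under its key index. A worked example under assumed values (the keyShift and configured ttl here are illustrative):

    // Assume indexer_.getKeyShift() == 20 and config "ttl" == 10'000'000.
    int ttl = 10'000'000 * 2;         // 20'000'000 (doubled, as above)
    int keysTtl = (1 << 20) * 4 * 2;  // base period: 8'388'608
    int incr = keysTtl;
    while (keysTtl < ttl)             // round up in whole periods
        keysTtl += incr;              // ends at 25'165'824 == 3 * 8'388'608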
@@ -1317,8 +1315,36 @@ CassandraBackend::open(bool readOnly)
         {
             std::stringstream ss;
             ss << "nodestore: Error connecting Cassandra session keyspace: "
-               << rc << ", " << cass_error_desc(rc);
+               << rc << ", " << cass_error_desc(rc)
+               << ", trying to create it ourselves";
             BOOST_LOG_TRIVIAL(error) << ss.str();
+            // if the keyspace doesn't exist, try to create it
+            session_.reset(cass_session_new());
+            fut = cass_session_connect(session_.get(), cluster);
+            rc = cass_future_error_code(fut);
+            cass_future_free(fut);
+            if (rc != CASS_OK)
+            {
+                std::stringstream ss;
+                ss << "nodestore: Error connecting Cassandra session at all: "
+                   << rc << ", " << cass_error_desc(rc);
+                BOOST_LOG_TRIVIAL(error) << ss.str();
+            }
+            else
+            {
+                std::stringstream query;
+                query << "CREATE KEYSPACE IF NOT EXISTS " << keyspace
+                      << " WITH replication = {'class': 'SimpleStrategy', "
+                         "'replication_factor': '"
+                      << std::to_string(rf) << "'} AND durable_writes = true";
+                if (!executeSimpleStatement(query.str()))
+                    continue;
+                query = {};
+                query << "USE " << keyspace;
+                if (!executeSimpleStatement(query.str()))
+                    continue;
+            }
+
             continue;
         }

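For reference, with the fallback keyspace oceand and the default replication factor of 3, the statement generated above would render as:

    // CREATE KEYSPACE IF NOT EXISTS oceand WITH replication =
    //     {'class': 'SimpleStrategy', 'replication_factor': '3'}
    //     AND durable_writes = true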
@@ -1326,7 +1352,8 @@ CassandraBackend::open(bool readOnly)
         query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "objects"
               << " ( key blob, sequence bigint, object blob, PRIMARY "
                  "KEY(key, "
-                 "sequence)) WITH CLUSTERING ORDER BY (sequence DESC)";
+                 "sequence)) WITH CLUSTERING ORDER BY (sequence DESC) AND"
+              << " default_time_to_live = " << std::to_string(ttl);
         if (!executeSimpleStatement(query.str()))
             continue;

@@ -1337,6 +1364,7 @@ CassandraBackend::open(bool readOnly)
             continue;

         query.str("");
+<<<<<<< HEAD
         query << "CREATE INDEX ON " << tablePrefix << "objects(sequence)";
         if (!executeSimpleStatement(query.str()))
             continue;
@@ -1352,6 +1380,13 @@ CassandraBackend::open(bool readOnly)
             << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "transactions"
             << " ( hash blob PRIMARY KEY, ledger_sequence bigint, transaction "
                "blob, metadata blob)";
+=======
+        query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "transactions"
+              << " ( hash blob PRIMARY KEY, ledger_sequence bigint, "
+                 "transaction "
+                 "blob, metadata blob)"
+              << " WITH default_time_to_live = " << std::to_string(ttl);
+>>>>>>> dev
         if (!executeSimpleStatement(query.str()))
             continue;

@@ -1376,7 +1411,9 @@ CassandraBackend::open(bool readOnly)
         query.str("");
         query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "keys"
               << " ( sequence bigint, key blob, PRIMARY KEY "
-                 "(sequence, key))";
+                 "(sequence, key))"
+                 " WITH default_time_to_live = "
+              << std::to_string(keysTtl);
         if (!executeSimpleStatement(query.str()))
             continue;

@@ -1386,24 +1423,14 @@ CassandraBackend::open(bool readOnly)
         if (!executeSimpleStatement(query.str()))
             continue;
         query.str("");
-        query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "books"
-              << " ( book blob, sequence bigint, quality_key tuple<blob, blob>, PRIMARY KEY "
-                 "((book, sequence), quality_key)) WITH CLUSTERING ORDER BY (quality_key "
-                 "ASC)";
-        if (!executeSimpleStatement(query.str()))
-            continue;
-        query.str("");
-        query << "SELECT * FROM " << tablePrefix << "books"
-              << " LIMIT 1";
-        if (!executeSimpleStatement(query.str()))
-            continue;
-        query.str("");
         query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "account_tx"
               << " ( account blob, seq_idx tuple<bigint, bigint>, "
                  " hash blob, "
                  "PRIMARY KEY "
                  "(account, seq_idx)) WITH "
-                 "CLUSTERING ORDER BY (seq_idx desc)";
+                 "CLUSTERING ORDER BY (seq_idx desc)"
+              << " AND default_time_to_live = " << std::to_string(ttl);

         if (!executeSimpleStatement(query.str()))
             continue;

@@ -1415,7 +1442,8 @@ CassandraBackend::open(bool readOnly)

         query.str("");
         query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "ledgers"
-              << " ( sequence bigint PRIMARY KEY, header blob )";
+              << " ( sequence bigint PRIMARY KEY, header blob )"
+              << " WITH default_time_to_live = " << std::to_string(ttl);
         if (!executeSimpleStatement(query.str()))
             continue;

@@ -1427,7 +1455,8 @@ CassandraBackend::open(bool readOnly)

         query.str("");
         query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "ledger_hashes"
-              << " (hash blob PRIMARY KEY, sequence bigint)";
+              << " (hash blob PRIMARY KEY, sequence bigint)"
+              << " WITH default_time_to_live = " << std::to_string(ttl);
         if (!executeSimpleStatement(query.str()))
             continue;

@@ -1478,12 +1507,15 @@ CassandraBackend::open(bool readOnly)
             continue;

         query.str("");
+<<<<<<< HEAD
         query << "INSERT INTO " << tablePrefix << "books"
               << " (book, sequence, quality_key) VALUES (?, ?, (?, ?))";
         if (!insertBook2_.prepareStatement(query, session_.get()))
             continue;

         query.str("");
+=======
+>>>>>>> dev
         query << "SELECT key FROM " << tablePrefix << "keys"
               << " WHERE sequence = ? AND key >= ? ORDER BY key ASC LIMIT ?";
         if (!selectKeys_.prepareStatement(query, session_.get()))
@@ -1541,24 +1573,6 @@ CassandraBackend::open(bool readOnly)
         if (!getToken_.prepareStatement(query, session_.get()))
             continue;

-        query.str("");
-        query << "SELECT quality_key FROM " << tablePrefix << "books "
-              << " WHERE book = ? AND sequence = ?"
-              << " AND quality_key >= (?, ?)"
-                 " ORDER BY quality_key ASC "
-                 " LIMIT ?";
-        if (!selectBook_.prepareStatement(query, session_.get()))
-            continue;
-
-        query.str("");
-        query << "SELECT * FROM " << tablePrefix << "books "
-              << "WHERE book = "
-              << "0x000000000000000000000000000000000000000000000000"
-              << " AND sequence = ?";
-        if (!completeBook_.prepareStatement(query, session_.get()))
-            continue;
-

         query.str("");
         query << " INSERT INTO " << tablePrefix << "account_tx"
               << " (account, seq_idx, hash) "
@@ -1591,6 +1605,11 @@ CassandraBackend::open(bool readOnly)
                "(?,null)";
         if (!updateLedgerRange_.prepareStatement(query, session_.get()))
             continue;
+        query = {};
+        query << " update " << tablePrefix << "ledger_range"
+              << " set sequence = ? where is_latest = false";
+        if (!deleteLedgerRange_.prepareStatement(query, session_.get()))
+            continue;

         query.str("");
         query << " select header from " << tablePrefix
@@ -1610,15 +1629,17 @@ CassandraBackend::open(bool readOnly)
             << " is_latest IN (true, false)";
         if (!selectLedgerRange_.prepareStatement(query, session_.get()))
             continue;
+        /*
         query.str("");
         query << " SELECT key,object FROM " << tablePrefix
               << "objects WHERE sequence = ?";
         if (!selectLedgerDiff_.prepareStatement(query, session_.get()))
             continue;

+        */
         setupPreparedStatements = true;
     }

+<<<<<<< HEAD
     if (config_.contains("max_requests_outstanding"))
     {
         maxRequestsOutstanding = config_["max_requests_outstanding"].as_int64();
@@ -1629,6 +1650,8 @@ CassandraBackend::open(bool readOnly)
         config_["indexer_max_requests_outstanding"].as_int64();
     }

+=======
+>>>>>>> dev
     work_.emplace(ioContext_);
     ioThread_ = std::thread{[this]() { ioContext_.run(); }};
     open_ = true;