20 #ifdef RIPPLED_REPORTING
22 #include <cassandra.h>
25 #include <ripple/basics/Slice.h>
26 #include <ripple/basics/StringUtilities.h>
27 #include <ripple/basics/contract.h>
28 #include <ripple/basics/strHex.h>
29 #include <ripple/nodestore/Backend.h>
30 #include <ripple/nodestore/Factory.h>
31 #include <ripple/nodestore/Manager.h>
32 #include <ripple/nodestore/impl/DecodedBlob.h>
33 #include <ripple/nodestore/impl/EncodedBlob.h>
34 #include <ripple/nodestore/impl/codec.h>
35 #include <ripple/protocol/digest.h>
36 #include <boost/asio/steady_timer.hpp>
37 #include <boost/filesystem.hpp>
49 #include <nudb/nudb.hpp>
// Forward declarations of the two completion callbacks that the backend's
// write and read paths later register with cass_future_set_callback().
// Each receives the completed future and the opaque pointer supplied at
// registration time.
60 writeCallback(CassFuture* fut,
void* cbData);
62 readCallback(CassFuture* fut,
void* cbData);
// NodeStore Backend implementation that stores and retrieves objects through
// the Cassandra C API from <cassandra.h> (the cass_* calls used throughout
// this class).
64 class CassandraBackend :
public Backend
// Create a CassStatement for `query` with `params` bind slots and request
// QUORUM consistency for it.
// The error branch formats the query text, the numeric result code and
// cass_error_desc() into a std::runtime_error.
// NOTE(review): the assignment of `rc` and the condition guarding the throw
// are not visible in this view; presumably `rc` holds the result of the
// set_consistency call - confirm.
70 makeStatement(
char const* query,
std::size_t params)
72 CassStatement* ret = cass_statement_new(query, params);
74 cass_statement_set_consistency(ret, CASS_CONSISTENCY_QUORUM);
78 ss <<
"nodestore: Error setting query consistency: " << query
79 <<
", result: " << rc <<
", " << cass_error_desc(rc);
80 Throw<std::runtime_error>(ss.
str());
// Key length in bytes; passed as the size argument whenever a key is bound
// to a prepared statement.
87 size_t const keyBytes_;
// Copy of the configuration section; every setting is read from it with
// get(config_, ...).
89 Section
const config_;
// Session clean-up routine: closes the session, waits for the close to
// complete, then frees the close future and the session itself.
// NOTE(review): presumably the deleter of the session_ smart pointer - the
// declaration it belongs to is not visible in this view.
98 [](CassSession* session) {
100 CassFuture* fut = cass_session_close(session);
101 cass_future_wait(fut);
102 cass_future_free(fut);
103 cass_session_free(session);
// Prepared INSERT / SELECT statements. They start out null, are assigned from
// cass_future_get_prepared() during open, and are released with
// cass_prepared_free().
108 const CassPrepared* insert_ =
nullptr;
109 const CassPrepared* select_ =
nullptr;
// io_context that is run on ioThread_.
112 boost::asio::io_context ioContext_;
// Limit used both for the driver's io queue size and for throttling new
// writes; can be overridden by the "max_requests_outstanding" setting.
118 uint32_t maxRequestsOutstanding = 10000000;
// Atomic 64-bit statistics counters (readRetries, readErrors, writesDelayed,
// writeRetries and writeDurationUs are the ones touched in this file).
131 Counters<std::atomic<std::uint64_t>> counters_;
// Constructor parameters (continued) and initializer list: stores the journal
// and the key length, and copies the configuration section into config_.
// No Cassandra call is made in the visible initializer list.
136 Section
const& keyValues,
138 : j_(journal), keyBytes_(keyBytes), config_(keyValues)
// Destructor, declared as an override of the base-class destructor.
// NOTE(review): the body is not visible in this view.
142 ~CassandraBackend()
override
// Configure a cluster object from config_, connect a session to the
// configured keyspace, make sure the table exists, prepare the INSERT and
// SELECT statements, and start the io thread.
// Settings read in the visible code: "secure_connect_bundle", "port",
// "password" and "max_requests_outstanding".
// Configuration problems and cluster-setup failures are reported by throwing
// std::runtime_error; certificate-file problems throw std::system_error.
// NOTE(review): `createIfMissing` is not referenced in any line visible here.
// NOTE(review): many conditions and braces between the statements below are
// not visible in this view, so the branch structure is described only where
// the visible code establishes it.
163 open(
bool createIfMissing)
override
// Reported when the backend is already open (the guarding condition is not
// visible here).
168 JLOG(j_.
error()) <<
"database is already open";
// Allocate the cluster configuration object; failure to do so throws.
173 CassCluster* cluster = cass_cluster_new();
175 Throw<std::runtime_error>(
176 "nodestore:: Failed to create CassCluster");
// Cloud setup: a non-empty "secure_connect_bundle" setting is handed to the
// driver; if the driver rejects it, an error is logged and an exception
// thrown.
178 std::string secureConnectBundle =
get(config_,
"secure_connect_bundle");
180 if (!secureConnectBundle.
empty())
184 if (cass_cluster_set_cloud_secure_connection_bundle(
185 cluster, secureConnectBundle.
c_str()) != CASS_OK)
187 JLOG(j_.
error()) <<
"Unable to configure cloud using the "
188 "secure connection bundle: "
189 << secureConnectBundle;
190 Throw<std::runtime_error>(
191 "nodestore: Failed to connect using secure connection "
// Contact points must be non-empty and are passed to the cluster as given.
// NOTE(review): presumably this is the alternative to the bundle path above -
// the connecting `else` is not visible.
199 if (contact_points.
empty())
201 Throw<std::runtime_error>(
202 "nodestore: Missing contact_points in Cassandra config");
204 CassError rc = cass_cluster_set_contact_points(
205 cluster, contact_points.
c_str());
209 ss <<
"nodestore: Error setting Cassandra contact_points: "
210 << contact_points <<
", result: " << rc <<
", "
211 << cass_error_desc(rc);
213 Throw<std::runtime_error>(ss.
str());
// Port is read as an int from the "port" setting and applied to the cluster.
216 int port = get<int>(config_,
"port");
219 rc = cass_cluster_set_port(cluster, port);
223 ss <<
"nodestore: Error setting Cassandra port: " << port
224 <<
", result: " << rc <<
", " << cass_error_desc(rc);
226 Throw<std::runtime_error>(ss.
str());
// Enable token-aware routing and pin the native protocol to version 4.
// `rc` is declared again here, so the earlier `rc` must live in a nested
// scope that has ended by this point.
230 cass_cluster_set_token_aware_routing(cluster, cass_true);
231 CassError rc = cass_cluster_set_protocol_version(
232 cluster, CASS_PROTOCOL_VERSION_V4);
// NOTE(review): this message prints no value between the colon and
// ", result:" - confirm whether the requested version should be included.
236 ss <<
"nodestore: Error setting cassandra protocol version: "
237 <<
", result: " << rc <<
", " << cass_error_desc(rc);
239 Throw<std::runtime_error>(ss.
str());
// NOTE(review): the configured password is inserted into an output stream
// here. The destination stream is not visible in this view; confirm that
// credentials do not end up in logs or on the console.
246 <<
" password = " <<
get(config_,
"password")
// Hand the username and the "password" setting to the driver.
248 cass_cluster_set_credentials(
249 cluster, username.
c_str(),
get(config_,
"password").c_str());
// Number of driver io threads; where `workers` comes from is not visible here.
253 rc = cass_cluster_set_num_threads_io(cluster, workers);
257 ss <<
"nodestore: Error setting Cassandra io threads to " << workers
258 <<
", result: " << rc <<
", " << cass_error_desc(rc);
259 Throw<std::runtime_error>(ss.
str());
// Request timeout of 2000 (milliseconds according to the driver
// documentation - confirm).
262 cass_cluster_set_request_timeout(cluster, 2000);
// Size the driver's io queue from maxRequestsOutstanding.
// NOTE(review): at this point maxRequestsOutstanding has not yet been
// overridden from "max_requests_outstanding" (that happens at the end of this
// function in the visible code), so the value used here looks like the
// default - confirm this ordering is intended.
264 rc = cass_cluster_set_queue_size_io(
266 maxRequestsOutstanding);
// NOTE(review): the message talks about "max core connections" although the
// call above sets the io queue size - wording looks copied; confirm.
271 ss <<
"nodestore: Error setting Cassandra max core connections per "
273 <<
", result: " << rc <<
", " << cass_error_desc(rc);
// Certificate file handling: the file named by `certfile` is opened for
// reading; failures to open or read it are reported as std::system_error.
283 boost::filesystem::path(certfile).
string(), std::ios::in);
287 ss <<
"opening config file " << certfile;
288 Throw<std::system_error>(
294 if (fileStream.bad())
297 ss <<
"reading config file " << certfile;
298 Throw<std::system_error>(
// Build an SSL context that trusts the loaded certificate, install it on the
// cluster, and release the local handle afterwards.
// NOTE(review): verification flags are set to CASS_SSL_VERIFY_NONE, which
// presumably disables peer certificate verification - confirm this is
// intended.
302 CassSsl* context = cass_ssl_new();
303 cass_ssl_set_verify_flags(context, CASS_SSL_VERIFY_NONE);
304 rc = cass_ssl_add_trusted_cert(context, cert.c_str());
308 ss <<
"nodestore: Error setting Cassandra ssl context: " << rc
309 <<
", " << cass_error_desc(rc);
310 Throw<std::runtime_error>(ss.
str());
313 cass_cluster_set_ssl(cluster, context);
314 cass_ssl_free(context);
// Keyspace and table name are both mandatory.
318 if (keyspace.
empty())
320 Throw<std::runtime_error>(
321 "nodestore: Missing keyspace in Cassandra config");
325 if (tableName.
empty())
327 Throw<std::runtime_error>(
328 "nodestore: Missing table name in Cassandra config");
// Connect timeout of 10000 (milliseconds according to the driver
// documentation - confirm).
331 cass_cluster_set_connect_timeout(cluster, 10000);
333 CassStatement* statement;
// Loop until a session is connected and the table can be queried.
// Each pass creates a fresh session, connects it to the keyspace, issues
// CREATE TABLE IF NOT EXISTS, and then probes the table with a SELECT.
335 bool setupSessionAndTable =
false;
336 while (!setupSessionAndTable)
339 session_.reset(cass_session_new());
342 fut = cass_session_connect_keyspace(
343 session_.get(), cluster, keyspace.
c_str());
344 rc = cass_future_error_code(fut);
345 cass_future_free(fut);
349 ss <<
"nodestore: Error connecting Cassandra session keyspace: "
350 << rc <<
", " << cass_error_desc(rc);
356 query <<
"CREATE TABLE IF NOT EXISTS " << tableName
357 <<
" ( hash blob PRIMARY KEY, object blob)";
359 statement = makeStatement(query.
str().c_str(), 0);
360 fut = cass_session_execute(session_.get(), statement);
361 rc = cass_future_error_code(fut);
362 cass_future_free(fut);
363 cass_statement_free(statement);
// CASS_ERROR_SERVER_INVALID_QUERY is tolerated for the CREATE TABLE; any other
// non-OK result is treated as an error.
364 if (rc != CASS_OK && rc != CASS_ERROR_SERVER_INVALID_QUERY)
367 ss <<
"nodestore: Error creating Cassandra table: " << rc
368 <<
", " << cass_error_desc(rc);
// Probe query to check that the table is usable.
374 query <<
"SELECT * FROM " << tableName <<
" LIMIT 1";
375 statement = makeStatement(query.
str().c_str(), 0);
376 fut = cass_session_execute(session_.get(), statement);
377 rc = cass_future_error_code(fut);
378 cass_future_free(fut);
379 cass_statement_free(statement);
// INVALID_QUERY from the probe is treated as "table not propagated yet".
// NOTE(review): the sleep mentioned in the log message is not visible here.
382 if (rc == CASS_ERROR_SERVER_INVALID_QUERY)
384 JLOG(j_.
warn()) <<
"table not here yet, sleeping 1s to "
385 "see if table creation propagates";
391 ss <<
"nodestore: Error checking for table: " << rc <<
", "
392 << cass_error_desc(rc);
398 setupSessionAndTable =
true;
// The cluster object is released once the session has been set up.
401 cass_cluster_free(cluster);
// Loop until both prepared statements exist. The INSERT is prepared first,
// then the SELECT; the flag is set only after select_ has been obtained.
403 bool setupPreparedStatements =
false;
404 while (!setupPreparedStatements)
408 query <<
"INSERT INTO " << tableName
409 <<
" (hash, object) VALUES (?, ?)";
410 CassFuture* prepare_future =
411 cass_session_prepare(session_.get(), query.
str().c_str());
414 rc = cass_future_error_code(prepare_future);
// NOTE(review): prepare_future is freed twice in the visible lines for each
// statement; presumably the first free is on the error path and the second on
// the success path - the branches are not visible, confirm.
419 cass_future_free(prepare_future);
422 ss <<
"nodestore: Error preparing insert : " << rc <<
", "
423 << cass_error_desc(rc);
429 insert_ = cass_future_get_prepared(prepare_future);
434 cass_future_free(prepare_future);
437 query <<
"SELECT object FROM " << tableName <<
" WHERE hash = ?";
439 cass_session_prepare(session_.get(), query.
str().c_str());
442 rc = cass_future_error_code(prepare_future);
447 cass_future_free(prepare_future);
450 ss <<
"nodestore: Error preparing select : " << rc <<
", "
451 << cass_error_desc(rc);
457 select_ = cass_future_get_prepared(prepare_future);
462 cass_future_free(prepare_future);
463 setupPreparedStatements =
true;
// Start the thread that runs ioContext_.
467 ioThread_ =
std::thread{[
this]() { ioContext_.run(); }};
// Optional override of the outstanding-request limit.
// NOTE(review): the value is read as int and stored in a uint32_t, so a
// negative setting would wrap to a very large limit - confirm whether input
// validation is wanted.
470 if (config_.exists(
"max_requests_outstanding"))
472 maxRequestsOutstanding =
473 get<int>(config_,
"max_requests_outstanding");
// Release the prepared statements.
// NOTE(review): these two calls presumably belong to a separate teardown
// routine whose header is not visible in this view - confirm.
485 cass_prepared_free(insert_);
490 cass_prepared_free(select_);
// Single-key fetch path: binds `key` to the prepared SELECT, executes it,
// extracts column 0 of the first row, decodes it and stores the resulting
// object through `pno`.
// NOTE(review): the function header and the return statements are not visible
// in this view, so status codes are not described here.
506 JLOG(j_.
trace()) <<
"Fetching from cassandra";
// Bind the key bytes (keyBytes_ long) to parameter 0 with QUORUM consistency.
507 CassStatement* statement = cass_prepared_bind(select_);
508 cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
509 CassError rc = cass_statement_bind_bytes(
510 statement, 0,
static_cast<cass_byte_t const*
>(key), keyBytes_);
// Bind failure handling: the statement is released and the error is logged
// (the guarding condition is not visible here).
513 cass_statement_free(statement);
514 JLOG(j_.
error()) <<
"Binding Cassandra fetch query: " << rc <<
", "
515 << cass_error_desc(rc);
// Execute and re-execute until the future reports CASS_OK; each failure bumps
// counters_.readRetries.
// NOTE(review): no retry limit or back-off is visible in this loop - confirm
// that retrying indefinitely is intended.
522 fut = cass_session_execute(session_.get(), statement);
523 rc = cass_future_error_code(fut);
527 ss <<
"Cassandra fetch error";
529 ++counters_.readRetries;
530 ss <<
": " << cass_error_desc(rc);
533 }
while (rc != CASS_OK);
// Take the result, then release the statement and the future.
535 CassResult
const* res = cass_future_get_result(fut);
536 cass_statement_free(statement);
537 cass_future_free(fut);
// `res` is released on every visible exit path below.
539 CassRow
const* row = cass_result_first_row(res);
542 cass_result_free(res);
546 cass_byte_t
const* buf;
// Read the raw bytes of column 0 into buf/bufSize.
548 rc = cass_value_get_bytes(cass_row_get_column(row, 0), &buf, &bufSize);
551 cass_result_free(res);
553 JLOG(j_.
error()) <<
"Cassandra fetch result error: " << rc <<
", "
554 << cass_error_desc(rc);
555 ++counters_.readErrors;
// Scratch buffer; presumably the target of the decompression step that
// produces `uncompressed` - that step is not visible here.
559 nudb::detail::buffer bf;
562 DecodedBlob decoded(key, uncompressed.
first, uncompressed.
second);
563 cass_result_free(res);
565 if (!decoded.wasOk())
// NOTE(review): `rc` was last assigned by cass_value_get_bytes above, so it
// does not describe the decode failure logged here - confirm whether it
// should be part of this message.
568 JLOG(j_.
error()) <<
"Cassandra error decoding result: " << rc
569 <<
", " << cass_error_desc(rc);
570 ++counters_.readErrors;
// Hand the decoded object to the caller.
573 *pno = decoded.createObject();
// Per-key state for an asynchronous read; a pointer to one of these is passed
// to readCallback as its opaque argument.
// Besides the members visible here, the read path also uses `result`, `cv`,
// `numFinished` and `batchSize` on this type.
577 struct ReadCallbackData
// Backend that issued the read; used for retries, logging and counters.
579 CassandraBackend& backend;
// Key being fetched; the pointer itself is const and so is the data it
// points to.
580 const void*
const key;
// Constructor parameters and initializers (only partly visible in this view).
588 CassandraBackend& backend,
589 const void*
const key,
598 , numFinished(numFinished)
599 , batchSize(batchSize)
// Copy construction is explicitly defaulted.
603 ReadCallbackData(ReadCallbackData
const& other) =
default;
// Batch fetch path: creates one ReadCallbackData per requested hash, waits
// until every read has finished, and returns the results together with `ok`.
// NOTE(review): the function header, the loop that issues the reads and the
// code that fills `results` are not visible in this view.
610 JLOG(j_.
trace()) <<
"Fetching " << numHashes
611 <<
" records from Cassandra";
// Callback state is held through shared_ptr in `cbs`.
620 cbs.
push_back(std::make_shared<ReadCallbackData>(
622 static_cast<void const*
>(hashes[i]),
629 assert(results.size() == cbs.
size());
// Block until the finished-read count equals the number of hashes requested.
632 cv.
wait(lck, [&numFinished, &numHashes]() {
633 return numFinished == numHashes;
636 JLOG(j_.
trace()) <<
"Fetched " << numHashes
637 <<
" records from Cassandra";
638 return {results,
ok};
// Issue one asynchronous read for `data.key`.
// The address of `data` is given to the driver as the callback argument, so
// `data` has to stay alive until readCallback has run for this request.
642 read(ReadCallbackData& data)
// Bind the key bytes (keyBytes_ long) to parameter 0 with QUORUM consistency.
644 CassStatement* statement = cass_prepared_bind(select_);
645 cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
646 CassError rc = cass_statement_bind_bytes(
647 statement, 0,
static_cast<cass_byte_t const*
>(
data.key), keyBytes_);
// Bind failure handling (the guarding condition is not visible here): the
// request is counted as finished, waiters are notified when it was the last
// one of the batch, the statement is released and the error is logged.
650 size_t batchSize =
data.batchSize;
651 if (++(
data.numFinished) == batchSize)
652 data.cv.notify_all();
653 cass_statement_free(statement);
654 JLOG(j_.
error()) <<
"Binding Cassandra fetch query: " << rc <<
", "
655 << cass_error_desc(rc);
// Execute, release the statement, register readCallback for completion and
// drop the local reference to the future.
659 CassFuture* fut = cass_session_execute(session_.get(), statement);
661 cass_statement_free(statement);
663 cass_future_set_callback(fut, readCallback,
static_cast<void*
>(&data));
664 cass_future_free(fut);
// Per-object state for an asynchronous write; a pointer to one of these is
// passed to writeCallback as its opaque argument. Instances are created with
// `new` on the store path and deleted in writeCallback.
667 struct WriteCallbackData
// Backend that issued the write.
669 CassandraBackend* backend;
// Encoded form of the object; its getKey() supplies the hash that is bound
// to the INSERT.
674 NodeStore::EncodedBlob e;
// Time point used by writeCallback for the writeDurationUs counter
// (presumably taken when the request was created - confirm).
676 std::chrono::steady_clock::time_point
begin;
// Scratch buffer; presumably backs the `compressed` data that is bound to the
// INSERT - the code that fills it is not visible here.
679 nudb::detail::buffer bf;
// Number of retries performed so far for this particular write.
682 uint32_t currentRetries = 0;
// Constructor initializer list (the parameter list is not visible here).
688 : backend(f), no(nobj), totalWriteRetries(retries)
// Issue one asynchronous INSERT for `data`.
// `isRetry` is true when called again from writeCallback after a failed
// attempt; retries skip the throttling step below.
// Bind failures are logged and thrown as std::runtime_error.
// The address of `data` is given to the driver as the callback argument.
698 write(WriteCallbackData& data,
bool isRetry)
// Throttle new writes while too many requests are outstanding.
// NOTE(review): the wait is entered when the count is greater than the limit
// but only released when it is strictly below the limit - confirm the
// asymmetry is intended.
709 if (!isRetry && numRequestsOutstanding_ > maxRequestsOutstanding)
711 JLOG(j_.
trace()) << __func__ <<
" : "
712 <<
"Max outstanding requests reached. "
713 <<
"Waiting for other requests to finish";
714 ++counters_.writesDelayed;
715 throttleCv_.
wait(lck, [
this]() {
716 return numRequestsOutstanding_ < maxRequestsOutstanding;
// Bind the hash from the encoded blob's key with QUORUM consistency.
721 CassStatement* statement = cass_prepared_bind(insert_);
722 cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
723 CassError rc = cass_statement_bind_bytes(
726 static_cast<cass_byte_t const*
>(
data.e.getKey()),
// Hash bind failure: release the statement, log and throw.
730 cass_statement_free(statement);
732 ss <<
"Binding cassandra insert hash: " << rc <<
", "
733 << cass_error_desc(rc);
734 JLOG(j_.
error()) << __func__ <<
" : " << ss.
str();
735 Throw<std::runtime_error>(ss.
str());
// Bind the compressed object bytes (pointer and size from data.compressed).
737 rc = cass_statement_bind_bytes(
740 static_cast<cass_byte_t const*
>(
data.compressed.first),
741 data.compressed.second);
// Object bind failure: release the statement, log and throw.
744 cass_statement_free(statement);
746 ss <<
"Binding cassandra insert object: " << rc <<
", "
747 << cass_error_desc(rc);
748 JLOG(j_.
error()) << __func__ <<
" : " << ss.
str();
749 Throw<std::runtime_error>(ss.
str());
// Execute, release the statement, register writeCallback for completion and
// drop the local reference to the future.
752 CassFuture* fut = cass_session_execute(session_.get(), statement);
753 cass_statement_free(statement);
755 cass_future_set_callback(fut, writeCallback,
static_cast<void*
>(&data));
756 cass_future_free(fut);
// Single-object store path: allocates the per-write callback state and counts
// the request as outstanding.
// NOTE(review): the function header and the call that actually submits the
// write are not visible in this view.
762 JLOG(j_.
trace()) <<
"Writing to cassandra";
// Heap-allocated on purpose: ownership passes to the asynchronous write, and
// writeCallback deletes the object once the write has completed.
763 WriteCallbackData*
data =
764 new WriteCallbackData(
this, no, counters_.writeRetries);
766 ++numRequestsOutstanding_;
// Store every object of `batch`; the loop body is not visible in this view.
771 storeBatch(
Batch const& batch)
override
773 for (
auto const& no : batch)
// Block until no write requests are outstanding.
// NOTE(review): this wait may belong to a separate function whose header is
// not visible here - confirm.
784 syncCv_.
wait(lck, [
this]() {
return numRequestsOutstanding_ == 0; });
// Unsupported operation: always throws std::runtime_error("not implemented").
// NOTE(review): the header of the function containing this throw is not
// visible in this view.
794 Throw<std::runtime_error>(
"not implemented");
// Remaining Backend overrides. Only their names and qualifiers are visible
// here; return types and bodies are not shown in this view.
798 getWriteLoad()
override
804 setDeletePath()
override
809 fdRequired()
const override
815 counters()
const override
// Declarations of the two driver callbacks, repeated after the class's member
// functions.
// NOTE(review): presumably friend declarations, since both callbacks access
// backend members such as j_ and counters_ - the `friend` keyword is not
// visible in this view.
821 writeCallback(CassFuture* fut,
void* cbData);
824 readCallback(CassFuture* fut,
void* cbData);
// Completion callback for asynchronous reads.
// `cbData` is the ReadCallbackData that was registered in
// CassandraBackend::read(). On a failed request the read is issued again; on
// success the first row's column 0 is decoded and stored in
// requestParams.result.
// NOTE(review): the conditions separating the branches below and the calls to
// `finish` are not visible in this view.
831 readCallback(CassFuture* fut,
void* cbData)
833 CassandraBackend::ReadCallbackData& requestParams =
834 *
static_cast<CassandraBackend::ReadCallbackData*
>(cbData);
836 CassError rc = cass_future_error_code(fut);
// Failure path: count the retry, log a warning and re-issue the read with the
// same callback data.
840 ++(requestParams.backend.counters_.readRetries);
841 JLOG(requestParams.backend.j_.warn())
842 <<
"Cassandra fetch error : " << rc <<
" : " << cass_error_desc(rc)
849 requestParams.backend.read(requestParams);
// Helper that marks this request as finished and wakes the waiter once the
// whole batch has completed.
853 auto finish = [&requestParams]() {
854 size_t batchSize = requestParams.batchSize;
855 if (++(requestParams.numFinished) == batchSize)
856 requestParams.cv.notify_all();
// `res` is released on every visible exit path below.
858 CassResult
const* res = cass_future_get_result(fut);
860 CassRow
const* row = cass_result_first_row(res);
863 cass_result_free(res);
864 JLOG(requestParams.backend.j_.error())
865 <<
"Cassandra fetch get row error : " << rc <<
", "
866 << cass_error_desc(rc);
870 cass_byte_t
const* buf;
// Read the raw bytes of column 0 into buf/bufSize.
872 rc = cass_value_get_bytes(cass_row_get_column(row, 0), &buf, &bufSize);
875 cass_result_free(res);
876 JLOG(requestParams.backend.j_.error())
877 <<
"Cassandra fetch get bytes error : " << rc <<
", "
878 << cass_error_desc(rc);
879 ++requestParams.backend.counters_.readErrors;
// Scratch buffer; presumably the target of the decompression step that
// produces `uncompressed` - that step is not visible here.
883 nudb::detail::buffer bf;
887 requestParams.key, uncompressed.
first, uncompressed.
second);
888 cass_result_free(res);
890 if (!decoded.wasOk())
// Logged at fatal level as data corruption.
// NOTE(review): `rc` was last assigned by cass_value_get_bytes above, so it
// does not describe the decode failure - confirm whether it should be part of
// this message.
892 JLOG(requestParams.backend.j_.fatal())
893 <<
"Cassandra fetch error - data corruption : " << rc <<
", "
894 << cass_error_desc(rc);
895 ++requestParams.backend.counters_.readErrors;
// Publish the decoded object for the code waiting on this request.
899 requestParams.result = decoded.createObject();
// Completion callback for asynchronous writes.
// `cbData` is the WriteCallbackData that was registered in
// CassandraBackend::write(). A failed insert is retried from a timer handler;
// a successful one updates the counters, wakes waiters and deletes the
// callback data.
// NOTE(review): the condition separating the failure and success branches and
// the timer's delay computation are not visible in this view.
908 writeCallback(CassFuture* fut,
void* cbData)
910 CassandraBackend::WriteCallbackData& requestParams =
911 *
static_cast<CassandraBackend::WriteCallbackData*
>(cbData);
912 CassandraBackend& backend = *requestParams.backend;
913 auto rc = cass_future_error_code(fut);
// Failure path: log, count the retry and schedule another write attempt.
916 JLOG(backend.j_.error())
917 <<
"ERROR!!! Cassandra insert error: " << rc <<
", "
918 << cass_error_desc(rc) <<
", retrying ";
919 ++requestParams.totalWriteRetries;
923 ++requestParams.currentRetries;
// The timer is captured by value as a shared_ptr (presumably to keep it alive
// until the handler runs); requestParams and backend are captured by
// reference and must still exist when the handler fires.
// The handler's `error` argument is not used in the visible lines.
925 std::make_shared<boost::asio::steady_timer>(
927 timer->async_wait([timer, &requestParams, &backend](
928 const boost::system::error_code& error) {
929 backend.write(requestParams,
true);
// Success path: record the elapsed time, decrement the outstanding count,
// wake throttled writers, and wake sync waiters when nothing is outstanding.
934 backend.counters_.writeDurationUs +=
935 std::chrono::duration_cast<std::chrono::microseconds>(
938 --(backend.numRequestsOutstanding_);
940 backend.throttleCv_.notify_all();
941 if (backend.numRequestsOutstanding_ == 0)
942 backend.syncCv_.notify_all();
// Releases the state that was allocated with `new` on the store path;
// requestParams must not be used after this line.
943 delete &requestParams;
// Factory that produces CassandraBackend instances.
// NOTE(review): constructor, destructor body and getName() body are not
// visible in this view.
949 class CassandraFactory :
public Factory
957 ~CassandraFactory()
override
963 getName()
const override
// Backend creation: builds a CassandraBackend from the key size, the
// configuration section and the journal. `scheduler` is not forwarded in the
// visible return statement.
971 Section
const& keyValues,
973 Scheduler& scheduler,
976 return std::make_unique<CassandraBackend>(keyBytes, keyValues, journal);
// File-local factory instance, constructed during static initialization.
// NOTE(review): presumably its constructor registers the factory with the
// NodeStore Manager - the constructor is not visible in this view; confirm.
980 static CassandraFactory cassandraFactory;