Add cassandra

This commit is contained in:
CJ Cobb
2020-12-15 15:48:00 -05:00
parent 1d737014e9
commit 4e801db9a3
6 changed files with 1452 additions and 162 deletions

View File

@@ -25,14 +25,11 @@
#include <boost/beast/core.hpp>
#include <boost/beast/core/string.hpp>
#include <boost/beast/websocket.hpp>
#include <reporting/ReportingBackend.h>
#include "org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h"
#include <grpcpp/grpcpp.h>
namespace ripple {
class ReportingETL;
/// This class manages a connection to a single ETL source. This is almost
/// always a p2p node, but really could be another reporting node. This class
/// subscribes to the ledgers and transactions_proposed streams of the
@@ -88,6 +85,8 @@ class ETLSource
// used for retrying connections
boost::asio::steady_timer timer_;
CassandraFlatMapBackend& backend_;
public:
bool
isConnected() const
@@ -112,10 +111,9 @@ public:
/// Create ETL source without gRPC endpoint
/// Fetch ledger and load initial ledger will fail for this source
/// Primarly used in read-only mode, to monitor when ledgers are validated
ETLSource(std::string ip, std::string wsPort);
/// Create ETL source with gRPC endpoint
ETLSource(std::string ip, std::string wsPort, std::string grpcPort);
ETLSource(
boost::json::object const& config,
CassandraFlatMapBackend& backend);
/// @param sequence ledger sequence to check for
/// @return true if this source has the desired ledger
@@ -269,8 +267,6 @@ public:
getP2pForwardingStub() const;
*/
};
/*
*
/// This class is used to manage connections to transaction processing processes
/// This class spawns a listener for each etl source, which listens to messages
/// on the ledgers stream (to keep track of which ledgers have been validated by
@@ -280,29 +276,14 @@ public:
class ETLLoadBalancer
{
private:
ReportingETL& etl_;
beast::Journal journal_;
// ReportingETL& etl_;
std::vector<std::unique_ptr<ETLSource>> sources_;
public:
ETLLoadBalancer(ReportingETL& etl);
/// Add an ETL source
/// @param host host or ip of ETL source
/// @param websocketPort port where ETL source accepts websocket connections
/// @param grpcPort port where ETL source accepts gRPC requests
void
add(std::string& host, std::string& websocketPort, std::string& grpcPort);
/// Add an ETL source without gRPC support. This source will send messages
/// on the ledgers and transactions_proposed streams, but will not be able
/// to handle the gRPC requests that are used for ETL
/// @param host host or ip of ETL source
/// @param websocketPort port where ETL source accepts websocket connections
void
add(std::string& host, std::string& websocketPort);
ETLLoadBalancer(
boost::json::array const& config,
CassandraFlatMapBackend& backend);
/// Load the initial ledger, writing data to the queue
/// @param sequence sequence of ledger to download
@@ -336,47 +317,47 @@ public:
/// to clients).
/// @param in ETLSource in question
/// @return true if messages should be forwarded
bool
shouldPropagateTxnStream(ETLSource* in) const
{
for (auto& src : sources_)
{
assert(src);
// We pick the first ETLSource encountered that is connected
if (src->isConnected())
{
if (src.get() == in)
return true;
else
return false;
}
}
// bool
// shouldPropagateTxnStream(ETLSource* in) const
// {
// for (auto& src : sources_)
// {
// assert(src);
// // We pick the first ETLSource encountered that is connected
// if (src->isConnected())
// {
// if (src.get() == in)
// return true;
// else
// return false;
// }
// }
//
// // If no sources connected, then this stream has not been
// forwarded. return true;
// }
// If no sources connected, then this stream has not been forwarded.
return true;
}
Json::Value
toJson() const
{
Json::Value ret(Json::arrayValue);
for (auto& src : sources_)
{
ret.append(src->toJson());
}
return ret;
}
/// Randomly select a p2p node to forward a gRPC request to
/// @return gRPC stub to forward requests to p2p node
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>
getP2pForwardingStub() const;
/// Forward a JSON RPC request to a randomly selected p2p node
/// @param context context of the request
/// @return response received from p2p node
Json::Value
forwardToP2p(RPC::JsonContext& context) const;
// Json::Value
// toJson() const
// {
// Json::Value ret(Json::arrayValue);
// for (auto& src : sources_)
// {
// ret.append(src->toJson());
// }
// return ret;
// }
//
// /// Randomly select a p2p node to forward a gRPC request to
// /// @return gRPC stub to forward requests to p2p node
// std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>
// getP2pForwardingStub() const;
//
// /// Forward a JSON RPC request to a randomly selected p2p node
// /// @param context context of the request
// /// @return response received from p2p node
// Json::Value
// forwardToP2p(RPC::JsonContext& context) const;
private:
/// f is a function that takes an ETLSource as an argument and returns a
@@ -393,6 +374,4 @@ private:
bool
execute(Func f, uint32_t ledgerSequence);
};
*/
} // namespace ripple
#endif