checkpoint

This commit is contained in:
CJ Cobb
2020-12-18 19:40:38 -05:00
parent d6e54c398a
commit 80a8ed8d1b
18 changed files with 2186 additions and 17 deletions

87
.clang-format Normal file
View File

@@ -0,0 +1,87 @@
---
Language: Cpp
AccessModifierOffset: -4
AlignAfterOpenBracket: AlwaysBreak
AlignConsecutiveAssignments: false
AlignConsecutiveDeclarations: false
AlignEscapedNewlinesLeft: true
AlignOperands: false
AlignTrailingComments: true
AllowAllParametersOfDeclarationOnNextLine: false
AllowShortBlocksOnASingleLine: false
AllowShortCaseLabelsOnASingleLine: false
AllowShortFunctionsOnASingleLine: false
AllowShortIfStatementsOnASingleLine: false
AllowShortLoopsOnASingleLine: false
AlwaysBreakAfterReturnType: All
AlwaysBreakBeforeMultilineStrings: true
AlwaysBreakTemplateDeclarations: true
BinPackArguments: false
BinPackParameters: false
BraceWrapping:
AfterClass: true
AfterControlStatement: true
AfterEnum: false
AfterFunction: true
AfterNamespace: false
AfterObjCDeclaration: true
AfterStruct: true
AfterUnion: true
BeforeCatch: true
BeforeElse: true
IndentBraces: false
BreakBeforeBinaryOperators: false
BreakBeforeBraces: Custom
BreakBeforeTernaryOperators: true
BreakConstructorInitializersBeforeComma: true
ColumnLimit: 80
CommentPragmas: '^ IWYU pragma:'
ConstructorInitializerAllOnOneLineOrOnePerLine: true
ConstructorInitializerIndentWidth: 4
ContinuationIndentWidth: 4
Cpp11BracedListStyle: true
DerivePointerAlignment: false
DisableFormat: false
ExperimentalAutoDetectBinPacking: false
ForEachMacros: [ Q_FOREACH, BOOST_FOREACH ]
IncludeCategories:
- Regex: '^<(BeastConfig)'
Priority: 0
- Regex: '^<(ripple)/'
Priority: 2
- Regex: '^<(boost)/'
Priority: 3
- Regex: '.*'
Priority: 4
IncludeIsMainRegex: '$'
IndentCaseLabels: true
IndentFunctionDeclarationAfterType: false
IndentWidth: 4
IndentWrappedFunctionNames: false
KeepEmptyLinesAtTheStartOfBlocks: false
MaxEmptyLinesToKeep: 1
NamespaceIndentation: None
ObjCSpaceAfterProperty: false
ObjCSpaceBeforeProtocolList: false
PenaltyBreakBeforeFirstCallParameter: 1
PenaltyBreakComment: 300
PenaltyBreakFirstLessLess: 120
PenaltyBreakString: 1000
PenaltyExcessCharacter: 1000000
PenaltyReturnTypeOnItsOwnLine: 200
PointerAlignment: Left
ReflowComments: true
SortIncludes: true
SpaceAfterCStyleCast: false
SpaceBeforeAssignmentOperators: true
SpaceBeforeParens: ControlStatements
SpaceInEmptyParentheses: false
SpacesBeforeTrailingComments: 2
SpacesInAngles: false
SpacesInContainerLiterals: true
SpacesInCStyleCastParentheses: false
SpacesInParentheses: false
SpacesInSquareBrackets: false
Standard: Cpp11
TabWidth: 8
UseTab: Never

View File

@@ -122,7 +122,7 @@ include(Postgres)
#target_link_libraries (grpc_protobufs ${_REFLECTION} ${_PROTOBUF_LIBPROTOBUF} ${_GRPC_GRPCPP})
target_sources(reporting PRIVATE reporting/ETLSource.cpp reporting/ReportingBackend.cpp reporting/Pg.cpp)
target_sources(reporting PRIVATE reporting/ETLSource.cpp reporting/ReportingBackend.cpp reporting/Pg.cpp reporting/DBHelpers.cpp)
message(${Boost_LIBRARIES})

25
config.json Normal file
View File

@@ -0,0 +1,25 @@
{
"database":
{
"cassandra":
{
"username":"xrplreporting",
"password":"",
"secure_connect_bundle":"/home/cj/secure-connect-xrplreporting.zip",
"keyspace":"xrplreporting",
"table_name":"cj",
"max_requests_outstanding":1000
},
"postgres": {
"connection":""
}
},
"etl_sources":
[
{
"ip":"127.0.0.1",
"ws_port":"6005",
"grpc_port":"50051"
}
]
}

156
deps/cassandra.cmake vendored Normal file
View File

@@ -0,0 +1,156 @@
find_library(cassandra NAMES cassandra)
if(NOT cassandra)
message("System installed Cassandra cpp driver not found. Will build")
find_library(zlib NAMES zlib1g-dev zlib-devel zlib z)
if(NOT zlib)
message("zlib not found. will build")
add_library(zlib STATIC IMPORTED GLOBAL)
ExternalProject_Add(zlib_src
PREFIX ${nih_cache_path}
GIT_REPOSITORY https://github.com/madler/zlib.git
GIT_TAG master
INSTALL_COMMAND ""
BUILD_BYPRODUCTS <BINARY_DIR>/${ep_lib_prefix}z.a
)
ExternalProject_Get_Property (zlib_src SOURCE_DIR)
ExternalProject_Get_Property (zlib_src BINARY_DIR)
set (zlib_src_SOURCE_DIR "${SOURCE_DIR}")
file (MAKE_DIRECTORY ${zlib_src_SOURCE_DIR}/include)
set_target_properties (zlib PROPERTIES
IMPORTED_LOCATION
${BINARY_DIR}/${ep_lib_prefix}z.a
INTERFACE_INCLUDE_DIRECTORIES
${SOURCE_DIR}/include)
add_dependencies(zlib zlib_src)
file(TO_CMAKE_PATH "${zlib_src_SOURCE_DIR}" zlib_src_SOURCE_DIR)
endif()
find_library(krb5 NAMES krb5-dev libkrb5-dev)
if(NOT krb5)
message("krb5 not found. will build")
add_library(krb5 STATIC IMPORTED GLOBAL)
ExternalProject_Add(krb5_src
PREFIX ${nih_cache_path}
GIT_REPOSITORY https://github.com/krb5/krb5.git
GIT_TAG master
UPDATE_COMMAND ""
CONFIGURE_COMMAND autoreconf src && ./src/configure --enable-static --disable-shared
BUILD_IN_SOURCE 1
BUILD_COMMAND make
INSTALL_COMMAND ""
BUILD_BYPRODUCTS <SOURCE_DIR>/lib/${ep_lib_prefix}krb5.a
)
ExternalProject_Get_Property (krb5_src SOURCE_DIR)
ExternalProject_Get_Property (krb5_src BINARY_DIR)
set (krb5_src_SOURCE_DIR "${SOURCE_DIR}")
file (MAKE_DIRECTORY ${krb5_src_SOURCE_DIR}/include)
set_target_properties (krb5 PROPERTIES
IMPORTED_LOCATION
${BINARY_DIR}/lib/${ep_lib_prefix}krb5.a
INTERFACE_INCLUDE_DIRECTORIES
${SOURCE_DIR}/include)
add_dependencies(krb5 krb5_src)
file(TO_CMAKE_PATH "${krb5_src_SOURCE_DIR}" krb5_src_SOURCE_DIR)
endif()
find_library(libuv1 NAMES uv1 libuv1 liubuv1-dev libuv1:amd64)
if(NOT libuv1)
message("libuv1 not found, will build")
add_library(libuv1 STATIC IMPORTED GLOBAL)
ExternalProject_Add(libuv_src
PREFIX ${nih_cache_path}
GIT_REPOSITORY https://github.com/libuv/libuv.git
GIT_TAG v1.x
INSTALL_COMMAND ""
BUILD_BYPRODUCTS <BINARY_DIR>/${ep_lib_prefix}uv_a.a
)
ExternalProject_Get_Property (libuv_src SOURCE_DIR)
ExternalProject_Get_Property (libuv_src BINARY_DIR)
set (libuv_src_SOURCE_DIR "${SOURCE_DIR}")
file (MAKE_DIRECTORY ${libuv_src_SOURCE_DIR}/include)
set_target_properties (libuv1 PROPERTIES
IMPORTED_LOCATION
${BINARY_DIR}/${ep_lib_prefix}uv_a.a
INTERFACE_INCLUDE_DIRECTORIES
${SOURCE_DIR}/include)
add_dependencies(libuv1 libuv_src)
file(TO_CMAKE_PATH "${libuv_src_SOURCE_DIR}" libuv_src_SOURCE_DIR)
endif()
add_library (cassandra STATIC IMPORTED GLOBAL)
ExternalProject_Add(cassandra_src
PREFIX ${nih_cache_path}
GIT_REPOSITORY https://github.com/datastax/cpp-driver.git
GIT_TAG master
CMAKE_ARGS
-DLIBUV_ROOT_DIR=${BINARY_DIR}
-DLIBUV_LIBARY=${BINARY_DIR}/libuv_a.a
-DLIBUV_INCLUDE_DIR=${SOURCE_DIR}/include
-DCASS_BUILD_STATIC=ON
INSTALL_COMMAND ""
BUILD_BYPRODUCTS <BINARY_DIR>/${ep_lib_prefix}cassandra_static.a
)
ExternalProject_Get_Property (cassandra_src SOURCE_DIR)
ExternalProject_Get_Property (cassandra_src BINARY_DIR)
set (cassandra_src_SOURCE_DIR "${SOURCE_DIR}")
file (MAKE_DIRECTORY ${cassandra_src_SOURCE_DIR}/include)
set_target_properties (cassandra PROPERTIES
IMPORTED_LOCATION
${BINARY_DIR}/${ep_lib_prefix}cassandra_static.a
INTERFACE_INCLUDE_DIRECTORIES
${SOURCE_DIR}/include)
add_dependencies(cassandra cassandra_src)
if(NOT libuv1)
ExternalProject_Add_StepDependencies(cassandra_src build libuv1)
target_link_libraries(cassandra INTERFACE libuv1)
else()
target_link_libraries(cassandra INTERFACE ${libuv1})
endif()
if(NOT krb5)
ExternalProject_Add_StepDependencies(cassandra_src build krb5)
target_link_libraries(cassandra INTERFACE krb5)
else()
target_link_libraries(cassandra INTERFACE ${krb5})
endif()
if(NOT zlib)
ExternalProject_Add_StepDependencies(cassandra_src build zlib)
target_link_libraries(cassandra INTERFACE zlib)
else()
target_link_libraries(cassandra INTERFACE ${zlib})
endif()
file(TO_CMAKE_PATH "${cassandra_src_SOURCE_DIR}" cassandra_src_SOURCE_DIR)
target_link_libraries(reporting INTERFACE cassandra)
else()
message("Found system installed cassandra cpp driver")
message(${cassandra})
find_path(cassandra_includes NAMES cassandra.h REQUIRED)
target_link_libraries (reporting PUBLIC ${cassandra})
target_include_directories(reporting INTERFACE ${cassandra_includes})
endif()

133
reporting/DBHelpers.cpp Normal file
View File

@@ -0,0 +1,133 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <boost/format.hpp>
#include <memory>
#include <reporting/DBHelpers.h>
static bool
writeToLedgersDB(
ripple::LedgerInfo const& info,
PgQuery& pgQuery,
beast::Journal& j)
{
BOOST_LOG_TRIVIAL(debug) << __func__;
auto cmd = boost::format(
R"(INSERT INTO ledgers
VALUES (%u,'\x%s', '\x%s',%u,%u,%u,%u,%u,'\x%s','\x%s'))");
auto ledgerInsert = boost::str(
cmd % info.seq % ripple::strHex(info.hash) %
ripple::strHex(info.parentHash) % info.drops.drops() %
info.closeTime.time_since_epoch().count() %
info.parentCloseTime.time_since_epoch().count() %
info.closeTimeResolution.count() % info.closeFlags %
ripple::strHex(info.accountHash) % ripple::strHex(info.txHash));
BOOST_LOG_TRIVIAL(trace) << __func__ << " : "
<< " : "
<< "query string = " << ledgerInsert;
auto res = pgQuery(ledgerInsert.data());
return res;
}
bool
writeToPostgres(
ripple::LedgerInfo const& info,
std::vector<AccountTransactionsData> const& accountTxData,
std::shared_ptr<PgPool> const& pgPool)
{
BOOST_LOG_TRIVIAL(debug) << __func__ << " : "
<< "Beginning write to Postgres";
try
{
// Create a PgQuery object to run multiple commands over the same
// connection in a single transaction block.
PgQuery pg(pgPool);
auto res = pg("BEGIN");
if (!res || res.status() != PGRES_COMMAND_OK)
{
std::stringstream msg;
msg << "bulkWriteToTable : Postgres insert error: " << res.msg();
throw std::runtime_error(msg.str());
}
// Writing to the ledgers db fails if the ledger already exists in the
// db. In this situation, the ETL process has detected there is another
// writer, and falls back to only publishing
if (!writeToLedgersDB(info, pg, j))
{
BOOST_LOG_TRIVIAL(warning)
<< __func__ << " : "
<< "Failed to write to ledgers database.";
return false;
}
std::stringstream transactionsCopyBuffer;
std::stringstream accountTransactionsCopyBuffer;
for (auto const& data : accountTxData)
{
std::string txHash = ripple::strHex(data.txHash);
std::string nodestoreHash = ripple::strHex(data.nodestoreHash);
auto idx = data.transactionIndex;
auto ledgerSeq = data.ledgerSequence;
transactionsCopyBuffer << std::to_string(ledgerSeq) << '\t'
<< std::to_string(idx) << '\t' << "\\\\x"
<< txHash << '\t' << "\\\\x" << nodestoreHash
<< '\n';
for (auto const& a : data.accounts)
{
std::string acct = ripple::strHex(a);
accountTransactionsCopyBuffer
<< "\\\\x" << acct << '\t' << std::to_string(ledgerSeq)
<< '\t' << std::to_string(idx) << '\n';
}
}
pg.bulkInsert("transactions", transactionsCopyBuffer.str());
pg.bulkInsert(
"account_transactions", accountTransactionsCopyBuffer.str());
res = pg("COMMIT");
if (!res || res.status() != PGRES_COMMAND_OK)
{
std::stringstream msg;
msg << "bulkWriteToTable : Postgres insert error: " << res.msg();
assert(false);
throw std::runtime_error(msg.str());
}
BOOST_LOG_TRIVIAL(info) << __func__ << " : "
<< "Successfully wrote to Postgres";
return true;
}
catch (std::exception& e)
{
BOOST_LOG_TRIVIAL(error)
<< __func__
<< "Caught exception writing to Postgres : " << e.what();
assert(false);
return false;
}
}

63
reporting/DBHelpers.h Normal file
View File

@@ -0,0 +1,63 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_APP_REPORTING_DBHELPERS_H_INCLUDED
#define RIPPLE_APP_REPORTING_DBHELPERS_H_INCLUDED
#include <ripple/basics/Log.h>
#include <ripple/ledger/TxMeta.h>
#include <boost/container/flat_set.hpp>
#include <reporting/Pg.h>
/// Struct used to keep track of what to write to transactions and
/// account_transactions tables in Postgres
struct AccountTransactionsData
{
boost::container::flat_set<ripple::AccountID> accounts;
uint32_t ledgerSequence;
uint32_t transactionIndex;
ripple::uint256 txHash;
ripple::uint256 nodestoreHash;
AccountTransactionsData(
ripple::TxMeta& meta,
ripple::uint256&& nodestoreHash,
beast::Journal& j)
: accounts(meta.getAffectedAccounts(j))
, ledgerSequence(meta.getLgrSeq())
, transactionIndex(meta.getIndex())
, txHash(meta.getTxID())
, nodestoreHash(std::move(nodestoreHash))
{
}
};
/// Write new ledger and transaction data to Postgres
/// @param info Ledger Info to write
/// @param accountTxData transaction data to write
/// @param pgPool pool of Postgres connections
/// @param j journal (for logging)
/// @return whether the write succeeded
bool
writeToPostgres(
ripple::LedgerInfo const& info,
std::vector<AccountTransactionsData> const& accountTxData,
std::shared_ptr<PgPool> const& pgPool);
#endif

181
reporting/ETLHelpers.h Normal file
View File

@@ -0,0 +1,181 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_APP_REPORTING_ETLHELPERS_H_INCLUDED
#define RIPPLE_APP_REPORTING_ETLHELPERS_H_INCLUDED
#include <ripple/basics/base_uint.h>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <sstream>
/// This datastructure is used to keep track of the sequence of the most recent
/// ledger validated by the network. There are two methods that will wait until
/// certain conditions are met. This datastructure is able to be "stopped". When
/// the datastructure is stopped, any threads currently waiting are unblocked.
/// Any later calls to methods of this datastructure will not wait. Once the
/// datastructure is stopped, the datastructure remains stopped for the rest of
/// its lifetime.
class NetworkValidatedLedgers
{
// max sequence validated by network
std::optional<uint32_t> max_;
mutable std::mutex m_;
std::condition_variable cv_;
bool stopping_ = false;
public:
/// Notify the datastructure that idx has been validated by the network
/// @param idx sequence validated by network
void
push(uint32_t idx)
{
std::lock_guard lck(m_);
if (!max_ || idx > *max_)
max_ = idx;
cv_.notify_all();
}
/// Get most recently validated sequence. If no ledgers are known to have
/// been validated, this function waits until the next ledger is validated
/// @return sequence of most recently validated ledger. empty optional if
/// the datastructure has been stopped
std::optional<uint32_t>
getMostRecent()
{
std::unique_lock lck(m_);
cv_.wait(lck, [this]() { return max_ || stopping_; });
return max_;
}
/// Waits for the sequence to be validated by the network
/// @param sequence to wait for
/// @return true if sequence was validated, false otherwise
/// a return value of false means the datastructure has been stopped
bool
waitUntilValidatedByNetwork(uint32_t sequence)
{
std::unique_lock lck(m_);
cv_.wait(lck, [sequence, this]() {
return (max_ && sequence <= *max_) || stopping_;
});
return !stopping_;
}
/// Puts the datastructure in the stopped state
/// Future calls to this datastructure will not block
/// This operation cannot be reversed
void
stop()
{
std::lock_guard lck(m_);
stopping_ = true;
cv_.notify_all();
}
};
/// Generic thread-safe queue with an optional maximum size
/// Note, we can't use a lockfree queue here, since we need the ability to wait
/// for an element to be added or removed from the queue. These waits are
/// blocking calls.
template <class T>
class ThreadSafeQueue
{
std::queue<T> queue_;
mutable std::mutex m_;
std::condition_variable cv_;
std::optional<uint32_t> maxSize_;
public:
/// @param maxSize maximum size of the queue. Calls that would cause the
/// queue to exceed this size will block until free space is available
explicit ThreadSafeQueue(uint32_t maxSize) : maxSize_(maxSize)
{
}
/// Create a queue with no maximum size
ThreadSafeQueue() = default;
/// @param elt element to push onto queue
/// if maxSize is set, this method will block until free space is available
void
push(T const& elt)
{
std::unique_lock lck(m_);
// if queue has a max size, wait until not full
if (maxSize_)
cv_.wait(lck, [this]() { return queue_.size() <= *maxSize_; });
queue_.push(elt);
cv_.notify_all();
}
/// @param elt element to push onto queue. elt is moved from
/// if maxSize is set, this method will block until free space is available
void
push(T&& elt)
{
std::unique_lock lck(m_);
// if queue has a max size, wait until not full
if (maxSize_)
cv_.wait(lck, [this]() { return queue_.size() <= *maxSize_; });
queue_.push(std::move(elt));
cv_.notify_all();
}
/// @return element popped from queue. Will block until queue is non-empty
T
pop()
{
std::unique_lock lck(m_);
cv_.wait(lck, [this]() { return !queue_.empty(); });
T ret = std::move(queue_.front());
queue_.pop();
// if queue has a max size, unblock any possible pushers
if (maxSize_)
cv_.notify_all();
return ret;
}
};
/// Parititions the uint256 keyspace into numMarkers partitions, each of equal
/// size.
inline std::vector<ripple::uint256>
getMarkers(size_t numMarkers)
{
assert(numMarkers <= 256);
unsigned char incr = 256 / numMarkers;
std::vector<ripple::uint256> markers;
markers.reserve(numMarkers);
ripple::uint256 base{0};
for (size_t i = 0; i < numMarkers; ++i)
{
markers.push_back(base);
base.data()[0] += incr;
}
return markers;
}
#endif

View File

@@ -46,7 +46,6 @@ class ETLSource
std::string grpcPort_;
// a reference to the applications io_service
boost::asio::io_context ioc_;
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub> stub_;

83
reporting/P2pProxy.cpp Normal file
View File

@@ -0,0 +1,83 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/app/reporting/P2pProxy.h>
#include <ripple/app/reporting/ReportingETL.h>
#include <ripple/json/json_reader.h>
#include <ripple/json/json_writer.h>
namespace ripple {
Json::Value
forwardToP2p(RPC::JsonContext& context)
{
return context.app.getReportingETL().getETLLoadBalancer().forwardToP2p(
context);
}
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>
getP2pForwardingStub(RPC::Context& context)
{
return context.app.getReportingETL()
.getETLLoadBalancer()
.getP2pForwardingStub();
}
// We only forward requests where ledger_index is "current" or "closed"
// otherwise, attempt to handle here
bool
shouldForwardToP2p(RPC::JsonContext& context)
{
if (!context.app.config().reporting())
return false;
Json::Value& params = context.params;
std::string strCommand = params.isMember(jss::command)
? params[jss::command].asString()
: params[jss::method].asString();
JLOG(context.j.trace()) << "COMMAND:" << strCommand;
JLOG(context.j.trace()) << "REQUEST:" << params;
auto handler = RPC::getHandler(context.apiVersion, strCommand);
if (!handler)
{
JLOG(context.j.error())
<< "Error getting handler. command = " << strCommand;
return false;
}
if (handler->condition_ == RPC::NEEDS_CURRENT_LEDGER ||
handler->condition_ == RPC::NEEDS_CLOSED_LEDGER)
{
return true;
}
if (params.isMember(jss::ledger_index))
{
auto indexValue = params[jss::ledger_index];
if (!indexValue.isNumeric())
{
auto index = indexValue.asString();
return index == "current" || index == "closed";
}
}
return false;
}
} // namespace ripple

116
reporting/P2pProxy.h Normal file
View File

@@ -0,0 +1,116 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_APP_REPORTING_P2PPROXY_H_INCLUDED
#define RIPPLE_APP_REPORTING_P2PPROXY_H_INCLUDED
#include <ripple/app/main/Application.h>
#include <ripple/rpc/Context.h>
#include <ripple/rpc/impl/Handler.h>
#include <boost/beast/websocket.hpp>
#include "org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h"
#include <grpcpp/grpcpp.h>
namespace ripple {
/// Forward a JSON request to a p2p node and return the response
/// @param context context of the request
/// @return response from p2p node
Json::Value
forwardToP2p(RPC::JsonContext& context);
/// Whether a request should be forwarded, based on request parameters
/// @param context context of the request
/// @return true if should be forwarded
bool
shouldForwardToP2p(RPC::JsonContext& context);
template <class Request>
bool
needCurrentOrClosed(Request& request)
{
// These are the only gRPC requests that specify a ledger
if constexpr (
std::is_same<Request, org::xrpl::rpc::v1::GetAccountInfoRequest>::
value ||
std::is_same<Request, org::xrpl::rpc::v1::GetLedgerRequest>::value ||
std::is_same<Request, org::xrpl::rpc::v1::GetLedgerDataRequest>::
value ||
std::is_same<Request, org::xrpl::rpc::v1::GetLedgerEntryRequest>::value)
{
if (request.ledger().ledger_case() ==
org::xrpl::rpc::v1::LedgerSpecifier::LedgerCase::kShortcut)
{
if (request.ledger().shortcut() !=
org::xrpl::rpc::v1::LedgerSpecifier::SHORTCUT_VALIDATED &&
request.ledger().shortcut() !=
org::xrpl::rpc::v1::LedgerSpecifier::SHORTCUT_UNSPECIFIED)
return true;
}
}
// GetLedgerDiff specifies two ledgers
else if constexpr (std::is_same<
Request,
org::xrpl::rpc::v1::GetLedgerDiffRequest>::value)
{
auto help = [](auto specifier) {
if (specifier.ledger_case() ==
org::xrpl::rpc::v1::LedgerSpecifier::LedgerCase::kShortcut)
{
if (specifier.shortcut() !=
org::xrpl::rpc::v1::LedgerSpecifier::
SHORTCUT_VALIDATED &&
specifier.shortcut() !=
org::xrpl::rpc::v1::LedgerSpecifier::
SHORTCUT_UNSPECIFIED)
return true;
}
return false;
};
return help(request.base_ledger()) || help(request.desired_ledger());
}
return false;
}
/// Whether a request should be forwarded, based on request parameters
/// @param context context of the request
/// @condition required condition for the request
/// @return true if should be forwarded
template <class Request>
bool
shouldForwardToP2p(RPC::GRPCContext<Request>& context, RPC::Condition condition)
{
if (!context.app.config().reporting())
return false;
if (condition == RPC::NEEDS_CURRENT_LEDGER ||
condition == RPC::NEEDS_CLOSED_LEDGER)
return true;
return needCurrentOrClosed(context.params);
}
/// Get stub used to forward gRPC requests to a p2p node
/// @param context context of the request
/// @return stub to forward requests
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>
getP2pForwardingStub(RPC::Context& context);
} // namespace ripple
#endif

View File

@@ -1416,3 +1416,125 @@ initSchema(std::shared_ptr<PgPool> const& pool)
}
}
// Load the ledger info for the specified ledger/s from the database
// @param whichLedger specifies the ledger to load via ledger sequence, ledger
// hash, a range of ledgers, or std::monostate (which loads the most recent)
// @return LedgerInfo
std::optional<ripple::LedgerInfo>
getLedger(
std::variant<
std::monostate,
ripple::uint256,
uint32_t,
std::pair<uint32_t, uint32_t>> const& whichLedger,
std::shared_ptr<PgPool>& pgPool)
{
ripple::LedgerInfo lgrInfo;
std::stringstream sql;
sql << "SELECT ledger_hash, prev_hash, account_set_hash, trans_set_hash, "
"total_coins, closing_time, prev_closing_time, close_time_res, "
"close_flags, ledger_seq FROM ledgers ";
uint32_t expNumResults = 1;
if (auto ledgerSeq = std::get_if<uint32_t>(&whichLedger))
{
sql << "WHERE ledger_seq = " + std::to_string(*ledgerSeq);
}
else if (auto ledgerHash = std::get_if<ripple::uint256>(&whichLedger))
{
sql << ("WHERE ledger_hash = \'\\x" + strHex(*ledgerHash) + "\'");
}
else
{
sql << ("ORDER BY ledger_seq desc LIMIT 1");
}
sql << ";";
BOOST_LOG_TRIVIAL(trace) << __func__ << " : sql = " << sql.str();
auto res = PgQuery(pgPool)(sql.str().data());
if (!res)
{
BOOST_LOG_TRIVIAL(error)
<< __func__ << " : Postgres response is null - sql = " << sql.str();
assert(false);
return {};
}
else if (res.status() != PGRES_TUPLES_OK)
{
BOOST_LOG_TRIVIAL(error) << __func__
<< " : Postgres response should have been "
"PGRES_TUPLES_OK but instead was "
<< res.status() << " - msg = " << res.msg()
<< " - sql = " << sql.str();
assert(false);
return {};
}
BOOST_LOG_TRIVIAL(trace)
<< __func__ << " Postgres result msg : " << res.msg();
if (res.isNull() || res.ntuples() == 0)
{
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : Ledger not found. sql = " << sql.str();
return {};
}
else if (res.ntuples() > 0)
{
if (res.nfields() != 10)
{
BOOST_LOG_TRIVIAL(error)
<< __func__
<< " : Wrong number of fields in Postgres "
"response. Expected 10, but got "
<< res.nfields() << " . sql = " << sql.str();
assert(false);
return {};
}
}
char const* hash = res.c_str(0, 0);
char const* prevHash = res.c_str(0, 1);
char const* accountHash = res.c_str(0, 2);
char const* txHash = res.c_str(0, 3);
std::int64_t totalCoins = res.asBigInt(0, 4);
std::int64_t closeTime = res.asBigInt(0, 5);
std::int64_t parentCloseTime = res.asBigInt(0, 6);
std::int64_t closeTimeRes = res.asBigInt(0, 7);
std::int64_t closeFlags = res.asBigInt(0, 8);
std::int64_t ledgerSeq = res.asBigInt(0, 9);
BOOST_LOG_TRIVIAL(trace)
<< __func__ << " - Postgres response = " << hash << " , " << prevHash
<< " , " << accountHash << " , " << txHash << " , " << totalCoins
<< ", " << closeTime << ", " << parentCloseTime << ", " << closeTimeRes
<< ", " << closeFlags << ", " << ledgerSeq << " - sql = " << sql.str();
BOOST_LOG_TRIVIAL(debug)
<< __func__
<< " - Successfully fetched ledger with sequence = " << ledgerSeq
<< " from Postgres";
using time_point = ripple::NetClock::time_point;
using duration = ripple::NetClock::duration;
ripple::LedgerInfo info;
if (!info.parentHash.parseHex(prevHash + 2))
assert(false);
if (!info.txHash.parseHex(txHash + 2))
assert(false);
if (!info.accountHash.parseHex(accountHash + 2))
assert(false);
info.drops = totalCoins;
info.closeTime = time_point{duration{closeTime}};
info.parentCloseTime = time_point{duration{parentCloseTime}};
info.closeFlags = closeFlags;
info.closeTimeResolution = duration{closeTimeRes};
info.seq = ledgerSeq;
if (!info.hash.parseHex(hash + 2))
assert(false);
info.validated = true;
return info;
}

View File

@@ -20,8 +20,11 @@
#ifndef RIPPLE_CORE_PG_H_INCLUDED
#define RIPPLE_CORE_PG_H_INCLUDED
#include <ripple/basics/chrono.h>
#include <ripple/ledger/ReadView.h>
#include <boost/json.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/log/trivial.hpp>
#include <atomic>
#include <chrono>
#include <condition_variable>
@@ -511,4 +514,13 @@ make_PgPool(boost::json::object const& pgConfig);
void
initSchema(std::shared_ptr<PgPool> const& pool);
// Load the ledger info for the specified ledger/s from the database
// @param whichLedger specifies the ledger to load via ledger sequence, ledger
// hash or std::monostate (which loads the most recent)
// @return vector of LedgerInfos
std::optional<ripple::LedgerInfo>
getLedger(
std::variant<std::monostate, ripple::uint256, uint32_t> const& whichLedger,
std::shared_ptr<PgPool>& pgPool);
#endif // RIPPLE_CORE_PG_H_INCLUDED

108
reporting/README.md Normal file
View File

@@ -0,0 +1,108 @@
Reporting mode is a special operating mode of rippled, designed to handle RPCs
for validated data. A server running in reporting mode does not connect to the
p2p network, but rather extracts validated data from a node that is connected
to the p2p network. To run rippled in reporting mode, you must also run a
separate rippled node in p2p mode, to use as an ETL source. Multiple reporting
nodes can share access to the same network accessible databases (Postgres and
Cassandra); at any given time, only one reporting node will be performing ETL
and writing to the databases, while the others simply read from the databases.
A server running in reporting mode will forward any requests that require access
to the p2p network to a p2p node.
# Reporting ETL
A single reporting node has one or more ETL sources, specified in the config
file. A reporting node will subscribe to the "ledgers" stream of each of the ETL
sources. This stream sends a message whenever a new ledger is validated. Upon
receiving a message on the stream, reporting will then fetch the data associated
with the newly validated ledger from one of the ETL sources. The fetch is
performed via a gRPC request ("GetLedger"). This request returns the ledger
header, transactions+metadata blobs, and every ledger object
added/modified/deleted as part of this ledger. ETL then writes all of this data
to the databases, and moves on to the next ledger. ETL does not apply
transactions, but rather extracts the already computed results of those
transactions (all of the added/modified/deleted SHAMap leaf nodes of the state
tree). The new SHAMap inner nodes are computed by the ETL writer; this computation mainly
involves manipulating child pointers and recomputing hashes, logic which is
buried inside of SHAMap.
If the database is entirely empty, ETL must download an entire ledger in full
(as opposed to just the diff, as described above). This download is done via the
"GetLedgerData" gRPC request. "GetLedgerData" allows clients to page through an
entire ledger over several RPC calls. ETL will page through an entire ledger,
and write each object to the database.
If the database is not empty, the reporting node will first come up in a "soft"
read-only mode. In read-only mode, the server does not perform ETL and simply
publishes new ledgers as they are written to the database.
If the database is not updated within a certain time period
(currently hard coded at 20 seconds), the reporting node will begin the ETL
process and start writing to the database. Postgres will report an error when
trying to write a record with a key that already exists. ETL uses this error to
determine that another process is writing to the database, and subsequently
falls back to a soft read-only mode. Reporting nodes can also operate in strict
read-only mode, in which case they will never write to the database.
# Database Nuances
The database schema for reporting mode does not allow any history gaps.
Attempting to write a ledger to a non-empty database where the previous ledger
does not exist will return an error.
The databases must be set up prior to running reporting mode. This requires
creating the Postgres database, and setting up the Cassandra keyspace. Reporting
mode will create the objects table in Cassandra if the table does not yet exist.
Creating the Postgres database:
```
$ psql -h [host] -U [user]
postgres=# create database [database];
```
Creating the keyspace:
```
$ cqlsh [host] [port]
> CREATE KEYSPACE rippled WITH REPLICATION =
{'class' : 'SimpleStrategy', 'replication_factor' : 3 };
```
A replication factor of 3 is recommended. However, when running locally, only a
replication factor of 1 is supported.
Online delete is not supported by reporting mode and must be done manually. The
easiest way to do this would be to setup a second Cassandra keyspace and
Postgres database, bring up a single reporting mode instance that uses those
databases, and start ETL at a ledger of your choosing (via --startReporting on
the command line). Once this node is caught up, the other databases can be
deleted.
To delete:
```
$ psql -h [host] -U [user] -d [database]
reporting=$ truncate table ledgers cascade;
```
```
$ cqlsh [host] [port]
> truncate table objects;
```
# Proxy
RPCs that require access to the p2p network and/or the open ledger are forwarded
from the reporting node to one of the ETL sources. The request is not processed
prior to forwarding, and the response is delivered as-is to the client.
Reporting will forward any requests that always require p2p/open ledger access
(fee and submit, for instance). In addition, any request that explicitly
requests data from the open or closed ledger (via setting
"ledger_index":"current" or "ledger_index":"closed"), will be forwarded to a
p2p node.
For the stream "transactions_proposed" (AKA "rt_transactions"), reporting
subscribes to the "transactions_proposed" streams of each ETL source, and then
forwards those messages to any clients subscribed to the same stream on the
reporting node. A reporting node will subscribe to the stream on each ETL
source, but will only forward the messages from one of the streams at any given
time (to avoid sending the same message more than once to the same client).
# API changes
A reporting node defaults to only returning validated data. If a ledger is not
specified, the most recently validated ledger is used. This is in contrast to
the normal rippled behavior, where the open ledger is used by default.
Reporting will reject all subscribe requests for streams "server", "manifests",
"validations", "peer_status" and "consensus".

View File

@@ -0,0 +1,7 @@
class ReportingApplication
{
Config config_;
ReportingApplication(Config const& config) : config_(config)
{
}
};

View File

@@ -7,7 +7,7 @@ flatMapWriteCallback(CassFuture* fut, void* cbData)
{
CassandraFlatMapBackend::WriteCallbackData& requestParams =
*static_cast<CassandraFlatMapBackend::WriteCallbackData*>(cbData);
CassandraFlatMapBackend& backend = *requestParams.backend;
CassandraFlatMapBackend const& backend = *requestParams.backend;
auto rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
@@ -42,7 +42,7 @@ flatMapWriteTransactionCallback(CassFuture* fut, void* cbData)
CassandraFlatMapBackend::WriteTransactionCallbackData& requestParams =
*static_cast<CassandraFlatMapBackend::WriteTransactionCallbackData*>(
cbData);
CassandraFlatMapBackend& backend = *requestParams.backend;
CassandraFlatMapBackend const& backend = *requestParams.backend;
auto rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{

View File

@@ -81,24 +81,24 @@ private:
const CassPrepared* selectObject_ = nullptr;
// io_context used for exponential backoff for write retries
boost::asio::io_context ioContext_;
mutable boost::asio::io_context ioContext_;
std::optional<boost::asio::io_context::work> work_;
std::thread ioThread_;
// maximum number of concurrent in flight requests. New requests will wait
// for earlier requests to finish if this limit is exceeded
uint32_t maxRequestsOutstanding = 10000000;
std::atomic_uint32_t numRequestsOutstanding_ = 0;
mutable std::atomic_uint32_t numRequestsOutstanding_ = 0;
// mutex and condition_variable to limit the number of concurrent in flight
// requests
std::mutex throttleMutex_;
std::condition_variable throttleCv_;
mutable std::mutex throttleMutex_;
mutable std::condition_variable throttleCv_;
// writes are asynchronous. This mutex and condition_variable is used to
// wait for all writes to finish
std::mutex syncMutex_;
std::condition_variable syncCv_;
mutable std::mutex syncMutex_;
mutable std::condition_variable syncCv_;
boost::json::object config_;
@@ -906,7 +906,7 @@ public:
struct WriteCallbackData
{
CassandraFlatMapBackend* backend;
CassandraFlatMapBackend const* backend;
// The shared pointer to the node object must exist until it's
// confirmed persisted. Otherwise, it can become deleted
// prematurely if other copies are removed from caches.
@@ -917,7 +917,7 @@ public:
uint32_t currentRetries = 0;
WriteCallbackData(
CassandraFlatMapBackend* f,
CassandraFlatMapBackend const* f,
std::string&& key,
uint32_t sequence,
std::string&& blob)
@@ -930,7 +930,7 @@ public:
};
void
write(WriteCallbackData& data, bool isRetry)
write(WriteCallbackData& data, bool isRetry) const
{
{
std::unique_lock<std::mutex> lck(throttleMutex_);
@@ -997,7 +997,7 @@ public:
}
void
store(std::string&& key, uint32_t seq, std::string&& blob)
store(std::string&& key, uint32_t seq, std::string&& blob) const
{
BOOST_LOG_TRIVIAL(trace) << "Writing to cassandra";
WriteCallbackData* data =
@@ -1009,7 +1009,7 @@ public:
struct WriteTransactionCallbackData
{
CassandraFlatMapBackend* backend;
CassandraFlatMapBackend const* backend;
// The shared pointer to the node object must exist until it's
// confirmed persisted. Otherwise, it can become deleted
// prematurely if other copies are removed from caches.
@@ -1021,7 +1021,7 @@ public:
uint32_t currentRetries = 0;
WriteTransactionCallbackData(
CassandraFlatMapBackend* f,
CassandraFlatMapBackend const* f,
std::string&& hash,
uint32_t sequence,
std::string&& transaction,
@@ -1036,7 +1036,7 @@ public:
};
void
writeTransaction(WriteTransactionCallbackData& data, bool isRetry)
writeTransaction(WriteTransactionCallbackData& data, bool isRetry) const
{
{
std::unique_lock<std::mutex> lck(throttleMutex_);

743
reporting/ReportingETL.cpp Normal file
View File

@@ -0,0 +1,743 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/app/reporting/DBHelpers.h>
#include <ripple/app/reporting/ReportingETL.h>
#include <ripple/beast/core/CurrentThreadName.h>
#include <ripple/json/json_reader.h>
#include <ripple/json/json_writer.h>
#include <boost/asio/connect.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <cstdlib>
#include <iostream>
#include <string>
#include <variant>
namespace ripple {
namespace detail {
/// Convenience function for printing out basic ledger info
std::string
toString(LedgerInfo const& info)
{
std::stringstream ss;
ss << "LedgerInfo { Sequence : " << info.seq
<< " Hash : " << strHex(info.hash) << " TxHash : " << strHex(info.txHash)
<< " AccountHash : " << strHex(info.accountHash)
<< " ParentHash : " << strHex(info.parentHash) << " }";
return ss.str();
}
} // namespace detail
std::vector<AccountTransactionsData>
ReportingETL::insertTransactions(
ripple::LedgerInfo const& ledger,
org::xrpl::rpc::v1::GetLedgerResponse& data)
{
std::vector<AccountTransactionsData> accountTxData;
for (auto& txn :
*(data.mutable_transactions_list()->mutable_transactions()))
{
std::string* raw = txn.mutable_transaction_blob();
ripple::SerialIter it{raw->data(), raw->size()};
ripple::STTx sttx{it};
auto txSerializer =
std::make_shared<ripple::Serializer>(sttx.getSerializer());
ripple::TxMeta txMeta{
sttx.getTransactionID(), ledger.seq, txn.metadata_blob()};
auto metaSerializer =
std::make_shared<Serializer>(txMeta.getAsObject().getSerializer());
BOOST_LOG_TRIVIAL(trace)
<< __func__ << " : "
<< "Inserting transaction = " << sttx.getTransactionID();
ripple::uint256 nodestoreHash = sttx.getTransactionID();
accountTxData.emplace_back(txMeta, std::move(nodestoreHash), journal_);
std::string keyStr{(const char*)sttx.getTransactionID().data(), 32};
flatMapBackend_.storeTransaction(
std::move(keyStr),
ledger->info().seq,
std::move(*raw),
std::move(*txn.mutable_metadata_blob()));
}
return accountTxData;
}
std::shared_ptr<Ledger>
ReportingETL::loadInitialLedger(uint32_t startingSequence)
{
// check that database is actually empty
auto ledgers = getLedger(startingSequence);
if (ledgers.size())
{
BOOST_LOG_TRIVIAL(fatal) << __func__ << " : "
<< "Database is not empty";
assert(false);
return {};
}
// fetch the ledger from the network. This function will not return until
// either the fetch is successful, or the server is being shutdown. This
// only fetches the ledger header and the transactions+metadata
std::optional<org::xrpl::rpc::v1::GetLedgerResponse> ledgerData{
fetchLedgerData(startingSequence)};
if (!ledgerData)
return {};
ripple::LedgerInfo lgrInfo = ripple::deserializeHeader(
ripple::makeSlice(ledgerData->ledger_header()), true);
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
<< "Deserialized ledger header. " << detail::toString(lgrInfo);
std::vector<AccountTransactionsData> accountTxData =
insertTransactions(lgrInfo, *ledgerData);
auto start = std::chrono::system_clock::now();
// download the full account state map. This function downloads full ledger
// data and pushes the downloaded data into the writeQueue. asyncWriter
// consumes from the queue and inserts the data into the Ledger object.
// Once the below call returns, all data has been pushed into the queue
loadBalancer_.loadInitialLedger(startingSequence);
if (!stopping_)
{
flatMapBackend_.sync();
writeToPostgres(lgrInfo, accountTxData, pgPool_);
}
auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(debug) << "Time to download and store ledger = "
<< ((end - start).count()) / 1000000000.0;
return ledger;
}
/*
void
ReportingETL::publishLedger(std::shared_ptr<Ledger>& ledger)
{
app_.getOPs().pubLedger(ledger);
setLastPublish();
}
*/
/*
bool
ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts)
{
BOOST_LOG_TRIVIAL(info)
<< __func__ << " : "
<< "Attempting to publish ledger = " << ledgerSequence;
size_t numAttempts = 0;
while (!stopping_)
{
auto ledger = app_.getLedgerMaster().getLedgerBySeq(ledgerSequence);
if (!ledger)
{
BOOST_LOG_TRIVIAL(warn)
<< __func__ << " : "
<< "Trying to publish. Could not find ledger with sequence = "
<< ledgerSequence;
// We try maxAttempts times to publish the ledger, waiting one
// second in between each attempt.
// If the ledger is not present in the database after maxAttempts,
// we attempt to take over as the writer. If the takeover fails,
// doContinuousETL will return, and this node will go back to
// publishing.
// If the node is in strict read only mode, we simply
// skip publishing this ledger and return false indicating the
// publish failed
if (numAttempts >= maxAttempts)
{
BOOST_LOG_TRIVIAL(error) << __func__ << " : "
<< "Failed to publish ledger after "
<< numAttempts << " attempts.";
if (!readOnly_)
{
BOOST_LOG_TRIVIAL(info)
<< __func__ << " : "
<< "Attempting to become ETL writer";
return false;
}
else
{
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
<< "In strict read-only mode. "
<< "Skipping publishing this ledger. "
<< "Beginning fast forward.";
return false;
}
}
else
{
std::this_thread::sleep_for(std::chrono::seconds(1));
++numAttempts;
}
continue;
}
publishStrand_.post([this, ledger]() {
app_.getOPs().pubLedger(ledger);
setLastPublish();
BOOST_LOG_TRIVIAL(info)
<< __func__ << " : "
<< "Published ledger. " << detail::toString(ledger->info());
});
return true;
}
return false;
}
*/
std::optional<org::xrpl::rpc::v1::GetLedgerResponse>
ReportingETL::fetchLedgerData(uint32_t idx)
{
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
<< "Attempting to fetch ledger with sequence = " << idx;
std::optional<org::xrpl::rpc::v1::GetLedgerResponse> response =
loadBalancer_.fetchLedger(idx, false);
BOOST_LOG_TRIVIAL(trace) << __func__ << " : "
<< "GetLedger reply = " << response->DebugString();
return response;
}
std::optional<org::xrpl::rpc::v1::GetLedgerResponse>
ReportingETL::fetchLedgerDataAndDiff(uint32_t idx)
{
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
<< "Attempting to fetch ledger with sequence = " << idx;
std::optional<org::xrpl::rpc::v1::GetLedgerResponse> response =
loadBalancer_.fetchLedger(idx, true);
BOOST_LOG_TRIVIAL(trace) << __func__ << " : "
<< "GetLedger reply = " << response->DebugString();
return response;
}
std::pair<ripple::LedgerInfo, std::vector<AccountTransactionsData>>
ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
{
BOOST_LOG_TRIVIAL(info) << __func__ << " : "
<< "Beginning ledger update";
LedgerInfo lgrInfo =
deserializeHeader(makeSlice(rawData.ledger_header()), true);
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
<< "Deserialized ledger header. " << detail::toString(lgrInfo);
std::vector<AccountTransactionsData> accountTxData{
insertTransactions(rawData)};
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
<< "Inserted all transactions. Number of transactions = "
<< rawData.transactions_list().transactions_size();
for (auto& obj : *(rawData.mutable_ledger_objects()->mutable_objects()))
{
flatMapBackend_.store(
std::move(*obj.mutable_key()),
next->info().seq,
std::move(*obj.mutable_data()));
}
flatMapBackend_.sync();
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
<< "Inserted/modified/deleted all objects. Number of objects = "
<< rawData.ledger_objects().objects_size();
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
<< "Finished ledger update. " << detail::toString(next->info());
return {info, std::move(accountTxData)};
}
// Database must be populated when this starts
std::optional<uint32_t>
ReportingETL::runETLPipeline(uint32_t startSequence)
{
/*
* Behold, mortals! This function spawns three separate threads, which talk
* to each other via 2 different thread safe queues and 1 atomic variable.
* All threads and queues are function local. This function returns when all
* of the threads exit. There are two termination conditions: the first is
* if the load thread encounters a write conflict. In this case, the load
* thread sets writeConflict, an atomic bool, to true, which signals the
* other threads to stop. The second termination condition is when the
* entire server is shutting down, which is detected in one of three ways:
* 1. isStopping() returns true if the server is shutting down
* 2. networkValidatedLedgers_.waitUntilValidatedByNetwork returns
* false, signaling the wait was aborted.
* 3. fetchLedgerDataAndDiff returns an empty optional, signaling the fetch
* was aborted.
* In all cases, the extract thread detects this condition,
* and pushes an empty optional onto the transform queue. The transform
* thread, upon popping an empty optional, pushes an empty optional onto the
* load queue, and then returns. The load thread, upon popping an empty
* optional, returns.
*/
BOOST_LOG_TRIVIAL(debug) << __func__ << " : "
<< "Starting etl pipeline";
writing_ = true;
auto parent = getLedger(startSequence - 1);
if (!parent)
{
assert(false);
Throw<std::runtime_error>("runETLPipeline: parent ledger is null");
}
std::atomic_bool writeConflict = false;
std::optional<uint32_t> lastPublishedSequence;
constexpr uint32_t maxQueueSize = 1000;
auto begin = std::chrono::system_clock::now();
ThreadSafeQueue<std::optional<org::xrpl::rpc::v1::GetLedgerResponse>>
transformQueue{maxQueueSize};
std::thread extracter{[this,
&startSequence,
&writeConflict,
&transformQueue]() {
beast::setCurrentThreadName("rippled: ReportingETL extract");
uint32_t currentSequence = startSequence;
// there are two stopping conditions here.
// First, if there is a write conflict in the load thread, the ETL
// mechanism should stop.
// The other stopping condition is if the entire server is shutting
// down. This can be detected in a variety of ways. See the comment
// at the top of the function
while (networkValidatedLedgers_.waitUntilValidatedByNetwork(
currentSequence) &&
!writeConflict && !isStopping())
{
auto start = std::chrono::system_clock::now();
std::optional<org::xrpl::rpc::v1::GetLedgerResponse> fetchResponse{
fetchLedgerDataAndDiff(currentSequence)};
auto end = std::chrono::system_clock::now();
auto time = ((end - start).count()) / 1000000000.0;
auto tps =
fetchResponse->transactions_list().transactions_size() / time;
BOOST_LOG_TRIVIAL(debug) << "Extract phase time = " << time
<< " . Extract phase tps = " << tps;
// if the fetch is unsuccessful, stop. fetchLedger only returns
// false if the server is shutting down, or if the ledger was
// found in the database (which means another process already
// wrote the ledger that this process was trying to extract;
// this is a form of a write conflict). Otherwise,
// fetchLedgerDataAndDiff will keep trying to fetch the
// specified ledger until successful
if (!fetchResponse)
{
break;
}
transformQueue.push(std::move(fetchResponse));
++currentSequence;
}
// empty optional tells the transformer to shut down
transformQueue.push({});
}};
std::thread transformer{
[this, &parent, &writeConflict, &loadQueue, &transformQueue]() {
beast::setCurrentThreadName("rippled: ReportingETL transform");
while (!writeConflict)
{
std::optional<org::xrpl::rpc::v1::GetLedgerResponse>
fetchResponse{transformQueue.pop()};
// if fetchResponse is an empty optional, the extracter thread
// has stopped and the transformer should stop as well
if (!fetchResponse)
{
break;
}
if (isStopping())
continue;
auto start = std::chrono::system_clock::now();
auto [lgrInfo, accountTxData] = buildNextLedger(*fetchResponse);
auto end = std::chrono::system_clock::now();
if (!writeToPostgres(lgrInfo, accountTxData, pgPool_))
writeConflict = true;
auto duration = ((end - start).count()) / 1000000000.0;
BOOST_LOG_TRIVIAL(debug) << "transform time = " << duration;
if (!writeConflict)
{
publishLedger(ledger);
lastPublishedSequence = lgrInfo.seq;
}
}
}};
std::thread loader{[this,
&lastPublishedSequence,
&loadQueue,
&writeConflict]() {
beast::setCurrentThreadName("rippled: ReportingETL load");
size_t totalTransactions = 0;
double totalTime = 0;
while (!writeConflict)
{
std::optional<std::pair<
std::shared_ptr<Ledger>,
std::vector<AccountTransactionsData>>>
result{loadQueue.pop()};
// if result is an empty optional, the transformer thread has
// stopped and the loader should stop as well
if (!result)
break;
if (isStopping())
continue;
auto& ledger = result->first;
auto& accountTxData = result->second;
auto start = std::chrono::system_clock::now();
// write to the key-value store
// flushLedger(ledger);
auto mid = std::chrono::system_clock::now();
// write to RDBMS
// if there is a write conflict, some other process has already
// written this ledger and has taken over as the ETL writer
#ifdef RIPPLED_REPORTING
if (!writeToPostgres(
ledger->info(), accountTxData, app_.getPgPool(), journal_))
writeConflict = true;
#endif
auto end = std::chrono::system_clock::now();
if (!writeConflict)
{
publishLedger(ledger);
lastPublishedSequence = ledger->info().seq;
}
// print some performance numbers
auto kvTime = ((mid - start).count()) / 1000000000.0;
auto relationalTime = ((end - mid).count()) / 1000000000.0;
size_t numTxns = accountTxData.size();
totalTime += kvTime;
totalTransactions += numTxns;
BOOST_LOG_TRIVIAL(info)
<< "Load phase of etl : "
<< "Successfully published ledger! Ledger info: "
<< detail::toString(ledger->info())
<< ". txn count = " << numTxns
<< ". key-value write time = " << kvTime
<< ". relational write time = " << relationalTime
<< ". key-value tps = " << numTxns / kvTime
<< ". relational tps = " << numTxns / relationalTime
<< ". total key-value tps = " << totalTransactions / totalTime;
}
}};
// wait for all of the threads to stop
loader.join();
extracter.join();
transformer.join();
auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(debug)
<< "Extracted and wrote " << *lastPublishedSequence - startSequence
<< " in " << ((end - begin).count()) / 1000000000.0;
writing_ = false;
BOOST_LOG_TRIVIAL(debug) << __func__ << " : "
<< "Stopping etl pipeline";
return lastPublishedSequence;
}
// main loop. The software begins monitoring the ledgers that are validated
// by the nework. The member networkValidatedLedgers_ keeps track of the
// sequences of ledgers validated by the network. Whenever a ledger is validated
// by the network, the software looks for that ledger in the database. Once the
// ledger is found in the database, the software publishes that ledger to the
// ledgers stream. If a network validated ledger is not found in the database
// after a certain amount of time, then the software attempts to take over
// responsibility of the ETL process, where it writes new ledgers to the
// database. The software will relinquish control of the ETL process if it
// detects that another process has taken over ETL.
void
ReportingETL::monitor()
{
auto ledgers = getLedger(std::monostate);
if (!ledgers.size())
{
BOOST_LOG_TRIVIAL(info) << __func__ << " : "
<< "Database is empty. Will download a ledger "
"from the network.";
if (startSequence_)
{
BOOST_LOG_TRIVIAL(info)
<< __func__ << " : "
<< "ledger sequence specified in config. "
<< "Will begin ETL process starting with ledger "
<< *startSequence_;
ledger = loadInitialLedger(*startSequence_);
}
else
{
BOOST_LOG_TRIVIAL(info)
<< __func__ << " : "
<< "Waiting for next ledger to be validated by network...";
std::optional<uint32_t> mostRecentValidated =
networkValidatedLedgers_.getMostRecent();
if (mostRecentValidated)
{
BOOST_LOG_TRIVIAL(info) << __func__ << " : "
<< "Ledger " << *mostRecentValidated
<< " has been validated. "
<< "Downloading...";
ledger = loadInitialLedger(*mostRecentValidated);
}
else
{
BOOST_LOG_TRIVIAL(info) << __func__ << " : "
<< "The wait for the next validated "
<< "ledger has been aborted. "
<< "Exiting monitor loop";
return;
}
}
}
else
{
if (startSequence_)
{
Throw<std::runtime_error>(
"start sequence specified but db is already populated");
}
BOOST_LOG_TRIVIAL(info)
<< __func__ << " : "
<< "Database already populated. Picking up from the tip of history";
}
if (!ledger)
{
BOOST_LOG_TRIVIAL(error)
<< __func__ << " : "
<< "Failed to load initial ledger. Exiting monitor loop";
return;
}
else
{
// publishLedger(ledger);
}
uint32_t nextSequence = ledger->info().seq + 1;
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
<< "Database is populated. "
<< "Starting monitor loop. sequence = " << nextSequence;
while (!stopping_ &&
networkValidatedLedgers_.waitUntilValidatedByNetwork(nextSequence))
{
BOOST_LOG_TRIVIAL(info) << __func__ << " : "
<< "Ledger with sequence = " << nextSequence
<< " has been validated by the network. "
<< "Attempting to find in database and publish";
// Attempt to take over responsibility of ETL writer after 10 failed
// attempts to publish the ledger. publishLedger() fails if the
// ledger that has been validated by the network is not found in the
// database after the specified number of attempts. publishLedger()
// waits one second between each attempt to read the ledger from the
// database
//
// In strict read-only mode, when the software fails to find a
// ledger in the database that has been validated by the network,
// the software will only try to publish subsequent ledgers once,
// until one of those ledgers is found in the database. Once the
// software successfully publishes a ledger, the software will fall
// back to the normal behavior of trying several times to publish
// the ledger that has been validated by the network. In this
// manner, a reporting processing running in read-only mode does not
// need to restart if the database is wiped.
constexpr size_t timeoutSeconds = 10;
bool success = publishLedger(nextSequence, timeoutSeconds);
if (!success)
{
BOOST_LOG_TRIVIAL(warn)
<< __func__ << " : "
<< "Failed to publish ledger with sequence = " << nextSequence
<< " . Beginning ETL";
// doContinousETLPipelined returns the most recent sequence
// published empty optional if no sequence was published
std::optional<uint32_t> lastPublished =
runETLPipeline(nextSequence);
BOOST_LOG_TRIVIAL(info)
<< __func__ << " : "
<< "Aborting ETL. Falling back to publishing";
// if no ledger was published, don't increment nextSequence
if (lastPublished)
nextSequence = *lastPublished + 1;
}
else
{
++nextSequence;
}
}
}
void
ReportingETL::monitorReadOnly()
{
BOOST_LOG_TRIVIAL(debug) << "Starting reporting in strict read only mode";
std::optional<uint32_t> mostRecent =
networkValidatedLedgers_.getMostRecent();
if (!mostRecent)
return;
uint32_t sequence = *mostRecent;
bool success = true;
while (!stopping_ &&
networkValidatedLedgers_.waitUntilValidatedByNetwork(sequence))
{
success = publishLedger(sequence, success ? 30 : 1);
++sequence;
}
}
void
ReportingETL::doWork()
{
worker_ = std::thread([this]() {
beast::setCurrentThreadName("rippled: ReportingETL worker");
if (readOnly_)
monitorReadOnly();
else
monitor();
});
}
ReportingETL::ReportingETL(
boost::json::object const& config,
boost::asio::io_context& ioc)
: publishStrand_(ioc)
, ioContext_(ioc)
, loadBalancer_(*this)
, flatMapBackend_(
(*config).at("database").as_object().at("cassandra").as_object())
{
// if present, get endpoint from config
if (app_.config().exists("reporting"))
{
flatMapBackend_.open();
Section section = app_.config().section("reporting");
BOOST_LOG_TRIVIAL(debug) << "Parsing config info";
auto& vals = section.values();
for (auto& v : vals)
{
BOOST_LOG_TRIVIAL(debug) << "val is " << v;
Section source = app_.config().section(v);
std::pair<std::string, bool> ipPair = source.find("source_ip");
if (!ipPair.second)
continue;
std::pair<std::string, bool> wsPortPair =
source.find("source_ws_port");
if (!wsPortPair.second)
continue;
std::pair<std::string, bool> grpcPortPair =
source.find("source_grpc_port");
if (!grpcPortPair.second)
{
// add source without grpc port
// used in read-only mode to detect when new ledgers have
// been validated. Used for publishing
if (app_.config().reportingReadOnly())
loadBalancer_.add(ipPair.first, wsPortPair.first);
continue;
}
loadBalancer_.add(
ipPair.first, wsPortPair.first, grpcPortPair.first);
}
// this is true iff --reportingReadOnly was passed via command line
readOnly_ = app_.config().reportingReadOnly();
// if --reportingReadOnly was not passed via command line, check config
// file. Command line takes precedence
if (!readOnly_)
{
std::pair<std::string, bool> ro = section.find("read_only");
if (ro.second)
{
readOnly_ = (ro.first == "true" || ro.first == "1");
app_.config().setReportingReadOnly(readOnly_);
}
}
// handle command line arguments
if (app_.config().START_UP == Config::StartUpType::FRESH && !readOnly_)
{
startSequence_ = std::stol(app_.config().START_LEDGER);
}
// if not passed via command line, check config for start sequence
if (!startSequence_)
{
std::pair<std::string, bool> start = section.find("start_sequence");
if (start.second)
{
startSequence_ = std::stoi(start.first);
}
}
std::pair<std::string, bool> flushInterval =
section.find("flush_interval");
if (flushInterval.second)
flushInterval_ = std::stoi(flushInterval.first);
std::pair<std::string, bool> numMarkers = section.find("num_markers");
if (numMarkers.second)
numMarkers_ = std::stoi(numMarkers.first);
ReportingETL::instance_ = this;
}
}
ReportingETL* ReportingETL::instance_ = 0;
} // namespace ripple

334
reporting/ReportingETL.h Normal file
View File

@@ -0,0 +1,334 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_APP_REPORTING_REPORTINGETL_H_INCLUDED
#define RIPPLE_APP_REPORTING_REPORTINGETL_H_INCLUDED
#include <boost/algorithm/string.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/core/string.hpp>
#include <boost/beast/websocket.hpp>
#include <reporting/ETLHelpers.h>
#include <reporting/ETLSource.h>
#include <reporting/ReportingBackend.h>
#include "org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h"
#include <grpcpp/grpcpp.h>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <chrono>
namespace ripple {
struct AccountTransactionsData;
/**
* This class is responsible for continuously extracting data from a
* p2p node, and writing that data to the databases. Usually, multiple different
* processes share access to the same network accessible databases, in which
* case only one such process is performing ETL and writing to the database. The
* other processes simply monitor the database for new ledgers, and publish
* those ledgers to the various subscription streams. If a monitoring process
* determines that the ETL writer has failed (no new ledgers written for some
* time), the process will attempt to become the ETL writer. If there are
* multiple monitoring processes that try to become the ETL writer at the same
* time, one will win out, and the others will fall back to
* monitoring/publishing. In this sense, this class dynamically transitions from
* monitoring to writing and from writing to monitoring, based on the activity
* of other processes running on different machines.
*/
class ReportingETL
{
private:
CassandraFlatMapBackend flatMapBackend_;
std::thread worker_;
// boost::asio::io_context& ioContext_;
/// Strand to ensure that ledgers are published in order.
/// If ETL is started far behind the network, ledgers will be written and
/// published very rapidly. Monitoring processes will publish ledgers as
/// they are written. However, to publish a ledger, the monitoring process
/// needs to read all of the transactions for that ledger from the database.
/// Reading the transactions from the database requires network calls, which
/// can be slow. It is imperative however that the monitoring processes keep
/// up with the writer, else the monitoring processes will not be able to
/// detect if the writer failed. Therefore, publishing each ledger (which
/// includes reading all of the transactions from the database) is done from
/// the application wide asio io_service, and a strand is used to ensure
/// ledgers are published in order
// boost::asio::io_context::strand publishStrand_;
/// Mechanism for communicating with ETL sources. ETLLoadBalancer wraps an
/// arbitrary number of ETL sources and load balances ETL requests across
/// those sources.
ETLLoadBalancer loadBalancer_;
/// Mechanism for detecting when the network has validated a new ledger.
/// This class provides a way to wait for a specific ledger to be validated
NetworkValidatedLedgers networkValidatedLedgers_;
/// Whether the software is stopping
std::atomic_bool stopping_ = false;
/// Used to determine when to write to the database during the initial
/// ledger download. By default, the software downloads an entire ledger and
/// then writes to the database. If flushInterval_ is non-zero, the software
/// will write to the database as new ledger data (SHAMap leaf nodes)
/// arrives. It is not neccesarily more effient to write the data as it
/// arrives, as different SHAMap leaf nodes share the same SHAMap inner
/// nodes; flushing prematurely can result in the same SHAMap inner node
/// being written to the database more than once. It is recommended to use
/// the default value of 0 for this variable; however, different values can
/// be experimented with if better performance is desired.
size_t flushInterval_ = 0;
/// This variable controls the number of GetLedgerData calls that will be
/// executed in parallel during the initial ledger download. GetLedgerData
/// allows clients to page through a ledger over many RPC calls.
/// GetLedgerData returns a marker that is used as an offset in a subsequent
/// call. If numMarkers_ is greater than 1, there will be multiple chains of
/// GetLedgerData calls iterating over different parts of the same ledger in
/// parallel. This can dramatically speed up the time to download the
/// initial ledger. However, a higher value for this member variable puts
/// more load on the ETL source.
size_t numMarkers_ = 2;
/// Whether the process is in strict read-only mode. In strict read-only
/// mode, the process will never attempt to become the ETL writer, and will
/// only publish ledgers as they are written to the database.
bool readOnly_ = false;
/// Whether the process is writing to the database. Used by server_info
std::atomic_bool writing_ = false;
/// Ledger sequence to start ETL from. If this is empty, ETL will start from
/// the next ledger validated by the network. If this is set, and the
/// database is already populated, an error is thrown.
std::optional<uint32_t> startSequence_;
/// The time that the most recently published ledger was published. Used by
/// server_info
std::chrono::time_point<std::chrono::system_clock> lastPublish_;
std::mutex publishTimeMtx_;
std::chrono::time_point<std::chrono::system_clock>
getLastPublish()
{
std::unique_lock<std::mutex> lck(publishTimeMtx_);
return lastPublish_;
}
void
setLastPublish()
{
std::unique_lock<std::mutex> lck(publishTimeMtx_);
lastPublish_ = std::chrono::system_clock::now();
}
/// Download a ledger with specified sequence in full, via GetLedgerData,
/// and write the data to the databases. This takes several minutes or
/// longer.
/// @param sequence the sequence of the ledger to download
/// @return The ledger downloaded, with a full transaction and account state
/// map
std::shared_ptr<Ledger>
loadInitialLedger(uint32_t sequence);
/// Run ETL. Extracts ledgers and writes them to the database, until a write
/// conflict occurs (or the server shuts down).
/// @note database must already be populated when this function is called
/// @param startSequence the first ledger to extract
/// @return the last ledger written to the database, if any
std::optional<uint32_t>
runETLPipeline(uint32_t startSequence);
/// Monitor the network for newly validated ledgers. Also monitor the
/// database to see if any process is writing those ledgers. This function
/// is called when the application starts, and will only return when the
/// application is shutting down. If the software detects the database is
/// empty, this function will call loadInitialLedger(). If the software
/// detects ledgers are not being written, this function calls
/// runETLPipeline(). Otherwise, this function publishes ledgers as they are
/// written to the database.
void
monitor();
/// Monitor the database for newly written ledgers.
/// Similar to the monitor(), except this function will never call
/// runETLPipeline() or loadInitialLedger(). This function only publishes
/// ledgers as they are written to the database.
void
monitorReadOnly();
/// Extract data for a particular ledger from an ETL source. This function
/// continously tries to extract the specified ledger (using all available
/// ETL sources) until the extraction succeeds, or the server shuts down.
/// @param sequence sequence of the ledger to extract
/// @return ledger header and transaction+metadata blobs. Empty optional
/// if the server is shutting down
std::optional<org::xrpl::rpc::v1::GetLedgerResponse>
fetchLedgerData(uint32_t sequence);
/// Extract data for a particular ledger from an ETL source. This function
/// continously tries to extract the specified ledger (using all available
/// ETL sources) until the extraction succeeds, or the server shuts down.
/// @param sequence sequence of the ledger to extract
/// @return ledger header, transaction+metadata blobs, and all ledger
/// objects created, modified or deleted between this ledger and the parent.
/// Empty optional if the server is shutting down
std::optional<org::xrpl::rpc::v1::GetLedgerResponse>
fetchLedgerDataAndDiff(uint32_t sequence);
/// Insert all of the extracted transactions into the ledger
/// @param ledger ledger to insert transactions into
/// @param data data extracted from an ETL source
/// @return struct that contains the neccessary info to write to the
/// transctions and account_transactions tables in Postgres (mostly
/// transaction hashes, corresponding nodestore hashes and affected
/// accounts)
std::vector<AccountTransactionsData>
insertTransactions(
ripple::LedgerInfo const& ledger,
org::xrpl::rpc::v1::GetLedgerResponse& data);
/// Build the next ledger using the previous ledger and the extracted data.
/// This function calls insertTransactions()
/// @note rawData should be data that corresponds to the ledger immediately
/// following parent
/// @param parent the previous ledger
/// @param rawData data extracted from an ETL source
/// @return the newly built ledger and data to write to Postgres
std::pair<std::shared_ptr<Ledger>, std::vector<AccountTransactionsData>>
buildNextLedger(
std::shared_ptr<Ledger>& parent,
org::xrpl::rpc::v1::GetLedgerResponse& rawData);
/*
/// Attempt to read the specified ledger from the database, and then publish
/// that ledger to the ledgers stream.
/// @param ledgerSequence the sequence of the ledger to publish
/// @param maxAttempts the number of times to attempt to read the ledger
/// from the database. 1 attempt per second
/// @return whether the ledger was found in the database and published
bool
publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts = 10);
/// Publish the passed in ledger
/// @param ledger the ledger to publish
void
publishLedger(std::shared_ptr<Ledger>& ledger);
*/
public:
ReportingETL(boost::json::object& config);
~ReportingETL()
{
onStop();
}
NetworkValidatedLedgers&
getNetworkValidatedLedgers()
{
return networkValidatedLedgers_;
}
bool
isStopping()
{
return stopping_;
}
/// Get the number of markers to use during the initial ledger download.
/// This is equivelent to the degree of parallelism during the initial
/// ledger download
/// @return the number of markers
uint32_t
getNumMarkers()
{
return numMarkers_;
}
/*
Json::Value
getInfo()
{
Json::Value result(Json::objectValue);
result["etl_sources"] = loadBalancer_.toJson();
result["is_writer"] = writing_.load();
auto last = getLastPublish();
if (last.time_since_epoch().count() != 0)
result["last_publish_time"] = to_string(
date::floor<std::chrono::microseconds>(getLastPublish()));
return result;
}
*/
/// start all of the necessary components and begin ETL
void
run()
{
BOOST_LOG_TRIVIAL(info) << "Starting reporting etl";
stopping_ = false;
loadBalancer_.start();
doWork();
}
/// Stop all the necessary components
void
onStop()
{
BOOST_LOG_TRIVIAL(info) << "onStop called";
BOOST_LOG_TRIVIAL(debug) << "Stopping Reporting ETL";
stopping_ = true;
networkValidatedLedgers_.stop();
loadBalancer_.stop();
BOOST_LOG_TRIVIAL(debug) << "Stopped loadBalancer";
if (worker_.joinable())
worker_.join();
BOOST_LOG_TRIVIAL(debug) << "Joined worker thread";
}
ETLLoadBalancer&
getETLLoadBalancer()
{
return loadBalancer_;
}
CassandraFlatMapBackend&
getFlatMapBackend()
{
return flatMapBackend_;
}
private:
void
doWork();
};
} // namespace ripple
#endif