Compare commits

..

8 Commits

Author SHA1 Message Date
Denis Angell
b4f7b91216 Merge branch 'dev' into reduced-import 2024-12-06 13:02:32 +01:00
Denis Angell
cba05af058 [fold] clang-format 2024-12-06 13:00:45 +01:00
Denis Angell
8e88a49d26 [enhance] update blackhole requirements
only `tefIMPORT_BLACKHOLED` on;

- SetRegularKey
- SignersListSet
2024-12-06 12:47:32 +01:00
Denis Angell
3879c529c1 Fix: failing assert (#397) 2024-11-28 19:20:44 +10:00
Denis Angell
61753d39bb [fold] clang-format 2024-11-28 09:52:37 +01:00
Denis Angell
0033b3ae4c [fold] add test 2024-11-28 09:51:09 +01:00
Richard Holland
f636f8158e fixReduceImport 2024-11-27 10:16:22 +11:00
Ekiserrepé
a05d58a6e9 Update README.md (#396)
Updated Xaman link.
2024-11-26 08:52:49 +10:00
22 changed files with 188 additions and 2016 deletions

View File

@@ -392,7 +392,6 @@ target_sources (rippled PRIVATE
src/ripple/app/misc/NegativeUNLVote.cpp
src/ripple/app/misc/NetworkOPs.cpp
src/ripple/app/misc/SHAMapStoreImp.cpp
src/ripple/app/misc/StateAccounting.cpp
src/ripple/app/misc/detail/impl/WorkSSL.cpp
src/ripple/app/misc/impl/AccountTxPaging.cpp
src/ripple/app/misc/impl/AmendmentTable.cpp

View File

@@ -152,9 +152,6 @@ public:
std::string
getCompleteLedgers();
RangeSet<std::uint32_t>
getCompleteLedgersRangeSet();
/** Apply held transactions to the open ledger
This is normally called as we close the ledger.
The open ledger remains open to handle new transactions

View File

@@ -1714,13 +1714,6 @@ LedgerMaster::getCompleteLedgers()
return to_string(mCompleteLedgers);
}
RangeSet<std::uint32_t>
LedgerMaster::getCompleteLedgersRangeSet()
{
std::lock_guard sl(mCompleteLock);
return mCompleteLedgers;
}
std::optional<NetClock::time_point>
LedgerMaster::getCloseTimeBySeq(LedgerIndex ledgerIndex)
{

View File

@@ -37,7 +37,6 @@
#include <ripple/app/main/NodeStoreScheduler.h>
#include <ripple/app/main/Tuning.h>
#include <ripple/app/misc/AmendmentTable.h>
#include <ripple/app/misc/DatagramMonitor.h>
#include <ripple/app/misc/HashRouter.h>
#include <ripple/app/misc/LoadFeeTrack.h>
#include <ripple/app/misc/NetworkOPs.h>
@@ -168,8 +167,6 @@ public:
std::unique_ptr<Logs> logs_;
std::unique_ptr<TimeKeeper> timeKeeper_;
std::unique_ptr<DatagramMonitor> datagram_monitor_;
std::uint64_t const instanceCookie_;
beast::Journal m_journal;
@@ -1526,14 +1523,6 @@ ApplicationImp::setup(boost::program_options::variables_map const& cmdline)
if (reportingETL_)
reportingETL_->start();
// Datagram monitor if applicable
if (!config_->standalone() && !config_->DATAGRAM_MONITOR.empty())
{
datagram_monitor_ = std::make_unique<DatagramMonitor>(*this);
if (datagram_monitor_)
datagram_monitor_->start();
}
return true;
}

File diff suppressed because it is too large Load Diff

View File

@@ -33,7 +33,6 @@
#include <ripple/app/misc/HashRouter.h>
#include <ripple/app/misc/LoadFeeTrack.h>
#include <ripple/app/misc/NetworkOPs.h>
#include <ripple/app/misc/StateAccounting.h>
#include <ripple/app/misc/Transaction.h>
#include <ripple/app/misc/TxQ.h>
#include <ripple/app/misc/ValidatorKeys.h>
@@ -68,9 +67,9 @@
#include <ripple/rpc/CTID.h>
#include <ripple/rpc/DeliveredAmount.h>
#include <ripple/rpc/impl/RPCHelpers.h>
#include <ripple/rpc/impl/UDPInfoSub.h>
#include <boost/asio/ip/host_name.hpp>
#include <boost/asio/steady_timer.hpp>
#include <exception>
#include <mutex>
#include <set>
@@ -117,6 +116,81 @@ class NetworkOPsImp final : public NetworkOPs
running,
};
static std::array<char const*, 5> const states_;
/**
* State accounting records two attributes for each possible server state:
* 1) Amount of time spent in each state (in microseconds). This value is
* updated upon each state transition.
* 2) Number of transitions to each state.
*
* This data can be polled through server_info and represented by
* monitoring systems similarly to how bandwidth, CPU, and other
* counter-based metrics are managed.
*
* State accounting is more accurate than periodic sampling of server
* state. With periodic sampling, it is very likely that state transitions
* are missed, and accuracy of time spent in each state is very rough.
*/
class StateAccounting
{
struct Counters
{
explicit Counters() = default;
std::uint64_t transitions = 0;
std::chrono::microseconds dur = std::chrono::microseconds(0);
};
OperatingMode mode_ = OperatingMode::DISCONNECTED;
std::array<Counters, 5> counters_;
mutable std::mutex mutex_;
std::chrono::steady_clock::time_point start_ =
std::chrono::steady_clock::now();
std::chrono::steady_clock::time_point const processStart_ = start_;
std::uint64_t initialSyncUs_{0};
static std::array<Json::StaticString const, 5> const states_;
public:
explicit StateAccounting()
{
counters_[static_cast<std::size_t>(OperatingMode::DISCONNECTED)]
.transitions = 1;
}
/**
* Record state transition. Update duration spent in previous
* state.
*
* @param om New state.
*/
void
mode(OperatingMode om);
/**
* Output state counters in JSON format.
*
* @obj Json object to which to add state accounting data.
*/
void
json(Json::Value& obj) const;
struct CounterData
{
decltype(counters_) counters;
decltype(mode_) mode;
decltype(start_) start;
decltype(initialSyncUs_) initialSyncUs;
};
CounterData
getCounterData() const
{
std::lock_guard lock(mutex_);
return {counters_, mode_, start_, initialSyncUs_};
}
};
//! Server fees published on `server` subscription
struct ServerFeeSummary
{
@@ -198,9 +272,6 @@ public:
std::string
strOperatingMode(bool const admin = false) const override;
StateAccounting::CounterData
getStateAccountingData();
//
// Transaction operations.
//
@@ -705,17 +776,11 @@ private:
DispatchState mDispatchState = DispatchState::none;
std::vector<TransactionStatus> mTransactions;
StateAccounting accounting_;
StateAccounting accounting_{};
std::set<uint256> pendingValidations_;
std::mutex validationsMutex_;
RCLConsensus&
getConsensus();
LedgerMaster&
getLedgerMaster();
private:
struct Stats
{
@@ -778,6 +843,19 @@ private:
//------------------------------------------------------------------------------
static std::array<char const*, 5> const stateNames{
{"disconnected", "connected", "syncing", "tracking", "full"}};
std::array<char const*, 5> const NetworkOPsImp::states_ = stateNames;
std::array<Json::StaticString const, 5> const
NetworkOPsImp::StateAccounting::states_ = {
{Json::StaticString(stateNames[0]),
Json::StaticString(stateNames[1]),
Json::StaticString(stateNames[2]),
Json::StaticString(stateNames[3]),
Json::StaticString(stateNames[4])}};
static auto const genesisAccountId = calcAccountID(
generateKeyPair(KeyType::secp256k1, generateSeed("masterpassphrase"))
.first);
@@ -1052,7 +1130,7 @@ NetworkOPsImp::strOperatingMode(OperatingMode const mode, bool const admin)
}
}
return {StateAccounting::states_[static_cast<std::size_t>(mode)].c_str()};
return states_[static_cast<std::size_t>(mode)];
}
void
@@ -2318,19 +2396,6 @@ NetworkOPsImp::getConsensusInfo()
return mConsensus.getJson(true);
}
// RHTODO: not threadsafe?
RCLConsensus&
NetworkOPsImp::getConsensus()
{
return mConsensus;
}
LedgerMaster&
NetworkOPsImp::getLedgerMaster()
{
return m_ledgerMaster;
}
Json::Value
NetworkOPsImp::getServerInfo(bool human, bool admin, bool counters)
{
@@ -4128,12 +4193,6 @@ NetworkOPsImp::stateAccounting(Json::Value& obj)
accounting_.json(obj);
}
StateAccounting::CounterData
NetworkOPsImp::getStateAccountingData()
{
return accounting_.getCounterData();
}
// <-- bool: true=erased, false=was not there
bool
NetworkOPsImp::unsubValidations(std::uint64_t uSeq)
@@ -4604,6 +4663,50 @@ NetworkOPsImp::collect_metrics()
counters[static_cast<std::size_t>(OperatingMode::FULL)].transitions);
}
void
NetworkOPsImp::StateAccounting::mode(OperatingMode om)
{
auto now = std::chrono::steady_clock::now();
std::lock_guard lock(mutex_);
++counters_[static_cast<std::size_t>(om)].transitions;
if (om == OperatingMode::FULL &&
counters_[static_cast<std::size_t>(om)].transitions == 1)
{
initialSyncUs_ = std::chrono::duration_cast<std::chrono::microseconds>(
now - processStart_)
.count();
}
counters_[static_cast<std::size_t>(mode_)].dur +=
std::chrono::duration_cast<std::chrono::microseconds>(now - start_);
mode_ = om;
start_ = now;
}
void
NetworkOPsImp::StateAccounting::json(Json::Value& obj) const
{
auto [counters, mode, start, initialSync] = getCounterData();
auto const current = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now() - start);
counters[static_cast<std::size_t>(mode)].dur += current;
obj[jss::state_accounting] = Json::objectValue;
for (std::size_t i = static_cast<std::size_t>(OperatingMode::DISCONNECTED);
i <= static_cast<std::size_t>(OperatingMode::FULL);
++i)
{
obj[jss::state_accounting][states_[i]] = Json::objectValue;
auto& state = obj[jss::state_accounting][states_[i]];
state[jss::transitions] = std::to_string(counters[i].transitions);
state[jss::duration_us] = std::to_string(counters[i].dur.count());
}
obj[jss::server_state_duration_us] = std::to_string(current.count());
if (initialSync)
obj[jss::initial_sync_duration_us] = std::to_string(initialSync);
}
//------------------------------------------------------------------------------
std::unique_ptr<NetworkOPs>

View File

@@ -20,10 +20,8 @@
#ifndef RIPPLE_APP_MISC_NETWORKOPS_H_INCLUDED
#define RIPPLE_APP_MISC_NETWORKOPS_H_INCLUDED
#include <ripple/app/consensus/RCLConsensus.h>
#include <ripple/app/consensus/RCLCxPeerPos.h>
#include <ripple/app/ledger/Ledger.h>
#include <ripple/app/misc/StateAccounting.h>
#include <ripple/core/JobQueue.h>
#include <ripple/ledger/ReadView.h>
#include <ripple/net/InfoSub.h>
@@ -44,6 +42,35 @@ class LedgerMaster;
class Transaction;
class ValidatorKeys;
// This is the primary interface into the "client" portion of the program.
// Code that wants to do normal operations on the network such as
// creating and monitoring accounts, creating transactions, and so on
// should use this interface. The RPC code will primarily be a light wrapper
// over this code.
//
// Eventually, it will check the node's operating mode (synched, unsynched,
// etectera) and defer to the correct means of processing. The current
// code assumes this node is synched (and will continue to do so until
// there's a functional network.
//
/** Specifies the mode under which the server believes it's operating.
This has implications about how the server processes transactions and
how it responds to requests (e.g. account balance request).
@note Other code relies on the numerical values of these constants; do
not change them without verifying each use and ensuring that it is
not a breaking change.
*/
enum class OperatingMode {
DISCONNECTED = 0, //!< not ready to process requests
CONNECTED = 1, //!< convinced we are talking to the network
SYNCING = 2, //!< fallen slightly behind
TRACKING = 3, //!< convinced we agree with the network
FULL = 4 //!< we have the ledger and can even validate
};
/** Provides server functionality for clients.
Clients include backend applications, local commands, and connected
@@ -194,13 +221,6 @@ public:
virtual Json::Value
getConsensusInfo() = 0;
virtual RCLConsensus&
getConsensus() = 0;
virtual LedgerMaster&
getLedgerMaster() = 0;
virtual Json::Value
getServerInfo(bool human, bool admin, bool counters) = 0;
virtual void
@@ -208,9 +228,6 @@ public:
virtual Json::Value
getLedgerFetchInfo() = 0;
virtual StateAccounting::CounterData
getStateAccountingData() = 0;
/** Accepts the current transaction tree, return the new ledger's sequence
This API is only used via RPC with the server in STANDALONE mode and

View File

@@ -1,49 +0,0 @@
#include <ripple/app/misc/StateAccounting.h>
namespace ripple {
void
StateAccounting::mode(OperatingMode om)
{
std::lock_guard lock(mutex_);
auto now = std::chrono::steady_clock::now();
++counters_[static_cast<std::size_t>(om)].transitions;
if (om == OperatingMode::FULL &&
counters_[static_cast<std::size_t>(om)].transitions == 1)
{
initialSyncUs_ = std::chrono::duration_cast<std::chrono::microseconds>(
now - processStart_)
.count();
}
counters_[static_cast<std::size_t>(mode_)].dur +=
std::chrono::duration_cast<std::chrono::microseconds>(now - start_);
mode_ = om;
start_ = now;
}
void
StateAccounting::json(Json::Value& obj)
{
auto [counters, mode, start, initialSync] = getCounterData();
auto const current = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now() - start);
counters[static_cast<std::size_t>(mode)].dur += current;
obj[jss::state_accounting] = Json::objectValue;
for (std::size_t i = static_cast<std::size_t>(OperatingMode::DISCONNECTED);
i <= static_cast<std::size_t>(OperatingMode::FULL);
++i)
{
obj[jss::state_accounting][states_[i]] = Json::objectValue;
auto& state = obj[jss::state_accounting][states_[i]];
state[jss::transitions] = std::to_string(counters[i].transitions);
state[jss::duration_us] = std::to_string(counters[i].dur.count());
}
obj[jss::server_state_duration_us] = std::to_string(current.count());
if (initialSync)
obj[jss::initial_sync_duration_us] = std::to_string(initialSync);
}
} // namespace ripple

View File

@@ -1,99 +0,0 @@
#ifndef RIPPLE_APP_MAIN_STATEACCOUNTING_H_INCLUDED
#define RIPPLE_APP_MAIN_STATEACCOUNTING_H_INCLUDED
#include <ripple/basics/chrono.h>
#include <ripple/beast/utility/Journal.h>
#include <ripple/json/json_value.h>
#include <ripple/protocol/jss.h>
#include <array>
#include <mutex>
namespace ripple {
// This is the primary interface into the "client" portion of the program.
// Code that wants to do normal operations on the network such as
// creating and monitoring accounts, creating transactions, and so on
// should use this interface. The RPC code will primarily be a light wrapper
// over this code.
//
// Eventually, it will check the node's operating mode (synched, unsynched,
// etectera) and defer to the correct means of processing. The current
// code assumes this node is synched (and will continue to do so until
// there's a functional network.
//
/** Specifies the mode under which the server believes it's operating.
This has implications about how the server processes transactions and
how it responds to requests (e.g. account balance request).
@note Other code relies on the numerical values of these constants; do
not change them without verifying each use and ensuring that it is
not a breaking change.
*/
enum class OperatingMode {
DISCONNECTED = 0, //!< not ready to process requests
CONNECTED = 1, //!< convinced we are talking to the network
SYNCING = 2, //!< fallen slightly behind
TRACKING = 3, //!< convinced we agree with the network
FULL = 4 //!< we have the ledger and can even validate
};
class StateAccounting
{
public:
constexpr static std::array<Json::StaticString const, 5> const states_ = {
{Json::StaticString("disconnected"),
Json::StaticString("connected"),
Json::StaticString("syncing"),
Json::StaticString("tracking"),
Json::StaticString("full")}};
struct Counters
{
explicit Counters() = default;
std::uint64_t transitions = 0;
std::chrono::microseconds dur = std::chrono::microseconds(0);
};
private:
OperatingMode mode_ = OperatingMode::DISCONNECTED;
std::array<Counters, 5> counters_;
mutable std::mutex mutex_;
std::chrono::steady_clock::time_point start_ =
std::chrono::steady_clock::now();
std::chrono::steady_clock::time_point const processStart_ = start_;
std::uint64_t initialSyncUs_{0};
public:
explicit StateAccounting()
{
counters_[static_cast<std::size_t>(OperatingMode::DISCONNECTED)]
.transitions = 1;
}
//! Record state transition. Update duration spent in previous state.
void
mode(OperatingMode om);
//! Output state counters in JSON format.
void
json(Json::Value& obj);
using CounterData = std::tuple<
decltype(counters_),
decltype(mode_),
decltype(start_),
decltype(initialSyncUs_)>;
CounterData
getCounterData()
{
return {counters_, mode_, start_, initialSyncUs_};
}
};
} // namespace ripple
#endif

View File

@@ -155,8 +155,6 @@ public:
std::map<std::string, PublicKey>
IMPORT_VL_KEYS; // hex string -> class PublicKey (for caching purposes)
std::vector<std::string> DATAGRAM_MONITOR;
enum StartUpType {
FRESH,
NORMAL,

View File

@@ -101,7 +101,6 @@ struct ConfigSection
#define SECTION_SWEEP_INTERVAL "sweep_interval"
#define SECTION_NETWORK_ID "network_id"
#define SECTION_IMPORT_VL_KEYS "import_vl_keys"
#define SECTION_DATAGRAM_MONITOR "datagram_monitor"
} // namespace ripple

View File

@@ -281,9 +281,6 @@ Config::setupControl(bool bQuiet, bool bSilent, bool bStandalone)
// RAM and CPU resources. We default to "tiny" for standalone mode.
if (!bStandalone)
{
NODE_SIZE = 4;
return;
// First, check against 'minimum' RAM requirements per node size:
auto const& threshold =
sizedItems[std::underlying_type_t<SizedItem>(SizedItem::ramSizeGB)];
@@ -468,24 +465,26 @@ Config::loadFromString(std::string const& fileContents)
SNTP_SERVERS = *s;
// if the user has specified ip:port then replace : with a space.
auto replaceColons = [](std::vector<std::string>& strVec) {
const static std::regex e(":([0-9]+)$");
for (auto& line : strVec)
{
// skip anything that might be an ipv6 address
if (std::count(line.begin(), line.end(), ':') != 1)
continue;
{
auto replaceColons = [](std::vector<std::string>& strVec) {
const static std::regex e(":([0-9]+)$");
for (auto& line : strVec)
{
// skip anything that might be an ipv6 address
if (std::count(line.begin(), line.end(), ':') != 1)
continue;
std::string result = std::regex_replace(line, e, " $1");
// sanity check the result of the replace, should be same length
// as input
if (result.size() == line.size())
line = result;
}
};
std::string result = std::regex_replace(line, e, " $1");
// sanity check the result of the replace, should be same length
// as input
if (result.size() == line.size())
line = result;
}
};
replaceColons(IPS_FIXED);
replaceColons(IPS);
replaceColons(IPS_FIXED);
replaceColons(IPS);
}
{
std::string dbPath;
@@ -510,12 +509,6 @@ Config::loadFromString(std::string const& fileContents)
NETWORK_ID = beast::lexicalCastThrow<uint32_t>(strTemp);
}
if (auto s = getIniFileSection(secConfig, SECTION_DATAGRAM_MONITOR))
{
DATAGRAM_MONITOR = *s;
replaceColons(DATAGRAM_MONITOR);
}
if (getSingleSection(secConfig, SECTION_PEER_PRIVATE, strTemp, j_))
PEER_PRIVATE = beast::lexicalCastThrow<bool>(strTemp);

View File

@@ -30,7 +30,6 @@
#include <ripple/rpc/Context.h>
#include <ripple/rpc/Role.h>
#include <ripple/rpc/impl/RPCHelpers.h>
#include <ripple/rpc/impl/UDPInfoSub.h>
namespace ripple {
@@ -43,7 +42,7 @@ doSubscribe(RPC::JsonContext& context)
if (!context.infoSub && !context.params.isMember(jss::url))
{
// Must be a JSON-RPC call.
JLOG(context.j.warn()) << "doSubscribe: RPC subscribe requires a url";
JLOG(context.j.info()) << "doSubscribe: RPC subscribe requires a url";
return rpcError(rpcINVALID_PARAMS);
}
@@ -374,13 +373,6 @@ doSubscribe(RPC::JsonContext& context)
}
}
if (ispSub)
{
if (std::shared_ptr<UDPInfoSub> udp =
std::dynamic_pointer_cast<UDPInfoSub>(ispSub))
udp->increment();
}
return jvResult;
}

View File

@@ -25,7 +25,6 @@
#include <ripple/rpc/Context.h>
#include <ripple/rpc/Role.h>
#include <ripple/rpc/impl/RPCHelpers.h>
#include <ripple/rpc/impl/UDPInfoSub.h>
namespace ripple {
@@ -246,12 +245,6 @@ doUnsubscribe(RPC::JsonContext& context)
context.netOps.tryRemoveRpcSub(context.params[jss::url].asString());
}
if (ispSub)
{
if (auto udp = std::dynamic_pointer_cast<UDPInfoSub>(ispSub))
udp->destroy();
}
return jvResult;
}

View File

@@ -361,67 +361,6 @@ ServerHandlerImp::onWSMessage(
}
}
void
ServerHandlerImp::onUDPMessage(
std::string const& message,
boost::asio::ip::tcp::endpoint const& remoteEndpoint,
std::function<void(std::string const&)> sendResponse)
{
Json::Value jv;
if (message.size() > RPC::Tuning::maxRequestSize ||
!Json::Reader{}.parse(message, jv) || !jv.isObject())
{
Json::Value jvResult(Json::objectValue);
jvResult[jss::type] = jss::error;
jvResult[jss::error] = "jsonInvalid";
jvResult[jss::value] = message;
std::string const response = to_string(jvResult);
JLOG(m_journal.trace())
<< "UDP sending error response: '" << jvResult << "'";
sendResponse(response);
return;
}
JLOG(m_journal.trace())
<< "UDP received '" << jv << "' from " << remoteEndpoint;
auto const postResult = m_jobQueue.postCoro(
jtCLIENT_RPC, // Using RPC job type since this is admin RPC
"UDP-RPC",
[this,
remoteEndpoint,
jv = std::move(jv),
sendResponse = std::move(sendResponse)](
std::shared_ptr<JobQueue::Coro> const& coro) {
// Process the request similar to WebSocket but with UDP context
Role const role = Role::ADMIN; // UDP-RPC is admin-only
auto const jr =
this->processUDP(jv, role, coro, sendResponse, remoteEndpoint);
std::string const response = to_string(jr);
JLOG(m_journal.trace())
<< "UDP sending '" << jr << "' to " << remoteEndpoint;
// Send response back via UDP
sendResponse(response);
});
if (postResult == nullptr)
{
// Request rejected, probably shutting down
Json::Value jvResult(Json::objectValue);
jvResult[jss::type] = jss::error;
jvResult[jss::error] = "serverShuttingDown";
jvResult[jss::value] = "Server is shutting down";
std::string const response = to_string(jvResult);
JLOG(m_journal.trace())
<< "UDP sending shutdown response to " << remoteEndpoint;
sendResponse(response);
}
}
void
ServerHandlerImp::onClose(Session& session, boost::system::error_code const&)
{
@@ -458,145 +397,6 @@ logDuration(
<< " microseconds. request = " << request;
}
Json::Value
ServerHandlerImp::processUDP(
Json::Value const& jv,
Role const& role,
std::shared_ptr<JobQueue::Coro> const& coro,
std::optional<std::function<void(std::string const&)>>
sendResponse /* used for subscriptions */,
boost::asio::ip::tcp::endpoint const& remoteEndpoint)
{
std::shared_ptr<InfoSub> is;
// Requests without "command" are invalid.
Json::Value jr(Json::objectValue);
try
{
auto apiVersion =
RPC::getAPIVersionNumber(jv, app_.config().BETA_RPC_API);
if (apiVersion == RPC::apiInvalidVersion ||
(!jv.isMember(jss::command) && !jv.isMember(jss::method)) ||
(jv.isMember(jss::command) && !jv[jss::command].isString()) ||
(jv.isMember(jss::method) && !jv[jss::method].isString()) ||
(jv.isMember(jss::command) && jv.isMember(jss::method) &&
jv[jss::command].asString() != jv[jss::method].asString()))
{
jr[jss::type] = jss::response;
jr[jss::status] = jss::error;
jr[jss::error] = apiVersion == RPC::apiInvalidVersion
? jss::invalid_API_version
: jss::missingCommand;
jr[jss::request] = jv;
if (jv.isMember(jss::id))
jr[jss::id] = jv[jss::id];
if (jv.isMember(jss::jsonrpc))
jr[jss::jsonrpc] = jv[jss::jsonrpc];
if (jv.isMember(jss::ripplerpc))
jr[jss::ripplerpc] = jv[jss::ripplerpc];
if (jv.isMember(jss::api_version))
jr[jss::api_version] = jv[jss::api_version];
return jr;
}
auto required = RPC::roleRequired(
apiVersion,
app_.config().BETA_RPC_API,
jv.isMember(jss::command) ? jv[jss::command].asString()
: jv[jss::method].asString());
if (Role::FORBID == role)
{
jr[jss::result] = rpcError(rpcFORBIDDEN);
}
else
{
Resource::Consumer c;
Resource::Charge loadType = Resource::feeReferenceRPC;
if (sendResponse.has_value())
is = UDPInfoSub::getInfoSub(
m_networkOPs, *sendResponse, remoteEndpoint);
RPC::JsonContext context{
{app_.journal("RPCHandler"),
app_,
loadType,
app_.getOPs(),
app_.getLedgerMaster(),
c,
role,
coro,
is,
apiVersion},
jv};
auto start = std::chrono::system_clock::now();
RPC::doCommand(context, jr[jss::result]);
auto end = std::chrono::system_clock::now();
logDuration(jv, end - start, m_journal);
}
}
catch (std::exception const& ex)
{
jr[jss::result] = RPC::make_error(rpcINTERNAL);
JLOG(m_journal.error())
<< "Exception while processing WS: " << ex.what() << "\n"
<< "Input JSON: " << Json::Compact{Json::Value{jv}};
}
if (is)
{
if (auto udp = std::dynamic_pointer_cast<UDPInfoSub>(is))
udp->destroy();
}
// Currently we will simply unwrap errors returned by the RPC
// API, in the future maybe we can make the responses
// consistent.
//
// Regularize result. This is duplicate code.
if (jr[jss::result].isMember(jss::error))
{
jr = jr[jss::result];
jr[jss::status] = jss::error;
auto rq = jv;
if (rq.isObject())
{
if (rq.isMember(jss::passphrase.c_str()))
rq[jss::passphrase.c_str()] = "<masked>";
if (rq.isMember(jss::secret.c_str()))
rq[jss::secret.c_str()] = "<masked>";
if (rq.isMember(jss::seed.c_str()))
rq[jss::seed.c_str()] = "<masked>";
if (rq.isMember(jss::seed_hex.c_str()))
rq[jss::seed_hex.c_str()] = "<masked>";
}
jr[jss::request] = rq;
}
else
{
if (jr[jss::result].isMember("forwarded") &&
jr[jss::result]["forwarded"])
jr = jr[jss::result];
jr[jss::status] = jss::success;
}
if (jv.isMember(jss::id))
jr[jss::id] = jv[jss::id];
if (jv.isMember(jss::jsonrpc))
jr[jss::jsonrpc] = jv[jss::jsonrpc];
if (jv.isMember(jss::ripplerpc))
jr[jss::ripplerpc] = jv[jss::ripplerpc];
if (jv.isMember(jss::api_version))
jr[jss::api_version] = jv[jss::api_version];
jr[jss::type] = jss::response;
return jr;
}
Json::Value
ServerHandlerImp::processSession(
std::shared_ptr<WSSession> const& session,

View File

@@ -24,7 +24,6 @@
#include <ripple/core/JobQueue.h>
#include <ripple/json/Output.h>
#include <ripple/rpc/RPCHandler.h>
#include <ripple/rpc/impl/UDPInfoSub.h>
#include <ripple/rpc/impl/WSInfoSub.h>
#include <ripple/server/Server.h>
#include <ripple/server/Session.h>
@@ -165,12 +164,6 @@ public:
std::shared_ptr<WSSession> session,
std::vector<boost::asio::const_buffer> const& buffers);
void
onUDPMessage(
std::string const& message,
boost::asio::ip::tcp::endpoint const& remoteEndpoint,
std::function<void(std::string const&)> sendResponse);
void
onClose(Session& session, boost::system::error_code const&);
@@ -184,14 +177,6 @@ private:
std::shared_ptr<JobQueue::Coro> const& coro,
Json::Value const& jv);
Json::Value
processUDP(
Json::Value const& jv,
Role const& role,
std::shared_ptr<JobQueue::Coro> const& coro,
std::optional<std::function<void(std::string const&)>> sendResponse,
boost::asio::ip::tcp::endpoint const& remoteEndpoint);
void
processSession(
std::shared_ptr<Session> const&,

View File

@@ -1,140 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_RPC_UDPINFOSUB_H
#define RIPPLE_RPC_UDPINFOSUB_H
#include <ripple/beast/net/IPAddressConversion.h>
#include <ripple/json/json_writer.h>
#include <ripple/json/to_string.h>
#include <ripple/net/InfoSub.h>
#include <ripple/rpc/Role.h>
#include <ripple/server/WSSession.h>
#include <boost/utility/string_view.hpp>
#include <memory>
#include <string>
namespace ripple {
class UDPInfoSub : public InfoSub
{
std::function<void(std::string const&)> send_;
boost::asio::ip::tcp::endpoint endpoint_;
UDPInfoSub(
Source& source,
std::function<void(std::string const&)>& sendResponse,
boost::asio::ip::tcp::endpoint const& remoteEndpoint)
: InfoSub(source), send_(sendResponse), endpoint_(remoteEndpoint)
{
}
struct RefCountedSub
{
std::shared_ptr<UDPInfoSub> sub;
size_t refCount;
RefCountedSub(std::shared_ptr<UDPInfoSub> s)
: sub(std::move(s)), refCount(1)
{
}
};
static inline std::mutex mtx_;
static inline std::map<boost::asio::ip::tcp::endpoint, RefCountedSub> map_;
public:
static std::shared_ptr<UDPInfoSub>
getInfoSub(
Source& source,
std::function<void(std::string const&)>& sendResponse,
boost::asio::ip::tcp::endpoint const& remoteEndpoint)
{
std::lock_guard<std::mutex> lock(mtx_);
auto it = map_.find(remoteEndpoint);
if (it != map_.end())
{
it->second.refCount++;
return it->second.sub;
}
auto sub = std::shared_ptr<UDPInfoSub>(
new UDPInfoSub(source, sendResponse, remoteEndpoint));
map_.emplace(remoteEndpoint, RefCountedSub(sub));
return sub;
}
static bool
increment(boost::asio::ip::tcp::endpoint const& remoteEndpoint)
{
std::lock_guard<std::mutex> lock(mtx_);
auto it = map_.find(remoteEndpoint);
if (it != map_.end())
{
it->second.refCount++;
return true;
}
return false;
}
bool
increment()
{
return increment(endpoint_);
}
static bool
destroy(boost::asio::ip::tcp::endpoint const& remoteEndpoint)
{
std::lock_guard<std::mutex> lock(mtx_);
auto it = map_.find(remoteEndpoint);
if (it != map_.end())
{
if (--it->second.refCount == 0)
{
map_.erase(it);
return true;
}
}
return false;
}
bool
destroy()
{
return destroy(endpoint_);
}
void
send(Json::Value const& jv, bool) override
{
std::string const str = to_string(jv);
send_(str);
}
boost::asio::ip::tcp::endpoint const&
endpoint() const
{
return endpoint_;
}
};
} // namespace ripple
#endif

View File

@@ -86,15 +86,6 @@ struct Port
// Returns a string containing the list of protocols
std::string
protocols() const;
bool
has_udp() const
{
return protocol.count("udp") > 0;
}
// Maximum UDP packet size (default 64KB)
std::size_t udp_packet_size = 65536;
};
std::ostream&

View File

@@ -244,13 +244,6 @@ parse_Port(ParsedPort& port, Section const& section, std::ostream& log)
optResult->begin(), optResult->end()))
port.protocol.insert(s);
}
if (port.protocol.count("udp") > 0 && port.protocol.size() > 1)
{
log << "Port " << section.name()
<< " cannot mix UDP with other protocols";
Throw<std::exception>();
}
}
{

View File

@@ -24,7 +24,6 @@
#include <ripple/beast/core/List.h>
#include <ripple/server/Server.h>
#include <ripple/server/impl/Door.h>
#include <ripple/server/impl/UDPDoor.h>
#include <ripple/server/impl/io_list.h>
#include <boost/asio.hpp>
#include <array>
@@ -163,35 +162,18 @@ ServerImpl<Handler>::ports(std::vector<Port> const& ports)
{
if (closed())
Throw<std::logic_error>("ports() on closed Server");
ports_.reserve(ports.size());
Endpoints eps;
eps.reserve(ports.size());
for (auto const& port : ports)
{
ports_.push_back(port);
if (port.has_udp())
if (auto sp = ios_.emplace<Door<Handler>>(
handler_, io_service_, ports_.back(), j_))
{
// UDP-RPC door
if (auto sp = ios_.emplace<UDPDoor<Handler>>(
handler_, io_service_, ports_.back(), j_))
{
eps.push_back(sp->get_endpoint());
sp->run();
}
}
else
{
// Standard TCP door
if (auto sp = ios_.emplace<Door<Handler>>(
handler_, io_service_, ports_.back(), j_))
{
list_.push_back(sp);
eps.push_back(sp->get_endpoint());
sp->run();
}
list_.push_back(sp);
eps.push_back(sp->get_endpoint());
sp->run();
}
}
return eps;

View File

@@ -1,284 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_SERVER_UDPDOOR_H_INCLUDED
#define RIPPLE_SERVER_UDPDOOR_H_INCLUDED
#include <ripple/basics/Log.h>
#include <ripple/basics/contract.h>
#include <ripple/server/impl/PlainHTTPPeer.h>
#include <ripple/server/impl/SSLHTTPPeer.h>
#include <ripple/server/impl/io_list.h>
#include <boost/asio/basic_waitable_timer.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/beast/core/detect_ssl.hpp>
#include <boost/beast/core/multi_buffer.hpp>
#include <boost/beast/core/tcp_stream.hpp>
#include <boost/container/flat_map.hpp>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
namespace ripple {
template <class Handler>
class UDPDoor : public io_list::work,
public std::enable_shared_from_this<UDPDoor<Handler>>
{
private:
using error_code = boost::system::error_code;
using endpoint_type = boost::asio::ip::tcp::endpoint;
using udp_socket = boost::asio::ip::udp::socket;
beast::Journal const j_;
Port const& port_;
Handler& handler_;
boost::asio::io_context& ioc_;
boost::asio::strand<boost::asio::io_context::executor_type> strand_;
udp_socket socket_;
std::vector<char> recv_buffer_;
endpoint_type local_endpoint_; // Store TCP-style endpoint
public:
UDPDoor(
Handler& handler,
boost::asio::io_context& io_context,
Port const& port,
beast::Journal j)
: j_(j)
, port_(port)
, handler_(handler)
, ioc_(io_context)
, strand_(io_context.get_executor())
, socket_(io_context)
, recv_buffer_(port.udp_packet_size)
, local_endpoint_(port.ip, port.port) // Store as TCP endpoint
{
error_code ec;
// Create UDP endpoint from port configuration
auto const addr = port_.ip.to_v4();
boost::asio::ip::udp::endpoint udp_endpoint(addr, port_.port);
socket_.open(boost::asio::ip::udp::v4(), ec);
if (ec)
{
JLOG(j_.error()) << "UDP socket open failed: " << ec.message();
return;
}
// Set socket options
socket_.set_option(boost::asio::socket_base::reuse_address(true), ec);
if (ec)
{
JLOG(j_.error())
<< "UDP set reuse_address failed: " << ec.message();
return;
}
socket_.bind(udp_endpoint, ec);
if (ec)
{
JLOG(j_.error()) << "UDP socket bind failed: " << ec.message();
return;
}
JLOG(j_.info()) << "UDP-RPC listening on " << udp_endpoint;
}
endpoint_type
get_endpoint() const
{
return local_endpoint_;
}
void
run()
{
if (!socket_.is_open())
return;
do_receive();
}
void
close() override
{
error_code ec;
socket_.close(ec);
}
private:
void
do_receive()
{
if (!socket_.is_open())
return;
socket_.async_receive_from(
boost::asio::buffer(recv_buffer_),
sender_endpoint_,
boost::asio::bind_executor(
strand_,
std::bind(
&UDPDoor::on_receive,
this->shared_from_this(),
std::placeholders::_1,
std::placeholders::_2)));
}
void
on_receive(error_code ec, std::size_t bytes_transferred)
{
if (ec)
{
if (ec != boost::asio::error::operation_aborted)
{
JLOG(j_.error()) << "UDP receive failed: " << ec.message();
do_receive();
}
return;
}
// Convert UDP endpoint to TCP endpoint for compatibility
endpoint_type tcp_endpoint(
sender_endpoint_.address(), sender_endpoint_.port());
// Handle the received UDP message
handler_.onUDPMessage(
std::string(recv_buffer_.data(), bytes_transferred),
tcp_endpoint,
[this, tcp_endpoint](std::string const& response) {
do_send(response, tcp_endpoint);
});
do_receive();
}
void
do_send(std::string const& response, endpoint_type const& tcp_endpoint)
{
if (!socket_.is_open())
{
std::cout << "UDP SOCKET NOT OPEN WHEN SENDING\n\n";
return;
}
const size_t HEADER_SIZE = 16;
const size_t MAX_DATAGRAM_SIZE =
65487; // Allow for ipv6 header 40 bytes + 8 bytes of udp header
const size_t MAX_PAYLOAD_SIZE = MAX_DATAGRAM_SIZE - HEADER_SIZE;
// Convert TCP endpoint back to UDP for sending
boost::asio::ip::udp::endpoint udp_endpoint(
tcp_endpoint.address(), tcp_endpoint.port());
// If message fits in single datagram, send normally
if (response.length() <= MAX_DATAGRAM_SIZE)
{
socket_.async_send_to(
boost::asio::buffer(response),
udp_endpoint,
boost::asio::bind_executor(
strand_,
[this, self = this->shared_from_this()](
error_code ec, std::size_t bytes_transferred) {
if (ec && ec != boost::asio::error::operation_aborted)
{
JLOG(j_.error())
<< "UDP send failed: " << ec.message();
}
}));
return;
}
// Calculate number of packets needed
const size_t payload_size = MAX_PAYLOAD_SIZE;
const uint16_t total_packets =
(response.length() + payload_size - 1) / payload_size;
// Get current timestamp in microseconds
auto now = std::chrono::system_clock::now();
auto micros = std::chrono::duration_cast<std::chrono::microseconds>(
now.time_since_epoch())
.count();
uint64_t timestamp = static_cast<uint64_t>(micros);
// Send fragmented packets
for (uint16_t packet_num = 0; packet_num < total_packets; packet_num++)
{
std::string fragment;
fragment.reserve(MAX_DATAGRAM_SIZE);
// Add header - 4 bytes of zeros
fragment.push_back(0);
fragment.push_back(0);
fragment.push_back(0);
fragment.push_back(0);
// Add packet number (little endian)
fragment.push_back(packet_num & 0xFF);
fragment.push_back((packet_num >> 8) & 0xFF);
// Add total packets (little endian)
fragment.push_back(total_packets & 0xFF);
fragment.push_back((total_packets >> 8) & 0xFF);
// Add timestamp (8 bytes, little endian)
fragment.push_back(timestamp & 0xFF);
fragment.push_back((timestamp >> 8) & 0xFF);
fragment.push_back((timestamp >> 16) & 0xFF);
fragment.push_back((timestamp >> 24) & 0xFF);
fragment.push_back((timestamp >> 32) & 0xFF);
fragment.push_back((timestamp >> 40) & 0xFF);
fragment.push_back((timestamp >> 48) & 0xFF);
fragment.push_back((timestamp >> 56) & 0xFF);
// Calculate payload slice
size_t start = packet_num * payload_size;
size_t length = std::min(payload_size, response.length() - start);
fragment.append(response.substr(start, length));
socket_.async_send_to(
boost::asio::buffer(fragment),
udp_endpoint,
boost::asio::bind_executor(
strand_,
[this, self = this->shared_from_this()](
error_code ec, std::size_t bytes_transferred) {
if (ec && ec != boost::asio::error::operation_aborted)
{
JLOG(j_.error())
<< "UDP send failed: " << ec.message();
}
}));
}
}
boost::asio::ip::udp::endpoint sender_endpoint_;
};
} // namespace ripple
#endif

View File

@@ -144,14 +144,6 @@ public:
{
}
void
onUDPMessage(
std::string const& message,
boost::asio::ip::tcp::endpoint const& remoteEndpoint,
std::function<void(std::string const&)> sendResponse)
{
}
void
onClose(Session& session, boost::system::error_code const&)
{
@@ -357,14 +349,6 @@ public:
{
}
void
onUDPMessage(
std::string const& message,
boost::asio::ip::tcp::endpoint const& remoteEndpoint,
std::function<void(std::string const&)> sendResponse)
{
}
void
onClose(Session& session, boost::system::error_code const&)
{