Compare commits

..

9 Commits

Author SHA1 Message Date
Nicholas Dudfield
7241fed6d0 refactor: remove unused io_service parameter from RPCSub
sendThread() now uses a local io_service per batch, so the app's
io_service passed via make_RPCSub is dead code. Removes it from the
header, constructor, factory function, and sole call site.
2026-02-10 13:00:28 +07:00
Nicholas Dudfield
a6dfb40413 chore: update levelization ordering for HTTPClient_test
Add test.net > ripple.basics dependency introduced by the new test file.
2026-02-10 08:23:59 +07:00
Nicholas Dudfield
3e0d6b9cd2 fix: increment mSeq on queue drop and fix flaky connection-refused test
- Advance mSeq when dropping events so consumers can detect gaps via
  sequence numbers, and log the dropped seq
- Use ephemeral port (bind + close) instead of hardcoded 19999 for the
  connection-refused test to avoid false negatives on busy machines
2026-02-10 07:52:02 +07:00
Nicholas Dudfield
a8388e48a4 fix: call invokeComplete on EOF in HTTPClient handleData
When an HTTP response has no Content-Length header, HTTPClient reads
until EOF. The EOF path in handleData logged "Complete." but never
called invokeComplete(), leaving the socket held open for the full
30s deadline timeout and the completion callback never invoked.

This is the likely root cause of webhook delivery permanently stalling
after repeated 500 errors — many web frameworks omit Content-Length on
error responses, triggering this path. Each leaked socket holds an FD
for 30s, eventually exhausting the process FD budget.

Includes HTTPClient_test with 12 test cases covering resource cleanup
across success, error, timeout, connection-refused, concurrent request,
and EOF-without-Content-Length scenarios.
2026-02-10 07:33:31 +07:00
Nicholas Dudfield
49908096d5 fix: bound subscription webhook delivery concurrency and queue size
fromNetwork() is async — it posts handlers to the io_service and
returns immediately. The original sendThread() loop fires all queued
events as concurrent HTTP connections at once. Under sustained load
with a slow/failing endpoint, connections accumulate (each held up to
30s by RPC_NOTIFY timeout), exhausting file descriptors and breaking
all network I/O for the entire process.

Fix: use a local io_service per batch with .run() to block until the
batch completes (same pattern as rpcClient() in RPCCall.cpp). This
bounds concurrent connections to maxInFlight (32) per subscriber while
still allowing parallel delivery.

Also add a queue cap (maxQueueSize = 16384, ~80-160MB) so a hopelessly
behind endpoint doesn't grow the deque indefinitely. Consumers detect
gaps via the existing seq field.

Ref: XRPLF/rippled#6341
2026-02-09 18:31:25 +07:00
tequ
12e1afb694 Enhance dependency export process in GitHub Action to check for existing exports before executing. (#660) 2026-01-28 13:14:40 +10:00
tequ
c355ad9971 update mise-action to use cmake as aqua:Kitware/CMake (#671) 2026-01-27 19:30:50 +10:00
Niq Dudfield
a8d7b2619e fix: restore [ips_fixed] to use addFixedPeer instead of addFallbackStrings (#641) 2026-01-05 13:46:02 +10:00
Niq Dudfield
775fb3a8b2 fix: increment manifest sequence for client code cache invalidation (#631) 2025-12-24 11:16:00 +10:00
38 changed files with 961 additions and 754 deletions

View File

@@ -134,10 +134,17 @@ runs:
- name: Export custom recipes
shell: bash
run: |
conan export external/snappy --version 1.1.10 --user xahaud --channel stable
conan export external/soci --version 4.0.3 --user xahaud --channel stable
conan export external/wasmedge --version 0.11.2 --user xahaud --channel stable
# Export snappy if not already exported
conan list snappy/1.1.10@xahaud/stable 2>/dev/null | (grep -q "not found" && exit 1 || exit 0) || \
conan export external/snappy --version 1.1.10 --user xahaud --channel stable
# Export soci if not already exported
conan list soci/4.0.3@xahaud/stable 2>/dev/null | (grep -q "not found" && exit 1 || exit 0) || \
conan export external/soci --version 4.0.3 --user xahaud --channel stable
# Export wasmedge if not already exported
conan list wasmedge/0.11.2@xahaud/stable 2>/dev/null | (grep -q "not found" && exit 1 || exit 0) || \
conan export external/wasmedge --version 0.11.2 --user xahaud --channel stable
- name: Install dependencies
shell: bash
env:

View File

@@ -43,14 +43,22 @@ jobs:
# To isolate environments for each Runner, instead of installing globally with brew,
# use mise to isolate environments for each Runner directory.
- name: Setup toolchain (mise)
uses: jdx/mise-action@v2
uses: jdx/mise-action@v3.6.1
with:
cache: false
install: true
mise_toml: |
[tools]
cmake = "3.23.1"
python = "3.12"
pipx = "latest"
conan = "2"
ninja = "latest"
ccache = "latest"
- name: Install tools via mise
run: |
mise install
mise use cmake@3.23.1 python@3.12 pipx@latest conan@2 ninja@latest ccache@latest
mise reshim
echo "$HOME/.local/share/mise/shims" >> "$GITHUB_PATH"

View File

@@ -957,6 +957,7 @@ if (tests)
subdir: net
#]===============================]
src/test/net/DatabaseDownloader_test.cpp
src/test/net/HTTPClient_test.cpp
#[===============================[
test sources:
subdir: nodestore

View File

@@ -149,6 +149,7 @@ test.ledger > ripple.ledger
test.ledger > ripple.protocol
test.ledger > test.jtx
test.ledger > test.toplevel
test.net > ripple.basics
test.net > ripple.net
test.net > test.jtx
test.net > test.toplevel

View File

@@ -350,10 +350,7 @@ enum hook_return_code : int64_t {
MEM_OVERLAP = -43, // one or more specified buffers are the same memory
TOO_MANY_STATE_MODIFICATIONS = -44, // more than 5000 modified state
// entires in the combined hook chains
TOO_MANY_NAMESPACES = -45,
EXPORT_FAILURE = -46,
TOO_MANY_EXPORTED_TXN = -47,
TOO_MANY_NAMESPACES = -45
};
enum ExitType : uint8_t {
@@ -367,7 +364,6 @@ const uint16_t max_state_modifications = 256;
const uint8_t max_slots = 255;
const uint8_t max_nonce = 255;
const uint8_t max_emit = 255;
const uint8_t max_export = 4;
const uint8_t max_params = 16;
const double fee_base_multiplier = 1.1f;
@@ -473,13 +469,6 @@ static const APIWhitelist import_whitelist_1{
// clang-format on
};
static const APIWhitelist import_whitelist_2{
// clang-format off
HOOK_API_DEFINITION(I64, xport, (I32, I32)),
HOOK_API_DEFINITION(I64, xport_reserve, (I32)),
// clang-format on
};
#undef HOOK_API_DEFINITION
#undef I32
#undef I64

View File

@@ -1034,12 +1034,6 @@ validateGuards(
{
// PASS, this is a version 1 api
}
else if (rulesVersion & 0x04U &&
hook_api::import_whitelist_2.find(import_name) !=
hook_api::import_whitelist_2.end())
{
// PASS, this is an export api
}
else
{
GUARDLOG(hook::log::IMPORT_ILLEGAL)

View File

@@ -406,17 +406,6 @@ DECLARE_HOOK_FUNCTION(
uint32_t slot_no_tx,
uint32_t slot_no_meta);
DECLARE_HOOK_FUNCTION(
int64_t,
xport,
uint32_t write_ptr,
uint32_t write_len,
uint32_t read_ptr,
uint32_t read_len);
DECLARE_HOOK_FUNCTION(
int64_t,
xport_reserve,
uint32_t count);
/*
DECLARE_HOOK_FUNCTION(int64_t, str_find, uint32_t hread_ptr,
uint32_t hread_len, uint32_t nread_ptr, uint32_t nread_len, uint32_t mode,
@@ -496,8 +485,6 @@ struct HookResult
std::queue<std::shared_ptr<ripple::Transaction>>
emittedTxn{}; // etx stored here until accept/rollback
std::queue<std::shared_ptr<ripple::Transaction>>
exportedTxn{};
HookStateMap& stateMap;
uint16_t changedStateCount = 0;
std::map<
@@ -554,7 +541,6 @@ struct HookContext
uint16_t ledger_nonce_counter{0};
int64_t expected_etxn_count{-1}; // make this a 64bit int so the uint32
// from the hookapi cant overflow it
int64_t expected_export_count{-1};
std::map<ripple::uint256, bool> nonce_used{};
uint32_t generation =
0; // used for caching, only generated when txn_generation is called
@@ -891,9 +877,6 @@ public:
ADD_HOOK_FUNCTION(meta_slot, ctx);
ADD_HOOK_FUNCTION(xpop_slot, ctx);
ADD_HOOK_FUNCTION(xport, ctx);
ADD_HOOK_FUNCTION(xport_reserve, ctx);
/*
ADD_HOOK_FUNCTION(str_find, ctx);
ADD_HOOK_FUNCTION(str_replace, ctx);

View File

@@ -79,7 +79,7 @@ main(int argc, char** argv)
close(fd);
auto result = validateGuards(hook, std::cout, "", 7);
auto result = validateGuards(hook, std::cout, "", 3);
if (!result)
{

View File

@@ -1971,8 +1971,6 @@ hook::finalizeHookResult(
// directory) if we are allowed to
std::vector<std::pair<uint256 /* txnid */, uint256 /* emit nonce */>>
emission_txnid;
std::vector<uint256 /* txnid */>
exported_txnid;
if (doEmit)
{
@@ -2028,58 +2026,6 @@ hook::finalizeHookResult(
}
}
}
DBG_PRINTF("exported txn count: %d\n", hookResult.exportedTxn.size());
for (; hookResult.exportedTxn.size() > 0; hookResult.exportedTxn.pop())
{
auto& tpTrans = hookResult.exportedTxn.front();
auto& id = tpTrans->getID();
JLOG(j.trace()) << "HookExport[" << HR_ACC() << "]: " << id;
// exported txns must be marked bad by the hash router to ensure under
// no circumstances they will enter consensus on *this* chain.
applyCtx.app.getHashRouter().setFlags(id, SF_BAD);
std::shared_ptr<const ripple::STTx> ptr =
tpTrans->getSTransaction();
auto exportedId = keylet::exportedTxn(id);
auto sleExported = applyCtx.view().peek(exportedId);
if (!sleExported)
{
exported_txnid.emplace_back(id);
sleExported = std::make_shared<SLE>(exportedId);
// RH TODO: add a new constructor to STObject to avoid this
// serder thing
ripple::Serializer s;
ptr->add(s);
SerialIter sit(s.slice());
sleExported->emplace_back(ripple::STObject(sit, sfExportedTxn));
auto page = applyCtx.view().dirInsert(
keylet::exportedDir(), exportedId, [&](SLE::ref sle) {
(*sle)[sfFlags] = lsfEmittedDir;
});
if (page)
{
(*sleExported)[sfOwnerNode] = *page;
applyCtx.view().insert(sleExported);
}
else
{
JLOG(j.warn())
<< "HookError[" << HR_ACC() << "]: "
<< "Export Directory full when trying to insert "
<< id;
return tecDIR_FULL;
}
}
}
}
bool const fixV2 = applyCtx.view().rules().enabled(fixXahauV2);
@@ -2106,12 +2052,6 @@ hook::finalizeHookResult(
meta.setFieldU16(
sfHookEmitCount,
emission_txnid.size()); // this will never wrap, hard limit
if (applyCtx.view().rules().enabled(featureExport))
{
meta.setFieldU16(
sfHookExportCount,
exported_txnid.size());
}
meta.setFieldU16(sfHookExecutionIndex, exec_index);
meta.setFieldU16(sfHookStateChangeCount, hookResult.changedStateCount);
meta.setFieldH256(sfHookHash, hookResult.hookHash);
@@ -3948,27 +3888,6 @@ DEFINE_HOOK_FUNCTION(int64_t, etxn_reserve, uint32_t count)
HOOK_TEARDOWN();
}
DEFINE_HOOK_FUNCTION(int64_t, xport_reserve, uint32_t count)
{
HOOK_SETUP(); // populates memory_ctx, memory, memory_length, applyCtx,
// hookCtx on current stack
if (hookCtx.expected_export_count > -1)
return ALREADY_SET;
if (count < 1)
return TOO_SMALL;
if (count > hook_api::max_export)
return TOO_BIG;
hookCtx.expected_export_count = count;
return count;
HOOK_TEARDOWN();
}
// Compute the burden of an emitted transaction based on a number of factors
DEFINE_HOOK_FUNCNARG(int64_t, etxn_burden)
{
@@ -6237,92 +6156,6 @@ DEFINE_HOOK_FUNCTION(
HOOK_TEARDOWN();
}
DEFINE_HOOK_FUNCTION(
int64_t,
xport,
uint32_t write_ptr,
uint32_t write_len,
uint32_t read_ptr,
uint32_t read_len)
{
HOOK_SETUP();
if (NOT_IN_BOUNDS(read_ptr, read_len, memory_length))
return OUT_OF_BOUNDS;
if (NOT_IN_BOUNDS(write_ptr, write_len, memory_length))
return OUT_OF_BOUNDS;
if (write_len < 32)
return TOO_SMALL;
auto& app = hookCtx.applyCtx.app;
if (hookCtx.expected_export_count < 0)
return PREREQUISITE_NOT_MET;
if (hookCtx.result.exportedTxn.size() >= hookCtx.expected_export_count)
return TOO_MANY_EXPORTED_TXN;
ripple::Blob blob{memory + read_ptr, memory + read_ptr + read_len};
std::shared_ptr<STTx const> stpTrans;
try
{
stpTrans = std::make_shared<STTx const>(
SerialIter{memory + read_ptr, read_len});
}
catch (std::exception& e)
{
JLOG(j.trace()) << "HookExport[" << HC_ACC() << "]: Failed " << e.what()
<< "\n";
return EXPORT_FAILURE;
}
if (!stpTrans->isFieldPresent(sfAccount) ||
stpTrans->getAccountID(sfAccount) != hookCtx.result.account)
{
JLOG(j.trace()) << "HookExport[" << HC_ACC()
<< "]: Attempted to export a txn that's not for this Hook's Account ID.";
return EXPORT_FAILURE;
}
std::string reason;
auto tpTrans = std::make_shared<Transaction>(stpTrans, reason, app);
// RHTODO: is this needed or wise? VVV
if (tpTrans->getStatus() != NEW)
{
JLOG(j.trace()) << "HookExport[" << HC_ACC()
<< "]: tpTrans->getStatus() != NEW";
return EXPORT_FAILURE;
}
auto const& txID = tpTrans->getID();
if (txID.size() > write_len)
return TOO_SMALL;
if (NOT_IN_BOUNDS(write_ptr, txID.size(), memory_length))
return OUT_OF_BOUNDS;
auto const write_txid = [&]() -> int64_t {
WRITE_WASM_MEMORY_AND_RETURN(
write_ptr,
txID.size(),
txID.data(),
txID.size(),
memory,
memory_length);
};
int64_t result = write_txid();
if (result == 32)
hookCtx.result.exportedTxn.push(tpTrans);
return result;
HOOK_TEARDOWN();
}
/*
DEFINE_HOOK_FUNCTION(

View File

@@ -599,13 +599,6 @@ public:
return validatorKeys_.publicKey;
}
ValidatorKeys const&
getValidatorKeys() const override
{
return validatorKeys_;
}
NetworkOPs&
getOPs() override
{

View File

@@ -240,8 +240,7 @@ public:
virtual PublicKey const&
getValidationPublicKey() const = 0;
virtual ValidatorKeys const&
getValidatorKeys() const = 0;
virtual Resource::Manager&
getResourceManager() = 0;
virtual PathRequests&

View File

@@ -471,6 +471,10 @@ ManifestCache::applyManifest(Manifest m)
auto masterKey = m.masterKey;
map_.emplace(std::move(masterKey), std::move(m));
// Increment sequence to invalidate cached manifest messages
seq_++;
return ManifestDisposition::accepted;
}

View File

@@ -27,8 +27,6 @@
#include <ripple/protocol/Feature.h>
#include <ripple/protocol/jss.h>
#include <ripple/protocol/st.h>
#include <ripple/app/misc/ValidatorKeys.h>
#include <ripple/protocol/Sign.h>
#include <algorithm>
#include <limits>
#include <numeric>
@@ -1541,247 +1539,6 @@ TxQ::accept(Application& app, OpenView& view)
}
}
// Inject exported transactions/signatures, if any
if (view.rules().enabled(featureExport))
{
do
{
// if we're not a validator we do nothing here
if (app.getValidationPublicKey().empty())
break;
auto const& keys = app.getValidatorKeys();
if (keys.configInvalid())
break;
// and if we're not on the UNLReport we also do nothing
auto const unlRep = view.read(keylet::UNLReport());
if (!unlRep || !unlRep->isFieldPresent(sfActiveValidators))
{
// nothing to do without a unlreport object
break;
}
bool found = false;
auto const& avs = unlRep->getFieldArray(sfActiveValidators);
for (auto const& av : avs)
{
if (PublicKey(av[sfPublicKey]) == keys.masterPublicKey)
{
found = true;
break;
}
}
if (!found)
break;
// execution to here means we're a validator and on the UNLReport
AccountID signingAcc = calcAccountID(keys.publicKey);
Keylet const exportedDirKeylet{keylet::exportedDir()};
if (dirIsEmpty(view, exportedDirKeylet))
break;
std::shared_ptr<SLE const> sleDirNode{};
unsigned int uDirEntry{0};
uint256 dirEntry{beast::zero};
if (!cdirFirst(
view,
exportedDirKeylet.key,
sleDirNode,
uDirEntry,
dirEntry))
break;
do
{
Keylet const itemKeylet{ltCHILD, dirEntry};
auto sleItem = view.read(itemKeylet);
if (!sleItem)
{
// Directory node has an invalid index. Bail out.
JLOG(j_.warn())
<< "ExportedTxn processing: directory node in ledger "
<< view.seq()
<< " has index to object that is missing: "
<< to_string(dirEntry);
// RH TODO: if this ever happens the entry should be
// gracefully removed (somehow)
continue;
}
LedgerEntryType const nodeType{
safe_cast<LedgerEntryType>((*sleItem)[sfLedgerEntryType])};
if (nodeType != ltEXPORTED_TXN)
{
JLOG(j_.warn())
<< "ExportedTxn processing: emitted directory contained "
"non ltEMITTED_TXN type";
// RH TODO: if this ever happens the entry should be
// gracefully removed (somehow)
continue;
}
JLOG(j_.info()) << "Processing exported txn: " << *sleItem;
auto const& exported =
const_cast<ripple::STLedgerEntry&>(*sleItem)
.getField(sfExportedTxn)
.downcast<STObject>();
auto const& txnHash = sleItem->getFieldH256(sfTransactionHash);
auto exportedLgrSeq = exported.getFieldU32(sfLedgerSequence);
auto const seq = view.seq();
if (exportedLgrSeq == seq)
{
// this shouldn't happen, but do nothing
continue;
}
if (exportedLgrSeq < seq - 1)
{
// all old entries need to be turned into Export transactions so they can be removed
// from the directory
// in the previous ledger all the ExportSign transactions were executed, and one-by-one
// added the validators' signatures to the ltEXPORTED_TXN's sfSigners array.
// now we need to collect these together and place them inside the ExportedTxn blob
// and publish the blob in the Export transaction type.
STArray signers = sleItem->getFieldArray(sfSigners);
auto s = std::make_shared<ripple::Serializer>();
exported.add(*s);
SerialIter sitTrans(s->slice());
try
{
auto stpTrans =
std::make_shared<STTx>(std::ref(sitTrans));
if (!stpTrans->isFieldPresent(sfAccount) ||
stpTrans->getAccountID(sfAccount) == beast::zero)
{
JLOG(j_.warn()) << "Hook: Export failure: "
<< "sfAccount missing or zero.";
// RH TODO: if this ever happens the entry should be
// gracefully removed (somehow)
continue;
}
// RH TODO: should we force remove signingpubkey here?
stpTrans->setFieldArray(sfSigners, signers);
Blob const& blob = stpTrans->getSerializer().peekData();
STTx exportTx(ttEXPORT, [&](auto& obj) {
obj.setFieldVL(sfExportedTxn, blob);
obj.setFieldU32(sfLedgerSequence, seq);
obj.setFieldH256(sfTransactionHash, txnHash);
obj.setFieldArray(sfSigners, signers);
});
// submit to the ledger
{
uint256 txID = exportTx.getTransactionID();
auto s = std::make_shared<ripple::Serializer>();
exportTx.add(*s);
app.getHashRouter().setFlags(txID, SF_PRIVATE2);
app.getHashRouter().setFlags(txID, SF_EMITTED);
view.rawTxInsert(txID, std::move(s), nullptr);
ledgerChanged = true;
}
}
catch (std::exception& e)
{
JLOG(j_.warn())
<< "ExportedTxn Processing: Failure: " << e.what()
<< "\n";
}
continue;
}
// this ledger is the one after the exported txn was added to the directory
// so generate the export sign txns
auto s = std::make_shared<ripple::Serializer>();
exported.add(*s);
SerialIter sitTrans(s->slice());
try
{
auto const& stpTrans =
std::make_shared<STTx const>(std::ref(sitTrans));
if (!stpTrans->isFieldPresent(sfAccount) ||
stpTrans->getAccountID(sfAccount) == beast::zero)
{
JLOG(j_.warn()) << "Hook: Export failure: "
<< "sfAccount missing or zero.";
// RH TODO: if this ever happens the entry should be
// gracefully removed (somehow)
continue;
}
auto seq = view.info().seq;
auto txnHash = stpTrans->getTransactionID();
Serializer s =
buildMultiSigningData(*stpTrans, signingAcc);
auto multisig = ripple::sign(keys.publicKey, keys.secretKey, s.slice());
STTx exportSignTx(ttEXPORT_SIGN, [&](auto& obj) {
obj.set(([&]() {
auto inner = std::make_unique<STObject>(sfSigner);
inner->setFieldVL(sfSigningPubKey, keys.publicKey);
inner->setAccountID(sfAccount, signingAcc);
inner->setFieldVL(sfTxnSignature, multisig);
return inner;
})());
obj.setFieldU32(sfLedgerSequence, seq);
obj.setFieldH256(sfTransactionHash, txnHash);
});
// submit to the ledger
{
uint256 txID = exportSignTx.getTransactionID();
auto s = std::make_shared<ripple::Serializer>();
exportSignTx.add(*s);
app.getHashRouter().setFlags(txID, SF_PRIVATE2);
app.getHashRouter().setFlags(txID, SF_EMITTED);
view.rawTxInsert(txID, std::move(s), nullptr);
ledgerChanged = true;
}
}
catch (std::exception& e)
{
JLOG(j_.warn())
<< "ExportedTxn Processing: Failure: " << e.what()
<< "\n";
}
} while (cdirNext(
view, exportedDirKeylet.key, sleDirNode, uDirEntry, dirEntry));
} while (0);
}
// Inject emitted transactions if any
if (view.rules().enabled(featureHooks))
do

View File

@@ -96,13 +96,6 @@ Change::preflight(PreflightContext const& ctx)
}
}
if ((ctx.tx.getTxnType() == ttEXPORT_SIGN || ctx.tx.getTxnType() == ttEXPORT) &&
!ctx.rules.enabled(featureExport))
{
JLOG(ctx.j.warn()) << "Change: Export not enabled";
return temDISABLED;
}
return tesSUCCESS;
}
@@ -161,8 +154,6 @@ Change::preclaim(PreclaimContext const& ctx)
case ttAMENDMENT:
case ttUNL_MODIFY:
case ttEMIT_FAILURE:
case ttEXPORT:
case ttEXPORT_SIGN:
return tesSUCCESS;
case ttUNL_REPORT: {
if (!ctx.tx.isFieldPresent(sfImportVLKey) ||
@@ -218,11 +209,6 @@ Change::doApply()
return applyEmitFailure();
case ttUNL_REPORT:
return applyUNLReport();
case ttEXPORT:
return applyExport();
case ttEXPORT_SIGN:
return applyExportSign();
default:
assert(0);
return tefFAILURE;
@@ -620,8 +606,7 @@ Change::activateXahauGenesis()
loggerStream,
"rHb9CJAWyB4rj91VRWn96DkukG4bwdtyTh",
(ctx_.view().rules().enabled(featureHooksUpdate1) ? 1 : 0) +
(ctx_.view().rules().enabled(fix20250131) ? 2 : 0) +
(ctx_.view().rules().enabled(featureExport) ? 4 : 0));
(ctx_.view().rules().enabled(fix20250131) ? 2 : 0));
if (!result)
{
@@ -1087,80 +1072,6 @@ Change::applyEmitFailure()
return tesSUCCESS;
}
TER
Change::applyExport()
{
uint256 txnID(ctx_.tx.getFieldH256(sfTransactionHash));
do
{
JLOG(j_.info()) << "HookExport[" << txnID
<< "]: ttExport exporting transaction";
auto key = keylet::exportedTxn(txnID);
auto const& sle = view().peek(key);
if (!sle)
{
// most likely explanation is that this was somehow a double-up, so just ignore
JLOG(j_.warn())
<< "HookError[" << txnID << "]: ttExport could not find exported txn in ledger";
break;
}
if (!view().dirRemove(
keylet::exportedDir(),
sle->getFieldU64(sfOwnerNode),
key,
false))
{
JLOG(j_.fatal()) << "HookError[" << txnID
<< "]: ttExport (Change) tefBAD_LEDGER";
return tefBAD_LEDGER;
}
view().erase(sle);
} while (0);
return tesSUCCESS;
}
TER
Change::applyExportSign()
{
uint256 txnID(ctx_.tx.getFieldH256(sfTransactionHash));
do
{
JLOG(j_.info()) << "HookExport[" << txnID
<< "]: ttExportSign adding signature to transaction";
auto key = keylet::exportedTxn(txnID);
auto const& sle = view().peek(key);
if (!sle)
{
// most likely explanation is that this was somehow a double-up, so just ignore
JLOG(j_.warn())
<< "HookError[" << txnID << "]: ttExportSign could not find exported txn in ledger";
break;
}
// grab the signer object off the txn
STObject signerObj = const_cast<ripple::STTx&>(ctx_.tx)
.getField(sfSigner)
.downcast<STObject>();
// append it to the signers field in the ledger object
STArray signers = sle->getFieldArray(sfSigners);
signers.push_back(signerObj);
sle->setFieldArray(sfSigners, signers);
// done
view().update(sle);
} while (0);
return tesSUCCESS;
}
TER
Change::applyUNLModify()
{

View File

@@ -74,12 +74,6 @@ private:
TER
applyEmitFailure();
TER
applyExport();
TER
applyExportSign();
TER
applyUNLReport();
};

View File

@@ -37,12 +37,9 @@
#include <charconv>
#include <iostream>
#include <vector>
#include <ripple/app/hook/applyHook.h>
namespace ripple {
static const uint256 shadowTicketNamespace = uint256::fromVoid("RESERVED NAMESPACE SHADOW TICKET");
TxConsequences
Import::makeTxConsequences(PreflightContext const& ctx)
{
@@ -200,7 +197,7 @@ Import::preflight(PreflightContext const& ctx)
if (!stpTrans || !meta)
return temMALFORMED;
if (stpTrans->isFieldPresent(sfTicketSequence) && !ctx.rules.enabled(featureExport))
if (stpTrans->isFieldPresent(sfTicketSequence))
{
JLOG(ctx.j.warn()) << "Import: cannot use TicketSequence XPOP.";
return temMALFORMED;
@@ -891,26 +888,6 @@ Import::preclaim(PreclaimContext const& ctx)
return tefINTERNAL;
}
bool const hasTicket = stpTrans->isFieldPresent(sfTicketSequence);
if (hasTicket)
{
if (!ctx.view.rules().enabled(featureExport))
return tefINTERNAL;
auto const acc = stpTrans->getAccountID(sfAccount);
uint256 const seq = uint256(stpTrans->getFieldU32(sfTicketSequence));
// check if there is a shadow ticket, and if not we won't allow
// the txn to pass into consensus
if (!ctx.view.exists(keylet::hookState(acc, seq, shadowTicketNamespace)))
{
JLOG(ctx.j.warn()) << "Import: attempted to import a txn without shadow ticket.";
return telSHADOW_TICKET_REQUIRED; // tel code to avoid consensus/forward without SF_BAD
}
}
auto const& sle = ctx.view.read(keylet::account(ctx.tx[sfAccount]));
auto const tt = stpTrans->getTxnType();
@@ -951,17 +928,13 @@ Import::preclaim(PreclaimContext const& ctx)
} while (0);
}
if (!hasTicket)
if (sle && sle->isFieldPresent(sfImportSequence))
{
uint32_t sleImportSequence = sle->getFieldU32(sfImportSequence);
if (sle && sle->isFieldPresent(sfImportSequence))
{
uint32_t sleImportSequence = sle->getFieldU32(sfImportSequence);
// replay attempt
if (sleImportSequence >= stpTrans->getFieldU32(sfSequence))
return tefPAST_IMPORT_SEQ;
}
// replay attempt
if (sleImportSequence >= stpTrans->getFieldU32(sfSequence))
return tefPAST_IMPORT_SEQ;
}
// when importing for the first time the fee must be zero
@@ -1269,11 +1242,7 @@ Import::doApply()
auto const id = ctx_.tx[sfAccount];
auto sle = view().peek(keylet::account(id));
std::optional<uint256> ticket;
if (stpTrans->isFieldPresent(sfTicketSequence))
ticket = uint256(stpTrans->getFieldU32(sfTicketSequence));
if (sle && !ticket.has_value() && sle->getFieldU32(sfImportSequence) >= importSequence)
if (sle && sle->getFieldU32(sfImportSequence) >= importSequence)
{
// make double sure import seq hasn't passed
JLOG(ctx_.journal.warn()) << "Import: ImportSequence passed";
@@ -1366,24 +1335,8 @@ Import::doApply()
}
}
if (!ticket.has_value())
sle->setFieldU32(sfImportSequence, importSequence);
sle->setFieldU32(sfImportSequence, importSequence);
sle->setFieldAmount(sfBalance, finalBal);
if (ticket.has_value())
{
auto sleTicket = view().peek(keylet::hookState(id, *ticket, shadowTicketNamespace));
if (!sleTicket)
return tefINTERNAL;
TER result = hook::setHookState(ctx_, id, shadowTicketNamespace, *ticket, {});
if (result != tesSUCCESS)
return result;
// RHUPTO: ticketseq billing?
}
if (create)
{

View File

@@ -491,8 +491,7 @@ SetHook::validateHookSetEntry(SetHookCtx& ctx, STObject const& hookSetObj)
logger,
hsacc,
(ctx.rules.enabled(featureHooksUpdate1) ? 1 : 0) +
(ctx.rules.enabled(fix20250131) ? 2 : 0) +
(ctx.rules.enabled(featureExport) ? 4 : 0));
(ctx.rules.enabled(fix20250131) ? 2 : 0));
if (ctx.j.trace())
{

View File

@@ -374,8 +374,6 @@ invoke_calculateBaseFee(ReadView const& view, STTx const& tx)
case ttUNL_MODIFY:
case ttUNL_REPORT:
case ttEMIT_FAILURE:
case ttEXPORT_SIGN:
case ttEXPORT:
return Change::calculateBaseFee(view, tx);
case ttNFTOKEN_MINT:
return NFTokenMint::calculateBaseFee(view, tx);
@@ -546,8 +544,6 @@ invoke_apply(ApplyContext& ctx)
case ttFEE:
case ttUNL_MODIFY:
case ttUNL_REPORT:
case ttEXPORT:
case ttEXPORT_SIGN:
case ttEMIT_FAILURE: {
Change p(ctx);
return p();

View File

@@ -22,7 +22,6 @@
#include <ripple/core/JobQueue.h>
#include <ripple/net/InfoSub.h>
#include <boost/asio/io_service.hpp>
namespace ripple {
@@ -39,11 +38,9 @@ protected:
explicit RPCSub(InfoSub::Source& source);
};
// VFALCO Why is the io_service needed?
std::shared_ptr<RPCSub>
make_RPCSub(
InfoSub::Source& source,
boost::asio::io_service& io_service,
JobQueue& jobQueue,
std::string const& strUrl,
std::string const& strUsername,

View File

@@ -451,6 +451,12 @@ public:
if (mShutdown)
{
JLOG(j_.trace()) << "Complete.";
mResponse.commit(bytes_transferred);
std::string strBody{
{std::istreambuf_iterator<char>(&mResponse)},
std::istreambuf_iterator<char>()};
invokeComplete(ecResult, mStatus, mBody + strBody);
}
else
{

View File

@@ -1805,6 +1805,7 @@ rpcClient(
}
{
//@@start blocking-request
boost::asio::io_service isService;
RPCCall::fromNetwork(
isService,
@@ -1828,6 +1829,7 @@ rpcClient(
headers);
isService.run(); // This blocks until there are no more
// outstanding async calls.
//@@end blocking-request
}
if (jvOutput.isMember("result"))
{
@@ -1946,6 +1948,7 @@ fromNetwork(
// HTTP call?
auto constexpr RPC_NOTIFY = 30s;
//@@start async-request
HTTPClient::request(
bSSL,
io_service,
@@ -1970,6 +1973,7 @@ fromNetwork(
std::placeholders::_3,
j),
j);
//@@end async-request
}
} // namespace RPCCall

View File

@@ -33,14 +33,12 @@ class RPCSubImp : public RPCSub
public:
RPCSubImp(
InfoSub::Source& source,
boost::asio::io_service& io_service,
JobQueue& jobQueue,
std::string const& strUrl,
std::string const& strUsername,
std::string const& strPassword,
Logs& logs)
: RPCSub(source)
, m_io_service(io_service)
, m_jobQueue(jobQueue)
, mUrl(strUrl)
, mSSL(false)
@@ -78,14 +76,14 @@ public:
{
std::lock_guard sl(mLock);
// Wietse: we're not going to limit this, this is admin-port only, scale
// accordingly Dropping events just like this results in inconsistent
// data on the receiving end if (mDeque.size() >= eventQueueMax)
// {
// // Drop the previous event.
// JLOG(j_.warn()) << "RPCCall::fromNetwork drop";
// mDeque.pop_back();
// }
if (mDeque.size() >= maxQueueSize)
{
JLOG(j_.warn())
<< "RPCCall::fromNetwork drop: queue full (" << mDeque.size()
<< "), seq=" << mSeq << ", endpoint=" << mIp;
++mSeq;
return;
}
auto jm = broadcast ? j_.debug() : j_.info();
JLOG(jm) << "RPCCall::fromNetwork push: " << jvObj;
@@ -121,48 +119,49 @@ public:
}
private:
// XXX Could probably create a bunch of send jobs in a single get of the
// lock.
// Maximum concurrent HTTP deliveries per batch. Bounds file
// descriptor usage while still allowing parallel delivery to
// capable endpoints. With a 1024 FD process limit shared across
// peers, clients, and the node store, 32 per subscriber is a
// meaningful but survivable chunk even with multiple subscribers.
static constexpr int maxInFlight = 32;
// Maximum queued events before dropping. At ~5-10KB per event
// this is ~80-160MB worst case — trivial memory-wise. The real
// purpose is detecting a hopelessly behind endpoint: at 100+
// events per ledger (every ~4s), 16384 events is ~10 minutes
// of buffer. Consumers detect gaps via the seq field.
static constexpr std::size_t maxQueueSize = 16384;
void
sendThread()
{
Json::Value jvEvent;
bool bSend;
do
{
// Local io_service per batch — cheap to create (just an
// internal event queue, no threads, no syscalls). Using a
// local io_service is what makes .run() block until exactly
// this batch completes, giving us flow control. Same
// pattern used by rpcClient() in RPCCall.cpp for CLI
// commands.
boost::asio::io_service io_service;
int dispatched = 0;
{
// Obtain the lock to manipulate the queue and change sending.
std::lock_guard sl(mLock);
if (mDeque.empty())
{
mSending = false;
bSend = false;
}
else
while (!mDeque.empty() && dispatched < maxInFlight)
{
auto const [seq, env] = mDeque.front();
mDeque.pop_front();
jvEvent = env;
Json::Value jvEvent = env;
jvEvent["seq"] = seq;
bSend = true;
}
}
// Send outside of the lock.
if (bSend)
{
// XXX Might not need this in a try.
try
{
JLOG(j_.info()) << "RPCCall::fromNetwork: " << mIp;
RPCCall::fromNetwork(
m_io_service,
io_service,
mIp,
mPort,
mUsername,
@@ -173,21 +172,38 @@ private:
mSSL,
true,
logs_);
++dispatched;
}
if (dispatched == 0)
mSending = false;
}
bSend = dispatched > 0;
if (bSend)
{
try
{
JLOG(j_.info())
<< "RPCCall::fromNetwork: " << mIp << " dispatching "
<< dispatched << " events";
io_service.run();
}
catch (const std::exception& e)
{
JLOG(j_.info())
JLOG(j_.warn())
<< "RPCCall::fromNetwork exception: " << e.what();
}
catch (...)
{
JLOG(j_.warn()) << "RPCCall::fromNetwork unknown exception";
}
}
} while (bSend);
}
private:
// Wietse: we're not going to limit this, this is admin-port only, scale
// accordingly enum { eventQueueMax = 32 };
boost::asio::io_service& m_io_service;
JobQueue& m_jobQueue;
std::string mUrl;
@@ -217,7 +233,6 @@ RPCSub::RPCSub(InfoSub::Source& source) : InfoSub(source, Consumer())
std::shared_ptr<RPCSub>
make_RPCSub(
InfoSub::Source& source,
boost::asio::io_service& io_service,
JobQueue& jobQueue,
std::string const& strUrl,
std::string const& strUsername,
@@ -226,7 +241,6 @@ make_RPCSub(
{
return std::make_shared<RPCSubImp>(
std::ref(source),
std::ref(io_service),
std::ref(jobQueue),
strUrl,
strUsername,

View File

@@ -484,44 +484,61 @@ OverlayImpl::start()
m_peerFinder->setConfig(config);
m_peerFinder->start();
auto addIps = [&](std::vector<std::string> bootstrapIps) -> void {
auto addIps = [this](std::vector<std::string> ips, bool fixed) {
beast::Journal const& j = app_.journal("Overlay");
for (auto& ip : bootstrapIps)
for (auto& ip : ips)
{
std::size_t pos = ip.find('#');
if (pos != std::string::npos)
ip.erase(pos);
JLOG(j.trace()) << "Found boostrap IP: " << ip;
JLOG(j.trace())
<< "Found " << (fixed ? "fixed" : "bootstrap") << " IP: " << ip;
}
m_resolver.resolve(
bootstrapIps,
[&](std::string const& name,
ips,
[this, fixed](
std::string const& name,
std::vector<beast::IP::Endpoint> const& addresses) {
std::vector<std::string> ips;
ips.reserve(addresses.size());
beast::Journal const& j = app_.journal("Overlay");
std::string const base("config: ");
std::vector<beast::IP::Endpoint> eps;
eps.reserve(addresses.size());
for (auto const& addr : addresses)
{
std::string addrStr = addr.port() == 0
? to_string(addr.at_port(DEFAULT_PEER_PORT))
: to_string(addr);
JLOG(j.trace()) << "Parsed boostrap IP: " << addrStr;
ips.push_back(addrStr);
auto ep = addr.port() == 0 ? addr.at_port(DEFAULT_PEER_PORT)
: addr;
JLOG(j.trace())
<< "Parsed " << (fixed ? "fixed" : "bootstrap")
<< " IP: " << ep;
eps.push_back(ep);
}
std::string const base("config: ");
if (!ips.empty())
m_peerFinder->addFallbackStrings(base + name, ips);
if (eps.empty())
return;
if (fixed)
{
m_peerFinder->addFixedPeer(base + name, eps);
}
else
{
std::vector<std::string> strs;
strs.reserve(eps.size());
for (auto const& ep : eps)
strs.push_back(to_string(ep));
m_peerFinder->addFallbackStrings(base + name, strs);
}
});
};
if (!app_.config().IPS.empty())
addIps(app_.config().IPS);
addIps(app_.config().IPS, false);
if (!app_.config().IPS_FIXED.empty())
addIps(app_.config().IPS_FIXED);
addIps(app_.config().IPS_FIXED, true);
auto const timer = std::make_shared<Timer>(*this);
std::lock_guard lock(mutex_);

View File

@@ -74,7 +74,7 @@ namespace detail {
// Feature.cpp. Because it's only used to reserve storage, and determine how
// large to make the FeatureBitset, it MAY be larger. It MUST NOT be less than
// the actual number of amendments. A LogicError on startup will verify this.
static constexpr std::size_t numFeatures = 91;
static constexpr std::size_t numFeatures = 90;
/** Amendments that this server supports and the default voting behavior.
Whether they are enabled depends on the Rules defined in the validated
@@ -378,7 +378,6 @@ extern uint256 const fixInvalidTxFlags;
extern uint256 const featureExtendedHookState;
extern uint256 const fixCronStacking;
extern uint256 const fixHookAPI20251128;
extern uint256 const featureExport;
} // namespace ripple
#endif

View File

@@ -56,15 +56,9 @@ namespace keylet {
Keylet const&
emittedDir() noexcept;
Keylet const&
exportedDir() noexcept;
Keylet
emittedTxn(uint256 const& id) noexcept;
Keylet
exportedTxn(uint256 const& id) noexcept;
Keylet
hookDefinition(uint256 const& hash) noexcept;

View File

@@ -260,8 +260,6 @@ enum LedgerEntryType : std::uint16_t
\sa keylet::emitted
*/
ltEMITTED_TXN = 'E',
ltEXPORTED_TXN = 0x4578, // Ex (exported transaction)
};
// clang-format off
@@ -320,8 +318,7 @@ enum LedgerSpecificFlags {
// ltDIR_NODE
lsfNFTokenBuyOffers = 0x00000001,
lsfNFTokenSellOffers = 0x00000002,
lsfEmittedDir = 0x00000004,
lsfExportedDir = 0x00000008,
lsfEmittedDir = 0x00000004,
// ltNFTOKEN_OFFER
lsfSellNFToken = 0x00000001,

View File

@@ -355,7 +355,6 @@ extern SF_UINT16 const sfHookEmitCount;
extern SF_UINT16 const sfHookExecutionIndex;
extern SF_UINT16 const sfHookApiVersion;
extern SF_UINT16 const sfHookStateScale;
extern SF_UINT16 const sfHookExportCount;
// 32-bit integers (common)
extern SF_UINT32 const sfNetworkID;
@@ -596,7 +595,6 @@ extern SField const sfSigner;
extern SField const sfMajority;
extern SField const sfDisabledValidator;
extern SField const sfEmittedTxn;
extern SField const sfExportedTxn;
extern SField const sfHookExecution;
extern SField const sfHookDefinition;
extern SField const sfHookParameter;

View File

@@ -67,7 +67,6 @@ enum TELcodes : TERUnderlyingType {
telNON_LOCAL_EMITTED_TXN,
telIMPORT_VL_KEY_NOT_RECOGNISED,
telCAN_NOT_QUEUE_IMPORT,
telSHADOW_TICKET_REQUIRED,
};
//------------------------------------------------------------------------------

View File

@@ -149,12 +149,6 @@ enum TxType : std::uint16_t
ttURITOKEN_CREATE_SELL_OFFER = 48,
ttURITOKEN_CANCEL_SELL_OFFER = 49,
/* A pseudo-txn containing an exported transaction plus signatures from the validators */
ttEXPORT = 90,
/* A pseudo-txn containing a validator's signature for an export transaction */
ttEXPORT_SIGN = 91,
/* A pseudo-txn alarm signal for invoking a hook, emitted by validators after alarm set conditions are met */
ttCRON = 92,

View File

@@ -484,7 +484,6 @@ REGISTER_FIX (fixInvalidTxFlags, Supported::yes, VoteBehavior::De
REGISTER_FEATURE(ExtendedHookState, Supported::yes, VoteBehavior::DefaultNo);
REGISTER_FIX (fixCronStacking, Supported::yes, VoteBehavior::DefaultYes);
REGISTER_FIX (fixHookAPI20251128, Supported::yes, VoteBehavior::DefaultYes);
REGISTER_FEATURE(Export, Supported::yes, VoteBehavior::DefaultNo);
// The following amendments are obsolete, but must remain supported
// because they could potentially get enabled.

View File

@@ -66,8 +66,6 @@ enum class LedgerNameSpace : std::uint16_t {
HOOK_DEFINITION = 'D',
EMITTED_TXN = 'E',
EMITTED_DIR = 'F',
EXPORTED_TXN = 0x4578, // Ex
EXPORTED_DIR = 0x4564, // Ed
NFTOKEN_OFFER = 'q',
NFTOKEN_BUY_OFFERS = 'h',
NFTOKEN_SELL_OFFERS = 'i',
@@ -149,14 +147,6 @@ emittedDir() noexcept
return ret;
}
Keylet const&
exportedDir() noexcept
{
static Keylet const ret{
ltDIR_NODE, indexHash(LedgerNameSpace::EXPORTED_DIR)};
return ret;
}
Keylet
hookStateDir(AccountID const& id, uint256 const& ns) noexcept
{
@@ -169,12 +159,6 @@ emittedTxn(uint256 const& id) noexcept
return {ltEMITTED_TXN, indexHash(LedgerNameSpace::EMITTED_TXN, id)};
}
Keylet
exportedTxn(uint256 const& id) noexcept
{
return {ltEXPORTED_TXN, indexHash(LedgerNameSpace::EXPORTED_TXN, id)};
}
Keylet
hook(AccountID const& id) noexcept
{

View File

@@ -380,15 +380,6 @@ LedgerFormats::LedgerFormats()
{sfPreviousTxnLgrSeq, soeREQUIRED}
},
commonFields);
add(jss::ExportedTxn,
ltEXPORTED_TXN,
{
{sfExportedTxn, soeOPTIONAL},
{sfOwnerNode, soeREQUIRED},
{sfLedgerSequence, soeREQUIRED},
},
commonFields);
// clang-format on
}

View File

@@ -103,7 +103,6 @@ CONSTRUCT_TYPED_SFIELD(sfHookEmitCount, "HookEmitCount", UINT16,
CONSTRUCT_TYPED_SFIELD(sfHookExecutionIndex, "HookExecutionIndex", UINT16, 19);
CONSTRUCT_TYPED_SFIELD(sfHookApiVersion, "HookApiVersion", UINT16, 20);
CONSTRUCT_TYPED_SFIELD(sfHookStateScale, "HookStateScale", UINT16, 21);
CONSTRUCT_TYPED_SFIELD(sfHookExportCount, "HookExportCount", UINT16, 22);
// 32-bit integers (common)
CONSTRUCT_TYPED_SFIELD(sfNetworkID, "NetworkID", UINT32, 1);
@@ -362,7 +361,6 @@ CONSTRUCT_UNTYPED_SFIELD(sfImportVLKey, "ImportVLKey", OBJECT,
CONSTRUCT_UNTYPED_SFIELD(sfHookEmission, "HookEmission", OBJECT, 93);
CONSTRUCT_UNTYPED_SFIELD(sfMintURIToken, "MintURIToken", OBJECT, 92);
CONSTRUCT_UNTYPED_SFIELD(sfAmountEntry, "AmountEntry", OBJECT, 91);
CONSTRUCT_UNTYPED_SFIELD(sfExportedTxn, "ExportedTxn", OBJECT, 90);
// array of objects
// ARRAY/1 is reserved for end of array

View File

@@ -141,7 +141,6 @@ transResults()
MAKE_ERROR(telNON_LOCAL_EMITTED_TXN, "Emitted transaction cannot be applied because it was not generated locally."),
MAKE_ERROR(telIMPORT_VL_KEY_NOT_RECOGNISED, "Import vl key was not recognized."),
MAKE_ERROR(telCAN_NOT_QUEUE_IMPORT, "Import transaction was not able to be directly applied and cannot be queued."),
MAKE_ERROR(telSHADOW_TICKET_REQUIRED, "The imported transaction uses a TicketSequence but no shadow ticket exists."),
MAKE_ERROR(temMALFORMED, "Malformed transaction."),
MAKE_ERROR(temBAD_AMOUNT, "Can only send positive amounts."),
MAKE_ERROR(temBAD_CURRENCY, "Malformed: Bad currency."),

View File

@@ -490,26 +490,6 @@ TxFormats::TxFormats()
{sfStartTime, soeOPTIONAL},
},
commonFields);
add(jss::ExportSign,
ttEXPORT_SIGN,
{
{sfSigner, soeREQUIRED},
{sfLedgerSequence, soeREQUIRED},
{sfTransactionHash, soeREQUIRED},
},
commonFields);
add(jss::Export,
ttEXPORT,
{
{sfTransactionHash, soeREQUIRED},
{sfExportedTxn, soeREQUIRED},
{sfSigners, soeREQUIRED},
{sfLedgerSequence, soeREQUIRED},
},
commonFields);
}
TxFormats const&

View File

@@ -140,9 +140,6 @@ JSS(HookState); // ledger type.
JSS(HookStateData); // field.
JSS(HookStateKey); // field.
JSS(EmittedTxn); // ledger type.
JSS(ExportedTxn);
JSS(Export);
JSS(ExportSign);
JSS(SignerList); // ledger type.
JSS(SignerListSet); // transaction type.
JSS(SigningPubKey); // field.

View File

@@ -76,7 +76,6 @@ doSubscribe(RPC::JsonContext& context)
{
auto rspSub = make_RPCSub(
context.app.getOPs(),
context.app.getIOService(),
context.app.getJobQueue(),
strUrl,
strUsername,

View File

@@ -0,0 +1,819 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2024 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/basics/ByteUtilities.h>
#include <ripple/net/HTTPClient.h>

#include <test/jtx.h>

#include <boost/asio.hpp>
#include <boost/asio/ip/tcp.hpp>

#include <atomic>
#include <chrono>
#include <memory>
#include <string>
#include <thread>
#include <vector>
namespace ripple {
namespace test {
// Minimal TCP server for testing HTTPClient behavior.
// Accepts connections and sends configurable HTTP responses.
class MockHTTPServer
{
boost::asio::io_service ios_;
std::unique_ptr<boost::asio::io_service::work> work_;
boost::asio::ip::tcp::acceptor acceptor_;
std::thread thread_;
std::atomic<bool> running_{true};
unsigned short port_;
// Metrics
std::atomic<int> activeConnections_{0};
std::atomic<int> peakConnections_{0};
std::atomic<int> totalAccepted_{0};
// Configurable behavior
std::atomic<int> statusCode_{200};
std::atomic<int> delayMs_{0};
std::atomic<bool> sendResponse_{true};
std::atomic<bool> closeImmediately_{false};
std::atomic<bool> noContentLength_{false};
public:
MockHTTPServer()
: work_(std::make_unique<boost::asio::io_service::work>(ios_))
, acceptor_(
ios_,
boost::asio::ip::tcp::endpoint(
boost::asio::ip::address::from_string("127.0.0.1"),
0))
{
port_ = acceptor_.local_endpoint().port();
accept();
thread_ = std::thread([this] { ios_.run(); });
}
~MockHTTPServer()
{
running_ = false;
work_.reset(); // Allow io_service to stop.
boost::system::error_code ec;
acceptor_.close(ec);
ios_.stop();
if (thread_.joinable())
thread_.join();
}
unsigned short
port() const
{
return port_;
}
int
activeConnectionCount() const
{
return activeConnections_;
}
int
peakConnectionCount() const
{
return peakConnections_;
}
int
totalAcceptedCount() const
{
return totalAccepted_;
}
void
setStatus(int code)
{
statusCode_ = code;
}
void
setDelay(int ms)
{
delayMs_ = ms;
}
void
setSendResponse(bool send)
{
sendResponse_ = send;
}
void
setCloseImmediately(bool close)
{
closeImmediately_ = close;
}
void
setNoContentLength(bool noContentLength)
{
noContentLength_ = noContentLength;
}
private:
void
accept()
{
auto sock = std::make_shared<boost::asio::ip::tcp::socket>(ios_);
acceptor_.async_accept(*sock, [this, sock](auto ec) {
if (!ec && running_)
{
++totalAccepted_;
int current = ++activeConnections_;
int prev = peakConnections_.load();
while (current > prev &&
!peakConnections_.compare_exchange_weak(prev, current))
;
handleConnection(sock);
accept();
}
});
}
void
handleConnection(std::shared_ptr<boost::asio::ip::tcp::socket> sock)
{
if (closeImmediately_)
{
boost::system::error_code ec;
sock->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
sock->close(ec);
--activeConnections_;
return;
}
auto buf = std::make_shared<boost::asio::streambuf>();
boost::asio::async_read_until(
*sock, *buf, "\r\n\r\n", [this, sock, buf](auto ec, size_t) {
if (ec)
{
--activeConnections_;
return;
}
if (!sendResponse_)
{
// Hold connection open without responding.
// The socket shared_ptr prevents cleanup.
// This simulates a server that accepts but
// never responds (e.g., overloaded).
return;
}
auto delay = delayMs_.load();
if (delay > 0)
{
auto timer =
std::make_shared<boost::asio::steady_timer>(ios_);
timer->expires_from_now(std::chrono::milliseconds(delay));
timer->async_wait(
[this, sock, timer](auto) { sendHTTPResponse(sock); });
}
else
{
sendHTTPResponse(sock);
}
});
}
void
sendHTTPResponse(std::shared_ptr<boost::asio::ip::tcp::socket> sock)
{
auto body = std::string("{}");
std::string header =
"HTTP/1.0 " + std::to_string(statusCode_.load()) + " OK\r\n";
if (!noContentLength_)
header += "Content-Length: " + std::to_string(body.size()) + "\r\n";
header += "\r\n";
auto response = std::make_shared<std::string>(header + body);
boost::asio::async_write(
*sock,
boost::asio::buffer(*response),
[this, sock, response](auto, size_t) {
boost::system::error_code ec;
sock->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
sock->close(ec);
--activeConnections_;
});
}
};
//------------------------------------------------------------------------------
// Exercises HTTPClient resource cleanup (socket/FD release, completion
// callback delivery) against a local MockHTTPServer across success,
// error, timeout, connection-refused, concurrent, and EOF scenarios.
class HTTPClient_test : public beast::unit_test::suite
{
    // Helper: fire an HTTP POST and track completion via atomic counter.
    // fromNetwork-style usage: the request is async; the caller drives
    // the supplied io_service to completion.
    void
    fireRequest(
        boost::asio::io_service& ios,
        std::string const& host,
        unsigned short port,
        std::atomic<int>& completed,
        beast::Journal& j,
        std::chrono::seconds timeout = std::chrono::seconds{5})
    {
        HTTPClient::request(
            false,  // no SSL
            ios,
            host,
            port,
            [](boost::asio::streambuf& sb, std::string const& strHost) {
                std::ostream os(&sb);
                os << "POST / HTTP/1.0\r\n"
                   << "Host: " << strHost << "\r\n"
                   << "Content-Type: application/json\r\n"
                   << "Content-Length: 2\r\n"
                   << "\r\n"
                   << "{}";
            },
            megabytes(1),
            timeout,
            [&completed](
                const boost::system::error_code&, int, std::string const&) {
                ++completed;
                return false;
            },
            j);
    }

    //--------------------------------------------------------------------------

    void
    testCleanupAfterSuccess()
    {
        testcase("Socket cleanup after successful response");
        // After a successful HTTP request completes, the
        // HTTPClientImp should be destroyed and its socket
        // closed promptly — not held until the deadline fires.
        using namespace jtx;
        Env env{*this};
        MockHTTPServer server;
        server.setStatus(200);
        std::atomic<int> completed{0};
        auto j = env.app().journal("HTTPClient");
        {
            boost::asio::io_service ios;
            fireRequest(ios, "127.0.0.1", server.port(), completed, j);
            ios.run();
        }
        BEAST_EXPECT(completed == 1);
        BEAST_EXPECT(server.totalAcceptedCount() == 1);
        // The server decrements its active-connection counter on its
        // own io thread; give that handler a moment to run before
        // asserting, as the other tests in this suite do.
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        // After io_service.run() returns, the server should
        // see zero active connections — socket was released.
        BEAST_EXPECT(server.activeConnectionCount() == 0);
    }

    void
    testCleanupAfter500()
    {
        testcase("Socket cleanup after HTTP 500");
        using namespace jtx;
        Env env{*this};
        MockHTTPServer server;
        server.setStatus(500);
        std::atomic<int> completed{0};
        auto j = env.app().journal("HTTPClient");
        {
            boost::asio::io_service ios;
            fireRequest(ios, "127.0.0.1", server.port(), completed, j);
            ios.run();
        }
        BEAST_EXPECT(completed == 1);
        // Let the server-side close handler run before checking.
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        BEAST_EXPECT(server.activeConnectionCount() == 0);
    }

    void
    testCleanupAfterConnectionRefused()
    {
        testcase("Socket cleanup after connection refused");
        using namespace jtx;
        Env env{*this};
        // Bind a port, then close it — guarantees nothing is listening.
        boost::asio::io_service tmp;
        boost::asio::ip::tcp::acceptor acc(
            tmp,
            boost::asio::ip::tcp::endpoint(
                boost::asio::ip::address::from_string("127.0.0.1"), 0));
        auto port = acc.local_endpoint().port();
        acc.close();
        std::atomic<int> completed{0};
        auto j = env.app().journal("HTTPClient");
        {
            boost::asio::io_service ios;
            fireRequest(ios, "127.0.0.1", port, completed, j);
            ios.run();
        }
        // Callback should still be invoked (with error).
        BEAST_EXPECT(completed == 1);
    }

    void
    testCleanupAfterTimeout()
    {
        testcase("Socket cleanup after timeout");
        // Server accepts but never responds. HTTPClient should
        // time out, clean up, and invoke the callback.
        using namespace jtx;
        Env env{*this};
        MockHTTPServer server;
        server.setSendResponse(false);  // accept, read, but never respond
        std::atomic<int> completed{0};
        auto j = env.app().journal("HTTPClient");
        {
            boost::asio::io_service ios;
            // Short timeout to keep the test fast.
            fireRequest(
                ios,
                "127.0.0.1",
                server.port(),
                completed,
                j,
                std::chrono::seconds{2});
            ios.run();
        }
        // Callback must be invoked even on timeout.
        BEAST_EXPECT(completed == 1);
    }

    void
    testCleanupAfterServerCloseBeforeResponse()
    {
        testcase("Socket cleanup after server closes before response");
        // Server accepts the connection then immediately closes
        // it without sending anything.
        using namespace jtx;
        Env env{*this};
        MockHTTPServer server;
        server.setCloseImmediately(true);
        std::atomic<int> completed{0};
        auto j = env.app().journal("HTTPClient");
        {
            boost::asio::io_service ios;
            fireRequest(ios, "127.0.0.1", server.port(), completed, j);
            ios.run();
        }
        BEAST_EXPECT(completed == 1);
        // Let the server-side close handler run before checking.
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        BEAST_EXPECT(server.activeConnectionCount() == 0);
    }

    void
    testEOFCompletionCallsCallback()
    {
        testcase("EOF completion invokes callback (handleData bug)");
        // HTTPClientImp::handleData has a code path where
        // mShutdown == eof results in logging "Complete." but
        // never calling invokeComplete(). This means:
        //   - The completion callback is never invoked
        //   - The deadline timer is never cancelled
        //   - The socket is held open until the 30s deadline
        //
        // This test verifies the callback IS invoked after an
        // EOF response. If this test fails (completed == 0 after
        // ios.run()), the handleData EOF bug is confirmed.
        using namespace jtx;
        Env env{*this};
        MockHTTPServer server;
        server.setStatus(200);
        std::atomic<int> completed{0};
        auto j = env.app().journal("HTTPClient");
        {
            boost::asio::io_service ios;
            fireRequest(
                ios,
                "127.0.0.1",
                server.port(),
                completed,
                j,
                std::chrono::seconds{3});
            ios.run();
        }
        // If handleData EOF path doesn't call invokeComplete,
        // the callback won't fire until the deadline (3s) expires,
        // and even then handleDeadline doesn't invoke mComplete.
        // The io_service.run() will still return (deadline fires,
        // handleShutdown runs, all handlers done), but completed
        // will be 0.
        if (completed != 1)
        {
            log << "  BUG CONFIRMED: handleData EOF path does not"
                << " call invokeComplete(). Callback was not invoked."
                << " Socket held open until deadline." << std::endl;
        }
        BEAST_EXPECT(completed == 1);
    }

    void
    testConcurrentRequestCleanup()
    {
        testcase("Concurrent requests all clean up");
        // Fire N requests at once on the same io_service.
        // All should complete and release their sockets.
        using namespace jtx;
        Env env{*this};
        MockHTTPServer server;
        server.setStatus(200);
        static constexpr int N = 50;
        std::atomic<int> completed{0};
        auto j = env.app().journal("HTTPClient");
        {
            boost::asio::io_service ios;
            for (int i = 0; i < N; ++i)
            {
                fireRequest(ios, "127.0.0.1", server.port(), completed, j);
            }
            ios.run();
        }
        BEAST_EXPECT(completed == N);
        // Brief sleep to let server-side shutdown complete.
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        BEAST_EXPECT(server.activeConnectionCount() == 0);
        log << "  Completed: " << completed
            << ", Peak concurrent: " << server.peakConnectionCount()
            << ", Active after: " << server.activeConnectionCount()
            << std::endl;
    }

    void
    testConcurrent500Cleanup()
    {
        testcase("Concurrent 500 requests all clean up");
        // Fire N requests that all get 500 responses. Verify
        // all sockets are released and no FDs leak.
        using namespace jtx;
        Env env{*this};
        MockHTTPServer server;
        server.setStatus(500);
        static constexpr int N = 50;
        std::atomic<int> completed{0};
        auto j = env.app().journal("HTTPClient");
        {
            boost::asio::io_service ios;
            for (int i = 0; i < N; ++i)
            {
                fireRequest(ios, "127.0.0.1", server.port(), completed, j);
            }
            ios.run();
        }
        BEAST_EXPECT(completed == N);
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        BEAST_EXPECT(server.activeConnectionCount() == 0);
    }

    void
    testEOFWithoutContentLength()
    {
        testcase("EOF without Content-Length (handleData EOF path)");
        // When a server sends a response WITHOUT Content-Length,
        // HTTPClientImp reads up to maxResponseSize. The server
        // closes the connection, causing EOF in handleData.
        //
        // In handleData, the EOF path (mShutdown == eof) logs
        // "Complete." but does NOT call invokeComplete(). This
        // means:
        //   - mComplete (callback) is never invoked
        //   - deadline timer is never cancelled
        //   - socket + object held alive until deadline fires
        //
        // This test uses a SHORT deadline to keep it fast. If
        // the callback IS invoked, ios.run() returns quickly.
        // If NOT, ios.run() blocks until the deadline (2s).
        using namespace jtx;
        Env env{*this};
        MockHTTPServer server;
        server.setStatus(200);
        server.setNoContentLength(true);
        std::atomic<int> completed{0};
        auto j = env.app().journal("HTTPClient");
        auto start = std::chrono::steady_clock::now();
        {
            boost::asio::io_service ios;
            fireRequest(
                ios,
                "127.0.0.1",
                server.port(),
                completed,
                j,
                std::chrono::seconds{2});
            ios.run();
        }
        auto elapsed = std::chrono::steady_clock::now() - start;
        auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed)
                      .count();
        if (completed == 0)
        {
            log << "  BUG CONFIRMED: handleData EOF path does not"
                << " call invokeComplete(). Callback never invoked."
                << " io_service.run() blocked for " << ms << "ms"
                << " (deadline timeout)." << std::endl;
        }
        else
        {
            log << "  Callback invoked in " << ms << "ms." << std::endl;
        }
        // This WILL fail if the EOF bug exists — the callback
        // is only invoked via the deadline timeout path, which
        // does NOT call mComplete.
        BEAST_EXPECT(completed == 1);
    }

    void
    testPersistentIOServiceCleanup()
    {
        testcase("Cleanup on persistent io_service (no destructor mask)");
        // Previous tests destroy the io_service after run(),
        // which releases all pending handlers' shared_ptrs.
        // This masks leaks. Here we use a PERSISTENT io_service
        // (with work guard, running on its own thread) and check
        // that HTTPClientImp objects are destroyed WITHOUT relying
        // on io_service destruction.
        //
        // We track the object's lifetime via the completion
        // callback — if it fires, the async chain completed
        // normally. If it doesn't fire within a reasonable time
        // but the io_service is still running, something is stuck.
        using namespace jtx;
        Env env{*this};
        MockHTTPServer server;
        server.setStatus(200);
        std::atomic<int> completed{0};
        auto j = env.app().journal("HTTPClient");
        // Persistent io_service — stays alive the whole test.
        boost::asio::io_service ios;
        auto work = std::make_unique<boost::asio::io_service::work>(ios);
        std::thread runner([&ios] { ios.run(); });
        // Fire request on the persistent io_service.
        HTTPClient::request(
            false,
            ios,
            "127.0.0.1",
            server.port(),
            [](boost::asio::streambuf& sb, std::string const& strHost) {
                std::ostream os(&sb);
                os << "POST / HTTP/1.0\r\n"
                   << "Host: " << strHost << "\r\n"
                   << "Content-Type: application/json\r\n"
                   << "Content-Length: 2\r\n"
                   << "\r\n"
                   << "{}";
            },
            megabytes(1),
            std::chrono::seconds{5},
            [&completed](
                const boost::system::error_code&, int, std::string const&) {
                ++completed;
                return false;
            },
            j);
        // Wait for completion without destroying io_service.
        auto deadline =
            std::chrono::steady_clock::now() + std::chrono::seconds{5};
        while (completed == 0 && std::chrono::steady_clock::now() < deadline)
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
        }
        BEAST_EXPECT(completed == 1);
        // Give server-side shutdown a moment.
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        BEAST_EXPECT(server.activeConnectionCount() == 0);
        if (server.activeConnectionCount() != 0)
        {
            log << "  BUG: Socket still open on persistent"
                << " io_service. FD leaked." << std::endl;
        }
        // Clean shutdown.
        work.reset();
        ios.stop();
        runner.join();
    }

    void
    testPersistentIOService500Cleanup()
    {
        testcase("500 cleanup on persistent io_service");
        using namespace jtx;
        Env env{*this};
        MockHTTPServer server;
        server.setStatus(500);
        static constexpr int N = 20;
        std::atomic<int> completed{0};
        auto j = env.app().journal("HTTPClient");
        boost::asio::io_service ios;
        auto work = std::make_unique<boost::asio::io_service::work>(ios);
        std::thread runner([&ios] { ios.run(); });
        for (int i = 0; i < N; ++i)
        {
            HTTPClient::request(
                false,
                ios,
                "127.0.0.1",
                server.port(),
                [](boost::asio::streambuf& sb, std::string const& strHost) {
                    std::ostream os(&sb);
                    os << "POST / HTTP/1.0\r\n"
                       << "Host: " << strHost << "\r\n"
                       << "Content-Type: application/json\r\n"
                       << "Content-Length: 2\r\n"
                       << "\r\n"
                       << "{}";
                },
                megabytes(1),
                std::chrono::seconds{5},
                [&completed](
                    const boost::system::error_code&, int, std::string const&) {
                    ++completed;
                    return false;
                },
                j);
        }
        auto deadline =
            std::chrono::steady_clock::now() + std::chrono::seconds{10};
        while (completed < N && std::chrono::steady_clock::now() < deadline)
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
        }
        BEAST_EXPECT(completed == N);
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        BEAST_EXPECT(server.activeConnectionCount() == 0);
        log << "  Completed: " << completed << "/" << N
            << ", Active connections after: " << server.activeConnectionCount()
            << std::endl;
        work.reset();
        ios.stop();
        runner.join();
    }

    void
    testGetSelfReferenceCleanup()
    {
        testcase("get() shared_from_this cycle releases");
        // HTTPClientImp::get() binds shared_from_this() into
        // mBuild via makeGet. This creates a reference cycle:
        //   object -> mBuild -> shared_ptr<object>
        // The object can only be destroyed if mBuild is cleared.
        // Since mBuild is never explicitly cleared, this may be
        // a permanent FD leak.
        //
        // This test fires a GET request and checks whether the
        // HTTPClientImp is destroyed (and socket closed) after
        // completion.
        using namespace jtx;
        Env env{*this};
        MockHTTPServer server;
        server.setStatus(200);
        std::atomic<int> completed{0};
        auto j = env.app().journal("HTTPClient");
        {
            boost::asio::io_service ios;
            HTTPClient::get(
                false,  // no SSL
                ios,
                "127.0.0.1",
                server.port(),
                "/test",
                megabytes(1),
                std::chrono::seconds{5},
                [&completed](
                    const boost::system::error_code&, int, std::string const&) {
                    ++completed;
                    return false;
                },
                j);
            ios.run();
        }
        BEAST_EXPECT(completed == 1);
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        // If the get() self-reference cycle leaks, the server
        // will still show an active connection here (the socket
        // in the leaked HTTPClientImp is never closed).
        if (server.activeConnectionCount() != 0)
        {
            log << "  BUG CONFIRMED: get() self-reference cycle"
                << " prevents HTTPClientImp destruction."
                << " Socket FD leaked." << std::endl;
        }
        BEAST_EXPECT(server.activeConnectionCount() == 0);
    }

public:
    void
    run() override
    {
        testCleanupAfterSuccess();
        testCleanupAfter500();
        testCleanupAfterConnectionRefused();
        testCleanupAfterTimeout();
        testCleanupAfterServerCloseBeforeResponse();
        testEOFCompletionCallsCallback();
        testConcurrentRequestCleanup();
        testConcurrent500Cleanup();
        testEOFWithoutContentLength();
        testPersistentIOServiceCleanup();
        testPersistentIOService500Cleanup();
        testGetSelfReferenceCleanup();
    }
};
BEAST_DEFINE_TESTSUITE(HTTPClient, net, ripple);
} // namespace test
} // namespace ripple