mirror of
https://github.com/Xahau/xahaud.git
synced 2026-04-06 03:42:21 +00:00
Compare commits
9 Commits
export
...
subscripti
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7241fed6d0 | ||
|
|
a6dfb40413 | ||
|
|
3e0d6b9cd2 | ||
|
|
a8388e48a4 | ||
|
|
49908096d5 | ||
|
|
12e1afb694 | ||
|
|
c355ad9971 | ||
|
|
a8d7b2619e | ||
|
|
775fb3a8b2 |
15
.github/actions/xahau-ga-dependencies/action.yml
vendored
15
.github/actions/xahau-ga-dependencies/action.yml
vendored
@@ -134,10 +134,17 @@ runs:
|
||||
- name: Export custom recipes
|
||||
shell: bash
|
||||
run: |
|
||||
conan export external/snappy --version 1.1.10 --user xahaud --channel stable
|
||||
conan export external/soci --version 4.0.3 --user xahaud --channel stable
|
||||
conan export external/wasmedge --version 0.11.2 --user xahaud --channel stable
|
||||
|
||||
# Export snappy if not already exported
|
||||
conan list snappy/1.1.10@xahaud/stable 2>/dev/null | (grep -q "not found" && exit 1 || exit 0) || \
|
||||
conan export external/snappy --version 1.1.10 --user xahaud --channel stable
|
||||
|
||||
# Export soci if not already exported
|
||||
conan list soci/4.0.3@xahaud/stable 2>/dev/null | (grep -q "not found" && exit 1 || exit 0) || \
|
||||
conan export external/soci --version 4.0.3 --user xahaud --channel stable
|
||||
|
||||
# Export wasmedge if not already exported
|
||||
conan list wasmedge/0.11.2@xahaud/stable 2>/dev/null | (grep -q "not found" && exit 1 || exit 0) || \
|
||||
conan export external/wasmedge --version 0.11.2 --user xahaud --channel stable
|
||||
- name: Install dependencies
|
||||
shell: bash
|
||||
env:
|
||||
|
||||
12
.github/workflows/xahau-ga-macos.yml
vendored
12
.github/workflows/xahau-ga-macos.yml
vendored
@@ -43,14 +43,22 @@ jobs:
|
||||
# To isolate environments for each Runner, instead of installing globally with brew,
|
||||
# use mise to isolate environments for each Runner directory.
|
||||
- name: Setup toolchain (mise)
|
||||
uses: jdx/mise-action@v2
|
||||
uses: jdx/mise-action@v3.6.1
|
||||
with:
|
||||
cache: false
|
||||
install: true
|
||||
mise_toml: |
|
||||
[tools]
|
||||
cmake = "3.23.1"
|
||||
python = "3.12"
|
||||
pipx = "latest"
|
||||
conan = "2"
|
||||
ninja = "latest"
|
||||
ccache = "latest"
|
||||
|
||||
- name: Install tools via mise
|
||||
run: |
|
||||
mise install
|
||||
mise use cmake@3.23.1 python@3.12 pipx@latest conan@2 ninja@latest ccache@latest
|
||||
mise reshim
|
||||
echo "$HOME/.local/share/mise/shims" >> "$GITHUB_PATH"
|
||||
|
||||
|
||||
@@ -957,6 +957,7 @@ if (tests)
|
||||
subdir: net
|
||||
#]===============================]
|
||||
src/test/net/DatabaseDownloader_test.cpp
|
||||
src/test/net/HTTPClient_test.cpp
|
||||
#[===============================[
|
||||
test sources:
|
||||
subdir: nodestore
|
||||
|
||||
@@ -149,6 +149,7 @@ test.ledger > ripple.ledger
|
||||
test.ledger > ripple.protocol
|
||||
test.ledger > test.jtx
|
||||
test.ledger > test.toplevel
|
||||
test.net > ripple.basics
|
||||
test.net > ripple.net
|
||||
test.net > test.jtx
|
||||
test.net > test.toplevel
|
||||
|
||||
@@ -350,10 +350,7 @@ enum hook_return_code : int64_t {
|
||||
MEM_OVERLAP = -43, // one or more specified buffers are the same memory
|
||||
TOO_MANY_STATE_MODIFICATIONS = -44, // more than 5000 modified state
|
||||
// entires in the combined hook chains
|
||||
TOO_MANY_NAMESPACES = -45,
|
||||
EXPORT_FAILURE = -46,
|
||||
TOO_MANY_EXPORTED_TXN = -47,
|
||||
|
||||
TOO_MANY_NAMESPACES = -45
|
||||
};
|
||||
|
||||
enum ExitType : uint8_t {
|
||||
@@ -367,7 +364,6 @@ const uint16_t max_state_modifications = 256;
|
||||
const uint8_t max_slots = 255;
|
||||
const uint8_t max_nonce = 255;
|
||||
const uint8_t max_emit = 255;
|
||||
const uint8_t max_export = 4;
|
||||
const uint8_t max_params = 16;
|
||||
const double fee_base_multiplier = 1.1f;
|
||||
|
||||
@@ -473,13 +469,6 @@ static const APIWhitelist import_whitelist_1{
|
||||
// clang-format on
|
||||
};
|
||||
|
||||
static const APIWhitelist import_whitelist_2{
|
||||
// clang-format off
|
||||
HOOK_API_DEFINITION(I64, xport, (I32, I32)),
|
||||
HOOK_API_DEFINITION(I64, xport_reserve, (I32)),
|
||||
// clang-format on
|
||||
};
|
||||
|
||||
#undef HOOK_API_DEFINITION
|
||||
#undef I32
|
||||
#undef I64
|
||||
|
||||
@@ -1034,12 +1034,6 @@ validateGuards(
|
||||
{
|
||||
// PASS, this is a version 1 api
|
||||
}
|
||||
else if (rulesVersion & 0x04U &&
|
||||
hook_api::import_whitelist_2.find(import_name) !=
|
||||
hook_api::import_whitelist_2.end())
|
||||
{
|
||||
// PASS, this is an export api
|
||||
}
|
||||
else
|
||||
{
|
||||
GUARDLOG(hook::log::IMPORT_ILLEGAL)
|
||||
|
||||
@@ -406,17 +406,6 @@ DECLARE_HOOK_FUNCTION(
|
||||
uint32_t slot_no_tx,
|
||||
uint32_t slot_no_meta);
|
||||
|
||||
DECLARE_HOOK_FUNCTION(
|
||||
int64_t,
|
||||
xport,
|
||||
uint32_t write_ptr,
|
||||
uint32_t write_len,
|
||||
uint32_t read_ptr,
|
||||
uint32_t read_len);
|
||||
DECLARE_HOOK_FUNCTION(
|
||||
int64_t,
|
||||
xport_reserve,
|
||||
uint32_t count);
|
||||
/*
|
||||
DECLARE_HOOK_FUNCTION(int64_t, str_find, uint32_t hread_ptr,
|
||||
uint32_t hread_len, uint32_t nread_ptr, uint32_t nread_len, uint32_t mode,
|
||||
@@ -496,8 +485,6 @@ struct HookResult
|
||||
|
||||
std::queue<std::shared_ptr<ripple::Transaction>>
|
||||
emittedTxn{}; // etx stored here until accept/rollback
|
||||
std::queue<std::shared_ptr<ripple::Transaction>>
|
||||
exportedTxn{};
|
||||
HookStateMap& stateMap;
|
||||
uint16_t changedStateCount = 0;
|
||||
std::map<
|
||||
@@ -554,7 +541,6 @@ struct HookContext
|
||||
uint16_t ledger_nonce_counter{0};
|
||||
int64_t expected_etxn_count{-1}; // make this a 64bit int so the uint32
|
||||
// from the hookapi cant overflow it
|
||||
int64_t expected_export_count{-1};
|
||||
std::map<ripple::uint256, bool> nonce_used{};
|
||||
uint32_t generation =
|
||||
0; // used for caching, only generated when txn_generation is called
|
||||
@@ -891,9 +877,6 @@ public:
|
||||
ADD_HOOK_FUNCTION(meta_slot, ctx);
|
||||
ADD_HOOK_FUNCTION(xpop_slot, ctx);
|
||||
|
||||
ADD_HOOK_FUNCTION(xport, ctx);
|
||||
ADD_HOOK_FUNCTION(xport_reserve, ctx);
|
||||
|
||||
/*
|
||||
ADD_HOOK_FUNCTION(str_find, ctx);
|
||||
ADD_HOOK_FUNCTION(str_replace, ctx);
|
||||
|
||||
@@ -79,7 +79,7 @@ main(int argc, char** argv)
|
||||
|
||||
close(fd);
|
||||
|
||||
auto result = validateGuards(hook, std::cout, "", 7);
|
||||
auto result = validateGuards(hook, std::cout, "", 3);
|
||||
|
||||
if (!result)
|
||||
{
|
||||
|
||||
@@ -1971,8 +1971,6 @@ hook::finalizeHookResult(
|
||||
// directory) if we are allowed to
|
||||
std::vector<std::pair<uint256 /* txnid */, uint256 /* emit nonce */>>
|
||||
emission_txnid;
|
||||
std::vector<uint256 /* txnid */>
|
||||
exported_txnid;
|
||||
|
||||
if (doEmit)
|
||||
{
|
||||
@@ -2028,58 +2026,6 @@ hook::finalizeHookResult(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
DBG_PRINTF("exported txn count: %d\n", hookResult.exportedTxn.size());
|
||||
for (; hookResult.exportedTxn.size() > 0; hookResult.exportedTxn.pop())
|
||||
{
|
||||
auto& tpTrans = hookResult.exportedTxn.front();
|
||||
auto& id = tpTrans->getID();
|
||||
JLOG(j.trace()) << "HookExport[" << HR_ACC() << "]: " << id;
|
||||
|
||||
// exported txns must be marked bad by the hash router to ensure under
|
||||
// no circumstances they will enter consensus on *this* chain.
|
||||
applyCtx.app.getHashRouter().setFlags(id, SF_BAD);
|
||||
|
||||
std::shared_ptr<const ripple::STTx> ptr =
|
||||
tpTrans->getSTransaction();
|
||||
|
||||
auto exportedId = keylet::exportedTxn(id);
|
||||
auto sleExported = applyCtx.view().peek(exportedId);
|
||||
|
||||
if (!sleExported)
|
||||
{
|
||||
exported_txnid.emplace_back(id);
|
||||
|
||||
sleExported = std::make_shared<SLE>(exportedId);
|
||||
|
||||
// RH TODO: add a new constructor to STObject to avoid this
|
||||
// serder thing
|
||||
ripple::Serializer s;
|
||||
ptr->add(s);
|
||||
SerialIter sit(s.slice());
|
||||
|
||||
sleExported->emplace_back(ripple::STObject(sit, sfExportedTxn));
|
||||
auto page = applyCtx.view().dirInsert(
|
||||
keylet::exportedDir(), exportedId, [&](SLE::ref sle) {
|
||||
(*sle)[sfFlags] = lsfEmittedDir;
|
||||
});
|
||||
|
||||
if (page)
|
||||
{
|
||||
(*sleExported)[sfOwnerNode] = *page;
|
||||
applyCtx.view().insert(sleExported);
|
||||
}
|
||||
else
|
||||
{
|
||||
JLOG(j.warn())
|
||||
<< "HookError[" << HR_ACC() << "]: "
|
||||
<< "Export Directory full when trying to insert "
|
||||
<< id;
|
||||
return tecDIR_FULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool const fixV2 = applyCtx.view().rules().enabled(fixXahauV2);
|
||||
@@ -2106,12 +2052,6 @@ hook::finalizeHookResult(
|
||||
meta.setFieldU16(
|
||||
sfHookEmitCount,
|
||||
emission_txnid.size()); // this will never wrap, hard limit
|
||||
if (applyCtx.view().rules().enabled(featureExport))
|
||||
{
|
||||
meta.setFieldU16(
|
||||
sfHookExportCount,
|
||||
exported_txnid.size());
|
||||
}
|
||||
meta.setFieldU16(sfHookExecutionIndex, exec_index);
|
||||
meta.setFieldU16(sfHookStateChangeCount, hookResult.changedStateCount);
|
||||
meta.setFieldH256(sfHookHash, hookResult.hookHash);
|
||||
@@ -3948,27 +3888,6 @@ DEFINE_HOOK_FUNCTION(int64_t, etxn_reserve, uint32_t count)
|
||||
HOOK_TEARDOWN();
|
||||
}
|
||||
|
||||
DEFINE_HOOK_FUNCTION(int64_t, xport_reserve, uint32_t count)
|
||||
{
|
||||
HOOK_SETUP(); // populates memory_ctx, memory, memory_length, applyCtx,
|
||||
// hookCtx on current stack
|
||||
|
||||
if (hookCtx.expected_export_count > -1)
|
||||
return ALREADY_SET;
|
||||
|
||||
if (count < 1)
|
||||
return TOO_SMALL;
|
||||
|
||||
if (count > hook_api::max_export)
|
||||
return TOO_BIG;
|
||||
|
||||
hookCtx.expected_export_count = count;
|
||||
|
||||
return count;
|
||||
|
||||
HOOK_TEARDOWN();
|
||||
}
|
||||
|
||||
// Compute the burden of an emitted transaction based on a number of factors
|
||||
DEFINE_HOOK_FUNCNARG(int64_t, etxn_burden)
|
||||
{
|
||||
@@ -6237,92 +6156,6 @@ DEFINE_HOOK_FUNCTION(
|
||||
|
||||
HOOK_TEARDOWN();
|
||||
}
|
||||
|
||||
DEFINE_HOOK_FUNCTION(
|
||||
int64_t,
|
||||
xport,
|
||||
uint32_t write_ptr,
|
||||
uint32_t write_len,
|
||||
uint32_t read_ptr,
|
||||
uint32_t read_len)
|
||||
{
|
||||
HOOK_SETUP();
|
||||
|
||||
if (NOT_IN_BOUNDS(read_ptr, read_len, memory_length))
|
||||
return OUT_OF_BOUNDS;
|
||||
|
||||
if (NOT_IN_BOUNDS(write_ptr, write_len, memory_length))
|
||||
return OUT_OF_BOUNDS;
|
||||
|
||||
if (write_len < 32)
|
||||
return TOO_SMALL;
|
||||
|
||||
auto& app = hookCtx.applyCtx.app;
|
||||
|
||||
if (hookCtx.expected_export_count < 0)
|
||||
return PREREQUISITE_NOT_MET;
|
||||
|
||||
if (hookCtx.result.exportedTxn.size() >= hookCtx.expected_export_count)
|
||||
return TOO_MANY_EXPORTED_TXN;
|
||||
|
||||
ripple::Blob blob{memory + read_ptr, memory + read_ptr + read_len};
|
||||
|
||||
std::shared_ptr<STTx const> stpTrans;
|
||||
try
|
||||
{
|
||||
stpTrans = std::make_shared<STTx const>(
|
||||
SerialIter{memory + read_ptr, read_len});
|
||||
}
|
||||
catch (std::exception& e)
|
||||
{
|
||||
JLOG(j.trace()) << "HookExport[" << HC_ACC() << "]: Failed " << e.what()
|
||||
<< "\n";
|
||||
return EXPORT_FAILURE;
|
||||
}
|
||||
|
||||
if (!stpTrans->isFieldPresent(sfAccount) ||
|
||||
stpTrans->getAccountID(sfAccount) != hookCtx.result.account)
|
||||
{
|
||||
JLOG(j.trace()) << "HookExport[" << HC_ACC()
|
||||
<< "]: Attempted to export a txn that's not for this Hook's Account ID.";
|
||||
return EXPORT_FAILURE;
|
||||
}
|
||||
|
||||
std::string reason;
|
||||
auto tpTrans = std::make_shared<Transaction>(stpTrans, reason, app);
|
||||
// RHTODO: is this needed or wise? VVV
|
||||
if (tpTrans->getStatus() != NEW)
|
||||
{
|
||||
JLOG(j.trace()) << "HookExport[" << HC_ACC()
|
||||
<< "]: tpTrans->getStatus() != NEW";
|
||||
return EXPORT_FAILURE;
|
||||
}
|
||||
auto const& txID = tpTrans->getID();
|
||||
|
||||
if (txID.size() > write_len)
|
||||
return TOO_SMALL;
|
||||
|
||||
if (NOT_IN_BOUNDS(write_ptr, txID.size(), memory_length))
|
||||
return OUT_OF_BOUNDS;
|
||||
|
||||
auto const write_txid = [&]() -> int64_t {
|
||||
WRITE_WASM_MEMORY_AND_RETURN(
|
||||
write_ptr,
|
||||
txID.size(),
|
||||
txID.data(),
|
||||
txID.size(),
|
||||
memory,
|
||||
memory_length);
|
||||
};
|
||||
|
||||
int64_t result = write_txid();
|
||||
|
||||
if (result == 32)
|
||||
hookCtx.result.exportedTxn.push(tpTrans);
|
||||
|
||||
return result;
|
||||
HOOK_TEARDOWN();
|
||||
}
|
||||
/*
|
||||
|
||||
DEFINE_HOOK_FUNCTION(
|
||||
|
||||
@@ -599,13 +599,6 @@ public:
|
||||
return validatorKeys_.publicKey;
|
||||
}
|
||||
|
||||
ValidatorKeys const&
|
||||
getValidatorKeys() const override
|
||||
{
|
||||
return validatorKeys_;
|
||||
}
|
||||
|
||||
|
||||
NetworkOPs&
|
||||
getOPs() override
|
||||
{
|
||||
|
||||
@@ -240,8 +240,7 @@ public:
|
||||
|
||||
virtual PublicKey const&
|
||||
getValidationPublicKey() const = 0;
|
||||
virtual ValidatorKeys const&
|
||||
getValidatorKeys() const = 0;
|
||||
|
||||
virtual Resource::Manager&
|
||||
getResourceManager() = 0;
|
||||
virtual PathRequests&
|
||||
|
||||
@@ -471,6 +471,10 @@ ManifestCache::applyManifest(Manifest m)
|
||||
|
||||
auto masterKey = m.masterKey;
|
||||
map_.emplace(std::move(masterKey), std::move(m));
|
||||
|
||||
// Increment sequence to invalidate cached manifest messages
|
||||
seq_++;
|
||||
|
||||
return ManifestDisposition::accepted;
|
||||
}
|
||||
|
||||
|
||||
@@ -27,8 +27,6 @@
|
||||
#include <ripple/protocol/Feature.h>
|
||||
#include <ripple/protocol/jss.h>
|
||||
#include <ripple/protocol/st.h>
|
||||
#include <ripple/app/misc/ValidatorKeys.h>
|
||||
#include <ripple/protocol/Sign.h>
|
||||
#include <algorithm>
|
||||
#include <limits>
|
||||
#include <numeric>
|
||||
@@ -1541,247 +1539,6 @@ TxQ::accept(Application& app, OpenView& view)
|
||||
}
|
||||
}
|
||||
|
||||
// Inject exported transactions/signatures, if any
|
||||
if (view.rules().enabled(featureExport))
|
||||
{
|
||||
do
|
||||
{
|
||||
// if we're not a validator we do nothing here
|
||||
if (app.getValidationPublicKey().empty())
|
||||
break;
|
||||
|
||||
auto const& keys = app.getValidatorKeys();
|
||||
|
||||
if (keys.configInvalid())
|
||||
break;
|
||||
|
||||
// and if we're not on the UNLReport we also do nothing
|
||||
|
||||
auto const unlRep = view.read(keylet::UNLReport());
|
||||
if (!unlRep || !unlRep->isFieldPresent(sfActiveValidators))
|
||||
{
|
||||
// nothing to do without a unlreport object
|
||||
break;
|
||||
}
|
||||
|
||||
bool found = false;
|
||||
auto const& avs = unlRep->getFieldArray(sfActiveValidators);
|
||||
for (auto const& av : avs)
|
||||
{
|
||||
if (PublicKey(av[sfPublicKey]) == keys.masterPublicKey)
|
||||
{
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!found)
|
||||
break;
|
||||
|
||||
// execution to here means we're a validator and on the UNLReport
|
||||
|
||||
AccountID signingAcc = calcAccountID(keys.publicKey);
|
||||
|
||||
Keylet const exportedDirKeylet{keylet::exportedDir()};
|
||||
if (dirIsEmpty(view, exportedDirKeylet))
|
||||
break;
|
||||
|
||||
std::shared_ptr<SLE const> sleDirNode{};
|
||||
unsigned int uDirEntry{0};
|
||||
uint256 dirEntry{beast::zero};
|
||||
|
||||
if (!cdirFirst(
|
||||
view,
|
||||
exportedDirKeylet.key,
|
||||
sleDirNode,
|
||||
uDirEntry,
|
||||
dirEntry))
|
||||
break;
|
||||
|
||||
do
|
||||
{
|
||||
Keylet const itemKeylet{ltCHILD, dirEntry};
|
||||
auto sleItem = view.read(itemKeylet);
|
||||
if (!sleItem)
|
||||
{
|
||||
// Directory node has an invalid index. Bail out.
|
||||
JLOG(j_.warn())
|
||||
<< "ExportedTxn processing: directory node in ledger "
|
||||
<< view.seq()
|
||||
<< " has index to object that is missing: "
|
||||
<< to_string(dirEntry);
|
||||
|
||||
// RH TODO: if this ever happens the entry should be
|
||||
// gracefully removed (somehow)
|
||||
continue;
|
||||
}
|
||||
|
||||
LedgerEntryType const nodeType{
|
||||
safe_cast<LedgerEntryType>((*sleItem)[sfLedgerEntryType])};
|
||||
|
||||
if (nodeType != ltEXPORTED_TXN)
|
||||
{
|
||||
JLOG(j_.warn())
|
||||
<< "ExportedTxn processing: emitted directory contained "
|
||||
"non ltEMITTED_TXN type";
|
||||
// RH TODO: if this ever happens the entry should be
|
||||
// gracefully removed (somehow)
|
||||
continue;
|
||||
}
|
||||
|
||||
JLOG(j_.info()) << "Processing exported txn: " << *sleItem;
|
||||
|
||||
auto const& exported =
|
||||
const_cast<ripple::STLedgerEntry&>(*sleItem)
|
||||
.getField(sfExportedTxn)
|
||||
.downcast<STObject>();
|
||||
|
||||
auto const& txnHash = sleItem->getFieldH256(sfTransactionHash);
|
||||
|
||||
auto exportedLgrSeq = exported.getFieldU32(sfLedgerSequence);
|
||||
|
||||
auto const seq = view.seq();
|
||||
|
||||
if (exportedLgrSeq == seq)
|
||||
{
|
||||
// this shouldn't happen, but do nothing
|
||||
continue;
|
||||
}
|
||||
|
||||
if (exportedLgrSeq < seq - 1)
|
||||
{
|
||||
// all old entries need to be turned into Export transactions so they can be removed
|
||||
// from the directory
|
||||
|
||||
// in the previous ledger all the ExportSign transactions were executed, and one-by-one
|
||||
// added the validators' signatures to the ltEXPORTED_TXN's sfSigners array.
|
||||
// now we need to collect these together and place them inside the ExportedTxn blob
|
||||
// and publish the blob in the Export transaction type.
|
||||
|
||||
STArray signers = sleItem->getFieldArray(sfSigners);
|
||||
|
||||
auto s = std::make_shared<ripple::Serializer>();
|
||||
exported.add(*s);
|
||||
SerialIter sitTrans(s->slice());
|
||||
try
|
||||
{
|
||||
auto stpTrans =
|
||||
std::make_shared<STTx>(std::ref(sitTrans));
|
||||
|
||||
if (!stpTrans->isFieldPresent(sfAccount) ||
|
||||
stpTrans->getAccountID(sfAccount) == beast::zero)
|
||||
{
|
||||
JLOG(j_.warn()) << "Hook: Export failure: "
|
||||
<< "sfAccount missing or zero.";
|
||||
// RH TODO: if this ever happens the entry should be
|
||||
// gracefully removed (somehow)
|
||||
continue;
|
||||
}
|
||||
|
||||
// RH TODO: should we force remove signingpubkey here?
|
||||
|
||||
stpTrans->setFieldArray(sfSigners, signers);
|
||||
|
||||
Blob const& blob = stpTrans->getSerializer().peekData();
|
||||
|
||||
STTx exportTx(ttEXPORT, [&](auto& obj) {
|
||||
obj.setFieldVL(sfExportedTxn, blob);
|
||||
obj.setFieldU32(sfLedgerSequence, seq);
|
||||
obj.setFieldH256(sfTransactionHash, txnHash);
|
||||
obj.setFieldArray(sfSigners, signers);
|
||||
});
|
||||
|
||||
// submit to the ledger
|
||||
{
|
||||
uint256 txID = exportTx.getTransactionID();
|
||||
auto s = std::make_shared<ripple::Serializer>();
|
||||
exportTx.add(*s);
|
||||
app.getHashRouter().setFlags(txID, SF_PRIVATE2);
|
||||
app.getHashRouter().setFlags(txID, SF_EMITTED);
|
||||
view.rawTxInsert(txID, std::move(s), nullptr);
|
||||
ledgerChanged = true;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
catch (std::exception& e)
|
||||
{
|
||||
JLOG(j_.warn())
|
||||
<< "ExportedTxn Processing: Failure: " << e.what()
|
||||
<< "\n";
|
||||
}
|
||||
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
// this ledger is the one after the exported txn was added to the directory
|
||||
// so generate the export sign txns
|
||||
|
||||
auto s = std::make_shared<ripple::Serializer>();
|
||||
exported.add(*s);
|
||||
SerialIter sitTrans(s->slice());
|
||||
try
|
||||
{
|
||||
auto const& stpTrans =
|
||||
std::make_shared<STTx const>(std::ref(sitTrans));
|
||||
|
||||
if (!stpTrans->isFieldPresent(sfAccount) ||
|
||||
stpTrans->getAccountID(sfAccount) == beast::zero)
|
||||
{
|
||||
JLOG(j_.warn()) << "Hook: Export failure: "
|
||||
<< "sfAccount missing or zero.";
|
||||
// RH TODO: if this ever happens the entry should be
|
||||
// gracefully removed (somehow)
|
||||
continue;
|
||||
}
|
||||
|
||||
auto seq = view.info().seq;
|
||||
auto txnHash = stpTrans->getTransactionID();
|
||||
|
||||
Serializer s =
|
||||
buildMultiSigningData(*stpTrans, signingAcc);
|
||||
|
||||
auto multisig = ripple::sign(keys.publicKey, keys.secretKey, s.slice());
|
||||
|
||||
STTx exportSignTx(ttEXPORT_SIGN, [&](auto& obj) {
|
||||
obj.set(([&]() {
|
||||
auto inner = std::make_unique<STObject>(sfSigner);
|
||||
inner->setFieldVL(sfSigningPubKey, keys.publicKey);
|
||||
inner->setAccountID(sfAccount, signingAcc);
|
||||
inner->setFieldVL(sfTxnSignature, multisig);
|
||||
return inner;
|
||||
})());
|
||||
obj.setFieldU32(sfLedgerSequence, seq);
|
||||
obj.setFieldH256(sfTransactionHash, txnHash);
|
||||
});
|
||||
|
||||
// submit to the ledger
|
||||
{
|
||||
uint256 txID = exportSignTx.getTransactionID();
|
||||
auto s = std::make_shared<ripple::Serializer>();
|
||||
exportSignTx.add(*s);
|
||||
app.getHashRouter().setFlags(txID, SF_PRIVATE2);
|
||||
app.getHashRouter().setFlags(txID, SF_EMITTED);
|
||||
view.rawTxInsert(txID, std::move(s), nullptr);
|
||||
ledgerChanged = true;
|
||||
}
|
||||
}
|
||||
|
||||
catch (std::exception& e)
|
||||
{
|
||||
JLOG(j_.warn())
|
||||
<< "ExportedTxn Processing: Failure: " << e.what()
|
||||
<< "\n";
|
||||
}
|
||||
|
||||
} while (cdirNext(
|
||||
view, exportedDirKeylet.key, sleDirNode, uDirEntry, dirEntry));
|
||||
|
||||
} while (0);
|
||||
|
||||
}
|
||||
|
||||
// Inject emitted transactions if any
|
||||
if (view.rules().enabled(featureHooks))
|
||||
do
|
||||
|
||||
@@ -96,13 +96,6 @@ Change::preflight(PreflightContext const& ctx)
|
||||
}
|
||||
}
|
||||
|
||||
if ((ctx.tx.getTxnType() == ttEXPORT_SIGN || ctx.tx.getTxnType() == ttEXPORT) &&
|
||||
!ctx.rules.enabled(featureExport))
|
||||
{
|
||||
JLOG(ctx.j.warn()) << "Change: Export not enabled";
|
||||
return temDISABLED;
|
||||
}
|
||||
|
||||
return tesSUCCESS;
|
||||
}
|
||||
|
||||
@@ -161,8 +154,6 @@ Change::preclaim(PreclaimContext const& ctx)
|
||||
case ttAMENDMENT:
|
||||
case ttUNL_MODIFY:
|
||||
case ttEMIT_FAILURE:
|
||||
case ttEXPORT:
|
||||
case ttEXPORT_SIGN:
|
||||
return tesSUCCESS;
|
||||
case ttUNL_REPORT: {
|
||||
if (!ctx.tx.isFieldPresent(sfImportVLKey) ||
|
||||
@@ -218,11 +209,6 @@ Change::doApply()
|
||||
return applyEmitFailure();
|
||||
case ttUNL_REPORT:
|
||||
return applyUNLReport();
|
||||
case ttEXPORT:
|
||||
return applyExport();
|
||||
case ttEXPORT_SIGN:
|
||||
return applyExportSign();
|
||||
|
||||
default:
|
||||
assert(0);
|
||||
return tefFAILURE;
|
||||
@@ -620,8 +606,7 @@ Change::activateXahauGenesis()
|
||||
loggerStream,
|
||||
"rHb9CJAWyB4rj91VRWn96DkukG4bwdtyTh",
|
||||
(ctx_.view().rules().enabled(featureHooksUpdate1) ? 1 : 0) +
|
||||
(ctx_.view().rules().enabled(fix20250131) ? 2 : 0) +
|
||||
(ctx_.view().rules().enabled(featureExport) ? 4 : 0));
|
||||
(ctx_.view().rules().enabled(fix20250131) ? 2 : 0));
|
||||
|
||||
if (!result)
|
||||
{
|
||||
@@ -1087,80 +1072,6 @@ Change::applyEmitFailure()
|
||||
return tesSUCCESS;
|
||||
}
|
||||
|
||||
TER
|
||||
Change::applyExport()
|
||||
{
|
||||
uint256 txnID(ctx_.tx.getFieldH256(sfTransactionHash));
|
||||
do
|
||||
{
|
||||
JLOG(j_.info()) << "HookExport[" << txnID
|
||||
<< "]: ttExport exporting transaction";
|
||||
|
||||
auto key = keylet::exportedTxn(txnID);
|
||||
|
||||
auto const& sle = view().peek(key);
|
||||
|
||||
if (!sle)
|
||||
{
|
||||
// most likely explanation is that this was somehow a double-up, so just ignore
|
||||
JLOG(j_.warn())
|
||||
<< "HookError[" << txnID << "]: ttExport could not find exported txn in ledger";
|
||||
break;
|
||||
}
|
||||
|
||||
if (!view().dirRemove(
|
||||
keylet::exportedDir(),
|
||||
sle->getFieldU64(sfOwnerNode),
|
||||
key,
|
||||
false))
|
||||
{
|
||||
JLOG(j_.fatal()) << "HookError[" << txnID
|
||||
<< "]: ttExport (Change) tefBAD_LEDGER";
|
||||
return tefBAD_LEDGER;
|
||||
}
|
||||
|
||||
view().erase(sle);
|
||||
} while (0);
|
||||
return tesSUCCESS;
|
||||
}
|
||||
|
||||
TER
|
||||
Change::applyExportSign()
|
||||
{
|
||||
uint256 txnID(ctx_.tx.getFieldH256(sfTransactionHash));
|
||||
do
|
||||
{
|
||||
JLOG(j_.info()) << "HookExport[" << txnID
|
||||
<< "]: ttExportSign adding signature to transaction";
|
||||
|
||||
auto key = keylet::exportedTxn(txnID);
|
||||
|
||||
auto const& sle = view().peek(key);
|
||||
|
||||
if (!sle)
|
||||
{
|
||||
// most likely explanation is that this was somehow a double-up, so just ignore
|
||||
JLOG(j_.warn())
|
||||
<< "HookError[" << txnID << "]: ttExportSign could not find exported txn in ledger";
|
||||
break;
|
||||
}
|
||||
|
||||
// grab the signer object off the txn
|
||||
STObject signerObj = const_cast<ripple::STTx&>(ctx_.tx)
|
||||
.getField(sfSigner)
|
||||
.downcast<STObject>();
|
||||
|
||||
// append it to the signers field in the ledger object
|
||||
STArray signers = sle->getFieldArray(sfSigners);
|
||||
signers.push_back(signerObj);
|
||||
sle->setFieldArray(sfSigners, signers);
|
||||
|
||||
// done
|
||||
view().update(sle);
|
||||
} while (0);
|
||||
return tesSUCCESS;
|
||||
}
|
||||
|
||||
TER
|
||||
Change::applyUNLModify()
|
||||
{
|
||||
|
||||
@@ -74,12 +74,6 @@ private:
|
||||
TER
|
||||
applyEmitFailure();
|
||||
|
||||
TER
|
||||
applyExport();
|
||||
|
||||
TER
|
||||
applyExportSign();
|
||||
|
||||
TER
|
||||
applyUNLReport();
|
||||
};
|
||||
|
||||
@@ -37,12 +37,9 @@
|
||||
#include <charconv>
|
||||
#include <iostream>
|
||||
#include <vector>
|
||||
#include <ripple/app/hook/applyHook.h>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
static const uint256 shadowTicketNamespace = uint256::fromVoid("RESERVED NAMESPACE SHADOW TICKET");
|
||||
|
||||
TxConsequences
|
||||
Import::makeTxConsequences(PreflightContext const& ctx)
|
||||
{
|
||||
@@ -200,7 +197,7 @@ Import::preflight(PreflightContext const& ctx)
|
||||
if (!stpTrans || !meta)
|
||||
return temMALFORMED;
|
||||
|
||||
if (stpTrans->isFieldPresent(sfTicketSequence) && !ctx.rules.enabled(featureExport))
|
||||
if (stpTrans->isFieldPresent(sfTicketSequence))
|
||||
{
|
||||
JLOG(ctx.j.warn()) << "Import: cannot use TicketSequence XPOP.";
|
||||
return temMALFORMED;
|
||||
@@ -891,26 +888,6 @@ Import::preclaim(PreclaimContext const& ctx)
|
||||
return tefINTERNAL;
|
||||
}
|
||||
|
||||
bool const hasTicket = stpTrans->isFieldPresent(sfTicketSequence);
|
||||
|
||||
if (hasTicket)
|
||||
{
|
||||
if (!ctx.view.rules().enabled(featureExport))
|
||||
return tefINTERNAL;
|
||||
|
||||
auto const acc = stpTrans->getAccountID(sfAccount);
|
||||
uint256 const seq = uint256(stpTrans->getFieldU32(sfTicketSequence));
|
||||
|
||||
// check if there is a shadow ticket, and if not we won't allow
|
||||
// the txn to pass into consensus
|
||||
|
||||
if (!ctx.view.exists(keylet::hookState(acc, seq, shadowTicketNamespace)))
|
||||
{
|
||||
JLOG(ctx.j.warn()) << "Import: attempted to import a txn without shadow ticket.";
|
||||
return telSHADOW_TICKET_REQUIRED; // tel code to avoid consensus/forward without SF_BAD
|
||||
}
|
||||
}
|
||||
|
||||
auto const& sle = ctx.view.read(keylet::account(ctx.tx[sfAccount]));
|
||||
|
||||
auto const tt = stpTrans->getTxnType();
|
||||
@@ -951,17 +928,13 @@ Import::preclaim(PreclaimContext const& ctx)
|
||||
} while (0);
|
||||
}
|
||||
|
||||
if (!hasTicket)
|
||||
if (sle && sle->isFieldPresent(sfImportSequence))
|
||||
{
|
||||
uint32_t sleImportSequence = sle->getFieldU32(sfImportSequence);
|
||||
|
||||
if (sle && sle->isFieldPresent(sfImportSequence))
|
||||
{
|
||||
uint32_t sleImportSequence = sle->getFieldU32(sfImportSequence);
|
||||
|
||||
// replay attempt
|
||||
if (sleImportSequence >= stpTrans->getFieldU32(sfSequence))
|
||||
return tefPAST_IMPORT_SEQ;
|
||||
}
|
||||
// replay attempt
|
||||
if (sleImportSequence >= stpTrans->getFieldU32(sfSequence))
|
||||
return tefPAST_IMPORT_SEQ;
|
||||
}
|
||||
|
||||
// when importing for the first time the fee must be zero
|
||||
@@ -1269,11 +1242,7 @@ Import::doApply()
|
||||
auto const id = ctx_.tx[sfAccount];
|
||||
auto sle = view().peek(keylet::account(id));
|
||||
|
||||
std::optional<uint256> ticket;
|
||||
if (stpTrans->isFieldPresent(sfTicketSequence))
|
||||
ticket = uint256(stpTrans->getFieldU32(sfTicketSequence));
|
||||
|
||||
if (sle && !ticket.has_value() && sle->getFieldU32(sfImportSequence) >= importSequence)
|
||||
if (sle && sle->getFieldU32(sfImportSequence) >= importSequence)
|
||||
{
|
||||
// make double sure import seq hasn't passed
|
||||
JLOG(ctx_.journal.warn()) << "Import: ImportSequence passed";
|
||||
@@ -1366,24 +1335,8 @@ Import::doApply()
|
||||
}
|
||||
}
|
||||
|
||||
if (!ticket.has_value())
|
||||
sle->setFieldU32(sfImportSequence, importSequence);
|
||||
|
||||
sle->setFieldU32(sfImportSequence, importSequence);
|
||||
sle->setFieldAmount(sfBalance, finalBal);
|
||||
|
||||
if (ticket.has_value())
|
||||
{
|
||||
auto sleTicket = view().peek(keylet::hookState(id, *ticket, shadowTicketNamespace));
|
||||
if (!sleTicket)
|
||||
return tefINTERNAL;
|
||||
|
||||
TER result = hook::setHookState(ctx_, id, shadowTicketNamespace, *ticket, {});
|
||||
if (result != tesSUCCESS)
|
||||
return result;
|
||||
|
||||
// RHUPTO: ticketseq billing?
|
||||
}
|
||||
|
||||
|
||||
if (create)
|
||||
{
|
||||
|
||||
@@ -491,8 +491,7 @@ SetHook::validateHookSetEntry(SetHookCtx& ctx, STObject const& hookSetObj)
|
||||
logger,
|
||||
hsacc,
|
||||
(ctx.rules.enabled(featureHooksUpdate1) ? 1 : 0) +
|
||||
(ctx.rules.enabled(fix20250131) ? 2 : 0) +
|
||||
(ctx.rules.enabled(featureExport) ? 4 : 0));
|
||||
(ctx.rules.enabled(fix20250131) ? 2 : 0));
|
||||
|
||||
if (ctx.j.trace())
|
||||
{
|
||||
|
||||
@@ -374,8 +374,6 @@ invoke_calculateBaseFee(ReadView const& view, STTx const& tx)
|
||||
case ttUNL_MODIFY:
|
||||
case ttUNL_REPORT:
|
||||
case ttEMIT_FAILURE:
|
||||
case ttEXPORT_SIGN:
|
||||
case ttEXPORT:
|
||||
return Change::calculateBaseFee(view, tx);
|
||||
case ttNFTOKEN_MINT:
|
||||
return NFTokenMint::calculateBaseFee(view, tx);
|
||||
@@ -546,8 +544,6 @@ invoke_apply(ApplyContext& ctx)
|
||||
case ttFEE:
|
||||
case ttUNL_MODIFY:
|
||||
case ttUNL_REPORT:
|
||||
case ttEXPORT:
|
||||
case ttEXPORT_SIGN:
|
||||
case ttEMIT_FAILURE: {
|
||||
Change p(ctx);
|
||||
return p();
|
||||
|
||||
@@ -22,7 +22,6 @@
|
||||
|
||||
#include <ripple/core/JobQueue.h>
|
||||
#include <ripple/net/InfoSub.h>
|
||||
#include <boost/asio/io_service.hpp>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
@@ -39,11 +38,9 @@ protected:
|
||||
explicit RPCSub(InfoSub::Source& source);
|
||||
};
|
||||
|
||||
// VFALCO Why is the io_service needed?
|
||||
std::shared_ptr<RPCSub>
|
||||
make_RPCSub(
|
||||
InfoSub::Source& source,
|
||||
boost::asio::io_service& io_service,
|
||||
JobQueue& jobQueue,
|
||||
std::string const& strUrl,
|
||||
std::string const& strUsername,
|
||||
|
||||
@@ -451,6 +451,12 @@ public:
|
||||
if (mShutdown)
|
||||
{
|
||||
JLOG(j_.trace()) << "Complete.";
|
||||
|
||||
mResponse.commit(bytes_transferred);
|
||||
std::string strBody{
|
||||
{std::istreambuf_iterator<char>(&mResponse)},
|
||||
std::istreambuf_iterator<char>()};
|
||||
invokeComplete(ecResult, mStatus, mBody + strBody);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
||||
@@ -1805,6 +1805,7 @@ rpcClient(
|
||||
}
|
||||
|
||||
{
|
||||
//@@start blocking-request
|
||||
boost::asio::io_service isService;
|
||||
RPCCall::fromNetwork(
|
||||
isService,
|
||||
@@ -1828,6 +1829,7 @@ rpcClient(
|
||||
headers);
|
||||
isService.run(); // This blocks until there are no more
|
||||
// outstanding async calls.
|
||||
//@@end blocking-request
|
||||
}
|
||||
if (jvOutput.isMember("result"))
|
||||
{
|
||||
@@ -1946,6 +1948,7 @@ fromNetwork(
|
||||
// HTTP call?
|
||||
auto constexpr RPC_NOTIFY = 30s;
|
||||
|
||||
//@@start async-request
|
||||
HTTPClient::request(
|
||||
bSSL,
|
||||
io_service,
|
||||
@@ -1970,6 +1973,7 @@ fromNetwork(
|
||||
std::placeholders::_3,
|
||||
j),
|
||||
j);
|
||||
//@@end async-request
|
||||
}
|
||||
|
||||
} // namespace RPCCall
|
||||
|
||||
@@ -33,14 +33,12 @@ class RPCSubImp : public RPCSub
|
||||
public:
|
||||
RPCSubImp(
|
||||
InfoSub::Source& source,
|
||||
boost::asio::io_service& io_service,
|
||||
JobQueue& jobQueue,
|
||||
std::string const& strUrl,
|
||||
std::string const& strUsername,
|
||||
std::string const& strPassword,
|
||||
Logs& logs)
|
||||
: RPCSub(source)
|
||||
, m_io_service(io_service)
|
||||
, m_jobQueue(jobQueue)
|
||||
, mUrl(strUrl)
|
||||
, mSSL(false)
|
||||
@@ -78,14 +76,14 @@ public:
|
||||
{
|
||||
std::lock_guard sl(mLock);
|
||||
|
||||
// Wietse: we're not going to limit this, this is admin-port only, scale
|
||||
// accordingly Dropping events just like this results in inconsistent
|
||||
// data on the receiving end if (mDeque.size() >= eventQueueMax)
|
||||
// {
|
||||
// // Drop the previous event.
|
||||
// JLOG(j_.warn()) << "RPCCall::fromNetwork drop";
|
||||
// mDeque.pop_back();
|
||||
// }
|
||||
if (mDeque.size() >= maxQueueSize)
|
||||
{
|
||||
JLOG(j_.warn())
|
||||
<< "RPCCall::fromNetwork drop: queue full (" << mDeque.size()
|
||||
<< "), seq=" << mSeq << ", endpoint=" << mIp;
|
||||
++mSeq;
|
||||
return;
|
||||
}
|
||||
|
||||
auto jm = broadcast ? j_.debug() : j_.info();
|
||||
JLOG(jm) << "RPCCall::fromNetwork push: " << jvObj;
|
||||
@@ -121,48 +119,49 @@ public:
|
||||
}
|
||||
|
||||
private:
|
||||
// XXX Could probably create a bunch of send jobs in a single get of the
|
||||
// lock.
|
||||
// Maximum concurrent HTTP deliveries per batch. Bounds file
|
||||
// descriptor usage while still allowing parallel delivery to
|
||||
// capable endpoints. With a 1024 FD process limit shared across
|
||||
// peers, clients, and the node store, 32 per subscriber is a
|
||||
// meaningful but survivable chunk even with multiple subscribers.
|
||||
static constexpr int maxInFlight = 32;
|
||||
|
||||
// Maximum queued events before dropping. At ~5-10KB per event
|
||||
// this is ~80-160MB worst case — trivial memory-wise. The real
|
||||
// purpose is detecting a hopelessly behind endpoint: at 100+
|
||||
// events per ledger (every ~4s), 16384 events is ~10 minutes
|
||||
// of buffer. Consumers detect gaps via the seq field.
|
||||
static constexpr std::size_t maxQueueSize = 16384;
|
||||
|
||||
void
|
||||
sendThread()
|
||||
{
|
||||
Json::Value jvEvent;
|
||||
bool bSend;
|
||||
|
||||
do
|
||||
{
|
||||
// Local io_service per batch — cheap to create (just an
|
||||
// internal event queue, no threads, no syscalls). Using a
|
||||
// local io_service is what makes .run() block until exactly
|
||||
// this batch completes, giving us flow control. Same
|
||||
// pattern used by rpcClient() in RPCCall.cpp for CLI
|
||||
// commands.
|
||||
boost::asio::io_service io_service;
|
||||
int dispatched = 0;
|
||||
|
||||
{
|
||||
// Obtain the lock to manipulate the queue and change sending.
|
||||
std::lock_guard sl(mLock);
|
||||
|
||||
if (mDeque.empty())
|
||||
{
|
||||
mSending = false;
|
||||
bSend = false;
|
||||
}
|
||||
else
|
||||
while (!mDeque.empty() && dispatched < maxInFlight)
|
||||
{
|
||||
auto const [seq, env] = mDeque.front();
|
||||
|
||||
mDeque.pop_front();
|
||||
|
||||
jvEvent = env;
|
||||
Json::Value jvEvent = env;
|
||||
jvEvent["seq"] = seq;
|
||||
|
||||
bSend = true;
|
||||
}
|
||||
}
|
||||
|
||||
// Send outside of the lock.
|
||||
if (bSend)
|
||||
{
|
||||
// XXX Might not need this in a try.
|
||||
try
|
||||
{
|
||||
JLOG(j_.info()) << "RPCCall::fromNetwork: " << mIp;
|
||||
|
||||
RPCCall::fromNetwork(
|
||||
m_io_service,
|
||||
io_service,
|
||||
mIp,
|
||||
mPort,
|
||||
mUsername,
|
||||
@@ -173,21 +172,38 @@ private:
|
||||
mSSL,
|
||||
true,
|
||||
logs_);
|
||||
++dispatched;
|
||||
}
|
||||
|
||||
if (dispatched == 0)
|
||||
mSending = false;
|
||||
}
|
||||
|
||||
bSend = dispatched > 0;
|
||||
|
||||
if (bSend)
|
||||
{
|
||||
try
|
||||
{
|
||||
JLOG(j_.info())
|
||||
<< "RPCCall::fromNetwork: " << mIp << " dispatching "
|
||||
<< dispatched << " events";
|
||||
io_service.run();
|
||||
}
|
||||
catch (const std::exception& e)
|
||||
{
|
||||
JLOG(j_.info())
|
||||
JLOG(j_.warn())
|
||||
<< "RPCCall::fromNetwork exception: " << e.what();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
JLOG(j_.warn()) << "RPCCall::fromNetwork unknown exception";
|
||||
}
|
||||
}
|
||||
} while (bSend);
|
||||
}
|
||||
|
||||
private:
|
||||
// Wietse: we're not going to limit this, this is admin-port only, scale
|
||||
// accordingly enum { eventQueueMax = 32 };
|
||||
|
||||
boost::asio::io_service& m_io_service;
|
||||
JobQueue& m_jobQueue;
|
||||
|
||||
std::string mUrl;
|
||||
@@ -217,7 +233,6 @@ RPCSub::RPCSub(InfoSub::Source& source) : InfoSub(source, Consumer())
|
||||
std::shared_ptr<RPCSub>
|
||||
make_RPCSub(
|
||||
InfoSub::Source& source,
|
||||
boost::asio::io_service& io_service,
|
||||
JobQueue& jobQueue,
|
||||
std::string const& strUrl,
|
||||
std::string const& strUsername,
|
||||
@@ -226,7 +241,6 @@ make_RPCSub(
|
||||
{
|
||||
return std::make_shared<RPCSubImp>(
|
||||
std::ref(source),
|
||||
std::ref(io_service),
|
||||
std::ref(jobQueue),
|
||||
strUrl,
|
||||
strUsername,
|
||||
|
||||
@@ -484,44 +484,61 @@ OverlayImpl::start()
|
||||
m_peerFinder->setConfig(config);
|
||||
m_peerFinder->start();
|
||||
|
||||
auto addIps = [&](std::vector<std::string> bootstrapIps) -> void {
|
||||
auto addIps = [this](std::vector<std::string> ips, bool fixed) {
|
||||
beast::Journal const& j = app_.journal("Overlay");
|
||||
for (auto& ip : bootstrapIps)
|
||||
for (auto& ip : ips)
|
||||
{
|
||||
std::size_t pos = ip.find('#');
|
||||
if (pos != std::string::npos)
|
||||
ip.erase(pos);
|
||||
|
||||
JLOG(j.trace()) << "Found boostrap IP: " << ip;
|
||||
JLOG(j.trace())
|
||||
<< "Found " << (fixed ? "fixed" : "bootstrap") << " IP: " << ip;
|
||||
}
|
||||
|
||||
m_resolver.resolve(
|
||||
bootstrapIps,
|
||||
[&](std::string const& name,
|
||||
ips,
|
||||
[this, fixed](
|
||||
std::string const& name,
|
||||
std::vector<beast::IP::Endpoint> const& addresses) {
|
||||
std::vector<std::string> ips;
|
||||
ips.reserve(addresses.size());
|
||||
beast::Journal const& j = app_.journal("Overlay");
|
||||
std::string const base("config: ");
|
||||
|
||||
std::vector<beast::IP::Endpoint> eps;
|
||||
eps.reserve(addresses.size());
|
||||
for (auto const& addr : addresses)
|
||||
{
|
||||
std::string addrStr = addr.port() == 0
|
||||
? to_string(addr.at_port(DEFAULT_PEER_PORT))
|
||||
: to_string(addr);
|
||||
JLOG(j.trace()) << "Parsed boostrap IP: " << addrStr;
|
||||
ips.push_back(addrStr);
|
||||
auto ep = addr.port() == 0 ? addr.at_port(DEFAULT_PEER_PORT)
|
||||
: addr;
|
||||
JLOG(j.trace())
|
||||
<< "Parsed " << (fixed ? "fixed" : "bootstrap")
|
||||
<< " IP: " << ep;
|
||||
eps.push_back(ep);
|
||||
}
|
||||
|
||||
std::string const base("config: ");
|
||||
if (!ips.empty())
|
||||
m_peerFinder->addFallbackStrings(base + name, ips);
|
||||
if (eps.empty())
|
||||
return;
|
||||
|
||||
if (fixed)
|
||||
{
|
||||
m_peerFinder->addFixedPeer(base + name, eps);
|
||||
}
|
||||
else
|
||||
{
|
||||
std::vector<std::string> strs;
|
||||
strs.reserve(eps.size());
|
||||
for (auto const& ep : eps)
|
||||
strs.push_back(to_string(ep));
|
||||
m_peerFinder->addFallbackStrings(base + name, strs);
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
if (!app_.config().IPS.empty())
|
||||
addIps(app_.config().IPS);
|
||||
addIps(app_.config().IPS, false);
|
||||
|
||||
if (!app_.config().IPS_FIXED.empty())
|
||||
addIps(app_.config().IPS_FIXED);
|
||||
addIps(app_.config().IPS_FIXED, true);
|
||||
|
||||
auto const timer = std::make_shared<Timer>(*this);
|
||||
std::lock_guard lock(mutex_);
|
||||
|
||||
@@ -74,7 +74,7 @@ namespace detail {
|
||||
// Feature.cpp. Because it's only used to reserve storage, and determine how
|
||||
// large to make the FeatureBitset, it MAY be larger. It MUST NOT be less than
|
||||
// the actual number of amendments. A LogicError on startup will verify this.
|
||||
static constexpr std::size_t numFeatures = 91;
|
||||
static constexpr std::size_t numFeatures = 90;
|
||||
|
||||
/** Amendments that this server supports and the default voting behavior.
|
||||
Whether they are enabled depends on the Rules defined in the validated
|
||||
@@ -378,7 +378,6 @@ extern uint256 const fixInvalidTxFlags;
|
||||
extern uint256 const featureExtendedHookState;
|
||||
extern uint256 const fixCronStacking;
|
||||
extern uint256 const fixHookAPI20251128;
|
||||
extern uint256 const featureExport;
|
||||
} // namespace ripple
|
||||
|
||||
#endif
|
||||
|
||||
@@ -56,15 +56,9 @@ namespace keylet {
|
||||
Keylet const&
|
||||
emittedDir() noexcept;
|
||||
|
||||
Keylet const&
|
||||
exportedDir() noexcept;
|
||||
|
||||
Keylet
|
||||
emittedTxn(uint256 const& id) noexcept;
|
||||
|
||||
Keylet
|
||||
exportedTxn(uint256 const& id) noexcept;
|
||||
|
||||
Keylet
|
||||
hookDefinition(uint256 const& hash) noexcept;
|
||||
|
||||
|
||||
@@ -260,8 +260,6 @@ enum LedgerEntryType : std::uint16_t
|
||||
\sa keylet::emitted
|
||||
*/
|
||||
ltEMITTED_TXN = 'E',
|
||||
|
||||
ltEXPORTED_TXN = 0x4578, // Ex (exported transaction)
|
||||
};
|
||||
// clang-format off
|
||||
|
||||
@@ -320,8 +318,7 @@ enum LedgerSpecificFlags {
|
||||
// ltDIR_NODE
|
||||
lsfNFTokenBuyOffers = 0x00000001,
|
||||
lsfNFTokenSellOffers = 0x00000002,
|
||||
lsfEmittedDir = 0x00000004,
|
||||
lsfExportedDir = 0x00000008,
|
||||
lsfEmittedDir = 0x00000004,
|
||||
|
||||
// ltNFTOKEN_OFFER
|
||||
lsfSellNFToken = 0x00000001,
|
||||
|
||||
@@ -355,7 +355,6 @@ extern SF_UINT16 const sfHookEmitCount;
|
||||
extern SF_UINT16 const sfHookExecutionIndex;
|
||||
extern SF_UINT16 const sfHookApiVersion;
|
||||
extern SF_UINT16 const sfHookStateScale;
|
||||
extern SF_UINT16 const sfHookExportCount;
|
||||
|
||||
// 32-bit integers (common)
|
||||
extern SF_UINT32 const sfNetworkID;
|
||||
@@ -596,7 +595,6 @@ extern SField const sfSigner;
|
||||
extern SField const sfMajority;
|
||||
extern SField const sfDisabledValidator;
|
||||
extern SField const sfEmittedTxn;
|
||||
extern SField const sfExportedTxn;
|
||||
extern SField const sfHookExecution;
|
||||
extern SField const sfHookDefinition;
|
||||
extern SField const sfHookParameter;
|
||||
|
||||
@@ -67,7 +67,6 @@ enum TELcodes : TERUnderlyingType {
|
||||
telNON_LOCAL_EMITTED_TXN,
|
||||
telIMPORT_VL_KEY_NOT_RECOGNISED,
|
||||
telCAN_NOT_QUEUE_IMPORT,
|
||||
telSHADOW_TICKET_REQUIRED,
|
||||
};
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
@@ -149,12 +149,6 @@ enum TxType : std::uint16_t
|
||||
ttURITOKEN_CREATE_SELL_OFFER = 48,
|
||||
ttURITOKEN_CANCEL_SELL_OFFER = 49,
|
||||
|
||||
/* A pseudo-txn containing an exported transaction plus signatures from the validators */
|
||||
ttEXPORT = 90,
|
||||
|
||||
/* A pseudo-txn containing a validator's signature for an export transaction */
|
||||
ttEXPORT_SIGN = 91,
|
||||
|
||||
/* A pseudo-txn alarm signal for invoking a hook, emitted by validators after alarm set conditions are met */
|
||||
ttCRON = 92,
|
||||
|
||||
|
||||
@@ -484,7 +484,6 @@ REGISTER_FIX (fixInvalidTxFlags, Supported::yes, VoteBehavior::De
|
||||
REGISTER_FEATURE(ExtendedHookState, Supported::yes, VoteBehavior::DefaultNo);
|
||||
REGISTER_FIX (fixCronStacking, Supported::yes, VoteBehavior::DefaultYes);
|
||||
REGISTER_FIX (fixHookAPI20251128, Supported::yes, VoteBehavior::DefaultYes);
|
||||
REGISTER_FEATURE(Export, Supported::yes, VoteBehavior::DefaultNo);
|
||||
|
||||
// The following amendments are obsolete, but must remain supported
|
||||
// because they could potentially get enabled.
|
||||
|
||||
@@ -66,8 +66,6 @@ enum class LedgerNameSpace : std::uint16_t {
|
||||
HOOK_DEFINITION = 'D',
|
||||
EMITTED_TXN = 'E',
|
||||
EMITTED_DIR = 'F',
|
||||
EXPORTED_TXN = 0x4578, // Ex
|
||||
EXPORTED_DIR = 0x4564, // Ed
|
||||
NFTOKEN_OFFER = 'q',
|
||||
NFTOKEN_BUY_OFFERS = 'h',
|
||||
NFTOKEN_SELL_OFFERS = 'i',
|
||||
@@ -149,14 +147,6 @@ emittedDir() noexcept
|
||||
return ret;
|
||||
}
|
||||
|
||||
Keylet const&
|
||||
exportedDir() noexcept
|
||||
{
|
||||
static Keylet const ret{
|
||||
ltDIR_NODE, indexHash(LedgerNameSpace::EXPORTED_DIR)};
|
||||
return ret;
|
||||
}
|
||||
|
||||
Keylet
|
||||
hookStateDir(AccountID const& id, uint256 const& ns) noexcept
|
||||
{
|
||||
@@ -169,12 +159,6 @@ emittedTxn(uint256 const& id) noexcept
|
||||
return {ltEMITTED_TXN, indexHash(LedgerNameSpace::EMITTED_TXN, id)};
|
||||
}
|
||||
|
||||
Keylet
|
||||
exportedTxn(uint256 const& id) noexcept
|
||||
{
|
||||
return {ltEXPORTED_TXN, indexHash(LedgerNameSpace::EXPORTED_TXN, id)};
|
||||
}
|
||||
|
||||
Keylet
|
||||
hook(AccountID const& id) noexcept
|
||||
{
|
||||
|
||||
@@ -380,15 +380,6 @@ LedgerFormats::LedgerFormats()
|
||||
{sfPreviousTxnLgrSeq, soeREQUIRED}
|
||||
},
|
||||
commonFields);
|
||||
|
||||
add(jss::ExportedTxn,
|
||||
ltEXPORTED_TXN,
|
||||
{
|
||||
{sfExportedTxn, soeOPTIONAL},
|
||||
{sfOwnerNode, soeREQUIRED},
|
||||
{sfLedgerSequence, soeREQUIRED},
|
||||
},
|
||||
commonFields);
|
||||
|
||||
// clang-format on
|
||||
}
|
||||
|
||||
@@ -103,7 +103,6 @@ CONSTRUCT_TYPED_SFIELD(sfHookEmitCount, "HookEmitCount", UINT16,
|
||||
CONSTRUCT_TYPED_SFIELD(sfHookExecutionIndex, "HookExecutionIndex", UINT16, 19);
|
||||
CONSTRUCT_TYPED_SFIELD(sfHookApiVersion, "HookApiVersion", UINT16, 20);
|
||||
CONSTRUCT_TYPED_SFIELD(sfHookStateScale, "HookStateScale", UINT16, 21);
|
||||
CONSTRUCT_TYPED_SFIELD(sfHookExportCount, "HookExportCount", UINT16, 22);
|
||||
|
||||
// 32-bit integers (common)
|
||||
CONSTRUCT_TYPED_SFIELD(sfNetworkID, "NetworkID", UINT32, 1);
|
||||
@@ -362,7 +361,6 @@ CONSTRUCT_UNTYPED_SFIELD(sfImportVLKey, "ImportVLKey", OBJECT,
|
||||
CONSTRUCT_UNTYPED_SFIELD(sfHookEmission, "HookEmission", OBJECT, 93);
|
||||
CONSTRUCT_UNTYPED_SFIELD(sfMintURIToken, "MintURIToken", OBJECT, 92);
|
||||
CONSTRUCT_UNTYPED_SFIELD(sfAmountEntry, "AmountEntry", OBJECT, 91);
|
||||
CONSTRUCT_UNTYPED_SFIELD(sfExportedTxn, "ExportedTxn", OBJECT, 90);
|
||||
|
||||
// array of objects
|
||||
// ARRAY/1 is reserved for end of array
|
||||
|
||||
@@ -141,7 +141,6 @@ transResults()
|
||||
MAKE_ERROR(telNON_LOCAL_EMITTED_TXN, "Emitted transaction cannot be applied because it was not generated locally."),
|
||||
MAKE_ERROR(telIMPORT_VL_KEY_NOT_RECOGNISED, "Import vl key was not recognized."),
|
||||
MAKE_ERROR(telCAN_NOT_QUEUE_IMPORT, "Import transaction was not able to be directly applied and cannot be queued."),
|
||||
MAKE_ERROR(telSHADOW_TICKET_REQUIRED, "The imported transaction uses a TicketSequence but no shadow ticket exists."),
|
||||
MAKE_ERROR(temMALFORMED, "Malformed transaction."),
|
||||
MAKE_ERROR(temBAD_AMOUNT, "Can only send positive amounts."),
|
||||
MAKE_ERROR(temBAD_CURRENCY, "Malformed: Bad currency."),
|
||||
|
||||
@@ -490,26 +490,6 @@ TxFormats::TxFormats()
|
||||
{sfStartTime, soeOPTIONAL},
|
||||
},
|
||||
commonFields);
|
||||
|
||||
add(jss::ExportSign,
|
||||
ttEXPORT_SIGN,
|
||||
{
|
||||
{sfSigner, soeREQUIRED},
|
||||
{sfLedgerSequence, soeREQUIRED},
|
||||
{sfTransactionHash, soeREQUIRED},
|
||||
},
|
||||
commonFields);
|
||||
|
||||
add(jss::Export,
|
||||
ttEXPORT,
|
||||
{
|
||||
{sfTransactionHash, soeREQUIRED},
|
||||
{sfExportedTxn, soeREQUIRED},
|
||||
{sfSigners, soeREQUIRED},
|
||||
{sfLedgerSequence, soeREQUIRED},
|
||||
},
|
||||
commonFields);
|
||||
|
||||
}
|
||||
|
||||
TxFormats const&
|
||||
|
||||
@@ -140,9 +140,6 @@ JSS(HookState); // ledger type.
|
||||
JSS(HookStateData); // field.
|
||||
JSS(HookStateKey); // field.
|
||||
JSS(EmittedTxn); // ledger type.
|
||||
JSS(ExportedTxn);
|
||||
JSS(Export);
|
||||
JSS(ExportSign);
|
||||
JSS(SignerList); // ledger type.
|
||||
JSS(SignerListSet); // transaction type.
|
||||
JSS(SigningPubKey); // field.
|
||||
|
||||
@@ -76,7 +76,6 @@ doSubscribe(RPC::JsonContext& context)
|
||||
{
|
||||
auto rspSub = make_RPCSub(
|
||||
context.app.getOPs(),
|
||||
context.app.getIOService(),
|
||||
context.app.getJobQueue(),
|
||||
strUrl,
|
||||
strUsername,
|
||||
|
||||
819
src/test/net/HTTPClient_test.cpp
Normal file
819
src/test/net/HTTPClient_test.cpp
Normal file
@@ -0,0 +1,819 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of rippled: https://github.com/ripple/rippled
|
||||
Copyright (c) 2024 Ripple Labs Inc.
|
||||
|
||||
Permission to use, copy, modify, and/or distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#include <ripple/basics/ByteUtilities.h>
|
||||
#include <ripple/net/HTTPClient.h>
|
||||
#include <test/jtx.h>
|
||||
|
||||
#include <boost/asio.hpp>
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
|
||||
namespace ripple {
|
||||
namespace test {
|
||||
|
||||
// Minimal TCP server for testing HTTPClient behavior.
|
||||
// Accepts connections and sends configurable HTTP responses.
|
||||
class MockHTTPServer
|
||||
{
|
||||
boost::asio::io_service ios_;
|
||||
std::unique_ptr<boost::asio::io_service::work> work_;
|
||||
boost::asio::ip::tcp::acceptor acceptor_;
|
||||
std::thread thread_;
|
||||
std::atomic<bool> running_{true};
|
||||
unsigned short port_;
|
||||
|
||||
// Metrics
|
||||
std::atomic<int> activeConnections_{0};
|
||||
std::atomic<int> peakConnections_{0};
|
||||
std::atomic<int> totalAccepted_{0};
|
||||
|
||||
// Configurable behavior
|
||||
std::atomic<int> statusCode_{200};
|
||||
std::atomic<int> delayMs_{0};
|
||||
std::atomic<bool> sendResponse_{true};
|
||||
std::atomic<bool> closeImmediately_{false};
|
||||
std::atomic<bool> noContentLength_{false};
|
||||
|
||||
public:
|
||||
MockHTTPServer()
|
||||
: work_(std::make_unique<boost::asio::io_service::work>(ios_))
|
||||
, acceptor_(
|
||||
ios_,
|
||||
boost::asio::ip::tcp::endpoint(
|
||||
boost::asio::ip::address::from_string("127.0.0.1"),
|
||||
0))
|
||||
{
|
||||
port_ = acceptor_.local_endpoint().port();
|
||||
accept();
|
||||
thread_ = std::thread([this] { ios_.run(); });
|
||||
}
|
||||
|
||||
~MockHTTPServer()
|
||||
{
|
||||
running_ = false;
|
||||
work_.reset(); // Allow io_service to stop.
|
||||
boost::system::error_code ec;
|
||||
acceptor_.close(ec);
|
||||
ios_.stop();
|
||||
if (thread_.joinable())
|
||||
thread_.join();
|
||||
}
|
||||
|
||||
unsigned short
|
||||
port() const
|
||||
{
|
||||
return port_;
|
||||
}
|
||||
int
|
||||
activeConnectionCount() const
|
||||
{
|
||||
return activeConnections_;
|
||||
}
|
||||
int
|
||||
peakConnectionCount() const
|
||||
{
|
||||
return peakConnections_;
|
||||
}
|
||||
int
|
||||
totalAcceptedCount() const
|
||||
{
|
||||
return totalAccepted_;
|
||||
}
|
||||
|
||||
void
|
||||
setStatus(int code)
|
||||
{
|
||||
statusCode_ = code;
|
||||
}
|
||||
void
|
||||
setDelay(int ms)
|
||||
{
|
||||
delayMs_ = ms;
|
||||
}
|
||||
void
|
||||
setSendResponse(bool send)
|
||||
{
|
||||
sendResponse_ = send;
|
||||
}
|
||||
void
|
||||
setCloseImmediately(bool close)
|
||||
{
|
||||
closeImmediately_ = close;
|
||||
}
|
||||
void
|
||||
setNoContentLength(bool noContentLength)
|
||||
{
|
||||
noContentLength_ = noContentLength;
|
||||
}
|
||||
|
||||
private:
|
||||
void
|
||||
accept()
|
||||
{
|
||||
auto sock = std::make_shared<boost::asio::ip::tcp::socket>(ios_);
|
||||
acceptor_.async_accept(*sock, [this, sock](auto ec) {
|
||||
if (!ec && running_)
|
||||
{
|
||||
++totalAccepted_;
|
||||
int current = ++activeConnections_;
|
||||
int prev = peakConnections_.load();
|
||||
while (current > prev &&
|
||||
!peakConnections_.compare_exchange_weak(prev, current))
|
||||
;
|
||||
|
||||
handleConnection(sock);
|
||||
accept();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void
|
||||
handleConnection(std::shared_ptr<boost::asio::ip::tcp::socket> sock)
|
||||
{
|
||||
if (closeImmediately_)
|
||||
{
|
||||
boost::system::error_code ec;
|
||||
sock->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
|
||||
sock->close(ec);
|
||||
--activeConnections_;
|
||||
return;
|
||||
}
|
||||
|
||||
auto buf = std::make_shared<boost::asio::streambuf>();
|
||||
boost::asio::async_read_until(
|
||||
*sock, *buf, "\r\n\r\n", [this, sock, buf](auto ec, size_t) {
|
||||
if (ec)
|
||||
{
|
||||
--activeConnections_;
|
||||
return;
|
||||
}
|
||||
|
||||
if (!sendResponse_)
|
||||
{
|
||||
// Hold connection open without responding.
|
||||
// The socket shared_ptr prevents cleanup.
|
||||
// This simulates a server that accepts but
|
||||
// never responds (e.g., overloaded).
|
||||
return;
|
||||
}
|
||||
|
||||
auto delay = delayMs_.load();
|
||||
if (delay > 0)
|
||||
{
|
||||
auto timer =
|
||||
std::make_shared<boost::asio::steady_timer>(ios_);
|
||||
timer->expires_from_now(std::chrono::milliseconds(delay));
|
||||
timer->async_wait(
|
||||
[this, sock, timer](auto) { sendHTTPResponse(sock); });
|
||||
}
|
||||
else
|
||||
{
|
||||
sendHTTPResponse(sock);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void
|
||||
sendHTTPResponse(std::shared_ptr<boost::asio::ip::tcp::socket> sock)
|
||||
{
|
||||
auto body = std::string("{}");
|
||||
std::string header =
|
||||
"HTTP/1.0 " + std::to_string(statusCode_.load()) + " OK\r\n";
|
||||
if (!noContentLength_)
|
||||
header += "Content-Length: " + std::to_string(body.size()) + "\r\n";
|
||||
header += "\r\n";
|
||||
auto response = std::make_shared<std::string>(header + body);
|
||||
|
||||
boost::asio::async_write(
|
||||
*sock,
|
||||
boost::asio::buffer(*response),
|
||||
[this, sock, response](auto, size_t) {
|
||||
boost::system::error_code ec;
|
||||
sock->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
|
||||
sock->close(ec);
|
||||
--activeConnections_;
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
class HTTPClient_test : public beast::unit_test::suite
|
||||
{
|
||||
// Helper: fire an HTTP request and track completion via atomic counter.
|
||||
void
|
||||
fireRequest(
|
||||
boost::asio::io_service& ios,
|
||||
std::string const& host,
|
||||
unsigned short port,
|
||||
std::atomic<int>& completed,
|
||||
beast::Journal& j,
|
||||
std::chrono::seconds timeout = std::chrono::seconds{5})
|
||||
{
|
||||
HTTPClient::request(
|
||||
false, // no SSL
|
||||
ios,
|
||||
host,
|
||||
port,
|
||||
[](boost::asio::streambuf& sb, std::string const& strHost) {
|
||||
std::ostream os(&sb);
|
||||
os << "POST / HTTP/1.0\r\n"
|
||||
<< "Host: " << strHost << "\r\n"
|
||||
<< "Content-Type: application/json\r\n"
|
||||
<< "Content-Length: 2\r\n"
|
||||
<< "\r\n"
|
||||
<< "{}";
|
||||
},
|
||||
megabytes(1),
|
||||
timeout,
|
||||
[&completed](
|
||||
const boost::system::error_code&, int, std::string const&) {
|
||||
++completed;
|
||||
return false;
|
||||
},
|
||||
j);
|
||||
}
|
||||
|
||||
//--------------------------------------------------------------------------
|
||||
|
||||
void
|
||||
testCleanupAfterSuccess()
|
||||
{
|
||||
testcase("Socket cleanup after successful response");
|
||||
|
||||
// After a successful HTTP request completes, the
|
||||
// HTTPClientImp should be destroyed and its socket
|
||||
// closed promptly — not held until the deadline fires.
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
|
||||
MockHTTPServer server;
|
||||
server.setStatus(200);
|
||||
|
||||
std::atomic<int> completed{0};
|
||||
auto j = env.app().journal("HTTPClient");
|
||||
|
||||
{
|
||||
boost::asio::io_service ios;
|
||||
fireRequest(ios, "127.0.0.1", server.port(), completed, j);
|
||||
ios.run();
|
||||
}
|
||||
|
||||
BEAST_EXPECT(completed == 1);
|
||||
BEAST_EXPECT(server.totalAcceptedCount() == 1);
|
||||
// After io_service.run() returns, the server should
|
||||
// see zero active connections — socket was released.
|
||||
BEAST_EXPECT(server.activeConnectionCount() == 0);
|
||||
}
|
||||
|
||||
void
|
||||
testCleanupAfter500()
|
||||
{
|
||||
testcase("Socket cleanup after HTTP 500");
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
|
||||
MockHTTPServer server;
|
||||
server.setStatus(500);
|
||||
|
||||
std::atomic<int> completed{0};
|
||||
auto j = env.app().journal("HTTPClient");
|
||||
|
||||
{
|
||||
boost::asio::io_service ios;
|
||||
fireRequest(ios, "127.0.0.1", server.port(), completed, j);
|
||||
ios.run();
|
||||
}
|
||||
|
||||
BEAST_EXPECT(completed == 1);
|
||||
BEAST_EXPECT(server.activeConnectionCount() == 0);
|
||||
}
|
||||
|
||||
void
|
||||
testCleanupAfterConnectionRefused()
|
||||
{
|
||||
testcase("Socket cleanup after connection refused");
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
|
||||
// Bind a port, then close it — guarantees nothing is listening.
|
||||
boost::asio::io_service tmp;
|
||||
boost::asio::ip::tcp::acceptor acc(
|
||||
tmp,
|
||||
boost::asio::ip::tcp::endpoint(
|
||||
boost::asio::ip::address::from_string("127.0.0.1"), 0));
|
||||
auto port = acc.local_endpoint().port();
|
||||
acc.close();
|
||||
|
||||
std::atomic<int> completed{0};
|
||||
auto j = env.app().journal("HTTPClient");
|
||||
|
||||
{
|
||||
boost::asio::io_service ios;
|
||||
fireRequest(ios, "127.0.0.1", port, completed, j);
|
||||
ios.run();
|
||||
}
|
||||
|
||||
// Callback should still be invoked (with error).
|
||||
BEAST_EXPECT(completed == 1);
|
||||
}
|
||||
|
||||
void
|
||||
testCleanupAfterTimeout()
|
||||
{
|
||||
testcase("Socket cleanup after timeout");
|
||||
|
||||
// Server accepts but never responds. HTTPClient should
|
||||
// time out, clean up, and invoke the callback.
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
|
||||
MockHTTPServer server;
|
||||
server.setSendResponse(false); // accept, read, but never respond
|
||||
|
||||
std::atomic<int> completed{0};
|
||||
auto j = env.app().journal("HTTPClient");
|
||||
|
||||
{
|
||||
boost::asio::io_service ios;
|
||||
// Short timeout to keep the test fast.
|
||||
fireRequest(
|
||||
ios,
|
||||
"127.0.0.1",
|
||||
server.port(),
|
||||
completed,
|
||||
j,
|
||||
std::chrono::seconds{2});
|
||||
ios.run();
|
||||
}
|
||||
|
||||
// Callback must be invoked even on timeout.
|
||||
BEAST_EXPECT(completed == 1);
|
||||
}
|
||||
|
||||
void
|
||||
testCleanupAfterServerCloseBeforeResponse()
|
||||
{
|
||||
testcase("Socket cleanup after server closes before response");
|
||||
|
||||
// Server accepts the connection then immediately closes
|
||||
// it without sending anything.
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
|
||||
MockHTTPServer server;
|
||||
server.setCloseImmediately(true);
|
||||
|
||||
std::atomic<int> completed{0};
|
||||
auto j = env.app().journal("HTTPClient");
|
||||
|
||||
{
|
||||
boost::asio::io_service ios;
|
||||
fireRequest(ios, "127.0.0.1", server.port(), completed, j);
|
||||
ios.run();
|
||||
}
|
||||
|
||||
BEAST_EXPECT(completed == 1);
|
||||
BEAST_EXPECT(server.activeConnectionCount() == 0);
|
||||
}
|
||||
|
||||
void
|
||||
testEOFCompletionCallsCallback()
|
||||
{
|
||||
testcase("EOF completion invokes callback (handleData bug)");
|
||||
|
||||
// HTTPClientImp::handleData has a code path where
|
||||
// mShutdown == eof results in logging "Complete." but
|
||||
// never calling invokeComplete(). This means:
|
||||
// - The completion callback is never invoked
|
||||
// - The deadline timer is never cancelled
|
||||
// - The socket is held open until the 30s deadline
|
||||
//
|
||||
// This test verifies the callback IS invoked after an
|
||||
// EOF response. If this test fails (completed == 0 after
|
||||
// ios.run()), the handleData EOF bug is confirmed.
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
|
||||
MockHTTPServer server;
|
||||
server.setStatus(200);
|
||||
|
||||
std::atomic<int> completed{0};
|
||||
auto j = env.app().journal("HTTPClient");
|
||||
|
||||
{
|
||||
boost::asio::io_service ios;
|
||||
fireRequest(
|
||||
ios,
|
||||
"127.0.0.1",
|
||||
server.port(),
|
||||
completed,
|
||||
j,
|
||||
std::chrono::seconds{3});
|
||||
ios.run();
|
||||
}
|
||||
|
||||
// If handleData EOF path doesn't call invokeComplete,
|
||||
// the callback won't fire until the deadline (3s) expires,
|
||||
// and even then handleDeadline doesn't invoke mComplete.
|
||||
// The io_service.run() will still return (deadline fires,
|
||||
// handleShutdown runs, all handlers done), but completed
|
||||
// will be 0.
|
||||
if (completed != 1)
|
||||
{
|
||||
log << " BUG CONFIRMED: handleData EOF path does not"
|
||||
<< " call invokeComplete(). Callback was not invoked."
|
||||
<< " Socket held open until deadline." << std::endl;
|
||||
}
|
||||
BEAST_EXPECT(completed == 1);
|
||||
}
|
||||
|
||||
void
|
||||
testConcurrentRequestCleanup()
|
||||
{
|
||||
testcase("Concurrent requests all clean up");
|
||||
|
||||
// Fire N requests at once on the same io_service.
|
||||
// All should complete and release their sockets.
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
|
||||
MockHTTPServer server;
|
||||
server.setStatus(200);
|
||||
|
||||
static constexpr int N = 50;
|
||||
std::atomic<int> completed{0};
|
||||
auto j = env.app().journal("HTTPClient");
|
||||
|
||||
{
|
||||
boost::asio::io_service ios;
|
||||
for (int i = 0; i < N; ++i)
|
||||
{
|
||||
fireRequest(ios, "127.0.0.1", server.port(), completed, j);
|
||||
}
|
||||
ios.run();
|
||||
}
|
||||
|
||||
BEAST_EXPECT(completed == N);
|
||||
// Brief sleep to let server-side shutdown complete.
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
BEAST_EXPECT(server.activeConnectionCount() == 0);
|
||||
|
||||
log << " Completed: " << completed
|
||||
<< ", Peak concurrent: " << server.peakConnectionCount()
|
||||
<< ", Active after: " << server.activeConnectionCount()
|
||||
<< std::endl;
|
||||
}
|
||||
|
||||
void
|
||||
testConcurrent500Cleanup()
|
||||
{
|
||||
testcase("Concurrent 500 requests all clean up");
|
||||
|
||||
// Fire N requests that all get 500 responses. Verify
|
||||
// all sockets are released and no FDs leak.
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
|
||||
MockHTTPServer server;
|
||||
server.setStatus(500);
|
||||
|
||||
static constexpr int N = 50;
|
||||
std::atomic<int> completed{0};
|
||||
auto j = env.app().journal("HTTPClient");
|
||||
|
||||
{
|
||||
boost::asio::io_service ios;
|
||||
for (int i = 0; i < N; ++i)
|
||||
{
|
||||
fireRequest(ios, "127.0.0.1", server.port(), completed, j);
|
||||
}
|
||||
ios.run();
|
||||
}
|
||||
|
||||
BEAST_EXPECT(completed == N);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
BEAST_EXPECT(server.activeConnectionCount() == 0);
|
||||
}
|
||||
|
||||
void
|
||||
testEOFWithoutContentLength()
|
||||
{
|
||||
testcase("EOF without Content-Length (handleData EOF path)");
|
||||
|
||||
// When a server sends a response WITHOUT Content-Length,
|
||||
// HTTPClientImp reads up to maxResponseSize. The server
|
||||
// closes the connection, causing EOF in handleData.
|
||||
//
|
||||
// In handleData, the EOF path (mShutdown == eof) logs
|
||||
// "Complete." but does NOT call invokeComplete(). This
|
||||
// means:
|
||||
// - mComplete (callback) is never invoked
|
||||
// - deadline timer is never cancelled
|
||||
// - socket + object held alive until deadline fires
|
||||
//
|
||||
// This test uses a SHORT deadline to keep it fast. If
|
||||
// the callback IS invoked, ios.run() returns quickly.
|
||||
// If NOT, ios.run() blocks until the deadline (2s).
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
|
||||
MockHTTPServer server;
|
||||
server.setStatus(200);
|
||||
server.setNoContentLength(true);
|
||||
|
||||
std::atomic<int> completed{0};
|
||||
auto j = env.app().journal("HTTPClient");
|
||||
|
||||
auto start = std::chrono::steady_clock::now();
|
||||
{
|
||||
boost::asio::io_service ios;
|
||||
fireRequest(
|
||||
ios,
|
||||
"127.0.0.1",
|
||||
server.port(),
|
||||
completed,
|
||||
j,
|
||||
std::chrono::seconds{2});
|
||||
ios.run();
|
||||
}
|
||||
auto elapsed = std::chrono::steady_clock::now() - start;
|
||||
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed)
|
||||
.count();
|
||||
|
||||
if (completed == 0)
|
||||
{
|
||||
log << " BUG CONFIRMED: handleData EOF path does not"
|
||||
<< " call invokeComplete(). Callback never invoked."
|
||||
<< " io_service.run() blocked for " << ms << "ms"
|
||||
<< " (deadline timeout)." << std::endl;
|
||||
}
|
||||
else
|
||||
{
|
||||
log << " Callback invoked in " << ms << "ms." << std::endl;
|
||||
}
|
||||
// This WILL fail if the EOF bug exists — the callback
|
||||
// is only invoked via the deadline timeout path, which
|
||||
// does NOT call mComplete.
|
||||
BEAST_EXPECT(completed == 1);
|
||||
}
|
||||
|
||||
void
|
||||
testPersistentIOServiceCleanup()
|
||||
{
|
||||
testcase("Cleanup on persistent io_service (no destructor mask)");
|
||||
|
||||
// Previous tests destroy the io_service after run(),
|
||||
// which releases all pending handlers' shared_ptrs.
|
||||
// This masks leaks. Here we use a PERSISTENT io_service
|
||||
// (with work guard, running on its own thread) and check
|
||||
// that HTTPClientImp objects are destroyed WITHOUT relying
|
||||
// on io_service destruction.
|
||||
//
|
||||
// We track the object's lifetime via the completion
|
||||
// callback — if it fires, the async chain completed
|
||||
// normally. If it doesn't fire within a reasonable time
|
||||
// but the io_service is still running, something is stuck.
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
|
||||
MockHTTPServer server;
|
||||
server.setStatus(200);
|
||||
|
||||
std::atomic<int> completed{0};
|
||||
auto j = env.app().journal("HTTPClient");
|
||||
|
||||
// Persistent io_service — stays alive the whole test.
|
||||
boost::asio::io_service ios;
|
||||
auto work = std::make_unique<boost::asio::io_service::work>(ios);
|
||||
std::thread runner([&ios] { ios.run(); });
|
||||
|
||||
// Fire request on the persistent io_service.
|
||||
HTTPClient::request(
|
||||
false,
|
||||
ios,
|
||||
"127.0.0.1",
|
||||
server.port(),
|
||||
[](boost::asio::streambuf& sb, std::string const& strHost) {
|
||||
std::ostream os(&sb);
|
||||
os << "POST / HTTP/1.0\r\n"
|
||||
<< "Host: " << strHost << "\r\n"
|
||||
<< "Content-Type: application/json\r\n"
|
||||
<< "Content-Length: 2\r\n"
|
||||
<< "\r\n"
|
||||
<< "{}";
|
||||
},
|
||||
megabytes(1),
|
||||
std::chrono::seconds{5},
|
||||
[&completed](
|
||||
const boost::system::error_code&, int, std::string const&) {
|
||||
++completed;
|
||||
return false;
|
||||
},
|
||||
j);
|
||||
|
||||
// Wait for completion without destroying io_service.
|
||||
auto deadline =
|
||||
std::chrono::steady_clock::now() + std::chrono::seconds{5};
|
||||
while (completed == 0 && std::chrono::steady_clock::now() < deadline)
|
||||
{
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
||||
}
|
||||
|
||||
BEAST_EXPECT(completed == 1);
|
||||
|
||||
// Give server-side shutdown a moment.
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
BEAST_EXPECT(server.activeConnectionCount() == 0);
|
||||
|
||||
if (server.activeConnectionCount() != 0)
|
||||
{
|
||||
log << " BUG: Socket still open on persistent"
|
||||
<< " io_service. FD leaked." << std::endl;
|
||||
}
|
||||
|
||||
// Clean shutdown.
|
||||
work.reset();
|
||||
ios.stop();
|
||||
runner.join();
|
||||
}
|
||||
|
||||
void
|
||||
testPersistentIOService500Cleanup()
|
||||
{
|
||||
testcase("500 cleanup on persistent io_service");
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
|
||||
MockHTTPServer server;
|
||||
server.setStatus(500);
|
||||
|
||||
static constexpr int N = 20;
|
||||
std::atomic<int> completed{0};
|
||||
auto j = env.app().journal("HTTPClient");
|
||||
|
||||
boost::asio::io_service ios;
|
||||
auto work = std::make_unique<boost::asio::io_service::work>(ios);
|
||||
std::thread runner([&ios] { ios.run(); });
|
||||
|
||||
for (int i = 0; i < N; ++i)
|
||||
{
|
||||
HTTPClient::request(
|
||||
false,
|
||||
ios,
|
||||
"127.0.0.1",
|
||||
server.port(),
|
||||
[](boost::asio::streambuf& sb, std::string const& strHost) {
|
||||
std::ostream os(&sb);
|
||||
os << "POST / HTTP/1.0\r\n"
|
||||
<< "Host: " << strHost << "\r\n"
|
||||
<< "Content-Type: application/json\r\n"
|
||||
<< "Content-Length: 2\r\n"
|
||||
<< "\r\n"
|
||||
<< "{}";
|
||||
},
|
||||
megabytes(1),
|
||||
std::chrono::seconds{5},
|
||||
[&completed](
|
||||
const boost::system::error_code&, int, std::string const&) {
|
||||
++completed;
|
||||
return false;
|
||||
},
|
||||
j);
|
||||
}
|
||||
|
||||
auto deadline =
|
||||
std::chrono::steady_clock::now() + std::chrono::seconds{10};
|
||||
while (completed < N && std::chrono::steady_clock::now() < deadline)
|
||||
{
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
||||
}
|
||||
|
||||
BEAST_EXPECT(completed == N);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
BEAST_EXPECT(server.activeConnectionCount() == 0);
|
||||
|
||||
log << " Completed: " << completed << "/" << N
|
||||
<< ", Active connections after: " << server.activeConnectionCount()
|
||||
<< std::endl;
|
||||
|
||||
work.reset();
|
||||
ios.stop();
|
||||
runner.join();
|
||||
}
|
||||
|
||||
void
|
||||
testGetSelfReferenceCleanup()
|
||||
{
|
||||
testcase("get() shared_from_this cycle releases");
|
||||
|
||||
// HTTPClientImp::get() binds shared_from_this() into
|
||||
// mBuild via makeGet. This creates a reference cycle:
|
||||
// object -> mBuild -> shared_ptr<object>
|
||||
// The object can only be destroyed if mBuild is cleared.
|
||||
// Since mBuild is never explicitly cleared, this may be
|
||||
// a permanent FD leak.
|
||||
//
|
||||
// This test fires a GET request and checks whether the
|
||||
// HTTPClientImp is destroyed (and socket closed) after
|
||||
// completion.
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
|
||||
MockHTTPServer server;
|
||||
server.setStatus(200);
|
||||
|
||||
std::atomic<int> completed{0};
|
||||
auto j = env.app().journal("HTTPClient");
|
||||
|
||||
{
|
||||
boost::asio::io_service ios;
|
||||
HTTPClient::get(
|
||||
false, // no SSL
|
||||
ios,
|
||||
"127.0.0.1",
|
||||
server.port(),
|
||||
"/test",
|
||||
megabytes(1),
|
||||
std::chrono::seconds{5},
|
||||
[&completed](
|
||||
const boost::system::error_code&, int, std::string const&) {
|
||||
++completed;
|
||||
return false;
|
||||
},
|
||||
j);
|
||||
ios.run();
|
||||
}
|
||||
|
||||
BEAST_EXPECT(completed == 1);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
|
||||
// If the get() self-reference cycle leaks, the server
|
||||
// will still show an active connection here (the socket
|
||||
// in the leaked HTTPClientImp is never closed).
|
||||
if (server.activeConnectionCount() != 0)
|
||||
{
|
||||
log << " BUG CONFIRMED: get() self-reference cycle"
|
||||
<< " prevents HTTPClientImp destruction."
|
||||
<< " Socket FD leaked." << std::endl;
|
||||
}
|
||||
BEAST_EXPECT(server.activeConnectionCount() == 0);
|
||||
}
|
||||
|
||||
public:
|
||||
void
|
||||
run() override
|
||||
{
|
||||
testCleanupAfterSuccess();
|
||||
testCleanupAfter500();
|
||||
testCleanupAfterConnectionRefused();
|
||||
testCleanupAfterTimeout();
|
||||
testCleanupAfterServerCloseBeforeResponse();
|
||||
testEOFCompletionCallsCallback();
|
||||
testConcurrentRequestCleanup();
|
||||
testConcurrent500Cleanup();
|
||||
testEOFWithoutContentLength();
|
||||
testPersistentIOServiceCleanup();
|
||||
testPersistentIOService500Cleanup();
|
||||
testGetSelfReferenceCleanup();
|
||||
}
|
||||
};
|
||||
|
||||
BEAST_DEFINE_TESTSUITE(HTTPClient, net, ripple);
|
||||
|
||||
} // namespace test
|
||||
} // namespace ripple
|
||||
Reference in New Issue
Block a user