mirror of
https://github.com/Xahau/xahaud.git
synced 2026-04-05 11:22:25 +00:00
Compare commits
7 Commits
hook-helpe
...
subscripti
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7241fed6d0 | ||
|
|
a6dfb40413 | ||
|
|
3e0d6b9cd2 | ||
|
|
a8388e48a4 | ||
|
|
49908096d5 | ||
|
|
12e1afb694 | ||
|
|
c355ad9971 |
15
.github/actions/xahau-ga-dependencies/action.yml
vendored
15
.github/actions/xahau-ga-dependencies/action.yml
vendored
@@ -134,10 +134,17 @@ runs:
|
||||
- name: Export custom recipes
|
||||
shell: bash
|
||||
run: |
|
||||
conan export external/snappy --version 1.1.10 --user xahaud --channel stable
|
||||
conan export external/soci --version 4.0.3 --user xahaud --channel stable
|
||||
conan export external/wasmedge --version 0.11.2 --user xahaud --channel stable
|
||||
|
||||
# Export snappy if not already exported
|
||||
conan list snappy/1.1.10@xahaud/stable 2>/dev/null | (grep -q "not found" && exit 1 || exit 0) || \
|
||||
conan export external/snappy --version 1.1.10 --user xahaud --channel stable
|
||||
|
||||
# Export soci if not already exported
|
||||
conan list soci/4.0.3@xahaud/stable 2>/dev/null | (grep -q "not found" && exit 1 || exit 0) || \
|
||||
conan export external/soci --version 4.0.3 --user xahaud --channel stable
|
||||
|
||||
# Export wasmedge if not already exported
|
||||
conan list wasmedge/0.11.2@xahaud/stable 2>/dev/null | (grep -q "not found" && exit 1 || exit 0) || \
|
||||
conan export external/wasmedge --version 0.11.2 --user xahaud --channel stable
|
||||
- name: Install dependencies
|
||||
shell: bash
|
||||
env:
|
||||
|
||||
12
.github/workflows/xahau-ga-macos.yml
vendored
12
.github/workflows/xahau-ga-macos.yml
vendored
@@ -43,14 +43,22 @@ jobs:
|
||||
# To isolate environments for each Runner, instead of installing globally with brew,
|
||||
# use mise to isolate environments for each Runner directory.
|
||||
- name: Setup toolchain (mise)
|
||||
uses: jdx/mise-action@v2
|
||||
uses: jdx/mise-action@v3.6.1
|
||||
with:
|
||||
cache: false
|
||||
install: true
|
||||
mise_toml: |
|
||||
[tools]
|
||||
cmake = "3.23.1"
|
||||
python = "3.12"
|
||||
pipx = "latest"
|
||||
conan = "2"
|
||||
ninja = "latest"
|
||||
ccache = "latest"
|
||||
|
||||
- name: Install tools via mise
|
||||
run: |
|
||||
mise install
|
||||
mise use cmake@3.23.1 python@3.12 pipx@latest conan@2 ninja@latest ccache@latest
|
||||
mise reshim
|
||||
echo "$HOME/.local/share/mise/shims" >> "$GITHUB_PATH"
|
||||
|
||||
|
||||
2
.gitignore
vendored
2
.gitignore
vendored
@@ -121,5 +121,3 @@ CMakeUserPresets.json
|
||||
bld.rippled/
|
||||
|
||||
generated
|
||||
guard_checker
|
||||
guard_checker.dSYM
|
||||
|
||||
@@ -957,6 +957,7 @@ if (tests)
|
||||
subdir: net
|
||||
#]===============================]
|
||||
src/test/net/DatabaseDownloader_test.cpp
|
||||
src/test/net/HTTPClient_test.cpp
|
||||
#[===============================[
|
||||
test sources:
|
||||
subdir: nodestore
|
||||
|
||||
@@ -149,6 +149,7 @@ test.ledger > ripple.ledger
|
||||
test.ledger > ripple.protocol
|
||||
test.ledger > test.jtx
|
||||
test.ledger > test.toplevel
|
||||
test.net > ripple.basics
|
||||
test.net > ripple.net
|
||||
test.net > test.jtx
|
||||
test.net > test.toplevel
|
||||
|
||||
@@ -5,7 +5,6 @@
|
||||
#include <memory>
|
||||
#include <optional>
|
||||
#include <ostream>
|
||||
#include <set>
|
||||
#include <stack>
|
||||
#include <string>
|
||||
#include <string_view>
|
||||
@@ -283,8 +282,7 @@ check_guard(
|
||||
* might have unforeseen consequences, without also rolling back further
|
||||
* changes that are fine.
|
||||
*/
|
||||
uint64_t rulesVersion = 0,
|
||||
std::set<int>* out_callees = nullptr
|
||||
uint64_t rulesVersion = 0
|
||||
|
||||
)
|
||||
{
|
||||
@@ -494,27 +492,17 @@ check_guard(
|
||||
{
|
||||
REQUIRE(1);
|
||||
uint64_t callee_idx = LEB();
|
||||
|
||||
// record user-defined function calls if tracking is enabled
|
||||
// disallow calling of user defined functions inside a hook
|
||||
if (callee_idx > last_import_idx)
|
||||
{
|
||||
if (out_callees != nullptr)
|
||||
{
|
||||
// record the callee for call graph analysis
|
||||
out_callees->insert(callee_idx);
|
||||
}
|
||||
else
|
||||
{
|
||||
// if not tracking, maintain original behavior: reject
|
||||
GUARDLOG(hook::log::CALL_ILLEGAL)
|
||||
<< "GuardCheck "
|
||||
<< "Hook calls a function outside of the whitelisted "
|
||||
"imports "
|
||||
<< "codesec: " << codesec << " hook byte offset: " << i
|
||||
<< "\n";
|
||||
GUARDLOG(hook::log::CALL_ILLEGAL)
|
||||
<< "GuardCheck "
|
||||
<< "Hook calls a function outside of the whitelisted "
|
||||
"imports "
|
||||
<< "codesec: " << codesec << " hook byte offset: " << i
|
||||
<< "\n";
|
||||
|
||||
return {};
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
// enforce guard call limit
|
||||
@@ -849,42 +837,6 @@ validateGuards(
|
||||
*/
|
||||
uint64_t rulesVersion = 0)
|
||||
{
|
||||
// Structure to track function call graph information
|
||||
struct FunctionInfo
|
||||
{
|
||||
int func_idx;
|
||||
std::set<int> callees; // functions this function calls
|
||||
std::set<int> callers; // functions that call this function
|
||||
bool has_loops; // whether this function contains loops
|
||||
uint64_t local_wce; // local worst-case execution count
|
||||
uint64_t total_wce; // total WCE including callees
|
||||
bool wce_calculated; // whether total_wce has been computed
|
||||
bool in_calculation; // for cycle detection in WCE calculation
|
||||
|
||||
FunctionInfo()
|
||||
: func_idx(-1)
|
||||
, has_loops(false)
|
||||
, local_wce(0)
|
||||
, total_wce(0)
|
||||
, wce_calculated(false)
|
||||
, in_calculation(false)
|
||||
{
|
||||
}
|
||||
|
||||
FunctionInfo(int idx, uint64_t local_wce_val, bool has_loops_val)
|
||||
: func_idx(idx)
|
||||
, has_loops(has_loops_val)
|
||||
, local_wce(local_wce_val)
|
||||
, total_wce(0)
|
||||
, wce_calculated(false)
|
||||
, in_calculation(false)
|
||||
{
|
||||
}
|
||||
};
|
||||
|
||||
// Call graph: maps function index to its information
|
||||
std::map<int, FunctionInfo> call_graph;
|
||||
|
||||
uint64_t byteCount = wasm.size();
|
||||
|
||||
// 63 bytes is the smallest possible valid hook wasm
|
||||
@@ -1224,12 +1176,6 @@ validateGuards(
|
||||
if (DEBUG_GUARD)
|
||||
printf("Function map: func %d -> type %d\n", j, type_idx);
|
||||
func_type_map[j] = type_idx;
|
||||
|
||||
// Step 4: Initialize FunctionInfo for each user-defined
|
||||
// function func_idx starts from last_import_number + 1
|
||||
int actual_func_idx = last_import_number + 1 + j;
|
||||
call_graph[actual_func_idx] = FunctionInfo();
|
||||
call_graph[actual_func_idx].func_idx = actual_func_idx;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1271,6 +1217,9 @@ validateGuards(
|
||||
return {};
|
||||
}
|
||||
|
||||
int64_t maxInstrCountHook = 0;
|
||||
int64_t maxInstrCountCbak = 0;
|
||||
|
||||
// second pass... where we check all the guard function calls follow the
|
||||
// guard rules minimal other validation in this pass because first pass
|
||||
// caught most of it
|
||||
@@ -1304,7 +1253,6 @@ validateGuards(
|
||||
std::optional<
|
||||
std::reference_wrapper<std::vector<uint8_t> const>>
|
||||
first_signature;
|
||||
bool helper_function = false;
|
||||
if (auto const& usage = import_type_map.find(j);
|
||||
usage != import_type_map.end())
|
||||
{
|
||||
@@ -1340,7 +1288,7 @@ validateGuards(
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (j == hook_type_idx) // hook() or cbak() function type
|
||||
else if (j == hook_type_idx)
|
||||
{
|
||||
// pass
|
||||
}
|
||||
@@ -1353,8 +1301,7 @@ validateGuards(
|
||||
<< "Codesec: " << section_type << " "
|
||||
<< "Local: " << j << " "
|
||||
<< "Offset: " << i << "\n";
|
||||
// return {};
|
||||
helper_function = true;
|
||||
return {};
|
||||
}
|
||||
|
||||
int param_count = parseLeb128(wasm, i, &i);
|
||||
@@ -1371,19 +1318,12 @@ validateGuards(
|
||||
return {};
|
||||
}
|
||||
}
|
||||
else if (helper_function)
|
||||
{
|
||||
// pass
|
||||
}
|
||||
else if (param_count != (*first_signature).get().size() - 1)
|
||||
{
|
||||
GUARDLOG(hook::log::FUNC_TYPE_INVALID)
|
||||
<< "Malformed transaction. "
|
||||
<< "Hook API: " << *first_name
|
||||
<< " has the wrong number of parameters.\n"
|
||||
<< "param_count: " << param_count << " "
|
||||
<< "first_signature: "
|
||||
<< (*first_signature).get().size() - 1 << "\n";
|
||||
<< " has the wrong number of parameters.\n";
|
||||
return {};
|
||||
}
|
||||
|
||||
@@ -1430,10 +1370,6 @@ validateGuards(
|
||||
return {};
|
||||
}
|
||||
}
|
||||
else if (helper_function)
|
||||
{
|
||||
// pass
|
||||
}
|
||||
else if ((*first_signature).get()[k + 1] != param_type)
|
||||
{
|
||||
GUARDLOG(hook::log::FUNC_PARAM_INVALID)
|
||||
@@ -1510,10 +1446,6 @@ validateGuards(
|
||||
return {};
|
||||
}
|
||||
}
|
||||
else if (helper_function)
|
||||
{
|
||||
// pass
|
||||
}
|
||||
else if ((*first_signature).get()[0] != result_type)
|
||||
{
|
||||
GUARDLOG(hook::log::FUNC_RETURN_INVALID)
|
||||
@@ -1565,17 +1497,6 @@ validateGuards(
|
||||
// execution to here means we are up to the actual expr for the
|
||||
// codesec/function
|
||||
|
||||
// Step 5: Calculate actual function index and prepare callees
|
||||
// tracking
|
||||
int actual_func_idx = last_import_number + 1 + j;
|
||||
std::set<int>* out_callees_ptr = nullptr;
|
||||
|
||||
// Only track callees if this function is in the call_graph
|
||||
if (call_graph.find(actual_func_idx) != call_graph.end())
|
||||
{
|
||||
out_callees_ptr = &call_graph[actual_func_idx].callees;
|
||||
}
|
||||
|
||||
auto valid = check_guard(
|
||||
wasm,
|
||||
j,
|
||||
@@ -1585,188 +1506,33 @@ validateGuards(
|
||||
last_import_number,
|
||||
guardLog,
|
||||
guardLogAccStr,
|
||||
rulesVersion,
|
||||
out_callees_ptr);
|
||||
rulesVersion);
|
||||
|
||||
if (!valid)
|
||||
return {};
|
||||
|
||||
// Step 5: Store local WCE and build bidirectional call
|
||||
// relationships
|
||||
if (call_graph.find(actual_func_idx) != call_graph.end())
|
||||
if (hook_func_idx && *hook_func_idx == j)
|
||||
maxInstrCountHook = *valid;
|
||||
else if (cbak_func_idx && *cbak_func_idx == j)
|
||||
maxInstrCountCbak = *valid;
|
||||
else
|
||||
{
|
||||
call_graph[actual_func_idx].local_wce = *valid;
|
||||
|
||||
// Build bidirectional relationships: for each callee, add
|
||||
// this function as a caller
|
||||
for (int callee_idx : call_graph[actual_func_idx].callees)
|
||||
{
|
||||
if (call_graph.find(callee_idx) != call_graph.end())
|
||||
{
|
||||
call_graph[callee_idx].callers.insert(
|
||||
actual_func_idx);
|
||||
}
|
||||
}
|
||||
if (DEBUG_GUARD)
|
||||
printf(
|
||||
"code section: %d not hook_func_idx: %d or "
|
||||
"cbak_func_idx: %d\n",
|
||||
j,
|
||||
*hook_func_idx,
|
||||
(cbak_func_idx ? *cbak_func_idx : -1));
|
||||
// assert(false);
|
||||
}
|
||||
|
||||
// Note: We will calculate total WCE later after processing all
|
||||
// functions
|
||||
i = code_end;
|
||||
}
|
||||
}
|
||||
i = next_section;
|
||||
}
|
||||
|
||||
// Step 6: Cycle detection using DFS
|
||||
// Lambda function for DFS-based cycle detection
|
||||
std::set<int> visited;
|
||||
std::set<int> rec_stack;
|
||||
std::function<bool(int)> detect_cycles_dfs = [&](int func_idx) -> bool {
|
||||
if (rec_stack.find(func_idx) != rec_stack.end())
|
||||
{
|
||||
// Found a cycle: func_idx is already in the recursion stack
|
||||
return true;
|
||||
}
|
||||
// execution to here means guards are installed correctly
|
||||
|
||||
if (visited.find(func_idx) != visited.end())
|
||||
{
|
||||
// Already visited and no cycle found from this node
|
||||
return false;
|
||||
}
|
||||
|
||||
visited.insert(func_idx);
|
||||
rec_stack.insert(func_idx);
|
||||
|
||||
// Check all callees
|
||||
if (call_graph.find(func_idx) != call_graph.end())
|
||||
{
|
||||
for (int callee_idx : call_graph[func_idx].callees)
|
||||
{
|
||||
if (detect_cycles_dfs(callee_idx))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
rec_stack.erase(func_idx);
|
||||
return false;
|
||||
};
|
||||
|
||||
// Run cycle detection on all user-defined functions
|
||||
for (const auto& [func_idx, func_info] : call_graph)
|
||||
{
|
||||
if (detect_cycles_dfs(func_idx))
|
||||
{
|
||||
GUARDLOG(hook::log::CALL_ILLEGAL)
|
||||
<< "GuardCheck: Recursive function calls detected. "
|
||||
<< "Hooks cannot contain recursive or mutually recursive "
|
||||
"functions.\n";
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
// Step 7: Calculate total WCE for each function using bottom-up approach
|
||||
// Lambda function for recursive WCE calculation with memoization
|
||||
std::function<uint64_t(int)> calculate_function_wce =
|
||||
[&](int func_idx) -> uint64_t {
|
||||
// Check if function exists in call graph
|
||||
if (call_graph.find(func_idx) == call_graph.end())
|
||||
{
|
||||
// This is an imported function, WCE = 0 (already accounted for)
|
||||
return 0;
|
||||
}
|
||||
|
||||
FunctionInfo& func_info = call_graph[func_idx];
|
||||
|
||||
// If already calculated, return cached result
|
||||
if (func_info.wce_calculated)
|
||||
{
|
||||
return func_info.total_wce;
|
||||
}
|
||||
|
||||
// Detect circular dependency in WCE calculation (should not happen
|
||||
// after cycle detection)
|
||||
if (func_info.in_calculation)
|
||||
{
|
||||
GUARDLOG(hook::log::CALL_ILLEGAL)
|
||||
<< "GuardCheck: Internal error - circular dependency detected "
|
||||
"during WCE calculation.\n";
|
||||
return 0xFFFFFFFFU; // Return large value to trigger overflow error
|
||||
}
|
||||
|
||||
func_info.in_calculation = true;
|
||||
|
||||
// Start with local WCE
|
||||
uint64_t total = func_info.local_wce;
|
||||
|
||||
// Add WCE of all callees
|
||||
for (int callee_idx : func_info.callees)
|
||||
{
|
||||
uint64_t callee_wce = calculate_function_wce(callee_idx);
|
||||
|
||||
// Check for overflow
|
||||
if (total > 0xFFFFU || callee_wce > 0xFFFFU ||
|
||||
(total + callee_wce) > 0xFFFFU)
|
||||
{
|
||||
func_info.in_calculation = false;
|
||||
return 0xFFFFFFFFU; // Signal overflow
|
||||
}
|
||||
|
||||
total += callee_wce;
|
||||
}
|
||||
|
||||
func_info.total_wce = total;
|
||||
func_info.wce_calculated = true;
|
||||
func_info.in_calculation = false;
|
||||
|
||||
return total;
|
||||
};
|
||||
|
||||
// Calculate WCE for hook and cbak functions
|
||||
int64_t hook_wce_actual = 0;
|
||||
int64_t cbak_wce_actual = 0;
|
||||
|
||||
if (hook_func_idx)
|
||||
{
|
||||
int actual_hook_idx = last_import_number + 1 + *hook_func_idx;
|
||||
hook_wce_actual = calculate_function_wce(actual_hook_idx);
|
||||
|
||||
if (hook_wce_actual >= 0xFFFFU)
|
||||
{
|
||||
GUARDLOG(hook::log::INSTRUCTION_EXCESS)
|
||||
<< "GuardCheck: hook() function exceeds maximum instruction "
|
||||
"count (65535). "
|
||||
<< "Total WCE including called functions: " << hook_wce_actual
|
||||
<< "\n";
|
||||
return {};
|
||||
}
|
||||
|
||||
if (DEBUG_GUARD)
|
||||
printf("hook() total WCE: %ld\n", hook_wce_actual);
|
||||
}
|
||||
|
||||
if (cbak_func_idx)
|
||||
{
|
||||
int actual_cbak_idx = last_import_number + 1 + *cbak_func_idx;
|
||||
cbak_wce_actual = calculate_function_wce(actual_cbak_idx);
|
||||
|
||||
if (cbak_wce_actual >= 0xFFFFU)
|
||||
{
|
||||
GUARDLOG(hook::log::INSTRUCTION_EXCESS)
|
||||
<< "GuardCheck: cbak() function exceeds maximum instruction "
|
||||
"count (65535). "
|
||||
<< "Total WCE including called functions: " << cbak_wce_actual
|
||||
<< "\n";
|
||||
return {};
|
||||
}
|
||||
|
||||
if (DEBUG_GUARD)
|
||||
printf("cbak() total WCE: %ld\n", cbak_wce_actual);
|
||||
}
|
||||
|
||||
// execution to here means guards are installed correctly and WCE is within
|
||||
// limits
|
||||
|
||||
return std::pair<uint64_t, uint64_t>{hook_wce_actual, cbak_wce_actual};
|
||||
return std::pair<uint64_t, uint64_t>{maxInstrCountHook, maxInstrCountCbak};
|
||||
}
|
||||
|
||||
@@ -22,7 +22,6 @@
|
||||
|
||||
#include <ripple/core/JobQueue.h>
|
||||
#include <ripple/net/InfoSub.h>
|
||||
#include <boost/asio/io_service.hpp>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
@@ -39,11 +38,9 @@ protected:
|
||||
explicit RPCSub(InfoSub::Source& source);
|
||||
};
|
||||
|
||||
// VFALCO Why is the io_service needed?
|
||||
std::shared_ptr<RPCSub>
|
||||
make_RPCSub(
|
||||
InfoSub::Source& source,
|
||||
boost::asio::io_service& io_service,
|
||||
JobQueue& jobQueue,
|
||||
std::string const& strUrl,
|
||||
std::string const& strUsername,
|
||||
|
||||
@@ -451,6 +451,12 @@ public:
|
||||
if (mShutdown)
|
||||
{
|
||||
JLOG(j_.trace()) << "Complete.";
|
||||
|
||||
mResponse.commit(bytes_transferred);
|
||||
std::string strBody{
|
||||
{std::istreambuf_iterator<char>(&mResponse)},
|
||||
std::istreambuf_iterator<char>()};
|
||||
invokeComplete(ecResult, mStatus, mBody + strBody);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
||||
@@ -1805,6 +1805,7 @@ rpcClient(
|
||||
}
|
||||
|
||||
{
|
||||
//@@start blocking-request
|
||||
boost::asio::io_service isService;
|
||||
RPCCall::fromNetwork(
|
||||
isService,
|
||||
@@ -1828,6 +1829,7 @@ rpcClient(
|
||||
headers);
|
||||
isService.run(); // This blocks until there are no more
|
||||
// outstanding async calls.
|
||||
//@@end blocking-request
|
||||
}
|
||||
if (jvOutput.isMember("result"))
|
||||
{
|
||||
@@ -1946,6 +1948,7 @@ fromNetwork(
|
||||
// HTTP call?
|
||||
auto constexpr RPC_NOTIFY = 30s;
|
||||
|
||||
//@@start async-request
|
||||
HTTPClient::request(
|
||||
bSSL,
|
||||
io_service,
|
||||
@@ -1970,6 +1973,7 @@ fromNetwork(
|
||||
std::placeholders::_3,
|
||||
j),
|
||||
j);
|
||||
//@@end async-request
|
||||
}
|
||||
|
||||
} // namespace RPCCall
|
||||
|
||||
@@ -33,14 +33,12 @@ class RPCSubImp : public RPCSub
|
||||
public:
|
||||
RPCSubImp(
|
||||
InfoSub::Source& source,
|
||||
boost::asio::io_service& io_service,
|
||||
JobQueue& jobQueue,
|
||||
std::string const& strUrl,
|
||||
std::string const& strUsername,
|
||||
std::string const& strPassword,
|
||||
Logs& logs)
|
||||
: RPCSub(source)
|
||||
, m_io_service(io_service)
|
||||
, m_jobQueue(jobQueue)
|
||||
, mUrl(strUrl)
|
||||
, mSSL(false)
|
||||
@@ -78,14 +76,14 @@ public:
|
||||
{
|
||||
std::lock_guard sl(mLock);
|
||||
|
||||
// Wietse: we're not going to limit this, this is admin-port only, scale
|
||||
// accordingly Dropping events just like this results in inconsistent
|
||||
// data on the receiving end if (mDeque.size() >= eventQueueMax)
|
||||
// {
|
||||
// // Drop the previous event.
|
||||
// JLOG(j_.warn()) << "RPCCall::fromNetwork drop";
|
||||
// mDeque.pop_back();
|
||||
// }
|
||||
if (mDeque.size() >= maxQueueSize)
|
||||
{
|
||||
JLOG(j_.warn())
|
||||
<< "RPCCall::fromNetwork drop: queue full (" << mDeque.size()
|
||||
<< "), seq=" << mSeq << ", endpoint=" << mIp;
|
||||
++mSeq;
|
||||
return;
|
||||
}
|
||||
|
||||
auto jm = broadcast ? j_.debug() : j_.info();
|
||||
JLOG(jm) << "RPCCall::fromNetwork push: " << jvObj;
|
||||
@@ -121,48 +119,49 @@ public:
|
||||
}
|
||||
|
||||
private:
|
||||
// XXX Could probably create a bunch of send jobs in a single get of the
|
||||
// lock.
|
||||
// Maximum concurrent HTTP deliveries per batch. Bounds file
|
||||
// descriptor usage while still allowing parallel delivery to
|
||||
// capable endpoints. With a 1024 FD process limit shared across
|
||||
// peers, clients, and the node store, 32 per subscriber is a
|
||||
// meaningful but survivable chunk even with multiple subscribers.
|
||||
static constexpr int maxInFlight = 32;
|
||||
|
||||
// Maximum queued events before dropping. At ~5-10KB per event
|
||||
// this is ~80-160MB worst case — trivial memory-wise. The real
|
||||
// purpose is detecting a hopelessly behind endpoint: at 100+
|
||||
// events per ledger (every ~4s), 16384 events is ~10 minutes
|
||||
// of buffer. Consumers detect gaps via the seq field.
|
||||
static constexpr std::size_t maxQueueSize = 16384;
|
||||
|
||||
void
|
||||
sendThread()
|
||||
{
|
||||
Json::Value jvEvent;
|
||||
bool bSend;
|
||||
|
||||
do
|
||||
{
|
||||
// Local io_service per batch — cheap to create (just an
|
||||
// internal event queue, no threads, no syscalls). Using a
|
||||
// local io_service is what makes .run() block until exactly
|
||||
// this batch completes, giving us flow control. Same
|
||||
// pattern used by rpcClient() in RPCCall.cpp for CLI
|
||||
// commands.
|
||||
boost::asio::io_service io_service;
|
||||
int dispatched = 0;
|
||||
|
||||
{
|
||||
// Obtain the lock to manipulate the queue and change sending.
|
||||
std::lock_guard sl(mLock);
|
||||
|
||||
if (mDeque.empty())
|
||||
{
|
||||
mSending = false;
|
||||
bSend = false;
|
||||
}
|
||||
else
|
||||
while (!mDeque.empty() && dispatched < maxInFlight)
|
||||
{
|
||||
auto const [seq, env] = mDeque.front();
|
||||
|
||||
mDeque.pop_front();
|
||||
|
||||
jvEvent = env;
|
||||
Json::Value jvEvent = env;
|
||||
jvEvent["seq"] = seq;
|
||||
|
||||
bSend = true;
|
||||
}
|
||||
}
|
||||
|
||||
// Send outside of the lock.
|
||||
if (bSend)
|
||||
{
|
||||
// XXX Might not need this in a try.
|
||||
try
|
||||
{
|
||||
JLOG(j_.info()) << "RPCCall::fromNetwork: " << mIp;
|
||||
|
||||
RPCCall::fromNetwork(
|
||||
m_io_service,
|
||||
io_service,
|
||||
mIp,
|
||||
mPort,
|
||||
mUsername,
|
||||
@@ -173,21 +172,38 @@ private:
|
||||
mSSL,
|
||||
true,
|
||||
logs_);
|
||||
++dispatched;
|
||||
}
|
||||
|
||||
if (dispatched == 0)
|
||||
mSending = false;
|
||||
}
|
||||
|
||||
bSend = dispatched > 0;
|
||||
|
||||
if (bSend)
|
||||
{
|
||||
try
|
||||
{
|
||||
JLOG(j_.info())
|
||||
<< "RPCCall::fromNetwork: " << mIp << " dispatching "
|
||||
<< dispatched << " events";
|
||||
io_service.run();
|
||||
}
|
||||
catch (const std::exception& e)
|
||||
{
|
||||
JLOG(j_.info())
|
||||
JLOG(j_.warn())
|
||||
<< "RPCCall::fromNetwork exception: " << e.what();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
JLOG(j_.warn()) << "RPCCall::fromNetwork unknown exception";
|
||||
}
|
||||
}
|
||||
} while (bSend);
|
||||
}
|
||||
|
||||
private:
|
||||
// Wietse: we're not going to limit this, this is admin-port only, scale
|
||||
// accordingly enum { eventQueueMax = 32 };
|
||||
|
||||
boost::asio::io_service& m_io_service;
|
||||
JobQueue& m_jobQueue;
|
||||
|
||||
std::string mUrl;
|
||||
@@ -217,7 +233,6 @@ RPCSub::RPCSub(InfoSub::Source& source) : InfoSub(source, Consumer())
|
||||
std::shared_ptr<RPCSub>
|
||||
make_RPCSub(
|
||||
InfoSub::Source& source,
|
||||
boost::asio::io_service& io_service,
|
||||
JobQueue& jobQueue,
|
||||
std::string const& strUrl,
|
||||
std::string const& strUsername,
|
||||
@@ -226,7 +241,6 @@ make_RPCSub(
|
||||
{
|
||||
return std::make_shared<RPCSubImp>(
|
||||
std::ref(source),
|
||||
std::ref(io_service),
|
||||
std::ref(jobQueue),
|
||||
strUrl,
|
||||
strUsername,
|
||||
|
||||
@@ -76,7 +76,6 @@ doSubscribe(RPC::JsonContext& context)
|
||||
{
|
||||
auto rspSub = make_RPCSub(
|
||||
context.app.getOPs(),
|
||||
context.app.getIOService(),
|
||||
context.app.getJobQueue(),
|
||||
strUrl,
|
||||
strUsername,
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@@ -58,21 +58,8 @@ cat $INPUT_FILE | tr '\n' '\f' |
|
||||
then
|
||||
echo '#include "api.h"' > "$WASM_DIR/test-$COUNTER-gen.c"
|
||||
tr '\f' '\n' <<< $line >> "$WASM_DIR/test-$COUNTER-gen.c"
|
||||
DECLARED="`tr '\f' '\n' <<< $line \
|
||||
| grep -E '(extern|static|define) ' \
|
||||
| grep -Eo '[a-z\-\_]+ *\(' \
|
||||
| grep -v 'sizeof' \
|
||||
| sed -E 's/[^a-z\-\_]//g' \
|
||||
| grep -vE '^__attribute__$' \
|
||||
| sort | uniq`"
|
||||
|
||||
USED="`tr '\f' '\n' <<< $line \
|
||||
| grep -vE '(extern|static|define) ' \
|
||||
| grep -Eo '[a-z\-\_]+\(' \
|
||||
| grep -v 'sizeof' \
|
||||
| sed -E 's/[^a-z\-\_]//g' \
|
||||
| grep -vE '^(__attribute__|hook|cbak)$' \
|
||||
| sort | uniq`"
|
||||
DECLARED="`tr '\f' '\n' <<< $line | grep -E '(extern|define) ' | grep -Eo '[a-z\-\_]+ *\(' | grep -v 'sizeof' | sed -E 's/[^a-z\-\_]//g' | sort | uniq`"
|
||||
USED="`tr '\f' '\n' <<< $line | grep -vE '(extern|define) ' | grep -Eo '[a-z\-\_]+\(' | grep -v 'sizeof' | sed -E 's/[^a-z\-\_]//g' | grep -vE '^(hook|cbak)' | sort | uniq`"
|
||||
ONCE="`echo $DECLARED $USED | tr ' ' '\n' | sort | uniq -c | grep '1 ' | sed -E 's/^ *1 //g'`"
|
||||
FILTER="`echo $DECLARED | tr ' ' '|' | sed -E 's/\|$//g'`"
|
||||
UNDECL="`echo $ONCE | grep -v -E $FILTER 2>/dev/null || echo ''`"
|
||||
@@ -82,7 +69,7 @@ cat $INPUT_FILE | tr '\n' '\f' |
|
||||
echo "$line"
|
||||
exit 1
|
||||
fi
|
||||
wasmcc -x c /dev/stdin -o /dev/stdout -O2 -Wl,--allow-undefined,--export=hook,--export=cbak <<< "`tr '\f' '\n' <<< $line`" |
|
||||
wasmcc -x c /dev/stdin -o /dev/stdout -O2 -Wl,--allow-undefined <<< "`tr '\f' '\n' <<< $line`" |
|
||||
hook-cleaner - - 2>/dev/null |
|
||||
xxd -p -u -c 10 |
|
||||
sed -E 's/../0x&U,/g' | sed -E 's/^/ /g' >> $OUTPUT_FILE
|
||||
|
||||
819
src/test/net/HTTPClient_test.cpp
Normal file
819
src/test/net/HTTPClient_test.cpp
Normal file
@@ -0,0 +1,819 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of rippled: https://github.com/ripple/rippled
|
||||
Copyright (c) 2024 Ripple Labs Inc.
|
||||
|
||||
Permission to use, copy, modify, and/or distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#include <ripple/basics/ByteUtilities.h>
|
||||
#include <ripple/net/HTTPClient.h>
|
||||
#include <test/jtx.h>
|
||||
|
||||
#include <boost/asio.hpp>
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
|
||||
namespace ripple {
|
||||
namespace test {
|
||||
|
||||
// Minimal TCP server for testing HTTPClient behavior.
|
||||
// Accepts connections and sends configurable HTTP responses.
|
||||
class MockHTTPServer
|
||||
{
|
||||
boost::asio::io_service ios_;
|
||||
std::unique_ptr<boost::asio::io_service::work> work_;
|
||||
boost::asio::ip::tcp::acceptor acceptor_;
|
||||
std::thread thread_;
|
||||
std::atomic<bool> running_{true};
|
||||
unsigned short port_;
|
||||
|
||||
// Metrics
|
||||
std::atomic<int> activeConnections_{0};
|
||||
std::atomic<int> peakConnections_{0};
|
||||
std::atomic<int> totalAccepted_{0};
|
||||
|
||||
// Configurable behavior
|
||||
std::atomic<int> statusCode_{200};
|
||||
std::atomic<int> delayMs_{0};
|
||||
std::atomic<bool> sendResponse_{true};
|
||||
std::atomic<bool> closeImmediately_{false};
|
||||
std::atomic<bool> noContentLength_{false};
|
||||
|
||||
public:
|
||||
MockHTTPServer()
|
||||
: work_(std::make_unique<boost::asio::io_service::work>(ios_))
|
||||
, acceptor_(
|
||||
ios_,
|
||||
boost::asio::ip::tcp::endpoint(
|
||||
boost::asio::ip::address::from_string("127.0.0.1"),
|
||||
0))
|
||||
{
|
||||
port_ = acceptor_.local_endpoint().port();
|
||||
accept();
|
||||
thread_ = std::thread([this] { ios_.run(); });
|
||||
}
|
||||
|
||||
~MockHTTPServer()
|
||||
{
|
||||
running_ = false;
|
||||
work_.reset(); // Allow io_service to stop.
|
||||
boost::system::error_code ec;
|
||||
acceptor_.close(ec);
|
||||
ios_.stop();
|
||||
if (thread_.joinable())
|
||||
thread_.join();
|
||||
}
|
||||
|
||||
unsigned short
|
||||
port() const
|
||||
{
|
||||
return port_;
|
||||
}
|
||||
int
|
||||
activeConnectionCount() const
|
||||
{
|
||||
return activeConnections_;
|
||||
}
|
||||
int
|
||||
peakConnectionCount() const
|
||||
{
|
||||
return peakConnections_;
|
||||
}
|
||||
int
|
||||
totalAcceptedCount() const
|
||||
{
|
||||
return totalAccepted_;
|
||||
}
|
||||
|
||||
void
|
||||
setStatus(int code)
|
||||
{
|
||||
statusCode_ = code;
|
||||
}
|
||||
void
|
||||
setDelay(int ms)
|
||||
{
|
||||
delayMs_ = ms;
|
||||
}
|
||||
void
|
||||
setSendResponse(bool send)
|
||||
{
|
||||
sendResponse_ = send;
|
||||
}
|
||||
void
|
||||
setCloseImmediately(bool close)
|
||||
{
|
||||
closeImmediately_ = close;
|
||||
}
|
||||
void
|
||||
setNoContentLength(bool noContentLength)
|
||||
{
|
||||
noContentLength_ = noContentLength;
|
||||
}
|
||||
|
||||
private:
|
||||
void
|
||||
accept()
|
||||
{
|
||||
auto sock = std::make_shared<boost::asio::ip::tcp::socket>(ios_);
|
||||
acceptor_.async_accept(*sock, [this, sock](auto ec) {
|
||||
if (!ec && running_)
|
||||
{
|
||||
++totalAccepted_;
|
||||
int current = ++activeConnections_;
|
||||
int prev = peakConnections_.load();
|
||||
while (current > prev &&
|
||||
!peakConnections_.compare_exchange_weak(prev, current))
|
||||
;
|
||||
|
||||
handleConnection(sock);
|
||||
accept();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void
|
||||
handleConnection(std::shared_ptr<boost::asio::ip::tcp::socket> sock)
|
||||
{
|
||||
if (closeImmediately_)
|
||||
{
|
||||
boost::system::error_code ec;
|
||||
sock->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
|
||||
sock->close(ec);
|
||||
--activeConnections_;
|
||||
return;
|
||||
}
|
||||
|
||||
auto buf = std::make_shared<boost::asio::streambuf>();
|
||||
boost::asio::async_read_until(
|
||||
*sock, *buf, "\r\n\r\n", [this, sock, buf](auto ec, size_t) {
|
||||
if (ec)
|
||||
{
|
||||
--activeConnections_;
|
||||
return;
|
||||
}
|
||||
|
||||
if (!sendResponse_)
|
||||
{
|
||||
// Hold connection open without responding.
|
||||
// The socket shared_ptr prevents cleanup.
|
||||
// This simulates a server that accepts but
|
||||
// never responds (e.g., overloaded).
|
||||
return;
|
||||
}
|
||||
|
||||
auto delay = delayMs_.load();
|
||||
if (delay > 0)
|
||||
{
|
||||
auto timer =
|
||||
std::make_shared<boost::asio::steady_timer>(ios_);
|
||||
timer->expires_from_now(std::chrono::milliseconds(delay));
|
||||
timer->async_wait(
|
||||
[this, sock, timer](auto) { sendHTTPResponse(sock); });
|
||||
}
|
||||
else
|
||||
{
|
||||
sendHTTPResponse(sock);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void
|
||||
sendHTTPResponse(std::shared_ptr<boost::asio::ip::tcp::socket> sock)
|
||||
{
|
||||
auto body = std::string("{}");
|
||||
std::string header =
|
||||
"HTTP/1.0 " + std::to_string(statusCode_.load()) + " OK\r\n";
|
||||
if (!noContentLength_)
|
||||
header += "Content-Length: " + std::to_string(body.size()) + "\r\n";
|
||||
header += "\r\n";
|
||||
auto response = std::make_shared<std::string>(header + body);
|
||||
|
||||
boost::asio::async_write(
|
||||
*sock,
|
||||
boost::asio::buffer(*response),
|
||||
[this, sock, response](auto, size_t) {
|
||||
boost::system::error_code ec;
|
||||
sock->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
|
||||
sock->close(ec);
|
||||
--activeConnections_;
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
class HTTPClient_test : public beast::unit_test::suite
|
||||
{
|
||||
// Helper: fire an HTTP request and track completion via atomic counter.
|
||||
void
|
||||
fireRequest(
|
||||
boost::asio::io_service& ios,
|
||||
std::string const& host,
|
||||
unsigned short port,
|
||||
std::atomic<int>& completed,
|
||||
beast::Journal& j,
|
||||
std::chrono::seconds timeout = std::chrono::seconds{5})
|
||||
{
|
||||
HTTPClient::request(
|
||||
false, // no SSL
|
||||
ios,
|
||||
host,
|
||||
port,
|
||||
[](boost::asio::streambuf& sb, std::string const& strHost) {
|
||||
std::ostream os(&sb);
|
||||
os << "POST / HTTP/1.0\r\n"
|
||||
<< "Host: " << strHost << "\r\n"
|
||||
<< "Content-Type: application/json\r\n"
|
||||
<< "Content-Length: 2\r\n"
|
||||
<< "\r\n"
|
||||
<< "{}";
|
||||
},
|
||||
megabytes(1),
|
||||
timeout,
|
||||
[&completed](
|
||||
const boost::system::error_code&, int, std::string const&) {
|
||||
++completed;
|
||||
return false;
|
||||
},
|
||||
j);
|
||||
}
|
||||
|
||||
//--------------------------------------------------------------------------
|
||||
|
||||
void
|
||||
testCleanupAfterSuccess()
|
||||
{
|
||||
testcase("Socket cleanup after successful response");
|
||||
|
||||
// After a successful HTTP request completes, the
|
||||
// HTTPClientImp should be destroyed and its socket
|
||||
// closed promptly — not held until the deadline fires.
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
|
||||
MockHTTPServer server;
|
||||
server.setStatus(200);
|
||||
|
||||
std::atomic<int> completed{0};
|
||||
auto j = env.app().journal("HTTPClient");
|
||||
|
||||
{
|
||||
boost::asio::io_service ios;
|
||||
fireRequest(ios, "127.0.0.1", server.port(), completed, j);
|
||||
ios.run();
|
||||
}
|
||||
|
||||
BEAST_EXPECT(completed == 1);
|
||||
BEAST_EXPECT(server.totalAcceptedCount() == 1);
|
||||
// After io_service.run() returns, the server should
|
||||
// see zero active connections — socket was released.
|
||||
BEAST_EXPECT(server.activeConnectionCount() == 0);
|
||||
}
|
||||
|
||||
void
|
||||
testCleanupAfter500()
|
||||
{
|
||||
testcase("Socket cleanup after HTTP 500");
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
|
||||
MockHTTPServer server;
|
||||
server.setStatus(500);
|
||||
|
||||
std::atomic<int> completed{0};
|
||||
auto j = env.app().journal("HTTPClient");
|
||||
|
||||
{
|
||||
boost::asio::io_service ios;
|
||||
fireRequest(ios, "127.0.0.1", server.port(), completed, j);
|
||||
ios.run();
|
||||
}
|
||||
|
||||
BEAST_EXPECT(completed == 1);
|
||||
BEAST_EXPECT(server.activeConnectionCount() == 0);
|
||||
}
|
||||
|
||||
void
|
||||
testCleanupAfterConnectionRefused()
|
||||
{
|
||||
testcase("Socket cleanup after connection refused");
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
|
||||
// Bind a port, then close it — guarantees nothing is listening.
|
||||
boost::asio::io_service tmp;
|
||||
boost::asio::ip::tcp::acceptor acc(
|
||||
tmp,
|
||||
boost::asio::ip::tcp::endpoint(
|
||||
boost::asio::ip::address::from_string("127.0.0.1"), 0));
|
||||
auto port = acc.local_endpoint().port();
|
||||
acc.close();
|
||||
|
||||
std::atomic<int> completed{0};
|
||||
auto j = env.app().journal("HTTPClient");
|
||||
|
||||
{
|
||||
boost::asio::io_service ios;
|
||||
fireRequest(ios, "127.0.0.1", port, completed, j);
|
||||
ios.run();
|
||||
}
|
||||
|
||||
// Callback should still be invoked (with error).
|
||||
BEAST_EXPECT(completed == 1);
|
||||
}
|
||||
|
||||
void
|
||||
testCleanupAfterTimeout()
|
||||
{
|
||||
testcase("Socket cleanup after timeout");
|
||||
|
||||
// Server accepts but never responds. HTTPClient should
|
||||
// time out, clean up, and invoke the callback.
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
|
||||
MockHTTPServer server;
|
||||
server.setSendResponse(false); // accept, read, but never respond
|
||||
|
||||
std::atomic<int> completed{0};
|
||||
auto j = env.app().journal("HTTPClient");
|
||||
|
||||
{
|
||||
boost::asio::io_service ios;
|
||||
// Short timeout to keep the test fast.
|
||||
fireRequest(
|
||||
ios,
|
||||
"127.0.0.1",
|
||||
server.port(),
|
||||
completed,
|
||||
j,
|
||||
std::chrono::seconds{2});
|
||||
ios.run();
|
||||
}
|
||||
|
||||
// Callback must be invoked even on timeout.
|
||||
BEAST_EXPECT(completed == 1);
|
||||
}
|
||||
|
||||
void
|
||||
testCleanupAfterServerCloseBeforeResponse()
|
||||
{
|
||||
testcase("Socket cleanup after server closes before response");
|
||||
|
||||
// Server accepts the connection then immediately closes
|
||||
// it without sending anything.
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
|
||||
MockHTTPServer server;
|
||||
server.setCloseImmediately(true);
|
||||
|
||||
std::atomic<int> completed{0};
|
||||
auto j = env.app().journal("HTTPClient");
|
||||
|
||||
{
|
||||
boost::asio::io_service ios;
|
||||
fireRequest(ios, "127.0.0.1", server.port(), completed, j);
|
||||
ios.run();
|
||||
}
|
||||
|
||||
BEAST_EXPECT(completed == 1);
|
||||
BEAST_EXPECT(server.activeConnectionCount() == 0);
|
||||
}
|
||||
|
||||
void
|
||||
testEOFCompletionCallsCallback()
|
||||
{
|
||||
testcase("EOF completion invokes callback (handleData bug)");
|
||||
|
||||
// HTTPClientImp::handleData has a code path where
|
||||
// mShutdown == eof results in logging "Complete." but
|
||||
// never calling invokeComplete(). This means:
|
||||
// - The completion callback is never invoked
|
||||
// - The deadline timer is never cancelled
|
||||
// - The socket is held open until the 30s deadline
|
||||
//
|
||||
// This test verifies the callback IS invoked after an
|
||||
// EOF response. If this test fails (completed == 0 after
|
||||
// ios.run()), the handleData EOF bug is confirmed.
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
|
||||
MockHTTPServer server;
|
||||
server.setStatus(200);
|
||||
|
||||
std::atomic<int> completed{0};
|
||||
auto j = env.app().journal("HTTPClient");
|
||||
|
||||
{
|
||||
boost::asio::io_service ios;
|
||||
fireRequest(
|
||||
ios,
|
||||
"127.0.0.1",
|
||||
server.port(),
|
||||
completed,
|
||||
j,
|
||||
std::chrono::seconds{3});
|
||||
ios.run();
|
||||
}
|
||||
|
||||
// If handleData EOF path doesn't call invokeComplete,
|
||||
// the callback won't fire until the deadline (3s) expires,
|
||||
// and even then handleDeadline doesn't invoke mComplete.
|
||||
// The io_service.run() will still return (deadline fires,
|
||||
// handleShutdown runs, all handlers done), but completed
|
||||
// will be 0.
|
||||
if (completed != 1)
|
||||
{
|
||||
log << " BUG CONFIRMED: handleData EOF path does not"
|
||||
<< " call invokeComplete(). Callback was not invoked."
|
||||
<< " Socket held open until deadline." << std::endl;
|
||||
}
|
||||
BEAST_EXPECT(completed == 1);
|
||||
}
|
||||
|
||||
void
|
||||
testConcurrentRequestCleanup()
|
||||
{
|
||||
testcase("Concurrent requests all clean up");
|
||||
|
||||
// Fire N requests at once on the same io_service.
|
||||
// All should complete and release their sockets.
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
|
||||
MockHTTPServer server;
|
||||
server.setStatus(200);
|
||||
|
||||
static constexpr int N = 50;
|
||||
std::atomic<int> completed{0};
|
||||
auto j = env.app().journal("HTTPClient");
|
||||
|
||||
{
|
||||
boost::asio::io_service ios;
|
||||
for (int i = 0; i < N; ++i)
|
||||
{
|
||||
fireRequest(ios, "127.0.0.1", server.port(), completed, j);
|
||||
}
|
||||
ios.run();
|
||||
}
|
||||
|
||||
BEAST_EXPECT(completed == N);
|
||||
// Brief sleep to let server-side shutdown complete.
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
BEAST_EXPECT(server.activeConnectionCount() == 0);
|
||||
|
||||
log << " Completed: " << completed
|
||||
<< ", Peak concurrent: " << server.peakConnectionCount()
|
||||
<< ", Active after: " << server.activeConnectionCount()
|
||||
<< std::endl;
|
||||
}
|
||||
|
||||
void
|
||||
testConcurrent500Cleanup()
|
||||
{
|
||||
testcase("Concurrent 500 requests all clean up");
|
||||
|
||||
// Fire N requests that all get 500 responses. Verify
|
||||
// all sockets are released and no FDs leak.
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
|
||||
MockHTTPServer server;
|
||||
server.setStatus(500);
|
||||
|
||||
static constexpr int N = 50;
|
||||
std::atomic<int> completed{0};
|
||||
auto j = env.app().journal("HTTPClient");
|
||||
|
||||
{
|
||||
boost::asio::io_service ios;
|
||||
for (int i = 0; i < N; ++i)
|
||||
{
|
||||
fireRequest(ios, "127.0.0.1", server.port(), completed, j);
|
||||
}
|
||||
ios.run();
|
||||
}
|
||||
|
||||
BEAST_EXPECT(completed == N);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
BEAST_EXPECT(server.activeConnectionCount() == 0);
|
||||
}
|
||||
|
||||
void
|
||||
testEOFWithoutContentLength()
|
||||
{
|
||||
testcase("EOF without Content-Length (handleData EOF path)");
|
||||
|
||||
// When a server sends a response WITHOUT Content-Length,
|
||||
// HTTPClientImp reads up to maxResponseSize. The server
|
||||
// closes the connection, causing EOF in handleData.
|
||||
//
|
||||
// In handleData, the EOF path (mShutdown == eof) logs
|
||||
// "Complete." but does NOT call invokeComplete(). This
|
||||
// means:
|
||||
// - mComplete (callback) is never invoked
|
||||
// - deadline timer is never cancelled
|
||||
// - socket + object held alive until deadline fires
|
||||
//
|
||||
// This test uses a SHORT deadline to keep it fast. If
|
||||
// the callback IS invoked, ios.run() returns quickly.
|
||||
// If NOT, ios.run() blocks until the deadline (2s).
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
|
||||
MockHTTPServer server;
|
||||
server.setStatus(200);
|
||||
server.setNoContentLength(true);
|
||||
|
||||
std::atomic<int> completed{0};
|
||||
auto j = env.app().journal("HTTPClient");
|
||||
|
||||
auto start = std::chrono::steady_clock::now();
|
||||
{
|
||||
boost::asio::io_service ios;
|
||||
fireRequest(
|
||||
ios,
|
||||
"127.0.0.1",
|
||||
server.port(),
|
||||
completed,
|
||||
j,
|
||||
std::chrono::seconds{2});
|
||||
ios.run();
|
||||
}
|
||||
auto elapsed = std::chrono::steady_clock::now() - start;
|
||||
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed)
|
||||
.count();
|
||||
|
||||
if (completed == 0)
|
||||
{
|
||||
log << " BUG CONFIRMED: handleData EOF path does not"
|
||||
<< " call invokeComplete(). Callback never invoked."
|
||||
<< " io_service.run() blocked for " << ms << "ms"
|
||||
<< " (deadline timeout)." << std::endl;
|
||||
}
|
||||
else
|
||||
{
|
||||
log << " Callback invoked in " << ms << "ms." << std::endl;
|
||||
}
|
||||
// This WILL fail if the EOF bug exists — the callback
|
||||
// is only invoked via the deadline timeout path, which
|
||||
// does NOT call mComplete.
|
||||
BEAST_EXPECT(completed == 1);
|
||||
}
|
||||
|
||||
void
|
||||
testPersistentIOServiceCleanup()
|
||||
{
|
||||
testcase("Cleanup on persistent io_service (no destructor mask)");
|
||||
|
||||
// Previous tests destroy the io_service after run(),
|
||||
// which releases all pending handlers' shared_ptrs.
|
||||
// This masks leaks. Here we use a PERSISTENT io_service
|
||||
// (with work guard, running on its own thread) and check
|
||||
// that HTTPClientImp objects are destroyed WITHOUT relying
|
||||
// on io_service destruction.
|
||||
//
|
||||
// We track the object's lifetime via the completion
|
||||
// callback — if it fires, the async chain completed
|
||||
// normally. If it doesn't fire within a reasonable time
|
||||
// but the io_service is still running, something is stuck.
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
|
||||
MockHTTPServer server;
|
||||
server.setStatus(200);
|
||||
|
||||
std::atomic<int> completed{0};
|
||||
auto j = env.app().journal("HTTPClient");
|
||||
|
||||
// Persistent io_service — stays alive the whole test.
|
||||
boost::asio::io_service ios;
|
||||
auto work = std::make_unique<boost::asio::io_service::work>(ios);
|
||||
std::thread runner([&ios] { ios.run(); });
|
||||
|
||||
// Fire request on the persistent io_service.
|
||||
HTTPClient::request(
|
||||
false,
|
||||
ios,
|
||||
"127.0.0.1",
|
||||
server.port(),
|
||||
[](boost::asio::streambuf& sb, std::string const& strHost) {
|
||||
std::ostream os(&sb);
|
||||
os << "POST / HTTP/1.0\r\n"
|
||||
<< "Host: " << strHost << "\r\n"
|
||||
<< "Content-Type: application/json\r\n"
|
||||
<< "Content-Length: 2\r\n"
|
||||
<< "\r\n"
|
||||
<< "{}";
|
||||
},
|
||||
megabytes(1),
|
||||
std::chrono::seconds{5},
|
||||
[&completed](
|
||||
const boost::system::error_code&, int, std::string const&) {
|
||||
++completed;
|
||||
return false;
|
||||
},
|
||||
j);
|
||||
|
||||
// Wait for completion without destroying io_service.
|
||||
auto deadline =
|
||||
std::chrono::steady_clock::now() + std::chrono::seconds{5};
|
||||
while (completed == 0 && std::chrono::steady_clock::now() < deadline)
|
||||
{
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
||||
}
|
||||
|
||||
BEAST_EXPECT(completed == 1);
|
||||
|
||||
// Give server-side shutdown a moment.
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
BEAST_EXPECT(server.activeConnectionCount() == 0);
|
||||
|
||||
if (server.activeConnectionCount() != 0)
|
||||
{
|
||||
log << " BUG: Socket still open on persistent"
|
||||
<< " io_service. FD leaked." << std::endl;
|
||||
}
|
||||
|
||||
// Clean shutdown.
|
||||
work.reset();
|
||||
ios.stop();
|
||||
runner.join();
|
||||
}
|
||||
|
||||
void
|
||||
testPersistentIOService500Cleanup()
|
||||
{
|
||||
testcase("500 cleanup on persistent io_service");
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
|
||||
MockHTTPServer server;
|
||||
server.setStatus(500);
|
||||
|
||||
static constexpr int N = 20;
|
||||
std::atomic<int> completed{0};
|
||||
auto j = env.app().journal("HTTPClient");
|
||||
|
||||
boost::asio::io_service ios;
|
||||
auto work = std::make_unique<boost::asio::io_service::work>(ios);
|
||||
std::thread runner([&ios] { ios.run(); });
|
||||
|
||||
for (int i = 0; i < N; ++i)
|
||||
{
|
||||
HTTPClient::request(
|
||||
false,
|
||||
ios,
|
||||
"127.0.0.1",
|
||||
server.port(),
|
||||
[](boost::asio::streambuf& sb, std::string const& strHost) {
|
||||
std::ostream os(&sb);
|
||||
os << "POST / HTTP/1.0\r\n"
|
||||
<< "Host: " << strHost << "\r\n"
|
||||
<< "Content-Type: application/json\r\n"
|
||||
<< "Content-Length: 2\r\n"
|
||||
<< "\r\n"
|
||||
<< "{}";
|
||||
},
|
||||
megabytes(1),
|
||||
std::chrono::seconds{5},
|
||||
[&completed](
|
||||
const boost::system::error_code&, int, std::string const&) {
|
||||
++completed;
|
||||
return false;
|
||||
},
|
||||
j);
|
||||
}
|
||||
|
||||
auto deadline =
|
||||
std::chrono::steady_clock::now() + std::chrono::seconds{10};
|
||||
while (completed < N && std::chrono::steady_clock::now() < deadline)
|
||||
{
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
||||
}
|
||||
|
||||
BEAST_EXPECT(completed == N);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
BEAST_EXPECT(server.activeConnectionCount() == 0);
|
||||
|
||||
log << " Completed: " << completed << "/" << N
|
||||
<< ", Active connections after: " << server.activeConnectionCount()
|
||||
<< std::endl;
|
||||
|
||||
work.reset();
|
||||
ios.stop();
|
||||
runner.join();
|
||||
}
|
||||
|
||||
void
|
||||
testGetSelfReferenceCleanup()
|
||||
{
|
||||
testcase("get() shared_from_this cycle releases");
|
||||
|
||||
// HTTPClientImp::get() binds shared_from_this() into
|
||||
// mBuild via makeGet. This creates a reference cycle:
|
||||
// object -> mBuild -> shared_ptr<object>
|
||||
// The object can only be destroyed if mBuild is cleared.
|
||||
// Since mBuild is never explicitly cleared, this may be
|
||||
// a permanent FD leak.
|
||||
//
|
||||
// This test fires a GET request and checks whether the
|
||||
// HTTPClientImp is destroyed (and socket closed) after
|
||||
// completion.
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
|
||||
MockHTTPServer server;
|
||||
server.setStatus(200);
|
||||
|
||||
std::atomic<int> completed{0};
|
||||
auto j = env.app().journal("HTTPClient");
|
||||
|
||||
{
|
||||
boost::asio::io_service ios;
|
||||
HTTPClient::get(
|
||||
false, // no SSL
|
||||
ios,
|
||||
"127.0.0.1",
|
||||
server.port(),
|
||||
"/test",
|
||||
megabytes(1),
|
||||
std::chrono::seconds{5},
|
||||
[&completed](
|
||||
const boost::system::error_code&, int, std::string const&) {
|
||||
++completed;
|
||||
return false;
|
||||
},
|
||||
j);
|
||||
ios.run();
|
||||
}
|
||||
|
||||
BEAST_EXPECT(completed == 1);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
|
||||
// If the get() self-reference cycle leaks, the server
|
||||
// will still show an active connection here (the socket
|
||||
// in the leaked HTTPClientImp is never closed).
|
||||
if (server.activeConnectionCount() != 0)
|
||||
{
|
||||
log << " BUG CONFIRMED: get() self-reference cycle"
|
||||
<< " prevents HTTPClientImp destruction."
|
||||
<< " Socket FD leaked." << std::endl;
|
||||
}
|
||||
BEAST_EXPECT(server.activeConnectionCount() == 0);
|
||||
}
|
||||
|
||||
public:
|
||||
void
|
||||
run() override
|
||||
{
|
||||
testCleanupAfterSuccess();
|
||||
testCleanupAfter500();
|
||||
testCleanupAfterConnectionRefused();
|
||||
testCleanupAfterTimeout();
|
||||
testCleanupAfterServerCloseBeforeResponse();
|
||||
testEOFCompletionCallsCallback();
|
||||
testConcurrentRequestCleanup();
|
||||
testConcurrent500Cleanup();
|
||||
testEOFWithoutContentLength();
|
||||
testPersistentIOServiceCleanup();
|
||||
testPersistentIOService500Cleanup();
|
||||
testGetSelfReferenceCleanup();
|
||||
}
|
||||
};
|
||||
|
||||
BEAST_DEFINE_TESTSUITE(HTTPClient, net, ripple);
|
||||
|
||||
} // namespace test
|
||||
} // namespace ripple
|
||||
Reference in New Issue
Block a user