Compare commits

..

1 Commits

9 changed files with 92 additions and 1719 deletions

View File

@@ -77,11 +77,6 @@ test.ledger > xrpld.app
test.ledger > xrpld.core
test.ledger > xrpld.ledger
test.ledger > xrpl.protocol
test.net > test.toplevel
test.net > xrpl.basics
test.net > xrpld.core
test.net > xrpld.net
test.net > xrpl.json
test.nodestore > test.jtx
test.nodestore > test.toplevel
test.nodestore > test.unit_test

View File

@@ -95,8 +95,16 @@ if [[ "$4" == "" ]]; then
echo "Non GH, local building, no Action runner magic"
else
# GH Action, runner
cp /io/release-build/xahaud /data/builds/$(date +%Y).$(date +%-m).$(date +%-d)-$(git rev-parse --abbrev-ref HEAD)+$4
cp /io/release-build/release.info /data/builds/$(date +%Y).$(date +%-m).$(date +%-d)-$(git rev-parse --abbrev-ref HEAD)+$4.releaseinfo
if [[ "$(git rev-parse --abbrev-ref HEAD)" == "release" ]]; then
echo "building on the release branch... placing it in builds/candidate"
mkdir /data/builds/candidate
cp /io/release-build/xahaud /data/builds/candidate/$(date +%Y).$(date +%-m).$(date +%-d)-$(git rev-parse --abbrev-ref HEAD)+$4
cp /io/release-build/release.info /data/builds/candidate/$(date +%Y).$(date +%-m).$(date +%-d)-$(git rev-parse --abbrev-ref HEAD)+$4.releaseinfo
else
echo "building non-release branch, placing it in builds root"
cp /io/release-build/xahaud /data/builds/$(date +%Y).$(date +%-m).$(date +%-d)-$(git rev-parse --abbrev-ref HEAD)+$4
cp /io/release-build/release.info /data/builds/$(date +%Y).$(date +%-m).$(date +%-d)-$(git rev-parse --abbrev-ref HEAD)+$4.releaseinfo
fi
echo "Published build to: http://build.xahau.tech/"
echo $(date +%Y).$(date +%-m).$(date +%-d)-$(git rev-parse --abbrev-ref HEAD)+$4
fi

File diff suppressed because it is too large Load Diff

View File

@@ -1,351 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2024 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <test/jtx.h>
#include <xrpld/core/Job.h>
#include <xrpld/core/JobQueue.h>
#include <xrpld/net/RPCSub.h>
#include <xrpl/json/json_value.h>
#include <boost/asio.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <atomic>
#include <chrono>
#include <memory>
#include <string>
#include <thread>
namespace ripple {
namespace test {
// Minimal HTTP endpoint that counts received webhook POSTs and replies
// with a configurable status. Responses are EOF-delimited (no
// Content-Length) and the socket is closed right after writing — the
// exact shape that triggered the original handleData EOF-completion
// leak. So these tests exercise RPCSub flow control AND the HTTPClient
// EOF fix end to end: if either regressed, delivery would stall and the
// expected count would never be reached within the timeout.
class MockWebhookEndpoint
{
boost::asio::io_service ios_;
std::unique_ptr<boost::asio::io_service::work> work_;
boost::asio::ip::tcp::acceptor acceptor_;
std::thread thread_;
unsigned short port_;
std::atomic<int> received_{0};
std::atomic<int> status_{200};
std::atomic<int> delayMs_{0};
public:
MockWebhookEndpoint()
: work_(std::make_unique<boost::asio::io_service::work>(ios_))
, acceptor_(
ios_,
boost::asio::ip::tcp::endpoint(
boost::asio::ip::address::from_string("127.0.0.1"),
0))
{
port_ = acceptor_.local_endpoint().port();
accept();
thread_ = std::thread([this] { ios_.run(); });
}
~MockWebhookEndpoint()
{
work_.reset();
boost::system::error_code ec;
acceptor_.close(ec);
ios_.stop();
if (thread_.joinable())
thread_.join();
}
unsigned short
port() const
{
return port_;
}
int
received() const
{
return received_;
}
void
setStatus(int s)
{
status_ = s;
}
// Delay each reply so delivery is deterministically slower than the
// microsecond-fast enqueue loop — keeps the deque full for the
// queue-cap drop test regardless of scheduling.
void
setResponseDelay(int ms)
{
delayMs_ = ms;
}
private:
void
accept()
{
auto sock = std::make_shared<boost::asio::ip::tcp::socket>(ios_);
acceptor_.async_accept(*sock, [this, sock](auto ec) {
if (ec)
return;
handle(sock);
accept();
});
}
void
handle(std::shared_ptr<boost::asio::ip::tcp::socket> sock)
{
auto buf = std::make_shared<boost::asio::streambuf>();
boost::asio::async_read_until(
*sock, *buf, "\r\n\r\n", [this, sock, buf](auto ec, std::size_t) {
if (ec)
return;
++received_;
auto const delay = delayMs_.load();
if (delay > 0)
{
auto timer =
std::make_shared<boost::asio::steady_timer>(ios_);
timer->expires_from_now(std::chrono::milliseconds(delay));
timer->async_wait(
[this, sock, timer](auto) { reply(sock); });
}
else
{
reply(sock);
}
});
}
void
reply(std::shared_ptr<boost::asio::ip::tcp::socket> sock)
{
// EOF-delimited reply: no Content-Length, close after writing.
// This is the realistic failing-webhook shape.
auto resp = std::make_shared<std::string>(
"HTTP/1.0 " + std::to_string(status_.load()) +
" Reply\r\n\r\n{\"result\":{}}");
boost::asio::async_write(
*sock, boost::asio::buffer(*resp), [sock, resp](auto, std::size_t) {
boost::system::error_code ig;
sock->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ig);
sock->close(ig);
});
}
};
//------------------------------------------------------------------------------
class RPCSub_test : public beast::unit_test::suite
{
// Generous ceiling: the instrumented Debug (coverage) build is much
// slower than Release, so timeouts are sized for that, not Release.
template <class Cond>
bool
waitFor(Cond cond, std::chrono::seconds timeout = std::chrono::seconds{30})
{
auto const deadline = std::chrono::steady_clock::now() + timeout;
while (!cond() && std::chrono::steady_clock::now() < deadline)
std::this_thread::sleep_for(std::chrono::milliseconds(10));
return cond();
}
std::shared_ptr<RPCSub>
makeSub(
jtx::Env& env,
MockWebhookEndpoint& ep,
std::size_t maxQueueSize = 16384)
{
return make_RPCSub(
env.app().getOPs(),
env.app().getJobQueue(),
"http://127.0.0.1:" + std::to_string(ep.port()) + "/",
"",
"",
env.app().logs(),
maxQueueSize);
}
// True once no RPCSub sending job is queued or running. sendThread
// captures a raw `this`, so the RPCSub must not be destroyed while a
// job is still in flight — wait on this before letting the sub die.
bool
sendingIdle(jtx::Env& env)
{
return env.app().getJobQueue().getJobCountTotal(jtCLIENT_SUBSCRIBE) ==
0;
}
// Wait for all events to reach the endpoint AND the sending job to
// finish, so the sub can be torn down without racing sendThread.
void
drainAndSettle(jtx::Env& env, MockWebhookEndpoint& ep, int expected)
{
bool const delivered =
waitFor([&] { return ep.received() >= expected; });
bool const idle = waitFor([&] { return sendingIdle(env); });
log << " drainAndSettle: received=" << ep.received() << "/" << expected
<< " idle=" << idle << std::endl;
BEAST_EXPECT(delivered);
BEAST_EXPECT(idle);
}
void
send(std::shared_ptr<RPCSub> const& sub, int n)
{
Json::Value ev(Json::objectValue);
ev["n"] = n;
sub->send(ev, false);
}
void
testDelivery()
{
testcase("Webhook events are delivered");
using namespace jtx;
Env env{*this};
MockWebhookEndpoint ep;
static constexpr int N = 10;
{
auto sub = makeSub(env, ep);
for (int i = 0; i < N; ++i)
send(sub, i);
drainAndSettle(env, ep, N);
}
BEAST_EXPECT(ep.received() == N);
}
void
testErrorsDoNotStall()
{
testcase("Delivery continues when endpoint returns HTTP 500");
// The original bug (xrpld #6341): an endpoint returning errors
// without Content-Length never completed, stalling delivery to
// ALL subscribers. Here every response is a 500 with no
// Content-Length (EOF-delimited) — all N must still arrive.
using namespace jtx;
Env env{*this};
MockWebhookEndpoint ep;
ep.setStatus(500);
static constexpr int N = 10;
{
auto sub = makeSub(env, ep);
for (int i = 0; i < N; ++i)
send(sub, i);
drainAndSettle(env, ep, N);
}
BEAST_EXPECT(ep.received() == N);
}
void
testRestartAfterDrain()
{
testcase("Sending restarts after the queue drains");
// After a batch drains, sendThread clears mSending and returns.
// A later send() must start a fresh sending job; if mSending were
// left set (the #6341 failure mode) the second burst would never
// be delivered.
using namespace jtx;
Env env{*this};
MockWebhookEndpoint ep;
{
auto sub = makeSub(env, ep);
// First burst, then wait for the sending job to fully drain
// and exit (mSending cleared) — deterministically, not via a
// sleep.
for (int i = 0; i < 5; ++i)
send(sub, i);
drainAndSettle(env, ep, 5);
// Second burst must start a fresh sending job.
for (int i = 5; i < 10; ++i)
send(sub, i);
drainAndSettle(env, ep, 10);
}
BEAST_EXPECT(ep.received() == 10);
}
void
testQueueCapDrops()
{
testcase("Events past the queue cap are dropped");
// With a tiny cap, pushing far more events than delivery can keep
// up with forces send() down the drop path: enqueue is microsecond
// -fast while each (delayed) HTTP delivery is a full round-trip, so
// the deque sits at the cap and excess events are dropped. The
// delay makes "delivery slower than enqueue" hold regardless of
// scheduling, so this isn't timing-dependent. We just need some
// delivered (cap works) and some dropped (drop path exercised).
using namespace jtx;
Env env{*this};
MockWebhookEndpoint ep;
ep.setResponseDelay(50);
static constexpr int pushed = 50;
{
auto sub = makeSub(env, ep, /*maxQueueSize*/ 2);
for (int i = 0; i < pushed; ++i)
send(sub, i);
BEAST_EXPECT(waitFor([&] { return sendingIdle(env); }));
}
log << " queue cap: received " << ep.received() << "/" << pushed
<< std::endl;
BEAST_EXPECT(ep.received() > 0);
BEAST_EXPECT(ep.received() < pushed);
}
public:
void
run() override
{
testDelivery();
testErrorsDoNotStall();
testRestartAfterDrain();
testQueueCapDrops();
}
};
BEAST_DEFINE_TESTSUITE(RPCSub, net, ripple);
} // namespace test
} // namespace ripple

View File

@@ -22,6 +22,7 @@
#include <xrpld/core/JobQueue.h>
#include <xrpld/net/InfoSub.h>
#include <boost/asio/io_service.hpp>
namespace ripple {
@@ -38,17 +39,16 @@ protected:
explicit RPCSub(InfoSub::Source& source);
};
// VFALCO Why is the io_service needed?
std::shared_ptr<RPCSub>
make_RPCSub(
InfoSub::Source& source,
boost::asio::io_service& io_service,
JobQueue& jobQueue,
std::string const& strUrl,
std::string const& strUsername,
std::string const& strPassword,
Logs& logs,
// Max events buffered before new ones are dropped. Configurable so
// tests can exercise the drop path without queueing the full default.
std::size_t maxQueueSize = 16384);
Logs& logs);
} // namespace ripple

View File

@@ -122,20 +122,12 @@ public:
mComplete = complete;
mTimeout = timeout;
// Bind a non-owning `this` (not shared_from_this()) into mBuild.
// mBuild is a member, so capturing a shared_ptr to self here would
// form a reference cycle (this -> mBuild -> shared_ptr<this>) that
// never breaks, leaking the object and its socket FD after the
// request completes. mBuild is only ever invoked from
// handleRequest(), which always runs inside an async handler that
// already holds a shared_from_this(), so the object is guaranteed
// alive whenever mBuild fires — a raw `this` is safe.
request(
bSSL,
deqSites,
std::bind(
&HTTPClientImp::makeGet,
this,
shared_from_this(),
strPath,
std::placeholders::_1,
std::placeholders::_2),
@@ -401,12 +393,8 @@ public:
if (boost::regex_match(strHeader, smMatch, reBody)) // we got some body
mBody = smMatch[1];
bool const hasContentLength =
boost::regex_match(strHeader, smMatch, reSize);
mReceivedContentLength = hasContentLength;
std::size_t const responseSize = [&] {
if (hasContentLength)
if (boost::regex_match(strHeader, smMatch, reSize))
return beast::lexicalCast<std::size_t>(
std::string(smMatch[1]), maxResponseSize_);
return maxResponseSize_;
@@ -457,24 +445,22 @@ public:
JLOG(j_.trace()) << "Read error: " << mShutdown.message();
invokeComplete(mShutdown);
return;
}
// Either the read completed normally or it ended at EOF. EOF is a
// successful completion for EOF-delimited responses, but it is an
// error when the server promised a Content-Length and closed early.
JLOG(j_.trace()) << "Complete.";
mResponse.commit(bytes_transferred);
std::string strBody{
{std::istreambuf_iterator<char>(&mResponse)},
std::istreambuf_iterator<char>()};
auto completeEc = ecResult;
if (completeEc == boost::asio::error::eof && !mReceivedContentLength)
completeEc.clear();
invokeComplete(completeEc, mStatus, mBody + strBody);
else
{
if (mShutdown)
{
JLOG(j_.trace()) << "Complete.";
}
else
{
mResponse.commit(bytes_transferred);
std::string strBody{
{std::istreambuf_iterator<char>(&mResponse)},
std::istreambuf_iterator<char>()};
invokeComplete(ecResult, mStatus, mBody + strBody);
}
}
}
// Call cancel the deadline timer and invoke the completion routine.
@@ -530,7 +516,6 @@ private:
boost::asio::streambuf mHeader;
boost::asio::streambuf mResponse;
std::string mBody;
bool mReceivedContentLength = false;
const unsigned short mPort;
std::size_t const maxResponseSize_;
int mStatus;

View File

@@ -1585,10 +1585,6 @@ struct RPCCallImp
// callbackFuncP.
// Receive reply
if (ecResult)
Throw<std::runtime_error>(
"RPC transport error: " + ecResult.message());
if (strData.empty())
Throw<std::runtime_error>(
"no response from server. Please "
@@ -1752,7 +1748,6 @@ rpcClient(
}
{
//@@start blocking-request
boost::asio::io_service isService;
RPCCall::fromNetwork(
isService,
@@ -1776,7 +1771,6 @@ rpcClient(
headers);
isService.run(); // This blocks until there are no more
// outstanding async calls.
//@@end blocking-request
}
if (jvOutput.isMember("result"))
{
@@ -1887,21 +1881,15 @@ fromNetwork(
// Send request
// Number of bytes to try to receive if no Content-Length header is
// received. Webhook event deliveries ("event") ignore the response
// body, so a missing Content-Length must not pre-allocate the full
// 256MB RPC reply budget per in-flight delivery (maxInFlight can be
// 32 -> 8GB). Cap those small; genuine RPC replies (CLI) keep the
// large budget.
auto const RPC_REPLY_MAX_BYTES =
(strMethod == "event") ? megabytes(1) : megabytes(256);
// Number of bytes to try to receive if no
// Content-Length header received
constexpr auto RPC_REPLY_MAX_BYTES = megabytes(256);
using namespace std::chrono_literals;
// auto constexpr RPC_NOTIFY = 10min; // Wietse: lolwut 10 minutes for one
// HTTP call?
auto constexpr RPC_NOTIFY = 30s;
//@@start async-request
HTTPClient::request(
bSSL,
io_service,
@@ -1926,7 +1914,6 @@ fromNetwork(
std::placeholders::_3,
j),
j);
//@@end async-request
}
} // namespace RPCCall

View File

@@ -24,30 +24,29 @@
#include <xrpl/basics/contract.h>
#include <xrpl/json/to_string.h>
#include <deque>
#include <memory>
namespace ripple {
// Subscription object for JSON-RPC
class RPCSubImp : public RPCSub, public std::enable_shared_from_this<RPCSubImp>
class RPCSubImp : public RPCSub
{
public:
RPCSubImp(
InfoSub::Source& source,
boost::asio::io_service& io_service,
JobQueue& jobQueue,
std::string const& strUrl,
std::string const& strUsername,
std::string const& strPassword,
Logs& logs,
std::size_t maxQueueSize)
Logs& logs)
: RPCSub(source)
, m_io_service(io_service)
, m_jobQueue(jobQueue)
, mUrl(strUrl)
, mSSL(false)
, mUsername(strUsername)
, mPassword(strPassword)
, mSending(false)
, maxQueueSize_(maxQueueSize)
, j_(logs.journal("RPCSub"))
, logs_(logs)
{
@@ -79,26 +78,14 @@ public:
{
std::lock_guard sl(mLock);
if (mDeque.size() >= maxQueueSize_)
{
// Always advance mSeq so consumers can detect the gap, but
// rate-limit the log: a hopelessly behind endpoint drops on
// every send() and would otherwise flood the log. Warn on
// the first drop of a run and then once per dropLogInterval.
if (mDropped++ % dropLogInterval == 0)
{
JLOG(j_.warn())
<< "RPCCall::fromNetwork drop: queue full ("
<< mDeque.size() << "), seq=" << mSeq
<< ", endpoint=" << mIp << ", dropped=" << mDropped;
}
++mSeq;
return;
}
// Endpoint caught up enough to accept again; reset so the next
// overflow burst logs its first drop immediately.
mDropped = 0;
// Wietse: we're not going to limit this, this is admin-port only, scale
// accordingly Dropping events just like this results in inconsistent
// data on the receiving end if (mDeque.size() >= eventQueueMax)
// {
// // Drop the previous event.
// JLOG(j_.warn()) << "RPCCall::fromNetwork drop";
// mDeque.pop_back();
// }
auto jm = broadcast ? j_.debug() : j_.info();
JLOG(jm) << "RPCCall::fromNetwork push: " << jvObj;
@@ -110,7 +97,10 @@ public:
// Start a sending thread.
JLOG(j_.info()) << "RPCCall::fromNetwork start";
startSendingJob();
mSending = m_jobQueue.addJob(
jtCLIENT_SUBSCRIBE, "RPCSub::sendThread", [this]() {
sendThread();
});
}
}
@@ -131,66 +121,48 @@ public:
}
private:
// Maximum concurrent HTTP deliveries per batch. Bounds file
// descriptor usage while still allowing parallel delivery to
// capable endpoints. With a 1024 FD process limit shared across
// peers, clients, and the node store, 32 per subscriber is a
// meaningful but survivable chunk even with multiple subscribers.
static constexpr int maxInFlight = 32;
// Log one drop warning per this many drops while the queue stays
// full, to avoid flooding the log on a persistently behind endpoint.
static constexpr std::size_t dropLogInterval = 1000;
// Schedule a sending job. Must be called under mLock. The job holds a
// weak_ptr and re-locks it on entry, so the RPCSub is kept alive for
// the duration of the batch even if it is unsubscribed (and would
// otherwise be destroyed) concurrently — sendThread dereferences this
// only via that strong ref. mDeque events are delivered until the sub
// is gone, after which weak.lock() fails and the job is a no-op.
void
startSendingJob()
{
std::weak_ptr<RPCSubImp> weak = weak_from_this();
mSending = m_jobQueue.addJob(
jtCLIENT_SUBSCRIBE, "RPCSub::sendThread", [weak]() {
if (auto self = weak.lock())
self->sendThread();
});
}
// XXX Could probably create a bunch of send jobs in a single get of the
// lock.
void
sendThread()
{
// Process exactly ONE batch per job, then re-queue if more events
// remain, rather than draining the whole backlog in a single job.
// A local io_service's .run() blocks this worker thread for the
// batch (up to the per-request timeout), so re-queueing between
// batches keeps one slow/hung subscriber from monopolising a
// job-queue worker and starving consensus/ledger/RPC work.
//
// mSending must be cleared under the lock on every non-requeue
// exit path; if it ever stays set without a job in flight, send()
// sees mSending == true and never restarts us, stalling the queue
// forever — the original bug (xrpld issue #6341).
boost::asio::io_service io_service;
int dispatched = 0;
Json::Value jvEvent;
bool bSend;
try
do
{
{
// Obtain the lock to manipulate the queue and change sending.
std::lock_guard sl(mLock);
while (!mDeque.empty() && dispatched < maxInFlight)
if (mDeque.empty())
{
mSending = false;
bSend = false;
}
else
{
auto const [seq, env] = mDeque.front();
mDeque.pop_front();
Json::Value jvEvent = env;
jvEvent = env;
jvEvent["seq"] = seq;
bSend = true;
}
}
// Send outside of the lock.
if (bSend)
{
// XXX Might not need this in a try.
try
{
JLOG(j_.info()) << "RPCCall::fromNetwork: " << mIp;
RPCCall::fromNetwork(
io_service,
m_io_service,
mIp,
mPort,
mUsername,
@@ -201,51 +173,21 @@ private:
mSSL,
true,
logs_);
++dispatched;
}
catch (const std::exception& e)
{
JLOG(j_.info())
<< "RPCCall::fromNetwork exception: " << e.what();
}
}
// dispatched is always > 0 here (send() only starts a job
// after enqueuing, and the re-queue below only fires with a
// non-empty deque), but guard anyway so an empty batch can't
// log/spin — it falls straight through to clear mSending.
if (dispatched > 0)
{
JLOG(j_.info()) << "RPCCall::fromNetwork: " << mIp
<< " dispatching " << dispatched << " events";
io_service.run();
}
}
catch (std::exception const& e)
{
// Bail rather than re-queue: a persistently failing endpoint
// would otherwise spin the job queue. mSending is reset so the
// next send() restarts delivery.
JLOG(j_.warn()) << "RPCSub::sendThread exception: " << e.what();
std::lock_guard sl(mLock);
mSending = false;
return;
}
catch (...)
{
JLOG(j_.warn()) << "RPCSub::sendThread unknown exception";
std::lock_guard sl(mLock);
mSending = false;
return;
}
// Batch complete: re-queue for the next one (mSending stays set)
// or clear mSending if the queue drained — both under the lock to
// avoid a lost-wakeup race with send().
std::lock_guard sl(mLock);
if (mDeque.empty())
mSending = false;
else
startSendingJob();
} while (bSend);
}
private:
// Wietse: we're not going to limit this, this is admin-port only, scale
// accordingly enum { eventQueueMax = 32 };
boost::asio::io_service& m_io_service;
JobQueue& m_jobQueue;
std::string mUrl;
@@ -258,15 +200,8 @@ private:
int mSeq; // Next id to allocate.
std::size_t mDropped = 0; // Consecutive drops while queue is full.
bool mSending; // Sending threead is active.
// Maximum queued events before dropping. The default (16384) is a
// ~10-minute buffer at 100+ events/ledger; a hopelessly behind
// endpoint trips it and consumers detect the gap via the seq field.
std::size_t const maxQueueSize_;
std::deque<std::pair<int, Json::Value>> mDeque;
beast::Journal const j_;
@@ -282,21 +217,21 @@ RPCSub::RPCSub(InfoSub::Source& source) : InfoSub(source, Consumer())
std::shared_ptr<RPCSub>
make_RPCSub(
InfoSub::Source& source,
boost::asio::io_service& io_service,
JobQueue& jobQueue,
std::string const& strUrl,
std::string const& strUsername,
std::string const& strPassword,
Logs& logs,
std::size_t maxQueueSize)
Logs& logs)
{
return std::make_shared<RPCSubImp>(
std::ref(source),
std::ref(io_service),
std::ref(jobQueue),
strUrl,
strUsername,
strPassword,
logs,
maxQueueSize);
logs);
}
} // namespace ripple

View File

@@ -76,6 +76,7 @@ doSubscribe(RPC::JsonContext& context)
{
auto rspSub = make_RPCSub(
context.app.getOPs(),
context.app.getIOService(),
context.app.getJobQueue(),
strUrl,
strUsername,