Compare commits

...

4 Commits

Author SHA1 Message Date
Valentin Balaschenko
7c918d8122 improve packing of Number 2026-01-22 15:20:41 +00:00
Valentin Balaschenko
68fb4e7ad6 improve packing of Number 2026-01-22 14:19:02 +00:00
Bart
8695313565 ci: Run on-trigger and on-pr when generate-version is modified (#6257)
This change ensures that the `on-pr` and `on-trigger` workflows run when the generate-version action is modified.
2026-01-22 13:48:50 +00:00
Valentin Balaschenko
68c9d5ca0f refactor: Enforce 15-char limit and simplify labels for thread naming (#6212)
This change continues the thread naming work from #5691 and #5758, which enables more useful lock contention profiling by ensuring threads/jobs have short, stable, human-readable names (rather than being truncated/failing due to OS limits). This changes diagnostic naming only (thread names and job/load-event labels), not behavior.

Specific modifications are:
* Shortens all thread/job names used with `beast::setCurrentThreadName`, so the effective Linux thread name stays within the 15-character limit.
* Removes per-ledger sequence numbers from job/thread names to avoid long labels. This improves aggregation in lock contention profiling for short-lived job executions.
2026-01-22 08:19:29 -05:00
23 changed files with 111 additions and 135 deletions

View File

@@ -59,6 +59,7 @@ jobs:
# Keep the paths below in sync with those in `on-trigger.yml`.
.github/actions/build-deps/**
.github/actions/build-test/**
.github/actions/generate-version/**
.github/actions/setup-conan/**
.github/scripts/strategy-matrix/**
.github/workflows/reusable-build.yml

View File

@@ -16,6 +16,7 @@ on:
# Keep the paths below in sync with those in `on-pr.yml`.
- ".github/actions/build-deps/**"
- ".github/actions/build-test/**"
- ".github/actions/generate-version/**"
- ".github/actions/setup-conan/**"
- ".github/scripts/strategy-matrix/**"
- ".github/workflows/reusable-build.yml"

View File

@@ -213,9 +213,9 @@ class Number
using rep = std::int64_t;
using internalrep = MantissaRange::rep;
bool negative_{false};
internalrep mantissa_{0};
int exponent_{std::numeric_limits<int>::lowest()};
bool negative_{false};
public:
// The range for the exponent when normalized
@@ -524,7 +524,7 @@ inline constexpr Number::Number(
internalrep mantissa,
int exponent,
unchecked) noexcept
: negative_(negative), mantissa_{mantissa}, exponent_{exponent}
: mantissa_{mantissa}, exponent_{exponent}, negative_(negative)
{
}

View File

@@ -1,4 +1,5 @@
#include <xrpl/beast/core/CurrentThreadName.h>
#include <xrpl/beast/utility/instrumentation.h>
#include <string>
#include <string_view>
@@ -95,6 +96,11 @@ setCurrentThreadNameImpl(std::string_view name)
std::cerr << "WARNING: Thread name \"" << name << "\" (length "
<< name.size() << ") exceeds maximum of "
<< maxThreadNameLength << " characters on Linux.\n";
XRPL_ASSERT(
false,
"beast::detail::setCurrentThreadNameImpl : Thread name exceeds "
"maximum length for Linux");
}
#endif
}

View File

@@ -41,7 +41,7 @@ Job::queue_time() const
void
Job::doJob()
{
beast::setCurrentThreadName("doJob: " + mName);
beast::setCurrentThreadName("j:" + mName);
m_loadEvent->start();
m_loadEvent->setName(mName);

View File

@@ -88,20 +88,15 @@ public:
BEAST_EXPECT(stateB == 2);
}
#if BOOST_OS_LINUX
// On Linux, verify that thread names longer than 15 characters
// are truncated to 15 characters (the 16th character is reserved for
// the null terminator).
// On Linux, verify that thread names within the 15 character limit
// are set correctly (the 16th character is reserved for the null
// terminator).
{
testName(
"123456789012345",
"123456789012345"); // 15 chars, no truncation
testName(
"1234567890123456", "123456789012345"); // 16 chars, truncated
testName(
"ThisIsAVeryLongThreadNameExceedingLimit",
"ThisIsAVeryLong"); // 39 chars, truncated
"123456789012345"); // 15 chars, maximum allowed
testName("", ""); // empty name
testName("short", "short"); // short name, no truncation
testName("short", "short"); // short name
}
#endif
}

View File

@@ -56,7 +56,7 @@ public:
gate g1, g2;
std::shared_ptr<JobQueue::Coro> c;
env.app().getJobQueue().postCoro(
jtCLIENT, "Coroutine-Test", [&](auto const& cr) {
jtCLIENT, "CoroTest", [&](auto const& cr) {
c = cr;
g1.signal();
c->yield();
@@ -83,7 +83,7 @@ public:
gate g;
env.app().getJobQueue().postCoro(
jtCLIENT, "Coroutine-Test", [&](auto const& c) {
jtCLIENT, "CoroTest", [&](auto const& c) {
c->post();
c->yield();
g.signal();
@@ -109,7 +109,7 @@ public:
BEAST_EXPECT(*lv == -1);
gate g;
jq.addJob(jtCLIENT, "LocalValue-Test", [&]() {
jq.addJob(jtCLIENT, "LocalValTest", [&]() {
this->BEAST_EXPECT(*lv == -1);
*lv = -2;
this->BEAST_EXPECT(*lv == -2);
@@ -120,7 +120,7 @@ public:
for (int i = 0; i < N; ++i)
{
jq.postCoro(jtCLIENT, "Coroutine-Test", [&, id = i](auto const& c) {
jq.postCoro(jtCLIENT, "CoroTest", [&, id = i](auto const& c) {
a[id] = c;
g.signal();
c->yield();
@@ -148,7 +148,7 @@ public:
c->join();
}
jq.addJob(jtCLIENT, "LocalValue-Test", [&]() {
jq.addJob(jtCLIENT, "LocalValTest", [&]() {
this->BEAST_EXPECT(*lv == -2);
g.signal();
});

View File

@@ -119,9 +119,7 @@ RCLConsensus::Adaptor::acquireLedger(LedgerHash const& hash)
acquiringLedger_ = hash;
app_.getJobQueue().addJob(
jtADVANCE,
"getConsensusLedger1",
[id = hash, &app = app_, this]() {
jtADVANCE, "GetConsL1", [id = hash, &app = app_, this]() {
JLOG(j_.debug())
<< "JOB advanceLedger getConsensusLedger1 started";
app.getInboundLedgers().acquireAsync(
@@ -420,7 +418,7 @@ RCLConsensus::Adaptor::onAccept(
{
app_.getJobQueue().addJob(
jtACCEPT,
"acceptLedger",
"AcceptLedger",
[=, this, cj = std::move(consensusJson)]() mutable {
// Note that no lock is held or acquired during this job.
// This is because generic Consensus guarantees that once a ledger

View File

@@ -122,13 +122,11 @@ RCLValidationsAdaptor::acquire(LedgerHash const& hash)
Application* pApp = &app_;
app_.getJobQueue().addJob(
jtADVANCE, "getConsensusLedger2", [pApp, hash, this]() {
JLOG(j_.debug())
<< "JOB advanceLedger getConsensusLedger2 started";
pApp->getInboundLedgers().acquireAsync(
hash, 0, InboundLedger::Reason::CONSENSUS);
});
app_.getJobQueue().addJob(jtADVANCE, "GetConsL2", [pApp, hash, this]() {
JLOG(j_.debug()) << "JOB advanceLedger getConsensusLedger2 started";
pApp->getInboundLedgers().acquireAsync(
hash, 0, InboundLedger::Reason::CONSENSUS);
});
return std::nullopt;
}

View File

@@ -46,7 +46,7 @@ ConsensusTransSetSF::gotNode(
"xrpl::ConsensusTransSetSF::gotNode : transaction hash "
"match");
auto const pap = &app_;
app_.getJobQueue().addJob(jtTRANSACTION, "TXS->TXN", [pap, stx]() {
app_.getJobQueue().addJob(jtTRANSACTION, "TxsToTxn", [pap, stx]() {
pap->getOPs().submitTransaction(stx);
});
}

View File

@@ -48,9 +48,9 @@ OrderBookDB::setup(std::shared_ptr<ReadView const> const& ledger)
update(ledger);
else
app_.getJobQueue().addJob(
jtUPDATE_PF,
"OrderBookDB::update: " + std::to_string(ledger->seq()),
[this, ledger]() { update(ledger); });
jtUPDATE_PF, "OrderBookUpd", [this, ledger]() {
update(ledger);
});
}
}

View File

@@ -454,7 +454,7 @@ InboundLedger::done()
// We hold the PeerSet lock, so must dispatch
app_.getJobQueue().addJob(
jtLEDGER_DATA, "AcquisitionDone", [self = shared_from_this()]() {
jtLEDGER_DATA, "AcqDone", [self = shared_from_this()]() {
if (self->complete_ && !self->failed_)
{
self->app_.getLedgerMaster().checkAccept(self->getLedger());

View File

@@ -192,7 +192,7 @@ public:
// dispatch
if (ledger->gotData(std::weak_ptr<Peer>(peer), packet))
app_.getJobQueue().addJob(
jtLEDGER_DATA, "processLedgerData", [ledger]() {
jtLEDGER_DATA, "ProcessLData", [ledger]() {
ledger->runData();
});
@@ -207,7 +207,7 @@ public:
if (packet->type() == protocol::liAS_NODE)
{
app_.getJobQueue().addJob(
jtLEDGER_DATA, "gotStaleData", [this, packet]() {
jtLEDGER_DATA, "GotStaleData", [this, packet]() {
gotStaleData(packet);
});
}

View File

@@ -21,7 +21,7 @@ LedgerDeltaAcquire::LedgerDeltaAcquire(
ledgerHash,
LedgerReplayParameters::SUB_TASK_TIMEOUT,
{jtREPLAY_TASK,
"LedgerReplayDelta",
"LedReplDelta",
LedgerReplayParameters::MAX_QUEUED_TASKS},
app.journal("LedgerReplayDelta"))
, inboundLedgers_(inboundLedgers)
@@ -225,7 +225,7 @@ LedgerDeltaAcquire::onLedgerBuilt(
}
app_.getJobQueue().addJob(
jtREPLAY_TASK,
"onLedgerBuilt",
"OnLedBuilt",
[=, ledger = this->fullLedger_, &app = this->app_]() {
for (auto reason : reasons)
{

View File

@@ -1344,7 +1344,7 @@ LedgerMaster::tryAdvance()
if (!mAdvanceThread && !mValidLedger.empty())
{
mAdvanceThread = true;
app_.getJobQueue().addJob(jtADVANCE, "advanceLedger", [this]() {
app_.getJobQueue().addJob(jtADVANCE, "AdvanceLedger", [this]() {
std::unique_lock sl(m_mutex);
XRPL_ASSERT(
@@ -1482,7 +1482,7 @@ bool
LedgerMaster::newPathRequest()
{
std::unique_lock ml(m_mutex);
mPathFindNewRequest = newPFWork("pf:newRequest", ml);
mPathFindNewRequest = newPFWork("PthFindNewReq", ml);
return mPathFindNewRequest;
}
@@ -1503,7 +1503,7 @@ LedgerMaster::newOrderBookDB()
std::unique_lock ml(m_mutex);
mPathLedger.reset();
return newPFWork("pf:newOBDB", ml);
return newPFWork("PthFindOBDB", ml);
}
/** A thread needs to be dispatched to handle pathfinding work of some kind.
@@ -1841,7 +1841,7 @@ LedgerMaster::fetchForHistory(
mFillInProgress = seq;
}
app_.getJobQueue().addJob(
jtADVANCE, "tryFill", [this, ledger]() {
jtADVANCE, "TryFill", [this, ledger]() {
tryFill(ledger);
});
}
@@ -1980,7 +1980,7 @@ LedgerMaster::doAdvance(std::unique_lock<std::recursive_mutex>& sl)
}
app_.getOPs().clearNeedNetworkLedger();
progress = newPFWork("pf:newLedger", sl);
progress = newPFWork("PthFindNewLed", sl);
}
if (progress)
mAdvanceWork = true;
@@ -2011,7 +2011,7 @@ LedgerMaster::gotFetchPack(bool progress, std::uint32_t seq)
{
if (!mGotFetchPackThread.test_and_set(std::memory_order_acquire))
{
app_.getJobQueue().addJob(jtLEDGER_DATA, "gotFetchPack", [&]() {
app_.getJobQueue().addJob(jtLEDGER_DATA, "GotFetchPack", [&]() {
app_.getInboundLedgers().gotFetchPack();
mGotFetchPackThread.clear(std::memory_order_release);
});

View File

@@ -77,7 +77,7 @@ LedgerReplayTask::LedgerReplayTask(
parameter.finishHash_,
LedgerReplayParameters::TASK_TIMEOUT,
{jtREPLAY_TASK,
"LedgerReplayTask",
"LedReplTask",
LedgerReplayParameters::MAX_QUEUED_TASKS},
app.journal("LedgerReplayTask"))
, inboundLedgers_(inboundLedgers)

View File

@@ -16,7 +16,7 @@ SkipListAcquire::SkipListAcquire(
ledgerHash,
LedgerReplayParameters::SUB_TASK_TIMEOUT,
{jtREPLAY_TASK,
"SkipListAcquire",
"SkipListAcq",
LedgerReplayParameters::MAX_QUEUED_TASKS},
app.journal("LedgerReplaySkipList"))
, inboundLedgers_(inboundLedgers)

View File

@@ -27,7 +27,7 @@ TransactionAcquire::TransactionAcquire(
app,
hash,
TX_ACQUIRE_TIMEOUT,
{jtTXN_DATA, "TransactionAcquire", {}},
{jtTXN_DATA, "TxAcq", {}},
app.journal("TransactionAcquire"))
, mHaveRoot(false)
, mPeerSet(std::move(peerSet))
@@ -60,7 +60,7 @@ TransactionAcquire::done()
// just updates the consensus and related structures when we acquire
// a transaction set. No need to update them if we're shutting down.
app_.getJobQueue().addJob(
jtTXN_DATA, "completeAcquire", [pap, hash, map]() {
jtTXN_DATA, "ComplAcquire", [pap, hash, map]() {
pap->getInboundTransactions().giveSet(hash, map, true);
});
}

View File

@@ -331,8 +331,7 @@ run(int argc, char** argv)
{
using namespace std;
beast::setCurrentThreadName(
"rippled: main " + BuildInfo::getVersionString());
beast::setCurrentThreadName("main");
po::variables_map vm;

View File

@@ -12,9 +12,8 @@ NodeStoreScheduler::scheduleTask(NodeStore::Task& task)
if (jobQueue_.isStopped())
return;
if (!jobQueue_.addJob(jtWRITE, "NodeObject::store", [&task]() {
task.performScheduledTask();
}))
if (!jobQueue_.addJob(
jtWRITE, "NObjStore", [&task]() { task.performScheduledTask(); }))
{
// Job not added, presumably because we're shutting down.
// Recover by executing the task synchronously.

View File

@@ -981,7 +981,7 @@ NetworkOPsImp::setHeartbeatTimer()
heartbeatTimer_,
mConsensus.parms().ledgerGRANULARITY,
[this]() {
m_job_queue.addJob(jtNETOP_TIMER, "NetOPs.heartbeat", [this]() {
m_job_queue.addJob(jtNETOP_TIMER, "NetHeart", [this]() {
processHeartbeatTimer();
});
},
@@ -997,7 +997,7 @@ NetworkOPsImp::setClusterTimer()
clusterTimer_,
10s,
[this]() {
m_job_queue.addJob(jtNETOP_CLUSTER, "NetOPs.cluster", [this]() {
m_job_queue.addJob(jtNETOP_CLUSTER, "NetCluster", [this]() {
processClusterTimer();
});
},
@@ -1225,7 +1225,7 @@ NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans)
auto tx = std::make_shared<Transaction>(trans, reason, app_);
m_job_queue.addJob(jtTRANSACTION, "submitTxn", [this, tx]() {
m_job_queue.addJob(jtTRANSACTION, "SubmitTxn", [this, tx]() {
auto t = tx;
processTransaction(t, false, false, FailHard::no);
});
@@ -1323,7 +1323,7 @@ NetworkOPsImp::doTransactionAsync(
if (mDispatchState == DispatchState::none)
{
if (m_job_queue.addJob(
jtBATCH, "transactionBatch", [this]() { transactionBatch(); }))
jtBATCH, "TxBatchAsync", [this]() { transactionBatch(); }))
{
mDispatchState = DispatchState::scheduled;
}
@@ -1370,7 +1370,7 @@ NetworkOPsImp::doTransactionSyncBatch(
if (mTransactions.size())
{
// More transactions need to be applied, but by another job.
if (m_job_queue.addJob(jtBATCH, "transactionBatch", [this]() {
if (m_job_queue.addJob(jtBATCH, "TxBatchSync", [this]() {
transactionBatch();
}))
{
@@ -3208,19 +3208,16 @@ NetworkOPsImp::reportFeeChange()
if (f != mLastFeeSummary)
{
m_job_queue.addJob(
jtCLIENT_FEE_CHANGE, "reportFeeChange->pubServer", [this]() {
pubServer();
});
jtCLIENT_FEE_CHANGE, "PubFee", [this]() { pubServer(); });
}
}
void
NetworkOPsImp::reportConsensusStateChange(ConsensusPhase phase)
{
m_job_queue.addJob(
jtCLIENT_CONSENSUS,
"reportConsensusStateChange->pubConsensus",
[this, phase]() { pubConsensus(phase); });
m_job_queue.addJob(jtCLIENT_CONSENSUS, "PubCons", [this, phase]() {
pubConsensus(phase);
});
}
inline void
@@ -3728,7 +3725,7 @@ NetworkOPsImp::addAccountHistoryJob(SubAccountHistoryInfoWeak subInfo)
app_.getJobQueue().addJob(
jtCLIENT_ACCT_HIST,
"AccountHistoryTxStream",
"HistTxStream",
[this, dbType = databaseType, subInfo]() {
auto const& accountId = subInfo.index_->accountId_;
auto& lastLedgerSeq = subInfo.index_->historyLastLedgerSeq_;

View File

@@ -1158,7 +1158,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMManifests> const& m)
fee_.update(Resource::feeModerateBurdenPeer, "oversize");
app_.getJobQueue().addJob(
jtMANIFEST, "receiveManifests", [this, that = shared_from_this(), m]() {
jtMANIFEST, "RcvManifests", [this, that = shared_from_this(), m]() {
overlay_.onManifests(m, that);
});
}
@@ -1452,7 +1452,7 @@ PeerImp::handleTransaction(
{
app_.getJobQueue().addJob(
jtTRANSACTION,
"recvTransaction->checkTransaction",
"RcvCheckTx",
[weak = std::weak_ptr<PeerImp>(shared_from_this()),
flags,
checkSignature,
@@ -1555,7 +1555,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetLedger> const& m)
// Queue a job to process the request
std::weak_ptr<PeerImp> weak = shared_from_this();
app_.getJobQueue().addJob(jtLEDGER_REQ, "recvGetLedger", [weak, m]() {
app_.getJobQueue().addJob(jtLEDGER_REQ, "RcvGetLedger", [weak, m]() {
if (auto peer = weak.lock())
peer->processLedgerRequest(m);
});
@@ -1575,29 +1575,27 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathRequest> const& m)
fee_.update(
Resource::feeModerateBurdenPeer, "received a proof path request");
std::weak_ptr<PeerImp> weak = shared_from_this();
app_.getJobQueue().addJob(
jtREPLAY_REQ, "recvProofPathRequest", [weak, m]() {
if (auto peer = weak.lock())
app_.getJobQueue().addJob(jtREPLAY_REQ, "RcvProofPReq", [weak, m]() {
if (auto peer = weak.lock())
{
auto reply =
peer->ledgerReplayMsgHandler_.processProofPathRequest(m);
if (reply.has_error())
{
auto reply =
peer->ledgerReplayMsgHandler_.processProofPathRequest(m);
if (reply.has_error())
{
if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
peer->charge(
Resource::feeMalformedRequest,
"proof_path_request");
else
peer->charge(
Resource::feeRequestNoReply, "proof_path_request");
}
if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
peer->charge(
Resource::feeMalformedRequest, "proof_path_request");
else
{
peer->send(std::make_shared<Message>(
reply, protocol::mtPROOF_PATH_RESPONSE));
}
peer->charge(
Resource::feeRequestNoReply, "proof_path_request");
}
});
else
{
peer->send(std::make_shared<Message>(
reply, protocol::mtPROOF_PATH_RESPONSE));
}
}
});
}
void
@@ -1629,30 +1627,27 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m)
fee_.fee = Resource::feeModerateBurdenPeer;
std::weak_ptr<PeerImp> weak = shared_from_this();
app_.getJobQueue().addJob(
jtREPLAY_REQ, "recvReplayDeltaRequest", [weak, m]() {
if (auto peer = weak.lock())
app_.getJobQueue().addJob(jtREPLAY_REQ, "RcvReplDReq", [weak, m]() {
if (auto peer = weak.lock())
{
auto reply =
peer->ledgerReplayMsgHandler_.processReplayDeltaRequest(m);
if (reply.has_error())
{
auto reply =
peer->ledgerReplayMsgHandler_.processReplayDeltaRequest(m);
if (reply.has_error())
{
if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
peer->charge(
Resource::feeMalformedRequest,
"replay_delta_request");
else
peer->charge(
Resource::feeRequestNoReply,
"replay_delta_request");
}
if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
peer->charge(
Resource::feeMalformedRequest, "replay_delta_request");
else
{
peer->send(std::make_shared<Message>(
reply, protocol::mtREPLAY_DELTA_RESPONSE));
}
peer->charge(
Resource::feeRequestNoReply, "replay_delta_request");
}
});
else
{
peer->send(std::make_shared<Message>(
reply, protocol::mtREPLAY_DELTA_RESPONSE));
}
}
});
}
void
@@ -1748,7 +1743,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMLedgerData> const& m)
{
std::weak_ptr<PeerImp> weak{shared_from_this()};
app_.getJobQueue().addJob(
jtTXN_DATA, "recvPeerData", [weak, ledgerHash, m]() {
jtTXN_DATA, "RcvPeerData", [weak, ledgerHash, m]() {
if (auto peer = weak.lock())
{
peer->app_.getInboundTransactions().gotData(
@@ -1876,7 +1871,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
std::weak_ptr<PeerImp> weak = shared_from_this();
app_.getJobQueue().addJob(
isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut,
"recvPropose->checkPropose",
"checkPropose",
[weak, isTrusted, m, proposal]() {
if (auto peer = weak.lock())
peer->checkPropose(isTrusted, m, proposal);
@@ -2490,18 +2485,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
}
else if (isTrusted || !app_.getFeeTrack().isLoadedLocal())
{
std::string const name = [isTrusted, val]() {
std::string ret =
isTrusted ? "Trusted validation" : "Untrusted validation";
#ifdef DEBUG
ret += " " +
std::to_string(val->getFieldU32(sfLedgerSequence)) + ": " +
to_string(val->getNodeID());
#endif
return ret;
}();
std::string const name = isTrusted ? "ChkTrust" : "ChkUntrust";
std::weak_ptr<PeerImp> weak = shared_from_this();
app_.getJobQueue().addJob(
@@ -2561,11 +2545,10 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
}
std::weak_ptr<PeerImp> weak = shared_from_this();
app_.getJobQueue().addJob(
jtREQUESTED_TXN, "doTransactions", [weak, m]() {
if (auto peer = weak.lock())
peer->doTransactions(m);
});
app_.getJobQueue().addJob(jtREQUESTED_TXN, "DoTxs", [weak, m]() {
if (auto peer = weak.lock())
peer->doTransactions(m);
});
return;
}
@@ -2705,11 +2688,10 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMHaveTransactions> const& m)
}
std::weak_ptr<PeerImp> weak = shared_from_this();
app_.getJobQueue().addJob(
jtMISSING_TXN, "handleHaveTransactions", [weak, m]() {
if (auto peer = weak.lock())
peer->handleHaveTransactions(m);
});
app_.getJobQueue().addJob(jtMISSING_TXN, "HandleHaveTxs", [weak, m]() {
if (auto peer = weak.lock())
peer->handleHaveTransactions(m);
});
}
void

View File

@@ -72,7 +72,7 @@ public:
JLOG(j_.info()) << "RPCCall::fromNetwork start";
mSending = m_jobQueue.addJob(
jtCLIENT_SUBSCRIBE, "RPCSub::sendThread", [this]() {
jtCLIENT_SUBSCRIBE, "RPCSubSendThr", [this]() {
sendThread();
});
}