Use lambdas everywhere in JobQueue.

Conflicts:
	src/ripple/app/ledger/impl/LedgerConsensusImp.cpp
Author:    Tom Ritchford
Date:      2015-08-10 13:49:33 -04:00
Committer: Nik Bougalis
parent a6f866b4d8
commit c1f50ca7b3
25 changed files with 154 additions and 129 deletions
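
The refactor is the same at every call site: a callable built with std::bind (threading the Job& through std::placeholders::_1, or simply discarding it) becomes a lambda that takes the Job& explicitly and captures what it needs by value. A minimal, runnable before/after sketch of the pattern, assuming only that addJob accepts any callable invocable as void(Job&); the Worker/doWork names and this toy JobQueue are hypothetical stand-ins, not code from the repository:

    #include <functional>
    #include <iostream>

    // Hypothetical stand-ins for rippled's Job/JobQueue machinery -- just
    // enough to show the shape of the callable addJob expects.
    struct Job {};

    struct JobQueue
    {
        void addJob (int /*type*/, char const* name, std::function<void (Job&)> f)
        {
            std::cout << "running job: " << name << "\n";
            Job j;
            f (j); // a real queue would defer this to a worker thread
        }
    };

    int const jtEXAMPLE = 0;

    struct Worker
    {
        JobQueue& q_;

        void doWork (int arg) { std::cout << "doWork(" << arg << ")\n"; }

        void oldStyle (int arg)
        {
            // Before: a bind expression; the Job& passed by the queue is
            // silently discarded (bind ignores surplus call-time arguments).
            q_.addJob (jtEXAMPLE, "example",
                std::bind (&Worker::doWork, this, arg));
        }

        void newStyle (int arg)
        {
            // After: the lambda names its captures and takes the Job&
            // explicitly, making the unused parameter visible.
            q_.addJob (jtEXAMPLE, "example",
                [this, arg] (Job&) { doWork (arg); });
        }
    };

    int main ()
    {
        JobQueue q;
        Worker w{q};
        w.oldStyle (1);
        w.newStyle (2);
    }

The lambda form states the captured state at the call site and lets the target functions drop their unused Job& parameters, which is what most of the hunks below do.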


@@ -59,8 +59,9 @@ void ConsensusTransSetSF::gotNode (
         assert (stx->getTransactionID () == nodeHash);
         getApp().getJobQueue ().addJob (
             jtTRANSACTION, "TXS->TXN",
-            std::bind (&NetworkOPs::submitTransaction, &getApp().getOPs (),
-                std::placeholders::_1, stx));
+            [stx] (Job&) {
+                getApp().getOPs().submitTransaction(stx);
+            });
     }
     catch (...)
     {


@@ -56,7 +56,7 @@ public:
         std::shared_ptr<Peer>,
         std::shared_ptr <protocol::TMLedgerData>) = 0;
-    virtual void doLedgerData (Job&, LedgerHash hash) = 0;
+    virtual void doLedgerData (LedgerHash hash) = 0;
     virtual void gotStaleData (
         std::shared_ptr <protocol::TMLedgerData> packet) = 0;
@@ -77,7 +77,7 @@ public:
     /** Called when a complete ledger is obtained. */
     virtual void onLedgerFetched (InboundLedger::fcReason why) = 0;
-    virtual void gotFetchPack (Job&) = 0;
+    virtual void gotFetchPack () = 0;
     virtual void sweep () = 0;
     virtual void onStop() = 0;


@@ -1295,20 +1295,22 @@ bool Ledger::pendSaveValidated (bool isSynchronous, bool isCurrent)
     }
     if (isSynchronous)
+    {
         return saveValidatedLedger(isCurrent);
-    else if (isCurrent)
+    }
+
+    auto that = shared_from_this();
+    auto job = [that, isCurrent] (Job&) {
+        that->saveValidatedLedger(isCurrent);
+    };
+
+    if (isCurrent)
     {
-        getApp().getJobQueue ().addJob (jtPUBLEDGER, "Ledger::pendSave",
-            std::bind (&Ledger::saveValidatedLedgerAsync, shared_from_this (),
-                std::placeholders::_1, isCurrent));
+        getApp().getJobQueue().addJob(
+            jtPUBLEDGER, "Ledger::pendSave", job);
     }
     else
     {
-        getApp().getJobQueue ().addJob (jtPUBOLDLEDGER, "Ledger::pendOldSave",
-            std::bind (&Ledger::saveValidatedLedgerAsync, shared_from_this (),
-                std::placeholders::_1, isCurrent));
+        getApp().getJobQueue ().addJob(
+            jtPUBOLDLEDGER, "Ledger::pendOldSave", job);
     }
     return true;
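
Two details in this hunk recur throughout the commit: the lambda captures that (the shared_from_this() pointer) by value, so the Ledger stays alive until the queued job runs, and a single job lambda replaces the saveValidatedLedgerAsync trampoline that existed only to swallow the Job& (removed in the next hunk). A runnable sketch of the lifetime idiom, with hypothetical Thing/process/schedule names standing in for Ledger, saveValidatedLedger, and the job queue:

    #include <functional>
    #include <iostream>
    #include <memory>
    #include <vector>

    // Hypothetical deferred-work queue standing in for JobQueue::addJob.
    std::vector<std::function<void ()>> pending;
    void schedule (std::function<void ()> f) { pending.push_back (std::move (f)); }

    struct Thing : std::enable_shared_from_this<Thing>
    {
        void process (bool current) { std::cout << "process(" << current << ")\n"; }

        void pend (bool current)
        {
            // Capturing shared_from_this() by value keeps *this alive until
            // the queued job runs, even if every other owner lets go.
            auto that = shared_from_this ();
            schedule ([that, current] { that->process (current); });
        }
    };

    int main ()
    {
        std::make_shared<Thing> ()->pend (true); // sole owner gone immediately
        for (auto& f : pending)
            f (); // still safe: the lambda owns the Thing
    }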


@@ -373,10 +373,6 @@ private:
     class sles_iter_impl;
     class txs_iter_impl;
-    void saveValidatedLedgerAsync(Job&, bool current)
-    {
-        saveValidatedLedger(current);
-    }
     bool saveValidatedLedger (bool current);
     void


@@ -191,7 +191,7 @@ public:
     virtual
     void makeFetchPack (
-        Job&, std::weak_ptr<Peer> const& wPeer,
+        std::weak_ptr<Peer> const& wPeer,
         std::shared_ptr<protocol::TMGetObjectByHash> const& request,
         uint256 haveLedgerHash,
         std::uint32_t uUptime) = 0;


@@ -67,8 +67,9 @@ void OrderBookDB::setup(
     if (getConfig().RUN_STANDALONE)
         update(ledger);
     else
-        getApp().getJobQueue().addJob(jtUPDATE_PF, "OrderBookDB::update",
-            std::bind(&OrderBookDB::update, this, ledger));
+        getApp().getJobQueue().addJob(
+            jtUPDATE_PF, "OrderBookDB::update",
+            [this, ledger] (Job&) { update(ledger); });
 }

 void OrderBookDB::update(


@@ -326,7 +326,6 @@ std::weak_ptr<PeerSet> InboundLedger::pmDowncast ()
 /** Dispatch acquire completion
 */
 static void LADispatch (
-    Job& job,
     InboundLedger::pointer la,
     std::vector< std::function<void (InboundLedger::pointer)> > trig)
 {
@@ -371,9 +370,10 @@ void InboundLedger::done ()
     }

     // We hold the PeerSet lock, so must dispatch
-    getApp().getJobQueue ().addJob (jtLEDGER_DATA, "triggers",
-        std::bind (LADispatch, std::placeholders::_1, shared_from_this (),
-            triggers));
+    auto that = shared_from_this ();
+    getApp().getJobQueue ().addJob (
+        jtLEDGER_DATA, "triggers",
+        [that, triggers] (Job&) { LADispatch(that, triggers); });
 }

 bool InboundLedger::addOnComplete (


@@ -164,8 +164,9 @@ public:
     // useful.
     if (packet.type () == protocol::liAS_NODE)
     {
-        getApp().getJobQueue().addJob(jtLEDGER_DATA, "gotStaleData",
-            std::bind(&InboundLedgers::gotStaleData, this, packet_ptr));
+        getApp().getJobQueue().addJob(
+            jtLEDGER_DATA, "gotStaleData",
+            [this, packet_ptr] (Job&) { gotStaleData(packet_ptr); });
     }

     return false;
@@ -173,9 +174,9 @@ public:
     // Stash the data for later processing and see if we need to dispatch
     if (ledger->gotData(std::weak_ptr<Peer>(peer), packet_ptr))
-        getApp().getJobQueue().addJob (jtLEDGER_DATA, "processLedgerData",
-            std::bind (&InboundLedgers::doLedgerData, this,
-                std::placeholders::_1, hash));
+        getApp().getJobQueue().addJob (
+            jtLEDGER_DATA, "processLedgerData",
+            [this, hash] (Job&) { doLedgerData(hash); });

     return true;
 }
@@ -223,7 +224,7 @@ public:
     return mRecentFailures.find (h) != mRecentFailures.end();
 }

-void doLedgerData (Job&, LedgerHash hash)
+void doLedgerData (LedgerHash hash)
 {
     InboundLedger::pointer ledger = find (hash);
@@ -334,7 +335,7 @@ public:
     return ret;
 }

-void gotFetchPack (Job&)
+void gotFetchPack ()
 {
     std::vector<InboundLedger::pointer> acquires;
     {


@@ -643,13 +643,13 @@ void LedgerConsensusImp::handleLCL (uint256 const& lclHash)
             // Tell the ledger acquire system that we need the consensus ledger
             mAcquiringLedger = mPrevLedgerHash;
             auto& previousHash = mPrevLedgerHash;
-            auto acquire = [previousHash] (Job&) {
+            getApp().getJobQueue().addJob (
+                jtADVANCE, "getConsensusLedger",
+                [previousHash] (Job&) {
                     getApp().getInboundLedgers().acquire(
                         previousHash, 0, InboundLedger::fcCONSENSUS);
-            };
-            getApp().getJobQueue().addJob (
-                jtADVANCE, "getConsensusLedger", acquire);
-            ;
+                });
             mHaveCorrectLCL = false;
         }
         return;


@@ -1201,7 +1201,7 @@ public:
         mAdvanceThread = true;
         getApp().getJobQueue ().addJob (
             jtADVANCE, "advanceLedger",
-            std::bind (&LedgerMasterImp::advanceThread, this));
+            [this] (Job&) { advanceThread(); });
     }
 }
@@ -1342,9 +1342,9 @@ public:
     if (mPathFindThread < 2)
     {
         ++mPathFindThread;
-        getApp().getJobQueue().addJob (jtUPDATE_PF, name,
-            std::bind (&LedgerMasterImp::updatePaths, this,
-                std::placeholders::_1));
+        getApp().getJobQueue().addJob (
+            jtUPDATE_PF, name,
+            [this] (Job& j) { updatePaths(j); });
     }
 }
@@ -1642,7 +1642,7 @@ public:
         Blob& data) override;

     void makeFetchPack (
-        Job&, std::weak_ptr<Peer> const& wPeer,
+        std::weak_ptr<Peer> const& wPeer,
         std::shared_ptr<protocol::TMGetObjectByHash> const& request,
         uint256 haveLedgerHash,
         std::uint32_t uUptime) override;
@@ -1751,9 +1751,10 @@ void LedgerMasterImp::doAdvance ()
                 ScopedLockType lock (m_mutex);
                 mFillInProgress = ledger->info().seq;
                 getApp().getJobQueue().addJob(
-                    jtADVANCE, "tryFill", std::bind (
-                        &LedgerMasterImp::tryFill, this,
-                        std::placeholders::_1, ledger));
+                    jtADVANCE, "tryFill",
+                    [this, ledger] (Job& j) {
+                        tryFill(j, ledger);
+                    });
             }
             progress = true;
         }
@@ -1864,12 +1865,11 @@ void LedgerMasterImp::gotFetchPack (
     getApp().getJobQueue().addJob (
         jtLEDGER_DATA, "gotFetchPack",
-        std::bind (&InboundLedgers::gotFetchPack,
-            &getApp().getInboundLedgers (), std::placeholders::_1));
+        [] (Job&) { getApp().getInboundLedgers().gotFetchPack(); });
 }

 void LedgerMasterImp::makeFetchPack (
-    Job&, std::weak_ptr<Peer> const& wPeer,
+    std::weak_ptr<Peer> const& wPeer,
     std::shared_ptr<protocol::TMGetObjectByHash> const& request,
     uint256 haveLedgerHash,
     std::uint32_t uUptime)


@@ -997,13 +997,11 @@ public:
             getApp().signalStop ();
         }

-        m_jobQueue->addJob(jtSWEEP, "sweep",
-            std::bind(&ApplicationImp::doSweep, this,
-                std::placeholders::_1));
+        m_jobQueue->addJob(jtSWEEP, "sweep", [this] (Job&) { doSweep(); });
     }
 }

-void doSweep (Job& j)
+void doSweep ()
 {
     // VFALCO NOTE Does the order of calls matter?
     // VFALCO TODO fix the dependency inversion using an observer,


@@ -50,11 +50,10 @@ void NodeStoreScheduler::scheduleTask (NodeStore::Task& task)
     m_jobQueue->addJob (
         jtWRITE,
         "NodeObject::store",
-        std::bind (&NodeStoreScheduler::doTask,
-            this, std::ref(task), std::placeholders::_1));
+        [this, &task] (Job&) { doTask(task); });
 }

-void NodeStoreScheduler::doTask (NodeStore::Task& task, Job&)
+void NodeStoreScheduler::doTask (NodeStore::Task& task)
 {
     task.performScheduledTask ();
     if ((--m_taskCount == 0) && isStopping())


@@ -47,7 +47,7 @@ public:
     void onBatchWrite (NodeStore::BatchWriteReport const& report) override;

 private:
-    void doTask (NodeStore::Task& task, Job&);
+    void doTask (NodeStore::Task& task);

     JobQueue* m_jobQueue;
     std::atomic <int> m_taskCount;


@@ -152,7 +152,7 @@ public:
     //
     // Must complete immediately.
-    void submitTransaction (Job&, STTx::pointer) override;
+    void submitTransaction (STTx::pointer) override;

     void processTransaction (
         Transaction::pointer& transaction,
@@ -515,12 +515,12 @@ void NetworkOPsImp::onDeadlineTimer (beast::DeadlineTimer& timer)
     if (timer == m_heartbeatTimer)
     {
         m_job_queue.addJob (jtNETOP_TIMER, "NetOPs.heartbeat",
-            std::bind (&NetworkOPsImp::processHeartbeatTimer, this));
+            [this] (Job&) { processHeartbeatTimer(); });
     }
     else if (timer == m_clusterTimer)
     {
         m_job_queue.addJob (jtNETOP_CLUSTER, "NetOPs.cluster",
-            std::bind (&NetworkOPsImp::processClusterTimer, this));
+            [this] (Job&) { processClusterTimer(); });
     }
 }
@@ -639,7 +639,7 @@ std::string NetworkOPsImp::strOperatingMode () const
     return paStatusToken[mMode];
 }

-void NetworkOPsImp::submitTransaction (Job&, STTx::pointer iTrans)
+void NetworkOPsImp::submitTransaction (STTx::pointer iTrans)
 {
     if (isNeedNetworkLedger ())
     {
@@ -698,14 +698,13 @@ void NetworkOPsImp::submitTransaction (Job&, STTx::pointer iTrans)
         }
     }

-    m_job_queue.addJob (jtTRANSACTION, "submitTxn",
-        std::bind (&NetworkOPsImp::processTransaction,
-            this,
-            std::make_shared<Transaction> (trans, Validate::NO,
-                directSigVerify, reason),
-            false,
-            false,
-            FailHard::no));
+    auto tx = std::make_shared<Transaction> (
+        trans, Validate::NO, directSigVerify, reason);
+
+    m_job_queue.addJob (jtTRANSACTION, "submitTxn", [this, tx] (Job&) {
+        auto t = tx;
+        processTransaction(t, false, false, FailHard::no);
+    });
 }

 void NetworkOPsImp::processTransaction (Transaction::pointer& transaction,
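
The auto t = tx; line inside the lambda above is deliberate: processTransaction takes Transaction::pointer& (a non-const lvalue reference), and a by-value capture is const inside a non-mutable lambda, so a mutable local copy of the shared_ptr is needed to bind to that parameter. A compilable illustration, with hypothetical consume/dispatch names standing in for processTransaction and the queued job:

    #include <memory>

    using Ptr = std::shared_ptr<int>;

    // Stands in for processTransaction: takes its argument by non-const
    // lvalue reference.
    void consume (Ptr& p) { p.reset (); }

    void dispatch (Ptr tx)
    {
        auto job = [tx] {
            // consume (tx);   // ill-formed: the by-value capture is const here
            auto t = tx;       // mutable local copy of the shared_ptr
            consume (t);       // binds fine; the copy absorbs any mutation
        };
        job ();
    }

    int main ()
    {
        dispatch (std::make_shared<int> (42));
    }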
@@ -763,7 +762,7 @@ void NetworkOPsImp::doTransactionAsync (Transaction::pointer transaction,
     if (mDispatchState == DispatchState::none)
     {
         m_job_queue.addJob (jtBATCH, "transactionBatch",
-            std::bind (&NetworkOPsImp::transactionBatch, this));
+            [this] (Job&) { transactionBatch(); });
         mDispatchState = DispatchState::scheduled;
     }
 }
@@ -795,7 +794,7 @@ void NetworkOPsImp::doTransactionSync (Transaction::pointer transaction,
     {
         // More transactions need to be applied, but by another job.
         m_job_queue.addJob (jtBATCH, "transactionBatch",
-            std::bind (&NetworkOPsImp::transactionBatch, this));
+            [this] (Job&) { transactionBatch(); });
         mDispatchState = DispatchState::scheduled;
     }
 }
@@ -2163,7 +2162,7 @@ void NetworkOPsImp::reportFeeChange ()
     m_job_queue.addJob (
         jtCLIENT, "reportFeeChange->pubServer",
-        std::bind (&NetworkOPsImp::pubServer, this));
+        [this] (Job&) { pubServer(); });
 }

 // This routine should only be used to publish accepted or validated


@@ -110,7 +110,7 @@ public:
     //
     // must complete immediately
-    virtual void submitTransaction (Job&, STTx::pointer) = 0;
+    virtual void submitTransaction (STTx::pointer) = 0;

     /**
      * Process transactions as they arrive from the network or which are


@@ -451,13 +451,14 @@ void UniqueNodeListImp::onDeadlineTimer (beast::DeadlineTimer& timer)
 {
     if (timer == m_scoreTimer)
     {
-        getApp().getJobQueue ().addJob (jtUNL, "UNL.score",
-            std::bind (&UniqueNodeListImp::doScore, this));
+        getApp().getJobQueue ().addJob (
+            jtUNL, "UNL.score",
+            [this] (Job&) { doScore(); });
     }
     else if (timer == m_fetchTimer)
     {
         getApp().getJobQueue ().addJob (jtUNL, "UNL.fetch",
-            std::bind (&UniqueNodeListImp::doFetch, this));
+            [this] (Job&) { doFetch(); });
     }
 }


@@ -435,12 +435,12 @@ private:
             return;

         mWriting = true;
-        getApp().getJobQueue ().addJob (jtWRITE, "Validations::doWrite",
-            std::bind (&ValidationsImp::doWrite,
-                this, std::placeholders::_1));
+        getApp().getJobQueue ().addJob (
+            jtWRITE, "Validations::doWrite",
+            [this] (Job&) { doWrite(); });
     }

-    void doWrite (Job&)
+    void doWrite ()
     {
         LoadEvent::autoptr event (getApp().getJobQueue ().getLoadEventAP (jtDISK, "ValidationWrite"));
         boost::format insVal ("INSERT INTO Validations "


@@ -223,8 +223,7 @@ private:
             running_ = true;
         }

-        jobQueue_.addJob (
-            jtWAL, "WAL", std::bind (&WALCheckpointer::checkpoint, this));
+        jobQueue_.addJob (jtWAL, "WAL", [this] (Job&) { checkpoint(); });
     }

     void checkpoint ()


@@ -99,7 +99,9 @@ public:
         WriteLog (lsINFO, RPCSub) << "RPCCall::fromNetwork start";

         m_jobQueue.addJob (
-            jtCLIENT, "RPCSub::sendThread", std::bind (&RPCSubImp::sendThread, this));
+            jtCLIENT, "RPCSub::sendThread", [this] (Job&) {
+                sendThread();
+            });
     }
 }


@@ -104,8 +104,8 @@ public:
     }

 private:
-    static void TimerEntry (std::weak_ptr<PeerSet>, const boost::system::error_code& result);
-    static void TimerJobEntry (Job&, std::shared_ptr<PeerSet>);
+    static void timerEntry (std::weak_ptr<PeerSet>, const boost::system::error_code& result);
+    static void timerJobEntry (std::shared_ptr<PeerSet>);

 protected:
     // VFALCO TODO try to make some of these private


@@ -631,7 +631,7 @@ OverlayImpl::onPeerDeactivate (Peer::id_t id,
 }

 void
-OverlayImpl::onManifests (Job&,
+OverlayImpl::onManifests (
     std::shared_ptr<protocol::TMManifests> const& m,
     std::shared_ptr<PeerImp> const& from)
 {


@@ -253,7 +253,7 @@ public:
     // Called when TMManifests is received from a peer
     void
-    onManifests (Job&,
+    onManifests (
         std::shared_ptr<protocol::TMManifests> const& m,
         std::shared_ptr<PeerImp> const& from);


@@ -835,9 +835,10 @@ void
 PeerImp::onMessage (std::shared_ptr<protocol::TMManifests> const& m)
 {
     // VFALCO What's the right job type?
-    getApp().getJobQueue().addJob (jtVALIDATION_ut,
-        "receiveManifests", std::bind(&OverlayImpl::onManifests,
-            &overlay_, std::placeholders::_1, m, shared_from_this()));
+    auto that = shared_from_this();
+    getApp().getJobQueue().addJob (
+        jtVALIDATION_ut, "receiveManifests",
+        [this, that, m] (Job&) { overlay_.onManifests(m, that); });
 }

 void
@@ -1018,8 +1019,7 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMTransaction> const& m)
     try
     {
-        STTx::pointer stx = std::make_shared <
-            STTx> (std::ref (sit));
+        auto stx = std::make_shared<STTx>(sit);
         uint256 txID = stx->getTransactionID ();

         int flags;
@@ -1063,10 +1063,15 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMTransaction> const& m)
         else if (getApp().getLedgerMaster().getValidatedLedgerAge() > 240)
             p_journal_.trace << "No new transactions until synchronized";
         else
-            getApp().getJobQueue ().addJob (jtTRANSACTION,
-                "recvTransaction->checkTransaction",
-                std::bind(beast::weak_fn(&PeerImp::checkTransaction,
-                    shared_from_this()), std::placeholders::_1, flags, stx));
+        {
+            std::weak_ptr<PeerImp> weak = shared_from_this();
+            getApp().getJobQueue ().addJob (
+                jtTRANSACTION, "recvTransaction->checkTransaction",
+                [weak, flags, stx] (Job&) {
+                    if (auto peer = weak.lock())
+                        peer->checkTransaction(flags, stx);
+                });
+        }
     }
     catch (...)
     {
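
This hunk also introduces the replacement for beast::weak_fn used throughout the remaining PeerImp hunks: capture a std::weak_ptr to the peer and lock it when the job runs, so the queued job neither extends the peer's lifetime nor touches a destroyed peer. A self-contained sketch of the capture-weak-then-lock idiom, where Peer/handle and the queue_ vector are hypothetical stand-ins for PeerImp and the JobQueue:

    #include <functional>
    #include <iostream>
    #include <memory>
    #include <vector>

    // Hypothetical job queue; Peer stands in for PeerImp.
    std::vector<std::function<void ()>> queue_;

    struct Peer : std::enable_shared_from_this<Peer>
    {
        void handle (int flags) { std::cout << "handled, flags=" << flags << "\n"; }

        void post (int flags)
        {
            // Capture weak: the queued job must not keep the peer alive.
            std::weak_ptr<Peer> weak = shared_from_this ();
            queue_.push_back ([weak, flags] {
                if (auto peer = weak.lock ()) // the peer may be gone by now
                    peer->handle (flags);
            });
        }
    };

    int main ()
    {
        auto p = std::make_shared<Peer> ();
        p->post (1);
        p->post (2);
        p.reset ();            // peer destroyed before its jobs run
        for (auto& f : queue_)
            f ();              // both jobs are safely skipped
    }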
@@ -1079,8 +1084,13 @@ void
 PeerImp::onMessage (std::shared_ptr <protocol::TMGetLedger> const& m)
 {
     fee_ = Resource::feeMediumBurdenPeer;
-    getApp().getJobQueue().addJob (jtLEDGER_REQ, "recvGetLedger", std::bind(
-        beast::weak_fn(&PeerImp::getLedger, shared_from_this()), m));
+    std::weak_ptr<PeerImp> weak = shared_from_this();
+    getApp().getJobQueue().addJob (
+        jtLEDGER_REQ, "recvGetLedger",
+        [weak, m] (Job&) {
+            if (auto peer = weak.lock())
+                peer->getLedger(m);
+        });
 }

 void
@@ -1125,9 +1135,14 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMLedgerData> const& m)
     if (m->type () == protocol::liTS_CANDIDATE)
     {
         // got data for a candidate transaction set
-        getApp().getJobQueue().addJob(jtTXN_DATA, "recvPeerData", std::bind(
-            beast::weak_fn(&PeerImp::peerTXData, shared_from_this()),
-            std::placeholders::_1, hash, m, p_journal_));
+        std::weak_ptr<PeerImp> weak = shared_from_this();
+        auto& journal = p_journal_;
+        getApp().getJobQueue().addJob(
+            jtTXN_DATA, "recvPeerData",
+            [weak, hash, journal, m] (Job&) {
+                if (auto peer = weak.lock())
+                    peer->peerTXData(hash, m, journal);
+            });
         return;
     }
@@ -1223,10 +1238,13 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMProposeSet> const& m)
         signerPublic, PublicKey(makeSlice(set.nodepubkey())),
         suppression);

-    getApp().getJobQueue ().addJob (isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut,
-        "recvPropose->checkPropose", std::bind(beast::weak_fn(
-            &PeerImp::checkPropose, shared_from_this()), std::placeholders::_1,
-            m, proposal));
+    std::weak_ptr<PeerImp> weak = shared_from_this();
+    getApp().getJobQueue ().addJob (
+        isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut, "recvPropose->checkPropose",
+        [weak, m, proposal] (Job& job) {
+            if (auto peer = weak.lock())
+                peer->checkPropose(job, m, proposal);
+        });
 }

 void
@@ -1457,11 +1475,14 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMValidation> const& m)
     }

     if (isTrusted || !getApp().getFeeTrack ().isLoadedLocal ())
     {
-        getApp().getJobQueue ().addJob (isTrusted ?
-            jtVALIDATION_t : jtVALIDATION_ut, "recvValidation->checkValidation",
-            std::bind(beast::weak_fn(&PeerImp::checkValidation,
-                shared_from_this()), std::placeholders::_1, val,
-                isTrusted, m));
+        std::weak_ptr<PeerImp> weak = shared_from_this();
+        getApp().getJobQueue ().addJob (
+            isTrusted ? jtVALIDATION_t : jtVALIDATION_ut,
+            "recvValidation->checkValidation",
+            [weak, val, isTrusted, m] (Job&) {
+                if (auto peer = weak.lock())
+                    peer->checkValidation(val, isTrusted, m);
+            });
     }
     else
     {
@@ -1684,15 +1705,18 @@ PeerImp::doFetchPack (const std::shared_ptr<protocol::TMGetObjectByHash>& packet
     uint256 hash;
     memcpy (hash.begin (), packet->ledgerhash ().data (), 32);

-    getApp().getJobQueue ().addJob (jtPACK, "MakeFetchPack",
-        std::bind (&LedgerMaster::makeFetchPack, &getApp().getLedgerMaster (),
-            std::placeholders::_1, std::weak_ptr<PeerImp> (shared_from_this ()),
-            packet, hash, UptimeTimer::getInstance ().getElapsedSeconds ()));
+    std::weak_ptr<PeerImp> weak = shared_from_this();
+    auto elapsed = UptimeTimer::getInstance().getElapsedSeconds();
+    getApp().getJobQueue ().addJob (
+        jtPACK, "MakeFetchPack",
+        [weak, packet, hash, elapsed] (Job&) {
+            getApp().getLedgerMaster().makeFetchPack(
+                weak, packet, hash, elapsed);
+        });
 }

 void
-PeerImp::checkTransaction (Job&, int flags,
-    STTx::pointer stx)
+PeerImp::checkTransaction (int flags, STTx::pointer stx)
 {
     // VFALCO TODO Rewrite to not use exceptions
     try
@@ -1789,7 +1813,7 @@ PeerImp::checkPropose (Job& job,
 }

 void
-PeerImp::checkValidation (Job&, STValidation::pointer val,
+PeerImp::checkValidation (STValidation::pointer val,
     bool isTrusted, std::shared_ptr<protocol::TMValidation> const& packet)
 {
     try
@@ -2212,7 +2236,7 @@ PeerImp::getLedger (std::shared_ptr<protocol::TMGetLedger> const& m)
 }

 void
-PeerImp::peerTXData (Job&, uint256 const& hash,
+PeerImp::peerTXData (uint256 const& hash,
     std::shared_ptr <protocol::TMLedgerData> const& pPacket,
     beast::Journal journal)
 {


@@ -456,7 +456,7 @@ private:
     doFetchPack (const std::shared_ptr<protocol::TMGetObjectByHash>& packet);

     void
-    checkTransaction (Job&, int flags, STTx::pointer stx);
+    checkTransaction (int flags, STTx::pointer stx);

     void
     checkPropose (Job& job,
@@ -464,7 +464,7 @@ private:
         LedgerProposal::pointer proposal);

     void
-    checkValidation (Job&, STValidation::pointer val,
+    checkValidation (STValidation::pointer val,
         bool isTrusted, std::shared_ptr<protocol::TMValidation> const& packet);

     void
@@ -472,7 +472,7 @@ private:
     // Called when we receive tx set data.
     void
-    peerTXData (Job&, uint256 const& hash,
+    peerTXData (uint256 const& hash,
         std::shared_ptr <protocol::TMLedgerData> const& pPacket,
         beast::Journal journal);

 };


@@ -71,7 +71,7 @@ bool PeerSet::insert (Peer::ptr const& ptr)
 void PeerSet::setTimer ()
 {
     mTimer.expires_from_now (boost::posix_time::milliseconds (mTimerInterval));
-    mTimer.async_wait (std::bind (&PeerSet::TimerEntry, pmDowncast (), beast::asio::placeholders::error));
+    mTimer.async_wait (std::bind (&PeerSet::timerEntry, pmDowncast (), beast::asio::placeholders::error));
 }

 void PeerSet::invokeOnTimer ()
@@ -98,7 +98,7 @@ void PeerSet::invokeOnTimer ()
     setTimer ();
 }

-void PeerSet::TimerEntry (std::weak_ptr<PeerSet> wptr, const boost::system::error_code& result)
+void PeerSet::timerEntry (std::weak_ptr<PeerSet> wptr, const boost::system::error_code& result)
 {
     if (result == boost::asio::error::operation_aborted)
         return;
@@ -113,9 +113,10 @@ void PeerSet::TimerEntry (std::weak_ptr<PeerSet> wptr, const boost::system::erro
         //
         if (ptr->mTxnData)
         {
-            getApp().getJobQueue ().addJob (jtTXN_DATA, "timerEntryTxn",
-                std::bind (&PeerSet::TimerJobEntry, std::placeholders::_1,
-                    ptr));
+            getApp().getJobQueue ().addJob (
+                jtTXN_DATA, "timerEntryTxn", [ptr] (Job&) {
+                    timerJobEntry(ptr);
+                });
         }
         else
         {
@@ -127,14 +128,15 @@ void PeerSet::TimerEntry (std::weak_ptr<PeerSet> wptr, const boost::system::erro
                 ptr->setTimer ();
             }
             else
-                getApp().getJobQueue ().addJob (jtLEDGER_DATA, "timerEntryLgr",
-                    std::bind (&PeerSet::TimerJobEntry, std::placeholders::_1,
-                        ptr));
+                getApp().getJobQueue ().addJob (
+                    jtLEDGER_DATA, "timerEntryLgr", [ptr] (Job&) {
+                        timerJobEntry(ptr);
+                    });
         }
     }
 }

-void PeerSet::TimerJobEntry (Job&, std::shared_ptr<PeerSet> ptr)
+void PeerSet::timerJobEntry (std::shared_ptr<PeerSet> ptr)
 {
     ptr->invokeOnTimer ();
 }