Defer publishing a ledger to clients until it accumulates sufficient validations.

We now have an open ledger, a last closed ledger, and a last validated ledger. Normally a ledger is validated right after it closes, but in edge cases we might see a ledger close and then never get validated. This change makes the code handle that case correctly: a closed ledger is only published to clients once it has gathered enough trusted validations.
JoelKatz
2013-01-02 12:04:16 -08:00
parent b71d0a93f9
commit 1127ae560e
6 changed files with 95 additions and 74 deletions
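In the new LedgerMaster::checkPublish below, "sufficient validations" means a trusted-validation count that meets a threshold derived from the configured minimum (mMinValidations) and from the validation count seen for the last validated ledger, scaled by MIN_VALIDATION_RATIO (a fraction expressed in 256ths); standalone mode drops the requirement to zero. A minimal standalone sketch of that rule, using hypothetical free functions rather than the real member functions:

// Illustrative sketch of the publish rule this commit introduces; these names
// are stand-ins, not the rippled API. minValidationRatio is a fraction of 256.
#include <algorithm>

int publishThreshold(int configuredMin,           // mMinValidations
                     int trustedForLastValidated, // validations of the last validated ledger
                     int minValidationRatio,      // MIN_VALIDATION_RATIO, in 256ths
                     bool standalone)             // theConfig.RUN_STANDALONE
{
    if (standalone)
        return 0;                                 // no network: publish immediately
    int scaled = trustedForLastValidated * minValidationRatio / 256;
    return std::max(configuredMin, scaled);       // take the stricter bound
}

bool canPublish(int trustedValidationsForLedger, int threshold)
{
    return trustedValidationsForLedger >= threshold;
}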

File 1 of 6

@@ -15,6 +15,7 @@ JobQueue::JobQueue() : mLastJob(0), mThreadCount(0), mShuttingDown(false)
     mJobLoads[jtPROOFWORK].setTargetLatency(2000, 5000);
     mJobLoads[jtTRANSACTION].setTargetLatency(250, 1000);
     mJobLoads[jtPROPOSAL_ut].setTargetLatency(500, 1250);
+    mJobLoads[jtPUBLEDGER].setTargetLatency(1000, 2500);
     mJobLoads[jtVALIDATION_t].setTargetLatency(500, 1500);
     mJobLoads[jtTRANSACTION_l].setTargetLatency(100, 500);
     mJobLoads[jtPROPOSAL_t].setTargetLatency(100, 500);
@@ -24,7 +25,6 @@ JobQueue::JobQueue() : mLastJob(0), mThreadCount(0), mShuttingDown(false)
     mJobLoads[jtDISK].setTargetLatency(500, 1000);
     mJobLoads[jtRPC].setTargetLatency(250, 750);
     mJobLoads[jtACCEPTLEDGER].setTargetLatency(1000, 2500);
-    mJobLoads[jtPUBLEDGER].setTargetLatency(1000, 2500);
 }
@@ -38,6 +38,7 @@ const char* Job::toString(JobType t)
         case jtPROPOSAL_ut:     return "untrustedProposal";
         case jtCLIENT:          return "clientCommand";
         case jtTRANSACTION:     return "transaction";
+        case jtPUBLEDGER:       return "publishLedger";
         case jtVALIDATION_t:    return "trustedValidation";
         case jtTRANSACTION_l:   return "localTransaction";
         case jtPROPOSAL_t:      return "trustedProposal";
@@ -48,7 +49,6 @@ const char* Job::toString(JobType t)
         case jtDISK:            return "diskAccess";
         case jtRPC:             return "rpc";
         case jtACCEPTLEDGER:    return "acceptLedger";
-        case jtPUBLEDGER:       return "pubLedger";
         case jtTXN_PROC:        return "processTransaction";
         default:                assert(false); return "unknown";
     }

File 2 of 6

@@ -26,19 +26,19 @@ enum JobType
     jtPROPOSAL_ut   = 3,    // A proposal from an untrusted source
     jtCLIENT        = 4,    // A websocket command from the client
     jtTRANSACTION   = 5,    // A transaction received from the network
-    jtVALIDATION_t  = 6,    // A validation from a trusted source
-    jtTRANSACTION_l = 7,    // A local transaction
-    jtPROPOSAL_t    = 8,    // A proposal from a trusted source
-    jtADMIN         = 9,    // An administrative operation
-    jtDEATH         = 10,   // job of death, used internally
+    jtPUBLEDGER     = 6,    // Publish a fully-accepted ledger
+    jtVALIDATION_t  = 7,    // A validation from a trusted source
+    jtTRANSACTION_l = 8,    // A local transaction
+    jtPROPOSAL_t    = 9,    // A proposal from a trusted source
+    jtADMIN         = 10,   // An administrative operation
+    jtDEATH         = 11,   // job of death, used internally

     // special types not dispatched by the job pool
     jtPEER          = 17,
     jtDISK          = 18,
     jtRPC           = 19,
     jtACCEPTLEDGER  = 20,
-    jtPUBLEDGER     = 21,
-    jtTXN_PROC      = 22,
+    jtTXN_PROC      = 21,
 }; // CAUTION: If you add new types, add them to JobType.cpp too
 #define NUM_JOB_TYPES 24
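
A side note on this enum change: the jt* values are used directly as indexes into the mJobLoads table set up in the JobQueue constructor above, which is why the CAUTION comment and the NUM_JOB_TYPES define have to track every addition. The sketch below illustrates that coupling with stand-in types; the assumption that the load table is sized by NUM_JOB_TYPES is mine, not something this diff shows.

// Illustrative only: a fixed-size per-job-type settings table indexed by the
// enum value, mirroring the mJobLoads[jt...].setTargetLatency(...) calls above.
// Sizing the table with NUM_JOB_TYPES is an assumption for this sketch.
#include <array>
#include <cassert>

enum JobType { jtTRANSACTION = 5, jtPUBLEDGER = 6, jtTXN_PROC = 21 };
constexpr int NUM_JOB_TYPES = 24;

struct LoadTarget { int target = 0; int limit = 0; };

class JobLoadTable {
    std::array<LoadTarget, NUM_JOB_TYPES> mLoads{};  // indexed by JobType value
public:
    void setTargetLatency(JobType t, int target, int limit)
    {
        assert(t >= 0 && t < NUM_JOB_TYPES);         // every new enum value must stay in range
        mLoads[t] = { target, limit };
    }
    LoadTarget get(JobType t) const { return mLoads[t]; }
};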

File 3 of 6

@@ -467,9 +467,6 @@ void Ledger::saveAcceptedLedger(bool fromConsensus, LoadEvent::pointer event)
     theApp->getLedgerMaster().setFullLedger(shared_from_this());
     event->stop();

-    // FIXME: Need to put on hold until the ledger acquires sufficient validations
-    theApp->getOPs().pubLedger(shared_from_this());
-
     decPendingSaves();
 }

File 4 of 6

@@ -322,62 +322,89 @@ void LedgerMaster::checkPublish(const uint256& hash)
 void LedgerMaster::checkPublish(const uint256& hash, uint32 seq)
 { // check if we need to publish any held ledgers
-    std::list<Ledger::pointer> pubLedgers;
-    boost::recursive_mutex::scoped_lock pl(mPubLock);
-
-    {
-        boost::recursive_mutex::scoped_lock ml(mLock);
-
-        if (seq <= mLastValidateSeq)
-            return;
-
-        int minVal = mMinValidations;
-        if (mLastValidateHash.isNonZero())
-        {
-            int val = theApp->getValidations().getTrustedValidationCount(mLastValidateHash);
-            val *= MIN_VALIDATION_RATIO;
-            val /= 256;
-            if (val > minVal)
-                minVal = val;
-        }
-        cLog(lsTRACE) << "Sweeping for leders to publish: minval=" << minVal;
-
-        // See if any later ledgers have at least the minimum number of validations
-        for (seq = mFinalizedLedger->getLedgerSeq(); seq > mLastValidateSeq; --seq)
-        {
-            Ledger::pointer ledger = mLedgerHistory.getLedgerBySeq(seq);
-            if (ledger && (theApp->getValidations().getTrustedValidationCount(ledger->getHash()) >= minVal))
-            { // this ledger (and any priors) can be published
-                if (ledger->getLedgerSeq() > (mLastValidateSeq + MAX_LEDGER_GAP))
-                    mLastValidateSeq = ledger->getLedgerSeq() - MAX_LEDGER_GAP;
-                cLog(lsTRACE) << "Ledger " << ledger->getLedgerSeq() << " can be published";
-                for (uint32 pubSeq = mLastValidateSeq + 1; pubSeq <= seq; ++pubSeq)
-                {
-                    uint256 pubHash = ledger->getLedgerHash(pubSeq);
-                    if (pubHash.isZero()) // CHECKME: Should we double-check validations in this case?
-                        pubHash = mLedgerHistory.getLedgerHash(pubSeq);
-                    if (pubHash.isNonZero())
-                    {
-                        Ledger::pointer pubLedger = mLedgerHistory.getLedgerByHash(pubHash);
-                        if (pubLedger)
-                        {
-                            pubLedgers.push_back(pubLedger);
-                            mLastValidateSeq = pubLedger->getLedgerSeq();
-                            mLastValidateHash = pubLedger->getHash();
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    BOOST_FOREACH(Ledger::ref l, pubLedgers)
-    {
-        BOOST_FOREACH(callback& c, mOnValidate)
-        {
-            c(l);
-        }
-    }
-}
+    boost::recursive_mutex::scoped_lock ml(mLock);
+
+    if (seq <= mLastValidateSeq)
+        return;
+
+    int minVal = mMinValidations;
+    if (mLastValidateHash.isNonZero())
+    {
+        int val = theApp->getValidations().getTrustedValidationCount(mLastValidateHash);
+        val *= MIN_VALIDATION_RATIO;
+        val /= 256;
+        if (val > minVal)
+            minVal = val;
+    }
+
+    if (theConfig.RUN_STANDALONE)
+        minVal = 0;
+
+    cLog(lsTRACE) << "Sweeping for ledgers to publish: minval=" << minVal;
+
+    // See if any later ledgers have at least the minimum number of validations
+    for (seq = mFinalizedLedger->getLedgerSeq(); seq > mLastValidateSeq; --seq)
+    {
+        Ledger::pointer ledger = mLedgerHistory.getLedgerBySeq(seq);
+        if (ledger && (theApp->getValidations().getTrustedValidationCount(ledger->getHash()) >= minVal))
+        { // this ledger (and any priors) can be published
+            if (ledger->getLedgerSeq() > (mLastValidateSeq + MAX_LEDGER_GAP))
+                mLastValidateSeq = ledger->getLedgerSeq() - MAX_LEDGER_GAP;
+            cLog(lsTRACE) << "Ledger " << ledger->getLedgerSeq() << " can be published";
+            for (uint32 pubSeq = mLastValidateSeq + 1; pubSeq <= seq; ++pubSeq)
+            {
+                uint256 pubHash = ledger->getLedgerHash(pubSeq);
+                if (pubHash.isZero()) // CHECKME: Should we double-check validations in this case?
+                    pubHash = mLedgerHistory.getLedgerHash(pubSeq);
+                if (pubHash.isNonZero())
+                {
+                    Ledger::pointer ledger = mLedgerHistory.getLedgerByHash(pubHash);
+                    if (ledger)
+                    {
+                        mPubLedgers.push_back(ledger);
+                        mValidLedger = ledger;
+                        mLastValidateSeq = ledger->getLedgerSeq();
+                        mLastValidateHash = ledger->getHash();
+                    }
+                }
+            }
+        }
+    }
+
+    if (!mPubThread)
+    {
+        mPubThread = true;
+        theApp->getJobQueue().addJob(jtPUBLEDGER, boost::bind(&LedgerMaster::pubThread, this));
+    }
+}
+
+void LedgerMaster::pubThread()
+{
+    std::list<Ledger::pointer> ledgers;
+
+    while (1)
+    {
+        ledgers.clear();
+
+        {
+            boost::recursive_mutex::scoped_lock ml(mLock);
+            mPubLedgers.swap(ledgers);
+            if (ledgers.empty())
+            {
+                mPubThread = false;
+                return;
+            }
+        }
+
+        BOOST_FOREACH(Ledger::ref l, ledgers)
+        {
+            theApp->getOPs().pubLedger(l);
+            BOOST_FOREACH(callback& c, mOnValidate)
+            {
+                c(l);
+            }
+        }
+    }
+}
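
The new pubThread is a swap-under-lock, single-worker drain: checkPublish queues ledgers on mPubLedgers and schedules at most one jtPUBLEDGER job, and that job repeatedly swaps the pending list out, publishes outside the lock, and clears mPubThread when nothing is left. A self-contained sketch of the same pattern with standard-library types standing in for the boost lock and the JobQueue (the detached std::thread is only a stand-in for handing the work to a job queue):

// Standalone sketch (not rippled code) of the publish-thread pattern used above:
// producers enqueue under a lock and start at most one worker; the worker drains
// the queue by swapping it out, publishes outside the lock, and exits when empty.
#include <iostream>
#include <list>
#include <mutex>
#include <thread>

class Publisher {
    std::mutex mLock;
    std::list<int> mPending;        // stands in for mPubLedgers
    bool mWorkerRunning = false;    // stands in for mPubThread

    void drain() {
        std::list<int> batch;
        while (true) {
            batch.clear();
            {
                std::lock_guard<std::mutex> lock(mLock);
                mPending.swap(batch);           // grab everything queued so far
                if (batch.empty()) {
                    mWorkerRunning = false;     // nothing left; a later enqueue starts a new worker
                    return;
                }
            }
            for (int item : batch)              // publish without holding the lock
                std::cout << "publish " << item << "\n";
        }
    }

public:
    void enqueue(int item) {
        bool startWorker = false;
        {
            std::lock_guard<std::mutex> lock(mLock);
            mPending.push_back(item);
            if (!mWorkerRunning) {
                mWorkerRunning = true;
                startWorker = true;
            }
        }
        if (startWorker)                        // rippled hands this to its JobQueue instead
            std::thread(&Publisher::drain, this).detach();
    }
};

However many times enqueue is called, at most one drain runs at a time; that is the invariant mPubThread protects in the change above.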

File 5 of 6

@@ -27,6 +27,7 @@ protected:
     Ledger::pointer mCurrentLedger;     // The ledger we are currently processiong
     Ledger::pointer mFinalizedLedger;   // The ledger that most recently closed
+    Ledger::pointer mValidLedger;       // The ledger we most recently fully accepted
     LedgerHistory mLedgerHistory;
@@ -37,23 +38,26 @@ protected:
     uint32 mMissingSeq;
     bool mTooFast;                      // We are acquiring faster than we're writing

-    boost::recursive_mutex mPubLock;
     int mMinValidations;                // The minimum validations to publish a ledger
     uint256 mLastValidateHash;
     uint32 mLastValidateSeq;
     std::list<callback> mOnValidate;    // Called when a ledger has enough validations
+    std::list<Ledger::pointer> mPubLedgers; // List of ledgers to publish
+    bool mPubThread;                    // Publish thread is running

     void applyFutureTransactions(uint32 ledgerIndex);
     bool isValidTransaction(const Transaction::pointer& trans);
     bool isTransactionOnFutureList(const Transaction::pointer& trans);

     void acquireMissingLedger(const uint256& ledgerHash, uint32 ledgerSeq);
     void missingAcquireComplete(LedgerAcquire::pointer);
+    void pubThread();

 public:
     LedgerMaster() : mHeldTransactions(uint256()), mMissingSeq(0), mTooFast(false),
-        mMinValidations(0), mLastValidateSeq(0)
+        mMinValidations(0), mLastValidateSeq(0), mPubThread(false)
     { ; }

     uint32 getCurrentLedgerIndex();

File 6 of 6

@@ -1149,11 +1149,6 @@ void NetworkOPs::pubLedger(Ledger::ref lpAccepted)
 {
     // Don't publish to clients ledgers we don't trust.
     // TODO: we need to publish old transactions when we get reconnected to the network otherwise clients can miss transactions
-    if (NetworkOPs::omDISCONNECTED == getOperatingMode())
-        return;
-
-    LoadEvent::autoptr event(theApp->getJobQueue().getLoadEventAP(jtPUBLEDGER));
-
     {
         boost::recursive_mutex::scoped_lock sl(mMonitorLock);
@@ -1178,26 +1173,24 @@ void NetworkOPs::pubLedger(Ledger::ref lpAccepted)
         }
     }

-    {
-        // we don't lock since pubAcceptedTransaction is locking
-        if (!mSubTransactions.empty() || !mSubRTTransactions.empty() || !mSubAccount.empty() || !mSubRTAccount.empty() || !mSubmitMap.empty() )
-        {
-            SHAMap& txSet = *lpAccepted->peekTransactionMap();
-
-            for (SHAMapItem::pointer item = txSet.peekFirstItem(); !!item; item = txSet.peekNextItem(item->getTag()))
-            {
-                SerializerIterator it(item->peekSerializer());
-
-                // OPTIMIZEME: Could get transaction from txn master, but still must call getVL
-                Serializer txnSer(it.getVL());
-                SerializerIterator txnIt(txnSer);
-                SerializedTransaction stTxn(txnIt);
-
-                TransactionMetaSet::pointer meta = boost::make_shared<TransactionMetaSet>(
-                    stTxn.getTransactionID(), lpAccepted->getLedgerSeq(), it.getVL());
-
-                pubAcceptedTransaction(lpAccepted, stTxn, meta->getResultTER(), meta);
-            }
-        }
-    }
-}
+
+    // we don't lock since pubAcceptedTransaction is locking
+    if (!mSubTransactions.empty() || !mSubRTTransactions.empty() || !mSubAccount.empty() || !mSubRTAccount.empty() || !mSubmitMap.empty() )
+    {
+        SHAMap& txSet = *lpAccepted->peekTransactionMap();
+
+        for (SHAMapItem::pointer item = txSet.peekFirstItem(); !!item; item = txSet.peekNextItem(item->getTag()))
+        {
+            SerializerIterator it(item->peekSerializer());
+
+            // OPTIMIZEME: Could get transaction from txn master, but still must call getVL
+            Serializer txnSer(it.getVL());
+            SerializerIterator txnIt(txnSer);
+            SerializedTransaction stTxn(txnIt);
+
+            TransactionMetaSet::pointer meta = boost::make_shared<TransactionMetaSet>(
+                stTxn.getTransactionID(), lpAccepted->getLedgerSeq(), it.getVL());
+
+            pubAcceptedTransaction(lpAccepted, stTxn, meta->getResultTER(), meta);
+        }
+    }
+}