Ledger acquire improvements and I/O reductions.

This commit is contained in:
JoelKatz
2013-04-25 11:09:57 -07:00
parent d2a541d615
commit 494202fbcf
7 changed files with 183 additions and 140 deletions

View File

@@ -106,80 +106,97 @@ LedgerAcquire::LedgerAcquire(const uint256& hash, uint32 seq) : PeerSet(hash, LE
tryLocal(); tryLocal();
} }
void LedgerAcquire::checkLocal()
{
boost::recursive_mutex::scoped_lock sl(mLock);
if (mComplete)
return;
if (tryLocal())
done();
}
bool LedgerAcquire::tryLocal() bool LedgerAcquire::tryLocal()
{ // return value: true = no more work to do { // return value: true = no more work to do
assert(!mHaveBase && !mHaveTransactions && !mHaveState); if (!mHaveBase)
// Nothing we can do without the ledger base
HashedObject::pointer node = theApp->getHashedObjectStore().retrieve(mHash);
if (!node)
{ {
std::vector<unsigned char> data; // Nothing we can do without the ledger base
if (!theApp->getOPs().getFetchPack(mHash, data)) HashedObject::pointer node = theApp->getHashedObjectStore().retrieve(mHash);
return false; if (!node)
cLog(lsTRACE) << "Ledger base found in fetch pack";
mLedger = boost::make_shared<Ledger>(data, true);
theApp->getHashedObjectStore().store(hotLEDGER, mLedger->getLedgerSeq(), data, mHash);
}
else
{
mLedger = boost::make_shared<Ledger>(strCopy(node->getData()), true);
}
if (mLedger->getHash() != mHash)
{ // We know for a fact the ledger can never be acquired
cLog(lsWARNING) << mHash << " cannot be a ledger";
mFailed = true;
return true;
}
mHaveBase = true;
if (mLedger->getTransHash().isZero())
{
cLog(lsDEBUG) << "No TXNs to fetch";
mHaveTransactions = true;
}
else
{
try
{ {
TransactionStateSF filter(mLedger->getLedgerSeq()); std::vector<unsigned char> data;
mLedger->peekTransactionMap()->fetchRoot(mLedger->getTransHash(), &filter); if (!theApp->getOPs().getFetchPack(mHash, data))
cLog(lsDEBUG) << "Got root txn map locally"; return false;
std::vector<uint256> h = mLedger->getNeededTransactionHashes(1, &filter); cLog(lsTRACE) << "Ledger base found in fetch pack";
if (h.empty()) mLedger = boost::make_shared<Ledger>(data, true);
theApp->getHashedObjectStore().store(hotLEDGER, mLedger->getLedgerSeq(), data, mHash);
}
else
{
mLedger = boost::make_shared<Ledger>(strCopy(node->getData()), true);
}
if (mLedger->getHash() != mHash)
{ // We know for a fact the ledger can never be acquired
cLog(lsWARNING) << mHash << " cannot be a ledger";
mFailed = true;
return true;
}
mHaveBase = true;
}
if (!mHaveTransactions)
{
if (mLedger->getTransHash().isZero())
{
cLog(lsDEBUG) << "No TXNs to fetch";
mHaveTransactions = true;
}
else
{
try
{
TransactionStateSF filter(mLedger->getLedgerSeq());
mLedger->peekTransactionMap()->fetchRoot(mLedger->getTransHash(), &filter);
cLog(lsDEBUG) << "Got root txn map locally";
std::vector<uint256> h = mLedger->getNeededTransactionHashes(1, &filter);
if (h.empty())
{
cLog(lsDEBUG) << "Had full txn map locally";
mHaveTransactions = true;
}
}
catch (SHAMapMissingNode&)
{ {
cLog(lsDEBUG) << "Had full txn map locally";
mHaveTransactions = true;
} }
} }
catch (SHAMapMissingNode&)
{
}
} }
if (mLedger->getAccountHash().isZero()) if (!mHaveState)
{ {
cLog(lsFATAL) << "We are acquiring a ledger with a zero account hash"; if (mLedger->getAccountHash().isZero())
mHaveState = true;
}
else
{
try
{ {
AccountStateSF filter(mLedger->getLedgerSeq()); cLog(lsFATAL) << "We are acquiring a ledger with a zero account hash";
mLedger->peekAccountStateMap()->fetchRoot(mLedger->getAccountHash(), &filter); mHaveState = true;
cLog(lsDEBUG) << "Got root AS map locally";
std::vector<uint256> h = mLedger->getNeededAccountStateHashes(1, &filter);
if (h.empty())
{
cLog(lsDEBUG) << "Had full AS map locally";
mHaveState = true;
}
} }
catch (SHAMapMissingNode&) else
{ {
try
{
AccountStateSF filter(mLedger->getLedgerSeq());
mLedger->peekAccountStateMap()->fetchRoot(mLedger->getAccountHash(), &filter);
cLog(lsDEBUG) << "Got root AS map locally";
std::vector<uint256> h = mLedger->getNeededAccountStateHashes(1, &filter);
if (h.empty())
{
cLog(lsDEBUG) << "Had full AS map locally";
mHaveState = true;
}
}
catch (SHAMapMissingNode&)
{
}
} }
} }
@@ -1028,4 +1045,22 @@ int LedgerAcquireMaster::getFetchCount(int& timeoutCount)
return ret; return ret;
} }
void LedgerAcquireMaster::gotFetchPack(Job&)
{
std::vector<LedgerAcquire::pointer> acquires;
{
boost::mutex::scoped_lock sl(mLock);
acquires.reserve(mLedgers.size());
typedef std::pair<uint256, LedgerAcquire::pointer> u256_acq_pair;
BOOST_FOREACH(const u256_acq_pair& it, mLedgers)
acquires.push_back(it.second);
}
BOOST_FOREACH(const LedgerAcquire::pointer& acquire, acquires)
{
acquire->checkLocal();
}
}
// vim:ts=4 // vim:ts=4

View File

@@ -130,6 +130,7 @@ public:
void addPeers(); void addPeers();
void awaitData(); void awaitData();
void noAwaitData(); void noAwaitData();
void checkLocal();
typedef std::pair<ripple::TMGetObjectByHash::ObjectType, uint256> neededHash_t; typedef std::pair<ripple::TMGetObjectByHash::ObjectType, uint256> neededHash_t;
std::vector<neededHash_t> getNeededHashes(); std::vector<neededHash_t> getNeededHashes();
@@ -162,6 +163,7 @@ public:
void logFailure(const uint256& h) { mRecentFailures.add(h); } void logFailure(const uint256& h) { mRecentFailures.add(h); }
bool isFailure(const uint256& h) { return mRecentFailures.isPresent(h, false); } bool isFailure(const uint256& h) { return mRecentFailures.isPresent(h, false); }
void gotFetchPack(Job&);
void sweep(); void sweep();
}; };

View File

@@ -283,7 +283,7 @@ bool LedgerMaster::acquireMissingLedger(Ledger::ref origLedger, const uint256& l
} }
} }
if (theApp->getOPs().shouldFetchPack()) if (theApp->getOPs().shouldFetchPack() && (ledgerSeq > 40000))
{ // refill our fetch pack { // refill our fetch pack
Ledger::pointer nextLedger = mLedgerHistory.getLedgerBySeq(ledgerSeq + 1); Ledger::pointer nextLedger = mLedgerHistory.getLedgerBySeq(ledgerSeq + 1);
if (nextLedger) if (nextLedger)

View File

@@ -2108,4 +2108,11 @@ int NetworkOPs::getFetchSize()
return mFetchPack.getCacheSize(); return mFetchPack.getCacheSize();
} }
void NetworkOPs::gotFetchPack(bool progress)
{
mLastFetchPack = 0;
theApp->getJobQueue().addJob(jtLEDGER_DATA, "gotFetchPack",
boost::bind(&LedgerAcquireMaster::gotFetchPack, &theApp->getMasterLedgerAcquire(), _1));
}
// vim:ts=4 // vim:ts=4

View File

@@ -263,7 +263,7 @@ public:
void makeFetchPack(Job&, boost::weak_ptr<Peer> peer, boost::shared_ptr<ripple::TMGetObjectByHash> request, void makeFetchPack(Job&, boost::weak_ptr<Peer> peer, boost::shared_ptr<ripple::TMGetObjectByHash> request,
Ledger::pointer wantLedger, Ledger::pointer haveLedger); Ledger::pointer wantLedger, Ledger::pointer haveLedger);
bool shouldFetchPack(); bool shouldFetchPack();
void gotFetchPack() { mLastFetchPack = 0; } void gotFetchPack(bool progress);
void addFetchPack(const uint256& hash, boost::shared_ptr< std::vector<unsigned char> >& data); void addFetchPack(const uint256& hash, boost::shared_ptr< std::vector<unsigned char> >& data);
bool getFetchPack(const uint256& hash, std::vector<unsigned char>& data); bool getFetchPack(const uint256& hash, std::vector<unsigned char>& data);
int getFetchSize(); int getFetchSize();

View File

@@ -1235,10 +1235,9 @@ void Peer::recvGetObjectByHash(const boost::shared_ptr<ripple::TMGetObjectByHash
} }
else else
{ // this is a reply { // this is a reply
if (packet.type() == ripple::TMGetObjectByHash::otFETCH_PACK)
theApp->getOPs().gotFetchPack();
uint32 pLSeq = 0; uint32 pLSeq = 0;
bool pLDo = true; bool pLDo = true;
bool progress = false;
for (int i = 0; i < packet.objects_size(); ++i) for (int i = 0; i < packet.objects_size(); ++i)
{ {
const ripple::TMIndexedObject& obj = packet.objects(i); const ripple::TMIndexedObject& obj = packet.objects(i);
@@ -1256,6 +1255,8 @@ void Peer::recvGetObjectByHash(const boost::shared_ptr<ripple::TMGetObjectByHash
{ {
cLog(lsDEBUG) << "Got pack for " << pLSeq << " too late"; cLog(lsDEBUG) << "Got pack for " << pLSeq << " too late";
} }
else
progress = true;
} }
} }
@@ -1272,6 +1273,8 @@ void Peer::recvGetObjectByHash(const boost::shared_ptr<ripple::TMGetObjectByHash
} }
} }
tLog(pLDo && (pLSeq != 0), lsDEBUG) << "Received partial fetch pack for " << pLSeq; tLog(pLDo && (pLSeq != 0), lsDEBUG) << "Received partial fetch pack for " << pLSeq;
if (packet.type() == ripple::TMGetObjectByHash::otFETCH_PACK)
theApp->getOPs().gotFetchPack(progress);
} }
} }

View File

@@ -336,94 +336,90 @@ SMAddNode SHAMap::addKnownNode(const SHAMapNode& node, const std::vector<unsigne
boost::recursive_mutex::scoped_lock sl(mLock); boost::recursive_mutex::scoped_lock sl(mLock);
if (checkCacheNode(node)) if (checkCacheNode(node)) // Do we already have this node?
return SMAddNode::okay(); return SMAddNode::okay();
std::stack<SHAMapTreeNode::pointer> stack = getStack(node.getNodeID(), true, true); SHAMapTreeNode* iNode = root.get();
if (stack.empty()) while (!iNode->isLeaf() && !iNode->isFullBelow())
{ {
cLog(lsWARNING) << "AddKnownNode with empty stack"; if (iNode->isLeaf() || iNode->isFullBelow() || (iNode->getDepth() >= node.getDepth()))
return SMAddNode::invalid(); return SMAddNode::okay();
}
SHAMapTreeNode::pointer iNode = stack.top(); int branch = iNode->selectBranch(node.getNodeID());
if (!iNode) assert(branch >= 0);
{ // we should always have a root
assert(false);
return SMAddNode::invalid();
}
if (iNode->isLeaf() || (iNode->getDepth() >= node.getDepth())) if (iNode->isEmptyBranch(branch))
{ {
cLog(lsTRACE) << "got inner node, already had it (late)"; cLog(lsWARNING) << "Add known node for empty branch" << node;
return SMAddNode::okay(); return SMAddNode::invalid();
} }
if (fullBelowCache.isPresent(iNode->getChildHash(branch)))
return SMAddNode::okay();
if (iNode->getDepth() != (node.getDepth() - 1)) try
{ // Either this node is broken or we didn't request it (yet) {
cLog(lsWARNING) << "unable to hook node " << node; iNode = getNodePointer(iNode->getChildNodeID(branch), iNode->getChildHash(branch));
cLog(lsINFO) << " stuck at " << *iNode; }
cLog(lsINFO) << "got depth=" << node.getDepth() << ", walked to= " << iNode->getDepth(); catch (SHAMapMissingNode)
return SMAddNode::invalid(); {
} if (iNode->getDepth() != (node.getDepth() - 1))
{ // Either this node is broken or we didn't request it (yet)
cLog(lsWARNING) << "unable to hook node " << node;
cLog(lsINFO) << " stuck at " << *iNode;
cLog(lsINFO) << "got depth=" << node.getDepth() << ", walked to= " << iNode->getDepth();
return SMAddNode::invalid();
}
int branch = iNode->selectBranch(node.getNodeID()); SHAMapTreeNode::pointer newNode =
if (branch < 0) boost::make_shared<SHAMapTreeNode>(node, rawNode, mSeq - 1, snfWIRE, uZero, false);
{ if (iNode->getChildHash(branch) != newNode->getNodeHash())
assert(false);
return SMAddNode::invalid();
}
uint256 hash = iNode->getChildHash(branch);
if (hash.isZero())
{
cLog(lsWARNING) << "AddKnownNode for empty branch";
return SMAddNode::invalid();
}
assert(mSeq >= 1);
SHAMapTreeNode::pointer newNode =
boost::make_shared<SHAMapTreeNode>(node, rawNode, mSeq - 1, snfWIRE, uZero, false);
if (hash != newNode->getNodeHash()) // these aren't the droids we're looking for
return SMAddNode::invalid();
if (filter)
{
Serializer s;
newNode->addRaw(s, snfPREFIX);
filter->gotNode(false, node, hash, s.peekData(), newNode->getType());
}
mTNByID[*newNode] = newNode;
if (!newNode->isLeaf())
return SMAddNode::useful(); // only a leaf can fill a branch
// did this new leaf cause its parents to fill up
do
{
iNode = stack.top();
stack.pop();
assert(iNode->isInner());
for (int i = 0; i < 16; ++i)
if (!iNode->isEmptyBranch(i))
{ {
try return SMAddNode::invalid();
{ }
SHAMapTreeNode::pointer nextNode = getNode(iNode->getChildNodeID(i), iNode->getChildHash(i), false);
if (nextNode->isInner() && !nextNode->isFullBelow()) if (filter)
return SMAddNode::useful(); {
} Serializer s;
catch (SHAMapMissingNode&) newNode->addRaw(s, snfPREFIX);
{ filter->gotNode(false, node, iNode->getChildHash(branch), s.peekData(), newNode->getType());
return SMAddNode::useful(); }
mTNByID[node] = newNode;
if (!newNode->isLeaf()) // only a leaf can fill an inner node
return SMAddNode::useful();
try
{
for (int i = 0; i < 16; ++i)
{ // does the parent still need more nodes
if (!iNode->isEmptyBranch(i) && !fullBelowCache.isPresent(iNode->getChildHash(i)))
{
SHAMapTreeNode* d = getNodePointer(iNode->getChildNodeID(i), iNode->getChildHash(i));
if (d->isInner() && !d->isFullBelow()) // unfilled inner node
return SMAddNode::useful();
}
} }
} }
iNode->setFullBelow(); catch (SHAMapMissingNode)
} while (!stack.empty()); { // still missing something
return SMAddNode::useful();
}
if (root->isFullBelow()) // received leaf fills its parent
clearSynching(); iNode->setFullBelow();
if (mType == smtSTATE)
{
fullBelowCache.add(iNode->getNodeHash());
dropBelow(iNode);
}
if (root->isFullBelow())
clearSynching();
return SMAddNode::useful();
}
}
return SMAddNode::useful(); cLog(lsTRACE) << "got inner node, already had it (late)";
return SMAddNode::okay();
} }
bool SHAMap::deepCompare(SHAMap& other) bool SHAMap::deepCompare(SHAMap& other)