Fetch pack scheme. Phase two.

This commit is contained in:
JoelKatz
2013-04-21 20:37:46 -07:00
parent 9c0a156c13
commit 0c7066944e
9 changed files with 70 additions and 83 deletions

View File

@@ -360,6 +360,7 @@ void Application::sweep()
mSLECache.sweep();
AcceptedLedger::sweep();
SHAMap::sweep();
mNetOps.sweepFetchPack();
mSweepTimer.expires_from_now(boost::posix_time::seconds(theConfig.getSize(siSweepInterval)));
mSweepTimer.async_wait(boost::bind(&Application::sweep, this));
}

View File

@@ -363,7 +363,6 @@ void LedgerAcquire::trigger(Peer::ref peer)
BOOST_FOREACH(neededHash_t& p, need)
{
cLog(lsWARNING) << "Want: " << p.second;
theApp->getOPs().addWantedHash(p.second);
if (!typeSet)
{
tmBH.set_type(p.first);

View File

@@ -265,7 +265,7 @@ bool LedgerMaster::acquireMissingLedger(Ledger::ref origLedger, const uint256& l
{
typedef std::pair<uint32, uint256> u_pair;
std::vector<u_pair> vec = origLedger->getLedgerHashes();
BOOST_REVERSE_FOREACH(const u_pair& it, vec)
BOOST_FOREACH(const u_pair& it, vec)
{
if ((fetchCount < fetchMax) && (it.first < ledgerSeq) &&
!mCompleteLedgers.hasValue(it.first) && !theApp->getMasterLedgerAcquire().find(it.second))

View File

@@ -34,7 +34,7 @@ void InfoSub::onSendEmpty()
NetworkOPs::NetworkOPs(boost::asio::io_service& io_service, LedgerMaster* pLedgerMaster) :
mMode(omDISCONNECTED), mNeedNetworkLedger(false), mProposing(false), mValidating(false),
mNetTimer(io_service), mLedgerMaster(pLedgerMaster), mCloseTimeOffset(0), mLastCloseProposers(0),
mLastCloseConvergeTime(1000 * LEDGER_IDLE_INTERVAL), mLastValidationTime(0),
mLastCloseConvergeTime(1000 * LEDGER_IDLE_INTERVAL), mLastValidationTime(0), mFetchPack("FetchPack", 2048, 12),
mLastLoadBase(256), mLastLoadFactor(256)
{
}
@@ -156,18 +156,6 @@ bool NetworkOPs::isValidated(uint32 seq)
return haveLedger(seq) && (seq <= mLedgerMaster->getValidatedLedger()->getLedgerSeq());
}
bool NetworkOPs::addWantedHash(const uint256& h)
{
boost::recursive_mutex::scoped_lock sl(mWantedHashLock);
return mWantedHashes.insert(h).second;
}
bool NetworkOPs::isWantedHash(const uint256& h, bool remove)
{
boost::recursive_mutex::scoped_lock sl(mWantedHashLock);
return (remove ? mWantedHashes.erase(h) : mWantedHashes.count(h)) != 0;
}
void NetworkOPs::submitTransaction(Job&, SerializedTransaction::pointer iTrans, stCallback callback)
{ // this is an asynchronous interface
Serializer s;
@@ -2007,7 +1995,7 @@ void NetworkOPs::getBookPage(Ledger::pointer lpLedger, const uint160& uTakerPays
}
void NetworkOPs::makeFetchPack(Job&, boost::weak_ptr<Peer> wPeer, boost::shared_ptr<ripple::TMGetObjectByHash> request,
Ledger::pointer prevLedger, Ledger::pointer reqLedger)
Ledger::pointer wantLedger, Ledger::pointer haveLedger)
{
try
{
@@ -2021,25 +2009,32 @@ void NetworkOPs::makeFetchPack(Job&, boost::weak_ptr<Peer> wPeer, boost::shared_
reply.set_seq(request->seq());
reply.set_ledgerhash(reply.ledgerhash());
std::list<SHAMap::fetchPackEntry_t> pack =
reqLedger->peekAccountStateMap()->getFetchPack(prevLedger->peekAccountStateMap().get(), false, 1024);
BOOST_FOREACH(SHAMap::fetchPackEntry_t& node, pack)
do
{
ripple::TMIndexedObject& newObj = *reply.add_objects();
newObj.set_hash(node.first.begin(), 256 / 8);
newObj.set_data(&node.second[0], node.second.size());
}
if (reqLedger->getAccountHash().isNonZero() && (pack.size() < 768))
{
pack = reqLedger->peekTransactionMap()->getFetchPack(NULL, true, 256);
std::list<SHAMap::fetchPackEntry_t> pack = wantLedger->peekAccountStateMap()->getFetchPack(
haveLedger->peekAccountStateMap().get(), false, 1024 - reply.objects().size());
BOOST_FOREACH(SHAMap::fetchPackEntry_t& node, pack)
{
ripple::TMIndexedObject& newObj = *reply.add_objects();
newObj.set_hash(node.first.begin(), 256 / 8);
newObj.set_data(&node.second[0], node.second.size());
}
}
if (wantLedger->getAccountHash().isNonZero() && (pack.size() < 768))
{
pack = wantLedger->peekTransactionMap()->getFetchPack(NULL, true, 256);
BOOST_FOREACH(SHAMap::fetchPackEntry_t& node, pack)
{
ripple::TMIndexedObject& newObj = *reply.add_objects();
newObj.set_hash(node.first.begin(), 256 / 8);
newObj.set_data(&node.second[0], node.second.size());
}
}
if (reply.objects().size() >= 768)
break;
haveLedger = wantLedger;
wantLedger = getLedgerByHash(haveLedger->getParentHash());
} while (wantLedger);
cLog(lsINFO) << "Built fetch pack with " << reply.objects().size() << " nodes";
PackedMessage::pointer msg = boost::make_shared<PackedMessage>(reply, ripple::mtGET_OBJECTS);
@@ -2051,4 +2046,19 @@ void NetworkOPs::makeFetchPack(Job&, boost::weak_ptr<Peer> wPeer, boost::shared_
}
}
void NetworkOPs::sweepFetchPack()
{
mFetchPack.sweep();
}
void NetworkOPs::addFetchPack(const uint256& hash, boost::shared_ptr< std::vector<unsigned char> >& data)
{
mFetchPack.canonicalize(hash, data, false);
}
bool NetworkOPs::getFetchPack(const uint256& hash, std::vector<unsigned char>& data)
{
return mFetchPack.retrieve(hash, data);
}
// vim:ts=4

View File

@@ -128,8 +128,7 @@ protected:
subMapType mSubTransactions; // all accepted transactions
subMapType mSubRTTransactions; // all proposed and accepted transactions
boost::recursive_mutex mWantedHashLock;
boost::unordered_set<uint256> mWantedHashes;
TaggedCache< uint256, std::vector<unsigned char> > mFetchPack;
uint32 mLastLoadBase;
uint32 mLastLoadFactor;
@@ -261,7 +260,10 @@ public:
void mapComplete(const uint256& hash, SHAMap::ref map);
bool stillNeedTXSet(const uint256& hash);
void makeFetchPack(Job&, boost::weak_ptr<Peer> peer, boost::shared_ptr<ripple::TMGetObjectByHash> request,
Ledger::pointer prevLedger, Ledger::pointer reqLedger);
Ledger::pointer wantLedger, Ledger::pointer haveLedger);
void addFetchPack(const uint256& hash, boost::shared_ptr< std::vector<unsigned char> >& data);
bool getFetchPack(const uint256& hash, std::vector<unsigned char>& data);
void sweepFetchPack();
// network state machine
void checkState(const boost::system::error_code& result);
@@ -294,9 +296,6 @@ public:
uint256 getConsensusLCL();
void reportFeeChange();
bool addWantedHash(const uint256& h);
bool isWantedHash(const uint256& h, bool remove);
//Helper function to generate SQL query to get transactions
std::string transactionsSQL(std::string selection, const RippleAddress& account,
int32 minLedger, int32 maxLedger, bool descending, uint32 offset, int limit,

View File

@@ -1189,14 +1189,14 @@ void Peer::recvPeers(ripple::TMPeers& packet)
void Peer::recvGetObjectByHash(const boost::shared_ptr<ripple::TMGetObjectByHash>& ptr)
{
ripple::TMGetObjectByHash& packet = *ptr;
if (packet.type() == ripple::TMGetObjectByHash::otFETCH_PACK)
{
doFetchPack(ptr);
return;
}
if (packet.query())
{ // this is a query
if (packet.type() == ripple::TMGetObjectByHash::otFETCH_PACK)
{
doFetchPack(ptr);
return;
}
ripple::TMGetObjectByHash reply;
reply.set_query(false);
@@ -1233,16 +1233,6 @@ void Peer::recvGetObjectByHash(const boost::shared_ptr<ripple::TMGetObjectByHash
}
else
{ // this is a reply
uint32 seq = packet.has_seq() ? packet.seq() : 0;
HashedObjectType type;
switch (packet.type())
{
case ripple::TMGetObjectByHash::otLEDGER: type = hotLEDGER; break;
case ripple::TMGetObjectByHash::otTRANSACTION: type = hotTRANSACTION; break;
case ripple::TMGetObjectByHash::otSTATE_NODE: type = hotACCOUNT_NODE; break;
case ripple::TMGetObjectByHash::otTRANSACTION_NODE: type = hotTRANSACTION_NODE; break;
default: type = hotUNKNOWN;
}
for (int i = 0; i < packet.objects_size(); ++i)
{
const ripple::TMIndexedObject& obj = packet.objects(i);
@@ -1250,23 +1240,11 @@ void Peer::recvGetObjectByHash(const boost::shared_ptr<ripple::TMGetObjectByHash
{
uint256 hash;
memcpy(hash.begin(), obj.hash().data(), 256 / 8);
if (theApp->getOPs().isWantedHash(hash, true))
{
std::vector<unsigned char> data(obj.data().begin(), obj.data().end());
if (Serializer::getSHA512Half(data) != hash)
{
cLog(lsWARNING) << "Bad hash in data from peer";
theApp->getOPs().addWantedHash(hash);
punishPeer(LT_BadData);
}
else
{
cLog(lsDEBUG) << "Got wanted hash " << hash;
theApp->getHashedObjectStore().store(type, seq, data, hash);
}
}
else
cLog(lsWARNING) << "Received unwanted hash " << getIP() << " " << hash;
boost::shared_ptr< std::vector<unsigned char> > data = boost::make_shared< std::vector<unsigned char> >
(obj.data().begin(), obj.data().end());
theApp->getOPs().addFetchPack(hash, data);
}
}
}
@@ -1868,22 +1846,22 @@ void Peer::doFetchPack(const boost::shared_ptr<ripple::TMGetObjectByHash>& packe
uint256 hash;
memcpy(hash.begin(), packet->ledgerhash().data(), 32);
Ledger::pointer reqLedger = theApp->getOPs().getLedgerByHash(hash);
if (!reqLedger)
Ledger::pointer haveLedger = theApp->getOPs().getLedgerByHash(hash);
if (!haveLedger)
{
cLog(lsINFO) << "Peer requests fetch pack for ledger we don't have: " << hash;
punishPeer(LT_RequestNoReply);
return;
}
if (!reqLedger->isClosed())
if (!haveLedger->isClosed())
{
cLog(lsWARNING) << "Peer requests fetch pack for open ledger: " << hash;
cLog(lsWARNING) << "Peer requests fetch pack from open ledger: " << hash;
punishPeer(LT_InvalidRequest);
return;
}
Ledger::pointer prevLedger = theApp->getOPs().getLedgerByHash(reqLedger->getParentHash());
if (!prevLedger)
Ledger::pointer wantLedger = theApp->getOPs().getLedgerByHash(haveLedger->getParentHash());
if (!wantLedger)
{
cLog(lsINFO) << "Peer requests fetch pack for ledger whose predecessor we don't have: " << hash;
punishPeer(LT_RequestNoReply);
@@ -1891,7 +1869,7 @@ void Peer::doFetchPack(const boost::shared_ptr<ripple::TMGetObjectByHash>& packe
}
theApp->getJobQueue().addJob(jtPACK, "MakeFetchPack",
BIND_TYPE(&NetworkOPs::makeFetchPack, &theApp->getOPs(), P_1,
boost::weak_ptr<Peer>(shared_from_this()), packet, prevLedger, reqLedger));
boost::weak_ptr<Peer>(shared_from_this()), packet, wantLedger, haveLedger));
}
else
{ // received fetch pack

View File

@@ -477,7 +477,7 @@ public:
virtual void dump(bool withHashes = false);
typedef std::pair< uint256, std::vector<unsigned char> > fetchPackEntry_t;
std::list<fetchPackEntry_t> getFetchPack(SHAMap* prior, bool includeLeaves, int max);
std::list<fetchPackEntry_t> getFetchPack(SHAMap* have, bool includeLeaves, int max);
static void sweep() { fullBelowCache.sweep(); }
};

View File

@@ -482,14 +482,14 @@ bool SHAMap::hasNode(const SHAMapNode& nodeID, const uint256& nodeHash)
return node->getNodeHash() == nodeHash;
}
std::list<SHAMap::fetchPackEntry_t> SHAMap::getFetchPack(SHAMap* prior, bool includeLeaves, int max)
std::list<SHAMap::fetchPackEntry_t> SHAMap::getFetchPack(SHAMap* have, bool includeLeaves, int max)
{
std::list<fetchPackEntry_t> ret;
if (root->isLeaf())
{
if (includeLeaves && !root->getNodeHash().isZero() &&
(!prior || !prior->hasNode(*root, root->getNodeHash())))
(!have || !have->hasNode(*root, root->getNodeHash())))
{
Serializer s;
root->addRaw(s, snfPREFIX);
@@ -501,7 +501,7 @@ std::list<SHAMap::fetchPackEntry_t> SHAMap::getFetchPack(SHAMap* prior, bool inc
if (root->getNodeHash().isZero())
return ret;
if (prior && (root->getNodeHash() == prior->root->getNodeHash()))
if (have && (root->getNodeHash() == have->root->getNodeHash()))
return ret;
std::stack<SHAMapTreeNode*> stack; // contains unexplored non-matching inner node entries
@@ -529,10 +529,10 @@ std::list<SHAMap::fetchPackEntry_t> SHAMap::getFetchPack(SHAMap* prior, bool inc
SHAMapTreeNode *next = getNodePointer(childID, childHash);
if (next->isInner())
{
if (!prior || !prior->hasNode(*next, childHash))
if (!have || !have->hasNode(*next, childHash))
stack.push(next);
}
else if (includeLeaves && (!prior || !prior->hasNode(childID, childHash)))
else if (includeLeaves && (!have || !have->hasNode(childID, childHash)))
{
Serializer s;
node->addRaw(s, snfPREFIX);

View File

@@ -35,8 +35,8 @@ public:
theApp->getHashedObjectStore().store(hotACCOUNT_NODE, mLedgerSeq, nodeData, nodeHash);
}
virtual bool haveNode(const SHAMapNode& id, const uint256& nodeHash, std::vector<unsigned char>& nodeData)
{ // fetchNodeExternal already tried
return false;
{
return theApp->getOPs().getFetchPack(nodeHash, nodeData);
}
};
@@ -58,8 +58,8 @@ public:
mLedgerSeq, nodeData, nodeHash);
}
virtual bool haveNode(const SHAMapNode& id, const uint256& nodeHash, std::vector<unsigned char>& nodeData)
{ // fetchNodeExternal already tried
return false;
{
return theApp->getOPs().getFetchPack(nodeHash, nodeData);
}
};