ech pack stuff.

This commit is contained in:
JoelKatz
2013-04-22 11:09:07 -07:00
parent 94838b0db7
commit aeccecb578
7 changed files with 62 additions and 11 deletions

View File

@@ -118,6 +118,7 @@ bool LedgerAcquire::tryLocal()
std::vector<unsigned char> data;
if (!theApp->getOPs().getFetchPack(mHash, data))
return false;
cLog(lsINFO) << "Ledger base found in fetch pack";
mLedger = boost::make_shared<Ledger>(data, true);
theApp->getHashedObjectStore().store(hotLEDGER, mLedger->getLedgerSeq(), data, mHash);
}

View File

@@ -251,7 +251,7 @@ bool LedgerMaster::acquireMissingLedger(Ledger::ref origLedger, const uint256& l
theApp->getIOService().post(boost::bind(&LedgerMaster::missingAcquireComplete, this, mMissingLedger));
}
if (theApp->getOPs().getFetchSize() < 256)
if (theApp->getOPs().shouldFetchPack())
{ // refill our fetch pack
Ledger::pointer nextLedger = mLedgerHistory.getLedgerBySeq(ledgerSeq + 1);
if (nextLedger)
@@ -282,7 +282,7 @@ bool LedgerMaster::acquireMissingLedger(Ledger::ref origLedger, const uint256& l
target->sendPacket(packet, false);
}
else
cLog(lsINFO) << "No peer for fetch pack";
cLog(lsTRACE) << "No peer for fetch pack";
}
}

View File

@@ -6,6 +6,7 @@
#include "utils.h"
#include "Application.h"
#include "Transaction.h"
#include "HashPrefixes.h"
#include "LedgerConsensus.h"
#include "LedgerTiming.h"
#include "Log.h"
@@ -34,7 +35,8 @@ void InfoSub::onSendEmpty()
NetworkOPs::NetworkOPs(boost::asio::io_service& io_service, LedgerMaster* pLedgerMaster) :
mMode(omDISCONNECTED), mNeedNetworkLedger(false), mProposing(false), mValidating(false),
mNetTimer(io_service), mLedgerMaster(pLedgerMaster), mCloseTimeOffset(0), mLastCloseProposers(0),
mLastCloseConvergeTime(1000 * LEDGER_IDLE_INTERVAL), mLastValidationTime(0), mFetchPack("FetchPack", 2048, 12),
mLastCloseConvergeTime(1000 * LEDGER_IDLE_INTERVAL), mLastValidationTime(0),
mFetchPack("FetchPack", 2048, 3), mLastFetchPack(0),
mLastLoadBase(256), mLastLoadFactor(256)
{
}
@@ -2018,6 +2020,16 @@ void NetworkOPs::makeFetchPack(Job&, boost::weak_ptr<Peer> wPeer, boost::shared_
do
{
uint32 lSeq = wantLedger->getLedgerSeq();
ripple::TMIndexedObject& newObj = *reply.add_objects();
newObj.set_hash(wantLedger->getHash().begin(), 256 / 8);
Serializer s(256);
s.add32(sHP_Ledger);
wantLedger->addRaw(s);
newObj.set_data(s.getDataPtr(), s.getLength());
newObj.set_ledgerseq(lSeq);
std::list<SHAMap::fetchPackEntry_t> pack = wantLedger->peekAccountStateMap()->getFetchPack(
haveLedger->peekAccountStateMap().get(), false, 1024 - reply.objects().size());
BOOST_FOREACH(SHAMap::fetchPackEntry_t& node, pack)
@@ -2025,6 +2037,7 @@ void NetworkOPs::makeFetchPack(Job&, boost::weak_ptr<Peer> wPeer, boost::shared_
ripple::TMIndexedObject& newObj = *reply.add_objects();
newObj.set_hash(node.first.begin(), 256 / 8);
newObj.set_data(&node.second[0], node.second.size());
newObj.set_ledgerseq(lSeq);
}
if (wantLedger->getAccountHash().isNonZero() && (pack.size() < 768))
@@ -2035,6 +2048,7 @@ void NetworkOPs::makeFetchPack(Job&, boost::weak_ptr<Peer> wPeer, boost::shared_
ripple::TMIndexedObject& newObj = *reply.add_objects();
newObj.set_hash(node.first.begin(), 256 / 8);
newObj.set_data(&node.second[0], node.second.size());
newObj.set_ledgerseq(lSeq);
}
}
if (reply.objects().size() >= 768)
@@ -2074,7 +2088,19 @@ bool NetworkOPs::getFetchPack(const uint256& hash, std::vector<unsigned char>& d
cLog(lsWARNING) << "Bad entry in fetch pack";
return false;
}
cLog(lsTRACE) << hash << " found in fetch pack";
cLog(lsINFO) << hash << " found in fetch pack";
return true;
}
bool NetworkOPs::shouldFetchPack()
{
uint32 now = getNetworkTimeNC();
if (mLastFetchPack == now)
return false;
mFetchPack.sweep();
if (mFetchPack.getCacheSize() > 384)
return false;
mLastFetchPack = now;
return true;
}

View File

@@ -129,6 +129,7 @@ protected:
subMapType mSubRTTransactions; // all proposed and accepted transactions
TaggedCache< uint256, std::vector<unsigned char> > mFetchPack;
uint32 mLastFetchPack;
uint32 mLastLoadBase;
uint32 mLastLoadFactor;
@@ -261,6 +262,8 @@ public:
bool stillNeedTXSet(const uint256& hash);
void makeFetchPack(Job&, boost::weak_ptr<Peer> peer, boost::shared_ptr<ripple::TMGetObjectByHash> request,
Ledger::pointer wantLedger, Ledger::pointer haveLedger);
bool shouldFetchPack();
void gotFetchPack() { mLastFetchPack = 0; }
void addFetchPack(const uint256& hash, boost::shared_ptr< std::vector<unsigned char> >& data);
bool getFetchPack(const uint256& hash, std::vector<unsigned char>& data);
int getFetchSize();

View File

@@ -1235,18 +1235,40 @@ void Peer::recvGetObjectByHash(const boost::shared_ptr<ripple::TMGetObjectByHash
}
else
{ // this is a reply
if (packet.type() == ripple::TMGetObjectByHash::otFETCH_PACK)
theApp->getOPs().gotFetchPack();
for (int i = 0; i < packet.objects_size(); ++i)
{
const ripple::TMIndexedObject& obj = packet.objects(i);
if (obj.has_hash() && (obj.hash().size() == (256/8)))
{
uint256 hash;
memcpy(hash.begin(), obj.hash().data(), 256 / 8);
uint32 pLSeq = 0;
bool pLDo = true;
boost::shared_ptr< std::vector<unsigned char> > data = boost::make_shared< std::vector<unsigned char> >
(obj.data().begin(), obj.data().end());
if (obj.has_ledgerseq())
{
if (obj.ledgerseq() != pLSeq)
{
pLSeq = obj.ledgerseq();
pLDo = !theApp->getOPs().haveLedger(pLSeq);
if (!pLDo)
{
cLog(lsDEBUG) << "Got pack for " << pLSeq << " too late";
}
else cLog(lsDEBUG) << "Got pack for " << pLSeq;
}
}
theApp->getOPs().addFetchPack(hash, data);
if (pLDo)
{
uint256 hash;
memcpy(hash.begin(), obj.hash().data(), 256 / 8);
boost::shared_ptr< std::vector<unsigned char> > data = boost::make_shared< std::vector<unsigned char> >
(obj.data().begin(), obj.data().end());
theApp->getOPs().addFetchPack(hash, data);
}
}
}
}

View File

@@ -581,8 +581,6 @@ std::list<SHAMap::fetchPackEntry_t> SHAMap::getFetchPack(SHAMap* have, bool incl
if (max <= 0)
break;
}
cLog(lsINFO) << "Made pack with " << ret.size() << " entries";
return ret;
}

View File

@@ -224,6 +224,7 @@ message TMIndexedObject
optional bytes nodeID = 2;
optional bytes index = 3;
optional bytes data = 4;
optional uint32 ledgerSeq = 5;
}
message TMGetObjectByHash