Merge branch 'develop' of github.com:jedmccaleb/NewCoin into develop

Arthur Britto
2013-04-22 19:33:27 -07:00
20 changed files with 346 additions and 154 deletions

View File

@@ -360,6 +360,7 @@ void Application::sweep()
mSLECache.sweep();
AcceptedLedger::sweep();
SHAMap::sweep();
mNetOps.sweepFetchPack();
mSweepTimer.expires_from_now(boost::posix_time::seconds(theConfig.getSize(siSweepInterval)));
mSweepTimer.async_wait(boost::bind(&Application::sweep, this));
}
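The sweep above runs on a re-arming Boost.Asio timer: each pass expires stale cache entries (now including the fetch pack via sweepFetchPack) and then reschedules itself. A minimal standalone sketch of that timer pattern, with a placeholder sweep body and a hard-coded interval standing in for theConfig.getSize(siSweepInterval):

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>

class Sweeper
{
public:
    Sweeper(boost::asio::io_service& io, int intervalSeconds)
        : mTimer(io), mInterval(intervalSeconds)
    {
        schedule();
    }

private:
    void schedule()
    {
        mTimer.expires_from_now(boost::posix_time::seconds(mInterval));
        mTimer.async_wait(boost::bind(&Sweeper::onSweep, this));
    }

    void onSweep()
    {
        // expire stale cache entries here (node caches, fetch pack, ...)
        schedule(); // re-arm for the next pass
    }

    boost::asio::deadline_timer mTimer;
    int mInterval;
};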

View File

@@ -225,7 +225,6 @@ void JobQueue::shutdown()
mJobCond.notify_all();
while (mThreadCount != 0)
mJobCond.wait(sl);
cLog(lsDEBUG) << "Job queue has shut down";
}
void JobQueue::setThreadCount(int c)

View File

@@ -58,7 +58,7 @@ Ledger::Ledger(const uint256 &parentHash, const uint256 &transHash, const uint25
try
{
if (mTransHash.isNonZero())
mTransactionMap->fetchRoot(mTransHash);
mTransactionMap->fetchRoot(mTransHash, NULL);
}
catch (...)
{
@@ -68,7 +68,7 @@ Ledger::Ledger(const uint256 &parentHash, const uint256 &transHash, const uint25
try
{
if (mAccountHash.isNonZero())
mAccountStateMap->fetchRoot(mAccountHash);
mAccountStateMap->fetchRoot(mAccountHash, NULL);
}
catch (...)
{
@@ -1625,7 +1625,7 @@ uint64 Ledger::scaleFeeLoad(uint64 fee, bool bAdmin)
return theApp->getFeeTrack().scaleFeeLoad(fee, mBaseFee, mReferenceFeeUnits, bAdmin);
}
std::vector<uint256> Ledger::getNeededTransactionHashes(int max)
std::vector<uint256> Ledger::getNeededTransactionHashes(int max, SHAMapSyncFilter* filter)
{
std::vector<uint256> ret;
if (mTransHash.isNonZero())
@@ -1633,12 +1633,12 @@ std::vector<uint256> Ledger::getNeededTransactionHashes(int max)
if (mTransactionMap->getHash().isZero())
ret.push_back(mTransHash);
else
ret = mTransactionMap->getNeededHashes(max);
ret = mTransactionMap->getNeededHashes(max, filter);
}
return ret;
}
std::vector<uint256> Ledger::getNeededAccountStateHashes(int max)
std::vector<uint256> Ledger::getNeededAccountStateHashes(int max, SHAMapSyncFilter* filter)
{
std::vector<uint256> ret;
if (mAccountHash.isNonZero())
@@ -1646,7 +1646,7 @@ std::vector<uint256> Ledger::getNeededAccountStateHashes(int max)
if (mAccountStateMap->getHash().isZero())
ret.push_back(mAccountHash);
else
ret = mAccountStateMap->getNeededHashes(max);
ret = mAccountStateMap->getNeededHashes(max, filter);
}
return ret;
}

View File

@@ -222,8 +222,8 @@ public:
static uint256 getLedgerFeatureIndex();
static uint256 getLedgerFeeIndex();
std::vector<uint256> getNeededTransactionHashes(int max);
std::vector<uint256> getNeededAccountStateHashes(int max);
std::vector<uint256> getNeededTransactionHashes(int max, SHAMapSyncFilter* filter);
std::vector<uint256> getNeededAccountStateHashes(int max, SHAMapSyncFilter* filter);
// index calculation functions
static uint256 getAccountRootIndex(const uint160& uAccountID);

View File

@@ -114,9 +114,19 @@ bool LedgerAcquire::tryLocal()
// Nothing we can do without the ledger base
HashedObject::pointer node = theApp->getHashedObjectStore().retrieve(mHash);
if (!node)
return false;
{
std::vector<unsigned char> data;
if (!theApp->getOPs().getFetchPack(mHash, data))
return false;
cLog(lsTRACE) << "Ledger base found in fetch pack";
mLedger = boost::make_shared<Ledger>(data, true);
theApp->getHashedObjectStore().store(hotLEDGER, mLedger->getLedgerSeq(), data, mHash);
}
else
{
mLedger = boost::make_shared<Ledger>(strCopy(node->getData()), true);
}
mLedger = boost::make_shared<Ledger>(strCopy(node->getData()), true);
if (mLedger->getHash() != mHash)
{ // We know for a fact the ledger can never be acquired
cLog(lsWARNING) << mHash << " cannot be a ledger";
@@ -134,9 +144,10 @@ bool LedgerAcquire::tryLocal()
{
try
{
mLedger->peekTransactionMap()->fetchRoot(mLedger->getTransHash());
TransactionStateSF filter(mLedger->getLedgerSeq());
mLedger->peekTransactionMap()->fetchRoot(mLedger->getTransHash(), &filter);
cLog(lsDEBUG) << "Got root txn map locally";
std::vector<uint256> h = mLedger->getNeededTransactionHashes(1);
std::vector<uint256> h = mLedger->getNeededTransactionHashes(1, &filter);
if (h.empty())
{
cLog(lsDEBUG) << "Had full txn map locally";
@@ -157,9 +168,10 @@ bool LedgerAcquire::tryLocal()
{
try
{
mLedger->peekAccountStateMap()->fetchRoot(mLedger->getAccountHash());
AccountStateSF filter(mLedger->getLedgerSeq());
mLedger->peekAccountStateMap()->fetchRoot(mLedger->getAccountHash(), &filter);
cLog(lsDEBUG) << "Got root AS map locally";
std::vector<uint256> h = mLedger->getNeededAccountStateHashes(1);
std::vector<uint256> h = mLedger->getNeededAccountStateHashes(1, &filter);
if (h.empty())
{
cLog(lsDEBUG) << "Had full AS map locally";
@@ -357,13 +369,10 @@ void LedgerAcquire::trigger(Peer::ref peer)
ripple::TMGetObjectByHash tmBH;
tmBH.set_query(true);
tmBH.set_ledgerhash(mHash.begin(), mHash.size());
if (mHaveBase)
tmBH.set_seq(mLedger->getLedgerSeq());
bool typeSet = false;
BOOST_FOREACH(neededHash_t& p, need)
{
cLog(lsWARNING) << "Want: " << p.second;
theApp->getOPs().addWantedHash(p.second);
if (!typeSet)
{
tmBH.set_type(p.first);
@@ -783,7 +792,13 @@ LedgerAcquire::pointer LedgerAcquireMaster::findCreate(const uint256& hash)
ptr->setTimer(); // Cannot call in constructor
}
else
{
Ledger::pointer ledger = ptr->getLedger();
ledger->setClosed();
ledger->setImmutable();
theApp->getLedgerMaster().storeLedger(ledger);
cLog(lsDEBUG) << "Acquiring ledger we already have: " << hash;
}
return ptr;
}
@@ -810,13 +825,15 @@ std::vector<LedgerAcquire::neededHash_t> LedgerAcquire::getNeededHashes()
}
if (!mHaveState)
{
std::vector<uint256> v = mLedger->getNeededAccountStateHashes(4);
AccountStateSF filter(mLedger->getLedgerSeq());
std::vector<uint256> v = mLedger->getNeededAccountStateHashes(4, &filter);
BOOST_FOREACH(const uint256& h, v)
ret.push_back(std::make_pair(ripple::TMGetObjectByHash::otSTATE_NODE, h));
}
if (!mHaveTransactions)
{
std::vector<uint256> v = mLedger->getNeededAccountStateHashes(4);
TransactionStateSF filter(mLedger->getLedgerSeq());
std::vector<uint256> v = mLedger->getNeededTransactionHashes(4, &filter);
BOOST_FOREACH(const uint256& h, v)
ret.push_back(std::make_pair(ripple::TMGetObjectByHash::otTRANSACTION_NODE, h));
}
@@ -840,7 +857,7 @@ Json::Value LedgerAcquire::getJson(int)
if (mHaveBase && !mHaveState)
{
Json::Value hv(Json::arrayValue);
std::vector<uint256> v = mLedger->peekAccountStateMap()->getNeededHashes(16);
std::vector<uint256> v = mLedger->peekAccountStateMap()->getNeededHashes(16, NULL);
BOOST_FOREACH(const uint256& h, v)
hv.append(h.GetHex());
ret["needed_state_hashes"] = hv;
@@ -848,7 +865,7 @@ Json::Value LedgerAcquire::getJson(int)
if (mHaveBase && !mHaveTransactions)
{
Json::Value hv(Json::arrayValue);
std::vector<uint256> v = mLedger->peekTransactionMap()->getNeededHashes(16);
std::vector<uint256> v = mLedger->peekTransactionMap()->getNeededHashes(16, NULL);
BOOST_FOREACH(const uint256& h, v)
hv.append(h.GetHex());
ret["needed_transaction_hashes"] = hv;

View File

@@ -474,6 +474,12 @@ void LedgerConsensus::statusChange(ripple::NodeEvent event, Ledger& ledger)
s.set_ledgerhashprevious(hash.begin(), hash.size());
hash = ledger.getHash();
s.set_ledgerhash(hash.begin(), hash.size());
uint32 uMin, uMax;
theApp->getOPs().getValidatedRange(uMin, uMax);
s.set_firstseq(uMin);
s.set_lastseq(uMax);
PackedMessage::pointer packet = boost::make_shared<PackedMessage>(s, ripple::mtSTATUS_CHANGE);
theApp->getConnectionPool().relayMessage(NULL, packet);
cLog(lsTRACE) << "send status change to peer";

View File

@@ -265,18 +265,59 @@ bool LedgerMaster::acquireMissingLedger(Ledger::ref origLedger, const uint256& l
{
typedef std::pair<uint32, uint256> u_pair;
std::vector<u_pair> vec = origLedger->getLedgerHashes();
BOOST_REVERSE_FOREACH(const u_pair& it, vec)
BOOST_FOREACH(const u_pair& it, vec)
{
if ((fetchCount < fetchMax) && (it.first < ledgerSeq) &&
!mCompleteLedgers.hasValue(it.first) && !theApp->getMasterLedgerAcquire().find(it.second))
{
++fetchCount;
theApp->getMasterLedgerAcquire().findCreate(it.second);
LedgerAcquire::pointer acq = theApp->getMasterLedgerAcquire().findCreate(it.second);
if (acq && acq->isComplete())
{
acq->getLedger()->setAccepted();
setFullLedger(acq->getLedger());
mLedgerHistory.addAcceptedLedger(acq->getLedger(), false);
}
else ++fetchCount;
}
}
}
}
if (theApp->getOPs().shouldFetchPack())
{ // refill our fetch pack
Ledger::pointer nextLedger = mLedgerHistory.getLedgerBySeq(ledgerSeq + 1);
if (nextLedger)
{
ripple::TMGetObjectByHash tmBH;
tmBH.set_type(ripple::TMGetObjectByHash::otFETCH_PACK);
tmBH.set_query(true);
tmBH.set_seq(ledgerSeq);
tmBH.set_ledgerhash(ledgerHash.begin(), 32);
std::vector<Peer::pointer> peerList = theApp->getConnectionPool().getPeerVector();
Peer::pointer target;
int count = 0;
BOOST_FOREACH(const Peer::pointer& peer, peerList)
{
if (peer->hasRange(ledgerSeq, ledgerSeq + 1))
{
if (count++ == 0)
target = peer;
else if ((rand() % count) == 0)
target = peer;
}
}
if (target)
{
PackedMessage::pointer packet = boost::make_shared<PackedMessage>(tmBH, ripple::mtGET_OBJECTS);
target->sendPacket(packet, false);
}
else
cLog(lsTRACE) << "No peer for fetch pack";
}
}
return true;
}
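The target-selection loop above picks one peer uniformly at random from all peers advertising the needed ledger range, in a single pass and without building a candidate list: the k-th qualifying peer replaces the current choice with probability 1/k. A minimal standalone sketch of the same selection (plain C++; the Peer struct is a hypothetical stand-in):

#include <cstdlib>
#include <vector>

struct Peer { bool hasNeededRange; }; // hypothetical stand-in

// Returns a uniformly random peer that has the range, or NULL if none does.
Peer* pickRandomPeer(std::vector<Peer>& peers)
{
    Peer* target = NULL;
    int count = 0;
    for (std::size_t i = 0; i < peers.size(); ++i)
    {
        if (!peers[i].hasNeededRange)
            continue;
        // The first candidate always wins; the k-th candidate then replaces
        // it with probability 1/k, leaving every candidate equally likely
        // (single-pass reservoir sampling with a reservoir of one).
        if ((rand() % ++count) == 0)
            target = &peers[i];
    }
    return target;
}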

View File

@@ -182,6 +182,7 @@ public:
void setRemoteFee(uint32);
bool raiseLocalFee();
bool lowerLocalFee();
bool isLoaded() { return (raiseCount != 0) || (mLocalTxnLoadFee != lftNormalFee); }
};

View File

@@ -6,6 +6,7 @@
#include "utils.h"
#include "Application.h"
#include "Transaction.h"
#include "HashPrefixes.h"
#include "LedgerConsensus.h"
#include "LedgerTiming.h"
#include "Log.h"
@@ -35,6 +36,7 @@ NetworkOPs::NetworkOPs(boost::asio::io_service& io_service, LedgerMaster* pLedge
mMode(omDISCONNECTED), mNeedNetworkLedger(false), mProposing(false), mValidating(false),
mNetTimer(io_service), mLedgerMaster(pLedgerMaster), mCloseTimeOffset(0), mLastCloseProposers(0),
mLastCloseConvergeTime(1000 * LEDGER_IDLE_INTERVAL), mLastValidationTime(0),
mFetchPack("FetchPack", 2048, 30), mLastFetchPack(0),
mLastLoadBase(256), mLastLoadFactor(256)
{
}
@@ -156,18 +158,6 @@ bool NetworkOPs::isValidated(uint32 seq)
return haveLedger(seq) && (seq <= mLedgerMaster->getValidatedLedger()->getLedgerSeq());
}
bool NetworkOPs::addWantedHash(const uint256& h)
{
boost::recursive_mutex::scoped_lock sl(mWantedHashLock);
return mWantedHashes.insert(h).second;
}
bool NetworkOPs::isWantedHash(const uint256& h, bool remove)
{
boost::recursive_mutex::scoped_lock sl(mWantedHashLock);
return (remove ? mWantedHashes.erase(h) : mWantedHashes.count(h)) != 0;
}
void NetworkOPs::submitTransaction(Job&, SerializedTransaction::pointer iTrans, stCallback callback)
{ // this is an asynchronous interface
Serializer s;
@@ -1276,6 +1266,12 @@ Json::Value NetworkOPs::getServerInfo(bool human, bool admin)
info["complete_ledgers"] = theApp->getLedgerMaster().getCompleteLedgers();
size_t fp = mFetchPack.getCacheSize();
if (fp != 0)
info["fetch_pack"] = Json::UInt(fp);
info["peers"] = theApp->getConnectionPool().getPeerCount();
Json::Value lastClose = Json::objectValue;
@@ -2007,7 +2003,7 @@ void NetworkOPs::getBookPage(Ledger::pointer lpLedger, const uint160& uTakerPays
}
void NetworkOPs::makeFetchPack(Job&, boost::weak_ptr<Peer> wPeer, boost::shared_ptr<ripple::TMGetObjectByHash> request,
Ledger::pointer prevLedger, Ledger::pointer reqLedger)
Ledger::pointer wantLedger, Ledger::pointer haveLedger)
{
try
{
@@ -2020,26 +2016,46 @@ void NetworkOPs::makeFetchPack(Job&, boost::weak_ptr<Peer> wPeer, boost::shared_
if (request->has_seq())
reply.set_seq(request->seq());
reply.set_ledgerhash(request->ledgerhash());
reply.set_type(ripple::TMGetObjectByHash::otFETCH_PACK);
std::list<SHAMap::fetchPackEntry_t> pack =
reqLedger->peekAccountStateMap()->getFetchPack(prevLedger->peekAccountStateMap().get(), false, 1024);
BOOST_FOREACH(SHAMap::fetchPackEntry_t& node, pack)
do
{
uint32 lSeq = wantLedger->getLedgerSeq();
ripple::TMIndexedObject& newObj = *reply.add_objects();
newObj.set_hash(node.first.begin(), 256 / 8);
newObj.set_data(&node.second[0], node.second.size());
}
newObj.set_hash(wantLedger->getHash().begin(), 256 / 8);
Serializer s(256);
s.add32(sHP_Ledger);
wantLedger->addRaw(s);
newObj.set_data(s.getDataPtr(), s.getLength());
newObj.set_ledgerseq(lSeq);
if (reqLedger->getAccountHash().isNonZero() && (pack.size() < 768))
{
pack = reqLedger->peekTransactionMap()->getFetchPack(NULL, true, 256);
std::list<SHAMap::fetchPackEntry_t> pack = wantLedger->peekAccountStateMap()->getFetchPack(
haveLedger->peekAccountStateMap().get(), false, 1024 - reply.objects().size());
BOOST_FOREACH(SHAMap::fetchPackEntry_t& node, pack)
{
ripple::TMIndexedObject& newObj = *reply.add_objects();
newObj.set_hash(node.first.begin(), 256 / 8);
newObj.set_data(&node.second[0], node.second.size());
newObj.set_ledgerseq(lSeq);
}
}
if (wantLedger->getAccountHash().isNonZero() && (pack.size() < 768))
{
pack = wantLedger->peekTransactionMap()->getFetchPack(NULL, true, 256);
BOOST_FOREACH(SHAMap::fetchPackEntry_t& node, pack)
{
ripple::TMIndexedObject& newObj = *reply.add_objects();
newObj.set_hash(node.first.begin(), 256 / 8);
newObj.set_data(&node.second[0], node.second.size());
newObj.set_ledgerseq(lSeq);
}
}
if (reply.objects().size() >= 512)
break;
haveLedger = wantLedger;
wantLedger = getLedgerByHash(haveLedger->getParentHash());
} while (wantLedger);
cLog(lsINFO) << "Built fetch pack with " << reply.objects().size() << " nodes";
PackedMessage::pointer msg = boost::make_shared<PackedMessage>(reply, ripple::mtGET_OBJECTS);
@@ -2051,4 +2067,45 @@ void NetworkOPs::makeFetchPack(Job&, boost::weak_ptr<Peer> wPeer, boost::shared_
}
}
void NetworkOPs::sweepFetchPack()
{
mFetchPack.sweep();
}
void NetworkOPs::addFetchPack(const uint256& hash, boost::shared_ptr< std::vector<unsigned char> >& data)
{
mFetchPack.canonicalize(hash, data, false);
}
bool NetworkOPs::getFetchPack(const uint256& hash, std::vector<unsigned char>& data)
{
bool ret = mFetchPack.retrieve(hash, data);
if (!ret)
return false;
mFetchPack.del(hash, false);
if (hash != Serializer::getSHA512Half(data))
{
cLog(lsWARNING) << "Bad entry in fetch pack";
return false;
}
return true;
}
bool NetworkOPs::shouldFetchPack()
{
uint32 now = getNetworkTimeNC();
if (mLastFetchPack == now)
return false;
mFetchPack.sweep();
if (mFetchPack.getCacheSize() > 384)
return false;
mLastFetchPack = now;
return true;
}
int NetworkOPs::getFetchSize()
{
return mFetchPack.getCacheSize();
}
// vim:ts=4
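getFetchPack above treats the pack as a content-addressed, single-use cache: an entry is erased on retrieval and only handed back if it still hashes to the requested key, so a misbehaving peer cannot plant mismatched data. A self-contained sketch of that retrieve-and-verify step (FNV-1a stands in for the SHA-512-half hash the real code uses):

#include <map>
#include <string>

typedef unsigned long long uint64;

// Stand-in for the SHA-512-half content hash used by the real code
// (FNV-1a here, just to keep the sketch self-contained).
static uint64 contentHash(const std::string& data)
{
    uint64 h = 14695981039346656037ULL;
    for (std::string::size_type i = 0; i < data.size(); ++i)
    {
        h ^= static_cast<unsigned char>(data[i]);
        h *= 1099511628211ULL;
    }
    return h;
}

class FetchPackCache
{
    std::map<uint64, std::string> mCache;
public:
    void add(const std::string& data)
    {
        mCache[contentHash(data)] = data;
    }

    // Single-use retrieval: the entry is erased, and the data is only
    // handed back if it still hashes to the requested key.
    bool get(uint64 hash, std::string& data)
    {
        std::map<uint64, std::string>::iterator it = mCache.find(hash);
        if (it == mCache.end())
            return false;
        data = it->second;
        mCache.erase(it);
        return contentHash(data) == hash;
    }
};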

View File

@@ -128,8 +128,8 @@ protected:
subMapType mSubTransactions; // all accepted transactions
subMapType mSubRTTransactions; // all proposed and accepted transactions
boost::recursive_mutex mWantedHashLock;
boost::unordered_set<uint256> mWantedHashes;
TaggedCache< uint256, std::vector<unsigned char> > mFetchPack;
uint32 mLastFetchPack;
uint32 mLastLoadBase;
uint32 mLastLoadFactor;
@@ -261,7 +261,13 @@ public:
void mapComplete(const uint256& hash, SHAMap::ref map);
bool stillNeedTXSet(const uint256& hash);
void makeFetchPack(Job&, boost::weak_ptr<Peer> peer, boost::shared_ptr<ripple::TMGetObjectByHash> request,
Ledger::pointer prevLedger, Ledger::pointer reqLedger);
Ledger::pointer wantLedger, Ledger::pointer haveLedger);
bool shouldFetchPack();
void gotFetchPack() { mLastFetchPack = 0; }
void addFetchPack(const uint256& hash, boost::shared_ptr< std::vector<unsigned char> >& data);
bool getFetchPack(const uint256& hash, std::vector<unsigned char>& data);
int getFetchSize();
void sweepFetchPack();
// network state machine
void checkState(const boost::system::error_code& result);
@@ -294,9 +300,6 @@ public:
uint256 getConsensusLCL();
void reportFeeChange();
bool addWantedHash(const uint256& h);
bool isWantedHash(const uint256& h, bool remove);
//Helper function to generate SQL query to get transactions
std::string transactionsSQL(std::string selection, const RippleAddress& account,
int32 minLedger, int32 maxLedger, bool descending, uint32 offset, int limit,

View File

@@ -36,6 +36,8 @@ Peer::Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx,
mPeerId(peerID),
mPrivate(false),
mLoad(""),
mMinLedger(0),
mMaxLedger(0),
mSocketSsl(io_service, ctx),
mActivityTimer(io_service),
mIOStrand(io_service)
@@ -727,7 +729,7 @@ void Peer::recvHello(ripple::TMHello& packet)
cLog(lsINFO) << "Recv(Hello): " << getIP() << " :Clock far off -" << ourTime - packet.nettime();
}
}
else if (packet.protoversionmin() < MAKE_VERSION_INT(MIN_PROTO_MAJOR, MIN_PROTO_MINOR))
else if (packet.protoversionmin() > MAKE_VERSION_INT(PROTO_VERSION_MAJOR, PROTO_VERSION_MINOR))
{
cLog(lsINFO) << "Recv(Hello): Server requires protocol version " <<
GET_VERSION_MAJOR(packet.protoversion()) << "." << GET_VERSION_MINOR(packet.protoversion())
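The corrected check above refuses a peer only when the lowest protocol version it will accept is newer than the version this server speaks; the old test compared the peer's minimum against our own minimum and so never caught that case. A small sketch of the packed-version comparison, assuming the usual MAKE_VERSION_INT layout of major in the high 16 bits and minor in the low 16:

typedef unsigned int uint32;

// Assumed packing: major version in the high 16 bits, minor in the low 16.
static uint32 makeVersionInt(uint32 major, uint32 minor)
{
    return (major << 16) | (minor & 0xFFFF);
}

// Refuse the connection only if the peer's minimum acceptable version is
// higher than the protocol version we actually implement.
static bool protocolMismatch(uint32 peerMinVersion, uint32 ourMajor, uint32 ourMinor)
{
    return peerMinVersion > makeVersionInt(ourMajor, ourMinor);
}

// Example: a peer requiring at least 1.3 is incompatible with a 1.2 node,
// but a peer requiring only 1.1 is fine:
//   protocolMismatch(makeVersionInt(1, 3), 1, 2) == true
//   protocolMismatch(makeVersionInt(1, 1), 1, 2) == false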
@@ -1189,14 +1191,14 @@ void Peer::recvPeers(ripple::TMPeers& packet)
void Peer::recvGetObjectByHash(const boost::shared_ptr<ripple::TMGetObjectByHash>& ptr)
{
ripple::TMGetObjectByHash& packet = *ptr;
if (packet.type() == ripple::TMGetObjectByHash::otFETCH_PACK)
{
doFetchPack(ptr);
return;
}
if (packet.query())
{ // this is a query
if (packet.type() == ripple::TMGetObjectByHash::otFETCH_PACK)
{
doFetchPack(ptr);
return;
}
ripple::TMGetObjectByHash reply;
reply.set_query(false);
@@ -1233,42 +1235,43 @@ void Peer::recvGetObjectByHash(const boost::shared_ptr<ripple::TMGetObjectByHash
}
else
{ // this is a reply
uint32 seq = packet.has_seq() ? packet.seq() : 0;
HashedObjectType type;
switch (packet.type())
{
case ripple::TMGetObjectByHash::otLEDGER: type = hotLEDGER; break;
case ripple::TMGetObjectByHash::otTRANSACTION: type = hotTRANSACTION; break;
case ripple::TMGetObjectByHash::otSTATE_NODE: type = hotACCOUNT_NODE; break;
case ripple::TMGetObjectByHash::otTRANSACTION_NODE: type = hotTRANSACTION_NODE; break;
default: type = hotUNKNOWN;
}
if (packet.type() == ripple::TMGetObjectByHash::otFETCH_PACK)
theApp->getOPs().gotFetchPack();
uint32 pLSeq = 0;
bool pLDo = true;
for (int i = 0; i < packet.objects_size(); ++i)
{
const ripple::TMIndexedObject& obj = packet.objects(i);
if (obj.has_hash() && (obj.hash().size() == (256/8)))
{
uint256 hash;
memcpy(hash.begin(), obj.hash().data(), 256 / 8);
if (theApp->getOPs().isWantedHash(hash, true))
if (obj.has_ledgerseq())
{
std::vector<unsigned char> data(obj.data().begin(), obj.data().end());
if (Serializer::getSHA512Half(data) != hash)
if (obj.ledgerseq() != pLSeq)
{
cLog(lsWARNING) << "Bad hash in data from peer";
theApp->getOPs().addWantedHash(hash);
punishPeer(LT_BadData);
}
else
{
cLog(lsDEBUG) << "Got wanted hash " << hash;
theApp->getHashedObjectStore().store(type, seq, data, hash);
tLog(pLDo && (pLSeq != 0), lsDEBUG) << "Received full fetch pack for " << pLSeq;
pLSeq = obj.ledgerseq();
pLDo = !theApp->getOPs().haveLedger(pLSeq);
if (!pLDo)
{
cLog(lsDEBUG) << "Got pack for " << pLSeq << " too late";
}
}
}
else
cLog(lsWARNING) << "Received unwanted hash " << getIP() << " " << hash;
if (pLDo)
{
uint256 hash;
memcpy(hash.begin(), obj.hash().data(), 256 / 8);
boost::shared_ptr< std::vector<unsigned char> > data = boost::make_shared< std::vector<unsigned char> >
(obj.data().begin(), obj.data().end());
theApp->getOPs().addFetchPack(hash, data);
}
}
}
tLog(pLDo && (pLSeq != 0), lsDEBUG) << "Received partial fetch pack for " << pLSeq;
}
}
@@ -1403,6 +1406,11 @@ void Peer::recvStatus(ripple::TMStatusChange& packet)
addLedger(mPreviousLedgerHash);
}
else mPreviousLedgerHash.zero();
if (packet.has_firstseq())
mMinLedger = packet.firstseq();
if (packet.has_lastseq())
mMaxLedger = packet.lastseq();
}
void Peer::recvGetLedger(ripple::TMGetLedger& packet)
@@ -1857,46 +1865,49 @@ void Peer::doProofOfWork(Job&, boost::weak_ptr<Peer> peer, ProofOfWork::pointer
void Peer::doFetchPack(const boost::shared_ptr<ripple::TMGetObjectByHash>& packet)
{
if (packet->query())
if (theApp->getFeeTrack().isLoaded())
{
if (packet->ledgerhash().size() != 32)
{
cLog(lsWARNING) << "FetchPack hash size malformed";
punishPeer(LT_InvalidRequest);
return;
}
uint256 hash;
memcpy(hash.begin(), packet->ledgerhash().data(), 32);
Ledger::pointer reqLedger = theApp->getOPs().getLedgerByHash(hash);
if (!reqLedger)
{
cLog(lsINFO) << "Peer requests fetch pack for ledger we don't have: " << hash;
punishPeer(LT_RequestNoReply);
return;
}
if (!reqLedger->isClosed())
{
cLog(lsWARNING) << "Peer requests fetch pack for open ledger: " << hash;
punishPeer(LT_InvalidRequest);
return;
}
Ledger::pointer prevLedger = theApp->getOPs().getLedgerByHash(reqLedger->getParentHash());
if (!prevLedger)
{
cLog(lsINFO) << "Peer requests fetch pack for ledger whose predecessor we don't have: " << hash;
punishPeer(LT_RequestNoReply);
return;
}
theApp->getJobQueue().addJob(jtPACK, "MakeFetchPack",
BIND_TYPE(&NetworkOPs::makeFetchPack, &theApp->getOPs(), P_1,
boost::weak_ptr<Peer>(shared_from_this()), packet, prevLedger, reqLedger));
cLog(lsINFO) << "Too busy to make fetch pack";
return;
}
else
{ // received fetch pack
// WRITEME
if (packet->ledgerhash().size() != 32)
{
cLog(lsWARNING) << "FetchPack hash size malformed";
punishPeer(LT_InvalidRequest);
return;
}
uint256 hash;
memcpy(hash.begin(), packet->ledgerhash().data(), 32);
Ledger::pointer haveLedger = theApp->getOPs().getLedgerByHash(hash);
if (!haveLedger)
{
cLog(lsINFO) << "Peer requests fetch pack for ledger we don't have: " << hash;
punishPeer(LT_RequestNoReply);
return;
}
if (!haveLedger->isClosed())
{
cLog(lsWARNING) << "Peer requests fetch pack from open ledger: " << hash;
punishPeer(LT_InvalidRequest);
return;
}
Ledger::pointer wantLedger = theApp->getOPs().getLedgerByHash(haveLedger->getParentHash());
if (!wantLedger)
{
cLog(lsINFO) << "Peer requests fetch pack for ledger whose predecessor we don't have: " << hash;
punishPeer(LT_RequestNoReply);
return;
}
theApp->getJobQueue().addJob(jtPACK, "MakeFetchPack",
BIND_TYPE(&NetworkOPs::makeFetchPack, &theApp->getOPs(), P_1,
boost::weak_ptr<Peer>(shared_from_this()), packet, wantLedger, haveLedger));
}
bool Peer::hasProto(int version)
{
return mHello.has_protoversion() && (mHello.protoversion() >= version);
}
Json::Value Peer::getJson()

View File

@@ -47,6 +47,7 @@ private:
uint64 mPeerId;
bool mPrivate; // Keep peer IP private.
LoadSource mLoad;
uint32 mMinLedger, mMaxLedger;
uint256 mClosedLedgerHash;
uint256 mPreviousLedgerHash;
@@ -162,6 +163,8 @@ public:
const RippleAddress& getNodePublic() const { return mNodePublic; }
void cycleStatus() { mPreviousLedgerHash = mClosedLedgerHash; mClosedLedgerHash.zero(); }
bool hasProto(int version);
bool hasRange(uint32 uMin, uint32 uMax) { return (uMin >= mMinLedger) && (uMax <= mMaxLedger); }
};
#endif
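mMinLedger and mMaxLedger are filled from the new firstSeq/lastSeq fields a peer advertises in TMStatusChange, and hasRange() is what LedgerMaster uses to decide which peers can answer a fetch-pack request. A minimal standalone sketch of that bookkeeping (plain C++; the StatusChange struct is a hypothetical stand-in for the protobuf message):

typedef unsigned int uint32;

// Hypothetical stand-in for the TMStatusChange fields used here.
struct StatusChange
{
    bool hasFirstSeq, hasLastSeq;
    uint32 firstSeq, lastSeq;
};

class PeerLedgerRange
{
    uint32 mMin, mMax; // zero until the peer advertises a range
public:
    PeerLedgerRange() : mMin(0), mMax(0) { }

    void onStatus(const StatusChange& s)
    {
        if (s.hasFirstSeq)
            mMin = s.firstSeq;
        if (s.hasLastSeq)
            mMax = s.lastSeq;
    }

    // True if the peer claims to hold every ledger in [uMin, uMax].
    bool hasRange(uint32 uMin, uint32 uMax) const
    {
        return (uMin >= mMin) && (uMax <= mMax);
    }
};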

View File

@@ -731,7 +731,7 @@ SHAMapTreeNode::pointer SHAMap::fetchNodeExternal(const SHAMapNode& id, const ui
}
}
void SHAMap::fetchRoot(const uint256& hash)
void SHAMap::fetchRoot(const uint256& hash, SHAMapSyncFilter* filter)
{
if (sLog(lsTRACE))
{
@@ -742,7 +742,20 @@ void SHAMap::fetchRoot(const uint256& hash)
else
cLog(lsTRACE) << "Fetch root SHAMap node " << hash;
}
root = fetchNodeExternal(SHAMapNode(), hash);
try
{
root = fetchNodeExternal(SHAMapNode(), hash);
}
catch (SHAMapMissingNode& mn)
{
std::vector<unsigned char> nodeData;
if (!filter || !filter->haveNode(SHAMapNode(), hash, nodeData))
throw;
root = boost::make_shared<SHAMapTreeNode>(SHAMapNode(), nodeData,
mSeq - 1, snfPREFIX, hash, true);
mTNByID[*root] = root;
filter->gotNode(true, SHAMapNode(), hash, nodeData, root->getType());
}
assert(root->getNodeHash() == hash);
}
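fetchRoot now follows the pattern used throughout the sync code further down: try the node store first, and on SHAMapMissingNode ask the sync filter, which can answer from a fetch pack; a node obtained that way is reported back through gotNode with fromFilter set to true so the filter can persist it without doing redundant work. A minimal sketch of that fallback shape (the NodeStore and SyncFilter interfaces here are hypothetical stand-ins for the real classes):

#include <stdexcept>
#include <string>

struct MissingNode : std::runtime_error
{
    MissingNode() : std::runtime_error("missing node") { }
};

// Hypothetical interfaces standing in for the hashed-object store and filter.
struct NodeStore
{
    virtual std::string fetch(const std::string& hash) = 0; // throws MissingNode
};

struct SyncFilter
{
    virtual bool haveNode(const std::string& hash, std::string& data) = 0;
    virtual void gotNode(bool fromFilter, const std::string& hash, const std::string& data) = 0;
};

// Try the store; fall back to the filter (e.g. a fetch pack) if the node is
// missing, and tell the filter where the data came from.
std::string fetchWithFallback(NodeStore& store, SyncFilter* filter, const std::string& hash)
{
    try
    {
        return store.fetch(hash);
    }
    catch (MissingNode&)
    {
        std::string data;
        if (!filter || !filter->haveNode(hash, data))
            throw;
        filter->gotNode(true, hash, data);
        return data;
    }
}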

View File

@@ -21,6 +21,7 @@ DEFINE_INSTANCE(SHAMapItem);
DEFINE_INSTANCE(SHAMapTreeNode);
class SHAMap;
class SHAMapSyncFilter;
// A tree-like map of SHA256 hashes
// The trees are designed for rapid synchronization and compression of differences
@@ -253,7 +254,7 @@ public:
SHAMapSyncFilter() { ; }
virtual ~SHAMapSyncFilter() { ; }
virtual void gotNode(const SHAMapNode& id, const uint256& nodeHash,
virtual void gotNode(bool fromFilter, const SHAMapNode& id, const uint256& nodeHash,
const std::vector<unsigned char>& nodeData, SHAMapTreeNode::TNType type)
{ ; }
@@ -397,7 +398,7 @@ public:
ScopedLock Lock() const { return ScopedLock(mLock); }
bool hasNode(const SHAMapNode& id);
void fetchRoot(const uint256& hash);
void fetchRoot(const uint256& hash, SHAMapSyncFilter* filter);
// normal hash access functions
bool hasItem(const uint256& id);
@@ -431,7 +432,7 @@ public:
bool getNodeFat(const SHAMapNode& node, std::vector<SHAMapNode>& nodeIDs,
std::list<std::vector<unsigned char> >& rawNode, bool fatRoot, bool fatLeaves);
bool getRootNode(Serializer& s, SHANodeFormat format);
std::vector<uint256> getNeededHashes(int max);
std::vector<uint256> getNeededHashes(int max, SHAMapSyncFilter* filter);
SMAddNode addRootNode(const uint256& hash, const std::vector<unsigned char>& rootNode, SHANodeFormat format,
SHAMapSyncFilter* filter);
SMAddNode addRootNode(const std::vector<unsigned char>& rootNode, SHANodeFormat format,
@@ -477,7 +478,7 @@ public:
virtual void dump(bool withHashes = false);
typedef std::pair< uint256, std::vector<unsigned char> > fetchPackEntry_t;
std::list<fetchPackEntry_t> getFetchPack(SHAMap* prior, bool includeLeaves, int max);
std::list<fetchPackEntry_t> getFetchPack(SHAMap* have, bool includeLeaves, int max);
static void sweep() { fullBelowCache.sweep(); }
};

View File

@@ -69,10 +69,12 @@ void SHAMap::getMissingNodes(std::vector<SHAMapNode>& nodeIDs, std::vector<uint2
{
assert(mSeq >= 1);
SHAMapTreeNode::pointer ptr =
boost::make_shared<SHAMapTreeNode>(childID, nodeData, mSeq - 1, snfPREFIX, childHash, true);
boost::make_shared<SHAMapTreeNode>(childID, nodeData, mSeq - 1,
snfPREFIX, childHash, true);
cLog(lsTRACE) << "Got sync node from cache: " << *ptr;
mTNByID[*ptr] = ptr;
d = ptr.get();
filter->gotNode(true, childID, childHash, nodeData, ptr->getType());
}
}
}
@@ -106,7 +108,7 @@ void SHAMap::getMissingNodes(std::vector<SHAMapNode>& nodeIDs, std::vector<uint2
clearSynching();
}
std::vector<uint256> SHAMap::getNeededHashes(int max)
std::vector<uint256> SHAMap::getNeededHashes(int max, SHAMapSyncFilter* filter)
{
std::vector<uint256> ret;
boost::recursive_mutex::scoped_lock sl(mLock);
@@ -135,20 +137,38 @@ std::vector<uint256> SHAMap::getNeededHashes(int max)
if (!node->isEmptyBranch(branch))
{
const uint256& childHash = node->getChildHash(branch);
SHAMapNode childID = node->getChildNodeID(branch);
if (!fullBelowCache.isPresent(childHash))
{
SHAMapTreeNode* d = NULL;
try
{
SHAMapTreeNode* d = getNodePointer(node->getChildNodeID(branch), childHash);
d = getNodePointer(node->getChildNodeID(branch), childHash);
assert(d);
}
catch (SHAMapMissingNode&)
{ // node is not in the map
std::vector<unsigned char> nodeData;
if (filter && filter->haveNode(childID, childHash, nodeData))
{
SHAMapTreeNode::pointer ptr =
boost::make_shared<SHAMapTreeNode>(childID, nodeData, mSeq - 1,
snfPREFIX, childHash, true);
mTNByID[*ptr] = ptr;
d = ptr.get();
filter->gotNode(true, childID, childHash, nodeData, ptr->getType());
}
}
if (d)
{
if (d->isInner() && !d->isFullBelow())
{
have_all = false;
stack.push(d);
}
}
catch (SHAMapMissingNode&)
{ // node is not in the map
else
{
have_all = false;
ret.push_back(childHash);
if (--max <= 0)
@@ -263,7 +283,7 @@ SMAddNode SHAMap::addRootNode(const std::vector<unsigned char>& rootNode, SHANod
{
Serializer s;
root->addRaw(s, snfPREFIX);
filter->gotNode(*root, root->getNodeHash(), s.peekData(), root->getType());
filter->gotNode(false, *root, root->getNodeHash(), s.peekData(), root->getType());
}
return SMAddNode::useful();
@@ -299,7 +319,7 @@ SMAddNode SHAMap::addRootNode(const uint256& hash, const std::vector<unsigned ch
{
Serializer s;
root->addRaw(s, snfPREFIX);
filter->gotNode(*root, root->getNodeHash(), s.peekData(), root->getType());
filter->gotNode(false, *root, root->getNodeHash(), s.peekData(), root->getType());
}
return SMAddNode::useful();
@@ -371,7 +391,7 @@ SMAddNode SHAMap::addKnownNode(const SHAMapNode& node, const std::vector<unsigne
{
Serializer s;
newNode->addRaw(s, snfPREFIX);
filter->gotNode(node, hash, s.peekData(), newNode->getType());
filter->gotNode(false, node, hash, s.peekData(), newNode->getType());
}
mTNByID[*newNode] = newNode;
@@ -472,7 +492,7 @@ bool SHAMap::deepCompare(SHAMap& other)
bool SHAMap::hasNode(const SHAMapNode& nodeID, const uint256& nodeHash)
{
SHAMapTreeNode* node = root.get();
while (node->isInner() && (node->getDepth() <= nodeID.getDepth()))
while (node->isInner() && (node->getDepth() < nodeID.getDepth()))
{
int branch = node->selectBranch(nodeID.getNodeID());
if (node->isEmptyBranch(branch))
@@ -482,14 +502,30 @@ bool SHAMap::hasNode(const SHAMapNode& nodeID, const uint256& nodeHash)
return node->getNodeHash() == nodeHash;
}
std::list<SHAMap::fetchPackEntry_t> SHAMap::getFetchPack(SHAMap* prior, bool includeLeaves, int max)
std::list<SHAMap::fetchPackEntry_t> SHAMap::getFetchPack(SHAMap* have, bool includeLeaves, int max)
{
std::list<fetchPackEntry_t> ret;
boost::recursive_mutex::scoped_lock ul1(mLock);
UPTR_T< boost::unique_lock<boost::recursive_mutex> > ul2;
if (have)
{
UPTR_T< boost::unique_lock<boost::recursive_mutex> > ul3(
new boost::unique_lock<boost::recursive_mutex>(have->mLock, boost::try_to_lock));
if (!(*ul3))
{
cLog(lsINFO) << "Unable to create pack due to lock";
return ret;
}
ul2.swap(ul3);
}
if (root->isLeaf())
{
if (includeLeaves && !root->getNodeHash().isZero() &&
(!prior || !prior->hasNode(*root, root->getNodeHash())))
(!have || !have->hasNode(*root, root->getNodeHash())))
{
Serializer s;
root->addRaw(s, snfPREFIX);
@@ -501,7 +537,7 @@ std::list<SHAMap::fetchPackEntry_t> SHAMap::getFetchPack(SHAMap* prior, bool inc
if (root->getNodeHash().isZero())
return ret;
if (prior && (root->getNodeHash() == prior->root->getNodeHash()))
if (have && (root->getNodeHash() == have->root->getNodeHash()))
return ret;
std::stack<SHAMapTreeNode*> stack; // contains unexplored non-matching inner node entries
@@ -529,10 +565,10 @@ std::list<SHAMap::fetchPackEntry_t> SHAMap::getFetchPack(SHAMap* prior, bool inc
SHAMapTreeNode *next = getNodePointer(childID, childHash);
if (next->isInner())
{
if (!prior || !prior->hasNode(*next, childHash))
if (!have || !have->hasNode(*next, childHash))
stack.push(next);
}
else if (includeLeaves && (!prior || !prior->hasNode(childID, childHash)))
else if (includeLeaves && (!have || !have->hasNode(childID, childHash)))
{
Serializer s;
node->addRaw(s, snfPREFIX);
@@ -545,8 +581,6 @@ std::list<SHAMap::fetchPackEntry_t> SHAMap::getFetchPack(SHAMap* prior, bool inc
if (max <= 0)
break;
}
cLog(lsINFO) << "Fetch pack has " << ret.size() << " entries";
return ret;
}

View File

@@ -13,7 +13,7 @@ class ConsensusTransSetSF : public SHAMapSyncFilter
public:
ConsensusTransSetSF() { ; }
virtual void gotNode(const SHAMapNode& id, const uint256& nodeHash,
virtual void gotNode(bool fromFilter, const SHAMapNode& id, const uint256& nodeHash,
const std::vector<unsigned char>& nodeData, SHAMapTreeNode::TNType);
virtual bool haveNode(const SHAMapNode& id, const uint256& nodeHash, std::vector<unsigned char>& nodeData);
@@ -29,14 +29,14 @@ public:
AccountStateSF(uint32 ledgerSeq) : mLedgerSeq(ledgerSeq)
{ ; }
virtual void gotNode(const SHAMapNode& id, const uint256& nodeHash,
virtual void gotNode(bool fromFilter, const SHAMapNode& id, const uint256& nodeHash,
const std::vector<unsigned char>& nodeData, SHAMapTreeNode::TNType)
{
theApp->getHashedObjectStore().store(hotACCOUNT_NODE, mLedgerSeq, nodeData, nodeHash);
}
virtual bool haveNode(const SHAMapNode& id, const uint256& nodeHash, std::vector<unsigned char>& nodeData)
{ // fetchNodeExternal already tried
return false;
{
return theApp->getOPs().getFetchPack(nodeHash, nodeData);
}
};
@@ -50,7 +50,7 @@ public:
TransactionStateSF(uint32 ledgerSeq) : mLedgerSeq(ledgerSeq)
{ ; }
virtual void gotNode(const SHAMapNode& id, const uint256& nodeHash,
virtual void gotNode(bool fromFilter, const SHAMapNode& id, const uint256& nodeHash,
const std::vector<unsigned char>& nodeData, SHAMapTreeNode::TNType type)
{
theApp->getHashedObjectStore().store(
@@ -58,8 +58,8 @@ public:
mLedgerSeq, nodeData, nodeHash);
}
virtual bool haveNode(const SHAMapNode& id, const uint256& nodeHash, std::vector<unsigned char>& nodeData)
{ // fetchNodeExternal already tried
return false;
{
return theApp->getOPs().getFetchPack(nodeHash, nodeData);
}
};

View File

@@ -232,7 +232,7 @@ template<typename c_Key, typename c_Data> bool TaggedCache<c_Key, c_Data>::del(c
if (!valid || entry.isExpired())
mCache.erase(cit);
return true;
return ret;
}
template<typename c_Key, typename c_Data>

View File

@@ -195,9 +195,11 @@ SMAddNode TransactionAcquire::takeNodes(const std::list<SHAMapNode>& nodeIDs,
}
}
void ConsensusTransSetSF::gotNode(const SHAMapNode& id, const uint256& nodeHash,
void ConsensusTransSetSF::gotNode(bool fromFilter, const SHAMapNode& id, const uint256& nodeHash,
const std::vector<unsigned char>& nodeData, SHAMapTreeNode::TNType type)
{
if (fromFilter)
return;
theApp->getTempNodeCache().store(nodeHash, nodeData);
if ((type == SHAMapTreeNode::tnTRANSACTION_NM) && (nodeData.size() > 16))
{ // this is a transaction, and we didn't have it

View File

@@ -6,7 +6,7 @@
#define SERVER_VERSION_MAJOR 0
#define SERVER_VERSION_MINOR 8
#define SERVER_VERSION_SUB "-b"
#define SERVER_VERSION_SUB "-c"
#define SERVER_NAME "Ripple"
#define SV_STRINGIZE(x) SV_STRINGIZE2(x)

View File

@@ -119,6 +119,8 @@ message TMStatusChange {
optional bytes ledgerHash = 4;
optional bytes ledgerHashPrevious = 5;
optional uint64 networkTime = 6;
optional uint32 firstSeq = 7;
optional uint32 lastSeq = 8;
}
@@ -222,6 +224,7 @@ message TMIndexedObject
optional bytes nodeID = 2;
optional bytes index = 3;
optional bytes data = 4;
optional uint32 ledgerSeq = 5;
}
message TMGetObjectByHash