Merge branch 'develop' into faster_acquire

This commit is contained in:
JoelKatz
2013-05-24 10:06:29 -07:00
29 changed files with 231 additions and 125 deletions

4
.gitattributes vendored
View File

@@ -1,6 +1,10 @@
# Set default behaviour, in case users don't have core.autocrlf set.
* text=auto
# These annoying files
rippled.1 binary
LICENSE binary
# Visual Studio
*.sln text eol=crlf
*.vcproj text eol=crlf

View File

@@ -2,7 +2,7 @@
#include <boost/foreach.hpp>
TaggedCache<uint256, AcceptedLedger> AcceptedLedger::ALCache("AcceptedLedger", 4, 60);
TaggedCache<uint256, AcceptedLedger> AcceptedLedger::ALCache("AcceptedLedger", 8, 120);
ALTransaction::ALTransaction(uint32 seq, SerializerIterator& sit)
{

View File

@@ -70,6 +70,8 @@ public:
int getLedgerSeq() const { return mLedger->getLedgerSeq(); }
int getTxnCount() const { return mMap.size(); }
static float getCacheHitRate() { return ALCache.getHitRate(); }
ALTransaction::pointer getTxn(int) const;
};

View File

@@ -29,14 +29,12 @@ void AccountItems::fillItems(const uint160& accountID, Ledger::ref ledger)
SLE::pointer ownerDir = ledger->getDirNode(currentIndex);
if (!ownerDir) return;
STVector256 svOwnerNodes = ownerDir->getFieldV256(sfIndexes);
BOOST_FOREACH(uint256& uNode, svOwnerNodes.peekValue())
BOOST_FOREACH(const uint256& uNode, ownerDir->getFieldV256(sfIndexes).peekValue())
{
SLE::pointer sleCur = ledger->getSLEi(uNode);
AccountItem::pointer item = mOfType->makeItem(accountID, sleCur);
if(item)
if (item)
{
mItems.push_back(item);
}

View File

@@ -437,7 +437,18 @@ Json::Value RPCParser::parseAccountLines(const Json::Value& jvParams)
Json::Value RPCParser::parseAccountRaw(const Json::Value& jvParams, bool bPeer)
{
std::string strIdent = jvParams[0u].asString();
std::string strPeer = bPeer && jvParams.size() >= 2 ? jvParams[1u].asString() : "";
unsigned int iCursor = jvParams.size();
bool bStrict = false;
std::string strPeer;
if (!bPeer && iCursor >= 2 && jvParams[iCursor-1] == "strict")
{
bStrict = true;
--iCursor;
}
if (bPeer && iCursor >= 2)
strPeer = jvParams[iCursor].asString();
int iIndex = 0;
// int iIndex = jvParams.size() >= 2 ? lexical_cast_s<int>(jvParams[1u].asString()) : 0;
@@ -452,6 +463,9 @@ Json::Value RPCParser::parseAccountRaw(const Json::Value& jvParams, bool bPeer)
jvRequest["account"] = strIdent;
if (bStrict)
jvRequest["strict"] = 1;
if (iIndex)
jvRequest["account_index"] = iIndex;
@@ -465,7 +479,7 @@ Json::Value RPCParser::parseAccountRaw(const Json::Value& jvParams, bool bPeer)
jvRequest["peer"] = strPeer;
}
if (jvParams.size() == (2+bPeer) && !jvParseLedger(jvRequest, jvParams[1u+bPeer].asString()))
if (iCursor == (2+bPeer) && !jvParseLedger(jvRequest, jvParams[1u+bPeer].asString()))
return rpcError(rpcLGR_IDX_MALFORMED);
return jvRequest;

View File

@@ -86,14 +86,14 @@ bool HashedObjectStore::storeLevelDB(HashedObjectType type, uint32 index,
{
mWritePending = true;
theApp->getJobQueue().addJob(jtWRITE, "HashedObject::store",
BIND_TYPE(&HashedObjectStore::bulkWriteLevelDB, this));
BIND_TYPE(&HashedObjectStore::bulkWriteLevelDB, this, P_1));
}
}
mNegativeCache.del(hash);
return true;
}
void HashedObjectStore::bulkWriteLevelDB()
void HashedObjectStore::bulkWriteLevelDB(Job &)
{
assert(mLevelDB);
int setSize = 0;
@@ -205,7 +205,7 @@ bool HashedObjectStore::storeSQLite(HashedObjectType type, uint32 index,
{
mWritePending = true;
theApp->getJobQueue().addJob(jtWRITE, "HashedObject::store",
BIND_TYPE(&HashedObjectStore::bulkWriteSQLite, this));
BIND_TYPE(&HashedObjectStore::bulkWriteSQLite, this, P_1));
}
}
// else
@@ -214,7 +214,7 @@ bool HashedObjectStore::storeSQLite(HashedObjectType type, uint32 index,
return true;
}
void HashedObjectStore::bulkWriteSQLite()
void HashedObjectStore::bulkWriteSQLite(Job&)
{
assert(!mLevelDB);
while (1)

View File

@@ -15,6 +15,8 @@
DEFINE_INSTANCE(HashedObject);
class Job;
enum HashedObjectType
{
hotUNKNOWN = 0,
@@ -65,7 +67,9 @@ public:
HashedObjectStore(int cacheSize, int cacheAge);
bool isLevelDB() { return mLevelDB; }
bool isLevelDB() { return mLevelDB; }
float getCacheHitRate() { return mCache.getHitRate(); }
bool store(HashedObjectType type, uint32 index, const std::vector<unsigned char>& data,
const uint256& hash)
@@ -89,13 +93,13 @@ public:
bool storeSQLite(HashedObjectType type, uint32 index, const std::vector<unsigned char>& data,
const uint256& hash);
HashedObject::pointer retrieveSQLite(const uint256& hash);
void bulkWriteSQLite();
void bulkWriteSQLite(Job&);
#ifdef USE_LEVELDB
bool storeLevelDB(HashedObjectType type, uint32 index, const std::vector<unsigned char>& data,
const uint256& hash);
HashedObject::pointer retrieveLevelDB(const uint256& hash);
void bulkWriteLevelDB();
void bulkWriteLevelDB(Job&);
#endif

View File

@@ -699,6 +699,7 @@ Ledger::pointer Ledger::getSQL1(SqliteStatement *stmt)
void Ledger::getSQL2(Ledger::ref ret)
{
ret->setClosed();
ret->setImmutable();
if (theApp->getOPs().haveLedger(ret->getLedgerSeq()))
ret->setAccepted();
cLog(lsTRACE) << "Loaded ledger: " << ret->getHash().GetHex();

View File

@@ -695,7 +695,8 @@ void LedgerConsensus::updateOurPositions()
for (std::map<uint32, int>::iterator it = closeTimes.begin(), end = closeTimes.end(); it != end; ++it)
{
cLog(lsDEBUG) << "CCTime: " << it->first << " has " << it->second << ", " << threshVote << " required";
cLog(lsDEBUG) << "CCTime: seq" << mPreviousLedger->getLedgerSeq() + 1 << ": " <<
it->first << " has " << it->second << ", " << threshVote << " required";
if (it->second >= threshVote)
{
cLog(lsDEBUG) << "Close time consensus reached: " << it->first;
@@ -1205,6 +1206,7 @@ void LedgerConsensus::accept(SHAMap::ref set, LoadEvent::pointer)
newLCL->setAccepted(closeTime, mCloseResolution, closeTimeCorrect);
newLCL->updateHash();
newLCL->setImmutable();
cLog(lsDEBUG) << "Report: NewL = " << newLCL->getHash() << ":" << newLCL->getLedgerSeq();
uint256 newLCLHash = newLCL->getHash();

View File

@@ -15,6 +15,7 @@ public:
void addLedger(Ledger::pointer ledger);
void addAcceptedLedger(Ledger::pointer ledger, bool fromConsensus);
float getCacheHitRate() { return mLedgersByHash.getHitRate(); }
uint256 getLedgerHash(uint32 index);
Ledger::pointer getLedgerBySeq(uint32 index);
Ledger::pointer getLedgerByHash(const uint256& hash);

View File

@@ -26,6 +26,22 @@ Ledger::ref LedgerMaster::getCurrentSnapshot()
return mCurrentSnapshot;
}
int LedgerMaster::getValidatedLedgerAge()
{
if (!mValidLedger)
{
cLog(lsDEBUG) << "No validated ledger";
return 999999;
}
int64 ret = theApp->getOPs().getCloseTimeNC();
ret -= static_cast<int64>(mValidLedger->getCloseTimeNC());
ret = std::max(0LL, ret);
cLog(lsTRACE) << "Validated ledger age is " << ret;
return static_cast<int>(ret);
}
void LedgerMaster::addHeldTransaction(Transaction::ref transaction)
{ // returns true if transaction was added
boost::recursive_mutex::scoped_lock ml(mLock);
@@ -557,7 +573,10 @@ void LedgerMaster::checkAccept(const uint256& hash, uint32 seq)
Ledger::pointer ledger = mLedgerHistory.getLedgerByHash(hash);
if (!ledger)
{
theApp->getMasterLedgerAcquire().findCreate(hash, seq);
return;
}
mValidLedger = ledger;
tryPublish();
@@ -575,6 +594,8 @@ void LedgerMaster::tryPublish()
}
else if (mValidLedger->getLedgerSeq() > (mPubLedger->getLedgerSeq() + MAX_LEDGER_GAP))
{
cLog(lsWARNING) << "Gap in validated ledger stream " << mPubLedger->getLedgerSeq() << " - " <<
mValidLedger->getLedgerSeq() - 1;
mPubLedger = mValidLedger;
mPubLedgers.push_back(mValidLedger);
}
@@ -613,11 +634,11 @@ void LedgerMaster::tryPublish()
{
if (theApp->getMasterLedgerAcquire().isFailure(hash))
{
cLog(lsFATAL) << "Unable to acquire a recent validated ledger";
cLog(lsWARNING) << "Unable to acquire a recent validated ledger";
}
else
{
LedgerAcquire::pointer acq = theApp->getMasterLedgerAcquire().findCreate(hash, 0);
LedgerAcquire::pointer acq = theApp->getMasterLedgerAcquire().findCreate(hash, seq);
if (!acq->isDone())
{
acq->setAccept();

View File

@@ -83,6 +83,7 @@ public:
// The published ledger is the last fully validated ledger
Ledger::ref getValidatedLedger() { return mPubLedger; }
int getValidatedLedgerAge();
TER doTransaction(SerializedTransaction::ref txn, TransactionEngineParams params, bool& didApply);
@@ -157,8 +158,9 @@ public:
void resumeAcquiring();
void tune(int size, int age) { mLedgerHistory.tune(size, age); }
void sweep(void) { mLedgerHistory.sweep(); }
void tune(int size, int age) { mLedgerHistory.tune(size, age); }
void sweep() { mLedgerHistory.sweep(); }
float getCacheHitRate() { return mLedgerHistory.getCacheHitRate(); }
void addValidateCallback(callback& c) { mOnValidate.push_back(c); }

View File

@@ -285,6 +285,12 @@ bool LoadFeeTrack::raiseLocalFee()
return true;
}
bool LoadFeeTrack::isLoaded()
{
boost::mutex::scoped_lock sl(mLock);
return (raiseCount != 0) || (mLocalTxnLoadFee != lftNormalFee);
}
bool LoadFeeTrack::lowerLocalFee()
{
boost::mutex::scoped_lock sl(mLock);

View File

@@ -185,7 +185,7 @@ public:
void setRemoteFee(uint32);
bool raiseLocalFee();
bool lowerLocalFee();
bool isLoaded() { return (raiseCount != 0) || (mLocalTxnLoadFee != lftNormalFee); }
bool isLoaded();
};

View File

@@ -49,6 +49,7 @@ std::string NetworkOPs::strOperatingMode()
static const char* paStatusToken[] = {
"disconnected",
"connected",
"syncing",
"tracking",
"full"
};
@@ -615,6 +616,12 @@ void NetworkOPs::checkState(const boost::system::error_code& result)
cLog(lsINFO) << "Node count (" << peerList.size() << ") is sufficient.";
}
// Check if the last validated ledger forces a change between these states
if (mMode == omSYNCING)
setMode(omSYNCING);
else if (mMode == omCONNECTED)
setMode(omCONNECTED);
if (!mConsensus)
tryStartConsensus();
@@ -638,14 +645,14 @@ void NetworkOPs::tryStartConsensus()
// there shouldn't be a newer LCL. We need this information to do the next three
// tests.
if ((mMode == omCONNECTED) && !ledgerChange)
if (((mMode == omCONNECTED) || (mMode == omSYNCING)) && !ledgerChange)
{ // count number of peers that agree with us and UNL nodes whose validations we have for LCL
// if the ledger is good enough, go to omTRACKING - TODO
if (!mNeedNetworkLedger)
setMode(omTRACKING);
}
if ((mMode == omTRACKING) && !ledgerChange )
if (((mMode == omCONNECTED) || (mMode == omTRACKING)) && !ledgerChange)
{
// check if the ledger is good enough to go to omFULL
// Note: Do not go to omFULL if we don't have the previous ledger
@@ -654,12 +661,6 @@ void NetworkOPs::tryStartConsensus()
setMode(omFULL);
}
if (mMode == omFULL)
{
// WRITEME
// check if the ledger is bad enough to go to omTRACKING
}
if ((!mConsensus) && (mMode != omDISCONNECTED))
beginConsensus(networkClosed, mLedgerMaster->getCurrentLedger());
}
@@ -783,24 +784,7 @@ bool NetworkOPs::checkLastClosedLedger(const std::vector<Peer::pointer>& peerLis
return true;
}
if (!mAcquiringLedger->isComplete())
{ // add more peers
int count = 0;
BOOST_FOREACH(Peer::ref it, peerList)
{
if (it->getClosedLedgerHash() == closedLedger)
{
++count;
mAcquiringLedger->peerHas(it);
}
}
if (!count)
{ // just ask everyone
BOOST_FOREACH(Peer::ref it, peerList)
if (it->isConnected())
mAcquiringLedger->peerHas(it);
}
return true;
}
clearNeedNetworkLedger();
consensus = mAcquiringLedger->getLedger();
}
@@ -1053,7 +1037,21 @@ void NetworkOPs::pubServer()
void NetworkOPs::setMode(OperatingMode om)
{
if (mMode == om) return;
if (om == omCONNECTED)
{
if (theApp->getLedgerMaster().getValidatedLedgerAge() < 60)
om = omSYNCING;
}
if (om == omSYNCING)
{
if (theApp->getLedgerMaster().getValidatedLedgerAge() >= 60)
om = omCONNECTED;
}
if (mMode == om)
return;
if ((om >= omCONNECTED) && (mMode == omDISCONNECTED))
mConnectTime = boost::posix_time::second_clock::universal_time();
@@ -1415,7 +1413,7 @@ void NetworkOPs::pubLedger(Ledger::ref accepted)
jvObj["txn_count"] = Json::UInt(alpAccepted->getTxnCount());
if ((mMode == omFULL) || (mMode == omTRACKING))
if (mMode >= omSYNCING)
jvObj["validated_ledgers"] = theApp->getLedgerMaster().getCompleteLedgers();
NetworkOPs::subMapType::const_iterator it = mSubLedger.begin();
@@ -1748,7 +1746,7 @@ bool NetworkOPs::subLedger(InfoSub::ref isrListener, Json::Value& jvResult)
jvResult["reserve_inc"] = Json::UInt(lpClosed->getReserveInc());
}
if (((mMode == omFULL) || (mMode == omTRACKING)) && !isNeedNetworkLedger())
if ((mMode >= omSYNCING) && !isNeedNetworkLedger())
jvResult["validated_ledgers"] = theApp->getLedgerMaster().getCompleteLedgers();
boost::recursive_mutex::scoped_lock sl(mMonitorLock);
@@ -2004,9 +2002,21 @@ void NetworkOPs::getBookPage(Ledger::pointer lpLedger, const uint160& uTakerPays
// jvResult["nodes"] = Json::Value(Json::arrayValue);
}
void NetworkOPs::makeFetchPack(Job&, boost::weak_ptr<Peer> wPeer, boost::shared_ptr<ripple::TMGetObjectByHash> request,
Ledger::pointer wantLedger, Ledger::pointer haveLedger)
void NetworkOPs::makeFetchPack(Job&, boost::weak_ptr<Peer> wPeer,
boost::shared_ptr<ripple::TMGetObjectByHash> request,
Ledger::pointer wantLedger, Ledger::pointer haveLedger, uint32 uUptime)
{
if (upTime() > (uUptime + 1))
{
cLog(lsINFO) << "Fetch pack request got stale";
return;
}
if (theApp->getFeeTrack().isLoaded())
{
cLog(lsINFO) << "Too busy to make fetch pack";
return;
}
try
{
Peer::pointer peer = wPeer.lock();
@@ -2017,7 +2027,7 @@ void NetworkOPs::makeFetchPack(Job&, boost::weak_ptr<Peer> wPeer, boost::shared_
reply.set_query(false);
if (request->has_seq())
reply.set_seq(request->seq());
reply.set_ledgerhash(reply.ledgerhash());
reply.set_ledgerhash(request->ledgerhash());
reply.set_type(ripple::TMGetObjectByHash::otFETCH_PACK);
do
@@ -2042,7 +2052,7 @@ void NetworkOPs::makeFetchPack(Job&, boost::weak_ptr<Peer> wPeer, boost::shared_
newObj.set_ledgerseq(lSeq);
}
if (wantLedger->getAccountHash().isNonZero() && (pack.size() < 768))
if (wantLedger->getAccountHash().isNonZero() && (pack.size() < 512))
{
pack = wantLedger->peekTransactionMap()->getFetchPack(NULL, true, 256);
BOOST_FOREACH(SHAMap::fetchPackEntry_t& node, pack)
@@ -2053,11 +2063,11 @@ void NetworkOPs::makeFetchPack(Job&, boost::weak_ptr<Peer> wPeer, boost::shared_
newObj.set_ledgerseq(lSeq);
}
}
if (reply.objects().size() >= 512)
if (reply.objects().size() >= 256)
break;
haveLedger = wantLedger;
wantLedger = getLedgerByHash(haveLedger->getParentHash());
} while (wantLedger);
} while (wantLedger && (upTime() <= (uUptime + 1)));
cLog(lsINFO) << "Built fetch pack with " << reply.objects().size() << " nodes";
PackedMessage::pointer msg = boost::make_shared<PackedMessage>(reply, ripple::mtGET_OBJECTS);

View File

@@ -98,8 +98,9 @@ public:
{ // how we process transactions or account balance requests
omDISCONNECTED = 0, // not ready to process requests
omCONNECTED = 1, // convinced we are talking to the network
omTRACKING = 2, // convinced we agree with the network
omFULL = 3 // we have the ledger and can even validate
omSYNCING = 2, // fallen slightly behind
omTRACKING = 3, // convinced we agree with the network
omFULL = 4 // we have the ledger and can even validate
};
typedef boost::unordered_map<uint64, InfoSub::wptr> subMapType;
@@ -283,7 +284,7 @@ public:
void mapComplete(const uint256& hash, SHAMap::ref map);
bool stillNeedTXSet(const uint256& hash);
void makeFetchPack(Job&, boost::weak_ptr<Peer> peer, boost::shared_ptr<ripple::TMGetObjectByHash> request,
Ledger::pointer wantLedger, Ledger::pointer haveLedger);
Ledger::pointer wantLedger, Ledger::pointer haveLedger, uint32 uUptime);
bool shouldFetchPack(uint32 seq);
void gotFetchPack(bool progress, uint32 seq);
void addFetchPack(const uint256& hash, boost::shared_ptr< std::vector<unsigned char> >& data);

View File

@@ -15,9 +15,9 @@ public:
AccountItem::pointer makeItem(const uint160&, SerializedLedgerEntry::ref ledgerEntry);
LedgerEntryType getType(){ return(ltOFFER); }
STAmount getTakerPays(){ return(mTakerPays); }
STAmount getTakerGets(){ return(mTakerGets); }
RippleAddress getAccount(){ return(mAccount); }
const STAmount& getTakerPays(){ return(mTakerPays); }
const STAmount& getTakerGets(){ return(mTakerGets); }
const RippleAddress& getAccount(){ return(mAccount); }
int getSeq(){ return(mSeq); }
Json::Value getJson(int);

View File

@@ -438,7 +438,7 @@ void Peer::processReadBuffer()
LoadEvent::autoptr event(theApp->getJobQueue().getLoadEventAP(jtPEER, "Peer::read"));
boost::recursive_mutex::scoped_lock sl(theApp->getMasterLock());
ScopedLock sl(theApp->getMasterLock());
// If connected and get a mtHELLO or if not connected and get a non-mtHELLO, wrong message was sent.
if (mHelloed == (type == ripple::mtHELLO))
@@ -512,7 +512,7 @@ void Peer::processReadBuffer()
ripple::TMGetPeers msg;
if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
recvGetPeers(msg);
recvGetPeers(msg, sl);
else
cLog(lsWARNING) << "parse error: " << type;
}
@@ -568,7 +568,7 @@ void Peer::processReadBuffer()
event->reName("Peer::transaction");
ripple::TMTransaction msg;
if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
recvTransaction(msg);
recvTransaction(msg, sl);
else
cLog(lsWARNING) << "parse error: " << type;
}
@@ -601,7 +601,7 @@ void Peer::processReadBuffer()
event->reName("Peer::getledger");
ripple::TMGetLedger msg;
if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
recvGetLedger(msg);
recvGetLedger(msg, sl);
else
cLog(lsWARNING) << "parse error: " << type;
}
@@ -612,7 +612,7 @@ void Peer::processReadBuffer()
event->reName("Peer::ledgerdata");
boost::shared_ptr<ripple::TMLedgerData> msg = boost::make_shared<ripple::TMLedgerData>();
if (msg->ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
recvLedger(msg);
recvLedger(msg, sl);
else
cLog(lsWARNING) << "parse error: " << type;
}
@@ -634,7 +634,7 @@ void Peer::processReadBuffer()
event->reName("Peer::validation");
boost::shared_ptr<ripple::TMValidation> msg = boost::make_shared<ripple::TMValidation>();
if (msg->ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
recvValidation(msg);
recvValidation(msg, sl);
else
cLog(lsWARNING) << "parse error: " << type;
}
@@ -864,16 +864,15 @@ static void checkTransaction(Job&, int flags, SerializedTransaction::pointer stx
#endif
}
void Peer::recvTransaction(ripple::TMTransaction& packet)
void Peer::recvTransaction(ripple::TMTransaction& packet, ScopedLock& MasterLockHolder)
{
MasterLockHolder.unlock();
Transaction::pointer tx;
#ifndef TRUST_NETWORK
try
{
#endif
std::string rawTx = packet.rawtransaction();
Serializer s(rawTx);
Serializer s(packet.rawtransaction());
SerializerIterator sit(s);
SerializedTransaction::pointer stx = boost::make_shared<SerializedTransaction>(boost::ref(sit));
@@ -1085,8 +1084,9 @@ static void checkValidation(Job&, SerializedValidation::pointer val, uint256 sig
#endif
}
void Peer::recvValidation(const boost::shared_ptr<ripple::TMValidation>& packet)
void Peer::recvValidation(const boost::shared_ptr<ripple::TMValidation>& packet, ScopedLock& MasterLockHolder)
{
MasterLockHolder.unlock();
if (packet->validation().size() < 50)
{
cLog(lsWARNING) << "Too small validation from peer";
@@ -1138,8 +1138,9 @@ void Peer::recvGetContacts(ripple::TMGetContacts& packet)
// Return a list of your favorite people
// TODO: filter out all the LAN peers
// TODO: filter out the peer you are talking to
void Peer::recvGetPeers(ripple::TMGetPeers& packet)
void Peer::recvGetPeers(ripple::TMGetPeers& packet, ScopedLock& MasterLockHolder)
{
MasterLockHolder.unlock();
std::vector<std::string> addrs;
theApp->getConnectionPool().getTopNAddrs(30, addrs);
@@ -1417,7 +1418,7 @@ void Peer::recvStatus(ripple::TMStatusChange& packet)
mMaxLedger = packet.lastseq();
}
void Peer::recvGetLedger(ripple::TMGetLedger& packet)
void Peer::recvGetLedger(ripple::TMGetLedger& packet, ScopedLock& MasterLockHolder)
{
SHAMap::pointer map;
ripple::TMLedgerData reply;
@@ -1545,6 +1546,13 @@ void Peer::recvGetLedger(ripple::TMGetLedger& packet)
return;
}
if (ledger->isImmutable())
MasterLockHolder.unlock();
else
{
cLog(lsWARNING) << "Request for data from mutable ledger";
}
// Fill out the reply
uint256 lHash = ledger->getHash();
reply.set_ledgerhash(lHash.begin(), lHash.size());
@@ -1608,7 +1616,7 @@ void Peer::recvGetLedger(ripple::TMGetLedger& packet)
SHAMapNode mn(packet.nodeids(i).data(), packet.nodeids(i).size());
if(!mn.isValid())
{
cLog(lsWARNING) << "Request for invalid node";
cLog(lsWARNING) << "Request for invalid node: " << logMe;
punishPeer(LT_InvalidRequest);
return;
}
@@ -1657,8 +1665,9 @@ void Peer::recvGetLedger(ripple::TMGetLedger& packet)
sendPacket(oPacket, true);
}
void Peer::recvLedger(const boost::shared_ptr<ripple::TMLedgerData>& packet_ptr)
void Peer::recvLedger(const boost::shared_ptr<ripple::TMLedgerData>& packet_ptr, ScopedLock& MasterLockHolder)
{
MasterLockHolder.unlock();
ripple::TMLedgerData& packet = *packet_ptr;
if (packet.nodes().size() <= 0)
{
@@ -1911,7 +1920,7 @@ void Peer::doFetchPack(const boost::shared_ptr<ripple::TMGetObjectByHash>& packe
}
theApp->getJobQueue().addJob(jtPACK, "MakeFetchPack",
BIND_TYPE(&NetworkOPs::makeFetchPack, &theApp->getOPs(), P_1,
boost::weak_ptr<Peer>(shared_from_this()), packet, wantLedger, haveLedger));
boost::weak_ptr<Peer>(shared_from_this()), packet, wantLedger, haveLedger, upTime()));
}
bool Peer::hasProto(int version)

View File

@@ -87,12 +87,12 @@ protected:
void sendHello();
void recvHello(ripple::TMHello& packet);
void recvTransaction(ripple::TMTransaction& packet);
void recvValidation(const boost::shared_ptr<ripple::TMValidation>& packet);
void recvTransaction(ripple::TMTransaction& packet, ScopedLock& MasterLockHolder);
void recvValidation(const boost::shared_ptr<ripple::TMValidation>& packet, ScopedLock& MasterLockHolder);
void recvGetValidation(ripple::TMGetValidations& packet);
void recvContact(ripple::TMContact& packet);
void recvGetContacts(ripple::TMGetContacts& packet);
void recvGetPeers(ripple::TMGetPeers& packet);
void recvGetPeers(ripple::TMGetPeers& packet, ScopedLock& MasterLockHolder);
void recvPeers(ripple::TMPeers& packet);
void recvGetObjectByHash(const boost::shared_ptr<ripple::TMGetObjectByHash>& packet);
void recvPing(ripple::TMPing& packet);
@@ -100,8 +100,8 @@ protected:
void recvSearchTransaction(ripple::TMSearchTransaction& packet);
void recvGetAccount(ripple::TMGetAccount& packet);
void recvAccount(ripple::TMAccount& packet);
void recvGetLedger(ripple::TMGetLedger& packet);
void recvLedger(const boost::shared_ptr<ripple::TMLedgerData>& packet);
void recvGetLedger(ripple::TMGetLedger& packet, ScopedLock& MasterLockHolder);
void recvLedger(const boost::shared_ptr<ripple::TMLedgerData>& packet, ScopedLock& MasterLockHolder);
void recvStatus(ripple::TMStatusChange& packet);
void recvPropose(const boost::shared_ptr<ripple::TMProposeSet>& packet);
void recvHaveTxSet(ripple::TMHaveTransactionSet& packet);

View File

@@ -1071,9 +1071,6 @@ Json::Value RPCHandler::doAccountLines(Json::Value jvRequest, int& cost, ScopedL
{
jvResult["account"] = raAccount.humanAccountID();
// XXX This is wrong, we do access the current ledger and do need to worry about changes.
// We access a committed ledger and need not worry about changes.
AccountItems rippleLines(raAccount.getAccountID(), lpLedger, AccountItem::pointer(new RippleState()));
Json::Value& jsonLines = (jvResult["lines"] = Json::arrayValue);
@@ -1083,9 +1080,9 @@ Json::Value RPCHandler::doAccountLines(Json::Value jvRequest, int& cost, ScopedL
if (!raPeer.isValid() || raPeer.getAccountID() == line->getAccountIDPeer())
{
STAmount saBalance = line->getBalance();
STAmount saLimit = line->getLimit();
STAmount saLimitPeer = line->getLimitPeer();
const STAmount& saBalance = line->getBalance();
const STAmount& saLimit = line->getLimit();
const STAmount& saLimitPeer = line->getLimitPeer();
Json::Value& jPeer = jsonLines.append(Json::objectValue);
@@ -1148,9 +1145,6 @@ Json::Value RPCHandler::doAccountOffers(Json::Value jvRequest, int& cost, Scoped
AccountState::pointer as = mNetOps->getAccountState(lpLedger, raAccount);
if (lpLedger->isImmutable())
MasterLockHolder.unlock();
if (as)
{
Json::Value& jsonLines = (jvResult["offers"] = Json::arrayValue);
@@ -1160,15 +1154,10 @@ Json::Value RPCHandler::doAccountOffers(Json::Value jvRequest, int& cost, Scoped
{
Offer* offer=(Offer*)item.get();
STAmount takerPays = offer->getTakerPays();
STAmount takerGets = offer->getTakerGets();
//RippleAddress account = offer->getAccount();
Json::Value& obj = jsonLines.append(Json::objectValue);
//obj["account"] = account.humanAccountID();
takerPays.setJson(obj["taker_pays"]);
takerGets.setJson(obj["taker_gets"]);
offer->getTakerPays().setJson(obj["taker_pays"]);
offer->getTakerGets().setJson(obj["taker_gets"]);
obj["seq"] = offer->getSeq();
}
@@ -2315,6 +2304,11 @@ Json::Value RPCHandler::doGetCounts(Json::Value jvRequest, int& cost, ScopedLock
ret["write_load"] = theApp->getHashedObjectStore().getWriteLoad();
ret["SLE_hit_rate"] = theApp->getSLECache().getHitRate();
ret["node_hit_rate"] = theApp->getHashedObjectStore().getCacheHitRate();
ret["ledger_hit_rate"] = theApp->getLedgerMaster().getCacheHitRate();
ret["AL_hit_rate"] = AcceptedLedger::getCacheHitRate();
std::string uptime;
int s = upTime();
textTime(uptime, s, "year", 365*24*60*60);
@@ -2618,16 +2612,19 @@ Json::Value RPCHandler::lookupLedger(Json::Value jvRequest, Ledger::pointer& lpL
case LEDGER_CURRENT:
lpLedger = mNetOps->getCurrentSnapshot();
iLedgerIndex = lpLedger->getLedgerSeq();
assert(lpLedger->isImmutable() && !lpLedger->isClosed());
break;
case LEDGER_CLOSED:
lpLedger = theApp->getLedgerMaster().getClosedLedger();
iLedgerIndex = lpLedger->getLedgerSeq();
assert(lpLedger->isImmutable() && lpLedger->isClosed());
break;
case LEDGER_VALIDATED:
lpLedger = mNetOps->getValidatedLedger();
iLedgerIndex = lpLedger->getLedgerSeq();
assert(lpLedger->isImmutable() && lpLedger->isClosed());
break;
}
@@ -3548,9 +3545,7 @@ Json::Value RPCHandler::doCommand(const Json::Value& jvRequest, int iRole, int &
ScopedLock MasterLockHolder(theApp->getMasterLock());
if (commandsA[i].iOptions & optNetwork
&& mNetOps->getOperatingMode() != NetworkOPs::omTRACKING
&& mNetOps->getOperatingMode() != NetworkOPs::omFULL)
if ((commandsA[i].iOptions & optNetwork) && (mNetOps->getOperatingMode() < NetworkOPs::omSYNCING))
{
cLog(lsINFO) << "Insufficient network mode for RPC: " << mNetOps->strOperatingMode();

View File

@@ -35,11 +35,6 @@ RippleAddress::RippleAddress() : mIsValid(false)
nVersion = VER_NONE;
}
bool RippleAddress::isValid() const
{
return mIsValid;
}
void RippleAddress::clear()
{
nVersion = VER_NONE;

View File

@@ -28,7 +28,7 @@ public:
RippleAddress();
// For public and private key, checks if they are legal.
bool isValid() const;
bool isValid() const { return mIsValid; }
void clear();
bool isSet() const;

View File

@@ -377,7 +377,8 @@ protected:
SHAMapItem::pointer onlyBelow(SHAMapTreeNode*);
void eraseChildren(SHAMapTreeNode::pointer);
void dropBelow(SHAMapTreeNode*);
bool hasNode(const SHAMapNode& id, const uint256& hash);
bool hasInnerNode(const SHAMapNode& nodeID, const uint256& hash);
bool hasLeafNode(const uint256& tag, const uint256& hash);
bool walkBranch(SHAMapTreeNode* node, SHAMapItem::ref otherMapItem, bool isFirstMap,
SHAMapDiff& differences, int& maxCount);

View File

@@ -410,7 +410,7 @@ bool SHAMap::deepCompare(SHAMap& other)
return true;
}
bool SHAMap::hasNode(const SHAMapNode& nodeID, const uint256& nodeHash)
bool SHAMap::hasInnerNode(const SHAMapNode& nodeID, const uint256& nodeHash)
{
SHAMapTreeNode* node = root.get();
while (node->isInner() && (node->getDepth() < nodeID.getDepth()))
@@ -423,6 +423,19 @@ bool SHAMap::hasNode(const SHAMapNode& nodeID, const uint256& nodeHash)
return node->getNodeHash() == nodeHash;
}
bool SHAMap::hasLeafNode(const uint256& tag, const uint256& nodeHash)
{
SHAMapTreeNode* node = root.get();
while (node->isInner())
{
int branch = node->selectBranch(tag);
if (node->isEmptyBranch(branch))
return false;
node = getNodePointer(node->getChildNodeID(branch), node->getChildHash(branch));
}
return node->getNodeHash() == nodeHash;
}
std::list<SHAMap::fetchPackEntry_t> SHAMap::getFetchPack(SHAMap* have, bool includeLeaves, int max)
{
std::list<fetchPackEntry_t> ret;
@@ -446,7 +459,7 @@ std::list<SHAMap::fetchPackEntry_t> SHAMap::getFetchPack(SHAMap* have, bool incl
if (root->isLeaf())
{
if (includeLeaves && !root->getNodeHash().isZero() &&
(!have || !have->hasNode(*root, root->getNodeHash())))
(!have || !have->hasLeafNode(root->getTag(), root->getNodeHash())))
{
Serializer s;
root->addRaw(s, snfPREFIX);
@@ -486,10 +499,10 @@ std::list<SHAMap::fetchPackEntry_t> SHAMap::getFetchPack(SHAMap* have, bool incl
SHAMapTreeNode *next = getNodePointer(childID, childHash);
if (next->isInner())
{
if (!have || !have->hasNode(*next, childHash))
if (!have || !have->hasInnerNode(*next, childHash))
stack.push(next);
}
else if (includeLeaves && (!have || !have->hasNode(childID, childHash)))
else if (includeLeaves && (!have || !have->hasLeafNode(next->getTag(), childHash)))
{
Serializer s;
node->addRaw(s, snfPREFIX);

View File

@@ -1,5 +1,5 @@
#ifndef __SHAMAPYSNC__
#define __SHAMAPSYNC__
#ifndef SHAMAPSYNC_H
#define SHAMAPSYNC_H
#include "SHAMap.h"
#include "Application.h"

View File

@@ -639,38 +639,40 @@ std::vector<unsigned char> STObject::getFieldVL(SField::ref field) const
return cf->getValue();
}
static const STAmount defaultAmount;
const STAmount& STObject::getFieldAmount(SField::ref field) const
{
static STAmount empty;
const SerializedType* rf = peekAtPField(field);
if (!rf)
throw std::runtime_error("Field not found");
SerializedTypeID id = rf->getSType();
if (id == STI_NOTPRESENT)
return defaultAmount; // optional field not present
return empty; // optional field not present
const STAmount *cf = dynamic_cast<const STAmount *>(rf);
if (!cf)
throw std::runtime_error("Wrong field type");
return *cf;
}
STPathSet STObject::getFieldPathSet(SField::ref field) const
const STPathSet& STObject::getFieldPathSet(SField::ref field) const
{
static STPathSet empty;
const SerializedType* rf = peekAtPField(field);
if (!rf) throw std::runtime_error("Field not found");
SerializedTypeID id = rf->getSType();
if (id == STI_NOTPRESENT) return STPathSet(); // optional field not present
if (id == STI_NOTPRESENT) return empty; // optional field not present
const STPathSet *cf = dynamic_cast<const STPathSet *>(rf);
if (!cf) throw std::runtime_error("Wrong field type");
return *cf;
}
STVector256 STObject::getFieldV256(SField::ref field) const
const STVector256& STObject::getFieldV256(SField::ref field) const
{
static STVector256 empty;
const SerializedType* rf = peekAtPField(field);
if (!rf) throw std::runtime_error("Field not found");
SerializedTypeID id = rf->getSType();
if (id == STI_NOTPRESENT) return STVector256(); // optional field not present
if (id == STI_NOTPRESENT) return empty; // optional field not present
const STVector256 *cf = dynamic_cast<const STVector256 *>(rf);
if (!cf) throw std::runtime_error("Wrong field type");
return *cf;

View File

@@ -131,8 +131,8 @@ public:
uint160 getFieldAccount160(SField::ref field) const;
std::vector<unsigned char> getFieldVL(SField::ref field) const;
const STAmount& getFieldAmount(SField::ref field) const;
STPathSet getFieldPathSet(SField::ref field) const;
STVector256 getFieldV256(SField::ref field) const;
const STPathSet& getFieldPathSet(SField::ref field) const;
const STVector256& getFieldV256(SField::ref field) const;
void setFieldU8(SField::ref field, unsigned char);
void setFieldU16(SField::ref field, uint16);

View File

@@ -63,16 +63,21 @@ protected:
cache_type mCache; // Hold strong reference to recent objects
int mLastSweep;
uint64 mHits, mMisses;
public:
TaggedCache(const char *name, int size, int age)
: mName(name), mTargetSize(size), mTargetAge(age), mCacheCount(0), mLastSweep(upTime()) { ; }
: mName(name), mTargetSize(size), mTargetAge(age), mCacheCount(0), mLastSweep(upTime()),
mHits(0), mMisses(0)
{ ; }
int getTargetSize() const;
int getTargetAge() const;
int getCacheSize();
int getTrackSize();
int getSweepAge();
float getHitRate();
void clearStats();
void setTargetSize(int size);
void setTargetAge(int age);
@@ -129,6 +134,19 @@ template<typename c_Key, typename c_Data> int TaggedCache<c_Key, c_Data>::getTra
return mCache.size();
}
template<typename c_Key, typename c_Data> float TaggedCache<c_Key, c_Data>::getHitRate()
{
boost::recursive_mutex::scoped_lock sl(mLock);
return (static_cast<float>(mHits) * 100) / (1.0f + mHits + mMisses);
}
template<typename c_Key, typename c_Data> void TaggedCache<c_Key, c_Data>::clearStats()
{
boost::recursive_mutex::scoped_lock sl(mLock);
mHits = 0;
mMisses = 0;
}
template<typename c_Key, typename c_Data> void TaggedCache<c_Key, c_Data>::clear()
{
boost::recursive_mutex::scoped_lock sl(mLock);
@@ -302,20 +320,27 @@ boost::shared_ptr<c_Data> TaggedCache<c_Key, c_Data>::fetch(const key_type& key)
cache_iterator cit = mCache.find(key);
if (cit == mCache.end())
{
++mMisses;
return data_ptr();
}
cache_entry& entry = cit->second;
entry.touch();
if (entry.isCached())
{
++mHits;
return entry.ptr;
}
entry.ptr = entry.lock();
if (entry.isCached())
{
{ // independent of cache size, so not counted as a hit
++mCacheCount;
return entry.ptr;
}
mCache.erase(cit);
++mMisses;
return data_ptr();
}

View File

@@ -70,7 +70,7 @@ void printHelp(const po::options_description& desc)
cerr << desc << endl;
cerr << "Commands: " << endl;
cerr << " account_info <account>|<nickname>|<seed>|<pass_phrase>|<key> [<ledger>]" << endl;
cerr << " account_info <account>|<nickname>|<seed>|<pass_phrase>|<key> [<ledger>] [strict]" << endl;
cerr << " account_lines <account> <account>|\"\" [<ledger>]" << endl;
cerr << " account_offers <account>|<nickname>|<account_public_key> [<ledger>]" << endl;
cerr << " account_tx accountID [ledger_min [ledger_max [limit [offset]]]] [binary] [count] [descending]" << endl;