Merge branch 'syncing'

This commit is contained in:
JoelKatz
2013-05-24 09:41:37 -07:00
10 changed files with 72 additions and 35 deletions

View File

@@ -86,14 +86,14 @@ bool HashedObjectStore::storeLevelDB(HashedObjectType type, uint32 index,
{
mWritePending = true;
theApp->getJobQueue().addJob(jtWRITE, "HashedObject::store",
BIND_TYPE(&HashedObjectStore::bulkWriteLevelDB, this));
BIND_TYPE(&HashedObjectStore::bulkWriteLevelDB, this, P_1));
}
}
mNegativeCache.del(hash);
return true;
}
void HashedObjectStore::bulkWriteLevelDB()
void HashedObjectStore::bulkWriteLevelDB(Job &)
{
assert(mLevelDB);
int setSize = 0;
@@ -205,7 +205,7 @@ bool HashedObjectStore::storeSQLite(HashedObjectType type, uint32 index,
{
mWritePending = true;
theApp->getJobQueue().addJob(jtWRITE, "HashedObject::store",
BIND_TYPE(&HashedObjectStore::bulkWriteSQLite, this));
BIND_TYPE(&HashedObjectStore::bulkWriteSQLite, this, P_1));
}
}
// else
@@ -214,7 +214,7 @@ bool HashedObjectStore::storeSQLite(HashedObjectType type, uint32 index,
return true;
}
void HashedObjectStore::bulkWriteSQLite()
void HashedObjectStore::bulkWriteSQLite(Job&)
{
assert(!mLevelDB);
while (1)

View File

@@ -15,6 +15,8 @@
DEFINE_INSTANCE(HashedObject);
class Job;
enum HashedObjectType
{
hotUNKNOWN = 0,
@@ -91,13 +93,13 @@ public:
bool storeSQLite(HashedObjectType type, uint32 index, const std::vector<unsigned char>& data,
const uint256& hash);
HashedObject::pointer retrieveSQLite(const uint256& hash);
void bulkWriteSQLite();
void bulkWriteSQLite(Job&);
#ifdef USE_LEVELDB
bool storeLevelDB(HashedObjectType type, uint32 index, const std::vector<unsigned char>& data,
const uint256& hash);
HashedObject::pointer retrieveLevelDB(const uint256& hash);
void bulkWriteLevelDB();
void bulkWriteLevelDB(Job&);
#endif

View File

@@ -26,6 +26,22 @@ Ledger::ref LedgerMaster::getCurrentSnapshot()
return mCurrentSnapshot;
}
int LedgerMaster::getValidatedLedgerAge()
{
if (!mValidLedger)
{
cLog(lsDEBUG) << "No validated ledger";
return 999999;
}
int64 ret = theApp->getOPs().getCloseTimeNC();
ret -= static_cast<int64>(mValidLedger->getCloseTimeNC());
ret = std::max(0LL, ret);
cLog(lsTRACE) << "Validated ledger age is " << ret;
return static_cast<int>(ret);
}
void LedgerMaster::addHeldTransaction(Transaction::ref transaction)
{ // returns true if transaction was added
boost::recursive_mutex::scoped_lock ml(mLock);

View File

@@ -83,6 +83,7 @@ public:
// The published ledger is the last fully validated ledger
Ledger::ref getValidatedLedger() { return mPubLedger; }
int getValidatedLedgerAge();
TER doTransaction(SerializedTransaction::ref txn, TransactionEngineParams params, bool& didApply);

View File

@@ -49,6 +49,7 @@ std::string NetworkOPs::strOperatingMode()
static const char* paStatusToken[] = {
"disconnected",
"connected",
"syncing",
"tracking",
"full"
};
@@ -615,6 +616,12 @@ void NetworkOPs::checkState(const boost::system::error_code& result)
cLog(lsINFO) << "Node count (" << peerList.size() << ") is sufficient.";
}
// Check if the last validated ledger forces a change between these states
if (mMode == omSYNCING)
setMode(omSYNCING);
else if (mMode == omCONNECTED)
setMode(omCONNECTED);
if (!mConsensus)
tryStartConsensus();
@@ -638,14 +645,14 @@ void NetworkOPs::tryStartConsensus()
// there shouldn't be a newer LCL. We need this information to do the next three
// tests.
if ((mMode == omCONNECTED) && !ledgerChange)
if (((mMode == omCONNECTED) || (mMode == omSYNCING)) && !ledgerChange)
{ // count number of peers that agree with us and UNL nodes whose validations we have for LCL
// if the ledger is good enough, go to omTRACKING - TODO
if (!mNeedNetworkLedger)
setMode(omTRACKING);
}
if ((mMode == omTRACKING) && !ledgerChange )
if (((mMode == omCONNECTED) || (mMode == omTRACKING)) && !ledgerChange)
{
// check if the ledger is good enough to go to omFULL
// Note: Do not go to omFULL if we don't have the previous ledger
@@ -654,12 +661,6 @@ void NetworkOPs::tryStartConsensus()
setMode(omFULL);
}
if (mMode == omFULL)
{
// WRITEME
// check if the ledger is bad enough to go to omTRACKING
}
if ((!mConsensus) && (mMode != omDISCONNECTED))
beginConsensus(networkClosed, mLedgerMaster->getCurrentLedger());
}
@@ -1036,7 +1037,21 @@ void NetworkOPs::pubServer()
void NetworkOPs::setMode(OperatingMode om)
{
if (mMode == om) return;
if (om == omCONNECTED)
{
if (theApp->getLedgerMaster().getValidatedLedgerAge() < 60)
om = omSYNCING;
}
if (om == omSYNCING)
{
if (theApp->getLedgerMaster().getValidatedLedgerAge() >= 60)
om = omCONNECTED;
}
if (mMode == om)
return;
if ((om >= omCONNECTED) && (mMode == omDISCONNECTED))
mConnectTime = boost::posix_time::second_clock::universal_time();
@@ -1398,7 +1413,7 @@ void NetworkOPs::pubLedger(Ledger::ref accepted)
jvObj["txn_count"] = Json::UInt(alpAccepted->getTxnCount());
if ((mMode == omFULL) || (mMode == omTRACKING))
if (mMode >= omSYNCING)
jvObj["validated_ledgers"] = theApp->getLedgerMaster().getCompleteLedgers();
NetworkOPs::subMapType::const_iterator it = mSubLedger.begin();
@@ -1731,7 +1746,7 @@ bool NetworkOPs::subLedger(InfoSub::ref isrListener, Json::Value& jvResult)
jvResult["reserve_inc"] = Json::UInt(lpClosed->getReserveInc());
}
if (((mMode == omFULL) || (mMode == omTRACKING)) && !isNeedNetworkLedger())
if ((mMode >= omSYNCING) && !isNeedNetworkLedger())
jvResult["validated_ledgers"] = theApp->getLedgerMaster().getCompleteLedgers();
boost::recursive_mutex::scoped_lock sl(mMonitorLock);

View File

@@ -98,8 +98,9 @@ public:
{ // how we process transactions or account balance requests
omDISCONNECTED = 0, // not ready to process requests
omCONNECTED = 1, // convinced we are talking to the network
omTRACKING = 2, // convinced we agree with the network
omFULL = 3 // we have the ledger and can even validate
omSYNCING = 2, // fallen slightly behind
omTRACKING = 3, // convinced we agree with the network
omFULL = 4 // we have the ledger and can even validate
};
typedef boost::unordered_map<uint64, InfoSub::wptr> subMapType;

View File

@@ -512,7 +512,7 @@ void Peer::processReadBuffer()
ripple::TMGetPeers msg;
if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
recvGetPeers(msg);
recvGetPeers(msg, sl);
else
cLog(lsWARNING) << "parse error: " << type;
}
@@ -568,7 +568,7 @@ void Peer::processReadBuffer()
event->reName("Peer::transaction");
ripple::TMTransaction msg;
if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
recvTransaction(msg);
recvTransaction(msg, sl);
else
cLog(lsWARNING) << "parse error: " << type;
}
@@ -634,7 +634,7 @@ void Peer::processReadBuffer()
event->reName("Peer::validation");
boost::shared_ptr<ripple::TMValidation> msg = boost::make_shared<ripple::TMValidation>();
if (msg->ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
recvValidation(msg);
recvValidation(msg, sl);
else
cLog(lsWARNING) << "parse error: " << type;
}
@@ -864,15 +864,15 @@ static void checkTransaction(Job&, int flags, SerializedTransaction::pointer stx
#endif
}
void Peer::recvTransaction(ripple::TMTransaction& packet)
void Peer::recvTransaction(ripple::TMTransaction& packet, ScopedLock& MasterLockHolder)
{
MasterLockHolder.unlock();
Transaction::pointer tx;
#ifndef TRUST_NETWORK
try
{
#endif
std::string rawTx = packet.rawtransaction();
Serializer s(rawTx);
Serializer s(packet.rawtransaction());
SerializerIterator sit(s);
SerializedTransaction::pointer stx = boost::make_shared<SerializedTransaction>(boost::ref(sit));
@@ -1084,8 +1084,9 @@ static void checkValidation(Job&, SerializedValidation::pointer val, uint256 sig
#endif
}
void Peer::recvValidation(const boost::shared_ptr<ripple::TMValidation>& packet)
void Peer::recvValidation(const boost::shared_ptr<ripple::TMValidation>& packet, ScopedLock& MasterLockHolder)
{
MasterLockHolder.unlock();
if (packet->validation().size() < 50)
{
cLog(lsWARNING) << "Too small validation from peer";
@@ -1137,8 +1138,9 @@ void Peer::recvGetContacts(ripple::TMGetContacts& packet)
// Return a list of your favorite people
// TODO: filter out all the LAN peers
// TODO: filter out the peer you are talking to
void Peer::recvGetPeers(ripple::TMGetPeers& packet)
void Peer::recvGetPeers(ripple::TMGetPeers& packet, ScopedLock& MasterLockHolder)
{
MasterLockHolder.unlock();
std::vector<std::string> addrs;
theApp->getConnectionPool().getTopNAddrs(30, addrs);

View File

@@ -87,12 +87,12 @@ protected:
void sendHello();
void recvHello(ripple::TMHello& packet);
void recvTransaction(ripple::TMTransaction& packet);
void recvValidation(const boost::shared_ptr<ripple::TMValidation>& packet);
void recvTransaction(ripple::TMTransaction& packet, ScopedLock& MasterLockHolder);
void recvValidation(const boost::shared_ptr<ripple::TMValidation>& packet, ScopedLock& MasterLockHolder);
void recvGetValidation(ripple::TMGetValidations& packet);
void recvContact(ripple::TMContact& packet);
void recvGetContacts(ripple::TMGetContacts& packet);
void recvGetPeers(ripple::TMGetPeers& packet);
void recvGetPeers(ripple::TMGetPeers& packet, ScopedLock& MasterLockHolder);
void recvPeers(ripple::TMPeers& packet);
void recvGetObjectByHash(const boost::shared_ptr<ripple::TMGetObjectByHash>& packet);
void recvPing(ripple::TMPing& packet);

View File

@@ -2618,11 +2618,13 @@ Json::Value RPCHandler::lookupLedger(Json::Value jvRequest, Ledger::pointer& lpL
case LEDGER_CLOSED:
lpLedger = theApp->getLedgerMaster().getClosedLedger();
iLedgerIndex = lpLedger->getLedgerSeq();
assert(lpLedger->isImmutable() && lpLedger->isClosed());
break;
case LEDGER_VALIDATED:
lpLedger = mNetOps->getValidatedLedger();
iLedgerIndex = lpLedger->getLedgerSeq();
assert(lpLedger->isImmutable() && lpLedger->isClosed());
break;
}
@@ -3543,9 +3545,7 @@ Json::Value RPCHandler::doCommand(const Json::Value& jvRequest, int iRole, int &
ScopedLock MasterLockHolder(theApp->getMasterLock());
if (commandsA[i].iOptions & optNetwork
&& mNetOps->getOperatingMode() != NetworkOPs::omTRACKING
&& mNetOps->getOperatingMode() != NetworkOPs::omFULL)
if ((commandsA[i].iOptions & optNetwork) && (mNetOps->getOperatingMode() < NetworkOPs::omSYNCING))
{
cLog(lsINFO) << "Insufficient network mode for RPC: " << mNetOps->strOperatingMode();

View File

@@ -1,5 +1,5 @@
#ifndef __SHAMAPYSNC__
#define __SHAMAPSYNC__
#ifndef SHAMAPSYNC_H
#define SHAMAPSYNC_H
#include "SHAMap.h"
#include "Application.h"