Start adding support for concurrent I/O.

This commit is contained in:
JoelKatz
2012-11-09 14:14:47 -08:00
parent 82d26ea756
commit 40527cca2b
5 changed files with 15 additions and 2 deletions

View File

@@ -43,6 +43,8 @@ class Application
boost::asio::io_service mIOService, mAuxService; boost::asio::io_service mIOService, mAuxService;
boost::asio::io_service::work mIOWork, mAuxWork; boost::asio::io_service::work mIOWork, mAuxWork;
boost::recursive_mutex mMasterLock;
Wallet mWallet; Wallet mWallet;
UniqueNodeList mUNL; UniqueNodeList mUNL;
LedgerMaster mMasterLedger; LedgerMaster mMasterLedger;
@@ -99,6 +101,7 @@ public:
JobQueue& getJobQueue() { return mJobQueue; } JobQueue& getJobQueue() { return mJobQueue; }
SuppressionTable& getSuppression() { return mSuppressions; } SuppressionTable& getSuppression() { return mSuppressions; }
RPCHandler& getRPCHandler() { return mRPCHandler; } RPCHandler& getRPCHandler() { return mRPCHandler; }
boost::recursive_mutex& getMasterLock() { return mMasterLock; }
bool isNew(const uint256& s) { return mSuppressions.addSuppression(s); } bool isNew(const uint256& s) { return mSuppressions.addSuppression(s); }

View File

@@ -1141,6 +1141,7 @@ uint32 LedgerConsensus::roundCloseTime(uint32 closeTime)
void LedgerConsensus::accept(SHAMap::ref set) void LedgerConsensus::accept(SHAMap::ref set)
{ {
boost::recursive_mutex::scoped_lock masterLock(theApp->getMasterLock());
assert(set->getHash() == mOurPosition->getCurrentHash()); assert(set->getHash() == mOurPosition->getCurrentHash());
uint32 closeTime = roundCloseTime(mOurPosition->getCloseTime()); uint32 closeTime = roundCloseTime(mOurPosition->getCloseTime());

View File

@@ -161,7 +161,6 @@ Transaction::pointer NetworkOPs::submitTransactionSync(const Transaction::pointe
Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans, stCallback callback) Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans, stCallback callback)
{ {
Transaction::pointer dbtx = theApp->getMasterTransaction().fetch(trans->getID(), true);
int newFlags = theApp->getSuppression().getFlags(trans->getID()); int newFlags = theApp->getSuppression().getFlags(trans->getID());
if ((newFlags & SF_BAD) != 0) if ((newFlags & SF_BAD) != 0)
@@ -182,6 +181,8 @@ Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans,
theApp->isNewFlag(trans->getID(), SF_SIGGOOD); theApp->isNewFlag(trans->getID(), SF_SIGGOOD);
} }
boost::recursive_mutex::scoped_lock sl(theApp->getMasterLock());
Transaction::pointer dbtx = theApp->getMasterTransaction().fetch(trans->getID(), true);
TER r = mLedgerMaster->doTransaction(*trans->getSTransaction(), tapOPEN_LEDGER | tapNO_CHECK_SIGN); TER r = mLedgerMaster->doTransaction(*trans->getSTransaction(), tapOPEN_LEDGER | tapNO_CHECK_SIGN);
trans->setResult(r); trans->setResult(r);
@@ -762,6 +763,8 @@ uint256 NetworkOPs::getConsensusLCL()
void NetworkOPs::processTrustedProposal(LedgerProposal::pointer proposal, void NetworkOPs::processTrustedProposal(LedgerProposal::pointer proposal,
boost::shared_ptr<ripple::TMProposeSet> set, RippleAddress nodePublic, uint256 checkLedger, bool sigGood) boost::shared_ptr<ripple::TMProposeSet> set, RippleAddress nodePublic, uint256 checkLedger, bool sigGood)
{ {
boost::recursive_mutex::scoped_lock sl(theApp->getMasterLock());
bool relay = true; bool relay = true;
if (!haveConsensusObject()) if (!haveConsensusObject())

View File

@@ -41,6 +41,8 @@ void Peer::handle_write(const boost::system::error_code& error, size_t bytes_tra
// std::cerr << "Peer::handle_write bytes: "<< bytes_transferred << std::endl; // std::cerr << "Peer::handle_write bytes: "<< bytes_transferred << std::endl;
#endif #endif
boost::recursive_mutex::scoped_lock sl(theApp->getMasterLock());
mSendingPacket.reset(); mSendingPacket.reset();
if (mDetaching) if (mDetaching)
@@ -357,6 +359,7 @@ void Peer::handle_read_body(const boost::system::error_code& error)
else else
{ {
cLog(lsINFO) << "Peer: Body: Error: " << ADDRESS(this) << ": " << error.category().name() << ": " << error.message() << ": " << error; cLog(lsINFO) << "Peer: Body: Error: " << ADDRESS(this) << ": " << error.category().name() << ": " << error.message() << ": " << error;
boost::recursive_mutex::scoped_lock sl(theApp->getMasterLock());
detach("hrb"); detach("hrb");
} }
} }
@@ -371,6 +374,7 @@ void Peer::processReadBuffer()
// std::cerr << "Peer::processReadBuffer: " << mIpPort.first << " " << mIpPort.second << std::endl; // std::cerr << "Peer::processReadBuffer: " << mIpPort.first << " " << mIpPort.second << std::endl;
// If connected and get a mtHELLO or if not connected and get a non-mtHELLO, wrong message was sent. // If connected and get a mtHELLO or if not connected and get a non-mtHELLO, wrong message was sent.
boost::recursive_mutex::scoped_lock sl(theApp->getMasterLock());
if (mHelloed == (type == ripple::mtHELLO)) if (mHelloed == (type == ripple::mtHELLO))
{ {
cLog(lsWARNING) << "Wrong message type: " << type; cLog(lsWARNING) << "Wrong message type: " << type;

View File

@@ -1384,7 +1384,9 @@ Json::Value RPCHandler::doCommand(const std::string& command, Json::Value& param
return rpcError(rpcNO_NETWORK); return rpcError(rpcNO_NETWORK);
} }
// XXX Should verify we have a current ledger. // XXX Should verify we have a current ledger.
else if ((commandsA[i].iOptions & optCurrent) && false)
boost::recursive_mutex::scoped_lock sl(theApp->getMasterLock());
if ((commandsA[i].iOptions & optCurrent) && false)
{ {
return rpcError(rpcNO_CURRENT); return rpcError(rpcNO_CURRENT);
} }