diff --git a/src/cpp/database/SqliteDatabase.cpp b/src/cpp/database/SqliteDatabase.cpp index c9a540ff8..d8b355189 100644 --- a/src/cpp/database/SqliteDatabase.cpp +++ b/src/cpp/database/SqliteDatabase.cpp @@ -223,10 +223,10 @@ bool SqliteDatabase::setupCheckpointing(JobQueue *q) void SqliteDatabase::doHook(const char *db, int pages) { - if (pages < 512) + if (pages < 1024) return; boost::mutex::scoped_lock sl(walMutex); - if (walDBs.insert(db).second && !walRunning) + if (!walRunning) { walRunning = true; if (mWalQ) @@ -238,36 +238,20 @@ void SqliteDatabase::doHook(const char *db, int pages) void SqliteDatabase::runWal() { - std::set walSet; - std::string name = sqlite3_db_filename(mConnection, "main"); - - int pass = 1; - while (1) + int log = 0, ckpt = 0; + int ret = sqlite3_wal_checkpoint_v2(mConnection, NULL, SQLITE_CHECKPOINT_PASSIVE, &log, &ckpt); + if (ret != SQLITE_OK) { - { - boost::mutex::scoped_lock sl(walMutex); - if (walDBs.empty()) - { - walRunning = false; - return; - } - walDBs.swap(walSet); - } + cLog((ret == SQLITE_LOCKED) ? lsTRACE : lsWARNING) << "WAL(" + << sqlite3_db_filename(mConnection, "main") << "): error " << ret; + } + else + cLog(lsTRACE) << "WAL(" << sqlite3_db_filename(mConnection, "main") << + "): frames=" << log << ", written=" << ckpt; - BOOST_FOREACH(const std::string& db, walSet) - { - int log = 0, ckpt = 0; - int ret = sqlite3_wal_checkpoint_v2(mConnection, db.c_str(), SQLITE_CHECKPOINT_PASSIVE, &log, &ckpt); - if (ret != SQLITE_OK) - { - cLog((ret == SQLITE_LOCKED) ? lsTRACE : lsWARNING) << "WAL " << name << ":" - << db << " error " << ret; - } - else - cLog(lsTRACE) << "WAL(" << name << "): pass=" << pass << ", frames=" << log << ", written=" << ckpt; - } - walSet.clear(); - ++pass; + { + boost::mutex::scoped_lock sl(walMutex); + walRunning = false; } } diff --git a/src/cpp/database/SqliteDatabase.h b/src/cpp/database/SqliteDatabase.h index 19569a69b..7e6a8c67a 100644 --- a/src/cpp/database/SqliteDatabase.h +++ b/src/cpp/database/SqliteDatabase.h @@ -17,7 +17,6 @@ class SqliteDatabase : public Database boost::mutex walMutex; JobQueue* mWalQ; - std::set walDBs; bool walRunning; public: diff --git a/src/cpp/ripple/LedgerEntrySet.cpp b/src/cpp/ripple/LedgerEntrySet.cpp index 7c24e8094..aec83902e 100644 --- a/src/cpp/ripple/LedgerEntrySet.cpp +++ b/src/cpp/ripple/LedgerEntrySet.cpp @@ -920,7 +920,7 @@ uint256 LedgerEntrySet::getNextLedgerIndex(const uint256& uHash) // node found in LES, node found in ledger, return earliest if (it->second.mAction != taaDELETE) - return (ledgerNext < it->first) ? ledgerNext : it->first; + return (!ledgerNext.isZero() && (ledgerNext < it->first)) ? ledgerNext : it->first; } diff --git a/src/cpp/ripple/RippleCalc.cpp b/src/cpp/ripple/RippleCalc.cpp index 030c9f370..15d0f0446 100644 --- a/src/cpp/ripple/RippleCalc.cpp +++ b/src/cpp/ripple/RippleCalc.cpp @@ -885,10 +885,6 @@ TER RippleCalc::calcNodeAdvance( if (bDirectAdvance) { // Get next quality. - - // FIXME: This looks at the original ledger and doesn't take into account any changes - // in the LedgerEntrySet. If this code, for example, created offers, this would - // not return the pages they're in. uDirectTip = lesActive.getNextLedgerIndex(uDirectTip, uDirectEnd); bDirectDirDirty = true; diff --git a/src/cpp/ripple/WSConnection.h b/src/cpp/ripple/WSConnection.h index 6f3aa2cd2..060e538c1 100644 --- a/src/cpp/ripple/WSConnection.h +++ b/src/cpp/ripple/WSConnection.h @@ -51,6 +51,11 @@ protected: boost::asio::deadline_timer mPingTimer; bool mPinged; + boost::recursive_mutex mRcvQueueLock; + std::queue mRcvQueue; + bool mRcvQueueRunning; + bool mDead; + public: // WSConnection() // : mHandler((WSServerHandler*)(NULL)), @@ -59,7 +64,8 @@ public: WSConnection(WSServerHandler* wshpHandler, const connection_ptr& cpConnection) : mHandler(wshpHandler), mConnection(cpConnection), mNetwork(theApp->getOPs()), mRemoteIP(cpConnection->get_socket().lowest_layer().remote_endpoint().address().to_string()), - mLoadSource(mRemoteIP), mPingTimer(cpConnection->get_io_service()), mPinged(false) + mLoadSource(mRemoteIP), mPingTimer(cpConnection->get_io_service()), mPinged(false), + mRcvQueueRunning(false), mDead(false) { cLog(lsDEBUG) << "Websocket connection from " << mRemoteIP; setPingTimer(); @@ -69,6 +75,9 @@ public: { // sever connection mPingTimer.cancel(); mConnection.reset(); + + boost::recursive_mutex::scoped_lock sl(mRcvQueueLock); + mDead = true; } virtual ~WSConnection() { ; } @@ -194,6 +203,47 @@ public: &WSConnection::pingTimer, mConnection, mHandler, boost::asio::placeholders::error)); } + void rcvMessage(message_ptr msg, bool& msgRejected, bool& runQueue) + { + boost::recursive_mutex::scoped_lock sl(mRcvQueueLock); + if (mDead) + { + msgRejected = false; + runQueue = false; + return; + } + if (mDead || (mRcvQueue.size() >= 1000)) + { + msgRejected = !mDead; + runQueue = false; + } + else + { + msgRejected = false; + mRcvQueue.push(msg); + if (mRcvQueueRunning) + runQueue = false; + else + { + runQueue = true; + mRcvQueueRunning = true; + } + } + } + + message_ptr getMessage() + { + boost::recursive_mutex::scoped_lock sl(mRcvQueueLock); + if (mDead || mRcvQueue.empty()) + { + mRcvQueueRunning = false; + return message_ptr(); + } + message_ptr m = mRcvQueue.front(); + mRcvQueue.pop(); + return m; + } + }; // vim:ts=4 diff --git a/src/cpp/ripple/WSHandler.h b/src/cpp/ripple/WSHandler.h index 1f2d2f59b..b95ed85fb 100644 --- a/src/cpp/ripple/WSHandler.h +++ b/src/cpp/ripple/WSHandler.h @@ -158,11 +158,57 @@ public: void on_message(connection_ptr cpClient, message_ptr mpMessage) { - theApp->getJobQueue().addJob(jtCLIENT, "WSClient::command", - BIND_TYPE(&WSServerHandler::do_message, this, P_1, cpClient, mpMessage)); + wsc_ptr ptr; + { + boost::mutex::scoped_lock sl(mMapLock); + typename boost::unordered_map::iterator it = mMap.find(cpClient); + if (it == mMap.end()) + return; + ptr = it->second; + } + + bool bRejected, bRunQ; + ptr->rcvMessage(mpMessage, bRejected, bRunQ); + if (bRejected) + { + try + { + cLog(lsDEBUG) << "Ws:: Rejected(" + << cpClient->get_socket().lowest_layer().remote_endpoint().address().to_string() + << ") '" << mpMessage->get_payload() << "'"; + } + catch (...) + { + } + } + if (bRunQ) + theApp->getJobQueue().addJob(jtCLIENT, "WSClient::command", + BIND_TYPE(&WSServerHandler::do_messages, this, P_1, cpClient)); } - void do_message(Job& job, connection_ptr cpClient, message_ptr mpMessage) + void do_messages(Job& job, connection_ptr cpClient) + { + wsc_ptr ptr; + { + boost::mutex::scoped_lock sl(mMapLock); + typename boost::unordered_map::iterator it = mMap.find(cpClient); + if (it == mMap.end()) + return; + ptr = it->second; + } + + for (int i = 0; i < 10; ++i) + { + message_ptr msg = ptr->getMessage(); + if (!msg) + return; + do_message(job, cpClient, ptr, msg); + } + theApp->getJobQueue().addJob(jtCLIENT, "WSClient::more", + BIND_TYPE(&WSServerHandler::do_messages, this, P_1, cpClient)); + } + + void do_message(Job& job, const connection_ptr& cpClient, const wsc_ptr& conn, const message_ptr& mpMessage) { Json::Value jvRequest; Json::Reader jrReader; @@ -200,15 +246,6 @@ public: { if (jvRequest.isMember("command")) job.rename(std::string("WSClient::") + jvRequest["command"].asString()); - boost::shared_ptr< WSConnection > conn; - { - boost::mutex::scoped_lock sl(mMapLock); - typedef boost::shared_ptr< WSConnection > wsc_ptr; - typename boost::unordered_map::iterator it = mMap.find(cpClient); - if (it == mMap.end()) - return; - conn = it->second; - } send(cpClient, conn->invokeCommand(jvRequest), false); } }