Merge branch 'develop' of github.com:jedmccaleb/NewCoin into develop

This commit is contained in:
Arthur Britto
2013-04-17 02:19:25 -07:00
6 changed files with 115 additions and 49 deletions

View File

@@ -223,10 +223,10 @@ bool SqliteDatabase::setupCheckpointing(JobQueue *q)
void SqliteDatabase::doHook(const char *db, int pages)
{
if (pages < 512)
if (pages < 1024)
return;
boost::mutex::scoped_lock sl(walMutex);
if (walDBs.insert(db).second && !walRunning)
if (!walRunning)
{
walRunning = true;
if (mWalQ)
@@ -238,36 +238,20 @@ void SqliteDatabase::doHook(const char *db, int pages)
void SqliteDatabase::runWal()
{
std::set<std::string> walSet;
std::string name = sqlite3_db_filename(mConnection, "main");
int pass = 1;
while (1)
int log = 0, ckpt = 0;
int ret = sqlite3_wal_checkpoint_v2(mConnection, NULL, SQLITE_CHECKPOINT_PASSIVE, &log, &ckpt);
if (ret != SQLITE_OK)
{
{
boost::mutex::scoped_lock sl(walMutex);
if (walDBs.empty())
{
walRunning = false;
return;
}
walDBs.swap(walSet);
}
cLog((ret == SQLITE_LOCKED) ? lsTRACE : lsWARNING) << "WAL("
<< sqlite3_db_filename(mConnection, "main") << "): error " << ret;
}
else
cLog(lsTRACE) << "WAL(" << sqlite3_db_filename(mConnection, "main") <<
"): frames=" << log << ", written=" << ckpt;
BOOST_FOREACH(const std::string& db, walSet)
{
int log = 0, ckpt = 0;
int ret = sqlite3_wal_checkpoint_v2(mConnection, db.c_str(), SQLITE_CHECKPOINT_PASSIVE, &log, &ckpt);
if (ret != SQLITE_OK)
{
cLog((ret == SQLITE_LOCKED) ? lsTRACE : lsWARNING) << "WAL " << name << ":"
<< db << " error " << ret;
}
else
cLog(lsTRACE) << "WAL(" << name << "): pass=" << pass << ", frames=" << log << ", written=" << ckpt;
}
walSet.clear();
++pass;
{
boost::mutex::scoped_lock sl(walMutex);
walRunning = false;
}
}

View File

@@ -17,7 +17,6 @@ class SqliteDatabase : public Database
boost::mutex walMutex;
JobQueue* mWalQ;
std::set<std::string> walDBs;
bool walRunning;
public:

View File

@@ -920,7 +920,7 @@ uint256 LedgerEntrySet::getNextLedgerIndex(const uint256& uHash)
// node found in LES, node found in ledger, return earliest
if (it->second.mAction != taaDELETE)
return (ledgerNext < it->first) ? ledgerNext : it->first;
return (!ledgerNext.isZero() && (ledgerNext < it->first)) ? ledgerNext : it->first;
}

View File

@@ -885,10 +885,6 @@ TER RippleCalc::calcNodeAdvance(
if (bDirectAdvance)
{ // Get next quality.
// FIXME: This looks at the original ledger and doesn't take into account any changes
// in the LedgerEntrySet. If this code, for example, created offers, this would
// not return the pages they're in.
uDirectTip = lesActive.getNextLedgerIndex(uDirectTip, uDirectEnd);
bDirectDirDirty = true;

View File

@@ -51,6 +51,11 @@ protected:
boost::asio::deadline_timer mPingTimer;
bool mPinged;
boost::recursive_mutex mRcvQueueLock;
std::queue<message_ptr> mRcvQueue;
bool mRcvQueueRunning;
bool mDead;
public:
// WSConnection()
// : mHandler((WSServerHandler<websocketpp::WSDOOR_SERVER>*)(NULL)),
@@ -59,7 +64,8 @@ public:
WSConnection(WSServerHandler<endpoint_type>* wshpHandler, const connection_ptr& cpConnection)
: mHandler(wshpHandler), mConnection(cpConnection), mNetwork(theApp->getOPs()),
mRemoteIP(cpConnection->get_socket().lowest_layer().remote_endpoint().address().to_string()),
mLoadSource(mRemoteIP), mPingTimer(cpConnection->get_io_service()), mPinged(false)
mLoadSource(mRemoteIP), mPingTimer(cpConnection->get_io_service()), mPinged(false),
mRcvQueueRunning(false), mDead(false)
{
cLog(lsDEBUG) << "Websocket connection from " << mRemoteIP;
setPingTimer();
@@ -69,6 +75,9 @@ public:
{ // sever connection
mPingTimer.cancel();
mConnection.reset();
boost::recursive_mutex::scoped_lock sl(mRcvQueueLock);
mDead = true;
}
virtual ~WSConnection() { ; }
@@ -194,6 +203,47 @@ public:
&WSConnection<endpoint_type>::pingTimer, mConnection, mHandler, boost::asio::placeholders::error));
}
void rcvMessage(message_ptr msg, bool& msgRejected, bool& runQueue)
{
boost::recursive_mutex::scoped_lock sl(mRcvQueueLock);
if (mDead)
{
msgRejected = false;
runQueue = false;
return;
}
if (mDead || (mRcvQueue.size() >= 1000))
{
msgRejected = !mDead;
runQueue = false;
}
else
{
msgRejected = false;
mRcvQueue.push(msg);
if (mRcvQueueRunning)
runQueue = false;
else
{
runQueue = true;
mRcvQueueRunning = true;
}
}
}
message_ptr getMessage()
{
boost::recursive_mutex::scoped_lock sl(mRcvQueueLock);
if (mDead || mRcvQueue.empty())
{
mRcvQueueRunning = false;
return message_ptr();
}
message_ptr m = mRcvQueue.front();
mRcvQueue.pop();
return m;
}
};
// vim:ts=4

View File

@@ -158,11 +158,57 @@ public:
void on_message(connection_ptr cpClient, message_ptr mpMessage)
{
theApp->getJobQueue().addJob(jtCLIENT, "WSClient::command",
BIND_TYPE(&WSServerHandler<endpoint_type>::do_message, this, P_1, cpClient, mpMessage));
wsc_ptr ptr;
{
boost::mutex::scoped_lock sl(mMapLock);
typename boost::unordered_map<connection_ptr, wsc_ptr>::iterator it = mMap.find(cpClient);
if (it == mMap.end())
return;
ptr = it->second;
}
bool bRejected, bRunQ;
ptr->rcvMessage(mpMessage, bRejected, bRunQ);
if (bRejected)
{
try
{
cLog(lsDEBUG) << "Ws:: Rejected("
<< cpClient->get_socket().lowest_layer().remote_endpoint().address().to_string()
<< ") '" << mpMessage->get_payload() << "'";
}
catch (...)
{
}
}
if (bRunQ)
theApp->getJobQueue().addJob(jtCLIENT, "WSClient::command",
BIND_TYPE(&WSServerHandler<endpoint_type>::do_messages, this, P_1, cpClient));
}
void do_message(Job& job, connection_ptr cpClient, message_ptr mpMessage)
void do_messages(Job& job, connection_ptr cpClient)
{
wsc_ptr ptr;
{
boost::mutex::scoped_lock sl(mMapLock);
typename boost::unordered_map<connection_ptr, wsc_ptr>::iterator it = mMap.find(cpClient);
if (it == mMap.end())
return;
ptr = it->second;
}
for (int i = 0; i < 10; ++i)
{
message_ptr msg = ptr->getMessage();
if (!msg)
return;
do_message(job, cpClient, ptr, msg);
}
theApp->getJobQueue().addJob(jtCLIENT, "WSClient::more",
BIND_TYPE(&WSServerHandler<endpoint_type>::do_messages, this, P_1, cpClient));
}
void do_message(Job& job, const connection_ptr& cpClient, const wsc_ptr& conn, const message_ptr& mpMessage)
{
Json::Value jvRequest;
Json::Reader jrReader;
@@ -200,15 +246,6 @@ public:
{
if (jvRequest.isMember("command"))
job.rename(std::string("WSClient::") + jvRequest["command"].asString());
boost::shared_ptr< WSConnection<endpoint_type> > conn;
{
boost::mutex::scoped_lock sl(mMapLock);
typedef boost::shared_ptr< WSConnection<endpoint_type> > wsc_ptr;
typename boost::unordered_map<connection_ptr, wsc_ptr>::iterator it = mMap.find(cpClient);
if (it == mMap.end())
return;
conn = it->second;
}
send(cpClient, conn->invokeCommand(jvRequest), false);
}
}