Dispatch write ahead logging through our job queue. SQL logging improvements.

This commit is contained in:
JoelKatz
2013-01-28 14:32:52 -08:00
parent e48ef29f8c
commit faa344c7d0
7 changed files with 47 additions and 27 deletions

View File

@@ -9,9 +9,14 @@
#include <boost/foreach.hpp>
#include <boost/bind.hpp>
#include "../ripple/JobQueue.h"
#include "../ripple/Log.h"
SETUP_NLOG("DataBase");
using namespace std;
SqliteDatabase::SqliteDatabase(const char* host) : Database(host,"",""), walRunning(false)
SqliteDatabase::SqliteDatabase(const char* host) : Database(host,"",""), mWalQ(NULL), walRunning(false)
{
mConnection = NULL;
mCurrentStmt = NULL;
@@ -22,7 +27,7 @@ void SqliteDatabase::connect()
int rc = sqlite3_open(mHost.c_str(), &mConnection);
if (rc)
{
cout << "Can't open database: " << mHost << " " << rc << endl;
cLog(lsFATAL) << "Can't open database: " << mHost << " " << rc;
sqlite3_close(mConnection);
assert((rc != SQLITE_BUSY) && (rc != SQLITE_LOCKED));
}
@@ -46,9 +51,9 @@ bool SqliteDatabase::executeSQL(const char* sql, bool fail_ok)
if (!fail_ok)
{
#ifdef DEBUG
cout << "SQL Perror:" << rc << endl;
cout << "Statement: " << sql << endl;
cout << "Error: " << sqlite3_errmsg(mConnection) << endl;
cLog(lsWARNING) << "SQL Perror:" << rc;
cLog(lsWARNING) << "Statement: " << sql;
cLog(lsWARNING) << "Error: " << sqlite3_errmsg(mConnection);
#endif
}
return false;
@@ -70,9 +75,9 @@ bool SqliteDatabase::executeSQL(const char* sql, bool fail_ok)
if (!fail_ok)
{
#ifdef DEBUG
cout << "SQL Serror:" << rc << endl;
cout << "Statement: " << sql << endl;
cout << "Error: " << sqlite3_errmsg(mConnection) << endl;
cLog(lsWARNING) << "SQL Serror:" << rc;
cLog(lsWARNING) << "Statement: " << sql;
cLog(lsWARNING) << "Error: " << sqlite3_errmsg(mConnection);
#endif
}
return false;
@@ -130,7 +135,7 @@ bool SqliteDatabase::getNextRow()
else
{
assert((rc != SQLITE_BUSY) && (rc != SQLITE_LOCKED));
cout << "SQL Rerror:" << rc << endl;
cLog(lsWARNING) << "SQL Rerror:" << rc;
return(false);
}
}
@@ -194,27 +199,32 @@ static int SqliteWALHook(void *s, sqlite3* dbCon, const char *dbName, int walSiz
return SQLITE_OK;
}
bool SqliteDatabase::setupCheckpointing()
bool SqliteDatabase::setupCheckpointing(JobQueue *q)
{
mWalQ = q;
sqlite3_wal_hook(mConnection, SqliteWALHook, this);
return true;
}
void SqliteDatabase::doHook(const char *db, int pages)
{
if (pages < 256)
if (pages < 512)
return;
boost::mutex::scoped_lock sl(walMutex);
if (walDBs.insert(db).second && !walRunning)
{
walRunning = true;
boost::thread(boost::bind(&SqliteDatabase::runWal, this)).detach();
if (mWalQ)
mWalQ->addJob(jtWAL, boost::bind(&SqliteDatabase::runWal, this));
else
boost::thread(boost::bind(&SqliteDatabase::runWal, this)).detach();
}
}
void SqliteDatabase::runWal()
{
std::set<std::string> walSet;
std::string name = sqlite3_db_filename(mConnection, "main");
while (1)
{
@@ -231,10 +241,14 @@ void SqliteDatabase::runWal()
BOOST_FOREACH(const std::string& db, walSet)
{
int log, ckpt;
sqlite3_wal_checkpoint_v2(mConnection, db.c_str(), SQLITE_CHECKPOINT_PASSIVE, &log, &ckpt);
int ret = sqlite3_wal_checkpoint_v2(mConnection, db.c_str(), SQLITE_CHECKPOINT_PASSIVE, &log, &ckpt);
if (ret != SQLITE_OK)
{
cLog((ret == SQLITE_LOCKED) ? lsDEBUG : lsWARNING) << "WAL "
<< sqlite3_db_filename(mConnection, "main") << " / " << db << " errror " << ret;
}
}
walSet.clear();
}
}

View File

@@ -16,6 +16,7 @@ class SqliteDatabase : public Database
bool mMoreRows;
boost::mutex walMutex;
JobQueue* mWalQ;
std::set<std::string> walDBs;
bool walRunning;
@@ -51,7 +52,7 @@ public:
uint64 getBigInt(int colIndex);
sqlite3* peekConnection() { return mConnection; }
virtual bool setupCheckpointing();
virtual bool setupCheckpointing(JobQueue*);
virtual SqliteDatabase* getSqliteDB() { return this; }
void runWal();

View File

@@ -18,6 +18,7 @@
*/
class SqliteDatabase;
class JobQueue;
class Database
{
@@ -86,8 +87,8 @@ public:
// float getSingleDBValueFloat(const char* sql);
// char* getSingleDBValueStr(const char* sql, std::string& retStr);
virtual bool setupCheckpointing() { return false; }
virtual SqliteDatabase* getSqliteDB() { return NULL; }
virtual bool setupCheckpointing(JobQueue*) { return false; }
virtual SqliteDatabase* getSqliteDB() { return NULL; }
};
#endif

View File

@@ -62,10 +62,10 @@ void Application::stop()
{
cLog(lsINFO) << "Received shutdown request";
mIOService.stop();
mJobQueue.shutdown();
mHashedObjectStore.bulkWrite();
mValidations.flush();
mAuxService.stop();
mJobQueue.shutdown();
cLog(lsINFO) << "Stopped: " << mIOService.stopped();
Instance::shutdown();
@@ -122,9 +122,9 @@ void Application::setup()
boost::thread t5(boost::bind(&InitDB, &mHashNodeDB, "hashnode.db", HashNodeDBInit, HashNodeDBCount));
boost::thread t6(boost::bind(&InitDB, &mNetNodeDB, "netnode.db", NetNodeDBInit, NetNodeDBCount));
t1.join(); t2.join(); t3.join(); t4.join(); t5.join(); t6.join();
mTxnDB->getDB()->setupCheckpointing();
mLedgerDB->getDB()->setupCheckpointing();
mHashNodeDB->getDB()->setupCheckpointing();
mTxnDB->getDB()->setupCheckpointing(&mJobQueue);
mLedgerDB->getDB()->setupCheckpointing(&mJobQueue);
mHashNodeDB->getDB()->setupCheckpointing(&mJobQueue);
if (theConfig.START_UP == Config::FRESH)
{

View File

@@ -16,6 +16,7 @@ JobQueue::JobQueue() : mLastJob(0), mThreadCount(0), mShuttingDown(false)
mJobLoads[jtTRANSACTION].setTargetLatency(250, 1000);
mJobLoads[jtPROPOSAL_ut].setTargetLatency(500, 1250);
mJobLoads[jtPUBLEDGER].setTargetLatency(1000, 2500);
mJobLoads[jtWAL].setTargetLatency(1000, 2500);
mJobLoads[jtVALIDATION_t].setTargetLatency(500, 1500);
mJobLoads[jtWRITE].setTargetLatency(750, 1500);
mJobLoads[jtTRANSACTION_l].setTargetLatency(100, 500);
@@ -41,6 +42,7 @@ const char* Job::toString(JobType t)
case jtTRANSACTION: return "transaction";
case jtPUBLEDGER: return "publishLedger";
case jtVALIDATION_t: return "trustedValidation";
case jtWAL: return "writeAhead";
case jtWRITE: return "writeObjects";
case jtTRANSACTION_l: return "localTransaction";
case jtPROPOSAL_t: return "trustedProposal";

View File

@@ -27,12 +27,13 @@ enum JobType
jtCLIENT = 4, // A websocket command from the client
jtTRANSACTION = 5, // A transaction received from the network
jtPUBLEDGER = 6, // Publish a fully-accepted ledger
jtVALIDATION_t = 7, // A validation from a trusted source
jtWRITE = 8, // Write out hashed objects
jtTRANSACTION_l = 9, // A local transaction
jtPROPOSAL_t = 10, // A proposal from a trusted source
jtADMIN = 11, // An administrative operation
jtDEATH = 12, // job of death, used internally
jtWAL = 7, // Write-ahead logging
jtVALIDATION_t = 8, // A validation from a trusted source
jtWRITE = 9, // Write out hashed objects
jtTRANSACTION_l = 10, // A local transaction
jtPROPOSAL_t = 11, // A proposal from a trusted source
jtADMIN = 12, // An administrative operation
jtDEATH = 13, // job of death, used internally
// special types not dispatched by the job pool
jtPEER = 17,

View File

@@ -15,6 +15,7 @@
// Put at the beginning of a C++ file that needs its own log partition
#define SETUP_LOG() static LogPartition logPartition(__FILE__)
#define SETUP_NLOG(x) static LogPartition logPartition(x)
// Standard conditional log
#define cLog(x) if (!logPartition.doLog(x)) do {} while (0); else Log(x, logPartition)