From faa344c7d0145e8a4f1807fc75e7996705bc7714 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Mon, 28 Jan 2013 14:32:52 -0800 Subject: [PATCH] Dispatch write ahead logging through our job queue. SQL logging improvements. --- src/cpp/database/SqliteDatabase.cpp | 42 +++++++++++++++++++---------- src/cpp/database/SqliteDatabase.h | 3 ++- src/cpp/database/database.h | 5 ++-- src/cpp/ripple/Application.cpp | 8 +++--- src/cpp/ripple/JobQueue.cpp | 2 ++ src/cpp/ripple/JobQueue.h | 13 ++++----- src/cpp/ripple/Log.h | 1 + 7 files changed, 47 insertions(+), 27 deletions(-) diff --git a/src/cpp/database/SqliteDatabase.cpp b/src/cpp/database/SqliteDatabase.cpp index addf399b68..4575a9e8b4 100644 --- a/src/cpp/database/SqliteDatabase.cpp +++ b/src/cpp/database/SqliteDatabase.cpp @@ -9,9 +9,14 @@ #include #include +#include "../ripple/JobQueue.h" +#include "../ripple/Log.h" + +SETUP_NLOG("DataBase"); + using namespace std; -SqliteDatabase::SqliteDatabase(const char* host) : Database(host,"",""), walRunning(false) +SqliteDatabase::SqliteDatabase(const char* host) : Database(host,"",""), mWalQ(NULL), walRunning(false) { mConnection = NULL; mCurrentStmt = NULL; @@ -22,7 +27,7 @@ void SqliteDatabase::connect() int rc = sqlite3_open(mHost.c_str(), &mConnection); if (rc) { - cout << "Can't open database: " << mHost << " " << rc << endl; + cLog(lsFATAL) << "Can't open database: " << mHost << " " << rc; sqlite3_close(mConnection); assert((rc != SQLITE_BUSY) && (rc != SQLITE_LOCKED)); } @@ -46,9 +51,9 @@ bool SqliteDatabase::executeSQL(const char* sql, bool fail_ok) if (!fail_ok) { #ifdef DEBUG - cout << "SQL Perror:" << rc << endl; - cout << "Statement: " << sql << endl; - cout << "Error: " << sqlite3_errmsg(mConnection) << endl; + cLog(lsWARNING) << "SQL Perror:" << rc; + cLog(lsWARNING) << "Statement: " << sql; + cLog(lsWARNING) << "Error: " << sqlite3_errmsg(mConnection); #endif } return false; @@ -70,9 +75,9 @@ bool SqliteDatabase::executeSQL(const char* sql, bool fail_ok) if (!fail_ok) { #ifdef DEBUG - cout << "SQL Serror:" << rc << endl; - cout << "Statement: " << sql << endl; - cout << "Error: " << sqlite3_errmsg(mConnection) << endl; + cLog(lsWARNING) << "SQL Serror:" << rc; + cLog(lsWARNING) << "Statement: " << sql; + cLog(lsWARNING) << "Error: " << sqlite3_errmsg(mConnection); #endif } return false; @@ -130,7 +135,7 @@ bool SqliteDatabase::getNextRow() else { assert((rc != SQLITE_BUSY) && (rc != SQLITE_LOCKED)); - cout << "SQL Rerror:" << rc << endl; + cLog(lsWARNING) << "SQL Rerror:" << rc; return(false); } } @@ -194,27 +199,32 @@ static int SqliteWALHook(void *s, sqlite3* dbCon, const char *dbName, int walSiz return SQLITE_OK; } -bool SqliteDatabase::setupCheckpointing() +bool SqliteDatabase::setupCheckpointing(JobQueue *q) { + mWalQ = q; sqlite3_wal_hook(mConnection, SqliteWALHook, this); return true; } void SqliteDatabase::doHook(const char *db, int pages) { - if (pages < 256) + if (pages < 512) return; boost::mutex::scoped_lock sl(walMutex); if (walDBs.insert(db).second && !walRunning) { walRunning = true; - boost::thread(boost::bind(&SqliteDatabase::runWal, this)).detach(); + if (mWalQ) + mWalQ->addJob(jtWAL, boost::bind(&SqliteDatabase::runWal, this)); + else + boost::thread(boost::bind(&SqliteDatabase::runWal, this)).detach(); } } void SqliteDatabase::runWal() { std::set walSet; + std::string name = sqlite3_db_filename(mConnection, "main"); while (1) { @@ -231,10 +241,14 @@ void SqliteDatabase::runWal() BOOST_FOREACH(const std::string& db, walSet) { int log, ckpt; - sqlite3_wal_checkpoint_v2(mConnection, db.c_str(), SQLITE_CHECKPOINT_PASSIVE, &log, &ckpt); + int ret = sqlite3_wal_checkpoint_v2(mConnection, db.c_str(), SQLITE_CHECKPOINT_PASSIVE, &log, &ckpt); + if (ret != SQLITE_OK) + { + cLog((ret == SQLITE_LOCKED) ? lsDEBUG : lsWARNING) << "WAL " + << sqlite3_db_filename(mConnection, "main") << " / " << db << " errror " << ret; + } } walSet.clear(); - } } diff --git a/src/cpp/database/SqliteDatabase.h b/src/cpp/database/SqliteDatabase.h index ad0c0eab66..5b4cfa048d 100644 --- a/src/cpp/database/SqliteDatabase.h +++ b/src/cpp/database/SqliteDatabase.h @@ -16,6 +16,7 @@ class SqliteDatabase : public Database bool mMoreRows; boost::mutex walMutex; + JobQueue* mWalQ; std::set walDBs; bool walRunning; @@ -51,7 +52,7 @@ public: uint64 getBigInt(int colIndex); sqlite3* peekConnection() { return mConnection; } - virtual bool setupCheckpointing(); + virtual bool setupCheckpointing(JobQueue*); virtual SqliteDatabase* getSqliteDB() { return this; } void runWal(); diff --git a/src/cpp/database/database.h b/src/cpp/database/database.h index aaad8ac488..05f9e87700 100644 --- a/src/cpp/database/database.h +++ b/src/cpp/database/database.h @@ -18,6 +18,7 @@ */ class SqliteDatabase; +class JobQueue; class Database { @@ -86,8 +87,8 @@ public: // float getSingleDBValueFloat(const char* sql); // char* getSingleDBValueStr(const char* sql, std::string& retStr); - virtual bool setupCheckpointing() { return false; } - virtual SqliteDatabase* getSqliteDB() { return NULL; } + virtual bool setupCheckpointing(JobQueue*) { return false; } + virtual SqliteDatabase* getSqliteDB() { return NULL; } }; #endif diff --git a/src/cpp/ripple/Application.cpp b/src/cpp/ripple/Application.cpp index 09e37a2e4f..a2d608279c 100644 --- a/src/cpp/ripple/Application.cpp +++ b/src/cpp/ripple/Application.cpp @@ -62,10 +62,10 @@ void Application::stop() { cLog(lsINFO) << "Received shutdown request"; mIOService.stop(); - mJobQueue.shutdown(); mHashedObjectStore.bulkWrite(); mValidations.flush(); mAuxService.stop(); + mJobQueue.shutdown(); cLog(lsINFO) << "Stopped: " << mIOService.stopped(); Instance::shutdown(); @@ -122,9 +122,9 @@ void Application::setup() boost::thread t5(boost::bind(&InitDB, &mHashNodeDB, "hashnode.db", HashNodeDBInit, HashNodeDBCount)); boost::thread t6(boost::bind(&InitDB, &mNetNodeDB, "netnode.db", NetNodeDBInit, NetNodeDBCount)); t1.join(); t2.join(); t3.join(); t4.join(); t5.join(); t6.join(); - mTxnDB->getDB()->setupCheckpointing(); - mLedgerDB->getDB()->setupCheckpointing(); - mHashNodeDB->getDB()->setupCheckpointing(); + mTxnDB->getDB()->setupCheckpointing(&mJobQueue); + mLedgerDB->getDB()->setupCheckpointing(&mJobQueue); + mHashNodeDB->getDB()->setupCheckpointing(&mJobQueue); if (theConfig.START_UP == Config::FRESH) { diff --git a/src/cpp/ripple/JobQueue.cpp b/src/cpp/ripple/JobQueue.cpp index fe1dcd829a..5d0106f381 100644 --- a/src/cpp/ripple/JobQueue.cpp +++ b/src/cpp/ripple/JobQueue.cpp @@ -16,6 +16,7 @@ JobQueue::JobQueue() : mLastJob(0), mThreadCount(0), mShuttingDown(false) mJobLoads[jtTRANSACTION].setTargetLatency(250, 1000); mJobLoads[jtPROPOSAL_ut].setTargetLatency(500, 1250); mJobLoads[jtPUBLEDGER].setTargetLatency(1000, 2500); + mJobLoads[jtWAL].setTargetLatency(1000, 2500); mJobLoads[jtVALIDATION_t].setTargetLatency(500, 1500); mJobLoads[jtWRITE].setTargetLatency(750, 1500); mJobLoads[jtTRANSACTION_l].setTargetLatency(100, 500); @@ -41,6 +42,7 @@ const char* Job::toString(JobType t) case jtTRANSACTION: return "transaction"; case jtPUBLEDGER: return "publishLedger"; case jtVALIDATION_t: return "trustedValidation"; + case jtWAL: return "writeAhead"; case jtWRITE: return "writeObjects"; case jtTRANSACTION_l: return "localTransaction"; case jtPROPOSAL_t: return "trustedProposal"; diff --git a/src/cpp/ripple/JobQueue.h b/src/cpp/ripple/JobQueue.h index dfdcd1039e..1df31aca7e 100644 --- a/src/cpp/ripple/JobQueue.h +++ b/src/cpp/ripple/JobQueue.h @@ -27,12 +27,13 @@ enum JobType jtCLIENT = 4, // A websocket command from the client jtTRANSACTION = 5, // A transaction received from the network jtPUBLEDGER = 6, // Publish a fully-accepted ledger - jtVALIDATION_t = 7, // A validation from a trusted source - jtWRITE = 8, // Write out hashed objects - jtTRANSACTION_l = 9, // A local transaction - jtPROPOSAL_t = 10, // A proposal from a trusted source - jtADMIN = 11, // An administrative operation - jtDEATH = 12, // job of death, used internally + jtWAL = 7, // Write-ahead logging + jtVALIDATION_t = 8, // A validation from a trusted source + jtWRITE = 9, // Write out hashed objects + jtTRANSACTION_l = 10, // A local transaction + jtPROPOSAL_t = 11, // A proposal from a trusted source + jtADMIN = 12, // An administrative operation + jtDEATH = 13, // job of death, used internally // special types not dispatched by the job pool jtPEER = 17, diff --git a/src/cpp/ripple/Log.h b/src/cpp/ripple/Log.h index 9f72af8176..f9ec0b8f81 100644 --- a/src/cpp/ripple/Log.h +++ b/src/cpp/ripple/Log.h @@ -15,6 +15,7 @@ // Put at the beginning of a C++ file that needs its own log partition #define SETUP_LOG() static LogPartition logPartition(__FILE__) +#define SETUP_NLOG(x) static LogPartition logPartition(x) // Standard conditional log #define cLog(x) if (!logPartition.doLog(x)) do {} while (0); else Log(x, logPartition)