Do WAL checkpointing in our own thread(s).

This commit is contained in:
JoelKatz
2013-01-11 18:45:16 -08:00
parent b21e751ce7
commit 0d49bc877e
4 changed files with 80 additions and 1 deletions

View File

@@ -1,12 +1,17 @@
#include "SqliteDatabase.h" #include "SqliteDatabase.h"
#include "sqlite3.h" #include "sqlite3.h"
#include <string.h> #include <string.h>
#include <stdio.h> #include <stdio.h>
#include <iostream> #include <iostream>
#include <boost/thread.hpp>
#include <boost/foreach.hpp>
#include <boost/bind.hpp>
using namespace std; using namespace std;
SqliteDatabase::SqliteDatabase(const char* host) : Database(host,"","") SqliteDatabase::SqliteDatabase(const char* host) : Database(host,"",""), walRunning(false)
{ {
mConnection = NULL; mConnection = NULL;
mCurrentStmt = NULL; mCurrentStmt = NULL;
@@ -182,4 +187,56 @@ uint64 SqliteDatabase::getBigInt(int colIndex)
return(sqlite3_column_int64(mCurrentStmt, colIndex)); return(sqlite3_column_int64(mCurrentStmt, colIndex));
} }
static int SqliteWALHook(void *s, sqlite3* dbCon, const char *dbName, int walSize)
{
(reinterpret_cast<SqliteDatabase*>(s))->doHook(dbName, walSize);
return SQLITE_OK;
}
bool SqliteDatabase::setupCheckpointing()
{
sqlite3_wal_hook(mConnection, SqliteWALHook, this);
return true;
}
void SqliteDatabase::doHook(const char *db, int pages)
{
if (pages < 256)
return;
boost::mutex::scoped_lock sl(walMutex);
if (walDBs.insert(db).second && !walRunning)
{
walRunning = true;
boost::thread(boost::bind(&SqliteDatabase::runWal, this)).detach();
}
}
void SqliteDatabase::runWal()
{
std::set<std::string> walSet;
while (1)
{
{
boost::mutex::scoped_lock sl(walMutex);
walDBs.swap(walSet);
if (walSet.empty())
{
walRunning = false;
return;
}
}
BOOST_FOREACH(const std::string& db, walSet)
{
int log, ckpt;
sqlite3_wal_checkpoint_v2(mConnection, db.c_str(), SQLITE_CHECKPOINT_PASSIVE, &log, &ckpt);
std::cerr << "Checkpoint " << db << ": " << log << " of " << ckpt << std::endl;
}
walSet.clear();
}
}
// vim:ts=4 // vim:ts=4

View File

@@ -1,13 +1,24 @@
#include "database.h" #include "database.h"
#include <string>
#include <set>
#include <boost/thread/mutex.hpp>
struct sqlite3; struct sqlite3;
struct sqlite3_stmt; struct sqlite3_stmt;
class SqliteDatabase : public Database class SqliteDatabase : public Database
{ {
sqlite3* mConnection; sqlite3* mConnection;
sqlite3_stmt* mCurrentStmt; sqlite3_stmt* mCurrentStmt;
bool mMoreRows; bool mMoreRows;
boost::mutex walMutex;
std::set<std::string> walDBs;
bool walRunning;
public: public:
SqliteDatabase(const char* host); SqliteDatabase(const char* host);
@@ -38,6 +49,12 @@ public:
int getBinary(int colIndex,unsigned char* buf,int maxSize); int getBinary(int colIndex,unsigned char* buf,int maxSize);
std::vector<unsigned char> getBinary(int colIndex); std::vector<unsigned char> getBinary(int colIndex);
uint64 getBigInt(int colIndex); uint64 getBigInt(int colIndex);
sqlite3* peekConnection() { return mConnection; }
virtual bool setupCheckpointing();
void runWal();
void doHook(const char *db, int walSize);
}; };
// vim:ts=4 // vim:ts=4

View File

@@ -82,6 +82,8 @@ public:
// int getSingleDBValueInt(const char* sql); // int getSingleDBValueInt(const char* sql);
// float getSingleDBValueFloat(const char* sql); // float getSingleDBValueFloat(const char* sql);
// char* getSingleDBValueStr(const char* sql, std::string& retStr); // char* getSingleDBValueStr(const char* sql, std::string& retStr);
virtual bool setupCheckpointing() { return false; }
}; };
#endif #endif

View File

@@ -118,6 +118,9 @@ void Application::setup()
boost::thread t5(boost::bind(&InitDB, &mHashNodeDB, "hashnode.db", HashNodeDBInit, HashNodeDBCount)); boost::thread t5(boost::bind(&InitDB, &mHashNodeDB, "hashnode.db", HashNodeDBInit, HashNodeDBCount));
boost::thread t6(boost::bind(&InitDB, &mNetNodeDB, "netnode.db", NetNodeDBInit, NetNodeDBCount)); boost::thread t6(boost::bind(&InitDB, &mNetNodeDB, "netnode.db", NetNodeDBInit, NetNodeDBCount));
t1.join(); t2.join(); t3.join(); t4.join(); t5.join(); t6.join(); t1.join(); t2.join(); t3.join(); t4.join(); t5.join(); t6.join();
mTxnDB->getDB()->setupCheckpointing();
mLedgerDB->getDB()->setupCheckpointing();
mHashNodeDB->getDB()->setupCheckpointing();
if (theConfig.START_UP == Config::FRESH) if (theConfig.START_UP == Config::FRESH)
{ {