Merge branch 'master' of github.com:jedmccaleb/NewCoin

This commit is contained in:
Arthur Britto
2013-01-28 21:14:25 -08:00
7 changed files with 47 additions and 27 deletions

View File

@@ -9,9 +9,14 @@
#include <boost/foreach.hpp> #include <boost/foreach.hpp>
#include <boost/bind.hpp> #include <boost/bind.hpp>
#include "../ripple/JobQueue.h"
#include "../ripple/Log.h"
SETUP_NLOG("DataBase");
using namespace std; using namespace std;
SqliteDatabase::SqliteDatabase(const char* host) : Database(host,"",""), walRunning(false) SqliteDatabase::SqliteDatabase(const char* host) : Database(host,"",""), mWalQ(NULL), walRunning(false)
{ {
mConnection = NULL; mConnection = NULL;
mCurrentStmt = NULL; mCurrentStmt = NULL;
@@ -22,7 +27,7 @@ void SqliteDatabase::connect()
int rc = sqlite3_open(mHost.c_str(), &mConnection); int rc = sqlite3_open(mHost.c_str(), &mConnection);
if (rc) if (rc)
{ {
cout << "Can't open database: " << mHost << " " << rc << endl; cLog(lsFATAL) << "Can't open database: " << mHost << " " << rc;
sqlite3_close(mConnection); sqlite3_close(mConnection);
assert((rc != SQLITE_BUSY) && (rc != SQLITE_LOCKED)); assert((rc != SQLITE_BUSY) && (rc != SQLITE_LOCKED));
} }
@@ -46,9 +51,9 @@ bool SqliteDatabase::executeSQL(const char* sql, bool fail_ok)
if (!fail_ok) if (!fail_ok)
{ {
#ifdef DEBUG #ifdef DEBUG
cout << "SQL Perror:" << rc << endl; cLog(lsWARNING) << "SQL Perror:" << rc;
cout << "Statement: " << sql << endl; cLog(lsWARNING) << "Statement: " << sql;
cout << "Error: " << sqlite3_errmsg(mConnection) << endl; cLog(lsWARNING) << "Error: " << sqlite3_errmsg(mConnection);
#endif #endif
} }
return false; return false;
@@ -70,9 +75,9 @@ bool SqliteDatabase::executeSQL(const char* sql, bool fail_ok)
if (!fail_ok) if (!fail_ok)
{ {
#ifdef DEBUG #ifdef DEBUG
cout << "SQL Serror:" << rc << endl; cLog(lsWARNING) << "SQL Serror:" << rc;
cout << "Statement: " << sql << endl; cLog(lsWARNING) << "Statement: " << sql;
cout << "Error: " << sqlite3_errmsg(mConnection) << endl; cLog(lsWARNING) << "Error: " << sqlite3_errmsg(mConnection);
#endif #endif
} }
return false; return false;
@@ -130,7 +135,7 @@ bool SqliteDatabase::getNextRow()
else else
{ {
assert((rc != SQLITE_BUSY) && (rc != SQLITE_LOCKED)); assert((rc != SQLITE_BUSY) && (rc != SQLITE_LOCKED));
cout << "SQL Rerror:" << rc << endl; cLog(lsWARNING) << "SQL Rerror:" << rc;
return(false); return(false);
} }
} }
@@ -194,27 +199,32 @@ static int SqliteWALHook(void *s, sqlite3* dbCon, const char *dbName, int walSiz
return SQLITE_OK; return SQLITE_OK;
} }
bool SqliteDatabase::setupCheckpointing() bool SqliteDatabase::setupCheckpointing(JobQueue *q)
{ {
mWalQ = q;
sqlite3_wal_hook(mConnection, SqliteWALHook, this); sqlite3_wal_hook(mConnection, SqliteWALHook, this);
return true; return true;
} }
void SqliteDatabase::doHook(const char *db, int pages) void SqliteDatabase::doHook(const char *db, int pages)
{ {
if (pages < 256) if (pages < 512)
return; return;
boost::mutex::scoped_lock sl(walMutex); boost::mutex::scoped_lock sl(walMutex);
if (walDBs.insert(db).second && !walRunning) if (walDBs.insert(db).second && !walRunning)
{ {
walRunning = true; walRunning = true;
boost::thread(boost::bind(&SqliteDatabase::runWal, this)).detach(); if (mWalQ)
mWalQ->addJob(jtWAL, boost::bind(&SqliteDatabase::runWal, this));
else
boost::thread(boost::bind(&SqliteDatabase::runWal, this)).detach();
} }
} }
void SqliteDatabase::runWal() void SqliteDatabase::runWal()
{ {
std::set<std::string> walSet; std::set<std::string> walSet;
std::string name = sqlite3_db_filename(mConnection, "main");
while (1) while (1)
{ {
@@ -231,10 +241,14 @@ void SqliteDatabase::runWal()
BOOST_FOREACH(const std::string& db, walSet) BOOST_FOREACH(const std::string& db, walSet)
{ {
int log, ckpt; int log, ckpt;
sqlite3_wal_checkpoint_v2(mConnection, db.c_str(), SQLITE_CHECKPOINT_PASSIVE, &log, &ckpt); int ret = sqlite3_wal_checkpoint_v2(mConnection, db.c_str(), SQLITE_CHECKPOINT_PASSIVE, &log, &ckpt);
if (ret != SQLITE_OK)
{
cLog((ret == SQLITE_LOCKED) ? lsDEBUG : lsWARNING) << "WAL "
<< sqlite3_db_filename(mConnection, "main") << " / " << db << " errror " << ret;
}
} }
walSet.clear(); walSet.clear();
} }
} }

View File

@@ -16,6 +16,7 @@ class SqliteDatabase : public Database
bool mMoreRows; bool mMoreRows;
boost::mutex walMutex; boost::mutex walMutex;
JobQueue* mWalQ;
std::set<std::string> walDBs; std::set<std::string> walDBs;
bool walRunning; bool walRunning;
@@ -51,7 +52,7 @@ public:
uint64 getBigInt(int colIndex); uint64 getBigInt(int colIndex);
sqlite3* peekConnection() { return mConnection; } sqlite3* peekConnection() { return mConnection; }
virtual bool setupCheckpointing(); virtual bool setupCheckpointing(JobQueue*);
virtual SqliteDatabase* getSqliteDB() { return this; } virtual SqliteDatabase* getSqliteDB() { return this; }
void runWal(); void runWal();

View File

@@ -18,6 +18,7 @@
*/ */
class SqliteDatabase; class SqliteDatabase;
class JobQueue;
class Database class Database
{ {
@@ -86,8 +87,8 @@ public:
// float getSingleDBValueFloat(const char* sql); // float getSingleDBValueFloat(const char* sql);
// char* getSingleDBValueStr(const char* sql, std::string& retStr); // char* getSingleDBValueStr(const char* sql, std::string& retStr);
virtual bool setupCheckpointing() { return false; } virtual bool setupCheckpointing(JobQueue*) { return false; }
virtual SqliteDatabase* getSqliteDB() { return NULL; } virtual SqliteDatabase* getSqliteDB() { return NULL; }
}; };
#endif #endif

View File

@@ -62,10 +62,10 @@ void Application::stop()
{ {
cLog(lsINFO) << "Received shutdown request"; cLog(lsINFO) << "Received shutdown request";
mIOService.stop(); mIOService.stop();
mJobQueue.shutdown();
mHashedObjectStore.bulkWrite(); mHashedObjectStore.bulkWrite();
mValidations.flush(); mValidations.flush();
mAuxService.stop(); mAuxService.stop();
mJobQueue.shutdown();
cLog(lsINFO) << "Stopped: " << mIOService.stopped(); cLog(lsINFO) << "Stopped: " << mIOService.stopped();
Instance::shutdown(); Instance::shutdown();
@@ -122,9 +122,9 @@ void Application::setup()
boost::thread t5(boost::bind(&InitDB, &mHashNodeDB, "hashnode.db", HashNodeDBInit, HashNodeDBCount)); boost::thread t5(boost::bind(&InitDB, &mHashNodeDB, "hashnode.db", HashNodeDBInit, HashNodeDBCount));
boost::thread t6(boost::bind(&InitDB, &mNetNodeDB, "netnode.db", NetNodeDBInit, NetNodeDBCount)); boost::thread t6(boost::bind(&InitDB, &mNetNodeDB, "netnode.db", NetNodeDBInit, NetNodeDBCount));
t1.join(); t2.join(); t3.join(); t4.join(); t5.join(); t6.join(); t1.join(); t2.join(); t3.join(); t4.join(); t5.join(); t6.join();
mTxnDB->getDB()->setupCheckpointing(); mTxnDB->getDB()->setupCheckpointing(&mJobQueue);
mLedgerDB->getDB()->setupCheckpointing(); mLedgerDB->getDB()->setupCheckpointing(&mJobQueue);
mHashNodeDB->getDB()->setupCheckpointing(); mHashNodeDB->getDB()->setupCheckpointing(&mJobQueue);
if (theConfig.START_UP == Config::FRESH) if (theConfig.START_UP == Config::FRESH)
{ {

View File

@@ -16,6 +16,7 @@ JobQueue::JobQueue() : mLastJob(0), mThreadCount(0), mShuttingDown(false)
mJobLoads[jtTRANSACTION].setTargetLatency(250, 1000); mJobLoads[jtTRANSACTION].setTargetLatency(250, 1000);
mJobLoads[jtPROPOSAL_ut].setTargetLatency(500, 1250); mJobLoads[jtPROPOSAL_ut].setTargetLatency(500, 1250);
mJobLoads[jtPUBLEDGER].setTargetLatency(1000, 2500); mJobLoads[jtPUBLEDGER].setTargetLatency(1000, 2500);
mJobLoads[jtWAL].setTargetLatency(1000, 2500);
mJobLoads[jtVALIDATION_t].setTargetLatency(500, 1500); mJobLoads[jtVALIDATION_t].setTargetLatency(500, 1500);
mJobLoads[jtWRITE].setTargetLatency(750, 1500); mJobLoads[jtWRITE].setTargetLatency(750, 1500);
mJobLoads[jtTRANSACTION_l].setTargetLatency(100, 500); mJobLoads[jtTRANSACTION_l].setTargetLatency(100, 500);
@@ -41,6 +42,7 @@ const char* Job::toString(JobType t)
case jtTRANSACTION: return "transaction"; case jtTRANSACTION: return "transaction";
case jtPUBLEDGER: return "publishLedger"; case jtPUBLEDGER: return "publishLedger";
case jtVALIDATION_t: return "trustedValidation"; case jtVALIDATION_t: return "trustedValidation";
case jtWAL: return "writeAhead";
case jtWRITE: return "writeObjects"; case jtWRITE: return "writeObjects";
case jtTRANSACTION_l: return "localTransaction"; case jtTRANSACTION_l: return "localTransaction";
case jtPROPOSAL_t: return "trustedProposal"; case jtPROPOSAL_t: return "trustedProposal";

View File

@@ -27,12 +27,13 @@ enum JobType
jtCLIENT = 4, // A websocket command from the client jtCLIENT = 4, // A websocket command from the client
jtTRANSACTION = 5, // A transaction received from the network jtTRANSACTION = 5, // A transaction received from the network
jtPUBLEDGER = 6, // Publish a fully-accepted ledger jtPUBLEDGER = 6, // Publish a fully-accepted ledger
jtVALIDATION_t = 7, // A validation from a trusted source jtWAL = 7, // Write-ahead logging
jtWRITE = 8, // Write out hashed objects jtVALIDATION_t = 8, // A validation from a trusted source
jtTRANSACTION_l = 9, // A local transaction jtWRITE = 9, // Write out hashed objects
jtPROPOSAL_t = 10, // A proposal from a trusted source jtTRANSACTION_l = 10, // A local transaction
jtADMIN = 11, // An administrative operation jtPROPOSAL_t = 11, // A proposal from a trusted source
jtDEATH = 12, // job of death, used internally jtADMIN = 12, // An administrative operation
jtDEATH = 13, // job of death, used internally
// special types not dispatched by the job pool // special types not dispatched by the job pool
jtPEER = 17, jtPEER = 17,

View File

@@ -15,6 +15,7 @@
// Put at the beginning of a C++ file that needs its own log partition // Put at the beginning of a C++ file that needs its own log partition
#define SETUP_LOG() static LogPartition logPartition(__FILE__) #define SETUP_LOG() static LogPartition logPartition(__FILE__)
#define SETUP_NLOG(x) static LogPartition logPartition(x)
// Standard conditional log // Standard conditional log
#define cLog(x) if (!logPartition.doLog(x)) do {} while (0); else Log(x, logPartition) #define cLog(x) if (!logPartition.doLog(x)) do {} while (0); else Log(x, logPartition)