Merge branch 'ws_dispatch'

This commit is contained in:
JoelKatz
2013-05-08 11:00:54 -07:00
14 changed files with 163 additions and 75 deletions

View File

@@ -1,16 +1,14 @@
To use LevelDB, follow these steps:
1) In the SConstruct file, change
LevelDB = bool(0)
to
LevelDB = bool(1)
1) Add the following to your rippled.cfg file:
[node_db]
LevelDB
2) Compile with 'scons'.
3) If you have no databases you care about, remove them. Otherwise, start
rippled with '--import' to import your existing hashnode database into
LevelDB. After you import the hashnode database, either delete (or move) the
2) If you have no databases, you can just start rippled. If you have
databases you don't care about, just remove them. Otherwise, start rippled
with '--import' to import your existing hashnode database into LevelDB.
After you import the hashnode database, either delete (or move) the
'hashnode.db', 'hashnode.db-shm' and 'hashnode.db-wal' files from your
database directory. Do not touch the LevelDB files in the 'hashnode'
directory.

View File

@@ -8,7 +8,7 @@ import glob
import platform
import re
LevelDB = bool(0)
LevelDB = bool(1)
OSX = bool(platform.mac_ver()[0])
FreeBSD = bool('FreeBSD' == platform.system())

View File

@@ -222,6 +222,12 @@
# Examples: RASH BUSH MILK LOOK BAD BRIM AVID GAFF BAIT ROT POD LOVE
# shfArahZT9Q9ckTf3s1psJ7C7qzVN
#
# [node_db]
# Sets the database used for hashed nodes. Options are "LevelDB" and
# "SQLite". LevelDB is recommended for optimum performance and SQLite
# support will soon be removed. Databases can be migrated by enabling
# "LevelDB" and starting rippled with the "--import" flag
#
# [node_size]
# Tunes the servers based on the expected load and available memory. Legal
# sizes are "tiny", "small", "medium", "large", and "huge". We recommend

View File

@@ -62,6 +62,9 @@ Application::Application() :
mRpcDB(NULL), mTxnDB(NULL), mLedgerDB(NULL), mWalletDB(NULL),
mNetNodeDB(NULL), mPathFindDB(NULL), mHashNodeDB(NULL),
#ifdef USE_LEVELDB
mHashNodeLDB(NULL),
#endif
mConnectionPool(mIOService), mPeerDoor(NULL), mRPCDoor(NULL), mWSPublicDoor(NULL), mWSPrivateDoor(NULL),
mSweepTimer(mAuxService), mShutdown(false)
{
@@ -87,8 +90,8 @@ void Application::stop()
mJobQueue.shutdown();
#ifdef HAVE_LEVELDB
delete mHashNodeDB:
mHashNodeDB = NULL;
delete mHashNodeLDB:
mHashNodeLDB = NULL;
#endif
cLog(lsINFO) << "Stopped: " << mIOService.stopped();
@@ -167,21 +170,28 @@ void Application::setup()
t4.join(); t6.join(); t7.join();
#ifdef USE_LEVELDB
leveldb::Options options;
options.create_if_missing = true;
options.block_cache = leveldb::NewLRUCache(theConfig.getSize(siHashNodeDBCache) * 1024 * 1024);
if (theConfig.NODE_SIZE >= 2)
options.filter_policy = leveldb::NewBloomFilterPolicy(10);
leveldb::Status status = leveldb::DB::Open(options, (theConfig.DATA_DIR / "hashnode").string(), &mHashNodeDB);
if (!status.ok() || !mHashNodeDB)
if (mHashedObjectStore.isLevelDB())
{
cLog(lsFATAL) << "Unable to open/create hash node db";
exit(3);
cLog(lsFATAL) << "LDB";
leveldb::Options options;
options.create_if_missing = true;
options.block_cache = leveldb::NewLRUCache(theConfig.getSize(siHashNodeDBCache) * 1024 * 1024);
if (theConfig.NODE_SIZE >= 2)
options.filter_policy = leveldb::NewBloomFilterPolicy(10);
leveldb::Status status = leveldb::DB::Open(options, (theConfig.DATA_DIR / "hashnode").string(), &mHashNodeLDB);
if (!status.ok() || !mHashNodeLDB)
{
cLog(lsFATAL) << "Unable to open/create hash node db";
exit(3);
}
}
#else
boost::thread t5(boost::bind(&InitDB, &mHashNodeDB, "hashnode.db", HashNodeDBInit, HashNodeDBCount));
t5.join();
else
#endif
{
cLog(lsFATAL) << "SQLite";
boost::thread t5(boost::bind(&InitDB, &mHashNodeDB, "hashnode.db", HashNodeDBInit, HashNodeDBCount));
t5.join();
}
mTxnDB->getDB()->setupCheckpointing(&mJobQueue);
mLedgerDB->getDB()->setupCheckpointing(&mJobQueue);
@@ -233,10 +243,11 @@ void Application::setup()
mLedgerMaster.tune(theConfig.getSize(siLedgerSize), theConfig.getSize(siLedgerAge));
mLedgerMaster.setMinValidations(theConfig.VALIDATION_QUORUM);
#ifndef USE_LEVELDB
theApp->getHashNodeDB()->getDB()->executeSQL(boost::str(boost::format("PRAGMA cache_size=-%d;") %
(theConfig.getSize(siHashNodeDBCache) * 1024)));
#ifdef USE_LEVELDB
if (!mHashedObjectStore.isLevelDB())
#endif
theApp->getHashNodeDB()->getDB()->executeSQL(boost::str(boost::format("PRAGMA cache_size=-%d;") %
(theConfig.getSize(siHashNodeDBCache) * 1024)));
theApp->getLedgerDB()->getDB()->executeSQL(boost::str(boost::format("PRAGMA cache_size=-%d;") %
(theConfig.getSize(siTxnDBCache) * 1024)));
@@ -398,6 +409,9 @@ Application::~Application()
delete mHashNodeDB;
delete mNetNodeDB;
delete mPathFindDB;
#ifdef USE_LEVELDB
delete mHashNodeLDB;
#endif
}
void Application::startNewLedger()

View File

@@ -79,12 +79,10 @@ class Application
FeeVote mFeeVote;
FeatureTable mFeatureTable;
DatabaseCon *mRpcDB, *mTxnDB, *mLedgerDB, *mWalletDB, *mNetNodeDB, *mPathFindDB;
DatabaseCon *mRpcDB, *mTxnDB, *mLedgerDB, *mWalletDB, *mNetNodeDB, *mPathFindDB, *mHashNodeDB;
#ifdef USE_LEVELDB
leveldb::DB *mHashNodeDB;
#else
DatabaseCon *mHashNodeDB;
leveldb::DB *mHashNodeLDB;
#endif
ConnectionPool mConnectionPool;
@@ -154,11 +152,10 @@ public:
DatabaseCon* getWalletDB() { return mWalletDB; }
DatabaseCon* getNetNodeDB() { return mNetNodeDB; }
DatabaseCon* getPathFindDB() { return mPathFindDB; }
DatabaseCon* getHashNodeDB() { return mHashNodeDB; }
#ifdef USE_LEVELDB
leveldb::DB* getHashNodeDB() { return mHashNodeDB; }
#else
DatabaseCon* getHashNodeDB() { return mHashNodeDB; }
leveldb::DB* getHashNodeLDB() { return mHashNodeLDB; }
#endif
uint256 getNonce256() { return mNonce256; }

View File

@@ -23,6 +23,7 @@
#define SECTION_FEE_OPERATION "fee_operation"
#define SECTION_FEE_ACCOUNT_RESERVE "fee_account_reserve"
#define SECTION_FEE_OWNER_RESERVE "fee_owner_reserve"
#define SECTION_NODE_DB "node_db"
#define SECTION_LEDGER_HISTORY "ledger_history"
#define SECTION_IPS "ips"
#define SECTION_NETWORK_QUORUM "network_quorum"
@@ -248,6 +249,8 @@ Config::Config()
SSL_VERIFY = true;
NODE_DB = "sqlite";
LDB_IMPORT = false;
RUN_STANDALONE = false;
START_UP = NORMAL;
@@ -330,6 +333,7 @@ void Config::load()
if (sectionSingleB(secConfig, SECTION_DATABASE_PATH, DATABASE_PATH))
DATA_DIR = DATABASE_PATH;
(void) sectionSingleB(secConfig, SECTION_VALIDATORS_SITE, VALIDATORS_SITE);
(void) sectionSingleB(secConfig, SECTION_PEER_IP, PEER_IP);
@@ -351,6 +355,7 @@ void Config::load()
(void) sectionSingleB(secConfig, SECTION_RPC_IP, RPC_IP);
(void) sectionSingleB(secConfig, SECTION_RPC_PASSWORD, RPC_PASSWORD);
(void) sectionSingleB(secConfig, SECTION_RPC_USER, RPC_USER);
(void) sectionSingleB(secConfig, SECTION_NODE_DB, NODE_DB);
if (sectionSingleB(secConfig, SECTION_RPC_PORT, strTemp))
RPC_PORT = boost::lexical_cast<int>(strTemp);

View File

@@ -88,6 +88,7 @@ public:
boost::filesystem::path DATA_DIR;
boost::filesystem::path DEBUG_LOGFILE;
boost::filesystem::path VALIDATORS_FILE; // As specifed in rippled.cfg.
std::string NODE_DB; // Database to use for nodes
bool LDB_IMPORT; // Import into LevelDB
std::string VALIDATORS_SITE; // Where to find validators.txt on the Internet.

View File

@@ -19,9 +19,27 @@ DECLARE_INSTANCE(HashedObject);
HashedObjectStore::HashedObjectStore(int cacheSize, int cacheAge) :
mCache("HashedObjectStore", cacheSize, cacheAge), mNegativeCache("HashedObjectNegativeCache", 0, 120),
mWriteGeneration(0), mWritePending(false)
mWriteGeneration(0), mWritePending(false), mLevelDB(false)
{
mWriteSet.reserve(128);
if (theConfig.NODE_DB == "leveldb" || theConfig.NODE_DB == "LevelDB")
mLevelDB = true;
else if (theConfig.NODE_DB == "SQLite" || theConfig.NODE_DB == "sqlite")
mLevelDB = false;
else
{
cLog(lsFATAL) << "Incorrect database selection";
assert(false);
}
#ifndef USE_LEVELDB
if (mLevelDB)
{
cLog(lsFATAL) << "LevelDB has been selected but not compiled";
assert(false);
}
#endif
}
void HashedObjectStore::tune(int size, int age)
@@ -32,10 +50,10 @@ void HashedObjectStore::tune(int size, int age)
#ifdef USE_LEVELDB
bool HashedObjectStore::store(HashedObjectType type, uint32 index,
bool HashedObjectStore::storeLevelDB(HashedObjectType type, uint32 index,
const std::vector<unsigned char>& data, const uint256& hash)
{ // return: false = already in cache, true = added to cache
if (!theApp->getHashNodeDB())
if (!theApp->getHashNodeLDB())
{
cLog(lsWARNING) << "HOS: no db";
return true;
@@ -60,7 +78,7 @@ bool HashedObjectStore::store(HashedObjectType type, uint32 index,
*(bufPtr + 8) = static_cast<unsigned char>(type);
memcpy(bufPtr + 9, &data.front(), data.size());
leveldb::Status st = theApp->getHashNodeDB()->Put(leveldb::WriteOptions(),
leveldb::Status st = theApp->getHashNodeLDB()->Put(leveldb::WriteOptions(),
leveldb::Slice(reinterpret_cast<const char *>(hash.begin()), hash.size()),
leveldb::Slice(reinterpret_cast<const char *>(bufPtr), 9 + data.size()));
if (!st.ok())
@@ -74,17 +92,13 @@ bool HashedObjectStore::store(HashedObjectType type, uint32 index,
return true;
}
void HashedObjectStore::waitWrite()
{
}
HashedObject::pointer HashedObjectStore::retrieve(const uint256& hash)
HashedObject::pointer HashedObjectStore::retrieveLevelDB(const uint256& hash)
{
HashedObject::pointer obj = mCache.fetch(hash);
if (obj)
return obj;
if (!theApp || !theApp->getHashNodeDB())
if (!theApp || !theApp->getHashNodeLDB())
{
cLog(lsWARNING) << "HOS: no db";
return obj;
@@ -92,7 +106,7 @@ HashedObject::pointer HashedObjectStore::retrieve(const uint256& hash)
LoadEvent::autoptr event(theApp->getJobQueue().getLoadEventAP(jtHO_READ, "HOS::retrieve"));
std::string sData;
leveldb::Status st = theApp->getHashNodeDB()->Get(leveldb::ReadOptions(),
leveldb::Status st = theApp->getHashNodeLDB()->Get(leveldb::ReadOptions(),
leveldb::Slice(reinterpret_cast<const char *>(hash.begin()), hash.size()), &sData);
if (!st.ok())
{
@@ -112,9 +126,9 @@ HashedObject::pointer HashedObjectStore::retrieve(const uint256& hash)
return obj;
}
#else
#endif
bool HashedObjectStore::store(HashedObjectType type, uint32 index,
bool HashedObjectStore::storeSQLite(HashedObjectType type, uint32 index,
const std::vector<unsigned char>& data, const uint256& hash)
{ // return: false = already in cache, true = added to cache
if (!theApp->getHashNodeDB())
@@ -150,6 +164,8 @@ bool HashedObjectStore::store(HashedObjectType type, uint32 index,
void HashedObjectStore::waitWrite()
{
if (mLevelDB)
return;
boost::mutex::scoped_lock sl(mWriteMutex);
int gen = mWriteGeneration;
while (mWritePending && (mWriteGeneration == gen))
@@ -158,6 +174,7 @@ void HashedObjectStore::waitWrite()
void HashedObjectStore::bulkWrite()
{
assert(!mLevelDB);
while (1)
{
std::vector< boost::shared_ptr<HashedObject> > set;
@@ -255,7 +272,7 @@ void HashedObjectStore::bulkWrite()
}
}
HashedObject::pointer HashedObjectStore::retrieve(const uint256& hash)
HashedObject::pointer HashedObjectStore::retrieveSQLite(const uint256& hash)
{
HashedObject::pointer obj = mCache.fetch(hash);
if (obj)
@@ -347,7 +364,7 @@ HashedObject::pointer HashedObjectStore::retrieve(const uint256& hash)
return obj;
}
#endif
#ifdef USE_LEVELDB
int HashedObjectStore::import(const std::string& file, bool checkHashes)
{
@@ -413,4 +430,6 @@ int HashedObjectStore::import(const std::string& file, bool checkHashes)
return count;
}
#endif
// vim:ts=4

View File

@@ -58,15 +58,42 @@ protected:
std::vector< boost::shared_ptr<HashedObject> > mWriteSet;
bool mWritePending;
bool mLevelDB;
public:
HashedObjectStore(int cacheSize, int cacheAge);
bool store(HashedObjectType type, uint32 index, const std::vector<unsigned char>& data,
const uint256& hash);
bool isLevelDB() { return mLevelDB; }
HashedObject::pointer retrieve(const uint256& hash);
bool store(HashedObjectType type, uint32 index, const std::vector<unsigned char>& data,
const uint256& hash)
{
#ifdef USE_LEVELDB
if (mLevelDB)
return storeLevelDB(type, index, data, hash);
#endif
return storeSQLite(type, index, data, hash);
}
HashedObject::pointer retrieve(const uint256& hash)
{
#ifdef USE_LEVELDB
if (mLevelDB)
return retrieveLevelDB(hash);
#endif
return retrieveSQLite(hash);
}
bool storeSQLite(HashedObjectType type, uint32 index, const std::vector<unsigned char>& data,
const uint256& hash);
HashedObject::pointer retrieveSQLite(const uint256& hash);
#ifdef USE_LEVELDB
bool storeLevelDB(HashedObjectType type, uint32 index, const std::vector<unsigned char>& data,
const uint256& hash);
HashedObject::pointer retrieveLevelDB(const uint256& hash);
#endif
void bulkWrite();
void waitWrite();

View File

@@ -2159,11 +2159,12 @@ Json::Value RPCHandler::doGetCounts(Json::Value jvRequest, int& cost, ScopedLock
if (dbKB > 0)
ret["dbKBLedger"] = dbKB;
#ifndef USE_LEVELDB
dbKB = theApp->getHashNodeDB()->getDB()->getKBUsedDB();
if (dbKB > 0)
ret["dbKBHashNode"] = dbKB;
#endif
if (!theApp->getHashedObjectStore().isLevelDB())
{
dbKB = theApp->getHashNodeDB()->getDB()->getKBUsedDB();
if (dbKB > 0)
ret["dbKBHashNode"] = dbKB;
}
dbKB = theApp->getTxnDB()->getDB()->getKBUsedDB();
if (dbKB > 0)

View File

@@ -108,20 +108,23 @@ void Application::updateTables(bool ldbImport)
addTxnSeqField();
#ifdef USE_LEVELDB
boost::filesystem::path hashPath = theConfig.DATA_DIR / "hashnode.db";
if (boost::filesystem::exists(hashPath))
if (theApp->getHashedObjectStore().isLevelDB())
{
if (theConfig.LDB_IMPORT)
boost::filesystem::path hashPath = theConfig.DATA_DIR / "hashnode.db";
if (boost::filesystem::exists(hashPath))
{
Log(lsWARNING) << "Importing SQLite -> LevelDB";
theApp->getHashedObjectStore().import(hashPath.string(), true);
Log(lsWARNING) << "Remove or remname the hashnode.db file";
}
else
{
Log(lsWARNING) << "SQLite hashnode database exists. Please either remove or import";
Log(lsWARNING) << "To import, start with the '--import' option. Otherwise, remove hashnode.db";
exit(1);
if (theConfig.LDB_IMPORT)
{
Log(lsWARNING) << "Importing SQLite -> LevelDB";
theApp->getHashedObjectStore().import(hashPath.string(), true);
Log(lsWARNING) << "Remove or remname the hashnode.db file";
}
else
{
Log(lsWARNING) << "SQLite hashnode database exists. Please either remove or import";
Log(lsWARNING) << "To import, start with the '--import' option. Otherwise, remove hashnode.db";
exit(1);
}
}
}
#endif

View File

@@ -6,7 +6,7 @@
#define SERVER_VERSION_MAJOR 0
#define SERVER_VERSION_MINOR 9
#define SERVER_VERSION_SUB "-a"
#define SERVER_VERSION_SUB "-b"
#define SERVER_NAME "Ripple"
#define SV_STRINGIZE(x) SV_STRINGIZE2(x)

View File

@@ -101,6 +101,7 @@ public:
if (theApp->getLoadManager().shouldCutoff(mLoadSource))
{
#if SHOULD_DISCONNECT
// FIXME: Must dispatch to strand
connection_ptr ptr = mConnection.lock();
if (ptr)
ptr->close(websocketpp::close::status::PROTOCOL_ERROR, "overload");
@@ -199,9 +200,13 @@ public:
void setPingTimer()
{
mPingTimer.expires_from_now(boost::posix_time::seconds(WEBSOCKET_PING_FREQUENCY));
mPingTimer.async_wait(boost::bind(
&WSConnection<endpoint_type>::pingTimer, mConnection, mHandler, boost::asio::placeholders::error));
connection_ptr ptr = mConnection.lock();
if (ptr)
{
mPingTimer.expires_from_now(boost::posix_time::seconds(WEBSOCKET_PING_FREQUENCY));
mPingTimer.async_wait(ptr->get_strand().wrap(boost::bind(
&WSConnection<endpoint_type>::pingTimer, mConnection, mHandler, boost::asio::placeholders::error)));
}
}
void rcvMessage(message_ptr msg, bool& msgRejected, bool& runQueue)

View File

@@ -49,7 +49,7 @@ public:
bool getPublic() { return mPublic; };
void send(connection_ptr cpClient, message_ptr mpMessage)
static void ssend(connection_ptr cpClient, message_ptr mpMessage)
{
try
{
@@ -61,7 +61,7 @@ public:
}
}
void send(connection_ptr cpClient, const std::string& strMessage, bool broadcast)
static void ssend(connection_ptr cpClient, const std::string& strMessage, bool broadcast)
{
try
{
@@ -75,6 +75,18 @@ public:
}
}
void send(connection_ptr cpClient, message_ptr mpMessage)
{
cpClient->get_strand().post(boost::bind(
&WSServerHandler<endpoint_type>::ssend, cpClient, mpMessage));
}
void send(connection_ptr cpClient, const std::string& strMessage, bool broadcast)
{
cpClient->get_strand().post(boost::bind(
&WSServerHandler<endpoint_type>::ssend, cpClient, strMessage, broadcast));
}
void send(connection_ptr cpClient, const Json::Value& jvObj, bool broadcast)
{
Json::FastWriter jfwWriter;