From bda64fac2ad8f2b0aadfe9536427942fc969ee07 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Thu, 7 Jun 2012 12:25:44 -0700 Subject: [PATCH 1/7] Tx set exchange improvements. --- src/LedgerConsensus.cpp | 47 ++++++++++++++++++----------------------- src/LedgerConsensus.h | 4 ++-- src/newcoin.proto | 20 ++++++++++++------ 3 files changed, 36 insertions(+), 35 deletions(-) diff --git a/src/LedgerConsensus.cpp b/src/LedgerConsensus.cpp index 0c14e9bdc4..4232c57e24 100644 --- a/src/LedgerConsensus.cpp +++ b/src/LedgerConsensus.cpp @@ -244,17 +244,15 @@ void LedgerConsensus::mapComplete(const uint256& hash, SHAMap::pointer map) if (!peers.empty()) adjustCount(map, peers); - std::vector hashes; - hashes.push_back(hash); - sendHaveTxSet(hashes); + sendHaveTxSet(hash, true); } -void LedgerConsensus::sendHaveTxSet(const std::vector& hashes) +void LedgerConsensus::sendHaveTxSet(const uint256& hash, bool direct) { - newcoin::TMHaveTransactionSet set; - for (std::vector::const_iterator it = hashes.begin(), end = hashes.end(); it != end; ++it) - set.add_hashes(it->begin(), 256 / 8); - PackedMessage::pointer packet = boost::make_shared(set, newcoin::mtHAVE_SET); + newcoin::TMHaveTransactionSet msg; + msg.set_hash(hash.begin(), 256 / 8); + msg.set_status(direct ? newcoin::tsHAVE : newcoin::tsCAN_GET); + PackedMessage::pointer packet = boost::make_shared(msg, newcoin::mtHAVE_SET); theApp->getConnectionPool().relayMessage(NULL, packet); } @@ -411,9 +409,7 @@ bool LedgerConsensus::updateOurPositions(int sinceClose) uint256 newHash = ourPosition->getHash(); mOurPosition->changePosition(newHash); propose(addedTx, removedTx); - std::vector hashes; - hashes.push_back(newHash); - sendHaveTxSet(hashes); + sendHaveTxSet(newHash, true); } return stable; @@ -535,23 +531,20 @@ bool LedgerConsensus::peerPosition(LedgerProposal::pointer newPosition) return true; } -bool LedgerConsensus::peerHasSet(Peer::pointer peer, const std::vector& sets) +bool LedgerConsensus::peerHasSet(Peer::pointer peer, const uint256& hashSet, newcoin::TxSetStatus status) { - for (std::vector::const_iterator it = sets.begin(), end = sets.end(); it != end; ++it) - { - std::vector< boost::weak_ptr >& set = mPeerData[*it]; - bool found = false; - for (std::vector< boost::weak_ptr >::iterator iit = set.begin(), iend = set.end(); iit != iend; ++iit) - if (iit->lock() == peer) - found = true; - if (!found) - { - set.push_back(peer); - boost::unordered_map::iterator acq = mAcquiring.find(*it); - if (acq != mAcquiring.end()) - acq->second->peerHas(peer); - } - } + if (status != newcoin::tsHAVE) // Indirect requests are for future support + return true; + + std::vector< boost::weak_ptr >& set = mPeerData[hashSet]; + for (std::vector< boost::weak_ptr >::iterator iit = set.begin(), iend = set.end(); iit != iend; ++iit) + if (iit->lock() == peer) + return false; + + set.push_back(peer); + boost::unordered_map::iterator acq = mAcquiring.find(hashSet); + if (acq != mAcquiring.end()) + acq->second->peerHas(peer); return true; } diff --git a/src/LedgerConsensus.h b/src/LedgerConsensus.h index ea77f87aff..820d65a65c 100644 --- a/src/LedgerConsensus.h +++ b/src/LedgerConsensus.h @@ -111,7 +111,7 @@ protected: void addPosition(LedgerProposal&, bool ours); void removePosition(LedgerProposal&, bool ours); - void sendHaveTxSet(const std::vector& txSetHashes); + void sendHaveTxSet(const uint256& set, bool direct); void applyTransactions(SHAMap::pointer transactionSet, Ledger::pointer targetLedger, std::list& failedTransactions); @@ -148,7 +148,7 @@ public: bool peerPosition(LedgerProposal::pointer); - bool peerHasSet(Peer::pointer peer, const std::vector& sets); + bool peerHasSet(Peer::pointer peer, const uint256& set, newcoin::TxSetStatus status); bool peerGaveNodes(Peer::pointer peer, const uint256& setHash, const std::list& nodeIDs, const std::list< std::vector >& nodeData); diff --git a/src/newcoin.proto b/src/newcoin.proto index 5e3a0c4bc7..2e0fa5b678 100644 --- a/src/newcoin.proto +++ b/src/newcoin.proto @@ -108,9 +108,15 @@ message TMProposeSet { repeated bytes removedTransactions = 6; // not required if number is large } -// Announce to a peer that we have fully acquired a transaction set +enum TxSetStatus { + tsHAVE = 1; // We have this set locally + tsCAN_GET = 2; // We have a peer with this set + tsNEED = 3; // We need this set and can't get it +} + message TMHaveTransactionSet { - repeated bytes hashes = 1; + required TxSetStatus status = 1; + required bytes hash = 2; } @@ -221,11 +227,12 @@ enum TMLedgerType { } message TMGetLedger { - optional TMLedgerType ltype = 1; - optional bytes ledgerHash = 2; // Can also be the transaction set hash if liTS_CANDIDATE - optional uint32 ledgerSeq = 3; - required TMLedgerInfoType itype = 4; + required TMLedgerInfoType itype = 1; + optional TMLedgerType ltype = 2; + optional bytes ledgerHash = 3; // Can also be the transaction set hash if liTS_CANDIDATE + optional uint32 ledgerSeq = 4; repeated bytes nodeIDs = 5; + optional uint32 requestCookie = 6; } message TMLedgerData { @@ -233,6 +240,7 @@ message TMLedgerData { required uint32 ledgerSeq = 2; required TMLedgerInfoType type = 3; repeated TMLedgerNode nodes = 4; + optional uint32 requestCookie = 5; } From 6c49630bf67b58287da37dd9ad75fdf4c54b13d9 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Thu, 7 Jun 2012 12:25:55 -0700 Subject: [PATCH 2/7] Tighten timing. --- src/LedgerTiming.h | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/LedgerTiming.h b/src/LedgerTiming.h index 9bde0a248f..0d3bb04056 100644 --- a/src/LedgerTiming.h +++ b/src/LedgerTiming.h @@ -7,16 +7,16 @@ #ifdef LEDGER_CLOSE_FAST // Time between one ledger close and the next ledger close -# define LEDGER_INTERVAL 45 +# define LEDGER_INTERVAL 30 // Time before we take a position -# define LEDGER_WOBBLE_TIME 2 +# define LEDGER_WOBBLE_TIME 1 // Time we expect avalanche to finish -# define LEDGER_CONVERGE 20 +# define LEDGER_CONVERGE 14 // Time we forcibly abort avalanche -# define LEDGER_FORCE_CONVERGE 25 +# define LEDGER_FORCE_CONVERGE 18 #endif From 2f7beb970fbd15964f06bbe2b3046c86452afd7b Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Thu, 7 Jun 2012 12:26:12 -0700 Subject: [PATCH 3/7] Updates to support the new HaveTXSet message. --- src/NetworkOPs.cpp | 4 ++-- src/NetworkOPs.h | 2 +- src/Peer.cpp | 19 +++++++++---------- 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/src/NetworkOPs.cpp b/src/NetworkOPs.cpp index ef517b5d33..85508664fe 100644 --- a/src/NetworkOPs.cpp +++ b/src/NetworkOPs.cpp @@ -500,10 +500,10 @@ bool NetworkOPs::gotTXData(boost::shared_ptr peer, const uint256& hash, return mConsensus->peerGaveNodes(peer, hash, nodeIDs, nodeData); } -bool NetworkOPs::hasTXSet(boost::shared_ptr peer, const std::vector& sets) +bool NetworkOPs::hasTXSet(boost::shared_ptr peer, const uint256& set, newcoin::TxSetStatus status) { if (!mConsensus) return false; - return mConsensus->peerHasSet(peer, sets); + return mConsensus->peerHasSet(peer, set, status); } void NetworkOPs::mapComplete(const uint256& hash, SHAMap::pointer map) diff --git a/src/NetworkOPs.h b/src/NetworkOPs.h index f7fd4c121b..005ef02a97 100644 --- a/src/NetworkOPs.h +++ b/src/NetworkOPs.h @@ -116,7 +116,7 @@ public: bool gotTXData(boost::shared_ptr peer, const uint256& hash, const std::list& nodeIDs, const std::list< std::vector >& nodeData); SHAMap::pointer getTXMap(const uint256& hash); - bool hasTXSet(boost::shared_ptr peer, const std::vector& sets); + bool hasTXSet(boost::shared_ptr peer, const uint256& set, newcoin::TxSetStatus status); void mapComplete(const uint256& hash, SHAMap::pointer map); // network state machine diff --git a/src/Peer.cpp b/src/Peer.cpp index 87b6dc8176..e4ef4938b0 100644 --- a/src/Peer.cpp +++ b/src/Peer.cpp @@ -636,17 +636,16 @@ void Peer::recvPropose(newcoin::TMProposeSet& packet) void Peer::recvHaveTxSet(newcoin::TMHaveTransactionSet& packet) { - std::vector hashes; - for (int i = 0; i < packet.hashes_size(); ++i) + // FIXME: We should have some limit on the number of HaveTxSet messages a peer can send us + // per consensus pass, to keep a peer from running up our memory without limit + uint256 hashes; + if (packet.hash().size() != (256 / 8)) { - if (packet.hashes(i).size() == 32) - { - uint256 hash; - memcpy(hash.begin(), packet.hashes(i).data(), 32); - hashes.push_back(hash); - } + punishPeer(PP_INVALID_REQUEST); + return; } - if (hashes.empty() || !theApp->getOPs().hasTXSet(shared_from_this(), hashes)) + memcpy(hashes.begin(), packet.hash().data(), 32); + if (!theApp->getOPs().hasTXSet(shared_from_this(), hashes, packet.status())) punishPeer(PP_UNWANTED_DATA); } @@ -846,9 +845,9 @@ void Peer::recvGetLedger(newcoin::TMGetLedger& packet) node->set_nodedata(&rawNodeIterator->front(), rawNodeIterator->size()); ++count; } - Log(lsTRACE) << "GetNodeFat: sending " << count << " nodes"; } } + if (packet.has_requestcookie()) reply.set_requestcookie(packet.requestcookie()); PackedMessage::pointer oPacket = boost::make_shared(reply, newcoin::mtLEDGER_DATA); Log(lsTRACE) << "sending reply"; sendPacket(oPacket); From 04b95816525594ee3ba4fedb01b763fbc534cef2 Mon Sep 17 00:00:00 2001 From: Arthur Britto Date: Thu, 7 Jun 2012 12:37:58 -0700 Subject: [PATCH 4/7] Add RPC commands data_delete, data_fetch, and data_store. --- src/DBInit.cpp | 6 ++++ src/RPCServer.cpp | 77 +++++++++++++++++++++++++++++++++++++++++++++-- src/RPCServer.h | 3 ++ src/Wallet.cpp | 51 ++++++++++++++++++++++++++++++- src/Wallet.h | 5 +++ src/main.cpp | 3 ++ 6 files changed, 142 insertions(+), 3 deletions(-) diff --git a/src/DBInit.cpp b/src/DBInit.cpp index ba9d954503..9f1f3a92a8 100644 --- a/src/DBInit.cpp +++ b/src/DBInit.cpp @@ -58,6 +58,12 @@ const char *WalletDBInit[] = { Dh1024 TEXT \ );", + // Local persistence of the RPC client + "CREATE TABLE RPCData ( \ + Key TEXT PRIMARY Key, \ + Value TEXT \ + );", + // Miscellaneous persistent information // Integer: 1 : Used to simplify SQL. // ScoreUpdated: when scores was last updated. diff --git a/src/RPCServer.cpp b/src/RPCServer.cpp index 624bb67198..8fb2d0e683 100644 --- a/src/RPCServer.cpp +++ b/src/RPCServer.cpp @@ -827,6 +827,76 @@ Json::Value RPCServer::doCreditSet(Json::Value& params) } } +// data_delete +Json::Value RPCServer::doDataDelete(Json::Value& params) +{ + if (params.size() != 1) + { + return JSONRPCError(400, "invalid params"); + } + + std::string strKey = params[0u].asString(); + + Json::Value ret = Json::Value(Json::objectValue); + + if (theApp->getWallet().dataDelete(strKey)) + { + ret["key"] = strKey; + } + else + { + ret = JSONRPCError(500, "internal error"); + } + + return ret; +} + +// data_fetch +Json::Value RPCServer::doDataFetch(Json::Value& params) +{ + if (params.size() != 1) + { + return JSONRPCError(400, "invalid params"); + } + + std::string strKey = params[0u].asString(); + std::string strValue; + + Json::Value ret = Json::Value(Json::objectValue); + + ret["key"] = strKey; + if (theApp->getWallet().dataFetch(strKey, strValue)) + ret["value"] = strValue; + + return ret; +} + +// data_store +Json::Value RPCServer::doDataStore(Json::Value& params) +{ + if (params.size() != 2) + { + return JSONRPCError(400, "invalid params"); + } + + std::string strKey = params[0u].asString(); + std::string strValue = params[1u].asString(); + + Json::Value ret = Json::Value(Json::objectValue); + + if (theApp->getWallet().dataStore(strKey, strValue)) + { + ret["key"] = strKey; + ret["value"] = strValue; + } + else + { + ret = JSONRPCError(500, "internal error"); + } + + return ret; +} + // nickname_info // Note: Nicknames are not automatically looked up by commands as they are advisory and can be changed. Json::Value RPCServer::doNicknameInfo(Json::Value& params) @@ -1336,7 +1406,7 @@ Json::Value RPCServer::doTx(Json::Value& params) return txn->getJson(true); } - return "not implemented"; + return JSONRPCError(501, "not implemented"); } // ledger @@ -1352,7 +1422,7 @@ Json::Value RPCServer::doLedger(Json::Value& params) return ret; } - return "not implemented"; + return JSONRPCError(501, "not implemented"); } // unl_add | [] @@ -2006,6 +2076,9 @@ Json::Value RPCServer::doCommand(const std::string& command, Json::Value& params if (command == "account_wallet_set") return doAccountWalletSet(params); if (command == "connect") return doConnect(params); if (command == "credit_set") return doCreditSet(params); + if (command == "data_delete") return doDataDelete(params); + if (command == "data_fetch") return doDataFetch(params); + if (command == "data_store") return doDataStore(params); if (command == "nickname_info") return doNicknameInfo(params); if (command == "nickname_set") return doNicknameSet(params); if (command == "password_fund") return doPasswordFund(params); diff --git a/src/RPCServer.h b/src/RPCServer.h index a972b05a6e..4af47f84d4 100644 --- a/src/RPCServer.h +++ b/src/RPCServer.h @@ -55,6 +55,9 @@ private: Json::Value doAccountWalletSet(Json::Value ¶ms); Json::Value doConnect(Json::Value& params); Json::Value doCreditSet(Json::Value& params); + Json::Value doDataDelete(Json::Value& params); + Json::Value doDataFetch(Json::Value& params); + Json::Value doDataStore(Json::Value& params); Json::Value doLedger(Json::Value& params); Json::Value doNicknameInfo(Json::Value& params); Json::Value doNicknameSet(Json::Value& params); diff --git a/src/Wallet.cpp b/src/Wallet.cpp index c636dd992e..f3c079fa71 100644 --- a/src/Wallet.cpp +++ b/src/Wallet.cpp @@ -43,7 +43,7 @@ bool Wallet::nodeIdentityLoad() ScopedLock sl(theApp->getWalletDB()->getDBLock()); bool bSuccess = false; - if(db->executeSQL("SELECT * FROM NodeIdentity;") && db->startIterRows()) + if (db->executeSQL("SELECT * FROM NodeIdentity;") && db->startIterRows()) { std::string strPublicKey, strPrivateKey; @@ -112,6 +112,55 @@ bool Wallet::nodeIdentityCreate() { return true; } +bool Wallet::dataDelete(const std::string& strKey) +{ + Database* db = theApp->getWalletDB()->getDB(); + + ScopedLock sl(theApp->getWalletDB()->getDBLock()); + + return db->executeSQL(str(boost::format("DELETE FROM RPCData WHERE Key=%s;") + % db->escape(strKey))); +} + +bool Wallet::dataFetch(const std::string& strKey, std::string& strValue) +{ + Database* db = theApp->getWalletDB()->getDB(); + + ScopedLock sl(theApp->getWalletDB()->getDBLock()); + + bool bSuccess = false; + + if (db->executeSQL(str(boost::format("SELECT Value FROM RPCData WHERE Key=%s;") + % db->escape(strKey))) && db->startIterRows()) + { + std::string strPublicKey, strPrivateKey; + + db->getStr("Value", strValue); + + db->endIterRows(); + + bSuccess = true; + } + + return bSuccess; +} + +bool Wallet::dataStore(const std::string& strKey, const std::string& strValue) +{ + Database* db = theApp->getWalletDB()->getDB(); + + ScopedLock sl(theApp->getWalletDB()->getDBLock()); + + bool bSuccess = false; + + return (db->executeSQL(str(boost::format("REPLACE INTO RPCData (Key, Value) VALUES (%s,%s);") + % db->escape(strKey) + % db->escape(strValue) + ))); + + return bSuccess; +} + bool Wallet::unitTest() { // Create 100 keys for each of 1,000 families and ensure all keys match diff --git a/src/Wallet.h b/src/Wallet.h index 982f80a91c..09ffa211d6 100644 --- a/src/Wallet.h +++ b/src/Wallet.h @@ -50,6 +50,11 @@ public: DH* getDh512() { return DHparams_dup(mDh512); } DH* getDh1024() { return DHparams_dup(mDh1024); } + // Local persistence of RPC clients + bool dataDelete(const std::string& strKey); + bool dataFetch(const std::string& strKey, std::string& strValue); + bool dataStore(const std::string& strKey, const std::string& strValue); + static bool unitTest(); }; diff --git a/src/main.cpp b/src/main.cpp index caa32cb01b..ce16372ced 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -45,6 +45,9 @@ void printHelp(const po::options_description& desc) cout << " account_wallet_set []" << endl; cout << " connect []" << endl; cout << " credit_set []" << endl; + cout << " data_delete " << endl; + cout << " data_fetch " << endl; + cout << " data_store " << endl; cout << " ledger" << endl; cout << " nickname_info " << endl; cout << " nickname_set [] []" << endl; From 24c7689808993894433b31c82e9ad03096a256dc Mon Sep 17 00:00:00 2001 From: Arthur Britto Date: Thu, 7 Jun 2012 13:06:26 -0700 Subject: [PATCH 5/7] Add binary data support for RPC data_fetch. --- src/Wallet.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Wallet.cpp b/src/Wallet.cpp index f3c079fa71..29454a18ae 100644 --- a/src/Wallet.cpp +++ b/src/Wallet.cpp @@ -135,7 +135,8 @@ bool Wallet::dataFetch(const std::string& strKey, std::string& strValue) { std::string strPublicKey, strPrivateKey; - db->getStr("Value", strValue); + std::vector vucData = db->getBinary("Value"); + strValue.assign(vucData.begin(), vucData.end()); db->endIterRows(); From c6b290c474180e0beec1a1788c0d2a323f00589f Mon Sep 17 00:00:00 2001 From: Arthur Britto Date: Thu, 7 Jun 2012 13:14:39 -0700 Subject: [PATCH 6/7] Sql binary support for vector. --- database/SqliteDatabase.cpp | 16 ++++++++++++++-- database/SqliteDatabase.h | 1 + database/database.cpp | 12 ++++++++++++ database/database.h | 2 ++ 4 files changed, 29 insertions(+), 2 deletions(-) diff --git a/database/SqliteDatabase.cpp b/database/SqliteDatabase.cpp index d3625e7384..d29374acee 100644 --- a/database/SqliteDatabase.cpp +++ b/database/SqliteDatabase.cpp @@ -3,6 +3,7 @@ #include #include #include + using namespace std; SqliteDatabase::SqliteDatabase(const char* host) : Database(host,"","") @@ -13,8 +14,7 @@ SqliteDatabase::SqliteDatabase(const char* host) : Database(host,"","") void SqliteDatabase::connect() { - -; int rc = sqlite3_open(mHost.c_str(), &mConnection); + int rc = sqlite3_open(mHost.c_str(), &mConnection); if( rc ) { cout << "Can't open database: " << mHost << " " << rc << endl; @@ -155,6 +155,18 @@ int SqliteDatabase::getBinary(int colIndex,unsigned char* buf,int maxSize) return(size); } +std::vector SqliteDatabase::getBinary(int colIndex) +{ + const unsigned char* blob = reinterpret_cast(sqlite3_column_blob(mCurrentStmt, colIndex)); + size_t iSize = sqlite3_column_bytes(mCurrentStmt, colIndex); + std::vector vucResult; + + vucResult.resize(iSize); + std::copy(blob, blob+iSize, vucResult.begin()); + + return vucResult; +} + uint64 SqliteDatabase::getBigInt(int colIndex) { return(sqlite3_column_int64(mCurrentStmt, colIndex)); diff --git a/database/SqliteDatabase.h b/database/SqliteDatabase.h index f615aed578..7cd94f45e1 100644 --- a/database/SqliteDatabase.h +++ b/database/SqliteDatabase.h @@ -36,6 +36,7 @@ public: bool getBool(int colIndex); // returns amount stored in buf int getBinary(int colIndex,unsigned char* buf,int maxSize); + std::vector getBinary(int colIndex); uint64 getBigInt(int colIndex); void escape(const unsigned char* start,int size,std::string& retStr); diff --git a/database/database.cpp b/database/database.cpp index 66c2eb080f..f0148e5949 100644 --- a/database/database.cpp +++ b/database/database.cpp @@ -74,6 +74,18 @@ int Database::getBinary(const char* colName,unsigned char* buf,int maxSize) return(0); } +std::vector Database::getBinary(const char* colName) +{ + int index; + + if (getColNumber(colName,&index)) + { + return getBinary(index); + } + + return std::vector(); +} + uint64 Database::getBigInt(const char* colName) { int index; diff --git a/database/database.h b/database/database.h index 7e94693fbe..4babf93fc6 100644 --- a/database/database.h +++ b/database/database.h @@ -59,6 +59,7 @@ public: bool getBool(const char* colName); // returns amount stored in buf int getBinary(const char* colName,unsigned char* buf,int maxSize); + std::vector getBinary(const char* colName); uint64 getBigInt(const char* colName); virtual bool getNull(int colIndex)=0; @@ -68,6 +69,7 @@ public: virtual bool getBool(int colIndex)=0; virtual int getBinary(int colIndex,unsigned char* buf,int maxSize)=0; virtual uint64 getBigInt(int colIndex)=0; + virtual std::vector getBinary(int colIndex)=0; // int getSingleDBValueInt(const char* sql); // float getSingleDBValueFloat(const char* sql); From 59dd45544d60bfc55bcb9e4fcbea4b61f1835e04 Mon Sep 17 00:00:00 2001 From: Arthur Britto Date: Thu, 7 Jun 2012 13:24:54 -0700 Subject: [PATCH 7/7] Catcg errors parsing command line options. --- src/main.cpp | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/main.cpp b/src/main.cpp index ce16372ced..7d2e6df332 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -112,12 +112,18 @@ int main(int argc, char* argv[]) else { // Parse options, if no error. - po::store(po::command_line_parser(argc, argv) - .options(desc) // Parse options. - .positional(p) // Remainder as --parameters. - .run(), - vm); - po::notify(vm); // Invoke option notify functions. + try { + po::store(po::command_line_parser(argc, argv) + .options(desc) // Parse options. + .positional(p) // Remainder as --parameters. + .run(), + vm); + po::notify(vm); // Invoke option notify functions. + } + catch (...) + { + iResult = 1; + } } if (iResult)