Merge branch 'master' of github.com:jedmccaleb/NewCoin

This commit is contained in:
Arthur Britto
2012-11-01 15:14:52 -07:00
31 changed files with 213 additions and 104 deletions

View File

@@ -18,7 +18,7 @@
#include <boost/thread.hpp>
SETUP_LOG();
LogPartition TaggedCachePartition("TaggedCache");
Application* theApp = NULL;
DatabaseCon::DatabaseCon(const std::string& strName, const char *initStrings[], int initCount)
@@ -39,7 +39,7 @@ DatabaseCon::~DatabaseCon()
Application::Application() :
mIOWork(mIOService), mAuxWork(mAuxService), mUNL(mIOService),
mNetOps(mIOService, &mMasterLedger), mTempNodeCache(16384, 90), mHashedObjectStore(16384, 300),
mNetOps(mIOService, &mMasterLedger), mTempNodeCache("NodeCache", 16384, 90), mHashedObjectStore(16384, 300),
mSNTPClient(mAuxService), mRpcDB(NULL), mTxnDB(NULL), mLedgerDB(NULL), mWalletDB(NULL),
mHashNodeDB(NULL), mNetNodeDB(NULL),
mConnectionPool(mIOService), mPeerDoor(NULL), mRPCDoor(NULL), mSweepTimer(mAuxService)

View File

@@ -95,7 +95,11 @@ public:
HashedObjectStore& getHashedObjectStore() { return mHashedObjectStore; }
ValidationCollection& getValidations() { return mValidations; }
JobQueue& getJobQueue() { return mJobQueue; }
SuppressionTable& getSuppression() { return mSuppressions; }
bool isNew(const uint256& s) { return mSuppressions.addSuppression(s); }
bool isNew(const uint256& s, uint64 p) { return mSuppressions.addSuppressionPeer(s, p); }
bool isNewFlag(const uint256& s, int f) { return mSuppressions.setFlag(s, f); }
bool running() { return mTxnDB != NULL; }
bool getSystemTimeOffset(int& offset) { return mSNTPClient.getOffset(offset); }

View File

@@ -36,7 +36,7 @@ std::string EncodeBase64(const std::string& s)
BIO_set_flags(b64, BIO_FLAGS_BASE64_NO_NL);
bmem = BIO_new(BIO_s_mem());
b64 = BIO_push(b64, bmem);
BIO_write(b64, s.c_str(), s.size());
BIO_write(b64, s.data(), s.size());
(void) BIO_flush(b64);
BIO_get_mem_ptr(b64, &bptr);

View File

@@ -251,22 +251,30 @@ int ConnectionPool::relayMessage(Peer* fromPeer, const PackedMessage::pointer& m
return sentTo;
}
int ConnectionPool::relayMessage(const std::set<uint64>& fromPeers, const PackedMessage::pointer& msg)
{
int sentTo = 0;
void ConnectionPool::relayMessageBut(const std::set<uint64>& fromPeers, const PackedMessage::pointer& msg)
{ // Relay message to all but the specified peers
boost::mutex::scoped_lock sl(mPeerLock);
BOOST_FOREACH(naPeer pair, mConnectedMap)
{
Peer::ref peer = pair.second;
if (peer->isConnected() && (fromPeers.count(peer->getPeerId()) == 0))
{
++sentTo;
peer->sendPacket(msg);
}
}
return sentTo;
}
void ConnectionPool::relayMessageTo(const std::set<uint64>& fromPeers, const PackedMessage::pointer& msg)
{ // Relay message to the specified peers
boost::mutex::scoped_lock sl(mPeerLock);
BOOST_FOREACH(naPeer pair, mConnectedMap)
{
Peer::ref peer = pair.second;
if (peer->isConnected() && (fromPeers.count(peer->getPeerId()) > 0))
peer->sendPacket(msg);
}
}
// Schedule a connection via scanning.
@@ -305,7 +313,7 @@ Peer::pointer ConnectionPool::peerConnect(const std::string& strIp, int iPort)
if ((it = mIpMap.find(pipPeer)) == mIpMap.end())
{
Peer::pointer ppNew(Peer::create(theApp->getIOService(), mCtx));
Peer::pointer ppNew(Peer::create(theApp->getIOService(), mCtx, ++mLastPeer));
// Did not find it. Not already connecting or connected.
ppNew->connect(strIp, iPort);

View File

@@ -62,7 +62,8 @@ public:
// Send message to network.
int relayMessage(Peer* fromPeer, const PackedMessage::pointer& msg);
int relayMessage(const std::set<uint64>& fromPeers, const PackedMessage::pointer& msg);
void relayMessageTo(const std::set<uint64>& fromPeers, const PackedMessage::pointer& msg);
void relayMessageBut(const std::set<uint64>& fromPeers, const PackedMessage::pointer& msg);
// Manual connection request.
// Queue for immediate scanning.

View File

@@ -19,7 +19,7 @@ uint128 CKey::PassPhraseToKey(const std::string& passPhrase)
{
Serializer s;
s.addRaw(passPhrase.c_str(), passPhrase.size());
s.addRaw(passPhrase);
uint256 hash256 = s.getSHA512Half();
uint128 ret(hash256);

View File

@@ -12,7 +12,7 @@ SETUP_LOG();
DECLARE_INSTANCE(HashedObject);
HashedObjectStore::HashedObjectStore(int cacheSize, int cacheAge) :
mCache(cacheSize, cacheAge), mWritePending(false)
mCache("HashedObjectStore", cacheSize, cacheAge), mWritePending(false)
{
mWriteSet.reserve(128);
}

View File

@@ -1073,7 +1073,7 @@ int Ledger::getPendingSaves()
void Ledger::pendSave(bool fromConsensus)
{
if (!fromConsensus && !theApp->isNew(getHash()))
if (!fromConsensus && !theApp->isNewFlag(getHash(), SF_SAVED))
return;
boost::thread thread(boost::bind(&Ledger::saveAcceptedLedger, shared_from_this(), fromConsensus));

View File

@@ -11,6 +11,7 @@
#include "HashPrefixes.h"
SETUP_LOG();
DECLARE_INSTANCE(PeerSet);
#define LA_DEBUG
#define LEDGER_ACQUIRE_TIMEOUT 750

View File

@@ -14,9 +14,12 @@
#include "Ledger.h"
#include "Peer.h"
#include "TaggedCache.h"
#include "InstanceCounter.h"
#include "../obj/src/ripple.pb.h"
class PeerSet
DEFINE_INSTANCE(PeerSet);
class PeerSet : private IS_INSTANCE(PeerSet)
{
protected:
uint256 mHash;

View File

@@ -24,6 +24,7 @@ typedef std::pair<const uint160, LedgerProposal::pointer> u160_prop_pair;
typedef std::pair<const uint256, LCTransaction::pointer> u256_lct_pair;
SETUP_LOG();
DECLARE_INSTANCE(LedgerConsensus);
TransactionAcquire::TransactionAcquire(const uint256& hash) : PeerSet(hash, TX_ACQUIRE_TIMEOUT), mHaveRoot(false)
{
@@ -845,7 +846,7 @@ void LedgerConsensus::addDisputedTransaction(const uint256& txID, const std::vec
txn->setVote(pit.first, cit->second->hasItem(txID));
}
if (!ourVote && theApp->isNew(txID))
if (!ourVote && theApp->isNewFlag(txID, SF_RELAYED))
{
ripple::TMTransaction msg;
msg.set_rawtransaction(&(tx.front()), tx.size());

View File

@@ -17,6 +17,9 @@
#include "Peer.h"
#include "CanonicalTXSet.h"
#include "TransactionEngine.h"
#include "InstanceCounter.h"
DEFINE_INSTANCE(LedgerConsensus);
class TransactionAcquire : public PeerSet, public boost::enable_shared_from_this<TransactionAcquire>
{ // A transaction set we are trying to acquire
@@ -78,7 +81,7 @@ enum LCState
lcsACCEPTED, // We have accepted/validated a new last closed ledger
};
class LedgerConsensus : public boost::enable_shared_from_this<LedgerConsensus>
class LedgerConsensus : public boost::enable_shared_from_this<LedgerConsensus>, IS_INSTANCE(LedgerConsensus)
{
protected:
LCState mState;

View File

@@ -7,6 +7,8 @@
#include "Log.h"
SETUP_LOG();
DECLARE_INSTANCE(LedgerEntrySetEntry);
DECLARE_INSTANCE(LedgerEntrySet)
// #define META_DEBUG

View File

@@ -7,6 +7,10 @@
#include "TransactionMeta.h"
#include "Ledger.h"
#include "TransactionErr.h"
#include "InstanceCounter.h"
DEFINE_INSTANCE(LedgerEntrySetEntry);
DEFINE_INSTANCE(LedgerEntrySet);
enum LedgerEntryAction
{
@@ -17,7 +21,7 @@ enum LedgerEntryAction
taaCREATE, // Newly created.
};
class LedgerEntrySetEntry
class LedgerEntrySetEntry : private IS_INSTANCE(LedgerEntrySetEntry)
{
public:
SLE::pointer mEntry;
@@ -28,7 +32,7 @@ public:
};
class LedgerEntrySet
class LedgerEntrySet : private IS_INSTANCE(LedgerEntrySet)
{
protected:
Ledger::pointer mLedger;

View File

@@ -10,16 +10,16 @@
#include "Application.h"
#ifndef CACHED_LEDGER_NUM
#define CACHED_LEDGER_NUM 512
#define CACHED_LEDGER_NUM 128
#endif
#ifndef CACHED_LEDGER_AGE
#define CACHED_LEDGER_AGE 900
#endif
// FIXME: Need to clean up ledgers by index, probably should switch to just mapping sequence to hash
// FIXME: Need to clean up ledgers by index at some point
LedgerHistory::LedgerHistory() : mLedgersByHash(CACHED_LEDGER_NUM, CACHED_LEDGER_AGE)
LedgerHistory::LedgerHistory() : mLedgersByHash("LedgerCache", CACHED_LEDGER_NUM, CACHED_LEDGER_AGE)
{ ; }
void LedgerHistory::addLedger(Ledger::pointer ledger)
@@ -36,7 +36,7 @@ void LedgerHistory::addAcceptedLedger(Ledger::pointer ledger, bool fromConsensus
assert(ledger);
assert(ledger->isAccepted());
assert(ledger->isImmutable());
mLedgersByIndex.insert(std::make_pair(ledger->getLedgerSeq(), ledger));
mLedgersByIndex[ledger->getLedgerSeq()] = ledger->getHash();
ledger->pendSave(fromConsensus);
}
@@ -44,9 +44,13 @@ void LedgerHistory::addAcceptedLedger(Ledger::pointer ledger, bool fromConsensus
Ledger::pointer LedgerHistory::getLedgerBySeq(uint32 index)
{
boost::recursive_mutex::scoped_lock sl(mLedgersByHash.peekMutex());
std::map<uint32, Ledger::pointer>::iterator it(mLedgersByIndex.find(index));
std::map<uint32, uint256>::iterator it(mLedgersByIndex.find(index));
if (it != mLedgersByIndex.end())
return it->second;
{
uint256 hash = it->second;
sl.unlock();
return getLedgerByHash(hash);
}
sl.unlock();
Ledger::pointer ret(Ledger::loadByIndex(index));
@@ -56,8 +60,8 @@ Ledger::pointer LedgerHistory::getLedgerBySeq(uint32 index)
sl.lock();
mLedgersByHash.canonicalize(ret->getHash(), ret);
mLedgersByIndex.insert(std::make_pair(index, ret));
return ret;
mLedgersByIndex[ret->getLedgerSeq()] = ret->getHash();
return (ret->getLedgerSeq() == index) ? ret : Ledger::pointer();
}
Ledger::pointer LedgerHistory::getLedgerByHash(const uint256& hash)
@@ -91,7 +95,7 @@ Ledger::pointer LedgerHistory::canonicalizeLedger(Ledger::pointer ledger, bool s
boost::recursive_mutex::scoped_lock sl(mLedgersByHash.peekMutex());
mLedgersByHash.canonicalize(h, ledger);
if (ledger->isAccepted())
mLedgersByIndex[ledger->getLedgerSeq()] = ledger;
mLedgersByIndex[ledger->getLedgerSeq()] = ledger->getHash();
return ledger;
}

View File

@@ -7,7 +7,7 @@
class LedgerHistory
{
TaggedCache<uint256, Ledger> mLedgersByHash;
std::map<uint32, Ledger::pointer> mLedgersByIndex; // accepted ledgers
std::map<uint32, uint256> mLedgersByIndex; // accepted ledgers
public:
LedgerHistory();

View File

@@ -112,7 +112,7 @@ Transaction::pointer NetworkOPs::submitTransaction(const Transaction::pointer& t
return tpTransNew;
}
Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans, Peer* source)
Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans)
{
Transaction::pointer dbtx = theApp->getMasterTransaction().fetch(trans->getID(), true);
if (dbtx) return dbtx;
@@ -160,27 +160,28 @@ Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans,
trans->setStatus(INCLUDED);
theApp->getMasterTransaction().canonicalize(trans, true);
// FIXME: Need code to get all accounts affected by a transaction and re-synch
// any of them that affect local accounts cached in memory. Or, we need to
// no cache the account balance information and always get it from the current ledger
// theApp->getWallet().applyTransaction(trans);
std::set<uint64> peers;
if (theApp->getSuppression().swapSet(trans->getID(), peers, SF_RELAYED))
{
ripple::TMTransaction tx;
Serializer s;
trans->getSTransaction()->add(s);
tx.set_rawtransaction(&s.getData().front(), s.getLength());
tx.set_status(ripple::tsCURRENT);
tx.set_receivetimestamp(getNetworkTimeNC());
ripple::TMTransaction tx;
Serializer s;
trans->getSTransaction()->add(s);
tx.set_rawtransaction(&s.getData().front(), s.getLength());
tx.set_status(ripple::tsCURRENT);
tx.set_receivetimestamp(getNetworkTimeNC());
PackedMessage::pointer packet = boost::make_shared<PackedMessage>(tx, ripple::mtTRANSACTION);
int sentTo = theApp->getConnectionPool().relayMessage(source, packet);
cLog(lsINFO) << "Transaction relayed to " << sentTo << " node(s)";
PackedMessage::pointer packet = boost::make_shared<PackedMessage>(tx, ripple::mtTRANSACTION);
theApp->getConnectionPool().relayMessageBut(peers, packet);
}
return trans;
}
cLog(lsDEBUG) << "Status other than success " << r;
if ((mMode != omFULL) && (mMode != omTRACKING) && (theApp->isNew(trans->getID())))
std::set<uint64> peers;
if ((mMode != omFULL) && (mMode != omTRACKING) &&
theApp->getSuppression().swapSet(trans->getID(), peers, SF_RELAYED))
{
ripple::TMTransaction tx;
Serializer s;
@@ -189,7 +190,7 @@ Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans,
tx.set_status(ripple::tsCURRENT);
tx.set_receivetimestamp(getNetworkTimeNC());
PackedMessage::pointer packet = boost::make_shared<PackedMessage>(tx, ripple::mtTRANSACTION);
theApp->getConnectionPool().relayMessage(source, packet);
theApp->getConnectionPool().relayMessageTo(peers, packet);
}
trans->setStatus(INVALID);
@@ -694,50 +695,38 @@ bool NetworkOPs::haveConsensusObject()
}
// <-- bool: true to relay
bool NetworkOPs::recvPropose(uint32 proposeSeq, const uint256& proposeHash, const uint256& prevLedger,
uint32 closeTime, const std::string& pubKey, const std::string& signature, const RippleAddress& nodePublic)
bool NetworkOPs::recvPropose(const uint256& suppression, uint32 proposeSeq, const uint256& proposeHash,
const uint256& prevLedger, uint32 closeTime, const std::string& signature,
const RippleAddress& nodePublic)
{
// JED: does mConsensus need to be locked?
// XXX Validate key.
// XXX Take a vuc for pubkey.
// Get a preliminary hash to use to suppress duplicates
Serializer s(256);
s.add256(proposeHash);
s.add256(prevLedger);
s.add32(proposeSeq);
s.add32(closeTime);
s.addRaw(pubKey);
s.addRaw(signature);
if (!theApp->isNew(s.getSHA512Half()))
return false;
RippleAddress naPeerPublic = RippleAddress::createNodePublic(strCopy(pubKey));
if (!haveConsensusObject())
{
cLog(lsINFO) << "Received proposal outside consensus window";
return mMode != omFULL;
}
if (mConsensus->isOurPubKey(naPeerPublic))
if (mConsensus->isOurPubKey(nodePublic))
{
cLog(lsTRACE) << "Received our own validation";
return false;
}
// Is this node on our UNL?
if (!theApp->getUNL().nodeInUNL(naPeerPublic))
if (!theApp->getUNL().nodeInUNL(nodePublic))
{
cLog(lsINFO) << "Untrusted proposal: " << naPeerPublic.humanNodePublic() << " " << proposeHash;
cLog(lsINFO) << "Untrusted proposal: " << nodePublic.humanNodePublic() << " " << proposeHash;
return true;
}
if (prevLedger.isNonZero())
{ // proposal includes a previous ledger
LedgerProposal::pointer proposal =
boost::make_shared<LedgerProposal>(prevLedger, proposeSeq, proposeHash, closeTime, naPeerPublic);
boost::make_shared<LedgerProposal>(prevLedger, proposeSeq, proposeHash, closeTime, nodePublic);
if (!proposal->checkSign(signature))
{
cLog(lsWARNING) << "New-style ledger proposal fails signature check";
@@ -750,7 +739,7 @@ bool NetworkOPs::recvPropose(uint32 proposeSeq, const uint256& proposeHash, cons
}
LedgerProposal::pointer proposal =
boost::make_shared<LedgerProposal>(mConsensus->getLCL(), proposeSeq, proposeHash, closeTime, naPeerPublic);
boost::make_shared<LedgerProposal>(mConsensus->getLCL(), proposeSeq, proposeHash, closeTime, nodePublic);
if (!proposal->checkSign(signature))
{ // Note that if the LCL is different, the signature check will fail
cLog(lsWARNING) << "Ledger proposal fails signature check";

View File

@@ -121,7 +121,7 @@ public:
//
Transaction::pointer submitTransaction(const Transaction::pointer& tpTrans);
Transaction::pointer processTransaction(Transaction::pointer transaction, Peer* source = NULL);
Transaction::pointer processTransaction(Transaction::pointer transaction);
Transaction::pointer findTransactionByID(const uint256& transactionID);
int findTransactionsBySource(const uint256& uLedger, std::list<Transaction::pointer>&, const RippleAddress& sourceAccount,
uint32 minSeq, uint32 maxSeq);
@@ -168,8 +168,8 @@ public:
const std::vector<unsigned char>& myNode, std::list< std::vector<unsigned char> >& newNodes);
// ledger proposal/close functions
bool recvPropose(uint32 proposeSeq, const uint256& proposeHash, const uint256& prevLedger, uint32 closeTime,
const std::string& pubKey, const std::string& signature, const RippleAddress& nodePublic);
bool recvPropose(const uint256& suppression, uint32 proposeSeq, const uint256& proposeHash,
const uint256& prevLedger, uint32 closeTime, const std::string& signature, const RippleAddress& nodePublic);
bool gotTXData(const boost::shared_ptr<Peer>& peer, const uint256& hash,
const std::list<SHAMapNode>& nodeIDs, const std::list< std::vector<unsigned char> >& nodeData);
bool recvValidation(const SerializedValidation::pointer& val);

View File

@@ -24,14 +24,14 @@ DECLARE_INSTANCE(Peer);
// Node has this long to verify its identity from connection accepted or connection attempt.
#define NODE_VERIFY_SECONDS 15
Peer::Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx) :
Peer::Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx, uint64 peerID) :
mHelloed(false),
mDetaching(false),
mPeerId(peerID),
mSocketSsl(io_service, ctx),
mVerifyTimer(io_service)
{
cLog(lsDEBUG) << "CREATING PEER: " << ADDRESS(this);
mPeerId = theApp->getConnectionPool().assignPeerId();
}
void Peer::handle_write(const boost::system::error_code& error, size_t bytes_transferred)
@@ -708,8 +708,12 @@ void Peer::recvTransaction(ripple::TMTransaction& packet)
SerializerIterator sit(s);
SerializedTransaction::pointer stx = boost::make_shared<SerializedTransaction>(boost::ref(sit));
if (!theApp->isNew(stx->getTransactionID(), mPeerId))
return;
tx = boost::make_shared<Transaction>(stx, true);
if (tx->getStatus() == INVALID) throw(0);
if (tx->getStatus() == INVALID)
throw(0);
#ifndef TRUST_NETWORK
}
catch (...)
@@ -723,7 +727,7 @@ void Peer::recvTransaction(ripple::TMTransaction& packet)
}
#endif
tx = theApp->getOPs().processTransaction(tx, this);
tx = theApp->getOPs().processTransaction(tx);
if(tx->getStatus() != INCLUDED)
{ // transaction wasn't accepted into ledger
@@ -736,7 +740,7 @@ void Peer::recvTransaction(ripple::TMTransaction& packet)
void Peer::recvPropose(ripple::TMProposeSet& packet)
{
if ((packet.currenttxhash().size() != 32) || (packet.nodepubkey().size() < 28) ||
(packet.signature().size() < 56))
(packet.signature().size() < 56) || (packet.nodepubkey().size() > 128) || (packet.signature().size() > 128))
{
cLog(lsWARNING) << "Received proposal is malformed";
return;
@@ -748,8 +752,23 @@ void Peer::recvPropose(ripple::TMProposeSet& packet)
if ((packet.has_previousledger()) && (packet.previousledger().size() == 32))
memcpy(prevLedger.begin(), packet.previousledger().data(), 32);
if(theApp->getOPs().recvPropose(packet.proposeseq(), currentTxHash, prevLedger, packet.closetime(),
packet.nodepubkey(), packet.signature(), mNodePublic))
Serializer s(512);
s.add256(currentTxHash);
s.add256(prevLedger);
s.add32(packet.proposeseq());
s.add32(packet.closetime());
s.addVL(packet.nodepubkey());
s.addVL(packet.signature());
uint256 suppression = s.getSHA512Half();
if (!theApp->isNew(suppression, mPeerId))
return;
RippleAddress nodePublic = RippleAddress::createNodePublic(strCopy(packet.nodepubkey()));
// bool isTrusted = theApp->getUNL().nodeInUNL(nodePublic);
if(theApp->getOPs().recvPropose(suppression, packet.proposeseq(), currentTxHash, prevLedger, packet.closetime(),
packet.signature(), nodePublic))
{ // FIXME: Not all nodes will want proposals
PackedMessage::pointer message = boost::make_shared<PackedMessage>(packet, ripple::mtPROPOSE_LEDGER);
theApp->getConnectionPool().relayMessage(this, message);
@@ -788,11 +807,11 @@ static void checkValidation(Job&, SerializedValidation::pointer val, uint256 sig
return;
}
if (theApp->getOPs().recvValidation(val))
std::set<uint64> peers;
if (theApp->getOPs().recvValidation(val) && theApp->getSuppression().swapSet(signingHash, peers, SF_RELAYED))
{
Peer::pointer pp = peer.lock();
PackedMessage::pointer message = boost::make_shared<PackedMessage>(*packet, ripple::mtVALIDATION);
theApp->getConnectionPool().relayMessage(pp ? pp.get() : NULL, message);
theApp->getConnectionPool().relayMessageBut(peers, message);
}
}
#ifndef TRUST_NETWORK
@@ -822,7 +841,7 @@ void Peer::recvValidation(const boost::shared_ptr<ripple::TMValidation>& packet)
SerializedValidation::pointer val = boost::make_shared<SerializedValidation>(boost::ref(sit), false);
uint256 signingHash = val->getSigningHash();
if (!theApp->isNew(signingHash))
if (!theApp->isNew(signingHash, mPeerId))
{
cLog(lsTRACE) << "Validation is duplicate";
return;
@@ -831,7 +850,7 @@ void Peer::recvValidation(const boost::shared_ptr<ripple::TMValidation>& packet)
bool isTrusted = theApp->getUNL().nodeInUNL(val->getSignerPublic());
theApp->getJobQueue().addJob(isTrusted ? jtVALIDATION_t : jtVALIDATION_ut,
boost::bind(&checkValidation, _1, val, signingHash, isTrusted, packet,
boost::weak_ptr<Peer>(shared_from_this())));
boost::weak_ptr<Peer>(shared_from_this())));
}
#ifndef TRUST_NETWORK
catch (...)

View File

@@ -5,6 +5,7 @@
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/shared_ptr.hpp>
#include "../obj/src/ripple.pb.h"
@@ -72,7 +73,7 @@ protected:
ripple::TMStatusChange mLastStatus;
ripple::TMHello mHello;
Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx);
Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx, uint64 peerId);
void handleShutdown(const boost::system::error_code& error) { ; }
static void sHandleShutdown(Peer::ref ptr, const boost::system::error_code& error)
@@ -132,9 +133,9 @@ public:
void setIpPort(const std::string& strIP, int iPort);
static pointer create(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx)
static pointer create(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx, uint64 id)
{
return pointer(new Peer(io_service, ctx));
return pointer(new Peer(io_service, ctx, id));
}
boost::asio::ssl::stream<boost::asio::ip::tcp::socket>::lowest_layer_type& getSocket()

View File

@@ -39,7 +39,8 @@ PeerDoor::PeerDoor(boost::asio::io_service& io_service) :
void PeerDoor::startListening()
{
Peer::pointer new_connection = Peer::create(mAcceptor.get_io_service(), mCtx);
Peer::pointer new_connection = Peer::create(mAcceptor.get_io_service(), mCtx,
theApp->getConnectionPool().assignPeerId());
mAcceptor.async_accept(new_connection->getSocket(),
boost::bind(&PeerDoor::handleConnect, this, new_connection,

View File

@@ -411,7 +411,7 @@ Json::Value RPCHandler::doAccountEmailSet(const Json::Value &params)
boost::to_lower(strEmail);
std::vector<unsigned char> vucMD5(128/8, 0);
MD5(reinterpret_cast<const unsigned char*>(strEmail.c_str()), strEmail.size(), &vucMD5.front());
MD5(reinterpret_cast<const unsigned char*>(strEmail.data()), strEmail.size(), &vucMD5.front());
uint128 uEmailHash(vucMD5);
std::vector<unsigned char> vucDomain;
@@ -2607,7 +2607,7 @@ Json::Value RPCHandler::doLogin(const Json::Value& params)
Json::Value RPCHandler::doGetCounts(const Json::Value& params)
{
int minCount = 1;
int minCount = 10;
if (params.size() > 0)
minCount = params[0u].asInt();

View File

@@ -305,7 +305,7 @@ uint256 Serializer::getSHA512Half(const unsigned char *data, int len)
uint256 Serializer::getSHA512Half(const std::string& strData)
{
return getSHA512Half(reinterpret_cast<const unsigned char*>(strData.c_str()), strData.size());
return getSHA512Half(reinterpret_cast<const unsigned char*>(strData.data()), strData.size());
}
uint256 Serializer::getPrefixHash(uint32 prefix, const unsigned char *data, int len)
@@ -367,7 +367,16 @@ int Serializer::addVL(const std::vector<unsigned char>& vector)
int Serializer::addVL(const void *ptr, int len)
{
int ret = addRaw(encodeVL(len));
if (len) addRaw(ptr, len);
if (len)
addRaw(ptr, len);
return ret;
}
int Serializer::addVL(const std::string& string)
{
int ret = addRaw(string.size());
if (!string.empty())
addRaw(string.data(), string.size());
return ret;
}

View File

@@ -44,6 +44,7 @@ public:
int addZeros(size_t uBytes);
int addVL(const std::vector<unsigned char> &vector);
int addVL(const std::string& string);
int addVL(const void *ptr, int len);
int addTaggedList(const std::list<TaggedListItem>&);
int addTaggedList(const std::vector<TaggedListItem>&);
@@ -87,7 +88,7 @@ public:
static uint256 getPrefixHash(uint32 prefix, const std::vector<unsigned char>& data)
{ return getPrefixHash(prefix, &(data.front()), data.size()); }
static uint256 getPrefixHash(uint32 prefix, const std::string& strData)
{ return getPrefixHash(prefix, reinterpret_cast<const unsigned char *>(strData.c_str()), strData.size()); }
{ return getPrefixHash(prefix, reinterpret_cast<const unsigned char *>(strData.data()), strData.size()); }
// totality functions
const std::vector<unsigned char>& peekData() const { return mData; }

View File

@@ -66,3 +66,35 @@ bool SuppressionTable::addSuppressionFlags(const uint256& index, int flag)
findCreateEntry(index, created).setFlag(flag);
return created;
}
bool SuppressionTable::setFlag(const uint256& index, int flag)
{ // return: true = changed, false = unchanged
assert(flag != 0);
boost::mutex::scoped_lock sl(mSuppressionMutex);
bool created;
Suppression &s = findCreateEntry(index, created);
if ((s.getFlags() & flag) == flag)
return false;
s.setFlag(flag);
return true;
}
bool SuppressionTable::swapSet(const uint256& index, std::set<uint64>& peers, int flag)
{
boost::mutex::scoped_lock sl(mSuppressionMutex);
bool created;
Suppression &s = findCreateEntry(index, created);
if ((s.getFlags() & flag) == flag)
return false;
s.swapSet(peers);
s.setFlag(flag);
return true;
}

View File

@@ -14,6 +14,11 @@
DEFINE_INSTANCE(Suppression);
#define SF_RELAYED 0x01
#define SF_SIGBAD 0x02
#define SF_SIGGOOD 0x04
#define SF_SAVED 0x08
class Suppression : private IS_INSTANCE(Suppression)
{
protected:
@@ -27,9 +32,11 @@ public:
void addPeer(uint64 peer) { mPeers.insert(peer); }
bool hasPeer(uint64 peer) { return mPeers.count(peer) > 0; }
int getFlags(void) { return mFlags; }
bool hasFlag(int f) { return (mFlags & f) != 0; }
void setFlag(int f) { mFlags |= f; }
void clearFlag(int f) { mFlags &= ~f; }
void swapSet(std::set<uint64>& s) { mPeers.swap(s); }
};
class SuppressionTable
@@ -55,8 +62,11 @@ public:
bool addSuppressionPeer(const uint256& index, uint64 peer);
bool addSuppressionFlags(const uint256& index, int flag);
bool setFlag(const uint256& index, int flag);
Suppression getEntry(const uint256&);
bool swapSet(const uint256& index, std::set<uint64>& peers, int flag);
};
#endif

View File

@@ -1,12 +1,17 @@
#ifndef __TAGGEDCACHE__
#define __TAGGEDCACHE__
#include <string>
#include <boost/thread/recursive_mutex.hpp>
#include <boost/unordered_map.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/ref.hpp>
#include <boost/make_shared.hpp>
#include "Log.h"
extern LogPartition TaggedCachePartition;
// This class implemented a cache and a map. The cache keeps objects alive
// in the map. The map allows multiple code paths that reference objects
// with the same tag to get the same actual object.
@@ -30,6 +35,7 @@ public:
protected:
mutable boost::recursive_mutex mLock;
std::string mName;
int mTargetSize, mTargetAge;
boost::unordered_map<key_type, cache_entry> mCache; // Hold strong reference to recent objects
@@ -38,7 +44,8 @@ protected:
boost::unordered_map<key_type, weak_data_ptr> mMap; // Track stored objects
public:
TaggedCache(int size, int age) : mTargetSize(size), mTargetAge(age), mLastSweep(time(NULL)) { ; }
TaggedCache(const char *name, int size, int age)
: mName(name), mTargetSize(size), mTargetAge(age), mLastSweep(time(NULL)) { ; }
int getTargetSize() const;
int getTargetAge() const;
@@ -89,35 +96,40 @@ template<typename c_Key, typename c_Data> void TaggedCache<c_Key, c_Data>::sweep
{
boost::recursive_mutex::scoped_lock sl(mLock);
if (mCache.size() < mTargetSize)
return;
time_t now = time(NULL);
if ((mLastSweep + 10) < now)
return;
mLastSweep = now;
time_t target = now - mTargetAge;
mLastSweep = time(NULL);
time_t target = mLastSweep - mTargetAge;
// Pass 1, remove old objects from cache
int cacheRemovals = 0;
typename boost::unordered_map<key_type, cache_entry>::iterator cit = mCache.begin();
while (cit != mCache.end())
{
if (cit->second.first < target)
{
++cacheRemovals;
mCache.erase(cit++);
}
else
++cit;
}
// Pass 2, remove dead objects from map
int mapRemovals = 0;
typename boost::unordered_map<key_type, weak_data_ptr>::iterator mit = mMap.begin();
while (mit != mMap.end())
{
if (mit->second.expired())
{
++mapRemovals;
mMap.erase(mit++);
}
else
++mit;
}
if (TaggedCachePartition.doLog(lsTRACE) && (mapRemovals || cacheRemovals))
Log(lsTRACE, TaggedCachePartition) << mName << ": cache = " << mCache.size() << "-" << cacheRemovals <<
", map = " << mMap.size() << "-" << mapRemovals;
}
template<typename c_Key, typename c_Data> bool TaggedCache<c_Key, c_Data>::touch(const key_type& key)

View File

@@ -15,6 +15,7 @@
#include "utils.h"
SETUP_LOG();
DECLARE_INSTANCE(TransactionEngine);
void TransactionEngine::txnWrite()
{

View File

@@ -9,6 +9,9 @@
#include "SerializedLedger.h"
#include "LedgerEntrySet.h"
#include "TransactionErr.h"
#include "InstanceCounter.h"
DEFINE_INSTANCE(TransactionEngine);
// A TransactionEngine applies serialized transactions to a ledger
// It can also, verify signatures, verify fees, and give rejection reasons
@@ -29,7 +32,7 @@ enum TransactionEngineParams
// One instance per ledger.
// Only one transaction applied at a time.
class TransactionEngine
class TransactionEngine : private IS_INSTANCE(TransactionEngine)
{
private:
LedgerEntrySet mNodes;

View File

@@ -13,7 +13,7 @@
#define CACHED_TRANSACTION_AGE 1800
#endif
TransactionMaster::TransactionMaster() : mCache(CACHED_TRANSACTION_NUM, CACHED_TRANSACTION_AGE)
TransactionMaster::TransactionMaster() : mCache("TransactionCache", CACHED_TRANSACTION_NUM, CACHED_TRANSACTION_AGE)
{
;
}

View File

@@ -189,7 +189,7 @@ std::string DecodeBase64(std::string s)
b64 = BIO_new(BIO_f_base64());
BIO_set_flags(b64, BIO_FLAGS_BASE64_NO_NL);
bmem = BIO_new_mem_buf(const_cast<char*>(s.c_str()), s.size());
bmem = BIO_new_mem_buf(const_cast<char*>(s.data()), s.size());
bmem = BIO_push(b64, bmem);
BIO_read(bmem, buffer, s.size());
BIO_free_all(bmem);