Merge branch 'master' of github.com:jedmccaleb/NewCoin into api

This commit is contained in:
jed
2012-11-05 06:50:23 -08:00
21 changed files with 451 additions and 141 deletions

57
js/events.js Normal file
View File

@@ -0,0 +1,57 @@
var EventEmitter = function () {
this._events = {};
};
EventEmitter.prototype.on = function (e, f) {
console.log('on', e, f)
if (e in this._events) {
if (this._events[e].indexOf(f) < 0) {
this._events[e].push(f);
}
} else {
this._events[e] = [f];
}
return this;
};
EventEmitter.prototype.off = function (e, f) {
if (f) {
function eq(x) { return function (y) { return x === y; } }
this._events[e] = this.listeners(e).filter(eq(f));
} else {
delete this._events[e];
}
};
EventEmitter.prototype.removeListener = function (e, f) {
this.off(e, f);
};
EventEmitter.prototype.removeAllListeners = function (e) {
this.off(e);
};
EventEmitter.prototype.emit = function (e) {
var args = Array.prototype.slice.call(arguments, 1),
fs = this.listeners(e);
console.log('emit', e, args)
for (var i = 0; i < fs.length; i++) {
fs[i].apply(e, args);
}
};
EventEmitter.prototype.listeners = function (e) {
return this._events[e] || [];
};
EventEmitter.prototype.once = function (e, f) {
var that = this;
this.on(e, function g() {
f.apply(e, arguments);
that.off(e, g);
});
return this;
};
exports.EventEmitter = EventEmitter;

View File

@@ -102,6 +102,7 @@ public:
bool isNew(const uint256& s) { return mSuppressions.addSuppression(s); }
bool isNew(const uint256& s, uint64 p) { return mSuppressions.addSuppressionPeer(s, p); }
bool isNew(const uint256& s, uint64 p, int& f) { return mSuppressions.addSuppressionPeer(s, p, f); }
bool isNewFlag(const uint256& s, int f) { return mSuppressions.setFlag(s, f); }
bool running() { return mTxnDB != NULL; }
bool getSystemTimeOffset(int& offset) { return mSNTPClient.getOffset(offset); }

View File

@@ -39,6 +39,13 @@ public:
CanonicalTXSet(const uint256& lclHash) : mSetHash(lclHash) { ; }
void push_back(SerializedTransaction::ref txn);
void reset(const uint256& newLCL)
{
mSetHash = newLCL;
mMap.clear();
}
iterator erase(const iterator& it);
iterator begin() { return mMap.begin(); }

View File

@@ -21,7 +21,6 @@ HashedObjectStore::HashedObjectStore(int cacheSize, int cacheAge) :
bool HashedObjectStore::store(HashedObjectType type, uint32 index,
const std::vector<unsigned char>& data, const uint256& hash)
{ // return: false = already in cache, true = added to cache
assert(hash == Serializer::getSHA512Half(data));
if (!theApp->getHashNodeDB())
{
cLog(lsTRACE) << "HOS: no db";
@@ -32,6 +31,7 @@ bool HashedObjectStore::store(HashedObjectType type, uint32 index,
cLog(lsTRACE) << "HOS: " << hash << " store: incache";
return false;
}
assert(hash == Serializer::getSHA512Half(data));
HashedObject::pointer object = boost::make_shared<HashedObject>(type, index, data, hash);
if (!mCache.canonicalize(hash, object))

View File

@@ -170,7 +170,7 @@ void LedgerAcquire::addPeers()
if (!found)
{
BOOST_FOREACH(Peer::ref peer, peerList)
peerHas(peer);
peerHas(peer);
}
}

View File

@@ -35,16 +35,43 @@ void TransactionAcquire::done()
{
if (mFailed)
{
cLog(lsWARNING) << "Failed to acquire TXs " << mHash;
cLog(lsWARNING) << "Failed to acquire TX set " << mHash;
theApp->getOPs().mapComplete(mHash, SHAMap::pointer());
}
else
{
cLog(lsINFO) << "Acquired TX set " << mHash;
mMap->setImmutable();
theApp->getOPs().mapComplete(mHash, mMap);
}
}
void TransactionAcquire::onTimer()
{
if (!getPeerCount())
{ // out of peers
cLog(lsWARNING) << "Out of peers for TX set " << getHash();
bool found = false;
std::vector<Peer::pointer> peerList = theApp->getConnectionPool().getPeerVector();
BOOST_FOREACH(Peer::ref peer, peerList)
{
if (peer->hasTxSet(getHash()))
{
found = true;
peerHas(peer);
}
}
if (!found)
{
BOOST_FOREACH(Peer::ref peer, peerList)
peerHas(peer);
}
}
else
trigger(Peer::pointer(), true);
}
boost::weak_ptr<PeerSet> TransactionAcquire::pmDowncast()
{
return boost::shared_polymorphic_downcast<PeerSet>(shared_from_this());
@@ -59,6 +86,7 @@ void TransactionAcquire::trigger(Peer::ref peer, bool timer)
}
if (!mHaveRoot)
{
cLog(lsTRACE) << "TransactionAcquire::trigger " << (peer ? "havePeer" : "noPeer") << " no root";
ripple::TMGetLedger tmGL;
tmGL.set_ledgerhash(mHash.begin(), mHash.size());
tmGL.set_itype(ripple::liTS_CANDIDATE);
@@ -98,9 +126,15 @@ bool TransactionAcquire::takeNodes(const std::list<SHAMapNode>& nodeIDs,
const std::list< std::vector<unsigned char> >& data, Peer::ref peer)
{
if (mComplete)
{
cLog(lsTRACE) << "TX set complete";
return true;
}
if (mFailed)
{
cLog(lsTRACE) << "TX set failed";
return false;
}
try
{
std::list<SHAMapNode>::const_iterator nodeIDit = nodeIDs.begin();
@@ -116,12 +150,18 @@ bool TransactionAcquire::takeNodes(const std::list<SHAMapNode>& nodeIDs,
return false;
}
if (!mMap->addRootNode(getHash(), *nodeDatait, snfWIRE, NULL))
{
cLog(lsWARNING) << "TX acquire got bad root node";
return false;
}
else
mHaveRoot = true;
}
else if (!mMap->addKnownNode(*nodeIDit, *nodeDatait, &sf))
{
cLog(lsWARNING) << "TX acquire got bad non-root node";
return false;
}
++nodeIDit;
++nodeDatait;
}
@@ -323,7 +363,6 @@ void LedgerConsensus::handleLCL(const uint256& lclHash)
mProposing = false;
mValidating = false;
mPeerPositions.clear();
mPeerData.clear();
mDisputes.clear();
mCloseTimes.clear();
mDeadNodes.clear();
@@ -537,7 +576,7 @@ void LedgerConsensus::closeLedger()
mCloseTime = theApp->getOPs().getCloseTimeNC();
theApp->getOPs().setLastCloseTime(mCloseTime);
statusChange(ripple::neCLOSING_LEDGER, *mPreviousLedger);
takeInitialPosition(*theApp->getMasterLedger().closeLedger());
takeInitialPosition(*theApp->getMasterLedger().closeLedger(true));
}
void LedgerConsensus::stateEstablish()
@@ -842,7 +881,7 @@ void LedgerConsensus::addDisputedTransaction(const uint256& txID, const std::vec
{
boost::unordered_map<uint256, SHAMap::pointer>::const_iterator cit =
mAcquired.find(pit.second->getCurrentHash());
if (cit != mAcquired.end() && cit->second)
if ((cit != mAcquired.end()) && cit->second)
txn->setVote(pit.first, cit->second->hasItem(txID));
}
@@ -927,7 +966,10 @@ bool LedgerConsensus::peerGaveNodes(Peer::ref peer, const uint256& setHash,
{
boost::unordered_map<uint256, TransactionAcquire::pointer>::iterator acq = mAcquiring.find(setHash);
if (acq == mAcquiring.end())
{
cLog(lsINFO) << "Got TX data for set not acquiring: " << setHash;
return false;
}
TransactionAcquire::pointer set = acq->second; // We must keep the set around during the function
return set->takeNodes(nodeIDs, nodeData, peer);
}
@@ -956,6 +998,7 @@ void LedgerConsensus::playbackProposals()
for (boost::unordered_map< uint160, std::list<LedgerProposal::pointer> >::iterator
it = storedProposals.begin(), end = storedProposals.end(); it != end; ++it)
{
bool relay = false;
BOOST_FOREACH(const LedgerProposal::pointer& proposal, it->second)
{
if (proposal->hasSignature())
@@ -964,11 +1007,33 @@ void LedgerConsensus::playbackProposals()
if (proposal->checkSign())
{
cLog(lsINFO) << "Applying stored proposal";
peerPosition(proposal);
relay = peerPosition(proposal);
}
}
else if (proposal->isPrevLedger(mPrevLedgerHash))
peerPosition(proposal);
relay = peerPosition(proposal);
if (relay)
{
cLog(lsWARNING) << "We should do delayed relay of this proposal, but we cannot";
}
#if 0 // FIXME: We can't do delayed relay because we don't have the signature
std::set<uint64> peers
if (relay && theApp->getSuppression().swapSet(proposal.getSuppress(), set, SF_RELAYED))
{
cLog(lsDEBUG) << "Stored proposal delayed relay";
ripple::TMProposeSet set;
set.set_proposeseq
set.set_currenttxhash(, 256 / 8);
previousledger
closetime
nodepubkey
signature
PackedMessage::pointer message = boost::make_shared<PackedMessage>(set, ripple::mtPROPOSE_LEDGER);
theApp->getConnectionPool().relayMessageBut(peers, message);
}
#endif
}
}
}

View File

@@ -30,7 +30,7 @@ protected:
SHAMap::pointer mMap;
bool mHaveRoot;
void onTimer() { trigger(Peer::pointer(), true); }
void onTimer();
void newPeer(Peer::ref peer) { trigger(peer, false); }
void done();

View File

@@ -447,9 +447,8 @@ void LedgerEntrySet::calcRawMeta(Serializer& s, TER result)
it != end; ++it)
entryModify(it->second);
cLog(lsTRACE) << "Metadata:" << mSet.getJson(0);
mSet.addRaw(s, result);
cLog(lsTRACE) << "Metadata:" << mSet.getJson(0);
}
// <-- uNodeDir: For deletion, present to make dirDelete efficient.

View File

@@ -14,10 +14,10 @@ uint32 LedgerMaster::getCurrentLedgerIndex()
return mCurrentLedger->getLedgerSeq();
}
bool LedgerMaster::addHeldTransaction(const Transaction::pointer& transaction)
void LedgerMaster::addHeldTransaction(const Transaction::pointer& transaction)
{ // returns true if transaction was added
boost::recursive_mutex::scoped_lock ml(mLock);
return mHeldTransactionsByID.insert(std::make_pair(transaction->getID(), transaction)).second;
mHeldTransactions.push_back(transaction->getSTransaction());
}
void LedgerMaster::pushLedger(Ledger::ref newLedger)
@@ -79,12 +79,34 @@ void LedgerMaster::storeLedger(Ledger::ref ledger)
}
Ledger::pointer LedgerMaster::closeLedger()
Ledger::pointer LedgerMaster::closeLedger(bool recover)
{
boost::recursive_mutex::scoped_lock sl(mLock);
Ledger::pointer closingLedger = mCurrentLedger;
if (recover)
{
int recovers = 0;
for (CanonicalTXSet::iterator it = mHeldTransactions.begin(), end = mHeldTransactions.end(); it != end; ++it)
{
try
{
TER result = mEngine.applyTransaction(*it->second, tapOPEN_LEDGER);
if (isTepSuccess(result))
++recovers;
}
catch (...)
{
cLog(lsWARNING) << "Held transaction throws";
}
}
tLog(recovers != 0, lsINFO) << "Recovered " << recovers << " held transactions";
mHeldTransactions.reset(closingLedger->getHash());
}
mCurrentLedger = boost::make_shared<Ledger>(boost::ref(*closingLedger), true);
mEngine.setLedger(mCurrentLedger);
return closingLedger;
}

View File

@@ -9,6 +9,7 @@
#include "Transaction.h"
#include "TransactionEngine.h"
#include "RangeSet.h"
#include "CanonicalTXSet.h"
// Tracks the current ledger and any ledgers in the process of closing
// Tracks ledger history
@@ -25,7 +26,7 @@ class LedgerMaster
LedgerHistory mLedgerHistory;
std::map<uint256, Transaction::pointer> mHeldTransactionsByID;
CanonicalTXSet mHeldTransactions;
RangeSet mCompleteLedgers;
LedgerAcquire::pointer mMissingLedger;
@@ -40,7 +41,7 @@ class LedgerMaster
public:
LedgerMaster() : mMissingSeq(0) { ; }
LedgerMaster() : mHeldTransactions(uint256()), mMissingSeq(0) { ; }
uint32 getCurrentLedgerIndex();
@@ -64,7 +65,7 @@ public:
std::string getCompleteLedgers() { return mCompleteLedgers.toString(); }
Ledger::pointer closeLedger();
Ledger::pointer closeLedger(bool recoverHeldTransactions);
Ledger::pointer getLedgerBySeq(uint32 index)
{
@@ -90,7 +91,7 @@ public:
void setLedgerRangePresent(uint32 minV, uint32 maxV) { mCompleteLedgers.setRange(minV, maxV); }
bool addHeldTransaction(const Transaction::pointer& trans);
void addHeldTransaction(const Transaction::pointer& trans);
void sweep(void) { mLedgerHistory.sweep(); }
};

View File

@@ -10,8 +10,9 @@
DECLARE_INSTANCE(LedgerProposal);
LedgerProposal::LedgerProposal(const uint256& pLgr, uint32 seq, const uint256& tx, uint32 closeTime,
const RippleAddress& naPeerPublic) :
mPreviousLedger(pLgr), mCurrentHash(tx), mCloseTime(closeTime), mProposeSeq(seq), mPublicKey(naPeerPublic)
const RippleAddress& naPeerPublic, const uint256& suppression) :
mPreviousLedger(pLgr), mCurrentHash(tx), mSuppression(suppression), mCloseTime(closeTime),
mProposeSeq(seq), mPublicKey(naPeerPublic)
{
// XXX Validate key.
// if (!mKey->SetPubKey(pubKey))
@@ -26,7 +27,7 @@ LedgerProposal::LedgerProposal(const RippleAddress& naPub, const RippleAddress&
const uint256& prevLgr, const uint256& position, uint32 closeTime) :
mPreviousLedger(prevLgr), mCurrentHash(position), mCloseTime(closeTime), mProposeSeq(0),
mPublicKey(naPub), mPrivateKey(naPriv)
{ // OPTIMIZEME: This is expensive. We create both the public and private keys separately each time
{
mPeerID = mPublicKey.getNodeID();
mTime = boost::posix_time::second_clock::universal_time();
}

View File

@@ -18,7 +18,7 @@ class LedgerProposal : private IS_INSTANCE(LedgerProposal)
{
protected:
uint256 mPreviousLedger, mCurrentHash;
uint256 mPreviousLedger, mCurrentHash, mSuppression;
uint32 mCloseTime, mProposeSeq;
uint160 mPeerID;
@@ -35,7 +35,7 @@ public:
// proposal from peer
LedgerProposal(const uint256& prevLgr, uint32 proposeSeq, const uint256& propose,
uint32 closeTime, const RippleAddress& naPeerPublic);
uint32 closeTime, const RippleAddress& naPeerPublic, const uint256& suppress);
// our first proposal
LedgerProposal(const RippleAddress& pubKey, const RippleAddress& privKey,
@@ -52,6 +52,7 @@ public:
const uint160& getPeerID() const { return mPeerID; }
const uint256& getCurrentHash() const { return mCurrentHash; }
const uint256& getPrevLedger() const { return mPreviousLedger; }
const uint256& getSuppression() const { return mSuppression; }
uint32 getProposeSeq() const { return mProposeSeq; }
uint32 getCloseTime() const { return mCloseTime; }
const RippleAddress& peekPublic() const { return mPublicKey; }

View File

@@ -115,19 +115,36 @@ Transaction::pointer NetworkOPs::submitTransaction(const Transaction::pointer& t
Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans)
{
Transaction::pointer dbtx = theApp->getMasterTransaction().fetch(trans->getID(), true);
if (dbtx) return dbtx;
if (dbtx)
return dbtx;
if (!trans->checkSign())
{
cLog(lsINFO) << "Transaction has bad signature";
int newFlags = theApp->getSuppression().getFlags(trans->getID());
if ((newFlags & SF_BAD) != 0)
{ // cached bad
trans->setStatus(INVALID);
return trans;
}
TER r = mLedgerMaster->doTransaction(*trans->getSTransaction(), tapOPEN_LEDGER);
if ((newFlags & SF_SIGGOOD) == 0)
{ // signature not checked
if (!trans->checkSign())
{
cLog(lsINFO) << "Transaction has bad signature";
trans->setStatus(INVALID);
theApp->isNewFlag(trans->getID(), SF_BAD);
return trans;
}
theApp->isNewFlag(trans->getID(), SF_SIGGOOD);
}
TER r = mLedgerMaster->doTransaction(*trans->getSTransaction(), tapOPEN_LEDGER | tapNO_CHECK_SIGN);
trans->setResult(r);
if (isTemMalformed(r)) // malformed, cache bad
theApp->isNewFlag(trans->getID(), SF_BAD);
else if(isTelLocal(r) || isTerRetry(r)) // can be retried
theApp->isNewFlag(trans->getID(), SF_RETRY);
#ifdef DEBUG
if (r != tesSUCCESS)
{
@@ -139,9 +156,9 @@ Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans)
if (r == tefFAILURE)
throw Fault(IO_ERROR);
if (r == terPRE_SEQ)
if (isTerRetry(r))
{ // transaction should be held
cLog(lsDEBUG) << "Transaction should be held";
cLog(lsDEBUG) << "Transaction should be held: " << r;
trans->setStatus(HELD);
theApp->getMasterTransaction().canonicalize(trans, true);
mLedgerMaster->addHeldTransaction(trans);
@@ -154,12 +171,24 @@ Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans)
return trans;
}
bool relay = true;
if (r == tesSUCCESS)
{
cLog(lsINFO) << "Transaction is now included";
cLog(lsINFO) << "Transaction is now included in open ledger";
trans->setStatus(INCLUDED);
theApp->getMasterTransaction().canonicalize(trans, true);
}
else
{
cLog(lsDEBUG) << "Status other than success " << r;
if (mMode == omFULL)
relay = false;
trans->setStatus(INVALID);
}
if (relay)
{
std::set<uint64> peers;
if (theApp->getSuppression().swapSet(trans->getID(), peers, SF_RELAYED))
{
@@ -168,32 +197,13 @@ Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans)
trans->getSTransaction()->add(s);
tx.set_rawtransaction(&s.getData().front(), s.getLength());
tx.set_status(ripple::tsCURRENT);
tx.set_receivetimestamp(getNetworkTimeNC());
tx.set_receivetimestamp(getNetworkTimeNC()); // FIXME: This should be when we received it
PackedMessage::pointer packet = boost::make_shared<PackedMessage>(tx, ripple::mtTRANSACTION);
theApp->getConnectionPool().relayMessageBut(peers, packet);
}
return trans;
}
cLog(lsDEBUG) << "Status other than success " << r;
std::set<uint64> peers;
if ((mMode != omFULL) && (mMode != omTRACKING) &&
theApp->getSuppression().swapSet(trans->getID(), peers, SF_RELAYED))
{
ripple::TMTransaction tx;
Serializer s;
trans->getSTransaction()->add(s);
tx.set_rawtransaction(&s.getData().front(), s.getLength());
tx.set_status(ripple::tsCURRENT);
tx.set_receivetimestamp(getNetworkTimeNC());
PackedMessage::pointer packet = boost::make_shared<PackedMessage>(tx, ripple::mtTRANSACTION);
theApp->getConnectionPool().relayMessageTo(peers, packet);
}
trans->setStatus(INVALID);
return trans;
}
@@ -694,60 +704,55 @@ bool NetworkOPs::haveConsensusObject()
return mConsensus;
}
// <-- bool: true to relay
bool NetworkOPs::recvPropose(const uint256& suppression, uint32 proposeSeq, const uint256& proposeHash,
const uint256& prevLedger, uint32 closeTime, const std::string& signature,
const RippleAddress& nodePublic)
uint256 NetworkOPs::getConsensusLCL()
{
// JED: does mConsensus need to be locked?
if (!haveConsensusObject())
return uint256();
return mConsensus->getLCL();
}
// XXX Validate key.
// XXX Take a vuc for pubkey.
void NetworkOPs::processTrustedProposal(LedgerProposal::pointer proposal,
boost::shared_ptr<ripple::TMProposeSet> set, RippleAddress nodePublic, uint256 checkLedger, bool sigGood)
{
bool relay = true;
if (!haveConsensusObject())
{
cLog(lsINFO) << "Received proposal outside consensus window";
return mMode != omFULL;
if (mMode == omFULL)
relay = false;
}
if (mConsensus->isOurPubKey(nodePublic))
else
{
cLog(lsTRACE) << "Received our own validation";
return false;
}
storeProposal(proposal, nodePublic);
// Is this node on our UNL?
if (!theApp->getUNL().nodeInUNL(nodePublic))
{
cLog(lsINFO) << "Untrusted proposal: " << nodePublic.humanNodePublic() << " " << proposeHash;
return true;
}
uint256 consensusLCL = mConsensus->getLCL();
if (prevLedger.isNonZero())
{ // proposal includes a previous ledger
LedgerProposal::pointer proposal =
boost::make_shared<LedgerProposal>(prevLedger, proposeSeq, proposeHash, closeTime, nodePublic);
if (!proposal->checkSign(signature))
if (!set->has_previousledger() && (checkLedger != consensusLCL))
{
cLog(lsWARNING) << "New-style ledger proposal fails signature check";
return false;
cLog(lsWARNING) << "Have to re-check proposal signature due to consensus view change";
assert(proposal->hasSignature());
proposal->setPrevLedger(consensusLCL);
if (proposal->checkSign())
sigGood = true;
}
if (sigGood && (consensusLCL == proposal->getPrevLedger()))
{
relay = mConsensus->peerPosition(proposal);
cLog(lsTRACE) << "Proposal processing finished, relay=" << relay;
}
if (prevLedger == mConsensus->getLCL())
return mConsensus->peerPosition(proposal);
storeProposal(proposal, nodePublic);
return false;
}
LedgerProposal::pointer proposal =
boost::make_shared<LedgerProposal>(mConsensus->getLCL(), proposeSeq, proposeHash, closeTime, nodePublic);
if (!proposal->checkSign(signature))
{ // Note that if the LCL is different, the signature check will fail
cLog(lsWARNING) << "Ledger proposal fails signature check";
proposal->setSignature(signature);
storeProposal(proposal, nodePublic);
return false;
if (relay)
{
std::set<uint64> peers;
theApp->getSuppression().swapSet(proposal->getSuppression(), peers, SF_RELAYED);
PackedMessage::pointer message = boost::make_shared<PackedMessage>(*set, ripple::mtPROPOSE_LEDGER);
theApp->getConnectionPool().relayMessageBut(peers, message);
}
return mConsensus->peerPosition(proposal);
else
cLog(lsINFO) << "Not relaying trusted proposal";
}
SHAMap::pointer NetworkOPs::getTXMap(const uint256& hash)
@@ -761,7 +766,10 @@ bool NetworkOPs::gotTXData(const boost::shared_ptr<Peer>& peer, const uint256& h
const std::list<SHAMapNode>& nodeIDs, const std::list< std::vector<unsigned char> >& nodeData)
{
if (!haveConsensusObject())
{
cLog(lsWARNING) << "Got TX data with no consensus object";
return false;
}
return mConsensus->peerGaveNodes(peer, hash, nodeIDs, nodeData);
}
@@ -777,7 +785,7 @@ bool NetworkOPs::hasTXSet(const boost::shared_ptr<Peer>& peer, const uint256& se
void NetworkOPs::mapComplete(const uint256& hash, SHAMap::ref map)
{
if (!haveConsensusObject())
if (haveConsensusObject())
mConsensus->mapComplete(hash, map, true);
}

View File

@@ -172,8 +172,8 @@ public:
const std::vector<unsigned char>& myNode, std::list< std::vector<unsigned char> >& newNodes);
// ledger proposal/close functions
bool recvPropose(const uint256& suppression, uint32 proposeSeq, const uint256& proposeHash,
const uint256& prevLedger, uint32 closeTime, const std::string& signature, const RippleAddress& nodePublic);
void processTrustedProposal(LedgerProposal::pointer proposal, boost::shared_ptr<ripple::TMProposeSet> set,
RippleAddress nodePublic, uint256 checkLedger, bool sigGood);
bool gotTXData(const boost::shared_ptr<Peer>& peer, const uint256& hash,
const std::list<SHAMapNode>& nodeIDs, const std::list< std::vector<unsigned char> >& nodeData);
bool recvValidation(const SerializedValidation::pointer& val);
@@ -203,6 +203,7 @@ public:
boost::unordered_map<uint160,
std::list<LedgerProposal::pointer> >& peekStoredProposals() { return mStoredProposals; }
void storeProposal(const LedgerProposal::pointer& proposal, const RippleAddress& peerPublic);
uint256 getConsensusLCL();
// client information retrieval functions
std::vector< std::pair<uint32, uint256> >

View File

@@ -500,8 +500,8 @@ void Peer::processReadBuffer()
case ripple::mtPROPOSE_LEDGER:
{
ripple::TMProposeSet msg;
if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
boost::shared_ptr<ripple::TMProposeSet> msg = boost::make_shared<ripple::TMProposeSet>();
if (msg->ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
recvPropose(msg);
else
cLog(lsWARNING) << "parse error: " << type;
@@ -694,6 +694,42 @@ void Peer::recvHello(ripple::TMHello& packet)
}
}
static void checkTransaction(Job&, int flags, SerializedTransaction::pointer stx, boost::weak_ptr<Peer> peer)
{
#ifndef TRUST_NETWORK
try
{
#endif
Transaction::pointer tx;
if ((flags & SF_SIGGOOD) != 0)
{
tx = boost::make_shared<Transaction>(stx, true);
if (tx->getStatus() == INVALID)
{
theApp->getSuppression().setFlag(stx->getTransactionID(), SF_BAD);
Peer::punishPeer(peer, PP_BAD_SIGNATURE);
return;
}
else
theApp->getSuppression().setFlag(stx->getTransactionID(), SF_SIGGOOD);
}
else
tx = boost::make_shared<Transaction>(stx, false);
theApp->getIOService().post(boost::bind(&NetworkOPs::processTransaction, &theApp->getOPs(), tx));
#ifndef TRUST_NETWORK
}
catch (...)
{
theApp->getSuppression().setFlags(stx->getTransactionID(), SF_BAD);
punishPeer(peer, PP_INVALID_REQUEST);
}
#endif
}
void Peer::recvTransaction(ripple::TMTransaction& packet)
{
cLog(lsDEBUG) << "Got transaction from peer";
@@ -708,12 +744,22 @@ void Peer::recvTransaction(ripple::TMTransaction& packet)
SerializerIterator sit(s);
SerializedTransaction::pointer stx = boost::make_shared<SerializedTransaction>(boost::ref(sit));
if (!theApp->isNew(stx->getTransactionID(), mPeerId))
return;
int flags;
if (!theApp->isNew(stx->getTransactionID(), mPeerId, flags))
{ // we have seen this transaction recently
if ((flags & SF_BAD) != 0)
{
punishPeer(PP_BAD_SIGNATURE);
return;
}
if ((flags & SF_RETRY) == 0)
return;
}
theApp->getJobQueue().addJob(jtTRANSACTION,
boost::bind(&checkTransaction, _1, flags, stx, boost::weak_ptr<Peer>(shared_from_this())));
tx = boost::make_shared<Transaction>(stx, true);
if (tx->getStatus() == INVALID)
throw(0);
#ifndef TRUST_NETWORK
}
catch (...)
@@ -727,52 +773,122 @@ void Peer::recvTransaction(ripple::TMTransaction& packet)
}
#endif
tx = theApp->getOPs().processTransaction(tx);
if(tx->getStatus() != INCLUDED)
{ // transaction wasn't accepted into ledger
#ifdef DEBUG
std::cerr << "Transaction from peer won't go in ledger" << std::endl;
#endif
}
}
void Peer::recvPropose(ripple::TMProposeSet& packet)
static void checkPropose(Job& job, boost::shared_ptr<ripple::TMProposeSet> packet,
LedgerProposal::pointer proposal, uint256 consensusLCL, RippleAddress nodePublic, boost::weak_ptr<Peer> peer)
{
if ((packet.currenttxhash().size() != 32) || (packet.nodepubkey().size() < 28) ||
(packet.signature().size() < 56) || (packet.nodepubkey().size() > 128) || (packet.signature().size() > 128))
bool sigGood = false;
bool isTrusted = (job.getType() == jtPROPOSAL_t);
cLog(lsTRACE) << "Checking " << (isTrusted ? "trusted" : "UNtrusted") << " proposal";
assert(packet);
ripple::TMProposeSet& set = *packet;
uint256 prevLedger;
if (set.has_previousledger())
{ // proposal includes a previous ledger
cLog(lsTRACE) << "proposal with previous ledger";
memcpy(prevLedger.begin(), set.previousledger().data(), 256 / 8);
if (!proposal->checkSign(set.signature()))
{
cLog(lsWARNING) << "proposal with previous ledger fails signature check";
Peer::punishPeer(peer, PP_BAD_SIGNATURE);
return;
}
else
sigGood = true;
}
else
{
if (consensusLCL.isNonZero() && proposal->checkSign(set.signature()))
{
prevLedger = consensusLCL;
sigGood = true;
}
else
{
cLog(lsWARNING) << "Ledger proposal fails signature check";
proposal->setSignature(set.signature());
}
}
if (isTrusted)
{
theApp->getIOService().post(boost::bind(&NetworkOPs::processTrustedProposal, &theApp->getOPs(),
proposal, packet, nodePublic, prevLedger, sigGood));
}
else if (sigGood && (prevLedger == consensusLCL))
{ // relay untrusted proposal
cLog(lsTRACE) << "relaying untrusted proposal";
std::set<uint64> peers;
theApp->getSuppression().swapSet(proposal->getSuppression(), peers, SF_RELAYED);
PackedMessage::pointer message = boost::make_shared<PackedMessage>(set, ripple::mtPROPOSE_LEDGER);
theApp->getConnectionPool().relayMessageBut(peers, message);
}
else
cLog(lsDEBUG) << "Not relaying untrusted proposal";
}
void Peer::recvPropose(const boost::shared_ptr<ripple::TMProposeSet>& packet)
{
assert(packet);
ripple::TMProposeSet& set = *packet;
if ((set.currenttxhash().size() != 32) || (set.nodepubkey().size() < 28) ||
(set.signature().size() < 56) || (set.nodepubkey().size() > 128) || (set.signature().size() > 128))
{
cLog(lsWARNING) << "Received proposal is malformed";
punishPeer(PP_INVALID_REQUEST);
return;
}
uint256 currentTxHash, prevLedger;
memcpy(currentTxHash.begin(), packet.currenttxhash().data(), 32);
if (set.has_previousledger() && (set.previousledger().size() != 32))
{
cLog(lsWARNING) << "Received proposal is malformed";
punishPeer(PP_INVALID_REQUEST);
return;
}
if ((packet.has_previousledger()) && (packet.previousledger().size() == 32))
memcpy(prevLedger.begin(), packet.previousledger().data(), 32);
uint256 proposeHash, prevLedger;
memcpy(proposeHash.begin(), set.currenttxhash().data(), 32);
if (set.has_previousledger())
memcpy(prevLedger.begin(), set.previousledger().data(), 32);
Serializer s(512);
s.add256(currentTxHash);
s.add256(prevLedger);
s.add32(packet.proposeseq());
s.add32(packet.closetime());
s.addVL(packet.nodepubkey());
s.addVL(packet.signature());
s.add256(proposeHash);
s.add32(set.proposeseq());
s.add32(set.closetime());
s.addVL(set.nodepubkey());
s.addVL(set.signature());
if (set.has_previousledger())
s.add256(prevLedger);
uint256 suppression = s.getSHA512Half();
if (!theApp->isNew(suppression, mPeerId))
{
cLog(lsTRACE) << "Received duplicate proposal from peer " << mPeerId;
return;
RippleAddress nodePublic = RippleAddress::createNodePublic(strCopy(packet.nodepubkey()));
// bool isTrusted = theApp->getUNL().nodeInUNL(nodePublic);
if(theApp->getOPs().recvPropose(suppression, packet.proposeseq(), currentTxHash, prevLedger, packet.closetime(),
packet.signature(), nodePublic))
{ // FIXME: Not all nodes will want proposals
PackedMessage::pointer message = boost::make_shared<PackedMessage>(packet, ripple::mtPROPOSE_LEDGER);
theApp->getConnectionPool().relayMessage(this, message);
}
RippleAddress signerPublic = RippleAddress::createNodePublic(strCopy(set.nodepubkey()));
if (signerPublic == theConfig.VALIDATION_PUB)
{
cLog(lsTRACE) << "Received our own proposal from peer " << mPeerId;
return;
}
bool isTrusted = theApp->getUNL().nodeInUNL(signerPublic);
cLog(lsTRACE) << "Received " << (isTrusted ? "trusted" : "UNtrusted") << " proposal from " << mPeerId;
uint256 consensusLCL = theApp->getOPs().getConsensusLCL();
LedgerProposal::pointer proposal = boost::make_shared<LedgerProposal>(
prevLedger.isNonZero() ? prevLedger : consensusLCL,
set.proposeseq(), proposeHash, set.closetime(), signerPublic, suppression);
theApp->getJobQueue().addJob(isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut,
boost::bind(&checkPropose, _1, packet, proposal, consensusLCL,
mNodePublic, boost::weak_ptr<Peer>(shared_from_this())));
}
void Peer::recvHaveTxSet(ripple::TMHaveTransactionSet& packet)
@@ -1043,11 +1159,12 @@ void Peer::recvGetLedger(ripple::TMGetLedger& packet)
reply.set_requestcookie(packet.requestcookie());
if (packet.itype() == ripple::liTS_CANDIDATE)
{ // Request is for a transaction candidate set
{ // Request is for a transaction candidate set
cLog(lsINFO) << "Received request for TX candidate set data " << getIP();
if ((!packet.has_ledgerhash() || packet.ledgerhash().size() != 32))
{
punishPeer(PP_INVALID_REQUEST);
cLog(lsWARNING) << "invalid request";
return;
}
uint256 txHash;
@@ -1171,6 +1288,7 @@ void Peer::recvGetLedger(ripple::TMGetLedger& packet)
SHAMapNode mn(packet.nodeids(i).data(), packet.nodeids(i).size());
if(!mn.isValid())
{
cLog(lsWARNING) << "Request for invalid node";
punishPeer(PP_INVALID_REQUEST);
return;
}
@@ -1178,6 +1296,8 @@ void Peer::recvGetLedger(ripple::TMGetLedger& packet)
std::list< std::vector<unsigned char> > rawNodes;
if(map->getNodeFat(mn, nodeIDs, rawNodes, fatRoot, fatLeaves))
{
assert(nodeIDs.size() == rawNodes.size());
cLog(lsDEBUG) << "getNodeFat got " << rawNodes.size() << " nodes";
std::vector<SHAMapNode>::iterator nodeIDIterator;
std::list< std::vector<unsigned char> >::iterator rawNodeIterator;
for(nodeIDIterator = nodeIDs.begin(), rawNodeIterator = rawNodes.begin();
@@ -1190,6 +1310,8 @@ void Peer::recvGetLedger(ripple::TMGetLedger& packet)
node->set_nodedata(&rawNodeIterator->front(), rawNodeIterator->size());
}
}
else
cLog(lsWARNING) << "getNodeFat returns false";
}
PackedMessage::pointer oPacket = boost::make_shared<PackedMessage>(reply, ripple::mtLEDGER_DATA);
sendPacket(oPacket);
@@ -1199,7 +1321,7 @@ void Peer::recvLedger(ripple::TMLedgerData& packet)
{
if (packet.nodes().size() <= 0)
{
cLog(lsWARNING) << "Ledger data with no nodes";
cLog(lsWARNING) << "Ledger/TXset data with no nodes";
punishPeer(PP_INVALID_REQUEST);
return;
}
@@ -1209,6 +1331,7 @@ void Peer::recvLedger(ripple::TMLedgerData& packet)
uint256 hash;
if(packet.ledgerhash().size() != 32)
{
cLog(lsWARNING) << "TX candidate reply with invalid hash size";
punishPeer(PP_INVALID_REQUEST);
return;
}
@@ -1368,7 +1491,7 @@ Json::Value Peer::getJson()
if (mHello.has_protoversion() &&
(mHello.protoversion() != MAKE_VERSION_INT(PROTO_VERSION_MAJOR, PROTO_VERSION_MINOR)))
ret["protocol"] = boost::lexical_cast<std::string>(GET_VERSION_MAJOR(mHello.protoversion())) + "." +
ret["protocol"] = boost::lexical_cast<std::string>(GET_VERSION_MAJOR(mHello.protoversion())) + "." +
boost::lexical_cast<std::string>(GET_VERSION_MINOR(mHello.protoversion()));
if (!!mClosedLedgerHash)

View File

@@ -19,6 +19,7 @@ enum PeerPunish
PP_INVALID_REQUEST = 1, // The peer sent a request that makes no sense
PP_UNKNOWN_REQUEST = 2, // The peer sent a request that might be garbage
PP_UNWANTED_DATA = 3, // The peer sent us data we didn't want/need
PP_BAD_SIGNATURE = 4, // Object had bad signature
};
typedef std::pair<std::string,int> ipPort;
@@ -116,7 +117,7 @@ protected:
void recvGetLedger(ripple::TMGetLedger& packet);
void recvLedger(ripple::TMLedgerData& packet);
void recvStatus(ripple::TMStatusChange& packet);
void recvPropose(ripple::TMProposeSet& packet);
void recvPropose(const boost::shared_ptr<ripple::TMProposeSet>& packet);
void recvHaveTxSet(ripple::TMHaveTransactionSet& packet);
void getSessionCookie(std::string& strDst);

View File

@@ -716,23 +716,22 @@ SHAMapTreeNode::pointer SHAMap::fetchNodeExternal(const SHAMapNode& id, const ui
// Log(lsTRACE) << "fetchNodeExternal: missing " << hash;
throw SHAMapMissingNode(mType, id, hash);
}
assert(Serializer::getSHA512Half(obj->getData()) == hash);
try
{
SHAMapTreeNode::pointer ret = boost::make_shared<SHAMapTreeNode>(id, obj->getData(), mSeq - 1, snfPREFIX);
#ifdef DEBUG
if (id != *ret)
{
Log(lsFATAL) << "id:" << id << ", got:" << *ret;
assert(false);
return SHAMapTreeNode::pointer();
}
if (ret->getNodeHash() != hash)
{
Log(lsFATAL) << "Hashes don't match";
assert(false);
return SHAMapTreeNode::pointer();
}
#endif
return ret;
}
catch (...)

View File

@@ -58,6 +58,25 @@ bool SuppressionTable::addSuppressionPeer(const uint256& index, uint64 peer)
return created;
}
bool SuppressionTable::addSuppressionPeer(const uint256& index, uint64 peer, int& flags)
{
boost::mutex::scoped_lock sl(mSuppressionMutex);
bool created;
Suppression &s = findCreateEntry(index, created);
s.addPeer(peer);
flags = s.getFlags();
return created;
}
int SuppressionTable::getFlags(const uint256& index)
{
boost::mutex::scoped_lock sl(mSuppressionMutex);
bool created;
return findCreateEntry(index, created).getFlags();
}
bool SuppressionTable::addSuppressionFlags(const uint256& index, int flag)
{
boost::mutex::scoped_lock sl(mSuppressionMutex);

View File

@@ -14,10 +14,11 @@
DEFINE_INSTANCE(Suppression);
#define SF_RELAYED 0x01
#define SF_SIGBAD 0x02
#define SF_SIGGOOD 0x04
#define SF_RELAYED 0x01 // Has already been relayed to other nodes
#define SF_BAD 0x02 // Signature/format is bad
#define SF_SIGGOOD 0x04 // Signature is good
#define SF_SAVED 0x08
#define SF_RETRY 0x10 // Transaction can be retried
class Suppression : private IS_INSTANCE(Suppression)
{
@@ -61,12 +62,15 @@ public:
bool addSuppression(const uint256& index);
bool addSuppressionPeer(const uint256& index, uint64 peer);
bool addSuppressionPeer(const uint256& index, uint64 peer, int& flags);
bool addSuppressionFlags(const uint256& index, int flag);
bool setFlag(const uint256& index, int flag);
int getFlags(const uint256& index);
Suppression getEntry(const uint256&);
bool swapSet(const uint256& index, std::set<uint64>& peers, int flag);
bool swapSet(const uint256& index, std::set<uint64>& peers);
};
#endif

View File

@@ -455,7 +455,7 @@ TER TransactionEngine::applyTransaction(const SerializedTransaction& txn, Transa
terResult = terRETRY;
}
if (tesSUCCESS == terResult || isTepPartial(terResult))
if ((tesSUCCESS == terResult) || isTepPartial(terResult))
{
// Transaction succeeded fully or (retries are not allowed and the transaction succeeded partially).
Serializer m;

View File

@@ -121,6 +121,7 @@ static bool compare(const STObject& o1, const STObject& o2)
STObject TransactionMetaSet::getAsObject() const
{
STObject metaData(sfTransactionMetaData);
assert(mResult != 255);
metaData.setFieldU8(sfTransactionResult, mResult);
metaData.addObject(mNodes);
return metaData;