Begin using the new code.

This commit is contained in:
JoelKatz
2012-10-31 20:36:41 -07:00
parent 41da9c740f
commit 40dc8e15c5
10 changed files with 96 additions and 36 deletions

View File

@@ -95,7 +95,11 @@ public:
HashedObjectStore& getHashedObjectStore() { return mHashedObjectStore; }
ValidationCollection& getValidations() { return mValidations; }
JobQueue& getJobQueue() { return mJobQueue; }
SuppressionTable& getSuppression() { return mSuppressions; }
bool isNew(const uint256& s) { return mSuppressions.addSuppression(s); }
bool isNew(const uint256& s, uint64 p) { return mSuppressions.addSuppressionPeer(s, p); }
bool isNewFlag(const uint256& s, int f) { return mSuppressions.setFlag(s, f); }
bool running() { return mTxnDB != NULL; }
bool getSystemTimeOffset(int& offset) { return mSNTPClient.getOffset(offset); }

View File

@@ -251,22 +251,30 @@ int ConnectionPool::relayMessage(Peer* fromPeer, const PackedMessage::pointer& m
return sentTo;
}
int ConnectionPool::relayMessage(const std::set<uint64>& fromPeers, const PackedMessage::pointer& msg)
{
int sentTo = 0;
void ConnectionPool::relayMessageBut(const std::set<uint64>& fromPeers, const PackedMessage::pointer& msg)
{ // Relay message to all but the specified peers
boost::mutex::scoped_lock sl(mPeerLock);
BOOST_FOREACH(naPeer pair, mConnectedMap)
{
Peer::ref peer = pair.second;
if (peer->isConnected() && (fromPeers.count(peer->getPeerId()) == 0))
{
++sentTo;
peer->sendPacket(msg);
}
}
return sentTo;
}
void ConnectionPool::relayMessageTo(const std::set<uint64>& fromPeers, const PackedMessage::pointer& msg)
{ // Relay message to the specified peers
boost::mutex::scoped_lock sl(mPeerLock);
BOOST_FOREACH(naPeer pair, mConnectedMap)
{
Peer::ref peer = pair.second;
if (peer->isConnected() && (fromPeers.count(peer->getPeerId()) > 0))
peer->sendPacket(msg);
}
}
// Schedule a connection via scanning.

View File

@@ -62,7 +62,8 @@ public:
// Send message to network.
int relayMessage(Peer* fromPeer, const PackedMessage::pointer& msg);
int relayMessage(const std::set<uint64>& fromPeers, const PackedMessage::pointer& msg);
void relayMessageTo(const std::set<uint64>& fromPeers, const PackedMessage::pointer& msg);
void relayMessageBut(const std::set<uint64>& fromPeers, const PackedMessage::pointer& msg);
// Manual connection request.
// Queue for immediate scanning.

View File

@@ -1073,7 +1073,7 @@ int Ledger::getPendingSaves()
void Ledger::pendSave(bool fromConsensus)
{
if (!fromConsensus && !theApp->isNew(getHash()))
if (!fromConsensus && !theApp->isNewFlag(getHash(), SF_SAVED))
return;
boost::thread thread(boost::bind(&Ledger::saveAcceptedLedger, shared_from_this(), fromConsensus));

View File

@@ -845,7 +845,7 @@ void LedgerConsensus::addDisputedTransaction(const uint256& txID, const std::vec
txn->setVote(pit.first, cit->second->hasItem(txID));
}
if (!ourVote && theApp->isNew(txID))
if (!ourVote && theApp->isNewFlag(txID, SF_RELAYED))
{
ripple::TMTransaction msg;
msg.set_rawtransaction(&(tx.front()), tx.size());

View File

@@ -103,7 +103,7 @@ Transaction::pointer NetworkOPs::submitTransaction(const Transaction::pointer& t
return tpTransNew;
}
Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans, Peer* source)
Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans)
{
Transaction::pointer dbtx = theApp->getMasterTransaction().fetch(trans->getID(), true);
if (dbtx) return dbtx;
@@ -151,27 +151,28 @@ Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans,
trans->setStatus(INCLUDED);
theApp->getMasterTransaction().canonicalize(trans, true);
// FIXME: Need code to get all accounts affected by a transaction and re-synch
// any of them that affect local accounts cached in memory. Or, we need to
// no cache the account balance information and always get it from the current ledger
// theApp->getWallet().applyTransaction(trans);
std::set<uint64> peers;
if (theApp->getSuppression().swapSet(trans->getID(), peers, SF_RELAYED))
{
ripple::TMTransaction tx;
Serializer s;
trans->getSTransaction()->add(s);
tx.set_rawtransaction(&s.getData().front(), s.getLength());
tx.set_status(ripple::tsCURRENT);
tx.set_receivetimestamp(getNetworkTimeNC());
ripple::TMTransaction tx;
Serializer s;
trans->getSTransaction()->add(s);
tx.set_rawtransaction(&s.getData().front(), s.getLength());
tx.set_status(ripple::tsCURRENT);
tx.set_receivetimestamp(getNetworkTimeNC());
PackedMessage::pointer packet = boost::make_shared<PackedMessage>(tx, ripple::mtTRANSACTION);
int sentTo = theApp->getConnectionPool().relayMessage(source, packet);
cLog(lsINFO) << "Transaction relayed to " << sentTo << " node(s)";
PackedMessage::pointer packet = boost::make_shared<PackedMessage>(tx, ripple::mtTRANSACTION);
theApp->getConnectionPool().relayMessageBut(peers, packet);
}
return trans;
}
cLog(lsDEBUG) << "Status other than success " << r;
if ((mMode != omFULL) && (mMode != omTRACKING) && (theApp->isNew(trans->getID())))
std::set<uint64> peers;
if ((mMode != omFULL) && (mMode != omTRACKING) &&
theApp->getSuppression().swapSet(trans->getID(), peers, SF_RELAYED))
{
ripple::TMTransaction tx;
Serializer s;
@@ -180,7 +181,7 @@ Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans,
tx.set_status(ripple::tsCURRENT);
tx.set_receivetimestamp(getNetworkTimeNC());
PackedMessage::pointer packet = boost::make_shared<PackedMessage>(tx, ripple::mtTRANSACTION);
theApp->getConnectionPool().relayMessage(source, packet);
theApp->getConnectionPool().relayMessageTo(peers, packet);
}
trans->setStatus(INVALID);
@@ -685,7 +686,7 @@ bool NetworkOPs::haveConsensusObject()
}
// <-- bool: true to relay
bool NetworkOPs::recvPropose(uint32 proposeSeq, const uint256& proposeHash, const uint256& prevLedger,
bool NetworkOPs::recvPropose(uint64 peerId, uint32 proposeSeq, const uint256& proposeHash, const uint256& prevLedger,
uint32 closeTime, const std::string& pubKey, const std::string& signature, const RippleAddress& nodePublic)
{
// JED: does mConsensus need to be locked?
@@ -701,7 +702,7 @@ bool NetworkOPs::recvPropose(uint32 proposeSeq, const uint256& proposeHash, cons
s.add32(closeTime);
s.addRaw(pubKey);
s.addRaw(signature);
if (!theApp->isNew(s.getSHA512Half()))
if (!theApp->isNew(s.getSHA512Half(), peerId))
return false;
RippleAddress naPeerPublic = RippleAddress::createNodePublic(strCopy(pubKey));

View File

@@ -121,7 +121,7 @@ public:
//
Transaction::pointer submitTransaction(const Transaction::pointer& tpTrans);
Transaction::pointer processTransaction(Transaction::pointer transaction, Peer* source = NULL);
Transaction::pointer processTransaction(Transaction::pointer transaction);
Transaction::pointer findTransactionByID(const uint256& transactionID);
int findTransactionsBySource(const uint256& uLedger, std::list<Transaction::pointer>&, const RippleAddress& sourceAccount,
uint32 minSeq, uint32 maxSeq);
@@ -168,8 +168,8 @@ public:
const std::vector<unsigned char>& myNode, std::list< std::vector<unsigned char> >& newNodes);
// ledger proposal/close functions
bool recvPropose(uint32 proposeSeq, const uint256& proposeHash, const uint256& prevLedger, uint32 closeTime,
const std::string& pubKey, const std::string& signature, const RippleAddress& nodePublic);
bool recvPropose(uint64 peerId, uint32 proposeSeq, const uint256& proposeHash, const uint256& prevLedger,
uint32 closeTime, const std::string& pubKey, const std::string& signature, const RippleAddress& nodePublic);
bool gotTXData(const boost::shared_ptr<Peer>& peer, const uint256& hash,
const std::list<SHAMapNode>& nodeIDs, const std::list< std::vector<unsigned char> >& nodeData);
bool recvValidation(const SerializedValidation::pointer& val);

View File

@@ -708,8 +708,12 @@ void Peer::recvTransaction(ripple::TMTransaction& packet)
SerializerIterator sit(s);
SerializedTransaction::pointer stx = boost::make_shared<SerializedTransaction>(boost::ref(sit));
if (!theApp->isNew(stx->getTransactionID(), mPeerId))
return;
tx = boost::make_shared<Transaction>(stx, true);
if (tx->getStatus() == INVALID) throw(0);
if (tx->getStatus() == INVALID)
throw(0);
#ifndef TRUST_NETWORK
}
catch (...)
@@ -723,7 +727,7 @@ void Peer::recvTransaction(ripple::TMTransaction& packet)
}
#endif
tx = theApp->getOPs().processTransaction(tx, this);
tx = theApp->getOPs().processTransaction(tx);
if(tx->getStatus() != INCLUDED)
{ // transaction wasn't accepted into ledger
@@ -748,7 +752,7 @@ void Peer::recvPropose(ripple::TMProposeSet& packet)
if ((packet.has_previousledger()) && (packet.previousledger().size() == 32))
memcpy(prevLedger.begin(), packet.previousledger().data(), 32);
if(theApp->getOPs().recvPropose(packet.proposeseq(), currentTxHash, prevLedger, packet.closetime(),
if(theApp->getOPs().recvPropose(mPeerId, packet.proposeseq(), currentTxHash, prevLedger, packet.closetime(),
packet.nodepubkey(), packet.signature(), mNodePublic))
{ // FIXME: Not all nodes will want proposals
PackedMessage::pointer message = boost::make_shared<PackedMessage>(packet, ripple::mtPROPOSE_LEDGER);
@@ -822,7 +826,7 @@ void Peer::recvValidation(const boost::shared_ptr<ripple::TMValidation>& packet)
SerializedValidation::pointer val = boost::make_shared<SerializedValidation>(boost::ref(sit), false);
uint256 signingHash = val->getSigningHash();
if (!theApp->isNew(signingHash))
if (!theApp->isNew(signingHash, mPeerId))
{
cLog(lsTRACE) << "Validation is duplicate";
return;

View File

@@ -66,3 +66,35 @@ bool SuppressionTable::addSuppressionFlags(const uint256& index, int flag)
findCreateEntry(index, created).setFlag(flag);
return created;
}
bool SuppressionTable::setFlag(const uint256& index, int flag)
{ // return: true = changed, false = unchanged
assert(flag != 0);
boost::mutex::scoped_lock sl(mSuppressionMutex);
bool created;
Suppression &s = findCreateEntry(index, created);
if ((s.getFlags() & flag) == flag)
return false;
s.setFlag(flag);
return true;
}
bool SuppressionTable::swapSet(const uint256& index, std::set<uint64>& peers, int flag)
{
boost::mutex::scoped_lock sl(mSuppressionMutex);
bool created;
Suppression &s = findCreateEntry(index, created);
if ((s.getFlags() & flag) == flag)
return false;
s.swapSet(peers);
s.setFlag(flag);
return true;
}

View File

@@ -14,6 +14,11 @@
DEFINE_INSTANCE(Suppression);
#define SF_RELAYED 0x01
#define SF_SIGBAD 0x02
#define SF_SIGGOOD 0x04
#define SF_SAVED 0x08
class Suppression : private IS_INSTANCE(Suppression)
{
protected:
@@ -27,9 +32,11 @@ public:
void addPeer(uint64 peer) { mPeers.insert(peer); }
bool hasPeer(uint64 peer) { return mPeers.count(peer) > 0; }
int getFlags(void) { return mFlags; }
bool hasFlag(int f) { return (mFlags & f) != 0; }
void setFlag(int f) { mFlags |= f; }
void clearFlag(int f) { mFlags &= ~f; }
void swapSet(std::set<uint64>& s) { mPeers.swap(s); }
};
class SuppressionTable
@@ -55,8 +62,11 @@ public:
bool addSuppressionPeer(const uint256& index, uint64 peer);
bool addSuppressionFlags(const uint256& index, int flag);
bool setFlag(const uint256& index, int flag);
Suppression getEntry(const uint256&);
bool swapSet(const uint256& index, std::set<uint64>& peers, int flag);
};
#endif