Merge code to advertise and generate fetch packs.

This commit is contained in:
JoelKatz
2013-04-22 14:18:45 -07:00
parent a6ce832ed3
commit 455595891b
9 changed files with 252 additions and 20 deletions

View File

@@ -37,6 +37,7 @@ const char* Job::toString(JobType t)
switch(t)
{
case jtINVALID: return "invalid";
case jtPACK: return "makeFetchPack";
case jtPUBOLDLEDGER: return "publishAcqLedger";
case jtVALIDATION_ut: return "untrustedValidation";
case jtPROOFWORK: return "proofOfWork";
@@ -304,7 +305,7 @@ void JobQueue::threadEntry()
// }
}
if (mShuttingDown)
if (mJobSet.empty())
break;
JobType type;

View File

@@ -22,21 +22,22 @@
enum JobType
{ // must be in priority order, low to high
jtINVALID = -1,
jtPUBOLDLEDGER = 1, // An old ledger has been accepted
jtVALIDATION_ut = 2, // A validation from an untrusted source
jtPROOFWORK = 3, // A proof of work demand from another server
jtPROPOSAL_ut = 4, // A proposal from an untrusted source
jtLEDGER_DATA = 5, // Received data for a ledger we're acquiring
jtCLIENT = 6, // A websocket command from the client
jtTRANSACTION = 7, // A transaction received from the network
jtPUBLEDGER = 8, // Publish a fully-accepted ledger
jtWAL = 9, // Write-ahead logging
jtVALIDATION_t = 10, // A validation from a trusted source
jtWRITE = 11, // Write out hashed objects
jtTRANSACTION_l = 12, // A local transaction
jtPROPOSAL_t = 13, // A proposal from a trusted source
jtADMIN = 14, // An administrative operation
jtDEATH = 15, // job of death, used internally
jtPACK = 1, // Make a fetch pack for a peer
jtPUBOLDLEDGER = 2, // An old ledger has been accepted
jtVALIDATION_ut = 3, // A validation from an untrusted source
jtPROOFWORK = 4, // A proof of work demand from another server
jtPROPOSAL_ut = 5, // A proposal from an untrusted source
jtLEDGER_DATA = 6, // Received data for a ledger we're acquiring
jtCLIENT = 7, // A websocket command from the client
jtTRANSACTION = 8, // A transaction received from the network
jtPUBLEDGER = 9, // Publish a fully-accepted ledger
jtWAL = 10, // Write-ahead logging
jtVALIDATION_t = 11, // A validation from a trusted source
jtWRITE = 12, // Write out hashed objects
jtTRANSACTION_l = 13, // A local transaction
jtPROPOSAL_t = 14, // A proposal from a trusted source
jtADMIN = 15, // An administrative operation
jtDEATH = 16, // job of death, used internally
// special types not dispatched by the job pool
jtPEER = 24,

View File

@@ -472,6 +472,12 @@ void LedgerConsensus::statusChange(ripple::NodeEvent event, Ledger& ledger)
s.set_networktime(theApp->getOPs().getNetworkTimeNC());
uint256 hash = ledger.getParentHash();
s.set_ledgerhashprevious(hash.begin(), hash.size());
uint32 uMin, uMax;
theApp->getOPs().getValidatedRange(uMin, uMax);
s.set_firstseq(uMin);
s.set_lastseq(uMax);
hash = ledger.getHash();
s.set_ledgerhash(hash.begin(), hash.size());
PackedMessage::pointer packet = boost::make_shared<PackedMessage>(s, ripple::mtSTATUS_CHANGE);

View File

@@ -6,6 +6,7 @@
#include "utils.h"
#include "Application.h"
#include "Transaction.h"
#include "HashPrefixes.h"
#include "LedgerConsensus.h"
#include "LedgerTiming.h"
#include "Log.h"
@@ -2003,4 +2004,69 @@ void NetworkOPs::getBookPage(Ledger::pointer lpLedger, const uint160& uTakerPays
// jvResult["nodes"] = Json::Value(Json::arrayValue);
}
void NetworkOPs::makeFetchPack(Job&, boost::weak_ptr<Peer> wPeer, boost::shared_ptr<ripple::TMGetObjectByHash> request,
Ledger::pointer wantLedger, Ledger::pointer haveLedger)
{
try
{
Peer::pointer peer = wPeer.lock();
if (!peer)
return;
ripple::TMGetObjectByHash reply;
reply.set_query(false);
if (request->has_seq())
reply.set_seq(request->seq());
reply.set_ledgerhash(reply.ledgerhash());
reply.set_type(ripple::TMGetObjectByHash::otFETCH_PACK);
do
{
uint32 lSeq = wantLedger->getLedgerSeq();
ripple::TMIndexedObject& newObj = *reply.add_objects();
newObj.set_hash(wantLedger->getHash().begin(), 256 / 8);
Serializer s(256);
s.add32(sHP_Ledger);
wantLedger->addRaw(s);
newObj.set_data(s.getDataPtr(), s.getLength());
newObj.set_ledgerseq(lSeq);
std::list<SHAMap::fetchPackEntry_t> pack = wantLedger->peekAccountStateMap()->getFetchPack(
haveLedger->peekAccountStateMap().get(), false, 1024 - reply.objects().size());
BOOST_FOREACH(SHAMap::fetchPackEntry_t& node, pack)
{
ripple::TMIndexedObject& newObj = *reply.add_objects();
newObj.set_hash(node.first.begin(), 256 / 8);
newObj.set_data(&node.second[0], node.second.size());
newObj.set_ledgerseq(lSeq);
}
if (wantLedger->getAccountHash().isNonZero() && (pack.size() < 768))
{
pack = wantLedger->peekTransactionMap()->getFetchPack(NULL, true, 256);
BOOST_FOREACH(SHAMap::fetchPackEntry_t& node, pack)
{
ripple::TMIndexedObject& newObj = *reply.add_objects();
newObj.set_hash(node.first.begin(), 256 / 8);
newObj.set_data(&node.second[0], node.second.size());
newObj.set_ledgerseq(lSeq);
}
}
if (reply.objects().size() >= 768)
break;
haveLedger = wantLedger;
wantLedger = getLedgerByHash(haveLedger->getParentHash());
} while (wantLedger);
cLog(lsINFO) << "Built fetch pack with " << reply.objects().size() << " nodes";
PackedMessage::pointer msg = boost::make_shared<PackedMessage>(reply, ripple::mtGET_OBJECTS);
peer->sendPacket(msg, false);
}
catch (...)
{
cLog(lsWARNING) << "Exception building fetch pach";
}
}
// vim:ts=4

View File

@@ -260,6 +260,8 @@ public:
bool hasTXSet(const boost::shared_ptr<Peer>& peer, const uint256& set, ripple::TxSetStatus status);
void mapComplete(const uint256& hash, SHAMap::ref map);
bool stillNeedTXSet(const uint256& hash);
void makeFetchPack(Job&, boost::weak_ptr<Peer> peer, boost::shared_ptr<ripple::TMGetObjectByHash> request,
Ledger::pointer wantLedger, Ledger::pointer haveLedger);
// network state machine
void checkState(const boost::system::error_code& result);

View File

@@ -36,6 +36,8 @@ Peer::Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx,
mPeerId(peerID),
mPrivate(false),
mLoad(""),
mMinLedger(0),
mMaxLedger(0),
mSocketSsl(io_service, ctx),
mActivityTimer(io_service),
mIOStrand(io_service)
@@ -652,8 +654,8 @@ void Peer::processReadBuffer()
case ripple::mtGET_OBJECTS:
{
event->reName("Peer::getobjects");
ripple::TMGetObjectByHash msg;
if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
boost::shared_ptr<ripple::TMGetObjectByHash> msg = boost::make_shared<ripple::TMGetObjectByHash>();
if (msg->ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
recvGetObjectByHash(msg);
else
cLog(lsWARNING) << "parse error: " << type;
@@ -1184,10 +1186,18 @@ void Peer::recvPeers(ripple::TMPeers& packet)
}
}
void Peer::recvGetObjectByHash(ripple::TMGetObjectByHash& packet)
void Peer::recvGetObjectByHash(const boost::shared_ptr<ripple::TMGetObjectByHash>& ptr)
{
ripple::TMGetObjectByHash& packet = *ptr;
if (packet.query())
{ // this is a query
if (packet.type() == ripple::TMGetObjectByHash::otFETCH_PACK)
{
doFetchPack(ptr);
return;
}
ripple::TMGetObjectByHash reply;
reply.set_query(false);
@@ -1846,6 +1856,48 @@ void Peer::doProofOfWork(Job&, boost::weak_ptr<Peer> peer, ProofOfWork::pointer
}
}
void Peer::doFetchPack(const boost::shared_ptr<ripple::TMGetObjectByHash>& packet)
{
if (packet->ledgerhash().size() != 32)
{
cLog(lsWARNING) << "FetchPack hash size malformed";
punishPeer(LT_InvalidRequest);
return;
}
uint256 hash;
memcpy(hash.begin(), packet->ledgerhash().data(), 32);
Ledger::pointer haveLedger = theApp->getOPs().getLedgerByHash(hash);
if (!haveLedger)
{
cLog(lsINFO) << "Peer requests fetch pack for ledger we don't have: " << hash;
punishPeer(LT_RequestNoReply);
return;
}
if (!haveLedger->isClosed())
{
cLog(lsWARNING) << "Peer requests fetch pack from open ledger: " << hash;
punishPeer(LT_InvalidRequest);
return;
}
Ledger::pointer wantLedger = theApp->getOPs().getLedgerByHash(haveLedger->getParentHash());
if (!wantLedger)
{
cLog(lsINFO) << "Peer requests fetch pack for ledger whose predecessor we don't have: " << hash;
punishPeer(LT_RequestNoReply);
return;
}
theApp->getJobQueue().addJob(jtPACK, "MakeFetchPack",
BIND_TYPE(&NetworkOPs::makeFetchPack, &theApp->getOPs(), P_1,
boost::weak_ptr<Peer>(shared_from_this()), packet, wantLedger, haveLedger));
}
bool Peer::hasProto(int version)
{
return mHello.has_protoversion() && (mHello.protoversion() >= version);
}
Json::Value Peer::getJson()
{
Json::Value ret(Json::objectValue);

View File

@@ -47,6 +47,7 @@ private:
uint64 mPeerId;
bool mPrivate; // Keep peer IP private.
LoadSource mLoad;
uint32 mMinLedger, mMaxLedger;
uint256 mClosedLedgerHash;
uint256 mPreviousLedgerHash;
@@ -93,7 +94,7 @@ protected:
void recvGetContacts(ripple::TMGetContacts& packet);
void recvGetPeers(ripple::TMGetPeers& packet);
void recvPeers(ripple::TMPeers& packet);
void recvGetObjectByHash(ripple::TMGetObjectByHash& packet);
void recvGetObjectByHash(const boost::shared_ptr<ripple::TMGetObjectByHash>& packet);
void recvPing(ripple::TMPing& packet);
void recvErrorMessage(ripple::TMErrorMsg& packet);
void recvSearchTransaction(ripple::TMSearchTransaction& packet);
@@ -111,6 +112,8 @@ protected:
void addLedger(const uint256& ledger);
void addTxSet(const uint256& TxSet);
void doFetchPack(const boost::shared_ptr<ripple::TMGetObjectByHash>& packet);
static void doProofOfWork(Job&, boost::weak_ptr<Peer>, ProofOfWork::pointer);
public:
@@ -160,6 +163,8 @@ public:
const RippleAddress& getNodePublic() const { return mNodePublic; }
void cycleStatus() { mPreviousLedgerHash = mClosedLedgerHash; mClosedLedgerHash.zero(); }
bool hasProto(int version);
bool hasRange(uint32 uMin, uint32 uMax) { return (uMin >= mMinLedger) && (uMax <= mMaxLedger); }
};
#endif

View File

@@ -375,6 +375,7 @@ protected:
SHAMapItem::pointer onlyBelow(SHAMapTreeNode*);
void eraseChildren(SHAMapTreeNode::pointer);
void dropBelow(SHAMapTreeNode*);
bool hasNode(const SHAMapNode& id, const uint256& hash);
bool walkBranch(SHAMapTreeNode* node, SHAMapItem::ref otherMapItem, bool isFirstMap,
SHAMapDiff& differences, int& maxCount);
@@ -476,6 +477,9 @@ public:
bool deepCompare(SHAMap& other);
virtual void dump(bool withHashes = false);
typedef std::pair< uint256, std::vector<unsigned char> > fetchPackEntry_t;
std::list<fetchPackEntry_t> getFetchPack(SHAMap* have, bool includeLeaves, int max);
static void sweep() { fullBelowCache.sweep(); }
};

View File

@@ -461,6 +461,101 @@ bool SHAMap::deepCompare(SHAMap& other)
return true;
}
bool SHAMap::hasNode(const SHAMapNode& nodeID, const uint256& nodeHash)
{
SHAMapTreeNode* node = root.get();
while (node->isInner() && (node->getDepth() < nodeID.getDepth()))
{
int branch = node->selectBranch(nodeID.getNodeID());
if (node->isEmptyBranch(branch))
break;
node = getNodePointer(node->getChildNodeID(branch), node->getChildHash(branch));
}
return node->getNodeHash() == nodeHash;
}
std::list<SHAMap::fetchPackEntry_t> SHAMap::getFetchPack(SHAMap* have, bool includeLeaves, int max)
{
std::list<fetchPackEntry_t> ret;
boost::recursive_mutex::scoped_lock ul1(mLock);
UPTR_T< boost::unique_lock<boost::recursive_mutex> > ul2;
if (have)
{
UPTR_T< boost::unique_lock<boost::recursive_mutex> > ul3(
new boost::unique_lock<boost::recursive_mutex>(have->mLock, boost::try_to_lock));
if (!(*ul3))
{
cLog(lsINFO) << "Unable to create pack due to lock";
return ret;
}
ul2.swap(ul3);
}
if (root->isLeaf())
{
if (includeLeaves && !root->getNodeHash().isZero() &&
(!have || !have->hasNode(*root, root->getNodeHash())))
{
Serializer s;
root->addRaw(s, snfPREFIX);
ret.push_back(fetchPackEntry_t(root->getNodeHash(), s.peekData()));
}
return ret;
}
if (root->getNodeHash().isZero())
return ret;
if (have && (root->getNodeHash() == have->root->getNodeHash()))
return ret;
std::stack<SHAMapTreeNode*> stack; // contains unexplored non-matching inner node entries
stack.push(root.get());
while (!stack.empty())
{
SHAMapTreeNode* node = stack.top();
stack.pop();
// 1) Add this node to the pack
Serializer s;
node->addRaw(s, snfPREFIX);
ret.push_back(fetchPackEntry_t(node->getNodeHash(), s.peekData()));
--max;
// 2) push non-matching child inner nodes
for (int i = 0; i < 16; ++i)
{
if (!node->isEmptyBranch(i))
{
const uint256& childHash = node->getChildHash(i);
SHAMapNode childID = node->getChildNodeID(i);
SHAMapTreeNode *next = getNodePointer(childID, childHash);
if (next->isInner())
{
if (!have || !have->hasNode(*next, childHash))
stack.push(next);
}
else if (includeLeaves && (!have || !have->hasNode(childID, childHash)))
{
Serializer s;
node->addRaw(s, snfPREFIX);
ret.push_back(fetchPackEntry_t(node->getNodeHash(), s.peekData()));
--max;
}
}
}
if (max <= 0)
break;
}
return ret;
}
#ifdef DEBUG
#define SMS_DEBUG
#endif