First part of fetch acceleration changes. Includes a bugfix that I'll push to master.

This commit is contained in:
JoelKatz
2013-04-21 12:44:21 -07:00
parent b993c6ee32
commit 79ec8e6070
11 changed files with 272 additions and 28 deletions

View File

@@ -37,6 +37,7 @@ const char* Job::toString(JobType t)
switch(t)
{
case jtINVALID: return "invalid";
case jtPACK: return "makeFetchPack";
case jtPUBOLDLEDGER: return "publishAcqLedger";
case jtVALIDATION_ut: return "untrustedValidation";
case jtPROOFWORK: return "proofOfWork";

View File

@@ -22,21 +22,22 @@
enum JobType
{ // must be in priority order, low to high
jtINVALID = -1,
jtPUBOLDLEDGER = 1, // An old ledger has been accepted
jtVALIDATION_ut = 2, // A validation from an untrusted source
jtPROOFWORK = 3, // A proof of work demand from another server
jtPROPOSAL_ut = 4, // A proposal from an untrusted source
jtLEDGER_DATA = 5, // Received data for a ledger we're acquiring
jtCLIENT = 6, // A websocket command from the client
jtTRANSACTION = 7, // A transaction received from the network
jtPUBLEDGER = 8, // Publish a fully-accepted ledger
jtWAL = 9, // Write-ahead logging
jtVALIDATION_t = 10, // A validation from a trusted source
jtWRITE = 11, // Write out hashed objects
jtTRANSACTION_l = 12, // A local transaction
jtPROPOSAL_t = 13, // A proposal from a trusted source
jtADMIN = 14, // An administrative operation
jtDEATH = 15, // job of death, used internally
jtPACK = 1, // Make a fetch pack for a peer
jtPUBOLDLEDGER = 2, // An old ledger has been accepted
jtVALIDATION_ut = 3, // A validation from an untrusted source
jtPROOFWORK = 4, // A proof of work demand from another server
jtPROPOSAL_ut = 5, // A proposal from an untrusted source
jtLEDGER_DATA = 6, // Received data for a ledger we're acquiring
jtCLIENT = 7, // A websocket command from the client
jtTRANSACTION = 8, // A transaction received from the network
jtPUBLEDGER = 9, // Publish a fully-accepted ledger
jtWAL = 10, // Write-ahead logging
jtVALIDATION_t = 11, // A validation from a trusted source
jtWRITE = 12, // Write out hashed objects
jtTRANSACTION_l = 13, // A local transaction
jtPROPOSAL_t = 14, // A proposal from a trusted source
jtADMIN = 15, // An administrative operation
jtDEATH = 16, // job of death, used internally
// special types not dispatched by the job pool
jtPEER = 24,

View File

@@ -2004,4 +2004,45 @@ void NetworkOPs::getBookPage(Ledger::pointer lpLedger, const uint160& uTakerPays
// jvResult["nodes"] = Json::Value(Json::arrayValue);
}
void NetworkOPs::makeFetchPack(Job&, boost::weak_ptr<Peer> wPeer, boost::shared_ptr<ripple::TMGetObjectByHash> request,
Ledger::pointer prevLedger, Ledger::pointer reqLedger)
{
Peer::pointer peer = wPeer.lock();
if (!peer)
return;
ripple::TMGetObjectByHash reply;
reply.set_query(false);
if (request->has_seq())
reply.set_seq(request->seq());
reply.set_ledgerhash(reply.ledgerhash());
std::list< std::pair<uint256, std::vector<unsigned char> > > pack1 = getSyncInfo(prevLedger->peekAccountStateMap(),
reqLedger->peekAccountStateMap(), 1024);
typedef std::pair< uint256, std::vector<unsigned char> > uvpair_t;
BOOST_FOREACH(uvpair_t& node, pack1)
{
ripple::TMIndexedObject& newObj = *reply.add_objects();
newObj.set_hash(node.first.begin(), 256 / 8);
newObj.set_data(&node.second[0], node.second.size());
}
if (reqLedger->getAccountHash().isNonZero())
{
SHAMapIterator it(*reqLedger->peekTransactionMap(), true, true);
for (SHAMapTreeNode* node = it.getNext(); node != NULL; node = it.getNext())
{
Serializer s;
node->addRaw(s, snfPREFIX);
ripple::TMIndexedObject& newObj = *reply.add_objects();
newObj.set_hash(node->getNodeHash().begin(), 256 / 8);
newObj.set_data(&s.peekData()[0], s.peekData().size());
}
}
PackedMessage::pointer msg = boost::make_shared<PackedMessage>(reply, ripple::mtGET_OBJECTS);
peer->sendPacket(msg, false);
}
// vim:ts=4

View File

@@ -260,6 +260,8 @@ public:
bool hasTXSet(const boost::shared_ptr<Peer>& peer, const uint256& set, ripple::TxSetStatus status);
void mapComplete(const uint256& hash, SHAMap::ref map);
bool stillNeedTXSet(const uint256& hash);
void makeFetchPack(Job&, boost::weak_ptr<Peer> peer, boost::shared_ptr<ripple::TMGetObjectByHash> request,
Ledger::pointer prevLedger, Ledger::pointer reqLedger);
// network state machine
void checkState(const boost::system::error_code& result);

View File

@@ -652,8 +652,8 @@ void Peer::processReadBuffer()
case ripple::mtGET_OBJECTS:
{
event->reName("Peer::getobjects");
ripple::TMGetObjectByHash msg;
if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
boost::shared_ptr<ripple::TMGetObjectByHash> msg = boost::make_shared<ripple::TMGetObjectByHash>();
if (msg->ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
recvGetObjectByHash(msg);
else
cLog(lsWARNING) << "parse error: " << type;
@@ -1186,8 +1186,15 @@ void Peer::recvPeers(ripple::TMPeers& packet)
}
}
void Peer::recvGetObjectByHash(ripple::TMGetObjectByHash& packet)
void Peer::recvGetObjectByHash(const boost::shared_ptr<ripple::TMGetObjectByHash>& ptr)
{
ripple::TMGetObjectByHash& packet = *ptr;
if (packet.type() == ripple::TMGetObjectByHash::otFETCH_PACK)
{
doFetchPack(ptr);
return;
}
if (packet.query())
{ // this is a query
ripple::TMGetObjectByHash reply;
@@ -1848,6 +1855,50 @@ void Peer::doProofOfWork(Job&, boost::weak_ptr<Peer> peer, ProofOfWork::pointer
}
}
void Peer::doFetchPack(const boost::shared_ptr<ripple::TMGetObjectByHash>& packet)
{
if (packet->query())
{
if (packet->ledgerhash().size() != 32)
{
cLog(lsWARNING) << "FetchPack hash size malformed";
punishPeer(LT_InvalidRequest);
return;
}
uint256 hash;
memcpy(hash.begin(), packet->ledgerhash().data(), 32);
Ledger::pointer reqLedger = theApp->getOPs().getLedgerByHash(hash);
if (!reqLedger)
{
cLog(lsINFO) << "Peer requests fetch pack for ledger we don't have: " << hash;
punishPeer(LT_RequestNoReply);
return;
}
if (!reqLedger->isClosed())
{
cLog(lsWARNING) << "Peer requests fetch pack for open ledger: " << hash;
punishPeer(LT_InvalidRequest);
return;
}
Ledger::pointer prevLedger = theApp->getOPs().getLedgerByHash(reqLedger->getParentHash());
if (!prevLedger)
{
cLog(lsINFO) << "Peer requests fetch pack for ledger whose predecessor we don't have: " << hash;
punishPeer(LT_RequestNoReply);
return;
}
theApp->getJobQueue().addJob(jtPACK, "MakeFetchPack",
BIND_TYPE(&NetworkOPs::makeFetchPack, &theApp->getOPs(), P_1,
boost::weak_ptr<Peer>(shared_from_this()), packet, prevLedger, reqLedger));
}
else
{ // received fetch pack
// WRITEME
}
}
Json::Value Peer::getJson()
{
Json::Value ret(Json::objectValue);

View File

@@ -93,7 +93,7 @@ protected:
void recvGetContacts(ripple::TMGetContacts& packet);
void recvGetPeers(ripple::TMGetPeers& packet);
void recvPeers(ripple::TMPeers& packet);
void recvGetObjectByHash(ripple::TMGetObjectByHash& packet);
void recvGetObjectByHash(const boost::shared_ptr<ripple::TMGetObjectByHash>& packet);
void recvPing(ripple::TMPing& packet);
void recvErrorMessage(ripple::TMErrorMsg& packet);
void recvSearchTransaction(ripple::TMSearchTransaction& packet);
@@ -111,6 +111,8 @@ protected:
void addLedger(const uint256& ledger);
void addTxSet(const uint256& TxSet);
void doFetchPack(const boost::shared_ptr<ripple::TMGetObjectByHash>& packet);
static void doProofOfWork(Job&, boost::weak_ptr<Peer>, ProofOfWork::pointer);
public:

View File

@@ -25,7 +25,7 @@ DECLARE_INSTANCE(SHAMap);
DECLARE_INSTANCE(SHAMapItem);
DECLARE_INSTANCE(SHAMapTreeNode);
void SHAMapNode::setHash() const
void SHAMapNode::setMHash() const
{
std::size_t h = theApp->getNonceST() + (mDepth * 0x9e3779b9);
const unsigned int *ptr = reinterpret_cast<const unsigned int *>(mNodeID.begin());
@@ -36,7 +36,7 @@ void SHAMapNode::setHash() const
std::size_t hash_value(const SHAMapNode& mn)
{
return mn.getHash();
return mn.getMHash();
}
std::size_t hash_value(const uint256& u)

View File

@@ -35,7 +35,7 @@ private:
int mDepth;
mutable size_t mHash;
void setHash() const;
void setMHash() const;
protected:
SHAMapNode(int depth, const uint256& id, bool) : mNodeID(id), mDepth(depth), mHash(0) { ; }
@@ -51,7 +51,7 @@ public:
const uint256& getNodeID() const { return mNodeID; }
bool isValid() const { return (mDepth >= 0) && (mDepth < 64); }
bool isRoot() const { return mDepth == 0; }
size_t getHash() const { if (mHash == 0) setHash(); return mHash; }
size_t getMHash() const { if (mHash == 0) setMHash(); return mHash; }
virtual bool isPopulated() const { return false; }
@@ -329,10 +329,33 @@ public:
static SMAddNode invalid() { return SMAddNode(true, false); }
};
class SHAMapIterator
{
friend class SHAMap;
typedef std::pair<SHAMapTreeNode*, int> stack_t;
SHAMap& mMap;
std::stack<stack_t> mStack;
bool mInner, mLeaf, mLock;
public:
SHAMapIterator(SHAMap& map, bool returnInner, bool returnLeaf);
~SHAMapIterator();
bool lock();
bool unlock();
void reset();
SHAMapTreeNode* getNext();
};
extern bool SMANCombine(SMAddNode& existing, const SMAddNode& additional);
class SHAMap : public IS_INSTANCE(SHAMap)
{
friend class SHAMapIterator;
public:
typedef boost::shared_ptr<SHAMap> pointer;
typedef const boost::shared_ptr<SHAMap>& ref;
@@ -341,7 +364,7 @@ public:
typedef std::map<uint256, SHAMapDiffItem> SHAMapDiff;
typedef boost::unordered_map<SHAMapNode, SHAMapTreeNode::pointer> SHADirtyMap;
private:
protected:
uint32 mSeq;
mutable boost::recursive_mutex mLock;
boost::unordered_map<SHAMapNode, SHAMapTreeNode::pointer> mTNByID;
@@ -356,8 +379,6 @@ private:
static KeyCache<uint256> fullBelowCache;
protected:
void dirtyUp(std::stack<SHAMapTreeNode::pointer>& stack, const uint256& target, uint256 prevHash);
std::stack<SHAMapTreeNode::pointer> getStack(const uint256& id, bool include_nonmatching_leaf, bool partialOk);
SHAMapTreeNode::pointer walkTo(const uint256& id, bool modify);
@@ -479,5 +500,8 @@ public:
static void sweep() { fullBelowCache.sweep(); }
};
extern std::list< std::pair<uint256, std::vector<unsigned char> > >
getSyncInfo(SHAMap::pointer have, SHAMap::pointer want, int max);
#endif
// vim:ts=4

View File

@@ -571,6 +571,95 @@ std::ostream& operator<<(std::ostream& out, const SHAMapMissingNode& mn)
return out;
}
SHAMapIterator::SHAMapIterator(SHAMap& map, bool returnInner, bool returnLeaf) :
mMap(map), mInner(returnInner), mLeaf(returnLeaf), mLock(false)
{
mStack.push(stack_t(mMap.root.get(), 0));
}
SHAMapIterator::~SHAMapIterator()
{
if (mLock)
{
while (!mStack.empty())
mStack.pop();
mMap.mLock.unlock();
}
}
bool SHAMapIterator::lock()
{
if (mLock)
return false;
mMap.mLock.lock();
mLock = true;
return true;
}
bool SHAMapIterator::unlock()
{
if (!mLock)
return false;
mMap.mLock.unlock();
mLock = false;
return true;
}
void SHAMapIterator::reset()
{
while (!mStack.empty())
mStack.pop();
mStack.push(stack_t(mMap.root.get(), 0));
}
SHAMapTreeNode* SHAMapIterator::getNext()
{
if (mStack.empty())
return NULL;
stack_t& top = mStack.top();
if (top.first->isLeaf())
{ // special case, map has only one leaf
SHAMapTreeNode* ret = mLeaf ? top.first : NULL;
mStack.pop();
return ret;
}
while (1)
{
while (top.second < 16)
{ // continue where we left off
if (top.first->isEmptyBranch(top.second))
++top.second;
else
{
SHAMapTreeNode* next = mMap.getNodePointer(
top.first->getChildNodeID(top.second), top.first->getChildHash(top.second));
++top.second;
if (next->isLeaf())
{
if (mLeaf)
return next;
}
else
{
mStack.push(stack_t(next, 0));
top = mStack.top();
}
}
}
if (top.second == 16)
{ // we ran off the end of an inner node
SHAMapTreeNode* ret = top.first;
mStack.pop();
if (mInner)
return ret;
if (mStack.empty()) // ran off the end of the root
return NULL;
top = mStack.top();
}
}
}
// vim:ts=4

View File

@@ -97,7 +97,7 @@ void SHAMap::getMissingNodes(std::vector<SHAMapNode>& nodeIDs, std::vector<uint2
node->setFullBelow();
if (mType == smtSTATE)
{
fullBelowCache.add(node->getHash());
fullBelowCache.add(node->getNodeHash());
dropBelow(node);
}
}
@@ -162,7 +162,7 @@ std::vector<uint256> SHAMap::getNeededHashes(int max)
node->setFullBelow();
if (mType == smtSTATE)
{
fullBelowCache.add(node->getHash());
fullBelowCache.add(node->getNodeHash());
dropBelow(node);
}
}
@@ -172,6 +172,38 @@ std::vector<uint256> SHAMap::getNeededHashes(int max)
return ret;
}
std::list< std::pair<uint256, std::vector<unsigned char> > >
getSyncInfo(SHAMap::pointer have, SHAMap::pointer want, int max)
{
std::list< std::pair< uint256, std::vector<unsigned char> > > ret;
SHAMapIterator haveI(*have, true, false);
SHAMapIterator wantI(*want, true, false);
SHAMapTreeNode *haveN = haveI.getNext();
SHAMapTreeNode *wantN = wantI.getNext();
while (wantN != NULL)
{
if (haveN && (haveN->getNodeHash() == wantN->getNodeHash()))
{ // they match, advance both
haveN = haveI.getNext();
wantN = wantI.getNext();
}
else if (haveN && (haveN->getNodeHash() < wantN->getNodeHash()))
{ // need to advance have pointer
haveN = haveI.getNext();
}
else
{ // unmatched inner node
Serializer s;
wantN->addRaw(s, snfPREFIX);
ret.push_back(std::make_pair(wantN->getNodeHash(), s.peekData()));
if (--max <= 0)
break;
wantN = wantI.getNext();
}
}
return ret;
}
bool SHAMap::getNodeFat(const SHAMapNode& wanted, std::vector<SHAMapNode>& nodeIDs,
std::list<std::vector<unsigned char> >& rawNodes, bool fatRoot, bool fatLeaves)
{ // Gets a node and some of its children

View File

@@ -233,6 +233,7 @@ message TMGetObjectByHash
otTRANSACTION_NODE = 3;
otSTATE_NODE = 4;
otCAS_OBJECT = 5;
otFETCH_PACK = 6;
}
required ObjectType type = 1;