Merge branch 'master' of github.com:jedmccaleb/NewCoin

This commit is contained in:
Arthur Britto
2012-05-11 14:59:11 -07:00
5 changed files with 179 additions and 62 deletions

View File

@@ -1,18 +1,28 @@
#include "boost/foreach.hpp"
#include "boost/make_shared.hpp"
#include "Application.h"
#include "LedgerAcquire.h"
#include <boost/foreach.hpp>
#include <boost/make_shared.hpp>
#include <boost/bind.hpp>
#include "Application.h"
#define LEDGER_ACQUIRE_TIMEOUT 2
LedgerAcquire::LedgerAcquire(const uint256& hash) : mHash(hash),
mComplete(false), mFailed(false), mHaveBase(false), mHaveState(false), mHaveTransactions(false)
mComplete(false), mFailed(false), mHaveBase(false), mHaveState(false), mHaveTransactions(false),
mTimer(theApp->getIOService())
{
;
#ifdef DEBUG
std::cerr << "Acquiring ledger " << mHash.GetHex() << std::endl;
#endif
}
void LedgerAcquire::done()
{
#ifdef DEBUG
std::cerr << "Done acquiring ledger " << mHash.GetHex() << std::endl;
#endif
std::vector< boost::function<void (LedgerAcquire::pointer)> > triggers;
mLock.lock();
@@ -24,15 +34,18 @@ void LedgerAcquire::done()
triggers[i](shared_from_this());
}
void LedgerAcquire::setTimer()
void LedgerAcquire::resetTimer()
{
// WRITEME
mTimer.expires_from_now(boost::posix_time::seconds(LEDGER_ACQUIRE_TIMEOUT));
mTimer.async_wait(boost::bind(&LedgerAcquire::timerEntry,
boost::weak_ptr<LedgerAcquire>(shared_from_this()), boost::asio::placeholders::error));
}
void LedgerAcquire::timerEntry(boost::weak_ptr<LedgerAcquire> wptr)
void LedgerAcquire::timerEntry(boost::weak_ptr<LedgerAcquire> wptr, const boost::system::error_code& result)
{
if (result == boost::asio::error::operation_aborted) return;
LedgerAcquire::pointer ptr = wptr.lock();
if (ptr) ptr->trigger(true);
if (!!ptr) ptr->trigger(Peer::pointer());
}
void LedgerAcquire::addOnComplete(boost::function<void (LedgerAcquire::pointer)> trigger)
@@ -42,20 +55,36 @@ void LedgerAcquire::addOnComplete(boost::function<void (LedgerAcquire::pointer)>
mLock.unlock();
}
void LedgerAcquire::trigger(bool timer)
void LedgerAcquire::trigger(Peer::pointer peer)
{
if (mComplete || mFailed) return;
#ifdef DEBUG
std::cerr << "Trigger acquiring ledger " << mHash.GetHex() << std::endl;
std::cerr << "complete=" << mComplete << " failed=" << mFailed << std::endl;
std::cerr << "base=" << mHaveBase << " tx=" << mHaveTransactions << " as=" << mHaveState << std::endl;
#endif
if (mComplete || mFailed)
return;
if (!mHaveBase)
{
#ifdef DEBUG
std::cerr << "need base" << std::endl;
#endif
boost::shared_ptr<newcoin::TMGetLedger> tmGL = boost::make_shared<newcoin::TMGetLedger>();
tmGL->set_ledgerhash(mHash.begin(), mHash.size());
tmGL->set_itype(newcoin::liBASE);
sendRequest(tmGL);
if (peer)
{
sendRequest(tmGL, peer);
return;
}
else sendRequest(tmGL);
}
if (mHaveBase && !mHaveTransactions)
{
#ifdef DEBUG
std::cerr << "need tx" << std::endl;
#endif
assert(mLedger);
if (mLedger->peekTransactionMap()->getHash().isZero())
{ // we need the root node
@@ -64,6 +93,11 @@ void LedgerAcquire::trigger(bool timer)
tmGL->set_ledgerseq(mLedger->getLedgerSeq());
tmGL->set_itype(newcoin::liTX_NODE);
*(tmGL->add_nodeids()) = SHAMapNode().getRawString();
if (peer)
{
sendRequest(tmGL, peer);
return;
}
sendRequest(tmGL);
}
else
@@ -88,6 +122,11 @@ void LedgerAcquire::trigger(bool timer)
tmGL->set_itype(newcoin::liTX_NODE);
for (std::vector<SHAMapNode>::iterator it = nodeIDs.begin(); it != nodeIDs.end(); ++it)
*(tmGL->add_nodeids()) = it->getRawString();
if (peer)
{
sendRequest(tmGL, peer);
return;
}
sendRequest(tmGL);
}
}
@@ -95,6 +134,9 @@ void LedgerAcquire::trigger(bool timer)
if (mHaveBase && !mHaveState)
{
#ifdef DEBUG
std::cerr << "need as" << std::endl;
#endif
assert(mLedger);
if (mLedger->peekAccountStateMap()->getHash().isZero())
{ // we need the root node
@@ -103,6 +145,11 @@ void LedgerAcquire::trigger(bool timer)
tmGL->set_ledgerseq(mLedger->getLedgerSeq());
tmGL->set_itype(newcoin::liAS_NODE);
*(tmGL->add_nodeids()) = SHAMapNode().getRawString();
if (peer)
{
sendRequest(tmGL, peer);
return;
}
sendRequest(tmGL);
}
else
@@ -127,6 +174,11 @@ void LedgerAcquire::trigger(bool timer)
tmGL->set_itype(newcoin::liAS_NODE);
for (std::vector<SHAMapNode>::iterator it =nodeIDs.begin(); it != nodeIDs.end(); ++it)
*(tmGL->add_nodeids()) = it->getRawString();
if (peer)
{
sendRequest(tmGL, peer);
return;
}
sendRequest(tmGL);
}
}
@@ -134,8 +186,13 @@ void LedgerAcquire::trigger(bool timer)
if (mComplete || mFailed)
done();
else if (timer)
setTimer();
else
resetTimer();
}
void LedgerAcquire::sendRequest(boost::shared_ptr<newcoin::TMGetLedger> tmGL, Peer::pointer peer)
{
peer->sendPacket(boost::make_shared<PackedMessage>(tmGL, newcoin::mtGET_LEDGER));
}
void LedgerAcquire::sendRequest(boost::shared_ptr<newcoin::TMGetLedger> tmGL)
@@ -153,7 +210,8 @@ void LedgerAcquire::sendRequest(boost::shared_ptr<newcoin::TMGetLedger> tmGL)
else
{
// FIXME: Track last peer sent to and time sent
it->lock()->sendPacket(packet);
Peer::pointer peer = it->lock();
if (peer) peer->sendPacket(packet);
return;
}
}
@@ -162,7 +220,7 @@ void LedgerAcquire::sendRequest(boost::shared_ptr<newcoin::TMGetLedger> tmGL)
void LedgerAcquire::peerHas(Peer::pointer ptr)
{
boost::recursive_mutex::scoped_lock sl(mLock);
std::list<boost::weak_ptr<Peer> >::iterator it=mPeers.begin();
std::list<boost::weak_ptr<Peer> >::iterator it = mPeers.begin();
while (it != mPeers.end())
{
Peer::pointer pr = it->lock();
@@ -175,6 +233,7 @@ void LedgerAcquire::peerHas(Peer::pointer ptr)
}
}
mPeers.push_back(ptr);
if (mPeers.size() == 1) trigger(ptr);
}
void LedgerAcquire::badPeer(Peer::pointer ptr)
@@ -198,13 +257,20 @@ void LedgerAcquire::badPeer(Peer::pointer ptr)
}
}
bool LedgerAcquire::takeBase(const std::string& data)
bool LedgerAcquire::takeBase(const std::string& data, Peer::pointer peer)
{ // Return value: true=normal, false=bad data
#ifdef DEBUG
std::cerr << "got base acquiring ledger " << mHash.GetHex() << std::endl;
#endif
boost::recursive_mutex::scoped_lock sl(mLock);
if (mHaveBase) return true;
mLedger = boost::make_shared<Ledger>(data);
if (mLedger->getHash() != mHash)
{
#ifdef DEBUG
std::cerr << "Acquire hash mismatch" << std::endl;
std::cerr << mLedger->getHash().GetHex() << "!=" << mHash.GetHex() << std::endl;
#endif
mLedger = Ledger::pointer();
return false;
}
@@ -212,40 +278,64 @@ bool LedgerAcquire::takeBase(const std::string& data)
if (!mLedger->getTransHash()) mHaveTransactions = true;
if (!mLedger->getAccountHash()) mHaveState = true;
mLedger->setAcquiring();
trigger(peer);
return true;
}
bool LedgerAcquire::takeTxNode(const std::list<SHAMapNode>& nodeIDs,
const std::list<std::vector<unsigned char> >& data)
const std::list<std::vector<unsigned char> >& data, Peer::pointer peer)
{
if (!mHaveBase) return false;
std::list<SHAMapNode>::const_iterator nodeIDit = nodeIDs.begin();
std::list<std::vector<unsigned char> >::const_iterator nodeDatait = data.begin();
while (nodeIDit != nodeIDs.end())
{
if (!mLedger->peekTransactionMap()->addKnownNode(*nodeIDit, *nodeDatait))
if (nodeIDit->isRoot())
{
if (!mLedger->peekTransactionMap()->addRootNode(mLedger->getTransHash(), *nodeDatait))
return false;
}
else if (!mLedger->peekTransactionMap()->addKnownNode(*nodeIDit, *nodeDatait))
return false;
++nodeIDit;
++nodeDatait;
}
if (!mLedger->peekTransactionMap()->isSynching()) mHaveTransactions = true;
if (!mLedger->peekTransactionMap()->isSynching())
{
mHaveTransactions = true;
if (mHaveState) mComplete = true;
}
trigger(peer);
return true;
}
bool LedgerAcquire::takeAsNode(const std::list<SHAMapNode>& nodeIDs,
const std::list<std::vector<unsigned char> >& data)
const std::list<std::vector<unsigned char> >& data, Peer::pointer peer)
{
#ifdef DEBUG
std::cerr << "got ASdata acquiring ledger " << mHash.GetHex() << std::endl;
#endif
if (!mHaveBase) return false;
std::list<SHAMapNode>::const_iterator nodeIDit = nodeIDs.begin();
std::list<std::vector<unsigned char> >::const_iterator nodeDatait = data.begin();
while (nodeIDit != nodeIDs.end())
{
if (!mLedger->peekAccountStateMap()->addKnownNode(*nodeIDit, *nodeDatait))
if (nodeIDit->isRoot())
{
if (!mLedger->peekAccountStateMap()->addRootNode(mLedger->getAccountHash(), *nodeDatait))
return false;
}
else if (!mLedger->peekAccountStateMap()->addKnownNode(*nodeIDit, *nodeDatait))
return false;
++nodeIDit;
++nodeDatait;
}
if (!mLedger->peekAccountStateMap()->isSynching()) mHaveState = true;
if (!mLedger->peekAccountStateMap()->isSynching())
{
mHaveState = true;
if (mHaveTransactions) mComplete = true;
}
trigger(peer);
return true;
}
@@ -254,7 +344,10 @@ LedgerAcquire::pointer LedgerAcquireMaster::findCreate(const uint256& hash)
boost::mutex::scoped_lock sl(mLock);
LedgerAcquire::pointer& ptr = mLedgers[hash];
if (ptr) return ptr;
return boost::make_shared<LedgerAcquire>(hash);
ptr = boost::make_shared<LedgerAcquire>(hash);
assert(mLedgers[hash] == ptr);
ptr->resetTimer(); // Cannot call in constructor
return ptr;
}
LedgerAcquire::pointer LedgerAcquireMaster::find(const uint256& hash)
@@ -277,13 +370,25 @@ bool LedgerAcquireMaster::dropLedger(const uint256& hash)
return mLedgers.erase(hash);
}
bool LedgerAcquireMaster::gotLedgerData(newcoin::TMLedgerData& packet)
bool LedgerAcquireMaster::gotLedgerData(newcoin::TMLedgerData& packet, Peer::pointer peer)
{
#ifdef DEBUG
std::cerr << "got data for acquiring ledger ";
#endif
uint256 hash;
if (packet.ledgerhash().size() != 32) return false;
if (packet.ledgerhash().size() != 32)
{
#ifdef DEBUG
std::cerr << "error" << std::endl;
#endif
return false;
}
memcpy(&hash, packet.ledgerhash().data(), 32);
#ifdef DEBUG
std::cerr << hash.GetHex() << std::endl;
#endif
LedgerAcquire::pointer ledger=find(hash);
LedgerAcquire::pointer ledger = find(hash);
if (!ledger) return false;
if (packet.type() == newcoin::liBASE)
@@ -291,9 +396,9 @@ bool LedgerAcquireMaster::gotLedgerData(newcoin::TMLedgerData& packet)
if (packet.nodes_size() != 1) return false;
const newcoin::TMLedgerNode& node = packet.nodes(0);
if (!node.has_nodedata()) return false;
return ledger->takeBase(node.nodedata());
return ledger->takeBase(node.nodedata(), peer);
}
else if ((packet.type() == newcoin::liTX_NODE) || (packet.type() == newcoin::liAS_NODE) )
else if ((packet.type() == newcoin::liTX_NODE) || (packet.type() == newcoin::liAS_NODE))
{
std::list<SHAMapNode> nodeIDs;
std::list<std::vector<unsigned char> > nodeData;
@@ -307,8 +412,9 @@ bool LedgerAcquireMaster::gotLedgerData(newcoin::TMLedgerData& packet)
nodeIDs.push_back(SHAMapNode(node.nodeid().data(), node.nodeid().size()));
nodeData.push_back(std::vector<unsigned char>(node.nodedata().begin(), node.nodedata().end()));
}
if (packet.type() == newcoin::liTX_NODE) return ledger->takeTxNode(nodeIDs, nodeData);
else return ledger->takeAsNode(nodeIDs, nodeData);
if (packet.type() == newcoin::liTX_NODE) return ledger->takeTxNode(nodeIDs, nodeData, peer);
else return ledger->takeAsNode(nodeIDs, nodeData, peer);
}
else return false;
return false;
}

View File

@@ -4,8 +4,10 @@
#include <vector>
#include <map>
#include "boost/enable_shared_from_this.hpp"
#include "boost/function.hpp"
#include <boost/enable_shared_from_this.hpp>
#include <boost/function.hpp>
#include <boost/asio.hpp>
#include <boost/thread/mutex.hpp>
#include "Ledger.h"
#include "Peer.h"
@@ -21,16 +23,19 @@ protected:
Ledger::pointer mLedger;
uint256 mHash;
bool mComplete, mFailed, mHaveBase, mHaveState, mHaveTransactions;
boost::asio::deadline_timer mTimer;
std::vector< boost::function<void (LedgerAcquire::pointer)> > mOnComplete;
std::list<boost::weak_ptr<Peer> > mPeers; // peers known to have this ledger
void done();
void trigger(bool timer);
static void timerEntry(boost::weak_ptr<LedgerAcquire>);
static void timerEntry(boost::weak_ptr<LedgerAcquire>, const boost::system::error_code&);
void sendRequest(boost::shared_ptr<newcoin::TMGetLedger> message);
void setTimer();
void sendRequest(boost::shared_ptr<newcoin::TMGetLedger> message, Peer::pointer peer);
void trigger(Peer::pointer peer);
public:
LedgerAcquire(const uint256& hash);
@@ -47,9 +52,12 @@ public:
void peerHas(Peer::pointer);
void badPeer(Peer::pointer);
bool takeBase(const std::string& data);
bool takeTxNode(const std::list<SHAMapNode>& IDs, const std::list<std::vector<unsigned char> >& data);
bool takeAsNode(const std::list<SHAMapNode>& IDs, const std::list<std::vector<unsigned char> >& data);
bool takeBase(const std::string& data, Peer::pointer);
bool takeTxNode(const std::list<SHAMapNode>& IDs, const std::list<std::vector<unsigned char> >& data,
Peer::pointer);
bool takeAsNode(const std::list<SHAMapNode>& IDs, const std::list<std::vector<unsigned char> >& data,
Peer::pointer);
void resetTimer();
};
class LedgerAcquireMaster
@@ -65,7 +73,7 @@ public:
LedgerAcquire::pointer find(const uint256& hash);
bool hasLedger(const uint256& ledgerHash);
bool dropLedger(const uint256& ledgerHash);
bool gotLedgerData(newcoin::TMLedgerData& packet);
bool gotLedgerData(newcoin::TMLedgerData& packet, Peer::pointer);
};
#endif

View File

@@ -39,12 +39,12 @@ Ledger::pointer LedgerHistory::getLedgerBySeq(uint32 index)
{
boost::recursive_mutex::scoped_lock sl(mLedgersByHash.peekMutex());
std::map<uint32, Ledger::pointer>::iterator it(mLedgersByIndex.find(index));
if(it!=mLedgersByIndex.end()) return it->second;
if (it != mLedgersByIndex.end()) return it->second;
sl.unlock();
Ledger::pointer ret(Ledger::loadByIndex(index));
if(!ret) return ret;
assert(ret->getLedgerSeq()==index);
if (!ret) return ret;
assert(ret->getLedgerSeq() == index);
sl.lock();
mLedgersByHash.canonicalize(ret->getHash(), ret);
@@ -54,16 +54,16 @@ Ledger::pointer LedgerHistory::getLedgerBySeq(uint32 index)
Ledger::pointer LedgerHistory::getLedgerByHash(const uint256& hash)
{
Ledger::pointer ret=mLedgersByHash.fetch(hash);
if(ret) return ret;
Ledger::pointer ret = mLedgersByHash.fetch(hash);
if (ret) return ret;
ret=Ledger::loadByHash(hash);
if(!ret) return ret;
assert(ret->getHash()==hash);
ret = Ledger::loadByHash(hash);
if (!ret) return ret;
assert(ret->getHash() == hash);
boost::recursive_mutex::scoped_lock sl(mLedgersByHash.peekMutex());
mLedgersByHash.canonicalize(hash, ret);
if(ret->isAccepted()) mLedgersByIndex[ret->getLedgerSeq()]=ret;
if (ret->isAccepted()) mLedgersByIndex[ret->getLedgerSeq()] = ret;
return ret;
}
@@ -71,17 +71,17 @@ Ledger::pointer LedgerHistory::canonicalizeLedger(Ledger::pointer ledger, bool s
{
uint256 h(ledger->getHash());
if(!save)
if (!save)
{ // return input ledger if not in map, otherwise, return corresponding map ledger
Ledger::pointer ret=mLedgersByHash.fetch(h);
if(ret) return ret;
Ledger::pointer ret = mLedgersByHash.fetch(h);
if (ret) return ret;
return ledger;
}
// save input ledger in map if not in map, otherwise return corresponding map ledger
boost::recursive_mutex::scoped_lock sl(mLedgersByHash.peekMutex());
mLedgersByHash.canonicalize(h, ledger);
if(ledger->isAccepted()) mLedgersByIndex[ledger->getLedgerSeq()]=ledger;
if (ledger->isAccepted()) mLedgersByIndex[ledger->getLedgerSeq()]=ledger;
return ledger;
}
// vim:ts=4

View File

@@ -155,8 +155,11 @@ void NetworkOPs::setStateTimer(int sec)
uint64 closedTime = theApp->getMasterLedger().getCurrentLedger()->getCloseTimeNC();
uint64 now = getNetworkTimeNC();
if (now >= closedTime) sec = 0;
else if (sec > (closedTime - now)) sec = (closedTime - now);
if (mMode == omFULL)
{
if (now >= closedTime) sec = 0;
else if (sec > (closedTime - now)) sec = (closedTime - now);
}
mNetTimer.expires_from_now(boost::posix_time::seconds(sec));
mNetTimer.async_wait(boost::bind(&NetworkOPs::checkState, this, boost::asio::placeholders::error));
@@ -264,6 +267,9 @@ void NetworkOPs::checkState(const boost::system::error_code& result)
Ledger::pointer consensus = theApp->getMasterLedger().getLedgerByHash(closedLedger);
if (!consensus)
{
#ifdef DEBUG
std::cerr << "Acquiring consensus ledger" << std::endl;
#endif
LedgerAcquire::pointer acq = theApp->getMasterLedgerAcquire().findCreate(closedLedger);
if (!acq || acq->isFailed())
{

View File

@@ -256,9 +256,6 @@ void Peer::sendPacket(PackedMessage::pointer packet)
void Peer::start_read_header()
{
#ifdef DEBUG
std::cerr << "SRH" << std::endl;
#endif
mReadbuf.clear();
mReadbuf.resize(HEADER_SIZE);
boost::asio::async_read(mSocketSsl, boost::asio::buffer(mReadbuf),
@@ -776,7 +773,7 @@ void Peer::recvGetLedger(newcoin::TMGetLedger& packet)
void Peer::recvLedger(newcoin::TMLedgerData& packet)
{
if(!theApp->getMasterLedgerAcquire().gotLedgerData(packet))
if(!theApp->getMasterLedgerAcquire().gotLedgerData(packet, shared_from_this()))
punishPeer(PP_UNWANTED_DATA);
}