Rewrite of ledger proposal receiving code. Dispatch to thread pool for

signature check. Re-dispatch to main I/O service for handover to
consensus engine.
This commit is contained in:
JoelKatz
2012-11-04 18:59:50 -08:00
parent d1acf6953d
commit d065231d8d
4 changed files with 143 additions and 68 deletions

View File

@@ -500,8 +500,8 @@ void Peer::processReadBuffer()
case ripple::mtPROPOSE_LEDGER:
{
ripple::TMProposeSet msg;
if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
boost::shared_ptr<ripple::TMProposeSet> msg = boost::make_shared<ripple::TMProposeSet>();
if (msg->ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
recvPropose(msg);
else
cLog(lsWARNING) << "parse error: " << type;
@@ -709,6 +709,7 @@ static void checkTransaction(Job&, int flags, SerializedTransaction::pointer stx
if (tx->getStatus() == INVALID)
{
theApp->getSuppression().setFlag(stx->getTransactionID(), SF_BAD);
punshPeer(peer, PP_BAD_SIGNATURE);
return;
}
else
@@ -748,7 +749,7 @@ void Peer::recvTransaction(ripple::TMTransaction& packet)
{ // we have seen this transaction recently
if ((flags & SF_BAD) != 0)
{
punishPeer(PP_INVALID_REQUEST);
punishPeer(PP_BAD_SIGNATURE);
return;
}
@@ -757,7 +758,7 @@ void Peer::recvTransaction(ripple::TMTransaction& packet)
}
theApp->getJobQueue().addJob(jtTRANSACTION,
boost::bind(&checkTransaction, _1, flags, stx, boost::weak_ptr<Peer>(shared_from_this())));
boost::bind(&checkTransaction, _1, flags, stx, boost::weak_ptr<Peer>(shared_from_this())));
#ifndef TRUST_NETWORK
}
@@ -774,42 +775,118 @@ void Peer::recvTransaction(ripple::TMTransaction& packet)
}
void Peer::recvPropose(ripple::TMProposeSet& packet)
static void checkPropose(Job& job, boost::shared_ptr<ripple::TMProposeSet> packet,
uint256 suppression, LedgerProposal::pointer proposal, uint256 consensusLCL,
RippleAddress nodePublic, boost::weak_ptr<Peer> peer)
{ // FIXME: Suppress relaying proposals with incorrect LCLs
bool sigGood = false;
bool isTrusted = (job.getType() == jtPROPOSAL_t);
cLog(lsTRACE) << "Checking " << (isTrusted ? "trusted" : "UNtrusted") << " proposal";
assert(packet);
ripple::TMProposeSet& set = *packet;
uint256 prevLedger;
if (set.has_previousledger())
{ // proposal includes a previous ledger
cLog(lsTRACE) << "proposal with previous ledger";
memcpy(prevLedger.begin(), set.previousledger().data(), 256 / 8);
if (!proposal->checkSign(set.signature()))
{
cLog(lsWARNING) << "proposal with previous ledger fails signature check";
Peer::punishPeer(peer, PP_BAD_SIGNATURE);
return;
}
else
sigGood = true;
}
else
{
if (consensusLCL.isNonZero() && proposal->checkSign(set.signature()))
{
prevLedger = consensusLCL;
sigGood = true;
}
else
{
cLog(lsWARNING) << "Ledger proposal fails signature check";
proposal->setSignature(set.signature());
}
}
if (isTrusted)
{
theApp->getIOService().post(boost::bind(&NetworkOPs::processTrustedProposal, &theApp->getOPs(),
suppression, proposal, packet, nodePublic, prevLedger, sigGood));
}
else
{ // untrusted proposal, just relay it
cLog(lsTRACE) << "relaying untrusted proposal";
std::set<uint64> peers;
theApp->getSuppression().swapSet(suppression, peers, SF_RELAYED);
PackedMessage::pointer message = boost::make_shared<PackedMessage>(set, ripple::mtPROPOSE_LEDGER);
theApp->getConnectionPool().relayMessageBut(peers, message);
}
}
void Peer::recvPropose(const boost::shared_ptr<ripple::TMProposeSet>& packet)
{
if ((packet.currenttxhash().size() != 32) || (packet.nodepubkey().size() < 28) ||
(packet.signature().size() < 56) || (packet.nodepubkey().size() > 128) || (packet.signature().size() > 128))
assert(packet);
ripple::TMProposeSet& set = *packet;
if ((set.currenttxhash().size() != 32) || (set.nodepubkey().size() < 28) ||
(set.signature().size() < 56) || (set.nodepubkey().size() > 128) || (set.signature().size() > 128))
{
cLog(lsWARNING) << "Received proposal is malformed";
return;
}
uint256 currentTxHash, prevLedger;
memcpy(currentTxHash.begin(), packet.currenttxhash().data(), 32);
if (set.has_previousledger() && (set.previousledger().size() != 32))
{
cLog(lsWARNING) << "Received proposal is malformed";
return;
}
if ((packet.has_previousledger()) && (packet.previousledger().size() == 32))
memcpy(prevLedger.begin(), packet.previousledger().data(), 32);
uint256 proposeHash, prevLedger;
memcpy(proposeHash.begin(), set.currenttxhash().data(), 32);
if ((set.has_previousledger()) && (set.previousledger().size() == 32))
memcpy(prevLedger.begin(), set.previousledger().data(), 32);
Serializer s(512);
s.add256(currentTxHash);
s.add256(prevLedger);
s.add32(packet.proposeseq());
s.add32(packet.closetime());
s.addVL(packet.nodepubkey());
s.addVL(packet.signature());
s.add256(proposeHash);
s.add32(set.proposeseq());
s.add32(set.closetime());
s.addVL(set.nodepubkey());
s.addVL(set.signature());
if (set.has_previousledger())
s.add256(prevLedger);
uint256 suppression = s.getSHA512Half();
if (!theApp->isNew(suppression, mPeerId))
{
cLog(lsTRACE) << "Received duplicate proposal from peer " << mPeerId;
return;
RippleAddress nodePublic = RippleAddress::createNodePublic(strCopy(packet.nodepubkey()));
// bool isTrusted = theApp->getUNL().nodeInUNL(nodePublic);
if(theApp->getOPs().recvPropose(suppression, packet.proposeseq(), currentTxHash, prevLedger, packet.closetime(),
packet.signature(), nodePublic))
{ // FIXME: Not all nodes will want proposals
PackedMessage::pointer message = boost::make_shared<PackedMessage>(packet, ripple::mtPROPOSE_LEDGER);
theApp->getConnectionPool().relayMessage(this, message);
}
RippleAddress signerPublic = RippleAddress::createNodePublic(strCopy(set.nodepubkey()));
if (signerPublic == theConfig.VALIDATION_PUB)
{
cLog(lsTRACE) << "Received our own proposal from peer " << mPeerId;
return;
}
bool isTrusted = theApp->getUNL().nodeInUNL(signerPublic);
cLog(lsTRACE) << "Received " << (isTrusted ? "trusted" : "UNtrusted") << " proposal from " << mPeerId;
uint256 consensusLCL = theApp->getOPs().getConsensusLCL();
LedgerProposal::pointer proposal = boost::make_shared<LedgerProposal>(
prevLedger.isNonZero() ? prevLedger : consensusLCL,
set.proposeseq(), proposeHash, set.closetime(), signerPublic);
theApp->getJobQueue().addJob(isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut,
boost::bind(&checkPropose, _1, packet, suppression, proposal, consensusLCL,
mNodePublic, boost::weak_ptr<Peer>(shared_from_this())));
}
void Peer::recvHaveTxSet(ripple::TMHaveTransactionSet& packet)