From db8763ef19daa7dddc902fcd9d628fef00957ee1 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Mon, 13 Feb 2012 10:48:59 -0800 Subject: [PATCH] More work on peer to peer ledger sync. --- Peer.cpp | 169 +++++++++++++++++++++++++++++++++++++----------- Peer.h | 8 +++ SHAMap.h | 6 ++ SHAMapNodes.cpp | 16 +++++ Serializer.h | 1 + newcoin.proto | 31 +++++---- 6 files changed, 184 insertions(+), 47 deletions(-) diff --git a/Peer.cpp b/Peer.cpp index 5f6b685b0..05827f428 100644 --- a/Peer.cpp +++ b/Peer.cpp @@ -14,10 +14,6 @@ #include "Application.h" #include "Conversion.h" -using namespace std; -using namespace boost; -using namespace boost::asio::ip; - Peer::Peer(boost::asio::io_service& io_service) : mSocket(io_service) { @@ -27,9 +23,9 @@ void Peer::handle_write(const boost::system::error_code& error, size_t bytes_tra { #ifdef DEBUG if(error) - cout << "Peer::handle_write Error: " << error << " bytes: "<< bytes_transferred << endl; + std::cout << "Peer::handle_write Error: " << error << " bytes: "<< bytes_transferred << std::endl; else - cout << "Peer::handle_write bytes: "<< bytes_transferred << endl; + std::cout << "Peer::handle_write bytes: "<< bytes_transferred << std::endl; #endif mSendingPacket=PackedMessage::pointer(); @@ -62,7 +58,7 @@ void Peer::connected(const boost::system::error_code& error) { if(!error) { - cout << "Connected to Peer." << endl; //BOOST_LOG_TRIVIAL(info) << "Connected to Peer."; + std::cout << "Connected to Peer." << std::endl; //BOOST_LOG_TRIVIAL(info) << "Connected to Peer."; sendHello(); start_read_header(); @@ -70,7 +66,7 @@ void Peer::connected(const boost::system::error_code& error) else { detach(); - cout << "Peer::connected Error: " << error << endl; //else BOOST_LOG_TRIVIAL(info) << "Error: " << error; + std::cout << "Peer::connected Error: " << error << std::endl; //else BOOST_LOG_TRIVIAL(info) << "Error: " << error; } } @@ -106,8 +102,8 @@ void Peer::start_read_header() #endif mReadbuf.clear(); mReadbuf.resize(HEADER_SIZE); - asio::async_read(mSocket, asio::buffer(mReadbuf), - boost::bind(&Peer::handle_read_header, shared_from_this(), asio::placeholders::error)); + boost::asio::async_read(mSocket, boost::asio::buffer(mReadbuf), + boost::bind(&Peer::handle_read_header, shared_from_this(), boost::asio::placeholders::error)); } void Peer::start_read_body(unsigned msg_len) @@ -117,8 +113,8 @@ void Peer::start_read_body(unsigned msg_len) // read into the body. // mReadbuf.resize(HEADER_SIZE + msg_len); - asio::async_read(mSocket, asio::buffer(&mReadbuf[HEADER_SIZE], msg_len), - boost::bind(&Peer::handle_read_body, shared_from_this(), asio::placeholders::error)); + boost::asio::async_read(mSocket, boost::asio::buffer(&mReadbuf[HEADER_SIZE], msg_len), + boost::bind(&Peer::handle_read_body, shared_from_this(), boost::asio::placeholders::error)); } void Peer::handle_read_header(const boost::system::error_code& error) @@ -137,7 +133,7 @@ void Peer::handle_read_header(const boost::system::error_code& error) else { detach(); - cout << "Peer::connected Error: " << error << endl; //else BOOST_LOG_TRIVIAL(info) << "Error: " << error; + std::cout << "Peer::connected Error: " << error << std::endl; //else BOOST_LOG_TRIVIAL(info) << "Error: " << error; } } @@ -151,7 +147,7 @@ void Peer::handle_read_body(const boost::system::error_code& error) else { detach(); - cout << "Peer::connected Error: " << error << endl; //else BOOST_LOG_TRIVIAL(info) << "Error: " << error; + std::cout << "Peer::connected Error: " << error << std::endl; //else BOOST_LOG_TRIVIAL(info) << "Error: " << error; } } @@ -169,7 +165,7 @@ void Peer::processReadBuffer() newcoin::TMHello msg; if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvHello(msg); - else cout << "parse error: " << type << endl; //else BOOST_LOG_TRIVIAL(info) << "Error: " << error; + else std::cout << "parse error: " << type << std::endl; //else BOOST_LOG_TRIVIAL(info) << "Error: " << error; } break; @@ -178,7 +174,7 @@ void Peer::processReadBuffer() newcoin::TMErrorMsg msg; if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvErrorMessage(msg); - else cout << "pars error: " << type << endl; + else std::cout << "pars error: " << type << std::endl; } break; @@ -187,7 +183,7 @@ void Peer::processReadBuffer() newcoin::TMPing msg; if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvPing(msg); - else cout << "pars error: " << type << endl; + else std::cout << "pars error: " << type << std::endl; } break; @@ -196,7 +192,7 @@ void Peer::processReadBuffer() newcoin::TMGetContacts msg; if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvGetContacts(msg); - else cout << "pars error: " << type << endl; + else std::cout << "pars error: " << type << std::endl; } break; @@ -205,7 +201,7 @@ void Peer::processReadBuffer() newcoin::TMContact msg; if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvContact(msg); - else cout << "pars error: " << type << endl; + else std::cout << "pars error: " << type << std::endl; } break; @@ -214,7 +210,7 @@ void Peer::processReadBuffer() newcoin::TMSearchTransaction msg; if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvSearchTransaction(msg); - else cout << "pars error: " << type << endl; + else std::cout << "pars error: " << type << std::endl; } break; @@ -223,7 +219,7 @@ void Peer::processReadBuffer() newcoin::TMGetAccount msg; if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvGetAccount(msg); - else cout << "pars error: " << type << endl; + else std::cout << "pars error: " << type << std::endl; } break; @@ -232,7 +228,7 @@ void Peer::processReadBuffer() newcoin::TMAccount msg; if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvAccount(msg); - else cout << "pars error: " << type << endl; + else std::cout << "pars error: " << type << std::endl; } break; @@ -241,7 +237,7 @@ void Peer::processReadBuffer() newcoin::TMTransaction msg; if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvTransaction(msg); - else cout << "pars error: " << type << endl; + else std::cout << "pars error: " << type << std::endl; } break; @@ -250,7 +246,7 @@ void Peer::processReadBuffer() newcoin::TMGetLedger msg; if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvGetLedger(msg); - else cout << "pars error: " << type << endl; + else std::cout << "pars error: " << type << std::endl; } break; @@ -259,7 +255,7 @@ void Peer::processReadBuffer() newcoin::TMLedgerData msg; if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvLedger(msg); - else cout << "pars error: " << type << endl; + else std::cout << "pars error: " << type << std::endl; } break; @@ -269,7 +265,7 @@ void Peer::processReadBuffer() newcoin::TM msg; if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recv(msg); - else cout << "pars error: " << type << endl; + else std::cout << "pars error: " << type << std::endl; } break; @@ -278,7 +274,7 @@ void Peer::processReadBuffer() newcoin::TM msg; if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recv(msg); - else cout << "pars error: " << type << endl; + else std::cout << "pars error: " << type << std::endl; } break; @@ -287,7 +283,7 @@ void Peer::processReadBuffer() newcoin::TM msg; if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recv(msg); - else cout << "pars error: " << type << endl; + else std::cout << "pars error: " << type << std::endl; } break; @@ -296,7 +292,7 @@ void Peer::processReadBuffer() newcoin::TM msg; if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recv(msg); - else cout << "pars error: " << type << endl; + else std::cout << "pars error: " << type << std::endl; } break; @@ -307,7 +303,7 @@ void Peer::processReadBuffer() newcoin::TMGetObjectByHash msg; if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvGetObjectByHash(msg); - else cout << "pars error: " << type << endl; + else std::cout << "pars error: " << type << std::endl; } break; @@ -316,12 +312,12 @@ void Peer::processReadBuffer() newcoin::TMObjectByHash msg; if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvObjectByHash(msg); - else cout << "pars error: " << type << endl; + else std::cout << "pars error: " << type << std::endl; } break; default: - cout << "Unknown Msg: " << type << endl; //else BOOST_LOG_TRIVIAL(info) << "Error: " << error; + std::cout << "Unknown Msg: " << type << std::endl; //else BOOST_LOG_TRIVIAL(info) << "Error: " << error; } } @@ -414,6 +410,103 @@ void Peer::recvAccount(newcoin::TMAccount& packet) void Peer::recvGetLedger(newcoin::TMGetLedger& packet) { + // Figure out what ledger they want + Ledger::pointer ledger; + if(packet.has_ledgerhash()) + { + uint256 ledgerhash; + if(packet.ledgerhash().size()!=32) + { + punishPeer(PP_INVALID_REQUEST); + return; + } + memcpy(&ledgerhash, packet.ledgerhash().data(), 32); + ledger=theApp->getMasterLedger().getLedgerByHash(ledgerhash); + } + else if(packet.has_ledgerseq()) + ledger=theApp->getMasterLedger().getLedgerBySeq(packet.ledgerseq()); + else if(packet.has_ltype() && (packet.ltype()==newcoin::ltCURRENT) ) + ledger=theApp->getMasterLedger().getCurrentLedger(); + else if(packet.has_ltype() && (packet.ltype()==newcoin::ltCLOSING) ) + { + ledger=theApp->getMasterLedger().getClosingLedger(); + } + else if(packet.has_ltype() && (packet.ltype()==newcoin::ltCLOSED) ) + { + ledger=theApp->getMasterLedger().getClosingLedger(); + if(ledger && !ledger->isClosed()) + ledger=theApp->getMasterLedger().getLedgerBySeq(ledger->getLedgerSeq()-1); + } + else + { + punishPeer(PP_INVALID_REQUEST); + return; + } + + if( (!ledger) || (packet.has_ledgerseq() && (packet.ledgerseq()!=ledger->getLedgerSeq())) ) + { + punishPeer(PP_UNKNOWN_REQUEST); + return; + } + + // Figure out what information they want + newcoin::TMLedgerData* data=new newcoin::TMLedgerData; + uint256 lHash=ledger->getHash(); + data->set_ledgerhash(lHash.begin(), lHash.size()); + data->set_ledgerseq(ledger->getLedgerSeq()); + data->set_type(packet.itype()); + + if(packet.itype()==newcoin::liBASE) + { + Serializer nData(116); + ledger->addRaw(nData); + newcoin::TMLedgerNode* node=data->add_nodes(); + node->set_nodedata(nData.getDataPtr(), nData.getLength()); + } + else if ( (packet.itype()==newcoin::liTX_NODE) || (packet.itype()==newcoin::liAS_NODE) ) + { + SHAMap::pointer map=(packet.itype()==newcoin::liTX_NODE) ? ledger->peekTransactionMap() + : ledger->peekAccountStateMap(); + if(!map) return; + if(packet.nodeids_size()==0) + { + punishPeer(PP_INVALID_REQUEST); + return; + } + for(int i=0; i nodeIDs; + std::list > rawNodes; + if(map->getNodeFat(mn, nodeIDs, rawNodes)) + { + std::vector::iterator nodeIDIterator; + std::list >::iterator rawNodeIterator; + for(nodeIDIterator=nodeIDs.begin(), rawNodeIterator=rawNodes.begin(); + nodeIDIterator!=nodeIDs.end(); ++nodeIDIterator, ++rawNodeIterator) + { + newcoin::TMLedgerNode* node=data->add_nodes(); + Serializer nID(33); + nodeIDIterator->addIDRaw(nID); + node->set_nodeid(nID.getDataPtr(), nID.getLength()); + node->set_nodedata(&rawNodeIterator->front(), rawNodeIterator->size()); + } + } + } + } + else + { + punishPeer(PP_INVALID_REQUEST); + return; + } + PackedMessage::pointer oPacket=boost::make_shared + (PackedMessage::MessagePointer(data), newcoin::mtLEDGER); + sendPacket(oPacket); } void Peer::recvLedger(newcoin::TMLedgerData& packet) @@ -439,9 +532,12 @@ void Peer::sendHello() PackedMessage::pointer packet=boost::make_shared (PackedMessage::MessagePointer(h), newcoin::mtHELLO); - sendPacket(packet); + sendPacket(packet); } +void Peer::punishPeer(PeerPunish) +{ +} #if 0 @@ -561,9 +657,10 @@ void Peer::receiveTransaction(TransactionPtr trans) PackedMessage::pointer packet=boost::make_shread (PackedMessage::MessagePointer(new newcoin::Transaction(*(trans.get()))),newcoin::TRANSACTION); pool.relayMessage(this,packet); - }else + } + else { - cout << "Invalid transaction: " << trans->from() << endl; + std::cout << "Invalid transaction: " << trans->from() << std::endl; } } @@ -582,7 +679,7 @@ void Peer::connectTo(KnownNode& node) { tcp::endpoint endpoint( address::from_string(node.mIP), node.mPort); mSocket.async_connect(endpoint, - boost::bind(&Peer::connected, this, asio::placeholders::error) ); + boost::bind(&Peer::connected, this, boost::asio::placeholders::error) ); } diff --git a/Peer.h b/Peer.h index bec265e9d..f874bbd46 100644 --- a/Peer.h +++ b/Peer.h @@ -12,6 +12,12 @@ #include "Transaction.h" #include "NetworkOPs.h" +enum PeerPunish +{ + PP_INVALID_REQUEST=1, // The peer sent a request that makes no sense + PP_UNKNOWN_REQUEST=2, // The peer sent a request that might be garbage +}; + class Peer : public boost::enable_shared_from_this { public: @@ -84,6 +90,8 @@ public: void sendFullLedger(Ledger::pointer ledger); void sendGetFullLedger(uint256& hash); + void punishPeer(PeerPunish pp); + //static PackedMessage::pointer createFullLedger(Ledger::pointer ledger); static PackedMessage::pointer createLedgerProposal(Ledger::pointer ledger); static PackedMessage::pointer createValidation(Ledger::pointer ledger); diff --git a/SHAMap.h b/SHAMap.h index 38ba597b6..5703f2cb2 100644 --- a/SHAMap.h +++ b/SHAMap.h @@ -40,6 +40,7 @@ public: SHAMapNode(int depth, const uint256& hash); int getDepth() const { return mDepth; } const uint256& getNodeID() const { return mNodeID; } + bool isValid() const { return (mDepth>=0) && (mDepth<64); } virtual bool isPopulated() const { return false; } @@ -66,6 +67,11 @@ public: static void ClassInit(); static uint256 getNodeID(int depth, const uint256& hash); + + // Convert to/from wire format (256-bit nodeID, 1-byte depth) + void addIDRaw(Serializer &s) const; + static int getRawIDLength(void) { return 33; } + SHAMapNode(const void *ptr, int len); }; class hash_SMN diff --git a/SHAMapNodes.cpp b/SHAMapNodes.cpp index 28794ea8c..688970bd8 100644 --- a/SHAMapNodes.cpp +++ b/SHAMapNodes.cpp @@ -95,6 +95,22 @@ SHAMapNode::SHAMapNode(int depth, const uint256 &hash) : mDepth(depth) mNodeID = getNodeID(depth, hash); } +SHAMapNode::SHAMapNode(const void *ptr, int len) +{ + if(len<33) mDepth=-1; + else + { + memcpy(&mNodeID, ptr, 32); + mDepth=*(reinterpret_cast(ptr) + 32); + } +} + +void SHAMapNode::addIDRaw(Serializer &s) const +{ + s.add256(mNodeID); + s.add1(mDepth); +} + SHAMapNode SHAMapNode::getChildNodeID(int m) const { // This can be optimized to avoid the << if needed assert((m>=0) && (m<16)); diff --git a/Serializer.h b/Serializer.h index 2cb3fe242..42efc85fb 100644 --- a/Serializer.h +++ b/Serializer.h @@ -19,6 +19,7 @@ class Serializer public: Serializer(int n=256) { mData.reserve(n); } Serializer(const std::vector &data) : mData(data) { ; } + Serializer(const std::string& data) : mData(data.data(), (data.data()) + data.size()) { ; } // assemble functions int add1(unsigned char byte); diff --git a/newcoin.proto b/newcoin.proto index 7a5144c9c..6a66f6c6f 100644 --- a/newcoin.proto +++ b/newcoin.proto @@ -175,28 +175,37 @@ message TMObjectByHash optional uint32 seq = 3; // matches seq from query } -message LedgerNodes { - required bytes nodeid = 1; +message TMLedgerNode { + optional bytes nodeid = 1; required bytes nodedata = 2; } enum TMLedgerInfoType { - BASE = 0; // basic ledger info - TX_NODE = 1; // transaction node - AS_NODE = 2; // account state node - TX = 3; // transaction + liBASE = 0; // basic ledger info + liTX_NODE = 1; // transaction node + liAS_NODE = 2; // account state node +} + +enum TMLedgerType { + ltACCEPTED = 0; + ltCURRENT = 1; + ltCLOSING = 2; + ltCLOSED = 3; } message TMGetLedger { - required bytes ledgerHash = 1; - required TMLedgerInfoType type = 2; - repeated bytes nodes = 3; + optional TMLedgerType ltype = 1; + optional bytes ledgerHash = 2; + optional uint32 ledgerSeq = 3; + required TMLedgerInfoType itype = 4; + repeated bytes nodeIDs = 5; } message TMLedgerData { required bytes ledgerHash = 1; - required TMLedgerInfoType type = 2; - repeated LedgerNodes nodes = 3; + required uint32 ledgerSeq = 2; + required TMLedgerInfoType type = 3; + repeated TMLedgerNode nodes = 4; }