From f475dcb1fbefd1382d9b18c5b8803709b0573b9b Mon Sep 17 00:00:00 2001 From: Chalith Desaman Date: Wed, 18 Nov 2020 12:58:58 +0530 Subject: [PATCH] Dynamic peer list implementation. (#152) --- examples/nodejs_client/package-lock.json | 18 +- src/comm/comm_session.cpp | 15 +- src/comm/comm_session.hpp | 4 +- src/conf.cpp | 25 +- src/conf.hpp | 46 +++- src/msg/fbuf/p2pmsg_content.fbs | 24 +- src/msg/fbuf/p2pmsg_content_generated.h | 316 ++++++++++++++++++++++- src/msg/fbuf/p2pmsg_helpers.cpp | 127 ++++++++- src/msg/fbuf/p2pmsg_helpers.hpp | 18 +- src/p2p/p2p.cpp | 139 +++++++++- src/p2p/p2p.hpp | 16 ++ src/p2p/peer_comm_server.cpp | 85 +++++- src/p2p/peer_comm_server.hpp | 10 +- src/p2p/peer_comm_session.cpp | 4 +- src/p2p/peer_comm_session.hpp | 4 +- src/p2p/peer_session_handler.cpp | 50 +++- src/p2p/peer_session_handler.hpp | 4 +- src/usr/user_comm_session.cpp | 4 +- src/usr/user_comm_session.hpp | 2 +- src/usr/user_session_handler.cpp | 6 +- src/usr/user_session_handler.hpp | 2 +- 21 files changed, 848 insertions(+), 71 deletions(-) diff --git a/examples/nodejs_client/package-lock.json b/examples/nodejs_client/package-lock.json index d63ba334..88cefa61 100644 --- a/examples/nodejs_client/package-lock.json +++ b/examples/nodejs_client/package-lock.json @@ -5,17 +5,17 @@ "async-limiter": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/async-limiter/-/async-limiter-1.0.1.tgz", - "integrity": "sha512-csOlWGAcRFJaI6m+F2WKdnMKr4HhdhFVBk0H/QbJFMCr+uO2kwohwXQPxw/9OCxp05r5ghVBFSyioixx3gfkNQ==" + "integrity": "sha1-3TeelPDbgxCwgpH51kwyCXZmF/0=" }, "base64-js": { "version": "1.3.1", "resolved": "https://registry.npmjs.org/base64-js/-/base64-js-1.3.1.tgz", - "integrity": "sha512-mLQ4i2QO1ytvGWFWmcngKO//JXAQueZvwEKtjgQFM4jIK0kU+ytMfplL8j+n5mspOfjHwoAg+9yhb7BwAHm36g==" + "integrity": "sha1-WOzoy3XdB+ce0IxzarxfrE2/jfE=" }, "bson": { "version": "4.0.4", "resolved": "https://registry.npmjs.org/bson/-/bson-4.0.4.tgz", - "integrity": "sha512-Ioi3TD0/1V3aI8+hPfC56TetYmzfq2H07jJa9A1lKTxWsFtHtYdLMGMXjtGEg9v0f72NSM07diRQEUNYhLupIA==", + "integrity": "sha1-S9os7fKuehjRXLJO4e3ox5f47s8=", "requires": { "buffer": "^5.1.0", "long": "^4.0.0" @@ -24,7 +24,7 @@ "buffer": { "version": "5.6.0", "resolved": "https://registry.npmjs.org/buffer/-/buffer-5.6.0.tgz", - "integrity": "sha512-/gDYp/UtU0eA1ys8bOs9J6a+E/KWIY+DZ+Q2WESNUA0jFRsJOc0SNUO6xJ5SGA1xueg3NL65W6s+NY5l9cunuw==", + "integrity": "sha1-oxdJ3H2B2E2wir+Te2uMQDP2J4Y=", "requires": { "base64-js": "^1.0.2", "ieee754": "^1.1.4" @@ -41,7 +41,7 @@ "ieee754": { "version": "1.1.13", "resolved": "https://registry.npmjs.org/ieee754/-/ieee754-1.1.13.tgz", - "integrity": "sha512-4vf7I2LYV/HaWerSo3XmlMkp5eZ83i+/CDluXi/IGTs/O1sejBNhTtnxzmRZfvOUqj7lZjqHkeTvpgSFDlWZTg==" + "integrity": "sha1-7BaFWOlaoYH9h9N/VcMrvLZwi4Q=" }, "isomorphic-ws": { "version": "4.0.1", @@ -51,12 +51,12 @@ "libsodium": { "version": "0.7.6", "resolved": "https://registry.npmjs.org/libsodium/-/libsodium-0.7.6.tgz", - "integrity": "sha512-hPb/04sEuLcTRdWDtd+xH3RXBihpmbPCsKW/Jtf4PsvdyKh+D6z2D2gvp/5BfoxseP+0FCOg66kE+0oGUE/loQ==" + "integrity": "sha1-AYuAxXKAVIF4Rfv/pVQnREG9onc=" }, "libsodium-wrappers": { "version": "0.7.6", "resolved": "https://registry.npmjs.org/libsodium-wrappers/-/libsodium-wrappers-0.7.6.tgz", - "integrity": "sha512-OUO2CWW5bHdLr6hkKLHIKI4raEkZrf3QHkhXsJ1yCh6MZ3JDA7jFD3kCATNquuGSG6MjjPHQIQms0y0gBDzjQg==", + "integrity": "sha1-uu1MFtS/lhAQSHWtio4WTSWdSPs=", "requires": { "libsodium": "0.7.6" } @@ -64,7 +64,7 @@ "long": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/long/-/long-4.0.0.tgz", - "integrity": "sha512-XsP+KhQif4bjX1kbuSiySJFNAehNxgLb6hPRGJ9QsUr8ajHkuXGdrHmFUTUUXhDwVX2R5bY4JNZEwbUiMhV+MA==" + "integrity": "sha1-mntxz7fTYaGU6lVSQckvdGjVvyg=" }, "node-gyp-build": { "version": "3.7.0", @@ -82,7 +82,7 @@ "ws": { "version": "7.1.2", "resolved": "https://registry.npmjs.org/ws/-/ws-7.1.2.tgz", - "integrity": "sha512-gftXq3XI81cJCgkUiAVixA0raD9IVmXqsylCrjRygw4+UOOGzPoxnQ6r/CnVL9i+mDncJo94tSkyrtuuQVBmrg==", + "integrity": "sha1-xnLRYp3ouyepaZ61mb5Hru7dj3M=", "requires": { "async-limiter": "^1.0.0" } diff --git a/src/comm/comm_session.cpp b/src/comm/comm_session.cpp index e9e97e62..04b89c13 100644 --- a/src/comm/comm_session.cpp +++ b/src/comm/comm_session.cpp @@ -31,17 +31,25 @@ namespace comm /** * Init() should be called to activate the session. * Because we are starting threads here, after init() is called, the session object must not be "std::moved". + * @return returns 0 on successful init, otherwise -1; */ - void comm_session::init() + int comm_session::init() { if (state == SESSION_STATE::NONE) { - handle_connect(); + if (handle_connect() == -1) + { + mark_for_closure(); + return -1; + } + reader_thread = std::thread(&comm_session::reader_loop, this); writer_thread = std::thread(&comm_session::process_outbound_msg_queue, this); state = SESSION_STATE::ACTIVE; last_activity_timestamp = util::get_epoch_milliseconds(); } + + return 0; } void comm_session::reader_loop() @@ -297,8 +305,9 @@ namespace comm } } - void comm_session::handle_connect() + int comm_session::handle_connect() { + return 0; } int comm_session::handle_message(std::string_view msg) diff --git a/src/comm/comm_session.hpp b/src/comm/comm_session.hpp index 7a948fe5..3776f44a 100644 --- a/src/comm/comm_session.hpp +++ b/src/comm/comm_session.hpp @@ -41,7 +41,7 @@ namespace comm void reader_loop(); protected: - virtual void handle_connect(); + virtual int handle_connect(); virtual int handle_message(std::string_view msg); virtual void handle_close(); @@ -56,7 +56,7 @@ namespace comm comm_session( std::string_view host_address, hpws::client &&hpws_client, const bool is_inbound, const uint64_t (&metric_thresholds)[4]); - void init(); + int init(); int process_next_inbound_message(); int send(const std::vector &message); int send(std::string_view message); diff --git a/src/conf.cpp b/src/conf.cpp index 057fb0b1..a724689b 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -86,8 +86,10 @@ namespace conf cfg.peerport = 22860; cfg.roundtime = 1000; cfg.pubport = 8080; + cfg.peerdiscoverytime = 30000; cfg.msgforwarding = false; + cfg.dynamicpeerdiscovery = false; #ifndef NDEBUG cfg.loglevel_type = conf::LOG_SEVERITY::DEBUG; @@ -252,7 +254,11 @@ namespace conf return -1; } - cfg.peers.emplace(std::make_pair(splitted_peers.front(), std::stoi(splitted_peers.back()))); + peer_properties peer; + peer.ip_port.host_address = splitted_peers.front(); + peer.ip_port.port = std::stoi(splitted_peers.back()); + + cfg.peers.push_back(peer); splitted_peers.clear(); } @@ -276,6 +282,7 @@ namespace conf cfg.peerport = d["peerport"].as(); cfg.pubport = d["pubport"].as(); cfg.roundtime = d["roundtime"].as(); + cfg.peerdiscoverytime = d["peerdiscoverytime"].as(); cfg.pubmaxsize = d["pubmaxsize"].as(); cfg.pubmaxcpm = d["pubmaxcpm"].as(); @@ -288,8 +295,17 @@ namespace conf cfg.peermaxbadmpm = d["peermaxbadmpm"].as(); cfg.peermaxbadsigpm = d["peermaxbadsigpm"].as(); cfg.peermaxcons = d["peermaxcons"].as(); + cfg.peermaxknowncons = d["peermaxknowncons"].as(); + + // If peermaxknowcons is greater than peermaxcons then show error and stop execution. + if (cfg.peermaxknowncons > cfg.peermaxcons) + { + std::cout << "Invalid configuration values: peermaxknowncons count should not exceed peermaxcons." << '\n'; + return -1; + } cfg.msgforwarding = d["msgforwarding"].as(); + cfg.dynamicpeerdiscovery = d["dynamicpeerdiscovery"].as(); cfg.loglevel = d["loglevel"].as(); cfg.loglevel_type = get_loglevel_type(cfg.loglevel); @@ -324,9 +340,9 @@ namespace conf d.insert_or_assign("appbillargs", cfg.appbillargs.data()); jsoncons::ojson peers(jsoncons::json_array_arg); - for (const auto &ipport_pair : cfg.peers) + for (const auto &peer : cfg.peers) { - const std::string concat_str = std::string(ipport_pair.first).append(":").append(std::to_string(ipport_pair.second)); + const std::string concat_str = std::string(peer.ip_port.host_address).append(":").append(std::to_string(peer.ip_port.port)); peers.push_back(concat_str); } d.insert_or_assign("peers", peers); @@ -346,6 +362,7 @@ namespace conf d.insert_or_assign("peerport", cfg.peerport); d.insert_or_assign("pubport", cfg.pubport); d.insert_or_assign("roundtime", cfg.roundtime); + d.insert_or_assign("peerdiscoverytime", cfg.peerdiscoverytime); d.insert_or_assign("pubmaxsize", cfg.pubmaxsize); d.insert_or_assign("pubmaxcpm", cfg.pubmaxcpm); @@ -358,8 +375,10 @@ namespace conf d.insert_or_assign("peermaxbadmpm", cfg.peermaxbadmpm); d.insert_or_assign("peermaxbadsigpm", cfg.peermaxbadsigpm); d.insert_or_assign("peermaxcons", cfg.peermaxcons); + d.insert_or_assign("peermaxknowncons", cfg.peermaxknowncons); d.insert_or_assign("msgforwarding", cfg.msgforwarding); + d.insert_or_assign("dynamicpeerdiscovery", cfg.dynamicpeerdiscovery); d.insert_or_assign("loglevel", cfg.loglevel); diff --git a/src/conf.hpp b/src/conf.hpp index 9588ed2b..7216913f 100644 --- a/src/conf.hpp +++ b/src/conf.hpp @@ -9,11 +9,32 @@ */ namespace conf { + // Struct to represent ip and port of the peer. + struct ip_port_prop + { + std::string host_address; + uint16_t port; - constexpr const char *SELF_HOST = "127.0.0.1"; + bool operator==(ip_port_prop ip_port) + { + return host_address == ip_port.host_address && port == ip_port.port; + } - // Typedef to represent ip address and port pair. - typedef std::pair ip_port_pair; + bool operator!=(ip_port_prop ip_port) + { + return !(host_address == ip_port.host_address && port == ip_port.port); + } + }; + + // Struct to represent information about a peer. + // Initially available capacity is set to -1 and timestamp is set to 0. + // Later it will be updated according to the capacity anouncement from the peers. + struct peer_properties + { + ip_port_prop ip_port; + int16_t available_capacity = -1; + uint64_t timestamp = 0; + }; // The operating mode of the contract node. enum OPERATING_MODE @@ -71,25 +92,28 @@ namespace conf std::string binargs; // CLI arguments to pass to the contract binary std::string appbill; // binary to execute for appbill std::string appbillargs; // any arguments to supply to appbill binary by default - std::set peers; // Set of peers keyed by ":" concatenated format + std::vector peers; // Vector of peers with ip_port, timestamp, capacity std::unordered_set unl; // Unique node list (list of binary public keys) uint16_t peerport = 0; // Listening port for peer connections uint16_t roundtime = 0; // Consensus round time in ms uint16_t pubport = 0; // Listening port for public user connections + uint16_t peerdiscoverytime = 0; // Time interval in ms to find for peers dynamicpeerdiscovery should be on for this uint64_t pubmaxsize = 0; // User message max size in bytes uint64_t pubmaxcpm = 0; // User message rate (characters(bytes) per minute) uint64_t pubmaxbadmpm = 0; // User bad messages per minute uint16_t pubmaxcons = 0; // Max inbound user connections - uint64_t peermaxsize = 0; // Peer message max size in bytes - uint64_t peermaxcpm = 0; // Peer message rate (characters(bytes) per minute) - uint64_t peermaxdupmpm = 0; // Peer max duplicate messages per minute - uint64_t peermaxbadmpm = 0; // Peer bad messages per minute - uint64_t peermaxbadsigpm = 0; // Peer bad signatures per minute - uint16_t peermaxcons = 0; // Max inbound peer connections + uint64_t peermaxsize = 0; // Peer message max size in bytes + uint64_t peermaxcpm = 0; // Peer message rate (characters(bytes) per minute) + uint64_t peermaxdupmpm = 0; // Peer max duplicate messages per minute + uint64_t peermaxbadmpm = 0; // Peer bad messages per minute + uint64_t peermaxbadsigpm = 0; // Peer bad signatures per minute + uint16_t peermaxcons = 0; // Max peer connections + uint16_t peermaxknowncons = 0; // Max known peer connections - bool msgforwarding = false; // Whether peer message forwarding is on/off. + bool msgforwarding = false; // Whether peer message forwarding is on/off. + bool dynamicpeerdiscovery = false; // Whether dynamic peer discovery is on/off. std::string loglevel; // Log severity level (debug, info, warn, error) LOG_SEVERITY loglevel_type; // Log severity level enum (debug, info, warn, error) diff --git a/src/msg/fbuf/p2pmsg_content.fbs b/src/msg/fbuf/p2pmsg_content.fbs index b0916d52..c1f05271 100644 --- a/src/msg/fbuf/p2pmsg_content.fbs +++ b/src/msg/fbuf/p2pmsg_content.fbs @@ -34,7 +34,10 @@ union Message { State_Response_Message, History_Request_Message, History_Response_Message, - Connected_Status_Announcement_Message + Connected_Status_Announcement_Message, + Peer_List_Request_Message, + Peer_List_Response_Message, + Available_Capacity_Announcement_Message } //message content type table Content { @@ -125,4 +128,23 @@ table Connected_Status_Announcement_Message{ is_weakly_connected: bool; } +table Available_Capacity_Announcement_Message{ + available_capacity:int16; + timestamp:uint64; +} + +table Peer_List_Request_Message{ +} + +table Peer_List_Response_Message{ + peer_list: [Peer_Properties]; +} + +table Peer_Properties { + host_address:string; + port:uint16; + available_capacity:int16; + timestamp:uint64; +} + root_type Content; //root type for message content \ No newline at end of file diff --git a/src/msg/fbuf/p2pmsg_content_generated.h b/src/msg/fbuf/p2pmsg_content_generated.h index 8085380c..0b6dc0b7 100644 --- a/src/msg/fbuf/p2pmsg_content_generated.h +++ b/src/msg/fbuf/p2pmsg_content_generated.h @@ -69,6 +69,18 @@ struct State_FS_Hash_EntryBuilder; struct Connected_Status_Announcement_Message; struct Connected_Status_Announcement_MessageBuilder; +struct Available_Capacity_Announcement_Message; +struct Available_Capacity_Announcement_MessageBuilder; + +struct Peer_List_Request_Message; +struct Peer_List_Request_MessageBuilder; + +struct Peer_List_Response_Message; +struct Peer_List_Response_MessageBuilder; + +struct Peer_Properties; +struct Peer_PropertiesBuilder; + enum Message { Message_NONE = 0, Message_Peer_Challenge_Response_Message = 1, @@ -81,11 +93,14 @@ enum Message { Message_History_Request_Message = 8, Message_History_Response_Message = 9, Message_Connected_Status_Announcement_Message = 10, + Message_Peer_List_Request_Message = 11, + Message_Peer_List_Response_Message = 12, + Message_Available_Capacity_Announcement_Message = 13, Message_MIN = Message_NONE, - Message_MAX = Message_Connected_Status_Announcement_Message + Message_MAX = Message_Available_Capacity_Announcement_Message }; -inline const Message (&EnumValuesMessage())[11] { +inline const Message (&EnumValuesMessage())[14] { static const Message values[] = { Message_NONE, Message_Peer_Challenge_Response_Message, @@ -97,13 +112,16 @@ inline const Message (&EnumValuesMessage())[11] { Message_State_Response_Message, Message_History_Request_Message, Message_History_Response_Message, - Message_Connected_Status_Announcement_Message + Message_Connected_Status_Announcement_Message, + Message_Peer_List_Request_Message, + Message_Peer_List_Response_Message, + Message_Available_Capacity_Announcement_Message }; return values; } inline const char * const *EnumNamesMessage() { - static const char * const names[12] = { + static const char * const names[15] = { "NONE", "Peer_Challenge_Response_Message", "Peer_Challenge_Message", @@ -115,13 +133,16 @@ inline const char * const *EnumNamesMessage() { "History_Request_Message", "History_Response_Message", "Connected_Status_Announcement_Message", + "Peer_List_Request_Message", + "Peer_List_Response_Message", + "Available_Capacity_Announcement_Message", nullptr }; return names; } inline const char *EnumNameMessage(Message e) { - if (flatbuffers::IsOutRange(e, Message_NONE, Message_Connected_Status_Announcement_Message)) return ""; + if (flatbuffers::IsOutRange(e, Message_NONE, Message_Available_Capacity_Announcement_Message)) return ""; const size_t index = static_cast(e); return EnumNamesMessage()[index]; } @@ -170,6 +191,18 @@ template<> struct MessageTraits struct MessageTraits { + static const Message enum_value = Message_Peer_List_Request_Message; +}; + +template<> struct MessageTraits { + static const Message enum_value = Message_Peer_List_Response_Message; +}; + +template<> struct MessageTraits { + static const Message enum_value = Message_Available_Capacity_Announcement_Message; +}; + bool VerifyMessage(flatbuffers::Verifier &verifier, const void *obj, Message type); bool VerifyMessageVector(flatbuffers::Verifier &verifier, const flatbuffers::Vector> *values, const flatbuffers::Vector *types); @@ -587,6 +620,15 @@ struct Content FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { const msg::fbuf::p2pmsg::Connected_Status_Announcement_Message *message_as_Connected_Status_Announcement_Message() const { return message_type() == msg::fbuf::p2pmsg::Message_Connected_Status_Announcement_Message ? static_cast(message()) : nullptr; } + const msg::fbuf::p2pmsg::Peer_List_Request_Message *message_as_Peer_List_Request_Message() const { + return message_type() == msg::fbuf::p2pmsg::Message_Peer_List_Request_Message ? static_cast(message()) : nullptr; + } + const msg::fbuf::p2pmsg::Peer_List_Response_Message *message_as_Peer_List_Response_Message() const { + return message_type() == msg::fbuf::p2pmsg::Message_Peer_List_Response_Message ? static_cast(message()) : nullptr; + } + const msg::fbuf::p2pmsg::Available_Capacity_Announcement_Message *message_as_Available_Capacity_Announcement_Message() const { + return message_type() == msg::fbuf::p2pmsg::Message_Available_Capacity_Announcement_Message ? static_cast(message()) : nullptr; + } void *mutable_message() { return GetPointer(VT_MESSAGE); } @@ -639,6 +681,18 @@ template<> inline const msg::fbuf::p2pmsg::Connected_Status_Announcement_Message return message_as_Connected_Status_Announcement_Message(); } +template<> inline const msg::fbuf::p2pmsg::Peer_List_Request_Message *Content::message_as() const { + return message_as_Peer_List_Request_Message(); +} + +template<> inline const msg::fbuf::p2pmsg::Peer_List_Response_Message *Content::message_as() const { + return message_as_Peer_List_Response_Message(); +} + +template<> inline const msg::fbuf::p2pmsg::Available_Capacity_Announcement_Message *Content::message_as() const { + return message_as_Available_Capacity_Announcement_Message(); +} + struct ContentBuilder { typedef Content Table; flatbuffers::FlatBufferBuilder &fbb_; @@ -1749,6 +1803,246 @@ inline flatbuffers::Offset CreateConnecte return builder_.Finish(); } +struct Available_Capacity_Announcement_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { + typedef Available_Capacity_Announcement_MessageBuilder Builder; + enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { + VT_AVAILABLE_CAPACITY = 4, + VT_TIMESTAMP = 6 + }; + int16_t available_capacity() const { + return GetField(VT_AVAILABLE_CAPACITY, 0); + } + bool mutate_available_capacity(int16_t _available_capacity) { + return SetField(VT_AVAILABLE_CAPACITY, _available_capacity, 0); + } + uint64_t timestamp() const { + return GetField(VT_TIMESTAMP, 0); + } + bool mutate_timestamp(uint64_t _timestamp) { + return SetField(VT_TIMESTAMP, _timestamp, 0); + } + bool Verify(flatbuffers::Verifier &verifier) const { + return VerifyTableStart(verifier) && + VerifyField(verifier, VT_AVAILABLE_CAPACITY) && + VerifyField(verifier, VT_TIMESTAMP) && + verifier.EndTable(); + } +}; + +struct Available_Capacity_Announcement_MessageBuilder { + typedef Available_Capacity_Announcement_Message Table; + flatbuffers::FlatBufferBuilder &fbb_; + flatbuffers::uoffset_t start_; + void add_available_capacity(int16_t available_capacity) { + fbb_.AddElement(Available_Capacity_Announcement_Message::VT_AVAILABLE_CAPACITY, available_capacity, 0); + } + void add_timestamp(uint64_t timestamp) { + fbb_.AddElement(Available_Capacity_Announcement_Message::VT_TIMESTAMP, timestamp, 0); + } + explicit Available_Capacity_Announcement_MessageBuilder(flatbuffers::FlatBufferBuilder &_fbb) + : fbb_(_fbb) { + start_ = fbb_.StartTable(); + } + flatbuffers::Offset Finish() { + const auto end = fbb_.EndTable(start_); + auto o = flatbuffers::Offset(end); + return o; + } +}; + +inline flatbuffers::Offset CreateAvailable_Capacity_Announcement_Message( + flatbuffers::FlatBufferBuilder &_fbb, + int16_t available_capacity = 0, + uint64_t timestamp = 0) { + Available_Capacity_Announcement_MessageBuilder builder_(_fbb); + builder_.add_timestamp(timestamp); + builder_.add_available_capacity(available_capacity); + return builder_.Finish(); +} + +struct Peer_List_Request_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { + typedef Peer_List_Request_MessageBuilder Builder; + bool Verify(flatbuffers::Verifier &verifier) const { + return VerifyTableStart(verifier) && + verifier.EndTable(); + } +}; + +struct Peer_List_Request_MessageBuilder { + typedef Peer_List_Request_Message Table; + flatbuffers::FlatBufferBuilder &fbb_; + flatbuffers::uoffset_t start_; + explicit Peer_List_Request_MessageBuilder(flatbuffers::FlatBufferBuilder &_fbb) + : fbb_(_fbb) { + start_ = fbb_.StartTable(); + } + flatbuffers::Offset Finish() { + const auto end = fbb_.EndTable(start_); + auto o = flatbuffers::Offset(end); + return o; + } +}; + +inline flatbuffers::Offset CreatePeer_List_Request_Message( + flatbuffers::FlatBufferBuilder &_fbb) { + Peer_List_Request_MessageBuilder builder_(_fbb); + return builder_.Finish(); +} + +struct Peer_List_Response_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { + typedef Peer_List_Response_MessageBuilder Builder; + enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { + VT_PEER_LIST = 4 + }; + const flatbuffers::Vector> *peer_list() const { + return GetPointer> *>(VT_PEER_LIST); + } + flatbuffers::Vector> *mutable_peer_list() { + return GetPointer> *>(VT_PEER_LIST); + } + bool Verify(flatbuffers::Verifier &verifier) const { + return VerifyTableStart(verifier) && + VerifyOffset(verifier, VT_PEER_LIST) && + verifier.VerifyVector(peer_list()) && + verifier.VerifyVectorOfTables(peer_list()) && + verifier.EndTable(); + } +}; + +struct Peer_List_Response_MessageBuilder { + typedef Peer_List_Response_Message Table; + flatbuffers::FlatBufferBuilder &fbb_; + flatbuffers::uoffset_t start_; + void add_peer_list(flatbuffers::Offset>> peer_list) { + fbb_.AddOffset(Peer_List_Response_Message::VT_PEER_LIST, peer_list); + } + explicit Peer_List_Response_MessageBuilder(flatbuffers::FlatBufferBuilder &_fbb) + : fbb_(_fbb) { + start_ = fbb_.StartTable(); + } + flatbuffers::Offset Finish() { + const auto end = fbb_.EndTable(start_); + auto o = flatbuffers::Offset(end); + return o; + } +}; + +inline flatbuffers::Offset CreatePeer_List_Response_Message( + flatbuffers::FlatBufferBuilder &_fbb, + flatbuffers::Offset>> peer_list = 0) { + Peer_List_Response_MessageBuilder builder_(_fbb); + builder_.add_peer_list(peer_list); + return builder_.Finish(); +} + +inline flatbuffers::Offset CreatePeer_List_Response_MessageDirect( + flatbuffers::FlatBufferBuilder &_fbb, + const std::vector> *peer_list = nullptr) { + auto peer_list__ = peer_list ? _fbb.CreateVector>(*peer_list) : 0; + return msg::fbuf::p2pmsg::CreatePeer_List_Response_Message( + _fbb, + peer_list__); +} + +struct Peer_Properties FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { + typedef Peer_PropertiesBuilder Builder; + enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { + VT_HOST_ADDRESS = 4, + VT_PORT = 6, + VT_AVAILABLE_CAPACITY = 8, + VT_TIMESTAMP = 10 + }; + const flatbuffers::String *host_address() const { + return GetPointer(VT_HOST_ADDRESS); + } + flatbuffers::String *mutable_host_address() { + return GetPointer(VT_HOST_ADDRESS); + } + uint16_t port() const { + return GetField(VT_PORT, 0); + } + bool mutate_port(uint16_t _port) { + return SetField(VT_PORT, _port, 0); + } + int16_t available_capacity() const { + return GetField(VT_AVAILABLE_CAPACITY, 0); + } + bool mutate_available_capacity(int16_t _available_capacity) { + return SetField(VT_AVAILABLE_CAPACITY, _available_capacity, 0); + } + uint64_t timestamp() const { + return GetField(VT_TIMESTAMP, 0); + } + bool mutate_timestamp(uint64_t _timestamp) { + return SetField(VT_TIMESTAMP, _timestamp, 0); + } + bool Verify(flatbuffers::Verifier &verifier) const { + return VerifyTableStart(verifier) && + VerifyOffset(verifier, VT_HOST_ADDRESS) && + verifier.VerifyString(host_address()) && + VerifyField(verifier, VT_PORT) && + VerifyField(verifier, VT_AVAILABLE_CAPACITY) && + VerifyField(verifier, VT_TIMESTAMP) && + verifier.EndTable(); + } +}; + +struct Peer_PropertiesBuilder { + typedef Peer_Properties Table; + flatbuffers::FlatBufferBuilder &fbb_; + flatbuffers::uoffset_t start_; + void add_host_address(flatbuffers::Offset host_address) { + fbb_.AddOffset(Peer_Properties::VT_HOST_ADDRESS, host_address); + } + void add_port(uint16_t port) { + fbb_.AddElement(Peer_Properties::VT_PORT, port, 0); + } + void add_available_capacity(int16_t available_capacity) { + fbb_.AddElement(Peer_Properties::VT_AVAILABLE_CAPACITY, available_capacity, 0); + } + void add_timestamp(uint64_t timestamp) { + fbb_.AddElement(Peer_Properties::VT_TIMESTAMP, timestamp, 0); + } + explicit Peer_PropertiesBuilder(flatbuffers::FlatBufferBuilder &_fbb) + : fbb_(_fbb) { + start_ = fbb_.StartTable(); + } + flatbuffers::Offset Finish() { + const auto end = fbb_.EndTable(start_); + auto o = flatbuffers::Offset(end); + return o; + } +}; + +inline flatbuffers::Offset CreatePeer_Properties( + flatbuffers::FlatBufferBuilder &_fbb, + flatbuffers::Offset host_address = 0, + uint16_t port = 0, + int16_t available_capacity = 0, + uint64_t timestamp = 0) { + Peer_PropertiesBuilder builder_(_fbb); + builder_.add_timestamp(timestamp); + builder_.add_host_address(host_address); + builder_.add_available_capacity(available_capacity); + builder_.add_port(port); + return builder_.Finish(); +} + +inline flatbuffers::Offset CreatePeer_PropertiesDirect( + flatbuffers::FlatBufferBuilder &_fbb, + const char *host_address = nullptr, + uint16_t port = 0, + int16_t available_capacity = 0, + uint64_t timestamp = 0) { + auto host_address__ = host_address ? _fbb.CreateString(host_address) : 0; + return msg::fbuf::p2pmsg::CreatePeer_Properties( + _fbb, + host_address__, + port, + available_capacity, + timestamp); +} + inline bool VerifyMessage(flatbuffers::Verifier &verifier, const void *obj, Message type) { switch (type) { case Message_NONE: { @@ -1794,6 +2088,18 @@ inline bool VerifyMessage(flatbuffers::Verifier &verifier, const void *obj, Mess auto ptr = reinterpret_cast(obj); return verifier.VerifyTable(ptr); } + case Message_Peer_List_Request_Message: { + auto ptr = reinterpret_cast(obj); + return verifier.VerifyTable(ptr); + } + case Message_Peer_List_Response_Message: { + auto ptr = reinterpret_cast(obj); + return verifier.VerifyTable(ptr); + } + case Message_Available_Capacity_Announcement_Message: { + auto ptr = reinterpret_cast(obj); + return verifier.VerifyTable(ptr); + } default: return true; } } diff --git a/src/msg/fbuf/p2pmsg_helpers.cpp b/src/msg/fbuf/p2pmsg_helpers.cpp index 15628e8b..afb78e16 100644 --- a/src/msg/fbuf/p2pmsg_helpers.cpp +++ b/src/msg/fbuf/p2pmsg_helpers.cpp @@ -275,6 +275,16 @@ namespace msg::fbuf::p2pmsg return sr; } + /** + * Creates a peer property list from the given peer list response message. + * @param msg Flatbuffer Peer List response message received from the peer. + * @return A Peer list representing the message. + */ + const std::vector create_peer_list_response_from_msg(const Peer_List_Response_Message &msg) + { + return flatbuf_peer_propertieslist_to_peer_propertiesvector(msg.peer_list()); + } + //---Message creation helpers---// /** @@ -574,7 +584,7 @@ namespace msg::fbuf::p2pmsg * @param is_weakly_connected True if number of connections are below threshold and false otherwise. * @param lcl Lcl value to be passed in the container message. */ - void create_msg_for_connected_status_announcement(flatbuffers::FlatBufferBuilder &container_builder, const bool is_weakly_connected, std::string_view lcl) + void create_msg_from_connected_status_announcement(flatbuffers::FlatBufferBuilder &container_builder, const bool is_weakly_connected, std::string_view lcl) { flatbuffers::FlatBufferBuilder builder(1024); @@ -590,6 +600,73 @@ namespace msg::fbuf::p2pmsg create_containermsg_from_content(container_builder, builder, lcl, false); } + /** + * Create available capacity announcement message. + * @param container_builder Flatbuffer builder for the container message. + * @param available_capacity Number of incoming connection slots available, -1 means there's no limitation for connections. + * @param timestamp Announced timestamp. + * @param lcl Lcl value to be passed in the container message. + */ + void create_msg_from_available_capacity_announcement(flatbuffers::FlatBufferBuilder &container_builder, const int16_t &available_capacity, const uint64_t ×tamp, std::string_view lcl) + { + flatbuffers::FlatBufferBuilder builder(1024); + + const flatbuffers::Offset announcement = + CreateAvailable_Capacity_Announcement_Message( + builder, + available_capacity, + timestamp); + + const flatbuffers::Offset message = CreateContent(builder, Message_Available_Capacity_Announcement_Message, announcement.Union()); + builder.Finish(message); // Finished building message content to get serialised content. + + // Now that we have built the content message, + create_containermsg_from_content(container_builder, builder, lcl, false); + } + + /** + * Create peer list request message. + * @param container_builder Flatbuffer builder for the container message. + * @param lcl Lcl value to be passed in the container message. + */ + void create_msg_from_peer_list_request(flatbuffers::FlatBufferBuilder &container_builder, std::string_view lcl) + { + flatbuffers::FlatBufferBuilder builder(1024); + + const flatbuffers::Offset request = + CreatePeer_List_Request_Message( + builder); + + const flatbuffers::Offset message = CreateContent(builder, Message_Peer_List_Request_Message, request.Union()); + builder.Finish(message); // Finished building message content to get serialised content. + + // Now that we have built the content message, + create_containermsg_from_content(container_builder, builder, lcl, false); + } + + /** + * Create peer list response message. + * @param container_builder Flatbuffer builder for the container message. + * @param peers Peer list to be sent to another peer. + * @param skipping_peer Peer that does not need to be sent. + * @param lcl Lcl value to be passed in the container message. + */ + void create_msg_from_peer_list_response(flatbuffers::FlatBufferBuilder &container_builder, const std::vector &peers, const std::optional &skipping_ip_port, std::string_view lcl) + { + flatbuffers::FlatBufferBuilder builder(1024); + + const flatbuffers::Offset response = + CreatePeer_List_Response_Message( + builder, + peer_propertiesvector_to_flatbuf_peer_propertieslist(builder, peers, skipping_ip_port)); + + const flatbuffers::Offset message = CreateContent(builder, Message_Peer_List_Response_Message, response.Union()); + builder.Finish(message); // Finished building message content to get serialised content. + + // Now that we have built the content message, + create_containermsg_from_content(container_builder, builder, lcl, false); + } + /** * Creates a Flatbuffer container message from the given Content message. * @param container_builder The Flatbuffer builder to which the final container message should be written to. @@ -760,4 +837,52 @@ namespace msg::fbuf::p2pmsg } return builder.CreateVector(fbvec); } + + /** + * Create peer list message from the given vector of peer properties structs. + * @param container_builder Flatbuffer builder for the container message. + * @param peers The Vector of peer properties to be placed in the container message. + * @param skipping_peer Peer that does not need to be sent. + */ + const flatbuffers::Offset>> + peer_propertiesvector_to_flatbuf_peer_propertieslist(flatbuffers::FlatBufferBuilder &builder, const std::vector &peers, const std::optional &skipping_ip_port) + { + std::vector> fbvec; + fbvec.reserve(peers.size()); + for (auto peer : peers) + { + // Skipping the requestedc peer from the peer list response. + if (!skipping_ip_port.has_value() || peer.ip_port != skipping_ip_port.value()) + fbvec.push_back(CreatePeer_Properties( + builder, + sv_to_flatbuff_str(builder, peer.ip_port.host_address), + peer.ip_port.port, + peer.available_capacity, + peer.timestamp)); + } + return builder.CreateVector(fbvec); + } + + /** + * Create vector of peer properties structs from the given peer list message. + * @param fbvec The peer list message to be convert to a list of peer properties structs. + */ + const std::vector + flatbuf_peer_propertieslist_to_peer_propertiesvector(const flatbuffers::Vector> *fbvec) + { + std::vector peers; + + for (const Peer_Properties *peer : *fbvec) + { + conf::peer_properties properties; + + properties.ip_port.host_address = flatbuff_str_to_sv(peer->host_address()); + properties.ip_port.port = peer->port(); + properties.timestamp = peer->timestamp(); + properties.available_capacity = peer->available_capacity(); + + peers.push_back(properties); + } + return peers; + } } // namespace msg::fbuf::p2pmsg \ No newline at end of file diff --git a/src/msg/fbuf/p2pmsg_helpers.hpp b/src/msg/fbuf/p2pmsg_helpers.hpp index 34b94ddd..f8c202ee 100644 --- a/src/msg/fbuf/p2pmsg_helpers.hpp +++ b/src/msg/fbuf/p2pmsg_helpers.hpp @@ -37,6 +37,8 @@ namespace msg::fbuf::p2pmsg const p2p::history_response create_history_response_from_msg(const History_Response_Message &msg); const p2p::state_request create_state_request_from_msg(const State_Request_Message &msg); + + const std::vector create_peer_list_response_from_msg(const Peer_List_Response_Message &msg); //---Message creation helpers---// void create_peer_challenge_response_from_challenge(flatbuffers::FlatBufferBuilder &container_builder, const std::string &challenge); @@ -68,6 +70,14 @@ namespace msg::fbuf::p2pmsg void create_containermsg_from_content( flatbuffers::FlatBufferBuilder &container_builder, const flatbuffers::FlatBufferBuilder &content_builder, std::string_view lcl, const bool sign); + void create_msg_from_connected_status_announcement(flatbuffers::FlatBufferBuilder &container_builder, const bool is_weakly_connected, std::string_view lcl); + + void create_msg_from_available_capacity_announcement(flatbuffers::FlatBufferBuilder &container_builder, const int16_t &available_capacity, const uint64_t ×tamp, std::string_view lcl); + + void create_msg_from_peer_list_request(flatbuffers::FlatBufferBuilder &container_builder, std::string_view lcl); + + void create_msg_from_peer_list_response(flatbuffers::FlatBufferBuilder &container_builder, const std::vector &peers, const std::optional &skipping_ip_port, std::string_view lcl); + //---Conversion helpers from flatbuffers data types to std data types---// const std::unordered_map> @@ -81,9 +91,15 @@ namespace msg::fbuf::p2pmsg const std::map flatbuf_historyledgermap_to_historyledgermap(const flatbuffers::Vector> *fbvec); + const std::vector + flatbuf_peer_propertieslist_to_peer_propertiesvector(const flatbuffers::Vector> *fbvec); + const flatbuffers::Offset>> historyledgermap_to_flatbuf_historyledgermap(flatbuffers::FlatBufferBuilder &builder, const std::map &map); + const flatbuffers::Offset>> + peer_propertiesvector_to_flatbuf_peer_propertieslist(flatbuffers::FlatBufferBuilder &builder, const std::vector &peers, const std::optional &skipping_ip_port); + void flatbuf_statefshashentry_to_statefshashentry(std::unordered_map &fs_entries, const flatbuffers::Vector> *fhashes); @@ -95,8 +111,6 @@ namespace msg::fbuf::p2pmsg flatbuffers::FlatBufferBuilder &builder, std::vector &hash_nodes); - void create_msg_for_connected_status_announcement(flatbuffers::FlatBufferBuilder &container_builder, const bool is_weakly_connected, std::string_view lcl); - } // namespace msg::fbuf::p2pmsg #endif diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index 01f99320..36ddcaf3 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -114,7 +114,7 @@ namespace p2p if (replace_needed) { // If we happen to replace a peer session with known IP, transfer required details to the new session. - if (session.known_ipport.first.empty()) + if (!session.known_ipport.has_value()) session.known_ipport.swap(ex_session.known_ipport); session.uniqueid.swap(pubkeyhex); session.challenge_status = comm::CHALLENGE_STATUS::CHALLENGE_VERIFIED; @@ -129,7 +129,7 @@ namespace p2p LOG_DEBUG << "Replacing existing connection [" << ex_session.display_name() << "] with [" << session.display_name() << "]"; return 0; } - else if (ex_session.known_ipport.first.empty() || !session.known_ipport.first.empty()) + else if (!ex_session.known_ipport.has_value() || session.known_ipport.has_value()) { // If we have any known ip-port info from the new session, transfer them to the existing session. ex_session.known_ipport.swap(session.known_ipport); @@ -267,8 +267,141 @@ namespace p2p */ void send_connected_status_announcement(flatbuffers::FlatBufferBuilder &fbuf, const bool is_weakly_connected) { - msg::fbuf::p2pmsg::create_msg_for_connected_status_announcement(fbuf, is_weakly_connected, ledger::ctx.get_lcl()); + msg::fbuf::p2pmsg::create_msg_from_connected_status_announcement(fbuf, is_weakly_connected, ledger::ctx.get_lcl()); broadcast_message(fbuf, false); } + /** + * Sends theavailable capacity announcement to all the connected peers. + * @param available_capacity Available capacity of the known peer. + */ + void send_available_capacity_announcement(const int16_t &available_capacity) + { + const uint64_t time_now = util::get_epoch_milliseconds(); + flatbuffers::FlatBufferBuilder fbuf(1024); + msg::fbuf::p2pmsg::create_msg_from_available_capacity_announcement(fbuf, available_capacity, time_now, ledger::ctx.get_lcl()); + broadcast_message(fbuf, false); + } + + /** + * Send known peer list to a given peer. + * @param session Session to be sent the peers. + */ + void send_known_peer_list(peer_comm_session *session) + { + flatbuffers::FlatBufferBuilder fbuf(1024); + msg::fbuf::p2pmsg::create_msg_from_peer_list_response(fbuf, ctx.server->req_known_remotes, session->known_ipport, ledger::ctx.get_lcl()); + std::string_view msg = std::string_view( + reinterpret_cast(fbuf.GetBufferPointer()), fbuf.GetSize()); + session->send(msg); + } + + /** + * Updates the capacity of the given known peer. + * @param ip_port Ip and port of the know peer. + * @param available_capacity Available capacity of the known peer, -1 if number of connections is unlimited. + * @param timestamp Capacity announced time. + */ + void update_known_peer_available_capacity(const conf::ip_port_prop &ip_port, const int16_t available_capacity, const uint64_t ×tamp) + { + std::scoped_lock lock(ctx.server->req_known_remotes_mutex); + + const auto itr = std::find_if(ctx.server->req_known_remotes.begin(), ctx.server->req_known_remotes.end(), [&](conf::peer_properties &p) { return p.ip_port == ip_port; }); + if (itr != ctx.server->req_known_remotes.end()) + { + LOG_DEBUG << "Updating peer available capacity: Host address: " << itr->ip_port.host_address << ":" << itr->ip_port.port << ", Capacity: " << std::to_string(available_capacity); + itr->available_capacity = available_capacity; + itr->timestamp = timestamp; + + // Sorting the known remote list according to the weight value after updating the peer properties. + sort_known_remotes(); + } + } + + /** + * Send peer list request to a random peer. + */ + void send_peer_list_request() + { + flatbuffers::FlatBufferBuilder fbuf(1024); + msg::fbuf::p2pmsg::create_msg_from_peer_list_request(fbuf, ledger::ctx.get_lcl()); + std::string target_pubkey; + send_message_to_random_peer(fbuf, target_pubkey); + LOG_DEBUG << "Peer list request: Requesting from [" << target_pubkey.substr(0, 10) << "]"; + } + + /** + * Merging the response peer list with the own known peer list. + * @param peers Incoming peer list. + */ + void merge_peer_list(const std::vector &peers) + { + std::scoped_lock lock(ctx.server->req_known_remotes_mutex); + + for (const conf::peer_properties &peer : peers) + { + const auto itr = std::find_if(ctx.server->req_known_remotes.begin(), ctx.server->req_known_remotes.end(), [&](conf::peer_properties &p) { return p.ip_port == peer.ip_port; }); + + // If the new peer is not in the peer list then add to the req_known_remotes + // Otherwise if new peer is recently updated (timestamp >) replace with the current one. + if (itr == ctx.server->req_known_remotes.end()) + { + ctx.server->req_known_remotes.push_back(peer); + LOG_DEBUG << "Adding " + peer.ip_port.host_address + ":" + std::to_string(peer.ip_port.port) + " to the known peer list"; + } + else if (itr->timestamp < peer.timestamp) + { + itr->available_capacity = peer.available_capacity; + itr->timestamp = peer.timestamp; + LOG_DEBUG << "Replacing " + peer.ip_port.host_address + ":" + std::to_string(peer.ip_port.port) + " to the known peer list"; + } + } + + // Sorting the known remote list according to the weight value after merging the peer list. + sort_known_remotes(); + } + + /** + * Sorting the known remote list according to the weight value. + */ + void sort_known_remotes() + { + std::sort(ctx.server->req_known_remotes.begin(), ctx.server->req_known_remotes.end(), + [](const conf::peer_properties &p1, const conf::peer_properties &p2) { + return get_peer_weight(p1) < 0 || get_peer_weight(p1) > get_peer_weight(p2); + }); + } + + /** + * Calculate the weight value for the peer. + * @param peer Properties of the peer. + * @returns -1 if available capacity is unlimited otherwise weight value. + */ + int32_t get_peer_weight(const conf::peer_properties &peer) + { + const uint64_t time_now = util::get_epoch_milliseconds(); + return peer.available_capacity >= 0 ? peer.available_capacity * 1000 * 60 / ceil(time_now - peer.timestamp) : -1; + } + + /** + * Calculate and retunrns the available capacity. + * @returns -1 if available capacity is unlimited otherwise available value. + */ + int16_t get_available_capacity() + { + // If both peermaxcons and peermaxknowncons are configured calculate the capacity. + if (conf::cfg.peermaxcons != 0 && conf::cfg.peermaxknowncons != 0) + { + // If known peer max connection count is equal to the peer max connection count then return 0. + // Otherwise peer max con count - know peer max con count - inbound peer cons. + if (conf::cfg.peermaxcons != conf::cfg.peermaxknowncons) + return conf::cfg.peermaxcons - conf::cfg.peermaxknowncons - ctx.peer_connections.size() + ctx.server->known_remote_count; + else + return 0; + } + else if (conf::cfg.peermaxcons != 0 && conf::cfg.peermaxknowncons == 0) + return conf::cfg.peermaxcons - ctx.peer_connections.size(); + return -1; + } + } // namespace p2p \ No newline at end of file diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index ea818111..7fe737cb 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -152,6 +152,22 @@ namespace p2p bool validate_for_peer_msg_forwarding(const peer_comm_session &session, const msg::fbuf::p2pmsg::Container *container, const msg::fbuf::p2pmsg::Message &content_message_type); void send_connected_status_announcement(flatbuffers::FlatBufferBuilder &fbuf, const bool is_weakly_connected); + + void send_available_capacity_announcement(const int16_t &available_capacity); + + void send_known_peer_list(peer_comm_session *session); + + void send_peer_list_request(); + + void update_known_peer_available_capacity(const conf::ip_port_prop &ip_port, const int16_t available_capacity, const uint64_t ×tamp); + + void merge_peer_list(const std::vector &peers); + + int32_t get_peer_weight(const conf::peer_properties &peer); + + void sort_known_remotes(); + + int16_t get_available_capacity(); } // namespace p2p #endif \ No newline at end of file diff --git a/src/p2p/peer_comm_server.cpp b/src/p2p/peer_comm_server.cpp index 44b98119..b43c5d73 100644 --- a/src/p2p/peer_comm_server.cpp +++ b/src/p2p/peer_comm_server.cpp @@ -1,5 +1,7 @@ #include "../comm/comm_server.hpp" #include "../util.hpp" +#include "../msg/fbuf/p2pmsg_helpers.hpp" +#include "../ledger.hpp" #include "peer_comm_server.hpp" #include "peer_comm_session.hpp" #include "self_node.hpp" @@ -7,7 +9,7 @@ namespace p2p { peer_comm_server::peer_comm_server(const uint16_t port, const uint64_t (&metric_thresholds)[4], - const uint64_t max_msg_size, const std::set &req_known_remotes) + const uint64_t max_msg_size, std::vector &req_known_remotes) : comm::comm_server("Peer", port, metric_thresholds, max_msg_size), req_known_remotes(req_known_remotes) { @@ -16,11 +18,13 @@ namespace p2p void peer_comm_server::start_custom_jobs() { // known_peers_thread = std::thread(&peer_comm_server::peer_monitor_loop, this); + peer_managing_thread = std::thread(&peer_comm_server::peer_managing_loop, this); } void peer_comm_server::stop_custom_jobs() { // known_peers_thread.join(); + peer_managing_thread.join(); } int peer_comm_server::process_custom_messages() @@ -54,31 +58,91 @@ namespace p2p // LOG_INFO << "Stopped peer monitor."; // } + void peer_comm_server::peer_managing_loop() + { + util::mask_signal(); + + LOG_INFO << "Started peer managing thread."; + + int peer_managing_counter = 0; + + while (!is_shutting_down) + { + peer_managing_counter++; + + // Send available peer capacity if peermaxcons is configured. + if (conf::cfg.peermaxcons != 0) + p2p::send_available_capacity_announcement(p2p::get_available_capacity()); + + // Start peer list request loop is dynamic peer discovery is enabled. + if (conf::cfg.dynamicpeerdiscovery && known_remote_count > 0) + { + // If max known peer connection cap is reached then periodically request peer list from random known peer. + // Otherwise frequently request peer list from a random known peer. + // Peer discovery time interval can be configured in the config. + if (conf::cfg.peermaxknowncons != 0 && known_remote_count == conf::cfg.peermaxknowncons) + { + if (peer_managing_counter * 100 >= conf::cfg.peerdiscoverytime * 5) + { + p2p::send_peer_list_request(); + peer_managing_counter = 0; + } + } + else if (peer_managing_counter * 100 >= conf::cfg.peerdiscoverytime) + { + p2p::send_peer_list_request(); + peer_managing_counter = 0; + } + } + + util::sleep(100); + } + + LOG_INFO << "Stopped peer managing thread."; + } + void peer_comm_server::maintain_known_connections() { // Find already connected known remote parties list. - std::set known_remotes; + std::vector known_remotes; { std::scoped_lock lock(sessions_mutex); for (const p2p::peer_comm_session &session : sessions) { - if (session.state != comm::SESSION_STATE::CLOSED && !session.known_ipport.first.empty()) - known_remotes.emplace(session.known_ipport); + if (session.state != comm::SESSION_STATE::CLOSED && session.known_ipport.has_value()) + known_remotes.push_back(session.known_ipport.value()); } } - for (const auto &ipport : req_known_remotes) + // Update global known remote count when new connections are made. + known_remote_count = known_remotes.size(); + + std::scoped_lock lock(req_known_remotes_mutex); + + for (const auto &peer : req_known_remotes) { if (is_shutting_down) break; - // Check if we are already connected to this remote party. - if (known_remotes.find(ipport) != known_remotes.end()) + // Break if known peer cap is reached. + if (conf::cfg.peermaxknowncons != 0 && known_remote_count == conf::cfg.peermaxknowncons) + break; + + // Break if max peer connection cap is reached. + if (conf::cfg.peermaxcons != 0 && known_remote_count == conf::cfg.peermaxcons) + break; + + // Continue if the peer has no free slots. + if (peer.available_capacity == 0) continue; - std::string_view host = ipport.first; - const uint16_t port = ipport.second; + // Check if we are already connected to this remote party. + if (std::find(known_remotes.begin(), known_remotes.end(), peer.ip_port) != known_remotes.end()) + continue; + + std::string_view host = peer.ip_port.host_address; + const uint16_t port = peer.ip_port.port; LOG_DEBUG << "Trying to connect " << host << ":" << std::to_string(port); std::variant client_result = hpws::client::connect(conf::ctx.hpws_exe_path, max_msg_size, host, port, "/", {}, util::fork_detach); @@ -102,7 +166,8 @@ namespace p2p { const std::string &host_address = std::get(host_result); p2p::peer_comm_session session(host_address, std::move(client), false, metric_thresholds); - session.known_ipport = ipport; + session.known_ipport.emplace(peer.ip_port); + known_remote_count++; std::scoped_lock lock(new_sessions_mutex); new_sessions.emplace_back(std::move(session)); diff --git a/src/p2p/peer_comm_server.hpp b/src/p2p/peer_comm_server.hpp index 6dbfbea9..e85051cd 100644 --- a/src/p2p/peer_comm_server.hpp +++ b/src/p2p/peer_comm_server.hpp @@ -9,10 +9,12 @@ namespace p2p class peer_comm_server : public comm::comm_server { private: - const std::set &req_known_remotes; int custom_connection_invocations = -1; // std::thread known_peers_thread; // Known peers connection establishment thread. + std::thread peer_managing_thread; // Thread to request known peer list from a random peer and announce available capacity. + void maintain_known_connections(); + void peer_managing_loop(); protected: void start_custom_jobs(); @@ -20,9 +22,13 @@ namespace p2p int process_custom_messages(); void custom_connections(); + public: + std::atomic known_remote_count = 0; + std::mutex req_known_remotes_mutex; + std::vector &req_known_remotes; peer_comm_server(const uint16_t port, const uint64_t (&metric_thresholds)[4], - const uint64_t max_msg_size, const std::set &req_known_remotes); + const uint64_t max_msg_size, std::vector &req_known_remotes); }; } // namespace p2p diff --git a/src/p2p/peer_comm_session.cpp b/src/p2p/peer_comm_session.cpp index 04b0200c..c3125445 100644 --- a/src/p2p/peer_comm_session.cpp +++ b/src/p2p/peer_comm_session.cpp @@ -4,9 +4,9 @@ namespace p2p { - void peer_comm_session::handle_connect() + int peer_comm_session::handle_connect() { - p2p::handle_peer_connect(*this); + return p2p::handle_peer_connect(*this); } int peer_comm_session::handle_message(std::string_view msg) diff --git a/src/p2p/peer_comm_session.hpp b/src/p2p/peer_comm_session.hpp index b81ad3ee..8a6443c0 100644 --- a/src/p2p/peer_comm_session.hpp +++ b/src/p2p/peer_comm_session.hpp @@ -15,12 +15,12 @@ namespace p2p using comm_session::comm_session; // Inherit constructors. private: - void handle_connect(); + int handle_connect(); int handle_message(std::string_view msg); void handle_close(); public: - conf::ip_port_pair known_ipport; // A known ip/port information that matches with our peer list configuration. + std::optional known_ipport; // A known ip/port information that matches with our peer list configuration. bool is_weakly_connected = false; // Holds whether this node is weakly connected to the other nodes. const std::string display_name(); }; diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp index e04d36ea..b43e0da3 100644 --- a/src/p2p/peer_session_handler.cpp +++ b/src/p2p/peer_session_handler.cpp @@ -22,9 +22,18 @@ namespace p2p /** * This gets hit every time a peer connects to HP via the peer port (configured in contract config). + * @param session connected session. + * @return returns 0 if connection is successful and peer challenge is sent otherwise, -1. */ - void handle_peer_connect(p2p::peer_comm_session &session) + int handle_peer_connect(p2p::peer_comm_session &session) { + // Skip new inbound connection if max inbound connection cap is reached. + if (session.is_inbound && get_available_capacity() == 0) + { + LOG_DEBUG << "Max peer connection cap reached. Rejecting new peer connection [" << session.display_name() << "]"; + return -1; + } + // Send peer challenge. flatbuffers::FlatBufferBuilder fbuf(1024); p2pmsg::create_msg_from_peer_challenge(fbuf, session.issued_challenge); @@ -32,6 +41,7 @@ namespace p2p reinterpret_cast(fbuf.GetBufferPointer()), fbuf.GetSize()); session.send(msg); session.challenge_status = comm::CHALLENGE_ISSUED; + return 0; } // peer session on message callback method. @@ -103,7 +113,23 @@ namespace p2p return 0; } - if (content_message_type == p2pmsg::Message_Connected_Status_Announcement_Message) // This message is the connected status announcement message. + if (content_message_type == p2pmsg::Message_Peer_List_Response_Message) // This message is the peer list response message. + { + p2p::merge_peer_list(p2pmsg::create_peer_list_response_from_msg(*content->message_as_Peer_List_Response_Message())); + } + else if (content_message_type == p2pmsg::Message_Peer_List_Request_Message) // This message is the peer list request message. + { + p2p::send_known_peer_list(&session); + } + else if (content_message_type == p2pmsg::Message_Available_Capacity_Announcement_Message) // This message is the available capacity announcement message. + { + if (session.known_ipport.has_value()) + { + const p2pmsg::Available_Capacity_Announcement_Message *announcement_msg = content->message_as_Available_Capacity_Announcement_Message(); + p2p::update_known_peer_available_capacity(session.known_ipport.value(), announcement_msg->available_capacity(), announcement_msg->timestamp()); + } + } + else if (content_message_type == p2pmsg::Message_Connected_Status_Announcement_Message) // This message is the connected status announcement message. { const p2pmsg::Connected_Status_Announcement_Message *announcement_msg = content->message_as_Connected_Status_Announcement_Message(); session.is_weakly_connected = announcement_msg->is_weakly_connected(); @@ -246,13 +272,21 @@ namespace p2p } //peer session on message callback method - int handle_peer_close(const comm::comm_session &session) + int handle_peer_close(const p2p::peer_comm_session &session) { - // Erase the corresponding uniqueid peer connection if it's this session. - std::scoped_lock lock(ctx.peer_connections_mutex); - const auto itr = ctx.peer_connections.find(session.uniqueid); - if (itr != ctx.peer_connections.end() && itr->second == &session) - ctx.peer_connections.erase(itr); + { + // Erase the corresponding uniqueid peer connection if it's this session. + std::scoped_lock lock(ctx.peer_connections_mutex); + const auto itr = ctx.peer_connections.find(session.uniqueid); + if (itr != ctx.peer_connections.end() && itr->second == &session) + { + ctx.peer_connections.erase(itr); + } + } + + // Update peer properties to default on peer close. + if (session.known_ipport.has_value()) + p2p::update_known_peer_available_capacity(session.known_ipport.value(), -1, 0); return 0; } diff --git a/src/p2p/peer_session_handler.hpp b/src/p2p/peer_session_handler.hpp index 46fe8dbe..aafe1c3c 100644 --- a/src/p2p/peer_session_handler.hpp +++ b/src/p2p/peer_session_handler.hpp @@ -10,10 +10,10 @@ namespace p2pmsg = msg::fbuf::p2pmsg; namespace p2p { - void handle_peer_connect(p2p::peer_comm_session &session); + int handle_peer_connect(p2p::peer_comm_session &session); int handle_peer_message(p2p::peer_comm_session &session, std::string_view message); int handle_self_message(std::string_view message); - int handle_peer_close(const comm::comm_session &session); + int handle_peer_close(const p2p::peer_comm_session &session); void handle_proposal_message(const p2pmsg::Container *container, const p2pmsg::Content *content); void handle_nonunl_proposal_message(const p2pmsg::Container *container, const p2pmsg::Content *content); void handle_npl_message(const p2pmsg::Container *container, const p2pmsg::Content *content); diff --git a/src/usr/user_comm_session.cpp b/src/usr/user_comm_session.cpp index 533b26c0..a4ed3777 100644 --- a/src/usr/user_comm_session.cpp +++ b/src/usr/user_comm_session.cpp @@ -5,9 +5,9 @@ namespace usr { - void user_comm_session::handle_connect() + int user_comm_session::handle_connect() { - usr::handle_user_connect(*this); + return usr::handle_user_connect(*this); } int user_comm_session::handle_message(std::string_view msg) diff --git a/src/usr/user_comm_session.hpp b/src/usr/user_comm_session.hpp index 207930c2..baeb1e74 100644 --- a/src/usr/user_comm_session.hpp +++ b/src/usr/user_comm_session.hpp @@ -14,7 +14,7 @@ namespace usr using comm_session::comm_session; // Inherit constructors. private: - void handle_connect(); + int handle_connect(); int handle_message(std::string_view msg); void handle_close(); diff --git a/src/usr/user_session_handler.cpp b/src/usr/user_session_handler.cpp index 101e4456..96049a4c 100644 --- a/src/usr/user_session_handler.cpp +++ b/src/usr/user_session_handler.cpp @@ -12,8 +12,10 @@ namespace usr { /** * This gets hit every time a client connects to HP via the public port (configured in contract config). + * @param session connected session. + * @return returns 0 if connection is successful and user challenge is sent, otherwise -1. */ - void handle_user_connect(usr::user_comm_session &session) + int handle_user_connect(usr::user_comm_session &session) { LOG_DEBUG << "User client connected " << session.display_name(); @@ -25,6 +27,8 @@ namespace usr // Set the challenge-issued value to true. session.challenge_status = comm::CHALLENGE_ISSUED; + + return 0; } /** diff --git a/src/usr/user_session_handler.hpp b/src/usr/user_session_handler.hpp index 73511eb4..3c72590b 100644 --- a/src/usr/user_session_handler.hpp +++ b/src/usr/user_session_handler.hpp @@ -6,7 +6,7 @@ namespace usr { - void handle_user_connect(usr::user_comm_session &session); + int handle_user_connect(usr::user_comm_session &session); int handle_user_message(usr::user_comm_session &session, std::string_view message); int handle_user_close(const usr::user_comm_session &session);