diff --git a/handlers/AccountObjects.cpp b/handlers/AccountObjects.cpp index 01a3e802..4e92a6e7 100644 --- a/handlers/AccountObjects.cpp +++ b/handlers/AccountObjects.cpp @@ -1,5 +1,5 @@ -#include #include +#include #include #include #include @@ -11,7 +11,7 @@ #include #include -std::unordered_map types { +std::unordered_map types{ {"state", ripple::ltRIPPLE_STATE}, {"ticket", ripple::ltTICKET}, {"signer_list", ripple::ltSIGNER_LIST}, @@ -36,21 +36,21 @@ doAccountObjects( return response; } - if(!request.contains("account")) + if (!request.contains("account")) { response["error"] = "Must contain account"; return response; } - if(!request.at("account").is_string()) + if (!request.at("account").is_string()) { response["error"] = "Account must be a string"; return response; } - + ripple::AccountID accountID; auto parsed = ripple::parseBase58( - request.at("account").as_string().c_str()); + request.at("account").as_string().c_str()); if (!parsed) { @@ -63,7 +63,7 @@ doAccountObjects( ripple::uint256 cursor = beast::zero; if (request.contains("cursor")) { - if(!request.at("cursor").is_string()) + if (!request.at("cursor").is_string()) { response["error"] = "limit must be string"; return response; @@ -82,14 +82,14 @@ doAccountObjects( std::optional objectType = {}; if (request.contains("type")) { - if(!request.at("type").is_string()) + if (!request.at("type").is_string()) { response["error"] = "type must be string"; return response; } std::string typeAsString = request.at("type").as_string().c_str(); - if(types.find(typeAsString) == types.end()) + if (types.find(typeAsString) == types.end()) { response["error"] = "invalid object type"; return response; @@ -97,29 +97,24 @@ doAccountObjects( objectType = types[typeAsString]; } - + response["objects"] = boost::json::value(boost::json::array_kind); boost::json::array& jsonObjects = response.at("objects").as_array(); auto const addToResponse = [&](ripple::SLE const& sle) { if (!objectType || objectType == sle.getType()) { - jsonObjects.push_back(getJson(sle)); + jsonObjects.push_back(toJson(sle)); } return true; }; - - auto nextCursor = - traverseOwnedNodes( - backend, - accountID, - *ledgerSequence, - cursor, - addToResponse); + + auto nextCursor = traverseOwnedNodes( + backend, accountID, *ledgerSequence, cursor, addToResponse); if (nextCursor) response["next_cursor"] = ripple::strHex(*nextCursor); return response; -} \ No newline at end of file +} diff --git a/handlers/RPCHelpers.cpp b/handlers/RPCHelpers.cpp index 2e1a5648..4f1ce9d0 100644 --- a/handlers/RPCHelpers.cpp +++ b/handlers/RPCHelpers.cpp @@ -104,6 +104,11 @@ toJson(ripple::SLE const& sle) .count(); return value.as_object(); } +boost::json::value +toBoostJson(RippledJson const& value) +{ + return boost::json::parse(value.toStyledString()); +} boost::json::object toJson(ripple::LedgerInfo const& lgrInfo) diff --git a/handlers/RPCHelpers.h b/handlers/RPCHelpers.h index c5dba05e..ee185ccd 100644 --- a/handlers/RPCHelpers.h +++ b/handlers/RPCHelpers.h @@ -32,10 +32,11 @@ boost::json::object toJson(ripple::LedgerInfo const& info); boost::json::object -getJson(ripple::TxMeta const& meta); +toJson(ripple::TxMeta const& meta); +using RippledJson = Json::Value; boost::json::value -getJson(Json::Value const& value); +toBoostJson(RippledJson const& value); std::optional ledgerSequenceFromRequest( diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp index 5761ac03..adff9a0e 100644 --- a/reporting/ReportingETL.cpp +++ b/reporting/ReportingETL.cpp @@ -241,8 +241,7 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts) } else { - auto lgr = - flatMapBackend_->fetchLedgerBySequence(ledgerSequence); + auto lgr = backend_->fetchLedgerBySequence(ledgerSequence); assert(lgr); publishLedger(*lgr); } @@ -574,7 +573,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) BOOST_LOG_TRIVIAL(info) << "Running online delete"; backend_->doOnlineDelete(*onlineDeleteInterval_); BOOST_LOG_TRIVIAL(info) << "Finished online delete"; - auto rng = flatMapBackend_->fetchLedgerRangeNoThrow(); + auto rng = backend_->fetchLedgerRangeNoThrow(); minSequence = rng->minSequence; deleting_ = false; }); diff --git a/reporting/ReportingETL.h b/reporting/ReportingETL.h index 6f5afc01..0e34829a 100644 --- a/reporting/ReportingETL.h +++ b/reporting/ReportingETL.h @@ -276,7 +276,7 @@ private: { boost::json::object result; - result["etl_sources"] = loadBalancer_.toJson(); + result["etl_sources"] = loadBalancer_->toJson(); result["is_writer"] = writing_.load(); result["read_only"] = readOnly_; auto last = getLastPublish(); diff --git a/reporting/server/SubscriptionManager.cpp b/reporting/server/SubscriptionManager.cpp index 76ca5272..813d2d3c 100644 --- a/reporting/server/SubscriptionManager.cpp +++ b/reporting/server/SubscriptionManager.cpp @@ -1,6 +1,6 @@ -#include -#include - +#include +#include + void SubscriptionManager::subLedger(std::shared_ptr& session) { @@ -27,15 +27,15 @@ SubscriptionManager::pubLedger( pubMsg["ledger_hash"] = to_string(lgrInfo.hash); pubMsg["ledger_time"] = lgrInfo.closeTime.time_since_epoch().count(); - pubMsg["fee_ref"] = getJson(fees.units.jsonClipped()); - pubMsg["fee_base"] = getJson(fees.base.jsonClipped()); - pubMsg["reserve_base"] = getJson(fees.accountReserve(0).jsonClipped()); - pubMsg["reserve_inc"] = getJson(fees.increment.jsonClipped()); + pubMsg["fee_ref"] = toBoostJson(fees.units.jsonClipped()); + pubMsg["fee_base"] = toBoostJson(fees.base.jsonClipped()); + pubMsg["reserve_base"] = toBoostJson(fees.accountReserve(0).jsonClipped()); + pubMsg["reserve_inc"] = toBoostJson(fees.increment.jsonClipped()); pubMsg["validated_ledgers"] = ledgerRange; pubMsg["txn_count"] = txnCount; - for (auto const& session: streamSubscribers_[Ledgers]) + for (auto const& session : streamSubscribers_[Ledgers]) session->send(boost::json::serialize(pubMsg)); } @@ -53,7 +53,7 @@ SubscriptionManager::unsubTransactions(std::shared_ptr& session) void SubscriptionManager::subAccount( - ripple::AccountID const& account, + ripple::AccountID const& account, std::shared_ptr& session) { accountSubscribers_[account].emplace(std::move(session)); @@ -61,7 +61,7 @@ SubscriptionManager::subAccount( void SubscriptionManager::unsubAccount( - ripple::AccountID const& account, + ripple::AccountID const& account, std::shared_ptr& session) { accountSubscribers_[account].erase(session); @@ -75,23 +75,23 @@ SubscriptionManager::pubTransaction( auto [tx, meta] = deserializeTxPlusMeta(blob, seq); boost::json::object pubMsg; - pubMsg["transaction"] = getJson(*tx); - pubMsg["meta"] = getJson(*meta); + pubMsg["transaction"] = toJson(*tx); + pubMsg["meta"] = toJson(*meta); - for (auto const& session: streamSubscribers_[Transactions]) + for (auto const& session : streamSubscribers_[Transactions]) session->send(boost::json::serialize(pubMsg)); auto journal = ripple::debugLog(); auto accounts = meta->getAffectedAccounts(journal); for (ripple::AccountID const& account : accounts) - for (auto const& session: accountSubscribers_[account]) + for (auto const& session : accountSubscribers_[account]) session->send(boost::json::serialize(pubMsg)); } - void -SubscriptionManager::forwardProposedTransaction(boost::json::object const& response) +SubscriptionManager::forwardProposedTransaction( + boost::json::object const& response) { for (auto const& session : streamSubscribers_[TransactionsProposed]) session->send(boost::json::serialize(response)); @@ -100,13 +100,13 @@ SubscriptionManager::forwardProposedTransaction(boost::json::object const& respo auto accounts = getAccountsFromTransaction(transaction); for (ripple::AccountID const& account : accounts) - for (auto const& session: accountProposedSubscribers_[account]) + for (auto const& session : accountProposedSubscribers_[account]) session->send(boost::json::serialize(response)); } void SubscriptionManager::subProposedAccount( - ripple::AccountID const& account, + ripple::AccountID const& account, std::shared_ptr& session) { accountProposedSubscribers_[account].emplace(std::move(session)); @@ -127,7 +127,8 @@ SubscriptionManager::subProposedTransactions(std::shared_ptr& session) } void -SubscriptionManager::unsubProposedTransactions(std::shared_ptr& session) +SubscriptionManager::unsubProposedTransactions( + std::shared_ptr& session) { streamSubscribers_[TransactionsProposed].erase(session); -} \ No newline at end of file +} diff --git a/reporting/server/listener.h b/reporting/server/listener.h index c46e4fe1..e6dc3915 100644 --- a/reporting/server/listener.h +++ b/reporting/server/listener.h @@ -48,10 +48,11 @@ public: boost::asio::ip::tcp::endpoint endpoint, std::shared_ptr backend, std::shared_ptr subscriptions, - std::shared_ptr balancer) + std::shared_ptr balancer, + DOSGuard& dosGuard) { std::make_shared( - ioc, endpoint, backend, subscriptions, balancer) + ioc, endpoint, backend, subscriptions, balancer, dosGuard) ->run(); } @@ -67,6 +68,7 @@ public: , backend_(backend) , subscriptions_(subscriptions) , balancer_(balancer) + , dosGuard_(dosGuard) { boost::beast::error_code ec; diff --git a/reporting/server/session.cpp b/reporting/server/session.cpp index b4e82d70..c60a4fda 100644 --- a/reporting/server/session.cpp +++ b/reporting/server/session.cpp @@ -20,20 +20,20 @@ buildResponse( boost::json::object response; if (shouldForwardToP2p(request)) - return balancer->forwardToP2p(request); + return {balancer->forwardToP2p(request), 10}; switch (commandMap[command]) { case tx: return {doTx(request, *backend), 1}; case account_tx: { - auto res = doAccountTx(request, backend); + auto res = doAccountTx(request, *backend); if (res.contains("transactions")) return {res, res["transactions"].as_array().size()}; return {res, 1}; } case ledger: { - auto res = doLedger(request, backend); + auto res = doLedger(request, *backend); if (res.contains("transactions")) return {res, res["transactions"].as_array().size()}; return {res, 1}; @@ -43,7 +43,7 @@ buildResponse( case ledger_range: return {doLedgerRange(request, *backend), 1}; case ledger_data: { - auto res = doLedgerData(request, backend); + auto res = doLedgerData(request, *backend); if (res.contains("objects")) return {res, res["objects"].as_array().size() * 4}; return {res, 1}; @@ -51,7 +51,7 @@ buildResponse( case account_info: return {doAccountInfo(request, *backend), 1}; case book_offers: { - auto res = doBookOffers(request, backend); + auto res = doBookOffers(request, *backend); if (res.contains("offers")) return {res, res["offers"].as_array().size() * 4}; return {res, 1}; @@ -73,7 +73,7 @@ buildResponse( size_t count = 1; if (res.contains("send_currencies")) count = res["send_currencies"].as_array().size(); - if(res.contains("receive_currencies"])) + if (res.contains("receive_currencies")) count += res["receive_currencies"].as_array().size(); return {res, count}; } @@ -100,7 +100,7 @@ buildResponse( case unsubscribe: return {doUnsubscribe(request, session, *manager), 1}; case server_info: { - return {doServerInfo(request, backend), 1}; + return {doServerInfo(request, *backend), 1}; break; } default: diff --git a/reporting/server/session.h b/reporting/server/session.h index 9edccef2..f14adc21 100644 --- a/reporting/server/session.h +++ b/reporting/server/session.h @@ -27,6 +27,7 @@ #include #include #include +#include class session; class SubscriptionManager; @@ -134,6 +135,10 @@ boost::json::object doAccountObjects( boost::json::object const& request, BackendInterface const& backend); +boost::json::object +doServerInfo( + boost::json::object const& request, + BackendInterface const& backend); boost::json::object doChannelAuthorize(boost::json::object const& request); @@ -151,7 +156,7 @@ doUnsubscribe( std::shared_ptr& session, SubscriptionManager& manager); -boost::json::object +std::pair buildResponse( boost::json::object const& request, std::shared_ptr backend, diff --git a/server/websocket_server_async.cpp b/server/websocket_server_async.cpp index e31d2d66..65e014b3 100644 --- a/server/websocket_server_async.cpp +++ b/server/websocket_server_async.cpp @@ -32,386 +32,11 @@ #include #include #include -#include #include #include #include #include -<<<<<<< HEAD:websocket_server_async.cpp -======= -//------------------------------------------------------------------------------ -enum RPCCommand { - tx, - account_tx, - ledger, - account_info, - ledger_data, - book_offers, - ledger_range, - ledger_entry, - server_info -}; -std::unordered_map commandMap{ - {"tx", tx}, - {"account_tx", account_tx}, - {"ledger", ledger}, - {"ledger_range", ledger_range}, - {"ledger_entry", ledger_entry}, - {"account_info", account_info}, - {"ledger_data", ledger_data}, - {"book_offers", book_offers}, - {"server_info", server_info}}; - -boost::json::object -doAccountInfo( - boost::json::object const& request, - BackendInterface const& backend); -boost::json::object -doTx(boost::json::object const& request, BackendInterface const& backend); -boost::json::object -doAccountTx( - boost::json::object const& request, - BackendInterface const& backend); -boost::json::object -doLedgerData( - boost::json::object const& request, - BackendInterface const& backend); -boost::json::object -doLedgerEntry( - boost::json::object const& request, - BackendInterface const& backend); -boost::json::object -doBookOffers( - boost::json::object const& request, - BackendInterface const& backend); -boost::json::object -doLedger(boost::json::object const& request, BackendInterface const& backend); -boost::json::object -doLedgerRange( - boost::json::object const& request, - BackendInterface const& backend); -boost::json::object -doServerInfo( - boost::json::object const& request, - BackendInterface const& backend); - -std::pair -buildResponse( - boost::json::object const& request, - BackendInterface const& backend) -{ - std::string command = request.at("command").as_string().c_str(); - BOOST_LOG_TRIVIAL(info) << "Received rpc command : " << request; - boost::json::object response; - switch (commandMap[command]) - { - case tx: - return {doTx(request, backend), 1}; - break; - case account_tx: { - auto res = doAccountTx(request, backend); - if (res.contains("transactions")) - return {res, res["transactions"].as_array().size()}; - return {res, 1}; - } - break; - case ledger: { - auto res = doLedger(request, backend); - if (res.contains("transactions")) - return {res, res["transactions"].as_array().size()}; - return {res, 1}; - } - break; - case ledger_entry: - return {doLedgerEntry(request, backend), 1}; - break; - case ledger_range: - return {doLedgerRange(request, backend), 1}; - break; - case ledger_data: { - auto res = doLedgerData(request, backend); - if (res.contains("objects")) - return {res, res["objects"].as_array().size()}; - return {res, 1}; - } - break; - case server_info: { - return {doServerInfo(request, backend), 1}; - break; - } - case account_info: - return {doAccountInfo(request, backend), 1}; - break; - case book_offers: { - auto res = doBookOffers(request, backend); - if (res.contains("offers")) - return {res, res["offers"].as_array().size()}; - return {res, 1}; - } - break; - default: - BOOST_LOG_TRIVIAL(error) << "Unknown command: " << command; - } - return {response, 1}; -} -// Report a failure -void -fail(boost::beast::error_code ec, char const* what) -{ - std::cerr << what << ": " << ec.message() << "\n"; -} - -// Echoes back all received WebSocket messages -class session : public std::enable_shared_from_this -{ - boost::beast::websocket::stream ws_; - boost::beast::flat_buffer buffer_; - std::string response_; - BackendInterface const& backend_; - DOSGuard& dosGuard_; - -public: - // Take ownership of the socket - explicit session( - boost::asio::ip::tcp::socket&& socket, - BackendInterface const& backend, - DOSGuard& dosGuard) - : ws_(std::move(socket)), backend_(backend), dosGuard_(dosGuard) - { - } - - // Get on the correct executor - void - run() - { - // We need to be executing within a strand to perform async - // operations on the I/O objects in this session. Although not - // strictly necessary for single-threaded contexts, this example - // code is written to be thread-safe by default. - boost::asio::dispatch( - ws_.get_executor(), - boost::beast::bind_front_handler( - &session::on_run, shared_from_this())); - } - - // Start the asynchronous operation - void - on_run() - { - // Set suggested timeout settings for the websocket - ws_.set_option(boost::beast::websocket::stream_base::timeout::suggested( - boost::beast::role_type::server)); - - // Set a decorator to change the Server of the handshake - ws_.set_option(boost::beast::websocket::stream_base::decorator( - [](boost::beast::websocket::response_type& res) { - res.set( - boost::beast::http::field::server, - std::string(BOOST_BEAST_VERSION_STRING) + - " websocket-server-async"); - })); - // Accept the websocket handshake - ws_.async_accept(boost::beast::bind_front_handler( - &session::on_accept, shared_from_this())); - } - - void - on_accept(boost::beast::error_code ec) - { - if (ec) - return fail(ec, "accept"); - - // Read a message - do_read(); - } - - void - do_read() - { - // Read a message into our buffer - ws_.async_read( - buffer_, - boost::beast::bind_front_handler( - &session::on_read, shared_from_this())); - } - - void - on_read(boost::beast::error_code ec, std::size_t bytes_transferred) - { - boost::ignore_unused(bytes_transferred); - - // This indicates that the session was closed - if (ec == boost::beast::websocket::error::closed) - return; - - if (ec) - fail(ec, "read"); - std::string msg{ - static_cast(buffer_.data().data()), buffer_.size()}; - // BOOST_LOG_TRIVIAL(debug) << __func__ << msg; - boost::json::object response; - auto ip = - ws_.next_layer().socket().remote_endpoint().address().to_string(); - BOOST_LOG_TRIVIAL(debug) - << __func__ << " received request from ip = " << ip; - if (!dosGuard_.isOk(ip)) - response["error"] = "Too many requests. Slow down"; - else - { - try - { - boost::json::value raw = boost::json::parse(msg); - boost::json::object request = raw.as_object(); - BOOST_LOG_TRIVIAL(debug) << " received request : " << request; - try - { - auto start = std::chrono::system_clock::now(); - auto [res, cost] = buildResponse(request, backend_); - response = std::move(res); - if (!dosGuard_.add(ip, cost)) - { - response["warning"] = "Too many requests"; - } - - auto end = std::chrono::system_clock::now(); - BOOST_LOG_TRIVIAL(info) - << __func__ << " RPC call took " - << ((end - start).count() / 1000000000.0) - << " . request = " << request; - } - catch (Backend::DatabaseTimeout const& t) - { - BOOST_LOG_TRIVIAL(error) << __func__ << " Database timeout"; - response["error"] = - "Database read timeout. Please retry the request"; - } - } - catch (std::exception const& e) - { - BOOST_LOG_TRIVIAL(error) - << __func__ << "caught exception : " << e.what(); - response["error"] = "Unknown exception"; - } - } - BOOST_LOG_TRIVIAL(trace) << __func__ << response; - response_ = boost::json::serialize(response); - - // Echo the message - ws_.text(ws_.got_text()); - ws_.async_write( - boost::asio::buffer(response_), - boost::beast::bind_front_handler( - &session::on_write, shared_from_this())); - } - - void - on_write(boost::beast::error_code ec, std::size_t bytes_transferred) - { - boost::ignore_unused(bytes_transferred); - - if (ec) - return fail(ec, "write"); - - // Clear the buffer - buffer_.consume(buffer_.size()); - - // Do another read - do_read(); - } -}; - -//------------------------------------------------------------------------------ - -// Accepts incoming connections and launches the sessions -class listener : public std::enable_shared_from_this -{ - boost::asio::io_context& ioc_; - boost::asio::ip::tcp::acceptor acceptor_; - BackendInterface const& backend_; - DOSGuard& dosGuard_; - -public: - listener( - boost::asio::io_context& ioc, - boost::asio::ip::tcp::endpoint endpoint, - BackendInterface const& backend, - DOSGuard& dosGuard) - : ioc_(ioc), acceptor_(ioc), backend_(backend), dosGuard_(dosGuard) - { - boost::beast::error_code ec; - - // Open the acceptor - acceptor_.open(endpoint.protocol(), ec); - if (ec) - { - fail(ec, "open"); - return; - } - - // Allow address reuse - acceptor_.set_option(boost::asio::socket_base::reuse_address(true), ec); - if (ec) - { - fail(ec, "set_option"); - return; - } - - // Bind to the server address - acceptor_.bind(endpoint, ec); - if (ec) - { - fail(ec, "bind"); - return; - } - - // Start listening for connections - acceptor_.listen(boost::asio::socket_base::max_listen_connections, ec); - if (ec) - { - fail(ec, "listen"); - return; - } - } - - // Start accepting incoming connections - void - run() - { - do_accept(); - } - -private: - void - do_accept() - { - // The new connection gets its own strand - acceptor_.async_accept( - boost::asio::make_strand(ioc_), - boost::beast::bind_front_handler( - &listener::on_accept, shared_from_this())); - } - - void - on_accept(boost::beast::error_code ec, boost::asio::ip::tcp::socket socket) - { - if (ec) - { - fail(ec, "accept"); - } - else - { - // Create the session and run it - std::make_shared(std::move(socket), backend_, dosGuard_) - ->run(); - } - - // Accept another connection - do_accept(); - } -}; - ->>>>>>> dev:server/websocket_server_async.cpp std::optional parse_config(const char* filename) { @@ -515,34 +140,20 @@ main(int argc, char* argv[]) // The io_context is required for all I/O boost::asio::io_context ioc{threads}; -<<<<<<< HEAD:websocket_server_async.cpp + + DOSGuard dosGuard{config.value(), ioc}; std::shared_ptr backend{Backend::make_Backend(*config)}; std::shared_ptr subscriptions{ SubscriptionManager::make_SubscriptionManager()}; -======= - ReportingETL etl{config.value(), ioc}; - DOSGuard dosGuard{config.value(), ioc}; ->>>>>>> dev:server/websocket_server_async.cpp std::shared_ptr ledgers{ NetworkValidatedLedgers::make_ValidatedLedgers()}; std::shared_ptr balancer{ ETLLoadBalancer::make_ETLLoadBalancer( - *config, - ioc, -<<<<<<< HEAD:websocket_server_async.cpp - backend, - subscriptions, - ledgers)}; -======= - boost::asio::ip::tcp::endpoint{address, port}, - etl.getFlatMapBackend(), - dosGuard) - ->run(); ->>>>>>> dev:server/websocket_server_async.cpp + *config, ioc, backend, subscriptions, ledgers)}; std::shared_ptr etl{ReportingETL::make_ReportingETL( *config, ioc, backend, subscriptions, balancer, ledgers)}; @@ -552,7 +163,8 @@ main(int argc, char* argv[]) boost::asio::ip::tcp::endpoint{address, port}, backend, subscriptions, - balancer); + balancer, + dosGuard); // Blocks until stopped. // When stopped, shared_ptrs fall out of scope diff --git a/unittests/main.cpp b/unittests/main.cpp index 4ac86e25..e344a03c 100644 --- a/unittests/main.cpp +++ b/unittests/main.cpp @@ -45,7 +45,7 @@ TEST(BackendTest, Basic) for (auto& config : configs) { std::cout << keyspace << std::endl; - auto backend = Backend::makeBackend(config); + auto backend = Backend::make_Backend(config); backend->open(false); std::string rawHeader =