mirror of
https://github.com/XRPLF/clio.git
synced 2025-12-06 17:27:58 +00:00
boost json
This commit is contained in:
@@ -11,7 +11,7 @@ project(reporting)
|
||||
cmake_minimum_required(VERSION 3.17)
|
||||
set (CMAKE_CXX_STANDARD 17)
|
||||
|
||||
FIND_PACKAGE( Boost 1.75 COMPONENTS thread log REQUIRED )
|
||||
FIND_PACKAGE( Boost 1.75 COMPONENTS thread system log REQUIRED )
|
||||
|
||||
add_executable (reporting
|
||||
websocket_server_async.cpp
|
||||
@@ -82,6 +82,6 @@ target_link_libraries (grpc_pbufs ${_REFLECTION} ${_PROTOBUF_LIBPROTOBUF} ${_GRP
|
||||
target_sources(reporting PRIVATE reporting/ETLSource.cpp)
|
||||
|
||||
|
||||
|
||||
message(${Boost_LIBRARIES})
|
||||
INCLUDE_DIRECTORIES(${CMAKE_CURRENT_BINARY_DIR} ${Boost_INCLUDE_DIR} ${CMAKE_CURRENT_SOURCE_DIR})
|
||||
TARGET_LINK_LIBRARIES(reporting LINK_PUBLIC ${Boost_LIBRARIES} ${Boost_LOG_LIBRARY} grpc_pbufs)
|
||||
TARGET_LINK_LIBRARIES(reporting PUBLIC ${Boost_LIBRARIES} grpc_pbufs)
|
||||
|
||||
@@ -20,9 +20,11 @@
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#include <reporting/ETLSource.h>
|
||||
#include <boost/log/trivial.hpp>
|
||||
#include <boost/asio/strand.hpp>
|
||||
#include <boost/json.hpp>
|
||||
#include <boost/json/src.hpp>
|
||||
#include <boost/log/trivial.hpp>
|
||||
#include <reporting/ETLSource.h>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
@@ -40,10 +42,7 @@ ETLSource::ETLSource(std::string ip, std::string wsPort)
|
||||
{
|
||||
}
|
||||
|
||||
ETLSource::ETLSource(
|
||||
std::string ip,
|
||||
std::string wsPort,
|
||||
std::string grpcPort)
|
||||
ETLSource::ETLSource(std::string ip, std::string wsPort, std::string grpcPort)
|
||||
: ip_(ip)
|
||||
, wsPort_(wsPort)
|
||||
, grpcPort_(grpcPort)
|
||||
@@ -60,17 +59,14 @@ ETLSource::ETLSource(
|
||||
std::stringstream ss;
|
||||
ss << endpoint;
|
||||
stub_ = org::xrpl::rpc::v1::XRPLedgerAPIService::NewStub(
|
||||
grpc::CreateChannel(ss.str(),
|
||||
grpc::InsecureChannelCredentials()));
|
||||
BOOST_LOG_TRIVIAL(debug)
|
||||
<< "Made stub for remote = " << toString();
|
||||
grpc::CreateChannel(ss.str(), grpc::InsecureChannelCredentials()));
|
||||
BOOST_LOG_TRIVIAL(debug) << "Made stub for remote = " << toString();
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
|
||||
BOOST_LOG_TRIVIAL(debug)
|
||||
<< "Exception while creating stub = " << e.what()
|
||||
<< " . Remote = " << toString();
|
||||
BOOST_LOG_TRIVIAL(debug)
|
||||
<< "Exception while creating stub = " << e.what()
|
||||
<< " . Remote = " << toString();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -84,16 +80,17 @@ ETLSource::reconnect(boost::beast::error_code ec)
|
||||
if (ec != boost::asio::error::operation_aborted &&
|
||||
ec != boost::asio::error::connection_refused)
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(error) << __func__ << " : "
|
||||
<< "error code = " << ec << " - " << toString();
|
||||
BOOST_LOG_TRIVIAL(error)
|
||||
<< __func__ << " : "
|
||||
<< "error code = " << ec << " - " << toString();
|
||||
}
|
||||
else
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(warning) << __func__ << " : "
|
||||
<< "error code = " << ec << " - " << toString();
|
||||
BOOST_LOG_TRIVIAL(warning)
|
||||
<< __func__ << " : "
|
||||
<< "error code = " << ec << " - " << toString();
|
||||
}
|
||||
|
||||
|
||||
// exponentially increasing timeouts, with a max of 30 seconds
|
||||
size_t waitTime = std::min(pow(2, numFailures_), 30.0);
|
||||
numFailures_++;
|
||||
@@ -157,8 +154,8 @@ ETLSource::onResolve(
|
||||
boost::beast::error_code ec,
|
||||
boost::asio::ip::tcp::resolver::results_type results)
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(trace) << __func__ << " : ec = " << ec << " - "
|
||||
<< toString();
|
||||
BOOST_LOG_TRIVIAL(trace)
|
||||
<< __func__ << " : ec = " << ec << " - " << toString();
|
||||
if (ec)
|
||||
{
|
||||
// try again
|
||||
@@ -178,8 +175,8 @@ ETLSource::onConnect(
|
||||
boost::beast::error_code ec,
|
||||
boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint)
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(trace) << __func__ << " : ec = " << ec << " - "
|
||||
<< toString();
|
||||
BOOST_LOG_TRIVIAL(trace)
|
||||
<< __func__ << " : ec = " << ec << " - " << toString();
|
||||
if (ec)
|
||||
{
|
||||
// start over
|
||||
@@ -218,8 +215,8 @@ ETLSource::onConnect(
|
||||
void
|
||||
ETLSource::onHandshake(boost::beast::error_code ec)
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(trace) << __func__ << " : ec = " << ec << " - "
|
||||
<< toString();
|
||||
BOOST_LOG_TRIVIAL(trace)
|
||||
<< __func__ << " : ec = " << ec << " - " << toString();
|
||||
if (ec)
|
||||
{
|
||||
// start over
|
||||
@@ -227,30 +224,23 @@ ETLSource::onHandshake(boost::beast::error_code ec)
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
Json::Value jv;
|
||||
jv["command"] = "subscribe";
|
||||
|
||||
jv["streams"] = Json::arrayValue;
|
||||
Json::Value ledgerStream("ledger");
|
||||
jv["streams"].append(ledgerStream);
|
||||
Json::Value txnStream("transactions_proposed");
|
||||
jv["streams"].append(txnStream);
|
||||
Json::FastWriter fastWriter;
|
||||
*/
|
||||
boost::json::object jv{
|
||||
{"command", "subscribe"},
|
||||
{"streams", {"ledger", "transactions_proposed"}}};
|
||||
std::string s = boost::json::serialize(jv);
|
||||
BOOST_LOG_TRIVIAL(trace) << "Sending subscribe stream message";
|
||||
// Send the message
|
||||
ws_->async_write(
|
||||
boost::asio::buffer("foo"),
|
||||
[this](auto ec, size_t size) { onWrite(ec, size); });
|
||||
ws_->async_write(boost::asio::buffer(s), [this](auto ec, size_t size) {
|
||||
onWrite(ec, size);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
ETLSource::onWrite(boost::beast::error_code ec, size_t bytesWritten)
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(trace) << __func__ << " : ec = " << ec << " - "
|
||||
<< toString();
|
||||
BOOST_LOG_TRIVIAL(trace)
|
||||
<< __func__ << " : ec = " << ec << " - " << toString();
|
||||
if (ec)
|
||||
{
|
||||
// start over
|
||||
@@ -266,8 +256,8 @@ ETLSource::onWrite(boost::beast::error_code ec, size_t bytesWritten)
|
||||
void
|
||||
ETLSource::onRead(boost::beast::error_code ec, size_t size)
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(trace) << __func__ << " : ec = " << ec << " - "
|
||||
<< toString();
|
||||
BOOST_LOG_TRIVIAL(trace)
|
||||
<< __func__ << " : ec = " << ec << " - " << toString();
|
||||
// if error or error reading message, start over
|
||||
if (ec)
|
||||
{
|
||||
@@ -295,62 +285,61 @@ ETLSource::handleMessage()
|
||||
connected_ = true;
|
||||
try
|
||||
{
|
||||
/*
|
||||
Json::Value response;
|
||||
Json::Reader reader;
|
||||
if (!reader.parse(
|
||||
static_cast<char const*>(readBuffer_.data().data()), response))
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(error)
|
||||
<< __func__ << " : "
|
||||
<< "Error parsing stream message."
|
||||
<< " Message = " << readBuffer_.data().data();
|
||||
return false;
|
||||
}
|
||||
boost::json::value raw = boost::json::parse(
|
||||
static_cast<char const*>(readBuffer_.data().data()));
|
||||
boost::json::object response = raw.as_object();
|
||||
|
||||
uint32_t ledgerIndex = 0;
|
||||
if (response.isMember("result"))
|
||||
if (response.contains("result"))
|
||||
{
|
||||
if (response["result"].isMember(jss::ledger_index))
|
||||
boost::json::object result = response["result"].as_object();
|
||||
if (result.contains("ledger_index"))
|
||||
{
|
||||
ledgerIndex = response["result"][jss::ledger_index].asUInt();
|
||||
ledgerIndex = result["ledger_index"].as_uint64();
|
||||
}
|
||||
if (response[jss::result].isMember(jss::validated_ledgers))
|
||||
if (result.contains("validated_ledgers"))
|
||||
{
|
||||
boost::json::string const& validatedLedgers =
|
||||
result["validated_ledgers"].as_string();
|
||||
setValidatedRange(
|
||||
response[jss::result][jss::validated_ledgers].asString());
|
||||
{validatedLedgers.c_str(), validatedLedgers.size()});
|
||||
}
|
||||
BOOST_LOG_TRIVIAL(debug)
|
||||
<< __func__ << " : "
|
||||
<< "Received a message on ledger "
|
||||
<< " subscription stream. Message : "
|
||||
<< response.toStyledString() << " - " << toString();
|
||||
<< " subscription stream. Message : " << response << " - "
|
||||
<< toString();
|
||||
}
|
||||
else
|
||||
{
|
||||
if (response.isMember(jss::transaction))
|
||||
if (response.contains("transaction"))
|
||||
{
|
||||
if (etl_.getETLLoadBalancer().shouldPropagateTxnStream(this))
|
||||
/*
|
||||
if
|
||||
(etl_.getETLLoadBalancer().shouldPropagateTxnStream(this))
|
||||
{
|
||||
etl_.getApplication().getOPs().forwardProposedTransaction(
|
||||
response);
|
||||
}
|
||||
*/
|
||||
}
|
||||
else
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(debug)
|
||||
<< __func__ << " : "
|
||||
<< "Received a message on ledger "
|
||||
<< " subscription stream. Message : "
|
||||
<< response.toStyledString() << " - " << toString();
|
||||
if (response.isMember(jss::ledger_index))
|
||||
<< " subscription stream. Message : " << response << " - "
|
||||
<< toString();
|
||||
if (response.contains("ledger_index"))
|
||||
{
|
||||
ledgerIndex = response[jss::ledger_index].asUInt();
|
||||
ledgerIndex = response["ledger_index"].as_uint64();
|
||||
}
|
||||
if (response.isMember(jss::validated_ledgers))
|
||||
if (response.contains("validated_ledgers"))
|
||||
{
|
||||
boost::json::string const& validatedLedgers =
|
||||
response["validated_ledgers"].as_string();
|
||||
setValidatedRange(
|
||||
response[jss::validated_ledgers].asString());
|
||||
{validatedLedgers.c_str(), validatedLedgers.size()});
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -361,9 +350,8 @@ ETLSource::handleMessage()
|
||||
<< __func__ << " : "
|
||||
<< "Pushing ledger sequence = " << ledgerIndex << " - "
|
||||
<< toString();
|
||||
networkValidatedLedgers_.push(ledgerIndex);
|
||||
// networkValidatedLedgers_.push(ledgerIndex);
|
||||
}
|
||||
*/
|
||||
return true;
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
@@ -383,16 +371,12 @@ class AsyncCallData
|
||||
|
||||
grpc::Status status_;
|
||||
|
||||
|
||||
|
||||
public:
|
||||
AsyncCallData(
|
||||
uint32_t seq)
|
||||
AsyncCallData(uint32_t seq)
|
||||
{
|
||||
request_.mutable_ledger()->set_sequence(seq);
|
||||
request_.set_user("ETL");
|
||||
|
||||
|
||||
cur_ = std::make_unique<org::xrpl::rpc::v1::GetLedgerDataResponse>();
|
||||
|
||||
next_ = std::make_unique<org::xrpl::rpc::v1::GetLedgerDataResponse>();
|
||||
@@ -415,9 +399,10 @@ public:
|
||||
}
|
||||
if (!status_.ok())
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(debug) << "AsyncCallData status_ not ok: "
|
||||
<< " code = " << status_.error_code()
|
||||
<< " message = " << status_.error_message();
|
||||
BOOST_LOG_TRIVIAL(debug)
|
||||
<< "AsyncCallData status_ not ok: "
|
||||
<< " code = " << status_.error_code()
|
||||
<< " message = " << status_.error_message();
|
||||
return CallStatus::ERRORED;
|
||||
}
|
||||
if (!next_->is_unlimited())
|
||||
@@ -436,7 +421,6 @@ public:
|
||||
if (cur_->marker().size() == 0)
|
||||
more = false;
|
||||
|
||||
|
||||
// if we are not done, make the next async call
|
||||
if (more)
|
||||
{
|
||||
@@ -498,17 +482,15 @@ ETLSource::loadInitialLedger(uint32_t sequence)
|
||||
std::vector<AsyncCallData> calls;
|
||||
calls.emplace_back(sequence);
|
||||
|
||||
|
||||
BOOST_LOG_TRIVIAL(debug) << "Starting data download for ledger " << sequence
|
||||
<< ". Using source = " << toString();
|
||||
<< ". Using source = " << toString();
|
||||
|
||||
for (auto& c : calls)
|
||||
c.call(stub_, cq);
|
||||
|
||||
size_t numFinished = 0;
|
||||
bool abort = false;
|
||||
while (numFinished < calls.size() &&
|
||||
cq.Next(&tag, &ok))
|
||||
while (numFinished < calls.size() && cq.Next(&tag, &ok))
|
||||
{
|
||||
assert(tag);
|
||||
|
||||
@@ -524,8 +506,7 @@ ETLSource::loadInitialLedger(uint32_t sequence)
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(debug)
|
||||
<< "Marker prefix = " << ptr->getMarkerPrefix();
|
||||
auto result =
|
||||
ptr->process(stub_, cq, abort);
|
||||
auto result = ptr->process(stub_, cq, abort);
|
||||
if (result != AsyncCallData::CallStatus::MORE)
|
||||
{
|
||||
numFinished++;
|
||||
@@ -608,9 +589,12 @@ ETLLoadBalancer::loadInitialLedger(uint32_t sequence)
|
||||
bool res = source->loadInitialLedger(sequence);
|
||||
if (!res)
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(error) << "Failed to download initial ledger. "
|
||||
BOOST_LOG_TRIVIAL(error) << "Failed to download initial
|
||||
ledger.
|
||||
"
|
||||
<< " Sequence = " << sequence
|
||||
<< " source = " << source->toString();
|
||||
<< " source = " <<
|
||||
source->toString();
|
||||
}
|
||||
return res;
|
||||
},
|
||||
@@ -708,9 +692,8 @@ ETLSource::getP2pForwardingStub() const
|
||||
return org::xrpl::rpc::v1::XRPLedgerAPIService::NewStub(
|
||||
grpc::CreateChannel(
|
||||
beast::IP::Endpoint(
|
||||
boost::asio::ip::make_address(ip_), std::stoi(grpcPort_))
|
||||
.to_string(),
|
||||
grpc::InsecureChannelCredentials()));
|
||||
boost::asio::ip::make_address(ip_),
|
||||
std::stoi(grpcPort_)) .to_string(), grpc::InsecureChannelCredentials()));
|
||||
}
|
||||
catch (std::exception const&)
|
||||
{
|
||||
@@ -723,7 +706,8 @@ Json::Value
|
||||
ETLSource::forwardToP2p(RPC::JsonContext& context) const
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(debug) << "Attempting to forward request to tx. "
|
||||
<< "request = " << context.params.toStyledString();
|
||||
<< "request = " <<
|
||||
context.params.toStyledString();
|
||||
|
||||
Json::Value response;
|
||||
if (!connected_)
|
||||
@@ -734,11 +718,10 @@ ETLSource::forwardToP2p(RPC::JsonContext& context) const
|
||||
}
|
||||
namespace beast = boost::beast; // from <boost/beast.hpp>
|
||||
namespace http = beast::http; // from <boost/beast/http.hpp>
|
||||
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
|
||||
namespace net = boost::asio; // from <boost/asio.hpp>
|
||||
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
|
||||
Json::Value& request = context.params;
|
||||
try
|
||||
namespace websocket = beast::websocket; // from
|
||||
<boost/beast/websocket.hpp> namespace net = boost::asio; // from
|
||||
<boost/asio.hpp> using tcp = boost::asio::ip::tcp; // from
|
||||
<boost/asio/ip/tcp.hpp> Json::Value& request = context.params; try
|
||||
{
|
||||
// The io_context is required for all I/O
|
||||
net::io_context ioc;
|
||||
@@ -759,7 +742,8 @@ ETLSource::forwardToP2p(RPC::JsonContext& context) const
|
||||
// Set a decorator to change the User-Agent of the handshake
|
||||
// and to tell rippled to charge the client IP for RPC
|
||||
// resources. See "secure_gateway" in
|
||||
// https://github.com/ripple/rippled/blob/develop/cfg/rippled-example.cfg
|
||||
//
|
||||
https://github.com/ripple/rippled/blob/develop/cfg/rippled-example.cfg
|
||||
ws->set_option(websocket::stream_base::decorator(
|
||||
[&context](websocket::request_type& req) {
|
||||
req.set(
|
||||
@@ -770,7 +754,8 @@ ETLSource::forwardToP2p(RPC::JsonContext& context) const
|
||||
http::field::forwarded,
|
||||
"for=" + context.consumer.to_string());
|
||||
}));
|
||||
BOOST_LOG_TRIVIAL(debug) << "client ip: " << context.consumer.to_string();
|
||||
BOOST_LOG_TRIVIAL(debug) << "client ip: " <<
|
||||
context.consumer.to_string();
|
||||
|
||||
BOOST_LOG_TRIVIAL(debug) << "Performing websocket handshake";
|
||||
// Perform the websocket handshake
|
||||
@@ -852,8 +837,10 @@ ETLLoadBalancer::execute(Func f, uint32_t ledgerSequence)
|
||||
numAttempts++;
|
||||
if (numAttempts % sources_.size() == 0)
|
||||
{
|
||||
// If another process loaded the ledger into the database, we can
|
||||
// abort trying to fetch the ledger from a transaction processing
|
||||
// If another process loaded the ledger into the database, we
|
||||
can
|
||||
// abort trying to fetch the ledger from a transaction
|
||||
processing
|
||||
// process
|
||||
if (etl_.getApplication().getLedgerMaster().getLedgerBySeq(
|
||||
ledgerSequence))
|
||||
|
||||
@@ -21,10 +21,10 @@
|
||||
#define RIPPLE_APP_REPORTING_ETLSOURCE_H_INCLUDED
|
||||
|
||||
#include <boost/algorithm/string.hpp>
|
||||
#include <boost/asio.hpp>
|
||||
#include <boost/beast/core.hpp>
|
||||
#include <boost/beast/core/string.hpp>
|
||||
#include <boost/beast/websocket.hpp>
|
||||
#include <boost/asio.hpp>
|
||||
|
||||
#include "org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h"
|
||||
#include <grpcpp/grpcpp.h>
|
||||
@@ -63,9 +63,9 @@ class ETLSource
|
||||
|
||||
std::string validatedLedgersRaw_;
|
||||
|
||||
//NetworkValidatedLedgers& networkValidatedLedgers_;
|
||||
// NetworkValidatedLedgers& networkValidatedLedgers_;
|
||||
|
||||
//beast::Journal journal_;
|
||||
// beast::Journal journal_;
|
||||
|
||||
mutable std::mutex mtx_;
|
||||
|
||||
@@ -115,10 +115,7 @@ public:
|
||||
ETLSource(std::string ip, std::string wsPort);
|
||||
|
||||
/// Create ETL source with gRPC endpoint
|
||||
ETLSource(
|
||||
std::string ip,
|
||||
std::string wsPort,
|
||||
std::string grpcPort);
|
||||
ETLSource(std::string ip, std::string wsPort, std::string grpcPort);
|
||||
|
||||
/// @param sequence ledger sequence to check for
|
||||
/// @return true if this source has the desired ledger
|
||||
@@ -195,7 +192,6 @@ public:
|
||||
void
|
||||
stop()
|
||||
{
|
||||
|
||||
assert(ws_);
|
||||
close(false);
|
||||
}
|
||||
@@ -216,7 +212,6 @@ public:
|
||||
", grpc port : " + grpcPort_ + " }";
|
||||
}
|
||||
|
||||
|
||||
/// Download a ledger in full
|
||||
/// @param ledgerSequence sequence of the ledger to download
|
||||
/// @param writeQueue queue to push downloaded ledger objects
|
||||
@@ -273,7 +268,6 @@ public:
|
||||
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>
|
||||
getP2pForwardingStub() const;
|
||||
*/
|
||||
|
||||
};
|
||||
/*
|
||||
*
|
||||
|
||||
@@ -13,24 +13,27 @@
|
||||
//
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
#include <boost/beast/core.hpp>
|
||||
#include <boost/beast/websocket.hpp>
|
||||
#include <boost/asio/dispatch.hpp>
|
||||
#include <boost/asio/strand.hpp>
|
||||
#include <boost/beast/core.hpp>
|
||||
#include <boost/beast/websocket.hpp>
|
||||
#include <boost/json.hpp>
|
||||
#include <algorithm>
|
||||
#include <cstdlib>
|
||||
#include <fstream>
|
||||
#include <functional>
|
||||
#include <iostream>
|
||||
#include <memory>
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
namespace beast = boost::beast; // from <boost/beast.hpp>
|
||||
namespace http = beast::http; // from <boost/beast/http.hpp>
|
||||
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
|
||||
namespace net = boost::asio; // from <boost/asio.hpp>
|
||||
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
|
||||
namespace beast = boost::beast; // from <boost/beast.hpp>
|
||||
namespace http = beast::http; // from <boost/beast/http.hpp>
|
||||
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
|
||||
namespace net = boost::asio; // from <boost/asio.hpp>
|
||||
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
@@ -49,9 +52,7 @@ class session : public std::enable_shared_from_this<session>
|
||||
|
||||
public:
|
||||
// Take ownership of the socket
|
||||
explicit
|
||||
session(tcp::socket&& socket)
|
||||
: ws_(std::move(socket))
|
||||
explicit session(tcp::socket&& socket) : ws_(std::move(socket))
|
||||
{
|
||||
}
|
||||
|
||||
@@ -63,10 +64,9 @@ public:
|
||||
// on the I/O objects in this session. Although not strictly necessary
|
||||
// for single-threaded contexts, this example code is written to be
|
||||
// thread-safe by default.
|
||||
net::dispatch(ws_.get_executor(),
|
||||
beast::bind_front_handler(
|
||||
&session::on_run,
|
||||
shared_from_this()));
|
||||
net::dispatch(
|
||||
ws_.get_executor(),
|
||||
beast::bind_front_handler(&session::on_run, shared_from_this()));
|
||||
}
|
||||
|
||||
// Start the asynchronous operation
|
||||
@@ -74,29 +74,26 @@ public:
|
||||
on_run()
|
||||
{
|
||||
// Set suggested timeout settings for the websocket
|
||||
ws_.set_option(
|
||||
websocket::stream_base::timeout::suggested(
|
||||
beast::role_type::server));
|
||||
ws_.set_option(websocket::stream_base::timeout::suggested(
|
||||
beast::role_type::server));
|
||||
|
||||
// Set a decorator to change the Server of the handshake
|
||||
ws_.set_option(websocket::stream_base::decorator(
|
||||
[](websocket::response_type& res)
|
||||
{
|
||||
res.set(http::field::server,
|
||||
[](websocket::response_type& res) {
|
||||
res.set(
|
||||
http::field::server,
|
||||
std::string(BOOST_BEAST_VERSION_STRING) +
|
||||
" websocket-server-async");
|
||||
}));
|
||||
// Accept the websocket handshake
|
||||
ws_.async_accept(
|
||||
beast::bind_front_handler(
|
||||
&session::on_accept,
|
||||
shared_from_this()));
|
||||
beast::bind_front_handler(&session::on_accept, shared_from_this()));
|
||||
}
|
||||
|
||||
void
|
||||
on_accept(beast::error_code ec)
|
||||
{
|
||||
if(ec)
|
||||
if (ec)
|
||||
return fail(ec, "accept");
|
||||
|
||||
// Read a message
|
||||
@@ -109,42 +106,34 @@ public:
|
||||
// Read a message into our buffer
|
||||
ws_.async_read(
|
||||
buffer_,
|
||||
beast::bind_front_handler(
|
||||
&session::on_read,
|
||||
shared_from_this()));
|
||||
beast::bind_front_handler(&session::on_read, shared_from_this()));
|
||||
}
|
||||
|
||||
void
|
||||
on_read(
|
||||
beast::error_code ec,
|
||||
std::size_t bytes_transferred)
|
||||
on_read(beast::error_code ec, std::size_t bytes_transferred)
|
||||
{
|
||||
boost::ignore_unused(bytes_transferred);
|
||||
|
||||
// This indicates that the session was closed
|
||||
if(ec == websocket::error::closed)
|
||||
if (ec == websocket::error::closed)
|
||||
return;
|
||||
|
||||
if(ec)
|
||||
if (ec)
|
||||
fail(ec, "read");
|
||||
|
||||
// Echo the message
|
||||
ws_.text(ws_.got_text());
|
||||
ws_.async_write(
|
||||
buffer_.data(),
|
||||
beast::bind_front_handler(
|
||||
&session::on_write,
|
||||
shared_from_this()));
|
||||
beast::bind_front_handler(&session::on_write, shared_from_this()));
|
||||
}
|
||||
|
||||
void
|
||||
on_write(
|
||||
beast::error_code ec,
|
||||
std::size_t bytes_transferred)
|
||||
on_write(beast::error_code ec, std::size_t bytes_transferred)
|
||||
{
|
||||
boost::ignore_unused(bytes_transferred);
|
||||
|
||||
if(ec)
|
||||
if (ec)
|
||||
return fail(ec, "write");
|
||||
|
||||
// Clear the buffer
|
||||
@@ -164,17 +153,14 @@ class listener : public std::enable_shared_from_this<listener>
|
||||
tcp::acceptor acceptor_;
|
||||
|
||||
public:
|
||||
listener(
|
||||
net::io_context& ioc,
|
||||
tcp::endpoint endpoint)
|
||||
: ioc_(ioc)
|
||||
, acceptor_(ioc)
|
||||
listener(net::io_context& ioc, tcp::endpoint endpoint)
|
||||
: ioc_(ioc), acceptor_(ioc)
|
||||
{
|
||||
beast::error_code ec;
|
||||
|
||||
// Open the acceptor
|
||||
acceptor_.open(endpoint.protocol(), ec);
|
||||
if(ec)
|
||||
if (ec)
|
||||
{
|
||||
fail(ec, "open");
|
||||
return;
|
||||
@@ -182,7 +168,7 @@ public:
|
||||
|
||||
// Allow address reuse
|
||||
acceptor_.set_option(net::socket_base::reuse_address(true), ec);
|
||||
if(ec)
|
||||
if (ec)
|
||||
{
|
||||
fail(ec, "set_option");
|
||||
return;
|
||||
@@ -190,16 +176,15 @@ public:
|
||||
|
||||
// Bind to the server address
|
||||
acceptor_.bind(endpoint, ec);
|
||||
if(ec)
|
||||
if (ec)
|
||||
{
|
||||
fail(ec, "bind");
|
||||
return;
|
||||
}
|
||||
|
||||
// Start listening for connections
|
||||
acceptor_.listen(
|
||||
net::socket_base::max_listen_connections, ec);
|
||||
if(ec)
|
||||
acceptor_.listen(net::socket_base::max_listen_connections, ec);
|
||||
if (ec)
|
||||
{
|
||||
fail(ec, "listen");
|
||||
return;
|
||||
@@ -221,14 +206,13 @@ private:
|
||||
acceptor_.async_accept(
|
||||
net::make_strand(ioc_),
|
||||
beast::bind_front_handler(
|
||||
&listener::on_accept,
|
||||
shared_from_this()));
|
||||
&listener::on_accept, shared_from_this()));
|
||||
}
|
||||
|
||||
void
|
||||
on_accept(beast::error_code ec, tcp::socket socket)
|
||||
{
|
||||
if(ec)
|
||||
if (ec)
|
||||
{
|
||||
fail(ec, "accept");
|
||||
}
|
||||
@@ -243,22 +227,38 @@ private:
|
||||
}
|
||||
};
|
||||
|
||||
std::optional<boost::json::object>
|
||||
parse_config(const char* filename)
|
||||
{
|
||||
std::ifstream in(filename, std::ios::in | std::ios::binary);
|
||||
if (in)
|
||||
{
|
||||
std::stringstream contents;
|
||||
contents << in.rdbuf();
|
||||
in.close();
|
||||
boost::json::value value = boost::json::parse(contents.str());
|
||||
return value.as_object();
|
||||
}
|
||||
return {};
|
||||
}
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
int main(int argc, char* argv[])
|
||||
int
|
||||
main(int argc, char* argv[])
|
||||
{
|
||||
// Check command line arguments.
|
||||
if (argc != 4)
|
||||
{
|
||||
std::cerr <<
|
||||
"Usage: websocket-server-async <address> <port> <threads>\n" <<
|
||||
"Example:\n" <<
|
||||
" websocket-server-async 0.0.0.0 8080 1\n";
|
||||
std::cerr
|
||||
<< "Usage: websocket-server-async <address> <port> <threads>\n"
|
||||
<< "Example:\n"
|
||||
<< " websocket-server-async 0.0.0.0 8080 1\n";
|
||||
return EXIT_FAILURE;
|
||||
}
|
||||
auto const address = net::ip::make_address(argv[1]);
|
||||
auto const port = static_cast<unsigned short>(std::atoi(argv[2]));
|
||||
auto const threads = std::max<int>(1, std::atoi(argv[3]));
|
||||
auto const config = parse_config(argv[4]);
|
||||
|
||||
// The io_context is required for all I/O
|
||||
net::io_context ioc{threads};
|
||||
@@ -269,12 +269,8 @@ int main(int argc, char* argv[])
|
||||
// Run the I/O service on the requested number of threads
|
||||
std::vector<std::thread> v;
|
||||
v.reserve(threads - 1);
|
||||
for(auto i = threads - 1; i > 0; --i)
|
||||
v.emplace_back(
|
||||
[&ioc]
|
||||
{
|
||||
ioc.run();
|
||||
});
|
||||
for (auto i = threads - 1; i > 0; --i)
|
||||
v.emplace_back([&ioc] { ioc.run(); });
|
||||
ioc.run();
|
||||
|
||||
return EXIT_SUCCESS;
|
||||
|
||||
Reference in New Issue
Block a user