21#include <test/jtx/WSClient.h>
23#include <xrpl/json/json_reader.h>
24#include <xrpl/json/to_string.h>
25#include <xrpl/protocol/jss.h>
26#include <xrpl/server/Port.h>
28#include <boost/beast/core/multi_buffer.hpp>
29#include <boost/beast/websocket.hpp>
51 static boost::asio::ip::tcp::endpoint
57 auto const ps = v2 ?
"ws2" :
"ws";
66 using namespace boost::asio::ip;
67 if (pp.
ip && pp.
ip->is_unspecified())
68 *pp.
ip = pp.
ip->is_v6() ? address{address_v6::loopback()}
69 : address{address_v4::loopback()};
72 Throw<std::runtime_error>(
"Use fixConfigPorts with auto ports");
76 Throw<std::runtime_error>(
"Missing WebSocket port");
80 template <
class ConstBuffers>
84 using boost::asio::buffer;
85 using boost::asio::buffer_size;
88 buffer_copy(buffer(&s[0], s.
size()), b);
92 boost::asio::io_service
ios_;
97 boost::beast::websocket::stream<boost::asio::ip::tcp::socket&>
ws_;
98 boost::beast::multi_buffer
rb_;
120 ws_.async_close({}, strand_.wrap([&](error_code ec) {
125 work_ = std::nullopt;
133 unsigned rpc_version,
137 , thread_([&] { ios_.run(); })
140 , rpc_version_(rpc_version)
144 auto const ep = getEndpoint(cfg, v2);
146 ws_.set_option(boost::beast::websocket::stream_base::decorator(
147 [&](boost::beast::websocket::request_type& req) {
148 for (
auto const& h : headers)
149 req.set(h.first, h.second);
174 using boost::asio::buffer;
175 using namespace std::chrono_literals;
181 if (rpc_version_ == 2)
183 jp[jss::method] = cmd;
184 jp[jss::jsonrpc] =
"2.0";
185 jp[jss::ripplerpc] =
"2.0";
189 jp[jss::command] = cmd;
191 ws_.write_some(
true, buffer(s));
194 auto jv = findMsg(5s, [&](
Json::Value const& jval) {
195 return jval[jss::type] == jss::response;
200 jv->removeMember(jss::type);
201 if ((*jv).isMember(jss::status) && (*jv)[jss::status] == jss::error)
204 ret[jss::result] = *jv;
205 if ((*jv).isMember(jss::error))
206 ret[jss::error] = (*jv)[jss::error];
207 ret[jss::status] = jss::error;
210 if ((*jv).isMember(jss::status) && (*jv).isMember(jss::result))
211 (*jv)[jss::result][jss::status] = (*jv)[jss::status];
223 if (!cv_.
wait_for(lock, timeout, [&] { return !msgs_.empty(); }))
225 m = std::move(msgs_.
back());
228 return std::move(m->jv);
239 if (!cv_.
wait_for(lock, timeout, [&] {
240 for (auto it = msgs_.begin(); it != msgs_.end(); ++it)
255 return std::move(m->jv);
270 if (ec == boost::beast::websocket::error::closed)
277 jr.
parse(buffer_string(rb_.data()), jv);
278 rb_.consume(rb_.size());
279 auto m = std::make_shared<msg>(std::move(jv));
288 &WSClientImpl::on_read_msg,
this, std::placeholders::_1)));
305 unsigned rpc_version,
308 return std::make_unique<WSClientImpl>(cfg, v2, rpc_version, headers);
Unserialize a JSON document into a Value.
bool parse(std::string const &document, Value &root)
Read a Value from a JSON document.
Holds unparsed configuration information.
bool exists(std::string const &name) const
Returns true if a section with the given name exists.
Section & section(std::string const &name)
Returns the section with the given name.
std::vector< std::string > const & values() const
Returns all the values in the section.
unsigned version() const override
Get RPC 1.0 or RPC 2.0.
WSClientImpl(Config const &cfg, bool v2, unsigned rpc_version, std::unordered_map< std::string, std::string > const &headers={})
boost::system::error_code error_code
std::condition_variable cv0_
static std::string buffer_string(ConstBuffers const &b)
boost::beast::multi_buffer rb_
std::optional< Json::Value > getMsg(std::chrono::milliseconds const &timeout) override
Retrieve a message.
Json::Value invoke(std::string const &cmd, Json::Value const &params) override
Submit a command synchronously.
static boost::asio::ip::tcp::endpoint getEndpoint(BasicConfig const &cfg, bool v2)
void on_read_msg(error_code const &ec)
boost::asio::io_service::strand strand_
boost::beast::websocket::stream< boost::asio::ip::tcp::socket & > ws_
boost::asio::io_service ios_
std::optional< boost::asio::io_service::work > work_
std::list< std::shared_ptr< msg > > msgs_
boost::asio::ip::tcp::socket stream_
std::condition_variable cv_
std::optional< Json::Value > findMsg(std::chrono::milliseconds const &timeout, std::function< bool(Json::Value const &)> pred) override
Retrieve a message that meets the predicate criteria.
std::unique_ptr< WSClient > makeWSClient(Config const &cfg, bool v2, unsigned rpc_version, std::unordered_map< std::string, std::string > const &headers)
Returns a client operating through WebSockets/S.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
void parse_Port(ParsedPort &port, Section const &section, std::ostream &log)
std::string to_string(base_uint< Bits, Tag > const &a)
void Rethrow()
Rethrow the exception currently being handled.
std::optional< std::uint16_t > port
std::set< std::string, boost::beast::iless > protocol
std::optional< boost::asio::ip::address > ip