mirror of
https://github.com/XRPLF/rippled.git
synced 2026-06-03 08:46:46 +00:00
WebSockets
Async WebSocket support for client RPC and real-time subscriptions. Both plain and SSL. Built on Boost.Beast + Boost.Asio.
Key Invariants
- All async operations run on a per-session Boost.Asio strand for thread safety
- Outgoing message queue (wq_) has a per-session limit (port().ws_queue_limit); exceeding it closes the connection with policy error "client is too slow"
- complete() must be called after processing each message to resume reading; forgetting it stalls the session
- WSInfoSub holds a weak pointer to WSSession; dead sessions are automatically skipped during event delivery
- Message size limit: RPC::Tuning::maxRequestSize; oversized messages get a jsonInvalid error response
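The weak-pointer invariant can be illustrated with a small standard-library sketch. Session, Subscriber, and publish are hypothetical stand-ins invented for this example, not the WSSession or WSInfoSub types; the only point shown is that locking a weak pointer lets delivery skip sessions that have already been destroyed.

```cpp
#include <cstddef>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for a WebSocket session; it only records what it was sent.
struct Session
{
    std::vector<std::string> sent;
    void send(std::string const& msg) { sent.push_back(msg); }
};

// Hypothetical stand-in for a subscriber that must not keep its session alive.
class Subscriber
{
    std::weak_ptr<Session> session_;

public:
    explicit Subscriber(std::shared_ptr<Session> const& s) : session_(s) {}

    // Returns true if the event was delivered, false if the session is gone.
    bool deliver(std::string const& event) const
    {
        auto sp = session_.lock();  // empty once the last shared_ptr is released
        if (!sp)
            return false;
        sp->send(event);
        return true;
    }
};

// Deliver to every subscriber, skipping dead sessions; returns the delivery count.
inline std::size_t publish(std::vector<Subscriber> const& subs, std::string const& event)
{
    std::size_t delivered = 0;
    for (auto const& sub : subs)
        if (sub.deliver(event))
            ++delivered;
    return delivered;
}
```

Because the subscriber never owns the session, dropping the last shared pointer is enough to make later deliveries a no-op for that subscriber.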
Connection Flow
- Door accepts TCP -> Detector probes for SSL vs plain -> creates SSLHTTPPeer or PlainHTTPPeer
- HTTP request with WebSocket upgrade -> ServerHandler::onHandoff -> session.websocketUpgrade() -> creates PlainWSPeer or SSLWSPeer
- BaseWSPeer::run() -> set permessage-deflate options -> async_accept handshake -> on_ws_handshake -> do_read loop
- on_read -> ServerHandler::onWSMessage -> validate JSON -> post to job queue -> processSession -> send response -> complete()
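The SSL-versus-plain probe in the first step can be approximated by peeking at the first bytes a client sends. The sketch below is a simplified heuristic under the assumption that a TLS connection opens with a handshake record (first byte 0x16, then major version byte 0x03); Transport and classify are hypothetical names, and this is not the logic used by Detector or by Boost.Beast.

```cpp
#include <cstddef>
#include <cstdint>

// Hypothetical result type for the probe.
enum class Transport { plain, ssl, needMore };

// Simplified heuristic (assumption): a TLS client opens with a handshake
// record, whose first byte is 0x16 followed by the major version byte 0x03.
inline Transport classify(std::uint8_t const* data, std::size_t size)
{
    if (size == 0)
        return Transport::needMore;  // nothing received yet
    if (data[0] != 0x16)
        return Transport::plain;     // cannot be a TLS handshake record
    if (size < 2)
        return Transport::needMore;  // need the version byte to decide
    return data[1] == 0x03 ? Transport::ssl : Transport::plain;
}
```

A plain HTTP request starts with an ASCII method name such as GET, so its first byte already rules out TLS under this heuristic.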
Common Bug Patterns
- on_read consumes the buffer AFTER calling onWSMessage; if the handler accesses the buffer asynchronously after onWSMessage returns, it reads garbage
- close() with pending messages defers the actual close; calling send() after close() but before actual close queues more messages
- Missing complete() call after sending response = session never reads again = appears hung
- Job queue shutdown: if postCoro returns nullptr, session must close with going_away; dropping this silently leaks sessions
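The buffer-lifetime bug in the first item is usually avoided by copying the payload before any work is deferred. The following is a minimal sketch, assuming a hypothetical JobQueue and onMessage handler invented for illustration; it does not reproduce the ServerHandler code.

```cpp
#include <functional>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

// Hypothetical stand-in for a job queue: jobs run later, not when posted.
struct JobQueue
{
    std::vector<std::function<void()>> jobs;

    void post(std::function<void()> job) { jobs.push_back(std::move(job)); }

    void runAll()
    {
        for (auto& job : jobs)
            job();
        jobs.clear();
    }
};

// Safe handler: copies the payload into the job before returning, so the
// caller is free to consume or reuse its read buffer afterwards.
inline void onMessage(
    JobQueue& queue,
    std::string_view payload,
    std::vector<std::string>& processed)
{
    std::string owned(payload);  // copy while the buffer is still valid
    queue.post([owned = std::move(owned), &processed] {
        processed.push_back(owned);  // uses the copy, never the caller's buffer
    });
}
```

Capturing the string_view itself in the lambda would compile just as well, which is what makes this bug easy to introduce: the view would dangle once the caller consumed its buffer.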
Review Checklist
- Verify strand execution for all socket operations (read, write, close, timer)
- New subscription streams: ensure WSInfoSub::send handles the new event type
- Flow control: test with slow clients to verify queue limit enforcement
Key Patterns
complete() Resumes Read Loop
    // REQUIRED after sending response; missing this = session hangs forever
    void BaseWSPeer::complete()
    {
        if (!strand_.running_in_this_thread())
            return post(strand_, std::bind(
                &BaseWSPeer::complete, impl().shared_from_this()));
        do_read();  // resume reading next message
    }
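Why a missing complete() stalls the session can be seen in a single-threaded toy model. ToySession below is a hypothetical class written for this example, with no strand or socket; it only assumes that a session hands one message at a time to its handler and starts the next read when complete() is called.

```cpp
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Hypothetical single-threaded model of a session that reads one message at a time.
class ToySession
{
    std::deque<std::string> incoming_;  // messages waiting on the "socket"
    std::vector<std::string> handled_;  // messages handed to the handler
    bool awaitingComplete_ = false;     // true while a message is being processed

public:
    // Simulates bytes arriving from the client.
    void receive(std::string msg)
    {
        incoming_.push_back(std::move(msg));
        pump();
    }

    // The handler signals it is done; only now may the next read start.
    void complete()
    {
        awaitingComplete_ = false;
        pump();
    }

    std::vector<std::string> const& handled() const { return handled_; }

private:
    // Start the next read only if no message is currently in flight.
    void pump()
    {
        if (awaitingComplete_ || incoming_.empty())
            return;
        handled_.push_back(std::move(incoming_.front()));
        incoming_.pop_front();
        awaitingComplete_ = true;
    }
};
```

If complete() is never called in this model, every later message stays in the incoming queue, which matches the "appears hung" symptom.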
Queue Limit Enforcement
    // REQUIRED: close slow clients to prevent memory exhaustion
    if (wq_.size() >= port().ws_queue_limit)
    {
        close(boost::beast::websocket::close_code::policy_error);
        return;  // do NOT queue unboundedly
    }
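For testing flow control with a slow client, the limit check can be modelled without any networking. BoundedOutbox is a hypothetical class for illustration; its closed flag stands in for closing with a policy error. Rejecting sends after close is a choice made in this sketch, not a claim about how BaseWSPeer behaves.

```cpp
#include <cstddef>
#include <deque>
#include <string>
#include <utility>

// Hypothetical model of a per-session outgoing queue with a hard limit.
class BoundedOutbox
{
    std::deque<std::string> queue_;
    std::size_t limit_;
    bool closed_ = false;

public:
    explicit BoundedOutbox(std::size_t limit) : limit_(limit) {}

    // Queue a message, or close the session if the client has fallen too far behind.
    bool send(std::string msg)
    {
        if (closed_)
            return false;  // sketch choice: refuse new messages once closed
        if (queue_.size() >= limit_)
        {
            closed_ = true;  // stands in for closing with a policy error
            return false;    // do not queue unboundedly
        }
        queue_.push_back(std::move(msg));
        return true;
    }

    // Called when one queued message has been written to the client.
    void onWriteDone()
    {
        if (!queue_.empty())
            queue_.pop_front();
    }

    bool closed() const { return closed_; }
    std::size_t pending() const { return queue_.size(); }
};
```

A slow client is simulated by calling send() repeatedly without onWriteDone(); a client that keeps up interleaves the two and never reaches the limit.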
Key Files
- include/xrpl/server/detail/BaseWSPeer.h - session lifecycle and message queue
- include/xrpl/server/detail/Door.h - connection acceptance
- src/xrpld/rpc/detail/ServerHandler.cpp - onWSMessage and onHandoff
- src/xrpld/rpc/detail/WSInfoSub.h - subscription delivery