mirror of
https://github.com/XRPLF/clio.git
synced 2026-04-29 15:37:53 +00:00
@@ -21,10 +21,14 @@
|
||||
|
||||
#include "util/Taggable.hpp"
|
||||
#include "web/SubscriptionContextInterface.hpp"
|
||||
#include "web/interface/ConnectionBase.hpp"
|
||||
|
||||
#include <boost/asio/buffer.hpp>
|
||||
#include <boost/asio/spawn.hpp>
|
||||
|
||||
#include <cstddef>
|
||||
#include <cstdint>
|
||||
#include <memory>
|
||||
#include <optional>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
|
||||
@@ -32,22 +36,39 @@ namespace web {
|
||||
|
||||
SubscriptionContext::SubscriptionContext(
|
||||
util::TagDecoratorFactory const& factory,
|
||||
std::shared_ptr<ConnectionBase> connection
|
||||
impl::WsConnectionBase& connection,
|
||||
std::optional<size_t> maxSendQueueSize,
|
||||
boost::asio::yield_context yield,
|
||||
ErrorHandler errorHandler
|
||||
)
|
||||
: SubscriptionContextInterface{factory}, connection_{connection}
|
||||
: web::SubscriptionContextInterface(factory)
|
||||
, connection_(connection)
|
||||
, maxSendQueueSize_(maxSendQueueSize)
|
||||
, tasksGroup_(yield)
|
||||
, yield_(yield)
|
||||
, errorHandler_(std::move(errorHandler))
|
||||
{
|
||||
}
|
||||
|
||||
SubscriptionContext::~SubscriptionContext()
|
||||
{
|
||||
onDisconnect_(this);
|
||||
}
|
||||
|
||||
void
|
||||
SubscriptionContext::send(std::shared_ptr<std::string> message)
|
||||
{
|
||||
if (auto connection = connection_.lock(); connection != nullptr)
|
||||
connection->send(std::move(message));
|
||||
if (disconnected_)
|
||||
return;
|
||||
|
||||
if (maxSendQueueSize_.has_value() and tasksGroup_.size() >= *maxSendQueueSize_) {
|
||||
tasksGroup_.spawn(yield_, [this](boost::asio::yield_context innerYield) {
|
||||
connection_.get().close(innerYield);
|
||||
});
|
||||
disconnected_ = true;
|
||||
return;
|
||||
}
|
||||
|
||||
tasksGroup_.spawn(yield_, [this, message = std::move(message)](boost::asio::yield_context innerYield) {
|
||||
auto const maybeError = connection_.get().sendBuffer(boost::asio::buffer(*message), innerYield);
|
||||
if (maybeError.has_value() and errorHandler_(*maybeError, connection_))
|
||||
connection_.get().close(innerYield);
|
||||
});
|
||||
}
|
||||
|
||||
void
|
||||
@@ -59,13 +80,21 @@ SubscriptionContext::onDisconnect(OnDisconnectSlot const& slot)
|
||||
void
|
||||
SubscriptionContext::setApiSubversion(uint32_t value)
|
||||
{
|
||||
apiSubVersion_ = value;
|
||||
apiSubversion_ = value;
|
||||
}
|
||||
|
||||
uint32_t
|
||||
SubscriptionContext::apiSubversion() const
|
||||
{
|
||||
return apiSubVersion_;
|
||||
return apiSubversion_;
|
||||
}
|
||||
|
||||
void
|
||||
SubscriptionContext::disconnect(boost::asio::yield_context yield)
|
||||
{
|
||||
onDisconnect_(this);
|
||||
disconnected_ = true;
|
||||
tasksGroup_.asyncWait(yield);
|
||||
}
|
||||
|
||||
} // namespace web
|
||||
|
||||
Reference in New Issue
Block a user