rippled
Loading...
Searching...
No Matches
ResolverAsio.cpp
1#include <xrpl/basics/Log.h>
2#include <xrpl/basics/Resolver.h>
3#include <xrpl/basics/ResolverAsio.h>
4#include <xrpl/beast/net/IPAddressConversion.h>
5#include <xrpl/beast/net/IPEndpoint.h>
6#include <xrpl/beast/utility/Journal.h>
7#include <xrpl/beast/utility/instrumentation.h>
8
9#include <boost/asio/bind_executor.hpp>
10#include <boost/asio/error.hpp>
11#include <boost/asio/io_context.hpp>
12#include <boost/asio/ip/tcp.hpp>
13#include <boost/system/detail/error_code.hpp>
14
15#include <algorithm>
16#include <atomic>
17#include <cctype>
18#include <condition_variable>
19#include <deque>
20#include <functional>
21#include <iterator>
22#include <locale>
23#include <memory>
24#include <mutex>
25#include <string>
26#include <utility>
27#include <vector>
28
29namespace xrpl {
30
35template <class Derived>
37{
38protected:
40 {
41 }
42
43public:
45 {
46 // Destroying the object with I/O pending? Not a clean exit!
47 XRPL_ASSERT(m_pending.load() == 0, "xrpl::AsyncObject::~AsyncObject : nothing pending");
48 }
49
55 {
56 public:
57 explicit CompletionCounter(Derived* owner) : m_owner(owner)
58 {
59 ++m_owner->m_pending;
60 }
61
63 {
64 ++m_owner->m_pending;
65 }
66
68 {
69 if (--m_owner->m_pending == 0)
70 m_owner->asyncHandlersComplete();
71 }
72
74 operator=(CompletionCounter const&) = delete;
75
76 private:
77 Derived* m_owner;
78 };
79
80 void
82 {
83 ++m_pending;
84 }
85
86 void
88 {
89 if (--m_pending == 0)
90 (static_cast<Derived*>(this))->asyncHandlersComplete();
91 }
92
93private:
94 // The number of handlers pending.
96};
97
98class ResolverAsioImpl : public ResolverAsio, public AsyncObject<ResolverAsioImpl>
99{
100public:
102
104
105 boost::asio::io_context& m_io_context;
106 boost::asio::strand<boost::asio::io_context::executor_type> m_strand;
107 boost::asio::ip::tcp::resolver m_resolver;
108
112
115
116 // Represents a unit of work for the resolver to do
117 struct Work
118 {
121
122 template <class StringSequence>
123 Work(StringSequence const& names_, HandlerType const& handler_) : handler(handler_)
124 {
125 names.reserve(names_.size());
126
127 std::reverse_copy(names_.begin(), names_.end(), std::back_inserter(names));
128 }
129 };
130
132
133 ResolverAsioImpl(boost::asio::io_context& io_context, beast::Journal journal)
134 : m_journal(journal)
135 , m_io_context(io_context)
136 , m_strand(boost::asio::make_strand(io_context))
137 , m_resolver(io_context)
139 , m_stop_called(false)
140 , m_stopped(true)
141 {
142 }
143
145 {
146 XRPL_ASSERT(m_work.empty(), "xrpl::ResolverAsioImpl::~ResolverAsioImpl : no pending work");
147 XRPL_ASSERT(m_stopped, "xrpl::ResolverAsioImpl::~ResolverAsioImpl : stopped");
148 }
149
150 //-------------------------------------------------------------------------
151 // AsyncObject
152 void
159
160 //--------------------------------------------------------------------------
161 //
162 // Resolver
163 //
164 //--------------------------------------------------------------------------
165
166 void
167 start() override
168 {
169 XRPL_ASSERT(m_stopped == true, "xrpl::ResolverAsioImpl::start : stopped");
170 XRPL_ASSERT(m_stop_called == false, "xrpl::ResolverAsioImpl::start : not stopping");
171
172 if (m_stopped.exchange(false) == true)
173 {
174 {
177 }
178 addReference();
179 }
180 }
181
182 void
183 stop_async() override
184 {
185 if (m_stop_called.exchange(true) == false)
186 {
187 boost::asio::dispatch(
189 boost::asio::bind_executor(
190 m_strand, std::bind(&ResolverAsioImpl::do_stop, this, CompletionCounter(this))));
191
192 JLOG(m_journal.debug()) << "Queued a stop request";
193 }
194 }
195
196 void
197 stop() override
198 {
199 stop_async();
200
201 JLOG(m_journal.debug()) << "Waiting to stop";
203 m_cv.wait(lk, [this] { return m_asyncHandlersCompleted; });
204 lk.unlock();
205 JLOG(m_journal.debug()) << "Stopped";
206 }
207
208 void
209 resolve(std::vector<std::string> const& names, HandlerType const& handler) override
210 {
211 XRPL_ASSERT(m_stop_called == false, "xrpl::ResolverAsioImpl::resolve : not stopping");
212 XRPL_ASSERT(!names.empty(), "xrpl::ResolverAsioImpl::resolve : names non-empty");
213
214 // TODO NIKB use rvalue references to construct and move
215 // reducing cost.
216 boost::asio::dispatch(
218 boost::asio::bind_executor(
219 m_strand, std::bind(&ResolverAsioImpl::do_resolve, this, names, handler, CompletionCounter(this))));
220 }
221
222 //-------------------------------------------------------------------------
223 // Resolver
224 void
225 do_stop(CompletionCounter)
226 {
227 XRPL_ASSERT(m_stop_called == true, "xrpl::ResolverAsioImpl::do_stop : stopping");
228
229 if (m_stopped.exchange(true) == false)
230 {
231 m_work.clear();
232 m_resolver.cancel();
233
235 }
236 }
237
238 void
240 std::string name,
241 boost::system::error_code const& ec,
242 HandlerType handler,
243 boost::asio::ip::tcp::resolver::results_type results,
244 CompletionCounter)
245 {
246 if (ec == boost::asio::error::operation_aborted)
247 return;
248
250 auto iter = results.begin();
251
252 // If we get an error message back, we don't return any
253 // results that we may have gotten.
254 if (!ec)
255 {
256 while (iter != results.end())
257 {
259 ++iter;
260 }
261 }
262
263 handler(name, addresses);
264
265 boost::asio::post(
267 boost::asio::bind_executor(m_strand, std::bind(&ResolverAsioImpl::do_work, this, CompletionCounter(this))));
268 }
269
272 {
273 // first attempt to parse as an endpoint (IP addr + port).
274 // If that doesn't succeed, fall back to generic name + port parsing
275
276 if (auto const result = beast::IP::Endpoint::from_string_checked(str))
277 {
278 return make_pair(result->address().to_string(), std::to_string(result->port()));
279 }
280
281 // generic name/port parsing, which doesn't work for
282 // IPv6 addresses in particular because it considers a colon
283 // a port separator
284
285 // Attempt to find the first and last non-whitespace
286 auto const find_whitespace =
288
289 auto host_first = std::find_if_not(str.begin(), str.end(), find_whitespace);
290
291 auto port_last = std::find_if_not(str.rbegin(), str.rend(), find_whitespace).base();
292
293 // This should only happen for all-whitespace strings
294 if (host_first >= port_last)
296
297 // Attempt to find the first and last valid port separators
298 auto const find_port_separator = [](char const c) -> bool {
299 if (std::isspace(static_cast<unsigned char>(c)))
300 return true;
301
302 if (c == ':')
303 return true;
304
305 return false;
306 };
307
308 auto host_last = std::find_if(host_first, port_last, find_port_separator);
309
310 auto port_first = std::find_if_not(host_last, port_last, find_port_separator);
311
312 return make_pair(std::string(host_first, host_last), std::string(port_first, port_last));
313 }
314
315 void
316 do_work(CompletionCounter)
317 {
318 if (m_stop_called == true)
319 return;
320
321 // We don't have any work to do at this time
322 if (m_work.empty())
323 return;
324
325 std::string const name(m_work.front().names.back());
326 HandlerType handler(m_work.front().handler);
327
328 m_work.front().names.pop_back();
329
330 if (m_work.front().names.empty())
331 m_work.pop_front();
332
333 auto const [host, port] = parseName(name);
334
335 if (host.empty())
336 {
337 JLOG(m_journal.error()) << "Unable to parse '" << name << "'";
338
339 boost::asio::post(
341 boost::asio::bind_executor(
342 m_strand, std::bind(&ResolverAsioImpl::do_work, this, CompletionCounter(this))));
343
344 return;
345 }
346
347 m_resolver.async_resolve(
348 host,
349 port,
350 std::bind(
352 this,
353 name,
354 std::placeholders::_1,
355 handler,
356 std::placeholders::_2,
357 CompletionCounter(this)));
358 }
359
360 void
361 do_resolve(std::vector<std::string> const& names, HandlerType const& handler, CompletionCounter)
362 {
363 XRPL_ASSERT(!names.empty(), "xrpl::ResolverAsioImpl::do_resolve : names non-empty");
364
365 if (m_stop_called == false)
366 {
367 m_work.emplace_back(names, handler);
368
369 JLOG(m_journal.debug()) << "Queued new job with " << names.size() << " tasks. " << m_work.size()
370 << " jobs outstanding.";
371
372 if (m_work.size() > 0)
373 {
374 boost::asio::post(
376 boost::asio::bind_executor(
377 m_strand, std::bind(&ResolverAsioImpl::do_work, this, CompletionCounter(this))));
378 }
379 }
380 }
381};
382
383//-----------------------------------------------------------------------------
384
386ResolverAsio::New(boost::asio::io_context& io_context, beast::Journal journal)
387{
388 return std::make_unique<ResolverAsioImpl>(io_context, journal);
389}
390
391//-----------------------------------------------------------------------------
392Resolver::~Resolver() = default;
393} // namespace xrpl
T back_inserter(T... args)
T begin(T... args)
T bind(T... args)
static std::optional< Endpoint > from_string_checked(std::string const &s)
Create an Endpoint from a string.
A generic endpoint for log messages.
Definition Journal.h:41
Stream error() const
Definition Journal.h:319
Stream debug() const
Definition Journal.h:301
RAII container that maintains the count of pending I/O.
CompletionCounter & operator=(CompletionCounter const &)=delete
CompletionCounter(CompletionCounter const &other)
Mix-in to track when all pending I/O is complete.
std::atomic< int > m_pending
std::atomic< bool > m_stop_called
void do_stop(CompletionCounter)
boost::asio::io_context & m_io_context
boost::asio::ip::tcp::resolver m_resolver
ResolverAsioImpl(boost::asio::io_context &io_context, beast::Journal journal)
void resolve(std::vector< std::string > const &names, HandlerType const &handler) override
void start() override
Issue a synchronous start request.
void do_finish(std::string name, boost::system::error_code const &ec, HandlerType handler, boost::asio::ip::tcp::resolver::results_type results, CompletionCounter)
std::deque< Work > m_work
void stop() override
Issue a synchronous stop request.
std::condition_variable m_cv
std::atomic< bool > m_stopped
boost::asio::strand< boost::asio::io_context::executor_type > m_strand
void do_resolve(std::vector< std::string > const &names, HandlerType const &handler, CompletionCounter)
HostAndPort parseName(std::string const &str)
std::pair< std::string, std::string > HostAndPort
void stop_async() override
Issue an asynchronous stop request.
void do_work(CompletionCounter)
static std::unique_ptr< ResolverAsio > New(boost::asio::io_context &, beast::Journal)
virtual ~Resolver()=0
T empty(T... args)
T end(T... args)
T exchange(T... args)
T find_if_not(T... args)
T is_same_v
T load(T... args)
T make_pair(T... args)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
T push_back(T... args)
T rbegin(T... args)
T rend(T... args)
T reserve(T... args)
T reverse_copy(T... args)
T size(T... args)
static IP::Endpoint from_asio(boost::asio::ip::address const &address)
std::vector< std::string > names
Work(StringSequence const &names_, HandlerType const &handler_)
T to_string(T... args)