rippled
PathRequests.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2013 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/app/ledger/LedgerMaster.h>
21 #include <ripple/app/main/Application.h>
22 #include <ripple/app/paths/PathRequests.h>
23 #include <ripple/basics/Log.h>
24 #include <ripple/core/JobQueue.h>
25 #include <ripple/net/RPCErr.h>
26 #include <ripple/protocol/ErrorCodes.h>
27 #include <ripple/protocol/jss.h>
28 #include <ripple/resource/Fees.h>
29 #include <algorithm>
30 
31 namespace ripple {
32 
38  std::shared_ptr<ReadView const> const& ledger,
39  bool authoritative)
40 {
42 
43  auto lineCache = lineCache_.lock();
44 
45  std::uint32_t const lineSeq = lineCache ? lineCache->getLedger()->seq() : 0;
46  std::uint32_t const lgrSeq = ledger->seq();
47  JLOG(mJournal.debug()) << "getLineCache has cache for " << lineSeq
48  << ", considering " << lgrSeq;
49 
50  if ((lineSeq == 0) || // no ledger
51  (authoritative && (lgrSeq > lineSeq)) || // newer authoritative ledger
52  (authoritative &&
53  ((lgrSeq + 8) < lineSeq)) || // we jumped way back for some reason
54  (lgrSeq > (lineSeq + 8))) // we jumped way forward for some reason
55  {
56  JLOG(mJournal.debug())
57  << "getLineCache creating new cache for " << lgrSeq;
58  // Assign to the local before the member, because the member is a
59  // weak_ptr, and will immediately discard it if there are no other
60  // references.
61  lineCache_ = lineCache = std::make_shared<RippleLineCache>(
62  ledger, app_.journal("RippleLineCache"));
63  }
64  return lineCache;
65 }
66 
67 void
69 {
70  auto event =
71  app_.getJobQueue().makeLoadEvent(jtPATH_FIND, "PathRequest::updateAll");
72 
75 
76  // Get the ledger and cache we should be using
77  {
79  requests = requests_;
80  cache = getLineCache(inLedger, true);
81  }
82 
83  bool newRequests = app_.getLedgerMaster().isNewPathRequest();
84  bool mustBreak = false;
85 
86  JLOG(mJournal.trace()) << "updateAll seq=" << cache->getLedger()->seq()
87  << ", " << requests.size() << " requests";
88 
89  int processed = 0, removed = 0;
90 
91  auto getSubscriber =
92  [](PathRequest::pointer const& request) -> InfoSub::pointer {
93  if (auto ipSub = request->getSubscriber();
94  ipSub && ipSub->getRequest() == request)
95  {
96  return ipSub;
97  }
98  request->doAborting();
99  return nullptr;
100  };
101 
102  do
103  {
104  JLOG(mJournal.trace()) << "updateAll looping";
105  for (auto const& wr : requests)
106  {
107  if (app_.getJobQueue().isStopping())
108  break;
109 
110  auto request = wr.lock();
111  bool remove = true;
112  JLOG(mJournal.trace())
113  << "updateAll request " << (request ? "" : "not ") << "found";
114 
115  if (request)
116  {
117  auto continueCallback = [&getSubscriber, &request]() {
118  // This callback is used by doUpdate to determine whether to
119  // continue working. If getSubscriber returns null, that
120  // indicates that this request is no longer relevant.
121  return (bool)getSubscriber(request);
122  };
123  if (!request->needsUpdate(
124  newRequests, cache->getLedger()->seq()))
125  remove = false;
126  else
127  {
128  if (auto ipSub = getSubscriber(request))
129  {
130  if (!ipSub->getConsumer().warn())
131  {
132  // Release the shared ptr to the subscriber so that
133  // it can be freed if the client disconnects, and
134  // thus fail to lock later.
135  ipSub.reset();
136  Json::Value update = request->doUpdate(
137  cache, false, continueCallback);
138  request->updateComplete();
139  update[jss::type] = "path_find";
140  if ((ipSub = getSubscriber(request)))
141  {
142  ipSub->send(update, false);
143  remove = false;
144  ++processed;
145  }
146  }
147  }
148  else if (request->hasCompletion())
149  {
150  // One-shot request with completion function
151  request->doUpdate(cache, false);
152  request->updateComplete();
153  ++processed;
154  }
155  }
156  }
157 
158  if (remove)
159  {
161 
162  // Remove any dangling weak pointers or weak
163  // pointers that refer to this path request.
164  auto ret = std::remove_if(
165  requests_.begin(),
166  requests_.end(),
167  [&removed, &request](auto const& wl) {
168  auto r = wl.lock();
169 
170  if (r && r != request)
171  return false;
172  ++removed;
173  return true;
174  });
175 
176  requests_.erase(ret, requests_.end());
177  }
178 
179  mustBreak =
180  !newRequests && app_.getLedgerMaster().isNewPathRequest();
181 
182  // We weren't handling new requests and then
183  // there was a new request
184  if (mustBreak)
185  break;
186  }
187 
188  if (mustBreak)
189  { // a new request came in while we were working
190  newRequests = true;
191  }
192  else if (newRequests)
193  { // we only did new requests, so we always need a last pass
194  newRequests = app_.getLedgerMaster().isNewPathRequest();
195  }
196  else
197  { // if there are no new requests, we are done
198  newRequests = app_.getLedgerMaster().isNewPathRequest();
199  if (!newRequests)
200  break;
201  }
202 
203  // Hold on to the line cache until after the lock is released, so it can
204  // be destroyed outside of the lock
206  {
207  // Get the latest requests, cache, and ledger for next pass
209 
210  if (requests_.empty())
211  break;
212  requests = requests_;
213  lastCache = cache;
214  cache = getLineCache(cache->getLedger(), false);
215  }
216  } while (!app_.getJobQueue().isStopping());
217 
218  JLOG(mJournal.debug()) << "updateAll complete: " << processed
219  << " processed and " << removed << " removed";
220 }
221 
222 bool
224 {
226  return !requests_.empty();
227 }
228 
229 void
231 {
233 
234  // Insert after any older unserviced requests but before
235  // any serviced requests
236  auto ret =
237  std::find_if(requests_.begin(), requests_.end(), [](auto const& wl) {
238  auto r = wl.lock();
239 
240  // We come before handled requests
241  return r && !r->isNew();
242  });
243 
244  requests_.emplace(ret, req);
245 }
246 
247 // Make a new-style path_find request
250  std::shared_ptr<InfoSub> const& subscriber,
251  std::shared_ptr<ReadView const> const& inLedger,
252  Json::Value const& requestJson)
253 {
254  auto req = std::make_shared<PathRequest>(
255  app_, subscriber, ++mLastIdentifier, *this, mJournal);
256 
257  auto [valid, jvRes] =
258  req->doCreate(getLineCache(inLedger, false), requestJson);
259 
260  if (valid)
261  {
262  subscriber->setRequest(req);
263  insertPathRequest(req);
265  }
266  return std::move(jvRes);
267 }
268 
269 // Make an old-style ripple_path_find request
273  std::function<void(void)> completion,
274  Resource::Consumer& consumer,
275  std::shared_ptr<ReadView const> const& inLedger,
276  Json::Value const& request)
277 {
278  // This assignment must take place before the
279  // completion function is called
280  req = std::make_shared<PathRequest>(
281  app_, completion, consumer, ++mLastIdentifier, *this, mJournal);
282 
283  auto [valid, jvRes] = req->doCreate(getLineCache(inLedger, false), request);
284 
285  if (!valid)
286  {
287  req.reset();
288  }
289  else
290  {
291  insertPathRequest(req);
293  {
294  // The newPathRequest failed. Tell the caller.
295  jvRes = rpcError(rpcTOO_BUSY);
296  req.reset();
297  }
298  }
299 
300  return std::move(jvRes);
301 }
302 
305  Resource::Consumer& consumer,
306  std::shared_ptr<ReadView const> const& inLedger,
307  Json::Value const& request)
308 {
309  auto cache = std::make_shared<RippleLineCache>(
310  inLedger, app_.journal("RippleLineCache"));
311 
312  auto req = std::make_shared<PathRequest>(
313  app_, [] {}, consumer, ++mLastIdentifier, *this, mJournal);
314 
315  auto [valid, jvRes] = req->doCreate(cache, request);
316  if (valid)
317  jvRes = req->doUpdate(cache, false);
318  return std::move(jvRes);
319 }
320 
321 } // namespace ripple
ripple::PathRequests::doLegacyPathRequest
Json::Value doLegacyPathRequest(Resource::Consumer &consumer, std::shared_ptr< ReadView const > const &inLedger, Json::Value const &request)
Definition: PathRequests.cpp:304
std::shared_ptr
STL class.
ripple::PathRequests::makePathRequest
Json::Value makePathRequest(std::shared_ptr< InfoSub > const &subscriber, std::shared_ptr< ReadView const > const &ledger, Json::Value const &request)
Definition: PathRequests.cpp:249
ripple::rpcError
Json::Value rpcError(int iError)
Definition: RPCErr.cpp:29
beast::Journal::trace
Stream trace() const
Severity stream access functions.
Definition: Journal.h:308
ripple::jtPATH_FIND
@ jtPATH_FIND
Definition: Job.h:85
std::vector
STL class.
std::find_if
T find_if(T... args)
std::vector::size
T size(T... args)
ripple::PathRequests::makeLegacyPathRequest
Json::Value makeLegacyPathRequest(PathRequest::pointer &req, std::function< void(void)> completion, Resource::Consumer &consumer, std::shared_ptr< ReadView const > const &inLedger, Json::Value const &request)
Definition: PathRequests.cpp:271
std::lock_guard
STL class.
ripple::rpcTOO_BUSY
@ rpcTOO_BUSY
Definition: ErrorCodes.h:56
ripple::LedgerMaster::isNewPathRequest
bool isNewPathRequest()
Definition: LedgerMaster.cpp:1656
std::function
std::shared_ptr::reset
T reset(T... args)
algorithm
ripple::PathRequests::requestsPending
bool requestsPending() const
Definition: PathRequests.cpp:223
ripple::PathRequests::mLastIdentifier
std::atomic< int > mLastIdentifier
Definition: PathRequests.h:117
ripple::PathRequests::app_
Application & app_
Definition: PathRequests.h:105
ripple::Application::getLedgerMaster
virtual LedgerMaster & getLedgerMaster()=0
ripple::PathRequests::insertPathRequest
void insertPathRequest(PathRequest::pointer const &)
Definition: PathRequests.cpp:230
ripple::Application::getJobQueue
virtual JobQueue & getJobQueue()=0
ripple::JobQueue::isStopping
bool isStopping() const
Definition: JobQueue.h:230
ripple::PathRequests::getLineCache
std::shared_ptr< RippleLineCache > getLineCache(std::shared_ptr< ReadView const > const &ledger, bool authoritative)
Get the current RippleLineCache, updating it if necessary.
Definition: PathRequests.cpp:37
ripple::LedgerMaster::newPathRequest
bool newPathRequest()
Definition: LedgerMaster.cpp:1648
ripple::PathRequests::mJournal
beast::Journal mJournal
Definition: PathRequests.h:106
std::uint32_t
std::remove_if
T remove_if(T... args)
ripple::PathRequests::mLock
std::recursive_mutex mLock
Definition: PathRequests.h:119
ripple::PathRequests::lineCache_
std::weak_ptr< RippleLineCache > lineCache_
Definition: PathRequests.h:115
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::Application::journal
virtual beast::Journal journal(std::string const &name)=0
ripple::Resource::Consumer
An endpoint that consumes resources.
Definition: Consumer.h:34
ripple::PathRequests::updateAll
void updateAll(std::shared_ptr< ReadView const > const &ledger)
Update all of the contained PathRequest instances.
Definition: PathRequests.cpp:68
beast::Journal::debug
Stream debug() const
Definition: Journal.h:314
ripple::JobQueue::makeLoadEvent
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
Definition: JobQueue.cpp:165
ripple::PathRequests::requests_
std::vector< PathRequest::wptr > requests_
Definition: PathRequests.h:112
Json::Value
Represents a JSON value.
Definition: json_value.h:145