rippled
InboundLedgers.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2013 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/app/ledger/InboundLedgers.h>
21 #include <ripple/app/ledger/LedgerMaster.h>
22 #include <ripple/app/main/Application.h>
23 #include <ripple/app/misc/NetworkOPs.h>
24 #include <ripple/basics/DecayingSample.h>
25 #include <ripple/basics/Log.h>
26 #include <ripple/beast/container/aged_map.h>
27 #include <ripple/beast/core/LexicalCast.h>
28 #include <ripple/core/JobQueue.h>
29 #include <ripple/nodestore/DatabaseShard.h>
30 #include <ripple/protocol/jss.h>
31 #include <memory>
32 #include <mutex>
33 
34 namespace ripple {
35 
37 {
38 private:
41  // measures ledgers per second, constants are important
44 
45 public:
47 
48  // How long before we try again to acquire the same ledger
50 
// Constructor (the line carrying the constructor name is not part of this
// listing). Registers this object under `parent` as the Stoppable named
// "InboundLedgers" and initializes the members listed below from `app`,
// `clock` and `collector`.
52  Application& app,
53  clock_type& clock,
54  Stoppable& parent,
55  beast::insight::Collector::ptr const& collector)
56  : Stoppable("InboundLedgers", parent)
57  , app_(app)
    // The fetch-rate window is seeded with the clock's current time.
58  , fetchRate_(clock.now())
    // Note: the journal partition is named "InboundLedger" (singular).
59  , j_(app.journal("InboundLedger"))
60  , m_clock(clock)
    // The recent-failure map is driven by the same clock.
61  , mRecentFailures(clock)
    // Counter metric "ledger_fetches".
62  , mCounter(collector->make_counter("ledger_fetches"))
63  {
64  }
65 
// acquire(): look up, or start, the acquisition of the ledger identified by
// `hash` (the lines carrying the return type and method name are not part of
// this listing).
//
// hash   - ledger hash; asserted to be non-zero.
// seq    - ledger sequence; asserted to be non-zero when reason is SHARD.
// reason - why the ledger is wanted; HISTORY and SHARD trigger extra storage
//          handling once the ledger is complete.
//
// Returns the ledger only when its acquisition is already complete. Returns an
// empty result when stopping, when the acquisition has failed, when it is
// still in progress, or when a SHARD request finds no shard store.
69  uint256 const& hash,
70  std::uint32_t seq,
71  InboundLedger::Reason reason) override
72  {
73  assert(hash.isNonZero());
    // A SHARD request needs both a real sequence and a shard store.
74  assert(
75  reason != InboundLedger::Reason::SHARD ||
76  (seq != 0 && app_.getShardStore()));
    // Refuse new work once shutdown has begun.
77  if (isStopping())
78  return {};
79 
80  bool isNew = true;
    // NOTE(review): the listing skips the lines that declare `inbound` and
    // the lock `sl` used below - presumably `sl` holds mLock for this scope;
    // confirm against the real file.
82  {
84  auto it = mLedgers.find(hash);
85  if (it != mLedgers.end())
86  {
    // Already being acquired: reuse the existing entry.
87  isNew = false;
88  inbound = it->second;
89  }
90  else
91  {
    // First request for this hash: create the acquisition, register it,
    // initialize it (passing the lock) and count it.
92  inbound = std::make_shared<InboundLedger>(
93  app_, hash, seq, reason, std::ref(m_clock));
94  mLedgers.emplace(hash, inbound);
95  inbound->init(sl);
96  ++mCounter;
97  }
98  }
99 
100  if (inbound->isFailed())
101  return {};
102 
     // An existing acquisition is told about the sequence this caller supplied.
103  if (!isNew)
104  inbound->update(seq);
105 
     // Still in progress: nothing to hand out yet.
106  if (!inbound->isComplete())
107  return {};
108 
109  if (reason == InboundLedger::Reason::HISTORY)
110  {
     // A complete history ledger whose state map family is shard-backed is
     // additionally stored into the node store.
111  if (inbound->getLedger()->stateMap().family().isShardBacked())
112  app_.getNodeStore().storeLedger(inbound->getLedger());
113  }
114  else if (reason == InboundLedger::Reason::SHARD)
115  {
116  auto shardStore = app_.getShardStore();
117  if (!shardStore)
118  {
119  JLOG(j_.error())
120  << "Acquiring shard with no shard store available";
121  return {};
122  }
     // Shard-backed ledgers are only marked as stored; any other ledger is
     // stored into the shard store.
123  if (inbound->getLedger()->stateMap().family().isShardBacked())
124  shardStore->setStored(inbound->getLedger());
125  else
126  shardStore->storeLedger(inbound->getLedger());
127  }
128  return inbound->getLedger();
129  }
130 
// Return the acquisition registered in mLedgers for `hash`, if any.
// `hash` is asserted to be non-zero. mLock is held only for the lookup.
// NOTE(review): the return type line and the declaration of `ret` are not part
// of this listing; when no entry matches, `ret` is returned as declared
// (presumably an empty pointer) - confirm against the real file.
132  find(uint256 const& hash) override
133  {
134  assert(hash.isNonZero());
135 
137 
138  {
139  ScopedLockType sl(mLock);
140 
141  auto it = mLedgers.find(hash);
142  if (it != mLedgers.end())
143  {
144  ret = it->second;
145  }
146  }
147 
148  return ret;
149  }
150 
151  /*
152  This gets called when
153  "We got some data from an inbound ledger"
154 
155  inboundLedgerTrigger:
156  "What do we do with this partial data?"
157  Figures out what to do with the responses to our requests for information.
158 
159  */
160  // means "We got some data from an inbound ledger"
161 
162  // VFALCO TODO Remove the dependency on the Peer object.
// Handle a TMLedgerData message received from a peer for ledger `hash`.
// Returns false when no acquisition for `hash` is registered, true otherwise.
// NOTE(review): the lines carrying the method name, the `peer` parameter and
// the calls that receive the two lambdas below are not part of this listing
// (presumably job-queue submissions); confirm against the real file.
165  bool
167  LedgerHash const& hash,
169  std::shared_ptr<protocol::TMLedgerData> packet_ptr) override
170  {
171  protocol::TMLedgerData& packet = *packet_ptr;
172 
173  JLOG(j_.trace()) << "Got data (" << packet.nodes().size()
174  << ") for acquiring ledger: " << hash;
175 
176  auto ledger = find(hash);
177 
178  if (!ledger)
179  {
180  JLOG(j_.trace()) << "Got data for ledger we're no longer acquiring";
181 
182  // If it's state node data, stash it because it still might be
183  // useful.
184  if (packet.type() == protocol::liAS_NODE)
185  {
     // The lambda copies packet_ptr, so the packet stays alive until
     // gotStaleData() has run.
187  jtLEDGER_DATA, "gotStaleData", [this, packet_ptr](Job&) {
188  gotStaleData(packet_ptr);
189  });
190  }
191 
192  return false;
193  }
194 
195  // Stash the data for later processing and see if we need to dispatch
     // Only a weak reference to the peer is handed to the acquisition; the
     // lambda copies `hash` for the deferred doLedgerData() call.
196  if (ledger->gotData(std::weak_ptr<Peer>(peer), packet_ptr))
198  jtLEDGER_DATA, "processLedgerData", [this, hash](Job&) {
199  doLedgerData(hash);
200  });
201 
202  return true;
203  }
204 
// Record that acquiring the ledger with hash `h` and sequence `seq` failed.
// The entry is placed in mRecentFailures, the map that isFailure() consults.
205  void
206  logFailure(uint256 const& h, std::uint32_t seq) override
207  {
     // Hold mLock while the failure map is modified.
208  ScopedLockType failureGuard(mLock);
209 
210  mRecentFailures.emplace(h, seq);
211  }
212 
// Report whether `h` is currently recorded in mRecentFailures.
// NOTE(review): the listing skips a line between taking the lock and the
// lookup - presumably the expiry of old failure entries; confirm against the
// real file.
213  bool
214  isFailure(uint256 const& h) override
215  {
216  ScopedLockType sl(mLock);
217 
219  return mRecentFailures.find(h) != mRecentFailures.end();
220  }
221 
// doLedgerData(hash): call runData() on the acquisition registered for `hash`,
// if one still exists; otherwise do nothing. Invoked from the job queued by
// gotLedgerData(). (The signature line is not part of this listing.)
223  void
225  {
226  if (auto ledger = find(hash))
227  ledger->runData();
228  }
229 
// gotStaleData(packet_ptr): salvage nodes from a packet that arrived for a
// ledger no longer being acquired. Processing is best effort: it stops at the
// first node that lacks an id or data or that fails to parse, and any
// std::exception is swallowed.
// NOTE(review): the signature line and the line opening the call that receives
// the node hash and blob are not part of this listing (presumably the
// LedgerMaster fetch-pack insertion); confirm against the real file.
236  void
238  {
     // NOTE(review): uZero is not referenced anywhere in the visible body.
239  const uint256 uZero;
     // One serializer is reused for every node; it is erased per iteration.
240  Serializer s;
241  try
242  {
243  for (int i = 0; i < packet_ptr->nodes().size(); ++i)
244  {
245  auto const& node = packet_ptr->nodes(i);
246 
     // A node without both id and data ends processing of the packet.
247  if (!node.has_nodeid() || !node.has_nodedata())
248  return;
249 
250  auto newNode = SHAMapAbstractNode::makeFromWire(
251  makeSlice(node.nodedata()));
252 
     // So does a node that cannot be built from its wire form.
253  if (!newNode)
254  return;
255 
     // Re-serialize the node in snfPREFIX format into the cleared buffer.
256  s.erase();
257  newNode->addRaw(s, snfPREFIX);
258 
     // The node hash and a copy of the serialized bytes are passed on.
260  newNode->getNodeHash().as_uint256(),
261  std::make_shared<Blob>(s.begin(), s.end()));
262  }
263  }
264  catch (std::exception const&)
265  {
     // Deliberately ignored: stale data is optional.
266  }
267  }
268 
// Forget every recorded acquisition failure. Despite the name, this also
// drops every entry in mLedgers.
269  void
270  clearFailures() override
271  {
     // Both maps are emptied while mLock is held.
272  ScopedLockType guard(mLock);
273 
274  mRecentFailures.clear();
275  mLedgers.clear();
276  }
277 
// Return the rate of historical ledger fetches per minute: the current value
// of fetchRate_ multiplied by 60.
// NOTE(review): the return type line and one body line are not part of this
// listing - the latter presumably takes fetchRateMutex_; confirm against the
// real file.
279  fetchRate() override
280  {
282  return 60 * fetchRate_.value(m_clock.now());
283  }
284 
// Called when a complete ledger is obtained: adds one sample, stamped with the
// current clock time, to fetchRate_.
// NOTE(review): one body line is not part of this listing - presumably it
// takes fetchRateMutex_; confirm against the real file.
285  // Should only be called with an inboundledger that has
286  // a reason of history or shard
287  void
288  onLedgerFetched() override
289  {
291  fetchRate_.add(1, m_clock.now());
292  }
293 
// Build a JSON report of recent failures and current acquisitions.
// Each entry is keyed by the ledger sequence when that sequence is greater
// than 1, otherwise by the ledger hash. Failures carry `failed: true`;
// acquisitions carry the result of their getJson(0).
// NOTE(review): the return type line and the declarations of `ret` and
// `acquires` are not part of this listing; confirm against the real file.
295  getInfo() override
296  {
298 
300  {
     // Under the lock: snapshot the acquisitions and report the failures.
301  ScopedLockType sl(mLock);
302 
303  acquires.reserve(mLedgers.size());
304  for (auto const& it : mLedgers)
305  {
306  assert(it.second);
307  acquires.push_back(it);
308  }
309  for (auto const& it : mRecentFailures)
310  {
     // it.first is the ledger hash, it.second the recorded sequence.
311  if (it.second > 1)
312  ret[beast::lexicalCastThrow<std::string>(it.second)]
313  [jss::failed] = true;
314  else
315  ret[to_string(it.first)][jss::failed] = true;
316  }
317  }
318 
     // An acquisition that shares a key with a failure entry replaces that
     // entry, because it is assigned afterwards.
319  for (auto const& it : acquires)
320  {
321  // getJson is expensive, so call without the lock
322  std::uint32_t seq = it.second->getSeq();
323  if (seq > 1)
324  ret[std::to_string(seq)] = it.second->getJson(0);
325  else
326  ret[to_string(it.first)] = it.second->getJson(0);
327  }
328 
329  return ret;
330  }
331 
// Call checkLocal() on every acquisition currently registered in mLedgers.
// The acquisitions are copied into `acquires` under mLock, and checkLocal() is
// then invoked on the copies after the lock has been released.
// NOTE(review): the declaration of `acquires` is not part of this listing;
// confirm against the real file.
332  void
333  gotFetchPack() override
334  {
336  {
337  ScopedLockType sl(mLock);
338 
339  acquires.reserve(mLedgers.size());
340  for (auto const& it : mLedgers)
341  {
342  assert(it.second);
343  acquires.push_back(it.second);
344  }
345  }
346 
     // Runs without mLock held.
347  for (auto const& acquire : acquires)
348  {
349  acquire->checkLocal();
350  }
351  }
352 
// Remove idle acquisitions from mLedgers.
// An entry whose last action is more than one minute before `now` is erased
// from the map and parked in `stuffToSweep`. An entry whose last action lies
// after `now` gets touch() called on it instead. Everything else is left
// alone. The number of removed entries and the map size before removal are
// logged at debug level.
// NOTE(review): the declaration of `stuffToSweep` and one line at the end of
// the locked scope are not part of this listing - the latter presumably
// expires old entries of mRecentFailures; confirm against the real file.
353  void
354  sweep() override
355  {
     // `now` is sampled once so every entry is judged against the same instant.
356  clock_type::time_point const now(m_clock.now());
357 
358  // Make a list of things to sweep, while holding the lock
360  std::size_t total;
361  {
362  ScopedLockType sl(mLock);
363  MapType::iterator it(mLedgers.begin());
     // Map size before anything is erased; reported in the log line below.
364  total = mLedgers.size();
365  stuffToSweep.reserve(total);
366 
367  while (it != mLedgers.end())
368  {
369  if (it->second->getLastAction() > now)
370  {
371  it->second->touch();
372  ++it;
373  }
374  else if (
375  (it->second->getLastAction() + std::chrono::minutes(1)) <
376  now)
377  {
378  stuffToSweep.push_back(it->second);
379  // shouldn't cause the actual final delete
380  // since we are holding a reference in the vector.
     // erase() returns the iterator to continue from.
381  it = mLedgers.erase(it);
382  }
383  else
384  {
385  ++it;
386  }
387  }
388 
390  }
391 
392  JLOG(j_.debug()) << "Swept " << stuffToSweep.size() << " out of "
393  << total << " inbound ledgers.";
394  }
395 
// Stop handler: drops every acquisition and every recorded failure while
// holding mLock, then reports to the Stoppable base that this object has
// stopped.
396  void
397  onStop() override
398  {
399  ScopedLockType guard(mLock);
400 
401  mLedgers.clear();
402  mRecentFailures.clear();
403 
     // Tell the base class that shutdown of this object is finished.
404  stopped();
405  }
406 
407 private:
409 
412 
415 
417 
419 };
420 
421 //------------------------------------------------------------------------------
422 
425 
427 
// make_InboundLedgers(): factory that constructs the concrete
// InboundLedgersImp from the given arguments and returns it through
// std::make_unique. (The lines carrying the return type, the function name and
// the `clock` parameter are not part of this listing.)
430  Application& app,
432  Stoppable& parent,
433  beast::insight::Collector::ptr const& collector)
434 {
435  return std::make_unique<InboundLedgersImp>(app, clock, parent, collector);
436 }
437 
438 } // namespace ripple
ripple::InboundLedgersImp::getInfo
Json::Value getInfo() override
Definition: InboundLedgers.cpp:295
ripple::Application
Definition: Application.h:97
ripple::InboundLedgersImp::fetchRate_
DecayWindow< 30, clock_type > fetchRate_
Definition: InboundLedgers.cpp:42
ripple::InboundLedger::Reason::HISTORY
@ HISTORY
ripple::Serializer::end
Blob ::iterator end()
Definition: Serializer.h:221
ripple::makeSlice
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
Definition: Slice.h:240
std::shared_ptr< Collector >
std::exception
STL class.
ripple::base_uint::isNonZero
bool isNonZero() const
Definition: base_uint.h:480
ripple::Stoppable::stopped
void stopped()
Called by derived classes to indicate that the stoppable has stopped.
Definition: Stoppable.cpp:72
beast::insight::Counter
A metric for measuring an integral value.
Definition: Counter.h:38
beast::Journal::trace
Stream trace() const
Severity stream access functions.
Definition: Journal.h:309
ripple::InboundLedgersImp::m_clock
clock_type & m_clock
Definition: InboundLedgers.cpp:408
ripple::InboundLedgersImp::app_
Application & app_
Definition: InboundLedgers.cpp:39
ripple::Serializer::erase
void erase()
Definition: Serializer.h:207
std::pair
std::vector::reserve
T reserve(T... args)
std::vector
STL class.
std::unordered_map::find
T find(T... args)
std::unordered_map::size
T size(T... args)
ripple::snfPREFIX
@ snfPREFIX
Definition: SHAMapTreeNode.h:36
ripple::InboundLedgersImp
Definition: InboundLedgers.cpp:36
std::chrono::minutes
ripple::DecayWindow< 30, clock_type >
std::unordered_map::emplace
T emplace(T... args)
std::recursive_mutex
STL class.
std::lock_guard
STL class.
ripple::Application::getShardStore
virtual NodeStore::DatabaseShard * getShardStore()=0
ripple::InboundLedgersImp::clearFailures
void clearFailures() override
Definition: InboundLedgers.cpp:270
ripple::JobQueue::addJob
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition: JobQueue.h:166
ripple::to_string
std::string to_string(ListDisposition disposition)
Definition: ValidatorList.cpp:42
ripple::Serializer::begin
Blob ::iterator begin()
Definition: Serializer.h:216
ripple::jtLEDGER_DATA
@ jtLEDGER_DATA
Definition: Job.h:47
ripple::make_InboundLedgers
std::unique_ptr< InboundLedgers > make_InboundLedgers(Application &app, InboundLedgers::clock_type &clock, Stoppable &parent, beast::insight::Collector::ptr const &collector)
Definition: InboundLedgers.cpp:429
std::unordered_map::clear
T clear(T... args)
beast::abstract_clock::now
virtual time_point now() const =0
Returns the current time.
ripple::InboundLedgersImp::fetchRate
std::size_t fetchRate() override
Returns the rate of historical ledger fetches per minute.
Definition: InboundLedgers.cpp:279
ripple::InboundLedgersImp::sweep
void sweep() override
Definition: InboundLedgers.cpp:354
ripple::InboundLedgersImp::j_
const beast::Journal j_
Definition: InboundLedgers.cpp:43
std::vector::push_back
T push_back(T... args)
ripple::base_uint< 256 >
ripple::Stoppable
Provides an interface for starting and stopping.
Definition: Stoppable.h:200
Json::objectValue
@ objectValue
object value (collection of name/value pairs).
Definition: json_value.h:43
ripple::InboundLedgersImp::logFailure
void logFailure(uint256 const &h, std::uint32_t seq) override
Definition: InboundLedgers.cpp:206
ripple::Application::getLedgerMaster
virtual LedgerMaster & getLedgerMaster()=0
std::unique_lock< std::recursive_mutex >
ripple::InboundLedgersImp::acquire
std::shared_ptr< Ledger const > acquire(uint256 const &hash, std::uint32_t seq, InboundLedger::Reason reason) override
Definition: InboundLedgers.cpp:68
std::to_string
T to_string(T... args)
ripple::Application::getJobQueue
virtual JobQueue & getJobQueue()=0
beast::Journal::error
Stream error() const
Definition: Journal.h:333
std::unordered_map::erase
T erase(T... args)
ripple::Job
Definition: Job.h:82
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
beast::expire
std::enable_if< is_aged_container< AgedContainer >::value, std::size_t >::type expire(AgedContainer &c, std::chrono::duration< Rep, Period > const &age)
Expire aged container items past the specified age.
Definition: aged_container_utility.h:33
std::uint32_t
ripple::InboundLedgersImp::gotFetchPack
void gotFetchPack() override
Definition: InboundLedgers.cpp:333
beast::abstract_clock< std::chrono::steady_clock >
memory
ripple::InboundLedgersImp::mRecentFailures
beast::aged_map< uint256, std::uint32_t > mRecentFailures
Definition: InboundLedgers.cpp:416
ripple::InboundLedgers
Manages the lifetime of inbound ledgers.
Definition: InboundLedgers.h:34
ripple::InboundLedgersImp::onLedgerFetched
void onLedgerFetched() override
Called when a complete ledger is obtained.
Definition: InboundLedgers.cpp:288
ripple::InboundLedgersImp::fetchRateMutex_
std::mutex fetchRateMutex_
Definition: InboundLedgers.cpp:40
std::weak_ptr
STL class.
ripple::Serializer
Definition: Serializer.h:39
ripple::InboundLedgersImp::gotLedgerData
bool gotLedgerData(LedgerHash const &hash, std::shared_ptr< Peer > peer, std::shared_ptr< protocol::TMLedgerData > packet_ptr) override
We received a TMLedgerData from a peer.
Definition: InboundLedgers.cpp:166
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::DecayWindow::value
double value(time_point now)
Definition: DecayingSample.h:129
ripple::NodeStore::Database::storeLedger
virtual bool storeLedger(std::shared_ptr< Ledger const > const &srcLedger)=0
Copies a ledger stored in a different database to this one.
ripple::Application::getNodeStore
virtual NodeStore::Database & getNodeStore()=0
ripple::SHAMapAbstractNode::makeFromWire
static std::shared_ptr< SHAMapAbstractNode > makeFromWire(Slice rawNode)
Definition: SHAMapTreeNode.cpp:224
beast::detail::aged_ordered_container
Associative container where each element is also indexed by time.
Definition: aged_ordered_container.h:82
ripple::InboundLedgersImp::isFailure
bool isFailure(uint256 const &h) override
Definition: InboundLedgers.cpp:214
ripple::LedgerMaster::addFetchPack
void addFetchPack(uint256 const &hash, std::shared_ptr< Blob > data)
Definition: LedgerMaster.cpp:2007
std::unordered_map::begin
T begin(T... args)
ripple::DecayWindow::add
void add(double value, time_point now)
Definition: DecayingSample.h:122
ripple::InboundLedgersImp::mLock
std::recursive_mutex mLock
Definition: InboundLedgers.cpp:411
ripple::InboundLedgersImp::gotStaleData
void gotStaleData(std::shared_ptr< protocol::TMLedgerData > packet_ptr) override
We got some data for a ledger we are no longer acquiring Since we paid the price to receive it,...
Definition: InboundLedgers.cpp:237
mutex
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
ripple::InboundLedgersImp::mCounter
beast::insight::Counter mCounter
Definition: InboundLedgers.cpp:418
std::size_t
std::unordered_map::end
T end(T... args)
ripple::InboundLedger::Reason
Reason
Definition: InboundLedger.h:51
ripple::InboundLedgersImp::doLedgerData
void doLedgerData(LedgerHash hash)
Called (indirectly) only by gotLedgerData().
Definition: InboundLedgers.cpp:224
std::unique_ptr
STL class.
ripple::InboundLedger::Reason::SHARD
@ SHARD
std::unordered_map
STL class.
beast::abstract_clock< std::chrono::steady_clock >::time_point
typename std::chrono::steady_clock ::time_point time_point
Definition: abstract_clock.h:63
ripple::InboundLedgersImp::InboundLedgersImp
InboundLedgersImp(Application &app, clock_type &clock, Stoppable &parent, beast::insight::Collector::ptr const &collector)
Definition: InboundLedgers.cpp:51
ripple::InboundLedgers::clock_type
beast::abstract_clock< std::chrono::steady_clock > clock_type
Definition: InboundLedgers.h:37
ripple::InboundLedgersImp::mLedgers
MapType mLedgers
Definition: InboundLedgers.cpp:414
ripple::InboundLedgersImp::onStop
void onStop() override
Definition: InboundLedgers.cpp:397
ripple::InboundLedgersImp::find
std::shared_ptr< InboundLedger > find(uint256 const &hash) override
Definition: InboundLedgers.cpp:132
std::ref
T ref(T... args)
ripple::InboundLedgersImp::kReacquireInterval
static const std::chrono::minutes kReacquireInterval
Definition: InboundLedgers.cpp:49
Json::Value
Represents a JSON value.
Definition: json_value.h:145
ripple::InboundLedgers::~InboundLedgers
virtual ~InboundLedgers()=0
ripple::Stoppable::isStopping
bool isStopping() const
Returns true if the stoppable should stop.
Definition: Stoppable.cpp:54