rippled
InboundLedgers.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2013 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/app/ledger/InboundLedgers.h>
21 #include <ripple/app/ledger/LedgerMaster.h>
22 #include <ripple/app/main/Application.h>
23 #include <ripple/app/misc/NetworkOPs.h>
24 #include <ripple/basics/DecayingSample.h>
25 #include <ripple/basics/Log.h>
26 #include <ripple/beast/container/aged_map.h>
27 #include <ripple/beast/core/LexicalCast.h>
28 #include <ripple/core/JobQueue.h>
29 #include <ripple/nodestore/DatabaseShard.h>
30 #include <ripple/protocol/jss.h>
31 #include <memory>
32 #include <mutex>
33 
34 namespace ripple {
35 
37 {
38 private:
41  // measures ledgers per second, constants are important
44 
45 public:
46  // How long before we try again to acquire the same ledger
47  static constexpr std::chrono::minutes const kReacquireInterval{5};
48 
50  Application& app,
51  clock_type& clock,
52  beast::insight::Collector::ptr const& collector,
53  std::unique_ptr<PeerSetBuilder> peerSetBuilder)
54  : app_(app)
55  , fetchRate_(clock.now())
56  , j_(app.journal("InboundLedger"))
57  , m_clock(clock)
58  , mRecentFailures(clock)
59  , mCounter(collector->make_counter("ledger_fetches"))
60  , mPeerSetBuilder(std::move(peerSetBuilder))
61  {
62  }
63 
67  uint256 const& hash,
68  std::uint32_t seq,
69  InboundLedger::Reason reason) override
70  {
71  assert(hash.isNonZero());
72  assert(
73  reason != InboundLedger::Reason::SHARD ||
74  (seq != 0 && app_.getShardStore()));
75 
76  bool isNew = true;
78  {
80  if (stopping_)
81  {
82  return {};
83  }
84  auto it = mLedgers.find(hash);
85  if (it != mLedgers.end())
86  {
87  isNew = false;
88  inbound = it->second;
89  }
90  else
91  {
92  inbound = std::make_shared<InboundLedger>(
93  app_,
94  hash,
95  seq,
96  reason,
98  mPeerSetBuilder->build());
99  mLedgers.emplace(hash, inbound);
100  inbound->init(sl);
101  ++mCounter;
102  }
103  }
104 
105  if (inbound->isFailed())
106  return {};
107 
108  if (!isNew)
109  inbound->update(seq);
110 
111  if (!inbound->isComplete())
112  return {};
113 
114  if (reason == InboundLedger::Reason::HISTORY)
115  {
116  if (inbound->getLedger()->stateMap().family().isShardBacked())
117  app_.getNodeStore().storeLedger(inbound->getLedger());
118  }
119  else if (reason == InboundLedger::Reason::SHARD)
120  {
121  auto shardStore = app_.getShardStore();
122  if (!shardStore)
123  {
124  JLOG(j_.error())
125  << "Acquiring shard with no shard store available";
126  return {};
127  }
128  if (inbound->getLedger()->stateMap().family().isShardBacked())
129  shardStore->setStored(inbound->getLedger());
130  else
131  shardStore->storeLedger(inbound->getLedger());
132  }
133  return inbound->getLedger();
134  }
135 
137  find(uint256 const& hash) override
138  {
139  assert(hash.isNonZero());
140 
142 
143  {
144  ScopedLockType sl(mLock);
145 
146  auto it = mLedgers.find(hash);
147  if (it != mLedgers.end())
148  {
149  ret = it->second;
150  }
151  }
152 
153  return ret;
154  }
155 
156  /*
157  This gets called when
158  "We got some data from an inbound ledger"
159 
160  inboundLedgerTrigger:
161  "What do we do with this partial data?"
162  Figures out what to do with the responses to our requests for information.
163 
164  */
165  // means "We got some data from an inbound ledger"
166 
167  // VFALCO TODO Remove the dependency on the Peer object.
170  bool
172  LedgerHash const& hash,
175  {
176  if (auto ledger = find(hash))
177  {
178  JLOG(j_.trace()) << "Got data (" << packet->nodes().size()
179  << ") for acquiring ledger: " << hash;
180 
181  // Stash the data for later processing and see if we need to
182  // dispatch
183  if (ledger->gotData(std::weak_ptr<Peer>(peer), packet))
185  jtLEDGER_DATA, "processLedgerData", [ledger](Job&) {
186  ledger->runData();
187  });
188 
189  return true;
190  }
191 
192  JLOG(j_.trace()) << "Got data for ledger " << hash
193  << " which we're no longer acquiring";
194 
195  // If it's state node data, stash it because it still might be
196  // useful.
197  if (packet->type() == protocol::liAS_NODE)
198  {
200  jtLEDGER_DATA, "gotStaleData", [this, packet](Job&) {
201  gotStaleData(packet);
202  });
203  }
204 
205  return false;
206  }
207 
208  void
209  logFailure(uint256 const& h, std::uint32_t seq) override
210  {
211  ScopedLockType sl(mLock);
212 
213  mRecentFailures.emplace(h, seq);
214  }
215 
216  bool
217  isFailure(uint256 const& h) override
218  {
219  ScopedLockType sl(mLock);
220 
222  return mRecentFailures.find(h) != mRecentFailures.end();
223  }
224 
231  void
233  {
234  Serializer s;
235  try
236  {
237  for (int i = 0; i < packet_ptr->nodes().size(); ++i)
238  {
239  auto const& node = packet_ptr->nodes(i);
240 
241  if (!node.has_nodeid() || !node.has_nodedata())
242  return;
243 
244  auto newNode =
245  SHAMapTreeNode::makeFromWire(makeSlice(node.nodedata()));
246 
247  if (!newNode)
248  return;
249 
250  s.erase();
251  newNode->serializeWithPrefix(s);
252 
254  newNode->getHash().as_uint256(),
255  std::make_shared<Blob>(s.begin(), s.end()));
256  }
257  }
258  catch (std::exception const&)
259  {
260  }
261  }
262 
263  void
264  clearFailures() override
265  {
266  ScopedLockType sl(mLock);
267 
268  mRecentFailures.clear();
269  mLedgers.clear();
270  }
271 
273  fetchRate() override
274  {
276  return 60 * fetchRate_.value(m_clock.now());
277  }
278 
279  // Should only be called with an inboundledger that has
280  // a reason of history or shard
281  void
282  onLedgerFetched() override
283  {
285  fetchRate_.add(1, m_clock.now());
286  }
287 
289  getInfo() override
290  {
292 
294 
295  {
296  ScopedLockType sl(mLock);
297 
298  acqs.reserve(mLedgers.size());
299  for (auto const& it : mLedgers)
300  {
301  assert(it.second);
302  acqs.push_back(it);
303  }
304  for (auto const& it : mRecentFailures)
305  {
306  if (it.second > 1)
307  ret[std::to_string(it.second)][jss::failed] = true;
308  else
309  ret[to_string(it.first)][jss::failed] = true;
310  }
311  }
312 
313  for (auto const& it : acqs)
314  {
315  // getJson is expensive, so call without the lock
316  std::uint32_t seq = it.second->getSeq();
317  if (seq > 1)
318  ret[std::to_string(seq)] = it.second->getJson(0);
319  else
320  ret[to_string(it.first)] = it.second->getJson(0);
321  }
322 
323  return ret;
324  }
325 
326  void
327  gotFetchPack() override
328  {
330  {
331  ScopedLockType sl(mLock);
332 
333  acquires.reserve(mLedgers.size());
334  for (auto const& it : mLedgers)
335  {
336  assert(it.second);
337  acquires.push_back(it.second);
338  }
339  }
340 
341  for (auto const& acquire : acquires)
342  {
343  acquire->checkLocal();
344  }
345  }
346 
347  void
348  sweep() override
349  {
350  clock_type::time_point const now(m_clock.now());
351 
352  // Make a list of things to sweep, while holding the lock
354  std::size_t total;
355  {
356  ScopedLockType sl(mLock);
357  MapType::iterator it(mLedgers.begin());
358  total = mLedgers.size();
359  stuffToSweep.reserve(total);
360 
361  while (it != mLedgers.end())
362  {
363  if (it->second->getLastAction() > now)
364  {
365  it->second->touch();
366  ++it;
367  }
368  else if (
369  (it->second->getLastAction() + std::chrono::minutes(1)) <
370  now)
371  {
372  stuffToSweep.push_back(it->second);
373  // shouldn't cause the actual final delete
374  // since we are holding a reference in the vector.
375  it = mLedgers.erase(it);
376  }
377  else
378  {
379  ++it;
380  }
381  }
382 
384  }
385 
386  JLOG(j_.debug()) << "Swept " << stuffToSweep.size() << " out of "
387  << total << " inbound ledgers.";
388  }
389 
390  void
391  stop() override
392  {
393  ScopedLockType lock(mLock);
394  stopping_ = true;
395  mLedgers.clear();
396  mRecentFailures.clear();
397  }
398 
399 private:
401 
404 
405  bool stopping_ = false;
408 
410 
412 
414 };
415 
416 //------------------------------------------------------------------------------
417 
420  Application& app,
422  beast::insight::Collector::ptr const& collector)
423 {
424  return std::make_unique<InboundLedgersImp>(
425  app, clock, collector, make_PeerSetBuilder(app));
426 }
427 
428 } // namespace ripple
ripple::InboundLedgersImp::getInfo
Json::Value getInfo() override
Definition: InboundLedgers.cpp:289
ripple::Application
Definition: Application.h:103
ripple::InboundLedgersImp::fetchRate_
DecayWindow< 30, clock_type > fetchRate_
Definition: InboundLedgers.cpp:42
ripple::InboundLedger::Reason::HISTORY
@ HISTORY
ripple::Serializer::end
Blob ::iterator end()
Definition: Serializer.h:223
ripple::makeSlice
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
Definition: Slice.h:240
std::shared_ptr< Collector >
std::exception
STL class.
ripple::base_uint::isNonZero
bool isNonZero() const
Definition: base_uint.h:516
beast::insight::Counter
A metric for measuring an integral value.
Definition: Counter.h:38
beast::Journal::trace
Stream trace() const
Severity stream access functions.
Definition: Journal.h:309
ripple::InboundLedgersImp::m_clock
clock_type & m_clock
Definition: InboundLedgers.cpp:400
ripple::InboundLedgersImp::app_
Application & app_
Definition: InboundLedgers.cpp:39
ripple::Serializer::erase
void erase()
Definition: Serializer.h:209
std::vector::reserve
T reserve(T... args)
std::vector
STL class.
std::unordered_map::find
T find(T... args)
std::unordered_map::size
T size(T... args)
ripple::InboundLedgersImp::stop
void stop() override
Definition: InboundLedgers.cpp:391
ripple::InboundLedgersImp::mPeerSetBuilder
std::unique_ptr< PeerSetBuilder > mPeerSetBuilder
Definition: InboundLedgers.cpp:413
ripple::make_InboundLedgers
std::unique_ptr< InboundLedgers > make_InboundLedgers(Application &app, InboundLedgers::clock_type &clock, beast::insight::Collector::ptr const &collector)
Definition: InboundLedgers.cpp:419
ripple::InboundLedgersImp
Definition: InboundLedgers.cpp:36
std::chrono::minutes
ripple::InboundLedgersImp::kReacquireInterval
static constexpr const std::chrono::minutes kReacquireInterval
Definition: InboundLedgers.cpp:47
ripple::DecayWindow< 30, clock_type >
std::unordered_map::emplace
T emplace(T... args)
std::recursive_mutex
STL class.
std::lock_guard
STL class.
ripple::Application::getShardStore
virtual NodeStore::DatabaseShard * getShardStore()=0
ripple::InboundLedgersImp::clearFailures
void clearFailures() override
Definition: InboundLedgers.cpp:264
ripple::JobQueue::addJob
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition: JobQueue.h:165
ripple::SHAMapTreeNode::makeFromWire
static std::shared_ptr< SHAMapTreeNode > makeFromWire(Slice rawNode)
Definition: SHAMapTreeNode.cpp:116
ripple::InboundLedgersImp::stopping_
bool stopping_
Definition: InboundLedgers.cpp:405
ripple::Serializer::begin
Blob ::iterator begin()
Definition: Serializer.h:218
ripple::jtLEDGER_DATA
@ jtLEDGER_DATA
Definition: Job.h:50
std::unordered_map::clear
T clear(T... args)
beast::abstract_clock::now
virtual time_point now() const =0
Returns the current time.
ripple::InboundLedgersImp::fetchRate
std::size_t fetchRate() override
Returns the rate of historical ledger fetches per minute.
Definition: InboundLedgers.cpp:273
ripple::InboundLedgersImp::sweep
void sweep() override
Definition: InboundLedgers.cpp:348
ripple::InboundLedgersImp::j_
const beast::Journal j_
Definition: InboundLedgers.cpp:43
std::vector::push_back
T push_back(T... args)
ripple::base_uint< 256 >
ripple::InboundLedgersImp::InboundLedgersImp
InboundLedgersImp(Application &app, clock_type &clock, beast::insight::Collector::ptr const &collector, std::unique_ptr< PeerSetBuilder > peerSetBuilder)
Definition: InboundLedgers.cpp:49
Json::objectValue
@ objectValue
object value (collection of name/value pairs).
Definition: json_value.h:43
ripple::InboundLedgersImp::logFailure
void logFailure(uint256 const &h, std::uint32_t seq) override
Definition: InboundLedgers.cpp:209
ripple::Application::getLedgerMaster
virtual LedgerMaster & getLedgerMaster()=0
std::unique_lock< std::recursive_mutex >
ripple::InboundLedgersImp::acquire
std::shared_ptr< Ledger const > acquire(uint256 const &hash, std::uint32_t seq, InboundLedger::Reason reason) override
Definition: InboundLedgers.cpp:66
std::to_string
T to_string(T... args)
ripple::Application::getJobQueue
virtual JobQueue & getJobQueue()=0
beast::Journal::error
Stream error() const
Definition: Journal.h:333
std::unordered_map::erase
T erase(T... args)
ripple::Job
Definition: Job.h:87
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
beast::expire
std::enable_if< is_aged_container< AgedContainer >::value, std::size_t >::type expire(AgedContainer &c, std::chrono::duration< Rep, Period > const &age)
Expire aged container items past the specified age.
Definition: aged_container_utility.h:33
std::uint32_t
ripple::InboundLedgersImp::gotFetchPack
void gotFetchPack() override
Definition: InboundLedgers.cpp:327
beast::abstract_clock< std::chrono::steady_clock >
memory
ripple::InboundLedgersImp::mRecentFailures
beast::aged_map< uint256, std::uint32_t > mRecentFailures
Definition: InboundLedgers.cpp:409
ripple::InboundLedgers
Manages the lifetime of inbound ledgers.
Definition: InboundLedgers.h:33
ripple::InboundLedgersImp::onLedgerFetched
void onLedgerFetched() override
Called when a complete ledger is obtained.
Definition: InboundLedgers.cpp:282
ripple::InboundLedgersImp::fetchRateMutex_
std::mutex fetchRateMutex_
Definition: InboundLedgers.cpp:40
std::weak_ptr< Peer >
ripple::Serializer
Definition: Serializer.h:39
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::DecayWindow::value
double value(time_point now)
Definition: DecayingSample.h:129
ripple::NodeStore::Database::storeLedger
virtual bool storeLedger(std::shared_ptr< Ledger const > const &srcLedger)=0
Store a ledger from a different database.
ripple::Application::getNodeStore
virtual NodeStore::Database & getNodeStore()=0
beast::detail::aged_ordered_container
Associative container where each element is also indexed by time.
Definition: aged_ordered_container.h:82
ripple::InboundLedgersImp::isFailure
bool isFailure(uint256 const &h) override
Definition: InboundLedgers.cpp:217
ripple::LedgerMaster::addFetchPack
void addFetchPack(uint256 const &hash, std::shared_ptr< Blob > data)
Definition: LedgerMaster.cpp:2094
ripple::make_PeerSetBuilder
std::unique_ptr< PeerSetBuilder > make_PeerSetBuilder(Application &app)
Definition: PeerSet.cpp:144
std::unordered_map::begin
T begin(T... args)
std
STL namespace.
ripple::DecayWindow::add
void add(double value, time_point now)
Definition: DecayingSample.h:122
ripple::InboundLedgersImp::mLock
std::recursive_mutex mLock
Definition: InboundLedgers.cpp:403
ripple::InboundLedgersImp::gotStaleData
void gotStaleData(std::shared_ptr< protocol::TMLedgerData > packet_ptr) override
We got some data for a ledger we are no longer acquiring Since we paid the price to receive it,...
Definition: InboundLedgers.cpp:232
mutex
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
ripple::InboundLedgersImp::mCounter
beast::insight::Counter mCounter
Definition: InboundLedgers.cpp:411
std::size_t
ripple::to_string
std::string to_string(Manifest const &m)
Format the specified manifest to a string for debugging purposes.
Definition: app/misc/impl/Manifest.cpp:38
std::unordered_map::end
T end(T... args)
ripple::InboundLedger::Reason
Reason
Definition: InboundLedger.h:46
ripple::InboundLedgersImp::gotLedgerData
bool gotLedgerData(LedgerHash const &hash, std::shared_ptr< Peer > peer, std::shared_ptr< protocol::TMLedgerData > packet) override
We received a TMLedgerData from a peer.
Definition: InboundLedgers.cpp:171
std::unique_ptr
STL class.
ripple::InboundLedger::Reason::SHARD
@ SHARD
std::unordered_map
STL class.
beast::abstract_clock< std::chrono::steady_clock >::time_point
typename std::chrono::steady_clock ::time_point time_point
Definition: abstract_clock.h:63
ripple::InboundLedgers::clock_type
beast::abstract_clock< std::chrono::steady_clock > clock_type
Definition: InboundLedgers.h:36
ripple::InboundLedgersImp::mLedgers
MapType mLedgers
Definition: InboundLedgers.cpp:407
ripple::InboundLedgersImp::find
std::shared_ptr< InboundLedger > find(uint256 const &hash) override
Definition: InboundLedgers.cpp:137
std::ref
T ref(T... args)
Json::Value
Represents a JSON value.
Definition: json_value.h:145