rippled
InboundLedgers.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2013 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/app/ledger/InboundLedgers.h>
21 #include <ripple/app/ledger/LedgerMaster.h>
22 #include <ripple/app/main/Application.h>
23 #include <ripple/app/misc/NetworkOPs.h>
24 #include <ripple/basics/DecayingSample.h>
25 #include <ripple/basics/Log.h>
26 #include <ripple/beast/container/aged_map.h>
27 #include <ripple/beast/core/LexicalCast.h>
28 #include <ripple/core/JobQueue.h>
29 #include <ripple/nodestore/DatabaseShard.h>
30 #include <ripple/protocol/jss.h>
31 #include <memory>
32 #include <mutex>
33 
34 namespace ripple {
35 
37 {
38 private:
41  // measures ledgers per second, constants are important
44 
45 public:
47 
48  // How long before we try again to acquire the same ledger
50 
52  Application& app,
53  clock_type& clock,
54  Stoppable& parent,
55  beast::insight::Collector::ptr const& collector)
56  : Stoppable("InboundLedgers", parent)
57  , app_(app)
58  , fetchRate_(clock.now())
59  , j_(app.journal("InboundLedger"))
60  , m_clock(clock)
61  , mRecentFailures(clock)
62  , mCounter(collector->make_counter("ledger_fetches"))
63  {
64  }
65 
69  uint256 const& hash,
70  std::uint32_t seq,
71  InboundLedger::Reason reason) override
72  {
73  assert(hash.isNonZero());
74  assert(
75  reason != InboundLedger::Reason::SHARD ||
76  (seq != 0 && app_.getShardStore()));
77  if (isStopping())
78  return {};
79 
80  bool isNew = true;
82  {
84  auto it = mLedgers.find(hash);
85  if (it != mLedgers.end())
86  {
87  isNew = false;
88  inbound = it->second;
89  }
90  else
91  {
92  inbound = std::make_shared<InboundLedger>(
93  app_, hash, seq, reason, std::ref(m_clock));
94  mLedgers.emplace(hash, inbound);
95  inbound->init(sl);
96  ++mCounter;
97  }
98  }
99 
100  if (inbound->isFailed())
101  return {};
102 
103  if (!isNew)
104  inbound->update(seq);
105 
106  if (!inbound->isComplete())
107  return {};
108 
109  if (reason == InboundLedger::Reason::HISTORY)
110  {
111  if (inbound->getLedger()->stateMap().family().isShardBacked())
112  app_.getNodeStore().storeLedger(inbound->getLedger());
113  }
114  else if (reason == InboundLedger::Reason::SHARD)
115  {
116  auto shardStore = app_.getShardStore();
117  if (!shardStore)
118  {
119  JLOG(j_.error())
120  << "Acquiring shard with no shard store available";
121  return {};
122  }
123  if (inbound->getLedger()->stateMap().family().isShardBacked())
124  shardStore->setStored(inbound->getLedger());
125  else
126  shardStore->storeLedger(inbound->getLedger());
127  }
128  return inbound->getLedger();
129  }
130 
132  find(uint256 const& hash) override
133  {
134  assert(hash.isNonZero());
135 
137 
138  {
139  ScopedLockType sl(mLock);
140 
141  auto it = mLedgers.find(hash);
142  if (it != mLedgers.end())
143  {
144  ret = it->second;
145  }
146  }
147 
148  return ret;
149  }
150 
151  /*
152  This gets called when
153  "We got some data from an inbound ledger"
154 
155  inboundLedgerTrigger:
156  "What do we do with this partial data?"
157  Figures out what to do with the responses to our requests for information.
158 
159  */
160  // means "We got some data from an inbound ledger"
161 
162  // VFALCO TODO Remove the dependency on the Peer object.
165  bool
167  LedgerHash const& hash,
169  std::shared_ptr<protocol::TMLedgerData> packet_ptr) override
170  {
171  protocol::TMLedgerData& packet = *packet_ptr;
172 
173  JLOG(j_.trace()) << "Got data (" << packet.nodes().size()
174  << ") for acquiring ledger: " << hash;
175 
176  auto ledger = find(hash);
177 
178  if (!ledger)
179  {
180  JLOG(j_.trace()) << "Got data for ledger we're no longer acquiring";
181 
182  // If it's state node data, stash it because it still might be
183  // useful.
184  if (packet.type() == protocol::liAS_NODE)
185  {
187  jtLEDGER_DATA, "gotStaleData", [this, packet_ptr](Job&) {
188  gotStaleData(packet_ptr);
189  });
190  }
191 
192  return false;
193  }
194 
195  // Stash the data for later processing and see if we need to dispatch
196  if (ledger->gotData(std::weak_ptr<Peer>(peer), packet_ptr))
198  jtLEDGER_DATA, "processLedgerData", [this, hash](Job&) {
199  doLedgerData(hash);
200  });
201 
202  return true;
203  }
204 
205  void
206  logFailure(uint256 const& h, std::uint32_t seq) override
207  {
208  ScopedLockType sl(mLock);
209 
210  mRecentFailures.emplace(h, seq);
211  }
212 
213  bool
214  isFailure(uint256 const& h) override
215  {
216  ScopedLockType sl(mLock);
217 
219  return mRecentFailures.find(h) != mRecentFailures.end();
220  }
221 
223  void
225  {
226  if (auto ledger = find(hash))
227  ledger->runData();
228  }
229 
236  void
238  {
239  const uint256 uZero;
240  Serializer s;
241  try
242  {
243  for (int i = 0; i < packet_ptr->nodes().size(); ++i)
244  {
245  auto const& node = packet_ptr->nodes(i);
246 
247  if (!node.has_nodeid() || !node.has_nodedata())
248  return;
249 
250  auto id_string = node.nodeid();
251  auto newNode = SHAMapAbstractNode::make(
252  makeSlice(node.nodedata()),
253  0,
254  snfWIRE,
255  SHAMapHash{uZero},
256  false,
257  app_.journal("SHAMapNodeID"),
258  SHAMapNodeID(id_string.data(), id_string.size()));
259 
260  if (!newNode)
261  return;
262 
263  s.erase();
264  newNode->addRaw(s, snfPREFIX);
265 
266  auto blob = std::make_shared<Blob>(s.begin(), s.end());
267 
269  newNode->getNodeHash().as_uint256(), blob);
270  }
271  }
272  catch (std::exception const&)
273  {
274  }
275  }
276 
277  void
278  clearFailures() override
279  {
280  ScopedLockType sl(mLock);
281 
282  mRecentFailures.clear();
283  mLedgers.clear();
284  }
285 
287  fetchRate() override
288  {
290  return 60 * fetchRate_.value(m_clock.now());
291  }
292 
293  // Should only be called with an inboundledger that has
294  // a reason of history or shard
295  void
296  onLedgerFetched() override
297  {
299  fetchRate_.add(1, m_clock.now());
300  }
301 
303  getInfo() override
304  {
306 
308  {
309  ScopedLockType sl(mLock);
310 
311  acquires.reserve(mLedgers.size());
312  for (auto const& it : mLedgers)
313  {
314  assert(it.second);
315  acquires.push_back(it);
316  }
317  for (auto const& it : mRecentFailures)
318  {
319  if (it.second > 1)
320  ret[beast::lexicalCastThrow<std::string>(it.second)]
321  [jss::failed] = true;
322  else
323  ret[to_string(it.first)][jss::failed] = true;
324  }
325  }
326 
327  for (auto const& it : acquires)
328  {
329  // getJson is expensive, so call without the lock
330  std::uint32_t seq = it.second->getSeq();
331  if (seq > 1)
332  ret[std::to_string(seq)] = it.second->getJson(0);
333  else
334  ret[to_string(it.first)] = it.second->getJson(0);
335  }
336 
337  return ret;
338  }
339 
340  void
341  gotFetchPack() override
342  {
344  {
345  ScopedLockType sl(mLock);
346 
347  acquires.reserve(mLedgers.size());
348  for (auto const& it : mLedgers)
349  {
350  assert(it.second);
351  acquires.push_back(it.second);
352  }
353  }
354 
355  for (auto const& acquire : acquires)
356  {
357  acquire->checkLocal();
358  }
359  }
360 
361  void
362  sweep() override
363  {
364  clock_type::time_point const now(m_clock.now());
365 
366  // Make a list of things to sweep, while holding the lock
368  std::size_t total;
369  {
370  ScopedLockType sl(mLock);
371  MapType::iterator it(mLedgers.begin());
372  total = mLedgers.size();
373  stuffToSweep.reserve(total);
374 
375  while (it != mLedgers.end())
376  {
377  if (it->second->getLastAction() > now)
378  {
379  it->second->touch();
380  ++it;
381  }
382  else if (
383  (it->second->getLastAction() + std::chrono::minutes(1)) <
384  now)
385  {
386  stuffToSweep.push_back(it->second);
387  // shouldn't cause the actual final delete
388  // since we are holding a reference in the vector.
389  it = mLedgers.erase(it);
390  }
391  else
392  {
393  ++it;
394  }
395  }
396 
398  }
399 
400  JLOG(j_.debug()) << "Swept " << stuffToSweep.size() << " out of "
401  << total << " inbound ledgers.";
402  }
403 
404  void
405  onStop() override
406  {
407  ScopedLockType lock(mLock);
408 
409  mLedgers.clear();
410  mRecentFailures.clear();
411 
412  stopped();
413  }
414 
415 private:
417 
420 
423 
425 
427 };
428 
429 //------------------------------------------------------------------------------
430 
433 
435 
438  Application& app,
440  Stoppable& parent,
441  beast::insight::Collector::ptr const& collector)
442 {
443  return std::make_unique<InboundLedgersImp>(app, clock, parent, collector);
444 }
445 
446 } // namespace ripple
ripple::InboundLedgersImp::getInfo
Json::Value getInfo() override
Definition: InboundLedgers.cpp:303
ripple::Application
Definition: Application.h:97
ripple::InboundLedgersImp::fetchRate_
DecayWindow< 30, clock_type > fetchRate_
Definition: InboundLedgers.cpp:42
ripple::InboundLedger::Reason::HISTORY
@ HISTORY
ripple::Serializer::end
Blob ::iterator end()
Definition: Serializer.h:221
ripple::makeSlice
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
Definition: Slice.h:194
std::shared_ptr< Collector >
std::exception
STL class.
ripple::base_uint::isNonZero
bool isNonZero() const
Definition: base_uint.h:480
ripple::Stoppable::stopped
void stopped()
Called by derived classes to indicate that the stoppable has stopped.
Definition: Stoppable.cpp:72
beast::insight::Counter
A metric for measuring an integral value.
Definition: Counter.h:38
beast::Journal::trace
Stream trace() const
Severity stream access functions.
Definition: Journal.h:309
ripple::InboundLedgersImp::m_clock
clock_type & m_clock
Definition: InboundLedgers.cpp:416
ripple::SHAMapAbstractNode::make
static std::shared_ptr< SHAMapAbstractNode > make(Slice const &rawNode, std::uint32_t seq, SHANodeFormat format, SHAMapHash const &hash, bool hashValid, beast::Journal j, SHAMapNodeID const &id=SHAMapNodeID{})
Definition: SHAMapTreeNode.cpp:79
ripple::InboundLedgersImp::app_
Application & app_
Definition: InboundLedgers.cpp:39
ripple::Serializer::erase
void erase()
Definition: Serializer.h:207
std::pair
std::vector::reserve
T reserve(T... args)
std::vector
STL class.
std::unordered_map::find
T find(T... args)
std::unordered_map::size
T size(T... args)
ripple::snfPREFIX
@ snfPREFIX
Definition: SHAMapTreeNode.h:36
ripple::InboundLedgersImp
Definition: InboundLedgers.cpp:36
std::chrono::minutes
ripple::DecayWindow< 30, clock_type >
std::unordered_map::emplace
T emplace(T... args)
std::recursive_mutex
STL class.
std::lock_guard
STL class.
ripple::Application::getShardStore
virtual NodeStore::DatabaseShard * getShardStore()=0
ripple::InboundLedgersImp::clearFailures
void clearFailures() override
Definition: InboundLedgers.cpp:278
ripple::JobQueue::addJob
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition: JobQueue.h:166
ripple::to_string
std::string to_string(ListDisposition disposition)
Definition: ValidatorList.cpp:41
ripple::SHAMapNodeID
Definition: SHAMapNodeID.h:33
ripple::Serializer::begin
Blob ::iterator begin()
Definition: Serializer.h:216
ripple::SHAMapHash
Definition: SHAMapTreeNode.h:43
ripple::jtLEDGER_DATA
@ jtLEDGER_DATA
Definition: Job.h:47
ripple::make_InboundLedgers
std::unique_ptr< InboundLedgers > make_InboundLedgers(Application &app, InboundLedgers::clock_type &clock, Stoppable &parent, beast::insight::Collector::ptr const &collector)
Definition: InboundLedgers.cpp:437
std::unordered_map::clear
T clear(T... args)
beast::abstract_clock::now
virtual time_point now() const =0
Returns the current time.
ripple::InboundLedgersImp::fetchRate
std::size_t fetchRate() override
Returns the rate of historical ledger fetches per minute.
Definition: InboundLedgers.cpp:287
ripple::InboundLedgersImp::sweep
void sweep() override
Definition: InboundLedgers.cpp:362
ripple::InboundLedgersImp::j_
const beast::Journal j_
Definition: InboundLedgers.cpp:43
std::vector::push_back
T push_back(T... args)
ripple::base_uint< 256 >
ripple::Stoppable
Provides an interface for starting and stopping.
Definition: Stoppable.h:200
Json::objectValue
@ objectValue
object value (collection of name/value pairs).
Definition: json_value.h:43
ripple::InboundLedgersImp::logFailure
void logFailure(uint256 const &h, std::uint32_t seq) override
Definition: InboundLedgers.cpp:206
ripple::Application::getLedgerMaster
virtual LedgerMaster & getLedgerMaster()=0
std::unique_lock
STL class.
ripple::InboundLedgersImp::acquire
std::shared_ptr< Ledger const > acquire(uint256 const &hash, std::uint32_t seq, InboundLedger::Reason reason) override
Definition: InboundLedgers.cpp:68
std::to_string
T to_string(T... args)
ripple::Application::getJobQueue
virtual JobQueue & getJobQueue()=0
beast::Journal::error
Stream error() const
Definition: Journal.h:333
std::unordered_map::erase
T erase(T... args)
ripple::Job
Definition: Job.h:82
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
beast::expire
std::enable_if< is_aged_container< AgedContainer >::value, std::size_t >::type expire(AgedContainer &c, std::chrono::duration< Rep, Period > const &age)
Expire aged container items past the specified age.
Definition: aged_container_utility.h:33
std::uint32_t
ripple::InboundLedgersImp::gotFetchPack
void gotFetchPack() override
Definition: InboundLedgers.cpp:341
beast::abstract_clock< std::chrono::steady_clock >
memory
ripple::InboundLedgersImp::mRecentFailures
beast::aged_map< uint256, std::uint32_t > mRecentFailures
Definition: InboundLedgers.cpp:424
ripple::InboundLedgers
Manages the lifetime of inbound ledgers.
Definition: InboundLedgers.h:34
ripple::InboundLedgersImp::onLedgerFetched
void onLedgerFetched() override
Called when a complete ledger is obtained.
Definition: InboundLedgers.cpp:296
ripple::InboundLedgersImp::fetchRateMutex_
std::mutex fetchRateMutex_
Definition: InboundLedgers.cpp:40
std::weak_ptr
STL class.
ripple::Serializer
Definition: Serializer.h:39
ripple::InboundLedgersImp::gotLedgerData
bool gotLedgerData(LedgerHash const &hash, std::shared_ptr< Peer > peer, std::shared_ptr< protocol::TMLedgerData > packet_ptr) override
We received a TMLedgerData from a peer.
Definition: InboundLedgers.cpp:166
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::DecayWindow::value
double value(time_point now)
Definition: DecayingSample.h:129
ripple::NodeStore::Database::storeLedger
virtual bool storeLedger(std::shared_ptr< Ledger const > const &srcLedger)=0
Copies a ledger stored in a different database to this one.
ripple::Application::getNodeStore
virtual NodeStore::Database & getNodeStore()=0
ripple::Application::journal
virtual beast::Journal journal(std::string const &name)=0
beast::detail::aged_ordered_container
Associative container where each element is also indexed by time.
Definition: aged_ordered_container.h:82
ripple::InboundLedgersImp::isFailure
bool isFailure(uint256 const &h) override
Definition: InboundLedgers.cpp:214
std::unordered_map::begin
T begin(T... args)
ripple::DecayWindow::add
void add(double value, time_point now)
Definition: DecayingSample.h:122
ripple::snfWIRE
@ snfWIRE
Definition: SHAMapTreeNode.h:37
ripple::InboundLedgersImp::mLock
std::recursive_mutex mLock
Definition: InboundLedgers.cpp:419
ripple::InboundLedgersImp::gotStaleData
void gotStaleData(std::shared_ptr< protocol::TMLedgerData > packet_ptr) override
We got some data for a ledger we are no longer acquiring Since we paid the price to receive it,...
Definition: InboundLedgers.cpp:237
mutex
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
ripple::InboundLedgersImp::mCounter
beast::insight::Counter mCounter
Definition: InboundLedgers.cpp:426
std::size_t
std::unordered_map::end
T end(T... args)
ripple::InboundLedger::Reason
Reason
Definition: InboundLedger.h:51
ripple::InboundLedgersImp::doLedgerData
void doLedgerData(LedgerHash hash)
Called (indirectly) only by gotLedgerData().
Definition: InboundLedgers.cpp:224
std::unique_ptr
STL class.
ripple::InboundLedger::Reason::SHARD
@ SHARD
std::unordered_map
STL class.
beast::abstract_clock< std::chrono::steady_clock >::time_point
typename std::chrono::steady_clock ::time_point time_point
Definition: abstract_clock.h:63
ripple::InboundLedgersImp::InboundLedgersImp
InboundLedgersImp(Application &app, clock_type &clock, Stoppable &parent, beast::insight::Collector::ptr const &collector)
Definition: InboundLedgers.cpp:51
ripple::InboundLedgers::clock_type
beast::abstract_clock< std::chrono::steady_clock > clock_type
Definition: InboundLedgers.h:37
ripple::InboundLedgersImp::mLedgers
MapType mLedgers
Definition: InboundLedgers.cpp:422
ripple::InboundLedgersImp::onStop
void onStop() override
Definition: InboundLedgers.cpp:405
ripple::LedgerMaster::addFetchPack
void addFetchPack(uint256 const &hash, std::shared_ptr< Blob > &data)
Definition: LedgerMaster.cpp:1951
ripple::InboundLedgersImp::find
std::shared_ptr< InboundLedger > find(uint256 const &hash) override
Definition: InboundLedgers.cpp:132
std::ref
T ref(T... args)
ripple::InboundLedgersImp::kReacquireInterval
static const std::chrono::minutes kReacquireInterval
Definition: InboundLedgers.cpp:49
Json::Value
Represents a JSON value.
Definition: json_value.h:145
ripple::InboundLedgers::~InboundLedgers
virtual ~InboundLedgers()=0
ripple::Stoppable::isStopping
bool isStopping() const
Returns true if the stoppable should stop.
Definition: Stoppable.cpp:54