rippled
LedgerReplayer.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2020 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/app/ledger/LedgerReplayer.h>
21 #include <ripple/app/ledger/impl/LedgerDeltaAcquire.h>
22 #include <ripple/app/ledger/impl/SkipListAcquire.h>
23 #include <ripple/core/JobQueue.h>
24 
25 namespace ripple {
26 
28  Application& app,
29  InboundLedgers& inboundLedgers,
30  std::unique_ptr<PeerSetBuilder> peerSetBuilder,
31  Stoppable& parent)
32  : Stoppable("LedgerReplayer", parent)
33  , app_(app)
34  , inboundLedgers_(inboundLedgers)
35  , peerSetBuilder_(std::move(peerSetBuilder))
36  , j_(app.journal("LedgerReplayer"))
37 {
38 }
39 
41 {
43  tasks_.clear();
44 }
45 
46 void
49  uint256 const& finishLedgerHash,
50  std::uint32_t totalNumLedgers)
51 {
52  assert(
53  finishLedgerHash.isNonZero() && totalNumLedgers > 0 &&
54  totalNumLedgers <= LedgerReplayParameters::MAX_TASK_SIZE);
55 
57  r, finishLedgerHash, totalNumLedgers);
58 
61  bool newSkipList = false;
62  {
64  if (isStopping())
65  return;
67  {
68  JLOG(j_.info()) << "Too many replay tasks, dropping new task "
69  << parameter.finishHash_;
70  return;
71  }
72 
73  for (auto const& t : tasks_)
74  {
75  if (parameter.canMergeInto(t->getTaskParameter()))
76  {
77  JLOG(j_.info()) << "Task " << parameter.finishHash_ << " with "
78  << totalNumLedgers
79  << " ledgers merged into an existing task.";
80  return;
81  }
82  }
83  JLOG(j_.info()) << "Replay " << totalNumLedgers
84  << " ledgers. Finish ledger hash "
85  << parameter.finishHash_;
86 
87  auto i = skipLists_.find(parameter.finishHash_);
88  if (i != skipLists_.end())
89  skipList = i->second.lock();
90 
91  if (!skipList) // cannot find, or found but cannot lock
92  {
93  skipList = std::make_shared<SkipListAcquire>(
94  app_,
96  parameter.finishHash_,
97  peerSetBuilder_->build());
98  skipLists_[parameter.finishHash_] = skipList;
99  newSkipList = true;
100  }
101 
102  task = std::make_shared<LedgerReplayTask>(
103  app_, inboundLedgers_, *this, skipList, std::move(parameter));
104  tasks_.push_back(task);
105  }
106 
107  if (newSkipList)
108  skipList->init(1);
109  // task init after skipList init, could save a timeout
110  task->init();
111 }
112 
113 void
115 {
116  {
117  // TODO for use cases like Consensus (i.e. totalLedgers = 1 or small):
118  // check if the last closed or validated ledger l the local node has
119  // is in the skip list and is an ancestor of parameter.startLedger
120  // that has to be downloaded, if so expand the task to start with l.
121  }
122 
123  auto const& parameter = task->getTaskParameter();
124  JLOG(j_.trace()) << "Creating " << parameter.totalLedgers_ - 1 << " deltas";
125  if (parameter.totalLedgers_ > 1)
126  {
127  auto skipListItem = std::find(
128  parameter.skipList_.begin(),
129  parameter.skipList_.end(),
130  parameter.startHash_);
131  if (skipListItem == parameter.skipList_.end() ||
132  ++skipListItem == parameter.skipList_.end())
133  {
134  JLOG(j_.error()) << "Task parameter error when creating deltas "
135  << parameter.finishHash_;
136  return;
137  }
138 
139  for (std::uint32_t seq = parameter.startSeq_ + 1;
140  seq <= parameter.finishSeq_ &&
141  skipListItem != parameter.skipList_.end();
142  ++seq, ++skipListItem)
143  {
145  bool newDelta = false;
146  {
148  if (isStopping())
149  return;
150  auto i = deltas_.find(*skipListItem);
151  if (i != deltas_.end())
152  delta = i->second.lock();
153 
154  if (!delta) // cannot find, or found but cannot lock
155  {
156  delta = std::make_shared<LedgerDeltaAcquire>(
157  app_,
159  *skipListItem,
160  seq,
161  peerSetBuilder_->build());
162  deltas_[*skipListItem] = delta;
163  newDelta = true;
164  }
165  }
166 
167  task->addDelta(delta);
168  if (newDelta)
169  delta->init(1);
170  }
171  }
172 }
173 
174 void
176  LedgerInfo const& info,
178 {
179  std::shared_ptr<SkipListAcquire> skipList = {};
180  {
182  auto i = skipLists_.find(info.hash);
183  if (i == skipLists_.end())
184  return;
185  skipList = i->second.lock();
186  if (!skipList)
187  {
188  skipLists_.erase(i);
189  return;
190  }
191  }
192 
193  if (skipList)
194  skipList->processData(info.seq, item);
195 }
196 
197 void
199  LedgerInfo const& info,
201 {
203  {
205  auto i = deltas_.find(info.hash);
206  if (i == deltas_.end())
207  return;
208  delta = i->second.lock();
209  if (!delta)
210  {
211  deltas_.erase(i);
212  return;
213  }
214  }
215 
216  if (delta)
217  delta->processData(info, std::move(txns));
218 }
219 
220 void
222 {
224  JLOG(j_.debug()) << "Sweeping, LedgerReplayer has " << tasks_.size()
225  << " tasks, " << skipLists_.size() << " skipLists, and "
226  << deltas_.size() << " deltas.";
227 
228  tasks_.erase(
230  tasks_.begin(),
231  tasks_.end(),
232  [this](auto const& t) -> bool {
233  if (t->finished())
234  {
235  JLOG(j_.debug())
236  << "Sweep task " << t->getTaskParameter().finishHash_;
237  return true;
238  }
239  return false;
240  }),
241  tasks_.end());
242 
243  auto removeCannotLocked = [](auto& subTasks) {
244  for (auto it = subTasks.begin(); it != subTasks.end();)
245  {
246  if (auto item = it->second.lock(); !item)
247  {
248  it = subTasks.erase(it);
249  }
250  else
251  ++it;
252  }
253  };
254  removeCannotLocked(skipLists_);
255  removeCannotLocked(deltas_);
256 }
257 
258 void
259 LedgerReplayer::onStop()
260 {
261  JLOG(j_.info()) << "Stopping...";
262  {
263  std::lock_guard<std::mutex> lock(mtx_);
265  tasks_.begin(), tasks_.end(), [](auto& i) { i->cancel(); });
266  tasks_.clear();
267  auto lockAndCancel = [](auto& i) {
268  if (auto sptr = i.second.lock(); sptr)
269  {
270  sptr->cancel();
271  }
272  };
273  std::for_each(skipLists_.begin(), skipLists_.end(), lockAndCancel);
274  skipLists_.clear();
275  std::for_each(deltas_.begin(), deltas_.end(), lockAndCancel);
276  deltas_.clear();
277  }
278 
279  stopped();
280  JLOG(j_.info()) << "Stopped";
281 }
282 
283 } // namespace ripple
ripple::Application
Definition: Application.h:102
ripple::LedgerReplayer::skipLists_
hash_map< uint256, std::weak_ptr< SkipListAcquire > > skipLists_
Definition: LedgerReplayer.h:134
std::for_each
T for_each(T... args)
std::shared_ptr
STL class.
ripple::base_uint::isNonZero
bool isNonZero() const
Definition: base_uint.h:513
beast::Journal::trace
Stream trace() const
Severity stream access functions.
Definition: Journal.h:309
ripple::LedgerReplayParameters::MAX_TASKS
constexpr std::uint32_t MAX_TASKS
Definition: LedgerReplayer.h:61
ripple::LedgerInfo::hash
uint256 hash
Definition: ReadView.h:100
std::find
T find(T... args)
std::lock_guard
STL class.
ripple::LedgerInfo::seq
LedgerIndex seq
Definition: ReadView.h:92
ripple::LedgerReplayParameters::MAX_TASK_SIZE
constexpr std::uint32_t MAX_TASK_SIZE
Definition: LedgerReplayer.h:64
ripple::LedgerReplayer::deltas_
hash_map< uint256, std::weak_ptr< LedgerDeltaAcquire > > deltas_
Definition: LedgerReplayer.h:133
ripple::LedgerReplayer::createDeltas
void createDeltas(std::shared_ptr< LedgerReplayTask > task)
Create LedgerDeltaAcquire subtasks for the LedgerReplayTask task.
Definition: LedgerReplayer.cpp:114
ripple::base_uint< 256 >
ripple::Stoppable
Provides an interface for starting and stopping.
Definition: Stoppable.h:201
ripple::LedgerReplayer::inboundLedgers_
InboundLedgers & inboundLedgers_
Definition: LedgerReplayer.h:137
ripple::LedgerReplayer::~LedgerReplayer
~LedgerReplayer()
Definition: LedgerReplayer.cpp:40
beast::Journal::error
Stream error() const
Definition: Journal.h:333
beast::Journal::info
Stream info() const
Definition: Journal.h:321
std::uint32_t
ripple::LedgerReplayer::gotReplayDelta
void gotReplayDelta(LedgerInfo const &info, std::map< std::uint32_t, std::shared_ptr< STTx const >> &&txns)
Process a ledger delta (extracted from a TMReplayDeltaResponse message)
Definition: LedgerReplayer.cpp:198
ripple::LedgerReplayer::replay
void replay(InboundLedger::Reason r, uint256 const &finishLedgerHash, std::uint32_t totalNumLedgers)
Replay a range of ledgers.
Definition: LedgerReplayer.cpp:47
std::remove_if
T remove_if(T... args)
std::map
STL class.
ripple::InboundLedgers
Manages the lifetime of inbound ledgers.
Definition: InboundLedgers.h:34
ripple::LedgerReplayer::LedgerReplayer
LedgerReplayer(Application &app, InboundLedgers &inboundLedgers, std::unique_ptr< PeerSetBuilder > peerSetBuilder, Stoppable &parent)
Definition: LedgerReplayer.cpp:27
ripple::LedgerReplayTask::TaskParameter
Definition: LedgerReplayTask.h:46
ripple::LedgerReplayer::tasks_
std::vector< std::shared_ptr< LedgerReplayTask > > tasks_
Definition: LedgerReplayer.h:132
ripple::LedgerReplayer::sweep
void sweep()
Remove completed tasks.
Definition: LedgerReplayer.cpp:221
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
std
STL namespace.
ripple::LedgerReplayer::j_
beast::Journal j_
Definition: LedgerReplayer.h:139
ripple::LedgerReplayTask::TaskParameter::finishHash_
uint256 finishHash_
Definition: LedgerReplayTask.h:51
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
ripple::LedgerReplayer::app_
Application & app_
Definition: LedgerReplayer.h:136
ripple::LedgerInfo
Information about the notional ledger backing the view.
Definition: ReadView.h:84
ripple::InboundLedger::Reason
Reason
Definition: InboundLedger.h:46
ripple::LedgerReplayer::peerSetBuilder_
std::unique_ptr< PeerSetBuilder > peerSetBuilder_
Definition: LedgerReplayer.h:138
std::unique_ptr
STL class.
ripple::LedgerReplayer::gotSkipList
void gotSkipList(LedgerInfo const &info, std::shared_ptr< SHAMapItem const > const &data)
Process a skip list (extracted from a TMProofPathResponse message)
Definition: LedgerReplayer.cpp:175
ripple::LedgerReplayTask::TaskParameter::canMergeInto
bool canMergeInto(TaskParameter const &existingTask) const
check if this task can be merged into an existing task
Definition: LedgerReplayTask.cpp:58
ripple::Stoppable::isStopping
bool isStopping() const
Returns true if the stoppable should stop.
Definition: Stoppable.cpp:54
ripple::LedgerReplayer::mtx_
std::mutex mtx_
Definition: LedgerReplayer.h:131