rippled
ShardArchiveHandler.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012-2014 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/app/misc/NetworkOPs.h>
21 #include <ripple/basics/Archive.h>
22 #include <ripple/basics/BasicConfig.h>
23 #include <ripple/core/ConfigSections.h>
24 #include <ripple/nodestore/DatabaseShard.h>
25 #include <ripple/protocol/ErrorCodes.h>
26 #include <ripple/rpc/ShardArchiveHandler.h>
27 #include <ripple/rpc/impl/Handler.h>
28 
29 #include <memory>
30 
31 namespace ripple {
32 namespace RPC {
33 
34 using namespace boost::filesystem;
35 using namespace std::chrono_literals;
36 
39 
40 boost::filesystem::path
42 {
43  return get(config.section(ConfigSection::shardDatabase()),
44  "download_path",
46  "path",
47  "")) /
48  "download";
49 }
50 
51 auto
53 {
54  std::lock_guard lock(instance_mutex_);
55 
56  return instance_;
57 }
58 
59 auto
61 {
62  std::lock_guard lock(instance_mutex_);
63  assert(!instance_);
64 
65  instance_.reset(new ShardArchiveHandler(app, parent));
66 
67  return instance_;
68 }
69 
70 auto
72  -> pointer
73 {
74  std::lock_guard lock(instance_mutex_);
75  assert(!instance_);
76 
77  instance_.reset(new ShardArchiveHandler(app, parent, true));
78 
79  return instance_;
80 }
81 
82 bool
84 {
85  std::lock_guard lock(instance_mutex_);
86 
87  return instance_.get() != nullptr;
88 }
89 
91  Application& app,
92  Stoppable& parent,
93  bool recovery)
94  : Stoppable("ShardArchiveHandler", parent)
95  , app_(app)
96  , j_(app.journal("ShardArchiveHandler"))
97  , downloadDir_(getDownloadDirectory(app.config()))
98  , timer_(app_.getIOService())
99  , process_(false)
100 {
101  assert(app_.getShardStore());
102 
103  if (recovery)
104  downloader_.reset(
106 }
107 
108 bool
110 {
111  try
112  {
113  create_directories(downloadDir_);
114 
115  sqliteDB_ = std::make_unique<DatabaseCon>(
116  downloadDir_,
117  stateDBName,
120  }
121  catch (std::exception const& e)
122  {
123  JLOG(j_.error()) << "exception: " << e.what()
124  << " in function: " << __func__;
125 
126  return false;
127  }
128 
129  return true;
130 }
131 
132 bool
134 {
135  try
136  {
137  using namespace boost::filesystem;
138 
139  assert(
140  exists(downloadDir_ / stateDBName) &&
141  is_regular_file(downloadDir_ / stateDBName));
142 
143  sqliteDB_ = std::make_unique<DatabaseCon>(
144  downloadDir_,
145  stateDBName,
148 
149  auto& session{sqliteDB_->getSession()};
150 
151  soci::rowset<soci::row> rs =
152  (session.prepare << "SELECT * FROM State;");
153 
155 
156  for (auto it = rs.begin(); it != rs.end(); ++it)
157  {
158  parsedURL url;
159 
160  if (!parseUrl(url, it->get<std::string>(1)))
161  {
162  JLOG(j_.error())
163  << "Failed to parse url: " << it->get<std::string>(1);
164 
165  continue;
166  }
167 
168  add(it->get<int>(0), std::move(url), lock);
169  }
170 
171  // Failed to load anything
172  // from the state database.
173  if (archives_.empty())
174  {
175  release();
176  return false;
177  }
178  }
179  catch (std::exception const& e)
180  {
181  JLOG(j_.error()) << "exception: " << e.what()
182  << " in function: " << __func__;
183 
184  return false;
185  }
186 
187  return true;
188 }
189 
190 void
192 {
194 
195  if (downloader_)
196  {
197  downloader_->onStop();
198  downloader_.reset();
199  }
200 
201  stopped();
202 }
203 
204 bool
206  std::uint32_t shardIndex,
208 {
210 
211  if (!add(shardIndex, std::forward<parsedURL>(url.first), lock))
212  return false;
213 
214  auto& session{sqliteDB_->getSession()};
215 
216  session << "INSERT INTO State VALUES (:index, :url);",
217  soci::use(shardIndex), soci::use(url.second);
218 
219  return true;
220 }
221 
222 bool
224  std::uint32_t shardIndex,
225  parsedURL&& url,
227 {
228  if (process_)
229  {
230  JLOG(j_.error()) << "Download and import already in progress";
231  return false;
232  }
233 
234  auto const it{archives_.find(shardIndex)};
235  if (it != archives_.end())
236  return url == it->second;
237 
238  if (!app_.getShardStore()->prepareShard(shardIndex))
239  return false;
240 
241  archives_.emplace(shardIndex, std::move(url));
242 
243  return true;
244 }
245 
246 bool
248 {
249  std::lock_guard lock(m_);
250  if (!app_.getShardStore())
251  {
252  JLOG(j_.error()) << "No shard store available";
253  return false;
254  }
255  if (process_)
256  {
257  JLOG(j_.warn()) << "Archives already being processed";
258  return false;
259  }
260  if (archives_.empty())
261  {
262  JLOG(j_.warn()) << "No archives to process";
263  return false;
264  }
265 
266  try
267  {
268  // Create temp root download directory
269  create_directories(downloadDir_);
270 
271  if (!downloader_)
272  {
273  // will throw if can't initialize ssl context
274  downloader_ = std::make_shared<DatabaseDownloader>(
275  app_.getIOService(), j_, app_.config());
276  }
277  }
278  catch (std::exception const& e)
279  {
280  JLOG(j_.error()) << "exception: " << e.what();
281  return false;
282  }
283 
284  return next(lock);
285 }
286 
287 void
289 {
291  doRelease(lock);
292 }
293 
294 bool
296 {
297  if (archives_.empty())
298  {
299  doRelease(l);
300  return false;
301  }
302 
303  // Create a temp archive directory at the root
304  auto const shardIndex{archives_.begin()->first};
305  auto const dstDir{downloadDir_ / std::to_string(shardIndex)};
306  try
307  {
308  create_directory(dstDir);
309  }
310  catch (std::exception const& e)
311  {
312  JLOG(j_.error()) << "exception: " << e.what();
313  remove(l);
314  return next(l);
315  }
316 
317  // Download the archive. Process in another thread
318  // to prevent holding up the lock if the downloader
319  // sleeps.
320  auto const& url{archives_.begin()->second};
322  jtCLIENT,
323  "ShardArchiveHandler",
324  [this, ptr = shared_from_this(), url, dstDir](Job&) {
325  if (!downloader_->download(
326  url.domain,
327  std::to_string(url.port.get_value_or(443)),
328  url.path,
329  11,
330  dstDir / "archive.tar.lz4",
331  std::bind(
333  ptr,
334  std::placeholders::_1)))
335  {
336  std::lock_guard<std::mutex> l(m_);
337  remove(l);
338  next(l);
339  }
340  });
341 
342  process_ = true;
343  return true;
344 }
345 
346 void
348 {
349  {
350  std::lock_guard lock(m_);
351  try
352  {
353  if (!is_regular_file(dstPath))
354  {
355  auto ar{archives_.begin()};
356  JLOG(j_.error())
357  << "Downloading shard id " << ar->first << " form URL "
358  << ar->second.domain << ar->second.path;
359  remove(lock);
360  next(lock);
361  return;
362  }
363  }
364  catch (std::exception const& e)
365  {
366  JLOG(j_.error()) << "exception: " << e.what();
367  remove(lock);
368  next(lock);
369  return;
370  }
371  }
372 
373  // Process in another thread to not hold up the IO service
375  jtCLIENT,
376  "ShardArchiveHandler",
377  [=, dstPath = std::move(dstPath), ptr = shared_from_this()](Job&) {
378  // If not synced then defer and retry
379  auto const mode{ptr->app_.getOPs().getOperatingMode()};
380  if (mode != OperatingMode::FULL)
381  {
382  std::lock_guard lock(m_);
383  timer_.expires_from_now(static_cast<std::chrono::seconds>(
384  (static_cast<std::size_t>(OperatingMode::FULL) -
385  static_cast<std::size_t>(mode)) *
386  10));
387  timer_.async_wait(
388  [=, dstPath = std::move(dstPath), ptr = std::move(ptr)](
389  boost::system::error_code const& ec) {
390  if (ec != boost::asio::error::operation_aborted)
391  ptr->complete(std::move(dstPath));
392  });
393  }
394  else
395  {
396  ptr->process(dstPath);
397  std::lock_guard lock(m_);
398  remove(lock);
399  next(lock);
400  }
401  });
402 }
403 
404 void
405 ShardArchiveHandler::process(path const& dstPath)
406 {
407  std::uint32_t shardIndex;
408  {
409  std::lock_guard lock(m_);
410  shardIndex = archives_.begin()->first;
411  }
412 
413  auto const shardDir{dstPath.parent_path() / std::to_string(shardIndex)};
414  try
415  {
416  // Extract the downloaded archive
417  extractTarLz4(dstPath, dstPath.parent_path());
418 
419  // The extracted root directory name must match the shard index
420  if (!is_directory(shardDir))
421  {
422  JLOG(j_.error()) << "Shard " << shardIndex
423  << " mismatches archive shard directory";
424  return;
425  }
426  }
427  catch (std::exception const& e)
428  {
429  JLOG(j_.error()) << "exception: " << e.what();
430  return;
431  }
432 
433  // Import the shard into the shard store
434  if (!app_.getShardStore()->importShard(shardIndex, shardDir))
435  {
436  JLOG(j_.error()) << "Importing shard " << shardIndex;
437  return;
438  }
439 
440  JLOG(j_.debug()) << "Shard " << shardIndex << " downloaded and imported";
441 }
442 
443 void
445 {
446  auto const shardIndex{archives_.begin()->first};
447  app_.getShardStore()->removePreShard(shardIndex);
448  archives_.erase(shardIndex);
449 
450  auto& session{sqliteDB_->getSession()};
451 
452  session << "DELETE FROM State WHERE ShardIndex = :index;",
453  soci::use(shardIndex);
454 
455  auto const dstDir{downloadDir_ / std::to_string(shardIndex)};
456  try
457  {
458  remove_all(dstDir);
459  }
460  catch (std::exception const& e)
461  {
462  JLOG(j_.error()) << "exception: " << e.what();
463  }
464 }
465 
466 void
468 {
469  process_ = false;
470 
471  timer_.cancel();
472  for (auto const& ar : archives_)
473  app_.getShardStore()->removePreShard(ar.first);
474  archives_.clear();
475 
476  {
477  auto& session{sqliteDB_->getSession()};
478 
479  session << "DROP TABLE State;";
480  }
481 
482  sqliteDB_.reset();
483 
484  // Remove temp root download directory
485  try
486  {
487  remove_all(downloadDir_);
488  }
489  catch (std::exception const& e)
490  {
491  JLOG(j_.error()) << "exception: " << e.what()
492  << " in function: " << __func__;
493  }
494 
495  downloader_.reset();
496 }
497 
498 } // namespace RPC
499 } // namespace ripple
ripple::Application
Definition: Application.h:94
ripple::RPC::ShardArchiveHandler::getDownloadDirectory
static boost::filesystem::path getDownloadDirectory(Config const &config)
Definition: ShardArchiveHandler.cpp:41
ripple::RPC::ShardArchiveHandler::downloader_
std::shared_ptr< DatabaseDownloader > downloader_
Definition: ShardArchiveHandler.h:132
ripple::RPC::ShardArchiveHandler::downloadDir_
const boost::filesystem::path downloadDir_
Definition: ShardArchiveHandler.h:133
std::bind
T bind(T... args)
std::string
STL class.
std::shared_ptr
STL class.
ripple::RPC::ShardArchiveHandler::start
bool start()
Starts downloading and importing archives.
Definition: ShardArchiveHandler.cpp:247
ripple::jtCLIENT
@ jtCLIENT
Definition: Job.h:48
std::exception
STL class.
ripple::RPC::ShardArchiveHandler::add
bool add(std::uint32_t shardIndex, std::pair< parsedURL, std::string > &&url)
Definition: ShardArchiveHandler.cpp:205
ripple::Stoppable::stopped
void stopped()
Called by derived classes to indicate that the stoppable has stopped.
Definition: Stoppable.cpp:72
ripple::parsedURL
Definition: StringUtilities.h:123
std::pair
ripple::RPC::ShardArchiveHandler::recoverInstance
static pointer recoverInstance(Application &app, Stoppable &parent)
Definition: ShardArchiveHandler.cpp:71
ripple::ConfigSection::shardDatabase
static std::string shardDatabase()
Definition: ConfigSections.h:38
ripple::RPC::ShardArchiveHandler::pointer
std::shared_ptr< ShardArchiveHandler > pointer
Definition: ShardArchiveHandler.h:43
ripple::RPC::ShardArchiveHandler::onStop
void onStop() override
Override called when the stop notification is issued.
Definition: ShardArchiveHandler.cpp:191
std::chrono::seconds
ripple::RPC::ShardArchiveHandler::initFromDB
bool initFromDB()
Definition: ShardArchiveHandler.cpp:133
beast::Journal::warn
Stream warn() const
Definition: Journal.h:327
ripple::RPC::ShardArchiveHandler::doRelease
void doRelease(std::lock_guard< std::mutex > const &)
Definition: ShardArchiveHandler.cpp:467
std::lock_guard
STL class.
ripple::Application::getShardStore
virtual NodeStore::DatabaseShard * getShardStore()=0
ripple::ShardArchiveHandlerDBInit
static constexpr std::array< char const *, 3 > ShardArchiveHandlerDBInit
Definition: DBInit.h:178
ripple::JobQueue::addJob
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition: JobQueue.h:166
ripple::extractTarLz4
void extractTarLz4(boost::filesystem::path const &src, boost::filesystem::path const &dst)
Extract a tar archive compressed with lz4.
Definition: Archive.cpp:29
ripple::DownloaderDBPragma
static constexpr std::array< char const *, 2 > DownloaderDBPragma
Definition: DBInit.h:175
ripple::RPC::ShardArchiveHandler::timer_
boost::asio::basic_waitable_timer< std::chrono::steady_clock > timer_
Definition: ShardArchiveHandler.h:134
ripple::RPC::ShardArchiveHandler::instance_
static pointer instance_
Definition: ShardArchiveHandler.h:126
ripple::Stoppable
Provides an interface for starting and stopping.
Definition: Stoppable.h:200
ripple::RPC::ShardArchiveHandler::release
void release()
Definition: ShardArchiveHandler.cpp:288
ripple::RPC::ShardArchiveHandler::process
void process(boost::filesystem::path const &dstPath)
Definition: ShardArchiveHandler.cpp:405
std::enable_shared_from_this< ShardArchiveHandler >::shared_from_this
T shared_from_this(T... args)
ripple::RPC::ShardArchiveHandler::remove
void remove(std::lock_guard< std::mutex > &)
Definition: ShardArchiveHandler.cpp:444
ripple::Config
Definition: Config.h:66
ripple::Application::config
virtual Config & config()=0
std::to_string
T to_string(T... args)
ripple::Application::getJobQueue
virtual JobQueue & getJobQueue()=0
ripple::parseUrl
bool parseUrl(parsedURL &pUrl, std::string const &strUrl)
Definition: StringUtilities.cpp:55
beast::Journal::error
Stream error() const
Definition: Journal.h:333
ripple::RPC::ShardArchiveHandler::sqliteDB_
std::unique_ptr< DatabaseCon > sqliteDB_
Definition: ShardArchiveHandler.h:131
ripple::Job
Definition: Job.h:82
ripple::RPC::ShardArchiveHandler::complete
void complete(boost::filesystem::path dstPath)
Definition: ShardArchiveHandler.cpp:347
std::uint32_t
ripple::RPC::ShardArchiveHandler::j_
const beast::Journal j_
Definition: ShardArchiveHandler.h:130
memory
ripple::Application::getIOService
virtual boost::asio::io_service & getIOService()=0
ripple::stateDBName
static constexpr auto stateDBName
Definition: DBInit.h:173
ripple::NodeStore::DatabaseShard::importShard
virtual bool importShard(std::uint32_t shardIndex, boost::filesystem::path const &srcDir)=0
Import a shard into the shard database.
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::RPC::ShardArchiveHandler::archives_
std::map< std::uint32_t, parsedURL > archives_
Definition: ShardArchiveHandler.h:136
ripple::NodeStore::DatabaseShard::prepareShard
virtual bool prepareShard(std::uint32_t shardIndex)=0
Prepare a shard index to be imported into the database.
ripple::RPC::ShardArchiveHandler::app_
Application & app_
Definition: ShardArchiveHandler.h:129
ripple::RPC::ShardArchiveHandler::hasInstance
static bool hasInstance()
Definition: ShardArchiveHandler.cpp:83
ripple::RPC::ShardArchiveHandler
Handles the download and import one or more shard archives.
Definition: ShardArchiveHandler.h:38
ripple::RPC::ShardArchiveHandler::ShardArchiveHandler
ShardArchiveHandler()=delete
std::mutex
STL class.
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
ripple::RPC::ShardArchiveHandler::instance_mutex_
static std::mutex instance_mutex_
Definition: ShardArchiveHandler.h:125
std::size_t
ripple::RPC::ShardArchiveHandler::m_
std::mutex m_
Definition: ShardArchiveHandler.h:128
ripple::DatabaseDownloader
Definition: DatabaseDownloader.h:28
ripple::NodeStore::DatabaseShard::removePreShard
virtual void removePreShard(std::uint32_t shardIndex)=0
Remove a previously prepared shard index for import.
ripple::RPC::ShardArchiveHandler::process_
bool process_
Definition: ShardArchiveHandler.h:135
ripple::RPC::ShardArchiveHandler::init
bool init()
Definition: ShardArchiveHandler.cpp:109
ripple::RPC::ShardArchiveHandler::getInstance
static pointer getInstance()
Definition: ShardArchiveHandler.cpp:52
std::exception::what
T what(T... args)
ripple::RPC::ShardArchiveHandler::next
bool next(std::lock_guard< std::mutex > &l)
Definition: ShardArchiveHandler.cpp:295
ripple::get
T & get(EitherAmount &amt)
Definition: AmountSpec.h:116
ripple::BasicConfig::section
Section & section(std::string const &name)
Returns the section with the given name.
Definition: BasicConfig.cpp:138
ripple::OperatingMode::FULL
@ FULL
we have the ledger and can even validate