From 658f904ce0fb3a07d72fc9c3dcdfd5a13cb54af2 Mon Sep 17 00:00:00 2001 From: Miguel Portilla Date: Fri, 18 May 2018 09:56:46 -0400 Subject: [PATCH] Add shard import support to shard database --- src/ripple/app/ledger/impl/LedgerMaster.cpp | 2 +- src/ripple/app/main/Application.cpp | 2 +- src/ripple/nodestore/Backend.h | 3 +- src/ripple/nodestore/DatabaseShard.h | 67 +++- .../nodestore/backend/MemoryFactory.cpp | 2 +- src/ripple/nodestore/backend/NuDBFactory.cpp | 24 +- src/ripple/nodestore/backend/NullFactory.cpp | 2 +- .../nodestore/backend/RocksDBFactory.cpp | 4 +- .../nodestore/backend/RocksDBQuickFactory.cpp | 2 +- .../nodestore/impl/DatabaseShardImp.cpp | 354 +++++++++++++++--- src/ripple/nodestore/impl/DatabaseShardImp.h | 51 ++- src/ripple/nodestore/impl/Shard.cpp | 139 +++---- src/ripple/nodestore/impl/Shard.h | 21 +- 13 files changed, 512 insertions(+), 161 deletions(-) diff --git a/src/ripple/app/ledger/impl/LedgerMaster.cpp b/src/ripple/app/ledger/impl/LedgerMaster.cpp index 8a35d5306..c84d10e1a 100644 --- a/src/ripple/app/ledger/impl/LedgerMaster.cpp +++ b/src/ripple/app/ledger/impl/LedgerMaster.cpp @@ -1697,7 +1697,7 @@ void LedgerMaster::doAdvance (ScopedLockType& sl) { if (auto shardStore = app_.getShardStore()) { - missing = shardStore->prepare(mValidLedgerSeq); + missing = shardStore->prepareLedger(mValidLedgerSeq); if (missing) reason = InboundLedger::Reason::SHARD; } diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp index 261bfbf7e..6117d2d31 100644 --- a/src/ripple/app/main/Application.cpp +++ b/src/ripple/app/main/Application.cpp @@ -2151,7 +2151,7 @@ bool ApplicationImp::nodeToShards() "Invalid [shard_db] configuration"; return false; } - shardStore_->importNodeStore(); + shardStore_->import(getNodeStore()); return true; } diff --git a/src/ripple/nodestore/Backend.h b/src/ripple/nodestore/Backend.h index 98872db3c..7b76806f6 100644 --- a/src/ripple/nodestore/Backend.h +++ b/src/ripple/nodestore/Backend.h @@ -51,9 +51,10 @@ public: virtual std::string getName() = 0; /** Open the backend. + @param createIfMissing Create the database files if necessary. This allows the caller to catch exceptions. */ - virtual void open() = 0; + virtual void open(bool createIfMissing = true) = 0; /** Close the backend. This allows the caller to catch exceptions. diff --git a/src/ripple/nodestore/DatabaseShard.h b/src/ripple/nodestore/DatabaseShard.h index 75e1f2c87..a7fa7de0a 100644 --- a/src/ripple/nodestore/DatabaseShard.h +++ b/src/ripple/nodestore/DatabaseShard.h @@ -22,6 +22,7 @@ #include #include +#include #include #include @@ -64,20 +65,58 @@ public: bool init() = 0; - /** Prepare to store a new ledger in the shard + /** Prepare to store a new ledger in the shard being acquired - @param validLedgerSeq the index of the maximum valid ledgers - @return if a ledger should be fetched and stored, then returns the ledger + @param validLedgerSeq The index of the maximum valid ledgers + @return If a ledger should be fetched and stored, then returns the ledger index of the ledger to request. Otherwise returns boost::none. - Some reasons this may return boost::none are: this database does - not store shards, all shards are are stored and full, max allowed - disk space would be exceeded, or a ledger was recently requested - and not enough time has passed between requests. + Some reasons this may return boost::none are: all shards are + stored and full, max allowed disk space would be exceeded, or a + ledger was recently requested and not enough time has passed + between requests. @implNote adds a new writable shard if necessary */ virtual boost::optional - prepare(std::uint32_t validLedgerSeq) = 0; + prepareLedger(std::uint32_t validLedgerSeq) = 0; + + /** Prepare a shard index to be imported into the database + + @param shardIndex Shard index to be prepared for import + @return true if shard index successfully prepared for import + */ + virtual + bool + prepareShard(std::uint32_t shardIndex) = 0; + + /** Remove shard indexes from prepared import + + @param indexes Shard indexes to be removed from import + */ + virtual + void + removePreShard(std::uint32_t shardIndex) = 0; + + /** Get shard indexes being imported + + @return The number of shards prepared for import + */ + virtual + std::uint32_t + getNumPreShard() = 0; + + /** Import a shard into the shard database + + @param shardIndex Shard index to import + @param srcDir The directory to import from + @param validate If true validate shard ledger data + @return true If the shard was successfully imported + @implNote if successful, srcDir is moved to the database directory + */ + virtual + bool + importShard(std::uint32_t shardIndex, + boost::filesystem::path const& srcDir, bool validate) = 0; /** Fetch a ledger from the shard store @@ -123,12 +162,6 @@ public: void validate() = 0; - /** Import the node store into the shard store. - */ - virtual - void - importNodeStore() = 0; - /** @return The maximum number of ledgers stored in a shard */ virtual @@ -168,6 +201,12 @@ public: std::uint32_t lastLedgerSeq(std::uint32_t shardIndex) const = 0; + /** Returns the root database directory + */ + virtual + boost::filesystem::path const& + getRootDir() const = 0; + /** The number of ledgers in a shard */ static constexpr std::uint32_t ledgersPerShardDefault {16384u}; }; diff --git a/src/ripple/nodestore/backend/MemoryFactory.cpp b/src/ripple/nodestore/backend/MemoryFactory.cpp index 3ed6ad080..609117d76 100644 --- a/src/ripple/nodestore/backend/MemoryFactory.cpp +++ b/src/ripple/nodestore/backend/MemoryFactory.cpp @@ -105,7 +105,7 @@ public: } void - open() override + open(bool createIfMissing) override { db_ = &memoryFactory.open(name_); } diff --git a/src/ripple/nodestore/backend/NuDBFactory.cpp b/src/ripple/nodestore/backend/NuDBFactory.cpp index 546342139..19f8eae3f 100644 --- a/src/ripple/nodestore/backend/NuDBFactory.cpp +++ b/src/ripple/nodestore/backend/NuDBFactory.cpp @@ -81,8 +81,9 @@ public: } void - open() override + open(bool createIfMissing) override { + using namespace boost::filesystem; if (db_.is_open()) { assert(false); @@ -90,19 +91,22 @@ public: "database is already open"; return; } - auto const folder = boost::filesystem::path(name_); - boost::filesystem::create_directories (folder); + auto const folder = path(name_); auto const dp = (folder / "nudb.dat").string(); auto const kp = (folder / "nudb.key").string(); auto const lp = (folder / "nudb.log").string(); nudb::error_code ec; - nudb::create(dp, kp, lp, - currentType, nudb::make_salt(), keyBytes_, - nudb::block_size(kp), 0.50, ec); - if(ec == nudb::errc::file_exists) - ec = {}; - if(ec) - Throw(ec); + if (createIfMissing) + { + create_directories(folder); + nudb::create(dp, kp, lp, + currentType, nudb::make_salt(), keyBytes_, + nudb::block_size(kp), 0.50, ec); + if(ec == nudb::errc::file_exists) + ec = {}; + if(ec) + Throw(ec); + } db_.open (dp, kp, lp, ec); if(ec) Throw(ec); diff --git a/src/ripple/nodestore/backend/NullFactory.cpp b/src/ripple/nodestore/backend/NullFactory.cpp index 583367210..c35b08b01 100644 --- a/src/ripple/nodestore/backend/NullFactory.cpp +++ b/src/ripple/nodestore/backend/NullFactory.cpp @@ -39,7 +39,7 @@ public: } void - open() override + open(bool createIfMissing) override { } diff --git a/src/ripple/nodestore/backend/RocksDBFactory.cpp b/src/ripple/nodestore/backend/RocksDBFactory.cpp index fd348a453..04c4fe6f4 100644 --- a/src/ripple/nodestore/backend/RocksDBFactory.cpp +++ b/src/ripple/nodestore/backend/RocksDBFactory.cpp @@ -113,7 +113,6 @@ public: Throw ("Missing path in RocksDBFactory backend"); rocksdb::BlockBasedTableOptions table_options; - m_options.create_if_missing = true; m_options.env = env; if (keyValues.exists ("cache_mb")) @@ -208,7 +207,7 @@ public: } void - open() override + open(bool createIfMissing) override { if (m_db) { @@ -218,6 +217,7 @@ public: return; } rocksdb::DB* db = nullptr; + m_options.create_if_missing = createIfMissing; rocksdb::Status status = rocksdb::DB::Open(m_options, m_name, &db); if (!status.ok() || !db) Throw( diff --git a/src/ripple/nodestore/backend/RocksDBQuickFactory.cpp b/src/ripple/nodestore/backend/RocksDBQuickFactory.cpp index 9e9a94f73..a15e3b1d7 100644 --- a/src/ripple/nodestore/backend/RocksDBQuickFactory.cpp +++ b/src/ripple/nodestore/backend/RocksDBQuickFactory.cpp @@ -179,7 +179,7 @@ public: } void - open() override + open(bool createIfMissing) override { if (m_db) { diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.cpp b/src/ripple/nodestore/impl/DatabaseShardImp.cpp index 89dc28d0b..70cd13b54 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.cpp +++ b/src/ripple/nodestore/impl/DatabaseShardImp.cpp @@ -20,9 +20,10 @@ #include #include -#include +#include #include #include +#include #include #include @@ -38,6 +39,8 @@ DatabaseShardImp::DatabaseShardImp(Application& app, , app_(app) , config_(config) , dir_(get(config, "path")) + , backendName_(Manager::instance().find( + get(config_, "type"))->getName()) , maxDiskSpace_(get(config, "max_size_gb") << 30) , ledgersPerShard_(get( config, "ledgers_per_shard", ledgersPerShardDefault)) @@ -58,15 +61,16 @@ DatabaseShardImp::~DatabaseShardImp() bool DatabaseShardImp::init() { + using namespace boost::filesystem; std::lock_guard l(m_); if (init_) { + assert(false); JLOG(j_.error()) << "Already initialized"; return false; } - using namespace boost::filesystem; // Find backend type and file handle requirement try { @@ -93,13 +97,19 @@ DatabaseShardImp::init() { if (!is_directory(d)) continue; + + // Validate shard directory name is numeric auto dirName = d.path().stem().string(); - if (!std::all_of(dirName.begin(), dirName.end(), - [](auto c) - { - return ::isdigit(static_cast(c)); - })) + if (!std::all_of( + dirName.begin(), + dirName.end(), + [](auto c){ + return ::isdigit(static_cast(c)); + })) + { continue; + } + auto const shardIndex {std::stoul(dirName)}; if (shardIndex < earliestShardIndex()) { @@ -116,13 +126,14 @@ DatabaseShardImp::init() JLOG(j_.warn()) << "shard " << shardIndex << " previously failed import, removing"; - remove_all(dir_ / std::to_string(shardIndex)); + if (!this->remove(dir_ / std::to_string(shardIndex))) + return false; continue; } auto shard = std::make_unique( *this, shardIndex, cacheSz_, cacheAge_, j_); - if (!shard->open(config_, scheduler_, dir_)) + if (!shard->open(config_, scheduler_)) return false; usedDiskSpace_ += shard->fileSize(); if (shard->complete()) @@ -156,7 +167,7 @@ DatabaseShardImp::init() } boost::optional -DatabaseShardImp::prepare(std::uint32_t validLedgerSeq) +DatabaseShardImp::prepareLedger(std::uint32_t validLedgerSeq) { std::lock_guard l(m_); assert(init_); @@ -166,7 +177,7 @@ DatabaseShardImp::prepare(std::uint32_t validLedgerSeq) return boost::none; if (backed_) { - // Create a new shard to acquire + // Check available disk space if (usedDiskSpace_ + avgShardSz_ > maxDiskSpace_) { JLOG(j_.debug()) << @@ -176,7 +187,7 @@ DatabaseShardImp::prepare(std::uint32_t validLedgerSeq) } if (avgShardSz_ > boost::filesystem::space(dir_).free) { - JLOG(j_.warn()) << + JLOG(j_.error()) << "Insufficient disk space"; canAdd_ = false; return boost::none; @@ -197,15 +208,213 @@ DatabaseShardImp::prepare(std::uint32_t validLedgerSeq) 1, static_cast(complete_.size() + 1)))}; incomplete_ = std::make_unique( *this, *shardIndex, sz, cacheAge_, j_); - if (!incomplete_->open(config_, scheduler_, dir_)) + if (!incomplete_->open(config_, scheduler_)) { incomplete_.reset(); - remove_all(dir_ / std::to_string(*shardIndex)); + this->remove(dir_ / std::to_string(*shardIndex)); return boost::none; } return incomplete_->prepare(); } +bool +DatabaseShardImp::prepareShard(std::uint32_t shardIndex) +{ + std::lock_guard l(m_); + assert(init_); + if (!canAdd_) + { + JLOG(j_.error()) << + "Unable to add more shards to the database"; + return false; + } + + if (shardIndex < earliestShardIndex()) + { + JLOG(j_.error()) << + "Invalid shard index " << shardIndex; + return false; + } + + // If we are synced to the network, check if the shard index + // is greater or equal to the current shard. + auto seqCheck = [&](std::uint32_t seq) + { + // seq will be greater than zero if valid + if (seq > earliestSeq() && shardIndex >= seqToShardIndex(seq)) + { + JLOG(j_.error()) << + "Invalid shard index " << shardIndex; + return false; + } + return true; + }; + if (!seqCheck(app_.getLedgerMaster().getValidLedgerIndex()) || + !seqCheck(app_.getLedgerMaster().getCurrentLedgerIndex())) + { + return false; + } + if (complete_.find(shardIndex) != complete_.end()) + { + JLOG(j_.debug()) << + "Shard index " << shardIndex << + " stored"; + return false; + } + if (incomplete_ && incomplete_->index() == shardIndex) + { + JLOG(j_.debug()) << + "Shard index " << shardIndex << + " is being acquired"; + return false; + } + if (preShards_.find(shardIndex) != preShards_.end()) + { + JLOG(j_.debug()) << + "Shard index " << shardIndex << + " is prepared for import"; + return false; + } + + // Check limit and space requirements + if (backed_) + { + std::uint64_t const sz { + (preShards_.size() + 1 + (incomplete_ ? 1 : 0)) * avgShardSz_}; + if (usedDiskSpace_ + sz > maxDiskSpace_) + { + JLOG(j_.debug()) << + "Exceeds maximum size"; + return false; + } + if (sz > space(dir_).free) + { + JLOG(j_.error()) << + "Insufficient disk space"; + return false; + } + } + + // Add to shards prepared + preShards_.emplace(shardIndex, nullptr); + return true; +} + +void +DatabaseShardImp::removePreShard(std::uint32_t shardIndex) +{ + std::lock_guard l(m_); + assert(init_); + preShards_.erase(shardIndex); +} + +std::uint32_t +DatabaseShardImp::getNumPreShard() +{ + std::lock_guard l(m_); + assert(init_); + return preShards_.size(); +} + +bool +DatabaseShardImp::importShard(std::uint32_t shardIndex, + boost::filesystem::path const& srcDir, bool validate) +{ + using namespace boost::filesystem; + if (!is_directory(srcDir) || is_empty(srcDir)) + { + JLOG(j_.error()) << + "Invalid source directory " << srcDir.string(); + return false; + } + + auto move = [&](path const& src, path const& dst) + { + try + { + rename(src, dst); + } + catch (const filesystem_error& e) + { + JLOG(j_.error()) << + "rename " << src.string() << + " to " << dst.string() << + ": Exception, " << e.code().message(); + return false; + } + return true; + }; + + std::unique_lock l(m_); + assert(init_); + + // Check shard is prepared + auto it {preShards_.find(shardIndex)}; + if(it == preShards_.end()) + { + JLOG(j_.error()) << + "Invalid shard index " << std::to_string(shardIndex); + return false; + } + + // Move source directory to the shard database directory + auto const dstDir {dir_ / std::to_string(shardIndex)}; + if (!move(srcDir, dstDir)) + return false; + + // Create the new shard + auto shard {std::make_unique( + *this, shardIndex, cacheSz_, cacheAge_, j_)}; + auto fail = [&](std::string msg) + { + if (!msg.empty()) + { + JLOG(j_.error()) << msg; + } + shard.release(); + move(dstDir, srcDir); + return false; + }; + if (!shard->open(config_, scheduler_)) + return fail({}); + if (!shard->complete()) + return fail("Incomplete shard"); + + // Verify database integrity + try + { + shard->getBackend()->verify(); + } + catch (std::exception const& e) + { + return fail(std::string("Verify: Exception, ") + e.what()); + } + + // Validate shard ledgers + if (validate) + { + // Shard validation requires releasing the lock + // so the database can fetch data from it + it->second = shard.get(); + l.unlock(); + auto valid {shard->validate(app_)}; + l.lock(); + if (!valid) + { + it = preShards_.find(shardIndex); + if(it != preShards_.end()) + it->second = nullptr; + return fail({}); + } + } + + // Add the shard + usedDiskSpace_ += shard->fileSize(); + complete_.emplace(shardIndex, std::move(shard)); + preShards_.erase(shardIndex); + return true; +} + std::shared_ptr DatabaseShardImp::fetchLedger(uint256 const& hash, std::uint32_t seq) { @@ -320,7 +529,7 @@ DatabaseShardImp::validate() assert(init_); if (complete_.empty() && !incomplete_) { - JLOG(j_.fatal()) << + JLOG(j_.error()) << "No shards to validate"; return; } @@ -332,7 +541,7 @@ DatabaseShardImp::validate() s += std::to_string(incomplete_->index()); else s.pop_back(); - JLOG(j_.fatal()) << s; + JLOG(j_.debug()) << s; } for (auto& e : complete_) @@ -349,10 +558,20 @@ DatabaseShardImp::validate() } void -DatabaseShardImp::importNodeStore() +DatabaseShardImp::import(Database& source) { std::unique_lock l(m_); assert(init_); + + // Only the application local node store can be imported + if (&source != &app_.getNodeStore()) + { + assert(false); + JLOG(j_.error()) << + "Invalid source database"; + return; + } + std::uint32_t earliestIndex; std::uint32_t latestIndex; { @@ -446,7 +665,7 @@ DatabaseShardImp::importNodeStore() bool valid {true}; for (std::uint32_t n = firstSeq; n <= lastSeq; n += 256) { - if (!app_.getNodeStore().fetch(ledgerHashes[n].first, n)) + if (!source.fetch(ledgerHashes[n].first, n)) { JLOG(j_.warn()) << "SQL DB ledger sequence " << n << @@ -461,25 +680,26 @@ DatabaseShardImp::importNodeStore() // Create the new shard app_.shardFamily()->reset(); + auto const shardDir {dir_ / std::to_string(shardIndex)}; auto shard = std::make_unique( *this, shardIndex, shardCacheSz, cacheAge_, j_); - if (!shard->open(config_, scheduler_, dir_)) + if (!shard->open(config_, scheduler_)) { shard.reset(); - remove_all(dir_ / std::to_string(shardIndex)); + this->remove(shardDir); continue; } // Create a marker file to signify an import in progress - auto f {dir_ / std::to_string(shardIndex) / importMarker_}; - std::ofstream ofs {f.string()}; + auto const markerFile {shardDir / importMarker_}; + std::ofstream ofs {markerFile.string()}; if (!ofs.is_open()) { JLOG(j_.error()) << "shard " << shardIndex << - " unable to create temp file"; + " unable to create temp marker file"; shard.reset(); - remove_all(dir_ / std::to_string(shardIndex)); + this->remove(shardDir); continue; } ofs.close(); @@ -504,10 +724,10 @@ DatabaseShardImp::importNodeStore() if (shard->complete()) { - remove(f); JLOG(j_.debug()) << "shard " << shardIndex << " successfully imported"; + this->remove(markerFile); break; } } @@ -518,7 +738,7 @@ DatabaseShardImp::importNodeStore() "shard " << shardIndex << " failed to import"; shard.reset(); - remove_all(dir_ / std::to_string(shardIndex)); + this->remove(shardDir); } } @@ -725,20 +945,31 @@ DatabaseShardImp::sweep() std::shared_ptr DatabaseShardImp::fetchFrom(uint256 const& hash, std::uint32_t seq) { - std::shared_ptr backend; auto const shardIndex {seqToShardIndex(seq)}; + std::unique_lock l(m_); + assert(init_); { - std::lock_guard l(m_); - assert(init_); auto it = complete_.find(shardIndex); if (it != complete_.end()) - backend = it->second->getBackend(); - else if (incomplete_ && incomplete_->index() == shardIndex) - backend = incomplete_->getBackend(); - else - return {}; + { + l.unlock(); + return fetchInternal(hash, *it->second->getBackend()); + } } - return fetchInternal(hash, *backend); + if (incomplete_ && incomplete_->index() == shardIndex) + { + l.unlock(); + return fetchInternal(hash, *incomplete_->getBackend()); + } + + // Used to validate import shards + auto it = preShards_.find(shardIndex); + if (it != preShards_.end() && it->second) + { + l.unlock(); + return fetchInternal(hash, *it->second->getBackend()); + } + return {}; } boost::optional @@ -763,7 +994,8 @@ DatabaseShardImp::findShardIndexToAdd( for (std::uint32_t i = earliestShardIndex(); i <= maxShardIndex; ++i) { if (complete_.find(i) == complete_.end() && - (!incomplete_ || incomplete_->index() != i)) + (!incomplete_ || incomplete_->index() != i) && + preShards_.find(i) == preShards_.end()) available.push_back(i); } if (!available.empty()) @@ -776,9 +1008,10 @@ DatabaseShardImp::findShardIndexToAdd( // chances of running more than 30 times is less than 1 in a billion for (int i = 0; i < 40; ++i) { - auto const r = rand_int(earliestShardIndex(), maxShardIndex); + auto const r {rand_int(earliestShardIndex(), maxShardIndex)}; if (complete_.find(r) == complete_.end() && - (!incomplete_ || incomplete_->index() != r)) + (!incomplete_ || incomplete_->index() != r) && + preShards_.find(r) == preShards_.end()) return r; } assert(0); @@ -850,21 +1083,48 @@ DatabaseShardImp::updateStats(std::lock_guard&) std::pair, std::shared_ptr> DatabaseShardImp::selectCache(std::uint32_t seq) { - std::pair, - std::shared_ptr> cache; auto const shardIndex {seqToShardIndex(seq)}; + std::lock_guard l(m_); + assert(init_); { - std::lock_guard l(m_); - assert(init_); auto it = complete_.find(shardIndex); if (it != complete_.end()) - cache = std::make_pair(it->second->pCache(), + { + return std::make_pair(it->second->pCache(), it->second->nCache()); - else if (incomplete_ && incomplete_->index() == shardIndex) - cache = std::make_pair(incomplete_->pCache(), - incomplete_->nCache()); + } } - return cache; + if (incomplete_ && incomplete_->index() == shardIndex) + { + return std::make_pair(incomplete_->pCache(), + incomplete_->nCache()); + } + + // Used to validate import shards + auto it = preShards_.find(shardIndex); + if (it != preShards_.end() && it->second) + { + return std::make_pair(it->second->pCache(), + it->second->nCache()); + } + return {}; +} + +bool +DatabaseShardImp::remove(boost::filesystem::path const& path) +{ + try + { + boost::filesystem::remove_all(path); + } + catch (const boost::filesystem::filesystem_error& e) + { + JLOG(j_.error()) << + "remove_all " << path.string() << + ": Exception, " << e.code().message(); + return false; + } + return true; } } // NodeStore diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.h b/src/ripple/nodestore/impl/DatabaseShardImp.h index 357d784cf..9231d19e0 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.h +++ b/src/ripple/nodestore/impl/DatabaseShardImp.h @@ -43,7 +43,20 @@ public: init() override; boost::optional - prepare(std::uint32_t validLedgerSeq) override; + prepareLedger(std::uint32_t validLedgerSeq) override; + + bool + prepareShard(std::uint32_t shardIndex) override; + + void + removePreShard(std::uint32_t shardIndex) override; + + std::uint32_t + getNumPreShard() override; + + bool + importShard(std::uint32_t shardIndex, + boost::filesystem::path const& srcDir, bool validate) override; std::shared_ptr fetchLedger(uint256 const& hash, std::uint32_t seq) override; @@ -60,9 +73,6 @@ public: void validate() override; - void - importNodeStore() override; - std::uint32_t ledgersPerShard() const override { @@ -98,17 +108,24 @@ public: return (shardIndex + 1) * ledgersPerShard_; } + boost::filesystem::path const& + getRootDir() const override + { + return dir_; + } + std::string getName() const override { - return "shardstore"; + return backendName_; } + /** Import the application local node store + + @param source The application node store. + */ void - import(Database& source) override - { - Throw("Shard store import not supported"); - } + import(Database& source) override; std::int32_t getWriteLoad() const override; @@ -143,10 +160,18 @@ private: Application& app_; mutable std::mutex m_; bool init_ {false}; + + // Complete shards std::map> complete_; + + // A shard being acquired from the peer network std::unique_ptr incomplete_; + + // Shards prepared for import + std::map preShards_; + Section const config_; - boost::filesystem::path dir_; + boost::filesystem::path const dir_; // If new shards can be stored bool canAdd_ {true}; @@ -157,6 +182,9 @@ private: // If backend type uses permanent storage bool backed_; + // The name associated with the backend used with the shard store + std::string const backendName_; + // Maximum disk space the DB can use (in bytes) std::uint64_t const maxDiskSpace_; @@ -212,6 +240,9 @@ private: return std::max(shardCacheSz, cacheSz_ / std::max( 1, static_cast(complete_.size() + (incomplete_ ? 1 : 0)))); } + + bool + remove(boost::filesystem::path const& path); }; } // NodeStore diff --git a/src/ripple/nodestore/impl/Shard.cpp b/src/ripple/nodestore/impl/Shard.cpp index 62d7aabdf..10fd36d05 100644 --- a/src/ripple/nodestore/impl/Shard.cpp +++ b/src/ripple/nodestore/impl/Shard.cpp @@ -40,6 +40,8 @@ Shard::Shard(DatabaseShard const& db, std::uint32_t index, , nCache_(std::make_shared( "shard " + std::to_string(index_), stopwatch(), cacheSz, cacheAge)) + , dir_(db.getRootDir() / std::to_string(index_)) + , control_(dir_ / controlFileName) , j_(j) { if (index_ < db.earliestShardIndex()) @@ -47,66 +49,64 @@ Shard::Shard(DatabaseShard const& db, std::uint32_t index, } bool -Shard::open(Section config, Scheduler& scheduler, - boost::filesystem::path dir) +Shard::open(Section config, Scheduler& scheduler) { assert(!backend_); using namespace boost::filesystem; - dir_ = dir / std::to_string(index_); + auto const newShard {!is_directory(dir_) || is_empty(dir_)}; + auto fail = [&](std::string msg) + { + if (!msg.empty()) + { + JLOG(j_.error()) << msg; + } + if (newShard) + this->remove(dir_); + return false; + }; + config.set("path", dir_.string()); - auto newShard {!is_directory(dir_) || is_empty(dir_)}; try { backend_ = Manager::instance().make_Backend( config, scheduler, j_); - backend_->open(); + backend_->open(newShard); } catch (std::exception const& e) { - JLOG(j_.error()) << - "shard " << index_ << - " exception: " << e.what(); - return false; + return fail("shard " + std::to_string(index_) + + ": Exception, " + e.what()); } if (backend_->fdlimit() == 0) return true; - control_ = dir_ / controlFileName; if (newShard) { if (!saveControl()) - return false; + return fail({}); } else if (is_regular_file(control_)) { std::ifstream ifs(control_.string()); if (!ifs.is_open()) - { - JLOG(j_.error()) << - "shard " << index_ << - " unable to open control file"; - return false; - } + return fail("shard " + std::to_string(index_) + + ": Unable to open control file"); boost::archive::text_iarchive ar(ifs); ar & storedSeqs_; if (!storedSeqs_.empty()) { if (boost::icl::first(storedSeqs_) < firstSeq_ || boost::icl::last(storedSeqs_) > lastSeq_) - { - JLOG(j_.error()) << - "shard " << index_ << - " invalid control file"; - return false; - } + return fail("shard " + std::to_string(index_) + + ": Invalid control file"); if (boost::icl::length(storedSeqs_) >= maxLedgers_) { JLOG(j_.error()) << "shard " << index_ << " found control file for complete shard"; storedSeqs_.clear(); - remove(control_); + this->remove(control_); complete_ = true; } } @@ -133,7 +133,8 @@ Shard::setStored(std::shared_ptr const& l) { if (backend_->fdlimit() != 0) { - remove(control_); + if (!this->remove(control_)) + return false; updateFileSize(); } complete_ = true; @@ -178,7 +179,7 @@ Shard::contains(std::uint32_t seq) const return boost::icl::contains(storedSeqs_, seq); } -void +bool Shard::validate(Application& app) { uint256 hash; @@ -191,10 +192,10 @@ Shard::validate(Application& app) " order by LedgerSeq desc limit 1", app, false); if (!l) { - JLOG(j_.fatal()) << + JLOG(j_.error()) << "shard " << index_ << " unable to validate. No lookup data"; - return; + return false; } if (seq != lastSeq_) { @@ -206,23 +207,23 @@ Shard::validate(Application& app) } catch (std::exception const& e) { - JLOG(j_.fatal()) << + JLOG(j_.error()) << "exception: " << e.what(); - return; + return false; } if (!h) { - JLOG(j_.fatal()) << + JLOG(j_.error()) << "shard " << index_ << " No hash for last ledger seq " << lastSeq_; - return; + return false; } hash = *h; seq = lastSeq_; } } - JLOG(j_.fatal()) << + JLOG(j_.debug()) << "Validating shard " << index_ << " ledgers " << firstSeq_ << "-" << lastSeq_; @@ -244,7 +245,7 @@ Shard::validate(Application& app) true), app.config(), *app.shardFamily()); if (l->info().hash != hash || l->info().seq != seq) { - JLOG(j_.fatal()) << + JLOG(j_.error()) << "ledger seq " << seq << " hash " << hash << " cannot be a ledger"; @@ -256,7 +257,7 @@ Shard::validate(Application& app) if (!l->stateMap().fetchRoot( SHAMapHash {l->info().accountHash}, nullptr)) { - JLOG(j_.fatal()) << + JLOG(j_.error()) << "ledger seq " << seq << " missing Account State root"; break; @@ -266,7 +267,7 @@ Shard::validate(Application& app) if (!l->txMap().fetchRoot( SHAMapHash {l->info().txHash}, nullptr)) { - JLOG(j_.fatal()) << + JLOG(j_.error()) << "ledger seq " << seq << " missing TX root"; break; @@ -280,30 +281,25 @@ Shard::validate(Application& app) if (seq % 128 == 0) pCache_->sweep(); } - if (seq < firstSeq_) - { - JLOG(j_.fatal()) << - "shard " << index_ << - " is complete."; - } - else if (complete_) - { - JLOG(j_.fatal()) << - "shard " << index_ << - " is invalid, failed on seq " << seq << - " hash " << hash; - } - else - { - JLOG(j_.fatal()) << - "shard " << index_ << - " is incomplete, stopped at seq " << seq << - " hash " << hash; - } pCache_->reset(); nCache_->reset(); pCache_->setTargetAge(savedAge); + + if (seq >= firstSeq_) + { + JLOG(j_.error()) << + "shard " << index_ << + (complete_ ? " is invalid, failed" : " is incomplete, stopped") << + " at seq " << seq << + " hash " << hash; + return false; + } + + JLOG(j_.debug()) << + "shard " << index_ << + " is complete."; + return true; } bool @@ -312,7 +308,7 @@ Shard::valLedger(std::shared_ptr const& l, { if (l->info().hash.isZero() || l->info().accountHash.isZero()) { - JLOG(j_.fatal()) << + JLOG(j_.error()) << "invalid ledger"; return false; } @@ -340,7 +336,7 @@ Shard::valLedger(std::shared_ptr const& l, } catch (std::exception const& e) { - JLOG(j_.fatal()) << + JLOG(j_.error()) << "exception: " << e.what(); return false; } @@ -362,7 +358,7 @@ Shard::valLedger(std::shared_ptr const& l, } catch (std::exception const& e) { - JLOG(j_.fatal()) << + JLOG(j_.error()) << "exception: " << e.what(); return false; } @@ -385,26 +381,26 @@ Shard::valFetch(uint256 const& hash) break; case notFound: { - JLOG(j_.fatal()) << + JLOG(j_.error()) << "NodeObject not found. hash " << hash; break; } case dataCorrupt: { - JLOG(j_.fatal()) << + JLOG(j_.error()) << "NodeObject is corrupt. hash " << hash; break; } default: { - JLOG(j_.fatal()) << + JLOG(j_.error()) << "unknown error. hash " << hash; } } } catch (std::exception const& e) { - JLOG(j_.fatal()) << + JLOG(j_.error()) << "exception: " << e.what(); } return nObj; @@ -436,5 +432,22 @@ Shard::saveControl() return true; } +bool +Shard::remove(boost::filesystem::path const& path) +{ + try + { + boost::filesystem::remove_all(path); + } + catch (const boost::filesystem::filesystem_error& e) + { + JLOG(j_.error()) << + "remove_all " << path.string() << + ": Exception, " << e.code().message(); + return false; + } + return true; +} + } // NodeStore } // ripple diff --git a/src/ripple/nodestore/impl/Shard.h b/src/ripple/nodestore/impl/Shard.h index cf89f0e0b..9e7d81835 100644 --- a/src/ripple/nodestore/impl/Shard.h +++ b/src/ripple/nodestore/impl/Shard.h @@ -51,8 +51,7 @@ public: std::chrono::seconds cacheAge, beast::Journal& j); bool - open(Section config, Scheduler& scheduler, - boost::filesystem::path dir); + open(Section config, Scheduler& scheduler); bool setStored(std::shared_ptr const& l); @@ -63,7 +62,7 @@ public: bool contains(std::uint32_t seq) const; - void + bool validate(Application& app); std::uint32_t @@ -128,22 +127,22 @@ private: // Database negative cache std::shared_ptr nCache_; + // Path to database files + boost::filesystem::path const dir_; + + // Path to control file + boost::filesystem::path const control_; + std::uint64_t fileSize_ {0}; std::shared_ptr backend_; beast::Journal j_; - // Path to database files - boost::filesystem::path dir_; - // True if shard has its entire ledger range stored bool complete_ {false}; // Sequences of ledgers stored with an incomplete shard RangeSet storedSeqs_; - // Path to control file - boost::filesystem::path control_; - // Used as an optimization for visitDifferences std::shared_ptr lastStored_; @@ -165,6 +164,10 @@ private: // Save the control file for an incomplete shard bool saveControl(); + + // Remove directory or file + bool + remove(boost::filesystem::path const& path); }; } // NodeStore