From 22c9de487a6171c4ed8a48850b328d87c8a3c6bb Mon Sep 17 00:00:00 2001 From: Miguel Portilla Date: Fri, 9 Aug 2019 11:57:07 -0400 Subject: [PATCH] Add shard thread safety --- .../nodestore/impl/DatabaseShardImp.cpp | 207 ++++---- src/ripple/nodestore/impl/DatabaseShardImp.h | 5 +- src/ripple/nodestore/impl/Shard.cpp | 464 ++++++++++-------- src/ripple/nodestore/impl/Shard.h | 133 ++--- 4 files changed, 424 insertions(+), 385 deletions(-) diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.cpp b/src/ripple/nodestore/impl/DatabaseShardImp.cpp index b0e70a8e9d..c72ac264fb 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.cpp +++ b/src/ripple/nodestore/impl/DatabaseShardImp.cpp @@ -75,9 +75,9 @@ DatabaseShardImp::init() using namespace boost::beast::detail; std::lock_guard lock(m_); - auto fail = [this](std::string const& msg) + auto fail = [j = j_](std::string const& msg) { - JLOG(j_.error()) << + JLOG(j.error()) << "[" << ConfigSection::shardDatabase() << "] " << msg; return false; }; @@ -293,10 +293,9 @@ DatabaseShardImp::prepareShard(std::uint32_t shardIndex) std::lock_guard lock(m_); assert(init_); - auto fail = [this, shardIndex](std::string const& msg) + auto fail = [j = j_, shardIndex](std::string const& msg) { - JLOG(j_.error()) << - "shard " << shardIndex << " " << msg; + JLOG(j.error()) << "shard " << shardIndex << " " << msg; return false; }; @@ -421,78 +420,76 @@ DatabaseShardImp::importShard(std::uint32_t shardIndex, return true; }; + std::unique_lock lock(m_); + assert(init_); + + // Check shard is prepared + auto it {preShards_.find(shardIndex)}; + if(it == preShards_.end()) { - std::unique_lock lock(m_); - assert(init_); - - // Check shard is prepared - auto it {preShards_.find(shardIndex)}; - if(it == preShards_.end()) - { - JLOG(j_.error()) << "shard " << shardIndex << " is an invalid index"; - return false; - } - - // Move source directory to the shard database directory - auto const dstDir {dir_ / std::to_string(shardIndex)}; - if (!move(srcDir, dstDir)) - return false; - - // Create the new shard - auto shard {std::make_unique(app_, *this, shardIndex, j_)}; - auto fail = [&](std::string const& msg) - { - if (!msg.empty()) - { - JLOG(j_.error()) << "shard " << shardIndex << " " << msg; - } - shard.reset(); - move(dstDir, srcDir); - return false; - }; - - if (!shard->open(scheduler_, *ctx_)) - return fail({}); - if (!shard->complete()) - return fail("is incomplete"); - - try - { - // Verify database integrity - shard->getBackend()->verify(); - } - catch (std::exception const& e) - { - return fail(std::string("exception ") + - e.what() + " in function " + __func__); - } - - // Validate shard ledgers - if (validate) - { - // Shard validation requires releasing the lock - // so the database can fetch data from it - it->second = shard.get(); - lock.unlock(); - auto const valid {shard->validate()}; - lock.lock(); - if (!valid) - { - it = preShards_.find(shardIndex); - if(it != preShards_.end()) - it->second = nullptr; - return fail("failed validation"); - } - } - - // Add the shard - complete_.emplace(shardIndex, std::move(shard)); - preShards_.erase(shardIndex); + JLOG(j_.error()) << "shard " << shardIndex << " is an invalid index"; + return false; } - std::lock_guard lock(m_); - setFileStats(lock); - updateStatus(lock); + // Move source directory to the shard database directory + auto const dstDir {dir_ / std::to_string(shardIndex)}; + if (!move(srcDir, dstDir)) + return false; + + // Create the new shard + auto shard {std::make_unique(app_, *this, shardIndex, j_)}; + auto fail = [&](std::string const& msg) + { + if (!msg.empty()) + { + JLOG(j_.error()) << "shard " << shardIndex << " " << msg; + } + shard.reset(); + move(dstDir, srcDir); + return false; + }; + + if (!shard->open(scheduler_, *ctx_)) + return fail({}); + if (!shard->complete()) + return fail("is incomplete"); + + try + { + // Verify database integrity + shard->getBackend()->verify(); + } + catch (std::exception const& e) + { + return fail(std::string("exception ") + + e.what() + " in function " + __func__); + } + + // Validate shard ledgers + if (validate) + { + // Shard validation requires releasing the lock + // so the database can fetch data from it + it->second = shard.get(); + lock.unlock(); + auto const valid {shard->validate()}; + lock.lock(); + if (!valid) + { + it = preShards_.find(shardIndex); + if(it != preShards_.end()) + it->second = nullptr; + return fail("failed validation"); + } + } + + // Add the shard + complete_.emplace(shardIndex, std::move(shard)); + preShards_.erase(shardIndex); + + std::lock_guard lockg(*lock.release(), std::adopt_lock); + setFileStats(lockg); + updateStatus(lockg); return true; } @@ -552,9 +549,9 @@ void DatabaseShardImp::setStored(std::shared_ptr const& ledger) { auto const shardIndex {seqToShardIndex(ledger->info().seq)}; - auto fail = [this, shardIndex](std::string const& msg) + auto fail = [j = j_, shardIndex](std::string const& msg) { - JLOG(j_.error()) << "shard " << shardIndex << " " << msg; + JLOG(j.error()) << "shard " << shardIndex << " " << msg; }; if (ledger->info().hash.isZero()) @@ -622,36 +619,28 @@ DatabaseShardImp::getCompleteShards() void DatabaseShardImp::validate() { + std::vector> completeShards; { std::lock_guard lock(m_); assert(init_); - if (complete_.empty() && !incomplete_) + if (complete_.empty()) { JLOG(j_.error()) << "no shards found to validate"; return; } - std::string s {"Found shards "}; - for (auto const& e : complete_) - s += std::to_string(e.second->index()) + ","; - if (incomplete_) - s += std::to_string(incomplete_->index()); - else - s.pop_back(); - JLOG(j_.debug()) << s; + JLOG(j_.debug()) << "Validating shards " << status_; + + completeShards.reserve(complete_.size()); + for (auto const& shard : complete_) + completeShards.push_back(shard.second); } - for (auto& e : complete_) - { - app_.shardFamily()->reset(); - e.second->validate(); - } - if (incomplete_) - { - app_.shardFamily()->reset(); - incomplete_->validate(); - } + // Verify each complete stored shard + for (auto const& shard : completeShards) + shard->validate(); + app_.shardFamily()->reset(); } @@ -1116,29 +1105,15 @@ DatabaseShardImp::setFileStats(std::lock_guard&) void DatabaseShardImp::updateStatus(std::lock_guard&) { - status_.clear(); - status_.reserve(complete_.size() * 8); - for (auto it = complete_.begin(); it != complete_.end(); ++it) + if (!complete_.empty()) { - if (it == complete_.begin()) - status_ = std::to_string(it->first); - else - { - if (it->first - std::prev(it)->first > 1) - { - if (status_.back() == '-') - status_ += std::to_string(std::prev(it)->first); - status_ += ',' + std::to_string(it->first); - } - else - { - if (status_.back() != '-') - status_ += '-'; - if (std::next(it) == complete_.end()) - status_ += std::to_string(it->first); - } - } + RangeSet rs; + for (auto const& e : complete_) + rs.insert(e.second->index()); + status_ = to_string(rs); } + else + status_.clear(); } std::pair, std::shared_ptr> diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.h b/src/ripple/nodestore/impl/DatabaseShardImp.h index 40fb1d2a79..9d90e4d5ba 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.h +++ b/src/ripple/nodestore/impl/DatabaseShardImp.h @@ -171,7 +171,7 @@ private: std::unique_ptr ctx_; // Complete shards - std::map> complete_; + std::map> complete_; // A shard being acquired from the peer network std::unique_ptr incomplete_; @@ -226,7 +226,8 @@ private: // Finds a random shard index that is not stored // Lock must be held boost::optional - findShardIndexToAdd(std::uint32_t validLedgerSeq, + findShardIndexToAdd( + std::uint32_t validLedgerSeq, std::lock_guard&); // Set storage and file descriptor usage stats diff --git a/src/ripple/nodestore/impl/Shard.cpp b/src/ripple/nodestore/impl/Shard.cpp index 759ceceebd..6513beffe3 100644 --- a/src/ripple/nodestore/impl/Shard.cpp +++ b/src/ripple/nodestore/impl/Shard.cpp @@ -26,6 +26,8 @@ #include #include +#include +#include #include #include @@ -55,9 +57,8 @@ Shard::Shard( bool Shard::open(Scheduler& scheduler, nudb::context& ctx) { + std::lock_guard lock(mutex_); assert(!backend_); - using namespace boost::filesystem; - using namespace boost::beast::detail; Config const& config {app_.config()}; Section section {config.section(ConfigSection::shardDatabase())}; @@ -108,7 +109,7 @@ Shard::open(Scheduler& scheduler, nudb::context& ctx) if (!preexist) { // New shard, create a control file - if (!saveControl()) + if (!saveControl(lock)) return fail({}); } else if (is_regular_file(control_)) @@ -133,17 +134,19 @@ Shard::open(Scheduler& scheduler, nudb::context& ctx) JLOG(j_.warn()) << "shard " << index_ << " has a control file for complete shard"; - setComplete(); - remove_all(control_); + setComplete(lock); } } } else - setComplete(); + setComplete(lock); - setCache(); - if (!initSQLite() || !setFileStats()) - return fail({}); + if (!complete_) + { + setCache(lock); + if (!initSQLite(lock) ||!setFileStats(lock)) + return fail({}); + } } catch (std::exception const& e) { @@ -157,7 +160,9 @@ Shard::open(Scheduler& scheduler, nudb::context& ctx) bool Shard::setStored(std::shared_ptr const& ledger) { - assert(backend_&& !complete_); + std::lock_guard lock(mutex_); + assert(backend_ && !complete_); + if (boost::icl::contains(storedSeqs_, ledger->info().seq)) { JLOG(j_.debug()) << @@ -166,27 +171,16 @@ Shard::setStored(std::shared_ptr const& ledger) return false; } - if (!setSQLiteStored(ledger)) + if (!setSQLiteStored(ledger, lock)) return false; // Check if the shard is complete if (boost::icl::length(storedSeqs_) >= maxLedgers_ - 1) - { - setComplete(); - if (backend_->backed()) - { - if (!removeAll(control_, j_)) - return false; - - setCache(); - if (!initSQLite() || !setFileStats()) - return false; - } - } + setComplete(lock); else { storedSeqs_.insert(ledger->info().seq); - if (backend_->backed() && !saveControl()) + if (backend_->backed() && !saveControl(lock)) return false; } @@ -202,7 +196,9 @@ Shard::setStored(std::shared_ptr const& ledger) boost::optional Shard::prepare() { - assert(backend_); + std::lock_guard lock(mutex_); + assert(backend_ && !complete_); + if (storedSeqs_.empty()) return lastSeq_; return prevMissing(storedSeqs_, 1 + lastSeq_, firstSeq_); @@ -211,33 +207,102 @@ Shard::prepare() bool Shard::contains(std::uint32_t seq) const { - assert(backend_); if (seq < firstSeq_ || seq > lastSeq_) return false; - if (complete_) - return true; - return boost::icl::contains(storedSeqs_, seq); + + std::lock_guard lock(mutex_); + assert(backend_); + + return complete_ || boost::icl::contains(storedSeqs_, seq); } void Shard::sweep() { - assert(backend_); + std::lock_guard lock(mutex_); + assert(pCache_ && nCache_); + pCache_->sweep(); nCache_->sweep(); } +std::shared_ptr const& +Shard::getBackend() const +{ + std::lock_guard lock(mutex_); + assert(backend_); + + return backend_; +} + bool -Shard::validate() +Shard::complete() const +{ + std::lock_guard lock(mutex_); + assert(backend_); + + return complete_; +} + +std::shared_ptr +Shard::pCache() const +{ + std::lock_guard lock(mutex_); + assert(pCache_); + + return pCache_; +} + +std::shared_ptr +Shard::nCache() const +{ + std::lock_guard lock(mutex_); + assert(nCache_); + + return nCache_; +} + +std::uint64_t +Shard::fileSize() const +{ + std::lock_guard lock(mutex_); + assert(backend_); + + return fileSz_; +} + +std::uint32_t +Shard::fdRequired() const +{ + std::lock_guard lock(mutex_); + assert(backend_); + + return fdRequired_; +} + +std::shared_ptr +Shard::lastStored() const +{ + std::lock_guard lock(mutex_); + assert(backend_); + + return lastStored_; +} + +bool +Shard::validate() const { uint256 hash; - std::uint32_t seq; - std::shared_ptr ledger; - auto fail = [this](std::string const& msg) + std::uint32_t seq {0}; + auto fail = [j = j_, index = index_, &hash, &seq](std::string const& msg) { - JLOG(j_.error()) << "shard " << index_ << " " << msg; + JLOG(j.error()) << + "shard " << index << ". " << msg << + (hash.isZero() ? "" : ". Ledger hash " + to_string(hash)) << + (seq == 0 ? "" : ". Ledger sequence " + std::to_string(seq)); return false; }; + std::shared_ptr ledger; // Find the hash of the last ledger in this shard { @@ -245,13 +310,13 @@ Shard::validate() "WHERE LedgerSeq >= " + std::to_string(lastSeq_) + " order by LedgerSeq desc limit 1", app_, false); if (!ledger) - return fail("is unable to validate due to lacking lookup data"); + return fail("Unable to validate due to lacking lookup data"); if (seq != lastSeq_) { - ledger->setImmutable(app_.config()); boost::optional h; + ledger->setImmutable(app_.config()); try { h = hashOfSeq(*ledger, lastSeq_, j_); @@ -263,215 +328,92 @@ Shard::validate() } if (!h) - { - return fail("is missing hash for last ledger sequence " + - std::to_string(lastSeq_)); - } + return fail("Missing hash for last ledger sequence"); hash = *h; seq = lastSeq_; } } - JLOG(j_.debug()) << - "shard " << index_ << - " has ledger sequences " << firstSeq_ << "-" << lastSeq_; - - // Use a short age to keep memory consumption low - auto const savedAge {pCache_->getTargetAge()}; - using namespace std::chrono_literals; - pCache_->setTargetAge(1s); - // Validate every ledger stored in this shard std::shared_ptr next; while (seq >= firstSeq_) { auto nObj = valFetch(hash); if (!nObj) - break; + return fail("Invalid ledger"); + ledger = std::make_shared( - InboundLedger::deserializeHeader(makeSlice(nObj->getData()), - true), app_.config(), *app_.shardFamily()); + InboundLedger::deserializeHeader(makeSlice(nObj->getData()), true), + app_.config(), + *app_.shardFamily()); if (ledger->info().seq != seq) - { - fail("encountered invalid ledger sequence " + std::to_string(seq)); - break; - } + return fail("Invalid ledger header sequence"); if (ledger->info().hash != hash) - { - fail("encountered invalid ledger hash " + to_string(hash) + - " on sequence " + std::to_string(seq)); - break; - } + return fail("Invalid ledger header hash"); + ledger->stateMap().setLedgerSeq(seq); ledger->txMap().setLedgerSeq(seq); ledger->setImmutable(app_.config()); if (!ledger->stateMap().fetchRoot( SHAMapHash {ledger->info().accountHash}, nullptr)) { - fail("is missing root STATE node on sequence " + - std::to_string(seq)); - break; + return fail("Missing root STATE node"); } - if (ledger->info().txHash.isNonZero()) + if (ledger->info().txHash.isNonZero() && + !ledger->txMap().fetchRoot( + SHAMapHash {ledger->info().txHash}, + nullptr)) { - if (!ledger->txMap().fetchRoot( - SHAMapHash {ledger->info().txHash}, nullptr)) - { - fail("is missing root TXN node on sequence " + - std::to_string(seq)); - break; - } + return fail("Missing root TXN node"); } + if (!valLedger(ledger, next)) - break; + return false; + hash = ledger->info().parentHash; --seq; next = ledger; - if (seq % 128 == 0) - pCache_->sweep(); } - pCache_->reset(); - nCache_->reset(); - pCache_->setTargetAge(savedAge); - - if (seq >= firstSeq_) { - return fail(std::string(" is ") + - (complete_ ? "invalid, failed" : "incomplete, stopped") + - " on hash " + to_string(hash) + " on sequence " + - std::to_string(seq)); + std::lock_guard lock(mutex_); + pCache_->reset(); + nCache_->reset(); } - JLOG(j_.debug()) << - "shard " << index_ << " is valid and complete"; + JLOG(j_.debug()) << "shard " << index_ << " is valid"; return true; } bool -Shard::valLedger(std::shared_ptr const& ledger, - std::shared_ptr const& next) +Shard::setComplete(std::lock_guard const& lock) { - auto fail = [this](std::string const& msg) - { - JLOG(j_.error()) << "shard " << index_ << " " << msg; - return false; - }; - - if (ledger->info().hash.isZero()) - { - return fail("encountered a zero ledger hash on sequence " + - std::to_string(ledger->info().seq)); - } - if (ledger->info().accountHash.isZero()) - { - return fail("encountered a zero account hash on sequence " + - std::to_string(ledger->info().seq)); - } - - bool error {false}; - auto f = [this, &error](SHAMapAbstractNode& node) - { - if (!valFetch(node.getNodeHash().as_uint256())) - error = true; - return !error; - }; - - // Validate the state map - if (ledger->stateMap().getHash().isNonZero()) - { - if (!ledger->stateMap().isValid()) - { - return fail("has an invalid state map on sequence " + - std::to_string(ledger->info().seq)); - } - - try - { - if (next && next->info().parentHash == ledger->info().hash) - ledger->stateMap().visitDifferences(&next->stateMap(), f); - else - ledger->stateMap().visitNodes(f); - } - catch (std::exception const& e) - { - return fail(std::string("exception ") + - e.what() + " in function " + __func__); - } - if (error) - return false; - } - // Validate the transaction map - if (ledger->info().txHash.isNonZero()) - { - if (!ledger->txMap().isValid()) - { - return fail("has an invalid transaction map on sequence " + - std::to_string(ledger->info().seq)); - } - - try - { - ledger->txMap().visitNodes(f); - } - catch (std::exception const& e) - { - return fail(std::string("exception ") + - e.what() + " in function " + __func__); - } - if (error) - return false; - } - return true; -}; - -std::shared_ptr -Shard::valFetch(uint256 const& hash) -{ - assert(backend_); - std::shared_ptr nObj; - auto fail = [this](std::string const& msg) - { - JLOG(j_.error()) << "shard " << index_ << " " << msg; - }; - + // Remove the control file if one exists try { - switch (backend_->fetch(hash.begin(), &nObj)) - { - case ok: - break; - case notFound: - { - fail("is missing node object on hash " + to_string(hash)); - break; - } - case dataCorrupt: - { - fail("has a corrupt node object on hash " + to_string(hash)); - break; - } - default: - fail("encountered unknown error on hash " + to_string(hash)); - } + using namespace boost::filesystem; + if (is_regular_file(control_)) + remove_all(control_); + } catch (std::exception const& e) { - fail(std::string("exception ") + - e.what() + " in function " + __func__); + JLOG(j_.error()) << + "shard " << index_ << + " exception " << e.what() << + " in function " << __func__; + return false; } - return nObj; -} -void -Shard::setComplete() -{ storedSeqs_.clear(); complete_ = true; + + setCache(lock); + return initSQLite(lock) && setFileStats(lock); } void -Shard::setCache() +Shard::setCache(std::lock_guard const&) { // complete shards use the smallest cache and // fastest expiration to reduce memory consumption. @@ -503,7 +445,7 @@ Shard::setCache() } bool -Shard::initSQLite() +Shard::initSQLite(std::lock_guard const&) { Config const& config {app_.config()}; DatabaseCon::Setup setup; @@ -515,9 +457,8 @@ Shard::initSQLite() { if (complete_) { - using namespace boost::filesystem; - // Remove WAL files if they exist + using namespace boost::filesystem; for (auto const& d : directory_iterator(dir_)) { if (is_regular_file(d) && @@ -601,7 +542,9 @@ Shard::initSQLite() } bool -Shard::setSQLiteStored(std::shared_ptr const& ledger) +Shard::setSQLiteStored( + std::shared_ptr const& ledger, + std::lock_guard const&) { auto const seq {ledger->info().seq}; assert(backend_ && !complete_); @@ -733,7 +676,7 @@ Shard::setSQLiteStored(std::shared_ptr const& ledger) } bool -Shard::setFileStats() +Shard::setFileStats(std::lock_guard const&) { fileSz_ = 0; fdRequired_ = 0; @@ -764,7 +707,7 @@ Shard::setFileStats() } bool -Shard::saveControl() +Shard::saveControl(std::lock_guard const&) { std::ofstream ofs {control_.string(), std::ios::trunc}; if (!ofs.is_open()) @@ -779,5 +722,120 @@ Shard::saveControl() return true; } +bool +Shard::valLedger( + std::shared_ptr const& ledger, + std::shared_ptr const& next) const +{ + auto fail = [j = j_, index = index_, &ledger](std::string const& msg) + { + JLOG(j.error()) << + "shard " << index << ". " << msg << + (ledger->info().hash.isZero() ? + "" : ". Ledger header hash " + + to_string(ledger->info().hash)) << + (ledger->info().seq == 0 ? + "" : ". Ledger header sequence " + + std::to_string(ledger->info().seq)); + return false; + }; + + if (ledger->info().hash.isZero()) + return fail("Invalid ledger header hash"); + if (ledger->info().accountHash.isZero()) + return fail("Invalid ledger header account hash"); + + bool error {false}; + auto visit = [this, &error](SHAMapAbstractNode& node) + { + if (!valFetch(node.getNodeHash().as_uint256())) + error = true; + return !error; + }; + + // Validate the state map + if (ledger->stateMap().getHash().isNonZero()) + { + if (!ledger->stateMap().isValid()) + return fail("Invalid state map"); + + try + { + if (next && next->info().parentHash == ledger->info().hash) + ledger->stateMap().visitDifferences(&next->stateMap(), visit); + else + ledger->stateMap().visitNodes(visit); + } + catch (std::exception const& e) + { + return fail(std::string("exception ") + + e.what() + " in function " + __func__); + } + if (error) + return fail("Invalid state map"); + } + + // Validate the transaction map + if (ledger->info().txHash.isNonZero()) + { + if (!ledger->txMap().isValid()) + return fail("Invalid transaction map"); + + try + { + ledger->txMap().visitNodes(visit); + } + catch (std::exception const& e) + { + return fail(std::string("exception ") + + e.what() + " in function " + __func__); + } + if (error) + return fail("Invalid transaction map"); + } + return true; +}; + +std::shared_ptr +Shard::valFetch(uint256 const& hash) const +{ + std::shared_ptr nObj; + auto fail = [j = j_, index = index_, &hash, &nObj](std::string const& msg) + { + JLOG(j.error()) << + "shard " << index << ". " << msg << + ". Node object hash " << to_string(hash); + nObj.reset(); + return nObj; + }; + Status status; + + try + { + { + std::lock_guard lock(mutex_); + status = backend_->fetch(hash.begin(), &nObj); + } + + switch (status) + { + case ok: + break; + case notFound: + return fail("Missing node object"); + case dataCorrupt: + return fail("Corrupt node object"); + default: + return fail("Unknown error"); + } + } + catch (std::exception const& e) + { + return fail(std::string("exception ") + + e.what() + " in function " + __func__); + } + return nObj; +} + } // NodeStore } // ripple diff --git a/src/ripple/nodestore/impl/Shard.h b/src/ripple/nodestore/impl/Shard.h index 31fe047d4b..4a7e0228a2 100644 --- a/src/ripple/nodestore/impl/Shard.h +++ b/src/ripple/nodestore/impl/Shard.h @@ -27,11 +27,8 @@ #include #include -#include #include -#include -#include -#include +#include namespace ripple { namespace NodeStore { @@ -63,6 +60,8 @@ class DatabaseShard; Shard `i` stores ledgers starting with sequence: `1 + (i * ledgersPerShard)` and ending with sequence: `(i + 1) * ledgersPerShard`. Once a shard has all its ledgers, it is never written to again. + + Public functions can be called concurrently from any thread. */ class Shard { @@ -88,44 +87,41 @@ public: void sweep(); - bool - validate(); - std::uint32_t - index() const {return index_;} - - bool - complete() const {assert(backend_); return complete_;} - - std::shared_ptr& - pCache() {assert(backend_); return pCache_;} - - std::shared_ptr& - nCache() {assert(backend_); return nCache_;} - - std::uint64_t - fileSize() const {assert(backend_); return fileSz_;} - - std::uint32_t - fdRequired() const {assert(backend_); return fdRequired_;} - - std::shared_ptr const& - getBackend() const {assert(backend_); return backend_;} - - std::shared_ptr - lastStored() {assert(backend_); return lastStored_;} - -private: - friend class boost::serialization::access; - template - void serialize(Archive & ar, const unsigned int version) + index() const { - ar & storedSeqs_; + return index_; } + std::shared_ptr const& + getBackend() const; + + bool + complete() const; + + std::shared_ptr + pCache() const; + + std::shared_ptr + nCache() const; + + std::uint64_t + fileSize() const; + + std::uint32_t + fdRequired() const; + + std::shared_ptr + lastStored() const; + + bool + validate() const; + +private: static constexpr auto controlFileName = "control.txt"; Application& app_; + mutable std::mutex mutex_; // Shard Index std::uint32_t const index_; @@ -179,43 +175,52 @@ private: // Used as an optimization for visitDifferences std::shared_ptr lastStored_; + // Marks shard immutable + // Lock over mutex_ required + bool + setComplete(std::lock_guard const& lock); + + // Set the backend cache + // Lock over mutex_ required + void + setCache(std::lock_guard const& lock); + + // Open/Create SQLite databases + // Lock over mutex_ required + bool + initSQLite(std::lock_guard const& lock); + + // Write SQLite entries for a ledger stored in this shard's backend + // Lock over mutex_ required + bool + setSQLiteStored( + std::shared_ptr const& ledger, + std::lock_guard const& lock); + + // Set storage and file descriptor usage stats + // Lock over mutex_ required + bool + setFileStats(std::lock_guard const& lock); + + // Save the control file for an incomplete shard + // Lock over mutex_ required + bool + saveControl(std::lock_guard const& lock); + // Validate this ledger by walking its SHAMaps // and verifying each merkle tree bool - valLedger(std::shared_ptr const& ledger, - std::shared_ptr const& next); + valLedger( + std::shared_ptr const& ledger, + std::shared_ptr const& next) const; // Fetches from the backend and will log // errors based on status codes std::shared_ptr - valFetch(uint256 const& hash); - - // Marks shard immutable, having stored all of its ledgers - void - setComplete(); - - // Set the backend cache - void - setCache(); - - // Open/Create SQLite databases - bool - initSQLite(); - - // Create SQLite entries for a ledger stored in this shard's backend - bool - setSQLiteStored(std::shared_ptr const& ledger); - - // Set storage and file descriptor usage stats - bool - setFileStats(); - - // Save the control file for an incomplete shard - bool - saveControl(); + valFetch(uint256 const& hash) const; }; -} // NodeStore -} // ripple +} // namespace NodeStore +} // namespace ripple #endif