Add Shard Family

This commit is contained in:
Miguel Portilla
2020-06-02 16:52:13 -04:00
committed by manojsdoshi
parent 91e857874f
commit 86e8f2e232
31 changed files with 742 additions and 339 deletions

View File

@@ -29,10 +29,10 @@
#include <ripple/app/main/Application.h>
#include <ripple/app/main/BasicApp.h>
#include <ripple/app/main/DBInit.h>
#include <ripple/app/main/GRPCServer.h>
#include <ripple/app/main/LoadManager.h>
#include <ripple/app/main/NodeIdentity.h>
#include <ripple/app/main/NodeStoreScheduler.h>
#include <ripple/app/main/Tuning.h>
#include <ripple/app/misc/AmendmentTable.h>
#include <ripple/app/misc/HashRouter.h>
#include <ripple/app/misc/LoadFeeTrack.h>
@@ -64,8 +64,9 @@
#include <ripple/resource/Fees.h>
#include <ripple/rpc/ShardArchiveHandler.h>
#include <ripple/rpc/impl/RPCHelpers.h>
#include <ripple/shamap/NodeFamily.h>
#include <ripple/shamap/ShardFamily.h>
#include <ripple/app/main/GRPCServer.h>
#include <boost/algorithm/string/predicate.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/system/error_code.hpp>
@@ -77,174 +78,6 @@
namespace ripple {
//------------------------------------------------------------------------------
namespace detail {
class AppFamily : public Family
{
private:
Application& app_;
TreeNodeCache treecache_;
FullBelowCache fullbelow_;
NodeStore::Database& db_;
bool const shardBacked_;
beast::Journal const j_;
// missing node handler
LedgerIndex maxSeq = 0;
std::mutex maxSeqLock;
void
acquire(uint256 const& hash, std::uint32_t seq)
{
if (hash.isNonZero())
{
auto j = app_.journal("Ledger");
JLOG(j.error()) << "Missing node in " << to_string(hash);
app_.getInboundLedgers().acquire(
hash,
seq,
shardBacked_ ? InboundLedger::Reason::SHARD
: InboundLedger::Reason::GENERIC);
}
}
public:
AppFamily(AppFamily const&) = delete;
AppFamily&
operator=(AppFamily const&) = delete;
AppFamily(
Application& app,
NodeStore::Database& db,
CollectorManager& collectorManager)
: app_(app)
, treecache_(
"TreeNodeCache",
65536,
std::chrono::minutes{1},
stopwatch(),
app.journal("TaggedCache"))
, fullbelow_(
"full_below",
stopwatch(),
collectorManager.collector(),
fullBelowTargetSize,
fullBelowExpiration)
, db_(db)
, shardBacked_(dynamic_cast<NodeStore::DatabaseShard*>(&db) != nullptr)
, j_(app.journal("SHAMap"))
{
}
beast::Journal const&
journal() override
{
return j_;
}
FullBelowCache&
fullbelow() override
{
return fullbelow_;
}
FullBelowCache const&
fullbelow() const override
{
return fullbelow_;
}
TreeNodeCache&
treecache() override
{
return treecache_;
}
TreeNodeCache const&
treecache() const override
{
return treecache_;
}
NodeStore::Database&
db() override
{
return db_;
}
NodeStore::Database const&
db() const override
{
return db_;
}
bool
isShardBacked() const override
{
return shardBacked_;
}
void
missing_node(std::uint32_t seq) override
{
auto j = app_.journal("Ledger");
JLOG(j.error()) << "Missing node in " << seq;
// prevent recursive invocation
std::unique_lock<std::mutex> lock(maxSeqLock);
if (maxSeq == 0)
{
maxSeq = seq;
do
{
// Try to acquire the most recent missing ledger
seq = maxSeq;
lock.unlock();
// This can invoke the missing node handler
acquire(app_.getLedgerMaster().getHashBySeq(seq), seq);
lock.lock();
} while (maxSeq != seq);
}
else if (maxSeq < seq)
{
// We found a more recent ledger with a
// missing node
maxSeq = seq;
}
}
void
missing_node(uint256 const& hash, std::uint32_t seq) override
{
acquire(hash, seq);
}
void
reset() override
{
{
std::lock_guard lock(maxSeqLock);
maxSeq = 0;
}
fullbelow_.reset();
treecache_.reset();
}
};
} // namespace detail
//------------------------------------------------------------------------------
// VFALCO TODO Move the function definitions into the class declaration
class ApplicationImp : public Application, public RootStoppable, public BasicApp
{
@@ -343,9 +176,9 @@ public:
// These are Stoppable-related
std::unique_ptr<JobQueue> m_jobQueue;
std::unique_ptr<NodeStore::Database> m_nodeStore;
detail::AppFamily family_;
NodeFamily nodeFamily_;
std::unique_ptr<NodeStore::DatabaseShard> shardStore_;
std::unique_ptr<detail::AppFamily> shardFamily_;
std::unique_ptr<ShardFamily> shardFamily_;
std::unique_ptr<RPC::ShardArchiveHandler> shardArchiveHandler_;
// VFALCO TODO Make OrderBookDB abstract
OrderBookDB m_orderBookDB;
@@ -476,7 +309,7 @@ public:
, m_nodeStore(m_shaMapStore->makeNodeStore("NodeStore.main", 4))
, family_(*this, *m_nodeStore, *m_collectorManager)
, nodeFamily_(*this, *m_collectorManager)
// The shard store is optional and make_ShardStore can return null.
, shardStore_(make_ShardStore(
@@ -671,13 +504,15 @@ public:
}
Family&
family() override
getNodeFamily() override
{
return family_;
return nodeFamily_;
}
// The shard store is an optional feature. If the sever is configured for
// shards, this function will return a valid pointer, otherwise a nullptr.
Family*
shardFamily() override
getShardFamily() override
{
return shardFamily_.get();
}
@@ -779,6 +614,8 @@ public:
return *m_nodeStore;
}
// The shard store is an optional feature. If the sever is configured for
// shards, this function will return a valid pointer, otherwise a nullptr.
NodeStore::DatabaseShard*
getShardStore() override
{
@@ -1127,11 +964,6 @@ public:
config_->getValueFor(SizedItem::ledgerSize),
seconds{config_->getValueFor(SizedItem::ledgerAge)});
family().treecache().setTargetSize(
config_->getValueFor(SizedItem::treeCacheSize));
family().treecache().setTargetAge(
seconds{config_->getValueFor(SizedItem::treeCacheAge)});
return true;
}
@@ -1372,9 +1204,9 @@ public:
// VFALCO TODO fix the dependency inversion using an observer,
// have listeners register for "onSweep ()" notification.
family().fullbelow().sweep();
nodeFamily_.sweep();
if (shardFamily_)
shardFamily_->fullbelow().sweep();
shardFamily_->sweep();
getMasterTransaction().sweep();
getNodeStore().sweep();
if (shardStore_)
@@ -1384,9 +1216,6 @@ public:
getValidations().expire();
getInboundLedgers().sweep();
m_acceptedLedgerCache.sweep();
family().treecache().sweep();
if (shardFamily_)
shardFamily_->treecache().sweep();
cachedSLEs_.expire();
// Set timer to do another sweep later.
@@ -1491,14 +1320,8 @@ ApplicationImp::setup()
if (shardStore_)
{
shardFamily_ = std::make_unique<detail::AppFamily>(
*this, *shardStore_, *m_collectorManager);
using namespace std::chrono;
shardFamily_->treecache().setTargetSize(
config_->getValueFor(SizedItem::treeCacheSize));
shardFamily_->treecache().setTargetAge(
seconds{config_->getValueFor(SizedItem::treeCacheAge)});
shardFamily_ =
std::make_unique<ShardFamily>(*this, *m_collectorManager);
if (!shardStore_->init())
return false;
@@ -1906,7 +1729,7 @@ ApplicationImp::startGenesisLedger()
: std::vector<uint256>{};
std::shared_ptr<Ledger> const genesis = std::make_shared<Ledger>(
create_genesis, *config_, initialAmendments, family());
create_genesis, *config_, initialAmendments, nodeFamily_);
m_ledgerMaster->storeLedger(genesis);
auto const next =
@@ -2037,7 +1860,7 @@ ApplicationImp::loadLedgerFromFile(std::string const& name)
}
auto loadLedger =
std::make_shared<Ledger>(seq, closeTime, *config_, family());
std::make_shared<Ledger>(seq, closeTime, *config_, nodeFamily_);
loadLedger->setTotalDrops(totalDrops);
for (Json::UInt index = 0; index < ledger.get().size(); ++index)