diff --git a/Builds/VisualStudio2015/RippleD.vcxproj b/Builds/VisualStudio2015/RippleD.vcxproj
index 2e8134720..5e0e2db0d 100644
--- a/Builds/VisualStudio2015/RippleD.vcxproj
+++ b/Builds/VisualStudio2015/RippleD.vcxproj
@@ -2275,6 +2275,8 @@
+
+
@@ -2289,7 +2291,19 @@
-
+
+ True
+ True
+ ..\..\src\rocksdb2\include;..\..\src\snappy\config;..\..\src\snappy\snappy;%(AdditionalIncludeDirectories)
+ ..\..\src\rocksdb2\include;..\..\src\snappy\config;..\..\src\snappy\snappy;%(AdditionalIncludeDirectories)
+
+
+ True
+ True
+ ..\..\src\rocksdb2\include;..\..\src\snappy\config;..\..\src\snappy\snappy;%(AdditionalIncludeDirectories)
+ ..\..\src\rocksdb2\include;..\..\src\snappy\config;..\..\src\snappy\snappy;%(AdditionalIncludeDirectories)
+
+
True
@@ -2299,6 +2313,14 @@
+
+ True
+ True
+ ..\..\src\rocksdb2\include;..\..\src\snappy\config;..\..\src\snappy\snappy;%(AdditionalIncludeDirectories)
+ ..\..\src\rocksdb2\include;..\..\src\snappy\config;..\..\src\snappy\snappy;%(AdditionalIncludeDirectories)
+
+
+
True
True
@@ -2335,6 +2357,14 @@
..\..\src\rocksdb2\include;..\..\src\snappy\config;..\..\src\snappy\snappy;%(AdditionalIncludeDirectories)
..\..\src\rocksdb2\include;..\..\src\snappy\config;..\..\src\snappy\snappy;%(AdditionalIncludeDirectories)
+
+ True
+ True
+ ..\..\src\rocksdb2\include;..\..\src\snappy\config;..\..\src\snappy\snappy;%(AdditionalIncludeDirectories)
+ ..\..\src\rocksdb2\include;..\..\src\snappy\config;..\..\src\snappy\snappy;%(AdditionalIncludeDirectories)
+
+
+
diff --git a/Builds/VisualStudio2015/RippleD.vcxproj.filters b/Builds/VisualStudio2015/RippleD.vcxproj.filters
index 89437dd78..90f32e1ee 100644
--- a/Builds/VisualStudio2015/RippleD.vcxproj.filters
+++ b/Builds/VisualStudio2015/RippleD.vcxproj.filters
@@ -2931,6 +2931,9 @@
ripple\nodestore
+
+ ripple\nodestore
+
ripple\nodestore
@@ -2946,7 +2949,13 @@
ripple\nodestore\impl
-
+
+ ripple\nodestore\impl
+
+
+ ripple\nodestore\impl
+
+
ripple\nodestore\impl
@@ -2955,6 +2964,12 @@
ripple\nodestore\impl
+
+ ripple\nodestore\impl
+
+
+ ripple\nodestore\impl
+
ripple\nodestore\impl
@@ -2979,6 +2994,12 @@
ripple\nodestore\impl
+
+ ripple\nodestore\impl
+
+
+ ripple\nodestore\impl
+
ripple\nodestore\impl
diff --git a/doc/rippled-example.cfg b/doc/rippled-example.cfg
index d0ab9dc62..52e4d3102 100644
--- a/doc/rippled-example.cfg
+++ b/doc/rippled-example.cfg
@@ -834,6 +834,33 @@
# [import_db] Settings for performing a one-time import (optional)
# [database_path] Path to the book-keeping databases.
#
+# [shard_db] Settings for the Shard Database (optional)
+#
+# Format (without spaces):
+# One or more lines of case-insensitive key / value pairs:
+# '<key>' '=' '<value>'
+# ...
+#
+# Example:
+# type=nudb
+# path=db/nudb
+#
+# The "type" field must be present and controls the choice of backend:
+#
+# type = NuDB
+#
+# type = RocksDB
+#
+# The RocksDB backend also provides these optional parameters:
+#
+# compression 0 for none, 1 for Snappy compression
+#
+# Required keys:
+# path Location to store the database (all types)
+#
+# max_size_gb Maximum disk space the database will utilize (in gigabytes)
+#
+#
# There are 4 bookkeeping SQLite database that the server creates and
# maintains. If you omit this configuration setting, it will default to
# creating a directory called "db" located in the same place as your
diff --git a/src/ripple/app/consensus/RCLConsensus.cpp b/src/ripple/app/consensus/RCLConsensus.cpp
index 6092774a9..925b701a4 100644
--- a/src/ripple/app/consensus/RCLConsensus.cpp
+++ b/src/ripple/app/consensus/RCLConsensus.cpp
@@ -36,6 +36,7 @@
#include
#include
#include
+#include
#include
#include
#include
@@ -106,7 +107,7 @@ RCLConsensus::Adaptor::acquireLedger(LedgerHash const& ledger)
app_.getJobQueue().addJob(
jtADVANCE, "getConsensusLedger", [app, hash](Job&) {
app->getInboundLedgers().acquire(
- hash, 0, InboundLedger::fcCONSENSUS);
+ hash, 0, InboundLedger::Reason::CONSENSUS);
});
}
return boost::none;
@@ -625,9 +626,16 @@ RCLConsensus::Adaptor::notify(
}
s.set_firstseq(uMin);
s.set_lastseq(uMax);
- app_.overlay().foreach (
- send_always(std::make_shared(s, protocol::mtSTATUS_CHANGE)));
- JLOG(j_.trace()) << "send status change to peer";
+ if (auto shardStore = app_.getShardStore())
+ {
+ auto shards = shardStore->getCompleteShards();
+ if (! shards.empty())
+ s.set_shardseqs(shards);
+ }
+ app_.overlay ().foreach (send_always (
+ std::make_shared (
+ s, protocol::mtSTATUS_CHANGE)));
+ JLOG (j_.trace()) << "send status change to peer";
}
/** Apply a set of transactions to a ledger.
diff --git a/src/ripple/app/ledger/AccountStateSF.cpp b/src/ripple/app/ledger/AccountStateSF.cpp
index 33e3fc044..1d4cef3c6 100644
--- a/src/ripple/app/ledger/AccountStateSF.cpp
+++ b/src/ripple/app/ledger/AccountStateSF.cpp
@@ -22,19 +22,13 @@
namespace ripple {
-AccountStateSF::AccountStateSF(Family& f, AbstractFetchPackContainer& fp)
- : f_(f)
- , fp_(fp)
+void
+AccountStateSF::gotNode(bool, SHAMapHash const& nodeHash,
+ std::uint32_t ledgerSeq, Blob&& nodeData,
+ SHAMapTreeNode::TNType) const
{
-}
-
-void AccountStateSF::gotNode (bool fromFilter,
- SHAMapHash const& nodeHash,
- Blob&& nodeData,
- SHAMapTreeNode::TNType) const
-{
- f_.db().store(hotACCOUNT_NODE, std::move(nodeData),
- nodeHash.as_uint256());
+ db_.store(hotACCOUNT_NODE, std::move(nodeData),
+ nodeHash.as_uint256(), ledgerSeq);
}
boost::optional
diff --git a/src/ripple/app/ledger/AccountStateSF.h b/src/ripple/app/ledger/AccountStateSF.h
index dd28eb3d2..0b5b83b42 100644
--- a/src/ripple/app/ledger/AccountStateSF.h
+++ b/src/ripple/app/ledger/AccountStateSF.h
@@ -21,31 +21,32 @@
#define RIPPLE_APP_LEDGER_ACCOUNTSTATESF_H_INCLUDED
#include
+#include
#include
-#include
namespace ripple {
// This class is only needed on add functions
// sync filter for account state nodes during ledger sync
-class AccountStateSF
- : public SHAMapSyncFilter
+class AccountStateSF : public SHAMapSyncFilter
{
-private:
- Family& f_;
- AbstractFetchPackContainer& fp_;
-
public:
- AccountStateSF(Family&, AbstractFetchPackContainer&);
+ AccountStateSF(NodeStore::Database& db, AbstractFetchPackContainer& fp)
+ : db_(db)
+ , fp_(fp)
+ {}
- // Note that the nodeData is overwritten by this call
- void gotNode (bool fromFilter,
- SHAMapHash const& nodeHash,
- Blob&& nodeData,
- SHAMapTreeNode::TNType) const override;
+ void
+ gotNode(bool fromFilter, SHAMapHash const& nodeHash,
+ std::uint32_t ledgerSeq, Blob&& nodeData,
+ SHAMapTreeNode::TNType type) const override;
boost::optional
getNode(SHAMapHash const& nodeHash) const override;
+
+private:
+ NodeStore::Database& db_;
+ AbstractFetchPackContainer& fp_;
};
} // ripple
diff --git a/src/ripple/app/ledger/ConsensusTransSetSF.cpp b/src/ripple/app/ledger/ConsensusTransSetSF.cpp
index 96b7569ca..dea15568b 100644
--- a/src/ripple/app/ledger/ConsensusTransSetSF.cpp
+++ b/src/ripple/app/ledger/ConsensusTransSetSF.cpp
@@ -38,9 +38,9 @@ ConsensusTransSetSF::ConsensusTransSetSF (Application& app, NodeCache& nodeCache
{
}
-void ConsensusTransSetSF::gotNode (
- bool fromFilter, SHAMapHash const& nodeHash,
- Blob&& nodeData, SHAMapTreeNode::TNType type) const
+void
+ConsensusTransSetSF::gotNode(bool fromFilter, SHAMapHash const& nodeHash,
+ std::uint32_t, Blob&& nodeData, SHAMapTreeNode::TNType type) const
{
if (fromFilter)
return;
diff --git a/src/ripple/app/ledger/ConsensusTransSetSF.h b/src/ripple/app/ledger/ConsensusTransSetSF.h
index 666d3792c..9daef878d 100644
--- a/src/ripple/app/ledger/ConsensusTransSetSF.h
+++ b/src/ripple/app/ledger/ConsensusTransSetSF.h
@@ -39,10 +39,10 @@ public:
ConsensusTransSetSF (Application& app, NodeCache& nodeCache);
// Note that the nodeData is overwritten by this call
- void gotNode (bool fromFilter,
- SHAMapHash const& nodeHash,
- Blob&& nodeData,
- SHAMapTreeNode::TNType) const override;
+ void
+ gotNode(bool fromFilter, SHAMapHash const& nodeHash,
+ std::uint32_t ledgerSeq, Blob&& nodeData,
+ SHAMapTreeNode::TNType type) const override;
boost::optional
getNode (SHAMapHash const& nodeHash) const override;
diff --git a/src/ripple/app/ledger/InboundLedger.h b/src/ripple/app/ledger/InboundLedger.h
index d8ca700f4..a878f805b 100644
--- a/src/ripple/app/ledger/InboundLedger.h
+++ b/src/ripple/app/ledger/InboundLedger.h
@@ -44,18 +44,16 @@ public:
std::shared_ptr>;
// These are the reasons we might acquire a ledger
- enum fcReason
+ enum class Reason
{
- fcHISTORY, // Acquiring past ledger
- fcGENERIC, // Generic other reasons
- fcVALIDATION, // Validations suggest this ledger is important
- fcCURRENT, // This might be the current ledger
- fcCONSENSUS, // We believe the consensus round requires this ledger
+ HISTORY, // Acquiring past ledger
+ SHARD, // Acquiring for shard
+ GENERIC, // Generic other reasons
+ CONSENSUS // We believe the consensus round requires this ledger
};
-public:
- InboundLedger(Application& app,
- uint256 const& hash, std::uint32_t seq, fcReason reason, clock_type&);
+ InboundLedger(Application& app, uint256 const& hash,
+ std::uint32_t seq, Reason reason, clock_type&);
~InboundLedger ();
@@ -70,15 +68,24 @@ public:
{
return mLedger;
}
+
std::uint32_t getSeq () const
{
return mSeq;
}
+ Reason
+ getReason() const
+ {
+ return mReason;
+ }
+
bool checkLocal ();
void init (ScopedLockType& collectionLock);
- bool gotData (std::weak_ptr, std::shared_ptr);
+ bool
+ gotData(std::weak_ptr,
+ std::shared_ptr const&);
using neededHash_t =
std::pair ;
@@ -88,6 +95,10 @@ public:
void runData ();
+ static
+ LedgerInfo
+ deserializeHeader(Slice data, bool hasPrefix);
+
private:
enum class TriggerReason
{
@@ -105,7 +116,7 @@ private:
std::vector getNeededHashes ();
void addPeers ();
- bool tryLocal ();
+ void tryDB (Family& f);
void done ();
@@ -115,7 +126,7 @@ private:
{
// For historical nodes, do not trigger too soon
// since a fetch pack is probably coming
- if (mReason != fcHISTORY)
+ if (mReason != Reason::HISTORY)
trigger (peer, TriggerReason::added);
}
@@ -146,24 +157,18 @@ private:
neededStateHashes (
int max, SHAMapSyncFilter* filter) const;
- LedgerInfo
- deserializeHeader (
- Slice data,
- bool hasPrefix);
-
-private:
std::shared_ptr mLedger;
- bool mHaveHeader;
- bool mHaveState;
- bool mHaveTransactions;
- bool mSignaled;
- bool mByHash;
- std::uint32_t mSeq;
- fcReason mReason;
+ bool mHaveHeader;
+ bool mHaveState;
+ bool mHaveTransactions;
+ bool mSignaled;
+ bool mByHash;
+ std::uint32_t mSeq;
+ Reason const mReason;
std::set mRecentNodes;
- SHAMapAddNode mStats;
+ SHAMapAddNode mStats;
// Data we have received from peers
std::mutex mReceivedDataLock;
diff --git a/src/ripple/app/ledger/InboundLedgers.h b/src/ripple/app/ledger/InboundLedgers.h
index e610f84ab..9988e6a6b 100644
--- a/src/ripple/app/ledger/InboundLedgers.h
+++ b/src/ripple/app/ledger/InboundLedgers.h
@@ -43,7 +43,7 @@ public:
virtual
std::shared_ptr
acquire (uint256 const& hash,
- std::uint32_t seq, InboundLedger::fcReason) = 0;
+ std::uint32_t seq, InboundLedger::Reason) = 0;
virtual std::shared_ptr find (LedgerHash const& hash) = 0;
@@ -72,7 +72,7 @@ public:
virtual std::size_t fetchRate() = 0;
/** Called when a complete ledger is obtained. */
- virtual void onLedgerFetched (InboundLedger::fcReason why) = 0;
+ virtual void onLedgerFetched() = 0;
virtual void gotFetchPack () = 0;
virtual void sweep () = 0;
diff --git a/src/ripple/app/ledger/Ledger.cpp b/src/ripple/app/ledger/Ledger.cpp
index 61543b9b9..7cf8421ab 100644
--- a/src/ripple/app/ledger/Ledger.cpp
+++ b/src/ripple/app/ledger/Ledger.cpp
@@ -262,7 +262,7 @@ Ledger::Ledger (
if (! loaded)
{
info_.hash = calculateLedgerHash(info_);
- family.missing_node (info_.hash);
+ family.missing_node (info_.hash, info_.seq);
}
}
@@ -311,9 +311,11 @@ Ledger::Ledger (
Family& family)
: mImmutable (true)
, txMap_ (std::make_shared (SHAMapType::TRANSACTION,
- info.txHash, family, SHAMap::version{1}))
+ info.txHash, family,
+ SHAMap::version{getSHAMapV2(info) ? 2 : 1}))
, stateMap_ (std::make_shared (SHAMapType::STATE,
- info.accountHash, family, SHAMap::version{1}))
+ info.accountHash, family,
+ SHAMap::version{getSHAMapV2(info) ? 2 : 1}))
, rules_{config.features}
, info_ (info)
{
@@ -802,8 +804,8 @@ static bool saveValidatedLedger (
bool current)
{
auto j = app.journal ("Ledger");
-
- if (! app.pendingSaves().startWork (ledger->info().seq))
+ auto seq = ledger->info().seq;
+ if (! app.pendingSaves().startWork (seq))
{
// The save was completed synchronously
JLOG (j.debug()) << "Save aborted";
@@ -813,7 +815,7 @@ static bool saveValidatedLedger (
// TODO(tom): Fix this hard-coded SQL!
JLOG (j.trace())
<< "saveValidatedLedger "
- << (current ? "" : "fromAcquire ") << ledger->info().seq;
+ << (current ? "" : "fromAcquire ") << seq;
static boost::format deleteLedger (
"DELETE FROM Ledgers WHERE LedgerSeq = %u;");
static boost::format deleteTrans1 (
@@ -823,8 +825,6 @@ static bool saveValidatedLedger (
static boost::format deleteAcctTrans (
"DELETE FROM AccountTransactions WHERE TransID = '%s';");
- auto seq = ledger->info().seq;
-
if (! ledger->info().accountHash.isNonZero ())
{
JLOG (j.fatal()) << "AH is zero: "
@@ -848,11 +848,10 @@ static bool saveValidatedLedger (
Serializer s (128);
s.add32 (HashPrefix::ledgerMaster);
addRaw(ledger->info(), s);
- app.getNodeStore ().store (
- hotLEDGER, std::move (s.modData ()), ledger->info().hash);
+ app.getNodeStore().store(hotLEDGER,
+ std::move(s.modData()), ledger->info().hash, seq);
}
-
AcceptedLedger::pointer aLedger;
try
{
diff --git a/src/ripple/app/ledger/Ledger.h b/src/ripple/app/ledger/Ledger.h
index 0df1d4d20..d0b813865 100644
--- a/src/ripple/app/ledger/Ledger.h
+++ b/src/ripple/app/ledger/Ledger.h
@@ -261,8 +261,10 @@ public:
void
setFull() const
{
- txMap_->setLedgerSeq (info_.seq);
- stateMap_->setLedgerSeq (info_.seq);
+ txMap_->setFull();
+ stateMap_->setFull();
+ txMap_->setLedgerSeq(info_.seq);
+ stateMap_->setLedgerSeq(info_.seq);
}
void setTotalDrops (std::uint64_t totDrops)
diff --git a/src/ripple/app/ledger/LedgerMaster.h b/src/ripple/app/ledger/LedgerMaster.h
index 8be9bc0bd..2b25886f0 100644
--- a/src/ripple/app/ledger/LedgerMaster.h
+++ b/src/ripple/app/ledger/LedgerMaster.h
@@ -22,6 +22,7 @@
#include
#include
+#include
#include
#include
#include
@@ -180,7 +181,7 @@ public:
LedgerIndex ledgerIndex);
boost::optional getCloseTimeByHash (
- LedgerHash const& ledgerHash);
+ LedgerHash const& ledgerHash, LedgerIndex ledgerIndex);
void addHeldTransaction (std::shared_ptr const& trans);
void fixMismatch (ReadView const& ledger);
@@ -255,14 +256,21 @@ private:
Job& job,
std::shared_ptr ledger);
- void getFetchPack(LedgerHash missingHash, LedgerIndex missingIndex);
- boost::optional getLedgerHashForHistory(LedgerIndex index);
+ void getFetchPack(
+ LedgerIndex missingIndex, InboundLedger::Reason reason);
+
+ boost::optional getLedgerHashForHistory(
+ LedgerIndex index, InboundLedger::Reason reason);
+
std::size_t getNeededValidations();
void advanceThread();
+ void fetchForHistory(
+ std::uint32_t missing,
+ bool& progress,
+ InboundLedger::Reason reason);
// Try to publish ledgers, acquire missing ledgers. Always called with
// m_mutex locked. The passed ScopedLockType is a reminder to callers.
void doAdvance(ScopedLockType&);
- bool shouldFetchPack(std::uint32_t seq) const;
bool shouldAcquire(
std::uint32_t const currentLedger,
std::uint32_t const ledgerHistory,
@@ -299,6 +307,9 @@ private:
// The last ledger we handled fetching history
std::shared_ptr mHistLedger;
+ // The last ledger we handled fetching for a shard
+ std::shared_ptr mShardLedger;
+
// Fully validated ledger, whether or not we have the ledger resident.
std::pair mLastValidLedger {uint256(), 0};
@@ -342,7 +353,7 @@ private:
// How much history do we want to keep
std::uint32_t const ledger_history_;
- int const ledger_fetch_size_;
+ std::uint32_t const ledger_fetch_size_;
TaggedCache fetch_packs_;
diff --git a/src/ripple/app/ledger/TransactionStateSF.cpp b/src/ripple/app/ledger/TransactionStateSF.cpp
index 7f1401d20..c30ee9829 100644
--- a/src/ripple/app/ledger/TransactionStateSF.cpp
+++ b/src/ripple/app/ledger/TransactionStateSF.cpp
@@ -22,22 +22,14 @@
namespace ripple {
-TransactionStateSF::TransactionStateSF(Family& f,
- AbstractFetchPackContainer& fp)
- : f_(f)
- , fp_(fp)
-{
-}
+void
+TransactionStateSF::gotNode(bool, SHAMapHash const& nodeHash,
+ std::uint32_t ledgerSeq, Blob&& nodeData, SHAMapTreeNode::TNType type) const
-void TransactionStateSF::gotNode (bool fromFilter,
- SHAMapHash const& nodeHash,
- Blob&& nodeData,
- SHAMapTreeNode::TNType type) const
{
- assert(type !=
- SHAMapTreeNode::tnTRANSACTION_NM);
- f_.db().store(hotTRANSACTION_NODE,
- std::move (nodeData), nodeHash.as_uint256());
+ assert(type != SHAMapTreeNode::tnTRANSACTION_NM);
+ db_.store(hotTRANSACTION_NODE, std::move(nodeData),
+ nodeHash.as_uint256(), ledgerSeq);
}
boost::optional
diff --git a/src/ripple/app/ledger/TransactionStateSF.h b/src/ripple/app/ledger/TransactionStateSF.h
index 9d3474e06..928977b8f 100644
--- a/src/ripple/app/ledger/TransactionStateSF.h
+++ b/src/ripple/app/ledger/TransactionStateSF.h
@@ -21,33 +21,32 @@
#define RIPPLE_APP_LEDGER_TRANSACTIONSTATESF_H_INCLUDED
#include
+#include
#include
-#include
-#include
namespace ripple {
// This class is only needed on add functions
// sync filter for transactions tree during ledger sync
-class TransactionStateSF
- : public SHAMapSyncFilter
+class TransactionStateSF : public SHAMapSyncFilter
{
-private:
- Family& f_;
- AbstractFetchPackContainer& fp_;
-
public:
- explicit
- TransactionStateSF(Family&, AbstractFetchPackContainer&);
+ TransactionStateSF(NodeStore::Database& db, AbstractFetchPackContainer& fp)
+ : db_(db)
+ , fp_(fp)
+ {}
- // Note that the nodeData is overwritten by this call
- void gotNode (bool fromFilter,
- SHAMapHash const& nodeHash,
- Blob&& nodeData,
- SHAMapTreeNode::TNType) const override;
+ void
+ gotNode(bool fromFilter, SHAMapHash const& nodeHash,
+ std::uint32_t ledgerSeq, Blob&& nodeData,
+ SHAMapTreeNode::TNType type) const override;
boost::optional
getNode(SHAMapHash const& nodeHash) const override;
+
+private:
+ NodeStore::Database& db_;
+ AbstractFetchPackContainer& fp_;
};
} // ripple
diff --git a/src/ripple/app/ledger/impl/InboundLedger.cpp b/src/ripple/app/ledger/impl/InboundLedger.cpp
index 8261b6f79..f427d5d71 100644
--- a/src/ripple/app/ledger/impl/InboundLedger.cpp
+++ b/src/ripple/app/ledger/impl/InboundLedger.cpp
@@ -18,9 +18,9 @@
//==============================================================================
#include
+#include
#include
#include
-#include
#include
#include
#include
@@ -32,7 +32,8 @@
#include
#include
#include
-#include
+#include
+
#include
namespace ripple {
@@ -66,8 +67,8 @@ enum
// millisecond for each ledger timeout
auto constexpr ledgerAcquireTimeout = 2500ms;
-InboundLedger::InboundLedger (
- Application& app, uint256 const& hash, std::uint32_t seq, fcReason reason, clock_type& clock)
+InboundLedger::InboundLedger(Application& app, uint256 const& hash,
+ std::uint32_t seq, Reason reason, clock_type& clock)
: PeerSet (app, hash, ledgerAcquireTimeout, clock,
app.journal("InboundLedger"))
, mHaveHeader (false)
@@ -79,37 +80,69 @@ InboundLedger::InboundLedger (
, mReason (reason)
, mReceiveDispatched (false)
{
- JLOG (m_journal.trace()) <<
- "Acquiring ledger " << mHash;
+ JLOG (m_journal.trace()) << "Acquiring ledger " << mHash;
}
-void InboundLedger::init (ScopedLockType& collectionLock)
+void
+InboundLedger::init(ScopedLockType& collectionLock)
{
ScopedLockType sl (mLock);
- collectionLock.unlock ();
-
- if (!tryLocal ())
+ collectionLock.unlock();
+ tryDB(app_.family());
+ if (mFailed)
+ return;
+ if (! mComplete)
{
- addPeers ();
- setTimer ();
- }
- else if (!isFailed ())
- {
- JLOG (m_journal.debug()) <<
- "Acquiring ledger we already have locally: " << getHash ();
- mLedger->setImmutable (app_.config());
-
- if (mReason != fcHISTORY)
- app_.getLedgerMaster ().storeLedger (mLedger);
-
- // Check if this could be a newer fully-validated ledger
- if (mReason == fcVALIDATION ||
- mReason == fcCURRENT ||
- mReason == fcCONSENSUS)
+ auto shardStore = app_.getShardStore();
+ if (mReason == Reason::SHARD)
{
- app_.getLedgerMaster ().checkAccept (mLedger);
+ if (! shardStore || ! app_.shardFamily())
+ {
+ JLOG(m_journal.error()) <<
+ "Acquiring shard with no shard store available";
+ mFailed = true;
+ return;
+ }
+ mHaveHeader = false;
+ mHaveTransactions = false;
+ mHaveState = false;
+ mLedger.reset();
+ tryDB(*app_.shardFamily());
+ if (mFailed)
+ return;
+ }
+ else if (shardStore && mSeq >= NodeStore::genesisSeq)
+ {
+ if (auto l = shardStore->fetchLedger(mHash, mSeq))
+ {
+ mHaveHeader = true;
+ mHaveTransactions = true;
+ mHaveState = true;
+ mComplete = true;
+ mLedger = std::move(l);
+ }
}
}
+ if (! mComplete)
+ {
+ addPeers();
+ execute();
+ return;
+ }
+
+ JLOG (m_journal.debug()) <<
+ "Acquiring ledger we already have in " <<
+ " local store. " << mHash;
+ mLedger->setImmutable(app_.config());
+
+ if (mReason == Reason::HISTORY || mReason == Reason::SHARD)
+ return;
+
+ app_.getLedgerMaster().storeLedger(mLedger);
+
+ // Check if this could be a newer fully-validated ledger
+ if (mReason == Reason::CONSENSUS)
+ app_.getLedgerMaster().checkAccept(mLedger);
}
void InboundLedger::execute ()
@@ -144,11 +177,19 @@ void InboundLedger::update (std::uint32_t seq)
bool InboundLedger::checkLocal ()
{
ScopedLockType sl (mLock);
-
- if (!isDone () && tryLocal())
+ if (! isDone())
{
- done();
- return true;
+ if (mLedger)
+ tryDB(mLedger->stateMap().family());
+ else if(mReason == Reason::SHARD)
+ tryDB(*app_.shardFamily());
+ else
+ tryDB(app_.family());
+ if (mFailed || mComplete)
+ {
+ done();
+ return true;
+ }
}
return false;
}
@@ -232,75 +273,79 @@ InboundLedger::deserializeHeader (
return info;
}
-/** See how much of the ledger data, if any, is
- in our node store
-*/
-bool InboundLedger::tryLocal ()
+// See how much of the ledger data is stored locally
+// Data found in a fetch pack will be stored
+void
+InboundLedger::tryDB(Family& f)
{
- // return value: true = no more work to do
-
- if (!mHaveHeader)
+ if (! mHaveHeader)
{
- // Nothing we can do without the ledger header
- auto node = app_.getNodeStore ().fetch (mHash);
+ auto makeLedger = [&, this](Blob const& data)
+ {
+ JLOG(m_journal.trace()) <<
+ "Ledger header found in fetch pack";
+ mLedger = std::make_shared(
+ deserializeHeader(makeSlice(data), true),
+ app_.config(), f);
+ if (mLedger->info().hash != mHash ||
+ (mSeq != 0 && mSeq != mLedger->info().seq))
+ {
+ // We know for a fact the ledger can never be acquired
+ JLOG(m_journal.warn()) <<
+ "hash " << mHash <<
+ " seq " << std::to_string(mSeq) <<
+ " cannot be a ledger";
+ mLedger.reset();
+ mFailed = true;
+ }
+ };
- if (!node)
+ // Try to fetch the ledger header from the DB
+ auto node = f.db().fetch(mHash, mSeq);
+ if (! node)
{
auto data = app_.getLedgerMaster().getFetchPack(mHash);
if (! data)
- return false;
-
+ return;
JLOG (m_journal.trace()) <<
"Ledger header found in fetch pack";
-
- mLedger = std::make_shared (
- deserializeHeader (makeSlice(*data), true),
- app_.config(),
- app_.family());
-
- app_.getNodeStore ().store (
- hotLEDGER, std::move (*data), mHash);
+ makeLedger(*data);
+ if (mLedger)
+ f.db().store(hotLEDGER, std::move(*data),
+ mHash, mLedger->info().seq);
}
else
{
- mLedger = std::make_shared(
- deserializeHeader (makeSlice (node->getData()), true),
- app_.config(),
- app_.family());
+ JLOG (m_journal.trace()) <<
+ "Ledger header found in node store";
+ makeLedger(node->getData());
}
-
- if (mLedger->info().hash != mHash)
- {
- // We know for a fact the ledger can never be acquired
- JLOG (m_journal.warn()) <<
- mHash << " cannot be a ledger";
- mFailed = true;
- return true;
- }
-
+ if (mFailed)
+ return;
+ if (mSeq == 0)
+ mSeq = mLedger->info().seq;
+ mLedger->stateMap().setLedgerSeq(mSeq);
+ mLedger->txMap().setLedgerSeq(mSeq);
mHaveHeader = true;
}
- if (!mHaveTransactions)
+ if (! mHaveTransactions)
{
- if (mLedger->info().txHash.isZero ())
+ if (mLedger->info().txHash.isZero())
{
- JLOG (m_journal.trace()) <<
- "No TXNs to fetch";
+ JLOG (m_journal.trace()) << "No TXNs to fetch";
mHaveTransactions = true;
}
else
{
- TransactionStateSF filter(mLedger->txMap().family(),
+ TransactionStateSF filter(mLedger->txMap().family().db(),
app_.getLedgerMaster());
-
- if (mLedger->txMap().fetchRoot (
+ if (mLedger->txMap().fetchRoot(
SHAMapHash{mLedger->info().txHash}, &filter))
{
- auto h = neededTxHashes (1, &filter);
- if (h.empty ())
+ if (neededTxHashes(1, &filter).empty())
{
- JLOG (m_journal.trace()) <<
+ JLOG(m_journal.trace()) <<
"Had full txn map locally";
mHaveTransactions = true;
}
@@ -308,26 +353,23 @@ bool InboundLedger::tryLocal ()
}
}
- if (!mHaveState)
+ if (! mHaveState)
{
- if (mLedger->info().accountHash.isZero ())
+ if (mLedger->info().accountHash.isZero())
{
JLOG (m_journal.fatal()) <<
"We are acquiring a ledger with a zero account hash";
mFailed = true;
- return true;
+ return;
}
-
- AccountStateSF filter(mLedger->stateMap().family(),
+ AccountStateSF filter(mLedger->stateMap().family().db(),
app_.getLedgerMaster());
-
- if (mLedger->stateMap().fetchRoot (
+ if (mLedger->stateMap().fetchRoot(
SHAMapHash{mLedger->info().accountHash}, &filter))
{
- auto h = neededStateHashes (1, &filter);
- if (h.empty ())
+ if (neededStateHashes(1, &filter).empty())
{
- JLOG (m_journal.trace()) <<
+ JLOG(m_journal.trace()) <<
"Had full AS map locally";
mHaveState = true;
}
@@ -336,13 +378,11 @@ bool InboundLedger::tryLocal ()
if (mHaveTransactions && mHaveState)
{
- JLOG (m_journal.debug()) <<
+ JLOG(m_journal.debug()) <<
"Had everything locally";
mComplete = true;
- mLedger->setImmutable (app_.config());
+ mLedger->setImmutable(app_.config());
}
-
- return mComplete;
}
/** Called with a lock by the PeerSet when the timer expires
@@ -386,14 +426,14 @@ void InboundLedger::onTimer (bool wasProgress, ScopedLockType&)
"No progress(" << pc <<
") for ledger " << mHash;
- // addPeers triggers if the reason is not fcHISTORY
- // So if the reason IS fcHISTORY, need to trigger after we add
+ // addPeers triggers if the reason is not HISTORY
+ // So if the reason IS HISTORY, need to trigger after we add
// otherwise, we need to trigger before we add
// so each peer gets triggered once
- if (mReason != fcHISTORY)
+ if (mReason != Reason::HISTORY)
trigger (nullptr, TriggerReason::timeout);
addPeers ();
- if (mReason == fcHISTORY)
+ if (mReason == Reason::HISTORY)
trigger (nullptr, TriggerReason::timeout);
}
}
@@ -421,20 +461,29 @@ void InboundLedger::done ()
JLOG (m_journal.debug()) <<
"Acquire " << mHash <<
- (isFailed () ? " fail " : " ") <<
+ (mFailed ? " fail " : " ") <<
((getTimeouts () == 0) ? std::string() :
(std::string ("timeouts:") +
to_string (getTimeouts ()) + " ")) <<
mStats.get ();
- assert (isComplete () || isFailed ());
+ assert (mComplete || mFailed);
- if (isComplete () && !isFailed () && mLedger)
+ if (mComplete && ! mFailed && mLedger)
{
mLedger->setImmutable (app_.config());
- if (mReason != fcHISTORY)
- app_.getLedgerMaster ().storeLedger (mLedger);
- app_.getInboundLedgers().onLedgerFetched(mReason);
+ switch (mReason)
+ {
+ case Reason::SHARD:
+ app_.getShardStore()->setStored(mLedger);
+ // TODO c++17: [[fallthrough]]
+ case Reason::HISTORY:
+ app_.getInboundLedgers().onLedgerFetched();
+ break;
+ default:
+ app_.getLedgerMaster().storeLedger(mLedger);
+ break;
+ }
}
// We hold the PeerSet lock, so must dispatch
@@ -442,7 +491,7 @@ void InboundLedger::done ()
jtLEDGER_DATA, "AcquisitionDone",
[self = shared_from_this()](Job&)
{
- if (self->isComplete() && !self->isFailed())
+ if (self->mComplete && !self->mFailed)
{
self->app().getLedgerMaster().checkAccept(
self->getLedger());
@@ -487,10 +536,10 @@ void InboundLedger::trigger (std::shared_ptr const& peer, TriggerReason re
" as=" << mHaveState;
}
- if (!mHaveHeader)
+ if (! mHaveHeader)
{
- tryLocal ();
-
+ tryDB(mReason == Reason::SHARD ?
+ *app_.shardFamily() : app_.family());
if (mFailed)
{
JLOG (m_journal.warn()) <<
@@ -506,17 +555,17 @@ void InboundLedger::trigger (std::shared_ptr const& peer, TriggerReason re
{ // Be more aggressive if we've timed out at least once
tmGL.set_querytype (protocol::qtINDIRECT);
- if (!isProgress () && !mFailed && mByHash && (
- getTimeouts () > ledgerBecomeAggressiveThreshold))
+ if (! isProgress () && ! mFailed && mByHash &&
+ (getTimeouts () > ledgerBecomeAggressiveThreshold))
{
auto need = getNeededHashes ();
if (!need.empty ())
{
protocol::TMGetObjectByHash tmBH;
+ bool typeSet = false;
tmBH.set_query (true);
tmBH.set_ledgerhash (mHash.begin (), mHash.size ());
- bool typeSet = false;
for (auto const& p : need)
{
JLOG (m_journal.warn()) <<
@@ -532,6 +581,8 @@ void InboundLedger::trigger (std::shared_ptr const& peer, TriggerReason re
{
protocol::TMIndexedObject* io = tmBH.add_objects ();
io->set_hash (p.second.begin (), p.second.size ());
+ if (mSeq != 0)
+ io->set_ledgerseq(mSeq);
}
}
@@ -564,6 +615,8 @@ void InboundLedger::trigger (std::shared_ptr const& peer, TriggerReason re
if (!mHaveHeader && !mFailed)
{
tmGL.set_itype (protocol::liBASE);
+ if (mSeq != 0)
+ tmGL.set_ledgerseq (mSeq);
JLOG (m_journal.trace()) <<
"Sending header request to " <<
(peer ? "selected peer" : "all peers");
@@ -610,7 +663,7 @@ void InboundLedger::trigger (std::shared_ptr const& peer, TriggerReason re
}
else
{
- AccountStateSF filter(mLedger->stateMap().family(),
+ AccountStateSF filter(mLedger->stateMap().family().db(),
app_.getLedgerMaster());
// Release the lock while we process the large state map
@@ -684,7 +737,7 @@ void InboundLedger::trigger (std::shared_ptr const& peer, TriggerReason re
}
else
{
- TransactionStateSF filter(mLedger->txMap().family(),
+ TransactionStateSF filter(mLedger->txMap().family().db(),
app_.getLedgerMaster());
auto nodes = mLedger->txMap().getMissingNodes (
@@ -799,12 +852,12 @@ bool InboundLedger::takeHeader (std::string const& data)
if (mComplete || mFailed || mHaveHeader)
return true;
- mLedger = std::make_shared(
- deserializeHeader (makeSlice(data), false),
- app_.config(),
- app_.family());
-
- if (mLedger->info().hash != mHash)
+ auto* f = mReason == Reason::SHARD ?
+ app_.shardFamily() : &app_.family();
+ mLedger = std::make_shared(deserializeHeader(
+ makeSlice(data), false), app_.config(), *f);
+ if (mLedger->info().hash != mHash ||
+ (mSeq != 0 && mSeq != mLedger->info().seq))
{
JLOG (m_journal.warn()) <<
"Acquire hash mismatch: " << mLedger->info().hash <<
@@ -812,14 +865,16 @@ bool InboundLedger::takeHeader (std::string const& data)
mLedger.reset ();
return false;
}
-
+ if (mSeq == 0)
+ mSeq = mLedger->info().seq;
+ mLedger->stateMap().setLedgerSeq(mSeq);
+ mLedger->txMap().setLedgerSeq(mSeq);
mHaveHeader = true;
Serializer s (data.size () + 4);
s.add32 (HashPrefix::ledgerMaster);
s.addRaw (data.data(), data.size());
- app_.getNodeStore ().store (
- hotLEDGER, std::move (s.modData ()), mHash);
+ f->db().store(hotLEDGER, std::move (s.modData ()), mHash, mSeq);
if (mLedger->info().txHash.isZero ())
mHaveTransactions = true;
@@ -855,7 +910,7 @@ bool InboundLedger::takeTxNode (const std::vector& nodeIDs,
auto nodeIDit = nodeIDs.cbegin ();
auto nodeDatait = data.begin ();
- TransactionStateSF filter(mLedger->txMap().family(),
+ TransactionStateSF filter(mLedger->txMap().family().db(),
app_.getLedgerMaster());
while (nodeIDit != nodeIDs.cend ())
@@ -927,7 +982,7 @@ bool InboundLedger::takeAsNode (const std::vector& nodeIDs,
auto nodeIDit = nodeIDs.cbegin ();
auto nodeDatait = data.begin ();
- AccountStateSF filter(mLedger->stateMap().family(),
+ AccountStateSF filter(mLedger->stateMap().family().db(),
app_.getLedgerMaster());
while (nodeIDit != nodeIDs.cend ())
@@ -991,7 +1046,7 @@ bool InboundLedger::takeAsRootNode (Slice const& data, SHAMapAddNode& san)
return false;
}
- AccountStateSF filter(mLedger->stateMap().family(),
+ AccountStateSF filter(mLedger->stateMap().family().db(),
app_.getLedgerMaster());
san += mLedger->stateMap().addRootNode (
SHAMapHash{mLedger->info().accountHash}, data, snfWIRE, &filter);
@@ -1015,7 +1070,7 @@ bool InboundLedger::takeTxRootNode (Slice const& data, SHAMapAddNode& san)
return false;
}
- TransactionStateSF filter(mLedger->txMap().family(),
+ TransactionStateSF filter(mLedger->txMap().family().db(),
app_.getLedgerMaster());
san += mLedger->txMap().addRootNode (
SHAMapHash{mLedger->info().txHash}, data, snfWIRE, &filter);
@@ -1036,7 +1091,7 @@ InboundLedger::getNeededHashes ()
if (!mHaveState)
{
- AccountStateSF filter(mLedger->stateMap().family(),
+ AccountStateSF filter(mLedger->stateMap().family().db(),
app_.getLedgerMaster());
for (auto const& h : neededStateHashes (4, &filter))
{
@@ -1047,7 +1102,7 @@ InboundLedger::getNeededHashes ()
if (!mHaveTransactions)
{
- TransactionStateSF filter(mLedger->txMap().family(),
+ TransactionStateSF filter(mLedger->txMap().family().db(),
app_.getLedgerMaster());
for (auto const& h : neededTxHashes (4, &filter))
{
@@ -1062,9 +1117,9 @@ InboundLedger::getNeededHashes ()
/** Stash a TMLedgerData received from a peer for later processing
Returns 'true' if we need to dispatch
*/
-// VFALCO TODO Why isn't the shared_ptr passed by const& ?
-bool InboundLedger::gotData (std::weak_ptr peer,
- std::shared_ptr data)
+bool
+InboundLedger::gotData(std::weak_ptr peer,
+ std::shared_ptr const& data)
{
std::lock_guard sl (mReceivedDataLock);
diff --git a/src/ripple/app/ledger/impl/InboundLedgers.cpp b/src/ripple/app/ledger/impl/InboundLedgers.cpp
index 5aba17851..f7d783772 100644
--- a/src/ripple/app/ledger/impl/InboundLedgers.cpp
+++ b/src/ripple/app/ledger/impl/InboundLedgers.cpp
@@ -25,6 +25,7 @@
#include
#include
#include
+#include
#include
#include
#include
@@ -65,41 +66,64 @@ public:
}
std::shared_ptr
- acquire (
- uint256 const& hash,
- std::uint32_t seq,
- InboundLedger::fcReason reason)
+ acquire(uint256 const& hash, std::uint32_t seq,
+ InboundLedger::Reason reason)
{
- assert (hash.isNonZero ());
+ assert(hash.isNonZero());
+ assert(reason != InboundLedger::Reason::SHARD ||
+ (seq != 0 && app_.getShardStore()));
+ if (isStopping())
+ return {};
+
bool isNew = true;
std::shared_ptr inbound;
{
- ScopedLockType sl (mLock);
-
- if (! isStopping ())
+ ScopedLockType sl(mLock);
+ auto it = mLedgers.find(hash);
+ if (it != mLedgers.end())
{
- auto it = mLedgers.find (hash);
- if (it != mLedgers.end ())
- {
- isNew = false;
- inbound = it->second;
- }
- else
- {
- inbound = std::make_shared (app_,
- hash, seq, reason, std::ref (m_clock));
- mLedgers.emplace (hash, inbound);
- inbound->init (sl);
- ++mCounter;
- }
+ isNew = false;
+ inbound = it->second;
+ }
+ else
+ {
+ inbound = std::make_shared (
+ app_, hash, seq, reason, std::ref(m_clock));
+ mLedgers.emplace(hash, inbound);
+ inbound->init(sl);
+ ++mCounter;
}
}
- if (inbound && ! isNew && ! inbound->isFailed ())
- inbound->update (seq);
- if (inbound && inbound->isComplete ())
- return inbound->getLedger();
- return {};
+ if (inbound->isFailed())
+ return {};
+
+ if (! isNew)
+ inbound->update(seq);
+
+ if (! inbound->isComplete())
+ return {};
+
+ if (reason == InboundLedger::Reason::HISTORY)
+ {
+ if (inbound->getLedger()->stateMap().family().isShardBacked())
+ app_.getNodeStore().copyLedger(inbound->getLedger());
+ }
+ else if (reason == InboundLedger::Reason::SHARD)
+ {
+ auto shardStore = app_.getShardStore();
+ if (!shardStore)
+ {
+ JLOG(j_.error()) <<
+ "Acquiring shard with no shard store available";
+ return {};
+ }
+ if (inbound->getLedger()->stateMap().family().isShardBacked())
+ shardStore->setStored(inbound->getLedger());
+ else
+ shardStore->copyLedger(inbound->getLedger());
+ }
+ return inbound->getLedger();
}
std::shared_ptr find (uint256 const& hash)
@@ -280,13 +304,11 @@ public:
m_clock.now());
}
- void onLedgerFetched (
- InboundLedger::fcReason why)
+ // Should only be called with an inboundledger that has
+ // a reason of history or shard
+ void onLedgerFetched()
{
- if (why != InboundLedger::fcHISTORY)
- return;
- std::lock_guard<
- std::mutex> lock(fetchRateMutex_);
+ std::lock_guard lock(fetchRateMutex_);
fetchRate_.add(1, m_clock.now());
}
diff --git a/src/ripple/app/ledger/impl/LedgerCleaner.cpp b/src/ripple/app/ledger/impl/LedgerCleaner.cpp
index f132c28cb..f768df6d9 100644
--- a/src/ripple/app/ledger/impl/LedgerCleaner.cpp
+++ b/src/ripple/app/ledger/impl/LedgerCleaner.cpp
@@ -284,7 +284,7 @@ private:
"Node missing from ledger " << ledger->info().seq;
app_.getInboundLedgers().acquire (
ledger->info().hash, ledger->info().seq,
- InboundLedger::fcGENERIC);
+ InboundLedger::Reason::GENERIC);
}
return hash ? *hash : zero; // kludge
}
@@ -303,13 +303,13 @@ private:
bool doTxns)
{
auto nodeLedger = app_.getInboundLedgers().acquire (
- ledgerHash, ledgerIndex, InboundLedger::fcGENERIC);
+ ledgerHash, ledgerIndex, InboundLedger::Reason::GENERIC);
if (!nodeLedger)
{
JLOG (j_.debug()) << "Ledger " << ledgerIndex << " not available";
app_.getLedgerMaster().clearLedger (ledgerIndex);
app_.getInboundLedgers().acquire(
- ledgerHash, ledgerIndex, InboundLedger::fcGENERIC);
+ ledgerHash, ledgerIndex, InboundLedger::Reason::GENERIC);
return false;
}
@@ -336,7 +336,7 @@ private:
JLOG (j_.debug()) << "Ledger " << ledgerIndex << " is missing nodes";
app_.getLedgerMaster().clearLedger (ledgerIndex);
app_.getInboundLedgers().acquire(
- ledgerHash, ledgerIndex, InboundLedger::fcGENERIC);
+ ledgerHash, ledgerIndex, InboundLedger::Reason::GENERIC);
return false;
}
@@ -390,7 +390,7 @@ private:
// ledger.
referenceLedger =
app_.getInboundLedgers().acquire(
- refHash, refIndex, InboundLedger::fcGENERIC);
+ refHash, refIndex, InboundLedger::Reason::GENERIC);
if (referenceLedger)
ledgerHash = getLedgerHash(
referenceLedger, ledgerIndex);
diff --git a/src/ripple/app/ledger/impl/LedgerMaster.cpp b/src/ripple/app/ledger/impl/LedgerMaster.cpp
index 68733671e..6ed8a8b6c 100644
--- a/src/ripple/app/ledger/impl/LedgerMaster.cpp
+++ b/src/ripple/app/ledger/impl/LedgerMaster.cpp
@@ -19,7 +19,6 @@
#include
#include
-#include
#include
#include
#include
@@ -40,6 +39,7 @@
#include
#include
#include
+#include
#include
#include
#include
@@ -499,13 +499,16 @@ LedgerMaster::tryFill (
/** Request a fetch pack to get to the specified ledger
*/
void
-LedgerMaster::getFetchPack (LedgerHash missingHash, LedgerIndex missingIndex)
+LedgerMaster::getFetchPack (LedgerIndex missingIndex,
+ InboundLedger::Reason reason)
{
- auto haveHash = getLedgerHashForHistory (missingIndex + 1);
-
+ auto haveHash = getLedgerHashForHistory(
+ missingIndex + 1, reason);
if (!haveHash)
{
- JLOG (m_journal.error()) << "No hash for fetch pack";
+ JLOG (m_journal.error()) <<
+ "No hash for fetch pack. Missing Index " <<
+ std::to_string(missingIndex);
return;
}
assert(haveHash->isNonZero());
@@ -668,7 +671,7 @@ LedgerMaster::failedSave(std::uint32_t seq, uint256 const& hash)
{
clearLedger(seq);
app_.getInboundLedgers().acquire(
- hash, seq, InboundLedger::fcGENERIC);
+ hash, seq, InboundLedger::Reason::GENERIC);
}
// Check if the specified ledger can become the new last fully-validated
@@ -716,7 +719,7 @@ LedgerMaster::checkAccept (uint256 const& hash, std::uint32_t seq)
// FIXME: We may not want to fetch a ledger with just one
// trusted validation
ledger = app_.getInboundLedgers().acquire(
- hash, 0, InboundLedger::fcGENERIC);
+ hash, seq, InboundLedger::Reason::GENERIC);
}
if (ledger)
@@ -922,16 +925,19 @@ LedgerMaster::advanceThread()
}
boost::optional
-LedgerMaster::getLedgerHashForHistory (LedgerIndex index)
+LedgerMaster::getLedgerHashForHistory(
+ LedgerIndex index, InboundLedger::Reason reason)
{
// Try to get the hash of a ledger we need to fetch for history
boost::optional ret;
+ auto const& l {reason == InboundLedger::Reason::SHARD ?
+ mShardLedger : mHistLedger};
- if (mHistLedger && (mHistLedger->info().seq >= index))
+ if (l && l->info().seq >= index)
{
- ret = hashOfSeq(*mHistLedger, index, m_journal);
+ ret = hashOfSeq(*l, index, m_journal);
if (! ret)
- ret = walkHashBySeq (index, mHistLedger);
+ ret = walkHashBySeq (index, l);
}
if (! ret)
@@ -940,12 +946,6 @@ LedgerMaster::getLedgerHashForHistory (LedgerIndex index)
return ret;
}
-bool
-LedgerMaster::shouldFetchPack (std::uint32_t seq) const
-{
- return (fetch_seq_ != seq);
-}
-
std::vector>
LedgerMaster::findNewLedgersToPublish ()
{
@@ -1030,7 +1030,7 @@ LedgerMaster::findNewLedgersToPublish ()
// Can we try to acquire the ledger we need?
if (! ledger && (++acqCount < ledger_fetch_size_))
ledger = app_.getInboundLedgers ().acquire(
- *hash, seq, InboundLedger::fcGENERIC);
+ *hash, seq, InboundLedger::Reason::GENERIC);
// Did we acquire the next ledger we need to publish?
if (ledger && (ledger->info().seq == pubSeq))
@@ -1171,7 +1171,7 @@ LedgerMaster::updatePaths (Job& job)
app_.getInboundLedgers().acquire(
lastLedger->info().parentHash,
lastLedger->info().seq - 1,
- InboundLedger::fcGENERIC);
+ InboundLedger::Reason::GENERIC);
}
else
{
@@ -1179,7 +1179,7 @@ LedgerMaster::updatePaths (Job& job)
app_.getInboundLedgers().acquire(
lastLedger->info().hash,
lastLedger->info().seq,
- InboundLedger::fcGENERIC);
+ InboundLedger::Reason::GENERIC);
}
}
}
@@ -1278,13 +1278,15 @@ boost::optional
LedgerMaster::getCloseTimeBySeq (LedgerIndex ledgerIndex)
{
uint256 hash = getHashBySeq (ledgerIndex);
- return hash.isNonZero() ? getCloseTimeByHash (hash) : boost::none;
+ return hash.isNonZero() ? getCloseTimeByHash(
+ hash, ledgerIndex) : boost::none;
}
boost::optional
-LedgerMaster::getCloseTimeByHash (LedgerHash const& ledgerHash)
+LedgerMaster::getCloseTimeByHash(LedgerHash const& ledgerHash,
+ std::uint32_t index)
{
- auto node = app_.getNodeStore().fetch (ledgerHash);
+ auto node = app_.getNodeStore().fetch(ledgerHash, index);
if (node &&
(node->getData().size() >= 120))
{
@@ -1365,7 +1367,7 @@ LedgerMaster::walkHashBySeq (
if (!ledger)
{
auto const ledger = app_.getInboundLedgers().acquire (
- *refHash, refIndex, InboundLedger::fcGENERIC);
+ *refHash, refIndex, InboundLedger::Reason::GENERIC);
if (ledger)
{
ledgerHash = hashOfSeq(*ledger, index, m_journal);
@@ -1514,11 +1516,129 @@ LedgerMaster::shouldAcquire (
return ret;
}
+void
+LedgerMaster::fetchForHistory(
+ std::uint32_t missing,
+ bool& progress,
+ InboundLedger::Reason reason)
+{
+ ScopedUnlockType sl(m_mutex);
+ if (auto hash = getLedgerHashForHistory(missing, reason))
+ {
+ assert(hash->isNonZero());
+ auto ledger = getLedgerByHash(*hash);
+ if (! ledger)
+ {
+ if (!app_.getInboundLedgers().isFailure(*hash))
+ {
+ ledger = app_.getInboundLedgers().acquire(
+ *hash, missing, reason);
+ if (!ledger &&
+ missing > NodeStore::genesisSeq &&
+ missing != fetch_seq_)
+ {
+ JLOG(m_journal.trace())
+ << "fetchForHistory want fetch pack " << missing;
+ fetch_seq_ = missing;
+ getFetchPack(missing, reason);
+ }
+ else
+ JLOG(m_journal.trace())
+ << "fetchForHistory no fetch pack for " << missing;
+ }
+ else
+ JLOG(m_journal.debug())
+ << "fetchForHistory found failed acquire";
+ }
+ if (ledger)
+ {
+ auto seq = ledger->info().seq;
+ assert(seq == missing);
+ JLOG(m_journal.trace()) <<
+ "fetchForHistory acquired " << seq;
+ if (reason == InboundLedger::Reason::SHARD)
+ {
+ ledger->setFull();
+ {
+ ScopedLockType lock(m_mutex);
+ mShardLedger = ledger;
+ }
+ if (!ledger->stateMap().family().isShardBacked())
+ app_.getShardStore()->copyLedger(ledger);
+ }
+ else
+ {
+ setFullLedger(ledger, false, false);
+ int fillInProgress;
+ {
+ ScopedLockType lock(m_mutex);
+ mHistLedger = ledger;
+ fillInProgress = mFillInProgress;
+ }
+ if (fillInProgress == 0 &&
+ getHashByIndex(seq - 1, app_) == ledger->info().parentHash)
+ {
+ {
+ // Previous ledger is in DB
+ ScopedLockType lock(m_mutex);
+ mFillInProgress = seq;
+ }
+ app_.getJobQueue().addJob(jtADVANCE, "tryFill",
+ [this, ledger](Job& j) { tryFill(j, ledger); });
+ }
+ }
+ progress = true;
+ }
+ else
+ {
+ std::uint32_t fetchSz;
+ if (reason == InboundLedger::Reason::SHARD)
+ // Do not fetch ledger sequences lower
+ // than the shard's first ledger sequence
+ fetchSz = NodeStore::DatabaseShard::firstSeq(
+ NodeStore::DatabaseShard::seqToShardIndex(missing));
+ else
+ // Do not fetch ledger sequences lower
+ // than the genesis ledger sequence
+ fetchSz = NodeStore::genesisSeq;
+ fetchSz = missing >= fetchSz ?
+ std::min(ledger_fetch_size_, (missing - fetchSz) + 1) : 0;
+ try
+ {
+ for (std::uint32_t i = 0; i < fetchSz; ++i)
+ {
+ std::uint32_t seq = missing - i;
+ if (auto h = getLedgerHashForHistory(seq, reason))
+ {
+ assert(h->isNonZero());
+ app_.getInboundLedgers().acquire(*h, seq, reason);
+ }
+ }
+ }
+ catch (std::exception const&)
+ {
+ JLOG(m_journal.warn()) << "Threw while prefetching";
+ }
+ }
+ }
+ else
+ {
+ JLOG(m_journal.fatal()) << "Can't find ledger following prevMissing "
+ << missing;
+ JLOG(m_journal.fatal()) << "Pub:" << mPubLedgerSeq
+ << " Val:" << mValidLedgerSeq;
+ JLOG(m_journal.fatal()) << "Ledgers: "
+ << app_.getLedgerMaster().getCompleteLedgers();
+ JLOG(m_journal.fatal()) << "Acquire reason: "
+ << (reason == InboundLedger::Reason::HISTORY ? "HISTORY" : "SHARD");
+ clearLedger(missing + 1);
+ progress = true;
+ }
+}
+
// Try to publish ledgers, acquire missing ledgers
void LedgerMaster::doAdvance (ScopedLockType& sl)
{
- // TODO NIKB: simplify and unindent this a bit!
-
do
{
mAdvanceWork = false; // If there's work to do, we'll make progress
@@ -1531,147 +1651,53 @@ void LedgerMaster::doAdvance (ScopedLockType& sl)
(app_.getJobQueue().getJobCount(jtPUBOLDLEDGER) < 10) &&
(mValidLedgerSeq == mPubLedgerSeq) &&
(getValidatedLedgerAge() < MAX_LEDGER_AGE_ACQUIRE))
- { // We are in sync, so can acquire
- boost::optional maybeMissing;
+ {
+ // We are in sync, so can acquire
+ InboundLedger::Reason reason = InboundLedger::Reason::HISTORY;
+ boost::optional missing;
{
- ScopedLockType sl (mCompleteLock);
- maybeMissing =
- prevMissing(mCompleteLedgers, mPubLedger->info().seq);
+ ScopedLockType sl(mCompleteLock);
+ missing = prevMissing(mCompleteLedgers,
+ mPubLedger->info().seq, NodeStore::genesisSeq);
}
- if (maybeMissing)
+ if (missing)
{
- std::uint32_t missing = *maybeMissing;
- JLOG(m_journal.trace())
- << "tryAdvance discovered missing " << missing;
- if ((missing > 0) &&
+ JLOG(m_journal.trace()) <<
+ "tryAdvance discovered missing " << *missing;
+ if ((mFillInProgress == 0 || *missing > mFillInProgress) &&
shouldAcquire(mValidLedgerSeq, ledger_history_,
- app_.getSHAMapStore().getCanDelete(), missing) &&
- ((mFillInProgress == 0) || (missing > mFillInProgress)))
+ app_.getSHAMapStore().getCanDelete(), *missing))
{
- JLOG(m_journal.trace())
- << "advanceThread should acquire";
- {
- ScopedUnlockType sl(m_mutex);
- auto hash = getLedgerHashForHistory(missing);
- if (hash)
- {
- assert(hash->isNonZero());
- auto ledger = getLedgerByHash(*hash);
- if (!ledger)
- {
- if (!app_.getInboundLedgers().isFailure(
- *hash))
- {
- ledger =
- app_.getInboundLedgers().acquire(
- *hash, missing,
- InboundLedger::fcHISTORY);
- if (!ledger && (missing > 32600) &&
- shouldFetchPack(missing))
- {
- JLOG(m_journal.trace()) <<
- "tryAdvance want fetch pack " <<
- missing;
- fetch_seq_ = missing;
- getFetchPack(*hash, missing);
- }
- else
- JLOG(m_journal.trace()) <<
- "tryAdvance no fetch pack for " <<
- missing;
- }
- else
- JLOG(m_journal.debug()) <<
- "tryAdvance found failed acquire";
- }
- if (ledger)
- {
- auto seq = ledger->info().seq;
- assert(seq == missing);
- JLOG(m_journal.trace())
- << "tryAdvance acquired "
- << ledger->info().seq;
- setFullLedger(
- ledger,
- false,
- false);
- auto const& parent = ledger->info().parentHash;
-
- int fillInProgress;
- {
- ScopedLockType lock(m_mutex);
- mHistLedger = ledger;
- fillInProgress = mFillInProgress;
- }
-
- if (fillInProgress == 0 &&
- getHashByIndex(seq - 1, app_) == parent)
- {
- {
- // Previous ledger is in DB
- ScopedLockType lock(m_mutex);
- mFillInProgress = ledger->info().seq;
- }
-
- app_.getJobQueue().addJob(
- jtADVANCE, "tryFill",
- [this, ledger](Job& j) {
- tryFill(j, ledger);
- });
- }
-
- progress = true;
- }
- else
- {
- try
- {
- for (int i = 0; i < ledger_fetch_size_; ++i)
- {
- std::uint32_t seq = missing - i;
- auto hash2 =
- getLedgerHashForHistory(seq);
- if (hash2)
- {
- assert(hash2->isNonZero());
- app_.getInboundLedgers().acquire
- (*hash2, seq,
- InboundLedger::fcHISTORY);
- }
- }
- }
- catch (std::exception const&)
- {
- JLOG(m_journal.warn()) <<
- "Threw while prefetching";
- }
- }
- }
- else
- {
- JLOG(m_journal.fatal()) <<
- "Can't find ledger following prevMissing " <<
- missing;
- JLOG(m_journal.fatal()) << "Pub:" <<
- mPubLedgerSeq << " Val:" << mValidLedgerSeq;
- JLOG(m_journal.fatal()) << "Ledgers: " <<
- app_.getLedgerMaster().getCompleteLedgers();
- clearLedger(missing + 1);
- progress = true;
- }
- }
- if (mValidLedgerSeq != mPubLedgerSeq)
- {
- JLOG(m_journal.debug()) <<
- "tryAdvance found last valid changed";
- progress = true;
- }
+ JLOG(m_journal.trace()) <<
+ "advanceThread should acquire";
+ }
+ else
+ missing = boost::none;
+ }
+ if (! missing && mFillInProgress == 0)
+ {
+ if (auto shardStore = app_.getShardStore())
+ {
+ missing = shardStore->prepare(mValidLedgerSeq);
+ if (missing)
+ reason = InboundLedger::Reason::SHARD;
+ }
+ }
+ if(missing)
+ {
+ fetchForHistory(*missing, progress, reason);
+ if (mValidLedgerSeq != mPubLedgerSeq)
+ {
+ JLOG (m_journal.debug()) <<
+ "tryAdvance found last valid changed";
+ progress = true;
}
}
}
else
{
mHistLedger.reset();
+ mShardLedger.reset();
JLOG (m_journal.trace()) <<
"tryAdvance not fetching history";
}
@@ -1687,11 +1713,7 @@ void LedgerMaster::doAdvance (ScopedLockType& sl)
ScopedUnlockType sul (m_mutex);
JLOG (m_journal.debug()) <<
"tryAdvance publishing seq " << ledger->info().seq;
-
- setFullLedger(
- ledger,
- true,
- true);
+ setFullLedger(ledger, true, true);
}
setPubLedger(ledger);
diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp
index d947495fe..152498deb 100644
--- a/src/ripple/app/main/Application.cpp
+++ b/src/ripple/app/main/Application.cpp
@@ -79,6 +79,7 @@ private:
TreeNodeCache treecache_;
FullBelowCache fullbelow_;
NodeStore::Database& db_;
+ bool const shardBacked_;
beast::Journal j_;
// missing node handler
@@ -97,7 +98,9 @@ private:
"Missing node in " << to_string (hash);
app_.getInboundLedgers ().acquire (
- hash, seq, InboundLedger::fcGENERIC);
+ hash, seq, shardBacked_ ?
+ InboundLedger::Reason::SHARD :
+ InboundLedger::Reason::GENERIC);
}
}
@@ -114,6 +117,8 @@ public:
collectorManager.collector(),
fullBelowTargetSize, fullBelowExpirationSeconds)
, db_ (db)
+ , shardBacked_ (
+ dynamic_cast(&db) != nullptr)
, j_ (app.journal("SHAMap"))
{
}
@@ -160,6 +165,12 @@ public:
return db_;
}
+ bool
+ isShardBacked() const override
+ {
+ return shardBacked_;
+ }
+
void
missing_node (std::uint32_t seq) override
{
@@ -200,9 +211,20 @@ public:
}
void
- missing_node (uint256 const& hash) override
+ missing_node (uint256 const& hash, std::uint32_t seq) override
{
- acquire (hash, 0);
+ acquire (hash, seq);
+ }
+
+ void
+ reset () override
+ {
+ {
+ std::lock_guard l(maxSeqLock);
+ maxSeq = 0;
+ }
+ fullbelow_.reset();
+ treecache_.reset();
}
};
@@ -308,7 +330,9 @@ public:
// These are Stoppable-related
std::unique_ptr m_jobQueue;
std::unique_ptr m_nodeStore;
+ std::unique_ptr shardStore_;
detail::AppFamily family_;
+ std::unique_ptr sFamily_;
// VFALCO TODO Make OrderBookDB abstract
OrderBookDB m_orderBookDB;
std::unique_ptr m_pathRequests;
@@ -381,9 +405,8 @@ public:
, m_nodeStoreScheduler (*this)
, m_shaMapStore (make_SHAMapStore (*this, setup_SHAMapStore (*config_),
- *this, m_nodeStoreScheduler,
- logs_->journal ("SHAMapStore"), logs_->journal ("NodeObject"),
- m_txMaster, *config_))
+ *this, m_nodeStoreScheduler, logs_->journal("SHAMapStore"),
+ logs_->journal("NodeObject"), m_txMaster, *config_))
, accountIDCache_(128000)
@@ -411,6 +434,9 @@ public:
, m_nodeStore (
m_shaMapStore->makeDatabase ("NodeStore.main", 4, *m_jobQueue))
+ , shardStore_ (
+ m_shaMapStore->makeDatabaseShard ("ShardStore", 4, *m_jobQueue))
+
, family_ (*this, *m_nodeStore, *m_collectorManager)
, m_orderBookDB (*this, *m_jobQueue)
@@ -493,6 +519,9 @@ public:
, m_io_latency_sampler (m_collectorManager->collector()->make_event ("ios_latency"),
logs_->journal("Application"), std::chrono::milliseconds (100), get_io_service())
{
+ if (shardStore_)
+ sFamily_ = std::make_unique(
+ *this, *shardStore_, *m_collectorManager);
add (m_resourceManager.get ());
//
@@ -546,12 +575,16 @@ public:
return *m_collectorManager;
}
- Family&
- family() override
+ Family& family() override
{
return family_;
}
+ Family* shardFamily() override
+ {
+ return sFamily_.get();
+ }
+
TimeKeeper&
timeKeeper() override
{
@@ -632,6 +665,11 @@ public:
return *m_nodeStore;
}
+ NodeStore::DatabaseShard* getShardStore () override
+ {
+ return shardStore_.get();
+ }
+
Application::MutexType& getMasterMutex () override
{
return m_masterMutex;
@@ -988,15 +1026,21 @@ public:
// VFALCO TODO fix the dependency inversion using an observer,
// have listeners register for "onSweep ()" notification.
- family().fullbelow().sweep ();
+ family().fullbelow().sweep();
+ if (sFamily_)
+ sFamily_->fullbelow().sweep();
getMasterTransaction().sweep();
getNodeStore().sweep();
+ if (shardStore_)
+ shardStore_->sweep();
getLedgerMaster().sweep();
getTempNodeCache().sweep();
getValidations().expire();
getInboundLedgers().sweep();
m_acceptedLedgerCache.sweep();
family().treecache().sweep();
+ if (sFamily_)
+ sFamily_->treecache().sweep();
cachedSLEs_.expire();
// Set timer to do another sweep later.
@@ -1017,6 +1061,7 @@ private:
void addTxnSeqField();
void addValidationSeqFields();
bool updateTables ();
+ bool validateShards ();
void startGenesisLedger ();
std::shared_ptr
@@ -1199,6 +1244,13 @@ bool ApplicationImp::setup()
m_ledgerMaster->tune (config_->getSize (siLedgerSize), config_->getSize (siLedgerAge));
family().treecache().setTargetSize (config_->getSize (siTreeCacheSize));
family().treecache().setTargetAge (config_->getSize (siTreeCacheAge));
+ if (shardStore_)
+ {
+ shardStore_->tune(config_->getSize(siNodeCacheSize),
+ config_->getSize(siNodeCacheAge));
+ sFamily_->treecache().setTargetSize(config_->getSize(siTreeCacheSize));
+ sFamily_->treecache().setTargetAge(config_->getSize(siTreeCacheAge));
+ }
//----------------------------------------------------------------------
//
@@ -1216,6 +1268,9 @@ bool ApplicationImp::setup()
*config_);
add (*m_overlay); // add to PropertyStream
+ if (config_->valShards && !validateShards())
+ return false;
+
validatorSites_->start ();
// start first consensus round
@@ -1624,7 +1679,7 @@ bool ApplicationImp::loadOldLedger (
{
// Try to build the ledger from the back end
auto il = std::make_shared (
- *this, hash, 0, InboundLedger::fcGENERIC,
+ *this, hash, 0, InboundLedger::Reason::GENERIC,
stopwatch());
if (il->checkLocal ())
loadLedger = il->getLedger ();
@@ -1664,7 +1719,7 @@ bool ApplicationImp::loadOldLedger (
// Try to build the ledger from the back end
auto il = std::make_shared (
*this, replayLedger->info().parentHash,
- 0, InboundLedger::fcGENERIC, stopwatch());
+ 0, InboundLedger::Reason::GENERIC, stopwatch());
if (il->checkLocal ())
loadLedger = il->getLedger ();
@@ -2008,6 +2063,32 @@ bool ApplicationImp::updateTables ()
return true;
}
+bool ApplicationImp::validateShards()
+{
+ if (!m_overlay)
+ Throw("no overlay");
+ if(config_->standalone())
+ {
+ JLOG(m_journal.fatal()) <<
+ "Shard validation cannot be run in standalone";
+ return false;
+ }
+ if (config_->section(ConfigSection::shardDatabase()).empty())
+ {
+ JLOG (m_journal.fatal()) <<
+ "The [shard_db] configuration setting must be set";
+ return false;
+ }
+ if (!shardStore_)
+ {
+ JLOG(m_journal.fatal()) <<
+ "Invalid [shard_db] configuration";
+ return false;
+ }
+ shardStore_->validate();
+ return true;
+}
+
void ApplicationImp::setMaxDisallowedLedger()
{
boost::optional seq;
diff --git a/src/ripple/app/main/Application.h b/src/ripple/app/main/Application.h
index 2769be22d..af7c0032d 100644
--- a/src/ripple/app/main/Application.h
+++ b/src/ripple/app/main/Application.h
@@ -35,7 +35,7 @@ namespace ripple {
namespace unl { class Manager; }
namespace Resource { class Manager; }
-namespace NodeStore { class Database; }
+namespace NodeStore { class Database; class DatabaseShard; }
// VFALCO TODO Fix forward declares required for header dependency loops
class AmendmentTable;
@@ -116,32 +116,41 @@ public:
// ---
//
- virtual Logs& logs() = 0;
- virtual Config& config() = 0;
- virtual boost::asio::io_service& getIOService () = 0;
- virtual CollectorManager& getCollectorManager () = 0;
- virtual Family& family() = 0;
- virtual TimeKeeper& timeKeeper() = 0;
- virtual JobQueue& getJobQueue () = 0;
- virtual NodeCache& getTempNodeCache () = 0;
- virtual CachedSLEs& cachedSLEs() = 0;
- virtual AmendmentTable& getAmendmentTable() = 0;
- virtual HashRouter& getHashRouter () = 0;
- virtual LoadFeeTrack& getFeeTrack () = 0;
- virtual LoadManager& getLoadManager () = 0;
- virtual Overlay& overlay () = 0;
- virtual TxQ& getTxQ() = 0;
- virtual ValidatorList& validators () = 0;
- virtual ValidatorSite& validatorSites () = 0;
- virtual ManifestCache& validatorManifests () = 0;
- virtual ManifestCache& publisherManifests () = 0;
- virtual Cluster& cluster () = 0;
- virtual RCLValidations& getValidations () = 0;
- virtual NodeStore::Database& getNodeStore () = 0;
- virtual InboundLedgers& getInboundLedgers () = 0;
- virtual InboundTransactions& getInboundTransactions () = 0;
- virtual TaggedCache &
- getAcceptedLedgerCache () = 0;
+ virtual Logs& logs() = 0;
+ virtual Config& config() = 0;
+
+ virtual
+ boost::asio::io_service&
+ getIOService () = 0;
+
+ virtual CollectorManager& getCollectorManager () = 0;
+ virtual Family& family() = 0;
+ virtual Family* shardFamily() = 0;
+ virtual TimeKeeper& timeKeeper() = 0;
+ virtual JobQueue& getJobQueue () = 0;
+ virtual NodeCache& getTempNodeCache () = 0;
+ virtual CachedSLEs& cachedSLEs() = 0;
+ virtual AmendmentTable& getAmendmentTable() = 0;
+ virtual HashRouter& getHashRouter () = 0;
+ virtual LoadFeeTrack& getFeeTrack () = 0;
+ virtual LoadManager& getLoadManager () = 0;
+ virtual Overlay& overlay () = 0;
+ virtual TxQ& getTxQ() = 0;
+ virtual ValidatorList& validators () = 0;
+ virtual ValidatorSite& validatorSites () = 0;
+ virtual ManifestCache& validatorManifests () = 0;
+ virtual ManifestCache& publisherManifests () = 0;
+ virtual Cluster& cluster () = 0;
+ virtual RCLValidations& getValidations () = 0;
+ virtual NodeStore::Database& getNodeStore () = 0;
+ virtual NodeStore::DatabaseShard* getShardStore() = 0;
+ virtual InboundLedgers& getInboundLedgers () = 0;
+ virtual InboundTransactions& getInboundTransactions () = 0;
+
+ virtual
+ TaggedCache &
+ getAcceptedLedgerCache () = 0;
+
virtual LedgerMaster& getLedgerMaster () = 0;
virtual NetworkOPs& getOPs () = 0;
virtual OrderBookDB& getOrderBookDB () = 0;
@@ -162,10 +171,12 @@ public:
virtual AccountIDCache const& accountIDCache() const = 0;
virtual OpenLedger& openLedger() = 0;
virtual OpenLedger const& openLedger() const = 0;
- virtual DatabaseCon& getTxnDB () = 0;
- virtual DatabaseCon& getLedgerDB () = 0;
+ virtual DatabaseCon& getTxnDB () = 0;
+ virtual DatabaseCon& getLedgerDB () = 0;
- virtual std::chrono::milliseconds getIOLatency () = 0;
+ virtual
+ std::chrono::milliseconds
+ getIOLatency () = 0;
virtual bool serverOkay (std::string& reason) = 0;
diff --git a/src/ripple/app/main/Main.cpp b/src/ripple/app/main/Main.cpp
index 93f053344..bf18c6827 100644
--- a/src/ripple/app/main/Main.cpp
+++ b/src/ripple/app/main/Main.cpp
@@ -270,6 +270,12 @@ int run (int argc, char** argv)
importText += ConfigSection::nodeDatabase ();
importText += "] configuration file section).";
}
+ std::string shardsText;
+ {
+ shardsText += "Validate an existing shard database (specified in the [";
+ shardsText += ConfigSection::shardDatabase();
+ shardsText += "] configuration file section).";
+ }
// Set up option parsing.
//
@@ -303,6 +309,7 @@ int run (int argc, char** argv)
("debug", "Enable normally suppressed debug logging")
("fg", "Run in the foreground.")
("import", importText.c_str ())
+ ("shards", shardsText.c_str ())
("version", "Display the build version.")
;
@@ -402,6 +409,9 @@ int run (int argc, char** argv)
if (vm.count ("import"))
config->doImport = true;
+ if (vm.count ("shards"))
+ config->valShards = true;
+
if (vm.count ("ledger"))
{
config->START_LEDGER = vm["ledger"].as ();
diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp
index 140c86b89..16223bcb2 100644
--- a/src/ripple/app/misc/NetworkOPs.cpp
+++ b/src/ripple/app/misc/NetworkOPs.cpp
@@ -1368,7 +1368,7 @@ bool NetworkOPsImp::checkLastClosedLedger (
if (!consensus)
consensus = app_.getInboundLedgers().acquire (
- closedLedger, 0, InboundLedger::fcCONSENSUS);
+ closedLedger, 0, InboundLedger::Reason::CONSENSUS);
if (consensus &&
! m_ledgerMaster.isCompatible (*consensus, m_journal.debug(),
diff --git a/src/ripple/app/misc/SHAMapStore.h b/src/ripple/app/misc/SHAMapStore.h
index 6ac3229e7..67fb0ba9f 100644
--- a/src/ripple/app/misc/SHAMapStore.h
+++ b/src/ripple/app/misc/SHAMapStore.h
@@ -48,6 +48,7 @@ public:
std::uint32_t deleteBatch = 100;
std::uint32_t backOff = 100;
std::int32_t ageThreshold = 60;
+ Section shardDatabase;
};
SHAMapStore (Stoppable& parent) : Stoppable ("SHAMapStore", parent) {}
@@ -63,6 +64,10 @@ public:
std::string const& name,
std::int32_t readThreads, Stoppable& parent) = 0;
+ virtual std::unique_ptr makeDatabaseShard(
+ std::string const& name, std::int32_t readThreads,
+ Stoppable& parent) = 0;
+
/** Highest ledger that may be deleted. */
virtual LedgerIndex setCanDelete (LedgerIndex canDelete) = 0;
diff --git a/src/ripple/app/misc/SHAMapStoreImp.cpp b/src/ripple/app/misc/SHAMapStoreImp.cpp
index 35b8416cb..ef1f25110 100644
--- a/src/ripple/app/misc/SHAMapStoreImp.cpp
+++ b/src/ripple/app/misc/SHAMapStoreImp.cpp
@@ -19,11 +19,13 @@
#include
-#include
#include
#include
-#include
+#include
#include
+#include
+#include
+#include
namespace ripple {
void SHAMapStoreImp::SavedStateDB::init (BasicConfig const& config,
@@ -199,6 +201,41 @@ SHAMapStoreImp::SHAMapStoreImp (
dbPaths();
}
+ if (! setup_.shardDatabase.empty())
+ {
+ boost::filesystem::path dbPath =
+ get(setup_.shardDatabase, "path");
+ if (dbPath.empty())
+ Throw("shard path missing");
+ if (boost::filesystem::exists(dbPath))
+ {
+ if (! boost::filesystem::is_directory(dbPath))
+ Throw("shard db path must be a directory.");
+ }
+ else
+ boost::filesystem::create_directories(dbPath);
+
+ auto const maxDiskSpace = get(
+ setup_.shardDatabase, "max_size_gb", 0);
+ // Must be large enough for one shard
+ if (maxDiskSpace < 3)
+ Throw("max_size_gb too small");
+ if ((maxDiskSpace << 30) < maxDiskSpace)
+ Throw("overflow max_size_gb");
+
+ std::uint32_t lps;
+ if (get_if_exists(
+ setup_.shardDatabase, "ledgers_per_shard", lps))
+ {
+ // ledgers_per_shard to be set only in standalone for testing
+ if (! setup_.standalone)
+ Throw(
+ "ledgers_per_shard only honored in stand alone");
+ if (lps == 0 || lps % 256 != 0)
+ Throw(
+ "ledgers_per_shard must be a multiple of 256");
+ }
+ }
}
std::unique_ptr
@@ -206,39 +243,51 @@ SHAMapStoreImp::makeDatabase (std::string const& name,
std::int32_t readThreads, Stoppable& parent)
{
std::unique_ptr db;
-
if (setup_.deleteInterval)
{
SavedState state = state_db_.getState();
-
- std::shared_ptr writableBackend (
- makeBackendRotating (state.writableDb));
- std::shared_ptr archiveBackend (
- makeBackendRotating (state.archiveDb));
-
- fdlimit_ = writableBackend->fdlimit() + archiveBackend->fdlimit();
-
- std::unique_ptr dbr =
- makeDatabaseRotating (name, readThreads, parent,
- writableBackend, archiveBackend);
-
- if (!state.writableDb.size())
+ auto writableBackend = makeBackendRotating(state.writableDb);
+ auto archiveBackend = makeBackendRotating(state.archiveDb);
+ if (! state.writableDb.size())
{
state.writableDb = writableBackend->getName();
state.archiveDb = archiveBackend->getName();
state_db_.setState (state);
}
- database_ = dbr.get();
- db.reset (dynamic_cast (dbr.release()));
+ // Create NodeStore with two backends to allow online deletion of data
+ auto dbr = std::make_unique(
+ "NodeStore.main", scheduler_, readThreads, parent,
+ std::move(writableBackend), std::move(archiveBackend),
+ nodeStoreJournal_);
+ fdlimit_ += dbr->fdlimit();
+ dbRotating_ = dbr.get();
+ db.reset(dynamic_cast(dbr.release()));
}
else
{
db = NodeStore::Manager::instance().make_Database (name, scheduler_,
readThreads, parent, setup_.nodeDatabase, nodeStoreJournal_);
- fdlimit_ = db->fdlimit();
+ fdlimit_ += db->fdlimit();
}
+ return db;
+}
+std::unique_ptr
+SHAMapStoreImp::makeDatabaseShard(std::string const& name,
+ std::int32_t readThreads, Stoppable& parent)
+{
+ std::unique_ptr db;
+ if(! setup_.shardDatabase.empty())
+ {
+ db = std::make_unique(
+ app_, name, parent, scheduler_, readThreads,
+ setup_.shardDatabase, app_.journal("ShardStore"));
+ if (db->init())
+ fdlimit_ += db->fdlimit();
+ else
+ db.reset();
+ }
return db;
}
@@ -277,8 +326,8 @@ bool
SHAMapStoreImp::copyNode (std::uint64_t& nodeCount,
SHAMapAbstractNode const& node)
{
- // Copy a single record from node to database_
- database_->fetchNode (node.getNodeHash().as_uint256());
+ // Copy a single record from node to dbRotating_
+ dbRotating_->fetch(node.getNodeHash().as_uint256(), node.getSeq());
if (! (++nodeCount % checkHealthInterval_))
{
if (health())
@@ -399,11 +448,9 @@ SHAMapStoreImp::run()
;
}
- std::shared_ptr newBackend =
- makeBackendRotating();
+ auto newBackend = makeBackendRotating();
JLOG(journal_.debug()) << validatedSeq << " new backend "
<< newBackend->getName();
- std::shared_ptr oldBackend;
clearCaches (validatedSeq);
switch (health())
@@ -419,15 +466,17 @@ SHAMapStoreImp::run()
}
std::string nextArchiveDir =
- database_->getWritableBackend()->getName();
+ dbRotating_->getWritableBackend()->getName();
lastRotated = validatedSeq;
+ std::unique_ptr oldBackend;
{
- std::lock_guard lock (database_->peekMutex());
+ std::lock_guard lock (dbRotating_->peekMutex());
state_db_.setState (SavedState {newBackend->getName(),
nextArchiveDir, lastRotated});
clearCaches (validatedSeq);
- oldBackend = database_->rotateBackends (newBackend);
+ oldBackend = dbRotating_->rotateBackends(
+ std::move(newBackend));
}
JLOG(journal_.debug()) << "finished rotation " << validatedSeq;
@@ -498,7 +547,7 @@ SHAMapStoreImp::dbPaths()
}
}
-std::shared_ptr
+std::unique_ptr
SHAMapStoreImp::makeBackendRotating (std::string path)
{
boost::filesystem::path newPath;
@@ -517,19 +566,10 @@ SHAMapStoreImp::makeBackendRotating (std::string path)
}
parameters.set("path", newPath.string());
- return NodeStore::Manager::instance().make_Backend (parameters, scheduler_,
- nodeStoreJournal_);
-}
-
-std::unique_ptr
-SHAMapStoreImp::makeDatabaseRotating (std::string const& name,
- std::int32_t readThreads, Stoppable& parent,
- std::shared_ptr writableBackend,
- std::shared_ptr archiveBackend) const
-{
- return NodeStore::Manager::instance().make_DatabaseRotating (
- name, scheduler_, readThreads, parent,
- writableBackend, archiveBackend, nodeStoreJournal_);
+ auto backend {NodeStore::Manager::instance().make_Backend(
+ parameters, scheduler_, nodeStoreJournal_)};
+ backend->open();
+ return backend;
}
bool
@@ -583,7 +623,7 @@ SHAMapStoreImp::clearCaches (LedgerIndex validatedSeq)
void
SHAMapStoreImp::freshenCaches()
{
- if (freshenCache (database_->getPositiveCache()))
+ if (freshenCache (dbRotating_->getPositiveCache()))
return;
if (freshenCache (*treeNodeCache_))
return;
@@ -825,12 +865,13 @@ setup_SHAMapStore (Config const& c)
get_if_exists (setup.nodeDatabase, "backOff", setup.backOff);
get_if_exists (setup.nodeDatabase, "age_threshold", setup.ageThreshold);
+ setup.shardDatabase = c.section(ConfigSection::shardDatabase());
return setup;
}
std::unique_ptr
make_SHAMapStore (Application& app,
- SHAMapStore::Setup const& s,
+ SHAMapStore::Setup const& setup,
Stoppable& parent,
NodeStore::Scheduler& scheduler,
beast::Journal journal,
@@ -838,9 +879,8 @@ make_SHAMapStore (Application& app,
TransactionMaster& transactionMaster,
BasicConfig const& config)
{
- return std::make_unique(app, s, parent, scheduler,
- journal, nodeStoreJournal, transactionMaster,
- config);
+ return std::make_unique(app, setup, parent, scheduler,
+ journal, nodeStoreJournal, transactionMaster, config);
}
}
diff --git a/src/ripple/app/misc/SHAMapStoreImp.h b/src/ripple/app/misc/SHAMapStoreImp.h
index 6d047f5d0..8ce8ec33e 100644
--- a/src/ripple/app/misc/SHAMapStoreImp.h
+++ b/src/ripple/app/misc/SHAMapStoreImp.h
@@ -27,7 +27,6 @@
#include
#include
-
namespace ripple {
class NetworkOPs;
@@ -87,7 +86,7 @@ private:
NodeStore::Scheduler& scheduler_;
beast::Journal journal_;
beast::Journal nodeStoreJournal_;
- NodeStore::DatabaseRotating* database_ = nullptr;
+ NodeStore::DatabaseRotating* dbRotating_ = nullptr;
SavedStateDB state_db_;
std::thread thread_;
bool stop_ = false;
@@ -136,6 +135,10 @@ public:
std::string const&name,
std::int32_t readThreads, Stoppable& parent) override;
+ std::unique_ptr
+ makeDatabaseShard(std::string const& name,
+ std::int32_t readThreads, Stoppable& parent) override;
+
LedgerIndex
setCanDelete (LedgerIndex seq) override
{
@@ -176,24 +179,9 @@ private:
bool copyNode (std::uint64_t& nodeCount, SHAMapAbstractNode const &node);
void run();
void dbPaths();
- std::shared_ptr makeBackendRotating (
- std::string path = std::string());
- /**
- * Creates a NodeStore with two
- * backends to allow online deletion of data.
- *
- * @param name A diagnostic label for the database.
- * @param readThreads The number of async read threads to create
- * @param writableBackend backend for writing
- * @param archiveBackend backend for archiving
- *
- * @return The opened database.
- */
- std::unique_ptr
- makeDatabaseRotating (std::string const&name,
- std::int32_t readThreads, Stoppable& parent,
- std::shared_ptr writableBackend,
- std::shared_ptr archiveBackend) const;
+
+ std::unique_ptr
+ makeBackendRotating (std::string path = std::string());
template
bool
@@ -203,7 +191,7 @@ private:
for (auto const& key: cache.getKeys())
{
- database_->fetchNode (key);
+ dbRotating_->fetch(key, 0);
if (! (++check % checkHealthInterval_) && health())
return true;
}
diff --git a/src/ripple/basics/KeyCache.h b/src/ripple/basics/KeyCache.h
index ed6560ab0..b5bf5528d 100644
--- a/src/ripple/basics/KeyCache.h
+++ b/src/ripple/basics/KeyCache.h
@@ -155,6 +155,14 @@ public:
m_map.clear ();
}
+ void reset ()
+ {
+ lock_guard lock(m_mutex);
+ m_map.clear();
+ m_stats.hits = 0;
+ m_stats.misses = 0;
+ }
+
void setTargetSize (size_type s)
{
lock_guard lock (m_mutex);
diff --git a/src/ripple/basics/TaggedCache.h b/src/ripple/basics/TaggedCache.h
index 2105eb5c0..2193be67d 100644
--- a/src/ripple/basics/TaggedCache.h
+++ b/src/ripple/basics/TaggedCache.h
@@ -146,13 +146,6 @@ public:
return m_hits * (100.0f / std::max (1.0f, total));
}
- void clearStats ()
- {
- lock_guard lock (m_mutex);
- m_hits = 0;
- m_misses = 0;
- }
-
void clear ()
{
lock_guard lock (m_mutex);
@@ -160,6 +153,15 @@ public:
m_cache_count = 0;
}
+ void reset ()
+ {
+ lock_guard lock (m_mutex);
+ m_cache.clear();
+ m_cache_count = 0;
+ m_hits = 0;
+ m_misses = 0;
+ }
+
void sweep ()
{
int cacheRemovals = 0;
@@ -476,7 +478,7 @@ public:
return m_mutex;
}
- std::vector getKeys ()
+ std::vector getKeys () const
{
std::vector v;
diff --git a/src/ripple/core/Config.h b/src/ripple/core/Config.h
index b2d123eab..f70005542 100644
--- a/src/ripple/core/Config.h
+++ b/src/ripple/core/Config.h
@@ -110,6 +110,7 @@ private:
public:
bool doImport = false;
+ bool valShards = false;
bool ELB_SUPPORT = false;
std::vector IPS; // Peer IPs from rippled.cfg.
diff --git a/src/ripple/core/ConfigSections.h b/src/ripple/core/ConfigSections.h
index 29031f21a..fb3953043 100644
--- a/src/ripple/core/ConfigSections.h
+++ b/src/ripple/core/ConfigSections.h
@@ -28,6 +28,7 @@ namespace ripple {
struct ConfigSection
{
static std::string nodeDatabase () { return "node_db"; }
+ static std::string shardDatabase () { return "shard_db"; }
static std::string importNodeDatabase () { return "import_db"; }
};
diff --git a/src/ripple/nodestore/Backend.h b/src/ripple/nodestore/Backend.h
index 2ee7088f2..98872db3c 100644
--- a/src/ripple/nodestore/Backend.h
+++ b/src/ripple/nodestore/Backend.h
@@ -50,6 +50,11 @@ public:
*/
virtual std::string getName() = 0;
+ /** Open the backend.
+ This allows the caller to catch exceptions.
+ */
+ virtual void open() = 0;
+
/** Close the backend.
This allows the caller to catch exceptions.
*/
diff --git a/src/ripple/nodestore/Database.h b/src/ripple/nodestore/Database.h
index 611a0850a..c411d1941 100644
--- a/src/ripple/nodestore/Database.h
+++ b/src/ripple/nodestore/Database.h
@@ -1,7 +1,7 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
- Copyright (c) 2012, 2013 Ripple Labs Inc.
+ Copyright (c) 2012, 2017 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
@@ -21,11 +21,19 @@
#define RIPPLE_NODESTORE_DATABASE_H_INCLUDED
#include
+#include
#include
-#include
#include
+#include
+#include
+#include
+
+#include
namespace ripple {
+
+class Ledger;
+
namespace NodeStore {
/** Persistency layer for NodeObject
@@ -50,22 +58,56 @@ public:
@param name The Stoppable name for this Database.
@param parent The parent Stoppable.
+ @param scheduler The scheduler to use for performing asynchronous tasks.
+ @param readThreads The number of async read threads to create.
+ @param journal Destination for logging output.
*/
- Database (std::string name, Stoppable& parent)
- : Stoppable (std::move (name), parent)
- { }
+ Database(std::string name, Stoppable& parent, Scheduler& scheduler,
+ int readThreads, beast::Journal j);
/** Destroy the node store.
All pending operations are completed, pending writes flushed,
and files closed before this returns.
*/
- virtual ~Database() = default;
+ virtual
+ ~Database();
/** Retrieve the name associated with this backend.
This is used for diagnostics and may not reflect the actual path
or paths used by the underlying backend.
*/
- virtual std::string getName () const = 0;
+ virtual
+ std::string
+ getName() const = 0;
+
+ /** Import objects from another database. */
+ virtual
+ void
+ import(Database& source) = 0;
+
+ /** Retrieve the estimated number of pending write operations.
+ This is used for diagnostics.
+ */
+ virtual
+ std::int32_t
+ getWriteLoad() const = 0;
+
+ /** Store the object.
+
+ The caller's Blob parameter is overwritten.
+
+ @param type The type of object.
+ @param data The payload of the object. The caller's
+ variable is overwritten.
+ @param hash The 256-bit hash of the payload data.
+ @param seq The sequence of the ledger the object belongs to.
+
+ @return `true` if the object was stored?
+ */
+ virtual
+ void
+ store(NodeObjectType type, Blob&& data,
+ uint256 const& hash, std::uint32_t seq) = 0;
/** Fetch an object.
If the object is known to be not in the database, isn't found in the
@@ -74,9 +116,12 @@ public:
@note This can be called concurrently.
@param hash The key of the object to retrieve.
+ @param seq The sequence of the ledger where the object is stored.
@return The object, or nullptr if it couldn't be retrieved.
*/
- virtual std::shared_ptr fetch (uint256 const& hash) = 0;
+ virtual
+ std::shared_ptr
+ fetch(uint256 const& hash, std::uint32_t seq) = 0;
/** Fetch an object without waiting.
If I/O is required to determine whether or not the object is present,
@@ -86,35 +131,143 @@ public:
@note This can be called concurrently.
@param hash The key of the object to retrieve
+ @param seq The sequence of the ledger where the object is stored.
@param object The object retrieved
@return Whether the operation completed
*/
- virtual bool asyncFetch (uint256 const& hash, std::shared_ptr& object) = 0;
+ virtual
+ bool
+ asyncFetch(uint256 const& hash, std::uint32_t seq,
+ std::shared_ptr& object) = 0;
+
+ /** Copies a ledger stored in a different database to this one.
+
+ @param ledger The ledger to copy.
+ @return true if the operation was successful
+ */
+ virtual
+ bool
+ copyLedger(std::shared_ptr const& ledger) = 0;
/** Wait for all currently pending async reads to complete.
*/
- virtual void waitReads () = 0;
+ void
+ waitReads();
/** Get the maximum number of async reads the node store prefers.
+
+ @param seq A ledger sequence specifying a shard to query.
@return The number of async reads preferred.
+ @note The sequence is only used with the shard store.
*/
- virtual int getDesiredAsyncReadCount () = 0;
+ virtual
+ int
+ getDesiredAsyncReadCount(std::uint32_t seq) = 0;
- /** Store the object.
+ /** Get the positive cache hits to total attempts ratio. */
+ virtual
+ float
+ getCacheHitRate() = 0;
- The caller's Blob parameter is overwritten.
+ /** Set the maximum number of entries and maximum cache age for both caches.
- @param type The type of object.
- @param ledgerIndex The ledger in which the object appears.
- @param data The payload of the object. The caller's
- variable is overwritten.
- @param hash The 256-bit hash of the payload data.
-
- @return `true` if the object was stored?
+ @param size Number of cache entries (0 = ignore)
+ @param age Maximum cache age in seconds
*/
- virtual void store (NodeObjectType type,
- Blob&& data,
- uint256 const& hash) = 0;
+ virtual
+ void
+ tune(int size, int age) = 0;
+
+ /** Remove expired entries from the positive and negative caches. */
+ virtual
+ void
+ sweep() = 0;
+
+ /** Gather statistics pertaining to read and write activities.
+
+ @return The total read and written bytes.
+ */
+ std::uint32_t
+ getStoreCount() const { return storeCount_; }
+
+ std::uint32_t
+ getFetchTotalCount() const { return fetchTotalCount_; }
+
+ std::uint32_t
+ getFetchHitCount() const { return fetchHitCount_; }
+
+ std::uint32_t
+ getStoreSize() const { return storeSz_; }
+
+ std::uint32_t
+ getFetchSize() const { return fetchSz_; }
+
+ /** Return the number of files needed by our backend(s) */
+ int
+ fdlimit() const { return fdLimit_; }
+
+ void
+ onStop();
+
+protected:
+ beast::Journal j_;
+ Scheduler& scheduler_;
+ int fdLimit_ {0};
+
+ void
+ stopThreads();
+
+ void
+ storeStats(size_t sz)
+ {
+ ++storeCount_;
+ storeSz_ += sz;
+ }
+
+ void
+ asyncFetch(uint256 const& hash, std::uint32_t seq,
+ std::shared_ptr> const& pCache,
+ std::shared_ptr> const& nCache);
+
+ std::shared_ptr
+ fetchInternal(uint256 const& hash, Backend& backend);
+
+ void
+ importInternal(Database& source, Backend& dest);
+
+ std::shared_ptr
+ doFetch(uint256 const& hash, std::uint32_t seq,
+ std::shared_ptr> const& pCache,
+ std::shared_ptr> const& nCache, bool isAsync);
+
+private:
+ std::atomic storeCount_ {0};
+ std::atomic fetchTotalCount_ {0};
+ std::atomic fetchHitCount_ {0};
+ std::atomic storeSz_ {0};
+ std::atomic fetchSz_ {0};
+
+ std::mutex readLock_;
+ std::condition_variable readCondVar_;
+ std::condition_variable readGenCondVar_;
+
+ // reads to do
+ std::map>,
+ std::weak_ptr>>> read_;
+
+ // last read
+ uint256 readLastHash_;
+
+ std::vector readThreads_;
+ bool readShut_ {false};
+
+ // current read generation
+ uint64_t readGen_ {0};
+
+ virtual
+ std::shared_ptr
+ fetchFrom(uint256 const& hash, std::uint32_t seq) = 0;
/** Visit every object in the database
This is usually called during import.
@@ -123,40 +276,12 @@ public:
or other methods.
@see import
*/
- virtual void for_each(std::function )> f) = 0;
+ virtual
+ void
+ for_each(std::function )> f) = 0;
- /** Import objects from another database. */
- virtual void import (Database& source) = 0;
-
- /** Retrieve the estimated number of pending write operations.
- This is used for diagnostics.
- */
- virtual std::int32_t getWriteLoad() const = 0;
-
- /** Get the positive cache hits to total attempts ratio. */
- virtual float getCacheHitRate () = 0;
-
- /** Set the maximum number of entries and maximum cache age for both caches.
-
- @param size Number of cache entries (0 = ignore)
- @param age Maximum cache age in seconds
- */
- virtual void tune (int size, int age) = 0;
-
- /** Remove expired entries from the positive and negative caches. */
- virtual void sweep () = 0;
-
- /** Gather statistics pertaining to read and write activities.
- Return the reads and writes, and total read and written bytes.
- */
- virtual std::uint32_t getStoreCount () const = 0;
- virtual std::uint32_t getFetchTotalCount () const = 0;
- virtual std::uint32_t getFetchHitCount () const = 0;
- virtual std::uint32_t getStoreSize () const = 0;
- virtual std::uint32_t getFetchSize () const = 0;
-
- /** Return the number of files needed by our backend */
- virtual int fdlimit() const = 0;
+ void
+ threadEntry();
};
}
diff --git a/src/ripple/nodestore/DatabaseRotating.h b/src/ripple/nodestore/DatabaseRotating.h
index 75173ed6b..7b2a02c38 100644
--- a/src/ripple/nodestore/DatabaseRotating.h
+++ b/src/ripple/nodestore/DatabaseRotating.h
@@ -30,24 +30,27 @@ namespace NodeStore {
* rotated in. Old ones are rotated out and deleted.
*/
-class DatabaseRotating
+class DatabaseRotating : public Database
{
public:
- virtual ~DatabaseRotating() = default;
+ DatabaseRotating(std::string const& name, Stoppable& parent,
+ Scheduler& scheduler, int readThreads, beast::Journal journal)
+ : Database(name, parent, scheduler, readThreads, journal)
+ {}
- virtual TaggedCache & getPositiveCache() = 0;
+ virtual
+ TaggedCache const&
+ getPositiveCache() = 0;
virtual std::mutex& peekMutex() const = 0;
- virtual std::shared_ptr const& getWritableBackend() const = 0;
+ virtual
+ std::unique_ptr const&
+ getWritableBackend() const = 0;
- virtual std::shared_ptr const& getArchiveBackend () const = 0;
-
- virtual std::shared_ptr rotateBackends (
- std::shared_ptr const& newBackend) = 0;
-
- /** Ensure that node is in writableBackend */
- virtual std::shared_ptr fetchNode (uint256 const& hash) = 0;
+ virtual
+ std::unique_ptr
+ rotateBackends(std::unique_ptr newBackend) = 0;
};
}
diff --git a/src/ripple/nodestore/DatabaseShard.h b/src/ripple/nodestore/DatabaseShard.h
new file mode 100644
index 000000000..51db001e7
--- /dev/null
+++ b/src/ripple/nodestore/DatabaseShard.h
@@ -0,0 +1,177 @@
+//------------------------------------------------------------------------------
+/*
+ This file is part of rippled: https://github.com/ripple/rippled
+ Copyright (c) 2012, 2017 Ripple Labs Inc.
+
+ Permission to use, copy, modify, and/or distribute this software for any
+ purpose with or without fee is hereby granted, provided that the above
+ copyright notice and this permission notice appear in all copies.
+
+ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+*/
+//==============================================================================
+
+#ifndef RIPPLE_NODESTORE_DATABASESHARD_H_INCLUDED
+#define RIPPLE_NODESTORE_DATABASESHARD_H_INCLUDED
+
+#include
+#include
+#include
+
+#include
+
+#include
+
+namespace ripple {
+namespace NodeStore {
+
+/** A collection of historical shards
+*/
+class DatabaseShard : public Database
+{
+public:
+ /** Construct a shard store
+
+ @param name The Stoppable name for this Database
+ @param parent The parent Stoppable
+ @param scheduler The scheduler to use for performing asynchronous tasks
+ @param readThreads The number of async read threads to create
+ @param config The configuration for the database
+ @param journal Destination for logging output
+ */
+ DatabaseShard(std::string const& name, Stoppable& parent,
+ Scheduler& scheduler, int readThreads,
+ Section const& config, beast::Journal journal)
+ : Database(name, parent, scheduler, readThreads, journal)
+ {
+ get_if_exists(config, "ledgers_per_shard", lps_);
+ }
+
+ /** Initialize the database
+
+ @return `true` if the database initialized without error
+ */
+ virtual
+ bool
+ init() = 0;
+
+ /** Prepare to store a new ledger in the shard
+
+ @param validLedgerSeq the index of the maximum valid ledgers
+ @return if a ledger should be fetched and stored, then returns the ledger
+ index of the ledger to request. Otherwise returns boost::none.
+ Some reasons this may return boost::none are: this database does
+ not store shards, all shards are stored and full, max allowed
+ disk space would be exceeded, or a ledger was recently requested
+ and not enough time has passed between requests.
+ @implNote adds a new writable shard if necessary
+ */
+ virtual
+ boost::optional
+ prepare(std::uint32_t validLedgerSeq) = 0;
+
+ /** Fetch a ledger from the shard store
+
+ @param hash The key of the ledger to retrieve
+ @param seq The sequence of the ledger
+ @return The ledger if found, nullptr otherwise
+ */
+ virtual
+ std::shared_ptr
+ fetchLedger(uint256 const& hash, std::uint32_t seq) = 0;
+
+ /** Notifies the database that the given ledger has been
+ fully acquired and stored.
+
+ @param ledger The stored ledger to be marked as complete
+ */
+ virtual
+ void
+ setStored(std::shared_ptr const& ledger) = 0;
+
+ /** Query if a ledger with the given sequence is stored
+
+ @param seq The ledger sequence to check if stored
+ @return `true` if the ledger is stored
+ */
+ virtual
+ bool
+ contains(std::uint32_t seq) = 0;
+
+ /** Query which complete shards are stored
+
+ @return the indexes of complete shards
+ */
+ virtual
+ std::string
+ getCompleteShards() = 0;
+
+ /** Verifies shard store data is valid.
+
+ @param app The application object
+ */
+ virtual
+ void
+ validate() = 0;
+
+ /** @return The number of ledgers stored in a shard
+ */
+ static
+ std::uint32_t
+ ledgersPerShard()
+ {
+ return lps_;
+ }
+
+ /** Calculates the shard index for a given ledger sequence
+
+ @param seq ledger sequence
+ @return The shard index of the ledger sequence
+ */
+ static
+ std::uint32_t
+ seqToShardIndex(std::uint32_t seq)
+ {
+ assert(seq >= genesisSeq);
+ return (seq - 1) / lps_;
+ }
+
+ /** Calculates the first ledger sequence for a given shard index
+
+ @param shardIndex The shard index considered
+ @return The first ledger sequence pertaining to the shard index
+ */
+ static
+ std::uint32_t
+ firstSeq(std::uint32_t shardIndex)
+ {
+ return 1 + (shardIndex * lps_);
+ }
+
+ /** Calculates the last ledger sequence for a given shard index
+
+ @param shardIndex The shard index considered
+ @return The last ledger sequence pertaining to the shard index
+ */
+ static
+ std::uint32_t
+ lastSeq(std::uint32_t shardIndex)
+ {
+ return (shardIndex + 1) * lps_;
+ }
+
+protected:
+ // The number of ledgers stored in a shard, default is 16384
+ static std::uint32_t lps_;
+};
+
+}
+}
+
+#endif
diff --git a/src/ripple/nodestore/Manager.h b/src/ripple/nodestore/Manager.h
index 514668b93..c4ccc94a1 100644
--- a/src/ripple/nodestore/Manager.h
+++ b/src/ripple/nodestore/Manager.h
@@ -22,6 +22,7 @@
#include
#include
+#include
namespace ripple {
namespace NodeStore {
@@ -49,7 +50,9 @@ public:
@param name The name to match, performed case-insensitive.
@return `nullptr` if a match was not found.
*/
- //virtual Factory* find (std::string const& name) const = 0;
+ virtual
+ Factory*
+ find(std::string const& name) = 0;
/** Create a backend. */
virtual
@@ -87,15 +90,6 @@ public:
int readThreads, Stoppable& parent,
Section const& backendParameters,
beast::Journal journal) = 0;
-
- virtual
- std::unique_ptr
- make_DatabaseRotating (std::string const& name,
- Scheduler& scheduler, std::int32_t readThreads,
- Stoppable& parent,
- std::shared_ptr writableBackend,
- std::shared_ptr archiveBackend,
- beast::Journal journal) = 0;
};
//------------------------------------------------------------------------------
diff --git a/src/ripple/nodestore/Types.h b/src/ripple/nodestore/Types.h
index cb1fb1663..6c46b18b1 100644
--- a/src/ripple/nodestore/Types.h
+++ b/src/ripple/nodestore/Types.h
@@ -48,6 +48,10 @@ enum Status
/** A batch of NodeObjects to write at once. */
using Batch = std::vector >;
+
+// System constant/invariant
+static constexpr std::uint32_t genesisSeq {32570u};
+
}
}
diff --git a/src/ripple/nodestore/backend/MemoryFactory.cpp b/src/ripple/nodestore/backend/MemoryFactory.cpp
index aaf704e5c..36d576fb6 100644
--- a/src/ripple/nodestore/backend/MemoryFactory.cpp
+++ b/src/ripple/nodestore/backend/MemoryFactory.cpp
@@ -80,7 +80,7 @@ private:
std::string name_;
beast::Journal journal_;
- MemoryDB* db_;
+ MemoryDB* db_ {nullptr};
public:
MemoryBackend (size_t keyBytes, Section const& keyValues,
@@ -90,7 +90,6 @@ public:
{
if (name_.empty())
Throw ("Missing path in Memory backend");
- db_ = &memoryFactory.open(name_);
}
~MemoryBackend ()
@@ -104,6 +103,12 @@ public:
return name_;
}
+ void
+ open() override
+ {
+ db_ = &memoryFactory.open(name_);
+ }
+
void
close() override
{
@@ -115,6 +120,7 @@ public:
Status
fetch (void const* key, std::shared_ptr* pObject) override
{
+ assert(db_);
uint256 const hash (uint256::fromVoid (key));
std::lock_guard