Improve shard concurrency:

* Reduce lock scope on all public functions
* Use a TaskQueue to process shard finalization in a separate thread (see the sketch below)
* Store the shard's last ledger hash and other finalization info in the backend
* Use a temporary SQLite database instead of a control file when acquiring a shard
* Remove Boost.Serialization from the CMake files
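A hypothetical fragment (not the commit's literal code) of the pattern the first two bullets describe, using the members this commit adds to DatabaseShardImp (mutex_, shards_, taskQueue_) for some shardIndex already known to the caller: copy the shard pointer under a short lock, then queue the expensive finalization on the task queue's worker thread.

    std::shared_ptr<Shard> shard;
    {
        std::lock_guard lock(mutex_);      // short critical section
        if (auto const it = shards_.find(shardIndex); it != shards_.end())
            shard = it->second.shard;
    }
    if (shard)
        taskQueue_->addTask([shard]()
        {
            // Runs on the TaskQueue worker; no shard store lock is held here.
            shard->finalize(true /* writeSQLite */);
        });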
Authored by Miguel Portilla on 2019-09-13 18:44:24 -04:00; committed by manojsdoshi
parent f00f263852
commit cc452dfa9b
47 changed files with 1972 additions and 1346 deletions


@@ -21,7 +21,6 @@ find_dependency (Boost 1.70
filesystem
program_options
regex
serialization
system
thread)
#[=========================================================[


@@ -513,6 +513,7 @@ target_sources (rippled PRIVATE
src/ripple/nodestore/impl/ManagerImp.cpp
src/ripple/nodestore/impl/NodeObject.cpp
src/ripple/nodestore/impl/Shard.cpp
src/ripple/nodestore/impl/TaskQueue.cpp
#[===============================[
main sources:
subdir: overlay


@@ -21,9 +21,7 @@ target_compile_definitions (opts
>
$<$<BOOL:${beast_no_unit_test_inline}>:BEAST_NO_UNIT_TEST_INLINE=1>
$<$<BOOL:${beast_disable_autolink}>:BEAST_DONT_AUTOLINK_TO_WIN32_LIBRARIES=1>
$<$<BOOL:${single_io_service_thread}>:RIPPLE_SINGLE_IO_SERVICE_THREAD=1>
# doesn't currently compile ? :
$<$<BOOL:${verify_nodeobject_keys}>:RIPPLE_VERIFY_NODEOBJECT_KEYS=1>)
$<$<BOOL:${single_io_service_thread}>:RIPPLE_SINGLE_IO_SERVICE_THREAD=1>)
target_compile_options (opts
INTERFACE
$<$<AND:$<BOOL:${is_gcc}>,$<COMPILE_LANGUAGE:CXX>>:-Wsuggest-override>


@@ -100,12 +100,6 @@ option (have_package_container
option (beast_no_unit_test_inline
"Prevents unit test definitions from being inserted into global table"
OFF)
# NOTE - THIS OPTION CURRENTLY DOES NOT COMPILE :
# TODO: fix or remove
option (verify_nodeobject_keys
"This verifies that the hash of node objects matches the payload. \
This check is expensive - use with caution."
OFF)
option (single_io_service_thread
"Restricts the number of threads calling io_service::run to one. \
This can be useful when debugging."


@@ -47,7 +47,6 @@ find_package (Boost 1.70 REQUIRED
filesystem
program_options
regex
serialization
system
thread)
@@ -69,7 +68,6 @@ target_link_libraries (ripple_boost
Boost::filesystem
Boost::program_options
Boost::regex
Boost::serialization
Boost::system
Boost::thread)
if (Boost_COMPILER)


@@ -39,7 +39,6 @@ else
BLDARGS+=(--with-filesystem)
BLDARGS+=(--with-program_options)
BLDARGS+=(--with-regex)
BLDARGS+=(--with-serialization)
BLDARGS+=(--with-system)
BLDARGS+=(--with-atomic)
BLDARGS+=(--with-thread)


@@ -110,7 +110,7 @@ InboundLedger::init(ScopedLockType& collectionLock)
if (mFailed)
return;
}
else if (shardStore && mSeq >= shardStore->earliestSeq())
else if (shardStore && mSeq >= shardStore->earliestLedgerSeq())
{
if (auto l = shardStore->fetchLedger(mHash, mSeq))
{


@@ -106,7 +106,7 @@ public:
if (reason == InboundLedger::Reason::HISTORY)
{
if (inbound->getLedger()->stateMap().family().isShardBacked())
app_.getNodeStore().copyLedger(inbound->getLedger());
app_.getNodeStore().storeLedger(inbound->getLedger());
}
else if (reason == InboundLedger::Reason::SHARD)
{
@@ -120,7 +120,7 @@ public:
if (inbound->getLedger()->stateMap().family().isShardBacked())
shardStore->setStored(inbound->getLedger());
else
shardStore->copyLedger(inbound->getLedger());
shardStore->storeLedger(inbound->getLedger());
}
return inbound->getLedger();
}


@@ -1742,7 +1742,7 @@ LedgerMaster::fetchForHistory(
*hash, missing, reason);
if (!ledger &&
missing != fetch_seq_ &&
missing > app_.getNodeStore().earliestSeq())
missing > app_.getNodeStore().earliestLedgerSeq())
{
JLOG(m_journal.trace())
<< "fetchForHistory want fetch pack " << missing;
@@ -1771,7 +1771,7 @@ LedgerMaster::fetchForHistory(
mShardLedger = ledger;
}
if (!ledger->stateMap().family().isShardBacked())
app_.getShardStore()->copyLedger(ledger);
app_.getShardStore()->storeLedger(ledger);
}
else
{
@@ -1807,7 +1807,7 @@ LedgerMaster::fetchForHistory(
else
// Do not fetch ledger sequences lower
// than the earliest ledger sequence
fetchSz = app_.getNodeStore().earliestSeq();
fetchSz = app_.getNodeStore().earliestLedgerSeq();
fetchSz = missing >= fetchSz ?
std::min(ledger_fetch_size_, (missing - fetchSz) + 1) : 0;
try
@@ -1867,7 +1867,7 @@ void LedgerMaster::doAdvance (std::unique_lock<std::recursive_mutex>& sl)
std::lock_guard sll(mCompleteLock);
missing = prevMissing(mCompleteLedgers,
mPubLedger->info().seq,
app_.getNodeStore().earliestSeq());
app_.getNodeStore().earliestLedgerSeq());
}
if (missing)
{


@@ -345,9 +345,9 @@ public:
// These are Stoppable-related
std::unique_ptr <JobQueue> m_jobQueue;
std::unique_ptr <NodeStore::Database> m_nodeStore;
std::unique_ptr <NodeStore::DatabaseShard> shardStore_;
detail::AppFamily family_;
std::unique_ptr <detail::AppFamily> sFamily_;
std::unique_ptr <NodeStore::DatabaseShard> shardStore_;
std::unique_ptr <detail::AppFamily> shardFamily_;
// VFALCO TODO Make OrderBookDB abstract
OrderBookDB m_orderBookDB;
std::unique_ptr <PathRequests> m_pathRequests;
@@ -463,18 +463,18 @@ public:
m_collectorManager->group ("jobq"), m_nodeStoreScheduler,
logs_->journal("JobQueue"), *logs_, *perfLog_))
, m_nodeStore(m_shaMapStore->makeNodeStore("NodeStore.main", 4))
, m_nodeStore (m_shaMapStore->makeNodeStore ("NodeStore.main", 4))
, family_ (*this, *m_nodeStore, *m_collectorManager)
// The shard store is optional and make_ShardStore can return null.
, shardStore_(make_ShardStore(
, shardStore_ (make_ShardStore (
*this,
*m_jobQueue,
m_nodeStoreScheduler,
4,
logs_->journal("ShardStore")))
, family_ (*this, *m_nodeStore, *m_collectorManager)
, m_orderBookDB (*this, *m_jobQueue)
, m_pathRequests (std::make_unique<PathRequests> (
@@ -558,14 +558,6 @@ public:
logs_->journal("Application"), std::chrono::milliseconds (100), get_io_service())
, grpcServer_(std::make_unique<GRPCServer>(*this))
{
if (shardStore_)
{
sFamily_ = std::make_unique<detail::AppFamily>(
*this,
*shardStore_,
*m_collectorManager);
}
add (m_resourceManager.get ());
//
@@ -626,7 +618,7 @@ public:
Family* shardFamily() override
{
return sFamily_.get();
return shardFamily_.get();
}
TimeKeeper&
@@ -943,7 +935,7 @@ public:
}
bool
initNodeStoreDBs()
initNodeStore()
{
if (config_->doImport)
{
@@ -961,12 +953,12 @@ public:
JLOG(j.warn()) <<
"Starting node import from '" << source->getName() <<
"' to '" << getNodeStore().getName() << "'.";
"' to '" << m_nodeStore->getName() << "'.";
using namespace std::chrono;
auto const start = steady_clock::now();
getNodeStore().import(*source);
m_nodeStore->import(*source);
auto const elapsed = duration_cast <seconds>
(steady_clock::now() - start);
@@ -990,14 +982,6 @@ public:
family().treecache().setTargetAge(
seconds{config_->getValueFor(SizedItem::treeCacheAge)});
if (sFamily_)
{
sFamily_->treecache().setTargetSize(
config_->getValueFor(SizedItem::treeCacheSize));
sFamily_->treecache().setTargetAge(
seconds{config_->getValueFor(SizedItem::treeCacheAge)});
}
return true;
}
@@ -1252,8 +1236,8 @@ public:
// have listeners register for "onSweep ()" notification.
family().fullbelow().sweep();
if (sFamily_)
sFamily_->fullbelow().sweep();
if (shardFamily_)
shardFamily_->fullbelow().sweep();
getMasterTransaction().sweep();
getNodeStore().sweep();
if (shardStore_)
@@ -1264,8 +1248,8 @@ public:
getInboundLedgers().sweep();
m_acceptedLedgerCache.sweep();
family().treecache().sweep();
if (sFamily_)
sFamily_->treecache().sweep();
if (shardFamily_)
shardFamily_->treecache().sweep();
cachedSLEs_.expire();
// Set timer to do another sweep later.
@@ -1350,9 +1334,26 @@ bool ApplicationImp::setup()
if (!config_->standalone())
timeKeeper_->run(config_->SNTP_SERVERS);
if (!initSQLiteDBs() || !initNodeStoreDBs())
if (!initSQLiteDBs() || !initNodeStore())
return false;
if (shardStore_)
{
shardFamily_ = std::make_unique<detail::AppFamily>(
*this,
*shardStore_,
*m_collectorManager);
using namespace std::chrono;
shardFamily_->treecache().setTargetSize(
config_->getValueFor(SizedItem::treeCacheSize));
shardFamily_->treecache().setTargetAge(
seconds{config_->getValueFor(SizedItem::treeCacheAge)});
if (!shardStore_->init())
return false;
}
if (!peerReservations_->load(getWalletDB()))
{
JLOG(m_journal.fatal()) << "Cannot find peer reservations!";


@@ -27,16 +27,16 @@ namespace ripple {
////////////////////////////////////////////////////////////////////////////////
// Ledger database holds ledgers and ledger confirmations
static constexpr auto LgrDBName {"ledger.db"};
inline constexpr auto LgrDBName {"ledger.db"};
static constexpr
inline constexpr
std::array<char const*, 3> LgrDBPragma {{
"PRAGMA synchronous=NORMAL;",
"PRAGMA journal_mode=WAL;",
"PRAGMA journal_size_limit=1582080;"
}};
static constexpr
inline constexpr
std::array<char const*, 5> LgrDBInit {{
"BEGIN TRANSACTION;",
@@ -63,9 +63,9 @@ std::array<char const*, 5> LgrDBInit {{
////////////////////////////////////////////////////////////////////////////////
// Transaction database holds transactions and public keys
static constexpr auto TxDBName {"transaction.db"};
inline constexpr auto TxDBName {"transaction.db"};
static constexpr
inline constexpr
#if (ULONG_MAX > UINT_MAX) && !defined (NO_SQLITE_MMAP)
std::array<char const*, 6> TxDBPragma {{
#else
@@ -81,7 +81,7 @@ static constexpr
#endif
}};
static constexpr
inline constexpr
std::array<char const*, 8> TxDBInit {{
"BEGIN TRANSACTION;",
@@ -116,18 +116,39 @@ std::array<char const*, 8> TxDBInit {{
////////////////////////////////////////////////////////////////////////////////
// Temporary database used with an incomplete shard that is being acquired
inline constexpr auto AcquireShardDBName {"acquire.db"};
inline constexpr
std::array<char const*, 3> AcquireShardDBPragma {{
"PRAGMA synchronous=NORMAL;",
"PRAGMA journal_mode=WAL;",
"PRAGMA journal_size_limit=1582080;"
}};
inline constexpr
std::array<char const*, 1> AcquireShardDBInit {{
"CREATE TABLE IF NOT EXISTS Shard ( \
ShardIndex INTEGER PRIMARY KEY, \
LastLedgerHash CHARACTER(64), \
StoredLedgerSeqs BLOB \
);"
}};
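For illustration, a standalone sketch of writing a row to this acquire database using SOCI directly (the commit itself goes through DatabaseCon); the shard index and sequence range are made up, and StoredLedgerSeqs holds the styled RangeSet string form added to RangeSet.h in this commit.

    #include <soci/soci.h>
    #include <soci/sqlite3/soci-sqlite3.h>
    #include <string>

    int main()
    {
        soci::session db(soci::sqlite3, "acquire.db");

        // Same schema as AcquireShardDBInit above
        db << "CREATE TABLE IF NOT EXISTS Shard ("
              "ShardIndex INTEGER PRIMARY KEY, "
              "LastLedgerHash CHARACTER(64), "
              "StoredLedgerSeqs BLOB);";

        int const shardIndex = 2;                        // made-up shard index
        std::string const storedSeqs = "32769-33000";    // styled RangeSet string
        db << "INSERT OR REPLACE INTO Shard (ShardIndex, StoredLedgerSeqs) "
              "VALUES (:idx, :seqs);",
            soci::use(shardIndex, "idx"), soci::use(storedSeqs, "seqs");
    }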
////////////////////////////////////////////////////////////////////////////////
// Pragma for Ledger and Transaction databases with complete shards
static constexpr
std::array<char const*, 2> CompleteShardDBPragma {{
inline constexpr
std::array<char const*, 2> CompleteShardDBPragma{{
"PRAGMA synchronous=OFF;",
"PRAGMA journal_mode=OFF;"
}};
////////////////////////////////////////////////////////////////////////////////
static constexpr auto WalletDBName {"wallet.db"};
inline constexpr auto WalletDBName {"wallet.db"};
static constexpr
inline constexpr
std::array<char const*, 6> WalletDBInit {{
"BEGIN TRANSACTION;",


@@ -142,7 +142,7 @@ void printHelp (const po::options_description& desc)
" connect <ip> [<port>]\n"
" consensus_info\n"
" deposit_authorized <source_account> <destination_account> [<ledger>]\n"
" download_shard [[<index> <url>]] <validate>\n"
" download_shard [[<index> <url>]]\n"
" feature [<feature> [accept|reject]]\n"
" fetch_info [clear]\n"
" gateway_balances [<ledger>] <issuer_account> [ <hotwallet> [ <hotwallet> ]]\n"


@@ -449,7 +449,7 @@ SHAMapStoreImp::run()
std::string nextArchiveDir =
dbRotating_->getWritableBackend()->getName();
lastRotated = validatedSeq;
std::unique_ptr<NodeStore::Backend> oldBackend;
std::shared_ptr<NodeStore::Backend> oldBackend;
{
std::lock_guard lock (dbRotating_->peekMutex());
@@ -457,7 +457,8 @@ SHAMapStoreImp::run()
nextArchiveDir, lastRotated});
clearCaches (validatedSeq);
oldBackend = dbRotating_->rotateBackends(
std::move(newBackend));
std::move(newBackend),
lock);
}
JLOG(journal_.warn()) << "finished rotation " << validatedSeq;


@@ -20,11 +20,14 @@
#ifndef RIPPLE_BASICS_RANGESET_H_INCLUDED
#define RIPPLE_BASICS_RANGESET_H_INCLUDED
#include <string>
#include <boost/optional.hpp>
#include <ripple/beast/core/LexicalCast.h>
#include <boost/algorithm/string.hpp>
#include <boost/icl/closed_interval.hpp>
#include <boost/icl/interval_set.hpp>
#include <boost/serialization/split_free.hpp>
#include <boost/optional.hpp>
#include <string>
namespace ripple
{
@@ -86,8 +89,8 @@ std::string to_string(ClosedInterval<T> const & ci)
/** Convert the given RangeSet to a styled string.
The styled string represention is the set of disjoint intervals joined by
commas. The string "empty" is returned if the set is empty.
The styled string representation is the set of disjoint intervals joined
by commas. The string "empty" is returned if the set is empty.
@param rs The rangeset to convert
@return The styled string
@@ -109,6 +112,67 @@ std::string to_string(RangeSet<T> const & rs)
return res;
}
/** Convert the given styled string to a RangeSet.
The styled string representation is the set
of disjoint intervals joined by commas.
@param rs The set to be populated
@param s The styled string to convert
@return True on successfully converting styled string
*/
template <class T>
bool
from_string(RangeSet<T>& rs, std::string const& s)
{
std::vector<std::string> intervals;
std::vector<std::string> tokens;
bool result {true};
boost::split(tokens, s, boost::algorithm::is_any_of(","));
for (auto const& t : tokens)
{
boost::split(intervals, t, boost::algorithm::is_any_of("-"));
switch (intervals.size())
{
case 1:
{
T front;
if (!beast::lexicalCastChecked(front, intervals.front()))
result = false;
else
rs.insert(front);
break;
}
case 2:
{
T front;
if (!beast::lexicalCastChecked(front, intervals.front()))
result = false;
else
{
T back;
if (!beast::lexicalCastChecked(back, intervals.back()))
result = false;
else
rs.insert(range(front, back));
}
break;
}
default:
result = false;
}
if (!result)
break;
intervals.clear();
}
if (!result)
rs.clear();
return result;
}
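A brief usage sketch of the helper above (arbitrary values, assumes <cassert>):

    RangeSet<std::uint32_t> rs;
    if (from_string(rs, "1-2,4-6"))
    {
        assert(boost::icl::contains(rs, 5));    // inside the 4-6 interval
        assert(!boost::icl::contains(rs, 3));   // the gap between intervals
    }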
/** Find the largest value not in the set that is less than a given value.
@param rs The set of interest
@@ -129,75 +193,8 @@ prevMissing(RangeSet<T> const & rs, T t, T minVal = 0)
return boost::none;
return boost::icl::last(tgt);
}
} // namespace ripple
// The boost serialization documents recommended putting free-function helpers
// in the boost serialization namespace
namespace boost {
namespace serialization {
template <class Archive, class T>
void
save(Archive& ar,
ripple::ClosedInterval<T> const& ci,
const unsigned int version)
{
auto l = ci.lower();
auto u = ci.upper();
ar << l << u;
}
template <class Archive, class T>
void
load(Archive& ar, ripple::ClosedInterval<T>& ci, const unsigned int version)
{
T low, up;
ar >> low >> up;
ci = ripple::ClosedInterval<T>{low, up};
}
template <class Archive, class T>
void
serialize(Archive& ar,
ripple::ClosedInterval<T>& ci,
const unsigned int version)
{
split_free(ar, ci, version);
}
template <class Archive, class T>
void
save(Archive& ar, ripple::RangeSet<T> const& rs, const unsigned int version)
{
auto s = rs.iterative_size();
ar << s;
for (auto const& r : rs)
ar << r;
}
template <class Archive, class T>
void
load(Archive& ar, ripple::RangeSet<T>& rs, const unsigned int version)
{
rs.clear();
std::size_t intervals;
ar >> intervals;
for (std::size_t i = 0; i < intervals; ++i)
{
ripple::ClosedInterval<T> ci;
ar >> ci;
rs.insert(ci);
}
}
template <class Archive, class T>
void
serialize(Archive& ar, ripple::RangeSet<T>& rs, const unsigned int version)
{
split_free(ar, rs, version);
}
} // serialization
} // boost
#endif


@@ -31,9 +31,6 @@
namespace ripple {
// VFALCO NOTE Deprecated
struct TaggedCacheLog;
/** Map/cache combination.
This class implements a cache and a map. The cache keeps objects alive
in the map. The map allows multiple code paths that reference objects


@@ -186,13 +186,13 @@ class RootStoppable;
|
JobQueue
|
+-----------+-----------+-----------+-----------+----+--------+
| | | | | |
| NetworkOPs | InboundLedgers | OrderbookDB
| | |
Overlay InboundTransactions LedgerMaster
| |
PeerFinder LedgerCleaner
+--------+-----------+-----------+-----------+-------+---+----------+
| | | | | | |
| NetworkOPs | InboundLedgers | OrderbookDB |
| | | |
Overlay InboundTransactions LedgerMaster Database
| | |
PeerFinder LedgerCleaner TaskQueue
@endcode
*/


@@ -31,7 +31,7 @@ JobQueue::JobQueue (beast::insight::Collector::ptr const& collector,
, m_lastJob (0)
, m_invalidJobData (JobTypes::instance().getInvalid (), collector, logs)
, m_processCount (0)
, m_workers (*this, perfLog, "JobQueue", 0)
, m_workers (*this, &perfLog, "JobQueue", 0)
, m_cancelCallback (std::bind (&Stoppable::isStopping, this))
, perfLog_ (perfLog)
, m_collector (collector)


@@ -26,7 +26,7 @@ namespace ripple {
Workers::Workers (
Callback& callback,
perf::PerfLog& perfLog,
perf::PerfLog* perfLog,
std::string const& threadNames,
int numberOfThreads)
: m_callback (callback)
@@ -63,7 +63,8 @@ void Workers::setNumberOfThreads (int numberOfThreads)
static int instance {0};
if (m_numberOfThreads != numberOfThreads)
{
perfLog_.resizeJobs(numberOfThreads);
if (perfLog_)
perfLog_->resizeJobs(numberOfThreads);
if (numberOfThreads > m_numberOfThreads)
{


@@ -69,7 +69,7 @@ public:
@param threadNames The name given to each created worker thread.
*/
explicit Workers (Callback& callback,
perf::PerfLog& perfLog,
perf::PerfLog* perfLog,
std::string const& threadNames = "Worker",
int numberOfThreads =
static_cast<int>(std::thread::hardware_concurrency()));
@@ -166,7 +166,7 @@ private:
private:
Callback& m_callback;
perf::PerfLog& perfLog_;
perf::PerfLog* perfLog_;
std::string m_threadNames; // The name to give each thread
std::condition_variable m_cv; // signaled when all threads paused
std::mutex m_mut;


@@ -186,7 +186,6 @@ private:
++i;
else if (!boost::iequals(jvParams[--sz].asString(), "novalidate"))
return rpcError(rpcINVALID_PARAMS);
jvResult[jss::validate] = false;
}
// Create the 'shards' array


@@ -149,7 +149,7 @@ public:
*/
virtual
bool
copyLedger(std::shared_ptr<Ledger const> const& ledger) = 0;
storeLedger(std::shared_ptr<Ledger const> const& srcLedger) = 0;
/** Wait for all currently pending async reads to complete.
*/
@@ -211,12 +211,15 @@ public:
void
onStop() override;
void
onChildrenStopped() override;
/** @return The earliest ledger sequence allowed
*/
std::uint32_t
earliestSeq() const
earliestLedgerSeq() const
{
return earliestSeq_;
return earliestLedgerSeq_;
}
protected:
@@ -234,14 +237,17 @@ protected:
storeSz_ += sz;
}
// Called by the public asyncFetch function
void
asyncFetch(uint256 const& hash, std::uint32_t seq,
std::shared_ptr<TaggedCache<uint256, NodeObject>> const& pCache,
std::shared_ptr<KeyCache<uint256>> const& nCache);
// Called by the public fetch function
std::shared_ptr<NodeObject>
fetchInternal(uint256 const& hash, Backend& srcBackend);
fetchInternal(uint256 const& hash, std::shared_ptr<Backend> backend);
// Called by the public import function
void
importInternal(Backend& dstBackend, Database& srcDB);
@@ -250,11 +256,14 @@ protected:
TaggedCache<uint256, NodeObject>& pCache,
KeyCache<uint256>& nCache, bool isAsync);
// Called by the public storeLedger function
bool
copyLedger(Backend& dstBackend, Ledger const& srcLedger,
std::shared_ptr<TaggedCache<uint256, NodeObject>> const& pCache,
std::shared_ptr<KeyCache<uint256>> const& nCache,
std::shared_ptr<Ledger const> const& srcNext);
storeLedger(
Ledger const& srcLedger,
std::shared_ptr<Backend> dstBackend,
std::shared_ptr<TaggedCache<uint256, NodeObject>> dstPCache,
std::shared_ptr<KeyCache<uint256>> dstNCache,
std::shared_ptr<Ledger const> next);
private:
std::atomic<std::uint32_t> storeCount_ {0};
@@ -283,7 +292,7 @@ private:
// The default is 32570 to match the XRP ledger network's earliest
// allowed sequence. Alternate networks may set this value.
std::uint32_t const earliestSeq_;
std::uint32_t const earliestLedgerSeq_;
virtual
std::shared_ptr<NodeObject>


@@ -50,12 +50,14 @@ public:
virtual std::mutex& peekMutex() const = 0;
virtual
std::unique_ptr<Backend> const&
std::shared_ptr<Backend> const&
getWritableBackend() const = 0;
virtual
std::unique_ptr<Backend>
rotateBackends(std::unique_ptr<Backend> newBackend) = 0;
std::shared_ptr<Backend>
rotateBackends(
std::shared_ptr<Backend> newBackend,
std::lock_guard<std::mutex> const&) = 0;
};
}


@@ -109,14 +109,14 @@ public:
@param shardIndex Shard index to import
@param srcDir The directory to import from
@param validate If true validate shard ledger data
@return true If the shard was successfully imported
@implNote if successful, srcDir is moved to the database directory
*/
virtual
bool
importShard(std::uint32_t shardIndex,
boost::filesystem::path const& srcDir, bool validate) = 0;
importShard(
std::uint32_t shardIndex,
boost::filesystem::path const& srcDir) = 0;
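A hypothetical call site for the narrowed interface, given an Application& app (the index and path are made up); with the validate flag gone, callers pass only the shard index and its source directory.

    if (auto shardStore = app.getShardStore())
    {
        boost::filesystem::path const srcDir {"/var/lib/rippled/download/20"};  // made-up path
        shardStore->importShard(20, srcDir);
    }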
/** Fetch a ledger from the shard store
@@ -137,15 +137,6 @@ public:
void
setStored(std::shared_ptr<Ledger const> const& ledger) = 0;
/** Query if a ledger with the given sequence is stored
@param seq The ledger sequence to check if stored
@return `true` if the ledger is stored
*/
virtual
bool
contains(std::uint32_t seq) = 0;
/** Query which complete shards are stored
@return the indexes of complete shards


@@ -95,9 +95,9 @@ public:
Blob const& getData () const;
private:
NodeObjectType mType;
uint256 mHash;
Blob mData;
NodeObjectType const mType;
uint256 const mHash;
Blob const mData;
};
}


@@ -36,12 +36,12 @@ Database::Database(
: Stoppable(name, parent.getRoot())
, j_(journal)
, scheduler_(scheduler)
, earliestSeq_(get<std::uint32_t>(
, earliestLedgerSeq_(get<std::uint32_t>(
config,
"earliest_seq",
XRP_LEDGER_EARLIEST_SEQ))
{
if (earliestSeq_ < 1)
if (earliestLedgerSeq_ < 1)
Throw<std::runtime_error>("Invalid earliest_seq");
while (readThreads-- > 0)
@@ -83,6 +83,11 @@ Database::onStop()
// After stop time we can no longer use the JobQueue for background
// reads. Join the background read threads.
stopThreads();
}
void
Database::onChildrenStopped()
{
stopped();
}
@@ -115,13 +120,13 @@ Database::asyncFetch(uint256 const& hash, std::uint32_t seq,
}
std::shared_ptr<NodeObject>
Database::fetchInternal(uint256 const& hash, Backend& srcBackend)
Database::fetchInternal(uint256 const& hash, std::shared_ptr<Backend> backend)
{
std::shared_ptr<NodeObject> nObj;
Status status;
try
{
status = srcBackend.fetch(hash.begin(), &nObj);
status = backend->fetch(hash.begin(), &nObj);
}
catch (std::exception const& e)
{
@@ -226,12 +231,14 @@ Database::doFetch(uint256 const& hash, std::uint32_t seq,
}
bool
Database::copyLedger(Backend& dstBackend, Ledger const& srcLedger,
std::shared_ptr<TaggedCache<uint256, NodeObject>> const& pCache,
std::shared_ptr<KeyCache<uint256>> const& nCache,
std::shared_ptr<Ledger const> const& srcNext)
Database::storeLedger(
Ledger const& srcLedger,
std::shared_ptr<Backend> dstBackend,
std::shared_ptr<TaggedCache<uint256, NodeObject>> dstPCache,
std::shared_ptr<KeyCache<uint256>> dstNCache,
std::shared_ptr<Ledger const> next)
{
assert(static_cast<bool>(pCache) == static_cast<bool>(nCache));
assert(static_cast<bool>(dstPCache) == static_cast<bool>(dstNCache));
if (srcLedger.info().hash.isZero() ||
srcLedger.info().accountHash.isZero())
{
@@ -254,48 +261,42 @@ Database::copyLedger(Backend& dstBackend, Ledger const& srcLedger,
Batch batch;
batch.reserve(batchWritePreallocationSize);
auto storeBatch = [&]() {
#if RIPPLE_VERIFY_NODEOBJECT_KEYS
for (auto& nObj : batch)
if (dstPCache && dstNCache)
{
assert(nObj->getHash() ==
sha512Hash(makeSlice(nObj->getData())));
if (pCache && nCache)
for (auto& nObj : batch)
{
pCache->canonicalize(nObj->getHash(), nObj, true);
nCache->erase(nObj->getHash());
dstPCache->canonicalize(nObj->getHash(), nObj, true);
dstNCache->erase(nObj->getHash());
storeStats(nObj->getData().size());
}
}
#else
if (pCache && nCache)
for (auto& nObj : batch)
{
pCache->canonicalize(nObj->getHash(), nObj, true);
nCache->erase(nObj->getHash());
storeStats(nObj->getData().size());
}
#endif
dstBackend.storeBatch(batch);
dstBackend->storeBatch(batch);
batch.clear();
batch.reserve(batchWritePreallocationSize);
};
bool error = false;
auto f = [&](SHAMapAbstractNode& node) {
auto visit = [&](SHAMapAbstractNode& node)
{
if (auto nObj = srcDB.fetch(
node.getNodeHash().as_uint256(), srcLedger.info().seq))
{
batch.emplace_back(std::move(nObj));
if (batch.size() >= batchWritePreallocationSize)
storeBatch();
if (batch.size() < batchWritePreallocationSize)
return true;
storeBatch();
if (!isStopping())
return true;
}
else
error = true;
return !error;
error = true;
return false;
};
// Store ledger header
{
Serializer s(1024);
Serializer s(sizeof(std::uint32_t) + sizeof(LedgerInfo));
s.add32(HashPrefix::ledgerMaster);
addRaw(srcLedger.info(), s);
auto nObj = NodeObject::createObject(hotLEDGER,
@@ -313,14 +314,14 @@ Database::copyLedger(Backend& dstBackend, Ledger const& srcLedger,
" state map invalid";
return false;
}
if (srcNext && srcNext->info().parentHash == srcLedger.info().hash)
if (next && next->info().parentHash == srcLedger.info().hash)
{
auto have = srcNext->stateMap().snapShot(false);
auto have = next->stateMap().snapShot(false);
srcLedger.stateMap().snapShot(
false)->visitDifferences(&(*have), f);
false)->visitDifferences(&(*have), visit);
}
else
srcLedger.stateMap().snapShot(false)->visitNodes(f);
srcLedger.stateMap().snapShot(false)->visitNodes(visit);
if (error)
return false;
}
@@ -335,7 +336,7 @@ Database::copyLedger(Backend& dstBackend, Ledger const& srcLedger,
" transaction map invalid";
return false;
}
srcLedger.txMap().snapShot(false)->visitNodes(f);
srcLedger.txMap().snapShot(false)->visitNodes(visit);
if (error)
return false;
}


@@ -28,9 +28,6 @@ void
DatabaseNodeImp::store(NodeObjectType type, Blob&& data,
uint256 const& hash, std::uint32_t seq)
{
#if RIPPLE_VERIFY_NODEOBJECT_KEYS
assert(hash == sha512Hash(makeSlice(data)));
#endif
auto nObj = NodeObject::createObject(type, std::move(data), hash);
pCache_->canonicalize(hash, nObj, true);
backend_->store(nObj);


@@ -38,7 +38,7 @@ public:
Scheduler& scheduler,
int readThreads,
Stoppable& parent,
std::unique_ptr<Backend> backend,
std::shared_ptr<Backend> backend,
Section const& config,
beast::Journal j)
: Database(name, parent, scheduler, readThreads, config, j)
@@ -91,10 +91,10 @@ public:
std::shared_ptr<NodeObject>& object) override;
bool
copyLedger(std::shared_ptr<Ledger const> const& ledger) override
storeLedger(std::shared_ptr<Ledger const> const& srcLedger) override
{
return Database::copyLedger(
*backend_, *ledger, pCache_, nCache_, nullptr);
return Database::storeLedger(
*srcLedger, backend_, pCache_, nCache_, nullptr);
}
int
@@ -123,12 +123,12 @@ private:
std::shared_ptr<KeyCache<uint256>> nCache_;
// Persistent key/value storage
std::unique_ptr<Backend> backend_;
std::shared_ptr<Backend> backend_;
std::shared_ptr<NodeObject>
fetchFrom(uint256 const& hash, std::uint32_t seq) override
{
return fetchInternal(hash, *backend_);
return fetchInternal(hash, backend_);
}
void


@@ -29,8 +29,8 @@ DatabaseRotatingImp::DatabaseRotatingImp(
Scheduler& scheduler,
int readThreads,
Stoppable& parent,
std::unique_ptr<Backend> writableBackend,
std::unique_ptr<Backend> archiveBackend,
std::shared_ptr<Backend> writableBackend,
std::shared_ptr<Backend> archiveBackend,
Section const& config,
beast::Journal j)
: DatabaseRotating(name, parent, scheduler, readThreads, config, j)
@@ -48,10 +48,10 @@ DatabaseRotatingImp::DatabaseRotatingImp(
setParent(parent);
}
// Make sure to call it already locked!
std::unique_ptr<Backend>
std::shared_ptr<Backend>
DatabaseRotatingImp::rotateBackends(
std::unique_ptr<Backend> newBackend)
std::shared_ptr<Backend> newBackend,
std::lock_guard<std::mutex> const&)
{
auto oldBackend {std::move(archiveBackend_)};
archiveBackend_ = std::move(writableBackend_);
@@ -63,9 +63,6 @@ void
DatabaseRotatingImp::store(NodeObjectType type, Blob&& data,
uint256 const& hash, std::uint32_t seq)
{
#if RIPPLE_VERIFY_NODEOBJECT_KEYS
assert(hash == sha512Hash(makeSlice(data)));
#endif
auto nObj = NodeObject::createObject(type, std::move(data), hash);
pCache_->canonicalize(hash, nObj, true);
getWritableBackend()->store(nObj);
@@ -106,10 +103,10 @@ std::shared_ptr<NodeObject>
DatabaseRotatingImp::fetchFrom(uint256 const& hash, std::uint32_t seq)
{
Backends b = getBackends();
auto nObj = fetchInternal(hash, *b.writableBackend);
auto nObj = fetchInternal(hash, b.writableBackend);
if (! nObj)
{
nObj = fetchInternal(hash, *b.archiveBackend);
nObj = fetchInternal(hash, b.archiveBackend);
if (nObj)
{
getWritableBackend()->store(nObj);


@@ -37,8 +37,8 @@ public:
Scheduler& scheduler,
int readThreads,
Stoppable& parent,
std::unique_ptr<Backend> writableBackend,
std::unique_ptr<Backend> archiveBackend,
std::shared_ptr<Backend> writableBackend,
std::shared_ptr<Backend> archiveBackend,
Section const& config,
beast::Journal j);
@@ -48,15 +48,17 @@ public:
stopThreads();
}
std::unique_ptr<Backend> const&
std::shared_ptr<Backend> const&
getWritableBackend() const override
{
std::lock_guard lock (rotateMutex_);
return writableBackend_;
}
std::unique_ptr<Backend>
rotateBackends(std::unique_ptr<Backend> newBackend) override;
std::shared_ptr<Backend>
rotateBackends(
std::shared_ptr<Backend> newBackend,
std::lock_guard<std::mutex> const&) override;
std::mutex& peekMutex() const override
{
@@ -92,10 +94,10 @@ public:
std::shared_ptr<NodeObject>& object) override;
bool
copyLedger(std::shared_ptr<Ledger const> const& ledger) override
storeLedger(std::shared_ptr<Ledger const> const& srcLedger) override
{
return Database::copyLedger(
*getWritableBackend(), *ledger, pCache_, nCache_, nullptr);
return Database::storeLedger(
*srcLedger, getWritableBackend(), pCache_, nCache_, nullptr);
}
int
@@ -126,13 +128,13 @@ private:
// Negative cache
std::shared_ptr<KeyCache<uint256>> nCache_;
std::unique_ptr<Backend> writableBackend_;
std::unique_ptr<Backend> archiveBackend_;
std::shared_ptr<Backend> writableBackend_;
std::shared_ptr<Backend> archiveBackend_;
mutable std::mutex rotateMutex_;
struct Backends {
std::unique_ptr<Backend> const& writableBackend;
std::unique_ptr<Backend> const& archiveBackend;
std::shared_ptr<Backend> const& writableBackend;
std::shared_ptr<Backend> const& archiveBackend;
};
Backends getBackends() const

File diff suppressed because it is too large.


@@ -22,6 +22,7 @@
#include <ripple/nodestore/DatabaseShard.h>
#include <ripple/nodestore/impl/Shard.h>
#include <ripple/nodestore/impl/TaskQueue.h>
namespace ripple {
namespace NodeStore {
@@ -61,8 +62,9 @@ public:
getPreShards() override;
bool
importShard(std::uint32_t shardIndex,
boost::filesystem::path const& srcDir, bool validate) override;
importShard(
std::uint32_t shardIndex,
boost::filesystem::path const& srcDir) override;
std::shared_ptr<Ledger>
fetchLedger(uint256 const& hash, std::uint32_t seq) override;
@@ -70,9 +72,6 @@ public:
void
setStored(std::shared_ptr<Ledger const> const& ledger) override;
bool
contains(std::uint32_t seq) override;
std::string
getCompleteShards() override;
@@ -94,7 +93,7 @@ public:
std::uint32_t
seqToShardIndex(std::uint32_t seq) const override
{
assert(seq >= earliestSeq());
assert(seq >= earliestLedgerSeq());
return NodeStore::seqToShardIndex(seq, ledgersPerShard_);
}
@@ -103,7 +102,7 @@ public:
{
assert(shardIndex >= earliestShardIndex_);
if (shardIndex <= earliestShardIndex_)
return earliestSeq();
return earliestLedgerSeq();
return 1 + (shardIndex * ledgersPerShard_);
}
@@ -126,6 +125,9 @@ public:
return backendName_;
}
void
onStop() override;
/** Import the application local node store
@param source The application node store.
@@ -137,18 +139,23 @@ public:
getWriteLoad() const override;
void
store(NodeObjectType type, Blob&& data,
uint256 const& hash, std::uint32_t seq) override;
store(
NodeObjectType type,
Blob&& data,
uint256 const& hash,
std::uint32_t seq) override;
std::shared_ptr<NodeObject>
fetch(uint256 const& hash, std::uint32_t seq) override;
bool
asyncFetch(uint256 const& hash, std::uint32_t seq,
asyncFetch(
uint256 const& hash,
std::uint32_t seq,
std::shared_ptr<NodeObject>& object) override;
bool
copyLedger(std::shared_ptr<Ledger const> const& ledger) override;
storeLedger(std::shared_ptr<Ledger const> const& srcLedger) override;
int
getDesiredAsyncReadCount(std::uint32_t seq) override;
@@ -163,21 +170,43 @@ public:
sweep() override;
private:
struct ShardInfo
{
enum class State
{
none,
final, // Immutable, complete and validated
acquire, // Being acquired
import, // Being imported
finalize // Being finalized
};
ShardInfo() = default;
ShardInfo(std::shared_ptr<Shard> shard_, State state_)
: shard(std::move(shard_))
, state(state_)
{}
std::shared_ptr<Shard> shard;
State state {State::none};
};
Application& app_;
mutable std::mutex m_;
Stoppable& parent_;
mutable std::mutex mutex_;
bool init_ {false};
// The context shared with all shard backend databases
std::unique_ptr<nudb::context> ctx_;
// Complete shards
std::map<std::uint32_t, std::shared_ptr<Shard>> complete_;
// Queue of background tasks to be performed
std::unique_ptr<TaskQueue> taskQueue_;
// A shard being acquired from the peer network
std::unique_ptr<Shard> incomplete_;
// Shards held by this server
std::map<std::uint32_t, ShardInfo> shards_;
// Shards prepared for import
std::map<std::uint32_t, Shard*> preShards_;
// Shard index being acquired from the peer network
std::uint32_t acquireIndex_ {0};
// The shard store root directory
boost::filesystem::path dir_;
@@ -188,9 +217,6 @@ private:
// Complete shard indexes
std::string status_;
// If backend type uses permanent storage
bool backed_;
// The name associated with the backend used with the shard store
std::string backendName_;
@@ -214,6 +240,11 @@ private:
// File name used to mark shards being imported from node store
static constexpr auto importMarker_ = "import";
// Initialize settings from the configuration file
// Lock must be held
bool
initConfig(std::lock_guard<std::mutex>&);
std::shared_ptr<NodeObject>
fetchFrom(uint256 const& hash, std::uint32_t seq) override;
@@ -223,17 +254,25 @@ private:
Throw<std::runtime_error>("Shard store import not supported");
}
// Finds a random shard index that is not stored
// Randomly select a shard index not stored
// Lock must be held
boost::optional<std::uint32_t>
findShardIndexToAdd(
findAcquireIndex(
std::uint32_t validLedgerSeq,
std::lock_guard<std::mutex>&);
// Set storage and file descriptor usage stats
// Queue a task to finalize a shard by validating its databases
// Lock must be held
void
setFileStats(std::lock_guard<std::mutex>&);
finalizeShard(
ShardInfo& shardInfo,
bool writeSQLite,
std::lock_guard<std::mutex>&);
// Set storage and file descriptor usage stats
// Lock must NOT be held
void
setFileStats();
// Update status string
// Lock must be held
@@ -241,11 +280,16 @@ private:
updateStatus(std::lock_guard<std::mutex>&);
std::pair<std::shared_ptr<PCache>, std::shared_ptr<NCache>>
selectCache(std::uint32_t seq);
getCache(std::uint32_t seq);
// Returns available storage space
std::uint64_t
available() const;
bool
storeLedgerInShard(
std::shared_ptr<Shard>& shard,
std::shared_ptr<Ledger const> const& ledger);
};
} // NodeStore
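The "Lock must be held" helpers above use a common idiom: private functions take a std::lock_guard reference so they can only be called from code that already owns the mutex, while public entry points create the guard. A standalone sketch of that idiom (names are illustrative, not from the commit):

    #include <mutex>

    class Example
    {
    public:
        void
        publicOp()
        {
            std::lock_guard<std::mutex> lock(mutex_);
            helper(lock);   // callable only with proof that the lock is held
        }

    private:
        mutable std::mutex mutex_;

        // Lock must be held
        void
        helper(std::lock_guard<std::mutex> const&)
        {
            // work that relies on mutex_ being held
        }
    };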


@@ -31,8 +31,8 @@ NodeObject::NodeObject (
PrivateAccess)
: mType (type)
, mHash (hash)
, mData (std::move(data))
{
mData = std::move (data);
}
std::shared_ptr<NodeObject>

File diff suppressed because it is too large.


@@ -30,29 +30,12 @@
#include <boost/filesystem.hpp>
#include <nudb/nudb.hpp>
#include <atomic>
#include <tuple>
namespace ripple {
namespace NodeStore {
// Removes a path in its entirety
inline static
bool
removeAll(
boost::filesystem::path const& path,
beast::Journal const& j)
{
try
{
boost::filesystem::remove_all(path);
}
catch (std::exception const& e)
{
JLOG(j.error()) <<
"exception: " << e.what();
return false;
}
return true;
}
using PCache = TaggedCache<uint256, NodeObject>;
using NCache = KeyCache<uint256>;
class DatabaseShard;
@@ -65,7 +48,7 @@ class DatabaseShard;
Public functions can be called concurrently from any thread.
*/
class Shard
class Shard final
{
public:
Shard(
@@ -77,29 +60,37 @@ public:
bool
open(Scheduler& scheduler, nudb::context& ctx);
bool
setStored(std::shared_ptr<Ledger const> const& ledger);
boost::optional<std::uint32_t>
prepare();
bool
contains(std::uint32_t seq) const;
store(std::shared_ptr<Ledger const> const& ledger);
bool
containsLedger(std::uint32_t seq) const;
void
sweep();
std::uint32_t
index() const
{
return index_;
}
index() const {return index_;}
std::shared_ptr<Backend> const&
boost::filesystem::path const&
getDir() const {return dir_;}
std::tuple<
std::shared_ptr<Backend>,
std::shared_ptr<PCache>,
std::shared_ptr<NCache>>
getBackendAll() const;
std::shared_ptr<Backend>
getBackend() const;
/** Returns `true` if all shard ledgers have been stored in the backend
*/
bool
complete() const;
isBackendComplete() const;
std::shared_ptr<PCache>
pCache() const;
@@ -107,36 +98,65 @@ public:
std::shared_ptr<NCache>
nCache() const;
std::uint64_t
fileSize() const;
std::uint32_t
fdRequired() const;
std::shared_ptr<Ledger const>
lastStored() const;
/** Returns a pair where the first item describes the storage space
utilized and the second item is the number of file descriptors required.
*/
std::pair<std::uint64_t, std::uint32_t>
fileInfo() const;
/** Returns `true` if the shard is complete, validated, and immutable.
*/
bool
validate() const;
isFinal() const;
/** Returns `true` if the shard is older, without final key data
*/
bool
isLegacy() const;
/** Finalize shard by walking its ledgers and verifying each Merkle tree.
@param writeSQLite If true, SQLite entries will be rewritten using
verified backend data.
*/
bool
finalize(const bool writeSQLite);
void
stop() {stop_ = true;}
// Current shard version
static constexpr std::uint32_t version {2};
// The finalKey is a hard coded value of zero. It is used to store
// finalizing shard data to the backend. The data contains a version,
// last ledger's hash, and the first and last ledger sequences.
static uint256 const finalKey;
private:
static constexpr auto controlFileName = "control.txt";
struct AcquireInfo
{
// SQLite database to track information about what has been acquired
std::unique_ptr<DatabaseCon> SQLiteDB;
// Tracks the sequences of ledgers acquired and stored in the backend
RangeSet<std::uint32_t> storedSeqs;
};
Application& app_;
mutable std::mutex mutex_;
mutable std::recursive_mutex mutex_;
// Shard Index
std::uint32_t const index_;
// First ledger sequence in this shard
// First ledger sequence in the shard
std::uint32_t const firstSeq_;
// Last ledger sequence in this shard
// Last ledger sequence in the shard
std::uint32_t const lastSeq_;
// The maximum number of ledgers this shard can store
// The earliest shard may store less ledgers than
// subsequent shards
// The maximum number of ledgers the shard can store
// The earliest shard may store fewer ledgers than subsequent shards
std::uint32_t const maxLedgers_;
// Database positive cache
@@ -148,14 +168,11 @@ private:
// Path to database files
boost::filesystem::path const dir_;
// Path to control file
boost::filesystem::path const control_;
// Storage space utilized by the shard
std::uint64_t fileSz_;
std::uint64_t fileSz_ {0};
// Number of file descriptors required by the shard
std::uint32_t fdRequired_;
std::uint32_t fdRequired_ {0};
// NuDB key/value store for node objects
std::shared_ptr<Backend> backend_;
@@ -166,58 +183,54 @@ private:
// Transaction SQLite database used for indexes
std::unique_ptr<DatabaseCon> txSQLiteDB_;
// Tracking information used only when acquiring a shard from the network.
// If the shard is complete, this member will be null.
std::unique_ptr<AcquireInfo> acquireInfo_;
beast::Journal const j_;
// True if shard has its entire ledger range stored
bool complete_ {false};
// True if backend has stored all ledgers pertaining to the shard
bool backendComplete_ {false};
// Sequences of ledgers stored with an incomplete shard
RangeSet<std::uint32_t> storedSeqs_;
// Older shard without an acquire database or final key
// Eventually there will be no need for this and should be removed
bool legacy_ {false};
// Used as an optimization for visitDifferences
std::shared_ptr<Ledger const> lastStored_;
// True if the backend has a final key stored
bool final_ {false};
// Marks shard immutable
// Lock over mutex_ required
bool
setComplete(std::lock_guard<std::mutex> const& lock);
// Determines if the shard needs to stop processing for shutdown
std::atomic<bool> stop_ {false};
// Set the backend cache
// Lock over mutex_ required
void
setCache(std::lock_guard<std::mutex> const& lock);
setBackendCache(std::lock_guard<std::recursive_mutex> const& lock);
// Open/Create SQLite databases
// Lock over mutex_ required
bool
initSQLite(std::lock_guard<std::mutex> const& lock);
initSQLite(std::lock_guard<std::recursive_mutex> const& lock);
// Write SQLite entries for a ledger stored in this shard's backend
// Write SQLite entries for this ledger
// Lock over mutex_ required
bool
setSQLiteStored(
storeSQLite(
std::shared_ptr<Ledger const> const& ledger,
std::lock_guard<std::mutex> const& lock);
std::lock_guard<std::recursive_mutex> const& lock);
// Set storage and file descriptor usage stats
// Lock over mutex_ required
bool
setFileStats(std::lock_guard<std::mutex> const& lock);
void
setFileStats(std::lock_guard<std::recursive_mutex> const& lock);
// Save the control file for an incomplete shard
// Lock over mutex_ required
bool
saveControl(std::lock_guard<std::mutex> const& lock);
// Validate this ledger by walking its SHAMaps
// and verifying each merkle tree
// Validate this ledger by walking its SHAMaps and verifying Merkle trees
bool
valLedger(
std::shared_ptr<Ledger const> const& ledger,
std::shared_ptr<Ledger const> const& next) const;
// Fetches from the backend and will log
// errors based on status codes
// Fetches from backend and log errors based on status codes
std::shared_ptr<NodeObject>
valFetch(uint256 const& hash) const;
};
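A standalone sketch of a payload like the one the finalKey comment above describes (version, ledger range, and last ledger hash); the field order and big-endian encoding here are assumptions for illustration, not the commit's actual format.

    #include <array>
    #include <cstdint>
    #include <vector>

    std::vector<std::uint8_t>
    makeFinalKeyPayload(
        std::uint32_t version,
        std::uint32_t firstSeq,
        std::uint32_t lastSeq,
        std::array<std::uint8_t, 32> const& lastLedgerHash)
    {
        std::vector<std::uint8_t> out;
        auto const add32 = [&out](std::uint32_t v)
        {
            for (int i = 3; i >= 0; --i)
                out.push_back(static_cast<std::uint8_t>(v >> (8 * i)));
        };
        add32(version);
        add32(firstSeq);
        add32(lastSeq);
        out.insert(out.end(), lastLedgerHash.begin(), lastLedgerHash.end());
        return out;
    }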


@@ -0,0 +1,66 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2019 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/nodestore/impl/TaskQueue.h>
#include <cassert>
namespace ripple {
namespace NodeStore {
TaskQueue::TaskQueue(Stoppable& parent)
: Stoppable("TaskQueue", parent)
, workers_(*this, nullptr, "Shard store taskQueue", 1)
{
}
void
TaskQueue::onStop()
{
workers_.pauseAllThreadsAndWait();
stopped();
}
void
TaskQueue::addTask(std::function<void()> task)
{
std::lock_guard lock {mutex_};
tasks_.emplace(std::move(task));
workers_.addTask();
}
void
TaskQueue::processTask(int instance)
{
std::function<void()> task;
{
std::lock_guard lock {mutex_};
assert(!tasks_.empty());
task = std::move(tasks_.front());
tasks_.pop();
}
task();
}
} // NodeStore
} // ripple


@@ -0,0 +1,62 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2019 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_NODESTORE_TASKQUEUE_H_INCLUDED
#define RIPPLE_NODESTORE_TASKQUEUE_H_INCLUDED
#include <ripple/core/impl/Workers.h>
#include <ripple/core/Stoppable.h>
#include <functional>
#include <queue>
namespace ripple {
namespace NodeStore {
class TaskQueue
: public Stoppable
, private Workers::Callback
{
public:
explicit
TaskQueue(Stoppable& parent);
void
onStop() override;
/** Adds a task to the queue
@param task std::function with signature void()
*/
void
addTask(std::function<void()> task);
private:
std::mutex mutex_;
Workers workers_;
std::queue<std::function<void()>> tasks_;
void
processTask(int instance) override;
};
} // NodeStore
} // ripple
#endif


@@ -431,7 +431,7 @@ PeerImp::hasLedger (uint256 const& hash, std::uint32_t seq) const
return true;
}
return seq >= app_.getNodeStore().earliestSeq() &&
return seq >= app_.getNodeStore().earliestLedgerSeq() &&
hasShard(NodeStore::seqToShardIndex(seq));
}
@@ -1259,6 +1259,9 @@ PeerImp::onMessage(std::shared_ptr <protocol::TMPeerShardInfo> const& m)
// Parse the shard indexes received in the shard info
RangeSet<std::uint32_t> shardIndexes;
{
if (!from_string(shardIndexes, m->shardindexes()))
return badData("Invalid shard indexes");
std::uint32_t earliestShard;
boost::optional<std::uint32_t> latestShard;
{
@@ -1267,70 +1270,23 @@ PeerImp::onMessage(std::shared_ptr <protocol::TMPeerShardInfo> const& m)
if (auto shardStore = app_.getShardStore())
{
earliestShard = shardStore->earliestShardIndex();
if (curLedgerSeq >= shardStore->earliestSeq())
if (curLedgerSeq >= shardStore->earliestLedgerSeq())
latestShard = shardStore->seqToShardIndex(curLedgerSeq);
}
else
{
auto const earliestSeq {app_.getNodeStore().earliestSeq()};
earliestShard = NodeStore::seqToShardIndex(earliestSeq);
if (curLedgerSeq >= earliestSeq)
auto const earliestLedgerSeq {
app_.getNodeStore().earliestLedgerSeq()};
earliestShard = NodeStore::seqToShardIndex(earliestLedgerSeq);
if (curLedgerSeq >= earliestLedgerSeq)
latestShard = NodeStore::seqToShardIndex(curLedgerSeq);
}
}
auto getIndex = [this, &earliestShard, &latestShard]
(std::string const& s) -> boost::optional<std::uint32_t>
if (boost::icl::first(shardIndexes) < earliestShard ||
(latestShard && boost::icl::last(shardIndexes) > latestShard))
{
std::uint32_t shardIndex;
if (!beast::lexicalCastChecked(shardIndex, s))
{
fee_ = Resource::feeBadData;
return boost::none;
}
if (shardIndex < earliestShard ||
(latestShard && shardIndex > latestShard))
{
fee_ = Resource::feeBadData;
JLOG(p_journal_.error()) <<
"Invalid shard index " << shardIndex;
return boost::none;
}
return shardIndex;
};
std::vector<std::string> tokens;
boost::split(tokens, m->shardindexes(),
boost::algorithm::is_any_of(","));
std::vector<std::string> indexes;
for (auto const& t : tokens)
{
indexes.clear();
boost::split(indexes, t, boost::algorithm::is_any_of("-"));
switch (indexes.size())
{
case 1:
{
auto const first {getIndex(indexes.front())};
if (!first)
return;
shardIndexes.insert(*first);
break;
}
case 2:
{
auto const first {getIndex(indexes.front())};
if (!first)
return;
auto const second {getIndex(indexes.back())};
if (!second)
return;
shardIndexes.insert(range(*first, *second));
break;
}
default:
return badData("Invalid shard indexes");
}
return badData("Invalid shard indexes");
}
}
@@ -1340,7 +1296,7 @@ PeerImp::onMessage(std::shared_ptr <protocol::TMPeerShardInfo> const& m)
{
if (m->endpoint() != "0")
{
auto result =
auto result =
beast::IP::Endpoint::from_string_checked(m->endpoint());
if (!result)
return badData("Invalid incoming endpoint: " + m->endpoint());
@@ -2268,7 +2224,7 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMGetObjectByHash> const& m)
{
if (auto shardStore = app_.getShardStore())
{
if (seq >= shardStore->earliestSeq())
if (seq >= shardStore->earliestLedgerSeq())
hObj = shardStore->fetch(hash, seq);
}
}
@@ -2714,7 +2670,7 @@ PeerImp::getLedger (std::shared_ptr<protocol::TMGetLedger> const& m)
if (auto shardStore = app_.getShardStore())
{
auto seq = packet.ledgerseq();
if (seq >= shardStore->earliestSeq())
if (seq >= shardStore->earliestLedgerSeq())
ledger = shardStore->fetchLedger(ledgerhash, seq);
}
}


@@ -554,7 +554,6 @@ JSS ( url_password ); // in: Subscribe
JSS ( url_username ); // in: Subscribe
JSS ( urlgravatar ); //
JSS ( username ); // in: Subscribe
JSS ( validate ); // in: DownloadShard
JSS ( validated ); // out: NetworkOPs, RPCHelpers, AccountTx*
// Tx
JSS ( validator_list_expires ); // out: NetworkOps, ValidatorList


@@ -41,8 +41,7 @@ public:
ShardArchiveHandler& operator= (ShardArchiveHandler&&) = delete;
ShardArchiveHandler& operator= (ShardArchiveHandler const&) = delete;
/** @param validate if shard data should be verified with network. */
ShardArchiveHandler(Application& app, bool validate);
ShardArchiveHandler(Application& app);
~ShardArchiveHandler();
@@ -80,7 +79,6 @@ private:
Application& app_;
std::shared_ptr<SSLHTTPDownloader> downloader_;
boost::filesystem::path const downloadDir_;
bool const validate_;
boost::asio::basic_waitable_timer<std::chrono::steady_clock> timer_;
bool process_;
std::map<std::uint32_t, parsedURL> archives_;


@@ -34,7 +34,6 @@ namespace ripple {
/** RPC command that downloads and import shard archives.
{
shards: [{index: <integer>, url: <string>}]
validate: <bool> // optional, default is true
}
example:
@@ -124,20 +123,9 @@ doDownloadShard(RPC::JsonContext& context)
}
}
bool validate {true};
if (context.params.isMember(jss::validate))
{
if (!context.params[jss::validate].isBool())
{
return RPC::expected_field_error(
std::string(jss::validate), "a bool");
}
validate = context.params[jss::validate].asBool();
}
// Begin downloading. The handler keeps itself alive while downloading.
auto handler {
std::make_shared<RPC::ShardArchiveHandler>(context.app, validate)};
std::make_shared<RPC::ShardArchiveHandler>(context.app)};
for (auto& [index, url] : archives)
{
if (!handler->add(index, std::move(url)))


@@ -31,11 +31,10 @@ namespace RPC {
using namespace boost::filesystem;
using namespace std::chrono_literals;
ShardArchiveHandler::ShardArchiveHandler(Application& app, bool validate)
ShardArchiveHandler::ShardArchiveHandler(Application& app)
: app_(app)
, downloadDir_(get(app_.config().section(
ConfigSection::shardDatabase()), "path", "") + "/download")
, validate_(validate)
, timer_(app_.getIOService())
, process_(false)
, j_(app.journal("ShardArchiveHandler"))
@@ -209,7 +208,7 @@ ShardArchiveHandler::complete(path dstPath)
{
// If validating and not synced then defer and retry
auto const mode {ptr->app_.getOPs().getOperatingMode()};
if (ptr->validate_ && mode != OperatingMode::FULL)
if (mode != OperatingMode::FULL)
{
std::lock_guard lock(m_);
timer_.expires_from_now(static_cast<std::chrono::seconds>(
@@ -265,7 +264,7 @@ ShardArchiveHandler::process(path const& dstPath)
}
// Import the shard into the shard store
if (!app_.getShardStore()->importShard(shardIndex, shardDir, validate_))
if (!app_.getShardStore()->importShard(shardIndex, shardDir))
{
JLOG(j_.error()) <<
"Importing shard " << shardIndex;


@@ -112,24 +112,6 @@ SHAMapNodeID SHAMapNodeID::getChildNodeID (int m) const
// Which branch would contain the specified hash
int SHAMapNodeID::selectBranch (uint256 const& hash) const
{
#if RIPPLE_VERIFY_NODEOBJECT_KEYS
if (mDepth >= 64)
{
assert (false);
return -1;
}
if ((hash & Masks(mDepth)) != mNodeID)
{
std::cerr << "selectBranch(" << getString () << std::endl;
std::cerr << " " << hash << " off branch" << std::endl;
assert (false);
return -1; // does not go under this node
}
#endif
int branch = * (hash.begin () + (mDepth / 2));
if (mDepth & 1)


@@ -19,8 +19,6 @@
#include <ripple/basics/RangeSet.h>
#include <ripple/beast/unit_test.h>
#include <boost/archive/binary_iarchive.hpp>
#include <boost/archive/binary_oarchive.hpp>
namespace ripple
{
@@ -78,39 +76,73 @@ public:
}
void
testSerialization()
testFromString()
{
testcase("fromString");
auto works = [](RangeSet<std::uint32_t> const & orig)
{
std::stringstream ss;
boost::archive::binary_oarchive oa(ss);
oa << orig;
RangeSet<std::uint32_t> set;
boost::archive::binary_iarchive ia(ss);
RangeSet<std::uint32_t> deser;
ia >> deser;
BEAST_EXPECT(!from_string(set, ""));
BEAST_EXPECT(boost::icl::length(set) == 0);
return orig == deser;
};
BEAST_EXPECT(!from_string(set, "#"));
BEAST_EXPECT(boost::icl::length(set) == 0);
RangeSet<std::uint32_t> rs;
BEAST_EXPECT(!from_string(set, ","));
BEAST_EXPECT(boost::icl::length(set) == 0);
BEAST_EXPECT(works(rs));
BEAST_EXPECT(!from_string(set, ",-"));
BEAST_EXPECT(boost::icl::length(set) == 0);
rs.insert(3);
BEAST_EXPECT(works(rs));
BEAST_EXPECT(!from_string(set, "1,,2"));
BEAST_EXPECT(boost::icl::length(set) == 0);
rs.insert(range(7u, 10u));
BEAST_EXPECT(works(rs));
set.clear();
BEAST_EXPECT(from_string(set, "1"));
BEAST_EXPECT(boost::icl::length(set) == 1);
BEAST_EXPECT(boost::icl::first(set) == 1);
set.clear();
BEAST_EXPECT(from_string(set, "1,1"));
BEAST_EXPECT(boost::icl::length(set) == 1);
BEAST_EXPECT(boost::icl::first(set) == 1);
set.clear();
BEAST_EXPECT(from_string(set, "1-1"));
BEAST_EXPECT(boost::icl::length(set) == 1);
BEAST_EXPECT(boost::icl::first(set) == 1);
set.clear();
BEAST_EXPECT(from_string(set, "1,4-6"));
BEAST_EXPECT(boost::icl::length(set) == 4);
BEAST_EXPECT(boost::icl::first(set) == 1);
BEAST_EXPECT(!boost::icl::contains(set, 2));
BEAST_EXPECT(!boost::icl::contains(set, 3));
BEAST_EXPECT(boost::icl::contains(set, 4));
BEAST_EXPECT(boost::icl::contains(set, 5));
BEAST_EXPECT(boost::icl::last(set) == 6);
set.clear();
BEAST_EXPECT(from_string(set, "1-2,4-6"));
BEAST_EXPECT(boost::icl::length(set) == 5);
BEAST_EXPECT(boost::icl::first(set) == 1);
BEAST_EXPECT(boost::icl::contains(set, 2));
BEAST_EXPECT(boost::icl::contains(set, 4));
BEAST_EXPECT(boost::icl::last(set) == 6);
set.clear();
BEAST_EXPECT(from_string(set, "1-2,6"));
BEAST_EXPECT(boost::icl::length(set) == 3);
BEAST_EXPECT(boost::icl::first(set) == 1);
BEAST_EXPECT(boost::icl::contains(set, 2));
BEAST_EXPECT(boost::icl::last(set) == 6);
}
void
run() override
{
testPrevMissing();
testToString();
testSerialization();
testFromString();
}
};


@@ -109,7 +109,7 @@ public:
std::unique_ptr<perf::PerfLog> perfLog =
std::make_unique<perf::PerfLogTest>();
Workers w(cb, *perfLog, "Test", tc1);
Workers w(cb, perfLog.get(), "Test", tc1);
BEAST_EXPECT(w.getNumberOfThreads() == tc1);
auto testForThreadCount = [this, &cb, &w] (int const threadCount)


@@ -165,7 +165,7 @@ public:
std::unique_ptr<Database> db =
Manager::instance().make_Database(
"test", scheduler, 2, parent, nodeParams, journal_);
BEAST_EXPECT(db->earliestSeq() == XRP_LEDGER_EARLIEST_SEQ);
BEAST_EXPECT(db->earliestLedgerSeq() == XRP_LEDGER_EARLIEST_SEQ);
}
// Set an invalid earliest ledger sequence
@@ -190,7 +190,7 @@ public:
"test", scheduler, 2, parent, nodeParams, journal_);
// Verify database uses the earliest ledger sequence setting
BEAST_EXPECT(db->earliestSeq() == 1);
BEAST_EXPECT(db->earliestLedgerSeq() == 1);
}


@@ -195,7 +195,7 @@ public:
db.store (object->getType (),
std::move (data),
object->getHash (),
db.earliestSeq());
db.earliestLedgerSeq());
}
}


@@ -2998,10 +2998,9 @@ static RPCCallTestData const rpcCallTestArray [] =
})"
},
{
"download_shard: novalidate.", __LINE__,
"download_shard:", __LINE__,
{
"download_shard",
"novalidate",
"20",
"url_NotValidated",
},
@@ -3016,8 +3015,7 @@ static RPCCallTestData const rpcCallTestArray [] =
"index" : 20,
"url" : "url_NotValidated"
}
],
"validate" : false
]
}
]
})"
@@ -3064,10 +3062,9 @@ static RPCCallTestData const rpcCallTestArray [] =
})"
},
{
"download_shard: novalidate many shards.", __LINE__,
"download_shard: many shards.", __LINE__,
{
"download_shard",
"novalidate",
"2000000",
"url_NotValidated0",
"2000001",
@@ -3106,8 +3103,7 @@ static RPCCallTestData const rpcCallTestArray [] =
"index" : 2000004,
"url" : "url_NotValidated4"
}
],
"validate" : false
]
}
]
})"
@@ -3160,8 +3156,7 @@ static RPCCallTestData const rpcCallTestArray [] =
"index" : 20,
"url" : "url_NotValidated"
}
],
"validate" : false
]
}
]
})"