Add OpenLedger:

The OpenLedger class encapsulates the functionality of
maintaining the open ledger. It uses an OpenView with the
last closed ledger as its base. Routines are provided to
modify the open ledger to add new transactions, and to
accept a new last closed ledger. Business logic for
performing transaction retries is rewritten to fit this
framework and used in the implementation of accept.

When the RIPPLE_OPEN_LEDGER macro is set to 1 (BeastConfig.h),
the global Application OpenLedger singleton maintains
its open ledger in parallel by applying new transactions
and accepting new last closed ledgers. In the current
implementation this does not affect transaction processing
but logs any differences in the results as compared to
the original code.

Logging shows an occasional mismatch in what the OpenLedger
builds versus the original code, usually an OfferCreate
which gets a terINSUF_RESERVE instead of tesSUCCESS.
This commit is contained in:
Vinnie Falco
2015-06-30 14:13:03 -07:00
parent f5873bcad0
commit 023715474c
20 changed files with 841 additions and 66 deletions

View File

@@ -1409,6 +1409,10 @@
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug|x64'">True</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='release|x64'">True</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\ripple\app\ledger\impl\OpenLedger.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug|x64'">True</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='release|x64'">True</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\ripple\app\ledger\impl\TxMeta.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug|x64'">True</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='release|x64'">True</ExcludedFromBuild>
@@ -1447,6 +1451,8 @@
</ClInclude>
<ClInclude Include="..\..\src\ripple\app\ledger\MetaView.h">
</ClInclude>
<ClInclude Include="..\..\src\ripple\app\ledger\OpenLedger.h">
</ClInclude>
<ClCompile Include="..\..\src\ripple\app\ledger\OrderBookDB.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug|x64'">True</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='release|x64'">True</ExcludedFromBuild>

View File

@@ -2145,6 +2145,9 @@
<ClCompile Include="..\..\src\ripple\app\ledger\impl\MetaView.cpp">
<Filter>ripple\app\ledger\impl</Filter>
</ClCompile>
<ClCompile Include="..\..\src\ripple\app\ledger\impl\OpenLedger.cpp">
<Filter>ripple\app\ledger\impl</Filter>
</ClCompile>
<ClCompile Include="..\..\src\ripple\app\ledger\impl\TxMeta.cpp">
<Filter>ripple\app\ledger\impl</Filter>
</ClCompile>
@@ -2190,6 +2193,9 @@
<ClInclude Include="..\..\src\ripple\app\ledger\MetaView.h">
<Filter>ripple\app\ledger</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\app\ledger\OpenLedger.h">
<Filter>ripple\app\ledger</Filter>
</ClInclude>
<ClCompile Include="..\..\src\ripple\app\ledger\OrderBookDB.cpp">
<Filter>ripple\app\ledger</Filter>
</ClCompile>

View File

@@ -199,4 +199,9 @@
#define RIPPLE_ENABLE_DELIVERMIN 0
#endif
// Enables the experimental OpenLedger
#ifndef RIPPLE_OPEN_LEDGER
#define RIPPLE_OPEN_LEDGER 0
#endif
#endif

View File

@@ -1094,6 +1094,8 @@ Ledger::txInsert (uint256 const& key,
> const& txn, std::shared_ptr<
Serializer const> const& metaData)
{
assert (static_cast<bool>(metaData) != info_.open);
if (metaData)
{
// low-level - just add to table

View File

@@ -0,0 +1,312 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_APP_LEDGER_OPENLEDGER_H_INCLUDED
#define RIPPLE_APP_LEDGER_OPENLEDGER_H_INCLUDED
#include <ripple/app/ledger/Ledger.h>
#include <ripple/app/ledger/MetaView.h>
#include <ripple/app/misc/CanonicalTXSet.h>
#include <ripple/app/misc/IHashRouter.h>
#include <ripple/basics/chrono.h>
#include <ripple/basics/Log.h>
#include <ripple/basics/UnorderedContainers.h>
#include <ripple/core/Config.h>
#include <beast/container/aged_unordered_map.h>
#include <beast/utility/Journal.h>
#include <cassert>
#include <mutex>
namespace ripple {
// How many total extra passes we make
// We must ensure we make at least one non-retriable pass
#define LEDGER_TOTAL_PASSES 3
// How many extra retry passes we
// make if the previous retry pass made changes
#define LEDGER_RETRY_PASSES 1
using OrderedTxs = CanonicalTXSet;
namespace detail {
class CachedSLEs
{
public:
using key_type = uint256;
using value_type =
std::shared_ptr<SLE const>;
CachedSLEs (CachedSLEs const&) = delete;
CachedSLEs& operator= (CachedSLEs const&) = delete;
template <class Rep, class Period>
CachedSLEs (std::chrono::duration<
Rep, Period> const& timeToLive,
Stopwatch& clock)
: clock_ (clock)
, timeToLive_ (timeToLive)
, map_ (clock)
{
}
value_type
find (key_type const& key);
// `false` if the insert failed
std::pair<value_type, bool>
insert (key_type const& key,
value_type const& value);
private:
Stopwatch const& clock_;
Stopwatch::duration timeToLive_;
beast::aged_unordered_map<key_type,
value_type, Stopwatch::clock_type,
hardened_hash<strong_hash>
> map_;
};
} // detail
//------------------------------------------------------------------------------
/** Represents the open ledger. */
class OpenLedger
{
private:
beast::Journal j_;
Config const& config_;
Stopwatch& clock_;
detail::CachedSLEs cache_;
std::mutex mutable modify_mutex_;
std::mutex mutable current_mutex_;
std::shared_ptr<MetaView const> current_;
public:
OpenLedger() = delete;
OpenLedger (OpenLedger const&) = delete;
OpenLedger& operator= (OpenLedger const&) = delete;
/** Create a new open ledger object.
@param ledger A closed ledger
*/
// Although ledger is not const, we do not modify it.
explicit
OpenLedger (std::shared_ptr<
Ledger> const& ledger,
Config const& config,
Stopwatch& clock,
beast::Journal journal);
/** Returns a view to the current open ledger.
Thread safety:
Can be called concurrently from any thread.
Effects:
The caller is given ownership of a
non-modifiable snapshot of the open ledger
at the time of the call.
*/
std::shared_ptr<BasicView const>
current() const;
/** Modify the open ledger
Thread safety:
Can be called concurrently from any thread.
`f` will be called as
bool(BasicView&)
If `f` returns `true`, the changes made in the
View will be published to the open ledger.
@return `true` if a the open ledger was changed
*/
// VFALCO This should take a `BasicView`
bool
modify (std::function<
bool(View&, beast::Journal)> const& f);
/** Accept a new ledger.
Thread safety:
Can be called concurrently from any thread.
Effects:
A new open view based on the accepted ledger
is created, and the list of retriable
transactions is optionally applied first
depending on the value of `retriesFirst`.
The transactions in the current open view
are applied to the new open view.
The list of local transactions are applied
to the new open view.
Any failed, retriable transactions are left
in `retries` for the caller.
The current view is atomically set to the
new open view.
@param ledger A new closed ledger
*/
// Although ledger is not const, we do not modify it.
void
accept(std::shared_ptr<Ledger> const& ledger,
OrderedTxs const& locals, bool retriesFirst,
OrderedTxs& retries, IHashRouter& router,
std::string const& suffix = "");
/** Algorithm for applying transactions.
This has the retry logic and ordering semantics
used for consensus and building the open ledger.
*/
template <class FwdRange>
static
void
apply (View& view, BasicView const& check,
FwdRange const& txs, OrderedTxs& retries,
IHashRouter& router, Config const& config,
beast::Journal j);
private:
enum Result
{
success,
failure,
retry
};
std::shared_ptr<MetaView>
create (std::shared_ptr<
Ledger> const& ledger);
static
Result
apply_one (View& view, std::shared_ptr<
STTx const> const& tx, bool retry,
IHashRouter& router, Config const& config,
beast::Journal j);
public:
//--------------------------------------------------------------------------
//
// TEST CODE
//
// Verify that the open ledger has the right contents
// This is called while holding the master and ledger master mutexes
bool
verify (Ledger const& ledger,
std::string const& suffix = "") const;
};
//------------------------------------------------------------------------------
template <class FwdRange>
void
OpenLedger::apply (View& view,
BasicView const& check, FwdRange const& txs,
OrderedTxs& retries, IHashRouter& router,
Config const& config, beast::Journal j)
{
for (auto iter = txs.begin();
iter != txs.end(); ++iter)
{
try
{
// Dereferencing the iterator can
// throw since it may be transformed.
auto const tx = *iter;
if (check.txExists(tx->getTransactionID()))
continue;
auto const result = apply_one(view,
tx, true, router, config, j);
if (result == Result::retry)
retries.insert(tx);
}
catch(...)
{
JLOG(j.error) <<
"Caught exception";
}
}
bool retry = true;
for (int pass = 0;
pass < LEDGER_TOTAL_PASSES;
++pass)
{
int changes = 0;
auto iter = retries.begin();
while (iter != retries.end())
{
switch (apply_one(view,
iter->second, retry,
router, config, j))
{
case Result::success:
++changes;
case Result::failure:
iter = retries.erase (iter);
break;
case Result::retry:
++iter;
}
}
// A non-retry pass made no changes
if (! changes && ! retry)
return;
// Stop retriable passes
if (! changes || (pass >= LEDGER_RETRY_PASSES))
retry = false;
}
// If there are any transactions left, we must have
// tried them in at least one final pass
assert (retries.empty() || ! retry);
}
//------------------------------------------------------------------------------
// For debug logging
std::string
debugTxstr (std::shared_ptr<STTx const> const& tx);
std::string
debugTostr (OrderedTxs const& set);
std::string
debugTostr (SHAMap const& set);
std::string
debugTostr (std::shared_ptr<BasicView const> const& view);
} // ripple
#endif

View File

@@ -90,14 +90,6 @@ private:
hash_map <NodeID, bool> mVotes;
};
// How many total extra passes we make
// We must ensure we make at least one non-retriable pass
#define LEDGER_TOTAL_PASSES 3
// How many extra retry passes we
// make if the previous retry pass made changes
#define LEDGER_RETRY_PASSES 1
} // ripple
#endif

View File

@@ -23,6 +23,7 @@
#include <ripple/app/ledger/LedgerTiming.h>
#include <ripple/app/ledger/LedgerToJson.h>
#include <ripple/app/ledger/MetaView.h>
#include <ripple/app/ledger/OpenLedger.h>
#include <ripple/app/ledger/impl/DisputedTx.h>
#include <ripple/app/ledger/impl/LedgerConsensusImp.h>
#include <ripple/app/main/Application.h>
@@ -987,6 +988,7 @@ void LedgerConsensusImp::accept (std::shared_ptr<SHAMap> set)
WriteLog (lsDEBUG, LedgerConsensus)
<< "Applying consensus set transactions to the"
<< " last closed ledger";
{
MetaView accum(*newLCL, tapNONE);
assert(accum.closed());
@@ -1000,6 +1002,12 @@ void LedgerConsensusImp::accept (std::shared_ptr<SHAMap> set)
// made it into the consensus set but failed during application
// to the ledger.
// Make a copy for OpenLedger
#if RIPPLE_OPEN_LEDGER
CanonicalTXSet retries =
retriableTransactions;
#endif
newLCL->updateSkipList ();
int asf = newLCL->stateMap().flushDirty (
@@ -1023,10 +1031,10 @@ void LedgerConsensusImp::accept (std::shared_ptr<SHAMap> set)
WriteLog (lsDEBUG, LedgerConsensus)
<< "Consensus built new ledger";
uint256 const newLCLHash = newLCL->getHash ();
WriteLog (lsDEBUG, LedgerConsensus)
<< "Report: NewL = " << newLCL->getHash ()
<< "Report: NewL = " << newLCLHash
<< ":" << newLCL->getLedgerSeq ();
uint256 newLCLHash = newLCL->getHash ();
// Tell directly connected peers that we have a new LCL
statusChange (protocol::neACCEPTED_LEDGER, *newLCL);
@@ -1099,7 +1107,12 @@ void LedgerConsensusImp::accept (std::shared_ptr<SHAMap> set)
auto txn = std::make_shared<STTx>(sit);
retriableTransactions.push_back (txn);
retriableTransactions.insert (txn);
// For OpenLedger
#if RIPPLE_OPEN_LEDGER
retries.insert(txn);
#endif
anyDisputes = true;
}
@@ -1122,8 +1135,12 @@ void LedgerConsensusImp::accept (std::shared_ptr<SHAMap> set)
LedgerMaster::ScopedLockType sl (ledgerMaster_.peekMutex (), std::defer_lock);
std::lock(lock, sl);
// Apply transactions from the old open ledger
Ledger::pointer oldOL = ledgerMaster_.getCurrentLedger();
auto const localTx = m_localTX.getTxSet();
auto const oldOL = ledgerMaster_.getCurrentLedger();
#if RIPPLE_OPEN_LEDGER
getApp().openLedger().verify(*oldOL, "consensus before");
#endif
if (oldOL->txMap().getHash().isNonZero ())
{
WriteLog (lsDEBUG, LedgerConsensus)
@@ -1131,29 +1148,20 @@ void LedgerConsensusImp::accept (std::shared_ptr<SHAMap> set)
applyTransactions (&oldOL->txMap(),
accum, newLCL, retriableTransactions);
}
// Apply local transactions
for (auto it : m_localTX.getTxSet ())
{
try
{
apply (accum, *it.second, tapNONE, getConfig(),
for (auto const& item : localTx)
apply (accum, *item.second, tapNONE, getConfig(),
deprecatedLogs().journal("LedgerConsensus"));
}
catch (...)
{
// Nothing special we need to do.
// It's possible a cleverly malformed transaction or
// corrupt back end database could cause an exception
// during transaction processing.
}
}
accum.apply(*newOL,
deprecatedLogs().journal("LedgerConsensus"));
// We have a new Last Closed Ledger and new Open Ledger
ledgerMaster_.pushLedger (newLCL, newOL);
#if RIPPLE_OPEN_LEDGER
getApp().openLedger().accept(newLCL,
localTx, anyDisputes, retries,
getApp().getHashRouter(), "consensus");
getApp().openLedger().verify(*newOL, "consensus after");
#endif
}
mNewLedgerHash = newLCL->getHash ();
@@ -1750,7 +1758,7 @@ make_LedgerConsensus (ConsensusImp& consensus, int previousProposers,
static
int
applyTransaction (BasicView& view,
STTx const& txn,
std::shared_ptr<STTx const> const& txn,
bool retryAssured,
bool enableTesting)
{
@@ -1765,23 +1773,22 @@ applyTransaction (BasicView& view,
parms = static_cast<ViewFlags> (parms | tapRETRY);
}
if ((getApp().getHashRouter ().getFlags (txn.getTransactionID ())
if ((getApp().getHashRouter ().getFlags (txn->getTransactionID ())
& SF_SIGGOOD) == SF_SIGGOOD)
{
parms = static_cast<ViewFlags>
(parms | tapNO_CHECK_SIGN);
}
WriteLog (lsDEBUG, LedgerConsensus) << "TXN "
<< txn.getTransactionID ()
<< txn->getTransactionID ()
//<< (engine.view().open() ? " open" : " closed") // because of the optional in engine
<< (retryAssured ? "/retry" : "/final");
WriteLog (lsTRACE, LedgerConsensus) << txn.getJson (0);
WriteLog (lsTRACE, LedgerConsensus) << txn->getJson (0);
try
{
auto const result = apply(view, txn, parms, getConfig(),
auto const result = apply(view, *txn, parms, getConfig(),
deprecatedLogs().journal("LedgerConsensus"));
if (result.second)
{
WriteLog (lsDEBUG, LedgerConsensus)
@@ -1827,24 +1834,27 @@ void applyTransactions (
WriteLog (lsDEBUG, LedgerConsensus) <<
"Processing candidate transaction: " << item->key();
std::shared_ptr<STTx const> txn;
try
{
SerialIter sit (item->slice());
auto const txn =
std::make_shared<STTx const>(sit);
if (applyTransaction(applyView, *txn, true, enableTesting) ==
LedgerConsensusImp::resultRetry)
{
// On failure, stash the failed transaction for
// later retry.
retriableTransactions.push_back (txn);
}
txn = std::make_shared<STTx const>(sit);
}
catch (...)
{
WriteLog (lsWARNING, LedgerConsensus) << " Throws";
}
if (txn)
{
if (applyTransaction(applyView, txn, true, enableTesting) ==
LedgerConsensusImp::resultRetry)
{
// On failure, stash the failed transaction for
// later retry.
retriableTransactions.insert (txn);
}
}
}
}
@@ -1864,7 +1874,7 @@ void applyTransactions (
try
{
switch (applyTransaction (applyView,
*it->second, certainRetry, enableTesting))
it->second, certainRetry, enableTesting))
{
case LedgerConsensusImp::resultSuccess:
it = retriableTransactions.erase (it);

View File

@@ -21,6 +21,7 @@
#include <ripple/app/ledger/LedgerMaster.h>
#include <ripple/app/ledger/InboundLedgers.h>
#include <ripple/app/ledger/LedgerHistory.h>
#include <ripple/app/ledger/OpenLedger.h>
#include <ripple/app/ledger/OrderBookDB.h>
#include <ripple/app/ledger/PendingSaves.h>
#include <ripple/app/ledger/impl/LedgerCleaner.h>
@@ -250,7 +251,7 @@ public:
{
// returns true if transaction was added
ScopedLockType ml (m_mutex);
mHeldTransactions.push_back (transaction->getSTransaction ());
mHeldTransactions.insert (transaction->getSTransaction ());
}
void pushLedger (Ledger::pointer newLedger)
@@ -354,6 +355,31 @@ public:
mCurrentLedger.getMutable();
int recovers = 0;
#if RIPPLE_OPEN_LEDGER
bool const openLedger = true;
#else
bool const openLedger = false;
#endif
if (openLedger)
{
getApp().openLedger().modify(
[&](View& view, beast::Journal j)
{
bool any = false;
for (auto const& it : mHeldTransactions)
{
ViewFlags flags = tapNONE;
if (getApp().getHashRouter().addSuppressionFlags (it.first.getTXID (), SF_SIGGOOD))
flags = flags | tapNO_CHECK_SIGN;
auto const result = apply(view,
*it.second, flags, getConfig(), j);
if (result.second)
any = true;
}
return any;
});
}
for (auto const& it : mHeldTransactions)
{
try

View File

@@ -377,11 +377,10 @@ MetaView::txInsert (uint256 const& key,
> const& txn, std::shared_ptr<
Serializer const> const& metaData)
{
if (txs_.count(key) ||
base_.txExists(key))
if (base_.txExists(key) ||
! txs_.emplace(key,
std::make_pair(txn, metaData)).second)
LogicError("duplicate_tx: " + to_string(key));
txs_.emplace(key, std::make_pair(
txn, metaData));
}
std::vector<uint256>

View File

@@ -0,0 +1,317 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <BeastConfig.h>
#include <ripple/app/ledger/OpenLedger.h>
#include <ripple/app/tx/apply.h>
#include <boost/range/adaptor/transformed.hpp>
namespace ripple {
namespace detail {
auto
CachedSLEs::find(
key_type const& key) ->
value_type
{
auto const iter =
map_.find(key);
if (iter == map_.end())
return nullptr;
map_.touch(iter);
return iter->second;
}
auto
CachedSLEs::insert(
key_type const& key,
value_type const& value) ->
std::pair<value_type, bool>
{
beast::expire(map_, timeToLive_);
auto const result =
map_.emplace(key, value);
if (result.second)
return { value, true };
return { result.first->second, false };
}
} // detail
//------------------------------------------------------------------------------
class CachedSLEView
: public BasicViewWrapper<Ledger&>
{
private:
detail::CachedSLEs& cache_;
std::mutex mutable mutex_;
std::shared_ptr<Ledger> ledger_;
public:
// Retains ownership of ledger
// for lifetime management.
CachedSLEView(
std::shared_ptr<Ledger> const& ledger,
detail::CachedSLEs& cache)
: BasicViewWrapper(*ledger)
, cache_ (cache)
, ledger_ (ledger)
{
}
std::shared_ptr<SLE const>
read (Keylet const& k) const override;
};
std::shared_ptr<SLE const>
CachedSLEView::read (Keylet const& k) const
{
key_type key;
auto const item =
view_.stateMap().peekItem(k.key, key);
if (! item)
return nullptr;
{
std::lock_guard<
std::mutex> lock(mutex_);
if (auto sle = cache_.find(key))
{
if(! k.check(*sle))
return nullptr;
return sle;
}
}
SerialIter sit(item->slice());
// VFALCO This should be <SLE const>
auto sle = std::make_shared<
SLE>(sit, item->key());
if (! k.check(*sle))
return nullptr;
// VFALCO TODO Eliminate "immutable" runtime property
sle->setImmutable ();
std::lock_guard<
std::mutex> lock(mutex_);
auto const result =
cache_.insert(key, sle);
if (! result.second)
return result.first;
// Need std::move to avoid a copy
// because return type is different
return std::move(sle);
}
//------------------------------------------------------------------------------
OpenLedger::OpenLedger(std::shared_ptr<
Ledger> const& ledger,
Config const& config,
Stopwatch& clock,
beast::Journal journal)
: j_ (journal)
, config_ (config)
, clock_ (clock)
, cache_ (std::chrono::minutes(1), clock)
, current_ (create(ledger))
{
}
std::shared_ptr<BasicView const>
OpenLedger::current() const
{
std::lock_guard<
std::mutex> lock(
current_mutex_);
return current_;
}
bool
OpenLedger::modify (std::function<
bool(View&, beast::Journal)> const& f)
{
std::lock_guard<
std::mutex> lock1(modify_mutex_);
auto next = std::make_shared<
MetaView>(shallow_copy, *current_);
auto const changed = f(*next, j_);
if (changed)
{
std::lock_guard<
std::mutex> lock2(
current_mutex_);
current_ = std::move(next);
}
return changed;
}
void
OpenLedger::accept (std::shared_ptr<Ledger> const& ledger,
OrderedTxs const& locals, bool retriesFirst,
OrderedTxs& retries, IHashRouter& router,
std::string const& suffix)
{
JLOG(j_.error) <<
"accept ledger " << ledger->seq() << " " << suffix;
auto next = create(ledger);
if (retriesFirst)
{
// Handle disputed tx, outside lock
using empty =
std::vector<std::shared_ptr<
STTx const>>;
apply (*next, *ledger, empty{},
retries, router, config_, j_);
}
// Block calls to modify, otherwise
// new tx going into the open ledger
// would get lost.
std::lock_guard<
std::mutex> lock1(modify_mutex_);
// Apply tx from the current open view
if (! current_->txs.empty())
apply (*next, *ledger,
boost::adaptors::transform(
current_->txs,
[](std::pair<std::shared_ptr<
STTx const>, std::shared_ptr<
STObject const>> const& p)
{
return p.first;
}),
retries, router, config_, j_);
// Apply local tx
for (auto const& item : locals)
ripple::apply(*next, *item.second,
tapNONE, config_, j_);
// Switch to the new open view
std::lock_guard<
std::mutex> lock2(current_mutex_);
current_ = std::move(next);
}
//------------------------------------------------------------------------------
std::shared_ptr<MetaView>
OpenLedger::create (std::shared_ptr<
Ledger> const& ledger)
{
auto cache = std::make_shared<
CachedSLEView>(ledger, cache_);
return std::make_shared<
MetaView>(open_ledger,
*cache, cache);
}
auto
OpenLedger::apply_one (View& view,
std::shared_ptr<STTx const> const& tx,
bool retry, IHashRouter& router,
Config const& config, beast::Journal j) ->
Result
{
auto flags = view.flags();
if (retry)
flags = flags | tapRETRY;
if ((router.getFlags(
tx->getTransactionID()) & SF_SIGGOOD) ==
SF_SIGGOOD)
flags = flags | tapNO_CHECK_SIGN;
auto const result = ripple::apply(
view, *tx, flags, config, j);
if (result.second)
return Result::success;
if (isTefFailure (result.first) ||
isTemMalformed (result.first) ||
isTelLocal (result.first))
return Result::failure;
return Result::retry;
}
//------------------------------------------------------------------------------
bool
OpenLedger::verify (Ledger const& ledger,
std::string const& suffix) const
{
std::lock_guard<
std::mutex> lock(modify_mutex_);
auto list1 = ledger.txList();
auto list2 = current_->txList();
std::sort(list1.begin(), list1.end());
std::sort(list2.begin(), list2.end());
if (list1 == list2)
return true;
JLOG(j_.error) <<
"verify ledger " << ledger.seq() << ": " <<
list1.size() << " / " << list2.size() <<
" " << " MISMATCH " << suffix;
return false;
}
//------------------------------------------------------------------------------
std::string
debugTxstr (std::shared_ptr<STTx const> const& tx)
{
std::stringstream ss;
ss << tx->getTransactionID();
return ss.str().substr(0, 4);
}
std::string
debugTostr (OrderedTxs const& set)
{
std::stringstream ss;
for(auto const& item : set)
ss << debugTxstr(item.second) << ", ";
return ss.str();
}
std::string
debugTostr (SHAMap const& set)
{
std::stringstream ss;
for (auto const& item : set)
{
try
{
SerialIter sit(item->slice());
auto const tx = std::make_shared<
STTx const>(sit);
ss << debugTxstr(tx) << ", ";
}
catch(...)
{
ss << "THRO, ";
}
}
return ss.str();
}
std::string
debugTostr (std::shared_ptr<BasicView const> const& view)
{
std::stringstream ss;
for(auto const& item : view->txs)
ss << debugTxstr(item.first) << ", ";
return ss.str();
}
} // ripple

View File

@@ -27,6 +27,7 @@
#include <ripple/app/ledger/InboundLedgers.h>
#include <ripple/app/ledger/LedgerMaster.h>
#include <ripple/app/ledger/LedgerToJson.h>
#include <ripple/app/ledger/OpenLedger.h>
#include <ripple/app/ledger/OrderBookDB.h>
#include <ripple/app/ledger/PendingSaves.h>
#include <ripple/app/main/CollectorManager.h>
@@ -67,9 +68,10 @@
#include <ripple/websocket/MakeServer.h>
#include <ripple/crypto/RandomNumbers.h>
#include <beast/asio/io_latency_probe.h>
#include <boost/asio/signal_set.hpp>
#include <beast/module/core/text/LexicalCast.h>
#include <beast/module/core/thread/DeadlineTimer.h>
#include <boost/asio/signal_set.hpp>
#include <boost/optional.hpp>
#include <fstream>
namespace ripple {
@@ -261,6 +263,7 @@ public:
std::unique_ptr <NodeStore::Database> m_nodeStore;
PendingSaves pendingSaves_;
AccountIDCache accountIDCache_;
boost::optional<OpenLedger> openLedger_;
// These are not Stoppable-derived
NodeCache m_tempNodeCache;
@@ -608,6 +611,12 @@ public:
return accountIDCache_;
}
OpenLedger&
openLedger() override
{
return *openLedger_;
}
Overlay& overlay ()
{
return *m_overlay;
@@ -1067,6 +1076,9 @@ void ApplicationImp::startNewLedger ()
assert (secondLedger->exists(keylet::account(
calcAccountID(rootAddress))));
m_networkOPs->setLastCloseTime (secondLedger->getCloseTimeNC ());
openLedger_.emplace(secondLedger, getConfig(),
stopwatch(), deprecatedLogs().journal("OpenLedger"));
}
}

View File

@@ -54,6 +54,7 @@ class InboundTransactions;
class LedgerMaster;
class LoadManager;
class NetworkOPs;
class OpenLedger;
class OrderBookDB;
class Overlay;
class PathRequests;
@@ -118,6 +119,7 @@ public:
virtual SHAMapStore& getSHAMapStore () = 0;
virtual PendingSaves& pendingSaves() = 0;
virtual AccountIDCache const& accountIDCache() const = 0;
virtual OpenLedger& openLedger() = 0;
virtual DatabaseCon& getTxnDB () = 0;
virtual DatabaseCon& getLedgerDB () = 0;

View File

@@ -74,7 +74,7 @@ bool CanonicalTXSet::Key::operator>= (Key const& rhs)const
return mTXid >= rhs.mTXid;
}
void CanonicalTXSet::push_back (std::shared_ptr<STTx const> const& txn)
void CanonicalTXSet::insert (std::shared_ptr<STTx const> const& txn)
{
uint256 effectiveAccount = mSetHash;

View File

@@ -81,7 +81,7 @@ public:
{
}
void push_back (std::shared_ptr<STTx const> const& txn);
void insert (std::shared_ptr<STTx const> const& txn);
// VFALCO TODO remove this function
void reset (LedgerHash const& newLastClosedLedgerHash)

View File

@@ -31,6 +31,7 @@
#include <ripple/app/ledger/LedgerMaster.h>
#include <ripple/app/ledger/LedgerTiming.h>
#include <ripple/app/ledger/LedgerToJson.h>
#include <ripple/app/ledger/OpenLedger.h>
#include <ripple/app/ledger/OrderBookDB.h>
#include <ripple/app/main/LoadManager.h>
#include <ripple/app/main/LocalCredentials.h>
@@ -967,6 +968,25 @@ void NetworkOPsImp::transactionBatch()
}
}
#if RIPPLE_OPEN_LEDGER
static
void
mismatch (std::shared_ptr<SLE const> const& sle1, TER ter1,
std::shared_ptr<SLE const> const& sle2, TER ter2,
std::shared_ptr<STTx const> tx,
beast::Journal j)
{
JLOG(j.error) <<
"TER " << (ter1 == ter2 ? " " : "MISMATCH ") <<
transToken(ter1) << " vs " <<
transToken(ter2);
JLOG(j.error) <<
tx->getJson(0) << '\n' <<
(sle1 ? sle1->getJson(0) : "MISSING ACCOUNTROOT1") << '\n' <<
(sle2 ? sle2->getJson(0) : "MISSING ACCOUNTROOT2") << '\n';
}
#endif
void NetworkOPsImp::apply (std::unique_lock<std::mutex>& batchLock)
{
std::vector<TransactionStatus> transactions;
@@ -986,6 +1006,10 @@ void NetworkOPsImp::apply (std::unique_lock<std::mutex>& batchLock)
{
std::lock_guard <std::recursive_mutex> lock (
m_ledgerMaster.peekMutex());
#if RIPPLE_OPEN_LEDGER
auto const oldOL = m_ledgerMaster.getCurrentLedgerHolder().get();
getApp().openLedger().verify(*oldOL, "apply before");
#endif
ledger = m_ledgerMaster.getCurrentLedgerHolder().getMutable();
accum.emplace(*ledger, tapNONE);
for (TransactionStatus& e : transactions)
@@ -994,12 +1018,34 @@ void NetworkOPsImp::apply (std::unique_lock<std::mutex>& batchLock)
flags = flags | tapNO_CHECK_SIGN;
if (e.admin)
flags = flags | tapADMIN;
#if RIPPLE_OPEN_LEDGER
auto const sle1 = accum->read(
keylet::account(e.transaction->getSTransaction()->getAccountID(sfAccount)));
#endif
std::tie (e.result, e.applied) =
ripple::apply (*accum,
*e.transaction->getSTransaction(), flags,
getConfig(), deprecatedLogs().journal(
"NetworkOPs"));
applied |= e.applied;
#if RIPPLE_OPEN_LEDGER
auto const sle2 = getApp().openLedger().current()->read(keylet::account(e.transaction->getSTransaction()->getAccountID(sfAccount)));
// VFALCO Should do the loop inside modify()
getApp().openLedger().modify(
[&](View& view, beast::Journal j)
{
auto const result = ripple::apply(
view, *e.transaction->getSTransaction(),
flags, getConfig(), j);
if (result.first != e.result)
mismatch(sle1, e.result, sle2, result.first,
e.transaction->getSTransaction(), j);
return result.second;
});
#endif
}
}
@@ -1007,6 +1053,9 @@ void NetworkOPsImp::apply (std::unique_lock<std::mutex>& batchLock)
{
accum->apply(*ledger,
deprecatedLogs().journal("NetworkOPs"));
#if RIPPLE_OPEN_LEDGER
getApp().openLedger().verify(*ledger, "apply after");
#endif
ledger->setImmutable();
m_ledgerMaster.getCurrentLedgerHolder().set (ledger);
}
@@ -1404,7 +1453,12 @@ void NetworkOPsImp::switchLastClosedLedger (
// set the newLCL as our last closed ledger -- this is abnormal code
auto msg = duringConsensus ? "JUMPdc" : "JUMP";
m_journal.error << msg << " last closed ledger to " << newLCL->getHash ();
#if RIPPLE_OPEN_LEDGER
m_journal.fatal
#else
m_journal.error
#endif
<< msg << " last closed ledger to " << newLCL->getHash ();
clearNeedNetworkLedger ();
newLCL->setClosed ();
@@ -1412,16 +1466,32 @@ void NetworkOPsImp::switchLastClosedLedger (
Ledger>(false, std::ref (*newLCL));
// Caller must own master lock
{
// Apply tx in old open ledger to new
// open ledger, then apply local tx.
auto const oldOL =
m_ledgerMaster.getCurrentLedger();
#if RIPPLE_OPEN_LEDGER
getApp().openLedger().verify(
*oldOL, "jump before");
#endif
// Apply tx in old open ledger to new
// open ledger. Then apply local tx.
MetaView accum(*newOL, tapNONE);
assert(accum.open());
auto retries = m_localTX->getTxSet();
auto const localTx = m_localTX->getTxSet();
{
auto retries = localTx;
applyTransactions (&oldOL->txMap(),
accum, newLCL, retries);
}
accum.apply(*newOL, m_journal);
#if RIPPLE_OPEN_LEDGER
auto retries = localTx;
getApp().openLedger().accept(
newLCL, OrderedTxs({}), false, retries,
getApp().getHashRouter(), "jump");
getApp().openLedger().verify(
*newOL, "jump after");
#endif
}
m_ledgerMaster.switchLedgers (newLCL, newOL);

View File

@@ -740,6 +740,11 @@ CreateOffer::applyGuts (View& view, View& view_cancel)
if (mPriorBalance < getAccountReserve (sleCreator))
{
#if RIPPLE_OPEN_LEDGER
deprecatedLogs().journal("OpenLedger").error <<
"mPriorBalance < getAccountReserve(sleCreator)";
#endif
// If we are here, the signing account had an insufficient reserve
// *prior* to our processing. If something actually crossed, then
// we allow this; otherwise, we just claim a fee.

View File

@@ -147,7 +147,7 @@ public:
std::lock_guard <std::mutex> lock (m_lock);
for (auto const& it : m_txns)
tset.push_back (it.getTX());
tset.insert (it.getTX());
}
return tset;

View File

@@ -631,14 +631,17 @@ TER Transactor::checkMultiSign ()
//------------------------------------------------------------------------------
static
inline
void
log (std::pair<
TER, bool> const& result,
beast::Journal j)
{
if(j.trace) j.trace <<
#if 0
JLOG(j.error) <<
"apply: { " << transToken(result.first) <<
", " << (result.second ? "true" : "false") << " }";
#endif
}
std::pair<TER, bool>

View File

@@ -117,6 +117,9 @@ public:
using mapped_type =
std::shared_ptr<SLE const>;
BasicView (BasicView const&) = delete;
BasicView& operator= (BasicView const&) = delete;
BasicView()
: txs(*this)
{
@@ -643,6 +646,10 @@ operator&(ViewFlags const& lhs,
class View : public BasicView
{
public:
View() = default;
View (View const&) = delete;
View& operator= (View const&) = delete;
virtual ~View() = default;
/** Returns the contextual tx processing flags.

View File

@@ -39,6 +39,7 @@
#include <ripple/app/ledger/impl/LedgerMaster.cpp>
#include <ripple/app/ledger/impl/LedgerTiming.cpp>
#include <ripple/app/ledger/impl/MetaView.cpp>
#include <ripple/app/ledger/impl/OpenLedger.cpp>
#include <ripple/app/ledger/impl/TxMeta.cpp>
#include <ripple/app/ledger/tests/common_ledger.cpp>