Change initialization code to use "disable master key" txn as milestone:

* The initialization code now assumes that all transactions that come before
"disable master key" are part of setup. This is also used to set the initial
sequence numbers for the door accounts.

* Add additional logging information

* Rm start of historic transactions

* Delay unlocking of federator main loop

* Fix stop history tx only bug
This commit is contained in:
seelabs
2021-10-25 15:13:10 -04:00
parent d86b1f8b7d
commit 5743dc4537
14 changed files with 202 additions and 184 deletions

View File

@@ -312,25 +312,9 @@ struct RefundTransferResult
};
```
* `StartOfHistoricTransactions`. This is added when a federator detects that
there are no more historic transactions left to receive in a stream of
transactions. This is used when a federator initially syncs to the network.
```c++
struct StartOfHistoricTransactions
{
bool isMainchain_;
EventType
eventType() const;
Json::Value
toJson() const;
};
```
* `TicketCreateResult`. This is added when the federator detects a ticket create
transaction.
```
struct TicketCreateResult
{
@@ -436,8 +420,6 @@ void
onEvent(event::RefundTransferResult const& e);
void
onEvent(event::HeartbeatTimer const& e);
void
onEvent(event::StartOfHistoricTransactions const& e);
void
Federator::mainLoop()

View File

@@ -1707,7 +1707,8 @@ void
ApplicationImp::startFederator()
{
if (sidechainFederator_)
sidechainFederator_->unlockMainLoop();
sidechainFederator_->unlockMainLoop(
sidechain::Federator::UnlockMainLoopKey::app);
}
int

View File

@@ -83,6 +83,12 @@ getChainType(bool isMainchain)
: Federator::ChainType::sideChain;
}
[[nodiscard]] char const*
chainTypeStr(Federator::ChainType ct)
{
return ct == Federator::ChainType::mainChain ? "mainchain" : "sidechain";
}
[[nodiscard]] uint256
crossChainTxnSignatureId(
PublicKey signingPK,
@@ -384,6 +390,9 @@ parseFederatorSecrets(BasicConfig const& config, beast::Journal j)
}
auto seed = parseBase58<Seed>(elements[0]);
if (!seed)
seed = parseRippleLibSeed(elements[0]);
if (!seed)
{
std::string const msg =
@@ -671,7 +680,10 @@ make_Federator(
auto key = parseBase58<SecretKey>(TokenType::AccountSecret, *keyStr);
if (!key)
{
if (auto const seed = parseBase58<Seed>(*keyStr))
std::optional<Seed> seed = parseRippleLibSeed(*keyStr);
if (!seed)
seed = parseBase58<Seed>(*keyStr);
if (seed)
{
// TODO: we don't know the key type
key = generateKeyPair(KeyType::ed25519, *seed).second;
@@ -874,6 +886,12 @@ Federator::setLastTxnSeqConfirmedMax(
void
Federator::setAccountSeqMax(ChainType chaintype, std::uint32_t reqValue)
{
JLOGV(
j_.trace(),
"Federator setAccountSeqMax",
jv("chaintype", chainTypeStr(chaintype)),
jv("curValue", accountSeq_[chaintype]),
jv("reqValue", reqValue));
detail::lockfreeSetMax(accountSeq_[chaintype], reqValue);
}
@@ -926,6 +944,7 @@ Federator::payTxn(
}
auto const seq = accountSeq_[dstChain]++;
assert(seq > 1);
auto job = [federator = shared_from_this(),
txnType,
@@ -1140,6 +1159,11 @@ Federator::payTxn(
void
Federator::onEvent(event::XChainTransferDetected const& e)
{
JLOGV(
j_.trace(),
"Federator::onEvent",
jv("eventtype", "XChainTransferDetected"),
jv("event", e.toJson()));
auto const srcChain = srcChainType(e.dir_);
std::optional<STAmount> toSendAmt =
toOtherChainAmount(srcChain, e.deliveredAmt_);
@@ -1200,7 +1224,11 @@ Federator::sendRefund(
void
Federator::onEvent(event::XChainTransferResult const& e)
{
JLOGV(j_.trace(), "Federator::onEvent", jv("event", e.toJson()));
JLOGV(
j_.trace(),
"Federator::onEvent",
jv("eventtype", "XChainTransferResult"),
jv("event", e.toJson()));
// srcChain and dstChain are the chains of the triggering transaction.
// I.e. A srcChain of main is a transfer result is a transaction that
@@ -1294,7 +1322,11 @@ Federator::onEvent(event::XChainTransferResult const& e)
void
Federator::onEvent(event::RefundTransferResult const& e)
{
JLOGV(j_.trace(), "RefundTransferResult", jv("event", e.toJson()));
JLOGV(
j_.trace(),
"Federator::onEvent",
jv("eventtype", "RefundTransferResult"),
jv("event", e.toJson()));
auto const srcChain = srcChainType(e.dir_);
onResult(srcChain, e.txnSeq_);
@@ -1317,7 +1349,11 @@ Federator::onEvent(event::RefundTransferResult const& e)
void
Federator::onEvent(event::HeartbeatTimer const& e)
{
JLOG(j_.trace()) << "HeartbeatTimer";
JLOGV(
j_.trace(),
"Federator::onEvent",
jv("eventtype", "HeartbeatTimer"),
jv("event", e.toJson()));
}
void
@@ -1870,8 +1906,19 @@ Federator::sendTxns()
}
void
Federator::unlockMainLoop()
Federator::unlockMainLoop(UnlockMainLoopKey key)
{
// The main loop will only be unlocked when the app is ready and both side
// chain listeners are initialized.
unlockMainLoopKeys_[static_cast<std::size_t>(key)].store(true);
for (auto const& k : unlockMainLoopKeys_)
{
if (!k.load())
return;
}
JLOG(j_.trace()) << "Federator main loop unlocked";
// all keys set, unlock the loop
std::lock_guard<std::mutex> l(m_);
mainLoopLocked_ = false;
mainLoopCv_.notify_one();
@@ -1913,15 +1960,15 @@ Federator::mainLoop()
}
}
void
Federator::onEvent(event::StartOfHistoricTransactions const& e)
{
assert(0); // StartOfHistoricTransactions is only used in initial sync
}
void
Federator::onEvent(event::TicketCreateTrigger const& e)
{
JLOGV(
j_.trace(),
"Federator::onEvent",
jv("eventtype", "TicketCreateTrigger"),
jv("event", e.toJson()));
Federator::ChainType toChain = e.dir_ == event::Dir::mainToSide
? Federator::sideChain
: Federator::mainChain;
@@ -1932,6 +1979,12 @@ Federator::onEvent(event::TicketCreateTrigger const& e)
void
Federator::onEvent(const event::TicketCreateResult& e)
{
JLOGV(
j_.trace(),
"Federator::onEvent",
jv("eventtype", "TicketCreateResult"),
jv("event", e.toJson()));
auto const [fromChain, toChain] = e.dir_ == event::Dir::mainToSide
? std::make_pair(sideChain, mainChain)
: std::make_pair(mainChain, sideChain);
@@ -1952,6 +2005,12 @@ Federator::onEvent(const event::TicketCreateResult& e)
void
Federator::onEvent(event::DepositAuthResult const& e)
{
JLOGV(
j_.trace(),
"Federator::onEvent",
jv("eventtype", "DepositAuthResult"),
jv("event", e.toJson()));
auto const chainType =
(e.dir_ == event::Dir::mainToSide ? sideChain : mainChain);
@@ -1971,6 +2030,12 @@ Federator::onEvent(event::DepositAuthResult const& e)
void
Federator::onEvent(event::BootstrapTicket const& e)
{
JLOGV(
j_.trace(),
"Federator::onEvent",
jv("eventtype", "BootstrapTicket"),
jv("event", e.toJson()));
setAccountSeqMax(getChainType(e.isMainchain_), e.txnSeq_ + 1);
setLastTxnSeqSentMax(getChainType(e.isMainchain_), e.txnSeq_);
setLastTxnSeqConfirmedMax(getChainType(e.isMainchain_), e.txnSeq_);
@@ -1980,6 +2045,12 @@ Federator::onEvent(event::BootstrapTicket const& e)
void
Federator::onEvent(event::DisableMasterKeyResult const& e)
{
JLOGV(
j_.trace(),
"Federator::onEvent",
jv("eventtype", "DisableMasterKeyResult"),
jv("event", e.toJson()));
setAccountSeqMax(getChainType(e.isMainchain_), e.txnSeq_ + 1);
setLastTxnSeqSentMax(getChainType(e.isMainchain_), e.txnSeq_);
setLastTxnSeqConfirmedMax(getChainType(e.isMainchain_), e.txnSeq_);

View File

@@ -61,14 +61,16 @@ class Federator : public std::enable_shared_from_this<Federator>
{
public:
enum ChainType { sideChain, mainChain };
constexpr static size_t numChains = 2;
enum class UnlockMainLoopKey { app, mainChain, sideChain };
constexpr static size_t numUnlockMainLoopKeys = 3;
// These enums are encoded in the transaction. Changing the order will break
// backward compatibility. If a new type is added change txnTypeLast.
enum class TxnType { xChain, refund };
constexpr static std::uint8_t txnTypeLast = 2;
constexpr static size_t numChains = 2;
static constexpr std::uint32_t accountControlTxFee{1000};
private:
@@ -95,6 +97,10 @@ private:
std::array<std::atomic<std::uint32_t>, numChains> lastTxnSeqConfirmed_{
0,
0};
std::array<std::atomic<bool>, numUnlockMainLoopKeys> unlockMainLoopKeys_{
false,
false,
false};
std::shared_ptr<MainchainListener> mainchainListener_;
std::shared_ptr<SidechainListener> sidechainListener_;
@@ -226,7 +232,7 @@ public:
// Don't process any events until the bootstrap has a chance to run
void
unlockMainLoop() EXCLUDES(m_);
unlockMainLoop(UnlockMainLoopKey key) EXCLUDES(m_);
void
addPendingTxnSig(
@@ -286,6 +292,11 @@ public:
addTxToSend(ChainType chain, std::uint32_t seq, STTx const& tx)
EXCLUDES(toSendTxnsM_);
// Set the accountSeq to the max of the current value and the requested
// value. This is done with a lock free algorithm.
void
setAccountSeqMax(ChainType chaintype, std::uint32_t reqValue);
private:
// Two phase init needed for shared_from this.
// Only called from `make_Federator`
@@ -304,11 +315,6 @@ private:
[[nodiscard]] std::optional<STAmount>
toOtherChainAmount(ChainType srcChain, STAmount const& from) const;
// Set the accountSeq to the max of the current value and the requested
// value. This is done with a lock free algorithm.
void
setAccountSeqMax(ChainType chaintype, std::uint32_t reqValue);
// Set the lastTxnSeqSent to the max of the current value and the requested
// value. This is done with a lock free algorithm.
void
@@ -359,8 +365,6 @@ private:
void
onEvent(event::HeartbeatTimer const& e);
void
onEvent(event::StartOfHistoricTransactions const& e);
void
onEvent(event::TicketCreateTrigger const& e);
void
onEvent(event::TicketCreateResult const& e);

View File

@@ -155,21 +155,6 @@ RefundTransferResult::toJson() const
return result;
}
EventType
StartOfHistoricTransactions::eventType() const
{
return EventType::startOfTransactions;
}
Json::Value
StartOfHistoricTransactions::toJson() const
{
Json::Value result{Json::objectValue};
result["eventType"] = "StartOfHistoricTransactions";
result["isMainchain"] = isMainchain_;
return result;
}
EventType
TicketCreateTrigger::eventType() const
{

View File

@@ -49,7 +49,6 @@ enum class EventType {
result,
resultAndTrigger,
heartbeat,
startOfTransactions
};
// A cross chain transfer was detected on this federator
@@ -133,18 +132,6 @@ struct RefundTransferResult
toJson() const;
};
// The start of historic transactions has been reached
struct StartOfHistoricTransactions
{
bool isMainchain_;
EventType
eventType() const;
Json::Value
toJson() const;
};
struct TicketCreateTrigger
{
Dir dir_;
@@ -253,7 +240,6 @@ using FederatorEvent = std::variant<
event::HeartbeatTimer,
event::XChainTransferResult,
event::RefundTransferResult,
event::StartOfHistoricTransactions,
event::TicketCreateTrigger,
event::TicketCreateResult,
event::DepositAuthResult,

View File

@@ -70,17 +70,6 @@ ChainListener::chainName() const
}
namespace detail {
// consider making this available as a general utility
// Run a lambda on scope exit, unless the `reset` function is called.
template <class F>
[[nodiscard]] inline auto
make_scope(F f)
{
static int dummy = 0;
auto d = [f = std::move(f)](auto) { f(); };
return std::unique_ptr<int, decltype(d)>{&dummy, std::move(d)};
}
template <class T>
std::optional<T>
getMemoData(Json::Value const& v, std::uint32_t index) = delete;
@@ -215,51 +204,6 @@ ChainListener::processMessage(Json::Value const& msg)
return f.asString() == toMatch;
};
bool const isLastInHistory = [&msg] {
if (msg.isMember(jss::account_history_tx_last))
return msg[jss::account_history_tx_last].asBool();
return false;
}();
// no matter how this function exits, run this code. It handles the
// "lastInHistory" condition. It is difficult to see how to property
// annotate `make_scope` for clang's thread safety analysis, so it is
// skipped.
auto onExit = detail::make_scope([&]() NO_THREAD_SAFETY_ANALYSIS {
if (!isLastInHistory)
return;
if (initialSync_ && isLastInHistory)
{
event::StartOfHistoricTransactions e{isMainchain_};
auto const hasReplayed = initialSync_->onEvent(std::move(e));
if (hasReplayed)
initialSync_.reset();
JLOGV(
j_.trace(),
"sent start of historic transactions event",
jv("isMainchain", isMainchain_),
jv("hasReplayed", hasReplayed));
}
else
{
// Note this branch is needed since the other chain's
// "setNoLastXChainTxnWithResult" may reset the initialSync object.
if (auto f = federator_.lock())
{
// Inform the other sync object that the last transaction
// with a result was found. Note that if start of historic
// transactions is found while listening to the mainchain, the
// _sidechain_ listener needs to be informed that there is no
// last cross chain transaction with result.
Federator::ChainType const chainType =
getChainType(!isMainchain_);
f->setNoLastXChainTxnWithResult(chainType);
}
}
});
TER const txnTER = [&msg] {
return TER::fromInt(msg[jss::engine_result_code].asInt());
}();

View File

@@ -26,6 +26,7 @@
#include <ripple/json/json_writer.h>
#include <type_traits>
#include <variant>
namespace ripple {
namespace sidechain {
@@ -198,7 +199,7 @@ InitialSync::replay(std::lock_guard<std::mutex> const& l)
j_.trace(),
"InitialSync replay, remove trigger event from pendingEvents_",
jv("chain_name", (isMainchain_ ? "Mainchain" : "Sidechain")),
jv("txnHash", *txnHash));
jv("txn", toJson(i->second)));
if (*lastXChainTxnWithResult_ == *txnHash)
{
matched = true;
@@ -208,7 +209,7 @@ InitialSync::replay(std::lock_guard<std::mutex> const& l)
assert(matched);
if (matched)
{
for (auto i : toRemoveTrigger)
for (auto const& i : toRemoveTrigger)
{
if (auto ticketResult = std::get_if<event::TicketCreateResult>(
&(pendingEvents_.erase(i, i)->second));
@@ -217,21 +218,57 @@ InitialSync::replay(std::lock_guard<std::mutex> const& l)
ticketResult->removeTrigger();
}
}
for (auto i = toRemove.begin(), e = toRemove.end(); i != e; ++i)
for (auto const& i : toRemove)
{
pendingEvents_.erase(*i);
pendingEvents_.erase(i);
}
}
}
if (disableMasterKeySeq_)
{
// Remove trigger events that come beofre disableMasterKeySeq_
std::vector<decltype(pendingEvents_)::const_iterator> toRemove;
toRemove.reserve(pendingEvents_.size());
for (auto i = pendingEvents_.cbegin(), e = pendingEvents_.cend();
i != e;
++i)
{
if (std::holds_alternative<event::DisableMasterKeyResult>(
i->second))
break;
if (eventType(i->second) != event::EventType::trigger)
continue;
toRemove.push_back(i);
}
for (auto const& i : toRemove)
{
pendingEvents_.erase(i);
}
}
if (auto f = federator_.lock())
{
for (auto&& [_, e] : pendingEvents_)
{
JLOGV(
j_.trace(),
"InitialSync replay, pushing event",
jv("chain_name", (isMainchain_ ? "Mainchain" : "Sidechain")),
jv("txn", toJson(e)));
f->push(std::move(e));
}
}
seenTriggeringTxns_.clear();
pendingEvents_.clear();
if (auto f = federator_.lock())
{
auto const key = isMainchain_ ? Federator::UnlockMainLoopKey::mainChain
: Federator::UnlockMainLoopKey::sideChain;
f->unlockMainLoop(key);
}
done();
}
@@ -338,12 +375,6 @@ InitialSync::onEvent(event::DisableMasterKeyResult&& e)
{
std::lock_guard l{m_};
if (hasReplayed_)
{
assert(0);
return hasReplayed_;
}
JLOGV(
j_.trace(),
"InitialSync onDisableMasterKeyResultEvent",
@@ -351,6 +382,39 @@ InitialSync::onEvent(event::DisableMasterKeyResult&& e)
assert(!disableMasterKeySeq_);
disableMasterKeySeq_ = e.txnSeq_;
if (lastXChainTxnWithResult_)
LogicError("Initial sync could not find historic XChain transaction");
if (needsOtherChainLastXChainTxn_)
{
if (auto f = federator_.lock())
{
// Inform the other sync object that the last transaction
// with a result was found. Note that if start of historic
// transactions is found while listening to the mainchain, the
// _sidechain_ listener needs to be informed that there is no last
// cross chain transaction with result.
Federator::ChainType const chainType = getChainType(!isMainchain_);
f->setNoLastXChainTxnWithResult(chainType);
}
needsOtherChainLastXChainTxn_ = false;
}
if (hasReplayed_)
{
assert(0);
return hasReplayed_;
}
if (auto f = federator_.lock())
{
// Set the account sequence right away. Otherwise when replaying, a
// triggering transaction from the main chain can be replayed before the
// disable master key event on the side chain, and the sequence number
// will be wrong.
f->setAccountSeqMax(getChainType(e.isMainchain_), e.txnSeq_ + 1);
}
pendingEvents_[e.rpcOrder_] = std::move(e);
if (canReplay(l))
@@ -449,39 +513,6 @@ InitialSync::onEvent(event::RefundTransferResult&& e)
return hasReplayed_;
}
bool
InitialSync::onEvent(event::StartOfHistoricTransactions&& e)
{
std::lock_guard l{m_};
if (lastXChainTxnWithResult_)
LogicError("Initial sync could not find historic XChain transaction");
if (needsOtherChainLastXChainTxn_)
{
if (auto f = federator_.lock())
{
// Inform the other sync object that the last transaction
// with a result was found. Note that if start of historic
// transactions is found while listening to the mainchain, the
// _sidechain_ listener needs to be informed that there is no last
// cross chain transaction with result.
Federator::ChainType const chainType = getChainType(!isMainchain_);
f->setNoLastXChainTxnWithResult(chainType);
}
needsOtherChainLastXChainTxn_ = false;
}
acquiringHistoricData_ = false;
needsOtherChainLastXChainTxn_ = false;
if (canReplay(l))
{
replay(l);
}
return hasReplayed_;
}
namespace detail {
Json::Value

View File

@@ -167,8 +167,6 @@ public:
// Return `hasReplayed_`. This is used to determine if events should
// continue to be routed to this object. Once replayed, events can be
// processed normally.
[[nodiscard]] bool
onEvent(event::StartOfHistoricTransactions&& e) EXCLUDES(m_);
// Return `hasReplayed_`.
[[nodiscard]] bool
onEvent(event::TicketCreateTrigger&& e) EXCLUDES(m_);

View File

@@ -67,7 +67,10 @@ MainchainListener::onMessage(Json::Value const& msg)
if (callbackOpt)
{
JLOG(j_.trace()) << "Mainchain onMessage, reply to a callback: " << msg;
JLOGV(
j_.trace(),
"Mainchain onMessage, reply to a callback",
jv("msg", msg));
assert(msg.isMember(jss::result));
(*callbackOpt)(msg[jss::result]);
}
@@ -121,8 +124,8 @@ void
MainchainListener::stopHistoricalTxns()
{
Json::Value params;
params[jss::stop_history_tx_only] = true;
params[jss::account_history_tx_stream] = Json::objectValue;
params[jss::account_history_tx_stream][jss::stop_history_tx_only] = true;
params[jss::account_history_tx_stream][jss::account] = doorAccountStr_;
send("unsubscribe", params);
}

View File

@@ -22,6 +22,7 @@
#include <ripple/basics/Log.h>
#include <ripple/json/Output.h>
#include <ripple/json/json_reader.h>
#include <ripple/json/json_writer.h>
#include <ripple/json/to_string.h>
#include <ripple/protocol/jss.h>
#include <ripple/server/Port.h>
@@ -152,11 +153,12 @@ WebsocketClient::onReadMsg(error_code const& ec)
return;
}
Json::Value jv;
Json::Value jval;
Json::Reader jr;
jr.parse(buffer_string(rb_.data()), jv);
jr.parse(buffer_string(rb_.data()), jval);
rb_.consume(rb_.size());
callback_(jv);
JLOGV(j_.trace(), "WebsocketClient::onReadMsg", jv("msg", jval));
callback_(jval);
std::lock_guard l{m_};
ws_.async_read(

View File

@@ -124,6 +124,11 @@ parseGenericSeed(std::string const& str);
std::string
seedAs1751(Seed const& seed);
/** ripple-lib encodes seeds used to generate an Ed25519 wallet in a
* non-standard way. */
std::optional<Seed>
parseRippleLibSeed(std::string const& s);
/** Format a seed as a Base58 string */
inline std::string
toBase58(Seed const& seed)

View File

@@ -135,4 +135,17 @@ seedAs1751(Seed const& seed)
return encodedKey;
}
std::optional<Seed>
parseRippleLibSeed(std::string const& s)
{
auto const result = decodeBase58Token(s, TokenType::None);
if (result.size() == 18 &&
static_cast<std::uint8_t>(result[0]) == std::uint8_t(0xE1) &&
static_cast<std::uint8_t>(result[1]) == std::uint8_t(0x4B))
return Seed(makeSlice(result.substr(2)));
return std::nullopt;
}
} // namespace ripple

View File

@@ -683,14 +683,7 @@ parseRippleLibSeed(Json::Value const& value)
if (!value.isString())
return std::nullopt;
auto const result = decodeBase58Token(value.asString(), TokenType::None);
if (result.size() == 18 &&
static_cast<std::uint8_t>(result[0]) == std::uint8_t(0xE1) &&
static_cast<std::uint8_t>(result[1]) == std::uint8_t(0x4B))
return Seed(makeSlice(result.substr(2)));
return std::nullopt;
return ripple::parseRippleLibSeed(value.asString());
}
std::optional<Seed>