Support UNLs with future effective dates:

* Creates a version 2 of the UNL file format allowing publishers to
  pre-publish the next UNL while the current one is still valid.
* Version 1 of the UNL file format is still valid and backward
  compatible.
* Also causes rippled to lock down if it has no valid UNLs, similar to
  being amendment blocked, except reversible.
* Resolves #3548
* Resolves #3470
This commit is contained in:
Edward Hennis
2020-09-09 18:51:08 -04:00
parent 54da532ace
commit 4b9d3ca7de
31 changed files with 3980 additions and 932 deletions

View File

@@ -949,7 +949,7 @@ RCLConsensus::Adaptor::preStartRound(
// and are not amendment blocked.
validating_ = valPublic_.size() != 0 &&
prevLgr.seq() >= app_.getMaxDisallowedLedger() &&
!app_.getOPs().isAmendmentBlocked();
!app_.getOPs().isBlocked();
// If we are not running in standalone mode and there's a configured UNL,
// check to make sure that it's not expired.

View File

@@ -352,7 +352,7 @@ LedgerMaster::setValidLedger(std::shared_ptr<Ledger const> const& l)
app_.getSHAMapStore().onLedgerClosed(getValidatedLedger());
mLedgerHistory.validatedLedger(l, consensusHash);
app_.getAmendmentTable().doValidatedLedger(l);
if (!app_.getOPs().isAmendmentBlocked())
if (!app_.getOPs().isBlocked())
{
if (app_.getAmendmentTable().hasUnsupportedEnabled())
{

View File

@@ -2117,6 +2117,18 @@ ApplicationImp::serverOkay(std::string& reason)
return false;
}
if (getOPs().isAmendmentBlocked())
{
reason = "Server version too old";
return false;
}
if (getOPs().isUNLBlocked())
{
reason = "No valid validator list available";
return false;
}
if (getOPs().getOperatingMode() < OperatingMode::SYNCING)
{
reason = "Not synchronized with network";
@@ -2132,12 +2144,6 @@ ApplicationImp::serverOkay(std::string& reason)
return false;
}
if (getOPs().isAmendmentBlocked())
{
reason = "Server version too old";
return false;
}
return true;
}

View File

@@ -261,19 +261,13 @@ public:
public:
OperatingMode
getOperatingMode() const override
{
return mMode;
}
getOperatingMode() const override;
std::string
strOperatingMode(OperatingMode const mode, bool const admin) const override;
std::string
strOperatingMode(bool const admin = false) const override
{
return strOperatingMode(mMode, admin);
}
strOperatingMode(bool const admin = false) const override;
//
// Transaction operations.
@@ -391,10 +385,7 @@ public:
void
endConsensus() override;
void
setStandAlone() override
{
setMode(OperatingMode::FULL);
}
setStandAlone() override;
/** Called to initially start our timers.
Not called for stand-alone mode.
@@ -403,51 +394,35 @@ public:
setStateTimer() override;
void
setNeedNetworkLedger() override
{
needNetworkLedger_ = true;
}
setNeedNetworkLedger() override;
void
clearNeedNetworkLedger() override
{
needNetworkLedger_ = false;
}
clearNeedNetworkLedger() override;
bool
isNeedNetworkLedger() override
{
return needNetworkLedger_;
}
isNeedNetworkLedger() override;
bool
isFull() override
{
return !needNetworkLedger_ && (mMode == OperatingMode::FULL);
}
isFull() override;
void
setMode(OperatingMode om) override;
bool
isAmendmentBlocked() override
{
return amendmentBlocked_;
}
isBlocked() override;
bool
isAmendmentBlocked() override;
void
setAmendmentBlocked() override;
bool
isAmendmentWarned() override
{
return !amendmentBlocked_ && amendmentWarned_;
}
isAmendmentWarned() override;
void
setAmendmentWarned() override
{
amendmentWarned_ = true;
}
setAmendmentWarned() override;
void
clearAmendmentWarned() override
{
amendmentWarned_ = false;
}
clearAmendmentWarned() override;
bool
isUNLBlocked() override;
void
setUNLBlocked() override;
void
clearUNLBlocked() override;
void
consensusViewChange() override;
@@ -470,15 +445,9 @@ public:
reportConsensusStateChange(ConsensusPhase phase);
void
updateLocalTx(ReadView const& view) override
{
m_localTX->sweep(view);
}
updateLocalTx(ReadView const& view) override;
std::size_t
getLocalTxCount() override
{
return m_localTX->size();
}
getLocalTxCount() override;
// Helper function to generate SQL query to get transactions.
std::string
@@ -638,34 +607,7 @@ public:
// Stoppable.
void
onStop() override
{
mAcquiringLedger.reset();
{
boost::system::error_code ec;
heartbeatTimer_.cancel(ec);
if (ec)
{
JLOG(m_journal.error())
<< "NetworkOPs: heartbeatTimer cancel error: "
<< ec.message();
}
ec.clear();
clusterTimer_.cancel(ec);
if (ec)
{
JLOG(m_journal.error())
<< "NetworkOPs: clusterTimer cancel error: "
<< ec.message();
}
}
// Make sure that any waitHandlers pending in our timers are done
// before we declare ourselves stopped.
using namespace std::chrono_literals;
waitHandlerCounter_.join("NetworkOPs", 1s, m_journal);
stopped();
}
onStop() override;
private:
void
@@ -720,6 +662,7 @@ private:
std::atomic<bool> needNetworkLedger_{false};
std::atomic<bool> amendmentBlocked_{false};
std::atomic<bool> amendmentWarned_{false};
std::atomic<bool> unlBlocked_{false};
ClosureCounter<void, boost::system::error_code const&> waitHandlerCounter_;
boost::asio::steady_timer heartbeatTimer_;
@@ -827,47 +770,7 @@ private:
private:
void
collect_metrics()
{
auto [counters, mode, start] = accounting_.getCounterData();
auto const current =
std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now() - start);
counters[static_cast<std::size_t>(mode)].dur += current;
std::lock_guard lock(m_statsMutex);
m_stats.disconnected_duration.set(
counters[static_cast<std::size_t>(OperatingMode::DISCONNECTED)]
.dur.count());
m_stats.connected_duration.set(
counters[static_cast<std::size_t>(OperatingMode::CONNECTED)]
.dur.count());
m_stats.syncing_duration.set(
counters[static_cast<std::size_t>(OperatingMode::SYNCING)]
.dur.count());
m_stats.tracking_duration.set(
counters[static_cast<std::size_t>(OperatingMode::TRACKING)]
.dur.count());
m_stats.full_duration.set(
counters[static_cast<std::size_t>(OperatingMode::FULL)]
.dur.count());
m_stats.disconnected_transitions.set(
counters[static_cast<std::size_t>(OperatingMode::DISCONNECTED)]
.transitions);
m_stats.connected_transitions.set(
counters[static_cast<std::size_t>(OperatingMode::CONNECTED)]
.transitions);
m_stats.syncing_transitions.set(
counters[static_cast<std::size_t>(OperatingMode::SYNCING)]
.transitions);
m_stats.tracking_transitions.set(
counters[static_cast<std::size_t>(OperatingMode::TRACKING)]
.transitions);
m_stats.full_transitions.set(
counters[static_cast<std::size_t>(OperatingMode::FULL)]
.transitions);
}
collect_metrics();
};
//------------------------------------------------------------------------------
@@ -886,6 +789,48 @@ std::array<Json::StaticString const, 5> const
Json::StaticString(stateNames[4])}};
//------------------------------------------------------------------------------
inline OperatingMode
NetworkOPsImp::getOperatingMode() const
{
return mMode;
}
inline std::string
NetworkOPsImp::strOperatingMode(bool const admin /* = false */) const
{
return strOperatingMode(mMode, admin);
}
inline void
NetworkOPsImp::setStandAlone()
{
setMode(OperatingMode::FULL);
}
inline void
NetworkOPsImp::setNeedNetworkLedger()
{
needNetworkLedger_ = true;
}
inline void
NetworkOPsImp::clearNeedNetworkLedger()
{
needNetworkLedger_ = false;
}
inline bool
NetworkOPsImp::isNeedNetworkLedger()
{
return needNetworkLedger_;
}
inline bool
NetworkOPsImp::isFull()
{
return !needNetworkLedger_ && (mMode == OperatingMode::FULL);
}
std::string
NetworkOPsImp::getHostId(bool forAdmin)
{
@@ -1554,11 +1499,60 @@ NetworkOPsImp::getOwnerInfo(
// Other
//
inline bool
NetworkOPsImp::isBlocked()
{
return isAmendmentBlocked() || isUNLBlocked();
}
inline bool
NetworkOPsImp::isAmendmentBlocked()
{
return amendmentBlocked_;
}
void
NetworkOPsImp::setAmendmentBlocked()
{
amendmentBlocked_ = true;
setMode(OperatingMode::TRACKING);
setMode(OperatingMode::CONNECTED);
}
inline bool
NetworkOPsImp::isAmendmentWarned()
{
return !amendmentBlocked_ && amendmentWarned_;
}
inline void
NetworkOPsImp::setAmendmentWarned()
{
amendmentWarned_ = true;
}
inline void
NetworkOPsImp::clearAmendmentWarned()
{
amendmentWarned_ = false;
}
inline bool
NetworkOPsImp::isUNLBlocked()
{
return unlBlocked_;
}
void
NetworkOPsImp::setUNLBlocked()
{
unlBlocked_ = true;
setMode(OperatingMode::CONNECTED);
}
inline void
NetworkOPsImp::clearUNLBlocked()
{
unlBlocked_ = false;
}
bool
@@ -1752,7 +1746,11 @@ NetworkOPsImp::beginConsensus(uint256 const& networkClosed)
if (prevLedger->rules().enabled(featureNegativeUNL))
app_.validators().setNegativeUNL(prevLedger->negativeUNL());
TrustChanges const changes = app_.validators().updateTrusted(
app_.getValidations().getCurrentNodeIDs());
app_.getValidations().getCurrentNodeIDs(),
closingInfo.parentCloseTime,
*this,
app_.overlay(),
app_.getHashRouter());
if (!changes.added.empty() || !changes.removed.empty())
app_.getValidations().trustChanged(changes.added, changes.removed);
@@ -2154,8 +2152,8 @@ NetworkOPsImp::setMode(OperatingMode om)
om = OperatingMode::CONNECTED;
}
if ((om > OperatingMode::TRACKING) && amendmentBlocked_)
om = OperatingMode::TRACKING;
if ((om > OperatingMode::CONNECTED) && isBlocked())
om = OperatingMode::CONNECTED;
if (mMode == om)
return;
@@ -2505,6 +2503,15 @@ NetworkOPsImp::getServerInfo(bool human, bool admin, bool counters)
"This server is amendment blocked, and must be updated to be "
"able to stay in sync with the network.";
}
if (isUNLBlocked())
{
Json::Value& w = warnings.append(Json::objectValue);
w[jss::id] = warnRPC_EXPIRED_VALIDATOR_LIST;
w[jss::message] =
"This server has an expired validator list. validators.txt "
"may be incorrectly configured or some [validator_list_sites] "
"may be unreachable.";
}
if (admin && isAmendmentWarned())
{
Json::Value& w = warnings.append(Json::objectValue);
@@ -2948,6 +2955,17 @@ NetworkOPsImp::reportConsensusStateChange(ConsensusPhase phase)
[this, phase](Job&) { pubConsensus(phase); });
}
inline void
NetworkOPsImp::updateLocalTx(ReadView const& view)
{
m_localTX->sweep(view);
}
inline std::size_t
NetworkOPsImp::getLocalTxCount()
{
return m_localTX->size();
}
// This routine should only be used to publish accepted or validated
// transactions.
Json::Value
@@ -3498,6 +3516,34 @@ NetworkOPsImp::tryRemoveRpcSub(std::string const& strUrl)
return true;
}
void
NetworkOPsImp::onStop()
{
mAcquiringLedger.reset();
{
boost::system::error_code ec;
heartbeatTimer_.cancel(ec);
if (ec)
{
JLOG(m_journal.error())
<< "NetworkOPs: heartbeatTimer cancel error: " << ec.message();
}
ec.clear();
clusterTimer_.cancel(ec);
if (ec)
{
JLOG(m_journal.error())
<< "NetworkOPs: clusterTimer cancel error: " << ec.message();
}
}
// Make sure that any waitHandlers pending in our timers are done
// before we declare ourselves stopped.
using namespace std::chrono_literals;
waitHandlerCounter_.join("NetworkOPs", 1s, m_journal);
stopped();
}
#ifndef USE_NEW_BOOK_PAGE
// NIKB FIXME this should be looked at. There's no reason why this shouldn't
@@ -3855,6 +3901,44 @@ NetworkOPsImp::getBookPage(
#endif
inline void
NetworkOPsImp::collect_metrics()
{
auto [counters, mode, start] = accounting_.getCounterData();
auto const current = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now() - start);
counters[static_cast<std::size_t>(mode)].dur += current;
std::lock_guard lock(m_statsMutex);
m_stats.disconnected_duration.set(
counters[static_cast<std::size_t>(OperatingMode::DISCONNECTED)]
.dur.count());
m_stats.connected_duration.set(
counters[static_cast<std::size_t>(OperatingMode::CONNECTED)]
.dur.count());
m_stats.syncing_duration.set(
counters[static_cast<std::size_t>(OperatingMode::SYNCING)].dur.count());
m_stats.tracking_duration.set(
counters[static_cast<std::size_t>(OperatingMode::TRACKING)]
.dur.count());
m_stats.full_duration.set(
counters[static_cast<std::size_t>(OperatingMode::FULL)].dur.count());
m_stats.disconnected_transitions.set(
counters[static_cast<std::size_t>(OperatingMode::DISCONNECTED)]
.transitions);
m_stats.connected_transitions.set(
counters[static_cast<std::size_t>(OperatingMode::CONNECTED)]
.transitions);
m_stats.syncing_transitions.set(
counters[static_cast<std::size_t>(OperatingMode::SYNCING)].transitions);
m_stats.tracking_transitions.set(
counters[static_cast<std::size_t>(OperatingMode::TRACKING)]
.transitions);
m_stats.full_transitions.set(
counters[static_cast<std::size_t>(OperatingMode::FULL)].transitions);
}
//------------------------------------------------------------------------------
NetworkOPs::NetworkOPs(Stoppable& parent)

View File

@@ -201,6 +201,8 @@ public:
virtual void
setMode(OperatingMode om) = 0;
virtual bool
isBlocked() = 0;
virtual bool
isAmendmentBlocked() = 0;
virtual void
setAmendmentBlocked() = 0;
@@ -210,6 +212,12 @@ public:
setAmendmentWarned() = 0;
virtual void
clearAmendmentWarned() = 0;
virtual bool
isUNLBlocked() = 0;
virtual void
setUNLBlocked() = 0;
virtual void
clearUNLBlocked() = 0;
virtual void
consensusViewChange() = 0;

View File

@@ -26,6 +26,7 @@
#include <ripple/core/TimeKeeper.h>
#include <ripple/crypto/csprng.h>
#include <ripple/json/json_value.h>
#include <ripple/overlay/Message.h>
#include <ripple/protocol/PublicKey.h>
#include <boost/iterator/counting_iterator.hpp>
#include <boost/range/adaptors.hpp>
@@ -34,33 +35,70 @@
#include <numeric>
#include <shared_mutex>
namespace protocol {
class TMValidatorList;
class TMValidatorListCollection;
} // namespace protocol
namespace ripple {
// predeclaration
class Overlay;
class HashRouter;
class Message;
class NetworkOPs;
class Peer;
class STValidation;
/* Entries in this enum are ordered by "desirability".
The "better" dispositions have lower values than the
"worse" dispositions */
enum class ListDisposition {
/// List is valid
accepted = 0,
/// List is expired, but has the largest non-pending sequence seen so far
expired,
/// List will be valid in the future
pending,
/// Same sequence as current list
same_sequence,
/// List version is not supported
unsupported_version,
/// List signed by untrusted publisher key
untrusted,
/// Future sequence already seen
known_sequence,
/// Trusted publisher key, but seq is too old
stale,
/// List signed by untrusted publisher key
untrusted,
/// List version is not supported
unsupported_version,
/// Invalid format or signature
invalid
};
/* Entries in this enum are ordered by "desirability".
The "better" dispositions have lower values than the
"worse" dispositions */
enum class PublisherStatus {
// Publisher has provided a valid file
available = 0,
// Current list is expired without replacement
expired,
// No file seen yet
unavailable,
// Publisher has revoked their manifest key
revoked,
};
std::string
to_string(ListDisposition disposition);
@@ -74,6 +112,18 @@ struct TrustChanges
hash_set<NodeID> removed;
};
/** Used to represent the information stored in the blobs_v2 Json array */
struct ValidatorBlobInfo
{
// base-64 encoded JSON containing the validator list.
std::string blob;
// hex-encoded signature of the blob using the publisher's signing key
std::string signature;
// base-64 or hex-encoded manifest containing the publisher's master and
// signing public keys
boost::optional<std::string> manifest;
};
/**
Trusted Validators List
-----------------------
@@ -90,12 +140,14 @@ struct TrustChanges
New lists are expected to include the following data:
@li @c "blob": Base64-encoded JSON string containing a @c "sequence", @c
"expiration", and @c "validators" field. @c "expiration" contains the
Ripple timestamp (seconds since January 1st, 2000 (00:00 UTC)) for when
the list expires. @c "validators" contains an array of objects with a
@c "validation_public_key" and optional @c "manifest" field.
@c "validation_public_key" should be the hex-encoded master public key.
@c "manifest" should be the base64-encoded validator manifest.
"validFrom", @c "validUntil", and @c "validators" field. @c "validFrom"
contains the Ripple timestamp (seconds since January 1st, 2000 (00:00
UTC)) for when the list becomes valid. @c "validUntil" contains the
Ripple timestamp for when the list expires. @c "validators" contains
an array of objects with a @c "validation_public_key" and optional
@c "manifest" field. @c "validation_public_key" should be the
hex-encoded master public key. @c "manifest" should be the
base64-encoded validator manifest.
@li @c "manifest": Base64-encoded serialization of a manifest containing the
publisher's master and signing public keys.
@@ -123,32 +175,66 @@ class ValidatorList
{
explicit PublisherList() = default;
bool available;
std::vector<PublicKey> list;
std::vector<std::string> manifests;
std::size_t sequence;
TimeKeeper::time_point expiration;
TimeKeeper::time_point validFrom;
TimeKeeper::time_point validUntil;
std::string siteUri;
std::string rawManifest;
// base-64 encoded JSON containing the validator list.
std::string rawBlob;
// hex-encoded signature of the blob using the publisher's signing key
std::string rawSignature;
std::uint32_t rawVersion;
// base-64 or hex-encoded manifest containing the publisher's master and
// signing public keys
boost::optional<std::string> rawManifest;
uint256 hash;
};
struct PublisherListCollection
{
PublisherStatus status;
/*
The `current` VL is the one which
1. Has the largest sequence number that
2. Has ever been effective (the effective date is absent or in the
past).
If this VL has expired, all VLs with previous sequence numbers
will also be considered expired, and thus there will be no valid VL
until one with a larger sequence number becomes effective. This is to
prevent allowing old VLs to reactivate.
*/
PublisherList current;
/*
The `remaining` list holds any relevant VLs which have a larger sequence
number than current. By definition they will all have an effective date
in the future. Relevancy will be determined by sorting the VLs by
sequence number, then iterating over the list and removing any VLs for
which the following VL (ignoring gaps) has the same or earlier effective
date.
*/
std::map<std::size_t, PublisherList> remaining;
boost::optional<std::size_t> maxSequence;
// The hash of the full set if sent in a single message
uint256 fullHash;
std::string rawManifest;
std::uint32_t rawVersion = 0;
};
ManifestCache& validatorManifests_;
ManifestCache& publisherManifests_;
TimeKeeper& timeKeeper_;
boost::filesystem::path const dataPath_;
beast::Journal const j_;
boost::shared_mutex mutable mutex_;
using unique_lock = std::unique_lock<boost::shared_mutex>;
using shared_lock = std::shared_lock<boost::shared_mutex>;
using lock_guard = std::lock_guard<decltype(mutex_)>;
using shared_lock = std::shared_lock<decltype(mutex_)>;
std::atomic<std::size_t> quorum_;
boost::optional<std::size_t> minimumQuorum_;
// Published lists stored by publisher master public key
hash_map<PublicKey, PublisherList> publisherLists_;
hash_map<PublicKey, PublisherListCollection> publisherLists_;
// Listed master public keys with the number of lists they appear on
hash_map<PublicKey, std::size_t> keyListings_;
@@ -166,8 +252,12 @@ class ValidatorList
// The master public keys of the current negative UNL
hash_set<PublicKey> negativeUNL_;
// Currently supported version of publisher list format
static constexpr std::uint32_t requiredListVersion = 1;
// Currently supported versions of publisher list format
static constexpr std::uint32_t supportedListVersions[]{1, 2};
// In the initial release, to prevent potential abuse and attacks, any VL
// collection with more than 5 entries will be considered malformed.
static constexpr std::size_t maxSupportedBlobs = 5;
// Prefix of the file name used to store cache files.
static const std::string filePrefix_;
public:
@@ -187,35 +277,51 @@ public:
*/
struct PublisherListStats
{
explicit PublisherListStats(ListDisposition d) : disposition(d)
{
}
explicit PublisherListStats() = default;
explicit PublisherListStats(ListDisposition d);
PublisherListStats(
ListDisposition d,
PublicKey key,
bool avail,
std::size_t seq)
: disposition(d), publisherKey(key), available(avail), sequence(seq)
{
}
PublisherStatus stat,
std::size_t seq);
ListDisposition disposition;
ListDisposition
bestDisposition() const;
ListDisposition
worstDisposition() const;
void
mergeDispositions(PublisherListStats const& src);
// Tracks the dispositions of each processed list and how many times it
// occurred
std::map<ListDisposition, std::size_t> dispositions;
boost::optional<PublicKey> publisherKey;
bool available = false;
boost::optional<std::size_t> sequence;
PublisherStatus status = PublisherStatus::unavailable;
std::size_t sequence = 0;
};
struct MessageWithHash
{
explicit MessageWithHash() = default;
explicit MessageWithHash(
std::shared_ptr<Message> const& message_,
uint256 hash_,
std::size_t num_);
std::shared_ptr<Message> message;
uint256 hash;
std::size_t numVLs = 0;
};
/** Load configured trusted keys.
@param localSigningKey This node's validation public key
@param configKeys List of trusted keys from config. Each entry consists
of a base58 encoded validation public key, optionally followed by a
comment.
@param configKeys List of trusted keys from config. Each entry
consists of a base58 encoded validation public key, optionally followed
by a comment.
@param publisherKeys List of trusted publisher public keys. Each entry
contains a base58 encoded account public key.
@param publisherKeys List of trusted publisher public keys. Each
entry contains a base58 encoded account public key.
@par Thread Safety
@@ -229,17 +335,53 @@ public:
std::vector<std::string> const& configKeys,
std::vector<std::string> const& publisherKeys);
/** Apply published list of public keys, then broadcast it to all
/** Pull the blob/signature/manifest information out of the appropriate Json
body fields depending on the version.
@return An empty vector indicates malformed Json.
*/
static std::vector<ValidatorBlobInfo>
parseBlobs(std::uint32_t version, Json::Value const& body);
static std::vector<ValidatorBlobInfo>
parseBlobs(protocol::TMValidatorList const& body);
static std::vector<ValidatorBlobInfo>
parseBlobs(protocol::TMValidatorListCollection const& body);
static void
sendValidatorList(
Peer& peer,
std::uint64_t peerSequence,
PublicKey const& publisherKey,
std::size_t maxSequence,
std::uint32_t rawVersion,
std::string const& rawManifest,
std::map<std::size_t, ValidatorBlobInfo> const& blobInfos,
HashRouter& hashRouter,
beast::Journal j);
[[nodiscard]] static std::pair<std::size_t, std::size_t>
buildValidatorListMessages(
std::size_t messageVersion,
std::uint64_t peerSequence,
std::size_t maxSequence,
std::uint32_t rawVersion,
std::string const& rawManifest,
std::map<std::size_t, ValidatorBlobInfo> const& blobInfos,
std::vector<MessageWithHash>& messages,
std::size_t maxSize = maximiumMessageSize);
/** Apply multiple published lists of public keys, then broadcast it to all
peers that have not seen it or sent it.
@param manifest base64-encoded publisher key manifest
@param blob base64-encoded json containing published validator list
@param signature Signature of the decoded blob
@param version Version of published list format
@param blobs Vector of BlobInfos representing one or more encoded
validator lists and signatures (and optional manifests)
@param siteUri Uri of the site from which the list was validated
@param hash Hash of the data parameters
@@ -249,6 +391,9 @@ public:
@param hashRouter HashRouter object which will determine which
peers not to send to
@param networkOPs NetworkOPs object which will be informed if there
is a valid VL
@return `ListDisposition::accepted`, plus some of the publisher
information, if list was successfully applied
@@ -257,30 +402,28 @@ public:
May be called concurrently
*/
PublisherListStats
applyListAndBroadcast(
applyListsAndBroadcast(
std::string const& manifest,
std::string const& blob,
std::string const& signature,
std::uint32_t version,
std::vector<ValidatorBlobInfo> const& blobs,
std::string siteUri,
uint256 const& hash,
Overlay& overlay,
HashRouter& hashRouter);
HashRouter& hashRouter,
NetworkOPs& networkOPs);
/** Apply published list of public keys
/** Apply multiple published lists of public keys.
@param manifest base64-encoded publisher key manifest
@param blob base64-encoded json containing published validator list
@param signature Signature of the decoded blob
@param version Version of published list format
@param blobs Vector of BlobInfos representing one or more encoded
validator lists and signatures (and optional manifests)
@param siteUri Uri of the site from which the list was validated
@param hash Optional hash of the data parameters.
Defaults to uninitialized
@param hash Optional hash of the data parameters
@return `ListDisposition::accepted`, plus some of the publisher
information, if list was successfully applied
@@ -290,11 +433,10 @@ public:
May be called concurrently
*/
PublisherListStats
applyList(
applyLists(
std::string const& manifest,
std::string const& blob,
std::string const& signature,
std::uint32_t version,
std::vector<ValidatorBlobInfo> const& blobs,
std::string siteUri,
boost::optional<uint256> const& hash = {});
@@ -326,7 +468,12 @@ public:
May be called concurrently
*/
TrustChanges
updateTrusted(hash_set<NodeID> const& seenValidators);
updateTrusted(
hash_set<NodeID> const& seenValidators,
NetClock::time_point closeTime,
NetworkOPs& ops,
Overlay& overlay,
HashRouter& hashRouter);
/** Get quorum value for current trusted key set
@@ -461,20 +608,22 @@ public:
May be called concurrently
*/
void
for_each_available(std::function<void(
for_each_available(
std::function<void(
std::string const& manifest,
std::string const& blob,
std::string const& signature,
std::uint32_t version,
std::map<std::size_t, ValidatorBlobInfo> const& blobInfos,
PublicKey const& pubKey,
std::size_t sequence,
std::size_t maxSequence,
uint256 const& hash)> func) const;
/** Returns the current valid list for the given publisher key,
if available, as a Json object.
*/
boost::optional<Json::Value>
getAvailable(boost::beast::string_view const& pubKey);
getAvailable(
boost::beast::string_view const& pubKey,
boost::optional<std::uint32_t> forceVersion = {});
/** Return the number of configured validator list sites. */
std::size_t
@@ -483,7 +632,7 @@ public:
/** Return the time when the validator list will expire
@note This may be a time in the past if a published list has not
been updated since its expiration. It will be boost::none if any
been updated since its validUntil. It will be boost::none if any
configured published list has not been fetched.
@par Thread Safety
@@ -584,18 +733,113 @@ private:
boost::optional<TimeKeeper::time_point>
expires(shared_lock const&) const;
/** Apply published list of public keys
@param manifest base64-encoded publisher key manifest
@param blob base64-encoded json containing published validator list
@param signature Signature of the decoded blob
@param version Version of published list format
@param siteUri Uri of the site from which the list was validated
@param hash Optional hash of the data parameters.
Defaults to uninitialized
@return `ListDisposition::accepted`, plus some of the publisher
information, if list was successfully applied
@par Thread Safety
May be called concurrently
*/
PublisherListStats
applyList(
std::string const& globalManifest,
boost::optional<std::string> const& localManifest,
std::string const& blob,
std::string const& signature,
std::uint32_t version,
std::string siteUri,
boost::optional<uint256> const& hash,
lock_guard const&);
void
updatePublisherList(
PublicKey const& pubKey,
PublisherList const& current,
std::vector<PublicKey> const& oldList,
lock_guard const&);
static void
buildBlobInfos(
std::map<std::size_t, ValidatorBlobInfo>& blobInfos,
PublisherListCollection const& lists);
static std::map<std::size_t, ValidatorBlobInfo>
buildBlobInfos(PublisherListCollection const& lists);
static void
broadcastBlobs(
PublicKey const& publisherKey,
PublisherListCollection const& lists,
std::size_t maxSequence,
uint256 const& hash,
Overlay& overlay,
HashRouter& hashRouter,
beast::Journal j);
static void
sendValidatorList(
Peer& peer,
std::uint64_t peerSequence,
PublicKey const& publisherKey,
std::size_t maxSequence,
std::uint32_t rawVersion,
std::string const& rawManifest,
std::map<std::size_t, ValidatorBlobInfo> const& blobInfos,
std::vector<MessageWithHash>& messages,
HashRouter& hashRouter,
beast::Journal j);
/** Get the filename used for caching UNLs
*/
boost::filesystem::path
GetCacheFileName(unique_lock const&, PublicKey const& pubKey);
getCacheFileName(lock_guard const&, PublicKey const& pubKey) const;
/** Build a Json representation of the collection, suitable for
writing to a cache file, or serving to a /vl/ query
*/
static Json::Value
buildFileData(
std::string const& pubKey,
PublisherListCollection const& pubCollection,
beast::Journal j);
/** Build a Json representation of the collection, suitable for
writing to a cache file, or serving to a /vl/ query
*/
static Json::Value
buildFileData(
std::string const& pubKey,
PublisherListCollection const& pubCollection,
boost::optional<std::uint32_t> forceVersion,
beast::Journal j);
template <class Hasher>
friend void
hash_append(Hasher& h, PublisherListCollection pl)
{
using beast::hash_append;
hash_append(h, pl.rawManifest, buildBlobInfos(pl), pl.rawVersion);
}
/** Write a JSON UNL to a cache file
*/
void
CacheValidatorFile(
unique_lock const& lock,
PublicKey const& pubKey,
PublisherList const& publisher);
cacheValidatorFile(lock_guard const& lock, PublicKey const& pubKey) const;
/** Check response for trusted valid published list
@@ -607,7 +851,7 @@ private:
*/
ListDisposition
verify(
unique_lock const&,
lock_guard const&,
Json::Value& list,
PublicKey& pubKey,
std::string const& manifest,
@@ -625,7 +869,10 @@ private:
Calling public member function is expected to lock mutex
*/
bool
removePublisherList(unique_lock const&, PublicKey const& publisherKey);
removePublisherList(
lock_guard const&,
PublicKey const& publisherKey,
PublisherStatus reason);
/** Return quorum for trusted validator set
@@ -643,6 +890,63 @@ private:
std::size_t effectiveUnlSize,
std::size_t seenSize);
};
// hashing helpers
template <class Hasher>
void
hash_append(Hasher& h, ValidatorBlobInfo const& blobInfo)
{
using beast::hash_append;
hash_append(h, blobInfo.blob, blobInfo.signature);
if (blobInfo.manifest)
{
hash_append(h, *blobInfo.manifest);
}
}
template <class Hasher>
void
hash_append(Hasher& h, std::vector<ValidatorBlobInfo> const& blobs)
{
for (auto const& item : blobs)
hash_append(h, item);
}
template <class Hasher>
void
hash_append(Hasher& h, std::map<std::size_t, ValidatorBlobInfo> const& blobs)
{
for (auto const& [_, item] : blobs)
{
(void)_;
hash_append(h, item);
}
}
} // namespace ripple
namespace protocol {
template <class Hasher>
void
hash_append(Hasher& h, TMValidatorList const& msg)
{
using beast::hash_append;
hash_append(h, msg.manifest(), msg.blob(), msg.signature(), msg.version());
}
template <class Hasher>
void
hash_append(Hasher& h, TMValidatorListCollection const& msg)
{
using beast::hash_append;
hash_append(
h,
msg.manifest(),
ripple::ValidatorList::parseBlobs(msg),
msg.version());
}
} // namespace protocol
#endif

View File

@@ -48,7 +48,7 @@ namespace ripple {
fields:
@li @c "blob": Base64-encoded JSON string containing a @c "sequence", @c
"expiration", and @c "validators" field. @c "expiration" contains the
"validUntil", and @c "validators" field. @c "validUntil" contains the
Ripple timestamp (seconds since January 1st, 2000 (00:00 UTC)) for when
the list expires. @c "validators" contains an array of objects with a
@c "validation_public_key" and optional @c "manifest" field.

File diff suppressed because it is too large Load Diff

View File

@@ -355,6 +355,7 @@ ValidatorSite::parseJsonResponse(
std::size_t siteIdx,
std::lock_guard<std::mutex>& sites_lock)
{
Json::Value const body = [&res, siteIdx, this]() {
Json::Reader r;
Json::Value body;
if (!r.parse(res.data(), body))
@@ -363,70 +364,107 @@ ValidatorSite::parseJsonResponse(
<< sites_[siteIdx].activeResource->uri;
throw std::runtime_error{"bad json"};
}
return body;
}();
if (!body.isObject() || !body.isMember("blob") ||
!body["blob"].isString() || !body.isMember("manifest") ||
!body["manifest"].isString() || !body.isMember("signature") ||
!body["signature"].isString() || !body.isMember("version") ||
!body["version"].isInt())
auto const [valid, version, blobs] = [&body]() {
// Check the easy fields first
bool valid = body.isObject() && body.isMember(jss::manifest) &&
body[jss::manifest].isString() && body.isMember(jss::version) &&
body[jss::version].isInt();
// Check the version-specific blob & signature fields
std::uint32_t version;
std::vector<ValidatorBlobInfo> blobs;
if (valid)
{
version = body[jss::version].asUInt();
blobs = ValidatorList::parseBlobs(version, body);
valid = !blobs.empty();
}
return std::make_tuple(valid, version, blobs);
}();
if (!valid)
{
JLOG(j_.warn()) << "Missing fields in JSON response from "
<< sites_[siteIdx].activeResource->uri;
throw std::runtime_error{"missing fields"};
}
auto const manifest = body["manifest"].asString();
auto const blob = body["blob"].asString();
auto const signature = body["signature"].asString();
auto const version = body["version"].asUInt();
auto const manifest = body[jss::manifest].asString();
assert(version == body[jss::version].asUInt());
auto const& uri = sites_[siteIdx].activeResource->uri;
auto const hash = sha512Half(manifest, blob, signature, version);
auto const applyResult = app_.validators().applyListAndBroadcast(
auto const hash = sha512Half(manifest, blobs, version);
auto const applyResult = app_.validators().applyListsAndBroadcast(
manifest,
blob,
signature,
version,
blobs,
uri,
hash,
app_.overlay(),
app_.getHashRouter());
auto const disp = applyResult.disposition;
app_.getHashRouter(),
app_.getOPs());
sites_[siteIdx].lastRefreshStatus.emplace(
Site::Status{clock_type::now(), disp, ""});
Site::Status{clock_type::now(), applyResult.bestDisposition(), ""});
for (auto const [disp, count] : applyResult.dispositions)
{
switch (disp)
{
case ListDisposition::accepted:
JLOG(j_.debug()) << "Applied new validator list from " << uri;
JLOG(j_.debug()) << "Applied " << count
<< " new validator list(s) from " << uri;
break;
case ListDisposition::expired:
JLOG(j_.debug()) << "Applied " << count
<< " expired validator list(s) from " << uri;
break;
case ListDisposition::same_sequence:
JLOG(j_.debug())
<< "Validator list with current sequence from " << uri;
<< "Ignored " << count
<< " validator list(s) with current sequence from " << uri;
break;
case ListDisposition::pending:
JLOG(j_.debug()) << "Processed " << count
<< " future validator list(s) from " << uri;
break;
case ListDisposition::known_sequence:
JLOG(j_.debug())
<< "Ignored " << count
<< " validator list(s) with future known sequence from "
<< uri;
break;
case ListDisposition::stale:
JLOG(j_.warn()) << "Stale validator list from " << uri;
JLOG(j_.warn()) << "Ignored " << count
<< "stale validator list(s) from " << uri;
break;
case ListDisposition::untrusted:
JLOG(j_.warn()) << "Untrusted validator list from " << uri;
JLOG(j_.warn()) << "Ignored " << count
<< " untrusted validator list(s) from " << uri;
break;
case ListDisposition::invalid:
JLOG(j_.warn()) << "Invalid validator list from " << uri;
JLOG(j_.warn()) << "Ignored " << count
<< " invalid validator list(s) from " << uri;
break;
case ListDisposition::unsupported_version:
JLOG(j_.warn())
<< "Unsupported version validator list from " << uri;
<< "Ignored " << count
<< " unsupported version validator list(s) from " << uri;
break;
default:
BOOST_ASSERT(false);
}
}
if (body.isMember("refresh_interval") &&
body["refresh_interval"].isNumeric())
if (body.isMember(jss::refresh_interval) &&
body[jss::refresh_interval].isNumeric())
{
using namespace std::chrono_literals;
std::chrono::minutes const refresh = boost::algorithm::clamp(
std::chrono::minutes{body["refresh_interval"].asUInt()}, 1min, 24h);
std::chrono::minutes{body[jss::refresh_interval].asUInt()},
1min,
24h);
sites_[siteIdx].refreshInterval = refresh;
sites_[siteIdx].nextRefresh =
clock_type::now() + sites_[siteIdx].refreshInterval;

View File

@@ -20,6 +20,7 @@
#ifndef RIPPLE_OVERLAY_MESSAGE_H_INCLUDED
#define RIPPLE_OVERLAY_MESSAGE_H_INCLUDED
#include <ripple/basics/ByteUtilities.h>
#include <ripple/overlay/Compression.h>
#include <ripple/protocol/PublicKey.h>
#include <ripple/protocol/messages.h>
@@ -34,6 +35,8 @@
namespace ripple {
constexpr std::size_t maximiumMessageSize = megabytes(64);
// VFALCO NOTE If we forward declare Message and write out shared_ptr
// instead of using the in-class type alias, we can remove the
// entire ripple.pb.h from the main headers.
@@ -68,6 +71,12 @@ public:
std::size_t
getBufferSize();
static std::size_t
messageSize(::google::protobuf::Message const& message);
static std::size_t
totalSize(::google::protobuf::Message const& message);
/** Retrieve the packed message data. If compressed message is requested but
* the message is not compressible then the uncompressed buffer is returned.
* @param compressed Request compressed (Compress::On) or

View File

@@ -37,6 +37,7 @@ static constexpr std::uint32_t csHopLimit = 3;
enum class ProtocolFeature {
ValidatorListPropagation,
ValidatorList2Propagation
};
/** Represents a peer connection in the overlay. */

View File

@@ -32,11 +32,7 @@ Message::Message(
{
using namespace ripple::compression;
#if defined(GOOGLE_PROTOBUF_VERSION) && (GOOGLE_PROTOBUF_VERSION >= 3011000)
auto const messageBytes = message.ByteSizeLong();
#else
unsigned const messageBytes = message.ByteSize();
#endif
auto const messageBytes = messageSize(message);
assert(messageBytes != 0);
@@ -46,6 +42,26 @@ Message::Message(
if (messageBytes != 0)
message.SerializeToArray(buffer_.data() + headerBytes, messageBytes);
assert(getBufferSize() == totalSize(message));
}
// static
std::size_t
Message::messageSize(::google::protobuf::Message const& message)
{
#if defined(GOOGLE_PROTOBUF_VERSION) && (GOOGLE_PROTOBUF_VERSION >= 3011000)
return message.ByteSizeLong();
#else
return message.ByteSize();
#endif
}
// static
std::size_t
Message::totalSize(::google::protobuf::Message const& message)
{
return messageSize(message) + compression::headerBytes;
}
void
@@ -68,6 +84,7 @@ Message::compress()
case protocol::mtLEDGER_DATA:
case protocol::mtGET_OBJECTS:
case protocol::mtVALIDATORLIST:
case protocol::mtVALIDATORLISTCOLLECTION:
return true;
case protocol::mtPING:
case protocol::mtCLUSTER:

View File

@@ -1026,13 +1026,7 @@ OverlayImpl::processValidatorList(
if (!req.target().starts_with(prefix.data()) || !setup_.vlEnabled)
return false;
auto key = req.target().substr(prefix.size());
if (key.empty())
return false;
// find the list
auto vl = app_.validators().getAvailable(key);
std::uint32_t version = 1;
boost::beast::http::response<json_body> msg;
msg.version(req.version());
@@ -1040,25 +1034,53 @@ OverlayImpl::processValidatorList(
msg.insert("Content-Type", "application/json");
msg.insert("Connection", "close");
if (!vl)
{
// 404 not found
msg.result(boost::beast::http::status::not_found);
auto fail = [&msg, &handoff](auto status) {
msg.result(status);
msg.insert("Content-Length", "0");
msg.body() = Json::nullValue;
msg.prepare_payload();
handoff.response = std::make_shared<SimpleWriter>(msg);
return true;
};
auto key = req.target().substr(prefix.size());
if (auto slash = key.find('/'); slash != boost::string_view::npos)
{
auto verString = key.substr(0, slash);
if (!boost::conversion::try_lexical_convert(verString, version))
return fail(boost::beast::http::status::bad_request);
key = key.substr(slash + 1);
}
if (key.empty())
return fail(boost::beast::http::status::bad_request);
// find the list
auto vl = app_.validators().getAvailable(key, version);
if (!vl)
{
// 404 not found
return fail(boost::beast::http::status::not_found);
}
else if (!*vl)
{
return fail(boost::beast::http::status::bad_request);
}
else
{
msg.result(boost::beast::http::status::ok);
msg.body() = *vl;
}
msg.prepare_payload();
handoff.response = std::make_shared<SimpleWriter>(msg);
return true;
}
}
bool
OverlayImpl::processHealth(http_request_type const& req, Handoff& handoff)

View File

@@ -433,6 +433,8 @@ PeerImp::supportsFeature(ProtocolFeature f) const
{
case ProtocolFeature::ValidatorListPropagation:
return protocol_ >= make_protocol(2, 1);
case ProtocolFeature::ValidatorList2Propagation:
return protocol_ >= make_protocol(2, 2);
}
return false;
}
@@ -817,28 +819,26 @@ PeerImp::doProtocolStart()
// Send all the validator lists that have been loaded
if (supportsFeature(ProtocolFeature::ValidatorListPropagation))
{
app_.validators().for_each_available([&](std::string const& manifest,
std::string const& blob,
std::string const& signature,
app_.validators().for_each_available(
[&](std::string const& manifest,
std::uint32_t version,
std::map<std::size_t, ValidatorBlobInfo> const& blobInfos,
PublicKey const& pubKey,
std::size_t sequence,
std::size_t maxSequence,
uint256 const& hash) {
protocol::TMValidatorList vl;
ValidatorList::sendValidatorList(
*this,
0,
pubKey,
maxSequence,
version,
manifest,
blobInfos,
app_.getHashRouter(),
p_journal_);
vl.set_manifest(manifest);
vl.set_blob(blob);
vl.set_signature(signature);
vl.set_version(version);
JLOG(p_journal_.debug())
<< "Sending validator list for " << strHex(pubKey)
<< " with sequence " << sequence << " to "
<< remote_address_.to_string() << " (" << id_ << ")";
send(std::make_shared<Message>(vl, protocol::mtVALIDATORLIST));
// Don't send it next time.
app_.getHashRouter().addSuppressionPeer(hash, id_);
setPublisherListSequence(pubKey, sequence);
});
}
@@ -1859,6 +1859,202 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMHaveTransactionSet> const& m)
}
}
void
PeerImp::onValidatorListMessage(
std::string const& messageType,
std::string const& manifest,
std::uint32_t version,
std::vector<ValidatorBlobInfo> const& blobs)
{
// If there are no blobs, the message is malformed (possibly because of
// ValidatorList class rules), so charge accordingly and skip processing.
if (blobs.empty())
{
JLOG(p_journal_.warn()) << "Ignored malformed " << messageType
<< " from peer " << remote_address_;
// This shouldn't ever happen with a well-behaved peer
fee_ = Resource::feeHighBurdenPeer;
return;
}
auto const hash = sha512Half(manifest, blobs, version);
JLOG(p_journal_.debug())
<< "Received " << messageType << " from " << remote_address_.to_string()
<< " (" << id_ << ")";
if (!app_.getHashRouter().addSuppressionPeer(hash, id_))
{
JLOG(p_journal_.debug())
<< messageType << ": received duplicate " << messageType;
// Charging this fee here won't hurt the peer in the normal
// course of operation (ie. refresh every 5 minutes), but
// will add up if the peer is misbehaving.
fee_ = Resource::feeUnwantedData;
return;
}
auto const applyResult = app_.validators().applyListsAndBroadcast(
manifest,
version,
blobs,
remote_address_.to_string(),
hash,
app_.overlay(),
app_.getHashRouter(),
app_.getOPs());
JLOG(p_journal_.debug())
<< "Processed " << messageType << " version " << version << " from "
<< (applyResult.publisherKey ? strHex(*applyResult.publisherKey)
: "unknown or invalid publisher")
<< " from " << remote_address_.to_string() << " (" << id_
<< ") with best result " << to_string(applyResult.bestDisposition());
// Act based on the best result
switch (applyResult.bestDisposition())
{
// New list
case ListDisposition::accepted:
// Newest list is expired, and that needs to be broadcast, too
case ListDisposition::expired:
// Future list
case ListDisposition::pending: {
std::lock_guard<std::mutex> sl(recentLock_);
assert(applyResult.publisherKey);
auto const& pubKey = *applyResult.publisherKey;
#ifndef NDEBUG
if (auto const iter = publisherListSequences_.find(pubKey);
iter != publisherListSequences_.end())
{
assert(iter->second < applyResult.sequence);
}
#endif
publisherListSequences_[pubKey] = applyResult.sequence;
}
break;
case ListDisposition::same_sequence:
case ListDisposition::known_sequence:
#ifndef NDEBUG
{
std::lock_guard<std::mutex> sl(recentLock_);
assert(applyResult.sequence && applyResult.publisherKey);
assert(
publisherListSequences_[*applyResult.publisherKey] <=
applyResult.sequence);
}
#endif // !NDEBUG
break;
case ListDisposition::stale:
case ListDisposition::untrusted:
case ListDisposition::invalid:
case ListDisposition::unsupported_version:
break;
default:
assert(false);
}
// Charge based on the worst result
switch (applyResult.worstDisposition())
{
case ListDisposition::accepted:
case ListDisposition::expired:
case ListDisposition::pending:
// No charges for good data
break;
case ListDisposition::same_sequence:
case ListDisposition::known_sequence:
// Charging this fee here won't hurt the peer in the normal
// course of operation (ie. refresh every 5 minutes), but
// will add up if the peer is misbehaving.
fee_ = Resource::feeUnwantedData;
break;
case ListDisposition::stale:
// There are very few good reasons for a peer to send an
// old list, particularly more than once.
fee_ = Resource::feeBadData;
break;
case ListDisposition::untrusted:
// Charging this fee here won't hurt the peer in the normal
// course of operation (ie. refresh every 5 minutes), but
// will add up if the peer is misbehaving.
fee_ = Resource::feeUnwantedData;
break;
case ListDisposition::invalid:
// This shouldn't ever happen with a well-behaved peer
fee_ = Resource::feeInvalidSignature;
break;
case ListDisposition::unsupported_version:
// During a version transition, this may be legitimate.
// If it happens frequently, that's probably bad.
fee_ = Resource::feeBadData;
break;
default:
assert(false);
}
// Log based on all the results.
for (auto const [disp, count] : applyResult.dispositions)
{
switch (disp)
{
// New list
case ListDisposition::accepted:
JLOG(p_journal_.debug())
<< "Applied " << count << " new " << messageType
<< "(s) from peer " << remote_address_;
break;
// Newest list is expired, and that needs to be broadcast, too
case ListDisposition::expired:
JLOG(p_journal_.debug())
<< "Applied " << count << " expired " << messageType
<< "(s) from peer " << remote_address_;
break;
// Future list
case ListDisposition::pending:
JLOG(p_journal_.debug())
<< "Processed " << count << " future " << messageType
<< "(s) from peer " << remote_address_;
break;
case ListDisposition::same_sequence:
JLOG(p_journal_.warn())
<< "Ignored " << count << " " << messageType
<< "(s) with current sequence from peer "
<< remote_address_;
break;
case ListDisposition::known_sequence:
JLOG(p_journal_.warn())
<< "Ignored " << count << " " << messageType
<< "(s) with future sequence from peer " << remote_address_;
break;
case ListDisposition::stale:
JLOG(p_journal_.warn())
<< "Ignored " << count << "stale " << messageType
<< "(s) from peer " << remote_address_;
break;
case ListDisposition::untrusted:
JLOG(p_journal_.warn())
<< "Ignored " << count << " untrusted " << messageType
<< "(s) from peer " << remote_address_;
break;
case ListDisposition::unsupported_version:
JLOG(p_journal_.warn())
<< "Ignored " << count << "unsupported version "
<< messageType << "(s) from peer " << remote_address_;
break;
case ListDisposition::invalid:
JLOG(p_journal_.warn())
<< "Ignored " << count << "invalid " << messageType
<< "(s) from peer " << remote_address_;
break;
default:
assert(false);
}
}
}
void
PeerImp::onMessage(std::shared_ptr<protocol::TMValidatorList> const& m)
{
@@ -1873,117 +2069,11 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidatorList> const& m)
fee_ = Resource::feeUnwantedData;
return;
}
auto const& manifest = m->manifest();
auto const& blob = m->blob();
auto const& signature = m->signature();
auto const version = m->version();
auto const hash = sha512Half(manifest, blob, signature, version);
JLOG(p_journal_.debug())
<< "Received validator list from " << remote_address_.to_string()
<< " (" << id_ << ")";
if (!app_.getHashRouter().addSuppressionPeer(hash, id_))
{
JLOG(p_journal_.debug())
<< "ValidatorList: received duplicate validator list";
// Charging this fee here won't hurt the peer in the normal
// course of operation (ie. refresh every 5 minutes), but
// will add up if the peer is misbehaving.
fee_ = Resource::feeUnwantedData;
return;
}
auto const applyResult = app_.validators().applyListAndBroadcast(
manifest,
blob,
signature,
version,
remote_address_.to_string(),
hash,
app_.overlay(),
app_.getHashRouter());
auto const disp = applyResult.disposition;
JLOG(p_journal_.debug())
<< "Processed validator list from "
<< (applyResult.publisherKey ? strHex(*applyResult.publisherKey)
: "unknown or invalid publisher")
<< " from " << remote_address_.to_string() << " (" << id_
<< ") with result " << to_string(disp);
switch (disp)
{
case ListDisposition::accepted:
JLOG(p_journal_.debug())
<< "Applied new validator list from peer "
<< remote_address_;
{
std::lock_guard<std::mutex> sl(recentLock_);
assert(applyResult.sequence && applyResult.publisherKey);
auto const& pubKey = *applyResult.publisherKey;
#ifndef NDEBUG
if (auto const iter = publisherListSequences_.find(pubKey);
iter != publisherListSequences_.end())
{
assert(iter->second < *applyResult.sequence);
}
#endif
publisherListSequences_[pubKey] = *applyResult.sequence;
}
break;
case ListDisposition::same_sequence:
JLOG(p_journal_.warn())
<< "Validator list with current sequence from peer "
<< remote_address_;
// Charging this fee here won't hurt the peer in the normal
// course of operation (ie. refresh every 5 minutes), but
// will add up if the peer is misbehaving.
fee_ = Resource::feeUnwantedData;
#ifndef NDEBUG
{
std::lock_guard<std::mutex> sl(recentLock_);
assert(applyResult.sequence && applyResult.publisherKey);
assert(
publisherListSequences_[*applyResult.publisherKey] ==
*applyResult.sequence);
}
#endif // !NDEBUG
break;
case ListDisposition::stale:
JLOG(p_journal_.warn())
<< "Stale validator list from peer " << remote_address_;
// There are very few good reasons for a peer to send an
// old list, particularly more than once.
fee_ = Resource::feeBadData;
break;
case ListDisposition::untrusted:
JLOG(p_journal_.warn())
<< "Untrusted validator list from peer " << remote_address_;
// Charging this fee here won't hurt the peer in the normal
// course of operation (ie. refresh every 5 minutes), but
// will add up if the peer is misbehaving.
fee_ = Resource::feeUnwantedData;
break;
case ListDisposition::invalid:
JLOG(p_journal_.warn())
<< "Invalid validator list from peer " << remote_address_;
// This shouldn't ever happen with a well-behaved peer
fee_ = Resource::feeInvalidSignature;
break;
case ListDisposition::unsupported_version:
JLOG(p_journal_.warn())
<< "Unsupported version validator list from peer "
<< remote_address_;
// During a version transition, this may be legitimate.
// If it happens frequently, that's probably bad.
fee_ = Resource::feeBadData;
break;
default:
assert(false);
}
onValidatorListMessage(
"ValidatorList",
m->manifest(),
m->version(),
ValidatorList::parseBlobs(*m));
}
catch (std::exception const& e)
{
@@ -1993,6 +2083,45 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidatorList> const& m)
}
}
void
PeerImp::onMessage(
std::shared_ptr<protocol::TMValidatorListCollection> const& m)
{
try
{
if (!supportsFeature(ProtocolFeature::ValidatorList2Propagation))
{
JLOG(p_journal_.debug())
<< "ValidatorListCollection: received validator list from peer "
<< "using protocol version " << to_string(protocol_)
<< " which shouldn't support this feature.";
fee_ = Resource::feeUnwantedData;
return;
}
else if (m->version() < 2)
{
JLOG(p_journal_.debug())
<< "ValidatorListCollection: received invalid validator list "
"version "
<< m->version() << " from peer using protocol version "
<< to_string(protocol_);
fee_ = Resource::feeBadData;
return;
}
onValidatorListMessage(
"ValidatorListCollection",
m->manifest(),
m->version(),
ValidatorList::parseBlobs(*m));
}
catch (std::exception const& e)
{
JLOG(p_journal_.warn()) << "ValidatorListCollection: Exception, "
<< e.what() << " from peer " << remote_address_;
fee_ = Resource::feeBadData;
}
}
void
PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
{

View File

@@ -43,6 +43,8 @@
namespace ripple {
struct ValidatorBlobInfo;
class PeerImp : public Peer,
public std::enable_shared_from_this<PeerImp>,
public OverlayImpl::Child
@@ -509,6 +511,8 @@ public:
void
onMessage(std::shared_ptr<protocol::TMValidatorList> const& m);
void
onMessage(std::shared_ptr<protocol::TMValidatorListCollection> const& m);
void
onMessage(std::shared_ptr<protocol::TMValidation> const& m);
void
onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m);
@@ -527,6 +531,13 @@ private:
void
doFetchPack(const std::shared_ptr<protocol::TMGetObjectByHash>& packet);
void
onValidatorListMessage(
std::string const& messageType,
std::string const& manifest,
std::uint32_t version,
std::vector<ValidatorBlobInfo> const& blobs);
void
checkTransaction(
int flags,

View File

@@ -73,6 +73,8 @@ protocolMessageName(int type)
return "have_set";
case protocol::mtVALIDATORLIST:
return "validator_list";
case protocol::mtVALIDATORLISTCOLLECTION:
return "validator_list_collection";
case protocol::mtVALIDATION:
return "validation";
case protocol::mtGET_OBJECTS:
@@ -222,11 +224,10 @@ parseMessageHeader(
template <
class T,
class Buffers,
class Handler,
class = std::enable_if_t<
std::is_base_of<::google::protobuf::Message, T>::value>>
bool
invoke(MessageHeader const& header, Buffers const& buffers, Handler& handler)
std::shared_ptr<T>
parseMessageContent(MessageHeader const& header, Buffers const& buffers)
{
auto const m = std::make_shared<T>();
@@ -246,9 +247,25 @@ invoke(MessageHeader const& header, Buffers const& buffers, Handler& handler)
header.algorithm);
if (payloadSize == 0 || !m->ParseFromArray(payload.data(), payloadSize))
return false;
return {};
}
else if (!m->ParseFromZeroCopyStream(&stream))
return {};
return m;
}
template <
class T,
class Buffers,
class Handler,
class = std::enable_if_t<
std::is_base_of<::google::protobuf::Message, T>::value>>
bool
invoke(MessageHeader const& header, Buffers const& buffers, Handler& handler)
{
auto const m = parseMessageContent<T>(header, buffers);
if (!m)
return false;
handler.onMessageBegin(header.message_type, m, header.payload_wire_size);
@@ -300,8 +317,8 @@ invokeProtocolMessage(
// whose size exceeds this may result in the connection being dropped. A
// larger message size may be supported in the future or negotiated as
// part of a protocol upgrade.
if (header->payload_wire_size > megabytes(64) ||
header->uncompressed_size > megabytes(64))
if (header->payload_wire_size > maximiumMessageSize ||
header->uncompressed_size > maximiumMessageSize)
{
result.second = make_error_code(boost::system::errc::message_size);
return result;
@@ -391,6 +408,10 @@ invokeProtocolMessage(
success = detail::invoke<protocol::TMValidatorList>(
*header, buffers, handler);
break;
case protocol::mtVALIDATORLISTCOLLECTION:
success = detail::invoke<protocol::TMValidatorListCollection>(
*header, buffers, handler);
break;
case protocol::mtGET_OBJECTS:
success = detail::invoke<protocol::TMGetObjectByHash>(
*header, buffers, handler);

View File

@@ -38,7 +38,8 @@ constexpr ProtocolVersion const supportedProtocolList[]
{
{1, 2},
{2, 0},
{2, 1}
{2, 1},
{2, 2}
};
// clang-format on

View File

@@ -48,7 +48,8 @@ TrafficCount::categorize(
if (type == protocol::mtTRANSACTION)
return TrafficCount::category::transaction;
if (type == protocol::mtVALIDATORLIST)
if (type == protocol::mtVALIDATORLIST ||
type == protocol::mtVALIDATORLISTCOLLECTION)
return TrafficCount::category::validatorlist;
if (type == protocol::mtVALIDATION)

View File

@@ -24,6 +24,7 @@ enum MessageType
mtPEER_SHARD_INFO = 53;
mtVALIDATORLIST = 54;
mtSQUELCH = 55;
mtVALIDATORLISTCOLLECTION = 56;
}
// token, iterations, target, challenge = issue demand for proof of work
@@ -203,7 +204,7 @@ message TMHaveTransactionSet
required bytes hash = 2;
}
// Validator list
// Validator list (UNL)
message TMValidatorList
{
required bytes manifest = 1;
@@ -212,6 +213,22 @@ message TMValidatorList
required uint32 version = 4;
}
// Validator List v2
message ValidatorBlobInfo
{
optional bytes manifest = 1;
required bytes blob = 2;
required bytes signature = 3;
}
// Collection of Validator List v2 (UNL)
message TMValidatorListCollection
{
required uint32 version = 1;
required bytes manifest = 2;
repeated ValidatorBlobInfo blobs = 3;
}
// Used to sign a final closed ledger after reprocessing
message TMValidation
{

View File

@@ -134,8 +134,9 @@ enum error_code_i {
rpcDB_DESERIALIZATION = 77,
rpcEXCESSIVE_LGR_RANGE = 78,
rpcINVALID_LGR_RANGE = 79,
rpcEXPIRED_VALIDATOR_LIST = 80,
rpcLAST =
rpcINVALID_LGR_RANGE // rpcLAST should always equal the last code.=
rpcEXPIRED_VALIDATOR_LIST // rpcLAST should always equal the last code.
};
/** Codes returned in the `warnings` array of certain RPC commands.
@@ -145,6 +146,7 @@ enum error_code_i {
enum warning_code_i {
warnRPC_UNSUPPORTED_MAJORITY = 1001,
warnRPC_AMENDMENT_BLOCKED = 1002,
warnRPC_EXPIRED_VALIDATOR_LIST = 1003,
};
//------------------------------------------------------------------------------

View File

@@ -39,6 +39,7 @@ constexpr static ErrorInfo unorderedErrorInfos[]{
{rpcAMENDMENT_BLOCKED,
"amendmentBlocked",
"Amendment blocked, need upgrade."},
{rpcEXPIRED_VALIDATOR_LIST, "unlBlocked", "Validator list expired."},
{rpcATX_DEPRECATED,
"deprecated",
"Use the new API or specify a ledger range."},

View File

@@ -143,6 +143,8 @@ JSS(bids); // out: Subscribe
JSS(binary); // in: AccountTX, LedgerEntry,
// AccountTxOld, Tx LedgerData
JSS(blob); // out: ValidatorList
JSS(blobs_v2); // out: ValidatorList
// in: UNL
JSS(books); // in: Subscribe, Unsubscribe
JSS(both); // in: Subscribe, Unsubscribe
JSS(both_sides); // in: Subscribe, Unsubscribe
@@ -211,6 +213,8 @@ JSS(directory); // in: LedgerEntry
JSS(domain); // out: ValidatorInfo, Manifest
JSS(drops); // out: TxQ
JSS(duration_us); // out: NetworkOPs
JSS(effective); // out: ValidatorList
// in: UNL
JSS(enabled); // out: AmendmentTable
JSS(engine_result); // out: NetworkOPs, TransactionSign, Submit
JSS(engine_result_code); // out: NetworkOPs, TransactionSign, Submit
@@ -439,8 +443,10 @@ JSS(random); // out: Random
JSS(raw_meta); // out: AcceptedLedgerTx
JSS(receive_currencies); // out: AccountCurrencies
JSS(reference_level); // out: TxQ
JSS(refresh_interval); // in: UNL
JSS(refresh_interval_min); // out: ValidatorSites
JSS(regular_seed); // in/out: LedgerEntry
JSS(remaining); // out: ValidatorList
JSS(remote); // out: Logic.h
JSS(request); // RPC
JSS(requested); // out: Manifest
@@ -471,6 +477,7 @@ JSS(seq); // in: LedgerEntry;
// out: NetworkOPs, RPCSub, AccountOffers,
// ValidatorList, ValidatorInfo, Manifest
JSS(seqNum); // out: LedgerToJson
JSS(sequence); // in: UNL
JSS(sequence_count); // out: AccountInfo
JSS(server_domain); // out: NetworkOPs
JSS(server_state); // out: NetworkOPs

View File

@@ -77,6 +77,20 @@ template <class T>
error_code_i
conditionMet(Condition condition_required, T& context)
{
if (context.app.getOPs().isAmendmentBlocked() &&
(condition_required & NEEDS_CURRENT_LEDGER ||
condition_required & NEEDS_CLOSED_LEDGER))
{
return rpcAMENDMENT_BLOCKED;
}
if (context.app.getOPs().isUNLBlocked() &&
(condition_required & NEEDS_CURRENT_LEDGER ||
condition_required & NEEDS_CLOSED_LEDGER))
{
return rpcEXPIRED_VALIDATOR_LIST;
}
if ((condition_required & NEEDS_NETWORK_CONNECTION) &&
(context.netOps.getOperatingMode() < OperatingMode::SYNCING))
{
@@ -88,13 +102,6 @@ conditionMet(Condition condition_required, T& context)
return rpcNOT_SYNCED;
}
if (context.app.getOPs().isAmendmentBlocked() &&
(condition_required & NEEDS_CURRENT_LEDGER ||
condition_required & NEEDS_CLOSED_LEDGER))
{
return rpcAMENDMENT_BLOCKED;
}
if (!context.app.config().standalone() &&
condition_required & NEEDS_CURRENT_LEDGER)
{

File diff suppressed because it is too large Load Diff

View File

@@ -53,6 +53,7 @@ realValidatorContents()
}
auto constexpr default_expires = std::chrono::seconds{3600};
auto constexpr default_effective_overlap = std::chrono::seconds{30};
} // namespace detail
class ValidatorSite_test : public beast::unit_test::suite
@@ -135,6 +136,8 @@ private:
bool failApply = false;
int serverVersion = 1;
std::chrono::seconds expiresFromNow = detail::default_expires;
std::chrono::seconds effectiveOverlap =
detail::default_effective_overlap;
int expectedRefreshMin = 0;
};
void
@@ -146,13 +149,17 @@ private:
boost::adaptors::transformed(
[](FetchListConfig const& cfg) {
return cfg.path +
(cfg.ssl ? " [https]" : " [http]");
(cfg.ssl ? " [https] v" : " [http] v") +
std::to_string(cfg.serverVersion) +
" " + cfg.msg;
}),
", ");
using namespace jtx;
using namespace std::chrono_literals;
Env env(*this);
auto& trustedKeys = env.app().validators();
env.timeKeeper().set(env.timeKeeper().now() + 30s);
test::StreamSink sink;
beast::Journal journal{sink};
@@ -184,10 +191,17 @@ private:
while (item.list.size() < listSize)
item.list.push_back(TrustedPublisherServer::randomValidator());
NetClock::time_point const expires =
env.timeKeeper().now() + cfg.expiresFromNow;
NetClock::time_point const effective2 =
expires - cfg.effectiveOverlap;
NetClock::time_point const expires2 =
effective2 + cfg.expiresFromNow;
item.server = make_TrustedPublisherServer(
env.app().getIOService(),
item.list,
env.timeKeeper().now() + cfg.expiresFromNow,
expires,
{{effective2, expires2}},
cfg.ssl,
cfg.serverVersion);
cfgPublishers.push_back(strHex(item.server->publisherPublic()));
@@ -201,7 +215,6 @@ private:
BEAST_EXPECT(
trustedKeys.load(emptyLocalKey, emptyCfgKeys, cfgPublishers));
using namespace std::chrono_literals;
// Normally, tests will only need a fraction of this time,
// but sometimes DNS resolution takes an inordinate amount
// of time, so the test will just wait.
@@ -381,8 +394,15 @@ public:
{
// fetch single site
testFetchList({{"/validators", "", ssl}});
testFetchList({{"/validators2", "", ssl}});
// fetch multiple sites
testFetchList({{"/validators", "", ssl}, {"/validators", "", ssl}});
testFetchList(
{{"/validators", "", ssl}, {"/validators2", "", ssl}});
testFetchList(
{{"/validators2", "", ssl}, {"/validators", "", ssl}});
testFetchList(
{{"/validators2", "", ssl}, {"/validators2", "", ssl}});
// fetch single site with single redirects
testFetchList({{"/redirect_once/301", "", ssl}});
testFetchList({{"/redirect_once/302", "", ssl}});
@@ -391,6 +411,19 @@ public:
// one redirect, one not
testFetchList(
{{"/validators", "", ssl}, {"/redirect_once/302", "", ssl}});
testFetchList(
{{"/validators2", "", ssl}, {"/redirect_once/302", "", ssl}});
// UNLs with a "gap" between validUntil of one and validFrom of the
// next
testFetchList(
{{"/validators2",
"",
ssl,
false,
false,
1,
detail::default_expires,
std::chrono::seconds{-90}}});
// fetch single site with undending redirect (fails to load)
testFetchList(
{{"/redirect_forever/301",
@@ -418,6 +451,14 @@ public:
ssl,
true,
true}});
// one undending redirect, one not
testFetchList(
{{"/validators2", "", ssl},
{"/redirect_forever/302",
"Exceeded max redirects",
ssl,
true,
true}});
// invalid redir Location
testFetchList(
{{"/redirect_to/ftp://invalid-url/302",
@@ -438,6 +479,12 @@ public:
ssl,
true,
true}});
testFetchList(
{{"/validators2/bad",
"Unable to parse JSON response",
ssl,
true,
true}});
// error status returned
testFetchList(
{{"/bad-resource", "returned bad status", ssl, true, true}});
@@ -455,30 +502,96 @@ public:
ssl,
true,
true}});
testFetchList(
{{"/validators2/missing",
"Missing fields in JSON response",
ssl,
true,
true}});
// timeout
testFetchList({{"/sleep/13", "took too long", ssl, true, true}});
// bad manifest format using known versions
// * Retrieves a v1 formatted list claiming version 2
testFetchList(
{{"/validators", "Missing fields", ssl, true, true, 2}});
// * Retrieves a v2 formatted list claiming version 1
testFetchList(
{{"/validators2", "Missing fields", ssl, true, true, 0}});
// bad manifest version
// Because versions other than 1 are treated as v2, the v1
// list won't have the blobs_v2 fields, and thus will claim to have
// missing fields
testFetchList(
{{"/validators", "Unsupported version", ssl, false, true, 4}});
using namespace std::chrono_literals;
// get old validator list
{{"/validators", "Missing fields", ssl, true, true, 4}});
testFetchList(
{{"/validators",
"Stale validator list",
{{"/validators2",
"1 unsupported version",
ssl,
false,
true,
1,
0s}});
// force an out-of-range expiration value
4}});
using namespace std::chrono_literals;
// get expired validator list
testFetchList(
{{"/validators",
"Invalid validator list",
"Applied 1 expired validator list(s)",
ssl,
false,
false,
1,
0s}});
testFetchList(
{{"/validators2",
"Applied 1 expired validator list(s)",
ssl,
false,
false,
1,
0s,
-1s}});
// force an out-of-range validUntil value
testFetchList(
{{"/validators",
"1 invalid validator list(s)",
ssl,
false,
true,
1,
std::chrono::seconds{Json::Value::maxInt + 1}}});
// force an out-of-range validUntil value on the future list
// The first list is accepted. The second fails. The parser
// returns the "best" result, so this looks like a success.
testFetchList(
{{"/validators2",
"",
ssl,
false,
false,
1,
std::chrono::seconds{Json::Value::maxInt - 300},
299s}});
// force an out-of-range validFrom value
// The first list is accepted. The second fails. The parser
// returns the "best" result, so this looks like a success.
testFetchList(
{{"/validators2",
"",
ssl,
false,
false,
1,
std::chrono::seconds{Json::Value::maxInt - 300},
301s}});
// force an out-of-range validUntil value on _both_ lists
testFetchList(
{{"/validators2",
"2 invalid validator list(s)",
ssl,
false,
true,
1,
std::chrono::seconds{Json::Value::maxInt + 1},
std::chrono::seconds{Json::Value::maxInt - 6000}}});
// verify refresh intervals are properly clamped
testFetchList(
{{"/validators/refresh/0",
@@ -488,6 +601,17 @@ public:
false,
1,
detail::default_expires,
detail::default_effective_overlap,
1}}); // minimum of 1 minute
testFetchList(
{{"/validators2/refresh/0",
"",
ssl,
false,
false,
1,
detail::default_expires,
detail::default_effective_overlap,
1}}); // minimum of 1 minute
testFetchList(
{{"/validators/refresh/10",
@@ -497,6 +621,17 @@ public:
false,
1,
detail::default_expires,
detail::default_effective_overlap,
10}}); // 10 minutes is fine
testFetchList(
{{"/validators2/refresh/10",
"",
ssl,
false,
false,
1,
detail::default_expires,
detail::default_effective_overlap,
10}}); // 10 minutes is fine
testFetchList(
{{"/validators/refresh/2000",
@@ -506,6 +641,17 @@ public:
false,
1,
detail::default_expires,
detail::default_effective_overlap,
60 * 24}}); // max of 24 hours
testFetchList(
{{"/validators2/refresh/2000",
"",
ssl,
false,
false,
1,
detail::default_expires,
detail::default_effective_overlap,
60 * 24}}); // max of 24 hours
}
testFileURLs();

View File

@@ -1860,7 +1860,12 @@ class NegativeUNLVoteFilterValidations_test : public beast::unit_test::suite
auto& local = *nUnlKeys.begin();
std::vector<std::string> cfgPublishers;
validators.load(local, cfgKeys, cfgPublishers);
validators.updateTrusted(activeValidators);
validators.updateTrusted(
activeValidators,
env.timeKeeper().now(),
env.app().getOPs(),
env.app().overlay(),
env.app().getHashRouter());
BEAST_EXPECT(validators.getTrustedMasterKeys().size() == numNodes);
validators.setNegativeUNL(nUnlKeys);
BEAST_EXPECT(validators.getNegativeUNL().size() == negUnlSize);

View File

@@ -57,7 +57,12 @@ class TrustedPublisherServer
socket_type sock_;
endpoint_type ep_;
boost::asio::ip::tcp::acceptor acceptor_;
// Generates a version 1 validator list, using the int parameter as the
// actual version.
std::function<std::string(int)> getList_;
// Generates a version 2 validator list, using the int parameter as the
// actual version.
std::function<std::string(int)> getList2_;
// The SSL context is required, and holds certificates
bool useSSL_;
@@ -91,6 +96,18 @@ class TrustedPublisherServer
sslCtx_.use_tmp_dh(boost::asio::buffer(dh().data(), dh().size()));
}
struct BlobInfo
{
BlobInfo(std::string b, std::string s) : blob(b), signature(s)
{
}
// base-64 encoded JSON containing the validator list.
std::string blob;
// hex-encoded signature of the blob using the publisher's signing key
std::string signature;
};
public:
struct Validator
{
@@ -144,14 +161,19 @@ public:
1)};
}
// TrustedPublisherServer must be accessed through a shared_ptr
// TrustedPublisherServer must be accessed through a shared_ptr.
// This constructor is only public so std::make_shared has access.
// The function`make_TrustedPublisherServer` should be used to create
// instances.
// The `futures` member is expected to be structured as
// effective / expiration time point pairs for use in version 2 UNLs
TrustedPublisherServer(
boost::asio::io_context& ioc,
std::vector<Validator> const& validators,
NetClock::time_point expiration,
NetClock::time_point validUntil,
std::vector<
std::pair<NetClock::time_point, NetClock::time_point>> const&
futures,
bool useSSL = false,
int version = 1,
bool immediateStart = true,
@@ -170,29 +192,80 @@ public:
auto const manifest = makeManifestString(
publisherPublic_, publisherSecret_, keys.first, keys.second, 1);
std::vector<BlobInfo> blobInfo;
blobInfo.reserve(futures.size() + 1);
auto const [data, blob] = [&]() -> std::pair<std::string, std::string> {
// Builds the validator list, then encodes it into a blob.
std::string data = "{\"sequence\":" + std::to_string(sequence) +
",\"expiration\":" +
std::to_string(expiration.time_since_epoch().count()) +
std::to_string(validUntil.time_since_epoch().count()) +
",\"validators\":[";
for (auto const& val : validators)
{
data += "{\"validation_public_key\":\"" + strHex(val.masterPublic) +
"\",\"manifest\":\"" + val.manifest + "\"},";
data += "{\"validation_public_key\":\"" +
strHex(val.masterPublic) + "\",\"manifest\":\"" +
val.manifest + "\"},";
}
data.pop_back();
data += "]}";
std::string blob = base64_encode(data);
auto const sig = sign(keys.first, keys.second, makeSlice(data));
getList_ = [blob, sig, manifest, version](int interval) {
return std::make_pair(data, blob);
}();
auto const sig = strHex(sign(keys.first, keys.second, makeSlice(data)));
blobInfo.emplace_back(blob, sig);
getList_ = [blob = blob, sig, manifest, version](int interval) {
// Build the contents of a version 1 format UNL file
std::stringstream l;
l << "{\"blob\":\"" << blob << "\""
<< ",\"signature\":\"" << strHex(sig) << "\""
<< ",\"signature\":\"" << sig << "\""
<< ",\"manifest\":\"" << manifest << "\""
<< ",\"refresh_interval\": " << interval
<< ",\"version\":" << version << '}';
return l.str();
};
for (auto const& future : futures)
{
std::string data = "{\"sequence\":" + std::to_string(++sequence) +
",\"effective\":" +
std::to_string(future.first.time_since_epoch().count()) +
",\"expiration\":" +
std::to_string(future.second.time_since_epoch().count()) +
",\"validators\":[";
// Use the same set of validators for simplicity
for (auto const& val : validators)
{
data += "{\"validation_public_key\":\"" +
strHex(val.masterPublic) + "\",\"manifest\":\"" +
val.manifest + "\"},";
}
data.pop_back();
data += "]}";
std::string blob = base64_encode(data);
auto const sig =
strHex(sign(keys.first, keys.second, makeSlice(data)));
blobInfo.emplace_back(blob, sig);
}
getList2_ = [blobInfo, manifest, version](int interval) {
// Build the contents of a version 2 format UNL file
// Use `version + 1` to get 2 for most tests, but have
// a "bad" version number for tests that provide an override.
std::stringstream l;
for (auto const& info : blobInfo)
{
l << "{\"blob\":\"" << info.blob << "\""
<< ",\"signature\":\"" << info.signature << "\"},";
}
std::string blobs = l.str();
blobs.pop_back();
l.str(std::string());
l << "{\"blobs_v2\": [ " << blobs << "],\"manifest\":\"" << manifest
<< "\""
<< ",\"refresh_interval\": " << interval
<< ",\"version\":" << (version + 1) << '}';
return l.str();
};
if (useSSL_)
{
@@ -505,7 +578,26 @@ private:
res.keep_alive(req.keep_alive());
bool prepare = true;
if (boost::starts_with(path, "/validators"))
if (boost::starts_with(path, "/validators2"))
{
res.result(http::status::ok);
res.insert("Content-Type", "application/json");
if (path == "/validators2/bad")
res.body() = "{ 'bad': \"2']";
else if (path == "/validators2/missing")
res.body() = "{\"version\": 2}";
else
{
int refresh = 5;
constexpr char const* refreshPrefix =
"/validators2/refresh/";
if (boost::starts_with(path, refreshPrefix))
refresh = boost::lexical_cast<unsigned int>(
path.substr(strlen(refreshPrefix)));
res.body() = getList2_(refresh);
}
}
else if (boost::starts_with(path, "/validators"))
{
res.result(http::status::ok);
res.insert("Content-Type", "application/json");
@@ -516,9 +608,11 @@ private:
else
{
int refresh = 5;
if (boost::starts_with(path, "/validators/refresh"))
constexpr char const* refreshPrefix =
"/validators/refresh/";
if (boost::starts_with(path, refreshPrefix))
refresh = boost::lexical_cast<unsigned int>(
path.substr(20));
path.substr(strlen(refreshPrefix)));
res.body() = getList_(refresh);
}
}
@@ -618,14 +712,16 @@ inline std::shared_ptr<TrustedPublisherServer>
make_TrustedPublisherServer(
boost::asio::io_context& ioc,
std::vector<TrustedPublisherServer::Validator> const& validators,
NetClock::time_point expiration,
NetClock::time_point validUntil,
std::vector<std::pair<NetClock::time_point, NetClock::time_point>> const&
futures,
bool useSSL = false,
int version = 1,
bool immediateStart = true,
int sequence = 1)
{
auto const r = std::make_shared<TrustedPublisherServer>(
ioc, validators, expiration, useSSL, version, sequence);
ioc, validators, validUntil, futures, useSSL, version, sequence);
if (immediateStart)
r->start();
return r;

View File

@@ -40,6 +40,8 @@ class DatabaseDownloader_test : public beast::unit_test::suite
env.app().getIOService(),
list,
env.timeKeeper().now() + std::chrono::seconds{3600},
// No future VLs
{},
ssl);
}

View File

@@ -344,11 +344,43 @@ public:
return list;
}
std::shared_ptr<protocol::TMValidatorListCollection>
buildValidatorListCollection()
{
auto list = std::make_shared<protocol::TMValidatorListCollection>();
auto master = randomKeyPair(KeyType::ed25519);
auto signing = randomKeyPair(KeyType::ed25519);
STObject st(sfGeneric);
st[sfSequence] = 0;
st[sfPublicKey] = std::get<0>(master);
st[sfSigningPubKey] = std::get<0>(signing);
st[sfDomain] = makeSlice(std::string("example.com"));
sign(
st,
HashPrefix::manifest,
KeyType::ed25519,
std::get<1>(master),
sfMasterSignature);
sign(st, HashPrefix::manifest, KeyType::ed25519, std::get<1>(signing));
Serializer s;
st.add(s);
list->set_manifest(s.data(), s.size());
list->set_version(4);
STObject signature(sfSignature);
ripple::sign(
st, HashPrefix::manifest, KeyType::ed25519, std::get<1>(signing));
Serializer s1;
st.add(s1);
auto& blob = *list->add_blobs();
blob.set_signature(s1.data(), s1.size());
blob.set_blob(strHex(s.getString()));
return list;
}
void
testProtocol()
{
testcase("Message Compression");
auto thresh = beast::severities::Severity::kInfo;
auto logs = std::make_unique<Logs>(thresh);
@@ -359,6 +391,7 @@ public:
protocol::TMLedgerData ledger_data;
protocol::TMGetObjectByHash get_object;
protocol::TMValidatorList validator_list;
protocol::TMValidatorListCollection validator_list_collection;
// 4.5KB
doTest(buildManifests(20), protocol::mtMANIFESTS, 4, "TMManifests20");
@@ -418,6 +451,11 @@ public:
protocol::mtVALIDATORLIST,
4,
"TMValidatorList");
doTest(
buildValidatorListCollection(),
protocol::mtVALIDATORLISTCOLLECTION,
4,
"TMValidatorListCollection");
}
void

View File

@@ -46,6 +46,8 @@ class ShardArchiveHandler_test : public beast::unit_test::suite
env.app().getIOService(),
list,
env.timeKeeper().now() + std::chrono::seconds{3600},
// No future VLs
{},
ssl);
}

View File

@@ -189,12 +189,20 @@ public:
// Manage single-thread io_service for server.
BasicApp worker{1};
using namespace std::chrono_literals;
NetClock::time_point const expiration{3600s};
NetClock::time_point const validUntil{3600s};
NetClock::time_point const validFrom2{validUntil - 60s};
NetClock::time_point const validUntil2{validFrom2 + 3600s};
auto server = make_TrustedPublisherServer(
worker.get_io_service(), validators, expiration, false, 1, false);
worker.get_io_service(),
validators,
validUntil,
{{validFrom2, validUntil2}},
false,
1,
false);
//----------------------------------------------------------------------
// Publisher list site unavailable
// Publisher list site unavailable v1
{
// Publisher site information
using namespace std::string_literals;
@@ -261,11 +269,78 @@ public:
}
}
}
// Publisher list site unavailable v2
{
// Publisher site information
using namespace std::string_literals;
std::string siteURI =
"http://"s + getEnvLocalhostAddr() + ":1234/validators2";
Env env{
*this,
envconfig([&](std::unique_ptr<Config> cfg) {
cfg->section(SECTION_VALIDATOR_LIST_SITES).append(siteURI);
cfg->section(SECTION_VALIDATOR_LIST_KEYS)
.append(strHex(server->publisherPublic()));
return cfg;
}),
};
env.app().validatorSites().start();
env.app().validatorSites().join();
{
auto const jrr = env.rpc("server_info")[jss::result];
BEAST_EXPECT(
jrr[jss::info][jss::validator_list][jss::expiration] ==
"unknown");
}
{
auto const jrr = env.rpc("server_state")[jss::result];
BEAST_EXPECT(
jrr[jss::state][jss::validator_list_expires].asInt() == 0);
}
{
auto const jrr = env.rpc("validators")[jss::result];
BEAST_EXPECT(
jrr[jss::validation_quorum].asUInt() ==
std::numeric_limits<std::uint32_t>::max());
BEAST_EXPECT(jrr[jss::local_static_keys].size() == 0);
BEAST_EXPECT(jrr[jss::trusted_validator_keys].size() == 0);
BEAST_EXPECT(
jrr[jss::validator_list][jss::expiration] == "unknown");
if (BEAST_EXPECT(jrr[jss::publisher_lists].size() == 1))
{
auto jp = jrr[jss::publisher_lists][0u];
BEAST_EXPECT(jp[jss::available] == false);
BEAST_EXPECT(jp[jss::list].size() == 0);
BEAST_EXPECT(!jp.isMember(jss::seq));
BEAST_EXPECT(!jp.isMember(jss::expiration));
BEAST_EXPECT(!jp.isMember(jss::version));
BEAST_EXPECT(
jp[jss::pubkey_publisher] ==
strHex(server->publisherPublic()));
}
BEAST_EXPECT(jrr[jss::signing_keys].size() == 0);
}
{
auto const jrr = env.rpc("validator_list_sites")[jss::result];
if (BEAST_EXPECT(jrr[jss::validator_sites].size() == 1))
{
auto js = jrr[jss::validator_sites][0u];
BEAST_EXPECT(js[jss::refresh_interval_min].asUInt() == 5);
BEAST_EXPECT(js[jss::uri] == siteURI);
BEAST_EXPECT(js.isMember(jss::last_refresh_time));
BEAST_EXPECT(js[jss::last_refresh_status] == "invalid");
}
}
}
//----------------------------------------------------------------------
// Publisher list site available
{
server->start();
// Publisher list site available v1
{
std::stringstream uri;
uri << "http://" << server->local_endpoint() << "/validators";
auto siteURI = uri.str();
@@ -286,26 +361,31 @@ public:
for (auto const& val : validators)
startKeys.insert(calcNodeID(val.masterPublic));
env.app().validators().updateTrusted(startKeys);
env.app().validators().updateTrusted(
startKeys,
env.timeKeeper().now(),
env.app().getOPs(),
env.app().overlay(),
env.app().getHashRouter());
{
auto const jrr = env.rpc("server_info")[jss::result];
BEAST_EXPECT(
jrr[jss::info][jss::validator_list][jss::expiration] ==
to_string(expiration));
to_string(validUntil));
}
{
auto const jrr = env.rpc("server_state")[jss::result];
BEAST_EXPECT(
jrr[jss::state][jss::validator_list_expires].asUInt() ==
expiration.time_since_epoch().count());
validUntil.time_since_epoch().count());
}
{
auto const jrr = env.rpc("validators")[jss::result];
BEAST_EXPECT(jrr[jss::validation_quorum].asUInt() == 2);
BEAST_EXPECT(
jrr[jss::validator_list][jss::expiration] ==
to_string(expiration));
to_string(validUntil));
BEAST_EXPECT(jrr[jss::local_static_keys].size() == 0);
BEAST_EXPECT(
@@ -334,7 +414,7 @@ public:
BEAST_EXPECT(
jp[jss::pubkey_publisher] ==
strHex(server->publisherPublic()));
BEAST_EXPECT(jp[jss::expiration] == to_string(expiration));
BEAST_EXPECT(jp[jss::expiration] == to_string(validUntil));
BEAST_EXPECT(jp[jss::version] == 1);
}
auto jsk = jrr[jss::signing_keys];
@@ -361,6 +441,129 @@ public:
}
}
}
// Publisher list site available v2
{
std::stringstream uri;
uri << "http://" << server->local_endpoint() << "/validators2";
auto siteURI = uri.str();
Env env{
*this,
envconfig([&](std::unique_ptr<Config> cfg) {
cfg->section(SECTION_VALIDATOR_LIST_SITES).append(siteURI);
cfg->section(SECTION_VALIDATOR_LIST_KEYS)
.append(strHex(server->publisherPublic()));
return cfg;
}),
};
env.app().validatorSites().start();
env.app().validatorSites().join();
hash_set<NodeID> startKeys;
for (auto const& val : validators)
startKeys.insert(calcNodeID(val.masterPublic));
env.app().validators().updateTrusted(
startKeys,
env.timeKeeper().now(),
env.app().getOPs(),
env.app().overlay(),
env.app().getHashRouter());
{
auto const jrr = env.rpc("server_info")[jss::result];
BEAST_EXPECT(
jrr[jss::info][jss::validator_list][jss::expiration] ==
to_string(validUntil2));
}
{
auto const jrr = env.rpc("server_state")[jss::result];
BEAST_EXPECT(
jrr[jss::state][jss::validator_list_expires].asUInt() ==
validUntil2.time_since_epoch().count());
}
{
auto const jrr = env.rpc("validators")[jss::result];
BEAST_EXPECT(jrr[jss::validation_quorum].asUInt() == 2);
BEAST_EXPECT(
jrr[jss::validator_list][jss::expiration] ==
to_string(validUntil2));
BEAST_EXPECT(jrr[jss::local_static_keys].size() == 0);
BEAST_EXPECT(
jrr[jss::trusted_validator_keys].size() ==
expectedKeys.size());
for (auto const& jKey : jrr[jss::trusted_validator_keys])
{
BEAST_EXPECT(expectedKeys.count(jKey.asString()) == 1);
}
if (BEAST_EXPECT(jrr[jss::publisher_lists].size() == 1))
{
auto jp = jrr[jss::publisher_lists][0u];
BEAST_EXPECT(jp[jss::available] == true);
if (BEAST_EXPECT(jp[jss::list].size() == 2))
{
// check entries
std::set<std::string> foundKeys;
for (auto const& k : jp[jss::list])
{
foundKeys.insert(k.asString());
}
BEAST_EXPECT(foundKeys == expectedKeys);
}
BEAST_EXPECT(jp[jss::seq].asUInt() == 1);
BEAST_EXPECT(
jp[jss::pubkey_publisher] ==
strHex(server->publisherPublic()));
BEAST_EXPECT(jp[jss::expiration] == to_string(validUntil));
BEAST_EXPECT(jp[jss::version] == 2);
if (BEAST_EXPECT(jp.isMember(jss::remaining)) &&
BEAST_EXPECT(jp[jss::remaining].isArray()) &&
BEAST_EXPECT(jp[jss::remaining].size() == 1))
{
auto const& r = jp[jss::remaining][0u];
if (BEAST_EXPECT(r[jss::list].size() == 2))
{
// check entries
std::set<std::string> foundKeys;
for (auto const& k : r[jss::list])
{
foundKeys.insert(k.asString());
}
BEAST_EXPECT(foundKeys == expectedKeys);
}
BEAST_EXPECT(r[jss::seq].asUInt() == 2);
BEAST_EXPECT(
r[jss::effective] == to_string(validFrom2));
BEAST_EXPECT(
r[jss::expiration] == to_string(validUntil2));
}
}
auto jsk = jrr[jss::signing_keys];
BEAST_EXPECT(jsk.size() == 2);
for (auto const& val : validators)
{
BEAST_EXPECT(jsk.isMember(toStr(val.masterPublic)));
BEAST_EXPECT(
jsk[toStr(val.masterPublic)] ==
toStr(val.signingPublic));
}
}
{
auto const jrr = env.rpc("validator_list_sites")[jss::result];
if (BEAST_EXPECT(jrr[jss::validator_sites].size() == 1))
{
auto js = jrr[jss::validator_sites][0u];
BEAST_EXPECT(js[jss::refresh_interval_min].asUInt() == 5);
BEAST_EXPECT(js[jss::uri] == siteURI);
BEAST_EXPECT(js[jss::last_refresh_status] == "accepted");
// The actual time of the update will vary run to run, so
// just verify the time is there
BEAST_EXPECT(js.isMember(jss::last_refresh_time));
}
}
}
}
void