Add consensus simulations

This commit is contained in:
Vinnie Falco
2015-08-13 15:42:36 -07:00
committed by Scott Schurr
parent b2cb4df29a
commit aa0e17dd93
13 changed files with 3265 additions and 446 deletions

View File

@@ -3892,10 +3892,26 @@
</ClCompile>
<ClInclude Include="..\..\src\ripple\unl\tests\BasicNetwork.h">
</ClInclude>
<ClCompile Include="..\..\src\ripple\unl\tests\Consensus_test.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug|x64'">True</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='release|x64'">True</ExcludedFromBuild>
</ClCompile>
<ClInclude Include="..\..\src\ripple\unl\tests\metrics.h">
</ClInclude>
<ClCompile Include="..\..\src\ripple\unl\tests\Network_test.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug|x64'">True</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='release|x64'">True</ExcludedFromBuild>
</ClCompile>
<ClInclude Include="..\..\src\ripple\unl\tests\qalloc.h">
</ClInclude>
<ClInclude Include="..\..\src\ripple\unl\tests\Sim1.h">
</ClInclude>
<ClInclude Include="..\..\src\ripple\unl\tests\Sim2.h">
</ClInclude>
<ClInclude Include="..\..\src\ripple\unl\tests\Sim3.h">
</ClInclude>
<ClInclude Include="..\..\src\ripple\unl\tests\Sim4.h">
</ClInclude>
<ClCompile Include="..\..\src\ripple\unl\tests\SlotPeer_test.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug|x64'">True</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='release|x64'">True</ExcludedFromBuild>

View File

@@ -4542,9 +4542,30 @@
<ClInclude Include="..\..\src\ripple\unl\tests\BasicNetwork.h">
<Filter>ripple\unl\tests</Filter>
</ClInclude>
<ClCompile Include="..\..\src\ripple\unl\tests\Consensus_test.cpp">
<Filter>ripple\unl\tests</Filter>
</ClCompile>
<ClInclude Include="..\..\src\ripple\unl\tests\metrics.h">
<Filter>ripple\unl\tests</Filter>
</ClInclude>
<ClCompile Include="..\..\src\ripple\unl\tests\Network_test.cpp">
<Filter>ripple\unl\tests</Filter>
</ClCompile>
<ClInclude Include="..\..\src\ripple\unl\tests\qalloc.h">
<Filter>ripple\unl\tests</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\unl\tests\Sim1.h">
<Filter>ripple\unl\tests</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\unl\tests\Sim2.h">
<Filter>ripple\unl\tests</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\unl\tests\Sim3.h">
<Filter>ripple\unl\tests</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\unl\tests\Sim4.h">
<Filter>ripple\unl\tests</Filter>
</ClInclude>
<ClCompile Include="..\..\src\ripple\unl\tests\SlotPeer_test.cpp">
<Filter>ripple\unl\tests</Filter>
</ClCompile>

View File

@@ -19,5 +19,6 @@
#include <BeastConfig.h>
#include <ripple/unl/tests/Consensus_test.cpp>
#include <ripple/unl/tests/Network_test.cpp>
#include <ripple/unl/tests/SlotPeer_test.cpp>

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,48 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <BeastConfig.h>
#include <ripple/unl/tests/metrics.h>
#include <ripple/unl/tests/Sim1.h>
#include <ripple/unl/tests/Sim2.h>
#include <ripple/unl/tests/Sim3.h>
#include <ripple/unl/tests/Sim4.h>
#include <beast/unit_test/suite.h>
namespace ripple {
namespace test {
class Consensus_test : public beast::unit_test::suite
{
public:
void
run()
{
Sim4<log_t>::run(log);
//Sim3<log_t>::run(log);
//Sim2<log_t>::run(log);
//Sim1::run(log);
pass();
}
};
BEAST_DEFINE_TESTSUITE_MANUAL(Consensus,sim,ripple);
}
}

View File

@@ -19,6 +19,7 @@
#include <BeastConfig.h>
#include <ripple/unl/tests/BasicNetwork.h>
#include <ripple/unl/tests/metrics.h>
#include <beast/unit_test/suite.h>
#include <boost/optional.hpp>
#include <algorithm>
@@ -57,7 +58,7 @@ public:
send (Net& net, Peer& from, Message&& m)
{
net.send (from, *this,
std::forward<Message>(m));
[&, m]() { receive(net, from, m); });
}
template <class Net>
@@ -69,8 +70,8 @@ public:
++p.hops;
set = true;
hops = p.hops;
for(auto& peer : net.peers(*this))
peer.send(net, *this, p);
for(auto& link : net.links(*this))
link.to.send(net, *this, p);
}
};
@@ -95,7 +96,7 @@ public:
send (Net& net, Peer& from, Message&& m)
{
net.send (from, *this,
std::forward<Message>(m));
[&, m]() { receive(net, from, m); });
}
template <class Net>
@@ -107,14 +108,13 @@ public:
++p.hops;
set = true;
hops = p.hops;
for(auto& peer : net.peers(*this))
peer.send(net, *this, p);
for(auto& link : net.links(*this))
link.to.send(net, *this, p);
}
};
template <class Peer>
struct Network
: BasicNetwork<Peer, Network<Peer>>
struct Network : BasicNetwork<Peer>
{
static std::size_t const nPeer = 10000;
static std::size_t const nDegree = 10;
@@ -168,9 +168,9 @@ public:
Net net;
net.pv[0].set = true;
net.pv[0].hops = 0;
for(auto& peer : net.peers(net.pv[0]))
peer.send(net, net.pv[0], Ping{});
net.run();
for(auto& link : net.links(net.pv[0]))
link.to.send(net, net.pv[0], Ping{});
net.step();
std::size_t reach = 0;
std::vector<int> dist;
std::vector<int> hops;
@@ -182,7 +182,7 @@ public:
++hops[peer.hops];
}
net.bfs(net.pv[0],
[&](Net& net, std::size_t d, Peer& peer)
[&](std::size_t d, Peer& peer)
{
++reach;
dist.resize(std::max<std::size_t>(

376
src/ripple/unl/tests/Sim1.h Normal file
View File

@@ -0,0 +1,376 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_UNL_SIM1_H_INCLUDED
#define RIPPLE_UNL_SIM1_H_INCLUDED
#include <ripple/unl/tests/BasicNetwork.h>
#include <boost/optional.hpp>
#include <algorithm>
#include <sstream>
#include <unordered_map>
#include <vector>
namespace ripple {
namespace test {
struct Sim1
{
struct Config
{
};
static int const nPeer = 100; // # of peers
static int const nDegree = 10; // outdegree
static int const nTrial = 10; // number of trials
static int const nRound = 1; // number of rounds
static int const nUNLMin = 20;
static int const nUNLMax = 30;
using clock_type = std::chrono::system_clock;
struct Network;
// A round of consensus.
// Each round consists of a series of votes,
// terminating when a supermajority is reached.
class Round
{
public:
static int const nPercent = 80; // % of agreement
int id_;
bool consensus_ = false;
std::unordered_map<int, std::pair<
std::size_t, bool>> pos_;
std::size_t count_ = 0;
clock_type::time_point t0_;
public:
// Create a new round with initial position
Round (int id, bool value,
clock_type::time_point now)
: id_ (id)
, t0_ (now)
{
pos_.emplace(std::make_pair(id,
std::make_pair(0, value)));
}
// Returns our value
bool
value() const
{
auto const iter = pos_.find(id_);
return iter->second.second;
}
// Return our position
// This increments the sequence number
std::pair<std::size_t, bool>
pos()
{
auto const iter = pos_.find(id_);
return { ++iter->second.first,
iter->second.second };
}
// Update a peer's position
// Return `true` if we should relay
bool
receive (int id,
std::size_t seq, bool value)
{
if (id == id_)
return false;
auto const result = pos_.emplace(
std::make_pair(id,
std::make_pair(seq, value)));
if (! result.second && seq <=
result.first->second.first)
return false;
result.first->second.first = seq;
result.first->second.second = value;
return true;
}
// Update our position
// Return `true` if we changed our position
template <class UNL>
bool
update (UNL const& unl,
clock_type::time_point const& now)
{
if (consensus_)
return false;
++count_;
std::array<std::size_t, 2> v;
v.fill(0);
for(auto const& p : pos_)
if (p.first == id_ ||
unl.count(p.first) > 0)
++v[p.second.second];
using namespace std::chrono;
auto const iter = pos_.find(id_);
auto const super =
((unl.size() * nPercent) + 50) / 100;
if (v[0] >= super || v[1] >= super)
consensus_ = true;
// agree to disagree
v[0] += duration_cast<milliseconds>(
now - t0_).count() / 250;
if (v[0] >= v[1])
{
if (iter->second.second != false)
{
iter->second.second = false;
return true;
}
}
else
{
if (iter->second.second != true)
{
iter->second.second = true;
return true;
}
}
return false;
}
};
//--------------------------------------------------------------------------
class Peer
{
private:
struct PosMsg
{
int id;
std::size_t seq;
bool value; // position
};
public:
int id_;
std::set<int> unl_;
Config const& config_;
boost::optional<Round> round_;
std::chrono::milliseconds delay_;
Network& net_;
Peer (int id, Config const& config,
Network& net)
: id_(id)
, config_ (config)
, delay_(std::chrono::milliseconds(
net.rand(5, 50)))
, net_(net)
{
auto const size = net_.rand(
nUNLMin, nUNLMax + 1);
while(unl_.size() < size)
{
unl_.insert(net_.rand(nPeer));
unl_.erase(id_);
}
}
// Called to begin the round
void
start()
{
round_.emplace(id_,
!(id_%3), net_.now());
++round_->count_;
PosMsg m;
m.id = id_;
std::tie(m.seq, m.value) =
round_->pos();
broadcast(m);
using namespace std::chrono;
net_.timer(milliseconds(
700 + net_.rand(700)),
[=]() { timer(); });
}
void
receive (Peer& from, PosMsg const& m)
{
if (round_->receive(m.id,
m.seq, m.value))
relay(from, m);
else
++net_.dup;
}
void
timer()
{
if (round_->update(unl_, net_.now()))
{
PosMsg m;
m.id = id_;
std::tie(m.seq, m.value) =
round_->pos();
broadcast(m);
}
if (round_->consensus_)
return;
using namespace std::chrono;
net_.timer(milliseconds(700),
[=]() { timer(); });
}
//----------------------------------------------------------------------
// Send a message to this peer
template <class Message>
void
send (Peer& from, Message&& m)
{
++net_.sent;
using namespace std::chrono;
net_.send (from, *this,
[&, m]() { receive(from, m); });
}
// Broadcast a message to all links
template <class Message>
void
broadcast (Message const& m)
{
for(auto& link : net_.links(*this))
link.to.send(*this, m);
}
// Relay a message to all links
template <class Message>
void
relay (Peer& from, Message const& m)
{
for(auto& link : net_.links(*this))
if (&link.to != &from)
link.to.send(*this, m);
}
};
//--------------------------------------------------------------------------
struct Network : BasicNetwork<Peer>
{
std::size_t dup = 0; // total dup
std::size_t sent = 0; // total sent
std::vector<Peer> pv;
Network (std::size_t seed,
Config const& config)
{
this->rng().seed(seed);
using namespace std;
using namespace std::chrono;
pv.reserve(nPeer);
for(std::size_t id = 0; id < nPeer; ++id)
pv.emplace_back(id, config, *this);
for(auto& peer : pv)
for(int i = 0; i < nDegree; ++i)
connect_one(peer);
}
// Add one random connection
void
connect_one(Peer& from)
{
using namespace std::chrono;
auto const delay = from.delay_ +
milliseconds(rand(5, 200));
for(;;)
if (connect(from,
pv[rand(pv.size())], delay))
break;
}
template <class Log>
void
report (std::chrono::milliseconds ms, Log& log)
{
std::array<std::size_t, 2> n;
std::vector<std::size_t> count;
n.fill(0);
std::size_t consensus = 0;
for(auto const& p : pv)
{
++n[p.round_->value()];
++nth(count, p.round_->count_);
if (p.round_->consensus_)
++consensus;
}
log <<
n[1] << "/" << n[0] << ", " <<
"consensus: " << consensus << " in " <<
ms.count() << "ms, " <<
"sent: " << sent << ", " <<
"dup: " << dup << ", " <<
"count: " << seq_string(count);
}
// Execute a round of consensus
template <class Log>
void
round (Log& log)
{
using namespace std::chrono;
for(int i = 0; i < nPeer; ++i)
pv[i].start();
auto const t0 = now();
#if 0
do
{
report(duration_cast<
milliseconds>(now() - t0), log);
}
while (step_for(milliseconds(50)));
#else
step();
#endif
report(duration_cast<
milliseconds>(now() - t0), log);
}
};
template <class Log>
static
void
run (Log& log)
{
log << "Sim1" << ":";
Config config;
for(auto i = 1; i <= nTrial; ++i)
{
Network net(i, config);
for(auto j = 1; j <= nRound; ++j)
net.round(log);
}
}
};
} // test
} // ripple
#endif

434
src/ripple/unl/tests/Sim2.h Normal file
View File

@@ -0,0 +1,434 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_UNL_SIM2_H_INCLUDED
#define RIPPLE_UNL_SIM2_H_INCLUDED
#include <ripple/unl/tests/BasicNetwork.h>
#include <beast/container/aged_unordered_map.h>
#include <boost/container/flat_set.hpp>
#include <boost/optional.hpp>
#include <algorithm>
#include <memory>
#include <sstream>
#include <unordered_map>
#include <vector>
namespace ripple {
namespace test {
template <class Log>
struct Sim2
{
struct Config
{
};
static int const nPeer = 100; // # of peers
static int const nDegree = 10; // outdegree
static int const nTrial = 1000000; // number of trials
static int const nRound = 1; // number of rounds
static int const nUNLMin = 20;
static int const nUNLMax = 30;
static int const nPos = 10;
using NodeKey = int; // identifies a consensus participant
using ItemKey = int; // identifies a ballot item
using ItemSet = boost::container::flat_set<ItemKey>;
using clock_type = std::chrono::system_clock;
struct Network;
struct PosMsg
{
NodeKey id;
std::size_t seq;
ItemSet items;
bool last;
};
// A round of consensus.
// Each round consists of a series of votes,
// terminating when a supermajority is reached.
class Round
{
private:
int thresh_ = 50;
public:
struct Pos
{
ItemSet items;
bool last = false;
std::size_t seq = 0;
};
int id_;
bool failed_ = false;
bool consensus_ = false;
std::unordered_map<NodeKey, Pos> pos_;
std::size_t count_ = 0;
clock_type::time_point t0_;
Log& log_;
public:
// Create a new round with initial position
Round (NodeKey id, ItemSet&& pos,
clock_type::time_point now, Log& log)
: id_ (id)
, t0_ (now)
, log_ (log)
{
using namespace std;
pos_[id].items = std::move(pos);
}
std::shared_ptr<PosMsg const>
posMsg()
{
auto const iter = pos_.find(id_);
auto m = std::make_shared<PosMsg>();
m->id = id_;
m->seq = ++iter->second.seq;
m->items = iter->second.items;
m->last = consensus_;
return m;
}
ItemSet const&
items() const
{
return pos_.find(id_)->second.items;
}
// Update a peer's position
// Return `true` if we should relay
bool
receive (PosMsg const& m)
{
if (m.id == id_)
return false;
using namespace std;
auto& pos = pos_[m.id];
if (m.seq <= pos.seq)
return false;
pos.seq = m.seq;
pos.last = m.last;
pos.items = m.items;
return true;
}
// Update our position
// Returns `true` if we changed our position
template <class UNL>
bool
update (UNL const& unl,
clock_type::time_point const& now)
{
if (consensus_)
return false;
// count votes per item from unl
boost::container::flat_map<
ItemKey, std::size_t> votes;
for(auto const& pos : pos_)
{
if (! unl.count(pos.first))
continue;
for(auto const& item : pos.second.items)
{
auto const result =
votes.emplace(item, 1);
if (! result.second)
++result.first->second;
}
}
// calculate our new position
ItemSet items;
{
auto const needed =
(thresh_ * unl.size() + 50) / 100;
for(auto const& v : votes)
if (v.second >= needed)
items.insert(v.first);
thresh_ += 5;
}
// see if we reached a consensus
std::size_t most = 0;
std::size_t agree = 0;
for(auto const& pos : pos_)
{
if (! unl.count(pos.first))
continue;
if (pos.second.items == items)
++agree;
else if (! pos.second.last)
++most;
}
//{
auto const needed =
(80 * unl.size() + 50) / 100;
if (agree >= needed)
{
consensus_ = true;
}
else if (agree + most < needed)
{
failed_ = true;
consensus_ = true;
}
//}
if (now.time_since_epoch() >=
std::chrono::seconds(7))
{
log_ <<
"agree = " << agree <<
", most = " << most <<
", needed = " << needed <<
", thresh_ = " << thresh_ <<
", items.size() = " << items.size();
}
auto const iter = pos_.find(id_);
if (! consensus_ &&
iter->second.items == items)
return false;
iter->second.items = items;
return true;
}
};
//--------------------------------------------------------------------------
class Peer
{
private:
//beast::aged_unordered_map<
public:
NodeKey id_;
std::set<NodeKey> unl_;
Config const& config_;
boost::optional<Round> round_;
std::chrono::milliseconds delay_;
Network& net_;
Peer (int id, Config const& config,
Network& net)
: id_ (id)
, config_ (config)
, delay_ (std::chrono::milliseconds(
net.rand(5, 50)))
, net_ (net)
{
auto const size = 1 + net_.rand(
nUNLMin, nUNLMax + 1);
unl_.insert(id_); // self
while(unl_.size() < size)
unl_.insert(net_.rand(nPeer));
}
// Called to begin the round
void
start()
{
{
ItemSet pos;
for(int i = 0; i < nPos; ++i)
if (net_.rand(2))
pos.insert(i);
round_.emplace(id_, std::move(pos),
net_.now(), net_.log);
}
broadcast(round_->posMsg());
using namespace std::chrono;
net_.timer(milliseconds(
700 + net_.rand(700)),
[=]() { timer(); });
}
void
receive (Peer& from,
std::shared_ptr<PosMsg const> const& m)
{
if (round_->receive(*m))
relay(from, m);
else
++net_.dup;
}
void
timer()
{
if (round_->update(unl_, net_.now()))
broadcast(round_->posMsg());
if (round_->consensus_)
return;
using namespace std::chrono;
net_.timer(milliseconds(700),
[=]() { timer(); });
}
//----------------------------------------------------------------------
// Send a message to this peer
template <class Message>
void
send (Peer& from,
std::shared_ptr<Message const> const& m)
{
++net_.sent;
net_.send (from, *this,
[&, m]() { receive(from, m); });
}
// Broadcast a message to all links
template <class Message>
void
broadcast (std::shared_ptr<
Message const> const& m)
{
for(auto& link : net_.links(*this))
link.to.send(*this, m);
}
// Relay a message to all links
template <class Message>
void
relay (Peer& from,
std::shared_ptr<Message const> const& m)
{
for(auto& link : net_.links(*this))
if (&link.to != &from)
link.to.send(*this, m);
}
};
//--------------------------------------------------------------------------
struct Network : BasicNetwork<Peer>
{
std::size_t dup = 0; // total dup
std::size_t sent = 0; // total sent
std::vector<Peer> pv;
Log& log;
Network (std::size_t seed,
Config const& config, Log& log_)
: log (log_)
{
this->rng.seed(seed);
using namespace std;
using namespace std::chrono;
pv.reserve(nPeer);
for(std::size_t id = 0; id < nPeer; ++id)
pv.emplace_back(id, config, *this);
for(auto& peer : pv)
for(int i = 0; i < nDegree; ++i)
connect_one(peer);
}
// Add one random connection
void
connect_one(Peer& from)
{
using namespace std::chrono;
auto const delay = from.delay_ +
milliseconds(this->rand(5, 200));
for(;;)
if (connect(from,
pv[this->rand(pv.size())], delay))
break;
}
void
report (std::size_t n,
std::chrono::milliseconds ms, Log& log)
{
std::size_t failed = 0;
std::size_t consensus = 0;
std::vector<std::size_t> hist;
hist.resize(nPos);
for(auto const& p : pv)
{
hist_accum(hist, p.round_->items());
if (p.round_->consensus_)
++consensus;
if (p.round_->failed_)
++failed;
}
log <<
((n > 0) ? "#" + std::to_string(n) + " " : "") <<
seq_string(hist, 3) << " " <<
"consensus: " << consensus - failed << " in " <<
ms.count() << "ms, " <<
"sent: " << sent << ", " <<
"dup: " << dup;
}
// Execute a round of consensus
void
round (std::size_t n)
{
using namespace std::chrono;
for(int i = 0; i < nPeer; ++i)
pv[i].start();
auto const t0 = this->now();
#if 0
do
{
report(0, duration_cast<
milliseconds>(now() - t0), log);
}
while (this->step_for(milliseconds(50)));
#else
this->step();
#endif
report(n, duration_cast<
milliseconds>(this->now() - t0), log);
}
};
static
void
run (Log& log)
{
log << "Sim2" << ":";
Config config;
for(auto i = 1; i <= nTrial; ++i)
{
//log << "Trial " << i;
Network net(i, config, log);
for(auto j = 1; j <= nRound; ++j)
net.round(i);
//log << "\n";
}
}
};
} // test
} // ripple
#endif
/*
Try limiting threshold to 80
Try slower increase of threshold
Increase UNL sizes
*/

512
src/ripple/unl/tests/Sim3.h Normal file
View File

@@ -0,0 +1,512 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_UNL_SIM3_H_INCLUDED
#define RIPPLE_UNL_SIM3_H_INCLUDED
#include <ripple/unl/tests/BasicNetwork.h>
#include <beast/container/aged_unordered_map.h>
#include <boost/container/flat_set.hpp>
#include <boost/optional.hpp>
#include <algorithm>
#include <memory>
#include <sstream>
#include <unordered_map>
#include <vector>
namespace ripple {
namespace test {
template <class Log>
struct Sim3
{
struct Config
{
int unl;
int peers = 100;
int trial = 100;
};
static int const nDegree = 10; // outdegree
static int const nItem = 10; // number of items
static int const nUpdateMS = 700;
using NodeKey = int; // identifies a consensus participant
using ItemKey = int; // identifies a ballot item
using ItemSet = boost::container::flat_set<ItemKey>;
using clock_type = std::chrono::system_clock;
using millis = std::chrono::milliseconds;
class Network;
struct PosMsg
{
NodeKey id;
std::size_t seq;
ItemSet items;
bool last;
};
// A round of consensus.
// Each round consists of a series of votes,
// terminating when a supermajority is reached.
class Round
{
private:
int thresh_ = 50;
public:
struct Pos
{
ItemSet items;
bool last = false;
std::size_t seq = 0;
};
NodeKey id_;
bool failed_ = false;
bool consensus_ = false;
std::unordered_map<NodeKey, Pos> pos_;
std::size_t count_ = 0;
clock_type::time_point t0_;
Log& log_;
public:
// Create a new round with initial position
Round (NodeKey id, ItemSet&& pos,
clock_type::time_point now, Log& log)
: id_ (id)
, t0_ (now)
, log_ (log)
{
using namespace std;
pos_[id].items = std::move(pos);
}
std::shared_ptr<PosMsg const>
posMsg()
{
auto const iter = pos_.find(id_);
auto m = std::make_shared<PosMsg>();
m->id = id_;
m->seq = ++iter->second.seq;
m->items = iter->second.items;
m->last = consensus_;
return m;
}
ItemSet const&
items() const
{
return pos_.find(id_)->second.items;
}
// Update a peer's position
// Return `true` if we should relay
bool
receive (PosMsg const& m)
{
if (m.id == id_)
return false;
auto& pos = pos_[m.id];
if (m.seq <= pos.seq)
return false;
pos.seq = m.seq;
pos.last = m.last;
pos.items = m.items;
return true;
}
// Update our position
// Returns `true` if we changed our position
template <class UNL>
bool
update (UNL const& unl,
clock_type::time_point const& now)
{
if (consensus_)
return false;
// count votes per item from unl
boost::container::flat_map<
ItemKey, std::size_t> votes;
for(auto const& pos : pos_)
{
if (! unl.count(pos.first))
continue;
for(auto const& item : pos.second.items)
{
auto const result =
votes.emplace(item, 1);
if (! result.second)
++result.first->second;
}
}
// calculate our new position
ItemSet items;
{
auto const needed =
(thresh_ * unl.size() + 50) / 100;
for(auto const& v : votes)
if (v.second >= needed)
items.insert(v.first);
#if 1
thresh_ += 5;
#endif
#if 0
// This causes occasional byzantine
// failure in a large number of nodes
if (thresh_ > 80)
thresh_ = 80;
#endif
}
// see if we reached a consensus
std::size_t most = 0;
std::size_t agree = 0;
for(auto const& pos : pos_)
{
if (! unl.count(pos.first))
continue;
if (pos.first == id_ ||
pos.second.items == items)
++agree;
else if (! pos.second.last)
++most;
}
{
auto const needed =
(80 * unl.size() + 50) / 100;
if (agree >= needed)
{
consensus_ = true;
}
else if (agree + most < needed)
{
failed_ = true;
consensus_ = true;
}
}
auto const iter = pos_.find(id_);
if (! consensus_ &&
iter->second.items == items)
return false;
iter->second.items = items;
return true;
}
};
//--------------------------------------------------------------------------
class Peer
{
private:
//beast::aged_unordered_map<
public:
NodeKey id_;
std::set<NodeKey> unl_;
Config const& config_;
boost::optional<Round> round_;
millis delay_;
Network& net_;
Peer (int id, Config const& config,
Network& net)
: id_ (id)
, config_ (config)
, delay_ (millis(
net.rand(5, 50)))
, net_ (net)
{
unl_.insert(id_); // self
while(unl_.size() <= config_.unl)
unl_.insert(net_.rand(config_.peers));
}
// Called to begin the round
void
start()
{
{
ItemSet pos;
for(int i = 0; i < nItem; ++i)
if (net_.rand(2))
pos.insert(i);
round_.emplace(id_, std::move(pos),
net_.now(), net_.log);
}
broadcast(round_->posMsg());
using namespace std::chrono;
net_.timer(milliseconds(
nUpdateMS + net_.rand(nUpdateMS)),
[=]() { timer(); });
}
void
receive (Peer& from,
std::shared_ptr<PosMsg const> const& m)
{
if (round_->receive(*m))
relay(from, m);
else
++net_.dup;
}
void
timer()
{
if (round_->update(unl_, net_.now()))
broadcast(round_->posMsg());
if (round_->consensus_)
return;
using namespace std::chrono;
net_.timer(milliseconds(nUpdateMS),
[=]() { timer(); });
}
//----------------------------------------------------------------------
// Send a message to this peer
template <class Message>
void
send (Peer& from,
std::shared_ptr<Message const> const& m)
{
++net_.sent;
net_.send (from, *this,
[&, m]() { receive(from, m); });
}
// Broadcast a message to all links
template <class Message>
void
broadcast (std::shared_ptr<
Message const> const& m)
{
for(auto& link : net_.links(*this))
link.to.send(*this, m);
}
// Relay a message to all links
template <class Message>
void
relay (Peer& from,
std::shared_ptr<Message const> const& m)
{
for(auto& link : net_.links(*this))
if (&link.to != &from)
link.to.send(*this, m);
}
};
//--------------------------------------------------------------------------
// The result of one round
struct Result
{
std::size_t elapsed;
std::size_t failure = 0;
std::size_t consensus = 0;
std::set<ItemSet> sets;
};
// The results of several rounds
struct Results
{
std::size_t rounds = 0;
std::size_t perfect = 0;
std::vector<std::size_t> elapsed;
std::vector<std::size_t> failure;
std::vector<std::size_t> consensus;
void
aggregate (Result const& result)
{
++rounds;
perfect += result.sets.size() == 1;
elapsed.push_back(result.elapsed);
failure.push_back(result.failure);
consensus.push_back(result.consensus);
}
};
struct Report
{
std::size_t perfect;
std::size_t elapsed_min;
std::size_t elapsed_max;
Report (Results& results, Config const& config)
{
perfect = results.perfect;
#if 0
std::sort(
results.elapsed.begin(), results.elapsed.end());
std::sort(
results.consensus.begin(), results.consensus.end(),
std::greater<std::size_t>{});
#endif
elapsed_min = results.elapsed.front();
elapsed_max = results.elapsed.back();
}
};
class Network : public BasicNetwork<Peer>
{
private:
Config const& config_;
public:
std::size_t dup = 0; // total dup
std::size_t sent = 0; // total sent
std::vector<Peer> pv;
Log& log;
Network (std::size_t seed,
Config const& config, Log& log_)
: config_ (config)
, log (log_)
{
this->rng.seed(seed);
using namespace std;
using namespace std::chrono;
pv.reserve(config.peers);
for(std::size_t id = 0; id < config_.peers; ++id)
pv.emplace_back(id, config, *this);
for(auto& peer : pv)
for(int i = 0; i < nDegree; ++i)
connect_one(peer);
}
// Add one random connection
void
connect_one(Peer& from)
{
using namespace std::chrono;
auto const delay = from.delay_ +
milliseconds(this->rand(5, 200));
for(;;)
if (connect(from,
pv[this->rand(pv.size())], delay))
break;
}
// Execute one round of consensus
Result
run()
{
Result result;
using namespace std::chrono;
for(int i = 0; i < config_.peers; ++i)
pv[i].start();
auto const t0 = this->now();
this->step();
result.elapsed = duration_cast<
millis>(this->now() - t0).count();
for(auto const& p : pv)
{
if (p.round_->failed_)
++result.failure;
if (p.round_->consensus_)
{
++result.consensus;
result.sets.insert(p.round_->items());
}
}
return result;
}
};
static
void
report (Log& log, Result const& result,
Config const& config)
{
log <<
result.elapsed << "\t" <<
result.failure << "\t" <<
result.consensus << "\t" <<
result.sets.size();
;
}
static
void
report (Log& log, Report const& report,
Config const& config)
{
log <<
report.perfect << "\t" <<
report.elapsed_min << "\t" <<
report.elapsed_max << "\t" <<
config.peers << "\t" <<
config.unl << "\t" <<
config.trial
;
}
static
void
run (Log& log)
{
log << "Sim3" << ":";
#if 1
log <<
"perfect\t" <<
"elapsed_min\t" <<
"elapsed_max\t" <<
"peers\t" <<
"unl\t" <<
"trial\t"
;
#else
log <<
"elapsed\t" <<
"failure\t" <<
"consensus\t" <<
"positions\t"
;
#endif
for (int unl = 40; unl > 5; --unl)
{
Results results;
Config config;
config.unl = unl;
for(auto i = 1; i <= config.trial; ++i)
{
Network net(i, config, log);
//report(log, net.run(), config);
results.aggregate(net.run());
}
report(log, Report(results, config), config);
}
}
};
} // test
} // ripple
#endif
/*
Try limiting threshold to 80
Try slower increase of threshold
Increase UNL sizes
*/

607
src/ripple/unl/tests/Sim4.h Normal file
View File

@@ -0,0 +1,607 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_UNL_SIM4_H_INCLUDED
#define RIPPLE_UNL_SIM4_H_INCLUDED
#include <ripple/unl/tests/BasicNetwork.h>
#include <beast/container/aged_unordered_map.h>
#include <boost/container/flat_set.hpp>
#include <boost/optional.hpp>
#include <algorithm>
#include <memory>
#include <sstream>
#include <unordered_set>
#include <unordered_map>
#include <vector>
namespace ripple {
namespace test {
template <class Log>
struct Sim4
{
struct Config
{
int unl = 20;
int peers = 100;
int trials = 100;
int rounds = 1;
};
static int const nDegree = 10; // outdegree
static int const nItem = 10; // number of items
enum
{
nUpdateMS = 700
};
using NodeKey = int; // identifies a consensus participant
using ItemKey = int; // identifies a ballot item
using ItemSet = boost::container::flat_set<ItemKey>;
using clock_type = std::chrono::system_clock;
using millis = std::chrono::milliseconds;
struct Network;
struct TxMsg
{
ItemKey id;
};
struct PosMsg
{
NodeKey id;
std::size_t ord;
std::size_t seq;
ItemSet items;
bool last;
};
// A pool of items
// This is the equivalent of the "open ledger"
class Pool
{
private:
ItemSet items_;
public:
// Insert an item into the pool
void
insert (ItemKey id)
{
items_.insert(id);
}
// Returns the items in the pool
ItemSet const&
items() const
{
return items_;
}
};
// A round of consensus.
// Each round consists of a series of votes,
// terminating when a supermajority is reached.
struct Round
{
struct Pos
{
ItemSet items;
bool last = false;
std::size_t seq = 0;
};
int id_;
Log& log_;
std::size_t ord_;
clock_type::time_point t0_;
int thresh_ = 50;
bool failed_ = false;
bool consensus_ = false;
std::size_t count_ = 0;
std::unordered_map<NodeKey, Pos> pos_;
// Create a new round with initial position
Round (NodeKey id, std::size_t ord,
ItemSet const& items,
clock_type::time_point now,
Log& log)
: id_ (id)
, log_ (log)
, ord_ (ord)
, t0_ (now)
{
using namespace std;
pos_[id].items = items;
}
std::shared_ptr<PosMsg const>
posMsg()
{
auto const iter = pos_.find(id_);
auto m = std::make_shared<PosMsg>();
m->id = id_;
m->seq = ++iter->second.seq;
m->items = iter->second.items;
m->last = consensus_;
return m;
}
ItemSet const&
items() const
{
return pos_.find(id_)->second.items;
}
// Update a peer's position
// Return `true` if we should relay
bool
receive (PosMsg const& m)
{
if (m.id == id_)
return false;
using namespace std;
auto& pos = pos_[m.id];
if (m.seq <= pos.seq)
return false;
pos.seq = m.seq;
pos.last = m.last;
pos.items = m.items;
return true;
}
// Update our position
// Returns `true` if we changed our position
template <class UNL>
bool
update (UNL const& unl,
clock_type::time_point const& now)
{
if (consensus_)
return false;
// count votes per item from unl
boost::container::flat_map<
ItemKey, std::size_t> votes;
for(auto const& pos : pos_)
{
if (! unl.count(pos.first))
continue;
for(auto const& item : pos.second.items)
{
auto const result =
votes.emplace(item, 1);
if (! result.second)
++result.first->second;
}
}
// calculate our new position
ItemSet items;
{
auto const needed =
(thresh_ * unl.size() + 50) / 100;
for(auto const& v : votes)
if (v.second >= needed)
items.insert(v.first);
thresh_ += 5;
}
// see if we reached a consensus
std::size_t most = 0;
std::size_t agree = 0;
for(auto const& pos : pos_)
{
if (! unl.count(pos.first))
continue;
if (pos.first == id_ ||
pos.second.items == items)
++agree;
else if (! pos.second.last)
++most;
}
{
auto const needed =
(80 * unl.size() + 50) / 100;
if (agree >= needed)
{
consensus_ = true;
}
else if (agree + most < needed)
{
failed_ = true;
consensus_ = true;
}
}
auto const iter = pos_.find(id_);
if (! consensus_ &&
iter->second.items == items)
return false;
iter->second.items = items;
return true;
}
};
//--------------------------------------------------------------------------
struct Peer
{
//beast::aged_unordered_map<
NodeKey id_;
std::size_t ord_ = 0;
std::set<NodeKey> unl_;
Config const& config_;
boost::optional<Round> round_;
millis delay_;
Network& net_;
Pool pool_;
std::unordered_map<ItemKey,
boost::container::flat_set<Peer*>> item_tab_;
Peer (int id, Config const& config,
Network& net)
: id_ (id)
, config_ (config)
, delay_ (millis(
net.rand(5, 50)))
, net_ (net)
{
unl_.insert(id_); // self
while(unl_.size() <= config_.unl)
unl_.insert(net_.rand(config_.peers));
}
void
init()
{
net_.timer(millis(2000),
[&]() { on_close(); });
}
// Broadcast a new item
void
inject (ItemKey id)
{
item_tab_[id].insert(this);
TxMsg m;
m.id = id;
broadcast(m);
}
// Closes the pool and starts the round
void
on_close()
{
round_.emplace(id_, ++ord_, pool_.items(),
net_.now(), net_.log);
broadcast(round_->posMsg());
net_.timer(millis(
nUpdateMS + net_.rand(nUpdateMS)),
[=]() { on_update(); });
}
// Updates our position during the round
void
on_update()
{
if (round_->update(unl_, net_.now()))
broadcast(round_->posMsg());
if (round_->consensus_)
return;
using namespace std::chrono;
net_.timer(millis(nUpdateMS),
[=]() { on_update(); });
}
// Called when a transaction is received
void
receive (Peer& from, TxMsg const& m)
{
auto& seen = item_tab_[m.id];
if(! seen.empty())
{
++net_.dup;
return;
}
seen.insert(&from);
net_.timer(net_.now() +
millis(net_.rand(200, 600)),
[&, m]()
{
pool_.insert(m.id);
for(auto& link : net_.links(*this))
if (seen.count(&link.to) == 0)
link.to.send(*this, m);
});
}
// Called when a position is received
void
receive (Peer& from,
std::shared_ptr<PosMsg const> const& m)
{
if (round_->receive(*m))
relay(from, m);
else
++net_.dup;
}
//----------------------------------------------------------------------
// Send a message to this peer
template <class Message>
void
send (Peer& from, Message const& m)
{
++net_.sent;
net_.send (from, *this,
[&, m]() { receive(from, m); });
}
// Send a message to this peer
template <class Message>
void
send (Peer& from,
std::shared_ptr<Message const> const& m)
{
++net_.sent;
net_.send (from, *this,
[&, m]() { receive(from, m); });
}
// Broadcast a message to all links
template <class Message>
void
broadcast (std::shared_ptr<
Message const> const& m)
{
for(auto& link : net_.links(*this))
link.to.send(*this, m);
}
// Broadcast a message to all links
template <class Message>
void
broadcast (Message const& m)
{
for(auto& link : net_.links(*this))
link.to.send(*this, m);
}
// Relay a message to all links
template <class Message>
void
relay (Peer& from,
std::shared_ptr<Message const> const& m)
{
for(auto& link : net_.links(*this))
if (&link.to != &from)
link.to.send(*this, m);
}
// Relay a message to all links
template <class Message>
void
relay (Peer& from, Message const& m)
{
for(auto& link : net_.links(*this))
if (&link.to != &from)
link.to.send(*this, m);
}
};
//--------------------------------------------------------------------------
// The result of one round
struct Result
{
std::size_t elapsed;
std::size_t failure = 0;
std::size_t consensus = 0;
std::set<ItemSet> sets;
};
// The results of several rounds
struct Results
{
std::size_t rounds = 0;
std::size_t perfect = 0;
std::vector<std::size_t> elapsed;
std::vector<std::size_t> failure;
std::vector<std::size_t> consensus;
void
aggregate (Result const& result)
{
++rounds;
perfect += result.sets.size() == 1;
elapsed.push_back(result.elapsed);
failure.push_back(result.failure);
consensus.push_back(result.consensus);
}
};
struct Report
{
std::size_t perfect;
std::size_t elapsed_min;
std::size_t elapsed_max;
Report (Results& results, Config const& config)
{
perfect = results.perfect;
#if 0
std::sort(
results.elapsed.begin(), results.elapsed.end());
std::sort(
results.consensus.begin(), results.consensus.end(),
std::greater<std::size_t>{});
#endif
elapsed_min = results.elapsed.front();
elapsed_max = results.elapsed.back();
}
};
class Network : public BasicNetwork<Peer>
{
private:
Config const& config_;
ItemKey seq_ = 0;
public:
std::size_t dup = 0; // total dup
std::size_t sent = 0; // total sent
std::vector<Peer> pv;
Log& log;
Network (std::size_t seed,
Config const& config, Log& log_)
: config_ (config)
, log (log_)
{
this->rng().seed(seed);
using namespace std;
using namespace std::chrono;
pv.reserve(config.peers);
for(std::size_t id = 0; id < config_.peers; ++id)
pv.emplace_back(id, config, *this);
for(auto& peer : pv)
for(int i = 0; i < nDegree; ++i)
connect_one(peer);
}
// Add one random connection
void
connect_one(Peer& from)
{
using namespace std::chrono;
auto const delay = from.delay_ +
milliseconds(this->rand(5, 200));
for(;;)
if (this->connect(from,
pv[this->rand(pv.size())], delay))
break;
}
void
report (std::size_t n,
millis ms, Log& log)
{
std::size_t failed = 0;
std::size_t consensus = 0;
std::set<ItemSet> unique;
for(auto const& p : pv)
{
if (! p.round_)
continue;
unique.insert(p.round_->items());
if (p.round_->consensus_)
++consensus;
if (p.round_->failed_)
++failed;
}
log <<
n << "\t" <<
unique.size() << "\t" <<
consensus << "\t" <<
failed << "\t" <<
ms.count() << "ms\t" <<
sent << "\t" <<
dup;
}
// Inject a random item
void
inject()
{
pv[this->rand(pv.size())].inject(++seq_);
}
void
on_timer()
{
inject();
if(this->now().time_since_epoch() <=
std::chrono::seconds(4))
this->timer(millis(250),
[&]() { on_timer(); });
}
// Execute a round of consensus
void
run (std::size_t n)
{
using namespace std::chrono;
for(int i = 0; i < config_.peers; ++i)
pv[i].init();
inject();
this->timer(millis(250),
[&]() { on_timer(); });
auto const t0 = this->now();
#if 0
do
{
report(n, duration_cast<
milliseconds>(now() - t0), log);
}
while (this->step_for(milliseconds(50)));
#else
this->step();
#endif
report(n, duration_cast<
milliseconds>(this->now() - t0), log);
}
};
static
void
run (Log& log)
{
log << "Sim4" << ":";
log <<
"n\t" <<
"unique\t" <<
"consensus\t" <<
"failed\t" <<
"time\t" <<
"sent\t" <<
"dup";
Config config;
for(auto i = 1; i <= config.trials; ++i)
{
Network net(i, config, log);
for(auto j = 1; j <= config.rounds; ++j)
net.run(i);
}
}
};
} // test
} // ripple
#endif
/*
Try limiting threshold to 80
Try slower increase of threshold
Increase UNL sizes
*/

View File

@@ -19,6 +19,7 @@
#include <BeastConfig.h>
#include <ripple/unl/tests/BasicNetwork.h>
#include <ripple/unl/tests/metrics.h>
#include <beast/unit_test/suite.h>
#include <boost/container/flat_set.hpp>
#include <algorithm>
@@ -43,29 +44,8 @@ struct Config
static int const nTrustedUplinks = 3; // # of uplinks for trusted
static int const nAllowedUplinks = 1; // # of uplinks for allowed
struct Peer
{
struct ValMsg
{
int id;
int seq;
ValMsg (int id_, int seq_)
: id(id_), seq(seq_) { }
};
struct SquelchMsg
{
int id;
SquelchMsg (int id_)
: id(id_) { }
};
struct UnsquelchMsg
{
int id;
UnsquelchMsg (int id_)
: id(id_) { }
};
class Network;
class Peer;
struct Policy
{
@@ -79,10 +59,10 @@ struct Config
std::unordered_map<int, Slot> slots;
// Returns a slot or nullptr
template <class Peers>
template <class Links>
Slot*
get (int id, Peer& from,
Peers const& peers)
Links const& links)
{
auto iter = slots.find(id);
if (iter != slots.end())
@@ -93,9 +73,9 @@ struct Config
if (id > nTrusted)
allowed.insert(id);
auto& slot = slots[id];
for(auto& peer : peers)
if (&peer != &from)
slot.down.insert(&peer);
for(auto& link : links)
if (&link.to != &from)
slot.down.insert(&link.to);
return &slot;
}
@@ -145,153 +125,170 @@ struct Config
}
};
//--------------------------------------------------------------------------
class Peer
{
private:
Network& net_;
public:
struct ValMsg
{
int id;
int seq;
ValMsg (int id_, int seq_)
: id(id_), seq(seq_) { }
};
struct SquelchMsg
{
int id;
SquelchMsg (int id_)
: id(id_) { }
};
struct UnsquelchMsg
{
int id;
UnsquelchMsg (int id_)
: id(id_) { }
};
int id = 0; // validator id or 0
int seq = 0;
Policy policy;
std::map<int, int> seen;
std::chrono::milliseconds delay;
Peer (int id_, Network& net)
: net_ (net)
, id (id_)
, delay (std::chrono::milliseconds(
net.rand(5, 50)))
{
}
// Called when a peer disconnects
template <class Net>
void
disconnect (Net& net, Peer& from)
disconnect (Peer& from)
{
std::vector<int> v;
for(auto const& item : policy.slots)
if (item.second.up.count(&from) > 0)
v.push_back(item.first);
for(auto id : v)
for(auto& peer : net.peers(*this))
peer.send(net, *this,
for(auto& link : net_.links(*this))
link.to.send(*this,
UnsquelchMsg{ id });
}
// Send a message to this peer
template <class Net, class Message>
void
send (Net& net, Peer& from, Message&& m)
{
++net.sent;
using namespace std::chrono;
net.send (from, *this,
std::forward<Message>(m));
}
// Relay a message to all links
template <class Net, class Message>
void
relay (Net& net, Message&& m)
{
for(auto& peer : net.peers(*this))
peer.send(net, *this,
std::forward<Message>(m));
}
// Broadcast a validation
template <class Net>
void
broadcast (Net& net)
broadcast()
{
relay(net, ValMsg{ id, ++seq });
broadcast(ValMsg{ id, ++seq });
}
// Receive a validation
template <class Net>
void
receive (Net& net, Peer& from, ValMsg const& m)
receive (Peer& from, ValMsg const& m)
{
if (m.id == id)
{
++nth(net.dup, m.id - 1);
return from.send(net, *this,
++nth(net_.dup, m.id - 1);
return from.send(*this,
SquelchMsg(m.id));
}
auto slot = policy.get(
m.id, from, net.peers(*this));
m.id, from, net_.links(*this));
if (! slot || ! policy.uplink(
m.id, from, *slot))
return from.send(net, *this,
return from.send(*this,
SquelchMsg(m.id));
auto& last = seen[m.id];
if (last >= m.seq)
{
++nth(net.dup, m.id - 1);
++nth(net_.dup, m.id - 1);
return;
}
last = m.seq;
policy.heard(m.id, m.seq);
++nth(net.heard, m.id - 1);
++nth(net_.heard, m.id - 1);
for(auto peer : slot->down)
peer->send(net, *this, m);
peer->send(*this, m);
}
// Receive a squelch message
template <class Net>
void
receive (Net& net, Peer& from, SquelchMsg const& m)
receive (Peer& from, SquelchMsg const& m)
{
policy.squelch (m.id, from);
}
// Receive an unsquelch message
template <class Net>
void
receive (Net& net, Peer& from, UnsquelchMsg const& m)
receive (Peer& from, UnsquelchMsg const& m)
{
policy.unsquelch (m.id, from);
}
//----------------------------------------------------------------------
// Send a message to this peer
template <class Message>
void
send (Peer& from, Message&& m)
{
++net_.sent;
using namespace std::chrono;
net_.send (from, *this,
[&, m]() { receive(from, m); });
}
// Broadcast a message to all links
template <class Message>
void
broadcast (Message const& m)
{
for(auto& link : net_.links(*this))
link.to.send(*this, m);
}
};
//--------------------------------------------------------------------------
struct Network
: BasicNetwork<Peer, Network>
class Network : public BasicNetwork<Peer>
{
public:
std::size_t sent = 0;
std::vector<Peer> pv;
std::mt19937_64 rng;
std::vector<std::size_t> heard;
std::vector<std::size_t> dup;
Network()
{
pv.resize(nPeer);
for (int i = 0; i < nValidator; ++i)
pv[i].id = i + 1;
for (auto& peer : pv)
{
using namespace std;
using namespace std::chrono;
peer.delay =
milliseconds(rand(5, 45));
pv.reserve(nPeer);
for(std::size_t id = 1; id <=nPeer; ++id)
pv.emplace_back(
id <= nValidator ? id : 0, *this);
for (auto& peer : pv)
for (auto i = 0; i < nDegree; ++i)
connect_one(peer);
}
}
// Return int in range [0, n)
std::size_t
rand (std::size_t n)
{
return std::uniform_int_distribution<
std::size_t>(0, n - 1)(rng);
}
// Return int in range [base, base+n)
std::size_t
rand (std::size_t base, std::size_t n)
{
return std::uniform_int_distribution<
std::size_t>(base, base + n - 1)(rng);
}
// Add one random connection
void
connect_one (Peer& peer)
connect_one(Peer& from)
{
using namespace std::chrono;
auto const delay = from.delay +
milliseconds(rand(5, 200));
for(;;)
if (connect(peer, pv[rand(pv.size())],
peer.delay + milliseconds(rand(5, 200))))
if (connect(from,
pv[rand(pv.size())], delay))
break;
}
@@ -303,9 +300,8 @@ struct Config
auto const link = links(peer)[
rand(links(peer).size())];
link.disconnect();
link.to.disconnect(*this, peer);
peer.disconnect(*this, link.to);
link.to.disconnect(*this, peer);
link.to.disconnect(peer);
peer.disconnect(link.to);
// preserve outbound counts, otherwise
// the outdegree invariant will break.
if (link.inbound)
@@ -326,14 +322,14 @@ struct Config
// Iterate the network
template <class Log>
void
step (Log& log)
run (Log& log)
{
for (int i = nStep; i--;)
{
churn();
for(int j = 0; j < nValidator; ++j)
pv[j].broadcast(*this);
run();
pv[j].broadcast();
step();
}
}
};
@@ -351,12 +347,12 @@ public:
using Peer = Config::Peer;
using Network = Config::Network;
Network net;
net.step(log);
net.run(log);
std::size_t reach = 0;
std::vector<int> dist;
std::vector<int> degree;
net.bfs(net.pv[0],
[&](Network& net, std::size_t d, Peer& peer)
[&](std::size_t d, Peer& peer)
{
++reach;
++nth(dist, d);

View File

@@ -0,0 +1,100 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_SIM_METRICS_H_INCLUDED
#define RIPPLE_SIM_METRICS_H_INCLUDED
#include <iomanip>
#include <sstream>
#include <string>
namespace ripple {
namespace test {
template <class FwdRange>
std::string
seq_string (FwdRange const& r, int width = 0)
{
std::stringstream ss;
auto iter = std::begin(r);
if (iter == std::end(r))
return ss.str();
ss << std::setw(width) << *iter++;
while(iter != std::end(r))
ss << ", " <<
std::setw(width) << *iter++;
return ss.str();
}
template <class FwdRange>
typename FwdRange::value_type
seq_sum (FwdRange const& r)
{
typename FwdRange::value_type sum = 0;
for (auto const& n : r)
sum += n;
return sum;
}
template <class RanRange>
double
diameter (RanRange const& r)
{
if (r.empty())
return 0;
if (r.size() == 1)
return r.front();
auto h0 = *(r.end() - 2);
auto h1 = r.back();
return (r.size() - 2) +
double(h1) / (h0 + h1);
}
template <class Container>
typename Container::value_type&
nth (Container& c, std::size_t n)
{
c.resize(std::max(c.size(), n + 1));
return c[n];
}
template <class Hist, class FwdRange>
void
hist_accum (Hist& h, FwdRange const& r)
{
for(auto const& v : r)
++nth(h, v);
}
//------------------------------------------------------------------------------
template <class = void>
inline
std::string
pad (std::string s, std::size_t n)
{
if (s.size() < n)
s.insert(0, n - s.size(), ' ');
return s;
}
} // test
} // ripple
#endif

View File

@@ -0,0 +1,382 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_SIM_QALLOC_H_INCLUDED
#define RIPPLE_SIM_QALLOC_H_INCLUDED
#include <boost/intrusive/list.hpp>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <limits>
#include <memory>
#include <new>
#include <sstream>
namespace ripple {
namespace test {
namespace detail {
template <class = void>
class qalloc_impl
{
private:
class block
{
private:
std::size_t count_ = 0;
std::size_t bytes_;
std::size_t remain_;
std::uint8_t* free_;
public:
block* next;
block (block const&) = delete;
block& operator= (block const&) = delete;
explicit
block (std::size_t bytes);
void*
allocate (std::size_t bytes, std::size_t align);
bool
deallocate();
};
block* used_ = nullptr;
block* free_ = nullptr;
public:
enum
{
block_size = 256 * 1024
};
qalloc_impl() = default;
qalloc_impl (qalloc_impl const&) = delete;
qalloc_impl& operator= (qalloc_impl const&) = delete;
~qalloc_impl();
void*
allocate (std::size_t bytes, std::size_t align);
void
deallocate (void* p);
};
} // detail
template <class T>
class qalloc_type
{
private:
template <class U>
friend class qalloc_type;
std::shared_ptr<
detail::qalloc_impl<>> impl_;
public:
using value_type = T;
using pointer = T*;
using const_pointer = T const*;
using reference = typename
std::add_lvalue_reference<T>::type;
using const_reference = typename
std::add_lvalue_reference<T const>::type;
template <class U>
struct rebind
{
using other = qalloc_type<U>;
};
qalloc_type (qalloc_type const&) = default;
qalloc_type& operator= (qalloc_type const&) = default;
#ifndef _MSC_VER
qalloc_type (qalloc_type&& other) = default;
qalloc_type& operator= (qalloc_type&&) = default;
#endif
qalloc_type();
template <class U>
qalloc_type (qalloc_type<U> const& u);
template <class U>
U*
alloc (std::size_t n);
template <class U>
void
dealloc (U* p, std::size_t n);
T*
allocate (std::size_t n);
void
deallocate (T* p, std::size_t n);
void
destroy (T* t);
template <class U>
bool
operator== (qalloc_type<U> const& u);
template <class U>
bool
operator!= (qalloc_type<U> const& u);
};
/** Allocator optimized for delete in temporal order.
This allocator is optimized for the case where objects
are deleted in approximately the same order that they
were created.
Thread Safety:
May not be called concurrently.
*/
using qalloc = qalloc_type<int>;
//------------------------------------------------------------------------------
namespace detail {
template <class _>
qalloc_impl<_>::block::block (std::size_t bytes)
: bytes_ (bytes - sizeof(*this))
, remain_ (bytes_)
, free_ (reinterpret_cast<
std::uint8_t*>(this + 1))
{
}
template <class _>
void*
qalloc_impl<_>::block::allocate(
std::size_t bytes, std::size_t align)
{
align = std::max(align,
std::alignment_of<block*>::value);
auto pad = [](void const* p, std::size_t align)
{
auto const i = reinterpret_cast<
std::uintptr_t>(p);
return (align - (i % align)) % align;
};
auto const n0 =
pad(free_ + sizeof(block*), align);
auto const n1 =
n0 + sizeof(block*) + bytes;
if (remain_ < n1)
return nullptr;
auto p = reinterpret_cast<block**>(
free_ + n0 + sizeof(block*));
assert(pad(p - 1,
std::alignment_of<block*>::value) == 0);
p[-1] = this;
++count_;
free_ += n1;
remain_ -= n1;
return p;
}
template <class _>
bool
qalloc_impl<_>::block::deallocate()
{
--count_;
if (count_ > 0)
return false;
remain_ = bytes_;
free_ = reinterpret_cast<
std::uint8_t*>(this + 1);
return true;
}
template <class _>
qalloc_impl<_>::~qalloc_impl()
{
if (used_)
{
used_->~block();
std::free(used_);
}
while (free_)
{
auto const next = free_->next;
free_->~block();
std::free(free_);
free_ = next;
}
}
template <class _>
void*
qalloc_impl<_>::allocate(
std::size_t bytes, std::size_t align)
{
if (used_)
{
auto const p =
used_->allocate(bytes, align);
if (p)
return p;
used_ = nullptr;
}
if (free_)
{
auto const p =
free_->allocate(bytes, align);
if (p)
{
used_ = free_;
free_ = free_->next;
return p;
}
}
std::size_t const adj_align =
std::max(align, std::alignment_of<block*>::value);
std::size_t const min_alloc = // align up
((sizeof (block) + sizeof (block*) + bytes) + (adj_align - 1)) &
~(adj_align - 1);
auto const n = std::max<std::size_t>(block_size, min_alloc);
block* const b =
new(std::malloc(n)) block(n);
if (! b)
throw std::bad_alloc();
used_ = b;
// VFALCO This has to succeed
return used_->allocate(bytes, align);
}
template <class _>
void
qalloc_impl<_>::deallocate (void* p)
{
auto const b =
reinterpret_cast<block**>(p)[-1];
if (b->deallocate())
{
if (used_ == b)
used_ = nullptr;
b->next = free_;
free_ = b;
}
}
} // detail
//------------------------------------------------------------------------------
template <class T>
qalloc_type<T>::qalloc_type()
: impl_ (std::make_shared<
detail::qalloc_impl<>>())
{
}
template <class T>
template <class U>
qalloc_type<T>::qalloc_type(
qalloc_type<U> const& u)
: impl_ (u.impl_)
{
}
template <class T>
template <class U>
U*
qalloc_type<T>::alloc (std::size_t n)
{
if (n > std::numeric_limits<
std::size_t>::max() / sizeof(U))
throw std::bad_alloc();
auto const bytes = n * sizeof(U);
return static_cast<U*>(
impl_->allocate(bytes,
std::alignment_of<U>::value));
}
template <class T>
template <class U>
inline
void
qalloc_type<T>::dealloc(
U* p, std::size_t n)
{
impl_->deallocate(p);
}
template <class T>
T*
qalloc_type<T>::allocate (std::size_t n)
{
return alloc<T>(n);
}
template <class T>
inline
void
qalloc_type<T>::deallocate(
T* p, std::size_t n)
{
dealloc(p, n);
}
template <class T>
inline
void
qalloc_type<T>::destroy (T* t)
{
t->~T();
}
template <class T>
template <class U>
inline
bool
qalloc_type<T>::operator==(
qalloc_type<U> const& u)
{
return impl_.get() == u.impl_.get();
}
template <class T>
template <class U>
inline
bool
qalloc_type<T>::operator!=(
qalloc_type<U> const& u)
{
return ! (*this == u);
}
} // test
} // ripple
#endif