Move SNTPClient to core:

* Refactor SNTPClient
* asio best practices
* Not derived from Stoppable
This commit is contained in:
Vinnie Falco
2015-07-15 13:33:45 -07:00
parent 8d1b169f5a
commit e82d774d32
8 changed files with 451 additions and 403 deletions

View File

@@ -2071,6 +2071,14 @@
<AdditionalIncludeDirectories Condition="'$(Configuration)|$(Platform)'=='debug.classic|x64'">..\..\src\soci\src\core;..\..\src\sqlite;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<AdditionalIncludeDirectories Condition="'$(Configuration)|$(Platform)'=='release.classic|x64'">..\..\src\soci\src\core;..\..\src\sqlite;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
</ClCompile>
<ClCompile Include="..\..\src\ripple\core\impl\SNTPClient.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug|x64'">True</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='release|x64'">True</ExcludedFromBuild>
<AdditionalIncludeDirectories Condition="'$(Configuration)|$(Platform)'=='debug.classic|x64'">..\..\src\soci\src\core;..\..\src\sqlite;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<AdditionalIncludeDirectories Condition="'$(Configuration)|$(Platform)'=='release.classic|x64'">..\..\src\soci\src\core;..\..\src\sqlite;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
</ClCompile>
<ClInclude Include="..\..\src\ripple\core\impl\SNTPClient.h">
</ClInclude>
<ClCompile Include="..\..\src\ripple\core\impl\SociDB.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug|x64'">True</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='release|x64'">True</ExcludedFromBuild>
@@ -2405,10 +2413,6 @@
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug|x64'">True</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='release|x64'">True</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\ripple\net\impl\SNTPClient.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug|x64'">True</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='release|x64'">True</ExcludedFromBuild>
</ClCompile>
<ClInclude Include="..\..\src\ripple\net\InfoSub.h">
</ClInclude>
<ClInclude Include="..\..\src\ripple\net\RPCCall.h">
@@ -2417,8 +2421,6 @@
</ClInclude>
<ClInclude Include="..\..\src\ripple\net\RPCSub.h">
</ClInclude>
<ClInclude Include="..\..\src\ripple\net\SNTPClient.h">
</ClInclude>
<ClInclude Include="..\..\src\ripple\nodestore\Backend.h">
</ClInclude>
<ClCompile Include="..\..\src\ripple\nodestore\backend\MemoryFactory.cpp">

View File

@@ -2805,6 +2805,12 @@
<ClCompile Include="..\..\src\ripple\core\impl\LoadMonitor.cpp">
<Filter>ripple\core\impl</Filter>
</ClCompile>
<ClCompile Include="..\..\src\ripple\core\impl\SNTPClient.cpp">
<Filter>ripple\core\impl</Filter>
</ClCompile>
<ClInclude Include="..\..\src\ripple\core\impl\SNTPClient.h">
<Filter>ripple\core\impl</Filter>
</ClInclude>
<ClCompile Include="..\..\src\ripple\core\impl\SociDB.cpp">
<Filter>ripple\core\impl</Filter>
</ClCompile>
@@ -3132,9 +3138,6 @@
<ClCompile Include="..\..\src\ripple\net\impl\RPCSub.cpp">
<Filter>ripple\net\impl</Filter>
</ClCompile>
<ClCompile Include="..\..\src\ripple\net\impl\SNTPClient.cpp">
<Filter>ripple\net\impl</Filter>
</ClCompile>
<ClInclude Include="..\..\src\ripple\net\InfoSub.h">
<Filter>ripple\net</Filter>
</ClInclude>
@@ -3147,9 +3150,6 @@
<ClInclude Include="..\..\src\ripple\net\RPCSub.h">
<Filter>ripple\net</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\net\SNTPClient.h">
<Filter>ripple\net</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\nodestore\Backend.h">
<Filter>ripple\nodestore</Filter>
</ClInclude>

View File

@@ -53,7 +53,7 @@
#include <ripple/core/LoadFeeTrack.h>
#include <ripple/core/ConfigSections.h>
#include <ripple/ledger/CachedSLEs.h>
#include <ripple/net/SNTPClient.h>
#include <ripple/core/impl/SNTPClient.h>
#include <ripple/nodestore/Database.h>
#include <ripple/nodestore/DummyScheduler.h>
#include <ripple/nodestore/Manager.h>
@@ -403,7 +403,8 @@ public:
, serverHandler_ (make_ServerHandler (*m_networkOPs, get_io_service (),
*m_jobQueue, *m_networkOPs, *m_resourceManager, *m_collectorManager))
, m_sntpClient (SNTPClient::New (*this))
, m_sntpClient (make_SNTPClient(
deprecatedLogs().journal("SNTPClient")))
, m_validators (Validators::make_Manager(*this, get_io_service(),
m_logs.journal("UVL"), getConfig ()))

View File

@@ -0,0 +1,423 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <BeastConfig.h>
#include <ripple/basics/Log.h>
#include <ripple/basics/ThreadName.h>
#include <ripple/crypto/RandomNumbers.h>
#include <ripple/core/impl/SNTPClient.h>
#include <beast/asio/placeholders.h>
#include <beast/threads/Thread.h>
#include <boost/asio.hpp>
#include <boost/optional.hpp>
#include <deque>
#include <map>
#include <beast/cxx14/memory.h> // <memory>
#include <mutex>
#include <thread>
namespace ripple {
// #define SNTP_DEBUG
static uint8_t SNTPQueryData[48] =
{ 0x1B, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
// NTP query frequency - 4 minutes
#define NTP_QUERY_FREQUENCY (4 * 60)
// NTP minimum interval to query same servers - 3 minutes
#define NTP_MIN_QUERY (3 * 60)
// NTP sample window (should be odd)
#define NTP_SAMPLE_WINDOW 9
// NTP timestamp constant
#define NTP_UNIX_OFFSET 0x83AA7E80
// NTP timestamp validity
#define NTP_TIMESTAMP_VALID ((NTP_QUERY_FREQUENCY + NTP_MIN_QUERY) * 2)
// SNTP packet offsets
#define NTP_OFF_INFO 0
#define NTP_OFF_ROOTDELAY 1
#define NTP_OFF_ROOTDISP 2
#define NTP_OFF_REFERENCEID 3
#define NTP_OFF_REFTS_INT 4
#define NTP_OFF_REFTS_FRAC 5
#define NTP_OFF_ORGTS_INT 6
#define NTP_OFF_ORGTS_FRAC 7
#define NTP_OFF_RECVTS_INT 8
#define NTP_OFF_RECVTS_FRAC 9
#define NTP_OFF_XMITTS_INT 10
#define NTP_OFF_XMITTS_FRAC 11
class SNTPClientImp
: public SNTPClient
{
private:
struct Query
{
bool replied;
time_t sent; // VFALCO time_t, really?
std::uint32_t nonce;
Query (time_t j = (time_t) -1)
: replied (false)
, sent (j)
{
}
};
beast::Journal j_;
std::mutex mutex_;
std::thread thread_;
boost::asio::io_service io_service_;
boost::optional<
boost::asio::io_service::work> work_;
std::map <boost::asio::ip::udp::endpoint, Query> queries_;
boost::asio::ip::udp::socket socket_;
boost::asio::deadline_timer timer_;
boost::asio::ip::udp::resolver resolver_;
std::vector<std::pair<std::string, time_t>> servers_;
int offset_;
time_t lastUpdate_;
std::deque<int> offsets_;
std::vector<uint8_t> buf_;
boost::asio::ip::udp::endpoint ep_;
public:
using error_code = boost::system::error_code;
explicit
SNTPClientImp (beast::Journal j)
: j_ (j)
, work_(io_service_)
, socket_ (io_service_)
, timer_ (io_service_)
, resolver_ (io_service_)
, offset_ (0)
, lastUpdate_ ((time_t) -1)
, buf_ (256)
{
using namespace boost::asio;
socket_.open (ip::udp::v4 ());
socket_.async_receive_from (buffer (buf_, 256),
ep_, std::bind(
&SNTPClientImp::onRead, this,
beast::asio::placeholders::error,
beast::asio::placeholders::bytes_transferred));
timer_.expires_from_now(
boost::posix_time::seconds(NTP_QUERY_FREQUENCY));
timer_.async_wait(std::bind(
&SNTPClientImp::onTimer, this,
beast::asio::placeholders::error));
thread_ = std::thread(&SNTPClientImp::run, this);
}
~SNTPClientImp ()
{
error_code ec;
timer_.cancel(ec);
socket_.cancel(ec);
work_ = boost::none;
thread_.join();
}
void run ()
{
setCallingThreadName("SNTPClient");
io_service_.run();
}
//--------------------------------------------------------------------------
void
onTimer (error_code const& ec)
{
using namespace boost::asio;
if (ec == error::operation_aborted)
return;
if (ec)
{
JLOG(j_.error) <<
"SNTPClient::onTimer: " << ec.message();
return;
}
doQuery ();
timer_.expires_from_now(
boost::posix_time::seconds (NTP_QUERY_FREQUENCY));
timer_.async_wait(std::bind(
&SNTPClientImp::onTimer, this,
beast::asio::placeholders::error));
}
void
onRead (error_code const& ec, std::size_t bytes_xferd)
{
using namespace boost::asio;
if (ec == error::operation_aborted)
return;
// VFALCO Should we return on any error?
/*
if (ec)
return;
*/
if (! ec)
{
JLOG(j_.trace) <<
"SNTP: Packet from " << ep_;
std::lock_guard<std::mutex> lock (mutex_);
auto const query = queries_.find (ep_);
if (query == queries_.end ())
JLOG(j_.debug) <<
"SNTP: Reply from " << ep_ << " found without matching query";
else if (query->second.replied)
JLOG(j_.debug) <<
"SNTP: Duplicate response from " << ep_;
else
{
query->second.replied = true;
if (time (nullptr) > (query->second.sent + 1))
JLOG(j_.warning) <<
"SNTP: Late response from " << ep_;
else if (bytes_xferd < 48)
JLOG(j_.warning) <<
"SNTP: Short reply from " << ep_ <<
" (" << bytes_xferd << ") " << buf_.size ();
else if (reinterpret_cast<std::uint32_t*>(
&buf_[0])[NTP_OFF_ORGTS_FRAC] !=
query->second.nonce)
JLOG(j_.warning) <<
"SNTP: Reply from " << ep_ << "had wrong nonce";
else
processReply ();
}
}
socket_.async_receive_from(buffer(buf_, 256),
ep_, std::bind(&SNTPClientImp::onRead, this,
beast::asio::placeholders::error,
beast::asio::placeholders::bytes_transferred));
}
//--------------------------------------------------------------------------
void init (const std::vector<std::string>& servers)
{
std::vector<std::string>::const_iterator it = servers.begin ();
if (it == servers.end ())
{
JLOG(j_.info) <<
"SNTP: no server specified";
return;
}
for (auto const& it : servers)
addServer (it);
queryAll ();
}
void addServer (std::string const& server)
{
std::lock_guard<std::mutex> lock (mutex_);
servers_.push_back (std::make_pair (server, (time_t) - 1));
}
void queryAll ()
{
while (doQuery ())
{
}
}
bool getOffset (int& offset)
{
std::lock_guard<std::mutex> lock (mutex_);
if ((lastUpdate_ == (time_t) - 1) || ((lastUpdate_ + NTP_TIMESTAMP_VALID) < time (nullptr)))
return false;
offset = offset_;
return true;
}
bool doQuery ()
{
std::lock_guard<std::mutex> lock (mutex_);
std::vector< std::pair<std::string, time_t> >::iterator best = servers_.end ();
for (std::vector< std::pair<std::string, time_t> >::iterator it = servers_.begin (), end = best;
it != end; ++it)
if ((best == end) || (it->second == (time_t) - 1) || (it->second < best->second))
best = it;
if (best == servers_.end ())
{
JLOG(j_.trace) <<
"SNTP: No server to query";
return false;
}
time_t now = time (nullptr);
if ((best->second != (time_t) - 1) && ((best->second + NTP_MIN_QUERY) >= now))
{
JLOG(j_.trace) <<
"SNTP: All servers recently queried";
return false;
}
best->second = now;
boost::asio::ip::udp::resolver::query query(
boost::asio::ip::udp::v4 (), best->first, "ntp");
resolver_.async_resolve (query, std::bind (
&SNTPClientImp::resolveComplete, this,
beast::asio::placeholders::error,
beast::asio::placeholders::iterator));
JLOG(j_.trace) <<
"SNTP: Resolve pending for " << best->first;
return true;
}
void resolveComplete (const error_code& error, boost::asio::ip::udp::resolver::iterator it)
{
if (!error)
{
boost::asio::ip::udp::resolver::iterator sel = it;
int i = 1;
while (++it != boost::asio::ip::udp::resolver::iterator ())
if ((rand () % ++i) == 0)
sel = it;
if (sel != boost::asio::ip::udp::resolver::iterator ())
{
std::lock_guard<std::mutex> lock (mutex_);
Query& query = queries_[*sel];
time_t now = time (nullptr);
if ((query.sent == now) || ((query.sent + 1) == now))
{
// This can happen if the same IP address is reached through multiple names
JLOG(j_.trace) <<
"SNTP: Redundant query suppressed";
return;
}
query.replied = false;
query.sent = now;
random_fill (&query.nonce);
reinterpret_cast<std::uint32_t*> (SNTPQueryData)[NTP_OFF_XMITTS_INT] = static_cast<std::uint32_t> (time (nullptr)) + NTP_UNIX_OFFSET;
reinterpret_cast<std::uint32_t*> (SNTPQueryData)[NTP_OFF_XMITTS_FRAC] = query.nonce;
socket_.async_send_to (boost::asio::buffer (SNTPQueryData, 48), *sel,
std::bind (&SNTPClientImp::onSend, this,
beast::asio::placeholders::error, beast::asio::placeholders::bytes_transferred));
}
}
}
void onSend (error_code const& ec, std::size_t)
{
if (ec == boost::asio::error::operation_aborted)
return;
if (ec)
{
JLOG(j_.warning) <<
"SNTPClient::onSend: " << ec.message();
return;
}
}
void processReply ()
{
assert (buf_.size () >= 48);
std::uint32_t* recvBuffer = reinterpret_cast<std::uint32_t*> (&buf_.front ());
unsigned info = ntohl (recvBuffer[NTP_OFF_INFO]);
int64_t timev = ntohl (recvBuffer[NTP_OFF_RECVTS_INT]);
unsigned stratum = (info >> 16) & 0xff;
if ((info >> 30) == 3)
{
JLOG(j_.info) <<
"SNTP: Alarm condition " << ep_;
return;
}
if ((stratum == 0) || (stratum > 14))
{
JLOG(j_.info) <<
"SNTP: Unreasonable stratum (" << stratum << ") from " << ep_;
return;
}
std::int64_t now = static_cast<int> (time (nullptr));
timev -= now;
timev -= NTP_UNIX_OFFSET;
// add offset to list, replacing oldest one if appropriate
offsets_.push_back (timev);
if (offsets_.size () >= NTP_SAMPLE_WINDOW)
offsets_.pop_front ();
lastUpdate_ = now;
// select median time
auto offsetList = offsets_;
std::sort(offsetList.begin(), offsetList.end());
auto j = offsetList.size ();
auto it = std::next(offsetList.begin (), j/2);
offset_ = *it;
if ((j % 2) == 0)
offset_ = (offset_ + (*--it)) / 2;
// debounce: small corrections likely
// do more harm than good
if ((offset_ == -1) || (offset_ == 1))
offset_ = 0;
if (timev || offset_)
{
JLOG(j_.trace) << "SNTP: Offset is " << timev <<
", new system offset is " << offset_;
}
}
};
//------------------------------------------------------------------------------
std::unique_ptr<SNTPClient>
make_SNTPClient (beast::Journal j)
{
return std::make_unique<SNTPClientImp>(j);
}
} // ripple

View File

@@ -20,24 +20,27 @@
#ifndef RIPPLE_NET_SNTPCLIENT_H_INCLUDED
#define RIPPLE_NET_SNTPCLIENT_H_INCLUDED
#include <beast/threads/Stoppable.h>
#include <beast/utility/Journal.h>
#include <memory>
#include <string>
#include <vector>
namespace ripple {
class SNTPClient : public beast::Stoppable
class SNTPClient
{
protected:
explicit SNTPClient (beast::Stoppable& parent);
public:
static SNTPClient* New (beast::Stoppable& parent);
virtual ~SNTPClient() { }
virtual ~SNTPClient() = default;
virtual void init (std::vector <std::string> const& servers) = 0;
virtual void addServer (std::string const& mServer) = 0;
virtual void queryAll () = 0;
virtual bool getOffset (int& offset) = 0;
};
extern
std::unique_ptr<SNTPClient>
make_SNTPClient (beast::Journal);
} // ripple
#endif

View File

@@ -1,381 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <BeastConfig.h>
#include <ripple/basics/Log.h>
#include <ripple/crypto/RandomNumbers.h>
#include <ripple/net/SNTPClient.h>
#include <beast/asio/placeholders.h>
#include <beast/threads/Thread.h>
#include <boost/asio.hpp>
#include <deque>
#include <mutex>
namespace ripple {
// #define SNTP_DEBUG
static uint8_t SNTPQueryData[48] =
{ 0x1B, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
// NTP query frequency - 4 minutes
#define NTP_QUERY_FREQUENCY (4 * 60)
// NTP minimum interval to query same servers - 3 minutes
#define NTP_MIN_QUERY (3 * 60)
// NTP sample window (should be odd)
#define NTP_SAMPLE_WINDOW 9
// NTP timestamp constant
#define NTP_UNIX_OFFSET 0x83AA7E80
// NTP timestamp validity
#define NTP_TIMESTAMP_VALID ((NTP_QUERY_FREQUENCY + NTP_MIN_QUERY) * 2)
// SNTP packet offsets
#define NTP_OFF_INFO 0
#define NTP_OFF_ROOTDELAY 1
#define NTP_OFF_ROOTDISP 2
#define NTP_OFF_REFERENCEID 3
#define NTP_OFF_REFTS_INT 4
#define NTP_OFF_REFTS_FRAC 5
#define NTP_OFF_ORGTS_INT 6
#define NTP_OFF_ORGTS_FRAC 7
#define NTP_OFF_RECVTS_INT 8
#define NTP_OFF_RECVTS_FRAC 9
#define NTP_OFF_XMITTS_INT 10
#define NTP_OFF_XMITTS_FRAC 11
class SNTPClientImp
: public SNTPClient
, public beast::Thread
{
public:
class SNTPQuery
{
public:
bool mReceivedReply;
time_t mLocalTimeSent;
std::uint32_t mQueryNonce;
SNTPQuery (time_t j = (time_t) - 1) : mReceivedReply (false), mLocalTimeSent (j)
{
;
}
};
//--------------------------------------------------------------------------
explicit SNTPClientImp (Stoppable& parent)
: SNTPClient (parent)
, Thread ("SNTPClient")
, mSocket (m_io_service)
, mTimer (m_io_service)
, mResolver (m_io_service)
, mOffset (0)
, mLastOffsetUpdate ((time_t) - 1)
, mReceiveBuffer (256)
{
mSocket.open (boost::asio::ip::udp::v4 ());
mSocket.async_receive_from (boost::asio::buffer (mReceiveBuffer, 256),
mReceiveEndpoint, std::bind (
&SNTPClientImp::receivePacket, this,
beast::asio::placeholders::error,
beast::asio::placeholders::bytes_transferred));
mTimer.expires_from_now (boost::posix_time::seconds (NTP_QUERY_FREQUENCY));
mTimer.async_wait (std::bind (&SNTPClientImp::timerEntry, this, beast::asio::placeholders::error));
}
~SNTPClientImp ()
{
stopThread ();
}
//--------------------------------------------------------------------------
void onStart ()
{
startThread ();
}
void onStop ()
{
// HACK!
m_io_service.stop ();
}
void run ()
{
m_io_service.run ();
stopped ();
}
//--------------------------------------------------------------------------
void init (const std::vector<std::string>& servers)
{
std::vector<std::string>::const_iterator it = servers.begin ();
if (it == servers.end ())
{
WriteLog (lsINFO, SNTPClient) << "SNTP: no server specified";
return;
}
for (auto const& it : servers)
addServer (it);
queryAll ();
}
void addServer (std::string const& server)
{
ScopedLockType sl (mLock);
mServers.push_back (std::make_pair (server, (time_t) - 1));
}
void queryAll ()
{
while (doQuery ())
{
}
}
bool getOffset (int& offset)
{
ScopedLockType sl (mLock);
if ((mLastOffsetUpdate == (time_t) - 1) || ((mLastOffsetUpdate + NTP_TIMESTAMP_VALID) < time (nullptr)))
return false;
offset = mOffset;
return true;
}
bool doQuery ()
{
ScopedLockType sl (mLock);
std::vector< std::pair<std::string, time_t> >::iterator best = mServers.end ();
for (std::vector< std::pair<std::string, time_t> >::iterator it = mServers.begin (), end = best;
it != end; ++it)
if ((best == end) || (it->second == (time_t) - 1) || (it->second < best->second))
best = it;
if (best == mServers.end ())
{
WriteLog (lsTRACE, SNTPClient) << "SNTP: No server to query";
return false;
}
time_t now = time (nullptr);
if ((best->second != (time_t) - 1) && ((best->second + NTP_MIN_QUERY) >= now))
{
WriteLog (lsTRACE, SNTPClient) << "SNTP: All servers recently queried";
return false;
}
best->second = now;
boost::asio::ip::udp::resolver::query query (boost::asio::ip::udp::v4 (), best->first, "ntp");
mResolver.async_resolve (query,
std::bind (&SNTPClientImp::resolveComplete, this,
beast::asio::placeholders::error, beast::asio::placeholders::iterator));
#ifdef SNTP_DEBUG
WriteLog (lsTRACE, SNTPClient) << "SNTP: Resolve pending for " << best->first;
#endif
return true;
}
void resolveComplete (const boost::system::error_code& error, boost::asio::ip::udp::resolver::iterator it)
{
if (!error)
{
boost::asio::ip::udp::resolver::iterator sel = it;
int i = 1;
while (++it != boost::asio::ip::udp::resolver::iterator ())
if ((rand () % ++i) == 0)
sel = it;
if (sel != boost::asio::ip::udp::resolver::iterator ())
{
ScopedLockType sl (mLock);
SNTPQuery& query = mQueries[*sel];
time_t now = time (nullptr);
if ((query.mLocalTimeSent == now) || ((query.mLocalTimeSent + 1) == now))
{
// This can happen if the same IP address is reached through multiple names
WriteLog (lsTRACE, SNTPClient) << "SNTP: Redundant query suppressed";
return;
}
query.mReceivedReply = false;
query.mLocalTimeSent = now;
random_fill (&query.mQueryNonce);
reinterpret_cast<std::uint32_t*> (SNTPQueryData)[NTP_OFF_XMITTS_INT] = static_cast<std::uint32_t> (time (nullptr)) + NTP_UNIX_OFFSET;
reinterpret_cast<std::uint32_t*> (SNTPQueryData)[NTP_OFF_XMITTS_FRAC] = query.mQueryNonce;
mSocket.async_send_to (boost::asio::buffer (SNTPQueryData, 48), *sel,
std::bind (&SNTPClientImp::sendComplete, this,
beast::asio::placeholders::error, beast::asio::placeholders::bytes_transferred));
}
}
}
void receivePacket (const boost::system::error_code& error, std::size_t bytes_xferd)
{
if (!error)
{
ScopedLockType sl (mLock);
#ifdef SNTP_DEBUG
WriteLog (lsTRACE, SNTPClient) << "SNTP: Packet from " << mReceiveEndpoint;
#endif
std::map<boost::asio::ip::udp::endpoint, SNTPQuery>::iterator query = mQueries.find (mReceiveEndpoint);
if (query == mQueries.end ())
WriteLog (lsDEBUG, SNTPClient) << "SNTP: Reply from " << mReceiveEndpoint << " found without matching query";
else if (query->second.mReceivedReply)
WriteLog (lsDEBUG, SNTPClient) << "SNTP: Duplicate response from " << mReceiveEndpoint;
else
{
query->second.mReceivedReply = true;
if (time (nullptr) > (query->second.mLocalTimeSent + 1))
WriteLog (lsWARNING, SNTPClient) << "SNTP: Late response from " << mReceiveEndpoint;
else if (bytes_xferd < 48)
WriteLog (lsWARNING, SNTPClient) << "SNTP: Short reply from " << mReceiveEndpoint
<< " (" << bytes_xferd << ") " << mReceiveBuffer.size ();
else if (reinterpret_cast<std::uint32_t*> (&mReceiveBuffer[0])[NTP_OFF_ORGTS_FRAC] != query->second.mQueryNonce)
WriteLog (lsWARNING, SNTPClient) << "SNTP: Reply from " << mReceiveEndpoint << "had wrong nonce";
else
processReply ();
}
}
mSocket.async_receive_from (boost::asio::buffer (mReceiveBuffer, 256), mReceiveEndpoint,
std::bind (&SNTPClientImp::receivePacket, this, beast::asio::placeholders::error,
beast::asio::placeholders::bytes_transferred));
}
void sendComplete (const boost::system::error_code& error, std::size_t)
{
CondLog (error, lsWARNING, SNTPClient) << "SNTP: Send error";
}
void processReply ()
{
assert (mReceiveBuffer.size () >= 48);
std::uint32_t* recvBuffer = reinterpret_cast<std::uint32_t*> (&mReceiveBuffer.front ());
unsigned info = ntohl (recvBuffer[NTP_OFF_INFO]);
int64_t timev = ntohl (recvBuffer[NTP_OFF_RECVTS_INT]);
unsigned stratum = (info >> 16) & 0xff;
if ((info >> 30) == 3)
{
WriteLog (lsINFO, SNTPClient) << "SNTP: Alarm condition " << mReceiveEndpoint;
return;
}
if ((stratum == 0) || (stratum > 14))
{
WriteLog (lsINFO, SNTPClient) << "SNTP: Unreasonable stratum (" << stratum << ") from " << mReceiveEndpoint;
return;
}
std::int64_t now = static_cast<int> (time (nullptr));
timev -= now;
timev -= NTP_UNIX_OFFSET;
// add offset to list, replacing oldest one if appropriate
mOffsetList.push_back (timev);
if (mOffsetList.size () >= NTP_SAMPLE_WINDOW)
mOffsetList.pop_front ();
mLastOffsetUpdate = now;
// select median time
auto offsetList = mOffsetList;
std::sort(offsetList.begin(), offsetList.end());
auto j = offsetList.size ();
auto it = std::next(offsetList.begin (), j/2);
mOffset = *it;
if ((j % 2) == 0)
mOffset = (mOffset + (*--it)) / 2;
if ((mOffset == -1) || (mOffset == 1)) // small corrections likely do more harm than good
mOffset = 0;
CondLog (timev || mOffset, lsTRACE, SNTPClient) << "SNTP: Offset is " << timev << ", new system offset is " << mOffset;
}
void timerEntry (const boost::system::error_code& error)
{
if (!error)
{
doQuery ();
mTimer.expires_from_now (boost::posix_time::seconds (NTP_QUERY_FREQUENCY));
mTimer.async_wait (std::bind (&SNTPClientImp::timerEntry, this, beast::asio::placeholders::error));
}
}
private:
using LockType = std::mutex;
using ScopedLockType = std::lock_guard <LockType>;
LockType mLock;
boost::asio::io_service m_io_service;
std::map <boost::asio::ip::udp::endpoint, SNTPQuery> mQueries;
boost::asio::ip::udp::socket mSocket;
boost::asio::deadline_timer mTimer;
boost::asio::ip::udp::resolver mResolver;
std::vector< std::pair<std::string, time_t> > mServers;
int mOffset;
time_t mLastOffsetUpdate;
std::deque<int> mOffsetList;
std::vector<uint8_t> mReceiveBuffer;
boost::asio::ip::udp::endpoint mReceiveEndpoint;
};
//------------------------------------------------------------------------------
SNTPClient::SNTPClient (Stoppable& parent)
: Stoppable ("SNTPClient", parent)
{
}
//------------------------------------------------------------------------------
SNTPClient* SNTPClient::New (Stoppable& parent)
{
return new SNTPClientImp (parent);
}
} // ripple

View File

@@ -26,6 +26,7 @@
#include <ripple/core/impl/LoadMonitor.cpp>
#include <ripple/core/impl/Job.cpp>
#include <ripple/core/impl/JobQueue.cpp>
#include <ripple/core/impl/SNTPClient.cpp>
#include <ripple/core/tests/LoadFeeTrack.test.cpp>
#include <ripple/core/tests/Config.test.cpp>

View File

@@ -27,4 +27,3 @@
#include <ripple/net/impl/RPCCall.cpp>
#include <ripple/net/impl/RPCErr.cpp>
#include <ripple/net/impl/RPCSub.cpp>
#include <ripple/net/impl/SNTPClient.cpp>