From e82d774d32b5c9c96647c1956896a417bdedceb7 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Wed, 15 Jul 2015 13:33:45 -0700 Subject: [PATCH] Move SNTPClient to core: * Refactor SNTPClient * asio best practices * Not derived from Stoppable --- Builds/VisualStudio2013/RippleD.vcxproj | 14 +- .../VisualStudio2013/RippleD.vcxproj.filters | 12 +- src/ripple/app/main/Application.cpp | 5 +- src/ripple/core/impl/SNTPClient.cpp | 423 ++++++++++++++++++ src/ripple/{net => core/impl}/SNTPClient.h | 17 +- src/ripple/net/impl/SNTPClient.cpp | 381 ---------------- src/ripple/unity/core.cpp | 1 + src/ripple/unity/net.cpp | 1 - 8 files changed, 451 insertions(+), 403 deletions(-) create mode 100644 src/ripple/core/impl/SNTPClient.cpp rename src/ripple/{net => core/impl}/SNTPClient.h (85%) delete mode 100644 src/ripple/net/impl/SNTPClient.cpp diff --git a/Builds/VisualStudio2013/RippleD.vcxproj b/Builds/VisualStudio2013/RippleD.vcxproj index 86abbb60a1..741f68e994 100644 --- a/Builds/VisualStudio2013/RippleD.vcxproj +++ b/Builds/VisualStudio2013/RippleD.vcxproj @@ -2071,6 +2071,14 @@ ..\..\src\soci\src\core;..\..\src\sqlite;%(AdditionalIncludeDirectories) ..\..\src\soci\src\core;..\..\src\sqlite;%(AdditionalIncludeDirectories) + + True + True + ..\..\src\soci\src\core;..\..\src\sqlite;%(AdditionalIncludeDirectories) + ..\..\src\soci\src\core;..\..\src\sqlite;%(AdditionalIncludeDirectories) + + + True True @@ -2405,10 +2413,6 @@ True True - - True - True - @@ -2417,8 +2421,6 @@ - - diff --git a/Builds/VisualStudio2013/RippleD.vcxproj.filters b/Builds/VisualStudio2013/RippleD.vcxproj.filters index a6b81c3243..c8afd1126e 100644 --- a/Builds/VisualStudio2013/RippleD.vcxproj.filters +++ b/Builds/VisualStudio2013/RippleD.vcxproj.filters @@ -2805,6 +2805,12 @@ ripple\core\impl + + ripple\core\impl + + + ripple\core\impl + ripple\core\impl @@ -3132,9 +3138,6 @@ ripple\net\impl - - ripple\net\impl - ripple\net @@ -3147,9 +3150,6 @@ ripple\net - - ripple\net - ripple\nodestore diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp index be0eee952b..4873923c7d 100644 --- a/src/ripple/app/main/Application.cpp +++ b/src/ripple/app/main/Application.cpp @@ -53,7 +53,7 @@ #include #include #include -#include +#include #include #include #include @@ -403,7 +403,8 @@ public: , serverHandler_ (make_ServerHandler (*m_networkOPs, get_io_service (), *m_jobQueue, *m_networkOPs, *m_resourceManager, *m_collectorManager)) - , m_sntpClient (SNTPClient::New (*this)) + , m_sntpClient (make_SNTPClient( + deprecatedLogs().journal("SNTPClient"))) , m_validators (Validators::make_Manager(*this, get_io_service(), m_logs.journal("UVL"), getConfig ())) diff --git a/src/ripple/core/impl/SNTPClient.cpp b/src/ripple/core/impl/SNTPClient.cpp new file mode 100644 index 0000000000..90ab29d5c6 --- /dev/null +++ b/src/ripple/core/impl/SNTPClient.cpp @@ -0,0 +1,423 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include // +#include +#include + +namespace ripple { + +// #define SNTP_DEBUG + +static uint8_t SNTPQueryData[48] = +{ 0x1B, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; + +// NTP query frequency - 4 minutes +#define NTP_QUERY_FREQUENCY (4 * 60) + +// NTP minimum interval to query same servers - 3 minutes +#define NTP_MIN_QUERY (3 * 60) + +// NTP sample window (should be odd) +#define NTP_SAMPLE_WINDOW 9 + +// NTP timestamp constant +#define NTP_UNIX_OFFSET 0x83AA7E80 + +// NTP timestamp validity +#define NTP_TIMESTAMP_VALID ((NTP_QUERY_FREQUENCY + NTP_MIN_QUERY) * 2) + +// SNTP packet offsets +#define NTP_OFF_INFO 0 +#define NTP_OFF_ROOTDELAY 1 +#define NTP_OFF_ROOTDISP 2 +#define NTP_OFF_REFERENCEID 3 +#define NTP_OFF_REFTS_INT 4 +#define NTP_OFF_REFTS_FRAC 5 +#define NTP_OFF_ORGTS_INT 6 +#define NTP_OFF_ORGTS_FRAC 7 +#define NTP_OFF_RECVTS_INT 8 +#define NTP_OFF_RECVTS_FRAC 9 +#define NTP_OFF_XMITTS_INT 10 +#define NTP_OFF_XMITTS_FRAC 11 + +class SNTPClientImp + : public SNTPClient +{ +private: + struct Query + { + bool replied; + time_t sent; // VFALCO time_t, really? + std::uint32_t nonce; + + Query (time_t j = (time_t) -1) + : replied (false) + , sent (j) + { + } + }; + + beast::Journal j_; + std::mutex mutex_; + std::thread thread_; + boost::asio::io_service io_service_; + boost::optional< + boost::asio::io_service::work> work_; + + std::map queries_; + boost::asio::ip::udp::socket socket_; + boost::asio::deadline_timer timer_; + boost::asio::ip::udp::resolver resolver_; + std::vector> servers_; + int offset_; + time_t lastUpdate_; + std::deque offsets_; + std::vector buf_; + boost::asio::ip::udp::endpoint ep_; + +public: + using error_code = boost::system::error_code; + + explicit + SNTPClientImp (beast::Journal j) + : j_ (j) + , work_(io_service_) + , socket_ (io_service_) + , timer_ (io_service_) + , resolver_ (io_service_) + , offset_ (0) + , lastUpdate_ ((time_t) -1) + , buf_ (256) + { + using namespace boost::asio; + socket_.open (ip::udp::v4 ()); + socket_.async_receive_from (buffer (buf_, 256), + ep_, std::bind( + &SNTPClientImp::onRead, this, + beast::asio::placeholders::error, + beast::asio::placeholders::bytes_transferred)); + timer_.expires_from_now( + boost::posix_time::seconds(NTP_QUERY_FREQUENCY)); + timer_.async_wait(std::bind( + &SNTPClientImp::onTimer, this, + beast::asio::placeholders::error)); + thread_ = std::thread(&SNTPClientImp::run, this); + } + + ~SNTPClientImp () + { + error_code ec; + timer_.cancel(ec); + socket_.cancel(ec); + work_ = boost::none; + thread_.join(); + } + + void run () + { + setCallingThreadName("SNTPClient"); + io_service_.run(); + } + + //-------------------------------------------------------------------------- + + void + onTimer (error_code const& ec) + { + using namespace boost::asio; + if (ec == error::operation_aborted) + return; + if (ec) + { + JLOG(j_.error) << + "SNTPClient::onTimer: " << ec.message(); + return; + } + + doQuery (); + timer_.expires_from_now( + boost::posix_time::seconds (NTP_QUERY_FREQUENCY)); + timer_.async_wait(std::bind( + &SNTPClientImp::onTimer, this, + beast::asio::placeholders::error)); + } + + void + onRead (error_code const& ec, std::size_t bytes_xferd) + { + using namespace boost::asio; + if (ec == error::operation_aborted) + return; + + // VFALCO Should we return on any error? + /* + if (ec) + return; + */ + + if (! ec) + { + JLOG(j_.trace) << + "SNTP: Packet from " << ep_; + std::lock_guard lock (mutex_); + auto const query = queries_.find (ep_); + if (query == queries_.end ()) + JLOG(j_.debug) << + "SNTP: Reply from " << ep_ << " found without matching query"; + else if (query->second.replied) + JLOG(j_.debug) << + "SNTP: Duplicate response from " << ep_; + else + { + query->second.replied = true; + + if (time (nullptr) > (query->second.sent + 1)) + JLOG(j_.warning) << + "SNTP: Late response from " << ep_; + else if (bytes_xferd < 48) + JLOG(j_.warning) << + "SNTP: Short reply from " << ep_ << + " (" << bytes_xferd << ") " << buf_.size (); + else if (reinterpret_cast( + &buf_[0])[NTP_OFF_ORGTS_FRAC] != + query->second.nonce) + JLOG(j_.warning) << + "SNTP: Reply from " << ep_ << "had wrong nonce"; + else + processReply (); + } + } + + socket_.async_receive_from(buffer(buf_, 256), + ep_, std::bind(&SNTPClientImp::onRead, this, + beast::asio::placeholders::error, + beast::asio::placeholders::bytes_transferred)); + } + + //-------------------------------------------------------------------------- + + void init (const std::vector& servers) + { + std::vector::const_iterator it = servers.begin (); + + if (it == servers.end ()) + { + JLOG(j_.info) << + "SNTP: no server specified"; + return; + } + + for (auto const& it : servers) + addServer (it); + queryAll (); + } + + void addServer (std::string const& server) + { + std::lock_guard lock (mutex_); + servers_.push_back (std::make_pair (server, (time_t) - 1)); + } + + void queryAll () + { + while (doQuery ()) + { + } + } + + bool getOffset (int& offset) + { + std::lock_guard lock (mutex_); + + if ((lastUpdate_ == (time_t) - 1) || ((lastUpdate_ + NTP_TIMESTAMP_VALID) < time (nullptr))) + return false; + + offset = offset_; + return true; + } + + bool doQuery () + { + + std::lock_guard lock (mutex_); + std::vector< std::pair >::iterator best = servers_.end (); + + for (std::vector< std::pair >::iterator it = servers_.begin (), end = best; + it != end; ++it) + if ((best == end) || (it->second == (time_t) - 1) || (it->second < best->second)) + best = it; + + if (best == servers_.end ()) + { + JLOG(j_.trace) << + "SNTP: No server to query"; + return false; + } + + time_t now = time (nullptr); + + if ((best->second != (time_t) - 1) && ((best->second + NTP_MIN_QUERY) >= now)) + { + JLOG(j_.trace) << + "SNTP: All servers recently queried"; + return false; + } + + best->second = now; + + boost::asio::ip::udp::resolver::query query( + boost::asio::ip::udp::v4 (), best->first, "ntp"); + resolver_.async_resolve (query, std::bind ( + &SNTPClientImp::resolveComplete, this, + beast::asio::placeholders::error, + beast::asio::placeholders::iterator)); + JLOG(j_.trace) << + "SNTP: Resolve pending for " << best->first; + return true; + } + + void resolveComplete (const error_code& error, boost::asio::ip::udp::resolver::iterator it) + { + if (!error) + { + boost::asio::ip::udp::resolver::iterator sel = it; + int i = 1; + + while (++it != boost::asio::ip::udp::resolver::iterator ()) + if ((rand () % ++i) == 0) + sel = it; + + if (sel != boost::asio::ip::udp::resolver::iterator ()) + { + std::lock_guard lock (mutex_); + Query& query = queries_[*sel]; + time_t now = time (nullptr); + + if ((query.sent == now) || ((query.sent + 1) == now)) + { + // This can happen if the same IP address is reached through multiple names + JLOG(j_.trace) << + "SNTP: Redundant query suppressed"; + return; + } + + query.replied = false; + query.sent = now; + random_fill (&query.nonce); + reinterpret_cast (SNTPQueryData)[NTP_OFF_XMITTS_INT] = static_cast (time (nullptr)) + NTP_UNIX_OFFSET; + reinterpret_cast (SNTPQueryData)[NTP_OFF_XMITTS_FRAC] = query.nonce; + socket_.async_send_to (boost::asio::buffer (SNTPQueryData, 48), *sel, + std::bind (&SNTPClientImp::onSend, this, + beast::asio::placeholders::error, beast::asio::placeholders::bytes_transferred)); + } + } + } + + void onSend (error_code const& ec, std::size_t) + { + if (ec == boost::asio::error::operation_aborted) + return; + + if (ec) + { + JLOG(j_.warning) << + "SNTPClient::onSend: " << ec.message(); + return; + } + } + + void processReply () + { + assert (buf_.size () >= 48); + std::uint32_t* recvBuffer = reinterpret_cast (&buf_.front ()); + + unsigned info = ntohl (recvBuffer[NTP_OFF_INFO]); + int64_t timev = ntohl (recvBuffer[NTP_OFF_RECVTS_INT]); + unsigned stratum = (info >> 16) & 0xff; + + if ((info >> 30) == 3) + { + JLOG(j_.info) << + "SNTP: Alarm condition " << ep_; + return; + } + + if ((stratum == 0) || (stratum > 14)) + { + JLOG(j_.info) << + "SNTP: Unreasonable stratum (" << stratum << ") from " << ep_; + return; + } + + std::int64_t now = static_cast (time (nullptr)); + timev -= now; + timev -= NTP_UNIX_OFFSET; + + // add offset to list, replacing oldest one if appropriate + offsets_.push_back (timev); + + if (offsets_.size () >= NTP_SAMPLE_WINDOW) + offsets_.pop_front (); + + lastUpdate_ = now; + + // select median time + auto offsetList = offsets_; + std::sort(offsetList.begin(), offsetList.end()); + auto j = offsetList.size (); + auto it = std::next(offsetList.begin (), j/2); + offset_ = *it; + + if ((j % 2) == 0) + offset_ = (offset_ + (*--it)) / 2; + + // debounce: small corrections likely + // do more harm than good + if ((offset_ == -1) || (offset_ == 1)) + offset_ = 0; + + if (timev || offset_) + { + JLOG(j_.trace) << "SNTP: Offset is " << timev << + ", new system offset is " << offset_; + } + } +}; + +//------------------------------------------------------------------------------ + +std::unique_ptr +make_SNTPClient (beast::Journal j) +{ + return std::make_unique(j); +} + +} // ripple diff --git a/src/ripple/net/SNTPClient.h b/src/ripple/core/impl/SNTPClient.h similarity index 85% rename from src/ripple/net/SNTPClient.h rename to src/ripple/core/impl/SNTPClient.h index a43c68d268..05b34c17e1 100644 --- a/src/ripple/net/SNTPClient.h +++ b/src/ripple/core/impl/SNTPClient.h @@ -20,24 +20,27 @@ #ifndef RIPPLE_NET_SNTPCLIENT_H_INCLUDED #define RIPPLE_NET_SNTPCLIENT_H_INCLUDED -#include +#include +#include +#include +#include namespace ripple { -class SNTPClient : public beast::Stoppable +class SNTPClient { -protected: - explicit SNTPClient (beast::Stoppable& parent); - public: - static SNTPClient* New (beast::Stoppable& parent); - virtual ~SNTPClient() { } + virtual ~SNTPClient() = default; virtual void init (std::vector const& servers) = 0; virtual void addServer (std::string const& mServer) = 0; virtual void queryAll () = 0; virtual bool getOffset (int& offset) = 0; }; +extern +std::unique_ptr +make_SNTPClient (beast::Journal); + } // ripple #endif diff --git a/src/ripple/net/impl/SNTPClient.cpp b/src/ripple/net/impl/SNTPClient.cpp deleted file mode 100644 index cae47c89c9..0000000000 --- a/src/ripple/net/impl/SNTPClient.cpp +++ /dev/null @@ -1,381 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2012, 2013 Ripple Labs Inc. - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace ripple { - -// #define SNTP_DEBUG - -static uint8_t SNTPQueryData[48] = -{ 0x1B, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; - -// NTP query frequency - 4 minutes -#define NTP_QUERY_FREQUENCY (4 * 60) - -// NTP minimum interval to query same servers - 3 minutes -#define NTP_MIN_QUERY (3 * 60) - -// NTP sample window (should be odd) -#define NTP_SAMPLE_WINDOW 9 - -// NTP timestamp constant -#define NTP_UNIX_OFFSET 0x83AA7E80 - -// NTP timestamp validity -#define NTP_TIMESTAMP_VALID ((NTP_QUERY_FREQUENCY + NTP_MIN_QUERY) * 2) - -// SNTP packet offsets -#define NTP_OFF_INFO 0 -#define NTP_OFF_ROOTDELAY 1 -#define NTP_OFF_ROOTDISP 2 -#define NTP_OFF_REFERENCEID 3 -#define NTP_OFF_REFTS_INT 4 -#define NTP_OFF_REFTS_FRAC 5 -#define NTP_OFF_ORGTS_INT 6 -#define NTP_OFF_ORGTS_FRAC 7 -#define NTP_OFF_RECVTS_INT 8 -#define NTP_OFF_RECVTS_FRAC 9 -#define NTP_OFF_XMITTS_INT 10 -#define NTP_OFF_XMITTS_FRAC 11 - -class SNTPClientImp - : public SNTPClient - , public beast::Thread -{ -public: - class SNTPQuery - { - public: - bool mReceivedReply; - time_t mLocalTimeSent; - std::uint32_t mQueryNonce; - - SNTPQuery (time_t j = (time_t) - 1) : mReceivedReply (false), mLocalTimeSent (j) - { - ; - } - }; - - //-------------------------------------------------------------------------- - - explicit SNTPClientImp (Stoppable& parent) - : SNTPClient (parent) - , Thread ("SNTPClient") - , mSocket (m_io_service) - , mTimer (m_io_service) - , mResolver (m_io_service) - , mOffset (0) - , mLastOffsetUpdate ((time_t) - 1) - , mReceiveBuffer (256) - { - mSocket.open (boost::asio::ip::udp::v4 ()); - - mSocket.async_receive_from (boost::asio::buffer (mReceiveBuffer, 256), - mReceiveEndpoint, std::bind ( - &SNTPClientImp::receivePacket, this, - beast::asio::placeholders::error, - beast::asio::placeholders::bytes_transferred)); - - mTimer.expires_from_now (boost::posix_time::seconds (NTP_QUERY_FREQUENCY)); - mTimer.async_wait (std::bind (&SNTPClientImp::timerEntry, this, beast::asio::placeholders::error)); - } - - ~SNTPClientImp () - { - stopThread (); - } - - //-------------------------------------------------------------------------- - - void onStart () - { - startThread (); - } - - void onStop () - { - // HACK! - m_io_service.stop (); - } - - void run () - { - m_io_service.run (); - - stopped (); - } - - //-------------------------------------------------------------------------- - - void init (const std::vector& servers) - { - std::vector::const_iterator it = servers.begin (); - - if (it == servers.end ()) - { - WriteLog (lsINFO, SNTPClient) << "SNTP: no server specified"; - return; - } - - for (auto const& it : servers) - addServer (it); - queryAll (); - } - - void addServer (std::string const& server) - { - ScopedLockType sl (mLock); - mServers.push_back (std::make_pair (server, (time_t) - 1)); - } - - void queryAll () - { - while (doQuery ()) - { - } - } - - bool getOffset (int& offset) - { - ScopedLockType sl (mLock); - - if ((mLastOffsetUpdate == (time_t) - 1) || ((mLastOffsetUpdate + NTP_TIMESTAMP_VALID) < time (nullptr))) - return false; - - offset = mOffset; - return true; - } - - bool doQuery () - { - ScopedLockType sl (mLock); - std::vector< std::pair >::iterator best = mServers.end (); - - for (std::vector< std::pair >::iterator it = mServers.begin (), end = best; - it != end; ++it) - if ((best == end) || (it->second == (time_t) - 1) || (it->second < best->second)) - best = it; - - if (best == mServers.end ()) - { - WriteLog (lsTRACE, SNTPClient) << "SNTP: No server to query"; - return false; - } - - time_t now = time (nullptr); - - if ((best->second != (time_t) - 1) && ((best->second + NTP_MIN_QUERY) >= now)) - { - WriteLog (lsTRACE, SNTPClient) << "SNTP: All servers recently queried"; - return false; - } - - best->second = now; - - boost::asio::ip::udp::resolver::query query (boost::asio::ip::udp::v4 (), best->first, "ntp"); - mResolver.async_resolve (query, - std::bind (&SNTPClientImp::resolveComplete, this, - beast::asio::placeholders::error, beast::asio::placeholders::iterator)); - #ifdef SNTP_DEBUG - WriteLog (lsTRACE, SNTPClient) << "SNTP: Resolve pending for " << best->first; - #endif - return true; - } - - void resolveComplete (const boost::system::error_code& error, boost::asio::ip::udp::resolver::iterator it) - { - if (!error) - { - boost::asio::ip::udp::resolver::iterator sel = it; - int i = 1; - - while (++it != boost::asio::ip::udp::resolver::iterator ()) - if ((rand () % ++i) == 0) - sel = it; - - if (sel != boost::asio::ip::udp::resolver::iterator ()) - { - ScopedLockType sl (mLock); - SNTPQuery& query = mQueries[*sel]; - time_t now = time (nullptr); - - if ((query.mLocalTimeSent == now) || ((query.mLocalTimeSent + 1) == now)) - { - // This can happen if the same IP address is reached through multiple names - WriteLog (lsTRACE, SNTPClient) << "SNTP: Redundant query suppressed"; - return; - } - - query.mReceivedReply = false; - query.mLocalTimeSent = now; - random_fill (&query.mQueryNonce); - reinterpret_cast (SNTPQueryData)[NTP_OFF_XMITTS_INT] = static_cast (time (nullptr)) + NTP_UNIX_OFFSET; - reinterpret_cast (SNTPQueryData)[NTP_OFF_XMITTS_FRAC] = query.mQueryNonce; - mSocket.async_send_to (boost::asio::buffer (SNTPQueryData, 48), *sel, - std::bind (&SNTPClientImp::sendComplete, this, - beast::asio::placeholders::error, beast::asio::placeholders::bytes_transferred)); - } - } - } - - void receivePacket (const boost::system::error_code& error, std::size_t bytes_xferd) - { - if (!error) - { - ScopedLockType sl (mLock); - #ifdef SNTP_DEBUG - WriteLog (lsTRACE, SNTPClient) << "SNTP: Packet from " << mReceiveEndpoint; - #endif - std::map::iterator query = mQueries.find (mReceiveEndpoint); - - if (query == mQueries.end ()) - WriteLog (lsDEBUG, SNTPClient) << "SNTP: Reply from " << mReceiveEndpoint << " found without matching query"; - else if (query->second.mReceivedReply) - WriteLog (lsDEBUG, SNTPClient) << "SNTP: Duplicate response from " << mReceiveEndpoint; - else - { - query->second.mReceivedReply = true; - - if (time (nullptr) > (query->second.mLocalTimeSent + 1)) - WriteLog (lsWARNING, SNTPClient) << "SNTP: Late response from " << mReceiveEndpoint; - else if (bytes_xferd < 48) - WriteLog (lsWARNING, SNTPClient) << "SNTP: Short reply from " << mReceiveEndpoint - << " (" << bytes_xferd << ") " << mReceiveBuffer.size (); - else if (reinterpret_cast (&mReceiveBuffer[0])[NTP_OFF_ORGTS_FRAC] != query->second.mQueryNonce) - WriteLog (lsWARNING, SNTPClient) << "SNTP: Reply from " << mReceiveEndpoint << "had wrong nonce"; - else - processReply (); - } - } - - mSocket.async_receive_from (boost::asio::buffer (mReceiveBuffer, 256), mReceiveEndpoint, - std::bind (&SNTPClientImp::receivePacket, this, beast::asio::placeholders::error, - beast::asio::placeholders::bytes_transferred)); - } - - void sendComplete (const boost::system::error_code& error, std::size_t) - { - CondLog (error, lsWARNING, SNTPClient) << "SNTP: Send error"; - } - - void processReply () - { - assert (mReceiveBuffer.size () >= 48); - std::uint32_t* recvBuffer = reinterpret_cast (&mReceiveBuffer.front ()); - - unsigned info = ntohl (recvBuffer[NTP_OFF_INFO]); - int64_t timev = ntohl (recvBuffer[NTP_OFF_RECVTS_INT]); - unsigned stratum = (info >> 16) & 0xff; - - if ((info >> 30) == 3) - { - WriteLog (lsINFO, SNTPClient) << "SNTP: Alarm condition " << mReceiveEndpoint; - return; - } - - if ((stratum == 0) || (stratum > 14)) - { - WriteLog (lsINFO, SNTPClient) << "SNTP: Unreasonable stratum (" << stratum << ") from " << mReceiveEndpoint; - return; - } - - std::int64_t now = static_cast (time (nullptr)); - timev -= now; - timev -= NTP_UNIX_OFFSET; - - // add offset to list, replacing oldest one if appropriate - mOffsetList.push_back (timev); - - if (mOffsetList.size () >= NTP_SAMPLE_WINDOW) - mOffsetList.pop_front (); - - mLastOffsetUpdate = now; - - // select median time - auto offsetList = mOffsetList; - std::sort(offsetList.begin(), offsetList.end()); - auto j = offsetList.size (); - auto it = std::next(offsetList.begin (), j/2); - mOffset = *it; - - if ((j % 2) == 0) - mOffset = (mOffset + (*--it)) / 2; - - if ((mOffset == -1) || (mOffset == 1)) // small corrections likely do more harm than good - mOffset = 0; - - CondLog (timev || mOffset, lsTRACE, SNTPClient) << "SNTP: Offset is " << timev << ", new system offset is " << mOffset; - } - - void timerEntry (const boost::system::error_code& error) - { - if (!error) - { - doQuery (); - mTimer.expires_from_now (boost::posix_time::seconds (NTP_QUERY_FREQUENCY)); - mTimer.async_wait (std::bind (&SNTPClientImp::timerEntry, this, beast::asio::placeholders::error)); - } - } - -private: - using LockType = std::mutex; - using ScopedLockType = std::lock_guard ; - LockType mLock; - - boost::asio::io_service m_io_service; - std::map mQueries; - - boost::asio::ip::udp::socket mSocket; - boost::asio::deadline_timer mTimer; - boost::asio::ip::udp::resolver mResolver; - - std::vector< std::pair > mServers; - - int mOffset; - time_t mLastOffsetUpdate; - std::deque mOffsetList; - - std::vector mReceiveBuffer; - boost::asio::ip::udp::endpoint mReceiveEndpoint; -}; - -//------------------------------------------------------------------------------ - -SNTPClient::SNTPClient (Stoppable& parent) - : Stoppable ("SNTPClient", parent) -{ -} - -//------------------------------------------------------------------------------ - -SNTPClient* SNTPClient::New (Stoppable& parent) -{ - return new SNTPClientImp (parent); -} - -} // ripple diff --git a/src/ripple/unity/core.cpp b/src/ripple/unity/core.cpp index 3cbdc8f1de..1ec0cce05b 100644 --- a/src/ripple/unity/core.cpp +++ b/src/ripple/unity/core.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include #include diff --git a/src/ripple/unity/net.cpp b/src/ripple/unity/net.cpp index 3726c1213d..e4fd1e3360 100644 --- a/src/ripple/unity/net.cpp +++ b/src/ripple/unity/net.cpp @@ -27,4 +27,3 @@ #include #include #include -#include