An SNTP client implementation. About 95% complete right now.

This commit is contained in:
JoelKatz
2012-08-04 21:33:05 -07:00
parent 78f415f386
commit ee01b55874
7 changed files with 260 additions and 1 deletions

View File

@@ -85,6 +85,9 @@
# 192.168.0.1 3939
# 2001:0db8:0100:f101:0210:a4ff:fee3:9566
#
# [sntp_servers]
# IP address or domain of servers to use for time synchronization.
#
# [peer_ip]:
# IP address or domain to bind to allow external connections from peers.
# Defaults to not allow external connections from peers.
@@ -140,6 +143,11 @@
[debug_logfile]
debug.log
[sntp_servers]
time.windows.com
us.pool.ntp.org
time.apple.com
[validation_seed]
shh1D4oj5czH3PUEjYES8c7Bay3tE

View File

@@ -38,7 +38,8 @@ DatabaseCon::~DatabaseCon()
Application::Application() :
mUNL(mIOService),
mNetOps(mIOService, &mMasterLedger), mTempNodeCache(16384, 90), mHashedObjectStore(16384, 300),
mRpcDB(NULL), mTxnDB(NULL), mLedgerDB(NULL), mWalletDB(NULL), mHashNodeDB(NULL), mNetNodeDB(NULL),
mSNTPClient(mIOService), mRpcDB(NULL), mTxnDB(NULL), mLedgerDB(NULL), mWalletDB(NULL),
mHashNodeDB(NULL), mNetNodeDB(NULL),
mConnectionPool(mIOService), mPeerDoor(NULL), mRPCDoor(NULL)
{
RAND_bytes(mNonce256.begin(), mNonce256.size());
@@ -68,6 +69,8 @@ void Application::run()
if (!theConfig.DEBUG_LOGFILE.empty())
Log::setLogFile(theConfig.DEBUG_LOGFILE);
mSNTPClient.init(theConfig.SNTP_SERVERS);
//
// Construct databases.
//

View File

@@ -16,6 +16,7 @@
#include "TaggedCache.h"
#include "ValidationCollection.h"
#include "Suppression.h"
#include "SNTPClient.h"
#include "../database/database.h"
@@ -50,6 +51,7 @@ class Application
ValidationCollection mValidations;
SuppressionTable mSuppressions;
HashedObjectStore mHashedObjectStore;
SNTPClient mSNTPClient;
DatabaseCon *mRpcDB, *mTxnDB, *mLedgerDB, *mWalletDB, *mHashNodeDB, *mNetNodeDB;

View File

@@ -23,6 +23,7 @@
#define SECTION_RPC_ALLOW_REMOTE "rpc_allow_remote"
#define SECTION_RPC_IP "rpc_ip"
#define SECTION_RPC_PORT "rpc_port"
#define SECTION_SNTP "sntp_servers"
#define SECTION_UNL_DEFAULT "unl_default"
#define SECTION_VALIDATION_QUORUM "validation_quorum"
#define SECTION_VALIDATION_SEED "validation_seed"
@@ -193,6 +194,12 @@ void Config::load()
// sectionEntriesPrint(&IPS, SECTION_IPS);
}
smtTmp = sectionEntries(secConfig, SECTION_SNTP);
if (smtTmp)
{
SNTP_SERVERS = *smtTmp;
}
(void) sectionSingleB(secConfig, SECTION_VALIDATORS_SITE, VALIDATORS_SITE);
(void) sectionSingleB(secConfig, SECTION_PEER_IP, PEER_IP);

View File

@@ -54,6 +54,7 @@ public:
std::string VALIDATORS_SITE; // Where to find validators.txt on the Internet.
std::vector<std::string> VALIDATORS; // Validators from newcoind.cfg.
std::vector<std::string> IPS; // Peer IPs from newcoind.cfg.
std::vector<std::string> SNTP_SERVERS; // SNTP servers from newcoind.cfg.
// Network parameters
int NETWORK_START_TIME; // The Unix time we start ledger 0.

180
src/SNTPClient.cpp Normal file
View File

@@ -0,0 +1,180 @@
#include "SNTPClient.h"
#include <boost/bind.hpp>
#include <boost/make_shared.hpp>
#include "utils.h"
#include "Log.h"
static uint8_t SNTPQueryData[48] = {
0x1B,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
};
SNTPClient::SNTPClient(boost::asio::io_service& service) :
mIOService(service), mSocket(service), mTimer(service), mResolver(service),
mOffset(0), mLastOffsetUpdate((time_t) -1), mReceiveBuffer(256)
{
mSocket.open(boost::asio::ip::udp::v4());
mSocket.async_receive_from(boost::asio::buffer(mReceiveBuffer, 256), mReceiveEndpoint,
boost::bind(&SNTPClient::receivePacket, this, boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
mTimer.expires_from_now(boost::posix_time::seconds(1));
mTimer.async_wait(boost::bind(&SNTPClient::timerEntry, this, boost::asio::placeholders::error));
}
void SNTPClient::resolveComplete(const boost::system::error_code& error, boost::asio::ip::udp::resolver::iterator it)
{
if (!error)
{
boost::asio::ip::udp::resolver::iterator sel = it;
int i = 1;
while (++it != boost::asio::ip::udp::resolver::iterator())
if ((rand() % ++i) == 0)
sel = it;
if (sel != boost::asio::ip::udp::resolver::iterator())
{
boost::mutex::scoped_lock sl(mLock);
SNTPQuery& query = mQueries[*sel];
time_t now = time(NULL);
if ((query.mLocalTimeSent == now) || ((query.mLocalTimeSent + 1) == now))
{
Log(lsINFO) << "SNTP: Redundant query suppressed";
return;
}
query.mReceivedReply = false;
query.mLocalTimeSent = now;
mSocket.async_send_to(boost::asio::buffer(SNTPQueryData, 256), *sel,
boost::bind(&SNTPClient::sendComplete, this,
boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}
}
}
void SNTPClient::receivePacket(const boost::system::error_code& error, std::size_t bytes_xferd)
{
if (!error)
{
boost::mutex::scoped_lock sl(mLock);
Log(lsINFO) << "SNTP: Packet from " << mReceiveEndpoint;
std::map<boost::asio::ip::udp::endpoint, SNTPQuery>::iterator query = mQueries.find(mReceiveEndpoint);
if (query == mQueries.end())
Log(lsINFO) << "SNTP: Reply found without matching query";
else if (query->second.mReceivedReply)
Log(lsINFO) << "SNTP: Duplicate response to query";
else
{
query->second.mReceivedReply = true;
if (time(NULL) > (query->second.mLocalTimeSent + 1))
Log(lsINFO) << "SNTP: Late response";
else
if (bytes_xferd < 48)
Log(lsINFO) << "SNTP: Short reply (" << bytes_xferd << ") " << mReceiveBuffer.size();
else
processReply();
}
}
mSocket.async_receive_from(boost::asio::buffer(mReceiveBuffer, 256), mReceiveEndpoint,
boost::bind(&SNTPClient::receivePacket, this, boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
void SNTPClient::sendComplete(const boost::system::error_code& error, std::size_t)
{
if (error)
Log(lsINFO) << "SNTP: Send error";
}
void SNTPClient::processReply()
{
assert(mReceiveBuffer.size() >= 48);
uint32 *recvBuffer = reinterpret_cast<uint32*>(&mReceiveBuffer.front());
unsigned info = ntohl(recvBuffer[0]);
int64_t timev = ntohl(recvBuffer[8]);
unsigned stratum = (info >> 16) & 0xff;
if ((info >> 30) == 3)
{
Log(lsINFO) << "SNTP: Alarm condition";
return;
}
if ((stratum == 0) || (stratum > 14))
{
Log(lsINFO) << "SNTP: Unreasonable stratum";
return;
}
timev -= time(NULL);
timev -= 0x83AA7E80ULL;
Log(lsINFO) << "SNTP: Offset is " << timev;
// WRITEME
}
void SNTPClient::timerEntry(const boost::system::error_code& error)
{
if (!error)
{
doQuery();
mTimer.expires_from_now(boost::posix_time::seconds(10));
mTimer.async_wait(boost::bind(&SNTPClient::timerEntry, this, boost::asio::placeholders::error));
}
}
void SNTPClient::addServer(const std::string& server)
{
boost::mutex::scoped_lock sl(mLock);
mServers.push_back(std::make_pair(server, (time_t) -1));
}
void SNTPClient::init(const std::vector<std::string>& servers)
{
std::vector<std::string>::const_iterator it = servers.begin();
if (it == servers.end())
{
Log(lsINFO) << "SNTP: no server specified";
return;
}
do
addServer(*it++);
while (it != servers.end());
queryAll();
}
void SNTPClient::queryAll()
{
while(doQuery())
nothing();
}
bool SNTPClient::doQuery()
{
boost::mutex::scoped_lock sl(mLock);
std::vector< std::pair<std::string, time_t> >::iterator best = mServers.end();
for (std::vector< std::pair<std::string, time_t> >::iterator it = mServers.begin(), end = best;
it != end; ++it)
if ((best == end) || (it->second == (time_t) -1) || (it->second < best->second))
best = it;
if (best == mServers.end())
{
Log(lsINFO) << "SNTP: No server to query";
return false;
}
time_t now = time(NULL);
if ((best->second == now) || (best->second == (now - 1)))
{
Log(lsINFO) << "SNTP: All servers recently queried";
return false;
}
best->second = now;
boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), best->first, "ntp");
mResolver.async_resolve(query,
boost::bind(&SNTPClient::resolveComplete, this,
boost::asio::placeholders::error, boost::asio::placeholders::iterator));
Log(lsINFO) << "SNTP: Resolve pending for " << best->first;
return true;
}

58
src/SNTPClient.h Normal file
View File

@@ -0,0 +1,58 @@
#ifndef __SNTPCLIENT__
#define __SNTPCLIENT__
#include <string>
#include <map>
#include <vector>
#include <boost/thread/mutex.hpp>
#include <boost/asio.hpp>
class SNTPQuery
{
public:
bool mReceivedReply;
time_t mLocalTimeSent;
SNTPQuery(time_t j = (time_t) -1) : mReceivedReply(false), mLocalTimeSent(j) { ; }
};
class SNTPClient
{
public:
typedef boost::shared_ptr<SNTPClient> pointer;
protected:
std::map<boost::asio::ip::udp::endpoint, SNTPQuery> mQueries;
boost::mutex mLock;
boost::asio::io_service& mIOService;
boost::asio::ip::udp::socket mSocket;
boost::asio::deadline_timer mTimer;
boost::asio::ip::udp::resolver mResolver;
std::vector< std::pair<std::string, time_t> > mServers;
int mOffset;
time_t mLastOffsetUpdate;
std::vector<uint8_t> mReceiveBuffer;
boost::asio::ip::udp::endpoint mReceiveEndpoint;
void receivePacket(const boost::system::error_code& error, std::size_t bytes);
void resolveComplete(const boost::system::error_code& error, boost::asio::ip::udp::resolver::iterator iterator);
void sentPacket(boost::shared_ptr<std::string>, const boost::system::error_code&, std::size_t);
void timerEntry(const boost::system::error_code&);
void sendComplete(const boost::system::error_code& error, std::size_t bytesTransferred);
void processReply();
public:
SNTPClient(boost::asio::io_service& service);
void init(const std::vector<std::string>& servers);
void addServer(const std::string& mServer);
void queryAll();
bool doQuery();
bool getOffset(int& offset);
};
#endif