#!/usr/bin/python
#    Copyright (C) 2013 Jeff Epler
#    This program is free software; you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation; either version 2 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with this program; if not, write to the Free Software
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
"""Poll an NTP server and print one status line per reply.

The code is written to run unchanged on both Python 2 and Python 3.
"""

import socket
import struct
import string
import time
import itertools
import select

# Offset in seconds subtracted from NTP-epoch timestamps to obtain
# Unix-epoch timestamps.
NTP_TO_UNIX_EPOCH_OFFSET = 2208988800

# Number of leading bytes of a datagram decoded by NtpDatagram.
NTP_HEADER_SIZE = 48

# Set form of string.printable for single-character membership tests.
_PRINTABLE = frozenset(string.printable)


def map_unprintable_to_underscore(s):
    """Return *s* as text with each non-printable character replaced by "_".

    "Printable" means membership in ``string.printable``.  Byte strings are
    accepted too: on Python 3 they are decoded as Latin-1 first so that every
    byte maps to exactly one character (on Python 2 ``bytes`` is ``str`` and
    the input is used as-is).
    """
    if isinstance(s, bytes) and not isinstance(s, str):
        s = s.decode("latin-1")
    return "".join(c if c in _PRINTABLE else "_" for c in s)


def xbits(w, st, *args):
    """Yield consecutive bit fields of the integer *w*, lowest field first.

    *st* is the number of low-order bits to skip; each remaining argument is
    the width in bits of the next field.  For example
    ``list(xbits(0xe3, 0, 3, 3, 2))`` is ``[3, 4, 3]``.
    """
    w >>= st
    for width in args:
        yield w & ((1 << width) - 1)
        w >>= width


def pbits(*args):
    """Pack values into one integer; the inverse of :func:`xbits`.

    The arguments are all the field widths followed by the same number of
    field values, e.g. ``pbits(3, 3, 2, mode, vn, li)``.  The first value
    occupies the lowest-order bits.

    Raises:
        TypeError: if the number of arguments is odd.
        ValueError: if a value is negative or does not fit in its width.
    """
    if len(args) % 2:
        raise TypeError("requires width-value pairs")
    half = len(args) // 2
    widths, values = args[:half], args[half:]
    result = 0
    shift = 0
    for i, (w, v) in enumerate(zip(widths, values)):
        if v < 0 or v >= (1 << w):
            raise ValueError(
                "%d'th value out of range (%d does not fit in %d bits)"
                % (i, v, w))
        result |= v << shift
        shift += w
    return result


def ntp_livnm(li, vn, m):
    """Return the first header byte built from leap indicator, version, mode.

    Mode takes the low 3 bits, version the next 3 and the leap indicator the
    top 2, matching how NtpDatagram splits ``livnm``.
    """
    return pbits(3, 3, 2, m, vn, li)


def ntp_epoch_to_unix_epoch(n):
    """Convert *n* seconds since the NTP epoch to seconds since the Unix epoch."""
    return n - NTP_TO_UNIX_EPOCH_OFFSET


class NtpDatagram(object):
    """Decoded view of the first 48 bytes of an NTP datagram.

    Attribute suffixes: none = raw integer as unpacked, ``_s`` = seconds as a
    float (still relative to the NTP epoch for timestamps), ``_su`` = seconds
    relative to the Unix epoch.
    """

    # Poll and precision are signed 8-bit log2-seconds values (RFC 5905), so
    # they are unpacked with "b"; unpacking them unsigned would turn a typical
    # precision such as -20 into 236.
    _HEADER = struct.Struct("!BBbbIIIQQQQ")

    def __init__(self, data):
        """Parse *data*, the raw bytes of a received datagram.

        Raises:
            struct.error: if *data* is shorter than 48 bytes.
        """
        self.data = data
        (self.livnm, self.stratum, self.poll, self.precision,
         self.root_delay, self.root_dispersion, self.reference_id,
         self.reference_timestamp, self.origin_timestamp,
         self.receive_timestamp, self.transmit_timestamp) = \
            self._HEADER.unpack(data[:NTP_HEADER_SIZE])

        # 32-bit fields carry 16 fractional bits, 64-bit timestamps carry 32.
        self.root_delay_s = self.root_delay * 2**-16
        self.root_dispersion_s = self.root_dispersion * 2**-16
        self.reference_timestamp_s = self.reference_timestamp * 2**-32
        self.origin_timestamp_s = self.origin_timestamp * 2**-32
        self.receive_timestamp_s = self.receive_timestamp * 2**-32
        self.transmit_timestamp_s = self.transmit_timestamp * 2**-32

        self.reference_timestamp_su = \
            ntp_epoch_to_unix_epoch(self.reference_timestamp_s)
        self.origin_timestamp_su = \
            ntp_epoch_to_unix_epoch(self.origin_timestamp_s)
        self.receive_timestamp_su = \
            ntp_epoch_to_unix_epoch(self.receive_timestamp_s)
        self.transmit_timestamp_su = \
            ntp_epoch_to_unix_epoch(self.transmit_timestamp_s)

        self.mode, self.vn, self.li = xbits(self.livnm, 0, 3, 3, 2)

        # NOTE(review): the 192-byte threshold and 12-byte trailer are kept
        # exactly as originally written; they do not obviously line up with
        # the 48-byte header decoded above -- confirm before relying on
        # extension_data / auth_data.
        if len(data) > 192:
            self.extension_data = data[192:-12]
            self.auth_data = data[-12:]
        else:
            self.extension_data = None
            self.auth_data = None

    def ref_id_as_string(self):
        """Return the reference ID as text.

        For stratum 1 the four bytes are shown as characters (non-printable
        ones become "_"); otherwise they are shown as a dotted quad, most
        significant byte first.
        """
        if self.stratum == 1:
            return map_unprintable_to_underscore(
                struct.pack("!I", self.reference_id))
        # xbits yields the least significant byte first, hence the reversal.
        octets = tuple(xbits(self.reference_id, 0, 8, 8, 8, 8))[::-1]
        return "%d.%d.%d.%d" % octets


def solicit_ntp_datagram(addr, timeout=5):
    """Send one NTP request to *addr* and return the parsed reply.

    Args:
        addr: ``(host, port)`` tuple passed to ``socket.sendto``.
        timeout: seconds to wait for a reply.

    Returns:
        An NtpDatagram, or None if nothing arrived within *timeout*.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # First byte 0xe3 (LI=3, VN=4, mode=3) followed by 47 zero bytes;
        # built as bytes so that sendto() also accepts it on Python 3.
        request = struct.pack("!B", ntp_livnm(3, 4, 3)) + b"\0" * 47
        s.sendto(request, addr)
        readable, _, _ = select.select([s], [], [], timeout)
        if not readable:
            return None
        reply, _ = s.recvfrom(1024)
    finally:
        # Always release the descriptor; this function is called in a loop.
        s.close()
    return NtpDatagram(reply)


def main():
    """Poll the server named on the command line and print status lines."""
    import sys

    if len(sys.argv) > 1:
        addr = sys.argv[1], 123
    else:
        addr = '', 123

    while 1:
        d = solicit_ntp_datagram(addr)
        if d is None:
            print("XXX No response received")
            continue

        fields = []

        # The server's idea of the time ("%H:%M:%S" is the portable
        # spelling of "%T")
        fields.append(
            time.strftime("%H:%M:%S", time.localtime(d.transmit_timestamp_su)))

        # LAMBDA is the ntp "synchronization distance", ntp's estimate
        # of the worst-case error in the time estimate in seconds.  For
        # values above 1s, consider the system unsynchronized
        delta = d.root_delay_s
        epsilon = d.root_dispersion_s
        lambda_ = epsilon + delta / 2
        fields.append("%8.5f" % lambda_)

        # the difference between the receive timestamp and the reference
        # timestamp gives the time since the server last talked to its upstream
        fields.append(
            "%10.5f" % (d.receive_timestamp_s - d.reference_timestamp_s))

        # The stratum number can indicate unsynchronized (if not in the range
        # 1..15 inclusive)
        st_desynch = d.stratum == 0 or d.stratum > 15
        fields.append("ST-UNSYNCH" if st_desynch else "ST-SYNCH")

        # The leap second indicator can also indicate unsynchronized
        fields.append("LI-UNSYNCH" if d.li == 3 else "LI-SYNCH")

        # One print call with a single string behaves the same on Python 2
        # and 3 and yields the same space-separated line as before.
        print(" ".join(fields))

        time.sleep(5)


if __name__ == '__main__':
    main()