#    Copyright (C) 2009 Jeff Epler
#
#    This program is free software; you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation; either version 2 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with this program; if not, write to the Free Software
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
"""Run periodic system checks and write the worst level to a serial port.

Each check returns a severity level (OK, INFO, WARNING or ERROR).  The main
loop re-runs checks when they fall due, takes the maximum level over all
checks, and writes it as a single byte to the serial device.

The code uses only constructs valid on both Python 2.6+ and Python 3.
"""

import glob
import os
import random
import struct
import time
import traceback

import serial

# Severity levels, in increasing order of badness.
OK, INFO, WARNING, ERROR = range(4)

# Serial port that receives the current level.
SERIAL_DEVICE = "/dev/ttyUSB0"
SERIAL_BAUD = 19200


def check_system(command, level=ERROR):
    """Run a shell command; return `level` if it exits non-zero, else OK."""
    print("# %s" % command)
    if os.system(command):
        return level
    return OK


def check_ping(hostname, level=ERROR):
    """Send one ping (5 second wait) to `hostname`; return `level` on failure."""
    return check_system(
        "ping -c1 -W5 %s > /dev/null 2>&1" % hostname, level)


def check_df(fs="", level=ERROR):
    """Return `level` if `df <fs>` output contains "99%" or "100%".

    `fs` is interpolated verbatim into the df command line, so it may be a
    path or an option string.
    """
    # The leading "!" inverts the pipeline status: egrep succeeding (a
    # match was found) makes the whole command fail.
    return check_system("! (df %s | egrep '(99|100)%%')" % fs, level)


def check_load(*levels):
    """Map the first /proc/loadavg field to a severity level.

    `levels` is a sequence of (threshold, level) pairs.  The result is the
    highest level whose threshold the load average exceeds, or OK if none.
    """
    print("# load")
    with open("/proc/loadavg") as loadavg:
        lavg = float(loadavg.read().split()[0])
    result = OK
    for threshold, level in levels:
        if lavg > threshold:
            result = max(result, level)
    return result


def check_ext():
    """Return the highest level reported by the files in ~/.checks/.

    The first whitespace-separated token of each file's first line is read
    as an integer level.  An empty or malformed file raises (IndexError or
    ValueError); the caller decides how to report that.
    """
    print("# ext")
    level = OK
    for path in glob.glob(os.path.expanduser("~/.checks/*")):
        print("# %s" % path)
        with open(path) as status_file:
            first_line = status_file.readline()
        level = max(level, int(first_line.split()[0]))
    return level


def log_exception():
    """Print the traceback of the exception currently being handled."""
    traceback.print_exc()


def do_check(f):
    """Call check `f`; any exception it raises is logged and counts as ERROR.

    KeyboardInterrupt and SystemExit are not subclasses of Exception, so
    they still propagate.
    """
    try:
        return f()
    except Exception:
        log_exception()
        return ERROR


def calc_next(now, interval, lev):
    """Return the time at which a check should next run.

    The interval is divided by 4**lev, so a check reporting a higher level
    is re-run sooner.
    """
    delay = interval / (4. ** lev)
    print("calc_next %s" % delay)
    return now + delay


# (check callable, base interval in seconds) pairs.
checks = [
    (lambda: check_ping('lamp.unpy.net'), 6),
    (lambda: check_ping('bald.unpy.net'), 6),
    (lambda: check_ping('cvs.linuxcnc.org'), 6),
    (lambda: check_ping('dsl.unpy.net'), 6),
    (lambda: check_ping('199.184.119.126'), 60),
    (check_ext, 5),
    # NOTE(review): '-text3' is presumably parsed by df as `-t ext3`.
    (lambda: check_df('-text3'), 5),
    (lambda: check_load((2, INFO), (5, WARNING), (15, ERROR)), 60),
]

# check callable -> (base interval, next due time, last level).
# Every check is run once up front; the first due time is randomized within
# one interval so the checks do not all fall due at the same moment.
dchecks = {}
for a, b in checks:
    now = time.time()
    # Go through do_check so a check that raises at startup is recorded as
    # ERROR instead of aborting the program.
    dchecks[a] = (b, now + random.uniform(0, b), do_check(a))


def do_checks():
    """Re-run every check that is due and return the worst current level.

    Checks that are not yet due contribute their last recorded level.
    Returns OK when there are no checks.
    """
    result = OK
    now = time.time()
    # Iterate over a snapshot because entries are reassigned in the loop.
    for f, (interval, due, lev) in list(dchecks.items()):
        if now > due:
            lev = do_check(f)
            dchecks[f] = (interval, calc_next(now, interval, lev), lev)
        result = max(result, lev)
    return result


def _close_port(port):
    """Close `port` if it is open, ignoring errors from close()."""
    if port is None:
        return
    try:
        port.close()
    except Exception:
        # Best effort: the port is being discarded anyway.
        pass


def main():
    """Loop forever, writing the current level to the serial port.

    The initial open is not guarded, so a missing device at startup raises.
    After a later write or open failure the port is dropped and reopened on
    the next pass.
    """
    port = serial.Serial(SERIAL_DEVICE, SERIAL_BAUD)
    # Start non-zero so the first level is always printed.
    last_level = ERROR
    while True:
        level = do_checks()
        # Print while the level is non-zero, and once more when it clears.
        if level or last_level:
            print(level)
        last_level = level
        try:
            if port is None:
                port = serial.Serial(SERIAL_DEVICE, SERIAL_BAUD)
            # One unsigned byte; a level outside 0..255 raises and is
            # handled like any other write failure.
            port.write(struct.pack("B", level))
        except Exception:
            log_exception()
            _close_port(port)
            port = None
        time.sleep(.1)


if __name__ == "__main__":
    main()