#! /usr/bin/env python # -*- coding: UTF-8 -*- """The Sampling Server package $Id: test.py 803 2008-12-03 12:08:26Z goodwin $ """ # Copyright 2008-2010 eGovMon # This program is distributed under the terms of the GNU General # Public License. # # This file is part of the eGovernment Monitoring # (eGovMon) # # eGovMon is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # eGovMon is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with eGovMon; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, # MA 02110-1301 USA __author__ = "$Author: goodwin $" __version__ = "$Revision: 803 $" __updated__ = "$LastChangedDate$" from egovmontime import * import Queue import SOAPpy import threading import os import sys import random import time import urlparse from socket import gethostname; import socket from logit import * from subprocess import * from threading import Semaphore from sc import SystemConfiguration sco=SystemConfiguration() PIDFILE="/var/run/samplingserver.pid" class Log: """Ensure auto-flush after each write""" def __init__(self, f): self.f = f def write(self, s): self.f.write(s) self.f.flush() def flush(self): self.f.flush() log=Log(open(sco.loglocation+'/samplingserver.log','a')) class Log: """Ensure auto-flush after each write""" def __init__(self, f): self.f = f def write(self, s): self.f.write(s) self.f.flush() def flush(self): self.f.flush() class SamplingServerT(threading.Thread): """Threaded ETL server class """ def __init__(self,sc,parent): """Constructor for the ETL Server Thread """ threading.Thread.__init__(self) self.starttime = None self.pid = None self.sc = sc self.parent = parent self.activesite = '' def duration(self): """Returns the duration of an ETL """ return time.time()-self.starttime def stopifhanged(self): """Claims this tread is stopped even if it has not. This is to prevent bugs due to too long duration """ if self.starttime and self.pid: if self.duration()>10000: cmd = 'kill -9 ' + str(self.pid) os.system(cmd) sys.stderr.write('Killed pid:'+str(self.pid)) sys.stderr.write('Investigate why \n') def run(self): """ Running the actual thread. Loops until stop(...) is called Returns None """ while 1: element = self.parent.urlqueue.get() if not element: #print 'Server is stopping' return else: site,testrun,contenttype = (element) self.activesite = site logit('Starting sampling of site'+site+'with testrunid '+testrun,somemodule='Sampler') logname = site.replace('.','_').replace('/','_') cmd = 'sampler ' + str(site) + ' ' + str(testrun) + ' ' + str(contenttype) fout = open(self.sc.loglocation+'sampler_'+str(logname)+'_stdout.log','w') ferr = open(self.sc.loglocation+'sampler_'+str(logname)+'_stderr.log','w') self.p = Popen(cmd,shell=True,stdout=fout,stderr=ferr) try: self.p.wait() except OSError: print 'Warning:',self.activesite,'enden abruptly.' self.activesite = '' logit('Size of sample queue:'+str(self.parent.urlqueue.qsize()),somemodule='Sampler',stdout=True) #os.system('sampler ' + str(site) + ' ' + str(testrun) +' 1>'+self.sc.loglocation+'sampler_'+str(logname)+'_stdout.log 2>'+self.sc.loglocation+'sampler_'+str(logname)+'_stderr.log') class SamplingServer: """Site URL server class """ def __init__(self,port=8891,host=None,numthreads=None): """initialise the class KeyWord arguments: port -- [Optional] Port to serve at. 8891 as default. host -- [Optional] Host to server at. Localhost as default. numthreads -- [Optional] Number of threads. """ self.port = port self.host = host self.samplingt = [] self.sc = SystemConfiguration() if numthreads == None: self.numthreads = int(self.sc.numberofsamplers) else: self.numthreads = numthreads self.urlqueue = Queue.Queue() self.start() def stop(self): """ Stopping all threads """ for t in self.samplingt: self.urlqueue.put(None) def start(self): """ Staring all threads Needed to start or restart all threads in the server Returns None """ #Just to make sure everything is stopped. The next command is strictly not needed it the ETL server is used correctly. However, it is no harm in calling it too much. self.stop() self.samplingt = [] for i in range(self.numthreads): t = SamplingServerT(self.sc,self) t.start() t.port = self.port t.host = self.host self.samplingt.append(t) self.loading = False self.loadingqueue = {} print 'Sampling Server is avilable at port',self.port,'running with',self.numthreads,'threads' def ping(self): return True def loadSample(self,site,testrun,contenttype): """ Pushes data to the ETL for loading into to DW Keyword arguments: site -- site to evaluate testrunnr -- Current Testrunnr contenttyep -- HTML, PDF or MobileOK Returns None """ logit('Received site '+str(site)+' in the sampler with testrunid '+testrun,somemodule='Sampler',stdout=True) # Checking if the site is already in the queue siteexists = False if site in [i[0] for i in self.urlqueue.queue]: siteexists = True logit('Site '+str(site)+' ignored because it was already in queue',somemodule='Sampler',stdout=True) if site in [i.activesite for i in self.samplingt]: siteexists = True logit('Site '+str(site)+' ignored because it was active.',somemodule='Sampler',stdout=True) if not siteexists: self.urlqueue.put((site,testrun,contenttype)) logit('Size of sample queue:'+str(self.urlqueue.qsize()),somemodule='Sampler',stdout=True) def printHelp(): """For printing help This is needed as input to any user starting the ETL server. Exits all active modules. Returns None """ print 'Usage:\npython samplingserver [OPTIONS]' print 'Options:' print '-p N - port to use. 8891 as default.' print '-t N - Number of sampling server server threads. 1 as default.' print '--help - This help file' sys.exit(1) def main(port,numthreads=None): sc = SystemConfiguration() host,port = urlparse.urlsplit(sc.samplingserver)[1].split(':') s = SamplingServer(host=host,port=int(port),numthreads=numthreads) tries = 1 error = True while tries<5 and error: #If address is taken, try several times. The reason is most often that restart has happened... try: server = SOAPpy.SOAPServer((host, int(port))) except socket.error: time.sleep(random.randint(1,30)*tries) tries += 1 else: error = False if tries==5: #If max tries, this will throw an exeption server = SOAPpy.SOAPServer((host, int(port))) server.registerFunction(s.loadSample) server.registerFunction(s.ping) try: server.serve_forever() except KeyboardInterrupt: s.stop() def daemonize(): # Disconnect from controlling TTY as a service try: pid = os.fork() if pid > 0: sys.exit(0) except OSError, e: print >>sys.stderr, "fork #1 failed: %d (%s)" % (e.errno, e.strerror) sys.exit(1) # Do not prevent unmounting... os.chdir("/") os.setsid() os.umask(0) # do second fork try: pid = os.fork() if pid > 0: # exit from second parent, print eventual PID before #print "Daemon PID %d" % pid open(PIDFILE,'w').write("%d"%pid) sys.exit(0) except OSError, e: print >>sys.stderr, "fork #2 failed: %d (%s)" % (e.errno, e.strerror) sys.exit(1) # Redirect stdout/stderr to log file sys.stdout=sys.stderr=log # UID and GID Nobody os.setegid(65534) os.seteuid(65534) if __name__ == '__main__': if '--help' in sys.argv: printHelp() try: if '-p' in sys.argv: port = sys.argv[sys.argv.index('-p')+1] else: port = '8891' if '-d' in sys.argv: daemonise=True else: daemonise=False except IndexError: printHelp() if daemonise: daemonize() main(port)