#! /usr/bin/env python # vim: et ai si smarttab syn=python foldmethod=marker # -*- coding: UTF-8 -*- """Module for sampling when a crawl of the web site has been concluded $Id: test.py 803 2008-12-03 12:08:26Z goodwin $ """ # Copyright 2008-2010 eGovMon # This program is distributed under the terms of the GNU General # Public License. # # This file is part of the eGovernment Monitoring # (eGovMon) # # eGovMon is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # eGovMon is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with eGovMon; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, # MA 02110-1301 USA from __future__ import with_statement __author__ = "$Author: christra, goodwin $" __version__ = "$Revision: 803 $" __updated__ = "$LastChangedDate$" import socket gsocket = socket.socket import sys, os import time import urllib import psycopg2 import threading import urlparse Semaphore = threading.Semaphore from egovmondb import * from logit import * from Queue import Queue from Queue import Full from Queue import Empty import traceback import SOAPpy import sc sc = sc.SystemConfiguration() socket.socket = gsocket socket.setdefaulttimeout(int(sc.samplerevaluationtimeout)) # {{{ Utilities class ServerWrapper: """ Simple Hidden Markov Chain to force the WAMs server to stick to one server unless it is unavailable""" #Ref: http://www.nd.edu/~lemmon/publications/ecrts05.pdf #The above automatically ignores WAM servers that are overloaded. def __init__(self,server,mainserver=None,protocol='soappy'): if protocol=='soappy': self.server = SOAPpy.SOAPProxy(server) #server else: #Pyro self.server = Pyro.core.getProxyForURI(server) self.numstates = 20 if server == mainserver: self.state = MVar(1) else: self.state = MVar(self.numstates) self.numreduce = 0 def increase(self): print 'Increasing:',self.server.proxy self.state.with_(lambda x: x + 1 if x < self.numstates else x) def reduce(self): print 'Reducing:',self.server.proxy self.state.with_(lambda x: x - 2 if x > 2 else 0) self.numreduce += 1 def markovsort(x,y): with x.state as xs: with y.state as ys: if xs == ys: return random.randint(-1, 1) return cmp(ys, xs) def getOneServer(servers_): with servers_ as servers: server = servers[0] #Default in case all servers are in state 0 servers.sort(markovsort) all = sum([server.state.read() for server in servers]) one = random.randint(0,all) allstates = 0 for server in servers: allstates += server.state.read() if allstates > one: return server return server class MVar: def __init__(self, value=None): self.value = value self.takemutex = Semaphore(1) self.putmutex = Semaphore(1) if value == None: self.takemutex.acquire() else: self.putmutex.acquire() self.entered = None def __enter__(self): self.entered = self.take() return self.entered def __exit__(self, type, value, traceback): self.put(self.entered) def take(self): self.takemutex.acquire() value = self.value self.value = None self.putmutex.release() return value def put(self, value): if value == None: raise Exception("Cannot have value == None.") self.putmutex.acquire() self.value = value self.takemutex.release() def switch(self, value): a = self.take() try: self.put(value) except: self.put(a) raise return a def read(self): a = self.take() self.put(a) return a def with_(self, f): a = f(self.take()) self.put(a) return a def inc(self): a = self.take() if type(a) != type(0): self.put(a) raise Exception("The MVar isn't an int.") self.put(a + 1) # }}} # {{{ DownloaderThread class DownloaderThread(threading.Thread): def __init__(self, parent): threading.Thread.__init__(self) self.opener = urllib2.build_opener() self.inqueue = parent.downloadqueue self.outqueue = parent.samplequeue self.parent = parent self.sleeping = False def stop(self): # Two times just to be sure. print "Emptying queue downloader" while not self.inqueue.empty(): try: self.inqueue.get(block=False) except Empty: pass print "Stopping downloader" self.inqueue.put((None,None)) print "Downloader stopped" #self.inqueue.put((None, None), False, 0.5) #self.inqueue.put((None, None), False, 0.5) def run(self): while 1: #if self.parent.shouldstop(): # return self.sleeping = True print 'Getting pageid' pageid = -1 #The following makes sure that get is not hanging on an empty queue: while pageid<0 and not self.parent.shouldstop(): try: pageid, url = self.inqueue.get(timeout=5) except Empty: pageid = -1 print 'Pageid got' self.sleeping = False if pageid == None or self.parent.shouldstop(): print 'Downloader Returning pageid:',pageid return #url = urllib.unquote(url) if False: # url.startswith('https'): try: d = self.opener.open(url) if d.geturl() != url: if self.parent.outOfScope(d.geturl(), url): self.parent.egovmondb.\ addRedirectOutOfScope(self.parent.siteid,\ self.parent.testrunid,\ pageid) logit('Ignoring '+repr(url)+' because it\'s redirected out of scope',\ 'egovmon', 'Sampler') continue content = d.read() header = dict(d.headers) print "Getting content type" ctype = header['content-type'].split(';')[0] print "Content type",ctype if len(ctype) > 1 and ctype[1][0:] == ' charset=': content = unicode(content, ctype[1][9:]) charset = ctype[1][9:] ctype = ctype[0] else: content = unicode(content, 'ISO-8859-1') charset = 'utf-8' if ctype not in ('text/html', 'application/xhtml+xml', 'application/xml', 'text/xml'): self.parent.egovmondb.addWrongContentType(self.parent.siteid,\ self.parent.testrunid,\ pageid) logit('Ignoring '+repr(url)+', ctype '+repr(ctype)+' isn\'t markup.', 'egovmon', 'Sampler') continue except Exception, e: self.parent.egovmondb.addPageUnavailable(self.parent.siteid,\ self.parent.testrunid, pageid) logit('Problem downloading URL '+repr(url),'egovmon','Sampler') traceback.print_exc() self.parent.sitetimeoutcount.inc() continue else: content = None header = None ctype = None charset = None self.outqueue.put((pageid, url, content, header, ctype, charset)) # }}} # {{{ SamplerThread class SamplerThread(threading.Thread): def __init__(self, parent): threading.Thread.__init__(self) self.parent = parent self.inqueue = parent.samplequeue self.outqueue = parent.insertqueue self.sleeping = False def stop(self): print "Emptying queue sampler" while not self.inqueue.empty(): try: self.inqueue.get(block=False) except Empty: pass print "Stopping sampler" self.inqueue.put((None,0,0,0,0,0)) print "Sampler stopped" #self.inqueue.put((None, 0, 0, 0, 0, 0), False, 0.5) #self.inqueue.put((None, 0, 0, 0, 0, 0), False, 0.5) def run(self): while 1: #if self.parent.shouldstop(): # return self.sleeping = True #print 'Gettining new data',self.inqueue.qsize() pageid = -1 #The following makes sure that get is not hanging on an empty queue: while pageid<0 and not self.parent.shouldstop(): try: pageid, url, content, header, ctype, charset = self.inqueue.get(timeout=False) except Empty: pageid = -1 print 'Pageid got' self.sleeping = False if pageid == None or self.parent.shouldstop(): print 'Sampler Returning. Pageid:',pageid return #print 'Not returing' retry = int(sc.samplertimeoutretrycount) if self.parent.contenttype == 'MobileOK': mobile = True logit("Checking with mobile on.","egovmon","Sampler") else: mobile = False logit("Checking with mobile off.","egovmon","Sampler") while retry > 0: thisserver = getOneServer(self.parent.servers) server = thisserver.server try: print 'Evaluating "%s" against "%s"' % (url, server.proxy) if content: logit("server.check("+repr(url)+", ..., contentType="+repr(ctype)+ ", encoding="+repr(charset)+", mobile="+repr(mobile)+")", "egovmon", "Sampler") res = eval(server.check(url, cont=content, httpheader=header, contentType=ctype, encoding=charset, mobile=mobile)) res["header"] = header else: logit("server.check("+repr(url)+", mobile="+repr(mobile)+")", "egovmon", "Sampler") res = eval(server.check(url,mobile=mobile)) print 'Evaluated "%s" against "%s"' % (url, server.proxy) retry = 0 except Exception, e: thisserver.reduce() string = str(e) if 'timed out' in string: time.sleep(10) retry -= 1 err = 'Connection timed out when evaluating '+repr(url) self.parent.wamtimeoutcount.inc() if retry > 0: err += ', retrying' else: self.parent.egovmondb.addWAMTimeout(self.parent.siteid,\ self.parent.testrunid, pageid) logit(err, 'egovmon', 'Sampler') elif 'Connection refused' in string: time.sleep(2.5) logit('Connection refused when evaluating url '+repr(url)+\ ' against '+repr(server.proxy), 'egovmon', 'Sampler') else: logit('Unknown error '+string, 'egovmon', 'Sampler') retry -= 1 if retry == 0: self.parent.egovmondb.addWAMTimeout(self.parent.siteid,\ self.parent.testrunid, pageid) res = None continue if res and not type(res) == type({}): res = eval(res) if res and type(res) == type({}): thisserver.increase() if res.has_key('invalidCType'): self.parent.egovmondb.addWrongContentType(self.parent.siteid,\ self.parent.testrunid,\ pageid) logit('Ignoring '+repr(url)+', the sampler says it isn\'t markup.', 'egovmon', 'Sampler') continue if res.has_key('result'): if 'redirect' in res: if self.parent.outOfScope(res['redirect'], url): logit('Ignoring '+repr(url)+' because it\'s redirected out of scope',\ 'egovmon', 'Sampler') self.parent.egovmondb.\ addRedirectOutOfScope(self.parent.siteid,\ self.parent.testrunid,\ pageid) continue if 'metadisallow' in res: logit('Ignoring '+repr(url)+' because it has a meta robots disallow tag.', 'egovmon', 'Sampler') self.parent.egovmondb.addMetaDisallow(self.parent.siteid, self.parent.testrunid, pageid) continue self.outqueue.put((pageid, url, res)) elif res.has_key('downloadok') and res['downloadok'] == 0: logit('Problem downloading '+repr(url), 'egovmon', 'Sampler') self.parent.egovmondb.addPageUnavailable(self.parent.siteid,\ self.parent.testrunid, pageid) self.parent.sitetimeoutcount.inc() else: logit('Problem downloading '+repr(url)+': Neither result nor downloadok == 0', 'egovmon', 'Sampler') else: logit('Evaluation of '+repr(url)+' failed.',\ 'egovmon', 'Sampler') # }}} # {{{ InserterThread class InserterThread(threading.Thread): def __init__(self, parent): threading.Thread.__init__(self) self.inqueue = parent.insertqueue self.parent = parent self.sleeping = False def stop(self): print "Emptying queue inserter" while not self.inqueue.empty(): try: self.inqueue.get(block=False) except Empty: pass print "Stopping inserter" self.inqueue.put((None,None,None)) print "Inserter stopped" #self.inqueue.put((None, None, None), False, 0.5) #self.inqueue.put((None, None, None), False, 0.5) def run(self): while 1: #if self.parent.shouldstop(): # return self.sleeping = True pageid = -1 #The following makes sure that get is not hanging on an empty queue: while pageid<0 and not self.parent.shouldstop(): try: pageid, url, sample = self.inqueue.get(timeout=5) except Empty: pageid = -1 self.sleeping = False if pageid == None or self.parent.shouldstop(): print 'Inserter returning. Pageid:',pageid return self.parent.scenariocount.inc() if "header" in sample and type(sample["header"]) == type({}): try: server = sample["header"]["server"] except: server = '' elif "header" in sample and type(sample["header"]) == type(""): server = '' for header in [i for i in sample["header"].split("\r\n") if i.strip() and ':' in i]: #print 'head, content = header.split(": ", 2)',header head, content = header.split(": ", 2) if head.lower() == "server": server = content break else: server = '' if self.parent.contenttype == 'HTML': score = self.parent.egovmondb.addPageHTMLResults(self.parent.siteid,\ self.parent.testrunid, pageid, sample["result"], sample["authtool"], server, sample["docstring"]) if self.parent.contenttype == 'PDF': score = self.parent.egovmondb.addPagePDFResults(self.parent.siteid,\ self.parent.testrunid, pageid, sample["result"]) if self.parent.contenttype == 'MobileOK': score = self.parent.egovmondb.addPageMobileOKResults(self.parent.siteid,\ self.parent.testrunid, pageid, sample) logit(str(self.parent.scenariocount.read()) + ' - Finished evaluating page '+repr(url)+' id '+str(pageid) + ' for site '+self.parent.site+' with score '+str(score),'egovmon','Sampler') # }}} # {{{ Sampler class Sampler: def __init__(self, site, testrunid,contenttype): self.egovmondb = eGovMonDB() if site.startswith('http'): self.site = ''.join(urlparse.urlparse(site)[1:]).rstrip('/') else: self.site = site self.testrunid = testrunid self.contenttype = contenttype if self.contenttype == 'HTML': self.minsamplecount = int(sc.htmlminsamplecount) if self.contenttype == 'PDF': self.minsamplecount = int(sc.pdfminsamplecount) if self.contenttype == 'MobileOK': self.minsamplecount = int(sc.mobileokminsamplecount) if self.egovmondb.getSiteFinished(self.site,self.testrunid,self.contenttype) or self.egovmondb.getSiteShouldBeFinished(self.site,self.testrunid,self.contenttype): print 'self.egovmondb.getSiteFinished(self.site,self.testrunid,self.contenttype)',self.egovmondb.getSiteFinished(self.site,self.testrunid,self.contenttype) print 'self.egovmondb.getSiteShouldBeFinished(self.site,self.testrunid,self.contenttype)',self.egovmondb.getSiteShouldBeFinished(self.site,self.testrunid,self.contenttype) logit('Cannot sample site '+self.site+' because results already exists on testrunid '+self.testrunid+' for content type'+self.contenttype+'.',somemodule='Sampler',stdout=True) sys.exit(1) self.siteid = self.egovmondb.getSiteID(self.site) self.downloadqueue = Queue(int(sc.samplerthreads) * 2) self.samplequeue = Queue(int(sc.samplerthreads) * 2) self.insertqueue = Queue() self.forcestop = MVar(False) self.scenariocount = MVar(0) self.sitetimeoutcount = MVar(0) self.wamtimeoutcount = MVar(0) if self.contenttype == 'HTML': wamservers = sc.wamservers if self.contenttype == 'PDF': wamservers = sc.pdfservers if self.contenttype == 'MobileOK': wamservers = sc.wamservers self.servers = MVar([ServerWrapper(urllib.unquote(i)) for i in wamservers.split(',')]) self.downloaders = [DownloaderThread(self) for i in xrange(int(sc.samplerthreads))] self.samplers = [SamplerThread(self) for i in xrange(int(sc.samplerthreads))] self.inserter = InserterThread(self) [i.start() for i in self.downloaders] [i.start() for i in self.samplers] self.inserter.start() def outOfScope(self, url, base): redirurl = urlparse.urljoin(base, url, False) return not self.egovmondb.performInclusionExclusionCheck(redirurl, self.site) def shouldstop(self): #print "self.scenariocount.read()",self.scenariocount.read() #print "int(sc.minsamplecount)",int(sc.minsamplecount) #print "self.forcestop.read()",self.forcestop.read() if self.scenariocount.read() >= self.minsamplecount or self.forcestop.read(): return True return False def stop(self): print 'Stopping' print 'self.forcestop.switch(True)' self.forcestop.switch(True) print '[i.stop() for i in self.downloaders]' [i.stop() for i in self.downloaders] print '[i.stop() for i in self.samplers]' [i.stop() for i in self.samplers] print 'self.inserter.stop()' self.inserter.stop() #exept full is not a good strategy because: #(1) it may be thrown by the first thread, preventing stop to be called for the remaining #(2) if a none is not added to the thread, it will not be known that it should stop. #try: # [i.stop() for i in self.downloaders] #except Full, e: # print 'Full queue' # pass #try: # [i.stop() for i in self.samplers] #except Full, e: # pass #try: # self.inserter.stop() #except Full, e: # pass def join(self): print "Joining." print "[i.join() for i in self.downloaders]" print "[i.inqueue for i in self.downloaders]",[i.inqueue.queue for i in self.downloaders] [i.join() for i in self.downloaders] print "i.join() for i in self.samplers]" [i.join() for i in self.samplers] print "self.insertes.join()" self.inserter.join() def is_it_silent(self): return len([i for i in self.downloaders if i.sleeping == False]) == 0 and\ len([i for i in self.samplers if i.sleeping == False]) == 0 and\ self.inserter.sleeping == True def sample(self): scenariocount,scenarios = self.egovmondb.getUnsampledPages(self.siteid,self.testrunid,self.contenttype) self.scenariocount.switch(scenariocount) logit('Starting sampling for site '+self.site+' from a total of '+str(len(scenarios))+' page scenarios. '+str(scenariocount) + ' pages already in the DB','egovmon','sampler') random.shuffle(scenarios) for tuple in scenarios: if self.shouldstop(): print "Breaking the main put-loop because of self.shouldstop()" break print "Adding "+repr(tuple[1])+" to queue." try: self.downloadqueue.put(tuple,timeout=120) except Full: print "Timeout on downloadqueue" print "Added." if self.shouldstop(): self.stop() else: # We ran out of scenarios. Give it 10 minutes to empty its queues. logit('Finished sampling and waiting for queues to finish for site '+self.site,'egovmon','Sampler') for _ in xrange(120): if self.shouldstop() or self.is_it_silent(): time.sleep(0.1) if self.shouldstop() or self.is_it_silent(): break time.sleep(5) print 'Watinging for queues to become empty...' self.stop() self.join() print "Finished joining. Finalising site" if self.contenttype == 'HTML': self.egovmondb.finaliseSiteHTML(self.site,self.testrunid) if self.contenttype == 'PDF': self.egovmondb.finaliseSitePDF(self.site,self.testrunid) if self.contenttype == 'MobileOK': self.egovmondb.finaliseSiteMobileOK(self.site,self.testrunid) print "All done." # }}} def main(): try: site = sys.argv[1] testrunid = sys.argv[2] contenttype = sys.argv[3] except IndexError: sys.stderr.writelines((\ 'Error in input\n',\ '\n',\ 'Usage:\n',\ str(sys.argv[0]) + ' site testrunid contenttype\n',\ 'Example:\n', str(sys.argv[0]) + ' www.fylkesmannen.no 1234 HTML\n' )) raise SystemExit, 1 print 'Starting sampling of site',site,'for',contenttype,'with testrunid:',testrunid,'.' s = Sampler(site, testrunid, contenttype) s.sample() if __name__ == '__main__': main()