#! /usr/bin/env python # -*- coding: UTF-8 -*- """Module for sampling when a crawl of the web site has been concluded $Id: test.py 803 2008-12-03 12:08:26Z goodwin $ """ # Copyright 2008 eGovMon # This program is distributed under the terms of the GNU General # Public License. # # This file is part of the eGovernment Monitoring # (eGovMon) # # eGovMon is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # eGovMon is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with eGovMon; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, # MA 02110-1301 USA __author__ = "$Author: goodwin $" __version__ = "$Revision: 803 $" __updated__ = "$LastChangedDate$" import sys from egovmontime import * import time import urllib import urlparse import psycopg2 import random import urllib2 import os import sys import adaptivesampling from adaptivesampling import * from RDFgenerator import * from sc import * import Queue import SOAPpy from urlrep import * from socket import gethostname; import threading from samplingerror import * import RDFgenerator from RDFgenerator import * import RDFreaderwriter from RDFreaderwriter import * import sc from logit import * import socket socket.setdefaulttimeout(int(SystemConfiguration().samplerevaluationtimeout)) import base64 from threading import Semaphore class ServerWrapper: """ Simple Markov Chain to force the WAMs server to stick to one server unless it is unavailable""" #Ref: http://www.nd.edu/~lemmon/publications/ecrts05.pdf #The above automatically ignores WAM servers that are overloaded. def __init__(self,server,mainserver=None,protocol='soappy'): if protocol=='soappy': self.server = SOAPpy.SOAPProxy(server) #server else: #Pyro self.server = Pyro.core.getProxyForURI(server) self.numstates = 20 if server==mainserver: self.state = 1 else: self.state = self.numstates def increase(self): print 'Increasing:',self.server.proxy self.state +=1 if self.state > self.numstates: self.state = self.numstates def reduce(self): print 'Reducing:',self.server.proxy self.state -= 2 if self.state < 0: self.state = 0 def markovsort(x,y): if x.state==y.state: return random.randint(-1,1) return cmp(y.state,x.state) typeofsampling = 'html' htmlsampling = True pdfsampling = False if sys.argv[-1]=='pdf': typeofsampling='pdf' htmlsampling = False pdfsampling = True if pdfsampling: #should run pdf-sampling import Pyro.core allservers = [ServerWrapper('PYRONAME://pdfwam',protocol='pyr'),] else: allservers = [ServerWrapper(urllib.unquote(i)) for i in sc.wamservers.split(',') if urlparse.urlparse(sc.eaccchecker)[1].split(':')[0] not in i] allservers.sort(markovsort) print urlparse.urlparse(sc.eaccchecker)[1] print [i.server.proxy for i in allservers] def scenarioOK(scenario): """Investigates if the current scenario is OK scenario -- Current scenario as a list of RDF Returns False if any of the EARL is missing, or has earl:cannotTell. Returns True otherwise. """ for scen in scenario: if not scen: sys.stderr.write('Scenario not present: '+str(scen)+'\n') return False elif 'earl:cannotTell' in scen: sys.stderr.write('Scenario cannotTell: '+str(scen)+'\n') return False elif 'http://www.w3.org/WAI/ER/EARL/nmg-strawman#cannotTell' in scen: sys.stderr.write('Scenario cannotTell: '+str(scen)+'\n') return False return True def pageRDF(page,urlrep): """Magical function to get the RDF a page Keyword arguments: page -- Actual page """ sid,url,html,header,doctype,authoringtool,webcacheurl,language,fullurl = (page) urlrep.getNextPageSurvey() try: if not doctype: doctype = '' if not authoringtool: authoringtool = '' rdf = getpagesurveyrdf(doctype, authoringtool, header,webcacheurl,language=language,fullurl=fullurl) return rdf[1],rdf[0] except EmptyHeaderInformationError: #No header information available. Page is invalid and is returned as None. The scenario will be thrown. return urlrep.getCurrentPageSurvey(),None def getDocString(html): '''Extract the doc string from the html content Keyword arguments: html -- html content of web page Returns the docstring if present, else an empty string ''' html = html.strip().lower() if re.match(r'(\<\!doctype).*', html) is None: return '' else: tag = html[0:html.find('>')+1] return re.sub('\<|\>|(!doctype)|\n', '', tag).strip() def makeRealUnicode(data): if type(data) == type(u''): return data.encode('utf-8') try: return unicode(data,sys.getdefaultencoding()).encode('utf-8') except (UnicodeDecodeError,UnicodeDecodeError): pass try: return unicode(data,sys.getfilesystemencoding()).encode('utf-8') except (UnicodeDecoreError,UnicodeDecodeError): pass try: return unicode(data,'utf-8').encode('utf-8') except (UnicodeDecodeError,UnicodeDecodeError): pass try: return unicode(data,'iso-8859-1').encode('utf-8') except (UnicodeDecodeError,UnicodeDecodeError): pass return def getHeaderOfURL(url): """Retrieved header for a given URL as a dictionary. Empty dictionary of no header is available. Keyword arguments: url -- URL request as a socket object Returns header as a python dictionary""" return dict(url.headers) def getAuthoringtool(html): '''Extract the authoring tool string from the html content Keyword arguments: html -- html content of web page Returns the generator string if present, else an empty string ''' html = html.strip().lower() if html.find('',start)+1 tag = html[start:end] gen = re.sub('\<|\>|meta name="generator"|content=|\"','',tag) return gen.strip() def getCacheURL(cachefilename, sc): '''Construct the cache url from cache filename ''' webstring=sc.webcacheurl filesystemstring=sc.webcachedirectory return cachefilename.replace(filesystemstring, webstring) def getHeaderContentLength(header): """Gets the content length from the HTTP header, if any Keyword arguments: header -- HTTP header as a dictionary Returns conent length as integer. None if content length is present. """ return header.get('HeaderContentLength',None) def getBase64(html,charset=None): if type(html) == type(u''): return base64.encodestring(html.encode('utf-8')) if charset: try: return base64.encodestring(unicode(html,charset).encode('utf-8')) except (UnicodeDecodeError,UnicodeDecodeError): pass try: return base64.encodestring(unicode(html,sys.getdefaultencoding()).encode('utf-8')) except (UnicodeDecodeError,UnicodeDecodeError): pass try: return base64.encodestring(unicode(html,sys.getfilesystemencoding()).encode('utf-8')) except (UnicodeDecoreError,UnicodeDecodeError): pass try: return base64.encodestring(unicode(html,'utf-8').encode('utf-8')) except (UnicodeDecodeError,UnicodeDecodeError): pass try: return base64.encodestring(unicode(html,'iso-8859-1').encode('utf-8')) except (UnicodeDecodeError,UnicodeDecodeError): pass return def getCalculatedContentLength(html,charset=None): """Calculates the content length in bytes of an HTML file Keyword arguments: html -- html content of a web page charset -- [Optional] charset of the html Returns calculated content length as an integer """ #Even though this uses HTML as input, this function could also be used for e.g. CSS when CSS sampling is implemented. #Example based on http://www.velocityreviews.com/forums/t362115-how-to-get-size-of-unicode-stringstring-in-bytes-.html #Some expert advice: #The two ways to detect a string's encoding are: #(1) know the encoding ahead of time #(2) guess correctly if type(html) == type(u''): return len(html.encode('utf-8')) if charset: try: return len(unicode(html,charset).encode('utf-8')) except UnicodeDecodeError: pass try: return len(unicode(html,sys.getdefaultencoding()).encode('utf-8')) except UnicodeDecodeError: pass try: return len(unicode(html,sys.getfilesystemencoding()).encode('utf-8')) except UnicodeDecoreError: pass try: return len(unicode(html,'utf-8').encode('utf-8')) except UnicodeDecodeError: pass try: return len(unicode(html,'iso-8859-1').encode('utf-8')) except UnicodeDecodeError: pass class DownloaderThread(threading.Thread): def __init__(self,parent): threading.Thread.__init__(self) self.queue = Queue.Queue() self.parent = parent self.opener = urllib2.build_opener() self.shouldstop = False def download(self,url,thisqueue): self.queue.put((url,thisqueue)) def stop(self): self.queue.put(None) def run(self): while 1: element=self.queue.get() if element==None: return elif self.shouldstop: pass else: urls,thisqueue = element for url in urls: try: if url in self.parent.downloadcache.keys(): thisqueue.put(self.parent.downloadcache[url]) else: if url.startswith('https'): d = self.opener.open(url) content = d.read() content = unicode(content,'iso-8859-1') header = getHeaderOfURL(d) else: content = None header = None self.parent.downloadcache[url] = (url,content,header) thisqueue.put((url,content,header)) except Exception, e: self.parent.errors += 1 res = None sys.stderr.write('Problem downloading url:'+str(url) + '\n') import traceback traceback.print_exc() thisqueue.put(None) self.parent.parent.IncrementUnavailable() class SamplerThread(threading.Thread): def __init__(self,wamservers,rw,urlrep,sc,parent,testrunid): threading.Thread.__init__(self) self.downloaderthread = DownloaderThread(parent) self.downloaderthread.start() self.parent = parent self.testrunid = testrunid self.rw = rw self.sc = sc self.urlrep = urlrep #Setting the queue to a maximum 10 Scenarios at a time to minimize the over sampling #Main thread will wait if queue is at maximum level - avoiding puhsing scenarios more scenarios than are needed. self.queue = Queue.Queue(2) self.minscenario = int(self.sc.minsamplecount) self.shouldstop = False def check(self,scenarioid,urls): thisqueue = Queue.Queue() self.downloaderthread.download(urls,thisqueue) try: self.queue.put((scenarioid,urls,thisqueue),timeout=100) except Queue.Full: return False return True def stop(self): self.queue.put(None) self.downloaderthread.stop() def run(self): while 1: element=self.queue.get() if element==None: return elif self.shouldstop: pass elif self.parent.urlrep.getsamplecount() > self.minscenario: pass else: scenarioid,urls,thisqueue = element contents = [] for url in urls: element = thisqueue.get() if element: url,content,header = element contents.append((url,content,header)) wamresults = [] if [i for i in contents if not i[0].endswith('css')]: for url,content,header in [i for i in contents if not i[0].endswith('css')]: #Force CSS not to be part of the Sampler, even if it would pass through the crawler... if url in self.parent.wamcache.keys(): wamresults.append(self.parent.wamcache[url]) else: ctype = 'text/html'#header.get('content-type','').lower() charset = 'utf-8' rules = 'uwem_10' if ';' in ctype: ctype = ctype.split(';')[0] if ctype not in ('text/html', 'application/xhtml+xml', 'application/xml', 'text/xml'): print 'Ignoring URL:',url,ctype,'not value' retry = False res = None else: retry = True while retry: allservers.sort(markovsort) try: retry = False try: thisserver = allservers[0] wamserver = thisserver.server #random.sample(self.wamservers,1)[0] except: print 'Error getting WAM server' try: print 'URL to evaluate:',url,len(allservers) if pdfsampling: #PDF-wam does not support downloading of PDFs. Must be done by the sampler. content = None#urllib2.urlopen(url).read() ctype = 'application/pdf' header = {} res = wamserver.check(url,content,ctype,charset,'all',header) print 'Finished evaluateding url:',url except MemoryError: res = None self.parent.wamcache = {} self.parent.downloadcache = {} except (SOAPpy.HTTPError,Exception), e: thisserver.reduce() if 'Connection refused' in str(e): retry = True time.sleep(0.5) try: sys.stderr.write('Connection refused when evaluation url:'+str(url) +' towards ' +str(wamserver.proxy) + '\n') except: pass res = None try: sys.stderr.write('Problem evaluating url:'+str(url) +' towards ' +str(wamserver.proxy) + '\n') except: pass import traceback traceback.print_exc() print 'Hopefully finished evaluateding url:',url if res and (type(res)==type({}) or eval(res)): if not type(res)==type({}): res = eval(res) earl = res['earl'] if url.startswith('https'): docstring = getDocString(content) authtool = getAuthoringtool(content) else: header = dict([i.split(':',1) for i in res.get('header','').split('\n') if i]) #print header print wamserver.proxy if htmlsampling: docstring = res['docstring'] authtool = res['authtool'] else: docstring = '' authtool = '' wamresults.append((url,earl,header,content,docstring,authtool)) try: thisserver.increase() except: pass else: self.parent.IncrementUnavailable() thisserver.reduce() print 'Number of WAM results:',len(wamresults) currentscenario = [] pagesurveys = {} results = [] currentscenario = [] for url,result,header,content,docstring,authtool in wamresults: thisscen = (None,None,None,url,0,0,None) #Cache URL is no longer used, since we no longer store the pages. However, for the RDF graph to be valid and readable for the ETL, it needs to be there. #cacheurl = '/deprecated/we/no/longer/store/pages' cacheurl = '/'+'/'.join(urlparse.urlparse(url)[1:]) + '/' + str(self.testrunid) page = (scenarioid,url,content,header,docstring,authtool,getCacheURL(cacheurl,self.sc),None,url) pagesurveyid,pRDF = pageRDF(page,self.urlrep) pagesurveys[pagesurveyid] = (pRDF,result,url) results.append(result) currentscenario.append(pRDF) try: currentscenario.append(getearl(url,result,pagesurveyid,preparseEARL=True)) except: sys.stderr.write('Error getting EARL from '+str(url) + ' investigate why\n') currentscenario.append(None) if scenarioOK(currentscenario) and results: thisscen = (None,None,None,url,0,0,None) barrierindicator = getEARLCWAM2('\n'.join(results)) currentscenario.append(RDFgenerator.getscenariordf([thisscen],pagesurveyid=pagesurveys.keys()[0],keyusescenario=False,scenarioid=str(scenarioid),barrierindicator=barrierindicator)) d = dynamic(0.05,self.urlrep.getCurrentSiteSurvey(),results,self.urlrep,self.minscenario) self.urlrep.addSample(self.urlrep.domain,1) self.urlrep.numpages +=1 for rdf in currentscenario: rdf = rdf.strip() if rdf.startswith('0 def __SamplingErrorMargin(self,errmarg): """Funtion for sampling towards an error margin""" print "WARNING: Sampling accroding towards an error margin is deprecated. You should rather choose to sample towards a fixed interval." return False def sampling(self,*list,**dict): """Actual sampling function. This function is never intended to run, but should be replaced by another sampling function in runtime """ raise(NoSamplingChosenError()) def __del__(self): """Destructore method making sure all threads are closed""" self.rw.stop() def __getAllScenarios(self): """Function that retrieves all scenarios available from a site Returns a complete dictionary of all scenarios availabel from this set. """ #Note that this function does not use the urlrep-module. The reason for this is that we can only select a scenario once, which is not supported in the URL rep. #if self.site.startswith('http'): # self.site = urlparse.urlparse(site)[1] site = self.site testrunid = self.testrunid #Note that a subselect in the following query is required as long as it is essential that a scenairo is complete (all URLs part of the scenario is part retireved if the scenarioid is retrieved). if htmlsampling: self.cur.execute("select distinct bar.scenarioid,url from (select distinct foo.scenarioid from (select distinct scenarioid,random() from page where testrunid=%(testrunid)s and domain=%(site)s and not typeofpage=1 order by random()) as foo,page where testrunid=%(testrunid)s and domain=%(site)s and foo.scenarioid=page.scenarioid and not typeofpage=1 limit 600) as bar,page where testrunid=%(testrunid)s and domain=%(site)s and bar.scenarioid=page.scenarioid;",locals()) else: self.cur.execute("select distinct bar.scenarioid,url from (select distinct foo.scenarioid from (select distinct scenarioid,random() from page where testrunid=%(testrunid)s and domain=%(site)s and typeofpage=1 order by random()) as foo,page where testrunid=%(testrunid)s and domain=%(site)s and foo.scenarioid=page.scenarioid and typeofpage=1 limit 600) as bar,page where testrunid=%(testrunid)s and domain=%(site)s and bar.scenarioid=page.scenarioid;",locals()) data = self.cur.fetchall() if not data: return {} retdict = {} for d in data: id = int(d[0]) url = d[1] retdict[id] = retdict.get(id,[]) + [urllib.unquote(url)] self.cur.execute("select distinct domain from site where domain=%(site)s and smallsite=true",locals()) tempdata = self.cur.fetchall() if tempdata: self.issmall=True else: self.issmall=False if len(retdict.keys())599: #self.logger.info('ERROR: Maximum number of errors reached') #self.logger.info('No longer pushing scenarios. End criteria met. Waiting for threads to finish.') testrun = self.urlrep.getCurrentTestRun() testrunnr = self.urlrep.getCurrentTestRun(retint=True) surveynr = self.urlrep.getCurrentSiteSurvey(retint=True) rdf.append(gettestrunrdf(testrunnr)) rdf.append(getendtestrunrdf(testrunnr)) #Note that storage location is needed for the RDF graph to be valid and thus for the ETL to read it. #It is however fake, since we no longer store the pages. loglocation = '/we/no/longer/store/pages/this/should/be/removed/from/here/' import sys print 'Stopping.' sys.stdout.flush() self.samplerdispatcher.stop() print 'Joining.' sys.stdout.flush() self.samplerdispatcher.join() print 'Comitting' sys.stdout.flush() if sitedictionary: if not self.sitesurvey: self.sitesurvey = sitedictionary.keys()[0] try: average,standarddev,errmarg = getAvgCWAM(self.sitesurvey,needstddev=False) except TypeError: average,standarddev,errmarg = (0,0,0) average,standarddev,errmarg = getAvgCWAM(self.sitesurvey) rdf.append(getsitesurveyrdf('http://'+self.site,testrunnr, self.sitesurvey,loglocation,average,standarddev,errmarg,self.urlrep.getsamplecount(),surveynr,testrun,self.numberunavailable)) self.__writeRDF(rdf) self.rw.commit() if self.urlrep.getsamplecount() < int(self.sc.minpagessampled) and not self.issmall:# or self.samplerdispatcher.errors>150: #Too few scenarios errortext = 'Too few scenarios reached for site '+self.site+'. Result:'+str(average)+' Error margin: '+ str(errmarg) +' and number of samples: '+str(self.urlrep.getsamplecount())+' to database '+str(self.database) if self.toofewsamplesinurlrep: errortext += '. Too few samples in the URL repository, not even attempt to sample.' errortext += '\n' logit(errortext,'egovmon','sampler') logit(errortext,'failedsites','sampler') if not self.scenarios: logit('All scenarios for site '+self.site+' has been retrieved. Result:'+str(average)+' Error margin: '+ str(errmarg) +' and number of samples: '+str(self.urlrep.getsamplecount())+' to database '+str(self.database),'egovmon','sampler') else: logit('Stopping criteria reached for site '+self.site+'. Result:'+str(average)+' Error margin: '+ str(errmarg) +' and number of samples: '+str(self.urlrep.getsamplecount())+' to database '+str(self.database),'egovmon','sampler') def __writeRDF(self,rdf): """Internal method for pushing RDF to the RDF-writer Keyword arguments: rdf -- List of RDF as string / unicode returns None """ for r in rdf: r = r.replace('','') self.rw.writeRDF(r,async=True) def getRandomScenario(self): """Retrieves a random scenario from a site. A scenario is a set of URLs connected to one site. In most cases a scenario will consist of one (x)html and zero or more CSS files. However, in the case of frames, a frame scenario could include several (x)html pages""" if not self.scenarios: return {} id = random.sample(self.scenarios.keys(),1)[0] urls = self.scenarios.pop(id) if urls: return {id:urls} else: return {} def stop(self): self.rw.stop() import string def onlyascii(u): if u in string.ascii_letters: return u else: return '_' def create_dynamicdb(dbname): """Creating dynamic databases Keyword arguments: dbname -- Name of database returns None """ # Connect to MySQL as root and create new RDF database connection = MySQLdb.connect(host='localhost', user='root') c=connection.cursor() # Creating dynamic DB with name',dbnamea dbexists = False try: c.execute("CREATE DATABASE "+dbname) except ProgrammingError, e: # Error: RDF Database already exists. print e connection.close() dbexists = True #raise crawlererror.RDFDBAlreadyExistsError(dbname) #thissc = sc.SystemConfiguration() thissc = sc etlserver=urlparse.urlsplit(thissc.etlserver)[1].split(':')[0] # Granting privileges to localhost' connection = MySQLdb.connect(host='localhost', user='root') c=connection.cursor() #TODO: Find a solution which does not include hardcoding of the domains for server in ['localhost',etlserver,'eiao1.eiao.net','eiao2.eiao.net','eiao3.hia.no','eiao4.hia.no','']: c.execute("""GRANT ALL PRIVILEGES ON %s.* TO '%s'@%s IDENTIFIED BY '%s';""" %(dbname, thissc.dbusername, server, thissc.dbpassword)) # Granting privileges to etlserver #etlserver=urlparse.urlsplit(sc.etlserver)[1].split(':')[0] #c.execute("""GRANT ALL PRIVILEGES ON %s.* # TO '%s'@%s IDENTIFIED BY '%s'""" % # (dbname, sc.dbusername, etlserver, sc.dbpassword)) # Embedded tables.sql here, to avoid another external file reference. c.execute("USE "+dbname) if dbexists: c.execute("""DROP TABLE RAWRDF;""") c.execute(""" CREATE TABLE RAWRDF( rdf longblob ) ENGINE=MyISAM;""") def main(): try: site = sys.argv[1] db = sys.argv[2] testrunid = sys.argv[3] htmlofpdf = sys.argv[4] except IndexError: sys.stderr.writelines((\ 'Error in input\n',\ '\n',\ 'Usage:\n',\ str(sys.argv[0]) + ' site db testrunid pdforhtml\n',\ 'Example:\n',\ str(sys.argv[0]) + ' www.fylkesmannen.no rdf_1529_1190626383_43_6344313 1234 html\n' )) sys.exit(0) print 'Starting sampling of site',site,'with testrunid:',testrunid create_dynamicdb(db) #Checking if the site is already in the DW. If it is, do not sample from it. con = psycopg2.connect(user=sc.dwuser, database=sc.dwdatabase, password=sc.dwpassword,host=sc.dwhost) #con = psycopg2.connect(user=sc.SystemConfiguration().dwuser, database=sc.SystemConfiguration().dwdatabase, password=sc.SystemConfiguration().dwpassword,host=sc.SystemConfiguration().dwhost) cur = con.cursor() thissite = site if thissite.startswith('http://'): thissite = thissite[7:] elif thissite.startswith('https://'): thissite = thissite[8:] cur.execute("select True from datastaging.site natural join datastaging.resource natural join datastaging.resourceversion where testrunid=%(testrunid)s and site=%(thissite)s;",locals()) try: shouldvisit = not cur.fetchall()[0][0] except IndexError: shouldvisit = True if 'forcesample' in sys.argv: shouldvisit = True if shouldvisit: sa = SamplingAlgorithm(site,db,testrunid) pre = time.time() try: sa.performAdaptiveSampling() except Exception,e: sa.stop() raise e if site.startswith('http'): site = urlparse.urlsplit(site)[1] sitefilename = ''.join([onlyascii(i) for i in str(site)]) #Note the following is a hack to be able to perform performance evaluations. print 'Sampling finished' sa.stop() if (not sa.numscen < int(sa.sc.minpagessampled)) or sa.issmall: etlserver=sc.etlserver etlserver = SOAPpy.SOAPProxy(etlserver) host = gethostname() etlserver.loadETL(db,testrunid, False,host) print 'Loading to the ETL',sa.issmall else: print 'Not loading to the ETL',sa.issmall else: print 'Not sampling. Data already in the DW.' if __name__ == '__main__': #import psyco #psyco.full() import os os.system('renice -1 ' + str(os.getpid())) main() #import hotshot, hotshot.stats#, test.pystone #prof = hotshot.Profile("stones.prof") #prof.runcall(main) #prof.close()