#!/usr/bin/python # Script which can process CentOS errata announcements and convert # them to errata in spacewalk. Based on rhn-tool.py by Lars Jonsson # Copyright (C) 2010 Biomathematics and Statistics Scotland # # Author: Lars Jonsson (ljonsson@redhat.com) # David Nutter (davidn@bioss.ac.uk) # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA # # CHANGELOG: # # 0.2 2010-05-28 # - Search spacewalk server for packages (thanks to Raal Goff) # - Multiple architecture support # - User can specify configuration files # - Scraping of mail-archive.com and RHN for errata details (thanks to Raal Goff) # - Improved argument parsing logic # 0.1 2009-05-30 # - First release of the tool from optparse import OptionParser from datetime import datetime import sys import re import os import xmlrpclib import libxml2 import email import traceback import rpm import ConfigParser import urllib import getpass #Config constants. CONFIG_FILE="centos-errata.cfg" VALID_ARCH=set(["i386","x86_64","ia64","ppc", "alpha", "sparc", "s390", "s390(x)"]) MAILARCHIVE_BASE="http://www.mail-archive.com/centos-announce@centos.org/" #Common regular expressions PACKAGE_LIST="(?P\S+)\s+(?P[\.\w-]+.rpm)" PACKAGE_NAMEONLY="(?P.*?)-\d+.([\.\w-]+.rpm)" SECURITY_INFO="(?P\w+) CentOS\s+(?P\d)\s+(?P\w+)\s+(?P.*)$" BUG_INFO="CentOS\s+(?P\d)\s+(?P\w+)\s+(?P.*)$" ENHANCE_INFO="CentOS\s+(?P\d)\s+(?P\w+)\s+(?P.*)$" #Things to match in centos-announce messages DIGEST_BEGIN="----------------------------------------------------------------------\n\n" DIGEST_SEPARATOR="------------------------------\n\n" ARCHIVE_SEPARATOR="From .*[A-Za-z]{3,3} [A-Za-z]{3,3} [ 0-9]{2,2} \d{2,2}:\d{2,2}:\d{2,2} \d{4,4}\n" ERRATA_SUBJECT="\[CentOS-announce\] (?P\w{4,4})-(?P\d{4,4})(:|-)(?P\d{4,4})\s+(?P.*)$" #Things to match in pages downloaded from mail-archive.com MAILARCHIVE_SUBJECT = "\d+)\" href=\"(?P[\w.]+)\">(?P[^<]+)" #Things to match in pages downloaded from RHN RHN_ERRATA_DETAILS="

Details

(\s+)
(?P
[\w\W\s]+)
(\s+)
(\s+)

Solution

" RHN_ERRATA_SOLUTION="

Solution

(\s+)
(?P[\w\W\s]+)
(\s+)
(\s+)

Updated packages

" #Tags for the different advisory types SECURITY_ERRATA="CESA" BUG_ERRATA="CEBA" ENHANCE_ERRATA="CEEA" #Special names for errata types in spacewalk SECURITY='Security Advisory' ENHANCEMENT='Product Enhancement Advisory' BUGFIX='Bug Fix Advisory' #Cache of already processed errata errata_cache = {} active_arches = [] # Library used to represent a system ID in spacewalk class RHNSystem: def __init__(self,sysid,name,lastCheckin): self.systemid=sysid self.name=name self.lastCheckin=lastCheckin # Module to represent package info as returned by findByNVREA API function # TODO: more substantial class required for package details class RHNPackage: def __init__(self,name,version,release,epoch,archLabel): self.id = None self.name = name self.version = version self.release = release self.epoch = epoch self.archLabel = archLabel self.path = None self.provider = None self.lastModified = datetime.today() #TODO: add epoch to this, if present, rename method def getNVRA(self): result = "%s-%s-%s.%s" % (self.name,self.version,self.release,self.archLabel) return result # Library which represents an Errata in spacewalk. # Used for creating and retrieving errata information # TODO: Channel and Bugs should probably be types in their own right class RHNErrata: def __init__(self): self.synopsis = None self.advisoryName = None self.advisoryRelease = 1 self.advisoryType = SECURITY self.product = None self.topic = None self.description = None self.references = "" self.notes = "" self.solution = None self.publish = False self.channelLabel = [] self.keywords=[] self.bugs=[] self.packages=[] self.issueDate = datetime.now() self.modifiedDate = datetime.now() self.updateDate = datetime.now() def getPackageIds(self): result=map((lambda pkg: pkg.id),self.packages) return result def getInfoDict(self): result={} result['synopsis']=self.synopsis result['advisory_name']=self.advisoryName result['advisory_release']=self.advisoryRelease result['advisory_type']=self.advisoryType result['product']=self.product result['topic']=self.topic result['description']=self.description result['references']=self.references result['notes']=self.notes result['solution']=self.solution return result def readyToCreate(self): info = self.getInfoDict() for required_attr in ['synopsis','advisory_name','advisory_release','advisory_type','product','topic','description','solution']: if info[required_attr] is None: return False return True def addPublishChannel(self,new_channel_label): try: self.channelLabel.index(new_channel_label) return False except: self.channelLabel.append(new_channel_label) return True def printOut(self): print "%-20s = %s" % ("Name:",self.advisoryName) print "%-20s = %s" % ("Release:",self.advisoryRelease) print "%-20s = %s" % ("Product:",self.product) print "%-20s = %s" % ("Synopsis:",self.synopsis) print "%-20s = %s" % ("Topic:",self.topic) print "%-20s = %s" % ("Description:",self.description) print "%-20s = %s" % ("Solution:",self.solution) for channel in self.channelLabel: print "%-20s = %s" % ("Target Channel:",channel) for pkg in self.packages: print " Package: %s" % pkg.getNVRA() # Library used to communicate with Spacewalk class RHNSession: def __init__(self, servername, user, passwd): self.rhnServerName = servername self.login = user self.password = passwd self.rhnUrl = 'https://'+self.rhnServerName+'/rpc/api' self.server = xmlrpclib.Server(self.rhnUrl) self.rhnSessionKey=self.rhnLogin(self.login,self.password) def rhnLogin(self, login, password): try: rhnSessionKey=self.server.auth.login(login,password) except xmlrpclib.Fault, f: if f.faultCode==-20: print "Session expired" self.rhnLogin(login,password) else: print "Failed to login",f raise return rhnSessionKey def getSystemByName(self,profileName): out=[] try: out=self.server.system.getId(self.rhnSessionKey,profileName) except xmlrpclib.Fault, f: if f.faultCode==-20: self.rhnLogin(self.login,self.password) return self.getSystemByName(profileName) else: raise systemObj=None if (len(out) > 0): systemObj = RHNSystem(out[0]['id'],out[0]['name'],out[0]['last_checkin']) return systemObj def getSystemByID(self,systemid): out=[] try: out = self.server.system.getName(self.rhnSessionKey,systemid) except xmlrpclib.Fault, f: if f.faultCode==-20: self.rhnLogin(self.login,self.password) return self.getSystemByID(systemid) else: raise systemObj=None if (len(out) > 0): systemObj = RHNSystem(out[0]['id'],out[0]['name'],out[0]['last_checkin']) return systemObj def getThisMachine(self): out=[] try: p = libxml2.parseDoc(file("/etc/sysconfig/rhn/systemid").read()) systemid = p.xpathEval('string(//member[* = "system_id"]/value/string)').split('-')[1] except IOError: print "systemid file not found." raise systemObj = self.getSystemByID(systemid) return systemObj #TODO: this should probably return an object rather than a dictionary def getSystemDetails(self,systemObj): out={} try: out=self.server.system.getDetails(self.rhnSessionKey,int(systemObj.systemid)) except xmlrpclib.Fault, f: if f.faultCode==-20: self.rhnLogin(self.login,self.password) return self.getSystemDetails(systemObj) else: raise return out def listGroups(self,systemObj): out=[] try: out=self.server.system.listGroups(self.rhnSessionKey,int(systemObj.systemid)) except xmlrpclib.Fault, f: if f.faultCode==-20: self.rhnLogin(self.login,self.password) return self.listGroups(systemObj) else: raise return out def listUserSystems(self): out=[] try: out=self.server.system.listUserSystems(self.rhnSessionKey) except xmlrpclib.Fault, f: if f.faultCode==-20: self.rhnLogin(self.login,self.password) return self.listUserSystems() else: raise if (len(out) > 0): out2=[] for sys_info in out: sys_obj=RHNSystem(sys_info['id'],sys_info['name'],sys_info['last_checkin']) out2.append(sys_obj) out=out2 return out def listActivationKeys(self): out=[] try: out=self.server.activationkey.listActivationKeys(self.rhnSessionKey) except xmlrpclib.Fault, f: if f.faultCode==-20: self.rhnLogin(self.login,self.password) return self.listActivationKeys() else: raise return out def deleteSystems(self,systemObj): out=[] try: out=self.server.system.deleteSystems(self.rhnSessionKey,systemObj.systemid) except xmlrpclib.Fault, f: if f.faultCode==-20: self.rhnLogin(self.login,self.password) return self.deleteSystems(systemObj) else: raise return out def setGroupMembership(self,systemObj,groupid,join): out=[] try: out = self.server.system.setGroupMembership(self.rhnSessionKey,int(systemObj.systemid),groupid,join) except xmlrpclib.Fault, f: if f.faultCode==-20: self.rhnLogin(self.login,self.password) return self.setGroupMembership(systemObj,groupid,join) else: raise return out def addNote(self,systemObj,label,msg): out=[] try: out = self.server.system.addNote(self.rhnSessionKey,int(systemObj.systemid),label,msg) except xmlrpclib.Fault, f: if f.faultCode==-20: self.rhnLogin(self.login,self.password) return self.addNote(systemObj,label,msg) else: raise return out def setCustomValues(self,systemObj,customInfoDict): out=[] if not customInfoDict is None: try: out = self.server.system.setCustomValues(self.rhnSessionKey,int(systemObj.systemid),customInfoDict) except xmlrpclib.Fault, f: if f.faultCode==-20: self.rhnLogin(self.login,self.password) return self.setCustomValues(systemObj,customInfoDict) else: raise return out def setCustomValue(self,systemObj,label,value): customInfoArgs={label:value[0]} return self.setCustomValues(systemObj,customInfoArgs) def setSystemDetails(self,systemObj,detailsDict): out=0 if not detailsDict is None: try: out = self.server.system.setDetails(self.rhnSessionKey,int(systemObj.systemid),detailsDict) except xmlrpclib.Fault, f: if f.faultCode==-20: self.rhnLogin(self.login,self.password) return self.setSystemDetails(systemObj,detailsDict) else: raise return out def setNewProfileName(self,systemObj,name): out=[] try: systemObj.name=name out = self.server.system.setProfileName(self.rhnSessionKey,int(systemObj.systemid),name) except xmlrpclib.Fault, f: if f.faultCode==-20: self.rhnLogin(self.login,self.password) return self.setProfileName(systemObj,name) else: raise return out def setGroup(self, systemObj, groupname, join=1): for c in self.listGroups(): if c['system_group_name'] == groupname: sgid=c['sgid'] if int(c['subscribed']) == 1: join = 0 if join == 1: self.setGroupMembership(systemObj, int(sgid),join) print "System %s has joined %s " % systemObj.name, groupname else: self.setGroupMembership(systemObj, int(sgid),join) print "System %s has left %s" % systemObj.name, groupname def getCustomValues(self,systemObj): out={} try: out = self.server.system.getCustomValues(self.rhnSessionKey,int(systemObj.systemid)) except xmlrpclib.Fault, f: if f.faultCode==-20: self.rhnLogin(self.login,self.password) return self.getCustomKeyLabels() else: raise return out def addCustomKey(self,keyLabel,keyDescription): out=0 try: out = self.server.system.custominfo.createKey(self.rhnSessionKey,keyLabel,keyDescription) except xmlrpclib.Fault, f: if f.faultCode==-20: self.rhnLogin(self.login,self.password) return self.addCustomKey(keyLabel,keyDescription) else: raise return out def getCustomKeyLabels(self): out=set() result=[] try: result = self.server.system.custominfo.listAllKeys(self.rhnSessionKey) except xmlrpclib.Fault, f: if f.faultCode==-20: self.rhnLogin(self.login,self.password) return self.getCustomKeyLabels() else: raise for keyInfo in result: out.add(keyInfo['label']) return out def getErrataDetails(self,advisoryName): result=None try: result = self.server.errata.getDetails(self.rhnSessionKey,advisoryName) except xmlrpclib.Fault, f: if f.faultCode==-20: self.rhnLogin(self.login,self.password) return self.getErrataDetails(advisoryName) elif f.faultCode==-208: #This seems to be the fault returned when the errata does not exist result = None else: raise if not result is None: errata = RHNErrata() errata.advisoryName=advisoryName errata.issueDate = result['issue_date'] errata.modifiedDate = result['update_date'] errata.updateDate = result['last_modified_date'] errata.description=result['description'] errata.synopsis=result['synopsis'] errata.topic=result['topic'] errata.references=result['references'] errata.notes=result['notes'] errata.advisoryType=result['type'] else: errata = None return errata def findPackageByNVREA(self,pkg_info): result= None pkg_details = None try: #Fortunately this RPC method returns an empty list if the package does not exist, no need to handle an undocumented exception if pkg_info.epoch: result = self.server.packages.findByNvrea(self.rhnSessionKey,pkg_info.name,pkg_info.version,pkg_info.release,pkg_info.epoch,pkg_info.archLabel) else: result = self.server.packages.findByNvrea(self.rhnSessionKey,pkg_info.name,pkg_info.version,pkg_info.release,"",pkg_info.archLabel) if len(result) > 0: pkg_details = result[0] except xmlrpclib.Fault, f: if f.faultCode==-20: self.rhnLogin(self.login,self.password) return self.findPackageByNVREA(pkg_info) else: raise if not pkg_details is None: server_pkg = RHNPackage(pkg_details['name'],pkg_details['version'],pkg_details['release'],pkg_details['epoch'],pkg_details['arch_label']) server_pkg.id = pkg_details['id'] server_pkg.path = pkg_details['path'] server_pkg.provider = pkg_details['provider'] server_pkg.lastModified=pkg_details['last_modified'] return server_pkg return None def findPackageByNameAndChecksum(self,pkg_name, pkg_checksum): result= None pkg_details = None try: #Fortunately this RPC method returns an empty list if the package does not exist, no need to handle an undocumented exception result = self.server.packages.search.name(self.rhnSessionKey,pkg_name) if len(result) > 0: for search_result in result: if pkg_name == search_result['name']: pkg_details = self.server.packages.getDetails(self.rhnSessionKey,search_result['id']) if pkg_details['checksum'] == pkg_checksum: server_pkg = RHNPackage(pkg_details['name'],pkg_details['version'],pkg_details['release'],pkg_details['epoch'],pkg_details['arch_label']) server_pkg.id = pkg_details['id'] server_pkg.path = pkg_details['path'] server_pkg.lastModified=pkg_details['last_modified_date'] return server_pkg except xmlrpclib.Fault, f: if f.faultCode==-20: self.rhnLogin(self.login,self.password) return self.findPackageByNameAndChecksum(pkg_name,pkg_checksum) else: raise return None def createErrata(self,erratum): result= None #Note: this method has not been tested when the errata has any bugs or keywords. Sending the value [{}] for bugs seems to cause a crash - maybe you need [{id:"12345",name:"foobug"}] or similar for it to work if not erratum.readyToCreate(): raise try: result = self.server.errata.create(self.rhnSessionKey,erratum.getInfoDict(),erratum.bugs,erratum.keywords,erratum.getPackageIds(),erratum.publish,erratum.channelLabel) except xmlrpclib.Fault, f: if f.faultCode==-20: self.rhnLogin(self.login,self.password) return self.createErrata(erratum) else: raise return result def addRequiredOptions(parser): parser.add_option("-s", "--server", type="string", dest="server", help="RHN Satellite server hostname") parser.add_option("-l", "--login", type="string", dest="login", help="RHN Login") parser.add_option("", "--password", type="string", dest="passwd", help="RHN password (cleartext)") def establishSession(options,cmdName): if (options.server and options.login) is None: print "Please specify --server and --login. Try: "+cmdName+" --help" sys.exit(2) try: while options.passwd is None: options.passwd = getpass.getpass("RHN Password: ") except Exception, e: print "Terminal does not seem to be functional. You should specify a password with --password. Aborting." sys.exit(2) mySession = RHNSession(options.server,options.login,options.passwd) return mySession def process_args(): config = ConfigParser.SafeConfigParser() config.readfp(open(CONFIG_FILE)) parser = OptionParser(usage="%prog [options] [filename]",version="%prog 0.2") addRequiredOptions(parser) parser.add_option("","--max-errata",type="int",dest="max_errata",default=10000, help="Maximum number of errata to process at once. Only relevant to format 'mail-archive.com'") parser.add_option("-c","--config",type="string",dest="config_file", help="Read the specified config file instead of the default %s" % CONFIG_FILE) parser.add_option("-f","--format",type="string",dest="format",default="digest", help="Select input format for tool. Default is digest. Valid options are digest, archive, mail-archive.com") parser.add_option("","--scrape-rhn",action="store_true",dest="scrape_rhn",default=False, help="Connect to the RedHat Network site and attempt to download errata information") parser.add_option("","--show-config",action="store_true",dest="print_config", default=False, help="Do not connect to the RHN server, just print configuration information") parser.add_option("-t","--test",action="store_true",dest="testmode", default=False, help="Do not connect to the RHN server, just process the input file and print errata information. Will also print configuration information") parser.add_option("","--centos-version",type="string",dest="centos_version", help="The centos version (e.g. '5' for Centos 5.3) ") #Have to perform DIRTY HACK here because optparse will exit early #if -h is passed, not what we want since we only want to retrieve #the config file at this point and parse again later. The ideal #would be the ability to prevent parse_args from exiting early so #we could call it twice even with --help user_config = None try: arg_index = sys.argv.index("-c") user_config = sys.argv[arg_index+1] except Exception,err: pass if not user_config is None: user_cfg_result = config.read([user_config]) if len(user_cfg_result) != 1 or user_cfg_result[0] != user_config: print "Failed to read config file %s " % user_config sys.exit(2) interpolation_vars = {'version' : config.get("centos errata","version"), 'release' : config.get("centos errata","release")} parser.set_defaults(centos_version=config.get("centos errata","version")) parser.set_defaults(centos_release=config.get("centos errata","release")) #Setup args to control our configured architectures for arch in VALID_ARCH: if config.has_section(arch): active_arches.append(arch) channel_label_opt = "%s_channel" % arch package_dir_opt = "%s_packagedir" % arch parser.add_option("","--%s-channel" % arch ,type="string",dest=channel_label_opt, help="The updates channel for arch %s" % arch) parser.add_option("","--%s-packagedir" % arch ,type="string",dest=package_dir_opt, help="The package directory for arch %s" % arch) if config.has_option(arch,"channel"): parser.set_default(channel_label_opt,config.get(arch,"channel",0,interpolation_vars)) if config.has_option(arch,"package_dir"): parser.set_default(package_dir_opt,config.get(arch,"package_dir",0,interpolation_vars)) if config.has_option("spacewalk","server"): parser.set_defaults(server=config.get("spacewalk","server")) if config.has_option("spacewalk","password"): parser.set_defaults(password=config.get("spacewalk","password")) if config.has_option("spacewalk","login"): parser.set_defaults(login=config.get("spacewalk","login")) if config.has_option("centos errata","scrape_rhn"): parser.set_defaults(scrape_rhn=config.getboolean("centos errata","scrape_rhn")) if config.has_option("centos errata", "max_errata"): parser.set_defaults(max_errata=config.getint("centos errata", "max_errata")) (options,args) = parser.parse_args() return (options,args) def process_pkg_file(pkgfile): #Swiped from http://www.sharms.org/blog/2009/05/21/python-rpm/ as rpm-python has no documentation rpmQuery = rpm.ts() try: fd = os.open(pkgfile, os.O_RDONLY) header = rpmQuery.hdrFromFdno(fd) os.close(fd) except Exception,msg: print "process_pkg_file failed with exception %s. " % msg traceback.print_exc(file=sys.stdout) return None pkgInfo = RHNPackage(header['name'],header['version'],header['release'],header['epoch'],header['arch']) return pkgInfo def errata_prepare_core(options,cache,erratum_subject,extra_info): erratum_subject_re = re.compile(ERRATA_SUBJECT) sec_info_re = re.compile(SECURITY_INFO) bug_info_re = re.compile(BUG_INFO) enhance_info_re = re.compile(ENHANCE_INFO) erratum_subject_match = erratum_subject_re.match(erratum_subject) if erratum_subject_match is None: print "Message with subject '%s' doesnt appear to be an errata " % erratum_subject return None errata_type = erratum_subject_match.group('errata_type') advisory_name = errata_type+"-"+erratum_subject_match.group('year')+":"+erratum_subject_match.group('errata_id') #TODO: could also load existing errata from server here if cache.has_key(advisory_name): erratum = cache[advisory_name] else: erratum = RHNErrata() erratum.advisoryName = advisory_name erratum.publish = True if errata_type == SECURITY_ERRATA: info_match = sec_info_re.match(erratum_subject_match.group('other_info')) erratum.advisoryType=SECURITY elif errata_type == BUG_ERRATA: info_match = bug_info_re.match(erratum_subject_match.group('other_info')) erratum.advisoryType=BUGFIX elif errata_type == ENHANCE_ERRATA: info_match = enhance_info_re.match(erratum_subject_match.group('other_info')) erratum.advisoryType=ENHANCEMENT else: print "Unknown errata type %s, assuming type BUG" % erratum_subject_match.group('errata_type') errata_type = BUG_ERRATA erratum.advisoryType=BUGFIX info_match = bug_info_re.match(erratum_subject_match.group('other_info')) if info_match is None: print "Errata '%s' doesnt match any of the known types " % erratum_subject return None if info_match.group('version') != options.centos_version: print "Errata '%s' is inapplicable to the targeted CentOS release " % re.sub("\s+"," ",erratum_subject) return None extra_info['arch'] = info_match.group('arch') update_channel = get_update_channel(options,extra_info['arch']) extra_info['package_dir'] = get_package_dir(options,extra_info['arch']) if update_channel is None: print "No update channel configured for architecture '%s'. Skipping Errata '%s'" % (extra_info['arch'],re.sub("\s+"," ",erratum_subject)) return None erratum.addPublishChannel(update_channel) erratum.product = "CentOS "+options.centos_version rhn_url = "https://rhn.redhat.com/errata/"+errata_type+"-"+erratum_subject_match.group('year')+"-"+erratum_subject_match.group('errata_id')+".html" rhn_url = re.sub(r"/CE","/RH",rhn_url) erratum.topic=rhn_url if options.scrape_rhn: download_description(erratum,rhn_url) if erratum.description is None: erratum.description ="Automatically imported CentOS erratum" if erratum.solution is None: erratum.solution = "Install these packages to correct the erratum" if info_match.groupdict().has_key('severity'): erratum.synopsis= info_match.group('severity')+": "+info_match.group('synopsis').replace('\t', ' ') else: erratum.synopsis= info_match.group('synopsis').replace('\t', ' ') return erratum def errata_prepare_packagelist(session,options,erratum,package_dir,package_list): packagelist_re = re.compile(PACKAGE_LIST) for pkgfile in packagelist_re.findall(package_list): pkg_checksum = pkgfile[0] pkg_filename = pkgfile[1] if not pkg_filename.endswith(".src.rpm"): pkg_info = search_spacewalk_server(session,options,pkg_filename,pkg_checksum) if pkg_info is None: if package_dir is None: print "Spacewalk search for package %s failed Skipping errata %s" % (pkg_filename,erratum.advisoryName) else: print "Spacewalk search failed: falling back to searching %s for %s" % (package_dir,pkg_filename) pkg_info = process_pkg_file(package_dir+pkg_filename) if pkg_info is None: print "Package %s%s does not exist or cannot be read. Skipping errata %s" % (package_dir,pkg_filename,erratum.advisoryName) if not pkg_info is None: erratum.packages.append(pkg_info) else: return None return erratum #Load email and decide on whether to act on it #Return None if no useful errata information is found in the email or a RHNErrata object if info was found def process_msg(session,options,cache,msg_text): try: errataMsg = email.message_from_string(msg_text) stripNewLine = re.compile('\n') erratum_subject = errataMsg.get("Subject") if erratum_subject is None: return None erratum_subject = stripNewLine.sub("",erratum_subject) extra_info = {} erratum = errata_prepare_core(options,cache,erratum_subject,extra_info) if not erratum is None: if not errata_prepare_packagelist(session,options,erratum,extra_info['package_dir'],errataMsg.get_payload()) is None: cache[erratum.advisoryName] = erratum return erratum except Exception, e: print "Failed to process message. Reason:" print e traceback.print_exc(file=sys.stdout) return None def process_file_input(session,options,input_file): try: inputData=open(input_file).read() except IOError,msg: print "Failed to read %s. Reason: %s" % (input_file,msg) sys.exit(2) if options.format == "digest": digestMsg=email.message_from_string(inputData) if digestMsg.is_multipart(): print "Don't know how to handle multipart messages" sys.exit(2) try: digestPayloads = digestMsg.get_payload().split(DIGEST_BEGIN)[1] for msg in digestPayloads.split(DIGEST_SEPARATOR): new_errata = process_msg(session,options,errata_cache,msg) except IndexError,index_msg: print "The file %s does not appear to be a digest from centos-announce" % input_file sys.exit(2) elif options.format == "archive": #Split on lines formmated thusly: From kbsingh at centos.org Thu Jan 8 16:25:09 2009 splitter = re.compile(ARCHIVE_SEPARATOR) for msg in splitter.split(inputData): new_errata = process_msg(session,options,errata_cache,msg) else: print "Unknown input format %s" % options.format sys.exit(2) def process_mailarchive_msg(session,options,cache,msg_url,erratum_subject): try: extra_info = {} erratum = errata_prepare_core(options,cache,erratum_subject,extra_info) if not erratum is None: print "Downloading package data for " + erratum.advisoryName message_f = urllib.urlopen(msg_url) message_src = message_f.read() message_f.close() if not errata_prepare_packagelist(session,options,erratum,extra_info['package_dir'],message_src) is None: cache[erratum.advisoryName] = erratum return erratum except Exception, e: print "Failed to process message. Reason: %s" % e traceback.print_exc(file=sys.stdout) return None def process_mailarchive(session,options): parsed_errata = 0 try: messages_list_f = urllib.urlopen(MAILARCHIVE_BASE+"maillist.html") s = messages_list_f.read() messages_list_f.close() except IOError,msg: print "Failed to open URL %s. Reason: %s" % (MAILARCHIVE_BASE+"maillist.html",msg) sys.exit(2) lines = s.split("\n") subjects_re = re.compile(MAILARCHIVE_SUBJECT) for line in lines: subjects_match = subjects_re.match(line) if parsed_errata >= options.max_errata: print "Max errata count %d exceeded. Processing no more errata" % options.max_errata return parsed_errata if subjects_match is None: continue if process_mailarchive_msg(session,options,errata_cache, MAILARCHIVE_BASE+subjects_match.group('relurl'), subjects_match.group('subject')) is None: parsed_errata += 1 return parsed_errata def get_update_channel(app_options,target_arch): channel_opt = "%s_channel" % target_arch if hasattr(app_options,channel_opt): return getattr(app_options,channel_opt) return None def get_package_dir(app_options,target_arch): package_dir_opt = "%s_packagedir" % target_arch if hasattr(app_options,package_dir_opt): return getattr(app_options,package_dir_opt) return None def search_spacewalk_server(session,options,pkg_filename,pkg_checksum): pkg_name_matched = re.match(PACKAGE_NAMEONLY, pkg_filename) pkg_name = pkg_name_matched.group('pkg_name') if pkg_name is None: print "Bad package filename %s" % pkg_filename return None if options.testmode or session is None: print "Test mode: would search spacewalk for %s (%s)" % (pkg_name, pkg_checksum) return None return session.findPackageByNameAndChecksum(pkg_name, pkg_checksum) def download_description(erratum,rhn_url): if not erratum.description is None: return rhn_details_re = re.compile(RHN_ERRATA_DETAILS) rhn_solution_re = re.compile(RHN_ERRATA_SOLUTION) try: print "Downloading RHN data for " + erratum.advisoryName message_f = urllib.urlopen(rhn_url) message_src = message_f.read() message_f.close() details_match = rhn_details_re.search(message_src) if not details_match is None: erratum.description = replace_rhn_content(details_match.group("details")) solution_match = rhn_solution_re.search(message_src) if not solution_match is None: erratum.solution = replace_rhn_content(solution_match.group("solution")) except: print "Failed to download details for %s, using defaults" % erratum.advisoryName def replace_rhn_content(rhn_content): ret = rhn_content.replace("
*", "\n*") ret = ret.replace("
", "\n") ret = ret.replace("

", "") ret = ret.replace("

", "") ret = ret.replace(""", "'") ret = ret.replace("Red Hat Enterprise Linux", "CentOS") return ret; def main(): (options,args) = process_args() if options.testmode or options.print_config: print "Current configuration:" for option,value in options.__dict__.items(): print "%-20s = %s" % (option,value) if options.print_config: sys.exit(0) abort_test = False for arch in active_arches: package_dir = get_package_dir(options,arch) if package_dir is None: print "Warning: %s is not set" % package_dir_opt continue elif not os.path.exists(package_dir): print "Warning: %s arch package directory %s does not exist" % (arch,package_dir) abort_test = True elif not os.path.isdir(package_dir): print "Warning: %s arch package dir %s is not a directory" % (arch,package_dir) abort_test = True elif not os.access(package_dir,os.R_OK): print "Warning: %s arch package dir %s is not readable" % (arch,package_dir) abort_test = True if abort_test and options.testmode: print "Test mode does not access the spacewalk server and therefore requires package directories" sys.exit(2) if options.format == "mail-archive.com": session = None if not options.testmode: session = establishSession(options,sys.argv[0]) process_mailarchive(session,options) elif options.format == "archive" or options.format == "digest": if len(args) == 0: print "I need an input filename. See %s --help" % sys.argv[0] sys.exit(2) inputFile = args[0] if not os.path.exists(inputFile): print "Input file %s does not exist" % inputFile sys.exit(2) elif not os.path.isfile(inputFile): print "Input file %s is not a normal file" % inputFile sys.exit(2) elif not os.access(inputFile,os.R_OK): print "Input file %s is not readable" % inputFile sys.exit(2) session = None if not options.testmode: session = establishSession(options,sys.argv[0]) process_file_input(session,options,inputFile) elif not options.format is None: print "Unknown format %s. See --help for valid formats " % options.format sys.exit(2) #Process any errata we have if len(errata_cache.keys()) > 0: print "Processing %d errata..." % len(errata_cache.keys()) for erratum in errata_cache.values(): try: if options.testmode: print "In test mode. Not checking server for existing erratum %s" % erratum.advisoryName erratum.printOut() print "------" else: skip = False if not session.getErrataDetails(erratum.advisoryName) is None: print "Errata %s already exists on server, skipping" % erratum.advisoryName #TODO: should handle updates of errata here. Basically if it's in the cache we need to get the package delta using listPackages then call addPackages #Errata the script has created might be incomplete (e.g. advisories split across emails) so don't try anything cleverer than this continue for pkg_info in erratum.packages: if pkg_info.id is None: rhn_pkg_info = session.findPackageByNVREA(pkg_info) if not rhn_pkg_info is None: pkg_info.id=rhn_pkg_info.id else: print "Package %s is not available on the server. " % pkg_info.getNVRA skip = True if skip: print "Skipping erratum %s due to missing packages" % erratum.advisoryName continue else: session.createErrata(erratum) except Exception,e: print "An exception occured when communicating with the server. Skipping erratum %s. Reason:" % erratum.advisoryName print e traceback.print_exc(file=sys.stdout) if __name__ == "__main__": main()