#!/usr/bin/python
##############################################################################
# $Header: $
##############################################################################
# Distributed under the terms of the GNU General Public License, v2 or later
# Author: Corentin Chary

# Gentoo new upstream release scan tool.

import errno
import os
import sys
import re
try:
    import StringIO
except ImportError:
    # Python 3: io.StringIO is reachable under the same "StringIO.StringIO" name.
    import io as StringIO
from stat import *
from xml.sax import saxutils, make_parser, handler
from xml.sax.handler import feature_namespaces

import urllib
try:
    import urllib2
except ImportError:
    # Python 3: urlopen() lives in urllib.request with a compatible signature.
    import urllib.request as urllib2

import pkg_resources

import portage
from portage.output import *
from portage.dbapi.porttree import _parse_uri_map
from portage.exception import InvalidDependString

__version__ = "svn"

settings = {
    "brute-force-level": 2,
    "brute-force": True,
    "brute-force-crazy": True,
    "scan-dir": True,
    "format": "pretty",
    "verbose": True,
    "stop-when-found": False,
    "check-all-files": False,
}

output = EOutput()
output.quiet = not settings['verbose']

# Tokenizer used by split_version(): runs of digits, runs of lowercase
# letters, or a single dot.
_VERSION_COMPONENT_RE = re.compile(r'(\d+ | [a-z]+ | \.)', re.VERBOSE)

# Capturing group substituted for every version placeholder of a template.
_VERSION_GROUP = r'([\w\.\-]+?)'

# Timeout (seconds) passed to every urlopen() call.
_URL_TIMEOUT = 5


def cast_int_components(version):
    """Convert, in place, every item of *version* that int() accepts.

    Items for which int() raises ValueError are left untouched.
    Returns the same (mutated) list.
    """
    for i, obj in enumerate(version):
        try:
            version[i] = int(obj)
        except ValueError:
            pass
    return version


def parse_version(version):
    """Return pkg_resources.parse_version() of the *version* string."""
    # NOTE: an earlier variant post-processed the result with
    # cast_int_components(list(version)); it is currently disabled.
    return pkg_resources.parse_version(version)


def template_from_url(url, version):
    """Replace occurrences of *version* in *url* with placeholders.

    A path chunk containing the full version gets '${PV}'.  A chunk that is
    a prefix of the version (e.g. a "1.2" directory for version "1.2.3")
    gets positional placeholders '${0}.${1}'...  Other chunks are kept.
    """
    # maxsplit=1 so a later '://' inside the path does not break unpacking.
    prefix, chunks = url.split('://', 1)
    chunks = chunks.split('/')

    for i, chunk in enumerate(chunks):
        if not chunk:
            continue

        if version in chunk:
            # If it's the full version, it's easy
            chunk = chunk.replace(version, '${PV}')
        elif version.startswith(chunk):
            # For directories made from a part of the version
            full = split_version(version)
            part = split_version(chunk)

            for j in range(min(len(full), len(part))):
                if part[j] != full[j]:
                    break
                part[j] = '${%d}' % j

            chunk = join_version(part)
            # join_version() puts no dot between two string components.
            chunk = chunk.replace('}$', '}.$')

        chunks[i] = chunk

    return prefix + "://" + "/".join(chunks)


def url_from_template(url, version):
    """Expand '${PV}' and '${N}' placeholders of *url* for *version*."""
    components = split_version(version)

    url = url.replace('${PV}', version)
    for i, component in enumerate(components):
        url = url.replace('${%d}' % i, str(component))

    return url


# Stolen from distutils.LooseVersion
# Used for brute force to increment the version
def split_version(version):
    """Split a version string into a list of components.

    Dots and empty tokens are dropped; components that int() accepts are
    converted to int, the others stay strings.
    """
    components = [token for token in _VERSION_COMPONENT_RE.split(version)
                  if token and token != '.']
    for i, component in enumerate(components):
        try:
            components[i] = int(component)
        except ValueError:
            pass
    return components


def join_version(components):
    """Build a version string back from a list of components.

    A dot is inserted only between two adjacent components that are both
    not of type str.
    """
    version = ""
    last = len(components) - 1
    for i, component in enumerate(components):
        version += str(component)
        if i >= last:
            break
        if not isinstance(component, str) and \
           not isinstance(components[i + 1], str):
            version += "."
    return version


def increment_version(components, level):
    """Increment, in place, the component at index *level*.

    Integer components after *level* are reset to 0.  Non-integer
    components are left untouched.  Returns the same (mutated) list.

    Raises ValueError (an Exception subclass) when *level* is out of range.
    """
    n = len(components)

    if level > n - 1 or level < 0:
        raise ValueError("version level %r out of range for %r"
                         % (level, components))

    for idx in range(level + 1, n):
        if isinstance(components[idx], int):
            components[idx] = 0

    if isinstance(components[level], int):
        components[level] += 1

    return components


def gen_versions(components, level):
    """Generate candidate versions following *components*.

    Starting from the last component and walking towards the first, up to
    *level* positions are incremented; for each position, *level*
    successive values are produced.  Returns a list of component lists.
    The caller's list is not modified.
    """
    n = len(components)
    if not n:
        return []

    depth = level
    level = min(level, n)

    # Work on a copy: increment_version() mutates its argument, and callers
    # keep using the list they passed in.
    components = list(components)
    versions = []

    for idx in range(n - 1, n - level - 1, -1):
        increment_version(components, idx)
        for _unused in range(depth):
            versions.append(list(components))
            increment_version(components, idx)

    return versions


def tryurl(fileurl):
    """Return True if *fileurl* looks like a downloadable file.

    The URL is opened and only its headers are inspected.  It is rejected
    when a Content-disposition header does not mention the file's
    basename, when Content-Length is '0', or when Content-Type is missing
    or contains 'text/html'.  Any error while opening also yields False.
    """
    result = False

    output.ebegin("Trying: " + fileurl)

    try:
        fp = urllib2.urlopen(fileurl, None, _URL_TIMEOUT)
        try:
            headers = fp.info()
        finally:
            fp.close()

        basename = os.path.basename(fileurl)

        if 'Content-disposition' in headers and \
           basename not in headers['Content-disposition']:
            result = False
        elif 'Content-Length' in headers and headers['Content-Length'] == '0':
            result = False
        elif 'Content-Type' not in headers or \
                'text/html' in headers['Content-Type']:
            # A missing Content-Type is treated as a failure, as before.
            result = False
        else:
            result = True
    except Exception:
        # Best effort: an unreachable or invalid URL is simply "not found".
        result = False

    output.eend(errno.ENOENT if not result else 0)

    return result


def regex_from_template(template):
    """Turn a template path chunk into a regular expression.

    Literal text is escaped and every placeholder ('${PV}' or a run of
    '${N}' optionally separated by dots) becomes one capturing group.
    An optional trailing '/' is accepted and the pattern is anchored at
    the end.
    """
    template = re.escape(template)
    # Undo the escaping of the placeholders themselves.
    template = template.replace(r'\$\{', '${')
    template = template.replace(r'\}', '}')
    template = template.replace(r'}\.$', '}.$')
    # A callable replacement avoids any escape processing of the
    # replacement text, which contains '\w'.
    template = re.sub(r'(\$\{\d+\}\.?)+', lambda match: _VERSION_GROUP,
                      template)
    template = template.replace('${PV}', _VERSION_GROUP)
    template = template + r'/?$'
    return template


def basedir_from_template(template):
    """Return the part of *template* before the first chunk with '${'.

    Returns *template* unchanged when it has no placeholder, and "" when
    there is no '/' before the first placeholder.
    """
    idx = template.find('${')
    if idx == -1:
        return template

    idx = template[0:idx].rfind('/')
    if idx == -1:
        return ""

    return template[0:idx]


def generate_scan_paths(url):
    """Split a template URL into (path, regex) scan steps.

    Each step holds the fixed path accumulated since the previous step and
    the regex (see regex_from_template) for the next templated chunk.
    """
    prefix, chunks = url.split('://', 1)
    chunks = chunks.split('/')

    steps = []

    path = prefix + ":/"
    for chunk in chunks:
        if '${' in chunk:
            steps.append((path, regex_from_template(chunk)))
            path = ""
        else:
            path += "/"
            path += chunk
    return steps


def scan_directory_recursive(url, steps, vmin, vmax):
    """Scan remote listings, one templated path chunk per recursion.

    *steps* comes from generate_scan_paths().  Entries whose captured
    version is <= *vmin* or >= *vmax* are skipped (a bound of None
    disables that check).  Returns a list of (url, version) tuples.
    """
    if not steps:
        return []

    url += steps[0][0]
    pattern = steps[0][1]

    steps = steps[1:]

    output.einfo("Scanning: %s" % url)

    try:
        fp = urllib2.urlopen(url, None, _URL_TIMEOUT)
        try:
            data = fp.read()
        finally:
            fp.close()
    except Exception:
        # Best effort: an unreachable directory yields no candidates.
        return []

    # On Python 3 the payload is bytes; the regexes below work on text.
    if isinstance(data, bytes) and not isinstance(data, str):
        data = data.decode('utf-8', 'replace')

    results = []

    if re.search(r"<\s*a\s+[^>]*href", data):
        from BeautifulSoup import BeautifulSoup

        soup = BeautifulSoup(data)

        for link in soup.findAll('a'):
            href = link.get("href")
            if not href:
                continue
            if href.startswith(url):
                href = href.replace(url, "", 1)

            match = re.match(pattern, href)
            if match:
                results.append((match.group(1), match.group(0)))

    elif url.startswith('ftp://'):  # Probably a FTP Server
        buf = StringIO.StringIO(data)
        for line in buf.readlines():
            line = line.replace("\n", "").replace("\r", "")
            match = re.search(pattern, line)
            if match:
                results.append((match.group(1), match.group(0)))

    # Turn the matches into full URLs and descend if steps remain.
    versions = []

    for version, path in results:
        ver = parse_version(version)
        if vmin is not None and ver <= vmin:
            continue
        if vmax is not None and ver >= vmax:
            continue

        if not url.endswith('/') and not path.startswith('/'):
            path = url + '/' + path
        else:
            path = url + path

        versions.append((path, version))
        if steps:
            versions.extend(scan_directory_recursive(path, steps, vmin, vmax))

    return versions


def scan_directory(cpv, fileurl, limit=None):
    """Look for newer versions of *cpv* by listing remote directories.

    Does nothing when settings["scan-dir"] is false or when *fileurl*
    does not contain the package version.  Returns a list of
    (url, version) tuples.
    """
    # Ftp: list dir
    # Handle mirrors
    if not settings["scan-dir"]:
        return []

    catpkg, ver, rev = portage.pkgsplit(cpv)

    template = template_from_url(fileurl, ver)
    if '${' not in template:
        output.ewarn("Url doesn't seems to depend on version: %s not found in %s"
                     % (ver, fileurl))
        return []

    output.einfo("Scanning: %s" % template)

    vmin = parse_version(ver)

    steps = generate_scan_paths(template)
    return scan_directory_recursive("", steps, vmin, limit)


def brute_force(cpv, fileurl, limit=None):
    """Look for newer versions of *cpv* by guessing version numbers.

    Candidates come from gen_versions() and are probed with tryurl().
    With settings["brute-force-crazy"], every hit seeds further
    candidates.  Candidates >= *limit* are skipped when *limit* is given.
    Returns a list of [url, version] pairs.
    """
    if not settings["brute-force"]:
        return []

    catpkg, ver, rev = portage.pkgsplit(cpv)

    level = settings["brute-force-level"]
    versions = gen_versions(split_version(ver), level)

    output.einfo("Generating version from " + ver)

    if not versions:
        output.ewarn("Can't generate new versions from " + ver)
        return []

    template = template_from_url(fileurl, ver)

    if '${' not in template:
        output.ewarn("Url doesn't seems to depend on version: %s not found in %s"
                     % (ver, fileurl))
        return []

    output.einfo("Brute forcing: %s" % template)

    result = []

    # Both sets hold tuples: component lists are not hashable.
    done = set()
    queued = set(tuple(v) for v in versions)

    i = 0
    # 'versions' may grow while it is being walked, hence the index loop.
    while i < len(versions):
        components = versions[i]
        i += 1

        key = tuple(components)
        if key in done:
            continue
        done.add(key)

        vstring = join_version(components)
        version = parse_version(vstring)

        if limit is not None and version >= limit:
            continue

        url = url_from_template(template, vstring)

        if not tryurl(url):
            continue

        result.append([url, vstring])

        if settings["brute-force-crazy"]:
            for candidate in gen_versions(components, level):
                candidate_key = tuple(candidate)
                if candidate_key not in queued:
                    queued.add(candidate_key)
                    versions.append(candidate)

        if settings["stop-when-found"]:
            break

    return result


def euscan(cpv, portdir):
    """Scan upstream for new versions of *cpv* and print what is found.

    Every SRC_URI entry is tried with scan_directory() and then
    brute_force(); 'mirror://' URIs are reported as unsupported.
    Returns the list of (url, version) results, or an empty list when
    SRC_URI cannot be parsed.
    """
    if portdir:
        portdb = portage.portdbapi(portdir)
    else:
        portdb = portage.portdbapi()

    src_uri, repo = portdb.aux_get(cpv, ['SRC_URI', 'repository'])

    metadata = {
        "EAPI": portage.settings["EAPI"],
        "SRC_URI": src_uri,
    }
    use = frozenset(portage.settings["PORTAGE_USE"].split())
    try:
        alist = _parse_uri_map(cpv, metadata, use=use)
        aalist = _parse_uri_map(cpv, metadata)
    except InvalidDependString as e:
        sys.stderr.write(red("!!! %s\n" % str(e)))
        sys.stderr.write(red("!!! Invalid SRC_URI for '%s'.\n" % cpv))
        return []

    if "mirror" in portage.settings.features:
        fetchme = aalist
    else:
        fetchme = alist

    versions = []

    for filename in fetchme:
        for fileurl in fetchme[filename]:
            if fileurl.startswith('mirror://'):
                output.eerror('mirror:// scheme not supported (%s)' % fileurl)
                continue

            # Try list dir
            versions.extend(scan_directory(cpv, fileurl))

            if versions and settings['stop-when-found']:
                break

            # Try manual bump
            versions.extend(brute_force(cpv, fileurl))

            if versions and settings['stop-when-found']:
                break

        if versions and not settings["check-all-files"]:
            break

    # One URL per version; a later URL replaces the stored one unless it
    # is strictly shorter.
    newversions = {}

    for url, version in versions:
        if version in newversions and len(url) < len(newversions[version]):
            continue
        newversions[version] = url

    for version in newversions:
        print(darkgreen("New Upstream Version: ") + green("%s" % version)
              + " %s" % newversions[version])

    return versions


class Metadata_XML(handler.ContentHandler):
    """SAX handler collecting herds, maintainer e-mails and the long
    description from a metadata.xml file.

    After parsing, the results are available as _herd (list),
    _maintainers (list) and _longdescription (string).
    """

    def __init__(self):
        handler.ContentHandler.__init__(self)
        self._inside_herd = False
        self._inside_maintainer = False
        self._inside_email = False
        self._inside_longdescription = False
        # Results are per instance so that parsing two files never mixes
        # their data.
        self._herd = []
        self._maintainers = []
        self._longdescription = ""
        # Text chunks of the element currently being collected.
        self._text = []

    def startElement(self, tag, attr):
        if tag == "herd":
            self._inside_herd = True
            self._text = []
        if tag == "longdescription":
            self._inside_longdescription = True
            self._text = []
        if tag == "maintainer":
            self._inside_maintainer = True
        if tag == "email":
            self._inside_email = True
            self._text = []

    def endElement(self, tag):
        if tag == "herd":
            self._inside_herd = False
            text = "".join(self._text).strip()
            if text:
                self._herd.append(text)
        if tag == "longdescription":
            self._inside_longdescription = False
            # Collapse the indentation and line breaks of the XML file.
            self._longdescription = " ".join("".join(self._text).split())
        if tag == "maintainer":
            self._inside_maintainer = False
        if tag == "email":
            if self._inside_maintainer:
                text = "".join(self._text).strip()
                if text:
                    self._maintainers.append(text)
            self._inside_email = False

    def characters(self, contents):
        # A parser may deliver one text node in several chunks, so the
        # chunks are buffered and assembled in endElement().
        if self._inside_herd or self._inside_longdescription or \
           (self._inside_maintainer and self._inside_email):
            self._text.append(contents)


def check_metadata(cpv, portdir=None):
    """Print herd, maintainers, description and location of a package.

    The data is read from the package's metadata.xml.  Returns 1 when
    that file is missing or lists no herd, None otherwise.
    """
    if not portdir:
        portdb = portage.portdbapi()
        repo, = portdb.aux_get(cpv, ['repository'])
        portdir = portdb.getRepositoryPath(repo)

    package_dir = os.path.join(portdir, portage.pkgsplit(cpv)[0])
    metadata_file = os.path.join(package_dir, "metadata.xml")

    if not os.path.exists(metadata_file):
        print(darkgreen("Maintainer: ") + red("Error (Missing metadata.xml)"))
        return 1

    parser = make_parser()
    # Not named 'handler': that would shadow the xml.sax.handler module.
    meta = Metadata_XML()
    parser.setContentHandler(meta)
    parser.parse(metadata_file)

    if meta._herd:
        print(darkgreen("Herd: ") + ", ".join(meta._herd))
    else:
        print(darkgreen("Herd: ") + red("Error (No Herd)"))
        return 1

    if meta._maintainers:
        print(darkgreen("Maintainer: ") + ", ".join(meta._maintainers))
    else:
        print(darkgreen("Maintainer: ") + "none")

    if len(meta._longdescription) > 1:
        print(darkgreen("Description: ") + meta._longdescription)
    print(darkgreen("Location: ") + os.path.normpath(package_dir))


def usage(code):
    """Print the usage information for this script and exit with *code*."""
    print(green("euscan") + " (%s)" % __version__)
    print("")
    print("Usage: euscan [ebuild|[package-cat/]package[-version]]")
    sys.exit(code)


# default color setup
if (not sys.stdout.isatty()) or (portage.settings["NOCOLOR"] in ["yes", "true"]):
    nocolor()


def fc(x, y):
    """Comparison function ordering items by descending first element."""
    return (y[0] > x[0]) - (y[0] < x[0])


def main():
    """Run euscan on every package or ebuild path given on the command line."""
    if len(sys.argv) < 2:
        usage(1)

    for pkg in sys.argv[1:]:
        if pkg.endswith('.ebuild'):
            # <portdir>/<category>/<package>/<package>-<version>.ebuild
            ebuild = os.path.abspath(pkg)
            package_dir = os.path.dirname(ebuild)
            category_dir = os.path.dirname(package_dir)
            portdir = os.path.dirname(category_dir)
            pf = os.path.basename(ebuild)[:-len('.ebuild')]
            package_list = [os.path.basename(category_dir) + "/" + pf]
        else:
            portdir = None
            print(pkg)
            package_list = portage.portdb.xmatch("match-all", pkg)

        for cpv in package_list:
            print(darkgreen("Package: ") + cpv)
            # check_metadata(cpv, portdir)  -- currently disabled
            euscan(cpv, portdir)
            print("")


if __name__ == '__main__':
    main()