#!/usr/bin/python """Copyright 2011 Gentoo Foundation Distributed under the terms of the GNU General Public License v2 """ from __future__ import print_function # Meta: __author__ = "Corentin Chary (iksaif)" __email__ = "corentin.chary@gmail.com" __version__ = "git" __productname__ = "euscan" __description__ = "A tool to detect new upstream releases." # ======= # Imports # ======= import os import sys import re import time import getopt import errno import random import urllib2 import StringIO import pkg_resources import portage import portage.versions from portage.output import white, yellow, turquoise, green, teal, red, EOutput from portage.dbapi.porttree import _parse_uri_map import gentoolkit.pprinter as pp from gentoolkit import errors from gentoolkit.query import Query from gentoolkit.eclean.search import (port_settings) # ======= # Globals # ======= QUERY_OPTS = {"include_masked": True} BLACKLIST_PACKAGES = [ # Compatibility package for running binaries linked against a pre gcc 3.4 libstdc++, won't be updated 'sys-libs/libstdc++-v3' # These kernels are almost dead 'sys-kernel/usermode-sources', 'sys-kernel/xbox-sources', 'sys-kernel/cell-sources', ] SCANDIR_BLACKLIST_URLS = [ 'mirror://rubygems/(.*)', # Not browsable 'mirror://gentoo/(.*)' # Directory too big ] BRUTEFORCE_BLACKLIST_PACKAGES = [ 'net-zope/plonepopoll' # infinite loop any http://plone.org/products/plonepopoll/releases/*/plonepopoll-2-6-1.tgz link will work ] BRUTEFORCE_BLACKLIST_URLS = [ 'http://(.*)dockapps.org/download.php/id/(.*)', # infinite loop 'http://hydra.nixos.org/build/(.*)', # infinite loop 'http://www.rennings.net/gentoo/distfiles/(.*)' # Doesn't respect 404, infinite loop ] def htop_vercmp(a, b): def fixver(v): if v in ['0.11', '0.12', '0.13']: v = '0.1.' + v[3:] return v return simple_vercmp(fixver(a), fixver(b)) VERSION_CMP_PACKAGE_QUIRKS = { 'sys-process/htop' : htop_vercmp } _v = r'((\d+)((\.\d+)*)([a-zA-Z]*?)(((-|_)(pre|p|beta|b|alpha|a|rc|r)\d*)*))' # ========= # Functions # ========= def cast_int_components(version): for i, obj in enumerate(version): try: version[i] = int(obj) except ValueError: pass return version def simple_vercmp(a, b): if a == b: return 0 # For sane versions r = portage.versions.vercmp(a, b) if r is not None: return r # Fallback a = pkg_resources.parse_version(a) b = pkg_resources.parse_version(b) if a < b: return -1 else: return 1 def vercmp(package, a, b): if package in VERSION_CMP_PACKAGE_QUIRKS: return VERSION_CMP_PACKAGE_QUIRKS[package](a, b) return simple_vercmp(a, b) def skipnightly(a, b): a = pkg_resources.parse_version(a) b = pkg_resources.parse_version(b) # Try to skip nightly builds when not wanted (www-apps/moodle) if len(a) != len(b) and len(b) == 2 and len(b[0]) == len('yyyymmdd'): return True return False def generate_templates_vars(version): ret = [] part = split_version(version) for i in range(2, len(part)): ver = [] var = [] for j in range(i): ver.append(str(part[j])) var.append('${%d}' % j) ret.append((".".join(ver), ".".join(var))) ret.append((version, '${PV}')) ret.reverse() return ret def template_from_url(url, version): prefix, chunks = url.split('://') chunks = chunks.split('/') for i in range(len(chunks)): chunk = chunks[i] subs = generate_templates_vars(version) for sub in subs: chunk = chunk.replace(sub[0], sub[1]) chunks[i] = chunk return prefix + "://" + "/".join(chunks) def url_from_template(url, version): components = split_version(version) url = url.replace('${PV}', version) for i in range(len(components)): url = url.replace('${%d}' % i, str(components[i])) return url # Stolen from distutils.LooseVersion # Used for brute force to increment the version def split_version(version): component_re = re.compile(r'(\d+ | [a-z]+ | \.)', re.VERBOSE) components = filter(lambda x: x and x != '.', component_re.split(version)) for i in range(len(components)): try: components[i] = int(components[i]) except ValueError: pass return components def join_version(components): version = "" for i in range(len(components)): version += str(components[i]) if i >= len(components) - 1: break if type(components[i]) != str and type(components[i + 1]) != str: version += "." return version def increment_version(components, level): n = len(components) if level > n - 1 or level < 0: raise Exception for i in range(n, level + 1, -1): if type(components[i - 1]) == int: components[i - 1] = 0 if type(components[level]) == int: components[level] += 1 return components def gen_versions(components, level): n = len(components) depth = level level = min(level, n) if not n: return [] versions = [] for i in range(n, n - level, -1): increment_version(components, i - 1) for j in range(depth): versions.append(list(components)) increment_version(components, i - 1) return versions def tryurl(fileurl, output, template): result = True output.ebegin("Trying: " + fileurl) try: basename = os.path.basename(fileurl) fp = urllib2.urlopen(fileurl, None, 5) headers = fp.info() if 'Content-disposition' in headers and basename not in headers['Content-disposition']: result = None elif 'Content-Length' in headers and headers['Content-Length'] == '0': result = None elif 'text/html' in headers['Content-Type']: result = None elif fp.geturl() != fileurl: regex = regex_from_template(template) baseregex = regex_from_template(os.path.basename(template)) basename2 = os.path.basename(fp.geturl()) # Redirect to another (earlier?) version if basename != basename2 and (re.match(regex, fp.geturl()) or re.match(baseregex, basename2)): result = None if result: result = (fp.geturl(), fp.info()) except urllib2.URLError: result = None except IOError: result = None output.eend(errno.ENOENT if not result else 0) return result def regex_from_template(template): template = re.escape(template) template = template.replace('\$\{', '${') template = template.replace('\}', '}') template = template.replace('}\.$', '}.$') template = template.replace('${1}', r'([\d]+?)') template = re.sub(r'(\$\{\d+\}\.?)+', r'([\w]+?)', template) #template = re.sub(r'(\$\{\d+\}\.?)+', r'([\w]+?)', template) #template = re.sub(r'(\$\{\d+\}\.+)+', '(.+?)\.', template) #template = re.sub(r'(\$\{\d+\})+', '(.+?)', template) template = template.replace('${PV}', _v) template = template + r'/?$' return template def basedir_from_template(template): idx = template.find('${') if idx == -1: return template idx = template[0:idx].rfind('/') if idx == -1: return "" return template[0:idx] def generate_scan_paths(url): prefix, chunks = url.split('://') chunks = chunks.split('/') steps = [] path = prefix + ":/" for chunk in chunks: if '${' in chunk: steps.append((path, regex_from_template(chunk))) path = "" else: path += "/" path += chunk return steps def scan_directory_recursive(cpv, url, steps, vmin, vmax, output): if not steps: return [] cp, ver, rev = portage.pkgsplit(cpv) url += steps[0][0] pattern = steps[0][1] steps = steps[1:] output.einfo("Scanning: %s" % url) try: fp = urllib2.urlopen(url, None, 5) except urllib2.URLError: return [] except IOError: return [] data = fp.read() results = [] if re.search("<\s*a\s+[^>]*href", data): from BeautifulSoup import BeautifulSoup soup = BeautifulSoup(data) for link in soup.findAll('a'): href = link.get("href") if not href: continue if href.startswith(url): href = href.replace(url, "", 1) match = re.match(pattern, href, re.I) if match: results.append((match.group(1), match.group(0))) elif url.startswith('ftp://'): # Probably a FTP Server buf = StringIO.StringIO(data) for line in buf.readlines(): line = line.replace("\n", "").replace("\r", "") match = re.search(pattern, line, re.I) if match: results.append((match.group(1), match.group(0))) # add url versions = [] for version, path in results: if vmin and vercmp(cp, version, vmin) <= 0: continue if vmax and vercmp(cp, version, vmax) >= 0: continue if skipnightly(vmin, version): continue if not url.endswith('/') and not path.startswith('/'): path = url + '/' + path else: path = url + path versions.append((path, version)) if steps: ret = scan_directory_recursive(cpv, path, steps, vmin, vmax, output) versions.extend(ret) return versions def scan_directory(cpv, fileurl, options, output, limit=None): # Ftp: list dir # Handle mirrors if not options["scan-dir"]: return [] catpkg, ver, rev = portage.pkgsplit(cpv) template = template_from_url(fileurl, ver) if '${' not in template: output.einfo("Url doesn't seems to depend on version: %s not found in %s" % (ver, fileurl)) return [] else: output.einfo("Scanning: %s" % template) steps = generate_scan_paths(template) return scan_directory_recursive(cpv, "", steps, ver, limit, output) def brute_force(cpv, fileurl, options, output, limit=None): if options["brute-force"] <= 0: return [] catpkg, ver, rev = portage.pkgsplit(cpv) for bp in BRUTEFORCE_BLACKLIST_PACKAGES: if re.match(bp, catpkg): output.einfo("%s is blacklisted by rule %s" % (catpkg, bp)) return [] for bp in BRUTEFORCE_BLACKLIST_URLS: if re.match(bp, fileurl): output.einfo("%s is blacklisted by rule %s" % (catpkg, bp)) return [] components = split_version(ver) versions = gen_versions(components, options["brute-force"]) """ Use the quirks to remove unwanted versions """ for v in versions: if vercmp(catpkg, ver, join_version(v)) >= 0: versions.remove(v) output.einfo("Generating version from " + ver) if not versions: output.einfo("Can't generate new versions from " + ver) return [] template = template_from_url(fileurl, ver) if '${PV}' not in template: output.einfo("Url doesn't seems to depend on full version: %s not found in %s" % (ver, fileurl)) return [] else: output.einfo("Brute forcing: %s" % template) result = [] i = 0 done = [] while i < len(versions): components = versions[i] i += 1 if components in done: continue done.append(tuple(components)) vstring = join_version(components) if limit and vercmp(catpkg, vstring, limit) >= 0: continue url = url_from_template(template, vstring) infos = tryurl(url, output, template) if not infos: continue result.append([url, vstring]) if options["brute-force-recursive"]: for v in gen_versions(components, options["brute-force"]): if v not in versions and tuple(v) not in done: versions.append(v) if options["oneshot"]: break return result def parseMirror(uri, output): from random import shuffle mirrors = portage.settings.thirdpartymirrors() if not uri.startswith("mirror://"): return uri eidx = uri.find("/", 9) if eidx == -1: output.einfo("Invalid mirror definition in SRC_URI:\n") output.einfo(" %s\n" % (uri)) return None mirrorname = uri[9:eidx] path = uri[eidx+1:] if mirrorname in mirrors: mirrors = mirrors[mirrorname] shuffle(mirrors) uri = mirrors[0].strip("/") + "/" + path else: output.einfo("No known mirror by the name: %s\n" % (mirrorname)) return None return uri def setupSignals(): """ This block ensures that ^C interrupts are handled quietly. """ import signal def exithandler(signum,frame): signal.signal(signal.SIGINT, signal.SIG_IGN) signal.signal(signal.SIGTERM, signal.SIG_IGN) print () sys.exit(errno.EINTR) signal.signal(signal.SIGINT, exithandler) signal.signal(signal.SIGTERM, exithandler) signal.signal(signal.SIGPIPE, signal.SIG_DFL) def printVersion(): """Output the version info.""" print( "%s (%s) - %s" \ % (__productname__, __version__, __description__)) print() print("Author: %s <%s>" % (__author__,__email__)) print("Copyright 2011 Gentoo Foundation") print("Distributed under the terms of the GNU General Public License v2") def printUsage(_error=None, help=None): """Print help message. May also print partial help to stderr if an error from {'options'} is specified.""" out = sys.stdout if _error: out = sys.stderr if not _error in ('global-options', 'packages',): _error = None if not _error and not help: help = 'all' if _error in ('global-options',): print( pp.error("Wrong option on command line."), file=out) print( file=out) if _error in ('packages',): print( pp.error("You need to specify exactly one package."), file=out) print( file=out) print( white("Usage:"), file=out) if _error in ('global-options', 'packages',) or help == 'all': print( " "+turquoise(__productname__), yellow("[options]"), green(""), file=out) if _error in ('global-options',) or help == 'all': print( " "+turquoise(__productname__), yellow("[--help, --version]"), file=out) print(file=out) if _error in ('global-options',) or help: print( "Available ", yellow("options")+":", file=out) print( yellow(" -C, --nocolor")+ " - turn off colors on output", file=out) print( yellow(" -q, --quiet")+ " - be as quiet as possible", file=out) print( yellow(" -h, --help")+ \ " - display the help screen", file=out) print( yellow(" -V, --version")+ " - display version info", file=out) print( file=out) print( yellow(" -1, --oneshot")+ " - stop as soon as a new version is found", file=out) print( yellow(" -b, --brute-force=")+ " - define the brute force "+yellow("")+" (default: 2)\n" + " " * 29 + "bigger levels will generate more versions numbers\n" + " " * 29 + "0 means disabled", file=out) print( file=out) if _error in ('packages',) or help: print( green(" package")+ " - the package (or ebuild) you want to scan", file=out) print( file=out) #print( "More detailed instruction can be found in", # turquoise("`man %s`" % __productname__), file=out) class ParseArgsException(Exception): """For parseArgs() -> main() communications.""" def __init__(self, value): self.value = value # sdfgsdfsdfsd def __str__(self): return repr(self.value) def parseArgs(options={}): """Parse the command line arguments. Raise exceptions on errors. Returns package and affect the options dict. """ def optionSwitch(option,opts): """local function for interpreting command line options and setting options accordingly""" return_code = True for o, a in opts: if o in ("-h", "--help"): raise ParseArgsException('help') elif o in ("-V", "--version"): raise ParseArgsException('version') elif o in ("-C", "--nocolor"): options['nocolor'] = True pp.output.nocolor() elif o in ("-q", "--quiet"): options['quiet'] = True options['verbose'] = False elif o in ("-1", "--oneshot"): options['oneshot'] = True elif o in ("-b", "--brute-force"): options['brute-force'] = int(a) elif o in ("-v", "--verbose") and not options['quiet']: options['verbose'] = True else: return_code = False return return_code # here are the different allowed command line options (getopt args) getopt_options = {'short':{}, 'long':{}} getopt_options['short']['global'] = "hVCqv1b:" getopt_options['long']['global'] = ["help", "version", "nocolor", "quiet", "verbose", "oneshot", "brute-force="] # set default options, except 'nocolor', which is set in main() options['quiet'] = False options['verbose'] = False options['brute-force'] = 2 options['oneshot'] = False options['brute-force-recursive'] = True # FIXME add an option options['scan-dir'] = True # FIXME add an option short_opts = getopt_options['short']['global'] long_opts = getopt_options['long']['global'] opts_mode = 'global' # apply getopts to command line, show partial help on failure try: opts, args = getopt.getopt(sys.argv[1:], short_opts, long_opts) except: raise ParseArgsException(opts_mode+'-options') # set options accordingly optionSwitch(options,opts) if len(args) != 1: raise ParseArgsException('packages') return args[0] def scanUpstream(options, package, output): matches = Query(package).find( include_masked=QUERY_OPTS['include_masked'], in_installed=False ) if not matches: sys.stderr.write(pp.warn("No package matching '%s'" % pp.pkgquery(package))) sys.exit(errno.ENOENT) matches = sorted(matches) pkg = matches.pop() if '9999' in pkg.version: if len(matches) == 0: sys.stderr.write(pp.warn("Package '%s' only have a dev version (9999)" % pp.pkgquery(package))) sys.exit(errno.ENOENT) else: pkg = matches.pop() if pkg.cp in BLACKLIST_PACKAGES: sys.stderr.write(pp.warn("Package '%s' is blacklisted" % pp.pkgquery(package))) sys.exit(errno.ENOENT) pp.uprint(" * %s [%s]" % (pp.cpv(pkg.cpv), pp.section(pkg.repo_name()))) pp.uprint() ebuild_path = pkg.ebuild_path() if ebuild_path: pp.uprint('Ebuild: ' + pp.path(os.path.normpath(ebuild_path))) pp.uprint('Repository: ' + pkg.repo_name()) pp.uprint('Homepage: ' + pkg.environment("HOMEPAGE")) pp.uprint('Description: ' + pkg.environment("DESCRIPTION")) pp.uprint() cpv = pkg.cpv metadata = { "EAPI" : port_settings["EAPI"], "SRC_URI" : pkg.environment("SRC_URI", False), } use = frozenset(port_settings["PORTAGE_USE"].split()) try: alist = _parse_uri_map(cpv, metadata, use=use) aalist = _parse_uri_map(cpv, metadata) except InvalidDependString as e: sys.stderr.write(pp.warn("%s\n" % str(e))) sys.stderr.write(pp.warn("Invalid SRC_URI for '%s'" % pp.pkgquery(cpv))) sys.exit(errno.ENOENT) if "mirror" in portage.settings.features: fetchme = aalist else: fetchme = alist versions = [] for filename in fetchme: for fileurl in fetchme[filename]: skipscan = False output.einfo("SRC_URI is '%s'" % fileurl) if '://' not in fileurl: output.einfo("Invalid url '%s'" % fileurl) continue for bp in SCANDIR_BLACKLIST_URLS: if re.match(bp, fileurl): output.einfo("%s is blacklisted by rule %s" % (fileurl, bp)) skipscan = True url = parseMirror(fileurl, output) # Try list dir, but not for gentoo mirrors, it's too slow if not skipscan: versions.extend(scan_directory(cpv, url, options, output)) if versions and options['oneshot']: break # Try manual bump versions.extend(brute_force(cpv, url, options, output)) if versions and options['oneshot']: break newversions = {} for url, version in versions: if version in newversions and len(url) < len(newversions[version]): continue newversions[version] = url print () for version in newversions: print ("Upstream Version: " + pp.number("%s" % version) + pp.path(" %s" % newversions[version])) if not len(newversions): print (pp.warn("Didn't find any new version, check package's homepage for " + "more informations")); return versions def main(): """Parse command line and execute all actions.""" # set default options options = {} options['nocolor'] = (port_settings["NOCOLOR"] in ('yes','true') or not sys.stdout.isatty()) if options['nocolor']: pp.output.nocolor() # parse command line options and actions try: package = parseArgs(options) # filter exception to know what message to display except ParseArgsException as e: if e.value == 'help': printUsage(help='all') sys.exit(0) elif e.value[:5] == 'help-': printUsage(help=e.value[5:]) sys.exit(0) elif e.value == 'version': printVersion() sys.exit(0) else: printUsage(e.value) sys.exit(errno.EINVAL) output = EOutput(options['quiet']) scanUpstream(options, package, output) if __name__ == "__main__": try: setupSignals() main() except KeyboardInterrupt: print( "Aborted.") sys.exit(errno.EINTR) sys.exit(0)