#!/usr/bin/python """Copyright 2011 Gentoo Foundation Distributed under the terms of the GNU General Public License v2 """ from __future__ import print_function """ TODO: - custom url handlers (portscout) - sourceforge: use rss feeds - respect robots.txt (portscout) - check other distros (youri) - clean blacklist system """ # Meta: __author__ = "Corentin Chary (iksaif)" __email__ = "corentin.chary@gmail.com" __version__ = "git" __productname__ = "euscan" __description__ = "A tool to detect new upstream releases." # ======= # Imports # ======= import os import sys import re import time import getopt import errno import random import urllib2 import StringIO import pkg_resources import portage from portage.output import white, yellow, turquoise, green, teal, red, EOutput from portage.dbapi.porttree import _parse_uri_map import gentoolkit.pprinter as pp from gentoolkit import errors from gentoolkit.query import Query from gentoolkit.eclean.search import (port_settings) # ======= # Globals # ======= QUERY_OPTS = {"include_masked": True} BLACKLIST_PACKAGES = ['sys-kernel/usermode-sources', 'sys-kernel/xbox-sources', 'sys-kernel/cell-sources', 'sys-libs/libstdc++-v3'] SCANDIR_BLACKLIST_URLS = ['mirror://rubygems/(.*)', 'mirror://gentoo/(.*)'] BRUTEFORCE_BLACKLIST_PACKAGES = ['dev-util/patchelf', 'net-zope/plonepopoll'] BRUTEFORCE_BLACKLIST_URLS = ['http://(.*)dockapps.org/download.php/id/(.*)'] # ========= # Functions # ========= def cast_int_components(version): for i, obj in enumerate(version): try: version[i] = int(obj) except ValueError: pass return version def parse_version(version): version = pkg_resources.parse_version(version) #version = list(version) #return cast_int_components(version) return version def template_from_url(url, version): prefix, chunks = url.split('://') chunks = chunks.split('/') for i in range(len(chunks)): chunk = chunks[i] if not chunk: continue # If it's the full version, it's easy if version in chunk: chunk = chunk.replace(version, '${PV}') # For directories made from a part of the version elif version.startswith(chunk): full = split_version(version) part = split_version(chunk) for j in range(min(len(full), len(part))): if part[j] != full[j]: break part[j] = '${%d}' % j chunk = join_version(part) chunk = chunk.replace('}$', '}.$') chunks[i] = chunk return prefix + "://" + "/".join(chunks) def url_from_template(url, version): components = split_version(version) url = url.replace('${PV}', version) for i in range(len(components)): url = url.replace('${%d}' % i, str(components[i])) return url # Stolen from distutils.LooseVersion # Used for brute force to increment the version def split_version(version): component_re = re.compile(r'(\d+ | [a-z]+ | \.)', re.VERBOSE) components = filter(lambda x: x and x != '.', component_re.split(version)) for i in range(len(components)): try: components[i] = int(components[i]) except ValueError: pass return components def join_version(components): version = "" for i in range(len(components)): version += str(components[i]) if i >= len(components) - 1: break if type(components[i]) != str and type(components[i + 1]) != str: version += "." return version def increment_version(components, level): n = len(components) if level > n - 1 or level < 0: raise Exception for i in range(n, level + 1, -1): if type(components[i - 1]) == int: components[i - 1] = 0 if type(components[level]) == int: components[level] += 1 return components def gen_versions(components, level): n = len(components) depth = level level = min(level, n) if not n: return [] versions = [] for i in range(n, n - level, -1): increment_version(components, i - 1) for j in range(depth): versions.append(list(components)) increment_version(components, i - 1) return versions def tryurl(fileurl, output, regex): result = False output.ebegin("Trying: " + fileurl) try: basename = os.path.basename(fileurl) fp = urllib2.urlopen(fileurl, None, 5) headers = fp.info() if 'Content-disposition' in headers and basename not in headers['Content-disposition']: result = False elif 'Content-Length' in headers and headers['Content-Length'] == '0': result = False elif 'text/html' in headers['Content-Type']: result = False elif fp.geturl() != fileurl: basename2 = os.path.basename(fp.geturl()) # Redirect to another (earlier?) version if basename != basename2 and re.match(regex, fp.geturl()): result = False else: result = True else: result = True except urllib2.URLError: result = False except IOError: result = False output.eend(errno.ENOENT if not result else 0) return result def regex_from_template(template): template = re.escape(template) template = template.replace('\$\{', '${') template = template.replace('\}', '}') template = template.replace('}\.$', '}.$') template = re.sub(r'(\$\{\d+\}\.?)+', r'([\w\.\-]+?)', template) #template = re.sub(r'(\$\{\d+\}\.+)+', '(.+?)\.', template) #template = re.sub(r'(\$\{\d+\})+', '(.+?)', template) template = template.replace('${PV}', r'((\d+)((\.\d+)*)([a-zA-Z]?)(((-|_)(pre|p|beta|b|alpha|a|rc|r)\d*)*))') template = template + r'/?$' return template def basedir_from_template(template): idx = template.find('${') if idx == -1: return template idx = template[0:idx].rfind('/') if idx == -1: return "" return template[0:idx] def generate_scan_paths(url): prefix, chunks = url.split('://') chunks = chunks.split('/') steps = [] path = prefix + ":/" for chunk in chunks: if '${' in chunk: steps.append((path, regex_from_template(chunk))) path = "" else: path += "/" path += chunk return steps def scan_directory_recursive(url, steps, vmin, vmax, output): if not steps: return [] url += steps[0][0] pattern = steps[0][1] steps = steps[1:] output.einfo("Scanning: %s" % url) try: fp = urllib2.urlopen(url, None, 5) except urllib2.URLError: return [] except IOError: return [] data = fp.read() results = [] if re.search("<\s*a\s+[^>]*href", data): from BeautifulSoup import BeautifulSoup soup = BeautifulSoup(data) for link in soup.findAll('a'): href = link.get("href") if not href: continue if href.startswith(url): href = href.replace(url, "", 1) match = re.match(pattern, href, re.I) if match: results.append((match.group(1), match.group(0))) elif url.startswith('ftp://'): # Probably a FTP Server buf = StringIO.StringIO(data) for line in buf.readlines(): line = line.replace("\n", "").replace("\r", "") match = re.search(pattern, line, re.I) if match: results.append((match.group(1), match.group(0))) # add url versions = [] for version, path in results: ver = parse_version(version) if vmin and ver <= vmin: continue if vmax and ver >= vmax: continue # Try to skip nightly builds when not wanted (www-apps/moodle) if len(vmin) != len(ver) and len(ver) == 2 and len(ver[0]) == len('yyyymmdd'): continue if not url.endswith('/') and not path.startswith('/'): path = url + '/' + path else: path = url + path versions.append((path, version)) if steps: ret = scan_directory_recursive(path, steps, vmin, vmax, output) versions.extend(ret) return versions def scan_directory(cpv, fileurl, options, output, limit=None): # Ftp: list dir # Handle mirrors if not options["scan-dir"]: return [] catpkg, ver, rev = portage.pkgsplit(cpv) template = template_from_url(fileurl, ver) if '${' not in template: output.einfo("Url doesn't seems to depend on version: %s not found in %s" % (ver, fileurl)) return [] else: output.einfo("Scanning: %s" % template) vmin = parse_version(ver) steps = generate_scan_paths(template) return scan_directory_recursive("", steps, vmin, limit, output) def brute_force(cpv, fileurl, options, output, limit=None): if options["brute-force"] <= 0: return [] catpkg, ver, rev = portage.pkgsplit(cpv) for bp in BRUTEFORCE_BLACKLIST_PACKAGES: if re.match(bp, catpkg): output.einfo("%s is blacklisted by rule %s" % (catpkg, bp)) return [] for bp in BRUTEFORCE_BLACKLIST_URLS: if re.match(bp, fileurl): output.einfo("%s is blacklisted by rule %s" % (catpkg, bp)) return [] components = split_version(ver) versions = gen_versions(components, options["brute-force"]) output.einfo("Generating version from " + ver) if not versions: output.einfo("Can't generate new versions from " + ver) return [] template = template_from_url(fileurl, ver) if '${PV}' not in template: output.einfo("Url doesn't seems to depend on full version: %s not found in %s" % (ver, fileurl)) return [] else: output.einfo("Brute forcing: %s" % template) result = [] i = 0 done = [] while i < len(versions): components = versions[i] i += 1 if components in done: continue done.append(tuple(components)) vstring = join_version(components) version = parse_version(vstring) if limit and version >= limit: continue url = url_from_template(template, vstring) regex = regex_from_template(template) if not tryurl(url, output, regex): continue result.append([url, vstring]) if options["brute-force-recursive"]: for v in gen_versions(components, options["brute-force"]): if v not in versions and tuple(v) not in done: versions.append(v) if options["oneshot"]: break return result def parseMirror(uri, output): from random import shuffle mirrors = portage.settings.thirdpartymirrors() if not uri.startswith("mirror://"): return uri eidx = uri.find("/", 9) if eidx == -1: output.einfo("Invalid mirror definition in SRC_URI:\n") output.einfo(" %s\n" % (uri)) return None mirrorname = uri[9:eidx] path = uri[eidx+1:] if mirrorname in mirrors: mirrors = mirrors[mirrorname] shuffle(mirrors) uri = mirrors[0].strip("/") + "/" + path else: output.einfo("No known mirror by the name: %s\n" % (mirrorname)) return None return uri def setupSignals(): """ This block ensures that ^C interrupts are handled quietly. """ import signal def exithandler(signum,frame): signal.signal(signal.SIGINT, signal.SIG_IGN) signal.signal(signal.SIGTERM, signal.SIG_IGN) print () sys.exit(errno.EINTR) signal.signal(signal.SIGINT, exithandler) signal.signal(signal.SIGTERM, exithandler) signal.signal(signal.SIGPIPE, signal.SIG_DFL) def printVersion(): """Output the version info.""" print( "%s (%s) - %s" \ % (__productname__, __version__, __description__)) print() print("Author: %s <%s>" % (__author__,__email__)) print("Copyright 2011 Gentoo Foundation") print("Distributed under the terms of the GNU General Public License v2") def printUsage(_error=None, help=None): """Print help message. May also print partial help to stderr if an error from {'options'} is specified.""" out = sys.stdout if _error: out = sys.stderr if not _error in ('global-options', 'packages',): _error = None if not _error and not help: help = 'all' if _error in ('global-options',): print( pp.error("Wrong option on command line."), file=out) print( file=out) if _error in ('packages',): print( pp.error("You need to specify exactly one package."), file=out) print( file=out) print( white("Usage:"), file=out) if _error in ('global-options', 'packages',) or help == 'all': print( " "+turquoise(__productname__), yellow("[options]"), green(""), file=out) if _error in ('global-options',) or help == 'all': print( " "+turquoise(__productname__), yellow("[--help, --version]"), file=out) print(file=out) if _error in ('global-options',) or help: print( "Available ", yellow("options")+":", file=out) print( yellow(" -C, --nocolor")+ " - turn off colors on output", file=out) print( yellow(" -q, --quiet")+ " - be as quiet as possible", file=out) print( yellow(" -h, --help")+ \ " - display the help screen", file=out) print( yellow(" -V, --version")+ " - display version info", file=out) print( file=out) print( yellow(" -1, --oneshot")+ " - stop as soon as a new version is found", file=out) print( yellow(" -b, --brute-force=")+ " - define the brute force "+yellow("")+" (default: 2)\n" + " " * 29 + "bigger levels will generate more versions numbers\n" + " " * 29 + "0 means disabled", file=out) print( file=out) if _error in ('packages',) or help: print( green(" package")+ " - the package (or ebuild) you want to scan", file=out) print( file=out) #print( "More detailed instruction can be found in", # turquoise("`man %s`" % __productname__), file=out) class ParseArgsException(Exception): """For parseArgs() -> main() communications.""" def __init__(self, value): self.value = value # sdfgsdfsdfsd def __str__(self): return repr(self.value) def parseArgs(options={}): """Parse the command line arguments. Raise exceptions on errors. Returns package and affect the options dict. """ def optionSwitch(option,opts): """local function for interpreting command line options and setting options accordingly""" return_code = True for o, a in opts: if o in ("-h", "--help"): raise ParseArgsException('help') elif o in ("-V", "--version"): raise ParseArgsException('version') elif o in ("-C", "--nocolor"): options['nocolor'] = True pp.output.nocolor() elif o in ("-q", "--quiet"): options['quiet'] = True options['verbose'] = False elif o in ("-1", "--oneshot"): options['oneshot'] = True elif o in ("-b", "--brute-force"): options['brute-force'] = int(a) elif o in ("-v", "--verbose") and not options['quiet']: options['verbose'] = True else: return_code = False return return_code # here are the different allowed command line options (getopt args) getopt_options = {'short':{}, 'long':{}} getopt_options['short']['global'] = "hVCqv1b:" getopt_options['long']['global'] = ["help", "version", "nocolor", "quiet", "verbose", "oneshot", "brute-force="] # set default options, except 'nocolor', which is set in main() options['quiet'] = False options['verbose'] = False options['brute-force'] = 2 options['oneshot'] = False options['brute-force-recursive'] = True # FIXME add an option options['scan-dir'] = True # FIXME add an option short_opts = getopt_options['short']['global'] long_opts = getopt_options['long']['global'] opts_mode = 'global' # apply getopts to command line, show partial help on failure try: opts, args = getopt.getopt(sys.argv[1:], short_opts, long_opts) except: raise ParseArgsException(opts_mode+'-options') # set options accordingly optionSwitch(options,opts) if len(args) != 1: raise ParseArgsException('packages') return args[0] def scanUpstream(options, package, output): matches = Query(package).find( include_masked=QUERY_OPTS['include_masked'], in_installed=False ) if not matches: sys.stderr.write(pp.warn("No package matching '%s'" % pp.pkgquery(package))) sys.exit(errno.ENOENT) matches = sorted(matches) pkg = matches.pop() if pkg.version == '9999': if len(matches) == 0: sys.stderr.write(pp.warn("Package '%s' only have a dev version (9999)" % pp.pkgquery(package))) sys.exit(errno.ENOENT) else: pkg = matches.pop() if pkg.cp in BLACKLIST_PACKAGES: sys.stderr.write(pp.warn("Package '%s' is blacklisted" % pp.pkgquery(package))) sys.exit(errno.ENOENT) pp.uprint(" * %s [%s]" % (pp.cpv(pkg.cpv), pp.section(pkg.repo_name()))) pp.uprint() ebuild_path = pkg.ebuild_path() if ebuild_path: pp.uprint('Ebuild: ' + pp.path(os.path.normpath(ebuild_path))) pp.uprint('Repository: ' + pkg.repo_name()) pp.uprint('Homepage: ' + pkg.environment("HOMEPAGE")) pp.uprint('Description: ' + pkg.environment("DESCRIPTION")) pp.uprint() cpv = pkg.cpv metadata = { "EAPI" : port_settings["EAPI"], "SRC_URI" : pkg.environment("SRC_URI", False), } use = frozenset(port_settings["PORTAGE_USE"].split()) try: alist = _parse_uri_map(cpv, metadata, use=use) aalist = _parse_uri_map(cpv, metadata) except InvalidDependString as e: sys.stderr.write(pp.warn("%s\n" % str(e))) sys.stderr.write(pp.warn("Invalid SRC_URI for '%s'" % pp.pkgquery(cpv))) sys.exit(errno.ENOENT) if "mirror" in portage.settings.features: fetchme = aalist else: fetchme = alist versions = [] for filename in fetchme: for fileurl in fetchme[filename]: skipscan = False output.einfo("SRC_URI is '%s'" % fileurl) if '://' not in fileurl: output.einfo("Invalid url '%s'" % fileurl) continue for bp in SCANDIR_BLACKLIST_URLS: if re.match(bp, fileurl): output.einfo("%s is blacklisted by rule %s" % (fileurl, bp)) skipscan = True url = parseMirror(fileurl, output) # Try list dir, but not for gentoo mirrors, it's too slow if not skipscan: versions.extend(scan_directory(cpv, url, options, output)) if versions and options['oneshot']: break # Try manual bump versions.extend(brute_force(cpv, url, options, output)) if versions and options['oneshot']: break newversions = {} for url, version in versions: if version in newversions and len(url) < len(newversions[version]): continue newversions[version] = url print () for version in newversions: print ("Upstream Version: " + pp.number("%s" % version) + pp.path(" %s" % newversions[version])) if not len(newversions): print (pp.warn("Didn't find any new version, check package's homepage for " + "more informations")); return versions def main(): """Parse command line and execute all actions.""" # set default options options = {} options['nocolor'] = (port_settings["NOCOLOR"] in ('yes','true') or not sys.stdout.isatty()) if options['nocolor']: pp.output.nocolor() # parse command line options and actions try: package = parseArgs(options) # filter exception to know what message to display except ParseArgsException as e: if e.value == 'help': printUsage(help='all') sys.exit(0) elif e.value[:5] == 'help-': printUsage(help=e.value[5:]) sys.exit(0) elif e.value == 'version': printVersion() sys.exit(0) else: printUsage(e.value) sys.exit(errno.EINVAL) output = EOutput(options['quiet']) scanUpstream(options, package, output) if __name__ == "__main__": try: setupSignals() main() except KeyboardInterrupt: print( "Aborted.") sys.exit(errno.EINTR) sys.exit(0)