euscan-ng/euscan
Corentin Chary 5634c59944 euscan: better blacklists
Signed-off-by: Corentin Chary <corentincj@iksaif.net>
2011-08-28 13:20:40 +02:00

826 lines
21 KiB
Python
Executable File

#!/usr/bin/python
"""Copyright 2011 Gentoo Foundation
Distributed under the terms of the GNU General Public License v2
"""
from __future__ import print_function
# Meta:
__author__ = "Corentin Chary (iksaif)"
__email__ = "corentin.chary@gmail.com"
__version__ = "git"
__productname__ = "euscan"
__description__ = "A tool to detect new upstream releases."
# =======
# Imports
# =======
import os
import sys
import re
import time
import getopt
import errno
import random
import urllib2
import StringIO
import pkg_resources
import portage
import portage.versions
from portage import dep
from portage.dbapi import porttree
from portage.output import white, yellow, turquoise, green, teal, red, EOutput
import gentoolkit.pprinter as pp
from gentoolkit import errors
from gentoolkit.query import Query
from gentoolkit.eclean.search import (port_settings)
# =======
# Globals
# =======
# Keyword options forwarded to the gentoolkit Query in scanUpstream().
QUERY_OPTS = {"include_masked": True}

# Dependency atoms: candidate versions matching one of them are ignored.
BLACKLIST_VERSIONS = [
    # Compatibility package for running binaries linked against a
    # pre gcc 3.4 libstdc++, won't be updated
    ">=sys-libs/libstdc++-v3-3.4",
]

# category/package names that scanUpstream() refuses to scan.
BLACKLIST_PACKAGES = [
    # These kernels are almost dead
    "sys-kernel/usermode-sources",
    "sys-kernel/xbox-sources",
    "sys-kernel/cell-sources",
]

# Regexes (matched with re.match) of SRC_URIs that are not directory-scanned.
SCANDIR_BLACKLIST_URLS = [
    "mirror://rubygems/(.*)",  # Not browsable
    "mirror://gentoo/(.*)",  # Directory too big
]

# Regexes (matched with re.match) of packages that are never brute forced.
BRUTEFORCE_BLACKLIST_PACKAGES = [
    # infinite loop any
    # http://plone.org/products/plonepopoll/releases/*/plonepopoll-2-6-1.tgz
    # link will work
    "net-zope/plonepopoll",
]

# Regexes (matched with re.match) of URLs that are never brute forced.
BRUTEFORCE_BLACKLIST_URLS = [
    "http://(.*)dockapps.org/download.php/id/(.*)",  # infinite loop
    "http://hydra.nixos.org/build/(.*)",  # infinite loop
    # Doesn't respect 404, infinite loop
    "http://www.rennings.net/gentoo/distfiles/(.*)",
]
def htop_vercmp(a, b):
    """Compare two htop versions, reading 0.11/0.12/0.13 as 0.1.1/0.1.2/0.1.3.

    Both arguments are normalised and then handed to simple_vercmp(), whose
    result is returned unchanged.
    """
    legacy = ('0.11', '0.12', '0.13')

    def normalise(ver):
        # '0.11' -> '0.1.' + '1'; any other version is passed through as is.
        return '0.1.' + ver[3:] if ver in legacy else ver

    return simple_vercmp(normalise(a), normalise(b))
# Per-package comparison overrides, keyed by category/package. vercmp() looks
# a package up here first and falls back to simple_vercmp() otherwise.
VERSION_CMP_PACKAGE_QUIRKS = {
'sys-process/htop' : htop_vercmp
}
# Regex fragment that regex_from_template() substitutes for ${PV}: leading
# digits, optional dotted numeric parts, an optional run of letters, then any
# number of "-"/"_" separated pre/p/beta/b/alpha/a/rc/r suffixes with optional
# digits. The outermost group captures the whole version.
_v = r'((\d+)((\.\d+)*)([a-zA-Z]*?)(((-|_)(pre|p|beta|b|alpha|a|rc|r)\d*)*))'
# =========
# Functions
# =========
def cast_int_components(version):
    """Convert, in place, every component of *version* that int() accepts.

    Components for which int() raises ValueError are left untouched.
    Returns the same (mutated) list object that was passed in.
    """
    for idx in range(len(version)):
        try:
            converted = int(version[idx])
        except ValueError:
            continue
        version[idx] = converted
    return version
def simple_vercmp(a, b):
    """Compare two version strings.

    Returns 0 when the versions are equal, a negative number when *a* is
    older than *b* and a positive number when it is newer.

    portage.versions.vercmp() is tried first; when it returns None the
    comparison falls back to pkg_resources.parse_version().
    """
    if a == b:
        return 0
    # For sane versions
    r = portage.versions.vercmp(a, b)
    if r is not None:
        return r
    # Fallback
    parsed_a = pkg_resources.parse_version(a)
    parsed_b = pkg_resources.parse_version(b)
    if parsed_a < parsed_b:
        return -1
    if parsed_a > parsed_b:
        return 1
    # Different spellings of the same version (the strings differ but the
    # parsed forms are equal) must compare equal, not "newer".
    return 0
def vercmp(package, a, b):
    """Compare versions *a* and *b* of *package*.

    Uses the comparison function registered for *package* in
    VERSION_CMP_PACKAGE_QUIRKS when there is one, simple_vercmp() otherwise.
    """
    compare = VERSION_CMP_PACKAGE_QUIRKS.get(package, simple_vercmp)
    return compare(a, b)
def skipnightly(a, b):
    """Return True when *b* looks like a nightly build that should be skipped.

    Both versions go through pkg_resources.parse_version(); *b* is skipped
    when its parsed form has a different length than *a*'s, has exactly two
    parts, and its first part is as long as 'yyyymmdd'.
    NOTE(review): this relies on parse_version() returning a sequence of
    strings -- confirm against the installed setuptools.
    """
    parsed_a = pkg_resources.parse_version(a)
    parsed_b = pkg_resources.parse_version(b)
    # Try to skip nightly builds when not wanted (www-apps/moodle)
    return (len(parsed_a) != len(parsed_b)
            and len(parsed_b) == 2
            and len(parsed_b[0]) == len('yyyymmdd'))
def generate_templates_vars(version):
    """Build the (literal, placeholder) substitutions for *version*.

    The first pair is always (version, '${PV}'). It is followed by the dotted
    prefixes of the split version, longest first, down to two components,
    each paired with '${0}.${1}...' placeholders of the same length.
    """
    parts = split_version(version)
    pairs = [(version, '${PV}')]
    for count in range(len(parts) - 1, 1, -1):
        literal = ".".join([str(piece) for piece in parts[:count]])
        placeholder = ".".join(['${%d}' % pos for pos in range(count)])
        pairs.append((literal, placeholder))
    return pairs
def template_from_url(url, version):
    """Turn *url* into a template by replacing occurrences of *version*.

    Every '/'-separated chunk after the scheme has the literals produced by
    generate_templates_vars() replaced by their placeholders (${PV}, ${0}...).
    The scheme itself is left untouched.
    """
    # Split on the first "://" only: a URL embedding another URL (e.g. in a
    # query string) would otherwise fail to unpack.
    prefix, rest = url.split('://', 1)
    # The substitutions depend only on the version, so compute them once.
    subs = generate_templates_vars(version)
    chunks = []
    for chunk in rest.split('/'):
        for literal, placeholder in subs:
            chunk = chunk.replace(literal, placeholder)
        chunks.append(chunk)
    return prefix + "://" + "/".join(chunks)
def url_from_template(url, version):
    """Expand the template *url* for *version*.

    '${PV}' becomes the full version string and each '${N}' becomes the N-th
    component returned by split_version().
    """
    pieces = split_version(version)
    expanded = url.replace('${PV}', version)
    for position, piece in enumerate(pieces):
        expanded = expanded.replace('${%d}' % position, str(piece))
    return expanded
# Stolen from distutils.LooseVersion
# Used for brute force to increment the version
def split_version(version):
    """Split a version string into a list of components.

    The string is cut into runs of digits, runs of lowercase letters and
    whatever lies between them; dots and empty pieces are dropped. Pieces
    that int() accepts are returned as integers, the others as strings.
    Always returns a new list, e.g. "1.0_rc1" -> [1, 0, '_', 'rc', 1].
    """
    components = []
    for piece in re.split(r'(\d+ | [a-z]+ | \.)', version, flags=re.VERBOSE):
        if not piece or piece == '.':
            continue
        try:
            value = int(piece)
        except ValueError:
            value = piece
        components.append(value)
    return components
def join_version(components):
    """Build a version string from a list of components.

    Components are converted with str() and concatenated; a '.' is inserted
    between two neighbours only when neither of them is a str.
    """
    if not components:
        return ""
    pieces = [str(components[0])]
    for previous, current in zip(components, components[1:]):
        if type(previous) is not str and type(current) is not str:
            pieces.append(".")
        pieces.append(str(current))
    return "".join(pieces)
def increment_version(components, level):
    """Bump the component at index *level*, in place.

    Integer components after *level* are reset to 0, then the component at
    *level* is incremented if it is an integer. Non-integer components are
    never modified. Returns the same (mutated) list.

    Raises ValueError (an Exception subclass, so existing handlers keep
    working) when *level* is not a valid index.
    """
    n = len(components)
    if level < 0 or level > n - 1:
        raise ValueError("level %r is out of range for %d component(s)"
                         % (level, n))
    for idx in range(level + 1, n):
        if isinstance(components[idx], int):
            components[idx] = 0
    if isinstance(components[level], int):
        components[level] += 1
    return components
def gen_versions(components, level):
    """Generate candidate versions following *components*.

    Starting from the last component and moving left over min(level, n)
    positions, the component at that position is bumped with
    increment_version() and *level* successive values are recorded.
    Returns a list of component lists; an empty input yields [].

    The caller's list is not modified: the bumping is done on a private copy.
    """
    n = len(components)
    if not n:
        return []
    depth = level
    level = min(level, n)
    # increment_version() mutates its argument; keep the caller's list intact.
    current = list(components)
    versions = []
    for i in range(n, n - level, -1):
        increment_version(current, i - 1)
        for _ in range(depth):
            versions.append(list(current))
            increment_version(current, i - 1)
    return versions
def tryurl(fileurl, output, template):
    """Check whether *fileurl* points at a real downloadable file.

    The URL is opened with a 5 second timeout and rejected when:
    - a Content-disposition header does not mention the file's basename,
    - Content-Length is '0',
    - Content-Type contains 'text/html',
    - the request was redirected to a differently named file that still
      matches *template* (i.e. probably another version).

    Returns a (final_url, headers) tuple on success and None otherwise.
    Progress is reported through output.ebegin()/output.eend().
    """
    result = True
    output.ebegin("Trying: " + fileurl)
    fp = None
    try:
        basename = os.path.basename(fileurl)
        fp = urllib2.urlopen(fileurl, None, 5)
        headers = fp.info()
        final_url = fp.geturl()
        if 'Content-disposition' in headers and basename not in headers['Content-disposition']:
            result = None
        elif 'Content-Length' in headers and headers['Content-Length'] == '0':
            result = None
        # Servers may omit Content-Type; only reject when it is present.
        elif 'Content-Type' in headers and 'text/html' in headers['Content-Type']:
            result = None
        elif final_url != fileurl:
            regex = regex_from_template(template)
            baseregex = regex_from_template(os.path.basename(template))
            basename2 = os.path.basename(final_url)
            # Redirect to another (earlier?) version
            if basename != basename2 and (re.match(regex, final_url) or re.match(baseregex, basename2)):
                result = None
        if result:
            result = (final_url, headers)
    except (urllib2.URLError, IOError):
        result = None
    finally:
        # Only the headers are needed; release the connection.
        if fp is not None:
            fp.close()
    output.eend(errno.ENOENT if not result else 0)
    return result
def regex_from_template(template):
    """Convert a URL template into a regular expression string.

    The template is regex-escaped, the placeholder syntax is un-escaped
    again, and then:
    - '${1}' becomes a lazy digits group,
    - any remaining run of '${N}' placeholders (each optionally followed by
      a dot) becomes a lazy word-character group,
    - '${PV}' becomes the module-level version pattern _v.
    The result is anchored with an optional trailing slash and '$'.
    """
    pattern = re.escape(template)
    # Undo the escaping of "${", "}" and of a dot between two placeholders.
    unescape = (
        (r'\$\{', '${'),
        (r'\}', '}'),
        (r'}\.$', '}.$'),
    )
    for escaped, plain in unescape:
        pattern = pattern.replace(escaped, plain)
    pattern = pattern.replace('${1}', r'([\d]+?)')
    pattern = re.sub(r'(\$\{\d+\}\.?)+', r'([\w]+?)', pattern)
    pattern = pattern.replace('${PV}', _v)
    return pattern + r'/?$'
def basedir_from_template(template):
    """Return the part of *template* before the first templated path element.

    Without any '${' the template is returned unchanged. Otherwise the result
    is everything before the last '/' preceding the first '${', or "" when
    there is no such slash.
    """
    marker = template.find('${')
    if marker < 0:
        return template
    slash = template.rfind('/', 0, marker)
    return template[:slash] if slash >= 0 else ""
def generate_scan_paths(url):
    """Split a URL template into directory-scanning steps.

    Returns a list of (path, regex) tuples, one per '/'-separated chunk that
    contains a placeholder. *path* is the literal path accumulated since the
    previous step (the first one starts with the scheme) and *regex* is
    regex_from_template() applied to the templated chunk. Literal chunks
    after the last placeholder produce no step.
    """
    # Split on the first "://" only so a URL embedding another URL does not
    # fail to unpack.
    prefix, rest = url.split('://', 1)
    steps = []
    path = prefix + ":/"
    for chunk in rest.split('/'):
        if '${' in chunk:
            steps.append((path, regex_from_template(chunk)))
            path = ""
        else:
            path += "/" + chunk
    return steps
def versionBlacklisted(cp, version, output=None):
    """Tell whether *version* of package *cp* matches BLACKLIST_VERSIONS.

    cp      -- category/package name
    version -- candidate version string
    output  -- optional object with an einfo() method; when given, the
               matching rule is reported through it

    Returns True when a blacklist atom matches '<cp>-<version>'.
    """
    cpv = '%s-%s' % (cp, version)
    rule = None
    for bv in BLACKLIST_VERSIONS:
        if dep.match_from_list(bv, [cpv]):
            rule = bv
            break
    if rule and output:
        # Report the rule that matched, not whichever atom was tested last.
        output.einfo("%s is blacklisted by rule %s" % (cpv, rule))
    return rule is not None
def scan_directory_recursive(cpv, url, steps, vmin, vmax, output):
    """Walk a remote directory tree looking for newer versions.

    cpv    -- category/package-version being scanned
    url    -- URL reached so far ("" on the first call)
    steps  -- list of (path, regex) tuples from generate_scan_paths(); the
              first one is consumed here, the rest by the recursive calls
    vmin   -- versions <= vmin are ignored (when set)
    vmax   -- versions >= vmax are ignored (when set)
    output -- object providing einfo()

    The listing at url + path is fetched (5 second timeout). If it contains
    HTML anchors the hrefs are matched against the regex; otherwise, for
    ftp:// URLs, each line of the listing is searched. Matches are filtered
    by version range, blacklist and skipnightly().

    Returns a list of (url, version) tuples; [] when the listing cannot be
    fetched or no steps remain.
    """
    if not steps:
        return []
    cp, ver, rev = portage.pkgsplit(cpv)
    url += steps[0][0]
    pattern = steps[0][1]
    steps = steps[1:]
    output.einfo("Scanning: %s" % url)
    try:
        fp = urllib2.urlopen(url, None, 5)
    except (urllib2.URLError, IOError):
        return []
    try:
        data = fp.read()
    finally:
        # Do not leak one connection per scanned directory.
        fp.close()
    results = []
    if re.search(r"<\s*a\s+[^>]*href", data):
        from BeautifulSoup import BeautifulSoup
        soup = BeautifulSoup(data)
        for link in soup.findAll('a'):
            href = link.get("href")
            if not href:
                continue
            # Make absolute links relative to the directory being scanned.
            if href.startswith(url):
                href = href.replace(url, "", 1)
            match = re.match(pattern, href, re.I)
            if match:
                results.append((match.group(1), match.group(0)))
    elif url.startswith('ftp://'):  # Probably a FTP Server
        buf = StringIO.StringIO(data)
        for line in buf.readlines():
            line = line.replace("\n", "").replace("\r", "")
            match = re.search(pattern, line, re.I)
            if match:
                results.append((match.group(1), match.group(0)))
    # add url
    versions = []
    for version, path in results:
        if vmin and vercmp(cp, version, vmin) <= 0:
            continue
        if vmax and vercmp(cp, version, vmax) >= 0:
            continue
        if versionBlacklisted(cp, version, output):
            continue
        # Same guard as the range checks above: vmin may be unset.
        if vmin and skipnightly(vmin, version):
            continue
        if not url.endswith('/') and not path.startswith('/'):
            path = url + '/' + path
        else:
            path = url + path
        versions.append((path, version))
        if steps:
            ret = scan_directory_recursive(cpv, path, steps, vmin, vmax, output)
            versions.extend(ret)
    return versions
'''
Notes on upstream sources that would need dedicated handling:
- python: PyPI
- PHP: PECL / PEAR
- ftp.kde.org: the "unstable" tree is not scanned
- mysql: should use http://downloads.mysql.com/archives/
- mariadb: should use http://downloads.askmonty.org/MariaDB/+releases/
'''
def scan_directory(cpv, url, options, output, limit=None):
    """Look for newer versions by browsing the directories of a SRC_URI.

    cpv     -- category/package-version being scanned
    url     -- one SRC_URI entry (mirror:// URIs are resolved first)
    options -- option dict; nothing is done unless options["scan-dir"]
    output  -- object providing einfo()
    limit   -- optional upper bound, versions >= limit are ignored

    Returns a list of (url, version) tuples, or [] when scanning is
    disabled, the URL is blacklisted, the mirror cannot be resolved or the
    URL does not depend on the version.
    """
    # Ftp: list dir
    # Handle mirrors
    if not options["scan-dir"]:
        return []
    for bu in SCANDIR_BLACKLIST_URLS:
        if re.match(bu, url):
            output.einfo("%s is blacklisted by rule %s" % (url, bu))
            return []
    resolved_url = parseMirror(url, output)
    if not resolved_url:
        # parseMirror() already reported why the mirror could not be resolved.
        return []
    catpkg, ver, rev = portage.pkgsplit(cpv)
    template = template_from_url(resolved_url, ver)
    if '${' not in template:
        output.einfo("Url doesn't seems to depend on version: %s not found in %s"
                     % (ver, resolved_url))
        return []
    output.einfo("Scanning: %s" % template)
    steps = generate_scan_paths(template)
    return scan_directory_recursive(cpv, "", steps, ver, limit, output)
def brute_force(cpv, fileurl, options, output, limit=None):
    """Guess newer versions by generating version numbers and probing URLs.

    cpv     -- category/package-version being scanned
    fileurl -- one SRC_URI entry, turned into a template on the version
    options -- option dict; uses "brute-force" (level, <= 0 disables),
               "brute-force-recursive" and "oneshot"
    output  -- object providing einfo()/ebegin()/eend()
    limit   -- optional upper bound, versions >= limit are ignored

    Returns a list of [url, version] pairs for every candidate URL that
    tryurl() accepted.
    """
    if options["brute-force"] <= 0:
        return []
    catpkg, ver, rev = portage.pkgsplit(cpv)
    for bp in BRUTEFORCE_BLACKLIST_PACKAGES:
        if re.match(bp, catpkg):
            output.einfo("%s is blacklisted by rule %s" % (catpkg, bp))
            return []
    for bp in BRUTEFORCE_BLACKLIST_URLS:
        if re.match(bp, fileurl):
            # Name the URL that matched the rule, not the package.
            output.einfo("%s is blacklisted by rule %s" % (fileurl, bp))
            return []
    output.einfo("Generating version from " + ver)
    components = split_version(ver)
    versions = gen_versions(list(components), options["brute-force"])
    # Remove unwanted versions. Build a new list: removing items from the
    # list being iterated skips the element following each removal.
    versions = [v for v in versions
                if vercmp(catpkg, ver, join_version(v)) < 0]
    if not versions:
        output.einfo("Can't generate new versions from " + ver)
        return []
    template = template_from_url(fileurl, ver)
    if '${PV}' not in template:
        output.einfo("Url doesn't seems to depend on full version: %s not found in %s"
                     % (ver, fileurl))
        return []
    output.einfo("Brute forcing: %s" % template)
    result = []
    done = set()
    i = 0
    # `versions` grows while it is being walked, hence the index loop.
    while i < len(versions):
        components = versions[i]
        i += 1
        # Compare like with like: `done` holds tuples, `components` is a list.
        key = tuple(components)
        if key in done:
            continue
        done.add(key)
        vstring = join_version(components)
        if versionBlacklisted(catpkg, vstring, output):
            continue
        if limit and vercmp(catpkg, vstring, limit) >= 0:
            continue
        url = url_from_template(template, vstring)
        infos = tryurl(url, output, template)
        if not infos:
            continue
        result.append([url, vstring])
        if options["brute-force-recursive"]:
            # Pass a copy so the entry stored in `versions` is never altered.
            for v in gen_versions(list(components), options["brute-force"]):
                if v not in versions and tuple(v) not in done:
                    versions.append(v)
        if options["oneshot"]:
            break
    return result
def parseMirror(uri, output):
    """Resolve a mirror:// URI to a concrete URL.

    URIs that do not start with "mirror://" are returned unchanged. Otherwise
    the mirror name is looked up in portage's third-party mirrors and one of
    its base URLs, picked at random, is joined with the remaining path.

    Returns the resolved URL, or None (after reporting through
    output.einfo()) when the URI is malformed or the mirror is unknown.
    """
    from random import choice
    if not uri.startswith("mirror://"):
        return uri
    eidx = uri.find("/", 9)
    if eidx == -1:
        output.einfo("Invalid mirror definition in SRC_URI:\n")
        output.einfo(" %s\n" % (uri))
        return None
    mirrorname = uri[9:eidx]
    path = uri[eidx+1:]
    mirrors = portage.settings.thirdpartymirrors()
    # An unknown name and an empty mirror list are both unusable.
    candidates = mirrors[mirrorname] if mirrorname in mirrors else None
    if not candidates:
        output.einfo("No known mirror by the name: %s\n" % (mirrorname))
        return None
    # choice() picks a random mirror without reordering portage's own list.
    return choice(candidates).strip("/") + "/" + path
def setupSignals():
    """Install handlers so that SIGINT/SIGTERM end the program quietly.

    The handler ignores further SIGINT/SIGTERM, prints an empty line and
    exits with errno.EINTR. SIGPIPE is reset to its default action.
    """
    import signal

    quit_signals = (signal.SIGINT, signal.SIGTERM)

    def exithandler(signum, frame):
        # Ignore repeated signals while shutting down.
        for sig in quit_signals:
            signal.signal(sig, signal.SIG_IGN)
        print()
        sys.exit(errno.EINTR)

    for sig in quit_signals:
        signal.signal(sig, exithandler)
    signal.signal(signal.SIGPIPE, signal.SIG_DFL)
def printVersion():
    """Print the product name, version, description, author and license."""
    banner = "%s (%s) - %s" % (__productname__, __version__, __description__)
    print(banner)
    print()
    print("Author: %s <%s>" % (__author__, __email__))
    for line in ("Copyright 2011 Gentoo Foundation",
                 "Distributed under the terms of the GNU General Public License v2"):
        print(line)
def printUsage(_error=None, help=None):
    """Print the usage message.

    _error -- 'global-options' or 'packages' selects the matching error line
              and a partial help; any other truthy value only redirects the
              output to stderr
    help   -- truthy to print the option/package help; defaults to 'all'
              when no recognised error is given

    Output goes to stderr when *_error* is truthy, to stdout otherwise.
    """
    out = sys.stderr if _error else sys.stdout
    if _error not in ('global-options', 'packages',):
        _error = None
    if not _error and not help:
        help = 'all'

    bad_options = _error == 'global-options'
    bad_packages = _error == 'packages'

    def emit(*parts):
        print(*parts, file=out)

    if bad_options:
        emit(pp.error("Wrong option on command line."))
        emit()
    if bad_packages:
        emit(pp.error("You need to specify exactly one package."))
        emit()

    emit(white("Usage:"))
    if bad_options or bad_packages or help == 'all':
        emit(" "+turquoise(__productname__),
             yellow("[options]"),
             green("<package>"))
    if bad_options or help == 'all':
        emit(" "+turquoise(__productname__),
             yellow("[--help, --version]"))
    emit()

    if bad_options or help:
        emit("Available ", yellow("options")+":")
        general = (
            (" -C, --nocolor", " - turn off colors on output"),
            (" -q, --quiet", " - be as quiet as possible"),
            (" -h, --help", " - display the help screen"),
            (" -V, --version", " - display version info"),
        )
        for flag, text in general:
            emit(yellow(flag) + text)
        emit()
        emit(yellow(" -1, --oneshot") +
             " - stop as soon as a new version is found")
        emit(yellow(" -b, --brute-force=<level>") +
             " - define the brute force "+yellow("<level>")+" (default: 2)\n" +
             " " * 29 + "bigger levels will generate more versions numbers\n" +
             " " * 29 + "0 means disabled")
        emit()

    if bad_packages or help:
        emit(green(" package") +
             " - the package (or ebuild) you want to scan")
        emit()
    #print( "More detailed instruction can be found in",
    # turquoise("`man %s`" % __productname__), file=out)
class ParseArgsException(Exception):
    """Carries the outcome of parseArgs() to main().

    `value` is the token main() dispatches on, e.g. 'help', 'version',
    'packages' or 'global-options'.
    """

    def __init__(self, value):
        self.value = value

    def __str__(self):
        return "%r" % (self.value,)
def parseArgs(options=None):
    """Parse the command line (sys.argv[1:]).

    options -- dict that receives the parsed settings; a fresh dict is used
               when omitted. The keys 'quiet', 'verbose', 'brute-force',
               'oneshot', 'brute-force-recursive' and 'scan-dir' are always
               (re)set; 'nocolor' is only set when -C is given.

    Returns the single package argument.

    Raises ParseArgsException with value 'help' or 'version' for those
    switches, 'global-options' for an unknown option or a non-numeric
    brute force level, and 'packages' unless exactly one package is given.
    """
    # A shared `{}` default would leak settings from one call to the next.
    if options is None:
        options = {}

    def optionSwitch(option, opts):
        """local function for interpreting command line options
        and setting options accordingly"""
        return_code = True
        for o, a in opts:
            if o in ("-h", "--help"):
                raise ParseArgsException('help')
            elif o in ("-V", "--version"):
                raise ParseArgsException('version')
            elif o in ("-C", "--nocolor"):
                options['nocolor'] = True
                pp.output.nocolor()
            elif o in ("-q", "--quiet"):
                options['quiet'] = True
                options['verbose'] = False
            elif o in ("-1", "--oneshot"):
                options['oneshot'] = True
            elif o in ("-b", "--brute-force"):
                try:
                    options['brute-force'] = int(a)
                except ValueError:
                    # Report a usage error instead of a traceback.
                    raise ParseArgsException('global-options')
            elif o in ("-v", "--verbose") and not options['quiet']:
                options['verbose'] = True
            else:
                return_code = False
        return return_code

    # here are the different allowed command line options (getopt args)
    getopt_options = {'short': {}, 'long': {}}
    getopt_options['short']['global'] = "hVCqv1b:"
    getopt_options['long']['global'] = ["help", "version", "nocolor", "quiet",
                                        "verbose", "oneshot", "brute-force="]
    # set default options, except 'nocolor', which is set in main()
    options['quiet'] = False
    options['verbose'] = False
    options['brute-force'] = 2
    options['oneshot'] = False
    options['brute-force-recursive'] = True  # FIXME add an option
    options['scan-dir'] = True  # FIXME add an option
    short_opts = getopt_options['short']['global']
    long_opts = getopt_options['long']['global']
    opts_mode = 'global'
    # apply getopts to command line, show partial help on failure
    try:
        opts, args = getopt.getopt(sys.argv[1:], short_opts, long_opts)
    except getopt.GetoptError:
        raise ParseArgsException(opts_mode+'-options')
    # set options accordingly
    optionSwitch(options, opts)
    if len(args) != 1:
        raise ParseArgsException('packages')
    return args[0]
def scanUpstream(options, package, output):
    """Scan upstream for versions newer than the best match for *package*.

    options -- option dict filled by parseArgs()
    package -- package query string given on the command line
    output  -- object providing einfo()/ebegin()/eend()

    The highest matching ebuild is used (a 9999 version is skipped when
    another one exists). Every SRC_URI is tried with scan_directory() and
    then brute_force(); with options['oneshot'] the search stops at the
    first hit. The versions found are printed, keeping the longest URL per
    version and dropping blacklisted ones.

    Returns the raw list of (url, version) results. Exits the process with
    errno.ENOENT when nothing matches, only a 9999 version exists, the
    package is blacklisted or its SRC_URI is invalid.
    """
    # Needed by the except clause below; it is not imported at module level.
    from portage.exception import InvalidDependString

    matches = Query(package).find(
        include_masked=QUERY_OPTS['include_masked'],
        in_installed=False
    )
    if not matches:
        sys.stderr.write(pp.warn("No package matching '%s'" % pp.pkgquery(package)))
        sys.exit(errno.ENOENT)
    matches = sorted(matches)
    pkg = matches.pop()
    if '9999' in pkg.version:
        if len(matches) == 0:
            sys.stderr.write(pp.warn("Package '%s' only have a dev version (9999)" % pp.pkgquery(package)))
            sys.exit(errno.ENOENT)
        else:
            pkg = matches.pop()
    if pkg.cp in BLACKLIST_PACKAGES:
        sys.stderr.write(pp.warn("Package '%s' is blacklisted" % pp.pkgquery(package)))
        sys.exit(errno.ENOENT)
    pp.uprint(" * %s [%s]" % (pp.cpv(pkg.cpv), pp.section(pkg.repo_name())))
    pp.uprint()
    ebuild_path = pkg.ebuild_path()
    if ebuild_path:
        pp.uprint('Ebuild: ' + pp.path(os.path.normpath(ebuild_path)))
    pp.uprint('Repository: ' + pkg.repo_name())
    pp.uprint('Homepage: ' + pkg.environment("HOMEPAGE"))
    pp.uprint('Description: ' + pkg.environment("DESCRIPTION"))
    cpv = pkg.cpv
    metadata = {
        "EAPI": port_settings["EAPI"],
        "SRC_URI": pkg.environment("SRC_URI", False),
    }
    use = frozenset(port_settings["PORTAGE_USE"].split())
    try:
        alist = porttree._parse_uri_map(cpv, metadata, use=use)
        aalist = porttree._parse_uri_map(cpv, metadata)
    except InvalidDependString as e:
        sys.stderr.write(pp.warn("%s\n" % str(e)))
        sys.stderr.write(pp.warn("Invalid SRC_URI for '%s'" % pp.pkgquery(cpv)))
        sys.exit(errno.ENOENT)
    if "mirror" in portage.settings.features:
        fetchme = aalist
    else:
        fetchme = alist
    versions = []
    for filename in fetchme:
        for url in fetchme[filename]:
            print()
            output.einfo("SRC_URI is '%s'" % url)
            if '://' not in url:
                output.einfo("Invalid url '%s'" % url)
                continue
            # Try normal scan
            versions.extend(scan_directory(cpv, url, options, output))
            if versions and options['oneshot']:
                break
            # Brute Force
            versions.extend(brute_force(cpv, url, options, output))
            if versions and options['oneshot']:
                break
        # The breaks above only leave the inner loop; stop scanning the
        # remaining files as well once a oneshot hit was found.
        if versions and options['oneshot']:
            break
    newversions = {}
    for url, version in versions:
        # Try to keep the most specific urls (determined by the length)
        if version in newversions and len(url) < len(newversions[version]):
            continue
        # Remove blacklisted versions
        if versionBlacklisted(pkg.cp, version, output):
            continue
        newversions[version] = url
    print()
    for version in newversions:
        print("Upstream Version:"
              + pp.number("%s" % version)
              + pp.path(" %s" % newversions[version]))
    if not newversions:
        print(pp.warn("Didn't find any new version, "
                      + "check package's homepage for "
                      + "more informations"))
    return versions
def main():
    """Parse command line and execute all actions."""
    # Colours are off when NOCOLOR asks for it or stdout is not a terminal.
    options = {
        'nocolor': (port_settings["NOCOLOR"] in ('yes', 'true')
                    or not sys.stdout.isatty()),
    }
    if options['nocolor']:
        pp.output.nocolor()

    # parse command line options and actions
    try:
        package = parseArgs(options)
    except ParseArgsException as e:
        # The exception value tells which message to display.
        reason = e.value
        if reason == 'help':
            printUsage(help='all')
        elif reason[:5] == 'help-':
            printUsage(help=reason[5:])
        elif reason == 'version':
            printVersion()
        else:
            printUsage(reason)
            sys.exit(errno.EINVAL)
        sys.exit(0)

    output = EOutput(options['quiet'])
    scanUpstream(options, package, output)
if __name__ == "__main__":
    # Script entry point: install the signal handlers, run main() and exit
    # with 0, or with errno.EINTR if a KeyboardInterrupt still gets through.
    try:
        setupSignals()
        main()
    except KeyboardInterrupt:
        print("Aborted.")
        sys.exit(errno.EINTR)
    else:
        sys.exit(0)