545
euscan
Executable file
545
euscan
Executable file
@ -0,0 +1,545 @@
|
||||
#!/usr/bin/python
|
||||
##############################################################################
|
||||
# $Header: $
|
||||
##############################################################################
|
||||
# Distributed under the terms of the GNU General Public License, v2 or later
|
||||
# Author: Corentin Chary <corentin.chary@gmail.com>
|
||||
|
||||
# Gentoo new upstream release scan tool.
|
||||
|
||||
import errno
import os
import re
import sys

import StringIO
import urllib
import urllib2

from stat import *
from xml.sax import saxutils, make_parser, handler
from xml.sax.handler import feature_namespaces

import pkg_resources

import portage
from portage.output import *
from portage.dbapi.porttree import _parse_uri_map
from portage.exception import InvalidDependString
|
||||
|
||||
# Version string shown by usage(); "svn" marks a development checkout.
__version__ = "svn"

# Global runtime configuration.  There is no command-line option parsing
# yet, so behaviour is tuned by editing these defaults.
settings = {
    "brute-force-level" : 2,    # how many trailing version components brute_force() mutates
    "brute-force" : True,       # enable url guessing via brute_force()
    "brute-force-crazy" : True, # feed successful guesses back into the candidate queue
    "scan-dir" : True,          # enable remote directory listing via scan_directory()
    "format" : "pretty",        # output format (only "pretty" exists in this file)
    "verbose" : True,           # when False, the EOutput below is silenced
    "stop-when-found" : False,  # stop scanning a package after the first hit
    "check-all-files" : False,  # scan every SRC_URI entry, not just until one yields results
}

# Shared pretty-printer (ebegin/eend/einfo/ewarn) from portage.output.
output = EOutput()
output.quiet = not settings['verbose']
|
||||
|
||||
def cast_int_components(version):
    """Convert numeric-looking items of *version* to int, in place.

    Items that do not parse as integers (e.g. "rc", "beta") are left
    untouched.  The same, mutated, list object is returned.
    """
    for index, component in enumerate(version):
        try:
            version[index] = int(component)
        except ValueError:
            # Non-numeric component: keep it as a string.
            pass
    return version
|
||||
|
||||
def parse_version(version):
    """Parse *version* into a comparable object using pkg_resources."""
    return pkg_resources.parse_version(version)
|
||||
|
||||
|
||||
def template_from_url(url, version):
    """Turn *url* into a template by replacing *version* with markers.

    A path component containing the full version gets a '${PV}' marker;
    a component that is only a leading part of the version gets numbered
    markers ('${0}.${1}...').  The result can later be expanded with
    url_from_template() or matched with regex_from_template().
    """
    prefix, chunks = url.split('://')
    chunks = chunks.split('/')

    for i in range(len(chunks)):
        chunk = chunks[i]

        if not chunk:
            continue

        # If it's the full version, it's easy
        if version in chunk:
            chunk = chunk.replace(version, '${PV}')
        # For directories made from a part of the version
        elif version.startswith(chunk):
            full = split_version(version)
            part = split_version(chunk)

            # Mark every leading component shared with the full version;
            # stop at the first mismatch.
            for j in range(min(len(full), len(part))):

                if part[j] != full[j]:
                    break
                part[j] = '${%d}' % j

            chunk = join_version(part)
            # join_version() only inserts dots between two numbers, so
            # adjacent markers come out as "}${"; restore the separator.
            chunk = chunk.replace('}$', '}.$')

        chunks[i] = chunk

    return prefix + "://" + "/".join(chunks)
|
||||
|
||||
def url_from_template(url, version):
    """Expand a template built by template_from_url() back into a url.

    '${PV}' becomes the full *version*; each numbered marker '${i}'
    becomes the i-th component of the version.
    """
    expanded = url.replace('${PV}', version)

    for index, component in enumerate(split_version(version)):
        expanded = expanded.replace('${%d}' % index, str(component))

    return expanded
|
||||
|
||||
# Stolen from distutils.LooseVersion
|
||||
# Used for brute force to increment the version
|
||||
# Stolen from distutils.LooseVersion
# Used for brute force to increment the version
def split_version(version):
    """Split a version string into a list of int and str components.

    "1.2b3" -> [1, 2, 'b', 3].  Numeric runs become ints, alphabetic
    runs stay strings, dots are dropped.
    """
    component_re = re.compile(r'(\d+ | [a-z]+ | \.)', re.VERBOSE)
    # list() is required so the components can be indexed and assigned
    # below; on Python 3 filter() is lazy (no-op on Python 2).
    components = list(filter(lambda x: x and x != '.',
                             component_re.split(version)))
    for i in range(len(components)):
        try:
            components[i] = int(components[i])
        except ValueError:
            # Alphabetic component (e.g. "b", "rc") stays a string.
            pass
    return components
|
||||
|
||||
def join_version(components):
    """Concatenate version components back into a string.

    A dot is inserted only between two consecutive non-string
    components, so [1, 2, 3] -> "1.2.3" but [1, 'a', 2] -> "1a2".
    """
    pieces = []
    last = len(components) - 1
    for index, component in enumerate(components):
        pieces.append(str(component))
        # Dot only between two numeric neighbours.
        if index < last \
                and not isinstance(component, str) \
                and not isinstance(components[index + 1], str):
            pieces.append(".")
    return "".join(pieces)
|
||||
|
||||
def increment_version(components, level):
    """Increment the component at index *level*, zeroing the numeric
    components after it, in place.

    String components (e.g. "rc") are never modified.  The same,
    mutated, list is returned.  Raises ValueError when *level* is out
    of range (the original raised a bare Exception; ValueError is more
    precise and is still caught by any `except Exception`).
    """
    n = len(components)

    if level > n - 1 or level < 0:
        raise ValueError("level %d out of range for %d component(s)"
                         % (level, n))

    # Reset every numeric component after `level` to zero.
    for i in range(n, level + 1, -1):
        if isinstance(components[i - 1], int):
            components[i - 1] = 0

    if isinstance(components[level], int):
        components[level] += 1

    return components
|
||||
|
||||
def gen_versions(components, level):
    """Generate candidate version-component lists by bumping the last
    *level* components of *components* (which is mutated in place).

    For each of the last `level` positions, emit `level` successive
    increments.  Returns a list of component lists (copies).
    """
    count = len(components)
    depth = level
    level = min(level, count)

    if not count:
        return []

    candidates = []

    # Walk the positions from the last one backwards.
    for position in range(count - 1, count - level - 1, -1):
        increment_version(components, position)
        for _ in range(depth):
            candidates.append(list(components))
            increment_version(components, position)

    return candidates
|
||||
|
||||
def tryurl(fileurl):
    """Probe *fileurl* and report whether it looks like a real release file.

    Returns True when the url can be fetched and the response does not
    look like an HTML page, an empty body, or an attachment with a
    different file name.  Any fetch error yields False.
    """
    result = False

    output.ebegin("Trying: " + fileurl)

    try:
        fp = urllib2.urlopen(fileurl, None, 5)  # 5-second timeout
        headers = fp.info()

        basename = os.path.basename(fileurl)

        if 'Content-disposition' in headers and basename not in headers['Content-disposition']:
            # Server serves a differently-named attachment: not our file.
            result = False
        elif 'Content-Length' in headers and headers['Content-Length'] == '0':
            result = False
        elif 'text/html' in headers['Content-Type']:
            # Probably an error or index page, not a release tarball.
            result = False
        else:
            result = True
    except Exception:
        # BUG FIX: the original assigned to a misspelled name ("retult")
        # inside a bare `except:` — harmless only because `result` starts
        # False, but both are fixed here (the bare except also swallowed
        # KeyboardInterrupt/SystemExit).
        result = False

    output.eend(errno.ENOENT if not result else 0)

    return result
|
||||
|
||||
def regex_from_template(template):
|
||||
template = re.escape(template)
|
||||
template = template.replace('\$\{', '${')
|
||||
template = template.replace('\}', '}')
|
||||
template = template.replace('}\.$', '}.$')
|
||||
template = re.sub(r'(\$\{\d+\}\.?)+', r'([\w\.\-]+?)', template)
|
||||
#template = re.sub(r'(\$\{\d+\}\.+)+', '(.+?)\.', template)
|
||||
#template = re.sub(r'(\$\{\d+\})+', '(.+?)', template)
|
||||
template = template.replace('${PV}', r'([\w\.\-]+?)')
|
||||
template = template + r'/?$'
|
||||
return template
|
||||
|
||||
def basedir_from_template(template):
    """Return the static path prefix before the first '${' marker.

    If the template has no marker the whole template is returned; if
    the marker appears before any '/', the prefix is empty.
    """
    marker = template.find('${')
    if marker == -1:
        return template

    slash = template[0:marker].rfind('/')
    return template[0:slash] if slash != -1 else ""
|
||||
|
||||
def generate_scan_paths(url):
    """Split a templated url into (static-path, regex) scan steps.

    Each '${...}' component becomes one step: the fixed path accumulated
    so far plus a regex (from regex_from_template) for the dynamic part.
    """
    scheme, rest = url.split('://')

    steps = []
    fixed = scheme + ":/"

    for part in rest.split('/'):
        if '${' in part:
            # Dynamic component: flush the accumulated fixed path and
            # start a fresh one for the next step.
            steps.append((fixed, regex_from_template(part)))
            fixed = ""
        else:
            fixed = fixed + "/" + part

    return steps
|
||||
|
||||
def scan_directory_recursive(url, steps, vmin, vmax):
    """Recursively list remote directories following *steps*.

    *steps* is the (static-path, regex) list from generate_scan_paths().
    Returns a list of (full_path, version_string) tuples for entries
    whose parsed version is strictly between *vmin* and *vmax* (either
    bound may be falsy to disable it).
    """
    if not steps:
        return []

    url += steps[0][0]
    pattern = steps[0][1]

    steps = steps[1:]

    output.einfo("Scanning: %s" % url)

    try:
        fp = urllib2.urlopen(url, None, 5)
    except Exception, err:
        # Unreachable / unparsable url: silently treat as empty.
        return []

    data = fp.read()

    results = []

    # Looks like an HTML index page: extract hrefs with BeautifulSoup.
    if re.search("<\s*a\s+[^>]*href", data):
        from BeautifulSoup import BeautifulSoup

        soup = BeautifulSoup(data)

        for link in soup.findAll('a'):
            href = link.get("href")
            if not href:
                continue
            # Absolute links back into this listing: keep relative part.
            if href.startswith(url):
                href = href.replace(url, "", 1)

            match = re.match(pattern, href)
            if match:
                # (captured version, full matched entry name)
                results.append((match.group(1), match.group(0)))

    elif url.startswith('ftp://'): # Probably a FTP Server
        # Plain FTP listing: match the pattern against each raw line.
        buf = StringIO.StringIO(data)
        for line in buf.readlines():
            line = line.replace("\n", "").replace("\r", "")
            match = re.search(pattern, line)
            if match:
                results.append((match.group(1), match.group(0)))
    # add url

    versions = []

    for version, path in results:
        ver = parse_version(version)
        # Keep only versions strictly inside the (vmin, vmax) window.
        if vmin and ver <= vmin:
            continue
        if vmax and ver >= vmax:
            continue

        if not url.endswith('/') and not path.startswith('/'):
            path = url + '/' + path
        else:
            path = url + path

        versions.append((path, version))
        # Descend into the matched directory for the remaining steps.
        if steps:
            ret = scan_directory_recursive(path, steps, vmin, vmax)
            versions.extend(ret)
    return versions
|
||||
|
||||
def scan_directory(cpv, fileurl, limit=None):
    """Look for new upstream versions of *cpv* by listing the remote
    directories referenced by *fileurl*.

    Returns a list of (url, version) tuples newer than the installed
    version (and below *limit*, when given).
    """
    if not settings["scan-dir"]:
        return []

    catpkg, ver, rev = portage.pkgsplit(cpv)

    template = template_from_url(fileurl, ver)

    # A template without markers cannot yield new versions.
    if '${' not in template:
        output.ewarn("Url doesn't seems to depend on version: %s not found in %s"
            % (ver, fileurl))
        return []

    output.einfo("Scanning: %s" % template)

    return scan_directory_recursive(
        "", generate_scan_paths(template), parse_version(ver), limit)
|
||||
|
||||
def brute_force(cpv, fileurl, limit=None):
    """Guess newer upstream versions of *cpv* by generating candidate
    version numbers and probing the corresponding urls with tryurl().

    Returns a list of [url, version_string] pairs that responded like
    real files.  Candidates above *limit* (when given) are skipped.
    """
    if not settings["brute-force"]:
        return []

    catpkg, ver, rev = portage.pkgsplit(cpv)

    components = split_version(ver)
    versions = gen_versions(components, settings["brute-force-level"])

    output.einfo("Generating version from " + ver)

    if not versions:
        output.ewarn("Can't generate new versions from " + ver)
        return []

    template = template_from_url(fileurl, ver)

    if '${' not in template:
        # BUG FIX: the format arguments were swapped ("%s not found in
        # %s" printed url-then-version); now consistent with
        # scan_directory().
        output.ewarn("Url doesn't seems to depend on version: %s not found in %s"
            % (ver, fileurl))
        return []
    else:
        output.einfo("Brute forcing: %s" % template)

    result = []

    i = 0
    done = set()  # tuples of already-tried component lists
    while i < len(versions):
        components = versions[i]
        i += 1
        # BUG FIX: the original tested `components in done`, comparing a
        # *list* against the stored *tuples* — never true, so nothing
        # was ever deduplicated.  `done` is also a set now (O(1) lookup).
        if tuple(components) in done:
            continue
        done.add(tuple(components))

        vstring = join_version(components)
        version = parse_version(vstring)

        if limit and version >= limit:
            continue

        url = url_from_template(template, vstring)

        if not tryurl(url):
            continue

        result.append([url, vstring])

        # A hit: optionally enqueue further guesses derived from it.
        if settings["brute-force-crazy"]:
            for candidate in gen_versions(components, settings["brute-force-level"]):
                if candidate not in versions and tuple(candidate) not in done:
                    versions.append(candidate)

        if settings["stop-when-found"]:
            break

    return result
|
||||
|
||||
def euscan(cpv, portdir):
    """Scan upstream for versions of *cpv* newer than the one in the tree.

    Prints each new version found and returns the raw (url, version)
    list.  *portdir* selects the portage tree (None = default tree).
    """
    catpkg, ver, rev = portage.pkgsplit(cpv)

    if portdir:
        portdb = portage.portdbapi(portdir)
    else:
        portdb = portage.portdbapi()

    src_uri, repo = portdb.aux_get(cpv, ['SRC_URI', 'repository'])

    metadata = {
        "EAPI" : portage.settings["EAPI"],
        "SRC_URI" : src_uri,
    }
    use = frozenset(portage.settings["PORTAGE_USE"].split())
    try:
        # NOTE(review): _parse_uri_map is a private portage API; `alist`
        # is filtered by USE flags, `aalist` keeps every URI.
        alist = _parse_uri_map(cpv, metadata, use=use)
        aalist = _parse_uri_map(cpv, metadata)
    except InvalidDependString as e:
        # NOTE(review): `_` (gettext) is not defined anywhere in this
        # file — this error path would raise NameError; presumably
        # copied from portage code.  Confirm before relying on it.
        red("!!! %s\n" % str(e))
        red(_("!!! Invalid SRC_URI for '%s'.\n") % cpv)
        del e
        return

    # With the "mirror" feature, consider every URI, not just USE-enabled.
    if "mirror" in portage.settings.features:
        fetchme = aalist
    else:
        fetchme = alist

    versions = []

    for filename in fetchme:
        for fileurl in fetchme[filename]:
            if fileurl.startswith('mirror://'):
                output.eerror('mirror:// scheme not supported (%s)' % fileurl)
                continue

            # Try list dir
            versions.extend(scan_directory(cpv, fileurl))

            if versions and settings['stop-when-found']:
                break

            # Try manual bump
            versions.extend(brute_force(cpv, fileurl))

            if versions and settings['stop-when-found']:
                break

        if versions and not settings["check-all-files"]:
            break

    # Deduplicate by version string.
    # NOTE(review): the `continue` keeps the *longer* stored url when a
    # shorter one arrives — possibly inverted; confirm intent.
    newversions = {}

    for url, version in versions:
        if version in newversions and len(url) < len(newversions[version]):
            continue
        newversions[version] = url

    for version in newversions:
        print darkgreen("New Upstream Version: ") + green("%s" % version) + " %s" % newversions[version]
    return versions
|
||||
|
||||
class Metadata_XML(handler.ContentHandler):
    """SAX handler extracting herd, maintainer emails and the long
    description from a Gentoo metadata.xml file.

    State flags use "Yes"/"No" strings, preserved from the original
    implementation.
    """

    def __init__(self):
        handler.ContentHandler.__init__(self)
        # Element-nesting flags.
        self._inside_herd = "No"
        self._inside_maintainer = "No"
        self._inside_email = "No"
        self._inside_longdescription = "No"

        # BUG FIX: these were *class* attributes, so the lists were
        # shared by every instance and grew across parses (callers had
        # to reset handler._maintainers manually).  Now per-instance.
        self._herd = []
        self._maintainers = []
        self._longdescription = ""

    def startElement(self, tag, attr):
        """Raise the flag for the element just entered."""
        if tag == "herd":
            self._inside_herd = "Yes"
        if tag == "longdescription":
            self._inside_longdescription = "Yes"
        if tag == "maintainer":
            self._inside_maintainer = "Yes"
        if tag == "email":
            self._inside_email = "Yes"

    def endElement(self, tag):
        """Lower the flag for the element just closed."""
        if tag == "herd":
            self._inside_herd = "No"
        if tag == "longdescription":
            self._inside_longdescription = "No"
        if tag == "maintainer":
            self._inside_maintainer = "No"
        if tag == "email":
            self._inside_email = "No"

    def characters(self, contents):
        """Accumulate text for whichever element is currently open."""
        if self._inside_herd == "Yes":
            self._herd.append(contents)

        if self._inside_longdescription == "Yes":
            self._longdescription = contents

        if self._inside_maintainer == "Yes" and self._inside_email == "Yes":
            self._maintainers.append(contents)
|
||||
|
||||
|
||||
def check_metadata(cpv, portdir = None):
    """Checks that the primary maintainer is still an active dev and list the herd the package belongs to"""
    # No tree given: ask portage which repository the package lives in.
    if not portdir:
        portdb = portage.portdbapi()
        repo, = portdb.aux_get(cpv, ['repository'])
        portdir = portdb.getRepositoryPath(repo)

    # metadata.xml sits next to the ebuilds: <tree>/<cat>/<pkg>/metadata.xml
    metadata_file = portdir + "/" + portage.pkgsplit(cpv)[0] + "/metadata.xml"

    if not os.path.exists(metadata_file):
        print darkgreen("Maintainer: ") + red("Error (Missing metadata.xml)")
        return 1

    # NOTE(review): the local name `handler` shadows the xml.sax.handler
    # module imported at file level; harmless here since the module is
    # not used again in this function.
    parser = make_parser()
    handler = Metadata_XML()
    handler._maintainers = []  # defensive reset of accumulated parser state
    parser.setContentHandler(handler)
    parser.parse( metadata_file )

    # A missing herd is treated as an error (herds were mandatory).
    if handler._herd:
        herds = ", ".join(handler._herd)
        print darkgreen("Herd: ") + herds
    else:
        print darkgreen("Herd: ") + red("Error (No Herd)")
        return 1


    if handler._maintainers:
        print darkgreen("Maintainer: ") + ", ".join(handler._maintainers)
    else:
        print darkgreen("Maintainer: ") + "none"

    if len(handler._longdescription) > 1:
        print darkgreen("Description: ") + handler._longdescription
    print darkgreen("Location: ") + os.path.normpath(portdir + "/" + portage.pkgsplit(cpv)[0])
||||
|
||||
def usage(code):
    """Prints the usage information for this script and exits with *code*."""
    print green("euscan"), "(%s)" % __version__
    print
    print "Usage: euscan [ebuild|[package-cat/]package[-version]]"
    sys.exit(code)
|
||||
|
||||
|
||||
# default color setup
# Disable colours when stdout is not a terminal, or when portage's
# NOCOLOR setting asks for plain output.
if ( not sys.stdout.isatty() ) or ( portage.settings["NOCOLOR"] in ["yes","true"] ):
    nocolor()

def fc(x,y):
    # Reverse comparator on the first tuple element (Python 2 cmp-style).
    # NOTE(review): appears unused in this file — possibly leftover from
    # an earlier sort call; confirm before removing.
    return cmp(y[0], x[0])
|
||||
|
||||
def main ():
|
||||
if len( sys.argv ) < 2:
|
||||
usage(1)
|
||||
|
||||
for pkg in sys.argv[1:]:
|
||||
#try:
|
||||
if pkg.endswith('.ebuild'):
|
||||
portdir = os.path.dirname(os.path.dirname(os.path.dirname(pkg)))
|
||||
package_list = os.path.basname(pkg)
|
||||
else:
|
||||
portdir = None
|
||||
print pkg
|
||||
package_list = portage.portdb.xmatch("match-all", pkg)
|
||||
|
||||
for cpv in package_list:
|
||||
print darkgreen("Package: ") + cpv
|
||||
#check_metadata(cpv, portdir)
|
||||
euscan(cpv, portdir)
|
||||
print ""
|
||||
#except Exception, err:
|
||||
# print red("Error: "+pkg+"\n")
|
||||
# print err
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
Reference in New Issue
Block a user