Change source layout

* In preparation for the PEP 517 transition.

Signed-off-by: Alfred Wingate <parona@protonmail.com>
Alfred Wingate
2023-11-14 19:58:44 +02:00
parent ec7399752c
commit c873e1520d
25 changed files with 3 additions and 3 deletions

91
src/euscan/__init__.py Normal file

@@ -0,0 +1,91 @@
#!/usr/bin/python
#
# Copyright 2011 Corentin Chary <corentin.chary@gmail.com>
# Distributed under the terms of the GNU General Public License v2
__version__ = "git"
import configparser
import os
from ast import literal_eval
CONFIG = {
'nocolor': False,
'quiet': False,
'verbose': 1,
'debug': False,
'brute-force': 3,
'brute-force-recursive': True,
'brute-force-false-watermark': 50,
'scan-dir': True,
'oneshot': True,
'user-agent': 'euscan (http://euscan.iksaif.net)',
'skip-robots-txt': False,
'cache': False,
'format': None,
'indent': 2,
'progress': False,
'mirror': False,
'ignore-pre-release': False,
'ignore-pre-release-if-stable': False,
'ebuild-uri': False,
'handlers-exclude': [],
}
config = configparser.ConfigParser()
config.read(['/etc/euscan.conf', os.path.expanduser('~/.euscan.conf')])
if config.has_section("euscan"):
for key, value in config.items("euscan"):
if key in CONFIG:
CONFIG[key] = literal_eval(value)
BLACKLIST_VERSIONS = [
# Compatibility package for running binaries linked against a
# pre gcc 3.4 libstdc++, won't be updated
'>=sys-libs/libstdc++-v3-3.4',
# Actually older or incorrect
'~app-backup/backup-manager-0.7.15',
'=x11-plugins/wmacpimon-001',
]
BLACKLIST_PACKAGES = [
# These kernels are almost dead
'sys-kernel/xbox-sources',
]
SCANDIR_BLACKLIST_URLS = [
'mirror://rubygems/(.*)', # Not browsable
'mirror://gentoo/(.*)' # Directory too big
]
BRUTEFORCE_BLACKLIST_PACKAGES = [
# infinite loop any
# http://plone.org/products/plonepopoll/releases/*/plonepopoll-2-6-1.tgz
# link will work
'net-zope/plonepopoll'
]
BRUTEFORCE_BLACKLIST_URLS = [
'http://hydra.nixos.org/build/(.*)', # infinite loop
# Doesn't respect 404, infinite loop
'http://www.rennings.net/gentoo/distfiles/(.*)',
'http://art.gnome.org/download/(.*)',
'http://barelysufficient.org/~olemarkus/(.*)',
'http://olemarkus.org/~olemarkus/(.*)',
]
ROBOTS_TXT_BLACKLIST_DOMAINS = [
'(.*)sourceforge(.*)',
'(.*)github.com',
'(.*)qt\.nokia\.com(.*)',
'(.*)chromium\.org(.*)',
'(.*)nodejs\.org(.*)',
'(.*)download\.mono-project\.com(.*)',
'(.*)fedorahosted\.org(.*)',
'(.*)download\.tuxfamily\.org(.*)',
'(.*)festvox\.org(.*)',
]
from euscan.out import EuscanOutput
output = EuscanOutput(CONFIG)
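
Because values read from euscan.conf pass through literal_eval, entries in the [euscan] section are written as Python literals. A minimal sketch of the override mechanism, parsing an in-memory file in place of /etc/euscan.conf (file contents hypothetical):

import configparser
from ast import literal_eval

CONFIG = {'brute-force': 3, 'oneshot': True, 'handlers-exclude': []}

sample = """
[euscan]
brute-force = 0
oneshot = False
handlers-exclude = ['kde']
"""

config = configparser.ConfigParser()
config.read_string(sample)
for key, value in config.items("euscan"):
    if key in CONFIG:
        CONFIG[key] = literal_eval(value)  # parse the string as a Python literal

print(CONFIG)  # {'brute-force': 0, 'oneshot': False, 'handlers-exclude': ['kde']}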

89
src/euscan/ebuild.py Normal file

@@ -0,0 +1,89 @@
import os
import sys
import imp
import portage
from portage.const import VDB_PATH
from portage import _encodings
from portage import _shell_quote
from portage import _unicode_decode
from portage import _unicode_encode
# Stolen from the ebuild command
def package_from_ebuild(ebuild):
pf = None
if ebuild.endswith(".ebuild"):
pf = os.path.basename(ebuild)[:-7]
else:
return False
if not os.path.isabs(ebuild):
mycwd = os.getcwd()
# Try to get the non-canonical path from the PWD environment variable,
# since the canonical path returned from os.getcwd() may be
# unusable in cases where the directory structure is built from
# symlinks.
pwd = os.environ.get('PWD', '')
if sys.hexversion < 0x3000000:
pwd = _unicode_decode(pwd, encoding=_encodings['content'],
errors='strict')
if pwd and pwd != mycwd and \
os.path.realpath(pwd) == mycwd:
mycwd = portage.normalize_path(pwd)
ebuild = os.path.join(mycwd, ebuild)
ebuild = portage.normalize_path(ebuild)
# portdbapi uses the canonical path for the base of the portage tree, but
# subdirectories of the base can be built from symlinks (like crossdev
# does).
ebuild_portdir = os.path.realpath(
os.path.dirname(os.path.dirname(os.path.dirname(ebuild))))
ebuild = os.path.join(ebuild_portdir, *ebuild.split(os.path.sep)[-3:])
vdb_path = os.path.join(portage.settings['ROOT'], VDB_PATH)
# Make sure that portdb.findname() returns the correct ebuild.
if ebuild_portdir != vdb_path and \
ebuild_portdir not in portage.portdb.porttrees:
if sys.hexversion >= 0x3000000:
os.environ["PORTDIR_OVERLAY"] = \
os.environ.get("PORTDIR_OVERLAY", "") + \
" " + _shell_quote(ebuild_portdir)
else:
os.environ["PORTDIR_OVERLAY"] = \
os.environ.get("PORTDIR_OVERLAY", "") + \
" " + _unicode_encode(_shell_quote(ebuild_portdir),
encoding=_encodings['content'], errors='strict')
portage.close_portdbapi_caches()
imp.reload(portage)
del portage.portdb.porttrees[1:]
if ebuild_portdir != portage.portdb.porttree_root:
portage.portdb.porttrees.append(ebuild_portdir)
if not os.path.exists(ebuild):
return False
ebuild_split = ebuild.split("/")
cpv = "%s/%s" % (ebuild_split[-3], pf)
if not portage.catpkgsplit(cpv):
return False
if ebuild.startswith(os.path.join(portage.root, portage.const.VDB_PATH)):
mytree = "vartree"
portage_ebuild = portage.db[portage.root][mytree].dbapi.findname(cpv)
if os.path.realpath(portage_ebuild) != ebuild:
return False
else:
mytree = "porttree"
portage_ebuild = portage.portdb.findname(cpv)
if not portage_ebuild or portage_ebuild != ebuild:
return False
return cpv

216
src/euscan/handlers/__init__.py Normal file

@@ -0,0 +1,216 @@
import os
import sys
import pkgutil
from euscan import CONFIG, output
from gentoolkit.metadata import MetaData
handlers = {'package': [], 'url': [], 'all': {}}
# autoimport all modules in this directory and append them to handlers list
for loader, module_name, is_pkg in pkgutil.walk_packages(__path__):
module = loader.find_module(module_name).load_module(module_name)
if not hasattr(module, 'HANDLER_NAME'):
continue
if hasattr(module, 'scan_url'):
handlers['url'].append(module)
if hasattr(module, 'scan_pkg'):
handlers['package'].append(module)
handlers['all'][module.HANDLER_NAME] = module
# sort handlers by priority
def sort_handlers(handlers):
return sorted(
handlers,
key=lambda handler: handler.PRIORITY,
reverse=True
)
handlers['package'] = sort_handlers(handlers['package'])
handlers['url'] = sort_handlers(handlers['url'])
def find_best_handler(kind, pkg, *args):
"""
Find the best handler for the given package
"""
for handler in handlers[kind]:
if (handler.HANDLER_NAME not in CONFIG["handlers-exclude"] and
handler.can_handle(pkg, *args)):
return handler
return None
def find_handlers(kind, names):
ret = []
for name in names:
# Does this handler exist, and does it handle this kind of thing? (pkg / url)
if name in handlers['all'] and handlers['all'][name] in handlers[kind]:
ret.append(handlers['all'][name])
return ret
def get_metadata(pkg):
metadata = {}
pkg_metadata = None
meta_override = os.path.join('metadata', pkg.category, pkg.name,
'metadata.xml')
try:
if os.path.exists(meta_override):
pkg_metadata = MetaData(meta_override)
output.einfo('Using custom metadata: %s' % meta_override)
if not pkg_metadata:
pkg_metadata = pkg.metadata
except Exception as e:
output.ewarn('Error when fetching metadata: %s' % str(e))
if not pkg_metadata:
return {}
# Support multiple remote-id and multiple watch
for upstream in pkg_metadata._xml_tree.findall("upstream"):
for node in upstream.findall("watch"):
options = dict(node.attrib)
options['data'] = node.text
if "type" in options:
handler = options['type']
else:
handler = "url"
options['type'] = "url"
for key in ["versionmangle", "downloadurlmangle"]:
value = options.get(key, None)
if value:
options[key] = value.split(";")
if handler not in metadata:
metadata[handler] = []
metadata[handler].append(options)
for upstream in pkg_metadata._xml_tree.findall("upstream"):
for node in upstream.findall("remote-id"):
handler = node.attrib.get("type")
if not handler:
continue
if handler in metadata:
for i in range(len(metadata[handler])):
if not metadata[handler][i]['data']:
metadata[handler][i]['data'] = node.text
else:
metadata[handler] = [{'type': handler, 'data': node.text}]
return metadata
def scan_pkg(pkg_handler, pkg, options, on_progress=None):
versions = []
if on_progress:
on_progress(increment=35)
for o in options:
versions += pkg_handler.scan_pkg(pkg, o)
if on_progress:
on_progress(increment=35)
return versions
def scan_url(pkg, urls, options, on_progress=None):
versions = []
if on_progress:
progress_available = 70
num_urls = sum([len(urls[fn]) for fn in urls])
if num_urls > 0:
progress_increment = progress_available / num_urls
else:
progress_increment = 0
for filename in urls:
for url in urls[filename]:
if on_progress and progress_available > 0:
on_progress(increment=progress_increment)
progress_available -= progress_increment
output.einfo("SRC_URI is '%s'" % url)
if '://' not in url:
output.einfo("Invalid url '%s'" % url)
continue
try:
url_handler = find_best_handler('url', pkg, url)
if url_handler:
for o in options:
versions += url_handler.scan_url(pkg, url, o)
else:
output.eerror("Can't find a suitable handler!")
except Exception as e:
output.ewarn(
"Handler failed: [%s] %s" %
(e.__class__.__name__, str(e))
)
if versions and CONFIG['oneshot']:
break
if on_progress and progress_available > 0:
on_progress(increment=progress_available)
return versions
def scan(pkg, urls, on_progress=None):
"""
Scans upstream for the given package.
First checks whether a package-wide handler is available, and falls
back to URL handling otherwise.
"""
if not CONFIG['quiet'] and not CONFIG['format']:
sys.stdout.write('\n')
metadata = get_metadata(pkg)
versions = []
pkg_handlers = find_handlers('package', list(metadata.keys()))
if not pkg_handlers:
pkg_handler = find_best_handler('package', pkg)
if pkg_handler:
pkg_handlers = [pkg_handler]
for pkg_handler in pkg_handlers:
options = metadata.get(pkg_handler.HANDLER_NAME, [{}])
versions += scan_pkg(pkg_handler, pkg, options, on_progress)
if not pkg_handlers:
versions += scan_url(pkg, urls, [{}], on_progress)
return versions
def mangle(kind, name, string):
if name not in handlers['all']:
return None
handler = handlers['all'][name]
if not hasattr(handler, 'mangle_%s' % kind):
return None
return getattr(handler, 'mangle_%s' % kind)(string)
def mangle_url(name, string):
return mangle('url', name, string)
def mangle_version(name, string):
return mangle('version', name, string)
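
For reference, a minimal sketch of the module interface the registry above autoloads; the handler name and function bodies here are hypothetical, only the attribute names come from the code above:

HANDLER_NAME = "example"   # key used in handlers['all'] and in metadata
CONFIDENCE = 50
PRIORITY = 10              # sort_handlers() orders on this, highest first

def can_handle(pkg, url=None):
    # hypothetical rule: claim URLs from an imaginary mirror
    return bool(url) and url.startswith('mirror://example/')

def scan_url(pkg, url, options):
    # must return (url, version, handler_name, confidence) tuples
    return [(url, '1.0', HANDLER_NAME, CONFIDENCE)]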

59
src/euscan/handlers/berlios.py Normal file

@@ -0,0 +1,59 @@
import re
import urllib.request, urllib.parse, urllib.error
import portage
from euscan.helpers import regex_from_template
from euscan.handlers.url import process_scan as url_scan
from euscan import output
HANDLER_NAME = "berlios"
CONFIDENCE = 90
PRIORITY = 90
berlios_regex = r"mirror://berlios/([^/]+)/([^/]+)"
def can_handle(pkg, url=None):
if not url:
return False
cp, ver, rev = portage.pkgsplit(pkg.cpv)
if ver not in url:
return False
return re.search(berlios_regex, url)
def scan_url(pkg, url, options):
output.einfo("Using BerliOS handler")
cp, ver, rev = portage.pkgsplit(pkg.cpv)
project, filename = re.search(berlios_regex, url).groups()
project_page = "http://developer.berlios.de/projects/%s" % project
content = urllib.request.urlopen(project_page).read()
project_id = re.search(
r"/project/filelist.php\?group_id=(\d+)",
content
).group(1)
base_url = (
"http://developer.berlios.de/project/filelist.php?group_id=%s" %
project_id
)
file_pattern = regex_from_template(
filename.replace(ver, "${PV}")
)
result = url_scan(pkg, base_url, file_pattern)
ret = []
for found_url, pv, _, _ in result:
found_url = found_url.replace("prdownload", "download")
ret.append((found_url, pv, HANDLER_NAME, CONFIDENCE))
return ret

161
src/euscan/handlers/cpan.py Normal file

@@ -0,0 +1,161 @@
import re
import portage
import urllib.request, urllib.error, urllib.parse
import json
from euscan import helpers, output, mangling
HANDLER_NAME = "cpan"
CONFIDENCE = 100
PRIORITY = 90
_cpan_package_name_re = re.compile("mirror://cpan/authors/.*/([^/.]*).*")
def can_handle(pkg, url=None):
return url and url.startswith('mirror://cpan/')
def guess_package(cp, url):
match = _cpan_package_name_re.search(url)
pkg = None
if match:
pkg = match.group(1)
try:
cp, ver, rev = portage.pkgsplit('fake/' + pkg)
except:
pass
cat, pkg = cp.split("/")
return pkg
def mangle_version(up_pv):
if up_pv.startswith('v'):
return up_pv[1:]
# clean
up_pv = up_pv.replace("._", "_") # e.g.: 0.999._002 -> 0.999_002
up_pv = up_pv.replace("_0.", "_") # e.g.: 0.30_0.1 -> 0.30_1
# Detect _rc versions
rc_part = ""
if up_pv.count("_") == 1:
up_pv, rc_part = up_pv.split("_")
# Gentoo creates groups of 3 digits, except for the first digit,
# or when last digit is 0. e.g.: 4.11 -> 4.110.0
splitted = up_pv.split(".")
if len(splitted) == 2: # split the second part into sub-groups
part = splitted.pop()
for i in range(0, len(part), 3):
splitted.append(part[i:i + 3])
if len(splitted) == 2: # add last group if it's missing
splitted.append("0")
groups = [splitted[0]]
for part in splitted[1:-1]:
groups.append(part.ljust(3, "0"))
if splitted[-1] == "0":
groups.append(splitted[-1])
else:
groups.append(splitted[-1].ljust(3, "0"))
# if there's a group with leading zeros strip it. e.g.: 002 -> 2
groups = [g.lstrip("0") if g != "0" else g for g in groups]
pv = ".".join(groups)
if rc_part:
pv = "%s_rc%s" % (pv, rc_part)
return pv
def cpan_mangle_version(pv):
pos = pv.find('.')
if pos <= 0:
return pv
up_pv = pv.replace('.', '')
up_pv = up_pv[0:pos] + '.' + up_pv[pos:]
return up_pv
def cpan_vercmp(cp, a, b):
try:
return float(a) - float(b)
except:
return helpers.simple_vercmp(a, b)
def scan_url(pkg, url, options):
cp, ver, rev = portage.pkgsplit(pkg.cpv)
remote_pkg = guess_package(cp, url)
output.einfo("Using CPAN API: %s" % remote_pkg)
return scan_pkg(pkg, {'data': remote_pkg})
def scan_pkg(pkg, options):
remote_pkg = options['data']
# Defaults to CPAN mangling rules
if 'versionmangle' not in options:
options['versionmangle'] = ['cpan', 'gentoo']
url = 'http://search.cpan.org/api/dist/%s' % remote_pkg
cp, ver, rev = pkg.cp, pkg.version, pkg.revision
m_ver = cpan_mangle_version(ver)
output.einfo("Using CPAN API: " + url)
try:
fp = helpers.urlopen(url)
except urllib.error.URLError:
return []
except IOError:
return []
if not fp:
return []
data = fp.read()
data = json.loads(data)
if 'releases' not in data:
return []
ret = []
for version in data['releases']:
#if version['status'] == 'testing':
# continue
up_pv = version['version']
pv = mangling.mangle_version(up_pv, options)
if up_pv.startswith('v'):
if helpers.version_filtered(cp, ver, pv):
continue
else:
m_pv = cpan_mangle_version(up_pv)
if helpers.version_filtered(cp, m_ver, m_pv, cpan_vercmp):
continue
url = 'mirror://cpan/authors/id/%s/%s/%s/%s' % (
version['cpanid'][0],
version['cpanid'][0:1],
version['cpanid'],
version['archive']
)
url = mangling.mangle_url(url, options)
ret.append((url, pv, HANDLER_NAME, CONFIDENCE))
return ret
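
Worked examples for the two mangling directions above, traced by hand from the code (assuming the module is importable as euscan.handlers.cpan):

from euscan.handlers.cpan import mangle_version, cpan_mangle_version

assert mangle_version("v1.2.3") == "1.2.3"         # leading 'v' stripped early
assert mangle_version("4.11") == "4.110.0"          # 3-digit groups, trailing .0
assert mangle_version("0.30_1") == "0.300.0_rc1"    # single '_' marks an rc part
assert cpan_mangle_version("1.23.45") == "1.2345"   # back to CPAN float style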

53
src/euscan/handlers/deb.py Normal file

@@ -0,0 +1,53 @@
import urllib.request, urllib.parse, urllib.error
import re
import bz2
import zlib
import portage
from euscan import mangling, helpers, output
HANDLER_NAME = "deb"
CONFIDENCE = 100
PRIORITY = 90
def can_handle(pkg, url=None):
return False
def scan_pkg(pkg, options):
cp, ver, rev = portage.pkgsplit(pkg.cpv)
packages_url, package_name = options['data'].strip().split(" ", 1)
output.einfo("Using Debian Packages: " + packages_url)
fp = urllib.request.urlopen(packages_url)
content = fp.read()
# Support for .gz and .bz2 Packages file
if packages_url.endswith(".bz2"):
content = bz2.decompress(content)
if packages_url.endswith(".gz"):
content = zlib.decompress(content, 16 + zlib.MAX_WBITS)
content = content.decode("utf-8", errors="replace") # decompressed data is bytes
content = content.split("\n\n")
result = []
for package_info in content:
package_line = re.search(r"^Package: (.*)$", package_info, re.M)
version_line = re.search(r"^Version: (.*)$", package_info, re.M)
if package_line and package_line.group(1) == package_name:
if version_line:
result.append(version_line.group(1))
ret = []
for up_pv in result:
url = "" # TODO: How to find the url?
pv = mangling.mangle_version(up_pv, options)
if helpers.version_filtered(cp, ver, pv):
continue
ret.append((url, pv, HANDLER_NAME, CONFIDENCE))
return ret
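
The stanzas being matched are standard Debian Packages entries; an abridged, hypothetical example of what the two regexes above extract:

import re

stanza = "Package: foo\nVersion: 1.0-2\nArchitecture: all"
assert re.search(r"^Package: (.*)$", stanza, re.M).group(1) == "foo"
assert re.search(r"^Version: (.*)$", stanza, re.M).group(1) == "1.0-2"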

48
src/euscan/handlers/freecode.py Normal file

@@ -0,0 +1,48 @@
import urllib.request, urllib.parse, urllib.error
import re
import portage
from euscan import mangling, helpers, output
HANDLER_NAME = "freecode"
CONFIDENCE = 100
PRIORITY = 90
def can_handle(pkg, url=None):
return False
def scan_pkg(pkg, options):
cp, ver, rev = portage.pkgsplit(pkg.cpv)
package = options['data'].strip()
output.einfo("Using FreeCode handler: " + package)
fp = urllib.request.urlopen("http://freecode.com/projects/%s/releases" % package)
content = str(fp.read())
result = re.findall(
r'<a href="/projects/%s/releases/(\d+)">([^<]+)</a>' % package,
content
)
ret = []
for release_id, up_pv in result:
pv = mangling.mangle_version(up_pv, options)
if helpers.version_filtered(cp, ver, pv):
continue
fp = urllib.request.urlopen("http://freecode.com/projects/%s/releases/%s" %
(package, release_id))
content = str(fp.read())
download_page = re.findall(r'<a href="(/urls/[^"]+)"', content)[0]
fp = urllib.request.urlopen("http://freecode.com%s" % download_page)
content = str(fp.read())
url = re.findall(
r'In case it doesn\'t, click here: <a href="([^"]+)"',
content
)[0]
ret.append((url, pv, HANDLER_NAME, CONFIDENCE))
return ret

276
src/euscan/handlers/generic.py Normal file

@@ -0,0 +1,276 @@
from urllib.parse import urljoin, urlparse
import urllib.request, urllib.error, urllib.parse
import re
import io
import difflib
try:
from BeautifulSoup import BeautifulSoup
except ImportError:
from bs4 import BeautifulSoup
import portage
from euscan import output, helpers, mangling, CONFIG, SCANDIR_BLACKLIST_URLS, \
BRUTEFORCE_BLACKLIST_PACKAGES, BRUTEFORCE_BLACKLIST_URLS
HANDLER_NAME = "generic"
CONFIDENCE = 45
PRIORITY = 0
BRUTEFORCE_HANDLER_NAME = "brute_force"
BRUTEFORCE_CONFIDENCE = 30
def confidence_score(found, original, minimum=CONFIDENCE):
found_p = urlparse(found)
original_p = urlparse(original)
# check if the base url is the same
if found_p.netloc != original_p.netloc:
return minimum
# check if the directory depth is the same
if len(found_p.path.split("/")) != len(original_p.path.split("/")):
return minimum
# strip numbers
found_path = re.sub(r"[\d+\.]?", "", found_p.path)
original_path = re.sub(r"[\d+\.]?", "", original_p.path)
# strip the first equal part of the path
i = 0
max_i = len(found_path)
while i < max_i and found_path[i] == original_path[i]:
i += 1
found_path = found_path[i:]
original_path = original_path[i:]
# calculate difference ratio
diff = difflib.SequenceMatcher(None, found_path, original_path).ratio()
return int(minimum + minimum * diff) # maximum score is minimum * 2
def scan_html(data, url, pattern):
soup = BeautifulSoup(data, features="lxml")
results = []
for link in soup.findAll('a'):
href = link.get("href")
if not href:
continue
if href.startswith(url):
href = href.replace(url, "", 1)
match = re.search(pattern, href, re.I)
if match:
results.append(
(".".join([x for x in match.groups() if x is not None]),
match.group(0))
)
return results
def scan_ftp(data, url, pattern):
buf = io.StringIO(data)
results = []
for line in buf.readlines():
line = line.replace("\n", "").replace("\r", "")
match = re.search(pattern, line, re.I)
if match:
results.append(
(".".join([x for x in match.groups() if x is not None]),
match.group(0))
)
return results
def scan_directory_recursive(cp, ver, rev, url, steps, orig_url, options):
if not steps:
return []
url += steps[0][0]
pattern = steps[0][1]
steps = steps[1:]
output.einfo("Scanning: %s" % url)
try:
fp = helpers.urlopen(url)
except urllib.error.URLError:
return []
except IOError:
return []
if not fp:
return []
data = fp.read()
results = []
if re.search(b"<\s*a\s+[^>]*href", data, re.I):
results.extend(scan_html(data, url, pattern))
elif url.startswith('ftp://'):
results.extend(scan_ftp(data, url, pattern))
versions = []
for up_pv, path in results:
pv = mangling.mangle_version(up_pv, options)
if helpers.version_filtered(cp, ver, pv):
continue
if not url.endswith("/"):
url = url + "/"
path = urljoin(url, path)
if not steps and path not in orig_url:
confidence = confidence_score(path, orig_url)
path = mangling.mangle_url(path, options)
versions.append((path, pv, HANDLER_NAME, confidence))
if steps:
ret = scan_directory_recursive(cp, ver, rev, path, steps, orig_url,
options)
versions.extend(ret)
return versions
def scan_url(pkg, url, options):
if CONFIG["scan-dir"]:
for bu in SCANDIR_BLACKLIST_URLS:
if re.match(bu, url):
output.einfo("%s is blacklisted by rule %s" % (url, bu))
return []
resolved_url = helpers.parse_mirror(url)
if not resolved_url:
return []
cp, ver, rev = portage.pkgsplit(pkg.cpv)
# 'Hack' for _beta/_rc versions where _ is used instead of -
if ver not in resolved_url:
newver = helpers.version_change_end_sep(ver)
if newver and newver in resolved_url:
output.einfo(
"Version: using %s instead of %s" % (newver, ver)
)
ver = newver
template = helpers.template_from_url(resolved_url, ver)
if '${' not in template:
output.einfo(
"Url doesn't seems to depend on version: %s not found in %s" %
(ver, resolved_url)
)
return []
else:
output.einfo("Scanning: %s" % template)
steps = helpers.generate_scan_paths(template)
ret = scan_directory_recursive(cp, ver, rev, "", steps, url, options)
if not ret:
ret = brute_force(pkg, url)
return ret
def brute_force(pkg, url):
if CONFIG["brute-force"] == 0:
return []
cp, ver, rev = portage.pkgsplit(pkg.cpv)
url = helpers.parse_mirror(url)
if not url:
return []
for bp in BRUTEFORCE_BLACKLIST_PACKAGES:
if re.match(bp, cp):
output.einfo("%s is blacklisted by rule %s" % (cp, bp))
return []
for bp in BRUTEFORCE_BLACKLIST_URLS:
if re.match(bp, url):
output.einfo("%s is blacklisted by rule %s" % (cp, bp))
return []
output.einfo("Generating version from " + ver)
components = helpers.split_version(ver)
versions = helpers.gen_versions(components, CONFIG["brute-force"])
# Remove unwanted versions
versions = [v for v in versions
if helpers.vercmp(cp, ver, helpers.join_version(v)) < 0]
if not versions:
output.einfo("Can't generate new versions from " + ver)
return []
template = helpers.template_from_url(url, ver)
if '${PV}' not in template:
output.einfo(
"Url doesn't seems to depend on full version: %s not found in %s" %
(ver, url))
return []
else:
output.einfo("Brute forcing: %s" % template)
result = []
i = 0
done = []
while i < len(versions):
components = versions[i]
i += 1
if components in done:
continue
done.append(tuple(components))
version = helpers.join_version(components)
if helpers.version_filtered(cp, ver, version):
continue
try_url = helpers.url_from_template(template, version)
infos = helpers.tryurl(try_url, template)
if not infos:
continue
confidence = confidence_score(try_url, url,
minimum=BRUTEFORCE_CONFIDENCE)
result.append([try_url, version, BRUTEFORCE_HANDLER_NAME, confidence])
if len(result) > CONFIG['brute-force-false-watermark']:
output.einfo(
"Broken server detected ! Skipping brute force."
)
return []
if CONFIG["brute-force-recursive"]:
for v in helpers.gen_versions(list(components),
CONFIG["brute-force"]):
if v not in versions and tuple(v) not in done:
versions.append(v)
if CONFIG["oneshot"]:
break
return result
def can_handle(pkg, url):
return True
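
A worked example of the scoring above (URLs hypothetical, assuming the euscan package and its portage dependency are importable): identical hosts with an identical digit-stripped layout earn the doubled score, anything on another host stays at the floor.

from euscan.handlers.generic import confidence_score

orig = "http://example.com/pkg/pkg-1.0.tar.gz"
assert confidence_score("http://example.com/pkg/pkg-1.1.tar.gz", orig) == 90  # 45 + 45 * 1.0
assert confidence_score("http://mirror.net/pkg/pkg-1.1.tar.gz", orig) == 45   # different netloc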

59
src/euscan/handlers/github.py Normal file

@@ -0,0 +1,59 @@
import json
import urllib.request, urllib.error, urllib.parse
import re
import portage
from euscan import helpers, output, mangling
HANDLER_NAME = "github"
CONFIDENCE = 100
PRIORITY = 90
def can_handle(pkg, url=None):
return url and url.startswith('mirror://github/')
def guess_package(cp, url):
match = re.search('^mirror://github/(.*?)/(.*?)/(.*)$', url)
assert(match)
return (match.group(1), match.group(2), match.group(3))
def scan_url(pkg, url, options):
'http://developer.github.com/v3/repos/downloads/'
user, project, filename = guess_package(pkg.cpv, url)
# find out where version is expected to be found
cp, ver, rev = portage.pkgsplit(pkg.cpv)
if ver not in filename:
return []
# now create a filename-matching regexp
# XXX: supposedly replace first with (?P<foo>...)
# and remaining ones with (?P=foo)
fnre = re.compile('^%s$' % \
re.escape(filename).replace(re.escape(ver), '(.*?)'))
output.einfo("Using github API for: project=%s user=%s filename=%s" % \
(project, user, filename))
dlreq = urllib.request.urlopen('https://api.github.com/repos/%s/%s/downloads' % \
(user, project))
dls = json.load(dlreq)
ret = []
for dl in dls:
m = fnre.match(dl['name'])
if m:
pv = mangling.mangle_version(m.group(1), options)
if helpers.version_filtered(cp, ver, pv):
continue
url = mangling.mangle_url(dl['html_url'], options)
ret.append((url, pv, HANDLER_NAME, CONFIDENCE))
return ret
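
The filename regex built in scan_url reduces to the following; a small standalone check (filename hypothetical):

import re

ver, filename = "1.0", "foo-1.0.tar.gz"
fnre = re.compile('^%s$' % re.escape(filename).replace(re.escape(ver), '(.*?)'))
match = fnre.match("foo-2.3.tar.gz")
assert match and match.group(1) == "2.3"  # the version slot is captured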

84
src/euscan/handlers/gnome.py Normal file

@@ -0,0 +1,84 @@
# -*- coding: utf-8 -*-
import re
import urllib.request, urllib.error, urllib.parse
try:
import simplejson as json
except ImportError:
import json
import portage
from euscan import mangling, helpers, output
HANDLER_NAME = "gnome"
CONFIDENCE = 100
PRIORITY = 90
GNOME_URL_SOURCE = 'http://ftp.gnome.org/pub/GNOME/sources'
def can_handle(_pkg, url=None):
return url and url.startswith('mirror://gnome/')
def guess_package(cp, url):
match = re.search('mirror://gnome/sources/([^/]+)/.*', url)
if match:
return match.group(1)
_cat, pkg = cp.split("/")
return pkg
def scan_url(pkg, url, options):
'http://ftp.gnome.org/pub/GNOME/sources/'
package = {
'data': guess_package(pkg.cpv, url),
'type': 'gnome',
}
return scan_pkg(pkg, package)
def scan_pkg(pkg, options):
package = options['data']
output.einfo("Using Gnome json cache: " + package)
fp = urllib.request.urlopen('/'.join([GNOME_URL_SOURCE, package, 'cache.json']))
content = fp.read()
fp.close()
cache = json.loads(content)
if cache[0] != 4:
output.eerror('Unknown cache format detected')
return []
versions = cache[2][package]
if not versions:
return []
versions.reverse()
cp, ver, _rev = portage.pkgsplit(pkg.cpv)
ret = []
for up_pv in versions:
pv = mangling.mangle_version(up_pv, options)
if helpers.version_filtered(cp, ver, pv):
continue
up_files = cache[1][package][up_pv]
for tarball_comp in ('tar.xz', 'tar.bz2', 'tar.gz'):
if tarball_comp in up_files:
url = '/'.join([GNOME_URL_SOURCE, package,
up_files[tarball_comp]])
break
else:
output.ewarn('No tarball for release %s' % up_pv)
continue
ret.append((url, pv, HANDLER_NAME, CONFIDENCE))
return ret
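
For reference, the cache.json layout the code above indexes into, as an abridged, hypothetical value: element 0 is the format version, element 1 maps package to version to tarball paths, element 2 maps package to its version list.

cache = [
    4,
    {"foo": {"1.0": {"tar.xz": "foo/1.0/foo-1.0.tar.xz"}}},
    {"foo": ["1.0"]},
]
assert cache[0] == 4
assert "tar.xz" in cache[1]["foo"]["1.0"]
assert cache[2]["foo"] == ["1.0"]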

43
src/euscan/handlers/google-code.py Normal file

@@ -0,0 +1,43 @@
import re
import portage
from euscan import output
from euscan.helpers import regex_from_template
from euscan.handlers.url import process_scan as url_scan
HANDLER_NAME = "google-code"
CONFIDENCE = 90
PRIORITY = 90
package_name_regex = r"http://(.+).googlecode.com/files/.+"
def can_handle(pkg, url=None):
if not url:
return False
cp, ver, rev = portage.pkgsplit(pkg.cpv)
if ver not in url:
return False
return re.match(package_name_regex, url)
def scan_url(pkg, url, options):
output.einfo("Using Google Code handler")
cp, ver, rev = portage.pkgsplit(pkg.cpv)
package_name = re.match(package_name_regex, url).group(1)
base_url = "http://code.google.com/p/%s/downloads/list" % package_name
file_pattern = regex_from_template(
url.split("/")[-1].replace(ver, "${PV}")
)
result = url_scan(pkg, base_url, file_pattern)
ret = []
for url, pv, _, _ in result:
ret.append((url, pv, HANDLER_NAME, CONFIDENCE))
return ret

38
src/euscan/handlers/kde.py Normal file

@@ -0,0 +1,38 @@
from euscan.handlers import generic
PRIORITY = 90
HANDLER_NAME = "kde"
def can_handle(pkg, url):
return url and url.startswith('mirror://kde/')
def clean_results(results):
ret = []
for path, version, _, confidence in results:
if version == '5SUMS':
continue
ret.append((path, version, HANDLER_NAME, confidence))
return ret
def scan_url(pkg, url, options):
results = generic.scan_url(pkg, url, options)
if url.startswith('mirror://kde/unstable/'):
url = url.replace('mirror://kde/unstable/', 'mirror://kde/stable/')
results += generic.scan_url(pkg, url, options)
if not results: # if nothing was found go brute forcing
results = generic.brute_force(pkg, url)
if url.startswith('mirror://kde/unstable/'):
url = url.replace('mirror://kde/unstable/',
'mirror://kde/stable/')
results += generic.brute_force(pkg, url)
return clean_results(results)

12
src/euscan/handlers/pear.py Normal file

@@ -0,0 +1,12 @@
from euscan.handlers import php
HANDLER_NAME = "pear"
CONFIDENCE = 100
PRIORITY = 90
def can_handle(pkg, url=None):
return url and url.startswith('http://%s.php.net/get/' % HANDLER_NAME)
scan_url = php.scan_url
scan_pkg = php.scan_pkg

11
src/euscan/handlers/pecl.py Normal file

@@ -0,0 +1,11 @@
from euscan.handlers import php
HANDLER_NAME = "pecl"
CONFIDENCE = 100
PRIORITY = 90
def can_handle(pkg, url=None):
return url and url.startswith('http://%s.php.net/get/' % HANDLER_NAME)
scan_url = php.scan_url
scan_pkg = php.scan_pkg

69
src/euscan/handlers/php.py Normal file

@@ -0,0 +1,69 @@
import re
import portage
import urllib.request, urllib.error, urllib.parse
import xml.dom.minidom
from euscan import helpers, output, mangling
HANDLER_NAME = "php"
CONFIDENCE = 100
PRIORITY = 90
def can_handle(pkg, url=None):
return False
def guess_package_and_channel(cp, url):
match = re.search('http://(.*)\.php\.net/get/(.*)-(.*).tgz', url)
if match:
host = match.group(1)
pkg = match.group(2)
else:
cat, pkg = cp.split("/")
host = None # no channel can be derived from the URL
return pkg, host
def scan_url(pkg, url, options):
package, channel = guess_package_and_channel(pkg.cp, url)
return scan_pkg(pkg, {'type': channel, 'data': package})
def scan_pkg(pkg, options):
cp, ver, rev = pkg.cp, pkg.version, pkg.revision
package = options['data']
channel = options['type']
url = 'http://%s.php.net/rest/r/%s/allreleases.xml' % (channel, package.lower())
output.einfo("Using: " + url)
try:
fp = helpers.urlopen(url)
except urllib.error.URLError:
return []
except IOError:
return []
if not fp:
return []
data = fp.read()
dom = xml.dom.minidom.parseString(data)
nodes = dom.getElementsByTagName("v")
ret = []
for node in nodes:
up_pv = node.childNodes[0].data
pv = mangling.mangle_version(up_pv, options)
if helpers.version_filtered(cp, ver, pv):
continue
url = 'http://%s.php.net/get/%s-%s.tgz' % (channel, package, up_pv)
url = mangling.mangle_url(url, options)
ret.append((url, pv, HANDLER_NAME, CONFIDENCE))
return ret

58
src/euscan/handlers/pypi.py Normal file

@@ -0,0 +1,58 @@
import xmlrpc.client
import re
import portage
from euscan import mangling, helpers, output
HANDLER_NAME = "pypi"
CONFIDENCE = 100
PRIORITY = 90
def can_handle(pkg, url=None):
return url and url.startswith('mirror://pypi/')
def guess_package(cp, url):
match = re.search('mirror://pypi/\w+/(.*)/.*', url)
if match:
return match.group(1)
cat, pkg = cp.split("/")
return pkg
def scan_url(pkg, url, options):
'http://wiki.python.org/moin/PyPiXmlRpc'
package = guess_package(pkg.cpv, url)
return scan_pkg(pkg, {'data': package})
def scan_pkg(pkg, options):
package = options['data']
output.einfo("Using PyPi XMLRPC: " + package)
client = xmlrpc.client.ServerProxy('https://pypi.python.org/pypi')
versions = client.package_releases(package)
if not versions:
return versions
versions.reverse()
cp, ver, rev = portage.pkgsplit(pkg.cpv)
ret = []
for up_pv in versions:
pv = mangling.mangle_version(up_pv, options)
if helpers.version_filtered(cp, ver, pv):
continue
urls = client.release_urls(package, up_pv)
urls = " ".join([mangling.mangle_url(infos['url'], options)
for infos in urls])
ret.append((urls, pv, HANDLER_NAME, CONFIDENCE))
return ret

75
src/euscan/handlers/rubygems.py Normal file

@@ -0,0 +1,75 @@
import re
import portage
import json
import urllib.request, urllib.error, urllib.parse
from euscan import helpers, output, mangling
HANDLER_NAME = "rubygems"
CONFIDENCE = 100
PRIORITY = 90
def can_handle(pkg, url=None):
return url and url.startswith('mirror://rubygems/')
def guess_gem(cpv, url):
match = re.search('mirror://rubygems/(.*).gem', url)
if match:
cpv = 'fake/%s' % match.group(1)
ret = portage.pkgsplit(cpv)
if not ret:
return None
cp, ver, rev = ret
cat, pkg = cp.split("/")
return pkg
def scan_url(pkg, url, options):
'http://guides.rubygems.org/rubygems-org-api/#gemversion'
gem = guess_gem(pkg.cpv, url)
if not gem:
output.eerror("Can't guess gem name using %s and %s" % \
(pkg.cpv, url))
return []
output.einfo("Using RubyGem API: %s" % gem)
return scan_pkg(pkg, {'data': gem})
def scan_pkg(pkg, options):
gem = options['data']
url = 'http://rubygems.org/api/v1/versions/%s.json' % gem
try:
fp = helpers.urlopen(url)
except urllib.error.URLError:
return []
except IOError:
return []
if not fp:
return []
data = fp.read()
versions = json.loads(data)
cp, ver, rev = portage.pkgsplit(pkg.cpv)
ret = []
for version in versions:
up_pv = version['number']
pv = mangling.mangle_version(up_pv, options)
if helpers.version_filtered(cp, ver, pv):
continue
url = 'http://rubygems.org/gems/%s-%s.gem' % (gem, up_pv)
url = mangling.mangle_url(url, options)
ret.append((url, pv, HANDLER_NAME, CONFIDENCE))
return ret

45
src/euscan/handlers/sourceforge.py Normal file

@@ -0,0 +1,45 @@
import re
import portage
from euscan.helpers import regex_from_template
from euscan.handlers.url import process_scan as url_scan
from euscan import output
HANDLER_NAME = "sourceforge"
CONFIDENCE = 90
PRIORITY = 90
def can_handle(pkg, url=None):
if not url:
return False
cp, ver, rev = portage.pkgsplit(pkg.cpv)
if ver not in url:
return False
return "mirror://sourceforge/" in url
def scan_url(pkg, url, options):
output.einfo("Using SourceForge handler")
cp, ver, rev = portage.pkgsplit(pkg.cpv)
project, filename = re.search(
"mirror://sourceforge/([^/]+)/(?:.*/)?([^/]+)",
url
).groups()
base_url = "http://qa.debian.org/watch/sf.php/%s" % project
file_pattern = regex_from_template(
filename.replace(ver, "${PV}")
)
result = url_scan(pkg, base_url, file_pattern)
ret = []
for url, pv, _, _ in result:
ret.append((url, pv, HANDLER_NAME, CONFIDENCE))
return ret

104
src/euscan/handlers/url.py Normal file

@@ -0,0 +1,104 @@
import re
import urllib.request, urllib.error, urllib.parse
from euscan.handlers import generic
from euscan import output, helpers
PRIORITY = 100
HANDLER_NAME = "url"
CONFIDENCE = 100.0
is_pattern = r"\([^\/]+\)"
def can_handle(*args):
return False
def handle_directory_patterns(base, file_pattern):
"""
Directory pattern matching
e.g.: base: ftp://ftp.nessus.org/pub/nessus/nessus-([\d\.]+)/src/
file_pattern: nessus-core-([\d\.]+)\.tar\.gz
"""
splitted = base.split("/")
i = 0
basedir = []
for elem in splitted:
if re.search(is_pattern, elem):
break
basedir.append(elem)
i += 1
basedir = "/".join(basedir)
directory_pattern = splitted[i]
final = "/".join(splitted[i + 1:])
try:
fp = helpers.urlopen(basedir)
except urllib.error.URLError:
return []
except IOError:
return []
if not fp:
return []
data = fp.read()
if basedir.startswith("ftp://"):
scan_data = generic.scan_ftp(data, basedir, directory_pattern)
else:
scan_data = generic.scan_html(data, basedir, directory_pattern)
return [("/".join((basedir, path, final)), file_pattern)
for _, path in scan_data]
def read_options(options):
try:
base, file_pattern = options['data'].split(" ")[:2]
except ValueError:
base, file_pattern = options['data'], None
# the file pattern can be in the base url
pattern_regex = r"/([^/]*\([^/]*\)[^/]*)$"
match = re.search(pattern_regex, base)
if match:
file_pattern = match.group(1)
base = base.replace(file_pattern, "")
# handle sf.net specially
base = base.replace(
"http://sf.net/", "http://qa.debian.org/watch/sf.php/"
)
return base, file_pattern
def process_scan(pkg, base, file_pattern, options=None):
if options is None:
options = {}
cp, ver, rev = pkg.cp, pkg.version, pkg.revision
results = []
if not re.search(is_pattern, base):
steps = [(base, file_pattern)]
results = generic.scan_directory_recursive(
cp, ver, rev, "", steps, base, options
)
else:
for step in handle_directory_patterns(base, file_pattern):
results += generic.scan_directory_recursive(
cp, ver, rev, "", [step], base, options
)
return results
def scan_pkg(pkg, options):
output.einfo("Using watch data")
base, file_pattern = read_options(options)
return process_scan(pkg, base, file_pattern, options)
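
How read_options splits watch data, with hypothetical values; the file pattern may follow the base after a space or be embedded as the URL's last component:

from euscan.handlers.url import read_options

base, pat = read_options({'data': r'http://example.com/dl/ foo-([\d.]+)\.tar\.gz'})
assert (base, pat) == ('http://example.com/dl/', r'foo-([\d.]+)\.tar\.gz')

base, pat = read_options({'data': r'http://example.com/dl/foo-([\d.]+)\.tar\.gz'})
assert (base, pat) == ('http://example.com/dl/', r'foo-([\d.]+)\.tar\.gz')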

487
src/euscan/helpers.py Normal file

@@ -0,0 +1,487 @@
import os
import re
import errno
import urllib.request, urllib.error, urllib.parse
from xml.dom.minidom import Document
import portage
from portage import dep
import urllib.robotparser
import urllib.parse
import euscan
from euscan import CONFIG, BLACKLIST_VERSIONS, ROBOTS_TXT_BLACKLIST_DOMAINS
from euscan.version import parse_version
def htop_vercmp(a, b):
def fixver(v):
if v in ['0.11', '0.12', '0.13']:
v = '0.1.' + v[3:]
return v
return simple_vercmp(fixver(a), fixver(b))
VERSION_CMP_PACKAGE_QUIRKS = {
'sys-process/htop': htop_vercmp
}
_v_end = r'(?:(?:-|_)(?:pre|p|beta|b|alpha|a|rc|r)\d*)'
_v = r'((?:\d+)(?:(?:\.\d+)*)(?:[a-zA-Z]*?)(?:' + _v_end + '*))'
def cast_int_components(version):
for i, obj in enumerate(version):
try:
version[i] = int(obj)
except ValueError:
pass
return version
def simple_vercmp(a, b):
if a == b:
return 0
# For sane versions
r = portage.versions.vercmp(a, b)
if r is not None:
return r
# Fallback
a = parse_version(a)
b = parse_version(b)
if a < b:
return -1
else:
return 1
def vercmp(package, a, b):
if package in VERSION_CMP_PACKAGE_QUIRKS:
return VERSION_CMP_PACKAGE_QUIRKS[package](a, b)
return simple_vercmp(a, b)
def version_is_nightly(a, b):
a = parse_version(a)
b = parse_version(b)
# Try to skip nightly builds when not wanted (www-apps/moodle)
if len(a) != len(b) and len(b) == 2 and len(b[0]) == len('yyyymmdd'):
if b[0][:4] != '0000':
return True
return False
def version_blacklisted(cp, version):
rule = None
cpv = '%s-%s' % (cp, version)
# Check that the generated cpv can be used by portage
if not portage.versions.catpkgsplit(cpv):
return False
for bv in BLACKLIST_VERSIONS:
if dep.match_from_list(bv, [cpv]):
rule = bv
break
if rule:
euscan.output.einfo("%s is blacklisted by rule %s" % (cpv, rule))
return rule is not None
def version_change_end_sep(version):
match = re.match(r".*(%s)" % _v_end, version)
if not match:
return None
end = match.group(1)
if end[0] == '_':
newend = end.replace('_', '-')
elif end[0] == '-':
newend = end.replace('-', '_')
else:
return None
return version.replace(end, newend)
def version_filtered(cp, base, version, vercmp=vercmp):
if vercmp(cp, base, version) >= 0:
return True
if version_blacklisted(cp, version):
return True
if version_is_nightly(base, version):
return True
return False
def generate_templates_vars(version):
ret = []
part = split_version(version)
for i in range(2, len(part)):
ver = []
var = []
for j in range(i):
ver.append(str(part[j]))
var.append('${%d}' % j)
ret.append((".".join(ver), ".".join(var)))
ret.append((version, '${PV}'))
ret.reverse()
return ret
def template_from_url(url, version):
prefix, chunks = url.split('://')
chunks = chunks.split('/')
for i in range(len(chunks)):
chunk = chunks[i]
subs = generate_templates_vars(version)
for sub in subs:
chunk = chunk.replace(sub[0], sub[1])
chunks[i] = chunk
return prefix + "://" + "/".join(chunks)
def url_from_template(url, version):
components = split_version(version)
url = url.replace('${PV}', version)
for i in range(len(components)):
url = url.replace('${%d}' % i, str(components[i]))
return url
# Stolen from distutils.LooseVersion
# Used for brute force to increment the version
def split_version(version):
component_re = re.compile(r'(\d+ | [a-z]+ | \.)', re.VERBOSE)
components = [x for x in component_re.split(version) if x and x != '.']
for i in range(len(components)):
try:
components[i] = int(components[i])
except ValueError:
pass
return components
def join_version(components):
version = ""
for i in range(len(components)):
version += str(components[i])
if i >= len(components) - 1:
break
if type(components[i]) != str and type(components[i + 1]) != str:
version += "."
return version
def increment_version(components, level):
n = len(components)
if level > n - 1 or level < 0:
raise Exception
for i in range(n, level + 1, -1):
if type(components[i - 1]) == int:
components[i - 1] = 0
if type(components[level]) == int:
components[level] += 1
return components
def gen_versions(components, level):
n = len(components)
depth = level
level = min(level, n)
if not n:
return []
versions = []
for i in range(n, n - level, -1):
increment_version(components, i - 1)
for j in range(depth):
versions.append(list(components))
increment_version(components, i - 1)
return versions
def timeout_for_url(url):
if 'sourceforge' in url:
timeout = 15
else:
timeout = 5
return timeout
class HeadRequest(urllib.request.Request):
def get_method(self):
return "HEAD"
# RobotParser cache
rpcache = {}
def urlallowed(url):
if CONFIG['skip-robots-txt']:
return True
protocol, domain = urllib.parse.urlparse(url)[:2]
for bd in ROBOTS_TXT_BLACKLIST_DOMAINS:
if re.match(bd, domain):
return True
for d in ['sourceforge', 'berlios', 'github.com']:
if d in domain:
return True
if protocol == 'ftp':
return True
baseurl = '%s://%s' % (protocol, domain)
robotsurl = urllib.parse.urljoin(baseurl, 'robots.txt')
if baseurl in rpcache:
rp = rpcache[baseurl]
else:
from socket import setdefaulttimeout, getdefaulttimeout
timeout = getdefaulttimeout()
setdefaulttimeout(5)
rp = urllib.robotparser.RobotFileParser()
rp.set_url(robotsurl)
try:
rp.read()
rpcache[baseurl] = rp
except:
rp = None
setdefaulttimeout(timeout)
return rp.can_fetch(CONFIG['user-agent'], url) if rp else True
def urlopen(url, timeout=None, verb="GET"):
if not urlallowed(url):
euscan.output.einfo("Url '%s' blocked by robots.txt" % url)
return None
if not timeout:
timeout = timeout_for_url(url)
if verb == 'GET':
request = urllib.request.Request(url)
elif verb == 'HEAD':
request = HeadRequest(url)
else:
return None
request.add_header('User-Agent', CONFIG['user-agent'])
handlers = []
if CONFIG['cache']:
from cache import CacheHandler
handlers.append(CacheHandler(CONFIG['cache']))
if CONFIG['verbose']:
debuglevel = CONFIG['verbose'] - 1
handlers.append(urllib.request.HTTPHandler(debuglevel=debuglevel))
opener = urllib.request.build_opener(*handlers)
return opener.open(request, None, timeout)
def tryurl(fileurl, template):
result = True
if not urlallowed(fileurl):
euscan.output.einfo("Url '%s' blocked by robots.txt" % fileurl)
return None
euscan.output.ebegin("Trying: " + fileurl)
try:
basename = os.path.basename(fileurl)
fp = urlopen(fileurl, verb='HEAD')
if not fp:
euscan.output.eend(errno.EPERM)
return None
headers = fp.info()
# Some URLs return Content-disposition with different filename
# Disable check for now (I have now seen false positives)
#if 'Content-disposition' in headers and \
# basename not in headers['Content-disposition']:
# result = None
if 'Content-Length' in headers and headers['Content-Length'] == '0':
result = None
elif 'Content-Type' in headers and \
'text/html' in headers['Content-Type']:
result = None
elif 'Content-Type' in headers and \
'application/x-httpd-php' in headers['Content-Type']:
result = None
elif fp.geturl() != fileurl:
regex = regex_from_template(template)
baseregex = regex_from_template(os.path.basename(template))
basename2 = os.path.basename(fp.geturl())
# Redirect to another (earlier?) version
if basename != basename2 and (re.match(regex, fp.geturl()) or \
re.match(baseregex, basename2)):
result = None
if result:
result = (fp.geturl(), fp.info())
except urllib.error.URLError:
result = None
except IOError:
result = None
euscan.output.eend(errno.ENOENT if not result else 0)
return result
def regex_from_template(template):
# Escape
regexp = re.escape(template)
# Unescape specific stuff
regexp = regexp.replace('\$\{', '${')
regexp = regexp.replace('\}', '}')
regexp = regexp.replace('}\.$', '}.$')
# Replace ${\d+}
#regexp = regexp.replace('${0}', r'([\d]+?)')
regexp = re.sub(r'(\$\{\d+\}(\.?))+', r'([\\w\.]+?)', regexp)
#regexp = re.sub(r'(\$\{\d+\}\.?)+', r'([\w]+?)', regexp)
#regexp = re.sub(r'(\$\{\d+\}\.+)+', '(.+?)\.', regexp)
#regexp = re.sub(r'(\$\{\d+\})+', '(.+?)', regexp)
# Full version
regexp = regexp.replace('${PV}', _v)
# End
regexp = regexp + r'/?$'
return regexp
def basedir_from_template(template):
idx = template.find('${')
if idx == -1:
return template
idx = template[0:idx].rfind('/')
if idx == -1:
return ""
return template[0:idx]
def generate_scan_paths(url):
prefix, chunks = url.split('://')
chunks = chunks.split('/')
steps = []
path = prefix + ":/"
for chunk in chunks:
if '${' in chunk:
steps.append((path, '^(?:|.*/)' + regex_from_template(chunk)))
path = ""
else:
path += "/"
path += chunk
return steps
def parse_mirror(uri):
from random import shuffle
mirrors = portage.settings.thirdpartymirrors()
if not uri.startswith("mirror://"):
return uri
eidx = uri.find("/", 9)
if eidx == -1:
euscan.output.einfo("Invalid mirror definition in SRC_URI:\n")
euscan.output.einfo(" %s\n" % (uri))
return None
mirrorname = uri[9:eidx]
path = uri[eidx + 1:]
if mirrorname in mirrors:
mirrors = mirrors[mirrorname]
shuffle(mirrors)
uri = mirrors[0].strip("/") + "/" + path
else:
euscan.output.einfo("No known mirror by the name: %s" % (mirrorname))
return None
return uri
def dict_to_xml(data, indent):
doc = Document()
root = doc.createElement("euscan")
doc.appendChild(root)
def _set_value(parent, value):
if isinstance(value, dict):
for k, v in list(value.items()):
node = doc.createElement(k)
_set_value(node, v)
parent.appendChild(node)
elif isinstance(value, list):
for item in value:
node = doc.createElement("value")
text = doc.createTextNode(item)
node.appendChild(text)
parent.appendChild(node)
else:
text = doc.createTextNode(str(value))
parent.appendChild(text)
for key, value in list(data.items()):
node = doc.createElement("package")
node.setAttribute("name", key)
_set_value(node, value)
root.appendChild(node)
return doc.toprettyxml(indent=" " * indent)
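
A round trip through the templating helpers above (URL hypothetical): template_from_url substitutes the version into ${PV}, url_from_template substitutes it back, and generate_scan_paths splits the template into per-directory scan steps.

from euscan.helpers import (template_from_url, url_from_template,
                            generate_scan_paths)

url = "http://example.com/releases/1.2.3/foo-1.2.3.tar.gz"
tpl = template_from_url(url, "1.2.3")
assert tpl == "http://example.com/releases/${PV}/foo-${PV}.tar.gz"
assert url_from_template(tpl, "1.2.4") == \
    "http://example.com/releases/1.2.4/foo-1.2.4.tar.gz"
# one (static prefix, component regex) step per templated path component
assert len(generate_scan_paths(tpl)) == 2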

171
src/euscan/mangling.py Normal file

@@ -0,0 +1,171 @@
import re
import euscan.handlers
def apply_mangling_rule(mangle, string):
# convert regex from perl format to python format
# there are some regex in this format: s/pattern/replacement/
m = re.match(r"s/(.*[^\\])/(.*)/", mangle)
if not m:
# or in this format s|pattern|replacement|
m = re.match(r"s\|(.*[^\\])\|(.*)\|", mangle)
if not m: # Not a known regex format
return string
pattern, repl = m.groups()
repl = re.sub(r"\$(\d+)", r"\\\1", repl)
return re.sub(pattern, repl, string)
def apply_mangling_rules(kind, rules, string):
"""
Apply multiple mangling rules (both sed-like and handlers)
in order
"""
if kind not in rules:
return string
for rule in rules[kind]:
ret = None
# First try handlers rules
if rule == 'gentoo' and kind == 'versionmangle':
ret = gentoo_mangle_version(string)
elif kind == 'downloadurlmangle':
ret = euscan.handlers.mangle_url(rule, string)
elif kind == 'versionmangle':
ret = euscan.handlers.mangle_version(rule, string)
if ret is not None: # Use return value as new string if not None
string = ret
else: # Apply sed like rules
string = apply_mangling_rule(rule, string)
return string
def mangle_version(up_pv, options):
# Default rule is gentoo when empty
if 'versionmangle' not in options or not options['versionmangle']:
options['versionmangle'] = ['gentoo']
return apply_mangling_rules('versionmangle', options, up_pv)
def mangle_url(url, options):
return apply_mangling_rules('downloadurlmangle', options, url)
# Stolen from g-pypi
def gentoo_mangle_version(up_pv):
"""Convert PV to MY_PV if needed
:param up_pv: Upstream package version
:type up_pv: string
:returns: pv
:rtype: string
Can't determine PV from upstream's version.
Do our best with some well-known versioning schemes:
* 1.0a1 (1.0_alpha1)
* 1.0-a1 (1.0_alpha1)
* 1.0b1 (1.0_beta1)
* 1.0-b1 (1.0_beta1)
* 1.0-r1234 (1.0_pre1234)
* 1.0dev-r1234 (1.0_pre1234)
* 1.0.dev-r1234 (1.0_pre1234)
* 1.0dev-20091118 (1.0_pre20091118)
Regex match.groups():
* pkgfoo-1.0.dev-r1234
* group 1 pv major (1.0)
* group 2 replace this with portage suffix (.dev-r)
* group 3 suffix version (1234)
The order of the regexes is significant. For instance if you have
.dev-r123, dev-r123 and -r123 you should order your regexes in
that order.
The chronological portage release versions are:
* _alpha
* _beta
* _pre
* _rc
* release
* _p
**Example:**
>>> gentoo_mangle_version('1.0b2')
'1.0_beta2'
.. note::
The number of regexes could have been reduced, but we use the same
number of match.groups every time to simplify the code
"""
bad_suffixes = re.compile(
r'((?:[._-]*)(?:dev|devel|final|stable|snapshot)$)', re.I)
revision_suffixes = re.compile(
r'(.*?)([\._-]*(?:r|patch|p)[\._-]*)([0-9]*)$', re.I)
suf_matches = {
'_pre': [
r'(.*?)([\._-]*dev[\._-]*r?)([0-9]+)$',
r'(.*?)([\._-]*(?:pre|preview)[\._-]*)([0-9]*)$',
],
'_alpha': [
r'(.*?)([\._-]*(?:alpha|test)[\._-]*)([0-9]*)$',
r'(.*?)([\._-]*a[\._-]*)([0-9]*)$',
r'(.*[^a-z])(a)([0-9]*)$',
],
'_beta': [
r'(.*?)([\._-]*beta[\._-]*)([0-9]*)$',
r'(.*?)([\._-]*b)([0-9]*)$',
r'(.*[^a-z])(b)([0-9]*)$',
],
'_rc': [
r'(.*?)([\._-]*rc[\._-]*)([0-9]*)$',
r'(.*?)([\._-]*c[\._-]*)([0-9]*)$',
r'(.*[^a-z])(c[\._-]*)([0-9]+)$',
],
}
rs_match = None
pv = up_pv
additional_version = ""
rev_match = revision_suffixes.search(up_pv)
if rev_match:
pv = up_pv = rev_match.group(1)
replace_me = rev_match.group(2)
rev = rev_match.group(3)
additional_version = '_p' + rev
for this_suf in list(suf_matches.keys()):
if rs_match:
break
for regex in suf_matches[this_suf]:
rsuffix_regex = re.compile(regex, re.I)
rs_match = rsuffix_regex.match(up_pv)
if rs_match:
portage_suffix = this_suf
break
if rs_match:
# e.g. 1.0.dev-r1234
major_ver = rs_match.group(1) # 1.0
replace_me = rs_match.group(2) # .dev-r
rev = rs_match.group(3) # 1234
pv = major_ver + portage_suffix + rev
else:
# Single suffixes with no numeric component are simply removed.
match = bad_suffixes.search(up_pv)
if match:
suffix = match.groups()[0]
pv = up_pv[: - (len(suffix))]
pv = pv + additional_version
return pv
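
Two quick checks of the rule syntax above: sed-style rules may use $N backreferences (rewritten to \N before re.sub), and gentoo_mangle_version follows its docstring (inputs hypothetical):

from euscan.mangling import apply_mangling_rule, gentoo_mangle_version

assert apply_mangling_rule(r"s/-rc/_rc/", "2.0-rc1") == "2.0_rc1"
assert apply_mangling_rule(r"s/(\d+)-(\d+)/$1.$2/", "1-2") == "1.2"
assert gentoo_mangle_version("1.0b2") == "1.0_beta2"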

249
src/euscan/out.py Normal file

@@ -0,0 +1,249 @@
import sys
from io import StringIO
from collections import defaultdict
import json
import signal
import time
import re
import portage
from portage.output import EOutput, TermProgressBar
from gentoolkit import pprinter as pp
from euscan.helpers import dict_to_xml
mirrors_ = None
class ProgressHandler(object):
def __init__(self, progress_bar):
self.curval = 0
self.maxval = 0
self.last_update = 0
self.min_display_latency = 0.2
self.progress_bar = progress_bar
def on_progress(self, maxval=None, increment=1, label=None):
self.maxval = maxval or self.maxval
self.curval += increment
if label:
self.progress_bar.label(label)
cur_time = time.time()
if cur_time - self.last_update >= self.min_display_latency:
self.last_update = cur_time
self.display()
def display(self):
raise NotImplementedError(self)
def progress_bar():
on_progress = None
try:
progress_bar = TermProgressBar(fd=sys.stderr, title="euscan")
except TypeError:
progress_bar = TermProgressBar(title="euscan")
progress_handler = ProgressHandler(progress_bar)
on_progress = progress_handler.on_progress
def display():
progress_bar.set(progress_handler.curval, progress_handler.maxval)
progress_handler.display = display
def sigwinch_handler(signum, frame):
lines, progress_bar.term_columns = portage.output.get_term_size()
signal.signal(signal.SIGWINCH, sigwinch_handler)
yield on_progress
# make sure the final progress is displayed
progress_handler.display()
signal.signal(signal.SIGWINCH, signal.SIG_DFL)
yield None
def clean_colors(string):
if type(string) is str:
string = re.sub("\033\[[0-9;]+m", "", string)
string = re.sub(r"\\u001b\[[0-9;]+m", "", string)
string = re.sub(r"\x1b\[[0-9;]+m", "", string)
return string
def transform_url(config, cpv, url):
if config['mirror']:
url = to_mirror(url)
if config['ebuild-uri']:
url = to_ebuild_uri(cpv, url)
return url
def to_ebuild_uri(cpv, url):
cat, pkg, ver, rev = portage.catpkgsplit(cpv)
p = '%s-%s' % (pkg, ver)
pvr = '%s%s' % (ver, '-%s' % rev if rev != 'r0' else '')
pf = '%s-%s' % (pkg, pvr)
evars = (
(p, 'P'),
(pkg, 'PN'),
(ver, 'PV'),
(rev, 'PR'),
(pvr, 'PVR'),
(pf, 'PF'),
(cat, 'CATEGORY')
)
for src, dst in evars:
url = url.replace(src, '${%s}' % dst)
return url
def load_mirrors():
import random
global mirrors_
if mirrors_ is None:
mirrors_ = portage.settings.thirdpartymirrors()
for mirror_name in mirrors_:
random.shuffle(mirrors_[mirror_name])
def from_mirror(url):
if not url.startswith('mirror://'):
return url
if not mirrors_:
load_mirrors()
for mirror_name in mirrors_:
prefix = 'mirror://' + mirror_name
if url.startswith(prefix):
return url.replace(prefix, mirrors_[mirror_name][0])
return url
def to_mirror(url):
if not mirrors_:
load_mirrors()
for mirror_name in mirrors_:
for mirror_url in mirrors_[mirror_name]:
if url.startswith(mirror_url):
url_part = url.split(mirror_url)[1]
return "mirror://%s%s%s" % (
mirror_name,
"" if url_part.startswith("/") else "/",
url_part
)
return url
class EOutputMem(EOutput):
"""
Override of EOutput, allows to specify an output file for writes
"""
def __init__(self, *args, **kwargs):
super(EOutputMem, self).__init__(*args, **kwargs)
self.out = StringIO()
def getvalue(self):
return self.out.getvalue()
def _write(self, f, msg):
super(EOutputMem, self)._write(self.out, msg)
class EuscanOutput(object):
"""
Class that handles output for euscan
"""
def __init__(self, config):
self.config = config
self.queries = defaultdict(dict)
self.current_query = None
def clean(self):
self.queries = defaultdict(dict)
self.current_query = None
def set_query(self, query):
self.current_query = query
if query is None:
return
if query in self.queries:
return
if self.config["format"]:
output = EOutputMem()
else:
output = EOutput()
self.queries[query] = {
"output": output,
"result": [],
"metadata": {},
}
def get_formatted_output(self, format_=None):
data = {}
for query in self.queries:
data[query] = {
"result": self.queries[query]["result"],
"metadata": self.queries[query]["metadata"],
"messages": self.queries[query]["output"].getvalue(),
}
format_ = format_ or self.config["format"]
if format_.lower() == "json":
return json.dumps(data, indent=self.config["indent"])
elif format_.lower() == "xml":
return dict_to_xml(data, indent=self.config["indent"])
elif format_.lower() == "dict":
return data
else:
raise TypeError("Invalid output format")
def result(self, cp, version, urls, handler, confidence):
from euscan.version import get_version_type
cpv = '%s-%s' % (cp, version)
urls = ' '.join(
transform_url(self.config, cpv, url) for url in urls.split()
)
if self.config['format'] in ['json', 'dict']:
_curr = self.queries[self.current_query]
_curr["result"].append(
{
"version": version,
"urls": urls.split(),
"handler": handler,
"confidence": confidence,
"type": get_version_type(version)
}
)
else:
if not self.config['quiet']:
print("Upstream Version:", pp.number("%s" % version), end=' ')
print(pp.path(" %s" % urls))
else:
print(pp.cpv("%s-%s" % (cp, version)) + ":", pp.path(urls))
def metadata(self, key, value, show=True):
if self.config["format"]:
self.queries[self.current_query]["metadata"][key] = value
elif show:
print("%s: %s" % (key.capitalize(), value))
def __getattr__(self, key):
if not self.config["quiet"] and self.current_query is not None:
output = self.queries[self.current_query]["output"]
return getattr(output, key)
else:
return lambda *x: None

204
src/euscan/scan.py Normal file

@@ -0,0 +1,204 @@
import os
import sys
from datetime import datetime
import portage
import gentoolkit.pprinter as pp
from gentoolkit.query import Query
from gentoolkit.package import Package
from euscan import CONFIG, BLACKLIST_PACKAGES
from euscan import handlers, output
from euscan.out import from_mirror
from euscan.helpers import version_blacklisted
from euscan.version import is_version_stable
from euscan.ebuild import package_from_ebuild
def filter_versions(cp, versions):
filtered = {}
for url, version, handler, confidence in versions:
# Try to keep the most specific urls (determined by the length)
if version in filtered and len(url) < len(filtered[version]["url"]):
continue
# Remove blacklisted versions
if version_blacklisted(cp, version):
continue
filtered[version] = {
"url": url,
"handler": handler,
"confidence": confidence
}
return [
(cp, filtered[version]["url"], version, filtered[version]["handler"],
filtered[version]["confidence"])
for version in filtered
]
def parse_src_uri(uris):
ret = {}
uris = uris.split()
uris.reverse()
while uris:
uri = uris.pop()
if '://' not in uri:
continue
if uris and uris[-1] == "->":
uris.pop() # operator
file_ = uris.pop()
else:
file_ = os.path.basename(uri)
if file_ not in ret:
ret[file_] = []
ret[file_].append(uri)
return ret
def reload_gentoolkit():
import gentoolkit
# Not used in recent versions
if not hasattr(gentoolkit.package, 'PORTDB'):
return
PORTDB = portage.db[portage.root]["porttree"].dbapi
if hasattr(gentoolkit.dbapi, 'PORTDB'):
gentoolkit.dbapi.PORTDB = PORTDB
if hasattr(gentoolkit.package, 'PORTDB'):
gentoolkit.package.PORTDB = PORTDB
if hasattr(gentoolkit.query, 'PORTDB'):
gentoolkit.query.PORTDB = PORTDB
def scan_upstream(query, on_progress=None):
"""
Scans the upstream searching new versions for the given query
"""
matches = []
if query.endswith(".ebuild"):
cpv = package_from_ebuild(query)
reload_gentoolkit()
if cpv:
matches = [Package(cpv)]
else:
matches = Query(query).find(
include_masked=True,
in_installed=False,
)
if not matches:
output.ewarn(
pp.warn("No package matching '%s'" % pp.pkgquery(query))
)
return None
matches = sorted(matches)
pkg = matches.pop()
while '9999' in pkg.version and len(matches):
pkg = matches.pop()
if not pkg:
output.ewarn(
pp.warn("Package '%s' only has a dev version (9999)"
% pp.pkgquery(pkg.cp))
)
return None
# useful data only for formatted output
start_time = datetime.now()
output.metadata("datetime", start_time.isoformat(), show=False)
output.metadata("cp", pkg.cp, show=False)
output.metadata("cpv", pkg.cpv, show=False)
if on_progress:
on_progress(increment=10)
if pkg.cp in BLACKLIST_PACKAGES:
output.ewarn(
pp.warn("Package '%s' is blacklisted" % pp.pkgquery(pkg.cp))
)
return None
if not CONFIG['quiet']:
if not CONFIG['format']:
pp.uprint(
" * %s [%s]" % (pp.cpv(pkg.cpv), pp.section(pkg.repo_name()))
)
pp.uprint()
else:
output.metadata("overlay", pp.section(pkg.repo_name()))
ebuild_path = pkg.ebuild_path()
if ebuild_path:
output.metadata(
"ebuild", pp.path(os.path.normpath(ebuild_path))
)
uris, homepage, description = pkg.environment(
('SRC_URI', 'HOMEPAGE', 'DESCRIPTION')
)
output.metadata("repository", pkg.repo_name())
output.metadata("homepage", homepage)
output.metadata("description", description)
else:
uris = pkg.environment('SRC_URI')
cpv = pkg.cpv
uris = parse_src_uri(uris)
uris_expanded = [
from_mirror(uri) if 'mirror://' in uri else uri for uri in uris
]
pkg._uris = uris
pkg._uris_expanded = uris_expanded
versions = handlers.scan(pkg, uris, on_progress)
cp, ver, rev = portage.pkgsplit(pkg.cpv)
result = filter_versions(cp, versions)
if on_progress:
on_progress(increment=10)
# output scan time for formatted output
scan_time = (datetime.now() - start_time).total_seconds()
output.metadata("scan_time", scan_time, show=False)
is_current_version_stable = is_version_stable(ver)
if len(result) > 0:
if not (CONFIG['format'] or CONFIG['quiet']):
print("")
for cp, url, version, handler, confidence in result:
if CONFIG["ignore-pre-release"]:
if not is_version_stable(version):
continue
if CONFIG["ignore-pre-release-if-stable"]:
if is_current_version_stable and \
not is_version_stable(version):
continue
if CONFIG['progress']:
print("", file=sys.stderr)
output.result(cp, version, url, handler, confidence)
return result
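
How parse_src_uri groups a SRC_URI string by on-disk filename, honouring the -> rename operator (hypothetical SRC_URI):

from euscan.scan import parse_src_uri

src_uri = ("http://example.com/foo-1.0.tar.gz "
           "http://example.com/dl?id=42 -> foo-1.0.zip")
assert parse_src_uri(src_uri) == {
    'foo-1.0.tar.gz': ['http://example.com/foo-1.0.tar.gz'],
    'foo-1.0.zip': ['http://example.com/dl?id=42'],
}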

89
src/euscan/version.py Normal file

@@ -0,0 +1,89 @@
import re
gentoo_unstable = ("alpha", "beta", "pre", "rc")
gentoo_types = ("alpha", "beta", "pre", "rc", "p")
def is_version_type_stable(version_type):
return version_type not in gentoo_unstable
def is_version_stable(version):
return is_version_type_stable(get_version_type(version))
def get_version_type(version):
types = []
if "9999" in version or "99999999" in version:
return "live"
for token in re.findall("[\._-]([a-zA-Z]+)", version):
if token in gentoo_types:
types.append(token)
if types:
return types[0] # TODO: consider returning all types
return "release"
# Stolen from pkg_resources, but importing it is not a good idea
component_re = re.compile(r'(\d+ | [a-z]+ | \.| -)', re.VERBOSE)
replace = \
{'pre': 'c', 'preview': 'c', '-': 'final-', 'rc': 'c', 'dev': '@'}.get
def _parse_version_parts(s):
for part in component_re.split(s):
part = replace(part, part)
if not part or part == '.':
continue
if part[:1] in '0123456789':
yield part.zfill(8) # pad for numeric comparison
else:
yield '*' + part
yield '*final' # ensure that alpha/beta/candidate are before final
def parse_version(s):
"""Convert a version string to a chronologically-sortable key
This is a rough cross between distutils' StrictVersion and LooseVersion;
if you give it versions that would work with StrictVersion, then it behaves
the same; otherwise it acts like a slightly-smarter LooseVersion. It is
*possible* to create pathological version coding schemes that will fool
this parser, but they should be very rare in practice.
The returned value will be a tuple of strings. Numeric portions of the
version are padded to 8 digits so they will compare numerically, but
without relying on how numbers compare relative to strings. Dots are
dropped, but dashes are retained. Trailing zeros between alpha segments
or dashes are suppressed, so that e.g. "2.4.0" is considered the same as
"2.4". Alphanumeric parts are lower-cased.
The algorithm assumes that strings like "-" and any alpha string that
alphabetically follows "final" represents a "patch level". So, "2.4-1"
is assumed to be a branch or patch of "2.4", and therefore "2.4.1" is
considered newer than "2.4-1", which in turn is newer than "2.4".
Strings like "a", "b", "c", "alpha", "beta", "candidate" and so on (that
come before "final" alphabetically) are assumed to be pre-release versions,
so that the version "2.4" is considered newer than "2.4a1".
Finally, to handle miscellaneous cases, the strings "pre", "preview", and
"rc" are treated as if they were "c", i.e. as though they were release
candidates, and therefore are not as new as a version string that does not
contain them, and "dev" is replaced with an '@' so that it sorts lower
than any other pre-release tag.
"""
parts = []
for part in _parse_version_parts(s.lower()):
if part.startswith('*'):
if part < '*final': # remove '-' before a prerelease tag
while parts and parts[-1] == '*final-':
parts.pop()
# remove trailing zeros from each series of numeric parts
while parts and parts[-1] == '00000000':
parts.pop()
parts.append(part)
return tuple(parts)
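
A few orderings implied by the algorithm described above (the first two come straight from the docstring):

from euscan.version import parse_version, get_version_type, is_version_stable

assert parse_version("2.4") < parse_version("2.4-1") < parse_version("2.4.1")
assert parse_version("1.0rc1") < parse_version("1.0")
assert get_version_type("1.0_beta2") == "beta"
assert not is_version_stable("1.0_beta2")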