euscan-ng/pym/euscan/helpers.py

488 lines
12 KiB
Python
Raw Normal View History

import os
import re
import errno
2019-12-05 17:46:19 +01:00
import urllib.request, urllib.error, urllib.parse
from xml.dom.minidom import Document
import portage
from portage import dep
try:
from urllib import robotparser
from urllib import urlparse
except ImportError:
2019-12-05 17:46:19 +01:00
import urllib.robotparser
import urllib.parse
import euscan
from euscan import CONFIG, BLACKLIST_VERSIONS, ROBOTS_TXT_BLACKLIST_DOMAINS
from euscan.version import parse_version
2012-04-28 18:16:05 +02:00
def htop_vercmp(a, b):
def fixver(v):
if v in ['0.11', '0.12', '0.13']:
v = '0.1.' + v[3:]
return v
return simple_vercmp(fixver(a), fixver(b))
VERSION_CMP_PACKAGE_QUIRKS = {
2012-04-28 18:16:05 +02:00
'sys-process/htop': htop_vercmp
}
_v_end = r'(?:(?:-|_)(?:pre|p|beta|b|alpha|a|rc|r)\d*)'
_v = r'((?:\d+)(?:(?:\.\d+)*)(?:[a-zA-Z]*?)(?:' + _v_end + '*))'
def cast_int_components(version):
for i, obj in enumerate(version):
try:
version[i] = int(obj)
except ValueError:
pass
return version
2012-04-28 18:16:05 +02:00
def simple_vercmp(a, b):
if a == b:
return 0
# For sane versions
r = portage.versions.vercmp(a, b)
if r is not None:
return r
# Fallback
a = parse_version(a)
b = parse_version(b)
if a < b:
return -1
else:
return 1
2012-04-28 18:16:05 +02:00
def vercmp(package, a, b):
if package in VERSION_CMP_PACKAGE_QUIRKS:
return VERSION_CMP_PACKAGE_QUIRKS[package](a, b)
return simple_vercmp(a, b)
2012-04-28 18:16:05 +02:00
def version_is_nightly(a, b):
a = parse_version(a)
b = parse_version(b)
# Try to skip nightly builds when not wanted (www-apps/moodle)
if len(a) != len(b) and len(b) == 2 and len(b[0]) == len('yyyymmdd'):
if b[0][:4] != '0000':
return True
return False
2012-04-28 18:16:05 +02:00
def version_blacklisted(cp, version):
rule = None
cpv = '%s-%s' % (cp, version)
# Check that the generated cpv can be used by portage
if not portage.versions.catpkgsplit(cpv):
return False
for bv in BLACKLIST_VERSIONS:
if dep.match_from_list(bv, [cpv]):
rule = bv
None
if rule:
euscan.output.einfo("%s is blacklisted by rule %s" % (cpv, rule))
return rule is not None
2012-04-28 18:16:05 +02:00
def version_change_end_sep(version):
match = re.match(r".*(%s)" % _v_end, version)
if not match:
return None
end = match.group(1)
if end[0] == '_':
newend = end.replace('_', '-')
elif end[0] == '-':
newend = end.replace('-', '_')
else:
return None
return version.replace(end, newend)
2012-04-28 18:16:05 +02:00
def version_filtered(cp, base, version, vercmp=vercmp):
if vercmp(cp, base, version) >= 0:
return True
if version_blacklisted(cp, version):
return True
if version_is_nightly(base, version):
return True
return False
2012-04-28 18:16:05 +02:00
def generate_templates_vars(version):
ret = []
part = split_version(version)
for i in range(2, len(part)):
ver = []
var = []
for j in range(i):
ver.append(str(part[j]))
var.append('${%d}' % j)
ret.append((".".join(ver), ".".join(var)))
ret.append((version, '${PV}'))
ret.reverse()
return ret
2012-04-28 18:16:05 +02:00
def template_from_url(url, version):
prefix, chunks = url.split('://')
chunks = chunks.split('/')
for i in range(len(chunks)):
chunk = chunks[i]
subs = generate_templates_vars(version)
for sub in subs:
chunk = chunk.replace(sub[0], sub[1])
chunks[i] = chunk
return prefix + "://" + "/".join(chunks)
2012-04-28 18:16:05 +02:00
def url_from_template(url, version):
components = split_version(version)
url = url.replace('${PV}', version)
for i in range(len(components)):
url = url.replace('${%d}' % i, str(components[i]))
return url
2012-04-28 18:16:05 +02:00
# Stolen from distutils.LooseVersion
# Used for brute force to increment the version
def split_version(version):
component_re = re.compile(r'(\d+ | [a-z]+ | \.)', re.VERBOSE)
2019-12-05 17:46:19 +01:00
components = [x for x in component_re.split(version) if x and x != '.']
for i in range(len(components)):
try:
components[i] = int(components[i])
except ValueError:
pass
return components
2012-04-28 18:16:05 +02:00
def join_version(components):
version = ""
for i in range(len(components)):
version += str(components[i])
if i >= len(components) - 1:
break
if type(components[i]) != str and type(components[i + 1]) != str:
version += "."
return version
2012-04-28 18:16:05 +02:00
def increment_version(components, level):
n = len(components)
if level > n - 1 or level < 0:
raise Exception
for i in range(n, level + 1, -1):
if type(components[i - 1]) == int:
components[i - 1] = 0
if type(components[level]) == int:
components[level] += 1
return components
2012-04-28 18:16:05 +02:00
def gen_versions(components, level):
n = len(components)
depth = level
level = min(level, n)
if not n:
return []
versions = []
for i in range(n, n - level, -1):
increment_version(components, i - 1)
for j in range(depth):
versions.append(list(components))
increment_version(components, i - 1)
return versions
2012-04-28 18:16:05 +02:00
def timeout_for_url(url):
if 'sourceforge' in url:
timeout = 15
else:
timeout = 5
return timeout
2012-04-28 18:16:05 +02:00
2019-12-05 17:46:19 +01:00
class HeadRequest(urllib.request.Request):
def get_method(self):
return "HEAD"
2012-04-28 18:16:05 +02:00
# RobotParser cache
rpcache = {}
2012-04-28 18:16:05 +02:00
def urlallowed(url):
if CONFIG['skip-robots-txt']:
return True
2019-12-05 17:46:19 +01:00
protocol, domain = urllib.parse.urlparse(url)[:2]
for bd in ROBOTS_TXT_BLACKLIST_DOMAINS:
if re.match(bd, domain):
return True
for d in ['sourceforge', 'berlios', 'github.com']:
if d in domain:
return True
if protocol == 'ftp':
return True
baseurl = '%s://%s' % (protocol, domain)
2019-12-05 17:46:19 +01:00
robotsurl = urllib.parse.urljoin(baseurl, 'robots.txt')
2012-04-28 18:16:05 +02:00
if baseurl in rpcache:
rp = rpcache[baseurl]
else:
from socket import setdefaulttimeout, getdefaulttimeout
timeout = getdefaulttimeout()
setdefaulttimeout(5)
2019-12-05 17:46:19 +01:00
rp = urllib.robotparser.RobotFileParser()
rp.set_url(robotsurl)
try:
rp.read()
rpcache[baseurl] = rp
except:
rp = None
setdefaulttimeout(timeout)
return rp.can_fetch(CONFIG['user-agent'], url) if rp else True
2012-04-28 18:16:05 +02:00
def urlopen(url, timeout=None, verb="GET"):
if not urlallowed(url):
euscan.output.einfo("Url '%s' blocked by robots.txt" % url)
return None
if not timeout:
timeout = timeout_for_url(url)
if verb == 'GET':
2019-12-05 17:46:19 +01:00
request = urllib.request.Request(url)
elif verb == 'HEAD':
request = HeadRequest(url)
else:
return None
request.add_header('User-Agent', CONFIG['user-agent'])
handlers = []
if CONFIG['cache']:
from cache import CacheHandler
handlers.append(CacheHandler(CONFIG['cache']))
if CONFIG['verbose']:
debuglevel = CONFIG['verbose'] - 1
2019-12-05 17:46:19 +01:00
handlers.append(urllib.request.HTTPHandler(debuglevel=debuglevel))
2019-12-05 17:46:19 +01:00
opener = urllib.request.build_opener(*handlers)
return opener.open(request, None, timeout)
2012-04-28 18:16:05 +02:00
def tryurl(fileurl, template):
result = True
if not urlallowed(fileurl):
euscan.output.einfo("Url '%s' blocked by robots.txt" % fileurl)
return None
euscan.output.ebegin("Trying: " + fileurl)
try:
basename = os.path.basename(fileurl)
fp = urlopen(fileurl, verb='HEAD')
if not fp:
euscan.output.eend(errno.EPERM)
return None
headers = fp.info()
# Some URLs return Content-disposition with different filename
# Disable check for now (I have no seen false positives)
#if 'Content-disposition' in headers and \
# basename not in headers['Content-disposition']:
# result = None
if 'Content-Length' in headers and headers['Content-Length'] == '0':
result = None
2012-04-28 18:16:05 +02:00
elif 'Content-Type' in headers and \
'text/html' in headers['Content-Type']:
result = None
2012-04-28 18:16:05 +02:00
elif 'Content-Type' in headers and \
'application/x-httpd-php' in headers['Content-Type']:
result = None
elif fp.geturl() != fileurl:
regex = regex_from_template(template)
baseregex = regex_from_template(os.path.basename(template))
basename2 = os.path.basename(fp.geturl())
# Redirect to another (earlier?) version
2012-04-28 18:16:05 +02:00
if basename != basename2 and (re.match(regex, fp.geturl()) or \
re.match(baseregex, basename2)):
result = None
if result:
result = (fp.geturl(), fp.info())
2019-12-05 17:46:19 +01:00
except urllib.error.URLError:
result = None
except IOError:
result = None
euscan.output.eend(errno.ENOENT if not result else 0)
return result
2012-04-28 18:16:05 +02:00
def regex_from_template(template):
# Escape
regexp = re.escape(template)
# Unescape specific stuff
regexp = regexp.replace('\$\{', '${')
regexp = regexp.replace('\}', '}')
regexp = regexp.replace('}\.$', '}.$')
# Replace ${\d+}
#regexp = regexp.replace('${0}', r'([\d]+?)')
2020-01-14 16:29:58 +01:00
regexp = re.sub(r'(\$\{\d+\}(\.?))+', r'([\\w\.]+?)', regexp)
#regexp = re.sub(r'(\$\{\d+\}\.?)+', r'([\w]+?)', regexp)
#regexp = re.sub(r'(\$\{\d+\}\.+)+', '(.+?)\.', regexp)
#regexp = re.sub(r'(\$\{\d+\})+', '(.+?)', regexp)
# Full version
regexp = regexp.replace('${PV}', _v)
# End
regexp = regexp + r'/?$'
return regexp
2012-04-28 18:16:05 +02:00
def basedir_from_template(template):
idx = template.find('${')
if idx == -1:
return template
idx = template[0:idx].rfind('/')
if idx == -1:
return ""
return template[0:idx]
def generate_scan_paths(url):
prefix, chunks = url.split('://')
chunks = chunks.split('/')
steps = []
path = prefix + ":/"
for chunk in chunks:
if '${' in chunk:
steps.append((path, '^(?:|.*/)' + regex_from_template(chunk)))
path = ""
else:
path += "/"
path += chunk
return steps
def parse_mirror(uri):
from random import shuffle
mirrors = portage.settings.thirdpartymirrors()
if not uri.startswith("mirror://"):
return uri
eidx = uri.find("/", 9)
if eidx == -1:
euscan.output.einfo("Invalid mirror definition in SRC_URI:\n")
euscan.output.einfo(" %s\n" % (uri))
return None
mirrorname = uri[9:eidx]
2012-04-28 18:16:05 +02:00
path = uri[eidx + 1:]
if mirrorname in mirrors:
mirrors = mirrors[mirrorname]
shuffle(mirrors)
uri = mirrors[0].strip("/") + "/" + path
else:
euscan.output.einfo("No known mirror by the name: %s" % (mirrorname))
return None
return uri
def dict_to_xml(data, indent):
doc = Document()
root = doc.createElement("euscan")
doc.appendChild(root)
def _set_value(parent, value):
if isinstance(value, dict):
2019-12-05 17:46:19 +01:00
for k, v in list(value.items()):
node = doc.createElement(k)
_set_value(node, v)
parent.appendChild(node)
elif isinstance(value, list):
for item in value:
node = doc.createElement("value")
text = doc.createTextNode(item)
node.appendChild(text)
parent.appendChild(node)
else:
2019-12-05 17:46:19 +01:00
text = doc.createTextNode(str(value))
parent.appendChild(text)
2019-12-05 17:46:19 +01:00
for key, value in list(data.items()):
node = doc.createElement("package")
node.setAttribute("name", key)
_set_value(node, value)
root.appendChild(node)
return doc.toprettyxml(indent=" " * indent)