# euscan-ng/src/euscan/helpers.py

import errno
import os
import re
import urllib.error
import urllib.parse
import urllib.request
import urllib.robotparser
from xml.dom.minidom import Document

import portage
from portage import dep

import euscan
from euscan import BLACKLIST_VERSIONS, CONFIG, ROBOTS_TXT_BLACKLIST_DOMAINS
from euscan.version import parse_version


def htop_vercmp(a, b):
    def fixver(v):
        # htop 0.11, 0.12 and 0.13 sort as if they were 0.1.1, 0.1.2, 0.1.3
        if v in ["0.11", "0.12", "0.13"]:
            v = "0.1." + v[3:]
        return v

    return simple_vercmp(fixver(a), fixver(b))


# Per-package version comparison quirks
VERSION_CMP_PACKAGE_QUIRKS = {"sys-process/htop": htop_vercmp}

_v_end = r"(?:(?:-|_)(?:pre|p|beta|b|alpha|a|rc|r)\d*)"
_v = r"((?:\d+)(?:(?:\.\d+)*)(?:[a-zA-Z]*?)(?:" + _v_end + "*))"
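# These patterns are meant to match upstream version strings such as
# "1.2.3", "2.0.1-rc2" or "1.0_beta1" (illustrative, not exhaustive).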


def cast_int_components(version):
    for i, obj in enumerate(version):
        try:
            version[i] = int(obj)
        except ValueError:
            pass
    return version
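# For example, cast_int_components(["1", "0", "rc2"]) returns [1, 0, "rc2"]:
# numeric components become ints, anything else stays a string.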


def simple_vercmp(a, b):
    if a == b:
        return 0

    # For sane versions
    r = portage.versions.vercmp(a, b)
    if r is not None:
        return r

    # Fallback
    a = parse_version(a)
    b = parse_version(b)
    if a < b:
        return -1
    else:
        return 1
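# For example, simple_vercmp("1.2", "1.10") is negative (component 10 sorts
# after component 2), and identical strings compare as 0.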


def vercmp(package, a, b):
    if package in VERSION_CMP_PACKAGE_QUIRKS:
        return VERSION_CMP_PACKAGE_QUIRKS[package](a, b)
    return simple_vercmp(a, b)
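# For example, vercmp("sys-process/htop", "0.11", "1.0") goes through the
# htop quirk above and effectively compares "0.1.1" against "1.0".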


def version_is_nightly(a, b):
    a = parse_version(a)
    b = parse_version(b)

    # Try to skip nightly builds when not wanted (www-apps/moodle)
    if len(a) != len(b) and len(b) == 2 and len(b[0]) == len("yyyymmdd"):
        if b[0][:4] != "0000":
            return True
    return False


def version_blacklisted(cp, version):
    rule = None
    cpv = "%s-%s" % (cp, version)

    # Check that the generated cpv can be used by portage
    if not portage.versions.catpkgsplit(cpv):
        return False

    for bv in BLACKLIST_VERSIONS:
        if dep.match_from_list(bv, [cpv]):
            rule = bv

    if rule:
        euscan.output.einfo("%s is blacklisted by rule %s" % (cpv, rule))
    return rule is not None
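# BLACKLIST_VERSIONS entries are portage dependency atoms; a hypothetical
# entry like "<app-foo/bar-1.0" would blacklist every bar version below 1.0.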


def version_change_end_sep(version):
    match = re.match(r".*(%s)" % _v_end, version)
    if not match:
        return None

    end = match.group(1)
    if end[0] == "_":
        newend = end.replace("_", "-")
    elif end[0] == "-":
        newend = end.replace("-", "_")
    else:
        return None

    return version.replace(end, newend)
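# For example, version_change_end_sep("1.0_rc1") returns "1.0-rc1" (and the
# reverse); it returns None when the version carries no such suffix.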


def version_filtered(cp, base, version, vercmp=vercmp):
    if vercmp(cp, base, version) >= 0:
        return True

    if version_blacklisted(cp, version):
        return True

    if version_is_nightly(base, version):
        return True

    return False
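# A candidate version is filtered out when it is not strictly newer than the
# base version, is blacklisted, or looks like an unwanted nightly build.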


def generate_templates_vars(version):
    ret = []

    part = split_version(version)
    for i in range(2, len(part)):
        ver = []
        var = []
        for j in range(i):
            ver.append(str(part[j]))
            var.append("${%d}" % j)

        ret.append((".".join(ver), ".".join(var)))
    ret.append((version, "${PV}"))
    ret.reverse()
    return ret
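# For example, generate_templates_vars("1.2.3") returns
# [("1.2.3", "${PV}"), ("1.2", "${0}.${1}")], longest substitution first.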


def template_from_url(url, version):
    prefix, chunks = url.split("://")
    chunks = chunks.split("/")

    for i in range(len(chunks)):
        chunk = chunks[i]

        subs = generate_templates_vars(version)
        for sub in subs:
            chunk = chunk.replace(sub[0], sub[1])

        chunks[i] = chunk

    return prefix + "://" + "/".join(chunks)
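# For example (hypothetical URL):
#   template_from_url("http://example.com/foo-1.2.3.tar.gz", "1.2.3")
#   -> "http://example.com/foo-${PV}.tar.gz"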


def url_from_template(url, version):
    components = split_version(version)

    url = url.replace("${PV}", version)
    for i in range(len(components)):
        url = url.replace("${%d}" % i, str(components[i]))

    return url
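# The inverse operation, e.g. (hypothetical URL):
#   url_from_template("http://example.com/foo-${PV}.tar.gz", "1.2.4")
#   -> "http://example.com/foo-1.2.4.tar.gz"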


# Stolen from distutils.LooseVersion
# Used by the brute-force code to increment versions
def split_version(version):
    component_re = re.compile(r"(\d+ | [a-z]+ | \.)", re.VERBOSE)
    components = [x for x in component_re.split(version) if x and x != "."]
    for i in range(len(components)):
        try:
            components[i] = int(components[i])
        except ValueError:
            pass
    return components
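# For example, split_version("1.2.3b") returns [1, 2, 3, "b"].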


def join_version(components):
    version = ""
    for i in range(len(components)):
        version += str(components[i])
        if i >= len(components) - 1:
            break
        if not isinstance(components[i], str) and not isinstance(
            components[i + 1], str
        ):
            version += "."
    return version
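# For example, join_version([1, 2, 3, "b"]) returns "1.2.3b"; a dot is only
# inserted between two numeric components.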


def increment_version(components, level):
    n = len(components)
    if level > n - 1 or level < 0:
        raise ValueError("invalid increment level: %d" % level)
    # Reset the numeric components after `level`, then bump the one at `level`
    for i in range(n, level + 1, -1):
        if isinstance(components[i - 1], int):
            components[i - 1] = 0
    if isinstance(components[level], int):
        components[level] += 1
    return components
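# For example, increment_version([1, 2, 3], 1) returns [1, 3, 0]: the
# component at `level` is bumped and the numeric components after it reset.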


def gen_versions(components, level):
    n = len(components)
    depth = level
    level = min(level, n)

    if not n:
        return []

    versions = []
    for i in range(n, n - level, -1):
        increment_version(components, i - 1)
        for j in range(depth):
            versions.append(list(components))
            increment_version(components, i - 1)

    return versions
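# For example, gen_versions([1, 2, 3], 2) returns
# [[1, 2, 4], [1, 2, 5], [1, 3, 0], [1, 4, 0]]: brute-force candidates that
# bump the last two components. Note that `components` is mutated in place.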


def timeout_for_url(url):
    if "sourceforge" in url:
        timeout = 15
    else:
        timeout = 5
    return timeout


class HeadRequest(urllib.request.Request):
    def get_method(self):
        return "HEAD"


# RobotParser cache
rpcache = {}


def urlallowed(url):
    if CONFIG["skip-robots-txt"]:
        return True

    protocol, domain = urllib.parse.urlparse(url)[:2]

    for bd in ROBOTS_TXT_BLACKLIST_DOMAINS:
        if re.match(bd, domain):
            return True

    for d in ["sourceforge", "berlios", "github.com"]:
        if d in domain:
            return True

    if protocol == "ftp":
        return True

    baseurl = "%s://%s" % (protocol, domain)
    robotsurl = urllib.parse.urljoin(baseurl, "robots.txt")

    if baseurl in rpcache:
        rp = rpcache[baseurl]
    else:
        from socket import getdefaulttimeout, setdefaulttimeout

        timeout = getdefaulttimeout()
        setdefaulttimeout(5)

        rp = urllib.robotparser.RobotFileParser()
        rp.set_url(robotsurl)
        try:
            rp.read()
            rpcache[baseurl] = rp
        except Exception:
            rp = None
        setdefaulttimeout(timeout)

    return rp.can_fetch(CONFIG["user-agent"], url) if rp else True


def urlopen(url, timeout=None, verb="GET"):
    if not urlallowed(url):
        euscan.output.einfo("Url '%s' blocked by robots.txt" % url)
        return None

    if not timeout:
        timeout = timeout_for_url(url)

    if verb == "GET":
        request = urllib.request.Request(url)
    elif verb == "HEAD":
        request = HeadRequest(url)
    else:
        return None

    request.add_header("User-Agent", CONFIG["user-agent"])

    handlers = []

    if CONFIG["cache"]:
        from euscan.cache import CacheHandler

        handlers.append(CacheHandler(CONFIG["cache"]))

    if CONFIG["verbose"]:
        debuglevel = CONFIG["verbose"] - 1
        handlers.append(urllib.request.HTTPHandler(debuglevel=debuglevel))

    opener = urllib.request.build_opener(*handlers)
    return opener.open(request, None, timeout)
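# Minimal usage sketch (hypothetical URL): a robots.txt-aware HEAD request
# using the configured user-agent; returns None when the URL is blocked.
#
#   fp = urlopen("https://example.com/foo.tar.gz", verb="HEAD")
#   if fp:
#       print(fp.info())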


def tryurl(fileurl, template):
    result = True

    if not urlallowed(fileurl):
        euscan.output.einfo("Url '%s' blocked by robots.txt" % fileurl)
        return None

    euscan.output.ebegin("Trying: " + fileurl)

    try:
        basename = os.path.basename(fileurl)

        fp = urlopen(fileurl, verb="HEAD")
        if not fp:
            euscan.output.eend(errno.EPERM)
            return None

        headers = fp.info()

        # Some URLs return Content-disposition with a different filename.
        # Disable the check for now (I have not seen false positives).
        # if 'Content-disposition' in headers and \
        #         basename not in headers['Content-disposition']:
        #     result = None
        if "Content-Length" in headers and headers["Content-Length"] == "0":
            result = None
        elif "Content-Type" in headers and "text/html" in headers["Content-Type"]:
            result = None
        elif (
            "Content-Type" in headers
            and "application/x-httpd-php" in headers["Content-Type"]
        ):
            result = None
        elif fp.geturl() != fileurl:
            regex = regex_from_template(template)
            baseregex = regex_from_template(os.path.basename(template))
            basename2 = os.path.basename(fp.geturl())

            # Redirect to another (earlier?) version
            if basename != basename2 and (
                re.match(regex, fp.geturl()) or re.match(baseregex, basename2)
            ):
                result = None

        if result:
            result = (fp.geturl(), fp.info())
    except urllib.error.URLError:
        result = None
    except IOError:
        result = None

    euscan.output.eend(errno.ENOENT if not result else 0)

    return result
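# tryurl() returns a (final_url, headers) tuple when the candidate file looks
# real, and None when it is missing, is an HTML page, or redirects to a
# different version.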


def regex_from_template(template):
    # Escape
    regexp = re.escape(template)

    # Unescape specific stuff
    regexp = regexp.replace(r"\$\{", "${")
    regexp = regexp.replace(r"\}", "}")
    regexp = regexp.replace(r"}\.$", "}.$")

    # Replace ${\d+}
    # regexp = regexp.replace('${0}', r'([\d]+?)')
    regexp = re.sub(r"(\$\{\d+\}(\.?))+", r"([\\w\.]+?)", regexp)

    # regexp = re.sub(r'(\$\{\d+\}\.?)+', r'([\w]+?)', regexp)
    # regexp = re.sub(r'(\$\{\d+\}\.+)+', r'(.+?)\.', regexp)
    # regexp = re.sub(r'(\$\{\d+\})+', '(.+?)', regexp)

    # Full version
    regexp = regexp.replace("${PV}", _v)

    # End
    regexp = regexp + r"/?$"

    return regexp
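# For example (hypothetical template), "foo-${PV}.tar.gz" becomes an escaped
# pattern in which ${PV} is replaced by the version regex _v, so the result
# matches names like "foo-1.2.3.tar.gz".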


def basedir_from_template(template):
    idx = template.find("${")
    if idx == -1:
        return template

    idx = template[0:idx].rfind("/")
    if idx == -1:
        return ""

    return template[0:idx]
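# For example (hypothetical URL),
# basedir_from_template("http://example.com/dl/${0}/foo-${PV}.tar.gz")
# returns "http://example.com/dl": everything before the first variable.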


def generate_scan_paths(url):
    prefix, chunks = url.split("://")
    chunks = chunks.split("/")
    steps = []

    path = prefix + ":/"
    for chunk in chunks:
        if "${" in chunk:
            steps.append((path, "^(?:|.*/)" + regex_from_template(chunk)))
            path = ""
        else:
            path += "/"
            path += chunk

    return steps
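# For the hypothetical template "http://example.com/dl/${0}/foo-${PV}.tar.gz"
# this yields two steps: scan "http://example.com/dl" for entries matching
# "${0}", then scan each match for files matching "foo-${PV}.tar.gz".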


def parse_mirror(uri):
    from random import shuffle

    mirrors = portage.settings.thirdpartymirrors()

    if not uri.startswith("mirror://"):
        return uri

    eidx = uri.find("/", 9)
    if eidx == -1:
        euscan.output.einfo("Invalid mirror definition in SRC_URI:\n")
        euscan.output.einfo("  %s\n" % (uri))
        return None

    mirrorname = uri[9:eidx]
    path = uri[eidx + 1:]

    if mirrorname in mirrors:
        mirrors = mirrors[mirrorname]
        shuffle(mirrors)
        uri = mirrors[0].strip("/") + "/" + path
    else:
        euscan.output.einfo("No known mirror by the name: %s" % (mirrorname))
        return None

    return uri
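# For example, parse_mirror("mirror://gnu/foo/foo-1.0.tar.gz") would resolve
# the "gnu" entry from portage's thirdpartymirrors() and return the path on a
# randomly chosen mirror (hypothetical mirror name).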


def dict_to_xml(data, indent):
    doc = Document()
    root = doc.createElement("euscan")
    doc.appendChild(root)

    def _set_value(parent, value):
        if isinstance(value, dict):
            for k, v in list(value.items()):
                node = doc.createElement(k)
                _set_value(node, v)
                parent.appendChild(node)
        elif isinstance(value, list):
            for item in value:
                node = doc.createElement("value")
                text = doc.createTextNode(item)
                node.appendChild(text)
                parent.appendChild(node)
        else:
            text = doc.createTextNode(str(value))
            parent.appendChild(text)

    for key, value in list(data.items()):
        node = doc.createElement("package")
        node.setAttribute("name", key)
        _set_value(node, value)
        root.appendChild(node)

    return doc.toprettyxml(indent=" " * indent)
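# Minimal usage sketch (hypothetical data):
#
#   print(dict_to_xml({"app-foo/bar": {"cpv": ["app-foo/bar-1.1"]}}, 2))
#
# would print something like:
#
#   <?xml version="1.0" ?>
#   <euscan>
#     <package name="app-foo/bar">
#       <cpv>
#         <value>app-foo/bar-1.1</value>
#       </cpv>
#     </package>
#   </euscan>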