This commit is contained in:
volpino
2012-07-20 11:58:07 +02:00
33 changed files with 257 additions and 258 deletions
@@ -8,14 +8,12 @@ from xml.etree.ElementTree import iterparse, ParseError
from django.db.transaction import commit_on_success
from django.core.management.color import color_style
from euscan.helpers import get_version_type
from euscan.version import get_version_type
from djeuscan.processing import FakeLogger
from djeuscan.models import Package, Version, VersionLog
PORTDB = portage.db[portage.root]["porttree"].dbapi
PORTDB = None
class ScanPortage(object):
def __init__(self, logger=None, no_log=False, purge_packages=False,
@@ -25,6 +23,10 @@ class ScanPortage(object):
self.purge_packages = purge_packages
self.purge_versions = purge_versions
if not PORTDB: # Lazy loading for portdb
global PORTDB
PORTDB = portage.db[portage.root]["porttree"].dbapi
self.style = color_style()
self._cache = {'packages': {}, 'versions': {}}
@@ -342,9 +344,14 @@ def scan_portage(packages=None, category=None, no_log=False,
if prefetch:
logger.info('Prefetching objects...')
for package in Package.objects.all():
ppackages = Package.objects.all()
pversions = Version.objects.select_related('package').all()
if category:
ppackages = ppackages.filter(category=category)
pversions = pversions.filter(package__category=category)
for package in ppackages:
scan_handler.cache_store_package(package)
for version in Version.objects.select_related('package').all():
for version in pversions:
scan_handler.cache_store_version(version)
logger.info('done')
@@ -153,9 +153,12 @@ def scan_upstream(packages=None, purge_versions=False,
for pkg in packages:
try:
scan_handler.scan('%s/%s' % (pkg.category, pkg.name))
package = '%s/%s' % (pkg.category, pkg.name)
except AttributeError:
scan_handler.scan(pkg)
package = pkg
logger.info('Scanning %s' % package)
scan_handler.scan(package)
scan_handler.purge_old_versions()
+128 -188
View File
@@ -9,10 +9,8 @@ from celery.task import task, group, chord
from django.conf import settings
from djeuscan.models import Package, RefreshPackageQuery
from djeuscan.processing.misc import regen_rrds, update_counters, \
update_portage_trees
from djeuscan.processing.scan import scan_metadata, scan_portage, scan_upstream
from djeuscan.processing import scan, misc
from djeuscan.utils import queryset_iterator
class TaskFailedException(Exception):
"""
@@ -20,103 +18,85 @@ class TaskFailedException(Exception):
"""
pass
def _chunks(it, n):
def group_one(task, seq, *args, **kwargs):
"""
Chunk generator, takes an iterator and the desired size of the chunk
Create a group of tasks, each task handle one element of seq
"""
for first in it:
yield [first] + list(islice(it, n - 1))
tasks = []
for i in seq:
tasks.append(task.subtask(args=[seq[i]] + list(args), kwargs=kwargs))
return group(tasks)
def _run_in_chunks(task, packages, kwargs=None,
concurrently=settings.TASKS_CONCURRENTLY,
n=settings.TASKS_SUBTASK_PACKAGES):
def group_chunks(task, seq, n, *args, **kwargs):
"""
Launches a group at a time with <concurrently> subtasks.
Each subtask has <n> packages to handle
Creates a group of tasks, each subtask has <n> elements to handle
"""
output = []
chunk_generator = _chunks(iter(packages), n)
done = False
while not done:
tasks = []
for _ in range(concurrently):
try:
chunk = chunk_generator.next()
except StopIteration:
done = True
else:
tasks.append(task.subtask((chunk, ), kwargs))
output.extend(group(tasks)())
return output
tasks = []
for i in xrange(0, len(seq), n):
tasks.append(task.subtask(args=[seq[i:i+n]] + list(args), kwargs=kwargs))
return group(tasks)
@task
def regen_rrds_task():
def regen_rrds():
"""
Regenerate RRDs
"""
return regen_rrds()
misc.regen_rrds()
return True
@task
def update_counters_task(fast=True):
def update_counters(fast=False):
"""
Updates counters
"""
return update_counters(fast=fast)
logger = update_counters.get_logger()
logger.info("Updating counters (fast=%s)...", fast)
misc.update_counters(fast=fast)
logger.info("Done")
return True
@task
def _scan_metadata_task(packages):
def scan_metadata(packages=[], category=None):
"""
Scans metadata for the given set of packages
"""
logger = _scan_metadata_task.get_logger()
logger.info("Starting metadata scanning subtask for %d packages...",
len(packages))
logger = scan_metadata.get_logger()
scan_metadata(
if packages:
logger.info("Starting metadata scan for %d packages...",
len(packages))
elif category:
logger.info("Starting metadata scan for %s...",
category)
else:
logger.info("Starting metadata scan...")
scan.scan_metadata(
packages=packages,
category=category,
logger=logger,
)
return True
@task
def scan_metadata_list_task(query):
"""
Runs a parallel metadata scan for packages in the query list (space
separated string). Task used only from the web interface.
"""
return _run_in_chunks(_scan_metadata_task, [p for p in query.split()])
@task
def scan_metadata_all_task():
"""
Runs a parallel metadata scan for all packages
"""
return _run_in_chunks(_scan_metadata_task, Package.objects.all())
@task
def _scan_portage_task(packages, category=None, no_log=False,
purge_packages=False, purge_versions=False,
prefetch=False):
def scan_portage(packages=[], category=None,
no_log=False, purge_packages=False,
purge_versions=False, prefetch=False):
"""
Scans portage for the given set of packages
"""
logger = _scan_portage_task.get_logger()
if packages:
logger.info("Starting portage scanning subtask for %d packages...",
len(packages))
else:
logger.info("Starting portage scanning for all packages...")
logger = scan_portage.get_logger()
scan_portage(
if packages:
logger.info("Starting portage scan for %d packages...",
len(packages))
elif category:
logger.info("Starting portage scan for %s...",
category)
else:
logger.info("Starting portage scan...")
scan.scan_portage(
packages=packages,
category=category,
no_log=no_log,
@@ -125,131 +105,77 @@ def _scan_portage_task(packages, category=None, no_log=False,
prefetch=prefetch,
logger=logger,
)
return True
@task
def scan_portage_list_task(query, no_log=False, purge_packages=False,
purge_versions=False, prefetch=False):
"""
Runs a parallel portage scan for packages in the query list (space
separated string). Task used only from the web interface.
"""
kwargs = {"no_log": no_log, "purge_packages": purge_packages,
"purge_versions": purge_versions, "prefetch": prefetch}
return _run_in_chunks(
_scan_portage_task, [p for p in query.split()], kwargs
)
@task
def scan_portage_all_task(no_log=False, purge_packages=False,
purge_versions=False, prefetch=False):
"""
Runs a syncronous portage scan for all packages
"""
_scan_portage_task(
packages=None,
category=None,
no_log=no_log,
purge_packages=purge_packages,
purge_versions=purge_versions,
prefetch=prefetch,
)
@task
def _scan_upstream_task(packages, purge_versions=False):
def scan_upstream(packages=[], purge_versions=False):
"""
Scans upstream for the given set of packages
"""
logger = _scan_upstream_task.get_logger()
logger = scan_upstream.get_logger()
logger.info("Starting upstream scanning subtask for %d packages...",
len(packages))
# Log what is about to be scanned. The "all packages" message has no
# format placeholder, so it must not be given a formatting argument
# (logging would report a string-formatting error for the record).
if packages:
    logger.info("Starting upstream scan subtask for %d packages...",
                len(packages))
else:
    logger.info("Starting upstream scan...")
result = scan_upstream(
scan.scan_upstream(
packages=packages,
purge_versions=purge_versions,
logger=logger,
)
# TODO: implement some kind of error raising in case of failure
#if not result:
# raise TaskFailedException
return result
return True
@task
def scan_upstream_list_task(query, purge_versions=False):
"""
Runs a parallel upstream scan for packages in the query list (space
separated string). Task used only from the web interface.
"""
kwargs = {"purge_versions": purge_versions}
return _run_in_chunks(_scan_upstream_task, [p for p in query.split()],
kwargs)
@task
def scan_upstream_all_task(purge_versions=False):
"""
Runs a parallel portage scan for all packages
"""
kwargs = {"purge_versions": purge_versions}
return _run_in_chunks(
_scan_upstream_task,
Package.objects.all().order_by('?'),
kwargs
)
@task
def update_portage_trees_task():
def update_portage_trees():
"""
Update portage tree
"""
logger = update_portage_trees_task.get_logger()
update_portage_trees(logger=logger)
logger = update_portage_trees.get_logger()
misc.update_portage_trees(logger=logger)
return True
@task
def update_task(update_portage_trees=True, scan_portage=True,
scan_metadata=True, scan_upstream=True, update_counters=True):
"""
Update the whole euscan system
"""
if update_portage_trees:
update_portage_trees_task()
if scan_portage:
scan_portage_all_task(prefetch=True, purge_packages=True,
purge_versions=True)
def update_portage(packages=None):
    """
    Compose the portage update steps with ``|`` and launch the result.

    The steps, in order: update the portage trees, scan portage (purging
    packages and versions, with prefetch), scan metadata once per entry of
    ``portage.settings.categories``, then update the counters with
    ``fast=False``.

    ``packages`` is accepted but not used by this function.
    Always returns True once the composed workflow has been invoked.
    """
    workflow = (
        update_portage_trees.s() |
        scan_portage.si(purge_packages=True, purge_versions=True, prefetch=True) |
        #scan_metadata.si() |
        group_one(scan_metadata, portage.settings.categories) |
        update_counters.si(fast=False)
    )
    workflow()
    return True
# metadata and upstream scan can run concurrently, launch them
# in a group and wait for them to finish
tasks = []
if scan_metadata:
tasks.append(scan_metadata_all_task.subtask())
@task
def update_upstream():
if settings.TASKS_UPSTREAM_GROUPS >= 1:
packages = Package.objects.all()
if scan_upstream:
tasks.append(scan_upstream_all_task.subtask())
if update_counters:
chord(tasks)(
# immutable means that the result of previous tasks is not passed
update_counters_task.subtask((), {"fast": False}, immutable=True)
)
scan_upstream_sub = group_chunks(scan_upstream, packages,
settings.TASKS_UPSTREAM_GROUPS,
purge_versions=True)
else:
group(tasks)()
scan_upstream_sub = scan_upstream.si(purge_versions=True)
(
scan_upstream_sub |
update_counters.si(fast=False)
)()
return True
@task
def scan_package_task(package):
_scan_portage_task([package], purge_packages=True, purge_versions=True)
_scan_metadata_task([package])
_scan_upstream_task([package])
def scan_package(package):
scan_portage([package], purge_packages=True, purge_versions=True)
scan_metadata([package])
scan_upstream([package])
return True
# Periodic tasks
@task(rate_limit="1/m")
def scan_package_user(package):
    """
    Run scan_package for ``package``.

    Registered as a separate task with rate_limit="1/m"; always returns
    True after scan_package has been called.
    """
    scan_package(package)
    return True
@task
def consume_refresh_package_request():
@@ -257,25 +183,39 @@ def consume_refresh_package_request():
Satisfies user requests for package refreshing, runs every minute
"""
try:
obj = RefreshPackageQuery.objects.all().order_by('-priority')[0]
query = RefreshPackageQuery.objects.all().order_by('-priority')[0]
except IndexError:
return {}
else:
result = scan_package_task(obj.package)
obj.delete()
return result
return
pkg = query.package
query.delete()
scan_package_user.delay(pkg)
admin_tasks = [
regen_rrds_task,
update_counters_task,
scan_metadata_list_task,
scan_metadata_all_task,
scan_portage_all_task,
scan_portage_list_task,
scan_upstream_all_task,
scan_upstream_list_task,
update_portage_trees_task,
update_task,
scan_package_task,
regen_rrds,
update_counters,
scan_metadata,
scan_portage,
scan_upstream,
update_portage_trees,
update_portage,
update_upstream,
scan_package,
]
""" Chunk helpers (chunks can't use keyword arguments) """
@task
def scan_metadata_category(category):
    """
    Positional-argument wrapper: call scan_metadata(category=category).

    Always returns True after the call.
    """
    scan_metadata(category=category)
    return True
@task
def scan_upstream_purge(*packages):
    """
    Positional-argument wrapper around scan_upstream.

    The packages received as positional arguments are forwarded as a single
    tuple, with purge_versions=True. Always returns True after the call.
    """
    scan_upstream(packages, purge_versions=True)
    return True
@@ -1,5 +1,5 @@
{% load url from future %}
{% load euscan_tags %}
{% load djeuscan_helpers %}
<table id="table" class="display">
<thead>
@@ -1,5 +1,5 @@
{% load url from future %}
{% load euscan_tags %}
{% load djeuscan_helpers %}
<table id="table" class="display">
<thead>
@@ -1,5 +1,5 @@
{% load url from future %}
{% load euscan_tags %}
{% load djeuscan_helpers %}
<table id="table" class="display">
<thead>
@@ -1,4 +1,4 @@
{% load euscan_tags %}
{% load djeuscan_helpers %}
{% load mul %}
{% load sub %}
{% load div %}
@@ -1,8 +1,8 @@
{% load euscan_tags %}
{% load djeuscan_helpers %}
{% load sub %}
{% load div %}
{% load mul %}
{% load euscan_tags %}
{% load djeuscan_helpers %}
{% load url from future %}
@@ -1,6 +1,6 @@
{% extends "euscan/_datatable.html" %}
{% load url from future %}
{% load euscan_tags %}
{% load djeuscan_helpers %}
{% block title %}
{{ block.super }} - Watched categories
@@ -1,6 +1,6 @@
{% extends "euscan/_datatable.html" %}
{% load url from future %}
{% load euscan_tags %}
{% load djeuscan_helpers %}
{% block title %}
{{ block.super }} - Watched herds
@@ -1,6 +1,6 @@
{% extends "euscan/_datatable.html" %}
{% load url from future %}
{% load euscan_tags %}
{% load djeuscan_helpers %}
{% block title %}
{{ block.super }} - Watched maintainers
@@ -1,6 +1,6 @@
{% extends "euscan/_datatable.html" %}
{% load euscan_tags %}
{% load djeuscan_helpers %}
{% load url from future %}
{% block title %}
@@ -1,7 +1,7 @@
{% extends "euscan/_datatable.html" %}
{% load url from future %}
{% load euscan_tags %}
{% load djeuscan_helpers %}
{% block title %}
{{ block.super }} - Watched packages
@@ -1,6 +1,6 @@
{% extends "euscan/_datatable.html" %}
{% load euscan_tags %}
{% load djeuscan_helpers %}
{% load url from future %}
{% block title %}
@@ -1,6 +1,6 @@
{% extends "euscan/_datatable.html" %}
{% load euscan_tags %}
{% load djeuscan_helpers %}
{% load url from future %}
{% block title %}
@@ -1,6 +1,6 @@
{% extends "euscan/_datatable.html" %}
{% load euscan_tags %}
{% load djeuscan_helpers %}
{% load url from future %}
{% block title %}
@@ -1,6 +1,6 @@
{% extends "euscan/_datatable.html" %}
{% load euscan_tags %}
{% load djeuscan_helpers %}
{% load url from future %}
{% block title %}
@@ -1,6 +1,6 @@
{% extends "euscan/_datatable.html" %}
{% load euscan_tags %}
{% load djeuscan_helpers %}
{% load url from future %}
{% block title %}
@@ -1,6 +1,6 @@
{% extends "euscan/_datatable.html" %}
{% load euscan_tags %}
{% load djeuscan_helpers %}
{% load url from future %}
{% block title %}
@@ -1,7 +1,7 @@
{% extends "euscan/_datatable.html" %}
{% load url from future %}
{% load euscan_tags %}
{% load djeuscan_helpers %}
{% block title %}
{{ block.super }} - Overlay: {{ overlay }}
@@ -1,6 +1,6 @@
{% extends "euscan/_datatable.html" %}
{% load euscan_tags %}
{% load djeuscan_helpers %}
{% load url from future %}
{% block title %}
@@ -1,7 +1,7 @@
{% extends "euscan/_datatable.html" %}
{% load sub %}
{% load euscan_tags %}
{% load djeuscan_helpers %}
{% load url from future %}
{% block meta %}
@@ -1,6 +1,6 @@
{% extends "euscan/_datatable.html" %}
{% load euscan_tags %}
{% load djeuscan_helpers %}
{% block title %}
{{ block.super }} - World Scan
@@ -1,11 +1,10 @@
from django import template
from django.conf import settings
from euscan import helpers
from euscan.version import is_version_type_stable, get_version_type
register = template.Library()
@register.inclusion_tag('euscan/_packages.html', takes_context=True)
def packages(context, pkgs):
context['packages'] = pkgs
@@ -61,9 +60,9 @@ def overlays_table(overlays):
@register.filter
def is_stable(version_type):
return helpers.is_version_type_stable(version_type)
return is_version_type_stable(version_type)
@register.filter
def version_type(version):
return helpers.get_version_type(version)
return get_version_type(version)
+22
View File
@@ -0,0 +1,22 @@
def queryset_iterator(queryset, chunksize=1000):
    """
    Iterate over a Django queryset ordered by primary key, chunk by chunk.

    Rows are fetched with one query per chunk of at most ``chunksize``
    (default: 1000) rows, so that only one chunk needs to be held at a
    time instead of the whole result set. The garbage collector is run
    after each chunk.

    Notes:
    - Any ordering already applied to ``queryset`` is replaced by an
      ordering on ``pk``; ordered querysets are therefore not supported.
    - Iteration starts after pk 0, so only rows whose pk compares greater
      than 0 are returned.
    - An empty queryset yields nothing.
    """
    import gc

    try:
        last_pk = queryset.order_by('-pk')[0].pk
    except IndexError:
        # Empty queryset: there is no highest pk, hence nothing to yield.
        return

    ordered = queryset.order_by('pk')
    pk = 0
    while pk < last_pk:
        fetched = False
        for row in ordered.filter(pk__gt=pk)[:chunksize]:
            fetched = True
            pk = row.pk
            yield row
        if not fetched:
            # No row left above the current pk (e.g. the row holding
            # last_pk disappeared meanwhile): stop instead of looping
            # forever on an empty chunk.
            break
        gc.collect()