euscanwww/tasks: simplify tasks

- strip '_task' end
- remove "launcher" functions, if we want complicated starter
  functions we will put them somewhere else later.
- now, everything is asynchronous, maybe we could switch from
  group_one()/group_chunks() to .chunks() someday...

Signed-off-by: Corentin Chary <corentin.chary@gmail.com>
This commit is contained in:
Corentin Chary 2012-07-20 08:25:04 +02:00
parent 634e06b779
commit 2544af2e32
5 changed files with 142 additions and 196 deletions

View File

@ -9,7 +9,7 @@ or::
python setup.py install # to install euscan and requirements python setup.py install # to install euscan and requirements
If you prefer to use portage just install dev-python/django-celery-2.5.5 If you prefer to use portage just install dev-python/django-celery
There's the need of having a broker for tasks. The default and recommended There's the need of having a broker for tasks. The default and recommended
broker is RabbitMQ. broker is RabbitMQ.

View File

@ -342,9 +342,14 @@ def scan_portage(packages=None, category=None, no_log=False,
if prefetch: if prefetch:
logger.info('Prefetching objects...') logger.info('Prefetching objects...')
for package in Package.objects.all(): ppackages = Package.objects.all()
pversions = Version.objects.select_related('package').all()
if category:
ppackages = ppackages.filter(category=category)
pversions = pversions.filter(package__category=category)
for package in ppackages:
scan_handler.cache_store_package(package) scan_handler.cache_store_package(package)
for version in Version.objects.select_related('package').all(): for version in pversions:
scan_handler.cache_store_version(version) scan_handler.cache_store_version(version)
logger.info('done') logger.info('done')

View File

@ -153,9 +153,12 @@ def scan_upstream(packages=None, purge_versions=False,
for pkg in packages: for pkg in packages:
try: try:
scan_handler.scan('%s/%s' % (pkg.category, pkg.name)) package = '%s/%s' % (pkg.category, pkg.name)
except AttributeError: except AttributeError:
scan_handler.scan(pkg) package = pkg
logger.info('Scanning %s' % package)
scan_handler.scan(package)
scan_handler.purge_old_versions() scan_handler.purge_old_versions()

View File

@ -9,10 +9,8 @@ from celery.task import task, group, chord
from django.conf import settings from django.conf import settings
from djeuscan.models import Package, RefreshPackageQuery from djeuscan.models import Package, RefreshPackageQuery
from djeuscan.processing.misc import regen_rrds, update_counters, \ from djeuscan.processing import scan, misc
update_portage_trees from djeuscan.utils import queryset_iterator
from djeuscan.processing.scan import scan_metadata, scan_portage, scan_upstream
class TaskFailedException(Exception): class TaskFailedException(Exception):
""" """
@ -20,103 +18,85 @@ class TaskFailedException(Exception):
""" """
pass pass
def group_one(task, seq, *args, **kwargs):
def _chunks(it, n):
""" """
Chunk generator, takes an iterator and the desired size of the chunk Create a group of tasks, each task handle one element of seq
""" """
for first in it:
yield [first] + list(islice(it, n - 1))
def _run_in_chunks(task, packages, kwargs=None,
concurrently=settings.TASKS_CONCURRENTLY,
n=settings.TASKS_SUBTASK_PACKAGES):
"""
Launches a group at a time with <concurrently> subtasks.
Each subtask has <n> packages to handle
"""
output = []
chunk_generator = _chunks(iter(packages), n)
done = False
while not done:
tasks = [] tasks = []
for _ in range(concurrently): for i in seq:
try: tasks.append(task.subtask(args=[seq[i]] + list(args), kwargs=kwargs))
chunk = chunk_generator.next() return group(tasks)
except StopIteration:
done = True
else:
tasks.append(task.subtask((chunk, ), kwargs))
output.extend(group(tasks)())
return output
def group_chunks(task, seq, n, *args, **kwargs):
"""
Creates a group of tasks, each subtask has <n> elements to handle
"""
tasks = []
for i in xrange(0, len(seq), n):
tasks.append(task.subtask(args=[seq[i:i+n]] + list(args), kwargs=kwargs))
return group(tasks)
@task @task
def regen_rrds_task(): def regen_rrds():
""" """
Regenerate RRDs Regenerate RRDs
""" """
return regen_rrds() misc.regen_rrds()
return True
@task @task
def update_counters_task(fast=True): def update_counters(fast=False):
""" """
Updates counters Updates counters
""" """
return update_counters(fast=fast) logger = update_counters.get_logger()
logger.info("Updating counters (fast=%s)...", fast)
misc.update_counters(fast=fast)
logger.info("Done")
return True
@task @task
def _scan_metadata_task(packages): def scan_metadata(packages=[], category=None):
""" """
Scans metadata for the given set of packages Scans metadata for the given set of packages
""" """
logger = _scan_metadata_task.get_logger() logger = scan_metadata.get_logger()
logger.info("Starting metadata scanning subtask for %d packages...",
len(packages))
scan_metadata( if packages:
logger.info("Starting metadata scan for %d packages...",
len(packages))
elif category:
logger.info("Starting metadata scan for %s...",
category)
else:
logger.info("Starting metadata scan...")
scan.scan_metadata(
packages=packages, packages=packages,
category=category,
logger=logger, logger=logger,
) )
return True
@task @task
def scan_metadata_list_task(query): def scan_portage(packages=[], category=None,
""" no_log=False, purge_packages=False,
Runs a parallel metadata scan for packages in the query list (space purge_versions=False, prefetch=False):
separated string). Task used only from the web interface.
"""
return _run_in_chunks(_scan_metadata_task, [p for p in query.split()])
@task
def scan_metadata_all_task():
"""
Runs a parallel metadata scan for all packages
"""
return _run_in_chunks(_scan_metadata_task, Package.objects.all())
@task
def _scan_portage_task(packages, category=None, no_log=False,
purge_packages=False, purge_versions=False,
prefetch=False):
""" """
Scans portage for the given set of packages Scans portage for the given set of packages
""" """
logger = _scan_portage_task.get_logger() logger = scan_portage.get_logger()
if packages:
logger.info("Starting portage scanning subtask for %d packages...",
len(packages))
else:
logger.info("Starting portage scanning for all packages...")
scan_portage( if packages:
logger.info("Starting portage scan for %d packages...",
len(packages))
elif category:
logger.info("Starting portage scan for %s...",
category)
else:
logger.info("Starting portage scan...")
scan.scan_portage(
packages=packages, packages=packages,
category=category, category=category,
no_log=no_log, no_log=no_log,
@ -125,131 +105,77 @@ def _scan_portage_task(packages, category=None, no_log=False,
prefetch=prefetch, prefetch=prefetch,
logger=logger, logger=logger,
) )
return True
@task @task
def scan_portage_list_task(query, no_log=False, purge_packages=False, def scan_upstream(packages=[], purge_versions=False):
purge_versions=False, prefetch=False):
"""
Runs a parallel portage scan for packages in the query list (space
separated string). Task used only from the web interface.
"""
kwargs = {"no_log": no_log, "purge_packages": purge_packages,
"purge_versions": purge_versions, "prefetch": prefetch}
return _run_in_chunks(
_scan_portage_task, [p for p in query.split()], kwargs
)
@task
def scan_portage_all_task(no_log=False, purge_packages=False,
purge_versions=False, prefetch=False):
"""
Runs a syncronous portage scan for all packages
"""
_scan_portage_task(
packages=None,
category=None,
no_log=no_log,
purge_packages=purge_packages,
purge_versions=purge_versions,
prefetch=prefetch,
)
@task
def _scan_upstream_task(packages, purge_versions=False):
""" """
Scans upstream for the given set of packages Scans upstream for the given set of packages
""" """
logger = _scan_upstream_task.get_logger() logger = scan_upstream.get_logger()
logger.info("Starting upstream scanning subtask for %d packages...", if len(packages):
logger.info("Starting upstream scan subtask for %d packages...",
len(packages))
else:
logger.info("Starting upstream scan...",
len(packages)) len(packages))
result = scan_upstream( scan.scan_upstream(
packages=packages, packages=packages,
purge_versions=purge_versions, purge_versions=purge_versions,
logger=logger, logger=logger,
) )
# TODO: implement some kind of error raising in case of failure return True
#if not result:
# raise TaskFailedException
return result
@task @task
def scan_upstream_list_task(query, purge_versions=False): def update_portage_trees():
"""
Runs a parallel upstream scan for packages in the query list (space
separated string). Task used only from the web interface.
"""
kwargs = {"purge_versions": purge_versions}
return _run_in_chunks(_scan_upstream_task, [p for p in query.split()],
kwargs)
@task
def scan_upstream_all_task(purge_versions=False):
"""
Runs a parallel portage scan for all packages
"""
kwargs = {"purge_versions": purge_versions}
return _run_in_chunks(
_scan_upstream_task,
Package.objects.all().order_by('?'),
kwargs
)
@task
def update_portage_trees_task():
""" """
Update portage tree Update portage tree
""" """
logger = update_portage_trees_task.get_logger() logger = update_portage_trees.get_logger()
update_portage_trees(logger=logger) misc.update_portage_trees(logger=logger)
return True
@task @task
def update_task(update_portage_trees=True, scan_portage=True, def update_portage(packages=None):
scan_metadata=True, scan_upstream=True, update_counters=True): (
""" update_portage_trees.s() |
Update the whole euscan system scan_portage.si(purge_packages=True, purge_versions=True, prefetch=True) |
""" #scan_metadata.si() |
if update_portage_trees: group_one(scan_metadata, portage.settings.categories) |
update_portage_trees_task() update_counters.si(fast=False)
if scan_portage: )()
scan_portage_all_task(prefetch=True, purge_packages=True, return True
@task
def update_upstream():
if settings.TASKS_UPSTREAM_GROUPS >= 1:
packages = Package.objects.all()
scan_upstream_sub = group_chunks(scan_upstream, packages,
settings.TASKS_UPSTREAM_GROUPS,
purge_versions=True) purge_versions=True)
# metadata and upstream scan can run concurrently, launch them
# in a group and wait for them to finish
tasks = []
if scan_metadata:
tasks.append(scan_metadata_all_task.subtask())
if scan_upstream:
tasks.append(scan_upstream_all_task.subtask())
if update_counters:
chord(tasks)(
# immutable means that the result of previous tasks is not passed
update_counters_task.subtask((), {"fast": False}, immutable=True)
)
else: else:
group(tasks)() scan_upstream_sub = scan_upstream.si(purge_versions=True)
(
scan_upstream_sub |
update_counters.si(fast=False)
)()
return True
@task @task
def scan_package_task(package): def scan_package(package):
_scan_portage_task([package], purge_packages=True, purge_versions=True) scan_portage([package], purge_packages=True, purge_versions=True)
_scan_metadata_task([package]) scan_metadata([package])
_scan_upstream_task([package]) scan_upstream([package])
return True
@task(rate_limit="1/m")
# Periodic tasks def scan_package_user(package):
scan_package(package)
return True
@task @task
def consume_refresh_package_request(): def consume_refresh_package_request():
@ -257,25 +183,39 @@ def consume_refresh_package_request():
Satisfies user requests for package refreshing, runs every minute Satisfies user requests for package refreshing, runs every minute
""" """
try: try:
obj = RefreshPackageQuery.objects.all().order_by('-priority')[0] query = RefreshPackageQuery.objects.all().order_by('-priority')[0]
except IndexError: except IndexError:
return {} return
else:
result = scan_package_task(obj.package)
obj.delete()
return result
pkg = query.package
query.delete()
scan_package_user.delay(pkg)
admin_tasks = [ admin_tasks = [
regen_rrds_task, regen_rrds,
update_counters_task, update_counters,
scan_metadata_list_task, scan_metadata,
scan_metadata_all_task, scan_portage,
scan_portage_all_task, scan_upstream,
scan_portage_list_task, update_portage_trees,
scan_upstream_all_task, update_portage,
scan_upstream_list_task, update_upstream,
update_portage_trees_task, scan_package,
update_task,
scan_package_task,
] ]
""" Chunk helpers (chunks can't use keyword arguments) """
@task
def scan_metadata_category(category):
"""
Helper for calling scan_metadata with a category
"""
scan_metadata(category=category)
return True
@task
def scan_upstream_purge(*packages):
"""
Helper for calling scan_upstream with purge_versions=True
"""
scan_upstream(packages, purge_versions=True)
return True

View File

@ -159,7 +159,6 @@ TEMPLATE_CONTEXT_PROCESSORS = (
INSTALLED_APPS = ( INSTALLED_APPS = (
'euscanwww', 'euscanwww',
'djeuscan', 'djeuscan',
'django.contrib.auth', 'django.contrib.auth',
'django.contrib.contenttypes', 'django.contrib.contenttypes',
'django.contrib.sessions', 'django.contrib.sessions',
@ -237,8 +236,7 @@ CELERY_RESULT_BACKEND = "amqp"
BROKER_CONNECTION_TIMEOUT = 3600 BROKER_CONNECTION_TIMEOUT = 3600
CELERYD_CONCURRENCY = 4 CELERYD_CONCURRENCY = 4
TASKS_CONCURRENTLY = 8 TASKS_UPSTREAM_GROUPS = 32
TASKS_SUBTASK_PACKAGES = 32
CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler" CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"