From 2544af2e32b80c2108839b2e1bf4c3b0fbeb50ca Mon Sep 17 00:00:00 2001 From: Corentin Chary Date: Fri, 20 Jul 2012 08:25:04 +0200 Subject: [PATCH] euscanwww/tasks: simplify tasks - strip '_task' end - remove "launcher" functions, if we want complicated starter functions we will put them somewhere else later. - now, everything is asynchronous, maybe we could switch from group_one()/group_chunks() to .chunks() someday... Signed-off-by: Corentin Chary --- euscanwww/README.Celery | 2 +- .../djeuscan/processing/scan/scan_portage.py | 9 +- .../djeuscan/processing/scan/scan_upstream.py | 7 +- euscanwww/djeuscan/tasks.py | 316 +++++++----------- euscanwww/euscanwww/settings.py | 4 +- 5 files changed, 142 insertions(+), 196 deletions(-) diff --git a/euscanwww/README.Celery b/euscanwww/README.Celery index f2a5149..00884c5 100644 --- a/euscanwww/README.Celery +++ b/euscanwww/README.Celery @@ -9,7 +9,7 @@ or:: python setup.py install # to install euscan and requirements -If you prefer to use portage just install dev-python/django-celery-2.5.5 +If you prefer to use portage just install dev-python/django-celery There's the need of having a broker for tasks. The default and reccommended broker is RabbitMQ. 
diff --git a/euscanwww/djeuscan/processing/scan/scan_portage.py b/euscanwww/djeuscan/processing/scan/scan_portage.py index 022cff0..306595b 100644 --- a/euscanwww/djeuscan/processing/scan/scan_portage.py +++ b/euscanwww/djeuscan/processing/scan/scan_portage.py @@ -342,9 +342,14 @@ def scan_portage(packages=None, category=None, no_log=False, if prefetch: logger.info('Prefetching objects...') - for package in Package.objects.all(): + ppackages = Package.objects.all() + pversions = Version.objects.select_related('package').all() + if category: + ppackages = ppackages.filter(category=category) + pversions = pversions.filter(package__category=category) + for package in ppackages: scan_handler.cache_store_package(package) - for version in Version.objects.select_related('package').all(): + for version in pversions: scan_handler.cache_store_version(version) logger.info('done') diff --git a/euscanwww/djeuscan/processing/scan/scan_upstream.py b/euscanwww/djeuscan/processing/scan/scan_upstream.py index e3ee86a..8461a25 100644 --- a/euscanwww/djeuscan/processing/scan/scan_upstream.py +++ b/euscanwww/djeuscan/processing/scan/scan_upstream.py @@ -153,9 +153,12 @@ def scan_upstream(packages=None, purge_versions=False, for pkg in packages: try: - scan_handler.scan('%s/%s' % (pkg.category, pkg.name)) + package = '%s/%s' % (pkg.category, pkg.name) except AttributeError: - scan_handler.scan(pkg) + package = pkg + + logger.info('Scanning %s' % package) + scan_handler.scan(package) scan_handler.purge_old_versions() diff --git a/euscanwww/djeuscan/tasks.py b/euscanwww/djeuscan/tasks.py index 86b5105..512ad91 100644 --- a/euscanwww/djeuscan/tasks.py +++ b/euscanwww/djeuscan/tasks.py @@ -9,10 +9,8 @@ from celery.task import task, group, chord from django.conf import settings from djeuscan.models import Package, RefreshPackageQuery -from djeuscan.processing.misc import regen_rrds, update_counters, \ - update_portage_trees -from djeuscan.processing.scan import scan_metadata, scan_portage, 
scan_upstream - +from djeuscan.processing import scan, misc +from djeuscan.utils import queryset_iterator class TaskFailedException(Exception): """ @@ -20,103 +18,85 @@ class TaskFailedException(Exception): """ pass - -def _chunks(it, n): +def group_one(task, seq, *args, **kwargs): """ - Chunk generator, takes an iterator and the desired size of the chunk + Create a group of tasks, each task handle one element of seq """ - for first in it: - yield [first] + list(islice(it, n - 1)) + tasks = [] + for i in seq: + tasks.append(task.subtask(args=[seq[i]] + list(args), kwargs=kwargs)) + return group(tasks) - -def _run_in_chunks(task, packages, kwargs=None, - concurrently=settings.TASKS_CONCURRENTLY, - n=settings.TASKS_SUBTASK_PACKAGES): +def group_chunks(task, seq, n, *args, **kwargs): """ - Launches a group at a time with subtasks. - Each subtask has packages to handle + Creates a group of tasks, each subtask has elements to handle """ - output = [] - - chunk_generator = _chunks(iter(packages), n) - done = False - - while not done: - tasks = [] - for _ in range(concurrently): - try: - chunk = chunk_generator.next() - except StopIteration: - done = True - else: - tasks.append(task.subtask((chunk, ), kwargs)) - output.extend(group(tasks)()) - return output - + tasks = [] + for i in xrange(0, len(seq), n): + tasks.append(task.subtask(args=[seq[i:i+n]] + list(args), kwargs=kwargs)) + return group(tasks) @task -def regen_rrds_task(): +def regen_rrds(): """ Regenerate RRDs """ - return regen_rrds() - + misc.regen_rrds() + return True @task -def update_counters_task(fast=True): +def update_counters(fast=False): """ Updates counters """ - return update_counters(fast=fast) - + logger = update_counters.get_logger() + logger.info("Updating counters (fast=%s)...", fast) + misc.update_counters(fast=fast) + logger.info("Done") + return True @task -def _scan_metadata_task(packages): +def scan_metadata(packages=[], category=None): """ Scans metadata for the given set of packages """ - 
logger = _scan_metadata_task.get_logger() - logger.info("Starting metadata scanning subtask for %d packages...", - len(packages)) + logger = scan_metadata.get_logger() - scan_metadata( + if packages: + logger.info("Starting metadata scan for %d packages...", + len(packages)) + elif category: + logger.info("Starting metadata scan for %s...", + category) + else: + logger.info("Starting metadata scan...") + + scan.scan_metadata( packages=packages, + category=category, logger=logger, ) - + return True @task -def scan_metadata_list_task(query): - """ - Runs a parallel metadata scan for packages in the query list (space - separated string). Task used only from the web interface. - """ - return _run_in_chunks(_scan_metadata_task, [p for p in query.split()]) - - -@task -def scan_metadata_all_task(): - """ - Runs a parallel metadata scan for all packages - """ - return _run_in_chunks(_scan_metadata_task, Package.objects.all()) - - -@task -def _scan_portage_task(packages, category=None, no_log=False, - purge_packages=False, purge_versions=False, - prefetch=False): +def scan_portage(packages=[], category=None, + no_log=False, purge_packages=False, + purge_versions=False, prefetch=False): """ Scans portage for the given set of packages """ - logger = _scan_portage_task.get_logger() - if packages: - logger.info("Starting portage scanning subtask for %d packages...", - len(packages)) - else: - logger.info("Starting portage scanning for all packages...") + logger = scan_portage.get_logger() - scan_portage( + if packages: + logger.info("Starting portage scan for %d packages...", + len(packages)) + elif category: + logger.info("Starting portage scan for %s...", + category) + else: + logger.info("Starting portage scan...") + + scan.scan_portage( packages=packages, category=category, no_log=no_log, @@ -125,131 +105,77 @@ def _scan_portage_task(packages, category=None, no_log=False, prefetch=prefetch, logger=logger, ) - + return True @task -def scan_portage_list_task(query, 
no_log=False, purge_packages=False, - purge_versions=False, prefetch=False): - """ - Runs a parallel portage scan for packages in the query list (space - separated string). Task used only from the web interface. - """ - kwargs = {"no_log": no_log, "purge_packages": purge_packages, - "purge_versions": purge_versions, "prefetch": prefetch} - return _run_in_chunks( - _scan_portage_task, [p for p in query.split()], kwargs - ) - - -@task -def scan_portage_all_task(no_log=False, purge_packages=False, - purge_versions=False, prefetch=False): - """ - Runs a syncronous portage scan for all packages - """ - _scan_portage_task( - packages=None, - category=None, - no_log=no_log, - purge_packages=purge_packages, - purge_versions=purge_versions, - prefetch=prefetch, - ) - - -@task -def _scan_upstream_task(packages, purge_versions=False): +def scan_upstream(packages=[], purge_versions=False): """ Scans upstream for the given set of packages """ - logger = _scan_upstream_task.get_logger() + logger = scan_upstream.get_logger() - logger.info("Starting upstream scanning subtask for %d packages...", - len(packages)) + if len(packages): + logger.info("Starting upstream scan subtask for %d packages...", + len(packages)) + else: + logger.info("Starting upstream scan...", + len(packages)) - result = scan_upstream( + scan.scan_upstream( packages=packages, purge_versions=purge_versions, logger=logger, ) - # TODO: implement some kind of error raising in case of failure - #if not result: - # raise TaskFailedException - return result - + return True @task -def scan_upstream_list_task(query, purge_versions=False): - """ - Runs a parallel upstream scan for packages in the query list (space - separated string). Task used only from the web interface. 
- """ - - kwargs = {"purge_versions": purge_versions} - return _run_in_chunks(_scan_upstream_task, [p for p in query.split()], - kwargs) - - -@task -def scan_upstream_all_task(purge_versions=False): - """ - Runs a parallel portage scan for all packages - """ - kwargs = {"purge_versions": purge_versions} - return _run_in_chunks( - _scan_upstream_task, - Package.objects.all().order_by('?'), - kwargs - ) - - -@task -def update_portage_trees_task(): +def update_portage_trees(): """ Update portage tree """ - logger = update_portage_trees_task.get_logger() - update_portage_trees(logger=logger) - + logger = update_portage_trees.get_logger() + misc.update_portage_trees(logger=logger) + return True @task -def update_task(update_portage_trees=True, scan_portage=True, - scan_metadata=True, scan_upstream=True, update_counters=True): - """ - Update the whole euscan system - """ - if update_portage_trees: - update_portage_trees_task() - if scan_portage: - scan_portage_all_task(prefetch=True, purge_packages=True, - purge_versions=True) +def update_portage(packages=None): + ( + update_portage_trees.s() | + scan_portage.si(purge_packages=True, purge_versions=True, prefetch=True) | + #scan_metadata.si() | + group_one(scan_metadata, portage.settings.categories) | + update_counters.si(fast=False) + )() + return True - # metadata and upstream scan can run concurrently, launch them - # in a group and wait for them to finish - tasks = [] - if scan_metadata: - tasks.append(scan_metadata_all_task.subtask()) +@task +def update_upstream(): + if settings.TASKS_UPSTREAM_GROUPS >= 1: + packages = Package.objects.all() - if scan_upstream: - tasks.append(scan_upstream_all_task.subtask()) - - if update_counters: - chord(tasks)( - # immutable means that the result of previous tasks is not passed - update_counters_task.subtask((), {"fast": False}, immutable=True) - ) + scan_upstream_sub = group_chunks(scan_upstream, packages, + settings.TASKS_UPSTREAM_GROUPS, + purge_versions=True) else: - 
group(tasks)() + scan_upstream_sub = scan_upstream.si(purge_versions=True) + ( + scan_upstream_sub | + update_counters.si(fast=False) + )() + return True @task -def scan_package_task(package): - _scan_portage_task([package], purge_packages=True, purge_versions=True) - _scan_metadata_task([package]) - _scan_upstream_task([package]) +def scan_package(package): + scan_portage([package], purge_packages=True, purge_versions=True) + scan_metadata([package]) + scan_upstream([package]) + return True - -# Periodic tasks +@task(rate_limit="1/m") +def scan_package_user(package): + scan_package(package) + return True @task def consume_refresh_package_request(): @@ -257,25 +183,39 @@ def consume_refresh_package_request(): Satisfies user requests for package refreshing, runs every minute """ try: - obj = RefreshPackageQuery.objects.all().order_by('-priority')[0] + query = RefreshPackageQuery.objects.all().order_by('-priority')[0] except IndexError: - return {} - else: - result = scan_package_task(obj.package) - obj.delete() - return result + return + pkg = query.package + query.delete() + scan_package_user.delay(pkg) admin_tasks = [ - regen_rrds_task, - update_counters_task, - scan_metadata_list_task, - scan_metadata_all_task, - scan_portage_all_task, - scan_portage_list_task, - scan_upstream_all_task, - scan_upstream_list_task, - update_portage_trees_task, - update_task, - scan_package_task, + regen_rrds, + update_counters, + scan_metadata, + scan_portage, + scan_upstream, + update_portage_trees, + update_portage, + update_upstream, + scan_package, ] + +""" Chunk helpers (chunks can't use keyword arguments) """ +@task +def scan_metadata_category(category): + """ + Helper for calling scan_metadata with a category + """ + scan_metadata(category=category) + return True + +@task +def scan_upstream_purge(*packages): + """ + Helper for calling scan_upstream with purge_versions=True + """ + scan_upstream(packages, purge_versions=True) + return True diff --git 
a/euscanwww/euscanwww/settings.py b/euscanwww/euscanwww/settings.py index 7a936eb..fc6b668 100644 --- a/euscanwww/euscanwww/settings.py +++ b/euscanwww/euscanwww/settings.py @@ -159,7 +159,6 @@ TEMPLATE_CONTEXT_PROCESSORS = ( INSTALLED_APPS = ( 'euscanwww', 'djeuscan', - 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', @@ -237,8 +236,7 @@ CELERY_RESULT_BACKEND = "amqp" BROKER_CONNECTION_TIMEOUT = 3600 CELERYD_CONCURRENCY = 4 -TASKS_CONCURRENTLY = 8 -TASKS_SUBTASK_PACKAGES = 32 +TASKS_UPSTREAM_GROUPS = 32 CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"