From 62b1fbba4e9e846d4aa48a9ade08e3542d768dfc Mon Sep 17 00:00:00 2001 From: Corentin Chary Date: Fri, 3 Aug 2012 21:55:59 +0200 Subject: [PATCH] euscanwww: fix tasks - group_one was broken - new way of calling consume refresh queries Signed-off-by: Corentin Chary --- euscanwww/djeuscan/tasks.py | 66 ++++++++++++++++++++----------------- 1 file changed, 35 insertions(+), 31 deletions(-) diff --git a/euscanwww/djeuscan/tasks.py b/euscanwww/djeuscan/tasks.py index a474920..f3e89d4 100644 --- a/euscanwww/djeuscan/tasks.py +++ b/euscanwww/djeuscan/tasks.py @@ -5,6 +5,7 @@ Celery tasks for djeuscan from celery.task import task, group from django.conf import settings +from django.core.cache import cache import portage @@ -25,13 +26,18 @@ def group_one(task, seq, *args, **kwargs): """ tasks = [] + if "attr_name" in kwargs: + attr_name = kwargs['attr_name'] + del kwargs["attr_name"] + else: + attr_name = None + for elem in seq: - if "attr_name" in kwargs: - kwargs[kwargs["attr_name"]] = elem - del kwargs["attr_name"] - tasks.append(task.subtask(args=args, kwargs=kwargs)) + if attr_name: + kwargs[attr_name] = elem + tasks.append(task.subtask(args=args, kwargs=dict(kwargs))) else: - tasks.append(task.subtask(args=[elem] + list(args), kwargs=kwargs)) + tasks.append(task.subtask(args=[elem] + list(args), kwargs=dict(kwargs))) return group(tasks) @@ -203,19 +209,36 @@ def scan_package_user(package): @task -def consume_refresh_package_request(): +def consume_refresh_queue(locked=False): """ Satisfies user requests for package refreshing, runs every minute """ - try: - query = RefreshPackageQuery.objects.all().order_by('-priority')[0] - except IndexError: + LOCK_ID = 'lock-consume-refresh-queue' + unlock = lambda: cache.delete(LOCK_ID) + lock = lambda: cache.add(LOCK_ID, True, 120) + + logger = consume_refresh_queue.get_logger() + + if not locked and not lock(): return - pkg = query.package - query.delete() - scan_package_user.delay(pkg) + logger.info('Consumming package refresh request queue...') + try: + query = RefreshPackageQuery.objects.all().order_by('-priority')[0] + pkg = query.package + query.delete() + scan_package_user.delay(pkg) + logger.info('Done (%s)' % pkg) + except IndexError: + pass + finally: + unlock() + + if RefreshPackageQuery.objects.count(): + logger.info('Restarting myself in 60s') + lock() + consume_refresh_queue.apply_async(kwargs={'locked':True}, countdown=60) admin_tasks = [ regen_rrds, @@ -228,22 +251,3 @@ admin_tasks = [ update_upstream, scan_package, ] - - -# Chunk helpers (chunks can't use keyword arguments) -@task -def scan_metadata_category(category): - """ - Helper for calling scan_metadata with a category - """ - scan_metadata(category=category) - return True - - -@task -def scan_upstream_purge(*packages): - """ - Helper for calling scan_upstream with purge_versions=True - """ - scan_upstream(packages, purge_versions=True) - return True