From 03cf3b90a4c349e904ac2608675fcefe93887afe Mon Sep 17 00:00:00 2001 From: volpino Date: Sat, 7 Jul 2012 18:09:14 +0200 Subject: [PATCH] euscanwww: Upgraded to Celery 3.0 and fixed scan_portage Fixed TaskSet calls and synchronous stuff. Everything should be deadlock safe Signed-off-by: volpino --- euscanwww/djeuscan/tasks.py | 48 ++++++++++++++++----------------- euscanwww/euscanwww/settings.py | 6 +++-- pym/euscan/helpers.py | 5 ++-- setup.py | 2 +- 4 files changed, 32 insertions(+), 29 deletions(-) diff --git a/euscanwww/djeuscan/tasks.py b/euscanwww/djeuscan/tasks.py index 7ecfa44..86b5105 100644 --- a/euscanwww/djeuscan/tasks.py +++ b/euscanwww/djeuscan/tasks.py @@ -4,8 +4,7 @@ Celery tasks for djeuscan from itertools import islice -from celery.task import task -from celery.task.sets import TaskSet +from celery.task import task, group, chord from django.conf import settings @@ -34,7 +33,7 @@ def _run_in_chunks(task, packages, kwargs=None, concurrently=settings.TASKS_CONCURRENTLY, n=settings.TASKS_SUBTASK_PACKAGES): """ - Launches a TaskSet at a time with subtasks. + Launches a group at a time with subtasks. Each subtask has packages to handle """ output = [] @@ -51,10 +50,7 @@ def _run_in_chunks(task, packages, kwargs=None, done = True else: tasks.append(task.subtask((chunk, ), kwargs)) - job = TaskSet(tasks=tasks) - result = job.apply_async() - # TODO: understand why this causes timeout - output.extend(list(result.join())) + output.extend(group(tasks)()) return output @@ -107,8 +103,9 @@ def scan_metadata_all_task(): @task -def _scan_portage_task(packages, no_log=False, purge_packages=False, - purge_versions=False, prefetch=False): +def _scan_portage_task(packages, category=None, no_log=False, + purge_packages=False, purge_versions=False, + prefetch=False): """ Scans portage for the given set of packages """ @@ -121,6 +118,7 @@ def _scan_portage_task(packages, no_log=False, purge_packages=False, scan_portage( packages=packages, + category=category, no_log=no_log, purge_packages=purge_packages, purge_versions=purge_versions, @@ -138,8 +136,9 @@ def scan_portage_list_task(query, no_log=False, purge_packages=False, """ kwargs = {"no_log": no_log, "purge_packages": purge_packages, "purge_versions": purge_versions, "prefetch": prefetch} - return _run_in_chunks(_scan_portage_task, [p for p in query.split()], - kwargs) + return _run_in_chunks( + _scan_portage_task, [p for p in query.split()], kwargs + ) @task @@ -148,8 +147,9 @@ def scan_portage_all_task(no_log=False, purge_packages=False, """ Runs a syncronous portage scan for all packages """ - return _scan_portage_task( + _scan_portage_task( packages=None, + category=None, no_log=no_log, purge_packages=purge_packages, purge_versions=purge_versions, @@ -214,7 +214,7 @@ def update_portage_trees_task(): @task def update_task(update_portage_trees=True, scan_portage=True, - scan_metadata=True, scan_upstream=True, update_counter=True): + scan_metadata=True, scan_upstream=True, update_counters=True): """ Update the whole euscan system """ @@ -225,21 +225,21 @@ def update_task(update_portage_trees=True, scan_portage=True, purge_versions=True) # metadata and upstream scan can run concurrently, launch them - # asynchronously and wait for them to finish - metadata_job = None + # in a group and wait for them to finish + tasks = [] if scan_metadata: - metadata_job = scan_metadata_all_task().delay() + tasks.append(scan_metadata_all_task.subtask()) - upstream_job = None if scan_upstream: - upstream_job = scan_upstream_all_task().delay() + tasks.append(scan_upstream_all_task.subtask()) - if metadata_job: - metadata_job.wait() - if upstream_job: - upstream_job.wait() - - update_counters(fast=False) + if update_counters: + chord(tasks)( + # immutable means that the result of previous tasks is not passed + update_counters_task.subtask((), {"fast": False}, immutable=True) + ) + else: + group(tasks)() @task diff --git a/euscanwww/euscanwww/settings.py b/euscanwww/euscanwww/settings.py index c37c174..be6cc85 100644 --- a/euscanwww/euscanwww/settings.py +++ b/euscanwww/euscanwww/settings.py @@ -225,7 +225,9 @@ EMAIL_BACKEND = 'django.core.mail.backends.console.EmailBackend' # djeuscan tasks PORTAGE_ROOT = "/" PORTAGE_CONFIGROOT = "/" +EIX_CACHEFILE = os.path.join(PORTAGE_ROOT, 'var/cache/eix') LAYMAN_CONFIG = "/etc/layman/layman.cfg" + EGENCACHE_JOBS = 4 # Celery config @@ -236,7 +238,7 @@ CELERY_RESULT_BACKEND = "amqp" BROKER_CONNECTION_TIMEOUT = 3600 CELERYD_CONCURRENCY = 4 -TASKS_CONCURRENTLY = 4 +TASKS_CONCURRENTLY = 8 TASKS_SUBTASK_PACKAGES = 32 CELERYBEAT_SCHEDULE = { @@ -275,4 +277,4 @@ except ImportError, ex: os.environ['ROOT'] = PORTAGE_ROOT os.environ['PORTAGE_CONFIGROOT'] = PORTAGE_CONFIGROOT -os.environ['EIX_CACHEFILE'] = os.path.join(PORTAGE_ROOT, 'var/cache/eix') +os.environ['EIX_CACHEFILE'] = EIX_CACHEFILE diff --git a/pym/euscan/helpers.py b/pym/euscan/helpers.py index e50267a..5315dd4 100644 --- a/pym/euscan/helpers.py +++ b/pym/euscan/helpers.py @@ -17,6 +17,7 @@ import euscan from euscan import CONFIG, BLACKLIST_VERSIONS, ROBOTS_TXT_BLACKLIST_DOMAINS from euscan.version import parse_version + def htop_vercmp(a, b): def fixver(v): if v in ['0.11', '0.12', '0.13']: @@ -179,8 +180,8 @@ def simple_vercmp(a, b): return r # Fallback - a = pkg_parse_version(a) - b = pkg_parse_version(b) + a = parse_version(a) + b = parse_version(b) if a < b: return -1 diff --git a/setup.py b/setup.py index 2954d54..e9aeb1b 100755 --- a/setup.py +++ b/setup.py @@ -90,7 +90,7 @@ setup( install_requires=[ 'Django==1.4', 'django-annoying==0.7.6', 'South==0.7.4', 'django-piston==0.2.3', 'BeautifulSoup==3.2.1', 'matplotlib==1.1.0', - 'django-celery==2.5.5', 'django-registration==0.8', + 'django-celery==3.0.0', 'django-registration==0.8', 'python-ldap==2.4.10', 'django-auth-ldap==1.1', ], package_dir={'': 'pym'},