From a94a24eeac582dcb817b2599254bdf409606b155 Mon Sep 17 00:00:00 2001 From: volpino Date: Fri, 8 Jun 2012 14:21:11 +0200 Subject: [PATCH] euscanwww: Refresh query logic and task queue split in chunks Added the needed logic for running refresh queries from the web interface. Added a simple function to tasks.py that takes an iterable of arguments and a task and launches many subtasks formed by 32 tasks each (for not having a huge amount of tasks launched at the same time) Signed-off-by: volpino --- euscanwww/djeuscan/tasks.py | 95 ++++++++++++------- .../djeuscan/templates/euscan/package.html | 4 +- euscanwww/djeuscan/urls.py | 5 +- euscanwww/djeuscan/views.py | 19 +++- 4 files changed, 82 insertions(+), 41 deletions(-) diff --git a/euscanwww/djeuscan/tasks.py b/euscanwww/djeuscan/tasks.py index f29f69a..77064b0 100644 --- a/euscanwww/djeuscan/tasks.py +++ b/euscanwww/djeuscan/tasks.py @@ -1,12 +1,14 @@ import subprocess from StringIO import StringIO +from itertools import islice -from celery.task import task +from celery.task import task, periodic_task +from celery.task.schedules import crontab from celery.task.sets import TaskSet from django.conf import settings -from djeuscan.models import Package +from djeuscan.models import Package, RefreshPackageQuery from djeuscan.management.commands.regen_rrds import regen_rrds from djeuscan.management.commands.update_counters import update_counters from djeuscan.management.commands.scan_metadata import ScanMetadata @@ -16,14 +18,38 @@ from djeuscan.management.commands.scan_upstream import ScanUpstream, \ purge_versions as scan_upstream_purge +def _launch_command(cmd): + fp = subprocess.Popen(cmd, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + output = StringIO(fp.communicate()[0]) + return output.getvalue() + + +def _chunks(it, n): + for first in it: + yield [first] + list(islice(it, n - 1)) + + +def _run_in_chunks(task, iterable, n=32): + output = [] + for chunk in _chunks(iter(iterable), n): + job = TaskSet(tasks=[ + task.subtask(args) + for args in chunk + ]) + result = job.apply_async() + output.extend(list(result.join())) + return output + + @task def regen_rrds_task(): - regen_rrds() + return regen_rrds() @task def update_counters_task(): - update_counters() + return update_counters() @task @@ -32,33 +58,29 @@ def scan_metadata_task(query, obj=None): logger.info("Starting metadata scanning for package %s ...", query) scan_metadata = ScanMetadata() - scan_metadata.scan(query, obj) + return scan_metadata.scan(query, obj) @task def scan_metadata_list_task(query): - job = TaskSet(tasks=[ - scan_metadata_task.subtask((pkg, )) - for pkg in query.split() - ]) - job.apply_async() + return _run_in_chunks(scan_metadata_task, [(p, ) for p in query.split()]) @task def scan_metadata_all_task(): - job = TaskSet(tasks=[ - scan_metadata_task.subtask(('%s/%s' % (pkg.category, pkg.name), pkg)) - for pkg in Package.objects.all() - ]) - job.apply_async() + return _run_in_chunks( + scan_metadata_task, + [('%s/%s' % (pkg.category, pkg.name), pkg) + for pkg in Package.objects.all()] + ) @task def scan_portage_list_task(query, purge=False): scan_portage = ScanPortage() + logger = scan_portage_list_task.get_logger() for pkg in query.split(): - logger = scan_portage_list_task.get_logger() logger.info("Starting Portage package scanning: %s ...", pkg) scan_portage.scan(pkg) @@ -92,28 +114,26 @@ def scan_upstream_task(query): logger.info("Starting upstream scanning for package %s ...", query) scan_upstream = ScanUpstream() - scan_upstream.scan(query) + return scan_upstream.scan(query) @task def scan_upstream_list_task(query): - job = TaskSet(tasks=[ - scan_upstream_task.subtask((pkg, )) - for pkg in query.split() - ]) - job.apply_async() + return _run_in_chunks(scan_upstream_task, [(p, ) for p in query.split()]) @task def scan_upstream_all_task(purge=False): - tasks = [scan_upstream_task.subtask(('%s/%s' % (pkg.category, pkg.name), )) - for pkg in Package.objects.all()] + output = _run_in_chunks( + scan_upstream_task, + [('%s/%s' % (pkg.category, pkg.name), ) + for pkg in Package.objects.all()] + ) if purge: - tasks.append(scan_upstream_purge_task.subtask()) + output += [scan_upstream_purge_task()] - job = TaskSet(tasks=tasks) - job.apply_async() + return output @task @@ -121,13 +141,6 @@ def scan_upstream_purge_task(): scan_upstream_purge() -def _launch_command(cmd): - fp = subprocess.Popen(cmd, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - output = StringIO(fp.communicate()[0]) - return output.getvalue() - - @task def emerge_sync(): cmd = ["emerge", "--sync", "--root", settings.PORTAGE_ROOT, @@ -157,7 +170,19 @@ def eix_update(): return _launch_command(cmd) -launchable_tasks = [ +@periodic_task(run_every=crontab(minute="*/1")) +def refresh_package_consume(): + try: + obj = RefreshPackageQuery.objects.latest() + except RefreshPackageQuery.DoesNotExist: + return {} + else: + result = scan_upstream_task(obj.query) + obj.delete() + return result + + +admin_tasks = [ regen_rrds_task, update_counters_task, scan_metadata_list_task, diff --git a/euscanwww/djeuscan/templates/euscan/package.html b/euscanwww/djeuscan/templates/euscan/package.html index 5b3fb81..f042d04 100644 --- a/euscanwww/djeuscan/templates/euscan/package.html +++ b/euscanwww/djeuscan/templates/euscan/package.html @@ -140,8 +140,8 @@