From bd9af90ff60908b5495272410cbc94aef06af82a Mon Sep 17 00:00:00 2001 From: volpino Date: Tue, 12 Jun 2012 13:05:58 +0200 Subject: [PATCH] euscanwww: Removed useless purge tasks, some error handling Signed-off-by: volpino --- euscanwww/djeuscan/tasks.py | 35 +++++++++++++++++---------------- euscanwww/euscanwww/settings.py | 2 +- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/euscanwww/djeuscan/tasks.py b/euscanwww/djeuscan/tasks.py index 425467b..a9341d4 100644 --- a/euscanwww/djeuscan/tasks.py +++ b/euscanwww/djeuscan/tasks.py @@ -26,6 +26,9 @@ class TaskFailedException(Exception): def _launch_command(cmd): + """ + Helper for launching shell commands inside tasks + """ fp = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) output = StringIO(fp.communicate()[0]) @@ -33,11 +36,18 @@ def _launch_command(cmd): def _chunks(it, n): + """ + Chunk generator, takes an iterator and the desired size of the chunk + """ for first in it: yield [first] + list(islice(it, n - 1)) def _run_in_chunks(task, iterable, n=32): + """ + Runs the given task with the given iterable of args in chunks of + n subtasks + """ output = [] for chunk in _chunks(iter(iterable), n): job = TaskSet(tasks=[ @@ -46,7 +56,7 @@ def _run_in_chunks(task, iterable, n=32): ]) result = job.apply_async() # TODO: understand why this causes timeout - #output.extend(list(result.join(timeout=3600))) + output.extend(list(result.join(timeout=3600))) return output @@ -66,7 +76,10 @@ def scan_metadata_task(query, obj=None): logger.info("Starting metadata scanning for package %s ...", query) scan_metadata = ScanMetadata() - return scan_metadata.scan(query, obj) + result = scan_metadata.scan(query, obj) + if not result: + raise TaskFailedException("Couldn't scan metadata") + return result @task @@ -111,11 +124,6 @@ def scan_portage_all_task(purge=False): scan_portage_purge() -@task -def scan_portage_purge_task(): - scan_portage_purge() - - @task def scan_upstream_task(query): logger = scan_upstream_task.get_logger() @@ -123,8 +131,8 @@ def scan_upstream_task(query): scan_upstream = ScanUpstream() result = scan_upstream.scan(query) - if not result: - raise TaskFailedException("Couldn't scan upstream for this package") + if not result or result == {}: + raise TaskFailedException("Couldn't scan upstream") return result @@ -143,16 +151,11 @@ def scan_upstream_all_task(purge=False): ) if purge: - output += [scan_upstream_purge_task()] + output += [scan_upstream_purge()] return output -@task -def scan_upstream_purge_task(): - scan_upstream_purge() - - @task def emerge_sync(): cmd = ["emerge", "--sync", "--root", settings.PORTAGE_ROOT, @@ -201,10 +204,8 @@ admin_tasks = [ scan_metadata_all_task, scan_portage_all_task, scan_portage_list_task, - scan_portage_purge_task, scan_upstream_all_task, scan_upstream_list_task, - scan_upstream_purge_task, emerge_sync, layman_sync, emerge_regen, diff --git a/euscanwww/euscanwww/settings.py b/euscanwww/euscanwww/settings.py index 6cae613..733c72c 100644 --- a/euscanwww/euscanwww/settings.py +++ b/euscanwww/euscanwww/settings.py @@ -215,7 +215,7 @@ import djcelery djcelery.setup_loader() BROKER_URL = "amqp://guest:guest@localhost:5672//" CELERY_RESULT_BACKEND = "amqp" -BROKER_CONNECTION_TIMEOUT = 600 +BROKER_CONNECTION_TIMEOUT = 3600 CELERYD_CONCURRENCY = 4