euscanwww: Removed useless purge tasks, some error handling

Signed-off-by: volpino <fox91@anche.no>
This commit is contained in:
volpino 2012-06-12 13:05:58 +02:00
parent d34a0c4992
commit bd9af90ff6
2 changed files with 19 additions and 18 deletions

View File

@ -26,6 +26,9 @@ class TaskFailedException(Exception):
def _launch_command(cmd):
    """
    Helper for launching shell commands inside tasks
    """
    fp = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE)
    output = StringIO(fp.communicate()[0])
@ -33,11 +36,18 @@ def _launch_command(cmd):
def _chunks(it, n):
    """
    Chunk generator, takes an iterator and the desired size of the chunk
    """
    for first in it:
        yield [first] + list(islice(it, n - 1))
def _run_in_chunks(task, iterable, n=32):
    """
    Runs the given task with the given iterable of args in chunks of
    n subtasks
    """
    output = []
    for chunk in _chunks(iter(iterable), n):
        job = TaskSet(tasks=[
@ -46,7 +56,7 @@ def _run_in_chunks(task, iterable, n=32):
        ])
        result = job.apply_async()
        # TODO: understand why this causes timeout
        output.extend(list(result.join(timeout=3600)))
    return output
@ -66,7 +76,10 @@ def scan_metadata_task(query, obj=None):
    logger.info("Starting metadata scanning for package %s ...", query)
    scan_metadata = ScanMetadata()
    result = scan_metadata.scan(query, obj)
if not result:
raise TaskFailedException("Couldn't scan metadata")
return result
@task
@ -111,11 +124,6 @@ def scan_portage_all_task(purge=False):
        scan_portage_purge()
@task
def scan_portage_purge_task():
scan_portage_purge()
@task
def scan_upstream_task(query):
    logger = scan_upstream_task.get_logger()
@ -123,8 +131,8 @@ def scan_upstream_task(query):
    scan_upstream = ScanUpstream()
    result = scan_upstream.scan(query)
    if not result or result == {}:
        raise TaskFailedException("Couldn't scan upstream")
    return result
@ -143,16 +151,11 @@ def scan_upstream_all_task(purge=False):
    )
    if purge:
        output += [scan_upstream_purge()]
    return output
@task
def scan_upstream_purge_task():
scan_upstream_purge()
@task
def emerge_sync():
    cmd = ["emerge", "--sync", "--root", settings.PORTAGE_ROOT,
@ -201,10 +204,8 @@ admin_tasks = [
    scan_metadata_all_task,
    scan_portage_all_task,
    scan_portage_list_task,
scan_portage_purge_task,
    scan_upstream_all_task,
    scan_upstream_list_task,
scan_upstream_purge_task,
    emerge_sync,
    layman_sync,
    emerge_regen,

View File

@ -215,7 +215,7 @@ import djcelery
djcelery.setup_loader()
BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_RESULT_BACKEND = "amqp"
BROKER_CONNECTION_TIMEOUT = 3600
CELERYD_CONCURRENCY = 4