euscanwww: fix tasks
- group_one was broken - new way of calling consume refresh queries Signed-off-by: Corentin Chary <corentin.chary@gmail.com>
This commit is contained in:
parent
a8e6b09125
commit
62b1fbba4e
@ -5,6 +5,7 @@ Celery tasks for djeuscan
|
||||
from celery.task import task, group
|
||||
|
||||
from django.conf import settings
|
||||
from django.core.cache import cache
|
||||
|
||||
import portage
|
||||
|
||||
@ -25,13 +26,18 @@ def group_one(task, seq, *args, **kwargs):
|
||||
"""
|
||||
tasks = []
|
||||
|
||||
if "attr_name" in kwargs:
|
||||
attr_name = kwargs['attr_name']
|
||||
del kwargs["attr_name"]
|
||||
else:
|
||||
attr_name = None
|
||||
|
||||
for elem in seq:
|
||||
if "attr_name" in kwargs:
|
||||
kwargs[kwargs["attr_name"]] = elem
|
||||
del kwargs["attr_name"]
|
||||
tasks.append(task.subtask(args=args, kwargs=kwargs))
|
||||
if attr_name:
|
||||
kwargs[attr_name] = elem
|
||||
tasks.append(task.subtask(args=args, kwargs=dict(kwargs)))
|
||||
else:
|
||||
tasks.append(task.subtask(args=[elem] + list(args), kwargs=kwargs))
|
||||
tasks.append(task.subtask(args=[elem] + list(args), kwargs=dict(kwargs)))
|
||||
return group(tasks)
|
||||
|
||||
|
||||
@ -203,19 +209,36 @@ def scan_package_user(package):
|
||||
|
||||
|
||||
@task
|
||||
def consume_refresh_package_request():
|
||||
def consume_refresh_queue(locked=False):
|
||||
"""
|
||||
Satisfies user requests for package refreshing, runs every minute
|
||||
"""
|
||||
try:
|
||||
query = RefreshPackageQuery.objects.all().order_by('-priority')[0]
|
||||
except IndexError:
|
||||
LOCK_ID = 'lock-consume-refresh-queue'
|
||||
unlock = lambda: cache.delete(LOCK_ID)
|
||||
lock = lambda: cache.add(LOCK_ID, True, 120)
|
||||
|
||||
logger = consume_refresh_queue.get_logger()
|
||||
|
||||
if not locked and not lock():
|
||||
return
|
||||
|
||||
pkg = query.package
|
||||
query.delete()
|
||||
scan_package_user.delay(pkg)
|
||||
logger.info('Consumming package refresh request queue...')
|
||||
|
||||
try:
|
||||
query = RefreshPackageQuery.objects.all().order_by('-priority')[0]
|
||||
pkg = query.package
|
||||
query.delete()
|
||||
scan_package_user.delay(pkg)
|
||||
logger.info('Done (%s)' % pkg)
|
||||
except IndexError:
|
||||
pass
|
||||
finally:
|
||||
unlock()
|
||||
|
||||
if RefreshPackageQuery.objects.count():
|
||||
logger.info('Restarting myself in 60s')
|
||||
lock()
|
||||
consume_refresh_queue.apply_async(kwargs={'locked':True}, countdown=60)
|
||||
|
||||
admin_tasks = [
|
||||
regen_rrds,
|
||||
@ -228,22 +251,3 @@ admin_tasks = [
|
||||
update_upstream,
|
||||
scan_package,
|
||||
]
|
||||
|
||||
|
||||
# Chunk helpers (chunks can't use keyword arguments)
|
||||
@task
|
||||
def scan_metadata_category(category):
|
||||
"""
|
||||
Helper for calling scan_metadata with a category
|
||||
"""
|
||||
scan_metadata(category=category)
|
||||
return True
|
||||
|
||||
|
||||
@task
|
||||
def scan_upstream_purge(*packages):
|
||||
"""
|
||||
Helper for calling scan_upstream with purge_versions=True
|
||||
"""
|
||||
scan_upstream(packages, purge_versions=True)
|
||||
return True
|
||||
|
Loading…
Reference in New Issue
Block a user