euscanwww: Refresh query logic and task queue split in chunks

Added the needed logic for running refresh queries from the web
interface.

Added a simple function to tasks.py that takes an iterable of arguments
and a task and launches many subtasks formed by 32 tasks each (for not
having a huge amount of tasks launched at the same time)

Signed-off-by: volpino <fox91@anche.no>
This commit is contained in:
volpino 2012-06-08 14:21:11 +02:00
parent 8913f826de
commit a94a24eeac
4 changed files with 82 additions and 41 deletions

View File

@ -1,12 +1,14 @@
import subprocess import subprocess
from StringIO import StringIO from StringIO import StringIO
from itertools import islice
from celery.task import task from celery.task import task, periodic_task
from celery.task.schedules import crontab
from celery.task.sets import TaskSet from celery.task.sets import TaskSet
from django.conf import settings from django.conf import settings
from djeuscan.models import Package from djeuscan.models import Package, RefreshPackageQuery
from djeuscan.management.commands.regen_rrds import regen_rrds from djeuscan.management.commands.regen_rrds import regen_rrds
from djeuscan.management.commands.update_counters import update_counters from djeuscan.management.commands.update_counters import update_counters
from djeuscan.management.commands.scan_metadata import ScanMetadata from djeuscan.management.commands.scan_metadata import ScanMetadata
@ -16,14 +18,38 @@ from djeuscan.management.commands.scan_upstream import ScanUpstream, \
purge_versions as scan_upstream_purge purge_versions as scan_upstream_purge
def _launch_command(cmd):
fp = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
output = StringIO(fp.communicate()[0])
return output.getvalue()
def _chunks(it, n):
for first in it:
yield [first] + list(islice(it, n - 1))
def _run_in_chunks(task, iterable, n=32):
output = []
for chunk in _chunks(iter(iterable), n):
job = TaskSet(tasks=[
task.subtask(args)
for args in chunk
])
result = job.apply_async()
output.extend(list(result.join()))
return output
@task @task
def regen_rrds_task(): def regen_rrds_task():
regen_rrds() return regen_rrds()
@task @task
def update_counters_task(): def update_counters_task():
update_counters() return update_counters()
@task @task
@ -32,33 +58,29 @@ def scan_metadata_task(query, obj=None):
logger.info("Starting metadata scanning for package %s ...", query) logger.info("Starting metadata scanning for package %s ...", query)
scan_metadata = ScanMetadata() scan_metadata = ScanMetadata()
scan_metadata.scan(query, obj) return scan_metadata.scan(query, obj)
@task @task
def scan_metadata_list_task(query): def scan_metadata_list_task(query):
job = TaskSet(tasks=[ return _run_in_chunks(scan_metadata_task, [(p, ) for p in query.split()])
scan_metadata_task.subtask((pkg, ))
for pkg in query.split()
])
job.apply_async()
@task @task
def scan_metadata_all_task(): def scan_metadata_all_task():
job = TaskSet(tasks=[ return _run_in_chunks(
scan_metadata_task.subtask(('%s/%s' % (pkg.category, pkg.name), pkg)) scan_metadata_task,
for pkg in Package.objects.all() [('%s/%s' % (pkg.category, pkg.name), pkg)
]) for pkg in Package.objects.all()]
job.apply_async() )
@task @task
def scan_portage_list_task(query, purge=False): def scan_portage_list_task(query, purge=False):
scan_portage = ScanPortage() scan_portage = ScanPortage()
logger = scan_portage_list_task.get_logger()
for pkg in query.split(): for pkg in query.split():
logger = scan_portage_list_task.get_logger()
logger.info("Starting Portage package scanning: %s ...", pkg) logger.info("Starting Portage package scanning: %s ...", pkg)
scan_portage.scan(pkg) scan_portage.scan(pkg)
@ -92,28 +114,26 @@ def scan_upstream_task(query):
logger.info("Starting upstream scanning for package %s ...", query) logger.info("Starting upstream scanning for package %s ...", query)
scan_upstream = ScanUpstream() scan_upstream = ScanUpstream()
scan_upstream.scan(query) return scan_upstream.scan(query)
@task @task
def scan_upstream_list_task(query): def scan_upstream_list_task(query):
job = TaskSet(tasks=[ return _run_in_chunks(scan_upstream_task, [(p, ) for p in query.split()])
scan_upstream_task.subtask((pkg, ))
for pkg in query.split()
])
job.apply_async()
@task @task
def scan_upstream_all_task(purge=False): def scan_upstream_all_task(purge=False):
tasks = [scan_upstream_task.subtask(('%s/%s' % (pkg.category, pkg.name), )) output = _run_in_chunks(
for pkg in Package.objects.all()] scan_upstream_task,
[('%s/%s' % (pkg.category, pkg.name), )
for pkg in Package.objects.all()]
)
if purge: if purge:
tasks.append(scan_upstream_purge_task.subtask()) output += [scan_upstream_purge_task()]
job = TaskSet(tasks=tasks) return output
job.apply_async()
@task @task
@ -121,13 +141,6 @@ def scan_upstream_purge_task():
scan_upstream_purge() scan_upstream_purge()
def _launch_command(cmd):
fp = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
output = StringIO(fp.communicate()[0])
return output.getvalue()
@task @task
def emerge_sync(): def emerge_sync():
cmd = ["emerge", "--sync", "--root", settings.PORTAGE_ROOT, cmd = ["emerge", "--sync", "--root", settings.PORTAGE_ROOT,
@ -157,7 +170,19 @@ def eix_update():
return _launch_command(cmd) return _launch_command(cmd)
launchable_tasks = [ @periodic_task(run_every=crontab(minute="*/1"))
def refresh_package_consume():
try:
obj = RefreshPackageQuery.objects.latest()
except RefreshPackageQuery.DoesNotExist:
return {}
else:
result = scan_upstream_task(obj.query)
obj.delete()
return result
admin_tasks = [
regen_rrds_task, regen_rrds_task,
update_counters_task, update_counters_task,
scan_metadata_list_task, scan_metadata_list_task,

View File

@ -140,8 +140,8 @@
<script type="text/javascript"> <script type="text/javascript">
$(".refresh-button").click(function() { $(".refresh-button").click(function() {
var url = "{% url "apply_task" "djeuscan.tasks.scan_upstream_task" %}"; var url = "{% url "refresh_package" "x/x" %}";
$.post(url, {query: $(this).data("package")}, function() { $.post(url.replace("x/x", $(this).data("package")), function() {
alert("Submitted!"); alert("Submitted!");
}); });
}); });

View File

@ -50,7 +50,10 @@ overlays_patterns = patterns('djeuscan.views',
url(r'^$', 'overlays', name="overlays"), url(r'^$', 'overlays', name="overlays"),
) )
tasks_patterns = patterns('djcelery.views', tasks_patterns = patterns('djeuscan.views',
url(r'^refresh_package/(?P<query>(?:[\w+][\w+.-]*/[\w+][\w+.-]*))/$',
"refresh_package",
name="refresh_package"),
url(r'^registered_tasks/$', admin_required(registered_tasks), url(r'^registered_tasks/$', admin_required(registered_tasks),
name="registered_tasks"), name="registered_tasks"),
url(r'^apply/(?P<task_name>.*)/$', admin_required(apply), url(r'^apply/(?P<task_name>.*)/$', admin_required(apply),

View File

@ -5,12 +5,14 @@ from annoying.decorators import render_to, ajax_request
from django.http import Http404 from django.http import Http404
from django.shortcuts import get_object_or_404 from django.shortcuts import get_object_or_404
from django.contrib.auth.decorators import login_required
from django.views.decorators.http import require_POST
from djeuscan.helpers import version_key, packages_from_names from djeuscan.helpers import version_key, packages_from_names
from djeuscan.models import Version, Package, Herd, Maintainer, EuscanResult, \ from djeuscan.models import Version, Package, Herd, Maintainer, EuscanResult, \
VersionLog VersionLog, RefreshPackageQuery
from djeuscan.forms import WorldForm, PackagesForm from djeuscan.forms import WorldForm, PackagesForm
from djeuscan.tasks import launchable_tasks from djeuscan.tasks import admin_tasks
from djeuscan import charts from djeuscan import charts
@ -262,7 +264,7 @@ def chart_category(request, **kwargs):
@ajax_request @ajax_request
def registered_tasks(request): def registered_tasks(request):
data = {} data = {}
for task in launchable_tasks: for task in admin_tasks:
argspec = inspect.getargspec(task.run) argspec = inspect.getargspec(task.run)
data[task.name] = { data[task.name] = {
"args": argspec.args, "args": argspec.args,
@ -274,3 +276,14 @@ def registered_tasks(request):
"defaults_types": [type(x).__name__ for x in argspec.defaults] "defaults_types": [type(x).__name__ for x in argspec.defaults]
}) })
return {"tasks": data} return {"tasks": data}
@login_required
@require_POST
@ajax_request
def refresh_package(request, query):
obj, created = RefreshPackageQuery.objects.get_or_create(query=query)
if not created:
obj.priority += 1
obj.save()
return {"result": "success"}