From 56c4d79fb2c30a3481c80693875f24c9a871bd51 Mon Sep 17 00:00:00 2001 From: volpino Date: Wed, 30 May 2012 22:54:55 +0200 Subject: [PATCH] euscanwww: starting implementing Celery tasks Basic tasks.py module, some fixes in the management commands Signed-off-by: volpino --- .../management/commands/scan_metadata.py | 8 +- .../management/commands/scan_portage.py | 21 ++--- .../management/commands/scan_upstream.py | 49 +++++------ euscanwww/djeuscan/tasks.py | 83 +++++++++++++++++++ euscanwww/euscanwww/settings.py | 9 ++ 5 files changed, 126 insertions(+), 44 deletions(-) create mode 100644 euscanwww/djeuscan/tasks.py diff --git a/euscanwww/djeuscan/management/commands/scan_metadata.py b/euscanwww/djeuscan/management/commands/scan_metadata.py index 8590412..d9c7e49 100644 --- a/euscanwww/djeuscan/management/commands/scan_metadata.py +++ b/euscanwww/djeuscan/management/commands/scan_metadata.py @@ -18,7 +18,7 @@ class ScanMetadata(object): self.style = color_style() @commit_on_success - def run(self, query=None, obj=None): + def scan(self, query=None, obj=None): matches = Query(query).find( include_masked=True, in_installed=False, @@ -158,10 +158,10 @@ class Command(BaseCommand): if options['all']: for pkg in Package.objects.all(): - scan_metadata.run('%s/%s' % (pkg.category, pkg.name), pkg) + scan_metadata.scan('%s/%s' % (pkg.category, pkg.name), pkg) elif len(args) > 0: for package in args: - scan_metadata.run(package) + scan_metadata.scan(package) else: for package in sys.stdin.readlines(): - scan_metadata.run(package[:-1]) + scan_metadata.scan(package[:-1]) diff --git a/euscanwww/djeuscan/management/commands/scan_portage.py b/euscanwww/djeuscan/management/commands/scan_portage.py index 4227501..9533730 100644 --- a/euscanwww/djeuscan/management/commands/scan_portage.py +++ b/euscanwww/djeuscan/management/commands/scan_portage.py @@ -3,8 +3,8 @@ import portage import sys import os import re - from optparse import make_option +from collections import defaultdict from django.db.transaction import commit_on_success from django.core.management.base import BaseCommand @@ -14,13 +14,10 @@ from djeuscan.models import Package, Version, VersionLog class ScanPortage(object): - def __init__(self, stdout=None, **options): - if stdout is None: - self.stdout = sys.stdout - else: - self.stdout = stdout + def __init__(self, stdout=None, options=None): + self.stdout = sys.stdout if stdout is None else stdout + self.options = defaultdict(None) if options is None else options - self.options = options self.style = color_style() self._cache = {'packages': {}, 'versions': {}} self._overlays = None @@ -84,7 +81,7 @@ class ScanPortage(object): return self._overlays @commit_on_success - def run(self, query=None): + def scan(self, query=None): env = os.environ env['MY'] = "/-: []\n" @@ -312,7 +309,7 @@ class Command(BaseCommand): help = 'Scans portage tree and fills database' def handle(self, *args, **options): - scan_portage = ScanPortage(stdout=self.stdout, **options) + scan_portage = ScanPortage(stdout=self.stdout, options=options) if not options['quiet']: self.stdout.write('Scanning portage tree...\n') @@ -329,13 +326,13 @@ class Command(BaseCommand): self.stdout.write('done\n') if options['all']: - scan_portage.run() + scan_portage.scan() elif len(args): for package in args: - scan_portage.run(package) + scan_portage.scan(package) else: for package in sys.stdin.readlines(): - scan_portage.run(package[:-1]) + scan_portage.scan(package[:-1]) if options['purge-versions']: purge_versions(options) diff --git a/euscanwww/djeuscan/management/commands/scan_upstream.py b/euscanwww/djeuscan/management/commands/scan_upstream.py index f6e78c3..2e2d9c2 100644 --- a/euscanwww/djeuscan/management/commands/scan_upstream.py +++ b/euscanwww/djeuscan/management/commands/scan_upstream.py @@ -4,7 +4,6 @@ import sys import re from StringIO import StringIO from optparse import make_option -from collections import defaultdict from django.utils import timezone from django.db.transaction import commit_on_success @@ -14,21 +13,17 @@ from djeuscan.models import Package, Version, EuscanResult, VersionLog class ScanUpstream(object): - def __init__(self, options=None): - if options is None: - self.options = defaultdict(None) - else: - self.options = options + def __init__(self, quiet=False): + self.quiet = quiet - def run(self, packages=None): - for package in packages: - cmd = ['euscan', package] + def scan(self, package): + cmd = ['euscan', package] - fp = subprocess.Popen(cmd, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - output = StringIO(fp.communicate()[0]) + fp = subprocess.Popen(cmd, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + output = StringIO(fp.communicate()[0]) - self.parse_output(output) + self.parse_output(output) def store_result(self, package, log): # Remove previous logs @@ -45,7 +40,7 @@ class ScanUpstream(object): obj, created = Package.objects.get_or_create(category=cat, name=pkg) - if created and not self.options['quiet']: + if created and not self.quiet: sys.stdout.write('+ [p] %s/%s\n' % (cat, pkg)) # Set all versions dead, then set found versions alive and @@ -69,7 +64,7 @@ class ScanUpstream(object): if not created: return - if not self.options['quiet']: + if not self.quiet: sys.stdout.write('+ [u] %s %s\n' % (obj, url)) VersionLog.objects.create( @@ -125,7 +120,7 @@ class ScanUpstream(object): @commit_on_success -def purge_versions(options): +def purge_versions(quiet=False): # For each dead versions for version in Version.objects.filter(packaged=False, alive=False): VersionLog.objects.create( @@ -140,7 +135,7 @@ def purge_versions(options): version.package.n_versions -= 1 version.package.save() - if not options['quiet']: + if not quiet: sys.stdout.write('- [u] %s %s\n' % (version, version.urls)) Version.objects.filter(packaged=False, alive=False).delete() @@ -174,31 +169,29 @@ class Command(BaseCommand): help = 'Scans metadata and fills database' def handle(self, *args, **options): - scan_upstream = ScanUpstream(options) + scan_upstream = ScanUpstream(options["quiet"]) if options['feed']: - scan_upstream.parse_output(options, sys.stdin) + scan_upstream.parse_output(sys.stdin) if options['purge-versions']: - purge_versions(options) + purge_versions(options["quiet"]) return if not options['quiet']: self.stdout.write('Scanning upstream...\n') - packages = [] - if options['all']: for pkg in Package.objects.all(): - packages.append('%s/%s' % (pkg.category, pkg.name)) + scan_upstream.scan('%s/%s' % (pkg.category, pkg.name)) elif args: - packages = list(args) + for arg in args: + scan_upstream.scan(arg) else: - packages = [package[:-1] for package in sys.stdin.readlines()] - - scan_upstream.run(packages) + for package in sys.stdin.readlines(): + scan_upstream.scan(package[:-1]) if options['purge-versions']: - purge_versions(options) + purge_versions(options["quiet"]) if not options['quiet']: self.stdout.write('Done.\n') diff --git a/euscanwww/djeuscan/tasks.py b/euscanwww/djeuscan/tasks.py new file mode 100644 index 0000000..fbc216f --- /dev/null +++ b/euscanwww/djeuscan/tasks.py @@ -0,0 +1,83 @@ +from celery.task import task +from celery.task.sets import TaskSet + +from djeuscan.models import Package + +from djeuscan.management.commands.regen_rrds import regen_rrds +from djeuscan.management.commands.update_counters import update_counters +from djeuscan.management.commands.scan_metadata import ScanMetadata +from djeuscan.management.commands.scan_portage import ScanPortage, \ + purge_versions as scan_portage_purge +from djeuscan.management.commands.scan_upstream import ScanUpstream, \ + purge_versions as scan_upstream_purge + + +@task +def regen_rrds_task(): + regen_rrds() + + +@task +def update_counters_task(): + update_counters() + + +@task +def scan_metadata_task(query, obj=None): + scan_metadata = ScanMetadata() + scan_metadata.scan(query) + + +@task +def scan_metadata_all_task(): + job = TaskSet(tasks=[ + scan_metadata_task.subtask(('%s/%s' % (pkg.category, pkg.name), pkg)) + for pkg in Package.objects.all() + ]) + job.apply_async() + + +@task +def scan_portage_all_task(purge=False): + scan_portage = ScanPortage() + scan_portage.scan() + + if purge: + scan_portage_purge() + + +@task +def scan_portage_task(query, purge=False): + scan_portage = ScanPortage() + scan_portage.scan(query) + + if purge: + scan_portage_purge() + + +@task +def scan_portage_purge_task(): + scan_portage_purge() + + +@task +def scan_upstream_all_task(purge=False): + tasks = [scan_upstream_task.subtask(('%s/%s' % (pkg.category, pkg.name))) + for pkg in Package.objects.all()] + + if purge: + tasks.append(scan_upstream_purge_task.subtask()) + + job = TaskSet(tasks=tasks) + job.apply_async() + + +@task +def scan_upstream_task(query): + scan_upstream = ScanUpstream() + scan_upstream.scan(query) + + +@task +def scan_upstream_purge_task(): + scan_upstream_purge() diff --git a/euscanwww/euscanwww/settings.py b/euscanwww/euscanwww/settings.py index 0365bc7..0f2b483 100644 --- a/euscanwww/euscanwww/settings.py +++ b/euscanwww/euscanwww/settings.py @@ -166,6 +166,7 @@ INSTALLED_APPS = ( # Uncomment the next line to enable admin documentation: # 'django.contrib.admindocs', 'south', + 'djcelery', 'euscanwww', 'djeuscan', ) @@ -199,6 +200,14 @@ LOGGING = { } } + +# Celery config +import djcelery +djcelery.setup_loader() +BROKER_URL = "amqp://guest:guest@localhost:5672//" +CELERY_RESULT_BACKEND = "amqp" + + try: from local_settings import * except ImportError, ex: