euscanwww: starting implementing Celery tasks

Basic tasks.py module, some fixes in the management commands

Signed-off-by: volpino <fox91@anche.no>
This commit is contained in:
volpino 2012-05-30 22:54:55 +02:00
parent c36f625a54
commit 56c4d79fb2
5 changed files with 126 additions and 44 deletions

View File

@ -18,7 +18,7 @@ class ScanMetadata(object):
self.style = color_style() self.style = color_style()
@commit_on_success @commit_on_success
def run(self, query=None, obj=None): def scan(self, query=None, obj=None):
matches = Query(query).find( matches = Query(query).find(
include_masked=True, include_masked=True,
in_installed=False, in_installed=False,
@ -158,10 +158,10 @@ class Command(BaseCommand):
if options['all']: if options['all']:
for pkg in Package.objects.all(): for pkg in Package.objects.all():
scan_metadata.run('%s/%s' % (pkg.category, pkg.name), pkg) scan_metadata.scan('%s/%s' % (pkg.category, pkg.name), pkg)
elif len(args) > 0: elif len(args) > 0:
for package in args: for package in args:
scan_metadata.run(package) scan_metadata.scan(package)
else: else:
for package in sys.stdin.readlines(): for package in sys.stdin.readlines():
scan_metadata.run(package[:-1]) scan_metadata.scan(package[:-1])

View File

@ -3,8 +3,8 @@ import portage
import sys import sys
import os import os
import re import re
from optparse import make_option from optparse import make_option
from collections import defaultdict
from django.db.transaction import commit_on_success from django.db.transaction import commit_on_success
from django.core.management.base import BaseCommand from django.core.management.base import BaseCommand
@ -14,13 +14,10 @@ from djeuscan.models import Package, Version, VersionLog
class ScanPortage(object): class ScanPortage(object):
def __init__(self, stdout=None, **options): def __init__(self, stdout=None, options=None):
if stdout is None: self.stdout = sys.stdout if stdout is None else stdout
self.stdout = sys.stdout self.options = defaultdict(None) if options is None else options
else:
self.stdout = stdout
self.options = options
self.style = color_style() self.style = color_style()
self._cache = {'packages': {}, 'versions': {}} self._cache = {'packages': {}, 'versions': {}}
self._overlays = None self._overlays = None
@ -84,7 +81,7 @@ class ScanPortage(object):
return self._overlays return self._overlays
@commit_on_success @commit_on_success
def run(self, query=None): def scan(self, query=None):
env = os.environ env = os.environ
env['MY'] = "<category>/<name>-<version>:<slot> [<overlaynum>]\n" env['MY'] = "<category>/<name>-<version>:<slot> [<overlaynum>]\n"
@ -312,7 +309,7 @@ class Command(BaseCommand):
help = 'Scans portage tree and fills database' help = 'Scans portage tree and fills database'
def handle(self, *args, **options): def handle(self, *args, **options):
scan_portage = ScanPortage(stdout=self.stdout, **options) scan_portage = ScanPortage(stdout=self.stdout, options=options)
if not options['quiet']: if not options['quiet']:
self.stdout.write('Scanning portage tree...\n') self.stdout.write('Scanning portage tree...\n')
@ -329,13 +326,13 @@ class Command(BaseCommand):
self.stdout.write('done\n') self.stdout.write('done\n')
if options['all']: if options['all']:
scan_portage.run() scan_portage.scan()
elif len(args): elif len(args):
for package in args: for package in args:
scan_portage.run(package) scan_portage.scan(package)
else: else:
for package in sys.stdin.readlines(): for package in sys.stdin.readlines():
scan_portage.run(package[:-1]) scan_portage.scan(package[:-1])
if options['purge-versions']: if options['purge-versions']:
purge_versions(options) purge_versions(options)

View File

@ -4,7 +4,6 @@ import sys
import re import re
from StringIO import StringIO from StringIO import StringIO
from optparse import make_option from optparse import make_option
from collections import defaultdict
from django.utils import timezone from django.utils import timezone
from django.db.transaction import commit_on_success from django.db.transaction import commit_on_success
@ -14,21 +13,17 @@ from djeuscan.models import Package, Version, EuscanResult, VersionLog
class ScanUpstream(object): class ScanUpstream(object):
def __init__(self, options=None): def __init__(self, quiet=False):
if options is None: self.quiet = quiet
self.options = defaultdict(None)
else:
self.options = options
def run(self, packages=None): def scan(self, package):
for package in packages: cmd = ['euscan', package]
cmd = ['euscan', package]
fp = subprocess.Popen(cmd, stdout=subprocess.PIPE, fp = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE) stderr=subprocess.PIPE)
output = StringIO(fp.communicate()[0]) output = StringIO(fp.communicate()[0])
self.parse_output(output) self.parse_output(output)
def store_result(self, package, log): def store_result(self, package, log):
# Remove previous logs # Remove previous logs
@ -45,7 +40,7 @@ class ScanUpstream(object):
obj, created = Package.objects.get_or_create(category=cat, name=pkg) obj, created = Package.objects.get_or_create(category=cat, name=pkg)
if created and not self.options['quiet']: if created and not self.quiet:
sys.stdout.write('+ [p] %s/%s\n' % (cat, pkg)) sys.stdout.write('+ [p] %s/%s\n' % (cat, pkg))
# Set all versions dead, then set found versions alive and # Set all versions dead, then set found versions alive and
@ -69,7 +64,7 @@ class ScanUpstream(object):
if not created: if not created:
return return
if not self.options['quiet']: if not self.quiet:
sys.stdout.write('+ [u] %s %s\n' % (obj, url)) sys.stdout.write('+ [u] %s %s\n' % (obj, url))
VersionLog.objects.create( VersionLog.objects.create(
@ -125,7 +120,7 @@ class ScanUpstream(object):
@commit_on_success @commit_on_success
def purge_versions(options): def purge_versions(quiet=False):
# For each dead versions # For each dead versions
for version in Version.objects.filter(packaged=False, alive=False): for version in Version.objects.filter(packaged=False, alive=False):
VersionLog.objects.create( VersionLog.objects.create(
@ -140,7 +135,7 @@ def purge_versions(options):
version.package.n_versions -= 1 version.package.n_versions -= 1
version.package.save() version.package.save()
if not options['quiet']: if not quiet:
sys.stdout.write('- [u] %s %s\n' % (version, version.urls)) sys.stdout.write('- [u] %s %s\n' % (version, version.urls))
Version.objects.filter(packaged=False, alive=False).delete() Version.objects.filter(packaged=False, alive=False).delete()
@ -174,31 +169,29 @@ class Command(BaseCommand):
help = 'Scans metadata and fills database' help = 'Scans metadata and fills database'
def handle(self, *args, **options): def handle(self, *args, **options):
scan_upstream = ScanUpstream(options) scan_upstream = ScanUpstream(options["quiet"])
if options['feed']: if options['feed']:
scan_upstream.parse_output(options, sys.stdin) scan_upstream.parse_output(sys.stdin)
if options['purge-versions']: if options['purge-versions']:
purge_versions(options) purge_versions(options["quiet"])
return return
if not options['quiet']: if not options['quiet']:
self.stdout.write('Scanning upstream...\n') self.stdout.write('Scanning upstream...\n')
packages = []
if options['all']: if options['all']:
for pkg in Package.objects.all(): for pkg in Package.objects.all():
packages.append('%s/%s' % (pkg.category, pkg.name)) scan_upstream.scan('%s/%s' % (pkg.category, pkg.name))
elif args: elif args:
packages = list(args) for arg in args:
scan_upstream.scan(arg)
else: else:
packages = [package[:-1] for package in sys.stdin.readlines()] for package in sys.stdin.readlines():
scan_upstream.scan(package[:-1])
scan_upstream.run(packages)
if options['purge-versions']: if options['purge-versions']:
purge_versions(options) purge_versions(options["quiet"])
if not options['quiet']: if not options['quiet']:
self.stdout.write('Done.\n') self.stdout.write('Done.\n')

View File

@ -0,0 +1,83 @@
from celery.task import task
from celery.task.sets import TaskSet
from djeuscan.models import Package
from djeuscan.management.commands.regen_rrds import regen_rrds
from djeuscan.management.commands.update_counters import update_counters
from djeuscan.management.commands.scan_metadata import ScanMetadata
from djeuscan.management.commands.scan_portage import ScanPortage, \
purge_versions as scan_portage_purge
from djeuscan.management.commands.scan_upstream import ScanUpstream, \
purge_versions as scan_upstream_purge
@task
def regen_rrds_task():
regen_rrds()
@task
def update_counters_task():
update_counters()
@task
def scan_metadata_task(query, obj=None):
scan_metadata = ScanMetadata()
scan_metadata.scan(query)
@task
def scan_metadata_all_task():
job = TaskSet(tasks=[
scan_metadata_task.subtask(('%s/%s' % (pkg.category, pkg.name), pkg))
for pkg in Package.objects.all()
])
job.apply_async()
@task
def scan_portage_all_task(purge=False):
scan_portage = ScanPortage()
scan_portage.scan()
if purge:
scan_portage_purge()
@task
def scan_portage_task(query, purge=False):
scan_portage = ScanPortage()
scan_portage.scan(query)
if purge:
scan_portage_purge()
@task
def scan_portage_purge_task():
scan_portage_purge()
@task
def scan_upstream_all_task(purge=False):
tasks = [scan_upstream_task.subtask(('%s/%s' % (pkg.category, pkg.name)))
for pkg in Package.objects.all()]
if purge:
tasks.append(scan_upstream_purge_task.subtask())
job = TaskSet(tasks=tasks)
job.apply_async()
@task
def scan_upstream_task(query):
scan_upstream = ScanUpstream()
scan_upstream.scan(query)
@task
def scan_upstream_purge_task():
scan_upstream_purge()

View File

@ -166,6 +166,7 @@ INSTALLED_APPS = (
# Uncomment the next line to enable admin documentation: # Uncomment the next line to enable admin documentation:
# 'django.contrib.admindocs', # 'django.contrib.admindocs',
'south', 'south',
'djcelery',
'euscanwww', 'euscanwww',
'djeuscan', 'djeuscan',
) )
@ -199,6 +200,14 @@ LOGGING = {
} }
} }
# Celery config
import djcelery
djcelery.setup_loader()
BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_RESULT_BACKEND = "amqp"
try: try:
from local_settings import * from local_settings import *
except ImportError, ex: except ImportError, ex: